diff --git a/dap-sdk/net/core/dap_events_socket.c b/dap-sdk/net/core/dap_events_socket.c index 0eec5a14dc5ee6ff351c0ccdc0a686e062a4bc3d..30346a117f9296bdfea71277fbd36b7374e02f10 100644 --- a/dap-sdk/net/core/dap_events_socket.c +++ b/dap-sdk/net/core/dap_events_socket.c @@ -492,8 +492,8 @@ dap_events_socket_t * dap_events_socket_queue_ptr_create_input(dap_events_socket snprintf(l_mq_name,sizeof (l_mq_name), "/%s-queue_ptr-%u", dap_get_appname(), l_es->mqd_id ); - if ( (l_errno = mq_unlink(l_mq_name)) ) /* Mark this MQ to be deleted as the process will be terminated */ - log_it(L_DEBUG, "mq_unlink(%s)->%d", l_mq_name, l_errno); + //if ( (l_errno = mq_unlink(l_mq_name)) ) /* Mark this MQ to be deleted as the process will be terminated */ + // log_it(L_DEBUG, "mq_unlink(%s)->%d", l_mq_name, l_errno); if ( 0 >= (l_es->mqd = mq_open(l_mq_name, O_CREAT|O_WRONLY |O_NONBLOCK, 0700, &l_mq_attr)) ) { @@ -663,8 +663,8 @@ dap_events_socket_t * s_create_type_queue_ptr(dap_worker_t * a_w, dap_events_soc l_es->mqd_id = atomic_fetch_add( &l_mq_last_number, 1); snprintf(l_mq_name,sizeof (l_mq_name), "/%s-queue_ptr-%u", dap_get_appname(), l_es->mqd_id ); - if ( (l_errno = mq_unlink(l_mq_name)) ) /* Mark this MQ to be deleted as the process will be terminated */ - log_it(L_DEBUG, "mq_unlink(%s)->%d", l_mq_name, l_errno); + // if ( (l_errno = mq_unlink(l_mq_name)) ) /* Mark this MQ to be deleted as the process will be terminated */ + // log_it(L_DEBUG, "mq_unlink(%s)->%d", l_mq_name, l_errno); if ( 0 >= (l_es->mqd = mq_open(l_mq_name, O_CREAT|O_RDWR |O_NONBLOCK, 0700, &l_mq_attr)) ) { @@ -1247,19 +1247,21 @@ int dap_events_socket_queue_ptr_send_to_input(dap_events_socket_t * a_es_input, * @param a_es * @param a_arg */ -int dap_events_socket_queue_ptr_send( dap_events_socket_t * a_es, void* a_arg) +int dap_events_socket_queue_ptr_send( dap_events_socket_t *a_es, void *a_arg) { - int l_ret=-1024; - int l_errno; + int l_ret = -1024, l_errno; + if (s_debug_reactor) log_it(L_DEBUG,"Sent ptr %p to esocket queue %p (%d)", a_arg, a_es, a_es? a_es->fd : -1); + #if defined(DAP_EVENTS_CAPS_QUEUE_PIPE2) l_ret = write(a_es->fd2, &a_arg, sizeof(a_arg)); l_errno = errno; #elif defined (DAP_EVENTS_CAPS_QUEUE_MQUEUE) assert(a_es); assert(a_es->mqd); - l_ret = mq_send(a_es->mqd, (const char *)&a_arg,sizeof (a_arg),0); + + l_ret = mq_send(a_es->mqd, (const char *)&a_arg, sizeof (a_arg), 0); l_errno = errno; if ( l_ret == EPERM){ log_it(L_ERROR,"No permissions to send data in mqueue"); @@ -1269,8 +1271,9 @@ int dap_events_socket_queue_ptr_send( dap_events_socket_t * a_es, void* a_arg) l_errno = EAGAIN; if (l_ret == 0) l_ret = sizeof (a_arg); - else if (l_ret >0) + else if (l_ret > 0) l_ret = -l_ret; + #elif defined (DAP_EVENTS_CAPS_QUEUE_POSIX) struct timespec l_timeout; clock_gettime(CLOCK_REALTIME, &l_timeout); @@ -2177,15 +2180,19 @@ size_t dap_events_socket_write_f_unsafe(dap_events_socket_t *a_es, const char * */ size_t dap_events_socket_pop_from_buf_in(dap_events_socket_t *a_es, void *a_data, size_t a_data_size) { - if(a_data_size<a_es->buf_in_size){ - memcpy(a_data,a_es->buf_in,a_data_size); - memmove(a_data,a_es->buf_in+a_data_size,a_es->buf_in_size-a_data_size); - }else{ - if(a_data_size>a_es->buf_in_size) - a_data_size=a_es->buf_in_size; - memcpy(a_data,a_es->buf_in,a_data_size); + if ( a_data_size < a_es->buf_in_size) + { + memcpy(a_data, a_es->buf_in, a_data_size); + memmove(a_es->buf_in, a_es->buf_in + a_data_size, a_es->buf_in_size - a_data_size); + } else { + if ( a_data_size > a_es->buf_in_size ) + a_data_size = a_es->buf_in_size; + + memcpy(a_data, a_es->buf_in, a_data_size); } - a_es->buf_in_size-=a_data_size; + + a_es->buf_in_size -= a_data_size; + return a_data_size; } @@ -2197,9 +2204,11 @@ size_t dap_events_socket_pop_from_buf_in(dap_events_socket_t *a_es, void *a_data */ void dap_events_socket_shrink_buf_in(dap_events_socket_t * cl, size_t shrink_size) { - if((shrink_size==0)||(cl->buf_in_size==0) ){ + if ( (!shrink_size) || (!cl->buf_in_size) ) return; - }else if(cl->buf_in_size>shrink_size){ + + if (cl->buf_in_size > shrink_size) + { size_t buf_size=cl->buf_in_size-shrink_size; uint8_t* tmp = cl->buf_in + shrink_size; memmove(cl->buf_in,tmp,buf_size); diff --git a/dap-sdk/net/core/dap_proc_queue.c b/dap-sdk/net/core/dap_proc_queue.c index 4cf3938267b3413fdfe7a35372e83818ab929926..98615c12e02a2b9f96a6a02a5f8da47dbe2510ea 100644 --- a/dap-sdk/net/core/dap_proc_queue.c +++ b/dap-sdk/net/core/dap_proc_queue.c @@ -96,6 +96,10 @@ static void s_queue_esocket_callback( dap_events_socket_t * a_es, void * a_msg) return; } + + log_it(L_DEBUG, "l_msg:%p, callback: %p/%p, pri: %d", l_msg, l_msg->callback, l_msg->callback_arg, l_msg->pri); + + // We have callback to add in list according with the priority (!!!) if (l_msg->callback) { diff --git a/dap-sdk/net/core/dap_proc_thread.c b/dap-sdk/net/core/dap_proc_thread.c index 343e735528d0b8225620bdb216cb373f24e4785a..07cc81f660fac40621e07ba79afa73895f3e9f21 100644 --- a/dap-sdk/net/core/dap_proc_thread.c +++ b/dap-sdk/net/core/dap_proc_thread.c @@ -241,7 +241,7 @@ dap_proc_queue_t *l_queue; dap_events_socket_event_signal(a_esocket, 1); if(s_debug_reactor) - log_it(L_DEBUG, "<-- Proc event callback end"); + log_it(L_DEBUG, "<-- Proc event callback end, repeat flag is: %d, iterations: %d", l_is_anybody_for_repeat, l_iter_cnt); }