diff --git a/dap-sdk/net/core/dap_proc_queue.c b/dap-sdk/net/core/dap_proc_queue.c index 1907b40511f8b5c5067289e7fe909f0b55c3e9cd..4cf3938267b3413fdfe7a35372e83818ab929926 100644 --- a/dap-sdk/net/core/dap_proc_queue.c +++ b/dap-sdk/net/core/dap_proc_queue.c @@ -31,7 +31,8 @@ typedef struct dap_proc_queue_msg{ dap_proc_queue_callback_t callback; void * callback_arg; - bool signal_kill; + int signal_kill, + pri; /* Message priority, see DAP_QUE$K_PRI* constants */ } dap_proc_queue_msg_t; static void s_queue_esocket_callback( dap_events_socket_t * a_es, void * a_msg); @@ -69,7 +70,8 @@ int dap_proc_queue_delete(dap_proc_queue_t * a_queue) if (!l_msg) return -ENOMEM; - l_msg->signal_kill = true; + l_msg->signal_kill = 1; /* TRUE */ + l_msg->pri = DAP_QUE$K_PRI_HIGH; /* Assume that KILL must be delivered ASAP */ return dap_events_socket_queue_ptr_send( a_queue->esocket, l_msg ); } @@ -83,36 +85,41 @@ static void s_queue_esocket_callback( dap_events_socket_t * a_es, void * a_msg) { dap_proc_queue_t * l_queue = (dap_proc_queue_t*) a_es->_inheritor; dap_proc_queue_msg_t * l_msg = (dap_proc_queue_msg_t*) a_msg; + dap_proc_queue_item_t * l_item = DAP_NEW_Z(dap_proc_queue_item_t); + assert(l_msg); - // We have callback to add in list - if (l_msg->callback) { - dap_proc_queue_item_t * l_item = DAP_NEW_Z(dap_proc_queue_item_t); - if (! l_item){ - log_it(L_CRITICAL,"Can't allocate memory for callback item, exiting"); - DAP_DELETE(l_msg); - return; - } + + if ( !l_item ) + { + log_it(L_CRITICAL,"Can't allocate memory for callback item, exiting"); + DAP_DELETE(l_msg); + return; + } + + // We have callback to add in list according with the priority (!!!) + if (l_msg->callback) + { l_item->callback = l_msg->callback; l_item->callback_arg = l_msg->callback_arg; - if ( l_queue->item_last) - l_queue->item_last->prev = l_item; + if ( l_queue->items[l_msg->pri].item_last) + l_queue->items[l_msg->pri].item_last->prev = l_item; - l_item->next=l_queue->item_last ; - l_queue->item_last = l_item; + l_item->next = l_queue->items[l_msg->pri].item_last ; + l_queue->items[l_msg->pri].item_last = l_item; - if( l_queue->item_first == NULL){ + if( l_queue->items[l_msg->pri].item_first == NULL){ //log_it( L_DEBUG, "Added callback %p/%p in proc thread %u callback queue: first in list", l_msg->callback,l_msg->callback_arg, l_queue->proc_thread->cpu_id); - l_queue->item_first = l_item; + l_queue->items[l_msg->pri].item_first = l_item; }//else // log_it( L_DEBUG, "Added callback %p/%p in proc thread %u callback queue: last in list", l_msg->callback,l_msg->callback_arg, l_queue->proc_thread->cpu_id); // Add on top so after call this callback will be executed first - dap_events_socket_event_signal(l_queue->proc_thread->proc_event,1); + dap_events_socket_event_signal(l_queue->proc_thread->proc_event, 1); } - if (l_msg->signal_kill){ // Say to kill this object and delete its inherior dap_proc_queue_t + + if (l_msg->signal_kill) // Say to kill this object and delete its inherior dap_proc_queue_t a_es->flags |= DAP_SOCK_SIGNAL_CLOSE; - } DAP_DELETE(l_msg); } @@ -134,10 +141,48 @@ int dap_proc_queue_add_callback(dap_worker_t * a_worker,dap_proc_queue_callback_ l_msg->callback = a_callback; l_msg->callback_arg = a_callback_arg; - + l_msg->pri = DAP_QUE$K_PRI_NORMAL; + /* + * Send message to queue with the DEFAULT priority + */ return dap_events_socket_queue_ptr_send( a_worker->proc_queue->esocket , l_msg ); } + +/** + * @brief dap_proc_queue_add_callback + * @param a_worker + * @param a_callback + * @param a_callback_arg + * @param a_pri - priority, DAP_QUE$K_PRI* constants + * @return: -ENOMEM in case of memory allocation error + * other <errno> codes from the internaly called routine + */ +int dap_proc_queue_add_callback_ext(dap_worker_t * a_worker,dap_proc_queue_callback_t a_callback, void * a_callback_arg, + int a_pri) +{ +dap_proc_queue_msg_t *l_msg; + + if ( !(a_pri < DAP_QUE$K_PRIMAX) ) /* Check that priority level is in legal range */ + { + log_it(L_WARNING, "Priority level %d is incorrect (should be is in range %d-%d)", a_pri, DAP_QUE$K_PRI0 + 1, DAP_QUE$K_PRIMAX - 1); + a_pri = DAP_QUE$K_PRI_NORMAL; + } + + if ( !(l_msg = DAP_NEW_Z(dap_proc_queue_msg_t)) ) /* Allocate memory for a new message */ + return -ENOMEM; + + l_msg->callback = a_callback; + l_msg->callback_arg = a_callback_arg; + l_msg->pri = a_pri; + + /* + * Send message to queueu with the given priority + */ + return dap_events_socket_queue_ptr_send ( a_worker->proc_queue->esocket , l_msg); +} + + /** * @brief dap_proc_queue_add_callback_inter * @param a_es_input @@ -155,6 +200,37 @@ int dap_proc_queue_add_callback_inter( dap_events_socket_t * a_es_input, dap_pro l_msg->callback = a_callback; l_msg->callback_arg = a_callback_arg; + l_msg->pri = DAP_QUE$K_PRI_NORMAL; + + return dap_events_socket_queue_ptr_send_to_input( a_es_input , l_msg ); +} + + +/** + * @brief dap_proc_queue_add_callback_inter + * @param a_es_input + * @param a_callback + * @param a_callback_arg + * @return: -ENOMEM in case of memory allocation error + * other <errno> codes from the internaly called routine + */ +int dap_proc_queue_add_callback_inter_ext( dap_events_socket_t * a_es_input, dap_proc_queue_callback_t a_callback, void * a_callback_arg, + int a_pri) +{ +dap_proc_queue_msg_t * l_msg; + + if ( !(a_pri < DAP_QUE$K_PRIMAX) ) /* Check that priority level is in legal range */ + { + log_it(L_WARNING, "Priority level %d is incorrect (should be is in range %d-%d)", a_pri, DAP_QUE$K_PRI0 + 1, DAP_QUE$K_PRIMAX - 1); + a_pri = DAP_QUE$K_PRI_NORMAL; + } + + if ( !(l_msg = DAP_NEW_Z(dap_proc_queue_msg_t)) ) /* Allocate memory for a new message */ + return -ENOMEM; + + l_msg->callback = a_callback; + l_msg->callback_arg = a_callback_arg; + l_msg->pri = a_pri; return dap_events_socket_queue_ptr_send_to_input( a_es_input , l_msg ); } diff --git a/dap-sdk/net/core/dap_proc_thread.c b/dap-sdk/net/core/dap_proc_thread.c index 6663c852ec8de8e9454f6ba3b3d10b9326408cc5..343e735528d0b8225620bdb216cb373f24e4785a 100644 --- a/dap-sdk/net/core/dap_proc_thread.c +++ b/dap-sdk/net/core/dap_proc_thread.c @@ -152,58 +152,94 @@ unsigned l_id_min = 0, l_size_min = UINT32_MAX, l_queue_size; } /** - * @brief s_proc_event_callback + * @brief s_proc_event_callback - get from queue next element and execute action routine, + * repeat execution depending on status is returned by action routine. + * * @param a_esocket * @param a_value + * */ -static void s_proc_event_callback(dap_events_socket_t * a_esocket, uint64_t a_value) +static void s_proc_event_callback(dap_events_socket_t * a_esocket, uint64_t __attribute__((unused)) a_value) { - (void) a_value; - if(s_debug_reactor) +dap_proc_thread_t *l_thread; +dap_proc_queue_item_t *l_item, *l_item_old; +int l_is_anybody_for_repeat, l_is_finished, l_iter_cnt, l_cur_pri; +dap_proc_queue_t *l_queue; + + if (s_debug_reactor) log_it(L_DEBUG, "--> Proc event callback start"); - dap_proc_thread_t * l_thread = (dap_proc_thread_t *) a_esocket->_inheritor; - dap_proc_queue_item_t * l_item = l_thread->proc_queue->item_first; - dap_proc_queue_item_t * l_item_old = NULL; - bool l_is_anybody_for_repeat=false; - while(l_item){ - if(s_debug_reactor) - log_it(L_INFO, "Proc event callback: %p/%p", l_item->callback, l_item->callback_arg); - bool l_is_finished = l_item->callback(l_thread, l_item->callback_arg); - if (l_is_finished){ - if ( l_item->prev ){ - l_item->prev->next = l_item_old; - } - if(l_item_old){ - l_item_old->prev = l_item->prev; - if ( ! l_item->prev ) { // We deleted tail - l_thread->proc_queue->item_last = l_item_old; + if ( !(l_thread = (dap_proc_thread_t *) a_esocket->_inheritor) ) + { + log_it(L_ERROR, "NULL <dap_proc_thread_t> context is detected"); + return; + } + + l_is_anybody_for_repeat = 0; + l_iter_cnt = DAP_QUE$K_ITER_NR; + l_cur_pri = (DAP_QUE$K_PRIMAX - 1); + l_queue = l_thread->proc_queue; + + for ( ; l_iter_cnt && l_cur_pri--; ) /* Run from higest to lowest ... */ + { + if ( !(l_item = l_queue->items[l_cur_pri].item_first) ) /* Is there something to do at all ? */ + continue; + + for ( l_item_old = NULL ; l_item; ) + { + if(s_debug_reactor) + log_it(L_INFO, "Proc event callback: %p/%p, prio=%d, iteration=%d", + l_item->callback, l_item->callback_arg, l_cur_pri, l_iter_cnt); + + l_is_finished = l_item->callback(l_thread, l_item->callback_arg); + + if (l_is_finished) { + if ( l_item->prev ) + l_item->prev->next = l_item_old; + + if(l_item_old){ + l_item_old->prev = l_item->prev; + + if ( ! l_item->prev ) // We deleted tail + l_queue->items[l_cur_pri].item_last = l_item_old; + + DAP_DELETE(l_item); + l_item = l_item_old->prev; + } else { + l_queue->items[l_cur_pri].item_first = l_item->prev; + + if ( l_item->prev) + l_item->prev->next = NULL; // Prev if it was - now its NULL + else + l_queue->items[l_cur_pri].item_last = NULL; // NULL last item + + DAP_DELETE(l_item); + l_item = l_queue->items[l_cur_pri].item_first; + } /* if (l_is_finished) */ + + if ( s_debug_reactor ) { + log_it(L_DEBUG, "Proc event finished"); } + } else { + if ( s_debug_reactor ) + log_it(L_DEBUG, "Proc event not finished"); - DAP_DELETE(l_item); - l_item = l_item_old->prev; - }else{ - l_thread->proc_queue->item_first = l_item->prev; - if ( l_item->prev){ - l_item->prev->next = NULL; // Prev if it was - now its NULL - }else - l_thread->proc_queue->item_last = NULL; // NULL last item - - DAP_DELETE(l_item); - l_item = l_thread->proc_queue->item_first; + l_item_old = l_item; + l_item = l_item->prev; } - if(s_debug_reactor) - log_it(L_DEBUG, "Proc event finished"); - }else{ - if(s_debug_reactor) - log_it(L_DEBUG, "Proc event not finished"); - l_item_old = l_item; - l_item=l_item->prev; + + l_is_anybody_for_repeat += (!l_is_finished); + + if ( (l_iter_cnt -= 1) == 1 ) /* Rest a last iteration to get a chance to */ + break; /* execute callback with the <l_cur_pri> - 1 */ } - l_is_anybody_for_repeat = !l_is_finished; } - if(l_is_anybody_for_repeat) // Arm event if we have smth to proc again - dap_events_socket_event_signal(a_esocket,1); + + l_is_anybody_for_repeat += (!l_iter_cnt); /* All iterations is used - the set "say what again" */ + + if ( l_is_anybody_for_repeat ) /* Arm event if we have something to proc again */ + dap_events_socket_event_signal(a_esocket, 1); + if(s_debug_reactor) log_it(L_DEBUG, "<-- Proc event callback end"); } @@ -318,7 +354,7 @@ int dap_proc_thread_esocket_update_poll_flags(dap_proc_thread_t * a_thread, dap_ } struct kevent * l_event = &a_esocket->kqueue_event; // Check & add - bool l_is_error=false; + int l_is_error=false; int l_errno=0; if (a_esocket->type == DESCRIPTOR_TYPE_EVENT || a_esocket->type == DESCRIPTOR_TYPE_QUEUE){ EV_SET(l_event, a_esocket->socket, EVFILT_USER,EV_ADD| EV_CLEAR ,0,0, &a_esocket->kqueue_event_catched_data ); @@ -507,7 +543,7 @@ static void * s_proc_thread_function(void * a_arg) #elif defined(DAP_EVENTS_CAPS_POLL) l_thread->poll_count_max = DAP_EVENTS_SOCKET_MAX; l_thread->poll_count = 0; - bool l_poll_compress = false; + int l_poll_compress = false; l_thread->poll = DAP_NEW_Z_SIZE(struct pollfd,l_thread->poll_count_max *sizeof (*l_thread->poll)); l_thread->esockets = DAP_NEW_Z_SIZE(dap_events_socket_t*,l_thread->poll_count_max *sizeof (*l_thread->esockets)); @@ -620,7 +656,7 @@ static void * s_proc_thread_function(void * a_arg) } for(size_t n = 0; n < l_sockets_max; n++) { dap_events_socket_t * l_cur; - bool l_flag_hup, l_flag_rdhup, l_flag_read, l_flag_write, l_flag_error, + int l_flag_hup, l_flag_rdhup, l_flag_read, l_flag_write, l_flag_error, l_flag_nval,l_flag_pri,l_flag_msg; #ifdef DAP_EVENTS_CAPS_EPOLL l_cur = (dap_events_socket_t *) l_epoll_events[n].data.ptr; diff --git a/dap-sdk/net/core/include/dap_proc_queue.h b/dap-sdk/net/core/include/dap_proc_queue.h index da531274ff5204bb6a3a4fa33a1adb87a380fcfb..09800fb3c37216239968c26dd7b07591b06274cb 100644 --- a/dap-sdk/net/core/include/dap_proc_queue.h +++ b/dap-sdk/net/core/include/dap_proc_queue.h @@ -25,27 +25,53 @@ typedef struct dap_proc_thread dap_proc_thread_t; -typedef bool (*dap_proc_queue_callback_t)(dap_proc_thread_t*,void* ); // Callback for processor. Returns true if - // we want to stop callback execution and - // not to go on next loop +typedef bool (*dap_proc_queue_callback_t)(dap_proc_thread_t*,void* ); // Callback for processor. Returns true if + // we want to stop callback execution and + // not to go on next loop +enum { + DAP_QUE$K_PRI0 = 0, /* Lowest priority (Idle) */ + DAP_QUE$K_PRI_IDLE = DAP_QUE$K_PRI0, /* Don't use Idle if u are not sure that understand how it works */ + + DAP_QUE$K_PRI1 = 1, /* Low priority */ + DAP_QUE$K_PRI_LOW = DAP_QUE$K_PRI1, + + + DAP_QUE$K_PRI2 = 2, + DAP_QUE$K_PRI_NORMAL = DAP_QUE$K_PRI2, /* Default priority for any queue's entry; + has assigned implicitly */ + + DAP_QUE$K_PRI3 = 3, /* Higest priority */ + DAP_QUE$K_PRI_HIGH = DAP_QUE$K_PRI3, + + DAP_QUE$K_PRIMAX = 4 /* End-of-list marker */ +}; + +#define DAP_QUE$K_ITER_NR 7 typedef struct dap_proc_queue_item{ - dap_proc_queue_callback_t callback; - void *callback_arg; - struct dap_proc_queue_item * next; - struct dap_proc_queue_item * prev; + dap_proc_queue_callback_t callback; /* An address of the action routine */ + void *callback_arg; /* Address of the action routine argument */ + + struct dap_proc_queue_item *prev; /* Links to back and forward entries */ + struct dap_proc_queue_item *next; } dap_proc_queue_item_t; typedef struct dap_proc_queue{ - dap_proc_thread_t * proc_thread; - dap_events_socket_t *esocket; - dap_proc_queue_item_t * item_last; - dap_proc_queue_item_t * item_first; + dap_proc_thread_t *proc_thread; /* An assigned processor threads for the quueue's entries */ + dap_events_socket_t *esocket; + + struct { + dap_proc_queue_item_t *item_first, *item_last; /* Queue's entries ... */ + } items [DAP_QUE$K_PRIMAX]; + } dap_proc_queue_t; dap_proc_queue_t * dap_proc_queue_create(dap_proc_thread_t * a_thread); int dap_proc_queue_delete(dap_proc_queue_t * a_queue); int dap_proc_queue_add_callback(dap_worker_t * a_worker, dap_proc_queue_callback_t a_callback, void * a_callback_arg); -int dap_proc_queue_add_callback_inter( dap_events_socket_t * a_es_input, dap_proc_queue_callback_t a_callback, void * a_callback_arg); +int dap_proc_queue_add_callback_inter(dap_events_socket_t * a_es_input, dap_proc_queue_callback_t a_callback, void * a_callback_arg); + +int dap_proc_queue_add_callback_ext(dap_worker_t * a_worker, dap_proc_queue_callback_t a_callback, void * a_callback_arg, int a_pri); +int dap_proc_queue_add_callback_inter_ext(dap_events_socket_t * a_es_input, dap_proc_queue_callback_t a_callback, void * a_callback_arg, int ); diff --git a/dap-sdk/net/core/include/dap_proc_thread.h b/dap-sdk/net/core/include/dap_proc_thread.h index 930539d06a80cd0801116570282756acc7dae1aa..6e87433c02165152a7ce0f3a22b877c85f6e2ca8 100644 --- a/dap-sdk/net/core/include/dap_proc_thread.h +++ b/dap-sdk/net/core/include/dap_proc_thread.h @@ -29,18 +29,20 @@ #include "dap_worker.h" typedef struct dap_proc_thread{ - uint32_t cpu_id; - pthread_t thread_id; - dap_proc_queue_t * proc_queue; + uint32_t cpu_id; + pthread_t thread_id; /* TID has been returned by pthread_create() */ + + dap_proc_queue_t *proc_queue; /* Queues */ + atomic_uint proc_queue_size; /* Thread's load factor - is not supported at the moment */ + dap_events_socket_t * proc_event; // Should be armed if we have to deal with it dap_events_socket_t ** queue_assign_input; // Inputs for assign queues dap_events_socket_t ** queue_io_input; // Inputs for assign queues dap_events_socket_t ** queue_callback_input; // Inputs for worker context callback queues - atomic_uint proc_queue_size; /* Thread's load factor */ - bool signal_kill; - bool signal_exit; + int signal_kill; + int signal_exit; dap_events_socket_t * event_exit; diff --git a/dap-sdk/net/core/include/dap_worker.h b/dap-sdk/net/core/include/dap_worker.h index 37503dba9ea17ef95fd2efb3716c0ddc7354fac7..5dcdf6851ffc6430af3ea93d7ae29304b7f72b5f 100644 --- a/dap-sdk/net/core/include/dap_worker.h +++ b/dap-sdk/net/core/include/dap_worker.h @@ -23,12 +23,13 @@ #pragma once #include "dap_events_socket.h" +#include "dap_proc_queue.h" #include <pthread.h> #include "dap_common.h" -typedef struct dap_proc_queue dap_proc_queue_t; +//typedef struct dap_proc_queue dap_proc_queue_t; typedef struct dap_timerfd dap_timerfd_t; typedef struct dap_worker { @@ -42,7 +43,7 @@ typedef struct dap_worker dap_events_socket_t *esockets; // Hashmap of event sockets // Signal to exit - bool signal_exit; + int signal_exit; // worker control queues dap_events_socket_t * queue_es_new; // Queue socket for new socket