From 2ab82e08d27dae727800d376fe0532396fc62925 Mon Sep 17 00:00:00 2001 From: Roman Khlopkov <roman.khlopkov@demlabs.net> Date: Wed, 6 Apr 2022 15:11:47 +0000 Subject: [PATCH] Revert "Merge branch 'bugfix-5974' into 'release-3.1'" This reverts merge request !619 --- dap-sdk/net/core/dap_events_socket.c | 2 +- dap-sdk/net/core/dap_proc_queue.c | 73 +++++++------------- dap-sdk/net/core/dap_proc_thread.c | 84 ++++++++++++++--------- dap-sdk/net/core/include/dap_proc_queue.h | 8 +-- 4 files changed, 83 insertions(+), 84 deletions(-) diff --git a/dap-sdk/net/core/dap_events_socket.c b/dap-sdk/net/core/dap_events_socket.c index 3ab3b3534..8a4c33262 100644 --- a/dap-sdk/net/core/dap_events_socket.c +++ b/dap-sdk/net/core/dap_events_socket.c @@ -2125,7 +2125,7 @@ size_t dap_events_socket_write_unsafe(dap_events_socket_t *a_es, const void * a_ { if (a_es->buf_out_size + a_data_size > a_es->buf_out_size_max) { if (a_es->buf_out_size_max + a_data_size > DAP_EVENTS_SOCKET_BUF_LIMIT) { - log_it(L_ERROR, "Write esocket (%p) buffer overflow size=%zu/max=%zu", a_es, a_es->buf_out_size_max, (size_t)DAP_EVENTS_SOCKET_BUF_LIMIT); + log_it(L_ERROR, "Write esocket buffer overflow size=%zu/max=%zu", a_es->buf_out_size_max, (size_t)DAP_EVENTS_SOCKET_BUF_LIMIT); return 0; } else { size_t l_new_size = a_es->buf_out_size_max * 2; diff --git a/dap-sdk/net/core/dap_proc_queue.c b/dap-sdk/net/core/dap_proc_queue.c index 436aabc15..811fb77f1 100644 --- a/dap-sdk/net/core/dap_proc_queue.c +++ b/dap-sdk/net/core/dap_proc_queue.c @@ -49,11 +49,6 @@ dap_proc_queue_t * dap_proc_queue_create(dap_proc_thread_t * a_thread) if (!l_queue) return NULL; - for (int i = 0; i < DAP_QUE$K_PRIMAX; i++) { - assert ( !(pthread_mutex_init(&l_queue->list[i].lock, 0 )) ); - assert ( !(pthread_cond_init(&l_queue->list[i].cond, 0 )) ); - } - l_queue->proc_thread = a_thread; l_queue->esocket = dap_events_socket_create_type_queue_ptr_unsafe(NULL,s_queue_esocket_callback); l_queue->esocket->proc_thread = a_thread; @@ -88,62 +83,46 @@ int dap_proc_queue_delete(dap_proc_queue_t * a_queue) */ static void s_queue_esocket_callback( dap_events_socket_t * a_es, void * a_msg) { - int l_rc, pri; - dap_proc_queue_t *l_queue; - dap_proc_queue_msg_t *l_msg; - dap_proc_queue_item_t *l_item; - - assert ( a_es ); - assert( (l_queue = (dap_proc_queue_t*) a_es->_inheritor) ); - - if ( !(l_msg = (dap_proc_queue_msg_t*) a_msg) ) { - log_it(L_CRITICAL, "%s: a_es: %p, a_msg is NULL", __PRETTY_FUNCTION__, a_es); - return; - } + dap_proc_queue_t * l_queue = (dap_proc_queue_t*) a_es->_inheritor; + dap_proc_queue_msg_t * l_msg = (dap_proc_queue_msg_t*) a_msg; + dap_proc_queue_item_t * l_item = DAP_NEW_Z(dap_proc_queue_item_t); - if ( !l_msg->callback ) { - if ( l_msg->signal_kill ) /* Say to kill this object and delete its inherior dap_proc_queue_t */ - a_es->flags |= DAP_SOCK_SIGNAL_CLOSE; + assert(l_msg); + if ( !l_item ) + { + log_it(L_CRITICAL,"Can't allocate memory for callback item, exiting"); DAP_DELETE(l_msg); return; } - log_it(L_DEBUG, "l_queue: %p, l_msg: %p, callback: %p/%p, pri: %d", l_queue, l_msg, l_msg->callback, l_msg->callback_arg, l_msg->pri); - - if ( !(l_item = DAP_NEW_Z(dap_proc_queue_item_t)) ) { - log_it(L_CRITICAL,"Can't allocate memory, drop l_msg:%p, callback: %p/%p, pri: %d", l_msg, l_msg->callback, l_msg->callback_arg, l_msg->pri); + //log_it(L_DEBUG, "l_msg:%p, callback: %p/%p, pri: %d", l_msg, l_msg->callback, l_msg->callback_arg, l_msg->pri); - if ( l_msg->signal_kill ) /* Say to kill this object and delete its inherior dap_proc_queue_t */ - a_es->flags |= DAP_SOCK_SIGNAL_CLOSE; - DAP_DELETE(l_msg); - return; - } - - /* - * So, all checks has been finished, now we can prepare new entry - */ - pri = l_msg->pri; /* Validate priority */ - pri = MIN(pri, DAP_QUE$K_PRIMAX - 1); - pri = MAX(pri, 0); + // We have callback to add in list according with the priority (!!!) + if (l_msg->callback) + { + l_item->callback = l_msg->callback; + l_item->callback_arg = l_msg->callback_arg; - l_item->callback = l_msg->callback; - l_item->callback_arg = l_msg->callback_arg; + if ( l_queue->items[l_msg->pri].item_last) + l_queue->items[l_msg->pri].item_last->prev = l_item; - assert ( !pthread_mutex_lock(&l_queue->list[pri].lock) ); /* Protect list from other threads */ - l_rc = s_dap_insqtail (&l_queue->list[pri].items, l_item, 1); - assert ( !pthread_mutex_unlock(&l_queue->list[pri].lock) ); + l_item->next = l_queue->items[l_msg->pri].item_last ; + l_queue->items[l_msg->pri].item_last = l_item; - if ( l_rc ) - log_it(L_CRITICAL, "Enqueue failed: %d, drop l_msg:%p, callback: %p/%p, pri: %d", l_rc, l_msg, l_msg->callback, l_msg->callback_arg, l_msg->pri); - else - log_it(L_DEBUG, "Enqueued l_msg:%p, callback: %p/%p, pri: %d", l_msg, l_msg->callback, l_msg->callback_arg, l_msg->pri); + if( l_queue->items[l_msg->pri].item_first == NULL){ + //log_it( L_DEBUG, "Added callback %p/%p in proc thread %u callback queue: first in list", l_msg->callback,l_msg->callback_arg, l_queue->proc_thread->cpu_id); + l_queue->items[l_msg->pri].item_first = l_item; + }//else + // log_it( L_DEBUG, "Added callback %p/%p in proc thread %u callback queue: last in list", l_msg->callback,l_msg->callback_arg, l_queue->proc_thread->cpu_id); - dap_events_socket_event_signal(l_queue->proc_thread->proc_event, 1); /* Add on top so after call this callback will be executed first */ + // Add on top so after call this callback will be executed first + dap_events_socket_event_signal(l_queue->proc_thread->proc_event, 1); + } - if (l_msg->signal_kill) /* Say to kill this object and delete its inherior dap_proc_queue_t */ + if (l_msg->signal_kill) // Say to kill this object and delete its inherior dap_proc_queue_t a_es->flags |= DAP_SOCK_SIGNAL_CLOSE; DAP_DELETE(l_msg); diff --git a/dap-sdk/net/core/dap_proc_thread.c b/dap-sdk/net/core/dap_proc_thread.c index fbecc880b..3694c2e75 100644 --- a/dap-sdk/net/core/dap_proc_thread.c +++ b/dap-sdk/net/core/dap_proc_thread.c @@ -162,12 +162,12 @@ unsigned l_id_min = 0, l_size_min = UINT32_MAX, l_queue_size; static void s_proc_event_callback(dap_events_socket_t * a_esocket, uint64_t __attribute__((unused)) a_value) { dap_proc_thread_t *l_thread; -dap_proc_queue_item_t *l_item; -int l_rc, l_is_anybody_for_repeat, l_is_finished, l_iter_cnt, l_cur_pri; -size_t l_size; +dap_proc_queue_item_t *l_item, *l_item_old; +int l_is_anybody_for_repeat, l_is_finished, l_iter_cnt, l_cur_pri; dap_proc_queue_t *l_queue; - debug_if (s_debug_reactor, L_DEBUG, "--> Proc event callback start, a_esocket:%p ", a_esocket); + if (s_debug_reactor) + log_it(L_DEBUG, "--> Proc event callback start"); if ( !(l_thread = (dap_proc_thread_t *) a_esocket->_inheritor) ) { @@ -176,50 +176,72 @@ dap_proc_queue_t *l_queue; } l_is_anybody_for_repeat = 0; - /*@RRL: l_iter_cnt = DAP_QUE$K_ITER_NR; */ + l_iter_cnt = DAP_QUE$K_ITER_NR; l_cur_pri = (DAP_QUE$K_PRIMAX - 1); l_queue = l_thread->proc_queue; - for ( ; l_cur_pri; l_cur_pri--, l_iter_cnt-- ) /* Run from higest to lowest ... */ + for ( ; l_iter_cnt && l_cur_pri--; ) /* Run from higest to lowest ... */ { - if ( !l_queue->list[l_cur_pri].items.nr ) /* A lockless quick check */ + if ( !(l_item = l_queue->items[l_cur_pri].item_first) ) /* Is there something to do at all ? */ continue; - assert ( !pthread_mutex_lock(&l_queue->list[l_cur_pri].lock) ); /* Protect list from other threads */ - l_rc = s_dap_remqhead (&l_queue->list[l_cur_pri].items, (void **) &l_item, &l_size); - assert ( !pthread_mutex_unlock(&l_queue->list[l_cur_pri].lock) ); + for ( l_item_old = NULL ; l_item; ) + { + if(s_debug_reactor) + log_it(L_INFO, "Proc event callback: %p/%p, prio=%d, iteration=%d", + l_item->callback, l_item->callback_arg, l_cur_pri, l_iter_cnt); - if ( l_rc == -ENOENT ) { /* Queue is empty ? */ - debug_if (s_debug_reactor, L_DEBUG, "a_esocket:%p - nothing to do at prio: %d ", a_esocket, l_cur_pri); - continue; - } + l_is_finished = l_item->callback(l_thread, l_item->callback_arg); - debug_if (s_debug_reactor, L_INFO, "Proc event callback: %p/%p, prio=%d, iteration=%d", - l_item->callback, l_item->callback_arg, l_cur_pri, l_iter_cnt); + if (l_is_finished) { + if ( l_item->prev ) + l_item->prev->next = l_item_old; - assert(l_item->callback); /* Just for ensuring ... */ + if(l_item_old){ + l_item_old->prev = l_item->prev; - l_is_finished = l_item->callback(l_thread, l_item->callback_arg); - debug_if (s_debug_reactor, L_INFO, "Proc event callback: %p/%p, prio=%d, iteration=%d - is %sfinished", - l_item->callback, l_item->callback_arg, l_cur_pri, l_iter_cnt, l_is_finished ? "" : "not "); + if ( ! l_item->prev ) // We deleted tail + l_queue->items[l_cur_pri].item_last = l_item_old; - if ( !(l_is_finished) ) { - /* Rearm callback to be executed again */ - assert ( !pthread_mutex_lock(&l_queue->list[l_cur_pri].lock) ); - l_rc = s_dap_insqtail (&l_queue->list[l_cur_pri].items, l_item, 1); - assert ( !pthread_mutex_unlock(&l_queue->list[l_cur_pri].lock) ); - } - else { - DAP_DELETE(l_item); - } + DAP_DELETE(l_item); + l_item = l_item_old->prev; + } else { + l_queue->items[l_cur_pri].item_first = l_item->prev; + + if ( l_item->prev) + l_item->prev->next = NULL; // Prev if it was - now its NULL + else + l_queue->items[l_cur_pri].item_last = NULL; // NULL last item - l_is_anybody_for_repeat += (!l_is_finished); + DAP_DELETE(l_item); + l_item = l_queue->items[l_cur_pri].item_first; + } /* if (l_is_finished) */ + + if ( s_debug_reactor ) { + log_it(L_DEBUG, "Proc event finished"); + } + } else { + if ( s_debug_reactor ) + log_it(L_DEBUG, "Proc event not finished"); + + l_item_old = l_item; + l_item = l_item->prev; + } + + l_is_anybody_for_repeat += (!l_is_finished); + + if ( (l_iter_cnt -= 1) == 1 ) /* Rest a last iteration to get a chance to */ + break; /* execute callback with the <l_cur_pri> - 1 */ + } } + l_is_anybody_for_repeat += (!l_iter_cnt); /* All iterations is used - the set "say what again" */ + if ( l_is_anybody_for_repeat ) /* Arm event if we have something to proc again */ dap_events_socket_event_signal(a_esocket, 1); - debug_if(s_debug_reactor, L_DEBUG, "<-- Proc event callback end, repeat flag is: %d, iterations: %d", l_is_anybody_for_repeat, l_iter_cnt); + if(s_debug_reactor) + log_it(L_DEBUG, "<-- Proc event callback end, repeat flag is: %d, iterations: %d", l_is_anybody_for_repeat, l_iter_cnt); } diff --git a/dap-sdk/net/core/include/dap_proc_queue.h b/dap-sdk/net/core/include/dap_proc_queue.h index be2b8fdd4..d6a1a8d46 100644 --- a/dap-sdk/net/core/include/dap_proc_queue.h +++ b/dap-sdk/net/core/include/dap_proc_queue.h @@ -23,7 +23,6 @@ #pragma once #include "dap_events_socket.h" -#include "dap_list.h" /* Simple List routines */ typedef struct dap_proc_thread dap_proc_thread_t; @@ -63,10 +62,9 @@ typedef struct dap_proc_queue{ dap_events_socket_t *esocket; struct { - pthread_mutex_t lock; /* To coordinate access to the queuee's entries */ - pthread_cond_t cond; /* For signaling to waiters ... */ - dap_slist_t items; /* An array of list according of priority numbers */ - } list [DAP_QUE$K_PRIMAX]; + dap_proc_queue_item_t *item_first, *item_last; /* Queue's entries ... */ + } items [DAP_QUE$K_PRIMAX]; + } dap_proc_queue_t; dap_proc_queue_t * dap_proc_queue_create(dap_proc_thread_t * a_thread); -- GitLab