From 2ab82e08d27dae727800d376fe0532396fc62925 Mon Sep 17 00:00:00 2001
From: Roman Khlopkov <roman.khlopkov@demlabs.net>
Date: Wed, 6 Apr 2022 15:11:47 +0000
Subject: [PATCH] Revert "Merge branch 'bugfix-5974' into 'release-3.1'"

This reverts merge request !619
---
 dap-sdk/net/core/dap_events_socket.c      |  2 +-
 dap-sdk/net/core/dap_proc_queue.c         | 73 +++++++-------------
 dap-sdk/net/core/dap_proc_thread.c        | 84 ++++++++++++++---------
 dap-sdk/net/core/include/dap_proc_queue.h |  8 +--
 4 files changed, 83 insertions(+), 84 deletions(-)

diff --git a/dap-sdk/net/core/dap_events_socket.c b/dap-sdk/net/core/dap_events_socket.c
index 3ab3b3534..8a4c33262 100644
--- a/dap-sdk/net/core/dap_events_socket.c
+++ b/dap-sdk/net/core/dap_events_socket.c
@@ -2125,7 +2125,7 @@ size_t dap_events_socket_write_unsafe(dap_events_socket_t *a_es, const void * a_
 {
     if (a_es->buf_out_size + a_data_size > a_es->buf_out_size_max) {
         if (a_es->buf_out_size_max + a_data_size > DAP_EVENTS_SOCKET_BUF_LIMIT) {
-            log_it(L_ERROR, "Write esocket (%p) buffer overflow size=%zu/max=%zu", a_es, a_es->buf_out_size_max, (size_t)DAP_EVENTS_SOCKET_BUF_LIMIT);
+            log_it(L_ERROR, "Write esocket buffer overflow size=%zu/max=%zu", a_es->buf_out_size_max, (size_t)DAP_EVENTS_SOCKET_BUF_LIMIT);
             return 0;
         } else {
             size_t l_new_size = a_es->buf_out_size_max * 2;
diff --git a/dap-sdk/net/core/dap_proc_queue.c b/dap-sdk/net/core/dap_proc_queue.c
index 436aabc15..811fb77f1 100644
--- a/dap-sdk/net/core/dap_proc_queue.c
+++ b/dap-sdk/net/core/dap_proc_queue.c
@@ -49,11 +49,6 @@ dap_proc_queue_t * dap_proc_queue_create(dap_proc_thread_t * a_thread)
     if (!l_queue)
         return NULL;
 
-    for (int i = 0; i < DAP_QUE$K_PRIMAX; i++) {
-        assert ( !(pthread_mutex_init(&l_queue->list[i].lock, 0 )) );
-        assert ( !(pthread_cond_init(&l_queue->list[i].cond, 0 )) );
-    }
-
     l_queue->proc_thread = a_thread;
     l_queue->esocket = dap_events_socket_create_type_queue_ptr_unsafe(NULL,s_queue_esocket_callback);
     l_queue->esocket->proc_thread = a_thread;
@@ -88,62 +83,46 @@ int dap_proc_queue_delete(dap_proc_queue_t * a_queue)
  */
 static void s_queue_esocket_callback( dap_events_socket_t * a_es, void * a_msg)
 {
-    int l_rc, pri;
-    dap_proc_queue_t *l_queue;
-    dap_proc_queue_msg_t *l_msg;
-    dap_proc_queue_item_t *l_item;
-
-    assert ( a_es );
-    assert( (l_queue = (dap_proc_queue_t*) a_es->_inheritor) );
-
-    if ( !(l_msg = (dap_proc_queue_msg_t*) a_msg) ) {
-        log_it(L_CRITICAL, "%s: a_es: %p, a_msg is NULL", __PRETTY_FUNCTION__, a_es);
-        return;
-    }
+    dap_proc_queue_t * l_queue = (dap_proc_queue_t*) a_es->_inheritor;
+    dap_proc_queue_msg_t * l_msg = (dap_proc_queue_msg_t*) a_msg;
+    dap_proc_queue_item_t * l_item = DAP_NEW_Z(dap_proc_queue_item_t);
 
-    if ( !l_msg->callback ) {
-        if ( l_msg->signal_kill )                                           /* Say to kill this object and delete its inherior dap_proc_queue_t */
-            a_es->flags |= DAP_SOCK_SIGNAL_CLOSE;
+    assert(l_msg);
 
+    if ( !l_item )
+    {
+        log_it(L_CRITICAL,"Can't allocate memory for callback item, exiting");
         DAP_DELETE(l_msg);
         return;
     }
 
-    log_it(L_DEBUG, "l_queue: %p, l_msg: %p, callback: %p/%p, pri: %d", l_queue, l_msg, l_msg->callback, l_msg->callback_arg, l_msg->pri);
-
 
-    if ( !(l_item = DAP_NEW_Z(dap_proc_queue_item_t)) ) {
-        log_it(L_CRITICAL,"Can't allocate memory, drop l_msg:%p, callback: %p/%p, pri: %d", l_msg, l_msg->callback, l_msg->callback_arg, l_msg->pri);
+    //log_it(L_DEBUG, "l_msg:%p, callback: %p/%p, pri: %d", l_msg, l_msg->callback, l_msg->callback_arg, l_msg->pri);
 
-        if ( l_msg->signal_kill )                                           /* Say to kill this object and delete its inherior dap_proc_queue_t */
-            a_es->flags |= DAP_SOCK_SIGNAL_CLOSE;
 
-        DAP_DELETE(l_msg);
-        return;
-    }
-
-    /*
-     * So, all checks has been finished, now we can prepare new entry
-     */
-    pri = l_msg->pri;                                                       /* Validate priority */
-    pri = MIN(pri, DAP_QUE$K_PRIMAX - 1);
-    pri = MAX(pri, 0);
+    // We have callback to add in list according with the priority (!!!)
+    if (l_msg->callback)
+    {
+        l_item->callback = l_msg->callback;
+        l_item->callback_arg = l_msg->callback_arg;
 
-    l_item->callback = l_msg->callback;
-    l_item->callback_arg = l_msg->callback_arg;
+        if ( l_queue->items[l_msg->pri].item_last)
+            l_queue->items[l_msg->pri].item_last->prev = l_item;
 
-    assert ( !pthread_mutex_lock(&l_queue->list[pri].lock) );               /* Protect list from other threads */
-    l_rc = s_dap_insqtail (&l_queue->list[pri].items, l_item, 1);
-    assert ( !pthread_mutex_unlock(&l_queue->list[pri].lock) );
+        l_item->next = l_queue->items[l_msg->pri].item_last ;
+        l_queue->items[l_msg->pri].item_last = l_item;
 
-    if ( l_rc )
-        log_it(L_CRITICAL, "Enqueue failed: %d, drop l_msg:%p, callback: %p/%p, pri: %d", l_rc, l_msg, l_msg->callback, l_msg->callback_arg, l_msg->pri);
-    else
-        log_it(L_DEBUG, "Enqueued l_msg:%p, callback: %p/%p, pri: %d", l_msg, l_msg->callback, l_msg->callback_arg, l_msg->pri);
+        if( l_queue->items[l_msg->pri].item_first == NULL){
+            //log_it( L_DEBUG, "Added callback %p/%p in proc thread %u callback queue: first in list", l_msg->callback,l_msg->callback_arg, l_queue->proc_thread->cpu_id);
+            l_queue->items[l_msg->pri].item_first = l_item;
+        }//else
+        //    log_it( L_DEBUG, "Added callback %p/%p in proc thread %u callback queue: last in list", l_msg->callback,l_msg->callback_arg, l_queue->proc_thread->cpu_id);
 
-    dap_events_socket_event_signal(l_queue->proc_thread->proc_event, 1);    /* Add on top so after call this callback will be executed first */
+        // Add on top so after call this callback will be executed first
+        dap_events_socket_event_signal(l_queue->proc_thread->proc_event, 1);
+    }
 
-    if (l_msg->signal_kill)                                                 /* Say to kill this object and delete its inherior dap_proc_queue_t */
+    if (l_msg->signal_kill) // Say to kill this object and delete its inherior dap_proc_queue_t
         a_es->flags |= DAP_SOCK_SIGNAL_CLOSE;
 
     DAP_DELETE(l_msg);
diff --git a/dap-sdk/net/core/dap_proc_thread.c b/dap-sdk/net/core/dap_proc_thread.c
index fbecc880b..3694c2e75 100644
--- a/dap-sdk/net/core/dap_proc_thread.c
+++ b/dap-sdk/net/core/dap_proc_thread.c
@@ -162,12 +162,12 @@ unsigned l_id_min = 0, l_size_min = UINT32_MAX, l_queue_size;
 static void s_proc_event_callback(dap_events_socket_t * a_esocket, uint64_t __attribute__((unused))  a_value)
 {
 dap_proc_thread_t   *l_thread;
-dap_proc_queue_item_t *l_item;
-int     l_rc, l_is_anybody_for_repeat, l_is_finished, l_iter_cnt, l_cur_pri;
-size_t  l_size;
+dap_proc_queue_item_t *l_item, *l_item_old;
+int l_is_anybody_for_repeat, l_is_finished, l_iter_cnt, l_cur_pri;
 dap_proc_queue_t    *l_queue;
 
-    debug_if (s_debug_reactor, L_DEBUG, "--> Proc event callback start, a_esocket:%p ", a_esocket);
+    if (s_debug_reactor)
+        log_it(L_DEBUG, "--> Proc event callback start");
 
     if ( !(l_thread = (dap_proc_thread_t *) a_esocket->_inheritor) )
         {
@@ -176,50 +176,72 @@ dap_proc_queue_t    *l_queue;
         }
 
     l_is_anybody_for_repeat = 0;
-    /*@RRL:  l_iter_cnt = DAP_QUE$K_ITER_NR; */
+    l_iter_cnt = DAP_QUE$K_ITER_NR;
     l_cur_pri = (DAP_QUE$K_PRIMAX - 1);
     l_queue = l_thread->proc_queue;
 
-    for ( ; l_cur_pri; l_cur_pri--, l_iter_cnt-- )                          /* Run from higest to lowest ... */
+    for ( ; l_iter_cnt && l_cur_pri--; )                                    /* Run from higest to lowest ... */
     {
-        if ( !l_queue->list[l_cur_pri].items.nr )                           /* A lockless quick check */
+        if ( !(l_item = l_queue->items[l_cur_pri].item_first) )             /* Is there something to do at all ? */
             continue;
 
-        assert ( !pthread_mutex_lock(&l_queue->list[l_cur_pri].lock) );     /* Protect list from other threads */
-        l_rc = s_dap_remqhead (&l_queue->list[l_cur_pri].items, (void **) &l_item, &l_size);
-        assert ( !pthread_mutex_unlock(&l_queue->list[l_cur_pri].lock) );
+        for ( l_item_old = NULL ; l_item;  )
+        {
+            if(s_debug_reactor)
+                log_it(L_INFO, "Proc event callback: %p/%p, prio=%d, iteration=%d",
+                       l_item->callback, l_item->callback_arg, l_cur_pri, l_iter_cnt);
 
-        if  ( l_rc == -ENOENT ) {                                           /* Queue is empty ? */
-            debug_if (s_debug_reactor, L_DEBUG, "a_esocket:%p - nothing to do at prio: %d ", a_esocket, l_cur_pri);
-            continue;
-        }
+            l_is_finished = l_item->callback(l_thread, l_item->callback_arg);
 
-        debug_if (s_debug_reactor, L_INFO, "Proc event callback: %p/%p, prio=%d, iteration=%d",
-                       l_item->callback, l_item->callback_arg, l_cur_pri, l_iter_cnt);
+            if (l_is_finished) {
+                if ( l_item->prev )
+                    l_item->prev->next = l_item_old;
 
-        assert(l_item->callback);                                           /* Just for ensuring ... */
+                if(l_item_old){
+                    l_item_old->prev = l_item->prev;
 
-        l_is_finished = l_item->callback(l_thread, l_item->callback_arg);
-        debug_if (s_debug_reactor, L_INFO, "Proc event callback: %p/%p, prio=%d, iteration=%d - is %sfinished",
-                           l_item->callback, l_item->callback_arg, l_cur_pri, l_iter_cnt, l_is_finished ? "" : "not ");
+                    if ( ! l_item->prev )  // We deleted tail
+                        l_queue->items[l_cur_pri].item_last = l_item_old;
 
-        if ( !(l_is_finished) ) {
-                                                                            /* Rearm callback to be executed again */
-            assert ( !pthread_mutex_lock(&l_queue->list[l_cur_pri].lock) );
-            l_rc = s_dap_insqtail (&l_queue->list[l_cur_pri].items, l_item, 1);
-            assert ( !pthread_mutex_unlock(&l_queue->list[l_cur_pri].lock) );
-        }
-        else    {
-            DAP_DELETE(l_item);
-        }
+                    DAP_DELETE(l_item);
+                    l_item = l_item_old->prev;
+                } else  {
+                    l_queue->items[l_cur_pri].item_first = l_item->prev;
+
+                    if ( l_item->prev)
+                        l_item->prev->next = NULL; // Prev if it was - now its NULL
+                    else
+                        l_queue->items[l_cur_pri].item_last = NULL; // NULL last item
 
-        l_is_anybody_for_repeat += (!l_is_finished);
+                    DAP_DELETE(l_item);
+                    l_item = l_queue->items[l_cur_pri].item_first;
+                } /* if (l_is_finished) */
+
+                if ( s_debug_reactor ) {
+                    log_it(L_DEBUG, "Proc event finished");
+                }
+            } else {
+                if ( s_debug_reactor )
+                    log_it(L_DEBUG, "Proc event not finished");
+
+                l_item_old = l_item;
+                l_item = l_item->prev;
+            }
+
+            l_is_anybody_for_repeat += (!l_is_finished);
+
+            if ( (l_iter_cnt -= 1) == 1 )                                   /* Rest a last iteration to get a chance to  */
+                break;                                                      /* execute callback with the <l_cur_pri> - 1 */
+        }
     }
 
+    l_is_anybody_for_repeat += (!l_iter_cnt);                               /* All iterations is used - the set "say what again" */
+
     if ( l_is_anybody_for_repeat )                                          /* Arm event if we have something to proc again */
         dap_events_socket_event_signal(a_esocket, 1);
 
-    debug_if(s_debug_reactor, L_DEBUG, "<-- Proc event callback end, repeat flag is: %d, iterations: %d", l_is_anybody_for_repeat, l_iter_cnt);
+    if(s_debug_reactor)
+        log_it(L_DEBUG, "<-- Proc event callback end, repeat flag is: %d, iterations: %d", l_is_anybody_for_repeat, l_iter_cnt);
 }
 
 
diff --git a/dap-sdk/net/core/include/dap_proc_queue.h b/dap-sdk/net/core/include/dap_proc_queue.h
index be2b8fdd4..d6a1a8d46 100644
--- a/dap-sdk/net/core/include/dap_proc_queue.h
+++ b/dap-sdk/net/core/include/dap_proc_queue.h
@@ -23,7 +23,6 @@
 #pragma once
 
 #include "dap_events_socket.h"
-#include "dap_list.h"                                                       /* Simple List routines */
 
 typedef struct dap_proc_thread dap_proc_thread_t;
 
@@ -63,10 +62,9 @@ typedef struct dap_proc_queue{
         dap_events_socket_t *esocket;
 
         struct {
-        pthread_mutex_t     lock;                                           /* To coordinate access to the queuee's entries */
-        pthread_cond_t      cond;                                           /* For signaling  to waiters ... */
-        dap_slist_t         items;                                          /* An array of list according of priority numbers */
-        } list [DAP_QUE$K_PRIMAX];
+            dap_proc_queue_item_t   *item_first, *item_last;                 /* Queue's entries ... */
+        } items [DAP_QUE$K_PRIMAX];
+
 } dap_proc_queue_t;
 
 dap_proc_queue_t * dap_proc_queue_create(dap_proc_thread_t * a_thread);
-- 
GitLab