From 651e77071498f908e5f86f6e095296613f525202 Mon Sep 17 00:00:00 2001
From: "Dmitriy A. Gerasimov" <dmitriy.gerasimov@demlabs.net>
Date: Wed, 11 Nov 2020 19:40:49 +0700
Subject: [PATCH] [!] Working interthread queues

---
 dap-sdk/core/include/dap_common.h           |   4 +-
 dap-sdk/net/core/dap_events_socket.c        |  23 ++-
 dap-sdk/net/core/dap_proc_queue.c           |   8 +-
 dap-sdk/net/core/dap_proc_thread.c          | 153 ++++++++++++--------
 dap-sdk/net/core/dap_worker.c               |  44 +++---
 modules/channel/chain/dap_stream_ch_chain.c |   2 +-
 6 files changed, 139 insertions(+), 95 deletions(-)

diff --git a/dap-sdk/core/include/dap_common.h b/dap-sdk/core/include/dap_common.h
index 5ed8b225ad..3f1c18578b 100755
--- a/dap-sdk/core/include/dap_common.h
+++ b/dap-sdk/core/include/dap_common.h
@@ -25,7 +25,8 @@
 //#define _XOPEN_SOURCE 700
 
 #pragma once
-
+#define __STDC_WANT_LIB_EXT1__ 1
+#include <string.h>
 #include <stdarg.h>
 #include <stddef.h>
 #include <stdbool.h>
@@ -41,7 +42,6 @@
 #include <dispatch/dispatch.h>
 #endif
 #include "portable_endian.h"
-
 typedef uint8_t byte_t;
 
 // Stuffs an integer into a pointer type
diff --git a/dap-sdk/net/core/dap_events_socket.c b/dap-sdk/net/core/dap_events_socket.c
index ea9519f113..0ebf4522f7 100644
--- a/dap-sdk/net/core/dap_events_socket.c
+++ b/dap-sdk/net/core/dap_events_socket.c
@@ -151,8 +151,16 @@ void dap_events_socket_assign_on_worker_mt(dap_events_socket_t * a_es, struct da
 
 void dap_events_socket_assign_on_worker_inter(dap_events_socket_t * a_es_input, dap_events_socket_t * a_es)
 {
+    if (!a_es)
+        log_it(L_ERROR, "Can't send NULL esocket in interthreads pipe input");
+    if (!a_es_input)
+        log_it(L_ERROR, "Interthreads pipe input is NULL");
+    if (! a_es || ! a_es_input)
+        return;
+
     a_es->last_ping_request = time(NULL);
-   // log_it(L_DEBUG, "Assigned %p on worker %u", a_es, a_worker->id);
+    //log_it(L_DEBUG, "Interthread assign esocket %p(fd %d) on input esocket %p (fd %d)", a_es, a_es->fd,
+    //       a_es_input, a_es_input->fd);
     dap_worker_add_events_socket_inter(a_es_input,a_es);
 
 }
@@ -355,9 +363,8 @@ dap_events_socket_t * s_create_type_queue_ptr(dap_worker_t * a_w, dap_events_soc
     memset(l_file_buf, 0, l_file_buf_size);
     fread(l_file_buf, l_file_buf_size, 1, l_sys_max_pipe_size_fd);
     uint64_t l_sys_max_pipe_size = strtoull(l_file_buf, 0, 10);
-    if (l_sys_max_pipe_size && fcntl(l_pipe[0], F_SETPIPE_SZ, l_sys_max_pipe_size) == l_sys_max_pipe_size) {
-        log_it(L_DEBUG, "Successfully resized pipe buffer to %lld", l_sys_max_pipe_size);
-    }
+    fcntl(l_pipe[0], F_SETPIPE_SZ, l_sys_max_pipe_size);
+
 #elif defined (DAP_EVENTS_CAPS_QUEUE_POSIX)
     char l_mq_name[64];
     struct mq_attr l_mq_attr ={0};
@@ -427,7 +434,9 @@ int dap_events_socket_queue_proc_input_unsafe(dap_events_socket_t * a_esocket)
         if (a_esocket->flags & DAP_SOCK_QUEUE_PTR){
             void * l_queue_ptr = NULL;
 #if defined(DAP_EVENTS_CAPS_QUEUE_PIPE2)
-            if(read( a_esocket->fd, &l_queue_ptr,sizeof (void *)) == sizeof (void *))
+            ssize_t l_read_ret = read( a_esocket->fd, &l_queue_ptr,sizeof (void *));
+            int l_read_errno = errno;
+            if( l_read_ret == (ssize_t) sizeof (void *))
                 a_esocket->callbacks.queue_ptr_callback(a_esocket, l_queue_ptr);
             else if ( (errno != EAGAIN) && (errno != EWOULDBLOCK) )  // we use blocked socket for now but who knows...
                 log_it(L_WARNING, "Can't read packet from pipe");
@@ -677,7 +686,9 @@ static void add_ptr_to_buf(dap_events_socket_t * a_es, void* a_arg)
  */
 int dap_events_socket_queue_ptr_send_to_input(dap_events_socket_t * a_es_input, void * a_arg)
 {
-   return dap_events_socket_write_unsafe(a_es_input,&a_arg,sizeof (a_arg) )==sizeof (a_arg) ;
+    volatile void * l_arg = a_arg;
+    int ret= dap_events_socket_write_unsafe(a_es_input,&l_arg,sizeof (l_arg) )==sizeof (l_arg)?0:1 ;
+    return ret;
 }
 
 /**
diff --git a/dap-sdk/net/core/dap_proc_queue.c b/dap-sdk/net/core/dap_proc_queue.c
index e9ae5baa8b..8a5a288f70 100644
--- a/dap-sdk/net/core/dap_proc_queue.c
+++ b/dap-sdk/net/core/dap_proc_queue.c
@@ -20,6 +20,7 @@
     You should have received a copy of the GNU General Public License
     along with any DAP SDK based project.  If not, see <http://www.gnu.org/licenses/>.
 */
+#include <assert.h>
 #include "dap_worker.h"
 #include "dap_proc_queue.h"
 #include "dap_proc_thread.h"
@@ -66,10 +67,9 @@ void dap_proc_queue_delete(dap_proc_queue_t * a_queue)
  */
 static void s_queue_esocket_callback( dap_events_socket_t * a_es, void * a_msg)
 {
-    //log_it(L_DEBUG, "New callback in list accepted");
     dap_proc_queue_t * l_queue = (dap_proc_queue_t*) a_es->_inheritor;
     dap_proc_queue_msg_t * l_msg = (dap_proc_queue_msg_t*) a_msg;
-
+    assert(l_msg);
     // We have callback to add in list
     if (l_msg->callback){
         dap_proc_queue_item_t * l_item = DAP_NEW_Z(dap_proc_queue_item_t);
@@ -78,8 +78,8 @@ static void s_queue_esocket_callback( dap_events_socket_t * a_es, void * a_msg)
         l_item->next=l_queue->items;
         l_queue->items = l_item;
         // Add on top so after call this callback will be executed first
-        dap_events_socket_queue_ptr_send(l_queue->proc_thread->proc_event,NULL);
-        //log_it( L_DEBUG, "Sent signal to proc thread that we have callbacks on board");
+        dap_events_socket_event_signal(l_queue->proc_thread->proc_event,1);
+        //log_it( L_DEBUG, "Sent signal to proc thread that we have callback %p/%p on board", l_msg->callback,l_msg->callback_arg);
     }
     if (l_msg->signal_kill){ // Say to kill this object and delete its inherior dap_proc_queue_t
         a_es->flags |= DAP_SOCK_SIGNAL_CLOSE;
diff --git a/dap-sdk/net/core/dap_proc_thread.c b/dap-sdk/net/core/dap_proc_thread.c
index 293ffae4c9..0f3b5bfe0d 100644
--- a/dap-sdk/net/core/dap_proc_thread.c
+++ b/dap-sdk/net/core/dap_proc_thread.c
@@ -117,7 +117,7 @@ dap_proc_thread_t * dap_proc_thread_get_auto()
  * @param a_esocket
  * @param a_value
  */
-static void s_proc_event_callback(dap_events_socket_t * a_esocket, void * a_value)
+static void s_proc_event_callback(dap_events_socket_t * a_esocket, uint64_t a_value)
 {
     (void) a_value;
     //log_it(L_DEBUG, "Proc event callback");
@@ -148,6 +148,35 @@ static void s_proc_event_callback(dap_events_socket_t * a_esocket, void * a_valu
         dap_events_socket_event_signal(a_esocket,1);
 }
 
+/**
+ * @brief s_update_poll_flags
+ * @param a_thread
+ * @param a_esocket
+ * @return
+ */
+static int s_update_poll_flags(dap_proc_thread_t * a_thread, dap_events_socket_t * a_esocket)
+{
+#ifdef DAP_EVENTS_CAPS_EPOLL
+    l_ev.events = a_esocket->ev_base_events;
+    if( a_esocket->flags & DAP_SOCK_READY_TO_READ)
+        l_ev.events |= EPOLLIN;
+    if( a_esocket->flags & DAP_SOCK_READY_TO_WRITE)
+        l_ev.events |= EPOLLOUT;
+    l_ev.data.ptr = a_esocket ;
+    if( epoll_ctl(a_thread->epoll_ctl, EPOLL_CTL_MOD, a_esocket->fd , &l_ev) != 0 ){
+        log_it(L_CRITICAL, "Can't add proc queue on epoll ctl");
+        return -1;
+    }
+#elif defined (DAP_EVENTS_CAPS_POLL)
+    a_thread->poll[a_esocket->poll_index].events= a_esocket->poll_base_flags;
+    if( a_esocket->flags & DAP_SOCK_READY_TO_READ)
+        a_thread->poll[a_esocket->poll_index].revents |= POLLIN;
+    if( a_esocket->flags & DAP_SOCK_READY_TO_WRITE)
+        a_thread->poll[a_esocket->poll_index].revents |= POLLOUT;
+#endif
+    return 0;
+}
+
 static void * s_proc_thread_function(void * a_arg)
 {
 
@@ -161,14 +190,16 @@ static void * s_proc_thread_function(void * a_arg)
 
     // Init proc_queue for related worker
     dap_worker_t * l_worker_related = dap_events_worker_get(l_thread->cpu_id);
+    assert(l_worker_related);
     l_worker_related->proc_queue = l_thread->proc_queue;
     l_worker_related->proc_queue_input = dap_events_socket_queue_ptr_create_input(l_worker_related->proc_queue->esocket);
 
+    dap_events_socket_assign_on_worker_mt(l_worker_related->proc_queue_input,l_worker_related);
 
-    l_thread->proc_event = dap_events_socket_create_type_queue_ptr_unsafe(NULL, s_proc_event_callback);
+    l_thread->proc_event = dap_events_socket_create_type_event_unsafe(NULL, s_proc_event_callback);
     l_thread->proc_event->_inheritor = l_thread; // we pass thread through it
     size_t l_workers_count= dap_events_worker_get_count();
-
+    assert(l_workers_count);
     l_thread->queue_assign_input = DAP_NEW_Z_SIZE(dap_events_socket_t*, sizeof (dap_events_socket_t*)*l_workers_count  );
     l_thread->queue_io_input = DAP_NEW_Z_SIZE(dap_events_socket_t*, sizeof (dap_events_socket_t*)*l_workers_count  );
 
@@ -180,7 +211,7 @@ static void * s_proc_thread_function(void * a_arg)
         l_thread->queue_io_input[n] = dap_events_socket_queue_ptr_create_input(l_worker->queue_es_io );
     }
 #ifdef DAP_EVENTS_CAPS_EPOLL
-    struct epoll_event l_epoll_events = l_thread->epoll_events, l_ev;
+    struct epoll_event *l_epoll_events = l_thread->epoll_events, l_ev;
     memset(l_thread->epoll_events, 0,sizeof (l_thread->epoll_events));
 
     // Create epoll ctl
@@ -237,17 +268,21 @@ static void * s_proc_thread_function(void * a_arg)
     l_thread->poll_count++;
 
     for (size_t n = 0; n< dap_events_worker_get_count(); n++){
-        l_thread->queue_assign_input[n]->poll_index = l_thread->poll_count;
-        l_thread->poll[l_thread->poll_count].fd = l_thread->queue_assign_input[n]->fd;
-        l_thread->poll[l_thread->poll_count].events = l_thread->queue_assign_input[n]->poll_base_flags;
-        l_thread->esockets[l_thread->poll_count] = l_thread->queue_assign_input[n];
-        l_thread->poll_count++;
-
-        l_thread->queue_io_input[n]->poll_index = l_thread->poll_count;
-        l_thread->poll[l_thread->poll_count].fd = l_thread->queue_io_input[n]->fd;
-        l_thread->poll[l_thread->poll_count].events = l_thread->queue_io_input[n]->poll_base_flags;
-        l_thread->esockets[l_thread->poll_count] = l_thread->queue_io_input[n];
-        l_thread->poll_count++;
+        dap_events_socket_t * l_queue_assign_input =  l_thread->queue_assign_input[n];
+        dap_events_socket_t * l_queue_io_input =  l_thread->queue_io_input[n];
+        if (l_queue_assign_input&&l_queue_io_input){
+            l_queue_assign_input->poll_index = l_thread->poll_count;
+            l_thread->poll[l_thread->poll_count].fd = l_queue_assign_input->fd;
+            l_thread->poll[l_thread->poll_count].events = l_queue_assign_input->poll_base_flags;
+            l_thread->esockets[l_thread->poll_count] = l_queue_assign_input;
+            l_thread->poll_count++;
+
+            l_queue_io_input->poll_index = l_thread->poll_count;
+            l_thread->poll[l_thread->poll_count].fd = l_queue_io_input->fd;
+            l_thread->poll[l_thread->poll_count].events = l_queue_io_input->poll_base_flags;
+            l_thread->esockets[l_thread->poll_count] = l_queue_io_input;
+            l_thread->poll_count++;
+        }
     }
 
 #else
@@ -271,7 +306,7 @@ static void * s_proc_thread_function(void * a_arg)
 #else
 #error "Unimplemented poll wait analog for this platform"
 #endif
-        //log_it(L_DEBUG,"Proc thread waked up");
+
         if(l_selected_sockets == -1) {
             if( errno == EINTR)
                 continue;
@@ -318,6 +353,8 @@ static void * s_proc_thread_function(void * a_arg)
                 log_it(L_ERROR, "dap_events_socket NULL");
                 continue;
             }
+            //log_it(L_DEBUG,"Waked up esocket %p (socket %d) {read:%s,write:%s,error:%s} ", l_cur, l_cur->fd,
+            //           l_flag_read?"true":"false", l_flag_write?"true":"false", l_flag_error?"true":"false" );
             time_t l_cur_time = time( NULL);
             l_cur->last_time_active = l_cur_time;
             if (l_flag_error){
@@ -348,40 +385,39 @@ static void * s_proc_thread_function(void * a_arg)
             }
             if (l_flag_write ){
                 ssize_t l_bytes_sent = -1;
-                switch (l_cur->type) {
-                    case DESCRIPTOR_TYPE_QUEUE:
-                        if (l_cur->flags & DAP_SOCK_QUEUE_PTR){
-#if defined(DAP_EVENTS_CAPS_QUEUE_PIPE2)
-                            l_bytes_sent = write(l_cur->socket, l_cur->buf_out, sizeof (void *) ); // We send pointer by pointer
-#elif defined (DAP_EVENTS_CAPS_QUEUE_POSIX)
-                            l_bytes_sent = mq_send(a_es->mqd, (const char *) l_cur->buf_out, sizeof (void *),0);
-#else
-#error "Not implemented dap_events_socket_queue_ptr_send() for this platform"
-#endif
-                            int l_errno = errno;
-                            break;
-                        }break;
-                    default:
-                        log_it(L_ERROR, "Dont process write flags for this socket %d in proc thread", l_cur->fd);
+                if (l_cur->buf_out_size){
+                    switch (l_cur->type) {
+                        case DESCRIPTOR_TYPE_QUEUE:
+                            if (l_cur->flags & DAP_SOCK_QUEUE_PTR){
+                                #if defined(DAP_EVENTS_CAPS_QUEUE_PIPE2)
+                                    l_bytes_sent = write(l_cur->socket, l_cur->buf_out, sizeof (void *) ); // We send pointer by pointer
+                                #elif defined (DAP_EVENTS_CAPS_QUEUE_POSIX)
+                                    l_bytes_sent = mq_send(a_es->mqd, (const char *) l_cur->buf_out, sizeof (void *),0);
+                                #else
+                                    #error "Not implemented dap_events_socket_queue_ptr_send() for this platform"
+                                #endif
+                                int l_errno = errno;
+
+                                break;
+                            }break;
+                        default:
+                            log_it(L_ERROR, "Dont process write flags for this socket %d in proc thread", l_cur->fd);
 
+                    }
+                }else{
+                    log_it(L_DEBUG,"(!) Write event receieved but nothing in buffer, switching off this flag");
+                    l_cur->flags ^= DAP_SOCK_READY_TO_WRITE;
+                    s_update_poll_flags(l_thread, l_cur);
                 }
                 if(l_bytes_sent>0){
                     l_cur->buf_out_size -= l_bytes_sent;
+                    //log_it(L_DEBUG,"Sent %zd bytes out, left %zd in buf out", l_bytes_sent, l_cur->buf_out);
                     if (l_cur->buf_out_size ){ // Shrink output buffer
+
                         memmove(l_cur->buf_out, l_cur->buf_out+l_bytes_sent, l_cur->buf_out_size );
                     }else{
-                        #ifdef DAP_EVENTS_CAPS_EPOLL
-                        l_ev.events = l_epoll_events[n].events ^ EPOLLOUT  ;
-                        l_ev.data.ptr = l_cur;
-                        if( epoll_ctl(l_thread->epoll_ctl, EPOLL_CTL_MOD, l_cur->fd , &l_ev) != 0 ){
-                            log_it(L_CRITICAL, "Can't update queue_ptr on epoll ctl on proc thread");
-                            return NULL;
-                        }
-                        #elif defined ( DAP_EVENTS_CAPS_POLL)
-                        l_thread->poll[n].events ^= POLLOUT;
-                        #else
-                        #error "Not implemented poll/epoll here"
-                        #endif
+                        l_cur->flags ^= DAP_SOCK_READY_TO_WRITE;
+                        s_update_poll_flags(l_thread, l_cur);
                     }
                 }
             }
@@ -417,10 +453,12 @@ static void * s_proc_thread_function(void * a_arg)
            l_poll_compress = false;
            for (size_t i = 0; i < l_thread->poll_count ; i++)  {
                if ( l_thread->poll[i].fd == -1){
-                   for(size_t j = i; j < l_thread->poll_count-1; j++){
-                       l_thread->poll[j].fd = l_thread->poll[j+1].fd;
-                       l_thread->esockets[j] = l_thread->esockets[j+1];
-                       l_thread->esockets[j]->poll_index = j;
+                   if ( l_thread->poll_count){
+                       for(size_t j = i; j < l_thread->poll_count-1; j++){
+                           l_thread->poll[j].fd = l_thread->poll[j+1].fd;
+                           l_thread->esockets[j] = l_thread->esockets[j+1];
+                           l_thread->esockets[j]->poll_index = j;
+                       }
                    }
                    i--;
                    l_thread->poll_count--;
@@ -449,22 +487,11 @@ static void * s_proc_thread_function(void * a_arg)
  */
 bool dap_proc_thread_assign_on_worker_inter(dap_proc_thread_t * a_thread, dap_worker_t * a_worker, dap_events_socket_t *a_esocket  )
 {
-    dap_events_socket_assign_on_worker_inter(a_thread->queue_assign_input[a_worker->id], a_esocket);
-
-#ifdef DAP_EVENTS_CAPS_EPOLL
-    struct epoll_event l_ev;
-    l_ev.events = a_esocket->ev_base_flags | EPOLLOUT  ;
-    l_ev.data.ptr = a_esocket;
-    if( epoll_ctl(l_thread->epoll_ctl, EPOLL_CTL_MOD, e_socket->fd , &l_ev) != 0 ){
-        log_it(L_ERROR, "Can't update queue_ptr on epoll ctl on proc thread");
-        return false;
-    }
-#elif defined ( DAP_EVENTS_CAPS_POLL)
-    a_thread->poll[a_esocket->poll_index].events |= POLLOUT;
-#else
-#error "Not implemented poll/epoll here"
-#endif
-
+    dap_events_socket_t * l_es_assign_input = a_thread->queue_assign_input[a_worker->id];
+    //log_it(L_DEBUG,"Remove esocket %p from proc thread and send it to worker #%u",a_esocket, a_worker->id);
+    dap_events_socket_assign_on_worker_inter(l_es_assign_input, a_esocket);
+    l_es_assign_input->flags |= DAP_SOCK_READY_TO_WRITE;
+    s_update_poll_flags(a_thread, l_es_assign_input);
     return true;
 }
 
diff --git a/dap-sdk/net/core/dap_worker.c b/dap-sdk/net/core/dap_worker.c
index 414c63079d..901e5960d1 100644
--- a/dap-sdk/net/core/dap_worker.c
+++ b/dap-sdk/net/core/dap_worker.c
@@ -168,7 +168,8 @@ void *dap_worker_thread(void *arg)
                 log_it(L_ERROR, "dap_events_socket NULL");
                 continue;
             }
-            //            log_it(L_DEBUG, "Worker=%d fd=%d", l_worker->id, l_cur->socket);
+            //log_it(L_DEBUG, "Worker #%u esocket %p fd=%d (read=%s, write=%s, error=%s)", l_worker->id, l_cur, l_cur->socket,
+            //                    l_flag_read?"true":"false", l_flag_write?"true":"false", l_flag_error?"true":"false" );
 
             int l_sock_err = 0, l_sock_err_size = sizeof(l_sock_err);
             //connection already closed (EPOLLHUP - shutdown has been made in both directions)
@@ -398,11 +399,10 @@ void *dap_worker_thread(void *arg)
                             l_errno = errno;
                         break;
                         case DESCRIPTOR_TYPE_QUEUE:
-                            if (l_cur->flags & DAP_SOCK_QUEUE_PTR){
+                             if (l_cur->flags & DAP_SOCK_QUEUE_PTR && l_cur->buf_out_size>= sizeof (void*)){
 #if defined(DAP_EVENTS_CAPS_QUEUE_PIPE2)
 
-                                l_bytes_sent = write(l_cur->socket, (char *) (l_cur->buf_out + l_bytes_sent),
-                                                 sizeof (void *) ); // We send pointer by pointer
+                                l_bytes_sent = write(l_cur->socket, l_cur->buf_out, sizeof (void *) ); // We send pointer by pointer
 #elif defined (DAP_EVENTS_CAPS_QUEUE_POSIX)
                                 l_bytes_sent = mq_send(a_es->mqd, (const char *)&a_arg,sizeof (a_arg),0);
 #else
@@ -413,8 +413,7 @@ void *dap_worker_thread(void *arg)
                             }
                         case DESCRIPTOR_TYPE_PIPE:
                         case DESCRIPTOR_TYPE_FILE:
-                            l_bytes_sent = write(l_cur->socket, (char *) (l_cur->buf_out + l_bytes_sent),
-                                    l_cur->buf_out_size );
+                            l_bytes_sent = write(l_cur->socket, (char *) (l_cur->buf_out), l_cur->buf_out_size );
                             l_errno = errno;
                         break;
                         default:
@@ -476,10 +475,12 @@ void *dap_worker_thread(void *arg)
            l_worker->poll_compress = false;
            for (size_t i = 0; i < l_worker->poll_count ; i++)  {
                if ( l_worker->poll[i].fd == -1){
-                   for(size_t j = i; j < l_worker->poll_count-1; j++){
-                       l_worker->poll[j].fd = l_worker->poll[j+1].fd;
-                       l_worker->poll_esocket[j] = l_worker->poll_esocket[j+1];
-                       l_worker->poll_esocket[j]->poll_index = j;
+                   if( l_worker->poll_count){
+                       for(size_t j = i; j < l_worker->poll_count-1; j++){
+                           l_worker->poll[j].fd = l_worker->poll[j+1].fd;
+                           l_worker->poll_esocket[j] = l_worker->poll_esocket[j+1];
+                           l_worker->poll_esocket[j]->poll_index = j;
+                       }
                    }
                    i--;
                    l_worker->poll_count--;
@@ -499,10 +500,15 @@ void *dap_worker_thread(void *arg)
  */
 static void s_queue_add_es_callback( dap_events_socket_t * a_es, void * a_arg)
 {
+    dap_worker_t * l_worker = a_es->worker;
     dap_events_socket_t * l_es_new =(dap_events_socket_t *) a_arg;
-    dap_worker_t * w = a_es->worker;
+    if (!l_es_new){
+        log_it(L_ERROR,"NULL esocket accepted to add on worker #%u", l_worker->id);
+        return;
+    }
+
     //log_it(L_DEBUG, "Received event socket %p to add on worker", l_es_new);
-    if(dap_events_socket_check_unsafe( w, l_es_new)){
+    if(dap_events_socket_check_unsafe( l_worker, l_es_new)){
         // Socket already present in worker, it's OK
         return;
     }
@@ -512,13 +518,13 @@ static void s_queue_add_es_callback( dap_events_socket_t * a_es, void * a_arg)
         case DESCRIPTOR_TYPE_SOCKET_UDP:
         case DESCRIPTOR_TYPE_SOCKET:
         case DESCRIPTOR_TYPE_SOCKET_LISTENING:{
-            int l_cpu = w->id;
+            int l_cpu = l_worker->id;
             setsockopt(l_es_new->socket , SOL_SOCKET, SO_INCOMING_CPU, &l_cpu, sizeof(l_cpu));
         }break;
         default: {}
     }
 
-    l_es_new->worker = w;
+    l_es_new->worker = l_worker;
     l_es_new->last_time_active = time(NULL);
     // We need to differ new and reassigned esockets. If its new - is_initialized is false
     if ( ! l_es_new->is_initalized ){
@@ -528,18 +534,18 @@ static void s_queue_add_es_callback( dap_events_socket_t * a_es, void * a_arg)
     }
 
     if (l_es_new->socket>0){
-        int l_ret = dap_worker_add_events_socket_unsafe(l_es_new,w);
+        int l_ret = dap_worker_add_events_socket_unsafe(l_es_new,l_worker);
         if (  l_ret != 0 ){
             log_it(L_CRITICAL,"Can't add event socket's handler to worker i/o poll mechanism with error %d", errno);
         }else{
             // Add in global list
             // Add in worker
             l_es_new->me = l_es_new;
-            HASH_ADD(hh_worker, w->esockets, me, sizeof(void *), l_es_new );
-            w->event_sockets_count++;
+            HASH_ADD(hh_worker, l_worker->esockets, me, sizeof(void *), l_es_new );
+            l_worker->event_sockets_count++;
             //log_it(L_DEBUG, "Added socket %d on worker %u", l_es_new->socket, w->id);
             if (l_es_new->callbacks.worker_assign_callback)
-                l_es_new->callbacks.worker_assign_callback(l_es_new, w);
+                l_es_new->callbacks.worker_assign_callback(l_es_new, l_worker);
 
         }
     }else{
@@ -704,7 +710,7 @@ void dap_worker_add_events_socket(dap_events_socket_t * a_events_socket, dap_wor
  */
 void dap_worker_add_events_socket_inter(dap_events_socket_t * a_es_input, dap_events_socket_t * a_events_socket)
 {
-    if( dap_events_socket_queue_ptr_send_to_input( a_es_input, a_events_socket ) != sizeof (a_events_socket) ){
+    if( dap_events_socket_queue_ptr_send_to_input( a_es_input, a_events_socket ) != 0 ){
         int l_errno = errno;
         char l_errbuf[128];
         *l_errbuf = 0;
diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c
index 01bab756e8..ffaf123cc1 100644
--- a/modules/channel/chain/dap_stream_ch_chain.c
+++ b/modules/channel/chain/dap_stream_ch_chain.c
@@ -619,7 +619,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
     case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS_RVRS: {
         dap_stream_ch_chain_sync_request_t l_sync_chains = {};
         memcpy(&l_sync_chains, l_chain_pkt->data, l_chain_pkt_data_size);
-        dap_chain_t *l_chain = l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id);
+        dap_chain_t *l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id);
         dap_chain_hash_fast_t *l_hash = dap_db_get_last_hash_remote(l_sync_chains.node_addr.uint64, l_chain);
         if (l_hash) {
             memcpy(&l_sync_chains.hash_from, l_hash, sizeof(*l_hash));
-- 
GitLab