From 65180b71e94e465415ac1683e3d620f6616ae2ac Mon Sep 17 00:00:00 2001
From: "Dmitriy A. Gerasimov" <dmitriy.gerasimov@demlabs.net>
Date: Sat, 29 Aug 2020 23:25:01 +0700
Subject: [PATCH] [-] Removed some debug output [+] Added some checks

---
 dap-sdk/net/client/dap_client_pvt.c           |  2 +
 dap-sdk/net/core/dap_events_socket.c          | 23 ++++----
 dap-sdk/net/core/dap_proc_queue.c             |  4 +-
 dap-sdk/net/core/dap_proc_thread.c            |  6 +-
 dap-sdk/net/core/dap_worker.c                 | 18 ++++--
 .../net/server/http_server/dap_http_simple.c  |  3 +-
 dap-sdk/net/stream/ch/dap_stream_ch.c         |  2 +-
 dap-sdk/net/stream/stream/dap_stream.c        |  3 +-
 .../net/stream/stream/include/dap_stream.h    |  2 +-
 modules/channel/chain/dap_stream_ch_chain.c   | 55 ++++++++++---------
 10 files changed, 67 insertions(+), 51 deletions(-)

diff --git a/dap-sdk/net/client/dap_client_pvt.c b/dap-sdk/net/client/dap_client_pvt.c
index 1404f7e32f..8486ad41fd 100644
--- a/dap-sdk/net/client/dap_client_pvt.c
+++ b/dap-sdk/net/client/dap_client_pvt.c
@@ -490,6 +490,8 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt)
 
             // new added, whether it is necessary?
             a_client_pvt->stream->session->key = a_client_pvt->stream_key;
+            a_client_pvt->stream_worker = (dap_stream_worker_t*) l_worker->_inheritor;
+
 
             // connect
             struct sockaddr_in l_remote_addr;
diff --git a/dap-sdk/net/core/dap_events_socket.c b/dap-sdk/net/core/dap_events_socket.c
index 3f48f92454..d9f71bf55b 100644
--- a/dap-sdk/net/core/dap_events_socket.c
+++ b/dap-sdk/net/core/dap_events_socket.c
@@ -94,7 +94,7 @@ dap_events_socket_t *dap_events_socket_wrap_no_add( dap_events_t *a_events,
     ret->ev_base_flags = EPOLLERR | EPOLLRDHUP | EPOLLHUP;
 #endif
 
-  log_it( L_DEBUG,"Dap event socket wrapped around %d sock a_events = %X", a_sock, a_events );
+ // log_it( L_DEBUG,"Dap event socket wrapped around %d sock a_events = %X", a_sock, a_events );
 
   return ret;
 }
@@ -157,8 +157,8 @@ dap_events_socket_t * s_create_type_pipe(dap_worker_t * a_w, dap_events_socket_c
         log_it( L_ERROR, "Error detected, can't create pipe(): '%s' (%d)", l_errbuf, l_errno);
         DAP_DELETE(l_es);
         return NULL;
-    }else
-        log_it(L_DEBUG, "Created one-way unnamed bytestream pipe %d->%d", l_pipe[0], l_pipe[1]);
+    }//else
+     //   log_it(L_DEBUG, "Created one-way unnamed bytestream pipe %d->%d", l_pipe[0], l_pipe[1]);
     l_es->fd = l_pipe[0];
     l_es->fd2 = l_pipe[1];
     return l_es;
@@ -223,8 +223,8 @@ dap_events_socket_t * s_create_type_queue_ptr(dap_worker_t * a_w, dap_events_soc
         }
         DAP_DELETE(l_es);
         return NULL;
-    }else
-        log_it(L_DEBUG, "Created one-way unnamed packet pipe %d->%d", l_pipe[0], l_pipe[1]);
+    }//else
+     //   log_it(L_DEBUG, "Created one-way unnamed packet pipe %d->%d", l_pipe[0], l_pipe[1]);
     l_es->fd = l_pipe[0];
     l_es->fd2 = l_pipe[1];
 #endif
@@ -450,7 +450,7 @@ dap_events_socket_t * dap_events_socket_wrap2( dap_server_t *a_server, struct da
   assert( a_callbacks );
   assert( a_server );
 
-  log_it( L_DEBUG,"Dap event socket wrapped around %d sock", a_sock );
+  //log_it( L_DEBUG,"Dap event socket wrapped around %d sock", a_sock );
   dap_events_socket_t * ret = DAP_NEW_Z( dap_events_socket_t );
 
   ret->socket = a_sock;
@@ -569,7 +569,7 @@ void dap_events_socket_remove_and_delete_unsafe( dap_events_socket_t *a_es, bool
     if ( !a_es )
         return;
 
-    log_it( L_DEBUG, "es is going to be removed from the lists and free the memory (0x%016X)", a_es );
+    //log_it( L_DEBUG, "es is going to be removed from the lists and free the memory (0x%016X)", a_es );
     dap_events_socket_remove_from_worker_unsafe(a_es, a_es->worker);
 
     if (a_es->events){ // It could be socket NOT from events
@@ -584,7 +584,7 @@ void dap_events_socket_remove_and_delete_unsafe( dap_events_socket_t *a_es, bool
             HASH_DEL( a_es->events->sockets, a_es );
         pthread_rwlock_unlock( &a_es->events->sockets_rwlock );
     }
-    log_it( L_DEBUG, "dap_events_socket wrapped around %d socket is removed", a_es->socket );
+    //log_it( L_DEBUG, "dap_events_socket wrapped around %d socket is removed", a_es->socket );
 
     if( a_es->callbacks.delete_callback )
         a_es->callbacks.delete_callback( a_es, NULL ); // Init internal structure
@@ -624,9 +624,8 @@ void dap_events_socket_remove_from_worker_unsafe( dap_events_socket_t *a_es, dap
         strerror_r(l_errno, l_errbuf, sizeof (l_errbuf));
         log_it( L_ERROR,"Can't remove event socket's handler from the epoll_fd %d  \"%s\" (%d)",
                 a_worker->epoll_fd, l_errbuf, l_errno);
-    }
-    else
-        log_it( L_DEBUG,"Removed epoll's event from dap_worker #%u", a_worker->id );
+    } //else
+      //  log_it( L_DEBUG,"Removed epoll's event from dap_worker #%u", a_worker->id );
     a_worker->event_sockets_count--;
     if(a_worker->esockets)
         HASH_DELETE(hh_worker,a_worker->esockets, a_es);
@@ -794,7 +793,7 @@ size_t dap_events_socket_write_unsafe(dap_events_socket_t *sc, const void * data
  */
 size_t dap_events_socket_write_f_unsafe(dap_events_socket_t *sc, const char * format,...)
 {
-    log_it(L_DEBUG,"dap_events_socket_write_f %u sock", sc->socket );
+    //log_it(L_DEBUG,"dap_events_socket_write_f %u sock", sc->socket );
 
     size_t max_data_size = sizeof(sc->buf_out)-sc->buf_out_size;
     va_list ap;
diff --git a/dap-sdk/net/core/dap_proc_queue.c b/dap-sdk/net/core/dap_proc_queue.c
index 632d40ef81..d41f6604ca 100644
--- a/dap-sdk/net/core/dap_proc_queue.c
+++ b/dap-sdk/net/core/dap_proc_queue.c
@@ -65,7 +65,7 @@ void dap_proc_queue_delete(dap_proc_queue_t * a_queue)
  */
 static void s_queue_esocket_callback( dap_events_socket_t * a_es, void * a_msg)
 {
-    log_it(L_DEBUG, "New callback in list accepted");
+    //log_it(L_DEBUG, "New callback in list accepted");
     dap_proc_queue_t * l_queue = (dap_proc_queue_t*) a_es->_inheritor;
     dap_proc_queue_msg_t * l_msg = (dap_proc_queue_msg_t*) a_msg;
 
@@ -78,7 +78,7 @@ static void s_queue_esocket_callback( dap_events_socket_t * a_es, void * a_msg)
         l_queue->items = l_item;
         // Add on top so after call this callback will be executed first
         dap_events_socket_queue_ptr_send(l_queue->proc_thread->proc_event,NULL);
-        log_it( L_DEBUG, "Sent signal to proc thread that we have callbacks on board");
+        //log_it( L_DEBUG, "Sent signal to proc thread that we have callbacks on board");
     }
     if (l_msg->signal_kill){ // Say to kill this object and delete its inherior dap_proc_queue_t
         a_es->kill_signal = true;
diff --git a/dap-sdk/net/core/dap_proc_thread.c b/dap-sdk/net/core/dap_proc_thread.c
index 930f67b702..27d03da39d 100644
--- a/dap-sdk/net/core/dap_proc_thread.c
+++ b/dap-sdk/net/core/dap_proc_thread.c
@@ -116,7 +116,7 @@ dap_proc_thread_t * dap_proc_thread_get_auto()
 static void s_proc_event_callback(dap_events_socket_t * a_esocket, void * a_value)
 {
     (void) a_value;
-    log_it(L_DEBUG, "Proc event callback");
+    //log_it(L_DEBUG, "Proc event callback");
     dap_proc_thread_t * l_thread = (dap_proc_thread_t *) a_esocket->_inheritor;
     dap_proc_queue_item_t * l_item = l_thread->proc_queue->items;
     dap_proc_queue_item_t * l_item_old = NULL;
@@ -247,8 +247,8 @@ static void * s_proc_thread_function(void * a_arg)
 #ifdef DAP_EVENTS_CAPS_EPOLL
                 if ( epoll_ctl( l_thread->epoll_ctl, EPOLL_CTL_DEL, l_cur->fd, &l_cur->ev ) == -1 )
                     log_it( L_ERROR,"Can't remove event socket's handler from the epoll ctl" );
-                else
-                    log_it( L_DEBUG,"Removed epoll's event from proc thread #%u", l_thread->cpu_id );
+                //else
+                //    log_it( L_DEBUG,"Removed epoll's event from proc thread #%u", l_thread->cpu_id );
                 if (l_cur->callbacks.delete_callback)
                     l_cur->callbacks.delete_callback(l_cur, l_thread);
                 if(l_cur->_inheritor)
diff --git a/dap-sdk/net/core/dap_worker.c b/dap-sdk/net/core/dap_worker.c
index 6c61733b2f..ff88a74640 100644
--- a/dap-sdk/net/core/dap_worker.c
+++ b/dap-sdk/net/core/dap_worker.c
@@ -135,7 +135,7 @@ void *dap_worker_thread(void *arg)
                         //cur->no_close = false;
                         if (l_sock_err) {
                             l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE;
-                            log_it(L_DEBUG, "Socket shutdown (EPOLLHUP): %s", strerror(l_sock_err));
+                            log_it(L_INFO, "Socket shutdown (EPOLLHUP): %s", strerror(l_sock_err));
                         }
                     break;
                     default: log_it(L_WARNING, "Unimplemented EPOLLHUP for socket type %d", l_cur->type);
@@ -228,9 +228,13 @@ void *dap_worker_thread(void *arg)
                     if(l_bytes_read > 0) {
                         l_cur->buf_in_size += l_bytes_read;
                         //log_it(L_DEBUG, "Received %d bytes", l_bytes_read);
-                        if(l_cur->callbacks.read_callback)
+                        if(l_cur->callbacks.read_callback){
                             l_cur->callbacks.read_callback(l_cur, NULL); // Call callback to process read event. At the end of callback buf_in_size should be zero if everything was read well
-                        else{
+                            if (l_cur->worker == NULL ){ // esocket was unassigned in callback, we don't need any ops with it now,
+                                                         // continue to poll another esockets
+                                continue;
+                            }
+                        }else{
                             log_it(L_WARNING, "We have incomming %u data but no read callback on socket %d, removing from read set", l_cur->socket);
                             dap_events_socket_set_readable_unsafe(l_cur,false);
                         }
@@ -255,6 +259,10 @@ void *dap_worker_thread(void *arg)
                 //log_it(DEBUG, "Main loop output: %u bytes to send",sa_cur->buf_out_size);
                 if(l_cur->callbacks.write_callback)
                     l_cur->callbacks.write_callback(l_cur, NULL); // Call callback to process write event
+                if (l_cur->worker == NULL ){ // esocket was unassigned in callback, we don't need any ops with it now,
+                                             // continue to poll another esockets
+                    continue;
+                }
 
                 if(l_cur->flags & DAP_SOCK_READY_TO_WRITE) {
 
@@ -389,7 +397,7 @@ static void s_queue_new_es_callback( dap_events_socket_t * a_es, void * a_arg)
             l_es_new->me = l_es_new;
             HASH_ADD(hh_worker, w->esockets, me, sizeof(void *), l_es_new );
             w->event_sockets_count++;
-            log_it(L_DEBUG, "Added socket %d on worker %u", l_es_new->socket, w->id);
+            //log_it(L_DEBUG, "Added socket %d on worker %u", l_es_new->socket, w->id);
             if (l_es_new->callbacks.worker_assign_callback)
                 l_es_new->callbacks.worker_assign_callback(l_es_new, w);
 
@@ -463,7 +471,7 @@ static void s_queue_es_io_callback( dap_events_socket_t * a_es, void * a_arg)
     dap_events_socket_t *l_msg_es = NULL;
     HASH_FIND(hh_worker, a_es->worker->esockets, &l_msg->esocket , sizeof (void*), l_msg_es );
     if ( l_msg_es == NULL){
-        log_it(L_DEBUG, "We got i/o message for client thats now not in list. Lost %u data", l_msg->data_size);
+        log_it(L_INFO, "We got i/o message for client thats now not in list. Lost %u data", l_msg->data_size);
         DAP_DELETE(l_msg);
         return;
     }
diff --git a/dap-sdk/net/server/http_server/dap_http_simple.c b/dap-sdk/net/server/http_server/dap_http_simple.c
index 9a11db0ba4..49e72c20a1 100644
--- a/dap-sdk/net/server/http_server/dap_http_simple.c
+++ b/dap-sdk/net/server/http_server/dap_http_simple.c
@@ -358,7 +358,8 @@ static void s_http_client_headers_read( dap_http_client_t *a_http_client, void *
     } else {
         log_it( L_DEBUG, "No data section, execution proc callback" );
         dap_events_socket_remove_from_worker_unsafe(a_http_client->esocket,a_http_client->esocket->worker);
-        dap_proc_queue_add_callback( a_http_client->esocket->worker->proc_queue, s_proc_queue_callback, l_http_simple);
+        dap_proc_queue_add_callback( l_http_simple->worker->proc_queue, s_proc_queue_callback, l_http_simple);
+
     }
 }
 
diff --git a/dap-sdk/net/stream/ch/dap_stream_ch.c b/dap-sdk/net/stream/ch/dap_stream_ch.c
index 036dbac039..c507880b31 100644
--- a/dap-sdk/net/stream/ch/dap_stream_ch.c
+++ b/dap-sdk/net/stream/ch/dap_stream_ch.c
@@ -96,7 +96,7 @@ dap_stream_ch_t* dap_stream_ch_new(dap_stream_t* a_stream, uint8_t id)
         l_ch_new->ready_to_read = true;
 
         // Init on stream worker
-        dap_stream_worker_t * l_stream_worker = DAP_STREAM_WORKER( a_stream->esocket->worker );
+        dap_stream_worker_t * l_stream_worker = a_stream->stream_worker;
         l_ch_new->stream_worker = l_stream_worker;
         HASH_ADD(hh_worker,l_stream_worker->channels, me,sizeof (void*),l_ch_new);
 
diff --git a/dap-sdk/net/stream/stream/dap_stream.c b/dap-sdk/net/stream/stream/dap_stream.c
index f47b347b67..f14bff6ce9 100644
--- a/dap-sdk/net/stream/stream/dap_stream.c
+++ b/dap-sdk/net/stream/stream/dap_stream.c
@@ -322,6 +322,7 @@ dap_stream_t * stream_new(dap_http_client_t * a_sh)
 
     pthread_rwlock_init( &ret->rwlock, NULL);
     ret->esocket = a_sh->esocket;
+    ret->stream_worker = (dap_stream_worker_t*) a_sh->esocket->worker->_inheritor;
     ret->conn_http=a_sh;
     ret->buf_defrag_size = 0;
     ret->seq_id = 0;
@@ -333,6 +334,7 @@ dap_stream_t * stream_new(dap_http_client_t * a_sh)
     return ret;
 }
 
+
 /**
  * @brief dap_stream_delete
  * @param a_stream
@@ -391,7 +393,6 @@ dap_stream_t* dap_stream_new_es(dap_events_socket_t * a_es)
     ret->esocket = a_es;
     ret->buf_defrag_size=0;
     ret->is_client_to_uplink = true;
-
     log_it(L_NOTICE,"New stream with events socket instance for %s",a_es->hostaddr);
     return ret;
 }
diff --git a/dap-sdk/net/stream/stream/include/dap_stream.h b/dap-sdk/net/stream/stream/include/dap_stream.h
index 9f953ff098..3130f61dfb 100644
--- a/dap-sdk/net/stream/stream/include/dap_stream.h
+++ b/dap-sdk/net/stream/stream/include/dap_stream.h
@@ -55,7 +55,7 @@ typedef struct dap_stream {
     pthread_rwlock_t rwlock;
     dap_stream_session_t * session;
     dap_events_socket_t * esocket; // Connection
-
+    dap_stream_worker_t * stream_worker;
     struct dap_http_client * conn_http; // HTTP-specific
 
     char * service_key;
diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c
index 4b1d47d0f0..859485cc2b 100644
--- a/modules/channel/chain/dap_stream_ch_chain.c
+++ b/modules/channel/chain/dap_stream_ch_chain.c
@@ -129,6 +129,7 @@ bool s_sync_chains_callback(dap_proc_thread_t *a_thread, void *a_arg)
     UNUSED(a_thread);
     dap_stream_ch_t *l_ch = (dap_stream_ch_t *)a_arg;
     dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch);
+
     dap_chain_t * l_chain = dap_chain_find_by_id(l_ch_chain->request_net_id, l_ch_chain->request_chain_id);
 
     dap_chain_atom_ptr_t * l_lasts = NULL;
@@ -243,31 +244,36 @@ bool s_chain_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg)
     dap_chain_hash_fast_t l_atom_hash = {};
     dap_chain_atom_ptr_t l_atom_copy = l_ch_chain->pkt_data;
     uint64_t l_atom_copy_size = l_ch_chain->pkt_data_size;
-    dap_hash_fast(l_atom_copy, l_atom_copy_size, &l_atom_hash);
-    dap_chain_atom_iter_t *l_atom_iter = l_chain->callback_atom_iter_create(l_chain);
-    size_t l_atom_size =0;
-    if ( l_chain->callback_atom_find_by_hash(l_atom_iter, &l_atom_hash, &l_atom_size) != NULL ) {
-        dap_chain_atom_verify_res_t l_atom_add_res = l_chain->callback_atom_add(l_chain, l_atom_copy, l_atom_copy_size);
-        if(l_atom_add_res == ATOM_ACCEPT && dap_chain_has_file_store(l_chain)) {
-            // append to file
-            dap_chain_cell_t *l_cell = dap_chain_cell_create_fill(l_chain, l_ch_chain->request_cell_id);
-            // add one atom only
-            int l_res = dap_chain_cell_file_append(l_cell, l_atom_copy, l_atom_copy_size);
-            // rewrite all file
-            //l_res = dap_chain_cell_file_update(l_cell);
-            if(!l_cell || l_res < 0) {
-                log_it(L_ERROR, "Can't save event 0x%x to the file '%s'", l_atom_hash,
-                        l_cell ? l_cell->file_storage_path : "[null]");
+    l_ch_chain->pkt_data = NULL;
+    l_ch_chain->pkt_data_size = 0;
+    if( l_atom_copy && l_atom_copy_size){
+        dap_hash_fast(l_atom_copy, l_atom_copy_size, &l_atom_hash);
+        dap_chain_atom_iter_t *l_atom_iter = l_chain->callback_atom_iter_create(l_chain);
+        size_t l_atom_size =0;
+        if ( l_chain->callback_atom_find_by_hash(l_atom_iter, &l_atom_hash, &l_atom_size) != NULL ) {
+            dap_chain_atom_verify_res_t l_atom_add_res = l_chain->callback_atom_add(l_chain, l_atom_copy, l_atom_copy_size);
+            if(l_atom_add_res == ATOM_ACCEPT && dap_chain_has_file_store(l_chain)) {
+                // append to file
+                dap_chain_cell_t *l_cell = dap_chain_cell_create_fill(l_chain, l_ch_chain->request_cell_id);
+                // add one atom only
+                int l_res = dap_chain_cell_file_append(l_cell, l_atom_copy, l_atom_copy_size);
+                // rewrite all file
+                //l_res = dap_chain_cell_file_update(l_cell);
+                if(!l_cell || l_res < 0) {
+                    log_it(L_ERROR, "Can't save event 0x%x to the file '%s'", l_atom_hash,
+                            l_cell ? l_cell->file_storage_path : "[null]");
+                }
+                // delete cell and close file
+                dap_chain_cell_delete(l_cell);
             }
-            // delete cell and close file
-            dap_chain_cell_delete(l_cell);
-        }
-        if(l_atom_add_res == ATOM_PASS)
+            if(l_atom_add_res == ATOM_PASS)
+                DAP_DELETE(l_atom_copy);
+        } else {
             DAP_DELETE(l_atom_copy);
-    } else {
-        DAP_DELETE(l_atom_copy);
-    }
-    l_chain->callback_atom_iter_delete(l_atom_iter);
+        }
+        l_chain->callback_atom_iter_delete(l_atom_iter);
+    }else
+        log_it(L_WARNING, "In proc thread got stream ch packet with pkt_size: %zd and pkt_data: %p", l_atom_copy_size, l_atom_copy);
     dap_events_socket_assign_on_worker_mt(l_ch->stream->esocket, l_ch->stream_worker->worker);
     return true;
 }
@@ -749,8 +755,7 @@ bool s_out_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg)
                         }
                         HASH_DEL(l_ch_chain->request_atoms_lasts, l_atom_item);
                         break;
-                    }
-                    else{
+                    } else {
                         HASH_DEL(l_ch_chain->request_atoms_lasts, l_atom_item);
                     }
                 }
-- 
GitLab