From 7ec6ef7088fcf3ab710f8fae947cb00ca9934598 Mon Sep 17 00:00:00 2001
From: Roman Khlopkov <roman.khlopkov@demlabs.net>
Date: Tue, 6 Oct 2020 09:32:17 +0000
Subject: [PATCH] bugs-4547

---
 dap-sdk/net/core/dap_worker.c                 |   5 +-
 modules/channel/chain/dap_stream_ch_chain.c   | 193 ++++++++++--------
 .../chain/include/dap_stream_ch_chain.h       |   8 +-
 3 files changed, 113 insertions(+), 93 deletions(-)

diff --git a/dap-sdk/net/core/dap_worker.c b/dap-sdk/net/core/dap_worker.c
index 690cb0c4ce..c096d9d645 100644
--- a/dap-sdk/net/core/dap_worker.c
+++ b/dap-sdk/net/core/dap_worker.c
@@ -455,8 +455,9 @@ static void s_queue_new_es_callback( dap_events_socket_t * a_es, void * a_arg)
     dap_events_socket_t * l_es_new =(dap_events_socket_t *) a_arg;
     dap_worker_t * w = a_es->worker;
     //log_it(L_DEBUG, "Received event socket %p to add on worker", l_es_new);
-    if(dap_events_socket_check_unsafe( w, a_es)){
-        log_it(L_ERROR, "Already assigned %d (%p), you're doing smth wrong", a_es->socket, a_es);
+    if(dap_events_socket_check_unsafe( w, l_es_new)){
+        //log_it(L_ERROR, "Already assigned %d (%p), you're doing smth wrong", l_es_new->socket, l_es_new);
+        // Socket already present in worker, it's OK
         return;
     }
 
diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c
index 57e3241a72..a22f26fb99 100644
--- a/modules/channel/chain/dap_stream_ch_chain.c
+++ b/modules/channel/chain/dap_stream_ch_chain.c
@@ -219,11 +219,12 @@ bool s_chain_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg)
     }
 
     dap_chain_hash_fast_t l_atom_hash = {};
-    dap_chain_atom_ptr_t l_atom_copy = l_ch_chain->pkt_data;
-    uint64_t l_atom_copy_size = l_ch_chain->pkt_data_size;
-    l_ch_chain->pkt_data = NULL;
-    l_ch_chain->pkt_data_size = 0;
-    if( l_atom_copy && l_atom_copy_size){
+    dap_list_t *l_pkt_copy_list = l_ch_chain->pkt_copy_list;
+    if (l_pkt_copy_list) {
+        l_ch_chain->pkt_copy_list = l_ch_chain->pkt_copy_list->next;
+        dap_chain_pkt_copy_t *l_pkt_copy = (dap_chain_pkt_copy_t *)l_pkt_copy_list->data;
+        dap_chain_atom_ptr_t l_atom_copy = (dap_chain_atom_ptr_t)l_pkt_copy->pkt_data;
+        uint64_t l_atom_copy_size = l_pkt_copy->pkt_data_size;
         dap_hash_fast(l_atom_copy, l_atom_copy_size, &l_atom_hash);
         dap_chain_atom_iter_t *l_atom_iter = l_chain->callback_atom_iter_create(l_chain);
         size_t l_atom_size =0;
@@ -277,8 +278,10 @@ bool s_chain_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg)
             DAP_DELETE(l_atom_copy);
         }
         l_chain->callback_atom_iter_delete(l_atom_iter);
+        DAP_DELETE(l_pkt_copy);
+        DAP_DELETE(l_pkt_copy_list);
     }else
-        log_it(L_WARNING, "In proc thread got stream ch packet with pkt_size: %zd and pkt_data: %p", l_atom_copy_size, l_atom_copy);
+        log_it(L_WARNING, "In proc thread got CHAINS stream ch packet with zero data");
     dap_events_socket_assign_on_worker_mt(l_ch->stream->esocket, l_ch->stream_worker->worker);
     return true;
 }
@@ -288,93 +291,101 @@ bool s_gdb_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg)
     UNUSED(a_thread);
     dap_stream_ch_t *l_ch = (dap_stream_ch_t *)a_arg;
     dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch);
-    dap_chain_t * l_chain = dap_chain_find_by_id(l_ch_chain->request_net_id, l_ch_chain->request_chain_id);
 
-    size_t l_data_obj_count = 0;
-    // deserialize data & Parse data from dap_db_log_pack()
-    dap_store_obj_t *l_store_obj = dap_db_log_unpack(l_ch_chain->pkt_data,l_ch_chain->pkt_data_size, &l_data_obj_count);
-    //log_it(L_INFO, "In: l_data_obj_count = %d", l_data_obj_count );
-
-    for(size_t i = 0; i < l_data_obj_count; i++) {
-        // timestamp for exist obj
-        time_t l_timestamp_cur = 0;
-        // obj to add
-        dap_store_obj_t* l_obj = l_store_obj + i;
-        // read item from base;
-        size_t l_count_read = 0;
-        dap_store_obj_t *l_read_obj = dap_chain_global_db_driver_read(l_obj->group,
-                l_obj->key, &l_count_read);
-        // get timestamp for the exist entry
-        if(l_read_obj)
-            l_timestamp_cur = l_read_obj->timestamp;
-        // get timestamp for the deleted entry
-        else
-        {
-            l_timestamp_cur = global_db_gr_del_get_timestamp(l_obj->group, l_obj->key);
-        }
+    dap_list_t *l_pkt_copy_list = l_ch_chain->pkt_copy_list;
+    if (l_pkt_copy_list) {
+        l_ch_chain->pkt_copy_list = l_ch_chain->pkt_copy_list->next;
+        dap_chain_pkt_copy_t *l_pkt_copy = (dap_chain_pkt_copy_t *)l_pkt_copy_list->data;
+        size_t l_data_obj_count = 0;
+        // deserialize data & Parse data from dap_db_log_pack()
+        dap_store_obj_t *l_store_obj = dap_db_log_unpack(l_pkt_copy->pkt_data, l_pkt_copy->pkt_data_size, &l_data_obj_count);
+        //log_it(L_INFO, "In: l_data_obj_count = %d", l_data_obj_count );
+
+        for(size_t i = 0; i < l_data_obj_count; i++) {
+            // timestamp for exist obj
+            time_t l_timestamp_cur = 0;
+            // obj to add
+            dap_store_obj_t* l_obj = l_store_obj + i;
+            // read item from base;
+            size_t l_count_read = 0;
+            dap_store_obj_t *l_read_obj = dap_chain_global_db_driver_read(l_obj->group,
+                    l_obj->key, &l_count_read);
+            // get timestamp for the exist entry
+            if(l_read_obj)
+                l_timestamp_cur = l_read_obj->timestamp;
+            // get timestamp for the deleted entry
+            else
+            {
+                l_timestamp_cur = global_db_gr_del_get_timestamp(l_obj->group, l_obj->key);
+            }
 
-        //check whether to apply the received data into the database
-        bool l_apply = true;
-        if(l_obj->timestamp < l_timestamp_cur)
-            l_apply = false;
-        else if(l_obj->type == 'd') {
-            // already deleted
-            if(!l_read_obj)
-                l_apply = false;
-        }
-        else if(l_obj->type == 'a') {
-            bool l_is_the_same_present = false;
-            if(l_read_obj &&
-                    l_read_obj->value_len == l_obj->value_len &&
-                    !memcmp(l_read_obj->value, l_obj->value, l_obj->value_len))
-                l_is_the_same_present = true;
-            // this data already present in global_db and not obsolete (out of date)
-            if(l_read_obj && (l_is_the_same_present || l_read_obj->timestamp >= l_store_obj->timestamp))
+            //check whether to apply the received data into the database
+            bool l_apply = true;
+            if(l_obj->timestamp < l_timestamp_cur)
                 l_apply = false;
-        }
-        if(l_read_obj)
-            dap_store_obj_free(l_read_obj, l_count_read);
+            else if(l_obj->type == 'd') {
+                // already deleted
+                if(!l_read_obj)
+                    l_apply = false;
+            }
+            else if(l_obj->type == 'a') {
+                bool l_is_the_same_present = false;
+                if(l_read_obj &&
+                        l_read_obj->value_len == l_obj->value_len &&
+                        !memcmp(l_read_obj->value, l_obj->value, l_obj->value_len))
+                    l_is_the_same_present = true;
+                // this data already present in global_db and not obsolete (out of date)
+                if(l_read_obj && (l_is_the_same_present || l_read_obj->timestamp >= l_store_obj->timestamp))
+                    l_apply = false;
+            }
+            if(l_read_obj)
+                dap_store_obj_free(l_read_obj, l_count_read);
 
-        if(!l_apply) {
-            // If request was from defined node_addr we update its state
-            if(l_ch_chain->request.node_addr.uint64) {
-                dap_db_log_set_last_id_remote(l_ch_chain->request.node_addr.uint64, l_obj->id);
+            if(!l_apply) {
+                // If request was from defined node_addr we update its state
+                if(l_ch_chain->request.node_addr.uint64) {
+                    dap_db_log_set_last_id_remote(l_ch_chain->request.node_addr.uint64, l_obj->id);
+                }
+                continue;
             }
-            continue;
-        }
 
-        char l_ts_str[50];
-        dap_time_to_str_rfc822(l_ts_str, sizeof(l_ts_str), l_store_obj[i].timestamp);
-        /*log_it(L_DEBUG, "Unpacked log history: type='%c' (0x%02hhX) group=\"%s\" key=\"%s\""
-                " timestamp=\"%s\" value_len=%u  ",
-                (char ) l_store_obj[i].type, l_store_obj[i].type, l_store_obj[i].group,
-                l_store_obj[i].key, l_ts_str, l_store_obj[i].value_len);*/
-        // apply received transaction
-        dap_chain_t *l_chain = dap_chain_find_by_id(l_ch_chain->request_net_id, l_ch_chain->request_chain_id);
-        if(l_chain) {
-            if(l_chain->callback_datums_pool_proc_with_group){
-                void * restrict l_store_obj_value = l_store_obj->value;
-                l_chain->callback_datums_pool_proc_with_group(l_chain,
-                        (dap_chain_datum_t** restrict) l_store_obj_value, 1,
-                        l_store_obj[i].group);
+            char l_ts_str[50];
+            dap_time_to_str_rfc822(l_ts_str, sizeof(l_ts_str), l_store_obj[i].timestamp);
+            /*log_it(L_DEBUG, "Unpacked log history: type='%c' (0x%02hhX) group=\"%s\" key=\"%s\""
+                    " timestamp=\"%s\" value_len=%u  ",
+                    (char ) l_store_obj[i].type, l_store_obj[i].type, l_store_obj[i].group,
+                    l_store_obj[i].key, l_ts_str, l_store_obj[i].value_len);*/
+            // apply received transaction
+            dap_chain_t *l_chain = dap_chain_find_by_id(l_ch_chain->request_net_id, l_ch_chain->request_chain_id);
+            if(l_chain) {
+                if(l_chain->callback_datums_pool_proc_with_group){
+                    void * restrict l_store_obj_value = l_store_obj->value;
+                    l_chain->callback_datums_pool_proc_with_group(l_chain,
+                            (dap_chain_datum_t** restrict) l_store_obj_value, 1,
+                            l_store_obj[i].group);
+                }
             }
-        }
-        // save data to global_db
-        if(!dap_chain_global_db_obj_save(l_obj, 1)) {
-            dap_stream_ch_chain_pkt_write_error(l_ch, l_ch_chain->request_net_id,
-                                                l_ch_chain->request_chain_id, l_ch_chain->request_cell_id,
-                                                "ERROR_GLOBAL_DB_INTERNAL_NOT_SAVED");
-            dap_stream_ch_set_ready_to_write_unsafe(l_ch, true);
-        } else {
-            // If request was from defined node_addr we update its state
-            if(l_ch_chain->request.node_addr.uint64) {
-                dap_db_log_set_last_id_remote(l_ch_chain->request.node_addr.uint64, l_obj->id);
+            // save data to global_db
+            if(!dap_chain_global_db_obj_save(l_obj, 1)) {
+                dap_stream_ch_chain_pkt_write_error(l_ch, l_ch_chain->request_net_id,
+                                                    l_ch_chain->request_chain_id, l_ch_chain->request_cell_id,
+                                                    "ERROR_GLOBAL_DB_INTERNAL_NOT_SAVED");
+                dap_stream_ch_set_ready_to_write_unsafe(l_ch, true);
+            } else {
+                // If request was from defined node_addr we update its state
+                if(l_ch_chain->request.node_addr.uint64) {
+                    dap_db_log_set_last_id_remote(l_ch_chain->request.node_addr.uint64, l_obj->id);
+                }
+                //log_it(L_DEBUG, "Added new GLOBAL_DB history pack");
             }
-            //log_it(L_DEBUG, "Added new GLOBAL_DB history pack");
         }
+        if(l_store_obj)
+            dap_store_obj_free(l_store_obj, l_data_obj_count);
+        DAP_DELETE(l_pkt_copy);
+        DAP_DELETE(l_pkt_copy_list);
+    } else {
+        log_it(L_WARNING, "In proc thread got GDB stream ch packet with zero data");
     }
-    if(l_store_obj)
-        dap_store_obj_free(l_store_obj, l_data_obj_count);
     dap_events_socket_assign_on_worker_mt(l_ch->stream->esocket, l_ch->stream_worker->worker);
     return true;
 }
@@ -516,9 +527,11 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
                 memcpy(&l_ch_chain->request_net_id, &l_chain_pkt->hdr.net_id, sizeof(dap_chain_net_id_t));
                 memcpy(&l_ch_chain->request_chain_id, &l_chain_pkt->hdr.chain_id, sizeof(dap_chain_id_t));
                 memcpy(&l_ch_chain->request_cell_id, &l_chain_pkt->hdr.cell_id, sizeof(dap_chain_cell_id_t));
-                l_ch_chain->pkt_data = DAP_NEW_SIZE(byte_t, l_chain_pkt_data_size);
-                memcpy(l_ch_chain->pkt_data, l_chain_pkt->data, l_chain_pkt_data_size);
-                l_ch_chain->pkt_data_size = l_chain_pkt_data_size;
+                dap_chain_pkt_copy_t *l_pkt_copy = DAP_NEW_Z(dap_chain_pkt_copy_t);
+                l_pkt_copy->pkt_data = DAP_NEW_Z_SIZE(byte_t, l_chain_pkt_data_size);
+                memcpy(l_pkt_copy->pkt_data, l_chain_pkt->data, l_chain_pkt_data_size);
+                l_pkt_copy->pkt_data_size = l_chain_pkt_data_size;
+                l_ch_chain->pkt_copy_list = dap_list_append(l_ch_chain->pkt_copy_list, l_pkt_copy);
                 dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker);
                 dap_proc_queue_add_callback(a_ch->stream_worker->worker, s_chain_pkt_callback, a_ch);
             } else {
@@ -545,9 +558,11 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
             memcpy(&l_ch_chain->request_net_id, &l_chain_pkt->hdr.net_id, sizeof(dap_chain_net_id_t));
             memcpy(&l_ch_chain->request_chain_id, &l_chain_pkt->hdr.chain_id, sizeof(dap_chain_id_t));
             memcpy(&l_ch_chain->request_cell_id, &l_chain_pkt->hdr.cell_id, sizeof(dap_chain_cell_id_t));
-            l_ch_chain->pkt_data = DAP_CALLOC(1, l_chain_pkt_data_size);
-            memcpy(l_ch_chain->pkt_data, l_chain_pkt->data, l_chain_pkt_data_size);
-            l_ch_chain->pkt_data_size = l_chain_pkt_data_size;
+            dap_chain_pkt_copy_t *l_pkt_copy = DAP_NEW_Z(dap_chain_pkt_copy_t);
+            l_pkt_copy->pkt_data = DAP_NEW_Z_SIZE(byte_t, l_chain_pkt_data_size);
+            memcpy(l_pkt_copy->pkt_data, l_chain_pkt->data, l_chain_pkt_data_size);
+            l_pkt_copy->pkt_data_size = l_chain_pkt_data_size;
+            l_ch_chain->pkt_copy_list = dap_list_append(l_ch_chain->pkt_copy_list, l_pkt_copy);
             dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker);
             dap_proc_queue_add_callback(a_ch->stream_worker->worker, s_gdb_pkt_callback, a_ch);
         } else {
diff --git a/modules/channel/chain/include/dap_stream_ch_chain.h b/modules/channel/chain/include/dap_stream_ch_chain.h
index 931d2b9714..ef954e93ef 100644
--- a/modules/channel/chain/include/dap_stream_ch_chain.h
+++ b/modules/channel/chain/include/dap_stream_ch_chain.h
@@ -45,6 +45,11 @@ typedef struct dap_chain_atom_item{
     UT_hash_handle hh;
 } dap_chain_atom_item_t;
 
+typedef struct dap_chain_pkt_copy {
+    uint64_t pkt_data_size;
+    byte_t *pkt_data;
+} dap_chain_pkt_copy_t;
+
 typedef struct dap_stream_ch_chain {
     pthread_mutex_t mutex;
     dap_stream_ch_t * ch;
@@ -54,8 +59,7 @@ typedef struct dap_stream_ch_chain {
     dap_stream_ch_chain_state_t state;
 
     dap_chain_atom_iter_t * request_atom_iter;
-    byte_t *pkt_data;
-    uint64_t pkt_data_size;
+    dap_list_t *pkt_copy_list;
     uint64_t stats_request_atoms_processed;
     uint64_t stats_request_gdb_processed;
     dap_stream_ch_chain_sync_request_t request;
-- 
GitLab