From 45d225f3694af334bae3d5b74b871caa4811d183 Mon Sep 17 00:00:00 2001
From: Roman Khlopkov <roman.khlopkov@demlabs.net>
Date: Fri, 4 Sep 2020 17:22:20 +0300
Subject: [PATCH 1/4] [+] Packet split for GDB sync

---
 modules/channel/chain/dap_stream_ch_chain.c   | 16 +--
 .../channel/chain/dap_stream_ch_chain_pkt.c   |  8 +-
 .../global-db/dap_chain_global_db_driver.c    | 97 ++++++++++---------
 modules/global-db/dap_chain_global_db_hist.c  | 44 ++-------
 .../global-db/include/dap_chain_global_db.h   |  2 +-
 .../include/dap_chain_global_db_driver.h      |  9 +-
 modules/net/dap_chain_net.c                   |  6 +-
 7 files changed, 87 insertions(+), 95 deletions(-)

diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c
index 8f94aa9be..aea46e12c 100644
--- a/modules/channel/chain/dap_stream_ch_chain.c
+++ b/modules/channel/chain/dap_stream_ch_chain.c
@@ -628,12 +628,10 @@ bool s_out_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg)
             bool l_is_stop = true;
             while(l_obj) {
                 size_t l_item_size_out = 0;
-                uint8_t *l_item = dap_db_log_pack(l_obj, &l_item_size_out);
+                dap_list_t *l_item = dap_db_log_pack(l_obj, &l_item_size_out);
                 // Item not found, maybe it has deleted? Then go to the next item
                 if(!l_item || !l_item_size_out) {
-                    //log_it(L_WARNING, "Log pack returned NULL??? data=0x%x (nothing to send) (rest=%d records)", l_obj, l_items_rest);
                     l_item_size_out = 0;
-                    // go to next item
                     l_obj = dap_db_log_list_get(l_db_list);
                 }
                 else {
@@ -641,12 +639,16 @@ bool s_out_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg)
                     size_t l_items_rest = dap_db_log_list_get_count_rest(l_db_list);
                     log_it(L_INFO, "Send one global_db record data=0x%x len=%d (rest=%d/%d items)", l_item, l_item_size_out,
                             l_items_rest, l_items_total);*/
-                    dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB,
-                                                         l_ch_chain->request_net_id, l_ch_chain->request_chain_id,
-                                                         l_ch_chain->request_cell_id, l_item, l_item_size_out);
+                    for (dap_list_t *l_iter = l_item; l_iter; l_iter = dap_list_next(l_iter)) {
+                        dap_store_obj_pkt_t *l_pkt = (dap_store_obj_pkt_t *)l_iter->data;
+                        uint32_t l_pkt_size = sizeof(dap_store_obj_pkt_t) + l_pkt->data_size;
+                        dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB,
+                                                             l_ch_chain->request_net_id, l_ch_chain->request_chain_id,
+                                                             l_ch_chain->request_cell_id, l_pkt, l_pkt_size);
+                    }
                     l_packet_out = true;
                     l_ch_chain->stats_request_gdb_processed++;
-                    DAP_DELETE(l_item);
+                    dap_list_free_full(l_item, free);
                     // sent the record, another will be sent
                     l_is_stop = false;
                     break;
diff --git a/modules/channel/chain/dap_stream_ch_chain_pkt.c b/modules/channel/chain/dap_stream_ch_chain_pkt.c
index aff9bb677..19cc56601 100644
--- a/modules/channel/chain/dap_stream_ch_chain_pkt.c
+++ b/modules/channel/chain/dap_stream_ch_chain_pkt.c
@@ -49,8 +49,8 @@ dap_stream_ch_chain_state_t dap_stream_ch_chain_pkt_type_to_dap_stream_ch_chain_
  * @return
  */
 size_t dap_stream_ch_chain_pkt_write_unsafe(dap_stream_ch_t *a_ch, uint8_t a_type,dap_chain_net_id_t a_net_id,
-                                     dap_chain_id_t a_chain_id, dap_chain_cell_id_t a_cell_id,
-        const void * a_data, size_t a_data_size)
+                                            dap_chain_id_t a_chain_id, dap_chain_cell_id_t a_cell_id,
+                                            const void * a_data, size_t a_data_size)
 {
     dap_stream_ch_chain_pkt_t * l_chain_pkt;
     size_t l_chain_pkt_size = sizeof (l_chain_pkt->hdr) + a_data_size;
@@ -69,8 +69,8 @@ size_t dap_stream_ch_chain_pkt_write_unsafe(dap_stream_ch_t *a_ch, uint8_t a_typ
 }
 
 size_t dap_stream_ch_chain_pkt_write_mt(dap_stream_worker_t *a_worker, dap_stream_ch_t *a_ch, uint8_t a_type,dap_chain_net_id_t a_net_id,
-                                     dap_chain_id_t a_chain_id, dap_chain_cell_id_t a_cell_id,
-        const void * a_data, size_t a_data_size)
+                                        dap_chain_id_t a_chain_id, dap_chain_cell_id_t a_cell_id,
+                                        const void * a_data, size_t a_data_size)
 {
     dap_stream_ch_chain_pkt_t * l_chain_pkt;
     size_t l_chain_pkt_size = sizeof (l_chain_pkt->hdr) + a_data_size;
diff --git a/modules/global-db/dap_chain_global_db_driver.c b/modules/global-db/dap_chain_global_db_driver.c
index b1e805831..fd07a1bda 100644
--- a/modules/global-db/dap_chain_global_db_driver.c
+++ b/modules/global-db/dap_chain_global_db_driver.c
@@ -196,53 +196,64 @@ static size_t dap_db_get_size_pdap_store_obj_t(pdap_store_obj_t store_obj)
  * @param a_size_out[out] size of output structure
  * @return NULL in case of an error
  */
-dap_store_obj_pkt_t *dap_store_packet_multiple(pdap_store_obj_t a_store_obj, time_t a_timestamp,
+dap_list_t *dap_store_packet_multiple(pdap_store_obj_t a_store_obj, time_t a_timestamp,
         size_t a_store_obj_count)
 {
-    if(!a_store_obj || a_store_obj_count < 1)
+    if (!a_store_obj || a_store_obj_count < 1)
         return NULL;
-    size_t l_data_size_out = sizeof(uint32_t); // size of output data
+
     // calculate output structure size
-    for(size_t l_q = 0; l_q < a_store_obj_count; ++l_q)
+    dap_list_t *l_ret = NULL;
+    dap_store_obj_pkt_t *l_pkt;
+    uint32_t l_obj_count = 0, l_data_size_out = 0;
+    for (size_t l_q = 0; l_q < a_store_obj_count; ++l_q) {
         l_data_size_out += dap_db_get_size_pdap_store_obj_t(&a_store_obj[l_q]);
-
-    dap_store_obj_pkt_t *l_pkt = DAP_NEW_Z_SIZE(dap_store_obj_pkt_t, sizeof(dap_store_obj_pkt_t) + l_data_size_out);
-    l_pkt->data_size = l_data_size_out;
-    l_pkt->timestamp = a_timestamp;
-    uint64_t l_offset = 0;
-    uint32_t l_count = (uint32_t) a_store_obj_count;
-    memcpy(l_pkt->data + l_offset, &l_count, sizeof(uint32_t));
-    l_offset += sizeof(uint32_t);
-    for(size_t l_q = 0; l_q < a_store_obj_count; ++l_q) {
-        dap_store_obj_t obj = a_store_obj[l_q];
-        //uint16_t section_size = (uint16_t) dap_strlen(obj.section);
-        uint16_t group_size = (uint16_t) dap_strlen(obj.group);
-        uint16_t key_size = (uint16_t) dap_strlen(obj.key);
-        memcpy(l_pkt->data + l_offset, &obj.type, sizeof(int));
-        l_offset += sizeof(int);
-        //memcpy(l_pkt->data + l_offset, &section_size, sizeof(uint16_t));
-        //l_offset += sizeof(uint16_t);
-        //memcpy(l_pkt->data + l_offset, obj.section, section_size);
-        //l_offset += section_size;
-        memcpy(l_pkt->data + l_offset, &group_size, sizeof(uint16_t));
-        l_offset += sizeof(uint16_t);
-        memcpy(l_pkt->data + l_offset, obj.group, group_size);
-        l_offset += group_size;
-        memcpy(l_pkt->data + l_offset, &obj.id, sizeof(uint64_t));
-        l_offset += sizeof(uint64_t);
-        memcpy(l_pkt->data + l_offset, &obj.timestamp, sizeof(time_t));
-        l_offset += sizeof(time_t);
-        memcpy(l_pkt->data + l_offset, &key_size, sizeof(uint16_t));
-        l_offset += sizeof(uint16_t);
-        memcpy(l_pkt->data + l_offset, obj.key, key_size);
-        l_offset += key_size;
-        memcpy(l_pkt->data + l_offset, &obj.value_len, sizeof(size_t));
-        l_offset += sizeof(size_t);
-        memcpy(l_pkt->data + l_offset, obj.value, obj.value_len);
-        l_offset += obj.value_len;
+        if (l_data_size_out > DAP_CHAIN_PKT_EXPECT_SIZE || (l_q == a_store_obj_count - 1 && l_data_size_out)) {
+            l_pkt = DAP_NEW_Z_SIZE(dap_store_obj_pkt_t, sizeof(dap_store_obj_pkt_t) + l_data_size_out);
+            l_pkt->data_size = l_data_size_out;
+            l_pkt->timestamp = a_timestamp;
+            l_pkt->obj_count = l_q + 1 - l_obj_count;
+            l_ret = dap_list_append(l_ret, l_pkt);
+            l_data_size_out = 0;
+            l_obj_count = l_q + 1;
+        }
     }
-    assert(l_data_size_out == l_offset);
-    return l_pkt;
+    l_obj_count = 0;
+    for (dap_list_t *l_iter = l_ret; l_iter; l_iter = dap_list_next(l_iter)) {
+        l_pkt = (dap_store_obj_pkt_t *)l_iter->data;
+        uint64_t l_offset = 0;
+        for(size_t l_q = 0; l_q < l_pkt->obj_count; ++l_q) {
+            dap_store_obj_t obj = a_store_obj[l_obj_count + l_q];
+            //uint16_t section_size = (uint16_t) dap_strlen(obj.section);
+            uint16_t group_size = (uint16_t) dap_strlen(obj.group);
+            uint16_t key_size = (uint16_t) dap_strlen(obj.key);
+            memcpy(l_pkt->data + l_offset, &obj.type, sizeof(int));
+            l_offset += sizeof(int);
+            //memcpy(l_pkt->data + l_offset, &section_size, sizeof(uint16_t));
+            //l_offset += sizeof(uint16_t);
+            //memcpy(l_pkt->data + l_offset, obj.section, section_size);
+            //l_offset += section_size;
+            memcpy(l_pkt->data + l_offset, &group_size, sizeof(uint16_t));
+            l_offset += sizeof(uint16_t);
+            memcpy(l_pkt->data + l_offset, obj.group, group_size);
+            l_offset += group_size;
+            memcpy(l_pkt->data + l_offset, &obj.id, sizeof(uint64_t));
+            l_offset += sizeof(uint64_t);
+            memcpy(l_pkt->data + l_offset, &obj.timestamp, sizeof(time_t));
+            l_offset += sizeof(time_t);
+            memcpy(l_pkt->data + l_offset, &key_size, sizeof(uint16_t));
+            l_offset += sizeof(uint16_t);
+            memcpy(l_pkt->data + l_offset, obj.key, key_size);
+            l_offset += key_size;
+            memcpy(l_pkt->data + l_offset, &obj.value_len, sizeof(size_t));
+            l_offset += sizeof(size_t);
+            memcpy(l_pkt->data + l_offset, obj.value, obj.value_len);
+            l_offset += obj.value_len;
+        }
+        l_obj_count += l_pkt->obj_count;
+        assert(l_pkt->data_size == l_offset);
+    }
+    return l_ret;
 }
 /**
  * deserialization
@@ -255,9 +266,7 @@ dap_store_obj_t *dap_store_unpacket_multiple(const dap_store_obj_pkt_t *pkt, siz
     if(!pkt || pkt->data_size < 1)
         return NULL;
     uint64_t offset = 0;
-    uint32_t count;
-    memcpy(&count, pkt->data, sizeof(uint32_t));
-    offset += sizeof(uint32_t);
+    uint32_t count = pkt->obj_count;
     dap_store_obj_t *store_obj = DAP_NEW_Z_SIZE(dap_store_obj_t, count * sizeof(struct dap_store_obj));
     for(size_t q = 0; q < count; ++q) {
         dap_store_obj_t *obj = store_obj + q;
diff --git a/modules/global-db/dap_chain_global_db_hist.c b/modules/global-db/dap_chain_global_db_hist.c
index c2683b156..4ac96d147 100644
--- a/modules/global-db/dap_chain_global_db_hist.c
+++ b/modules/global-db/dap_chain_global_db_hist.c
@@ -77,7 +77,7 @@ static char* dap_db_new_history_timestamp()
  *
  * return dap_store_obj_pkt_t*
  */
-uint8_t* dap_db_log_pack(dap_global_db_obj_t *a_obj, size_t *a_data_size_out)
+dap_list_t* dap_db_log_pack(dap_global_db_obj_t *a_obj, size_t *a_data_size_out)
 {
     if(!a_obj)
         return NULL;
@@ -122,15 +122,18 @@ uint8_t* dap_db_log_pack(dap_global_db_obj_t *a_obj, size_t *a_data_size_out)
         i++;
     }
     // serialize data
-    dap_store_obj_pkt_t *l_data_out = dap_store_packet_multiple(l_store_obj, l_timestamp, l_count);
+    dap_list_t *l_data_out = dap_store_packet_multiple(l_store_obj, l_timestamp, l_count);
 
     dap_store_obj_free(l_store_obj, l_count);
     dap_strfreev(l_keys);
 
     if(l_data_out && a_data_size_out) {
-        *a_data_size_out = sizeof(dap_store_obj_pkt_t) + l_data_out->data_size;
+        *a_data_size_out = 0;
+        for (dap_list_t *l_iter = l_data_out; l_iter; l_iter = dap_list_next(l_iter)) {
+            *a_data_size_out += sizeof(dap_store_obj_pkt_t) + ((dap_store_obj_pkt_t *)l_data_out)->data_size;
+        }
     }
-    return (uint8_t*) l_data_out;
+    return l_data_out;
 
 }
 
@@ -1326,7 +1329,6 @@ dap_db_log_list_t* dap_db_log_list_start(uint64_t first_id, dap_list_t *a_add_gr
     dap_db_log_list_t *l_dap_db_log_list = DAP_NEW_Z(dap_db_log_list_t);
 
     size_t l_add_groups_num = 0;// number of group
-    //size_t l_add_groups_items_num = 0;// number items in all groups
     dap_list_t *l_add_groups_mask = a_add_groups_mask;
     // calc l_add_groups_num
     while(l_add_groups_mask) {
@@ -1337,7 +1339,6 @@ dap_db_log_list_t* dap_db_log_list_start(uint64_t first_id, dap_list_t *a_add_gr
         l_add_groups_mask = dap_list_next(l_add_groups_mask);
     }
 
-    //size_t l_add_groups_num = dap_list_length(a_add_groups_mask);
     size_t l_data_size_out_main = dap_chain_global_db_driver_count(GROUP_LOCAL_HISTORY, first_id);
     size_t *l_data_size_out_add_items = DAP_NEW_Z_SIZE(size_t, sizeof(size_t) * l_add_groups_num);
     uint64_t *l_group_last_id = DAP_NEW_Z_SIZE(uint64_t, sizeof(uint64_t) * l_add_groups_num);
@@ -1366,9 +1367,6 @@ dap_db_log_list_t* dap_db_log_list_start(uint64_t first_id, dap_list_t *a_add_gr
     }
     if(!(l_data_size_out_main + l_data_size_out_add_items_count))
         return NULL;
-    // debug
-//    if(l_data_size_out>11)
-//        l_data_size_out = 11;
     l_dap_db_log_list->item_start = first_id;
     l_dap_db_log_list->item_last = first_id + l_data_size_out_main;
     l_dap_db_log_list->items_number_main = l_data_size_out_main;
@@ -1381,31 +1379,9 @@ dap_db_log_list_t* dap_db_log_list_start(uint64_t first_id, dap_list_t *a_add_gr
     l_dap_db_log_list->group_names = l_group_names;
     l_dap_db_log_list->group_cur = -1;
     l_dap_db_log_list->add_groups = a_add_groups_mask;
-    // there are too few items, read items right now
-    if(0) {//l_data_size_out <= 10) {
-        dap_list_t *l_list = NULL;
-        // read first items
-        size_t l_objs_count = 0;
-        dap_store_obj_t *l_objs = dap_chain_global_db_cond_load(GROUP_LOCAL_HISTORY, first_id, &l_objs_count);
-        for(size_t i = 0; i < l_objs_count; i++) {
-            dap_store_obj_t *l_obj_cur = l_objs + i;
-            dap_global_db_obj_t *l_item = DAP_NEW(dap_global_db_obj_t);
-            l_item->id = l_obj_cur->id;
-            l_item->key = dap_strdup(l_obj_cur->key);
-            l_item->value = (uint8_t*) dap_strdup((char*) l_obj_cur->value);
-            l_list = dap_list_append(l_list, l_item);
-        }
-        l_dap_db_log_list->list_write = l_list;
-        l_dap_db_log_list->list_read = l_list;
-        //log_it(L_DEBUG, "loaded items n=%d", l_data_size_out);
-        dap_store_obj_free(l_objs, l_objs_count);
-    }
-    // start thread for items loading
-    else {
-        l_dap_db_log_list->is_process = true;
-        pthread_mutex_init(&l_dap_db_log_list->list_mutex, NULL);
-        pthread_create(&l_dap_db_log_list->thread, NULL, s_list_thread_proc, l_dap_db_log_list);
-    }
+    l_dap_db_log_list->is_process = true;
+    pthread_mutex_init(&l_dap_db_log_list->list_mutex, NULL);
+    pthread_create(&l_dap_db_log_list->thread, NULL, s_list_thread_proc, l_dap_db_log_list);
     return l_dap_db_log_list;
 }
 
diff --git a/modules/global-db/include/dap_chain_global_db.h b/modules/global-db/include/dap_chain_global_db.h
index 4ac3c5a9a..76ae06271 100644
--- a/modules/global-db/include/dap_chain_global_db.h
+++ b/modules/global-db/include/dap_chain_global_db.h
@@ -119,7 +119,7 @@ char* dap_chain_global_db_hash(const uint8_t *data, size_t data_size);
 char* dap_chain_global_db_hash_fast(const uint8_t *data, size_t data_size);
 
 // Get data according the history log
-uint8_t* dap_db_log_pack(dap_global_db_obj_t *a_obj, size_t *a_data_size_out);
+dap_list_t* dap_db_log_pack(dap_global_db_obj_t *a_obj, size_t *a_data_size_out);
 
 // Get data according the history log
 //char* dap_db_history_tx(dap_chain_hash_fast_t * a_tx_hash, const char *a_group_mempool);
diff --git a/modules/global-db/include/dap_chain_global_db_driver.h b/modules/global-db/include/dap_chain_global_db_driver.h
index c8d7ef9c5..4bccdc9e0 100644
--- a/modules/global-db/include/dap_chain_global_db_driver.h
+++ b/modules/global-db/include/dap_chain_global_db_driver.h
@@ -29,6 +29,8 @@
 #include "dap_common.h"
 #include "dap_list.h"
 
+#define DAP_CHAIN_PKT_EXPECT_SIZE 7168
+
 typedef struct dap_store_obj {
 	uint64_t id;
     time_t timestamp;
@@ -42,8 +44,9 @@ typedef struct dap_store_obj {
 }DAP_ALIGN_PACKED dap_store_obj_t, *pdap_store_obj_t;
 
 typedef struct dap_store_obj_pkt {
-	time_t timestamp;
-	size_t data_size;
+    uint64_t timestamp;
+    uint64_t data_size;
+    uint32_t obj_count;
 	uint8_t data[];
 }__attribute__((packed)) dap_store_obj_pkt_t;
 
@@ -90,7 +93,7 @@ bool dap_chain_global_db_driver_is(const char *a_group, const char *a_key);
 size_t dap_chain_global_db_driver_count(const char *a_group, uint64_t id);
 dap_list_t* dap_chain_global_db_driver_get_groups_by_mask(const char *a_group_mask);
 
-dap_store_obj_pkt_t *dap_store_packet_multiple(pdap_store_obj_t a_store_obj,
+dap_list_t *dap_store_packet_multiple(pdap_store_obj_t a_store_obj,
 		time_t a_timestamp, size_t a_store_obj_count);
 dap_store_obj_t *dap_store_unpacket_multiple(const dap_store_obj_pkt_t *a_pkt,
 		size_t *a_store_obj_count);
diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c
index 8d2e991ab..3dba36268 100644
--- a/modules/net/dap_chain_net.c
+++ b/modules/net/dap_chain_net.c
@@ -268,7 +268,9 @@ void dap_chain_net_sync_gdb_broadcast(void *a_arg, const char a_op_code, const c
         l_obj->type = (uint8_t)a_op_code;
         DAP_DELETE(l_obj->group);
         l_obj->group = dap_strdup(a_group);
-        dap_store_obj_pkt_t *l_data_out = dap_store_packet_multiple(l_obj, l_obj->timestamp, 1);
+        dap_list_t *l_list_out = dap_store_packet_multiple(l_obj, l_obj->timestamp, 1);
+        // Expect only one element in list
+        dap_store_obj_pkt_t *l_data_out = (dap_store_obj_pkt_t *)l_list_out->data;
         dap_store_obj_free(l_obj, 1);
         dap_chain_t *l_chain = dap_chain_net_get_chain_by_name(l_net, "gdb");
         dap_chain_id_t l_chain_id = l_chain ? l_chain->id : (dap_chain_id_t) {};
@@ -278,7 +280,7 @@ void dap_chain_net_sync_gdb_broadcast(void *a_arg, const char a_op_code, const c
             dap_stream_ch_chain_pkt_write_unsafe(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB, l_net->pub.id,
                                           l_chain_id, l_net->pub.cell_id, l_data_out, sizeof(dap_store_obj_pkt_t) + l_data_out->data_size);
         }
-        DAP_DELETE(l_data_out);
+        dap_list_free_full(l_list_out, free);
     }
 }
 
-- 
GitLab


From 7a48933efa5c1bcd9930c8cd19a974552919c15f Mon Sep 17 00:00:00 2001
From: Roman Khlopkov <roman.khlopkov@demlabs.net>
Date: Fri, 4 Sep 2020 19:45:29 +0300
Subject: [PATCH 2/4] [*] Infinite cycle in root role for sync fixed

---
 modules/net/dap_chain_net.c | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c
index 3dba36268..bbca88442 100644
--- a/modules/net/dap_chain_net.c
+++ b/modules/net/dap_chain_net.c
@@ -391,10 +391,10 @@ static int s_net_states_proc(dap_chain_net_t * l_net)
                 case NODE_ROLE_ARCHIVE:
                 case NODE_ROLE_CELL_MASTER: {
                     // Add other root nodes as synchronization links
-                    while (dap_list_length(l_pvt_net->links_info) < s_max_links_count) {
-                        int i = rand() % l_pvt_net->seed_aliases_count;
+                    for (int i = 0; i < MIN(s_max_links_count, l_pvt_net->seed_aliases_count); i++) {
                         dap_chain_node_addr_t *l_link_addr = dap_chain_node_alias_find(l_net, l_pvt_net->seed_aliases[i]);
-                        dap_chain_node_info_read(l_net, l_link_addr);
+                        dap_chain_node_info_t *l_link_node_info = dap_chain_node_info_read(l_net, l_link_addr);
+                        l_pvt_net->links_info = dap_list_append(l_pvt_net->links_info, l_link_node_info);
                     }
                 } break;
                 case NODE_ROLE_FULL:
-- 
GitLab


From b1f5aa151c96e6a6e7ace59ac63e28feb319a976 Mon Sep 17 00:00:00 2001
From: Roman Khlopkov <roman.khlopkov@demlabs.net>
Date: Sat, 5 Sep 2020 13:46:18 +0300
Subject: [PATCH 3/4] [*] Small bugfix with sync

---
 modules/channel/chain/dap_stream_ch_chain.c | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c
index aea46e12c..2077b7976 100644
--- a/modules/channel/chain/dap_stream_ch_chain.c
+++ b/modules/channel/chain/dap_stream_ch_chain.c
@@ -138,8 +138,8 @@ bool s_sync_chains_callback(dap_proc_thread_t *a_thread, void *a_arg)
     dap_chain_atom_iter_t* l_iter = l_chain->callback_atom_iter_create(l_chain);
     l_ch_chain->request_atom_iter = l_iter;
     l_lasts = l_chain->callback_atom_iter_get_lasts(l_iter, &l_lasts_count, &l_lasts_sizes);
-    if(l_lasts&& l_lasts_sizes) {
-        for(size_t i = l_lasts_count - 1; i >= 0; i--) {
+    if (l_lasts && l_lasts_sizes) {
+        for(long int i = l_lasts_count - 1; i >= 0; i--) {
             dap_chain_atom_item_t * l_item = NULL;
             dap_chain_hash_fast_t l_atom_hash;
             dap_hash_fast(l_lasts[i], l_lasts_sizes[i],
@@ -559,11 +559,15 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
                 dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB, l_chain_pkt->hdr.net_id,
                                               l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, &l_sync_gdb, sizeof(l_sync_gdb));
             }
+                break;
             case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS_RVRS: {
                 dap_stream_ch_chain_sync_request_t l_sync_chains = {};
                 dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS, l_chain_pkt->hdr.net_id,
                                               l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, &l_sync_chains, sizeof(l_sync_chains));
             }
+                break;
+            case DAP_STREAM_CH_CHAIN_PKT_TYPE_ERROR:
+                break;
             default: {
                 dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id,
                                                     l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
-- 
GitLab


From f23bbc3c061acb1fa3d7e97a4f2dbea8b8d74573 Mon Sep 17 00:00:00 2001
From: Roman Khlopkov <roman.khlopkov@demlabs.net>
Date: Mon, 7 Sep 2020 20:46:50 +0300
Subject: [PATCH 4/4] [*] Fragmentation GDB packets debugged

---
 dap-sdk/net/core/dap_worker.c                 |   9 +-
 dap-sdk/net/server-udp/dap_udp_server.c       |   9 +-
 .../net/server/http_server/dap_http_folder.c  |   1 +
 .../net/server/http_server/dap_http_simple.c  |   7 +-
 .../http_server/http_client/dap_http_client.c |   4 +-
 modules/channel/chain/dap_stream_ch_chain.c   | 517 +++++++++---------
 .../chain/include/dap_stream_ch_chain.h       |   3 +-
 .../chain/include/dap_stream_ch_chain_pkt.h   |   2 +-
 modules/net/dap_chain_net.c                   |   4 +-
 modules/service/vpn/dap_chain_net_srv_vpn.c   |   2 +
 10 files changed, 282 insertions(+), 276 deletions(-)

diff --git a/dap-sdk/net/core/dap_worker.c b/dap-sdk/net/core/dap_worker.c
index 68d1b2019..b3611126c 100644
--- a/dap-sdk/net/core/dap_worker.c
+++ b/dap-sdk/net/core/dap_worker.c
@@ -256,14 +256,15 @@ void *dap_worker_thread(void *arg)
             // Socket is ready to write
             if(((l_epoll_events[n].events & EPOLLOUT) || (l_cur->flags & DAP_SOCK_READY_TO_WRITE))
                     && !(l_cur->flags & DAP_SOCK_SIGNAL_CLOSE)) {
-                //log_it(DEBUG, "Main loop output: %u bytes to send",sa_cur->buf_out_size);
+
+                //log_it(L_DEBUG, "Main loop output: %u bytes to send", l_cur->buf_out_size);
                 if(l_cur->callbacks.write_callback)
                     l_cur->callbacks.write_callback(l_cur, NULL); // Call callback to process write event
+
                 if (l_cur->worker == NULL ){ // esocket was unassigned in callback, we don't need any ops with it now,
                                              // continue to poll another esockets
                     continue;
                 }
-
                 if(l_cur->flags & DAP_SOCK_READY_TO_WRITE) {
 
                     static const uint32_t buf_out_zero_count_max = 2;
@@ -307,15 +308,11 @@ void *dap_worker_thread(void *arg)
                     if (l_errno != EAGAIN && l_errno != EWOULDBLOCK ){ // If we have non-blocking socket
                         log_it(L_ERROR, "Some error occured in send(): %s", strerror(errno));
                         l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE;
-                        break;
                     }
                 }else{
                     //log_it(L_DEBUG, "Output: %u from %u bytes are sent ", l_bytes_sent,l_cur->buf_out_size);
-                    //}
-                    //log_it(L_DEBUG,"Output: sent %u bytes",total_sent);
                     if (l_bytes_sent) {
                         l_cur->buf_out_size -= l_bytes_sent;
-                        //log_it(L_DEBUG,"Output: left %u bytes in buffer",l_cur->buf_out_size);
                         if (l_cur->buf_out_size) {
                             memmove(l_cur->buf_out, &l_cur->buf_out[l_bytes_sent], l_cur->buf_out_size);
                         } else {
diff --git a/dap-sdk/net/server-udp/dap_udp_server.c b/dap-sdk/net/server-udp/dap_udp_server.c
index d02ac6a62..26a0fc39b 100644
--- a/dap-sdk/net/server-udp/dap_udp_server.c
+++ b/dap-sdk/net/server-udp/dap_udp_server.c
@@ -182,14 +182,7 @@ static void write_cb( EPOLL_HANDLE efd, int revents, dap_server_t *sh )
     dap_events_socket_t *client = udp_client->esocket;
 
     if( client != NULL && !check_close(client) && (client->flags & DAP_SOCK_READY_TO_WRITE) ) {
-
-      if ( sh->client_callbacks.write_callback )
-        sh->client_callbacks.write_callback( client, NULL );
-
       if ( client->buf_out_size > 0 ) {
-
-
-
         struct sockaddr_in addr;
         addr.sin_family = AF_INET;
         dap_udp_client_get_address( client, (unsigned int *)&addr.sin_addr.s_addr, &addr.sin_port );
@@ -211,6 +204,8 @@ static void write_cb( EPOLL_HANDLE efd, int revents, dap_server_t *sh )
         sb_payload_ready = false;
       }
       LL_DELETE( udp->waiting_clients, udp_client );
+      if ( sh->client_callbacks.write_callback )
+        sh->client_callbacks.write_callback( client, NULL );
     }
     else if( client == NULL ) {
       LL_DELETE( udp->waiting_clients, udp_client );
diff --git a/dap-sdk/net/server/http_server/dap_http_folder.c b/dap-sdk/net/server/http_server/dap_http_folder.c
index d16e8a0d4..857f39775 100644
--- a/dap-sdk/net/server/http_server/dap_http_folder.c
+++ b/dap-sdk/net/server/http_server/dap_http_folder.c
@@ -297,6 +297,7 @@ void dap_http_folder_data_write(dap_http_client_t * cl_ht, void * arg)
     dap_http_file_t * cl_ht_file= DAP_HTTP_FILE(cl_ht);
     cl_ht->esocket->buf_out_size=fread(cl_ht->esocket->buf_out,1,sizeof(cl_ht->esocket->buf_out),cl_ht_file->fd);
     cl_ht_file->position+=cl_ht->esocket->buf_out_size;
+    dap_events_socket_set_writable_unsafe(cl_ht->esocket, true);
 
     if(feof(cl_ht_file->fd)!=0){
         log_it(L_INFO, "All the file %s is sent out",cl_ht_file->local_path);
diff --git a/dap-sdk/net/server/http_server/dap_http_simple.c b/dap-sdk/net/server/http_server/dap_http_simple.c
index 3e43bacb7..9428a6f00 100644
--- a/dap-sdk/net/server/http_server/dap_http_simple.c
+++ b/dap-sdk/net/server/http_server/dap_http_simple.c
@@ -228,14 +228,10 @@ inline static bool _is_supported_user_agents_list_setted()
 inline static void _set_only_write_http_client_state(dap_http_client_t* http_client)
 {
 //  log_it(L_DEBUG,"_set_only_write_http_client_state");
-//  Sleep(300);
-
-  http_client->esocket->flags = DAP_SOCK_READY_TO_WRITE; // To not to touch epoll_fd we clean flags by ourself
-//  http_client->state_write=DAP_HTTP_CLIENT_STATE_NONE;
 
   http_client->state_write=DAP_HTTP_CLIENT_STATE_START;
   dap_events_socket_set_writable_unsafe(http_client->esocket,true);
-//  http_client->state_write=DAP_HTTP_CLIENT_STATE_START;
+  dap_events_socket_set_readable_unsafe(http_client->esocket, false);
 }
 
 static void _copy_reply_and_mime_to_response( dap_http_simple_t *cl_sh )
@@ -380,6 +376,7 @@ static void s_http_client_data_write( dap_http_client_t * a_http_client, void *a
     l_http_simple->reply_sent += dap_events_socket_write_unsafe( a_http_client->esocket,
                                               l_http_simple->reply_byte + l_http_simple->reply_sent,
                                               a_http_client->out_content_length - l_http_simple->reply_sent );
+    dap_events_socket_set_writable_unsafe(a_http_client->esocket, true);
 
     if ( l_http_simple->reply_sent >= a_http_client->out_content_length ) {
         log_it(L_INFO, "All the reply (%u) is sent out", a_http_client->out_content_length );
diff --git a/dap-sdk/net/server/http_server/http_client/dap_http_client.c b/dap-sdk/net/server/http_server/http_client/dap_http_client.c
index b5add239f..ef122b7a5 100644
--- a/dap-sdk/net/server/http_server/http_client/dap_http_client.c
+++ b/dap-sdk/net/server/http_server/http_client/dap_http_client.c
@@ -498,7 +498,7 @@ void dap_http_client_write( dap_events_socket_t * cl, void *arg )
             log_it( L_INFO," HTTP response with %u status code", l_http_client->reply_status_code );
             dap_events_socket_write_f_unsafe(cl, "HTTP/1.1 %u %s\r\n",l_http_client->reply_status_code, l_http_client->reply_reason_phrase[0] ?
                             l_http_client->reply_reason_phrase : http_status_reason_phrase(l_http_client->reply_status_code) );
-
+            dap_events_socket_set_writable_unsafe(cl, true);
             dap_http_client_out_header_generate( l_http_client );
             l_http_client->state_write = DAP_HTTP_CLIENT_STATE_HEADERS;
         } break;
@@ -508,6 +508,7 @@ void dap_http_client_write( dap_events_socket_t * cl, void *arg )
             if ( hdr == NULL ) {
                 log_it(L_DEBUG, "Output: headers are over (reply status code %u)",l_http_client->reply_status_code);
                 dap_events_socket_write_f_unsafe(cl, "\r\n");
+                dap_events_socket_set_writable_unsafe(cl, true);
                 if ( l_http_client->out_content_length || l_http_client->out_content_ready ) {
                     l_http_client->state_write=DAP_HTTP_CLIENT_STATE_DATA;
                 } else {
@@ -520,6 +521,7 @@ void dap_http_client_write( dap_events_socket_t * cl, void *arg )
             } else {
                 //log_it(L_WARNING,"Output: header %s: %s",hdr->name,hdr->value);
                 dap_events_socket_write_f_unsafe(cl, "%s: %s\r\n", hdr->name, hdr->value);
+                dap_events_socket_set_writable_unsafe(cl, true);
                 dap_http_header_remove( &l_http_client->out_headers, hdr );
             }
         } break;
diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c
index 2077b7976..fe8116937 100644
--- a/modules/channel/chain/dap_stream_ch_chain.c
+++ b/modules/channel/chain/dap_stream_ch_chain.c
@@ -138,12 +138,12 @@ bool s_sync_chains_callback(dap_proc_thread_t *a_thread, void *a_arg)
     dap_chain_atom_iter_t* l_iter = l_chain->callback_atom_iter_create(l_chain);
     l_ch_chain->request_atom_iter = l_iter;
     l_lasts = l_chain->callback_atom_iter_get_lasts(l_iter, &l_lasts_count, &l_lasts_sizes);
+    log_it(L_INFO, "Found %d atoms for synchronization", l_lasts_count);
     if (l_lasts && l_lasts_sizes) {
         for(long int i = l_lasts_count - 1; i >= 0; i--) {
             dap_chain_atom_item_t * l_item = NULL;
             dap_chain_hash_fast_t l_atom_hash;
-            dap_hash_fast(l_lasts[i], l_lasts_sizes[i],
-                    &l_atom_hash);
+            dap_hash_fast(l_lasts[i], l_lasts_sizes[i], &l_atom_hash);
             pthread_mutex_lock(&l_ch_chain->mutex);
             HASH_FIND(hh, l_ch_chain->request_atoms_lasts, &l_atom_hash, sizeof(l_atom_hash),
                     l_item);
@@ -207,6 +207,7 @@ bool s_sync_gdb_callback(dap_proc_thread_t *a_thread, void *a_arg)
         //log_it(L_DEBUG, "Start getting items %u:%u", l_request->id_start + 1,l_db_log->items_number);//dap_list_length(l_list));
         // Add it to outgoing list
         l_ch_chain->request_global_db_trs = l_db_log;
+        l_ch_chain->db_iter = NULL;
         l_ch_chain->state = CHAIN_STATE_SYNC_GLOBAL_DB;
     } else {
         dap_stream_ch_chain_sync_request_t l_request = {};
@@ -214,13 +215,12 @@ bool s_sync_gdb_callback(dap_proc_thread_t *a_thread, void *a_arg)
         l_request.node_addr.uint64 = l_net ? dap_db_get_cur_node_addr(l_net->pub.name) : 0;
         l_request.id_start = dap_db_log_get_last_id_remote(l_ch_chain->request.node_addr.uint64);
         dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB,
-                l_ch_chain->request_net_id, l_ch_chain->request_chain_id,
-                l_ch_chain->request_cell_id, &l_request, sizeof(l_request));
+                                             l_ch_chain->request_net_id, l_ch_chain->request_chain_id,
+                                             l_ch_chain->request_cell_id, &l_request, sizeof(l_request));
         l_ch_chain->state = CHAIN_STATE_IDLE;
         if(l_ch_chain->callback_notify_packet_out)
-            l_ch_chain->callback_notify_packet_out(l_ch_chain,
-            DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB,
-            NULL, 0, l_ch_chain->callback_notify_arg);
+            l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB,
+                                                    NULL, 0, l_ch_chain->callback_notify_arg);
     }
     //log_it(L_INFO, "Prepared %u items for sync", l_db_log->items_number - l_request->id_start);
     // go to send data from list [in s_stream_ch_packet_out()]
@@ -381,91 +381,75 @@ bool s_gdb_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg)
  */
 void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
 {
-    //static char *s_net_name = NULL;
     dap_stream_ch_chain_t * l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch);
-    if(l_ch_chain) {
-        dap_stream_ch_pkt_t * l_ch_pkt = (dap_stream_ch_pkt_t *) a_arg;
-        dap_stream_ch_chain_pkt_t * l_chain_pkt = (dap_stream_ch_chain_pkt_t *) l_ch_pkt->data;
-        uint8_t l_acl_idx = dap_chain_net_acl_idx_by_id(l_chain_pkt->hdr.net_id);
-        bool l_error = false;
-        char l_err_str[64];
-        if (l_acl_idx == (uint8_t)-1) {
-            log_it(L_ERROR, "Invalid net id in packet");
-            strcpy(l_err_str, "ERROR_NET_INVALID_ID");
-            l_error = true;
-        }
-        if (!l_error && a_ch->stream->session->acl && !a_ch->stream->session->acl[l_acl_idx]) {
-            log_it(L_WARNING, "Unauthorized request attempt to network %s",
-                   dap_chain_net_by_id(l_chain_pkt->hdr.net_id)->pub.name);
-            strcpy(l_err_str, "ERROR_NET_NOT_AUTHORIZED");
-            l_error = true;
-        }
-        if (l_error) {
+    if (!l_ch_chain) {
+        return;
+    }
+    dap_stream_ch_pkt_t * l_ch_pkt = (dap_stream_ch_pkt_t *) a_arg;
+    dap_stream_ch_chain_pkt_t * l_chain_pkt = (dap_stream_ch_chain_pkt_t *) l_ch_pkt->data;
+    if (!l_chain_pkt) {
+        return;
+    }
+    size_t l_chain_pkt_data_size = l_ch_pkt->hdr.size - sizeof(l_chain_pkt->hdr);
+    uint8_t l_acl_idx = dap_chain_net_acl_idx_by_id(l_chain_pkt->hdr.net_id);
+    if (l_acl_idx == (uint8_t)-1) {
+        log_it(L_ERROR, "Invalid net id in packet");
+        if (l_ch_pkt->hdr.type == DAP_STREAM_CH_CHAIN_PKT_TYPE_ERROR) {
+            if(l_ch_chain->callback_notify_packet_in) {
+                l_ch_chain->callback_notify_packet_in(l_ch_chain, l_ch_pkt->hdr.type, l_chain_pkt,
+                                                      l_chain_pkt_data_size, l_ch_chain->callback_notify_arg);
+            }
+        } else {
             dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id,
-                    l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, l_err_str);
+                                                l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
+                                                "ERROR_NET_INVALID_ID");
             dap_stream_ch_set_ready_to_write_unsafe(a_ch, true);
         }
-        size_t l_chain_pkt_data_size = l_ch_pkt->hdr.size - sizeof(l_chain_pkt->hdr);
-        if (!l_error && l_chain_pkt) {
-            switch (l_ch_pkt->hdr.type) {
-            case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_ALL: {
-                log_it(L_INFO, "In:  SYNCED_ALL pkt");
-            }
-                break;
-            case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB: {
-                log_it(L_INFO, "In:  SYNCED_GLOBAL_DB pkt");
-            }
-                break;
-            case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB_GROUP: {
-                log_it(L_INFO, "In:  SYNCED_GLOBAL_DB_GROUP pkt");
-            }
-                break;
-            case DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB_GROUP: {
-                log_it(L_INFO, "In:  SYNCED_GLOBAL_DB_GROUP pkt");
-            }
-                break;
+        return;
+    }
+    if (a_ch->stream->session->acl && !a_ch->stream->session->acl[l_acl_idx]) {
+        log_it(L_WARNING, "Unauthorized request attempt to network %s",
+               dap_chain_net_by_id(l_chain_pkt->hdr.net_id)->pub.name);
+        dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id,
+                                            l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
+                                            "ERROR_NET_NOT_AUTHORIZED");
+        dap_stream_ch_set_ready_to_write_unsafe(a_ch, true);
+        return;
+    }
+    switch (l_ch_pkt->hdr.type) {
+    case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_ALL: {
+        log_it(L_INFO, "In:  SYNCED_ALL pkt");
+    }
+        break;
+    case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB: {
+        log_it(L_INFO, "In:  SYNCED_GLOBAL_DB pkt");
+    }
+        break;
+    case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB_GROUP: {
+        log_it(L_INFO, "In:  SYNCED_GLOBAL_DB_GROUP pkt");
+    }
+        break;
+    case DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB_GROUP: {
+        log_it(L_INFO, "In:  SYNCED_GLOBAL_DB_GROUP pkt");
+    }
+        break;
 
-            case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS: {
-                log_it(L_INFO, "In:  SYNCED_CHAINS pkt");
-            }
-                break;
-            case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS: {
-                log_it(L_INFO, "In:  SYNC_CHAINS pkt");
-                dap_chain_t * l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id);
-                if(l_chain) {
-                    if(l_ch_chain->state != CHAIN_STATE_IDLE) {
-                        log_it(L_INFO, "Can't process SYNC_CHAINS request because not in idle state");
-                        dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id,
-                                l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
-                                "ERROR_STATE_NOT_IN_IDLE");
-                        dap_stream_ch_set_ready_to_write_unsafe(a_ch, true);
-                    } else {
-                        // fill ids
-                        if(l_chain_pkt_data_size == sizeof(dap_stream_ch_chain_sync_request_t)) {
-                            dap_stream_ch_chain_sync_request_t * l_request =
-                                    (dap_stream_ch_chain_sync_request_t *) l_chain_pkt->data;
-                            memcpy(&l_ch_chain->request, l_request, l_chain_pkt_data_size);
-                            memcpy(&l_ch_chain->request_cell_id, &l_chain_pkt->hdr.cell_id, sizeof(dap_chain_cell_id_t));
-                            memcpy(&l_ch_chain->request_net_id, &l_chain_pkt->hdr.net_id, sizeof(dap_chain_net_id_t));
-                            memcpy(&l_ch_chain->request_chain_id, &l_chain_pkt->hdr.chain_id, sizeof(dap_chain_id_t));
-                    }
-                        dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker);
-                        dap_proc_queue_add_callback(a_ch->stream_worker->worker->proc_queue, s_sync_chains_callback, a_ch);
-                    }
-                }
-            }
-                break;
-            case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB: {
-                log_it(L_INFO, "In:  SYNC_GLOBAL_DB pkt");
-                if(l_ch_chain->state != CHAIN_STATE_IDLE) {
-                    log_it(L_INFO, "Can't process SYNC_GLOBAL_DB request because not in idle state");
-                    dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id,
-                            l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
-                            "ERROR_STATE_NOT_IN_IDLE");
-                    dap_stream_ch_set_ready_to_write_unsafe(a_ch, true);
-                    break;
-                }
-                // receive the latest global_db revision of the remote node -> go to send mode
+    case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS: {
+        log_it(L_INFO, "In:  SYNCED_CHAINS pkt");
+    }
+        break;
+    case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS: {
+        log_it(L_INFO, "In:  SYNC_CHAINS pkt");
+        dap_chain_t * l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id);
+        if(l_chain) {
+            if(l_ch_chain->state != CHAIN_STATE_IDLE) {
+                log_it(L_INFO, "Can't process SYNC_CHAINS request because not in idle state");
+                dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id,
+                        l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
+                        "ERROR_STATE_NOT_IN_IDLE");
+                dap_stream_ch_set_ready_to_write_unsafe(a_ch, true);
+            } else {
+                // fill ids
                 if(l_chain_pkt_data_size == sizeof(dap_stream_ch_chain_sync_request_t)) {
                     dap_stream_ch_chain_sync_request_t * l_request =
                             (dap_stream_ch_chain_sync_request_t *) l_chain_pkt->data;
@@ -473,114 +457,138 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
                     memcpy(&l_ch_chain->request_cell_id, &l_chain_pkt->hdr.cell_id, sizeof(dap_chain_cell_id_t));
                     memcpy(&l_ch_chain->request_net_id, &l_chain_pkt->hdr.net_id, sizeof(dap_chain_net_id_t));
                     memcpy(&l_ch_chain->request_chain_id, &l_chain_pkt->hdr.chain_id, sizeof(dap_chain_id_t));
-                    dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker);
-                    dap_proc_queue_add_callback(a_ch->stream_worker->worker->proc_queue, s_sync_gdb_callback, a_ch);
-                }
-                else {
-                    log_it(L_ERROR, "Get DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB session_id=%u bad request",
-                            a_ch->stream->session->id);
-                    dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id,
-                            l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
-                            "ERROR_SYNC_GLOBAL_DB_REQUEST_BAD");
-                    dap_stream_ch_set_ready_to_write_unsafe(a_ch, true);
-                }
             }
-                break;
-                // first packet of data with source node address
-            case DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_CHAIN: {
-                log_it(L_INFO, "In: FIRST_CHAIN data_size=%d", l_chain_pkt_data_size);
-                if(l_chain_pkt_data_size == sizeof(dap_chain_node_addr_t))
-                    memcpy(&l_ch_chain->request.node_addr, l_chain_pkt->data, l_chain_pkt_data_size);
+                dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker);
+                dap_proc_queue_add_callback(a_ch->stream_worker->worker->proc_queue, s_sync_chains_callback, a_ch);
             }
-                break;
-            case DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN: {
-                //log_it(L_INFO, "In: CHAIN pkt");
-                dap_chain_t * l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id);
-                if(l_chain) {
-                    // Expect atom element in
-                    if(l_chain_pkt_data_size > 0) {
-                        memcpy(&l_ch_chain->request_net_id, &l_chain_pkt->hdr.net_id, sizeof(dap_chain_net_id_t));
-                        memcpy(&l_ch_chain->request_chain_id, &l_chain_pkt->hdr.chain_id, sizeof(dap_chain_id_t));
-                        memcpy(&l_ch_chain->request_cell_id, &l_chain_pkt->hdr.cell_id, sizeof(dap_chain_cell_id_t));
-                        l_ch_chain->pkt_data = DAP_CALLOC(1, l_chain_pkt_data_size);
-                        memcpy(l_ch_chain->pkt_data, l_chain_pkt->data, l_chain_pkt_data_size);
-                        l_ch_chain->pkt_data_size = l_chain_pkt_data_size;
-                        dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker);
-                        dap_proc_queue_add_callback(a_ch->stream_worker->worker->proc_queue, s_chain_pkt_callback, a_ch);
-                    } else {
-                        log_it(L_WARNING, "Empty chain packet");
-                        dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id,
-                                l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
-                                "ERROR_CHAIN_PACKET_EMPTY");
-                        dap_stream_ch_set_ready_to_write_unsafe(a_ch, true);
-                    }
-                }
-            }
-                break;
-                // first packet of data with source node address
-            case DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB: {
-                log_it(L_INFO, "In: FIRST_GLOBAL_DB data_size=%d", l_chain_pkt_data_size);
-                if(l_chain_pkt_data_size == sizeof(dap_chain_node_addr_t))
-                    memcpy(&l_ch_chain->request.node_addr, l_chain_pkt->data, l_chain_pkt_data_size);
-            }
-                break;
-            case DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB: {
-                //log_it(L_INFO, "In: GLOBAL_DB data_size=%d", l_chain_pkt_data_size);
-                // get transaction and save it to global_db
-                if(l_chain_pkt_data_size > 0) {
-                    memcpy(&l_ch_chain->request_net_id, &l_chain_pkt->hdr.net_id, sizeof(dap_chain_net_id_t));
-                    memcpy(&l_ch_chain->request_chain_id, &l_chain_pkt->hdr.chain_id, sizeof(dap_chain_id_t));
-                    memcpy(&l_ch_chain->request_cell_id, &l_chain_pkt->hdr.cell_id, sizeof(dap_chain_cell_id_t));
-                    l_ch_chain->pkt_data = DAP_CALLOC(1, l_chain_pkt_data_size);
-                    memcpy(l_ch_chain->pkt_data, l_chain_pkt->data, l_chain_pkt_data_size);
-                    l_ch_chain->pkt_data_size = l_chain_pkt_data_size;
-                    dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker);
-                    dap_proc_queue_add_callback(a_ch->stream_worker->worker->proc_queue, s_gdb_pkt_callback, a_ch);
-                } else {
-                    log_it(L_WARNING, "Packet with GLOBAL_DB atom has zero body size");
-                    dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id,
-                            l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
-                            "ERROR_GLOBAL_DB_PACKET_EMPTY");
-                    dap_stream_ch_set_ready_to_write_unsafe(a_ch, true);
-                }
-            }
-                break;
-            case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB_RVRS: {
-                dap_stream_ch_chain_sync_request_t l_sync_gdb = {};
-                memcpy(&l_sync_gdb, l_chain_pkt->data, l_chain_pkt_data_size);
-                dap_chain_net_t *l_net = dap_chain_net_by_id(l_chain_pkt->hdr.net_id);
-                l_sync_gdb.node_addr.uint64 = dap_chain_net_get_cur_addr(l_net) ?
-                                                  dap_chain_net_get_cur_addr(l_net)->uint64 :
-                                                  dap_db_get_cur_node_addr(l_net->pub.name);
-                // Get last timestamp in log
-                l_sync_gdb.id_start = (uint64_t) dap_db_log_get_last_id_remote(l_ch_chain->request.node_addr.uint64);
-                // no limit
-                l_sync_gdb.id_end = (uint64_t)0;
-                dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB, l_chain_pkt->hdr.net_id,
-                                              l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, &l_sync_gdb, sizeof(l_sync_gdb));
-            }
-                break;
-            case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS_RVRS: {
-                dap_stream_ch_chain_sync_request_t l_sync_chains = {};
-                dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS, l_chain_pkt->hdr.net_id,
-                                              l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, &l_sync_chains, sizeof(l_sync_chains));
-            }
-                break;
-            case DAP_STREAM_CH_CHAIN_PKT_TYPE_ERROR:
-                break;
-            default: {
+        }
+    }
+        break;
+    case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB: {
+        log_it(L_INFO, "In:  SYNC_GLOBAL_DB pkt");
+        if(l_ch_chain->state != CHAIN_STATE_IDLE) {
+            log_it(L_INFO, "Can't process SYNC_GLOBAL_DB request because not in idle state");
+            dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id,
+                    l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
+                    "ERROR_STATE_NOT_IN_IDLE");
+            dap_stream_ch_set_ready_to_write_unsafe(a_ch, true);
+            break;
+        }
+        // receive the latest global_db revision of the remote node -> go to send mode
+        if(l_chain_pkt_data_size == sizeof(dap_stream_ch_chain_sync_request_t)) {
+            dap_stream_ch_chain_sync_request_t * l_request =
+                    (dap_stream_ch_chain_sync_request_t *) l_chain_pkt->data;
+            memcpy(&l_ch_chain->request, l_request, l_chain_pkt_data_size);
+            memcpy(&l_ch_chain->request_cell_id, &l_chain_pkt->hdr.cell_id, sizeof(dap_chain_cell_id_t));
+            memcpy(&l_ch_chain->request_net_id, &l_chain_pkt->hdr.net_id, sizeof(dap_chain_net_id_t));
+            memcpy(&l_ch_chain->request_chain_id, &l_chain_pkt->hdr.chain_id, sizeof(dap_chain_id_t));
+            dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker);
+            dap_proc_queue_add_callback(a_ch->stream_worker->worker->proc_queue, s_sync_gdb_callback, a_ch);
+        }
+        else {
+            log_it(L_ERROR, "Get DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB session_id=%u bad request",
+                    a_ch->stream->session->id);
+            dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id,
+                    l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
+                    "ERROR_SYNC_GLOBAL_DB_REQUEST_BAD");
+            dap_stream_ch_set_ready_to_write_unsafe(a_ch, true);
+        }
+    }
+        break;
+        // first packet of data with source node address
+    case DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_CHAIN: {
+        log_it(L_INFO, "In: FIRST_CHAIN data_size=%d", l_chain_pkt_data_size);
+        if(l_chain_pkt_data_size == sizeof(dap_chain_node_addr_t))
+            memcpy(&l_ch_chain->request.node_addr, l_chain_pkt->data, l_chain_pkt_data_size);
+    }
+        break;
+    case DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN: {
+        log_it(L_INFO, "In: CHAIN pkt data_size=%d", l_chain_pkt_data_size);
+        dap_chain_t * l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id);
+        if(l_chain) {
+            // Expect atom element in
+            if(l_chain_pkt_data_size > 0) {
+                memcpy(&l_ch_chain->request_net_id, &l_chain_pkt->hdr.net_id, sizeof(dap_chain_net_id_t));
+                memcpy(&l_ch_chain->request_chain_id, &l_chain_pkt->hdr.chain_id, sizeof(dap_chain_id_t));
+                memcpy(&l_ch_chain->request_cell_id, &l_chain_pkt->hdr.cell_id, sizeof(dap_chain_cell_id_t));
+                l_ch_chain->pkt_data = DAP_CALLOC(1, l_chain_pkt_data_size);
+                memcpy(l_ch_chain->pkt_data, l_chain_pkt->data, l_chain_pkt_data_size);
+                l_ch_chain->pkt_data_size = l_chain_pkt_data_size;
+                dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker);
+                dap_proc_queue_add_callback(a_ch->stream_worker->worker->proc_queue, s_chain_pkt_callback, a_ch);
+            } else {
+                log_it(L_WARNING, "Empty chain packet");
                 dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id,
-                                                    l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
-                                                    "ERROR_UNKNOWN_CHAIN_PKT_TYPE");
-                }
+                        l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
+                        "ERROR_CHAIN_PACKET_EMPTY");
+                dap_stream_ch_set_ready_to_write_unsafe(a_ch, true);
             }
-            if(l_ch_chain->callback_notify_packet_in)
-                l_ch_chain->callback_notify_packet_in(l_ch_chain, l_ch_pkt->hdr.type, l_chain_pkt,
-                                                      l_chain_pkt_data_size, l_ch_chain->callback_notify_arg);
         }
     }
+        break;
+        // first packet of data with source node address
+    case DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB: {
+        log_it(L_INFO, "In: FIRST_GLOBAL_DB data_size=%d", l_chain_pkt_data_size);
+        if(l_chain_pkt_data_size == sizeof(dap_chain_node_addr_t))
+            memcpy(&l_ch_chain->request.node_addr, l_chain_pkt->data, l_chain_pkt_data_size);
+    }
+        break;
+    case DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB: {
+        log_it(L_INFO, "In: GLOBAL_DB data_size=%d", l_chain_pkt_data_size);
+        // get transaction and save it to global_db
+        if(l_chain_pkt_data_size > 0) {
+            memcpy(&l_ch_chain->request_net_id, &l_chain_pkt->hdr.net_id, sizeof(dap_chain_net_id_t));
+            memcpy(&l_ch_chain->request_chain_id, &l_chain_pkt->hdr.chain_id, sizeof(dap_chain_id_t));
+            memcpy(&l_ch_chain->request_cell_id, &l_chain_pkt->hdr.cell_id, sizeof(dap_chain_cell_id_t));
+            l_ch_chain->pkt_data = DAP_CALLOC(1, l_chain_pkt_data_size);
+            memcpy(l_ch_chain->pkt_data, l_chain_pkt->data, l_chain_pkt_data_size);
+            l_ch_chain->pkt_data_size = l_chain_pkt_data_size;
+            dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker);
+            dap_proc_queue_add_callback(a_ch->stream_worker->worker->proc_queue, s_gdb_pkt_callback, a_ch);
+        } else {
+            log_it(L_WARNING, "Packet with GLOBAL_DB atom has zero body size");
+            dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id,
+                    l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
+                    "ERROR_GLOBAL_DB_PACKET_EMPTY");
+            dap_stream_ch_set_ready_to_write_unsafe(a_ch, true);
+        }
+    }
+        break;
+    case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB_RVRS: {
+        dap_stream_ch_chain_sync_request_t l_sync_gdb = {};
+        memcpy(&l_sync_gdb, l_chain_pkt->data, l_chain_pkt_data_size);
+        dap_chain_net_t *l_net = dap_chain_net_by_id(l_chain_pkt->hdr.net_id);
+        l_sync_gdb.node_addr.uint64 = dap_chain_net_get_cur_addr(l_net) ?
+                                          dap_chain_net_get_cur_addr(l_net)->uint64 :
+                                          dap_db_get_cur_node_addr(l_net->pub.name);
+        // Get last timestamp in log
+        l_sync_gdb.id_start = (uint64_t) dap_db_log_get_last_id_remote(l_ch_chain->request.node_addr.uint64);
+        // no limit
+        l_sync_gdb.id_end = (uint64_t)0;
+        dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB, l_chain_pkt->hdr.net_id,
+                                      l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, &l_sync_gdb, sizeof(l_sync_gdb));
+    }
+        break;
+    case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS_RVRS: {
+        dap_stream_ch_chain_sync_request_t l_sync_chains = {};
+        dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS, l_chain_pkt->hdr.net_id,
+                                      l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, &l_sync_chains, sizeof(l_sync_chains));
+    }
+        break;
+    case DAP_STREAM_CH_CHAIN_PKT_TYPE_ERROR:
+        break;
+    default: {
+        dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id,
+                                            l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
+                                            "ERROR_UNKNOWN_CHAIN_PKT_TYPE");
+        }
+    }
+    if(l_ch_chain->callback_notify_packet_in)
+        l_ch_chain->callback_notify_packet_in(l_ch_chain, l_ch_pkt->hdr.type, l_chain_pkt,
+                                              l_chain_pkt_data_size, l_ch_chain->callback_notify_arg);
 }
 
+
 /**
  * @brief dap_stream_ch_chain_go_idle
  * @param a_ch_chain
@@ -607,6 +615,29 @@ void dap_stream_ch_chain_go_idle ( dap_stream_ch_chain_t * a_ch_chain)
     pthread_mutex_unlock(&a_ch_chain->mutex);
 }
 
+bool s_process_gdb_iter(dap_stream_ch_t *a_ch)
+{
+    dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch);
+    dap_db_log_list_t *l_db_list = l_ch_chain->request_global_db_trs;
+    dap_store_obj_pkt_t *l_pkt = (dap_store_obj_pkt_t *)l_ch_chain->db_iter->data;
+    uint32_t l_pkt_size = sizeof(dap_store_obj_pkt_t) + l_pkt->data_size;
+    log_it(L_INFO, "Send one global_db record packet len=%d (rest=%d/%d items)", l_pkt_size,
+           dap_db_log_list_get_count_rest(l_db_list), dap_db_log_list_get_count(l_db_list));
+    dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB,
+                                         l_ch_chain->request_net_id, l_ch_chain->request_chain_id,
+                                         l_ch_chain->request_cell_id, l_pkt, l_pkt_size);
+    dap_list_t *l_iter = dap_list_next(l_ch_chain->db_iter);
+    if (l_iter) {
+        l_ch_chain->db_iter = l_iter;
+    } else {
+        l_ch_chain->stats_request_gdb_processed++;
+        l_ch_chain->db_iter = dap_list_first(l_ch_chain->db_iter);
+        dap_list_free_full(l_ch_chain->db_iter, free);
+        l_ch_chain->db_iter = NULL;
+    }
+    return true;
+}
+
 bool s_out_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg)
 {
     UNUSED(a_thread);
@@ -617,81 +648,56 @@ bool s_out_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg)
     //  log_it( L_DEBUG,"l_ch_chain %X", l_ch_chain );
     bool l_packet_out = false;
     switch (l_ch_chain->state) {
+
         case CHAIN_STATE_IDLE: {
             dap_stream_ch_chain_go_idle(l_ch_chain);
         } break;
 
-        case CHAIN_STATE_SYNC_ALL:
-
         case CHAIN_STATE_SYNC_GLOBAL_DB: {
-
-            // Get log diff
-            dap_db_log_list_t *l_db_list = l_ch_chain->request_global_db_trs;
-            dap_global_db_obj_t *l_obj = dap_db_log_list_get(l_db_list);
-
-            bool l_is_stop = true;
-            while(l_obj) {
-                size_t l_item_size_out = 0;
-                dap_list_t *l_item = dap_db_log_pack(l_obj, &l_item_size_out);
-                // Item not found, maybe it has deleted? Then go to the next item
-                if(!l_item || !l_item_size_out) {
-                    l_item_size_out = 0;
-                    l_obj = dap_db_log_list_get(l_db_list);
-                }
-                else {
-                    /*size_t l_items_total = dap_db_log_list_get_count(l_db_list);
-                    size_t l_items_rest = dap_db_log_list_get_count_rest(l_db_list);
-                    log_it(L_INFO, "Send one global_db record data=0x%x len=%d (rest=%d/%d items)", l_item, l_item_size_out,
-                            l_items_rest, l_items_total);*/
-                    for (dap_list_t *l_iter = l_item; l_iter; l_iter = dap_list_next(l_iter)) {
-                        dap_store_obj_pkt_t *l_pkt = (dap_store_obj_pkt_t *)l_iter->data;
-                        uint32_t l_pkt_size = sizeof(dap_store_obj_pkt_t) + l_pkt->data_size;
-                        dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB,
-                                                             l_ch_chain->request_net_id, l_ch_chain->request_chain_id,
-                                                             l_ch_chain->request_cell_id, l_pkt, l_pkt_size);
+            if (l_ch_chain->db_iter) {
+                l_packet_out = s_process_gdb_iter(l_ch);
+            } else {
+                dap_global_db_obj_t *l_obj;
+                do { // Get log diff
+                    size_t l_item_size_out = 0;
+                    l_obj = dap_db_log_list_get(l_ch_chain->request_global_db_trs);
+                    l_ch_chain->db_iter = dap_db_log_pack(l_obj, &l_item_size_out);
+                    if (l_ch_chain->db_iter && l_item_size_out) {
+                        break;
                     }
+                    // Item not found, maybe it has deleted? Then go to the next item
+                } while (l_obj);
+                if (l_ch_chain->db_iter) {
+                    l_packet_out = s_process_gdb_iter(l_ch);
+                } else {
+                    //log_it(L_DEBUG, "l_obj == 0, STOP");
+                    // free log list
+                    dap_db_log_list_delete(l_ch_chain->request_global_db_trs);
+                    l_ch_chain->request_global_db_trs = NULL;
+                    // last message
+                    dap_stream_ch_chain_sync_request_t l_request = {};
+                    dap_chain_net_t *l_net = dap_chain_net_by_id(l_ch_chain->request_net_id);
+                    l_request.node_addr.uint64 = l_net ? dap_db_get_cur_node_addr(l_net->pub.name) : 0;
+                    l_request.id_start = dap_db_log_get_last_id_remote(l_ch_chain->request.node_addr.uint64);
+                    l_request.id_end = 0;
+
+                    log_it( L_DEBUG,"Syncronized database:  last id %llu, items syncronyzed %llu ", l_request.id_start,
+                            l_ch_chain->stats_request_gdb_processed );
+
+                    dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB,
+                            l_ch_chain->request_net_id, l_ch_chain->request_chain_id,
+                            l_ch_chain->request_cell_id, &l_request, sizeof(l_request));
                     l_packet_out = true;
-                    l_ch_chain->stats_request_gdb_processed++;
-                    dap_list_free_full(l_item, free);
-                    // sent the record, another will be sent
-                    l_is_stop = false;
-                    break;
-                }
-            }
 
-            if(l_is_stop){
-                //log_it(L_DEBUG, "l_obj == 0, STOP");
-                // free log list
-                l_ch_chain->request_global_db_trs = NULL;
-                dap_db_log_list_delete(l_db_list);
-                // last message
-                dap_stream_ch_chain_sync_request_t l_request = {};
-                dap_chain_net_t *l_net = dap_chain_net_by_id(l_ch_chain->request_net_id);
-                l_request.node_addr.uint64 = l_net ? dap_db_get_cur_node_addr(l_net->pub.name) : 0;
-                l_request.id_start = dap_db_log_get_last_id_remote(l_ch_chain->request.node_addr.uint64);
-                l_request.id_end = 0;
-
-                log_it( L_DEBUG,"Syncronized database:  last id %llu, items syncronyzed %llu ", l_request.id_start,
-                        l_ch_chain->stats_request_gdb_processed );
-
-                dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB,
-                        l_ch_chain->request_net_id, l_ch_chain->request_chain_id,
-                        l_ch_chain->request_cell_id, &l_request, sizeof(l_request));
-                l_packet_out = true;
-
-                if(l_ch_chain->callback_notify_packet_out)
-                    l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB,
-                    NULL, 0, l_ch_chain->callback_notify_arg);
-
-                if(l_ch_chain->state != CHAIN_STATE_SYNC_ALL)
+                    if(l_ch_chain->callback_notify_packet_out)
+                        l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB,
+                        NULL, 0, l_ch_chain->callback_notify_arg);
                     dap_stream_ch_chain_go_idle(l_ch_chain);
+                }
             }
+        } break;
 
-        }
-        if(l_ch_chain->state != CHAIN_STATE_SYNC_ALL)
-            break;
-
-            // Synchronize chains
+        // Synchronize chains
         case CHAIN_STATE_SYNC_CHAINS: {
             //log_it(L_DEBUG, "CHAIN_STATE_SYNC_CHAINS");
             dap_chain_t * l_chain = dap_chain_find_by_id(l_ch_chain->request_net_id, l_ch_chain->request_chain_id);
@@ -730,6 +736,7 @@ bool s_out_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg)
 
                         // Then flush it out to the remote
                         size_t l_atom_size = l_atom_item->atom_size;
+                        log_it(L_INFO, "Send one chain packet len=%d", l_atom_size);
                         dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN, l_ch_chain->request_net_id,
                                                              l_ch_chain->request_chain_id, l_ch_chain->request_cell_id,
                                                              l_atom_item->atom, l_atom_size);
@@ -766,8 +773,8 @@ bool s_out_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg)
                 }
                 pthread_mutex_unlock(&l_ch_chain->mutex);
             }
-        }
-        break;
+        } break;
+        default: break;
     }
     if (l_packet_out) {
         dap_stream_ch_set_ready_to_write_unsafe(l_ch, true);
@@ -784,6 +791,10 @@ bool s_out_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg)
 void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg)
 {
     (void) a_arg;
+    if (a_ch->stream->esocket->buf_out_size > DAP_EVENTS_SOCKET_BUF / 2) {
+        return;
+    }
+    dap_stream_ch_set_ready_to_write_unsafe(a_ch, false);
     dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch);
     if (l_ch_chain && l_ch_chain->state != CHAIN_STATE_IDLE) {
         dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker);
diff --git a/modules/channel/chain/include/dap_stream_ch_chain.h b/modules/channel/chain/include/dap_stream_ch_chain.h
index 2d1ef9751..9e576b765 100644
--- a/modules/channel/chain/include/dap_stream_ch_chain.h
+++ b/modules/channel/chain/include/dap_stream_ch_chain.h
@@ -49,7 +49,8 @@ typedef struct dap_stream_ch_chain {
     pthread_mutex_t mutex;
     dap_stream_ch_t * ch;
 
-    dap_db_log_list_t *request_global_db_trs; // list of transactions
+    dap_db_log_list_t *request_global_db_trs; // list of global db records
+    dap_list_t *db_iter;
     dap_stream_ch_chain_state_t state;
 
     dap_chain_atom_iter_t * request_atom_iter;
diff --git a/modules/channel/chain/include/dap_stream_ch_chain_pkt.h b/modules/channel/chain/include/dap_stream_ch_chain_pkt.h
index 5091c1553..a56e07c49 100644
--- a/modules/channel/chain/include/dap_stream_ch_chain_pkt.h
+++ b/modules/channel/chain/include/dap_stream_ch_chain_pkt.h
@@ -57,7 +57,7 @@ typedef enum dap_stream_ch_chain_state{
     CHAIN_STATE_IDLE=0,
     CHAIN_STATE_SYNC_CHAINS,
     CHAIN_STATE_SYNC_GLOBAL_DB,
-    CHAIN_STATE_SYNC_ALL,
+    CHAIN_STATE_SYNC_ALL
 } dap_stream_ch_chain_state_t;
 
 typedef struct dap_stream_ch_chain_sync_request{
diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c
index bbca88442..2e4296197 100644
--- a/modules/net/dap_chain_net.c
+++ b/modules/net/dap_chain_net.c
@@ -496,7 +496,7 @@ static int s_net_states_proc(dap_chain_net_t * l_net)
                 }
 
                 // wait for finishing of request
-                int timeout_ms = 300000; // 5 min = 300 sec = 300 000 ms
+                int timeout_ms = 30000; // 5 min = 300 sec = 300 000 ms
                 // TODO add progress info to console
                 l_res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms);
                 switch (l_res) {
@@ -556,7 +556,7 @@ static int s_net_states_proc(dap_chain_net_t * l_net)
                     dap_stream_ch_chain_pkt_write_mt(l_worker, l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS, l_net->pub.id,
                                                      l_chain->id, l_net->pub.cell_id, &l_request, sizeof(l_request));
                     // wait for finishing of request
-                    int timeout_ms = 120000; // 2 min = 120 sec = 120 000 ms
+                    int timeout_ms = 20000; // 2 min = 120 sec = 120 000 ms
                     // TODO add progress info to console
                     if (dap_client_get_stream_ch(l_node_client->client, dap_stream_ch_chain_get_id())) {
                         l_res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms);
diff --git a/modules/service/vpn/dap_chain_net_srv_vpn.c b/modules/service/vpn/dap_chain_net_srv_vpn.c
index ae7385b02..4edf2abcb 100644
--- a/modules/service/vpn/dap_chain_net_srv_vpn.c
+++ b/modules/service/vpn/dap_chain_net_srv_vpn.c
@@ -1223,6 +1223,7 @@ void s_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
                 assert(l_tun);
                 // Unsafely send it
                 dap_events_socket_write_unsafe( l_tun->es, l_vpn_pkt->data, l_vpn_pkt->header.op_data.data_size);
+                dap_events_socket_set_writable_unsafe(l_tun->es, true);
             } break;
 
             // for servier only
@@ -1231,6 +1232,7 @@ void s_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
                 assert(l_tun);
                 // Unsafely send it
                 size_t l_ret = dap_events_socket_write_unsafe( l_tun->es, l_vpn_pkt->data, l_vpn_pkt->header.op_data.data_size);
+                dap_events_socket_set_writable_unsafe(l_tun->es, true);
                 if( l_ret)
                     s_update_limits (a_ch, l_srv_session, l_usage,l_ret );
             } break;
-- 
GitLab