From c992e7135a0a0eba4549f9d616a7e0b7564fe182 Mon Sep 17 00:00:00 2001
From: "Dmitriy A. Gerasimov" <dmitriy.gerasimov@demlabs.net>
Date: Tue, 19 Jan 2021 21:40:58 +0700
Subject: [PATCH] ...

---
 CMakeLists.txt                                |   2 +-
 dap-sdk/net/core/dap_proc_thread.c            |  16 +-
 dap-sdk/net/core/include/dap_proc_thread.h    |   3 +
 dap-sdk/net/stream/ch/dap_stream_ch_pkt.c     |  69 ++++
 .../net/stream/ch/include/dap_stream_ch_pkt.h |   3 +
 dap-sdk/net/stream/stream/dap_stream_worker.c |  93 ++++-
 .../stream/stream/include/dap_stream_worker.h |   6 +
 modules/channel/chain/dap_stream_ch_chain.c   |  30 +-
 .../channel/chain/dap_stream_ch_chain_pkt.c   |  35 ++
 .../chain/include/dap_stream_ch_chain.h       |   3 +-
 .../chain/include/dap_stream_ch_chain_pkt.h   |  46 ++-
 modules/net/dap_chain_net.c                   | 318 +++---------------
 modules/net/dap_chain_node_client.c           |   2 +-
 13 files changed, 335 insertions(+), 291 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 8226ff439a..67bf9ca3bc 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -2,7 +2,7 @@ project(cellframe-sdk C)
 cmake_minimum_required(VERSION 2.8)
 
 set(CMAKE_C_STANDARD 11)
-set(CELLFRAME_SDK_NATIVE_VERSION "2.8-9")
+set(CELLFRAME_SDK_NATIVE_VERSION "2.8-10")
 add_definitions ("-DCELLFRAME_SDK_VERSION=\"${CELLFRAME_SDK_NATIVE_VERSION}\"")
 set(DAPSDK_MODULES "")
 
diff --git a/dap-sdk/net/core/dap_proc_thread.c b/dap-sdk/net/core/dap_proc_thread.c
index 423800273f..ee68c3c437 100644
--- a/dap-sdk/net/core/dap_proc_thread.c
+++ b/dap-sdk/net/core/dap_proc_thread.c
@@ -174,12 +174,12 @@ static void s_proc_event_callback(dap_events_socket_t * a_esocket, uint64_t a_va
 }
 
 /**
- * @brief s_update_poll_flags
+ * @brief dap_proc_thread_esocket_update_poll_flags
  * @param a_thread
  * @param a_esocket
  * @return
  */
-static int s_update_poll_flags(dap_proc_thread_t * a_thread, dap_events_socket_t * a_esocket)
+int dap_proc_thread_esocket_update_poll_flags(dap_proc_thread_t * a_thread, dap_events_socket_t * a_esocket)
 {
 #ifdef DAP_EVENTS_CAPS_EPOLL
     u_int events = a_esocket->ev_base_flags;
@@ -599,14 +599,14 @@ static void * s_proc_thread_function(void * a_arg)
                             memmove(l_cur->buf_out, l_cur->buf_out+l_bytes_sent, l_cur->buf_out_size );
                         }else{
                             l_cur->flags ^= DAP_SOCK_READY_TO_WRITE;
-                            s_update_poll_flags(l_thread, l_cur);
+                            dap_proc_thread_esocket_update_poll_flags(l_thread, l_cur);
                         }
                     }
 
                 }else{
                     log_it(L_DEBUG,"(!) Write event receieved but nothing in buffer, switching off this flag");
                     l_cur->flags ^= DAP_SOCK_READY_TO_WRITE;
-                    s_update_poll_flags(l_thread, l_cur);
+                    dap_proc_thread_esocket_update_poll_flags(l_thread, l_cur);
                 }
 
 
@@ -681,7 +681,7 @@ bool dap_proc_thread_assign_on_worker_inter(dap_proc_thread_t * a_thread, dap_wo
     //log_it(L_DEBUG,"Remove esocket %p from proc thread and send it to worker #%u",a_esocket, a_worker->id);
     dap_events_socket_assign_on_worker_inter(l_es_assign_input, a_esocket);
     l_es_assign_input->flags |= DAP_SOCK_READY_TO_WRITE;
-    s_update_poll_flags(a_thread, l_es_assign_input);
+    dap_proc_thread_esocket_update_poll_flags(a_thread, l_es_assign_input);
     return true;
 }
 
@@ -700,7 +700,7 @@ int dap_proc_thread_esocket_write_inter(dap_proc_thread_t * a_thread,dap_worker_
     dap_events_socket_t * l_es_io_input = a_thread->queue_io_input[a_worker->id];
     dap_events_socket_write_inter(l_es_io_input,a_esocket, a_data, a_data_size);
     l_es_io_input->flags |= DAP_SOCK_READY_TO_WRITE;
-    s_update_poll_flags(a_thread, l_es_io_input);
+    dap_proc_thread_esocket_update_poll_flags(a_thread, l_es_io_input);
     return 0;
 }
 
@@ -734,7 +734,7 @@ int dap_proc_thread_esocket_write_f_inter(dap_proc_thread_t * a_thread,dap_worke
 
     dap_events_socket_write_inter(l_es_io_input,a_esocket, l_data, l_data_size);
     l_es_io_input->flags |= DAP_SOCK_READY_TO_WRITE;
-    s_update_poll_flags(a_thread, l_es_io_input);
+    dap_proc_thread_esocket_update_poll_flags(a_thread, l_es_io_input);
     return 0;
 }
 
@@ -753,6 +753,6 @@ void dap_proc_thread_worker_exec_callback(dap_proc_thread_t * a_thread, size_t a
     dap_events_socket_queue_ptr_send_to_input(a_thread->queue_callback_input[a_worker_id],l_msg );
 
     a_thread->queue_callback_input[a_worker_id]->flags |= DAP_SOCK_READY_TO_WRITE;
-    s_update_poll_flags(a_thread, a_thread->queue_callback_input[a_worker_id]);
+    dap_proc_thread_esocket_update_poll_flags(a_thread, a_thread->queue_callback_input[a_worker_id]);
 
 }
diff --git a/dap-sdk/net/core/include/dap_proc_thread.h b/dap-sdk/net/core/include/dap_proc_thread.h
index d731338197..8f307aff59 100644
--- a/dap-sdk/net/core/include/dap_proc_thread.h
+++ b/dap-sdk/net/core/include/dap_proc_thread.h
@@ -55,6 +55,7 @@ typedef struct dap_proc_thread{
 #else
 #error "No poll for proc thread for your platform"
 #endif
+    void * _inheritor;
 } dap_proc_thread_t;
 
 int dap_proc_thread_init(uint32_t a_threads_count);
@@ -69,6 +70,8 @@ int dap_proc_thread_esocket_write_inter(dap_proc_thread_t * a_thread,dap_worker_
 int dap_proc_thread_esocket_write_f_inter(dap_proc_thread_t * a_thread,dap_worker_t * a_worker,  dap_events_socket_t *a_esocket,
                                         const char * a_format,...);
 
+int dap_proc_thread_esocket_update_poll_flags(dap_proc_thread_t * a_thread, dap_events_socket_t * a_esocket);
+
 typedef void (*dap_proc_worker_callback_t)(dap_worker_t *,void *);
 
 void dap_proc_thread_worker_exec_callback(dap_proc_thread_t * a_thread, size_t a_worker_id, dap_proc_worker_callback_t a_callback, void * a_arg);
diff --git a/dap-sdk/net/stream/ch/dap_stream_ch_pkt.c b/dap-sdk/net/stream/ch/dap_stream_ch_pkt.c
index c71aafc118..57a7373416 100644
--- a/dap-sdk/net/stream/ch/dap_stream_ch_pkt.c
+++ b/dap-sdk/net/stream/ch/dap_stream_ch_pkt.c
@@ -106,6 +106,47 @@ size_t dap_stream_ch_pkt_write_f_mt(dap_stream_worker_t * a_worker , dap_stream_
 
 }
 
+/**
+ * @brief dap_stream_ch_pkt_write_f_inter
+ * @param a_queue
+ * @param a_ch
+ * @param a_type
+ * @param a_format
+ * @return
+ */
+size_t dap_stream_ch_pkt_write_f_inter(dap_events_socket_t * a_queue  , dap_stream_ch_t *a_ch, uint8_t a_type, const char * a_format,...)
+{
+    va_list ap;
+    va_start(ap,a_format);
+    int l_data_size = dap_vsnprintf(NULL,0,a_format,ap);
+    if (l_data_size <0 ){
+        log_it(L_ERROR,"Can't write out formatted data '%s' with values",a_format);
+        return 0;
+    }
+    l_data_size++; // To calc trailing zero
+    dap_stream_worker_msg_io_t * l_msg = DAP_NEW_Z(dap_stream_worker_msg_io_t);
+    l_msg->ch = a_ch;
+    l_msg->ch_pkt_type = a_type;
+    l_msg->data = DAP_NEW_SIZE(void,l_data_size);
+    l_msg->flags_set = DAP_SOCK_READY_TO_WRITE;
+    l_data_size = dap_vsnprintf(l_msg->data,0,a_format,ap);
+    if (l_data_size <0 ){
+        log_it(L_ERROR,"Can't write out formatted data '%s' with values",a_format);
+        DAP_DELETE(l_msg);
+        return 0;
+    }
+    l_data_size++;
+    l_msg->data_size = l_data_size;
+    int l_ret= dap_events_socket_queue_ptr_send_to_input(a_queue , l_msg );
+    if (l_ret!=0){
+        log_it(L_ERROR, "Wasn't send pointer to queue: code %d", l_ret);
+        DAP_DELETE(l_msg);
+        return 0;
+    }
+    return l_data_size;
+
+}
+
 /**
  * @brief dap_stream_ch_pkt_write_mt
  * @param a_ch
@@ -132,6 +173,34 @@ size_t dap_stream_ch_pkt_write_mt(dap_stream_worker_t * a_worker , dap_stream_ch
     return a_data_size;
 }
 
+
+/**
+ * @brief dap_stream_ch_pkt_write_inter
+ * @param a_queue
+ * @param a_ch
+ * @param a_type
+ * @param a_data
+ * @param a_data_size
+ * @return
+ */
+size_t dap_stream_ch_pkt_write_inter(dap_events_socket_t * a_queue , dap_stream_ch_t *a_ch, uint8_t a_type, const void * a_data, size_t a_data_size)
+{
+    dap_stream_worker_msg_io_t * l_msg = DAP_NEW_Z(dap_stream_worker_msg_io_t);
+    l_msg->ch = a_ch;
+    l_msg->ch_pkt_type = a_type;
+    l_msg->data = DAP_NEW_SIZE(void,a_data_size);
+    l_msg->flags_set = DAP_SOCK_READY_TO_WRITE;
+    l_msg->data_size = a_data_size;
+    memcpy( l_msg->data, a_data, a_data_size);
+    int l_ret= dap_events_socket_queue_ptr_send_to_input(a_queue , l_msg );
+    if (l_ret!=0){
+        log_it(L_ERROR, "Wasn't send pointer to queue: code %d", l_ret);
+        DAP_DELETE(l_msg);
+        return 0;
+    }
+    return a_data_size;
+}
+
 /**
  * @brief dap_stream_ch_check_unsafe
  * @param a_worker
diff --git a/dap-sdk/net/stream/ch/include/dap_stream_ch_pkt.h b/dap-sdk/net/stream/ch/include/dap_stream_ch_pkt.h
index 43c8949c09..b561c723c4 100644
--- a/dap-sdk/net/stream/ch/include/dap_stream_ch_pkt.h
+++ b/dap-sdk/net/stream/ch/include/dap_stream_ch_pkt.h
@@ -58,3 +58,6 @@ bool dap_stream_ch_check_unsafe(dap_stream_worker_t * a_worker,dap_stream_ch_t *
 
 size_t dap_stream_ch_pkt_write_f_mt(dap_stream_worker_t * a_worker , dap_stream_ch_t *a_ch, uint8_t a_type, const char * a_str,...);
 size_t dap_stream_ch_pkt_write_mt(dap_stream_worker_t * a_worker , dap_stream_ch_t *a_ch,  uint8_t a_type, const void * a_data, size_t a_data_size);
+
+size_t dap_stream_ch_pkt_write_f_inter(dap_events_socket_t * a_queue , dap_stream_ch_t *a_ch, uint8_t a_type, const char * a_str,...);
+size_t dap_stream_ch_pkt_write_inter(dap_events_socket_t * a_queue , dap_stream_ch_t *a_ch,  uint8_t a_type, const void * a_data, size_t a_data_size);
diff --git a/dap-sdk/net/stream/stream/dap_stream_worker.c b/dap-sdk/net/stream/stream/dap_stream_worker.c
index eda8d5a2ed..fda2567375 100644
--- a/dap-sdk/net/stream/stream/dap_stream_worker.c
+++ b/dap-sdk/net/stream/stream/dap_stream_worker.c
@@ -28,6 +28,13 @@
 
 #define LOG_TAG "dap_stream_worker"
 
+struct proc_thread_stream{
+    dap_proc_thread_t * proc_thread;
+    dap_events_socket_t ** queue_ch_io_input; // Inputs for ch assign queues
+    dap_stream_ch_t * channels; // Client channels assigned on worker. Unsafe list, operate only in worker's context
+    pthread_rwlock_t channels_rwlock;
+};
+
 static void s_ch_io_callback(dap_events_socket_t * a_es, void * a_msg);
 
 /**
@@ -40,18 +47,44 @@ int dap_stream_worker_init()
     for (uint32_t i = 0; i < l_worker_count; i++){
         dap_worker_t * l_worker = dap_events_worker_get(i);
         if (!l_worker) {
-            log_it(L_CRITICAL,"Can't init stream worker, woreker thread don't exist");
+            log_it(L_CRITICAL,"Can't init stream worker,- worker thread don't exist");
             return -2;
         }
         if (l_worker->_inheritor){
-            log_it(L_CRITICAL,"Can't init stream worker, core worker has already inheritor");
+            log_it(L_CRITICAL,"Can't init stream worker,- core worker has already inheritor");
             return -1;
         }
         dap_stream_worker_t *l_stream_worker =  DAP_NEW_Z(dap_stream_worker_t);
+        if(!l_stream_worker)
+            return -5;
         l_worker->_inheritor = l_stream_worker;
         l_stream_worker->worker = l_worker;
         pthread_rwlock_init( &l_stream_worker->channels_rwlock, NULL);
+
         l_stream_worker->queue_ch_io = dap_events_socket_create_type_queue_ptr_mt( l_worker, s_ch_io_callback);
+        if(! l_stream_worker->queue_ch_io)
+            return -6;
+    }
+    for (uint32_t i = 0; i < l_worker_count; i++){
+        dap_proc_thread_t * l_proc_thread  = dap_proc_thread_get(i);
+        if (!l_proc_thread) {
+            log_it(L_CRITICAL,"Can't init stream proc thread,- proc thread don't exist");
+            return -3;
+        }
+        if (l_proc_thread->_inheritor){
+            log_it(L_CRITICAL,"Can't init stream worker, core worker has already inheritor");
+            return -4;
+        }
+        struct proc_thread_stream * l_thread_stream = DAP_NEW_Z(struct proc_thread_stream);
+        if (!l_thread_stream)
+            return -7;
+        l_proc_thread->_inheritor = l_thread_stream;
+        l_thread_stream->queue_ch_io_input = DAP_NEW_Z_SIZE(dap_events_socket_t *, sizeof (dap_events_socket_t*)*l_worker_count);
+        for (uint32_t j = 0; j < l_worker_count; j++){
+            dap_worker_t * l_worker = dap_events_worker_get(j);
+            dap_stream_worker_t *l_stream_worker = (dap_stream_worker_t*) l_worker->_inheritor;
+            l_thread_stream->queue_ch_io_input[i] = dap_events_socket_queue_ptr_create_input(l_stream_worker->queue_ch_io);
+        }
     }
     return 0;
 }
@@ -89,3 +122,59 @@ static void s_ch_io_callback(dap_events_socket_t * a_es, void * a_msg)
         dap_stream_ch_pkt_write_unsafe(l_msg_ch, l_msg->ch_pkt_type, l_msg->data,l_msg->data_size);
     DAP_DELETE(l_msg);
 }
+
+/**
+ * @brief dap_proc_thread_stream_ch_write_inter
+ * @param a_thread
+ * @param a_worker
+ * @param a_ch
+ * @param a_type
+ * @param a_data
+ * @param a_data_size
+ * @return
+ */
+size_t dap_proc_thread_stream_ch_write_inter(dap_proc_thread_t * a_thread,dap_worker_t * a_worker, dap_stream_ch_t *a_ch, uint8_t a_type,
+                                        const void * a_data, size_t a_data_size)
+{
+    struct proc_thread_stream * l_thread_stream = (struct proc_thread_stream *) a_thread->_inheritor;
+    dap_events_socket_t* l_es_input = l_thread_stream->queue_ch_io_input[a_worker->id];
+    size_t l_ret = dap_stream_ch_pkt_write_inter(l_es_input,a_ch,a_type,a_data,a_data_size);
+    l_es_input->flags |= DAP_SOCK_READY_TO_WRITE;
+    dap_proc_thread_esocket_update_poll_flags(a_thread,l_es_input);
+    return l_ret;
+}
+
+/**
+ * @brief dap_proc_thread_stream_ch_write_f_inter
+ * @param a_thread
+ * @param a_worker
+ * @param a_ch
+ * @param a_type
+ * @param a_format
+ * @return
+ */
+size_t dap_proc_thread_stream_ch_write_f_inter(dap_proc_thread_t * a_thread,dap_worker_t * a_worker,  dap_stream_ch_t *a_ch,uint8_t a_type,
+                                        const char * a_format,...)
+{
+    struct proc_thread_stream * l_thread_stream = (struct proc_thread_stream *) a_thread->_inheritor;
+    va_list ap, ap_copy;
+    va_start(ap,a_format);
+    va_copy(ap_copy, ap);
+    int l_data_size = dap_vsnprintf(NULL,0,a_format,ap);
+    va_end(ap);
+    if (l_data_size <0 ){
+        log_it(L_ERROR,"Can't write out formatted data '%s' with values",a_format);
+        va_end(ap_copy);
+        return 0;
+    }
+
+    dap_events_socket_t * l_es_io_input = l_thread_stream->queue_ch_io_input[a_worker->id];
+    char * l_data = DAP_NEW_SIZE(char,l_data_size+1); if (!l_data) return -1;
+    l_data_size = dap_vsprintf(l_data,a_format,ap_copy);
+    va_end(ap_copy);
+
+    size_t l_ret = dap_stream_ch_pkt_write_inter(l_es_io_input,a_ch,a_type, l_data, l_data_size);
+    l_es_io_input->flags |= DAP_SOCK_READY_TO_WRITE;
+    dap_proc_thread_esocket_update_poll_flags(a_thread, l_es_io_input);
+    return l_ret;
+}
diff --git a/dap-sdk/net/stream/stream/include/dap_stream_worker.h b/dap-sdk/net/stream/stream/include/dap_stream_worker.h
index 2e84a65055..5a33b8eb9a 100644
--- a/dap-sdk/net/stream/stream/include/dap_stream_worker.h
+++ b/dap-sdk/net/stream/stream/include/dap_stream_worker.h
@@ -22,6 +22,7 @@
 */
 #pragma once
 #include "dap_worker.h"
+#include "dap_proc_thread.h"
 #include "dap_stream_ch.h"
 
 typedef struct dap_stream_worker {
@@ -43,3 +44,8 @@ typedef struct dap_stream_worker_msg_io {
 } dap_stream_worker_msg_io_t;
 
 int dap_stream_worker_init();
+
+size_t dap_proc_thread_stream_ch_write_inter(dap_proc_thread_t * a_thread,dap_worker_t * a_worker, dap_stream_ch_t *a_ch,
+                                        uint8_t a_type,const void * a_data, size_t a_data_size);
+size_t dap_proc_thread_stream_ch_write_f_inter(dap_proc_thread_t * a_thread,dap_worker_t * a_worker,  dap_stream_ch_t *a_ch,
+                                        uint8_t a_type,const char * a_format,...);
diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c
index ac22939f2b..375f7b3f88 100644
--- a/modules/channel/chain/dap_stream_ch_chain.c
+++ b/modules/channel/chain/dap_stream_ch_chain.c
@@ -515,7 +515,7 @@ static void s_gdb_in_pkt_error_worker_callback(dap_worker_t *a_worker, void *a_a
     UNUSED(a_worker);
     struct ch_chain_pkt_in * l_pkt_in = (struct ch_chain_pkt_in*) a_arg;
 
-    dap_stream_ch_chain_pkt_write_error(l_pkt_in->ch, l_pkt_in->pkt->pkt_hdr.net_id,
+    dap_stream_ch_chain_pkt_write_error_unsafe(l_pkt_in->ch, l_pkt_in->pkt->pkt_hdr.net_id,
                                         l_pkt_in->pkt->pkt_hdr.chain_id, l_pkt_in->pkt->pkt_hdr.cell_id,
                                         "ERROR_GLOBAL_DB_INTERNAL_NOT_SAVED");
     dap_stream_ch_set_ready_to_write_unsafe(l_pkt_in->ch, true);
@@ -679,7 +679,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
                                                       l_chain_pkt_data_size, l_ch_chain->callback_notify_arg);
             }
         } else {
-            dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id,
+            dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
                                                 l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
                                                 "ERROR_NET_INVALID_ID");
             dap_stream_ch_set_ready_to_write_unsafe(a_ch, true);
@@ -689,7 +689,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
     if (a_ch->stream->session->acl && !a_ch->stream->session->acl[l_acl_idx]) {
         log_it(L_WARNING, "Unauthorized request attempt to network %s",
                dap_chain_net_by_id(l_chain_pkt->hdr.net_id)->pub.name);
-        dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id,
+        dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
                                             l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
                                             "ERROR_NET_NOT_AUTHORIZED");
         dap_stream_ch_set_ready_to_write_unsafe(a_ch, true);
@@ -781,7 +781,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
                            l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64);
             }else{
                 log_it(L_WARNING,"Incorrect data size %zd in packet DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_CHAIN", l_chain_pkt_data_size);
-                dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id,
+                dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
                         l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
                         "ERROR_CHAIN_PACKET_TYPE_FIRST_CHAIN_INCORRET_DATA_SIZE(%zd/%zd)",l_chain_pkt_data_size, sizeof(dap_chain_node_addr_t));
             }
@@ -831,7 +831,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
             if(l_chain_pkt_data_size == sizeof(dap_stream_ch_chain_sync_request_t)) {
                 if(l_ch_chain->is_on_request){
                     log_it(L_WARNING, "Can't process SYNC_CHAINS request because its already busy with syncronization");
-                    dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id,
+                    dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
                             l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
                             "ERROR_SYNC_REQUEST_ALREADY_IN_PROCESS");
                     dap_stream_ch_set_ready_to_write_unsafe(a_ch, true);
@@ -852,7 +852,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
                         log_it(L_INFO, "Can't process SYNC_CHAINS request between %s and %s because not in idle state",
                                l_hash_from_str? l_hash_from_str:"(null)",
                                l_hash_to_str?l_hash_to_str:"(null)");
-                        dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id,
+                        dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
                                 l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
                                 "ERROR_STATE_NOT_IN_IDLE");
                         dap_stream_ch_set_ready_to_write_unsafe(a_ch, true);
@@ -869,7 +869,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
                 DAP_DELETE(l_hash_to_str);
             }else{
                 log_it(L_WARNING, "DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS: Wrong chain packet size %zd when expected %zd", l_chain_pkt_data_size, sizeof(l_ch_chain->request));
-                dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id,
+                dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
                         l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
                         "ERROR_CHAIN_PKT_DATA_SIZE" );
             }
@@ -879,7 +879,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
             if(l_chain_pkt_data_size == sizeof(dap_stream_ch_chain_sync_request_t)) {
                 if(l_ch_chain->state != CHAIN_STATE_IDLE) {
                     log_it(L_WARNING, "Can't process SYNC_GLOBAL_DB request because not in idle state");
-                    dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id,
+                    dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
                             l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
                             "ERROR_STATE_NOT_IN_IDLE");
                     dap_stream_ch_set_ready_to_write_unsafe(a_ch, true);
@@ -887,7 +887,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
                 } else { // receive the latest global_db revision of the remote node -> go to send mode
                     if(l_ch_chain->is_on_request){
                         log_it(L_WARNING, "Can't process SYNC_CHAINS request because its already busy with syncronization");
-                        dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id,
+                        dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
                                 l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
                                 "ERROR_SYNC_REQUEST_ALREADY_IN_PROCESS");
                         dap_stream_ch_set_ready_to_write_unsafe(a_ch, true);
@@ -911,7 +911,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
                 }
             }else{
                 log_it(L_WARNING, "DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB: Wrong chain packet size %zd when expected %zd", l_chain_pkt_data_size, sizeof(l_ch_chain->request));
-                dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id,
+                dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
                         l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
                         "ERROR_CHAIN_PKT_DATA_SIZE" );
             }
@@ -946,7 +946,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
                         dap_proc_queue_add_callback_inter(a_ch->stream_worker->worker->proc_queue_input, s_sync_in_chains_callback, l_sync_request);
                     } else {
                         log_it(L_WARNING, "Empty chain packet");
-                        dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id,
+                        dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
                                 l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
                                 "ERROR_CHAIN_PACKET_EMPTY");
                         dap_stream_ch_set_ready_to_write_unsafe(a_ch, true);
@@ -964,7 +964,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
                       l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, NODE_ADDR_FP_ARGS_S(l_ch_chain->request.node_addr) );
             }else {
                log_it(L_WARNING,"Incorrect data size %zd in packet DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB", l_chain_pkt_data_size);
-               dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id,
+               dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
                        l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
                        "ERROR_CHAIN_PACKET_TYPE_FIRST_GLOBAL_DB_INCORRET_DATA_SIZE");
             }
@@ -989,7 +989,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
                 dap_proc_queue_add_callback_inter(a_ch->stream_worker->worker->proc_queue_input, s_gdb_in_pkt_proc_callback, l_sync_request);
             } else {
                 log_it(L_WARNING, "Packet with GLOBAL_DB atom has zero body size");
-                dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id,
+                dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
                         l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
                         "ERROR_GLOBAL_DB_PACKET_EMPTY");
                 dap_stream_ch_set_ready_to_write_unsafe(a_ch, true);
@@ -1026,7 +1026,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
                 }
             }else{
                 log_it(L_WARNING, "DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS_RVRS: Wrong chain packet size %zd when expected %zd", l_chain_pkt_data_size, sizeof(l_ch_chain->request));
-                dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id,
+                dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
                         l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
                         "ERROR_CHAIN_PKT_DATA_SIZE" );
             }
@@ -1035,7 +1035,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
         case DAP_STREAM_CH_CHAIN_PKT_TYPE_ERROR:
             break;
         default: {
-            dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id,
+            dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
                                                 l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
                                                 "ERROR_UNKNOWN_CHAIN_PKT_TYPE");
             }
diff --git a/modules/channel/chain/dap_stream_ch_chain_pkt.c b/modules/channel/chain/dap_stream_ch_chain_pkt.c
index 19cc566016..d547e587d1 100644
--- a/modules/channel/chain/dap_stream_ch_chain_pkt.c
+++ b/modules/channel/chain/dap_stream_ch_chain_pkt.c
@@ -16,6 +16,7 @@
 #endif
 
 #include "dap_stream_ch.h"
+#include "dap_stream_worker.h"
 #include "dap_stream_ch_pkt.h"
 #include "dap_stream_ch_chain_pkt.h"
 #include "dap_chain.h"
@@ -68,6 +69,7 @@ size_t dap_stream_ch_chain_pkt_write_unsafe(dap_stream_ch_t *a_ch, uint8_t a_typ
     return l_ret;
 }
 
+
 size_t dap_stream_ch_chain_pkt_write_mt(dap_stream_worker_t *a_worker, dap_stream_ch_t *a_ch, uint8_t a_type,dap_chain_net_id_t a_net_id,
                                         dap_chain_id_t a_chain_id, dap_chain_cell_id_t a_cell_id,
                                         const void * a_data, size_t a_data_size)
@@ -87,3 +89,36 @@ size_t dap_stream_ch_chain_pkt_write_mt(dap_stream_worker_t *a_worker, dap_strea
     DAP_DELETE(l_chain_pkt);
     return l_ret;
 }
+
+/**
+ * @brief dap_stream_ch_chain_pkt_write_inter
+ * @param a_thread
+ * @param a_worker
+ * @param a_ch
+ * @param a_type
+ * @param a_net_id
+ * @param a_chain_id
+ * @param a_cell_id
+ * @param a_data
+ * @param a_data_size
+ * @return
+ */
+size_t dap_stream_ch_chain_pkt_write_inter(dap_proc_thread_t * a_thread, dap_stream_worker_t *a_worker, dap_stream_ch_t *a_ch, uint8_t a_type,dap_chain_net_id_t a_net_id,
+                                        dap_chain_id_t a_chain_id, dap_chain_cell_id_t a_cell_id,
+                                        const void * a_data, size_t a_data_size)
+{
+    dap_stream_ch_chain_pkt_t * l_chain_pkt;
+    size_t l_chain_pkt_size = sizeof (l_chain_pkt->hdr) + a_data_size;
+    l_chain_pkt = DAP_NEW_Z_SIZE(dap_stream_ch_chain_pkt_t, l_chain_pkt_size );
+    l_chain_pkt->hdr.version = 1;
+    l_chain_pkt->hdr.net_id.uint64 = a_net_id.uint64;
+    l_chain_pkt->hdr.cell_id.uint64 = a_cell_id.uint64;
+    l_chain_pkt->hdr.chain_id.uint64 = a_chain_id.uint64;
+
+    if (a_data_size && a_data)
+        memcpy( l_chain_pkt->data, a_data, a_data_size);
+
+    size_t l_ret  = dap_proc_thread_stream_ch_write_inter(a_thread,  a_worker->worker, a_ch, a_type , l_chain_pkt, l_chain_pkt_size);
+    DAP_DELETE(l_chain_pkt);
+    return l_ret;
+}
diff --git a/modules/channel/chain/include/dap_stream_ch_chain.h b/modules/channel/chain/include/dap_stream_ch_chain.h
index 2cf9448dce..e6b333c3bc 100644
--- a/modules/channel/chain/include/dap_stream_ch_chain.h
+++ b/modules/channel/chain/include/dap_stream_ch_chain.h
@@ -29,6 +29,7 @@
 #include "dap_chain_common.h"
 #include "dap_chain.h"
 #include "dap_chain_global_db_hist.h"
+#include "dap_chain_node_client.h"
 #include "dap_list.h"
 #include "dap_stream_ch_chain_pkt.h"
 #include "uthash.h"
@@ -62,8 +63,8 @@ typedef struct dap_stream_ch_chain_hash_item{
 typedef struct dap_stream_ch_chain {
     dap_stream_ch_t * ch;
 
-
     dap_stream_ch_chain_state_t state;
+    dap_chain_node_client_t * node_client; // Node client associated with stream
     uint64_t stats_request_atoms_processed;
     uint64_t stats_request_gdb_processed;
 
diff --git a/modules/channel/chain/include/dap_stream_ch_chain_pkt.h b/modules/channel/chain/include/dap_stream_ch_chain_pkt.h
index 8e36ebba3c..388400ae1a 100644
--- a/modules/channel/chain/include/dap_stream_ch_chain_pkt.h
+++ b/modules/channel/chain/include/dap_stream_ch_chain_pkt.h
@@ -29,6 +29,7 @@
 #include <stdarg.h>
 
 #include "dap_common.h"
+#include "dap_proc_thread.h"
 #include "dap_chain_common.h"
 #include "dap_chain_datum.h"
 #include "dap_chain_cs.h"
@@ -134,7 +135,20 @@ size_t dap_stream_ch_chain_pkt_write_mt(dap_stream_worker_t *a_worker, dap_strea
                                         dap_chain_id_t a_chain_id, dap_chain_cell_id_t a_cell_id,
                                         const void * a_data, size_t a_data_size);
 
-inline static size_t dap_stream_ch_chain_pkt_write_error(dap_stream_ch_t *a_ch, dap_chain_net_id_t a_net_id,
+size_t dap_stream_ch_chain_pkt_write_inter(dap_proc_thread_t * a_thread, dap_stream_worker_t *a_worker, dap_stream_ch_t *a_ch, uint8_t a_type,dap_chain_net_id_t a_net_id,
+                                        dap_chain_id_t a_chain_id, dap_chain_cell_id_t a_cell_id,
+                                        const void * a_data, size_t a_data_size);
+
+/**
+ * @brief dap_stream_ch_chain_pkt_write_error_unsafe
+ * @param a_ch
+ * @param a_net_id
+ * @param a_chain_id
+ * @param a_cell_id
+ * @param a_err_string_format
+ * @return
+ */
+inline static size_t dap_stream_ch_chain_pkt_write_error_unsafe(dap_stream_ch_t *a_ch, dap_chain_net_id_t a_net_id,
                                                   dap_chain_id_t a_chain_id, dap_chain_cell_id_t a_cell_id, const char * a_err_string_format,... )
 {
     va_list l_va;
@@ -152,3 +166,33 @@ inline static size_t dap_stream_ch_chain_pkt_write_error(dap_stream_ch_t *a_ch,
         return 0;
     }
 }
+
+/**
+ * @brief dap_stream_ch_chain_pkt_write_error_inter
+ * @param a_thread
+ * @param a_stream_worker
+ * @param a_ch
+ * @param a_net_id
+ * @param a_chain_id
+ * @param a_cell_id
+ * @param a_err_string_format
+ * @return
+ */
+inline static size_t dap_stream_ch_chain_pkt_write_error_inter(dap_proc_thread_t * a_thread, dap_stream_worker_t * a_stream_worker,  dap_stream_ch_t *a_ch, dap_chain_net_id_t a_net_id,
+                                                  dap_chain_id_t a_chain_id, dap_chain_cell_id_t a_cell_id, const char * a_err_string_format,... )
+{
+    va_list l_va;
+    char * l_str;
+    va_start(l_va, a_err_string_format);
+    int l_size = vsnprintf(NULL,0,a_err_string_format,l_va);
+    if(l_size >0){
+        l_size++;
+        l_str = DAP_NEW_S_SIZE(char, l_size);
+        vsnprintf(l_str,l_size,a_err_string_format,l_va);
+        va_end(l_va);
+        return  dap_stream_ch_chain_pkt_write_inter(a_thread, a_stream_worker, a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_ERROR, a_net_id, a_chain_id, a_cell_id, l_str,l_size );
+    }else{
+        va_end(l_va);
+        return 0;
+    }
+}
diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c
index 05dccc9db8..e288b5606a 100644
--- a/modules/net/dap_chain_net.c
+++ b/modules/net/dap_chain_net.c
@@ -142,7 +142,7 @@ typedef struct dap_chain_net_pvt{
 
     // Established links
     dap_list_t *links;                  // Links list
-    size_t links_count;
+    size_t links_connected_count;
 
     // Prepared links
     dap_list_t *links_info;             // Links info list
@@ -211,15 +211,9 @@ static void s_node_link_callback_stage(dap_chain_node_client_t * a_node_client,d
 static void s_node_link_callback_error(dap_chain_node_client_t * a_node_client, int a_error, void * a_arg);
 
 static bool s_net_states_proc(dap_proc_thread_t *a_thread, void *a_arg);
-static bool s_node_link_states_proc(dap_proc_thread_t *a_thread, void *a_arg);
 static void s_net_state_link_prepare_success(dap_worker_t * a_worker,dap_chain_node_info_t * a_node_info, void * a_arg);
 static void s_net_state_link_prepare_error(dap_worker_t * a_worker,dap_chain_node_info_t * a_node_info, void * a_arg, int a_errno);
 
-
-
-
-static bool s_net_check_timer_callback ( void * a_net);
-static void s_net_check_thread_start( dap_chain_net_t * a_net );
 static void s_net_proc_kill( dap_chain_net_t * a_net );
 int s_net_load(const char * a_net_name, uint16_t a_acl_idx);
 
@@ -435,30 +429,29 @@ static void s_node_link_callback_connected(dap_chain_node_client_t * a_node_clie
         log_it(L_NOTICE, "Established connection with %s."NODE_ADDR_FP_STR,l_net->pub.name, NODE_ADDR_FP_ARGS_S(l_link_info->hdr.address));
     pthread_rwlock_wrlock(&l_net_pvt->rwlock);
     l_net_pvt->links = dap_list_append(l_net_pvt->links, a_node_client);
-    l_net_pvt->links_count++;
+    l_net_pvt->links_connected_count++;
+
+    // If we're fist time here - initiate the GDB sync
+    if (! a_node_client->is_reconnecting){
+        dap_stream_ch_chain_sync_request_t l_sync_gdb = {};
+        // Get last timestamp in log if wasn't SYNC_FROM_ZERO flag
+        if (! (l_net_pvt->flags & F_DAP_CHAIN_NET_SYNC_FROM_ZERO) )
+            l_sync_gdb.id_start = (uint64_t) dap_db_get_last_id_remote(a_node_client->remote_node_addr.uint64);
+        l_sync_gdb.node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net);
+        log_it(L_DEBUG, "Prepared request to gdb sync from %llu to %llu", l_sync_gdb.id_start, l_sync_gdb.id_end?l_sync_gdb.id_end:-1 );
+        // find dap_chain_id_t
+        dap_chain_t *l_chain = dap_chain_net_get_chain_by_name(l_net, "gdb");
+        dap_chain_id_t l_chain_id = l_chain ? l_chain->id : (dap_chain_id_t ) {0};
 
+        dap_stream_ch_chain_pkt_write_unsafe( a_node_client->ch_chain,
+                                                           DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB, l_net->pub.id,
+                                                        l_chain_id, l_net->pub.cell_id, &l_sync_gdb, sizeof(l_sync_gdb));
+    }
     a_node_client->is_reconnecting = false;
 
     if(l_net_pvt->state == NET_STATE_LINKS_CONNECTING ){
         l_net_pvt->state = NET_STATE_LINKS_ESTABLISHED;
-
-        a_node_client->stream_worker = dap_client_get_stream_worker(a_node_client->client);
-        a_node_client->ch_chain = dap_client_get_stream_ch_unsafe(a_node_client->client, dap_stream_ch_chain_get_id());
-        if (a_node_client->ch_chain){
-            a_node_client->ch_chain_uuid = a_node_client->ch_chain->uuid;
-            a_node_client->ch_chain_net = dap_client_get_stream_ch_unsafe(a_node_client->client, dap_stream_ch_chain_net_get_id());
-            if (a_node_client->ch_chain_net){
-                a_node_client->ch_chain_net_uuid = a_node_client->ch_chain_net->uuid;
-            }else{
-                log_it(L_WARNING,"No channel 'chain net' in stream connection");
-            }
-            dap_proc_queue_add_callback_inter(dap_client_get_stream_worker(a_node_client->client)->worker->proc_queue_input,
-                                          s_node_link_states_proc, a_node_client);
-        }else{
-            log_it(L_CRITICAL,"No channel 'chain' in stream connection, disconnecting link");
-            l_net_pvt->state = NET_STATE_LINKS_CONNECTING;
-            dap_client_go_stage(a_node_client->client,STAGE_BEGIN,NULL);
-        }
+        dap_proc_queue_add_callback_inter(a_node_client->stream_worker->worker->proc_queue_input,s_net_states_proc,l_net );
     }
     pthread_rwlock_unlock(&l_net_pvt->rwlock);
 
@@ -482,10 +475,10 @@ static void s_node_link_callback_disconnected(dap_chain_node_client_t * a_node_c
 
             a_node_client->is_reconnecting = true;
             dap_chain_node_client_create_n_connect(l_net, a_node_client->info,"CN",
-                                                                                            s_node_link_callback_connected,
-                                                                                            s_node_link_callback_disconnected,
-                                                                                            s_node_link_callback_stage,
-                                                                                            s_node_link_callback_error,NULL);
+                                                    s_node_link_callback_connected,
+                                                    s_node_link_callback_disconnected,
+                                                    s_node_link_callback_stage,
+                                                    s_node_link_callback_error,NULL);
         }else if (l_net_pvt->state_target == NET_STATE_OFFLINE){
             log_it(L_INFO, "%s."NODE_ADDR_FP_STR" disconnected",l_net->pub.name,NODE_ADDR_FP_ARGS_S(a_node_client->info->hdr.address));
 
@@ -494,8 +487,8 @@ static void s_node_link_callback_disconnected(dap_chain_node_client_t * a_node_c
                    ,NODE_ADDR_FP_ARGS_S(a_node_client->info->hdr.address)
                    , c_net_states[l_net_pvt->state_target]  );
         }
-        if(l_net_pvt->links_count)
-            l_net_pvt->links_count--;
+        if(l_net_pvt->links_connected_count)
+            l_net_pvt->links_connected_count--;
         else
             log_it(L_CRITICAL,"Links count is zero in disconnected callback, looks smbd decreased it twice or forget to increase on connect/reconnect");
     pthread_rwlock_unlock(&l_net_pvt->rwlock);
@@ -525,6 +518,30 @@ static void s_node_link_callback_error(dap_chain_node_client_t * a_node_client,
 {
     dap_chain_net_t * l_net = (dap_chain_net_t *) a_arg;
     log_it(L_WARNING, "Can't establish link with %s."NODE_ADDR_FP_STR, l_net->pub.name, NODE_ADDR_FP_ARGS_S(a_node_client->info->hdr.address));
+}
+
+/**
+ * @brief s_node_link_callback_delete
+ * @param a_node_client
+ * @param a_arg
+ */
+static void s_node_link_callback_delete(dap_chain_node_client_t * a_node_client, void * a_arg)
+{
+    log_it(L_DEBUG,"Remove node client from list");
+    dap_chain_net_t * l_net = (dap_chain_net_t *) a_arg;
+    dap_chain_net_pvt_t * l_net_pvt = PVT(l_net);
+    pthread_rwlock_wrlock(&l_net_pvt->rwlock);
+    for ( dap_list_t * it = l_net_pvt->links; it; it=it->next ){
+        dap_chain_node_client_t * l_client =(dap_chain_node_client_t *) it->data;
+        // Cut out current iterator if it equals with deleting handler
+        if (l_client == a_node_client){
+            if (it->prev)
+                it->prev->next = it->next;
+            if (it->next)
+                it->next->prev = it->prev;
+        }
+    }
+    pthread_rwlock_unlock(&l_net_pvt->rwlock);
     dap_chain_node_client_close(a_node_client);
 }
 
@@ -582,205 +599,6 @@ static void s_net_state_link_prepare_error(dap_worker_t * a_worker,dap_chain_nod
     }
 }
 
-/**
- * @brief s_node_link_states_proc
- * @param a_thread
- * @param a_arg
- * @return
- */
-static bool s_node_link_states_proc(dap_proc_thread_t *a_thread, void *a_arg)
-{
-    bool l_repeate_after_exit = false;
-    dap_chain_node_client_t * l_node_client = (dap_chain_node_client_t*) a_arg;
-    assert(l_node_client);
-
-    dap_chain_net_pvt_t * l_net_pvt = PVT(l_node_client->net);
-    switch (l_node_client->state) {
-        case NODE_CLIENT_STATE_ESTABLISHED:
-            if(l_node_client->sync_chains){
-                l_node_client->state = NODE_CLIENT_STATE_SYNC_CHAINS;
-                l_repeate_after_exit = true;
-            }
-            if(l_node_client->sync_gdb){
-                l_node_client->state = NODE_CLIENT_STATE_SYNC_GDB;
-                l_repeate_after_exit = true;
-            }
-        break;
-        case NODE_CLIENT_STATE_SYNC_GDB:{
-            dap_stream_worker_t *l_worker = dap_client_get_stream_worker(l_node_client->client);
-            dap_stream_ch_t *l_ch_chain = dap_client_get_stream_ch_unsafe(l_node_client->client, dap_stream_ch_chain_get_id());
-            if (   !l_ch_chain) { // Channel or stream or client itself closed
-                l_tmp = dap_list_next(l_tmp);
-                dap_chain_node_client_close(l_node_client);
-                l_net_pvt->links = dap_list_remove(l_net_pvt->links, l_node_client);
-                continue;
-            }
-
-            dap_stream_ch_chain_sync_request_t l_sync_gdb = {};
-            // Get last timestamp in log if wasn't SYNC_FROM_ZERO flag
-            if (! (l_net_pvt->flags & F_DAP_CHAIN_NET_SYNC_FROM_ZERO) )
-                l_sync_gdb.id_start = (uint64_t) dap_db_get_last_id_remote(l_node_client->remote_node_addr.uint64);
-            l_sync_gdb.node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net);
-            log_it(L_DEBUG, "Prepared request to gdb sync from %llu to %llu", l_sync_gdb.id_start, l_sync_gdb.id_end?l_sync_gdb.id_end:-1 );
-            // find dap_chain_id_t
-            dap_chain_t *l_chain = dap_chain_net_get_chain_by_name(l_net, "gdb");
-            dap_chain_id_t l_chain_id = l_chain ? l_chain->id : (dap_chain_id_t ) {0};
-            dap_chain_node_client_reset(l_node_client);
-            size_t l_res = dap_stream_ch_chain_pkt_write_mt(l_worker, l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB, l_net->pub.id,
-                                                            l_chain_id, l_net->pub.cell_id, &l_sync_gdb, sizeof(l_sync_gdb));
-            if (l_res == 0) {
-                log_it(L_WARNING, "Can't send GDB sync request");
-                continue;
-            }
-
-            // wait for finishing of request
-            int timeout_ms = 300000; // 5 min = 300 sec = 300 000 ms
-            // TODO add progress info to console
-            l_res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms);
-            switch (l_res) {
-            case -1:
-                log_it(L_WARNING, "Timeout with link sync gdb");
-                break;
-            case 0:
-                log_it(L_INFO, "Node sync gdb completed");
-                break;
-            default:
-                log_it(L_INFO, "Node sync gdb error %d",l_res);
-            }
-
-            dap_chain_node_client_reset(l_node_client);
-            l_res = dap_stream_ch_chain_pkt_write_mt(l_worker, l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB_RVRS, l_net->pub.id,
-                                                     l_chain_id, l_net->pub.cell_id, &l_sync_gdb, sizeof(l_sync_gdb));
-            l_res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms);
-            switch (l_res) {
-                case -1:
-                    log_it(L_WARNING, "Timeout with reverse link gdb sync");
-                    break;
-                case 0:
-                    log_it(L_INFO, "Node reverse gdb sync completed");
-                    break;
-                default:
-                    log_it(L_INFO, "Node reverse gdb sync error %d",l_res);
-            }
-
-            // -----
-            if (!l_net_pvt->links) {
-                l_net_pvt->state = NET_STATE_LINKS_PREPARE;
-            } else if (l_net_pvt->state_target >= NET_STATE_SYNC_CHAINS) {
-                l_net_pvt->state = NET_STATE_SYNC_CHAINS;
-            } else {    // Synchronization done, go offline
-                log_it(L_INFO, "Synchronization done, go offline");
-                l_net_pvt->flags ^= F_DAP_CHAIN_NET_GO_SYNC;
-                l_net_pvt->flags ^= F_DAP_CHAIN_NET_SYNC_FROM_ZERO;
-                l_net_pvt->state = l_net_pvt->state_target = NET_STATE_OFFLINE;
-            }
-        } break;
-        case NODE_CLIENT_STATE_SYNC_CHAINS:{
-            dap_chain_node_client_t *l_node_client = (dap_chain_node_client_t *)l_tmp->data;
-            dap_stream_ch_t *l_ch_chain = dap_client_get_stream_ch_unsafe(l_node_client->client, dap_stream_ch_chain_get_id());
-            if (!l_ch_chain) { // Channel or stream or client itself closed
-                l_tmp = dap_list_next(l_tmp);
-                dap_chain_node_client_close(l_node_client);
-                l_net_pvt->links = dap_list_remove(l_net_pvt->links, l_node_client);
-                continue;
-            }
-            dap_stream_worker_t *l_worker = dap_client_get_stream_worker(l_node_client->client);
-            dap_chain_t * l_chain = NULL;
-            int l_res = 0;
-            DL_FOREACH (l_net->pub.chains, l_chain) {
-                dap_chain_node_client_reset(l_node_client);
-                dap_stream_ch_chain_sync_request_t l_request = {0};
-
-                // TODO: Uncomment next block when finish with partial updates
-                /*
-                if (! (l_pvt_net->flags & F_DAP_CHAIN_NET_SYNC_FROM_ZERO) )
-                    dap_chain_get_atom_last_hash(l_chain,&l_request.hash_from);
-                */
-
-                if ( !dap_hash_fast_is_blank(&l_request.hash_from) ){
-                    if(dap_log_level_get() <= L_DEBUG){
-                        char l_hash_str[128]={[0]='\0'};
-                        dap_chain_hash_fast_to_str(&l_request.hash_from,l_hash_str,sizeof (l_hash_str)-1);
-                        log_it(L_DEBUG,"Send sync chain request to"NODE_ADDR_FP_STR" for %s:%s from %s to infinity",
-                               NODE_ADDR_FP_ARGS_S(l_node_client->remote_node_addr) ,l_net->pub.name, l_chain->name,  l_hash_str);
-                    }
-                }else
-                    log_it(L_DEBUG,"Send sync chain request for all the chains for addr "NODE_ADDR_FP_STR,
-                           NODE_ADDR_FP_ARGS_S(l_node_client->remote_node_addr));
-                dap_stream_ch_chain_pkt_write_mt(l_worker, l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS, l_net->pub.id,
-                                                 l_chain->id, l_net->pub.cell_id, &l_request, sizeof(l_request));
-                // wait for finishing of request
-                int timeout_ms = 300000; // 5 min = 300 sec = 300 000 ms
-                // TODO add progress info to console
-                l_res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms);
-                switch (l_res) {
-                    case -1:
-                        //log_it(L_WARNING, "Timeout with sync of chain '%s' ", l_chain->name);
-                        break;
-                    case 0:
-                        l_need_flush = true;
-                        log_it(L_INFO, "Sync of chain '%s' completed ", l_chain->name);
-                        break;
-                    default:
-                        log_it(L_ERROR, "Sync of chain '%s' error %d", l_chain->name,l_res);
-                }
-
-
-                dap_chain_node_client_reset(l_node_client);
-
-                l_request.node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net);
-                dap_stream_ch_chain_pkt_write_mt(l_worker, l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS_RVRS, l_net->pub.id,
-                                                 l_chain->id, l_net->pub.cell_id, &l_request, sizeof(l_request));
-                l_res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms);
-                switch (l_res) {
-                    case -1:
-                        //log_it(L_WARNING, "Timeout with reverse sync of chain '%s' ", l_chain->name);
-                        break;
-                    case 0:
-                        l_need_flush = true;
-                        log_it(L_INFO, "Reverse sync of chain '%s' completed ", l_chain->name);
-                        // set time of last sync
-                        {
-                            struct timespec l_to;
-                            clock_gettime(CLOCK_MONOTONIC, &l_to);
-                            l_net_pvt->last_sync = l_to.tv_sec;
-                        }
-                        break;
-                    default:
-                        log_it(L_ERROR, "Reverse sync of chain '%s' error %d", l_chain->name,l_res);
-                }
-
-            }
-
-            ///-------------------
-            if (l_need_flush) {
-                // flush global_db
-                dap_chain_global_db_flush();
-            }
-            if (!l_net_pvt->links ) {
-                log_it( L_INFO,"Return back to state LINKS_PREPARE ");
-                l_net_pvt->state = NET_STATE_LINKS_PREPARE;
-            } else {
-                if (l_net_pvt->state_target == NET_STATE_ONLINE) {
-                    l_net_pvt->flags ^= F_DAP_CHAIN_NET_GO_SYNC;
-                    l_net_pvt->flags ^= F_DAP_CHAIN_NET_SYNC_FROM_ZERO;
-                    l_net_pvt->state = NET_STATE_ONLINE;
-                    log_it(L_INFO, "Synchronization done, status online");
-                } else {    // Synchronization done, go offline
-                    l_net_pvt->state = l_net_pvt->state_target = NET_STATE_OFFLINE;
-                    log_it(L_INFO, "Synchronization done, go offline");
-                }
-            }
-        }break;
-        case NODE_CLIENT_STATE_SYNCED:
-        break;
-        default:{
-            log_it(L_WARNING,"Non-processing node client state %d", l_node_client->state);
-        }
-    }
-    return l_repeate_after_exit;
-}
-
 /**
  * @brief s_net_states_proc
  * @param l_net
@@ -927,29 +745,16 @@ static bool s_net_states_proc(dap_proc_thread_t *a_thread, void *a_arg)
             log_it(L_DEBUG, "%s.state: NET_STATE_LINKS_CONNECTING",l_net->pub.name);
             for (dap_list_t *l_tmp = l_net_pvt->links_info; l_tmp; l_tmp = dap_list_next(l_tmp)) {
                 dap_chain_node_info_t *l_link_info = (dap_chain_node_info_t *)l_tmp->data;
-                dap_chain_node_client_t *l_node_client = dap_chain_node_client_create_n_connect(l_net, l_link_info,"CN",s_node_link_callback_connected,
-                                                                                                s_node_link_callback_disconnected,s_node_link_callback_stage,
-                                                                                                s_node_link_callback_error,NULL);
+                (void) dap_chain_node_client_create_n_connect(l_net, l_link_info,"CN",s_node_link_callback_connected,
+                                                                     s_node_link_callback_disconnected,s_node_link_callback_stage,
+                                                                     s_node_link_callback_error,NULL);
             }
         } break;
-
+        case NET_STATE_LINKS_ESTABLISHED:{
+        // TODO call some callbacks?
+        }break;
         case NET_STATE_ONLINE: {
-            if (l_net_pvt->flags & F_DAP_CHAIN_NET_GO_SYNC)
-            {
-                switch ( l_net_pvt->state_target) {
-                // disconnect
-                case NET_STATE_OFFLINE:
-                    l_net_pvt->state = NET_STATE_OFFLINE;
-                    log_it(L_NOTICE, "Going to disconnet");
-                    break;
-                case NET_STATE_ONLINE:
-                case NET_STATE_SYNC_GDB:
-                case NET_STATE_SYNC_CHAINS:
-                    l_net_pvt->state = NET_STATE_SYNC_GDB;
-                    break;
-                default: break;
-                }
-            }
+        // TODO call some callbacks?
         }
         break;
         default: log_it (L_DEBUG, "Unprocessed state");
@@ -959,17 +764,6 @@ static bool s_net_states_proc(dap_proc_thread_t *a_thread, void *a_arg)
     return l_repeat_after_exit;
 }
 
-/**
- * @brief net_proc_start
- * @param a_cfg
- */
-static void s_net_check_thread_start( dap_chain_net_t * a_net )
-{
-    PVT(a_net)->main_timer = dap_timerfd_start(dap_config_get_item_uint64_default(g_config,"chain_net","net_check_timeout",10)*1000,
-                                               s_net_check_timer_callback, a_net);
-}
-
-
 dap_chain_node_role_t dap_chain_net_get_role(dap_chain_net_t * a_net)
 {
     return  PVT(a_net)->node_role;
diff --git a/modules/net/dap_chain_node_client.c b/modules/net/dap_chain_node_client.c
index c2d34c109a..a8a04ab847 100644
--- a/modules/net/dap_chain_node_client.c
+++ b/modules/net/dap_chain_node_client.c
@@ -407,7 +407,7 @@ static void s_ch_chain_callback_notify_packet_R(dap_stream_ch_chain_net_srv_t* a
 
 dap_chain_node_client_t* dap_chain_client_connect(dap_chain_node_info_t *a_node_info, const char *a_active_channels)
 {
-    return dap_chain_node_client_create_n_connect(a_node_info,a_active_channels,NULL,NULL,NULL,NULL,NULL);
+    return dap_chain_node_client_create_n_connect(NULL,a_node_info,a_active_channels,NULL,NULL,NULL,NULL,NULL);
 }
 
 /**
-- 
GitLab