From 4580cef22a37b5d286b4dbe580f41d46bde9f7ab Mon Sep 17 00:00:00 2001
From: Roman Khlopkov <roman.khlopkov@demlabs.net>
Date: Wed, 7 Oct 2020 17:33:51 +0000
Subject: [PATCH] [+] INT/TERM singal callbacks [+] Deinit functions call for
 proper global_db and chain net close

---
 CMakeLists.txt                                |   2 +-
 dap-sdk/core/include/dap_math_ops.h           |   1 -
 dap-sdk/net/client/dap_client_pvt.c           |  11 +-
 dap-sdk/net/core/dap_events_socket.c          |  62 +++---
 dap-sdk/net/core/dap_worker.c                 |   7 +-
 dap-sdk/net/core/include/dap_events_socket.h  |   3 +
 dap-sdk/net/stream/stream/dap_stream.c        |   1 -
 dap-sdk/net/stream/stream/dap_stream_ctl.c    |   5 +-
 modules/chain/dap_chain_ledger.c              |  19 +-
 modules/chain/include/dap_chain_ledger.h      |   1 -
 modules/channel/chain/dap_stream_ch_chain.c   | 195 ++++++++++--------
 .../chain/include/dap_stream_ch_chain.h       |   9 +-
 modules/net/dap_chain_net.c                   |   6 +-
 modules/service/vpn/dap_chain_net_srv_vpn.c   |   7 +-
 .../vpn/dap_chain_net_vpn_client_tun.c        |  32 +++
 .../include/dap_chain_net_vpn_client_tun.h    |   1 +
 modules/type/dag/dap_chain_cs_dag.c           |   5 +-
 17 files changed, 216 insertions(+), 151 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 5c920801e9..463469d520 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -2,7 +2,7 @@ project(cellframe-sdk C)
 cmake_minimum_required(VERSION 2.8)
 
 set(CMAKE_C_STANDARD 11)
-set(CELLFRAME_SDK_NATIVE_VERSION "2.6-12")
+set(CELLFRAME_SDK_NATIVE_VERSION "2.6-13")
 add_definitions ("-DCELLFRAME_SDK_VERSION=\"${CELLFRAME_SDK_NATIVE_VERSION}\"")
 
 set(DAPSDK_MODULES "")
diff --git a/dap-sdk/core/include/dap_math_ops.h b/dap-sdk/core/include/dap_math_ops.h
index 0d398559b4..ba8d4af9a9 100755
--- a/dap-sdk/core/include/dap_math_ops.h
+++ b/dap-sdk/core/include/dap_math_ops.h
@@ -27,5 +27,4 @@ typedef union int128{int64_t i64[2];} int128_t;
 
 #endif
 
-
 #endif
diff --git a/dap-sdk/net/client/dap_client_pvt.c b/dap-sdk/net/client/dap_client_pvt.c
index 630e83a902..67ea0a664c 100644
--- a/dap-sdk/net/client/dap_client_pvt.c
+++ b/dap-sdk/net/client/dap_client_pvt.c
@@ -161,7 +161,6 @@ int dap_client_pvt_disconnect_all_n_wait(dap_client_pvt_t *a_client_pvt)
     //dap_client_pvt_t *a_client_pvt = (a_client) ? DAP_CLIENT_PVT(a_client) : NULL;
     if(!a_client_pvt)
         return -1;
-
     pthread_mutex_lock(&a_client_pvt->disconnected_mutex);
     dap_client_go_stage(a_client_pvt->client, STAGE_BEGIN, s_client_pvt_disconnected );
     pthread_cond_wait(&a_client_pvt->disconnected_cond, &a_client_pvt->disconnected_mutex);
@@ -272,7 +271,6 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt)
                     break;
                 case STAGE_STREAM_CTL: {
                     log_it(L_INFO, "Go to stage STREAM_CTL: prepare the request");
-
                     char *l_request = dap_strdup_printf("%d", DAP_CLIENT_PROTOCOL_VERSION);
                     size_t l_request_size = dap_strlen(l_request);
                     log_it(L_DEBUG, "STREAM_CTL request size %u", strlen(l_request));
@@ -309,18 +307,18 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt)
                         a_client_pvt->stage_status = STAGE_STATUS_ERROR;
                         break;
                     }
-        #ifdef _WIN32
+#ifdef _WIN32
                     {
                       int buffsize = 65536*4;
                       int optsize = sizeof( int );
                       setsockopt(a_client_pvt->stream_socket, SOL_SOCKET, SO_SNDBUF, (char *)&buffsize, &optsize );
                       setsockopt(a_client_pvt->stream_socket, SOL_SOCKET, SO_RCVBUF, (char *)&buffsize, &optsize );
                     }
-        #else
+#else
                     int buffsize = 65536*4;
                     setsockopt(a_client_pvt->stream_socket, SOL_SOCKET, SO_SNDBUF, (const void *) &buffsize, sizeof(int));
                     setsockopt(a_client_pvt->stream_socket, SOL_SOCKET, SO_RCVBUF, (const void *) &buffsize, sizeof(int));
-        #endif
+#endif
 
                     // Wrap socket and setup callbacks
                     static dap_events_socket_callbacks_t l_s_callbacks = {
@@ -393,8 +391,6 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt)
                             log_it(L_INFO,"Connecting to remote %s:%s",a_client_pvt->uplink_addr, a_client_pvt->uplink_port);
                         }
                     }
-
-
                 }
                     break;
                 case STAGE_STREAM_CONNECTED: {
@@ -1091,6 +1087,7 @@ static void s_stream_es_callback_delete(dap_events_socket_t *a_es, void *arg)
     log_it(L_INFO, "Stream delete callback");
 
     dap_client_pvt_t * l_client_pvt =(dap_client_pvt_t*) a_es->_inheritor;
+
     a_es->_inheritor = NULL; // To prevent delete in reactor
 
     if(l_client_pvt == NULL) {
diff --git a/dap-sdk/net/core/dap_events_socket.c b/dap-sdk/net/core/dap_events_socket.c
index 8a6ffdd995..1985ba9985 100644
--- a/dap-sdk/net/core/dap_events_socket.c
+++ b/dap-sdk/net/core/dap_events_socket.c
@@ -732,7 +732,6 @@ void dap_events_socket_worker_poll_update_unsafe(dap_events_socket_t * a_esocket
         log_it(L_ERROR, "Wrong poll index when remove from worker (unsafe): %u when total count %u", a_esocket->poll_index,
                a_esocket->worker->poll_count);
     }
-
 #else
 #error "Not defined dap_events_socket_set_writable_unsafe for your platform"
 #endif
@@ -791,40 +790,51 @@ void dap_events_socket_remove_and_delete_unsafe( dap_events_socket_t *a_es, bool
     if ( a_es->worker)
         dap_events_socket_remove_from_worker_unsafe(a_es, a_es->worker);
 
-    if (a_es->events){ // It could be socket NOT from events
-        pthread_rwlock_wrlock( &a_es->events->sockets_rwlock );
-        if(!dap_events_socket_find_unsafe(a_es->socket, a_es->events)){
-            log_it( L_ERROR, "dap_events_socket 0x%x already deleted", a_es);
-            pthread_rwlock_unlock( &a_es->events->sockets_rwlock );
-            return ;
-        }
-
-        if(a_es->events->sockets)
-            HASH_DEL( a_es->events->sockets, a_es );
-        pthread_rwlock_unlock( &a_es->events->sockets_rwlock );
-    }
     //log_it( L_DEBUG, "dap_events_socket wrapped around %d socket is removed", a_es->socket );
 
     if( a_es->callbacks.delete_callback )
         a_es->callbacks.delete_callback( a_es, NULL ); // Init internal structure
 
-    if ( a_es->_inheritor && !preserve_inheritor )
-        DAP_DELETE( a_es->_inheritor );
+    dap_events_socket_delete_unsafe(a_es, preserve_inheritor);
+}
 
-    if ( a_es->socket && a_es->socket != -1) {
-#ifdef _WIN32
-        closesocket( a_es->socket );
-#else
-        close( a_es->socket );
-#ifdef DAP_EVENTS_CAPS_QUEUE_PIPE2
-        if( a_es->type == DESCRIPTOR_TYPE_QUEUE){
-            close( a_es->fd2);
+/**
+ * @brief dap_events_socket_delete_unsafe
+ * @param a_esocket
+ * @param a_preserve_inheritor
+ */
+void dap_events_socket_delete_unsafe( dap_events_socket_t * a_esocket , bool a_preserve_inheritor)
+{
+    if (a_esocket->events){ // It could be socket NOT from events
+        pthread_rwlock_wrlock( &a_esocket->events->sockets_rwlock );
+        if(!dap_events_socket_find_unsafe(a_esocket->socket, a_esocket->events)){
+            log_it( L_ERROR, "dap_events_socket 0x%x already deleted", a_esocket);
+            pthread_rwlock_unlock( &a_esocket->events->sockets_rwlock );
+            return ;
         }
-#endif
 
-#endif
+        if(a_esocket->events->sockets)
+            HASH_DEL( a_esocket->events->sockets, a_esocket );
+        pthread_rwlock_unlock( &a_esocket->events->sockets_rwlock );
+    }
+
+    if ( a_esocket->_inheritor && !a_preserve_inheritor )
+        DAP_DELETE( a_esocket->_inheritor );
+
+    if ( a_esocket->socket && a_esocket->socket != -1) {
+    #ifdef _WIN32
+        closesocket( a_esocket->socket );
+    #else
+        close( a_esocket->socket );
+    #ifdef DAP_EVENTS_CAPS_QUEUE_PIPE2
+        if( a_esocket->type == DESCRIPTOR_TYPE_QUEUE){
+            close( a_esocket->fd2);
+        }
+    #endif
+
+    #endif
     }
-    DAP_DELETE( a_es );
+    DAP_DELETE( a_esocket );
 }
 
 /**
diff --git a/dap-sdk/net/core/dap_worker.c b/dap-sdk/net/core/dap_worker.c
index d50e05a2cc..1074b9f709 100644
--- a/dap-sdk/net/core/dap_worker.c
+++ b/dap-sdk/net/core/dap_worker.c
@@ -395,10 +395,8 @@ void *dap_worker_thread(void *arg)
                             log_it(L_ERROR, "Some error occured in send(): %s", strerror(errno));
                             l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE;
                             l_cur->buf_out_size = 0;
-
                         }
                     }else{
-
                         //log_it(L_DEBUG, "Output: %u from %u bytes are sent ", l_bytes_sent,l_cur->buf_out_size);
                         if (l_bytes_sent) {
                             if ( l_bytes_sent <= (ssize_t) l_cur->buf_out_size ){
@@ -479,8 +477,9 @@ static void s_queue_new_es_callback( dap_events_socket_t * a_es, void * a_arg)
     dap_events_socket_t * l_es_new =(dap_events_socket_t *) a_arg;
     dap_worker_t * w = a_es->worker;
     //log_it(L_DEBUG, "Received event socket %p to add on worker", l_es_new);
-    if(dap_events_socket_check_unsafe( w, a_es)){
-        log_it(L_ERROR, "Already assigned %d (%p), you're doing smth wrong", a_es->socket, a_es);
+    if(dap_events_socket_check_unsafe( w, l_es_new)){
+        //log_it(L_ERROR, "Already assigned %d (%p), you're doing smth wrong", l_es_new->socket, l_es_new);
+        // Socket already present in worker, it's OK
         return;
     }
 
diff --git a/dap-sdk/net/core/include/dap_events_socket.h b/dap-sdk/net/core/include/dap_events_socket.h
index 86e5560cad..feb65aa366 100644
--- a/dap-sdk/net/core/include/dap_events_socket.h
+++ b/dap-sdk/net/core/include/dap_events_socket.h
@@ -31,6 +31,7 @@
 #include "dap_common.h"
 
 #define DAP_EVENTS_SOCKET_MAX 8194
+
 // Caps for different platforms
 #if defined(DAP_OS_LINUX)
 //    #define DAP_EVENTS_CAPS_EPOLL
@@ -218,6 +219,8 @@ dap_events_socket_t * dap_events_socket_create_type_pipe_mt(dap_worker_t * a_w,
 int dap_events_socket_queue_ptr_send( dap_events_socket_t * a_es, void* a_arg);
 int dap_events_socket_event_signal( dap_events_socket_t * a_es, uint64_t a_value);
 
+void dap_events_socket_delete_unsafe( dap_events_socket_t * a_esocket , bool a_preserve_inheritor);
+
 dap_events_socket_t *dap_events_socket_wrap_no_add( dap_events_t *a_events,
                                             int a_sock, dap_events_socket_callbacks_t *a_callbacks );
 dap_events_socket_t * dap_events_socket_wrap2( dap_server_t *a_server, struct dap_events *a_events,
diff --git a/dap-sdk/net/stream/stream/dap_stream.c b/dap-sdk/net/stream/stream/dap_stream.c
index 9904f8dac1..e57dc4e405 100644
--- a/dap-sdk/net/stream/stream/dap_stream.c
+++ b/dap-sdk/net/stream/stream/dap_stream.c
@@ -338,7 +338,6 @@ dap_stream_t* dap_stream_new_es_client(dap_events_socket_t * a_esocket)
     ret->esocket = a_esocket;
     ret->buf_defrag_size=0;
     ret->is_client_to_uplink = true;
-    log_it(L_NOTICE,"New stream with events socket instance for %s",a_esocket->hostaddr);
     return ret;
 }
 
diff --git a/dap-sdk/net/stream/stream/dap_stream_ctl.c b/dap-sdk/net/stream/stream/dap_stream_ctl.c
index 52bf55be3d..54d92f87fe 100644
--- a/dap-sdk/net/stream/stream/dap_stream_ctl.c
+++ b/dap-sdk/net/stream/stream/dap_stream_ctl.c
@@ -149,9 +149,10 @@ void s_proc(struct dap_http_simple *a_http_simple, void * a_arg)
             log_it(L_INFO, "legacy encryption mode used (OAES)");
             l_enc_type = DAP_ENC_KEY_TYPE_OAES;
             l_new_session = true;
-        }
-        if(l_new_session){
+        }else
+            log_it(L_DEBUG,"Encryption type %s (enc headers %d)",dap_enc_get_type_name(l_enc_type), l_enc_headers);
 
+        if(l_new_session){
             ss = dap_stream_session_pure_new();
             strncpy(ss->active_channels, l_channels_str, l_channels_str_size);
             char *key_str = calloc(1, KEX_KEY_STR_SIZE+1);
diff --git a/modules/chain/dap_chain_ledger.c b/modules/chain/dap_chain_ledger.c
index 8ede47afb5..f96ad62b39 100644
--- a/modules/chain/dap_chain_ledger.c
+++ b/modules/chain/dap_chain_ledger.c
@@ -334,7 +334,7 @@ int dap_chain_ledger_token_load(dap_ledger_t *a_ledger,  dap_chain_datum_token_t
  * @param a_ledger
  */
 static void s_treshold_emissions_proc(dap_ledger_t * a_ledger)
-{
+{ 
     bool l_success;
     do {
         l_success = false;
@@ -343,13 +343,15 @@ static void s_treshold_emissions_proc(dap_ledger_t * a_ledger)
             int l_res = dap_chain_ledger_token_emission_add(a_ledger, l_emission_item->datum_token_emission,
                                                             l_emission_item->datum_token_emission_size);
             if (!l_res) {
+                pthread_rwlock_wrlock(&PVT(a_ledger)->treshold_emissions_rwlock);
                 HASH_DEL(PVT(a_ledger)->treshold_emissions, l_emission_item);
+                pthread_rwlock_unlock(&PVT(a_ledger)->treshold_emissions_rwlock);
                 DAP_DELETE(l_emission_item->datum_token_emission);
                 DAP_DELETE(l_emission_item);
                 l_success = true;
             }
         }
-    } while (l_success);
+    } while (l_success); 
 }
 
 /**
@@ -357,22 +359,23 @@ static void s_treshold_emissions_proc(dap_ledger_t * a_ledger)
  * @param a_ledger
  */
 static void s_treshold_txs_proc( dap_ledger_t *a_ledger)
-{
+{  
     bool l_success;
     do {
         l_success = false;
         dap_chain_ledger_tx_item_t *l_tx_item, *l_tx_tmp;
         HASH_ITER(hh, PVT(a_ledger)->treshold_txs, l_tx_item, l_tx_tmp) {
             int l_res = dap_chain_ledger_tx_add(a_ledger, l_tx_item->tx);
-            if (!l_res) {
+            if (l_res == 1) {
+                pthread_rwlock_wrlock(&PVT(a_ledger)->treshold_txs_rwlock);
                 HASH_DEL(PVT(a_ledger)->treshold_txs, l_tx_item);
+                pthread_rwlock_unlock(&PVT(a_ledger)->treshold_txs_rwlock);
                 DAP_DELETE(l_tx_item->tx);
                 DAP_DELETE(l_tx_item);
                 l_success = true;
             }
         }
     } while (l_success);
-
 }
 
 
@@ -855,7 +858,7 @@ int dap_chain_ledger_tx_cache_check(dap_ledger_t *a_ledger, dap_chain_datum_tx_t
      &&
      4. tx1.dap_chain_datum_tx_out.addr.data.key == tx2.dap_chain_datum_tx_sig.pkey for unconditional output
      \\
-     5a. tx1.dap_chain_datum_tx_sig.pkey == tx1.dap_chain_datum_tx_sig.pkey for conditional owner
+     5a. tx1.dap_chain_datum_tx_sig.pkey == tx2.dap_chain_datum_tx_sig.pkey for conditional owner
      \\
      5b. tx1.dap_chain_datum_tx_out.condition == verify_svc_type(tx2) for conditional output
      &&
@@ -1302,7 +1305,7 @@ int dap_chain_ledger_tx_add(dap_ledger_t *a_ledger, dap_chain_datum_tx_t *a_tx)
                     pthread_rwlock_wrlock(&l_ledger_priv->treshold_txs_rwlock);
                     HASH_ADD(hh, l_ledger_priv->treshold_txs, tx_hash_fast, sizeof(dap_chain_hash_fast_t), l_item_tmp);
                     pthread_rwlock_unlock(&l_ledger_priv->treshold_txs_rwlock);
-                    log_it (L_DEBUG, "dap_chain_ledger_tx_add() tx %s added to threshold", l_tx_hash_str);
+                    log_it (L_DEBUG, "Tx %s added to threshold", l_tx_hash_str);
                     // Add it to cache
                     dap_chain_datum_tx_t *l_tx_cache = DAP_NEW_Z_SIZE(dap_chain_datum_tx_t, l_tx_size);
                     memcpy(l_tx_cache, a_tx, l_tx_size);
@@ -1536,6 +1539,7 @@ int dap_chain_ledger_tx_add(dap_ledger_t *a_ledger, dap_chain_datum_tx_t *a_tx)
         pthread_rwlock_wrlock(&l_ledger_priv->ledger_rwlock);
         HASH_ADD(hh, l_ledger_priv->ledger_items, tx_hash_fast, sizeof(dap_chain_hash_fast_t), l_item_tmp); // tx_hash_fast: name of key field
         pthread_rwlock_unlock(&l_ledger_priv->ledger_rwlock);
+
         // Add it to cache
         uint8_t *l_tx_cache = DAP_NEW_Z_SIZE(uint8_t, l_tx_size + sizeof(l_item_tmp->cache_data));
         memcpy(l_tx_cache, &l_item_tmp->cache_data, sizeof(l_item_tmp->cache_data));
@@ -1546,6 +1550,7 @@ int dap_chain_ledger_tx_add(dap_ledger_t *a_ledger, dap_chain_datum_tx_t *a_tx)
             DAP_DELETE(l_tx_cache);
         }
         DAP_DELETE(l_gdb_group);
+
         s_treshold_txs_proc(a_ledger);
         ret = 1;
     }
diff --git a/modules/chain/include/dap_chain_ledger.h b/modules/chain/include/dap_chain_ledger.h
index c29478efed..252ea15478 100644
--- a/modules/chain/include/dap_chain_ledger.h
+++ b/modules/chain/include/dap_chain_ledger.h
@@ -64,7 +64,6 @@ typedef bool (* dap_chain_ledger_verificator_callback_t)(dap_chain_tx_out_cond_t
 
 dap_ledger_t* dap_chain_ledger_create(uint16_t a_check_flags, char *a_net_name);
 
-
 // Remove dap_ledger_t structure
 void dap_chain_ledger_handle_free(dap_ledger_t *a_ledger);
 
diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c
index fb32aab1c4..6ce9e8078b 100644
--- a/modules/channel/chain/dap_stream_ch_chain.c
+++ b/modules/channel/chain/dap_stream_ch_chain.c
@@ -103,7 +103,6 @@ void s_stream_ch_new(dap_stream_ch_t* a_ch, void* a_arg)
 {
     a_ch->internal = DAP_NEW_Z(dap_stream_ch_chain_t);
     dap_stream_ch_chain_t * l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch);
-    pthread_mutex_init(&l_ch_chain->mutex, NULL);
     l_ch_chain->ch = a_ch;
 }
 
@@ -120,7 +119,6 @@ void s_stream_ch_delete(dap_stream_ch_t* a_ch, void* a_arg)
         dap_db_log_list_delete(DAP_STREAM_CH_CHAIN(a_ch)->request_global_db_trs); //dap_list_free_full(DAP_STREAM_CH_CHAIN(a_ch)->request_global_db_trs, (dap_callback_destroyed_t) free);
         DAP_STREAM_CH_CHAIN(a_ch)->request_global_db_trs = NULL;
     }
-    pthread_mutex_destroy(&DAP_STREAM_CH_CHAIN(a_ch)->mutex);
 }
 
 
@@ -222,11 +220,12 @@ bool s_chain_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg)
     }
 
     dap_chain_hash_fast_t l_atom_hash = {};
-    dap_chain_atom_ptr_t l_atom_copy = l_ch_chain->pkt_data;
-    uint64_t l_atom_copy_size = l_ch_chain->pkt_data_size;
-    l_ch_chain->pkt_data = NULL;
-    l_ch_chain->pkt_data_size = 0;
-    if( l_atom_copy && l_atom_copy_size){
+    dap_list_t *l_pkt_copy_list = l_ch_chain->pkt_copy_list;
+    if (l_pkt_copy_list) {
+        l_ch_chain->pkt_copy_list = l_ch_chain->pkt_copy_list->next;
+        dap_chain_pkt_copy_t *l_pkt_copy = (dap_chain_pkt_copy_t *)l_pkt_copy_list->data;
+        dap_chain_atom_ptr_t l_atom_copy = (dap_chain_atom_ptr_t)l_pkt_copy->pkt_data;
+        uint64_t l_atom_copy_size = l_pkt_copy->pkt_data_size;
         dap_hash_fast(l_atom_copy, l_atom_copy_size, &l_atom_hash);
         dap_chain_atom_iter_t *l_atom_iter = l_chain->callback_atom_iter_create(l_chain);
         size_t l_atom_size =0;
@@ -280,8 +279,10 @@ bool s_chain_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg)
             DAP_DELETE(l_atom_copy);
         }
         l_chain->callback_atom_iter_delete(l_atom_iter);
+        DAP_DELETE(l_pkt_copy);
+        DAP_DELETE(l_pkt_copy_list);
     }else
-        log_it(L_WARNING, "In proc thread got stream ch packet with pkt_size: %zd and pkt_data: %p", l_atom_copy_size, l_atom_copy);
+        log_it(L_WARNING, "In proc thread got CHAINS stream ch packet with zero data");
     dap_events_socket_assign_on_worker_mt(l_ch->stream->esocket, l_ch->stream_worker->worker);
     return true;
 }
@@ -291,93 +292,101 @@ bool s_gdb_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg)
     UNUSED(a_thread);
     dap_stream_ch_t *l_ch = (dap_stream_ch_t *)a_arg;
     dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch);
-    dap_chain_t * l_chain = dap_chain_find_by_id(l_ch_chain->request_net_id, l_ch_chain->request_chain_id);
 
-    size_t l_data_obj_count = 0;
-    // deserialize data & Parse data from dap_db_log_pack()
-    dap_store_obj_t *l_store_obj = dap_db_log_unpack(l_ch_chain->pkt_data,l_ch_chain->pkt_data_size, &l_data_obj_count);
-    //log_it(L_INFO, "In: l_data_obj_count = %d", l_data_obj_count );
-
-    for(size_t i = 0; i < l_data_obj_count; i++) {
-        // timestamp for exist obj
-        time_t l_timestamp_cur = 0;
-        // obj to add
-        dap_store_obj_t* l_obj = l_store_obj + i;
-        // read item from base;
-        size_t l_count_read = 0;
-        dap_store_obj_t *l_read_obj = dap_chain_global_db_driver_read(l_obj->group,
-                l_obj->key, &l_count_read);
-        // get timestamp for the exist entry
-        if(l_read_obj)
-            l_timestamp_cur = l_read_obj->timestamp;
-        // get timestamp for the deleted entry
-        else
-        {
-            l_timestamp_cur = global_db_gr_del_get_timestamp(l_obj->group, l_obj->key);
-        }
+    dap_list_t *l_pkt_copy_list = l_ch_chain->pkt_copy_list;
+    if (l_pkt_copy_list) {
+        l_ch_chain->pkt_copy_list = l_ch_chain->pkt_copy_list->next;
+        dap_chain_pkt_copy_t *l_pkt_copy = (dap_chain_pkt_copy_t *)l_pkt_copy_list->data;
+        size_t l_data_obj_count = 0;
+        // deserialize data & Parse data from dap_db_log_pack()
+        dap_store_obj_t *l_store_obj = dap_db_log_unpack(l_pkt_copy->pkt_data, l_pkt_copy->pkt_data_size, &l_data_obj_count);
+        //log_it(L_INFO, "In: l_data_obj_count = %d", l_data_obj_count );
+
+        for(size_t i = 0; i < l_data_obj_count; i++) {
+            // timestamp for exist obj
+            time_t l_timestamp_cur = 0;
+            // obj to add
+            dap_store_obj_t* l_obj = l_store_obj + i;
+            // read item from base;
+            size_t l_count_read = 0;
+            dap_store_obj_t *l_read_obj = dap_chain_global_db_driver_read(l_obj->group,
+                    l_obj->key, &l_count_read);
+            // get timestamp for the exist entry
+            if(l_read_obj)
+                l_timestamp_cur = l_read_obj->timestamp;
+            // get timestamp for the deleted entry
+            else
+            {
+                l_timestamp_cur = global_db_gr_del_get_timestamp(l_obj->group, l_obj->key);
+            }
 
-        //check whether to apply the received data into the database
-        bool l_apply = true;
-        if(l_obj->timestamp < l_timestamp_cur)
-            l_apply = false;
-        else if(l_obj->type == 'd') {
-            // already deleted
-            if(!l_read_obj)
-                l_apply = false;
-        }
-        else if(l_obj->type == 'a') {
-            bool l_is_the_same_present = false;
-            if(l_read_obj &&
-                    l_read_obj->value_len == l_obj->value_len &&
-                    !memcmp(l_read_obj->value, l_obj->value, l_obj->value_len))
-                l_is_the_same_present = true;
-            // this data already present in global_db and not obsolete (out of date)
-            if(l_read_obj && (l_is_the_same_present || l_read_obj->timestamp >= l_store_obj->timestamp))
+            //check whether to apply the received data into the database
+            bool l_apply = true;
+            if(l_obj->timestamp < l_timestamp_cur)
                 l_apply = false;
-        }
-        if(l_read_obj)
-            dap_store_obj_free(l_read_obj, l_count_read);
+            else if(l_obj->type == 'd') {
+                // already deleted
+                if(!l_read_obj)
+                    l_apply = false;
+            }
+            else if(l_obj->type == 'a') {
+                bool l_is_the_same_present = false;
+                if(l_read_obj &&
+                        l_read_obj->value_len == l_obj->value_len &&
+                        !memcmp(l_read_obj->value, l_obj->value, l_obj->value_len))
+                    l_is_the_same_present = true;
+                // this data already present in global_db and not obsolete (out of date)
+                if(l_read_obj && (l_is_the_same_present || l_read_obj->timestamp >= l_store_obj->timestamp))
+                    l_apply = false;
+            }
+            if(l_read_obj)
+                dap_store_obj_free(l_read_obj, l_count_read);
 
-        if(!l_apply) {
-            // If request was from defined node_addr we update its state
-            if(l_ch_chain->request.node_addr.uint64) {
-                dap_db_log_set_last_id_remote(l_ch_chain->request.node_addr.uint64, l_obj->id);
+            if(!l_apply) {
+                // If request was from defined node_addr we update its state
+                if(l_ch_chain->request.node_addr.uint64) {
+                    dap_db_log_set_last_id_remote(l_ch_chain->request.node_addr.uint64, l_obj->id);
+                }
+                continue;
             }
-            continue;
-        }
 
-        char l_ts_str[50];
-        dap_time_to_str_rfc822(l_ts_str, sizeof(l_ts_str), l_store_obj[i].timestamp);
-        /*log_it(L_DEBUG, "Unpacked log history: type='%c' (0x%02hhX) group=\"%s\" key=\"%s\""
-                " timestamp=\"%s\" value_len=%u  ",
-                (char ) l_store_obj[i].type, l_store_obj[i].type, l_store_obj[i].group,
-                l_store_obj[i].key, l_ts_str, l_store_obj[i].value_len);*/
-        // apply received transaction
-        dap_chain_t *l_chain = dap_chain_find_by_id(l_ch_chain->request_net_id, l_ch_chain->request_chain_id);
-        if(l_chain) {
-            if(l_chain->callback_add_datums_with_group){
-                void * restrict l_store_obj_value = l_store_obj->value;
-                l_chain->callback_add_datums_with_group(l_chain,
-                        (dap_chain_datum_t** restrict) l_store_obj_value, 1,
-                        l_store_obj[i].group);
+            char l_ts_str[50];
+            dap_time_to_str_rfc822(l_ts_str, sizeof(l_ts_str), l_store_obj[i].timestamp);
+            /*log_it(L_DEBUG, "Unpacked log history: type='%c' (0x%02hhX) group=\"%s\" key=\"%s\""
+                    " timestamp=\"%s\" value_len=%u  ",
+                    (char ) l_store_obj[i].type, l_store_obj[i].type, l_store_obj[i].group,
+                    l_store_obj[i].key, l_ts_str, l_store_obj[i].value_len);*/
+            // apply received transaction
+            dap_chain_t *l_chain = dap_chain_find_by_id(l_ch_chain->request_net_id, l_ch_chain->request_chain_id);
+            if(l_chain) {
+                if(l_chain->callback_add_datums_with_group){
+                    void * restrict l_store_obj_value = l_store_obj->value;
+                    l_chain->callback_add_datums_with_group(l_chain,
+                            (dap_chain_datum_t** restrict) l_store_obj_value, 1,
+                            l_store_obj[i].group);
+                }
             }
-        }
-        // save data to global_db
-        if(!dap_chain_global_db_obj_save(l_obj, 1)) {
-            dap_stream_ch_chain_pkt_write_error(l_ch, l_ch_chain->request_net_id,
-                                                l_ch_chain->request_chain_id, l_ch_chain->request_cell_id,
-                                                "ERROR_GLOBAL_DB_INTERNAL_NOT_SAVED");
-            dap_stream_ch_set_ready_to_write_unsafe(l_ch, true);
-        } else {
-            // If request was from defined node_addr we update its state
-            if(l_ch_chain->request.node_addr.uint64) {
-                dap_db_log_set_last_id_remote(l_ch_chain->request.node_addr.uint64, l_obj->id);
+            // save data to global_db
+            if(!dap_chain_global_db_obj_save(l_obj, 1)) {
+                dap_stream_ch_chain_pkt_write_error(l_ch, l_ch_chain->request_net_id,
+                                                    l_ch_chain->request_chain_id, l_ch_chain->request_cell_id,
+                                                    "ERROR_GLOBAL_DB_INTERNAL_NOT_SAVED");
+                dap_stream_ch_set_ready_to_write_unsafe(l_ch, true);
+            } else {
+                // If request was from defined node_addr we update its state
+                if(l_ch_chain->request.node_addr.uint64) {
+                    dap_db_log_set_last_id_remote(l_ch_chain->request.node_addr.uint64, l_obj->id);
+                }
+                //log_it(L_DEBUG, "Added new GLOBAL_DB history pack");
             }
-            //log_it(L_DEBUG, "Added new GLOBAL_DB history pack");
         }
+        if(l_store_obj)
+            dap_store_obj_free(l_store_obj, l_data_obj_count);
+        DAP_DELETE(l_pkt_copy);
+        DAP_DELETE(l_pkt_copy_list);
+    } else {
+        log_it(L_WARNING, "In proc thread got GDB stream ch packet with zero data");
     }
-    if(l_store_obj)
-        dap_store_obj_free(l_store_obj, l_data_obj_count);
     dap_events_socket_assign_on_worker_mt(l_ch->stream->esocket, l_ch->stream_worker->worker);
     return true;
 }
@@ -519,9 +528,11 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
                 memcpy(&l_ch_chain->request_net_id, &l_chain_pkt->hdr.net_id, sizeof(dap_chain_net_id_t));
                 memcpy(&l_ch_chain->request_chain_id, &l_chain_pkt->hdr.chain_id, sizeof(dap_chain_id_t));
                 memcpy(&l_ch_chain->request_cell_id, &l_chain_pkt->hdr.cell_id, sizeof(dap_chain_cell_id_t));
-                l_ch_chain->pkt_data = DAP_NEW_SIZE(byte_t, l_chain_pkt_data_size);
-                memcpy(l_ch_chain->pkt_data, l_chain_pkt->data, l_chain_pkt_data_size);
-                l_ch_chain->pkt_data_size = l_chain_pkt_data_size;
+                dap_chain_pkt_copy_t *l_pkt_copy = DAP_NEW_Z(dap_chain_pkt_copy_t);
+                l_pkt_copy->pkt_data = DAP_NEW_Z_SIZE(byte_t, l_chain_pkt_data_size);
+                memcpy(l_pkt_copy->pkt_data, l_chain_pkt->data, l_chain_pkt_data_size);
+                l_pkt_copy->pkt_data_size = l_chain_pkt_data_size;
+                l_ch_chain->pkt_copy_list = dap_list_append(l_ch_chain->pkt_copy_list, l_pkt_copy);
                 dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker);
                 dap_proc_queue_add_callback(a_ch->stream_worker->worker, s_chain_pkt_callback, a_ch);
             } else {
@@ -548,9 +559,11 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
             memcpy(&l_ch_chain->request_net_id, &l_chain_pkt->hdr.net_id, sizeof(dap_chain_net_id_t));
             memcpy(&l_ch_chain->request_chain_id, &l_chain_pkt->hdr.chain_id, sizeof(dap_chain_id_t));
             memcpy(&l_ch_chain->request_cell_id, &l_chain_pkt->hdr.cell_id, sizeof(dap_chain_cell_id_t));
-            l_ch_chain->pkt_data = DAP_CALLOC(1, l_chain_pkt_data_size);
-            memcpy(l_ch_chain->pkt_data, l_chain_pkt->data, l_chain_pkt_data_size);
-            l_ch_chain->pkt_data_size = l_chain_pkt_data_size;
+            dap_chain_pkt_copy_t *l_pkt_copy = DAP_NEW_Z(dap_chain_pkt_copy_t);
+            l_pkt_copy->pkt_data = DAP_NEW_Z_SIZE(byte_t, l_chain_pkt_data_size);
+            memcpy(l_pkt_copy->pkt_data, l_chain_pkt->data, l_chain_pkt_data_size);
+            l_pkt_copy->pkt_data_size = l_chain_pkt_data_size;
+            l_ch_chain->pkt_copy_list = dap_list_append(l_ch_chain->pkt_copy_list, l_pkt_copy);
             dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker);
             dap_proc_queue_add_callback(a_ch->stream_worker->worker, s_gdb_pkt_callback, a_ch);
         } else {
diff --git a/modules/channel/chain/include/dap_stream_ch_chain.h b/modules/channel/chain/include/dap_stream_ch_chain.h
index 931d2b9714..b73c509b3e 100644
--- a/modules/channel/chain/include/dap_stream_ch_chain.h
+++ b/modules/channel/chain/include/dap_stream_ch_chain.h
@@ -45,8 +45,12 @@ typedef struct dap_chain_atom_item{
     UT_hash_handle hh;
 } dap_chain_atom_item_t;
 
+typedef struct dap_chain_pkt_copy {
+    uint64_t pkt_data_size;
+    byte_t *pkt_data;
+} dap_chain_pkt_copy_t;
+
 typedef struct dap_stream_ch_chain {
-    pthread_mutex_t mutex;
     dap_stream_ch_t * ch;
 
     dap_db_log_list_t *request_global_db_trs; // list of global db records
@@ -54,8 +58,7 @@ typedef struct dap_stream_ch_chain {
     dap_stream_ch_chain_state_t state;
 
     dap_chain_atom_iter_t * request_atom_iter;
-    byte_t *pkt_data;
-    uint64_t pkt_data_size;
+    dap_list_t *pkt_copy_list;
     uint64_t stats_request_atoms_processed;
     uint64_t stats_request_gdb_processed;
     dap_stream_ch_chain_sync_request_t request;
diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c
index 9370189c50..0b4fe001d2 100644
--- a/modules/net/dap_chain_net.c
+++ b/modules/net/dap_chain_net.c
@@ -704,14 +704,14 @@ static void *s_net_check_thread ( void *a_net )
     //dap_chain_global_db_set_callback_for_update_base(s_net_proc_thread_callback_update_db);
 
     while(1){
-        if ( !(p_net->flags & F_DAP_CHAIN_NET_SHUTDOWN) ) {
+        if (p_net->flags & F_DAP_CHAIN_NET_SHUTDOWN) {
             return NULL;
         }
 
-        // check or start sync
-        s_net_states_proc( l_net );
         if (p_net->flags & F_DAP_CHAIN_NET_GO_SYNC) {
+            // check or start sync
             s_net_states_proc( l_net );
+            continue;
         }
         struct timespec l_to;
 
diff --git a/modules/service/vpn/dap_chain_net_srv_vpn.c b/modules/service/vpn/dap_chain_net_srv_vpn.c
index 4041396e15..d27b45c921 100644
--- a/modules/service/vpn/dap_chain_net_srv_vpn.c
+++ b/modules/service/vpn/dap_chain_net_srv_vpn.c
@@ -1323,13 +1323,16 @@ void s_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
             // for client only
             case VPN_PACKET_OP_CODE_VPN_RECV:{
                 a_ch->stream->esocket->last_ping_request = time(NULL); // not ping, but better  ;-)
+                //ch_sf_tun_client_send(CH_VPN(a_ch), l_vpn_pkt->data, l_vpn_pkt->header.op_data.data_size);
+                dap_events_socket_t *l_es = dap_chain_net_vpn_client_tun_get_esock();
                 // Find tun socket for current worker
-                ch_sf_tun_socket_t * l_tun = s_tun_sockets[a_ch->stream_worker->worker->id];
+                ch_sf_tun_socket_t *l_tun =  l_es ? l_es->_inheritor : NULL;
+                //ch_sf_tun_socket_t * l_tun = s_tun_sockets[a_ch->stream_worker->worker->id];
                 assert(l_tun);
                 s_stream_session_esocket_send(l_srv_session, l_tun->es, l_vpn_pkt->data, l_vpn_pkt->header.op_data.data_size);
             } break;
 
-            // for servier only
+            // for server only
             case VPN_PACKET_OP_CODE_VPN_SEND: {
                 ch_sf_tun_socket_t * l_tun = s_tun_sockets[a_ch->stream_worker->worker->id];
                 assert(l_tun);
diff --git a/modules/service/vpn/dap_chain_net_vpn_client_tun.c b/modules/service/vpn/dap_chain_net_vpn_client_tun.c
index ca890bcfc0..1d1dc7e4ef 100644
--- a/modules/service/vpn/dap_chain_net_vpn_client_tun.c
+++ b/modules/service/vpn/dap_chain_net_vpn_client_tun.c
@@ -253,6 +253,32 @@ static void m_client_tun_write(dap_events_socket_t * a_es, void * arg)
 //    log_it(L_WARNING, __PRETTY_FUNCTION__);
 }
 
+void m_client_tun_new(dap_events_socket_t * a_es, void * arg)
+{
+    (void) arg;
+    ch_sf_tun_socket_t * l_tun_socket = DAP_NEW_Z(ch_sf_tun_socket_t);
+    if ( l_tun_socket ){
+        l_tun_socket->worker = a_es->worker;
+        l_tun_socket->worker_id = l_tun_socket->worker->id;
+        l_tun_socket->es = a_es;
+        //s_tun_sockets_queue_msg[a_es->worker->id] = dap_events_socket_create_type_queue_ptr_unsafe(a_es->worker, s_tun_recv_msg_callback );
+        //s_tun_sockets[a_es->worker->id] = l_tun_socket;
+
+        a_es->_inheritor = l_tun_socket;
+        //s_tun_attach_queue( a_es->fd );
+        {
+            struct ifreq ifr;
+            memset(&ifr, 0, sizeof(ifr));
+            ifr.ifr_flags = IFF_ATTACH_QUEUE;
+            ioctl(a_es->fd, TUNSETQUEUE, (void *)&ifr);
+        }
+        log_it(L_NOTICE,"New TUN event socket initialized for worker %u" , l_tun_socket->worker_id);
+
+    }else{
+        log_it(L_ERROR, "Can't allocate memory for tun socket");
+    }
+}
+
 static void m_client_tun_read(dap_events_socket_t * a_es, void * arg)
 {
     const static int tun_MTU = 100000; /// TODO Replace with detection of MTU size
@@ -304,6 +330,10 @@ static void m_client_tun_error(dap_events_socket_t * a_es, int a_arg)
   log_it(L_WARNING, " TUN client problems: code %d", a_arg);
 }
 
+dap_events_socket_t* dap_chain_net_vpn_client_tun_get_esock(void) {
+    return s_tun_events_socket;
+}
+
 int dap_chain_net_vpn_client_tun_create(const char *a_ipv4_addr_str, const char *a_ipv4_gw_str)
 {
     //    char dev[IFNAMSIZ] = { 0 };
@@ -412,6 +442,7 @@ int dap_chain_net_vpn_client_tun_create(const char *a_ipv4_addr_str, const char
     pthread_mutex_init(&s_clients_mutex, NULL);
 
     static dap_events_socket_callbacks_t l_s_callbacks = {
+            .new_callback = m_client_tun_new,//m_es_tun_new;
             .read_callback = m_client_tun_read,// for server
             .write_callback = m_client_tun_write,// for client
             .error_callback = m_client_tun_error,
@@ -421,6 +452,7 @@ int dap_chain_net_vpn_client_tun_create(const char *a_ipv4_addr_str, const char
     s_tun_events_socket = dap_events_socket_wrap_no_add(dap_events_get_default(), s_fd_tun, &l_s_callbacks);
     s_tun_events_socket->type = DESCRIPTOR_TYPE_FILE;
     dap_worker_add_events_socket_auto(s_tun_events_socket);
+    //dap_events_socket_assign_on_worker_mt(l_es, a_worker);
     s_tun_events_socket->_inheritor = NULL;
 
     //return 0;
diff --git a/modules/service/vpn/include/dap_chain_net_vpn_client_tun.h b/modules/service/vpn/include/dap_chain_net_vpn_client_tun.h
index 054b7642ad..1697d3b6c5 100644
--- a/modules/service/vpn/include/dap_chain_net_vpn_client_tun.h
+++ b/modules/service/vpn/include/dap_chain_net_vpn_client_tun.h
@@ -26,6 +26,7 @@
 #include "dap_chain_net_srv_vpn.h"
 
 int dap_chain_net_vpn_client_tun_init(const char *a_ipv4_gw_str);
+dap_events_socket_t* dap_chain_net_vpn_client_tun_get_esock(void);
 int dap_chain_net_vpn_client_tun_create(const char *a_ipv4_addr_str, const char *a_ipv4_gw_str);
 int dap_chain_net_vpn_client_tun_delete(void);
 int dap_chain_net_vpn_client_tun_status(void);
diff --git a/modules/type/dag/dap_chain_cs_dag.c b/modules/type/dag/dap_chain_cs_dag.c
index a8ec207191..fb6f17b94b 100644
--- a/modules/type/dag/dap_chain_cs_dag.c
+++ b/modules/type/dag/dap_chain_cs_dag.c
@@ -287,8 +287,9 @@ void dap_chain_cs_dag_delete(dap_chain_t * a_chain)
         DAP_DELETE(l_dag->_pvt);
 }
 
-static int s_dap_chain_add_atom_to_ledger(dap_chain_cs_dag_t * a_dag, dap_ledger_t * a_ledger, dap_chain_cs_dag_event_item_t * a_event_item){
 
+static int s_dap_chain_add_atom_to_ledger(dap_chain_cs_dag_t * a_dag, dap_ledger_t * a_ledger, dap_chain_cs_dag_event_item_t * a_event_item)
+{
     dap_chain_datum_t *l_datum = (dap_chain_datum_t*) dap_chain_cs_dag_event_get_datum(a_event_item->event, a_event_item->event_size);
     switch (l_datum->header.type_id) {
         case DAP_CHAIN_DATUM_TOKEN_DECL: {
@@ -306,7 +307,7 @@ static int s_dap_chain_add_atom_to_ledger(dap_chain_cs_dag_t * a_dag, dap_ledger
             // don't save bad transactions to base
             int l_ret = dap_chain_ledger_tx_load(a_ledger, l_tx);
             if( l_ret != 1 ) {
-            return l_ret;
+                return l_ret;
             }
             dap_chain_cs_dag_event_item_t * l_tx_event= DAP_NEW_Z(dap_chain_cs_dag_event_item_t);
             l_tx_event->ts_added = a_event_item->ts_added;
-- 
GitLab