From 5c3458bc57d309c3acdfb961aca82b850fb4e8a9 Mon Sep 17 00:00:00 2001
From: Roman Khlopkov <roman.khlopkov@demlabs.net>
Date: Wed, 9 Sep 2020 08:26:11 +0000
Subject: [PATCH] bugs-4442

---
 dap-sdk/net/stream/ch/dap_stream_ch_pkt.c   |   4 +-
 modules/channel/chain/dap_stream_ch_chain.c |  12 +-
 modules/net/dap_chain_net.c                 |  36 +++---
 modules/net/dap_chain_node_client.c         | 127 ++------------------
 modules/service/vpn/dap_chain_net_srv_vpn.c |   1 -
 modules/type/dag/dap_chain_cs_dag.c         |  23 ++--
 6 files changed, 49 insertions(+), 154 deletions(-)

diff --git a/dap-sdk/net/stream/ch/dap_stream_ch_pkt.c b/dap-sdk/net/stream/ch/dap_stream_ch_pkt.c
index 0fce460367..ab46209234 100644
--- a/dap-sdk/net/stream/ch/dap_stream_ch_pkt.c
+++ b/dap-sdk/net/stream/ch/dap_stream_ch_pkt.c
@@ -162,8 +162,8 @@ bool dap_stream_ch_check_unsafe(dap_stream_worker_t * a_worker,dap_stream_ch_t *
  */
 size_t dap_stream_ch_pkt_write_unsafe(dap_stream_ch_t * a_ch,  uint8_t a_type, const void * a_data, size_t a_data_size)
 {
-    if (! a_data_size){
-        log_it(L_WARNING,"Zero data size to write out in channel");
+    if (!a_data_size || !a_ch || !a_data) {
+        log_it(L_WARNING, "NULL ptr or zero data size to write out in channel");
         return 0;
     }
     //log_it(L_DEBUG,"Output: Has %u bytes of %c type for %c channel id",data_size, (char)type, (char) ch->proc->id );
diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c
index fe8116937f..607a183509 100644
--- a/modules/channel/chain/dap_stream_ch_chain.c
+++ b/modules/channel/chain/dap_stream_ch_chain.c
@@ -138,9 +138,9 @@ bool s_sync_chains_callback(dap_proc_thread_t *a_thread, void *a_arg)
     dap_chain_atom_iter_t* l_iter = l_chain->callback_atom_iter_create(l_chain);
     l_ch_chain->request_atom_iter = l_iter;
     l_lasts = l_chain->callback_atom_iter_get_lasts(l_iter, &l_lasts_count, &l_lasts_sizes);
-    log_it(L_INFO, "Found %d atoms for synchronization", l_lasts_count);
+    log_it(L_INFO, "Found %d last atoms for synchronization", l_lasts_count);
     if (l_lasts && l_lasts_sizes) {
-        for(long int i = l_lasts_count - 1; i >= 0; i--) {
+        for(long int i = 0; i < l_lasts_count; i++) {
             dap_chain_atom_item_t * l_item = NULL;
             dap_chain_hash_fast_t l_atom_hash;
             dap_hash_fast(l_lasts[i], l_lasts_sizes[i], &l_atom_hash);
@@ -150,6 +150,7 @@ bool s_sync_chains_callback(dap_proc_thread_t *a_thread, void *a_arg)
             if(l_item == NULL) { // Not found, add new lasts
                 l_item = DAP_NEW_Z(dap_chain_atom_item_t);
                 l_item->atom = l_lasts[i];
+                l_item->atom_size = l_lasts_sizes[i];
                 memcpy(&l_item->atom_hash, &l_atom_hash, sizeof(l_atom_hash));
                 HASH_ADD(hh, l_ch_chain->request_atoms_lasts, atom_hash, sizeof(l_atom_hash),
                         l_item);
@@ -743,14 +744,14 @@ bool s_out_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg)
                         l_packet_out = true;
                         l_ch_chain->stats_request_atoms_processed++;
                         // Then parse links and populate new lasts
-                        size_t l_lasts_size = 0;
+                        size_t l_links_count = 0;
                         dap_chain_atom_ptr_t * l_links = NULL;
                         size_t *l_links_sizes = NULL;
                         dap_chain_atom_iter_t* l_iter = l_chain->callback_atom_iter_create_from(l_chain, l_atom_item->atom, l_atom_item->atom_size);
-                        l_links = l_chain->callback_atom_iter_get_links(l_iter, &l_lasts_size, &l_links_sizes);
+                        l_links = l_chain->callback_atom_iter_get_links(l_iter, &l_links_count, &l_links_sizes);
                         DAP_DELETE(l_iter);
                         if (l_links && l_links_sizes) {
-                            for(size_t i = 0; i < l_lasts_size; i++) { // Find links
+                            for(size_t i = 0; i < l_links_count; i++) { // Find links
                                 dap_chain_atom_item_t * l_link_item = NULL;
                                 dap_chain_hash_fast_t l_link_hash;
                                 dap_hash_fast(l_links[i], l_links_sizes[i], &l_link_hash);
@@ -759,6 +760,7 @@ bool s_out_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg)
                                 if(l_link_item == NULL) { // Not found, add new lasts
                                     l_link_item = DAP_NEW_Z(dap_chain_atom_item_t);
                                     l_link_item->atom = l_links[i];// do not use memory cause it will be deleted
+                                    l_link_item->atom_size = l_links_sizes[i];
                                     memcpy(&l_link_item->atom_hash, &l_link_hash, sizeof(l_link_hash));
                                     HASH_ADD(hh, l_ch_chain->request_atoms_lasts, atom_hash, sizeof(l_link_hash), l_link_item);
                                 }
diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c
index 2e42961970..fb3731e98d 100644
--- a/modules/net/dap_chain_net.c
+++ b/modules/net/dap_chain_net.c
@@ -277,8 +277,12 @@ void dap_chain_net_sync_gdb_broadcast(void *a_arg, const char a_op_code, const c
         for (dap_list_t *l_tmp = PVT(l_net)->links; l_tmp; l_tmp = dap_list_next(l_tmp)) {
             dap_chain_node_client_t *l_node_client = (dap_chain_node_client_t *)l_tmp->data;
             dap_stream_ch_t *l_ch_chain = dap_client_get_stream_ch(l_node_client->client, dap_stream_ch_chain_get_id());
+            if (!l_ch_chain) {
+                continue;
+            }
             dap_stream_ch_chain_pkt_write_unsafe(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB, l_net->pub.id,
-                                          l_chain_id, l_net->pub.cell_id, l_data_out, sizeof(dap_store_obj_pkt_t) + l_data_out->data_size);
+                                                 l_chain_id, l_net->pub.cell_id, l_data_out,
+                                                 sizeof(dap_store_obj_pkt_t) + l_data_out->data_size);
         }
         dap_list_free_full(l_list_out, free);
     }
@@ -487,7 +491,7 @@ static int s_net_states_proc(dap_chain_net_t * l_net)
                 // find dap_chain_id_t
                 dap_chain_t *l_chain = dap_chain_net_get_chain_by_name(l_net, "gdb");
                 dap_chain_id_t l_chain_id = l_chain ? l_chain->id : (dap_chain_id_t ) {};
-
+                l_node_client->state = NODE_CLIENT_STATE_CONNECTED;
                 size_t l_res = dap_stream_ch_chain_pkt_write_mt(l_worker, l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB, l_net->pub.id,
                                                                 l_chain_id, l_net->pub.cell_id, &l_sync_gdb, sizeof(l_sync_gdb));
                 if (l_res == 0) {
@@ -509,6 +513,7 @@ static int s_net_states_proc(dap_chain_net_t * l_net)
                 default:
                     log_it(L_INFO, "Node sync error %d",l_res);
                 }
+                l_node_client->state = NODE_CLIENT_STATE_CONNECTED;
                 l_res = dap_stream_ch_chain_pkt_write_mt(l_worker, l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB_RVRS, l_net->pub.id,
                                                          l_chain_id, l_net->pub.cell_id, &l_sync_gdb, sizeof(l_sync_gdb));
                 l_res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms);
@@ -557,21 +562,20 @@ static int s_net_states_proc(dap_chain_net_t * l_net)
                                                      l_chain->id, l_net->pub.cell_id, &l_request, sizeof(l_request));
                     // wait for finishing of request
                     int timeout_ms = 20000; // 2 min = 120 sec = 120 000 ms
-                    // TODO add progress info to console
-                    if (dap_client_get_stream_ch(l_node_client->client, dap_stream_ch_chain_get_id())) {
-                        l_res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms);
-                        switch (l_res) {
-                        case -1:
-                            log_it(L_WARNING,"Timeout with sync of chain '%s' ", l_chain->name);
-                            break;
-                        case 0:
-                            l_need_flush = true;
-                            log_it(L_INFO, "Sync of chain '%s' completed ", l_chain->name);
-                            break;
-                        default:
-                            log_it(L_ERROR, "Sync of chain '%s' error %d", l_chain->name,l_res);
-                        }
+                    // TODO add progress info to console                      
+                    l_res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms);
+                    switch (l_res) {
+                    case -1:
+                        log_it(L_WARNING,"Timeout with sync of chain '%s' ", l_chain->name);
+                        break;
+                    case 0:
+                        l_need_flush = true;
+                        log_it(L_INFO, "Sync of chain '%s' completed ", l_chain->name);
+                        break;
+                    default:
+                        log_it(L_ERROR, "Sync of chain '%s' error %d", l_chain->name,l_res);
                     }
+                    l_node_client->state = NODE_CLIENT_STATE_CONNECTED;
                     dap_stream_ch_chain_pkt_write_mt(l_worker, l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS_RVRS, l_net->pub.id,
                                                      l_chain->id, l_net->pub.cell_id, &l_request, sizeof(l_request));
                     l_res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms);
diff --git a/modules/net/dap_chain_node_client.c b/modules/net/dap_chain_node_client.c
index bb92eab2c7..ce8848a5b7 100644
--- a/modules/net/dap_chain_node_client.c
+++ b/modules/net/dap_chain_node_client.c
@@ -275,128 +275,19 @@ static void s_ch_chain_callback_notify_packet_in(dap_stream_ch_chain_t* a_ch_cha
             l_request = (dap_stream_ch_chain_sync_request_t*) a_pkt->data;
 
         if(l_request) {
-            uint64_t l_id_last_here = 1;
-            // for sync chain not used time
-            //if(a_pkt_type != DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS)
-            //    l_id_last_here =(uint64_t) dap_db_log_get_last_id();
-            if(1) {//if(l_request->id_start < l_id_last_here) {
-                log_it(L_INFO, "Remote is synced but we have updates for it");
-                bool l_is_sync = true;
-                // Get log diff
-                if(a_pkt_type == DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB) {
-                    a_ch_chain->request_last_ts = dap_db_log_get_last_id();
-                    uint64_t l_start_item = l_request->id_start;
-                    dap_db_log_list_t *l_db_log = NULL;
-                    // If the current global_db has been truncated, but the remote node has not known this
-                    uint64_t l_last_id = dap_db_log_get_last_id();
-                    if(l_request->id_start > (uint64_t) a_ch_chain->request_last_ts){
-                        dap_chain_net_t *l_net = dap_chain_net_by_id(a_pkt->hdr.net_id);
-                        dap_list_t *l_add_groups = dap_chain_net_get_add_gdb_group(l_net, a_ch_chain->request.node_addr);
-                        l_db_log = dap_db_log_list_start(l_start_item + 1, l_add_groups);
-                        if(!l_db_log)
-                            l_start_item = 0;
-                    }
-                    //dap_list_t *l_list = dap_db_log_get_list(l_request->id_start + 1);
-                    if(!l_db_log){
-                        dap_chain_net_t *l_net = dap_chain_net_by_id(a_pkt->hdr.net_id);
-                        dap_list_t *l_add_groups = dap_chain_net_get_add_gdb_group(l_net, a_ch_chain->request.node_addr);
-                        l_db_log = dap_db_log_list_start(l_start_item + 1, l_add_groups);
-                    }
-                    if(l_db_log) {
-                        // Add it to outgoing list
-                        //l_list->prev = a_ch_chain->request_global_db_trs;
-                        a_ch_chain->request_global_db_trs = l_db_log;//l_list;
-                    }
-                    else
-                        l_is_sync = false;
-                }
-                if(l_is_sync) {
-                    a_ch_chain->request_net_id.uint64 = a_pkt->hdr.net_id.uint64;
-                    a_ch_chain->request_cell_id.uint64 = a_pkt->hdr.cell_id.uint64;
-                    a_ch_chain->request_chain_id.uint64 = a_pkt->hdr.chain_id.uint64;
-                    a_ch_chain->state = dap_stream_ch_chain_pkt_type_to_dap_stream_ch_chain_state(a_pkt_type);//CHAIN_STATE_SYNC_CHAINS;//GLOBAL_DB;
-
-                    // type of first packet
-                    uint8_t l_type =
-                            (a_pkt_type != DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS) ?
-                                    DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB :
-                                    DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_CHAIN;
-                    if(l_type == DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_CHAIN)
-                    {
-                        dap_chain_t * l_chain = dap_chain_find_by_id(a_pkt->hdr.net_id, a_pkt->hdr.chain_id);
-                        if (l_chain ){
-                            dap_chain_atom_iter_t* l_iter = l_chain ? l_chain->callback_atom_iter_create(l_chain) : NULL;
-                            if ( l_iter ){
-                                //a_ch_chain->request_atom_iter = l_iter;
-
-                                dap_chain_atom_ptr_t * l_lasts = NULL;
-                                size_t l_lasts_size = 0;
-                                size_t *l_lasts_sizes = NULL;
-                                l_lasts = l_chain->callback_atom_iter_get_lasts(l_iter, &l_lasts_size, &l_lasts_sizes);
-                                if ( l_lasts){
-                                    for(size_t i = 0; i < l_lasts_size; i++) {
-                                        dap_chain_atom_item_t * l_item = NULL;
-                                        dap_chain_hash_fast_t l_atom_hash;
-                                        dap_hash_fast(l_lasts[i], l_lasts_sizes[i], &l_atom_hash);
-                                        pthread_mutex_lock(&a_ch_chain->mutex);
-                                        HASH_FIND(hh, a_ch_chain->request_atoms_lasts, &l_atom_hash, sizeof(l_atom_hash), l_item);
-                                        if(l_item == NULL) { // Not found, add new lasts
-                                            l_item = DAP_NEW_Z(dap_chain_atom_item_t);
-                                            l_item->atom = l_lasts[i];
-                                            memcpy(&l_item->atom_hash, &l_atom_hash, sizeof(l_atom_hash));
-                                            HASH_ADD(hh, a_ch_chain->request_atoms_lasts, atom_hash, sizeof(l_atom_hash), l_item);
-                                        }
-                                        //else
-                                        //    DAP_DELETE(l_lasts[i]);
-                                        pthread_mutex_unlock(&a_ch_chain->mutex);
-                                    }
-                                    DAP_DELETE(l_lasts);
-                                }
-                                DAP_DELETE(l_iter);
-                            }else{
-                                log_it(L_ERROR, "Can't create iterator for chain_id 0x%016X", a_pkt->hdr.chain_id );
-                            }
-                        }else {
-                            log_it(L_WARNING, "Can't find chain_id 0x%016X", a_pkt->hdr.chain_id );
-                        }
-                    }
-                    dap_chain_node_addr_t l_node_addr = { 0 };
-                    dap_chain_net_t *l_net = dap_chain_net_by_id(a_ch_chain->request_net_id);
-                    l_node_addr.uint64 = l_net ? dap_db_get_cur_node_addr(l_net->pub.name) : 0;
-                    dap_stream_ch_chain_pkt_write_unsafe(a_ch_chain->ch, l_type,
-                            a_ch_chain->request_net_id, a_ch_chain->request_chain_id,
-                            a_ch_chain->request_cell_id, &l_node_addr, sizeof(dap_chain_node_addr_t));
-
-                    log_it(L_INFO, "Sync for remote tr type=%d", a_pkt_type);//_count=%d", dap_list_length(l_list));
-                    dap_stream_ch_set_ready_to_write_unsafe(a_ch_chain->ch, true);
-                }
-                else {
-                    log_it(L_INFO, "Remote node has lastes timestamp for us type=%d", a_pkt_type);
-                    pthread_mutex_lock(&l_node_client->wait_mutex);
-                    l_node_client->state = NODE_CLIENT_STATE_SYNCED;
-#ifndef _WIN32
-                    pthread_cond_signal(&l_node_client->wait_cond);
-#else
-                    SetEvent( l_node_client->wait_cond );
-#endif
-                    pthread_mutex_unlock(&l_node_client->wait_mutex);
-                }
-            }
-        } else {
-            log_it(L_INFO, "Sync notify without request to sync back, stay in SYNCED state");
-            pthread_mutex_lock(&l_node_client->wait_mutex);
-            l_node_client->state = NODE_CLIENT_STATE_SYNCED;
+            // Process it if need
+        }
+        log_it(L_INFO, "Sync notify without request to sync back, stay in SYNCED state");
+        pthread_mutex_lock(&l_node_client->wait_mutex);
+        l_node_client->state = NODE_CLIENT_STATE_SYNCED;
 #ifndef _WIN32
-            pthread_cond_signal(&l_node_client->wait_cond);
+        pthread_cond_signal(&l_node_client->wait_cond);
 #else
-            SetEvent( l_node_client->wait_cond );
+        SetEvent( l_node_client->wait_cond );
 #endif
-            pthread_mutex_unlock(&l_node_client->wait_mutex);
-        }
-
-    }
-    default: {
+        pthread_mutex_unlock(&l_node_client->wait_mutex);
     }
+    default: break;
     }
 }
 
diff --git a/modules/service/vpn/dap_chain_net_srv_vpn.c b/modules/service/vpn/dap_chain_net_srv_vpn.c
index ca160ab9d6..fcbcb3458c 100644
--- a/modules/service/vpn/dap_chain_net_srv_vpn.c
+++ b/modules/service/vpn/dap_chain_net_srv_vpn.c
@@ -1330,7 +1330,6 @@ void s_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
                 assert(l_tun);
                 // Unsafely send it
                 size_t l_ret = s_stream_session_esocket_send(l_srv_session, l_tun->es, l_vpn_pkt->data, l_vpn_pkt->header.op_data.data_size);
-                //dap_events_socket_set_writable_unsafe(l_tun->es, true);
                 if( l_ret)
                     s_update_limits (a_ch, l_srv_session, l_usage,l_ret );
             } break;
diff --git a/modules/type/dag/dap_chain_cs_dag.c b/modules/type/dag/dap_chain_cs_dag.c
index b45dc45248..c0de3cc2d7 100644
--- a/modules/type/dag/dap_chain_cs_dag.c
+++ b/modules/type/dag/dap_chain_cs_dag.c
@@ -676,6 +676,7 @@ void s_dag_events_lasts_process_new_last_event(dap_chain_cs_dag_t * a_dag, dap_c
     dap_chain_cs_dag_event_item_t * l_event_last= DAP_NEW_Z(dap_chain_cs_dag_event_item_t);
     l_event_last->ts_added = a_event_item->ts_added;
     l_event_last->event = a_event_item->event;
+    l_event_last->event_size = a_event_item->event_size;
     dap_hash_fast(l_event_last->event, a_event_item->event_size,&l_event_last->hash );
     HASH_ADD(hh,PVT(a_dag)->events_lasts_unlinked,hash, sizeof(l_event_last->hash),l_event_last);
 }
@@ -854,26 +855,24 @@ static dap_chain_atom_ptr_t* s_chain_callback_atom_iter_get_lasts( dap_chain_ato
                                                                   size_t ** a_lasts_size_array )
 {
     dap_chain_cs_dag_t * l_dag = DAP_CHAIN_CS_DAG( a_atom_iter->chain );
-
-    if ( HASH_COUNT( PVT(l_dag)->events_lasts_unlinked ) > 0 ) {
+    dap_chain_atom_ptr_t * l_ret = NULL;
+    pthread_rwlock_wrlock(&PVT(l_dag)->events_rwlock);
+    size_t l_lasts_size = HASH_COUNT( PVT(l_dag)->events_lasts_unlinked );
+    if ( l_lasts_size > 0 ) {
         if( a_lasts_size)
-            *a_lasts_size = HASH_COUNT( PVT(l_dag)->events_lasts_unlinked );
-        dap_chain_atom_ptr_t * l_ret = DAP_NEW_Z_SIZE( dap_chain_atom_ptr_t,
-                                           sizeof (dap_chain_atom_ptr_t*) * HASH_COUNT( PVT(l_dag)->events_lasts_unlinked ) );
-
+            *a_lasts_size = l_lasts_size;
+        l_ret = DAP_NEW_Z_SIZE(dap_chain_atom_ptr_t, sizeof(dap_chain_atom_ptr_t *) * l_lasts_size);
         dap_chain_cs_dag_event_item_t * l_event_item = NULL, *l_event_item_tmp = NULL;
         size_t i = 0;
-        pthread_rwlock_wrlock(&PVT(l_dag)->events_rwlock);
-        *a_lasts_size_array = DAP_NEW_Z_SIZE(size_t,sizeof (size_t)* HASH_CNT(hh,PVT(l_dag)->events_lasts_unlinked));
+        *a_lasts_size_array = DAP_NEW_Z_SIZE(size_t, sizeof(size_t) * l_lasts_size);
         HASH_ITER(hh,PVT(l_dag)->events_lasts_unlinked, l_event_item,l_event_item_tmp){
             l_ret[i] = l_event_item->event;
             (*a_lasts_size_array)[i] = l_event_item->event_size;
             i++;
-        }
-        pthread_rwlock_unlock(&PVT(l_dag)->events_rwlock);
-        return l_ret;
+        }    
     }
-    return NULL;
+    pthread_rwlock_unlock(&PVT(l_dag)->events_rwlock);
+    return l_ret;
 }
 
 /**
-- 
GitLab