From 00130f779eba9990cd1498937171accd404f7b78 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Al=D0=B5x=D0=B0nder=20Lysik=D0=BEv?=
 <alexander.lysikov@demlabs.net>
Date: Thu, 26 Dec 2019 22:55:40 +0500
Subject: [PATCH] added automatic synchronization

---
 dap_chain_net.c          | 142 +++++++++++++++++++++++++++++++--------
 dap_chain_net.h          |   1 +
 dap_chain_node.c         |   6 ++
 dap_chain_node_cli_cmd.c |  15 ++++-
 4 files changed, 135 insertions(+), 29 deletions(-)

diff --git a/dap_chain_net.c b/dap_chain_net.c
index 2457c7c453..a8c5223745 100755
--- a/dap_chain_net.c
+++ b/dap_chain_net.c
@@ -86,6 +86,7 @@
 #define LOG_TAG "chain_net"
 
 #define F_DAP_CHAIN_NET_SHUTDOWN  ( 1 << 9 )
+#define F_DAP_CHAIN_NET_GO_SYNC   ( 1 << 10 )
 
 /**
   * @struct dap_chain_net_pvt
@@ -100,7 +101,8 @@ typedef struct dap_chain_net_pvt{
 #endif    
     pthread_mutex_t state_mutex;
     dap_chain_node_role_t node_role;
-    uint32_t  flags;    
+    uint32_t  flags;
+    time_t    last_sync;
 
     dap_chain_node_addr_t * node_addr;
     dap_chain_node_info_t * node_info; // Current node's info
@@ -164,6 +166,19 @@ static void s_gbd_history_callback_notify (void * a_arg,const char a_op_code, co
 static int s_cli_net(int argc, char ** argv, char **str_reply);
 
 static bool s_seed_mode = false;
+
+
+/**
+ * @brief s_net_set_go_sync
+ * @param a_net
+ * @return
+ */
+void s_net_set_go_sync(dap_chain_net_t * a_net)
+{
+    if(a_net)
+        PVT(a_net)->flags |= F_DAP_CHAIN_NET_GO_SYNC;
+}
+
 /**
  * @brief s_net_state_to_str
  * @param l_state
@@ -276,6 +291,42 @@ lb_proc_state:
                     }else {
                         log_it(L_WARNING,"No nodeinfo in global_db to prepare links for connecting, find nearest 3 links and fill global_db");
                     }
+
+                    // add other root nodes for connect
+                    //if(!PVT(l_net)->links_addrs_count)
+                    {
+                        // use no more then 4 root node
+                        int l_use_root_nodes = min(4, PVT(l_net)->seed_aliases_count);
+                        if(!PVT(l_net)->links_addrs_count) {
+                            PVT(l_net)->links_addrs = DAP_NEW_Z_SIZE(dap_chain_node_addr_t,
+                                    l_use_root_nodes * sizeof(dap_chain_node_addr_t));
+                        }
+                        else{
+                            PVT(l_net)->links_addrs = DAP_REALLOC(PVT(l_net)->links_addrs,
+                                    (PVT(l_net)->links_addrs_count+l_use_root_nodes) * sizeof(dap_chain_node_addr_t));
+                            memset(PVT(l_net)->links_addrs + PVT(l_net)->links_addrs_count, 0,
+                                    l_use_root_nodes * sizeof(dap_chain_node_addr_t));
+                        }
+
+                        for(uint16_t i = 0; i < l_use_root_nodes; i++) {
+                            dap_chain_node_addr_t * l_node_addr = dap_chain_node_alias_find(l_net, PVT(l_net)->seed_aliases[i]);
+                            if(l_node_addr) {
+                                PVT(l_net)->links_addrs[PVT(l_net)->links_addrs_count].uint64 = l_node_addr->uint64;
+                                PVT(l_net)->links_addrs_count++;
+                            }
+                        }
+                    }
+                    // shuffle the order of the nodes
+                    for(size_t i = 0; i < PVT(l_net)->links_addrs_count; i++) {
+                        unsigned int l_new_node_pos = rand() % (PVT(l_net)->links_addrs_count);
+                        if(i == l_new_node_pos)
+                            continue;
+                        uint64_t l_tmp_uint64 = PVT(l_net)->links_addrs[i].uint64;
+                        PVT(l_net)->links_addrs[i].uint64 = PVT(l_net)->links_addrs[l_new_node_pos].uint64;
+                        PVT(l_net)->links_addrs[l_new_node_pos].uint64 = l_tmp_uint64;
+                    }
+
+
                 } break;
                 case NODE_ROLE_FULL:
                 case NODE_ROLE_MASTER:
@@ -326,6 +377,15 @@ lb_proc_state:
                             }
                         }
                     }
+                    // shuffle the order of the nodes
+                    for(size_t i = 0; i < PVT(l_net)->links_addrs_count; i++) {
+                        unsigned int l_new_node_pos = rand() % (PVT(l_net)->links_addrs_count);
+                        if(i==l_new_node_pos)
+                            continue;
+                        uint64_t l_tmp_uint64 = PVT(l_net)->links_addrs[i].uint64;
+                        PVT(l_net)->links_addrs[i].uint64 = PVT(l_net)->links_addrs[l_new_node_pos].uint64;
+                        PVT(l_net)->links_addrs[l_new_node_pos].uint64 = l_tmp_uint64;
+                    }
                     DAP_DELETE(l_cur_node_info);
                 }else {
                         // TODO read cell's nodelist and populate array with it
@@ -350,12 +410,11 @@ lb_proc_state:
         case NET_STATE_LINKS_CONNECTING:{
             log_it(L_NOTICE,"%s.state: NET_STATE_LINKS_CONNECTING",l_net->pub.name);
             size_t l_links_established = 0;
-            //for (size_t i =0 ; i < PVT(l_net)->links_addrs_count ; i++ )
-            for (size_t i = PVT(l_net)->link_cur ; i < PVT(l_net)->links_addrs_count ; i++ )
-            {
+            for(size_t j = PVT(l_net)->link_cur; j < PVT(l_net)->links_addrs_count; j++) {
+                //size_t j =
                 log_it(L_INFO,"Establishing connection with " NODE_ADDR_FP_STR,
-                       NODE_ADDR_FP_ARGS_S( PVT(l_net)->links_addrs[i]) );
-                dap_chain_node_info_t *l_link_node_info = dap_chain_node_info_read(l_net, &PVT(l_net)->links_addrs[i] );
+                       NODE_ADDR_FP_ARGS_S( PVT(l_net)->links_addrs[j]) );
+                dap_chain_node_info_t *l_link_node_info = dap_chain_node_info_read(l_net, &PVT(l_net)->links_addrs[j] );
                 if ( l_link_node_info ) {
                     dap_chain_node_client_t *l_node_client = dap_chain_node_client_connect(l_link_node_info );
                     if(!l_node_client) {
@@ -367,13 +426,13 @@ lb_proc_state:
                     int timeout_ms = 5000; //15 sec = 15000 ms
                     int res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_CONNECTED, timeout_ms);
                     if (res == 0 ){
-                        log_it(L_NOTICE, "Connected link %u",i);
+                        log_it(L_NOTICE, "Connected link %u",j);
                         l_links_established++;
                         HASH_ADD(hh,PVT(l_net)->links, remote_node_addr,sizeof(l_node_client->remote_node_addr), l_node_client);
                         PVT(l_net)->link_cur++;
                         break;
                     }else {
-                        log_it(L_NOTICE, "Cant establish link %u",i);
+                        log_it(L_NOTICE, "Cant establish link %u",j);
                         dap_chain_node_client_close(l_node_client);
                     }
                 }
@@ -413,7 +472,7 @@ lb_proc_state:
                         } break;
                        default:{
                         // get addr for current node if it absent
-                        if(dap_chain_net_get_cur_addr_int(l_net))
+                        if(!dap_chain_net_get_cur_addr_int(l_net))
                             PVT(l_net)->state = NET_STATE_ADDR_REQUEST;
                         else
                             PVT( l_net)->state = NET_STATE_SYNC_GDB;
@@ -564,18 +623,25 @@ lb_proc_state:
 
         case NET_STATE_SYNC_CHAINS:{
             dap_chain_node_client_t * l_node_client = NULL, *l_node_client_tmp = NULL;
-            uint8_t l_ch_id = dap_stream_ch_chain_get_id(); // Channel id for global_db sync
+            uint8_t l_ch_id = dap_stream_ch_chain_get_id(); // Channel id for global_db and chains sync
+
             HASH_ITER(hh,PVT(l_net)->links,l_node_client,l_node_client_tmp){
+                dap_stream_ch_t * l_ch_chain = dap_client_get_stream_ch(l_node_client->client, l_ch_id);
+                if(!l_ch_chain){
+                    log_it(L_DEBUG,"Can't get stream_ch for id='%c' ", l_ch_id);
+                    continue;
+                }
                 dap_chain_t * l_chain = NULL;
                 DL_FOREACH(l_net->pub.chains, l_chain ){
-                    size_t l_lasts_size = 0;
-                    dap_chain_atom_ptr_t * l_lasts;
-                    dap_chain_atom_iter_t * l_atom_iter = l_chain->callback_atom_iter_create(l_chain);
-                    l_lasts = l_chain->callback_atom_iter_get_lasts(l_atom_iter, &l_lasts_size);
-                    if( l_lasts ) {
+                    //size_t l_lasts_size = 0;
+                    //dap_chain_atom_ptr_t * l_lasts;
+                    //dap_chain_atom_iter_t * l_atom_iter = l_chain->callback_atom_iter_create(l_chain);
+                    //l_lasts = l_chain->callback_atom_iter_get_lasts(l_atom_iter, &l_lasts_size);
+                    //if( l_lasts ) {
+                        l_node_client->state = NODE_CLIENT_STATE_CONNECTED;
                         dap_stream_ch_chain_sync_request_t l_request = { { 0 } };
-                        dap_hash_fast(l_lasts[0], l_chain->callback_atom_get_size(l_lasts[0]), &l_request.hash_from);
-                        dap_stream_ch_chain_pkt_write(dap_client_get_stream_ch(l_node_client->client, l_ch_id),
+                        //dap_hash_fast(l_lasts[0], l_chain->callback_atom_get_size(l_lasts[0]), &l_request.hash_from);
+                        dap_stream_ch_chain_pkt_write(l_ch_chain,
                         DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS, l_net->pub.id, l_chain->id,
                                 l_net->pub.cell_id, &l_request, sizeof(l_request));
                         //
@@ -589,18 +655,24 @@ lb_proc_state:
                         int l_res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms);
                         switch (l_res) {
                         case -1:
-                            log_it(L_WARNING,"Timeout with link sync");
+                            log_it(L_WARNING,"Timeout with sync of chain '%s' ", l_chain->name);
                             break;
                         case 0:
-                            log_it(L_INFO, "Node sync completed");
+                            log_it(L_INFO, "sync of chain '%s' completed ", l_chain->name);
+                            // set time of last sync
+                            {
+                                struct timespec l_to;
+                                clock_gettime( CLOCK_MONOTONIC, &l_to);
+                                PVT(l_net)->last_sync = l_to.tv_sec;
+                            }
                             break;
                         default:
-                            log_it(L_INFO, "Node sync error %d",l_res);
+                            log_it(L_INFO, "sync of chain '%s' error %d", l_chain->name,l_res);
                         }
 
-                        DAP_DELETE( l_lasts );
-                    }
-                    DAP_DELETE( l_atom_iter );
+                        //DAP_DELETE( l_lasts );
+                    //}
+                    //DAP_DELETE( l_atom_iter );
                 }
 
             }
@@ -621,11 +693,20 @@ lb_proc_state:
                 log_it(L_NOTICE, "Going to disconnet");
                 pthread_mutex_unlock(&PVT(l_net)->state_mutex);
                 goto lb_proc_state;
+            case NET_STATE_ONLINE:
+                // if flag set then go to SYNC_GDB
+                if(PVT(l_net)->flags & F_DAP_CHAIN_NET_GO_SYNC)
+                    PVT(l_net)->flags &= ~F_DAP_CHAIN_NET_GO_SYNC;
+                else
+                    break;
                 // sync
             case NET_STATE_SYNC_GDB:
                 PVT(l_net)->state = NET_STATE_SYNC_GDB;
                 pthread_mutex_unlock(&PVT(l_net)->state_mutex);
                 goto lb_proc_state;
+                    PVT(l_net)->state = NET_STATE_SYNC_GDB;
+                    pthread_mutex_unlock(&PVT(l_net)->state_mutex);
+                    goto lb_proc_state;
             }
         }
             break;
@@ -657,8 +738,9 @@ static void *s_net_proc_thread ( void *a_net )
         // prepare for signal waiting
 
         struct timespec l_to;
+
         clock_gettime( CLOCK_MONOTONIC, &l_to );
-        int64_t l_nsec_new = l_to.tv_nsec + l_timeout_ms * 10000000ll;
+        int64_t l_nsec_new = l_to.tv_nsec + l_timeout_ms * 1000000ll;
 
         // if the new number of nanoseconds is more than a second
         if(l_nsec_new > (long) 1e9) {
@@ -671,14 +753,19 @@ static void *s_net_proc_thread ( void *a_net )
         // signal waiting
         pthread_cond_timedwait( &p_net->state_proc_cond, &p_net->state_mutex, &l_to );
 
-        //pthread_cond_wait(&PVT(l_net)->state_proc_cond,&PVT(l_net)->state_mutex);
+        // checking whether new sync is needed
+        time_t l_sync_timeout = 300;// 300 sec = 5 min
+        clock_gettime( CLOCK_MONOTONIC, &l_to );
+        if(l_to.tv_sec >= p_net->last_sync + l_sync_timeout)
+            p_net->flags |= F_DAP_CHAIN_NET_GO_SYNC;
+
         pthread_mutex_unlock( &p_net->state_mutex );
     #else // WIN32
 
         WaitForSingleObject( p_net->state_proc_cond, (uint32_t)l_timeout_ms );
 
     #endif
-        //log_it( L_DEBUG, "Waked up s_net_proc_thread( )" );
+        log_it( L_DEBUG, "Waked up s_net_proc_thread( )" );
     }
 
     return NULL;
@@ -1756,7 +1843,8 @@ void dap_chain_net_proc_mempool (dap_chain_net_t * a_net)
             // Delete processed objects
             size_t l_objs_processed_tmp = (l_objs_processed > 15) ? min(l_objs_processed, 10) : l_objs_processed;
             for(size_t i = 0; i < l_objs_processed; i++) {
-                dap_chain_global_db_gr_del(l_objs[i].key, l_gdb_group_mempool);
+                if(dap_chain_global_db_gr_del(l_objs[i].key, l_gdb_group_mempool))
+                    s_net_set_go_sync(a_net);
                 if(i < l_objs_processed_tmp) {
                     dap_string_append_printf(l_str_tmp, "New event created, removed datum 0x%s from mempool \n",
                             l_objs[i].key);
diff --git a/dap_chain_net.h b/dap_chain_net.h
index 5cc4f2c16f..1e6e03a584 100644
--- a/dap_chain_net.h
+++ b/dap_chain_net.h
@@ -84,6 +84,7 @@ void dap_chain_net_deinit(void);
 
 void dap_chain_net_load_all();
 
+void s_net_set_go_sync(dap_chain_net_t * a_net);
 int dap_chain_net_state_go_to(dap_chain_net_t * a_net, dap_chain_net_state_t a_new_state);
 
 inline static int dap_chain_net_start(dap_chain_net_t * a_net){ return dap_chain_net_state_go_to(a_net,NET_STATE_ONLINE); }
diff --git a/dap_chain_node.c b/dap_chain_node.c
index 790b627340..0e9ff6584e 100644
--- a/dap_chain_node.c
+++ b/dap_chain_node.c
@@ -90,6 +90,8 @@ bool dap_chain_node_alias_register(dap_chain_net_t * a_net,const char *alias, da
 //    a_value[2 * sizeof(dap_chain_node_addr_t)] = '\0';
     bool res = dap_chain_global_db_gr_set(a_key,  addr, sizeof(dap_chain_node_addr_t)
                                           , a_net->pub.gdb_nodes_aliases);
+    if(res)
+        s_net_set_go_sync(a_net);
     return res;
 }
 
@@ -113,6 +115,8 @@ bool dap_chain_node_alias_delete(dap_chain_net_t * a_net,const char *a_alias)
 {
     char *a_key = strdup(a_alias);
     bool res = dap_chain_global_db_gr_del(a_key, a_net->pub.gdb_nodes_aliases);
+    if(res)
+        s_net_set_go_sync(a_net);
     return res;
 }
 
@@ -151,6 +155,8 @@ int dap_chain_node_info_save(dap_chain_net_t * a_net, dap_chain_node_info_t *nod
     //char *a_value = dap_chain_node_info_serialize(node_info, NULL);
     size_t node_info_size = dap_chain_node_info_get_size(node_info);
     bool res = dap_chain_global_db_gr_set(l_key, node_info, node_info_size, a_net->pub.gdb_nodes);
+    if(res)
+        s_net_set_go_sync(a_net);
     DAP_DELETE(l_key);
     //DAP_DELETE(a_value);
     return res?0:-3;
diff --git a/dap_chain_node_cli_cmd.c b/dap_chain_node_cli_cmd.c
index 2d6b43577b..ff28d49712 100644
--- a/dap_chain_node_cli_cmd.c
+++ b/dap_chain_node_cli_cmd.c
@@ -223,6 +223,8 @@ static bool node_info_save_and_reply(dap_chain_net_t * a_net, dap_chain_node_inf
     bool res = dap_chain_global_db_gr_set(a_key, (uint8_t *) node_info, node_info_size,a_net->pub.gdb_nodes);
     DAP_DELETE(a_key);
     //DAP_DELETE(a_value);
+    if(res)
+        s_net_set_go_sync(a_net);
     return res;
 }
 
@@ -323,6 +325,7 @@ static int node_info_del_with_reply(dap_chain_net_t * a_net, dap_chain_node_info
             }
             // set text response
             dap_chain_node_cli_set_reply_text(str_reply, "node deleted");
+            s_net_set_go_sync(a_net);
         }
         else
             dap_chain_node_cli_set_reply_text(str_reply, "node not deleted");
@@ -1101,7 +1104,7 @@ int com_node(int a_argc, char ** a_argv, char **a_str_reply)
         if(0 == dap_stream_ch_chain_pkt_write(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB,
                 l_net->pub.id, l_chain_id_null, l_chain_cell_id_null, &l_sync_request,
                 sizeof(l_sync_request))) {
-            dap_chain_node_cli_set_reply_text(a_str_reply, "Error: Cant send sync chains request");
+            dap_chain_node_cli_set_reply_text(a_str_reply, "Error: Can't send sync chains request");
             // clean client struct
             dap_chain_node_client_close(l_node_client);
             DAP_DELETE(l_remote_node_info);
@@ -1838,6 +1841,8 @@ int com_token_decl_sign(int argc, char ** argv, char ** a_str_reply)
 
                     // Add datum to mempool with datum_token hash as a key
                     if(dap_chain_global_db_gr_set(l_key_str, (uint8_t *) l_datum, l_datum_size, l_gdb_group_mempool)) {
+                        s_net_set_go_sync(l_net);
+
                         char* l_hash_str = strdup(l_datum_hash_str);
                         // Remove old datum from pool
                         if(dap_chain_global_db_gr_del( l_hash_str, l_gdb_group_mempool)) {
@@ -1848,6 +1853,7 @@ int com_token_decl_sign(int argc, char ** argv, char ** a_str_reply)
                             DAP_DELETE(l_datum);
                             //DAP_DELETE(l_datum_token);
                             DAP_DELETE(l_gdb_group_mempool);
+                            s_net_set_go_sync(l_net);
                             return 0;
                         } else {
                             dap_chain_node_cli_set_reply_text(a_str_reply,
@@ -2001,6 +2007,7 @@ int com_mempool_delete(int argc, char ** argv, char ** a_str_reply)
             if(dap_chain_global_db_gr_del(l_datum_hash_str2, l_gdb_group_mempool)) {
                 dap_chain_node_cli_set_reply_text(a_str_reply, "Datum %s deleted", l_datum_hash_str);
                 DAP_DELETE( l_datum_hash_str2);
+                s_net_set_go_sync(l_net);
                 return 0;
             } else {
                 dap_chain_node_cli_set_reply_text(a_str_reply, "Error! Can't find datum %s", l_datum_hash_str);
@@ -2090,7 +2097,8 @@ int com_mempool_proc(int argc, char ** argv, char ** a_str_reply)
             for(size_t i = 0; i < l_datums_size; i++) {
                 if(l_procecced[i]!=1)
                     continue;
-                dap_chain_global_db_gr_del(l_objs[i].key, l_gdb_group_mempool_tmp);
+                if(dap_chain_global_db_gr_del(l_objs[i].key, l_gdb_group_mempool_tmp))
+                    s_net_set_go_sync(l_net);
                 l_objs_processed_cur++;
                 if(l_objs_processed_cur < l_objs_processed_tmp) {
                     dap_string_append_printf(l_str_tmp, "New event created, removed datum 0x%s from mempool \n",
@@ -2275,6 +2283,7 @@ int com_token_decl(int argc, char ** argv, char ** a_str_reply)
 
     }
     if(dap_chain_global_db_gr_set(l_key_str, (uint8_t *) l_datum, l_datum_size, l_gdb_group_mempool)) {
+        s_net_set_go_sync(l_net);
         dap_chain_node_cli_set_reply_text(a_str_reply, "datum %s with token %s is placed in datum pool ", l_key_str,
                 l_ticker);
         DAP_DELETE(l_datum);
@@ -2487,6 +2496,7 @@ int com_token_emit(int argc, char ** argv, char ** str_reply)
     // Add to mempool emission token
     if(dap_chain_global_db_gr_set(l_key_str, (uint8_t *) l_datum_emission, l_datum_emission_size
             , l_gdb_group_mempool_emission)) {
+        s_net_set_go_sync(l_net);
         str_reply_tmp = dap_strdup_printf("datum emission %s is placed in datum pool ", l_key_str);
     }
     else {
@@ -2541,6 +2551,7 @@ int com_token_emit(int argc, char ** argv, char ** str_reply)
     // Add to mempool tx token
     if(dap_chain_global_db_gr_set(l_key_str, l_datum_tx, l_datum_tx_size
             , l_gdb_group_mempool_base_tx)) {
+        s_net_set_go_sync(l_net);
         dap_chain_node_cli_set_reply_text(str_reply, "%s\ndatum tx %s is placed in datum pool ", str_reply_tmp,
                 l_key_str);
     }
-- 
GitLab