From adc802939fb7c14bb540637f644db3ce98c85786 Mon Sep 17 00:00:00 2001
From: "roman.khlopkov" <roman.khlopkov@demlabs.net>
Date: Mon, 13 May 2024 21:40:25 +0300
Subject: [PATCH] [+] GOSSIP for legacy GlobalDB packets; [*] MT net loading

---
 modules/chain/dap_chain_ch.c |  8 ++---
 modules/net/dap_chain_net.c  | 70 +++++++++++++++++++++---------------
 2 files changed, 46 insertions(+), 32 deletions(-)

diff --git a/modules/chain/dap_chain_ch.c b/modules/chain/dap_chain_ch.c
index ee90a3d793..63efeb33a9 100644
--- a/modules/chain/dap_chain_ch.c
+++ b/modules/chain/dap_chain_ch.c
@@ -484,6 +484,7 @@ struct record_processing_args {
     dap_stream_ch_uuid_t uuid;
     dap_chain_ch_pkt_hdr_t hdr;
     dap_global_db_pkt_old_t *pkt;
+    bool new;
 };
 
 static bool s_gdb_in_pkt_proc_callback(void *a_arg)
@@ -497,12 +498,13 @@ static bool s_gdb_in_pkt_proc_callback(void *a_arg)
         DAP_DELETE(l_args);
         return false;
     }
-
     bool l_success = false;
     dap_stream_node_addr_t l_blank_addr = { .uint64 = 0 };
     for (uint32_t i = 0; i < l_objs_count; i++)
         if (!(l_success = dap_global_db_ch_check_store_obj(l_objs + i, &l_blank_addr)))
             break;
+    if (l_args->new && l_objs_count == 1)
+        l_objs[0].flags |= DAP_GLOBAL_DB_RECORD_NEW;
     if (l_success && dap_global_db_set_raw_sync(l_objs, l_objs_count)) {
         const char *l_err_str = s_error_type_to_string(DAP_CHAIN_CH_ERROR_GLOBAL_DB_INTERNAL_NOT_SAVED);
         dap_chain_ch_pkt_t *l_chain_pkt = dap_chain_ch_pkt_new(l_args->hdr.net_id, l_args->hdr.chain_id, l_args->hdr.cell_id, l_err_str, strlen(l_err_str));
@@ -1243,9 +1245,6 @@ static bool s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
         struct legacy_sync_context *l_context = l_ch_chain->legacy_sync_context;
         if (l_context && l_context->state != DAP_CHAIN_CH_STATE_SYNC_GLOBAL_DB_REMOTE) {
             log_it(L_WARNING, "Can't process GLOBAL_DB packet cause synchronization sequence violation");
-            dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
-                    l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
-                    DAP_CHAIN_CH_ERROR_INCORRECT_SYNC_SEQUENCE);
             break;
         }
         if (l_context)
@@ -1265,6 +1264,7 @@ static bool s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
         l_args->worker = a_ch->stream_worker;
         l_args->uuid = a_ch->uuid;
         l_args->hdr = l_chain_pkt->hdr;
+        l_args->new = !l_context && l_pkt->obj_count == 1;
         dap_proc_thread_callback_add(a_ch->stream_worker->worker->proc_queue_input, s_gdb_in_pkt_proc_callback, l_args);
     } break;
 
diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c
index 0d59a04016..4ca57ad20e 100644
--- a/modules/net/dap_chain_net.c
+++ b/modules/net/dap_chain_net.c
@@ -211,6 +211,9 @@ typedef struct dap_chain_net_item{
 #define PVT_S(a) ((dap_chain_net_pvt_t *)a.pvt)
 
 static dap_chain_net_item_t *s_net_items = NULL, *s_net_ids = NULL;
+static pthread_mutex_t s_net_cond_lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t s_net_cond = PTHREAD_COND_INITIALIZER;
+static uint16_t s_net_loading_count = 0;
 
 static const char *c_net_states[] = {
     [NET_STATE_OFFLINE]             = "NET_STATE_OFFLINE",
@@ -247,7 +250,7 @@ static void s_net_states_notify(dap_chain_net_t * l_net);
 static void s_nodelist_change_notify(dap_store_obj_t *a_obj, void *a_arg);
 //static void s_net_proc_kill( dap_chain_net_t * a_net );
 static int s_net_init(const char * a_net_name, uint16_t a_acl_idx);
-static int s_net_load(dap_chain_net_t *a_net);
+static bool s_net_load(void *a_arg);
 static int s_net_try_online(dap_chain_net_t *a_net);
 static int s_cli_net(int argc, char ** argv, void **a_str_reply);
 static uint8_t *s_net_set_acl(dap_chain_hash_fast_t *a_pkey_hash);
@@ -743,17 +746,18 @@ static dap_chain_net_t *s_net_new(dap_chain_net_id_t *a_id, const char *a_name,
  */
 void dap_chain_net_load_all()
 {
-    int l_ret = 0;
-    if(!HASH_COUNT(s_net_items)){
+    s_net_loading_count = HASH_COUNT(s_net_items);
+    if (!s_net_loading_count) {
         log_it(L_ERROR, "Can't find any nets");
         return;
     }
+    pthread_mutex_lock(&s_net_cond_lock);
     dap_chain_net_item_t *l_net_items_current = NULL, *l_net_items_tmp = NULL;
-    HASH_ITER(hh, s_net_items, l_net_items_current, l_net_items_tmp) {
-        if( (l_ret = s_net_load(l_net_items_current->chain_net)) ) {
-            log_it(L_ERROR, "Loading chains of net %s finished with (%d) error code.", l_net_items_current->name, l_ret);
-        }
-    }
+    HASH_ITER(hh, s_net_items, l_net_items_current, l_net_items_tmp)
+        dap_proc_thread_callback_add(NULL, s_net_load, l_net_items_current->chain_net);
+    while (s_net_loading_count)
+        pthread_cond_wait(&s_net_cond, &s_net_cond_lock);
+    pthread_mutex_unlock(&s_net_cond_lock);
 }
 
 dap_string_t* dap_cli_list_net()
@@ -2102,20 +2106,21 @@ int s_net_init(const char *a_net_name, uint16_t a_acl_idx)
     return 0;
 }
 
-int s_net_load(dap_chain_net_t *a_net)
+bool s_net_load(void *a_arg)
 {
-    dap_chain_net_t *l_net = a_net;
+    dap_chain_net_t *l_net = a_arg;
+    int l_err_code = 0;
 
-    dap_config_t *l_cfg = NULL;
-    char *l_cfg_path = dap_strdup_printf("network/%s", a_net->pub.name);
-    l_cfg = dap_config_open ( l_cfg_path );
+    char *l_cfg_path = dap_strdup_printf("network/%s", l_net->pub.name);
+    dap_config_t *l_cfg = dap_config_open(l_cfg_path);
     DAP_DELETE(l_cfg_path);
     if (!l_cfg) {
         log_it(L_ERROR,"Can't open default network config");
-        return -1;
+        l_err_code = -1;
+        goto ret;
     }
 
-    dap_chain_net_pvt_t * l_net_pvt = PVT(l_net);
+    dap_chain_net_pvt_t *l_net_pvt = PVT(l_net);
 
     // reload ledger cache at once
     if (s_chain_net_reload_ledger_cache_once(l_net)) {
@@ -2184,19 +2189,19 @@ int s_net_load(dap_chain_net_t *a_net)
         case NODE_ROLE_ROOT:{
             // Set to process only zerochain
             dap_chain_id_t l_chain_id = {{0}};
-            dap_chain_t * l_chain = dap_chain_find_by_id(l_net->pub.id,l_chain_id);
-            if (l_chain )
-               l_chain->is_datum_pool_proc = true;
+            dap_chain_t *l_chain = dap_chain_find_by_id(l_net->pub.id, l_chain_id);
+            if (l_chain)
+                l_chain->is_datum_pool_proc = true;
             log_it(L_INFO,"Root node role established");
         } break;
         case NODE_ROLE_CELL_MASTER:
         case NODE_ROLE_MASTER:{
             uint16_t l_proc_chains_count=0;
-            char ** l_proc_chains = dap_config_get_array_str(l_cfg,"role-master" , "proc_chains", &l_proc_chains_count );
-            for ( size_t i = 0; i< l_proc_chains_count ; i++) {
+            char **l_proc_chains = dap_config_get_array_str(l_cfg, "role-master", "proc_chains", &l_proc_chains_count);
+            for (size_t i = 0; i< l_proc_chains_count ; i++) {
                 dap_chain_id_t l_chain_id = {};
                 if (dap_chain_id_parse(l_proc_chains[i], &l_chain_id) == 0) {
-                    dap_chain_t * l_chain = dap_chain_find_by_id(l_net->pub.id, l_chain_id );
+                    dap_chain_t *l_chain = dap_chain_find_by_id(l_net->pub.id, l_chain_id );
                     if (l_chain)
                         l_chain->is_datum_pool_proc = true;
                     else
@@ -2231,7 +2236,8 @@ int s_net_load(dap_chain_net_t *a_net)
                                                     DAP_CLUSTER_ROLE_EMBEDDED);
         if (!l_cluster) {
             log_it(L_ERROR, "Can't initialize mempool cluster for network %s", l_net->pub.name);
-            return -1;
+            l_err_code = -2;
+            goto ret;
         }
         dap_chain_net_add_auth_nodes_to_cluster(l_net, l_cluster);
         DAP_DELETE(l_gdb_groups_mask);
@@ -2247,7 +2253,7 @@ int s_net_load(dap_chain_net_t *a_net)
                                                           DAP_CLUSTER_ROLE_EMBEDDED);
     if (!l_net_pvt->orders_cluster) {
         log_it(L_ERROR, "Can't initialize orders cluster for network %s", l_net->pub.name);
-        return -1;
+        goto ret;
     }
     dap_chain_net_add_auth_nodes_to_cluster(l_net, l_net_pvt->orders_cluster);
     DAP_DELETE(l_gdb_groups_mask);
@@ -2269,7 +2275,8 @@ int s_net_load(dap_chain_net_t *a_net)
                                                          DAP_CLUSTER_ROLE_EMBEDDED);
     if (!l_net_pvt->nodes_cluster) {
         log_it(L_ERROR, "Can't initialize nodes cluster for network %s", l_net->pub.name);
-        return -1;
+        l_err_code = -3;
+        goto ret;
     }
     dap_chain_net_add_auth_nodes_to_cluster(l_net, l_net_pvt->nodes_cluster);
     dap_chain_net_add_nodelist_notify_callback(l_net, s_nodelist_change_notify, l_net);
@@ -2338,10 +2345,17 @@ int s_net_load(dap_chain_net_t *a_net)
     l_net_pvt->sync_context.sync_idle_time = dap_config_get_item_uint32_default(g_config, "chain", "sync_idle_time", 60);
     dap_proc_thread_timer_add(NULL, s_sync_timer_callback, l_net, 1000);
 
-    log_it(L_INFO, "Chain network \"%s\" initialized",l_net->pub.name);
-    dap_config_close(l_cfg);
-
-    return 0;
+    log_it(L_INFO, "Chain network \"%s\" initialized", l_net->pub.name);
+ret:
+    if (l_err_code)
+        log_it(L_ERROR, "Loading chains of net %s finished with (%d) error code.", l_net->pub.name, l_err_code);
+    if (l_cfg)
+        dap_config_close(l_cfg);
+    pthread_mutex_lock(&s_net_cond_lock);
+    s_net_loading_count--;
+    pthread_cond_signal(&s_net_cond);
+    pthread_mutex_unlock(&s_net_cond_lock);
+    return false;
 }
 
 static const uint64_t s_fork_sync_step = 20; // TODO get it from config
-- 
GitLab