From 9d174754035e4dc03913314dfe293298021e0580 Mon Sep 17 00:00:00 2001
From: "Constantin P." <papizh.konstantin@demlabs.net>
Date: Wed, 11 Sep 2024 06:42:44 +0000
Subject: [PATCH] Hotfix 13068 5

---
 .../consensus/esbocs/dap_chain_cs_esbocs.c    | 119 +++++++++---------
 .../esbocs/include/dap_chain_cs_esbocs.h      |   3 +-
 2 files changed, 63 insertions(+), 59 deletions(-)

diff --git a/modules/consensus/esbocs/dap_chain_cs_esbocs.c b/modules/consensus/esbocs/dap_chain_cs_esbocs.c
index 9d4eeca4ee..9ec843ca71 100644
--- a/modules/consensus/esbocs/dap_chain_cs_esbocs.c
+++ b/modules/consensus/esbocs/dap_chain_cs_esbocs.c
@@ -427,7 +427,7 @@ static void s_new_atom_notifier(void *a_arg, dap_chain_t *a_chain, dap_chain_cel
 {
     dap_chain_esbocs_session_t *l_session = a_arg;
     assert(l_session->chain == a_chain);
-    pthread_mutex_lock(&l_session->mutex);
+    //pthread_mutex_lock(&l_session->mutex);
     dap_chain_hash_fast_t l_last_block_hash;
     dap_chain_get_atom_last_hash(l_session->chain, a_id, &l_last_block_hash);
     if (!dap_hash_fast_compare(&l_last_block_hash, &l_session->cur_round.last_block_hash) &&
@@ -435,7 +435,7 @@ static void s_new_atom_notifier(void *a_arg, dap_chain_t *a_chain, dap_chain_cel
         l_session->new_round_enqueued = true;
         s_session_round_new(l_session);
     }
-    pthread_mutex_unlock(&l_session->mutex);
+    //pthread_mutex_unlock(&l_session->mutex);
     if (!PVT(l_session->esbocs)->collecting_addr)
         return;
     dap_chain_esbocs_block_collect_t l_block_collect_params = (dap_chain_esbocs_block_collect_t){
@@ -486,6 +486,7 @@ static int s_callback_created(dap_chain_t *a_chain, dap_config_t *a_chain_net_cf
     DAP_NEW_Z_RET_VAL(l_session, dap_chain_esbocs_session_t, -8, NULL);
     l_session->chain = a_chain;
     l_session->esbocs = l_esbocs;
+    l_session->proc_thread = dap_proc_thread_get_auto();
     l_esbocs->session = l_session;
     DL_APPEND(s_session_items, l_session);
     log_it(L_INFO, "Init ESBOCS session for net:%s, chain:%s", a_chain->net_name, a_chain->name);
@@ -592,11 +593,11 @@ static int s_callback_created(dap_chain_t *a_chain, dap_config_t *a_chain_net_cf
         log_it(L_ERROR, "This validator is not allowed to work in emergency mode. Use special decree to supply it");
         return -5;
     }
-    pthread_mutex_init(&l_session->mutex, NULL);
+    //pthread_mutex_init(&l_session->mutex, NULL);
     dap_chain_add_callback_notify(a_chain, s_new_atom_notifier, l_session);
     s_session_round_new(l_session);
 
-    l_session->cs_timer = !dap_proc_thread_timer_add(NULL, s_session_proc_state, l_session, 1000);
+    l_session->cs_timer = !dap_proc_thread_timer_add(l_session->proc_thread, s_session_proc_state, l_session, 1000);
     debug_if(l_esbocs_pvt->debug && l_session->cs_timer, L_MSG, "Consensus main timer is started");
 
     DAP_CHAIN_PVT(a_chain)->cs_started = true;
@@ -788,7 +789,7 @@ static void s_callback_delete(dap_chain_cs_blocks_t *a_blocks)
         log_it(L_INFO, "No session found");
         return;
     }
-    pthread_mutex_lock(&l_session->mutex);
+    //pthread_mutex_lock(&l_session->mutex);
     DL_DELETE(s_session_items, l_session);
     s_session_round_clear(l_session);
     dap_chain_esbocs_sync_item_t *l_sync_item, *l_sync_tmp;
@@ -802,8 +803,8 @@ static void s_callback_delete(dap_chain_cs_blocks_t *a_blocks)
         HASH_DEL(l_session->penalty, l_pen_item);
         DAP_DELETE(l_pen_item);
     }
-    pthread_mutex_unlock(&l_session->mutex);
-    pthread_mutex_destroy(&l_session->mutex);
+    //pthread_mutex_unlock(&l_session->mutex);
+    //pthread_mutex_destroy(&l_session->mutex);
     DAP_DEL_MULTY(l_session, a_blocks->_inheritor); // a_blocks->_inheritor - l_esbocs
 }
 
@@ -1031,10 +1032,10 @@ static void s_session_send_startsync(dap_chain_esbocs_session_t *a_session)
 static bool s_session_send_startsync_on_timer(void *a_arg)
 {
     dap_chain_esbocs_session_t *l_session = a_arg;
-    pthread_mutex_lock(&l_session->mutex);
+    //pthread_mutex_lock(&l_session->mutex);
     s_session_send_startsync(l_session);
     l_session->sync_timer = NULL;
-    pthread_mutex_unlock(&l_session->mutex);
+    //pthread_mutex_unlock(&l_session->mutex);
     return false;
 }
 
@@ -1461,8 +1462,7 @@ static void s_session_state_change(dap_chain_esbocs_session_t *a_session, enum s
             log_it(L_ERROR, "No previous state registered, can't roll back");
             if (!a_session->new_round_enqueued) {
                 a_session->new_round_enqueued = true;
-                dap_proc_thread_t *l_thread = DAP_PROC_THREAD(dap_context_current());
-                dap_proc_thread_callback_add(l_thread, s_session_round_new, a_session);
+                dap_proc_thread_callback_add(a_session->proc_thread, s_session_round_new, a_session);
             }
         }
     }
@@ -1476,8 +1476,8 @@ static void s_session_proc_state(void *a_arg)
     dap_chain_esbocs_session_t *l_session = a_arg;
     if (!l_session->cs_timer)
         return; // Timer is inactive
-    if (pthread_mutex_trylock(&l_session->mutex) != 0)
-        return; // Session is busy
+    //if (pthread_mutex_trylock(&l_session->mutex) != 0)
+    //    return; // Session is busy
     bool l_cs_debug = PVT(l_session->esbocs)->debug;
     dap_time_t l_time = dap_time_now();
     switch (l_session->state) {
@@ -1495,8 +1495,7 @@ static void s_session_proc_state(void *a_arg)
                                                                         l_session->cur_round.id);
                 if (!l_session->new_round_enqueued) {
                     l_session->new_round_enqueued = true;
-                    dap_proc_thread_t *l_thread = DAP_PROC_THREAD(dap_context_current());
-                    dap_proc_thread_callback_add(l_thread, s_session_round_new, l_session);
+                    dap_proc_thread_callback_add(l_session->proc_thread, s_session_round_new, l_session);
                 }
                 break;
             }
@@ -1518,8 +1517,7 @@ static void s_session_proc_state(void *a_arg)
                 l_session->sync_failed = true;
                 if (!l_session->new_round_enqueued) {
                     l_session->new_round_enqueued = true;
-                    dap_proc_thread_t *l_thread = DAP_PROC_THREAD(dap_context_current());
-                    dap_proc_thread_callback_add(l_thread, s_session_round_new, l_session);
+                    dap_proc_thread_callback_add(l_session->proc_thread, s_session_round_new, l_session);
                 }
             }
         }
@@ -1585,7 +1583,7 @@ static void s_session_proc_state(void *a_arg)
         break;
     }
 
-    pthread_mutex_unlock(&l_session->mutex);
+    //pthread_mutex_unlock(&l_session->mutex);
 }
 
 static void s_message_chain_add(dap_chain_esbocs_session_t *a_session,
@@ -2187,11 +2185,9 @@ static bool s_stream_ch_packet_in(dap_stream_ch_t *a_ch, void *a_arg)
         log_it(L_CRITICAL, "%s", c_error_memory_alloc);
         return false;
     }
-    l_args->addr_from = a_ch->stream->node;
-    l_args->session = l_session;
-    l_args->message_size = l_message_size;
+    *l_args = (struct esbocs_msg_args){ a_ch->stream->node, l_session, l_message_size };
     memcpy(l_args->message, l_message, l_message_size);
-    dap_proc_thread_callback_add(dap_worker_get_current()->proc_queue_input, s_process_incoming_message, l_args);
+    dap_proc_thread_callback_add(l_session->proc_thread, s_process_incoming_message, l_args);
     return true;
 }
 
@@ -2223,10 +2219,11 @@ static void s_session_packet_in(dap_chain_esbocs_session_t *a_session, dap_chain
     dap_hash_fast(l_message, a_data_size, &l_data_hash);
 
     if (a_sender_node_addr) { //Process network messages only
-        pthread_mutex_lock(&l_session->mutex);
+        //pthread_mutex_lock(&l_session->mutex);
         if (l_message->hdr.chain_id.uint64 != l_session->chain->id.uint64) {
             debug_if(l_cs_debug, L_MSG, "Invalid chain ID %"DAP_UINT64_FORMAT_U, l_message->hdr.chain_id.uint64);
-            goto session_unlock;
+            //goto session_unlock;
+            return;
         }
         // check hash message dup
         dap_chain_esbocs_message_item_t *l_message_item_temp = NULL;
@@ -2236,7 +2233,8 @@ static void s_session_packet_in(dap_chain_esbocs_session_t *a_session, dap_chain
                                         " Message rejected: message hash is exists in chain (duplicate)",
                                             l_session->chain->net_name, l_session->chain->name,
                                                 l_session->cur_round.id, l_message->hdr.attempt_num);
-            goto session_unlock;
+            //goto session_unlock;
+            return;
         }
         l_message->hdr.sign_size = 0;   // restore header on signing time
         if (dap_sign_verify_all(l_sign, l_sign_size, l_message, l_message_data_size + sizeof(l_message->hdr))) {
@@ -2244,7 +2242,8 @@ static void s_session_packet_in(dap_chain_esbocs_session_t *a_session, dap_chain
                                         " Message rejected from addr:"NODE_ADDR_FP_STR" not passed verification",
                                             l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id,
                                                 l_session->cur_round.attempt_num, NODE_ADDR_FP_ARGS(a_sender_node_addr));
-            goto session_unlock;
+            //goto session_unlock;
+            return;
         }
         l_message->hdr.sign_size = l_sign_size; // restore original header
 
@@ -2256,7 +2255,8 @@ static void s_session_packet_in(dap_chain_esbocs_session_t *a_session, dap_chain
                                                 l_session->chain->net_name, l_session->chain->name,
                                                     l_session->cur_round.id);
                 s_session_sync_queue_add(l_session, l_message, a_data_size);
-                goto session_unlock;
+                //goto session_unlock;
+                return;
             }
         } else if (l_message->hdr.round_id != l_session->cur_round.id) {
             // round check
@@ -2291,7 +2291,8 @@ static void s_session_packet_in(dap_chain_esbocs_session_t *a_session, dap_chain
                                                     l_session->chain->net_name, l_session->chain->name,
                                                         l_session->cur_round.id, l_message->hdr.attempt_num,
                                                             s_voting_msg_type_to_str(l_message->hdr.type));
-                    goto session_unlock;
+                    //goto session_unlock;
+                    return;
                 }
             }
         }
@@ -2327,7 +2328,8 @@ static void s_session_packet_in(dap_chain_esbocs_session_t *a_session, dap_chain
                                     " Message rejected: validator key:%s not in the current validators list or not synced yet",
                                         l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id,
                                             l_message->hdr.attempt_num, l_validator_addr_str);
-        goto session_unlock;
+        //goto session_unlock;
+        return;
     }
 
     switch (l_message->hdr.type) {
@@ -2387,8 +2389,7 @@ static void s_session_packet_in(dap_chain_esbocs_session_t *a_session, dap_chain
                     l_session->cur_round.sync_attempt = l_sync_attempt - 1;
                     if (!l_session->new_round_enqueued) {
                         l_session->new_round_enqueued = true;
-                        dap_proc_thread_t *l_thread = DAP_PROC_THREAD(dap_context_current());
-                        dap_proc_thread_callback_add(l_thread, s_session_round_new, l_session);
+                        dap_proc_thread_callback_add(l_session->proc_thread, s_session_round_new, l_session);
                     }
                 }
             }
@@ -2467,7 +2468,8 @@ static void s_session_packet_in(dap_chain_esbocs_session_t *a_session, dap_chain
         l_store = DAP_NEW_Z(dap_chain_esbocs_store_t);
         if (!l_store) {
             log_it(L_CRITICAL, "%s", c_error_memory_alloc);
-            goto session_unlock;
+            //goto session_unlock;
+            return;
         }
         l_store->candidate_size = l_candidate_size;
         l_store->candidate_hash = *l_candidate_hash;
@@ -2693,9 +2695,9 @@ static void s_session_packet_in(dap_chain_esbocs_session_t *a_session, dap_chain
     default:
         break;
     }
-session_unlock:
-    if (a_sender_node_addr) //Process network message
-        pthread_mutex_unlock(&l_session->mutex);
+//session_unlock:
+    //if (a_sender_node_addr) //Process network message
+    //    pthread_mutex_unlock(&l_session->mutex);
 }
 
 static void s_message_send(dap_chain_esbocs_session_t *a_session, uint8_t a_message_type, dap_hash_fast_t *a_block_hash,
@@ -2704,44 +2706,44 @@ static void s_message_send(dap_chain_esbocs_session_t *a_session, uint8_t a_mess
     size_t l_message_size = sizeof(dap_chain_esbocs_message_hdr_t) + a_data_size;
     dap_chain_esbocs_message_t *l_message = NULL;
     DAP_NEW_Z_SIZE_RET(l_message, dap_chain_esbocs_message_t, l_message_size, NULL);
-    l_message->hdr.version = DAP_CHAIN_ESBOCS_PROTOCOL_VERSION;
-    l_message->hdr.type = a_message_type;
-    l_message->hdr.round_id = a_session->cur_round.id;
-    l_message->hdr.attempt_num = a_session->cur_round.attempt_num;
-    l_message->hdr.net_id = a_session->chain->net_id;
-    l_message->hdr.chain_id = a_session->chain->id;
-    l_message->hdr.ts_created = dap_time_now();
-    l_message->hdr.message_size = a_data_size;
-    l_message->hdr.candidate_hash = *a_block_hash;
+    *l_message = (dap_chain_esbocs_message_t) {
+        .hdr = (dap_chain_esbocs_message_hdr_t) { 
+            .version = DAP_CHAIN_ESBOCS_PROTOCOL_VERSION,
+            .type = a_message_type, 
+            .attempt_num = a_session->cur_round.attempt_num, 
+            .round_id = a_session->cur_round.id,
+            .message_size = a_data_size,
+            .ts_created = dap_time_now(),
+            .net_id = a_session->chain->net_id,
+            .chain_id = a_session->chain->id,
+            .candidate_hash = *a_block_hash
+        }
+    };
     if (a_data && a_data_size)
         memcpy(l_message->msg_n_sign, a_data, a_data_size);
 
     for (dap_list_t *it = a_validators; it; it = it->next) {
         dap_chain_esbocs_validator_t *l_validator = it->data;
-        if (l_validator->is_synced ||
-                a_message_type == DAP_CHAIN_ESBOCS_MSG_TYPE_START_SYNC) {
-            debug_if(PVT(a_session->esbocs)->debug, L_MSG, "Send pkt type 0x%x to "NODE_ADDR_FP_STR,
-                                                            a_message_type, NODE_ADDR_FP_ARGS_S(l_validator->node_addr));
+        if ( l_validator->is_synced || a_message_type == DAP_CHAIN_ESBOCS_MSG_TYPE_START_SYNC ) {
+            debug_if(PVT(a_session->esbocs)->debug, L_MSG, "Send pkt type 0x%x to "NODE_ADDR_FP_STR, a_message_type,
+                                                           NODE_ADDR_FP_ARGS_S(l_validator->node_addr));
             l_message->hdr.recv_addr = l_validator->node_addr;
             l_message->hdr.sign_size = 0;
-            dap_sign_t *l_sign = dap_sign_create(PVT(a_session->esbocs)->blocks_sign_key, l_message,
-                                                 l_message_size, 0);
+            dap_sign_t *l_sign = dap_sign_create( PVT(a_session->esbocs)->blocks_sign_key, l_message, l_message_size, 0 );
             size_t l_sign_size = dap_sign_get_size(l_sign);
             l_message->hdr.sign_size = l_sign_size;
-            l_message = DAP_REALLOC(l_message, l_message_size + l_sign_size);
-            if (!l_message) {
-                log_it(L_CRITICAL, "%s", c_error_memory_alloc);
-                return;
-            }
+            dap_chain_esbocs_message_t *l_message_signed = DAP_REALLOC(l_message, l_message_size + l_sign_size);
+            if ( !l_message_signed )
+                return DAP_DELETE(l_sign), DAP_DELETE(l_message), log_it(L_CRITICAL, "%s", c_error_memory_alloc);
+            l_message = l_message_signed;
             memcpy(l_message->msg_n_sign + a_data_size, l_sign, l_sign_size);
             DAP_DELETE(l_sign);
-            
             if (l_validator->node_addr.uint64 != a_session->my_addr.uint64) {
                 dap_stream_ch_pkt_send_by_addr(&l_validator->node_addr, DAP_STREAM_CH_ESBOCS_ID,
                                                a_message_type, l_message, l_message_size + l_sign_size);
                 continue;
             }
-            struct esbocs_msg_args *l_args = DAP_NEW_SIZE(struct esbocs_msg_args,
+            /*struct esbocs_msg_args *l_args = DAP_NEW_SIZE(struct esbocs_msg_args,
                                                           sizeof(struct esbocs_msg_args) + l_message_size + l_sign_size);
             if (!l_args) {
                 log_it(L_CRITICAL, "%s", c_error_memory_alloc);
@@ -2752,7 +2754,8 @@ static void s_message_send(dap_chain_esbocs_session_t *a_session, uint8_t a_mess
             l_args->session = a_session;
             l_args->message_size = l_message_size + l_sign_size;
             memcpy(l_args->message, l_message, l_message_size + l_sign_size);
-            dap_proc_thread_callback_add(dap_worker_get_current()->proc_queue_input, s_process_incoming_message, l_args);
+            dap_proc_thread_callback_add(a_session->proc_thread, s_process_incoming_message, l_args);*/
+            s_session_packet_in(a_session, &a_session->my_addr, (byte_t*)l_message, l_message_size + l_sign_size);
         }
     }
     DAP_DELETE(l_message);
diff --git a/modules/consensus/esbocs/include/dap_chain_cs_esbocs.h b/modules/consensus/esbocs/include/dap_chain_cs_esbocs.h
index 5a99570e8b..e26355cf47 100644
--- a/modules/consensus/esbocs/include/dap_chain_cs_esbocs.h
+++ b/modules/consensus/esbocs/include/dap_chain_cs_esbocs.h
@@ -180,8 +180,9 @@ typedef struct dap_chain_esbocs_penalty_item {
 #define DAP_CHAIN_ESBOCS_PENALTY_KICK   3U      // Number of missed rounds to kick
 
 typedef struct dap_chain_esbocs_session {
-    pthread_mutex_t mutex;
+    //pthread_mutex_t mutex;
     bool cs_timer;
+    dap_proc_thread_t *proc_thread;
     dap_chain_block_t *processing_candidate;
 
     dap_chain_t *chain;
-- 
GitLab