From ce8dfa0a8afdc4dccfc479385063347d2890370c Mon Sep 17 00:00:00 2001
From: Roman Khlopkov <roman.khlopkov@demlabs.net>
Date: Fri, 7 Jul 2023 14:45:11 +0300
Subject: [PATCH] [*] Esbocs syncronyzation fixes

---
 .../consensus/esbocs/dap_chain_cs_esbocs.c    | 99 +++++++++++--------
 .../esbocs/include/dap_chain_cs_esbocs.h      |  2 +
 2 files changed, 58 insertions(+), 43 deletions(-)

diff --git a/modules/consensus/esbocs/dap_chain_cs_esbocs.c b/modules/consensus/esbocs/dap_chain_cs_esbocs.c
index 8bc3fbf1c0..464223aa16 100644
--- a/modules/consensus/esbocs/dap_chain_cs_esbocs.c
+++ b/modules/consensus/esbocs/dap_chain_cs_esbocs.c
@@ -131,6 +131,11 @@ typedef struct dap_chain_esbocs_pvt {
 
 #define PVT(a) ((dap_chain_esbocs_pvt_t *)a->_pvt)
 
+DAP_STATIC_INLINE uint16_t s_get_round_skip_timeout(dap_chain_esbocs_session_t *a_session)
+{
+    return PVT(a_session->esbocs)->round_attempt_timeout * 6 * PVT(a_session->esbocs)->round_attempts_max;
+}
+
 int dap_chain_cs_esbocs_init()
 {
     dap_stream_ch_chain_voting_init();
@@ -689,40 +694,45 @@ static void s_session_round_new(dap_chain_esbocs_session_t *a_session)
     }
     a_session->cur_round.validators_list = s_get_validators_list(a_session, a_session->cur_round.sync_attempt - 1);
     bool l_round_already_started = a_session->round_fast_forward;
-    if (s_validator_check(&a_session->my_signing_addr, a_session->cur_round.validators_list)) {
-        //I am a current round validator
-        dap_chain_esbocs_sync_item_t *l_item, *l_tmp;
-        HASH_FIND(hh, a_session->sync_items, &a_session->cur_round.last_block_hash, sizeof(dap_hash_fast_t), l_item);
-        if (l_item) {
-            debug_if(PVT(a_session->esbocs)->debug,
-                     L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U" already started. Process sync messages",
-                                a_session->chain->net_name, a_session->chain->name, a_session->cur_round.id);
-            l_round_already_started = true;
-            for (dap_list_t *it = l_item->messages; it; it = it->next) {
-                dap_hash_fast_t l_msg_hash;
-                dap_chain_esbocs_message_t *l_msg = it->data;
-                size_t l_msg_size = s_get_esbocs_message_size(l_msg);
-                dap_hash_fast(l_msg, l_msg_size, &l_msg_hash);
-                s_session_packet_in(a_session, NULL, NULL, &l_msg_hash, (uint8_t *)l_msg, l_msg_size);
-            }
-        }
-        HASH_ITER(hh, a_session->sync_items, l_item, l_tmp) {
-            HASH_DEL(a_session->sync_items, l_item);
-            dap_list_free_full(l_item->messages, NULL);
-            DAP_DELETE(l_item);
-        }
-
+    dap_chain_esbocs_sync_item_t *l_item, *l_tmp;
+    HASH_FIND(hh, a_session->sync_items, &a_session->cur_round.last_block_hash, sizeof(dap_hash_fast_t), l_item);
+    if (l_item) {
+        debug_if(PVT(a_session->esbocs)->debug,
+                 L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U" already started. Process sync messages",
+                            a_session->chain->net_name, a_session->chain->name, a_session->cur_round.id);
+        l_round_already_started = true;
+        for (dap_list_t *it = l_item->messages; it; it = it->next) {
+            dap_hash_fast_t l_msg_hash;
+            dap_chain_esbocs_message_t *l_msg = it->data;
+            size_t l_msg_size = s_get_esbocs_message_size(l_msg);
+            dap_hash_fast(l_msg, l_msg_size, &l_msg_hash);
+            s_session_packet_in(a_session, NULL, NULL, &l_msg_hash, (uint8_t *)l_msg, l_msg_size);
+        }
+    }
+    HASH_ITER(hh, a_session->sync_items, l_item, l_tmp) {
+        HASH_DEL(a_session->sync_items, l_item);
+        dap_list_free_full(l_item->messages, NULL);
+        DAP_DELETE(l_item);
+    }
+
+    if (!a_session->cur_round.sync_sent) {
+        uint16_t l_sync_send_delay =  a_session->sync_failed ?
+                                            s_get_round_skip_timeout(a_session) :
+                                            PVT(a_session->esbocs)->new_round_delay;
+        if (l_round_already_started)
+            l_sync_send_delay = 0;
         debug_if(PVT(a_session->esbocs)->debug, L_MSG,
                  "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U" start. Syncing validators in %u seconds",
                     a_session->chain->net_name, a_session->chain->name,
-                        a_session->cur_round.id, l_round_already_started ? 0 : PVT(a_session->esbocs)->new_round_delay);
+                        a_session->cur_round.id, l_sync_send_delay);
+        if (l_sync_send_delay)
+            a_session->sync_timer = dap_timerfd_start(l_sync_send_delay * 1000, s_session_send_startsync_on_timer, a_session);
+        else
+            s_session_send_startsync(a_session);
     }
-    if (PVT(a_session->esbocs)->new_round_delay && !l_round_already_started && !a_session->cur_round.sync_sent)
-        a_session->sync_timer = dap_timerfd_start(PVT(a_session->esbocs)->new_round_delay * 1000,
-                                                  s_session_send_startsync_on_timer, a_session);
-    else
-        s_session_send_startsync(a_session);
     a_session->round_fast_forward = false;
+    a_session->sync_failed = false;
+    a_session->listen_ensure = 0;
 }
 
 static void s_session_attempt_new(dap_chain_esbocs_session_t *a_session)
@@ -976,16 +986,15 @@ static void s_session_proc_state(dap_chain_esbocs_session_t *a_session)
 {
     if (pthread_mutex_trylock(&a_session->mutex) != 0)
         return; // Session is busy
-    static unsigned int l_listen_ensure;
     bool l_cs_debug = PVT(a_session->esbocs)->debug;
     dap_time_t l_time = dap_time_now();
     switch (a_session->state) {
     case DAP_CHAIN_ESBOCS_SESSION_STATE_WAIT_START: {
-        l_listen_ensure = 1;
+        a_session->listen_ensure = 1;
         dap_time_t l_round_timeout = PVT(a_session->esbocs)->round_start_sync_timeout;
         bool l_round_skip = !s_validator_check(&a_session->my_signing_addr, a_session->cur_round.validators_list);
         if (l_round_skip)
-            l_round_timeout += PVT(a_session->esbocs)->round_attempt_timeout * 6 * PVT(a_session->esbocs)->round_attempts_max;
+            l_round_timeout += s_get_round_skip_timeout(a_session);
         if (a_session->ts_round_sync_start && l_time - a_session->ts_round_sync_start >= l_round_timeout) {
             if (a_session->cur_round.validators_synced_count >= PVT(a_session->esbocs)->min_validators_count && !l_round_skip) {
                 a_session->cur_round.id = s_session_calc_current_round_id(a_session);
@@ -1000,13 +1009,14 @@ static void s_session_proc_state(dap_chain_esbocs_session_t *a_session)
                                                 a_session->chain->net_name, a_session->chain->name,
                                                     a_session->cur_round.id, a_session->cur_round.attempt_num,
                                                         l_round_skip ? "skipped" : "can't synchronize minimum number of validators");
+                a_session->sync_failed = true;
                 s_session_round_new(a_session);
             }
         }
     } break;
     case DAP_CHAIN_ESBOCS_SESSION_STATE_WAIT_PROC:
-        if (l_time - a_session->ts_stage_entry >= PVT(a_session->esbocs)->round_attempt_timeout * l_listen_ensure) {
-            l_listen_ensure += 2;
+        if (l_time - a_session->ts_stage_entry >= PVT(a_session->esbocs)->round_attempt_timeout * a_session->listen_ensure) {
+            a_session->listen_ensure += 2;
             debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu."
                                         " Attempt finished by reason: haven't cantidate submitted",
                                             a_session->chain->net_name, a_session->chain->name,
@@ -1015,7 +1025,6 @@ static void s_session_proc_state(dap_chain_esbocs_session_t *a_session)
         }
         break;
     case DAP_CHAIN_ESBOCS_SESSION_STATE_WAIT_SIGNS:
-        l_listen_ensure = 1;
         if (l_time - a_session->ts_stage_entry >= PVT(a_session->esbocs)->round_attempt_timeout) {
             dap_chain_esbocs_store_t *l_store;
             HASH_FIND(hh, a_session->cur_round.store_items, &a_session->cur_round.attempt_candidate_hash, sizeof(dap_hash_fast_t), l_store);
@@ -1849,6 +1858,7 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod
                                              " is greater than meassage sync attempt %"DAP_UINT64_FORMAT_U,
                                                 l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id,
                                                     l_session->cur_round.sync_attempt, l_sync_attempt);
+                 break;
             } else {
                 uint64_t l_attempts_miss = l_sync_attempt - l_session->cur_round.sync_attempt;
                 uint32_t l_attempts_miss_max = UINT16_MAX; // TODO calculate it rely on last block aceeption time & min round duration
@@ -1857,6 +1867,7 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod
                                                 " SYNC message is rejected - too much sync attempt difference %"DAP_UINT64_FORMAT_U,
                                                    l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id,
                                                        l_attempts_miss);
+                    break;
                 } else {
                     debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U
                                                 " SYNC message sync attempt %"DAP_UINT64_FORMAT_U" is greater than"
@@ -1879,14 +1890,16 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod
         if (!l_list)
             break;
         dap_chain_esbocs_validator_t *l_validator = l_list->data;
-        l_validator->is_synced = true;
-        if (++l_session->cur_round.validators_synced_count == dap_list_length(l_session->cur_round.validators_list)) {
-            l_session->cur_round.id = s_session_calc_current_round_id(l_session);
-            debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu."
-                                        " All validators are synchronized, wait to submit candidate",
-                                            l_session->chain->net_name, l_session->chain->name,
-                                                l_session->cur_round.id, l_message->hdr.attempt_num);
-            s_session_state_change(l_session, DAP_CHAIN_ESBOCS_SESSION_STATE_WAIT_PROC, dap_time_now());
+        if (!l_validator->is_synced) {
+            l_validator->is_synced = true;
+            if (++l_session->cur_round.validators_synced_count == dap_list_length(l_session->cur_round.validators_list)) {
+                l_session->cur_round.id = s_session_calc_current_round_id(l_session);
+                debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu."
+                                            " All validators are synchronized, wait to submit candidate",
+                                                l_session->chain->net_name, l_session->chain->name,
+                                                    l_session->cur_round.id, l_message->hdr.attempt_num);
+                s_session_state_change(l_session, DAP_CHAIN_ESBOCS_SESSION_STATE_WAIT_PROC, dap_time_now());
+            }
         }
     } break;
 
diff --git a/modules/consensus/esbocs/include/dap_chain_cs_esbocs.h b/modules/consensus/esbocs/include/dap_chain_cs_esbocs.h
index 6d99bb1b69..e797c1dda3 100644
--- a/modules/consensus/esbocs/include/dap_chain_cs_esbocs.h
+++ b/modules/consensus/esbocs/include/dap_chain_cs_esbocs.h
@@ -161,6 +161,8 @@ typedef struct dap_chain_esbocs_session {
     uint8_t old_state; // for previous state return
     dap_chain_esbocs_round_t cur_round;
     bool round_fast_forward;
+    unsigned int listen_ensure;
+    bool sync_failed;
 
     dap_time_t ts_round_sync_start; // time of start sync
     dap_time_t ts_stage_entry; // time of current stage entrance
-- 
GitLab