From c628ccbf8ca7d7edef9ed108cbebce035689cc60 Mon Sep 17 00:00:00 2001 From: "pavel.uhanov" <pavel.uhanov@demlabs.net> Date: Thu, 12 Sep 2024 12:55:44 +0300 Subject: [PATCH] Revert "Hotfix 13068 5" This reverts commit 9d174754035e4dc03913314dfe293298021e0580. --- .../consensus/esbocs/dap_chain_cs_esbocs.c | 119 +++++++++--------- .../esbocs/include/dap_chain_cs_esbocs.h | 3 +- 2 files changed, 59 insertions(+), 63 deletions(-) diff --git a/modules/consensus/esbocs/dap_chain_cs_esbocs.c b/modules/consensus/esbocs/dap_chain_cs_esbocs.c index 9ec843ca71..9d4eeca4ee 100644 --- a/modules/consensus/esbocs/dap_chain_cs_esbocs.c +++ b/modules/consensus/esbocs/dap_chain_cs_esbocs.c @@ -427,7 +427,7 @@ static void s_new_atom_notifier(void *a_arg, dap_chain_t *a_chain, dap_chain_cel { dap_chain_esbocs_session_t *l_session = a_arg; assert(l_session->chain == a_chain); - //pthread_mutex_lock(&l_session->mutex); + pthread_mutex_lock(&l_session->mutex); dap_chain_hash_fast_t l_last_block_hash; dap_chain_get_atom_last_hash(l_session->chain, a_id, &l_last_block_hash); if (!dap_hash_fast_compare(&l_last_block_hash, &l_session->cur_round.last_block_hash) && @@ -435,7 +435,7 @@ static void s_new_atom_notifier(void *a_arg, dap_chain_t *a_chain, dap_chain_cel l_session->new_round_enqueued = true; s_session_round_new(l_session); } - //pthread_mutex_unlock(&l_session->mutex); + pthread_mutex_unlock(&l_session->mutex); if (!PVT(l_session->esbocs)->collecting_addr) return; dap_chain_esbocs_block_collect_t l_block_collect_params = (dap_chain_esbocs_block_collect_t){ @@ -486,7 +486,6 @@ static int s_callback_created(dap_chain_t *a_chain, dap_config_t *a_chain_net_cf DAP_NEW_Z_RET_VAL(l_session, dap_chain_esbocs_session_t, -8, NULL); l_session->chain = a_chain; l_session->esbocs = l_esbocs; - l_session->proc_thread = dap_proc_thread_get_auto(); l_esbocs->session = l_session; DL_APPEND(s_session_items, l_session); log_it(L_INFO, "Init ESBOCS session for net:%s, chain:%s", a_chain->net_name, a_chain->name); @@ -593,11 +592,11 @@ static int s_callback_created(dap_chain_t *a_chain, dap_config_t *a_chain_net_cf log_it(L_ERROR, "This validator is not allowed to work in emergency mode. Use special decree to supply it"); return -5; } - //pthread_mutex_init(&l_session->mutex, NULL); + pthread_mutex_init(&l_session->mutex, NULL); dap_chain_add_callback_notify(a_chain, s_new_atom_notifier, l_session); s_session_round_new(l_session); - l_session->cs_timer = !dap_proc_thread_timer_add(l_session->proc_thread, s_session_proc_state, l_session, 1000); + l_session->cs_timer = !dap_proc_thread_timer_add(NULL, s_session_proc_state, l_session, 1000); debug_if(l_esbocs_pvt->debug && l_session->cs_timer, L_MSG, "Consensus main timer is started"); DAP_CHAIN_PVT(a_chain)->cs_started = true; @@ -789,7 +788,7 @@ static void s_callback_delete(dap_chain_cs_blocks_t *a_blocks) log_it(L_INFO, "No session found"); return; } - //pthread_mutex_lock(&l_session->mutex); + pthread_mutex_lock(&l_session->mutex); DL_DELETE(s_session_items, l_session); s_session_round_clear(l_session); dap_chain_esbocs_sync_item_t *l_sync_item, *l_sync_tmp; @@ -803,8 +802,8 @@ static void s_callback_delete(dap_chain_cs_blocks_t *a_blocks) HASH_DEL(l_session->penalty, l_pen_item); DAP_DELETE(l_pen_item); } - //pthread_mutex_unlock(&l_session->mutex); - //pthread_mutex_destroy(&l_session->mutex); + pthread_mutex_unlock(&l_session->mutex); + pthread_mutex_destroy(&l_session->mutex); DAP_DEL_MULTY(l_session, a_blocks->_inheritor); // a_blocks->_inheritor - l_esbocs } @@ -1032,10 +1031,10 @@ static void s_session_send_startsync(dap_chain_esbocs_session_t *a_session) static bool s_session_send_startsync_on_timer(void *a_arg) { dap_chain_esbocs_session_t *l_session = a_arg; - //pthread_mutex_lock(&l_session->mutex); + pthread_mutex_lock(&l_session->mutex); s_session_send_startsync(l_session); l_session->sync_timer = NULL; - //pthread_mutex_unlock(&l_session->mutex); + pthread_mutex_unlock(&l_session->mutex); return false; } @@ -1462,7 +1461,8 @@ static void s_session_state_change(dap_chain_esbocs_session_t *a_session, enum s log_it(L_ERROR, "No previous state registered, can't roll back"); if (!a_session->new_round_enqueued) { a_session->new_round_enqueued = true; - dap_proc_thread_callback_add(a_session->proc_thread, s_session_round_new, a_session); + dap_proc_thread_t *l_thread = DAP_PROC_THREAD(dap_context_current()); + dap_proc_thread_callback_add(l_thread, s_session_round_new, a_session); } } } @@ -1476,8 +1476,8 @@ static void s_session_proc_state(void *a_arg) dap_chain_esbocs_session_t *l_session = a_arg; if (!l_session->cs_timer) return; // Timer is inactive - //if (pthread_mutex_trylock(&l_session->mutex) != 0) - // return; // Session is busy + if (pthread_mutex_trylock(&l_session->mutex) != 0) + return; // Session is busy bool l_cs_debug = PVT(l_session->esbocs)->debug; dap_time_t l_time = dap_time_now(); switch (l_session->state) { @@ -1495,7 +1495,8 @@ static void s_session_proc_state(void *a_arg) l_session->cur_round.id); if (!l_session->new_round_enqueued) { l_session->new_round_enqueued = true; - dap_proc_thread_callback_add(l_session->proc_thread, s_session_round_new, l_session); + dap_proc_thread_t *l_thread = DAP_PROC_THREAD(dap_context_current()); + dap_proc_thread_callback_add(l_thread, s_session_round_new, l_session); } break; } @@ -1517,7 +1518,8 @@ static void s_session_proc_state(void *a_arg) l_session->sync_failed = true; if (!l_session->new_round_enqueued) { l_session->new_round_enqueued = true; - dap_proc_thread_callback_add(l_session->proc_thread, s_session_round_new, l_session); + dap_proc_thread_t *l_thread = DAP_PROC_THREAD(dap_context_current()); + dap_proc_thread_callback_add(l_thread, s_session_round_new, l_session); } } } @@ -1583,7 +1585,7 @@ static void s_session_proc_state(void *a_arg) break; } - //pthread_mutex_unlock(&l_session->mutex); + pthread_mutex_unlock(&l_session->mutex); } static void s_message_chain_add(dap_chain_esbocs_session_t *a_session, @@ -2185,9 +2187,11 @@ static bool s_stream_ch_packet_in(dap_stream_ch_t *a_ch, void *a_arg) log_it(L_CRITICAL, "%s", c_error_memory_alloc); return false; } - *l_args = (struct esbocs_msg_args){ a_ch->stream->node, l_session, l_message_size }; + l_args->addr_from = a_ch->stream->node; + l_args->session = l_session; + l_args->message_size = l_message_size; memcpy(l_args->message, l_message, l_message_size); - dap_proc_thread_callback_add(l_session->proc_thread, s_process_incoming_message, l_args); + dap_proc_thread_callback_add(dap_worker_get_current()->proc_queue_input, s_process_incoming_message, l_args); return true; } @@ -2219,11 +2223,10 @@ static void s_session_packet_in(dap_chain_esbocs_session_t *a_session, dap_chain dap_hash_fast(l_message, a_data_size, &l_data_hash); if (a_sender_node_addr) { //Process network messages only - //pthread_mutex_lock(&l_session->mutex); + pthread_mutex_lock(&l_session->mutex); if (l_message->hdr.chain_id.uint64 != l_session->chain->id.uint64) { debug_if(l_cs_debug, L_MSG, "Invalid chain ID %"DAP_UINT64_FORMAT_U, l_message->hdr.chain_id.uint64); - //goto session_unlock; - return; + goto session_unlock; } // check hash message dup dap_chain_esbocs_message_item_t *l_message_item_temp = NULL; @@ -2233,8 +2236,7 @@ static void s_session_packet_in(dap_chain_esbocs_session_t *a_session, dap_chain " Message rejected: message hash is exists in chain (duplicate)", l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id, l_message->hdr.attempt_num); - //goto session_unlock; - return; + goto session_unlock; } l_message->hdr.sign_size = 0; // restore header on signing time if (dap_sign_verify_all(l_sign, l_sign_size, l_message, l_message_data_size + sizeof(l_message->hdr))) { @@ -2242,8 +2244,7 @@ static void s_session_packet_in(dap_chain_esbocs_session_t *a_session, dap_chain " Message rejected from addr:"NODE_ADDR_FP_STR" not passed verification", l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id, l_session->cur_round.attempt_num, NODE_ADDR_FP_ARGS(a_sender_node_addr)); - //goto session_unlock; - return; + goto session_unlock; } l_message->hdr.sign_size = l_sign_size; // restore original header @@ -2255,8 +2256,7 @@ static void s_session_packet_in(dap_chain_esbocs_session_t *a_session, dap_chain l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id); s_session_sync_queue_add(l_session, l_message, a_data_size); - //goto session_unlock; - return; + goto session_unlock; } } else if (l_message->hdr.round_id != l_session->cur_round.id) { // round check @@ -2291,8 +2291,7 @@ static void s_session_packet_in(dap_chain_esbocs_session_t *a_session, dap_chain l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id, l_message->hdr.attempt_num, s_voting_msg_type_to_str(l_message->hdr.type)); - //goto session_unlock; - return; + goto session_unlock; } } } @@ -2328,8 +2327,7 @@ static void s_session_packet_in(dap_chain_esbocs_session_t *a_session, dap_chain " Message rejected: validator key:%s not in the current validators list or not synced yet", l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id, l_message->hdr.attempt_num, l_validator_addr_str); - //goto session_unlock; - return; + goto session_unlock; } switch (l_message->hdr.type) { @@ -2389,7 +2387,8 @@ static void s_session_packet_in(dap_chain_esbocs_session_t *a_session, dap_chain l_session->cur_round.sync_attempt = l_sync_attempt - 1; if (!l_session->new_round_enqueued) { l_session->new_round_enqueued = true; - dap_proc_thread_callback_add(l_session->proc_thread, s_session_round_new, l_session); + dap_proc_thread_t *l_thread = DAP_PROC_THREAD(dap_context_current()); + dap_proc_thread_callback_add(l_thread, s_session_round_new, l_session); } } } @@ -2468,8 +2467,7 @@ static void s_session_packet_in(dap_chain_esbocs_session_t *a_session, dap_chain l_store = DAP_NEW_Z(dap_chain_esbocs_store_t); if (!l_store) { log_it(L_CRITICAL, "%s", c_error_memory_alloc); - //goto session_unlock; - return; + goto session_unlock; } l_store->candidate_size = l_candidate_size; l_store->candidate_hash = *l_candidate_hash; @@ -2695,9 +2693,9 @@ static void s_session_packet_in(dap_chain_esbocs_session_t *a_session, dap_chain default: break; } -//session_unlock: - //if (a_sender_node_addr) //Process network message - // pthread_mutex_unlock(&l_session->mutex); +session_unlock: + if (a_sender_node_addr) //Process network message + pthread_mutex_unlock(&l_session->mutex); } static void s_message_send(dap_chain_esbocs_session_t *a_session, uint8_t a_message_type, dap_hash_fast_t *a_block_hash, @@ -2706,44 +2704,44 @@ static void s_message_send(dap_chain_esbocs_session_t *a_session, uint8_t a_mess size_t l_message_size = sizeof(dap_chain_esbocs_message_hdr_t) + a_data_size; dap_chain_esbocs_message_t *l_message = NULL; DAP_NEW_Z_SIZE_RET(l_message, dap_chain_esbocs_message_t, l_message_size, NULL); - *l_message = (dap_chain_esbocs_message_t) { - .hdr = (dap_chain_esbocs_message_hdr_t) { - .version = DAP_CHAIN_ESBOCS_PROTOCOL_VERSION, - .type = a_message_type, - .attempt_num = a_session->cur_round.attempt_num, - .round_id = a_session->cur_round.id, - .message_size = a_data_size, - .ts_created = dap_time_now(), - .net_id = a_session->chain->net_id, - .chain_id = a_session->chain->id, - .candidate_hash = *a_block_hash - } - }; + l_message->hdr.version = DAP_CHAIN_ESBOCS_PROTOCOL_VERSION; + l_message->hdr.type = a_message_type; + l_message->hdr.round_id = a_session->cur_round.id; + l_message->hdr.attempt_num = a_session->cur_round.attempt_num; + l_message->hdr.net_id = a_session->chain->net_id; + l_message->hdr.chain_id = a_session->chain->id; + l_message->hdr.ts_created = dap_time_now(); + l_message->hdr.message_size = a_data_size; + l_message->hdr.candidate_hash = *a_block_hash; if (a_data && a_data_size) memcpy(l_message->msg_n_sign, a_data, a_data_size); for (dap_list_t *it = a_validators; it; it = it->next) { dap_chain_esbocs_validator_t *l_validator = it->data; - if ( l_validator->is_synced || a_message_type == DAP_CHAIN_ESBOCS_MSG_TYPE_START_SYNC ) { - debug_if(PVT(a_session->esbocs)->debug, L_MSG, "Send pkt type 0x%x to "NODE_ADDR_FP_STR, a_message_type, - NODE_ADDR_FP_ARGS_S(l_validator->node_addr)); + if (l_validator->is_synced || + a_message_type == DAP_CHAIN_ESBOCS_MSG_TYPE_START_SYNC) { + debug_if(PVT(a_session->esbocs)->debug, L_MSG, "Send pkt type 0x%x to "NODE_ADDR_FP_STR, + a_message_type, NODE_ADDR_FP_ARGS_S(l_validator->node_addr)); l_message->hdr.recv_addr = l_validator->node_addr; l_message->hdr.sign_size = 0; - dap_sign_t *l_sign = dap_sign_create( PVT(a_session->esbocs)->blocks_sign_key, l_message, l_message_size, 0 ); + dap_sign_t *l_sign = dap_sign_create(PVT(a_session->esbocs)->blocks_sign_key, l_message, + l_message_size, 0); size_t l_sign_size = dap_sign_get_size(l_sign); l_message->hdr.sign_size = l_sign_size; - dap_chain_esbocs_message_t *l_message_signed = DAP_REALLOC(l_message, l_message_size + l_sign_size); - if ( !l_message_signed ) - return DAP_DELETE(l_sign), DAP_DELETE(l_message), log_it(L_CRITICAL, "%s", c_error_memory_alloc); - l_message = l_message_signed; + l_message = DAP_REALLOC(l_message, l_message_size + l_sign_size); + if (!l_message) { + log_it(L_CRITICAL, "%s", c_error_memory_alloc); + return; + } memcpy(l_message->msg_n_sign + a_data_size, l_sign, l_sign_size); DAP_DELETE(l_sign); + if (l_validator->node_addr.uint64 != a_session->my_addr.uint64) { dap_stream_ch_pkt_send_by_addr(&l_validator->node_addr, DAP_STREAM_CH_ESBOCS_ID, a_message_type, l_message, l_message_size + l_sign_size); continue; } - /*struct esbocs_msg_args *l_args = DAP_NEW_SIZE(struct esbocs_msg_args, + struct esbocs_msg_args *l_args = DAP_NEW_SIZE(struct esbocs_msg_args, sizeof(struct esbocs_msg_args) + l_message_size + l_sign_size); if (!l_args) { log_it(L_CRITICAL, "%s", c_error_memory_alloc); @@ -2754,8 +2752,7 @@ static void s_message_send(dap_chain_esbocs_session_t *a_session, uint8_t a_mess l_args->session = a_session; l_args->message_size = l_message_size + l_sign_size; memcpy(l_args->message, l_message, l_message_size + l_sign_size); - dap_proc_thread_callback_add(a_session->proc_thread, s_process_incoming_message, l_args);*/ - s_session_packet_in(a_session, &a_session->my_addr, (byte_t*)l_message, l_message_size + l_sign_size); + dap_proc_thread_callback_add(dap_worker_get_current()->proc_queue_input, s_process_incoming_message, l_args); } } DAP_DELETE(l_message); diff --git a/modules/consensus/esbocs/include/dap_chain_cs_esbocs.h b/modules/consensus/esbocs/include/dap_chain_cs_esbocs.h index e26355cf47..5a99570e8b 100644 --- a/modules/consensus/esbocs/include/dap_chain_cs_esbocs.h +++ b/modules/consensus/esbocs/include/dap_chain_cs_esbocs.h @@ -180,9 +180,8 @@ typedef struct dap_chain_esbocs_penalty_item { #define DAP_CHAIN_ESBOCS_PENALTY_KICK 3U // Number of missed rounds to kick typedef struct dap_chain_esbocs_session { - //pthread_mutex_t mutex; + pthread_mutex_t mutex; bool cs_timer; - dap_proc_thread_t *proc_thread; dap_chain_block_t *processing_candidate; dap_chain_t *chain; -- GitLab