diff --git a/modules/consensus/esbocs/dap_chain_cs_esbocs.c b/modules/consensus/esbocs/dap_chain_cs_esbocs.c index 9d4eeca4ee44ad4c2228e1d0a5727f2b9f86bd86..9ec843ca7105ac76258eb40e2b6e84c60f4aea18 100644 --- a/modules/consensus/esbocs/dap_chain_cs_esbocs.c +++ b/modules/consensus/esbocs/dap_chain_cs_esbocs.c @@ -427,7 +427,7 @@ static void s_new_atom_notifier(void *a_arg, dap_chain_t *a_chain, dap_chain_cel { dap_chain_esbocs_session_t *l_session = a_arg; assert(l_session->chain == a_chain); - pthread_mutex_lock(&l_session->mutex); + //pthread_mutex_lock(&l_session->mutex); dap_chain_hash_fast_t l_last_block_hash; dap_chain_get_atom_last_hash(l_session->chain, a_id, &l_last_block_hash); if (!dap_hash_fast_compare(&l_last_block_hash, &l_session->cur_round.last_block_hash) && @@ -435,7 +435,7 @@ static void s_new_atom_notifier(void *a_arg, dap_chain_t *a_chain, dap_chain_cel l_session->new_round_enqueued = true; s_session_round_new(l_session); } - pthread_mutex_unlock(&l_session->mutex); + //pthread_mutex_unlock(&l_session->mutex); if (!PVT(l_session->esbocs)->collecting_addr) return; dap_chain_esbocs_block_collect_t l_block_collect_params = (dap_chain_esbocs_block_collect_t){ @@ -486,6 +486,7 @@ static int s_callback_created(dap_chain_t *a_chain, dap_config_t *a_chain_net_cf DAP_NEW_Z_RET_VAL(l_session, dap_chain_esbocs_session_t, -8, NULL); l_session->chain = a_chain; l_session->esbocs = l_esbocs; + l_session->proc_thread = dap_proc_thread_get_auto(); l_esbocs->session = l_session; DL_APPEND(s_session_items, l_session); log_it(L_INFO, "Init ESBOCS session for net:%s, chain:%s", a_chain->net_name, a_chain->name); @@ -592,11 +593,11 @@ static int s_callback_created(dap_chain_t *a_chain, dap_config_t *a_chain_net_cf log_it(L_ERROR, "This validator is not allowed to work in emergency mode. Use special decree to supply it"); return -5; } - pthread_mutex_init(&l_session->mutex, NULL); + //pthread_mutex_init(&l_session->mutex, NULL); dap_chain_add_callback_notify(a_chain, s_new_atom_notifier, l_session); s_session_round_new(l_session); - l_session->cs_timer = !dap_proc_thread_timer_add(NULL, s_session_proc_state, l_session, 1000); + l_session->cs_timer = !dap_proc_thread_timer_add(l_session->proc_thread, s_session_proc_state, l_session, 1000); debug_if(l_esbocs_pvt->debug && l_session->cs_timer, L_MSG, "Consensus main timer is started"); DAP_CHAIN_PVT(a_chain)->cs_started = true; @@ -788,7 +789,7 @@ static void s_callback_delete(dap_chain_cs_blocks_t *a_blocks) log_it(L_INFO, "No session found"); return; } - pthread_mutex_lock(&l_session->mutex); + //pthread_mutex_lock(&l_session->mutex); DL_DELETE(s_session_items, l_session); s_session_round_clear(l_session); dap_chain_esbocs_sync_item_t *l_sync_item, *l_sync_tmp; @@ -802,8 +803,8 @@ static void s_callback_delete(dap_chain_cs_blocks_t *a_blocks) HASH_DEL(l_session->penalty, l_pen_item); DAP_DELETE(l_pen_item); } - pthread_mutex_unlock(&l_session->mutex); - pthread_mutex_destroy(&l_session->mutex); + //pthread_mutex_unlock(&l_session->mutex); + //pthread_mutex_destroy(&l_session->mutex); DAP_DEL_MULTY(l_session, a_blocks->_inheritor); // a_blocks->_inheritor - l_esbocs } @@ -1031,10 +1032,10 @@ static void s_session_send_startsync(dap_chain_esbocs_session_t *a_session) static bool s_session_send_startsync_on_timer(void *a_arg) { dap_chain_esbocs_session_t *l_session = a_arg; - pthread_mutex_lock(&l_session->mutex); + //pthread_mutex_lock(&l_session->mutex); s_session_send_startsync(l_session); l_session->sync_timer = NULL; - pthread_mutex_unlock(&l_session->mutex); + //pthread_mutex_unlock(&l_session->mutex); return false; } @@ -1461,8 +1462,7 @@ static void s_session_state_change(dap_chain_esbocs_session_t *a_session, enum s log_it(L_ERROR, "No previous state registered, can't roll back"); if (!a_session->new_round_enqueued) { a_session->new_round_enqueued = true; - dap_proc_thread_t *l_thread = DAP_PROC_THREAD(dap_context_current()); - dap_proc_thread_callback_add(l_thread, s_session_round_new, a_session); + dap_proc_thread_callback_add(a_session->proc_thread, s_session_round_new, a_session); } } } @@ -1476,8 +1476,8 @@ static void s_session_proc_state(void *a_arg) dap_chain_esbocs_session_t *l_session = a_arg; if (!l_session->cs_timer) return; // Timer is inactive - if (pthread_mutex_trylock(&l_session->mutex) != 0) - return; // Session is busy + //if (pthread_mutex_trylock(&l_session->mutex) != 0) + // return; // Session is busy bool l_cs_debug = PVT(l_session->esbocs)->debug; dap_time_t l_time = dap_time_now(); switch (l_session->state) { @@ -1495,8 +1495,7 @@ static void s_session_proc_state(void *a_arg) l_session->cur_round.id); if (!l_session->new_round_enqueued) { l_session->new_round_enqueued = true; - dap_proc_thread_t *l_thread = DAP_PROC_THREAD(dap_context_current()); - dap_proc_thread_callback_add(l_thread, s_session_round_new, l_session); + dap_proc_thread_callback_add(l_session->proc_thread, s_session_round_new, l_session); } break; } @@ -1518,8 +1517,7 @@ static void s_session_proc_state(void *a_arg) l_session->sync_failed = true; if (!l_session->new_round_enqueued) { l_session->new_round_enqueued = true; - dap_proc_thread_t *l_thread = DAP_PROC_THREAD(dap_context_current()); - dap_proc_thread_callback_add(l_thread, s_session_round_new, l_session); + dap_proc_thread_callback_add(l_session->proc_thread, s_session_round_new, l_session); } } } @@ -1585,7 +1583,7 @@ static void s_session_proc_state(void *a_arg) break; } - pthread_mutex_unlock(&l_session->mutex); + //pthread_mutex_unlock(&l_session->mutex); } static void s_message_chain_add(dap_chain_esbocs_session_t *a_session, @@ -2187,11 +2185,9 @@ static bool s_stream_ch_packet_in(dap_stream_ch_t *a_ch, void *a_arg) log_it(L_CRITICAL, "%s", c_error_memory_alloc); return false; } - l_args->addr_from = a_ch->stream->node; - l_args->session = l_session; - l_args->message_size = l_message_size; + *l_args = (struct esbocs_msg_args){ a_ch->stream->node, l_session, l_message_size }; memcpy(l_args->message, l_message, l_message_size); - dap_proc_thread_callback_add(dap_worker_get_current()->proc_queue_input, s_process_incoming_message, l_args); + dap_proc_thread_callback_add(l_session->proc_thread, s_process_incoming_message, l_args); return true; } @@ -2223,10 +2219,11 @@ static void s_session_packet_in(dap_chain_esbocs_session_t *a_session, dap_chain dap_hash_fast(l_message, a_data_size, &l_data_hash); if (a_sender_node_addr) { //Process network messages only - pthread_mutex_lock(&l_session->mutex); + //pthread_mutex_lock(&l_session->mutex); if (l_message->hdr.chain_id.uint64 != l_session->chain->id.uint64) { debug_if(l_cs_debug, L_MSG, "Invalid chain ID %"DAP_UINT64_FORMAT_U, l_message->hdr.chain_id.uint64); - goto session_unlock; + //goto session_unlock; + return; } // check hash message dup dap_chain_esbocs_message_item_t *l_message_item_temp = NULL; @@ -2236,7 +2233,8 @@ static void s_session_packet_in(dap_chain_esbocs_session_t *a_session, dap_chain " Message rejected: message hash is exists in chain (duplicate)", l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id, l_message->hdr.attempt_num); - goto session_unlock; + //goto session_unlock; + return; } l_message->hdr.sign_size = 0; // restore header on signing time if (dap_sign_verify_all(l_sign, l_sign_size, l_message, l_message_data_size + sizeof(l_message->hdr))) { @@ -2244,7 +2242,8 @@ static void s_session_packet_in(dap_chain_esbocs_session_t *a_session, dap_chain " Message rejected from addr:"NODE_ADDR_FP_STR" not passed verification", l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id, l_session->cur_round.attempt_num, NODE_ADDR_FP_ARGS(a_sender_node_addr)); - goto session_unlock; + //goto session_unlock; + return; } l_message->hdr.sign_size = l_sign_size; // restore original header @@ -2256,7 +2255,8 @@ static void s_session_packet_in(dap_chain_esbocs_session_t *a_session, dap_chain l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id); s_session_sync_queue_add(l_session, l_message, a_data_size); - goto session_unlock; + //goto session_unlock; + return; } } else if (l_message->hdr.round_id != l_session->cur_round.id) { // round check @@ -2291,7 +2291,8 @@ static void s_session_packet_in(dap_chain_esbocs_session_t *a_session, dap_chain l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id, l_message->hdr.attempt_num, s_voting_msg_type_to_str(l_message->hdr.type)); - goto session_unlock; + //goto session_unlock; + return; } } } @@ -2327,7 +2328,8 @@ static void s_session_packet_in(dap_chain_esbocs_session_t *a_session, dap_chain " Message rejected: validator key:%s not in the current validators list or not synced yet", l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id, l_message->hdr.attempt_num, l_validator_addr_str); - goto session_unlock; + //goto session_unlock; + return; } switch (l_message->hdr.type) { @@ -2387,8 +2389,7 @@ static void s_session_packet_in(dap_chain_esbocs_session_t *a_session, dap_chain l_session->cur_round.sync_attempt = l_sync_attempt - 1; if (!l_session->new_round_enqueued) { l_session->new_round_enqueued = true; - dap_proc_thread_t *l_thread = DAP_PROC_THREAD(dap_context_current()); - dap_proc_thread_callback_add(l_thread, s_session_round_new, l_session); + dap_proc_thread_callback_add(l_session->proc_thread, s_session_round_new, l_session); } } } @@ -2467,7 +2468,8 @@ static void s_session_packet_in(dap_chain_esbocs_session_t *a_session, dap_chain l_store = DAP_NEW_Z(dap_chain_esbocs_store_t); if (!l_store) { log_it(L_CRITICAL, "%s", c_error_memory_alloc); - goto session_unlock; + //goto session_unlock; + return; } l_store->candidate_size = l_candidate_size; l_store->candidate_hash = *l_candidate_hash; @@ -2693,9 +2695,9 @@ static void s_session_packet_in(dap_chain_esbocs_session_t *a_session, dap_chain default: break; } -session_unlock: - if (a_sender_node_addr) //Process network message - pthread_mutex_unlock(&l_session->mutex); +//session_unlock: + //if (a_sender_node_addr) //Process network message + // pthread_mutex_unlock(&l_session->mutex); } static void s_message_send(dap_chain_esbocs_session_t *a_session, uint8_t a_message_type, dap_hash_fast_t *a_block_hash, @@ -2704,44 +2706,44 @@ static void s_message_send(dap_chain_esbocs_session_t *a_session, uint8_t a_mess size_t l_message_size = sizeof(dap_chain_esbocs_message_hdr_t) + a_data_size; dap_chain_esbocs_message_t *l_message = NULL; DAP_NEW_Z_SIZE_RET(l_message, dap_chain_esbocs_message_t, l_message_size, NULL); - l_message->hdr.version = DAP_CHAIN_ESBOCS_PROTOCOL_VERSION; - l_message->hdr.type = a_message_type; - l_message->hdr.round_id = a_session->cur_round.id; - l_message->hdr.attempt_num = a_session->cur_round.attempt_num; - l_message->hdr.net_id = a_session->chain->net_id; - l_message->hdr.chain_id = a_session->chain->id; - l_message->hdr.ts_created = dap_time_now(); - l_message->hdr.message_size = a_data_size; - l_message->hdr.candidate_hash = *a_block_hash; + *l_message = (dap_chain_esbocs_message_t) { + .hdr = (dap_chain_esbocs_message_hdr_t) { + .version = DAP_CHAIN_ESBOCS_PROTOCOL_VERSION, + .type = a_message_type, + .attempt_num = a_session->cur_round.attempt_num, + .round_id = a_session->cur_round.id, + .message_size = a_data_size, + .ts_created = dap_time_now(), + .net_id = a_session->chain->net_id, + .chain_id = a_session->chain->id, + .candidate_hash = *a_block_hash + } + }; if (a_data && a_data_size) memcpy(l_message->msg_n_sign, a_data, a_data_size); for (dap_list_t *it = a_validators; it; it = it->next) { dap_chain_esbocs_validator_t *l_validator = it->data; - if (l_validator->is_synced || - a_message_type == DAP_CHAIN_ESBOCS_MSG_TYPE_START_SYNC) { - debug_if(PVT(a_session->esbocs)->debug, L_MSG, "Send pkt type 0x%x to "NODE_ADDR_FP_STR, - a_message_type, NODE_ADDR_FP_ARGS_S(l_validator->node_addr)); + if ( l_validator->is_synced || a_message_type == DAP_CHAIN_ESBOCS_MSG_TYPE_START_SYNC ) { + debug_if(PVT(a_session->esbocs)->debug, L_MSG, "Send pkt type 0x%x to "NODE_ADDR_FP_STR, a_message_type, + NODE_ADDR_FP_ARGS_S(l_validator->node_addr)); l_message->hdr.recv_addr = l_validator->node_addr; l_message->hdr.sign_size = 0; - dap_sign_t *l_sign = dap_sign_create(PVT(a_session->esbocs)->blocks_sign_key, l_message, - l_message_size, 0); + dap_sign_t *l_sign = dap_sign_create( PVT(a_session->esbocs)->blocks_sign_key, l_message, l_message_size, 0 ); size_t l_sign_size = dap_sign_get_size(l_sign); l_message->hdr.sign_size = l_sign_size; - l_message = DAP_REALLOC(l_message, l_message_size + l_sign_size); - if (!l_message) { - log_it(L_CRITICAL, "%s", c_error_memory_alloc); - return; - } + dap_chain_esbocs_message_t *l_message_signed = DAP_REALLOC(l_message, l_message_size + l_sign_size); + if ( !l_message_signed ) + return DAP_DELETE(l_sign), DAP_DELETE(l_message), log_it(L_CRITICAL, "%s", c_error_memory_alloc); + l_message = l_message_signed; memcpy(l_message->msg_n_sign + a_data_size, l_sign, l_sign_size); DAP_DELETE(l_sign); - if (l_validator->node_addr.uint64 != a_session->my_addr.uint64) { dap_stream_ch_pkt_send_by_addr(&l_validator->node_addr, DAP_STREAM_CH_ESBOCS_ID, a_message_type, l_message, l_message_size + l_sign_size); continue; } - struct esbocs_msg_args *l_args = DAP_NEW_SIZE(struct esbocs_msg_args, + /*struct esbocs_msg_args *l_args = DAP_NEW_SIZE(struct esbocs_msg_args, sizeof(struct esbocs_msg_args) + l_message_size + l_sign_size); if (!l_args) { log_it(L_CRITICAL, "%s", c_error_memory_alloc); @@ -2752,7 +2754,8 @@ static void s_message_send(dap_chain_esbocs_session_t *a_session, uint8_t a_mess l_args->session = a_session; l_args->message_size = l_message_size + l_sign_size; memcpy(l_args->message, l_message, l_message_size + l_sign_size); - dap_proc_thread_callback_add(dap_worker_get_current()->proc_queue_input, s_process_incoming_message, l_args); + dap_proc_thread_callback_add(a_session->proc_thread, s_process_incoming_message, l_args);*/ + s_session_packet_in(a_session, &a_session->my_addr, (byte_t*)l_message, l_message_size + l_sign_size); } } DAP_DELETE(l_message); diff --git a/modules/consensus/esbocs/include/dap_chain_cs_esbocs.h b/modules/consensus/esbocs/include/dap_chain_cs_esbocs.h index 5a99570e8ba8fcc13714422ddb36179ac177e934..e26355cf476bd50dc4a8496d4829c7c0ceedd057 100644 --- a/modules/consensus/esbocs/include/dap_chain_cs_esbocs.h +++ b/modules/consensus/esbocs/include/dap_chain_cs_esbocs.h @@ -180,8 +180,9 @@ typedef struct dap_chain_esbocs_penalty_item { #define DAP_CHAIN_ESBOCS_PENALTY_KICK 3U // Number of missed rounds to kick typedef struct dap_chain_esbocs_session { - pthread_mutex_t mutex; + //pthread_mutex_t mutex; bool cs_timer; + dap_proc_thread_t *proc_thread; dap_chain_block_t *processing_candidate; dap_chain_t *chain;