diff --git a/dap-sdk b/dap-sdk index 3800d14867e602f45b4ed6096193f5a0484b779b..7be39eec125851f08ce4283e5c289bf0bf87458c 160000 --- a/dap-sdk +++ b/dap-sdk @@ -1 +1 @@ -Subproject commit 3800d14867e602f45b4ed6096193f5a0484b779b +Subproject commit 7be39eec125851f08ce4283e5c289bf0bf87458c diff --git a/modules/consensus/esbocs/dap_chain_cs_esbocs.c b/modules/consensus/esbocs/dap_chain_cs_esbocs.c index 0d558436c012c236a7950982899d9417ac93e1a4..82067655f611b99a6b1ed0ec1091353fd30afbf7 100644 --- a/modules/consensus/esbocs/dap_chain_cs_esbocs.c +++ b/modules/consensus/esbocs/dap_chain_cs_esbocs.c @@ -51,6 +51,7 @@ enum s_esbocs_session_state { }; static dap_list_t *s_validator_check(dap_chain_addr_t *a_addr, dap_list_t *a_validators); +static void s_session_proc_state(void *a_arg); static void s_session_state_change(dap_chain_esbocs_session_t *a_session, enum s_esbocs_session_state a_new_state, dap_time_t a_time); static bool s_stream_ch_packet_in(dap_stream_ch_t *a_ch, void *a_arg); static void s_session_packet_in(dap_chain_esbocs_session_t *a_session, dap_chain_node_addr_t *a_sender_node_addr, uint8_t *a_data, size_t a_data_size); @@ -65,7 +66,6 @@ static void s_session_candidate_verify(dap_chain_esbocs_session_t *a_session, da static void s_session_candidate_precommit(dap_chain_esbocs_session_t *a_session, dap_chain_esbocs_message_t *a_message); static void s_session_round_finish(dap_chain_esbocs_session_t *a_session, dap_chain_esbocs_store_t *l_store); -static bool s_session_timer(void *a_arg); static void s_message_send(dap_chain_esbocs_session_t *a_session, uint8_t a_message_type, dap_hash_fast_t *a_block_hash, const void *a_data, size_t a_data_size, dap_list_t *a_validators); static void s_message_chain_add(dap_chain_esbocs_session_t * a_session, @@ -576,8 +576,8 @@ static int s_callback_created(dap_chain_t *a_chain, dap_config_t *a_chain_net_cf dap_chain_add_callback_notify(a_chain, s_new_atom_notifier, l_session); s_session_round_new(l_session); - l_session->cs_timer = dap_timerfd_start(1000, s_session_timer, l_session); - debug_if(l_esbocs_pvt->debug, L_MSG, "Consensus main timer is started"); + l_session->cs_timer = !dap_proc_thread_timer_add(NULL, s_session_proc_state, l_session, 1000); + debug_if(l_esbocs_pvt->debug && l_session->cs_timer, L_MSG, "Consensus main timer is started"); DAP_CHAIN_PVT(a_chain)->cs_started = true; return 0; @@ -619,10 +619,9 @@ void dap_chain_esbocs_stop_timer(dap_chain_net_id_t a_net_id) dap_chain_esbocs_session_t *l_session; DL_FOREACH(s_session_items, l_session) { if (l_session->chain->net_id.uint64 == a_net_id.uint64 && - l_session->cs_timer){ + l_session->cs_timer) { log_it(L_INFO, "Stop consensus timer for net: %s, chain: %s", dap_chain_net_by_id(a_net_id)->pub.name, l_session->chain->name); - dap_timerfd_delete_mt(l_session->cs_timer->worker, l_session->cs_timer->esocket_uuid); - l_session->cs_timer = NULL; + l_session->cs_timer = false; } } } @@ -631,9 +630,9 @@ void dap_chain_esbocs_start_timer(dap_chain_net_id_t a_net_id) { dap_chain_esbocs_session_t *l_session; DL_FOREACH(s_session_items, l_session) { - if (l_session->chain->net_id.uint64 == a_net_id.uint64){ + if (l_session->chain->net_id.uint64 == a_net_id.uint64) { log_it(L_INFO, "Start consensus timer for net: %s, chain: %s", dap_chain_net_by_id(a_net_id)->pub.name, l_session->chain->name); - l_session->cs_timer = dap_timerfd_start(1000, s_session_timer, l_session); + l_session->cs_timer = true; } } } @@ -707,7 +706,6 @@ static void s_callback_delete(dap_chain_cs_blocks_t *a_blocks) } pthread_mutex_lock(&l_session->mutex); DL_DELETE(s_session_items, l_session); - dap_timerfd_delete_mt(l_session->cs_timer->worker, l_session->cs_timer->esocket_uuid); s_session_round_clear(l_session); dap_chain_esbocs_sync_item_t *l_sync_item, *l_sync_tmp; HASH_ITER(hh, l_session->sync_items, l_sync_item, l_sync_tmp) { @@ -1326,119 +1324,115 @@ static void s_session_state_change(dap_chain_esbocs_session_t *a_session, enum s } } -static void s_session_proc_state(dap_chain_esbocs_session_t *a_session) +static void s_session_proc_state(void *a_arg) { - if (pthread_mutex_trylock(&a_session->mutex) != 0) + dap_chain_esbocs_session_t *l_session = a_arg; + if (!l_session->cs_timer) + return; // Timer is inactive + if (pthread_mutex_trylock(&l_session->mutex) != 0) return; // Session is busy - bool l_cs_debug = PVT(a_session->esbocs)->debug; + bool l_cs_debug = PVT(l_session->esbocs)->debug; dap_time_t l_time = dap_time_now(); - switch (a_session->state) { + switch (l_session->state) { case DAP_CHAIN_ESBOCS_SESSION_STATE_WAIT_START: { - a_session->listen_ensure = 1; - bool l_round_skip = PVT(a_session->esbocs)->emergency_mode ? - false : !s_validator_check(&a_session->my_signing_addr, a_session->cur_round.validators_list); - if (a_session->ts_round_sync_start && l_time - a_session->ts_round_sync_start >= - (dap_time_t)PVT(a_session->esbocs)->round_start_sync_timeout + - (a_session->sync_failed ? s_get_round_skip_timeout(a_session) : 0)) { - if (a_session->cur_round.attempt_num > PVT(a_session->esbocs)->round_attempts_max ) { - debug_if(PVT(a_session->esbocs)->debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U"." + l_session->listen_ensure = 1; + bool l_round_skip = PVT(l_session->esbocs)->emergency_mode ? + false : !s_validator_check(&l_session->my_signing_addr, l_session->cur_round.validators_list); + if (l_session->ts_round_sync_start && l_time - l_session->ts_round_sync_start >= + (dap_time_t)PVT(l_session->esbocs)->round_start_sync_timeout + + (l_session->sync_failed ? s_get_round_skip_timeout(l_session) : 0)) { + if (l_session->cur_round.attempt_num > PVT(l_session->esbocs)->round_attempts_max ) { + debug_if(PVT(l_session->esbocs)->debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U"." " Round finished by reason: attempts is out", - a_session->chain->net_name, a_session->chain->name, - a_session->cur_round.id); + l_session->chain->net_name, l_session->chain->name, + l_session->cur_round.id); dap_proc_thread_t *l_thread = DAP_PROC_THREAD(dap_context_current()); - dap_proc_thread_callback_add(l_thread, s_session_round_new, a_session); + dap_proc_thread_callback_add(l_thread, s_session_round_new, l_session); break; } - uint16_t l_min_validators_synced = PVT(a_session->esbocs)->emergency_mode ? - a_session->cur_round.total_validators_synced : a_session->cur_round.validators_synced_count; - if (l_min_validators_synced >= PVT(a_session->esbocs)->min_validators_count && !l_round_skip) { - a_session->cur_round.id = s_session_calc_current_round_id(a_session); + uint16_t l_min_validators_synced = PVT(l_session->esbocs)->emergency_mode ? + l_session->cur_round.total_validators_synced : l_session->cur_round.validators_synced_count; + if (l_min_validators_synced >= PVT(l_session->esbocs)->min_validators_count && !l_round_skip) { + l_session->cur_round.id = s_session_calc_current_round_id(l_session); debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu." " Minimum count of validators are synchronized, wait to submit candidate", - a_session->chain->net_name, a_session->chain->name, - a_session->cur_round.id, a_session->cur_round.attempt_num); - s_session_state_change(a_session, DAP_CHAIN_ESBOCS_SESSION_STATE_WAIT_PROC, l_time); + l_session->chain->net_name, l_session->chain->name, + l_session->cur_round.id, l_session->cur_round.attempt_num); + s_session_state_change(l_session, DAP_CHAIN_ESBOCS_SESSION_STATE_WAIT_PROC, l_time); } else { // timeout start sync debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu." " Round finished by reason: %s", - a_session->chain->net_name, a_session->chain->name, - a_session->cur_round.id, a_session->cur_round.attempt_num, + l_session->chain->net_name, l_session->chain->name, + l_session->cur_round.id, l_session->cur_round.attempt_num, l_round_skip ? "skipped" : "can't synchronize minimum number of validators"); - a_session->sync_failed = true; + l_session->sync_failed = true; dap_proc_thread_t *l_thread = DAP_PROC_THREAD(dap_context_current()); - dap_proc_thread_callback_add(l_thread, s_session_round_new, a_session); + dap_proc_thread_callback_add(l_thread, s_session_round_new, l_session); } } } break; case DAP_CHAIN_ESBOCS_SESSION_STATE_WAIT_PROC: - if (l_time - a_session->ts_stage_entry >= PVT(a_session->esbocs)->round_attempt_timeout * a_session->listen_ensure) { - a_session->listen_ensure += 2; + if (l_time - l_session->ts_stage_entry >= PVT(l_session->esbocs)->round_attempt_timeout * l_session->listen_ensure) { + l_session->listen_ensure += 2; debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu." " Attempt finished by reason: haven't cantidate submitted", - a_session->chain->net_name, a_session->chain->name, - a_session->cur_round.id, a_session->cur_round.attempt_num); - s_session_attempt_new(a_session); + l_session->chain->net_name, l_session->chain->name, + l_session->cur_round.id, l_session->cur_round.attempt_num); + s_session_attempt_new(l_session); } break; case DAP_CHAIN_ESBOCS_SESSION_STATE_WAIT_SIGNS: - if (l_time - a_session->ts_stage_entry >= PVT(a_session->esbocs)->round_attempt_timeout) { + if (l_time - l_session->ts_stage_entry >= PVT(l_session->esbocs)->round_attempt_timeout) { dap_chain_esbocs_store_t *l_store; - HASH_FIND(hh, a_session->cur_round.store_items, &a_session->cur_round.attempt_candidate_hash, sizeof(dap_hash_fast_t), l_store); + HASH_FIND(hh, l_session->cur_round.store_items, &l_session->cur_round.attempt_candidate_hash, sizeof(dap_hash_fast_t), l_store); if (!l_store) { log_it(L_ERROR, "No round candidate found!"); - s_session_attempt_new(a_session); + s_session_attempt_new(l_session); break; } - if (dap_list_length(l_store->candidate_signs) >= PVT(a_session->esbocs)->min_validators_count) { + if (dap_list_length(l_store->candidate_signs) >= PVT(l_session->esbocs)->min_validators_count) { if(l_cs_debug) { - const char *l_candidate_hash_str = dap_chain_hash_fast_to_str_static(&a_session->cur_round.attempt_candidate_hash); + const char *l_candidate_hash_str = dap_chain_hash_fast_to_str_static(&l_session->cur_round.attempt_candidate_hash); log_it(L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu" " Candidate %s collected sings of minimum number of validators, so to sent PRE_COMMIT", - a_session->chain->net_name, a_session->chain->name, a_session->cur_round.id, - a_session->cur_round.attempt_num, l_candidate_hash_str); + l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id, + l_session->cur_round.attempt_num, l_candidate_hash_str); } - s_session_state_change(a_session, DAP_CHAIN_ESBOCS_SESSION_STATE_WAIT_FINISH, l_time); + s_session_state_change(l_session, DAP_CHAIN_ESBOCS_SESSION_STATE_WAIT_FINISH, l_time); break; } debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu." " Attempt finished by reason: cant't collect minimum number of validator's signs", - a_session->chain->net_name, a_session->chain->name, - a_session->cur_round.id, a_session->cur_round.attempt_num); - s_session_attempt_new(a_session); + l_session->chain->net_name, l_session->chain->name, + l_session->cur_round.id, l_session->cur_round.attempt_num); + s_session_attempt_new(l_session); } break; case DAP_CHAIN_ESBOCS_SESSION_STATE_WAIT_FINISH: - if (l_time - a_session->ts_stage_entry >= PVT(a_session->esbocs)->round_attempt_timeout * 2) { + if (l_time - l_session->ts_stage_entry >= PVT(l_session->esbocs)->round_attempt_timeout * 2) { debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu." " Attempt finished by reason: cant't collect minimum number of validator's precommits with same final hash", - a_session->chain->net_name, a_session->chain->name, - a_session->cur_round.id, a_session->cur_round.attempt_num); - s_session_attempt_new(a_session); + l_session->chain->net_name, l_session->chain->name, + l_session->cur_round.id, l_session->cur_round.attempt_num); + s_session_attempt_new(l_session); } break; case DAP_CHAIN_ESBOCS_SESSION_STATE_WAIT_VOTING: - if (l_time - a_session->ts_stage_entry >= PVT(a_session->esbocs)->round_attempt_timeout * 2) { - const char *l_hash_str = dap_chain_hash_fast_to_str_static(&a_session->cur_round.directive_hash); + if (l_time - l_session->ts_stage_entry >= PVT(l_session->esbocs)->round_attempt_timeout * 2) { + const char *l_hash_str = dap_chain_hash_fast_to_str_static(&l_session->cur_round.directive_hash); debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu." " Voting finished by reason: cant't collect minimum number of validator's votes for directive %s", - a_session->chain->net_name, a_session->chain->name, - a_session->cur_round.id, a_session->cur_round.attempt_num, + l_session->chain->net_name, l_session->chain->name, + l_session->cur_round.id, l_session->cur_round.attempt_num, l_hash_str); - s_session_state_change(a_session, DAP_CHAIN_ESBOCS_SESSION_STATE_PREVIOUS, l_time); + s_session_state_change(l_session, DAP_CHAIN_ESBOCS_SESSION_STATE_PREVIOUS, l_time); } break; default: break; } - pthread_mutex_unlock(&a_session->mutex); -} - -static bool s_session_timer(void *a_arg) -{ - dap_chain_esbocs_session_t *l_session = a_arg; - s_session_proc_state(l_session); - return true; + pthread_mutex_unlock(&l_session->mutex); } static void s_message_chain_add(dap_chain_esbocs_session_t *a_session, diff --git a/modules/consensus/esbocs/include/dap_chain_cs_esbocs.h b/modules/consensus/esbocs/include/dap_chain_cs_esbocs.h index 6b2df2be82ec8da3a67bde9187717b4f06d3b2c8..15d7578f4956c74464aa9fd9ee7b9e97444816e7 100644 --- a/modules/consensus/esbocs/include/dap_chain_cs_esbocs.h +++ b/modules/consensus/esbocs/include/dap_chain_cs_esbocs.h @@ -182,7 +182,7 @@ typedef struct dap_chain_esbocs_penalty_item { typedef struct dap_chain_esbocs_session { pthread_mutex_t mutex; - dap_timerfd_t *cs_timer; + bool cs_timer; dap_chain_block_t *processing_candidate; dap_chain_t *chain;