From c44e90d6b11d9a47c01c0c6a3034198f994b2818 Mon Sep 17 00:00:00 2001 From: cellframe <roman.khlopkov@demlabs.net> Date: Fri, 16 Dec 2022 20:07:23 +0300 Subject: [PATCH] [*] TON structure fixes --- .../block-ton/dap_chain_cs_block_ton.c | 247 +++++++++--------- .../include/dap_chain_cs_block_ton.h | 1 + 2 files changed, 123 insertions(+), 125 deletions(-) diff --git a/modules/consensus/block-ton/dap_chain_cs_block_ton.c b/modules/consensus/block-ton/dap_chain_cs_block_ton.c index 202342b72a..c5193bd9e3 100644 --- a/modules/consensus/block-ton/dap_chain_cs_block_ton.c +++ b/modules/consensus/block-ton/dap_chain_cs_block_ton.c @@ -29,10 +29,8 @@ static void s_message_send(dap_chain_cs_block_ton_session_t *a_session, uint8_t static void s_message_chain_add(dap_chain_cs_block_ton_session_t * a_session, dap_chain_node_addr_t * a_sender_node_addr, dap_chain_cs_block_ton_message_t * a_message, size_t a_message_size, dap_chain_hash_fast_t *a_message_hash); -static void s_session_round_start(dap_chain_cs_block_ton_session_t *a_session); +static void s_session_round_clear(dap_chain_cs_block_ton_session_t *a_session); static bool s_session_send_startsync(dap_chain_cs_block_ton_session_t *a_session); -static bool s_session_round_start_callback_load_session_store(dap_global_db_context_t * a_global_db_context,int a_rc, const char * a_group, const char * a_key, const size_t a_values_total, const size_t a_values_shift, - const size_t a_values_count, dap_global_db_obj_t * a_values, void * a_arg); static void s_session_my_candidate_delete(dap_chain_cs_block_ton_session_t *a_session); static void s_session_round_finish(dap_chain_cs_block_ton_session_t *a_session, bool a_failed); static dap_chain_node_addr_t *s_session_get_validator( @@ -314,6 +312,7 @@ static int s_callback_created(dap_chain_t *a_chain, dap_config_t *a_chain_net_cf l_session->my_addr->uint64 = dap_chain_net_get_cur_addr_int(l_net); l_session->cur_round.id.uint64 = 1000; + pthread_rwlock_init(&l_session->cur_round.messages_rwlock, NULL); l_session->gdb_group_store = dap_strdup_printf("local.ton.%s.%s.store", a_chain->net_name, a_chain->name); l_session->gdb_group_message = dap_strdup_printf("local.ton.%s.%s.message", @@ -347,20 +346,6 @@ static int s_callback_created(dap_chain_t *a_chain, dap_config_t *a_chain_net_cf return 0; } -/** - * @brief s_session_round_start - * @param a_session - */ -static void s_session_round_start(dap_chain_cs_block_ton_session_t *a_session) -{ - memset(&a_session->cur_round, 0, sizeof(a_session->cur_round)); - a_session->ts_round_start = 0; - a_session->ts_round_state_commit = 0; - a_session->attempt_current_number = 1; - a_session->ts_round_sync_start = dap_time_now(); - a_session->cur_round.id.uint64++; -} - /** * @brief s_session_send_startsync * @param a_session @@ -486,8 +471,13 @@ static void s_session_proc_state( dap_chain_cs_block_ton_session_t * a_session) && (l_time-a_session->ts_round_finish) >= PVT(a_session->ton)->session_idle_min) { // round start + s_session_round_clear(a_session); + a_session->cur_round.id.uint64++; a_session->state = DAP_STREAM_CH_CHAIN_SESSION_STATE_WAIT_START; - s_session_round_start(a_session); + a_session->ts_round_start = 0; + a_session->ts_round_state_commit = 0; + a_session->attempt_current_number = 1; + a_session->ts_round_sync_start = dap_time_now(); dap_chain_node_mempool_process_all(a_session->chain); dap_list_free_full(a_session->cur_round.validators_list, free); a_session->cur_round.validators_list = s_get_validators_addr_list(a_session); @@ -502,7 +492,7 @@ static void s_session_proc_state( dap_chain_cs_block_ton_session_t * a_session) break; } case DAP_STREAM_CH_CHAIN_SESSION_STATE_WAIT_START: { - if ( (l_time-a_session->ts_round_sync_start) < PVT(a_session->ton)->round_start_sync_timeout ) + if ( (l_time - a_session->ts_round_sync_start) < PVT(a_session->ton)->round_start_sync_timeout ) break; // timeout start sync uint16_t l_startsync_count = a_session->cur_round.validators_start_count; if (l_startsync_count * 3 >= a_session->cur_round.validators_count * 2) { @@ -652,6 +642,7 @@ static void s_session_candidate_to_chain( dap_list_t *l_commitsign_list = NULL; dap_chain_hash_fast_t *l_candidate_hash = NULL; dap_chain_cs_block_ton_message_item_t *l_message_item=NULL, *l_message_tmp=NULL; + pthread_rwlock_rdlock(&a_session->cur_round.messages_rwlock); HASH_ITER(hh, a_session->cur_round.messages_items, l_message_item, l_message_tmp) { uint8_t l_message_type = l_message_item->message->hdr.type; if ( l_message_type == DAP_STREAM_CH_CHAIN_MESSAGE_TYPE_COMMIT_SIGN ) { @@ -662,6 +653,7 @@ static void s_session_candidate_to_chain( } } } + pthread_rwlock_unlock(&a_session->cur_round.messages_rwlock); if (!l_commitsign_list) { return; } @@ -806,11 +798,14 @@ static void s_session_round_clear(dap_chain_cs_block_ton_session_t *a_session) // Truncate this group dap_global_db_del(a_session->gdb_group_store, NULL, NULL, NULL); dap_chain_cs_block_ton_message_item_t *l_message_item=NULL, *l_message_tmp=NULL; + pthread_rwlock_wrlock(&a_session->cur_round.messages_rwlock); HASH_ITER(hh, a_session->cur_round.messages_items, l_message_item, l_message_tmp) { HASH_DEL(a_session->cur_round.messages_items, l_message_item); DAP_DELETE(l_message_item->message); DAP_DELETE(l_message_item); } + pthread_rwlock_unlock(&a_session->cur_round.messages_rwlock); + pthread_rwlock_destroy(&a_session->cur_round.messages_rwlock); if (a_session->cur_round.validators_start) // delete only links @@ -821,6 +816,8 @@ static void s_session_round_clear(dap_chain_cs_block_ton_session_t *a_session) dap_chain_cs_block_ton_round_id_t l_round_id = a_session->cur_round.id; a_session->cur_round = (dap_chain_cs_block_ton_round_t){.id = l_round_id}; + pthread_rwlock_init(&a_session->cur_round.messages_rwlock, NULL); + log_it(L_MSG, "All data for round %"DAP_UINT64_FORMAT_U" cleared", l_round_id.uint64); } /** @@ -1109,23 +1106,20 @@ static void s_callback_get_candidate_block_and_commit_sign(dap_global_db_context // check candidate hash sign if ( (l_sign_verified=dap_sign_verify(l_candidate_sign, l_candidate, l_offset+sizeof(l_candidate->hdr))) == 1 ) { - l_message->hdr.is_verified = true; l_store->hdr.sign_collected = true; + s_message_chain_add(l_session, &l_sender_node_addr, l_message, l_message_size, NULL); if (dap_global_db_set_unsafe(a_global_db_context, l_session->gdb_group_store, l_candidate_hash_str, l_store, l_store_size, true) == 0 ) { uint16_t l_commitsign_count = s_session_message_count( l_session, DAP_STREAM_CH_CHAIN_MESSAGE_TYPE_COMMIT_SIGN, &l_candidate_hash, NULL); - l_commitsign_count++; if (l_commitsign_count * 3 >= l_round->validators_count * 2) { if (PVT(l_session->ton)->debug) log_it(L_MSG, "TON: net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu Candidate:%s collected COMMIT_SIGN more than 2/3 of the validators, so finish this round", l_session->chain->net_name, l_session->chain->name, l_round->id.uint64, l_session->attempt_current_number, l_candidate_hash_str); - if (l_round_id.uint64 == l_session->cur_round.id.uint64) { - s_message_chain_add(l_session, &l_sender_node_addr, l_message, l_message_size, NULL); + if (l_round_id.uint64 == l_session->cur_round.id.uint64) s_session_round_finish(l_session, false); - } } } } else @@ -1456,9 +1450,8 @@ static void s_callback_check_and_save_candidate_block (dap_global_db_context_t static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_node_addr, dap_chain_hash_fast_t *a_data_hash, uint8_t *a_data, size_t a_data_size) { - dap_chain_cs_block_ton_session_t *l_session = (dap_chain_cs_block_ton_session_t *)a_arg; - dap_chain_cs_block_ton_message_t *l_message = - (dap_chain_cs_block_ton_message_t *)DAP_DUP_SIZE(a_data, a_data_size); + dap_chain_cs_block_ton_session_t *l_session = a_arg; + dap_chain_cs_block_ton_message_t *l_message = (dap_chain_cs_block_ton_message_t *)a_data; if (PVT(l_session->ton)->debug) log_it(L_MSG, "TON: net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu Receive pkt type:%x from addr:"NODE_ADDR_FP_STR", my_addr:"NODE_ADDR_FP_STR"", @@ -1469,7 +1462,7 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod if( sizeof(*l_message)+l_message->hdr.sign_size > a_data_size){ log_it(L_WARNING, "TON: incorrect message size in header is %zu when data size is only %zu and header size is %zu", l_message->hdr.sign_size, a_data_size, sizeof(*l_message)); - goto handler_finish; + return; } size_t l_message_data_size = a_data_size - sizeof(*l_message) - l_message->hdr.sign_size ; byte_t * l_message_data = l_message->sign_n_message + l_message->hdr.sign_size; @@ -1486,32 +1479,29 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod break; } } - //DAP_DELETE(l_sign); DAP_DELETE(l_data); if (!l_verify_passed) { if (PVT(l_session->ton)->debug) log_it(L_MSG, "TON: net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu Message rejected from addr:"NODE_ADDR_FP_STR" not passed verification", l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id.uint64, l_session->attempt_current_number, NODE_ADDR_FP_ARGS(a_sender_node_addr)); - goto handler_finish; + return; } } - if (l_message->hdr.chain_id.uint64 != l_session->chain->id.uint64 ) { - goto handler_finish; - } + if (l_message->hdr.chain_id.uint64 != l_session->chain->id.uint64 ) + return; dap_time_t l_time = dap_time_now(); - l_message->hdr.is_verified=false; dap_chain_hash_fast_t l_data_hash = {}; dap_hash_fast(a_data, a_data_size, &l_data_hash); - if (memcmp(a_data_hash, &l_data_hash, sizeof(dap_chain_hash_fast_t)) != 0) { + if (!dap_hash_fast_compare(a_data_hash, &l_data_hash)) { if (PVT(l_session->ton)->debug) log_it(L_MSG, "TON: net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu Message rejected: message hash does not match", l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id.uint64, l_session->attempt_current_number); - goto handler_finish; + return; } // consensus round start sync @@ -1534,7 +1524,7 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod log_it(L_MSG, "TON: net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu Message rejected: validator addr:"NODE_ADDR_FP_STR" not in the list.", l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id.uint64, l_session->attempt_current_number, NODE_ADDR_FP_ARGS(a_sender_node_addr)); - goto handler_finish; + return; } if ( @@ -1547,7 +1537,7 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod log_it(L_MSG, "TON: net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu Message rejected: too much time difference: my time:%"DAP_UINT64_FORMAT_U" sender time:%"DAP_UINT64_FORMAT_U"", l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id.uint64, l_session->attempt_current_number, l_time, l_startsync->ts); - goto handler_finish; + return; } // add check&save sender addr @@ -1559,7 +1549,7 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod log_it(L_MSG, "TON: net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu Message rejected: repeated sync message from addr:"NODE_ADDR_FP_STR"", l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id.uint64, l_session->attempt_current_number, NODE_ADDR_FP_ARGS(a_sender_node_addr)); - goto handler_finish; + return; } l_list_temp = l_list_next; } @@ -1571,10 +1561,8 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod l_session->cur_round.validators_start = dap_list_append(l_session->cur_round.validators_start, l_validator); l_session->cur_round.validators_start_count = dap_list_length(l_session->cur_round.validators_start); - // if ( l_session->ts_round_start_pub < l_startsync->ts ) - // l_session->ts_round_start_pub = l_startsync->ts; - // l_session->ts_round_start = (dap_chain_time_t)time(NULL); // l_startsync->ts; // set max time of start consensus - goto handler_finish; + s_session_send_startsync(l_session); + return; } // validator check @@ -1589,21 +1577,21 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod log_it(L_MSG, "TON: net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu Message rejected: validator addr:"NODE_ADDR_FP_STR" not in the list.", l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id.uint64, l_session->attempt_current_number, NODE_ADDR_FP_ARGS(a_sender_node_addr)); - goto handler_finish; + return; } // round check if ( l_message->hdr.type != DAP_STREAM_CH_CHAIN_MESSAGE_TYPE_COMMIT_SIGN ) { if ( l_session->state != DAP_STREAM_CH_CHAIN_SESSION_STATE_CS_PROC && l_session->state != DAP_STREAM_CH_CHAIN_SESSION_STATE_WAIT_SIGNS ) { - goto handler_finish; + return; } if ( l_round_id != l_session->cur_round.id.uint64) { if (PVT(l_session->ton)->debug) log_it(L_MSG, "TON: net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu Message rejected: round in message does not match to current round", l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id.uint64, l_session->attempt_current_number); - goto handler_finish; + return; } } else { if (l_round_id != l_session->cur_round.id.uint64) { @@ -1611,7 +1599,7 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod log_it(L_MSG, "TON: net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu Message rejected: round in message does not match to current round", l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id.uint64, l_session->attempt_current_number); - goto handler_finish; + return; } } @@ -1624,22 +1612,24 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod log_it(L_MSG, "TON: net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu Message rejected: message type:%x allowed only in first attempt", l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id.uint64, l_session->attempt_current_number, l_message->hdr.type); - goto handler_finish; + return; } } } - dap_chain_cs_block_ton_message_item_t *l_messages_items = l_session->cur_round.messages_items; + dap_chain_cs_block_ton_round_t *l_round = &l_session->cur_round; // check hash message dup dap_chain_cs_block_ton_message_item_t *l_message_item_temp = NULL; - HASH_FIND(hh, l_messages_items, a_data_hash, sizeof(dap_chain_hash_fast_t), l_message_item_temp); + pthread_rwlock_rdlock(&l_round->messages_rwlock); + HASH_FIND(hh, l_round->messages_items, a_data_hash, sizeof(dap_chain_hash_fast_t), l_message_item_temp); + pthread_rwlock_unlock(&l_round->messages_rwlock); if (l_message_item_temp) { if (PVT(l_session->ton)->debug) log_it(L_MSG, "TON: net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu Message rejected: message hash is exists in chain (duplicate?)", l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id.uint64, l_session->attempt_current_number); - goto handler_finish; + return; } // check validator index in queue for event Submit @@ -1658,18 +1648,20 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod if ( l_validator_number ) { // pass if I first validator int l_submit_count = 0; dap_chain_cs_block_ton_message_item_t *l_chain_message=NULL, *l_chain_message_tmp=NULL; - HASH_ITER(hh, l_messages_items, l_chain_message, l_chain_message_tmp) { + pthread_rwlock_rdlock(&l_round->messages_rwlock); + HASH_ITER(hh, l_round->messages_items, l_chain_message, l_chain_message_tmp) { if ( l_chain_message->message->hdr.type == DAP_STREAM_CH_CHAIN_MESSAGE_TYPE_SUBMIT ) { l_submit_count++; } } + pthread_rwlock_unlock(&l_round->messages_rwlock); if ( l_validator_number < l_submit_count ) { // Skip this SUBMIT. Validator must wait its queue. if (PVT(l_session->ton)->debug) log_it(L_MSG, "TON: net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu Message rejected: Validator must wait its queue for sent SUBMIT", l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id.uint64, l_session->attempt_current_number); - goto handler_finish; + return; } } } @@ -1677,7 +1669,8 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod uint32_t /* l_approve_count = 0, */ l_vote_count = 0, l_precommit_count = 0; // check messages chain dap_chain_cs_block_ton_message_item_t *l_chain_message=NULL, *l_chain_message_tmp=NULL; - HASH_ITER(hh, l_messages_items, l_chain_message, l_chain_message_tmp) { + pthread_rwlock_rdlock(&l_round->messages_rwlock); + HASH_ITER(hh, l_round->messages_items, l_chain_message, l_chain_message_tmp) { if (l_chain_message->message->hdr.sender_node_addr.uint64 == a_sender_node_addr->uint64) { dap_chain_hash_fast_t *l_candidate_hash_cur = &((dap_chain_cs_block_ton_message_getinfo_t *) @@ -1705,7 +1698,7 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod log_it(L_MSG, "TON: net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu Message rejected: duplicate messages APPROVE or REJECT for one candidate", l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id.uint64, l_session->attempt_current_number); - goto handler_finish; + return; } } } break; @@ -1723,7 +1716,7 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod log_it(L_MSG, "TON: net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu Message rejected: duplicate messages VOTE for one candidate for one attempt", l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id.uint64, l_session->attempt_current_number); - goto handler_finish; + return; } } break; // this messages should only appear once per round //attempt @@ -1736,7 +1729,7 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod log_it(L_MSG, "TON: net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu Message rejected: duplicate messages VOTE or PRE_COMMIT for one candidate for one attempt", l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id.uint64, l_session->attempt_current_number); - goto handler_finish; + return; } } } @@ -1756,6 +1749,7 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod } } } + pthread_rwlock_unlock(&l_round->messages_rwlock); // check message chain is correct switch (l_message->hdr.type) { @@ -1770,7 +1764,7 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod log_it(L_MSG, "TON: net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu Message rejected: this validator can't send a PRE_COMMIT because it didn't send a VOTE for this candidate", l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id.uint64, l_session->attempt_current_number); - goto handler_finish; + return; } } break; case DAP_STREAM_CH_CHAIN_MESSAGE_TYPE_COMMIT_SIGN: { @@ -1779,7 +1773,7 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod log_it(L_MSG, "TON: net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu Message rejected: this validator can't send a COMMIT_SIGN because it didn't send a PRE_COMMIT for this candidate", l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id.uint64, l_session->attempt_current_number); - goto handler_finish; + return; } } break; } @@ -1789,34 +1783,34 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod if( sizeof(dap_chain_cs_block_ton_message_submit_t) > l_message_data_size){ log_it(L_WARNING, "Wrong submit message size,have %zu bytes for data section when only a header requires %zu bytes", l_message_data_size,sizeof(dap_chain_cs_block_ton_message_submit_t)); - goto handler_finish; + return; } dap_chain_cs_block_ton_message_submit_t *l_submit = (dap_chain_cs_block_ton_message_submit_t *)l_message_data; size_t l_candidate_size = l_submit->candidate_size; if( l_message_data_size < l_candidate_size + sizeof(*l_submit) ){ log_it(L_WARNING, "Wrong submit message size %zu when maximum is %zu for received message", l_candidate_size, l_message_data_size - sizeof(*l_submit)); - goto handler_finish; + return; } + + dap_chain_hash_fast_t l_candidate_hash; + dap_hash_fast(l_submit->candidate, l_candidate_size, &l_candidate_hash); + + // check candidate hash + if (!dap_hash_fast_compare(&l_submit->candidate_hash, &l_candidate_hash)) + return; + + s_message_chain_add(l_session, a_sender_node_addr, l_message, a_data_size, NULL); if (!l_candidate_size || dap_hash_fast_is_blank(&l_submit->candidate_hash)) { // null candidate - save chain and exit if (PVT(l_session->ton)->debug) log_it(L_MSG, "TON: net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu Receive SUBMIT: candidate: NULL", l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id.uint64, l_session->attempt_current_number); - goto handler_finish_save; + return; } - dap_chain_hash_fast_t l_candidate_hash; - dap_hash_fast(l_submit->candidate, l_candidate_size, &l_candidate_hash); - - // check candidate hash - if (memcmp(&l_submit->candidate_hash, &l_candidate_hash, - sizeof(dap_chain_hash_fast_t)) != 0) { - goto handler_finish; - } dap_chain_block_t *l_candidate = (dap_chain_block_t *) DAP_DUP_SIZE(l_submit->candidate, l_candidate_size); - char *l_candidate_hash_str = dap_chain_hash_fast_to_str_new(&l_candidate_hash); if (PVT(l_session->ton)->debug) log_it(L_MSG, "TON: net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu Receive SUBMIT: candidate:%s, size:%zu", @@ -1830,33 +1824,34 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod l_args->candidate_hash_str = l_candidate_hash_str; l_args->candidate = l_candidate; l_args->candidate_size = l_candidate_size; - memcpy(&l_args->candidate_hash,&l_candidate_hash, sizeof(l_candidate_hash)); + l_args->candidate_hash = l_candidate_hash; l_args->time = l_time; if(dap_global_db_get(l_session->gdb_group_store,l_candidate_hash_str,s_callback_check_and_save_candidate_block, l_args) != 0){ log_it(L_ERROR, "Can't call get request for s_check_block_exist_in_store() callback"); DAP_DELETE(l_candidate_hash_str); DAP_DELETE(l_args); - goto handler_finish; } } break; + case DAP_STREAM_CH_CHAIN_MESSAGE_TYPE_REJECT: { if ( sizeof(dap_chain_cs_block_ton_message_reject_t) >l_message_data_size ){ log_it(L_WARNING, "Wrong reject message size,have %zu bytes for data section when requires %zu bytes", l_message_data_size,sizeof(dap_chain_cs_block_ton_message_reject_t)); - goto handler_finish; + return; } dap_chain_cs_block_ton_message_reject_t *l_reject = (dap_chain_cs_block_ton_message_reject_t *) l_message_data; dap_chain_hash_fast_t *l_candidate_hash = &l_reject->candidate_hash; - pthread_rwlock_rdlock(&l_session->rwlock); + s_message_chain_add(l_session, a_sender_node_addr, l_message, a_data_size, NULL); + pthread_rwlock_rdlock(&l_session->rwlock); if (dap_hash_fast_is_blank(l_candidate_hash)) { if (PVT(l_session->ton)->debug) log_it(L_MSG, "TON: net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu Receive REJECT: NULL", l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id.uint64, l_session->attempt_current_number); pthread_rwlock_unlock(&l_session->rwlock); - goto handler_finish_save; + return; } char *l_candidate_hash_str = dap_chain_hash_fast_to_str_new(l_candidate_hash); @@ -1864,13 +1859,11 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod log_it(L_MSG, "TON: net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu Receive REJECT: candidate:%s", l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id.uint64, l_session->attempt_current_number, l_candidate_hash_str); - pthread_rwlock_unlock(&l_session->rwlock); - pthread_rwlock_wrlock(&l_session->rwlock); + pthread_rwlock_wrlock(&l_session->rwlock); uint16_t l_reject_count = s_session_message_count(l_session, DAP_STREAM_CH_CHAIN_MESSAGE_TYPE_REJECT, l_candidate_hash, NULL); - l_reject_count++; if (l_reject_count * 3 >= l_session->cur_round.validators_count * 2) { dap_global_db_del_sync(l_session->gdb_group_store, l_candidate_hash_str); dap_chain_hash_fast_t l_my_candidate_hash; @@ -1885,11 +1878,12 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod pthread_rwlock_unlock(&l_session->rwlock); DAP_DELETE(l_candidate_hash_str); } break; + case DAP_STREAM_CH_CHAIN_MESSAGE_TYPE_APPROVE: { if ( sizeof(dap_chain_cs_block_ton_message_approve_t) >l_message_data_size ){ log_it(L_WARNING, "Wrong approve message size,have %zu bytes for data section when requires %zu bytes", l_message_data_size,sizeof(dap_chain_cs_block_ton_message_approve_t)); - goto handler_finish; + return; } dap_chain_cs_block_ton_message_approve_t *l_approve = (dap_chain_cs_block_ton_message_approve_t *) l_message_data; dap_sign_t * l_candidate_sign = (dap_sign_t *) l_approve->candidate_hash_sign; @@ -1898,17 +1892,15 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod if (l_candidate_sign_size > l_candidate_hash_sign_size) { log_it(L_WARNING, "Wrong approve message size,have %zu bytes for candidate sign section when requires maximum %zu bytes", l_candidate_sign_size, l_candidate_hash_sign_size); - goto handler_finish; + return; } - dap_chain_hash_fast_t *l_candidate_hash = &l_approve->candidate_hash; - if (dap_hash_fast_is_blank(l_candidate_hash)) { if (PVT(l_session->ton)->debug) log_it(L_MSG, "TON: net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu Receive APPROVE: candidate: NULL", l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id.uint64, l_session->attempt_current_number); - goto handler_finish_save; + return; } char *l_candidate_hash_str = dap_chain_hash_fast_to_str_new(l_candidate_hash); @@ -1922,13 +1914,13 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod // check candidate hash sign if ( (l_sign_verified=dap_sign_verify( (dap_sign_t*)l_approve->candidate_hash_sign, l_candidate_hash, sizeof(dap_chain_hash_fast_t))) == 1 ) { - l_message->hdr.is_verified=true; + + s_message_chain_add(l_session, a_sender_node_addr, l_message, a_data_size, NULL); if ( l_session->attempt_current_number == 1 ) { // if this first attempt then send Vote event uint16_t l_approve_count = s_session_message_count( l_session, DAP_STREAM_CH_CHAIN_MESSAGE_TYPE_APPROVE, l_candidate_hash, NULL); - l_approve_count++; if (l_approve_count * 3 >= l_session->cur_round.validators_count * 2) { if (PVT(l_session->ton)->debug) log_it(L_MSG, "TON: net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U" attempt:%hu Candidate:%s collected approve more than 2/3 of the validators", @@ -1957,11 +1949,12 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod } pthread_rwlock_unlock(&l_session->rwlock); } break; + case DAP_STREAM_CH_CHAIN_MESSAGE_TYPE_VOTE_FOR: { if ( sizeof(dap_chain_cs_block_ton_message_votefor_t) >l_message_data_size ){ log_it(L_WARNING, "Wrong vote_for message size,have %zu bytes for data section when requires %zu bytes", l_message_data_size,sizeof(dap_chain_cs_block_ton_message_votefor_t)); - goto handler_finish; + return; } dap_chain_cs_block_ton_message_votefor_t *l_votefor = (dap_chain_cs_block_ton_message_votefor_t *) l_message_data; @@ -1972,7 +1965,7 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod uint16_t l_attempt_current = l_session->attempt_current_number; if ( l_votefor->attempt_number != l_attempt_current) { pthread_rwlock_unlock(&l_session->rwlock); - goto handler_finish; // wrong attempt number in message + return; // wrong attempt number in message } if (PVT(l_session->ton)->debug) { @@ -1985,7 +1978,7 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod if ( a_sender_node_addr->uint64 != l_session->attempt_coordinator->uint64 ) { pthread_rwlock_unlock(&l_session->rwlock); - goto handler_finish; // wrong coordinator addr + return; // wrong coordinator addr } uint16_t l_votefor_count = s_session_message_count( @@ -1997,9 +1990,11 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id.uint64, l_session->attempt_current_number); pthread_rwlock_unlock(&l_session->rwlock); - goto handler_finish; + return; } + s_message_chain_add(l_session, a_sender_node_addr, l_message, a_data_size, NULL); + struct vote_for_load_store_args * l_args = DAP_NEW_Z(struct vote_for_load_store_args); l_args->session = l_session; l_args->candidate_hash = *l_candidate_hash; @@ -2011,18 +2006,19 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod DAP_DELETE(l_args); } pthread_rwlock_unlock(&l_session->rwlock); - } break; + } return; + case DAP_STREAM_CH_CHAIN_MESSAGE_TYPE_VOTE: { if ( sizeof(dap_chain_cs_block_ton_message_vote_t) >l_message_data_size ){ log_it(L_WARNING, "Wrong vote message size,have %zu bytes for data section when requires %zu bytes", l_message_data_size,sizeof(dap_chain_cs_block_ton_message_vote_t)); - goto handler_finish; + return; } dap_chain_cs_block_ton_message_vote_t *l_vote = (dap_chain_cs_block_ton_message_vote_t *) l_message_data; dap_chain_hash_fast_t *l_candidate_hash = &l_vote->candidate_hash; if ( l_vote->attempt_number != l_session->attempt_current_number) { - goto handler_finish; + return; } if (dap_hash_fast_is_blank(l_candidate_hash)) { @@ -2030,7 +2026,7 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod log_it(L_MSG, "TON: net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu Receive VOTE: candidate: NULL", l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id.uint64, l_session->attempt_current_number); - goto handler_finish_save; + return; } char *l_candidate_hash_str = dap_chain_hash_fast_to_str_new(l_candidate_hash); @@ -2039,13 +2035,15 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id.uint64, l_session->attempt_current_number, l_candidate_hash_str); + s_message_chain_add(l_session, a_sender_node_addr, l_message, a_data_size, NULL); + pthread_rwlock_rdlock(&l_session->rwlock); uint16_t l_attempt_number = l_session->attempt_current_number; uint16_t l_vote_count = s_session_message_count( l_session, DAP_STREAM_CH_CHAIN_MESSAGE_TYPE_VOTE, l_candidate_hash, &l_attempt_number); - l_vote_count++; - if (l_vote_count * 3 >= l_session->cur_round.validators_count * 2){ + + if (l_vote_count * 3 >= l_session->cur_round.validators_count * 2) { struct proc_msg_type_vote * l_args = DAP_NEW_Z(struct proc_msg_type_vote); l_args->session = l_session; l_args->candidate_hash_str = l_candidate_hash_str; @@ -2057,12 +2055,13 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod } } pthread_rwlock_unlock(&l_session->rwlock); - } break; + } return; + case DAP_STREAM_CH_CHAIN_MESSAGE_TYPE_PRE_COMMIT: { if ( sizeof(dap_chain_cs_block_ton_message_precommit_t) >l_message_data_size ){ log_it(L_WARNING, "Wrong pre_commit message size,have %zu bytes for data section when requires %zu bytes", l_message_data_size,sizeof(dap_chain_cs_block_ton_message_precommit_t)); - goto handler_finish; + return; } dap_chain_cs_block_ton_message_precommit_t *l_precommit = (dap_chain_cs_block_ton_message_precommit_t *) l_message_data; @@ -2072,7 +2071,7 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod if ( l_precommit->attempt_number != l_session->attempt_current_number) { pthread_rwlock_unlock(&l_session->rwlock); - goto handler_finish; + return; } if (dap_hash_fast_is_blank(l_candidate_hash)) { if (PVT(l_session->ton)->debug) @@ -2080,9 +2079,11 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id.uint64, l_session->attempt_current_number); pthread_rwlock_unlock(&l_session->rwlock); - goto handler_finish_save; + return; } + s_message_chain_add(l_session, a_sender_node_addr, l_message, a_data_size, NULL); + char *l_candidate_hash_str = dap_chain_hash_fast_to_str_new(l_candidate_hash); if (PVT(l_session->ton)->debug) log_it(L_MSG, "TON: net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu Receive PRE_COMMIT: candidate:%s", @@ -2092,7 +2093,7 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod uint16_t l_attempt_number = l_session->attempt_current_number; uint16_t l_precommit_count = s_session_message_count(l_session, DAP_STREAM_CH_CHAIN_MESSAGE_TYPE_PRE_COMMIT, l_candidate_hash, &l_attempt_number); - l_precommit_count++; + if ( 3* l_precommit_count >= 2*l_session->cur_round.validators_count ) { struct proc_msg_type_pre_commit * l_args = DAP_NEW_Z(struct proc_msg_type_pre_commit); l_args->session = l_session; @@ -2105,12 +2106,13 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod } } pthread_rwlock_unlock(&l_session->rwlock); - } break; + } return; + case DAP_STREAM_CH_CHAIN_MESSAGE_TYPE_COMMIT_SIGN: { if ( sizeof(dap_chain_cs_block_ton_message_commitsign_t) >l_message_data_size ){ log_it(L_WARNING, "Wrong commit_sign message size,have %zu bytes for data section when requires %zu bytes", l_message_data_size,sizeof(dap_chain_cs_block_ton_message_commitsign_t)); - goto handler_finish; + return; } dap_chain_cs_block_ton_message_commitsign_t *l_commitsign = (dap_chain_cs_block_ton_message_commitsign_t *) l_message_data; @@ -2122,7 +2124,7 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod if (l_candidate_sign_size > l_message_candidate_sign_size_max ){ log_it(L_WARNING, "Wrong commit_sign message size,have %zu bytes for candidate sign section when requires maximum %zu bytes", l_candidate_sign_size, l_message_candidate_sign_size_max); - goto handler_finish; + return; } pthread_rwlock_rdlock(&l_session->rwlock); @@ -2134,7 +2136,7 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod l_round->id.uint64, l_session->attempt_current_number); pthread_rwlock_unlock(&l_session->rwlock); - goto handler_finish_save; + return; } char *l_candidate_hash_str = dap_chain_hash_fast_to_str_new(l_candidate_hash); @@ -2160,21 +2162,11 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod DAP_DELETE(l_args); } // Message we'll proc in callback later pthread_rwlock_unlock(&l_session->rwlock); - - } break; + return; + } default: break; } - -handler_finish_save: - if (l_session->state != DAP_STREAM_CH_CHAIN_SESSION_STATE_IDLE) { - pthread_rwlock_wrlock(&l_session->rwlock); - s_message_chain_add(l_session, a_sender_node_addr, l_message, a_data_size, NULL); - pthread_rwlock_unlock(&l_session->rwlock); - } - return; -handler_finish: - DAP_DELETE(l_message); } static uint8_t *s_message_data_sign(dap_chain_cs_block_ton_session_t *a_session, @@ -2242,21 +2234,26 @@ static void s_message_chain_add(dap_chain_cs_block_ton_session_t *a_session, dap dap_chain_cs_block_ton_message_t *a_message, size_t a_message_size, dap_chain_hash_fast_t *a_message_hash) { - dap_chain_cs_block_ton_message_t *l_message = a_message; + if (a_session->state == DAP_STREAM_CH_CHAIN_SESSION_STATE_IDLE) + return; dap_chain_cs_block_ton_round_t *l_round = &a_session->cur_round; - l_message->hdr.is_genesis = dap_hash_fast_is_blank(&l_round->last_message_hash); - if (!l_message->hdr.is_genesis) { - l_message->hdr.prev_message_hash = l_round->last_message_hash; + a_message->hdr.is_genesis = dap_hash_fast_is_blank(&l_round->last_message_hash); + if (!a_message->hdr.is_genesis) { + a_message->hdr.prev_message_hash = l_round->last_message_hash; } + dap_chain_cs_block_ton_message_item_t *l_message_item = DAP_NEW_Z(dap_chain_cs_block_ton_message_item_t); dap_chain_hash_fast_t l_message_hash; dap_hash_fast(a_message, a_message_size, &l_message_hash); - - dap_chain_cs_block_ton_message_item_t *l_message_item = DAP_NEW_Z(dap_chain_cs_block_ton_message_item_t); - l_message_item->message = l_message; - l_round->last_message_hash = l_message_item->message_hash = l_message_hash; + l_message_item->message = DAP_DUP_SIZE(a_message, a_message_size); + if (a_message->hdr.type == DAP_STREAM_CH_CHAIN_MESSAGE_TYPE_VOTE_FOR || + a_message->hdr.type == DAP_STREAM_CH_CHAIN_MESSAGE_TYPE_VOTE) + l_message_item->message->hdr.is_verified = true; + + pthread_rwlock_wrlock(&l_round->messages_rwlock); HASH_ADD(hh, l_round->messages_items, message_hash, sizeof(l_message_item->message_hash), l_message_item); + pthread_rwlock_unlock(&l_round->messages_rwlock); l_round->messages_count++; if (a_message_hash) diff --git a/modules/consensus/block-ton/include/dap_chain_cs_block_ton.h b/modules/consensus/block-ton/include/dap_chain_cs_block_ton.h index f782a24e34..562b4f662d 100644 --- a/modules/consensus/block-ton/include/dap_chain_cs_block_ton.h +++ b/modules/consensus/block-ton/include/dap_chain_cs_block_ton.h @@ -48,6 +48,7 @@ typedef struct dap_chain_cs_block_ton_round { uint16_t validators_start_count; dap_chain_hash_fast_t last_message_hash; dap_chain_cs_block_ton_message_item_t *messages_items; + pthread_rwlock_t messages_rwlock; bool submit; uint16_t messages_count; dap_chain_hash_fast_t my_candidate_hash; -- GitLab