diff --git a/modules/consensus/esbocs/dap_chain_cs_esbocs.c b/modules/consensus/esbocs/dap_chain_cs_esbocs.c index 3b1a31eda3cce85e2839e2c71216540e0d8730ce..a8ac4aa6c9ae2479e3d267eb56da5a49a1bf2f16 100644 --- a/modules/consensus/esbocs/dap_chain_cs_esbocs.c +++ b/modules/consensus/esbocs/dap_chain_cs_esbocs.c @@ -98,6 +98,11 @@ DAP_STATIC_INLINE char *s_get_penalty_group(dap_chain_net_id_t a_net_id) return dap_strdup_printf(DAP_CHAIN_ESBOCS_GDB_GROUPS_PREFIX".%s.penalty", l_net->pub.gdb_groups_prefix); } +DAP_STATIC_INLINE size_t s_get_esbocs_message_size(dap_chain_esbocs_message_t *a_message) +{ + return sizeof(*a_message) + a_message->hdr.sign_size + a_message->hdr.message_size; +} + static dap_chain_esbocs_session_t * s_session_items; static dap_timerfd_t *s_session_cs_timer = NULL; @@ -696,7 +701,7 @@ static void s_session_round_new(dap_chain_esbocs_session_t *a_session) for (dap_list_t *it = l_item->messages; it; it = it->next) { dap_hash_fast_t l_msg_hash; dap_chain_esbocs_message_t *l_msg = it->data; - size_t l_msg_size = sizeof(*l_msg) + l_msg->hdr.sign_size + l_msg->hdr.message_size; + size_t l_msg_size = s_get_esbocs_message_size(l_msg); dap_hash_fast(l_msg, l_msg_size, &l_msg_hash); s_session_packet_in(a_session, NULL, NULL, &l_msg_hash, (uint8_t *)l_msg, l_msg_size); } @@ -831,6 +836,8 @@ dap_chain_esbocs_directive_t *s_session_directive_ready(dap_chain_esbocs_session } if (!l_item) return NULL; + debug_if(PVT(a_session->esbocs)->debug, L_MSG, "Current consensus online %hu from %zu is acceptable, so issue the directive", + a_session->cur_round.total_validators_synced, l_list_length); uint32_t l_directive_size = s_directive_calc_size(l_kick ? DAP_CHAIN_ESBOCS_DIRECTIVE_KICK : DAP_CHAIN_ESBOCS_DIRECTIVE_LIFT); dap_chain_esbocs_directive_t *l_ret = DAP_NEW_Z_SIZE(dap_chain_esbocs_directive_t, l_directive_size); l_ret->version = DAP_CHAIN_ESBOCS_DIRECTIVE_VERSION; @@ -888,8 +895,7 @@ static void s_session_state_change(dap_chain_esbocs_session_t *a_session, enum s } else s_session_candidate_submit(a_session); } else { - dap_chain_esbocs_message_item_t *l_item, *l_tmp; - HASH_ITER(hh, a_session->cur_round.message_items, l_item, l_tmp) { + for (dap_chain_esbocs_message_item_t *l_item = a_session->cur_round.message_items; l_item; l_item = l_item->hh.next) { if (l_item->message->hdr.type == DAP_CHAIN_ESBOCS_MSG_TYPE_SUBMIT && dap_chain_addr_compare(&l_item->signing_addr, &a_session->cur_round.attempt_submit_validator)) { dap_hash_fast_t *l_candidate_hash = &l_item->message->hdr.candidate_hash; @@ -1128,6 +1134,19 @@ static void s_session_candidate_submit(dap_chain_esbocs_session_t *a_session) static void s_session_candidate_verify(dap_chain_esbocs_session_t *a_session, dap_chain_block_t *a_candidate, size_t a_candidate_size, dap_hash_fast_t *a_candidate_hash) { + // Process early received messages + for (dap_chain_esbocs_message_item_t *l_item = a_session->cur_round.message_items; l_item; l_item = l_item->hh.next) { + if (l_item->unprocessed && + (l_item->message->hdr.type == DAP_CHAIN_ESBOCS_MSG_TYPE_APPROVE || + l_item->message->hdr.type == DAP_CHAIN_ESBOCS_MSG_TYPE_REJECT || + l_item->message->hdr.type == DAP_CHAIN_ESBOCS_MSG_TYPE_COMMIT_SIGN) && + dap_hash_fast_compare(&l_item->message->hdr.candidate_hash, a_candidate_hash) && + l_item->message->hdr.attempt_num == a_session->cur_round.attempt_num) { + s_session_packet_in(a_session, NULL, NULL, &l_item->message_hash, + (uint8_t *)l_item->message, s_get_esbocs_message_size(l_item->message)); + } + } + // Process candidate a_session->processing_candidate = a_candidate; dap_chain_cs_blocks_t *l_blocks = DAP_CHAIN_CS_BLOCKS(a_session->chain); if (l_blocks->chain->callback_atom_verify(l_blocks->chain, a_candidate, a_candidate_size) == ATOM_ACCEPT) { @@ -1167,7 +1186,7 @@ static void s_session_candidate_precommit(dap_chain_esbocs_session_t *a_session, HASH_FIND(hh, a_session->cur_round.store_items, l_candidate_hash, sizeof(dap_chain_hash_fast_t), l_store); if (!l_store) { l_candidate_hash_str = dap_chain_hash_fast_to_str_new(l_candidate_hash); - log_it(L_WARNING, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu." + log_it(L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu." " Receive PRE_COMMIT message for unknown candidate %s", a_session->chain->net_name, a_session->chain->name, a_session->cur_round.id, a_message->hdr.attempt_num, @@ -1544,13 +1563,26 @@ static void s_session_directive_process(dap_chain_esbocs_session_t *a_session, d l_directive_hash_str); DAP_DELETE(l_directive_hash_str); } - uint8_t l_type = l_vote_for ? DAP_CHAIN_ESBOCS_MSG_TYPE_VOTE_FOR : DAP_CHAIN_ESBOCS_MSG_TYPE_VOTE_AGAINST; - s_message_send(a_session, l_type, a_directive_hash, NULL, 0, a_session->cur_round.all_validators); a_session->cur_round.directive_hash = *a_directive_hash; a_session->cur_round.directive = DAP_DUP_SIZE(a_directive, a_directive->size); s_session_state_change(a_session, DAP_CHAIN_ESBOCS_SESSION_STATE_WAIT_VOTING, dap_time_now()); + + // Process early received directive votes + for (dap_chain_esbocs_message_item_t *l_item = a_session->cur_round.message_items; l_item; l_item = l_item->hh.next) { + if (l_item->unprocessed && + (l_item->message->hdr.type == DAP_CHAIN_ESBOCS_MSG_TYPE_VOTE_FOR || + l_item->message->hdr.type == DAP_CHAIN_ESBOCS_MSG_TYPE_VOTE_AGAINST) && + dap_hash_fast_compare(&l_item->message->hdr.candidate_hash, a_directive_hash) && + l_item->message->hdr.attempt_num == a_session->cur_round.attempt_num) { + s_session_packet_in(a_session, NULL, NULL, &l_item->message_hash, + (uint8_t *)l_item->message, s_get_esbocs_message_size(l_item->message)); + } + } + // Send own vote + uint8_t l_type = l_vote_for ? DAP_CHAIN_ESBOCS_MSG_TYPE_VOTE_FOR : DAP_CHAIN_ESBOCS_MSG_TYPE_VOTE_AGAINST; + s_message_send(a_session, l_type, a_directive_hash, NULL, 0, a_session->cur_round.all_validators); } static int s_session_directive_apply(dap_chain_esbocs_directive_t *a_directive, dap_hash_fast_t *a_directive_hash) @@ -1619,13 +1651,22 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod dap_chain_esbocs_message_t *l_message = (dap_chain_esbocs_message_t *)a_data; bool l_cs_debug = PVT(l_session->esbocs)->debug; uint16_t l_cs_level = PVT(l_session->esbocs)->min_validators_count; + + if (a_data_size < sizeof(dap_chain_esbocs_message_hdr_t)) { + log_it(L_WARNING, "Too smalll message size %zu, less than header size %zu", a_data_size, sizeof(dap_chain_esbocs_message_hdr_t)); + return; + } + size_t l_message_data_size = l_message->hdr.message_size; byte_t *l_message_data = l_message->msg_n_sign; dap_chain_hash_fast_t *l_candidate_hash = &l_message->hdr.candidate_hash; dap_sign_t *l_sign = (dap_sign_t *)(l_message_data + l_message_data_size); size_t l_sign_size = l_message->hdr.sign_size; + dap_chain_esbocs_round_t *l_round = &l_session->cur_round; + dap_chain_addr_t l_signing_addr; + char *l_validator_addr_str = NULL; - if (a_sender_node_addr) { //Process network message + if (a_sender_node_addr) { //Process network messages only pthread_mutex_lock(&l_session->mutex); debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu." " Receive pkt type:0x%x from addr:"NODE_ADDR_FP_STR", my_addr:"NODE_ADDR_FP_STR"", @@ -1693,16 +1734,31 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod l_session->cur_round.id, l_session->cur_round.attempt_num); goto session_unlock; } + + // check hash message dup + dap_chain_esbocs_message_item_t *l_message_item_temp = NULL; + HASH_FIND(hh, l_round->message_items, a_data_hash, sizeof(dap_chain_hash_fast_t), l_message_item_temp); + if (l_message_item_temp) { + debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu." + " Message rejected: message hash is exists in chain (duplicate)", + l_session->chain->net_name, l_session->chain->name, + l_session->cur_round.id, l_message->hdr.attempt_num); + goto session_unlock; + } + dap_chain_addr_fill_from_sign(&l_signing_addr, l_sign, l_session->chain->net_id); + s_message_chain_add(l_session, l_message, a_data_size, a_data_hash, &l_signing_addr); } + // Process local & network messages - dap_chain_addr_t l_signing_addr; - char *l_validator_addr_str = NULL; dap_chain_addr_fill_from_sign(&l_signing_addr, l_sign, l_session->chain->net_id); if (l_cs_debug) l_validator_addr_str = dap_chain_addr_to_str(&l_signing_addr); bool l_not_in_list = false; switch (l_message->hdr.type) { case DAP_CHAIN_ESBOCS_MSG_TYPE_START_SYNC: + // Add local sync messages, cause a round clear + if (!a_sender_node_addr) + s_message_chain_add(l_session, l_message, a_data_size, a_data_hash, &l_signing_addr); // Accept all validators if (!dap_chain_net_srv_stake_key_delegated(&l_signing_addr)) l_not_in_list = true; @@ -1727,27 +1783,18 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod goto session_unlock; } - dap_chain_esbocs_round_t *l_round = &l_session->cur_round; - - // check hash message dup - dap_chain_esbocs_message_item_t *l_message_item_temp = NULL; - HASH_FIND(hh, l_round->message_items, a_data_hash, sizeof(dap_chain_hash_fast_t), l_message_item_temp); - if (l_message_item_temp) { - debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu." - " Message rejected: message hash is exists in chain (duplicate)", - l_session->chain->net_name, l_session->chain->name, - l_session->cur_round.id, l_message->hdr.attempt_num); - goto session_unlock; - } - // check messages chain dap_chain_esbocs_message_item_t *l_chain_message, *l_chain_message_tmp; HASH_ITER(hh, l_round->message_items, l_chain_message, l_chain_message_tmp) { bool l_same_type = l_chain_message->message->hdr.type == l_message->hdr.type || (l_chain_message->message->hdr.type == DAP_CHAIN_ESBOCS_MSG_TYPE_APPROVE && - l_message->hdr.type == DAP_CHAIN_ESBOCS_MSG_TYPE_REJECT) || + l_message->hdr.type == DAP_CHAIN_ESBOCS_MSG_TYPE_REJECT) || (l_chain_message->message->hdr.type == DAP_CHAIN_ESBOCS_MSG_TYPE_REJECT && - l_message->hdr.type == DAP_CHAIN_ESBOCS_MSG_TYPE_APPROVE); + l_message->hdr.type == DAP_CHAIN_ESBOCS_MSG_TYPE_APPROVE) || + (l_chain_message->message->hdr.type == DAP_CHAIN_ESBOCS_MSG_TYPE_VOTE_FOR && + l_message->hdr.type == DAP_CHAIN_ESBOCS_MSG_TYPE_VOTE_AGAINST) || + (l_chain_message->message->hdr.type == DAP_CHAIN_ESBOCS_MSG_TYPE_VOTE_AGAINST && + l_message->hdr.type == DAP_CHAIN_ESBOCS_MSG_TYPE_VOTE_FOR); if (l_same_type && dap_chain_addr_compare(&l_chain_message->signing_addr, &l_signing_addr) && dap_hash_fast_compare(&l_chain_message->message->hdr.candidate_hash, &l_message->hdr.candidate_hash)) { if (l_message->hdr.type != DAP_CHAIN_ESBOCS_MSG_TYPE_START_SYNC || // Not sync or same sync attempt @@ -1762,8 +1809,6 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod } } - s_message_chain_add(l_session, l_message, a_data_size, a_data_hash, &l_signing_addr); - switch (l_message->hdr.type) { case DAP_CHAIN_ESBOCS_MSG_TYPE_START_SYNC: { if (l_message_data_size != sizeof(uint64_t)) { @@ -1880,51 +1925,23 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod } } break; + case DAP_CHAIN_ESBOCS_MSG_TYPE_APPROVE: case DAP_CHAIN_ESBOCS_MSG_TYPE_REJECT: { dap_chain_esbocs_store_t *l_store; char *l_candidate_hash_str = NULL; + bool l_approve = l_message->hdr.type == DAP_CHAIN_ESBOCS_MSG_TYPE_APPROVE; HASH_FIND(hh, l_session->cur_round.store_items, l_candidate_hash, sizeof(dap_chain_hash_fast_t), l_store); if (!l_store) { - l_candidate_hash_str = dap_chain_hash_fast_to_str_new(l_candidate_hash); - log_it(L_WARNING, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu." - " Receive REJECT message for unknown candidate %s", - l_session->chain->net_name, l_session->chain->name, - l_session->cur_round.id, l_message->hdr.attempt_num, - l_candidate_hash_str); - DAP_DELETE(l_candidate_hash_str); - break; - } - - if (l_cs_debug) { l_candidate_hash_str = dap_chain_hash_fast_to_str_new(l_candidate_hash); log_it(L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu." - " Receive REJECT: candidate %s", - l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id, - l_message->hdr.attempt_num, l_candidate_hash_str); - } - if (++l_store->reject_count >= l_cs_level && !l_store->decide_reject && - dap_hash_fast_compare(&l_session->cur_round.attempt_candidate_hash, l_candidate_hash)) { - l_store->decide_reject = true; - debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu." - " Candidate %s rejected by minimum number of validators, attempt failed", - l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id, - l_message->hdr.attempt_num, l_candidate_hash_str); - s_session_attempt_new(l_session); - } - DAP_DEL_Z(l_candidate_hash_str); - } break; - - case DAP_CHAIN_ESBOCS_MSG_TYPE_APPROVE: { - dap_chain_esbocs_store_t *l_store; - char *l_candidate_hash_str = NULL; - HASH_FIND(hh, l_session->cur_round.store_items, l_candidate_hash, sizeof(dap_chain_hash_fast_t), l_store); - if (!l_store) { - l_candidate_hash_str = dap_chain_hash_fast_to_str_new(l_candidate_hash); - log_it(L_WARNING, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu." - " Receive APPROVE message for unknown candidate %s", + " Receive %s message for unknown candidate %s, process it later", l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id, l_message->hdr.attempt_num, - l_candidate_hash_str); + l_approve ? "APPROVE" : "REJECT", l_candidate_hash_str); + dap_chain_esbocs_message_item_t *l_unprocessed_item = NULL; + HASH_FIND(hh, l_round->message_items, a_data_hash, sizeof(dap_chain_hash_fast_t), l_unprocessed_item); + if (l_unprocessed_item) + l_unprocessed_item->unprocessed = true; DAP_DELETE(l_candidate_hash_str); break; } @@ -1932,11 +1949,11 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod if (l_cs_debug) { l_candidate_hash_str = dap_chain_hash_fast_to_str_new(l_candidate_hash); log_it(L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu." - " Receive APPROVE: candidate %s", + " Receive %s: candidate %s", l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id, - l_message->hdr.attempt_num, l_candidate_hash_str); + l_message->hdr.attempt_num, l_approve ? "APPROVE" : "REJECT", l_candidate_hash_str); } - if (++l_store->approve_count >= l_cs_level && !l_store->decide_approve && + if (l_approve && ++l_store->approve_count >= l_cs_level && !l_store->decide_approve && dap_hash_fast_compare(&l_session->cur_round.attempt_candidate_hash, l_candidate_hash)) { l_store->decide_approve = true; debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu." @@ -1951,6 +1968,15 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod l_candidate_sign, l_candidate_sign_size, l_session->cur_round.validators_list); DAP_DELETE(l_candidate_sign); } + if (!l_approve && ++l_store->reject_count >= l_cs_level && !l_store->decide_reject && + dap_hash_fast_compare(&l_session->cur_round.attempt_candidate_hash, l_candidate_hash)) { + l_store->decide_reject = true; + debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu." + " Candidate %s rejected by minimum number of validators, attempt failed", + l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id, + l_message->hdr.attempt_num, l_candidate_hash_str); + s_session_attempt_new(l_session); + } DAP_DEL_Z(l_candidate_hash_str); } break; @@ -2061,11 +2087,15 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod } if (!dap_hash_fast_compare(&l_session->cur_round.directive_hash, l_candidate_hash)) { debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu." - "Received VOTE %s unknown directive", + "Received VOTE %s unknown directive, it will be processed later", l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id, l_message->hdr.attempt_num, l_message->hdr.type == DAP_CHAIN_ESBOCS_MSG_TYPE_VOTE_FOR ? "FOR" : "AGAINST"); + dap_chain_esbocs_message_item_t *l_unprocessed_item = NULL; + HASH_FIND(hh, l_round->message_items, a_data_hash, sizeof(dap_chain_hash_fast_t), l_unprocessed_item); + if (l_unprocessed_item) + l_unprocessed_item->unprocessed = true; break; } if (l_cs_debug) { diff --git a/modules/consensus/esbocs/include/dap_chain_cs_esbocs.h b/modules/consensus/esbocs/include/dap_chain_cs_esbocs.h index c1a78ff09c943015cb71b8ffd896197b62f4fe9a..61df118cea88da5b5c013df3ba808907fe5ca76b 100644 --- a/modules/consensus/esbocs/include/dap_chain_cs_esbocs.h +++ b/modules/consensus/esbocs/include/dap_chain_cs_esbocs.h @@ -64,6 +64,7 @@ typedef struct dap_chain_esbocs_message_item { dap_hash_fast_t message_hash; dap_chain_esbocs_message_t *message; dap_chain_addr_t signing_addr; + bool unprocessed; // Do not count one message twice UT_hash_handle hh; } dap_chain_esbocs_message_item_t;