Skip to content
Snippets Groups Projects
Commit bb8191f3 authored by Roman Khlopkov's avatar Roman Khlopkov 🔜
Browse files

[+] Voting post-process

parent 1889b18a
No related branches found
No related tags found
No related merge requests found
......@@ -98,6 +98,11 @@ DAP_STATIC_INLINE char *s_get_penalty_group(dap_chain_net_id_t a_net_id)
return dap_strdup_printf(DAP_CHAIN_ESBOCS_GDB_GROUPS_PREFIX".%s.penalty", l_net->pub.gdb_groups_prefix);
}
DAP_STATIC_INLINE size_t s_get_esbocs_message_size(dap_chain_esbocs_message_t *a_message)
{
return sizeof(*a_message) + a_message->hdr.sign_size + a_message->hdr.message_size;
}
static dap_chain_esbocs_session_t * s_session_items;
static dap_timerfd_t *s_session_cs_timer = NULL;
......@@ -696,7 +701,7 @@ static void s_session_round_new(dap_chain_esbocs_session_t *a_session)
for (dap_list_t *it = l_item->messages; it; it = it->next) {
dap_hash_fast_t l_msg_hash;
dap_chain_esbocs_message_t *l_msg = it->data;
size_t l_msg_size = sizeof(*l_msg) + l_msg->hdr.sign_size + l_msg->hdr.message_size;
size_t l_msg_size = s_get_esbocs_message_size(l_msg);
dap_hash_fast(l_msg, l_msg_size, &l_msg_hash);
s_session_packet_in(a_session, NULL, NULL, &l_msg_hash, (uint8_t *)l_msg, l_msg_size);
}
......@@ -831,6 +836,8 @@ dap_chain_esbocs_directive_t *s_session_directive_ready(dap_chain_esbocs_session
}
if (!l_item)
return NULL;
debug_if(PVT(a_session->esbocs)->debug, L_MSG, "Current consensus online %hu from %zu is acceptable, so issue the directive",
a_session->cur_round.total_validators_synced, l_list_length);
uint32_t l_directive_size = s_directive_calc_size(l_kick ? DAP_CHAIN_ESBOCS_DIRECTIVE_KICK : DAP_CHAIN_ESBOCS_DIRECTIVE_LIFT);
dap_chain_esbocs_directive_t *l_ret = DAP_NEW_Z_SIZE(dap_chain_esbocs_directive_t, l_directive_size);
l_ret->version = DAP_CHAIN_ESBOCS_DIRECTIVE_VERSION;
......@@ -888,8 +895,7 @@ static void s_session_state_change(dap_chain_esbocs_session_t *a_session, enum s
} else
s_session_candidate_submit(a_session);
} else {
dap_chain_esbocs_message_item_t *l_item, *l_tmp;
HASH_ITER(hh, a_session->cur_round.message_items, l_item, l_tmp) {
for (dap_chain_esbocs_message_item_t *l_item = a_session->cur_round.message_items; l_item; l_item = l_item->hh.next) {
if (l_item->message->hdr.type == DAP_CHAIN_ESBOCS_MSG_TYPE_SUBMIT &&
dap_chain_addr_compare(&l_item->signing_addr, &a_session->cur_round.attempt_submit_validator)) {
dap_hash_fast_t *l_candidate_hash = &l_item->message->hdr.candidate_hash;
......@@ -1128,6 +1134,19 @@ static void s_session_candidate_submit(dap_chain_esbocs_session_t *a_session)
static void s_session_candidate_verify(dap_chain_esbocs_session_t *a_session, dap_chain_block_t *a_candidate,
size_t a_candidate_size, dap_hash_fast_t *a_candidate_hash)
{
// Process early received messages
for (dap_chain_esbocs_message_item_t *l_item = a_session->cur_round.message_items; l_item; l_item = l_item->hh.next) {
if (l_item->unprocessed &&
(l_item->message->hdr.type == DAP_CHAIN_ESBOCS_MSG_TYPE_APPROVE ||
l_item->message->hdr.type == DAP_CHAIN_ESBOCS_MSG_TYPE_REJECT ||
l_item->message->hdr.type == DAP_CHAIN_ESBOCS_MSG_TYPE_COMMIT_SIGN) &&
dap_hash_fast_compare(&l_item->message->hdr.candidate_hash, a_candidate_hash) &&
l_item->message->hdr.attempt_num == a_session->cur_round.attempt_num) {
s_session_packet_in(a_session, NULL, NULL, &l_item->message_hash,
(uint8_t *)l_item->message, s_get_esbocs_message_size(l_item->message));
}
}
// Process candidate
a_session->processing_candidate = a_candidate;
dap_chain_cs_blocks_t *l_blocks = DAP_CHAIN_CS_BLOCKS(a_session->chain);
if (l_blocks->chain->callback_atom_verify(l_blocks->chain, a_candidate, a_candidate_size) == ATOM_ACCEPT) {
......@@ -1167,7 +1186,7 @@ static void s_session_candidate_precommit(dap_chain_esbocs_session_t *a_session,
HASH_FIND(hh, a_session->cur_round.store_items, l_candidate_hash, sizeof(dap_chain_hash_fast_t), l_store);
if (!l_store) {
l_candidate_hash_str = dap_chain_hash_fast_to_str_new(l_candidate_hash);
log_it(L_WARNING, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu."
log_it(L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu."
" Receive PRE_COMMIT message for unknown candidate %s",
a_session->chain->net_name, a_session->chain->name,
a_session->cur_round.id, a_message->hdr.attempt_num,
......@@ -1544,13 +1563,26 @@ static void s_session_directive_process(dap_chain_esbocs_session_t *a_session, d
l_directive_hash_str);
DAP_DELETE(l_directive_hash_str);
}
uint8_t l_type = l_vote_for ? DAP_CHAIN_ESBOCS_MSG_TYPE_VOTE_FOR : DAP_CHAIN_ESBOCS_MSG_TYPE_VOTE_AGAINST;
s_message_send(a_session, l_type, a_directive_hash, NULL, 0, a_session->cur_round.all_validators);
a_session->cur_round.directive_hash = *a_directive_hash;
a_session->cur_round.directive = DAP_DUP_SIZE(a_directive, a_directive->size);
s_session_state_change(a_session, DAP_CHAIN_ESBOCS_SESSION_STATE_WAIT_VOTING, dap_time_now());
// Process early received directive votes
for (dap_chain_esbocs_message_item_t *l_item = a_session->cur_round.message_items; l_item; l_item = l_item->hh.next) {
if (l_item->unprocessed &&
(l_item->message->hdr.type == DAP_CHAIN_ESBOCS_MSG_TYPE_VOTE_FOR ||
l_item->message->hdr.type == DAP_CHAIN_ESBOCS_MSG_TYPE_VOTE_AGAINST) &&
dap_hash_fast_compare(&l_item->message->hdr.candidate_hash, a_directive_hash) &&
l_item->message->hdr.attempt_num == a_session->cur_round.attempt_num) {
s_session_packet_in(a_session, NULL, NULL, &l_item->message_hash,
(uint8_t *)l_item->message, s_get_esbocs_message_size(l_item->message));
}
}
// Send own vote
uint8_t l_type = l_vote_for ? DAP_CHAIN_ESBOCS_MSG_TYPE_VOTE_FOR : DAP_CHAIN_ESBOCS_MSG_TYPE_VOTE_AGAINST;
s_message_send(a_session, l_type, a_directive_hash, NULL, 0, a_session->cur_round.all_validators);
}
static int s_session_directive_apply(dap_chain_esbocs_directive_t *a_directive, dap_hash_fast_t *a_directive_hash)
......@@ -1619,13 +1651,22 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod
dap_chain_esbocs_message_t *l_message = (dap_chain_esbocs_message_t *)a_data;
bool l_cs_debug = PVT(l_session->esbocs)->debug;
uint16_t l_cs_level = PVT(l_session->esbocs)->min_validators_count;
if (a_data_size < sizeof(dap_chain_esbocs_message_hdr_t)) {
log_it(L_WARNING, "Too smalll message size %zu, less than header size %zu", a_data_size, sizeof(dap_chain_esbocs_message_hdr_t));
return;
}
size_t l_message_data_size = l_message->hdr.message_size;
byte_t *l_message_data = l_message->msg_n_sign;
dap_chain_hash_fast_t *l_candidate_hash = &l_message->hdr.candidate_hash;
dap_sign_t *l_sign = (dap_sign_t *)(l_message_data + l_message_data_size);
size_t l_sign_size = l_message->hdr.sign_size;
dap_chain_esbocs_round_t *l_round = &l_session->cur_round;
dap_chain_addr_t l_signing_addr;
char *l_validator_addr_str = NULL;
if (a_sender_node_addr) { //Process network message
if (a_sender_node_addr) { //Process network messages only
pthread_mutex_lock(&l_session->mutex);
debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu."
" Receive pkt type:0x%x from addr:"NODE_ADDR_FP_STR", my_addr:"NODE_ADDR_FP_STR"",
......@@ -1693,16 +1734,31 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod
l_session->cur_round.id, l_session->cur_round.attempt_num);
goto session_unlock;
}
// check hash message dup
dap_chain_esbocs_message_item_t *l_message_item_temp = NULL;
HASH_FIND(hh, l_round->message_items, a_data_hash, sizeof(dap_chain_hash_fast_t), l_message_item_temp);
if (l_message_item_temp) {
debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu."
" Message rejected: message hash is exists in chain (duplicate)",
l_session->chain->net_name, l_session->chain->name,
l_session->cur_round.id, l_message->hdr.attempt_num);
goto session_unlock;
}
dap_chain_addr_fill_from_sign(&l_signing_addr, l_sign, l_session->chain->net_id);
s_message_chain_add(l_session, l_message, a_data_size, a_data_hash, &l_signing_addr);
}
// Process local & network messages
dap_chain_addr_t l_signing_addr;
char *l_validator_addr_str = NULL;
dap_chain_addr_fill_from_sign(&l_signing_addr, l_sign, l_session->chain->net_id);
if (l_cs_debug)
l_validator_addr_str = dap_chain_addr_to_str(&l_signing_addr);
bool l_not_in_list = false;
switch (l_message->hdr.type) {
case DAP_CHAIN_ESBOCS_MSG_TYPE_START_SYNC:
// Add local sync messages, cause a round clear
if (!a_sender_node_addr)
s_message_chain_add(l_session, l_message, a_data_size, a_data_hash, &l_signing_addr);
// Accept all validators
if (!dap_chain_net_srv_stake_key_delegated(&l_signing_addr))
l_not_in_list = true;
......@@ -1727,27 +1783,18 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod
goto session_unlock;
}
dap_chain_esbocs_round_t *l_round = &l_session->cur_round;
// check hash message dup
dap_chain_esbocs_message_item_t *l_message_item_temp = NULL;
HASH_FIND(hh, l_round->message_items, a_data_hash, sizeof(dap_chain_hash_fast_t), l_message_item_temp);
if (l_message_item_temp) {
debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu."
" Message rejected: message hash is exists in chain (duplicate)",
l_session->chain->net_name, l_session->chain->name,
l_session->cur_round.id, l_message->hdr.attempt_num);
goto session_unlock;
}
// check messages chain
dap_chain_esbocs_message_item_t *l_chain_message, *l_chain_message_tmp;
HASH_ITER(hh, l_round->message_items, l_chain_message, l_chain_message_tmp) {
bool l_same_type = l_chain_message->message->hdr.type == l_message->hdr.type ||
(l_chain_message->message->hdr.type == DAP_CHAIN_ESBOCS_MSG_TYPE_APPROVE &&
l_message->hdr.type == DAP_CHAIN_ESBOCS_MSG_TYPE_REJECT) ||
l_message->hdr.type == DAP_CHAIN_ESBOCS_MSG_TYPE_REJECT) ||
(l_chain_message->message->hdr.type == DAP_CHAIN_ESBOCS_MSG_TYPE_REJECT &&
l_message->hdr.type == DAP_CHAIN_ESBOCS_MSG_TYPE_APPROVE);
l_message->hdr.type == DAP_CHAIN_ESBOCS_MSG_TYPE_APPROVE) ||
(l_chain_message->message->hdr.type == DAP_CHAIN_ESBOCS_MSG_TYPE_VOTE_FOR &&
l_message->hdr.type == DAP_CHAIN_ESBOCS_MSG_TYPE_VOTE_AGAINST) ||
(l_chain_message->message->hdr.type == DAP_CHAIN_ESBOCS_MSG_TYPE_VOTE_AGAINST &&
l_message->hdr.type == DAP_CHAIN_ESBOCS_MSG_TYPE_VOTE_FOR);
if (l_same_type && dap_chain_addr_compare(&l_chain_message->signing_addr, &l_signing_addr) &&
dap_hash_fast_compare(&l_chain_message->message->hdr.candidate_hash, &l_message->hdr.candidate_hash)) {
if (l_message->hdr.type != DAP_CHAIN_ESBOCS_MSG_TYPE_START_SYNC || // Not sync or same sync attempt
......@@ -1762,8 +1809,6 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod
}
}
s_message_chain_add(l_session, l_message, a_data_size, a_data_hash, &l_signing_addr);
switch (l_message->hdr.type) {
case DAP_CHAIN_ESBOCS_MSG_TYPE_START_SYNC: {
if (l_message_data_size != sizeof(uint64_t)) {
......@@ -1880,51 +1925,23 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod
}
} break;
case DAP_CHAIN_ESBOCS_MSG_TYPE_APPROVE:
case DAP_CHAIN_ESBOCS_MSG_TYPE_REJECT: {
dap_chain_esbocs_store_t *l_store;
char *l_candidate_hash_str = NULL;
bool l_approve = l_message->hdr.type == DAP_CHAIN_ESBOCS_MSG_TYPE_APPROVE;
HASH_FIND(hh, l_session->cur_round.store_items, l_candidate_hash, sizeof(dap_chain_hash_fast_t), l_store);
if (!l_store) {
l_candidate_hash_str = dap_chain_hash_fast_to_str_new(l_candidate_hash);
log_it(L_WARNING, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu."
" Receive REJECT message for unknown candidate %s",
l_session->chain->net_name, l_session->chain->name,
l_session->cur_round.id, l_message->hdr.attempt_num,
l_candidate_hash_str);
DAP_DELETE(l_candidate_hash_str);
break;
}
if (l_cs_debug) {
l_candidate_hash_str = dap_chain_hash_fast_to_str_new(l_candidate_hash);
log_it(L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu."
" Receive REJECT: candidate %s",
l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id,
l_message->hdr.attempt_num, l_candidate_hash_str);
}
if (++l_store->reject_count >= l_cs_level && !l_store->decide_reject &&
dap_hash_fast_compare(&l_session->cur_round.attempt_candidate_hash, l_candidate_hash)) {
l_store->decide_reject = true;
debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu."
" Candidate %s rejected by minimum number of validators, attempt failed",
l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id,
l_message->hdr.attempt_num, l_candidate_hash_str);
s_session_attempt_new(l_session);
}
DAP_DEL_Z(l_candidate_hash_str);
} break;
case DAP_CHAIN_ESBOCS_MSG_TYPE_APPROVE: {
dap_chain_esbocs_store_t *l_store;
char *l_candidate_hash_str = NULL;
HASH_FIND(hh, l_session->cur_round.store_items, l_candidate_hash, sizeof(dap_chain_hash_fast_t), l_store);
if (!l_store) {
l_candidate_hash_str = dap_chain_hash_fast_to_str_new(l_candidate_hash);
log_it(L_WARNING, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu."
" Receive APPROVE message for unknown candidate %s",
" Receive %s message for unknown candidate %s, process it later",
l_session->chain->net_name, l_session->chain->name,
l_session->cur_round.id, l_message->hdr.attempt_num,
l_candidate_hash_str);
l_approve ? "APPROVE" : "REJECT", l_candidate_hash_str);
dap_chain_esbocs_message_item_t *l_unprocessed_item = NULL;
HASH_FIND(hh, l_round->message_items, a_data_hash, sizeof(dap_chain_hash_fast_t), l_unprocessed_item);
if (l_unprocessed_item)
l_unprocessed_item->unprocessed = true;
DAP_DELETE(l_candidate_hash_str);
break;
}
......@@ -1932,11 +1949,11 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod
if (l_cs_debug) {
l_candidate_hash_str = dap_chain_hash_fast_to_str_new(l_candidate_hash);
log_it(L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu."
" Receive APPROVE: candidate %s",
" Receive %s: candidate %s",
l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id,
l_message->hdr.attempt_num, l_candidate_hash_str);
l_message->hdr.attempt_num, l_approve ? "APPROVE" : "REJECT", l_candidate_hash_str);
}
if (++l_store->approve_count >= l_cs_level && !l_store->decide_approve &&
if (l_approve && ++l_store->approve_count >= l_cs_level && !l_store->decide_approve &&
dap_hash_fast_compare(&l_session->cur_round.attempt_candidate_hash, l_candidate_hash)) {
l_store->decide_approve = true;
debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu."
......@@ -1951,6 +1968,15 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod
l_candidate_sign, l_candidate_sign_size, l_session->cur_round.validators_list);
DAP_DELETE(l_candidate_sign);
}
if (!l_approve && ++l_store->reject_count >= l_cs_level && !l_store->decide_reject &&
dap_hash_fast_compare(&l_session->cur_round.attempt_candidate_hash, l_candidate_hash)) {
l_store->decide_reject = true;
debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu."
" Candidate %s rejected by minimum number of validators, attempt failed",
l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id,
l_message->hdr.attempt_num, l_candidate_hash_str);
s_session_attempt_new(l_session);
}
DAP_DEL_Z(l_candidate_hash_str);
} break;
......@@ -2061,11 +2087,15 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod
}
if (!dap_hash_fast_compare(&l_session->cur_round.directive_hash, l_candidate_hash)) {
debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu."
"Received VOTE %s unknown directive",
"Received VOTE %s unknown directive, it will be processed later",
l_session->chain->net_name, l_session->chain->name,
l_session->cur_round.id, l_message->hdr.attempt_num,
l_message->hdr.type == DAP_CHAIN_ESBOCS_MSG_TYPE_VOTE_FOR ?
"FOR" : "AGAINST");
dap_chain_esbocs_message_item_t *l_unprocessed_item = NULL;
HASH_FIND(hh, l_round->message_items, a_data_hash, sizeof(dap_chain_hash_fast_t), l_unprocessed_item);
if (l_unprocessed_item)
l_unprocessed_item->unprocessed = true;
break;
}
if (l_cs_debug) {
......
......@@ -64,6 +64,7 @@ typedef struct dap_chain_esbocs_message_item {
dap_hash_fast_t message_hash;
dap_chain_esbocs_message_t *message;
dap_chain_addr_t signing_addr;
bool unprocessed; // Do not count one message twice
UT_hash_handle hh;
} dap_chain_esbocs_message_item_t;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment