diff --git a/modules/channel/chain-voting/dap_stream_ch_chain_voting.c b/modules/channel/chain-voting/dap_stream_ch_chain_voting.c index b7dd8dfe9288d9e0213296202f11f3ad444df0b4..5077f939c05e85147746d05df1c1b02b64d5f18a 100644 --- a/modules/channel/chain-voting/dap_stream_ch_chain_voting.c +++ b/modules/channel/chain-voting/dap_stream_ch_chain_voting.c @@ -71,6 +71,12 @@ static bool s_callback_pkt_in_call_all(UNUSED_ARG dap_proc_thread_t *a_thread, v return true; } +void dap_stream_ch_voting_queue_clear() +{ + for (struct voting_node_client_list *it = s_node_client_list; it; it = it->hh.next) + dap_chain_node_client_queue_clear(it->node_client); +} + void dap_stream_ch_chain_voting_message_write(dap_chain_net_t *a_net, dap_chain_node_addr_t *a_remote_node_addr, dap_stream_ch_chain_voting_pkt_t *a_voting_pkt) { diff --git a/modules/channel/chain-voting/include/dap_stream_ch_chain_voting.h b/modules/channel/chain-voting/include/dap_stream_ch_chain_voting.h index 530fbd064e5f6b96c5b2e8c27e46400aa2e3d96e..b26344f362e1c1609e567e8f436299ee32983ae5 100644 --- a/modules/channel/chain-voting/include/dap_stream_ch_chain_voting.h +++ b/modules/channel/chain-voting/include/dap_stream_ch_chain_voting.h @@ -64,3 +64,5 @@ size_t dap_stream_ch_chain_voting_pkt_write_unsafe(dap_stream_ch_t *a_ch, uint8_ int dap_stream_ch_chain_voting_init(); void dap_stream_ch_chain_voting_deinit(); + +void dap_stream_ch_voting_queue_clear(); diff --git a/modules/consensus/esbocs/dap_chain_cs_esbocs.c b/modules/consensus/esbocs/dap_chain_cs_esbocs.c index 9be55a321cf74d4d97227aaba032d37b25bb58fd..1561dac1b5256a7b1e3e2480a7cb3234e612c87e 100644 --- a/modules/consensus/esbocs/dap_chain_cs_esbocs.c +++ b/modules/consensus/esbocs/dap_chain_cs_esbocs.c @@ -16,13 +16,16 @@ #include "dap_chain_node_cli_cmd.h" #define LOG_TAG "dap_chain_cs_esbocs" -const char* block_fee_group = "local.fee-collect-block-hashes"; + +static const char *s_block_fee_group = "local.fee-collect-block-hashes"; enum s_esbocs_session_state { DAP_CHAIN_ESBOCS_SESSION_STATE_WAIT_START, DAP_CHAIN_ESBOCS_SESSION_STATE_WAIT_PROC, DAP_CHAIN_ESBOCS_SESSION_STATE_WAIT_SIGNS, - DAP_CHAIN_ESBOCS_SESSION_STATE_WAIT_FINISH + DAP_CHAIN_ESBOCS_SESSION_STATE_WAIT_FINISH, + DAP_CHAIN_ESBOCS_SESSION_STATE_WAIT_VOTING, + DAP_CHAIN_ESBOCS_SESSION_STATE_PREVIOUS // fictive sate to change back }; static dap_list_t *s_validator_check(dap_chain_addr_t *a_addr, dap_list_t *a_validators); @@ -64,18 +67,37 @@ static int s_cli_esbocs(int argc, char ** argv, char **str_reply); DAP_STATIC_INLINE const char *s_voting_msg_type_to_str(uint8_t a_type) { switch (a_type) { - case DAP_STREAM_CH_VOTING_MSG_TYPE_START_SYNC: return "START_SYNC"; - case DAP_STREAM_CH_VOTING_MSG_TYPE_SUBMIT: return "SUBMIT"; - case DAP_STREAM_CH_VOTING_MSG_TYPE_APPROVE: return "APPROVE"; - case DAP_STREAM_CH_VOTING_MSG_TYPE_REJECT: return "REJECT"; - case DAP_STREAM_CH_VOTING_MSG_TYPE_COMMIT_SIGN: return "COMMIT_SIGN"; - case DAP_STREAM_CH_VOTING_MSG_TYPE_VOTE: return "VOTE"; - //case DAP_STREAM_CH_VOTING_MSG_TYPE_VOTE_FOR: return "VOTE_FOR" - case DAP_STREAM_CH_VOTING_MSG_TYPE_PRE_COMMIT: return "PRE_COMMIT"; + case DAP_CHAIN_ESBOCS_MSG_TYPE_START_SYNC: return "START_SYNC"; + case DAP_CHAIN_ESBOCS_MSG_TYPE_SUBMIT: return "SUBMIT"; + case DAP_CHAIN_ESBOCS_MSG_TYPE_APPROVE: return "APPROVE"; + case DAP_CHAIN_ESBOCS_MSG_TYPE_REJECT: return "REJECT"; + case DAP_CHAIN_ESBOCS_MSG_TYPE_COMMIT_SIGN: return "COMMIT_SIGN"; + case DAP_CHAIN_ESBOCS_MSG_TYPE_PRE_COMMIT: return "PRE_COMMIT"; + case DAP_CHAIN_ESBOCS_MSG_TYPE_DIRECTIVE: return "DIRECTIVE"; + case DAP_CHAIN_ESBOCS_MSG_TYPE_VOTE_FOR: return "VOTE_FOR"; + case DAP_CHAIN_ESBOCS_MSG_TYPE_VOTE_AGAINST: return "VOTE_AGAINST"; default: return "UNKNOWN"; } } +DAP_STATIC_INLINE uint32_t s_directive_calc_size(uint8_t a_type) +{ + uint32_t l_ret = sizeof(dap_chain_esbocs_directive_t); + switch (a_type) { + case DAP_CHAIN_ESBOCS_DIRECTIVE_KICK: + case DAP_CHAIN_ESBOCS_DIRECTIVE_LIFT: + l_ret += sizeof(dap_tsd_t) + sizeof(dap_chain_addr_t); + default:; + } + return l_ret; +} + +DAP_STATIC_INLINE char *s_get_penalty_group(dap_chain_net_id_t a_net_id) +{ + dap_chain_net_t *l_net = dap_chain_net_by_id(a_net_id); + return dap_strdup_printf(DAP_CHAIN_ESBOCS_GDB_GROUPS_PREFIX".%s.penalty", l_net->pub.gdb_groups_prefix); +} + static dap_chain_esbocs_session_t * s_session_items; static dap_timerfd_t *s_session_cs_timer = NULL; @@ -123,28 +145,22 @@ void dap_chain_cs_esbocs_deinit(void) static int s_callback_new(dap_chain_t *a_chain, dap_config_t *a_chain_cfg) { dap_chain_cs_blocks_new(a_chain, a_chain_cfg); + dap_chain_cs_blocks_t *l_blocks = DAP_CHAIN_CS_BLOCKS(a_chain); + dap_chain_esbocs_t *l_esbocs = DAP_NEW_Z(dap_chain_esbocs_t); + l_esbocs->blocks = l_blocks; + l_blocks->_inheritor = l_esbocs; l_blocks->callback_delete = s_callback_delete; l_blocks->callback_block_verify = s_callback_block_verify; l_blocks->callback_block_sign = s_callback_block_sign; - dap_chain_esbocs_t *l_esbocs = DAP_NEW_Z(dap_chain_esbocs_t); - l_esbocs->chain = a_chain; - l_esbocs->blocks = l_blocks; - dap_chain_esbocs_session_t *l_session = DAP_NEW_Z(dap_chain_esbocs_session_t); - l_session->chain = a_chain; - l_session->esbocs = l_esbocs; - - l_esbocs->chain->callback_set_min_validators_count = s_callback_set_min_validators_count; - - l_esbocs->session = l_session; - l_blocks->_inheritor = l_esbocs; - - l_esbocs->_pvt = DAP_NEW_Z(dap_chain_esbocs_pvt_t); - dap_chain_esbocs_pvt_t *l_esbocs_pvt = PVT(l_esbocs); + l_esbocs->chain = a_chain; + a_chain->callback_set_min_validators_count = s_callback_set_min_validators_count; a_chain->callback_get_minimum_fee = s_callback_get_minimum_fee; a_chain->callback_get_signing_certificate = s_callback_get_sign_key; + l_esbocs->_pvt = DAP_NEW_Z(dap_chain_esbocs_pvt_t); + dap_chain_esbocs_pvt_t *l_esbocs_pvt = PVT(l_esbocs); l_esbocs_pvt->debug = dap_config_get_item_bool_default(a_chain_cfg, "esbocs", "consensus_debug", false); l_esbocs_pvt->poa_mode = dap_config_get_item_bool_default(a_chain_cfg, "esbocs", "poa_mode", false); l_esbocs_pvt->round_start_sync_timeout = dap_config_get_item_uint16_default(a_chain_cfg, "esbocs", "round_start_sync_timeout", 15); @@ -230,6 +246,26 @@ static void s_new_atom_notifier(void *a_arg, UNUSED_ARG dap_chain_t *a_chain, UN pthread_mutex_unlock(&l_session->mutex); } +static void s_session_load_penaltys(dap_chain_esbocs_session_t *a_session) +{ + const char *l_penalty_group = s_get_penalty_group(a_session->chain->net_id); + size_t l_group_size = 0; + dap_global_db_obj_t *l_keys = dap_global_db_get_all_sync(l_penalty_group, &l_group_size); + for (size_t i = 0; i < l_group_size; i++) { + dap_chain_addr_t *l_addr = dap_chain_addr_from_str((l_keys + i)->key); + if (dap_chain_net_srv_stake_mark_validator_active(l_addr, false)) + dap_global_db_del(l_penalty_group, (l_keys + i)->key, NULL, NULL); + else { + dap_chain_esbocs_penalty_item_t *l_item = DAP_NEW_Z(dap_chain_esbocs_penalty_item_t); + l_item->signing_addr = *l_addr; + l_item->miss_count = DAP_CHAIN_ESBOCS_PENALTY_KICK; + HASH_ADD(hh, a_session->penalty, signing_addr, sizeof(dap_chain_addr_t), l_item); + } + DAP_DELETE(l_addr); + } + DAP_DELETE(l_penalty_group); +} + static int s_callback_created(dap_chain_t *a_chain, dap_config_t *a_chain_net_cfg) { dap_chain_cs_blocks_t *l_blocks = DAP_CHAIN_CS_BLOCKS(a_chain); @@ -310,9 +346,13 @@ static int s_callback_created(dap_chain_t *a_chain, dap_config_t *a_chain_net_cf } } - dap_chain_esbocs_session_t *l_session = l_esbocs->session; + dap_chain_esbocs_session_t *l_session = DAP_NEW_Z(dap_chain_esbocs_session_t); + l_session->chain = a_chain; + l_session->esbocs = l_esbocs; + l_esbocs->session = l_session; l_session->my_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net); l_session->my_signing_addr = l_my_signing_addr; + s_session_load_penaltys(l_session); pthread_mutexattr_t l_mutex_attr; pthread_mutexattr_init(&l_mutex_attr); pthread_mutexattr_settype(&l_mutex_attr, PTHREAD_MUTEX_RECURSIVE); @@ -358,11 +398,16 @@ static void s_callback_delete(dap_chain_cs_blocks_t *a_blocks) if (!s_session_items) dap_timerfd_delete_mt(s_session_cs_timer->worker, s_session_cs_timer->esocket_uuid); s_session_round_clear(l_session); - dap_chain_esbocs_sync_item_t *l_item, *l_tmp; - HASH_ITER(hh, l_session->sync_items, l_item, l_tmp) { - HASH_DEL(l_session->sync_items, l_item); - dap_list_free_full(l_item->messages, NULL); - DAP_DELETE(l_item); + dap_chain_esbocs_sync_item_t *l_sync_item, *l_sync_tmp; + HASH_ITER(hh, l_session->sync_items, l_sync_item, l_sync_tmp) { + HASH_DEL(l_session->sync_items, l_sync_item); + dap_list_free_full(l_sync_item->messages, NULL); + DAP_DELETE(l_sync_item); + } + dap_chain_esbocs_penalty_item_t *l_pen_item, *l_pen_tmp; + HASH_ITER(hh, l_session->penalty, l_pen_item, l_pen_tmp) { + HASH_DEL(l_session->penalty, l_pen_item); + DAP_DELETE(l_pen_item); } pthread_mutex_unlock(&l_session->mutex); DAP_DELETE(l_session); @@ -411,9 +456,9 @@ static dap_list_t *s_get_validators_list(dap_chain_esbocs_session_t *a_session, dap_list_t *l_ret = NULL; if (!l_esbocs_pvt->poa_mode) { - dap_list_t *l_validators = dap_chain_net_srv_stake_get_validators(a_session->chain->net_id); - a_session->cur_round.total_validators_count = dap_list_length(l_validators); - if (a_session->cur_round.total_validators_count < l_esbocs_pvt->min_validators_count) { + dap_list_t *l_validators = dap_chain_net_srv_stake_get_validators(a_session->chain->net_id, true); + uint16_t l_total_validators_count = dap_list_length(l_validators); + if (l_total_validators_count < l_esbocs_pvt->min_validators_count) { dap_list_free_full(l_validators, NULL); return NULL; } @@ -430,9 +475,10 @@ static dap_list_t *s_get_validators_list(dap_chain_esbocs_session_t *a_session, } } - //size_t n = (size_t)l_esbocs_pvt->min_validators_count * 3; - size_t l_consensus_optimum = (size_t)l_esbocs_pvt->min_validators_count * 2 - 1;//(n / 2) + (n % 2); - size_t l_need_vld_cnt = MIN(a_session->cur_round.total_validators_count, l_consensus_optimum); + a_session->cur_round.all_validators = dap_list_copy_deep(l_validators, s_callback_list_form, NULL); + + size_t l_consensus_optimum = (size_t)l_esbocs_pvt->min_validators_count * 2 - 1; + size_t l_need_vld_cnt = MIN(l_total_validators_count, l_consensus_optimum); dap_pseudo_random_seed(*(uint256_t *)&a_session->cur_round.last_block_hash); for (uint64_t i = 0; i < a_skip_count * l_need_vld_cnt; i++) @@ -534,13 +580,18 @@ static void s_session_send_startsync(dap_chain_esbocs_session_t *a_session) a_session->cur_round.sync_attempt, l_addr_list->str); dap_string_free(l_addr_list, true); } - dap_list_t *l_validators = dap_chain_net_srv_stake_get_validators(a_session->chain->net_id); - dap_list_t *l_send_list = dap_list_copy_deep(l_validators, s_callback_list_form, NULL); - dap_list_free_full(l_validators, NULL); - s_message_send(a_session, DAP_STREAM_CH_VOTING_MSG_TYPE_START_SYNC, &l_last_block_hash, + dap_list_t *l_inactive_validators = dap_chain_net_srv_stake_get_validators(a_session->chain->net_id, false); + dap_list_t *l_inactive_sendlist = dap_list_copy_deep(l_inactive_validators, s_callback_list_form, NULL); + dap_list_free_full(l_inactive_validators, NULL); + dap_list_t *l_total_sendlist = dap_list_concat(a_session->cur_round.all_validators, l_inactive_sendlist); + s_message_send(a_session, DAP_CHAIN_ESBOCS_MSG_TYPE_START_SYNC, &l_last_block_hash, &a_session->cur_round.sync_attempt, sizeof(uint64_t), - l_send_list); - dap_list_free_full(l_send_list, NULL); + l_total_sendlist); + if (l_inactive_sendlist && l_inactive_sendlist->prev) { // List splitting + l_inactive_sendlist->prev->next = NULL; + l_inactive_sendlist->prev = NULL; + } + dap_list_free_full(l_inactive_sendlist, NULL); a_session->cur_round.sync_sent = true; } @@ -554,6 +605,32 @@ static bool s_session_send_startsync_on_timer(void *a_arg) return false; } +static void s_session_update_penalty(dap_chain_esbocs_session_t *a_session) +{ + //size_t l_list_length = dap_list_length(a_session->cur_round.all_validators); + //if (a_session->cur_round.total_validators_synced * 3 < l_list_length * 2) + // return; // Not a valid round, less than 2/3 participants + for (dap_list_t *it = a_session->cur_round.all_validators; it; it = it->next) { + if (((dap_chain_esbocs_validator_t *)it->data)->is_synced) + continue; // Penalty for non synced participants only + dap_chain_esbocs_penalty_item_t *l_item = NULL; + dap_chain_addr_t *l_signing_addr = &((dap_chain_esbocs_validator_t *)it->data)->signing_addr; + HASH_FIND(hh, a_session->penalty, l_signing_addr, sizeof(*l_signing_addr), l_item); + if (!l_item) { + l_item = DAP_NEW_Z(dap_chain_esbocs_penalty_item_t); + l_item->signing_addr = *l_signing_addr; + HASH_ADD(hh, a_session->penalty, signing_addr, sizeof(*l_signing_addr), l_item); + } + if (PVT(a_session->esbocs)->debug) { + char *l_addr_str = dap_chain_addr_to_str(l_signing_addr); + log_it(L_DEBUG, "Increment miss count %d for addr %s. Miss count for kick is %d", + l_item->miss_count, l_addr_str, DAP_CHAIN_ESBOCS_PENALTY_KICK); + DAP_DELETE(l_addr_str); + } + l_item->miss_count++; + } +} + static void s_session_round_clear(dap_chain_esbocs_session_t *a_session) { dap_chain_esbocs_message_item_t *l_message_item, *l_message_tmp; @@ -569,6 +646,9 @@ static void s_session_round_clear(dap_chain_esbocs_session_t *a_session) DAP_DELETE(l_store_item); } dap_list_free_full(a_session->cur_round.validators_list, NULL); + dap_list_free_full(a_session->cur_round.all_validators, NULL); + + DAP_DEL_Z(a_session->cur_round.directive); a_session->cur_round = (dap_chain_esbocs_round_t){ .id = a_session->cur_round.id, @@ -580,6 +660,10 @@ static void s_session_round_clear(dap_chain_esbocs_session_t *a_session) static void s_session_round_new(dap_chain_esbocs_session_t *a_session) { + if (!a_session->round_fast_forward) { + s_session_update_penalty(a_session); + dap_stream_ch_voting_queue_clear(); + } s_session_round_clear(a_session); a_session->cur_round.id++; a_session->cur_round.sync_attempt++; @@ -590,7 +674,7 @@ static void s_session_round_new(dap_chain_esbocs_session_t *a_session) } a_session->state = DAP_CHAIN_ESBOCS_SESSION_STATE_WAIT_START; a_session->ts_round_sync_start = 0; - a_session->ts_attempt_start = 0; + a_session->ts_stage_entry = 0; dap_hash_fast_t l_last_block_hash; s_get_last_block_hash(a_session->chain, &l_last_block_hash); @@ -653,7 +737,7 @@ static void s_session_attempt_new(dap_chain_esbocs_session_t *a_session) dap_chain_esbocs_validator_t *l_validator = it->data; if (l_validator->is_synced && !l_validator->is_chosen) { // We have synced validator with no submitted candidate - debug_if(PVT(a_session->esbocs)->debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U". Attempt:%hu is started", + debug_if(PVT(a_session->esbocs)->debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U". Attempt:%hhu is started", a_session->chain->net_name, a_session->chain->name, a_session->cur_round.id, a_session->cur_round.attempt_num); s_session_state_change(a_session, DAP_CHAIN_ESBOCS_SESSION_STATE_WAIT_PROC, dap_time_now()); @@ -669,14 +753,15 @@ static void s_session_attempt_new(dap_chain_esbocs_session_t *a_session) static uint64_t s_session_calc_current_round_id(dap_chain_esbocs_session_t *a_session) { + uint16_t l_total_validators_count = dap_list_length(a_session->cur_round.all_validators); struct { uint64_t id; uint16_t counter; - } l_id_candidates[a_session->cur_round.total_validators_count]; + } l_id_candidates[l_total_validators_count]; uint16_t l_fill_idx = 0; dap_chain_esbocs_message_item_t *l_item, *l_tmp; HASH_ITER(hh, a_session->cur_round.message_items, l_item, l_tmp) { - if (l_item->message->hdr.type == DAP_STREAM_CH_VOTING_MSG_TYPE_START_SYNC && + if (l_item->message->hdr.type == DAP_CHAIN_ESBOCS_MSG_TYPE_START_SYNC && a_session->cur_round.sync_attempt == l_item->message->hdr.attempt_num) { uint64_t l_id_candidate = l_item->message->hdr.round_id; bool l_candidate_found = false; @@ -689,10 +774,10 @@ static uint64_t s_session_calc_current_round_id(dap_chain_esbocs_session_t *a_se if (!l_candidate_found) { l_id_candidates[l_fill_idx].id = l_id_candidate; l_id_candidates[l_fill_idx].counter = 1; - if (++l_fill_idx > a_session->cur_round.total_validators_count) { + if (++l_fill_idx > l_total_validators_count) { log_it(L_ERROR, "Count of sync messages with same sync attempt is greater" " than totel validators count %hu > %hu", - l_fill_idx, a_session->cur_round.total_validators_count); + l_fill_idx, l_total_validators_count); l_fill_idx--; break; } @@ -726,11 +811,53 @@ static int s_signs_sort_callback(const void *a_sign1, const void *a_sign2, UNUSE return l_ret; } +dap_chain_esbocs_directive_t *s_session_directive_ready(dap_chain_esbocs_session_t *a_session) +{ + bool l_kick = false; + dap_chain_esbocs_penalty_item_t *l_item, *l_tmp; + HASH_ITER(hh, a_session->penalty, l_item, l_tmp) { + int l_key_state = dap_chain_net_srv_stake_key_delegated(&l_item->signing_addr); + if (l_key_state == 0) { + HASH_DEL(a_session->penalty, l_item); + DAP_DELETE(l_item); + continue; + } + if (l_item->miss_count >= DAP_CHAIN_ESBOCS_PENALTY_KICK && l_key_state == 1) { + l_kick = true; + break; + } + if (l_item->miss_count == 0 && l_key_state == -1) + break; + } + if (!l_item) + return NULL; + uint32_t l_directive_size = s_directive_calc_size(l_kick ? DAP_CHAIN_ESBOCS_DIRECTIVE_KICK : DAP_CHAIN_ESBOCS_DIRECTIVE_LIFT); + dap_chain_esbocs_directive_t *l_ret = DAP_NEW_Z_SIZE(dap_chain_esbocs_directive_t, l_directive_size); + l_ret->version = DAP_CHAIN_ESBOCS_DIRECTIVE_VERSION; + l_ret->type = l_kick ? DAP_CHAIN_ESBOCS_DIRECTIVE_KICK : DAP_CHAIN_ESBOCS_DIRECTIVE_LIFT; + l_ret->size = l_directive_size; + l_ret->timestamp = dap_nanotime_now(); + dap_tsd_t *l_tsd = (dap_tsd_t *)l_ret->tsd; + l_tsd->type = DAP_CHAIN_ESBOCS_DIRECTIVE_TSD_TYPE_ADDR; + l_tsd->size = sizeof(dap_chain_addr_t); + *(dap_chain_addr_t *)l_tsd->data = l_item->signing_addr; + return l_ret; +} + static void s_session_state_change(dap_chain_esbocs_session_t *a_session, enum s_esbocs_session_state a_new_state, dap_time_t a_time) { + if (a_new_state != DAP_CHAIN_ESBOCS_SESSION_STATE_PREVIOUS) { + if (a_session->state == DAP_CHAIN_ESBOCS_SESSION_STATE_WAIT_VOTING) { + // Do not change this state, state changing will be applied after return to PREVIOUS state + a_session->old_state = a_new_state; + return; + } + a_session->old_state = a_session->state; + } a_session->state = a_new_state; - a_session->ts_attempt_start = a_time; - switch (a_session->state) { + a_session->ts_stage_entry = a_time; + + switch (a_new_state) { case DAP_CHAIN_ESBOCS_SESSION_STATE_WAIT_PROC: { dap_chain_esbocs_validator_t *l_validator = NULL; for (dap_list_t *it = a_session->cur_round.validators_list; it; it = it->next) { @@ -741,12 +868,29 @@ static void s_session_state_change(dap_chain_esbocs_session_t *a_session, enum s } } a_session->cur_round.attempt_submit_validator = l_validator->signing_addr; - if (dap_chain_addr_compare(&a_session->cur_round.attempt_submit_validator, &a_session->my_signing_addr)) - s_session_candidate_submit(a_session); - else { + if (dap_chain_addr_compare(&a_session->cur_round.attempt_submit_validator, &a_session->my_signing_addr)) { + dap_chain_esbocs_directive_t *l_directive = NULL; + if (!a_session->cur_round.directive) + l_directive = s_session_directive_ready(a_session); + if (l_directive) { + dap_hash_fast_t l_directive_hash; + dap_hash_fast(l_directive, l_directive->size, &l_directive_hash); + if (PVT(a_session->esbocs)->debug) { + char *l_candidate_hash_str = dap_chain_hash_fast_to_str_new(&l_directive_hash); + log_it(L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu. Put on the vote my directive:%s", + a_session->chain->net_name, a_session->chain->name, + a_session->cur_round.id, a_session->cur_round.attempt_num, l_candidate_hash_str); + DAP_DELETE(l_candidate_hash_str); + } + s_message_send(a_session, DAP_CHAIN_ESBOCS_MSG_TYPE_DIRECTIVE, &l_directive_hash, + l_directive, l_directive->size, a_session->cur_round.all_validators); + DAP_DELETE(l_directive); + } else + s_session_candidate_submit(a_session); + } else { dap_chain_esbocs_message_item_t *l_item, *l_tmp; HASH_ITER(hh, a_session->cur_round.message_items, l_item, l_tmp) { - if (l_item->message->hdr.type == DAP_STREAM_CH_VOTING_MSG_TYPE_SUBMIT && + if (l_item->message->hdr.type == DAP_CHAIN_ESBOCS_MSG_TYPE_SUBMIT && dap_chain_addr_compare(&l_item->signing_addr, &a_session->cur_round.attempt_submit_validator)) { dap_hash_fast_t *l_candidate_hash = &l_item->message->hdr.candidate_hash; if (dap_hash_fast_is_blank(l_candidate_hash)) @@ -798,17 +942,25 @@ static void s_session_state_change(dap_chain_esbocs_session_t *a_session, enum s // Process received earlier PreCommit messages dap_chain_esbocs_message_item_t *l_chain_message, *l_chain_message_tmp; HASH_ITER(hh, a_session->cur_round.message_items, l_chain_message, l_chain_message_tmp) { - if (l_chain_message->message->hdr.type == DAP_STREAM_CH_VOTING_MSG_TYPE_PRE_COMMIT && + if (l_chain_message->message->hdr.type == DAP_CHAIN_ESBOCS_MSG_TYPE_PRE_COMMIT && dap_hash_fast_compare(&l_chain_message->message->hdr.candidate_hash, &a_session->cur_round.attempt_candidate_hash)) { s_session_candidate_precommit(a_session, l_chain_message->message); } } // Send own PreCommit - s_message_send(a_session, DAP_STREAM_CH_VOTING_MSG_TYPE_PRE_COMMIT, &l_store->candidate_hash, + s_message_send(a_session, DAP_CHAIN_ESBOCS_MSG_TYPE_PRE_COMMIT, &l_store->candidate_hash, &l_store->precommit_candidate_hash, sizeof(dap_chain_hash_fast_t), a_session->cur_round.validators_list); } break; + case DAP_CHAIN_ESBOCS_SESSION_STATE_PREVIOUS: { + if (a_session->old_state != DAP_CHAIN_ESBOCS_SESSION_STATE_PREVIOUS) + s_session_state_change(a_session, a_session->old_state, a_time); + else { + log_it(L_ERROR, "No previous state registered, can't roll back"); + s_session_round_new(a_session); + } + } default: break; } @@ -827,17 +979,17 @@ static void s_session_proc_state(dap_chain_esbocs_session_t *a_session) dap_time_t l_round_timeout = PVT(a_session->esbocs)->round_start_sync_timeout; bool l_round_skip = !s_validator_check(&a_session->my_signing_addr, a_session->cur_round.validators_list); if (l_round_skip) - l_round_timeout += PVT(a_session->esbocs)->round_attempt_timeout * 4 * PVT(a_session->esbocs)->round_attempts_max; + l_round_timeout += PVT(a_session->esbocs)->round_attempt_timeout * 6 * PVT(a_session->esbocs)->round_attempts_max; if (a_session->ts_round_sync_start && l_time - a_session->ts_round_sync_start >= l_round_timeout) { if (a_session->cur_round.validators_synced_count >= PVT(a_session->esbocs)->min_validators_count && !l_round_skip) { a_session->cur_round.id = s_session_calc_current_round_id(a_session); - debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu." + debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu." " Minimum count of validators are synchronized, wait to submit candidate", a_session->chain->net_name, a_session->chain->name, a_session->cur_round.id, a_session->cur_round.attempt_num); s_session_state_change(a_session, DAP_CHAIN_ESBOCS_SESSION_STATE_WAIT_PROC, l_time); } else { // timeout start sync - debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu." + debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu." " Round finished by reason: %s", a_session->chain->net_name, a_session->chain->name, a_session->cur_round.id, a_session->cur_round.attempt_num, @@ -847,9 +999,9 @@ static void s_session_proc_state(dap_chain_esbocs_session_t *a_session) } } break; case DAP_CHAIN_ESBOCS_SESSION_STATE_WAIT_PROC: - if (l_time - a_session->ts_attempt_start >= PVT(a_session->esbocs)->round_attempt_timeout * l_listen_ensure) { + if (l_time - a_session->ts_stage_entry >= PVT(a_session->esbocs)->round_attempt_timeout * l_listen_ensure) { l_listen_ensure += 2; - debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu." + debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu." " Attempt finished by reason: haven't cantidate submitted", a_session->chain->net_name, a_session->chain->name, a_session->cur_round.id, a_session->cur_round.attempt_num); @@ -858,7 +1010,7 @@ static void s_session_proc_state(dap_chain_esbocs_session_t *a_session) break; case DAP_CHAIN_ESBOCS_SESSION_STATE_WAIT_SIGNS: l_listen_ensure = 1; - if (l_time - a_session->ts_attempt_start >= PVT(a_session->esbocs)->round_attempt_timeout) { + if (l_time - a_session->ts_stage_entry >= PVT(a_session->esbocs)->round_attempt_timeout) { dap_chain_esbocs_store_t *l_store; HASH_FIND(hh, a_session->cur_round.store_items, &a_session->cur_round.attempt_candidate_hash, sizeof(dap_hash_fast_t), l_store); if (!l_store) { @@ -869,8 +1021,8 @@ static void s_session_proc_state(dap_chain_esbocs_session_t *a_session) if (dap_list_length(l_store->candidate_signs) >= PVT(a_session->esbocs)->min_validators_count) { if(l_cs_debug) { char *l_candidate_hash_str = dap_chain_hash_fast_to_str_new(&a_session->cur_round.attempt_candidate_hash); - log_it(L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu" - " Candidate:%s collected sings of minimum number of validators, so to sent PRE_COMMIT", + log_it(L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu" + " Candidate %s collected sings of minimum number of validators, so to sent PRE_COMMIT", a_session->chain->net_name, a_session->chain->name, a_session->cur_round.id, a_session->cur_round.attempt_num, l_candidate_hash_str); DAP_DELETE(l_candidate_hash_str); @@ -878,7 +1030,7 @@ static void s_session_proc_state(dap_chain_esbocs_session_t *a_session) s_session_state_change(a_session, DAP_CHAIN_ESBOCS_SESSION_STATE_WAIT_FINISH, l_time); break; } - debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu." + debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu." " Attempt finished by reason: cant't collect minimum number of validator's signs", a_session->chain->net_name, a_session->chain->name, a_session->cur_round.id, a_session->cur_round.attempt_num); @@ -886,14 +1038,26 @@ static void s_session_proc_state(dap_chain_esbocs_session_t *a_session) } break; case DAP_CHAIN_ESBOCS_SESSION_STATE_WAIT_FINISH: - if (l_time - a_session->ts_attempt_start >= PVT(a_session->esbocs)->round_attempt_timeout * 2) { - debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu." + if (l_time - a_session->ts_stage_entry >= PVT(a_session->esbocs)->round_attempt_timeout * 2) { + debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu." " Attempt finished by reason: cant't collect minimum number of validator's precommits with same final hash", a_session->chain->net_name, a_session->chain->name, a_session->cur_round.id, a_session->cur_round.attempt_num); s_session_attempt_new(a_session); } break; + case DAP_CHAIN_ESBOCS_SESSION_STATE_WAIT_VOTING: + if (l_time - a_session->ts_stage_entry >= PVT(a_session->esbocs)->round_attempt_timeout * 2) { + const char *l_hash_str = dap_chain_hash_fast_to_str_new(&a_session->cur_round.directive_hash); + debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu." + " Voting finished by reason: cant't collect minimum number of validator's votes for directive %s", + a_session->chain->net_name, a_session->chain->name, + a_session->cur_round.id, a_session->cur_round.attempt_num, + l_hash_str); + DAP_DELETE(l_hash_str); + s_session_state_change(a_session, DAP_CHAIN_ESBOCS_SESSION_STATE_PREVIOUS, l_time); + } + break; default: break; } @@ -943,19 +1107,19 @@ static void s_session_candidate_submit(dap_chain_esbocs_session_t *a_session) dap_hash_fast(l_candidate, l_candidate_size, &l_candidate_hash); if (PVT(a_session->esbocs)->debug) { char *l_candidate_hash_str = dap_chain_hash_fast_to_str_new(&l_candidate_hash); - log_it(L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu. Submit my candidate:%s", + log_it(L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu. Submit my candidate %s", a_session->chain->net_name, a_session->chain->name, a_session->cur_round.id, a_session->cur_round.attempt_num, l_candidate_hash_str); DAP_DELETE(l_candidate_hash_str); } } else { // there is no my candidate, send null hash if (PVT(a_session->esbocs)->debug) - log_it(L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu." + log_it(L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu." " I don't have a candidate. I submit a null candidate.", a_session->chain->net_name, a_session->chain->name, a_session->cur_round.id, a_session->cur_round.attempt_num); } - s_message_send(a_session, DAP_STREAM_CH_VOTING_MSG_TYPE_SUBMIT, &l_candidate_hash, + s_message_send(a_session, DAP_CHAIN_ESBOCS_MSG_TYPE_SUBMIT, &l_candidate_hash, l_candidate, l_candidate_size, a_session->cur_round.validators_list); //Save candidate_hash memcpy(&(PVT(a_session->esbocs)->candidate_hash), &l_candidate_hash, sizeof(dap_hash_fast_t)); @@ -968,22 +1132,22 @@ static void s_session_candidate_verify(dap_chain_esbocs_session_t *a_session, da dap_chain_cs_blocks_t *l_blocks = DAP_CHAIN_CS_BLOCKS(a_session->chain); if (l_blocks->chain->callback_atom_verify(l_blocks->chain, a_candidate, a_candidate_size) == ATOM_ACCEPT) { // validation - OK, gen event Approve - s_message_send(a_session, DAP_STREAM_CH_VOTING_MSG_TYPE_APPROVE, a_candidate_hash, + s_message_send(a_session, DAP_CHAIN_ESBOCS_MSG_TYPE_APPROVE, a_candidate_hash, NULL, 0, a_session->cur_round.validators_list); if (PVT(a_session->esbocs)->debug) { char *l_candidate_hash_str = dap_chain_hash_fast_to_str_new(a_candidate_hash); - log_it(L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu Sent APPROVE candidate:%s", + log_it(L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu Sent APPROVE candidate %s", a_session->chain->net_name, a_session->chain->name, a_session->cur_round.id, a_session->cur_round.attempt_num, l_candidate_hash_str); DAP_DELETE(l_candidate_hash_str); } } else { // validation - fail, gen event Reject - s_message_send(a_session, DAP_STREAM_CH_VOTING_MSG_TYPE_REJECT, a_candidate_hash, + s_message_send(a_session, DAP_CHAIN_ESBOCS_MSG_TYPE_REJECT, a_candidate_hash, NULL, 0, a_session->cur_round.validators_list); if (PVT(a_session->esbocs)->debug) { char *l_candidate_hash_str = dap_chain_hash_fast_to_str_new(a_candidate_hash); - log_it(L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu Sent REJECT candidate:%s", + log_it(L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu Sent REJECT candidate %s", a_session->chain->net_name, a_session->chain->name, a_session->cur_round.id, a_session->cur_round.attempt_num, l_candidate_hash_str); DAP_DELETE(l_candidate_hash_str); @@ -1003,8 +1167,8 @@ static void s_session_candidate_precommit(dap_chain_esbocs_session_t *a_session, HASH_FIND(hh, a_session->cur_round.store_items, l_candidate_hash, sizeof(dap_chain_hash_fast_t), l_store); if (!l_store) { l_candidate_hash_str = dap_chain_hash_fast_to_str_new(l_candidate_hash); - log_it(L_WARNING, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu." - " Receive PRE_COMMIT message for unknown candidate:%s", + log_it(L_WARNING, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu." + " Receive PRE_COMMIT message for unknown candidate %s", a_session->chain->net_name, a_session->chain->name, a_session->cur_round.id, a_message->hdr.attempt_num, l_candidate_hash_str); @@ -1021,8 +1185,8 @@ static void s_session_candidate_precommit(dap_chain_esbocs_session_t *a_session, l_candidate_hash_str = dap_chain_hash_fast_to_str_new(l_candidate_hash); char *l_my_precommit_hash_str = dap_chain_hash_fast_to_str_new(&l_store->precommit_candidate_hash); char *l_remote_precommit_hash_str = dap_chain_hash_fast_to_str_new(l_precommit_hash); - log_it(L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu." - " Candidate:%s has different final hash of local and remote validators\n" + log_it(L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu." + " Candidate %s has different final hash of local and remote validators\n" "(%s and %s)", a_session->chain->net_name, a_session->chain->name, a_session->cur_round.id, a_message->hdr.attempt_num, l_candidate_hash_str, @@ -1036,16 +1200,16 @@ static void s_session_candidate_precommit(dap_chain_esbocs_session_t *a_session, if (l_cs_debug) { l_candidate_hash_str = dap_chain_hash_fast_to_str_new(l_candidate_hash); - log_it(L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu." - " Receive PRE_COMMIT: candidate:%s", + log_it(L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu." + " Receive PRE_COMMIT: candidate %s", a_session->chain->net_name, a_session->chain->name, a_session->cur_round.id, a_message->hdr.attempt_num, l_candidate_hash_str); } if (++l_store->precommit_count >= l_cs_level && !l_store->decide_commit && dap_hash_fast_compare(&a_session->cur_round.attempt_candidate_hash, l_candidate_hash)) { l_store->decide_commit = true; - debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu." - " Candidate:%s precommted by minimum number of validators, try to finish this round", + debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu." + " Candidate %s precommted by minimum number of validators, try to finish this round", a_session->chain->net_name, a_session->chain->name, a_session->cur_round.id, a_message->hdr.attempt_num, l_candidate_hash_str); s_session_round_finish(a_session, l_store); @@ -1101,9 +1265,9 @@ typedef struct fee_serv_param dap_chain_t * chain; }fee_serv_param_t; -static void s_check_db_callback_fee_collect (dap_global_db_context_t *a_global_db_context, - int a_rc, const char *a_group, - const size_t a_values_total, const size_t a_values_count, +static void s_check_db_callback_fee_collect (UNUSED_ARG dap_global_db_context_t *a_global_db_context, + UNUSED_ARG int a_rc, UNUSED_ARG const char *a_group, + UNUSED_ARG const size_t a_values_total, const size_t a_values_count, dap_global_db_obj_t *a_values, void *a_arg) { int res = 0; @@ -1139,13 +1303,13 @@ static void s_check_db_callback_fee_collect (dap_global_db_context_t *a_global_d if(l_hash_tx) { log_it(L_NOTICE, "Fee collect transaction successfully created, hash=%s\n",l_hash_tx); - dap_global_db_del(block_fee_group, NULL, NULL, NULL); + dap_global_db_del(s_block_fee_group, NULL, NULL, NULL); DAP_DELETE(l_hash_tx); } } else { - res = dap_global_db_set(block_fee_group,l_block_cache->block_hash_str,&l_value_out_block,sizeof(uint256_t),false,NULL,NULL); + res = dap_global_db_set(s_block_fee_group,l_block_cache->block_hash_str,&l_value_out_block,sizeof(uint256_t),false,NULL,NULL); if(res) log_it(L_WARNING, "Unable to write data to database"); else @@ -1169,14 +1333,14 @@ static void s_check_db_callback_fee_collect (dap_global_db_context_t *a_global_d l_block_list, l_arg->value_fee, "hex"); if(l_hash_tx) { - dap_global_db_del(block_fee_group, NULL, NULL, NULL); + dap_global_db_del(s_block_fee_group, NULL, NULL, NULL); log_it(L_NOTICE, "Fee collect transaction successfully created, hash=%s\n",l_hash_tx); DAP_DELETE(l_hash_tx); } } else { - res = dap_global_db_set(block_fee_group,l_block_cache->block_hash_str,&l_value_out_block,sizeof(uint256_t),false,NULL,NULL); + res = dap_global_db_set(s_block_fee_group,l_block_cache->block_hash_str,&l_value_out_block,sizeof(uint256_t),false,NULL,NULL); if(res) log_it(L_WARNING, "Unable to write data to database"); else @@ -1237,7 +1401,7 @@ static void s_session_round_finish(dap_chain_esbocs_session_t *a_session, dap_ch if (l_cs_debug) { char *l_finish_candidate_hash_str = dap_chain_hash_fast_to_str_new(&l_store->candidate_hash); char *l_finish_block_hash_str = dap_chain_hash_fast_to_str_new(&l_store->precommit_candidate_hash); - log_it(L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu Candidate:%s passed the consensus!\n" + log_it(L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu Candidate %s passed the consensus!\n" "Move block %s to chains", a_session->chain->net_name, a_session->chain->name, a_session->cur_round.id, a_session->cur_round.attempt_num, l_finish_candidate_hash_str, l_finish_block_hash_str); @@ -1260,7 +1424,7 @@ static void s_session_round_finish(dap_chain_esbocs_session_t *a_session, dap_ch tmp->fee_need_cfg = PVT(a_session->esbocs)->fee_coll_set; tmp->key_from = a_session->blocks_sign_key; - dap_global_db_get_all(block_fee_group,0,s_check_db_callback_fee_collect,tmp); + dap_global_db_get_all(s_block_fee_group,0,s_check_db_callback_fee_collect,tmp); } } @@ -1276,6 +1440,170 @@ void s_session_sync_queue_add(dap_chain_esbocs_session_t *a_session, dap_chain_e l_sync_item->messages = dap_list_append(l_sync_item->messages, DAP_DUP_SIZE(a_message, a_message_size)); } +void s_session_validator_mark_online(dap_chain_esbocs_session_t *a_session, dap_chain_addr_t *a_signing_addr) +{ + bool l_in_list = false; + dap_list_t *l_list = s_validator_check(a_signing_addr, a_session->cur_round.all_validators); + if (l_list) { + ((dap_chain_esbocs_validator_t *)l_list->data)->is_synced = true; + a_session->cur_round.total_validators_synced++; + l_in_list = true; + } + dap_chain_esbocs_penalty_item_t *l_item = NULL; + HASH_FIND(hh, a_session->penalty, a_signing_addr, sizeof(*a_signing_addr), l_item); + if (!l_in_list) { + if (!l_item) { + log_it(L_ERROR, "Got sync message from validator not in active list nor in penalty list"); + l_item = DAP_NEW_Z(dap_chain_esbocs_penalty_item_t); + l_item->signing_addr = *a_signing_addr; + l_item->miss_count = DAP_CHAIN_ESBOCS_PENALTY_KICK; + HASH_ADD(hh, a_session->penalty, signing_addr, sizeof(*a_signing_addr), l_item); + return; + } + if (l_item->miss_count > DAP_CHAIN_ESBOCS_PENALTY_KICK) + l_item->miss_count = DAP_CHAIN_ESBOCS_PENALTY_KICK; + } + if (l_item) { + if (PVT(a_session->esbocs)->debug) { + const char *l_addr_str = dap_chain_addr_to_str(a_signing_addr); + log_it(L_DEBUG, "Decrement miss count %d for addr %s. Miss count for kick is %d", + l_item->miss_count, l_addr_str, DAP_CHAIN_ESBOCS_PENALTY_KICK); + DAP_DELETE(l_addr_str); + } + if (l_item->miss_count) + l_item->miss_count--; + if (l_in_list && !l_item->miss_count) { + HASH_DEL(a_session->penalty, l_item); + DAP_DELETE(l_item); + } + } +} + +static void s_session_directive_process(dap_chain_esbocs_session_t *a_session, dap_chain_esbocs_directive_t *a_directive, dap_chain_hash_fast_t *a_directive_hash) +{ + if (a_directive->size != s_directive_calc_size(a_directive->type)) { + log_it(L_ERROR, "Invalid directive size %u (expected %u)", + a_directive->size, s_directive_calc_size(a_directive->type)); + return; + } + bool l_vote_for = false; + switch (a_directive->type) { + case DAP_CHAIN_ESBOCS_DIRECTIVE_KICK: + case DAP_CHAIN_ESBOCS_DIRECTIVE_LIFT: { + dap_tsd_t *l_tsd = (dap_tsd_t *)a_directive->tsd; + if (l_tsd->size != sizeof(dap_chain_addr_t)) { + log_it(L_ERROR, "Invalid directive TSD size %u (expected %zu)", + l_tsd->size, sizeof(dap_chain_addr_t)); + return; + } + dap_chain_addr_t *l_voting_addr = (dap_chain_addr_t *)l_tsd->data; + if (l_voting_addr->net_id.uint64 != a_session->chain->net_id.uint64) { + log_it(L_WARNING, "Got directive to %s for invalid network id 0x%"DAP_UINT64_FORMAT_x + " (current network id is 0x%"DAP_UINT64_FORMAT_x, + a_directive->type == DAP_CHAIN_ESBOCS_DIRECTIVE_KICK ? "KICK" : "LIFT", + l_voting_addr->net_id.uint64, a_session->chain->net_id.uint64); + return; + } + int l_status = dap_chain_net_srv_stake_key_delegated(l_voting_addr); + if (l_status == 0) { + const char *l_addr_str = dap_chain_addr_to_str(l_voting_addr); + log_it(L_WARNING, "Trying to put to the vote directive type %s for non delegated key %s", + a_directive->type == DAP_CHAIN_ESBOCS_DIRECTIVE_KICK ? "KICK" : "LIFT", + l_addr_str); + DAP_DELETE(l_addr_str); + return; + } + dap_chain_esbocs_penalty_item_t *l_item = NULL; + HASH_FIND(hh, a_session->penalty, l_voting_addr, sizeof(*l_voting_addr), l_item); + if (l_status == 1) { // Key is active + if (a_directive->type == DAP_CHAIN_ESBOCS_DIRECTIVE_KICK) { + if (l_item && l_item->miss_count >= DAP_CHAIN_ESBOCS_PENALTY_KICK) + l_vote_for = true; + } else { // a_directive->type == DAP_CHAIN_ESBOCS_DIRECTIVE_LIFT + if (!l_item || l_item->miss_count < DAP_CHAIN_ESBOCS_PENALTY_KICK) + l_vote_for = true; + } + } else { // l_status == -1 // Key is inactive + if (a_directive->type == DAP_CHAIN_ESBOCS_DIRECTIVE_LIFT) { + if (l_item && l_item->miss_count == 0) + l_vote_for = true; + } else { // a_directive->type == DAP_CHAIN_ESBOCS_DIRECTIVE_KICK + if (!l_item || l_item->miss_count != 0) + l_vote_for = true; + } + } + } + default:; + } + + if (PVT(a_session->esbocs)->debug) { + char *l_directive_hash_str = dap_chain_hash_fast_to_str_new(a_directive_hash); + log_it(L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu Send VOTE %s directive %s", + a_session->chain->net_name, a_session->chain->name, a_session->cur_round.id, + a_session->cur_round.attempt_num, l_vote_for ? "FOR" : "AGAINST", + l_directive_hash_str); + DAP_DELETE(l_directive_hash_str); + } + uint8_t l_type = l_vote_for ? DAP_CHAIN_ESBOCS_MSG_TYPE_VOTE_FOR : DAP_CHAIN_ESBOCS_MSG_TYPE_VOTE_AGAINST; + s_message_send(a_session, l_type, a_directive_hash, NULL, 0, a_session->cur_round.all_validators); + + a_session->cur_round.directive_hash = *a_directive_hash; + a_session->cur_round.directive = DAP_DUP_SIZE(a_directive, a_directive->size); + + s_session_state_change(a_session, DAP_CHAIN_ESBOCS_SESSION_STATE_WAIT_VOTING, dap_time_now()); +} + +static int s_session_directive_apply(dap_chain_esbocs_directive_t *a_directive, dap_hash_fast_t *a_directive_hash) +{ + if (!a_directive) { + log_it(L_ERROR, "Can't apply NULL directive"); + return -1; + } + switch (a_directive->type) { + case DAP_CHAIN_ESBOCS_DIRECTIVE_KICK: + case DAP_CHAIN_ESBOCS_DIRECTIVE_LIFT: { + dap_chain_addr_t *l_key_addr = (dap_chain_addr_t *)(((dap_tsd_t *)a_directive->tsd)->data); + int l_status = dap_chain_net_srv_stake_key_delegated(l_key_addr); + if (l_status == 0) { + const char *l_key_str = dap_chain_addr_to_str(l_key_addr); + log_it(L_WARNING, "Invalid key %s with directive type %s applying", + l_key_str, a_directive->type == DAP_CHAIN_ESBOCS_DIRECTIVE_KICK ? + "KICK" : "LIFT"); + return -3; + } + const char *l_key_str = dap_chain_addr_to_str(l_key_addr); + const char *l_penalty_group = s_get_penalty_group(l_key_addr->net_id); + const char *l_directive_hash_str = dap_chain_hash_fast_to_str_new(a_directive_hash); + const char *l_key_hash_str = dap_chain_hash_fast_to_str_new(&l_key_addr->data.hash_fast); + if (l_status == 1 && a_directive->type == DAP_CHAIN_ESBOCS_DIRECTIVE_KICK) { + dap_chain_net_srv_stake_mark_validator_active(l_key_addr, false); + dap_global_db_set(l_penalty_group, l_key_str, NULL, 0, false, NULL, 0); + log_it(L_MSG, "Applied %s directive to exclude validator %s with pkey hash %s from consensus", + l_directive_hash_str, l_key_str, l_key_hash_str); + } else if (l_status == -1 && a_directive->type == DAP_CHAIN_ESBOCS_DIRECTIVE_LIFT) { + dap_chain_net_srv_stake_mark_validator_active(l_key_addr, true); + dap_global_db_del(l_penalty_group, l_key_str, NULL, 0); + log_it(L_MSG, "Applied %s directive to include validator %s with pkey hash %s in consensus", + l_directive_hash_str, l_key_str, l_key_hash_str); + } else { + log_it(L_MSG, "No need to apply directive %s. Validator %s with pkey hash %s already %s consensus", + l_directive_hash_str, l_key_str, l_key_hash_str, + a_directive->type == DAP_CHAIN_ESBOCS_DIRECTIVE_KICK ? + "excluded from" : "included in"); + } + DAP_DELETE(l_key_str); + DAP_DELETE(l_penalty_group); + DAP_DELETE(l_directive_hash_str); + DAP_DELETE(l_key_hash_str); + break; + } + default: + log_it(L_ERROR, "Unknown directive type %hu to apply", a_directive->type); + return -2; + } + return 0; +} + /** * @brief s_session_packet_in * @param a_arg @@ -1299,7 +1627,7 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod if (a_sender_node_addr) { //Process network message pthread_mutex_lock(&l_session->mutex); - debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu." + debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu." " Receive pkt type:0x%x from addr:"NODE_ADDR_FP_STR", my_addr:"NODE_ADDR_FP_STR"", l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id, l_session->cur_round.attempt_num, l_message->hdr.type, @@ -1329,7 +1657,7 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod dap_chain_hash_fast_t l_data_hash = {}; dap_hash_fast(l_message, a_data_size, &l_data_hash); if (!dap_hash_fast_compare(a_data_hash, &l_data_hash)) { - debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu." + debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu." " Message rejected: message hash does not match", l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id, l_session->cur_round.attempt_num); @@ -1338,7 +1666,7 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod l_message->hdr.sign_size = 0; // restore header on signing time if (dap_sign_verify_all(l_sign, l_sign_size, l_message, l_message_data_size + sizeof(l_message->hdr))) { - debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu." + debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu." " Message rejected from addr:"NODE_ADDR_FP_STR" not passed verification", l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id, l_session->cur_round.attempt_num, NODE_ADDR_FP_ARGS(a_sender_node_addr)); @@ -1347,7 +1675,7 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod l_message->hdr.sign_size = l_sign_size; // restore original header // consensus round start sync - if (l_message->hdr.type == DAP_STREAM_CH_VOTING_MSG_TYPE_START_SYNC) { + if (l_message->hdr.type == DAP_CHAIN_ESBOCS_MSG_TYPE_START_SYNC) { if (!dap_hash_fast_compare(&l_message->hdr.candidate_hash, &l_session->cur_round.last_block_hash)) { debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U"." " Sync message with different last block hash was added to the queue", @@ -1359,7 +1687,7 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod } else if (l_message->hdr.round_id != l_session->cur_round.id || l_message->hdr.attempt_num < l_session->cur_round.attempt_num) { // round check - debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu." + debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu." " Message rejected: round or attempt in message does not match", l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id, l_session->cur_round.attempt_num); @@ -1372,11 +1700,11 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod dap_chain_addr_fill_from_sign(&l_signing_addr, l_sign, l_session->chain->net_id); if (l_cs_debug) l_validator_addr_str = dap_chain_addr_to_str(&l_signing_addr); - if ((l_message->hdr.type != DAP_STREAM_CH_VOTING_MSG_TYPE_START_SYNC && // Accept only current round synced validators + if ((l_message->hdr.type != DAP_CHAIN_ESBOCS_MSG_TYPE_START_SYNC && // Accept only current round synced validators !s_validator_check_synced(&l_signing_addr, l_session->cur_round.validators_list)) || - (l_message->hdr.type == DAP_STREAM_CH_VOTING_MSG_TYPE_START_SYNC && // Accept all validators + (l_message->hdr.type == DAP_CHAIN_ESBOCS_MSG_TYPE_START_SYNC && // Accept all validators !dap_chain_net_srv_stake_key_delegated(&l_signing_addr))) { - debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu." + debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu." " Message rejected: validator addr:%s not in the current validators list", l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id, l_message->hdr.attempt_num, l_validator_addr_str); @@ -1389,7 +1717,7 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod dap_chain_esbocs_message_item_t *l_message_item_temp = NULL; HASH_FIND(hh, l_round->message_items, a_data_hash, sizeof(dap_chain_hash_fast_t), l_message_item_temp); if (l_message_item_temp) { - debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu." + debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu." " Message rejected: message hash is exists in chain (duplicate)", l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id, l_message->hdr.attempt_num); @@ -1400,15 +1728,15 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod dap_chain_esbocs_message_item_t *l_chain_message, *l_chain_message_tmp; HASH_ITER(hh, l_round->message_items, l_chain_message, l_chain_message_tmp) { bool l_same_type = l_chain_message->message->hdr.type == l_message->hdr.type || - (l_chain_message->message->hdr.type == DAP_STREAM_CH_VOTING_MSG_TYPE_APPROVE && - l_message->hdr.type == DAP_STREAM_CH_VOTING_MSG_TYPE_REJECT) || - (l_chain_message->message->hdr.type == DAP_STREAM_CH_VOTING_MSG_TYPE_REJECT && - l_message->hdr.type == DAP_STREAM_CH_VOTING_MSG_TYPE_APPROVE); + (l_chain_message->message->hdr.type == DAP_CHAIN_ESBOCS_MSG_TYPE_APPROVE && + l_message->hdr.type == DAP_CHAIN_ESBOCS_MSG_TYPE_REJECT) || + (l_chain_message->message->hdr.type == DAP_CHAIN_ESBOCS_MSG_TYPE_REJECT && + l_message->hdr.type == DAP_CHAIN_ESBOCS_MSG_TYPE_APPROVE); if (l_same_type && dap_chain_addr_compare(&l_chain_message->signing_addr, &l_signing_addr) && dap_hash_fast_compare(&l_chain_message->message->hdr.candidate_hash, &l_message->hdr.candidate_hash)) { - if (l_message->hdr.type != DAP_STREAM_CH_VOTING_MSG_TYPE_START_SYNC || // Not sync or same sync attempt + if (l_message->hdr.type != DAP_CHAIN_ESBOCS_MSG_TYPE_START_SYNC || // Not sync or same sync attempt *(uint64_t *)l_message_data == *(uint64_t *)l_chain_message->message->msg_n_sign) { - debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu." + debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu." " Message rejected: duplicate message %s", l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id, l_message->hdr.attempt_num, @@ -1421,7 +1749,7 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod s_message_chain_add(l_session, l_message, a_data_size, a_data_hash, &l_signing_addr); switch (l_message->hdr.type) { - case DAP_STREAM_CH_VOTING_MSG_TYPE_START_SYNC: { + case DAP_CHAIN_ESBOCS_MSG_TYPE_START_SYNC: { if (l_message_data_size != sizeof(uint64_t)) { log_it(L_WARNING, "Invalid START_SYNC message size"); break; @@ -1464,6 +1792,7 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod } else // Send it immediatly, if was not sent yet s_session_send_startsync(l_session); + s_session_validator_mark_online(l_session, &l_signing_addr); dap_list_t *l_list = s_validator_check(&l_signing_addr, l_session->cur_round.validators_list); if (!l_list) break; @@ -1471,7 +1800,7 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod l_validator->is_synced = true; if (++l_session->cur_round.validators_synced_count == dap_list_length(l_session->cur_round.validators_list)) { l_session->cur_round.id = s_session_calc_current_round_id(l_session); - debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu." + debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu." " All validators are synchronized, wait to submit candidate", l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id, l_message->hdr.attempt_num); @@ -1479,11 +1808,11 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod } } break; - case DAP_STREAM_CH_VOTING_MSG_TYPE_SUBMIT: { + case DAP_CHAIN_ESBOCS_MSG_TYPE_SUBMIT: { uint8_t *l_candidate = l_message->msg_n_sign; size_t l_candidate_size = l_message_data_size; if (!l_candidate_size || dap_hash_fast_is_blank(&l_message->hdr.candidate_hash)) { - debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu." + debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu." " Receive SUBMIT candidate NULL", l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id, l_message->hdr.attempt_num); @@ -1495,7 +1824,7 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod dap_chain_hash_fast_t l_check_hash; dap_hash_fast(l_candidate, l_candidate_size, &l_check_hash); if (!dap_hash_fast_compare(&l_check_hash, l_candidate_hash)) { - debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu." + debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu." " Receive SUBMIT candidate hash broken", l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id, l_message->hdr.attempt_num); @@ -1504,7 +1833,7 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod if (l_cs_debug) { char *l_candidate_hash_str = dap_chain_hash_fast_to_str_new(l_candidate_hash); - log_it(L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu." + log_it(L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu." " Receive SUBMIT candidate %s, size %zu", l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id, l_message->hdr.attempt_num, l_candidate_hash_str, l_candidate_size); @@ -1515,7 +1844,7 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod HASH_FIND(hh, l_session->cur_round.store_items, l_candidate_hash, sizeof(dap_chain_hash_fast_t), l_store); if (l_store) { char *l_candidate_hash_str = dap_chain_hash_fast_to_str_new(l_candidate_hash); - log_it(L_WARNING, "Duplicate candidate:%s", l_candidate_hash_str); + log_it(L_WARNING, "Duplicate candidate: %s", l_candidate_hash_str); DAP_DELETE(l_candidate_hash_str); break; } @@ -1536,14 +1865,14 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod } } break; - case DAP_STREAM_CH_VOTING_MSG_TYPE_REJECT: { + case DAP_CHAIN_ESBOCS_MSG_TYPE_REJECT: { dap_chain_esbocs_store_t *l_store; char *l_candidate_hash_str = NULL; HASH_FIND(hh, l_session->cur_round.store_items, l_candidate_hash, sizeof(dap_chain_hash_fast_t), l_store); if (!l_store) { l_candidate_hash_str = dap_chain_hash_fast_to_str_new(l_candidate_hash); - log_it(L_WARNING, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu." - " Receive REJECT message for unknown candidate:%s", + log_it(L_WARNING, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu." + " Receive REJECT message for unknown candidate %s", l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id, l_message->hdr.attempt_num, l_candidate_hash_str); @@ -1553,16 +1882,16 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod if (l_cs_debug) { l_candidate_hash_str = dap_chain_hash_fast_to_str_new(l_candidate_hash); - log_it(L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu." - " Receive REJECT: candidate:%s", + log_it(L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu." + " Receive REJECT: candidate %s", l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id, l_message->hdr.attempt_num, l_candidate_hash_str); } if (++l_store->reject_count >= l_cs_level && !l_store->decide_reject && dap_hash_fast_compare(&l_session->cur_round.attempt_candidate_hash, l_candidate_hash)) { l_store->decide_reject = true; - debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu." - " Candidate:%s rejected by minimum number of validators, attempt failed", + debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu." + " Candidate %s rejected by minimum number of validators, attempt failed", l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id, l_message->hdr.attempt_num, l_candidate_hash_str); s_session_attempt_new(l_session); @@ -1570,14 +1899,14 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod DAP_DEL_Z(l_candidate_hash_str); } break; - case DAP_STREAM_CH_VOTING_MSG_TYPE_APPROVE: { + case DAP_CHAIN_ESBOCS_MSG_TYPE_APPROVE: { dap_chain_esbocs_store_t *l_store; char *l_candidate_hash_str = NULL; HASH_FIND(hh, l_session->cur_round.store_items, l_candidate_hash, sizeof(dap_chain_hash_fast_t), l_store); if (!l_store) { l_candidate_hash_str = dap_chain_hash_fast_to_str_new(l_candidate_hash); - log_it(L_WARNING, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu." - " Receive APPROVE message for unknown candidate:%s", + log_it(L_WARNING, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu." + " Receive APPROVE message for unknown candidate %s", l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id, l_message->hdr.attempt_num, l_candidate_hash_str); @@ -1587,30 +1916,30 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod if (l_cs_debug) { l_candidate_hash_str = dap_chain_hash_fast_to_str_new(l_candidate_hash); - log_it(L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu." - " Receive APPROVE: candidate:%s", + log_it(L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu." + " Receive APPROVE: candidate %s", l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id, l_message->hdr.attempt_num, l_candidate_hash_str); } if (++l_store->approve_count >= l_cs_level && !l_store->decide_approve && dap_hash_fast_compare(&l_session->cur_round.attempt_candidate_hash, l_candidate_hash)) { l_store->decide_approve = true; - debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu." - " Candidate:%s approved by minimum number of validators, let's sign it", + debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu." + " Candidate %s approved by minimum number of validators, let's sign it", l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id, l_message->hdr.attempt_num, l_candidate_hash_str); size_t l_offset = dap_chain_block_get_sign_offset(l_store->candidate, l_store->candidate_size); dap_sign_t *l_candidate_sign = dap_sign_create(PVT(l_session->esbocs)->blocks_sign_key, l_store->candidate, l_offset + sizeof(l_store->candidate->hdr), 0); size_t l_candidate_sign_size = dap_sign_get_size(l_candidate_sign); - s_message_send(l_session, DAP_STREAM_CH_VOTING_MSG_TYPE_COMMIT_SIGN, l_candidate_hash, + s_message_send(l_session, DAP_CHAIN_ESBOCS_MSG_TYPE_COMMIT_SIGN, l_candidate_hash, l_candidate_sign, l_candidate_sign_size, l_session->cur_round.validators_list); DAP_DELETE(l_candidate_sign); } DAP_DEL_Z(l_candidate_hash_str); } break; - case DAP_STREAM_CH_VOTING_MSG_TYPE_COMMIT_SIGN: { + case DAP_CHAIN_ESBOCS_MSG_TYPE_COMMIT_SIGN: { if (l_message_data_size < sizeof(dap_sign_t)) { log_it(L_WARNING, "Wrong commit_sign message size, have %zu bytes for candidate sign section" " when requires at least %zu bytes", @@ -1631,8 +1960,8 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod HASH_FIND(hh, l_session->cur_round.store_items, l_candidate_hash, sizeof(dap_chain_hash_fast_t), l_store); if (!l_store) { l_candidate_hash_str = dap_chain_hash_fast_to_str_new(l_candidate_hash); - log_it(L_WARNING, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu." - " Receive COMMIT_SIGN message for unknown candidate:%s", + log_it(L_WARNING, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu." + " Receive COMMIT_SIGN message for unknown candidate %s", l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id, l_message->hdr.attempt_num, l_candidate_hash_str); @@ -1642,8 +1971,8 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod if (l_cs_debug) { l_candidate_hash_str = dap_chain_hash_fast_to_str_new(l_candidate_hash); - log_it(L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu." - " Receive COMMIT_SIGN: candidate:%s", + log_it(L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu." + " Receive COMMIT_SIGN: candidate %s", l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id, l_message->hdr.attempt_num, l_candidate_hash_str); } @@ -1657,8 +1986,8 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod DAP_DUP_SIZE(l_candidate_sign, l_candidate_sign_size)); if (dap_list_length(l_store->candidate_signs) == l_round->validators_synced_count) { if (PVT(l_session->esbocs)->debug) - log_it(L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu." - " Candidate:%s collected signs of all synced validators", + log_it(L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu." + " Candidate %s collected signs of all synced validators", l_session->chain->net_name, l_session->chain->name, l_round->id, l_message->hdr.attempt_num, l_candidate_hash_str); s_session_state_change(l_session, DAP_CHAIN_ESBOCS_SESSION_STATE_WAIT_FINISH, dap_time_now()); @@ -1666,12 +1995,89 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod } else { if (!l_candidate_hash_str) l_candidate_hash_str = dap_chain_hash_fast_to_str_new(l_candidate_hash); - log_it(L_WARNING, "Candidate:%s sign is incorrect: code %d", l_candidate_hash_str, l_sign_verified); + log_it(L_WARNING, "Candidate: %s sign is incorrect: code %d", l_candidate_hash_str, l_sign_verified); } DAP_DEL_Z(l_candidate_hash_str); } break; - case DAP_STREAM_CH_VOTING_MSG_TYPE_PRE_COMMIT: + case DAP_CHAIN_ESBOCS_MSG_TYPE_DIRECTIVE: { + if (l_session->cur_round.directive) { + log_it(L_WARNING, "Only one directive can be processed at a time"); + break; + } + dap_chain_esbocs_directive_t *l_directive = (dap_chain_esbocs_directive_t *)l_message->msg_n_sign; + size_t l_directive_size = l_message_data_size; + if (l_directive_size < sizeof(dap_chain_esbocs_directive_t) || l_directive_size != l_directive->size) { + log_it(L_WARNING, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu." + " Receive DIRECTIVE with invalid size %zu)", + l_session->chain->net_name, l_session->chain->name, + l_session->cur_round.id, l_message->hdr.attempt_num, + l_directive_size); + break; + } + // check directive hash + dap_chain_hash_fast_t l_directive_hash; + dap_hash_fast(l_directive, l_directive_size, &l_directive_hash); + if (!dap_hash_fast_compare(&l_directive_hash, l_candidate_hash)) { + debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu." + " Receive DIRECTIVE hash broken", + l_session->chain->net_name, l_session->chain->name, + l_session->cur_round.id, l_message->hdr.attempt_num); + break; + } + if (l_cs_debug) { + char *l_dirtective_hash_str = dap_chain_hash_fast_to_str_new(l_candidate_hash); + log_it(L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu." + " Receive DIRECTIVE hash %s, size %zu", + l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id, + l_message->hdr.attempt_num, l_dirtective_hash_str, l_directive_size); + DAP_DELETE(l_dirtective_hash_str); + } + s_session_directive_process(l_session, l_directive, &l_directive_hash); + } break; + + case DAP_CHAIN_ESBOCS_MSG_TYPE_VOTE_FOR: + case DAP_CHAIN_ESBOCS_MSG_TYPE_VOTE_AGAINST: { + if (dap_hash_fast_is_blank(l_candidate_hash)) { + log_it(L_WARNING, "Receive VOTE %s for empty directive", + l_message->hdr.type == DAP_CHAIN_ESBOCS_MSG_TYPE_VOTE_FOR ? + "FOR" : "AGAINST"); + break; + } + if (!dap_hash_fast_compare(&l_session->cur_round.directive_hash, l_candidate_hash)) { + debug_if(l_cs_debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu." + "Received VOTE %s unknown directive", + l_session->chain->net_name, l_session->chain->name, + l_session->cur_round.id, l_message->hdr.attempt_num, + l_message->hdr.type == DAP_CHAIN_ESBOCS_MSG_TYPE_VOTE_FOR ? + "FOR" : "AGAINST"); + break; + } + if (l_cs_debug) { + char *l_directive_hash_str = dap_chain_hash_fast_to_str_new(l_candidate_hash); + log_it(L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu." + " Receive VOTE %s directive %s", + l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id, + l_message->hdr.attempt_num, l_message->hdr.type == DAP_CHAIN_ESBOCS_MSG_TYPE_VOTE_FOR ? + "FOR" : "AGAINST", l_directive_hash_str); + DAP_DELETE(l_directive_hash_str); + } + if (l_message->hdr.type == DAP_CHAIN_ESBOCS_MSG_TYPE_VOTE_FOR) { + if (!l_session->cur_round.directive_applied && + ++l_session->cur_round.votes_for_count * 3 >= + dap_list_length(l_session->cur_round.all_validators) * 2) { + s_session_directive_apply(l_session->cur_round.directive, &l_session->cur_round.directive_hash); + l_session->cur_round.directive_applied = true; + s_session_state_change(l_session, DAP_CHAIN_ESBOCS_SESSION_STATE_PREVIOUS, dap_time_now()); + } + } else // l_message->hdr.type == DAP_CHAIN_ESBOCS_MSG_TYPE_VOTE_AGAINST + if (++l_session->cur_round.votes_against_count * 3 >= + dap_list_length(l_session->cur_round.all_validators) * 2) + s_session_state_change(l_session, DAP_CHAIN_ESBOCS_SESSION_STATE_PREVIOUS, dap_time_now()); + + } break; + + case DAP_CHAIN_ESBOCS_MSG_TYPE_PRE_COMMIT: s_session_candidate_precommit(l_session, l_message); default: break; @@ -1715,7 +2121,7 @@ static void s_message_send(dap_chain_esbocs_session_t *a_session, uint8_t a_mess for (dap_list_t *it = a_validators; it; it = it->next) { dap_chain_esbocs_validator_t *l_validator = it->data; - if (l_validator->is_synced || a_message_type == DAP_STREAM_CH_VOTING_MSG_TYPE_START_SYNC) { + if (l_validator->is_synced || a_message_type == DAP_CHAIN_ESBOCS_MSG_TYPE_START_SYNC) { debug_if(PVT(a_session->esbocs)->debug, L_MSG, "Send pkt type 0x%x to "NODE_ADDR_FP_STR, a_message_type, NODE_ADDR_FP_ARGS_S(l_validator->node_addr)); l_voting_pkt->hdr.receiver_node_addr = l_validator->node_addr; @@ -1760,7 +2166,7 @@ static int s_callback_block_verify(dap_chain_cs_blocks_t *a_blocks, dap_chain_bl return -8; } - if (l_esbocs->session->processing_candidate == a_block) + if (l_esbocs->session && l_esbocs->session->processing_candidate == a_block) // It's a block candidate, don't check signs return 0; diff --git a/modules/consensus/esbocs/include/dap_chain_cs_esbocs.h b/modules/consensus/esbocs/include/dap_chain_cs_esbocs.h index 4a05245023ffd2853e04b6d357e6f1eea6833de8..c1a78ff09c943015cb71b8ffd896197b62f4fe9a 100644 --- a/modules/consensus/esbocs/include/dap_chain_cs_esbocs.h +++ b/modules/consensus/esbocs/include/dap_chain_cs_esbocs.h @@ -6,20 +6,28 @@ #include "dap_chain_cs_blocks.h" #include "dap_cert.h" -#define DAP_CHAIN_ESBOCS_PROTOCOL_VERSION 5 - -#define DAP_STREAM_CH_VOTING_MSG_TYPE_SUBMIT 0x04 -#define DAP_STREAM_CH_VOTING_MSG_TYPE_APPROVE 0x08 -#define DAP_STREAM_CH_VOTING_MSG_TYPE_REJECT 0x12 -#define DAP_STREAM_CH_VOTING_MSG_TYPE_COMMIT_SIGN 0x16 -#define DAP_STREAM_CH_VOTING_MSG_TYPE_VOTE 0x20 -//#define DAP_STREAM_CH_VOTING_MSG_TYPE_VOTE_FOR 0x24 -#define DAP_STREAM_CH_VOTING_MSG_TYPE_PRE_COMMIT 0x28 -#define DAP_STREAM_CH_VOTING_MSG_TYPE_START_SYNC 0x32 +#define DAP_CHAIN_ESBOCS_PROTOCOL_VERSION 6 +#define DAP_CHAIN_ESBOCS_GDB_GROUPS_PREFIX "esbocs" + +#define DAP_CHAIN_ESBOCS_MSG_TYPE_SUBMIT 0x04 +#define DAP_CHAIN_ESBOCS_MSG_TYPE_APPROVE 0x08 +#define DAP_CHAIN_ESBOCS_MSG_TYPE_REJECT 0x12 +#define DAP_CHAIN_ESBOCS_MSG_TYPE_COMMIT_SIGN 0x16 +#define DAP_CHAIN_ESBOCS_MSG_TYPE_PRE_COMMIT 0x28 +#define DAP_CHAIN_ESBOCS_MSG_TYPE_DIRECTIVE 0x20 +#define DAP_CHAIN_ESBOCS_MSG_TYPE_VOTE_FOR 0x22 +#define DAP_CHAIN_ESBOCS_MSG_TYPE_VOTE_AGAINST 0x24 +#define DAP_CHAIN_ESBOCS_MSG_TYPE_START_SYNC 0x32 #define DAP_CHAIN_BLOCKS_SESSION_ROUND_ID_SIZE 8 #define DAP_CHAIN_BLOCKS_SESSION_MESSAGE_ID_SIZE 8 +#define DAP_CHAIN_ESBOCS_DIRECTIVE_VERSION 1 +#define DAP_CHAIN_ESBOCS_DIRECTIVE_KICK 0x10 +#define DAP_CHAIN_ESBOCS_DIRECTIVE_LIFT 0x11 + +#define DAP_CHAIN_ESBOCS_DIRECTIVE_TSD_TYPE_ADDR 0x01 + typedef struct dap_chain_esbocs_session dap_chain_esbocs_session_t; /* consensus messages @@ -28,8 +36,10 @@ typedef struct dap_chain_esbocs_session dap_chain_esbocs_session_t; • Approve(round, candidate) — a block candidate has passed local validation • Reject(round, candidate) — a block candidate has failed local validation • CommitSign(round, candidate, signature) — a block candidate has been accepted and signed *** sign in data section -• Vote(round, candidate) — a vote for a block candidate in this round (even if the current process has another opinion) • PreCommit(round, candidate, final_hash) — a preliminary commitment to a block candidate *** candidate with signs hash in data section +• Directive(round, body) — a directive to change consensus parameters *** directive body in data section +• VoteFor(round, directive) — a vote for a directive in this round +• VoteAgainst(round, directive) — a vote against a directive in this round */ typedef struct dap_chain_esbocs_message_hdr { uint16_t version; @@ -85,23 +95,41 @@ typedef struct dap_chain_esbocs { void *_pvt; } dap_chain_esbocs_t; +typedef struct dap_chain_esbocs_directive { + uint8_t version; + uint8_t type; + uint16_t pad; + uint32_t size; + dap_nanotime_t timestamp; + byte_t tsd[]; +} DAP_ALIGN_PACKED dap_chain_esbocs_directive_t; + typedef struct dap_chain_esbocs_round { // Base fields uint64_t id; uint8_t attempt_num; dap_hash_fast_t last_block_hash; - // Round ancillary + // Round store dap_chain_esbocs_store_t *store_items; dap_chain_esbocs_message_item_t *message_items; + // Round directive + dap_hash_fast_t directive_hash; + dap_chain_esbocs_directive_t *directive; + bool directive_applied; + uint16_t votes_for_count; + uint16_t votes_against_count; // Attempt dependent fields dap_chain_addr_t attempt_submit_validator; dap_hash_fast_t attempt_candidate_hash; // Validators section - uint16_t validators_synced_count; dap_list_t *validators_list; + uint16_t validators_synced_count; + // Synchronization params uint64_t sync_attempt; bool sync_sent; - uint16_t total_validators_count; + // Check validators online & wide consensus sync + dap_list_t *all_validators; + uint16_t total_validators_synced; } dap_chain_esbocs_round_t; typedef struct dap_chain_esbocs_validator { @@ -112,6 +140,14 @@ typedef struct dap_chain_esbocs_validator { bool is_chosen; } dap_chain_esbocs_validator_t; +typedef struct dap_chain_esbocs_penalty_item { + dap_chain_addr_t signing_addr; + uint16_t miss_count; + UT_hash_handle hh; +} dap_chain_esbocs_penalty_item_t; + +#define DAP_CHAIN_ESBOCS_PENALTY_KICK 3U // Number of missed rounds to kick + typedef struct dap_chain_esbocs_session { pthread_mutex_t mutex; dap_chain_block_t *processing_candidate; @@ -121,11 +157,12 @@ typedef struct dap_chain_esbocs_session { dap_chain_node_addr_t my_addr; uint8_t state; // session state + uint8_t old_state; // for previous state return dap_chain_esbocs_round_t cur_round; bool round_fast_forward; dap_time_t ts_round_sync_start; // time of start sync - dap_time_t ts_attempt_start; // time of current attempt start + dap_time_t ts_stage_entry; // time of current stage entrance dap_chain_esbocs_sync_item_t *sync_items; dap_timerfd_t *sync_timer; @@ -133,11 +170,13 @@ typedef struct dap_chain_esbocs_session { dap_enc_key_t *blocks_sign_key; dap_chain_addr_t my_signing_addr; + dap_chain_esbocs_penalty_item_t *penalty; + struct dap_chain_esbocs_session *next; struct dap_chain_esbocs_session *prev; } dap_chain_esbocs_session_t; #define DAP_CHAIN_ESBOCS(a) ((dap_chain_esbocs_t *)(a)->_inheritor) + int dap_chain_cs_esbocs_init(); void dap_chain_cs_esbocs_deinit(void); - diff --git a/modules/net/include/dap_chain_node_client.h b/modules/net/include/dap_chain_node_client.h index 1d3230409727f6fc79d7ab3e8859c89247e675a9..ad428d19e236154abf6f9b0d9f5be74dde7827e8 100644 --- a/modules/net/include/dap_chain_node_client.h +++ b/modules/net/include/dap_chain_node_client.h @@ -171,12 +171,13 @@ DAP_STATIC_INLINE dap_chain_node_client_t* dap_chain_node_client_connect_default DAP_STATIC_INLINE ssize_t dap_chain_node_client_write_unsafe(dap_chain_node_client_t *a_client, const char a_ch_id, uint8_t a_type, void *a_data, size_t a_data_size) -{ return dap_client_write_unsafe(a_client->client, a_ch_id, a_type, a_data, a_data_size); } +{ if (!a_client) return 0; return dap_client_write_unsafe(a_client->client, a_ch_id, a_type, a_data, a_data_size); } DAP_STATIC_INLINE int dap_chain_node_client_write_mt(dap_chain_node_client_t *a_client, const char a_ch_id, uint8_t a_type, void *a_data, size_t a_data_size) -{ return dap_client_write_mt(a_client->client, a_ch_id, a_type, a_data, a_data_size); } +{ if (!a_client) return -1; return dap_client_write_mt(a_client->client, a_ch_id, a_type, a_data, a_data_size); } -dap_chain_node_client_t *dap_chain_node_client_find(dap_events_socket_uuid_t a_uuid); +DAP_STATIC_INLINE void dap_chain_node_client_queue_clear(dap_chain_node_client_t *a_client) +{ if (!a_client) return; dap_client_queue_clear(a_client->client); }; /** * Reset client state to connected state if it is connected diff --git a/modules/service/stake/dap_chain_net_srv_stake_pos_delegate.c b/modules/service/stake/dap_chain_net_srv_stake_pos_delegate.c index 959986e78f4ccf822240c22f6c30d421773a9ddd..31f9fa3556db4874e8bd453c03eaf974c1d51fc1 100644 --- a/modules/service/stake/dap_chain_net_srv_stake_pos_delegate.c +++ b/modules/service/stake/dap_chain_net_srv_stake_pos_delegate.c @@ -176,6 +176,7 @@ void dap_chain_net_srv_stake_key_delegate(dap_chain_net_t *a_net, dap_chain_addr l_stake->signing_addr = *a_signing_addr; l_stake->value = a_value; l_stake->tx_hash = *a_stake_tx_hash; + l_stake->is_active = true; if (!l_found) HASH_ADD(hh, s_srv_stake->itemlist, signing_addr, sizeof(dap_chain_addr_t), l_stake); if (!dap_hash_fast_is_blank(a_stake_tx_hash)) @@ -225,30 +226,46 @@ uint256_t dap_chain_net_srv_stake_get_allowed_min_value() return s_srv_stake->delegate_allowed_min; } -bool dap_chain_net_srv_stake_key_delegated(dap_chain_addr_t *a_signing_addr) +int dap_chain_net_srv_stake_key_delegated(dap_chain_addr_t *a_signing_addr) { assert(s_srv_stake); if (!a_signing_addr) - return false; + return 0; dap_chain_net_srv_stake_item_t *l_stake = NULL; HASH_FIND(hh, s_srv_stake->itemlist, a_signing_addr, sizeof(dap_chain_addr_t), l_stake); if (l_stake) // public key delegated for this network - return true; - return false; + return l_stake->is_active ? 1 : -1; + return 0; } -dap_list_t *dap_chain_net_srv_stake_get_validators(dap_chain_net_id_t a_net_id) +dap_list_t *dap_chain_net_srv_stake_get_validators(dap_chain_net_id_t a_net_id, bool a_is_active) { dap_list_t *l_ret = NULL; if (!s_srv_stake || !s_srv_stake->itemlist) return l_ret; for (dap_chain_net_srv_stake_item_t *l_stake = s_srv_stake->itemlist; l_stake; l_stake = l_stake->hh.next) - if (a_net_id.uint64 == l_stake->signing_addr.net_id.uint64) + if (a_net_id.uint64 == l_stake->signing_addr.net_id.uint64 && + l_stake->is_active == a_is_active) l_ret = dap_list_append(l_ret, DAP_DUP(l_stake)); return l_ret; } +int dap_chain_net_srv_stake_mark_validator_active(dap_chain_addr_t *a_signing_addr, bool a_on_off) +{ + assert(s_srv_stake); + if (!a_signing_addr) + return -1; + + dap_chain_net_srv_stake_item_t *l_stake = NULL; + HASH_FIND(hh, s_srv_stake->itemlist, a_signing_addr, sizeof(dap_chain_addr_t), l_stake); + if (l_stake) { // public key delegated for this network + l_stake->is_active = a_on_off; + return 0; + } + return -2; +} + int dap_chain_net_srv_stake_verify_key_and_node(dap_chain_addr_t *a_signing_addr, dap_chain_node_addr_t *a_node_addr) { assert(s_srv_stake); diff --git a/modules/service/stake/include/dap_chain_net_srv_stake_pos_delegate.h b/modules/service/stake/include/dap_chain_net_srv_stake_pos_delegate.h index e704a53adc62a982bd18211921a1f66c3d414571..6d5fc95306731483a3e2f9dab772c39822ec0c3c 100644 --- a/modules/service/stake/include/dap_chain_net_srv_stake_pos_delegate.h +++ b/modules/service/stake/include/dap_chain_net_srv_stake_pos_delegate.h @@ -70,9 +70,9 @@ void dap_chain_net_srv_stake_key_invalidate(dap_chain_addr_t *a_signing_addr); void dap_chain_net_srv_stake_set_allowed_min_value(uint256_t a_value); uint256_t dap_chain_net_srv_stake_get_allowed_min_value(); -bool dap_chain_net_srv_stake_key_delegated(dap_chain_addr_t *a_addr); +int dap_chain_net_srv_stake_key_delegated(dap_chain_addr_t *a_addr); int dap_chain_net_srv_stake_verify_key_and_node(dap_chain_addr_t* a_signing_addr, dap_chain_node_addr_t* a_node_addr); -dap_list_t *dap_chain_net_srv_stake_get_validators(dap_chain_net_id_t a_net_id); +dap_list_t *dap_chain_net_srv_stake_get_validators(dap_chain_net_id_t a_net_id, bool a_is_active); bool dap_chain_net_srv_stake_get_fee_validators(dap_chain_net_t *a_net, uint256_t *a_max_fee, uint256_t *a_average_fee, uint256_t *a_min_fee); @@ -87,3 +87,4 @@ bool dap_chain_net_srv_stake_check_validator(dap_chain_net_t * a_net, dap_hash_f dap_chain_datum_decree_t *dap_chain_net_srv_stake_decree_approve(dap_chain_net_t *a_net, dap_hash_fast_t *a_stake_tx_hash, dap_cert_t *a_cert); +int dap_chain_net_srv_stake_mark_validator_active(dap_chain_addr_t *a_signing_addr, bool a_on_off);