From 42abe42c04efef51cb352f781a07777a026fa39d Mon Sep 17 00:00:00 2001 From: Dmitry Gerasimov <dmitriy.gerasimov@demlabs.net> Date: Wed, 22 Jun 2022 14:50:04 +0700 Subject: [PATCH] [!] Refactoring with some renames --- modules/chain/dap_chain_ledger.c | 30 +- modules/channel/chain/dap_stream_ch_chain.c | 8 +- .../block-ton/dap_chain_cs_block_ton.c | 598 ++++++++++-------- .../include/dap_chain_cs_block_ton.h | 19 +- .../consensus/dag-poa/dap_chain_cs_dag_poa.c | 10 +- modules/consensus/none/dap_chain_cs_none.c | 6 +- modules/global-db/dap_global_db.c | 10 +- modules/global-db/include/dap_global_db.h | 4 +- modules/mempool/dap_chain_mempool.c | 16 +- modules/net/dap_chain_net.c | 133 ++-- modules/net/dap_chain_node.c | 2 +- modules/net/dap_chain_node_cli_cmd.c | 20 +- modules/net/include/dap_chain_net.h | 4 +- .../service/datum/dap_chain_net_srv_datum.c | 4 +- modules/type/blocks/dap_chain_block_chunk.c | 2 +- modules/type/blocks/dap_chain_cs_blocks.c | 69 +- .../type/blocks/include/dap_chain_cs_blocks.h | 3 +- modules/type/dag/dap_chain_cs_dag.c | 2 +- 18 files changed, 522 insertions(+), 418 deletions(-) diff --git a/modules/chain/dap_chain_ledger.c b/modules/chain/dap_chain_ledger.c index a144baf472..9b920050da 100644 --- a/modules/chain/dap_chain_ledger.c +++ b/modules/chain/dap_chain_ledger.c @@ -1197,19 +1197,19 @@ static void s_threshold_txs_proc( dap_ledger_t *a_ledger) * @param a_key * @param a_values_total * @param a_values_shift - * @param a_value_count + * @param a_values_count * @param a_values * @param a_arg */ static bool s_load_cache_gdb_loaded_balances_callback(dap_global_db_context_t * a_global_db_context,int a_rc, const char * a_group, const char * a_key, const size_t a_values_total, const size_t a_values_shift, - const size_t a_value_count, dap_global_db_obj_t * a_values, + const size_t a_values_count, dap_global_db_obj_t * a_values, void * a_arg) { dap_ledger_t * l_ledger = (dap_ledger_t*) a_arg; dap_ledger_private_t * l_ledger_pvt = PVT(l_ledger); - for (size_t i = 0; i < a_value_count; i++) { + for (size_t i = 0; i < a_values_count; i++) { dap_ledger_wallet_balance_t *l_balance_item = DAP_NEW_Z(dap_ledger_wallet_balance_t); l_balance_item->key = DAP_NEW_Z_SIZE(char, strlen(a_values[i].key) + 1); strcpy(l_balance_item->key, a_values[i].key); @@ -1239,20 +1239,20 @@ static bool s_load_cache_gdb_loaded_balances_callback(dap_global_db_context_t * * @param a_key * @param a_values_total * @param a_values_shift - * @param a_value_count + * @param a_values_count * @param a_values * @param a_arg */ static bool s_load_cache_gdb_loaded_spent_txs_callback(dap_global_db_context_t * a_global_db_context,int a_rc, const char * a_group, const char * a_key, const size_t a_values_total, const size_t a_values_shift, - const size_t a_value_count, dap_global_db_obj_t * a_values, + const size_t a_values_count, dap_global_db_obj_t * a_values, void * a_arg) { dap_ledger_t * l_ledger = (dap_ledger_t*) a_arg; dap_ledger_private_t * l_ledger_pvt = PVT(l_ledger); - for (size_t i = 0; i < a_value_count; i++) { + for (size_t i = 0; i < a_values_count; i++) { dap_chain_ledger_tx_spent_item_t *l_tx_spent_item = DAP_NEW_Z(dap_chain_ledger_tx_spent_item_t); dap_chain_hash_fast_from_str(a_values[i].key, &l_tx_spent_item->tx_hash_fast); strncpy(l_tx_spent_item->token_ticker, (char *)a_values[i].value, @@ -1274,19 +1274,19 @@ static bool s_load_cache_gdb_loaded_spent_txs_callback(dap_global_db_context_t * * @param a_key * @param a_values_total * @param a_values_shift - * @param a_value_count + * @param a_values_count * @param a_values * @param a_arg */ static bool s_load_cache_gdb_loaded_txs_callback(dap_global_db_context_t * a_global_db_context,int a_rc, const char * a_group, const char * a_key, const size_t a_values_total, const size_t a_values_shift, - const size_t a_value_count, dap_global_db_obj_t * a_values, + const size_t a_values_count, dap_global_db_obj_t * a_values, void * a_arg) { dap_ledger_t * l_ledger = (dap_ledger_t*) a_arg; dap_ledger_private_t * l_ledger_pvt = PVT(l_ledger); - for (size_t i = 0; i < a_value_count; i++) { + for (size_t i = 0; i < a_values_count; i++) { dap_chain_ledger_tx_item_t *l_tx_item = DAP_NEW_Z(dap_chain_ledger_tx_item_t); dap_chain_hash_fast_from_str(a_values[i].key, &l_tx_item->tx_hash_fast); l_tx_item->tx = DAP_NEW_Z_SIZE(dap_chain_datum_tx_t, a_values[i].value_len - sizeof(l_tx_item->cache_data)); @@ -1310,7 +1310,7 @@ static bool s_load_cache_gdb_loaded_txs_callback(dap_global_db_context_t * a_glo * @param a_key * @param a_values_total * @param a_values_shift - * @param a_value_count + * @param a_values_count * @param a_values * @param a_arg * @return Always true thats means to clear up a_values @@ -1318,13 +1318,13 @@ static bool s_load_cache_gdb_loaded_txs_callback(dap_global_db_context_t * a_glo static bool s_load_cache_gdb_loaded_emissions_callback(dap_global_db_context_t * a_global_db_context,int a_rc, const char * a_group, const char * a_key, const size_t a_values_total, const size_t a_values_shift, - const size_t a_value_count, dap_global_db_obj_t * a_values, + const size_t a_values_count, dap_global_db_obj_t * a_values, void * a_arg) { dap_ledger_t * l_ledger = (dap_ledger_t*) a_arg; dap_ledger_private_t * l_ledger_pvt = PVT(l_ledger); - for (size_t i = 0; i < a_value_count; i++) { + for (size_t i = 0; i < a_values_count; i++) { if (a_values[i].value_len <= sizeof(dap_hash_fast_t)) continue; const char *c_token_ticker = ((dap_chain_datum_token_emission_t *) @@ -1360,14 +1360,14 @@ static bool s_load_cache_gdb_loaded_emissions_callback(dap_global_db_context_t * * @param a_key * @param a_values_total * @param a_values_shift - * @param a_value_count + * @param a_values_count * @param a_values * @param a_arg */ static bool s_load_cache_gdb_loaded_tokens_callback(dap_global_db_context_t * a_global_db_context,int a_rc, const char * a_group, const char * a_key, const size_t a_values_total, const size_t a_values_shift, - const size_t a_value_count, dap_global_db_obj_t * a_values, + const size_t a_values_count, dap_global_db_obj_t * a_values, void * a_arg) { dap_ledger_t * l_ledger = (dap_ledger_t*) a_arg; @@ -1379,7 +1379,7 @@ static bool s_load_cache_gdb_loaded_tokens_callback(dap_global_db_context_t * a_ pthread_mutex_unlock(&l_ledger_pvt->load_mutex); } - for (size_t i = 0; i < a_value_count; i++) { + for (size_t i = 0; i < a_values_count; i++) { if (a_values[i].value_len <= sizeof(uint256_t)) continue; dap_chain_datum_token_t *l_token = (dap_chain_datum_token_t *)(a_values[i].value + sizeof(uint256_t)); diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c index 15a495438e..07de20b74a 100644 --- a/modules/channel/chain/dap_stream_ch_chain.c +++ b/modules/channel/chain/dap_stream_ch_chain.c @@ -114,7 +114,7 @@ static bool s_sync_in_chains_callback(dap_proc_thread_t *a_thread, void *a_arg); static bool s_gdb_in_pkt_proc_callback(dap_proc_thread_t *a_thread, void *a_arg); static void s_gdb_in_pkt_proc_set_raw_callback(dap_global_db_context_t * a_global_db_context,int a_rc, const char * a_group, const char * a_key, const size_t a_values_current, const size_t a_values_shift, - const size_t a_value_count, dap_store_obj_t * a_values, void * a_arg); + const size_t a_values_count, dap_store_obj_t * a_values, void * a_arg); static void s_gdb_in_pkt_error_worker_callback(dap_worker_t *a_thread, void *a_arg); static void s_free_log_list_gdb ( dap_stream_ch_chain_t * a_ch_chain); @@ -656,7 +656,7 @@ dap_chain_t *dap_chain_get_chain_from_group_name(dap_chain_net_id_t a_net_id, co return false; dap_chain_t *l_chain = NULL; DL_FOREACH(l_net->pub.chains, l_chain) { - char *l_chain_group_name = dap_chain_net_get_gdb_group_from_chain(l_chain); + char *l_chain_group_name = dap_chain_net_get_gdb_group_from_chain_new(l_chain); if (!strcmp(a_group_name, l_chain_group_name)) { DAP_DELETE(l_chain_group_name); return l_chain; @@ -837,12 +837,12 @@ static bool s_gdb_in_pkt_proc_callback(dap_proc_thread_t *a_thread, void *a_arg) * @param a_key * @param a_values_current * @param a_values_shift - * @param a_value_count + * @param a_values_count * @param a_values * @param a_arg */ static void s_gdb_in_pkt_proc_set_raw_callback(dap_global_db_context_t * a_global_db_context,int a_rc, const char * a_group, const char * a_key, const size_t a_values_current, const size_t a_values_shift, - const size_t a_value_count, dap_store_obj_t * a_values, void * a_arg) + const size_t a_values_count, dap_store_obj_t * a_values, void * a_arg) { struct sync_request *l_sync_req = (struct sync_request*) a_arg; diff --git a/modules/consensus/block-ton/dap_chain_cs_block_ton.c b/modules/consensus/block-ton/dap_chain_cs_block_ton.c index b7952140a6..694f38dc26 100644 --- a/modules/consensus/block-ton/dap_chain_cs_block_ton.c +++ b/modules/consensus/block-ton/dap_chain_cs_block_ton.c @@ -17,33 +17,33 @@ static int s_callback_new(dap_chain_t *a_chain, dap_config_t *a_chain_cfg); static void s_session_packet_in(void * a_arg, dap_chain_node_addr_t * a_sender_node_addr, dap_chain_hash_fast_t *a_data_hash, uint8_t *a_data, size_t a_data_size); static void s_session_candidate_to_chain( - dap_chain_cs_block_ton_items_t *a_session, dap_chain_hash_fast_t *a_candidate_hash, + dap_chain_cs_block_ton_session_t *a_session, dap_chain_hash_fast_t *a_candidate_hash, dap_chain_block_t *a_candidate, size_t a_candidate_size); -static bool s_session_candidate_submit(dap_chain_cs_block_ton_items_t *a_session); +static void s_session_candidate_submit(dap_chain_cs_block_ton_session_t *a_session); static bool s_session_timer(); static int s_session_atom_validation(dap_chain_cs_blocks_t *a_blocks, dap_chain_block_t *a_block, size_t a_block_size); -static uint8_t *s_message_data_sign(dap_chain_cs_block_ton_items_t *a_session, +static uint8_t *s_message_data_sign(dap_chain_cs_block_ton_session_t *a_session, dap_chain_cs_block_ton_message_t *a_message, size_t *a_sign_size); -static void s_message_send(dap_chain_cs_block_ton_items_t *a_session, uint8_t a_message_type, +static void s_message_send(dap_chain_cs_block_ton_session_t *a_session, uint8_t a_message_type, uint8_t *a_data, size_t a_data_size, dap_list_t *a_validators); -static void s_message_chain_add(dap_chain_cs_block_ton_items_t * a_session, dap_chain_node_addr_t * a_sender_node_addr, +static void s_message_chain_add(dap_chain_cs_block_ton_session_t * a_session, dap_chain_node_addr_t * a_sender_node_addr, dap_chain_cs_block_ton_message_t * a_message, size_t a_message_size, dap_chain_hash_fast_t *a_message_hash); -static void s_session_round_start(dap_chain_cs_block_ton_items_t *a_session); -static bool s_session_send_startsync(dap_chain_cs_block_ton_items_t *a_session); +static void s_session_round_start(dap_chain_cs_block_ton_session_t *a_session); +static bool s_session_send_startsync(dap_chain_cs_block_ton_session_t *a_session); static bool s_session_round_start_callback_load_session_store(dap_global_db_context_t * a_global_db_context,int a_rc, const char * a_group, const char * a_key, const size_t a_values_total, const size_t a_values_shift, - const size_t a_value_count, dap_global_db_obj_t * a_values, void * a_arg); + const size_t a_values_count, dap_global_db_obj_t * a_values, void * a_arg); -static void s_session_block_new_delete(dap_chain_cs_block_ton_items_t *a_session); -static void s_session_my_candidate_delete(dap_chain_cs_block_ton_items_t *a_session); -static void s_session_round_finish(dap_chain_cs_block_ton_items_t *a_session, bool a_is_time_proc_lock); +static void s_session_block_new_delete(dap_chain_cs_block_ton_session_t *a_session); +static void s_session_my_candidate_delete(dap_chain_cs_block_ton_session_t *a_session); +static void s_session_round_finish(dap_chain_cs_block_ton_session_t *a_session, bool a_is_time_proc_lock); static dap_chain_node_addr_t *s_session_get_validator( - dap_chain_cs_block_ton_items_t *a_session, dap_chain_node_addr_t *a_addr, + dap_chain_cs_block_ton_session_t *a_session, dap_chain_node_addr_t *a_addr, dap_list_t *a_validators); static uint16_t s_session_message_count( - dap_chain_cs_block_ton_items_t *a_session, uint8_t a_round_name, uint8_t a_type, + dap_chain_cs_block_ton_session_t *a_session, uint8_t a_round_name, uint8_t a_type, dap_chain_hash_fast_t *a_candidate_hash, uint16_t *a_attempt_number); static void s_callback_delete(dap_chain_cs_blocks_t *a_blocks); static int s_callback_created(dap_chain_t *a_chain, dap_config_t *a_chain_net_cfg); @@ -52,11 +52,11 @@ static int s_callback_block_verify(dap_chain_cs_blocks_t *a_blocks, dap_chain_bl static int s_compare_validators_list_stake(const void * a_item1, const void * a_item2, void *a_unused); static int s_compare_validators_list_addr(const void * a_item1, const void * a_item2, void *a_unused); -static dap_list_t *s_get_validators_addr_list(dap_chain_cs_block_ton_items_t *a_session); //(dap_chain_t *a_chain); +static dap_list_t *s_get_validators_addr_list(dap_chain_cs_block_ton_session_t *a_session); //(dap_chain_t *a_chain); static bool s_hash_is_null(dap_chain_hash_fast_t *a_hash); -static dap_chain_cs_block_ton_items_t * s_session_items; +static dap_chain_cs_block_ton_session_t * s_session_items; static dap_timerfd_t * s_session_cs_timer = NULL; typedef struct dap_chain_cs_block_ton_pvt @@ -245,7 +245,7 @@ static int s_compare_validators_list_addr(const void * a_item1, const void * a_i return -1; } -static dap_list_t *s_get_validators_addr_list(dap_chain_cs_block_ton_items_t *a_session) {//(dap_chain_t *a_chain) { +static dap_list_t *s_get_validators_addr_list(dap_chain_cs_block_ton_session_t *a_session) {//(dap_chain_t *a_chain) { dap_chain_cs_blocks_t *l_blocks = DAP_CHAIN_CS_BLOCKS(a_session->chain); dap_chain_cs_block_ton_t *l_ton = DAP_CHAIN_CS_BLOCK_TON(l_blocks); @@ -305,7 +305,7 @@ static int s_callback_created(dap_chain_t *a_chain, dap_config_t *a_chain_net_cf } dap_chain_net_t *l_net = dap_chain_net_by_id(a_chain->net_id); - dap_chain_cs_block_ton_items_t *l_session = DAP_NEW_Z(dap_chain_cs_block_ton_items_t); + dap_chain_cs_block_ton_session_t *l_session = DAP_NEW_Z(dap_chain_cs_block_ton_session_t); l_session->chain = a_chain; l_session->ton = l_ton; @@ -367,18 +367,18 @@ static int s_callback_created(dap_chain_t *a_chain, dap_config_t *a_chain_net_cf * @param a_key * @param a_values_total * @param a_values_shift - * @param a_value_count + * @param a_values_count * @param a_values * @param a_arg */ static bool s_session_round_start_callback_load_session_store(dap_global_db_context_t * a_global_db_context,int a_rc, const char * a_group, const char * a_key, const size_t a_values_total, const size_t a_values_shift, - const size_t a_value_count, dap_global_db_obj_t * a_values, void * a_arg) + const size_t a_values_count, dap_global_db_obj_t * a_values, void * a_arg) { - dap_chain_cs_block_ton_items_t *l_session = (dap_chain_cs_block_ton_items_t*) a_arg; - if (a_value_count) { + dap_chain_cs_block_ton_session_t *l_session = (dap_chain_cs_block_ton_session_t*) a_arg; + if (a_values_count) { dap_chain_cs_block_ton_store_t *l_store_candidate_ready = NULL; size_t l_candidate_ready_size = 0; - for (size_t i = 0; i < a_value_count; i++) { + for (size_t i = 0; i < a_values_count; i++) { if (!a_values[i].value_len) continue; dap_chain_cs_block_ton_store_t *l_store = @@ -419,7 +419,7 @@ static bool s_session_round_start_callback_load_session_store(dap_global_db_con * @brief s_session_round_start * @param a_session */ -static void s_session_round_start(dap_chain_cs_block_ton_items_t *a_session) +static void s_session_round_start(dap_chain_cs_block_ton_session_t *a_session) { a_session->cur_round.validators_start = NULL; @@ -450,7 +450,7 @@ static void s_session_round_start(dap_chain_cs_block_ton_items_t *a_session) * @param a_session * @return */ -static bool s_session_send_startsync(dap_chain_cs_block_ton_items_t *a_session) +static bool s_session_send_startsync(dap_chain_cs_block_ton_session_t *a_session) { dap_chain_cs_block_ton_message_startsync_t *l_startsync = DAP_NEW_Z(dap_chain_cs_block_ton_message_startsync_t); @@ -469,12 +469,12 @@ static bool s_session_send_startsync(dap_chain_cs_block_ton_items_t *a_session) typedef struct s_session_send_votefor_data { dap_chain_cs_block_ton_message_votefor_t *votefor; - dap_chain_cs_block_ton_items_t *session; + dap_chain_cs_block_ton_session_t *session; } DAP_ALIGN_PACKED s_session_send_votefor_data_t; static bool s_session_send_votefor(s_session_send_votefor_data_t *a_data){ dap_chain_cs_block_ton_message_votefor_t *l_votefor = a_data->votefor; - dap_chain_cs_block_ton_items_t *l_session = a_data->session; + dap_chain_cs_block_ton_session_t *l_session = a_data->session; l_votefor->round_id.uint64 = l_session->cur_round.id.uint64; s_message_send(l_session, DAP_STREAM_CH_CHAIN_MESSAGE_TYPE_VOTE_FOR, (uint8_t*)l_votefor, sizeof(dap_chain_cs_block_ton_message_votefor_t), @@ -492,19 +492,19 @@ static bool s_session_send_votefor(s_session_send_votefor_data_t *a_data){ * @param a_key * @param a_values_total * @param a_values_shift - * @param a_value_count + * @param a_values_count * @param a_values * @param a_arg */ static bool s_session_round_start_callback_load_session_store_coordinator_state_proc(dap_global_db_context_t * a_global_db_context,int a_rc, const char * a_group, const char * a_key, const size_t a_values_total, const size_t a_values_shift, - const size_t a_value_count, dap_global_db_obj_t * a_values, void * a_arg) + const size_t a_values_count, dap_global_db_obj_t * a_values, void * a_arg) { - dap_chain_cs_block_ton_items_t * l_session = (dap_chain_cs_block_ton_items_t *) a_arg; + dap_chain_cs_block_ton_session_t * l_session = (dap_chain_cs_block_ton_session_t *) a_arg; dap_list_t *l_list_candidate = NULL; // I am a coordinator :-) select candidate - if (a_value_count) { - for (size_t i = 0; i < a_value_count; i++) { + if (a_values_count) { + for (size_t i = 0; i < a_values_count; i++) { if (!a_values[i].value_len) continue; @@ -555,158 +555,179 @@ static bool s_session_round_start_callback_load_session_store_coordinator_state_ } /** - * @brief s_session_timer - * @return + * @brief s_session_proc_state + * @param l_session */ -static bool s_session_timer() +static void s_session_proc_state( dap_chain_cs_block_ton_session_t * a_session) { - dap_time_t l_time = dap_time_now(); - dap_chain_cs_block_ton_items_t *l_session = NULL; - DL_FOREACH(s_session_items, l_session) { - if ( l_session->time_proc_lock ) { - continue; - } - pthread_rwlock_rdlock(&l_session->rwlock); - l_session->time_proc_lock = true; // lock - skip check by reasons: prev check is not finish - switch (l_session->state) { - case DAP_STREAM_CH_CHAIN_SESSION_STATE_IDLE: { - if ( (((l_time/10)*10) % PVT(l_session->ton)->round_start_multiple_of) == 0 - && (l_time - ((l_time/10)*10)) <= 3 - && l_time > l_session->ts_round_finish - && (l_time-l_session->ts_round_finish) >= PVT(l_session->ton)->session_idle_min) { - - // round start - l_session->state = DAP_STREAM_CH_CHAIN_SESSION_STATE_WAIT_START; - s_session_round_start(l_session); - return true; // Unlock happens after round start in its callbacks - } - goto session_unlock; - } //break; - case DAP_STREAM_CH_CHAIN_SESSION_STATE_WAIT_START: { - if ( (l_time-l_session->ts_round_sync_start) >= PVT(l_session->ton)->round_start_sync_timeout ) { // timeout start sync - uint16_t l_startsync_count = l_session->cur_round.validators_start_count; - if ( ((float)l_startsync_count/l_session->cur_round.validators_count) >= ((float)2/3) ) { - // if sync more 2/3 validators then start round and submit candidate - if (PVT(l_session->ton)->debug) - log_it(L_MSG, "TON: net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu More than 2/3 of the validators are synchronized, so starting the round and send the candidate", - l_session->chain->net_name, l_session->chain->name, - l_session->cur_round.id.uint64, l_session->attempt_current_number); + dap_time_t l_time = dap_time_now(); - l_session->ts_round_start = l_time; - l_session->state = DAP_STREAM_CH_CHAIN_SESSION_STATE_CS_PROC; - - // sort validators list - dap_list_t *l_validators_start = l_session->cur_round.validators_start; - l_session->cur_round.validators_start = NULL; - dap_list_t *l_validators_list_temp = dap_list_first(l_session->cur_round.validators_list); - while (l_validators_list_temp) { - dap_chain_node_addr_t *l_validator_1 = (dap_chain_node_addr_t *)l_validators_list_temp->data; - l_validators_list_temp = l_validators_list_temp->next; - dap_list_t *l_validators_start_temp = dap_list_first(l_validators_start); - while (l_validators_start_temp) { - dap_chain_node_addr_t *l_validator_2 = (dap_chain_node_addr_t *)l_validators_start_temp->data; - l_validators_start_temp = l_validators_start_temp->next; - if ( l_validator_1->uint64 == l_validator_2->uint64 ) { - l_session->cur_round.validators_start = - dap_list_append(l_session->cur_round.validators_start, l_validator_1); - } - } - } + if(pthread_rwlock_tryrdlock(&a_session->rwlock) != 0 ){ // Session is busy + return; + } + if ( a_session->time_proc_lock ) { // Already in timer processing + pthread_rwlock_unlock (&a_session->rwlock); + return; + } + a_session->time_proc_lock = true; // lock - skip check by reasons: prev check is not finish + pthread_rwlock_unlock (&a_session->rwlock); // Mostly we're writting in session on the next operations, at least we do it with state + pthread_rwlock_wrlock( &a_session->rwlock); + + switch (a_session->state) { + case DAP_STREAM_CH_CHAIN_SESSION_STATE_IDLE: { + if ( (((l_time/10)*10) % PVT(a_session->ton)->round_start_multiple_of) == 0 + && (l_time - ((l_time/10)*10)) <= 3 + && l_time > a_session->ts_round_finish + && (l_time-a_session->ts_round_finish) >= PVT(a_session->ton)->session_idle_min) { + + // round start + a_session->state = DAP_STREAM_CH_CHAIN_SESSION_STATE_WAIT_START; + s_session_round_start(a_session); + goto session_processed; // Unlock happens after round start in its callbacks + } + goto session_unlock; + } //break; + case DAP_STREAM_CH_CHAIN_SESSION_STATE_WAIT_START: { + if ( (l_time-a_session->ts_round_sync_start) >= PVT(a_session->ton)->round_start_sync_timeout ) { // timeout start sync + uint16_t l_startsync_count = a_session->cur_round.validators_start_count; + if ( ((float)l_startsync_count/a_session->cur_round.validators_count) >= ((float)2/3) ) { + // if sync more 2/3 validators then start round and submit candidate + if (PVT(a_session->ton)->debug) + log_it(L_MSG, "TON: net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu More than 2/3 of the validators are synchronized, so starting the round and send the candidate", + a_session->chain->net_name, a_session->chain->name, + a_session->cur_round.id.uint64, a_session->attempt_current_number); + + a_session->ts_round_start = l_time; + a_session->state = DAP_STREAM_CH_CHAIN_SESSION_STATE_CS_PROC; + + // sort validators list + dap_list_t *l_validators_start = a_session->cur_round.validators_start; + a_session->cur_round.validators_start = NULL; + dap_list_t *l_validators_list_temp = dap_list_first(a_session->cur_round.validators_list); + while (l_validators_list_temp) { + dap_chain_node_addr_t *l_validator_1 = (dap_chain_node_addr_t *)l_validators_list_temp->data; + l_validators_list_temp = l_validators_list_temp->next; + dap_list_t *l_validators_start_temp = dap_list_first(l_validators_start); + while (l_validators_start_temp) { + dap_chain_node_addr_t *l_validator_2 = (dap_chain_node_addr_t *)l_validators_start_temp->data; + l_validators_start_temp = l_validators_start_temp->next; + if ( l_validator_1->uint64 == l_validator_2->uint64 ) { + a_session->cur_round.validators_start = + dap_list_append(a_session->cur_round.validators_start, l_validator_1); + } + } + } - // first coordinator - l_session->attempt_coordinator = - (dap_chain_node_addr_t *)(dap_list_first(l_session->cur_round.validators_start)->data); - } else { - s_session_round_finish(l_session, true); - if (PVT(l_session->ton)->debug) - log_it(L_MSG, "TON: net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu Round finish by reason: can't synchronized 2/3 of the validators", - l_session->chain->net_name, l_session->chain->name, - l_session->cur_round.id.uint64, l_session->attempt_current_number); - return true; - } - } - goto session_unlock; - } //break; - case DAP_STREAM_CH_CHAIN_SESSION_STATE_WAIT_SIGNS: - case DAP_STREAM_CH_CHAIN_SESSION_STATE_CS_PROC: { - if ( !l_session->cur_round.submit && l_session->attempt_current_number == 1 ) { - dap_list_t *l_validators_list = dap_list_first(l_session->cur_round.validators_start); - int l_my_number = -1; - int i = 0; - while(l_validators_list) { - if( ((dap_chain_node_addr_t*)l_validators_list->data)->uint64 == l_session->my_addr->uint64) { - l_my_number = i; - break; - } - i++; - l_validators_list = l_validators_list->next; - } - if ( l_my_number != -1 ) { - l_my_number++; - if ( (l_time-l_session->ts_round_start) >= - (dap_time_t)((PVT(l_session->ton)->next_candidate_delay*l_my_number)+PVT(l_session->ton)->first_message_delay) ) { - l_session->cur_round.submit = true; - s_session_candidate_submit(l_session); - } - } - } + // first coordinator + a_session->attempt_coordinator = + (dap_chain_node_addr_t *)(dap_list_first(a_session->cur_round.validators_start)->data); + } else { + s_session_round_finish(a_session, true); + if (PVT(a_session->ton)->debug) + log_it(L_MSG, "TON: net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu Round finish by reason: can't synchronized 2/3 of the validators", + a_session->chain->net_name, a_session->chain->name, + a_session->cur_round.id.uint64, a_session->attempt_current_number); + goto session_processed; + } + } + goto session_unlock; + } //break; + case DAP_STREAM_CH_CHAIN_SESSION_STATE_WAIT_SIGNS: + case DAP_STREAM_CH_CHAIN_SESSION_STATE_CS_PROC: { + if ( !a_session->cur_round.submit && a_session->attempt_current_number == 1 ) { + dap_list_t *l_validators_list = dap_list_first(a_session->cur_round.validators_start); + int l_my_number = -1; + int i = 0; + while(l_validators_list) { + if( ((dap_chain_node_addr_t*)l_validators_list->data)->uint64 == a_session->my_addr->uint64) { + l_my_number = i; + break; + } + i++; + l_validators_list = l_validators_list->next; + } + if ( l_my_number != -1 ) { + l_my_number++; + if ( (l_time-a_session->ts_round_start) >= + (dap_time_t)((PVT(a_session->ton)->next_candidate_delay*l_my_number)+PVT(a_session->ton)->first_message_delay) ) { + a_session->cur_round.submit = true; + s_session_candidate_submit(a_session); + goto session_processed; + } + } + } + case DAP_STREAM_CH_CHAIN_SESSION_STATE_CS_PROC_AFTER_SUBMIT: + + if ( (l_time-a_session->ts_round_start) >= + (dap_time_t)(PVT(a_session->ton)->round_attempt_duration*a_session->attempt_current_number) ) { + + a_session->attempt_current_number++; + if ( a_session->attempt_current_number > PVT(a_session->ton)->round_attempts_max ) { + s_session_round_finish(a_session,true); // attempts is out + if (PVT(a_session->ton)->debug) + log_it(L_MSG, "TON: net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu Round finish by reason: attempts is out", + a_session->chain->net_name, a_session->chain->name, + a_session->cur_round.id.uint64, a_session->attempt_current_number); + goto session_processed; + } + if ( a_session->cur_round.candidates_count == 0 ) { // no candidates + s_session_round_finish(a_session,true); + if (PVT(a_session->ton)->debug) + log_it(L_MSG, "TON: net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu Round finish by reason: no block candidates", + a_session->chain->net_name, a_session->chain->name, + a_session->cur_round.id.uint64, a_session->attempt_current_number); + goto session_processed; + } + if ( a_session->state == DAP_STREAM_CH_CHAIN_SESSION_STATE_WAIT_SIGNS ) { + goto session_unlock; + } - if ( (l_time-l_session->ts_round_start) >= - (dap_time_t)(PVT(l_session->ton)->round_attempt_duration*l_session->attempt_current_number) ) { + uint16_t l_validators_count = a_session->cur_round.validators_start_count; + uint64_t l_mod = 0; + if (!PVT(a_session->ton)->validators_list_by_stake) { + // rotate validatir list in non-stake mode + l_mod = a_session->cur_round.id.uint64; + } + uint16_t l_validators_index = + ( (a_session->attempt_current_number-2+l_mod) + - (l_validators_count + *((a_session->attempt_current_number-2+l_mod)/l_validators_count))); + + a_session->attempt_coordinator = (dap_chain_node_addr_t *) + (dap_list_nth(a_session->cur_round.validators_start, + l_validators_index)->data); + if (PVT(a_session->ton)->debug) + log_it(L_MSG, "TON: net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu Start attempt: selected coordinator "NODE_ADDR_FP_STR"(index:%u)", + a_session->chain->net_name, a_session->chain->name, a_session->cur_round.id.uint64, + a_session->attempt_current_number, NODE_ADDR_FP_ARGS(a_session->attempt_coordinator), + l_validators_index); + + if ( a_session->my_addr->uint64 == a_session->attempt_coordinator->uint64 ) { + // I am coordinator :-) select candidate + dap_global_db_get_all(a_session->gdb_group_store,0,s_session_round_start_callback_load_session_store_coordinator_state_proc, a_session); + goto session_processed; // Unlock in its callback + } + } + goto session_unlock; + } + } +session_unlock: + a_session->time_proc_lock = false; // unlock + pthread_rwlock_unlock(&a_session->rwlock); +session_processed: // Stay locked + ; - l_session->attempt_current_number++; - if ( l_session->attempt_current_number > PVT(l_session->ton)->round_attempts_max ) { - s_session_round_finish(l_session,true); // attempts is out - if (PVT(l_session->ton)->debug) - log_it(L_MSG, "TON: net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu Round finish by reason: attempts is out", - l_session->chain->net_name, l_session->chain->name, - l_session->cur_round.id.uint64, l_session->attempt_current_number); - return true; - } - if ( l_session->cur_round.candidates_count == 0 ) { // no candidates - s_session_round_finish(l_session,true); - if (PVT(l_session->ton)->debug) - log_it(L_MSG, "TON: net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu Round finish by reason: no block candidates", - l_session->chain->net_name, l_session->chain->name, - l_session->cur_round.id.uint64, l_session->attempt_current_number); - return true; - } - if ( l_session->state == DAP_STREAM_CH_CHAIN_SESSION_STATE_WAIT_SIGNS ) { - goto session_unlock; - } - - uint16_t l_validators_count = l_session->cur_round.validators_start_count; - uint64_t l_mod = 0; - if (!PVT(l_session->ton)->validators_list_by_stake) { - // rotate validatir list in non-stake mode - l_mod = l_session->cur_round.id.uint64; - } - uint16_t l_validators_index = - ( (l_session->attempt_current_number-2+l_mod) - - (l_validators_count - *((l_session->attempt_current_number-2+l_mod)/l_validators_count))); - - l_session->attempt_coordinator = (dap_chain_node_addr_t *) - (dap_list_nth(l_session->cur_round.validators_start, - l_validators_index)->data); - if (PVT(l_session->ton)->debug) - log_it(L_MSG, "TON: net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu Start attempt: selected coordinator "NODE_ADDR_FP_STR"(index:%u)", - l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id.uint64, - l_session->attempt_current_number, NODE_ADDR_FP_ARGS(l_session->attempt_coordinator), - l_validators_index); +} - if ( l_session->my_addr->uint64 == l_session->attempt_coordinator->uint64 ) { - // I am coordinator :-) select candidate - dap_global_db_get_all(l_session->gdb_group_store,0,s_session_round_start_callback_load_session_store_coordinator_state_proc, l_session); - return true; // Unlock in its callback - } - } - goto session_unlock; - } - } -session_unlock: - l_session->time_proc_lock = false; // unlock - pthread_rwlock_unlock(&l_session->rwlock); +/** + * @brief s_session_timer + * @return + */ +static bool s_session_timer() +{ + dap_chain_cs_block_ton_session_t *l_session = NULL; + DL_FOREACH(s_session_items, l_session) { + s_session_proc_state(l_session); } return true; } @@ -719,7 +740,7 @@ session_unlock: * @param a_candidate_size */ static void s_session_candidate_to_chain( - dap_chain_cs_block_ton_items_t *a_session, dap_chain_hash_fast_t *a_candidate_hash, + dap_chain_cs_block_ton_session_t *a_session, dap_chain_hash_fast_t *a_candidate_hash, dap_chain_block_t *a_candidate, size_t a_candidate_size) { @@ -796,9 +817,8 @@ static void s_session_candidate_to_chain( DAP_DELETE(l_candidate); } break; default: - // DAP_DELETE(l_candidate); - // log_it(L_CRITICAL, "TON: Wtf is this ret code? %d", l_candidate_hash_str); - break; + log_it(L_CRITICAL, "TON: Wtf is this ret code ? Atom hash %s code %d", l_candidate_hash_str, l_res); + DAP_DELETE(l_candidate); } DAP_DELETE(l_candidate_hash_str); dap_chain_hash_fast_t l_my_candidate_hash; @@ -811,7 +831,98 @@ static void s_session_candidate_to_chain( //DAP_DELETE(l_candidate); } -static bool s_session_candidate_submit(dap_chain_cs_block_ton_items_t *a_session){ +/** + * @brief Callback in session state processing timer's context to process session state after GDB callback + * @param a_worker + * @param a_arg + */ +static void s_callback_worker_session_proc_state(dap_worker_t * a_worker, void * a_arg) +{ + dap_chain_cs_block_ton_session_t * l_session = (dap_chain_cs_block_ton_session_t*) a_arg; + s_session_proc_state(l_session); +} + +/** + * @brief s_callback_block_new_add_datums_op_results + * @param a_cs_blocks + * @param a_rc + * @param a_arg + */ +static void s_callback_block_new_add_datums_op_results (dap_chain_cs_blocks_t * a_cs_blocks, int a_rc, void * a_arg) +{ + dap_chain_cs_block_ton_session_t * l_session = (dap_chain_cs_block_ton_session_t*) a_arg; + dap_chain_t *l_chain = l_session->chain; + dap_chain_cs_blocks_t *l_blocks = DAP_CHAIN_CS_BLOCKS(l_chain); + + if ( l_blocks->block_new_size && l_blocks->block_new) { + l_session->my_candidate = (dap_chain_block_t *)DAP_DUP_SIZE(l_blocks->block_new, l_blocks->block_new_size); + l_session->my_candidate_size = l_blocks->block_new_size; + s_session_block_new_delete(l_session); + } + + size_t l_submit_size = l_session->my_candidate ? + sizeof(dap_chain_cs_block_ton_message_submit_t)+l_session->my_candidate_size + : sizeof(dap_chain_cs_block_ton_message_submit_t); + + // dap_chain_cs_new_block_add_datums(dap_chain_t *a_chain); + // size_t l_submit_size = l_blocks->block_new ? + // sizeof(dap_chain_cs_block_ton_message_submit_t)+a_session->my_candidate_size + // : sizeof(dap_chain_cs_block_ton_message_submit_t); + + dap_chain_cs_block_ton_message_submit_t *l_submit = + DAP_NEW_SIZE(dap_chain_cs_block_ton_message_submit_t, l_submit_size); + l_submit->round_id.uint64 = l_session->cur_round.id.uint64; + l_submit->candidate_size = l_session->my_candidate_size; + + bool l_candidate_exists = false; + if ( l_session->my_candidate ) { + dap_chain_hash_fast_t l_candidate_hash; + dap_hash_fast(l_session->my_candidate, l_session->my_candidate_size, &l_candidate_hash); + // pass if this candidate participated in old round + if ( !l_session->old_round.my_candidate_hash + || memcmp(&l_candidate_hash, l_session->old_round.my_candidate_hash, + sizeof(dap_chain_hash_fast_t)) != 0 ) { + memcpy(&l_submit->candidate_hash, &l_candidate_hash, sizeof(dap_chain_hash_fast_t)); + l_session->cur_round.my_candidate_hash = + (dap_chain_hash_fast_t*)DAP_DUP_SIZE(&l_candidate_hash, sizeof(dap_chain_hash_fast_t)); + memcpy(l_submit->candidate, l_session->my_candidate, l_session->my_candidate_size); + if (PVT(l_session->ton)->debug) { + char *l_hash_str = dap_chain_hash_fast_to_str_new(&l_candidate_hash); + log_it(L_MSG, "TON: net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U" Submit my candidate:%s", + l_session->chain->net_name, l_session->chain->name, + l_session->cur_round.id.uint64, l_hash_str); + DAP_DELETE(l_hash_str); + } + l_session->my_candidate_attempts_count++; + l_candidate_exists = true; + } + } + + if (!l_candidate_exists) { // no my candidate, send null hash + dap_chain_hash_fast_t l_candidate_hash_null={0}; + l_session->cur_round.my_candidate_hash = NULL; + memcpy(&l_submit->candidate_hash, &l_candidate_hash_null, sizeof(dap_chain_hash_fast_t)); + if (PVT(l_session->ton)->debug) + log_it(L_MSG, "TON: net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu I don't have a candidate. I submit a null candidate.", + l_session->chain->net_name, l_session->chain->name, + l_session->cur_round.id.uint64, l_session->attempt_current_number); + } + s_message_send(l_session, DAP_STREAM_CH_CHAIN_MESSAGE_TYPE_SUBMIT, + (uint8_t*)l_submit, l_submit_size, l_session->cur_round.validators_start); + DAP_DELETE(l_submit); + + l_session->time_proc_lock = false; // unlock + l_session->state = DAP_STREAM_CH_CHAIN_SESSION_STATE_CS_PROC_AFTER_SUBMIT; // Seconds stage after candidate submitted + + dap_worker_t * l_worker = l_session->worker; + dap_global_db_context_t * l_gdb_context = (dap_global_db_context_t*) dap_context_current()->_inheritor; + assert(l_worker); + assert(l_gdb_context); + pthread_rwlock_unlock(&l_session->rwlock); + dap_worker_exec_callback_inter( l_gdb_context->queue_worker_callback_input[l_worker->id],s_callback_worker_session_proc_state, l_session ); +} + +static void s_session_candidate_submit(dap_chain_cs_block_ton_session_t *a_session){ // if (!a_session->my_candidate // || a_session->my_candidate_attempts_count @@ -827,67 +938,11 @@ static bool s_session_candidate_submit(dap_chain_cs_block_ton_items_t *a_session // } dap_chain_t *l_chain = a_session->chain; - dap_chain_cs_blocks_t *l_blocks = DAP_CHAIN_CS_BLOCKS(l_chain); - s_session_my_candidate_delete(a_session); - dap_chain_cs_new_block_add_datums(l_chain); // add new datums from queue - if ( l_blocks->block_new_size && l_blocks->block_new) { - a_session->my_candidate = (dap_chain_block_t *)DAP_DUP_SIZE(l_blocks->block_new, l_blocks->block_new_size); - a_session->my_candidate_size = l_blocks->block_new_size; - s_session_block_new_delete(a_session); - } - - size_t l_submit_size = a_session->my_candidate ? - sizeof(dap_chain_cs_block_ton_message_submit_t)+a_session->my_candidate_size - : sizeof(dap_chain_cs_block_ton_message_submit_t); - - // dap_chain_cs_new_block_add_datums(dap_chain_t *a_chain); - // size_t l_submit_size = l_blocks->block_new ? - // sizeof(dap_chain_cs_block_ton_message_submit_t)+a_session->my_candidate_size - // : sizeof(dap_chain_cs_block_ton_message_submit_t); - - dap_chain_cs_block_ton_message_submit_t *l_submit = - DAP_NEW_SIZE(dap_chain_cs_block_ton_message_submit_t, l_submit_size); - l_submit->round_id.uint64 = a_session->cur_round.id.uint64; - l_submit->candidate_size = a_session->my_candidate_size; - - bool l_candidate_exists = false; - if ( a_session->my_candidate ) { - dap_chain_hash_fast_t l_candidate_hash; - dap_hash_fast(a_session->my_candidate, a_session->my_candidate_size, &l_candidate_hash); - // pass if this candidate participated in old round - if ( !a_session->old_round.my_candidate_hash - || memcmp(&l_candidate_hash, a_session->old_round.my_candidate_hash, - sizeof(dap_chain_hash_fast_t)) != 0 ) { - memcpy(&l_submit->candidate_hash, &l_candidate_hash, sizeof(dap_chain_hash_fast_t)); - a_session->cur_round.my_candidate_hash = - (dap_chain_hash_fast_t*)DAP_DUP_SIZE(&l_candidate_hash, sizeof(dap_chain_hash_fast_t)); - memcpy(l_submit->candidate, a_session->my_candidate, a_session->my_candidate_size); - if (PVT(a_session->ton)->debug) { - char *l_hash_str = dap_chain_hash_fast_to_str_new(&l_candidate_hash); - log_it(L_MSG, "TON: net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U" Submit my candidate:%s", - a_session->chain->net_name, a_session->chain->name, - a_session->cur_round.id.uint64, l_hash_str); - DAP_DELETE(l_hash_str); - } - a_session->my_candidate_attempts_count++; - l_candidate_exists = true; - } - } - - if (!l_candidate_exists) { // no my candidate, send null hash - dap_chain_hash_fast_t l_candidate_hash_null={0}; - a_session->cur_round.my_candidate_hash = NULL; - memcpy(&l_submit->candidate_hash, &l_candidate_hash_null, sizeof(dap_chain_hash_fast_t)); - if (PVT(a_session->ton)->debug) - log_it(L_MSG, "TON: net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu I don't have a candidate. I submit a null candidate.", - a_session->chain->net_name, a_session->chain->name, - a_session->cur_round.id.uint64, a_session->attempt_current_number); - } - s_message_send(a_session, DAP_STREAM_CH_CHAIN_MESSAGE_TYPE_SUBMIT, - (uint8_t*)l_submit, l_submit_size, a_session->cur_round.validators_start); - DAP_DELETE(l_submit); + s_session_my_candidate_delete(a_session); + a_session->worker = dap_worker_get_current(); + assert (a_session->worker); // It must be called from worker's timer - return false; // for timer + dap_chain_cs_new_block_add_datums(l_chain, s_callback_block_new_add_datums_op_results, a_session); // add new datums from queue } static int s_session_atom_validation(dap_chain_cs_blocks_t *a_blocks, dap_chain_block_t *a_block, size_t a_block_size){ @@ -902,13 +957,13 @@ static int s_session_atom_validation(dap_chain_cs_blocks_t *a_blocks, dap_chain_ return -1; } -static void s_session_block_new_delete(dap_chain_cs_block_ton_items_t *a_session) { +static void s_session_block_new_delete(dap_chain_cs_block_ton_session_t *a_session) { dap_chain_t *l_chain = a_session->chain; dap_chain_cs_blocks_t *l_blocks = DAP_CHAIN_CS_BLOCKS(l_chain); l_blocks->callback_new_block_del(l_blocks); } -static void s_session_my_candidate_delete(dap_chain_cs_block_ton_items_t *a_session) { +static void s_session_my_candidate_delete(dap_chain_cs_block_ton_session_t *a_session) { if (a_session->my_candidate){ if (PVT(a_session->ton)->debug) { dap_chain_hash_fast_t l_candidate_hash; @@ -935,7 +990,7 @@ static bool s_hash_is_null(dap_chain_hash_fast_t *a_hash){ } struct session_round_finish_args{ - dap_chain_cs_block_ton_items_t *session; + dap_chain_cs_block_ton_session_t *session; bool is_time_proc_lock; }; @@ -947,31 +1002,29 @@ struct session_round_finish_args{ * @param a_key * @param a_values_total * @param a_values_shift - * @param a_value_count + * @param a_values_count * @param a_values * @param a_arg */ static bool s_session_round_finish_callback_load_store(dap_global_db_context_t * a_global_db_context,int a_rc, const char * a_group, const char * a_key, const size_t a_values_total, const size_t a_values_shift, - const size_t a_value_count, dap_global_db_obj_t * a_values, void * a_arg) + const size_t a_values_count, dap_global_db_obj_t * a_values, void * a_arg) { - dap_chain_cs_block_ton_items_t *l_session = ((struct session_round_finish_args *)a_arg)->session; + dap_chain_cs_block_ton_session_t *l_session = ((struct session_round_finish_args *)a_arg)->session; bool l_is_time_proc_lock = ((struct session_round_finish_args *)a_arg)->is_time_proc_lock; DAP_DELETE(a_arg); - if(l_is_time_proc_lock){ // If its not locked we lock it by ourself for writting - pthread_rwlock_unlock(&l_session->rwlock); + if(! l_is_time_proc_lock){ // If its not locked we lock it by ourself for writting + pthread_rwlock_wrlock(&l_session->rwlock); // lock for writting } - pthread_rwlock_wrlock(&l_session->rwlock); // lock for writting - l_session->state = DAP_STREAM_CH_CHAIN_SESSION_STATE_IDLE; l_session->ts_round_finish = dap_time_now(); - if (a_value_count) { + if (a_values_count) { dap_chain_cs_block_ton_store_t *l_store_candidate_ready = NULL; size_t l_candidate_ready_size = 0; - for (size_t i = 0; i < a_value_count; i++) { + for (size_t i = 0; i < a_values_count; i++) { if (!a_values[i].value_len) continue; dap_chain_cs_block_ton_store_t *l_store = @@ -1079,7 +1132,7 @@ static bool s_session_round_finish_callback_load_store(dap_global_db_context_t * * @param a_is_time_proc_lock * @return */ -static void s_session_round_finish(dap_chain_cs_block_ton_items_t *a_session, bool a_is_time_proc_lock) +static void s_session_round_finish(dap_chain_cs_block_ton_session_t *a_session, bool a_is_time_proc_lock) { struct session_round_finish_args * l_args = DAP_NEW_Z(struct session_round_finish_args); l_args->session = a_session; @@ -1093,7 +1146,7 @@ static void s_session_round_finish(dap_chain_cs_block_ton_items_t *a_session, bo // this is planned for get validator addr if validator addr list to be changed to stakes, // but currently it using for check validator addr exists static dap_chain_node_addr_t *s_session_get_validator( - dap_chain_cs_block_ton_items_t * a_session, dap_chain_node_addr_t * a_addr, + dap_chain_cs_block_ton_session_t * a_session, dap_chain_node_addr_t * a_addr, dap_list_t *a_validators) { // dap_chain_cs_block_ton_round_t *l_round = a_round_name == DAP_TON$ROUND_CUR ? // 'c' or 'o' // &a_session->cur_round : &a_session->old_round; @@ -1108,7 +1161,7 @@ static dap_chain_node_addr_t *s_session_get_validator( } static uint16_t s_session_message_count( - dap_chain_cs_block_ton_items_t *a_session, uint8_t a_round_name, uint8_t a_type, + dap_chain_cs_block_ton_session_t *a_session, uint8_t a_round_name, uint8_t a_type, dap_chain_hash_fast_t *a_candidate_hash, uint16_t *a_attempt_number) { dap_chain_cs_block_ton_message_item_t *l_messages_items = NULL; l_messages_items = a_round_name == DAP_TON$ROUND_CUR ? // 'c' or 'o' @@ -1150,7 +1203,7 @@ static uint16_t s_session_message_count( struct vote_for_load_store_args { - dap_chain_cs_block_ton_items_t *session; + dap_chain_cs_block_ton_session_t *session; dap_chain_hash_fast_t candidate_hash; }; @@ -1162,14 +1215,14 @@ struct vote_for_load_store_args * @param a_key * @param a_values_total * @param a_values_shift - * @param a_value_count + * @param a_values_count * @param a_values * @param a_arg */ static bool s_session_packet_in_callback_vote_for_load_store (dap_global_db_context_t * a_global_db_context,int a_rc, const char * a_group, const char * a_key, const size_t a_values_total, const size_t a_values_shift, - const size_t a_value_count, dap_global_db_obj_t * a_values, void * a_arg) + const size_t a_values_count, dap_global_db_obj_t * a_values, void * a_arg) { - dap_chain_cs_block_ton_items_t * l_session = ((struct vote_for_load_store_args *) a_arg)->session; + dap_chain_cs_block_ton_session_t * l_session = ((struct vote_for_load_store_args *) a_arg)->session; dap_chain_hash_fast_t * l_candidate_hash = &((struct vote_for_load_store_args *) a_arg)->candidate_hash; dap_chain_cs_block_ton_store_t *l_found_best = NULL; @@ -1177,8 +1230,8 @@ static bool s_session_packet_in_callback_vote_for_load_store (dap_global_db_cont dap_chain_cs_block_ton_store_t *l_found_approve_vf = NULL; // dap_chain_cs_block_ton_store_t *l_found_approve = NULL; pthread_rwlock_rdlock(&l_session->rwlock); - if (a_value_count) { - for (size_t i = 0; i < a_value_count; i++) { + if (a_values_count) { + for (size_t i = 0; i < a_values_count; i++) { if (!a_values[i].value_len) continue; dap_chain_cs_block_ton_store_t *l_store = @@ -1262,7 +1315,7 @@ static bool s_session_packet_in_callback_vote_for_load_store (dap_global_db_cont static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_node_addr, dap_chain_hash_fast_t *a_data_hash, uint8_t *a_data, size_t a_data_size) { bool l_message_delete = true; - dap_chain_cs_block_ton_items_t *l_session = (dap_chain_cs_block_ton_items_t *)a_arg; + dap_chain_cs_block_ton_session_t *l_session = (dap_chain_cs_block_ton_session_t *)a_arg; dap_chain_cs_block_ton_message_t *l_message = (dap_chain_cs_block_ton_message_t *)DAP_DUP_SIZE(a_data, a_data_size); @@ -1867,7 +1920,8 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod log_it (L_ERROR, "Can't process get_all request for vote_for_load_store"); DAP_DELETE(l_args); } - } break; + pthread_rwlock_unlock(&l_session->rwlock); + } break; case DAP_STREAM_CH_CHAIN_MESSAGE_TYPE_VOTE: { dap_chain_cs_block_ton_message_vote_t *l_vote = (dap_chain_cs_block_ton_message_vote_t *) @@ -1933,7 +1987,10 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod (l_message->sign_n_message+l_message->hdr.sign_size); dap_chain_hash_fast_t *l_candidate_hash = &l_precommit->candidate_hash; + pthread_rwlock_rdlock(&l_session->rwlock); + if ( l_precommit->attempt_number != l_session->attempt_current_number) { + pthread_rwlock_unlock(&l_session->rwlock); goto handler_finish; } if ( s_hash_is_null(l_candidate_hash) ) { @@ -1941,6 +1998,7 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod log_it(L_MSG, "TON: net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hu Receive PRE_COMMIT: candidate: NULL", l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id.uint64, l_session->attempt_current_number); + pthread_rwlock_unlock(&l_session->rwlock); goto handler_finish_save; } @@ -1950,7 +2008,6 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id.uint64, l_session->attempt_current_number, l_candidate_hash_str); - pthread_rwlock_rdlock(&l_session->rwlock); uint16_t l_attempt_number = l_session->attempt_current_number; uint16_t l_precommit_count = s_session_message_count( l_session, DAP_TON$ROUND_CUR, DAP_STREAM_CH_CHAIN_MESSAGE_TYPE_PRE_COMMIT, @@ -1987,7 +2044,9 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod DAP_DELETE(l_commitsign); DAP_DELETE(l_candidate); DAP_DELETE(l_candidate_sign); - + pthread_rwlock_unlock(&l_session->rwlock); + + pthread_rwlock_wrlock(&l_session->rwlock); l_session->state = DAP_STREAM_CH_CHAIN_SESSION_STATE_WAIT_SIGNS; l_session->ts_round_state_commit = dap_time_now(); @@ -1995,6 +2054,7 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod log_it(L_MSG, "TON: net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U" attempt:%hu Candidate:%s collected PRE_COMMIT more than 2/3 of the validators, so to sent a COMMIT_SIGN", l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id.uint64, l_session->attempt_current_number, l_candidate_hash_str); + pthread_rwlock_unlock(&l_session->rwlock); } } else { @@ -2095,7 +2155,7 @@ handler_finish: return; } -static uint8_t *s_message_data_sign(dap_chain_cs_block_ton_items_t *a_session, +static uint8_t *s_message_data_sign(dap_chain_cs_block_ton_session_t *a_session, dap_chain_cs_block_ton_message_t *a_message, size_t *a_sign_size) { size_t l_size[5] = {sizeof(a_message->hdr.id), sizeof(a_message->hdr.ts_created), sizeof(a_message->hdr.type), sizeof(a_message->hdr.chain_id), @@ -2117,7 +2177,7 @@ static uint8_t *s_message_data_sign(dap_chain_cs_block_ton_items_t *a_session, return l_data; } -static void s_message_send(dap_chain_cs_block_ton_items_t *a_session, uint8_t a_message_type, +static void s_message_send(dap_chain_cs_block_ton_session_t *a_session, uint8_t a_message_type, uint8_t *a_data, size_t a_data_size, dap_list_t *a_validators) { dap_chain_net_t *l_net = dap_chain_net_by_id(a_session->chain->net_id); size_t l_message_size = sizeof(dap_chain_cs_block_ton_message_hdr_t)+a_data_size; @@ -2155,7 +2215,7 @@ static void s_message_send(dap_chain_cs_block_ton_items_t *a_session, uint8_t a_ } -static void s_message_chain_add(dap_chain_cs_block_ton_items_t *a_session, dap_chain_node_addr_t *a_sender_node_addr, +static void s_message_chain_add(dap_chain_cs_block_ton_session_t *a_session, dap_chain_node_addr_t *a_sender_node_addr, dap_chain_cs_block_ton_message_t *a_message, size_t a_message_size, dap_chain_hash_fast_t *a_message_hash) { diff --git a/modules/consensus/block-ton/include/dap_chain_cs_block_ton.h b/modules/consensus/block-ton/include/dap_chain_cs_block_ton.h index 0ad67933ec..c4a7f3a0d9 100644 --- a/modules/consensus/block-ton/include/dap_chain_cs_block_ton.h +++ b/modules/consensus/block-ton/include/dap_chain_cs_block_ton.h @@ -4,10 +4,11 @@ #include "dap_chain_cs_blocks.h" #include "dap_cert.h" -#define DAP_STREAM_CH_CHAIN_SESSION_STATE_IDLE 0x04 -#define DAP_STREAM_CH_CHAIN_SESSION_STATE_WAIT_START 0x08 -#define DAP_STREAM_CH_CHAIN_SESSION_STATE_CS_PROC 0x12 -#define DAP_STREAM_CH_CHAIN_SESSION_STATE_WAIT_SIGNS 0x16 +#define DAP_STREAM_CH_CHAIN_SESSION_STATE_IDLE 0x04 +#define DAP_STREAM_CH_CHAIN_SESSION_STATE_WAIT_START 0x08 +#define DAP_STREAM_CH_CHAIN_SESSION_STATE_CS_PROC 0x12 +#define DAP_STREAM_CH_CHAIN_SESSION_STATE_CS_PROC_AFTER_SUBMIT 0x13 +#define DAP_STREAM_CH_CHAIN_SESSION_STATE_WAIT_SIGNS 0x16 #define DAP_STREAM_CH_CHAIN_MESSAGE_TYPE_START_SYNC 0x32 @@ -56,7 +57,7 @@ typedef struct dap_chain_cs_block_ton_round { uint16_t candidates_count; } dap_chain_cs_block_ton_round_t; -typedef struct dap_chain_cs_block_ton_items { +typedef struct dap_chain_cs_block_ton_session { dap_chain_t *chain; dap_chain_cs_block_ton_t *ton; @@ -84,14 +85,14 @@ typedef struct dap_chain_cs_block_ton_items { dap_enc_key_t *blocks_sign_key; - struct dap_chain_cs_block_ton_items *next; - struct dap_chain_cs_block_ton_items *prev; + struct dap_chain_cs_block_ton_session *next; + struct dap_chain_cs_block_ton_session *prev; bool time_proc_lock; // flag - skip check if prev check is not finish + dap_worker_t * worker; // Worker where it was processed last time pthread_rwlock_t rwlock; - -} dap_chain_cs_block_ton_items_t; +} dap_chain_cs_block_ton_session_t; typedef struct dap_chain_cs_block_ton_message_hdr { uint8_t type; diff --git a/modules/consensus/dag-poa/dap_chain_cs_dag_poa.c b/modules/consensus/dag-poa/dap_chain_cs_dag_poa.c index 533d536fec..b145b20a63 100644 --- a/modules/consensus/dag-poa/dap_chain_cs_dag_poa.c +++ b/modules/consensus/dag-poa/dap_chain_cs_dag_poa.c @@ -410,12 +410,12 @@ typedef struct event_clean_dup_items { * @param a_key * @param a_values_total * @param a_values_shift - * @param a_value_count + * @param a_values_count * @param a_values * @param a_arg */ static bool s_poa_round_check_callback_load_round_new(dap_global_db_context_t * a_global_db_context,int a_rc, const char * a_group, const char * a_key, const size_t a_values_total, const size_t a_values_shift, - const size_t a_value_count, dap_global_db_obj_t * a_values, void * a_arg) + const size_t a_values_count, dap_global_db_obj_t * a_values, void * a_arg) { dap_chain_t * l_chain = (dap_chain_t *) a_arg; dap_chain_cs_dag_t *l_dag = DAP_CHAIN_CS_DAG(l_chain); @@ -424,8 +424,8 @@ static bool s_poa_round_check_callback_load_round_new(dap_global_db_context_t * char *l_gdb_group_round_new = l_dag->gdb_group_events_round_new; size_t l_events_count = 0; - if (a_value_count) { - for (size_t i = 0; i<a_value_count; i++) { + if (a_values_count) { + for (size_t i = 0; i<a_values_count; i++) { dap_chain_cs_dag_event_round_item_t *l_event_round_item = (dap_chain_cs_dag_event_round_item_t *)a_values[i].value; if ( (dap_time_now() - l_event_round_item->round_info.ts_update) > (l_poa_pvt->confirmations_timeout+l_poa_pvt->wait_sync_before_complete+10) ) { @@ -436,7 +436,7 @@ static bool s_poa_round_check_callback_load_round_new(dap_global_db_context_t * l_events_count++; } } - dap_global_db_objs_delete(a_values, a_value_count); + dap_global_db_objs_delete(a_values, a_values_count); } if (!l_events_count) { diff --git a/modules/consensus/none/dap_chain_cs_none.c b/modules/consensus/none/dap_chain_cs_none.c index 3bc07a45b3..d7bb7334f2 100644 --- a/modules/consensus/none/dap_chain_cs_none.c +++ b/modules/consensus/none/dap_chain_cs_none.c @@ -281,12 +281,12 @@ const char* dap_chain_gdb_get_group(dap_chain_t * a_chain) * @param a_key * @param a_values_total * @param a_values_shift - * @param a_value_count + * @param a_values_count * @param a_values * @param a_arg */ static bool s_ledger_load_callback(dap_global_db_context_t * a_global_db_context,int a_rc, const char * a_group, const char * a_key, const size_t a_values_total, const size_t a_values_shift, - const size_t a_value_count, dap_global_db_obj_t * a_values, void * a_arg) + const size_t a_values_count, dap_global_db_obj_t * a_values, void * a_arg) { assert(a_arg); dap_chain_t * l_chain = (dap_chain_t *) a_arg; @@ -296,7 +296,7 @@ static bool s_ledger_load_callback(dap_global_db_context_t * a_global_db_context dap_chain_gdb_private_t * l_gdb_pvt = PVT(l_gdb); assert(l_gdb_pvt); // make list of datums - for(size_t i = 0; i < a_value_count; i++) { + for(size_t i = 0; i < a_values_count; i++) { s_chain_callback_atom_add(l_chain, a_values[i].value, a_values[i].value_len); } l_gdb_pvt->is_load_mode = false; diff --git a/modules/global-db/dap_global_db.c b/modules/global-db/dap_global_db.c index 3cceff7515..8c30d3b835 100644 --- a/modules/global-db/dap_global_db.c +++ b/modules/global-db/dap_global_db.c @@ -939,16 +939,16 @@ struct objs_get{ * @param a_key * @param a_values_total * @param a_values_shift - * @param a_value_count + * @param a_values_count * @param a_values * @param a_arg */ static bool s_objs_get_callback (dap_global_db_context_t * a_global_db_context,int a_rc, const char * a_group, const char * a_key, const size_t a_values_total, const size_t a_values_shift, - const size_t a_value_count, dap_global_db_obj_t * a_values, void * a_arg) + const size_t a_values_count, dap_global_db_obj_t * a_values, void * a_arg) { struct objs_get * l_args = (struct objs_get *) a_arg; l_args->objs = a_values; - l_args->objs_count = a_value_count; + l_args->objs_count = a_values_count; pthread_mutex_lock(&l_args->mutex); pthread_cond_broadcast(&l_args->cond); pthread_mutex_unlock(&l_args->mutex); @@ -988,11 +988,11 @@ struct store_objs_get{ }; static bool s_store_objs_get_callback (dap_global_db_context_t * a_global_db_context,int a_rc, const char * a_group, const char * a_key, const size_t a_values_total, const size_t a_values_shift, - const size_t a_value_count, dap_store_obj_t * a_values, void * a_arg) + const size_t a_values_count, dap_store_obj_t * a_values, void * a_arg) { struct store_objs_get * l_args = (struct store_objs_get *) a_arg; l_args->objs = a_values; - l_args->objs_count = a_value_count; + l_args->objs_count = a_values_count; pthread_mutex_lock(&l_args->mutex); pthread_cond_broadcast(&l_args->cond); pthread_mutex_unlock(&l_args->mutex); diff --git a/modules/global-db/include/dap_global_db.h b/modules/global-db/include/dap_global_db.h index aa5b727e0e..661e978187 100644 --- a/modules/global-db/include/dap_global_db.h +++ b/modules/global-db/include/dap_global_db.h @@ -75,9 +75,9 @@ typedef struct dap_global_db_obj { typedef void (*dap_global_db_callback_result_t) (dap_global_db_context_t * a_global_db_context,int a_rc, const char * a_group, const char * a_key, const void * a_value, const size_t a_value_len, dap_nanotime_t value_ts, bool a_is_pinned, void * a_arg); typedef bool (*dap_global_db_callback_results_t) (dap_global_db_context_t * a_global_db_context,int a_rc, const char * a_group, const char * a_key, const size_t a_values_total, const size_t a_values_shift, - const size_t a_value_count, dap_global_db_obj_t * a_values, void * a_arg); + const size_t a_values_count, dap_global_db_obj_t * a_values, void * a_arg); typedef bool (*dap_global_db_callback_results_raw_t) (dap_global_db_context_t * a_global_db_context,int a_rc, const char * a_group, const char * a_key, const size_t a_values_current, const size_t a_values_shift, - const size_t a_value_count, dap_store_obj_t * a_values, void * a_arg); + const size_t a_values_count, dap_store_obj_t * a_values, void * a_arg); // Return codes #define DAP_GLOBAL_DB_RC_SUCCESS 0 #define DAP_GLOBAL_DB_RC_NO_RESULTS -1 diff --git a/modules/mempool/dap_chain_mempool.c b/modules/mempool/dap_chain_mempool.c index 5bc18da7ef..803631c386 100644 --- a/modules/mempool/dap_chain_mempool.c +++ b/modules/mempool/dap_chain_mempool.c @@ -68,7 +68,7 @@ void s_tx_create_massive_gdb_save_callback( dap_global_db_context_t * a_global_db_context,int a_rc, const char * a_group, const char * a_key, const size_t a_values_total, const size_t a_values_shift, - const size_t a_value_count, dap_global_db_obj_t * a_values, void * a_arg); + const size_t a_values_count, dap_global_db_obj_t * a_values, void * a_arg); int dap_datum_mempool_init(void) { @@ -91,7 +91,7 @@ char *dap_chain_mempool_datum_add(const dap_chain_datum_t *a_datum, dap_chain_t dap_hash_fast(a_datum, dap_chain_datum_size(a_datum), &l_key_hash); char * l_key_str = dap_chain_hash_fast_to_str_new(&l_key_hash); - char * l_gdb_group = dap_chain_net_get_gdb_group_mempool(a_chain); + char * l_gdb_group = dap_chain_net_get_gdb_group_mempool_new(a_chain); if (dap_chain_global_db_gr_set(l_key_str, a_datum, dap_chain_datum_size(a_datum), l_gdb_group)) { log_it(L_NOTICE, "Datum with hash %s was placed in mempool", l_key_str); @@ -358,7 +358,7 @@ int dap_chain_mempool_tx_create_massive( dap_chain_t * a_chain, dap_enc_key_t *a } dap_list_free_full(l_list_used_out, NULL); - char * l_gdb_group = dap_chain_net_get_gdb_group_mempool(a_chain); + char * l_gdb_group = dap_chain_net_get_gdb_group_mempool_new(a_chain); //return 0; dap_global_db_set_multiple(l_gdb_group, l_objs,a_tx_num, s_tx_create_massive_gdb_save_callback , NULL ); @@ -374,13 +374,13 @@ int dap_chain_mempool_tx_create_massive( dap_chain_t * a_chain, dap_enc_key_t *a * @param a_key * @param a_values_total * @param a_values_shift - * @param a_value_count + * @param a_values_count * @param a_values * @param a_arg */ void s_tx_create_massive_gdb_save_callback( dap_global_db_context_t * a_global_db_context,int a_rc, const char * a_group, const char * a_key, const size_t a_values_total, const size_t a_values_shift, - const size_t a_value_count, dap_global_db_obj_t * a_values, void * a_arg) + const size_t a_values_count, dap_global_db_obj_t * a_values, void * a_arg) { dap_global_db_objs_delete(a_values, a_values_total); // Delete objs thats passed as arg; if( a_rc ==0 ) { @@ -499,7 +499,7 @@ dap_chain_hash_fast_t* dap_chain_mempool_tx_create_cond_input(dap_chain_net_t * char * l_gdb_group; if(a_net->pub.default_chain) - l_gdb_group = dap_chain_net_get_gdb_group_mempool(a_net->pub.default_chain); + l_gdb_group = dap_chain_net_get_gdb_group_mempool_new(a_net->pub.default_chain); else l_gdb_group = dap_chain_net_get_gdb_group_mempool_by_chain_type( a_net ,CHAIN_TYPE_TX); @@ -635,7 +635,7 @@ dap_chain_hash_fast_t *dap_chain_mempool_base_tx_create(dap_chain_t *a_chain, da dap_chain_id_t a_emission_chain_id, uint256_t a_emission_value, const char *a_ticker, dap_chain_addr_t *a_addr_to, dap_cert_t **a_certs, size_t a_certs_count) { - char *l_gdb_group_mempool_base_tx = dap_chain_net_get_gdb_group_mempool(a_chain); + char *l_gdb_group_mempool_base_tx = dap_chain_net_get_gdb_group_mempool_new(a_chain); // create first transaction (with tx_token) dap_chain_datum_tx_t *l_tx = DAP_NEW_Z_SIZE(dap_chain_datum_tx_t, sizeof(dap_chain_datum_tx_t)); l_tx->header.ts_created = time(NULL); @@ -689,7 +689,7 @@ dap_chain_hash_fast_t *dap_chain_mempool_base_tx_create(dap_chain_t *a_chain, da dap_chain_datum_token_emission_t *dap_chain_mempool_emission_get(dap_chain_t *a_chain, const char *a_emission_hash_str) { size_t l_emission_size; - char *l_gdb_group = dap_chain_net_get_gdb_group_mempool(a_chain); + char *l_gdb_group = dap_chain_net_get_gdb_group_mempool_new(a_chain); dap_chain_datum_t *l_emission = (dap_chain_datum_t *)dap_chain_global_db_gr_get( a_emission_hash_str, &l_emission_size, l_gdb_group); if (!l_emission) { diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index 9c06b2a3cc..8102c6d56f 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -639,7 +639,7 @@ static void s_gbd_history_callback_notify(void *a_arg, const char a_op_code, con if (!l_chain) { continue; } - char *l_gdb_group_str = dap_chain_net_get_gdb_group_mempool(l_chain); + char *l_gdb_group_str = dap_chain_net_get_gdb_group_mempool_new(l_chain); if (!strcmp(a_group, l_gdb_group_str)) { for (dap_list_t *it = DAP_CHAIN_PVT(l_chain)->mempool_notifires; it; it = it->next) { dap_chain_gdb_notifier_t *el = (dap_chain_gdb_notifier_t *)it->data; @@ -2965,7 +2965,7 @@ char * dap_chain_net_get_gdb_group_mempool_by_chain_type(dap_chain_net_t * l_net { for(int i = 0; i < l_chain->datum_types_count; i++) { if(l_chain->datum_types[i] == a_datum_type) - return dap_chain_net_get_gdb_group_mempool(l_chain); + return dap_chain_net_get_gdb_group_mempool_new(l_chain); } } return NULL; @@ -3118,76 +3118,83 @@ bool dap_chain_net_get_flag_sync_from_zero( dap_chain_net_t * a_net) return PVT(a_net)->flags &F_DAP_CHAIN_NET_SYNC_FROM_ZERO ; } + +bool s_proc_mempool_callback_load(dap_global_db_context_t * a_global_db_context,int a_rc, const char * a_group, const char * a_key, const size_t a_values_total, const size_t a_values_shift, + const size_t a_values_count, dap_global_db_obj_t * a_values, void * a_arg) +{ + dap_chain_t * l_chain = (dap_chain_t*) a_arg; + dap_chain_net_t * l_net = dap_chain_net_by_id( l_chain->net_id ); + dap_string_t * l_str_tmp = dap_string_new(NULL); + if(a_values_count) { + log_it(L_INFO, "%s.%s: Found %zu records :", l_net->pub.name, l_chain->name, + a_values_count); + size_t l_datums_size = a_values_count; + dap_chain_datum_t ** l_datums = DAP_NEW_Z_SIZE(dap_chain_datum_t*, + sizeof(dap_chain_datum_t*) * l_datums_size); + size_t l_objs_size_tmp = (a_values_count > 15) ? min(a_values_count, 10) : a_values_count; + for(size_t i = 0; i < a_values_count; i++) { + dap_chain_datum_t * l_datum = (dap_chain_datum_t*) a_values[i].value; + int l_verify_datum= dap_chain_net_verify_datum_for_add( l_net, l_datum) ; + if (l_verify_datum != 0){ + log_it(L_WARNING, "Datum doesn't pass verifications (code %d), delete such datum from pool", + l_verify_datum); + dap_chain_global_db_gr_del( a_values[i].key, a_group); + l_datums[i] = NULL; + }else{ + l_datums[i] = l_datum; + if(i < l_objs_size_tmp) { + char buf[50] = { '\0' }; + const char *l_type = NULL; + DAP_DATUM_TYPE_STR(l_datum->header.type_id, l_type) + dap_time_t l_ts_create = (dap_time_t) l_datum->header.ts_create; + log_it(L_INFO, "\t\t0x%s: type_id=%s ts_create=%s data_size=%u", + a_values[i].key, l_type, + dap_ctime_r(&l_ts_create, buf), l_datum->header.data_size); + } + } + } + size_t l_objs_processed = l_chain->callback_add_datums(l_chain, l_datums, l_datums_size); + // Delete processed objects + size_t l_objs_processed_tmp = (l_objs_processed > 15) ? min(l_objs_processed, 10) : l_objs_processed; + for(size_t i = 0; i < l_objs_processed; i++) { + dap_chain_global_db_gr_del(a_values[i].key, a_group); + if(i < l_objs_processed_tmp) { + dap_string_append_printf(l_str_tmp, "New event created, removed datum 0x%s from mempool \n", + a_values[i].key); + } + } + if(l_objs_processed < l_datums_size) + log_it(L_WARNING, "%s.%s: %zu records not processed", l_net->pub.name, l_chain->name, + l_datums_size - l_objs_processed); + dap_global_db_objs_delete(a_values, a_values_count); + + // Cleanup datums array + if(l_datums){ + for(size_t i = 0; i < a_values_count; i++) { + if (l_datums[i]) + DAP_DELETE(l_datums[i]); + } + DAP_DEL_Z(l_datums); + } + } + else { + log_it(L_INFO, "%s.%s: No records in mempool", l_net->pub.name, l_chain ? l_chain->name : "[no chain]"); + } + return true; +} + + /** * @brief dap_chain_net_proc_datapool * @param a_net */ void dap_chain_net_proc_mempool (dap_chain_net_t * a_net) { - dap_string_t * l_str_tmp = dap_string_new(NULL); dap_chain_t *l_chain; DL_FOREACH(a_net->pub.chains, l_chain) { - char *l_gdb_group_mempool = dap_chain_net_get_gdb_group_mempool(l_chain); - - size_t l_objs_size = 0; - dap_global_db_obj_t * l_objs = dap_chain_global_db_gr_load(l_gdb_group_mempool, &l_objs_size); - if(l_objs_size) { - log_it(L_INFO, "%s.%s: Found %zu records :", a_net->pub.name, l_chain->name, - l_objs_size); - size_t l_datums_size = l_objs_size; - dap_chain_datum_t ** l_datums = DAP_NEW_Z_SIZE(dap_chain_datum_t*, - sizeof(dap_chain_datum_t*) * l_datums_size); - size_t l_objs_size_tmp = (l_objs_size > 15) ? min(l_objs_size, 10) : l_objs_size; - for(size_t i = 0; i < l_objs_size; i++) { - dap_chain_datum_t * l_datum = (dap_chain_datum_t*) l_objs[i].value; - int l_verify_datum= dap_chain_net_verify_datum_for_add( a_net, l_datum) ; - if (l_verify_datum != 0){ - log_it(L_WARNING, "Datum doesn't pass verifications (code %d), delete such datum from pool", - l_verify_datum); - dap_chain_global_db_gr_del( l_objs[i].key, l_gdb_group_mempool); - l_datums[i] = NULL; - }else{ - l_datums[i] = l_datum; - if(i < l_objs_size_tmp) { - char buf[50] = { '\0' }; - const char *l_type = NULL; - DAP_DATUM_TYPE_STR(l_datum->header.type_id, l_type) - dap_time_t l_ts_create = (dap_time_t) l_datum->header.ts_create; - log_it(L_INFO, "\t\t0x%s: type_id=%s ts_create=%s data_size=%u", - l_objs[i].key, l_type, - dap_ctime_r(&l_ts_create, buf), l_datum->header.data_size); - } - } - } - size_t l_objs_processed = l_chain->callback_add_datums(l_chain, l_datums, l_datums_size); - // Delete processed objects - size_t l_objs_processed_tmp = (l_objs_processed > 15) ? min(l_objs_processed, 10) : l_objs_processed; - for(size_t i = 0; i < l_objs_processed; i++) { - dap_chain_global_db_gr_del(l_objs[i].key, l_gdb_group_mempool); - if(i < l_objs_processed_tmp) { - dap_string_append_printf(l_str_tmp, "New event created, removed datum 0x%s from mempool \n", - l_objs[i].key); - } - } - if(l_objs_processed < l_datums_size) - log_it(L_WARNING, "%s.%s: %zu records not processed", a_net->pub.name, l_chain->name, - l_datums_size - l_objs_processed); - dap_global_db_objs_delete(l_objs, l_objs_size); - - // Cleanup datums array - if(l_datums){ - for(size_t i = 0; i < l_objs_size; i++) { - if (l_datums[i]) - DAP_DELETE(l_datums[i]); - } - DAP_DEL_Z(l_datums); - } - } - else { - log_it(L_INFO, "%s.%s: No records in mempool", a_net->pub.name, l_chain ? l_chain->name : "[no chain]"); - } + char *l_gdb_group_mempool = dap_chain_net_get_gdb_group_mempool_new(l_chain); + dap_global_db_get_all(l_gdb_group_mempool,0,s_proc_mempool_callback_load, l_chain); DAP_DELETE(l_gdb_group_mempool); - } } diff --git a/modules/net/dap_chain_node.c b/modules/net/dap_chain_node.c index 7ee41a233a..cd13b78d95 100644 --- a/modules/net/dap_chain_node.c +++ b/modules/net/dap_chain_node.c @@ -304,7 +304,7 @@ bool dap_chain_node_mempool_autoproc_init() continue; } char *l_gdb_group_mempool = NULL; - l_gdb_group_mempool = dap_chain_net_get_gdb_group_mempool(l_chain); + l_gdb_group_mempool = dap_chain_net_get_gdb_group_mempool_new(l_chain); size_t l_objs_size = 0; dap_global_db_obj_t *l_objs = dap_global_db_objs_get(l_gdb_group_mempool, &l_objs_size); if (l_objs_size) { diff --git a/modules/net/dap_chain_node_cli_cmd.c b/modules/net/dap_chain_node_cli_cmd.c index 9ec515f1de..7f7286f76d 100644 --- a/modules/net/dap_chain_node_cli_cmd.c +++ b/modules/net/dap_chain_node_cli_cmd.c @@ -2133,7 +2133,7 @@ int com_token_decl_sign(int argc, char ** argv, char ** a_str_reply) return -7; } - char * l_gdb_group_mempool = dap_chain_net_get_gdb_group_mempool(l_chain); + char * l_gdb_group_mempool = dap_chain_net_get_gdb_group_mempool_new(l_chain); if(!l_gdb_group_mempool) { l_gdb_group_mempool = dap_chain_net_get_gdb_group_mempool_by_chain_type(l_net, CHAIN_TYPE_TOKEN); } @@ -2276,7 +2276,7 @@ int com_token_decl_sign(int argc, char ** argv, char ** a_str_reply) * @param a_hash_out_type */ void s_com_mempool_list_print_for_chain(dap_chain_net_t * a_net, dap_chain_t * a_chain, dap_string_t * a_str_tmp, const char *a_hash_out_type){ - char * l_gdb_group_mempool = dap_chain_net_get_gdb_group_mempool(a_chain); + char * l_gdb_group_mempool = dap_chain_net_get_gdb_group_mempool_new(a_chain); if(!l_gdb_group_mempool){ dap_string_append_printf(a_str_tmp, "%s.%s: chain not found\n", a_net->pub.name, a_chain->name); }else{ @@ -2403,7 +2403,7 @@ int com_mempool_delete(int argc, char ** argv, char ** a_str_reply) l_datum_hash_hex_str = dap_enc_base58_to_hex_str_from_str(l_datum_hash_str); l_datum_hash_base58_str = dap_strdup(l_datum_hash_str); } - char * l_gdb_group_mempool = dap_chain_net_get_gdb_group_mempool(l_chain); + char * l_gdb_group_mempool = dap_chain_net_get_gdb_group_mempool_new(l_chain); uint8_t *l_data_tmp = l_datum_hash_hex_str ? dap_chain_global_db_gr_get(l_datum_hash_hex_str, NULL, l_gdb_group_mempool) : NULL; if(l_data_tmp && dap_chain_global_db_gr_del(l_datum_hash_hex_str, l_gdb_group_mempool)) { if(!dap_strcmp(l_hash_out_type,"hex")) @@ -2467,7 +2467,7 @@ int com_mempool_proc(int argc, char ** argv, char ** a_str_reply) } char * l_gdb_group_mempool = NULL, *l_gdb_group_mempool_tmp; - l_gdb_group_mempool = dap_chain_net_get_gdb_group_mempool(l_chain); + l_gdb_group_mempool = dap_chain_net_get_gdb_group_mempool_new(l_chain); l_gdb_group_mempool_tmp = l_gdb_group_mempool; // If full or light it doesnt work @@ -2480,7 +2480,7 @@ int com_mempool_proc(int argc, char ** argv, char ** a_str_reply) int ret = 0; dap_chain_node_cli_find_option_val(argv, arg_index, argc, "-datum", &l_datum_hash_str); if(l_datum_hash_str) { - char * l_gdb_group_mempool = dap_chain_net_get_gdb_group_mempool(l_chain); + char * l_gdb_group_mempool = dap_chain_net_get_gdb_group_mempool_new(l_chain); dap_string_t * l_str_tmp = dap_string_new(NULL); size_t l_datum_size=0; const char *l_datum_hash_out_str; @@ -2846,7 +2846,7 @@ int com_token_update(int a_argc, char ** a_argv, char ** a_str_reply) // Add datum to mempool with datum_token_update hash as a key char * l_gdb_group_mempool; if(l_chain) { - l_gdb_group_mempool = dap_chain_net_get_gdb_group_mempool(l_chain); + l_gdb_group_mempool = dap_chain_net_get_gdb_group_mempool_new(l_chain); } else { l_gdb_group_mempool = dap_chain_net_get_gdb_group_mempool_by_chain_type(l_net, CHAIN_TYPE_TOKEN); @@ -3353,7 +3353,7 @@ int com_token_decl(int a_argc, char ** a_argv, char ** a_str_reply) // Add datum to mempool with datum_token hash as a key char * l_gdb_group_mempool; if (l_chain) - l_gdb_group_mempool = dap_chain_net_get_gdb_group_mempool(l_chain); + l_gdb_group_mempool = dap_chain_net_get_gdb_group_mempool_new(l_chain); else l_gdb_group_mempool = dap_chain_net_get_gdb_group_mempool_by_chain_type(l_net, CHAIN_TYPE_TOKEN); if (!l_gdb_group_mempool) { @@ -3552,7 +3552,7 @@ int com_token_emit(int a_argc, char ** a_argv, char ** a_str_reply) // Delete token emission DAP_DEL_Z(l_emission); - char *l_gdb_group_mempool_emission = dap_chain_net_get_gdb_group_mempool(l_chain_emission); + char *l_gdb_group_mempool_emission = dap_chain_net_get_gdb_group_mempool_new(l_chain_emission); size_t l_datum_emission_size = sizeof(l_datum_emission->header) + l_datum_emission->header.data_size; @@ -4198,7 +4198,7 @@ int com_tx_verify(int a_argc, char **a_argv, char **a_str_reply) } } size_t l_tx_size = 0; - char *l_gdb_group = dap_chain_net_get_gdb_group_mempool(l_chain); + char *l_gdb_group = dap_chain_net_get_gdb_group_mempool_new(l_chain); dap_chain_datum_tx_t *l_tx = (dap_chain_datum_tx_t *) dap_chain_global_db_gr_get(l_hex_str_from58 ? l_hex_str_from58 : l_tx_hash_str, &l_tx_size, l_gdb_group); DAP_DEL_Z(l_hex_str_from58); @@ -4727,7 +4727,7 @@ static int s_check_cmd(int a_arg_index, int a_argc, char **a_argv, char **a_str_ dap_global_db_obj_t *l_objs = NULL; char *l_gdb_group = NULL; - l_gdb_group = dap_chain_net_get_gdb_group_mempool(l_chain); + l_gdb_group = dap_chain_net_get_gdb_group_mempool_new(l_chain); if (!l_gdb_group) { dap_chain_node_cli_set_reply_text(a_str_reply, "Not found network group for chain: %s", l_chain->name); l_ret = -1; diff --git a/modules/net/include/dap_chain_net.h b/modules/net/include/dap_chain_net.h index cf7d197865..75433d7a22 100644 --- a/modules/net/include/dap_chain_net.h +++ b/modules/net/include/dap_chain_net.h @@ -163,7 +163,7 @@ dap_chain_node_role_t dap_chain_net_get_role(dap_chain_net_t * a_net); * @param l_chain * @return */ -DAP_STATIC_INLINE char * dap_chain_net_get_gdb_group_mempool(dap_chain_t * l_chain) +DAP_STATIC_INLINE char * dap_chain_net_get_gdb_group_mempool_new(dap_chain_t * l_chain) { dap_chain_net_t * l_net = l_chain ? dap_chain_net_by_id(l_chain->net_id) : NULL; if ( l_net ) { @@ -173,7 +173,7 @@ DAP_STATIC_INLINE char * dap_chain_net_get_gdb_group_mempool(dap_chain_t * l_cha return NULL; } -DAP_STATIC_INLINE char * dap_chain_net_get_gdb_group_from_chain(dap_chain_t * l_chain) +DAP_STATIC_INLINE char * dap_chain_net_get_gdb_group_from_chain_new(dap_chain_t * l_chain) { dap_chain_net_t * l_net = l_chain ? dap_chain_net_by_id(l_chain->net_id) : NULL; if ( l_net ) diff --git a/modules/service/datum/dap_chain_net_srv_datum.c b/modules/service/datum/dap_chain_net_srv_datum.c index 6769701059..378bf7688c 100644 --- a/modules/service/datum/dap_chain_net_srv_datum.c +++ b/modules/service/datum/dap_chain_net_srv_datum.c @@ -123,7 +123,7 @@ static int s_srv_datum_cli(int argc, char ** argv, char **a_str_reply) { dap_chain_node_cli_find_option_val(argv, arg_index, argc, "datum", &l_datum_cmd_str); if ( l_datum_cmd_str != NULL ) { if ( strcmp(l_datum_cmd_str, "save") == 0) { - char * l_gdb_group = dap_chain_net_get_gdb_group_mempool(l_chain); + char * l_gdb_group = dap_chain_net_get_gdb_group_mempool_new(l_chain); size_t l_datum_size = 0; size_t l_path_length = strlen(l_system_datum_folder)+8+strlen(l_datum_hash_str); @@ -217,7 +217,7 @@ void s_order_notficator(void *a_arg, const char a_op_code, const char *a_group, dap_chain_datum_tx_t *l_tx_cond = NULL; DL_FOREACH(l_net->pub.chains, l_chain) { size_t l_datum_size; - char *l_gdb_group = dap_chain_net_get_gdb_group_mempool(l_chain); + char *l_gdb_group = dap_chain_net_get_gdb_group_mempool_new(l_chain); l_datum = (dap_chain_datum_t *)dap_chain_global_db_gr_get(l_tx_cond_hash_str, &l_datum_size, l_gdb_group); if (l_datum) break; diff --git a/modules/type/blocks/dap_chain_block_chunk.c b/modules/type/blocks/dap_chain_block_chunk.c index b4795aa686..5b83e405ac 100644 --- a/modules/type/blocks/dap_chain_block_chunk.c +++ b/modules/type/blocks/dap_chain_block_chunk.c @@ -42,7 +42,7 @@ dap_chain_block_chunks_t * dap_chain_block_chunks_create(dap_chain_cs_blocks_t * l_ret->gdb_group = dap_strdup_printf("local.%s.%s.block.chunks",a_blocks->chain->net_name, a_blocks->chain->name ); size_t l_objs_count =0; - dap_global_db_obj_t * l_objs= dap_chain_global_db_gr_load(l_ret->gdb_group, &l_objs_count); + dap_global_db_obj_t * l_objs= dap_global_db_objs_get(l_ret->gdb_group, &l_objs_count); for(size_t n=0; n< l_objs_count; n++){ dap_chain_block_cache_t *l_block_cache = dap_chain_block_cache_new(a_blocks, (dap_chain_block_t*)l_objs[n].value, l_objs[n].value_len); dap_chain_block_chunks_add(l_ret, l_block_cache ); diff --git a/modules/type/blocks/dap_chain_cs_blocks.c b/modules/type/blocks/dap_chain_cs_blocks.c index 1ae3534e57..a7c0eef4dc 100644 --- a/modules/type/blocks/dap_chain_cs_blocks.c +++ b/modules/type/blocks/dap_chain_cs_blocks.c @@ -428,7 +428,7 @@ static int s_cli_blocks(int a_argc, char ** a_argv, char **a_str_reply) }break; case SUBCMD_NEW_DATUM_ADD:{ size_t l_datums_count=1; - char * l_gdb_group_mempool = dap_chain_net_get_gdb_group_mempool(l_chain); + char * l_gdb_group_mempool = dap_chain_net_get_gdb_group_mempool_new(l_chain); dap_chain_datum_t ** l_datums = DAP_NEW_Z_SIZE(dap_chain_datum_t*, sizeof(dap_chain_datum_t*)*l_datums_count); size_t l_datum_size = 0; @@ -1305,31 +1305,42 @@ static size_t s_callback_add_datums(dap_chain_t *a_chain, dap_chain_datum_t **a_ return l_datum_processed; } -void dap_chain_cs_new_block_add_datums(dap_chain_t *a_chain) +/** + * @brief blocks async operations arguments + * @param blocks Consensus blocks object + */ +struct op_results_args{ + dap_chain_cs_blocks_t * blocks; + dap_chain_cs_blocks_callback_op_results_t callback_op_results; + void * callback_arg; +}; + + +static bool s_callback_new_block_add_datums (dap_global_db_context_t * a_global_db_context,int a_rc, const char * a_group, const char * a_key, const size_t a_values_total, const size_t a_values_shift, + const size_t a_values_count, dap_global_db_obj_t * a_values, void * a_arg) { - dap_chain_cs_blocks_t *l_blocks = DAP_CHAIN_CS_BLOCKS(a_chain); + struct op_results_args *l_args = (struct op_results_args *) a_arg; + dap_chain_cs_blocks_t *l_blocks = l_args->blocks; + dap_chain_t * l_chain = l_blocks->chain; dap_chain_cs_blocks_pvt_t *l_blocks_pvt = PVT(l_blocks); pthread_rwlock_wrlock(&l_blocks_pvt->datums_lock); - char *l_gdb_group = l_blocks->gdb_group_datums_queue; - size_t l_objs_size = 0; - dap_global_db_obj_t *l_objs = dap_chain_global_db_gr_load(l_gdb_group, &l_objs_size); - - if (l_objs_size) { - for (size_t i = 0; i < l_objs_size; i++) { - if (!l_objs[i].value_len) { - dap_chain_global_db_gr_del(l_objs[i].key, l_gdb_group); // delete from datums queue + + if (a_values_count) { + for (size_t i = 0; i < a_values_count; i++) { + if (!a_values[i].value_len) { + dap_global_db_delete(a_values[i].key, a_group, NULL, NULL); // delete from datums queue continue; } - dap_chain_datum_t *l_datum = (dap_chain_datum_t *)l_objs[i].value; + dap_chain_datum_t *l_datum = (dap_chain_datum_t *)a_values[i].value; size_t l_datum_size = dap_chain_datum_size(l_datum); if(!l_datum_size || l_datum == NULL){ // Was wrong datum thats not passed checks log_it(L_WARNING,"Datum in mempool processing comes NULL"); - dap_chain_global_db_gr_del(l_objs[i].key, l_gdb_group); // delete from datums queue + dap_chain_global_db_gr_del(a_values[i].key, a_group); // delete from datums queue continue; } // Verify for correctness - dap_chain_net_t *l_net = dap_chain_net_by_id(a_chain->net_id); + dap_chain_net_t *l_net = dap_chain_net_by_id(l_chain->net_id); int l_verify_datum = dap_chain_net_verify_datum_for_add(l_net, l_datum); if (l_verify_datum != 0 && l_verify_datum != DAP_CHAIN_CS_VERIFY_CODE_TX_NO_PREVIOUS && @@ -1337,7 +1348,7 @@ void dap_chain_cs_new_block_add_datums(dap_chain_t *a_chain) l_verify_datum != DAP_CHAIN_CS_VERIFY_CODE_TX_NO_TOKEN) { log_it(L_WARNING, "Datum doesn't pass verifications (code %d)", l_verify_datum); - dap_chain_global_db_gr_del(l_objs[i].key, l_gdb_group); // delete from datums queue + dap_global_db_delete(a_values[i].key, a_group, NULL, NULL); continue; } if (l_blocks->block_new_size + l_datum_size > l_blocks_pvt->block_size_maximum) @@ -1346,7 +1357,7 @@ void dap_chain_cs_new_block_add_datums(dap_chain_t *a_chain) if (!l_blocks->block_new) { l_blocks->block_new = dap_chain_block_new(&l_blocks_pvt->block_cache_last->block_hash, &l_blocks->block_new_size); dap_chain_net_t *l_net = dap_chain_net_by_id(l_blocks->chain->net_id); - l_blocks->block_new->hdr.cell_id.uint64 = a_chain->cells->id.uint64; + l_blocks->block_new->hdr.cell_id.uint64 = l_chain->cells->id.uint64; l_blocks->block_new->hdr.chain_id.uint64 = l_blocks->chain->id.uint64; } @@ -1354,8 +1365,32 @@ void dap_chain_cs_new_block_add_datums(dap_chain_t *a_chain) l_datum, l_datum_size); } } - dap_global_db_objs_delete(l_objs, l_objs_size); pthread_rwlock_unlock(&l_blocks_pvt->datums_lock); + l_args->callback_op_results( l_args->blocks,0, l_args->callback_arg); + DAP_DELETE(l_args); + return true; +} + + +/** + * @brief Create new block and add datums from block's queue + * @param a_chain Chain object + * @param a_callback_op_results Executes after request completed + * @param a_arg Custom argument + */ +void dap_chain_cs_new_block_add_datums(dap_chain_t *a_chain, dap_chain_cs_blocks_callback_op_results_t a_callback_op_results, void * a_arg ) +{ + dap_chain_cs_blocks_t *l_blocks = DAP_CHAIN_CS_BLOCKS(a_chain); + + struct op_results_args * l_args = DAP_NEW_Z(struct op_results_args); + l_args->blocks = l_blocks; + l_args->callback_op_results = a_callback_op_results; + l_args->callback_arg = a_arg; + if( dap_global_db_get_all(l_blocks->gdb_group_datums_queue,0,s_callback_new_block_add_datums, l_args ) != 0 ){ + log_it(L_ERROR, "Can't execute get_all gdb request for dap_chain_cs_new_block_add_datums() function"); + DAP_DELETE(l_args); + } + } // static size_t s_callback_add_datums(dap_chain_t *a_chain, dap_chain_datum_t **a_datums, size_t a_datums_count) diff --git a/modules/type/blocks/include/dap_chain_cs_blocks.h b/modules/type/blocks/include/dap_chain_cs_blocks.h index ac07ba1ffb..2bcebb989f 100644 --- a/modules/type/blocks/include/dap_chain_cs_blocks.h +++ b/modules/type/blocks/include/dap_chain_cs_blocks.h @@ -30,6 +30,7 @@ typedef struct dap_chain_cs_blocks dap_chain_cs_blocks_t; typedef void (*dap_chain_cs_blocks_callback_t)(dap_chain_cs_blocks_t *); +typedef void (*dap_chain_cs_blocks_callback_op_results_t)(dap_chain_cs_blocks_t * a_cs_blocks, int a_rc, void * a_arg); typedef int (*dap_chain_cs_blocks_callback_block_t)(dap_chain_cs_blocks_t *, dap_chain_block_t *, size_t); typedef size_t (*dap_chain_cs_blocks_callback_block_sign_t)(dap_chain_cs_blocks_t *, dap_chain_block_t **, size_t); @@ -64,5 +65,5 @@ void dap_chain_cs_blocks_deinit(); int dap_chain_cs_blocks_new(dap_chain_t * a_chain, dap_config_t * a_chain_config); void dap_chain_cs_blocks_delete(dap_chain_t * a_chain); -void dap_chain_cs_new_block_add_datums(dap_chain_t *a_chain); +void dap_chain_cs_new_block_add_datums(dap_chain_t *a_chain,dap_chain_cs_blocks_callback_op_results_t a_callback_op_results, void * a_arg); diff --git a/modules/type/dag/dap_chain_cs_dag.c b/modules/type/dag/dap_chain_cs_dag.c index 85f91b0fc6..c132878ae9 100644 --- a/modules/type/dag/dap_chain_cs_dag.c +++ b/modules/type/dag/dap_chain_cs_dag.c @@ -1612,7 +1612,7 @@ static int s_cli_dag(int argc, char ** argv, char **a_str_reply) switch ( l_event_subcmd ){ case SUBCMD_EVENT_CREATE:{ size_t l_datums_count=1; - char * l_gdb_group_mempool = dap_chain_net_get_gdb_group_mempool(l_chain); + char * l_gdb_group_mempool = dap_chain_net_get_gdb_group_mempool_new(l_chain); dap_chain_datum_t ** l_datums = DAP_NEW_Z_SIZE(dap_chain_datum_t*, sizeof(dap_chain_datum_t*)*l_datums_count); size_t l_datum_size = 0; -- GitLab