From f9efa2a34b3e2f0f97dd1e3744beb4e28f21982a Mon Sep 17 00:00:00 2001 From: "roman.khlopkov" <roman.khlopkov@demlabs.net> Date: Thu, 16 May 2024 11:43:39 +0300 Subject: [PATCH] [*] Atomic states for legacy context rewiew & rework --- modules/chain/dap_chain_ch.c | 61 +++++++++++++++---------------- modules/common/dap_chain_common.c | 3 +- 2 files changed, 32 insertions(+), 32 deletions(-) diff --git a/modules/chain/dap_chain_ch.c b/modules/chain/dap_chain_ch.c index 5f284f4822..71b9fce9cb 100644 --- a/modules/chain/dap_chain_ch.c +++ b/modules/chain/dap_chain_ch.c @@ -369,7 +369,7 @@ static bool s_sync_out_gdb_proc_callback(void *a_arg) return false; if (l_cur_state != DAP_CHAIN_CH_STATE_UPDATE_GLOBAL_DB && l_cur_state != DAP_CHAIN_CH_STATE_SYNC_GLOBAL_DB) // Illegal context - goto context_delete; + return false; dap_list_t *l_list_out = dap_global_db_legacy_list_get_multiple(l_context->db_list, s_update_pack_size); if (!l_list_out) { dap_chain_ch_sync_request_old_t l_payload = { .node_addr = g_node_addr }; @@ -380,18 +380,15 @@ static bool s_sync_out_gdb_proc_callback(void *a_arg) l_context->request_hdr.net_id, l_context->request_hdr.chain_id, l_context->request_hdr.cell_id, &l_payload, sizeof(l_payload)); if (l_cur_state == DAP_CHAIN_CH_STATE_SYNC_GLOBAL_DB) { - if (atomic_compare_exchange_strong(&l_context->state, &l_cur_state, DAP_CHAIN_CH_STATE_IDLE) || - l_cur_state == DAP_CHAIN_CH_STATE_ERROR) + log_it(L_INFO, "Synchronized database: items synchronized %" DAP_UINT64_FORMAT_U " from %zu", + l_context->stats_request_gdbs_processed, l_context->db_list->items_number); + if (atomic_compare_exchange_strong(&l_context->state, &l_cur_state, DAP_CHAIN_CH_STATE_IDLE)) goto context_delete; return false; } - log_it(L_INFO, "Synchronized database: items synchronized %" DAP_UINT64_FORMAT_U " from %zu", - l_context->stats_request_gdbs_processed, l_context->db_list->items_number); dap_global_db_legacy_list_rewind(l_context->db_list); - if (atomic_compare_exchange_strong(&l_context->state, &l_cur_state, DAP_CHAIN_CH_STATE_SYNC_GLOBAL_DB_REMOTE) || - l_cur_state == DAP_CHAIN_CH_STATE_WAITING) - return false; - goto context_delete; + atomic_compare_exchange_strong(&l_context->state, &l_cur_state, DAP_CHAIN_CH_STATE_SYNC_GLOBAL_DB_REMOTE); + return false; } void *l_data = NULL; @@ -403,8 +400,9 @@ static bool s_sync_out_gdb_proc_callback(void *a_arg) l_data = DAP_NEW_Z_SIZE(dap_chain_ch_update_element_t, l_data_size); if (!l_data) { log_it(L_CRITICAL, g_error_memory_alloc); - l_context->state = DAP_CHAIN_CH_STATE_ERROR; - goto context_delete; + if (atomic_compare_exchange_strong(&l_context->state, &l_cur_state, DAP_CHAIN_CH_STATE_ERROR)) + goto context_delete; + return false; } } bool l_go_wait = false; @@ -443,8 +441,10 @@ static bool s_sync_out_gdb_proc_callback(void *a_arg) } l_pkt_pack = dap_global_db_pkt_pack_old(l_pkt_pack, l_pkt); if (!l_pkt_pack || l_cur_size == l_pkt_pack->data_size) { - l_context->state = DAP_CHAIN_CH_STATE_ERROR; - goto context_delete; + log_it(L_CRITICAL, g_error_memory_alloc); + if (atomic_compare_exchange_strong(&l_context->state, &l_cur_state, DAP_CHAIN_CH_STATE_ERROR)) + goto context_delete; + return false; } l_context->stats_request_gdbs_processed++; l_data = l_pkt_pack; @@ -475,7 +475,7 @@ static bool s_sync_out_gdb_proc_callback(void *a_arg) l_context->request_hdr.net_id, l_context->request_hdr.chain_id, l_context->request_hdr.cell_id, NULL, 0); } - return true; + return l_go_wait ? false : true; context_delete: dap_worker_exec_callback_on(l_context->worker->worker, s_legacy_sync_context_delete, l_context); return false; @@ -526,15 +526,16 @@ static bool s_sync_out_chains_proc_callback(void *a_arg) return false; if (l_cur_state != DAP_CHAIN_CH_STATE_UPDATE_CHAINS && l_cur_state != DAP_CHAIN_CH_STATE_SYNC_CHAINS) // Illegal context - goto context_delete; + return false; dap_chain_ch_update_element_t *l_hashes = NULL; if (l_cur_state == DAP_CHAIN_CH_STATE_UPDATE_CHAINS) { l_hashes = DAP_NEW_Z_SIZE(dap_chain_ch_update_element_t, s_update_pack_size * sizeof(dap_chain_ch_update_element_t)); if (!l_hashes) { log_it(L_CRITICAL, g_error_memory_alloc); - l_context->state = DAP_CHAIN_CH_STATE_ERROR; - goto context_delete; + if (atomic_compare_exchange_strong(&l_context->state, &l_cur_state, DAP_CHAIN_CH_STATE_ERROR)) + goto context_delete; + return false; } } size_t l_data_size = 0; @@ -600,19 +601,16 @@ static bool s_sync_out_chains_proc_callback(void *a_arg) debug_if(l_cur_state == DAP_CHAIN_CH_STATE_UPDATE_CHAINS, L_INFO, "Synchronized chain: items synchronized %" DAP_UINT64_FORMAT_U, l_context->stats_request_atoms_processed); if (l_cur_state == DAP_CHAIN_CH_STATE_SYNC_CHAINS) { - if (atomic_compare_exchange_strong(&l_context->state, &l_cur_state, DAP_CHAIN_CH_STATE_IDLE) || - l_cur_state == DAP_CHAIN_CH_STATE_ERROR) + if (atomic_compare_exchange_strong(&l_context->state, &l_cur_state, DAP_CHAIN_CH_STATE_IDLE)) goto context_delete; return false; } l_context->atom_iter->chain->callback_atom_iter_get(l_context->atom_iter, DAP_CHAIN_ITER_OP_FIRST, NULL); - if (atomic_compare_exchange_strong(&l_context->state, &l_cur_state, DAP_CHAIN_CH_STATE_SYNC_CHAINS_REMOTE) || - l_cur_state == DAP_CHAIN_CH_STATE_WAITING) - return false; - goto context_delete; + atomic_compare_exchange_strong(&l_context->state, &l_cur_state, DAP_CHAIN_CH_STATE_SYNC_CHAINS_REMOTE); + return false; } - return true; + return l_go_wait ? false : true; context_delete: dap_worker_exec_callback_on(l_context->worker->worker, s_legacy_sync_context_delete, l_context); return false; @@ -1681,10 +1679,8 @@ static void s_ch_chain_go_idle(dap_chain_ch_t *a_ch_chain) if (a_ch_chain->legacy_sync_context) { dap_chain_ch_state_t l_current_state = atomic_exchange( &((struct legacy_sync_context *)a_ch_chain->legacy_sync_context)->state, DAP_CHAIN_CH_STATE_IDLE); - if (l_current_state != DAP_CHAIN_CH_STATE_UPDATE_CHAINS && - l_current_state != DAP_CHAIN_CH_STATE_SYNC_CHAINS && - l_current_state != DAP_CHAIN_CH_STATE_UPDATE_GLOBAL_DB && - l_current_state != DAP_CHAIN_CH_STATE_SYNC_GLOBAL_DB) + if (l_current_state != DAP_CHAIN_CH_STATE_IDLE && + l_current_state != DAP_CHAIN_CH_STATE_ERROR) // Context will not be removed from proc thread s_legacy_sync_context_delete(a_ch_chain->legacy_sync_context); a_ch_chain->legacy_sync_context = NULL; @@ -1699,16 +1695,19 @@ static void s_stream_ch_io_complete(dap_events_socket_t *a_es, void *a_arg) dap_stream_ch_t *l_ch = dap_stream_ch_by_id_unsafe(l_stream, DAP_CHAIN_CH_ID); assert(l_ch); struct legacy_sync_context *l_context = DAP_CHAIN_CH(l_ch)->legacy_sync_context; - if (!l_context || l_context->state != DAP_CHAIN_CH_STATE_WAITING) + if (!l_context) return; + dap_chain_ch_state_t l_expected = DAP_CHAIN_CH_STATE_WAITING; if (l_context->prev_state == DAP_CHAIN_CH_STATE_UPDATE_CHAINS || l_context->prev_state == DAP_CHAIN_CH_STATE_SYNC_CHAINS) { - l_context->state = l_context->prev_state; + if (!atomic_compare_exchange_strong(&l_context->state, &l_expected, l_context->prev_state)) + return; l_context->enqueued_data_size = 0; dap_proc_thread_callback_add(l_ch->stream_worker->worker->proc_queue_input, s_sync_out_chains_proc_callback, l_context); } else if (l_context->prev_state == DAP_CHAIN_CH_STATE_UPDATE_GLOBAL_DB || l_context->prev_state == DAP_CHAIN_CH_STATE_SYNC_GLOBAL_DB) { - l_context->state = l_context->prev_state; + if (!atomic_compare_exchange_strong(&l_context->state, &l_expected, l_context->prev_state)) + return; l_context->enqueued_data_size = 0; dap_proc_thread_callback_add(l_ch->stream_worker->worker->proc_queue_input, s_sync_out_gdb_proc_callback, l_context); } else diff --git a/modules/common/dap_chain_common.c b/modules/common/dap_chain_common.c index 1968c12792..92343bb29c 100644 --- a/modules/common/dap_chain_common.c +++ b/modules/common/dap_chain_common.c @@ -74,7 +74,8 @@ size_t dap_chain_hash_slow_to_str( dap_chain_hash_slow_t *a_hash, char *a_str, s char *dap_chain_addr_to_str(const dap_chain_addr_t *a_addr) { dap_return_val_if_pass(!a_addr, NULL); - dap_return_val_if_pass(dap_chain_addr_is_blank(a_addr), "null"); + if (dap_chain_addr_is_blank(a_addr)) + return "null"; static _Thread_local char s_buf[DAP_ENC_BASE58_ENCODE_SIZE(sizeof(dap_chain_addr_t))] = { '\0' }; return dap_enc_base58_encode(a_addr, sizeof(dap_chain_addr_t), s_buf) ? s_buf : NULL; } -- GitLab