diff --git a/modules/chain/dap_chain.c b/modules/chain/dap_chain.c index 608574bd6fd043936e1ab5efa8a84b8fa49442f2..2a2d7907a8581c693e2c265db8ebca05c25c8b2c 100644 --- a/modules/chain/dap_chain.c +++ b/modules/chain/dap_chain.c @@ -724,7 +724,8 @@ ssize_t dap_chain_atom_save(dap_chain_cell_t *a_chain_cell, const uint8_t *a_ato if (l_net_cluster) { size_t l_pkt_size = a_atom_size + sizeof(dap_chain_ch_pkt_t); dap_chain_ch_pkt_t *l_pkt = dap_chain_ch_pkt_new(l_chain->net_id, l_chain->id, - a_chain_cell->id, a_atom, a_atom_size); + a_chain_cell->id, a_atom, a_atom_size, + DAP_CHAIN_CH_PKT_VERSION_CURRENT); if (l_pkt) { dap_gossip_msg_issue(l_net_cluster, DAP_CHAIN_CH_ID, l_pkt, l_pkt_size, a_new_atom_hash); DAP_DELETE(l_pkt); diff --git a/modules/chain/dap_chain_ch.c b/modules/chain/dap_chain_ch.c index 71b9fce9cb489736123694d7b5df6ff8776c8279..cd9cc9923de6142b9649c86bbecb535e74cb283a 100644 --- a/modules/chain/dap_chain_ch.c +++ b/modules/chain/dap_chain_ch.c @@ -365,11 +365,11 @@ static bool s_sync_out_gdb_proc_callback(void *a_arg) { struct legacy_sync_context *l_context = a_arg; dap_chain_ch_state_t l_cur_state = l_context->state; - if (l_cur_state == DAP_CHAIN_CH_STATE_WAITING) - return false; - if (l_cur_state != DAP_CHAIN_CH_STATE_UPDATE_GLOBAL_DB && l_cur_state != DAP_CHAIN_CH_STATE_SYNC_GLOBAL_DB) + if (l_cur_state != DAP_CHAIN_CH_STATE_UPDATE_GLOBAL_DB && l_cur_state != DAP_CHAIN_CH_STATE_SYNC_GLOBAL_DB) { // Illegal context - return false; + assert(l_cur_state == DAP_CHAIN_CH_STATE_IDLE); + goto context_delete; + } dap_list_t *l_list_out = dap_global_db_legacy_list_get_multiple(l_context->db_list, s_update_pack_size); if (!l_list_out) { dap_chain_ch_sync_request_old_t l_payload = { .node_addr = g_node_addr }; @@ -378,17 +378,17 @@ static bool s_sync_out_gdb_proc_callback(void *a_arg) debug_if(s_debug_legacy, L_INFO, "Out: %s", dap_chain_ch_pkt_type_to_str(l_type)); dap_chain_ch_pkt_write_mt(l_context->worker, l_context->ch_uuid, l_type, l_context->request_hdr.net_id, l_context->request_hdr.chain_id, l_context->request_hdr.cell_id, - &l_payload, sizeof(l_payload)); + &l_payload, sizeof(l_payload), DAP_CHAIN_CH_PKT_VERSION_LEGACY); if (l_cur_state == DAP_CHAIN_CH_STATE_SYNC_GLOBAL_DB) { log_it(L_INFO, "Synchronized database: items synchronized %" DAP_UINT64_FORMAT_U " from %zu", l_context->stats_request_gdbs_processed, l_context->db_list->items_number); - if (atomic_compare_exchange_strong(&l_context->state, &l_cur_state, DAP_CHAIN_CH_STATE_IDLE)) - goto context_delete; - return false; + l_context->state = DAP_CHAIN_CH_STATE_IDLE; + goto context_delete; } dap_global_db_legacy_list_rewind(l_context->db_list); - atomic_compare_exchange_strong(&l_context->state, &l_cur_state, DAP_CHAIN_CH_STATE_SYNC_GLOBAL_DB_REMOTE); - return false; + if (atomic_compare_exchange_strong(&l_context->state, &l_cur_state, DAP_CHAIN_CH_STATE_SYNC_GLOBAL_DB_REMOTE)) + return false; + goto context_delete; } void *l_data = NULL; @@ -400,9 +400,8 @@ static bool s_sync_out_gdb_proc_callback(void *a_arg) l_data = DAP_NEW_Z_SIZE(dap_chain_ch_update_element_t, l_data_size); if (!l_data) { log_it(L_CRITICAL, g_error_memory_alloc); - if (atomic_compare_exchange_strong(&l_context->state, &l_cur_state, DAP_CHAIN_CH_STATE_ERROR)) - goto context_delete; - return false; + l_context->state = DAP_CHAIN_CH_STATE_ERROR; + goto context_delete; } } bool l_go_wait = false; @@ -426,13 +425,12 @@ static bool s_sync_out_gdb_proc_callback(void *a_arg) if (l_cur_size + sizeof(dap_global_db_pkt_old_t) + l_pkt->data_size >= DAP_CHAIN_PKT_EXPECT_SIZE) { l_context->enqueued_data_size += l_data_size; if (!l_go_wait && l_context->enqueued_data_size > DAP_EVENTS_SOCKET_BUF_SIZE / 2) { - atomic_compare_exchange_strong(&l_context->state, &l_cur_state, DAP_CHAIN_CH_STATE_WAITING); l_context->prev_state = l_cur_state; l_go_wait = true; } dap_chain_ch_pkt_write_mt(l_context->worker, l_context->ch_uuid, l_type, l_context->request_hdr.net_id, l_context->request_hdr.chain_id, l_context->request_hdr.cell_id, - l_data, l_data_size); + l_data, l_data_size, DAP_CHAIN_CH_PKT_VERSION_LEGACY); debug_if(s_debug_legacy, L_INFO, "Send one global_db packet len=%zu (rest=%zu/%zu items)", l_data_size, l_context->db_list->items_rest, l_context->db_list->items_number); l_context->last_activity = dap_time_now(); @@ -442,9 +440,8 @@ static bool s_sync_out_gdb_proc_callback(void *a_arg) l_pkt_pack = dap_global_db_pkt_pack_old(l_pkt_pack, l_pkt); if (!l_pkt_pack || l_cur_size == l_pkt_pack->data_size) { log_it(L_CRITICAL, g_error_memory_alloc); - if (atomic_compare_exchange_strong(&l_context->state, &l_cur_state, DAP_CHAIN_CH_STATE_ERROR)) - goto context_delete; - return false; + l_context->state = DAP_CHAIN_CH_STATE_ERROR; + goto context_delete; } l_context->stats_request_gdbs_processed++; l_data = l_pkt_pack; @@ -460,7 +457,7 @@ static bool s_sync_out_gdb_proc_callback(void *a_arg) if (l_data && l_data_size) { dap_chain_ch_pkt_write_mt(l_context->worker, l_context->ch_uuid, l_type, l_context->request_hdr.net_id, l_context->request_hdr.chain_id, l_context->request_hdr.cell_id, - l_data, l_data_size); + l_data, l_data_size, DAP_CHAIN_CH_PKT_VERSION_LEGACY); if (l_cur_state == DAP_CHAIN_CH_STATE_UPDATE_GLOBAL_DB) debug_if(s_debug_legacy, L_INFO, "Out: %s, %zu records", dap_chain_ch_pkt_type_to_str(l_type), i); else @@ -473,9 +470,12 @@ static bool s_sync_out_gdb_proc_callback(void *a_arg) debug_if(s_debug_more, L_INFO, "Send one GlobalDB no freeze packet"); dap_chain_ch_pkt_write_mt(l_context->worker, l_context->ch_uuid, DAP_CHAIN_CH_PKT_TYPE_GLOBAL_DB_NO_FREEZE, l_context->request_hdr.net_id, l_context->request_hdr.chain_id, - l_context->request_hdr.cell_id, NULL, 0); + l_context->request_hdr.cell_id, NULL, 0, DAP_CHAIN_CH_PKT_VERSION_LEGACY); } - return l_go_wait ? false : true; + if (!l_go_wait) + return true; + if (atomic_compare_exchange_strong(&l_context->state, &l_cur_state, DAP_CHAIN_CH_STATE_WAITING)) + return false; context_delete: dap_worker_exec_callback_on(l_context->worker->worker, s_legacy_sync_context_delete, l_context); return false; @@ -509,7 +509,8 @@ static bool s_gdb_in_pkt_proc_callback(void *a_arg) l_objs[0].flags |= DAP_GLOBAL_DB_RECORD_NEW; if (l_success && dap_global_db_set_raw_sync(l_objs, l_objs_count)) { const char *l_err_str = s_error_type_to_string(DAP_CHAIN_CH_ERROR_GLOBAL_DB_INTERNAL_NOT_SAVED); - dap_chain_ch_pkt_t *l_chain_pkt = dap_chain_ch_pkt_new(l_args->hdr.net_id, l_args->hdr.chain_id, l_args->hdr.cell_id, l_err_str, strlen(l_err_str)); + dap_chain_ch_pkt_t *l_chain_pkt = dap_chain_ch_pkt_new(l_args->hdr.net_id, l_args->hdr.chain_id, l_args->hdr.cell_id, + l_err_str, strlen(l_err_str), DAP_CHAIN_CH_PKT_VERSION_LEGACY); dap_stream_ch_pkt_write_mt(l_args->worker, l_args->uuid, DAP_CHAIN_CH_PKT_TYPE_ERROR, l_chain_pkt, dap_chain_ch_pkt_get_size(l_chain_pkt)); DAP_DELETE(l_chain_pkt); } @@ -522,20 +523,19 @@ static bool s_sync_out_chains_proc_callback(void *a_arg) { struct legacy_sync_context *l_context = a_arg; dap_chain_ch_state_t l_cur_state = l_context->state; - if (l_cur_state == DAP_CHAIN_CH_STATE_WAITING) - return false; - if (l_cur_state != DAP_CHAIN_CH_STATE_UPDATE_CHAINS && l_cur_state != DAP_CHAIN_CH_STATE_SYNC_CHAINS) + if (l_cur_state != DAP_CHAIN_CH_STATE_UPDATE_CHAINS && l_cur_state != DAP_CHAIN_CH_STATE_SYNC_CHAINS) { // Illegal context - return false; + assert(l_cur_state == DAP_CHAIN_CH_STATE_IDLE); + goto context_delete; + } dap_chain_ch_update_element_t *l_hashes = NULL; if (l_cur_state == DAP_CHAIN_CH_STATE_UPDATE_CHAINS) { l_hashes = DAP_NEW_Z_SIZE(dap_chain_ch_update_element_t, s_update_pack_size * sizeof(dap_chain_ch_update_element_t)); if (!l_hashes) { log_it(L_CRITICAL, g_error_memory_alloc); - if (atomic_compare_exchange_strong(&l_context->state, &l_cur_state, DAP_CHAIN_CH_STATE_ERROR)) - goto context_delete; - return false; + l_context->state = DAP_CHAIN_CH_STATE_ERROR; + goto context_delete; } } size_t l_data_size = 0; @@ -555,13 +555,12 @@ static bool s_sync_out_chains_proc_callback(void *a_arg) if (!l_hash_item) { l_context->enqueued_data_size += l_context->atom_iter->cur_size; if (l_context->enqueued_data_size > DAP_EVENTS_SOCKET_BUF_SIZE / 2) { - atomic_compare_exchange_strong(&l_context->state, &l_cur_state, DAP_CHAIN_CH_STATE_WAITING); l_context->prev_state = l_cur_state; l_go_wait = true; } dap_chain_ch_pkt_write_mt(l_context->worker, l_context->ch_uuid, DAP_CHAIN_CH_PKT_TYPE_CHAIN_OLD, l_context->request_hdr.net_id, l_context->request_hdr.chain_id, l_context->request_hdr.cell_id, - l_context->atom_iter->cur, l_context->atom_iter->cur_size); + l_context->atom_iter->cur, l_context->atom_iter->cur_size, DAP_CHAIN_CH_PKT_VERSION_LEGACY); debug_if(s_debug_legacy, L_INFO, "Out CHAIN pkt: atom hash %s (size %zd)", dap_hash_fast_to_str_static(l_context->atom_iter->cur_hash), l_context->atom_iter->cur_size); l_context->last_activity = dap_time_now(); @@ -579,7 +578,7 @@ static bool s_sync_out_chains_proc_callback(void *a_arg) if (l_hashes) { dap_chain_ch_pkt_write_mt(l_context->worker, l_context->ch_uuid, DAP_CHAIN_CH_PKT_TYPE_UPDATE_CHAINS, l_context->request_hdr.net_id, l_context->request_hdr.chain_id, l_context->request_hdr.cell_id, - l_hashes, l_data_size); + l_hashes, l_data_size, DAP_CHAIN_CH_PKT_VERSION_LEGACY); debug_if(s_debug_legacy, L_INFO, "Out: DAP_CHAIN_CH_PKT_TYPE_UPDATE_CHAINS, %zu records", l_data_size / sizeof(dap_chain_ch_update_element_t)); DAP_DELETE(l_hashes); } else if (l_context->last_activity + 3 < dap_time_now()) { @@ -587,7 +586,7 @@ static bool s_sync_out_chains_proc_callback(void *a_arg) debug_if(s_debug_more, L_INFO, "Send one chain no freeze packet"); dap_chain_ch_pkt_write_mt(l_context->worker, l_context->ch_uuid, DAP_CHAIN_CH_PKT_TYPE_CHAINS_NO_FREEZE, l_context->request_hdr.net_id, l_context->request_hdr.chain_id, - l_context->request_hdr.cell_id, NULL, 0); + l_context->request_hdr.cell_id, NULL, 0, DAP_CHAIN_CH_PKT_VERSION_LEGACY); } if (l_chain_end) { @@ -597,20 +596,22 @@ static bool s_sync_out_chains_proc_callback(void *a_arg) debug_if(s_debug_legacy, L_INFO, "Out: %s", dap_chain_ch_pkt_type_to_str(l_type)); dap_chain_ch_pkt_write_mt(l_context->worker, l_context->ch_uuid, l_type, l_context->request_hdr.net_id, l_context->request_hdr.chain_id, l_context->request_hdr.cell_id, - &l_payload, sizeof(l_payload)); + &l_payload, sizeof(l_payload), DAP_CHAIN_CH_PKT_VERSION_LEGACY); debug_if(l_cur_state == DAP_CHAIN_CH_STATE_UPDATE_CHAINS, L_INFO, "Synchronized chain: items synchronized %" DAP_UINT64_FORMAT_U, l_context->stats_request_atoms_processed); if (l_cur_state == DAP_CHAIN_CH_STATE_SYNC_CHAINS) { - if (atomic_compare_exchange_strong(&l_context->state, &l_cur_state, DAP_CHAIN_CH_STATE_IDLE)) - goto context_delete; - return false; + l_context->state = DAP_CHAIN_CH_STATE_IDLE; + goto context_delete; } l_context->atom_iter->chain->callback_atom_iter_get(l_context->atom_iter, DAP_CHAIN_ITER_OP_FIRST, NULL); - atomic_compare_exchange_strong(&l_context->state, &l_cur_state, DAP_CHAIN_CH_STATE_SYNC_CHAINS_REMOTE); - return false; + if (atomic_compare_exchange_strong(&l_context->state, &l_cur_state, DAP_CHAIN_CH_STATE_SYNC_CHAINS_REMOTE)) + return false; + goto context_delete; } - - return l_go_wait ? false : true; + if (!l_go_wait) + return true; + if (atomic_compare_exchange_strong(&l_context->state, &l_cur_state, DAP_CHAIN_CH_STATE_WAITING)) + return false; context_delete: dap_worker_exec_callback_on(l_context->worker->worker, s_legacy_sync_context_delete, l_context); return false; @@ -680,7 +681,7 @@ static bool s_sync_in_chains_callback(void *a_arg) if (l_ack_send && l_args->ack_req) { uint64_t l_ack_num = (l_chain_pkt->hdr.num_hi << 16) | l_chain_pkt->hdr.num_lo; dap_chain_ch_pkt_t *l_pkt = dap_chain_ch_pkt_new(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - &l_ack_num, sizeof(uint64_t)); + &l_ack_num, sizeof(uint64_t), DAP_CHAIN_CH_PKT_VERSION_CURRENT); dap_stream_ch_pkt_send_by_addr(&l_args->addr, DAP_CHAIN_CH_ID, DAP_CHAIN_CH_PKT_TYPE_CHAIN_ACK, l_pkt, dap_chain_ch_pkt_get_size(l_pkt)); DAP_DELETE(l_pkt); debug_if(s_debug_more, L_DEBUG, "Out: CHAIN_ACK %s for net %s to destination " NODE_ADDR_FP_STR " with num %" DAP_UINT64_FORMAT_U, @@ -718,7 +719,7 @@ void dap_stream_ch_write_error_unsafe(dap_stream_ch_t *a_ch, dap_chain_net_id_t dap_chain_ch_t *l_ch_chain = DAP_CHAIN_CH(a_ch); dap_return_if_fail(l_ch_chain); const char *l_err_str = s_error_type_to_string(a_error); - dap_chain_ch_pkt_write_unsafe(a_ch, DAP_CHAIN_CH_PKT_TYPE_ERROR, a_net_id, a_chain_id, a_cell_id, l_err_str, strlen(l_err_str) + 1); + dap_chain_ch_pkt_write_unsafe(a_ch, DAP_CHAIN_CH_PKT_TYPE_ERROR, a_net_id, a_chain_id, a_cell_id, l_err_str, strlen(l_err_str) + 1, DAP_CHAIN_CH_PKT_VERSION_LEGACY); s_ch_chain_go_idle(l_ch_chain); } @@ -749,7 +750,7 @@ static bool s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) l_chain_pkt->hdr.version, DAP_CHAIN_CH_PKT_VERSION_CURRENT); return false; } - if (l_chain_pkt->hdr.version >= 2 && + if (l_chain_pkt->hdr.version > DAP_CHAIN_CH_PKT_VERSION_LEGACY && l_chain_pkt_data_size != l_chain_pkt->hdr.data_size) { log_it(L_WARNING, "Incorrect chain packet size %zu, expected %u", l_chain_pkt_data_size, l_chain_pkt->hdr.data_size); @@ -820,7 +821,6 @@ static bool s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) DAP_CHAIN_CH_ERROR_CHAIN_PKT_DATA_SIZE); return false; } - dap_chain_ch_pkt_set_version(DAP_CHAIN_CH_PKT_VERSION_CURRENT); dap_chain_ch_sync_request_t *l_request = (dap_chain_ch_sync_request_t *)l_chain_pkt->data; if (s_debug_more) log_it(L_INFO, "In: CHAIN_REQ pkt: net 0x%016" DAP_UINT64_FORMAT_x " chain 0x%016" DAP_UINT64_FORMAT_x @@ -870,7 +870,8 @@ static bool s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) dap_chain_ch_summary_t l_sum = { .num_cur = l_iter->cur_num, .num_last = l_last_num }; dap_chain_ch_pkt_write_unsafe(a_ch, DAP_CHAIN_CH_PKT_TYPE_CHAIN_SUMMARY, l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id, - l_chain_pkt->hdr.cell_id, &l_sum, sizeof(l_sum)); + l_chain_pkt->hdr.cell_id, &l_sum, sizeof(l_sum), + DAP_CHAIN_CH_PKT_VERSION_CURRENT); debug_if(s_debug_more, L_DEBUG, "Out: CHAIN_SUMMARY %s for net %s to destination " NODE_ADDR_FP_STR, l_chain->name, l_chain->net_name, NODE_ADDR_FP_ARGS_S(a_ch->stream->node)); struct sync_context *l_context = DAP_NEW_Z(struct sync_context); @@ -907,7 +908,8 @@ static bool s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) .last_num = l_iter->cur_num }; dap_chain_ch_pkt_write_unsafe(a_ch, DAP_CHAIN_CH_PKT_TYPE_CHAIN_MISS, l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id, - l_chain_pkt->hdr.cell_id, &l_miss_info, sizeof(l_miss_info)); + l_chain_pkt->hdr.cell_id, &l_miss_info, sizeof(l_miss_info), + DAP_CHAIN_CH_PKT_VERSION_CURRENT); if (s_debug_more) { char l_last_hash_str[DAP_HASH_FAST_STR_SIZE]; dap_hash_fast_to_str(&l_miss_info.last_hash, l_last_hash_str, DAP_HASH_FAST_STR_SIZE); @@ -923,7 +925,8 @@ static bool s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) } else { dap_chain_ch_pkt_write_unsafe(a_ch, DAP_CHAIN_CH_PKT_TYPE_SYNCED_CHAIN, l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id, - l_chain_pkt->hdr.cell_id, NULL, 0); + l_chain_pkt->hdr.cell_id, NULL, 0, + DAP_CHAIN_CH_PKT_VERSION_CURRENT); debug_if(s_debug_more, L_DEBUG, "Out: SYNCED_CHAIN %s for net %s to destination " NODE_ADDR_FP_STR, l_chain ? l_chain->name : "(null)", l_chain ? l_chain->net_name : "(null)", @@ -977,8 +980,9 @@ static bool s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) } if (l_context->num_last == l_ack_num) { dap_chain_ch_pkt_write_unsafe(a_ch, DAP_CHAIN_CH_PKT_TYPE_SYNCED_CHAIN, - l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id, - l_chain_pkt->hdr.cell_id, NULL, 0); + l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id, + l_chain_pkt->hdr.cell_id, NULL, 0, + DAP_CHAIN_CH_PKT_VERSION_CURRENT); s_ch_chain_go_idle(l_ch_chain); break; } @@ -1056,7 +1060,6 @@ static bool s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) DAP_CHAIN_CH_ERROR_NET_INVALID_ID); break; } - dap_chain_ch_pkt_set_version(1); if (!dap_link_manager_get_net_condition(l_chain_pkt->hdr.net_id.uint64)) { log_it(L_WARNING, "Net id 0x%016" DAP_UINT64_FORMAT_x " is offline", l_chain_pkt->hdr.net_id.uint64); dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, @@ -1102,7 +1105,8 @@ static bool s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) debug_if(s_debug_legacy, L_INFO, "Out: DAP_CHAIN_CH_PKT_TYPE_UPDATE_GLOBAL_DB_START"); dap_chain_ch_pkt_write_unsafe(DAP_STREAM_CH(l_ch_chain), DAP_CHAIN_CH_PKT_TYPE_UPDATE_GLOBAL_DB_START, l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id, - l_chain_pkt->hdr.cell_id, &g_node_addr, sizeof(dap_chain_node_addr_t)); + l_chain_pkt->hdr.cell_id, &g_node_addr, sizeof(dap_chain_node_addr_t), + DAP_CHAIN_CH_PKT_VERSION_LEGACY); dap_proc_thread_callback_add(a_ch->stream_worker->worker->proc_queue_input, s_sync_out_gdb_proc_callback, l_context); } break; @@ -1190,7 +1194,8 @@ static bool s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) debug_if(s_debug_legacy, L_INFO, "Out: DAP_CHAIN_CH_PKT_TYPE_FIRST_GLOBAL_DB"); dap_chain_ch_pkt_write_unsafe(DAP_STREAM_CH(l_ch_chain), DAP_CHAIN_CH_PKT_TYPE_FIRST_GLOBAL_DB, l_context->request_hdr.net_id, l_context->request_hdr.chain_id, - l_context->request_hdr.cell_id, &g_node_addr, sizeof(dap_chain_node_addr_t)); + l_context->request_hdr.cell_id, &g_node_addr, sizeof(dap_chain_node_addr_t), + DAP_CHAIN_CH_PKT_VERSION_LEGACY); dap_proc_thread_callback_add(a_ch->stream_worker->worker->proc_queue_input, s_sync_out_gdb_proc_callback, l_context); } break; @@ -1284,7 +1289,8 @@ static bool s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) dap_chain_ch_sync_request_old_t l_request = { .node_addr = g_node_addr }; debug_if(s_debug_legacy, L_INFO, "Out: UPDATE_GLOBAL_DB_REQ pkt"); dap_chain_ch_pkt_write_unsafe(a_ch, DAP_CHAIN_CH_PKT_TYPE_UPDATE_GLOBAL_DB_REQ, l_context->request_hdr.net_id, - l_context->request_hdr.chain_id, l_context->request_hdr.cell_id, &l_request, sizeof(l_request)); + l_context->request_hdr.chain_id, l_context->request_hdr.cell_id, &l_request, sizeof(l_request), + DAP_CHAIN_CH_PKT_VERSION_LEGACY); } break; /// --- Chains update --- @@ -1298,7 +1304,6 @@ static bool s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) DAP_CHAIN_CH_ERROR_LEGACY_PKT_DATA_SIZE); return false; } - dap_chain_ch_pkt_set_version(1); if (!dap_link_manager_get_net_condition(l_chain_pkt->hdr.net_id.uint64)) { log_it(L_WARNING, "Net id 0x%016" DAP_UINT64_FORMAT_x " is offline", l_chain_pkt->hdr.net_id.uint64); dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, @@ -1350,8 +1355,9 @@ static bool s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) debug_if(s_debug_legacy, L_INFO, "Out: UPDATE_CHAINS_START pkt: net %s chain %s cell 0x%016"DAP_UINT64_FORMAT_X, l_chain->name, l_chain->net_name, l_chain_pkt->hdr.cell_id.uint64); dap_chain_ch_pkt_write_unsafe(a_ch, DAP_CHAIN_CH_PKT_TYPE_UPDATE_CHAINS_START, - l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id, - l_chain_pkt->hdr.cell_id, NULL, 0); + l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id, + l_chain_pkt->hdr.cell_id, NULL, 0, + DAP_CHAIN_CH_PKT_VERSION_LEGACY); dap_proc_thread_callback_add(a_ch->stream_worker->worker->proc_queue_input, s_sync_out_chains_proc_callback, l_context); } break; @@ -1444,7 +1450,8 @@ static bool s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) debug_if(s_debug_legacy, L_INFO, "Out: DAP_CHAIN_CH_PKT_TYPE_FIRST_CHAIN"); dap_chain_ch_pkt_write_unsafe(DAP_STREAM_CH(l_ch_chain), DAP_CHAIN_CH_PKT_TYPE_FIRST_CHAIN, l_context->request_hdr.net_id, l_context->request_hdr.chain_id, - l_context->request_hdr.cell_id, &g_node_addr, sizeof(dap_chain_node_addr_t)); + l_context->request_hdr.cell_id, &g_node_addr, sizeof(dap_chain_node_addr_t), + DAP_CHAIN_CH_PKT_VERSION_LEGACY); dap_proc_thread_callback_add(a_ch->stream_worker->worker->proc_queue_input, s_sync_out_chains_proc_callback, l_context); } break; @@ -1537,7 +1544,8 @@ static bool s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) debug_if(s_debug_legacy, L_INFO, "Out: UPDATE_CHAINS_REQ pkt"); dap_chain_ch_sync_request_old_t l_request = { .node_addr = g_node_addr }; dap_chain_ch_pkt_write_unsafe(a_ch, DAP_CHAIN_CH_PKT_TYPE_UPDATE_CHAINS_REQ, l_context->request_hdr.net_id, - l_context->request_hdr.chain_id, l_context->request_hdr.cell_id, &l_request, sizeof(l_request)); + l_context->request_hdr.chain_id, l_context->request_hdr.cell_id, &l_request, sizeof(l_request), + DAP_CHAIN_CH_PKT_VERSION_LEGACY); } break; } @@ -1570,7 +1578,8 @@ static bool s_sync_timer_callback(void *a_arg) NODE_ADDR_FP_ARGS_S(l_context->addr), l_context->net_id.uint64, l_context->chain_id.uint64, l_context->cell_id.uint64); dap_chain_ch_pkt_write_unsafe(l_ch, DAP_CHAIN_CH_PKT_TYPE_ERROR, l_context->net_id, - l_context->chain_id, l_context->cell_id, l_err_str, strlen(l_err_str) + 1); + l_context->chain_id, l_context->cell_id, l_err_str, strlen(l_err_str) + 1, + DAP_CHAIN_CH_PKT_VERSION_CURRENT); l_timer_break = true; } } else if (l_ch_chain->legacy_sync_context) { @@ -1581,7 +1590,8 @@ static bool s_sync_timer_callback(void *a_arg) NODE_ADDR_FP_ARGS_S(l_context->remote_addr), l_context->request_hdr.net_id.uint64, l_context->request_hdr.chain_id.uint64, l_context->request_hdr.cell_id.uint64); dap_chain_ch_pkt_write_unsafe(l_ch, DAP_CHAIN_CH_PKT_TYPE_ERROR, l_context->request_hdr.net_id, - l_context->request_hdr.chain_id, l_context->request_hdr.cell_id, l_err_str, strlen(l_err_str) + 1); + l_context->request_hdr.chain_id, l_context->request_hdr.cell_id, l_err_str, strlen(l_err_str) + 1, + DAP_CHAIN_CH_PKT_VERSION_LEGACY); l_timer_break = true; } } else @@ -1614,7 +1624,7 @@ static bool s_chain_iter_callback(void *a_arg) if (l_iter->cur_num > atomic_load_explicit(&l_context->allowed_num, memory_order_acquire)) break; dap_chain_ch_pkt_t *l_pkt = dap_chain_ch_pkt_new(l_context->net_id, l_context->chain_id, l_context->cell_id, - l_atom, l_atom_size); + l_atom, l_atom_size, DAP_CHAIN_CH_PKT_VERSION_CURRENT); // For master format binary complience l_pkt->hdr.num_lo = l_iter->cur_num & 0xFFFF; l_pkt->hdr.num_hi = (l_iter->cur_num >> 16) & 0xFF; @@ -1679,7 +1689,11 @@ static void s_ch_chain_go_idle(dap_chain_ch_t *a_ch_chain) if (a_ch_chain->legacy_sync_context) { dap_chain_ch_state_t l_current_state = atomic_exchange( &((struct legacy_sync_context *)a_ch_chain->legacy_sync_context)->state, DAP_CHAIN_CH_STATE_IDLE); - if (l_current_state != DAP_CHAIN_CH_STATE_IDLE && + if (l_current_state != DAP_CHAIN_CH_STATE_UPDATE_CHAINS && + l_current_state != DAP_CHAIN_CH_STATE_SYNC_CHAINS && + l_current_state != DAP_CHAIN_CH_STATE_UPDATE_GLOBAL_DB && + l_current_state != DAP_CHAIN_CH_STATE_SYNC_GLOBAL_DB && + l_current_state != DAP_CHAIN_CH_STATE_IDLE && l_current_state != DAP_CHAIN_CH_STATE_ERROR) // Context will not be removed from proc thread s_legacy_sync_context_delete(a_ch_chain->legacy_sync_context); @@ -1698,16 +1712,14 @@ static void s_stream_ch_io_complete(dap_events_socket_t *a_es, void *a_arg) if (!l_context) return; dap_chain_ch_state_t l_expected = DAP_CHAIN_CH_STATE_WAITING; + if (!atomic_compare_exchange_strong(&l_context->state, &l_expected, l_context->prev_state)) + return; if (l_context->prev_state == DAP_CHAIN_CH_STATE_UPDATE_CHAINS || l_context->prev_state == DAP_CHAIN_CH_STATE_SYNC_CHAINS) { - if (!atomic_compare_exchange_strong(&l_context->state, &l_expected, l_context->prev_state)) - return; l_context->enqueued_data_size = 0; dap_proc_thread_callback_add(l_ch->stream_worker->worker->proc_queue_input, s_sync_out_chains_proc_callback, l_context); } else if (l_context->prev_state == DAP_CHAIN_CH_STATE_UPDATE_GLOBAL_DB || l_context->prev_state == DAP_CHAIN_CH_STATE_SYNC_GLOBAL_DB) { - if (!atomic_compare_exchange_strong(&l_context->state, &l_expected, l_context->prev_state)) - return; l_context->enqueued_data_size = 0; dap_proc_thread_callback_add(l_ch->stream_worker->worker->proc_queue_input, s_sync_out_gdb_proc_callback, l_context); } else diff --git a/modules/chain/dap_chain_ch_pkt.c b/modules/chain/dap_chain_ch_pkt.c index 7cd4802bc4e0c22e69489d6835c4c13172a9665c..c4d42a196f894b0e69aa220776c891237f154507 100644 --- a/modules/chain/dap_chain_ch_pkt.c +++ b/modules/chain/dap_chain_ch_pkt.c @@ -23,17 +23,13 @@ #define LOG_TAG "dap_chain_ch_pkt" -static uint8_t s_pkt_version = DAP_CHAIN_CH_PKT_VERSION_CURRENT; - -void dap_chain_ch_pkt_set_version(uint8_t a_version) { s_pkt_version = a_version; } - static void s_chain_pkt_fill(dap_chain_ch_pkt_t *a_pkt, dap_chain_net_id_t a_net_id, dap_chain_id_t a_chain_id, dap_chain_cell_id_t a_cell_id, - const void *a_data, size_t a_data_size) + const void *a_data, size_t a_data_size, uint8_t a_version) { *a_pkt = (dap_chain_ch_pkt_t) { - .hdr = { .version = s_pkt_version, - .data_size = s_pkt_version == 1 + .hdr = { .version = a_version, + .data_size = a_version == DAP_CHAIN_CH_PKT_VERSION_LEGACY ? 0 : a_data_size, .net_id = a_net_id, @@ -53,7 +49,7 @@ static void s_chain_pkt_fill(dap_chain_ch_pkt_t *a_pkt, dap_chain_net_id_t a_net */ size_t dap_chain_ch_pkt_write_unsafe(dap_stream_ch_t *a_ch, uint8_t a_type, dap_chain_net_id_t a_net_id, dap_chain_id_t a_chain_id, dap_chain_cell_id_t a_cell_id, - const void *a_data, size_t a_data_size) + const void *a_data, size_t a_data_size, uint8_t a_version) { size_t l_chain_pkt_size = sizeof(dap_chain_ch_pkt_hdr_t) + a_data_size; dap_chain_ch_pkt_t *l_chain_pkt = l_chain_pkt_size > 0x3FFF @@ -64,7 +60,7 @@ size_t dap_chain_ch_pkt_write_unsafe(dap_stream_ch_t *a_ch, uint8_t a_type, log_it(L_CRITICAL, g_error_memory_alloc); return 0; } - s_chain_pkt_fill(l_chain_pkt, a_net_id, a_chain_id, a_cell_id, a_data, a_data_size); + s_chain_pkt_fill(l_chain_pkt, a_net_id, a_chain_id, a_cell_id, a_data, a_data_size, a_version); size_t l_ret = dap_stream_ch_pkt_write_unsafe(a_ch, a_type, l_chain_pkt, l_chain_pkt_size); if (l_chain_pkt_size > 0x3FFF) @@ -73,12 +69,12 @@ size_t dap_chain_ch_pkt_write_unsafe(dap_stream_ch_t *a_ch, uint8_t a_type, } dap_chain_ch_pkt_t *dap_chain_ch_pkt_new(dap_chain_net_id_t a_net_id, dap_chain_id_t a_chain_id, dap_chain_cell_id_t a_cell_id, - const void *a_data, size_t a_data_size) + const void *a_data, size_t a_data_size, uint8_t a_version) { size_t l_chain_pkt_size = sizeof(dap_chain_ch_pkt_hdr_t) + a_data_size; dap_chain_ch_pkt_t *l_chain_pkt = DAP_NEW_Z_SIZE(dap_chain_ch_pkt_t, l_chain_pkt_size); if (l_chain_pkt) - s_chain_pkt_fill(l_chain_pkt, a_net_id, a_chain_id, a_cell_id, a_data, a_data_size); + s_chain_pkt_fill(l_chain_pkt, a_net_id, a_chain_id, a_cell_id, a_data, a_data_size, a_version); else log_it(L_CRITICAL, g_error_memory_alloc); return l_chain_pkt; @@ -97,7 +93,7 @@ dap_chain_ch_pkt_t *dap_chain_ch_pkt_new(dap_chain_net_id_t a_net_id, dap_chain_ */ size_t dap_chain_ch_pkt_write_mt(dap_stream_worker_t *a_worker, dap_stream_ch_uuid_t a_ch_uuid, uint8_t a_type, dap_chain_net_id_t a_net_id, dap_chain_id_t a_chain_id, dap_chain_cell_id_t a_cell_id, - const void *a_data, size_t a_data_size) + const void *a_data, size_t a_data_size, uint8_t a_version) { size_t l_chain_pkt_size = sizeof(dap_chain_ch_pkt_hdr_t) + a_data_size; dap_chain_ch_pkt_t *l_chain_pkt = l_chain_pkt_size > 0x3FFF @@ -108,7 +104,7 @@ size_t dap_chain_ch_pkt_write_mt(dap_stream_worker_t *a_worker, dap_stream_ch_uu log_it(L_CRITICAL, g_error_memory_alloc); return 0; } - s_chain_pkt_fill(l_chain_pkt, a_net_id, a_chain_id, a_cell_id, a_data, a_data_size); + s_chain_pkt_fill(l_chain_pkt, a_net_id, a_chain_id, a_cell_id, a_data, a_data_size, a_version); size_t l_ret = dap_stream_ch_pkt_write_mt(a_worker, a_ch_uuid, a_type, l_chain_pkt, l_chain_pkt_size); if (l_chain_pkt_size > 0x3FFF) @@ -131,9 +127,9 @@ size_t dap_chain_ch_pkt_write_mt(dap_stream_worker_t *a_worker, dap_stream_ch_uu */ size_t dap_chain_ch_pkt_write_inter(dap_events_socket_t *a_es_input, dap_stream_ch_uuid_t a_ch_uuid, uint8_t a_type, dap_chain_net_id_t a_net_id, dap_chain_id_t a_chain_id, dap_chain_cell_id_t a_cell_id, - const void * a_data, size_t a_data_size) + const void * a_data, size_t a_data_size, uint8_t a_version) { - dap_chain_ch_pkt_t *l_chain_pkt = dap_chain_ch_pkt_new(a_net_id, a_chain_id, a_cell_id, a_data, a_data_size); + dap_chain_ch_pkt_t *l_chain_pkt = dap_chain_ch_pkt_new(a_net_id, a_chain_id, a_cell_id, a_data, a_data_size, a_version); size_t l_ret = dap_stream_ch_pkt_write_inter(a_es_input, a_ch_uuid, a_type, l_chain_pkt, dap_chain_ch_pkt_get_size(l_chain_pkt)); DAP_DELETE(l_chain_pkt); diff --git a/modules/chain/include/dap_chain_ch_pkt.h b/modules/chain/include/dap_chain_ch_pkt.h index 44fbd2d0952dd3b621d1d58303b2d50757cbc16d..67b0465d74251d973c672143f6d2cba1d607a093 100644 --- a/modules/chain/include/dap_chain_ch_pkt.h +++ b/modules/chain/include/dap_chain_ch_pkt.h @@ -36,6 +36,7 @@ #include "dap_stream_ch.h" +#define DAP_CHAIN_CH_PKT_VERSION_LEGACY 0x01 #define DAP_CHAIN_CH_PKT_VERSION_CURRENT 0x02 //Legacy @@ -112,8 +113,6 @@ DAP_STATIC_INLINE const char *dap_chain_ch_pkt_type_to_str(uint8_t a_pkt_type) } } -void dap_chain_ch_pkt_set_version(uint8_t a_version); - // *** Active *** // typedef struct dap_chain_ch_sync_request { @@ -151,16 +150,16 @@ typedef struct dap_chain_ch_pkt { DAP_STATIC_INLINE size_t dap_chain_ch_pkt_get_size(dap_chain_ch_pkt_t *a_pkt) { return sizeof(dap_chain_ch_pkt_hdr_t) + a_pkt->hdr.data_size; } dap_chain_ch_pkt_t *dap_chain_ch_pkt_new(dap_chain_net_id_t a_net_id, dap_chain_id_t a_chain_id, dap_chain_cell_id_t a_cell_id, - const void *a_data, size_t a_data_size); + const void *a_data, size_t a_data_size, uint8_t a_version); size_t dap_chain_ch_pkt_write_unsafe(dap_stream_ch_t *a_ch, uint8_t a_type, dap_chain_net_id_t a_net_id, dap_chain_id_t a_chain_id, dap_chain_cell_id_t a_cell_id, - const void *a_data, size_t a_data_size); + const void *a_data, size_t a_data_size, uint8_t a_version); size_t dap_chain_ch_pkt_write_mt(dap_stream_worker_t *a_worker, dap_stream_ch_uuid_t a_ch_uuid, uint8_t a_type, dap_chain_net_id_t a_net_id, dap_chain_id_t a_chain_id, dap_chain_cell_id_t a_cell_id, - const void *a_data, size_t a_data_size); + const void *a_data, size_t a_data_size, uint8_t a_version); size_t dap_chain_ch_pkt_write_inter(dap_events_socket_t *a_es_input, dap_stream_ch_uuid_t a_ch_uuid, uint8_t a_type, dap_chain_net_id_t a_net_id, dap_chain_id_t a_chain_id, dap_chain_cell_id_t a_cell_id, - const void *a_data, size_t a_data_size); + const void *a_data, size_t a_data_size, uint8_t a_version); diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index c214348e962cc3aa1e9a9792f87726261a3e263a..78965d2c3893fcbff48721a08a052d452931d71e 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -2448,7 +2448,8 @@ static void s_ch_in_pkt_callback(dap_stream_ch_t *a_ch, uint8_t a_type, const vo ? l_net_pvt->sync_context.cur_cell->id : c_dap_chain_cell_id_null, &l_request, - sizeof(l_request)); + sizeof(l_request), + DAP_CHAIN_CH_PKT_VERSION_CURRENT); l_net_pvt->sync_context.requested_atom_hash = l_request.hash_from; l_net_pvt->sync_context.requested_atom_num = l_request.num_from; } @@ -2542,7 +2543,7 @@ static void s_sync_timer_callback(void *a_arg) l_request.num_from = l_last_num; dap_chain_ch_pkt_t *l_chain_pkt = dap_chain_ch_pkt_new(l_net->pub.id, l_net_pvt->sync_context.cur_chain->id, l_net_pvt->sync_context.cur_cell ? l_net_pvt->sync_context.cur_cell->id : c_dap_chain_cell_id_null, - &l_request, sizeof(l_request)); + &l_request, sizeof(l_request), DAP_CHAIN_CH_PKT_VERSION_CURRENT); if (!l_chain_pkt) { log_it(L_CRITICAL, g_error_memory_alloc); return;