From 1202dca67f42739eb659ed29239ef183cbd28dd7 Mon Sep 17 00:00:00 2001 From: Roman Khlopkov <roman.khlopkov@demlabs.net> Date: Mon, 17 Feb 2025 07:08:27 +0000 Subject: [PATCH] feature-15518 --- dap-sdk | 2 +- modules/chain/dap_chain_ch.c | 952 +----------------- modules/chain/dap_chain_ch_pkt.c | 4 +- modules/chain/include/dap_chain.h | 2 + modules/chain/include/dap_chain_ch.h | 4 - modules/chain/include/dap_chain_ch_pkt.h | 67 +- .../consensus/esbocs/dap_chain_cs_esbocs.c | 4 +- modules/ledger/dap_chain_ledger_decree.c | 2 +- modules/ledger/dap_chain_ledger_tx.c | 2 +- modules/net/dap_chain_net.c | 8 +- modules/net/dap_chain_node.c | 2 +- modules/node-cli/dap_chain_node_cli_cmd.c | 8 +- modules/type/blocks/dap_chain_cs_blocks.c | 19 +- .../type/blocks/include/dap_chain_cs_blocks.h | 2 - 14 files changed, 43 insertions(+), 1035 deletions(-) diff --git a/dap-sdk b/dap-sdk index 825fe16538..e15ca735db 160000 --- a/dap-sdk +++ b/dap-sdk @@ -1 +1 @@ -Subproject commit 825fe16538c62b1c4732b16a415c9cb966ff6085 +Subproject commit e15ca735db71aa657f357821bf0e9d19346a9ae2 diff --git a/modules/chain/dap_chain_ch.c b/modules/chain/dap_chain_ch.c index b882638ef9..b4d8e0bff8 100644 --- a/modules/chain/dap_chain_ch.c +++ b/modules/chain/dap_chain_ch.c @@ -24,16 +24,12 @@ */ #include "dap_common.h" -#include "dap_list.h" #include "dap_config.h" #include "dap_hash.h" #include "dap_time.h" #include "dap_worker.h" #include "dap_proc_thread.h" #include "dap_chain.h" -#include "dap_chain_cell.h" -#include "dap_global_db_legacy.h" -#include "dap_global_db_ch.h" #include "dap_stream.h" #include "dap_stream_pkt.h" #include "dap_stream_worker.h" @@ -73,45 +69,11 @@ typedef struct dap_chain_ch_hash_item { UT_hash_handle hh; } dap_chain_ch_hash_item_t; -struct legacy_sync_context { - dap_stream_worker_t *worker; - dap_stream_ch_uuid_t ch_uuid; - dap_stream_node_addr_t remote_addr; - dap_chain_ch_pkt_hdr_t request_hdr; - - _Atomic(dap_chain_ch_state_t) state; - dap_chain_ch_error_type_t last_error; - - bool is_type_of_gdb; - union { - struct { - dap_chain_ch_hash_item_t *remote_atoms; // Remote atoms - dap_chain_atom_iter_t *atom_iter; // Atom iterator - uint64_t stats_request_atoms_processed; // Atoms statictic - }; - struct { - dap_chain_ch_hash_item_t *remote_gdbs; // Remote gdbs - dap_global_db_legacy_list_t *db_list; // DB iterator - uint64_t stats_request_gdbs_processed; // DB statictic - }; - }; - - dap_time_t last_activity; - dap_chain_ch_state_t prev_state; - size_t enqueued_data_size; -}; - typedef struct dap_chain_ch { void *_inheritor; dap_timerfd_t *sync_timer; struct sync_context *sync_context; int idle_ack_counter; - - // Legacy section // - dap_timerfd_t *activity_timer; - uint32_t timer_shots; - int sent_breaks; - struct legacy_sync_context *legacy_sync_context; } dap_chain_ch_t; #define DAP_CHAIN_CH(a) ((dap_chain_ch_t *) ((a)->internal) ) @@ -143,9 +105,6 @@ static uint32_t s_sync_timeout = 30; static uint32_t s_sync_packets_per_thread_call = 10; static uint32_t s_sync_ack_window_size = 16; // atoms -// Legacy -static const uint_fast16_t s_update_pack_size = 100; // Number of hashes packed into the one packet - #ifdef DAP_SYS_DEBUG enum {MEMSTAT$K_STM_CH_CHAIN, MEMSTAT$K_NR}; @@ -160,12 +119,10 @@ const char* const s_error_type_to_string[] = { [DAP_CHAIN_CH_ERROR_INCORRECT_SYNC_SEQUENCE] = "INCORRECT_SYNC_SEQUENCE", [DAP_CHAIN_CH_ERROR_SYNC_TIMEOUT] = "SYNCHRONIZATION_TIMEOUT", [DAP_CHAIN_CH_ERROR_CHAIN_PKT_DATA_SIZE] = "INVALID_PACKET_SIZE", - [DAP_CHAIN_CH_ERROR_LEGACY_PKT_DATA_SIZE] = "INVALID_LEGACY_PACKET_SIZE", [DAP_CHAIN_CH_ERROR_NET_INVALID_ID] = "INVALID_NET_ID", [DAP_CHAIN_CH_ERROR_CHAIN_NOT_FOUND] = "CHAIN_NOT_FOUND", [DAP_CHAIN_CH_ERROR_ATOM_NOT_FOUND] = "ATOM_NOT_FOUND", [DAP_CHAIN_CH_ERROR_UNKNOWN_CHAIN_PKT_TYPE] = "UNKNOWN_CHAIN_PACKET_TYPE", - [DAP_CHAIN_CH_ERROR_GLOBAL_DB_INTERNAL_NOT_SAVED] = "GLOBAL_DB_INTERNAL_SAVING_ERROR", [DAP_CHAIN_CH_ERROR_NET_IS_OFFLINE] = "NET_IS_OFFLINE", [DAP_CHAIN_CH_ERROR_OUT_OF_MEMORY] = "OUT_OF_MEMORY", [DAP_CHAIN_CH_ERROR_INTERNAL] = "INTERNAL_ERROR" @@ -238,338 +195,6 @@ static void s_stream_ch_delete(dap_stream_ch_t *a_ch, void *a_arg) #endif } -// *** Legacy support code *** // - -/** - * @brief dap_chain_ch_create_sync_request_gdb - * @param a_ch_chain - * @param a_net - */ -struct legacy_sync_context *s_legacy_sync_context_create(dap_stream_ch_t *a_ch) -{ - dap_chain_ch_t * l_ch_chain = DAP_CHAIN_CH(a_ch); - dap_return_val_if_fail(l_ch_chain, NULL); - - struct legacy_sync_context *l_context = DAP_NEW_Z_RET_VAL_IF_FAIL(struct legacy_sync_context, NULL); - *l_context = (struct legacy_sync_context) { - .worker = a_ch->stream_worker, - .ch_uuid = a_ch->uuid, - .state = DAP_CHAIN_CH_STATE_IDLE, - .last_activity = dap_time_now() - }; - - - dap_stream_ch_uuid_t *l_uuid = DAP_DUP(&a_ch->uuid); - if (!l_uuid) { - log_it(L_CRITICAL, "%s", c_error_memory_alloc); - DAP_DELETE(l_context); - return NULL; - } - l_ch_chain->sync_timer = dap_timerfd_start_on_worker(a_ch->stream_worker->worker, 1000, s_sync_timer_callback, l_uuid); - a_ch->stream->esocket->callbacks.write_finished_callback = s_stream_ch_io_complete; - a_ch->stream->esocket->callbacks.arg = l_context; - if (l_context->worker->worker->_inheritor != a_ch->stream_worker) - log_it(L_CRITICAL, "Corrupted stream worker %p", a_ch->stream_worker); - return l_context; -} - -/** - * @brief s_stream_ch_chain_delete - * @param a_ch_chain - */ -static void s_legacy_sync_context_delete(void *a_arg) -{ - struct legacy_sync_context *l_context = a_arg; - dap_return_if_fail(l_context); - - dap_chain_ch_hash_item_t *l_hash_item, *l_tmp; - - if (l_context->is_type_of_gdb) { - HASH_ITER(hh, l_context->remote_gdbs, l_hash_item, l_tmp) { - // Clang bug at this, l_hash_item should change at every loop cycle - HASH_DEL(l_context->remote_gdbs, l_hash_item); - DAP_DELETE(l_hash_item); - } - l_context->remote_atoms = NULL; - - if (l_context->db_list) - dap_global_db_legacy_list_delete(l_context->db_list); - } else { - HASH_ITER(hh, l_context->remote_atoms, l_hash_item, l_tmp) { - // Clang bug at this, l_hash_item should change at every loop cycle - HASH_DEL(l_context->remote_atoms, l_hash_item); - DAP_DELETE(l_hash_item); - } - l_context->remote_gdbs = NULL; - - if (l_context->atom_iter) - l_context->atom_iter->chain->callback_atom_iter_delete(l_context->atom_iter); - } - - dap_stream_ch_t *l_ch = dap_stream_ch_find_by_uuid_unsafe(l_context->worker, l_context->ch_uuid); - if (l_ch) { - DAP_CHAIN_CH(l_ch)->legacy_sync_context = NULL; - l_ch->stream->esocket->callbacks.write_finished_callback = NULL; - l_ch->stream->esocket->callbacks.arg = NULL; - } - - DAP_DELETE(l_context); -} - -static bool s_sync_out_gdb_proc_callback(void *a_arg) -{ - struct legacy_sync_context *l_context = a_arg; - dap_chain_ch_state_t l_cur_state = l_context->state; - if (l_cur_state != DAP_CHAIN_CH_STATE_UPDATE_GLOBAL_DB && l_cur_state != DAP_CHAIN_CH_STATE_SYNC_GLOBAL_DB) { - // Illegal context - assert(l_cur_state == DAP_CHAIN_CH_STATE_IDLE); - goto context_delete; - } - dap_list_t *l_list_out = dap_global_db_legacy_list_get_multiple(l_context->db_list, s_update_pack_size); - if (!l_list_out) { - dap_chain_ch_sync_request_old_t l_payload = { .node_addr = g_node_addr }; - uint8_t l_type = l_cur_state == DAP_CHAIN_CH_STATE_UPDATE_GLOBAL_DB ? DAP_CHAIN_CH_PKT_TYPE_UPDATE_GLOBAL_DB_END - : DAP_CHAIN_CH_PKT_TYPE_SYNCED_GLOBAL_DB; - debug_if(s_debug_legacy, L_INFO, "Out: %s", dap_chain_ch_pkt_type_to_str(l_type)); - dap_chain_ch_pkt_write(l_context->worker, l_context->ch_uuid, l_type, - l_context->request_hdr.net_id, l_context->request_hdr.chain_id, l_context->request_hdr.cell_id, - &l_payload, sizeof(l_payload), DAP_CHAIN_CH_PKT_VERSION_LEGACY); - if (l_cur_state == DAP_CHAIN_CH_STATE_SYNC_GLOBAL_DB) { - log_it(L_INFO, "Synchronized database: items synchronized %" DAP_UINT64_FORMAT_U " from %zu", - l_context->stats_request_gdbs_processed, l_context->db_list->items_number); - l_context->state = DAP_CHAIN_CH_STATE_IDLE; - goto context_delete; - } - dap_global_db_legacy_list_rewind(l_context->db_list); - if (atomic_compare_exchange_strong(&l_context->state, &l_cur_state, DAP_CHAIN_CH_STATE_SYNC_GLOBAL_DB_REMOTE)) - return false; - goto context_delete; - } - - void *l_data = NULL; - size_t l_data_size = 0; - uint8_t l_type = DAP_CHAIN_CH_PKT_TYPE_GLOBAL_DB; - if (l_cur_state == DAP_CHAIN_CH_STATE_UPDATE_GLOBAL_DB) { - l_type = DAP_CHAIN_CH_PKT_TYPE_UPDATE_GLOBAL_DB; - l_data_size = dap_list_length(l_list_out) * sizeof(dap_chain_ch_update_element_t); - l_data = DAP_NEW_Z_SIZE(dap_chain_ch_update_element_t, l_data_size); - if (!l_data) { - log_it(L_CRITICAL, "%s", c_error_memory_alloc); - l_context->state = DAP_CHAIN_CH_STATE_ERROR; - goto context_delete; - } - } - bool l_go_wait = false; - size_t i = 0; - for (dap_list_t *it = l_list_out; it; it = it->next, i++) { - dap_global_db_pkt_old_t *l_pkt = it->data; - if (l_context->db_list->items_rest) - --l_context->db_list->items_rest; - dap_hash_t l_pkt_hash; - dap_hash_fast(l_pkt->data, l_pkt->data_size, &l_pkt_hash); - if (l_cur_state == DAP_CHAIN_CH_STATE_UPDATE_GLOBAL_DB) { - dap_chain_ch_update_element_t *l_hashes = l_data; - l_hashes[i].hash = l_pkt_hash; - l_hashes[i].size = l_pkt->data_size; - } else { // l_cur_state == DAP_CHAIN_CH_STATE_SYNC_GLOBAL_DB - dap_chain_ch_hash_item_t *l_hash_item = NULL; - HASH_FIND(hh, l_context->remote_gdbs, &l_pkt_hash, sizeof(dap_hash_fast_t), l_hash_item); - if (!l_hash_item) { - dap_global_db_pkt_old_t *l_pkt_pack = l_data; - size_t l_cur_size = l_pkt_pack ? l_pkt_pack->data_size : 0; - if (l_cur_size + sizeof(dap_global_db_pkt_old_t) + l_pkt->data_size >= DAP_CHAIN_PKT_EXPECT_SIZE) { - l_context->enqueued_data_size += l_data_size; - if (l_context->enqueued_data_size > DAP_EVENTS_SOCKET_BUF_SIZE / 2) - l_go_wait = true; - dap_chain_ch_pkt_write(l_context->worker, l_context->ch_uuid, l_type, - l_context->request_hdr.net_id, l_context->request_hdr.chain_id, l_context->request_hdr.cell_id, - l_data, l_data_size, DAP_CHAIN_CH_PKT_VERSION_LEGACY); - debug_if(s_debug_legacy, L_INFO, "Send one global_db packet len=%zu (rest=%zu/%zu items)", l_data_size, - l_context->db_list->items_rest, l_context->db_list->items_number); - l_context->last_activity = dap_time_now(); - DAP_DEL_Z(l_pkt_pack); - l_cur_size = 0; - } - l_pkt_pack = dap_global_db_pkt_pack_old(l_pkt_pack, l_pkt); - if (!l_pkt_pack || l_cur_size == l_pkt_pack->data_size) { - log_it(L_CRITICAL, "%s", c_error_memory_alloc); - l_context->state = DAP_CHAIN_CH_STATE_ERROR; - goto context_delete; - } - l_context->stats_request_gdbs_processed++; - l_data = l_pkt_pack; - l_data_size = sizeof(dap_global_db_pkt_old_t) + l_pkt_pack->data_size; - } /* else // Over-extended debug - debug_if(s_debug_legacy, L_DEBUG, "Skip GDB hash %s because its already present in remote GDB hash table", - dap_hash_fast_to_str_static(&l_pkt_hash)); - */ - } - } - dap_list_free_full(l_list_out, NULL); - - if (l_data && l_data_size) { - dap_chain_ch_pkt_write(l_context->worker, l_context->ch_uuid, l_type, - l_context->request_hdr.net_id, l_context->request_hdr.chain_id, l_context->request_hdr.cell_id, - l_data, l_data_size, DAP_CHAIN_CH_PKT_VERSION_LEGACY); - if (l_cur_state == DAP_CHAIN_CH_STATE_UPDATE_GLOBAL_DB) - debug_if(s_debug_legacy, L_INFO, "Out: %s, %zu records", dap_chain_ch_pkt_type_to_str(l_type), i); - else - debug_if(s_debug_legacy, L_INFO, "Send one global_db packet len=%zu (rest=%zu/%zu items)", l_data_size, - l_context->db_list->items_rest, l_context->db_list->items_number); - l_context->last_activity = dap_time_now(); - DAP_DELETE(l_data); - } else if (l_context->last_activity + 3 < dap_time_now()) { - l_context->last_activity = dap_time_now(); - debug_if(s_debug_more, L_INFO, "Send one GlobalDB no freeze packet"); - dap_chain_ch_pkt_write(l_context->worker, l_context->ch_uuid, DAP_CHAIN_CH_PKT_TYPE_GLOBAL_DB_NO_FREEZE, - l_context->request_hdr.net_id, l_context->request_hdr.chain_id, - l_context->request_hdr.cell_id, NULL, 0, DAP_CHAIN_CH_PKT_VERSION_LEGACY); - } - if (!l_go_wait) - return true; - l_context->prev_state = l_cur_state; - if (!atomic_compare_exchange_strong(&l_context->state, &l_cur_state, DAP_CHAIN_CH_STATE_WAITING)) - goto context_delete; - return false; -context_delete: - dap_worker_exec_callback_on(l_context->worker->worker, s_legacy_sync_context_delete, l_context); - return false; -} - -struct record_processing_args { - dap_stream_worker_t *worker; - dap_stream_ch_uuid_t uuid; - dap_chain_ch_pkt_hdr_t hdr; - dap_global_db_pkt_old_t *pkt; - bool new; -}; - -static bool s_gdb_in_pkt_proc_callback(void *a_arg) -{ - struct record_processing_args *l_args = a_arg; - size_t l_objs_count = 0; - dap_store_obj_t *l_objs = dap_global_db_pkt_deserialize_old(l_args->pkt, &l_objs_count); - DAP_DELETE(l_args->pkt); - if (!l_objs || !l_objs_count) { - log_it(L_WARNING, "Deserialization of legacy global DB packet failed"); - DAP_DELETE(l_args); - return false; - } - bool l_success = false; - dap_stream_node_addr_t l_blank_addr = { .uint64 = 0 }; - for (uint32_t i = 0; i < l_objs_count; i++) - if (!(l_success = dap_global_db_ch_check_store_obj(l_objs + i, &l_blank_addr))) - break; - if (l_args->new && l_objs_count == 1) - l_objs[0].flags |= DAP_GLOBAL_DB_RECORD_NEW; - if (l_success) - dap_global_db_set_raw_sync(l_objs, l_objs_count); - dap_store_obj_free(l_objs, l_objs_count); - DAP_DELETE(l_args); - return false; -} - -static bool s_sync_out_chains_proc_callback(void *a_arg) -{ - struct legacy_sync_context *l_context = a_arg; - dap_chain_ch_state_t l_cur_state = l_context->state; - if (l_cur_state != DAP_CHAIN_CH_STATE_UPDATE_CHAINS && l_cur_state != DAP_CHAIN_CH_STATE_SYNC_CHAINS) { - // Illegal context - assert(l_cur_state == DAP_CHAIN_CH_STATE_IDLE); - goto context_delete; - } - - dap_chain_ch_update_element_t *l_hashes = NULL; - if (l_cur_state == DAP_CHAIN_CH_STATE_UPDATE_CHAINS) { - l_hashes = DAP_NEW_Z_SIZE(dap_chain_ch_update_element_t, s_update_pack_size * sizeof(dap_chain_ch_update_element_t)); - if (!l_hashes) { - log_it(L_CRITICAL, "%s", c_error_memory_alloc); - l_context->state = DAP_CHAIN_CH_STATE_ERROR; - goto context_delete; - } - } - size_t l_data_size = 0; - bool l_chain_end = false, l_go_wait = false; - for (uint_fast16_t i = 0; i < s_update_pack_size; i++) { - if (!l_context->atom_iter->cur || !l_context->atom_iter->cur_size) { - l_chain_end = true; - break; - } - if (l_cur_state == DAP_CHAIN_CH_STATE_UPDATE_CHAINS) { - l_hashes[i].hash = *l_context->atom_iter->cur_hash; - l_hashes[i].size = l_context->atom_iter->cur_size; - l_data_size += sizeof(dap_chain_ch_update_element_t); - } else { // l_cur_state == DAP_CHAIN_CH_STATE_SYNC_CHAINS - dap_chain_ch_hash_item_t *l_hash_item = NULL; - HASH_FIND(hh, l_context->remote_atoms, l_context->atom_iter->cur_hash, sizeof(dap_hash_fast_t), l_hash_item); - if (!l_hash_item) { - l_context->enqueued_data_size += l_context->atom_iter->cur_size; - if (l_context->enqueued_data_size > DAP_EVENTS_SOCKET_BUF_SIZE / 2) - l_go_wait = true; - dap_chain_ch_pkt_write(l_context->worker, l_context->ch_uuid, DAP_CHAIN_CH_PKT_TYPE_CHAIN_OLD, - l_context->request_hdr.net_id, l_context->request_hdr.chain_id, l_context->request_hdr.cell_id, - l_context->atom_iter->cur, l_context->atom_iter->cur_size, DAP_CHAIN_CH_PKT_VERSION_LEGACY); - debug_if(s_debug_legacy, L_INFO, "Out CHAIN pkt: atom hash %s (size %zd)", dap_hash_fast_to_str_static(l_context->atom_iter->cur_hash), - l_context->atom_iter->cur_size); - l_context->last_activity = dap_time_now(); - l_context->stats_request_atoms_processed++; - } /* else // Over-extended debug - debug_if(s_debug_legacy, L_DEBUG, "Skip atom hash %s because its already present in remote atoms hash table", - dap_hash_fast_to_str_static(&l_context->atom_iter->cur_hash)); - */ - } - l_context->atom_iter->chain->callback_atom_iter_get(l_context->atom_iter, DAP_CHAIN_ITER_OP_NEXT, NULL); - if (l_go_wait) - break; - } - - if (l_hashes) { - dap_chain_ch_pkt_write(l_context->worker, l_context->ch_uuid, DAP_CHAIN_CH_PKT_TYPE_UPDATE_CHAINS, - l_context->request_hdr.net_id, l_context->request_hdr.chain_id, l_context->request_hdr.cell_id, - l_hashes, l_data_size, DAP_CHAIN_CH_PKT_VERSION_LEGACY); - debug_if(s_debug_legacy, L_INFO, "Out: DAP_CHAIN_CH_PKT_TYPE_UPDATE_CHAINS, %zu records", l_data_size / sizeof(dap_chain_ch_update_element_t)); - DAP_DELETE(l_hashes); - } else if (l_context->last_activity + 3 < dap_time_now()) { - l_context->last_activity = dap_time_now(); - debug_if(s_debug_more, L_INFO, "Send one chain no freeze packet"); - dap_chain_ch_pkt_write(l_context->worker, l_context->ch_uuid, DAP_CHAIN_CH_PKT_TYPE_CHAINS_NO_FREEZE, - l_context->request_hdr.net_id, l_context->request_hdr.chain_id, - l_context->request_hdr.cell_id, NULL, 0, DAP_CHAIN_CH_PKT_VERSION_LEGACY); - } - - if (l_chain_end) { - dap_chain_ch_sync_request_old_t l_payload = { .node_addr = g_node_addr }; - uint8_t l_type = l_cur_state == DAP_CHAIN_CH_STATE_UPDATE_CHAINS ? DAP_CHAIN_CH_PKT_TYPE_UPDATE_CHAINS_END - : DAP_CHAIN_CH_PKT_TYPE_SYNCED_CHAINS; - debug_if(s_debug_legacy, L_INFO, "Out: %s", dap_chain_ch_pkt_type_to_str(l_type)); - dap_chain_ch_pkt_write(l_context->worker, l_context->ch_uuid, l_type, - l_context->request_hdr.net_id, l_context->request_hdr.chain_id, l_context->request_hdr.cell_id, - &l_payload, sizeof(l_payload), DAP_CHAIN_CH_PKT_VERSION_LEGACY); - debug_if(l_cur_state == DAP_CHAIN_CH_STATE_UPDATE_CHAINS, L_INFO, - "Synchronized chain: items synchronized %" DAP_UINT64_FORMAT_U, l_context->stats_request_atoms_processed); - if (l_cur_state == DAP_CHAIN_CH_STATE_SYNC_CHAINS) { - l_context->state = DAP_CHAIN_CH_STATE_IDLE; - goto context_delete; - } - l_context->atom_iter->chain->callback_atom_iter_get(l_context->atom_iter, DAP_CHAIN_ITER_OP_FIRST, NULL); - if (atomic_compare_exchange_strong(&l_context->state, &l_cur_state, DAP_CHAIN_CH_STATE_SYNC_CHAINS_REMOTE)) - return false; - goto context_delete; - } - if (!l_go_wait) - return true; - l_context->prev_state = l_cur_state; - if (!atomic_compare_exchange_strong(&l_context->state, &l_cur_state, DAP_CHAIN_CH_STATE_WAITING)) - goto context_delete; - return false; -context_delete: - dap_worker_exec_callback_on(l_context->worker->worker, s_legacy_sync_context_delete, l_context); - return false; -} - -// *** End of legacy support code *** // - - struct atom_processing_args { dap_stream_node_addr_t addr; bool ack_req; @@ -668,7 +293,7 @@ void dap_stream_ch_write_error_unsafe(dap_stream_ch_t *a_ch, dap_chain_net_id_t dap_chain_ch_t *l_ch_chain = DAP_CHAIN_CH(a_ch); dap_return_if_fail(l_ch_chain); const char *l_err_str = a_error < DAP_CHAIN_CH_ERROR_LAST ? s_error_type_to_string[a_error] : "UNDEFINED ERROR"; - dap_chain_ch_pkt_write_unsafe(a_ch, DAP_CHAIN_CH_PKT_TYPE_ERROR, a_net_id, a_chain_id, a_cell_id, l_err_str, strlen(l_err_str) + 1, DAP_CHAIN_CH_PKT_VERSION_LEGACY); + dap_chain_ch_pkt_write_unsafe(a_ch, DAP_CHAIN_CH_PKT_TYPE_ERROR, a_net_id, a_chain_id, a_cell_id, l_err_str, strlen(l_err_str) + 1, DAP_CHAIN_CH_PKT_VERSION_CURRENT); s_ch_chain_go_idle(l_ch_chain); } @@ -699,8 +324,7 @@ static bool s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) l_chain_pkt->hdr.version, DAP_CHAIN_CH_PKT_VERSION_CURRENT); return false; } - if (l_chain_pkt->hdr.version > DAP_CHAIN_CH_PKT_VERSION_LEGACY && - l_chain_pkt_data_size != l_chain_pkt->hdr.data_size) { + if (l_chain_pkt_data_size != l_chain_pkt->hdr.data_size) { log_it(L_WARNING, "Incorrect chain packet size %zu, expected %u", l_chain_pkt_data_size, l_chain_pkt->hdr.data_size); return false; @@ -708,8 +332,6 @@ static bool s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) switch (l_ch_pkt->hdr.type) { - /* *** New synchronization protocol *** */ - case DAP_CHAIN_CH_PKT_TYPE_ERROR: { if (!l_chain_pkt_data_size || l_chain_pkt->data[l_chain_pkt_data_size - 1] != 0) { log_it(L_WARNING, "Incorrect format with data size %zu in packet %s", l_chain_pkt_data_size, @@ -762,7 +384,7 @@ static bool s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) } break; case DAP_CHAIN_CH_PKT_TYPE_CHAIN_REQ: { - if (l_chain_pkt_data_size != sizeof(dap_chain_ch_sync_request_t)) { + if (l_chain_pkt_data_size != sizeof(dap_chain_ch_sync_request_old_t) && l_chain_pkt_data_size != sizeof(dap_chain_ch_sync_request_t)) { log_it(L_WARNING, "DAP_CHAIN_CH_PKT_TYPE_CHAIN_REQ: Wrong chain packet size %zd when expected %zd", l_chain_pkt_data_size, sizeof(dap_chain_ch_sync_request_t)); dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, @@ -770,13 +392,15 @@ static bool s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) DAP_CHAIN_CH_ERROR_CHAIN_PKT_DATA_SIZE); return false; } + bool l_is_legacy = l_chain_pkt_data_size == sizeof(dap_chain_ch_sync_request_old_t); + // CAUTION: Unsafe cast, must check 'l_is_legacy' variable before access 'generation' field dap_chain_ch_sync_request_t *l_request = (dap_chain_ch_sync_request_t *)l_chain_pkt->data; if (s_debug_more) log_it(L_INFO, "In: CHAIN_REQ pkt: net 0x%016" DAP_UINT64_FORMAT_x " chain 0x%016" DAP_UINT64_FORMAT_x " cell 0x%016" DAP_UINT64_FORMAT_x ", hash from %s, num from %" DAP_UINT64_FORMAT_U, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, dap_hash_fast_to_str_static(&l_request->hash_from), l_request->num_from); - if (l_ch_chain->sync_context || l_ch_chain->legacy_sync_context) { + if (l_ch_chain->sync_context) { log_it(L_WARNING, "Can't process CHAIN_REQ request cause already busy with synchronization"); dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, @@ -799,7 +423,7 @@ static bool s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) DAP_CHAIN_CH_ERROR_NET_IS_OFFLINE); break; } - bool l_sync_from_begin = dap_hash_fast_is_blank(&l_request->hash_from); + bool l_sync_from_begin = dap_hash_fast_is_blank(&l_request->hash_from) || (!l_is_legacy && l_request->generation < l_chain->generation); dap_chain_atom_iter_t *l_iter = l_chain->callback_atom_iter_create(l_chain, l_chain_pkt->hdr.cell_id, l_sync_from_begin ? NULL : &l_request->hash_from); if (!l_iter) { @@ -993,517 +617,7 @@ static bool s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) DAP_CHAIN_CH_ERROR_UNKNOWN_CHAIN_PKT_TYPE); return false; -// } -//} - - /* *** Legacy *** */ - - /// --- GDB update --- - // Request for gdbs list update - case DAP_CHAIN_CH_PKT_TYPE_UPDATE_GLOBAL_DB_REQ: { - if (l_chain_pkt_data_size != sizeof(dap_chain_ch_sync_request_old_t)) { - log_it(L_WARNING, "Incorrect data size %zu in packet %s", l_chain_pkt_data_size, - dap_chain_ch_pkt_type_to_str(l_ch_pkt->hdr.type)); - dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - DAP_CHAIN_CH_ERROR_LEGACY_PKT_DATA_SIZE); - return false; - } - dap_cluster_t *l_net_cluster = dap_cluster_find(dap_guuid_compose(l_chain_pkt->hdr.net_id.uint64, 0)); - if (!l_net_cluster || !l_net_cluster->mnemonim) { - log_it(L_WARNING, "Net id 0x%016" DAP_UINT64_FORMAT_x " not found", l_chain_pkt->hdr.net_id.uint64); - dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - DAP_CHAIN_CH_ERROR_NET_INVALID_ID); - break; - } - if (!dap_link_manager_get_net_condition(l_chain_pkt->hdr.net_id.uint64)) { - log_it(L_WARNING, "Net id 0x%016" DAP_UINT64_FORMAT_x " is offline", l_chain_pkt->hdr.net_id.uint64); - dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - DAP_CHAIN_CH_ERROR_NET_IS_OFFLINE); - break; - } - if (l_ch_chain->sync_context || l_ch_chain->legacy_sync_context) { - log_it(L_WARNING, "Can't process UPDATE_GLOBAL_DB request because its already busy with syncronization"); - dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - DAP_CHAIN_CH_ERROR_SYNC_REQUEST_ALREADY_IN_PROCESS); - break; - } - dap_global_db_legacy_list_t *l_db_list = dap_global_db_legacy_list_start(l_net_cluster->mnemonim); - if (!l_db_list) { - log_it(L_ERROR, "Can't create legacy DB list"); - dap_global_db_legacy_list_delete(l_db_list); - dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - DAP_CHAIN_CH_ERROR_OUT_OF_MEMORY); - break; - } - struct legacy_sync_context *l_context = s_legacy_sync_context_create(a_ch); - if (!l_context) { - log_it(L_ERROR, "Can't create sychronization context"); - dap_global_db_legacy_list_delete(l_db_list); - dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - DAP_CHAIN_CH_ERROR_OUT_OF_MEMORY); - break; - } - l_context->is_type_of_gdb = true; - l_context->db_list = l_db_list; - l_context->remote_addr = *(dap_stream_node_addr_t *)l_chain_pkt->data; - l_context->request_hdr = l_chain_pkt->hdr; - l_ch_chain->legacy_sync_context = l_context; - l_context->state = DAP_CHAIN_CH_STATE_UPDATE_GLOBAL_DB; - debug_if(s_debug_legacy, L_DEBUG, "Sync out gdb proc, requested %" DAP_UINT64_FORMAT_U " records from address " NODE_ADDR_FP_STR " (unverified)", - l_db_list->items_number, NODE_ADDR_FP_ARGS_S(l_context->remote_addr)); - log_it(L_INFO, "In: UPDATE_GLOBAL_DB_REQ pkt: net 0x%016"DAP_UINT64_FORMAT_x" chain 0x%016"DAP_UINT64_FORMAT_x" cell 0x%016"DAP_UINT64_FORMAT_x, - l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64); - debug_if(s_debug_legacy, L_INFO, "Out: DAP_CHAIN_CH_PKT_TYPE_UPDATE_GLOBAL_DB_START"); - dap_chain_ch_pkt_write_unsafe(DAP_STREAM_CH(l_ch_chain), DAP_CHAIN_CH_PKT_TYPE_UPDATE_GLOBAL_DB_START, - l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id, - l_chain_pkt->hdr.cell_id, &g_node_addr, sizeof(dap_chain_node_addr_t), - DAP_CHAIN_CH_PKT_VERSION_LEGACY); - dap_proc_thread_callback_add(a_ch->stream_worker->worker->proc_queue_input, s_sync_out_gdb_proc_callback, l_context); - } break; - - // If requested - begin to recieve record's hashes - case DAP_CHAIN_CH_PKT_TYPE_UPDATE_GLOBAL_DB_START: { - struct legacy_sync_context *l_context = l_ch_chain->legacy_sync_context; - if (!l_context || l_context->state != DAP_CHAIN_CH_STATE_UPDATE_GLOBAL_DB_REMOTE) { - log_it(L_WARNING, "Can't process UPDATE_GLOBAL_DB_START packet cause synchronization sequence violation"); - dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - DAP_CHAIN_CH_ERROR_INCORRECT_SYNC_SEQUENCE); - break; - } - debug_if(s_debug_legacy, L_INFO, "In: UPDATE_GLOBAL_DB_START pkt net 0x%016"DAP_UINT64_FORMAT_x" chain 0x%016"DAP_UINT64_FORMAT_x" cell 0x%016"DAP_UINT64_FORMAT_x, - l_context->request_hdr.net_id.uint64, l_context->request_hdr.chain_id.uint64, l_context->request_hdr.cell_id.uint64); - } break; - - // Response with gdb element hashes and sizes - case DAP_CHAIN_CH_PKT_TYPE_UPDATE_GLOBAL_DB: { - if (l_chain_pkt_data_size % sizeof(dap_chain_ch_update_element_t) || l_chain_pkt_data_size > UINT16_MAX) { - log_it(L_WARNING, "Incorrect data size %zu in packet %s", l_chain_pkt_data_size, - dap_chain_ch_pkt_type_to_str(l_ch_pkt->hdr.type)); - dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - DAP_CHAIN_CH_ERROR_LEGACY_PKT_DATA_SIZE); - return false; - } - debug_if(s_debug_legacy, L_INFO, "In: UPDATE_GLOBAL_DB pkt data_size=%zu", l_chain_pkt_data_size); - struct legacy_sync_context *l_context = l_ch_chain->legacy_sync_context; - if (!l_context || l_context->state != DAP_CHAIN_CH_STATE_UPDATE_GLOBAL_DB_REMOTE) { - log_it(L_WARNING, "Can't process UPDATE_GLOBAL_DB packet cause synchronization sequence violation"); - dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - DAP_CHAIN_CH_ERROR_INCORRECT_SYNC_SEQUENCE); - break; - } - l_context->last_activity = dap_time_now(); - - for (dap_chain_ch_update_element_t *l_element = (dap_chain_ch_update_element_t *)l_chain_pkt->data; - (size_t)((byte_t *)(l_element + 1) - l_chain_pkt->data) <= l_chain_pkt_data_size; - l_element++) { - dap_chain_ch_hash_item_t * l_hash_item = NULL; - unsigned l_hash_item_hashv; - HASH_VALUE(&l_element->hash, sizeof(l_element->hash), l_hash_item_hashv); - HASH_FIND_BYHASHVALUE(hh, l_context->remote_gdbs, &l_element->hash, sizeof(l_element->hash), - l_hash_item_hashv, l_hash_item); - if (!l_hash_item) { - l_hash_item = DAP_NEW_Z(dap_chain_ch_hash_item_t); - if (!l_hash_item) { - log_it(L_CRITICAL, "%s", c_error_memory_alloc); - dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - DAP_CHAIN_CH_ERROR_OUT_OF_MEMORY); - break; - } - l_hash_item->hash = l_element->hash; - l_hash_item->size = l_element->size; - HASH_ADD_BYHASHVALUE(hh, l_context->remote_gdbs, hash, sizeof(l_hash_item->hash), - l_hash_item_hashv, l_hash_item); - //debug_if(s_debug_legacy, L_DEBUG, "In: Updated remote hash GDB list with %s", dap_chain_hash_fast_to_str_static(&l_hash_item->hash)); - } - } - } break; - - // End of response with GlobalDB hashes - case DAP_CHAIN_CH_PKT_TYPE_UPDATE_GLOBAL_DB_END: { - if (l_chain_pkt_data_size != sizeof(dap_chain_ch_sync_request_old_t)) { - log_it(L_WARNING, "Incorrect data size %zu in packet %s", l_chain_pkt_data_size, - dap_chain_ch_pkt_type_to_str(l_ch_pkt->hdr.type)); - dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - DAP_CHAIN_CH_ERROR_LEGACY_PKT_DATA_SIZE); - return false; - } - struct legacy_sync_context *l_context = l_ch_chain->legacy_sync_context; - if (!l_context || l_context->state != DAP_CHAIN_CH_STATE_UPDATE_GLOBAL_DB_REMOTE) { - log_it(L_WARNING, "Can't process UPDATE_GLOBAL_DB_END packet cause synchronization sequence violation"); - dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - DAP_CHAIN_CH_ERROR_INCORRECT_SYNC_SEQUENCE); - break; - } - debug_if(s_debug_legacy, L_INFO, "In: UPDATE_GLOBAL_DB_END pkt with total count %d hashes", HASH_COUNT(l_context->remote_gdbs)); - l_context->state = DAP_CHAIN_CH_STATE_SYNC_GLOBAL_DB; - debug_if(s_debug_legacy, L_INFO, "Out: DAP_CHAIN_CH_PKT_TYPE_FIRST_GLOBAL_DB"); - dap_chain_ch_pkt_write_unsafe(DAP_STREAM_CH(l_ch_chain), DAP_CHAIN_CH_PKT_TYPE_FIRST_GLOBAL_DB, - l_context->request_hdr.net_id, l_context->request_hdr.chain_id, - l_context->request_hdr.cell_id, &g_node_addr, sizeof(dap_chain_node_addr_t), - DAP_CHAIN_CH_PKT_VERSION_LEGACY); - dap_proc_thread_callback_add(a_ch->stream_worker->worker->proc_queue_input, s_sync_out_gdb_proc_callback, l_context); - } break; - - // first packet of data with source node address - case DAP_CHAIN_CH_PKT_TYPE_FIRST_GLOBAL_DB: { - if (l_chain_pkt_data_size != sizeof(dap_chain_node_addr_t)) { - log_it(L_WARNING, "Incorrect data size %zu in packet %s", l_chain_pkt_data_size, - dap_chain_ch_pkt_type_to_str(l_ch_pkt->hdr.type)); - dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - DAP_CHAIN_CH_ERROR_LEGACY_PKT_DATA_SIZE); - return false; - } - struct legacy_sync_context *l_context = l_ch_chain->legacy_sync_context; - if (!l_context || l_context->state != DAP_CHAIN_CH_STATE_SYNC_GLOBAL_DB_REMOTE) { - log_it(L_WARNING, "Can't process FIRST_GLOBAL_DB packet cause synchronization sequence violation"); - dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - DAP_CHAIN_CH_ERROR_INCORRECT_SYNC_SEQUENCE); - break; - } - debug_if(s_debug_legacy, L_INFO, "In: FIRST_GLOBAL_DB data_size=%zu net 0x%016"DAP_UINT64_FORMAT_x" chain 0x%016"DAP_UINT64_FORMAT_x" cell 0x%016"DAP_UINT64_FORMAT_x - " from address "NODE_ADDR_FP_STR "(unverified)", l_chain_pkt_data_size, l_context->request_hdr.net_id.uint64, - l_context->request_hdr.chain_id.uint64, l_context->request_hdr.cell_id.uint64, NODE_ADDR_FP_ARGS_S(l_context->remote_addr)); - } break; - - // Dummy packet for freeze detection - case DAP_CHAIN_CH_PKT_TYPE_GLOBAL_DB_NO_FREEZE: { - struct legacy_sync_context *l_context = l_ch_chain->legacy_sync_context; - if (!l_context || l_context->state != DAP_CHAIN_CH_STATE_SYNC_GLOBAL_DB_REMOTE) { - log_it(L_WARNING, "Can't process GLOBAL_DB_NO_FREEZE packet cause synchronization sequence violation"); - dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - DAP_CHAIN_CH_ERROR_INCORRECT_SYNC_SEQUENCE); - break; - } - debug_if(s_debug_legacy, L_DEBUG, "Global DB no freeze packet detected"); - l_context->last_activity = dap_time_now(); - } break; - - case DAP_CHAIN_CH_PKT_TYPE_GLOBAL_DB: { - dap_global_db_pkt_old_t *l_pkt = (dap_global_db_pkt_old_t *)l_chain_pkt->data; - if (l_chain_pkt_data_size < sizeof(dap_global_db_pkt_old_t) || - (uint64_t)sizeof(*l_pkt) + l_pkt->data_size < l_pkt->data_size || - l_chain_pkt_data_size != (uint64_t)sizeof(*l_pkt) + l_pkt->data_size) { - log_it(L_WARNING, "Incorrect data size %zu in packet %s", l_chain_pkt_data_size, - dap_chain_ch_pkt_type_to_str(l_ch_pkt->hdr.type)); - dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - DAP_CHAIN_CH_ERROR_LEGACY_PKT_DATA_SIZE); - return false; - } - struct legacy_sync_context *l_context = l_ch_chain->legacy_sync_context; - if (l_context && l_context->state != DAP_CHAIN_CH_STATE_SYNC_GLOBAL_DB_REMOTE) { - log_it(L_WARNING, "Can't process GLOBAL_DB packet cause synchronization sequence violation"); - break; - } - if (l_context) - l_context->last_activity = dap_time_now(); - debug_if(s_debug_legacy, L_INFO, "In: GLOBAL_DB_OLD data_size=%zu", l_chain_pkt_data_size); - // get records and save it to global_db - struct record_processing_args *l_args = DAP_NEW(struct record_processing_args); - if (!l_args || !( l_args->pkt = DAP_DUP_SIZE(l_pkt, l_chain_pkt_data_size) )) { - log_it(L_CRITICAL, "%s", c_error_memory_alloc); - DAP_DELETE(l_args); - dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - DAP_CHAIN_CH_ERROR_OUT_OF_MEMORY); - - break; - } - l_args->worker = a_ch->stream_worker; - l_args->uuid = a_ch->uuid; - l_args->hdr = l_chain_pkt->hdr; - l_args->new = !l_context && l_pkt->obj_count == 1; - dap_proc_thread_callback_add(a_ch->stream_worker->worker->proc_queue_input, s_gdb_in_pkt_proc_callback, l_args); - } break; - - case DAP_CHAIN_CH_PKT_TYPE_SYNCED_GLOBAL_DB: { - struct legacy_sync_context *l_context = l_ch_chain->legacy_sync_context; - if (!l_context || l_context->state != DAP_CHAIN_CH_STATE_SYNC_GLOBAL_DB_REMOTE) { - log_it(L_WARNING, "Can't process SYNCED_GLOBAL_DB packet cause synchronization sequence violation"); - dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - DAP_CHAIN_CH_ERROR_INCORRECT_SYNC_SEQUENCE); - break; - } - debug_if(s_debug_legacy, L_INFO, "In: SYNCED_GLOBAL_DB: net 0x%016"DAP_UINT64_FORMAT_x" chain 0x%016"DAP_UINT64_FORMAT_x" cell 0x%016"DAP_UINT64_FORMAT_x, - l_context->request_hdr.net_id.uint64, l_context->request_hdr.chain_id.uint64, l_context->request_hdr.cell_id.uint64); - // we haven't node client waitng, so reply to other side - l_context->state = DAP_CHAIN_CH_STATE_UPDATE_GLOBAL_DB_REMOTE; - dap_chain_ch_sync_request_old_t l_request = { .node_addr = g_node_addr }; - debug_if(s_debug_legacy, L_INFO, "Out: UPDATE_GLOBAL_DB_REQ pkt"); - dap_chain_ch_pkt_write_unsafe(a_ch, DAP_CHAIN_CH_PKT_TYPE_UPDATE_GLOBAL_DB_REQ, l_context->request_hdr.net_id, - l_context->request_hdr.chain_id, l_context->request_hdr.cell_id, &l_request, sizeof(l_request), - DAP_CHAIN_CH_PKT_VERSION_LEGACY); - } break; - - /// --- Chains update --- - // Request for atoms list update - case DAP_CHAIN_CH_PKT_TYPE_UPDATE_CHAINS_REQ: { - if (l_chain_pkt_data_size) { // Expected packet with no data - log_it(L_WARNING, "Incorrect data size %zu in packet %s", l_chain_pkt_data_size, - dap_chain_ch_pkt_type_to_str(l_ch_pkt->hdr.type)); - dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - DAP_CHAIN_CH_ERROR_LEGACY_PKT_DATA_SIZE); - return false; - } - if (!dap_link_manager_get_net_condition(l_chain_pkt->hdr.net_id.uint64)) { - log_it(L_WARNING, "Net id 0x%016" DAP_UINT64_FORMAT_x " is offline", l_chain_pkt->hdr.net_id.uint64); - dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - DAP_CHAIN_CH_ERROR_NET_IS_OFFLINE); - break; - } - if (l_ch_chain->sync_context || l_ch_chain->legacy_sync_context) { - log_it(L_WARNING, "Can't process UPDATE_CHAINS request because its already busy with syncronization"); - dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - DAP_CHAIN_CH_ERROR_SYNC_REQUEST_ALREADY_IN_PROCESS); - break; - } - dap_chain_t * l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id); - if (!l_chain) { - log_it(L_WARNING, "Requested chain not found"); - dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - DAP_CHAIN_CH_ERROR_CHAIN_NOT_FOUND); - break; - } - dap_chain_atom_iter_t *l_atom_iter = l_chain->callback_atom_iter_create(l_chain, l_chain_pkt->hdr.cell_id, NULL); - if (!l_atom_iter) { - log_it(L_ERROR, "Can't create legacy atom iterator"); - dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - DAP_CHAIN_CH_ERROR_OUT_OF_MEMORY); - break; - } - struct legacy_sync_context *l_context = s_legacy_sync_context_create(a_ch); - if (!l_context) { - log_it(L_ERROR, "Can't create sychronization context"); - l_chain->callback_atom_iter_delete(l_atom_iter); - dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - DAP_CHAIN_CH_ERROR_OUT_OF_MEMORY); - break; - } - l_chain->callback_atom_iter_get(l_atom_iter, DAP_CHAIN_ITER_OP_FIRST, NULL); - l_context->atom_iter = l_atom_iter; - l_context->request_hdr = l_chain_pkt->hdr; - l_ch_chain->legacy_sync_context = l_context; - l_context->state = DAP_CHAIN_CH_STATE_UPDATE_CHAINS; - debug_if(s_debug_legacy, L_DEBUG, "Sync out chains proc, requested chain %s for net %s from address " NODE_ADDR_FP_STR " (unverified)", - l_chain->name, l_chain->net_name, NODE_ADDR_FP_ARGS_S(l_context->remote_addr)); - debug_if(s_debug_legacy, L_INFO, "In: UPDATE_CHAINS_REQ pkt: net 0x%016"DAP_UINT64_FORMAT_x" chain 0x%016"DAP_UINT64_FORMAT_x" cell 0x%016"DAP_UINT64_FORMAT_x, - l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64); - debug_if(s_debug_legacy, L_INFO, "Out: UPDATE_CHAINS_START pkt: net %s chain %s cell 0x%016"DAP_UINT64_FORMAT_X, l_chain->name, - l_chain->net_name, l_chain_pkt->hdr.cell_id.uint64); - dap_chain_ch_pkt_write_unsafe(a_ch, DAP_CHAIN_CH_PKT_TYPE_UPDATE_CHAINS_START, - l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id, - l_chain_pkt->hdr.cell_id, NULL, 0, - DAP_CHAIN_CH_PKT_VERSION_LEGACY); - dap_proc_thread_callback_add(a_ch->stream_worker->worker->proc_queue_input, s_sync_out_chains_proc_callback, l_context); - } break; - - // If requested - begin to send atom hashes - case DAP_CHAIN_CH_PKT_TYPE_UPDATE_CHAINS_START: { - struct legacy_sync_context *l_context = l_ch_chain->legacy_sync_context; - if (!l_context || l_context->state != DAP_CHAIN_CH_STATE_UPDATE_CHAINS_REMOTE) { - log_it(L_WARNING, "Can't process UPDATE_CHAINS_START packet cause synchronization sequence violation"); - dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - DAP_CHAIN_CH_ERROR_INCORRECT_SYNC_SEQUENCE); - break; - } - debug_if(s_debug_legacy, L_INFO, "In: UPDATE_CHAINS_START pkt net 0x%016"DAP_UINT64_FORMAT_x" chain 0x%016"DAP_UINT64_FORMAT_x" cell 0x%016"DAP_UINT64_FORMAT_x, - l_context->request_hdr.net_id.uint64, l_context->request_hdr.chain_id.uint64, l_context->request_hdr.cell_id.uint64); - } break; - - // Response with atom hashes and sizes - case DAP_CHAIN_CH_PKT_TYPE_UPDATE_CHAINS: { - if (l_chain_pkt_data_size % sizeof(dap_chain_ch_update_element_t) || l_chain_pkt_data_size > UINT16_MAX) { - log_it(L_WARNING, "Incorrect data size %zu in packet %s", l_chain_pkt_data_size, - dap_chain_ch_pkt_type_to_str(l_ch_pkt->hdr.type)); - dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - DAP_CHAIN_CH_ERROR_LEGACY_PKT_DATA_SIZE); - return false; - } - debug_if(s_debug_legacy, L_INFO, "In: UPDATE_CHAINS pkt data_size=%zu", l_chain_pkt_data_size); - struct legacy_sync_context *l_context = l_ch_chain->legacy_sync_context; - if (!l_context || l_context->state != DAP_CHAIN_CH_STATE_UPDATE_CHAINS_REMOTE) { - log_it(L_WARNING, "Can't process UPDATE_CHAINS packet cause synchronization sequence violation"); - if(l_context) - dap_stream_ch_write_error_unsafe(a_ch, l_context->request_hdr.net_id, - l_context->request_hdr.chain_id, l_context->request_hdr.cell_id, - DAP_CHAIN_CH_ERROR_INCORRECT_SYNC_SEQUENCE); - break; - } - l_context->last_activity = dap_time_now(); - - unsigned int l_count_added = 0; - unsigned int l_count_total = 0; - for (dap_chain_ch_update_element_t *l_element = (dap_chain_ch_update_element_t *)l_chain_pkt->data; - (size_t)((byte_t *)(l_element + 1) - l_chain_pkt->data) <= l_chain_pkt_data_size; - l_element++) { - dap_chain_ch_hash_item_t *l_hash_item = NULL; - unsigned l_hash_item_hashv; - HASH_VALUE(&l_element->hash, sizeof(l_element->hash), l_hash_item_hashv); - HASH_FIND_BYHASHVALUE(hh, l_context->remote_atoms, &l_element->hash, sizeof(l_element->hash), - l_hash_item_hashv, l_hash_item); - if (!l_hash_item) { - l_hash_item = DAP_NEW_Z(dap_chain_ch_hash_item_t); - if (!l_hash_item) { - log_it(L_CRITICAL, "%s", c_error_memory_alloc); - dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - DAP_CHAIN_CH_ERROR_OUT_OF_MEMORY); - break; - } - l_hash_item->hash = l_element->hash; - l_hash_item->size = l_element->size; - HASH_ADD_BYHASHVALUE(hh, l_context->remote_atoms, hash, sizeof(l_hash_item->hash), - l_hash_item_hashv, l_hash_item); - l_count_added++; - //debug_if(s_debug_legacy, L_DEBUG, "In: Updated remote hash GDB list with %s", dap_chain_hash_fast_to_str_static(&l_hash_item->hash)); - } - l_count_total++; - } - debug_if(s_debug_legacy, L_INFO, "In: Added %u from %u remote atom hash in list", l_count_added, l_count_total); - } break; - - // End of response with chain hashes - case DAP_CHAIN_CH_PKT_TYPE_UPDATE_CHAINS_END: { - if (l_chain_pkt_data_size != sizeof(dap_chain_ch_sync_request_old_t)) { - log_it(L_WARNING, "Incorrect data size %zu in packet %s", l_chain_pkt_data_size, - dap_chain_ch_pkt_type_to_str(l_ch_pkt->hdr.type)); - dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - DAP_CHAIN_CH_ERROR_LEGACY_PKT_DATA_SIZE); - return false; - } - struct legacy_sync_context *l_context = l_ch_chain->legacy_sync_context; - if (!l_context || l_context->state != DAP_CHAIN_CH_STATE_UPDATE_CHAINS_REMOTE) { - log_it(L_WARNING, "Can't process UPDATE_CHAINS_END packet cause synchronization sequence violation"); - dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - DAP_CHAIN_CH_ERROR_INCORRECT_SYNC_SEQUENCE); - break; - } - debug_if(s_debug_legacy, L_INFO, "In: UPDATE_CHAINS_END pkt with total count %d hashes", HASH_COUNT(l_context->remote_atoms)); - l_context->state = DAP_CHAIN_CH_STATE_SYNC_CHAINS; - debug_if(s_debug_legacy, L_INFO, "Out: DAP_CHAIN_CH_PKT_TYPE_FIRST_CHAIN"); - dap_chain_ch_pkt_write_unsafe(DAP_STREAM_CH(l_ch_chain), DAP_CHAIN_CH_PKT_TYPE_FIRST_CHAIN, - l_context->request_hdr.net_id, l_context->request_hdr.chain_id, - l_context->request_hdr.cell_id, &g_node_addr, sizeof(dap_chain_node_addr_t), - DAP_CHAIN_CH_PKT_VERSION_LEGACY); - dap_proc_thread_callback_add(a_ch->stream_worker->worker->proc_queue_input, s_sync_out_chains_proc_callback, l_context); - } break; - - // first packet of data with source node address (legacy, unverified) - case DAP_CHAIN_CH_PKT_TYPE_FIRST_CHAIN: { - if (l_chain_pkt_data_size != sizeof(dap_chain_node_addr_t)) { - log_it(L_WARNING, "Incorrect data size %zu in packet %s", l_chain_pkt_data_size, - dap_chain_ch_pkt_type_to_str(l_ch_pkt->hdr.type)); - dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - DAP_CHAIN_CH_ERROR_LEGACY_PKT_DATA_SIZE); - return false; - } - struct legacy_sync_context *l_context = l_ch_chain->legacy_sync_context; - if (!l_context || l_context->state != DAP_CHAIN_CH_STATE_SYNC_CHAINS_REMOTE) { - log_it(L_WARNING, "Can't process FIRST_CHAIN packet cause synchronization sequence violation"); - dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - DAP_CHAIN_CH_ERROR_INCORRECT_SYNC_SEQUENCE); - break; - } - debug_if(s_debug_legacy, L_INFO, "In: FIRST_CHAIN data_size=%zu net 0x%016"DAP_UINT64_FORMAT_x" chain 0x%016"DAP_UINT64_FORMAT_x" cell 0x%016"DAP_UINT64_FORMAT_x - " from address "NODE_ADDR_FP_STR "(unverified)", l_chain_pkt_data_size, l_context->request_hdr.net_id.uint64, - l_context->request_hdr.chain_id.uint64, l_context->request_hdr.cell_id.uint64, NODE_ADDR_FP_ARGS_S(l_context->remote_addr)); - } break; - - // Dummy packet for freeze detection - case DAP_CHAIN_CH_PKT_TYPE_CHAINS_NO_FREEZE: { - struct legacy_sync_context *l_context = l_ch_chain->legacy_sync_context; - if (!l_context || l_context->state != DAP_CHAIN_CH_STATE_SYNC_CHAINS_REMOTE) { - log_it(L_WARNING, "Can't process CHAINS_NO_FREEZE packet cause synchronization sequence violation"); - dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - DAP_CHAIN_CH_ERROR_INCORRECT_SYNC_SEQUENCE); - break; - } - debug_if(s_debug_legacy, L_DEBUG, "Chains no freeze packet detected"); - l_context->last_activity = dap_time_now(); - } break; - - case DAP_CHAIN_CH_PKT_TYPE_CHAIN_OLD: { - if (!l_chain_pkt_data_size || l_chain_pkt_data_size > sizeof(dap_chain_ch_pkt_t) + DAP_CHAIN_ATOM_MAX_SIZE) { - log_it(L_WARNING, "Incorrect data size %zu in packet %s", l_chain_pkt_data_size, - dap_chain_ch_pkt_type_to_str(l_ch_pkt->hdr.type)); - dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - DAP_CHAIN_CH_ERROR_LEGACY_PKT_DATA_SIZE); - return false; - } - struct legacy_sync_context *l_context = l_ch_chain->legacy_sync_context; - if (!l_context || l_context->state != DAP_CHAIN_CH_STATE_SYNC_CHAINS_REMOTE) { - log_it(L_WARNING, "Can't process CHAIN_OLD packet cause synchronization sequence violation"); - dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - DAP_CHAIN_CH_ERROR_INCORRECT_SYNC_SEQUENCE); - break; - } - struct atom_processing_args *l_args = DAP_NEW_Z_SIZE(struct atom_processing_args, l_ch_pkt->hdr.data_size + sizeof(struct atom_processing_args)); - if (!l_args) { - log_it(L_CRITICAL, "%s", c_error_memory_alloc); - dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - DAP_CHAIN_CH_ERROR_OUT_OF_MEMORY); - break; - } - l_chain_pkt->hdr.data_size = l_chain_pkt_data_size; - memcpy(l_args->data, l_chain_pkt, l_ch_pkt->hdr.data_size); - debug_if(s_debug_legacy, L_INFO, "In: CHAIN_OLD pkt: atom hash %s (size %zd)", - dap_get_data_hash_str(l_chain_pkt->data, l_chain_pkt_data_size).s, l_chain_pkt_data_size); - dap_proc_thread_callback_add(a_ch->stream_worker->worker->proc_queue_input, s_sync_in_chains_callback, l_args); - } break; - - case DAP_CHAIN_CH_PKT_TYPE_SYNCED_CHAINS: { - struct legacy_sync_context *l_context = l_ch_chain->legacy_sync_context; - if (!l_context || l_context->state != DAP_CHAIN_CH_STATE_SYNC_CHAINS_REMOTE) { - log_it(L_WARNING, "Can't process SYNCED_CHAINS packet cause synchronization sequence violation"); - dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - DAP_CHAIN_CH_ERROR_INCORRECT_SYNC_SEQUENCE); - break; - } - debug_if(s_debug_legacy, L_INFO, "In: SYNCED_CHAINS: net 0x%016"DAP_UINT64_FORMAT_x" chain 0x%016"DAP_UINT64_FORMAT_x" cell 0x%016"DAP_UINT64_FORMAT_x, - l_context->request_hdr.net_id.uint64, l_context->request_hdr.chain_id.uint64, l_context->request_hdr.cell_id.uint64); - // we haven't node client waitng, so reply to other side - l_context->state = DAP_CHAIN_CH_STATE_UPDATE_CHAINS_REMOTE; - debug_if(s_debug_legacy, L_INFO, "Out: UPDATE_CHAINS_REQ pkt"); - dap_chain_ch_sync_request_old_t l_request = { .node_addr = g_node_addr }; - dap_chain_ch_pkt_write_unsafe(a_ch, DAP_CHAIN_CH_PKT_TYPE_UPDATE_CHAINS_REQ, l_context->request_hdr.net_id, - l_context->request_hdr.chain_id, l_context->request_hdr.cell_id, &l_request, sizeof(l_request), - DAP_CHAIN_CH_PKT_VERSION_LEGACY); - } break; - } - return true; } @@ -1540,18 +654,6 @@ static bool s_sync_timer_callback(void *a_arg) DAP_CHAIN_CH_PKT_VERSION_CURRENT); l_timer_break = true; } - } else if (l_ch_chain->legacy_sync_context) { - struct legacy_sync_context *l_context = l_ch_chain->legacy_sync_context; - if (l_context->last_activity + s_sync_timeout <= dap_time_now()) { - log_it(L_ERROR, "Sync timeout for node " NODE_ADDR_FP_STR " (unverified) with net 0x%016" DAP_UINT64_FORMAT_x - " chain 0x%016" DAP_UINT64_FORMAT_x " cell 0x%016" DAP_UINT64_FORMAT_x, - NODE_ADDR_FP_ARGS_S(l_context->remote_addr), l_context->request_hdr.net_id.uint64, - l_context->request_hdr.chain_id.uint64, l_context->request_hdr.cell_id.uint64); - dap_chain_ch_pkt_write_unsafe(l_ch, DAP_CHAIN_CH_PKT_TYPE_ERROR, l_context->request_hdr.net_id, - l_context->request_hdr.chain_id, l_context->request_hdr.cell_id, l_err_str, strlen(l_err_str) + 1, - DAP_CHAIN_CH_PKT_VERSION_LEGACY); - l_timer_break = true; - } } else l_timer_break = true; @@ -1643,44 +745,4 @@ static void s_ch_chain_go_idle(dap_chain_ch_t *a_ch_chain) dap_timerfd_delete_unsafe(a_ch_chain->sync_timer); a_ch_chain->sync_timer = NULL; } -//} - // Legacy - if (a_ch_chain->legacy_sync_context) { - dap_chain_ch_state_t l_current_state = atomic_exchange(&a_ch_chain->legacy_sync_context->state, - DAP_CHAIN_CH_STATE_IDLE); - if (l_current_state != DAP_CHAIN_CH_STATE_UPDATE_CHAINS && - l_current_state != DAP_CHAIN_CH_STATE_SYNC_CHAINS && - l_current_state != DAP_CHAIN_CH_STATE_UPDATE_GLOBAL_DB && - l_current_state != DAP_CHAIN_CH_STATE_SYNC_GLOBAL_DB && - l_current_state != DAP_CHAIN_CH_STATE_IDLE && - l_current_state != DAP_CHAIN_CH_STATE_ERROR) - // Context will not be destroyed from proc thread - s_legacy_sync_context_delete(a_ch_chain->legacy_sync_context); - a_ch_chain->legacy_sync_context = NULL; - } -} - -static void s_stream_ch_io_complete(dap_events_socket_t *a_es, void *a_arg) -{ - dap_return_if_fail(a_arg); - dap_stream_t *l_stream = dap_stream_get_from_es(a_es); - assert(l_stream); - dap_stream_ch_t *l_ch = dap_stream_ch_by_id_unsafe(l_stream, DAP_CHAIN_CH_ID); - assert(l_ch); - struct legacy_sync_context *l_context = DAP_CHAIN_CH(l_ch)->legacy_sync_context; - if (!l_context) - return; - dap_chain_ch_state_t l_expected = DAP_CHAIN_CH_STATE_WAITING; - if (!atomic_compare_exchange_strong(&l_context->state, &l_expected, l_context->prev_state)) - return; - if (l_context->prev_state == DAP_CHAIN_CH_STATE_UPDATE_CHAINS || - l_context->prev_state == DAP_CHAIN_CH_STATE_SYNC_CHAINS) { - l_context->enqueued_data_size = 0; - dap_proc_thread_callback_add(l_ch->stream_worker->worker->proc_queue_input, s_sync_out_chains_proc_callback, l_context); - } else if (l_context->prev_state == DAP_CHAIN_CH_STATE_UPDATE_GLOBAL_DB || - l_context->prev_state == DAP_CHAIN_CH_STATE_SYNC_GLOBAL_DB) { - l_context->enqueued_data_size = 0; - dap_proc_thread_callback_add(l_ch->stream_worker->worker->proc_queue_input, s_sync_out_gdb_proc_callback, l_context); - } else - log_it(L_ERROR, "Unexpected legacy sync context state %d", l_context->state); } diff --git a/modules/chain/dap_chain_ch_pkt.c b/modules/chain/dap_chain_ch_pkt.c index c40aeddcd1..eca885a43d 100644 --- a/modules/chain/dap_chain_ch_pkt.c +++ b/modules/chain/dap_chain_ch_pkt.c @@ -29,9 +29,7 @@ static void s_chain_pkt_fill(dap_chain_ch_pkt_t *a_pkt, dap_chain_net_id_t a_net { *a_pkt = (dap_chain_ch_pkt_t) { .hdr = { .version = a_version, - .data_size = a_version == DAP_CHAIN_CH_PKT_VERSION_LEGACY - ? 0 - : a_data_size, + .data_size = a_data_size, .net_id = a_net_id, .cell_id = a_cell_id, .chain_id = a_chain_id } diff --git a/modules/chain/include/dap_chain.h b/modules/chain/include/dap_chain.h index 4307ddf4c6..320756219f 100644 --- a/modules/chain/include/dap_chain.h +++ b/modules/chain/include/dap_chain.h @@ -164,6 +164,8 @@ typedef struct dap_chain { uint16_t load_priority; char *name; char *net_name; + uint16_t generation; + bool is_datum_pool_proc; bool is_mapped; atomic_int load_progress; diff --git a/modules/chain/include/dap_chain_ch.h b/modules/chain/include/dap_chain_ch.h index da98e7e88f..352176fd0e 100644 --- a/modules/chain/include/dap_chain_ch.h +++ b/modules/chain/include/dap_chain_ch.h @@ -61,10 +61,6 @@ typedef enum dap_chain_ch_error_type { DAP_CHAIN_CH_ERROR_NET_IS_OFFLINE, DAP_CHAIN_CH_ERROR_OUT_OF_MEMORY, DAP_CHAIN_CH_ERROR_INTERNAL, -// Legacy - DAP_CHAIN_CH_ERROR_GLOBAL_DB_INTERNAL_NOT_SAVED, - DAP_CHAIN_CH_ERROR_LEGACY_PKT_DATA_SIZE, - // DAP_CHAIN_CH_ERROR_LAST } dap_chain_ch_error_type_t; diff --git a/modules/chain/include/dap_chain_ch_pkt.h b/modules/chain/include/dap_chain_ch_pkt.h index cd2aec3332..40ed40747e 100644 --- a/modules/chain/include/dap_chain_ch_pkt.h +++ b/modules/chain/include/dap_chain_ch_pkt.h @@ -29,37 +29,12 @@ #include <stdarg.h> #include "dap_common.h" -#include "dap_proc_thread.h" #include "dap_chain_common.h" -#include "dap_chain_datum.h" -#include "dap_chain_cs.h" - #include "dap_stream_ch.h" -#define DAP_CHAIN_CH_PKT_VERSION_LEGACY 0x01 +//#define DAP_CHAIN_CH_PKT_VERSION_LEGACY 0x02 #define DAP_CHAIN_CH_PKT_VERSION_CURRENT 0x02 -//Legacy -#define DAP_CHAIN_CH_PKT_TYPE_UPDATE_GLOBAL_DB_REQ 0x06 -#define DAP_CHAIN_CH_PKT_TYPE_UPDATE_GLOBAL_DB_START 0x26 -#define DAP_CHAIN_CH_PKT_TYPE_UPDATE_GLOBAL_DB 0x36 -#define DAP_CHAIN_CH_PKT_TYPE_UPDATE_GLOBAL_DB_END 0x46 -#define DAP_CHAIN_CH_PKT_TYPE_FIRST_GLOBAL_DB 0x21 -#define DAP_CHAIN_CH_PKT_TYPE_GLOBAL_DB 0x11 -#define DAP_CHAIN_CH_PKT_TYPE_SYNCED_GLOBAL_DB 0x13 - -#define DAP_CHAIN_CH_PKT_TYPE_UPDATE_CHAINS_REQ 0x05 -#define DAP_CHAIN_CH_PKT_TYPE_UPDATE_CHAINS_START 0x25 -#define DAP_CHAIN_CH_PKT_TYPE_UPDATE_CHAINS 0x35 -#define DAP_CHAIN_CH_PKT_TYPE_UPDATE_CHAINS_END 0x45 -#define DAP_CHAIN_CH_PKT_TYPE_FIRST_CHAIN 0x20 -#define DAP_CHAIN_CH_PKT_TYPE_CHAIN_OLD 0x01 -#define DAP_CHAIN_CH_PKT_TYPE_SYNCED_CHAINS 0x03 - -// Freeze detectors -#define DAP_CHAIN_CH_PKT_TYPE_CHAINS_NO_FREEZE 0x15 -#define DAP_CHAIN_CH_PKT_TYPE_GLOBAL_DB_NO_FREEZE 0x16 - // Stable #define DAP_CHAIN_CH_PKT_TYPE_CHAIN_REQ 0x80 #define DAP_CHAIN_CH_PKT_TYPE_CHAIN_MISS 0x69 @@ -69,38 +44,9 @@ #define DAP_CHAIN_CH_PKT_TYPE_SYNCED_CHAIN 0x88 #define DAP_CHAIN_CH_PKT_TYPE_ERROR 0xff -// *** Legacy *** // - -typedef struct dap_chain_ch_update_element { - dap_hash_fast_t hash; - uint32_t size; -} DAP_ALIGN_PACKED dap_chain_ch_update_element_t; - -typedef struct dap_chain_ch_sync_request_old { - dap_chain_node_addr_t node_addr; // Requesting node's address - dap_chain_hash_fast_t hash_from; - byte_t unused[48]; -} DAP_ALIGN_PACKED dap_chain_ch_sync_request_old_t; - DAP_STATIC_INLINE const char *dap_chain_ch_pkt_type_to_str(uint8_t a_pkt_type) { switch (a_pkt_type) { - case DAP_CHAIN_CH_PKT_TYPE_UPDATE_GLOBAL_DB_REQ: return "DAP_CHAIN_CH_PKT_TYPE_UPDATE_GLOBAL_DB_REQ"; - case DAP_CHAIN_CH_PKT_TYPE_UPDATE_GLOBAL_DB_START: return "DAP_CHAIN_CH_PKT_TYPE_UPDATE_GLOBAL_DB_START"; - case DAP_CHAIN_CH_PKT_TYPE_UPDATE_GLOBAL_DB: return "DAP_CHAIN_CH_PKT_TYPE_UPDATE_GLOBAL_DB"; - case DAP_CHAIN_CH_PKT_TYPE_UPDATE_GLOBAL_DB_END: return "DAP_CHAIN_CH_PKT_TYPE_UPDATE_GLOBAL_DB_END"; - case DAP_CHAIN_CH_PKT_TYPE_FIRST_GLOBAL_DB: return "DAP_CHAIN_CH_PKT_TYPE_UPDATE_GLOBAL_DB_END"; - case DAP_CHAIN_CH_PKT_TYPE_GLOBAL_DB: return "DAP_CHAIN_CH_PKT_TYPE_GLOBAL_DB"; - case DAP_CHAIN_CH_PKT_TYPE_SYNCED_GLOBAL_DB: return "DAP_CHAIN_CH_PKT_TYPE_SYNCED_GLOBAL_DB"; - - case DAP_CHAIN_CH_PKT_TYPE_UPDATE_CHAINS_REQ: return "DAP_CHAIN_CH_PKT_TYPE_UPDATE_CHAINS_REQ"; - case DAP_CHAIN_CH_PKT_TYPE_UPDATE_CHAINS_START: return "DAP_CHAIN_CH_PKT_TYPE_UPDATE_CHAINS_START"; - case DAP_CHAIN_CH_PKT_TYPE_UPDATE_CHAINS: return "DAP_CHAIN_CH_PKT_TYPE_UPDATE_CHAINS"; - case DAP_CHAIN_CH_PKT_TYPE_UPDATE_CHAINS_END: return "DAP_CHAIN_CH_PKT_TYPE_UPDATE_CHAINS_END"; - case DAP_CHAIN_CH_PKT_TYPE_FIRST_CHAIN: return "DAP_CHAIN_CH_PKT_TYPE_FIRST_CHAIN"; - case DAP_CHAIN_CH_PKT_TYPE_CHAIN_OLD: return "DAP_CHAIN_CH_PKT_TYPE_CHAIN_OLD"; - case DAP_CHAIN_CH_PKT_TYPE_SYNCED_CHAINS: return "DAP_CHAIN_CH_PKT_TYPE_SYNCED_CHAINS"; - case DAP_CHAIN_CH_PKT_TYPE_CHAIN_REQ: return "DAP_CHAIN_CH_PKT_TYPE_CHAIN_REQ"; case DAP_CHAIN_CH_PKT_TYPE_CHAIN_MISS: return "DAP_CHAIN_CH_PKT_TYPE_CHAIN_MISS"; case DAP_CHAIN_CH_PKT_TYPE_CHAIN: return "DAP_CHAIN_CH_PKT_TYPE_CHAIN"; @@ -113,17 +59,26 @@ DAP_STATIC_INLINE const char *dap_chain_ch_pkt_type_to_str(uint8_t a_pkt_type) } } +// *** Legacy *** // + +typedef struct dap_chain_ch_sync_request_old { + dap_chain_hash_fast_t hash_from; + uint64_t num_from; +} DAP_ALIGN_PACKED dap_chain_ch_sync_request_old_t; + // *** Active *** // typedef struct dap_chain_ch_sync_request { dap_chain_hash_fast_t hash_from; uint64_t num_from; + uint16_t generation; } DAP_ALIGN_PACKED dap_chain_ch_sync_request_t; typedef struct dap_chain_ch_summary { uint64_t num_cur; uint64_t num_last; - byte_t reserved[128]; + uint16_t generation; + byte_t reserved[126]; } DAP_ALIGN_PACKED dap_chain_ch_summary_t; typedef struct dap_chain_ch_miss_info { diff --git a/modules/consensus/esbocs/dap_chain_cs_esbocs.c b/modules/consensus/esbocs/dap_chain_cs_esbocs.c index 733c7f3745..0acc302611 100644 --- a/modules/consensus/esbocs/dap_chain_cs_esbocs.c +++ b/modules/consensus/esbocs/dap_chain_cs_esbocs.c @@ -1706,7 +1706,7 @@ static void s_session_candidate_submit(dap_chain_esbocs_session_t *a_session) &l_chain->hardfork_decree_hash, sizeof(l_chain->hardfork_decree_hash)); if (l_candidate_size) l_candidate_size = dap_chain_block_meta_add(&l_candidate, l_candidate_size, DAP_CHAIN_BLOCK_META_GENERATION, - &l_blocks->generation, sizeof(uint16_t)); + &l_chain->generation, sizeof(uint16_t)); } if (l_candidate_size) { dap_hash_fast(l_candidate, l_candidate_size, &l_candidate_hash); @@ -2889,7 +2889,7 @@ static int s_callback_block_verify(dap_chain_cs_blocks_t *a_blocks, dap_chain_bl } if (l_esbocs->session->is_hardfork) { uint8_t *l_generation = dap_chain_block_meta_get(a_block, a_block_size, DAP_CHAIN_BLOCK_META_GENERATION); - if (!l_generation || *(uint16_t *)l_generation != a_blocks->generation) { + if (!l_generation || *(uint16_t *)l_generation != a_blocks->chain->generation) { log_it(L_WARNING, "Can't process hardfork block %s with incorrect generation meta", dap_hash_fast_to_str_static(a_block_hash)); return -302; } diff --git a/modules/ledger/dap_chain_ledger_decree.c b/modules/ledger/dap_chain_ledger_decree.c index d73cf55471..76f11129d1 100644 --- a/modules/ledger/dap_chain_ledger_decree.c +++ b/modules/ledger/dap_chain_ledger_decree.c @@ -623,7 +623,7 @@ const char *l_ban_addr; DAP_CHAIN_DATUM_DECREE_TSD_TYPE_NODE_ADDR, sizeof(dap_stream_node_addr_t)); dap_hash_fast(a_decree, dap_chain_datum_decree_get_size(a_decree), &l_chain->hardfork_decree_hash); dap_tsd_t* l_changed_addrs = dap_tsd_find(a_decree->data_n_signs, a_decree->header.data_size,DAP_CHAIN_DATUM_DECREE_TSD_TYPE_HARDFORK_CHANGED_ADDRS); - return dap_chain_esbocs_set_hardfork_prepare(l_chain, l_block_num, l_addrs, json_tokener_parse(l_changed_addrs->data)); + return dap_chain_esbocs_set_hardfork_prepare(l_chain, l_block_num, l_addrs, json_tokener_parse((char *)l_changed_addrs->data)); } case DAP_CHAIN_DATUM_DECREE_COMMON_SUBTYPE_POLICY: { if (!a_apply) diff --git a/modules/ledger/dap_chain_ledger_tx.c b/modules/ledger/dap_chain_ledger_tx.c index 7b4af85b2b..f5fa8cb165 100644 --- a/modules/ledger/dap_chain_ledger_tx.c +++ b/modules/ledger/dap_chain_ledger_tx.c @@ -2498,7 +2498,7 @@ static dap_chain_addr_t* s_change_addr(struct json_object *a_json, dap_chain_add const char * l_out_addr = dap_chain_addr_to_str_static(a_addr); struct json_object *l_json = json_object_object_get(a_json, l_out_addr); if(l_json && json_object_is_type(l_json, json_type_string)) { - char * l_change_str = json_object_get_string(l_json); + const char *l_change_str = json_object_get_string(l_json); dap_chain_addr_t* l_ret_addr = dap_chain_addr_from_str(l_change_str); DAP_DELETE(l_change_str); return l_ret_addr; diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index b9fba632f6..0b19b4fd8f 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -104,7 +104,7 @@ #include "dap_stream_cluster.h" #include "dap_http_ban_list_client.h" #include "dap_net.h" -#include "dap_context.h" +#include "dap_chain_cs.h" #include "dap_chain_cs_esbocs.h" #include "dap_chain_policy.h" @@ -786,7 +786,7 @@ static dap_chain_net_t *s_net_new(const char *a_net_name, dap_config_t *a_cfg) } // deactivate policy uint16_t l_policy_count = 0; - char **l_policy_str = dap_config_get_array_str(a_cfg, "policy", "deactivate", &l_policy_count); + const char **l_policy_str = dap_config_get_array_str(a_cfg, "policy", "deactivate", &l_policy_count); for (uint16_t i = 0; i < l_policy_count; ++i) { dap_chain_policy_add_to_exception_list(strtoll(l_policy_str[i], NULL, 10), l_ret->pub.id.uint64); } @@ -3073,7 +3073,7 @@ static void s_ch_in_pkt_callback(dap_stream_ch_t *a_ch, uint8_t a_type, const vo l_net_pvt->sync_context.cur_chain->atom_num_last = l_miss_info->last_num; return; } - dap_chain_ch_sync_request_t l_request = {}; + dap_chain_ch_sync_request_t l_request = { .generation = l_net_pvt->sync_context.cur_chain->generation }; l_request.num_from = l_net_pvt->sync_context.requested_atom_num > s_fork_sync_step ? l_net_pvt->sync_context.requested_atom_num - s_fork_sync_step : 0; @@ -3240,7 +3240,7 @@ static void s_sync_timer_callback(void *a_arg) l_net_pvt->sync_context.cur_cell = l_net_pvt->sync_context.cur_chain->cells; l_net_pvt->sync_context.cur_chain->state = CHAIN_SYNC_STATE_WAITING; - dap_chain_ch_sync_request_t l_request = {}; + dap_chain_ch_sync_request_t l_request = { .generation = l_net_pvt->sync_context.cur_chain->generation}; uint64_t l_last_num = 0; if (!dap_chain_get_atom_last_hash_num_ts(l_net_pvt->sync_context.cur_chain, l_net_pvt->sync_context.cur_cell diff --git a/modules/net/dap_chain_node.c b/modules/net/dap_chain_node.c index 5cb5a6a637..773e8bef3c 100644 --- a/modules/net/dap_chain_node.c +++ b/modules/net/dap_chain_node.c @@ -562,7 +562,7 @@ int dap_chain_node_hardfork_prepare(dap_chain_t *a_chain, dap_time_t a_last_bloc } l_states->trusted_addrs = a_trusted_addrs; a_chain->hardfork_data = l_states; - DAP_CHAIN_CS_BLOCKS(a_chain)->generation++; + a_chain->generation++; l_net->pub.ledger->is_hardfork_state = true; return 0; } diff --git a/modules/node-cli/dap_chain_node_cli_cmd.c b/modules/node-cli/dap_chain_node_cli_cmd.c index 0e3c347fd9..8a68dfbb4d 100644 --- a/modules/node-cli/dap_chain_node_cli_cmd.c +++ b/modules/node-cli/dap_chain_node_cli_cmd.c @@ -1124,7 +1124,7 @@ int com_node(int a_argc, char ** a_argv, void **a_str_reply) } log_it(L_NOTICE, "Stream connection established"); - dap_chain_ch_sync_request_old_t l_sync_request = {}; + dap_chain_ch_sync_request_old_old_t l_sync_request = {}; dap_stream_ch_t *l_ch_chain = dap_client_get_stream_ch_unsafe(l_node_client->client, DAP_CHAIN_CH_ID); // fill begin id l_sync_request.id_start = 1; @@ -1167,7 +1167,7 @@ int com_node(int a_argc, char ** a_argv, void **a_str_reply) // reset state NODE_CLIENT_STATE_SYNCED dap_chain_node_client_reset(l_node_client); // send request - dap_chain_ch_sync_request_old_t l_sync_request = {}; + dap_chain_ch_sync_request_old_old_t l_sync_request = {}; if(0 == dap_chain_ch_pkt_write_unsafe(l_ch_chain, DAP_CHAIN_CH_PKT_TYPE_SYNC_CHAINS, l_net->pub.id.uint64, l_chain->id.uint64, l_remote_node_info->hdr.cell_id.uint64, &l_sync_request, sizeof(l_sync_request))) { @@ -4134,7 +4134,7 @@ int cmd_decree(int a_argc, char **a_argv, void **a_str_reply) char ** l_addr_pair = dap_strsplit(l_addrs[i], ":", 256); if (!l_addr_pair || !l_addr_pair[0] || !l_addr_pair[1]) continue; - json_object_object_add(l_json_arr_addrs, l_addr_pair[0], l_addr_pair[1]); + json_object_object_add(l_json_arr_addrs, l_addr_pair[0], json_object_new_string(l_addr_pair[1])); } const char * l_addr_array_str = json_object_to_json_string(l_json_arr_addrs); l_tsd = dap_tsd_create(DAP_CHAIN_DATUM_DECREE_TSD_TYPE_HARDFORK_CHANGED_ADDRS, l_addr_array_str, strlen(l_addr_array_str) + 1); @@ -6103,4 +6103,4 @@ int com_policy(int argc, char **argv, void **reply) { DAP_DELETE(l_decree_hash_str); return 0; -} \ No newline at end of file +} diff --git a/modules/type/blocks/dap_chain_cs_blocks.c b/modules/type/blocks/dap_chain_cs_blocks.c index 147849011f..f15476b4c1 100644 --- a/modules/type/blocks/dap_chain_cs_blocks.c +++ b/modules/type/blocks/dap_chain_cs_blocks.c @@ -1553,7 +1553,7 @@ static int s_add_atom_datums(dap_chain_cs_blocks_t *a_blocks, dap_chain_block_ca } dap_hash_fast_t *l_datum_hash = a_block_cache->datum_hash + i; dap_ledger_datum_iter_data_t l_datum_index_data = { .token_ticker = "0", .action = DAP_CHAIN_TX_TAG_ACTION_UNKNOWN , .uid.uint64 = 0 }; - bool is_hardfork_related_block = a_block_cache->generation && a_block_cache->generation == a_blocks->generation; + bool is_hardfork_related_block = a_block_cache->generation && a_block_cache->generation == a_blocks->chain->generation; int l_res = dap_chain_datum_add(a_blocks->chain, l_datum, l_datum_size, l_datum_hash, &l_datum_index_data); if (l_datum->header.type_id != DAP_CHAIN_DATUM_TX || l_res != DAP_LEDGER_CHECK_ALREADY_CACHED) { // If this is any datum other than a already cached transaction l_ret++; @@ -1833,17 +1833,13 @@ static dap_chain_atom_verify_res_t s_callback_atom_add(dap_chain_t * a_chain, da } else { uint8_t *l_generation_meta = dap_chain_block_meta_get(l_block, a_atom_size, DAP_CHAIN_BLOCK_META_GENERATION); - l_blocks->generation = l_generation_meta ? *(uint16_t *)l_generation_meta : 0; - if (l_blocks->generation) { + uint16_t l_generation = l_generation_meta ? *(uint16_t *)l_generation_meta : 0; + if (l_generation && a_chain->generation < l_generation) { dap_hash_fast_t *l_hardfork_decree_hash = (dap_hash_fast_t *)dap_chain_block_meta_get(l_block, a_atom_size, DAP_CHAIN_BLOCK_META_LINK); if (!l_hardfork_decree_hash) { log_it(L_ERROR, "Can't find hardfork decree hash in candidate block meta"); return ATOM_REJECT; } - if (dap_chain_net_srv_stake_switch_table(a_chain->net_id, false)) { // to main - log_it(L_CRITICAL, "Can't accept hardfork genesis block %s: error in switching to main table", dap_hash_fast_to_str_static(a_atom_hash)); - return ATOM_REJECT; - } if (dap_chain_net_srv_stake_hardfork_data_import(a_chain->net_id, l_hardfork_decree_hash)) { // True import log_it(L_ERROR, "Can't accept hardfork genesis block %s: error in hardfork data restoring", dap_hash_fast_to_str_static(a_atom_hash)); return ATOM_REJECT; @@ -2007,7 +2003,7 @@ static dap_chain_atom_verify_res_t s_callback_atom_verify(dap_chain_t *a_chain, else if(dap_hash_fast_compare(&l_block_hash, &PVT(l_blocks)->static_genesis_block_hash) && !dap_hash_fast_is_blank(&l_block_hash)) log_it(L_NOTICE, "Accepting static genesis block %s", dap_hash_fast_to_str_static(a_atom_hash)); - else if (l_generation) { + else if (l_generation && a_chain->generation < l_generation) { log_it(L_NOTICE, "Accepting hardfork genesis block %s and restore data", dap_hash_fast_to_str_static(a_atom_hash)); dap_hash_fast_t *l_hardfork_decree_hash = (dap_hash_fast_t *)dap_chain_block_meta_get(l_block, a_atom_size, DAP_CHAIN_BLOCK_META_LINK); if (!l_hardfork_decree_hash) { @@ -2019,12 +2015,13 @@ static dap_chain_atom_verify_res_t s_callback_atom_verify(dap_chain_t *a_chain, return ATOM_REJECT; } if (dap_chain_net_srv_stake_hardfork_data_import(a_chain->net_id, l_hardfork_decree_hash)) { // Sandbox - if (dap_chain_net_srv_stake_switch_table(a_chain->net_id, false)) { // return to main - log_it(L_CRITICAL, "Can't accept hardfork genesis block %s: error in switching to main table", dap_hash_fast_to_str_static(a_atom_hash)); - } log_it(L_ERROR, "Can't accept hardfork genesis block %s: error in hardfork data restoring", dap_hash_fast_to_str_static(a_atom_hash)); return ATOM_REJECT; } + if (dap_chain_net_srv_stake_switch_table(a_chain->net_id, false)) { // return to main + log_it(L_CRITICAL, "Can't accept hardfork genesis block %s: error in switching to main table", dap_hash_fast_to_str_static(a_atom_hash)); + return ATOM_REJECT; + } } else { char l_hash_str[DAP_CHAIN_HASH_FAST_STR_SIZE]; dap_hash_fast_to_str(&PVT(l_blocks)->static_genesis_block_hash, l_hash_str, sizeof(l_hash_str)); diff --git a/modules/type/blocks/include/dap_chain_cs_blocks.h b/modules/type/blocks/include/dap_chain_cs_blocks.h index 04c5aa8ac2..f04dafebc8 100644 --- a/modules/type/blocks/include/dap_chain_cs_blocks.h +++ b/modules/type/blocks/include/dap_chain_cs_blocks.h @@ -44,8 +44,6 @@ typedef struct dap_chain_cs_blocks { dap_chain_block_t *block_new; // For new block creating size_t block_new_size; - uint16_t generation; - dap_chain_cs_blocks_callback_t callback_delete; dap_chain_cs_blocks_callback_block_create_t callback_block_create; dap_chain_cs_blocks_callback_block_verify_t callback_block_verify; -- GitLab