From e6af078f80fda4ba103ae182cc119c9f760fada6 Mon Sep 17 00:00:00 2001 From: "roman.khlopkov" <roman.khlopkov@demlabs.net> Date: Sat, 10 Feb 2024 13:51:01 +0300 Subject: [PATCH] [*] Chains GOSSIP channel connection --- modules/chain/dap_chain.c | 23 +- modules/chain/dap_chain_cell.c | 6 +- modules/chain/dap_chain_ch.c | 232 +++++++----------- modules/chain/dap_chain_ch_pkt.c | 19 -- modules/chain/include/dap_chain_ch_pkt.h | 45 ++-- .../consensus/dag-poa/dap_chain_cs_dag_poa.c | 6 +- .../consensus/esbocs/dap_chain_cs_esbocs.c | 10 +- modules/net/dap_chain_net.c | 14 +- modules/net/dap_chain_node_cli_cmd.c | 10 +- modules/type/blocks/dap_chain_block_cache.c | 6 +- modules/type/blocks/dap_chain_cs_blocks.c | 20 +- modules/type/dag/dap_chain_cs_dag.c | 71 +++--- modules/type/none/dap_chain_cs_none.c | 5 +- 13 files changed, 195 insertions(+), 272 deletions(-) diff --git a/modules/chain/dap_chain.c b/modules/chain/dap_chain.c index 4afb24c9ae..9913c06017 100644 --- a/modules/chain/dap_chain.c +++ b/modules/chain/dap_chain.c @@ -707,14 +707,27 @@ bool dap_chain_get_atom_last_hash(dap_chain_t *a_chain, dap_hash_fast_t *a_atom_ ssize_t dap_chain_atom_save(dap_chain_cell_t *a_chain_cell, const uint8_t *a_atom, size_t a_atom_size, dap_hash_fast_t *a_new_atom_hash) { + dap_return_val_if_fail(a_chain_cell && a_chain_cell->chain, -1); dap_chain_t *l_chain = a_chain_cell->chain; - dap_return_val_if_fail(l_chain, -1); if (a_new_atom_hash) { // Atom is new and need to be distributed for the net - dap_cluster_t *l_net_cluster = dap_cluster_by_mnemonim(l_chain->net_name); - if (l_net_cluster) - dap_gossip_msg_issue(l_net_cluster, DAP_STREAM_CH_CHAIN_ID, - a_atom, a_atom_size, a_new_atom_hash); + dap_cluster_t *l_net_cluster = dap_cluster_find(l_chain->net_id.uint64); + if (l_net_cluster) { + size_t l_pkt_size = a_atom_size + sizeof(dap_stream_ch_chain_pkt_t); + dap_stream_ch_chain_pkt_t *l_pkt = DAP_NEW_Z_SIZE(dap_stream_ch_chain_pkt_t, l_pkt_size); + if (l_pkt) { + l_pkt->hdr.version = 2; + l_pkt->hdr.data_size = a_atom_size; + l_pkt->hdr.net_id = l_chain->net_id; + l_pkt->hdr.chain_id = l_chain->id; + l_pkt->hdr.cell_id = a_chain_cell->id; + memcpy(l_pkt->data, a_atom, a_atom_size); + dap_gossip_msg_issue(l_net_cluster, DAP_STREAM_CH_CHAIN_ID, + l_pkt, l_pkt_size, a_new_atom_hash); + DAP_DELETE(l_pkt); + } else + log_it(L_CRITICAL, "Not enough memory"); + } } ssize_t l_res = dap_chain_cell_file_append(a_chain_cell, a_atom, a_atom_size); if (l_chain->atom_notifiers) { diff --git a/modules/chain/dap_chain_cell.c b/modules/chain/dap_chain_cell.c index 5df2bb4331..dc7aaacdfd 100644 --- a/modules/chain/dap_chain_cell.c +++ b/modules/chain/dap_chain_cell.c @@ -239,10 +239,8 @@ int dap_chain_cell_load(dap_chain_t *a_chain, dap_chain_cell_t *a_cell) l_ret = -6; break; } - dap_chain_atom_verify_res_t l_res = a_chain->callback_atom_add(a_chain, l_element, l_el_size); // !!! blocking GDB call !!! - if (l_res == ATOM_PASS || l_res == ATOM_REJECT) { - DAP_DELETE(l_element); - } + a_chain->callback_atom_add(a_chain, l_element, l_el_size); // ??? blocking GDB call + DAP_DELETE(l_element); ++q; } if (l_ret < 0) { diff --git a/modules/chain/dap_chain_ch.c b/modules/chain/dap_chain_ch.c index e05de2e74f..0b59fcd393 100644 --- a/modules/chain/dap_chain_ch.c +++ b/modules/chain/dap_chain_ch.c @@ -66,6 +66,7 @@ #include "dap_stream_ch_proc.h" #include "dap_chain_ch.h" #include "dap_chain_ch_pkt.h" +#include "dap_stream_ch_gossip.h" #define LOG_TAG "dap_stream_ch_chain" @@ -123,13 +124,11 @@ static void s_free_log_list_gdb ( dap_stream_ch_chain_t * a_ch_chain); static void s_stream_ch_chain_pkt_write(dap_stream_ch_t *a_ch, uint8_t a_type, uint64_t a_net_id, uint64_t a_chain_id, uint64_t a_cell_id, const void * a_data, size_t a_data_size); +static void s_gossip_payload_callback(void *a_payload, size_t a_payload_size, dap_stream_node_addr_t a_sender_addr); static bool s_debug_more=false; static uint_fast16_t s_update_pack_size=100; // Number of hashes packed into the one packet static uint_fast16_t s_skip_in_reactor_count=50; // Number of hashes packed to skip in one reactor loop callback out packet -static uint16_t s_size_ban_groups = 0; -static uint16_t s_size_white_groups = 0; - #ifdef DAP_SYS_DEBUG @@ -140,8 +139,6 @@ static dap_memstat_rec_t s_memstat [MEMSTAT$K_NR] = { #endif - - /** * @brief dap_stream_ch_chain_init * @return @@ -157,7 +154,7 @@ int dap_stream_ch_chain_init() for (int i = 0; i < MEMSTAT$K_NR; i++) dap_memstat_reg(&s_memstat[i]); #endif - + assert(!dap_stream_ch_gossip_callback_add(DAP_STREAM_CH_CHAIN_ID, s_gossip_payload_callback)); return 0; } @@ -543,52 +540,36 @@ static bool s_gdb_in_pkt_proc_callback(void *a_arg) */ static bool s_sync_in_chains_callback(void *a_arg) { - struct sync_request *l_sync_request = (struct sync_request *) a_arg; - if (!l_sync_request) { + dap_stream_ch_chain_pkt_t *l_chain_pkt = a_arg; + if (!l_chain_pkt || !l_chain_pkt->hdr.data_size) { log_it(L_CRITICAL, "Proc thread received corrupted chain packet!"); return false; } - dap_chain_pkt_item_t *l_pkt_item = &l_sync_request->pkt; - dap_chain_hash_fast_t l_atom_hash = {}; - if (l_pkt_item->pkt_data_size == 0 || !l_pkt_item->pkt_data) { - log_it(L_CRITICAL, "In proc thread got CHAINS stream ch packet with zero data"); - DAP_DEL_Z(l_pkt_item->pkt_data); - DAP_DELETE(l_sync_request); - return false; - } - dap_chain_t *l_chain = dap_chain_find_by_id(l_sync_request->request_hdr.net_id, l_sync_request->request_hdr.chain_id); + dap_chain_atom_ptr_t l_atom = (dap_chain_atom_ptr_t)l_chain_pkt->data; + uint64_t l_atom_size = l_chain_pkt->hdr.data_size; + dap_chain_t *l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id); if (!l_chain) { - if (s_debug_more) - log_it(L_WARNING, "No chain found for DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN"); - DAP_DEL_Z(l_pkt_item->pkt_data); - DAP_DELETE(l_sync_request); + debug_if(s_debug_more, L_WARNING, "No chain found for DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN"); + DAP_DELETE(l_chain_pkt); return false; } - dap_chain_atom_ptr_t l_atom_copy = (dap_chain_atom_ptr_t)l_pkt_item->pkt_data; - uint64_t l_atom_copy_size = l_pkt_item->pkt_data_size; - dap_hash_fast(l_atom_copy, l_atom_copy_size, &l_atom_hash); - dap_chain_atom_verify_res_t l_atom_add_res = l_chain->callback_atom_add(l_chain, l_atom_copy, l_atom_copy_size); - char l_atom_hash_str[DAP_CHAIN_HASH_FAST_STR_SIZE] = {[0]='\0'}; - dap_chain_hash_fast_to_str(&l_atom_hash,l_atom_hash_str,sizeof (l_atom_hash_str)); + char *l_atom_hash_str = NULL; + if (s_debug_more) + dap_get_data_hash_str_static(l_atom, l_atom_size, l_atom_hash_str); + dap_chain_atom_verify_res_t l_atom_add_res = l_chain->callback_atom_add(l_chain, l_atom, l_atom_size); switch (l_atom_add_res) { case ATOM_PASS: - if (s_debug_more){ - log_it(L_WARNING, "Atom with hash %s for %s:%s not accepted (code ATOM_PASS, already present)", l_atom_hash_str, l_chain->net_name, l_chain->name); - } + debug_if(s_debug_more, L_WARNING, "Atom with hash %s for %s:%s not accepted (code ATOM_PASS, already present)", + l_atom_hash_str, l_chain->net_name, l_chain->name); //dap_chain_db_set_last_hash_remote(l_sync_request->request.node_addr.uint64, l_chain, &l_atom_hash); - DAP_DELETE(l_atom_copy); break; case ATOM_MOVE_TO_THRESHOLD: - if (s_debug_more) { - log_it(L_INFO, "Thresholded atom with hash %s for %s:%s", l_atom_hash_str, l_chain->net_name, l_chain->name); - } + debug_if(s_debug_more, L_INFO, "Thresholded atom with hash %s for %s:%s", l_atom_hash_str, l_chain->net_name, l_chain->name); //dap_chain_db_set_last_hash_remote(l_sync_request->request.node_addr.uint64, l_chain, &l_atom_hash); break; case ATOM_ACCEPT: - if (s_debug_more) { - log_it(L_INFO,"Accepted atom with hash %s for %s:%s", l_atom_hash_str, l_chain->net_name, l_chain->name); - } - int l_res = dap_chain_atom_save(l_chain->cells, l_atom_copy, l_atom_copy_size, NULL); + debug_if(s_debug_more, L_INFO,"Accepted atom with hash %s for %s:%s", l_atom_hash_str, l_chain->net_name, l_chain->name); + int l_res = dap_chain_atom_save(l_chain->cells, l_atom, l_atom_size, NULL); if(l_res < 0) { log_it(L_ERROR, "Can't save atom %s to the file", l_atom_hash_str); } else { @@ -596,23 +577,34 @@ static bool s_sync_in_chains_callback(void *a_arg) } break; case ATOM_REJECT: { - if (s_debug_more) { - char l_atom_hash_str[72] = {'\0'}; - dap_chain_hash_fast_to_str(&l_atom_hash,l_atom_hash_str,sizeof (l_atom_hash_str)-1 ); - log_it(L_WARNING,"Atom with hash %s for %s:%s rejected", l_atom_hash_str, l_chain->net_name, l_chain->name); - } - DAP_DELETE(l_atom_copy); + debug_if(s_debug_more, L_WARNING, "Atom with hash %s for %s:%s rejected", l_atom_hash_str, l_chain->net_name, l_chain->name); break; } default: - DAP_DELETE(l_atom_copy); log_it(L_CRITICAL, "Wtf is this ret code? %d", l_atom_add_res); break; } - DAP_DEL_Z(l_sync_request); + DAP_DELETE(l_chain_pkt); return false; } +static void s_gossip_payload_callback(void *a_payload, size_t a_payload_size, dap_stream_node_addr_t UNUSED_ARG a_sender_addr) +{ + assert(a_payload && a_payload_size); + dap_stream_ch_chain_pkt_t *l_chain_pkt = a_payload; + if (a_payload_size <= sizeof(dap_stream_ch_chain_pkt_t) || + a_payload_size != sizeof(dap_stream_ch_chain_pkt_t) + l_chain_pkt->hdr.data_size) { + log_it(L_WARNING, "Incorrect chain GOSSIP packet size"); + return; + } + dap_stream_ch_chain_pkt_t *l_chain_pkt_copy = DAP_DUP_SIZE(a_payload, a_payload_size); + if (!l_chain_pkt_copy) { + log_it(L_CRITICAL, "Not enough memory"); + return; + } + dap_proc_thread_callback_add(NULL, s_sync_in_chains_callback, l_chain_pkt_copy); +} + /** * @brief s_gdb_in_pkt_error_worker_callback * @param a_thread @@ -843,39 +835,24 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) } } break; // End of response with starting of DB sync - case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB: case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_END: { if(l_chain_pkt_data_size == sizeof(dap_stream_ch_chain_sync_request_t)) { - if (l_ch_pkt->hdr.type == DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB && l_ch_chain->state != CHAIN_STATE_IDLE) { - log_it(L_WARNING, "Can't process SYNC_GLOBAL_DB request because not in idle state"); - dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, - l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, - "ERROR_STATE_NOT_IN_IDLE"); - break; - } - if (l_ch_pkt->hdr.type == DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_END && - (l_ch_chain->state != CHAIN_STATE_UPDATE_GLOBAL_DB_REMOTE || - memcmp(&l_ch_chain->request_hdr, &l_chain_pkt->hdr, sizeof(dap_stream_ch_chain_pkt_t)))) { + if (l_ch_chain->state != CHAIN_STATE_UPDATE_GLOBAL_DB_REMOTE || + memcmp(&l_ch_chain->request_hdr, &l_chain_pkt->hdr, sizeof(dap_stream_ch_chain_pkt_t))) { log_it(L_WARNING, "Can't process UPDATE_GLOBAL_DB_END request because its already busy with syncronization"); s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, "ERROR_SYNC_REQUEST_ALREADY_IN_PROCESS"); break; } - if(s_debug_more) - { - if (l_ch_pkt->hdr.type == DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_END) - log_it(L_INFO, "In: UPDATE_GLOBAL_DB_END pkt with total count %d hashes", + debug_if(s_debug_more, L_INFO, "In: UPDATE_GLOBAL_DB_END pkt with total count %d hashes", HASH_COUNT(l_ch_chain->remote_gdbs)); - else - log_it(L_INFO, "In: SYNC_GLOBAL_DB pkt"); - } if (l_chain_pkt_data_size == sizeof(dap_stream_ch_chain_sync_request_t)) l_ch_chain->request = *(dap_stream_ch_chain_sync_request_t*)l_chain_pkt->data; struct sync_request *l_sync_request = dap_stream_ch_chain_create_sync_request(l_chain_pkt, a_ch); dap_proc_thread_callback_add(a_ch->stream_worker->worker->proc_queue_input, s_sync_out_gdb_proc_callback, l_sync_request); - }else{ - log_it(L_WARNING, "DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB: Wrong chain packet size %zd when expected %zd", l_chain_pkt_data_size, sizeof(l_ch_chain->request)); + } else { + log_it(L_WARNING, "DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_END: Wrong chain packet size %zd when expected %zd", l_chain_pkt_data_size, sizeof(l_ch_chain->request)); s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, "ERROR_CHAIN_PKT_DATA_SIZE" ); @@ -970,10 +947,9 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) } dap_chain_t * l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id); if (!l_chain) { - log_it(L_ERROR, "Invalid UPDATE_CHAINS_START request from %s with ext_id %016"DAP_UINT64_FORMAT_x" net id 0x%016"DAP_UINT64_FORMAT_x + log_it(L_ERROR, "Invalid UPDATE_CHAINS_START request from %s with net id 0x%016"DAP_UINT64_FORMAT_x " chain id 0x%016"DAP_UINT64_FORMAT_x" cell_id 0x%016"DAP_UINT64_FORMAT_x" in packet", - a_ch->stream->esocket->remote_addr_str, l_chain_pkt->hdr.ext_id, - l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, + a_ch->stream->esocket->remote_addr_str, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64); s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, @@ -989,15 +965,15 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) } break; // Response with atom hashes and sizes - case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS :{ + case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS: { unsigned int l_count_added=0; unsigned int l_count_total=0; dap_chain_t * l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id); if (! l_chain){ - log_it(L_ERROR, "Invalid UPDATE_CHAINS packet from %s with ext_id %016"DAP_UINT64_FORMAT_x" net id 0x%016"DAP_UINT64_FORMAT_x + log_it(L_ERROR, "Invalid UPDATE_CHAINS packet from %s with net id 0x%016"DAP_UINT64_FORMAT_x " chain id 0x%016"DAP_UINT64_FORMAT_x" cell_id 0x%016"DAP_UINT64_FORMAT_x" in packet", - a_ch->stream->esocket->remote_addr_str, l_chain_pkt->hdr.ext_id, + a_ch->stream->esocket->remote_addr_str, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64); s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, @@ -1039,19 +1015,10 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) log_it(L_INFO,"In: Added %u from %u remote atom hash in list",l_count_added,l_count_total); } break; - case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_END: - case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS: { + case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_END: { if(l_chain_pkt_data_size == sizeof(dap_stream_ch_chain_sync_request_t)) { - if (l_ch_pkt->hdr.type == DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS && l_ch_chain->state != CHAIN_STATE_IDLE) { - log_it(L_WARNING, "Can't process SYNC_CHAINS request because not in idle state"); - s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, - l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, - "ERROR_STATE_NOT_IN_IDLE"); - break; - } - if (l_ch_pkt->hdr.type == DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_END && - (l_ch_chain->state != CHAIN_STATE_UPDATE_CHAINS_REMOTE || - memcmp(&l_ch_chain->request_hdr, &l_chain_pkt->hdr, sizeof(dap_stream_ch_chain_pkt_t)))) { + if (l_ch_chain->state != CHAIN_STATE_UPDATE_CHAINS_REMOTE || + memcmp(&l_ch_chain->request_hdr, &l_chain_pkt->hdr, sizeof(dap_stream_ch_chain_pkt_t))) { log_it(L_WARNING, "Can't process UPDATE_CHAINS_END request because its already busy with syncronization"); s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, @@ -1060,40 +1027,23 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) } dap_chain_t * l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id); if (!l_chain) { - log_it(L_ERROR, "Invalid UPDATE_CHAINS packet from %s with ext_id %016"DAP_UINT64_FORMAT_x" net id 0x%016"DAP_UINT64_FORMAT_x + log_it(L_ERROR, "Invalid UPDATE_CHAINS packet from %s with net id 0x%016"DAP_UINT64_FORMAT_x " chain id 0x%016"DAP_UINT64_FORMAT_x" cell_id 0x%016"DAP_UINT64_FORMAT_x" in packet", - a_ch->stream->esocket->remote_addr_str, l_chain_pkt->hdr.ext_id, - l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, + a_ch->stream->esocket->remote_addr_str, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64); s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, "ERROR_NET_INVALID_ID"); break; } - if(s_debug_more) - { - if (l_ch_pkt->hdr.type == DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_END) - log_it(L_INFO, "In: UPDATE_CHAINS_END pkt with total count %d hashes", + debug_if(s_debug_more, L_INFO, "In: UPDATE_CHAINS_END pkt with total count %d hashes", HASH_COUNT(l_ch_chain->remote_atoms)); - else - log_it(L_INFO, "In: SYNC_CHAINS pkt"); - } struct sync_request *l_sync_request = dap_stream_ch_chain_create_sync_request(l_chain_pkt, a_ch); l_ch_chain->stats_request_atoms_processed = 0; l_ch_chain->request_hdr = l_chain_pkt->hdr; - if (l_ch_pkt->hdr.type == DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS) { - char l_hash_from_str[DAP_CHAIN_HASH_FAST_STR_SIZE] = { '\0' }, l_hash_to_str[DAP_CHAIN_HASH_FAST_STR_SIZE] = { '\0' }; - dap_chain_hash_fast_t l_hash_from = l_ch_chain->request.hash_from, - l_hash_to = l_ch_chain->request.hash_to; - dap_chain_hash_fast_to_str(&l_hash_from, l_hash_from_str, DAP_CHAIN_HASH_FAST_STR_SIZE); - dap_chain_hash_fast_to_str(&l_hash_to, l_hash_to_str, DAP_CHAIN_HASH_FAST_STR_SIZE); - log_it(L_INFO, "In: SYNC_CHAINS pkt: net 0x%016"DAP_UINT64_FORMAT_x" chain 0x%016"DAP_UINT64_FORMAT_x" cell 0x%016"DAP_UINT64_FORMAT_x - " between %s and %s", l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64, - l_hash_from_str[0] ? l_hash_from_str : "(null)", l_hash_to_str[0] ? l_hash_to_str : "(null)"); - } dap_proc_thread_callback_add(a_ch->stream_worker->worker->proc_queue_input, s_sync_out_chains_proc_callback, l_sync_request); } else { - log_it(L_WARNING, "DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS: Wrong chain packet size %zd when expected %zd", + log_it(L_WARNING, "DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_END: Wrong chain packet size %zd when expected %zd", l_chain_pkt_data_size, sizeof(l_ch_chain->request)); s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, @@ -1118,37 +1068,36 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) } break; case DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN: { - if(l_chain_pkt_data_size) { - dap_chain_t * l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id); - if(l_chain) { - // Expect atom element in - if(l_chain_pkt_data_size > 0) { - struct sync_request *l_sync_request = dap_stream_ch_chain_create_sync_request(l_chain_pkt, a_ch); - dap_chain_pkt_item_t *l_pkt_item = &l_sync_request->pkt; - l_pkt_item->pkt_data = DAP_DUP_SIZE(l_chain_pkt->data, l_chain_pkt_data_size); - if (!l_pkt_item->pkt_data) { - log_it(L_ERROR, "Not enough memory!"); - DAP_DELETE(l_sync_request); - break; - } - l_pkt_item->pkt_data_size = l_chain_pkt_data_size; - if (s_debug_more){ - dap_chain_hash_fast_t l_atom_hash={0}; - dap_hash_fast(l_chain_pkt->data, l_chain_pkt_data_size ,&l_atom_hash); - char l_atom_hash_str[DAP_CHAIN_HASH_FAST_STR_SIZE] = { '\0' }; - dap_chain_hash_fast_to_str(&l_atom_hash, l_atom_hash_str, DAP_CHAIN_HASH_FAST_STR_SIZE); - log_it(L_INFO, "In: CHAIN pkt: atom hash %s (size %zd)", l_atom_hash_str, l_chain_pkt_data_size); - } - if (dap_proc_thread_callback_add(a_ch->stream_worker->worker->proc_queue_input, s_sync_in_chains_callback, l_sync_request)) - log_it(L_ERROR, "System queue overflow with atom trying atom add. All following atoms will be rejected!"); - } else { - log_it(L_WARNING, "Empty chain packet"); - s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, - l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, - "ERROR_CHAIN_PACKET_EMPTY"); - } - } + dap_stream_ch_chain_pkt_t *l_chain_pkt = (dap_stream_ch_chain_pkt_t *)l_ch_pkt->data; + if (l_chain_pkt_data_size <= sizeof(dap_stream_ch_chain_pkt_t) || + l_chain_pkt_data_size != sizeof(dap_stream_ch_chain_pkt_t) + l_chain_pkt->hdr.data_size) { + log_it(L_WARNING, "Incorrect chain packet size"); + break; + } + dap_cluster_t *l_cluster = dap_cluster_find(l_chain_pkt->hdr.net_id.uint64); + if (!l_cluster) { + log_it(L_WARNING, "Can't find cluster with ID 0x%" DAP_UINT64_FORMAT_X, l_chain_pkt->hdr.net_id.uint64); + break; + } + dap_cluster_member_t *l_check = dap_cluster_member_find_unsafe(l_cluster, &a_ch->stream->node); + if (!l_check) { + log_it(L_WARNING, "Node with addr "NODE_ADDR_FP_STR" isn't a member of cluster %s", + NODE_ADDR_FP_ARGS_S(a_ch->stream->node), l_cluster->mnemonim); + break; } + dap_stream_ch_chain_pkt_t *l_chain_pkt_copy = DAP_DUP_SIZE(l_chain_pkt->data, l_chain_pkt_data_size); + if (!l_chain_pkt_copy) { + log_it(L_CRITICAL, "Not enough memory"); + break; + } + if (l_chain_pkt_copy->hdr.version < 2) + l_chain_pkt_copy->hdr.data_size = l_chain_pkt_data_size; + if (s_debug_more) { + char *l_atom_hash_str; + dap_get_data_hash_str_static(l_chain_pkt->data, l_chain_pkt_data_size, l_atom_hash_str); + log_it(L_INFO, "In: CHAIN pkt: atom hash %s (size %zd)", l_atom_hash_str, l_chain_pkt_data_size); + } + dap_proc_thread_callback_add(a_ch->stream_worker->worker->proc_queue_input, s_sync_in_chains_callback, l_chain_pkt_copy); } break; case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS: { @@ -1170,11 +1119,10 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) if (!l_ch_chain->callback_notify_packet_in) { // we haven't node client waitng, so reply to other side dap_chain_t *l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id); if (!l_chain) { - log_it(L_ERROR, "Invalid SYNCED_CHAINS packet from %s with ext_id %016"DAP_UINT64_FORMAT_x" net id 0x%016"DAP_UINT64_FORMAT_x + log_it(L_ERROR, "Invalid SYNCED_CHAINS packet from %s with net id 0x%016"DAP_UINT64_FORMAT_x " chain id 0x%016"DAP_UINT64_FORMAT_x" cell_id 0x%016"DAP_UINT64_FORMAT_x" in packet", - a_ch->stream->esocket->remote_addr_str, l_chain_pkt->hdr.ext_id, - l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, - l_chain_pkt->hdr.cell_id.uint64); + a_ch->stream->esocket->remote_addr_str, l_chain_pkt->hdr.net_id.uint64, + l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64); s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, "ERROR_NET_INVALID_ID"); @@ -1199,12 +1147,6 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt_data_size ? l_error_str : "<empty>"); } break; - case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_ALL: { - log_it(L_INFO, "In from "NODE_ADDR_FP_STR": SYNCED_ALL net 0x%016"DAP_UINT64_FORMAT_x" chain 0x%016"DAP_UINT64_FORMAT_x" cell 0x%016"DAP_UINT64_FORMAT_x, - NODE_ADDR_FP_ARGS_S(l_ch_chain->request.node_addr), l_chain_pkt->hdr.net_id.uint64, - l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64); - } break; - default: { s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, diff --git a/modules/chain/dap_chain_ch_pkt.c b/modules/chain/dap_chain_ch_pkt.c index 7f71ee760c..743fb83a2e 100644 --- a/modules/chain/dap_chain_ch_pkt.c +++ b/modules/chain/dap_chain_ch_pkt.c @@ -23,25 +23,6 @@ #define LOG_TAG "dap_stream_ch_chain_pkt" - -/** - * @brief dap_stream_ch_chain_pkt_to_dap_stream_ch_chain_state - * @param a_state - * @return - */ -dap_stream_ch_chain_state_t dap_stream_ch_chain_pkt_type_to_dap_stream_ch_chain_state(char a_state) -{ - switch (a_state) { - case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_ALL: - return CHAIN_STATE_SYNC_ALL; - case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB: - return CHAIN_STATE_SYNC_GLOBAL_DB; - case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS: - return CHAIN_STATE_SYNC_CHAINS; - } - return CHAIN_STATE_IDLE; -} - /** * @brief dap_stream_ch_net_pkt_write * @param sid diff --git a/modules/chain/include/dap_chain_ch_pkt.h b/modules/chain/include/dap_chain_ch_pkt.h index c2b4f60616..2bbea66a36 100644 --- a/modules/chain/include/dap_chain_ch_pkt.h +++ b/modules/chain/include/dap_chain_ch_pkt.h @@ -38,37 +38,30 @@ #define DAP_STREAM_CH_CHAIN_PKT_VERSION 0x01 -#define DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN 0x01 -#define DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB 0x11 - -#define DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_CHAIN 0x20 -#define DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB 0x21 - -#define DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS 0x02 -#define DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB 0x12 -#define DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_ALL 0x22 - -#define DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS 0x03 -#define DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB 0x13 -#define DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_ALL 0x23 - #define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_REQ 0x05 -#define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_TSD 0x15 #define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_START 0x25 #define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS 0x35 #define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_END 0x45 +#define DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_CHAIN 0x20 +#define DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN 0x01 +#define DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS 0x03 #define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_REQ 0x06 -#define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_TSD 0x16 #define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_START 0x26 #define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB 0x36 #define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_END 0x46 +#define DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB 0x21 +#define DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB 0x11 +#define DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB 0x13 #define DAP_STREAM_CH_CHAIN_PKT_TYPE_DELETE 0xda #define DAP_STREAM_CH_CHAIN_PKT_TYPE_TIMEOUT 0xfe #define DAP_STREAM_CH_CHAIN_PKT_TYPE_ERROR 0xff // TSD sections +#define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_TSD 0x15 +#define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_TSD 0x16 + #define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_TSD_PROTO 0x0001 // Protocol version #define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_TSD_COUNT 0x0002 // Items count #define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_TSD_HASH_LAST 0x0003 // Hash of last(s) item @@ -102,20 +95,16 @@ typedef struct dap_stream_ch_chain_sync_request{ } DAP_ALIGN_PACKED dap_stream_ch_chain_sync_request_t; -typedef struct dap_stream_ch_chain_pkt_hdr{ - union{ - struct{ - uint8_t version; - uint8_t padding[7]; - } DAP_ALIGN_PACKED; - uint64_t ext_id; - }DAP_ALIGN_PACKED; +typedef struct dap_stream_ch_chain_pkt_hdr { + uint8_t version; + uint8_t padding[3]; + uint32_t data_size; dap_chain_net_id_t net_id; dap_chain_id_t chain_id; dap_chain_cell_id_t cell_id; } DAP_ALIGN_PACKED dap_stream_ch_chain_pkt_hdr_t; -typedef struct dap_stream_ch_chain_pkt{ +typedef struct dap_stream_ch_chain_pkt { dap_stream_ch_chain_pkt_hdr_t hdr; uint8_t data[]; } DAP_ALIGN_PACKED dap_stream_ch_chain_pkt_t; @@ -123,18 +112,12 @@ typedef struct dap_stream_ch_chain_pkt{ static const char* c_dap_stream_ch_chain_pkt_type_str[]={ [DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN] = "DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN", [DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB] = "DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB", - [DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS] = "DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS", - [DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB] = "DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB", - [DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_ALL] = "DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_ALL", [DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS] = "DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS", [DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB] = "DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB", - [DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_ALL] = "DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_ALL", [DAP_STREAM_CH_CHAIN_PKT_TYPE_ERROR] = "DAP_STREAM_CH_CHAIN_PKT_TYPE_ERROR" }; -dap_stream_ch_chain_state_t dap_stream_ch_chain_pkt_type_to_dap_stream_ch_chain_state(char a_state); - size_t dap_stream_ch_chain_pkt_write_unsafe(dap_stream_ch_t *a_ch, uint8_t a_type, uint64_t a_net_id, uint64_t a_chain_id, uint64_t a_cell_id, const void * a_data, size_t a_data_size); diff --git a/modules/consensus/dag-poa/dap_chain_cs_dag_poa.c b/modules/consensus/dag-poa/dap_chain_cs_dag_poa.c index bff0dc5aa1..2c731036bd 100644 --- a/modules/consensus/dag-poa/dap_chain_cs_dag_poa.c +++ b/modules/consensus/dag-poa/dap_chain_cs_dag_poa.c @@ -612,7 +612,7 @@ static bool s_callback_round_event_to_chain_callback_get_round_item(dap_global_d dap_chain_cs_dag_event_round_item_t *l_chosen_item = s_round_event_choose_dup(l_dups_list, l_max_signs_count); if (l_chosen_item) { size_t l_event_size = l_chosen_item->event_size; - dap_chain_cs_dag_event_t *l_new_atom = (dap_chain_cs_dag_event_t *)DAP_DUP_SIZE(l_chosen_item->event_n_signs, l_event_size); + dap_chain_cs_dag_event_t *l_new_atom = (dap_chain_cs_dag_event_t *)l_chosen_item->event_n_signs; dap_hash_fast_t l_event_hash; dap_hash_fast(l_new_atom, l_event_size, &l_event_hash); char *l_event_hash_hex_str = DAP_NEW_STACK_SIZE(char, DAP_CHAIN_HASH_FAST_STR_SIZE); @@ -625,12 +625,10 @@ static bool s_callback_round_event_to_chain_callback_get_round_item(dap_global_d if (l_res == ATOM_ACCEPT) { dap_chain_atom_save(l_dag->chain->cells, (dap_chain_atom_ptr_t)l_new_atom, l_event_size, &l_event_hash); s_poa_round_clean(l_dag->chain); - } else if (l_res != ATOM_MOVE_TO_THRESHOLD) - DAP_DELETE(l_new_atom); + } log_it(L_INFO, "Event %s from round %"DAP_UINT64_FORMAT_U" %s", l_event_hash_hex_str, l_round_id, dap_chain_atom_verify_res_str[l_res]); } else { - DAP_DELETE(l_new_atom); char l_datum_hash_str[DAP_CHAIN_HASH_FAST_STR_SIZE]; dap_chain_hash_fast_to_str(&l_chosen_item->round_info.datum_hash, l_datum_hash_str, DAP_CHAIN_HASH_FAST_STR_SIZE); log_it(L_INFO, "Event %s from round %"DAP_UINT64_FORMAT_U" not added into chain, because the inner datum %s doesn't pass verification (%s)", diff --git a/modules/consensus/esbocs/dap_chain_cs_esbocs.c b/modules/consensus/esbocs/dap_chain_cs_esbocs.c index 4a1e64b677..009d51244a 100644 --- a/modules/consensus/esbocs/dap_chain_cs_esbocs.c +++ b/modules/consensus/esbocs/dap_chain_cs_esbocs.c @@ -468,7 +468,7 @@ static int s_callback_created(dap_chain_t *a_chain, dap_config_t *a_chain_net_cf l_session->my_signing_addr = l_my_signing_addr; char *l_sync_group = s_get_penalty_group(l_net->pub.id); l_session->db_cluster = dap_global_db_cluster_add(dap_global_db_instance_get_default(), - l_sync_group, l_sync_group, + NULL, 0, l_sync_group, 72 * 3600, true, DAP_GDB_MEMBER_ROLE_NOBODY, DAP_CLUSTER_ROLE_AUTONOMIC); DAP_DELETE(l_sync_group); @@ -1533,13 +1533,12 @@ static bool s_session_candidate_to_chain(dap_chain_esbocs_session_t *a_session, return false; } bool res = false; - dap_chain_block_t *l_candidate = DAP_DUP_SIZE(a_candidate, a_candidate_size); - dap_chain_atom_verify_res_t l_res = a_session->chain->callback_atom_add(a_session->chain, l_candidate, a_candidate_size); + dap_chain_atom_verify_res_t l_res = a_session->chain->callback_atom_add(a_session->chain, a_candidate, a_candidate_size); char *l_candidate_hash_str = dap_chain_hash_fast_to_str_new(a_candidate_hash); switch (l_res) { case ATOM_ACCEPT: // block save to chain - if (dap_chain_atom_save(a_session->chain->cells, (uint8_t *)l_candidate, a_candidate_size, a_candidate_hash) < 0) + if (dap_chain_atom_save(a_session->chain->cells, (uint8_t *)a_candidate, a_candidate_size, a_candidate_hash) < 0) log_it(L_ERROR, "Can't save atom %s to the file", l_candidate_hash_str); else { @@ -1552,15 +1551,12 @@ static bool s_session_candidate_to_chain(dap_chain_esbocs_session_t *a_session, break; case ATOM_PASS: log_it(L_WARNING, "Atom with hash %s not accepted (code ATOM_PASS, already present)", l_candidate_hash_str); - DAP_DELETE(l_candidate); break; case ATOM_REJECT: log_it(L_WARNING,"Atom with hash %s rejected", l_candidate_hash_str); - DAP_DELETE(l_candidate); break; default: log_it(L_CRITICAL, "Wtf is this ret code ? Atom hash %s code %d", l_candidate_hash_str, l_res); - DAP_DELETE(l_candidate); } DAP_DELETE(l_candidate_hash_str); return res; diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index 1cf3a9d7cf..5934ebda6f 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -428,7 +428,7 @@ dap_chain_node_info_t *dap_chain_net_balancer_link_from_cfg(dap_chain_net_t *a_n void dap_chain_net_add_cluster_link(dap_chain_net_t *a_net, dap_stream_node_addr_t *a_node_addr) { dap_return_if_fail(a_net && a_node_addr); - dap_cluster_t *l_links_cluster = dap_cluster_by_mnemonim(a_net->pub.gdb_groups_prefix); + dap_cluster_t *l_links_cluster = dap_cluster_by_mnemonim(a_net->pub.name); if (l_links_cluster) dap_cluster_member_add(l_links_cluster, a_node_addr, 0, NULL); else @@ -2840,8 +2840,8 @@ int s_net_load(dap_chain_net_t *a_net) l_gdb_groups_mask = dap_strdup_printf("%s.chain-%s.mempool", l_net->pub.gdb_groups_prefix, l_chain->name); dap_global_db_cluster_t *l_cluster = dap_global_db_cluster_add( dap_global_db_instance_get_default(), - l_net->pub.name, l_gdb_groups_mask, - DAP_CHAIN_NET_MEMPOOL_TTL, true, + l_net->pub.name, l_net->pub.id.uint64, + l_gdb_groups_mask, DAP_CHAIN_NET_MEMPOOL_TTL, true, DAP_GDB_MEMBER_ROLE_USER, DAP_CLUSTER_ROLE_EMBEDDED); if (!l_cluster) { @@ -2856,8 +2856,8 @@ int s_net_load(dap_chain_net_t *a_net) // Service orders cluster l_gdb_groups_mask = dap_strdup_printf("%s.service.orders", l_net->pub.gdb_groups_prefix); l_net_pvt->orders_cluster = dap_global_db_cluster_add(dap_global_db_instance_get_default(), - l_net->pub.name, l_gdb_groups_mask, - 0, true, + l_net->pub.name, l_net->pub.id.uint64, + l_gdb_groups_mask, 0, true, DAP_GDB_MEMBER_ROLE_GUEST, DAP_CLUSTER_ROLE_EMBEDDED); if (!l_net_pvt->orders_cluster) { @@ -2871,8 +2871,8 @@ int s_net_load(dap_chain_net_t *a_net) l_net->pub.gdb_nodes_aliases = dap_strdup_printf("%s.nodes.aliases",l_net->pub.gdb_groups_prefix); l_gdb_groups_mask = dap_strdup_printf("%s.nodes*", l_net->pub.gdb_groups_prefix); l_net_pvt->nodes_cluster = dap_global_db_cluster_add(dap_global_db_instance_get_default(), - l_net->pub.name, l_gdb_groups_mask, - 0, true, + l_net->pub.name, l_net->pub.id.uint64, + l_gdb_groups_mask, 0, true, DAP_GDB_MEMBER_ROLE_GUEST, DAP_CLUSTER_ROLE_EMBEDDED); if (!l_net_pvt->nodes_cluster) { diff --git a/modules/net/dap_chain_node_cli_cmd.c b/modules/net/dap_chain_node_cli_cmd.c index d7c4c4b394..7a898eaf48 100644 --- a/modules/net/dap_chain_node_cli_cmd.c +++ b/modules/net/dap_chain_node_cli_cmd.c @@ -1209,7 +1209,10 @@ int com_node(int a_argc, char ** a_argv, void **a_str_reply) break; // make connect - case CMD_CONNECT: { + case CMD_CONNECT: + dap_cli_server_cmd_set_reply_text(a_str_reply, "Not implemented yet"); + break; +#if 0 // get address from alias if addr not defined if(alias_str && !l_node_addr.uint64) { dap_chain_node_addr_t *address_tmp = dap_chain_node_alias_find(l_net, alias_str); @@ -1389,6 +1392,7 @@ int com_node(int a_argc, char ** a_argv, void **a_str_reply) return 0; } +#endif // make handshake case CMD_HANDSHAKE: { // get address from alias if addr not defined @@ -1505,6 +1509,7 @@ int com_node(int a_argc, char ** a_argv, void **a_str_reply) l_key_str_out ? "" : " not"); DAP_DELETE(l_key_str_out); } break; + case CMD_UNBAN: { dap_chain_net_t *l_netl = NULL; dap_chain_t *l_chain = NULL; @@ -1566,6 +1571,7 @@ int com_node(int a_argc, char ** a_argv, void **a_str_reply) l_key_str_out ? "" : " not"); DAP_DELETE(l_key_str_out); } break; + case CMD_BANLIST: { dap_string_t *l_str_ban_list = dap_string_new("Ban list:\n"); dap_http_ban_list_client_ipv4_print(l_str_ban_list); @@ -1574,6 +1580,7 @@ int com_node(int a_argc, char ** a_argv, void **a_str_reply) dap_cli_server_cmd_set_reply_text(a_str_reply, "%s", l_str_ban_list->str); dap_string_free(l_str_ban_list, true); } break; + case CMD_BALANCER: { //balancer link list size_t l_node_num = 0; @@ -1592,6 +1599,7 @@ int com_node(int a_argc, char ** a_argv, void **a_str_reply) l_string_balanc->str); dap_string_free(l_string_balanc, true); } break; + default: dap_cli_server_cmd_set_reply_text(a_str_reply, "Unrecognized subcommand '%s'", arg_index < a_argc ? a_argv[arg_index] : "(null)"); diff --git a/modules/type/blocks/dap_chain_block_cache.c b/modules/type/blocks/dap_chain_block_cache.c index 2fccdd5bca..d17e917b0d 100644 --- a/modules/type/blocks/dap_chain_block_cache.c +++ b/modules/type/blocks/dap_chain_block_cache.c @@ -65,7 +65,11 @@ dap_chain_block_cache_t *dap_chain_block_cache_new(dap_hash_fast_t *a_block_hash log_it(L_CRITICAL, "Memory allocation error"); return NULL; } - l_block_cache->block = a_block; + l_block_cache->block = DAP_DUP_SIZE(a_block, a_block_size); + if (!l_block_cache->block) { + log_it(L_CRITICAL, "Memory allocation error"); + return NULL; + } l_block_cache->block_size = a_block_size; l_block_cache->block_number = a_block_number; l_block_cache->ts_created = a_block->hdr.ts_created; diff --git a/modules/type/blocks/dap_chain_cs_blocks.c b/modules/type/blocks/dap_chain_cs_blocks.c index c822b9827d..01fc8e8c3c 100644 --- a/modules/type/blocks/dap_chain_cs_blocks.c +++ b/modules/type/blocks/dap_chain_cs_blocks.c @@ -1314,25 +1314,18 @@ static dap_chain_atom_verify_res_t s_callback_atom_add(dap_chain_t * a_chain, da debug_if(s_debug_more, L_DEBUG, "... %s is already present", l_block_cache->block_hash_str); pthread_rwlock_unlock(&PVT(l_blocks)->rwlock); return ATOM_PASS; - } else { - l_block_cache = dap_chain_block_cache_new(&l_block_hash, l_block, l_block_size, PVT(l_blocks)->blocks_count + 1); - if (!l_block_cache) { - log_it(L_DEBUG, "... corrupted block"); - pthread_rwlock_unlock(&PVT(l_blocks)->rwlock); - return ATOM_REJECT; - } - debug_if(s_debug_more, L_DEBUG, "... new block %s", l_block_cache->block_hash_str); } - dap_chain_atom_verify_res_t ret = s_callback_atom_verify(a_chain, a_atom, a_atom_size); switch (ret) { case ATOM_ACCEPT: + l_block_cache = dap_chain_block_cache_new(&l_block_hash, l_block, l_block_size, PVT(l_blocks)->blocks_count + 1); + if (!l_block_cache) + break; + debug_if(s_debug_more, L_DEBUG, "... new block %s", l_block_cache->block_hash_str); HASH_ADD(hh, PVT(l_blocks)->blocks, block_hash, sizeof(l_block_cache->block_hash), l_block_cache); ++PVT(l_blocks)->blocks_count; - pthread_rwlock_unlock(&PVT(l_blocks)->rwlock); debug_if(s_debug_more, L_DEBUG, "Verified atom %p: ACCEPTED", a_atom); - s_add_atom_datums(l_blocks, l_block_cache); - return ret; + break; case ATOM_MOVE_TO_THRESHOLD: // TODO: reimplement and enable threshold for blocks /* { @@ -1342,7 +1335,6 @@ static dap_chain_atom_verify_res_t s_callback_atom_add(dap_chain_t * a_chain, da */ ret = ATOM_REJECT; case ATOM_REJECT: - dap_chain_block_cache_delete(l_block_cache); debug_if(s_debug_more, L_DEBUG, "Verified atom %p: REJECTED", a_atom); break; default: @@ -1350,6 +1342,8 @@ static dap_chain_atom_verify_res_t s_callback_atom_add(dap_chain_t * a_chain, da break; } pthread_rwlock_unlock(&PVT(l_blocks)->rwlock); + if (ret == ATOM_ACCEPT) + s_add_atom_datums(l_blocks, l_block_cache); return ret; } diff --git a/modules/type/dag/dap_chain_cs_dag.c b/modules/type/dag/dap_chain_cs_dag.c index 0b5f66f454..80bb467422 100644 --- a/modules/type/dag/dap_chain_cs_dag.c +++ b/modules/type/dag/dap_chain_cs_dag.c @@ -323,7 +323,7 @@ static int s_chain_cs_dag_new(dap_chain_t * a_chain, dap_config_t * a_chain_cfg) l_dag->gdb_group_events_round_new = dap_strdup_printf(l_dag->is_celled ? "dag-%s-%s-%016llx-round.new" : "dag-%s-%s-round.new", l_net->pub.gdb_groups_prefix, a_chain->name, 0LLU); dap_global_db_cluster_t *l_dag_cluster = dap_global_db_cluster_add(dap_global_db_instance_get_default(), - l_dag->gdb_group_events_round_new, l_dag->gdb_group_events_round_new, + NULL, 0, l_dag->gdb_group_events_round_new, 900, true, DAP_GDB_MEMBER_ROLE_NOBODY, DAP_CLUSTER_ROLE_AUTONOMIC); dap_global_db_cluster_add_notify_callback(l_dag_cluster, s_round_changes_notify, l_dag); dap_chain_net_add_poa_certs_to_cluster(l_net, l_dag_cluster); @@ -497,30 +497,18 @@ static dap_chain_atom_verify_res_t s_chain_callback_atom_add(dap_chain_t * a_cha dap_chain_cs_dag_t * l_dag = DAP_CHAIN_CS_DAG(a_chain); dap_chain_cs_dag_event_t * l_event = (dap_chain_cs_dag_event_t *) a_atom; - dap_chain_cs_dag_event_item_t * l_event_item = DAP_NEW_Z(dap_chain_cs_dag_event_item_t); - if (!l_event_item) { - log_it(L_CRITICAL, "Memory allocation error"); - return ATOM_REJECT; - } - pthread_mutex_t *l_events_mutex = &PVT(l_dag)->events_mutex; - l_event_item->event = l_event; - l_event_item->event_size = a_atom_size; - l_event_item->ts_added = dap_time_now(); - + dap_chain_cs_dag_event_item_t *l_event_item = NULL; dap_chain_hash_fast_t l_event_hash; dap_chain_cs_dag_event_calc_hash(l_event, a_atom_size, &l_event_hash); - l_event_item->hash = l_event_hash; - - if(s_debug_more) { + if (s_debug_more) { char l_event_hash_str[DAP_CHAIN_HASH_FAST_STR_SIZE] = { '\0' }; dap_chain_hash_fast_to_str(&l_event_item->hash, l_event_hash_str, sizeof(l_event_hash_str)); log_it(L_DEBUG, "Processing event: %s ... (size %zd)", l_event_hash_str,a_atom_size); } - - pthread_mutex_lock(l_events_mutex); + pthread_mutex_lock(&PVT(l_dag)->events_mutex); // check if we already have this event - dap_chain_atom_verify_res_t ret = s_dap_chain_check_if_event_is_present(PVT(l_dag)->events, &l_event_item->hash) || - s_dap_chain_check_if_event_is_present(PVT(l_dag)->events_treshold, &l_event_item->hash) ? ATOM_PASS : ATOM_ACCEPT; + dap_chain_atom_verify_res_t ret = s_dap_chain_check_if_event_is_present(PVT(l_dag)->events, &l_event_hash) || + s_dap_chain_check_if_event_is_present(PVT(l_dag)->events_treshold, &l_event_hash) ? ATOM_PASS : ATOM_ACCEPT; // verify hashes and consensus switch (ret) { @@ -534,13 +522,28 @@ static dap_chain_atom_verify_res_t s_chain_callback_atom_add(dap_chain_t * a_cha break; case ATOM_PASS: debug_if(s_debug_more, L_DEBUG, "Atom already present"); - DAP_DELETE(l_event_item); - pthread_mutex_unlock(l_events_mutex); + pthread_mutex_unlock(&PVT(l_dag)->events_mutex); return ret; default: break; } + l_event_item = DAP_NEW_Z(dap_chain_cs_dag_event_item_t); + if (!l_event_item) { + log_it(L_CRITICAL, "Memory allocation error"); + pthread_mutex_unlock(&PVT(l_dag)->events_mutex); + return ATOM_REJECT; + } + l_event_item->event = DAP_DUP_SIZE(a_atom, a_atom_size); + if (!l_event_item->event) { + log_it(L_CRITICAL, "Memory allocation error"); + pthread_mutex_unlock(&PVT(l_dag)->events_mutex); + return ATOM_REJECT; + } + l_event_item->event_size = a_atom_size; + l_event_item->ts_added = dap_time_now(); + l_event_item->hash = l_event_hash; + switch (ret) { case ATOM_MOVE_TO_THRESHOLD: { dap_chain_cs_dag_blocked_t *el = NULL; @@ -588,10 +591,13 @@ static dap_chain_atom_verify_res_t s_chain_callback_atom_add(dap_chain_t * a_cha s_dag_events_lasts_process_new_last_event(l_dag, l_event_item); } break; default: - DAP_DELETE(l_event_item); // Neither added, nor freed break; } - pthread_mutex_unlock(l_events_mutex); + pthread_mutex_unlock(&PVT(l_dag)->events_mutex); + if (ret == ATOM_REJECT) { // Neither added, nor freed + DAP_DELETE(l_event_item->event); + DAP_DELETE(l_event_item); + } return ret; } @@ -726,15 +732,15 @@ static bool s_chain_callback_datums_pool_proc(dap_chain_t *a_chain, dap_chain_da } dap_hash_fast_t l_event_hash; dap_hash_fast(l_event, l_event_size, &l_event_hash); + bool l_res = false; if (l_dag->is_add_directly) { - dap_chain_atom_verify_res_t l_verify_res; - switch (l_verify_res = s_chain_callback_atom_add(a_chain, l_event, l_event_size)) { - case ATOM_ACCEPT: - return dap_chain_atom_save(a_chain->cells, (uint8_t *)l_event, l_event_size, &l_event_hash) > 0; - default: + dap_chain_atom_verify_res_t l_verify_res = s_chain_callback_atom_add(a_chain, l_event, l_event_size); + if (l_verify_res == ATOM_ACCEPT) + l_res = dap_chain_atom_save(a_chain->cells, (uint8_t *)l_event, l_event_size, &l_event_hash) > 0; + else log_it(L_ERROR, "Can't add new event to the file, atom verification result %d", l_verify_res); - return false; - } + DAP_DELETE(l_event); + return l_res; } dap_global_db_set_sync(l_dag->gdb_group_events_round_new, DAG_ROUND_CURRENT_KEY, @@ -742,7 +748,8 @@ static bool s_chain_callback_datums_pool_proc(dap_chain_t *a_chain, dap_chain_da dap_chain_cs_dag_event_round_item_t l_round_item = { .round_info.datum_hash = l_datum_hash }; char *l_event_hash_hex_str = DAP_NEW_STACK_SIZE(char, DAP_CHAIN_HASH_FAST_STR_SIZE); dap_chain_hash_fast_to_str(&l_event_hash, l_event_hash_hex_str, DAP_CHAIN_HASH_FAST_STR_SIZE); - bool l_res = dap_chain_cs_dag_event_gdb_set(l_dag, l_event_hash_hex_str, l_event, l_event_size, &l_round_item); + l_res = dap_chain_cs_dag_event_gdb_set(l_dag, l_event_hash_hex_str, l_event, l_event_size, &l_round_item) == DAP_GLOBAL_DB_RC_SUCCESS; + DAP_DELETE(l_event); log_it(l_res ? L_INFO : L_ERROR, l_res ? "Event %s placed in the new forming round [id %"DAP_UINT64_FORMAT_U"]" : "Can't add new event [%s] to the new events round [id %"DAP_UINT64_FORMAT_U"]", @@ -1539,9 +1546,7 @@ static int s_cli_dag(int argc, char ** argv, void **a_str_reply) dap_string_append_printf( l_str_ret_tmp, "Event %s verification passed\n", l_objs[i].key); // If not verify only mode we add if ( ! l_verify_only ){ - dap_chain_atom_ptr_t l_new_atom = DAP_DUP_SIZE(l_event, l_event_size); // produce deep copy of event; - if(s_chain_callback_atom_add(l_chain, l_new_atom, l_event_size) < 0) { // Add new atom in chain - DAP_DELETE(l_new_atom); + if (s_chain_callback_atom_add(l_chain, l_event, l_event_size) != ATOM_ACCEPT) { // Add new atom in chain dap_string_append_printf(l_str_ret_tmp, "Event %s not added in chain\n", l_objs[i].key); } else { // add event to delete diff --git a/modules/type/none/dap_chain_cs_none.c b/modules/type/none/dap_chain_cs_none.c index acdfb12159..87669c1d41 100644 --- a/modules/type/none/dap_chain_cs_none.c +++ b/modules/type/none/dap_chain_cs_none.c @@ -168,7 +168,8 @@ static int s_cs_callback_new(dap_chain_t *a_chain, dap_config_t UNUSED_ARG *a_ch l_nochain_priv->group_datums = dap_chain_net_get_gdb_group_nochain_new(a_chain); // Add group prefix that will be tracking all changes dap_global_db_cluster_t *l_nonconsensus_cluster = - dap_global_db_cluster_add(dap_global_db_instance_get_default(), l_net->pub.name, + dap_global_db_cluster_add(dap_global_db_instance_get_default(), + l_net->pub.name, l_net->pub.id.uint64, l_nochain_priv->group_datums, 0, true, DAP_GDB_MEMBER_ROLE_USER, DAP_CLUSTER_ROLE_EMBEDDED); if (!l_nonconsensus_cluster) { @@ -373,7 +374,7 @@ static dap_chain_atom_verify_res_t s_nonconsensus_callback_atom_add(dap_chain_t dap_chain_datum_t *l_datum = (dap_chain_datum_t*) a_atom; dap_hash_fast_t l_datum_hash; dap_hash_fast(l_datum->data, l_datum->header.data_size, &l_datum_hash); - if(dap_chain_datum_add(a_chain, l_datum, a_atom_size, &l_datum_hash)) + if (dap_chain_datum_add(a_chain, l_datum, a_atom_size, &l_datum_hash)) return ATOM_REJECT; dap_nonconsensus_datum_hash_item_t * l_hash_item = DAP_NEW_Z(dap_nonconsensus_datum_hash_item_t); -- GitLab