diff --git a/CMakeLists.txt b/CMakeLists.txt index 14b4ffc9cc4e88398632384998306e9256edf0ad..08fce37d85e3ea2bb2beaa8bc14b5f617bc96753 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -85,7 +85,7 @@ endif() if (CELLFRAME_MODULES MATCHES "network") message("[+] Module 'network'") set(CELLFRAME_LIBS ${CELLFRAME_LIBS} dap_io dap_json_rpc dap_enc_server dap_notify_srv dap_http_server dap_session - dap_stream dap_stream_ch dap_client dap_cli_server dap_stream_ch_chain dap_stream_ch_chain_net dap_chain_net dap_chain_net_srv dap_stream_ch_chain_voting dap_chain_mempool ) + dap_stream dap_stream_ch dap_client dap_cli_server dap_stream_ch_chain_net dap_chain_net dap_chain_net_srv dap_stream_ch_chain_voting dap_chain_mempool ) endif() # Chain net services diff --git a/dap-sdk b/dap-sdk index f8a874cb6a49bb84f3cba682f955833cf6038d21..5f9e8c29db8112744e947c2fa29d0a71936cecde 160000 --- a/dap-sdk +++ b/dap-sdk @@ -1 +1 @@ -Subproject commit f8a874cb6a49bb84f3cba682f955833cf6038d21 +Subproject commit 5f9e8c29db8112744e947c2fa29d0a71936cecde diff --git a/modules/CMakeLists.txt b/modules/CMakeLists.txt index 6f7f30de5096b901005ff1fe5d7237124744ac76..188c40318a74c72a529f19877ee7b0ecc95003ab 100644 --- a/modules/CMakeLists.txt +++ b/modules/CMakeLists.txt @@ -22,7 +22,6 @@ if (CELLFRAME_MODULES MATCHES "network") add_subdirectory(net/srv) add_subdirectory(json_rpc) # Stream channels - add_subdirectory(channel/chain) add_subdirectory(channel/chain-net) add_subdirectory(channel/chain-voting) endif() diff --git a/modules/chain/CMakeLists.txt b/modules/chain/CMakeLists.txt index 892e2ac0578aaf5d0031add097f30a72be3520b6..f635a344e37e81cb627ab9e6493f0bfc4b39abab 100644 --- a/modules/chain/CMakeLists.txt +++ b/modules/chain/CMakeLists.txt @@ -14,7 +14,7 @@ endif() #find_package(PkgConfig REQUIRED) #pkg_search_module(GLIB REQUIRED glib-2.0) -target_link_libraries(${PROJECT_NAME} dap_chain_common dap_global_db dap_notify_srv ${GLIB_LDFLAGS}) +target_link_libraries(${PROJECT_NAME} dap_chain_common dap_global_db dap_notify_srv dap_stream ${GLIB_LDFLAGS}) target_include_directories(${PROJECT_NAME} INTERFACE . include/ ${GLIB_INCLUDE_DIRS}) target_include_directories(${PROJECT_NAME} PUBLIC include) target_include_directories(${PROJECT_NAME} PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/../../../dap-sdk/3rdparty/uthash/src) diff --git a/modules/chain/dap_chain.c b/modules/chain/dap_chain.c index 2d9c11b8870a26318e58224ab1b2f480e63deb1e..f22458c787606abb4b1342d7db8d999f5777ad6a 100644 --- a/modules/chain/dap_chain.c +++ b/modules/chain/dap_chain.c @@ -39,6 +39,8 @@ #include "dap_chain_cell.h" #include "dap_chain_cs.h" #include "dap_cert_file.h" +#include "dap_chain_ch.h" +#include "dap_stream_ch_gossip.h" #include <uthash.h> #include <pthread.h> @@ -703,32 +705,45 @@ bool dap_chain_get_atom_last_hash(dap_chain_t *a_chain, dap_hash_fast_t *a_atom_ return l_ret; } -ssize_t dap_chain_atom_save(dap_chain_t *a_chain, const uint8_t *a_atom, size_t a_atom_size, dap_chain_cell_id_t a_cell_id) +ssize_t dap_chain_atom_save(dap_chain_cell_t *a_chain_cell, const uint8_t *a_atom, size_t a_atom_size, dap_hash_fast_t *a_new_atom_hash) { - dap_chain_cell_t *l_cell = dap_chain_cell_find_by_id(a_chain, a_cell_id); - if (!l_cell) { - log_it(L_INFO, "Creating cell 0x%016"DAP_UINT64_FORMAT_X, a_cell_id.uint64); - l_cell = dap_chain_cell_create_fill(a_chain, a_cell_id); - if (!l_cell) { - log_it(L_ERROR, "Can't create cell with id 0x%"DAP_UINT64_FORMAT_x" to save event...", a_cell_id.uint64); - return -7; + dap_return_val_if_fail(a_chain_cell && a_chain_cell->chain, -1); + dap_chain_t *l_chain = a_chain_cell->chain; + + if (a_new_atom_hash) { // Atom is new and need to be distributed for the net + dap_cluster_t *l_net_cluster = dap_cluster_find(l_chain->net_id.uint64); + if (l_net_cluster) { + size_t l_pkt_size = a_atom_size + sizeof(dap_stream_ch_chain_pkt_t); + dap_stream_ch_chain_pkt_t *l_pkt = DAP_NEW_Z_SIZE(dap_stream_ch_chain_pkt_t, l_pkt_size); + if (l_pkt) { + l_pkt->hdr.version = 2; + l_pkt->hdr.data_size = a_atom_size; + l_pkt->hdr.net_id = l_chain->net_id; + l_pkt->hdr.chain_id = l_chain->id; + l_pkt->hdr.cell_id = a_chain_cell->id; + memcpy(l_pkt->data, a_atom, a_atom_size); + dap_gossip_msg_issue(l_net_cluster, DAP_STREAM_CH_CHAIN_ID, + l_pkt, l_pkt_size, a_new_atom_hash); + DAP_DELETE(l_pkt); + } else + log_it(L_CRITICAL, "Not enough memory"); } } - ssize_t l_res = dap_chain_cell_file_append(l_cell, a_atom, a_atom_size); - if (a_chain->atom_notifiers) { + ssize_t l_res = dap_chain_cell_file_append(a_chain_cell, a_atom, a_atom_size); + if (l_chain->atom_notifiers) { dap_list_t *l_iter; - DL_FOREACH(a_chain->atom_notifiers, l_iter) { + DL_FOREACH(l_chain->atom_notifiers, l_iter) { dap_chain_atom_notifier_t *l_notifier = (dap_chain_atom_notifier_t*)l_iter->data; - l_notifier->callback(l_notifier->arg, a_chain, l_cell->id, (void*)a_atom, a_atom_size); + l_notifier->callback(l_notifier->arg, l_chain, a_chain_cell->id, (void*)a_atom, a_atom_size); } } - if (a_chain->callback_atom_add_from_treshold) { + if (l_chain->callback_atom_add_from_treshold) { dap_chain_atom_ptr_t l_atom_treshold; do { size_t l_atom_treshold_size; - l_atom_treshold = a_chain->callback_atom_add_from_treshold(a_chain, &l_atom_treshold_size); + l_atom_treshold = l_chain->callback_atom_add_from_treshold(l_chain, &l_atom_treshold_size); if (l_atom_treshold) { - if (dap_chain_cell_file_append(l_cell, l_atom_treshold, l_atom_treshold_size) > 0) + if (dap_chain_cell_file_append(a_chain_cell, l_atom_treshold, l_atom_treshold_size) > 0) log_it(L_INFO, "Added atom from treshold"); else log_it(L_ERROR, "Can't add atom from treshold"); diff --git a/modules/chain/dap_chain_cell.c b/modules/chain/dap_chain_cell.c index 5df2bb4331c452557430d6fa950bcb1300380387..dc7aaacdfd9cb1d252a6fee0147f27dc0a76cf55 100644 --- a/modules/chain/dap_chain_cell.c +++ b/modules/chain/dap_chain_cell.c @@ -239,10 +239,8 @@ int dap_chain_cell_load(dap_chain_t *a_chain, dap_chain_cell_t *a_cell) l_ret = -6; break; } - dap_chain_atom_verify_res_t l_res = a_chain->callback_atom_add(a_chain, l_element, l_el_size); // !!! blocking GDB call !!! - if (l_res == ATOM_PASS || l_res == ATOM_REJECT) { - DAP_DELETE(l_element); - } + a_chain->callback_atom_add(a_chain, l_element, l_el_size); // ??? blocking GDB call + DAP_DELETE(l_element); ++q; } if (l_ret < 0) { diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/chain/dap_chain_ch.c similarity index 77% rename from modules/channel/chain/dap_stream_ch_chain.c rename to modules/chain/dap_chain_ch.c index 2e600f6f1a237c97eac9e2ec4a87466bfc380b5a..0b59fcd393d446fb7c96c4577737e2543e12900c 100644 --- a/modules/channel/chain/dap_stream_ch_chain.c +++ b/modules/chain/dap_chain_ch.c @@ -64,9 +64,9 @@ #include "dap_stream_ch_pkt.h" #include "dap_stream_ch.h" #include "dap_stream_ch_proc.h" -#include "dap_stream_ch_chain.h" -#include "dap_stream_ch_chain_pkt.h" -#include "dap_chain_net.h" +#include "dap_chain_ch.h" +#include "dap_chain_ch_pkt.h" +#include "dap_stream_ch_gossip.h" #define LOG_TAG "dap_stream_ch_chain" @@ -105,15 +105,15 @@ static void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg); static void s_stream_ch_io_complete(dap_events_socket_t *a_es, void *a_arg, int a_errno); static void s_stream_ch_write_error_unsafe(dap_stream_ch_t *a_ch, uint64_t a_net_id, uint64_t a_chain_id, uint64_t a_cell_id, const char * a_err_string); -static bool s_sync_out_chains_proc_callback(dap_proc_thread_t *a_thread, void *a_arg); +static bool s_sync_out_chains_proc_callback(void *a_arg); static void s_sync_out_chains_last_worker_callback(dap_worker_t *a_worker, void *a_arg); static void s_sync_out_chains_first_worker_callback(dap_worker_t *a_worker, void *a_arg); -static bool s_sync_out_gdb_proc_callback(dap_proc_thread_t *a_thread, void *a_arg); +static bool s_sync_out_gdb_proc_callback(void *a_arg); -static bool s_sync_in_chains_callback(dap_proc_thread_t *a_thread, void *a_arg); +static bool s_sync_in_chains_callback(void *a_arg); -static bool s_gdb_in_pkt_proc_callback(dap_proc_thread_t *a_thread, void *a_arg); +static bool s_gdb_in_pkt_proc_callback(void *a_arg); static bool s_gdb_in_pkt_proc_set_raw_callback(dap_global_db_instance_t *a_dbi, int a_rc, const char *a_group, const size_t a_values_total, const size_t a_values_count, @@ -124,15 +124,11 @@ static void s_free_log_list_gdb ( dap_stream_ch_chain_t * a_ch_chain); static void s_stream_ch_chain_pkt_write(dap_stream_ch_t *a_ch, uint8_t a_type, uint64_t a_net_id, uint64_t a_chain_id, uint64_t a_cell_id, const void * a_data, size_t a_data_size); +static void s_gossip_payload_callback(void *a_payload, size_t a_payload_size, dap_stream_node_addr_t a_sender_addr); static bool s_debug_more=false; static uint_fast16_t s_update_pack_size=100; // Number of hashes packed into the one packet static uint_fast16_t s_skip_in_reactor_count=50; // Number of hashes packed to skip in one reactor loop callback out packet -static char **s_list_ban_groups = NULL; -static char **s_list_white_groups = NULL; -static uint16_t s_size_ban_groups = 0; -static uint16_t s_size_white_groups = 0; - #ifdef DAP_SYS_DEBUG @@ -143,8 +139,6 @@ static dap_memstat_rec_t s_memstat [MEMSTAT$K_NR] = { #endif - - /** * @brief dap_stream_ch_chain_init * @return @@ -152,18 +146,15 @@ static dap_memstat_rec_t s_memstat [MEMSTAT$K_NR] = { int dap_stream_ch_chain_init() { log_it(L_NOTICE, "Chains and global db exchange channel initialized"); - dap_stream_ch_proc_add(DAP_STREAM_CH_ID, s_stream_ch_new, s_stream_ch_delete, s_stream_ch_packet_in, + dap_stream_ch_proc_add(DAP_STREAM_CH_CHAIN_ID, s_stream_ch_new, s_stream_ch_delete, s_stream_ch_packet_in, s_stream_ch_packet_out); s_debug_more = dap_config_get_item_bool_default(g_config,"stream_ch_chain","debug_more",false); s_update_pack_size = dap_config_get_item_int16_default(g_config,"stream_ch_chain","update_pack_size",100); - s_list_ban_groups = dap_config_get_array_str(g_config, "stream_ch_chain", "ban_list_sync_groups", &s_size_ban_groups); - s_list_white_groups = dap_config_get_array_str(g_config, "stream_ch_chain", "white_list_sync_groups", &s_size_white_groups); - #ifdef DAP_SYS_DEBUG for (int i = 0; i < MEMSTAT$K_NR; i++) dap_memstat_reg(&s_memstat[i]); #endif - + assert(!dap_stream_ch_gossip_callback_add(DAP_STREAM_CH_CHAIN_ID, s_gossip_payload_callback)); return 0; } @@ -276,16 +267,13 @@ static void s_sync_out_chains_first_worker_callback(dap_worker_t *a_worker, void l_ch_chain->state = CHAIN_STATE_SYNC_CHAINS; l_ch_chain->request_atom_iter = l_sync_request->chain.request_atom_iter; - dap_chain_node_addr_t l_node_addr = {}; - dap_chain_net_t *l_net = dap_chain_net_by_id(l_sync_request->request_hdr.net_id); - l_node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net); if (s_debug_more ) log_it(L_INFO,"Out: DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_CHAIN"); dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_CHAIN, l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, - l_ch_chain->request_hdr.cell_id.uint64, &l_node_addr, sizeof(dap_chain_node_addr_t)); + l_ch_chain->request_hdr.cell_id.uint64, &g_node_addr, sizeof(dap_chain_node_addr_t)); DAP_DELETE(l_sync_request); } @@ -331,7 +319,7 @@ static void s_sync_out_chains_last_worker_callback(dap_worker_t *a_worker, void * @param a_arg * @return */ -static bool s_sync_out_chains_proc_callback(dap_proc_thread_t *a_thread, void *a_arg) +static bool s_sync_out_chains_proc_callback(void *a_arg) { struct sync_request * l_sync_request = (struct sync_request *) a_arg; @@ -385,16 +373,13 @@ static void s_sync_out_gdb_first_worker_callback(dap_worker_t *a_worker, void *a return; } - dap_chain_net_t *l_net = dap_chain_net_by_id(l_ch_chain->request_hdr.net_id); // Add it to outgoing list l_ch_chain->state = CHAIN_STATE_SYNC_GLOBAL_DB; - dap_chain_node_addr_t l_node_addr = { 0 }; - l_node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net); if (s_debug_more ) log_it(L_INFO,"Out: DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB"); dap_stream_ch_chain_pkt_write_unsafe(DAP_STREAM_CH(l_ch_chain), DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB, l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, - l_ch_chain->request_hdr.cell_id.uint64, &l_node_addr, sizeof(dap_chain_node_addr_t)); + l_ch_chain->request_hdr.cell_id.uint64, &g_node_addr, sizeof(dap_chain_node_addr_t)); if(l_ch_chain->callback_notify_packet_out) l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB, NULL, 0, l_ch_chain->callback_notify_arg); @@ -445,7 +430,7 @@ static void s_sync_out_gdb_last_worker_callback(dap_worker_t *a_worker, void *a_ * @param a_arg * @return */ -static bool s_sync_out_gdb_proc_callback(dap_proc_thread_t *a_thread, void *a_arg) +static bool s_sync_out_gdb_proc_callback(void *a_arg) { /* struct sync_request *l_sync_request = (struct sync_request *)a_arg; @@ -480,7 +465,7 @@ static bool s_sync_out_gdb_proc_callback(dap_proc_thread_t *a_thread, void *a_ar } else { dap_worker_exec_callback_on(dap_events_worker_get(l_sync_request->worker->id), s_sync_out_gdb_last_worker_callback, l_sync_request ); } */ - return true; + return false; } static void s_sync_update_gdb_start_worker_callback(dap_worker_t *a_worker, void *a_arg) @@ -503,7 +488,7 @@ static void s_sync_update_gdb_start_worker_callback(dap_worker_t *a_worker, void DAP_DELETE(l_sync_request); } -static bool s_sync_update_gdb_proc_callback(dap_proc_thread_t *a_thread, void *a_arg) +static bool s_sync_update_gdb_proc_callback(void *a_arg) { /* struct sync_request *l_sync_request = (struct sync_request *)a_arg; @@ -539,7 +524,12 @@ static bool s_sync_update_gdb_proc_callback(dap_proc_thread_t *a_thread, void *a l_sync_request->request.node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net); dap_worker_exec_callback_on(dap_events_worker_get(l_sync_request->worker->id), s_sync_update_gdb_start_worker_callback, l_sync_request); */ - return true; + return false; +} + +static bool s_gdb_in_pkt_proc_callback(void *a_arg) +{ + return false; } /** @@ -548,55 +538,38 @@ static bool s_sync_update_gdb_proc_callback(dap_proc_thread_t *a_thread, void *a * @param a_arg void * @return */ -static bool s_sync_in_chains_callback(dap_proc_thread_t *a_thread, void *a_arg) +static bool s_sync_in_chains_callback(void *a_arg) { - UNUSED(a_thread); - struct sync_request *l_sync_request = (struct sync_request *) a_arg; - if (!l_sync_request) { + dap_stream_ch_chain_pkt_t *l_chain_pkt = a_arg; + if (!l_chain_pkt || !l_chain_pkt->hdr.data_size) { log_it(L_CRITICAL, "Proc thread received corrupted chain packet!"); return false; } - dap_chain_pkt_item_t *l_pkt_item = &l_sync_request->pkt; - dap_chain_hash_fast_t l_atom_hash = {}; - if (l_pkt_item->pkt_data_size == 0 || !l_pkt_item->pkt_data) { - log_it(L_CRITICAL, "In proc thread got CHAINS stream ch packet with zero data"); - DAP_DEL_Z(l_pkt_item->pkt_data); - DAP_DELETE(l_sync_request); - return false; - } - dap_chain_t *l_chain = dap_chain_find_by_id(l_sync_request->request_hdr.net_id, l_sync_request->request_hdr.chain_id); + dap_chain_atom_ptr_t l_atom = (dap_chain_atom_ptr_t)l_chain_pkt->data; + uint64_t l_atom_size = l_chain_pkt->hdr.data_size; + dap_chain_t *l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id); if (!l_chain) { - if (s_debug_more) - log_it(L_WARNING, "No chain found for DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN"); - DAP_DEL_Z(l_pkt_item->pkt_data); - DAP_DELETE(l_sync_request); + debug_if(s_debug_more, L_WARNING, "No chain found for DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN"); + DAP_DELETE(l_chain_pkt); return false; } - dap_chain_atom_ptr_t l_atom_copy = (dap_chain_atom_ptr_t)l_pkt_item->pkt_data; - uint64_t l_atom_copy_size = l_pkt_item->pkt_data_size; - dap_hash_fast(l_atom_copy, l_atom_copy_size, &l_atom_hash); - dap_chain_atom_verify_res_t l_atom_add_res = l_chain->callback_atom_add(l_chain, l_atom_copy, l_atom_copy_size); - char l_atom_hash_str[DAP_CHAIN_HASH_FAST_STR_SIZE] = {[0]='\0'}; - dap_chain_hash_fast_to_str(&l_atom_hash,l_atom_hash_str,sizeof (l_atom_hash_str)); + char *l_atom_hash_str = NULL; + if (s_debug_more) + dap_get_data_hash_str_static(l_atom, l_atom_size, l_atom_hash_str); + dap_chain_atom_verify_res_t l_atom_add_res = l_chain->callback_atom_add(l_chain, l_atom, l_atom_size); switch (l_atom_add_res) { case ATOM_PASS: - if (s_debug_more){ - log_it(L_WARNING, "Atom with hash %s for %s:%s not accepted (code ATOM_PASS, already present)", l_atom_hash_str, l_chain->net_name, l_chain->name); - } + debug_if(s_debug_more, L_WARNING, "Atom with hash %s for %s:%s not accepted (code ATOM_PASS, already present)", + l_atom_hash_str, l_chain->net_name, l_chain->name); //dap_chain_db_set_last_hash_remote(l_sync_request->request.node_addr.uint64, l_chain, &l_atom_hash); - DAP_DELETE(l_atom_copy); break; case ATOM_MOVE_TO_THRESHOLD: - if (s_debug_more) { - log_it(L_INFO, "Thresholded atom with hash %s for %s:%s", l_atom_hash_str, l_chain->net_name, l_chain->name); - } + debug_if(s_debug_more, L_INFO, "Thresholded atom with hash %s for %s:%s", l_atom_hash_str, l_chain->net_name, l_chain->name); //dap_chain_db_set_last_hash_remote(l_sync_request->request.node_addr.uint64, l_chain, &l_atom_hash); break; case ATOM_ACCEPT: - if (s_debug_more) { - log_it(L_INFO,"Accepted atom with hash %s for %s:%s", l_atom_hash_str, l_chain->net_name, l_chain->name); - } - int l_res = dap_chain_atom_save(l_chain, l_atom_copy, l_atom_copy_size, l_sync_request->request_hdr.cell_id); + debug_if(s_debug_more, L_INFO,"Accepted atom with hash %s for %s:%s", l_atom_hash_str, l_chain->net_name, l_chain->name); + int l_res = dap_chain_atom_save(l_chain->cells, l_atom, l_atom_size, NULL); if(l_res < 0) { log_it(L_ERROR, "Can't save atom %s to the file", l_atom_hash_str); } else { @@ -604,23 +577,34 @@ static bool s_sync_in_chains_callback(dap_proc_thread_t *a_thread, void *a_arg) } break; case ATOM_REJECT: { - if (s_debug_more) { - char l_atom_hash_str[72] = {'\0'}; - dap_chain_hash_fast_to_str(&l_atom_hash,l_atom_hash_str,sizeof (l_atom_hash_str)-1 ); - log_it(L_WARNING,"Atom with hash %s for %s:%s rejected", l_atom_hash_str, l_chain->net_name, l_chain->name); - } - DAP_DELETE(l_atom_copy); + debug_if(s_debug_more, L_WARNING, "Atom with hash %s for %s:%s rejected", l_atom_hash_str, l_chain->net_name, l_chain->name); break; } default: - DAP_DELETE(l_atom_copy); log_it(L_CRITICAL, "Wtf is this ret code? %d", l_atom_add_res); break; } - DAP_DEL_Z(l_sync_request); + DAP_DELETE(l_chain_pkt); return false; } +static void s_gossip_payload_callback(void *a_payload, size_t a_payload_size, dap_stream_node_addr_t UNUSED_ARG a_sender_addr) +{ + assert(a_payload && a_payload_size); + dap_stream_ch_chain_pkt_t *l_chain_pkt = a_payload; + if (a_payload_size <= sizeof(dap_stream_ch_chain_pkt_t) || + a_payload_size != sizeof(dap_stream_ch_chain_pkt_t) + l_chain_pkt->hdr.data_size) { + log_it(L_WARNING, "Incorrect chain GOSSIP packet size"); + return; + } + dap_stream_ch_chain_pkt_t *l_chain_pkt_copy = DAP_DUP_SIZE(a_payload, a_payload_size); + if (!l_chain_pkt_copy) { + log_it(L_CRITICAL, "Not enough memory"); + return; + } + dap_proc_thread_callback_add(NULL, s_sync_in_chains_callback, l_chain_pkt_copy); +} + /** * @brief s_gdb_in_pkt_error_worker_callback * @param a_thread @@ -642,87 +626,6 @@ static void s_gdb_in_pkt_error_worker_callback(dap_worker_t *a_worker, void *a_a DAP_DELETE(l_sync_request); } -/** - * @brief s_gdb_sync_tsd_worker_callback - * @param a_worker - * @param a_arg - */ -static void s_gdb_sync_tsd_worker_callback(dap_worker_t *a_worker, void *a_arg) -{ - struct sync_request *l_sync_request = (struct sync_request *) a_arg; - - dap_stream_ch_t *l_ch = dap_stream_ch_find_by_uuid_unsafe(DAP_STREAM_WORKER(a_worker), l_sync_request->ch_uuid); - if( l_ch == NULL ) { - log_it(L_INFO,"Client disconnected before we sent the reply"); - } else { - size_t l_gr_len = strlen(l_sync_request->gdb.sync_group) + 1; - size_t l_data_size = 2 * sizeof(uint64_t) + l_gr_len; - dap_tsd_t *l_tsd_rec = DAP_NEW_SIZE(dap_tsd_t, l_data_size + sizeof(dap_tsd_t)); - l_tsd_rec->type = DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_TSD_LAST_ID; - l_tsd_rec->size = l_data_size; - uint64_t l_node_addr = dap_chain_net_get_cur_addr_int(dap_chain_net_by_id(l_sync_request->request_hdr.net_id)); - void *l_data_ptr = l_tsd_rec->data; - memcpy(l_data_ptr, &l_node_addr, sizeof(uint64_t)); - l_data_ptr += sizeof(uint64_t); - memcpy(l_data_ptr, &l_sync_request->request.id_end, sizeof(uint64_t)); - l_data_ptr += sizeof(uint64_t); - memcpy(l_data_ptr, l_sync_request->gdb.sync_group, l_gr_len); - s_stream_ch_chain_pkt_write(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_TSD, - l_sync_request->request_hdr.net_id.uint64, - l_sync_request->request_hdr.chain_id.uint64, - l_sync_request->request_hdr.cell_id.uint64, - l_tsd_rec, l_tsd_rec->size + sizeof(dap_tsd_t)); - DAP_DELETE(l_tsd_rec); - } - DAP_DELETE(l_sync_request->gdb.sync_group); - DAP_DELETE(l_sync_request); -} - -/** - * @brief s_gdb_in_pkt_proc_callback_get_ts_callback - * @param a_global_db_context - * @param a_rc - * @param a_group - * @param a_key - * @param a_value - * @param a_value_len - * @param value_ts - * @param a_is_pinned - * @param a_arg - */ - -/** - * @brief s_gdb_in_pkt_callback - * @param a_thread - * @param a_arg - * @return - */ -static bool s_gdb_in_pkt_proc_callback(dap_proc_thread_t *a_thread, void *a_arg) -{ - return true; -} - -/** - * @brief s_gdb_in_pkt_proc_set_raw_callback - * @param a_global_db_context - * @param a_rc - * @param a_group - * @param a_key - * @param a_values_current - * @param a_values_shift - * @param a_values_count - * @param a_values - * @param a_arg - */ -static bool s_gdb_in_pkt_proc_set_raw_callback(dap_global_db_instance_t *a_dbi, - int a_rc, const char *a_group, - const size_t a_values_total, const size_t a_values_count, - dap_store_obj_t *a_values, void *a_arg) -{ - return true; -} - - /** * @brief dap_stream_ch_chain_create_sync_request_gdb * @param a_ch_chain @@ -848,45 +751,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) return; } size_t l_chain_pkt_data_size = l_ch_pkt->hdr.data_size - sizeof(l_chain_pkt->hdr); - dap_chain_net_t *l_net = dap_chain_net_by_id(l_chain_pkt->hdr.net_id); - if (!l_net) { - if (l_ch_pkt->hdr.type == DAP_STREAM_CH_CHAIN_PKT_TYPE_ERROR) { - if(l_ch_chain->callback_notify_packet_in) { - l_ch_chain->callback_notify_packet_in(l_ch_chain, l_ch_pkt->hdr.type, l_chain_pkt, - l_chain_pkt_data_size, l_ch_chain->callback_notify_arg); - } - } else { - log_it(L_ERROR, "Invalid request from %s with ext_id %016"DAP_UINT64_FORMAT_x" net id 0x%016"DAP_UINT64_FORMAT_x - " chain id 0x%016"DAP_UINT64_FORMAT_x" cell_id 0x%016"DAP_UINT64_FORMAT_x" in packet", - a_ch->stream->esocket->remote_addr_str, l_chain_pkt->hdr.ext_id, - l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, - l_chain_pkt->hdr.cell_id.uint64); - s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, - l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, - "ERROR_NET_INVALID_ID"); - // Who are you? I don't know you! go away! - a_ch->stream->esocket->flags |= DAP_SOCK_SIGNAL_CLOSE; - } - return; - } - if (dap_chain_net_get_state(l_net) == NET_STATE_OFFLINE) { - s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, - l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, - "ERROR_NET_IS_OFFLINE"); - a_ch->stream->esocket->flags |= DAP_SOCK_SIGNAL_CLOSE; - return; - } - uint16_t l_acl_idx = dap_chain_net_get_acl_idx(l_net); - uint8_t l_acl = a_ch->stream->session->acl ? a_ch->stream->session->acl[l_acl_idx] : 1; - if (!l_acl) { - log_it(L_WARNING, "Unauthorized request attempt from %s to network %s", - a_ch->stream->esocket->remote_addr_str, - dap_chain_net_by_id(l_chain_pkt->hdr.net_id)->pub.name); - s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, - l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, - "ERROR_NET_NOT_AUTHORIZED"); - return; - } + s_chain_timer_reset(l_ch_chain); switch (l_ch_pkt->hdr.type) { /// --- GDB update --- @@ -903,11 +768,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64); if (l_chain_pkt_data_size == (size_t)sizeof(dap_stream_ch_chain_sync_request_t)) l_ch_chain->request = *(dap_stream_ch_chain_sync_request_t*)l_chain_pkt->data; - dap_chain_node_client_t *l_client = (dap_chain_node_client_t *)l_ch_chain->callback_notify_arg; - if (l_client && l_client->resync_gdb) - l_ch_chain->request.id_start = 0; - else - l_ch_chain->request.id_start = 1; // incremental sync by default + l_ch_chain->request.id_start = 0; struct sync_request *l_sync_request = dap_stream_ch_chain_create_sync_request(l_chain_pkt, a_ch); l_ch_chain->stats_request_gdb_processed = 0; l_ch_chain->request_hdr = l_chain_pkt->hdr; @@ -916,26 +777,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) // Response with metadata organized in TSD case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_TSD: { - if (l_chain_pkt_data_size) { - dap_tsd_t *l_tsd_rec = (dap_tsd_t *)l_chain_pkt->data; - if (l_tsd_rec->type != DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_TSD_LAST_ID || - l_tsd_rec->size < 2 * sizeof(uint64_t) + 2) { - break; - } - void *l_data_ptr = l_tsd_rec->data; - uint64_t l_node_addr = *(uint64_t *)l_data_ptr; - l_data_ptr += sizeof(uint64_t); - uint64_t l_last_id = *(uint64_t *)l_data_ptr; - l_data_ptr += sizeof(uint64_t); - char *l_group = (char *)l_data_ptr; - //dap_db_set_last_id_remote(l_node_addr, l_last_id, l_group); - if (s_debug_more) { - dap_chain_node_addr_t l_addr; - l_addr.uint64 = l_node_addr; - log_it(L_INFO, "Set last_id %"DAP_UINT64_FORMAT_U" for group %s for node "NODE_ADDR_FP_STR, - l_last_id, l_group, NODE_ADDR_FP_ARGS_S(l_addr)); - } - } else if (s_debug_more) + if (s_debug_more) log_it(L_DEBUG, "Global DB TSD packet detected"); } break; @@ -993,39 +835,24 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) } } break; // End of response with starting of DB sync - case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB: case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_END: { if(l_chain_pkt_data_size == sizeof(dap_stream_ch_chain_sync_request_t)) { - if (l_ch_pkt->hdr.type == DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB && l_ch_chain->state != CHAIN_STATE_IDLE) { - log_it(L_WARNING, "Can't process SYNC_GLOBAL_DB request because not in idle state"); - dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, - l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, - "ERROR_STATE_NOT_IN_IDLE"); - break; - } - if (l_ch_pkt->hdr.type == DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_END && - (l_ch_chain->state != CHAIN_STATE_UPDATE_GLOBAL_DB_REMOTE || - memcmp(&l_ch_chain->request_hdr, &l_chain_pkt->hdr, sizeof(dap_stream_ch_chain_pkt_t)))) { + if (l_ch_chain->state != CHAIN_STATE_UPDATE_GLOBAL_DB_REMOTE || + memcmp(&l_ch_chain->request_hdr, &l_chain_pkt->hdr, sizeof(dap_stream_ch_chain_pkt_t))) { log_it(L_WARNING, "Can't process UPDATE_GLOBAL_DB_END request because its already busy with syncronization"); s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, "ERROR_SYNC_REQUEST_ALREADY_IN_PROCESS"); break; } - if(s_debug_more) - { - if (l_ch_pkt->hdr.type == DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_END) - log_it(L_INFO, "In: UPDATE_GLOBAL_DB_END pkt with total count %d hashes", + debug_if(s_debug_more, L_INFO, "In: UPDATE_GLOBAL_DB_END pkt with total count %d hashes", HASH_COUNT(l_ch_chain->remote_gdbs)); - else - log_it(L_INFO, "In: SYNC_GLOBAL_DB pkt"); - } if (l_chain_pkt_data_size == sizeof(dap_stream_ch_chain_sync_request_t)) l_ch_chain->request = *(dap_stream_ch_chain_sync_request_t*)l_chain_pkt->data; struct sync_request *l_sync_request = dap_stream_ch_chain_create_sync_request(l_chain_pkt, a_ch); dap_proc_thread_callback_add(a_ch->stream_worker->worker->proc_queue_input, s_sync_out_gdb_proc_callback, l_sync_request); - }else{ - log_it(L_WARNING, "DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB: Wrong chain packet size %zd when expected %zd", l_chain_pkt_data_size, sizeof(l_ch_chain->request)); + } else { + log_it(L_WARNING, "DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_END: Wrong chain packet size %zd when expected %zd", l_chain_pkt_data_size, sizeof(l_ch_chain->request)); s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, "ERROR_CHAIN_PKT_DATA_SIZE" ); @@ -1070,36 +897,12 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64); if (!l_ch_chain->callback_notify_packet_in) { // we haven't node client waitng, so reply to other side dap_stream_ch_chain_sync_request_t l_sync_gdb = {}; - dap_chain_net_t *l_net = dap_chain_net_by_id(l_chain_pkt->hdr.net_id); - l_sync_gdb.node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net); + l_sync_gdb.node_addr.uint64 = g_node_addr.uint64; dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_REQ, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, &l_sync_gdb, sizeof(l_sync_gdb)); } } break; - case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB_RVRS: { - dap_stream_ch_chain_sync_request_t l_sync_gdb = {}; - l_sync_gdb.id_start = 1; - dap_chain_net_t *l_net = dap_chain_net_by_id(l_chain_pkt->hdr.net_id); - l_sync_gdb.node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net); - log_it(L_INFO, "In: SYNC_GLOBAL_DB_RVRS pkt: net 0x%016"DAP_UINT64_FORMAT_x" chain 0x%016"DAP_UINT64_FORMAT_x" cell 0x%016"DAP_UINT64_FORMAT_x - ", request gdb sync from %"DAP_UINT64_FORMAT_U, l_chain_pkt->hdr.net_id.uint64 , l_chain_pkt->hdr.chain_id.uint64, - l_chain_pkt->hdr.cell_id.uint64, l_sync_gdb.id_start ); - dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB, l_chain_pkt->hdr.net_id.uint64, - l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, &l_sync_gdb, sizeof(l_sync_gdb)); - } break; - - case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB_GROUP: { - if (s_debug_more) - log_it(L_INFO, "In: SYNCED_GLOBAL_DB_GROUP pkt net 0x%016"DAP_UINT64_FORMAT_x" chain 0x%016"DAP_UINT64_FORMAT_x" cell 0x%016"DAP_UINT64_FORMAT_x, - l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64); - } break; - case DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB_GROUP: { - if (s_debug_more) - log_it(L_INFO, "In: FIRST_GLOBAL_DB_GROUP pkt net 0x%016"DAP_UINT64_FORMAT_x" chain 0x%016"DAP_UINT64_FORMAT_x" cell 0x%016"DAP_UINT64_FORMAT_x, - l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64); - } break; - /// --- Chains update --- // Request for atoms list update case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_REQ:{ @@ -1144,10 +947,9 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) } dap_chain_t * l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id); if (!l_chain) { - log_it(L_ERROR, "Invalid UPDATE_CHAINS_START request from %s with ext_id %016"DAP_UINT64_FORMAT_x" net id 0x%016"DAP_UINT64_FORMAT_x + log_it(L_ERROR, "Invalid UPDATE_CHAINS_START request from %s with net id 0x%016"DAP_UINT64_FORMAT_x " chain id 0x%016"DAP_UINT64_FORMAT_x" cell_id 0x%016"DAP_UINT64_FORMAT_x" in packet", - a_ch->stream->esocket->remote_addr_str, l_chain_pkt->hdr.ext_id, - l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, + a_ch->stream->esocket->remote_addr_str, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64); s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, @@ -1163,15 +965,15 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) } break; // Response with atom hashes and sizes - case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS :{ + case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS: { unsigned int l_count_added=0; unsigned int l_count_total=0; dap_chain_t * l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id); if (! l_chain){ - log_it(L_ERROR, "Invalid UPDATE_CHAINS packet from %s with ext_id %016"DAP_UINT64_FORMAT_x" net id 0x%016"DAP_UINT64_FORMAT_x + log_it(L_ERROR, "Invalid UPDATE_CHAINS packet from %s with net id 0x%016"DAP_UINT64_FORMAT_x " chain id 0x%016"DAP_UINT64_FORMAT_x" cell_id 0x%016"DAP_UINT64_FORMAT_x" in packet", - a_ch->stream->esocket->remote_addr_str, l_chain_pkt->hdr.ext_id, + a_ch->stream->esocket->remote_addr_str, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64); s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, @@ -1213,19 +1015,10 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) log_it(L_INFO,"In: Added %u from %u remote atom hash in list",l_count_added,l_count_total); } break; - case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_END: - case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS: { + case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_END: { if(l_chain_pkt_data_size == sizeof(dap_stream_ch_chain_sync_request_t)) { - if (l_ch_pkt->hdr.type == DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS && l_ch_chain->state != CHAIN_STATE_IDLE) { - log_it(L_WARNING, "Can't process SYNC_CHAINS request because not in idle state"); - s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, - l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, - "ERROR_STATE_NOT_IN_IDLE"); - break; - } - if (l_ch_pkt->hdr.type == DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_END && - (l_ch_chain->state != CHAIN_STATE_UPDATE_CHAINS_REMOTE || - memcmp(&l_ch_chain->request_hdr, &l_chain_pkt->hdr, sizeof(dap_stream_ch_chain_pkt_t)))) { + if (l_ch_chain->state != CHAIN_STATE_UPDATE_CHAINS_REMOTE || + memcmp(&l_ch_chain->request_hdr, &l_chain_pkt->hdr, sizeof(dap_stream_ch_chain_pkt_t))) { log_it(L_WARNING, "Can't process UPDATE_CHAINS_END request because its already busy with syncronization"); s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, @@ -1234,40 +1027,23 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) } dap_chain_t * l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id); if (!l_chain) { - log_it(L_ERROR, "Invalid UPDATE_CHAINS packet from %s with ext_id %016"DAP_UINT64_FORMAT_x" net id 0x%016"DAP_UINT64_FORMAT_x + log_it(L_ERROR, "Invalid UPDATE_CHAINS packet from %s with net id 0x%016"DAP_UINT64_FORMAT_x " chain id 0x%016"DAP_UINT64_FORMAT_x" cell_id 0x%016"DAP_UINT64_FORMAT_x" in packet", - a_ch->stream->esocket->remote_addr_str, l_chain_pkt->hdr.ext_id, - l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, + a_ch->stream->esocket->remote_addr_str, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64); s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, "ERROR_NET_INVALID_ID"); break; } - if(s_debug_more) - { - if (l_ch_pkt->hdr.type == DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_END) - log_it(L_INFO, "In: UPDATE_CHAINS_END pkt with total count %d hashes", + debug_if(s_debug_more, L_INFO, "In: UPDATE_CHAINS_END pkt with total count %d hashes", HASH_COUNT(l_ch_chain->remote_atoms)); - else - log_it(L_INFO, "In: SYNC_CHAINS pkt"); - } struct sync_request *l_sync_request = dap_stream_ch_chain_create_sync_request(l_chain_pkt, a_ch); l_ch_chain->stats_request_atoms_processed = 0; l_ch_chain->request_hdr = l_chain_pkt->hdr; - if (l_ch_pkt->hdr.type == DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS) { - char l_hash_from_str[DAP_CHAIN_HASH_FAST_STR_SIZE] = { '\0' }, l_hash_to_str[DAP_CHAIN_HASH_FAST_STR_SIZE] = { '\0' }; - dap_chain_hash_fast_t l_hash_from = l_ch_chain->request.hash_from, - l_hash_to = l_ch_chain->request.hash_to; - dap_chain_hash_fast_to_str(&l_hash_from, l_hash_from_str, DAP_CHAIN_HASH_FAST_STR_SIZE); - dap_chain_hash_fast_to_str(&l_hash_to, l_hash_to_str, DAP_CHAIN_HASH_FAST_STR_SIZE); - log_it(L_INFO, "In: SYNC_CHAINS pkt: net 0x%016"DAP_UINT64_FORMAT_x" chain 0x%016"DAP_UINT64_FORMAT_x" cell 0x%016"DAP_UINT64_FORMAT_x - " between %s and %s", l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64, - l_hash_from_str[0] ? l_hash_from_str : "(null)", l_hash_to_str[0] ? l_hash_to_str : "(null)"); - } dap_proc_thread_callback_add(a_ch->stream_worker->worker->proc_queue_input, s_sync_out_chains_proc_callback, l_sync_request); } else { - log_it(L_WARNING, "DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS: Wrong chain packet size %zd when expected %zd", + log_it(L_WARNING, "DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_END: Wrong chain packet size %zd when expected %zd", l_chain_pkt_data_size, sizeof(l_ch_chain->request)); s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, @@ -1292,37 +1068,36 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) } break; case DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN: { - if(l_chain_pkt_data_size) { - dap_chain_t * l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id); - if(l_chain) { - // Expect atom element in - if(l_chain_pkt_data_size > 0) { - struct sync_request *l_sync_request = dap_stream_ch_chain_create_sync_request(l_chain_pkt, a_ch); - dap_chain_pkt_item_t *l_pkt_item = &l_sync_request->pkt; - l_pkt_item->pkt_data = DAP_DUP_SIZE(l_chain_pkt->data, l_chain_pkt_data_size); - if (!l_pkt_item->pkt_data) { - log_it(L_ERROR, "Not enough memory!"); - DAP_DELETE(l_sync_request); - break; - } - l_pkt_item->pkt_data_size = l_chain_pkt_data_size; - if (s_debug_more){ - dap_chain_hash_fast_t l_atom_hash={0}; - dap_hash_fast(l_chain_pkt->data, l_chain_pkt_data_size ,&l_atom_hash); - char l_atom_hash_str[DAP_CHAIN_HASH_FAST_STR_SIZE] = { '\0' }; - dap_chain_hash_fast_to_str(&l_atom_hash, l_atom_hash_str, DAP_CHAIN_HASH_FAST_STR_SIZE); - log_it(L_INFO, "In: CHAIN pkt: atom hash %s (size %zd)", l_atom_hash_str, l_chain_pkt_data_size); - } - if (dap_proc_thread_callback_add(a_ch->stream_worker->worker->proc_queue_input, s_sync_in_chains_callback, l_sync_request)) - log_it(L_ERROR, "System queue overflow with atom trying atom add. All following atoms will be rejected!"); - } else { - log_it(L_WARNING, "Empty chain packet"); - s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, - l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, - "ERROR_CHAIN_PACKET_EMPTY"); - } - } + dap_stream_ch_chain_pkt_t *l_chain_pkt = (dap_stream_ch_chain_pkt_t *)l_ch_pkt->data; + if (l_chain_pkt_data_size <= sizeof(dap_stream_ch_chain_pkt_t) || + l_chain_pkt_data_size != sizeof(dap_stream_ch_chain_pkt_t) + l_chain_pkt->hdr.data_size) { + log_it(L_WARNING, "Incorrect chain packet size"); + break; + } + dap_cluster_t *l_cluster = dap_cluster_find(l_chain_pkt->hdr.net_id.uint64); + if (!l_cluster) { + log_it(L_WARNING, "Can't find cluster with ID 0x%" DAP_UINT64_FORMAT_X, l_chain_pkt->hdr.net_id.uint64); + break; + } + dap_cluster_member_t *l_check = dap_cluster_member_find_unsafe(l_cluster, &a_ch->stream->node); + if (!l_check) { + log_it(L_WARNING, "Node with addr "NODE_ADDR_FP_STR" isn't a member of cluster %s", + NODE_ADDR_FP_ARGS_S(a_ch->stream->node), l_cluster->mnemonim); + break; } + dap_stream_ch_chain_pkt_t *l_chain_pkt_copy = DAP_DUP_SIZE(l_chain_pkt->data, l_chain_pkt_data_size); + if (!l_chain_pkt_copy) { + log_it(L_CRITICAL, "Not enough memory"); + break; + } + if (l_chain_pkt_copy->hdr.version < 2) + l_chain_pkt_copy->hdr.data_size = l_chain_pkt_data_size; + if (s_debug_more) { + char *l_atom_hash_str; + dap_get_data_hash_str_static(l_chain_pkt->data, l_chain_pkt_data_size, l_atom_hash_str); + log_it(L_INFO, "In: CHAIN pkt: atom hash %s (size %zd)", l_atom_hash_str, l_chain_pkt_data_size); + } + dap_proc_thread_callback_add(a_ch->stream_worker->worker->proc_queue_input, s_sync_in_chains_callback, l_chain_pkt_copy); } break; case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS: { @@ -1344,11 +1119,10 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) if (!l_ch_chain->callback_notify_packet_in) { // we haven't node client waitng, so reply to other side dap_chain_t *l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id); if (!l_chain) { - log_it(L_ERROR, "Invalid SYNCED_CHAINS packet from %s with ext_id %016"DAP_UINT64_FORMAT_x" net id 0x%016"DAP_UINT64_FORMAT_x + log_it(L_ERROR, "Invalid SYNCED_CHAINS packet from %s with net id 0x%016"DAP_UINT64_FORMAT_x " chain id 0x%016"DAP_UINT64_FORMAT_x" cell_id 0x%016"DAP_UINT64_FORMAT_x" in packet", - a_ch->stream->esocket->remote_addr_str, l_chain_pkt->hdr.ext_id, - l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, - l_chain_pkt->hdr.cell_id.uint64); + a_ch->stream->esocket->remote_addr_str, l_chain_pkt->hdr.net_id.uint64, + l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64); s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, "ERROR_NET_INVALID_ID"); @@ -1363,32 +1137,6 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) } } break; - case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS_RVRS: { - if(l_chain_pkt_data_size == sizeof(dap_stream_ch_chain_sync_request_t)) { - dap_stream_ch_chain_sync_request_t l_request = { }; - dap_chain_t *l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id); - if (l_chain) { - dap_chain_hash_fast_t l_hash; - dap_chain_get_atom_last_hash(l_chain, &l_hash, l_chain_pkt->hdr.cell_id); // Move away from i/o reactor to callback processor - l_request.hash_from = l_hash; - if( dap_log_level_get()<= L_INFO){ - char l_hash_from_str[DAP_CHAIN_HASH_FAST_STR_SIZE] = {'\0'}; - dap_chain_hash_fast_to_str(&l_hash, l_hash_from_str, DAP_CHAIN_HASH_FAST_STR_SIZE); - log_it(L_INFO, "In: SYNC_CHAINS_RVRS pkt: net 0x%016"DAP_UINT64_FORMAT_x" chain 0x%016"DAP_UINT64_FORMAT_x" cell 0x%016"DAP_UINT64_FORMAT_x - "request chains sync from %s", l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, - l_hash_from_str[0] ? l_hash_from_str : "(null)"); - } - dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS, l_chain_pkt->hdr.net_id.uint64, - l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, &l_request, sizeof(l_request)); - } - }else{ - log_it(L_WARNING, "DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS_RVRS: Wrong chain packet size %zd when expected %zd", l_chain_pkt_data_size, sizeof(l_ch_chain->request)); - s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, - l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, - "ERROR_CHAIN_PKT_DATA_SIZE" ); - } - } break; - case DAP_STREAM_CH_CHAIN_PKT_TYPE_ERROR:{ char * l_error_str = (char*)l_chain_pkt->data; if(l_chain_pkt_data_size>1) @@ -1399,12 +1147,6 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt_data_size ? l_error_str : "<empty>"); } break; - case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_ALL: { - log_it(L_INFO, "In from "NODE_ADDR_FP_STR": SYNCED_ALL net 0x%016"DAP_UINT64_FORMAT_x" chain 0x%016"DAP_UINT64_FORMAT_x" cell 0x%016"DAP_UINT64_FORMAT_x, - NODE_ADDR_FP_ARGS_S(l_ch_chain->request.node_addr), l_chain_pkt->hdr.net_id.uint64, - l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64); - } break; - default: { s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, @@ -1491,7 +1233,7 @@ static void s_stream_ch_io_complete(dap_events_socket_t *a_es, void *a_arg, int return; dap_stream_ch_t *l_ch = NULL; for (size_t i = 0; i < l_stream->channel_count; i++) - if (l_stream->channel[i]->proc->id == DAP_STREAM_CH_ID) + if (l_stream->channel[i]->proc->id == DAP_STREAM_CH_CHAIN_ID) l_ch = l_stream->channel[i]; if (!l_ch || !DAP_STREAM_CH_CHAIN(l_ch)) return; diff --git a/modules/channel/chain/dap_stream_ch_chain_pkt.c b/modules/chain/dap_chain_ch_pkt.c similarity index 90% rename from modules/channel/chain/dap_stream_ch_chain_pkt.c rename to modules/chain/dap_chain_ch_pkt.c index e4b48b5a6bf042d36cf7f425b7d24b720362b662..743fb83a2ed2132609bb8e39ffe883ce59f2ed60 100644 --- a/modules/channel/chain/dap_stream_ch_chain_pkt.c +++ b/modules/chain/dap_chain_ch_pkt.c @@ -18,30 +18,11 @@ #include "dap_stream_ch.h" #include "dap_stream_worker.h" #include "dap_stream_ch_pkt.h" -#include "dap_stream_ch_chain_pkt.h" +#include "dap_chain_ch_pkt.h" #include "dap_chain.h" #define LOG_TAG "dap_stream_ch_chain_pkt" - -/** - * @brief dap_stream_ch_chain_pkt_to_dap_stream_ch_chain_state - * @param a_state - * @return - */ -dap_stream_ch_chain_state_t dap_stream_ch_chain_pkt_type_to_dap_stream_ch_chain_state(char a_state) -{ - switch (a_state) { - case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_ALL: - return CHAIN_STATE_SYNC_ALL; - case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB: - return CHAIN_STATE_SYNC_GLOBAL_DB; - case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS: - return CHAIN_STATE_SYNC_CHAINS; - } - return CHAIN_STATE_IDLE; -} - /** * @brief dap_stream_ch_net_pkt_write * @param sid diff --git a/modules/chain/include/dap_chain.h b/modules/chain/include/dap_chain.h index 08cd3ab3bc0b0dca131845cebc0c3452c14b5ad0..cd90370807ec279a8e1c5a1910160f5c7cd23b3e 100644 --- a/modules/chain/include/dap_chain.h +++ b/modules/chain/include/dap_chain.h @@ -259,7 +259,7 @@ void dap_chain_delete(dap_chain_t * a_chain); void dap_chain_add_callback_notify(dap_chain_t * a_chain, dap_chain_callback_notify_t a_callback, void * a_arg); dap_chain_atom_ptr_t dap_chain_get_atom_by_hash(dap_chain_t * a_chain, dap_chain_hash_fast_t * a_atom_hash, size_t * a_atom_size); bool dap_chain_get_atom_last_hash(dap_chain_t *a_chain, dap_hash_fast_t *a_atom_hash, dap_chain_cell_id_t a_cel_id); -ssize_t dap_chain_atom_save(dap_chain_t *a_chain, const uint8_t *a_atom, size_t a_atom_size, dap_chain_cell_id_t a_cell_id); +ssize_t dap_chain_atom_save(dap_chain_cell_t *a_chain_cell, const uint8_t *a_atom, size_t a_atom_size, dap_hash_fast_t *a_new_atom_hash); int dap_cert_chain_file_save(dap_chain_datum_t *datum, char *net_name); const char* dap_chain_get_path(dap_chain_t *a_chain); diff --git a/modules/channel/chain/include/dap_stream_ch_chain.h b/modules/chain/include/dap_chain_ch.h similarity index 93% rename from modules/channel/chain/include/dap_stream_ch_chain.h rename to modules/chain/include/dap_chain_ch.h index 830725239d7b03560b42190dbf52f93eb31e2131..0685aaf4124365ddd82bf11d2a6c480d1deb382c 100644 --- a/modules/channel/chain/include/dap_stream_ch_chain.h +++ b/modules/chain/include/dap_chain_ch.h @@ -28,9 +28,8 @@ #include "dap_chain_common.h" #include "dap_chain.h" -#include "dap_chain_node_client.h" #include "dap_list.h" -#include "dap_stream_ch_chain_pkt.h" +#include "dap_chain_ch_pkt.h" #include "uthash.h" #include "dap_global_db_cluster.h" @@ -89,11 +88,10 @@ typedef struct dap_stream_ch_chain { #define DAP_STREAM_CH_CHAIN(a) ((dap_stream_ch_chain_t *) ((a)->internal) ) #define DAP_STREAM_CH(a) ((dap_stream_ch_t *)((a)->_inheritor)) #define DAP_CHAIN_PKT_EXPECT_SIZE 7168 -#define DAP_STREAM_CH_ID 'C' +#define DAP_STREAM_CH_CHAIN_ID 'C' int dap_stream_ch_chain_init(void); void dap_stream_ch_chain_deinit(void); -void dap_stream_ch_chain_create_sync_request_gdb(dap_stream_ch_chain_t * a_ch_chain, dap_chain_net_t * a_net); void dap_stream_ch_chain_timer_start(dap_stream_ch_chain_t *a_ch_chain); void dap_stream_ch_chain_reset_unsafe(dap_stream_ch_chain_t *a_ch_chain); diff --git a/modules/channel/chain/include/dap_stream_ch_chain_pkt.h b/modules/chain/include/dap_chain_ch_pkt.h similarity index 86% rename from modules/channel/chain/include/dap_stream_ch_chain_pkt.h rename to modules/chain/include/dap_chain_ch_pkt.h index 69af3d189fa88cb171f5f44d285d2deaff645ac8..2bbea66a36a29fc76ce75951ec5f503671c448a1 100644 --- a/modules/channel/chain/include/dap_stream_ch_chain_pkt.h +++ b/modules/chain/include/dap_chain_ch_pkt.h @@ -38,43 +38,30 @@ #define DAP_STREAM_CH_CHAIN_PKT_VERSION 0x01 -#define DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN 0x01 -#define DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB 0x11 - -#define DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_CHAIN 0x20 -#define DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB 0x21 -#define DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB_GROUP 0x31 - -#define DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS 0x02 -#define DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB 0x12 -#define DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_ALL 0x22 - -#define DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS 0x03 -#define DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB 0x13 -#define DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_ALL 0x23 -#define DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB_GROUP 0x33 - -#define DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS_RVRS 0x04 -#define DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB_RVRS 0x14 - - #define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_REQ 0x05 -#define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_TSD 0x15 #define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_START 0x25 #define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS 0x35 #define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_END 0x45 +#define DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_CHAIN 0x20 +#define DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN 0x01 +#define DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS 0x03 #define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_REQ 0x06 -#define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_TSD 0x16 #define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_START 0x26 #define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB 0x36 #define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_END 0x46 +#define DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB 0x21 +#define DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB 0x11 +#define DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB 0x13 #define DAP_STREAM_CH_CHAIN_PKT_TYPE_DELETE 0xda #define DAP_STREAM_CH_CHAIN_PKT_TYPE_TIMEOUT 0xfe #define DAP_STREAM_CH_CHAIN_PKT_TYPE_ERROR 0xff // TSD sections +#define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_TSD 0x15 +#define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_TSD 0x16 + #define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_TSD_PROTO 0x0001 // Protocol version #define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_TSD_COUNT 0x0002 // Items count #define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_TSD_HASH_LAST 0x0003 // Hash of last(s) item @@ -108,20 +95,16 @@ typedef struct dap_stream_ch_chain_sync_request{ } DAP_ALIGN_PACKED dap_stream_ch_chain_sync_request_t; -typedef struct dap_stream_ch_chain_pkt_hdr{ - union{ - struct{ - uint8_t version; - uint8_t padding[7]; - } DAP_ALIGN_PACKED; - uint64_t ext_id; - }DAP_ALIGN_PACKED; +typedef struct dap_stream_ch_chain_pkt_hdr { + uint8_t version; + uint8_t padding[3]; + uint32_t data_size; dap_chain_net_id_t net_id; dap_chain_id_t chain_id; dap_chain_cell_id_t cell_id; } DAP_ALIGN_PACKED dap_stream_ch_chain_pkt_hdr_t; -typedef struct dap_stream_ch_chain_pkt{ +typedef struct dap_stream_ch_chain_pkt { dap_stream_ch_chain_pkt_hdr_t hdr; uint8_t data[]; } DAP_ALIGN_PACKED dap_stream_ch_chain_pkt_t; @@ -129,18 +112,12 @@ typedef struct dap_stream_ch_chain_pkt{ static const char* c_dap_stream_ch_chain_pkt_type_str[]={ [DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN] = "DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN", [DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB] = "DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB", - [DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS] = "DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS", - [DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB] = "DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB", - [DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_ALL] = "DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_ALL", [DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS] = "DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS", [DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB] = "DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB", - [DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_ALL] = "DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_ALL", [DAP_STREAM_CH_CHAIN_PKT_TYPE_ERROR] = "DAP_STREAM_CH_CHAIN_PKT_TYPE_ERROR" }; -dap_stream_ch_chain_state_t dap_stream_ch_chain_pkt_type_to_dap_stream_ch_chain_state(char a_state); - size_t dap_stream_ch_chain_pkt_write_unsafe(dap_stream_ch_t *a_ch, uint8_t a_type, uint64_t a_net_id, uint64_t a_chain_id, uint64_t a_cell_id, const void * a_data, size_t a_data_size); diff --git a/modules/channel/chain-net-srv/CMakeLists.txt b/modules/channel/chain-net-srv/CMakeLists.txt index 993f891e6557851315bfd8a989262036bf4c8656..39a9e5e9c766ef5c3d620a59e5d2f618c7f6a918 100644 --- a/modules/channel/chain-net-srv/CMakeLists.txt +++ b/modules/channel/chain-net-srv/CMakeLists.txt @@ -6,7 +6,7 @@ file(GLOB DAP_STREAM_CH_CHAIN_NET_SRV_HDRS include/*.h) add_library(${PROJECT_NAME} STATIC ${DAP_STREAM_CH_CHAIN_NET_SRV_SRCS} ${DAP_STREAM_CH_CHAIN_NET_SRV_HDRS}) -target_link_libraries(${PROJECT_NAME} dap_core dap_crypto dap_chain_common dap_chain dap_chain_mempool dap_chain_net dap_chain_net_srv dap_io dap_stream dap_stream_ch dap_stream_ch_chain dap_stream_ch_chain_net) +target_link_libraries(${PROJECT_NAME} dap_core dap_crypto dap_chain_common dap_chain dap_chain_mempool dap_chain_net dap_chain_net_srv dap_io dap_stream dap_stream_ch dap_stream_ch_chain_net) target_include_directories(${PROJECT_NAME} INTERFACE .) target_include_directories(${PROJECT_NAME} PUBLIC include) @@ -18,4 +18,4 @@ INSTALL(TARGETS ${PROJECT_NAME} ARCHIVE DESTINATION lib/modules/channel/chain-net-srv/ PUBLIC_HEADER DESTINATION include/modules/channel/chain-net-srv/ ) -endif() \ No newline at end of file +endif() diff --git a/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.c b/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.c index 2481153ee9e1ba31919fad4cc0a66ef6eab738eb..14749e119fc946f9cb7f4b1fa57fd50142fce62e 100644 --- a/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.c +++ b/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.c @@ -133,7 +133,7 @@ static pthread_mutex_t s_ht_grace_table_mutex; int dap_stream_ch_chain_net_srv_init(void) { log_it(L_NOTICE,"Chain network services channel initialized"); - dap_stream_ch_proc_add(DAP_STREAM_CH_ID_NET_SRV, s_stream_ch_new,s_stream_ch_delete,s_stream_ch_packet_in,s_stream_ch_packet_out); + dap_stream_ch_proc_add(DAP_STREAM_CH_NET_SRV_ID, s_stream_ch_new,s_stream_ch_delete,s_stream_ch_packet_in,s_stream_ch_packet_out); pthread_mutex_init(&s_ht_grace_table_mutex, NULL); return 0; diff --git a/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.h b/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.h index 3346512aa75e30fe3096705b04f3649f74851fde..18d3f1a11204de86bffdce08f40d4ad26b7a3af1 100644 --- a/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.h +++ b/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.h @@ -53,7 +53,7 @@ typedef struct dap_stream_ch_chain_net_srv { } dap_stream_ch_chain_net_srv_t; #define DAP_STREAM_CH_CHAIN_NET_SRV(a) ((dap_stream_ch_chain_net_srv_t *) ((a)->internal) ) -#define DAP_STREAM_CH_ID_NET_SRV 'R' +#define DAP_STREAM_CH_NET_SRV_ID 'R' int dap_stream_ch_chain_net_srv_init(void); diff --git a/modules/channel/chain-net/CMakeLists.txt b/modules/channel/chain-net/CMakeLists.txt index 7f854714d53d638d37fe176b556289393ffa6846..cd648793930ffe5f0616ad46460e4b51783ab98e 100644 --- a/modules/channel/chain-net/CMakeLists.txt +++ b/modules/channel/chain-net/CMakeLists.txt @@ -6,8 +6,7 @@ file(GLOB DAP_STREAM_CH_CHAIN_NET_HDRS include/*.h) add_library(${PROJECT_NAME} STATIC ${DAP_STREAM_CH_CHAIN_NET_SRCS} ${DAP_STREAM_CH_CHAIN_NET_HDRS}) -target_link_libraries(${PROJECT_NAME} dap_core dap_crypto dap_stream dap_stream_ch dap_stream_ch_chain - dap_chain_net dap_chain_net_srv_stake) +target_link_libraries(${PROJECT_NAME} dap_core dap_crypto dap_stream dap_stream_ch dap_chain_net dap_chain_net_srv_stake) target_include_directories(${PROJECT_NAME} INTERFACE .) target_include_directories(${PROJECT_NAME} PUBLIC include) @@ -20,4 +19,4 @@ INSTALL(TARGETS ${PROJECT_NAME} ARCHIVE DESTINATION lib/modules/channel/chain-net/ PUBLIC_HEADER DESTINATION include/modules/channel/chain-net/ ) -endif() \ No newline at end of file +endif() diff --git a/modules/channel/chain-net/dap_stream_ch_chain_net.c b/modules/channel/chain-net/dap_stream_ch_chain_net.c index 0ffbd23c38c5c769801edbd03936129f287c1c1f..129ff0c22f2f4550c714e322f848783bd3073893 100644 --- a/modules/channel/chain-net/dap_stream_ch_chain_net.c +++ b/modules/channel/chain-net/dap_stream_ch_chain_net.c @@ -69,7 +69,7 @@ static void s_stream_ch_packet_out(dap_stream_ch_t* ch, void* arg); int dap_stream_ch_chain_net_init() { log_it(L_NOTICE, "Chain network channel initialized"); - dap_stream_ch_proc_add(DAP_STREAM_CH_ID_NET, s_stream_ch_new, s_stream_ch_delete, + dap_stream_ch_proc_add(DAP_STREAM_CH_NET_ID, s_stream_ch_new, s_stream_ch_delete, s_stream_ch_packet_in, s_stream_ch_packet_out); return 0; @@ -156,9 +156,16 @@ void s_stream_ch_packet_in(dap_stream_ch_t *a_ch, void* a_arg) l_ch_chain_net_pkt->hdr.net_id, l_err_str, sizeof(l_err_str)); return; } + /*if (dap_chain_net_get_state(l_net) == NET_STATE_OFFLINE) { + s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, + "ERROR_NET_IS_OFFLINE"); + a_ch->stream->esocket->flags |= DAP_SOCK_SIGNAL_CLOSE; + return; + }*/ switch (l_ch_pkt->hdr.type) { case DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_ANNOUNCE: - assert(dap_stream_node_addr_not_null(&a_ch->stream->node)); + assert(!dap_stream_node_addr_is_blank(&a_ch->stream->node)); dap_chain_net_add_cluster_link(l_net, &a_ch->stream->node); break; // received ping request - > send pong request diff --git a/modules/channel/chain-net/dap_stream_ch_chain_net_pkt.c b/modules/channel/chain-net/dap_stream_ch_chain_net_pkt.c index c43d17b8ca0e1506c0c0f509c6bc9f87c5ed97ff..24be3ab116c691b8d1641316cf3be559f4fe1f30 100644 --- a/modules/channel/chain-net/dap_stream_ch_chain_net_pkt.c +++ b/modules/channel/chain-net/dap_stream_ch_chain_net_pkt.c @@ -19,7 +19,6 @@ #include <dap_stream.h> #include <dap_stream_pkt.h> #include <dap_stream_ch_pkt.h> -#include "dap_stream_ch_chain_pkt.h" #include "dap_stream_ch_chain_net.h" #include "dap_stream_ch_chain_net_pkt.h" diff --git a/modules/channel/chain-net/include/dap_stream_ch_chain_net.h b/modules/channel/chain-net/include/dap_stream_ch_chain_net.h index 6838b09975b30d6d27be1d681819dc2150fe8bec..d9518d618364f34de559766974590d8824b9c6c7 100644 --- a/modules/channel/chain-net/include/dap_stream_ch_chain_net.h +++ b/modules/channel/chain-net/include/dap_stream_ch_chain_net.h @@ -61,7 +61,7 @@ typedef struct dap_stream_ch_chain_validator_test{ #define D_SIGN 0x40//data signed #define F_CERT 0x80//faund sert -#define DAP_STREAM_CH_ID_NET 'N' +#define DAP_STREAM_CH_NET_ID 'N' #define DAP_STREAM_CH_CHAIN_NET(a) ((dap_stream_ch_chain_net_t *) ((a)->internal) ) dap_chain_node_addr_t dap_stream_ch_chain_net_from_session_data_extract_node_addr(uint32_t a_session_id); diff --git a/modules/channel/chain-voting/dap_stream_ch_chain_voting.c b/modules/channel/chain-voting/dap_stream_ch_chain_voting.c index 04088bc2697aa2a47422dae269b701ce2ef3132f..bc51e1c74d2febc7eae877b0bfefde7d83b73ae1 100644 --- a/modules/channel/chain-voting/dap_stream_ch_chain_voting.c +++ b/modules/channel/chain-voting/dap_stream_ch_chain_voting.c @@ -3,8 +3,7 @@ #include "dap_stream_ch_pkt.h" #include "dap_stream_ch.h" #include "dap_stream_ch_proc.h" -#include "dap_stream_ch_chain.h" -#include "dap_stream_ch_chain_pkt.h" +#include "dap_chain_ch.h" #include "dap_stream_ch_chain_voting.h" #include "dap_chain_net.h" #include "dap_client_pvt.h" @@ -42,7 +41,7 @@ int dap_stream_ch_chain_voting_init() log_it(L_NOTICE, "Chains voting channel initialized"); pthread_rwlock_init(&s_node_client_list_lock, NULL); - dap_stream_ch_proc_add(DAP_STREAM_CH_ID_VOTING, + dap_stream_ch_proc_add(DAP_STREAM_CH_VOTING_ID, s_stream_ch_new, s_stream_ch_delete, s_stream_ch_packet_in, @@ -59,7 +58,7 @@ void dap_stream_ch_chain_voting_in_callback_add(void* a_arg, dap_chain_voting_ch s_pkt_in_callback_count++; } -static bool s_callback_pkt_in_call_all(dap_proc_thread_t UNUSED_ARG *a_thread, void *a_arg) +static bool s_callback_pkt_in_call_all(void *a_arg) { dap_stream_ch_chain_voting_pkt_t *l_voting_pkt = a_arg; for (size_t i = 0; i < s_pkt_in_callback_count; i++) { @@ -99,7 +98,7 @@ void dap_stream_ch_chain_voting_message_write(dap_chain_net_t *a_net, dap_chain_ log_it(L_WARNING, "Can't find validator's addr "NODE_ADDR_FP_STR" in database", NODE_ADDR_FP_ARGS(a_remote_node_addr)); return; } - char l_channels[] = { DAP_STREAM_CH_ID_VOTING, '\0' }; + char l_channels[] = { DAP_STREAM_CH_VOTING_ID, '\0' }; dap_chain_node_client_t *l_node_client = dap_chain_node_client_connect_channels(a_net, l_node_info, l_channels); if (!l_node_client || !l_node_client->client) { log_it(L_ERROR, "Can't connect to remote node "NODE_ADDR_FP_STR, NODE_ADDR_FP_ARGS(a_remote_node_addr)); @@ -124,7 +123,7 @@ void dap_stream_ch_chain_voting_message_write(dap_chain_net_t *a_net, dap_chain_ log_it(L_ERROR, "NULL node_client in item of voting channel"); return; } - dap_chain_node_client_write_mt(l_node_client_item->node_client, DAP_STREAM_CH_ID_VOTING, + dap_chain_node_client_write_mt(l_node_client_item->node_client, DAP_STREAM_CH_VOTING_ID, DAP_STREAM_CH_CHAIN_VOTING_PKT_TYPE_DATA, a_voting_pkt, l_voting_pkt_size); } else diff --git a/modules/channel/chain-voting/include/dap_stream_ch_chain_voting.h b/modules/channel/chain-voting/include/dap_stream_ch_chain_voting.h index f55f67014b3fbedfd086279a0d3032c471ecf504..5f55d10dea0daee4f7209fb5d484fbf5ba52f438 100644 --- a/modules/channel/chain-voting/include/dap_stream_ch_chain_voting.h +++ b/modules/channel/chain-voting/include/dap_stream_ch_chain_voting.h @@ -12,7 +12,7 @@ #define DAP_STREAM_CH_CHAIN_VOTING_PKT_TYPE_TEST 0x02 #define DAP_STREAM_CH_CHAIN_VOTING_PKT_TYPE_ERROR 0xFF -#define DAP_STREAM_CH_ID_VOTING 'V' +#define DAP_STREAM_CH_VOTING_ID 'V' typedef void (*dap_chain_voting_ch_callback_t)(void *a_arg, dap_chain_node_addr_t *a_sender_addr, dap_chain_node_addr_t *a_receiver_addr, dap_chain_hash_fast_t *a_data_hash, uint8_t *a_data, size_t a_data_size); diff --git a/modules/channel/chain/CMakeLists.txt b/modules/channel/chain/CMakeLists.txt deleted file mode 100644 index 66387a0de86452ea111e52b2c7084cf96b0a5d95..0000000000000000000000000000000000000000 --- a/modules/channel/chain/CMakeLists.txt +++ /dev/null @@ -1,22 +0,0 @@ -cmake_minimum_required(VERSION 3.10) -project (dap_stream_ch_chain) - -file(GLOB DAP_STREAM_CH_CHAIN_SRCS *.c) -file(GLOB DAP_STREAM_CH_CHAIN_HDRS include/*.h) - -add_library(${PROJECT_NAME} STATIC ${DAP_STREAM_CH_CHAIN_SRCS} ${DAP_STREAM_CH_CHAIN_HDRS}) - -target_link_libraries(${PROJECT_NAME} dap_core dap_crypto dap_chain dap_chain_net dap_global_db dap_stream dap_stream_ch) - -target_include_directories(${PROJECT_NAME} INTERFACE .) -target_include_directories(${PROJECT_NAME} PUBLIC include) -target_include_directories(${PROJECT_NAME} PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/../../../../dap-sdk/3rdparty/uthash/src) - -if (INSTALL_SDK) -set_target_properties(${PROJECT_NAME} PROPERTIES PUBLIC_HEADER "${DAP_STREAM_CH_CHAIN_HDRS}") -INSTALL(TARGETS ${PROJECT_NAME} - LIBRARY DESTINATION lib/modules/channel/chain/ - ARCHIVE DESTINATION lib/modules/channel/chain/ - PUBLIC_HEADER DESTINATION include/modules/channel/chain/ -) -endif() \ No newline at end of file diff --git a/modules/common/include/dap_chain_common.h b/modules/common/include/dap_chain_common.h index 2e59ae9b82542a03b6b28d50a40dacdcbd18589c..6939ab164d8a8129834a7c13c32b8d5bcb642ec4 100644 --- a/modules/common/include/dap_chain_common.h +++ b/modules/common/include/dap_chain_common.h @@ -82,7 +82,7 @@ typedef union dap_chain_node_role{ typedef dap_stream_node_addr_t dap_chain_node_addr_t; #define dap_chain_node_addr_str_check dap_stream_node_addr_str_check #define dap_chain_node_addr_from_str dap_stream_node_addr_from_str -#define dap_chain_node_addr_not_null dap_stream_node_addr_not_null +#define dap_chain_node_addr_is_blank dap_stream_node_addr_is_blank typedef union dap_chain_net_id{ uint64_t uint64; diff --git a/modules/consensus/dag-poa/dap_chain_cs_dag_poa.c b/modules/consensus/dag-poa/dap_chain_cs_dag_poa.c index 76388e5da89764ed2957b9f8b604657afa648ea2..2c731036bd1cf1c5865c9b5056cad6533356dfb3 100644 --- a/modules/consensus/dag-poa/dap_chain_cs_dag_poa.c +++ b/modules/consensus/dag-poa/dap_chain_cs_dag_poa.c @@ -612,23 +612,23 @@ static bool s_callback_round_event_to_chain_callback_get_round_item(dap_global_d dap_chain_cs_dag_event_round_item_t *l_chosen_item = s_round_event_choose_dup(l_dups_list, l_max_signs_count); if (l_chosen_item) { size_t l_event_size = l_chosen_item->event_size; - dap_chain_cs_dag_event_t *l_new_atom = (dap_chain_cs_dag_event_t *)DAP_DUP_SIZE(l_chosen_item->event_n_signs, l_event_size); - char *l_event_hash_hex_str; - dap_get_data_hash_str_static(l_new_atom, l_event_size, l_event_hash_hex_str); + dap_chain_cs_dag_event_t *l_new_atom = (dap_chain_cs_dag_event_t *)l_chosen_item->event_n_signs; + dap_hash_fast_t l_event_hash; + dap_hash_fast(l_new_atom, l_event_size, &l_event_hash); + char *l_event_hash_hex_str = DAP_NEW_STACK_SIZE(char, DAP_CHAIN_HASH_FAST_STR_SIZE); + dap_chain_hash_fast_to_str(&l_event_hash, l_event_hash_hex_str, DAP_CHAIN_HASH_FAST_STR_SIZE); dap_chain_datum_t *l_datum = dap_chain_cs_dag_event_get_datum(l_new_atom, l_event_size); l_dag->round_completed = dap_max(l_new_atom->header.round_id, l_dag->round_current); int l_verify_datum = dap_chain_net_verify_datum_for_add(l_dag->chain, l_datum, &l_chosen_item->round_info.datum_hash); if (!l_verify_datum) { dap_chain_atom_verify_res_t l_res = l_dag->chain->callback_atom_add(l_dag->chain, l_new_atom, l_event_size); if (l_res == ATOM_ACCEPT) { - dap_chain_atom_save(l_dag->chain, (dap_chain_atom_ptr_t)l_new_atom, l_event_size, l_dag->chain->cells->id); + dap_chain_atom_save(l_dag->chain->cells, (dap_chain_atom_ptr_t)l_new_atom, l_event_size, &l_event_hash); s_poa_round_clean(l_dag->chain); - } else if (l_res != ATOM_MOVE_TO_THRESHOLD) - DAP_DELETE(l_new_atom); + } log_it(L_INFO, "Event %s from round %"DAP_UINT64_FORMAT_U" %s", l_event_hash_hex_str, l_round_id, dap_chain_atom_verify_res_str[l_res]); } else { - DAP_DELETE(l_new_atom); char l_datum_hash_str[DAP_CHAIN_HASH_FAST_STR_SIZE]; dap_chain_hash_fast_to_str(&l_chosen_item->round_info.datum_hash, l_datum_hash_str, DAP_CHAIN_HASH_FAST_STR_SIZE); log_it(L_INFO, "Event %s from round %"DAP_UINT64_FORMAT_U" not added into chain, because the inner datum %s doesn't pass verification (%s)", diff --git a/modules/consensus/esbocs/dap_chain_cs_esbocs.c b/modules/consensus/esbocs/dap_chain_cs_esbocs.c index f60f227347758d18a9f67ec475db570ff31416a0..009d51244a7c9878107f981e27872748caa813c3 100644 --- a/modules/consensus/esbocs/dap_chain_cs_esbocs.c +++ b/modules/consensus/esbocs/dap_chain_cs_esbocs.c @@ -232,6 +232,7 @@ static int s_callback_new(dap_chain_t *a_chain, dap_config_t *a_chain_cfg) } char l_cert_name[512]; dap_cert_t *l_cert_cur; + dap_chain_net_t *l_net = dap_chain_net_by_id(a_chain->net_id); for (size_t i = 0; i < l_auth_certs_count; i++) { snprintf(l_cert_name, sizeof(l_cert_name), "%s.%zu", l_auth_certs_prefix, i); if ((l_cert_cur = dap_cert_find_by_name(l_cert_name)) == NULL) { @@ -266,20 +267,19 @@ static int s_callback_new(dap_chain_t *a_chain, dap_config_t *a_chain_cfg) l_validator->weight = uint256_1; l_esbocs_pvt->poa_validators = dap_list_append(l_esbocs_pvt->poa_validators, l_validator); - dap_chain_net_t *l_net = dap_chain_net_by_id(a_chain->net_id); if (!l_esbocs_pvt->poa_mode) { // auth certs in PoA mode will be first PoS validators keys dap_hash_fast_t l_stake_tx_hash = {}; uint256_t l_weight = dap_chain_net_srv_stake_get_allowed_min_value(); dap_chain_net_srv_stake_key_delegate(l_net, &l_signing_addr, &l_stake_tx_hash, l_weight, &l_signer_node_addr); } - // Preset reward for block signs, before first reward decree - const char *l_preset_reward_str = dap_config_get_item_str(a_chain_cfg, "esbocs", "preset_reward"); - if (l_preset_reward_str) { - uint256_t l_preset_reward = dap_chain_balance_scan(l_preset_reward_str); - if (!IS_ZERO_256(l_preset_reward)) - dap_chain_net_add_reward(l_net, l_preset_reward, 0); - } + } + // Preset reward for block signs, before first reward decree + const char *l_preset_reward_str = dap_config_get_item_str(a_chain_cfg, "esbocs", "preset_reward"); + if (l_preset_reward_str) { + uint256_t l_preset_reward = dap_chain_balance_scan(l_preset_reward_str); + if (!IS_ZERO_256(l_preset_reward)) + dap_chain_net_add_reward(l_net, l_preset_reward, 0); } l_blocks->chain->callback_created = s_callback_created; @@ -468,7 +468,7 @@ static int s_callback_created(dap_chain_t *a_chain, dap_config_t *a_chain_net_cf l_session->my_signing_addr = l_my_signing_addr; char *l_sync_group = s_get_penalty_group(l_net->pub.id); l_session->db_cluster = dap_global_db_cluster_add(dap_global_db_instance_get_default(), - l_sync_group, l_sync_group, + NULL, 0, l_sync_group, 72 * 3600, true, DAP_GDB_MEMBER_ROLE_NOBODY, DAP_CLUSTER_ROLE_AUTONOMIC); DAP_DELETE(l_sync_group); @@ -509,6 +509,7 @@ static int s_callback_created(dap_chain_t *a_chain, dap_config_t *a_chain_net_cf if (IS_ZERO_256(l_esbocs_pvt->minimum_fee)) { log_it(L_ERROR, "No valid order found was signed by this validator deledgated key. Switch off validator mode."); + DAP_DEL_Z(l_esbocs->session); return -4; } pthread_mutexattr_t l_mutex_attr; @@ -1532,13 +1533,12 @@ static bool s_session_candidate_to_chain(dap_chain_esbocs_session_t *a_session, return false; } bool res = false; - dap_chain_block_t *l_candidate = DAP_DUP_SIZE(a_candidate, a_candidate_size); - dap_chain_atom_verify_res_t l_res = a_session->chain->callback_atom_add(a_session->chain, l_candidate, a_candidate_size); + dap_chain_atom_verify_res_t l_res = a_session->chain->callback_atom_add(a_session->chain, a_candidate, a_candidate_size); char *l_candidate_hash_str = dap_chain_hash_fast_to_str_new(a_candidate_hash); switch (l_res) { case ATOM_ACCEPT: // block save to chain - if (dap_chain_atom_save(a_session->chain, (uint8_t *)l_candidate, a_candidate_size, a_session->chain->cells->id) < 0) + if (dap_chain_atom_save(a_session->chain->cells, (uint8_t *)a_candidate, a_candidate_size, a_candidate_hash) < 0) log_it(L_ERROR, "Can't save atom %s to the file", l_candidate_hash_str); else { @@ -1551,15 +1551,12 @@ static bool s_session_candidate_to_chain(dap_chain_esbocs_session_t *a_session, break; case ATOM_PASS: log_it(L_WARNING, "Atom with hash %s not accepted (code ATOM_PASS, already present)", l_candidate_hash_str); - DAP_DELETE(l_candidate); break; case ATOM_REJECT: log_it(L_WARNING,"Atom with hash %s rejected", l_candidate_hash_str); - DAP_DELETE(l_candidate); break; default: log_it(L_CRITICAL, "Wtf is this ret code ? Atom hash %s code %d", l_candidate_hash_str, l_res); - DAP_DELETE(l_candidate); } DAP_DELETE(l_candidate_hash_str); return res; diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index 36f9f2d91381b7cb3cd945a40796966dbc5cf834..5934ebda6f952918d115c3bb0da68d09d68817eb 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -104,8 +104,7 @@ #include "dap_global_db.h" #include "dap_stream_ch_chain_net_pkt.h" #include "dap_stream_ch_chain_net.h" -#include "dap_stream_ch_chain.h" -#include "dap_stream_ch_chain_pkt.h" +#include "dap_chain_ch.h" #include "dap_stream_ch.h" #include "dap_stream.h" #include "dap_stream_ch_pkt.h" @@ -244,7 +243,7 @@ static const dap_chain_node_client_callbacks_t s_node_link_callbacks = { }; // State machine switchs here -static bool s_net_states_proc(dap_proc_thread_t *a_thread, void *a_arg); +static bool s_net_states_proc(void *a_arg); struct json_object *s_net_states_json_collect(dap_chain_net_t * l_net); @@ -382,7 +381,7 @@ int dap_chain_net_state_go_to(dap_chain_net_t * a_net, dap_chain_net_state_t a_n } if (PVT(a_net)->state != NET_STATE_OFFLINE){ PVT(a_net)->state = PVT(a_net)->state_target = NET_STATE_OFFLINE; - s_net_states_proc(NULL, a_net); + s_net_states_proc(a_net); } PVT(a_net)->state_target = a_new_state; //PVT(a_net)->flags |= F_DAP_CHAIN_NET_SYNC_FROM_ZERO; // TODO set this flag according to -mode argument from command line @@ -429,7 +428,7 @@ dap_chain_node_info_t *dap_chain_net_balancer_link_from_cfg(dap_chain_net_t *a_n void dap_chain_net_add_cluster_link(dap_chain_net_t *a_net, dap_stream_node_addr_t *a_node_addr) { dap_return_if_fail(a_net && a_node_addr); - dap_cluster_t *l_links_cluster = dap_cluster_by_mnemonim(a_net->pub.gdb_groups_prefix); + dap_cluster_t *l_links_cluster = dap_cluster_by_mnemonim(a_net->pub.name); if (l_links_cluster) dap_cluster_member_add(l_links_cluster, a_node_addr, 0, NULL); else @@ -543,7 +542,7 @@ static bool s_net_link_callback_connect_delayed(void *a_arg) dap_chain_node_client_t *l_client = l_link->link; log_it(L_MSG, "Connecting to link "NODE_ADDR_FP_STR" [%s]", NODE_ADDR_FP_ARGS_S(l_client->info->hdr.address), inet_ntoa(l_client->info->hdr.ext_addr_v4)); - dap_chain_node_client_connect(l_client, "GND"); + dap_chain_node_client_connect(l_client, "CGND"); l_link->delay_timer = NULL; return false; } @@ -563,7 +562,7 @@ static bool s_net_link_start(dap_chain_net_t *a_net, struct net_link *a_link, ui return true; } log_it(L_MSG, "Connecting to link "NODE_ADDR_FP_STR" [%s]", NODE_ADDR_FP_ARGS_S(l_link_info->hdr.address), inet_ntoa(l_link_info->hdr.ext_addr_v4)); - return dap_chain_node_client_connect(l_client, "GND"); + return dap_chain_node_client_connect(l_client, "CGND"); } /** @@ -574,7 +573,7 @@ static void s_fill_links_from_root_aliases(dap_chain_net_t * a_net) { dap_chain_net_pvt_t *l_net_pvt = PVT(a_net); for (size_t i = 0; i < l_net_pvt->seed_nodes_count; i++) { - dap_chain_node_info_t l_link_node_info; + dap_chain_node_info_t l_link_node_info = {}; l_link_node_info.hdr.ext_addr_v4 = l_net_pvt->seed_nodes_ipv4[i].sin_addr; l_link_node_info.hdr.ext_port = l_net_pvt->seed_nodes_ipv4[i].sin_port; if (s_net_link_add(a_net, &l_link_node_info) > 0) // Maximum links count reached @@ -1071,9 +1070,8 @@ static void s_net_states_notify(dap_chain_net_t *a_net) * @brief s_net_states_proc * @param l_net */ -static bool s_net_states_proc(dap_proc_thread_t *a_thread, void *a_arg) +static bool s_net_states_proc(void *a_arg) { - UNUSED(a_thread); bool l_repeat_after_exit = false; // If true - repeat on next iteration of proc thread loop dap_chain_net_t *l_net = (dap_chain_net_t *) a_arg; assert(l_net); @@ -2405,7 +2403,7 @@ void dap_chain_net_delete(dap_chain_net_t *a_net) { // Synchronously going to offline state PVT(a_net)->state = PVT(a_net)->state_target = NET_STATE_OFFLINE; - s_net_states_proc(NULL, a_net); + s_net_states_proc(a_net); dap_chain_net_item_t *l_net_item; HASH_FIND(hh, s_net_items, a_net->pub.name, strlen(a_net->pub.name), l_net_item); if (l_net_item) { @@ -2842,8 +2840,8 @@ int s_net_load(dap_chain_net_t *a_net) l_gdb_groups_mask = dap_strdup_printf("%s.chain-%s.mempool", l_net->pub.gdb_groups_prefix, l_chain->name); dap_global_db_cluster_t *l_cluster = dap_global_db_cluster_add( dap_global_db_instance_get_default(), - l_net->pub.name, l_gdb_groups_mask, - DAP_CHAIN_NET_MEMPOOL_TTL, true, + l_net->pub.name, l_net->pub.id.uint64, + l_gdb_groups_mask, DAP_CHAIN_NET_MEMPOOL_TTL, true, DAP_GDB_MEMBER_ROLE_USER, DAP_CLUSTER_ROLE_EMBEDDED); if (!l_cluster) { @@ -2858,8 +2856,8 @@ int s_net_load(dap_chain_net_t *a_net) // Service orders cluster l_gdb_groups_mask = dap_strdup_printf("%s.service.orders", l_net->pub.gdb_groups_prefix); l_net_pvt->orders_cluster = dap_global_db_cluster_add(dap_global_db_instance_get_default(), - l_net->pub.name, l_gdb_groups_mask, - 0, true, + l_net->pub.name, l_net->pub.id.uint64, + l_gdb_groups_mask, 0, true, DAP_GDB_MEMBER_ROLE_GUEST, DAP_CLUSTER_ROLE_EMBEDDED); if (!l_net_pvt->orders_cluster) { @@ -2873,8 +2871,8 @@ int s_net_load(dap_chain_net_t *a_net) l_net->pub.gdb_nodes_aliases = dap_strdup_printf("%s.nodes.aliases",l_net->pub.gdb_groups_prefix); l_gdb_groups_mask = dap_strdup_printf("%s.nodes*", l_net->pub.gdb_groups_prefix); l_net_pvt->nodes_cluster = dap_global_db_cluster_add(dap_global_db_instance_get_default(), - l_net->pub.name, l_gdb_groups_mask, - 0, true, + l_net->pub.name, l_net->pub.id.uint64, + l_gdb_groups_mask, 0, true, DAP_GDB_MEMBER_ROLE_GUEST, DAP_CLUSTER_ROLE_EMBEDDED); if (!l_net_pvt->nodes_cluster) { diff --git a/modules/net/dap_chain_net_decree.c b/modules/net/dap_chain_net_decree.c index a6c105e3d5f368ce7a930adad08e6070732d45e6..b6db60d2cd5329a944647f118dd99f6f4aaacc63 100644 --- a/modules/net/dap_chain_net_decree.c +++ b/modules/net/dap_chain_net_decree.c @@ -34,7 +34,7 @@ #include "dap_chain_net_tx.h" #include "dap_chain_net_srv_stake_pos_delegate.h" #include "dap_chain_node_net_ban_list.h" -#include "dap_enc_http_ban_list_client.h" +#include "dap_http_ban_list_client.h" @@ -478,10 +478,10 @@ static int s_common_decree_handler(dap_chain_datum_decree_t *a_decree, dap_chain dap_hash_fast(a_decree, dap_chain_datum_decree_get_size(a_decree), &l_decree_hash); if (l_tsd->type == DAP_CHAIN_DATUM_DECREE_TSD_TYPE_IP_V4){ struct in_addr l_ip_addr = dap_tsd_get_scalar(l_tsd, struct in_addr); - dap_enc_http_ban_list_client_add_ipv4(l_ip_addr, l_decree_hash, a_decree->header.ts_created); + dap_http_ban_list_client_add_ipv4(l_ip_addr, l_decree_hash, a_decree->header.ts_created); } else if (l_tsd->type == DAP_CHAIN_DATUM_DECREE_TSD_TYPE_IP_V6){ struct in6_addr l_ip_addr = dap_tsd_get_scalar(l_tsd, struct in6_addr); - dap_enc_http_ban_list_client_add_ipv6(l_ip_addr, l_decree_hash, a_decree->header.ts_created); + dap_http_ban_list_client_add_ipv6(l_ip_addr, l_decree_hash, a_decree->header.ts_created); } else if (l_tsd->type == DAP_CHAIN_DATUM_DECREE_TSD_TYPE_NODE_ADDR){ dap_chain_node_addr_t l_addr_node = dap_tsd_get_scalar(l_tsd, dap_chain_node_addr_t); if (!dap_chain_node_net_ban_list_add_node_addr(l_addr_node, l_decree_hash, a_decree->header.ts_created, a_net)) @@ -509,10 +509,10 @@ static int s_common_decree_handler(dap_chain_datum_decree_t *a_decree, dap_chain dap_hash_fast(a_decree, dap_chain_datum_decree_get_size(a_decree), &l_decree_hash); if (l_tsd->type == DAP_CHAIN_DATUM_DECREE_TSD_TYPE_IP_V4){ struct in_addr l_ip_addr = dap_tsd_get_scalar(l_tsd, struct in_addr); - dap_enc_http_ban_list_client_remove_ipv4(l_ip_addr); + dap_http_ban_list_client_remove_ipv4(l_ip_addr); } else if (l_tsd->type == DAP_CHAIN_DATUM_DECREE_TSD_TYPE_IP_V6){ struct in6_addr l_ip_addr = dap_tsd_get_scalar(l_tsd, struct in6_addr); - dap_enc_http_ban_list_client_remove_ipv6(l_ip_addr); + dap_http_ban_list_client_remove_ipv6(l_ip_addr); } else if (l_tsd->type == DAP_CHAIN_DATUM_DECREE_TSD_TYPE_NODE_ADDR){ dap_chain_node_addr_t l_addr_node = dap_tsd_get_scalar(l_tsd, dap_chain_node_addr_t); dap_chain_node_net_ban_list_remove_node_addr(a_net, l_addr_node); diff --git a/modules/net/dap_chain_node_cli_cmd.c b/modules/net/dap_chain_node_cli_cmd.c index a247fed0567014c5ebd85abfbc493c892930662b..7a898eaf48199ecb1993cf67adfb1a5c7b499087 100644 --- a/modules/net/dap_chain_node_cli_cmd.c +++ b/modules/net/dap_chain_node_cli_cmd.c @@ -104,8 +104,7 @@ #include "dap_global_db_cluster.h" #include "dap_stream_ch_chain_net.h" -#include "dap_stream_ch_chain.h" -#include "dap_stream_ch_chain_pkt.h" +#include "dap_chain_ch.h" #include "dap_stream_ch_chain_net_pkt.h" #include "dap_enc_base64.h" #include "dap_chain_net_srv_stake_pos_delegate.h" @@ -482,8 +481,8 @@ static int node_info_dump_with_reply(dap_chain_net_t * a_net, dap_chain_node_add dap_global_db_obj_t *l_aliases_objs = dap_global_db_get_all_sync(a_net->pub.gdb_nodes_aliases, &l_data_size); for (size_t i = 0; i < l_nodes_count; i++) { dap_chain_node_info_t *l_node_info = (dap_chain_node_info_t*)l_objs[i].value; - if (!dap_chain_node_addr_not_null(&l_node_info->hdr.address)){ - log_it(L_ERROR, "Node address is NULL"); + if (dap_chain_node_addr_is_blank(&l_node_info->hdr.address)){ + log_it(L_ERROR, "Node address is empty"); continue; } /* @@ -787,7 +786,7 @@ int com_global_db(int a_argc, char ** a_argv, void **a_str_reply) dap_cli_server_cmd_set_reply_text(a_str_reply, "record already pinned"); break; } - if(dap_global_db_set_sync( l_group, l_key, l_value, l_value_len,true ) ==0 ){ + if(dap_global_db_set_sync( l_group, l_key, l_value, l_value_len, true) ==0 ){ dap_cli_server_cmd_set_reply_text(a_str_reply, "record successfully pinned"); } else{ @@ -1210,7 +1209,10 @@ int com_node(int a_argc, char ** a_argv, void **a_str_reply) break; // make connect - case CMD_CONNECT: { + case CMD_CONNECT: + dap_cli_server_cmd_set_reply_text(a_str_reply, "Not implemented yet"); + break; +#if 0 // get address from alias if addr not defined if(alias_str && !l_node_addr.uint64) { dap_chain_node_addr_t *address_tmp = dap_chain_node_alias_find(l_net, alias_str); @@ -1317,7 +1319,7 @@ int com_node(int a_argc, char ** a_argv, void **a_str_reply) log_it(L_NOTICE, "Stream connection established"); dap_stream_ch_chain_sync_request_t l_sync_request = {}; - dap_stream_ch_t *l_ch_chain = dap_client_get_stream_ch_unsafe(l_node_client->client, DAP_STREAM_CH_ID); + dap_stream_ch_t *l_ch_chain = dap_client_get_stream_ch_unsafe(l_node_client->client, DAP_STREAM_CH_CHAIN_ID); // fill begin id l_sync_request.id_start = 1; // fill current node address @@ -1390,6 +1392,7 @@ int com_node(int a_argc, char ** a_argv, void **a_str_reply) return 0; } +#endif // make handshake case CMD_HANDSHAKE: { // get address from alias if addr not defined @@ -1506,6 +1509,7 @@ int com_node(int a_argc, char ** a_argv, void **a_str_reply) l_key_str_out ? "" : " not"); DAP_DELETE(l_key_str_out); } break; + case CMD_UNBAN: { dap_chain_net_t *l_netl = NULL; dap_chain_t *l_chain = NULL; @@ -1567,14 +1571,16 @@ int com_node(int a_argc, char ** a_argv, void **a_str_reply) l_key_str_out ? "" : " not"); DAP_DELETE(l_key_str_out); } break; + case CMD_BANLIST: { dap_string_t *l_str_ban_list = dap_string_new("Ban list:\n"); - dap_enc_http_ban_list_client_ipv4_print(l_str_ban_list); - dap_enc_http_ban_list_client_ipv6_print(l_str_ban_list); + dap_http_ban_list_client_ipv4_print(l_str_ban_list); + dap_http_ban_list_client_ipv6_print(l_str_ban_list); dap_chain_node_net_ban_list_print(l_str_ban_list); dap_cli_server_cmd_set_reply_text(a_str_reply, "%s", l_str_ban_list->str); dap_string_free(l_str_ban_list, true); } break; + case CMD_BALANCER: { //balancer link list size_t l_node_num = 0; @@ -1593,6 +1599,7 @@ int com_node(int a_argc, char ** a_argv, void **a_str_reply) l_string_balanc->str); dap_string_free(l_string_balanc, true); } break; + default: dap_cli_server_cmd_set_reply_text(a_str_reply, "Unrecognized subcommand '%s'", arg_index < a_argc ? a_argv[arg_index] : "(null)"); diff --git a/modules/net/dap_chain_node_client.c b/modules/net/dap_chain_node_client.c index 98afec04ffcb5401c46382aa0839b25529bdf52b..4d82fde476e55d04679c4498ef98bc3d273b3eba 100644 --- a/modules/net/dap_chain_node_client.c +++ b/modules/net/dap_chain_node_client.c @@ -61,8 +61,8 @@ #include "dap_chain_net_srv.h" #include "dap_stream_worker.h" #include "dap_stream_ch_pkt.h" -#include "dap_stream_ch_chain.h" -#include "dap_stream_ch_chain_pkt.h" +#include "dap_chain_ch.h" +#include "dap_chain_ch_pkt.h" #include "dap_stream_ch_chain_net.h" #include "dap_stream_ch_proc.h" #include "dap_stream_ch_chain_net_pkt.h" @@ -192,11 +192,12 @@ dap_chain_node_sync_status_t dap_chain_node_client_start_sync(dap_chain_node_cli bool l_trylocked = dap_chain_net_sync_trylock(l_net, a_node_client); if (l_trylocked) { log_it(L_INFO, "Start synchronization process with "NODE_ADDR_FP_STR, NODE_ADDR_FP_ARGS_S(a_node_client->remote_node_addr)); - dap_stream_ch_chain_sync_request_t l_sync_gdb = {}; - l_sync_gdb.node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net); - dap_stream_ch_chain_pkt_write_unsafe(a_node_client->ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_REQ, - l_net->pub.id.uint64, 0, 0, - &l_sync_gdb, sizeof(l_sync_gdb)); + dap_stream_ch_chain_sync_request_t l_sync_chain = {}; + l_sync_chain.node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net); + dap_stream_ch_chain_pkt_write_unsafe(a_node_client->ch_chain, + DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_REQ, + l_net->pub.id.uint64, l_net->pub.chains->id.uint64, 0, + &l_sync_chain, sizeof(l_sync_chain)); if (!l_ch_chain->activity_timer) dap_stream_ch_chain_timer_start(l_ch_chain); return NODE_SYNC_STATUS_STARTED; @@ -229,7 +230,7 @@ static bool s_timer_update_states_callback(void *a_arg) l_me->callbacks.disconnected(l_me, l_me->callbacks_arg); if (l_me->keep_connection) { log_it(L_INFO, "Reconnecting node client with peer "NODE_ADDR_FP_STR, NODE_ADDR_FP_ARGS_S(l_me->remote_node_addr)); - l_me->state = NODE_CLIENT_STATE_CONNECTING ; + l_me->state = NODE_CLIENT_STATE_CONNECTING; dap_client_go_stage(l_me->client, STAGE_STREAM_STREAMING, s_stage_connected_callback); } } @@ -930,7 +931,7 @@ static int s_node_client_set_notify_callbacks(dap_client_t *a_client, uint8_t a_ l_ret = 0; switch (a_ch_id) { // 'C' - case DAP_STREAM_CH_ID: { + case DAP_STREAM_CH_CHAIN_ID: { dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); l_ch_chain->callback_notify_packet_out = s_ch_chain_callback_notify_packet_out; l_ch_chain->callback_notify_packet_in = s_ch_chain_callback_notify_packet_in; @@ -940,7 +941,7 @@ static int s_node_client_set_notify_callbacks(dap_client_t *a_client, uint8_t a_ break; } // 'N' - case DAP_STREAM_CH_ID_NET: { + case DAP_STREAM_CH_NET_ID: { dap_stream_ch_chain_net_t *l_ch_chain = DAP_STREAM_CH_CHAIN_NET(l_ch); l_ch_chain->notify_callback = s_ch_chain_callback_notify_packet_in2; l_ch_chain->notify_callback_arg = l_node_client; @@ -949,7 +950,7 @@ static int s_node_client_set_notify_callbacks(dap_client_t *a_client, uint8_t a_ break; } // 'R' - case DAP_STREAM_CH_ID_NET_SRV: { + case DAP_STREAM_CH_NET_SRV_ID: { dap_stream_ch_chain_net_srv_t *l_ch_chain = DAP_STREAM_CH_CHAIN_NET_SRV(l_ch); if (l_node_client->notify_callbacks.srv_pkt_in) { l_ch_chain->notify_callback = (dap_stream_ch_chain_net_srv_callback_packet_t)l_node_client->notify_callbacks.srv_pkt_in; @@ -963,7 +964,7 @@ static int s_node_client_set_notify_callbacks(dap_client_t *a_client, uint8_t a_ break; } // 'V' - case DAP_STREAM_CH_ID_VOTING: { + case DAP_STREAM_CH_VOTING_ID: { dap_stream_ch_chain_voting_t *l_ch_chain = DAP_STREAM_CH_CHAIN_VOTING(l_ch); // l_ch_chain->callback_notify = s_ch_chain_callback_notify_voting_packet_in; l_ch_chain->callback_notify_arg = l_node_client; @@ -973,7 +974,7 @@ static int s_node_client_set_notify_callbacks(dap_client_t *a_client, uint8_t a_ } default: { l_ret = -2; - log_it(L_ERROR, "Unknown channel id %d", a_ch_id); + log_it(L_ERROR, "Unknown channel id %d (%c)", a_ch_id, a_ch_id); break; } } diff --git a/modules/net/dap_chain_node_net_ban_list.c b/modules/net/dap_chain_node_net_ban_list.c index 9df4d9d395730a1badac8455d5bcb872d8bb1025..4ee911be5db6eb038bd9cb92c1719b4e6051fe91 100644 --- a/modules/net/dap_chain_node_net_ban_list.c +++ b/modules/net/dap_chain_node_net_ban_list.c @@ -43,7 +43,7 @@ bool dap_chain_node_net_ban_list_add_node_addr(dap_chain_node_addr_t node_addr, //Resolve addr to ip struct in_addr l_in; if (s_chain_node_net_ban_list_addr_resolve_ip_v4(a_net, node_addr, &l_in)) { - dap_enc_http_ban_list_client_add_ipv4(l_in, a_decree_hash, a_time_created); + dap_http_ban_list_client_add_ipv4(l_in, a_decree_hash, a_time_created); l_record->node_addr = node_addr; l_record->decree_hash = a_decree_hash; l_record->ts_created_decree = a_time_created; @@ -88,7 +88,7 @@ void dap_chain_node_net_ban_list_remove_node_addr(dap_chain_net_t *a_net, dap_ch if (l_record->node_addr.uint64 == node_addr.uint64) { struct in_addr l_in; if (s_chain_node_net_ban_list_addr_resolve_ip_v4(a_net, l_record->node_addr, &l_in)) { - dap_enc_http_ban_list_client_remove_ipv4(l_in); + dap_http_ban_list_client_remove_ipv4(l_in); } else { log_it(L_WARNING, "Can't resolve node address "NODE_ADDR_FP_STR" to ip at remove node addr from ban list", NODE_ADDR_FP_ARGS_S(l_record->node_addr)); diff --git a/modules/net/include/dap_chain_node.h b/modules/net/include/dap_chain_node.h index 4fa078bf0e8593c4fd5b6bf065e8742497035cff..7c11e247ce5aee7274925b93ef27c4cd4d945201 100644 --- a/modules/net/include/dap_chain_node.h +++ b/modules/net/include/dap_chain_node.h @@ -53,7 +53,7 @@ typedef struct dap_chain_node_info { typedef dap_stream_node_addr_t dap_chain_node_addr_t; #define dap_chain_node_addr_str_check dap_stream_node_addr_str_check #define dap_chain_node_addr_from_str dap_stream_node_addr_from_str -#define dap_chain_node_addr_not_null dap_stream_node_addr_not_null +#define dap_chain_node_addr_is_blank dap_stream_node_addr_is_blank /** * Calculate size of struct dap_chain_node_info_t diff --git a/modules/net/include/dap_chain_node_net_ban_list.h b/modules/net/include/dap_chain_node_net_ban_list.h index 8a26fc023555273eca5c1548edfe13033fddf62a..a017d7c9f93715961e660db6aa1e5e4fad583cf6 100644 --- a/modules/net/include/dap_chain_node_net_ban_list.h +++ b/modules/net/include/dap_chain_node_net_ban_list.h @@ -10,7 +10,7 @@ #include "dap_chain_common.h" #include "dap_hash.h" #include "dap_time.h" -#include "dap_enc_http_ban_list_client.h" +#include "dap_http_ban_list_client.h" #include "dap_chain_net.h" #include "uthash.h" diff --git a/modules/net/srv/dap_chain_net_srv_client.c b/modules/net/srv/dap_chain_net_srv_client.c index d05c7c95c99ed5a8db7f0613a703e93dc69fdef1..11d7fc5e69a55486855fde1e0a7adc127e1c32ea 100644 --- a/modules/net/srv/dap_chain_net_srv_client.c +++ b/modules/net/srv/dap_chain_net_srv_client.c @@ -64,7 +64,7 @@ dap_chain_net_srv_client_t *dap_chain_net_srv_client_create_n_connect(dap_chain_ } inet_pton(AF_INET, a_addr, &l_info->hdr.ext_addr_v4); l_info->hdr.ext_port = a_port; - const char l_channels[] = {DAP_STREAM_CH_ID_NET_SRV, '\0'}; + const char l_channels[] = {DAP_STREAM_CH_NET_SRV_ID, '\0'}; l_ret->node_client = dap_chain_node_client_create_n_connect(a_net, l_info, l_channels, &l_callbacks, l_ret); @@ -82,7 +82,7 @@ ssize_t dap_chain_net_srv_client_write(dap_chain_net_srv_client_t *a_client, uin if (!a_client || !a_client->net_client || dap_client_get_stage(a_client->net_client) != STAGE_STREAM_STREAMING) return -1; if (a_type == DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_REQUEST) { - dap_stream_ch_t *l_ch = dap_client_get_stream_ch_unsafe(a_client->net_client, DAP_STREAM_CH_ID_NET_SRV); + dap_stream_ch_t *l_ch = dap_client_get_stream_ch_unsafe(a_client->net_client, DAP_STREAM_CH_NET_SRV_ID); dap_stream_ch_chain_net_srv_t *a_ch_chain = DAP_STREAM_CH_CHAIN_NET_SRV(l_ch); dap_stream_ch_chain_net_srv_pkt_request_t *l_request = (dap_stream_ch_chain_net_srv_pkt_request_t *)a_pkt_data; a_ch_chain->srv_uid.uint64 = l_request->hdr.srv_uid.uint64; diff --git a/modules/net/srv/dap_chain_net_srv_order.c b/modules/net/srv/dap_chain_net_srv_order.c index a3af1a285d6027ebf6e1b79789e95fec45af0c11..d1379bfd22a6c150a7204dc4ddbbbe7693d73dc0 100644 --- a/modules/net/srv/dap_chain_net_srv_order.c +++ b/modules/net/srv/dap_chain_net_srv_order.c @@ -356,7 +356,7 @@ char *dap_chain_net_srv_order_save(dap_chain_net_t *a_net, dap_chain_net_srv_ord : dap_chain_net_srv_order_get_gdb_group(a_net); if (!l_gdb_group_str) return NULL; - int l_rc = dap_global_db_set_sync(l_gdb_group_str, l_order_hash_str, a_order, l_order_size, true); + int l_rc = dap_global_db_set_sync(l_gdb_group_str, l_order_hash_str, a_order, l_order_size, false); DAP_DELETE(l_gdb_group_str); if (l_rc != DAP_GLOBAL_DB_RC_SUCCESS) DAP_DEL_Z(l_order_hash_str); diff --git a/modules/service/stake/dap_chain_net_srv_stake_pos_delegate.c b/modules/service/stake/dap_chain_net_srv_stake_pos_delegate.c index 3d226a3a562ea6bd068d57fb710d165961c2b60b..1e2534e5e4e21d77cd2ba9f7ba36537a1a1bb34d 100644 --- a/modules/service/stake/dap_chain_net_srv_stake_pos_delegate.c +++ b/modules/service/stake/dap_chain_net_srv_stake_pos_delegate.c @@ -2185,7 +2185,7 @@ int dap_chain_net_srv_stake_check_validator(dap_chain_net_t * a_net, dap_hash_fa } log_it(L_NOTICE, "Stream connection established"); - uint8_t l_ch_id = DAP_STREAM_CH_ID_NET; + uint8_t l_ch_id = DAP_STREAM_CH_NET_ID; dap_stream_ch_t * l_ch_chain = dap_client_get_stream_ch_unsafe(l_node_client->client, l_ch_id); randombytes(l_test_data, sizeof(l_test_data)); diff --git a/modules/service/vpn/dap_chain_net_srv_vpn.c b/modules/service/vpn/dap_chain_net_srv_vpn.c index 8da57ebddb7c2b1f562415801d4f675ad560a541..3cd1c8e5946f9325ef094d128d2ca72fa5c487a1 100644 --- a/modules/service/vpn/dap_chain_net_srv_vpn.c +++ b/modules/service/vpn/dap_chain_net_srv_vpn.c @@ -902,7 +902,7 @@ int dap_chain_net_srv_vpn_init(dap_config_t * g_config) { log_it(L_INFO,"TUN driver configured successfuly"); s_vpn_service_create(g_config); - dap_stream_ch_proc_add(DAP_STREAM_CH_ID_NET_SRV_VPN, s_ch_vpn_new, s_ch_vpn_delete, s_ch_packet_in, + dap_stream_ch_proc_add(DAP_STREAM_CH_NET_SRV_ID_VPN, s_ch_vpn_new, s_ch_vpn_delete, s_ch_packet_in, s_ch_packet_out); // add console command to display vpn statistics diff --git a/modules/service/vpn/dap_chain_net_vpn_client.c b/modules/service/vpn/dap_chain_net_vpn_client.c index f9278506e8d93b569dbcafd653b5f391d4ae7cf7..68fd1d2b17f335dfeea71e9fc18e28bf3746e82d 100644 --- a/modules/service/vpn/dap_chain_net_vpn_client.c +++ b/modules/service/vpn/dap_chain_net_vpn_client.c @@ -99,7 +99,7 @@ dap_stream_worker_t* dap_chain_net_vpn_client_get_stream_worker(void) dap_stream_ch_t* dap_chain_net_vpn_client_get_stream_ch(void) { - return s_vpn_client ? dap_client_get_stream_ch_unsafe(s_vpn_client->client, DAP_STREAM_CH_ID_NET_SRV_VPN) : NULL; + return s_vpn_client ? dap_client_get_stream_ch_unsafe(s_vpn_client->client, DAP_STREAM_CH_NET_SRV_ID_VPN) : NULL; } /// TODO convert below callback to processor of stage @@ -515,7 +515,7 @@ int dap_chain_net_vpn_client_check(dap_chain_net_t *a_net, const char *a_ipv4_st clock_gettime(CLOCK_REALTIME, &l_t);//get_cur_time_msec long l_t1 = l_t.tv_sec * 1000 + l_t.tv_nsec / 1e6; - const char l_active_channels[] = { DAP_STREAM_CH_ID_NET_SRV, 0 }; //only R, without S + const char l_active_channels[] = { DAP_STREAM_CH_NET_SRV_ID, 0 }; //only R, without S if(a_ipv4_str) inet_pton(AF_INET, a_ipv4_str, &(s_node_info->hdr.ext_addr_v4)); if(a_ipv6_str) @@ -545,7 +545,7 @@ int dap_chain_net_vpn_client_check(dap_chain_net_t *a_net, const char *a_ipv4_st int l_dtime_connect_ms = l_t2 - l_t1; { - uint8_t l_ch_id = DAP_STREAM_CH_ID_NET_SRV; // Channel id for chain net request = 'R' + uint8_t l_ch_id = DAP_STREAM_CH_NET_SRV_ID; // Channel id for chain net request = 'R' dap_stream_ch_t *l_ch = dap_client_get_stream_ch_unsafe(s_vpn_client->client, l_ch_id); if(l_ch) { typedef dap_stream_ch_chain_net_srv_pkt_test_t pkt_t; @@ -603,7 +603,7 @@ int dap_chain_net_vpn_client_start(dap_chain_net_t *a_net, const char *a_ipv4_st s_node_info = DAP_NEW_Z(dap_chain_node_info_t); s_node_info->hdr.ext_port = a_port; - const char l_active_channels[] = { DAP_STREAM_CH_ID_NET_SRV, DAP_STREAM_CH_ID_NET_SRV_VPN, 0 }; //R, S + const char l_active_channels[] = { DAP_STREAM_CH_NET_SRV_ID, DAP_STREAM_CH_NET_SRV_ID_VPN, 0 }; //R, S if(a_ipv4_str) inet_pton(AF_INET, a_ipv4_str, &(s_node_info->hdr.ext_addr_v4)); if(a_ipv6_str) @@ -634,7 +634,7 @@ int dap_chain_net_vpn_client_start(dap_chain_net_t *a_net, const char *a_ipv4_st // send first packet to server { - uint8_t l_ch_id = DAP_STREAM_CH_ID_NET_SRV; // Channel id for chain net request = 'R' + uint8_t l_ch_id = DAP_STREAM_CH_NET_SRV_ID; // Channel id for chain net request = 'R' dap_stream_ch_t *l_ch = dap_client_get_stream_ch_unsafe(s_vpn_client->client, l_ch_id); if(l_ch) { dap_stream_ch_chain_net_srv_pkt_request_t l_request; @@ -676,7 +676,7 @@ int dap_chain_net_vpn_client_stop(void) dap_chain_net_vpn_client_status_t dap_chain_net_vpn_client_status(void) { if(s_vpn_client) { - uint8_t l_ch_id = DAP_STREAM_CH_ID_NET_SRV; // Channel id for chain net request = 'R' + uint8_t l_ch_id = DAP_STREAM_CH_NET_SRV_ID; // Channel id for chain net request = 'R' dap_stream_ch_t *l_ch = dap_client_get_stream_ch_unsafe(s_vpn_client->client, l_ch_id); if(!l_ch) return VPN_CLIENT_STATUS_CONN_LOST; diff --git a/modules/service/vpn/include/dap_chain_net_srv_vpn.h b/modules/service/vpn/include/dap_chain_net_srv_vpn.h index 8a844bbd01626c279407f921189971b32de684bb..82cccff2fa0364d805b390265c7389c06adfb048 100644 --- a/modules/service/vpn/include/dap_chain_net_srv_vpn.h +++ b/modules/service/vpn/include/dap_chain_net_srv_vpn.h @@ -34,7 +34,7 @@ #define DAP_STREAM_CH_PKT_TYPE_NET_SRV_VPN_CLIENT 0x01 #define DAP_STREAM_CH_PKT_TYPE_NET_SRV_VPN_DATA 0x02 -#define DAP_STREAM_CH_ID_NET_SRV_VPN 'S' +#define DAP_STREAM_CH_NET_SRV_ID_VPN 'S' #define DAP_CHAIN_NET_SRV_VPN_ID 0x0000000000000001 diff --git a/modules/type/blocks/dap_chain_block_cache.c b/modules/type/blocks/dap_chain_block_cache.c index 2fccdd5bca02922351fde632eb33a839bf7729c1..d17e917b0d9644bda61d5de4b136c4b4f4125587 100644 --- a/modules/type/blocks/dap_chain_block_cache.c +++ b/modules/type/blocks/dap_chain_block_cache.c @@ -65,7 +65,11 @@ dap_chain_block_cache_t *dap_chain_block_cache_new(dap_hash_fast_t *a_block_hash log_it(L_CRITICAL, "Memory allocation error"); return NULL; } - l_block_cache->block = a_block; + l_block_cache->block = DAP_DUP_SIZE(a_block, a_block_size); + if (!l_block_cache->block) { + log_it(L_CRITICAL, "Memory allocation error"); + return NULL; + } l_block_cache->block_size = a_block_size; l_block_cache->block_number = a_block_number; l_block_cache->ts_created = a_block->hdr.ts_created; diff --git a/modules/type/blocks/dap_chain_cs_blocks.c b/modules/type/blocks/dap_chain_cs_blocks.c index c822b9827d9a288e5cc8e917fdc33d47bcc194bb..01fc8e8c3c484ca2df6b188408da22f683b63e90 100644 --- a/modules/type/blocks/dap_chain_cs_blocks.c +++ b/modules/type/blocks/dap_chain_cs_blocks.c @@ -1314,25 +1314,18 @@ static dap_chain_atom_verify_res_t s_callback_atom_add(dap_chain_t * a_chain, da debug_if(s_debug_more, L_DEBUG, "... %s is already present", l_block_cache->block_hash_str); pthread_rwlock_unlock(&PVT(l_blocks)->rwlock); return ATOM_PASS; - } else { - l_block_cache = dap_chain_block_cache_new(&l_block_hash, l_block, l_block_size, PVT(l_blocks)->blocks_count + 1); - if (!l_block_cache) { - log_it(L_DEBUG, "... corrupted block"); - pthread_rwlock_unlock(&PVT(l_blocks)->rwlock); - return ATOM_REJECT; - } - debug_if(s_debug_more, L_DEBUG, "... new block %s", l_block_cache->block_hash_str); } - dap_chain_atom_verify_res_t ret = s_callback_atom_verify(a_chain, a_atom, a_atom_size); switch (ret) { case ATOM_ACCEPT: + l_block_cache = dap_chain_block_cache_new(&l_block_hash, l_block, l_block_size, PVT(l_blocks)->blocks_count + 1); + if (!l_block_cache) + break; + debug_if(s_debug_more, L_DEBUG, "... new block %s", l_block_cache->block_hash_str); HASH_ADD(hh, PVT(l_blocks)->blocks, block_hash, sizeof(l_block_cache->block_hash), l_block_cache); ++PVT(l_blocks)->blocks_count; - pthread_rwlock_unlock(&PVT(l_blocks)->rwlock); debug_if(s_debug_more, L_DEBUG, "Verified atom %p: ACCEPTED", a_atom); - s_add_atom_datums(l_blocks, l_block_cache); - return ret; + break; case ATOM_MOVE_TO_THRESHOLD: // TODO: reimplement and enable threshold for blocks /* { @@ -1342,7 +1335,6 @@ static dap_chain_atom_verify_res_t s_callback_atom_add(dap_chain_t * a_chain, da */ ret = ATOM_REJECT; case ATOM_REJECT: - dap_chain_block_cache_delete(l_block_cache); debug_if(s_debug_more, L_DEBUG, "Verified atom %p: REJECTED", a_atom); break; default: @@ -1350,6 +1342,8 @@ static dap_chain_atom_verify_res_t s_callback_atom_add(dap_chain_t * a_chain, da break; } pthread_rwlock_unlock(&PVT(l_blocks)->rwlock); + if (ret == ATOM_ACCEPT) + s_add_atom_datums(l_blocks, l_block_cache); return ret; } diff --git a/modules/type/dag/dap_chain_cs_dag.c b/modules/type/dag/dap_chain_cs_dag.c index 3bbad15c16162aa5dc6803d8e5c9f8da110d1796..80bb4674224923a2c0cb413f41081f5a975694e6 100644 --- a/modules/type/dag/dap_chain_cs_dag.c +++ b/modules/type/dag/dap_chain_cs_dag.c @@ -205,7 +205,7 @@ static bool s_dag_rounds_events_iter(dap_global_db_instance_t *a_dbi, const size_t a_values_current, const size_t a_values_count, dap_store_obj_t *a_values, void *a_arg) { - dap_return_val_if_pass(a_rc != DAP_GLOBAL_DB_RC_SUCCESS, false); + dap_return_val_if_pass(a_rc == DAP_GLOBAL_DB_RC_ERROR, false); for (size_t i = 0; i < a_values_count; i++) { dap_store_obj_t *l_obj_cur = a_values + i; @@ -323,7 +323,7 @@ static int s_chain_cs_dag_new(dap_chain_t * a_chain, dap_config_t * a_chain_cfg) l_dag->gdb_group_events_round_new = dap_strdup_printf(l_dag->is_celled ? "dag-%s-%s-%016llx-round.new" : "dag-%s-%s-round.new", l_net->pub.gdb_groups_prefix, a_chain->name, 0LLU); dap_global_db_cluster_t *l_dag_cluster = dap_global_db_cluster_add(dap_global_db_instance_get_default(), - l_dag->gdb_group_events_round_new, l_dag->gdb_group_events_round_new, + NULL, 0, l_dag->gdb_group_events_round_new, 900, true, DAP_GDB_MEMBER_ROLE_NOBODY, DAP_CLUSTER_ROLE_AUTONOMIC); dap_global_db_cluster_add_notify_callback(l_dag_cluster, s_round_changes_notify, l_dag); dap_chain_net_add_poa_certs_to_cluster(l_net, l_dag_cluster); @@ -497,30 +497,18 @@ static dap_chain_atom_verify_res_t s_chain_callback_atom_add(dap_chain_t * a_cha dap_chain_cs_dag_t * l_dag = DAP_CHAIN_CS_DAG(a_chain); dap_chain_cs_dag_event_t * l_event = (dap_chain_cs_dag_event_t *) a_atom; - dap_chain_cs_dag_event_item_t * l_event_item = DAP_NEW_Z(dap_chain_cs_dag_event_item_t); - if (!l_event_item) { - log_it(L_CRITICAL, "Memory allocation error"); - return ATOM_REJECT; - } - pthread_mutex_t *l_events_mutex = &PVT(l_dag)->events_mutex; - l_event_item->event = l_event; - l_event_item->event_size = a_atom_size; - l_event_item->ts_added = dap_time_now(); - + dap_chain_cs_dag_event_item_t *l_event_item = NULL; dap_chain_hash_fast_t l_event_hash; dap_chain_cs_dag_event_calc_hash(l_event, a_atom_size, &l_event_hash); - l_event_item->hash = l_event_hash; - - if(s_debug_more) { + if (s_debug_more) { char l_event_hash_str[DAP_CHAIN_HASH_FAST_STR_SIZE] = { '\0' }; dap_chain_hash_fast_to_str(&l_event_item->hash, l_event_hash_str, sizeof(l_event_hash_str)); log_it(L_DEBUG, "Processing event: %s ... (size %zd)", l_event_hash_str,a_atom_size); } - - pthread_mutex_lock(l_events_mutex); + pthread_mutex_lock(&PVT(l_dag)->events_mutex); // check if we already have this event - dap_chain_atom_verify_res_t ret = s_dap_chain_check_if_event_is_present(PVT(l_dag)->events, &l_event_item->hash) || - s_dap_chain_check_if_event_is_present(PVT(l_dag)->events_treshold, &l_event_item->hash) ? ATOM_PASS : ATOM_ACCEPT; + dap_chain_atom_verify_res_t ret = s_dap_chain_check_if_event_is_present(PVT(l_dag)->events, &l_event_hash) || + s_dap_chain_check_if_event_is_present(PVT(l_dag)->events_treshold, &l_event_hash) ? ATOM_PASS : ATOM_ACCEPT; // verify hashes and consensus switch (ret) { @@ -534,13 +522,28 @@ static dap_chain_atom_verify_res_t s_chain_callback_atom_add(dap_chain_t * a_cha break; case ATOM_PASS: debug_if(s_debug_more, L_DEBUG, "Atom already present"); - DAP_DELETE(l_event_item); - pthread_mutex_unlock(l_events_mutex); + pthread_mutex_unlock(&PVT(l_dag)->events_mutex); return ret; default: break; } + l_event_item = DAP_NEW_Z(dap_chain_cs_dag_event_item_t); + if (!l_event_item) { + log_it(L_CRITICAL, "Memory allocation error"); + pthread_mutex_unlock(&PVT(l_dag)->events_mutex); + return ATOM_REJECT; + } + l_event_item->event = DAP_DUP_SIZE(a_atom, a_atom_size); + if (!l_event_item->event) { + log_it(L_CRITICAL, "Memory allocation error"); + pthread_mutex_unlock(&PVT(l_dag)->events_mutex); + return ATOM_REJECT; + } + l_event_item->event_size = a_atom_size; + l_event_item->ts_added = dap_time_now(); + l_event_item->hash = l_event_hash; + switch (ret) { case ATOM_MOVE_TO_THRESHOLD: { dap_chain_cs_dag_blocked_t *el = NULL; @@ -588,10 +591,13 @@ static dap_chain_atom_verify_res_t s_chain_callback_atom_add(dap_chain_t * a_cha s_dag_events_lasts_process_new_last_event(l_dag, l_event_item); } break; default: - DAP_DELETE(l_event_item); // Neither added, nor freed break; } - pthread_mutex_unlock(l_events_mutex); + pthread_mutex_unlock(&PVT(l_dag)->events_mutex); + if (ret == ATOM_REJECT) { // Neither added, nor freed + DAP_DELETE(l_event_item->event); + DAP_DELETE(l_event_item); + } return ret; } @@ -724,28 +730,30 @@ static bool s_chain_callback_datums_pool_proc(dap_chain_t *a_chain, dap_chain_da log_it(L_ERROR,"Can't create new event!"); return false; } - + dap_hash_fast_t l_event_hash; + dap_hash_fast(l_event, l_event_size, &l_event_hash); + bool l_res = false; if (l_dag->is_add_directly) { - dap_chain_atom_verify_res_t l_verify_res; - switch (l_verify_res = s_chain_callback_atom_add(a_chain, l_event, l_event_size)) { - case ATOM_ACCEPT: - return dap_chain_atom_save(a_chain, (uint8_t *)l_event, l_event_size, a_chain->cells->id) > 0; - default: + dap_chain_atom_verify_res_t l_verify_res = s_chain_callback_atom_add(a_chain, l_event, l_event_size); + if (l_verify_res == ATOM_ACCEPT) + l_res = dap_chain_atom_save(a_chain->cells, (uint8_t *)l_event, l_event_size, &l_event_hash) > 0; + else log_it(L_ERROR, "Can't add new event to the file, atom verification result %d", l_verify_res); - return false; - } + DAP_DELETE(l_event); + return l_res; } dap_global_db_set_sync(l_dag->gdb_group_events_round_new, DAG_ROUND_CURRENT_KEY, &l_current_round, sizeof(uint64_t), false); dap_chain_cs_dag_event_round_item_t l_round_item = { .round_info.datum_hash = l_datum_hash }; - char *l_event_hash_str; - dap_get_data_hash_str_static(l_event, l_event_size, l_event_hash_str); - bool l_res = dap_chain_cs_dag_event_gdb_set(l_dag, l_event_hash_str, l_event, l_event_size, &l_round_item); + char *l_event_hash_hex_str = DAP_NEW_STACK_SIZE(char, DAP_CHAIN_HASH_FAST_STR_SIZE); + dap_chain_hash_fast_to_str(&l_event_hash, l_event_hash_hex_str, DAP_CHAIN_HASH_FAST_STR_SIZE); + l_res = dap_chain_cs_dag_event_gdb_set(l_dag, l_event_hash_hex_str, l_event, l_event_size, &l_round_item) == DAP_GLOBAL_DB_RC_SUCCESS; + DAP_DELETE(l_event); log_it(l_res ? L_INFO : L_ERROR, l_res ? "Event %s placed in the new forming round [id %"DAP_UINT64_FORMAT_U"]" : "Can't add new event [%s] to the new events round [id %"DAP_UINT64_FORMAT_U"]", - l_event_hash_str, l_current_round); + l_event_hash_hex_str, l_current_round); return l_res; } @@ -1538,9 +1546,7 @@ static int s_cli_dag(int argc, char ** argv, void **a_str_reply) dap_string_append_printf( l_str_ret_tmp, "Event %s verification passed\n", l_objs[i].key); // If not verify only mode we add if ( ! l_verify_only ){ - dap_chain_atom_ptr_t l_new_atom = DAP_DUP_SIZE(l_event, l_event_size); // produce deep copy of event; - if(s_chain_callback_atom_add(l_chain, l_new_atom, l_event_size) < 0) { // Add new atom in chain - DAP_DELETE(l_new_atom); + if (s_chain_callback_atom_add(l_chain, l_event, l_event_size) != ATOM_ACCEPT) { // Add new atom in chain dap_string_append_printf(l_str_ret_tmp, "Event %s not added in chain\n", l_objs[i].key); } else { // add event to delete diff --git a/modules/type/none/dap_chain_cs_none.c b/modules/type/none/dap_chain_cs_none.c index acdfb121599fc21167216490ccc67b8cde46d4ae..87669c1d41d672721c2af599f1f361f644e7bc54 100644 --- a/modules/type/none/dap_chain_cs_none.c +++ b/modules/type/none/dap_chain_cs_none.c @@ -168,7 +168,8 @@ static int s_cs_callback_new(dap_chain_t *a_chain, dap_config_t UNUSED_ARG *a_ch l_nochain_priv->group_datums = dap_chain_net_get_gdb_group_nochain_new(a_chain); // Add group prefix that will be tracking all changes dap_global_db_cluster_t *l_nonconsensus_cluster = - dap_global_db_cluster_add(dap_global_db_instance_get_default(), l_net->pub.name, + dap_global_db_cluster_add(dap_global_db_instance_get_default(), + l_net->pub.name, l_net->pub.id.uint64, l_nochain_priv->group_datums, 0, true, DAP_GDB_MEMBER_ROLE_USER, DAP_CLUSTER_ROLE_EMBEDDED); if (!l_nonconsensus_cluster) { @@ -373,7 +374,7 @@ static dap_chain_atom_verify_res_t s_nonconsensus_callback_atom_add(dap_chain_t dap_chain_datum_t *l_datum = (dap_chain_datum_t*) a_atom; dap_hash_fast_t l_datum_hash; dap_hash_fast(l_datum->data, l_datum->header.data_size, &l_datum_hash); - if(dap_chain_datum_add(a_chain, l_datum, a_atom_size, &l_datum_hash)) + if (dap_chain_datum_add(a_chain, l_datum, a_atom_size, &l_datum_hash)) return ATOM_REJECT; dap_nonconsensus_datum_hash_item_t * l_hash_item = DAP_NEW_Z(dap_nonconsensus_datum_hash_item_t);