diff --git a/3rdparty/wolfssl/CMakeLists.txt b/3rdparty/wolfssl/CMakeLists.txt index e9889976e2b519cdd5d6954ceb46cbf89a37b923..550c2b74b20840f19fda3f1ed0b016727de273c8 100644 --- a/3rdparty/wolfssl/CMakeLists.txt +++ b/3rdparty/wolfssl/CMakeLists.txt @@ -44,6 +44,7 @@ set(LIBTOOL_FULL_VERSION ${LIBTOOL_SO_VERSION}.${LIBTOOL_AGE}.${LIBTOOL_REVISION set(WOLFSSL_DEFINITIONS) set(WOLFSSL_LINK_LIBS) +#set(WOLFSSL_INSTALL_LIBS) list(APPEND WOLFSSL_DEFINITIONS "-DWOLFSSL_STATIC_RSA") list(APPEND WOLFSSL_DEFINITIONS "-DWOLFSSL_STATIC_PSK") @@ -1133,7 +1134,7 @@ else() endif() set(WOLFSSL_EXAMPLES_HELP_STRING "Enable examples (default: enabled)") -option(WOLFSSL_EXAMPLES ${WOLFSSL_EXAMPLES_HELP_STRING} ${EXAMPLES_DEFAULT}) +option(WOLFSSL_EXAMPLES ${WOLFSSL_EXAMPLES_HELP_STRING} "no") #${EXAMPLES_DEFAULT}) if(NOT WOLFSSL_FILESYSTEM OR NOT WOLFSSL_INLINE OR @@ -1149,7 +1150,7 @@ else() endif() set(WOLFSSL_CRYPT_TESTS_HELP_STRING "Enable Crypt Bench/Test (default: enabled)") -option(WOLFSSL_CRYPT_TESTS ${WOLFSSL_CRYPT_TESTS_HELP_STRING} ${CRYPT_TESTS_DEFAULT}) +option(WOLFSSL_CRYPT_TESTS ${WOLFSSL_CRYPT_TESTS_HELP_STRING} "no") #${CRYPT_TESTS_DEFAULT}) # TODO: - LIBZ # - PKCS#11 @@ -1465,3 +1466,68 @@ if(WOLFSSL_CRYPT_TESTS) PROPERTY RUNTIME_OUTPUT_NAME benchmark) endif() + +if(WOLFSSL_INSTALL_LIBS) + #################################################### + # Installation + #################################################### + + include(GNUInstallDirs) + + set(EXCLUDED_HEADERS_REGEX + "(internal|\ + options|\ + pic32mz-crypt|\ + ti-hash|\ + ti-ccm|\ + nrf51|\ + ksdk_port|\ + dcp_port|\ + xil-sha3|\ + caam_driver|\ + wolfcaam|\ + wolfcaam_sha|\ + stm32|\ + stsafe|\ + esp32-cry|\ + cryptoCell|\ + renesas-tsip-crypt|\ + psoc6_crypto).h") + + set(INSTALLED_EXAMPLES + ${CMAKE_CURRENT_SOURCE_DIR}/examples/echoserver/echoserver.c + ${CMAKE_CURRENT_SOURCE_DIR}/examples/sctp/sctp-server.c + ${CMAKE_CURRENT_SOURCE_DIR}/examples/sctp/sctp-client-dtls.c + ${CMAKE_CURRENT_SOURCE_DIR}/examples/sctp/sctp-client.c + ${CMAKE_CURRENT_SOURCE_DIR}/examples/sctp/sctp-server-dtls.c + ${CMAKE_CURRENT_SOURCE_DIR}/examples/echoclient/echoclient.c + ${CMAKE_CURRENT_SOURCE_DIR}/examples/server/server.c + ${CMAKE_CURRENT_SOURCE_DIR}/examples/benchmark/tls_bench.c + ${CMAKE_CURRENT_SOURCE_DIR}/examples/client/client.c) + + # Install the library + install(TARGETS wolfssl + DESTINATION ${CMAKE_INSTALL_LIBDIR} + EXPORT wolfssl-targets + LIBRARY) + # Install the headers + install(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/wolfssl/ + DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}/wolfssl + FILES_MATCHING PATTERN "*.h" + REGEX ${EXCLUDED_HEADERS_REGEX} EXCLUDE) + # Install the examples + install(FILES ${INSTALLED_EXAMPLES} + DESTINATION ${CMAKE_INSTALL_DOCDIR}/example) + # Install README.txt and taoCert.txt + install(FILES + ${CMAKE_CURRENT_SOURCE_DIR}/doc/README.txt + ${CMAKE_CURRENT_SOURCE_DIR}/certs/taoCert.txt + DESTINATION ${CMAKE_INSTALL_DOCDIR}/wolfssl) + # Install the export set + install(EXPORT wolfssl-targets + DESTINATION ${CMAKE_INSTALL_LIBDIR}/cmake/wolfssl + FILE wolfssl-config.cmake) + + # TODO: Distro build + rules for what to include in the distro. + # See various include.am files. +endif() diff --git a/CMakeLists.txt b/CMakeLists.txt index 16a384bc489b9e8f9ef6b472b5b39bdcfe707333..241e576efd0d1108735b74ba11daf0f9ae5d97ce 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,7 +2,7 @@ project(cellframe-sdk C) cmake_minimum_required(VERSION 3.0) set(CMAKE_C_STANDARD 11) -set(CELLFRAME_SDK_NATIVE_VERSION "2.9-2") +set(CELLFRAME_SDK_NATIVE_VERSION "2.9-3") add_definitions ("-DCELLFRAME_SDK_VERSION=\"${CELLFRAME_SDK_NATIVE_VERSION}\"") set(DAPSDK_MODULES "") message("Cellframe modules: ${CELLFRAME_MODULES}") diff --git a/dap-sdk/net/core/dap_worker.c b/dap-sdk/net/core/dap_worker.c index da7237b0ddef01c8bef5adc4fabce9c14ad5a863..a48574c66e66ce8192658c6f1c36124764142d69 100644 --- a/dap-sdk/net/core/dap_worker.c +++ b/dap-sdk/net/core/dap_worker.c @@ -61,8 +61,7 @@ #define LOG_TAG "dap_worker" -// temporary too big timout for no closing sockets opened to not keep alive peers -static time_t s_connection_timeout = 20000; // 60; // seconds +static time_t s_connection_timeout = 60; // seconds static bool s_debug_reactor=true; static bool s_socket_all_check_activity( void * a_arg); diff --git a/dap-sdk/net/stream/ch/dap_stream_ch_pkt.c b/dap-sdk/net/stream/ch/dap_stream_ch_pkt.c index 57a737341688ca92abac34544fd71e0009cb943b..c060766e5ade6ac0c3aee61683d4b89f28a3bcb5 100644 --- a/dap-sdk/net/stream/ch/dap_stream_ch_pkt.c +++ b/dap-sdk/net/stream/ch/dap_stream_ch_pkt.c @@ -237,6 +237,10 @@ size_t dap_stream_ch_pkt_write_unsafe(dap_stream_ch_t * a_ch, uint8_t a_type, c log_it(L_WARNING, "Channel is NULL ptr"); return 0; } + if (!a_ch->proc) { + log_it(L_WARNING, "Channel PROC is NULL ptr"); + return 0; + } //log_it(L_DEBUG,"Output: Has %u bytes of %c type for %c channel id",data_size, (char)type, (char) ch->proc->id ); dap_stream_ch_pkt_hdr_t l_hdr; diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c index b4b4825a75e99dfd6a0884af93b6ce41e36677de..99e76a7ff7a02802896afbe355eb7f6fc2a71e09 100644 --- a/modules/channel/chain/dap_stream_ch_chain.c +++ b/modules/channel/chain/dap_stream_ch_chain.c @@ -75,9 +75,12 @@ struct sync_request dap_stream_ch_t * ch; dap_stream_ch_chain_sync_request_t request; dap_stream_ch_chain_pkt_hdr_t request_hdr; - dap_list_t *pkt_list; + dap_chain_pkt_item_t pkt; - dap_stream_ch_chain_hash_item_t * remote_atoms; + dap_stream_ch_chain_update_element_t *local_gdbs; + uint64_t local_gdbs_count; + dap_stream_ch_chain_hash_item_t *remote_atoms; // Remote atoms + dap_stream_ch_chain_hash_item_t *remote_gdbs; // Remote gdbs uint64_t stats_request_elemets_processed; union{ @@ -91,23 +94,17 @@ struct sync_request }; }; -struct ch_chain_pkt_in -{ - dap_stream_ch_t * ch; - dap_chain_pkt_item_t * pkt; -}; - static void s_stream_ch_new(dap_stream_ch_t* a_ch, void* a_arg); static void s_stream_ch_delete(dap_stream_ch_t* a_ch, void* a_arg); static void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg); static void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg); +static void s_stream_ch_write_error_unsafe(dap_stream_ch_t *a_ch, uint64_t a_net_id, uint64_t a_chain_id, uint64_t a_cell_id, const char * a_err_string); static bool s_sync_out_chains_proc_callback(dap_proc_thread_t *a_thread, void *a_arg); static void s_sync_out_chains_last_worker_callback(dap_worker_t *a_worker, void *a_arg); static void s_sync_out_chains_first_worker_callback(dap_worker_t *a_worker, void *a_arg); static bool s_sync_out_gdb_proc_callback(dap_proc_thread_t *a_thread, void *a_arg); -static void s_sync_out_gdb_first_gdb_worker_callback(dap_worker_t *a_worker, void *a_arg); static void s_sync_out_gdb_synced_data_worker_callback(dap_worker_t *a_worker, void *a_arg); static bool s_sync_in_chains_callback(dap_proc_thread_t *a_thread, void *a_arg); @@ -160,15 +157,8 @@ void s_stream_ch_new(dap_stream_ch_t* a_ch, void* a_arg) */ static void s_sync_request_delete(struct sync_request * a_sync_request) { - if (a_sync_request->pkt_list) { - dap_list_t *l_tmp_item = a_sync_request->pkt_list; while(l_tmp_item) { - dap_chain_pkt_item_t *l_pkt_copy = (dap_chain_pkt_item_t *)l_tmp_item->data; - DAP_DELETE(l_pkt_copy->pkt_data); - DAP_DELETE(l_pkt_copy); - dap_list_t *l_trash_item = l_tmp_item; - l_tmp_item = dap_list_next(l_tmp_item); - DAP_DELETE(l_trash_item); - } + if (a_sync_request->pkt.pkt_data) { + DAP_DELETE(a_sync_request->pkt.pkt_data); } if (a_sync_request->gdb.db_iter) { @@ -328,7 +318,7 @@ static bool s_sync_out_chains_proc_callback(dap_proc_thread_t *a_thread, void *a * @param a_worker * @param a_arg */ -static void s_sync_out_gdb_first_gdb_worker_callback(dap_worker_t *a_worker, void *a_arg) +static void s_sync_out_gdb_first_worker_callback(dap_worker_t *a_worker, void *a_arg) { struct sync_request *l_sync_request = (struct sync_request *) a_arg; dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN( l_sync_request->ch ); @@ -360,11 +350,11 @@ static void s_sync_out_gdb_first_gdb_worker_callback(dap_worker_t *a_worker, voi * @param a_worker * @param a_arg */ -static void s_sync_out_gdb_synced_data_worker_callback(dap_worker_t *a_worker, void *a_arg) +static void s_sync_out_gdb_last_worker_callback(dap_worker_t *a_worker, void *a_arg) { UNUSED(a_worker); dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN( ((struct sync_request *) a_arg)->ch ); - s_sync_out_gdb_first_gdb_worker_callback(NULL,a_arg); // NULL to say callback not to delete request + s_sync_out_gdb_first_worker_callback(NULL,a_arg); // NULL to say callback not to delete request dap_stream_ch_chain_sync_request_t l_request = {0}; if (s_debug_more ) @@ -394,7 +384,8 @@ static bool s_sync_out_gdb_proc_callback(dap_proc_thread_t *a_thread, void *a_ar // Get log diff uint64_t l_local_last_id = dap_db_log_get_last_id(); if (s_debug_more) - log_it(L_DEBUG, "Sync out gdb proc, requested transactions %llu:%llu", l_sync_request->request.id_start, l_local_last_id); + log_it(L_DEBUG, "Sync out gdb proc, requested transactions %llu:%llu from address "NODE_ADDR_FP_STR, + l_sync_request->request.id_start, l_local_last_id, NODE_ADDR_FP_ARGS_S(l_sync_request->request.node_addr)); uint64_t l_start_item = l_sync_request->request.id_start; // If the current global_db has been truncated, but the remote node has not known this if(l_sync_request->request.id_start > l_local_last_id) { @@ -406,13 +397,38 @@ static bool s_sync_out_gdb_proc_callback(dap_proc_thread_t *a_thread, void *a_ar if(l_db_log) { l_sync_request->gdb.db_log = l_db_log; - dap_proc_thread_worker_exec_callback(a_thread, l_sync_request->worker->id, s_sync_out_gdb_first_gdb_worker_callback,l_sync_request ); + dap_proc_thread_worker_exec_callback(a_thread, l_sync_request->worker->id, s_sync_out_gdb_first_worker_callback,l_sync_request ); } else { - dap_proc_thread_worker_exec_callback(a_thread, l_sync_request->worker->id, s_sync_out_gdb_synced_data_worker_callback,l_sync_request ); + dap_proc_thread_worker_exec_callback(a_thread, l_sync_request->worker->id, s_sync_out_gdb_last_worker_callback,l_sync_request ); } return true; } +static void s_sync_update_gdb_start_worker_callback(dap_worker_t *a_worker, void *a_arg) +{ + UNUSED(a_worker); + struct sync_request *l_sync_request = (struct sync_request *)a_arg; + dap_stream_ch_chain_pkt_write_unsafe(l_sync_request->ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_START, + l_sync_request->request_hdr.net_id.uint64, l_sync_request->request_hdr.chain_id.uint64, + l_sync_request->request_hdr.cell_id.uint64, NULL, 0); + if (s_debug_more) + log_it(L_INFO, "Out: DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_START for net_id 0x%016"DAP_UINT64_FORMAT_x" " + "chain_id 0x%016"DAP_UINT64_FORMAT_x" cell_id 0x%016"DAP_UINT64_FORMAT_x"", + l_sync_request->request_hdr.net_id.uint64, l_sync_request->request_hdr.chain_id.uint64, l_sync_request->request_hdr.cell_id.uint64); + DAP_DELETE(l_sync_request); +} + +static bool s_sync_update_gdb_proc_callback(dap_proc_thread_t *a_thread, void *a_arg) +{ + struct sync_request *l_sync_request = (struct sync_request *)a_arg; + // TODO make a local_gdbs hash table + dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_sync_request->ch); + l_ch_chain->local_gdbs = l_sync_request->local_gdbs; + l_ch_chain->local_gdbs_count = l_sync_request->local_gdbs_count; + dap_proc_thread_worker_exec_callback(a_thread, l_sync_request->worker->id, s_sync_update_gdb_start_worker_callback, l_sync_request); + return true; +} + /** * @brief s_chain_in_pkt_callback * @param a_thread @@ -423,130 +439,110 @@ static bool s_sync_in_chains_callback(dap_proc_thread_t *a_thread, void *a_arg) { UNUSED(a_thread); struct sync_request *l_sync_request = (struct sync_request *) a_arg; + dap_chain_pkt_item_t *l_pkt_item = &l_sync_request->pkt; dap_chain_hash_fast_t l_atom_hash = {}; - if (l_sync_request->pkt_list) { - dap_chain_pkt_item_t *l_pkt_item = NULL; - dap_list_t *l_pkt_iter =l_sync_request->pkt_list; - l_pkt_item = (dap_chain_pkt_item_t *)l_pkt_iter->data; - l_sync_request->pkt_list = l_sync_request->pkt_list->next; - if (l_sync_request->pkt_list ){ - l_sync_request->pkt_list->prev = NULL; + if (l_pkt_item->pkt_data_size) { + dap_chain_t *l_chain = dap_chain_find_by_id(l_sync_request->request_hdr.net_id, l_sync_request->request_hdr.chain_id); + if (!l_chain) { + if (s_debug_more) + log_it(L_WARNING, "No chain found for DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN"); + return true; } - if (l_pkt_item){ - dap_chain_t *l_chain = dap_chain_find_by_id(l_pkt_item->pkt_hdr.net_id, l_pkt_item->pkt_hdr.chain_id); - if (!l_chain) { - if (s_debug_more) - log_it(L_WARNING, "No chain found for DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN"); - return true; - } - dap_chain_atom_ptr_t l_atom_copy = (dap_chain_atom_ptr_t)l_pkt_item->pkt_data; - uint64_t l_atom_copy_size = l_pkt_item->pkt_data_size; - if ( l_atom_copy_size && l_pkt_item && l_atom_copy ){ - pthread_rwlock_wrlock(&l_chain->atoms_rwlock); - dap_hash_fast(l_atom_copy, l_atom_copy_size, &l_atom_hash); - dap_chain_atom_iter_t *l_atom_iter = l_chain->callback_atom_iter_create(l_chain); - size_t l_atom_size =0; - if ( l_chain->callback_atom_find_by_hash(l_atom_iter, &l_atom_hash, &l_atom_size) == NULL ) { - dap_chain_atom_verify_res_t l_atom_add_res = l_chain->callback_atom_add(l_chain, l_atom_copy, l_atom_copy_size); - if ( l_atom_add_res != ATOM_REJECT && dap_chain_has_file_store(l_chain)) { - if (s_debug_more){ - char l_atom_hash_str[72]={[0]='\0'}; - dap_chain_hash_fast_to_str(&l_atom_hash,l_atom_hash_str,sizeof (l_atom_hash_str)-1 ); - log_it(L_INFO,"Accepted atom with hash %s for %s:%s", l_atom_hash_str, l_chain->net_name, l_chain->name); - } + dap_chain_atom_ptr_t l_atom_copy = (dap_chain_atom_ptr_t)l_pkt_item->pkt_data; + uint64_t l_atom_copy_size = l_pkt_item->pkt_data_size; + if ( l_atom_copy_size && l_pkt_item && l_atom_copy ){ + pthread_rwlock_wrlock(&l_chain->atoms_rwlock); + dap_hash_fast(l_atom_copy, l_atom_copy_size, &l_atom_hash); + dap_chain_atom_iter_t *l_atom_iter = l_chain->callback_atom_iter_create(l_chain); + size_t l_atom_size =0; + if ( l_chain->callback_atom_find_by_hash(l_atom_iter, &l_atom_hash, &l_atom_size) == NULL ) { + dap_chain_atom_verify_res_t l_atom_add_res = l_chain->callback_atom_add(l_chain, l_atom_copy, l_atom_copy_size); + if ( l_atom_add_res != ATOM_REJECT && dap_chain_has_file_store(l_chain)) { + if (s_debug_more){ + char l_atom_hash_str[72]={[0]='\0'}; + dap_chain_hash_fast_to_str(&l_atom_hash,l_atom_hash_str,sizeof (l_atom_hash_str)-1 ); + log_it(L_INFO,"Accepted atom with hash %s for %s:%s", l_atom_hash_str, l_chain->net_name, l_chain->name); + } - // append to file - dap_chain_cell_t *l_cell = dap_chain_cell_create_fill(l_chain, l_pkt_item->pkt_hdr.cell_id); - int l_res; - if (l_cell) { - // add one atom only - l_res = dap_chain_cell_file_append(l_cell, l_atom_copy, l_atom_copy_size); - // rewrite all file - //l_res = dap_chain_cell_file_update(l_cell); - if(l_res < 0) { - log_it(L_ERROR, "Can't save event 0x%x to the file '%s'", l_atom_hash, - l_cell ? l_cell->file_storage_path : "[null]"); - } else { - dap_db_set_last_hash_remote(l_sync_request->request.node_addr.uint64, l_chain, &l_atom_hash); - } - // add all atoms from treshold - if (l_chain->callback_atom_add_from_treshold){ - dap_chain_atom_ptr_t l_atom_treshold; - do{ - size_t l_atom_treshold_size; - // add into ledger - if (s_debug_more) - log_it(L_DEBUG, "Try to add atom from treshold"); - l_atom_treshold = l_chain->callback_atom_add_from_treshold(l_chain, &l_atom_treshold_size); - // add into file - if(l_atom_treshold) { - l_res = dap_chain_cell_file_append(l_cell, l_atom_treshold, l_atom_treshold_size); - log_it(L_INFO, "Added atom from treshold"); - if(l_res < 0) { - log_it(L_ERROR, "Can't save event 0x%x from treshold to the file '%s'", - l_atom_treshold, l_cell ? l_cell->file_storage_path : "[null]"); - } + // append to file + dap_chain_cell_t *l_cell = dap_chain_cell_create_fill(l_chain, l_sync_request->request_hdr.cell_id); + int l_res; + if (l_cell) { + // add one atom only + l_res = dap_chain_cell_file_append(l_cell, l_atom_copy, l_atom_copy_size); + // rewrite all file + //l_res = dap_chain_cell_file_update(l_cell); + if(l_res < 0) { + log_it(L_ERROR, "Can't save event 0x%x to the file '%s'", l_atom_hash, + l_cell ? l_cell->file_storage_path : "[null]"); + } else { + dap_db_set_last_hash_remote(l_sync_request->request.node_addr.uint64, l_chain, &l_atom_hash); + } + // add all atoms from treshold + if (l_chain->callback_atom_add_from_treshold){ + dap_chain_atom_ptr_t l_atom_treshold; + do{ + size_t l_atom_treshold_size; + // add into ledger + if (s_debug_more) + log_it(L_DEBUG, "Try to add atom from treshold"); + l_atom_treshold = l_chain->callback_atom_add_from_treshold(l_chain, &l_atom_treshold_size); + // add into file + if(l_atom_treshold) { + l_res = dap_chain_cell_file_append(l_cell, l_atom_treshold, l_atom_treshold_size); + log_it(L_INFO, "Added atom from treshold"); + if(l_res < 0) { + log_it(L_ERROR, "Can't save event 0x%x from treshold to the file '%s'", + l_atom_treshold, l_cell ? l_cell->file_storage_path : "[null]"); } } - while(l_atom_treshold); } - - // delete cell and close file - dap_chain_cell_delete(l_cell); + while(l_atom_treshold); } - else{ - log_it(L_ERROR, "Can't get cell for cell_id 0x%x for save event to file", l_pkt_item->pkt_hdr.cell_id); - } - }else if(l_atom_add_res == ATOM_PASS){ - if (s_debug_more){ - char l_atom_hash_str[72]={[0]='\0'}; - dap_chain_hash_fast_to_str(&l_atom_hash,l_atom_hash_str,sizeof (l_atom_hash_str)-1 ); - log_it(L_WARNING,"Not accepted atom (code ATOM_PASS) with hash %s for %s:%s and moved into the treshold", l_atom_hash_str, l_chain->net_name, l_chain->name); - } - }else{ - if (s_debug_more){ - char l_atom_hash_str[72]={[0]='\0'}; - dap_chain_hash_fast_to_str(&l_atom_hash,l_atom_hash_str,sizeof (l_atom_hash_str)-1 ); - log_it(L_WARNING,"Not accepted atom (code %d) with hash %s for %s:%s", l_atom_add_res, l_atom_hash_str, l_chain->net_name, l_chain->name); - } + // delete cell and close file + dap_chain_cell_delete(l_cell); } - DAP_DEL_Z(l_atom_copy); - } else { + else{ + log_it(L_ERROR, "Can't get cell for cell_id 0x%x for save event to file", l_sync_request->request_hdr.cell_id); + + } + }else if(l_atom_add_res == ATOM_PASS){ + if (s_debug_more){ + char l_atom_hash_str[72]={[0]='\0'}; + dap_chain_hash_fast_to_str(&l_atom_hash,l_atom_hash_str,sizeof (l_atom_hash_str)-1 ); + log_it(L_WARNING,"Not accepted atom (code ATOM_PASS) with hash %s for %s:%s and moved into the treshold", l_atom_hash_str, l_chain->net_name, l_chain->name); + } + }else{ if (s_debug_more){ char l_atom_hash_str[72]={[0]='\0'}; dap_chain_hash_fast_to_str(&l_atom_hash,l_atom_hash_str,sizeof (l_atom_hash_str)-1 ); - log_it(L_WARNING,"Already has atom with hash %s ", l_atom_hash_str); + log_it(L_WARNING,"Not accepted atom (code %d) with hash %s for %s:%s", l_atom_add_res, l_atom_hash_str, l_chain->net_name, l_chain->name); } - dap_db_set_last_hash_remote(l_sync_request->request.node_addr.uint64, l_chain, &l_atom_hash); - DAP_DELETE(l_atom_copy); } - l_chain->callback_atom_iter_delete(l_atom_iter); - pthread_rwlock_unlock(&l_chain->atoms_rwlock); - }else{ - if (!l_pkt_item) - log_it(L_WARNING, "chain packet item is NULL"); - if (l_atom_copy_size) - log_it(L_WARNING, "chain packet item data size is zero"); + DAP_DEL_Z(l_atom_copy); + } else { + if (s_debug_more){ + char l_atom_hash_str[72]={[0]='\0'}; + dap_chain_hash_fast_to_str(&l_atom_hash,l_atom_hash_str,sizeof (l_atom_hash_str)-1 ); + log_it(L_WARNING,"Already has atom with hash %s ", l_atom_hash_str); + } + dap_db_set_last_hash_remote(l_sync_request->request.node_addr.uint64, l_chain, &l_atom_hash); + DAP_DELETE(l_atom_copy); } + l_chain->callback_atom_iter_delete(l_atom_iter); + pthread_rwlock_unlock(&l_chain->atoms_rwlock); }else{ - log_it(L_WARNING, "pkt copy is NULL"); + if (!l_pkt_item) + log_it(L_WARNING, "chain packet item is NULL"); + if (l_atom_copy_size) + log_it(L_WARNING, "chain packet item data size is zero"); } - if (l_pkt_item) - DAP_DELETE(l_pkt_item); - - if (l_pkt_iter) - DAP_DELETE(l_pkt_iter); }else log_it(L_WARNING, "In proc thread got CHAINS stream ch packet with zero data"); - - if(l_sync_request->pkt_list){ - return false; - }else{ - DAP_DELETE(l_sync_request); - return true; - } + DAP_DELETE(l_sync_request); + return true; } /** @@ -557,13 +553,14 @@ static bool s_sync_in_chains_callback(dap_proc_thread_t *a_thread, void *a_arg) static void s_gdb_in_pkt_error_worker_callback(dap_worker_t *a_worker, void *a_arg) { UNUSED(a_worker); - struct ch_chain_pkt_in * l_pkt_in = (struct ch_chain_pkt_in*) a_arg; - dap_stream_ch_chain_pkt_write_error_unsafe(l_pkt_in->ch, l_pkt_in->pkt->pkt_hdr.net_id.uint64, - l_pkt_in->pkt->pkt_hdr.chain_id.uint64, l_pkt_in->pkt->pkt_hdr.cell_id.uint64, - "ERROR_GLOBAL_DB_INTERNAL_NOT_SAVED"); - dap_stream_ch_set_ready_to_write_unsafe(l_pkt_in->ch, true); - DAP_DELETE(l_pkt_in); + struct sync_request *l_sync_request = (struct sync_request *) a_arg; + + dap_stream_ch_chain_pkt_write_error_unsafe(l_sync_request->ch, l_sync_request->request_hdr.net_id.uint64, + l_sync_request->request_hdr.chain_id.uint64, + l_sync_request->request_hdr.cell_id.uint64, + "ERROR_GLOBAL_DB_INTERNAL_NOT_SAVED"); + DAP_DELETE(l_sync_request); } @@ -576,14 +573,9 @@ static void s_gdb_in_pkt_error_worker_callback(dap_worker_t *a_worker, void *a_a static bool s_gdb_in_pkt_proc_callback(dap_proc_thread_t *a_thread, void *a_arg) { struct sync_request *l_sync_request = (struct sync_request *) a_arg; + dap_chain_pkt_item_t *l_pkt_item = &l_sync_request->pkt; - dap_list_t *l_pkt_iter = l_sync_request->pkt_list; - if (l_pkt_iter) { - l_sync_request->pkt_list =l_sync_request->pkt_list->next; - if (l_sync_request->pkt_list ) - l_sync_request->pkt_list->prev = NULL; - - dap_chain_pkt_item_t *l_pkt_item = (dap_chain_pkt_item_t *)l_pkt_iter->data; + if (l_pkt_item->pkt_data_size) { size_t l_data_obj_count = 0; // deserialize data & Parse data from dap_db_log_pack() dap_store_obj_t *l_store_obj = dap_db_log_unpack(l_pkt_item->pkt_data, l_pkt_item->pkt_data_size, &l_data_obj_count); @@ -654,7 +646,7 @@ static bool s_gdb_in_pkt_proc_callback(dap_proc_thread_t *a_thread, void *a_arg) } // apply received transaction - dap_chain_t *l_chain = dap_chain_find_by_id(l_pkt_item->pkt_hdr.net_id, l_pkt_item->pkt_hdr.chain_id); + dap_chain_t *l_chain = dap_chain_find_by_id(l_sync_request->request_hdr.net_id, l_sync_request->request_hdr.chain_id); if(l_chain) { if(l_chain->callback_add_datums_with_group){ void * restrict l_store_obj_value = l_store_obj->value; @@ -665,10 +657,10 @@ static bool s_gdb_in_pkt_proc_callback(dap_proc_thread_t *a_thread, void *a_arg) } // save data to global_db if(!dap_chain_global_db_obj_save(l_obj, 1)) { - struct ch_chain_pkt_in * l_pkt_in = DAP_NEW_Z(struct ch_chain_pkt_in); - l_pkt_in->ch = l_sync_request->ch; - l_pkt_in->pkt = l_pkt_item; - dap_proc_thread_worker_exec_callback(a_thread, l_sync_request->worker->id,s_gdb_in_pkt_error_worker_callback, l_sync_request); + if(l_store_obj) + dap_store_obj_free(l_store_obj, l_data_obj_count); + dap_proc_thread_worker_exec_callback(a_thread, l_sync_request->worker->id, s_gdb_in_pkt_error_worker_callback, l_sync_request); + return true; } else { // If request was from defined node_addr we update its state if(l_sync_request->request.node_addr.uint64) { @@ -680,21 +672,11 @@ static bool s_gdb_in_pkt_proc_callback(dap_proc_thread_t *a_thread, void *a_arg) } if(l_store_obj) dap_store_obj_free(l_store_obj, l_data_obj_count); - if (l_pkt_item) - DAP_DELETE(l_pkt_item); - if (l_pkt_iter) - DAP_DELETE(l_pkt_iter); - if(l_sync_request->pkt_list) - return false; - else{ - DAP_DELETE(l_sync_request); - return true; - } - } else { log_it(L_WARNING, "In proc thread got GDB stream ch packet with zero data"); - return true; } + DAP_DELETE(l_sync_request); + return true; } /** @@ -702,21 +684,27 @@ static bool s_gdb_in_pkt_proc_callback(dap_proc_thread_t *a_thread, void *a_arg) * @param a_ch_chain * @param a_net */ -void dap_stream_ch_chain_create_sync_request_gdb(dap_stream_ch_chain_t * a_ch_chain, dap_chain_net_t * a_net) +struct sync_request *dap_stream_ch_chain_create_sync_request(dap_stream_ch_chain_pkt_t *a_chain_pkt, dap_stream_ch_t* a_ch) { - a_ch_chain->is_on_request = true; - memset(&a_ch_chain->request_hdr,0,sizeof (a_ch_chain->request_hdr)); - a_ch_chain->request_hdr.net_id = a_net->pub.id; - + dap_stream_ch_chain_t * l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch); + memcpy(&l_ch_chain->request, a_chain_pkt->data, sizeof(l_ch_chain->request)); + memcpy(&l_ch_chain->request_hdr, &a_chain_pkt->hdr, sizeof(l_ch_chain->request_hdr)); struct sync_request *l_sync_request = DAP_NEW_Z(struct sync_request); - l_sync_request->ch = a_ch_chain->ch; - l_sync_request->worker = a_ch_chain->ch->stream_worker->worker; - memcpy(&l_sync_request->request, &a_ch_chain->request, sizeof (a_ch_chain->request)); - memcpy(&l_sync_request->request_hdr, &a_ch_chain->request_hdr, sizeof (a_ch_chain->request_hdr)); - dap_proc_queue_add_callback_inter(a_ch_chain->ch->stream_worker->worker->proc_queue_input, s_sync_out_gdb_proc_callback, - l_sync_request); + l_sync_request->ch = a_ch; + l_sync_request->worker = a_ch->stream_worker->worker; + l_sync_request->remote_gdbs = l_ch_chain->remote_gdbs; + l_sync_request->remote_atoms = l_ch_chain->remote_atoms; + memcpy(&l_sync_request->request, &l_ch_chain->request, sizeof(l_ch_chain->request)); + memcpy(&l_sync_request->request_hdr, &l_ch_chain->request_hdr, sizeof(l_ch_chain->request_hdr)); + return l_sync_request; } +static void s_stream_ch_write_error_unsafe(dap_stream_ch_t *a_ch, uint64_t a_net_id, uint64_t a_chain_id, uint64_t a_cell_id, const char * a_err_string) +{ + dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch); + dap_stream_ch_chain_go_idle(l_ch_chain); + dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, a_net_id, a_chain_id, a_cell_id, a_err_string); +} /** * @brief s_stream_ch_packet_in @@ -754,7 +742,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) a_ch->stream->esocket->remote_addr_str: "<unknown>", l_chain_pkt->hdr.ext_id, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64); - dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, "ERROR_NET_INVALID_ID"); // Who are you? I don't know you! go away! @@ -766,7 +754,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) log_it(L_WARNING, "Unauthorized request attempt from %s to network %s", a_ch->stream->esocket->remote_addr_str? a_ch->stream->esocket->remote_addr_str: "<unknown>", dap_chain_net_by_id(l_chain_pkt->hdr.net_id)->pub.name); - dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, "ERROR_NET_NOT_AUTHORIZED"); return; @@ -775,18 +763,57 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) /// --- GDB update --- // Request for gdbs list update case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_REQ:{ + if(l_ch_chain->state != CHAIN_STATE_IDLE){ + log_it(L_WARNING, "Can't process UPDATE_GLOBAL_DB_REQ request because its already busy with syncronization"); + s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, + "ERROR_SYNC_REQUEST_ALREADY_IN_PROCESS"); + break; + } + if(s_debug_more) + log_it(L_INFO, "In: UPDATE_GLOBAL_DB_REQ pkt: net 0x%016x chain 0x%016x cell 0x%016x", l_chain_pkt->hdr.net_id.uint64 , + l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64); + l_ch_chain->state = CHAIN_STATE_UPDATE_GLOBAL_DB; + struct sync_request *l_sync_request = dap_stream_ch_chain_create_sync_request(l_chain_pkt, a_ch); + l_ch_chain->stats_request_gdb_processed = 0; + dap_proc_queue_add_callback_inter(a_ch->stream_worker->worker->proc_queue_input, s_sync_update_gdb_proc_callback, l_sync_request); }break; // Response with metadata organized in TSD case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_TSD:{ }break; - // If requested - begin to send atom hashes + // If requested - begin to recieve record's hashes case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_START:{ + if (s_debug_more) + log_it(L_INFO, "In: UPDATE_GLOBAL_DB_START pkt net 0x%016x chain 0x%016x cell 0x%016x", l_chain_pkt->hdr.net_id.uint64 , + l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64); + if(l_ch_chain->state != CHAIN_STATE_IDLE){ + log_it(L_WARNING, "Can't process UPDATE_GLOBAL_DB_START request because its already busy with syncronization"); + s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, + "ERROR_SYNC_REQUEST_ALREADY_IN_PROCESS"); + break; + } + memcpy(&l_ch_chain->request_hdr, &l_chain_pkt->hdr, sizeof(dap_stream_ch_chain_pkt_t)); l_ch_chain->state = CHAIN_STATE_UPDATE_GLOBAL_DB_REMOTE; - dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); + dap_stream_ch_chain_hash_item_t *l_hash_item = NULL, *l_tmp = NULL; + HASH_ITER(hh, l_ch_chain->remote_gdbs, l_hash_item, l_tmp) { + HASH_DEL(l_ch_chain->remote_gdbs, l_hash_item); + DAP_DELETE(l_hash_item); + } }break; // Response with gdb element hashes and sizes case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB:{ + if(s_debug_more) + log_it(L_INFO, "In: UPDATE_GLOBAL_DB pkt data_size=%d ", l_chain_pkt_data_size); + if (l_ch_chain->state != CHAIN_STATE_UPDATE_GLOBAL_DB_REMOTE || + memcmp(&l_ch_chain->request_hdr, &l_chain_pkt->hdr, sizeof(dap_stream_ch_chain_pkt_t))) { + log_it(L_WARNING, "Can't process UPDATE_GLOBAL_DB request because its already busy with syncronization"); + s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, + "ERROR_SYNC_REQUEST_ALREADY_IN_PROCESS"); + break; + } for ( dap_stream_ch_chain_update_element_t * l_element =(dap_stream_ch_chain_update_element_t *) l_chain_pkt->data; (size_t) (((byte_t*)l_element) - l_chain_pkt->data ) < l_chain_pkt_data_size; l_element++){ @@ -805,43 +832,138 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) } } }break; - // End of response - case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_END:{ - l_ch_chain->state = CHAIN_STATE_UPDATE_CHAINS_REMOTE ; // Switch on update chains hashes - dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_REQ , - l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, - l_ch_chain->request_hdr.cell_id.uint64, NULL, 0); - }break; + // End of response with starting of DB sync + case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB: + case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_END: { + if(l_chain_pkt_data_size == sizeof(dap_stream_ch_chain_sync_request_t)) { + if (l_ch_pkt->hdr.type == DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB && l_ch_chain->state != CHAIN_STATE_IDLE) { + log_it(L_WARNING, "Can't process SYNC_GLOBAL_DB request because not in idle state"); + s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, + "ERROR_STATE_NOT_IN_IDLE"); + break; + } + if (l_ch_pkt->hdr.type == DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_END && + (l_ch_chain->state != CHAIN_STATE_UPDATE_GLOBAL_DB_REMOTE || + memcmp(&l_ch_chain->request_hdr, &l_chain_pkt->hdr, sizeof(dap_stream_ch_chain_pkt_t)))) { + log_it(L_WARNING, "Can't process UPDATE_GLOBAL_DB_END request because its already busy with syncronization"); + s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, + "ERROR_SYNC_REQUEST_ALREADY_IN_PROCESS"); + break; + } + if(s_debug_more) + { + if (l_ch_pkt->hdr.type == DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_END) + log_it(L_INFO, "In: UPDATE_GLOBAL_DB_END pkt"); + else + log_it(L_INFO, "In: SYNC_GLOBAL_DB pkt"); + } + struct sync_request *l_sync_request = dap_stream_ch_chain_create_sync_request(l_chain_pkt, a_ch); + l_ch_chain->state = CHAIN_STATE_SYNC_GLOBAL_DB; + dap_proc_queue_add_callback_inter(a_ch->stream_worker->worker->proc_queue_input, s_sync_out_gdb_proc_callback, l_sync_request); + }else{ + log_it(L_WARNING, "DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB: Wrong chain packet size %zd when expected %zd", l_chain_pkt_data_size, sizeof(l_ch_chain->request)); + s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, + "ERROR_CHAIN_PKT_DATA_SIZE" ); + } + } break; + // first packet of data with source node address + case DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB: { + if(l_chain_pkt_data_size == sizeof(dap_chain_node_addr_t)){ + memcpy(&l_ch_chain->request.node_addr, l_chain_pkt->data, l_chain_pkt_data_size); + l_ch_chain->stats_request_gdb_processed = 0; + log_it(L_INFO, "In: FIRST_GLOBAL_DB data_size=%d net 0x%016x chain 0x%016x cell 0x%016x from address "NODE_ADDR_FP_STR, + l_chain_pkt_data_size, l_chain_pkt->hdr.net_id.uint64 , + l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, NODE_ADDR_FP_ARGS_S(l_ch_chain->request.node_addr) ); + }else { + log_it(L_WARNING,"Incorrect data size %zd in packet DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB", l_chain_pkt_data_size); + s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, + "ERROR_CHAIN_PACKET_TYPE_FIRST_GLOBAL_DB_INCORRET_DATA_SIZE"); + } + } break; + + case DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB: { + if(s_debug_more) + log_it(L_INFO, "In: GLOBAL_DB data_size=%d ", l_chain_pkt_data_size); + // get transaction and save it to global_db + if(l_chain_pkt_data_size > 0) { + struct sync_request *l_sync_request = dap_stream_ch_chain_create_sync_request(l_chain_pkt, a_ch); + dap_chain_pkt_item_t *l_pkt_item = &l_sync_request->pkt; + l_pkt_item->pkt_data = DAP_NEW_Z_SIZE(byte_t, l_chain_pkt_data_size); + memcpy(l_pkt_item->pkt_data, l_chain_pkt->data, l_chain_pkt_data_size); + l_pkt_item->pkt_data_size = l_chain_pkt_data_size; + dap_proc_queue_add_callback_inter(a_ch->stream_worker->worker->proc_queue_input, s_gdb_in_pkt_proc_callback, l_sync_request); + } else { + log_it(L_WARNING, "Packet with GLOBAL_DB atom has zero body size"); + s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, + "ERROR_GLOBAL_DB_PACKET_EMPTY"); + } + } break; + + case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB: { + log_it(L_INFO, "In: SYNCED_GLOBAL_DB: net 0x%016x chain 0x%016x cell 0x%016x", l_chain_pkt->hdr.net_id.uint64 , + l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64); + if (!l_ch_chain->callback_notify_packet_in) { // we haven't node client waitng, so reply to other side + dap_stream_ch_chain_sync_request_t l_sync_gdb = {}; + l_sync_gdb.id_start = dap_db_get_last_id_remote(l_sync_gdb.node_addr.uint64); + dap_chain_net_t *l_net = dap_chain_net_by_id(l_chain_pkt->hdr.net_id); + l_sync_gdb.node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net); + dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_REQ, l_chain_pkt->hdr.net_id.uint64, + l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, &l_sync_gdb, sizeof(l_sync_gdb)); + } + } break; + + case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB_RVRS: { + dap_stream_ch_chain_sync_request_t l_sync_gdb = {}; + l_sync_gdb.id_start = dap_db_get_last_id_remote(l_sync_gdb.node_addr.uint64); + dap_chain_net_t *l_net = dap_chain_net_by_id(l_chain_pkt->hdr.net_id); + l_sync_gdb.node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net); + log_it(L_INFO, "In: SYNC_GLOBAL_DB_RVRS pkt: net 0x%016x chain 0x%016x cell 0x%016x, request gdb sync from %u", l_chain_pkt->hdr.net_id.uint64 , + l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id, l_sync_gdb.id_start ); + dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB, l_chain_pkt->hdr.net_id.uint64, + l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, &l_sync_gdb, sizeof(l_sync_gdb)); + } break; + + case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB_GROUP: { + if (s_debug_more) + log_it(L_INFO, "In: SYNCED_GLOBAL_DB_GROUP pkt net 0x%016x chain 0x%016x cell 0x%016x", l_chain_pkt->hdr.net_id.uint64 , + l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64); + } break; + case DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB_GROUP: { + if (s_debug_more) + log_it(L_INFO, "In: SYNCED_GLOBAL_DB_GROUP pkt net 0x%016x chain 0x%016x cell 0x%016x", l_chain_pkt->hdr.net_id.uint64 , + l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64); + } break; /// --- Chains update --- // Request for atoms list update case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_REQ:{ - if(l_ch_chain->is_on_request){ + if (l_ch_chain->state != CHAIN_STATE_IDLE) { log_it(L_WARNING, "Can't process UPDATE_CHAINS_REQ request because its already busy with syncronization"); - dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, "ERROR_SYNC_REQUEST_ALREADY_IN_PROCESS"); - dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); break; } - - l_ch_chain->is_on_request=true; - memcpy(&l_ch_chain->request_hdr, &l_chain_pkt->hdr, sizeof(l_chain_pkt->hdr)); - if(s_debug_more) - log_it(L_INFO, "In: UPDATE_CHAINS_REQ pkt: net 0x%016x chain 0x%016x cell 0x%016x", l_ch_chain->request_hdr.net_id.uint64 , - l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64); + log_it(L_INFO, "In: UPDATE_CHAINS_REQ pkt: net 0x%016x chain 0x%016x cell 0x%016x", l_chain_pkt->hdr.net_id.uint64 , + l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64); dap_chain_t * l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id); - if(l_chain) { - struct sync_request *l_sync_request = DAP_NEW_Z(struct sync_request); - l_sync_request->ch = a_ch; - l_sync_request->worker = a_ch->stream_worker->worker; - l_sync_request->remote_atoms = l_ch_chain->remote_atoms; - l_ch_chain->remote_atoms = NULL; - - memcpy(&l_sync_request->request, &l_ch_chain->request, sizeof (l_ch_chain->request)); - memcpy(&l_sync_request->request_hdr, &l_ch_chain->request_hdr, sizeof (l_ch_chain->request_hdr)); - dap_proc_queue_add_callback_inter( a_ch->stream_worker->worker->proc_queue_input, s_sync_out_chains_proc_callback, l_sync_request ); + if (l_chain) { + l_ch_chain->state = CHAIN_STATE_UPDATE_CHAINS; + if(s_debug_more) + log_it(L_INFO, "Out: UPDATE_CHAINS_START pkt: net %s chain %s cell 0x%016x", l_chain->name, + l_chain->net_name, l_chain_pkt->hdr.cell_id.uint64); + l_ch_chain->request_atom_iter = l_chain->callback_atom_iter_create(l_chain); + l_chain->callback_atom_iter_get_first(l_ch_chain->request_atom_iter, NULL); + memcpy(&l_ch_chain->request_hdr, &l_chain_pkt->hdr, sizeof(dap_stream_ch_chain_pkt_hdr_t)); + dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_START, + l_chain_pkt->hdr.net_id.uint64,l_chain_pkt->hdr.chain_id.uint64, + l_chain_pkt->hdr.cell_id.uint64, NULL, 0); } }break; // Response with metadata organized in TSD @@ -850,34 +972,37 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) }break; // If requested - begin to send atom hashes - case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_END: case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_START:{ - if(s_debug_more) - log_it(L_INFO,"In: Requested update chains start"); - memcpy(&l_ch_chain->request_hdr, &l_chain_pkt->hdr , sizeof (l_ch_chain->request_hdr)); - - if(l_ch_pkt->hdr.type == DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_END ) - l_ch_chain->request_updates_complete = true; - l_ch_chain->state = CHAIN_STATE_UPDATE_CHAINS_REMOTE; - - dap_chain_t * l_chain = dap_chain_find_by_id(l_ch_chain->request_hdr.net_id, l_ch_chain->request_hdr.chain_id); - if (l_ch_chain){ - l_ch_chain->request_atom_iter = l_chain->callback_atom_iter_create(l_chain); - l_chain->callback_atom_iter_get_first(l_ch_chain->request_atom_iter, NULL); - dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); - }else{ + if (l_ch_chain->state != CHAIN_STATE_IDLE) { + log_it(L_WARNING, "Can't process UPDATE_CHAINS_START request because its already busy with syncronization"); + s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, + "ERROR_SYNC_REQUEST_ALREADY_IN_PROCESS"); + break; + } + dap_chain_t * l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id); + if (!l_chain) { log_it(L_ERROR, "Invalid UPDATE_CHAINS_START request from %s with ext_id %016"DAP_UINT64_FORMAT_x" net id 0x%016"DAP_UINT64_FORMAT_x" chain id 0x%016"DAP_UINT64_FORMAT_x" cell_id 0x%016"DAP_UINT64_FORMAT_x" in packet", a_ch->stream->esocket->remote_addr_str? a_ch->stream->esocket->remote_addr_str: "<unknown>", l_chain_pkt->hdr.ext_id, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64); - dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, "ERROR_NET_INVALID_ID"); // Who are you? I don't know you! go away! a_ch->stream->esocket->flags |= DAP_SOCK_SIGNAL_CLOSE; + break; } - - }break; + l_ch_chain->state = CHAIN_STATE_UPDATE_CHAINS_REMOTE; + memcpy(&l_ch_chain->request_hdr, &l_chain_pkt->hdr, sizeof(dap_stream_ch_chain_pkt_hdr_t)); + dap_stream_ch_chain_hash_item_t *l_hash_item = NULL, *l_tmp = NULL; + HASH_ITER(hh, l_ch_chain->remote_atoms, l_hash_item, l_tmp) { + HASH_DEL(l_ch_chain->remote_atoms, l_hash_item); + DAP_DELETE(l_hash_item); + } + if(s_debug_more) + log_it(L_INFO,"In: UPDATE_CHAINS_START pkt"); + } break; // Response with atom hashes and sizes case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS :{ @@ -890,17 +1015,13 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) a_ch->stream->esocket->remote_addr_str: "<unknown>", l_chain_pkt->hdr.ext_id, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64); - dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, "ERROR_NET_INVALID_ID"); // Who are you? I don't know you! go away! a_ch->stream->esocket->flags |= DAP_SOCK_SIGNAL_CLOSE; break; } - - dap_chain_atom_iter_t * l_iter = l_chain->callback_atom_iter_create(l_chain); - - for ( dap_stream_ch_chain_update_element_t * l_element =(dap_stream_ch_chain_update_element_t *) l_chain_pkt->data; (size_t) (((byte_t*)l_element) - l_chain_pkt->data ) < l_chain_pkt_data_size; l_element++){ @@ -923,176 +1044,92 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) } if (s_debug_more) log_it(L_INFO,"In: Added %u from %u remote atom hash in list",l_count_added,l_count_total); - l_chain->callback_atom_iter_delete(l_iter); - }break; - // End of response - //case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_END:{ - // l_ch_chain->state = CHAIN_STATE_UPDATE_CHAINS_REMOTE; // Switch on update chains hashes to remote - // dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); - //}break; - - // first packet of data with source node address - case DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_CHAIN: { - l_ch_chain->stats_request_atoms_processed =0; - if(l_chain_pkt_data_size == sizeof(dap_chain_node_addr_t)){ - log_it(L_INFO, "From "NODE_ADDR_FP_STR": FIRST_CHAIN data_size=%d net 0x%016x chain 0x%016x cell 0x%016x ", - NODE_ADDR_FP_ARGS_S(l_ch_chain->request.node_addr), - l_chain_pkt_data_size, l_ch_chain->request_hdr.net_id.uint64 , - l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64); - }else{ - log_it(L_WARNING,"Incorrect data size %zd in packet DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_CHAIN", l_chain_pkt_data_size); - dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, - l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, - "ERROR_CHAIN_PACKET_TYPE_FIRST_CHAIN_INCORRET_DATA_SIZE(%zd/%zd)",l_chain_pkt_data_size, sizeof(dap_chain_node_addr_t)); - } - } - break; - - case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_ALL: { - log_it(L_INFO, "In: SYNCED_ALL net 0x%016x chain 0x%016x cell 0x%016x", l_chain_pkt->hdr.net_id.uint64 , - l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64); - } - break; - case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB: { - log_it(L_INFO, "In: SYNCED_GLOBAL_DB: net 0x%016x chain 0x%016x cell 0x%016x", l_chain_pkt->hdr.net_id.uint64 , - l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64); - } - break; - case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB_GROUP: { - if (s_debug_more) - log_it(L_INFO, "In: SYNCED_GLOBAL_DB_GROUP pkt net 0x%016x chain 0x%016x cell 0x%016x", l_chain_pkt->hdr.net_id.uint64 , - l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64); - } - break; - case DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB_GROUP: { - if (s_debug_more) - log_it(L_INFO, "In: SYNCED_GLOBAL_DB_GROUP pkt net 0x%016x chain 0x%016x cell 0x%016x", l_chain_pkt->hdr.net_id.uint64 , - l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64); - } - break; - - case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS: { - l_ch_chain->request_updates_complete = false; - if (dap_log_level_get()<= L_INFO){ - char *l_hash_from_str = dap_chain_hash_fast_to_str_new(&l_ch_chain->request.hash_from); - char *l_hash_to_str = dap_chain_hash_fast_to_str_new(&l_ch_chain->request.hash_to); - log_it(L_INFO, "In: SYNCED_CHAINS: between %s and %s",l_hash_from_str?l_hash_from_str:"(null)", - l_hash_to_str? l_hash_to_str: "(null)"); - if(l_hash_from_str) - DAP_DELETE(l_hash_from_str); - if(l_hash_to_str) - DAP_DELETE(l_hash_to_str); - } - //l_ch_chain->state = CHAIN_STATE_IDLE; + } break; - } - break; + case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_END: case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS: { - // fill ids if(l_chain_pkt_data_size == sizeof(dap_stream_ch_chain_sync_request_t)) { - if(l_ch_chain->is_on_request){ - log_it(L_WARNING, "Can't process SYNC_CHAINS request because its already busy with syncronization"); - dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + if (l_ch_pkt->hdr.type == DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS && l_ch_chain->state != CHAIN_STATE_IDLE) { + log_it(L_WARNING, "Can't process SYNC_CHAINS request because not in idle state"); + s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, + "ERROR_STATE_NOT_IN_IDLE"); + break; + } + if (l_ch_pkt->hdr.type == DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_END && + (l_ch_chain->state != CHAIN_STATE_UPDATE_CHAINS_REMOTE || + memcmp(&l_ch_chain->request_hdr, &l_chain_pkt->hdr, sizeof(dap_stream_ch_chain_pkt_t)))) { + log_it(L_WARNING, "Can't process UPDATE_CHAINS_END request because its already busy with syncronization"); + s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, "ERROR_SYNC_REQUEST_ALREADY_IN_PROCESS"); - dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); break; } - l_ch_chain->is_on_request=true; - memcpy(&l_ch_chain->request, l_chain_pkt->data, l_chain_pkt_data_size); - memcpy(&l_ch_chain->request_hdr, &l_chain_pkt->hdr, sizeof(l_chain_pkt->hdr)); + dap_chain_t * l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id); + if (!l_chain) { + log_it(L_ERROR, "Invalid UPDATE_CHAINS packet from %s with ext_id %016"DAP_UINT64_FORMAT_x" net id 0x%016"DAP_UINT64_FORMAT_x" chain id 0x%016"DAP_UINT64_FORMAT_x" cell_id 0x%016"DAP_UINT64_FORMAT_x" in packet", a_ch->stream->esocket->remote_addr_str? + a_ch->stream->esocket->remote_addr_str: "<unknown>", l_chain_pkt->hdr.ext_id, + l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, + l_chain_pkt->hdr.cell_id.uint64); + s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, + "ERROR_NET_INVALID_ID"); + break; + } + if(s_debug_more) + { + if (l_ch_pkt->hdr.type == DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_END) + log_it(L_INFO, "In: UPDATE_CHAINS_END pkt"); + else + log_it(L_INFO, "In: SYNC_CHAINS pkt"); + } + struct sync_request *l_sync_request = dap_stream_ch_chain_create_sync_request(l_chain_pkt, a_ch); + l_ch_chain->state = CHAIN_STATE_SYNC_CHAINS; char *l_hash_from_str = dap_chain_hash_fast_to_str_new(&l_ch_chain->request.hash_from); char *l_hash_to_str = dap_chain_hash_fast_to_str_new(&l_ch_chain->request.hash_to); log_it(L_INFO, "In: SYNC_CHAINS pkt: net 0x%016x chain 0x%016x cell 0x%016x between %s and %s", l_ch_chain->request_hdr.net_id.uint64 , l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64, l_hash_from_str? l_hash_from_str: "(null)", l_hash_to_str?l_hash_to_str:"(null)"); - dap_chain_t * l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id); - if(l_chain) { - if(l_ch_chain->state != CHAIN_STATE_IDLE) { - l_ch_chain->is_on_request=false; - log_it(L_INFO, "Can't process SYNC_CHAINS request between %s and %s because not in idle state", - l_hash_from_str? l_hash_from_str:"(null)", - l_hash_to_str?l_hash_to_str:"(null)"); - dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, - l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, - "ERROR_STATE_NOT_IN_IDLE"); - dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); - } else { - struct sync_request *l_sync_request = DAP_NEW_Z(struct sync_request); - l_sync_request->ch = a_ch; - l_sync_request->worker = a_ch->stream_worker->worker; - memcpy(&l_sync_request->request, &l_ch_chain->request, sizeof (l_ch_chain->request)); - memcpy(&l_sync_request->request_hdr, &l_ch_chain->request_hdr, sizeof (l_ch_chain->request_hdr)); - dap_proc_queue_add_callback_inter( a_ch->stream_worker->worker->proc_queue_input, s_sync_out_chains_proc_callback, l_sync_request ); - } - } DAP_DELETE(l_hash_from_str); DAP_DELETE(l_hash_to_str); - }else{ + dap_proc_queue_add_callback_inter(a_ch->stream_worker->worker->proc_queue_input, s_sync_out_chains_proc_callback, l_sync_request); + } else { log_it(L_WARNING, "DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS: Wrong chain packet size %zd when expected %zd", l_chain_pkt_data_size, sizeof(l_ch_chain->request)); - dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, "ERROR_CHAIN_PKT_DATA_SIZE" ); } - } - break; - case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB: { - if(l_chain_pkt_data_size == sizeof(dap_stream_ch_chain_sync_request_t)) { - if(l_ch_chain->state != CHAIN_STATE_IDLE) { - log_it(L_WARNING, "Can't process SYNC_GLOBAL_DB request because not in idle state"); - dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, - l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, - "ERROR_STATE_NOT_IN_IDLE"); - dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); - break; - } else { // receive the latest global_db revision of the remote node -> go to send mode - if(l_ch_chain->is_on_request){ - log_it(L_WARNING, "Can't process SYNC_CHAINS request because its already busy with syncronization"); - dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, - l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, - "ERROR_SYNC_REQUEST_ALREADY_IN_PROCESS"); - dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); - break; - } - l_ch_chain->is_on_request = true; - dap_stream_ch_chain_sync_request_t * l_request = - (dap_stream_ch_chain_sync_request_t *) l_chain_pkt->data; - memcpy(&l_ch_chain->request, l_request, l_chain_pkt_data_size); - memcpy(&l_ch_chain->request_hdr, &l_chain_pkt->hdr, sizeof(l_chain_pkt->hdr)); - log_it(L_INFO, "In: SYNC_GLOBAL_DB pkt: net 0x%016x chain 0x%016x cell 0x%016x, range between %u and %u", - l_ch_chain->request_hdr.net_id.uint64 , l_ch_chain->request_hdr.chain_id.uint64, - l_ch_chain->request_hdr.cell_id.uint64, l_ch_chain->request.id_start, l_ch_chain->request.id_end ); - - dap_stream_ch_chain_create_sync_request_gdb(l_ch_chain,dap_chain_net_by_id(l_ch_chain->request_hdr.net_id) ); - } + } break; + // first packet of data with source node address + case DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_CHAIN: { + if(l_chain_pkt_data_size == sizeof(dap_chain_node_addr_t)){ + memcpy(&l_ch_chain->request_hdr, &l_chain_pkt->hdr, sizeof(dap_stream_ch_chain_pkt_hdr_t)); + memcpy(&l_ch_chain->request.node_addr, l_chain_pkt->data, sizeof(dap_chain_node_addr_t)); + log_it(L_INFO, "From "NODE_ADDR_FP_STR": FIRST_CHAIN data_size=%d net 0x%016x chain 0x%016x cell 0x%016x ", + NODE_ADDR_FP_ARGS_S(l_ch_chain->request.node_addr), + l_chain_pkt_data_size, l_ch_chain->request_hdr.net_id.uint64 , + l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64); + l_ch_chain->stats_request_atoms_processed = 0; }else{ - log_it(L_WARNING, "DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB: Wrong chain packet size %zd when expected %zd", l_chain_pkt_data_size, sizeof(l_ch_chain->request)); - dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + log_it(L_WARNING,"Incorrect data size %zd in packet DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_CHAIN", l_chain_pkt_data_size); + s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, - "ERROR_CHAIN_PKT_DATA_SIZE" ); + "ERROR_CHAIN_PACKET_TYPE_FIRST_CHAIN_INCORRET_DATA_SIZE"); } - } - break; + } break; + case DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN: { if(l_chain_pkt_data_size) { dap_chain_t * l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id); if(l_chain) { // Expect atom element in if(l_chain_pkt_data_size > 0) { - dap_chain_pkt_item_t *l_pkt_item = DAP_NEW_Z(dap_chain_pkt_item_t); - memcpy(&l_pkt_item->pkt_hdr, &l_chain_pkt->hdr, sizeof(l_chain_pkt->hdr)); + struct sync_request *l_sync_request = dap_stream_ch_chain_create_sync_request(l_chain_pkt, a_ch); + dap_chain_pkt_item_t *l_pkt_item = &l_sync_request->pkt; l_pkt_item->pkt_data = DAP_NEW_Z_SIZE(byte_t, l_chain_pkt_data_size); memcpy(l_pkt_item->pkt_data, l_chain_pkt->data, l_chain_pkt_data_size); l_pkt_item->pkt_data_size = l_chain_pkt_data_size; - - struct sync_request *l_sync_request = DAP_NEW_Z(struct sync_request); - l_sync_request->ch = a_ch; - l_sync_request->worker = a_ch->stream_worker->worker; - memcpy(&l_sync_request->request, &l_ch_chain->request, sizeof (l_ch_chain->request)); - memcpy(&l_sync_request->request_hdr, &l_ch_chain->request_hdr, sizeof (l_ch_chain->request_hdr)); - - l_sync_request->pkt_list = dap_list_append(l_sync_request->pkt_list, l_pkt_item); if (s_debug_more){ dap_chain_hash_fast_t l_atom_hash={0}; dap_hash_fast(l_chain_pkt->data, l_chain_pkt_data_size ,&l_atom_hash); @@ -1103,68 +1140,43 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) dap_proc_queue_add_callback_inter(a_ch->stream_worker->worker->proc_queue_input, s_sync_in_chains_callback, l_sync_request); } else { log_it(L_WARNING, "Empty chain packet"); - dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, "ERROR_CHAIN_PACKET_EMPTY"); - dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); } } } - } - break; - // first packet of data with source node address - case DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB: - if(l_chain_pkt_data_size == sizeof(dap_chain_node_addr_t)){ - memcpy(&l_ch_chain->request.node_addr, l_chain_pkt->data, l_chain_pkt_data_size); - log_it(L_INFO, "In: FIRST_GLOBAL_DB data_size=%d net 0x%016x chain 0x%016x cell 0x%016x from address "NODE_ADDR_FP_STR, - l_chain_pkt_data_size, l_chain_pkt->hdr.net_id.uint64 , - l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, NODE_ADDR_FP_ARGS_S(l_ch_chain->request.node_addr) ); - }else { - log_it(L_WARNING,"Incorrect data size %zd in packet DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB", l_chain_pkt_data_size); - dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, - l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, - "ERROR_CHAIN_PACKET_TYPE_FIRST_GLOBAL_DB_INCORRET_DATA_SIZE"); - } - break; - case DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB: { - if(s_debug_more) - log_it(L_INFO, "In: GLOBAL_DB data_size=%d ", l_chain_pkt_data_size); - // get transaction and save it to global_db - if(l_chain_pkt_data_size > 0) { - dap_chain_pkt_item_t *l_pkt_item = DAP_NEW_Z(dap_chain_pkt_item_t); - memcpy(&l_pkt_item->pkt_hdr, &l_chain_pkt->hdr, sizeof(l_chain_pkt->hdr)); - l_pkt_item->pkt_data = DAP_NEW_Z_SIZE(byte_t, l_chain_pkt_data_size); - memcpy(l_pkt_item->pkt_data, l_chain_pkt->data, l_chain_pkt_data_size); - l_pkt_item->pkt_data_size = l_chain_pkt_data_size; + } break; - struct sync_request *l_sync_request = DAP_NEW_Z(struct sync_request); - l_sync_request->ch = a_ch; - l_sync_request->worker = a_ch->stream_worker->worker; - memcpy(&l_sync_request->request, &l_ch_chain->request, sizeof (l_ch_chain->request)); - memcpy(&l_sync_request->request_hdr, &l_ch_chain->request_hdr, sizeof (l_ch_chain->request_hdr)); - l_sync_request->pkt_list = dap_list_append(l_sync_request->pkt_list, l_pkt_item); - dap_proc_queue_add_callback_inter(a_ch->stream_worker->worker->proc_queue_input, s_gdb_in_pkt_proc_callback, l_sync_request); - } else { - log_it(L_WARNING, "Packet with GLOBAL_DB atom has zero body size"); - dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, - l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, - "ERROR_GLOBAL_DB_PACKET_EMPTY"); - dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); + case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS: { + if (dap_log_level_get()<= L_INFO){ + char *l_hash_from_str = dap_chain_hash_fast_to_str_new(&l_ch_chain->request.hash_from); + char *l_hash_to_str = dap_chain_hash_fast_to_str_new(&l_ch_chain->request.hash_to); + log_it(L_INFO, "In: SYNCED_CHAINS: between %s and %s",l_hash_from_str?l_hash_from_str:"(null)", + l_hash_to_str? l_hash_to_str: "(null)"); + if(l_hash_from_str) + DAP_DELETE(l_hash_from_str); + if(l_hash_to_str) + DAP_DELETE(l_hash_to_str); } - } - break; - case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB_RVRS: { - dap_stream_ch_chain_sync_request_t l_sync_gdb = {0}; - memcpy(&l_sync_gdb, l_chain_pkt->data, l_chain_pkt_data_size); - l_sync_gdb.id_start = dap_db_get_last_id_remote(l_sync_gdb.node_addr.uint64); - dap_chain_net_t *l_net = dap_chain_net_by_id(l_chain_pkt->hdr.net_id); - l_sync_gdb.node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net); - log_it(L_INFO, "In: SYNC_GLOBAL_DB_RVRS pkt: net 0x%016x chain 0x%016x cell 0x%016x, request gdb sync from %u", l_chain_pkt->hdr.net_id.uint64 , - l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id, l_sync_gdb.id_start ); - dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB, l_chain_pkt->hdr.net_id.uint64, - l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, &l_sync_gdb, sizeof(l_sync_gdb)); - } - break; + if (!l_ch_chain->callback_notify_packet_in) { // we haven't node client waitng, so reply to other side + dap_chain_t *l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id); + if (!l_chain) { + log_it(L_ERROR, "Invalid UPDATE_CHAINS packet from %s with ext_id %016"DAP_UINT64_FORMAT_x" net id 0x%016"DAP_UINT64_FORMAT_x" chain id 0x%016"DAP_UINT64_FORMAT_x" cell_id 0x%016"DAP_UINT64_FORMAT_x" in packet", a_ch->stream->esocket->remote_addr_str? + a_ch->stream->esocket->remote_addr_str: "<unknown>", l_chain_pkt->hdr.ext_id, + l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, + l_chain_pkt->hdr.cell_id.uint64); + s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, + "ERROR_NET_INVALID_ID"); + break; + } + dap_stream_ch_chain_sync_request_t l_request= {}; + dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_REQ, l_chain_pkt->hdr.net_id.uint64, + l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, &l_request, sizeof(l_request)); + } + } break; + case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS_RVRS: { if(l_chain_pkt_data_size == sizeof(dap_stream_ch_chain_sync_request_t)) { dap_stream_ch_chain_sync_request_t l_request={0}; @@ -1183,12 +1195,12 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) } }else{ log_it(L_WARNING, "DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS_RVRS: Wrong chain packet size %zd when expected %zd", l_chain_pkt_data_size, sizeof(l_ch_chain->request)); - dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, "ERROR_CHAIN_PKT_DATA_SIZE" ); } - } - break; + } break; + case DAP_STREAM_CH_CHAIN_PKT_TYPE_ERROR:{ char * l_error_str = (char*)l_chain_pkt->data; if(l_chain_pkt_data_size>1) @@ -1196,9 +1208,15 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) // without trailing zero log_it(L_WARNING,"In: ERROR packet: '%s'",l_ch_chain->node_client, l_chain_pkt_data_size>1? l_error_str:"<empty>"); - }break; + } break; + + case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_ALL: { + log_it(L_INFO, "In: SYNCED_ALL net 0x%016x chain 0x%016x cell 0x%016x", l_chain_pkt->hdr.net_id.uint64 , + l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64); + } break; + default: { - dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, "ERROR_UNKNOWN_CHAIN_PKT_TYPE"); } @@ -1215,7 +1233,6 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) */ void dap_stream_ch_chain_go_idle ( dap_stream_ch_chain_t * a_ch_chain) { - a_ch_chain->is_on_request = false; a_ch_chain->state = CHAIN_STATE_IDLE; if(s_debug_more) log_it(L_INFO, "Go in CHAIN_STATE_IDLE"); @@ -1238,6 +1255,7 @@ static void s_process_gdb_iter(dap_stream_ch_t *a_ch) dap_db_log_list_t *l_db_list = l_ch_chain->request_global_db_trs; dap_store_obj_pkt_t *l_pkt = (dap_store_obj_pkt_t *)l_ch_chain->request_db_iter->data; uint32_t l_pkt_size = sizeof(dap_store_obj_pkt_t) + l_pkt->data_size; + // TODO find current record hash and compare it with hash table if( s_debug_more) log_it(L_INFO, "Send one global_db record packet len=%d (rest=%d/%d items)", l_pkt_size, dap_db_log_list_get_count_rest(l_db_list), dap_db_log_list_get_count(l_db_list)); @@ -1267,6 +1285,33 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch); switch (l_ch_chain->state) { + case CHAIN_STATE_UPDATE_GLOBAL_DB: { + if (l_ch_chain->stats_request_gdb_processed == l_ch_chain->local_gdbs_count) { + dap_stream_ch_chain_sync_request_t l_sync_gdb = {}; + l_sync_gdb.id_start = dap_db_get_last_id_remote(l_ch_chain->request.node_addr.uint64); + dap_chain_net_t *l_net = dap_chain_net_by_id(l_ch_chain->request_hdr.net_id); + l_sync_gdb.node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net); + dap_stream_ch_chain_pkt_write_unsafe(l_ch_chain->ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_END, + l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, + l_ch_chain->request_hdr.cell_id.uint64, &l_sync_gdb, sizeof(dap_stream_ch_chain_sync_request_t)); + if (s_debug_more ) + log_it(L_INFO, "Out: DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_END"); + dap_stream_ch_chain_go_idle(l_ch_chain); + } else { + uint_fast16_t l_count = l_ch_chain->local_gdbs_count - l_ch_chain->stats_request_gdb_processed; + if (l_count > s_update_pack_size) + l_count = s_update_pack_size; + dap_stream_ch_chain_pkt_write_unsafe(l_ch_chain->ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB, + l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, + l_ch_chain->request_hdr.cell_id.uint64, + &l_ch_chain->local_gdbs[l_ch_chain->stats_request_gdb_processed], + l_count * sizeof(dap_stream_ch_chain_update_element_t)); + l_ch_chain->stats_request_gdb_processed += l_count; + if (s_debug_more) + log_it(L_INFO, "Out: DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB"); + } + } break; + // Synchronize GDB case CHAIN_STATE_SYNC_GLOBAL_DB: { if (l_ch_chain->request_db_iter) { @@ -1291,7 +1336,6 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) log_it( L_INFO,"Syncronized database: last id %llu, items syncronyzed %llu ", dap_db_log_get_last_id(), l_ch_chain->stats_request_gdb_processed ); // last message - l_ch_chain->is_on_request = false; dap_stream_ch_chain_sync_request_t l_request = {}; dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, @@ -1305,8 +1349,9 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) } break; // Update list of atoms to remote - case CHAIN_STATE_UPDATE_CHAINS_REMOTE:{ - dap_stream_ch_chain_update_element_t * l_data= DAP_NEW_Z_SIZE(dap_stream_ch_chain_update_element_t,sizeof (dap_stream_ch_chain_update_element_t)*s_update_pack_size); + case CHAIN_STATE_UPDATE_CHAINS:{ + dap_stream_ch_chain_update_element_t *l_data = DAP_NEW_Z_SIZE(dap_stream_ch_chain_update_element_t, + sizeof(dap_stream_ch_chain_update_element_t) * s_update_pack_size); size_t l_data_size=0; for(uint_fast16_t n=0; n<s_update_pack_size && (l_ch_chain->request_atom_iter && l_ch_chain->request_atom_iter->cur);n++){ memcpy(&l_data[n].hash, l_ch_chain->request_atom_iter->cur_hash, sizeof (l_data[n].hash)); @@ -1327,22 +1372,16 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) if(!l_data_size || !l_ch_chain->request_atom_iter){ // We over with all the hashes here if(s_debug_more) log_it(L_INFO,"Out: UPDATE_CHAINS_END sent "); - - if (l_ch_chain->request_updates_complete){ - dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_REQ , + dap_stream_ch_chain_sync_request_t l_request = {}; + dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_END, l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64, - NULL,0); - }else - dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_END, - l_ch_chain->request_hdr.net_id.uint64, - l_ch_chain->request_hdr.chain_id.uint64, - l_ch_chain->request_hdr.cell_id.uint64, - NULL,0); - dap_stream_ch_set_ready_to_write_unsafe(a_ch,false); - l_ch_chain->state = CHAIN_STATE_SYNC_CHAINS; + &l_request, sizeof(dap_stream_ch_chain_sync_request_t)); + dap_stream_ch_chain_go_idle(l_ch_chain); + dap_stream_ch_set_ready_to_write_unsafe(a_ch, false); } + DAP_DELETE(l_data); }break; // Synchronize chains @@ -1398,8 +1437,7 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) // Then get next atom and populate new last l_ch_chain->request_atom_iter->chain->callback_atom_iter_get_next(l_ch_chain->request_atom_iter, NULL); } - if(!l_ch_chain->request_atom_iter || - ( l_ch_chain->request_atom_iter &&(! l_ch_chain->request_atom_iter->cur) ) ) { // All chains synced + if(!l_ch_chain->request_atom_iter || !l_ch_chain->request_atom_iter->cur) { // All chains synced dap_stream_ch_chain_sync_request_t l_request = {0}; // last message l_was_sent_smth = true; diff --git a/modules/channel/chain/dap_stream_ch_chain_pkt.c b/modules/channel/chain/dap_stream_ch_chain_pkt.c index c97d1f1e663bd9b2fc4835bcb88d613dbd4dd766..c819d5ea0ce264969e74e77a8f53d49c3acc505b 100644 --- a/modules/channel/chain/dap_stream_ch_chain_pkt.c +++ b/modules/channel/chain/dap_stream_ch_chain_pkt.c @@ -49,7 +49,7 @@ dap_stream_ch_chain_state_t dap_stream_ch_chain_pkt_type_to_dap_stream_ch_chain_ * @param data_size * @return */ -size_t dap_stream_ch_chain_pkt_write_unsafe(dap_stream_ch_t *a_ch, uint8_t a_type,uint64_t a_net_id, +size_t dap_stream_ch_chain_pkt_write_unsafe(dap_stream_ch_t *a_ch, uint8_t a_type, uint64_t a_net_id, uint64_t a_chain_id, uint64_t a_cell_id, const void * a_data, size_t a_data_size) { diff --git a/modules/channel/chain/include/dap_stream_ch_chain.h b/modules/channel/chain/include/dap_stream_ch_chain.h index 06293450d584db323b7ee403088ba03caa35ae04..07cd345916bd4ac0b1567405d083743aa4539418 100644 --- a/modules/channel/chain/include/dap_stream_ch_chain.h +++ b/modules/channel/chain/include/dap_stream_ch_chain.h @@ -34,8 +34,6 @@ #include "dap_stream_ch_chain_pkt.h" #include "uthash.h" -#define DAP_CHAIN_PKT_MAX_SIZE 25000 // WARNING: be sure to not exceed this limit - typedef struct dap_stream_ch_chain dap_stream_ch_chain_t; typedef void (*dap_stream_ch_chain_callback_packet_t)(dap_stream_ch_chain_t*, uint8_t a_pkt_type, dap_stream_ch_chain_pkt_t *a_pkt, size_t a_pkt_data_size, @@ -48,7 +46,6 @@ typedef struct dap_chain_atom_item{ } dap_chain_atom_item_t; typedef struct dap_chain_pkt_item { - dap_stream_ch_chain_pkt_hdr_t pkt_hdr; uint64_t pkt_data_size; byte_t *pkt_data; } dap_chain_pkt_item_t; @@ -69,6 +66,8 @@ typedef struct dap_stream_ch_chain { uint64_t stats_request_gdb_processed; + dap_stream_ch_chain_update_element_t *local_gdbs; + uint64_t local_gdbs_count; dap_stream_ch_chain_hash_item_t * remote_atoms; // Remote atoms dap_stream_ch_chain_hash_item_t * remote_gdbs; // Remote gdbs diff --git a/modules/channel/chain/include/dap_stream_ch_chain_pkt.h b/modules/channel/chain/include/dap_stream_ch_chain_pkt.h index 9c8e352409c16ff090bb168eff77ad9d19c5ebd9..f510cec825ea8cdbea40108cda501c9cc19d2525 100644 --- a/modules/channel/chain/include/dap_stream_ch_chain_pkt.h +++ b/modules/channel/chain/include/dap_stream_ch_chain_pkt.h @@ -36,11 +36,10 @@ #include "dap_stream_ch.h" -#define DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_CHAIN 0x20 - #define DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN 0x01 #define DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB 0x11 +#define DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_CHAIN 0x20 #define DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB 0x21 #define DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB_GROUP 0x31 diff --git a/modules/global-db/dap_chain_global_db_hist.c b/modules/global-db/dap_chain_global_db_hist.c index 5b280698bf1fac474ee8d57e7503fceca03d9e1a..1429d6238ece3fa409c65261baa565bc25d6d27a 100644 --- a/modules/global-db/dap_chain_global_db_hist.c +++ b/modules/global-db/dap_chain_global_db_hist.c @@ -1338,7 +1338,8 @@ dap_db_log_list_t* dap_db_log_list_start(uint64_t first_id, dap_list_t *a_add_gr l_add_groups_mask = dap_list_next(l_add_groups_mask); } - size_t l_data_size_out_main = dap_chain_global_db_driver_count(GROUP_LOCAL_HISTORY, first_id); + size_t l_data_size_out_main = dap_db_log_get_last_id() - first_id + 1; + //dap_chain_global_db_driver_count(GROUP_LOCAL_HISTORY, first_id); - not working for sqlite size_t *l_data_size_out_add_items = DAP_NEW_Z_SIZE(size_t, sizeof(size_t) * l_add_groups_num); uint64_t *l_group_last_id = DAP_NEW_Z_SIZE(uint64_t, sizeof(uint64_t) * l_add_groups_num); char **l_group_names = DAP_NEW_Z_SIZE(char*, sizeof(char*) * l_add_groups_num); diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index efaa9ffe1542e594db802fab624a5c0491a664da..9a7bcd33518249767d8cd1455210f2a9f282475f 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -23,6 +23,17 @@ along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. */ + +#ifndef _GNU_SOURCE +#define _GNU_SOURCE +#endif +#ifndef _XOPEN_SOURCE +#define _XOPEN_SOURCE /* See feature_test_macros(7) */ +#endif +#ifndef __USE_XOPEN +#define __USE_XOPEN +#endif +#include <time.h> #include <stdio.h> #include <stdlib.h> #include <stddef.h> @@ -95,13 +106,6 @@ #include <sys/types.h> #include <dirent.h> -#define _XOPEN_SOURCE /* See feature_test_macros(7) */ -#ifndef __USE_XOPEN -#define __USE_XOPEN -#endif -#define _GNU_SOURCE -#include <time.h> - #define LOG_TAG "chain_net" #define F_DAP_CHAIN_NET_SYNC_FROM_ZERO ( 1 << 8 ) @@ -290,7 +294,6 @@ int dap_chain_net_state_go_to(dap_chain_net_t * a_net, dap_chain_net_state_t a_n PVT(a_net)->state_target = a_new_state; pthread_mutex_lock( &PVT(a_net)->state_mutex_cond); // Preventing call of state_go_to before wait cond will be armed - pthread_mutex_unlock( &PVT(a_net)->state_mutex_cond); // set flag for sync PVT(a_net)->flags |= F_DAP_CHAIN_NET_GO_SYNC; #ifndef _WIN32 @@ -298,6 +301,8 @@ int dap_chain_net_state_go_to(dap_chain_net_t * a_net, dap_chain_net_state_t a_n #else SetEvent( PVT(a_net)->state_proc_cond ); #endif + pthread_mutex_unlock( &PVT(a_net)->state_mutex_cond); + dap_proc_queue_add_callback(dap_events_worker_get_auto(), s_net_states_proc, a_net); return 0; } @@ -477,7 +482,7 @@ static void s_node_link_callback_connected(dap_chain_node_client_t * a_node_clie if(a_node_client->ch_chain_net) a_node_client->ch_chain_net_uuid = a_node_client->ch_chain_net->uuid; dap_stream_ch_chain_pkt_write_unsafe( a_node_client->ch_chain , - DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB, l_net->pub.id.uint64, + DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_REQ, l_net->pub.id.uint64, l_chain_id.uint64, l_net->pub.cell_id.uint64, &l_sync_gdb, sizeof(l_sync_gdb)); } a_node_client->is_reconnecting = false; @@ -770,6 +775,11 @@ static bool s_net_states_proc(dap_proc_thread_t *a_thread, void *a_arg) assert(l_net); dap_chain_net_pvt_t *l_net_pvt = PVT(l_net); assert(l_net_pvt); + if (l_net_pvt->state_target == NET_STATE_OFFLINE) { + l_net_pvt->state = NET_STATE_OFFLINE; + return true; + } + pthread_rwlock_wrlock(&l_net_pvt->rwlock); switch (l_net_pvt->state) { @@ -1335,23 +1345,21 @@ static int s_cli_net( int argc, char **argv, void *arg_func, char **a_str_reply) } } else if ( l_go_str){ if ( strcmp(l_go_str,"online") == 0 ) { + dap_chain_node_cli_set_reply_text(a_str_reply, "Network \"%s\" going from state %s to %s", + l_net->pub.name,c_net_states[PVT(l_net)->state], + c_net_states[NET_STATE_ONLINE]); dap_chain_net_state_go_to(l_net, NET_STATE_ONLINE); - dap_chain_node_cli_set_reply_text(a_str_reply, "Network \"%s\" go from state %s to %s", - l_net->pub.name,c_net_states[PVT(l_net)->state], - c_net_states[PVT(l_net)->state_target]); } else if ( strcmp(l_go_str,"offline") == 0 ) { + dap_chain_node_cli_set_reply_text(a_str_reply, "Network \"%s\" going from state %s to %s", + l_net->pub.name,c_net_states[PVT(l_net)->state], + c_net_states[NET_STATE_OFFLINE]); dap_chain_net_state_go_to(l_net, NET_STATE_OFFLINE); - dap_chain_node_cli_set_reply_text(a_str_reply, "Network \"%s\" go from state %s to %s", - l_net->pub.name,c_net_states[PVT(l_net)->state], - c_net_states[PVT(l_net)->state_target]); } else if(strcmp(l_go_str, "sync") == 0) { + dap_chain_node_cli_set_reply_text(a_str_reply, "Network \"%s\" resynchronizing", + l_net->pub.name); dap_chain_net_state_go_to(l_net, NET_STATE_SYNC_GDB); - dap_chain_node_cli_set_reply_text(a_str_reply, "Network \"%s\" go from state %s to %s", - l_net->pub.name, c_net_states[PVT(l_net)->state], - c_net_states[PVT(l_net)->state_target]); - } } else if ( l_get_str){ @@ -2023,6 +2031,8 @@ int s_net_load(const char * a_net_name, uint16_t a_acl_idx) return -2; } // Do specific role actions post-chain created + l_net_pvt->state_target = NET_STATE_OFFLINE; + dap_chain_net_state_t l_target_state = NET_STATE_OFFLINE; switch ( l_net_pvt->node_role.enums ) { case NODE_ROLE_ROOT_MASTER:{ // Set to process everything in datum pool @@ -2037,7 +2047,7 @@ int s_net_load(const char * a_net_name, uint16_t a_acl_idx) if (l_chain ) l_chain->is_datum_pool_proc = true; - l_net_pvt->state_target = NET_STATE_ONLINE; + l_target_state = NET_STATE_ONLINE; log_it(L_INFO,"Root node role established"); } break; case NODE_ROLE_CELL_MASTER: @@ -2062,12 +2072,12 @@ int s_net_load(const char * a_net_name, uint16_t a_acl_idx) // DAP_DELETE (l_proc_chains); //l_proc_chains = NULL; - l_net_pvt->state_target = NET_STATE_ONLINE; + l_target_state = NET_STATE_ONLINE; log_it(L_INFO,"Master node role established"); } break; case NODE_ROLE_FULL:{ log_it(L_INFO,"Full node role established"); - l_net_pvt->state_target = NET_STATE_ONLINE; + l_target_state = NET_STATE_ONLINE; } break; case NODE_ROLE_LIGHT: default: @@ -2076,19 +2086,16 @@ int s_net_load(const char * a_net_name, uint16_t a_acl_idx) } if (s_seed_mode || !dap_config_get_item_bool_default(g_config ,"general", "auto_online",false ) ) { // If we seed we do everything manual. First think - prefil list of node_addrs and its aliases - l_net_pvt->state_target = NET_STATE_OFFLINE; - }else{ - l_net_pvt->state = NET_STATE_LINKS_PREPARE; + l_target_state = NET_STATE_OFFLINE; } - l_net_pvt->load_mode = false; - l_net_pvt->flags |= F_DAP_CHAIN_NET_GO_SYNC; + + if (l_target_state != l_net_pvt->state_target) + dap_chain_net_state_go_to(l_net, l_target_state); // Start the proc thread log_it(L_NOTICE, "Сhain network \"%s\" initialized",l_net_item->name); - - dap_proc_queue_add_callback(dap_events_worker_get_auto(), s_net_states_proc,l_net ); dap_config_close(l_cfg); } return 0; diff --git a/modules/net/dap_chain_node_client.c b/modules/net/dap_chain_node_client.c index c7154e822196f830532c5d72625327352f1ed13a..f51c0fabeb3070c2bc9e1de770c10b88e8a8af08 100644 --- a/modules/net/dap_chain_node_client.c +++ b/modules/net/dap_chain_node_client.c @@ -223,7 +223,7 @@ static bool s_timer_update_states_callback(void * a_arg ) dap_chain_t *l_chain = l_net->pub.chains; dap_chain_id_t l_chain_id = l_chain ? l_chain->id : (dap_chain_id_t ) {0}; dap_stream_ch_chain_pkt_write_unsafe( l_node_client->ch_chain , - DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB, l_net->pub.id.uint64, + DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_REQ, l_net->pub.id.uint64, l_chain_id.uint64, l_net->pub.cell_id.uint64, &l_sync_gdb, sizeof(l_sync_gdb)); @@ -357,128 +357,92 @@ static void s_ch_chain_callback_notify_packet_in(dap_stream_ch_chain_t* a_ch_cha SetEvent( l_node_client->wait_cond ); #endif break; - - case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_END:{ - dap_chain_net_t * l_net = l_node_client->net; - assert(l_net); - if (s_stream_ch_chain_debug_more) - log_it(L_INFO,"In: Link %s."NODE_ADDR_FP_STR" UPDATE_CHAINS_END: %zd hashes on remote", - l_net->pub.name, - NODE_ADDR_FP_ARGS_S(l_node_client->remote_node_addr ) - ); - }break; - case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_START:{ - dap_chain_net_t * l_net = l_node_client->net; - assert(l_net); - dap_chain_net_set_state(l_net, NET_STATE_SYNC_CHAINS ); + case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_REQ:{ + l_node_client->state = NODE_CLIENT_STATE_SYNC_GDB_UPDATES; }break; - case DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB : case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_START:{ + l_node_client->state = NODE_CLIENT_STATE_SYNC_GDB_RVRS; dap_chain_net_t * l_net = l_node_client->net; assert(l_net); - dap_chain_net_set_state(l_net, NET_STATE_SYNC_GDB ); + dap_chain_net_set_state(l_net, NET_STATE_SYNC_GDB); }break; - case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB:{ + case DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB:{ + l_node_client->state = NODE_CLIENT_STATE_SYNC_GDB; + }break; + case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_REQ:{ + l_node_client->state = NODE_CLIENT_STATE_SYNC_CHAINS_UPDATES; + }break; + case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_START:{ + l_node_client->state = NODE_CLIENT_STATE_SYNC_CHAINS_RVRS; dap_chain_net_t * l_net = l_node_client->net; assert(l_net); - if(s_stream_ch_chain_debug_more) - log_it(L_INFO,"In: Link %s."NODE_ADDR_FP_STR" synced GDB, request reverse sync", l_net->pub.name, NODE_ADDR_FP_ARGS_S(l_node_client->remote_node_addr )); - - l_node_client->state = NODE_CLIENT_STATE_SYNC_GDB_RVRS ; - a_ch_chain->is_on_reverse_request = true; - - dap_stream_ch_chain_create_sync_request_gdb(a_ch_chain, l_node_client->net); - + dap_chain_net_set_state(l_net, NET_STATE_SYNC_CHAINS); + }break; + case DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_CHAIN:{ + l_node_client->state = NODE_CLIENT_STATE_SYNC_CHAINS; }break; + case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB: case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS: { - dap_stream_ch_chain_sync_request_t * l_request = NULL; - if(a_pkt_data_size == sizeof(*l_request)) - l_request = (dap_stream_ch_chain_sync_request_t*) a_pkt->data; - - if(l_request) { - // Process it if need - } - - // Check if we over with it before - if ( ! l_node_client->cur_cell ){ - if(s_stream_ch_chain_debug_more) - log_it(L_INFO, "In: No current cell in sync state, anyway we over it"); - }else - l_node_client->cur_cell =(dap_chain_cell_t *) l_node_client->cur_cell->hh.next; - - dap_chain_net_t * l_net = l_node_client->net; + dap_chain_net_t *l_net = l_node_client->net; assert(l_net); - dap_chain_node_addr_t * l_node_addr = dap_chain_net_get_cur_addr(l_net); - - // If over with cell, switch on next chain - if ( l_node_client->cur_cell){ - // Check if we over with it before - if ( !l_node_client->cur_chain ){ - log_it(L_ERROR, "In: No chain but cell is present, over wit it"); - }else{ - dap_chain_id_t l_chain_id=l_node_client->cur_chain->id; - dap_chain_cell_id_t l_cell_id = l_node_client->cur_cell->id; - l_node_client->state = NODE_CLIENT_STATE_SYNC_CHAINS_UPDATES; - dap_stream_ch_chain_pkt_write_unsafe(a_ch_chain->ch,DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_START, - l_net->pub.id.uint64 , - l_chain_id.uint64,l_cell_id.uint64,NULL,0); - } - }else{ + dap_chain_id_t l_chain_id = {}; + dap_chain_cell_id_t l_cell_id = {}; + if (a_pkt_type == DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB) { + if(s_stream_ch_chain_debug_more) + log_it(L_INFO,"In: Link %s."NODE_ADDR_FP_STR" synced GDB. Going to update chains", l_net->pub.name, NODE_ADDR_FP_ARGS_S(l_node_client->remote_node_addr )); + // TODO check if target net state == NET_STATE_SYNC_GDB to not synchronize chains, if it + l_node_client->cur_chain = l_net->pub.chains; + l_node_client->cur_cell = l_node_client->cur_chain ? l_node_client->cur_chain->cells : NULL; + } else { // Check if we over with it before - if ( !l_node_client->cur_chain ){ - log_it(L_WARNING, "In: No current chain in sync state, anyway we over it"); + if ( ! l_node_client->cur_cell ){ + if(s_stream_ch_chain_debug_more) + log_it(L_INFO, "In: No current cell in sync state, anyway we over it"); + }else + l_node_client->cur_cell =(dap_chain_cell_t *) l_node_client->cur_cell->hh.next; + + // If over with cell, switch on next chain + if ( l_node_client->cur_cell){ + // Check if we over with it before + if ( !l_node_client->cur_chain ){ + log_it(L_ERROR, "In: No chain but cell is present, over with it"); + } }else{ - l_node_client->cur_chain = (dap_chain_t *) l_node_client->cur_chain->next; - l_node_client->cur_cell = l_node_client->cur_chain? l_node_client->cur_chain->cells : NULL; - } - dap_chain_id_t l_chain_id={0}; - dap_chain_cell_id_t l_cell_id = {0}; - - if (l_node_client->cur_cell) - l_cell_id = l_node_client->cur_cell->id; - - // Check if we have some more chains and cells in it to sync - if( l_node_client->cur_chain ){ - l_chain_id=l_node_client->cur_chain->id; - - l_node_client->state = NODE_CLIENT_STATE_SYNC_CHAINS_UPDATES; - if (s_stream_ch_chain_debug_more) - log_it(L_INFO,"In: Link %s."NODE_ADDR_FP_STR" started to sync %s chain",l_net->pub.name, - NODE_ADDR_FP_ARGS(l_node_addr), l_node_client->cur_chain->name ); - - dap_stream_ch_chain_pkt_write_unsafe(a_ch_chain->ch,DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_START, - l_net->pub.id.uint64 , - l_chain_id.uint64,l_cell_id.uint64,NULL,0); - }else{ // If no - over with sync process - log_it(L_INFO, "In: State node %s."NODE_ADDR_FP_STR" is SYNCED, init reverse SYNC",l_net->pub.name, NODE_ADDR_FP_ARGS(l_node_addr) ); - l_node_client->state = NODE_CLIENT_STATE_SYNC_CHAINS_RVRS; - dap_chain_net_set_state(l_net, NET_STATE_ONLINE); - #ifndef _WIN32 - pthread_cond_broadcast(&l_node_client->wait_cond); - #else - SetEvent( l_node_client->wait_cond ); - #endif - a_ch_chain->state = CHAIN_STATE_SYNC_CHAINS ; - - if (l_node_client->net->pub.chains){ - dap_chain_t * l_chain = l_node_client->net->pub.chains; - a_ch_chain->request_atom_iter = l_chain->callback_atom_iter_create(l_chain); - size_t l_first_size = 0; - l_chain->callback_atom_iter_get_first(a_ch_chain->request_atom_iter, &l_first_size); + // Check if we over with it before + if ( !l_node_client->cur_chain ){ + log_it(L_WARNING, "In: No current chain in sync state, anyway we over it"); + }else{ + l_node_client->cur_chain = (dap_chain_t *) l_node_client->cur_chain->next; + l_node_client->cur_cell = l_node_client->cur_chain ? l_node_client->cur_chain->cells : NULL; } - - - dap_stream_ch_set_ready_to_write_unsafe(a_ch_chain->ch, true); } } + if (l_node_client->cur_cell) + l_cell_id = l_node_client->cur_cell->id; + // Check if we have some more chains and cells in it to sync + if( l_node_client->cur_chain ){ + l_chain_id=l_node_client->cur_chain->id; + if (s_stream_ch_chain_debug_more) { + dap_chain_node_addr_t * l_node_addr = dap_chain_net_get_cur_addr(l_net); + log_it(L_INFO,"In: Link %s."NODE_ADDR_FP_STR" started to sync %s chain",l_net->pub.name, + NODE_ADDR_FP_ARGS(l_node_addr), l_node_client->cur_chain->name ); + } + dap_stream_ch_chain_pkt_write_unsafe(a_ch_chain->ch,DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_REQ, + l_net->pub.id.uint64 , + l_chain_id.uint64,l_cell_id.uint64,NULL,0); + }else{ // If no - over with sync process + dap_chain_node_addr_t * l_node_addr = dap_chain_net_get_cur_addr(l_net); + log_it(L_INFO, "In: State node %s."NODE_ADDR_FP_STR" is SYNCED",l_net->pub.name, NODE_ADDR_FP_ARGS(l_node_addr) ); + l_node_client->state = NODE_CLIENT_STATE_SYNCED; + dap_chain_net_set_state(l_net, NET_STATE_ONLINE); +#ifndef _WIN32 + pthread_cond_broadcast(&l_node_client->wait_cond); +#else + SetEvent( l_node_client->wait_cond ); +#endif + } } break; - case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_ALL:{ - dap_chain_net_t * l_net = l_node_client->net; - assert(l_net); - l_node_client->state = NODE_CLIENT_STATE_SYNCED; - dap_chain_net_set_state(l_net, NET_STATE_ONLINE); - }break; default: break; } } @@ -503,62 +467,13 @@ static void s_ch_chain_callback_notify_packet_out(dap_stream_ch_chain_t* a_ch_ch dap_chain_node_client_t * l_node_client = (dap_chain_node_client_t *) a_arg; switch (a_pkt_type) { - - case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB:{ - if (a_ch_chain->is_on_reverse_request){ - a_ch_chain->is_on_reverse_request = false; - dap_chain_net_t * l_net = l_node_client->net; - // We over with GLOBAL_DB and switch on syncing chains - l_node_client->state = NODE_CLIENT_STATE_SYNC_CHAINS_UPDATES; - - // Begin from the first chain - l_node_client->cur_chain = l_node_client->net->pub.chains; - dap_chain_cell_id_t l_cell_id={0}; - dap_chain_id_t l_chain_id={0}; - - - if(! l_node_client->cur_chain){ - log_it(L_CRITICAL,"In: Can't sync chains for %s because there is no chains in it",l_net->pub.name); - dap_stream_ch_chain_pkt_write_error_unsafe(a_ch_chain->ch,l_net->pub.id.uint64, - l_chain_id.uint64,l_cell_id.uint64,"ERROR_CHAIN_NO_CHAINS"); - l_node_client->state = NODE_CLIENT_STATE_SYNC_CHAINS_UPDATES ; - - }else{ // If present - select the first one cell in chain - l_chain_id=l_node_client->cur_chain->id; - dap_chain_cell_t * l_cell = l_node_client->cur_chain->cells; - if (l_cell){ - l_cell_id=l_cell->id; - } - uint64_t l_net_id = l_net->pub.id.uint64; - - dap_stream_ch_chain_pkt_t * l_chain_pkt; - size_t l_chain_pkt_size = sizeof (l_chain_pkt->hdr); - l_chain_pkt = DAP_NEW_Z_SIZE(dap_stream_ch_chain_pkt_t, l_chain_pkt_size ); - l_chain_pkt->hdr.version = 1; - l_chain_pkt->hdr.net_id.uint64 = l_net_id; - l_chain_pkt->hdr.cell_id.uint64 = l_cell_id.uint64; - l_chain_pkt->hdr.chain_id.uint64 = l_chain_id.uint64; - dap_stream_ch_pkt_write_unsafe(a_ch_chain->ch,DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_START , - l_chain_pkt,l_chain_pkt_size); - DAP_DELETE(l_chain_pkt); - log_it(L_INFO, - "In: Send UPDATE_CHAINS_START: net_id=0x%016x chain_id=0x%016x cell_id=0x%016x ", - l_net_id,l_chain_id.uint64,l_cell_id.uint64 - ); - } - } - }break; - case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_ALL: + case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB: { + if(s_stream_ch_chain_debug_more) + log_it(L_INFO,"Out: global database sent to uplink "NODE_ADDR_FP_STR, NODE_ADDR_FP_ARGS_S(l_node_client->remote_node_addr)); + } break; case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS: { if(s_stream_ch_chain_debug_more) - log_it(L_INFO,"Out: chains all sent to uplink "NODE_ADDR_FP_STR,NODE_ADDR_FP_ARGS_S(l_node_client->remote_node_addr)); - l_node_client->state = NODE_CLIENT_STATE_SYNCED ; - #ifndef _WIN32 - pthread_cond_broadcast(&l_node_client->wait_cond); - #else - SetEvent( l_node_client->wait_cond ); - #endif - a_ch_chain->is_on_reverse_request = false; + log_it(L_INFO,"Out: chain %x sent to uplink "NODE_ADDR_FP_STR, l_node_client->cur_chain->id, NODE_ADDR_FP_ARGS_S(l_node_client->remote_node_addr)); }break; default: { }