From c9609480bc042258d3712a919e0b0dd033857188 Mon Sep 17 00:00:00 2001 From: "Dmitriy A. Gerasimov" <dmitriy.gerasimov@demlabs.net> Date: Sun, 24 Jan 2021 23:38:16 +0700 Subject: [PATCH] ... --- modules/chain/include/dap_chain.h | 2 +- modules/channel/chain/dap_stream_ch_chain.c | 88 ++++++++++++-------- modules/global-db/dap_chain_global_db.c | 13 ++- modules/net/CMakeLists.txt | 3 +- modules/net/dap_chain_node_cli.c | 11 ++- modules/net/dap_chain_node_cli_cmd.c | 48 ++++++++++- modules/net/dap_chain_node_client.c | 1 - modules/net/include/dap_chain_node_cli_cmd.h | 4 +- modules/type/dag/dap_chain_cs_dag.c | 5 ++ 9 files changed, 122 insertions(+), 53 deletions(-) diff --git a/modules/chain/include/dap_chain.h b/modules/chain/include/dap_chain.h index 9f2d849bd2..154702a44d 100644 --- a/modules/chain/include/dap_chain.h +++ b/modules/chain/include/dap_chain.h @@ -46,7 +46,7 @@ typedef void * dap_chain_atom_ptr_t; typedef struct dap_chain_atom_iter{ dap_chain_t * chain; dap_chain_atom_ptr_t cur; - dap_chain_hash_fast_t cur_hash; + dap_chain_hash_fast_t *cur_hash; size_t cur_size; void * cur_item; void * _inheritor; diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c index ea5b4f9f23..f0e739c476 100644 --- a/modules/channel/chain/dap_stream_ch_chain.c +++ b/modules/channel/chain/dap_stream_ch_chain.c @@ -115,6 +115,7 @@ static void s_gdb_in_pkt_error_worker_callback(dap_worker_t *a_thread, void *a_a static bool s_debug_more=false; static uint_fast16_t s_update_pack_size=100; // Number of hashes packed into the one packet +static uint_fast16_t s_skip_in_reactor_count=10; // Number of hashes packed to skip in one reactor loop callback out packet /** * @brief dap_stream_ch_chain_init * @return @@ -854,11 +855,29 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS :{ uint l_count_added=0; uint l_count_total=0; + + dap_chain_t * l_chain = dap_chain_find_by_id(l_ch_chain->request_hdr.net_id, l_ch_chain->request_hdr.chain_id); + if (! l_chain){ + log_it(L_ERROR, "Invalid UPDATE_CHAINS request from %s with ext_id %016"DAP_UINT64_FORMAT_x" net id 0x%016"DAP_UINT64_FORMAT_x" chain id 0x%016"DAP_UINT64_FORMAT_x" cell_id 0x%016"DAP_UINT64_FORMAT_x" in packet", a_ch->stream->esocket->remote_addr_str? + a_ch->stream->esocket->remote_addr_str: "<unknown>", l_chain_pkt->hdr.ext_id, + l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, + l_chain_pkt->hdr.cell_id.uint64); + dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, + "ERROR_NET_INVALID_ID"); + // Who are you? I don't know you! go away! + a_ch->stream->esocket->flags |= DAP_SOCK_SIGNAL_CLOSE; + break; + } + + dap_chain_atom_iter_t * l_iter = l_chain->callback_atom_iter_create(l_chain); + + for ( dap_stream_ch_chain_update_element_t * l_element =(dap_stream_ch_chain_update_element_t *) l_chain_pkt->data; (size_t) (((byte_t*)l_element) - l_chain_pkt->data ) < l_chain_pkt_data_size; l_element++){ dap_stream_ch_chain_hash_item_t * l_hash_item = NULL; - HASH_FIND(hh,l_ch_chain->remote_atoms, &l_element->hash, sizeof (l_element->hash), l_hash_item ); + HASH_FIND(hh,l_ch_chain->remote_atoms , &l_element->hash, sizeof (l_element->hash), l_hash_item ); if( ! l_hash_item ){ l_hash_item = DAP_NEW(dap_stream_ch_chain_hash_item_t); memcpy(&l_hash_item->hash, &l_element->hash, sizeof (l_element->hash)); @@ -876,7 +895,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) } if (s_debug_more) log_it(L_INFO,"In: Added %u from %u remote atom hash in list",l_count_added,l_count_total); - + l_chain->callback_atom_iter_delete(l_iter); }break; // End of response //case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_END:{ @@ -1259,9 +1278,7 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) dap_stream_ch_chain_update_element_t * l_data= DAP_NEW_Z_SIZE(dap_stream_ch_chain_update_element_t,sizeof (dap_stream_ch_chain_update_element_t)*s_update_pack_size); size_t l_data_size=0; for(uint_fast16_t n=0; n<s_update_pack_size && (l_ch_chain->request_atom_iter && l_ch_chain->request_atom_iter->cur);n++){ - // If present smth - hash it - dap_hash_fast(l_ch_chain->request_atom_iter->cur, l_ch_chain->request_atom_iter->cur_size,&l_data[n].hash); - l_data[n].size=l_ch_chain->request_atom_iter->cur_size; + memcpy(&l_data[n].hash, l_ch_chain->request_atom_iter->cur_hash, sizeof (l_data[n].hash)); // Shift offset counter l_data_size += sizeof (dap_stream_ch_chain_update_element_t); // Then get next atom @@ -1302,39 +1319,40 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) if (l_ch_chain->request_atom_iter && l_ch_chain->request_atom_iter->cur) { // Process one chain from l_ch_chain->request_atom_iter - // Check if present and skip if present + // Pack loop to skip quicker + for(uint_fast16_t k=0; k<s_skip_in_reactor_count; k++){ + // Check if present and skip if present + dap_stream_ch_chain_hash_item_t * l_hash_item = NULL; + HASH_FIND(hh,l_ch_chain->remote_atoms, l_ch_chain->request_atom_iter->cur_hash , sizeof (l_hash_item->hash), l_hash_item ); + if( l_hash_item ){ // If found - skip it + if(s_debug_more){ + char l_request_atom_hash_str[81]={[0]='\0'}; + dap_chain_hash_fast_to_str(l_ch_chain->request_atom_iter->cur_hash,l_request_atom_hash_str,sizeof (l_request_atom_hash_str)); + log_it(L_DEBUG, "Out CHAIN: skip atom hash %s because its already present in remote atom hash table", + l_request_atom_hash_str); + } + }else{ + l_hash_item = DAP_NEW_Z(dap_stream_ch_chain_hash_item_t); + if(s_debug_more){ + dap_hash_fast(l_ch_chain->request_atom_iter->cur, l_ch_chain->request_atom_iter->cur_size,&l_hash_item->hash); + char *l_atom_hash_str= dap_chain_hash_fast_to_str_new(&l_hash_item->hash); - dap_stream_ch_chain_hash_item_t * l_hash_item = NULL; - dap_chain_hash_fast_t l_request_atom_hash; - dap_hash_fast(l_ch_chain->request_atom_iter->cur, l_ch_chain->request_atom_iter->cur_size,&l_request_atom_hash); - HASH_FIND(hh,l_ch_chain->remote_atoms, &l_request_atom_hash , sizeof (l_hash_item->hash), l_hash_item ); - if( l_hash_item ){ // If found - skip it - if(s_debug_more){ - char l_request_atom_hash_str[81]={[0]='\0'}; - dap_chain_hash_fast_to_str(&l_request_atom_hash,l_request_atom_hash_str,sizeof (l_request_atom_hash_str)); - log_it(L_DEBUG, "Out CHAIN: skip atom hash %s because its already present in remote atom hash table", - l_request_atom_hash_str); - } - }else{ - l_hash_item = DAP_NEW_Z(dap_stream_ch_chain_hash_item_t); - if(s_debug_more){ - dap_hash_fast(l_ch_chain->request_atom_iter->cur, l_ch_chain->request_atom_iter->cur_size,&l_hash_item->hash); - char *l_atom_hash_str= dap_chain_hash_fast_to_str_new(&l_hash_item->hash); - - log_it(L_INFO, "Out CHAIN pkt: atom hash %s (size %zd) ", l_atom_hash_str, l_ch_chain->request_atom_iter->cur_size); - DAP_DELETE(l_atom_hash_str); + log_it(L_INFO, "Out CHAIN pkt: atom hash %s (size %zd) ", l_atom_hash_str, l_ch_chain->request_atom_iter->cur_size); + DAP_DELETE(l_atom_hash_str); + } + dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN, l_ch_chain->request_hdr.net_id.uint64, + l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64, + l_ch_chain->request_atom_iter->cur, l_ch_chain->request_atom_iter->cur_size); + break; // If sent smth - break out from pack loop + l_ch_chain->stats_request_atoms_processed++; + + l_hash_item->size = l_ch_chain->request_atom_iter->cur_size; + // Because we sent this atom to remote - we record it to not to send it twice + HASH_ADD(hh, l_ch_chain->remote_atoms, hash, sizeof (l_hash_item->hash), l_hash_item); } - dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN, l_ch_chain->request_hdr.net_id.uint64, - l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64, - l_ch_chain->request_atom_iter->cur, l_ch_chain->request_atom_iter->cur_size); - l_ch_chain->stats_request_atoms_processed++; - - l_hash_item->size = l_ch_chain->request_atom_iter->cur_size; - // Because we sent this atom to remote - we record it to not to send it twice - HASH_ADD(hh, l_ch_chain->remote_atoms, hash, sizeof (l_hash_item->hash), l_hash_item); + // Then get next atom and populate new last + l_ch_chain->request_atom_iter->chain->callback_atom_iter_get_next(l_ch_chain->request_atom_iter, NULL); } - // Then get next atom and populate new last - l_ch_chain->request_atom_iter->chain->callback_atom_iter_get_next(l_ch_chain->request_atom_iter, NULL); } else { // All chains synced dap_stream_ch_chain_sync_request_t l_request = {0}; // last message diff --git a/modules/global-db/dap_chain_global_db.c b/modules/global-db/dap_chain_global_db.c index ee4bdf3c10..e6ab5b05fc 100644 --- a/modules/global-db/dap_chain_global_db.c +++ b/modules/global-db/dap_chain_global_db.c @@ -25,15 +25,14 @@ #include <stdio.h> #include <stdint.h> #include <pthread.h> +#include <errno.h> #include <time.h> #include <assert.h> //#include <string.h> #include "uthash.h" - -#include "dap_chain_common.h" #include "dap_strfuncs.h" -//#include "dap_chain_global_db_pvt.h" +#include "dap_chain_common.h" #include "dap_chain_global_db_hist.h" #include "dap_chain_global_db.h" @@ -88,7 +87,10 @@ typedef struct history_extra_group_item static history_group_item_t * s_history_group_items = NULL; static char *s_storage_path = NULL; static history_extra_group_item_t * s_history_extra_group_items = NULL; - +#ifdef DAP_OS_UNIX +static int cmd_gdb_import(int argc, char ** argv, void *arg_func, char ** a_str_reply); +static int s_command_gdb_export(int argc, char ** argv, void *arg_func, char ** a_str_reply); +#endif char * extract_group_prefix(const char * a_group); /** @@ -115,6 +117,7 @@ char * extract_group_prefix(const char* a_group) return l_group_prefix; } + /* * Get history group by group name */ @@ -249,6 +252,8 @@ int dap_chain_global_db_init(dap_config_t * g_config) unlock(); if( res != 0 ) log_it(L_CRITICAL, "Hadn't initialized db driver \"%s\" on path \"%s\"", l_driver_name, s_storage_path ); + + return res; } diff --git a/modules/net/CMakeLists.txt b/modules/net/CMakeLists.txt index 3aeb6f18f3..79fe22e7ca 100644 --- a/modules/net/CMakeLists.txt +++ b/modules/net/CMakeLists.txt @@ -44,8 +44,7 @@ endif() if(UNIX) target_link_libraries(${PROJECT_NAME} dap_core dap_crypto dap_client dap_stream_ch_chain dap_stream_ch_chain_net dap_stream_ch_chain_net_srv dap_chain dap_chain_wallet dap_chain_net_srv dap_chain_mempool dap_chain_global_db dap_chain_net_srv_stake dap_chain_cs_none - resolv - ) + resolv ) endif() target_include_directories(${PROJECT_NAME} INTERFACE . ) diff --git a/modules/net/dap_chain_node_cli.c b/modules/net/dap_chain_node_cli.c index 090c22ccb4..d1018f5ebf 100644 --- a/modules/net/dap_chain_node_cli.c +++ b/modules/net/dap_chain_node_cli.c @@ -1026,16 +1026,19 @@ int dap_chain_node_cli_init(dap_config_t * g_config) dap_chain_node_cli_cmd_item_create("stats", com_stats, NULL, "Print statistics", "stats cpu"); - // Export GDB to JSON - dap_chain_node_cli_cmd_item_create("gdb_export", com_gdb_export, NULL, "export", "export"); - //Import GDB from JSON - dap_chain_node_cli_cmd_item_create("gdb_import", com_gdb_import, NULL, "import", "import"); // Exit dap_chain_node_cli_cmd_item_create ("exit", com_exit, NULL, "Stop application and exit", "exit\n" ); +#ifdef DAP_OS_UNIX + // Export GDB to JSON + dap_chain_node_cli_cmd_item_create("gdb_export", cmd_gdb_export, NULL, "GDB export to file", "GDB export to file"); + + //Import GDB from JSON + dap_chain_node_cli_cmd_item_create("gdb_import", cmd_gdb_import, NULL, "GDB import from file", "GDB import from file"); +#endif // create thread for waiting of clients pthread_t l_thread_id; diff --git a/modules/net/dap_chain_node_cli_cmd.c b/modules/net/dap_chain_node_cli_cmd.c index c4400af5f7..3a01460cc3 100644 --- a/modules/net/dap_chain_node_cli_cmd.c +++ b/modules/net/dap_chain_node_cli_cmd.c @@ -81,6 +81,13 @@ #endif #include "dap_chain_cell.h" + +#include "dap_enc_base64.h" +#include <json-c/json.h> +#ifdef DAP_OS_UNIX +#include <dirent.h> +#endif + #include "dap_chain_common.h" #include "dap_chain_datum.h" #include "dap_chain_datum_token.h" @@ -94,10 +101,11 @@ #include "dap_stream_ch_chain.h" #include "dap_stream_ch_chain_pkt.h" #include "dap_stream_ch_chain_net_pkt.h" -#include <json-c/json.h> #include "dap_enc_base64.h" + #define LOG_TAG "chain_node_cli_cmd" + /** * Find in base addr by alias * @@ -4022,7 +4030,18 @@ int com_print_log(int argc, char ** argv, void *arg_func, char **str_reply) return 0; } -int com_gdb_export(int argc, char ** argv, void *arg_func, char ** a_str_reply) { + +#ifdef DAP_OS_UNIX +/** + * @brief cmd_gdb_export + * @param argc + * @param argv + * @param arg_func + * @param a_str_reply + * @return + */ +int cmd_gdb_export(int argc, char ** argv, void *arg_func, char ** a_str_reply) +{ int arg_index = 1; const char *l_filename = NULL; dap_chain_node_cli_find_option_val(argv, arg_index, argc, "filename", &l_filename); @@ -4083,8 +4102,13 @@ int com_gdb_export(int argc, char ** argv, void *arg_func, char ** a_str_reply) dap_store_obj_free(l_data, l_data_size); } if (json_object_to_file(l_path, l_json) == -1) { +#if JSON_C_MINOR_VERSION<15 + log_it(L_CRITICAL, "Couldn't export JSON to file, error code %d", errno ); + dap_chain_node_cli_set_reply_text (a_str_reply, "Couldn't export JSON to file, error code %d", errno ); +#else log_it(L_CRITICAL, "Couldn't export JSON to file, err '%s'", json_util_get_last_err()); - dap_chain_node_cli_set_reply_text(a_str_reply, json_util_get_last_err()); + dap_chain_node_cli_set_reply_text(a_str_reply, json_util_get_last_err()); +#endif json_object_put(l_json); return -1; } @@ -4092,7 +4116,16 @@ int com_gdb_export(int argc, char ** argv, void *arg_func, char ** a_str_reply) return 0; } -int com_gdb_import(int argc, char ** argv, void *arg_func, char ** a_str_reply) { +/** + * @brief cmd_gdb_import + * @param argc + * @param argv + * @param arg_func + * @param a_str_reply + * @return + */ +int cmd_gdb_import(int argc, char ** argv, void *arg_func, char ** a_str_reply) +{ int arg_index = 1; const char *l_filename = NULL; dap_chain_node_cli_find_option_val(argv, arg_index, argc, "filename", &l_filename); @@ -4106,8 +4139,13 @@ int com_gdb_import(int argc, char ** argv, void *arg_func, char ** a_str_reply) dap_snprintf(l_path, sizeof(l_path), "%s/../%s.json", l_db_path, l_filename); struct json_object *l_json = json_object_from_file(l_path); if (!l_json) { +#if JSON_C_MINOR_VERSION<15 + log_it(L_CRITICAL, "Import error occured: code %d", errno); + dap_chain_node_cli_set_reply_text(a_str_reply, "Import error occured: code %d",errno); +#else log_it(L_CRITICAL, "Import error occured: %s", json_util_get_last_err()); dap_chain_node_cli_set_reply_text(a_str_reply, json_util_get_last_err()); +#endif return -1; } for (size_t i = 0, l_groups_count = json_object_array_length(l_json); i < l_groups_count; ++i) { @@ -4150,3 +4188,5 @@ int com_gdb_import(int argc, char ** argv, void *arg_func, char ** a_str_reply) json_object_put(l_json); return 0; } + +#endif diff --git a/modules/net/dap_chain_node_client.c b/modules/net/dap_chain_node_client.c index b1e4585e24..67b32b6e9e 100644 --- a/modules/net/dap_chain_node_client.c +++ b/modules/net/dap_chain_node_client.c @@ -322,7 +322,6 @@ static void s_ch_chain_callback_notify_packet_in(dap_stream_ch_chain_t* a_ch_cha l_chain_pkt->hdr.net_id.uint64 = l_net_id; l_chain_pkt->hdr.cell_id.uint64 = l_cell_id.uint64; l_chain_pkt->hdr.chain_id.uint64 = l_chain_id.uint64; - dap_stream_ch_pkt_write_unsafe(a_ch_chain->ch,DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_START , l_chain_pkt,l_chain_pkt_size); DAP_DELETE(l_chain_pkt); diff --git a/modules/net/include/dap_chain_node_cli_cmd.h b/modules/net/include/dap_chain_node_cli_cmd.h index 0ca91f08ab..7718569ecc 100644 --- a/modules/net/include/dap_chain_node_cli_cmd.h +++ b/modules/net/include/dap_chain_node_cli_cmd.h @@ -133,12 +133,12 @@ int com_print_log(int argc, char ** argv, void *arg_func, char **str_reply); int com_stats(int argc, char ** argv, void *arg_func, char **str_reply); int com_exit(int argc, char ** argv, void *arg_func, char **str_reply); +int cmd_gdb_import(int argc, char ** argv, void *arg_func, char ** a_str_reply); +int cmd_gdb_export(int argc, char ** argv, void *arg_func, char ** a_str_reply); int com_mempool_delete(int argc, char ** argv, void *arg_func, char ** a_str_reply); int com_mempool_list(int argc, char ** argv, void *arg_func, char ** a_str_reply); int com_mempool_proc(int argc, char ** argv, void *arg_func, char ** a_str_reply); -int com_gdb_export(int argc, char ** argv, void *arg_func, char ** a_str_reply); -int com_gdb_import(int argc, char ** argv, void *arg_func, char ** a_str_reply); /** * Place public CA into the mempool */ diff --git a/modules/type/dag/dap_chain_cs_dag.c b/modules/type/dag/dap_chain_cs_dag.c index d73d98acd0..67a2dd52f6 100644 --- a/modules/type/dag/dap_chain_cs_dag.c +++ b/modules/type/dag/dap_chain_cs_dag.c @@ -924,6 +924,7 @@ static dap_chain_atom_iter_t* s_chain_callback_atom_iter_create_from(dap_chain_t dap_chain_cs_dag_event_item_t * l_atom_item; HASH_FIND(hh, PVT(DAP_CHAIN_CS_DAG(a_chain))->events, &l_atom_hash, sizeof(l_atom_hash),l_atom_item ); l_atom_iter->cur_item = l_atom_item; + l_atom_iter->cur_hash = &l_atom_item->hash; } return l_atom_iter; @@ -982,9 +983,11 @@ static dap_chain_atom_ptr_t s_chain_callback_atom_iter_get_first(dap_chain_atom_ if ( a_atom_iter->cur_item ){ a_atom_iter->cur = ((dap_chain_cs_dag_event_item_t*) a_atom_iter->cur_item)->event; a_atom_iter->cur_size = ((dap_chain_cs_dag_event_item_t*) a_atom_iter->cur_item)->event_size; + a_atom_iter->cur_hash = &((dap_chain_cs_dag_event_item_t*) a_atom_iter->cur_item)->hash; }else{ a_atom_iter->cur = NULL; a_atom_iter->cur_size = 0; + a_atom_iter->cur_hash = NULL; } if (a_ret_size) @@ -1089,6 +1092,7 @@ static dap_chain_atom_ptr_t s_chain_callback_atom_iter_find_by_hash(dap_chain_at a_atom_iter->cur_item = l_event_item; a_atom_iter->cur = l_event_item->event; a_atom_iter->cur_size= l_event_item->event_size; + a_atom_iter->cur_hash = &l_event_item->hash; if(a_atom_size) *a_atom_size = l_event_item->event_size; return l_event_item->event; @@ -1125,6 +1129,7 @@ static dap_chain_atom_ptr_t s_chain_callback_atom_iter_get_next( dap_chain_atom_ // if l_event_item=NULL then items are over a_atom_iter->cur = l_event_item ? l_event_item->event : NULL; a_atom_iter->cur_size = a_atom_iter->cur ? l_event_item->event_size : 0; + a_atom_iter->cur_hash = l_event_item ? &l_event_item->hash : NULL; } if(a_atom_size) *a_atom_size = a_atom_iter->cur_size; -- GitLab