From 9f6546da1ec4b936b88fb8f022befeed649373bd Mon Sep 17 00:00:00 2001 From: Roman Khlopkov <roman.khlopkov@demlabs.net> Date: Mon, 7 Feb 2022 21:39:55 +0300 Subject: [PATCH 1/4] [+] TPS peak value calc --- modules/chain/dap_chain_ledger.c | 46 ++++++++++++++-- modules/chain/include/dap_chain_ledger.h | 2 +- .../dap_chain_global_db_driver_sqlite.c | 2 +- modules/net/dap_chain_net.c | 52 ++++++++++--------- modules/type/dag/dap_chain_cs_dag.c | 4 ++ 5 files changed, 76 insertions(+), 30 deletions(-) diff --git a/modules/chain/dap_chain_ledger.c b/modules/chain/dap_chain_ledger.c index 13bed2717..4faae9862 100644 --- a/modules/chain/dap_chain_ledger.c +++ b/modules/chain/dap_chain_ledger.c @@ -50,6 +50,7 @@ #include "dap_strfuncs.h" #include "dap_config.h" #include "dap_cert.h" +#include "dap_timerfd.h" #include "dap_chain_datum_tx_token.h" #include "dap_chain_datum_token.h" #include "dap_chain_mempool.h" @@ -193,6 +194,12 @@ typedef struct dap_ledger_private { dap_chain_cell_id_t local_cell_id; bool load_mode; + // TPS section + dap_timerfd_t *tps_timer; + time_t tps_start_time; + time_t tps_current_time; + time_t tps_end_time; + size_t tps_count; } dap_ledger_private_t; #define PVT(a) ( (dap_ledger_private_t* ) a->_internal ) @@ -1096,6 +1103,8 @@ dap_ledger_t* dap_chain_ledger_create(uint16_t a_check_flags, char *a_net_name) l_ledger_priv->check_token_emission = a_check_flags & DAP_CHAIN_LEDGER_CHECK_TOKEN_EMISSION; l_ledger_priv->net = dap_chain_net_by_name(a_net_name); l_ledger_priv->load_mode = true; + l_ledger_priv->tps_timer = NULL; + l_ledger_priv->tps_count = 0; log_it(L_DEBUG,"Created ledger \"%s\"",a_net_name); if (dap_config_get_item_bool_default(g_config, "ledger", "cached", true)) { @@ -2118,7 +2127,12 @@ int dap_chain_ledger_tx_add(dap_ledger_t *a_ledger, dap_chain_datum_tx_t *a_tx, dap_list_t *l_list_bound_items = NULL; dap_list_t *l_list_tx_out = NULL; dap_chain_ledger_tx_item_t *l_item_tmp = NULL; - + if (!l_ledger_priv->tps_timer) { + l_ledger_priv->tps_start_time = time(NULL); + l_ledger_priv->tps_current_time = l_ledger_priv->tps_start_time; + l_ledger_priv->tps_count = 0; + l_ledger_priv->tps_timer = dap_timerfd_start(500, s_ledger_tps_callback, l_ledger_priv); + } dap_chain_hash_fast_t *l_tx_hash = dap_chain_node_datum_tx_calc_hash(a_tx); char l_tx_hash_str[70]; dap_chain_hash_fast_to_str(l_tx_hash,l_tx_hash_str,sizeof(l_tx_hash_str)); @@ -2377,7 +2391,7 @@ int dap_chain_ledger_tx_add(dap_ledger_t *a_ledger, dap_chain_datum_tx_t *a_tx, l_item_tmp = DAP_NEW_Z(dap_chain_ledger_tx_item_t); memcpy(&l_item_tmp->tx_hash_fast, l_tx_hash, sizeof(dap_chain_hash_fast_t)); l_item_tmp->tx = DAP_NEW_SIZE(dap_chain_datum_tx_t, dap_chain_datum_tx_get_size(a_tx)); - l_item_tmp->cache_data.ts_created = (time_t) a_tx->header.ts_created; + l_item_tmp->cache_data.ts_created = time(NULL); // Time of transasction added to ledger dap_list_t *l_tist_tmp = dap_chain_datum_tx_items_get(a_tx, TX_ITEM_TYPE_OUT_ALL, &l_item_tmp->cache_data.n_outs); // If debug mode dump the UTXO if (dap_log_level_get() == L_DEBUG && s_debug_more) { @@ -2412,7 +2426,9 @@ int dap_chain_ledger_tx_add(dap_ledger_t *a_ledger, dap_chain_datum_tx_t *a_tx, pthread_rwlock_wrlock(&l_ledger_priv->ledger_rwlock); HASH_ADD(hh, l_ledger_priv->ledger_items, tx_hash_fast, sizeof(dap_chain_hash_fast_t), l_item_tmp); // tx_hash_fast: name of key field pthread_rwlock_unlock(&l_ledger_priv->ledger_rwlock); - + // Count TPS + l_ledger_priv->tps_end_time = time(NULL); + l_ledger_priv->tps_count++; // Add it to cache uint8_t *l_tx_cache = DAP_NEW_Z_SIZE(uint8_t, l_tx_size + sizeof(l_item_tmp->cache_data)); memcpy(l_tx_cache, &l_item_tmp->cache_data, sizeof(l_item_tmp->cache_data)); @@ -2433,6 +2449,17 @@ FIN: return ret; } +bool s_ledger_tps_callback(void *a_arg) +{ + dap_ledger_private_t *l_ledger_pvt = (dap_ledger_private_t *)a_arg; + if (l_ledger_pvt->tps_current_time != l_ledger_pvt->tps_end_time) { + l_ledger_pvt->tps_current_time = l_ledger_pvt->tps_end_time; + return true; + } + l_ledger_pvt->tps_timer = NULL; + return false; +} + int dap_chain_ledger_tx_load(dap_ledger_t *a_ledger, dap_chain_datum_tx_t *a_tx) { if (!PVT(a_ledger)->load_mode) { @@ -2517,7 +2544,6 @@ int dap_chain_ledger_tx_remove(dap_ledger_t *a_ledger, dap_chain_hash_fast_t *a_ void dap_chain_ledger_purge(dap_ledger_t *a_ledger, bool a_preserve_db) { dap_ledger_private_t *l_ledger_priv = PVT(a_ledger); - const int l_hash_str_size = DAP_CHAIN_HASH_FAST_SIZE * 2 + 2; pthread_rwlock_wrlock(&l_ledger_priv->ledger_rwlock); pthread_rwlock_wrlock(&l_ledger_priv->tokens_rwlock); pthread_rwlock_wrlock(&l_ledger_priv->treshold_emissions_rwlock); @@ -2657,6 +2683,18 @@ uint64_t dap_chain_ledger_count_from_to(dap_ledger_t * a_ledger, time_t a_ts_fro return l_ret; } +size_t dap_chain_ledger_count_tps(dap_ledger_t *a_ledger, time_t *a_ts_from, time_t *a_ts_to) +{ + if (!a_ledger) + return 0; + dap_ledger_private_t *l_ledger_priv = PVT(a_ledger); + if (a_ts_from) + *a_ts_from = l_ledger_priv->tps_start_time; + if (a_ts_to) + *a_ts_to = l_ledger_priv->tps_end_time; + return l_ledger_priv->tps_count; +} + /** * Check whether used 'out' items */ diff --git a/modules/chain/include/dap_chain_ledger.h b/modules/chain/include/dap_chain_ledger.h index 13892ffc3..b0e2f35fa 100644 --- a/modules/chain/include/dap_chain_ledger.h +++ b/modules/chain/include/dap_chain_ledger.h @@ -175,7 +175,7 @@ void dap_chain_ledger_load_end(dap_ledger_t *a_ledger); unsigned dap_chain_ledger_count(dap_ledger_t *a_ledger); uint64_t dap_chain_ledger_count_from_to(dap_ledger_t * a_ledger, time_t a_ts_from, time_t a_ts_to ); - +size_t dap_chain_ledger_count_tps(dap_ledger_t *a_ledger, time_t *a_ts_from, time_t *a_ts_to); /** * Check whether used 'out' items */ diff --git a/modules/global-db/dap_chain_global_db_driver_sqlite.c b/modules/global-db/dap_chain_global_db_driver_sqlite.c index 4d9343e1e..171d43c58 100644 --- a/modules/global-db/dap_chain_global_db_driver_sqlite.c +++ b/modules/global-db/dap_chain_global_db_driver_sqlite.c @@ -87,7 +87,7 @@ static int dap_db_driver_sqlite_exec(sqlite3 *l_db, const char *l_query, char ** static sqlite3 *s_sqlite_get_connection(void) { if (pthread_rwlock_wrlock(&s_db_rwlock) == EDEADLK) { - return NULL; + return s_trans; } sqlite3 *l_ret = NULL; diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index c6f734446..6566c07da 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -1555,28 +1555,21 @@ static int s_cli_net(int argc, char **argv, char **a_str_reply) dap_chain_node_cli_find_option_val(argv, arg_index, argc, "-to", &l_to_str); dap_chain_node_cli_find_option_val(argv, arg_index, argc, "-prev_sec", &l_prev_sec_str); - if (l_from_str ) { + time_t l_ts_now = time(NULL); + if (l_from_str) { strptime( (char *)l_from_str, c_time_fmt, &l_from_tm ); - } - - if (l_to_str) { - strptime( (char *)l_to_str, c_time_fmt, &l_to_tm ); - } - - if ( l_to_str == NULL ){ // If not set '-to' - we set up current time - time_t l_ts_now = time(NULL); - localtime_r(&l_ts_now, &l_to_tm); - } - - if ( l_prev_sec_str ){ - time_t l_ts_now = time(NULL); + if (l_to_str) { + strptime( (char *)l_to_str, c_time_fmt, &l_to_tm ); + } else { // If not set '-to' - we set up current time + localtime_r(&l_ts_now, &l_to_tm); + } + } else if (l_prev_sec_str) { l_ts_now -= strtol( l_prev_sec_str, NULL,10 ); localtime_r(&l_ts_now, &l_from_tm ); - }/*else if ( l_from_str == NULL ){ // If not set '-from' we set up current time minus 10 seconds - time_t l_ts_now = time(NULL); - l_ts_now -= 10; + } else if ( l_from_str == NULL ) { // If not set '-from' we set up current time minus 60 seconds + l_ts_now -= 60; localtime_r(&l_ts_now, &l_from_tm ); - }*/ + } // Form timestamps from/to time_t l_from_ts = mktime(&l_from_tm); @@ -1600,6 +1593,15 @@ static int s_cli_net(int argc, char **argv, char **a_str_reply) dap_string_append_printf( l_ret_str, "\tTotal: %"DAP_UINT64_FORMAT_U"\n", l_tx_count ); dap_chain_node_cli_set_reply_text( a_str_reply, l_ret_str->str ); dap_string_free( l_ret_str, false ); + } else if (strcmp(l_stats_str, "tps") == 0) { + dap_string_t * l_ret_str = dap_string_new("Transactions per second peak values:\n"); + dap_string_append_printf(l_ret_str, "\tSpeed: %.3Lf TPS\n", l_tps); + dap_string_append_printf(l_ret_str, "\tTotal: %"DAP_UINT64_FORMAT_U"\n", l_tx_count); + dap_chain_node_cli_set_reply_text( a_str_reply, l_ret_str->str ); + dap_string_free( l_ret_str, false ); + } else { + dap_chain_node_cli_set_reply_text(a_str_reply, + "Subcommand 'stats' requires one of parameter: tx, tps\n"); } } else if ( l_go_str){ if ( strcmp(l_go_str,"online") == 0 ) { @@ -1613,14 +1615,16 @@ static int s_cli_net(int argc, char **argv, char **a_str_reply) c_net_states[NET_STATE_OFFLINE]); dap_chain_net_state_go_to(l_net, NET_STATE_OFFLINE); - } - else if(strcmp(l_go_str, "sync") == 0) { + } else if(strcmp(l_go_str, "sync") == 0) { dap_chain_node_cli_set_reply_text(a_str_reply, "Network \"%s\" resynchronizing", l_net->pub.name); if (PVT(l_net)->state_target == NET_STATE_ONLINE) dap_chain_net_state_go_to(l_net, NET_STATE_ONLINE); else dap_chain_net_state_go_to(l_net, NET_STATE_SYNC_CHAINS); + } else { + dap_chain_node_cli_set_reply_text(a_str_reply, + "Subcommand 'go' requires one of parameter: online, offline, sync\n"); } } else if ( l_get_str){ @@ -1678,7 +1682,7 @@ static int s_cli_net(int argc, char **argv, char **a_str_reply) dap_chain_node_cli_set_reply_text(a_str_reply,"Stopped network\n"); }else { dap_chain_node_cli_set_reply_text(a_str_reply, - "Subcommand \"link\" requires one of parameter: list\n"); + "Subcommand 'link' requires one of parameter: list\n"); ret = -3; } @@ -1704,7 +1708,7 @@ static int s_cli_net(int argc, char **argv, char **a_str_reply) } else { dap_chain_node_cli_set_reply_text(a_str_reply, - "Subcommand \"sync\" requires one of parameter: all,gdb,chains\n"); + "Subcommand 'sync' requires one of parameter: all, gdb, chains\n"); ret = -2; } } else if (l_ca_str) { @@ -1807,7 +1811,7 @@ static int s_cli_net(int argc, char **argv, char **a_str_reply) return 0; } else { dap_chain_node_cli_set_reply_text(a_str_reply, - "Subcommand \"ca\" requires one of parameter: add, list, del\n"); + "Subcommand 'ca' requires one of parameter: add, list, del\n"); ret = -5; } } else if (l_ledger_str && !strcmp(l_ledger_str, "reload")) { @@ -1837,7 +1841,7 @@ static int s_cli_net(int argc, char **argv, char **a_str_reply) } while (l_processed); } else { dap_chain_node_cli_set_reply_text(a_str_reply, - "Command requires one of subcomand: sync, link, go, get, stats, ca, ledger"); + "Command 'net' requires one of subcomand: sync, link, go, get, stats, ca, ledger"); ret = -1; } diff --git a/modules/type/dag/dap_chain_cs_dag.c b/modules/type/dag/dap_chain_cs_dag.c index 3bafe23cf..504556873 100644 --- a/modules/type/dag/dap_chain_cs_dag.c +++ b/modules/type/dag/dap_chain_cs_dag.c @@ -313,6 +313,10 @@ static void s_dap_chain_cs_dag_purge(dap_chain_t *a_chain) DAP_DELETE(l_event_current); } pthread_rwlock_unlock(&l_dag_pvt->events_rwlock); + dap_chain_cell_t *l_cell_cur, l_cell_tmp; + HASH_ITER(hh, a_chain->cells, l_cell_cur, l_cell_tmp) { + dap_chain_cell_close(l_cell_cur); + } } /** -- GitLab From eec695bd3041bd3e1a5ee914888ad9de98f328b6 Mon Sep 17 00:00:00 2001 From: Roman Khlopkov <roman.khlopkov@demlabs.net> Date: Wed, 9 Feb 2022 11:40:16 +0300 Subject: [PATCH 2/4] [*] TPS with nanoseconds accuracy --- modules/chain/dap_chain_ledger.c | 38 ++++++++++++-------- modules/chain/include/dap_chain_ledger.h | 3 +- modules/net/dap_chain_net.c | 44 +++++++++++++----------- modules/type/dag/dap_chain_cs_dag.c | 2 +- 4 files changed, 49 insertions(+), 38 deletions(-) diff --git a/modules/chain/dap_chain_ledger.c b/modules/chain/dap_chain_ledger.c index 4faae9862..2637950a7 100644 --- a/modules/chain/dap_chain_ledger.c +++ b/modules/chain/dap_chain_ledger.c @@ -196,9 +196,9 @@ typedef struct dap_ledger_private { bool load_mode; // TPS section dap_timerfd_t *tps_timer; - time_t tps_start_time; - time_t tps_current_time; - time_t tps_end_time; + struct timespec tps_start_time; + struct timespec tps_current_time; + struct timespec tps_end_time; size_t tps_count; } dap_ledger_private_t; #define PVT(a) ( (dap_ledger_private_t* ) a->_internal ) @@ -206,11 +206,12 @@ typedef struct dap_ledger_private { static dap_chain_ledger_tx_item_t* tx_item_find_by_addr(dap_ledger_t *a_ledger, const dap_chain_addr_t *a_addr, const char * a_token, dap_chain_hash_fast_t *a_tx_first_hash); - static void s_treshold_emissions_proc( dap_ledger_t * a_ledger); static void s_treshold_txs_proc( dap_ledger_t * a_ledger); static int s_token_tsd_parse(dap_ledger_t * a_ledger, dap_chain_ledger_token_item_t *a_token_item , dap_chain_datum_token_t * a_token, size_t a_token_size); static int s_ledger_permissions_check(dap_chain_ledger_token_item_t * a_token_item, uint16_t a_permission_id, const void * a_data,size_t a_data_size ); +static bool s_ledger_tps_callback(void *a_arg); + static size_t s_treshold_emissions_max = 1000; static size_t s_treshold_txs_max = 10000; static bool s_debug_more = false; @@ -2128,8 +2129,9 @@ int dap_chain_ledger_tx_add(dap_ledger_t *a_ledger, dap_chain_datum_tx_t *a_tx, dap_list_t *l_list_tx_out = NULL; dap_chain_ledger_tx_item_t *l_item_tmp = NULL; if (!l_ledger_priv->tps_timer) { - l_ledger_priv->tps_start_time = time(NULL); - l_ledger_priv->tps_current_time = l_ledger_priv->tps_start_time; + clock_gettime(CLOCK_REALTIME, &l_ledger_priv->tps_start_time); + l_ledger_priv->tps_current_time.tv_sec = l_ledger_priv->tps_start_time.tv_sec; + l_ledger_priv->tps_current_time.tv_nsec = l_ledger_priv->tps_start_time.tv_nsec; l_ledger_priv->tps_count = 0; l_ledger_priv->tps_timer = dap_timerfd_start(500, s_ledger_tps_callback, l_ledger_priv); } @@ -2427,7 +2429,7 @@ int dap_chain_ledger_tx_add(dap_ledger_t *a_ledger, dap_chain_datum_tx_t *a_tx, HASH_ADD(hh, l_ledger_priv->ledger_items, tx_hash_fast, sizeof(dap_chain_hash_fast_t), l_item_tmp); // tx_hash_fast: name of key field pthread_rwlock_unlock(&l_ledger_priv->ledger_rwlock); // Count TPS - l_ledger_priv->tps_end_time = time(NULL); + clock_gettime(CLOCK_REALTIME, &l_ledger_priv->tps_end_time); l_ledger_priv->tps_count++; // Add it to cache uint8_t *l_tx_cache = DAP_NEW_Z_SIZE(uint8_t, l_tx_size + sizeof(l_item_tmp->cache_data)); @@ -2449,11 +2451,13 @@ FIN: return ret; } -bool s_ledger_tps_callback(void *a_arg) +static bool s_ledger_tps_callback(void *a_arg) { dap_ledger_private_t *l_ledger_pvt = (dap_ledger_private_t *)a_arg; - if (l_ledger_pvt->tps_current_time != l_ledger_pvt->tps_end_time) { - l_ledger_pvt->tps_current_time = l_ledger_pvt->tps_end_time; + if (l_ledger_pvt->tps_current_time.tv_sec != l_ledger_pvt->tps_end_time.tv_sec || + l_ledger_pvt->tps_current_time.tv_nsec != l_ledger_pvt->tps_end_time.tv_nsec) { + l_ledger_pvt->tps_current_time.tv_sec = l_ledger_pvt->tps_end_time.tv_sec; + l_ledger_pvt->tps_current_time.tv_nsec = l_ledger_pvt->tps_end_time.tv_nsec; return true; } l_ledger_pvt->tps_timer = NULL; @@ -2683,15 +2687,19 @@ uint64_t dap_chain_ledger_count_from_to(dap_ledger_t * a_ledger, time_t a_ts_fro return l_ret; } -size_t dap_chain_ledger_count_tps(dap_ledger_t *a_ledger, time_t *a_ts_from, time_t *a_ts_to) +size_t dap_chain_ledger_count_tps(dap_ledger_t *a_ledger, struct timespec *a_ts_from, struct timespec *a_ts_to) { if (!a_ledger) return 0; dap_ledger_private_t *l_ledger_priv = PVT(a_ledger); - if (a_ts_from) - *a_ts_from = l_ledger_priv->tps_start_time; - if (a_ts_to) - *a_ts_to = l_ledger_priv->tps_end_time; + if (a_ts_from) { + a_ts_from->tv_sec = l_ledger_priv->tps_start_time.tv_sec; + a_ts_from->tv_nsec = l_ledger_priv->tps_start_time.tv_nsec; + } + if (a_ts_to) { + a_ts_to->tv_sec = l_ledger_priv->tps_end_time.tv_sec; + a_ts_to->tv_nsec = l_ledger_priv->tps_end_time.tv_nsec; + } return l_ledger_priv->tps_count; } diff --git a/modules/chain/include/dap_chain_ledger.h b/modules/chain/include/dap_chain_ledger.h index b0e2f35fa..500338644 100644 --- a/modules/chain/include/dap_chain_ledger.h +++ b/modules/chain/include/dap_chain_ledger.h @@ -175,7 +175,8 @@ void dap_chain_ledger_load_end(dap_ledger_t *a_ledger); unsigned dap_chain_ledger_count(dap_ledger_t *a_ledger); uint64_t dap_chain_ledger_count_from_to(dap_ledger_t * a_ledger, time_t a_ts_from, time_t a_ts_to ); -size_t dap_chain_ledger_count_tps(dap_ledger_t *a_ledger, time_t *a_ts_from, time_t *a_ts_to); +size_t dap_chain_ledger_count_tps(dap_ledger_t *a_ledger, struct timespec *a_ts_from, struct timespec *a_ts_to); + /** * Check whether used 'out' items */ diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index 6566c07da..11a0edaef 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -1537,18 +1537,16 @@ static int s_cli_net(int argc, char **argv, char **a_str_reply) if ( !dap_strcmp(l_sync_mode_str,"all") ) dap_chain_net_get_flag_sync_from_zero(l_net); - if ( l_stats_str ){ + if ( l_stats_str ){ + + char l_from_str_new[50], l_to_str_new[50]; + const char c_time_fmt[]="%Y-%m-%d_%H:%M:%S"; + struct tm l_from_tm = {}, l_to_tm = {}; + if ( strcmp(l_stats_str,"tx") == 0 ) { const char *l_to_str = NULL; - struct tm l_to_tm = {0}; - const char *l_from_str = NULL; - struct tm l_from_tm = {0}; - const char *l_prev_sec_str = NULL; - //time_t l_prev_sec_ts; - - const char c_time_fmt[]="%Y-%m-%d_%H:%M:%S"; // Read from/to time dap_chain_node_cli_find_option_val(argv, arg_index, argc, "-from", &l_from_str); @@ -1570,20 +1568,12 @@ static int s_cli_net(int argc, char **argv, char **a_str_reply) l_ts_now -= 60; localtime_r(&l_ts_now, &l_from_tm ); } - - // Form timestamps from/to time_t l_from_ts = mktime(&l_from_tm); time_t l_to_ts = mktime(&l_to_tm); - // Produce strings - char l_from_str_new[50]; - char l_to_str_new[50]; + dap_string_t * l_ret_str = dap_string_new("Transactions statistics:\n"); strftime(l_from_str_new, sizeof(l_from_str_new), c_time_fmt,&l_from_tm ); strftime(l_to_str_new, sizeof(l_to_str_new), c_time_fmt,&l_to_tm ); - - - dap_string_t * l_ret_str = dap_string_new("Transactions statistics:\n"); - dap_string_append_printf( l_ret_str, "\tFrom: %s\tTo: %s\n", l_from_str_new, l_to_str_new); log_it(L_INFO, "Calc TPS from %s to %s", l_from_str_new, l_to_str_new); uint64_t l_tx_count = dap_chain_ledger_count_from_to ( l_net->pub.ledger, l_from_ts, l_to_ts); @@ -1594,11 +1584,23 @@ static int s_cli_net(int argc, char **argv, char **a_str_reply) dap_chain_node_cli_set_reply_text( a_str_reply, l_ret_str->str ); dap_string_free( l_ret_str, false ); } else if (strcmp(l_stats_str, "tps") == 0) { + struct timespec l_from_time_acc = {}, l_to_time_acc = {}; dap_string_t * l_ret_str = dap_string_new("Transactions per second peak values:\n"); - dap_string_append_printf(l_ret_str, "\tSpeed: %.3Lf TPS\n", l_tps); - dap_string_append_printf(l_ret_str, "\tTotal: %"DAP_UINT64_FORMAT_U"\n", l_tx_count); - dap_chain_node_cli_set_reply_text( a_str_reply, l_ret_str->str ); - dap_string_free( l_ret_str, false ); + size_t l_tx_num = dap_chain_ledger_count_tps(l_net->pub.ledger, &l_from_time_acc, &l_to_time_acc); + if (l_tx_num) { + localtime_r(&l_from_time_acc.tv_sec, &l_from_tm); + strftime(l_from_str_new, sizeof(l_from_str_new), c_time_fmt, &l_from_tm); + localtime_r(&l_to_time_acc.tv_sec, &l_to_tm); + strftime(l_to_str_new, sizeof(l_to_str_new), c_time_fmt, &l_to_tm); + dap_string_append_printf(l_ret_str, "\tFrom: %s\tTo: %s\n", l_from_str_new, l_to_str_new); + uint64_t l_diff_ns = (l_to_time_acc.tv_sec - l_from_time_acc.tv_sec) * 1000000000 + + l_to_time_acc.tv_nsec - l_from_time_acc.tv_nsec; + long double l_tps = (long double)(l_tx_num * 1000000000) / (long double)(l_diff_ns); + dap_string_append_printf(l_ret_str, "\tSpeed: %.3Lf TPS\n", l_tps); + } + dap_string_append_printf(l_ret_str, "\tTotal: %zu\n", l_tx_num); + dap_chain_node_cli_set_reply_text(a_str_reply, l_ret_str->str); + dap_string_free(l_ret_str, false); } else { dap_chain_node_cli_set_reply_text(a_str_reply, "Subcommand 'stats' requires one of parameter: tx, tps\n"); diff --git a/modules/type/dag/dap_chain_cs_dag.c b/modules/type/dag/dap_chain_cs_dag.c index 504556873..0de334839 100644 --- a/modules/type/dag/dap_chain_cs_dag.c +++ b/modules/type/dag/dap_chain_cs_dag.c @@ -313,7 +313,7 @@ static void s_dap_chain_cs_dag_purge(dap_chain_t *a_chain) DAP_DELETE(l_event_current); } pthread_rwlock_unlock(&l_dag_pvt->events_rwlock); - dap_chain_cell_t *l_cell_cur, l_cell_tmp; + dap_chain_cell_t *l_cell_cur, *l_cell_tmp; HASH_ITER(hh, a_chain->cells, l_cell_cur, l_cell_tmp) { dap_chain_cell_close(l_cell_cur); } -- GitLab From 0f84c28c20af85aba2f80ce6ed8ebf6a0a4767d7 Mon Sep 17 00:00:00 2001 From: Roman Khlopkov <roman.khlopkov@demlabs.net> Date: Wed, 9 Feb 2022 12:36:58 +0300 Subject: [PATCH 3/4] [*] Srv client master port --- .../net/stream/ch/include/dap_stream_ch_pkt.h | 2 + .../dap_stream_ch_chain_net_srv.c | 4 +- .../dap_stream_ch_chain_net_srv.h | 4 +- .../include/dap_stream_ch_chain_net_srv.h | 1 + modules/net/dap_chain_node_client.c | 26 ++-- modules/net/include/dap_chain_node_client.h | 11 ++ modules/net/srv/dap_chain_net_srv_client.c | 125 ++++++++++++++---- modules/net/srv/dap_chain_net_srv_common.c | 28 ++++ .../srv/dap_chain_net_srv_stream_session.c | 2 +- modules/net/srv/include/dap_chain_net_srv.h | 23 +++- .../srv/include/dap_chain_net_srv_client.h | 33 +++-- .../dap_chain_net_srv_stream_session.h | 7 +- modules/service/vpn/dap_chain_net_srv_vpn.c | 22 +-- .../xchange/dap_chain_net_srv_xchange.c | 16 +-- modules/type/dag/dap_chain_cs_dag.c | 7 +- 15 files changed, 226 insertions(+), 85 deletions(-) diff --git a/dap-sdk/net/stream/ch/include/dap_stream_ch_pkt.h b/dap-sdk/net/stream/ch/include/dap_stream_ch_pkt.h index 63bf9d2ad..4c9a324a6 100644 --- a/dap-sdk/net/stream/ch/include/dap_stream_ch_pkt.h +++ b/dap-sdk/net/stream/ch/include/dap_stream_ch_pkt.h @@ -49,6 +49,8 @@ typedef struct dap_stream_ch_pkt{ uint8_t data[]; } __attribute__((packed)) dap_stream_ch_pkt_t; +typedef void (*dap_stream_ch_callback_packet_t)(void *, uint8_t, dap_stream_ch_pkt_t *, void *); + int dap_stream_ch_pkt_init(); void dap_stream_ch_pkt_deinit(); diff --git a/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.c b/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.c index 1baf6eb0b..ed450782a 100644 --- a/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.c +++ b/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.c @@ -224,7 +224,7 @@ static bool s_grace_period_control(dap_chain_net_srv_grace_t *a_grace) l_err.usage_id = l_usage->id; // Create one client - l_usage->client = DAP_NEW_Z( dap_chain_net_srv_client_t); + l_usage->client = DAP_NEW_Z( dap_chain_net_srv_client_remote_t); l_usage->client->stream_worker = l_ch->stream_worker; l_usage->client->ch = l_ch; l_usage->client->session_id = l_ch->stream->session->id; @@ -656,7 +656,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch , void* a_arg) dap_chain_net_srv_t * l_srv = dap_chain_net_srv_get(l_success->hdr.srv_uid); if ( l_srv && l_srv->callback_client_success){ // Create client for client) - dap_chain_net_srv_client_t *l_client = DAP_NEW_Z( dap_chain_net_srv_client_t); + dap_chain_net_srv_client_remote_t *l_client = DAP_NEW_Z( dap_chain_net_srv_client_remote_t); l_client->ch = a_ch; l_client->stream_worker = a_ch->stream_worker; l_client->ts_created = time(NULL); diff --git a/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.h b/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.h index ced85e9ac..01fb26820 100644 --- a/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.h +++ b/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.h @@ -24,9 +24,9 @@ #pragma once -#include <pthread.h> -#include "dap_stream_ch.h" +#include "dap_chain_net_srv_common.h" #include "dap_stream_ch_pkt.h" +#include "dap_chain_net_srv_stream_session.h" typedef struct dap_stream_ch_chain_net_srv dap_stream_ch_chain_net_srv_t; diff --git a/modules/channel/chain-net-srv/include/dap_stream_ch_chain_net_srv.h b/modules/channel/chain-net-srv/include/dap_stream_ch_chain_net_srv.h index d614b20b0..bf64cc21d 100644 --- a/modules/channel/chain-net-srv/include/dap_stream_ch_chain_net_srv.h +++ b/modules/channel/chain-net-srv/include/dap_stream_ch_chain_net_srv.h @@ -25,6 +25,7 @@ #pragma once +#include "dap_stream_ch.h" #include "dap_chain_common.h" int dap_stream_ch_chain_net_srv_init(void); diff --git a/modules/net/dap_chain_node_client.c b/modules/net/dap_chain_node_client.c index 68f142228..830413534 100644 --- a/modules/net/dap_chain_node_client.c +++ b/modules/net/dap_chain_node_client.c @@ -621,9 +621,9 @@ static int save_stat_to_database(dap_stream_ch_chain_net_srv_pkt_test_t *a_reque */ static void s_ch_chain_callback_notify_packet_R(dap_stream_ch_chain_net_srv_t* a_ch_chain, uint8_t a_pkt_type, dap_stream_ch_pkt_t *a_pkt, void * a_arg) { + UNUSED(a_ch_chain); dap_chain_node_client_t * l_node_client = (dap_chain_node_client_t *) a_arg; switch (a_pkt_type) { - // get new generated current node address case DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_CHECK_RESPONSE: { dap_stream_ch_chain_net_srv_pkt_test_t *l_request = (dap_stream_ch_chain_net_srv_pkt_test_t *) a_pkt->data; size_t l_request_size = l_request->data_size + sizeof(dap_stream_ch_chain_net_srv_pkt_test_t); @@ -631,9 +631,7 @@ static void s_ch_chain_callback_notify_packet_R(dap_stream_ch_chain_net_srv_t* a log_it(L_WARNING, "Wrong request size, less or more than required"); break; } - // todo to write result to database save_stat_to_database(l_request, l_node_client); - //... l_node_client->state = NODE_CLIENT_STATE_CHECKED; #ifndef _WIN32 pthread_cond_broadcast(&l_node_client->wait_cond); @@ -742,7 +740,7 @@ static bool dap_chain_node_client_connect_internal(dap_chain_node_client_t *a_no dap_chain_node_client_close(a_node_client); return false; } - dap_client_set_uplink_unsafe(a_node_client->client, strdup(host), a_node_client->info->hdr.ext_port); + dap_client_set_uplink_unsafe(a_node_client->client, host, a_node_client->info->hdr.ext_port); a_node_client->state = NODE_CLIENT_STATE_CONNECTING ; // Handshake & connect dap_client_go_stage(a_node_client->client, STAGE_STREAM_STREAMING, s_stage_connected_callback); @@ -810,8 +808,8 @@ void dap_chain_node_client_close(dap_chain_node_client_t *a_client) // clean client dap_client_pvt_t *l_client_pvt = dap_client_pvt_find(a_client->client->pvt_uuid); if (l_client_pvt) { - dap_client_delete_mt(a_client->client); a_client->client->_inheritor = NULL; + dap_client_delete_mt(a_client->client); } #ifndef _WIN32 pthread_cond_destroy(&a_client->wait_cond); @@ -820,6 +818,8 @@ void dap_chain_node_client_close(dap_chain_node_client_t *a_client) #endif pthread_mutex_destroy(&a_client->wait_mutex); a_client->client = NULL; + if (a_client->info) + DAP_DELETE(a_client->info); DAP_DELETE(a_client); } else { log_it(L_WARNING, "Chain node client was removed from hash table before for some reasons"); @@ -955,7 +955,7 @@ int dap_chain_node_client_set_callbacks(dap_client_t *a_client, uint8_t a_ch_id) if(l_ch) { // C if(a_ch_id == dap_stream_ch_chain_get_id()) { - dap_stream_ch_chain_t * l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); + dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); l_ch_chain->callback_notify_packet_out = s_ch_chain_callback_notify_packet_out; l_ch_chain->callback_notify_packet_in = s_ch_chain_callback_notify_packet_in; l_ch_chain->callback_notify_arg = l_node_client; @@ -972,9 +972,17 @@ int dap_chain_node_client_set_callbacks(dap_client_t *a_client, uint8_t a_ch_id) } // R if(a_ch_id == dap_stream_ch_chain_net_srv_get_id()) { - dap_stream_ch_chain_net_srv_t * l_ch_chain = DAP_STREAM_CH_CHAIN_NET_SRV(l_ch); - l_ch_chain->notify_callback = s_ch_chain_callback_notify_packet_R; - l_ch_chain->notify_callback_arg = l_node_client; + dap_stream_ch_chain_net_srv_t *l_ch_chain = DAP_STREAM_CH_CHAIN_NET_SRV(l_ch); + if (l_node_client->callbacks.srv_pkt_in) { + l_ch_chain->notify_callback = (dap_stream_ch_chain_net_srv_callback_packet_t) + l_node_client->callbacks.srv_pkt_in; + l_ch_chain->notify_callback_arg = l_node_client->callbacks_arg; + } else { + l_ch_chain->notify_callback = s_ch_chain_callback_notify_packet_R; + l_ch_chain->notify_callback_arg = l_node_client; + } + l_node_client->ch_chain_net_srv = l_ch; + memcpy(&l_node_client->ch_chain_net_srv_uuid, &l_ch->uuid, sizeof(dap_stream_ch_uuid_t)); } l_ret = 0; } else { diff --git a/modules/net/include/dap_chain_node_client.h b/modules/net/include/dap_chain_node_client.h index b74d295b5..829ec01c4 100644 --- a/modules/net/include/dap_chain_node_client.h +++ b/modules/net/include/dap_chain_node_client.h @@ -28,6 +28,7 @@ #include "uthash.h" #include "dap_client.h" #include "dap_chain_node.h" +#include "dap_stream_ch_pkt.h" // connection states typedef enum dap_chain_node_client_state { @@ -69,6 +70,13 @@ typedef struct dap_chain_node_client_callbacks{ dap_chain_node_client_callback_t delete; dap_chain_node_client_callback_stage_t stage; dap_chain_node_client_callback_error_t error; + + dap_stream_ch_callback_packet_t chain_pkt_in; + dap_stream_ch_callback_packet_t chain_pkt_out; + dap_stream_ch_callback_packet_t net_pkt_in; + dap_stream_ch_callback_packet_t net_pkt_out; + dap_stream_ch_callback_packet_t srv_pkt_in; + dap_stream_ch_callback_packet_t srv_pkt_out; } dap_chain_node_client_callbacks_t; // state for a client connection @@ -94,6 +102,9 @@ typedef struct dap_chain_node_client { // Channel chain net dap_stream_ch_t * ch_chain_net; dap_stream_ch_uuid_t ch_chain_net_uuid; + // Channel chain net srv + dap_stream_ch_t * ch_chain_net_srv; + dap_stream_ch_uuid_t ch_chain_net_srv_uuid; dap_chain_node_info_t * info; dap_events_t *events; diff --git a/modules/net/srv/dap_chain_net_srv_client.c b/modules/net/srv/dap_chain_net_srv_client.c index 09ad3a42c..0a957e887 100644 --- a/modules/net/srv/dap_chain_net_srv_client.c +++ b/modules/net/srv/dap_chain_net_srv_client.c @@ -1,9 +1,10 @@ /* * Authors: * Dmitriy Gerasimov <naeper@demlabs.net> +* Roman Khlopkov <roman.khlopkov@demlabs.net> * Cellframe https://cellframe.net * DeM Labs Inc. https://demlabs.net -* Copyright (c) 2017-2019 +* Copyright (c) 2017-2022 * All rights reserved. This file is part of CellFrame SDK the open source project @@ -22,36 +23,108 @@ You should have received a copy of the GNU General Public License along with any CellFrame SDK based project. If not, see <http://www.gnu.org/licenses/>. */ +#include "dap_stream_ch_chain_net_srv.h" #include "dap_chain_net_srv.h" #include "dap_chain_net_srv_client.h" #include "dap_common.h" #define LOG_TAG "dap_chain_net_srv_client" -/* - * Init service client - * l_uid service id - * a_callback_client_success callback to start client service - */ -// -int dap_chain_net_srv_client_init(dap_chain_net_srv_uid_t a_uid, - dap_chain_net_srv_callback_data_t a_callback_request, - dap_chain_net_srv_callback_data_t a_callback_response_success, - dap_chain_net_srv_callback_data_t a_callback_response_error, - dap_chain_net_srv_callback_data_t a_callback_receipt_next_success, - dap_chain_net_srv_callback_data_t a_callback_client_success, - dap_chain_net_srv_callback_sign_request_t a_callback_client_sign_request, - void *a_inhertor) { - - dap_chain_net_srv_t *l_srv_custom = dap_chain_net_srv_get(a_uid); - if(!l_srv_custom) { - l_srv_custom = dap_chain_net_srv_add(a_uid, a_callback_request, - a_callback_response_success, a_callback_response_error, - a_callback_receipt_next_success); +static void s_srv_client_pkt_in(dap_stream_ch_chain_net_srv_t *a_ch_chain, uint8_t a_pkt_type, dap_stream_ch_pkt_t *a_pkt, void *a_arg); +static void s_srv_client_callback_connected(dap_chain_node_client_t *a_node_client, void *a_arg); +static void s_srv_client_callback_disconnected(dap_chain_node_client_t *a_node_client, void *a_arg); +static void s_srv_client_callback_deleted(dap_chain_node_client_t *a_node_client, void *a_arg); + +static dap_chain_node_client_callbacks_t s_callbacks = { + .connected = s_srv_client_callback_connected, + .disconnected = s_srv_client_callback_disconnected, + .delete = s_srv_client_callback_deleted +}; + +dap_chain_net_srv_client_t *dap_chain_net_srv_client_create_n_connect(dap_chain_net_t *a_net, char *a_addr, uint16_t a_port, + dap_chain_net_srv_client_callbacks_t *a_callbacks, + void *a_callbacks_arg) +{ + dap_chain_net_srv_client_t *l_ret = DAP_NEW_Z(dap_chain_net_srv_client_t); + if (a_callbacks) + memcpy(&l_ret->callbacks, a_callbacks, sizeof(*a_callbacks)); + l_ret->callbacks_arg = a_callbacks_arg; + if (a_callbacks->pkt_in) + s_callbacks.srv_pkt_in = a_callbacks->pkt_in; + else + s_callbacks.srv_pkt_in = (dap_stream_ch_callback_packet_t)s_srv_client_pkt_in; + dap_chain_node_info_t *l_info = DAP_NEW_Z(dap_chain_node_info_t); + inet_pton(AF_INET, a_addr, &l_info->hdr.ext_addr_v4); + l_info->hdr.ext_port = a_port; + l_ret->node_client = dap_chain_node_client_create_n_connect(a_net, l_info, "R", &s_callbacks, l_ret); + return l_ret; +} + +ssize_t dap_chain_net_srv_client_write(dap_chain_net_srv_client_t *a_client, uint8_t a_type, void *a_pkt_data, size_t a_pkt_data_size) +{ + if (!a_client || !a_client->net_client || dap_client_get_stage(a_client->net_client) != STAGE_STREAM_STREAMING) + return -1; + dap_stream_worker_t *l_stream_worker = dap_client_get_stream_worker(a_client->net_client); + return dap_stream_ch_pkt_write_mt(l_stream_worker, a_client->ch_uuid, a_type, a_pkt_data, a_pkt_data_size); +} + +static void s_srv_client_callback_connected(dap_chain_node_client_t *a_node_client, void *a_arg) +{ + log_it(L_INFO, "Service client connected well"); + dap_chain_net_srv_client_t *l_srv_client = (dap_chain_net_srv_client_t *)a_arg; + memcpy(&l_srv_client->ch_uuid, &a_node_client->ch_chain_net_srv_uuid, sizeof(l_srv_client->ch_uuid)); + l_srv_client->net_client = a_node_client->client; + if (l_srv_client->callbacks.connected) + l_srv_client->callbacks.connected(l_srv_client, l_srv_client->callbacks_arg); + /* Test code for get reply from server + const int l_test_size = 64; + size_t l_request_size = l_test_size + sizeof(dap_stream_ch_chain_net_srv_pkt_test_t); + dap_stream_ch_chain_net_srv_pkt_test_t *l_request = DAP_NEW_Z_SIZE(dap_stream_ch_chain_net_srv_pkt_test_t, l_request_size); + l_request->data_size = l_test_size; + randombytes(l_request->data, l_test_size); + dap_hash_fast(l_request->data, l_request->data_size, &l_request->data_hash); + dap_chain_net_srv_client_write(l_srv_client, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_CHECK_REQUEST, l_request, l_request_size); + DAP_DELETE(l_request); */ +} + +static void s_srv_client_callback_disconnected(dap_chain_node_client_t *a_node_client, void *a_arg) +{ + UNUSED(a_node_client); + log_it(L_INFO, "Service client disconnected"); + dap_chain_net_srv_client_t *l_srv_client = (dap_chain_net_srv_client_t *)a_arg; + if (l_srv_client->callbacks.disconnected) + l_srv_client->callbacks.disconnected(l_srv_client, l_srv_client->callbacks_arg); +} + +static void s_srv_client_callback_deleted(dap_chain_node_client_t *a_node_client, void *a_arg) +{ + UNUSED(a_node_client); + log_it(L_INFO, "Service client deleted"); + dap_chain_net_srv_client_t *l_srv_client = (dap_chain_net_srv_client_t *)a_arg; + if (l_srv_client->callbacks.deleted) + l_srv_client->callbacks.deleted(l_srv_client, l_srv_client->callbacks_arg); + DAP_DELETE(l_srv_client); +} + +static void s_srv_client_pkt_in(dap_stream_ch_chain_net_srv_t *a_ch_chain, uint8_t a_pkt_type, dap_stream_ch_pkt_t *a_pkt, void *a_arg) +{ + UNUSED(a_ch_chain); + //dap_chain_node_client_t *l_node_client = (dap_chain_node_client_t *)a_arg; + switch (a_pkt_type) { + case DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_CHECK_RESPONSE: { + dap_stream_ch_chain_net_srv_pkt_test_t *l_request = (dap_stream_ch_chain_net_srv_pkt_test_t *) a_pkt->data; + size_t l_request_size = l_request->data_size + sizeof(dap_stream_ch_chain_net_srv_pkt_test_t); + if(a_pkt->hdr.size != l_request_size) { + log_it(L_WARNING, "Wrong request size, less or more than required"); + break; + } + log_it(L_INFO, "Service client check request success!"); + } break; + case DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_SIGN_RESPONSE: + case DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_SUCCESS: + case DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR: + break; + default: + break; } - l_srv_custom->callback_client_success = a_callback_client_success; - l_srv_custom->callback_client_sign_request = a_callback_client_sign_request; - if(a_inhertor) - l_srv_custom->_inhertor = a_inhertor; - return 0; } diff --git a/modules/net/srv/dap_chain_net_srv_common.c b/modules/net/srv/dap_chain_net_srv_common.c index 601f84723..457980cec 100644 --- a/modules/net/srv/dap_chain_net_srv_common.c +++ b/modules/net/srv/dap_chain_net_srv_common.c @@ -45,5 +45,33 @@ #include "dap_chain_datum_tx_items.h" #include "dap_stream.h" #include "dap_chain_net_srv_common.h" +#include "dap_chain_net_srv.h" +/* + * Init service client + * l_uid service id + * a_callback_client_success callback to start client service + */ +// +int dap_chain_net_srv_remote_init(dap_chain_net_srv_uid_t a_uid, + dap_chain_net_srv_callback_data_t a_callback_request, + dap_chain_net_srv_callback_data_t a_callback_response_success, + dap_chain_net_srv_callback_data_t a_callback_response_error, + dap_chain_net_srv_callback_data_t a_callback_receipt_next_success, + dap_chain_net_srv_callback_data_t a_callback_client_success, + dap_chain_net_srv_callback_sign_request_t a_callback_client_sign_request, + void *a_inhertor) +{ + dap_chain_net_srv_t *l_srv_custom = dap_chain_net_srv_get(a_uid); + if(!l_srv_custom) { + l_srv_custom = dap_chain_net_srv_add(a_uid, a_callback_request, + a_callback_response_success, a_callback_response_error, + a_callback_receipt_next_success); + } + l_srv_custom->callback_client_success = a_callback_client_success; + l_srv_custom->callback_client_sign_request = a_callback_client_sign_request; + if(a_inhertor) + l_srv_custom->_inhertor = a_inhertor; + return 0; +} diff --git a/modules/net/srv/dap_chain_net_srv_stream_session.c b/modules/net/srv/dap_chain_net_srv_stream_session.c index e90ba9ddd..3a2763bd6 100644 --- a/modules/net/srv/dap_chain_net_srv_stream_session.c +++ b/modules/net/srv/dap_chain_net_srv_stream_session.c @@ -87,7 +87,7 @@ void dap_chain_net_srv_usage_delete (dap_chain_net_srv_stream_session_t * a_srv_ if ( a_usage->receipt ) DAP_DELETE( a_usage->receipt ); if ( a_usage->client ){ - for (dap_chain_net_srv_client_t * l_srv_client = a_usage->client, * tmp = NULL; l_srv_client; ){ + for (dap_chain_net_srv_client_remote_t * l_srv_client = a_usage->client, * tmp = NULL; l_srv_client; ){ tmp = l_srv_client; l_srv_client = l_srv_client->next; DAP_DELETE( tmp); diff --git a/modules/net/srv/include/dap_chain_net_srv.h b/modules/net/srv/include/dap_chain_net_srv.h index 2fc2a3bea..c98bcb43c 100755 --- a/modules/net/srv/include/dap_chain_net_srv.h +++ b/modules/net/srv/include/dap_chain_net_srv.h @@ -31,9 +31,9 @@ along with any CellFrame SDK based project. If not, see <http://www.gnu.org/lic typedef struct dap_chain_net_srv dap_chain_net_srv_t; -typedef void (*dap_chain_net_srv_callback_t)(dap_chain_net_srv_t *, dap_chain_net_srv_client_t *); -typedef int (*dap_chain_net_srv_callback_data_t)(dap_chain_net_srv_t *, uint32_t, dap_chain_net_srv_client_t *, const void *, size_t ); -typedef int (*dap_chain_net_srv_callback_sign_request_t)(dap_chain_net_srv_t *, uint32_t, dap_chain_net_srv_client_t *, dap_chain_datum_tx_receipt_t **, size_t ); +typedef void (*dap_chain_net_srv_callback_t)(dap_chain_net_srv_t *, dap_chain_net_srv_client_remote_t *); +typedef int (*dap_chain_net_srv_callback_data_t)(dap_chain_net_srv_t *, uint32_t, dap_chain_net_srv_client_remote_t *, const void *, size_t ); +typedef int (*dap_chain_net_srv_callback_sign_request_t)(dap_chain_net_srv_t *, uint32_t, dap_chain_net_srv_client_remote_t *, dap_chain_datum_tx_receipt_t **, size_t ); typedef void (*dap_chain_net_srv_callback_ch_t)(dap_chain_net_srv_t *, dap_stream_ch_t *); typedef struct dap_chain_net_srv_banlist_item { @@ -81,8 +81,21 @@ typedef struct dap_chain_net_srv // Pointer to inheritor object void * _inhertor; } dap_chain_net_srv_t; -typedef void (*dap_chain_net_srv_callback_new_t)(dap_chain_net_srv_t *, dap_config_t *); +typedef struct dap_chain_net_srv_client_remote +{ + dap_stream_ch_t * ch; // Use ONLY in own context, not thread-safe + time_t ts_created; + dap_stream_worker_t * stream_worker; + int session_id; + dap_chain_net_remote_t *net_remote; // For remotes + uint64_t bytes_received; + uint64_t bytes_sent; + struct dap_chain_net_srv_client_remote *prev; + struct dap_chain_net_srv_client_remote *next; +} dap_chain_net_srv_client_remote_t; + +typedef void (*dap_chain_net_srv_callback_new_t)(dap_chain_net_srv_t *, dap_config_t *); int dap_chain_net_srv_init(); void dap_chain_net_srv_deinit(void); @@ -115,7 +128,7 @@ dap_chain_datum_tx_receipt_t * dap_chain_net_srv_issue_receipt(dap_chain_net_srv ); -int dap_chain_net_srv_client_init(dap_chain_net_srv_uid_t a_uid, +int dap_chain_net_srv_remote_init(dap_chain_net_srv_uid_t a_uid, dap_chain_net_srv_callback_data_t a_callback_request, dap_chain_net_srv_callback_data_t a_callback_response_success, dap_chain_net_srv_callback_data_t a_callback_response_error, diff --git a/modules/net/srv/include/dap_chain_net_srv_client.h b/modules/net/srv/include/dap_chain_net_srv_client.h index 724fbccf1..f1ea76d4e 100644 --- a/modules/net/srv/include/dap_chain_net_srv_client.h +++ b/modules/net/srv/include/dap_chain_net_srv_client.h @@ -32,17 +32,28 @@ along with any CellFrame SDK based project. If not, see <http://www.gnu.org/lic #include "dap_stream_worker.h" #include "dap_chain_net_srv_common.h" #include "dap_chain_net_remote.h" +#include "dap_chain_node_client.h" +typedef struct dap_chain_net_srv_client dap_chain_net_srv_client_t; -typedef struct dap_chain_net_srv_client -{ - dap_stream_ch_t * ch; // Use ONLY in own context, not thread-safe - time_t ts_created; - dap_stream_worker_t * stream_worker; - int session_id; - dap_chain_net_remote_t *net_remote; // For remotes - uint64_t bytes_received; - uint64_t bytes_sent; - struct dap_chain_net_srv_client *prev; - struct dap_chain_net_srv_client *next; +typedef void (*dap_chain_net_srv_client_callback_t)(dap_chain_net_srv_client_t *, void *); + +typedef struct dap_chain_net_srv_client_callbacks { + dap_chain_net_srv_client_callback_t connected; + dap_chain_net_srv_client_callback_t disconnected; + dap_chain_net_srv_client_callback_t deleted; + dap_stream_ch_callback_packet_t pkt_in; +} dap_chain_net_srv_client_callbacks_t; + +typedef struct dap_chain_net_srv_client { + dap_chain_net_srv_client_callbacks_t callbacks; + void *callbacks_arg; + dap_stream_ch_uuid_t ch_uuid; + dap_chain_node_client_t *node_client; + dap_client_t *net_client; } dap_chain_net_srv_client_t; + +dap_chain_net_srv_client_t *dap_chain_net_srv_client_create_n_connect(dap_chain_net_t *a_net, char *a_addr, uint16_t a_port, + dap_chain_net_srv_client_callbacks_t *a_callbacks, + void *a_callbacks_arg); +ssize_t dap_chain_net_srv_client_write(dap_chain_net_srv_client_t *a_client, uint8_t a_type, void *a_pkt_data, size_t a_pkt_data_size); diff --git a/modules/net/srv/include/dap_chain_net_srv_stream_session.h b/modules/net/srv/include/dap_chain_net_srv_stream_session.h index 3a7377526..e3d117b5f 100644 --- a/modules/net/srv/include/dap_chain_net_srv_stream_session.h +++ b/modules/net/srv/include/dap_chain_net_srv_stream_session.h @@ -31,14 +31,13 @@ along with any CellFrame SDK based project. If not, see <http://www.gnu.org/lic #include "dap_sign.h" #include "dap_chain_datum_tx.h" #include "dap_chain_datum_tx_receipt.h" -//#include "dap_chain_net_srv.h" #include "dap_chain_net_srv_order.h" -#include "dap_chain_net_srv_client.h" #include "dap_chain_wallet.h" typedef struct dap_chain_net_srv dap_chain_net_srv_t; +typedef struct dap_chain_net_srv_client_remote dap_chain_net_srv_client_remote_t; -typedef struct dap_chain_net_srv_usage{ +typedef struct dap_chain_net_srv_usage { uint32_t id; // Usage id pthread_rwlock_t rwlock; time_t ts_created; // Created timpestamp @@ -51,7 +50,7 @@ typedef struct dap_chain_net_srv_usage{ dap_chain_net_srv_price_t * price; // Price for issue next receipt size_t receipt_size; size_t receipt_next_size; - dap_chain_net_srv_client_t * client; + dap_chain_net_srv_client_remote_t *client; dap_chain_datum_tx_t * tx_cond; dap_chain_hash_fast_t tx_cond_hash; dap_chain_hash_fast_t client_pkey_hash; diff --git a/modules/service/vpn/dap_chain_net_srv_vpn.c b/modules/service/vpn/dap_chain_net_srv_vpn.c index 85226074b..e1e2a3cda 100644 --- a/modules/service/vpn/dap_chain_net_srv_vpn.c +++ b/modules/service/vpn/dap_chain_net_srv_vpn.c @@ -162,14 +162,14 @@ static vpn_local_network_t *s_raw_server = NULL; static pthread_rwlock_t s_raw_server_rwlock = PTHREAD_RWLOCK_INITIALIZER; // Service callbacks -static int s_callback_requested(dap_chain_net_srv_t * a_srv, uint32_t a_usage_id, dap_chain_net_srv_client_t * a_srv_client +static int s_callback_requested(dap_chain_net_srv_t * a_srv, uint32_t a_usage_id, dap_chain_net_srv_client_remote_t * a_srv_client , const void * a_custom_data, size_t a_custom_data_size ); -static int s_callback_response_success(dap_chain_net_srv_t * a_srv, uint32_t a_usage_id, dap_chain_net_srv_client_t * a_srv_client +static int s_callback_response_success(dap_chain_net_srv_t * a_srv, uint32_t a_usage_id, dap_chain_net_srv_client_remote_t * a_srv_client , const void * a_custom_data, size_t a_custom_data_size ); -static int s_callback_response_error(dap_chain_net_srv_t * a_srv, uint32_t a_usage_id, dap_chain_net_srv_client_t * a_srv_client +static int s_callback_response_error(dap_chain_net_srv_t * a_srv, uint32_t a_usage_id, dap_chain_net_srv_client_remote_t * a_srv_client , const void * a_custom_data, size_t a_custom_data_size ); -static int s_callback_receipt_next_success(dap_chain_net_srv_t * a_srv, uint32_t a_usage_id, dap_chain_net_srv_client_t * a_srv_client, +static int s_callback_receipt_next_success(dap_chain_net_srv_t * a_srv, uint32_t a_usage_id, dap_chain_net_srv_client_remote_t * a_srv_client, const void * a_receipt_next, size_t a_receipt_next_size); @@ -542,7 +542,7 @@ static dap_events_socket_t * s_tun_event_stream_create(dap_worker_t * a_worker, * @param a_success_size * @return */ -static int s_callback_client_success(dap_chain_net_srv_t * a_srv, uint32_t a_usage_id, dap_chain_net_srv_client_t * a_srv_client, +static int s_callback_client_success(dap_chain_net_srv_t * a_srv, uint32_t a_usage_id, dap_chain_net_srv_client_remote_t * a_srv_client, const void * a_success, size_t a_success_size) { if(!a_srv || !a_srv_client || !a_srv_client->stream_worker || !a_success || a_success_size < sizeof(dap_stream_ch_chain_net_srv_pkt_success_t)) @@ -607,7 +607,7 @@ static int s_callback_client_success(dap_chain_net_srv_t * a_srv, uint32_t a_usa * @param a_receipt_size * @return */ -static int s_callback_client_sign_request(dap_chain_net_srv_t * a_srv, uint32_t a_usage_id, dap_chain_net_srv_client_t * a_srv_client, +static int s_callback_client_sign_request(dap_chain_net_srv_t * a_srv, uint32_t a_usage_id, dap_chain_net_srv_client_remote_t * a_srv_client, dap_chain_datum_tx_receipt_t **a_receipt, size_t a_receipt_size) { dap_chain_datum_tx_receipt_t *l_receipt = *a_receipt; @@ -648,7 +648,7 @@ int dap_chain_net_srv_client_vpn_init(dap_config_t * l_config) { } - if(!dap_chain_net_srv_client_init(l_uid, s_callback_requested, + if(!dap_chain_net_srv_remote_init(l_uid, s_callback_requested, s_callback_response_success, s_callback_response_error, s_callback_receipt_next_success, s_callback_client_success, @@ -909,7 +909,7 @@ void dap_chain_net_srv_vpn_deinit(void) /** * Callback calls after successful request for service */ -static int s_callback_requested(dap_chain_net_srv_t * a_srv, uint32_t a_usage_id, dap_chain_net_srv_client_t * a_srv_client +static int s_callback_requested(dap_chain_net_srv_t * a_srv, uint32_t a_usage_id, dap_chain_net_srv_client_remote_t * a_srv_client , const void * a_custom_data, size_t a_custom_data_size ) { @@ -923,7 +923,7 @@ static int s_callback_requested(dap_chain_net_srv_t * a_srv, uint32_t a_usage_id /** * Called if responses success with all signature checks */ -static int s_callback_response_success(dap_chain_net_srv_t * a_srv, uint32_t a_usage_id, dap_chain_net_srv_client_t * a_srv_client +static int s_callback_response_success(dap_chain_net_srv_t * a_srv, uint32_t a_usage_id, dap_chain_net_srv_client_remote_t * a_srv_client , const void * a_request, size_t a_request_size ) { int l_ret = 0; @@ -974,7 +974,7 @@ static int s_callback_response_success(dap_chain_net_srv_t * a_srv, uint32_t a_u -static int s_callback_receipt_next_success(dap_chain_net_srv_t * a_srv, uint32_t a_usage_id, dap_chain_net_srv_client_t * a_srv_client, +static int s_callback_receipt_next_success(dap_chain_net_srv_t * a_srv, uint32_t a_usage_id, dap_chain_net_srv_client_remote_t * a_srv_client, const void * a_receipt_next, size_t a_receipt_next_size) { dap_chain_net_srv_stream_session_t * l_srv_session = (dap_chain_net_srv_stream_session_t *) a_srv_client->ch->stream->session->_inheritor; @@ -998,7 +998,7 @@ static int s_callback_receipt_next_success(dap_chain_net_srv_t * a_srv, uint32_t /** * If error */ -static int s_callback_response_error(dap_chain_net_srv_t * a_srv, uint32_t a_usage_id, dap_chain_net_srv_client_t * a_srv_client +static int s_callback_response_error(dap_chain_net_srv_t * a_srv, uint32_t a_usage_id, dap_chain_net_srv_client_remote_t * a_srv_client , const void * a_custom_data, size_t a_custom_data_size ) { if (a_custom_data_size != sizeof (dap_stream_ch_chain_net_srv_pkt_error_t)){ diff --git a/modules/service/xchange/dap_chain_net_srv_xchange.c b/modules/service/xchange/dap_chain_net_srv_xchange.c index df904b670..966a13a4d 100644 --- a/modules/service/xchange/dap_chain_net_srv_xchange.c +++ b/modules/service/xchange/dap_chain_net_srv_xchange.c @@ -33,10 +33,10 @@ #define LOG_TAG "dap_chain_net_srv_xchange" static int s_cli_srv_xchange(int a_argc, char **a_argv, char **a_str_reply); -static int s_callback_requested(dap_chain_net_srv_t *a_srv, uint32_t a_usage_id, dap_chain_net_srv_client_t *a_srv_client, const void *a_data, size_t a_data_size); -static int s_callback_response_success(dap_chain_net_srv_t *a_srv, uint32_t a_usage_id, dap_chain_net_srv_client_t *a_srv_client, const void *a_data, size_t a_data_size); -static int s_callback_response_error(dap_chain_net_srv_t *a_srv, uint32_t a_usage_id, dap_chain_net_srv_client_t *a_srv_client, const void *a_data, size_t a_data_size); -static int s_callback_receipt_next_success(dap_chain_net_srv_t *a_srv, uint32_t a_usage_id, dap_chain_net_srv_client_t *a_srv_client, const void *a_data, size_t a_data_size); +static int s_callback_requested(dap_chain_net_srv_t *a_srv, uint32_t a_usage_id, dap_chain_net_srv_client_remote_t *a_srv_client, const void *a_data, size_t a_data_size); +static int s_callback_response_success(dap_chain_net_srv_t *a_srv, uint32_t a_usage_id, dap_chain_net_srv_client_remote_t *a_srv_client, const void *a_data, size_t a_data_size); +static int s_callback_response_error(dap_chain_net_srv_t *a_srv, uint32_t a_usage_id, dap_chain_net_srv_client_remote_t *a_srv_client, const void *a_data, size_t a_data_size); +static int s_callback_receipt_next_success(dap_chain_net_srv_t *a_srv, uint32_t a_usage_id, dap_chain_net_srv_client_remote_t *a_srv_client, const void *a_data, size_t a_data_size); static dap_chain_net_srv_xchange_price_t *s_xchange_db_load(char *a_key, uint8_t *a_item); static dap_chain_net_srv_xchange_t *s_srv_xchange; @@ -917,22 +917,22 @@ static int s_cli_srv_xchange(int a_argc, char **a_argv, char **a_str_reply) return 0; } -static int s_callback_requested(dap_chain_net_srv_t *a_srv, uint32_t a_usage_id, dap_chain_net_srv_client_t *a_srv_client, const void *a_data, size_t a_data_size) +static int s_callback_requested(dap_chain_net_srv_t *a_srv, uint32_t a_usage_id, dap_chain_net_srv_client_remote_t *a_srv_client, const void *a_data, size_t a_data_size) { return 0; } -static int s_callback_response_success(dap_chain_net_srv_t *a_srv, uint32_t a_usage_id, dap_chain_net_srv_client_t *a_srv_client, const void *a_data, size_t a_data_size) +static int s_callback_response_success(dap_chain_net_srv_t *a_srv, uint32_t a_usage_id, dap_chain_net_srv_client_remote_t *a_srv_client, const void *a_data, size_t a_data_size) { return 0; } -static int s_callback_response_error(dap_chain_net_srv_t *a_srv, uint32_t a_usage_id, dap_chain_net_srv_client_t *a_srv_client, const void *a_data, size_t a_data_size) +static int s_callback_response_error(dap_chain_net_srv_t *a_srv, uint32_t a_usage_id, dap_chain_net_srv_client_remote_t *a_srv_client, const void *a_data, size_t a_data_size) { return 0; } -static int s_callback_receipt_next_success(dap_chain_net_srv_t *a_srv, uint32_t a_usage_id, dap_chain_net_srv_client_t *a_srv_client, const void *a_data, size_t a_data_size) +static int s_callback_receipt_next_success(dap_chain_net_srv_t *a_srv, uint32_t a_usage_id, dap_chain_net_srv_client_remote_t *a_srv_client, const void *a_data, size_t a_data_size) { return 0; } diff --git a/modules/type/dag/dap_chain_cs_dag.c b/modules/type/dag/dap_chain_cs_dag.c index ed006f15a..c6646bf60 100644 --- a/modules/type/dag/dap_chain_cs_dag.c +++ b/modules/type/dag/dap_chain_cs_dag.c @@ -263,18 +263,13 @@ int dap_chain_cs_dag_new(dap_chain_t * a_chain, dap_config_t * a_chain_cfg) l_dag->datum_add_hashes_count = dap_config_get_item_uint16_default(a_chain_cfg,"dag","datum_add_hashes_count",1); l_dag->use_event_round_cfg = false; l_dag->callback_cs_set_event_round_cfg = s_dag_chain_cs_set_event_round_cfg; - // l_dag->gdb_group_events_round_new = dap_strdup( dap_config_get_item_str_default(a_chain_cfg,"dag","gdb_group_events_round_new", - // "events.round.new")); - char * l_round_new_str = dap_strdup( dap_config_get_item_str_default(a_chain_cfg,"dag","gdb_group_events_round_new", "new")); dap_chain_net_t *l_net = dap_chain_net_by_id(a_chain->net_id); if(!l_dag->is_celled){ - //char * gdb_group = dap_strdup_printf( "%016llx-%016llx-round", l_net->pub.id.uint64, a_chain->id.uint64); char * gdb_group = dap_strdup_printf( "%s-%s-round", l_net->pub.name, a_chain->name); l_dag->gdb_group_events_round_new = dap_strdup_printf( "%s.%s", gdb_group, l_round_new_str); dap_chain_global_db_add_sync_group(gdb_group, s_history_callback_round_notify, l_dag); - }else { - //char * gdb_group = dap_strdup_printf( "%016llx-%016llx-%016llx-round", l_net->pub.id.uint64, a_chain->id.uint64, l_net->pub.cell_id.uint64); + } else { char * gdb_group = dap_strdup_printf( "%s-%s-%016llx-round", l_net->pub.name, a_chain->name, l_net->pub.cell_id.uint64); l_dag->gdb_group_events_round_new = dap_strdup_printf( "%s.%s", gdb_group, l_round_new_str); dap_chain_global_db_add_sync_group(gdb_group, s_history_callback_round_notify, l_dag); -- GitLab From 1e57da72a05edd24e2d4aa93dfe40ad3ee5dffd4 Mon Sep 17 00:00:00 2001 From: Roman Khlopkov <roman.khlopkov@demlabs.net> Date: Wed, 9 Feb 2022 14:47:59 +0300 Subject: [PATCH 4/4] [*] Net & order notificators --- modules/net/dap_chain_net.c | 25 +++---- modules/net/dap_chain_node.c | 1 + modules/net/include/dap_chain_net.h | 2 +- modules/net/srv/dap_chain_net_srv_client.c | 17 ++--- modules/net/srv/dap_chain_net_srv_order.c | 74 +++++++++++++------ .../net/srv/include/dap_chain_net_srv_order.h | 2 +- 6 files changed, 73 insertions(+), 48 deletions(-) diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index 11a0edaef..0c3fe21b8 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -186,6 +186,7 @@ typedef struct dap_chain_net_pvt{ // General rwlock for structure pthread_rwlock_t rwlock; + dap_list_t *notify_callbacks; } dap_chain_net_pvt_t; typedef struct dap_chain_net_item{ @@ -264,9 +265,6 @@ static bool s_seed_mode = false; static uint8_t *dap_chain_net_set_acl(dap_chain_hash_fast_t *a_pkey_hash); uint8_t *dap_chain_net_set_acl_param(dap_chain_hash_fast_t *a_pkey_hash); - -static dap_global_db_obj_callback_notify_t s_srv_callback_notify = NULL; - /** * @brief * init network settings from cellrame-node.cfg file @@ -390,9 +388,9 @@ dap_chain_net_state_t dap_chain_net_get_target_state(dap_chain_net_t *a_net) * * @param a_callback dap_global_db_obj_callback_notify_t callback function */ -void dap_chain_net_set_srv_callback_notify(dap_global_db_obj_callback_notify_t a_callback) +void dap_chain_net_add_notify_callback(dap_chain_net_t *a_net, dap_global_db_obj_callback_notify_t a_callback) { - s_srv_callback_notify = a_callback; + PVT(a_net)->notify_callbacks = dap_list_append(PVT(a_net)->notify_callbacks, a_callback); } /** @@ -460,16 +458,17 @@ void dap_chain_net_sync_gdb_broadcast(void *a_arg, const char a_op_code, const c * @param a_value buffer with data * @param a_value_len buffer size */ -static void s_gbd_history_callback_notify (void * a_arg, const char a_op_code, const char * a_group, - const char * a_key, const void * a_value, const size_t a_value_len) +static void s_gbd_history_callback_notify(void *a_arg, const char a_op_code, const char *a_group, + const char *a_key, const void *a_value, const size_t a_value_len) { if (!a_arg) { return; } - dap_chain_node_mempool_autoproc_notify(a_arg, a_op_code, a_group, a_key, a_value, a_value_len); - dap_chain_net_sync_gdb_broadcast(a_arg, a_op_code, a_group, a_key, a_value, a_value_len); - if (s_srv_callback_notify) { - s_srv_callback_notify(a_arg, a_op_code, a_group, a_key, a_value, a_value_len); + dap_chain_net_t *l_net = (dap_chain_net_t *)a_arg; + for (dap_list_t *it = PVT(l_net)->notify_callbacks; it; it = PVT(l_net)->notify_callbacks->next) { + dap_global_db_obj_callback_notify_t l_callback = (dap_global_db_obj_callback_notify_t)it->data; + if (l_callback) + l_callback(a_arg, a_op_code, a_group, a_key, a_value, a_value_len); } } @@ -1953,7 +1952,7 @@ int s_net_load(const char * a_net_name, uint16_t a_acl_idx) if (l_gdb_sync_groups && l_gdb_sync_groups_count > 0) { for(uint16_t i = 0; i < l_gdb_sync_groups_count; i++) { // add group to special sync - dap_chain_global_db_add_sync_extra_group(l_gdb_sync_groups[i], s_gbd_history_callback_notify, l_net); + dap_chain_global_db_add_sync_extra_group(l_gdb_sync_groups[i], dap_chain_net_sync_gdb_broadcast, l_net); } } @@ -2384,7 +2383,7 @@ int s_net_load(const char * a_net_name, uint16_t a_acl_idx) } l_net_pvt->load_mode = false; dap_chain_ledger_load_end(l_net->pub.ledger); - + dap_chain_net_add_notify_callback(l_net, dap_chain_net_sync_gdb_broadcast); if (l_target_state != l_net_pvt->state_target) dap_chain_net_state_go_to(l_net, l_target_state); diff --git a/modules/net/dap_chain_node.c b/modules/net/dap_chain_node.c index 07532845a..0d7f37ff9 100644 --- a/modules/net/dap_chain_node.c +++ b/modules/net/dap_chain_node.c @@ -292,6 +292,7 @@ bool dap_chain_node_mempool_autoproc_init() } DAP_DELETE(l_gdb_group_mempool); } + dap_chain_net_add_notify_callback(l_net_list[i], dap_chain_node_mempool_autoproc_notify); } } DAP_DELETE(l_net_list); diff --git a/modules/net/include/dap_chain_net.h b/modules/net/include/dap_chain_net.h index 4d65fc3ea..74312794b 100644 --- a/modules/net/include/dap_chain_net.h +++ b/modules/net/include/dap_chain_net.h @@ -177,7 +177,7 @@ bool dap_chain_net_get_add_gdb_group(dap_chain_net_t *a_net, dap_chain_node_addr int dap_chain_net_verify_datum_for_add(dap_chain_net_t *a_net, dap_chain_datum_t * a_datum ); void dap_chain_net_dump_datum(dap_string_t * a_str_out, dap_chain_datum_t * a_datum, const char *a_hash_out_type); -void dap_chain_net_set_srv_callback_notify(dap_global_db_obj_callback_notify_t a_callback); +void dap_chain_net_add_notify_callback(dap_chain_net_t *a_net, dap_global_db_obj_callback_notify_t a_callback); void dap_chain_net_sync_gdb_broadcast(void *a_arg, const char a_op_code, const char *a_group, const char *a_key, const void *a_value, const size_t a_value_len); diff --git a/modules/net/srv/dap_chain_net_srv_client.c b/modules/net/srv/dap_chain_net_srv_client.c index 0a957e887..60b817c55 100644 --- a/modules/net/srv/dap_chain_net_srv_client.c +++ b/modules/net/srv/dap_chain_net_srv_client.c @@ -35,12 +35,6 @@ static void s_srv_client_callback_connected(dap_chain_node_client_t *a_node_clie static void s_srv_client_callback_disconnected(dap_chain_node_client_t *a_node_client, void *a_arg); static void s_srv_client_callback_deleted(dap_chain_node_client_t *a_node_client, void *a_arg); -static dap_chain_node_client_callbacks_t s_callbacks = { - .connected = s_srv_client_callback_connected, - .disconnected = s_srv_client_callback_disconnected, - .delete = s_srv_client_callback_deleted -}; - dap_chain_net_srv_client_t *dap_chain_net_srv_client_create_n_connect(dap_chain_net_t *a_net, char *a_addr, uint16_t a_port, dap_chain_net_srv_client_callbacks_t *a_callbacks, void *a_callbacks_arg) @@ -49,14 +43,19 @@ dap_chain_net_srv_client_t *dap_chain_net_srv_client_create_n_connect(dap_chain_ if (a_callbacks) memcpy(&l_ret->callbacks, a_callbacks, sizeof(*a_callbacks)); l_ret->callbacks_arg = a_callbacks_arg; + dap_chain_node_client_callbacks_t l_callbacks = { + .connected = s_srv_client_callback_connected, + .disconnected = s_srv_client_callback_disconnected, + .delete = s_srv_client_callback_deleted + }; if (a_callbacks->pkt_in) - s_callbacks.srv_pkt_in = a_callbacks->pkt_in; + l_callbacks.srv_pkt_in = a_callbacks->pkt_in; else - s_callbacks.srv_pkt_in = (dap_stream_ch_callback_packet_t)s_srv_client_pkt_in; + l_callbacks.srv_pkt_in = (dap_stream_ch_callback_packet_t)s_srv_client_pkt_in; dap_chain_node_info_t *l_info = DAP_NEW_Z(dap_chain_node_info_t); inet_pton(AF_INET, a_addr, &l_info->hdr.ext_addr_v4); l_info->hdr.ext_port = a_port; - l_ret->node_client = dap_chain_node_client_create_n_connect(a_net, l_info, "R", &s_callbacks, l_ret); + l_ret->node_client = dap_chain_node_client_create_n_connect(a_net, l_info, "R", &l_callbacks, l_ret); return l_ret; } diff --git a/modules/net/srv/dap_chain_net_srv_order.c b/modules/net/srv/dap_chain_net_srv_order.c index 21dcd4d34..01197f6b7 100644 --- a/modules/net/srv/dap_chain_net_srv_order.c +++ b/modules/net/srv/dap_chain_net_srv_order.c @@ -56,6 +56,11 @@ char *s_server_continents[]={ "Antarctica" }; +struct dap_order_notify { + dap_chain_net_t *net; + dap_global_db_obj_callback_notify_t callback; +}; +static dap_list_t *s_order_notify_callbacks = NULL; static void s_srv_order_callback_notify(void *a_arg, const char a_op_code, const char *a_group, const char *a_key, const void *a_value, const size_t a_value_len); @@ -65,7 +70,11 @@ static void s_srv_order_callback_notify(void *a_arg, const char a_op_code, const */ int dap_chain_net_srv_order_init(void) { - dap_chain_net_set_srv_callback_notify(s_srv_order_callback_notify); + uint16_t l_net_count; + dap_chain_net_t **l_net_list = dap_chain_net_list(&l_net_count); + for (uint16_t i = 0; i < l_net_count; i++) { + dap_chain_net_add_notify_callback(l_net_list[i], s_srv_order_callback_notify); + } //geoip_info_t *l_ipinfo = chain_net_geoip_get_ip_info("8.8.8.8"); return 0; } @@ -75,7 +84,7 @@ int dap_chain_net_srv_order_init(void) */ void dap_chain_net_srv_order_deinit() { - + dap_list_free_full(s_order_notify_callbacks, free); } size_t dap_chain_net_srv_order_get_size(dap_chain_net_srv_order_t *a_order) @@ -521,37 +530,54 @@ static void s_srv_order_callback_notify(void *a_arg, const char a_op_code, const const char *a_key, const void *a_value, const size_t a_value_len) { UNUSED(a_value_len); - if (!a_arg || !a_value || a_op_code != 'a' || !dap_config_get_item_bool_default(g_config, "srv", "order_signed_only", false)) { + if (!a_arg || !a_key) { return; } dap_chain_net_t *l_net = (dap_chain_net_t *)a_arg; char *l_gdb_group_str = dap_chain_net_srv_order_get_gdb_group(l_net); if (!strcmp(a_group, l_gdb_group_str)) { - dap_chain_net_srv_order_t *l_order = (dap_chain_net_srv_order_t *)a_value; - if (l_order->version != 2) { - dap_chain_global_db_gr_del(dap_strdup(a_key), a_group); - } else { - dap_sign_t *l_sign = (dap_sign_t *)&l_order->ext[l_order->ext_size]; - if (!dap_sign_verify_size(l_sign, a_value_len) || - dap_sign_verify(l_sign, l_order, sizeof(dap_chain_net_srv_order_t) + l_order->ext_size) != 1) { - dap_chain_global_db_gr_del(dap_strdup(a_key), a_group); - DAP_DELETE(l_gdb_group_str); - return; + for (dap_list_t *it = s_order_notify_callbacks; it; it = s_order_notify_callbacks->next) { + struct dap_order_notify *l_notifier = (struct dap_order_notify *)it->data; + if ((l_notifier->net == NULL || l_notifier->net == l_net) && + l_notifier->callback) { + l_notifier->callback(a_arg, a_op_code, a_group, a_key, a_value, a_value_len); } - /*dap_chain_hash_fast_t l_pkey_hash; - if (!dap_sign_get_pkey_hash(l_sign, &l_pkey_hash)) { + } + if (a_value && a_op_code != 'a' && dap_config_get_item_bool_default(g_config, "srv", "order_signed_only", false)) { + dap_chain_net_srv_order_t *l_order = (dap_chain_net_srv_order_t *)a_value; + if (l_order->version != 2) { dap_chain_global_db_gr_del(dap_strdup(a_key), a_group); - DAP_DELETE(l_gdb_group_str); - return; + } else { + dap_sign_t *l_sign = (dap_sign_t *)&l_order->ext[l_order->ext_size]; + if (!dap_sign_verify_size(l_sign, a_value_len) || + dap_sign_verify(l_sign, l_order, sizeof(dap_chain_net_srv_order_t) + l_order->ext_size) != 1) { + dap_chain_global_db_gr_del(dap_strdup(a_key), a_group); + DAP_DELETE(l_gdb_group_str); + return; + } + /*dap_chain_hash_fast_t l_pkey_hash; + if (!dap_sign_get_pkey_hash(l_sign, &l_pkey_hash)) { + dap_chain_global_db_gr_del(dap_strdup(a_key), a_group); + DAP_DELETE(l_gdb_group_str); + return; + } + dap_chain_addr_t l_addr = {}; + dap_chain_addr_fill(&l_addr, l_sign->header.type, &l_pkey_hash, l_net->pub.id); + uint128_t l_balance = dap_chain_ledger_calc_balance(l_net->pub.ledger, &l_addr, l_order->price_ticker); + uint64_t l_solvency = dap_chain_uint128_to(l_balance); + if (l_solvency < l_order->price && !dap_chain_net_srv_stake_key_delegated(&l_addr)) { + dap_chain_global_db_gr_del(dap_strdup(a_key), a_group); + }*/ } - dap_chain_addr_t l_addr = {}; - dap_chain_addr_fill(&l_addr, l_sign->header.type, &l_pkey_hash, l_net->pub.id); - uint128_t l_balance = dap_chain_ledger_calc_balance(l_net->pub.ledger, &l_addr, l_order->price_ticker); - uint64_t l_solvency = dap_chain_uint128_to(l_balance); - if (l_solvency < l_order->price && !dap_chain_net_srv_stake_key_delegated(&l_addr)) { - dap_chain_global_db_gr_del(dap_strdup(a_key), a_group); - }*/ } DAP_DELETE(l_gdb_group_str); } } + +void dap_chain_net_srv_order_add_notify_callback(dap_chain_net_t *a_net, dap_global_db_obj_callback_notify_t a_callback) +{ + struct dap_order_notify *l_notifier = DAP_NEW(struct dap_order_notify); + l_notifier->net = a_net; + l_notifier->callback = a_callback; + s_order_notify_callbacks = dap_list_append(s_order_notify_callbacks, l_notifier); +} diff --git a/modules/net/srv/include/dap_chain_net_srv_order.h b/modules/net/srv/include/dap_chain_net_srv_order.h index c5bbdadf8..ad33542ff 100644 --- a/modules/net/srv/include/dap_chain_net_srv_order.h +++ b/modules/net/srv/include/dap_chain_net_srv_order.h @@ -110,7 +110,7 @@ char *dap_chain_net_srv_order_create(dap_chain_net_t * a_net, int dap_chain_net_srv_order_save(dap_chain_net_t * a_net, dap_chain_net_srv_order_t *a_order); void dap_chain_net_srv_order_dump_to_string(dap_chain_net_srv_order_t *a_order,dap_string_t * a_str_out, const char *a_hash_out_type); - +void dap_chain_net_srv_order_add_notify_callback(dap_chain_net_t *a_net, dap_global_db_obj_callback_notify_t a_callback); /** * @brief dap_chain_net_srv_order_get_gdb_group_mempool * @param l_chain -- GitLab