diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c index 55bb4951346d434d13abfbe38c37dfb8c0021a23..33e6e1c55968ca0e6680a6fe3068f2590fcf8186 100644 --- a/modules/channel/chain/dap_stream_ch_chain.c +++ b/modules/channel/chain/dap_stream_ch_chain.c @@ -492,7 +492,9 @@ static bool s_sync_update_gdb_proc_callback(dap_proc_thread_t *a_thread, void *a DAP_DELETE(l_sync_request); return true; } - dap_chain_net_add_downlink(l_net, l_ch->stream_worker, l_ch->uuid, l_ch->stream->esocket_uuid); + dap_chain_net_add_downlink(l_net, l_ch->stream_worker, l_ch->uuid, l_ch->stream->esocket_uuid, + /*l_ch->stream->esocket->remote_addr_str*/ l_ch->stream->esocket->hostaddr, + dap_config_get_item_int32(g_config, "server", "listen_port_tcp")); dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); int l_flags = 0; if (dap_chain_net_get_extra_gdb_group(l_net, l_sync_request->request.node_addr)) diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index 65c174d27981fd28aeaf1b5820fb9c1523fd5808..f953240db246dcffd8f2bcaa649881e7440d57ff 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -150,6 +150,8 @@ struct downlink { dap_stream_worker_t *worker; dap_stream_ch_uuid_t ch_uuid; dap_events_socket_uuid_t esocket_uuid; + char *addr; + int port; UT_hash_handle hh; }; @@ -281,8 +283,6 @@ static void s_update_links_timer_callback(void *a_arg){ dap_chain_net_t *l_net = (dap_chain_net_t*)a_arg; //Updated links size_t l_count_downlinks = 0,l_blocks_events = 0; - dap_stream_connection_t ** l_downlinks = dap_stream_connections_get_downlinks(&l_count_downlinks); - DAP_DEL_Z(l_downlinks); dap_chain_node_addr_t *l_current_addr = dap_chain_net_get_cur_addr(l_net); dap_chain_node_info_t *l_node_info = dap_chain_node_info_read(l_net, l_current_addr); if (!l_node_info) @@ -456,7 +456,9 @@ void dap_chain_net_add_gdb_notify_callback(dap_chain_net_t *a_net, dap_store_obj PVT(a_net)->gdb_notifiers = dap_list_append(PVT(a_net)->gdb_notifiers, l_notifier); } -int dap_chain_net_add_downlink(dap_chain_net_t *a_net, dap_stream_worker_t *a_worker, dap_stream_ch_uuid_t a_ch_uuid, dap_events_socket_uuid_t a_esocket_uuid) +int dap_chain_net_add_downlink(dap_chain_net_t *a_net, dap_stream_worker_t *a_worker, + dap_stream_ch_uuid_t a_ch_uuid, dap_events_socket_uuid_t a_esocket_uuid, + char *a_addr, int a_port) { if (!a_net || !a_worker) return -1; @@ -476,9 +478,13 @@ int dap_chain_net_add_downlink(dap_chain_net_t *a_net, dap_stream_worker_t *a_wo pthread_rwlock_unlock(&l_net_pvt->downlinks_lock); return -1; } - l_downlink->worker = a_worker; - l_downlink->ch_uuid = a_ch_uuid; - l_downlink->esocket_uuid = a_esocket_uuid; + *l_downlink = (struct downlink) { + .worker = a_worker, + .ch_uuid = a_ch_uuid, + .esocket_uuid = a_esocket_uuid, + .addr = a_addr, + .port = a_port + }; HASH_ADD_BYHASHVALUE(hh, l_net_pvt->downlinks, ch_uuid, sizeof(a_ch_uuid), a_hash_value, l_downlink); pthread_rwlock_unlock(&l_net_pvt->downlinks_lock); return 0; @@ -722,7 +728,7 @@ static int s_net_link_add(dap_chain_net_t *a_net, dap_chain_node_info_t *a_link_ static void s_net_link_remove(dap_chain_net_pvt_t *a_net_pvt, dap_chain_node_client_t *a_link, bool a_rebase) { - struct net_link *l_link, *l_link_tmp, *l_link_found = NULL; + struct net_link *l_link = NULL, *l_link_tmp = NULL, *l_link_found = NULL; HASH_ITER(hh, a_net_pvt->net_links, l_link, l_link_tmp) { if (l_link->link == a_link) { l_link_found = l_link; @@ -3700,3 +3706,39 @@ bool dap_chain_net_get_load_mode(dap_chain_net_t * a_net) { return PVT(a_net)->load_mode; } + +char *dap_chain_net_links_dump(dap_chain_net_t *a_net) { + dap_chain_net_pvt_t *l_net_pvt = PVT(a_net); + pthread_mutex_lock(&l_net_pvt->uplinks_mutex); + dap_string_t *l_str_uplinks = dap_string_new("---------------------------\n" + "| ↑\\↓ |\t#\t|\t\tIP\t\t|\tPort\t|\n"); + struct net_link *l_link, *l_link_tmp = NULL; + size_t l_up_count = 0; + HASH_ITER(hh, l_net_pvt->net_links, l_link, l_link_tmp) { + dap_string_append_printf(l_str_uplinks, "| ↑ |\t%zu\t|\t%s\t\t|\t%u\t|\n", + ++l_up_count, + inet_ntoa(l_link->link_info->hdr.ext_addr_v4), + l_link->link_info->hdr.ext_port); + } + + size_t l_down_count = 0; + dap_string_t *l_str_downlinks = dap_string_new("---------------------------\n" + "| ↑\\↓ |\t#\t|\t\tIP\t\t|\tPort\t|\n"); + pthread_mutex_unlock(&l_net_pvt->uplinks_mutex); + pthread_rwlock_rdlock(&l_net_pvt->downlinks_lock); + struct downlink *l_downlink = NULL, *l_downtmp = NULL; + HASH_ITER(hh, l_net_pvt->downlinks, l_downlink, l_downtmp) { + if (l_downlink->addr[0]) { + dap_string_append_printf(l_str_downlinks, "| ↓ |\t%zu\t|\t%s\t\t|\t%u\t|\n", + ++l_down_count, + l_downlink->addr, l_downlink->port); + } + } + pthread_rwlock_unlock(&l_net_pvt->downlinks_lock); + char *l_res_str = dap_strdup_printf("Count links: %zu\n\nUplinks: %zu\n%s\n\nDownlinks: %zu\n%s\n", + l_up_count + l_down_count, l_up_count, l_str_uplinks->str, + l_down_count, l_str_downlinks->str); + dap_string_free(l_str_uplinks, true); + dap_string_free(l_str_downlinks, true); + return l_res_str; +} diff --git a/modules/net/dap_chain_node_cli_cmd.c b/modules/net/dap_chain_node_cli_cmd.c index 82162dda646c8104782731e6c7a1d4fc410ca579..7e558f4a3909eba9372c911e794a5216bc66bb16 100644 --- a/modules/net/dap_chain_node_cli_cmd.c +++ b/modules/net/dap_chain_node_cli_cmd.c @@ -1026,6 +1026,7 @@ int com_global_db(int a_argc, char ** a_argv, char **a_str_reply) } dap_cli_server_cmd_set_reply_text(a_str_reply, "Group %s, key %s, data:\n %s", l_group_str, l_key_str, (char*)l_value_out); + DAP_DELETE(l_value_out); return 0; } case CMD_DELETE: @@ -1160,11 +1161,6 @@ int com_node(int a_argc, char ** a_argv, char **a_str_reply) } else if (dap_cli_server_cmd_find_option_val(a_argv, arg_index, MIN(a_argc, arg_index + 1), "connections", NULL)) { cmd_num = CMD_CONNECTIONS; -// char *l_str = NULL; -// dap_stream_connections_print(&l_str); -// dap_cli_server_cmd_set_reply_text(a_str_reply, "%s", l_str); -// DAP_DELETE(l_str); -// return 0; } else if (dap_cli_server_cmd_find_option_val(a_argv, arg_index, MIN(a_argc, arg_index + 1), "balancer", NULL)){ cmd_num = CMD_BALANCER; @@ -1575,41 +1571,9 @@ int com_node(int a_argc, char ** a_argv, char **a_str_reply) dap_cli_server_cmd_set_reply_text(a_str_reply, "Connection established"); } case CMD_CONNECTIONS: { - size_t l_uplink_count = 0; - size_t l_downlink_count = 0; - dap_stream_connection_t **l_uplinks = dap_stream_connections_get_uplinks(&l_uplink_count); - dap_stream_connection_t **l_downlinks = dap_stream_connections_get_downlinks(&l_downlink_count); - dap_string_t *l_str_uplinks = dap_string_new("---------------------------\n" - "| ↑\\↓ |\t#\t|\t\tIP\t\t|\tPort\t|\n"); - size_t l_broken_uplinks = 0; - for (size_t i = 0; i < l_uplink_count; i++) { - if (!l_uplinks[i]->stream || !l_uplinks[i]->stream->esocket) { - l_broken_uplinks++; - continue; - } - char *l_address = l_uplinks[i]->stream->esocket->remote_addr_str; - short l_port = l_uplinks[i]->stream->esocket->remote_port; - - dap_string_append_printf(l_str_uplinks, "| ↑ |\t%zu\t|\t%s\t\t|\t%u\t|\n", - i, l_address, l_port); - } - l_uplink_count -= l_broken_uplinks; - dap_string_t *l_str_downlinks = dap_string_new("---------------------------\n" - "| ↑\\↓ |\t#\t|\t\tIP\t\t|\tPort\t|\n"); - for (size_t i=0; i < l_downlink_count; i++) { - char *l_address = l_downlinks[i]->address; - - dap_string_append_printf(l_str_downlinks, "| ↓ |\t%zu\t|\t%s\t\t|\t%u\t|\n", - i, l_address, l_downlinks[i]->port); - } - dap_cli_server_cmd_set_reply_text(a_str_reply, "Count links: %zu\n\nUplinks: %zu\n%s\n\nDownlinks: %zu\n%s\n", - l_uplink_count + l_downlink_count, l_uplink_count, l_str_uplinks->str, - l_downlink_count, l_str_downlinks->str); - dap_string_free(l_str_uplinks, false); - dap_string_free(l_str_downlinks, false); - DAP_DELETE(l_downlinks); - DAP_DELETE(l_uplinks); - return 0; + char *l_reply = dap_chain_net_links_dump(l_net); + dap_cli_server_cmd_set_reply_text(a_str_reply, "%s", l_reply); + DAP_DELETE(l_reply); } break; case CMD_BALANCER: { @@ -1628,6 +1592,7 @@ int com_node(int a_argc, char ** a_argv, char **a_str_reply) } dap_cli_server_cmd_set_reply_text(a_str_reply, "Balancer link list:\n %s \n", l_string_balanc->str); + dap_string_free(l_string_balanc, true); } break; } diff --git a/modules/net/include/dap_chain_net.h b/modules/net/include/dap_chain_net.h index ffd4ea5ff1782ba248747c1aa3e02002b418d319..dc8fc580267ceb8bc76e466c2c99cc0a5ff8266a 100644 --- a/modules/net/include/dap_chain_net.h +++ b/modules/net/include/dap_chain_net.h @@ -205,7 +205,7 @@ bool dap_chain_net_get_extra_gdb_group(dap_chain_net_t *a_net, dap_chain_node_ad int dap_chain_net_verify_datum_for_add(dap_chain_t *a_chain, dap_chain_datum_t *a_datum, dap_hash_fast_t *a_datum_hash); char *dap_chain_net_verify_datum_err_code_to_str(dap_chain_datum_t *a_datum, int a_code); -int dap_chain_net_add_downlink(dap_chain_net_t *a_net, dap_stream_worker_t *a_worker, dap_stream_ch_uuid_t a_ch_uuid, dap_events_socket_uuid_t a_esocket_uuid); +int dap_chain_net_add_downlink(dap_chain_net_t *a_net, dap_stream_worker_t *a_worker, dap_stream_ch_uuid_t a_ch_uuid, dap_events_socket_uuid_t a_esocket_uuid, char *a_addr, int a_port); void dap_chain_net_add_gdb_notify_callback(dap_chain_net_t *a_net, dap_store_obj_callback_notify_t a_callback, void *a_cb_arg); void dap_chain_net_sync_gdb_broadcast(dap_global_db_context_t *a_context, dap_store_obj_t *a_obj, void *a_arg); @@ -222,3 +222,5 @@ dap_list_t *dap_chain_datum_list(dap_chain_net_t *a_net, dap_chain_t *a_chain, d int dap_chain_datum_add(dap_chain_t * a_chain, dap_chain_datum_t *a_datum, size_t a_datum_size, dap_hash_fast_t *a_datum_hash); bool dap_chain_net_get_load_mode(dap_chain_net_t * a_net); + +char *dap_chain_net_links_dump(dap_chain_net_t*);