diff --git a/dap_chain_node_cli_cmd.c b/dap_chain_node_cli_cmd.c index 0c332772e750203eaa7f4fa3b5b4f4ba4af16d4b..dcd7122fbc738eac130d40357c297618f051050c 100644 --- a/dap_chain_node_cli_cmd.c +++ b/dap_chain_node_cli_cmd.c @@ -913,7 +913,7 @@ int com_node(int a_argc, char ** a_argv, char **a_str_reply) return -1; } } - // is auto mode + // for auto mode int l_is_auto = 0; // list of dap_chain_node_addr_t struct unsigned int l_nodes_count = 0; @@ -937,55 +937,6 @@ int com_node(int a_argc, char ** a_argv, char **a_str_reply) else l_node_list = dap_list_concat(l_node_link_list, l_node_list); - /* - // select the nearest node from the list - unsigned int l_nodes_count = dap_list_length(l_node_list); - unsigned int l_thread_id = 0; - pthread_t *l_threads = DAP_NEW_Z_SIZE(pthread_t, sizeof(pthread_t) * l_nodes_count); - uint64_t *l_nodes_addr = DAP_NEW_Z_SIZE(uint64_t, sizeof(uint64_t) * l_nodes_count); - dap_list_t *l_node_list0 = l_node_list; - // send ping to all nodes - while(l_node_list) { - dap_chain_node_addr_t *l_node_addr = l_node_list->data; - dap_chain_node_info_t *l_node_info = node_info_read_and_reply(l_net, l_node_addr, NULL); - - // start sending ping - start_node_ping(&l_threads[l_thread_id], l_node_info->hdr.ext_addr_v4, l_node_info->hdr.ext_port, 1); - - l_nodes_addr[l_thread_id] = l_node_info->hdr.address.uint64; - l_thread_id++; - DAP_DELETE(l_node_info); - l_node_list = dap_list_next(l_node_list); - } - // wait for reply from nodes - int best_node_pos = -1; - int best_node_reply = INT32_MAX; - // timeout for all threads - int l_timeout_full_ms = 3000;// wait max 3 second - for(l_thread_id = 0; l_thread_id < l_nodes_count; l_thread_id++) { - if(l_timeout_full_ms<100) - l_timeout_full_ms = 100;// make small timeout anyway, may be - struct timespec l_time_start; - clock_gettime(CLOCK_MONOTONIC, &l_time_start); - int res = wait_node_ping(l_threads[l_thread_id], l_timeout_full_ms); - if(res > 0 && res < best_node_reply) { - best_node_pos = l_thread_id; - best_node_reply = res; - } - struct timespec l_time_stop; - clock_gettime(CLOCK_MONOTONIC, &l_time_stop); - l_timeout_full_ms -= timespec_diff(&l_time_start, &l_time_stop, NULL); - //printf(" thread %x ping=%d\n", l_threads[l_thread_id], res); - } - if(best_node_pos > 0) { - l_node_addr.uint64 = l_nodes_addr[best_node_pos]; - } - - DAP_DELETE(l_nodes_addr); - DAP_DELETE(l_threads); - dap_list_free_full(l_node_list0, free); - */ - // select random node from the list l_nodes_count = dap_list_length(l_node_list); if(l_nodes_count > 0) { @@ -1016,7 +967,7 @@ int com_node(int a_argc, char ** a_argv, char **a_str_reply) return -1; } // wait connected - int timeout_ms = 5000; //5 sec = 5000 ms + int timeout_ms = 100; //5 sec = 5000 ms res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_CONNECTED, timeout_ms); // select new node addr if(l_is_auto && res){ @@ -1032,16 +983,25 @@ int com_node(int a_argc, char ** a_argv, char **a_str_reply) // clean client struct dap_chain_node_client_close(l_node_client); DAP_DELETE(l_remote_node_info); + //return -1; continue; } } break; } while(1); - dap_list_free_full(l_node_list, free); + // for auto mode only + if(l_is_auto) { + //start background thread for testing connect to the nodes + dap_chain_node_ping_background_start(l_net, l_node_list); + dap_list_free_full(l_node_list, free); + } + + if(res) { - dap_chain_node_cli_set_reply_text(a_str_reply, "no response from node: code %d", res); + dap_chain_node_cli_set_reply_text(a_str_reply, "no response from remote node(s)"); + log_it(L_WARNING, "No response from remote node(s): err code %d", res); // clean client struct //dap_chain_node_client_close(l_node_client); //DAP_DELETE(l_remote_node_info); diff --git a/dap_chain_node_client.c b/dap_chain_node_client.c index bce0989d71e738eb7d3953a1b5d1953ee0a3a0e3..3ddaddcba9b8139172f25d9647fc13f1392d76fd 100644 --- a/dap_chain_node_client.c +++ b/dap_chain_node_client.c @@ -127,6 +127,8 @@ static void s_stage_status_callback(dap_client_t *a_client, void *a_arg) static void s_stage_status_error_callback(dap_client_t *a_client, void *a_arg) { dap_chain_node_client_t *l_node_client = DAP_CHAIN_NODE_CLIENT(a_client); + if(!l_node_client) + return; // check for last attempt bool l_is_last_attempt = a_arg ? true : false; if(l_is_last_attempt){ @@ -471,6 +473,8 @@ dap_chain_node_client_t* dap_chain_client_connect(dap_chain_node_info_t *a_node_ // dap_client_stage_t l_stage_target = STAGE_STREAM_STREAMING; l_node_client->state = NODE_CLIENT_STATE_CONNECT; + // ref pvt client + //dap_client_pvt_ref(DAP_CLIENT_PVT(l_node_client->client)); // Handshake & connect dap_client_go_stage(l_node_client->client, a_stage_target, s_stage_connected_callback); return l_node_client; diff --git a/dap_chain_node_client.h b/dap_chain_node_client.h index 21678c23bf4ffb5d4b7a08f1dd004e5adda35dfc..4a5eb2e307768a133a46050a280d871b4bff22e2 100644 --- a/dap_chain_node_client.h +++ b/dap_chain_node_client.h @@ -75,7 +75,7 @@ typedef struct dap_chain_node_client { bool keep_connection; } dap_chain_node_client_t; -#define DAP_CHAIN_NODE_CLIENT(a) ( (dap_chain_node_client_t *) (a)->_inheritor ) +#define DAP_CHAIN_NODE_CLIENT(a) (a ? (dap_chain_node_client_t *) (a)->_inheritor : NULL) int dap_chain_node_client_init(void); diff --git a/dap_chain_node_ping.c b/dap_chain_node_ping.c index cd7d84492b218edd144b85c73bfcee133d08bd54..c43ba51a4bb2543658e8dd374b8ef04b73d8ed02 100644 --- a/dap_chain_node_ping.c +++ b/dap_chain_node_ping.c @@ -24,10 +24,14 @@ //#include <sys/socket.h> #include <time.h> #include <errno.h> +#include <pthread.h> #include "dap_common.h" #include "dap_client.h" #include "dap_strfuncs.h" +#include "dap_list.h" +#include "dap_chain_common.h" +#include "dap_chain_node.h" #include "dap_chain_node_ping.h" /* @@ -54,9 +58,10 @@ #else #include <signal.h> #endif - #include <pthread.h> +#include "iputils/iputils.h" + #include "dap_common.h" //#include "dap_client.h" #include "dap_strfuncs.h" @@ -71,7 +76,7 @@ static void* node_ping_proc(void *a_arg) int l_port = 0; int l_count; if(!a_arg) - return NULL; + return NULL ; memcpy(&l_count, a_arg, sizeof(int)); memcpy(&l_port, (a_arg + sizeof(int)), sizeof(int)); memcpy(&l_addr, (a_arg + 2 * sizeof(int)), sizeof(struct in_addr)); @@ -81,7 +86,7 @@ static void* node_ping_proc(void *a_arg) struct sockaddr_in sa4 = { .sin_family = AF_INET, .sin_addr = l_addr }; const char* str_ip4 = inet_ntop(AF_INET, &(((struct sockaddr_in *) &sa4)->sin_addr), host4, INET_ADDRSTRLEN); if(!str_ip4) - return NULL; + return NULL ; //printf(" %s %d ping start\n", str_ip4, l_count); /* // send ping @@ -115,7 +120,7 @@ static void* node_ping_proc(void *a_arg) uint8_t l_buf[l_buf_size]; const char* str_ip4 = inet_ntop(AF_INET, &(((struct sockaddr_in *) &sa4)->sin_addr), host4, - INET_ADDRSTRLEN); + INET_ADDRSTRLEN); char *l_str_to_send = dap_strdup_printf("GET /%s/ping_sub_url HTTP/1.1\r\nHost: %s\r\n\r\n", DAP_UPLINK_PATH_ENC_INIT, str_ip4); // send data to bad suburl @@ -131,8 +136,8 @@ static void* node_ping_proc(void *a_arg) } DAP_DELETE(l_str_to_send); } - else{ - ;//log_it(L_INFO, "Can't connect to node for ping"); + else { + ; //log_it(L_INFO, "Can't connect to node for ping"); } closesocket(l_socket); } @@ -169,3 +174,142 @@ int wait_node_ping(pthread_t l_thread, int timeout_ms) return l_ping_time; return -1; } + +static dap_chain_node_addr_t *s_node_addr_tr = NULL, *s_node_addr_ping = NULL; + +static void* node_ping_background_proc(void *a_arg) +{ + dap_chain_net_t *l_net; + dap_list_t *l_node_list; + memcpy(&l_net, a_arg, sizeof(dap_chain_net_t*)); + memcpy(&l_node_list, a_arg + sizeof(dap_chain_net_t*), sizeof(dap_list_t*)); + DAP_DELETE(a_arg); + dap_chain_node_addr_t l_node_addr = { 0 }; + + + // select the nearest node from the list + unsigned int l_nodes_count = dap_list_length(l_node_list); + unsigned int l_thread_id = 0; + pthread_t *l_threads = DAP_NEW_Z_SIZE(pthread_t, sizeof(pthread_t) * l_nodes_count); + uint64_t *l_nodes_addr = DAP_NEW_Z_SIZE(uint64_t, sizeof(uint64_t) * l_nodes_count); + dap_list_t *l_node_list0 = l_node_list; + + dap_chain_node_addr_t *s_node_addr_tr = NULL, *l_node_addr_ping = NULL; + int l_min_hops = INT32_MAX; + int l_min_ping = INT32_MAX; + // send ping to all nodes + while(l_node_list) { + dap_chain_node_addr_t *l_node_addr = l_node_list->data; + dap_chain_node_info_t *l_node_info = dap_chain_node_info_read(l_net, l_node_addr); + + + char *host4 = DAP_NEW_SIZE(char, INET_ADDRSTRLEN); + struct sockaddr_in sa4 = { .sin_family = AF_INET, .sin_addr = l_node_info->hdr.ext_addr_v4 }; + const char* str_ip4 = inet_ntop(AF_INET, &(((struct sockaddr_in *) &sa4)->sin_addr), host4, INET_ADDRSTRLEN); + if(!str_ip4) + continue; + int hops = 0, time_usec = 0; + int res = traceroute_util(str_ip4, &hops, &time_usec); + DAP_DELETE(host4); + if(l_min_hops>hops) + s_node_addr_tr = l_node_list->data; + + // start sending ping + start_node_ping(&l_threads[l_thread_id], l_node_info->hdr.ext_addr_v4, l_node_info->hdr.ext_port, 1); + + l_nodes_addr[l_thread_id] = l_node_info->hdr.address.uint64; + l_thread_id++; + DAP_DELETE(l_node_info); + l_node_list = dap_list_next(l_node_list); + } + // wait for reply from nodes + int best_node_pos = -1; + int best_node_reply = INT32_MAX; + // timeout for all threads + int l_timeout_full_ms = 3000; // wait max 3 second + for(l_thread_id = 0; l_thread_id < l_nodes_count; l_thread_id++) { + if(l_timeout_full_ms < 100) + l_timeout_full_ms = 100; // make small timeout anyway, may be + struct timespec l_time_start; + clock_gettime(CLOCK_MONOTONIC, &l_time_start); + int res = wait_node_ping(l_threads[l_thread_id], l_timeout_full_ms); + if(res > 0 && res < best_node_reply) { + best_node_pos = l_thread_id; + s_node_addr_ping = l_node_list->data; + best_node_reply = res; + } + struct timespec l_time_stop; + clock_gettime(CLOCK_MONOTONIC, &l_time_stop); + l_timeout_full_ms -= timespec_diff(&l_time_start, &l_time_stop, NULL); + //printf(" thread %x ping=%d\n", l_threads[l_thread_id], res); + } + if(best_node_pos > 0) { + l_node_addr.uint64 = l_nodes_addr[best_node_pos]; + } + + // allocate memory for best node addresses + dap_chain_node_addr_t *s_node_addr_tmp; + s_node_addr_tmp = DAP_NEW(dap_chain_node_addr_t); + memcmp(s_node_addr_tmp, s_node_addr_tr, sizeof(dap_chain_node_addr_t)); + s_node_addr_tr = s_node_addr_tmp; + s_node_addr_tmp = DAP_NEW(dap_chain_node_addr_t); + memcmp(s_node_addr_tmp, s_node_addr_ping, sizeof(dap_chain_node_addr_t)); + s_node_addr_ping = s_node_addr_tmp; + + // delete memory + DAP_DELETE(l_nodes_addr); + DAP_DELETE(l_threads); + dap_list_free_full(l_node_list0, free); + return 0; +} + +static pthread_t s_thread = 0; + +// start background thread for testing connect to the nodes +int dap_chain_node_ping_background_start(dap_chain_net_t *a_net, dap_list_t *a_node_list) +{ + if(!a_node_list) + return -1; + // already started + if(s_thread) + return 0; + // copy list + dap_list_t *l_node_list = NULL; + dap_list_t *l_node_list_tmp = a_node_list; + while(l_node_list_tmp) { + dap_chain_node_addr_t *l_addr = DAP_NEW(dap_chain_node_addr_t); + memcpy(l_addr, l_node_list_tmp->data, sizeof(dap_chain_node_addr_t)); + l_node_list = dap_list_append(l_node_list, l_addr); + l_node_list_tmp = dap_list_next(l_node_list_tmp); + } + // start searching for better nodes + uint8_t *l_arg = DAP_NEW_SIZE(uint8_t, sizeof(dap_chain_net_t*) + sizeof(dap_chain_net_t*)); + memcpy(l_arg, &a_net, sizeof(dap_chain_net_t*)); + memcpy(l_arg + sizeof(dap_chain_net_t*), &l_node_list, sizeof(dap_list_t*)); + pthread_create(&s_thread, NULL, node_ping_background_proc, l_arg); + return 0; +} + +const dap_chain_node_addr_t* dap_chain_node_ping_get_node_tr(void) +{ + return s_node_addr_tr; +} + +const dap_chain_node_addr_t* dap_chain_node_ping_get_node_ping(void) +{ + return s_node_addr_ping; +} + +int dap_chain_node_ping_background_stop(void) +{ + int l_ret = wait_node_ping(s_thread, 500); + s_thread = 0; + return l_ret; +} + +int dap_chain_node_ping_background_status(void) +{ + if(s_thread) + return 1; + return 0; +} diff --git a/dap_chain_node_ping.h b/dap_chain_node_ping.h index 8289031c8fab5b8ebfef3aa799ec38f48dc8b659..8085afe0243da2946aba77ea399ee6a48b9b90bf 100644 --- a/dap_chain_node_ping.h +++ b/dap_chain_node_ping.h @@ -28,3 +28,9 @@ int start_node_ping(pthread_t *a_thread, struct in_addr a_addr, int a_port, int // wait for ending ping within timeout_ms milliseconds int wait_node_ping(pthread_t l_thread, int timeout_ms); + + +// background thread for testing connect to the nodes +int dap_chain_node_ping_background_start(dap_chain_net_t *a_net, dap_list_t *a_node_list); +int dap_chain_node_ping_background_stop(void); +int dap_chain_node_ping_background_status(void);