/* * Authors: * Alexander Lysikov <alexander.lysikov@demlabs.net> * DeM Labs Inc. https://demlabs.net This file is part of DAP (Distributed Applications Platform) the open source project DAP (Distributed Applications Platform) is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. DAP is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. */ #include <stdio.h> #include <string.h> //#include <sys/socket.h> #include <time.h> #include <errno.h> #ifndef _GNU_SOURCE #define _GNU_SOURCE #endif #define __USE_GNU /* See feature_test_macros(7) */ #include <pthread.h> #include "dap_client.h" #include "dap_common.h" #include "dap_strfuncs.h" #include "dap_list.h" #include "dap_chain_common.h" #include "dap_chain_node.h" #include "dap_chain_node_ping.h" /* #include <stdlib.h> #include <stdio.h> #include <stddef.h> #include <stdint.h> #include <string.h> #include <stdbool.h> #include <assert.h> #include <ctype.h> #include <dirent.h> */ #ifdef WIN32 #undef _WIN32_WINNT #define _WIN32_WINNT 0x0600 #include <winsock2.h> #include <windows.h> #include <mswsock.h> #include <ws2tcpip.h> #include <io.h> #include <wepoll.h> #else #include <signal.h> #include <sys/types.h> #include <sys/socket.h> #include <arpa/inet.h> #endif #include <pthread.h> #include "iputils/iputils.h" #include "dap_common.h" #include "dap_net.h" #include "dap_strfuncs.h" #include "dap_chain_node_cli.h" #include "dap_chain_node_ping.h" #define LOG_TAG "chain_node_ping" typedef struct s_dap_chain_node_ping_background_arg{ dap_chain_net_t *net; dap_list_t *node_list; }s_dap_chain_node_ping_background_arg_t; static void* node_ping_proc(void *a_arg) { if(!a_arg) return NULL; int l_count = *(int*)a_arg, l_port = *(int*)(a_arg + sizeof(int)); struct in_addr l_addr = *(struct in_addr*)(a_arg + sizeof(int) * 2); DAP_DELETE(a_arg); /*char *host4 = DAP_NEW_SIZE(char, INET_ADDRSTRLEN); if (!host4) { log_it(L_CRITICAL, "%s", g_error_memory_alloc); return NULL; } struct sockaddr_in sa4 = { .sin_family = AF_INET, .sin_addr = l_addr }; const char* str_ip4 = inet_ntop(AF_INET, &(((struct sockaddr_in *) &sa4)->sin_addr), host4, INET_ADDRSTRLEN); if(!str_ip4){ DAP_DELETE(host4); return NULL ; }*/ // TODO implement string handling //printf(" %s %d ping start\n", str_ip4, l_count); /* // send ping ping_handle_t *l_ping_handle = ping_handle_create(); //iputils_set_verbose(); int res = ping_util(l_ping_handle, str_ip4, l_count); DAP_DELETE(l_ping_handle); printf(" %s %d ping=%d us\n",str_ip4,l_count,res ); DAP_DELETE(host4); */ // instead of ping to connect with server and send/recv header long res = -1; { struct timespec l_time_start, l_time_stop; struct sockaddr_in l_remote_addr = { 0 }; //memset(&l_remote_addr, 0, sizeof(l_remote_addr)); l_remote_addr.sin_family = AF_INET; l_remote_addr.sin_port = htons(l_port); l_remote_addr.sin_addr = l_addr; SOCKET l_socket = socket( PF_INET, SOCK_STREAM, 0); if(l_socket == INVALID_SOCKET) { log_it(L_ERROR, "Can't create socket"); //DAP_DELETE(host4); return (void*) -1; } clock_gettime(CLOCK_MONOTONIC, &l_time_start); if(connect(l_socket, (struct sockaddr *) &l_remote_addr, sizeof(struct sockaddr_in)) != SOCKET_ERROR) { size_t l_buf_size = 1024; uint8_t l_buf[l_buf_size]; //const char* str_ip4 = inet_ntop(AF_INET, &(((struct sockaddr_in *) &sa4)->sin_addr), host4, //INET_ADDRSTRLEN); char *l_str_to_send = dap_strdup_printf("GET /%s/ping_sub_url HTTP/1.1\r\nHost: %s\r\n\r\n", DAP_UPLINK_PATH_ENC_INIT, "str_ip4"); // send data to bad suburl int l_send_count = send(l_socket, l_str_to_send, dap_strlen(l_str_to_send), 0); long l_recv_count = 0; // recv data with error message if(l_send_count > 30) l_recv_count = dap_net_recv(l_socket, l_buf, l_buf_size, 1000); // connect/send/recv was successful if(l_recv_count > 20) { clock_gettime(CLOCK_MONOTONIC, &l_time_stop); res = timespec_diff(&l_time_start, &l_time_stop, NULL); } DAP_DELETE(l_str_to_send); } else { ; //log_it(L_INFO, "Can't connect to node for ping"); } //DAP_DELETE(host4); closesocket(l_socket); } return (void*)(size_t)res; } // start sending ping int start_node_ping(pthread_t *a_thread, struct in_addr a_addr, int a_port, int a_count) { uint8_t *l_data = DAP_NEW_Z_SIZE(uint8_t, sizeof(struct in_addr) + 2 * sizeof(int)); if (!l_data) { log_it(L_CRITICAL, "%s", g_error_memory_alloc); return -1; } memcpy(l_data, &a_count, sizeof(int)); memcpy(l_data + sizeof(int), &a_port, sizeof(int)); memcpy(l_data + 2 * sizeof(int), &a_addr, sizeof(struct in_addr)); pthread_create(a_thread, NULL, node_ping_proc, l_data); return 0; } // wait for ending ping within timeout_ms milliseconds int wait_node_ping(pthread_t l_thread, int timeout_ms) { long int l_ping_time = 0; struct timespec l_wait_time; clock_gettime(CLOCK_REALTIME, &l_wait_time); timeout_ms *= 1000; l_wait_time.tv_sec += timeout_ms / DAP_USEC_PER_SEC; l_wait_time.tv_nsec += 1000 * (timeout_ms % DAP_USEC_PER_SEC); #if !defined(_WIN32) && !defined(__ANDROID__) && !defined (DAP_OS_DARWIN) int res = pthread_timedjoin_np(l_thread, (void **) &l_ping_time, &l_wait_time); #else int res = pthread_join(l_thread, (void **) &l_ping_time); #endif if(res == ETIMEDOUT) { pthread_kill(l_thread, 3); // SIGQUIT SIGABRT } else if(!res) return (int)l_ping_time; return -1; } static dap_chain_node_addr_t *s_node_addr_tr = NULL, *s_node_addr_ping = NULL; static void* node_ping_background_proc(void *a_arg) { if (!a_arg) return 0; s_dap_chain_node_ping_background_arg_t *l_arg = (s_dap_chain_node_ping_background_arg_t*)a_arg; dap_chain_net_t *l_net = l_arg->net; dap_list_t *l_node_list = l_arg->node_list; dap_chain_node_addr_t l_node_addr = { 0 }; s_node_addr_ping = DAP_NEW(dap_chain_node_addr_t); // select the nearest node from the list unsigned int l_nodes_count = dap_list_length(l_node_list); unsigned int l_thread_id = 0; pthread_t l_threads[l_nodes_count]; memset(l_threads, 0, l_nodes_count * sizeof(pthread_t)); uint64_t l_nodes_addr[l_nodes_count]; memset(l_nodes_addr, 0, l_nodes_count * sizeof(uint64_t)); dap_list_t *l_node_list0 = l_node_list; int l_min_hops = INT32_MAX; int l_min_ping = INT32_MAX; // send ping to all nodes while(l_node_list) { dap_chain_node_addr_t *l_node_addr = l_node_list->data; dap_chain_node_info_t *l_node_info = dap_chain_node_info_read(l_net, l_node_addr); if (!l_node_info) { log_it(L_ERROR, "Can't extract node info"); l_node_list = dap_list_next(l_node_list); continue; } int hops = 0, time_usec = 0; #if defined(DAP_OS_LINUX) && ! defined(DAP_OS_ANDROID) //int res = traceroute_util(l_node_info->hdr.ext_addr, &hops, &time_usec); #endif if(l_min_hops>hops) { l_min_hops = hops; s_node_addr_tr = l_node_list->data; } // start sending ping //start_node_ping(&l_threads[l_thread_id], l_node_info->hdr.ext_addr_v4, l_node_info->hdr.ext_port, 1); l_nodes_addr[l_thread_id] = l_node_info->address.uint64; l_thread_id++; DAP_DELETE(l_node_info); l_node_list = dap_list_next(l_node_list); } // wait for reply from nodes int best_node_pos = -1; int best_node_reply = INT32_MAX; // timeout for all threads int l_timeout_full_ms = 3000; // wait max 3 second for(l_thread_id = 0; l_thread_id < l_nodes_count; l_thread_id++) { if(l_timeout_full_ms < 100) l_timeout_full_ms = 100; // make small timeout anyway, may be struct timespec l_time_start; clock_gettime(CLOCK_MONOTONIC, &l_time_start); int res = wait_node_ping(l_threads[l_thread_id], l_timeout_full_ms); if(res > 0 && res < best_node_reply) { best_node_pos = l_thread_id; s_node_addr_ping->uint64 = l_nodes_addr[l_thread_id]; best_node_reply = res; } struct timespec l_time_stop; clock_gettime(CLOCK_MONOTONIC, &l_time_stop); l_timeout_full_ms -= timespec_diff(&l_time_start, &l_time_stop, NULL); //printf(" thread %x ping=%d\n", l_threads[l_thread_id], res); } if(best_node_pos > 0) { l_node_addr.uint64 = l_nodes_addr[best_node_pos]; } // TODO UNUSED(l_node_addr); // allocate memory for best node addresses dap_chain_node_addr_t *l_node_addr_tmp; l_node_addr_tmp = DAP_NEW(dap_chain_node_addr_t); if (!l_node_addr_tmp) { log_it(L_CRITICAL, "%s", g_error_memory_alloc); dap_list_free_full(l_node_list0, NULL); DAP_DEL_Z(s_node_addr_ping); DAP_DELETE(a_arg); return 0; } memcpy(l_node_addr_tmp, s_node_addr_tr, sizeof(dap_chain_node_addr_t)); DAP_DELETE(s_node_addr_tr); s_node_addr_tr = l_node_addr_tmp; DAP_DELETE(l_node_addr_tmp); l_node_addr_tmp = DAP_NEW(dap_chain_node_addr_t); if (!l_node_addr_tmp) { log_it(L_CRITICAL, "%s", g_error_memory_alloc); dap_list_free_full(l_node_list0, NULL); DAP_DEL_Z(s_node_addr_ping); DAP_DELETE(a_arg); return 0; } memcpy(l_node_addr_tmp, s_node_addr_ping, sizeof(dap_chain_node_addr_t)); DAP_DELETE(s_node_addr_ping); s_node_addr_ping = l_node_addr_tmp; dap_list_free_full(l_node_list0, NULL); DAP_DELETE(a_arg); return 0; } static pthread_t s_thread = 0; // start background thread for testing connect to the nodes int dap_chain_node_ping_background_start(dap_chain_net_t *a_net, dap_list_t *a_node_list) { if(!a_node_list) return -1; // already started if(s_thread) return 0; // copy list dap_list_t *l_node_list = NULL; dap_list_t *l_node_list_tmp = a_node_list; while(l_node_list_tmp) { dap_chain_node_addr_t *l_addr = DAP_NEW(dap_chain_node_addr_t); if (!l_addr) { log_it(L_CRITICAL, "%s", g_error_memory_alloc); dap_list_free_full(l_node_list, NULL); return -1; } memcpy(l_addr, l_node_list_tmp->data, sizeof(dap_chain_node_addr_t)); l_node_list = dap_list_append(l_node_list, l_addr); l_node_list_tmp = dap_list_next(l_node_list_tmp); } // start searching for better nodes s_dap_chain_node_ping_background_arg_t *l_arg = DAP_NEW(s_dap_chain_node_ping_background_arg_t); // uint8_t *l_arg = DAP_NEW_SIZE(uint8_t, sizeof(dap_chain_net_t*) + sizeof(dap_list_t*)); if (!l_arg) { log_it(L_CRITICAL, "%s", g_error_memory_alloc); dap_list_free_full(l_node_list, NULL); return -1; } l_arg->net = a_net; l_arg->node_list = l_node_list; // memcpy(l_arg, &a_net, sizeof(dap_chain_net_t*)); // memcpy(l_arg + sizeof(dap_chain_net_t*), &l_node_list, sizeof(dap_list_t*)); pthread_create(&s_thread, NULL, node_ping_background_proc, l_arg); return 0; } const dap_chain_node_addr_t* dap_chain_node_ping_get_node_tr(void) { return s_node_addr_tr; } const dap_chain_node_addr_t* dap_chain_node_ping_get_node_ping(void) { return s_node_addr_ping; } int dap_chain_node_ping_background_stop(void) { int l_ret = wait_node_ping(s_thread, 500); s_thread = 0; return l_ret; } int dap_chain_node_ping_background_status(void) { if(s_thread) return 1; return 0; }