Skip to content
Snippets Groups Projects
dap_chain_node_ping.c 12.19 KiB
/*
 * Authors:
 * Alexander Lysikov <alexander.lysikov@demlabs.net>
 * DeM Labs Inc.   https://demlabs.net

 This file is part of DAP (Distributed Applications Platform) the open source project

 DAP (Distributed Applications Platform) is free software: you can redistribute it and/or modify
 it under the terms of the GNU General Public License as published by
 the Free Software Foundation, either version 3 of the License, or
 (at your option) any later version.

 DAP is distributed in the hope that it will be useful,
 but WITHOUT ANY WARRANTY; without even the implied warranty of
 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 GNU General Public License for more details.

 You should have received a copy of the GNU General Public License
 along with any DAP based project.  If not, see <http://www.gnu.org/licenses/>.
 */

#include <stdio.h>
#include <string.h>
//#include <sys/socket.h>
#include <time.h>
#include <errno.h>
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
#define __USE_GNU   /* See feature_test_macros(7) */
#include <pthread.h>

#include "dap_client.h"
#include "dap_common.h"
#include "dap_strfuncs.h"
#include "dap_list.h"
#include "dap_chain_common.h"
#include "dap_chain_node.h"
#include "dap_chain_node_ping.h"

/*
 #include <stdlib.h>
 #include <stdio.h>
 #include <stddef.h>
 #include <stdint.h>
 #include <string.h>
 #include <stdbool.h>
 #include <assert.h>
 #include <ctype.h>
 #include <dirent.h>
 */

#ifdef WIN32
#undef _WIN32_WINNT
#define _WIN32_WINNT 0x0600
#include <winsock2.h>
#include <windows.h>
#include <mswsock.h>
#include <ws2tcpip.h>
#include <io.h>
#include <wepoll.h>
#else
#include <signal.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#endif
#include <pthread.h>

#include "iputils/iputils.h"

#include "dap_common.h"
#include "dap_net.h"
#include "dap_strfuncs.h"
#include "dap_chain_node_cli.h"
#include "dap_chain_node_ping.h"

#define LOG_TAG "chain_node_ping"

typedef struct s_dap_chain_node_ping_background_arg{
    dap_chain_net_t *net;
    dap_list_t *node_list;
}s_dap_chain_node_ping_background_arg_t;

static void* node_ping_proc(void *a_arg)
{
    if(!a_arg)
        return NULL;
    int l_count = *(int*)a_arg,
        l_port  = *(int*)(a_arg + sizeof(int));
    struct in_addr l_addr  = *(struct in_addr*)(a_arg + sizeof(int) * 2);
    DAP_DELETE(a_arg);

    /*char *host4 = DAP_NEW_SIZE(char, INET_ADDRSTRLEN);
    if (!host4) {
        log_it(L_CRITICAL, "%s", c_error_memory_alloc);
        return NULL;
    }
    struct sockaddr_in sa4 = { .sin_family = AF_INET, .sin_addr = l_addr };
    const char* str_ip4 = inet_ntop(AF_INET, &(((struct sockaddr_in *) &sa4)->sin_addr), host4, INET_ADDRSTRLEN);
    if(!str_ip4){
        DAP_DELETE(host4);
        return NULL ;
    }*/ // TODO implement string handling
    //printf(" %s %d ping start\n", str_ip4, l_count);
    /*
     // send ping
     ping_handle_t *l_ping_handle = ping_handle_create();
     //iputils_set_verbose();
     int res = ping_util(l_ping_handle, str_ip4, l_count);
     DAP_DELETE(l_ping_handle);
     printf(" %s %d ping=%d us\n",str_ip4,l_count,res );
     DAP_DELETE(host4);
     */

    // instead of ping to connect with server and send/recv header
    long res = -1;
    {
        struct timespec l_time_start, l_time_stop;
        struct sockaddr_in l_remote_addr = { 0 };
        //memset(&l_remote_addr, 0, sizeof(l_remote_addr));
        l_remote_addr.sin_family = AF_INET;
        l_remote_addr.sin_port = htons(l_port);
        l_remote_addr.sin_addr = l_addr;

        SOCKET l_socket = socket( PF_INET, SOCK_STREAM, 0);
        if(l_socket == INVALID_SOCKET) {
            log_it(L_ERROR, "Can't create socket");
            //DAP_DELETE(host4);
            return (void*) -1;
        }
        clock_gettime(CLOCK_MONOTONIC, &l_time_start);

        if(connect(l_socket, (struct sockaddr *) &l_remote_addr, sizeof(struct sockaddr_in)) != SOCKET_ERROR) {
            size_t l_buf_size = 1024;
            uint8_t l_buf[l_buf_size];

            //const char* str_ip4 = inet_ntop(AF_INET, &(((struct sockaddr_in *) &sa4)->sin_addr), host4,
            //INET_ADDRSTRLEN);
            char *l_str_to_send = dap_strdup_printf("GET /%s/ping_sub_url HTTP/1.1\r\nHost: %s\r\n\r\n",
            DAP_UPLINK_PATH_ENC_INIT, "str_ip4");
            // send data to bad suburl
            int l_send_count = send(l_socket, l_str_to_send, dap_strlen(l_str_to_send), 0);
            long l_recv_count = 0;
            // recv data with error message
            if(l_send_count > 30)
                l_recv_count = dap_net_recv(l_socket, l_buf, l_buf_size, 1000);
            // connect/send/recv was successful
            if(l_recv_count > 20) {
                clock_gettime(CLOCK_MONOTONIC, &l_time_stop);
                res = timespec_diff(&l_time_start, &l_time_stop, NULL);
            }
            DAP_DELETE(l_str_to_send);
        }
        else {
            ; //log_it(L_INFO, "Can't connect to node for ping");
        }
        //DAP_DELETE(host4);
        closesocket(l_socket);
    }
    return (void*)(size_t)res;
}

// start sending ping
int start_node_ping(pthread_t *a_thread, struct in_addr a_addr, int a_port, int a_count)
{
    uint8_t *l_data = DAP_NEW_Z_SIZE(uint8_t, sizeof(struct in_addr) + 2 * sizeof(int));
    if (!l_data) {
        log_it(L_CRITICAL, "%s", c_error_memory_alloc);
        return -1;
    }
    memcpy(l_data, &a_count, sizeof(int));
    memcpy(l_data + sizeof(int), &a_port, sizeof(int));
    memcpy(l_data + 2 * sizeof(int), &a_addr, sizeof(struct in_addr));
    pthread_create(a_thread, NULL, node_ping_proc, l_data);
    return 0;
}

// wait for ending ping within timeout_ms milliseconds
int wait_node_ping(pthread_t l_thread, int timeout_ms)
{
    long int l_ping_time = 0;
    struct timespec l_wait_time;
    clock_gettime(CLOCK_REALTIME, &l_wait_time);

    timeout_ms *= 1000;
    l_wait_time.tv_sec += timeout_ms / DAP_USEC_PER_SEC;
    l_wait_time.tv_nsec += 1000 * (timeout_ms % DAP_USEC_PER_SEC);
#if !defined(_WIN32) && !defined(__ANDROID__) && !defined (DAP_OS_DARWIN)
    int res = pthread_timedjoin_np(l_thread, (void **) &l_ping_time, &l_wait_time);
#else
    int res = pthread_join(l_thread, (void **) &l_ping_time);
#endif
    if(res == ETIMEDOUT) {
        pthread_kill(l_thread, 3); // SIGQUIT SIGABRT
    }
    else if(!res)
        return (int)l_ping_time;
    return -1;
}

static dap_chain_node_addr_t *s_node_addr_tr = NULL, *s_node_addr_ping = NULL;

static void* node_ping_background_proc(void *a_arg)
{
    if (!a_arg)
        return 0;
    s_dap_chain_node_ping_background_arg_t  *l_arg = (s_dap_chain_node_ping_background_arg_t*)a_arg;
    dap_chain_net_t *l_net = l_arg->net;
    dap_list_t *l_node_list = l_arg->node_list;
    dap_chain_node_addr_t l_node_addr = { 0 };
    s_node_addr_ping = DAP_NEW(dap_chain_node_addr_t);

    // select the nearest node from the list
    unsigned int l_nodes_count = dap_list_length(l_node_list);
    unsigned int l_thread_id = 0;
    pthread_t l_threads[l_nodes_count];
    memset(l_threads, 0, l_nodes_count * sizeof(pthread_t));
    uint64_t l_nodes_addr[l_nodes_count];
    memset(l_nodes_addr, 0, l_nodes_count * sizeof(uint64_t));

    dap_list_t *l_node_list0 = l_node_list;

    int l_min_hops = INT32_MAX;
    int l_min_ping = INT32_MAX;
    // send ping to all nodes
    while(l_node_list) {
        dap_chain_node_addr_t *l_node_addr = l_node_list->data;
        dap_chain_node_info_t *l_node_info = dap_chain_node_info_read(l_net, l_node_addr);
        if (!l_node_info) {
            log_it(L_ERROR, "Can't extract node info");
            l_node_list = dap_list_next(l_node_list);
            continue;
        }

        int hops = 0, time_usec = 0;
#if defined(DAP_OS_LINUX) && ! defined(DAP_OS_ANDROID)
        //int res = traceroute_util(l_node_info->hdr.ext_addr, &hops, &time_usec);
#endif
        if(l_min_hops>hops) {
            l_min_hops = hops;
            s_node_addr_tr = l_node_list->data;
        }

        // start sending ping
        //start_node_ping(&l_threads[l_thread_id], l_node_info->hdr.ext_addr_v4, l_node_info->hdr.ext_port, 1);
        l_nodes_addr[l_thread_id] = l_node_info->address.uint64;
        l_thread_id++;
        DAP_DELETE(l_node_info);
        l_node_list = dap_list_next(l_node_list);
    }
    // wait for reply from nodes
    int best_node_pos = -1;
    int best_node_reply = INT32_MAX;
    // timeout for all threads
    int l_timeout_full_ms = 3000; // wait max 3 second
    for(l_thread_id = 0; l_thread_id < l_nodes_count; l_thread_id++) {
        if(l_timeout_full_ms < 100)
            l_timeout_full_ms = 100; // make small timeout anyway, may be
        struct timespec l_time_start;
        clock_gettime(CLOCK_MONOTONIC, &l_time_start);
        int res = wait_node_ping(l_threads[l_thread_id], l_timeout_full_ms);
        if(res > 0 && res < best_node_reply) {
            best_node_pos = l_thread_id;
            s_node_addr_ping->uint64 = l_nodes_addr[l_thread_id];
            best_node_reply = res;
        }
        struct timespec l_time_stop;
        clock_gettime(CLOCK_MONOTONIC, &l_time_stop);
        l_timeout_full_ms -= timespec_diff(&l_time_start, &l_time_stop, NULL);
        //printf(" thread %x ping=%d\n", l_threads[l_thread_id], res);
    }
    if(best_node_pos > 0) {
        l_node_addr.uint64 = l_nodes_addr[best_node_pos];
    }
    // TODO
    UNUSED(l_node_addr);

    // allocate memory for best node addresses
    dap_chain_node_addr_t *l_node_addr_tmp;
    l_node_addr_tmp = DAP_NEW(dap_chain_node_addr_t);
    if (!l_node_addr_tmp) {
        log_it(L_CRITICAL, "%s", c_error_memory_alloc);
        dap_list_free_full(l_node_list0, NULL);
        DAP_DEL_Z(s_node_addr_ping);
        DAP_DELETE(a_arg);
        return 0;
    }
    memcpy(l_node_addr_tmp, s_node_addr_tr, sizeof(dap_chain_node_addr_t));
    DAP_DELETE(s_node_addr_tr);
    s_node_addr_tr = l_node_addr_tmp;
    DAP_DELETE(l_node_addr_tmp);

    l_node_addr_tmp = DAP_NEW(dap_chain_node_addr_t);
    if (!l_node_addr_tmp) {
        log_it(L_CRITICAL, "%s", c_error_memory_alloc);
        dap_list_free_full(l_node_list0, NULL);
        DAP_DEL_Z(s_node_addr_ping);
        DAP_DELETE(a_arg);
        return 0;
    }
    memcpy(l_node_addr_tmp, s_node_addr_ping, sizeof(dap_chain_node_addr_t));
    DAP_DELETE(s_node_addr_ping);
    s_node_addr_ping = l_node_addr_tmp;
    dap_list_free_full(l_node_list0, NULL);
    DAP_DELETE(a_arg);
    return 0;
}

static pthread_t s_thread = 0;

// start background thread for testing connect to the nodes
int dap_chain_node_ping_background_start(dap_chain_net_t *a_net, dap_list_t *a_node_list)
{
    if(!a_node_list)
        return -1;
    // already started
    if(s_thread)
        return 0;
    // copy list
    dap_list_t *l_node_list = NULL;
    dap_list_t *l_node_list_tmp = a_node_list;
    while(l_node_list_tmp) {
        dap_chain_node_addr_t *l_addr = DAP_NEW(dap_chain_node_addr_t);
        if (!l_addr) {
            log_it(L_CRITICAL, "%s", c_error_memory_alloc);
            dap_list_free_full(l_node_list, NULL);
            return -1;
        }
        memcpy(l_addr, l_node_list_tmp->data, sizeof(dap_chain_node_addr_t));
        l_node_list = dap_list_append(l_node_list, l_addr);
        l_node_list_tmp = dap_list_next(l_node_list_tmp);
    }
    // start searching for better nodes
    s_dap_chain_node_ping_background_arg_t *l_arg = DAP_NEW(s_dap_chain_node_ping_background_arg_t);
//    uint8_t *l_arg = DAP_NEW_SIZE(uint8_t, sizeof(dap_chain_net_t*) + sizeof(dap_list_t*));

    if (!l_arg) {
        log_it(L_CRITICAL, "%s", c_error_memory_alloc);
        dap_list_free_full(l_node_list, NULL);
        return -1;
    }
    l_arg->net = a_net;
    l_arg->node_list = l_node_list;
//    memcpy(l_arg, &a_net, sizeof(dap_chain_net_t*));
//    memcpy(l_arg + sizeof(dap_chain_net_t*), &l_node_list, sizeof(dap_list_t*));
    pthread_create(&s_thread, NULL, node_ping_background_proc, l_arg);
    return 0;
}
const dap_chain_node_addr_t* dap_chain_node_ping_get_node_tr(void)
{
    return s_node_addr_tr;
}

const dap_chain_node_addr_t* dap_chain_node_ping_get_node_ping(void)
{
    return s_node_addr_ping;
}

int dap_chain_node_ping_background_stop(void)
{
    int l_ret = wait_node_ping(s_thread, 500);
    s_thread = 0;
    return l_ret;
}

int dap_chain_node_ping_background_status(void)
{
    if(s_thread)
        return 1;
    return 0;
}