Skip to content
Snippets Groups Projects
dap_chain_node_client.c 34.04 KiB
/*
 * Authors:
 * Dmitriy A. Gearasimov <naeper@demlabs.net>
 * Alexander Lysikov <alexander.lysikov@demlabs.net>
 * DeM Labs Inc.   https://demlabs.net

 This file is part of DAP (Deus Applications Prototypes) the open source project

 DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify
 it under the terms of the GNU General Public License as published by
 the Free Software Foundation, either version 3 of the License, or
 (at your option) any later version.

 DAP is distributed in the hope that it will be useful,
 but WITHOUT ANY WARRANTY; without even the implied warranty of
 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 GNU General Public License for more details.

 You should have received a copy of the GNU General Public License
 along with any DAP based project.  If not, see <http://www.gnu.org/licenses/>.
 */

#include <stdlib.h>
#include <stdio.h>
#include <time.h>
#include <stdlib.h>
#include <stddef.h>
#include <stdint.h>
#include <errno.h>
#include <assert.h>
#include <string.h>

#ifdef WIN32
#include <winsock2.h>
#include <windows.h>
#include <mswsock.h>
#include <ws2tcpip.h>
#include <io.h>
#include <pthread.h>
#else
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#endif

#include <json-c/json.h>
#include "uthash.h"

#include "dap_common.h"
#include "dap_client.h"
#include "dap_config.h"
#include "dap_events.h"
#include "dap_timerfd.h"
#include "dap_hash.h"
#include "dap_uuid.h"
//#include "dap_http_client_simple.h"
#include "dap_client_pvt.h"
#include "dap_chain_global_db_remote.h"
#include "dap_chain_global_db_hist.h"

#include "dap_chain.h"
#include "dap_chain_cell.h"

#include "dap_chain_net_srv_common.h"
#include "dap_stream_worker.h"
#include "dap_stream_ch_pkt.h"
#include "dap_stream_ch_chain.h"
#include "dap_stream_ch_chain_pkt.h"
#include "dap_stream_ch_chain_net.h"
#include "dap_stream_ch_chain_net_pkt.h"
#include "dap_stream_ch_chain_net_srv.h"
#include "dap_stream_pkt.h"

//#include "dap_chain_common.h"
#include "dap_chain_node_client.h"

#define LOG_TAG "dap_chain_node_client"

typedef struct dap_chain_node_client_handle {
    uint64_t uuid;
    dap_chain_node_client_t * client;
    UT_hash_handle hh;
} dap_chain_node_client_handle_t;

static dap_chain_node_client_handle_t * s_clients = NULL;


//static int listen_port_tcp = 8079;

static void s_stage_connected_callback(dap_client_t *a_client, void *a_arg);
static bool s_timer_update_states_callback(void * a_arg );

static void s_ch_chain_callback_notify_packet_out(dap_stream_ch_chain_t*, uint8_t a_pkt_type,
        dap_stream_ch_chain_pkt_t *a_pkt, size_t a_pkt_data_size,
        void * a_arg);
static void s_ch_chain_callback_notify_packet_in(dap_stream_ch_chain_t* a_ch_chain, uint8_t a_pkt_type,
        dap_stream_ch_chain_pkt_t *a_pkt, size_t a_pkt_data_size,
        void * a_arg);
static bool dap_chain_node_client_connect_internal(dap_chain_node_client_t *a_node_client, const char *a_active_channels);

bool s_stream_ch_chain_debug_more = false;
uint32_t s_timer_update_states=60;

/**
 * @brief dap_chain_node_client_init
 * @return
 */
int dap_chain_node_client_init(void)
{
    s_stream_ch_chain_debug_more = dap_config_get_item_bool_default(g_config,"stream_ch_chain","debug_more",false);
    s_timer_update_states = dap_config_get_item_uint32_default(g_config,"node_client","timer_update_states",60);
    return 0;
}

/**
 * @brief dap_chain_node_client_deinit
 */
void dap_chain_node_client_deinit()
{
    dap_chain_node_client_handle_t *l_client = NULL, *l_tmp = NULL;
    HASH_ITER(hh, s_clients,l_client, l_tmp){
        HASH_DEL(s_clients,l_client);
        DAP_DELETE(l_client);
    }
    //dap_http_client_simple_deinit();
    dap_client_deinit();
}

/**
 * @brief stage_status_callback
 * @param a_client
 * @param a_arg
 */
static void s_stage_status_callback(dap_client_t *a_client, void *a_arg)
{
    (void) a_client;
    (void) a_arg;

    //printf("* stage_status_callback client=%x data=%x\n", a_client, a_arg);
}

/**
 * @brief s_stage_status_error_callback
 * @param a_client
 * @param a_arg
 */
static void s_stage_status_error_callback(dap_client_t *a_client, void *a_arg)
{
    if (s_stream_ch_chain_debug_more)
        log_it(L_DEBUG, "s_stage_status_error_callback");

    dap_chain_node_client_t *l_node_client = DAP_CHAIN_NODE_CLIENT(a_client);
    if(!l_node_client)
        return;
    // check for last attempt
    bool l_is_last_attempt = a_arg ? true : false;
    if(l_is_last_attempt){
        pthread_mutex_lock(&l_node_client->wait_mutex);
        l_node_client->state = NODE_CLIENT_STATE_DISCONNECTED;
#ifndef _WIN32
        pthread_cond_broadcast(&l_node_client->wait_cond);
#else
        SetEvent( l_node_client->wait_cond );
#endif
        pthread_mutex_unlock(&l_node_client->wait_mutex);
        l_node_client->esocket_uuid = 0;

        if (l_node_client->keep_connection) {
            uint128_t *l_uuid = DAP_NEW(uint128_t);
            memcpy(l_uuid, &l_node_client->uuid, sizeof(uint128_t));
            dap_timerfd_start_on_worker(dap_events_worker_get_auto(),s_timer_update_states*1000,s_timer_update_states_callback, l_uuid);
        }
        return;
    }
    // TODO make different error codes
    if(l_node_client->callbacks.error)
        l_node_client->callbacks.error(l_node_client, EINVAL,l_node_client->callbacks_arg );
}

/**
 * @brief s_timer_update_states_callback
 * @param a_arg
 * @return
 */
static bool s_timer_update_states_callback(void * a_arg )
{
    dap_chain_node_client_handle_t *l_client_found = NULL;
    uint64_t *l_uuid = (uint64_t *)a_arg;
    assert(l_uuid);
    HASH_FIND(hh, s_clients, l_uuid, sizeof(*l_uuid), l_client_found);
    if(!l_client_found){
        log_it(L_DEBUG,"Chain node client %p was deleted before timer fired, nothing to do", l_uuid);
        DAP_DELETE(l_uuid);
        return false;
    }

    dap_chain_node_client_t *l_me = l_client_found->client;
    dap_worker_t * l_worker = dap_events_get_current_worker(dap_events_get_default());
    assert(l_worker);
    assert(l_me);
    dap_events_socket_t * l_es = NULL;
    dap_events_socket_uuid_t l_es_uuid = l_me->esocket_uuid;
    // check if esocket still in worker
    if( (l_es = dap_worker_esocket_find_uuid(l_worker, l_es_uuid)) != NULL ){
        dap_client_t * l_client = dap_client_from_esocket(l_es);
        if (l_client ) {
            dap_chain_node_client_t * l_node_client = (dap_chain_node_client_t*) l_client->_inheritor;
            if (l_node_client && l_node_client->ch_chain) {
                dap_stream_ch_chain_t * l_ch_chain = (dap_stream_ch_chain_t*) l_node_client->ch_chain->internal;
                assert(l_ch_chain);
                dap_chain_net_t * l_net = l_node_client->net;
                assert(l_net);

                // If we do nothing - init sync process
                if (l_ch_chain->state == CHAIN_STATE_IDLE ||l_ch_chain->state == CHAIN_STATE_SYNC_ALL ){
                    dap_stream_ch_chain_sync_request_t l_sync_gdb = {};
                    l_sync_gdb.id_start = (uint64_t) dap_db_get_last_id_remote(l_node_client->remote_node_addr.uint64);
                    l_sync_gdb.node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net);
                    log_it(L_DEBUG, "Prepared request to gdb sync from %"DAP_UINT64_FORMAT_U" to %"DAP_UINT64_FORMAT_U"", l_sync_gdb.id_start,
                           l_sync_gdb.id_end?l_sync_gdb.id_end:-1 );
                    // find dap_chain_id_t
                    dap_chain_t *l_chain = l_net->pub.chains;
                    dap_chain_id_t l_chain_id = l_chain ? l_chain->id : (dap_chain_id_t ) {0};
                    dap_stream_ch_chain_pkt_write_unsafe( l_node_client->ch_chain ,
                                                      DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_REQ, l_net->pub.id.uint64,
                                                                    l_chain_id.uint64, l_net->pub.cell_id.uint64,
                                                          &l_sync_gdb, sizeof(l_sync_gdb));
                }
                return true;
            }
        }
    }
    // if we not returned yet
    l_me->state = NODE_CLIENT_STATE_DISCONNECTED;
    if (l_me->keep_connection) {
        log_it(L_INFO, "Reconnecting node client with peer "NODE_ADDR_FP_STR, NODE_ADDR_FP_ARGS_S(l_me->remote_node_addr));
        dap_chain_node_client_connect_internal(l_me, "CN"); // isn't always CN here?
    }
    DAP_DELETE(l_uuid);
    return false;
}

/**
 * @brief a_stage_end_callback
 * @param a_client
 * @param a_arg
 */
static void s_stage_connected_callback(dap_client_t *a_client, void *a_arg)
{
    dap_chain_node_client_t *l_node_client = DAP_CHAIN_NODE_CLIENT(a_client);
    UNUSED(a_arg);
    if(l_node_client) {
        log_it(L_NOTICE, "Stream connection with node " NODE_ADDR_FP_STR " established",
                NODE_ADDR_FP_ARGS_S( l_node_client->remote_node_addr));
        // set callbacks for C and N channels; for R and S it is not needed
        dap_client_pvt_t * l_client_internal = DAP_CLIENT_PVT(a_client);
        if(l_client_internal && l_client_internal->active_channels) {
            size_t l_channels_count = dap_strlen(l_client_internal->active_channels);
            for(size_t i = 0; i < l_channels_count; i++) {
                if(dap_chain_node_client_set_callbacks(a_client, l_client_internal->active_channels[i]) == -1) {
                    log_it(L_WARNING, "No ch_chain channel, can't init notify callback for pkt type CH_CHAIN");
                }
            }
        }
        if(l_node_client->callbacks.connected)
            l_node_client->callbacks.connected(l_node_client, l_node_client->callbacks_arg);
        if(s_stream_ch_chain_debug_more)
            log_it(L_DEBUG, "Wakeup all who waits");
        l_node_client->state = NODE_CLIENT_STATE_ESTABLISHED;

        dap_stream_t * l_stream  = dap_client_get_stream(a_client);
        if (l_stream) {
            l_node_client->esocket_uuid = l_stream->esocket->uuid;
            if (l_node_client->keep_connection) {
                dap_events_socket_uuid_t *l_uuid = DAP_NEW(dap_events_socket_uuid_t);
                memcpy(l_uuid, &l_node_client->uuid, sizeof(dap_events_socket_uuid_t));
                dap_timerfd_start_on_worker(l_stream->esocket->worker,s_timer_update_states*1000,s_timer_update_states_callback, l_uuid);
            }
        }
#ifndef _WIN32
        pthread_cond_broadcast(&l_node_client->wait_cond);
#else
        SetEvent( l_node_client->wait_cond );
#endif
    }
}

/**
 * @brief s_ch_chain_callback_notify_packet_in2 - for dap_stream_ch_chain_net
 * @param a_ch_chain_net
 * @param a_pkt_type
 * @param a_pkt_net
 * @param a_pkt_data_size
 * @param a_arg
 */
static void s_ch_chain_callback_notify_packet_in2(dap_stream_ch_chain_net_t* a_ch_chain_net, uint8_t a_pkt_type,
        dap_stream_ch_chain_net_pkt_t *a_pkt_net, size_t a_pkt_net_data_size, void * a_arg)
{
    dap_chain_node_client_t * l_node_client = (dap_chain_node_client_t *) a_arg;
    switch (a_pkt_type) {
    // get new generated current node address
    case DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_NODE_ADDR_LEASE: {
        if(a_pkt_net_data_size == sizeof(dap_chain_node_addr_t)) {
            dap_chain_node_addr_t *l_addr = (dap_chain_node_addr_t *) a_pkt_net->data;
            memcpy(&l_node_client->cur_node_addr, l_addr, sizeof(dap_chain_node_addr_t));
        }
        l_node_client->state = NODE_CLIENT_STATE_NODE_ADDR_LEASED;
#ifndef _WIN32
        pthread_cond_broadcast(&l_node_client->wait_cond);
#else
        SetEvent( l_node_client->wait_cond );
#endif
        break;
    }
    // get remote node address
    case DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_NODE_ADDR: {

        if(a_pkt_net_data_size == sizeof(dap_chain_node_addr_t)) {
            dap_chain_node_addr_t *l_addr = (dap_chain_node_addr_t *) a_pkt_net->data;
            memcpy(&l_node_client->remote_node_addr, l_addr, sizeof(dap_chain_node_addr_t));
        }
        l_node_client->state = NODE_CLIENT_STATE_GET_NODE_ADDR;
#ifndef _WIN32
        pthread_cond_broadcast(&l_node_client->wait_cond);
#else
            SetEvent( l_node_client->wait_cond );
#endif
            break;
    }
    }
}


/**
 * @brief s_ch_chain_callback_notify_packet_in - for dap_stream_ch_chain
 * @param a_ch_chain
 * @param a_pkt_type
 * @param a_pkt
 * @param a_pkt_data_size
 * @param a_arg
 */
static void s_ch_chain_callback_notify_packet_in(dap_stream_ch_chain_t* a_ch_chain, uint8_t a_pkt_type,
        dap_stream_ch_chain_pkt_t *a_pkt, size_t a_pkt_data_size,
        void * a_arg)
{
    dap_chain_node_client_t * l_node_client = (dap_chain_node_client_t *) a_arg;

    switch (a_pkt_type) {
        case DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_ERROR:
            dap_snprintf(l_node_client->last_error, sizeof(l_node_client->last_error),
                    "%s", (char*) a_pkt->data);
            log_it(L_WARNING, "In: Received packet DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_ERROR with error \"%s\"",
                    l_node_client->last_error);
            l_node_client->state = NODE_CLIENT_STATE_ERROR;

    #ifndef _WIN32
            pthread_cond_broadcast(&l_node_client->wait_cond);
    #else
            SetEvent( l_node_client->wait_cond );
    #endif
        break;
        case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_REQ:{
            l_node_client->state = NODE_CLIENT_STATE_SYNC_GDB_UPDATES;
        }break;
        case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_START:{
            l_node_client->state = NODE_CLIENT_STATE_SYNC_GDB_RVRS;
            dap_chain_net_t * l_net = l_node_client->net;
            assert(l_net);
            dap_chain_net_set_state(l_net, NET_STATE_SYNC_GDB);
        }break;
        case DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB:{
            l_node_client->state = NODE_CLIENT_STATE_SYNC_GDB;
        }break;
        case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_REQ:{
            l_node_client->state = NODE_CLIENT_STATE_SYNC_CHAINS_UPDATES;
        }break;
        case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_START:{
            l_node_client->state = NODE_CLIENT_STATE_SYNC_CHAINS_RVRS;
            dap_chain_net_t * l_net = l_node_client->net;
            assert(l_net);
            dap_chain_net_set_state(l_net, NET_STATE_SYNC_CHAINS);
        }break;
        case DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_CHAIN:{
            l_node_client->state = NODE_CLIENT_STATE_SYNC_CHAINS;
        }break;
        case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB:
        case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS: {
            dap_chain_net_t *l_net = l_node_client->net;
            assert(l_net);
            dap_chain_id_t  l_chain_id = {};
            dap_chain_cell_id_t l_cell_id = {};
            if (a_pkt_type == DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB) {
                if(s_stream_ch_chain_debug_more)
                    log_it(L_INFO,"In: Link %s."NODE_ADDR_FP_STR" synced GDB. Going to update chains", l_net->pub.name, NODE_ADDR_FP_ARGS_S(l_node_client->remote_node_addr ));
                // TODO check if target net state == NET_STATE_SYNC_GDB to not synchronize chains, if it
                l_node_client->cur_chain = l_net->pub.chains;
                l_node_client->cur_cell = l_node_client->cur_chain ? l_node_client->cur_chain->cells : NULL;
            } else {
                // Check if we over with it before
                if ( ! l_node_client->cur_cell ){
                    if(s_stream_ch_chain_debug_more)
                        log_it(L_INFO, "In: No current cell in sync state, anyway we over it");
                }else
                    l_node_client->cur_cell =(dap_chain_cell_t *)  l_node_client->cur_cell->hh.next;

                // If  over with cell, switch on next chain
                if ( l_node_client->cur_cell){
                    // Check if we over with it before
                    if ( !l_node_client->cur_chain ){
                        log_it(L_ERROR, "In: No chain but cell is present, over with it");
                    }
                }else{
                    // Check if we over with it before
                    if ( !l_node_client->cur_chain ){
                        log_it(L_WARNING, "In: No current chain in sync state, anyway we over it");
                    }else{
                        l_node_client->cur_chain = (dap_chain_t *) l_node_client->cur_chain->next;
                        l_node_client->cur_cell = l_node_client->cur_chain ? l_node_client->cur_chain->cells : NULL;
                    }
                }
            }

            if (l_node_client->cur_cell)
                l_cell_id = l_node_client->cur_cell->id;
            // Check if we have some more chains and cells in it to sync
            if( l_node_client->cur_chain ){
                l_chain_id=l_node_client->cur_chain->id;
                if (s_stream_ch_chain_debug_more) {
                    dap_chain_node_addr_t * l_node_addr = dap_chain_net_get_cur_addr(l_net);
                    log_it(L_INFO,"In: Link %s."NODE_ADDR_FP_STR" started to sync %s chain",l_net->pub.name,
                           NODE_ADDR_FP_ARGS(l_node_addr), l_node_client->cur_chain->name );
                }
                dap_stream_ch_chain_pkt_write_unsafe(a_ch_chain->ch,DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_REQ,
                                                     l_net->pub.id.uint64 ,
                                                     l_chain_id.uint64,l_cell_id.uint64,NULL,0);
            }else{ // If no - over with sync process
                dap_chain_node_addr_t * l_node_addr = dap_chain_net_get_cur_addr(l_net);
                log_it(L_INFO, "In: State node %s."NODE_ADDR_FP_STR" is SYNCED",l_net->pub.name, NODE_ADDR_FP_ARGS(l_node_addr) );
                l_node_client->state = NODE_CLIENT_STATE_SYNCED;
                dap_chain_net_set_state(l_net, NET_STATE_ONLINE);
#ifndef _WIN32
                pthread_cond_broadcast(&l_node_client->wait_cond);
#else
                SetEvent( l_node_client->wait_cond );
#endif
            }
        } break;
        default: break;
    }
}



/**
 * @brief s_ch_chain_callback_notify_packet_in
 * @param a_ch_chain
 * @param a_pkt_type
 * @param a_pkt
 * @param a_pkt_data_size
 * @param a_arg
 */
static void s_ch_chain_callback_notify_packet_out(dap_stream_ch_chain_t* a_ch_chain, uint8_t a_pkt_type,
        dap_stream_ch_chain_pkt_t *a_pkt, size_t a_pkt_data_size,
        void * a_arg)
{
    (void) a_pkt;
    (void) a_pkt_data_size;
    (void) a_ch_chain;
    dap_chain_node_client_t * l_node_client = (dap_chain_node_client_t *) a_arg;
    assert(a_arg);
    dap_stream_ch_t * l_ch = NULL;
    if((l_ch = dap_stream_ch_find_by_uuid_unsafe(l_node_client->stream_worker, l_node_client->ch_chain_uuid)) != NULL){
        switch (a_pkt_type) {
            case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB: {
                if(s_stream_ch_chain_debug_more)
                    log_it(L_INFO,"Out: global database sent to uplink "NODE_ADDR_FP_STR, NODE_ADDR_FP_ARGS_S(l_node_client->remote_node_addr));
            } break;
            case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS: {
                if(s_stream_ch_chain_debug_more)
                    log_it(L_INFO,"Out: chain %x sent to uplink "NODE_ADDR_FP_STR,l_node_client->cur_chain ? l_node_client->cur_chain->id.uint64 : 0, NODE_ADDR_FP_ARGS_S(l_node_client->remote_node_addr));
            }break;
            default: {
            }
        }
    }
}

static int save_stat_to_database(dap_stream_ch_chain_net_srv_pkt_test_t *a_request, dap_chain_node_client_t * a_node_client)
{
    int l_ret = 0;
    if(!a_request)
        return -1;
    long l_t1_ms = (long) a_request->send_time1.tv_sec * 1000 + a_request->send_time1.tv_usec / 1000;
    long l_t2_ms = (long) a_request->recv_time1.tv_sec * 1000 + a_request->recv_time1.tv_usec / 1000;
    struct json_object *jobj = json_object_new_object();
    time_t l_cur_t = time(NULL);
    char buf[1024];
    dap_time_to_str_rfc822( buf, sizeof(buf), l_cur_t );
    json_object_object_add(jobj, "time_save", json_object_new_int64(l_cur_t));
    json_object_object_add(jobj, "time_save_str", json_object_new_string(buf));
    json_object_object_add(jobj, "time_connect", json_object_new_int(a_request->time_connect_ms));
    json_object_object_add(jobj, "time_transmit", json_object_new_int(l_t2_ms-l_t1_ms));
    json_object_object_add(jobj, "ip_send", json_object_new_string(a_request->ip_send));
    json_object_object_add(jobj, "ip_recv", json_object_new_string(a_request->ip_recv));
    json_object_object_add(jobj, "time_len_send", json_object_new_int(a_request->data_size_send));
    json_object_object_add(jobj, "time_len_recv", json_object_new_int(a_request->data_size_recv));
    json_object_object_add(jobj, "err_code", json_object_new_int(a_request->err_code));
    const char* json_str = json_object_to_json_string(jobj);
    // save statistics
    char *l_group = NULL;
    dap_chain_net_t * l_net = dap_chain_net_by_id(a_request->net_id);
    if(l_net) {
        l_group = dap_strdup_printf("%s.orders-test-stat", l_net->pub.gdb_groups_prefix);
    }
    if(l_group) {
        dap_store_obj_t *l_obj = dap_chain_global_db_get_last(l_group);
        int64_t l_key = 0;
        if(l_obj) {
            l_key = strtoll(l_obj->key, NULL, 16);
        }
        char *l_key_str = dap_strdup_printf("%06x", ++l_key);
        if(!dap_chain_global_db_gr_set(dap_strdup(l_key_str), (uint8_t *) json_str, strlen(json_str) + 1, l_group)) {
            l_ret = -1;
        }
        DAP_DELETE(l_key_str);
        DAP_DELETE(l_group);
    }
    else
        l_ret = -2;
    json_object_put(jobj);
    return l_ret;
}
/**
 * @brief s_ch_chain_callback_notify_packet_R - Callback for channel 'R'
 * @param a_ch_chain
 * @param a_pkt_type
 * @param a_pkt
 * @param a_arg
 */
static void s_ch_chain_callback_notify_packet_R(dap_stream_ch_chain_net_srv_t* a_ch_chain, uint8_t a_pkt_type, dap_stream_ch_pkt_t *a_pkt, void * a_arg)
{
    dap_chain_node_client_t * l_node_client = (dap_chain_node_client_t *) a_arg;
    switch (a_pkt_type) {
    // get new generated current node address
    case DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_CHECK_RESPONSE: {
            dap_stream_ch_chain_net_srv_pkt_test_t *l_request = (dap_stream_ch_chain_net_srv_pkt_test_t *) a_pkt->data;
            size_t l_request_size = l_request->data_size + sizeof(dap_stream_ch_chain_net_srv_pkt_test_t);
            if(a_pkt->hdr.size != l_request_size) {
                log_it(L_WARNING, "Wrong request size, less or more than required");
                break;
            }
            // todo to write result to database
            save_stat_to_database(l_request, l_node_client);
            //...
            l_node_client->state = NODE_CLIENT_STATE_CHECKED;
#ifndef _WIN32
            pthread_cond_broadcast(&l_node_client->wait_cond);
#else
            SetEvent( l_node_client->wait_cond );
#endif
            break;
        }
    }
}

/**
 * Create connection to server
 *
 * return a connection handle, or NULL, if an error
 */

dap_chain_node_client_t* dap_chain_node_client_connect_channels(dap_chain_net_t * l_net, dap_chain_node_info_t *a_node_info, const char *a_active_channels)
{
    return dap_chain_net_client_create_n_connect_channels(l_net,a_node_info,a_active_channels);
}

/**
 * @brief dap_chain_node_client_create_n_connect
 * @param a_net
 * @param a_node_info
 * @param a_active_channels
 * @param a_callbacks
 * @param a_callback_arg
 * @return
 */
dap_chain_node_client_t* dap_chain_node_client_create_n_connect(dap_chain_net_t * a_net, dap_chain_node_info_t *a_node_info,
        const char *a_active_channels,dap_chain_node_client_callbacks_t *a_callbacks, void * a_callback_arg )
{
    if(!a_node_info) {
        log_it(L_ERROR, "Can't connect to the node: null object node_info");
        return NULL;
    }
    dap_chain_node_client_t *l_node_client = DAP_NEW_Z(dap_chain_node_client_t);
    l_node_client->state = NODE_CLIENT_STATE_DISCONNECTED;
    l_node_client->callbacks_arg = a_callback_arg;
    if(a_callbacks)
        memcpy(&l_node_client->callbacks,a_callbacks,sizeof (*a_callbacks));
    l_node_client->info = a_node_info;
    l_node_client->net = a_net;
    l_node_client->uuid = dap_uuid_generate_uint128();
    dap_chain_node_client_handle_t * l_client_handle = DAP_NEW_Z(dap_chain_node_client_handle_t);
    l_client_handle->uuid = l_node_client->uuid;
    l_client_handle->client = l_node_client;
    HASH_ADD(hh, s_clients, uuid, sizeof(l_client_handle->uuid), l_client_handle);

#ifndef _WIN32
    pthread_condattr_t attr;
    pthread_condattr_init(&attr);
#ifndef DAP_OS_DARWIN
    pthread_condattr_setclock(&attr, CLOCK_MONOTONIC);
#endif
    pthread_cond_init(&l_node_client->wait_cond, &attr);
#else
    l_node_client->wait_cond = CreateEventA( NULL, FALSE, FALSE, NULL );
#endif

    pthread_mutex_init(&l_node_client->wait_mutex, NULL);
    l_node_client->events = NULL; //dap_events_new();
    l_node_client->remote_node_addr.uint64 = a_node_info->hdr.address.uint64;
    if (dap_chain_node_client_connect_internal(l_node_client, a_active_channels))
        return l_node_client;
    return NULL;
}

// Create new dap_client, setup it, and send it in adventure trip
static bool dap_chain_node_client_connect_internal(dap_chain_node_client_t *a_node_client, const char *a_active_channels)
{
    a_node_client->client = dap_client_new(a_node_client->events, s_stage_status_callback,
            s_stage_status_error_callback);
    a_node_client->keep_connection = true;
    dap_client_set_is_always_reconnect(a_node_client->client, true);
    a_node_client->client->_inheritor = a_node_client;
    dap_client_set_active_channels_unsafe(a_node_client->client, a_active_channels);

    //dap_client_set_auth_cert(a_node_client->client, dap_cert_find_by_name("auth")); // TODO provide the certificate choice

    int hostlen = 128;
    char host[hostlen];
    if(a_node_client->info->hdr.ext_addr_v4.s_addr){
        struct sockaddr_in sa4 = { .sin_family = AF_INET, .sin_addr = a_node_client->info->hdr.ext_addr_v4 };
        inet_ntop(AF_INET, &(((struct sockaddr_in *) &sa4)->sin_addr), host, hostlen);
        log_it(L_INFO, "Connecting to %s address",host);
    } else {
        struct sockaddr_in6 sa6 = { .sin6_family = AF_INET6, .sin6_addr = a_node_client->info->hdr.ext_addr_v6 };
        inet_ntop(AF_INET6, &(((struct sockaddr_in6 *) &sa6)->sin6_addr), host, hostlen);
        log_it(L_INFO, "Connecting to %s address",host);
    }
    // address not defined
    if(!strcmp(host, "::")) {
        dap_chain_node_client_close(a_node_client);
        return false;
    }
    dap_client_set_uplink_unsafe(a_node_client->client, strdup(host), a_node_client->info->hdr.ext_port);
//    dap_client_stage_t a_stage_target = STAGE_ENC_INIT;
//    dap_client_stage_t l_stage_target = STAGE_STREAM_STREAMING;

    a_node_client->state = NODE_CLIENT_STATE_CONNECTING ;
    // ref pvt client
    //dap_client_pvt_ref(DAP_CLIENT_PVT(a_node_client->client));
    // Handshake & connect
    dap_client_go_stage(a_node_client->client, STAGE_STREAM_STREAMING, s_stage_connected_callback);
    return true;
}

/**
 * Create connection to server
 *
 * return a connection handle, or NULL, if an error
 */
dap_chain_node_client_t* dap_chain_node_client_connect(dap_chain_net_t * a_net,dap_chain_node_info_t *a_node_info)
{
    const char *l_active_channels = "CN";
    return dap_chain_node_client_connect_channels(a_net,a_node_info, l_active_channels);
}


void dap_chain_node_client_reset(dap_chain_node_client_t *a_client)
{
    if (a_client->state > NODE_CLIENT_STATE_ESTABLISHED) {
        a_client->state = NODE_CLIENT_STATE_ESTABLISHED;
    }
}

/**
 * Close connection to server, delete chain_node_client_t *client
 */
void dap_chain_node_client_close(dap_chain_node_client_t *a_client)
{
    if (a_client && a_client->client) { // block tryes to close twice
        char l_node_addr_str[INET_ADDRSTRLEN] = {};
        inet_ntop(AF_INET, &a_client->info->hdr.ext_addr_v4, l_node_addr_str, INET_ADDRSTRLEN);
        log_it(L_INFO, "Closing node client to uplink %s:%d", l_node_addr_str, a_client->info->hdr.ext_port);
        // clean client
        dap_client_delete_mt(a_client->client);
#ifndef _WIN32
        pthread_cond_destroy(&a_client->wait_cond);
#else
        CloseHandle( a_client->wait_cond );
#endif
        pthread_mutex_destroy(&a_client->wait_mutex);
        a_client->client->_inheritor = NULL;
        a_client->client = NULL;
        dap_chain_node_client_handle_t * l_client_found = NULL;
        HASH_FIND(hh,s_clients,&a_client->uuid,sizeof(a_client->uuid),l_client_found);
        if (l_client_found){
            HASH_DEL(s_clients,l_client_found);
            DAP_DELETE(l_client_found);
        }else{
            log_it(L_WARNING, "Chain node client was removed from hash table before for some reasons");
        }

        DAP_DELETE(a_client);
    }
}

/**
 * Send stream request to server
 */
int dap_chain_node_client_send_ch_pkt(dap_chain_node_client_t *a_client, uint8_t a_ch_id, uint8_t a_type,
        const void *a_pkt_data, size_t a_pkt_data_size)
{
    if(!a_client || a_client->state < NODE_CLIENT_STATE_ESTABLISHED)
        return -1;

    dap_stream_worker_t *l_stream_worker = dap_client_get_stream_worker(a_client->client);
    dap_stream_ch_pkt_write_mt(l_stream_worker , a_client->ch_chain_uuid , a_type, a_pkt_data, a_pkt_data_size);
    return 0;
}

/**
 * wait for the complete of request
 *
 * timeout_ms timeout in milliseconds
 * waited_state state which we will wait, sample NODE_CLIENT_STATE_CONNECT or NODE_CLIENT_STATE_SENDED
 * return -2 false, -1 timeout, 0 end of connection or sending data
 */
int dap_chain_node_client_wait(dap_chain_node_client_t *a_client, int a_waited_state, int a_timeout_ms)
{
    int ret = -1;
    if(!a_client){
        log_it(L_ERROR, "Can't wait for status for (null) object");
        return -3;
    }
    a_client->keep_connection = false;
    pthread_mutex_lock(&a_client->wait_mutex);
    // have waited
    if(a_client->state == a_waited_state) {
        log_it(L_INFO, "We're already in state %s",dap_chain_node_client_state_to_str(a_client->state));
        pthread_mutex_unlock(&a_client->wait_mutex);
        return 0;
    }

    if (a_client->state < NODE_CLIENT_STATE_ESTABLISHED && a_waited_state > NODE_CLIENT_STATE_ESTABLISHED) {
        log_it(L_WARNING, "Waited state can't be achieved");
        pthread_mutex_unlock(&a_client->wait_mutex);
        return -2;
    }

#ifndef DAP_OS_WINDOWS
    // prepare for signal waiting
    struct timespec l_cond_timeout;
    clock_gettime( CLOCK_MONOTONIC, &l_cond_timeout);
    l_cond_timeout.tv_sec += a_timeout_ms/1000;
#else
    pthread_mutex_unlock( &a_client->wait_mutex );
#endif

    // signal waiting

#ifndef DAP_OS_WINDOWS
    do {
        int l_ret_wait = pthread_cond_timedwait(&a_client->wait_cond, &a_client->wait_mutex, &l_cond_timeout);
        if(l_ret_wait == 0 && (
                a_client->state == a_waited_state ||
                        (a_client->state == NODE_CLIENT_STATE_ERROR || a_client->state == NODE_CLIENT_STATE_DISCONNECTED))
                ) {
            ret = a_client->state == a_waited_state ? 0 : -2;
            break;
        }
        else if(l_ret_wait == ETIMEDOUT) { // 110 260
            //log_it(L_NOTICE,"Wait for status is stopped by timeout");
            ret = -1;
            break;
        }else if (l_ret_wait != 0 ){
            char l_errbuf[128];
            l_errbuf[0] = '\0';
            strerror_r(l_ret_wait,l_errbuf,sizeof (l_errbuf));
            log_it(L_ERROR, "Pthread condition timed wait returned \"%s\"(code %d)", l_errbuf, l_ret_wait);
        }
    } while(1);
#else
    DWORD wait = WaitForSingleObject( a_client->wait_cond, (uint32_t)a_timeout_ms);
    if ( wait == WAIT_OBJECT_0 && (
             a_client->state == a_waited_state ||
             a_client->state == NODE_CLIENT_STATE_ERROR ||
             a_client->state == NODE_CLIENT_STATE_DISCONNECTED))
    {
        return a_client->state == a_waited_state ? 0 : -2;
    } else if ( wait == WAIT_TIMEOUT || wait == WAIT_FAILED ) {
        return -1;
    }
#endif

#ifndef DAP_OS_WINDOWS
    pthread_mutex_unlock(&a_client->wait_mutex);
#endif
    return ret;
}

int dap_chain_node_client_set_callbacks(dap_client_t *a_client, uint8_t a_ch_id)
{
    int l_ret = -1;
    dap_chain_node_client_t *l_node_client = a_client->_inheritor;
    if(l_node_client) {
        pthread_mutex_lock(&l_node_client->wait_mutex);
        // find current channel code
        dap_client_pvt_t * l_client_internal = DAP_CLIENT_PVT(a_client);
        dap_stream_ch_t * l_ch = NULL;
        if(l_client_internal)
            l_ch = dap_client_get_stream_ch_unsafe(a_client, a_ch_id);
        if(l_ch) {
            // C
            if(a_ch_id == dap_stream_ch_chain_get_id()) {
                dap_stream_ch_chain_t * l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch);
                l_ch_chain->callback_notify_packet_out = s_ch_chain_callback_notify_packet_out;
                l_ch_chain->callback_notify_packet_in = s_ch_chain_callback_notify_packet_in;
                l_ch_chain->callback_notify_arg = l_node_client;
            }
            // N
            if(a_ch_id == dap_stream_ch_chain_net_get_id()) {
                dap_stream_ch_chain_net_t *l_ch_chain = DAP_STREAM_CH_CHAIN_NET(l_ch);
                l_ch_chain->notify_callback = s_ch_chain_callback_notify_packet_in2;
                l_ch_chain->notify_callback_arg = l_node_client;
            }
            // R
            if(a_ch_id == dap_stream_ch_chain_net_srv_get_id()) {
                dap_stream_ch_chain_net_srv_t * l_ch_chain = DAP_STREAM_CH_CHAIN_NET_SRV(l_ch);
                l_ch_chain->notify_callback = s_ch_chain_callback_notify_packet_R;
                l_ch_chain->notify_callback_arg = l_node_client;
            }
            l_ret = 0;
        } else {
        }
        pthread_mutex_unlock(&l_node_client->wait_mutex);
    }
    return l_ret;
}

/*static void nodelist_response_callback(dap_client_t *a_client, void *data, size_t data_len)
{
}

static void nodelist_response_error_callback(dap_client_t *a_client, int a_err)
{
}*/

/**
 * Send nodelist request to server
 */
int dap_chain_node_client_send_nodelist_req(dap_chain_node_client_t *a_client)
{
    if(!a_client || !a_client->client || a_client->state < NODE_CLIENT_STATE_ESTABLISHED)
        return -1;
    //dap_client_pvt_t * l_client_pvt = DAP_CLIENT_PVT(a_client->client);

    //TODO send request to get nodelist
    //dap_client_request_enc(a_client->client, DAP_UPLINK_PATH_NODE_LIST, "", "", "", 0,
    //        nodelist_response_callback, nodelist_response_error_callback);
    return 1;
}