Skip to content
Snippets Groups Projects
dap_chain_node_client.c 30.73 KiB
/*
 * Authors:
 * Dmitriy A. Gearasimov <naeper@demlabs.net>
 * Alexander Lysikov <alexander.lysikov@demlabs.net>
 * DeM Labs Inc.   https://demlabs.net

 This file is part of DAP (Deus Applications Prototypes) the open source project

 DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify
 it under the terms of the GNU General Public License as published by
 the Free Software Foundation, either version 3 of the License, or
 (at your option) any later version.

 DAP is distributed in the hope that it will be useful,
 but WITHOUT ANY WARRANTY; without even the implied warranty of
 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 GNU General Public License for more details.

 You should have received a copy of the GNU General Public License
 along with any DAP based project.  If not, see <http://www.gnu.org/licenses/>.
 */

#include <stdlib.h>
#include <stdio.h>
#include <time.h>
#include <stdlib.h>
#include <stddef.h>
#include <stdint.h>
#include <errno.h>
#include <assert.h>
#include <string.h>
#include <json-c/json.h>

#ifdef WIN32
#include <winsock2.h>
#include <windows.h>
#include <mswsock.h>
#include <ws2tcpip.h>
#include <io.h>
#include <pthread.h>
#else
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#endif

#include "dap_common.h"
#include "dap_client.h"
#include "dap_config.h"
#include "dap_events.h"
#include "dap_hash.h"
//#include "dap_http_client_simple.h"
#include "dap_client_pvt.h"
#include "dap_chain_global_db_remote.h"
#include "dap_chain_global_db_hist.h"

#include "dap_chain.h"
#include "dap_chain_cell.h"

#include "dap_chain_net_srv_common.h"
#include "dap_stream_worker.h"
#include "dap_stream_ch_pkt.h"
#include "dap_stream_ch_chain.h"
#include "dap_stream_ch_chain_pkt.h"
#include "dap_stream_ch_chain_net.h"
#include "dap_stream_ch_chain_net_pkt.h"
#include "dap_stream_ch_chain_net_srv.h"
#include "dap_stream_pkt.h"

//#include "dap_chain_common.h"
#include "dap_chain_node_client.h"

#define LOG_TAG "dap_chain_node_client"


//static int listen_port_tcp = 8079;

static void s_stage_connected_callback(dap_client_t *a_client, void *a_arg);
static void s_ch_chain_callback_notify_packet_out(dap_stream_ch_chain_t*, uint8_t a_pkt_type,
        dap_stream_ch_chain_pkt_t *a_pkt, size_t a_pkt_data_size,
        void * a_arg);
static void s_ch_chain_callback_notify_packet_in(dap_stream_ch_chain_t* a_ch_chain, uint8_t a_pkt_type,
        dap_stream_ch_chain_pkt_t *a_pkt, size_t a_pkt_data_size,
        void * a_arg);

bool s_stream_ch_chain_debug_more = false;
/**
 * @brief dap_chain_node_client_init
 * @return
 */
int dap_chain_node_client_init(void)
{
    s_stream_ch_chain_debug_more = dap_config_get_item_bool_default(g_config,"stream_ch_chain","debug_more",false);

    return 0;
}

/**
 * @brief dap_chain_node_client_deinit
 */
void dap_chain_node_client_deinit()
{
    //dap_http_client_simple_deinit();
    dap_client_deinit();
}

/**
 * @brief stage_status_callback
 * @param a_client
 * @param a_arg
 */
static void s_stage_status_callback(dap_client_t *a_client, void *a_arg)
{
    (void) a_client;
    (void) a_arg;

    //printf("* stage_status_callback client=%x data=%x\n", a_client, a_arg);
}

/**
 * @brief s_stage_status_error_callback
 * @param a_client
 * @param a_arg
 */
static void s_stage_status_error_callback(dap_client_t *a_client, void *a_arg)
{
    dap_chain_node_client_t *l_node_client = DAP_CHAIN_NODE_CLIENT(a_client);
    if(!l_node_client)
        return;
    // check for last attempt
    bool l_is_last_attempt = a_arg ? true : false;
    if(l_is_last_attempt){
        pthread_mutex_lock(&l_node_client->wait_mutex);
        l_node_client->state = NODE_CLIENT_STATE_DISCONNECTED;
#ifndef _WIN32
        pthread_cond_broadcast(&l_node_client->wait_cond);
#else
        SetEvent( l_node_client->wait_cond );
#endif
        pthread_mutex_unlock(&l_node_client->wait_mutex);
        return;
    }

    if(l_node_client && l_node_client->keep_connection &&
            ((dap_client_get_stage(a_client) != STAGE_STREAM_STREAMING) ||
                    (dap_client_get_stage_status(a_client) == STAGE_STATUS_ERROR))) {
        log_it(L_NOTICE,"Some errors happends, current state is %s but we need to return back to STAGE_STREAM_STREAMING",
                dap_client_get_stage_str(a_client) );

        // TODO make different error codes
        if(l_node_client->callbacks.error)
            l_node_client->callbacks.error(l_node_client, EINVAL,l_node_client->callbacks_arg );

        if (s_stream_ch_chain_debug_more)
            log_it(L_DEBUG, "Wakeup all who waits");
        pthread_mutex_lock(&l_node_client->wait_mutex);
        l_node_client->state = NODE_CLIENT_STATE_ERROR;

#ifndef _WIN32
        pthread_cond_broadcast(&l_node_client->wait_cond);
#else
        SetEvent( l_node_client->wait_cond );
#endif
        pthread_mutex_unlock(&l_node_client->wait_mutex);
        //dap_client_go_stage( a_client , STAGE_STREAM_STREAMING, s_stage_end_callback );
    }

    //printf("* tage_status_error_callback client=%x data=%x\n", a_client, a_arg);
}

/**
 * @brief a_stage_end_callback
 * @param a_client
 * @param a_arg
 */
static void s_stage_connected_callback(dap_client_t *a_client, void *a_arg)
{
    dap_chain_node_client_t *l_node_client = a_client->_inheritor;
    //assert(l_node_client);
    if(l_node_client) {
        log_it(L_NOTICE, "Stream connection with node " NODE_ADDR_FP_STR " established",
                NODE_ADDR_FP_ARGS_S( l_node_client->remote_node_addr));
        // set callbacks for C and N channels; for R and S it is not needed
        dap_client_pvt_t * l_client_internal = DAP_CLIENT_PVT(a_client);
        if(l_client_internal && l_client_internal->active_channels) {
            size_t l_channels_count = dap_strlen(l_client_internal->active_channels);
            for(size_t i = 0; i < l_channels_count; i++) {
                if(dap_chain_node_client_set_callbacks(a_client, l_client_internal->active_channels[i]) == -1) {
                    log_it(L_WARNING, "No ch_chain channel, can't init notify callback for pkt type CH_CHAIN");
                }
            }
        }
        if(l_node_client->callbacks.connected)
            l_node_client->callbacks.connected(l_node_client, l_node_client->callbacks_arg);
        l_node_client->keep_connection = true;
        if(s_stream_ch_chain_debug_more)
            log_it(L_DEBUG, "Wakeup all who waits");
        l_node_client->state = NODE_CLIENT_STATE_ESTABLISHED;
#ifndef _WIN32
        pthread_cond_broadcast(&l_node_client->wait_cond);
#else
        SetEvent( l_node_client->wait_cond );
#endif
    }
}

/**
 * @brief s_ch_chain_callback_notify_packet_in2 - for dap_stream_ch_chain_net
 * @param a_ch_chain_net
 * @param a_pkt_type
 * @param a_pkt_net
 * @param a_pkt_data_size
 * @param a_arg
 */
static void s_ch_chain_callback_notify_packet_in2(dap_stream_ch_chain_net_t* a_ch_chain_net, uint8_t a_pkt_type,
        dap_stream_ch_chain_net_pkt_t *a_pkt_net, size_t a_pkt_net_data_size, void * a_arg)
{
    dap_chain_node_client_t * l_node_client = (dap_chain_node_client_t *) a_arg;
    switch (a_pkt_type) {
    // get new generated current node address
    case DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_NODE_ADDR_LEASE: {
        if(a_pkt_net_data_size == sizeof(dap_chain_node_addr_t)) {
            dap_chain_node_addr_t *l_addr = (dap_chain_node_addr_t *) a_pkt_net->data;
            memcpy(&l_node_client->cur_node_addr, l_addr, sizeof(dap_chain_node_addr_t));
        }
        l_node_client->state = NODE_CLIENT_STATE_NODE_ADDR_LEASED;
#ifndef _WIN32
        pthread_cond_broadcast(&l_node_client->wait_cond);
#else
        SetEvent( l_node_client->wait_cond );
#endif
        break;
    }
    // get remote node address
    case DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_NODE_ADDR: {

        if(a_pkt_net_data_size == sizeof(dap_chain_node_addr_t)) {
            dap_chain_node_addr_t *l_addr = (dap_chain_node_addr_t *) a_pkt_net->data;
            memcpy(&l_node_client->remote_node_addr, l_addr, sizeof(dap_chain_node_addr_t));
        }
        l_node_client->state = NODE_CLIENT_STATE_GET_NODE_ADDR;
#ifndef _WIN32
        pthread_cond_broadcast(&l_node_client->wait_cond);
#else
            SetEvent( l_node_client->wait_cond );
#endif
            break;
    }
    }
}


/**
 * @brief s_ch_chain_callback_notify_packet_in - for dap_stream_ch_chain
 * @param a_ch_chain
 * @param a_pkt_type
 * @param a_pkt
 * @param a_pkt_data_size
 * @param a_arg
 */
static void s_ch_chain_callback_notify_packet_in(dap_stream_ch_chain_t* a_ch_chain, uint8_t a_pkt_type,
        dap_stream_ch_chain_pkt_t *a_pkt, size_t a_pkt_data_size,
        void * a_arg)
{
    dap_chain_node_client_t * l_node_client = (dap_chain_node_client_t *) a_arg;

    switch (a_pkt_type) {
        case DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_ERROR:
            dap_snprintf(l_node_client->last_error, sizeof(l_node_client->last_error),
                    "%s", (char*) a_pkt->data);
            log_it(L_WARNING, "In: Received packet DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_ERROR with error \"%s\"",
                    l_node_client->last_error);
            l_node_client->state = NODE_CLIENT_STATE_ERROR;

    #ifndef _WIN32
            pthread_cond_broadcast(&l_node_client->wait_cond);
    #else
            SetEvent( l_node_client->wait_cond );
    #endif
        break;
        case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_END:{
            dap_chain_net_t * l_net = l_node_client->net;
            assert(l_net);
            if (s_stream_ch_chain_debug_more)
                log_it(L_INFO,"In: Link %s."NODE_ADDR_FP_STR" UPDATE_CHAINS_END: %zd hashes on remote",
                       l_net->pub.name,
                       NODE_ADDR_FP_ARGS_S(l_node_client->remote_node_addr )
                       );
        }break;
        case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB:{
            dap_chain_net_t * l_net = l_node_client->net;
            assert(l_net);
            if(s_stream_ch_chain_debug_more)
                log_it(L_INFO,"In: Link %s."NODE_ADDR_FP_STR" synced GDB", l_net->pub.name, NODE_ADDR_FP_ARGS_S(l_node_client->remote_node_addr ));

            // We over with GLOBAL_DB and switch on syncing chains
            l_node_client->state = NODE_CLIENT_STATE_SYNC_CHAINS_UPDATES;
            dap_chain_net_state_t l_net_state = dap_chain_net_get_state(l_node_client->net);
            if (l_net_state == NET_STATE_SYNC_GDB  )
                dap_chain_net_set_state(l_node_client->net, NET_STATE_SYNC_CHAINS );

            // Begin from the first chain
            l_node_client->cur_chain =  l_node_client->net->pub.chains;
            dap_chain_cell_id_t l_cell_id={0};
            dap_chain_id_t l_chain_id={0};
            if(! l_node_client->cur_chain){
                log_it(L_CRITICAL,"In: Can't sync chains for %s because there is no chains in it",l_net->pub.name);
                dap_stream_ch_chain_pkt_write_error_unsafe(a_ch_chain->ch,l_net->pub.id.uint64,
                                                           l_chain_id.uint64,l_cell_id.uint64,"ERROR_CHAIN_NO_CHAINS");
            }else{ // If present - select the first one cell in chain
                l_chain_id=l_node_client->cur_chain->id;
                dap_chain_cell_t * l_cell = l_node_client->cur_chain->cells;
                if (l_cell){
                    l_cell_id=l_cell->id;
                }
                uint64_t l_net_id = l_net->pub.id.uint64;

                dap_stream_ch_chain_pkt_t * l_chain_pkt;
                size_t l_chain_pkt_size = sizeof (l_chain_pkt->hdr);
                l_chain_pkt = DAP_NEW_Z_SIZE(dap_stream_ch_chain_pkt_t, l_chain_pkt_size );
                l_chain_pkt->hdr.version = 1;
                l_chain_pkt->hdr.net_id.uint64 = l_net_id;
                l_chain_pkt->hdr.cell_id.uint64 = l_cell_id.uint64;
                l_chain_pkt->hdr.chain_id.uint64 = l_chain_id.uint64;
                dap_stream_ch_pkt_write_unsafe(a_ch_chain->ch,DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_START ,
                                                     l_chain_pkt,l_chain_pkt_size);
                DAP_DELETE(l_chain_pkt);
                log_it(L_INFO,
                       "In: Send UPDATE_CHAINS_START: net_id=0x%016x chain_id=0x%016x cell_id=0x%016x  ",
                       l_net_id,l_chain_id.uint64,l_cell_id.uint64
                       );
            }

        }break;
        case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS: {
            dap_stream_ch_chain_sync_request_t * l_request = NULL;
            if(a_pkt_data_size == sizeof(*l_request))
                l_request = (dap_stream_ch_chain_sync_request_t*) a_pkt->data;

            if(l_request) {
                // Process it if need
            }

            // Check if we over with it before
            if ( !l_node_client->cur_cell ){
                log_it(L_WARNING, "In: No current cell in sync state, anyway we over it");
            }else
                l_node_client->cur_cell =(dap_chain_cell_t *)  l_node_client->cur_cell->hh.next;

            dap_chain_net_t * l_net = l_node_client->net;
            assert(l_net);
            dap_chain_node_addr_t * l_node_addr = dap_chain_net_get_cur_addr(l_net);

            // If  over with cell, switch on next chain
            if ( l_node_client->cur_cell){
                // Check if we over with it before
                if ( !l_node_client->cur_chain ){
                    log_it(L_ERROR, "In: No chain but cell is present, over wit it");
                }else{
                    dap_chain_id_t  l_chain_id=l_node_client->cur_chain->id;
                    dap_chain_cell_id_t l_cell_id = l_node_client->cur_cell->id;
                    l_node_client->state = NODE_CLIENT_STATE_SYNC_CHAINS_UPDATES;
                    dap_stream_ch_chain_pkt_write_unsafe(a_ch_chain->ch,DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_START,
                                                         l_net->pub.id.uint64 ,
                                                         l_chain_id.uint64,l_cell_id.uint64,NULL,0);
                }
            }else{
                // Check if we over with it before
                if ( !l_node_client->cur_chain ){
                    log_it(L_WARNING, "In: No current chain in sync state, anyway we over it");
                }else{
                    l_node_client->cur_chain = (dap_chain_t *) l_node_client->cur_chain->next;
                    l_node_client->cur_cell = l_node_client->cur_chain? l_node_client->cur_chain->cells : NULL;
                }
                dap_chain_id_t  l_chain_id={0};
                dap_chain_cell_id_t l_cell_id = {0};

                if (l_node_client->cur_cell)
                    l_cell_id = l_node_client->cur_cell->id;

                // Check if we have some more chains and cells in it to sync
                if( l_node_client->cur_chain ){
                    l_chain_id=l_node_client->cur_chain->id;

                    l_node_client->state = NODE_CLIENT_STATE_SYNC_CHAINS_UPDATES;
                    if (s_stream_ch_chain_debug_more)
                        log_it(L_INFO,"In: Link %s."NODE_ADDR_FP_STR" started to sync %s chain",l_net->pub.name,
                               NODE_ADDR_FP_ARGS(l_node_addr), l_node_client->cur_chain->name );

                    dap_stream_ch_chain_pkt_write_unsafe(a_ch_chain->ch,DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_START,
                                                         l_net->pub.id.uint64 ,
                                                         l_chain_id.uint64,l_cell_id.uint64,NULL,0);
                }else{ // If no - over with sync process
                    log_it(L_INFO, "In: State node %s."NODE_ADDR_FP_STR" is SYNCED",l_net->pub.name, NODE_ADDR_FP_ARGS(l_node_addr) );
                    l_node_client->state = NODE_CLIENT_STATE_SYNCED;
                    if (dap_chain_net_get_state(l_net) == NET_STATE_SYNC_CHAINS )
                        dap_chain_net_set_state(l_net, NET_STATE_ONLINE);
            #ifndef _WIN32
                    pthread_cond_broadcast(&l_node_client->wait_cond);
            #else
                    SetEvent( l_node_client->wait_cond );
            #endif

                }
            }

        } break;
        case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_ALL:{
            dap_chain_net_t * l_net = l_node_client->net;
            assert(l_net);
            dap_chain_net_set_state(l_net, NET_STATE_ONLINE);
        }break;
        default: break;
    }
}



/**
 * @brief s_ch_chain_callback_notify_packet_in
 * @param a_ch_chain
 * @param a_pkt_type
 * @param a_pkt
 * @param a_pkt_data_size
 * @param a_arg
 */
static void s_ch_chain_callback_notify_packet_out(dap_stream_ch_chain_t* a_ch_chain, uint8_t a_pkt_type,
        dap_stream_ch_chain_pkt_t *a_pkt, size_t a_pkt_data_size,
        void * a_arg)
{
    (void) a_pkt;
    (void) a_pkt_data_size;
    (void) a_ch_chain;
    dap_chain_node_client_t * l_node_client = (dap_chain_node_client_t *) a_arg;
    switch (a_pkt_type) {
    case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_ALL:
        case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB:
        case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS: {
        l_node_client->state = NODE_CLIENT_STATE_SYNCED;
#ifndef _WIN32
        pthread_cond_broadcast(&l_node_client->wait_cond);
#else
        SetEvent( l_node_client->wait_cond );
#endif
    }
        break;
    default: {
    }
    }
}

static int save_stat_to_database(dap_stream_ch_chain_net_srv_pkt_test_t *a_request, dap_chain_node_client_t * a_node_client)
{
    int l_ret = 0;
    if(!a_request)
        return -1;
    long l_t1_ms = (long) a_request->send_time1.tv_sec * 1000 + a_request->send_time1.tv_usec / 1000;
    long l_t2_ms = (long) a_request->recv_time1.tv_sec * 1000 + a_request->recv_time1.tv_usec / 1000;
    struct json_object *jobj = json_object_new_object();
    time_t l_cur_t = time(NULL);
    char buf[1024];
    dap_time_to_str_rfc822( buf, sizeof(buf), l_cur_t );
    json_object_object_add(jobj, "time_save", json_object_new_int64(l_cur_t));
    json_object_object_add(jobj, "time_save_str", json_object_new_string(buf));
    json_object_object_add(jobj, "time_connect", json_object_new_int(a_request->time_connect_ms));
    json_object_object_add(jobj, "time_transmit", json_object_new_int(l_t2_ms-l_t1_ms));
    json_object_object_add(jobj, "ip_send", json_object_new_string(a_request->ip_send));
    json_object_object_add(jobj, "ip_recv", json_object_new_string(a_request->ip_recv));
    json_object_object_add(jobj, "time_len_send", json_object_new_int(a_request->data_size_send));
    json_object_object_add(jobj, "time_len_recv", json_object_new_int(a_request->data_size_recv));
    json_object_object_add(jobj, "err_code", json_object_new_int(a_request->err_code));
    const char* json_str = json_object_to_json_string(jobj);
    // save statistics
    char *l_group = NULL;
    dap_chain_net_t * l_net = dap_chain_net_by_id(a_request->net_id);
    if(l_net) {
        l_group = dap_strdup_printf("%s.orders-test-stat", l_net->pub.gdb_groups_prefix);
    }
    if(l_group) {
        dap_store_obj_t *l_obj = dap_chain_global_db_get_last(l_group);
        int64_t l_key = 0;
        if(l_obj) {
            l_key = strtoll(l_obj->key, NULL, 16);
        }
        char *l_key_str = dap_strdup_printf("%06x", ++l_key);
        if(!dap_chain_global_db_gr_set(dap_strdup(l_key_str), (uint8_t *) json_str, strlen(json_str) + 1, l_group)) {
            l_ret = -1;
        }
        DAP_DELETE(l_key_str);
        DAP_DELETE(l_group);
    }
    else
        l_ret = -2;
    json_object_put(jobj);
    return l_ret;
}
/**
 * @brief s_ch_chain_callback_notify_packet_R - Callback for channel 'R'
 * @param a_ch_chain
 * @param a_pkt_type
 * @param a_pkt
 * @param a_arg
 */
static void s_ch_chain_callback_notify_packet_R(dap_stream_ch_chain_net_srv_t* a_ch_chain, uint8_t a_pkt_type, dap_stream_ch_pkt_t *a_pkt, void * a_arg)
{
    dap_chain_node_client_t * l_node_client = (dap_chain_node_client_t *) a_arg;
    switch (a_pkt_type) {
    // get new generated current node address
    case DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_CHECK_RESPONSE: {
            dap_stream_ch_chain_net_srv_pkt_test_t *l_request = (dap_stream_ch_chain_net_srv_pkt_test_t *) a_pkt->data;
            size_t l_request_size = l_request->data_size + sizeof(dap_stream_ch_chain_net_srv_pkt_test_t);
            if(a_pkt->hdr.size != l_request_size) {
                log_it(L_WARNING, "Wrong request size, less or more than required");
                break;
            }
            // todo to write result to database
            save_stat_to_database(l_request, l_node_client);
            //...
            l_node_client->state = NODE_CLIENT_STATE_CHECKED;
#ifndef _WIN32
            pthread_cond_broadcast(&l_node_client->wait_cond);
#else
            SetEvent( l_node_client->wait_cond );
#endif
            break;
        }
    }
}

/**
 * Create connection to server
 *
 * return a connection handle, or NULL, if an error
 */

dap_chain_node_client_t* dap_chain_node_client_connect_channels(dap_chain_net_t * l_net, dap_chain_node_info_t *a_node_info, const char *a_active_channels)
{
    return dap_chain_net_client_create_n_connect_channels(l_net,a_node_info,a_active_channels);
}

/**
 * @brief dap_chain_node_client_create_n_connect
 * @param a_net
 * @param a_node_info
 * @param a_active_channels
 * @param a_callbacks
 * @param a_callback_arg
 * @return
 */
dap_chain_node_client_t* dap_chain_node_client_create_n_connect(dap_chain_net_t * a_net, dap_chain_node_info_t *a_node_info,
        const char *a_active_channels,dap_chain_node_client_callbacks_t *a_callbacks, void * a_callback_arg )
{
    if(!a_node_info) {
        log_it(L_ERROR, "Can't connect to the node: null object node_info");
        return NULL;
    }
    dap_chain_node_client_t *l_node_client = DAP_NEW_Z(dap_chain_node_client_t);
    l_node_client->state = NODE_CLIENT_STATE_DISCONNECTED;
    l_node_client->callbacks_arg = a_callback_arg;
    if(a_callbacks)
        memcpy(&l_node_client->callbacks,a_callbacks,sizeof (*a_callbacks));
    l_node_client->info = a_node_info;
    l_node_client->net = a_net;

#ifndef _WIN32
    pthread_condattr_t attr;
    pthread_condattr_init(&attr);
    pthread_condattr_setclock(&attr, CLOCK_MONOTONIC);
    pthread_cond_init(&l_node_client->wait_cond, &attr);
#else
    l_node_client->wait_cond = CreateEventA( NULL, FALSE, FALSE, NULL );
#endif

    pthread_mutex_init(&l_node_client->wait_mutex, NULL);
    l_node_client->events = NULL; //dap_events_new();
    l_node_client->client = dap_client_new(l_node_client->events, s_stage_status_callback,
            s_stage_status_error_callback);
    l_node_client->client->_inheritor = l_node_client;
    l_node_client->remote_node_addr.uint64 = a_node_info->hdr.address.uint64;
    dap_client_set_active_channels_unsafe(l_node_client->client, a_active_channels);

    //dap_client_set_auth_cert(l_node_client->client, dap_cert_find_by_name("auth")); // TODO provide the certificate choice

    int hostlen = 128;
    char host[hostlen];
    if(a_node_info->hdr.ext_addr_v4.s_addr){
        struct sockaddr_in sa4 = { .sin_family = AF_INET, .sin_addr = a_node_info->hdr.ext_addr_v4 };
        inet_ntop(AF_INET, &(((struct sockaddr_in *) &sa4)->sin_addr), host, hostlen);
        log_it(L_INFO, "Connecting to %s address",host);
    } else {
        struct sockaddr_in6 sa6 = { .sin6_family = AF_INET6, .sin6_addr = a_node_info->hdr.ext_addr_v6 };
        inet_ntop(AF_INET6, &(((struct sockaddr_in6 *) &sa6)->sin6_addr), host, hostlen);
        log_it(L_INFO, "Connecting to %s address",host);
    }
    // address not defined
    if(!strcmp(host, "::")) {
        dap_chain_node_client_close(l_node_client);
        return NULL;
    }
    dap_client_set_uplink_unsafe(l_node_client->client, strdup(host), a_node_info->hdr.ext_port);
//    dap_client_stage_t a_stage_target = STAGE_ENC_INIT;
//    dap_client_stage_t l_stage_target = STAGE_STREAM_STREAMING;

    l_node_client->state = NODE_CLIENT_STATE_CONNECTING ;
    // ref pvt client
    //dap_client_pvt_ref(DAP_CLIENT_PVT(l_node_client->client));
    // Handshake & connect
    dap_client_go_stage(l_node_client->client, STAGE_STREAM_STREAMING, s_stage_connected_callback);
    return l_node_client;
}

/**
 * Create connection to server
 *
 * return a connection handle, or NULL, if an error
 */
dap_chain_node_client_t* dap_chain_node_client_connect(dap_chain_net_t * a_net,dap_chain_node_info_t *a_node_info)
{
    const char *l_active_channels = "CN";
    return dap_chain_node_client_connect_channels(a_net,a_node_info, l_active_channels);
}


void dap_chain_node_client_reset(dap_chain_node_client_t *a_client)
{
    if (a_client->state > NODE_CLIENT_STATE_ESTABLISHED) {
        a_client->state = NODE_CLIENT_STATE_ESTABLISHED;
    }
}
/**
 * Close connection to server, delete chain_node_client_t *client
 */
void dap_chain_node_client_close(dap_chain_node_client_t *a_client)
{
    if (a_client && a_client->client) { // block tryes to close twice
        // clean client
        dap_client_delete_mt(a_client->client);
#ifndef _WIN32
        pthread_cond_destroy(&a_client->wait_cond);
#else
        CloseHandle( a_client->wait_cond );
#endif
        pthread_mutex_destroy(&a_client->wait_mutex);
        a_client->client = NULL;
        DAP_DELETE(a_client);
    }
}

/**
 * Send stream request to server
 */
int dap_chain_node_client_send_ch_pkt(dap_chain_node_client_t *a_client, uint8_t a_ch_id, uint8_t a_type,
        const void *a_pkt_data, size_t a_pkt_data_size)
{
    if(!a_client || a_client->state < NODE_CLIENT_STATE_ESTABLISHED)
        return -1;

    dap_stream_worker_t *l_stream_worker = dap_client_get_stream_worker(a_client->client);
    dap_stream_ch_t * l_ch = dap_client_get_stream_ch_unsafe(a_client->client, a_ch_id);
    if(l_ch) {
//        dap_stream_ch_chain_net_t * l_ch_chain = DAP_STREAM_CH_CHAIN_NET(l_ch);
        dap_stream_ch_pkt_write_mt(l_stream_worker , l_ch , a_type, a_pkt_data, a_pkt_data_size);
        return 0;
    } else
        return -1;
}

/**
 * wait for the complete of request
 *
 * timeout_ms timeout in milliseconds
 * waited_state state which we will wait, sample NODE_CLIENT_STATE_CONNECT or NODE_CLIENT_STATE_SENDED
 * return -2 false, -1 timeout, 0 end of connection or sending data
 */
int dap_chain_node_client_wait(dap_chain_node_client_t *a_client, int a_waited_state, int a_timeout_ms)
{
    int ret = -1;
    if(!a_client){
        log_it(L_ERROR, "Can't wait for status for (null) object");
        return -3;
    }

    pthread_mutex_lock(&a_client->wait_mutex);
    // have waited
    if(a_client->state == a_waited_state) {
        log_it(L_INFO, "We're already in state %s",dap_chain_node_client_state_to_str(a_client->state));
        pthread_mutex_unlock(&a_client->wait_mutex);
        return 0;
    }

    if (a_client->state < NODE_CLIENT_STATE_ESTABLISHED && a_waited_state > NODE_CLIENT_STATE_ESTABLISHED) {
        log_it(L_WARNING, "Waited state can't be achieved");
        pthread_mutex_unlock(&a_client->wait_mutex);
        return -2;
    }

#ifndef DAP_OS_WINDOWS
    // prepare for signal waiting
    struct timespec l_cond_timeout;
    clock_gettime( CLOCK_MONOTONIC, &l_cond_timeout);
    l_cond_timeout.tv_sec += a_timeout_ms/1000;
#else
    pthread_mutex_unlock( &a_client->wait_mutex );
#endif

    // signal waiting


#ifndef DAP_OS_WINDOWS
    do {
        int l_ret_wait = pthread_cond_timedwait(&a_client->wait_cond, &a_client->wait_mutex, &l_cond_timeout);
        if(l_ret_wait == 0 && (
                a_client->state == a_waited_state ||
                        (a_client->state == NODE_CLIENT_STATE_ERROR || a_client->state == NODE_CLIENT_STATE_DISCONNECTED))
                ) {
            ret = a_client->state == a_waited_state ? 0 : -2;
            break;
        }
        else if(l_ret_wait == ETIMEDOUT) { // 110 260
            //log_it(L_NOTICE,"Wait for status is stopped by timeout");
            ret = -1;
            break;
        }else if (l_ret_wait != 0 ){
            char l_errbuf[128];
            l_errbuf[0] = '\0';
            strerror_r(l_ret_wait,l_errbuf,sizeof (l_errbuf));
            log_it(L_ERROR, "Pthread condition timed wait returned \"%s\"(code %d)", l_errbuf, l_ret_wait);
        }
    } while(1);
#else
    DWORD wait = WaitForSingleObject( a_client->wait_cond, (uint32_t)a_timeout_ms);
    if ( wait == WAIT_OBJECT_0 && (
             a_client->state == a_waited_state ||
             a_client->state == NODE_CLIENT_STATE_ERROR ||
             a_client->state == NODE_CLIENT_STATE_DISCONNECTED))
    {
        return a_client->state == a_waited_state ? 0 : -2;
    } else if ( wait == WAIT_TIMEOUT || wait == WAIT_FAILED ) {
        return -1;
    }
#endif

#ifndef DAP_OS_WINDOWS
    pthread_mutex_unlock(&a_client->wait_mutex);
#endif
    return ret;
}

int dap_chain_node_client_set_callbacks(dap_client_t *a_client, uint8_t a_ch_id)
{
    int l_ret = -1;
    dap_chain_node_client_t *l_node_client = a_client->_inheritor;
    if(l_node_client) {
        pthread_mutex_lock(&l_node_client->wait_mutex);
        // find current channel code
        dap_client_pvt_t * l_client_internal = DAP_CLIENT_PVT(a_client);
        dap_stream_ch_t * l_ch = NULL;
        if(l_client_internal)
            l_ch = dap_client_get_stream_ch_unsafe(a_client, a_ch_id);
        if(l_ch) {
            // C
            if(a_ch_id == dap_stream_ch_chain_get_id()) {
                dap_stream_ch_chain_t * l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch);
                l_ch_chain->callback_notify_packet_out = s_ch_chain_callback_notify_packet_out;
                l_ch_chain->callback_notify_packet_in = s_ch_chain_callback_notify_packet_in;
                l_ch_chain->callback_notify_arg = l_node_client;
            }
            // N
            if(a_ch_id == dap_stream_ch_chain_net_get_id()) {
                dap_stream_ch_chain_net_t *l_ch_chain = DAP_STREAM_CH_CHAIN_NET(l_ch);
                l_ch_chain->notify_callback = s_ch_chain_callback_notify_packet_in2;
                l_ch_chain->notify_callback_arg = l_node_client;
            }
            // R
            if(a_ch_id == dap_stream_ch_chain_net_srv_get_id()) {
                dap_stream_ch_chain_net_srv_t * l_ch_chain = DAP_STREAM_CH_CHAIN_NET_SRV(l_ch);
                l_ch_chain->notify_callback = s_ch_chain_callback_notify_packet_R;
                l_ch_chain->notify_callback_arg = l_node_client;
            }
            l_ret = 0;
        } else {
        }
        pthread_mutex_unlock(&l_node_client->wait_mutex);
    }
    return l_ret;
}

/*static void nodelist_response_callback(dap_client_t *a_client, void *data, size_t data_len)
{
}

static void nodelist_response_error_callback(dap_client_t *a_client, int a_err)
{
}*/

/**
 * Send nodelist request to server
 */
int dap_chain_node_client_send_nodelist_req(dap_chain_node_client_t *a_client)
{
    if(!a_client || !a_client->client || a_client->state < NODE_CLIENT_STATE_ESTABLISHED)
        return -1;
    //dap_client_pvt_t * l_client_pvt = DAP_CLIENT_PVT(a_client->client);

    //TODO send request to get nodelist
    //dap_client_request_enc(a_client->client, DAP_UPLINK_PATH_NODE_LIST, "", "", "", 0,
    //        nodelist_response_callback, nodelist_response_error_callback);
    return 1;
}