/* * Authors: * Dmitriy A. Gearasimov <naeper@demlabs.net> * Alexander Lysikov <alexander.lysikov@demlabs.net> * DeM Labs Inc. https://demlabs.net This file is part of DAP (Deus Applications Prototypes) the open source project DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. DAP is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. */ #include <stdlib.h> #include <stdio.h> #include <time.h> #include <stdlib.h> #include <stddef.h> #include <stdint.h> #include <errno.h> #include <assert.h> #include <string.h> #ifdef WIN32 #undef _WIN32_WINNT #define _WIN32_WINNT 0x0600 #include <winsock2.h> #include <windows.h> #include <mswsock.h> #include <ws2tcpip.h> #include <io.h> #include <wepoll.h> #include <pthread.h> #endif #include "dap_common.h" #include "dap_client.h" #include "dap_config.h" #include "dap_events.h" #include "dap_http_client_simple.h" #include "dap_client_pvt.h" #include "dap_stream_ch_pkt.h" #include "dap_stream_ch_chain.h" #include "dap_stream_ch_chain_pkt.h" #include "dap_stream_ch_chain_net.h" #include "dap_stream_ch_chain_net_pkt.h" #include "dap_stream_pkt.h" //#include "dap_chain_common.h" #include "dap_chain_node_client.h" #define LOG_TAG "dap_chain_node_client" #define DAP_APP_NAME NODE_NETNAME"-node" #define SYSTEM_PREFIX "/opt/"DAP_APP_NAME #define SYSTEM_CONFIGS_DIR SYSTEM_PREFIX"/etc" //static int listen_port_tcp = 8079; static void s_stage_connected_callback(dap_client_t *a_client, void *a_arg); static void s_ch_chain_callback_notify_packet_out(dap_stream_ch_chain_t*, uint8_t a_pkt_type, dap_stream_ch_chain_pkt_t *a_pkt, size_t a_pkt_data_size, void * a_arg); static void s_ch_chain_callback_notify_packet_in(dap_stream_ch_chain_t* a_ch_chain, uint8_t a_pkt_type, dap_stream_ch_chain_pkt_t *a_pkt, size_t a_pkt_data_size, void * a_arg); /** * @brief dap_chain_node_client_init * @return */ int dap_chain_node_client_init(void) { dap_config_t *g_config; // read listen_port_tcp from settings memcpy( s_sys_dir_path + l_sys_dir_path_len, SYSTEM_CONFIGS_DIR, sizeof(SYSTEM_CONFIGS_DIR) ); dap_config_init(s_sys_dir_path); memset(s_sys_dir_path + l_sys_dir_path_len, '\0', MAX_PATH - l_sys_dir_path_len); if((g_config = dap_config_open(DAP_APP_NAME)) == NULL) { return -1; } /*else { const char *port_str = dap_config_get_item_str(g_config, "server", "listen_port_tcp"); listen_port_tcp = (port_str) ? atoi(port_str) : 8079; }*/ if(g_config) dap_config_close(g_config); return 0; } /** * @brief dap_chain_node_client_deinit */ void dap_chain_node_client_deinit() { dap_http_client_simple_deinit(); dap_client_deinit(); } /** * @brief stage_status_callback * @param a_client * @param a_arg */ static void s_stage_status_callback(dap_client_t *a_client, void *a_arg) { (void) a_client; (void) a_arg; //printf("* stage_status_callback client=%x data=%x\n", a_client, a_arg); } /** * @brief s_stage_status_error_callback * @param a_client * @param a_arg */ static void s_stage_status_error_callback(dap_client_t *a_client, void *a_arg) { dap_chain_node_client_t *l_node_client = DAP_CHAIN_NODE_CLIENT(a_client); // check for last attempt bool l_is_last_attempt = a_arg ? true : false; if(l_is_last_attempt){ pthread_mutex_lock(&l_node_client->wait_mutex); l_node_client->state = NODE_CLIENT_STATE_DISCONNECTED; pthread_mutex_unlock(&l_node_client->wait_mutex); #ifndef _WIN32 pthread_cond_signal(&l_node_client->wait_cond); #else SetEvent( l_node_client->wait_cond ); #endif } if(l_node_client && l_node_client->keep_connection && ((dap_client_get_stage(a_client) != STAGE_STREAM_STREAMING) || (dap_client_get_stage_status(a_client) == STAGE_STATUS_ERROR))) { log_it(L_NOTICE,"Some errors happends, current state is %s but we need to return back to STAGE_STREAM_STREAMING", dap_client_get_stage_str(a_client) ); log_it(L_DEBUG, "Wakeup all who waits"); pthread_mutex_lock(&l_node_client->wait_mutex); l_node_client->state = NODE_CLIENT_STATE_ERROR; pthread_mutex_unlock(&l_node_client->wait_mutex); #ifndef _WIN32 pthread_cond_signal(&l_node_client->wait_cond); #else SetEvent( l_node_client->wait_cond ); #endif //dap_client_go_stage( a_client , STAGE_STREAM_STREAMING, s_stage_end_callback ); } //printf("* tage_status_error_callback client=%x data=%x\n", a_client, a_arg); } /** * @brief a_stage_end_callback * @param a_client * @param a_arg */ static void s_stage_connected_callback(dap_client_t *a_client, void *a_arg) { dap_chain_node_client_t *l_node_client = a_client->_inheritor; //assert(l_node_client); if(l_node_client) { log_it(L_NOTICE, "Stream connection with node " NODE_ADDR_FP_STR " established", NODE_ADDR_FP_ARGS_S( l_node_client->remote_node_addr)); pthread_mutex_lock(&l_node_client->wait_mutex); l_node_client->state = NODE_CLIENT_STATE_CONNECTED; // find current channel code dap_client_pvt_t * l_client_internal = DAP_CLIENT_PVT(a_client); dap_stream_ch_t * l_ch = NULL; if(l_client_internal && l_client_internal->active_channels) l_ch = dap_client_get_stream_ch(a_client, l_client_internal->active_channels[0]); //dap_stream_ch_t * l_ch = dap_client_get_stream_ch(a_client, dap_stream_ch_chain_get_id()); if(l_ch) { dap_stream_ch_chain_t * l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); l_ch_chain->callback_notify_packet_out = s_ch_chain_callback_notify_packet_out; l_ch_chain->callback_notify_packet_in = s_ch_chain_callback_notify_packet_in; l_ch_chain->callback_notify_arg = l_node_client; } else { log_it(L_WARNING, "No ch_chain channel, can't init notify callback for pkt type CH_CHAIN"); } pthread_mutex_unlock(&l_node_client->wait_mutex); if(l_node_client->callback_connected) l_node_client->callback_connected(l_node_client, a_arg); l_node_client->keep_connection = true; log_it(L_DEBUG, "Wakeup all who waits"); #ifndef _WIN32 pthread_cond_signal(&l_node_client->wait_cond); #else SetEvent( l_node_client->wait_cond ); #endif } } /** * @brief s_ch_chain_callback_notify_packet_in2 - for dap_stream_ch_chain_net * @param a_ch_chain_net * @param a_pkt_type * @param a_pkt_net * @param a_pkt_data_size * @param a_arg */ static void s_ch_chain_callback_notify_packet_in2(dap_stream_ch_chain_net_t* a_ch_chain_net, uint8_t a_pkt_type, dap_stream_ch_chain_net_pkt_t *a_pkt_net, size_t a_pkt_net_data_size, void * a_arg) { dap_chain_node_client_t * l_node_client = (dap_chain_node_client_t *) a_arg; switch (a_pkt_type) { // get new generated current node address case DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_NODE_ADDR_LEASE: { if(a_pkt_net_data_size == sizeof(dap_chain_node_addr_t)) { dap_chain_node_addr_t *l_addr = (dap_chain_node_addr_t *) a_pkt_net->data; memcpy(&l_node_client->cur_node_addr, l_addr, sizeof(dap_chain_node_addr_t)); } pthread_mutex_lock(&l_node_client->wait_mutex); l_node_client->state = NODE_CLIENT_STATE_NODE_ADDR_LEASED; pthread_mutex_unlock(&l_node_client->wait_mutex); #ifndef _WIN32 pthread_cond_signal(&l_node_client->wait_cond); #else SetEvent( l_node_client->wait_cond ); #endif break; } // get remote node address case DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_NODE_ADDR: { if(a_pkt_net_data_size == sizeof(dap_chain_node_addr_t)) { dap_chain_node_addr_t *l_addr = (dap_chain_node_addr_t *) a_pkt_net->data; memcpy(&l_node_client->remote_node_addr, l_addr, sizeof(dap_chain_node_addr_t)); } pthread_mutex_lock(&l_node_client->wait_mutex); l_node_client->state = NODE_CLIENT_STATE_GET_NODE_ADDR; pthread_mutex_unlock(&l_node_client->wait_mutex); #ifndef _WIN32 pthread_cond_signal(&l_node_client->wait_cond); #else SetEvent( l_node_client->wait_cond ); #endif break; } } } /** * @brief s_ch_chain_callback_notify_packet_in - for dap_stream_ch_chain * @param a_ch_chain * @param a_pkt_type * @param a_pkt * @param a_pkt_data_size * @param a_arg */ static void s_ch_chain_callback_notify_packet_in(dap_stream_ch_chain_t* a_ch_chain, uint8_t a_pkt_type, dap_stream_ch_chain_pkt_t *a_pkt, size_t a_pkt_data_size, void * a_arg) { dap_chain_node_client_t * l_node_client = (dap_chain_node_client_t *) a_arg; switch (a_pkt_type) { case DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_ERROR: pthread_mutex_lock(&l_node_client->wait_mutex); l_node_client->state = NODE_CLIENT_STATE_ERROR; dap_snprintf(l_node_client->last_error, sizeof(l_node_client->last_error), "%s", (char*) a_pkt->data); log_it(L_WARNING, "Received packet DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_ERROR with error \"%s\"", l_node_client->last_error); pthread_mutex_unlock(&l_node_client->wait_mutex); #ifndef _WIN32 pthread_cond_signal(&l_node_client->wait_cond); #else SetEvent( l_node_client->wait_cond ); #endif case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_ALL: case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB: case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS: { dap_stream_ch_chain_sync_request_t * l_request = NULL; if(a_pkt_data_size == sizeof(*l_request)) l_request = (dap_stream_ch_chain_sync_request_t*) a_pkt->data; if(l_request) { if(l_request->id_start < (uint64_t) dap_db_log_get_last_id()) { log_it(L_INFO, "Remote is synced but we have updates for it"); // Get log diff a_ch_chain->request_last_ts = dap_db_log_get_last_id(); dap_list_t *l_list = dap_db_log_get_list((time_t) l_request->id_start); if(l_list) { // Add it to outgoing list l_list->prev = a_ch_chain->request_global_db_trs; a_ch_chain->request_global_db_trs = l_list; a_ch_chain->request_net_id.uint64 = a_pkt->hdr.net_id.uint64; a_ch_chain->request_cell_id.uint64 = a_pkt->hdr.cell_id.uint64; a_ch_chain->request_chain_id.uint64 = a_pkt->hdr.chain_id.uint64; a_ch_chain->state = CHAIN_STATE_SYNC_GLOBAL_DB; dap_chain_node_addr_t l_node_addr = { 0 }; dap_chain_net_t *l_net = dap_chain_net_by_id(a_ch_chain->request_net_id); l_node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net); dap_stream_ch_chain_pkt_write(a_ch_chain->ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB, a_ch_chain->request_net_id, a_ch_chain->request_chain_id, a_ch_chain->request_cell_id, &l_node_addr, sizeof(dap_chain_node_addr_t)); log_it(L_INFO, "Sync for remote tr_count=%d", dap_list_length(l_list)); dap_stream_ch_set_ready_to_write(a_ch_chain->ch, true); } } else { log_it(L_INFO, "Remote node has lastes ts for us"); pthread_mutex_lock(&l_node_client->wait_mutex); l_node_client->state = NODE_CLIENT_STATE_SYNCED; pthread_mutex_unlock(&l_node_client->wait_mutex); #ifndef _WIN32 pthread_cond_signal(&l_node_client->wait_cond); #else SetEvent( l_node_client->wait_cond ); #endif } } else { log_it(L_INFO, "Sync notify without request to sync back, stay in SYNCED state"); pthread_mutex_lock(&l_node_client->wait_mutex); l_node_client->state = NODE_CLIENT_STATE_SYNCED; pthread_mutex_unlock(&l_node_client->wait_mutex); #ifndef _WIN32 pthread_cond_signal(&l_node_client->wait_cond); #else SetEvent( l_node_client->wait_cond ); #endif } } default: { } } } /** * @brief s_ch_chain_callback_notify_packet_in * @param a_ch_chain * @param a_pkt_type * @param a_pkt * @param a_pkt_data_size * @param a_arg */ static void s_ch_chain_callback_notify_packet_out(dap_stream_ch_chain_t* a_ch_chain, uint8_t a_pkt_type, dap_stream_ch_chain_pkt_t *a_pkt, size_t a_pkt_data_size, void * a_arg) { (void) a_pkt; (void) a_pkt_data_size; (void) a_ch_chain; dap_chain_node_client_t * l_node_client = (dap_chain_node_client_t *) a_arg; switch (a_pkt_type) { case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_ALL: case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB: case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS: { pthread_mutex_lock(&l_node_client->wait_mutex); l_node_client->state = NODE_CLIENT_STATE_SYNCED; pthread_mutex_unlock(&l_node_client->wait_mutex); #ifndef _WIN32 pthread_cond_signal(&l_node_client->wait_cond); #else SetEvent( l_node_client->wait_cond ); #endif } break; default: { } } } /** * Create connection to server * * return a connection handle, or NULL, if an error */ dap_chain_node_client_t* dap_chain_client_connect(dap_chain_node_info_t *a_node_info, dap_client_stage_t a_stage_target, const char *a_active_channels) { if(!a_node_info) { log_it(L_ERROR, "Can't connect to the node: null object node_info"); return NULL; } dap_chain_node_client_t *l_node_client = DAP_NEW_Z(dap_chain_node_client_t); l_node_client->state = NODE_CLIENT_STATE_DISCONNECTED; #ifndef _WIN32 pthread_condattr_t attr; pthread_condattr_init(&attr); pthread_condattr_setclock(&attr, CLOCK_MONOTONIC); pthread_cond_init(&l_node_client->wait_cond, &attr); #else l_node_client->wait_cond = CreateEventA( NULL, FALSE, FALSE, NULL ); #endif pthread_mutex_init(&l_node_client->wait_mutex, NULL); l_node_client->events = NULL; //dap_events_new(); l_node_client->client = dap_client_new(l_node_client->events, s_stage_status_callback, s_stage_status_error_callback); l_node_client->client->_inheritor = l_node_client; l_node_client->remote_node_addr.uint64 = a_node_info->hdr.address.uint64; dap_client_set_active_channels(l_node_client->client, a_active_channels); int hostlen = 128; char host[hostlen]; if(a_node_info->hdr.ext_addr_v4.s_addr) { struct sockaddr_in sa4 = { .sin_family = AF_INET, .sin_addr = a_node_info->hdr.ext_addr_v4 }; inet_ntop(AF_INET, &(((struct sockaddr_in *) &sa4)->sin_addr), host, hostlen); } else { struct sockaddr_in6 sa6 = { .sin6_family = AF_INET6, .sin6_addr = a_node_info->hdr.ext_addr_v6 }; inet_ntop(AF_INET6, &(((struct sockaddr_in6 *) &sa6)->sin6_addr), host, hostlen); } // address not defined if(!strcmp(host, "::")) { dap_chain_node_client_close(l_node_client); return NULL; } dap_client_set_uplink(l_node_client->client, strdup(host), a_node_info->hdr.ext_port); // dap_client_stage_t a_stage_target = STAGE_ENC_INIT; // dap_client_stage_t l_stage_target = STAGE_STREAM_STREAMING; l_node_client->state = NODE_CLIENT_STATE_CONNECT; // Handshake & connect dap_client_go_stage(l_node_client->client, a_stage_target, s_stage_connected_callback); return l_node_client; } /** * Create connection to server * * return a connection handle, or NULL, if an error */ dap_chain_node_client_t* dap_chain_node_client_connect(dap_chain_node_info_t *a_node_info) { dap_client_stage_t l_stage_target = STAGE_STREAM_STREAMING; const char *l_active_channels = "CN"; return dap_chain_client_connect(a_node_info, l_stage_target, l_active_channels); } /** * Close connection to server, delete chain_node_client_t *client */ void dap_chain_node_client_close(dap_chain_node_client_t *a_client) { if(a_client) { pthread_mutex_lock(&a_client->wait_mutex); a_client->client->_inheritor = NULL; // because client->_inheritor == a_client pthread_mutex_unlock(&a_client->wait_mutex); // clean client dap_client_delete(a_client->client); a_client->client = NULL; #ifndef _WIN32 pthread_cond_destroy(&a_client->wait_cond); #else CloseHandle( a_client->wait_cond ); #endif pthread_mutex_destroy(&a_client->wait_mutex); DAP_DELETE(a_client); } } /** * Send stream request to server */ int dap_chain_node_client_send_ch_pkt(dap_chain_node_client_t *a_client, uint8_t a_ch_id, uint8_t a_type, const void *a_pkt_data, size_t a_pkt_data_size) { if(!a_client || a_client->state < NODE_CLIENT_STATE_CONNECTED) return -1; // dap_stream_t *l_stream = dap_client_get_stream(a_client->client); dap_stream_ch_t * l_ch = dap_client_get_stream_ch(a_client->client, a_ch_id); if(l_ch) { // dap_stream_ch_chain_net_t * l_ch_chain = DAP_STREAM_CH_CHAIN_NET(l_ch); dap_stream_ch_pkt_write(l_ch, a_type, a_pkt_data, a_pkt_data_size); dap_stream_ch_set_ready_to_write(l_ch, true); return 0; } else return -1; } /** * wait for the complete of request * * timeout_ms timeout in milliseconds * waited_state state which we will wait, sample NODE_CLIENT_STATE_CONNECT or NODE_CLIENT_STATE_SENDED * return -2 false, -1 timeout, 0 end of connection or sending data */ int dap_chain_node_client_wait(dap_chain_node_client_t *a_client, int a_waited_state, int a_timeout_ms) { int ret = -1; if(!a_client) return -3; pthread_mutex_lock(&a_client->wait_mutex); // have waited if(a_client->state == a_waited_state) { pthread_mutex_unlock(&a_client->wait_mutex); return 0; } if(a_client->state == NODE_CLIENT_STATE_DISCONNECTED) { pthread_mutex_unlock(&a_client->wait_mutex); return -2; } #ifndef _WIN32 // prepare for signal waiting struct timespec to; clock_gettime( CLOCK_MONOTONIC, &to); int64_t nsec_new = to.tv_nsec + a_timeout_ms * 1000000ll; // if the new number of nanoseconds is more than a second if(nsec_new > (long) 1e9) { to.tv_sec += nsec_new / (long) 1e9; to.tv_nsec = nsec_new % (long) 1e9; } else to.tv_nsec = (long) nsec_new; #else pthread_mutex_unlock( &a_client->wait_mutex ); #endif // signal waiting do { #ifndef _WIN32 int wait = pthread_cond_timedwait(&a_client->wait_cond, &a_client->wait_mutex, &to); if(wait == 0 && ( a_client->state == a_waited_state || (a_client->state == NODE_CLIENT_STATE_ERROR || a_client->state == NODE_CLIENT_STATE_DISCONNECTED)) ) { ret = a_client->state == a_waited_state ? 0 : -2; break; } else if(wait == ETIMEDOUT) { // 110 260 ret = -1; break; } #else int wait = WaitForSingleObject( a_client->wait_cond, (uint32_t)a_timeout_ms ); pthread_mutex_lock( &a_client->wait_mutex ); if ( wait == WAIT_OBJECT_0 && ( a_client->state == a_waited_state || a_client->state == NODE_CLIENT_STATE_ERROR ) ) { ret = a_client->state == a_waited_state ? 0 : -2; break; } else if ( wait == WAIT_TIMEOUT || wait == WAIT_FAILED ) { ret = -1; break; } #endif } while(1); pthread_mutex_unlock(&a_client->wait_mutex); return ret; } int dap_chain_node_client_set_callbacks(dap_client_t *a_client, uint8_t a_ch_id) { int l_ret = -1; dap_chain_node_client_t *l_node_client = a_client->_inheritor; if(l_node_client) { pthread_mutex_lock(&l_node_client->wait_mutex); // find current channel code dap_client_pvt_t * l_client_internal = DAP_CLIENT_PVT(a_client); dap_stream_ch_t * l_ch = NULL; if(l_client_internal) l_ch = dap_client_get_stream_ch(a_client, a_ch_id); if(l_ch) { if(a_ch_id == dap_stream_ch_chain_get_id()) { dap_stream_ch_chain_t * l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); l_ch_chain->callback_notify_packet_out = s_ch_chain_callback_notify_packet_out; l_ch_chain->callback_notify_packet_in = s_ch_chain_callback_notify_packet_in; l_ch_chain->callback_notify_arg = l_node_client; } if(a_ch_id == dap_stream_ch_chain_net_get_id()) { dap_stream_ch_chain_net_t *l_ch_chain = DAP_STREAM_CH_CHAIN_NET(l_ch); l_ch_chain->notify_callback = s_ch_chain_callback_notify_packet_in2; l_ch_chain->notify_callback_arg = l_node_client; } l_ret = 0; } else { } pthread_mutex_unlock(&l_node_client->wait_mutex); } return l_ret; }