/* * Authors: * Dmitriy A. Gearasimov <naeper@demlabs.net> * Alexander Lysikov <alexander.lysikov@demlabs.net> * DeM Labs Inc. https://demlabs.net This file is part of DAP (Deus Applications Prototypes) the open source project DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. DAP is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. */ #include "dap_time.h" #include <stdlib.h> #include <stdio.h> #include <time.h> #include <stdlib.h> #include <stddef.h> #include <stdint.h> #include <errno.h> #include <assert.h> #include <string.h> #ifdef WIN32 #include <winsock2.h> #include <windows.h> #include <mswsock.h> #include <ws2tcpip.h> #include <io.h> #include <pthread.h> #else #include <sys/types.h> #include <sys/socket.h> #include <arpa/inet.h> #endif #include "json.h" #include "uthash.h" #include "dap_common.h" #include "dap_client.h" #include "dap_config.h" #include "dap_events.h" #include "dap_timerfd.h" #include "dap_hash.h" #include "dap_uuid.h" #include "dap_client.h" #include "dap_client_pvt.h" #include "dap_global_db_remote.h" #include "dap_chain.h" #include "dap_chain_cell.h" #include "dap_chain_net_srv.h" #include "dap_stream_worker.h" #include "dap_stream_ch_pkt.h" #include "dap_stream_ch_chain.h" #include "dap_stream_ch_chain_pkt.h" #include "dap_stream_ch_chain_net.h" #include "dap_stream_ch_chain_net_pkt.h" #include "dap_stream_ch_chain_net_srv.h" #include "dap_stream_ch_chain_voting.h" #include "dap_stream_pkt.h" #include "dap_chain_node_client.h" #define LOG_TAG "dap_chain_node_client" static void s_stage_connected_callback(dap_client_t *a_client, void *a_arg); static bool s_timer_update_states_callback(void *a_arg); static int s_node_client_set_notify_callbacks(dap_client_t *a_client, uint8_t a_ch_id); static void s_ch_chain_callback_notify_packet_out(dap_stream_ch_chain_t*, uint8_t a_pkt_type, dap_stream_ch_chain_pkt_t *a_pkt, size_t a_pkt_data_size, void * a_arg); static void s_ch_chain_callback_notify_packet_in(dap_stream_ch_chain_t* a_ch_chain, uint8_t a_pkt_type, dap_stream_ch_chain_pkt_t *a_pkt, size_t a_pkt_data_size, void * a_arg); bool s_stream_ch_chain_debug_more = false; uint32_t s_timer_update_states = 600; #ifdef DAP_OS_WINDOWS #define dap_cond_signal(x) SetEvent(x) #else #define dap_cond_signal(x) pthread_cond_broadcast(&x) #endif /** * @brief dap_chain_node_client_init * @return */ int dap_chain_node_client_init(void) { s_stream_ch_chain_debug_more = dap_config_get_item_bool_default(g_config, "stream_ch_chain", "debug_more", false); s_timer_update_states = dap_config_get_item_uint32_default(g_config, "node_client", "timer_update_states", s_timer_update_states); return 0; } /** * @brief dap_chain_node_client_deinit */ void dap_chain_node_client_deinit() { dap_client_deinit(); } static bool s_timer_node_reconnect(void *a_arg) { if (!a_arg) return false; dap_chain_node_client_t *l_me = a_arg; if (l_me->keep_connection && l_me->state == NODE_CLIENT_STATE_DISCONNECTED) { if (dap_client_get_stage(l_me->client) == STAGE_BEGIN) { log_it(L_INFO, "Reconnecting node client with peer "NODE_ADDR_FP_STR, NODE_ADDR_FP_ARGS_S(l_me->remote_node_addr)); l_me->state = NODE_CLIENT_STATE_CONNECTING ; dap_client_go_stage(l_me->client, STAGE_STREAM_STREAMING, s_stage_connected_callback); } } return false; } /** * @brief s_stage_status_error_callback * @param a_client * @param a_arg */ static void s_stage_status_error_callback(dap_client_t *a_client, void *a_arg) { if (s_stream_ch_chain_debug_more) log_it(L_DEBUG, "s_stage_status_error_callback"); dap_chain_node_client_t *l_node_client = DAP_CHAIN_NODE_CLIENT(a_client); if(!l_node_client) return; if (l_node_client->sync_timer) { // Disable timer, it will be restarted with new connection dap_timerfd_delete_unsafe(l_node_client->sync_timer); l_node_client->sync_timer = NULL; } // check for last attempt bool l_is_last_attempt = a_arg ? true : false; if (l_is_last_attempt) { pthread_mutex_lock(&l_node_client->wait_mutex); l_node_client->state = NODE_CLIENT_STATE_DISCONNECTED; dap_cond_signal(l_node_client->wait_cond); pthread_mutex_unlock(&l_node_client->wait_mutex); l_node_client->esocket_uuid = 0; dap_chain_net_sync_unlock(l_node_client->net, l_node_client); if (l_node_client->callbacks.disconnected) { l_node_client->callbacks.disconnected(l_node_client, l_node_client->callbacks_arg); } if (l_node_client->keep_connection) { if (dap_client_get_stage(l_node_client->client) != STAGE_BEGIN) dap_client_go_stage(l_node_client->client, STAGE_BEGIN, NULL); l_node_client->reconnect_timer = dap_timerfd_start(45 * 1000, s_timer_node_reconnect, l_node_client); } } else if(l_node_client->callbacks.error) // TODO make different error codes l_node_client->callbacks.error(l_node_client, EINVAL, l_node_client->callbacks_arg); } /** * @brief dap_chain_node_client_start_sync * @param a_uuid * @param a_wrlock * @return */ dap_chain_node_sync_status_t dap_chain_node_client_start_sync(dap_chain_node_client_t *a_node_client) { assert(a_node_client); // check if esocket still in worker dap_stream_ch_t *l_ch = dap_stream_ch_find_by_uuid_unsafe(a_node_client->stream_worker, a_node_client->ch_chain_uuid); if (l_ch) { dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); assert(l_ch_chain); dap_chain_net_t * l_net = a_node_client->net; assert(l_net); // If we do nothing - init sync process if (l_ch_chain->state == CHAIN_STATE_IDLE) { bool l_trylocked = dap_chain_net_sync_trylock(l_net, a_node_client); if (l_trylocked) { log_it(L_INFO, "Start synchronization process with "NODE_ADDR_FP_STR, NODE_ADDR_FP_ARGS_S(a_node_client->remote_node_addr)); dap_stream_ch_chain_sync_request_t l_sync_gdb = {}; l_sync_gdb.node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net); dap_stream_ch_chain_pkt_write_unsafe(a_node_client->ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_REQ, l_net->pub.id.uint64, 0, 0, &l_sync_gdb, sizeof(l_sync_gdb)); if (!l_ch_chain->activity_timer) dap_stream_ch_chain_timer_start(l_ch_chain); return NODE_SYNC_STATUS_STARTED; } else return NODE_SYNC_STATUS_WAITING; } else return NODE_SYNC_STATUS_IN_PROGRESS; } return NODE_SYNC_STATUS_FAILED; } /** * @brief s_timer_update_states_callback * @param a_arg * @return */ static bool s_timer_update_states_callback(void *a_arg) { if (!a_arg) return false; dap_chain_node_client_t *l_me = a_arg; dap_chain_node_sync_status_t l_status = dap_chain_node_client_start_sync(l_me); if (l_status == NODE_SYNC_STATUS_FAILED) { l_me->state = NODE_CLIENT_STATE_DISCONNECTED; if (l_me->keep_connection) { if (dap_client_get_stage(l_me->client) != STAGE_BEGIN) { dap_client_go_stage(l_me->client, STAGE_BEGIN, NULL); return true; } if (l_me->is_connected && l_me->callbacks.disconnected) l_me->callbacks.disconnected(l_me, l_me->callbacks_arg); if (l_me->keep_connection) { log_it(L_INFO, "Reconnecting node client with peer "NODE_ADDR_FP_STR, NODE_ADDR_FP_ARGS_S(l_me->remote_node_addr)); l_me->state = NODE_CLIENT_STATE_CONNECTING ; dap_client_go_stage(l_me->client, STAGE_STREAM_STREAMING, s_stage_connected_callback); } } } return true; } /** * @brief a_stage_end_callback * @param a_client * @param a_arg */ static void s_stage_connected_callback(dap_client_t *a_client, void *a_arg) { dap_chain_node_client_t *l_node_client = DAP_CHAIN_NODE_CLIENT(a_client); UNUSED(a_arg); if(l_node_client) { char l_ip_addr_str[INET_ADDRSTRLEN] = {}; inet_ntop(AF_INET, &l_node_client->info->hdr.ext_addr_v4, l_ip_addr_str, INET_ADDRSTRLEN); log_it(L_NOTICE, "Stream connection with node "NODE_ADDR_FP_STR" (%s:%hu) established", NODE_ADDR_FP_ARGS_S(l_node_client->remote_node_addr), l_ip_addr_str, l_node_client->info->hdr.ext_port); // set callbacks for C and N channels; for R and S it is not needed if (a_client->active_channels) { size_t l_channels_count = dap_strlen(a_client->active_channels); for(size_t i = 0; i < l_channels_count; i++) { if(s_node_client_set_notify_callbacks(a_client, a_client->active_channels[i]) == -1) { log_it(L_WARNING, "No ch_chain channel, can't init notify callback for pkt type CH_CHAIN"); return; } } } if(l_node_client->callbacks.connected) l_node_client->callbacks.connected(l_node_client, l_node_client->callbacks_arg); pthread_mutex_lock(&l_node_client->wait_mutex); l_node_client->state = NODE_CLIENT_STATE_ESTABLISHED; if (s_stream_ch_chain_debug_more) log_it(L_DEBUG, "Wakeup all who waits"); dap_cond_signal(l_node_client->wait_cond); pthread_mutex_unlock(&l_node_client->wait_mutex); dap_stream_t * l_stream = dap_client_get_stream(a_client); if (l_stream) { l_node_client->esocket_uuid = l_stream->esocket->uuid; l_node_client->stream_worker = l_stream->stream_worker; if (l_node_client->keep_connection) { if(l_node_client->stream_worker){ s_timer_update_states_callback(l_node_client); l_node_client->sync_timer = dap_timerfd_start_on_worker(l_stream->esocket->context->worker, s_timer_update_states * 1000, s_timer_update_states_callback, l_node_client); }else{ log_it(L_ERROR,"After NODE_CLIENT_STATE_ESTABLISHED: Node client has no worker, too dangerous to run update states in alien context"); } } } } } /** * @brief s_ch_chain_callback_notify_packet_in2 - for dap_stream_ch_chain_net * @param a_ch_chain_net * @param a_pkt_type * @param a_pkt_net * @param a_pkt_data_size * @param a_arg */ static void s_ch_chain_callback_notify_packet_in2(dap_stream_ch_chain_net_t* a_ch_chain_net, uint8_t a_pkt_type, dap_stream_ch_chain_net_pkt_t *a_pkt_net, size_t a_pkt_net_data_size, void * a_arg) { dap_chain_node_client_t * l_node_client = (dap_chain_node_client_t *) a_arg; switch (a_pkt_type) { // get new generated current node address case DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_NODE_ADDR_LEASE: { if(a_pkt_net_data_size == sizeof(dap_chain_node_addr_t)) { l_node_client->cur_node_addr = *(dap_chain_node_addr_t*)a_pkt_net->data; } pthread_mutex_lock(&l_node_client->wait_mutex); l_node_client->state = NODE_CLIENT_STATE_NODE_ADDR_LEASED; dap_cond_signal(l_node_client->wait_cond); pthread_mutex_unlock(&l_node_client->wait_mutex); break; } // get remote node address case DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_NODE_ADDR: { if(a_pkt_net_data_size == sizeof(dap_chain_node_addr_t)) { l_node_client->remote_node_addr = *(dap_chain_node_addr_t*)a_pkt_net->data; } pthread_mutex_lock(&l_node_client->wait_mutex); l_node_client->state = NODE_CLIENT_STATE_GET_NODE_ADDR; dap_cond_signal(l_node_client->wait_cond); pthread_mutex_unlock(&l_node_client->wait_mutex); break; } case DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_NODE_VALIDATOR_READY: { if(a_pkt_net_data_size == sizeof(dap_chain_node_addr_t)) { l_node_client->remote_node_addr = *(dap_chain_node_addr_t*)a_pkt_net->data; } pthread_mutex_lock(&l_node_client->wait_mutex); l_node_client->callbacks_arg = DAP_DUP_SIZE(a_pkt_net->data, a_pkt_net_data_size); l_node_client->state = NODE_CLIENT_STATE_VALID_READY; dap_cond_signal(l_node_client->wait_cond); pthread_mutex_unlock(&l_node_client->wait_mutex); break; } break; default:; } } /** * @brief s_ch_chain_callback_notify_packet_in - for dap_stream_ch_chain * @param a_ch_chain * @param a_pkt_type * @param a_pkt * @param a_pkt_data_size * @param a_arg */ static void s_ch_chain_callback_notify_packet_in(dap_stream_ch_chain_t* a_ch_chain, uint8_t a_pkt_type, dap_stream_ch_chain_pkt_t *a_pkt, size_t a_pkt_data_size, void * a_arg) { UNUSED(a_pkt_data_size); dap_chain_node_client_t * l_node_client = (dap_chain_node_client_t *) a_arg; dap_chain_net_t *l_net = l_node_client->net; assert(l_net); bool l_finished = false; switch (a_pkt_type) { case DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_ERROR: snprintf(l_node_client->last_error, sizeof(l_node_client->last_error), "%s", (char*) a_pkt->data); log_it(L_WARNING, "In: Received packet DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_ERROR with error \"%s\"", l_node_client->last_error); l_node_client->state = NODE_CLIENT_STATE_ERROR; if (!strcmp(l_node_client->last_error, "ERROR_SYNC_REQUEST_ALREADY_IN_PROCESS")) { dap_stream_ch_chain_reset_unsafe(a_ch_chain); l_finished = true; } break; case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_REQ:{ l_node_client->state = NODE_CLIENT_STATE_SYNC_GDB_UPDATES; }break; case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_START:{ l_node_client->state = NODE_CLIENT_STATE_SYNC_GDB_RVRS; dap_chain_net_t * l_net = l_node_client->net; assert(l_net); dap_chain_net_set_state(l_net, NET_STATE_SYNC_GDB); }break; case DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB:{ l_node_client->state = NODE_CLIENT_STATE_SYNC_GDB; }break; case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_REQ:{ l_node_client->state = NODE_CLIENT_STATE_SYNC_CHAINS_UPDATES; }break; case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_START:{ l_node_client->state = NODE_CLIENT_STATE_SYNC_CHAINS_RVRS; dap_chain_net_t * l_net = l_node_client->net; assert(l_net); dap_chain_net_set_state(l_net, NET_STATE_SYNC_CHAINS); }break; case DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_CHAIN:{ l_node_client->state = NODE_CLIENT_STATE_SYNC_CHAINS; }break; case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB: case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS: { dap_chain_id_t l_chain_id = {}; dap_chain_cell_id_t l_cell_id = {}; if (a_pkt_type == DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB) { if (dap_chain_net_get_target_state(l_net) != NET_STATE_SYNC_GDB) { if(s_stream_ch_chain_debug_more) log_it(L_INFO,"In: Link %s."NODE_ADDR_FP_STR" synced GDB. Going to update chains", l_net->pub.name, NODE_ADDR_FP_ARGS_S(l_node_client->remote_node_addr )); l_node_client->cur_chain = l_net->pub.chains; l_node_client->cur_cell = l_node_client->cur_chain ? l_node_client->cur_chain->cells : NULL; } else l_node_client->cur_chain = NULL; } else { // Check if we over with it before if ( ! l_node_client->cur_cell ){ if(s_stream_ch_chain_debug_more) log_it(L_INFO, "In: No current cell in sync state, anyway we over it"); }else l_node_client->cur_cell =(dap_chain_cell_t *) l_node_client->cur_cell->hh.next; // If over with cell, switch on next chain if ( l_node_client->cur_cell){ // Check if we over with it before if ( !l_node_client->cur_chain ){ log_it(L_ERROR, "In: No chain but cell is present, over with it"); } }else{ // Check if we over with it before if ( !l_node_client->cur_chain ){ log_it(L_WARNING, "In: No current chain in sync state, anyway we over it"); }else{ l_node_client->cur_chain = (dap_chain_t *) l_node_client->cur_chain->next; l_node_client->cur_cell = l_node_client->cur_chain ? l_node_client->cur_chain->cells : NULL; } } } if (l_node_client->cur_cell) l_cell_id = l_node_client->cur_cell->id; // Check if we have some more chains and cells in it to sync dap_chain_node_addr_t l_node_addr; l_node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net); if( l_node_client->cur_chain ){ l_chain_id=l_node_client->cur_chain->id; if (s_stream_ch_chain_debug_more) { log_it(L_INFO,"In: Link %s."NODE_ADDR_FP_STR" started to sync %s chain",l_net->pub.name, NODE_ADDR_FP_ARGS_S(l_node_addr), l_node_client->cur_chain->name ); } dap_stream_ch_chain_pkt_write_unsafe(l_node_client->ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_REQ, l_net->pub.id.uint64 , l_chain_id.uint64,l_cell_id.uint64,NULL,0); } else { // If no - over with sync process log_it(L_INFO, "In: State node %s."NODE_ADDR_FP_STR" is SYNCED",l_net->pub.name, NODE_ADDR_FP_ARGS_S(l_node_addr) ); l_finished = true; } } break; default: break; } if (l_finished) { pthread_mutex_lock(&l_node_client->wait_mutex); l_node_client->state = NODE_CLIENT_STATE_SYNCED; dap_cond_signal(l_node_client->wait_cond); pthread_mutex_unlock(&l_node_client->wait_mutex); bool l_have_waiting = dap_chain_net_sync_unlock(l_net, l_node_client); if (dap_chain_net_get_target_state(l_net) == NET_STATE_ONLINE) { dap_timerfd_reset_unsafe(l_node_client->sync_timer); dap_chain_net_set_state(l_net, NET_STATE_ONLINE); } else if (!l_have_waiting) { // l_node_client object is not presented after dap_chain_net_state_go_to with NET_STATE_OFFLINE dap_chain_net_state_go_to(l_net, NET_STATE_OFFLINE); } } } /** * @brief s_ch_chain_callback_notify_packet_in * @param a_ch_chain * @param a_pkt_type * @param a_pkt * @param a_pkt_data_size * @param a_arg */ static void s_ch_chain_callback_notify_packet_out(dap_stream_ch_chain_t* a_ch_chain, uint8_t a_pkt_type, dap_stream_ch_chain_pkt_t *a_pkt, size_t a_pkt_data_size, void * a_arg) { (void) a_pkt; (void) a_pkt_data_size; (void) a_ch_chain; dap_chain_node_client_t * l_node_client = (dap_chain_node_client_t *) a_arg; assert(a_arg); switch (a_pkt_type) { case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB: { if(s_stream_ch_chain_debug_more) log_it(L_INFO,"Out: global database sent to uplink "NODE_ADDR_FP_STR, NODE_ADDR_FP_ARGS_S(l_node_client->remote_node_addr)); } break; case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS: { if(s_stream_ch_chain_debug_more) log_it(L_INFO,"Out: chain %"DAP_UINT64_FORMAT_x" sent to uplink "NODE_ADDR_FP_STR,l_node_client->cur_chain ? l_node_client->cur_chain->id.uint64 : 0, NODE_ADDR_FP_ARGS_S(l_node_client->remote_node_addr)); } break; case DAP_STREAM_CH_CHAIN_PKT_TYPE_TIMEOUT: case DAP_STREAM_CH_CHAIN_PKT_TYPE_DELETE: { dap_chain_net_t *l_net = l_node_client->net; assert(l_net); dap_chain_node_addr_t *l_node_addr = dap_chain_net_get_cur_addr(l_net); log_it(L_DEBUG, "In: State node %s."NODE_ADDR_FP_STR" %s", l_net->pub.name, NODE_ADDR_FP_ARGS(l_node_addr), a_pkt_type == DAP_STREAM_CH_CHAIN_PKT_TYPE_TIMEOUT ? "is timeout for sync" : "stream closed"); l_node_client->state = NODE_CLIENT_STATE_ERROR; if (l_node_client->sync_timer) dap_timerfd_reset_unsafe(l_node_client->sync_timer); bool l_have_waiting = dap_chain_net_sync_unlock(l_net, l_node_client); if (!l_have_waiting) { if (dap_chain_net_get_target_state(l_net) == NET_STATE_ONLINE) dap_chain_net_set_state(l_net, NET_STATE_ONLINE); else dap_chain_net_state_go_to(l_net, NET_STATE_OFFLINE); } } break; default:; } } /** * @brief s_save_stat_to_database_callback_set_stat * @param a_global_db_context * @param a_rc * @param a_group * @param a_key * @param a_value * @param a_value_len * @param a_value_ts * @param a_is_pinned * @param a_arg */ static void s_save_stat_to_database_callback_set_stat (dap_global_db_context_t * a_global_db_context,int a_rc, const char * a_group, const char * a_key, const void * a_value, const size_t a_value_len, dap_nanotime_t a_value_ts, bool a_is_pinned, void * a_arg) { if( a_rc != DAP_GLOBAL_DB_RC_SUCCESS) log_it(L_ERROR,"Can't save stats to GlobalDB, code %d", a_rc); DAP_DELETE(a_arg); } /** * @brief s_save_stat_to_database_callback_get_last_stat * @param a_global_db_context * @param a_rc * @param a_group * @param a_key * @param a_value * @param a_value_len * @param a_value_ts * @param a_is_pinned * @param a_arg */ static void s_save_stat_to_database_callback_get_last_stat (dap_global_db_context_t * a_global_db_context,int a_rc, const char * a_group, const char * a_key, const void * a_value, const size_t a_value_len, dap_nanotime_t a_value_ts, bool a_is_pinned, void * a_arg) { char * l_json_str = (char *) a_arg; uint64_t l_key = 0; if(a_rc == DAP_GLOBAL_DB_RC_SUCCESS) { l_key = strtoll(a_key, NULL, 16); } char *l_key_str = dap_strdup_printf("%06"DAP_UINT64_FORMAT_x, ++l_key); dap_global_db_set(a_group, l_key_str, l_json_str, strlen(l_json_str) + 1,false, s_save_stat_to_database_callback_set_stat, l_json_str); DAP_DELETE(l_key_str); } /** * @brief save_stat_to_database * * @param a_request * @param a_node_client * @return int */ static int s_save_stat_to_database(dap_stream_ch_chain_net_srv_pkt_test_t *a_request, dap_chain_node_client_t * a_node_client) { UNUSED(a_node_client); int l_ret = 0; if(!a_request) return -1; long l_t1_ms = a_request->send_time1 / 1e6; long l_t2_ms = a_request->recv_time1 / 1e6; struct json_object *jobj = json_object_new_object(); time_t l_cur_t = time(NULL); char buf[1024]; dap_time_to_str_rfc822( buf, sizeof(buf), l_cur_t ); json_object_object_add(jobj, "time_save", json_object_new_int64(l_cur_t)); json_object_object_add(jobj, "time_save_str", json_object_new_string(buf)); json_object_object_add(jobj, "time_connect", json_object_new_int(a_request->time_connect_ms)); json_object_object_add(jobj, "time_transmit", json_object_new_int(l_t2_ms-l_t1_ms)); json_object_object_add(jobj, "ip_send", json_object_new_string((char *)a_request->ip_send)); json_object_object_add(jobj, "ip_recv", json_object_new_string((char *)a_request->ip_recv)); json_object_object_add(jobj, "time_len_send", json_object_new_int(a_request->data_size_send)); json_object_object_add(jobj, "time_len_recv", json_object_new_int(a_request->data_size_recv)); json_object_object_add(jobj, "err_code", json_object_new_int(a_request->err_code)); const char* l_json_str = json_object_to_json_string(jobj); // save statistics char *l_group = NULL; dap_chain_net_t * l_net = dap_chain_net_by_id(a_request->net_id); if(l_net) { l_group = dap_strdup_printf("local.%s.orders-test-stat", l_net->pub.gdb_groups_prefix); } if(l_group) { dap_global_db_get_last( l_group, s_save_stat_to_database_callback_get_last_stat, dap_strdup(l_json_str)); DAP_DELETE(l_group); } else l_ret = -2; json_object_put(jobj); return l_ret; } /** * @brief s_ch_chain_callback_notify_packet_R - Callback for channel 'R' * @param a_ch_chain * @param a_pkt_type * @param a_pkt * @param a_arg */ static void s_ch_chain_callback_notify_packet_R(dap_stream_ch_chain_net_srv_t* a_ch_chain, uint8_t a_pkt_type, dap_stream_ch_pkt_t *a_pkt, void * a_arg) { UNUSED(a_ch_chain); dap_chain_node_client_t * l_node_client = (dap_chain_node_client_t *) a_arg; switch (a_pkt_type) { // get new generated current node address case DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_CHECK_RESPONSE: { dap_stream_ch_chain_net_srv_pkt_test_t *l_request = (dap_stream_ch_chain_net_srv_pkt_test_t *) a_pkt->data; size_t l_request_size = l_request->data_size + sizeof(dap_stream_ch_chain_net_srv_pkt_test_t); if(a_pkt->hdr.data_size != l_request_size) { log_it(L_WARNING, "Wrong request size, less or more than required"); break; } s_save_stat_to_database(l_request, l_node_client); pthread_mutex_lock(&l_node_client->wait_mutex); l_node_client->state = NODE_CLIENT_STATE_CHECKED; dap_cond_signal(l_node_client->wait_cond); pthread_mutex_unlock(&l_node_client->wait_mutex); break; } case DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_SIGN_RESPONSE: case DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_SUCCESS: case DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR: break; default: break; } } /** * @brief dap_chain_node_client_create_n_connect * @param a_net * @param a_node_info * @param a_active_channels * @param a_callbacks * @param a_callback_arg * @return */ dap_chain_node_client_t *dap_chain_node_client_create_n_connect(dap_chain_net_t *a_net, dap_chain_node_info_t *a_node_info, const char *a_active_channels, const dap_chain_node_client_callbacks_t *a_callbacks, void *a_callback_arg) { dap_chain_node_client_t *l_node_client = dap_chain_node_client_create(a_net, a_node_info, a_callbacks, a_callback_arg); if (dap_chain_node_client_connect(l_node_client, a_active_channels)) return l_node_client; return NULL; } dap_chain_node_client_t *dap_chain_node_client_create(dap_chain_net_t *a_net, dap_chain_node_info_t *a_node_info, const dap_chain_node_client_callbacks_t *a_callbacks, void *a_callback_arg) { if(!a_node_info) { log_it(L_ERROR, "Can't connect to the node: null object node_info"); return NULL; } dap_chain_node_client_t *l_node_client = DAP_NEW_Z(dap_chain_node_client_t); l_node_client->state = NODE_CLIENT_STATE_DISCONNECTED; l_node_client->callbacks_arg = a_callback_arg; if (a_callbacks) l_node_client->callbacks = *a_callbacks; l_node_client->info = DAP_DUP(a_node_info); l_node_client->net = a_net; #ifndef _WIN32 pthread_condattr_t attr; pthread_condattr_init(&attr); #ifndef DAP_OS_DARWIN pthread_condattr_setclock(&attr, CLOCK_MONOTONIC); #endif pthread_cond_init(&l_node_client->wait_cond, &attr); #else l_node_client->wait_cond = CreateEventA( NULL, FALSE, FALSE, NULL ); #endif pthread_mutex_init(&l_node_client->wait_mutex, NULL); l_node_client->remote_node_addr.uint64 = a_node_info->hdr.address.uint64; return l_node_client; } void s_client_delete_callback(UNUSED_ARG dap_client_t *a_client, void *a_arg) { // TODO make decision for possible client replacement assert(a_arg); ((dap_chain_node_client_t *)a_arg)->client = NULL; dap_chain_node_client_close_unsafe(a_arg); } /** * @brief dap_chain_node_client_connect * Create new dap_client, setup it, and send it in adventure trip * @param a_node_client dap_chain_node_client_t * @param a_active_channels a_active_channels * @return true * @return false */ bool dap_chain_node_client_connect(dap_chain_node_client_t *a_node_client, const char *a_active_channels) { if (!a_node_client) return false; a_node_client->client = dap_client_new(s_client_delete_callback, s_stage_status_error_callback, a_node_client); dap_client_set_is_always_reconnect(a_node_client->client, false); a_node_client->client->_inheritor = a_node_client; dap_client_set_active_channels_unsafe(a_node_client->client, a_active_channels); dap_client_set_auth_cert(a_node_client->client, a_node_client->net->pub.name); char l_host_addr[INET6_ADDRSTRLEN]; if(a_node_client->info->hdr.ext_addr_v4.s_addr){ struct sockaddr_in sa4 = { .sin_family = AF_INET, .sin_addr = a_node_client->info->hdr.ext_addr_v4 }; inet_ntop(AF_INET, &(((struct sockaddr_in *) &sa4)->sin_addr), l_host_addr, INET6_ADDRSTRLEN); } else { struct sockaddr_in6 sa6 = { .sin6_family = AF_INET6, .sin6_addr = a_node_client->info->hdr.ext_addr_v6 }; inet_ntop(AF_INET6, &(((struct sockaddr_in6 *) &sa6)->sin6_addr), l_host_addr, INET6_ADDRSTRLEN); } log_it(L_INFO, "Connecting to %s address", l_host_addr); // address not defined if(!strcmp(l_host_addr, "::")) { log_it(L_WARNING, "Undefined address with node client connect to"); return false; } int ret_code = 0; dap_client_set_uplink_unsafe(a_node_client->client, l_host_addr, a_node_client->info->hdr.ext_port); a_node_client->state = NODE_CLIENT_STATE_CONNECTING; // Handshake & connect dap_client_go_stage(a_node_client->client, STAGE_STREAM_STREAMING, s_stage_connected_callback); return true; } /** * @brief dap_chain_node_client_reset * * @param a_client dap_chain_node_client_t */ void dap_chain_node_client_reset(dap_chain_node_client_t *a_client) { if (a_client->state > NODE_CLIENT_STATE_ESTABLISHED) { a_client->state = NODE_CLIENT_STATE_ESTABLISHED; } } /** * @brief dap_chain_node_client_close * Close connection to server, delete chain_node_client_t *client * @param a_client dap_chain_node_client_t */ void dap_chain_node_client_close_unsafe(dap_chain_node_client_t *a_node_client) { char l_node_addr_str[INET_ADDRSTRLEN] = {}; inet_ntop(AF_INET, &a_node_client->info->hdr.ext_addr_v4, l_node_addr_str, INET_ADDRSTRLEN); log_it(L_INFO, "Closing node client to uplink %s:%d ["NODE_ADDR_FP_STR"]", l_node_addr_str, a_node_client->info->hdr.ext_port, NODE_ADDR_FP_ARGS_S(a_node_client->remote_node_addr)); if (a_node_client->sync_timer) dap_timerfd_delete_unsafe(a_node_client->sync_timer); if (a_node_client->reconnect_timer) dap_timerfd_delete_mt(a_node_client->reconnect_timer->worker, a_node_client->reconnect_timer->esocket_uuid); if (a_node_client->callbacks.delete) a_node_client->callbacks.delete(a_node_client, a_node_client->net); if (a_node_client->stream_worker) { dap_stream_ch_t *l_ch = dap_stream_ch_find_by_uuid_unsafe(a_node_client->stream_worker, a_node_client->ch_chain_uuid); if (l_ch) { dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); l_ch_chain->callback_notify_packet_in = NULL; l_ch_chain->callback_notify_packet_out = NULL; } l_ch = dap_stream_ch_find_by_uuid_unsafe(a_node_client->stream_worker, a_node_client->ch_chain_net_uuid); if (l_ch) { dap_stream_ch_chain_net_t *l_ch_chain_net = DAP_STREAM_CH_CHAIN_NET(l_ch); l_ch_chain_net->notify_callback = NULL; } } // clean client if(a_node_client->client){ a_node_client->client->delete_callback = NULL; dap_client_delete_unsafe(a_node_client->client); } #ifndef _WIN32 pthread_cond_destroy(&a_node_client->wait_cond); #else CloseHandle( a_node_client->wait_cond ); #endif pthread_mutex_destroy(&a_node_client->wait_mutex); DAP_DEL_Z(a_node_client->info); DAP_DELETE(a_node_client); } void s_close_on_worker_callback(UNUSED_ARG dap_worker_t *a_worker, void *a_arg) { assert(a_arg); dap_chain_node_client_close_unsafe(a_arg); } void dap_chain_node_client_close_mt(dap_chain_node_client_t *a_node_client) { if (a_node_client->client) dap_worker_exec_callback_on(DAP_CLIENT_PVT(a_node_client->client)->worker, s_close_on_worker_callback, a_node_client); else dap_chain_node_client_close_unsafe(a_node_client); } /** * @brief dap_chain_node_client_send_ch_pkt * Send stream request to server * @param a_client * @param a_ch_id * @param a_type * @param a_pkt_data * @param a_pkt_data_size * @return int */ int dap_chain_node_client_send_ch_pkt(dap_chain_node_client_t *a_client, uint8_t a_ch_id, uint8_t a_type, const void *a_pkt_data, size_t a_pkt_data_size) { if(!a_client || a_client->state < NODE_CLIENT_STATE_ESTABLISHED) return -1; dap_stream_worker_t *l_stream_worker = dap_client_get_stream_worker(a_client->client); dap_stream_ch_pkt_write_mt(l_stream_worker , a_client->ch_chain_uuid , a_type, a_pkt_data, a_pkt_data_size); return 0; } /** * @brief dap_chain_node_client_wait * wait for the complete of request * * timeout_ms timeout in milliseconds * waited_state state which we will wait, sample NODE_CLIENT_STATE_CONNECT or NODE_CLIENT_STATE_SENDED * @param a_client * @param a_waited_state * @param a_timeout_ms * @return int return -2 false, -1 timeout, 0 end of connection or sending data */ int dap_chain_node_client_wait(dap_chain_node_client_t *a_client, int a_waited_state, int a_timeout_ms) { int ret = -1; if(!a_client){ log_it(L_ERROR, "Can't wait for status for (null) object"); return -3; } a_client->keep_connection = false; pthread_mutex_lock(&a_client->wait_mutex); // have waited if(a_client->state == a_waited_state) { log_it(L_INFO, "We're already in state %s",dap_chain_node_client_state_to_str(a_client->state)); pthread_mutex_unlock(&a_client->wait_mutex); return 0; } if (a_client->state < NODE_CLIENT_STATE_ESTABLISHED && a_waited_state > NODE_CLIENT_STATE_ESTABLISHED) { log_it(L_WARNING, "Waited state can't be achieved"); pthread_mutex_unlock(&a_client->wait_mutex); return -2; } #ifndef DAP_OS_WINDOWS // prepare for signal waiting struct timespec l_cond_timeout; clock_gettime( CLOCK_MONOTONIC, &l_cond_timeout); l_cond_timeout.tv_sec += a_timeout_ms/1000; // signal waiting dap_chain_node_client_state_t l_clinet_state = a_client->state; while (a_client->state == l_clinet_state) { int l_ret_wait = pthread_cond_timedwait(&a_client->wait_cond, &a_client->wait_mutex, &l_cond_timeout); if(l_ret_wait == 0 && ( a_client->state == a_waited_state || (a_client->state == NODE_CLIENT_STATE_ERROR || a_client->state == NODE_CLIENT_STATE_DISCONNECTED)) ) { ret = a_client->state == a_waited_state ? 0 : -2; break; } else if(l_ret_wait == ETIMEDOUT) { // 110 260 //log_it(L_NOTICE,"Wait for status is stopped by timeout"); ret = -1; break; }else if (l_ret_wait != 0 ){ char l_errbuf[128]; l_errbuf[0] = '\0'; strerror_r(l_ret_wait,l_errbuf,sizeof (l_errbuf)); log_it(L_ERROR, "Pthread condition timed wait returned \"%s\"(code %d)", l_errbuf, l_ret_wait); } } pthread_mutex_unlock(&a_client->wait_mutex); #else pthread_mutex_unlock( &a_client->wait_mutex ); DWORD wait = WaitForSingleObject( a_client->wait_cond, (uint32_t)a_timeout_ms); if ( wait == WAIT_OBJECT_0 && ( a_client->state == a_waited_state || a_client->state == NODE_CLIENT_STATE_ERROR || a_client->state == NODE_CLIENT_STATE_DISCONNECTED)) { return a_client->state == a_waited_state ? 0 : -2; } else if ( wait == WAIT_TIMEOUT || wait == WAIT_FAILED ) { return -1; } #endif return ret; } /** * @brief s_node_client_set_notify_callbacks * * @param a_client dap_client_t * @param a_ch_id uint8_t * @return int */ static int s_node_client_set_notify_callbacks(dap_client_t *a_client, uint8_t a_ch_id) { //TODO pass callbacks through stream creation to internal ch structures int l_ret = -1; dap_chain_node_client_t *l_node_client = a_client->_inheritor; if(l_node_client) { pthread_mutex_lock(&l_node_client->wait_mutex); // find current channel code dap_stream_ch_t *l_ch = dap_client_get_stream_ch_unsafe(a_client, a_ch_id); if(l_ch) { // C if(a_ch_id == dap_stream_ch_chain_get_id()) { dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); l_ch_chain->callback_notify_packet_out = s_ch_chain_callback_notify_packet_out; l_ch_chain->callback_notify_packet_in = s_ch_chain_callback_notify_packet_in; l_ch_chain->callback_notify_arg = l_node_client; l_node_client->ch_chain = l_ch; l_node_client->ch_chain_uuid = l_ch->uuid; } // N if(a_ch_id == dap_stream_ch_chain_net_get_id()) { dap_stream_ch_chain_net_t *l_ch_chain = DAP_STREAM_CH_CHAIN_NET(l_ch); l_ch_chain->notify_callback = s_ch_chain_callback_notify_packet_in2; l_ch_chain->notify_callback_arg = l_node_client; l_node_client->ch_chain_net = l_ch; l_node_client->ch_chain_net_uuid = l_ch->uuid; } // R if(a_ch_id == dap_stream_ch_chain_net_srv_get_id()) { dap_stream_ch_chain_net_srv_t *l_ch_chain = DAP_STREAM_CH_CHAIN_NET_SRV(l_ch); if (l_node_client->notify_callbacks.srv_pkt_in) { l_ch_chain->notify_callback = (dap_stream_ch_chain_net_srv_callback_packet_t) l_node_client->notify_callbacks.srv_pkt_in; l_ch_chain->notify_callback_arg = l_node_client->callbacks_arg; } else { l_ch_chain->notify_callback = s_ch_chain_callback_notify_packet_R; l_ch_chain->notify_callback_arg = l_node_client; } l_node_client->ch_chain_net_srv = l_ch; l_node_client->ch_chain_net_srv_uuid = l_ch->uuid; } // V if ( a_ch_id == dap_stream_ch_chain_voting_get_id() ) { dap_stream_ch_chain_voting_t *l_ch_chain = DAP_STREAM_CH_CHAIN_VOTING(l_ch); // l_ch_chain->callback_notify = s_ch_chain_callback_notify_voting_packet_in; l_ch_chain->callback_notify_arg = l_node_client; l_node_client->ch_chain_net = l_ch; l_node_client->ch_chain_net_uuid = l_ch->uuid; } l_ret = 0; } else { } pthread_mutex_unlock(&l_node_client->wait_mutex); } return l_ret; } /** * @brief dap_chain_node_client_send_nodelist_req * Send nodelist request to server * @param a_client * @return int */ int dap_chain_node_client_send_nodelist_req(dap_chain_node_client_t *a_client) { if(!a_client || !a_client->client || a_client->state < NODE_CLIENT_STATE_ESTABLISHED) return -1; //dap_client_pvt_t * l_client_pvt = DAP_CLIENT_PVT(a_client->client); //TODO send request to get nodelist //dap_client_request_enc(a_client->client, DAP_UPLINK_PATH_NODE_LIST, "", "", "", 0, // nodelist_response_callback, nodelist_response_error_callback); return 1; }