From e686fb7aa4c6ad664db1c7a9704692199f8e47c9 Mon Sep 17 00:00:00 2001 From: "Dmitriy A. Gerasimov" <dmitriy.gerasimov@demlabs.net> Date: Fri, 15 Jan 2021 01:00:34 +0700 Subject: [PATCH] ... --- modules/net/dap_chain_net.c | 434 ++++++++++-------- modules/net/dap_chain_node_cli_cmd.c | 4 +- modules/net/dap_chain_node_client.c | 19 +- modules/net/include/dap_chain_node_client.h | 17 +- .../service/vpn/dap_chain_net_vpn_client.c | 4 +- 5 files changed, 262 insertions(+), 216 deletions(-) diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index 2de8c47ba3..4173ca89b7 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -62,6 +62,10 @@ #include "dap_cert_file.h" #include "dap_timerfd.h" +#include "dap_stream_worker.h" +#include "dap_worker.h" +#include "dap_proc_queue.h" +#include "dap_proc_thread.h" #include "dap_enc_http.h" #include "dap_chain_common.h" @@ -136,8 +140,13 @@ typedef struct dap_chain_net_pvt{ dap_chain_node_addr_t * node_addr; dap_chain_node_info_t * node_info; // Current node's info + // Established links dap_list_t *links; // Links list + size_t links_count; + + // Prepared links dap_list_t *links_info; // Links info list + atomic_uint links_dns_requests; bool load_mode; @@ -202,6 +211,7 @@ static void s_node_link_callback_stage(dap_chain_node_client_t * a_node_client,d static void s_node_link_callback_error(dap_chain_node_client_t * a_node_client, int a_error, void * a_arg); static bool s_net_states_proc(dap_proc_thread_t *a_thread, void *a_arg); +static bool s_node_link_states_proc(dap_proc_thread_t *a_thread, void *a_arg); static void s_net_state_link_prepare_success(dap_worker_t * a_worker,dap_chain_node_info_t * a_node_info, void * a_arg); static void s_net_state_link_prepare_error(dap_worker_t * a_worker,dap_chain_node_info_t * a_node_info, void * a_arg, int a_errno); @@ -413,6 +423,23 @@ static void s_fill_links_from_root_aliases(dap_chain_net_t * a_net) static void s_node_link_callback_connected(dap_chain_node_client_t * a_node_client, void * a_arg) { dap_chain_net_t * l_net = (dap_chain_net_t *) a_arg; + dap_chain_net_pvt_t * l_net_pvt = PVT(l_net); + dap_chain_node_info_t * l_link_info = a_node_client->info; + + a_node_client->state = NODE_CLIENT_STATE_ESTABLISHED; + + log_it(L_DEBUG, "Established connection with "NODE_ADDR_FP_STR, NODE_ADDR_FP_ARGS_S(l_link_info->hdr.address)); + pthread_rwlock_wrlock(&l_net_pvt->rwlock); + l_net_pvt->links = dap_list_append(l_net_pvt->links, a_node_client); + l_net_pvt->links_count++; + size_t l_links_count = l_net_pvt->links_count; + + if(l_net_pvt->state == NET_STATE_LINKS_CONNECTING ) + l_net_pvt->state = NET_STATE_LINKS_ESTABLISHED; + pthread_rwlock_unlock(&l_net_pvt->rwlock); + + dap_proc_queue_add_callback_inter(dap_client_get_stream_worker(a_node_client->client)->worker->proc_queue_input, + s_node_link_states_proc, a_node_client); } /** @@ -447,6 +474,9 @@ static void s_node_link_callback_stage(dap_chain_node_client_t * a_node_client,d static void s_node_link_callback_error(dap_chain_node_client_t * a_node_client, int a_error, void * a_arg) { dap_chain_net_t * l_net = (dap_chain_net_t *) a_arg; + log_it(L_DEBUG, "Can't establish link with "NODE_ADDR_FP_STR, NODE_ADDR_FP_ARGS_S(l_link_info->hdr.address)); + dap_chain_node_client_close(l_node_client); + l_node_client = NULL; } @@ -500,6 +530,205 @@ static void s_net_state_link_prepare_error(dap_worker_t * a_worker,dap_chain_nod l_net_pvt->links_dns_requests--; } +/** + * @brief s_node_link_states_proc + * @param a_thread + * @param a_arg + * @return + */ +static bool s_node_link_states_proc(dap_proc_thread_t *a_thread, void *a_arg) +{ + bool l_repeate_after_exit = false; + dap_chain_node_client_t * l_node_client = (dap_chain_node_client_t*) a_arg; + assert(l_node_client); + + dap_chain_net_pvt_t * l_net_pvt = PVT(l_node_client->net); + switch (l_node_client->state) { + case NODE_CLIENT_STATE_ESTABLISHED: + if(l_node_client->sync_chains){ + l_node_client->state = NODE_CLIENT_STATE_SYNC_CHAINS; + l_repeate_after_exit = true; + } + if(l_node_client->sync_gdb){ + l_node_client->state = NODE_CLIENT_STATE_SYNC_GDB; + l_repeate_after_exit = true; + } + break; + case NODE_CLIENT_STATE_SYNC_GDB:{ + dap_stream_worker_t *l_worker = dap_client_get_stream_worker(l_node_client->client); + dap_stream_ch_t *l_ch_chain = dap_client_get_stream_ch_unsafe(l_node_client->client, dap_stream_ch_chain_get_id()); + if ( !l_ch_chain) { // Channel or stream or client itself closed + l_tmp = dap_list_next(l_tmp); + dap_chain_node_client_close(l_node_client); + l_net_pvt->links = dap_list_remove(l_net_pvt->links, l_node_client); + continue; + } + + dap_stream_ch_chain_sync_request_t l_sync_gdb = {}; + // Get last timestamp in log if wasn't SYNC_FROM_ZERO flag + if (! (l_net_pvt->flags & F_DAP_CHAIN_NET_SYNC_FROM_ZERO) ) + l_sync_gdb.id_start = (uint64_t) dap_db_get_last_id_remote(l_node_client->remote_node_addr.uint64); + l_sync_gdb.node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net); + log_it(L_DEBUG, "Prepared request to gdb sync from %llu to %llu", l_sync_gdb.id_start, l_sync_gdb.id_end?l_sync_gdb.id_end:-1 ); + // find dap_chain_id_t + dap_chain_t *l_chain = dap_chain_net_get_chain_by_name(l_net, "gdb"); + dap_chain_id_t l_chain_id = l_chain ? l_chain->id : (dap_chain_id_t ) {0}; + dap_chain_node_client_reset(l_node_client); + size_t l_res = dap_stream_ch_chain_pkt_write_mt(l_worker, l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB, l_net->pub.id, + l_chain_id, l_net->pub.cell_id, &l_sync_gdb, sizeof(l_sync_gdb)); + if (l_res == 0) { + log_it(L_WARNING, "Can't send GDB sync request"); + continue; + } + + // wait for finishing of request + int timeout_ms = 300000; // 5 min = 300 sec = 300 000 ms + // TODO add progress info to console + l_res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms); + switch (l_res) { + case -1: + log_it(L_WARNING, "Timeout with link sync gdb"); + break; + case 0: + log_it(L_INFO, "Node sync gdb completed"); + break; + default: + log_it(L_INFO, "Node sync gdb error %d",l_res); + } + + dap_chain_node_client_reset(l_node_client); + l_res = dap_stream_ch_chain_pkt_write_mt(l_worker, l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB_RVRS, l_net->pub.id, + l_chain_id, l_net->pub.cell_id, &l_sync_gdb, sizeof(l_sync_gdb)); + l_res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms); + switch (l_res) { + case -1: + log_it(L_WARNING, "Timeout with reverse link gdb sync"); + break; + case 0: + log_it(L_INFO, "Node reverse gdb sync completed"); + break; + default: + log_it(L_INFO, "Node reverse gdb sync error %d",l_res); + } + + // ----- + if (!l_net_pvt->links) { + l_net_pvt->state = NET_STATE_LINKS_PREPARE; + } else if (l_net_pvt->state_target >= NET_STATE_SYNC_CHAINS) { + l_net_pvt->state = NET_STATE_SYNC_CHAINS; + } else { // Synchronization done, go offline + log_it(L_INFO, "Synchronization done, go offline"); + l_net_pvt->flags ^= F_DAP_CHAIN_NET_GO_SYNC; + l_net_pvt->flags ^= F_DAP_CHAIN_NET_SYNC_FROM_ZERO; + l_net_pvt->state = l_net_pvt->state_target = NET_STATE_OFFLINE; + } + } break; + case NODE_CLIENT_STATE_SYNC_CHAINS:{ + dap_chain_node_client_t *l_node_client = (dap_chain_node_client_t *)l_tmp->data; + dap_stream_ch_t *l_ch_chain = dap_client_get_stream_ch_unsafe(l_node_client->client, dap_stream_ch_chain_get_id()); + if (!l_ch_chain) { // Channel or stream or client itself closed + l_tmp = dap_list_next(l_tmp); + dap_chain_node_client_close(l_node_client); + l_net_pvt->links = dap_list_remove(l_net_pvt->links, l_node_client); + continue; + } + dap_stream_worker_t *l_worker = dap_client_get_stream_worker(l_node_client->client); + dap_chain_t * l_chain = NULL; + int l_res = 0; + DL_FOREACH (l_net->pub.chains, l_chain) { + dap_chain_node_client_reset(l_node_client); + dap_stream_ch_chain_sync_request_t l_request = {0}; + + // TODO: Uncomment next block when finish with partial updates + /* + if (! (l_pvt_net->flags & F_DAP_CHAIN_NET_SYNC_FROM_ZERO) ) + dap_chain_get_atom_last_hash(l_chain,&l_request.hash_from); + */ + + if ( !dap_hash_fast_is_blank(&l_request.hash_from) ){ + if(dap_log_level_get() <= L_DEBUG){ + char l_hash_str[128]={[0]='\0'}; + dap_chain_hash_fast_to_str(&l_request.hash_from,l_hash_str,sizeof (l_hash_str)-1); + log_it(L_DEBUG,"Send sync chain request to"NODE_ADDR_FP_STR" for %s:%s from %s to infinity", + NODE_ADDR_FP_ARGS_S(l_node_client->remote_node_addr) ,l_net->pub.name, l_chain->name, l_hash_str); + } + }else + log_it(L_DEBUG,"Send sync chain request for all the chains for addr "NODE_ADDR_FP_STR, + NODE_ADDR_FP_ARGS_S(l_node_client->remote_node_addr)); + dap_stream_ch_chain_pkt_write_mt(l_worker, l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS, l_net->pub.id, + l_chain->id, l_net->pub.cell_id, &l_request, sizeof(l_request)); + // wait for finishing of request + int timeout_ms = 300000; // 5 min = 300 sec = 300 000 ms + // TODO add progress info to console + l_res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms); + switch (l_res) { + case -1: + //log_it(L_WARNING, "Timeout with sync of chain '%s' ", l_chain->name); + break; + case 0: + l_need_flush = true; + log_it(L_INFO, "Sync of chain '%s' completed ", l_chain->name); + break; + default: + log_it(L_ERROR, "Sync of chain '%s' error %d", l_chain->name,l_res); + } + + + dap_chain_node_client_reset(l_node_client); + + l_request.node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net); + dap_stream_ch_chain_pkt_write_mt(l_worker, l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS_RVRS, l_net->pub.id, + l_chain->id, l_net->pub.cell_id, &l_request, sizeof(l_request)); + l_res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms); + switch (l_res) { + case -1: + //log_it(L_WARNING, "Timeout with reverse sync of chain '%s' ", l_chain->name); + break; + case 0: + l_need_flush = true; + log_it(L_INFO, "Reverse sync of chain '%s' completed ", l_chain->name); + // set time of last sync + { + struct timespec l_to; + clock_gettime(CLOCK_MONOTONIC, &l_to); + l_net_pvt->last_sync = l_to.tv_sec; + } + break; + default: + log_it(L_ERROR, "Reverse sync of chain '%s' error %d", l_chain->name,l_res); + } + + } + + ///------------------- + if (l_need_flush) { + // flush global_db + dap_chain_global_db_flush(); + } + if (!l_net_pvt->links ) { + log_it( L_INFO,"Return back to state LINKS_PREPARE "); + l_net_pvt->state = NET_STATE_LINKS_PREPARE; + } else { + if (l_net_pvt->state_target == NET_STATE_ONLINE) { + l_net_pvt->flags ^= F_DAP_CHAIN_NET_GO_SYNC; + l_net_pvt->flags ^= F_DAP_CHAIN_NET_SYNC_FROM_ZERO; + l_net_pvt->state = NET_STATE_ONLINE; + log_it(L_INFO, "Synchronization done, status online"); + } else { // Synchronization done, go offline + l_net_pvt->state = l_net_pvt->state_target = NET_STATE_OFFLINE; + log_it(L_INFO, "Synchronization done, go offline"); + } + } + }break; + case NODE_CLIENT_STATE_SYNCED: + break; + default:{ + log_it(L_WARNING,"Non-processing node client state %d", l_node_client->state); + } + } + return l_repeate_after_exit; +} + /** * @brief s_net_states_proc * @param l_net @@ -563,6 +792,7 @@ static bool s_net_states_proc(dap_proc_thread_t *a_thread, void *a_arg) if (l_net_pvt->seed_aliases_count) { // Add other root nodes as synchronization links s_fill_links_from_root_aliases(l_net); + l_repeat_after_exit = true; break; } } @@ -620,8 +850,9 @@ static bool s_net_states_proc(dap_proc_thread_t *a_thread, void *a_arg) } l_tries++; } - if (l_sync_fill_root_nodes) + if (l_sync_fill_root_nodes){ s_fill_links_from_root_aliases(l_net); + } } break; } if (l_net_pvt->state_target != NET_STATE_OFFLINE) { @@ -636,215 +867,20 @@ static bool s_net_states_proc(dap_proc_thread_t *a_thread, void *a_arg) } else { l_net_pvt->state = NET_STATE_OFFLINE; } + l_repeat_after_exit = true; + } break; case NET_STATE_LINKS_CONNECTING: { log_it(L_DEBUG, "%s.state: NET_STATE_LINKS_CONNECTING",l_net->pub.name); for (dap_list_t *l_tmp = l_net_pvt->links_info; l_tmp; l_tmp = dap_list_next(l_tmp)) { dap_chain_node_info_t *l_link_info = (dap_chain_node_info_t *)l_tmp->data; - dap_chain_node_client_t *l_node_client = dap_chain_node_client_create_n_connect(l_link_info,"CN",s_node_link_callback_connected, + dap_chain_node_client_t *l_node_client = dap_chain_node_client_create_n_connect(l_net, l_link_info,"CN",s_node_link_callback_connected, s_node_link_callback_disconnected,s_node_link_callback_stage, s_node_link_callback_error,NULL); - if (l_node_client) { - // wait connected - int res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_CONNECTED, 20000 ); - if (res == 0 ) { - log_it(L_DEBUG, "Established connection with "NODE_ADDR_FP_STR, NODE_ADDR_FP_ARGS_S(l_link_info->hdr.address)); - l_net_pvt->links = dap_list_append(l_net_pvt->links, l_node_client); - } else { - log_it(L_DEBUG, "Can't establish link with "NODE_ADDR_FP_STR, NODE_ADDR_FP_ARGS_S(l_link_info->hdr.address)); - dap_chain_node_client_close(l_node_client); - l_node_client = NULL; - } - } - if (dap_list_length(l_net_pvt->links) >= s_required_links_count) { - break; - } - } - if (l_net_pvt->links) { // We have at least one working link - l_net_pvt->state = NET_STATE_SYNC_GDB; - } else { // Try to find another links - struct timespec l_sleep = {3, 0}; - nanosleep(&l_sleep, NULL); - l_net_pvt->state = NET_STATE_OFFLINE; } } break; - case NET_STATE_SYNC_GDB:{ - for (dap_list_t *l_tmp = l_net_pvt->links; l_tmp; ) { - dap_chain_node_client_t *l_node_client = (dap_chain_node_client_t *)l_tmp->data; - dap_stream_worker_t *l_worker = dap_client_get_stream_worker(l_node_client->client); - dap_stream_ch_t *l_ch_chain = dap_client_get_stream_ch_unsafe(l_node_client->client, dap_stream_ch_chain_get_id()); - if ( !l_ch_chain) { // Channel or stream or client itself closed - l_tmp = dap_list_next(l_tmp); - dap_chain_node_client_close(l_node_client); - l_net_pvt->links = dap_list_remove(l_net_pvt->links, l_node_client); - continue; - } - - dap_stream_ch_chain_sync_request_t l_sync_gdb = {}; - // Get last timestamp in log if wasn't SYNC_FROM_ZERO flag - if (! (l_net_pvt->flags & F_DAP_CHAIN_NET_SYNC_FROM_ZERO) ) - l_sync_gdb.id_start = (uint64_t) dap_db_get_last_id_remote(l_node_client->remote_node_addr.uint64); - l_sync_gdb.node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net); - log_it(L_DEBUG, "Prepared request to gdb sync from %llu to %llu", l_sync_gdb.id_start, l_sync_gdb.id_end?l_sync_gdb.id_end:-1 ); - // find dap_chain_id_t - dap_chain_t *l_chain = dap_chain_net_get_chain_by_name(l_net, "gdb"); - dap_chain_id_t l_chain_id = l_chain ? l_chain->id : (dap_chain_id_t ) {0}; - dap_chain_node_client_reset(l_node_client); - size_t l_res = dap_stream_ch_chain_pkt_write_mt(l_worker, l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB, l_net->pub.id, - l_chain_id, l_net->pub.cell_id, &l_sync_gdb, sizeof(l_sync_gdb)); - if (l_res == 0) { - log_it(L_WARNING, "Can't send GDB sync request"); - continue; - } - - // wait for finishing of request - int timeout_ms = 300000; // 5 min = 300 sec = 300 000 ms - // TODO add progress info to console - l_res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms); - switch (l_res) { - case -1: - log_it(L_WARNING, "Timeout with link sync gdb"); - break; - case 0: - log_it(L_INFO, "Node sync gdb completed"); - break; - default: - log_it(L_INFO, "Node sync gdb error %d",l_res); - } - - dap_chain_node_client_reset(l_node_client); - l_res = dap_stream_ch_chain_pkt_write_mt(l_worker, l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB_RVRS, l_net->pub.id, - l_chain_id, l_net->pub.cell_id, &l_sync_gdb, sizeof(l_sync_gdb)); - l_res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms); - switch (l_res) { - case -1: - log_it(L_WARNING, "Timeout with reverse link gdb sync"); - break; - case 0: - log_it(L_INFO, "Node reverse gdb sync completed"); - break; - default: - log_it(L_INFO, "Node reverse gdb sync error %d",l_res); - } - - l_tmp = dap_list_next(l_tmp); - } - if (!l_net_pvt->links) { - l_net_pvt->state = NET_STATE_LINKS_PREPARE; - } else if (l_net_pvt->state_target >= NET_STATE_SYNC_CHAINS) { - l_net_pvt->state = NET_STATE_SYNC_CHAINS; - } else { // Synchronization done, go offline - log_it(L_INFO, "Synchronization done, go offline"); - l_net_pvt->flags ^= F_DAP_CHAIN_NET_GO_SYNC; - l_net_pvt->flags ^= F_DAP_CHAIN_NET_SYNC_FROM_ZERO; - l_net_pvt->state = l_net_pvt->state_target = NET_STATE_OFFLINE; - } - } - break; - - case NET_STATE_SYNC_CHAINS: { - bool l_need_flush = false; - for (dap_list_t *l_tmp = l_net_pvt->links; l_tmp; l_tmp = dap_list_next(l_tmp)) { - dap_chain_node_client_t *l_node_client = (dap_chain_node_client_t *)l_tmp->data; - dap_stream_ch_t *l_ch_chain = dap_client_get_stream_ch_unsafe(l_node_client->client, dap_stream_ch_chain_get_id()); - if (!l_ch_chain) { // Channel or stream or client itself closed - l_tmp = dap_list_next(l_tmp); - dap_chain_node_client_close(l_node_client); - l_net_pvt->links = dap_list_remove(l_net_pvt->links, l_node_client); - continue; - } - dap_stream_worker_t *l_worker = dap_client_get_stream_worker(l_node_client->client); - dap_chain_t * l_chain = NULL; - int l_res = 0; - DL_FOREACH (l_net->pub.chains, l_chain) { - dap_chain_node_client_reset(l_node_client); - dap_stream_ch_chain_sync_request_t l_request = {0}; - - // TODO: Uncomment next block when finish with partial updates - /* - if (! (l_pvt_net->flags & F_DAP_CHAIN_NET_SYNC_FROM_ZERO) ) - dap_chain_get_atom_last_hash(l_chain,&l_request.hash_from); - */ - - if ( !dap_hash_fast_is_blank(&l_request.hash_from) ){ - if(dap_log_level_get() <= L_DEBUG){ - char l_hash_str[128]={[0]='\0'}; - dap_chain_hash_fast_to_str(&l_request.hash_from,l_hash_str,sizeof (l_hash_str)-1); - log_it(L_DEBUG,"Send sync chain request to"NODE_ADDR_FP_STR" for %s:%s from %s to infinity", - NODE_ADDR_FP_ARGS_S(l_node_client->remote_node_addr) ,l_net->pub.name, l_chain->name, l_hash_str); - } - }else - log_it(L_DEBUG,"Send sync chain request for all the chains for addr "NODE_ADDR_FP_STR, - NODE_ADDR_FP_ARGS_S(l_node_client->remote_node_addr)); - dap_stream_ch_chain_pkt_write_mt(l_worker, l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS, l_net->pub.id, - l_chain->id, l_net->pub.cell_id, &l_request, sizeof(l_request)); - // wait for finishing of request - int timeout_ms = 300000; // 5 min = 300 sec = 300 000 ms - // TODO add progress info to console - l_res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms); - switch (l_res) { - case -1: - //log_it(L_WARNING, "Timeout with sync of chain '%s' ", l_chain->name); - break; - case 0: - l_need_flush = true; - log_it(L_INFO, "Sync of chain '%s' completed ", l_chain->name); - break; - default: - log_it(L_ERROR, "Sync of chain '%s' error %d", l_chain->name,l_res); - } - - - dap_chain_node_client_reset(l_node_client); - - l_request.node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net); - dap_stream_ch_chain_pkt_write_mt(l_worker, l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS_RVRS, l_net->pub.id, - l_chain->id, l_net->pub.cell_id, &l_request, sizeof(l_request)); - l_res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms); - switch (l_res) { - case -1: - //log_it(L_WARNING, "Timeout with reverse sync of chain '%s' ", l_chain->name); - break; - case 0: - l_need_flush = true; - log_it(L_INFO, "Reverse sync of chain '%s' completed ", l_chain->name); - // set time of last sync - { - struct timespec l_to; - clock_gettime(CLOCK_MONOTONIC, &l_to); - l_net_pvt->last_sync = l_to.tv_sec; - } - break; - default: - log_it(L_ERROR, "Reverse sync of chain '%s' error %d", l_chain->name,l_res); - } - - } - l_tmp = dap_list_next(l_tmp); - } - if (l_need_flush) { - // flush global_db - dap_chain_global_db_flush(); - } - if (!l_net_pvt->links ) { - log_it( L_INFO,"Return back to state LINKS_PREPARE "); - l_net_pvt->state = NET_STATE_LINKS_PREPARE; - } else { - if (l_net_pvt->state_target == NET_STATE_ONLINE) { - l_net_pvt->flags ^= F_DAP_CHAIN_NET_GO_SYNC; - l_net_pvt->flags ^= F_DAP_CHAIN_NET_SYNC_FROM_ZERO; - l_net_pvt->state = NET_STATE_ONLINE; - log_it(L_INFO, "Synchronization done, status online"); - } else { // Synchronization done, go offline - l_net_pvt->state = l_net_pvt->state_target = NET_STATE_OFFLINE; - log_it(L_INFO, "Synchronization done, go offline"); - } - } - } - break; - case NET_STATE_ONLINE: { if (l_net_pvt->flags & F_DAP_CHAIN_NET_GO_SYNC) { diff --git a/modules/net/dap_chain_node_cli_cmd.c b/modules/net/dap_chain_node_cli_cmd.c index fa6935d9a4..7535458cd4 100644 --- a/modules/net/dap_chain_node_cli_cmd.c +++ b/modules/net/dap_chain_node_cli_cmd.c @@ -1002,7 +1002,7 @@ int com_node(int a_argc, char ** a_argv, void *arg_func, char **a_str_reply) } // wait connected int timeout_ms = 7000; // 7 sec = 7000 ms - res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_CONNECTED, timeout_ms); + res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_ESTABLISHED, timeout_ms); // select new node addr if(l_is_auto && res){ if(l_remote_node_addr && l_nodes_count>1){ @@ -1228,7 +1228,7 @@ int com_node(int a_argc, char ** a_argv, void *arg_func, char **a_str_reply) return -7; } // wait handshake - int res = dap_chain_node_client_wait(client, NODE_CLIENT_STATE_CONNECTED, timeout_ms); + int res = dap_chain_node_client_wait(client, NODE_CLIENT_STATE_ESTABLISHED, timeout_ms); if(res != 1) { dap_chain_node_cli_set_reply_text(a_str_reply, "no response from node"); // clean client struct diff --git a/modules/net/dap_chain_node_client.c b/modules/net/dap_chain_node_client.c index c5b1d9da96..c2d34c109a 100644 --- a/modules/net/dap_chain_node_client.c +++ b/modules/net/dap_chain_node_client.c @@ -182,7 +182,7 @@ static void s_stage_connected_callback(dap_client_t *a_client, void *a_arg) l_node_client->callback_connected(l_node_client, a_arg); l_node_client->keep_connection = true; log_it(L_DEBUG, "Wakeup all who waits"); - l_node_client->state = NODE_CLIENT_STATE_CONNECTED; + l_node_client->state = NODE_CLIENT_STATE_ESTABLISHED; #ifndef _WIN32 pthread_cond_broadcast(&l_node_client->wait_cond); #else @@ -412,6 +412,7 @@ dap_chain_node_client_t* dap_chain_client_connect(dap_chain_node_info_t *a_node_ /** * @brief dap_chain_node_client_go_stage + * @param a_net * @param a_node_info * @param a_active_channels * @param a_callback_connected @@ -421,7 +422,7 @@ dap_chain_node_client_t* dap_chain_client_connect(dap_chain_node_info_t *a_node_ * @param a_callback_arg * @return */ -dap_chain_node_client_t* dap_chain_node_client_create_n_connect(dap_chain_node_info_t *a_node_info, +dap_chain_node_client_t* dap_chain_node_client_create_n_connect(dap_chain_net_t * a_net, dap_chain_node_info_t *a_node_info, const char *a_active_channels, dap_chain_node_client_callback_t a_callback_connected, dap_chain_node_client_callback_t a_callback_disconnected, dap_chain_node_client_callback_stage_t a_callback_stage, dap_chain_node_client_callback_error_t a_callback_error, void * a_callback_arg ) @@ -437,6 +438,8 @@ dap_chain_node_client_t* dap_chain_node_client_create_n_connect(dap_chain_node_i l_node_client->callback_discconnected = a_callback_disconnected; l_node_client->callback_error = a_callback_error; l_node_client->callback_stage = a_callback_stage; + l_node_client->info = a_node_info; + l_node_client->net = a_net; #ifndef _WIN32 pthread_condattr_t attr; @@ -479,7 +482,7 @@ dap_chain_node_client_t* dap_chain_node_client_create_n_connect(dap_chain_node_i // dap_client_stage_t a_stage_target = STAGE_ENC_INIT; // dap_client_stage_t l_stage_target = STAGE_STREAM_STREAMING; - l_node_client->state = NODE_CLIENT_STATE_CONNECT; + l_node_client->state = NODE_CLIENT_STATE_CONNECTING ; // ref pvt client //dap_client_pvt_ref(DAP_CLIENT_PVT(l_node_client->client)); // Handshake & connect @@ -500,8 +503,8 @@ dap_chain_node_client_t* dap_chain_node_client_connect(dap_chain_node_info_t *a_ void dap_chain_node_client_reset(dap_chain_node_client_t *a_client) { - if (a_client->state > NODE_CLIENT_STATE_CONNECTED) { - a_client->state = NODE_CLIENT_STATE_CONNECTED; + if (a_client->state > NODE_CLIENT_STATE_ESTABLISHED) { + a_client->state = NODE_CLIENT_STATE_ESTABLISHED; } } @@ -530,7 +533,7 @@ void dap_chain_node_client_close(dap_chain_node_client_t *a_client) int dap_chain_node_client_send_ch_pkt(dap_chain_node_client_t *a_client, uint8_t a_ch_id, uint8_t a_type, const void *a_pkt_data, size_t a_pkt_data_size) { - if(!a_client || a_client->state < NODE_CLIENT_STATE_CONNECTED) + if(!a_client || a_client->state < NODE_CLIENT_STATE_ESTABLISHED) return -1; dap_stream_worker_t *l_stream_worker = dap_client_get_stream_worker(a_client->client); @@ -566,7 +569,7 @@ int dap_chain_node_client_wait(dap_chain_node_client_t *a_client, int a_waited_s return 0; } - if (a_client->state < NODE_CLIENT_STATE_CONNECTED && a_waited_state > NODE_CLIENT_STATE_CONNECTED) { + if (a_client->state < NODE_CLIENT_STATE_ESTABLISHED && a_waited_state > NODE_CLIENT_STATE_ESTABLISHED) { log_it(L_WARNING, "Waited state can't be achieved"); pthread_mutex_unlock(&a_client->wait_mutex); return -2; @@ -676,7 +679,7 @@ static void nodelist_response_error_callback(dap_client_t *a_client, int a_err) */ int dap_chain_node_client_send_nodelist_req(dap_chain_node_client_t *a_client) { - if(!a_client || !a_client->client || a_client->state < NODE_CLIENT_STATE_CONNECTED) + if(!a_client || !a_client->client || a_client->state < NODE_CLIENT_STATE_ESTABLISHED) return -1; //dap_client_pvt_t * l_client_pvt = DAP_CLIENT_PVT(a_client->client); diff --git a/modules/net/include/dap_chain_node_client.h b/modules/net/include/dap_chain_node_client.h index aedb408b74..8a4d01e127 100644 --- a/modules/net/include/dap_chain_node_client.h +++ b/modules/net/include/dap_chain_node_client.h @@ -37,8 +37,8 @@ typedef enum dap_chain_node_client_state { NODE_CLIENT_STATE_NODE_ADDR_LEASED = 2, NODE_CLIENT_STATE_PING = 3, NODE_CLIENT_STATE_PONG = 4, - NODE_CLIENT_STATE_CONNECT = 5, - NODE_CLIENT_STATE_CONNECTED = 100, + NODE_CLIENT_STATE_CONNECTING = 5, + NODE_CLIENT_STATE_ESTABLISHED = 100, //NODE_CLIENT_STATE_SEND, //NODE_CLIENT_STATE_SENDED, NODE_CLIENT_STATE_SYNC_GDB = 101, @@ -56,9 +56,16 @@ typedef void (*dap_chain_node_client_callback_error_t)(dap_chain_node_client_t * // state for a client connection typedef struct dap_chain_node_client { dap_chain_node_client_state_t state; + + bool sync_gdb; + bool sync_chains; + dap_chain_cell_id_t cell_id; dap_client_t *client; + dap_chain_node_info_t * info; dap_events_t *events; + + dap_chain_net_t * net; char last_error[128]; #ifndef _WIN32 @@ -91,7 +98,7 @@ int dap_chain_node_client_init(void); void dap_chain_node_client_deinit(void); -dap_chain_node_client_t* dap_chain_node_client_create_n_connect(dap_chain_node_info_t *a_node_info, const char *a_active_channels, +dap_chain_node_client_t* dap_chain_node_client_create_n_connect(dap_chain_net_t * a_net, dap_chain_node_info_t *a_node_info, const char *a_active_channels, dap_chain_node_client_callback_t a_callback_connected, dap_chain_node_client_callback_t a_callback_disconnected, dap_chain_node_client_callback_stage_t a_callback_stage, @@ -145,8 +152,8 @@ static inline const char * dap_chain_node_client_state_to_str( dap_chain_node_cl case NODE_CLIENT_STATE_NODE_ADDR_LEASED: return "NODE_ADDR_LEASED"; case NODE_CLIENT_STATE_PING: return "PING"; case NODE_CLIENT_STATE_PONG: return "PONG"; - case NODE_CLIENT_STATE_CONNECT: return "CONNECT"; - case NODE_CLIENT_STATE_CONNECTED: return "CONNECTED"; + case NODE_CLIENT_STATE_CONNECTING: return "CONNECT"; + case NODE_CLIENT_STATE_ESTABLISHED: return "CONNECTED"; case NODE_CLIENT_STATE_SYNC_GDB: return "SYNC_GDB"; case NODE_CLIENT_STATE_SYNC_CHAINS: return "SYNC_CHAINS"; case NODE_CLIENT_STATE_SYNCED: return "SYNCED"; diff --git a/modules/service/vpn/dap_chain_net_vpn_client.c b/modules/service/vpn/dap_chain_net_vpn_client.c index d60335d635..27d606e7f1 100644 --- a/modules/service/vpn/dap_chain_net_vpn_client.c +++ b/modules/service/vpn/dap_chain_net_vpn_client.c @@ -448,7 +448,7 @@ int dap_chain_net_vpn_client_check(dap_chain_net_t *a_net, const char *a_ipv4_st } // wait connected int l_timeout_ms = l_timeout_conn_ms; //5 sec = 5000 ms - int l_res = dap_chain_node_client_wait(s_vpn_client, NODE_CLIENT_STATE_CONNECTED, l_timeout_ms); + int l_res = dap_chain_node_client_wait(s_vpn_client, NODE_CLIENT_STATE_ESTABLISHED, l_timeout_ms); if(l_res) { log_it(L_ERROR, "No response from VPN server=%s:%d", a_ipv4_str, a_port); // clean client struct @@ -543,7 +543,7 @@ int dap_chain_net_vpn_client_start(dap_chain_net_t *a_net, const char *a_ipv4_st } // wait connected int timeout_ms = 5000; //5 sec = 5000 ms - int res = dap_chain_node_client_wait(s_vpn_client, NODE_CLIENT_STATE_CONNECTED, timeout_ms); + int res = dap_chain_node_client_wait(s_vpn_client, NODE_CLIENT_STATE_ESTABLISHED, timeout_ms); if(res) { log_it(L_ERROR, "No response from VPN server=%s:%d", a_ipv4_str, a_port); // clean client struct -- GitLab