From 69f2079057b2b5288a30a16df915d79c469ebd59 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Al=D0=B5x=D0=B0nder=20Lysik=D0=BEv?= <alexander.lysikov@demlabs.net> Date: Thu, 27 Jun 2019 22:23:37 +0500 Subject: [PATCH] modified sync order --- dap_chain_net.c | 122 +++++++++++++++++++++++++++++++++++++++--------- dap_chain_net.h | 4 +- 2 files changed, 101 insertions(+), 25 deletions(-) diff --git a/dap_chain_net.c b/dap_chain_net.c index 958f226588..7e0e47f621 100644 --- a/dap_chain_net.c +++ b/dap_chain_net.c @@ -25,6 +25,7 @@ #include <stddef.h> #include <string.h> #include <pthread.h> +#include <errno.h> #include "uthash.h" #include "utlist.h" @@ -32,6 +33,7 @@ #include "dap_common.h" #include "dap_string.h" #include "dap_strfuncs.h" +#include "dap_file_utils.h" #include "dap_config.h" #include "dap_hash.h" #include "dap_chain_net.h" @@ -77,6 +79,7 @@ typedef struct dap_chain_net_pvt{ dap_chain_node_info_t * node_info; // Current node's info dap_chain_node_client_t * links; + size_t link_cur; size_t links_count; dap_chain_node_addr_t *links_addrs; @@ -124,6 +127,8 @@ static int s_net_states_proc(dap_chain_net_t * l_net); static void * s_net_proc_thread ( void * a_net); static void s_net_proc_thread_start( dap_chain_net_t * a_net ); static void s_net_proc_kill( dap_chain_net_t * a_net ); +int s_net_load(const char * a_net_name); + static void s_gbd_history_callback_notify (void * a_arg,const char a_op_code, const char * a_prefix, const char * a_group, const char * a_key, const void * a_value, const size_t a_value_len); @@ -185,6 +190,7 @@ static void s_gbd_history_callback_notify (void * a_arg, const char a_op_code, c } + /** * @brief s_net_states_proc * @param l_net @@ -198,6 +204,9 @@ lb_proc_state: switch ( PVT(l_net)->state ){ case NET_STATE_OFFLINE:{ log_it(L_NOTICE,"%s.state: NET_STATE_OFFLINE",l_net->pub.name); + // reset current link + PVT(l_net)->link_cur = 0; + // delete all links dap_chain_node_client_t * l_node_client = NULL, *l_node_client_tmp = NULL; HASH_ITER(hh,PVT(l_net)->links,l_node_client,l_node_client_tmp){ HASH_DEL(PVT(l_net)->links, l_node_client); @@ -246,28 +255,33 @@ lb_proc_state: l_address.uint64 = dap_chain_net_get_cur_addr(l_net) ? dap_chain_net_get_cur_addr(l_net)->uint64 : dap_db_get_cur_node_addr(); + // get current node info dap_chain_node_info_t *l_cur_node_info = dap_chain_node_info_read(l_net, &l_address); - uint16_t l_links_addrs_count = l_cur_node_info->hdr.links_number + PVT(l_net)->seed_aliases_count; - PVT(l_net)->links_addrs = DAP_NEW_Z_SIZE(dap_chain_node_addr_t, - l_links_addrs_count * sizeof(dap_chain_node_addr_t)); - - // add linked nodes for connect - for(uint16_t i = 0; i < min(1, l_cur_node_info->hdr.links_number); i++) { - dap_chain_node_addr_t *l_addr = l_cur_node_info->links + i; - dap_chain_node_info_t *l_remore_node_info = dap_chain_node_info_read(l_net, l_addr); - // if only nodes from the same cell - if(l_cur_node_info->hdr.cell_id.uint64 == l_remore_node_info->hdr.cell_id.uint64) { - PVT(l_net)->links_addrs[PVT(l_net)->links_addrs_count].uint64 = - l_remore_node_info->hdr.address.uint64; - PVT(l_net)->links_addrs_count++; + if ( l_cur_node_info ) { + uint16_t l_links_addrs_count = l_cur_node_info->hdr.links_number + PVT(l_net)->seed_aliases_count; + PVT(l_net)->links_addrs = DAP_NEW_Z_SIZE(dap_chain_node_addr_t, + l_links_addrs_count * sizeof(dap_chain_node_addr_t)); + + // add linked nodes for connect + for(uint16_t i = 0; i < min(1, l_cur_node_info->hdr.links_number); i++) { + dap_chain_node_addr_t *l_addr = l_cur_node_info->links + i; + dap_chain_node_info_t *l_remore_node_info = dap_chain_node_info_read(l_net, l_addr); + // if only nodes from the same cell + if(l_cur_node_info->hdr.cell_id.uint64 == l_remore_node_info->hdr.cell_id.uint64) { + PVT(l_net)->links_addrs[PVT(l_net)->links_addrs_count].uint64 = + l_remore_node_info->hdr.address.uint64; + PVT(l_net)->links_addrs_count++; + } + DAP_DELETE(l_remore_node_info); } - DAP_DELETE(l_remore_node_info); } - // add root nodes for connect - if(!PVT(l_net)->links_addrs_count) + if(!PVT(l_net)->links_addrs_count){ + PVT(l_net)->links_addrs = DAP_NEW_Z_SIZE(dap_chain_node_addr_t, + min(1, PVT(l_net)->seed_aliases_count) * sizeof(dap_chain_node_addr_t)); + for(uint16_t i = 0; i < min(1, PVT(l_net)->seed_aliases_count); i++) { dap_chain_node_addr_t * l_node_addr = dap_chain_node_alias_find(l_net, PVT(l_net)->seed_aliases[i]); if(l_node_addr) { @@ -275,6 +289,7 @@ lb_proc_state: PVT(l_net)->links_addrs_count++; } } + } DAP_DELETE(l_cur_node_info); }else { // TODO read cell's nodelist and populate array with it @@ -297,7 +312,9 @@ lb_proc_state: case NET_STATE_LINKS_CONNECTING:{ log_it(L_NOTICE,"%s.state: NET_STATE_LINKS_CONNECTING",l_net->pub.name); size_t l_links_established = 0; - for (size_t i =0 ; i < PVT(l_net)->links_addrs_count ; i++ ){ + //for (size_t i =0 ; i < PVT(l_net)->links_addrs_count ; i++ ) + for (size_t i = PVT(l_net)->link_cur ; i < PVT(l_net)->links_addrs_count ; i++ ) + { log_it(L_INFO,"Establishing connection with " NODE_ADDR_FP_STR, NODE_ADDR_FP_ARGS_S( PVT(l_net)->links_addrs[i]) ); dap_chain_node_info_t *l_link_node_info = dap_chain_node_info_read(l_net, &PVT(l_net)->links_addrs[i] ); @@ -315,6 +332,8 @@ lb_proc_state: log_it(L_NOTICE, "Connected link %u",i); l_links_established++; HASH_ADD(hh,PVT(l_net)->links, remote_node_addr,sizeof(l_node_client->remote_node_addr), l_node_client); + PVT(l_net)->link_cur++; + break; }else { log_it(L_NOTICE, "Cant establish link %u",i); dap_chain_node_client_close(l_node_client); @@ -455,6 +474,7 @@ lb_proc_state: log_it(L_WARNING,"Can't send GDB sync request"); HASH_DEL(PVT(l_net)->links,l_node_client); dap_chain_node_client_close(l_node_client); + PVT(l_net)->state = NET_STATE_OFFLINE; continue; } @@ -518,6 +538,9 @@ lb_proc_state: } PVT(l_net)->state = NET_STATE_ONLINE; + // end sync, return to online state + if(PVT(l_net)->state_target > NET_STATE_ONLINE) + PVT(l_net)->state_target = NET_STATE_ONLINE; }pthread_mutex_unlock(&PVT(l_net)->state_mutex ); goto lb_proc_state; case NET_STATE_ONLINE: { @@ -568,10 +591,34 @@ static void * s_net_proc_thread ( void * a_net) else l_to.tv_nsec = (long) l_nsec_new; // signal waiting - pthread_cond_timedwait(&PVT(l_net)->state_proc_cond, &PVT(l_net)->state_mutex, &l_to); + int l_ret = pthread_cond_timedwait(&PVT(l_net)->state_proc_cond, &PVT(l_net)->state_mutex, &l_to); //pthread_cond_wait(&PVT(l_net)->state_proc_cond,&PVT(l_net)->state_mutex); pthread_mutex_unlock( &PVT(l_net)->state_mutex ); - log_it( L_DEBUG, "Waked up net proHASH_COUNT( c thread"); + // only check connection + if(l_ret==ETIMEDOUT){ + // send ping +/* dap_stream_ch_chain_net_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_PING, + l_ch_chain_net_pkt->hdr.net_id, NULL, 0); + + if(0 == dap_stream_ch_chain_pkt_write(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB, + l_net->pub.id, l_chain_id_null, l_chain_cell_id_null, &l_sync_request, + sizeof(l_sync_request))) { + dap_chain_node_cli_set_reply_text(a_str_reply, "Error: Cant send sync chains request"); + // clean client struct + dap_chain_node_client_close(l_node_client); + DAP_DELETE(l_remote_node_info); + return -1; + } + dap_stream_ch_set_ready_to_write(l_ch_chain, true); + // wait pong + timeout_ms = 120000; // 20 min = 1200 sec = 1 200 000 ms + res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_PONG, timeout_ms); + if(res) { + PVT(l_net)->state = NET_STATE_OFFLINE; + }*/ + + } + log_it( L_DEBUG, "Waked up net proHASH_COUNT( c thread, cond_wait=%d", l_ret); } return NULL; @@ -696,8 +743,33 @@ int dap_chain_net_init() dap_chain_global_db_add_history_group_prefix("global"); dap_chain_global_db_add_history_callback_notify("global", s_gbd_history_callback_notify, NULL ); + return 0; +} - return 0; +void dap_chain_net_load_all() +{ + char * l_net_dir_str = dap_strdup_printf("%s/network", dap_config_path()); + DIR * l_net_dir = opendir( l_net_dir_str); + if ( l_net_dir ){ + struct dirent * l_dir_entry; + while ( (l_dir_entry = readdir(l_net_dir) )!= NULL ){ + if (l_dir_entry->d_name[0]=='\0' || l_dir_entry->d_name[0]=='.') + continue; + // don't search in directories + char * l_full_path = dap_strdup_printf("%s/%s", l_net_dir_str, l_dir_entry->d_name); + if(dap_dir_test(l_full_path)) { + DAP_DELETE(l_full_path); + continue; + } + DAP_DELETE(l_full_path); + log_it(L_DEBUG,"Network config %s try to load", l_dir_entry->d_name); + char* l_dot_pos = rindex(l_dir_entry->d_name,'.'); + if ( l_dot_pos ) + *l_dot_pos = '\0'; + s_net_load(l_dir_entry->d_name ); + } + } + DAP_DELETE (l_net_dir_str); } /** @@ -869,8 +941,12 @@ static int s_cli_net(int argc, char ** argv, char **a_str_reply) return ret; } - -int dap_chain_net_load(const char * a_net_name) +/** + * @brief s_net_load + * @param a_net_name + * @return + */ +int s_net_load(const char * a_net_name) { static dap_config_t *l_cfg=NULL; dap_string_t *l_cfg_path = dap_string_new("network/"); @@ -1113,7 +1189,7 @@ int dap_chain_net_load(const char * a_net_name) } - if (s_seed_mode) { // If we seed we do everything manual. First think - prefil list of node_addrs and its aliases + if (s_seed_mode || !dap_config_get_item_bool_default(g_config ,"general", "auto_online",false ) ) { // If we seed we do everything manual. First think - prefil list of node_addrs and its aliases PVT(l_net)->state_target = NET_STATE_OFFLINE; } PVT(l_net)->load_mode = false; diff --git a/dap_chain_net.h b/dap_chain_net.h index 08ec015c8a..71258ab56e 100644 --- a/dap_chain_net.h +++ b/dap_chain_net.h @@ -67,7 +67,7 @@ typedef struct dap_chain_net{ int dap_chain_net_init(void); void dap_chain_net_deinit(void); -int dap_chain_net_load(const char * a_net_name); +void dap_chain_net_load_all(); int dap_chain_net_state_go_to(dap_chain_net_t * a_net, dap_chain_net_state_t a_new_state); @@ -76,7 +76,7 @@ inline static int dap_chain_net_stop(dap_chain_net_t * a_net) { return dap_chain inline static int dap_chain_net_links_establish(dap_chain_net_t * a_net) { return dap_chain_net_state_go_to(a_net,NET_STATE_LINKS_ESTABLISHED); } inline static int dap_chain_net_sync_chains(dap_chain_net_t * a_net) { return dap_chain_net_state_go_to(a_net,NET_STATE_SYNC_CHAINS); } inline static int dap_chain_net_sync_gdb(dap_chain_net_t * a_net) { return dap_chain_net_state_go_to(a_net,NET_STATE_SYNC_GDB); } -inline static int dap_chain_net_sync_all(dap_chain_net_t * a_net) { return dap_chain_net_state_go_to(a_net,NET_STATE_ONLINE); } +inline static int dap_chain_net_sync_all(dap_chain_net_t * a_net) { return dap_chain_net_state_go_to(a_net,NET_STATE_SYNC_GDB); }//NET_STATE_ONLINE void dap_chain_net_delete( dap_chain_net_t * a_net); void dap_chain_net_proc_datapool (dap_chain_net_t * a_net); -- GitLab