diff --git a/dap_chain_net.c b/dap_chain_net.c index cc1da6e83c6959c3a5f217a32bd21ee0cf640254..29140ebb6ea240611fbd5826f0284eead3f4d416 100644 --- a/dap_chain_net.c +++ b/dap_chain_net.c @@ -107,8 +107,6 @@ typedef struct dap_chain_net_pvt{ #else HANDLE state_proc_cond; #endif - pthread_mutex_t state_mutex_cond; - pthread_mutex_t state_mutex; dap_chain_node_role_t node_role; uint32_t flags; time_t last_sync; @@ -137,6 +135,7 @@ typedef struct dap_chain_net_pvt{ dap_chain_net_state_t state; dap_chain_net_state_t state_target; + dap_chain_net_state_t state_new; } dap_chain_net_pvt_t; typedef struct dap_chain_net_item{ @@ -211,22 +210,10 @@ inline static const char * s_net_state_to_str(dap_chain_net_state_t l_state) */ int dap_chain_net_state_go_to(dap_chain_net_t * a_net, dap_chain_net_state_t a_new_state) { - pthread_mutex_lock( &PVT(a_net)->state_mutex_cond); - if (PVT(a_net)->state_target == a_new_state){ + if (PVT(a_net)->state_new == a_new_state){ log_it(L_WARNING,"Already going to state %s",s_net_state_to_str(a_new_state)); } - if(pthread_mutex_trylock(&PVT(a_net)->state_mutex) == 0) { - PVT(a_net)->state_target = a_new_state; - pthread_mutex_unlock(&PVT(a_net)->state_mutex); - } - // set flag for sync - PVT(a_net)->flags |= F_DAP_CHAIN_NET_GO_SYNC; -#ifndef _WIN32 - pthread_cond_signal( &PVT(a_net)->state_proc_cond ); -#else - SetEvent( PVT(a_net)->state_proc_cond ); -#endif - pthread_mutex_unlock( &PVT(a_net)->state_mutex_cond); + PVT(a_net)->state_new = a_new_state; return 0; } @@ -285,18 +272,13 @@ static int s_net_states_proc(dap_chain_net_t * l_net) #endif int ret=0; - pthread_mutex_lock(&PVT(l_net)->state_mutex ); switch ( PVT(l_net)->state ){ case NET_STATE_OFFLINE:{ - //log_it(L_DEBUG,"%s.state: NET_STATE_OFFLINE",l_net->pub.name); + PVT(l_net)->state_target = PVT(l_net)->state_new; // reset current link PVT(l_net)->links_count = 0; // delete all links - dap_chain_node_client_t * l_node_client = NULL, *l_node_client_tmp = NULL; - HASH_ITER(hh,PVT(l_net)->links,l_node_client,l_node_client_tmp){ - HASH_DEL(PVT(l_net)->links, l_node_client); - dap_chain_node_client_close(l_node_client); - } + dap_chain_node_client_close(PVT(l_net)->links); PVT(l_net)->links_addrs_count = 0; if ( PVT(l_net)->links_addrs ) DAP_DELETE(PVT(l_net)->links_addrs); @@ -307,8 +289,8 @@ static int s_net_states_proc(dap_chain_net_t * l_net) break; } // disable SYNC_GDB - if(PVT(l_net)->flags & F_DAP_CHAIN_NET_GO_SYNC) - PVT(l_net)->flags &= ~F_DAP_CHAIN_NET_GO_SYNC; + PVT(l_net)->flags &= ~F_DAP_CHAIN_NET_GO_SYNC; + PVT(l_net)->last_sync = 0; } break; case NET_STATE_LINKS_PREPARE:{ log_it(L_NOTICE,"%s.state: NET_STATE_LINKS_PREPARE",l_net->pub.name); @@ -658,7 +640,7 @@ static int s_net_states_proc(dap_chain_net_t * l_net) if (l_res) { // try another link break; } - if(PVT(l_net)->state_target >= NET_STATE_ONLINE){ + if(PVT(l_net)->state_target >= NET_STATE_SYNC_CHAINS){ PVT(l_net)->state = NET_STATE_SYNC_CHAINS; } else { PVT(l_net)->state = NET_STATE_ONLINE; @@ -728,18 +710,14 @@ static int s_net_states_proc(dap_chain_net_t * l_net) PVT(l_net)->state = NET_STATE_LINKS_CONNECTING; break; } + log_it(L_INFO, "Synchronization done"); PVT(l_net)->state = NET_STATE_ONLINE; PVT(l_net)->links_count = 0; - // end sync, go to offline state - // PVT(l_net)->state_target = NET_STATE_OFFLINE; - // end sync, return to online state - //if(PVT(l_net)->state_target > NET_STATE_ONLINE) - // PVT(l_net)->state_target = NET_STATE_ONLINE; } break; case NET_STATE_ONLINE: { - //log_it(L_NOTICE, "State online"); + PVT(l_net)->state_target = PVT(l_net)->state_new; switch ( PVT(l_net)->state_target) { // disconnect case NET_STATE_OFFLINE: @@ -747,16 +725,11 @@ static int s_net_states_proc(dap_chain_net_t * l_net) log_it(L_NOTICE, "Going to disconnet"); break; case NET_STATE_ONLINE: - // if flag set then go to SYNC_GDB - if(PVT(l_net)->flags & F_DAP_CHAIN_NET_GO_SYNC) - PVT(l_net)->flags &= ~F_DAP_CHAIN_NET_GO_SYNC; - else + if((PVT(l_net)->flags & F_DAP_CHAIN_NET_GO_SYNC) == 0) break; - // sync case NET_STATE_SYNC_GDB: + case NET_STATE_SYNC_CHAINS: // if flag set then go to SYNC_GDB - if(PVT(l_net)->flags & F_DAP_CHAIN_NET_GO_SYNC) - PVT(l_net)->flags &= ~F_DAP_CHAIN_NET_GO_SYNC; PVT(l_net)->state = NET_STATE_LINKS_CONNECTING; break; default: break; @@ -765,7 +738,6 @@ static int s_net_states_proc(dap_chain_net_t * l_net) break; default: log_it (L_DEBUG, "Unprocessed state"); } - pthread_mutex_unlock(&PVT(l_net)->state_mutex ); return ret; } @@ -792,52 +764,20 @@ static void *s_net_proc_thread ( void *a_net ) dap_chain_net_t *l_net = (dap_chain_net_t *)a_net; dap_chain_net_pvt_t *p_net = (dap_chain_net_pvt_t *)(void *)l_net->pvt; - const uint64_t l_timeout_ms = 60000;// 60 sec - // set callback to update data //dap_chain_global_db_set_callback_for_update_base(s_net_proc_thread_callback_update_db); while( !(p_net->flags & F_DAP_CHAIN_NET_SHUTDOWN) ) { - // check or start sync s_net_states_proc( l_net ); - - - // wait if flag not set then go to SYNC_GDB - if(!(PVT(l_net)->flags & F_DAP_CHAIN_NET_GO_SYNC)) { - struct timespec l_to; -#ifndef _WIN32 - pthread_mutex_lock( &p_net->state_mutex_cond ); - // prepare for signal waiting - - clock_gettime( CLOCK_MONOTONIC, &l_to ); - int64_t l_nsec_new = l_to.tv_nsec + l_timeout_ms * 1000000ll; - - // if the new number of nanoseconds is more than a second - if(l_nsec_new > (long) 1e9) { - l_to.tv_sec += l_nsec_new / (long) 1e9; - l_to.tv_nsec = l_nsec_new % (long) 1e9; - } - else - l_to.tv_nsec = (long) l_nsec_new; - - // signal waiting - pthread_cond_timedwait( &p_net->state_proc_cond, &p_net->state_mutex_cond, &l_to ); - pthread_mutex_unlock(&p_net->state_mutex_cond); -#else // WIN32 - - WaitForSingleObject( p_net->state_proc_cond, (uint32_t)l_timeout_ms ); - -#endif - // checking whether new sync is needed - time_t l_sync_timeout = 300; // 300 sec = 5 min - clock_gettime(CLOCK_MONOTONIC, &l_to); - // start sync every l_sync_timeout sec - if(l_to.tv_sec >= p_net->last_sync + l_sync_timeout) { - p_net->flags |= F_DAP_CHAIN_NET_GO_SYNC; - } + // checking whether new sync is needed + struct timespec l_to; + time_t l_sync_timeout = 300; // 300 sec = 5 min + clock_gettime(CLOCK_MONOTONIC, &l_to); + // start sync every l_sync_timeout sec + if(p_net->last_sync && l_to.tv_sec >= p_net->last_sync + l_sync_timeout) { + p_net->flags |= F_DAP_CHAIN_NET_GO_SYNC; } - //log_it( L_DEBUG, "Waked up s_net_proc_thread( )" ); } return NULL; @@ -900,9 +840,6 @@ static dap_chain_net_t *s_net_new(const char * a_id, const char * a_name , dap_chain_net_t *ret = DAP_NEW_Z_SIZE( dap_chain_net_t, sizeof(ret->pub) + sizeof(dap_chain_net_pvt_t) ); ret->pub.name = strdup( a_name ); - pthread_mutex_init( &PVT(ret)->state_mutex, NULL ); - pthread_mutex_init( &PVT(ret)->state_mutex_cond, NULL ); - #ifndef _WIN32 pthread_condattr_t l_attr; pthread_condattr_init( &l_attr ); @@ -1156,12 +1093,12 @@ static int s_cli_net( int argc, char **argv, void *arg_func, char **a_str_reply) dap_chain_net_state_go_to(l_net, NET_STATE_ONLINE); dap_chain_node_cli_set_reply_text(a_str_reply, "Network \"%s\" go from state %s to %s", l_net->pub.name,c_net_states[PVT(l_net)->state], - c_net_states[PVT(l_net)->state_target]); + c_net_states[PVT(l_net)->state_new]); } else if ( strcmp(l_go_str,"offline") == 0 ) { dap_chain_net_state_go_to(l_net, NET_STATE_OFFLINE); dap_chain_node_cli_set_reply_text(a_str_reply, "Network \"%s\" go from state %s to %s", l_net->pub.name,c_net_states[PVT(l_net)->state], - c_net_states[PVT(l_net)->state_target]); + c_net_states[PVT(l_net)->state_new]); } else if(strcmp(l_go_str, "sync") == 0) { @@ -1181,7 +1118,7 @@ static int s_cli_net( int argc, char **argv, void *arg_func, char **a_str_reply) dap_chain_node_cli_set_reply_text(a_str_reply, "Network \"%s\" has state %s (target state %s), active links %u from %u, cur node address not defined", l_net->pub.name, c_net_states[PVT(l_net)->state], - c_net_states[PVT(l_net)->state_target], HASH_COUNT(PVT(l_net)->links), + c_net_states[PVT(l_net)->state_target], PVT(l_net)->links_count, PVT(l_net)->links_addrs_count ); } @@ -1189,7 +1126,7 @@ static int s_cli_net( int argc, char **argv, void *arg_func, char **a_str_reply) dap_chain_node_cli_set_reply_text(a_str_reply, "Network \"%s\" has state %s (target state %s), active links %u from %u, cur node address " NODE_ADDR_FP_STR, l_net->pub.name, c_net_states[PVT(l_net)->state], - c_net_states[PVT(l_net)->state_target], HASH_COUNT(PVT(l_net)->links), + c_net_states[PVT(l_net)->state_target], PVT(l_net)->links_count, PVT(l_net)->links_addrs_count, NODE_ADDR_FP_ARGS_S(l_cur_node_addr) ); diff --git a/dap_chain_net.h b/dap_chain_net.h index 270049d9295ef81bb6e426aac3949f159d6a9b20..8b76bfcf1b80bab2dd9ba984004fa09228b40d0b 100644 --- a/dap_chain_net.h +++ b/dap_chain_net.h @@ -58,9 +58,9 @@ typedef enum dap_chain_net_state{ NET_STATE_LINKS_CONNECTING, NET_STATE_LINKS_ESTABLISHED, NET_STATE_ADDR_REQUEST, // Waiting for address assign - NET_STATE_ONLINE, NET_STATE_SYNC_GDB, NET_STATE_SYNC_CHAINS, + NET_STATE_ONLINE } dap_chain_net_state_t; typedef struct dap_chain_net{ @@ -92,7 +92,7 @@ inline static int dap_chain_net_stop(dap_chain_net_t * a_net) { return dap_chain inline static int dap_chain_net_links_establish(dap_chain_net_t * a_net) { return dap_chain_net_state_go_to(a_net,NET_STATE_LINKS_ESTABLISHED); } inline static int dap_chain_net_sync_chains(dap_chain_net_t * a_net) { return dap_chain_net_state_go_to(a_net,NET_STATE_SYNC_CHAINS); } inline static int dap_chain_net_sync_gdb(dap_chain_net_t * a_net) { return dap_chain_net_state_go_to(a_net,NET_STATE_SYNC_GDB); } -inline static int dap_chain_net_sync_all(dap_chain_net_t * a_net) { return dap_chain_net_state_go_to(a_net,NET_STATE_SYNC_GDB); }//NET_STATE_ONLINE +inline static int dap_chain_net_sync_all(dap_chain_net_t * a_net) { return dap_chain_net_state_go_to(a_net,NET_STATE_SYNC_CHAINS); }//NET_STATE_ONLINE void dap_chain_net_delete( dap_chain_net_t * a_net); void dap_chain_net_proc_mempool (dap_chain_net_t * a_net); diff --git a/dap_chain_node.c b/dap_chain_node.c index 77d5e6b87e8c1829356a7bb1a3f2796beba1b26e..b6a1eef1a2cfa96edcd4b06715f6ba9d7f7ff760 100644 --- a/dap_chain_node.c +++ b/dap_chain_node.c @@ -263,27 +263,36 @@ int dap_chain_node_mempool_process(dap_chain_t *a_chain) return 0; } -void dap_chain_node_mempool_periodic() +void dap_chain_node_mempool_periodic(void *a_param) { + UNUSED(a_param); size_t l_net_count; + bool l_mempool_auto; dap_chain_net_t **l_net_list = dap_chain_net_list(&l_net_count); for (int i = 0; i < l_net_count; i++) { - dap_chain_node_role_t l_role = dap_chain_net_get_role(l_net_list[i]); - dap_chain_t *l_chain; - switch (l_role.enums) { - case NODE_ROLE_ROOT: - case NODE_ROLE_MASTER: - case NODE_ROLE_ROOT_MASTER: - case NODE_ROLE_CELL_MASTER: { - DL_FOREACH(l_net_list[i]->pub.chains, l_chain) { - for(uint16_t i = 0; i < l_chain->datum_types_count; i++) { - if(l_chain->datum_types[i] == CHAIN_TYPE_TX) - dap_chain_node_mempool_process(l_chain); + l_mempool_auto = dap_config_get_item_bool_default(g_config, "mempool", "auto_proc", false); + if (!l_mempool_auto) { + dap_chain_node_role_t l_role = dap_chain_net_get_role(l_net_list[i]); + switch (l_role.enums) { + case NODE_ROLE_ROOT: + case NODE_ROLE_MASTER: + case NODE_ROLE_ROOT_MASTER: + case NODE_ROLE_CELL_MASTER: + l_mempool_auto = true; + break; + default: + break; + } + } + if (l_mempool_auto) { + dap_chain_t *l_chain; + DL_FOREACH(l_net_list[i]->pub.chains, l_chain) { + for (uint16_t i = 0; i < l_chain->datum_types_count; i++) { + if (l_chain->datum_types[i] == CHAIN_TYPE_TX) { + dap_chain_node_mempool_process(l_chain); } } - } break; - default: - break; + } } } DAP_DELETE(l_net_list); @@ -297,15 +306,12 @@ static void *s_mempool_timer = NULL; */ int dap_chain_node_mempool_init() { - if ( dap_config_get_item_bool_default( g_config, "mempool", "auto_proc", true) ){ - s_mempool_timer = dap_interval_timer_create(DAP_CHAIN_NODE_MEMPOOL_INTERVAL, dap_chain_node_mempool_periodic); - if (s_mempool_timer) { - return 0; - } else { - return -1; - } - }else + s_mempool_timer = dap_interval_timer_create(DAP_CHAIN_NODE_MEMPOOL_INTERVAL, dap_chain_node_mempool_periodic, 0); + if (s_mempool_timer) { return 0; + } else { + return -1; + } } /** diff --git a/dap_chain_node_client.c b/dap_chain_node_client.c index 3b536019d72bdea3343ff25eaa523df0c1816e85..6b8999a512fceca9c2a337b6d5e8269a185fb784 100644 --- a/dap_chain_node_client.c +++ b/dap_chain_node_client.c @@ -502,16 +502,9 @@ dap_chain_node_client_t* dap_chain_node_client_connect(dap_chain_node_info_t *a_ */ void dap_chain_node_client_close(dap_chain_node_client_t *a_client) { - if(a_client) { - - //pthread_mutex_lock(&a_client->wait_mutex); - //a_client->client->_inheritor = NULL; // because client->_inheritor == a_client - //pthread_mutex_unlock(&a_client->wait_mutex); - + if (a_client && a_client->client) { // block tryes to close twice // clean client dap_client_delete(a_client->client); - //a_client->client = NULL; - #ifndef _WIN32 pthread_cond_destroy(&a_client->wait_cond); #else