diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index aa01b74fc8ad9911af514f7d204f33c05254b755..a3dd9ebabc89f9fe0515bc24803c60c312cfbb16 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -106,6 +106,7 @@ typedef struct dap_chain_net_pvt{ #else HANDLE state_proc_cond; #endif + pthread_mutex_t state_mutex_cond; dap_chain_node_role_t node_role; uint32_t flags; time_t last_sync; @@ -134,7 +135,6 @@ typedef struct dap_chain_net_pvt{ dap_chain_net_state_t state; dap_chain_net_state_t state_target; - dap_chain_net_state_t state_new; } dap_chain_net_pvt_t; typedef struct dap_chain_net_item{ @@ -209,10 +209,19 @@ inline static const char * s_net_state_to_str(dap_chain_net_state_t l_state) */ int dap_chain_net_state_go_to(dap_chain_net_t * a_net, dap_chain_net_state_t a_new_state) { - if (PVT(a_net)->state_new == a_new_state){ + if (PVT(a_net)->state_target == a_new_state){ log_it(L_WARNING,"Already going to state %s",s_net_state_to_str(a_new_state)); } - PVT(a_net)->state_new = a_new_state; + PVT(a_net)->state_target = a_new_state; + pthread_mutex_lock( &PVT(a_net)->state_mutex_cond); + // set flag for sync + PVT(a_net)->flags |= F_DAP_CHAIN_NET_GO_SYNC; +#ifndef _WIN32 + pthread_cond_signal( &PVT(a_net)->state_proc_cond ); +#else + SetEvent( PVT(a_net)->state_proc_cond ); +#endif + pthread_mutex_unlock( &PVT(a_net)->state_mutex_cond); return 0; } @@ -266,87 +275,84 @@ static void s_chain_callback_notify(void * a_arg, dap_chain_t *a_chain, dap_chai */ static int s_net_states_proc(dap_chain_net_t * l_net) { -#if DAP_DEBUG - dap_chain_net_pvt_t *pvt_debug = PVT(l_net); -#endif + + dap_chain_net_pvt_t *l_pvt_net = PVT(l_net); + int ret=0; - switch ( PVT(l_net)->state ){ + switch ( l_pvt_net->state ){ case NET_STATE_OFFLINE:{ - if (PVT(l_net)->state_new != NET_STATE_UNDEFINED) { - PVT(l_net)->state_target = PVT(l_net)->state_new; - PVT(l_net)->state_new = NET_STATE_UNDEFINED; - } // reset current link - PVT(l_net)->links_count = 0; + l_pvt_net->links_count = 0; // delete all links - dap_chain_node_client_close(PVT(l_net)->links); - PVT(l_net)->links_addrs_count = 0; - if ( PVT(l_net)->links_addrs ) - DAP_DELETE(PVT(l_net)->links_addrs); - PVT(l_net)->links_addrs = NULL; - - if ( PVT(l_net)->state_target != NET_STATE_OFFLINE ){ - PVT(l_net)->state = NET_STATE_LINKS_PREPARE; + dap_chain_node_client_close(l_pvt_net->links); + l_pvt_net->links = NULL; + l_pvt_net->links_addrs_count = 0; + if ( l_pvt_net->links_addrs ) + DAP_DELETE(l_pvt_net->links_addrs); + l_pvt_net->links_addrs = NULL; + + if ( l_pvt_net->state_target != NET_STATE_OFFLINE ){ + l_pvt_net->state = NET_STATE_LINKS_PREPARE; break; } // disable SYNC_GDB - PVT(l_net)->flags &= ~F_DAP_CHAIN_NET_GO_SYNC; - PVT(l_net)->last_sync = 0; + l_pvt_net->flags &= ~F_DAP_CHAIN_NET_GO_SYNC; + l_pvt_net->last_sync = 0; } break; case NET_STATE_LINKS_PREPARE:{ log_it(L_NOTICE,"%s.state: NET_STATE_LINKS_PREPARE",l_net->pub.name); - switch (PVT(l_net)->node_role.enums) { + switch (l_pvt_net->node_role.enums) { case NODE_ROLE_ROOT: case NODE_ROLE_ROOT_MASTER: case NODE_ROLE_ARCHIVE: case NODE_ROLE_CELL_MASTER:{ // This roles load predefined links from global_db - if ( PVT(l_net)->node_info ) { - if (PVT(l_net)->links_addrs ) - DAP_DELETE(PVT(l_net)->links_addrs); - PVT(l_net)->links_addrs_count = PVT(l_net)->node_info->hdr.links_number; - PVT(l_net)->links_addrs = DAP_NEW_Z_SIZE( dap_chain_node_addr_t, - PVT(l_net)->links_addrs_count * sizeof(dap_chain_node_addr_t)); - for (size_t i =0 ; i < PVT(l_net)->node_info->hdr.links_number; i++ ){ - PVT(l_net)->links_addrs[i].uint64 = PVT(l_net)->node_info->links[i].uint64; + if ( l_pvt_net->node_info ) { + if (l_pvt_net->links_addrs ) + DAP_DELETE(l_pvt_net->links_addrs); + l_pvt_net->links_addrs_count = l_pvt_net->node_info->hdr.links_number; + l_pvt_net->links_addrs = DAP_NEW_Z_SIZE( dap_chain_node_addr_t, + l_pvt_net->links_addrs_count * sizeof(dap_chain_node_addr_t)); + for (size_t i =0 ; i < l_pvt_net->node_info->hdr.links_number; i++ ){ + l_pvt_net->links_addrs[i].uint64 = l_pvt_net->node_info->links[i].uint64; } }else { log_it(L_WARNING,"No nodeinfo in global_db to prepare links for connecting, find nearest 3 links and fill global_db"); } // add other root nodes for connect - //if(!PVT(l_net)->links_addrs_count) + //if(!l_pvt_net->links_addrs_count) { // use no more then 4 root node - int l_use_root_nodes = min(4, PVT(l_net)->seed_aliases_count); - if(!PVT(l_net)->links_addrs_count) { - PVT(l_net)->links_addrs = DAP_NEW_Z_SIZE(dap_chain_node_addr_t, + int l_use_root_nodes = min(4, l_pvt_net->seed_aliases_count); + if(!l_pvt_net->links_addrs_count) { + l_pvt_net->links_addrs = DAP_NEW_Z_SIZE(dap_chain_node_addr_t, l_use_root_nodes * sizeof(dap_chain_node_addr_t)); } else{ - PVT(l_net)->links_addrs = DAP_REALLOC(PVT(l_net)->links_addrs, - (PVT(l_net)->links_addrs_count+l_use_root_nodes) * sizeof(dap_chain_node_addr_t)); - memset(PVT(l_net)->links_addrs + PVT(l_net)->links_addrs_count, 0, + l_pvt_net->links_addrs = DAP_REALLOC(l_pvt_net->links_addrs, + (l_pvt_net->links_addrs_count+l_use_root_nodes) * sizeof(dap_chain_node_addr_t)); + memset(l_pvt_net->links_addrs + l_pvt_net->links_addrs_count, 0, l_use_root_nodes * sizeof(dap_chain_node_addr_t)); } for(uint16_t i = 0; i < l_use_root_nodes; i++) { - dap_chain_node_addr_t * l_node_addr = dap_chain_node_alias_find(l_net, PVT(l_net)->seed_aliases[i]); + dap_chain_node_addr_t * l_node_addr = dap_chain_node_alias_find(l_net, l_pvt_net->seed_aliases[i]); if(l_node_addr) { - PVT(l_net)->links_addrs[PVT(l_net)->links_addrs_count].uint64 = l_node_addr->uint64; - PVT(l_net)->links_addrs_count++; + l_pvt_net->links_addrs[l_pvt_net->links_addrs_count].uint64 = l_node_addr->uint64; + l_pvt_net->links_addrs_count++; } } } // shuffle the order of the nodes - for(size_t i = 0; i < PVT(l_net)->links_addrs_count; i++) { - unsigned int l_new_node_pos = rand() % (PVT(l_net)->links_addrs_count); + for(size_t i = 0; i < l_pvt_net->links_addrs_count; i++) { + unsigned int l_new_node_pos = rand() % (l_pvt_net->links_addrs_count); if(i == l_new_node_pos) continue; - uint64_t l_tmp_uint64 = PVT(l_net)->links_addrs[i].uint64; - PVT(l_net)->links_addrs[i].uint64 = PVT(l_net)->links_addrs[l_new_node_pos].uint64; - PVT(l_net)->links_addrs[l_new_node_pos].uint64 = l_tmp_uint64; + uint64_t l_tmp_uint64 = l_pvt_net->links_addrs[i].uint64; + l_pvt_net->links_addrs[i].uint64 = l_pvt_net->links_addrs[l_new_node_pos].uint64; + l_pvt_net->links_addrs[l_new_node_pos].uint64 = l_tmp_uint64; } @@ -357,7 +363,7 @@ static int s_net_states_proc(dap_chain_net_t * l_net) // If we haven't any assigned shard - connect to root-0 if(1) { //if(l_net->pub.cell_id.uint64 == 0) { - //dap_chain_net_pvt_t *pvt_debug = PVT(l_net); + //dap_chain_net_pvt_t *pvt_debug = l_pvt_net; // get current node address dap_chain_node_addr_t l_address; l_address.uint64 = dap_chain_net_get_cur_addr(l_net) ? @@ -368,8 +374,8 @@ static int s_net_states_proc(dap_chain_net_t * l_net) dap_chain_node_info_t *l_cur_node_info = dap_chain_node_info_read(l_net, &l_address); if ( l_cur_node_info ) { - uint16_t l_links_addrs_count = l_cur_node_info->hdr.links_number + PVT(l_net)->seed_aliases_count; - PVT(l_net)->links_addrs = DAP_NEW_Z_SIZE(dap_chain_node_addr_t, + uint16_t l_links_addrs_count = l_cur_node_info->hdr.links_number + l_pvt_net->seed_aliases_count; + l_pvt_net->links_addrs = DAP_NEW_Z_SIZE(dap_chain_node_addr_t, l_links_addrs_count * sizeof(dap_chain_node_addr_t)); // add linked nodes for connect @@ -381,37 +387,37 @@ static int s_net_states_proc(dap_chain_net_t * l_net) // if only nodes from the same cell of cell=0 if(!l_cur_node_info->hdr.cell_id.uint64 || l_cur_node_info->hdr.cell_id.uint64 == l_remore_node_info->hdr.cell_id.uint64) { - PVT(l_net)->links_addrs[PVT(l_net)->links_addrs_count].uint64 = + l_pvt_net->links_addrs[l_pvt_net->links_addrs_count].uint64 = l_remore_node_info->hdr.address.uint64; - PVT(l_net)->links_addrs_count++; + l_pvt_net->links_addrs_count++; } DAP_DELETE(l_remore_node_info); } } } // if no links then add root nodes for connect - if(!PVT(l_net)->links_addrs_count){ + if(!l_pvt_net->links_addrs_count){ // use no more then 4 root node - int l_use_root_nodes = min(4, PVT(l_net)->seed_aliases_count); - PVT(l_net)->links_addrs = DAP_NEW_Z_SIZE(dap_chain_node_addr_t, + int l_use_root_nodes = min(4, l_pvt_net->seed_aliases_count); + l_pvt_net->links_addrs = DAP_NEW_Z_SIZE(dap_chain_node_addr_t, l_use_root_nodes * sizeof(dap_chain_node_addr_t)); for(uint16_t i = 0; i < l_use_root_nodes; i++) { - dap_chain_node_addr_t * l_node_addr = dap_chain_node_alias_find(l_net, PVT(l_net)->seed_aliases[i]); + dap_chain_node_addr_t * l_node_addr = dap_chain_node_alias_find(l_net, l_pvt_net->seed_aliases[i]); if(l_node_addr) { - PVT(l_net)->links_addrs[PVT(l_net)->links_addrs_count].uint64 = l_node_addr->uint64; - PVT(l_net)->links_addrs_count++; + l_pvt_net->links_addrs[l_pvt_net->links_addrs_count].uint64 = l_node_addr->uint64; + l_pvt_net->links_addrs_count++; } } } // shuffle the order of the nodes - for(size_t i = 0; i < PVT(l_net)->links_addrs_count; i++) { - unsigned int l_new_node_pos = rand() % (PVT(l_net)->links_addrs_count); + for(size_t i = 0; i < l_pvt_net->links_addrs_count; i++) { + unsigned int l_new_node_pos = rand() % (l_pvt_net->links_addrs_count); if(i==l_new_node_pos) continue; - uint64_t l_tmp_uint64 = PVT(l_net)->links_addrs[i].uint64; - PVT(l_net)->links_addrs[i].uint64 = PVT(l_net)->links_addrs[l_new_node_pos].uint64; - PVT(l_net)->links_addrs[l_new_node_pos].uint64 = l_tmp_uint64; + uint64_t l_tmp_uint64 = l_pvt_net->links_addrs[i].uint64; + l_pvt_net->links_addrs[i].uint64 = l_pvt_net->links_addrs[l_new_node_pos].uint64; + l_pvt_net->links_addrs[l_new_node_pos].uint64 = l_tmp_uint64; } DAP_DELETE(l_cur_node_info); }else { @@ -419,40 +425,40 @@ static int s_net_states_proc(dap_chain_net_t * l_net) } } break; } - if ( PVT(l_net)->state_target > NET_STATE_LINKS_PREPARE ){ - if ( PVT(l_net)->links_addrs_count>0 ) { // If links are present - PVT(l_net)->state = NET_STATE_LINKS_CONNECTING; - log_it(L_DEBUG,"Prepared %u links, start to establish them", PVT(l_net)->links_addrs_count ); + if ( l_pvt_net->state_target > NET_STATE_LINKS_PREPARE ){ + if ( l_pvt_net->links_addrs_count>0 ) { // If links are present + l_pvt_net->state = NET_STATE_LINKS_CONNECTING; + log_it(L_DEBUG,"Prepared %u links, start to establish them", l_pvt_net->links_addrs_count ); } else { log_it(L_WARNING,"No links for connecting, return back to OFFLINE state"); - PVT(l_net)->state = NET_STATE_OFFLINE; + l_pvt_net->state = NET_STATE_OFFLINE; // remove looping - PVT(l_net)->state_target = NET_STATE_OFFLINE; + l_pvt_net->state_target = NET_STATE_OFFLINE; } }else { log_it(L_WARNING,"Target state is NET_STATE_LINKS_PREPARE? Realy?"); - PVT(l_net)->state = NET_STATE_OFFLINE; + l_pvt_net->state = NET_STATE_OFFLINE; } } break; case NET_STATE_LINKS_CONNECTING:{ - size_t l_links_count = PVT(l_net)->links_count; + size_t l_links_count = l_pvt_net->links_count; if(l_links_count >= s_required_links_count || (l_links_count + 1) >= s_max_links_count) { // TODO what if other failed and we want more? } - if (l_links_count < PVT(l_net)->links_addrs_count) { - PVT(l_net)->links_count++; + if (l_links_count < l_pvt_net->links_addrs_count) { + l_pvt_net->links_count++; } else { log_it(L_NOTICE, "Can't establish enough links, go to offline"); - PVT(l_net)->state = NET_STATE_OFFLINE; - PVT(l_net)->state_target = NET_STATE_OFFLINE; + l_pvt_net->state = NET_STATE_OFFLINE; + l_pvt_net->state_target = NET_STATE_OFFLINE; break; } log_it(L_DEBUG, "%s.state: NET_STATE_LINKS_CONNECTING",l_net->pub.name); log_it(L_DEBUG, "Establishing connection with " NODE_ADDR_FP_STR, - NODE_ADDR_FP_ARGS_S( PVT(l_net)->links_addrs[l_links_count]) ); - dap_chain_node_info_t *l_link_node_info = dap_chain_node_info_read(l_net, &PVT(l_net)->links_addrs[l_links_count]); + NODE_ADDR_FP_ARGS_S( l_pvt_net->links_addrs[l_links_count]) ); + dap_chain_node_info_t *l_link_node_info = dap_chain_node_info_read(l_net, &l_pvt_net->links_addrs[l_links_count]); if ( l_link_node_info ) { dap_chain_node_client_t *l_node_client = dap_chain_node_client_connect(l_link_node_info); if(!l_node_client) { @@ -465,11 +471,12 @@ static int s_net_states_proc(dap_chain_net_t * l_net) int res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_CONNECTED, timeout_ms); if (res == 0 ){ log_it(L_DEBUG, "Connected link %u", l_links_count); - PVT(l_net)->links = l_node_client; - PVT(l_net)->state = NET_STATE_LINKS_ESTABLISHED; + l_pvt_net->links = l_node_client; + l_pvt_net->state = NET_STATE_LINKS_ESTABLISHED; }else { log_it(L_DEBUG, "Cant establish link %u", l_links_count); dap_chain_node_client_close(l_node_client); + l_node_client = NULL; } } } @@ -477,31 +484,31 @@ static int s_net_states_proc(dap_chain_net_t * l_net) case NET_STATE_LINKS_ESTABLISHED:{ log_it(L_DEBUG,"%s.state: NET_STATE_LINKS_ESTABLISHED",l_net->pub.name); - switch (PVT(l_net)->state_target) { + switch (l_pvt_net->state_target) { case NET_STATE_ONLINE:{ // Online - switch ( PVT(l_net)->node_role.enums ){ + switch ( l_pvt_net->node_role.enums ){ case NODE_ROLE_ROOT_MASTER: case NODE_ROLE_ROOT:{ /*dap_chain_node_client_t * l_node_client = NULL, *l_node_client_tmp = NULL; // Send everybody your address when linked - HASH_ITER(hh,PVT(l_net)->links,l_node_client,l_node_client_tmp){ + HASH_ITER(hh,l_pvt_net->links,l_node_client,l_node_client_tmp){ dap_stream_ch_chain_net_pkt_write(dap_client_get_stream_ch( l_node_client->client, dap_stream_ch_chain_net_get_id()), DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_NODE_ADDR, l_net->pub.id, dap_chain_net_get_cur_addr(l_net), sizeof (dap_chain_node_addr_t) ); }*/ - PVT(l_net)->state = NET_STATE_SYNC_GDB; + l_pvt_net->state = NET_STATE_SYNC_GDB; }break; case NODE_ROLE_CELL_MASTER: case NODE_ROLE_MASTER:{ - PVT(l_net)->state = NET_STATE_SYNC_GDB;//NET_STATE_ADDR_REQUEST; + l_pvt_net->state = NET_STATE_SYNC_GDB;//NET_STATE_ADDR_REQUEST; } break; default:{ // get addr for current node if it absent if(!dap_chain_net_get_cur_addr_int(l_net)) - PVT(l_net)->state = NET_STATE_ADDR_REQUEST; + l_pvt_net->state = NET_STATE_ADDR_REQUEST; else PVT( l_net)->state = NET_STATE_SYNC_GDB; } @@ -510,33 +517,33 @@ static int s_net_states_proc(dap_chain_net_t * l_net) break; case NET_STATE_SYNC_GDB: // we need only to sync gdb - PVT(l_net)->state = NET_STATE_SYNC_GDB ; - if ( PVT(l_net)->addr_request_attempts >=10 && PVT(l_net)->state == NET_STATE_ADDR_REQUEST){ - PVT(l_net)->addr_request_attempts = 0; - switch( PVT(l_net)->state_target){ + l_pvt_net->state = NET_STATE_SYNC_GDB ; + if ( l_pvt_net->addr_request_attempts >=10 && l_pvt_net->state == NET_STATE_ADDR_REQUEST){ + l_pvt_net->addr_request_attempts = 0; + switch( l_pvt_net->state_target){ case NET_STATE_ONLINE: case NET_STATE_SYNC_GDB: - PVT(l_net)->state = NET_STATE_SYNC_GDB; + l_pvt_net->state = NET_STATE_SYNC_GDB; break; case NET_STATE_SYNC_CHAINS: - PVT(l_net)->state = NET_STATE_SYNC_CHAINS; + l_pvt_net->state = NET_STATE_SYNC_CHAINS; break; default: { - PVT(l_net)->state = NET_STATE_OFFLINE; - PVT(l_net)->state_target = NET_STATE_OFFLINE; + l_pvt_net->state = NET_STATE_OFFLINE; + l_pvt_net->state_target = NET_STATE_OFFLINE; } } } break; case NET_STATE_SYNC_CHAINS: - PVT(l_net)->state = (PVT(l_net)->node_info && PVT(l_net)->node_info->hdr.address.uint64)? + l_pvt_net->state = (l_pvt_net->node_info && l_pvt_net->node_info->hdr.address.uint64)? NET_STATE_SYNC_CHAINS : NET_STATE_ADDR_REQUEST; break; case NET_STATE_ADDR_REQUEST : - PVT(l_net)->state = NET_STATE_ADDR_REQUEST; + l_pvt_net->state = NET_STATE_ADDR_REQUEST; break; default: break; } @@ -544,7 +551,7 @@ static int s_net_states_proc(dap_chain_net_t * l_net) // get addr for remote node case NET_STATE_ADDR_REQUEST: { int l_is_addr_leased = 0; - dap_chain_node_client_t *l_node_client = PVT(l_net)->links; + dap_chain_node_client_t *l_node_client = l_pvt_net->links; uint8_t l_ch_id = dap_stream_ch_chain_net_get_id(); // Channel id for chain net request dap_stream_ch_t *l_ch_chain = dap_client_get_stream_ch(l_node_client->client, l_ch_id); // set callback for l_ch_id @@ -557,27 +564,28 @@ static int s_net_states_proc(dap_chain_net_t * l_net) if (res == 0) { log_it(L_WARNING,"Can't send NODE_ADDR_REQUEST packet"); dap_chain_node_client_close(l_node_client); - PVT(l_net)->state = NET_STATE_LINKS_CONNECTING; + l_node_client = NULL; + l_pvt_net->state = NET_STATE_LINKS_CONNECTING; break; // try with another link } // wait for finishing of request int timeout_ms = 5000; // 5 sec = 5 000 ms // TODO add progress info to console - PVT(l_net)->addr_request_attempts++; + l_pvt_net->addr_request_attempts++; int l_res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_NODE_ADDR_LEASED, timeout_ms); switch (l_res) { case -1: log_it(L_WARNING,"Timeout with addr leasing"); // try again 3 times - if (PVT(l_net)->addr_request_attempts < 3) { + if (l_pvt_net->addr_request_attempts < 3) { break; } - PVT(l_net)->state = NET_STATE_LINKS_CONNECTING; + l_pvt_net->state = NET_STATE_LINKS_CONNECTING; break; // try with another link case 0: log_it(L_INFO, "Node address leased"); l_is_addr_leased++; - PVT(l_net)->addr_request_attempts = 0; + l_pvt_net->addr_request_attempts = 0; break; default: if (l_node_client->last_error[0]) { @@ -585,17 +593,17 @@ static int s_net_states_proc(dap_chain_net_t * l_net) l_node_client->last_error[0] = '\0'; } log_it(L_INFO, "Node address request error %d", l_res); - PVT(l_net)->state = NET_STATE_LINKS_CONNECTING; + l_pvt_net->state = NET_STATE_LINKS_CONNECTING; break; } if (l_is_addr_leased > 0) { - PVT(l_net)->state = NET_STATE_SYNC_GDB; + l_pvt_net->state = NET_STATE_SYNC_GDB; } } break; case NET_STATE_SYNC_GDB:{ // send request - dap_chain_node_client_t *l_node_client = PVT(l_net)->links; + dap_chain_node_client_t *l_node_client = l_pvt_net->links; dap_stream_ch_chain_sync_request_t l_sync_gdb = {}; // Get last timestamp in log l_sync_gdb.id_start = (uint64_t) dap_db_log_get_last_id_remote(l_node_client->remote_node_addr.uint64); @@ -622,7 +630,8 @@ static int s_net_states_proc(dap_chain_net_t * l_net) if (l_res == 0) { log_it(L_WARNING, "Can't send GDB sync request"); dap_chain_node_client_close(l_node_client); - PVT(l_net)->state = NET_STATE_LINKS_CONNECTING; + l_node_client = NULL; + l_pvt_net->state = NET_STATE_LINKS_CONNECTING; break; //try another link } @@ -643,21 +652,22 @@ static int s_net_states_proc(dap_chain_net_t * l_net) if (l_res) { // try another link break; } - if(PVT(l_net)->state_target >= NET_STATE_SYNC_CHAINS){ - PVT(l_net)->state = NET_STATE_SYNC_CHAINS; + if(l_pvt_net->state_target >= NET_STATE_SYNC_CHAINS){ + l_pvt_net->state = NET_STATE_SYNC_CHAINS; } else { - PVT(l_net)->state = NET_STATE_ONLINE; + l_pvt_net->flags &= ~F_DAP_CHAIN_NET_GO_SYNC; + l_pvt_net->state = NET_STATE_ONLINE; } } break; case NET_STATE_SYNC_CHAINS: { - dap_chain_node_client_t *l_node_client = PVT(l_net)->links; + dap_chain_node_client_t *l_node_client = l_pvt_net->links; uint8_t l_ch_id = dap_stream_ch_chain_get_id(); // Channel id for global_db and chains sync dap_stream_ch_t *l_ch_chain = dap_client_get_stream_ch(l_node_client->client, l_ch_id); if(!l_ch_chain) { log_it(L_DEBUG,"Can't get stream_ch for id='%c' ", l_ch_id); - PVT(l_net)->state = NET_STATE_LINKS_CONNECTING; + l_pvt_net->state = NET_STATE_LINKS_CONNECTING; break; } dap_chain_t * l_chain = NULL; @@ -695,7 +705,7 @@ static int s_net_states_proc(dap_chain_net_t * l_net) { struct timespec l_to; clock_gettime( CLOCK_MONOTONIC, &l_to); - PVT(l_net)->last_sync = l_to.tv_sec; + l_pvt_net->last_sync = l_to.tv_sec; } break; default: @@ -709,36 +719,35 @@ static int s_net_states_proc(dap_chain_net_t * l_net) //DAP_DELETE( l_atom_iter ); } dap_chain_node_client_close(l_node_client); + l_node_client = NULL; if (l_sync_errors) { - PVT(l_net)->state = NET_STATE_LINKS_CONNECTING; + l_pvt_net->state = NET_STATE_LINKS_CONNECTING; break; } log_it(L_INFO, "Synchronization done"); - PVT(l_net)->state = NET_STATE_ONLINE; - PVT(l_net)->links_count = 0; + l_pvt_net->flags &= ~F_DAP_CHAIN_NET_GO_SYNC; + l_pvt_net->state = NET_STATE_ONLINE; + l_pvt_net->links_count = 0; } break; case NET_STATE_ONLINE: { - if (PVT(l_net)->state_new != NET_STATE_UNDEFINED) { - PVT(l_net)->state_target = PVT(l_net)->state_new; - PVT(l_net)->state_new = NET_STATE_UNDEFINED; - } - switch ( PVT(l_net)->state_target) { + if (l_pvt_net->flags & F_DAP_CHAIN_NET_GO_SYNC) + { + switch ( l_pvt_net->state_target) { // disconnect case NET_STATE_OFFLINE: - PVT(l_net)->state = NET_STATE_OFFLINE; + l_pvt_net->state = NET_STATE_OFFLINE; log_it(L_NOTICE, "Going to disconnet"); break; case NET_STATE_ONLINE: - if((PVT(l_net)->flags & F_DAP_CHAIN_NET_GO_SYNC) == 0) - break; case NET_STATE_SYNC_GDB: case NET_STATE_SYNC_CHAINS: // if flag set then go to SYNC_GDB - PVT(l_net)->state = NET_STATE_LINKS_CONNECTING; + l_pvt_net->state = NET_STATE_LINKS_CONNECTING; break; default: break; + } } } break; @@ -770,18 +779,46 @@ static void *s_net_proc_thread ( void *a_net ) dap_chain_net_t *l_net = (dap_chain_net_t *)a_net; dap_chain_net_pvt_t *p_net = (dap_chain_net_pvt_t *)(void *)l_net->pvt; + const uint64_t l_timeout_ms = 60000;// 60 sec + // set callback to update data //dap_chain_global_db_set_callback_for_update_base(s_net_proc_thread_callback_update_db); while( !(p_net->flags & F_DAP_CHAIN_NET_SHUTDOWN) ) { + // check or start sync s_net_states_proc( l_net ); - // checking whether new sync is needed + +#ifndef _WIN32 + int l_ret = 0; struct timespec l_to; + // prepare for signal waiting + clock_gettime( CLOCK_MONOTONIC, &l_to ); + int64_t l_nsec_new = l_to.tv_nsec + l_timeout_ms * 1000000ll; + // if the new number of nanoseconds is more than a second + if(l_nsec_new > (long) 1e9) { + l_to.tv_sec += l_nsec_new / (long) 1e9; + l_to.tv_nsec = l_nsec_new % (long) 1e9; + } + else + l_to.tv_nsec = (long) l_nsec_new; + pthread_mutex_lock( &p_net->state_mutex_cond ); + // wait if flag not set then go to SYNC_GDB + while ((p_net->flags & F_DAP_CHAIN_NET_GO_SYNC) == 0 && l_ret == 0) { + // signal waiting + l_ret = pthread_cond_timedwait( &p_net->state_proc_cond, &p_net->state_mutex_cond, &l_to ); + } + pthread_mutex_unlock(&p_net->state_mutex_cond); +#else // WIN32 + + WaitForSingleObject( p_net->state_proc_cond, (uint32_t)l_timeout_ms ); + +#endif + // checking whether new sync is needed time_t l_sync_timeout = 300; // 300 sec = 5 min clock_gettime(CLOCK_MONOTONIC, &l_to); // start sync every l_sync_timeout sec - if(p_net->last_sync && l_to.tv_sec >= p_net->last_sync + l_sync_timeout) { + if(l_to.tv_sec >= p_net->last_sync + l_sync_timeout) { p_net->flags |= F_DAP_CHAIN_NET_GO_SYNC; } } @@ -1099,12 +1136,12 @@ static int s_cli_net( int argc, char **argv, void *arg_func, char **a_str_reply) dap_chain_net_state_go_to(l_net, NET_STATE_ONLINE); dap_chain_node_cli_set_reply_text(a_str_reply, "Network \"%s\" go from state %s to %s", l_net->pub.name,c_net_states[PVT(l_net)->state], - c_net_states[PVT(l_net)->state_new]); + c_net_states[PVT(l_net)->state_target]); } else if ( strcmp(l_go_str,"offline") == 0 ) { dap_chain_net_state_go_to(l_net, NET_STATE_OFFLINE); dap_chain_node_cli_set_reply_text(a_str_reply, "Network \"%s\" go from state %s to %s", l_net->pub.name,c_net_states[PVT(l_net)->state], - c_net_states[PVT(l_net)->state_new]); + c_net_states[PVT(l_net)->state_target]); } else if(strcmp(l_go_str, "sync") == 0) { diff --git a/modules/net/dap_chain_node_client.c b/modules/net/dap_chain_node_client.c index 4b253daab7eb3e756518c63a28bfc31269f1ee08..b8d1c88a33f71d79ccfedc03a82205f0450ae625 100644 --- a/modules/net/dap_chain_node_client.c +++ b/modules/net/dap_chain_node_client.c @@ -523,7 +523,8 @@ void dap_chain_node_client_close(dap_chain_node_client_t *a_client) CloseHandle( a_client->wait_cond ); #endif pthread_mutex_destroy(&a_client->wait_mutex); - DAP_DEL_Z(a_client); + a_client->client = NULL; + DAP_DELETE(a_client); } } diff --git a/modules/net/include/dap_chain_net.h b/modules/net/include/dap_chain_net.h index 1ee163f511085e3ec452a041b0bc162b152de67b..174cfb338759548a74010e1f9f13fd8841777d29 100644 --- a/modules/net/include/dap_chain_net.h +++ b/modules/net/include/dap_chain_net.h @@ -61,8 +61,7 @@ typedef enum dap_chain_net_state{ NET_STATE_ADDR_REQUEST, // Waiting for address assign NET_STATE_SYNC_GDB, NET_STATE_SYNC_CHAINS, - NET_STATE_ONLINE, - NET_STATE_UNDEFINED + NET_STATE_ONLINE } dap_chain_net_state_t; typedef struct dap_chain_net{