diff --git a/dap_chain_net.c b/dap_chain_net.c index af39eb127d75f0879679fc25063b4a4bcf2b5410..958f226588f64228b7f0471541578786c080b62b 100644 --- a/dap_chain_net.c +++ b/dap_chain_net.c @@ -239,13 +239,44 @@ lb_proc_state: case NODE_ROLE_MASTER: case NODE_ROLE_LIGHT:{ // If we haven't any assigned shard - connect to root-0 - if ( l_net->pub.cell_id.uint64 == 0 ){ - PVT(l_net)->links_addrs_count=1; - PVT(l_net)->links_addrs = DAP_NEW_Z_SIZE(dap_chain_node_addr_t, - PVT(l_net)->links_addrs_count * sizeof(dap_chain_node_addr_t)); - dap_chain_node_addr_t * l_node_addr = dap_chain_node_alias_find(l_net, PVT(l_net)->seed_aliases[0] ); - PVT(l_net)->links_addrs[0].uint64 = l_node_addr? l_node_addr->uint64 : 0; - }else { + if(l_net->pub.cell_id.uint64 == 0) { + + // get current node address + dap_chain_node_addr_t l_address; + l_address.uint64 = dap_chain_net_get_cur_addr(l_net) ? + dap_chain_net_get_cur_addr(l_net)->uint64 : + dap_db_get_cur_node_addr(); + // get current node info + dap_chain_node_info_t *l_cur_node_info = dap_chain_node_info_read(l_net, &l_address); + + uint16_t l_links_addrs_count = l_cur_node_info->hdr.links_number + PVT(l_net)->seed_aliases_count; + PVT(l_net)->links_addrs = DAP_NEW_Z_SIZE(dap_chain_node_addr_t, + l_links_addrs_count * sizeof(dap_chain_node_addr_t)); + + // add linked nodes for connect + for(uint16_t i = 0; i < min(1, l_cur_node_info->hdr.links_number); i++) { + dap_chain_node_addr_t *l_addr = l_cur_node_info->links + i; + dap_chain_node_info_t *l_remore_node_info = dap_chain_node_info_read(l_net, l_addr); + // if only nodes from the same cell + if(l_cur_node_info->hdr.cell_id.uint64 == l_remore_node_info->hdr.cell_id.uint64) { + PVT(l_net)->links_addrs[PVT(l_net)->links_addrs_count].uint64 = + l_remore_node_info->hdr.address.uint64; + PVT(l_net)->links_addrs_count++; + } + DAP_DELETE(l_remore_node_info); + } + + // add root nodes for connect + if(!PVT(l_net)->links_addrs_count) + for(uint16_t i = 0; i < min(1, PVT(l_net)->seed_aliases_count); i++) { + dap_chain_node_addr_t * l_node_addr = dap_chain_node_alias_find(l_net, PVT(l_net)->seed_aliases[i]); + if(l_node_addr) { + PVT(l_net)->links_addrs[PVT(l_net)->links_addrs_count].uint64 = l_node_addr->uint64; + PVT(l_net)->links_addrs_count++; + } + } + DAP_DELETE(l_cur_node_info); + }else { // TODO read cell's nodelist and populate array with it } } break; @@ -402,11 +433,20 @@ lb_proc_state: HASH_ITER(hh,PVT(l_net)->links,l_node_client,l_node_client_tmp){ dap_stream_ch_chain_sync_request_t l_sync_gdb = {{0}}; // Get last timestamp in log - l_sync_gdb.ts_start = (uint64_t) dap_db_log_get_last_timestamp_remote(l_node_client->remote_node_addr.uint64); + l_sync_gdb.id_start = (uint64_t) dap_db_log_get_last_id_remote(l_node_client->remote_node_addr.uint64); // no limit - l_sync_gdb.ts_end = (uint64_t)0;// time(NULL); + l_sync_gdb.id_end = (uint64_t)0; + + l_sync_gdb.node_addr.uint64 = dap_chain_net_get_cur_addr(l_net) ? + dap_chain_net_get_cur_addr(l_net)->uint64 : + dap_db_get_cur_node_addr(); - log_it(L_DEBUG,"Prepared request to gdb sync from %llu to %llu",l_sync_gdb.ts_start,l_sync_gdb.ts_end); + dap_chain_id_t l_chain_id_null = { { 0 } }; + dap_chain_cell_id_t l_chain_cell_id_null = { { 0 } }; + l_chain_id_null.uint64 = l_net->pub.id.uint64; + l_chain_cell_id_null.uint64 = dap_chain_net_get_cur_cell(l_net) ? dap_chain_net_get_cur_cell(l_net)->uint64 : 0; + + log_it(L_DEBUG,"Prepared request to gdb sync from %llu to %llu",l_sync_gdb.id_start,l_sync_gdb.id_end); size_t l_res = dap_stream_ch_chain_pkt_write( dap_client_get_stream_ch(l_node_client->client, dap_stream_ch_chain_get_id() ) , DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB, l_net->pub.id, (dap_chain_id_t){{0}} , @@ -515,7 +555,7 @@ static void * s_net_proc_thread ( void * a_net) s_net_states_proc(l_net); pthread_mutex_lock( &PVT(l_net)->state_mutex ); - int l_timeout_ms = 3000;// 3 sec + int l_timeout_ms = 20000;// 20 sec // prepare for signal waiting struct timespec l_to; clock_gettime(CLOCK_MONOTONIC, &l_to); @@ -670,7 +710,7 @@ int dap_chain_net_init() static int s_cli_net(int argc, char ** argv, char **a_str_reply) { int arg_index=1; - dap_chain_net_t * l_net; + dap_chain_net_t * l_net = NULL; int ret = dap_chain_node_cli_cmd_values_parse_net_chain(&arg_index,argc,argv,a_str_reply,NULL,&l_net); if ( l_net ){ @@ -952,7 +992,7 @@ int dap_chain_net_load(const char * a_net_name) DAP_DELETE( l_seed_nodes_ipv4); DAP_DELETE(l_seed_nodes_addrs); - if ( l_node_alias_str ){ + if ( l_node_addr_str || l_node_alias_str ){ dap_chain_node_addr_t * l_node_addr; if ( l_node_addr_str == NULL) l_node_addr = dap_chain_node_alias_find(l_net, l_node_alias_str); @@ -971,6 +1011,8 @@ int dap_chain_net_load(const char * a_net_name) } if ( l_node_addr ) { char *l_addr_hash_str = dap_chain_node_addr_to_hash_str(l_node_addr); + // save current node address + dap_db_set_cur_node_addr(l_node_addr->uint64); if(!l_addr_hash_str){ log_it(L_ERROR,"Can't get hash string for node address!"); } else { @@ -986,6 +1028,9 @@ int dap_chain_net_load(const char * a_net_name) } } } + else{ + log_it(L_WARNING, "Not present our own address %s in database", (l_node_alias_str) ? l_node_alias_str: ""); + } } @@ -1173,6 +1218,11 @@ dap_chain_node_addr_t * dap_chain_net_get_cur_addr( dap_chain_net_t * l_net) return PVT(l_net)->node_info? &PVT(l_net)->node_info->hdr.address: PVT(l_net)->node_addr; } +dap_chain_cell_id_t * dap_chain_net_get_cur_cell( dap_chain_net_t * l_net) +{ + return PVT(l_net)->node_info? &PVT(l_net)->node_info->hdr.cell_id: 0; +} + /** * @brief dap_chain_net_proc_datapool * @param a_net diff --git a/dap_chain_net.h b/dap_chain_net.h index fb904da4a30bb0710f652a8e2de48e11e0e7a6d4..08ec015c8a64910bef9dc8f6b83c9aed94b4fae8 100644 --- a/dap_chain_net.h +++ b/dap_chain_net.h @@ -89,6 +89,7 @@ dap_ledger_t * dap_chain_ledger_by_net_name( const char * a_net_name); dap_chain_t * dap_chain_net_get_chain_by_name( dap_chain_net_t * l_net, const char * a_name); dap_chain_node_addr_t * dap_chain_net_get_cur_addr( dap_chain_net_t * l_net); +dap_chain_cell_id_t * dap_chain_net_get_cur_cell( dap_chain_net_t * l_net); void dap_chain_net_links_connect(dap_chain_net_t * a_net); diff --git a/dap_chain_node_cli_cmd.c b/dap_chain_node_cli_cmd.c index 4b0611d91aa78361db310a7535562020ed959f12..015907c10ff0619beafe27c6053f12a70570d57d 100644 --- a/dap_chain_node_cli_cmd.c +++ b/dap_chain_node_cli_cmd.c @@ -681,7 +681,7 @@ int com_node(int a_argc, char ** a_argv, char **a_str_reply) dap_chain_node_addr_t l_node_addr={0}; const char *l_addr_str = NULL, *alias_str = NULL; const char * l_net_str = NULL; -// find addr, alias + // find addr, alias dap_chain_node_cli_find_option_val(a_argv, arg_index, a_argc, "-addr", &l_addr_str); dap_chain_node_cli_find_option_val(a_argv, arg_index, a_argc, "-alias", &alias_str); dap_chain_node_cli_find_option_val(a_argv, arg_index, a_argc, "-net", &l_net_str); @@ -763,7 +763,7 @@ int com_node(int a_argc, char ** a_argv, char **a_str_reply) return -1; } // wait connected - int timeout_ms = 15000; //15 sec = 15000 ms + int timeout_ms = 5000; //5 sec = 5000 ms int res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_CONNECTED, timeout_ms); if(res ) { dap_chain_node_cli_set_reply_text(a_str_reply, "no response from node: code %d",res); @@ -775,19 +775,28 @@ int com_node(int a_argc, char ** a_argv, char **a_str_reply) log_it(L_NOTICE, "Stream connection established, now lets sync all"); dap_stream_ch_chain_sync_request_t l_sync_request = { { 0 } }; dap_stream_ch_t * l_ch_chain = dap_client_get_stream_ch(l_node_client->client, dap_stream_ch_chain_get_id()); - // fill begin time - l_sync_request.ts_start = (uint64_t) dap_db_log_get_last_timestamp_remote( + // fill begin id + l_sync_request.id_start = (uint64_t) dap_db_log_get_last_id_remote( l_remote_node_info->hdr.address.uint64); - // fill end time = 0 - no time limit - //l_sync_request.ts_end = (time_t) time(NULL); + // fill end id = 0 - no time limit + //l_sync_request.ts_end = 0; // fill current node address l_sync_request.node_addr.uint64 = dap_chain_net_get_cur_addr(l_net) ? dap_chain_net_get_cur_addr(l_net)->uint64 : dap_db_get_cur_node_addr(); dap_chain_id_t l_chain_id_null = { { 0 } }; dap_chain_cell_id_t l_chain_cell_id_null = { { 0 } }; - log_it(L_INFO, "Requested GLOBAL_DB syncronizatoin, %llu:%llu period", l_sync_request.ts_start, - l_sync_request.ts_end); + l_chain_id_null.uint64 = l_net->pub.id.uint64; + l_chain_cell_id_null.uint64 = dap_chain_net_get_cur_cell(l_net) ? dap_chain_net_get_cur_cell(l_net)->uint64 : 0; + + log_it(L_INFO, "Requested GLOBAL_DB syncronizatoin, %llu:%llu period", l_sync_request.id_start, + l_sync_request.id_end); + // copy l_sync_request to current + //dap_stream_ch_chain_t * l_s_ch_chain = DAP_STREAM_CH_CHAIN(l_ch_chain); + //l_s_ch_chain->request_net_id.uint64 = l_net->pub.id.uint64; + //l_s_ch_chain->request_cell_id.uint64 = l_chain_cell_id_null.uint64; + //memcpy(&l_s_ch_chain->request, &l_sync_request, sizeof(l_sync_request)); + if(0 == dap_stream_ch_chain_pkt_write(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB, l_net->pub.id, l_chain_id_null, l_chain_cell_id_null, &l_sync_request, sizeof(l_sync_request))) { @@ -799,13 +808,13 @@ int com_node(int a_argc, char ** a_argv, char **a_str_reply) } dap_stream_ch_set_ready_to_write(l_ch_chain, true); // wait for finishing of request - timeout_ms = 120000; // 2 min = 120 sec = 120 000 ms + timeout_ms = 120000; // 20 min = 1200 sec = 1 200 000 ms // TODO add progress info to console res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms); if ( res <0 ){ dap_chain_node_client_close(l_node_client); DAP_DELETE(l_remote_node_info); - dap_chain_node_cli_set_reply_text(a_str_reply, "Error: can't connect to node "NODE_ADDR_FP_STR, + dap_chain_node_cli_set_reply_text(a_str_reply, "Error: can't sync with node "NODE_ADDR_FP_STR, NODE_ADDR_FP_ARGS_S(l_node_client->remote_node_addr)); return -2; diff --git a/dap_chain_node_client.c b/dap_chain_node_client.c index 15635f81a9e11a5a8022eb0f87a8996295e2d29d..25dce5bc72c2a8e3736ed22e50a77d6f5f110a20 100644 --- a/dap_chain_node_client.c +++ b/dap_chain_node_client.c @@ -132,7 +132,7 @@ static void s_stage_status_error_callback(dap_client_t *a_client, void *a_arg) static void s_stage_connected_callback(dap_client_t *a_client, void *a_arg) { dap_chain_node_client_t *l_node_client = a_client->_inheritor; - assert(l_node_client); + //assert(l_node_client); if(l_node_client) { log_it(L_NOTICE,"Stream connection with node " NODE_ADDR_FP_STR " established", NODE_ADDR_FP_ARGS_S( l_node_client->remote_node_addr) ); @@ -195,11 +195,11 @@ static void s_ch_chain_callback_notify_packet_in(dap_stream_ch_chain_t* a_ch_cha l_request = (dap_stream_ch_chain_sync_request_t* ) a_pkt->data; if ( l_request ){ - if ( l_request->ts_start < (uint64_t) dap_db_log_get_last_timestamp() ){ + if ( l_request->id_start < (uint64_t) dap_db_log_get_last_id() ){ log_it(L_INFO, "Remote is synced but we have updates for it"); // Get log diff - a_ch_chain->request_last_ts = dap_db_log_get_last_timestamp(); - dap_list_t *l_list = dap_db_log_get_list((time_t) l_request->ts_start); + a_ch_chain->request_last_ts = dap_db_log_get_last_id(); + dap_list_t *l_list = dap_db_log_get_list((time_t) l_request->id_start); if ( l_list ) { // Add it to outgoing list @@ -210,6 +210,12 @@ static void s_ch_chain_callback_notify_packet_in(dap_stream_ch_chain_t* a_ch_cha a_ch_chain->request_chain_id.uint64 = a_pkt->hdr.chain_id.uint64; a_ch_chain->state = CHAIN_STATE_SYNC_GLOBAL_DB ; + dap_chain_node_addr_t l_node_addr = { 0 }; + l_node_addr.uint64 = dap_db_get_cur_node_addr(); + dap_stream_ch_chain_pkt_write(a_ch_chain->ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB, + a_ch_chain->request_net_id, a_ch_chain->request_chain_id, + a_ch_chain->request_cell_id, &l_node_addr, sizeof(dap_chain_node_addr_t)); + log_it(L_INFO, "Sync for remote tr_count=%d",dap_list_length(l_list)); dap_stream_ch_set_ready_to_write(a_ch_chain->ch, true); } @@ -319,13 +325,15 @@ dap_chain_node_client_t* dap_chain_node_client_connect(dap_chain_node_info_t *a_ void dap_chain_node_client_close(dap_chain_node_client_t *a_client) { if(a_client) { - // clean client - //dap_client_delete(a_client->client); pthread_mutex_lock(&a_client->wait_mutex); a_client->client->_inheritor = NULL;// because client->_inheritor == a_client pthread_mutex_unlock(&a_client->wait_mutex); + // clean client + //dap_client_delete(a_client->client); + //a_client->client = NULL; + pthread_cond_destroy(&a_client->wait_cond); pthread_mutex_destroy(&a_client->wait_mutex); DAP_DELETE(a_client);