diff --git a/dap_chain_net.c b/dap_chain_net.c index 4113fb1c2bdb828c2459ae5c84534115d04d4f26..5882be76efe44b0bfb0a7f2704d5107aee211ddd 100644 --- a/dap_chain_net.c +++ b/dap_chain_net.c @@ -291,7 +291,7 @@ lb_proc_state: dap_chain_node_addr_t l_address; l_address.uint64 = dap_chain_net_get_cur_addr(l_net) ? dap_chain_net_get_cur_addr(l_net)->uint64 : - dap_db_get_cur_node_addr(); + dap_db_get_cur_node_addr(l_net->pub.name); // get current node info dap_chain_node_info_t *l_cur_node_info = dap_chain_node_info_read(l_net, &l_address); @@ -413,7 +413,13 @@ lb_proc_state: case NODE_ROLE_MASTER:{ PVT(l_net)->state = NET_STATE_ADDR_REQUEST; } break; - default: PVT( l_net)->state = NET_STATE_SYNC_GDB; + default:{ + // get addr for current node if it absent + if(dap_chain_net_get_cur_addr_int(l_net)) + PVT(l_net)->state = NET_STATE_ADDR_REQUEST; + else + PVT( l_net)->state = NET_STATE_SYNC_GDB; + } } }pthread_mutex_unlock(&PVT(l_net)->state_mutex ); goto lb_proc_state; case NET_STATE_SYNC_GDB: // we need only to sync gdb @@ -447,13 +453,19 @@ lb_proc_state: default:{} } }break; + // get addr for remote node case NET_STATE_ADDR_REQUEST:{ dap_chain_node_client_t * l_node_client = NULL, *l_node_client_tmp = NULL; HASH_ITER(hh,PVT(l_net)->links,l_node_client,l_node_client_tmp){ uint8_t l_ch_id = dap_stream_ch_chain_net_get_id(); // Channel id for chain net request - size_t res = dap_stream_ch_chain_net_pkt_write(dap_client_get_stream_ch(l_node_client->client, - l_ch_id), DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_NODE_ADDR_REQUEST, l_net->pub.id, - NULL, 0 ); + dap_stream_ch_t * l_ch_chain = dap_client_get_stream_ch(l_node_client->client, l_ch_id); + // set callback for l_ch_id + dap_chain_node_client_set_callbacks( l_node_client->client, l_ch_id); + // send request for new address + size_t res = dap_stream_ch_chain_net_pkt_write(l_ch_chain, + DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_NODE_ADDR_LEASE_REQUEST, + //DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_NODE_ADDR_REQUEST, + l_net->pub.id, NULL, 0); if(res == 0) { log_it(L_WARNING,"Can't send NODE_ADDR_REQUEST packet"); HASH_DEL(PVT(l_net)->links,l_node_client); @@ -462,13 +474,18 @@ lb_proc_state: } // wait for finishing of request - int timeout_ms = 5000; // 2 min = 120 sec = 120 000 ms + int timeout_ms = 5000; // 5 sec = 5 000 ms // TODO add progress info to console PVT(l_net)->addr_request_attempts++; int l_res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_NODE_ADDR_LEASED, timeout_ms); switch (l_res) { case -1: log_it(L_WARNING,"Timeout with addr leasing"); + // try again 3 times + if(PVT(l_net)->addr_request_attempts < 3) { + pthread_mutex_unlock(&PVT(l_net)->state_mutex); + goto lb_proc_state; + } continue; // try with another link case 0: log_it(L_INFO, "Node address leased"); @@ -500,7 +517,7 @@ lb_proc_state: l_sync_gdb.node_addr.uint64 = dap_chain_net_get_cur_addr(l_net) ? dap_chain_net_get_cur_addr(l_net)->uint64 : - dap_db_get_cur_node_addr(); + dap_db_get_cur_node_addr(l_net->pub.name); dap_chain_id_t l_chain_id_null = { { 0 } }; dap_chain_cell_id_t l_chain_cell_id_null = { { 0 } }; @@ -979,7 +996,7 @@ static int s_cli_net( int argc, char **argv, char **a_str_reply) if ( strcmp(l_get_str,"status") == 0 ) { // get current node address dap_chain_node_addr_t l_cur_node_addr = { 0 }; - l_cur_node_addr.uint64 = dap_chain_net_get_cur_addr(l_net) ? dap_chain_net_get_cur_addr(l_net)->uint64 : dap_db_get_cur_node_addr(); + l_cur_node_addr.uint64 = dap_chain_net_get_cur_addr(l_net) ? dap_chain_net_get_cur_addr(l_net)->uint64 : dap_db_get_cur_node_addr(l_net->pub.name); if(!l_cur_node_addr.uint64) { dap_chain_node_cli_set_reply_text(a_str_reply, "Network \"%s\" has state %s (target state %s), active links %u from %u, cur node address not defined", @@ -1258,7 +1275,7 @@ int s_net_load(const char * a_net_name) if ( l_node_addr ) { char *l_addr_hash_str = dap_chain_node_addr_to_hash_str(l_node_addr); // save current node address - dap_db_set_cur_node_addr(l_node_addr->uint64); + dap_db_set_cur_node_addr(l_node_addr->uint64, l_net->pub.name); if(!l_addr_hash_str){ log_it(L_ERROR,"Can't get hash string for node address!"); } else { @@ -1345,7 +1362,6 @@ int s_net_load(const char * a_net_name) } break; case NODE_ROLE_CELL_MASTER: case NODE_ROLE_MASTER:{ - uint16_t l_proc_chains_count=0; char ** l_proc_chains = dap_config_get_array_str(l_cfg,"role-master" , "proc_chains", &l_proc_chains_count ); for ( size_t i = 0; i< l_proc_chains_count ; i++){ @@ -1525,6 +1541,12 @@ dap_chain_node_addr_t * dap_chain_net_get_cur_addr( dap_chain_net_t * l_net) return PVT(l_net)->node_info? &PVT(l_net)->node_info->hdr.address: PVT(l_net)->node_addr; } +uint64_t dap_chain_net_get_cur_addr_int(dap_chain_net_t * l_net) +{ + return dap_chain_net_get_cur_addr(l_net) ? dap_chain_net_get_cur_addr(l_net)->uint64 : + dap_db_get_cur_node_addr(l_net->pub.name); +} + dap_chain_cell_id_t * dap_chain_net_get_cur_cell( dap_chain_net_t * l_net) { return PVT(l_net)->node_info? &PVT(l_net)->node_info->hdr.cell_id: 0; diff --git a/dap_chain_net.h b/dap_chain_net.h index 9cff00b5e499d4e780252e4ca17f83cd4bc67ce1..05d9e73f13bd4e687299c83e89e91d342501908a 100644 --- a/dap_chain_net.h +++ b/dap_chain_net.h @@ -102,6 +102,7 @@ dap_ledger_t * dap_chain_ledger_by_net_name( const char * a_net_name); dap_chain_t * dap_chain_net_get_chain_by_name( dap_chain_net_t * l_net, const char * a_name); dap_chain_node_addr_t * dap_chain_net_get_cur_addr( dap_chain_net_t * l_net); +uint64_t dap_chain_net_get_cur_addr_int(dap_chain_net_t * l_net); dap_chain_cell_id_t * dap_chain_net_get_cur_cell( dap_chain_net_t * l_net); void dap_chain_net_links_connect(dap_chain_net_t * a_net); diff --git a/dap_chain_node_cli_cmd.c b/dap_chain_node_cli_cmd.c index 98208732c697f76631423cc65f1a8c0cd2c7a672..505008291e708c44740dedf35bd88e3531ca0177 100644 --- a/dap_chain_node_cli_cmd.c +++ b/dap_chain_node_cli_cmd.c @@ -289,7 +289,7 @@ static int node_info_del_with_reply(dap_chain_net_t * a_net, dap_chain_node_info return -1; } // check, current node have this addr or no - uint64_t l_cur_addr = dap_db_get_cur_node_addr(); + uint64_t l_cur_addr = dap_db_get_cur_node_addr(a_net->pub.name); if(l_cur_addr && l_cur_addr == a_node_info->hdr.address.uint64) { dap_chain_node_cli_set_reply_text(str_reply, "current node cannot be deleted"); return -1; @@ -1048,18 +1048,82 @@ int com_node(int a_argc, char ** a_argv, char **a_str_reply) DAP_DELETE(l_remote_node_info); return -1; } - log_it(L_NOTICE, "Stream connection established, now lets sync all"); + + log_it(L_NOTICE, "Stream connection established"); dap_stream_ch_chain_sync_request_t l_sync_request = { { 0 } }; - dap_stream_ch_t * l_ch_chain = dap_client_get_stream_ch(l_node_client->client, dap_stream_ch_chain_get_id()); - // fill begin id - l_sync_request.id_start = (uint64_t) dap_db_log_get_last_id_remote( - l_remote_node_info->hdr.address.uint64); - // fill end id = 0 - no time limit - //l_sync_request.ts_end = 0; - // fill current node address - l_sync_request.node_addr.uint64 = - dap_chain_net_get_cur_addr(l_net) ? dap_chain_net_get_cur_addr(l_net)->uint64 : - dap_db_get_cur_node_addr(); + dap_stream_ch_t * l_ch_chain = dap_client_get_stream_ch(l_node_client->client, dap_stream_ch_chain_get_id()); + // fill begin id + l_sync_request.id_start = (uint64_t) dap_db_log_get_last_id_remote( + l_remote_node_info->hdr.address.uint64); + // fill end id = 0 - no time limit + //l_sync_request.ts_end = 0; + // fill current node address + l_sync_request.node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net); + + // if need to get current node address (feature-2630) + if(!l_sync_request.node_addr.uint64 ) + { + log_it(L_NOTICE, "Now get node addr"); + uint8_t l_ch_id = dap_stream_ch_chain_net_get_id(); + dap_stream_ch_t * l_ch_chain = dap_client_get_stream_ch(l_node_client->client, l_ch_id); + + int l_res = dap_chain_node_client_set_callbacks( l_node_client->client, l_ch_id); + + size_t res = dap_stream_ch_chain_net_pkt_write(l_ch_chain, + DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_NODE_ADDR_LEASE_REQUEST, + //DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_NODE_ADDR_REQUEST, + l_net->pub.id, + NULL, 0); + if(res == 0) { + log_it(L_WARNING, "Can't send DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_NODE_ADDR_REQUEST packet"); + dap_chain_node_client_close(l_node_client); + DAP_DELETE(l_remote_node_info); + return -1; + } + int timeout_ms = 15000; // 15 sec = 15 000 ms + l_res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_NODE_ADDR_LEASED, timeout_ms); + switch (l_res) { + case 0: + if(l_node_client->cur_node_addr.uint64 != 0) { + + l_sync_request.node_addr.uint64 = l_node_client->cur_node_addr.uint64; + log_it(L_INFO, "Node address leased"); + l_sync_request.node_addr.uint64 = l_node_client->cur_node_addr.uint64; + // save cur address + // already saved + // dap_db_set_cur_node_addr_exp(l_sync_request.node_addr.uint64, l_net->pub.name); + } + else + log_it(L_WARNING, "Node address leased wrong!"); + break; + case -1: + log_it(L_WARNING, "Timeout with addr leasing"); + default: + if(l_res != -1) + log_it(L_WARNING, "Node address request error %d", l_res); + /*dap_chain_node_client_close(l_node_client); + DAP_DELETE(l_remote_node_info); + return -1;*/ + } + /* if(0 == dap_stream_ch_chain_pkt_write(l_ch_chain, DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_NODE_ADDR_REQUEST, + l_net->pub.id, l_chain_id_null, l_chain_cell_id_null, &l_sync_request, + sizeof(l_sync_request))) { + dap_chain_node_cli_set_reply_text(a_str_reply, "Error: Cant send sync chains request"); + // clean client struct + dap_chain_node_client_close(l_node_client); + DAP_DELETE(l_remote_node_info); + return -1; + } + dap_stream_ch_set_ready_to_write(l_ch_chain, true); + // wait for finishing of request + timeout_ms = 120000; // 20 min = 1200 sec = 1 200 000 ms + // TODO add progress info to console + res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms); + */ + + } + log_it(L_NOTICE, "Now lets sync all"); + dap_chain_id_t l_chain_id_null = { { 0 } }; dap_chain_cell_id_t l_chain_cell_id_null = { { 0 } }; l_chain_id_null.uint64 = l_net->pub.id.uint64; @@ -1150,7 +1214,7 @@ int com_node(int a_argc, char ** a_argv, char **a_str_reply) dap_chain_node_info_t *node_info = node_info_read_and_reply(l_net, &l_node_addr, a_str_reply); if(!node_info) return -6; - int timeout_ms = 10000; //10 sec = 10000 ms + int timeout_ms = 5000; //5 sec = 5000 ms // start handshake dap_chain_node_client_t *client = dap_chain_node_client_connect(node_info); if(!client) { diff --git a/dap_chain_node_client.c b/dap_chain_node_client.c index cae2d63969d7f305ec3349cb2394e3eae56c2170..4a7e56ce9d245462dfb8f81e1517ed1088fae403 100644 --- a/dap_chain_node_client.c +++ b/dap_chain_node_client.c @@ -54,6 +54,8 @@ #include "dap_stream_ch_chain_net.h" #include "dap_stream_ch_chain_net_pkt.h" #include "dap_stream_pkt.h" + +//#include "dap_chain_common.h" #include "dap_chain_node_client.h" #define LOG_TAG "dap_chain_node_client" @@ -192,7 +194,57 @@ static void s_stage_connected_callback(dap_client_t *a_client, void *a_arg) } /** - * @brief s_ch_chain_callback_notify_packet + * @brief s_ch_chain_callback_notify_packet_in2 - for dap_stream_ch_chain_net + * @param a_ch_chain_net + * @param a_pkt_type + * @param a_pkt_net + * @param a_pkt_data_size + * @param a_arg + */ +static void s_ch_chain_callback_notify_packet_in2(dap_stream_ch_chain_net_t* a_ch_chain_net, uint8_t a_pkt_type, + dap_stream_ch_chain_net_pkt_t *a_pkt_net, size_t a_pkt_net_data_size, void * a_arg) +{ + dap_chain_node_client_t * l_node_client = (dap_chain_node_client_t *) a_arg; + switch (a_pkt_type) { + // get new generated current node address + case DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_NODE_ADDR_LEASE: { + if(a_pkt_net_data_size == sizeof(dap_chain_node_addr_t)) { + dap_chain_node_addr_t *l_addr = (dap_chain_node_addr_t *) a_pkt_net->data; + memcpy(&l_node_client->cur_node_addr, l_addr, sizeof(dap_chain_node_addr_t)); + } + pthread_mutex_lock(&l_node_client->wait_mutex); + l_node_client->state = NODE_CLIENT_STATE_NODE_ADDR_LEASED; + pthread_mutex_unlock(&l_node_client->wait_mutex); +#ifndef _WIN32 + pthread_cond_signal(&l_node_client->wait_cond); +#else + SetEvent( l_node_client->wait_cond ); + #endif + break; + } + // get remote node address + case DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_NODE_ADDR: { + + if(a_pkt_net_data_size == sizeof(dap_chain_node_addr_t)) { + dap_chain_node_addr_t *l_addr = (dap_chain_node_addr_t *) a_pkt_net->data; + memcpy(&l_node_client->remote_node_addr, l_addr, sizeof(dap_chain_node_addr_t)); + } + pthread_mutex_lock(&l_node_client->wait_mutex); + l_node_client->state = NODE_CLIENT_STATE_GET_NODE_ADDR; + pthread_mutex_unlock(&l_node_client->wait_mutex); +#ifndef _WIN32 + pthread_cond_signal(&l_node_client->wait_cond); +#else + SetEvent( l_node_client->wait_cond ); +#endif + break; + } + } +} + +/** + * @brief s_ch_chain_callback_notify_packet_in - for dap_stream_ch_chain + * @param a_ch_chain * @param a_pkt_type * @param a_pkt * @param a_pkt_data_size @@ -219,16 +271,6 @@ static void s_ch_chain_callback_notify_packet_in(dap_stream_ch_chain_t* a_ch_cha SetEvent( l_node_client->wait_cond ); #endif - case DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_NODE_ADDR_LEASE: - pthread_mutex_lock(&l_node_client->wait_mutex); - l_node_client->state = NODE_CLIENT_STATE_NODE_ADDR_LEASED; - pthread_mutex_unlock(&l_node_client->wait_mutex); -#ifndef _WIN32 - pthread_cond_signal(&l_node_client->wait_cond); -#else - SetEvent( l_node_client->wait_cond ); -#endif - break; case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_ALL: case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB: case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS: { @@ -253,7 +295,8 @@ static void s_ch_chain_callback_notify_packet_in(dap_stream_ch_chain_t* a_ch_cha a_ch_chain->state = CHAIN_STATE_SYNC_GLOBAL_DB; dap_chain_node_addr_t l_node_addr = { 0 }; - l_node_addr.uint64 = dap_db_get_cur_node_addr(); + dap_chain_net_t *l_net = dap_chain_net_by_id(a_ch_chain->request_net_id); + l_node_addr.uint64 = dap_db_get_cur_node_addr(l_net->pub.name); dap_stream_ch_chain_pkt_write(a_ch_chain->ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB, a_ch_chain->request_net_id, a_ch_chain->request_chain_id, a_ch_chain->request_cell_id, &l_node_addr, sizeof(dap_chain_node_addr_t)); @@ -518,3 +561,34 @@ int dap_chain_node_client_wait(dap_chain_node_client_t *a_client, int a_waited_s pthread_mutex_unlock(&a_client->wait_mutex); return ret; } + +int dap_chain_node_client_set_callbacks(dap_client_t *a_client, uint8_t a_ch_id) +{ + int l_ret = -1; + dap_chain_node_client_t *l_node_client = a_client->_inheritor; + if(l_node_client) { + pthread_mutex_lock(&l_node_client->wait_mutex); + // find current channel code + dap_client_pvt_t * l_client_internal = DAP_CLIENT_PVT(a_client); + dap_stream_ch_t * l_ch = NULL; + if(l_client_internal) + l_ch = dap_client_get_stream_ch(a_client, a_ch_id); + if(l_ch) { + if(a_ch_id == dap_stream_ch_chain_get_id()) { + dap_stream_ch_chain_t * l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); + l_ch_chain->callback_notify_packet_out = s_ch_chain_callback_notify_packet_out; + l_ch_chain->callback_notify_packet_in = s_ch_chain_callback_notify_packet_in; + l_ch_chain->callback_notify_arg = l_node_client; + } + if(a_ch_id == dap_stream_ch_chain_net_get_id()) { + dap_stream_ch_chain_net_t *l_ch_chain = DAP_STREAM_CH_CHAIN_NET(l_ch); + l_ch_chain->notify_callback = s_ch_chain_callback_notify_packet_in2; + l_ch_chain->notify_callback_arg = l_node_client; + } + l_ret = 0; + } else { + } + pthread_mutex_unlock(&l_node_client->wait_mutex); + } + return l_ret; +} diff --git a/dap_chain_node_client.h b/dap_chain_node_client.h index 2787f63311c61002da7d2c653fb67036e07b27a5..ad959a3e88f5d1d4d8ee1818c28d8d788dba15aa 100644 --- a/dap_chain_node_client.h +++ b/dap_chain_node_client.h @@ -68,6 +68,7 @@ typedef struct dap_chain_node_client { // For hash indexing UT_hash_handle hh; + dap_chain_node_addr_t cur_node_addr; dap_chain_node_addr_t remote_node_addr; struct in_addr remote_ipv4; struct in6_addr remote_ipv6; @@ -109,3 +110,5 @@ int dap_chain_node_client_send_ch_pkt(dap_chain_node_client_t *a_client, uint8_t */ int dap_chain_node_client_wait(dap_chain_node_client_t *a_client, int a_waited_state, int a_timeout_ms); +int dap_chain_node_client_set_callbacks(dap_client_t *a_client, uint8_t a_ch_id); +