diff --git a/dap_chain_net.c b/dap_chain_net.c index caf511e6538b2eb63b2dd520bee360e01e8f8aa0..b0455f71acd560860351a417cef7b18b93b59ea2 100644 --- a/dap_chain_net.c +++ b/dap_chain_net.c @@ -40,6 +40,9 @@ #include "dap_chain_node_cli.h" #include "dap_chain_node_cli_cmd.h" +#include "dap_chain_global_db.h" +#include "dap_chain_global_db_remote.h" + #include "dap_stream_ch_chain_net_pkt.h" #include "dap_stream_ch_chain_net.h" #include "dap_stream_ch_chain.h" @@ -227,7 +230,7 @@ lb_proc_state: HASH_ITER(hh,PVT(l_net)->links,l_node_client,l_node_client_tmp){ dap_stream_ch_chain_sync_request_t l_sync_gdb = {{0}}; // Get last timestamp in log - l_sync_gdb.ts_start = (uint64_t) dap_db_log_get_last_timestamp(); + l_sync_gdb.ts_start = (uint64_t) dap_db_log_get_last_timestamp_remote(l_node_client->remote_node_addr.uint64); l_sync_gdb.ts_end = (uint64_t) time(NULL); uint8_t l_ch_id = dap_stream_ch_chain_get_id(); // Channel id for global_db sync int res = dap_chain_node_client_send_ch_pkt(l_node_client, l_ch_id, diff --git a/dap_chain_node_cli.c b/dap_chain_node_cli.c index 47659d749b629c08e28178fb610d2c8c02777d76..fa51f86ac79b5b0346607bd18810d2d2a0ba7575 100644 --- a/dap_chain_node_cli.c +++ b/dap_chain_node_cli.c @@ -300,9 +300,11 @@ static void* thread_one_client_func(void *args) } char *reply_body = dap_strdup_printf("ret_code: %d\r\n%s\r\n", res, (str_reply) ? str_reply : ""); // return the result of the command function - char *reply_str = dap_strdup_printf("HTTP/1.1 200 OK\r\nContent-Length: %d\r\n\r\n%s", + char *reply_str = dap_strdup_printf("HTTP/1.1 200 OK\r\n" + "Content-Length: %d\r\n\r\n" + "%s", strlen(reply_body), reply_body); - int ret = send(newsockfd, reply_str, strlen(reply_str) + 1, 1000); + int ret = send(newsockfd, reply_str, strlen(reply_str) ,0); DAP_DELETE(str_reply); DAP_DELETE(reply_str); DAP_DELETE(reply_body); diff --git a/dap_chain_node_cli_cmd.c b/dap_chain_node_cli_cmd.c index 8c2d2581222b6d3c87bedc39b532c05c531d2b08..1ee99b542e21cff0d18151a2315addf3821f0959 100644 --- a/dap_chain_node_cli_cmd.c +++ b/dap_chain_node_cli_cmd.c @@ -933,34 +933,59 @@ int com_node(int argc, const char ** argv, char **str_reply) DAP_DELETE(l_remote_node_info); return -1; } + log_it(L_NOTICE, "Stream connection established, now lets sync all"); + dap_stream_ch_chain_sync_request_t l_sync_request = {{0}}; + dap_stream_ch_t * l_ch_chain = dap_client_get_stream_ch(l_node_client->client, dap_stream_ch_chain_get_id() ); + l_sync_request.ts_start = (uint64_t) dap_db_log_get_last_timestamp_remote( l_remote_node_info->hdr.address.uint64 ); + //l_sync_request.ts_end = (time_t) time(NULL); + l_sync_request.node_addr.uint64 = dap_chain_net_get_cur_addr(l_net)?dap_chain_net_get_cur_addr(l_net)->uint64: + dap_db_get_cur_node_addr(); + dap_chain_id_t l_chain_id_null = {{0}}; + dap_chain_cell_id_t l_chain_cell_id_null = {{0}}; + log_it(L_INFO,"Requested GLOBAL_DB syncronizatoin, %llu:%llu period", l_sync_request.ts_start, + l_sync_request.ts_end ) ; + if( 0 == dap_stream_ch_chain_pkt_write(l_ch_chain,DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB, + l_net->pub.id, l_chain_id_null ,l_chain_cell_id_null ,&l_sync_request, + sizeof (l_sync_request))) { + dap_chain_node_cli_set_reply_text(str_reply, "Error: Cant send sync chains request"); + // clean client struct + dap_chain_node_client_close(l_node_client); + DAP_DELETE(l_remote_node_info); + return -1; + } + dap_stream_ch_set_ready_to_write(l_ch_chain,true); + // wait for finishing of request + timeout_ms = 120000; // 2 min = 120 sec = 120 000 ms + // TODO add progress info to console + res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms); + // Requesting chains dap_chain_t *l_chain = NULL; - DL_FOREACH(l_net->pub.chains, l_chain) { // send request - // Get last timestamp in log dap_stream_ch_chain_sync_request_t l_sync_request = {{0}}; - l_sync_request.ts_start = (uint64_t) dap_db_log_get_last_timestamp(); - dap_stream_ch_t * l_ch_chain = dap_client_get_stream_ch(l_node_client->client, dap_stream_ch_chain_get_id() ); - if( 0 == dap_stream_ch_chain_pkt_write(l_ch_chain,DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_ALL, + if( 0 == dap_stream_ch_chain_pkt_write(l_ch_chain,DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS, l_net->pub.id, l_chain->id ,l_remote_node_info->hdr.cell_id,&l_sync_request, sizeof (l_sync_request))) { - dap_chain_node_cli_set_reply_text(str_reply, "no request sent"); + dap_chain_node_cli_set_reply_text(str_reply, "Error: Cant send sync chains request"); // clean client struct dap_chain_node_client_close(l_node_client); DAP_DELETE(l_remote_node_info); return -1; } + log_it(L_NOTICE, "Requested syncronization for chain \"%s\"",l_chain->name); + dap_stream_ch_set_ready_to_write(l_ch_chain,true); // wait for finishing of request timeout_ms = 120000; // 2 min = 120 sec = 120 000 ms // TODO add progress info to console res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms); } + log_it(L_INFO,"Chains and gdb are synced"); DAP_DELETE(l_remote_node_info); dap_client_disconnect(l_node_client->client); dap_chain_node_client_close(l_node_client); - dap_chain_node_cli_set_reply_text(str_reply, "Node sync completed"); + dap_chain_node_cli_set_reply_text(str_reply, "Node sync completed: Chains and gdb are synced"); return 0; } break; diff --git a/dap_chain_node_client.c b/dap_chain_node_client.c index 9a29a093ef46bdc628c1cd51cef1434095df3f41..d837f15768ba32c65a34420465b1ae86800bc84f 100644 --- a/dap_chain_node_client.c +++ b/dap_chain_node_client.c @@ -49,7 +49,10 @@ static int listen_port_tcp = 8079; static void s_stage_end_callback(dap_client_t *a_client, void *a_arg); -static void s_ch_chain_callback_notify_packet(dap_stream_ch_chain_t*, uint8_t a_pkt_type, +static void s_ch_chain_callback_notify_packet_out(dap_stream_ch_chain_t*, uint8_t a_pkt_type, + dap_stream_ch_chain_pkt_t *a_pkt, size_t a_pkt_data_size, + void * a_arg); +static void s_ch_chain_callback_notify_packet_in(dap_stream_ch_chain_t* a_ch_chain, uint8_t a_pkt_type, dap_stream_ch_chain_pkt_t *a_pkt, size_t a_pkt_data_size, void * a_arg); @@ -124,19 +127,25 @@ static void s_stage_end_callback(dap_client_t *a_client, void *a_arg) dap_chain_node_client_t *l_node_client = a_client->_inheritor; assert(l_node_client); if(l_node_client) { + log_it(L_NOTICE,"Stream connection with node 0x%016X established", l_node_client->remote_node_addr.uint64); pthread_mutex_lock(&l_node_client->wait_mutex); l_node_client->state = NODE_CLIENT_STATE_CONNECTED; dap_stream_ch_t * l_ch = dap_client_get_stream_ch( a_client , dap_stream_ch_chain_get_id() ); if (l_ch){ dap_stream_ch_chain_t * l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); - l_ch_chain->notify_callback = s_ch_chain_callback_notify_packet; - l_ch_chain->notify_callback_arg = l_node_client; + l_ch_chain->callback_notify_packet_out = s_ch_chain_callback_notify_packet_out; + l_ch_chain->callback_notify_packet_in = s_ch_chain_callback_notify_packet_in; + l_ch_chain->callback_notify_arg = l_node_client; + }else { + log_it(L_WARNING,"No ch_chain channel, can't init notify callback for pkt type CH_CHAIN"); } + pthread_mutex_unlock(&l_node_client->wait_mutex); if ( l_node_client->callback_connected ) l_node_client->callback_connected(l_node_client,a_arg); l_node_client->keep_connection = true; + log_it(L_DEBUG,"Wakeup all who waits"); pthread_cond_signal(&l_node_client->wait_cond); } } @@ -148,10 +157,73 @@ static void s_stage_end_callback(dap_client_t *a_client, void *a_arg) * @param a_pkt_data_size * @param a_arg */ -static void s_ch_chain_callback_notify_packet(dap_stream_ch_chain_t* a_ch_chain, uint8_t a_pkt_type, +static void s_ch_chain_callback_notify_packet_in(dap_stream_ch_chain_t* a_ch_chain, uint8_t a_pkt_type, dap_stream_ch_chain_pkt_t *a_pkt, size_t a_pkt_data_size, void * a_arg) { + dap_chain_node_client_t * l_node_client = (dap_chain_node_client_t *) a_arg; + switch (a_pkt_type) { + case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_ALL: + case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB: + case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS:{ + dap_stream_ch_chain_sync_request_t * l_request = NULL; + //if ( a_pkt_data_size == sizeof ( *l_request)) + // l_request = (dap_stream_ch_chain_sync_request_t* ) a_pkt->data; + + if ( l_request ){ + if ( l_request->ts_start < (uint64_t) dap_db_log_get_last_timestamp() ){ + log_it(L_INFO, "Remote is synced all but we have updates for it"); + // Get log diff + a_ch_chain->request_last_ts = dap_db_log_get_last_timestamp(); + dap_list_t *l_list = dap_db_log_get_list((time_t) l_request->ts_start); + + if ( l_list ) { + // Add it to outgoing list + l_list->next = a_ch_chain->request_global_db_trs; + a_ch_chain->request_global_db_trs = l_list; + a_ch_chain->request_net_id.uint64 = a_pkt->hdr.net_id.uint64; + a_ch_chain->request_cell_id.uint64 = a_pkt->hdr.cell_id.uint64; + a_ch_chain->request_chain_id.uint64 = a_pkt->hdr.chain_id.uint64; + a_ch_chain->state = CHAIN_STATE_SYNC_GLOBAL_DB ; + + log_it(L_INFO, "Sync for remote tr_count=%d",dap_list_length(l_list)); + dap_stream_ch_set_ready_to_write(a_ch_chain->ch, true); + } + }else { + log_it(L_INFO, "Remote node has lastes ts for us"); + pthread_mutex_lock(&l_node_client->wait_mutex); + l_node_client->state = NODE_CLIENT_STATE_SYNCED; + pthread_mutex_unlock(&l_node_client->wait_mutex); + pthread_cond_signal(&l_node_client->wait_cond); + } + }else { + log_it(L_INFO, "Sync notify without request to sync back, stay in SYNCED state"); + pthread_mutex_lock(&l_node_client->wait_mutex); + l_node_client->state = NODE_CLIENT_STATE_SYNCED; + pthread_mutex_unlock(&l_node_client->wait_mutex); + pthread_cond_signal(&l_node_client->wait_cond); + } + + } + default:{} + } +} + +/** + * @brief s_ch_chain_callback_notify_packet_in + * @param a_ch_chain + * @param a_pkt_type + * @param a_pkt + * @param a_pkt_data_size + * @param a_arg + */ +static void s_ch_chain_callback_notify_packet_out(dap_stream_ch_chain_t* a_ch_chain, uint8_t a_pkt_type, + dap_stream_ch_chain_pkt_t *a_pkt, size_t a_pkt_data_size, + void * a_arg) +{ + (void) a_pkt; + (void) a_pkt_data_size; + (void) a_ch_chain; dap_chain_node_client_t * l_node_client = (dap_chain_node_client_t *) a_arg; switch (a_pkt_type) { case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_ALL: @@ -161,12 +233,11 @@ static void s_ch_chain_callback_notify_packet(dap_stream_ch_chain_t* a_ch_chain, l_node_client->state = NODE_CLIENT_STATE_SYNCED; pthread_mutex_unlock(&l_node_client->wait_mutex); pthread_cond_signal(&l_node_client->wait_cond); - } + }break; default:{} } } - /** * Create connection to server * @@ -186,7 +257,7 @@ dap_chain_node_client_t* dap_chain_node_client_connect(dap_chain_node_info_t *no l_node_client->events = NULL; //dap_events_new(); l_node_client->client = dap_client_new(l_node_client->events, stage_status_callback, s_stage_status_error_callback); l_node_client->client->_inheritor = l_node_client; - dap_client_set_active_channels(l_node_client->client,"NC"); + dap_client_set_active_channels(l_node_client->client,"CN"); int hostlen = 128; char host[hostlen]; @@ -207,11 +278,11 @@ dap_chain_node_client_t* dap_chain_node_client_connect(dap_chain_node_info_t *no } dap_client_set_uplink(l_node_client->client, strdup(host), listen_port_tcp); // dap_client_stage_t a_stage_target = STAGE_ENC_INIT; - dap_client_stage_t a_stage_target = STAGE_STREAM_STREAMING; + dap_client_stage_t l_stage_target = STAGE_STREAM_STREAMING; l_node_client->state = NODE_CLIENT_STATE_CONNECT; // Handshake & connect - dap_client_go_stage(l_node_client->client, a_stage_target, s_stage_end_callback); + dap_client_go_stage(l_node_client->client, l_stage_target, s_stage_end_callback); return l_node_client; } @@ -284,6 +355,7 @@ int dap_chain_node_client_wait(dap_chain_node_client_t *a_client, int a_waited_s to.tv_nsec = (long) nsec_new; // signal waiting do { + int wait = pthread_cond_timedwait(&a_client->wait_cond, &a_client->wait_mutex, &to); if(wait == 0 && a_client->state == a_waited_state) { ret = 1;