From 88d4deac8f521a9bee3d9b84c19580ee9421e856 Mon Sep 17 00:00:00 2001 From: Roman Khlopkov <roman.khlopkov@demlabs.net> Date: Thu, 22 Oct 2020 11:45:19 +0000 Subject: [PATCH] features-4577 --- CMakeLists.txt | 2 +- dap-sdk/net/client/dap_client.c | 1 + dap-sdk/net/client/dap_client_pvt.c | 9 +- dap-sdk/net/client/include/dap_client_pvt.h | 2 +- modules/channel/chain/dap_stream_ch_chain.c | 149 +++++++++--------- .../chain/include/dap_stream_ch_chain.h | 8 +- .../chain/include/dap_stream_ch_chain_pkt.h | 4 +- .../global-db/dap_chain_global_db_remote.c | 45 ++++-- .../include/dap_chain_global_db_remote.h | 10 +- modules/net/dap_chain_net.c | 77 +++------ modules/net/dap_chain_node_cli_cmd.c | 14 +- 11 files changed, 158 insertions(+), 163 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index dae2eb368f..c3b3b72aa9 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,7 +2,7 @@ project(cellframe-sdk C) cmake_minimum_required(VERSION 2.8) set(CMAKE_C_STANDARD 11) -set(CELLFRAME_SDK_NATIVE_VERSION "2.6-20") +set(CELLFRAME_SDK_NATIVE_VERSION "2.6-21") add_definitions ("-DCELLFRAME_SDK_VERSION=\"${CELLFRAME_SDK_NATIVE_VERSION}\"") set(DAPSDK_MODULES "") diff --git a/dap-sdk/net/client/dap_client.c b/dap-sdk/net/client/dap_client.c index 54cc987894..63ed2cee10 100644 --- a/dap-sdk/net/client/dap_client.c +++ b/dap-sdk/net/client/dap_client.c @@ -234,6 +234,7 @@ void dap_client_delete(dap_client_t * a_client) pthread_mutex_lock(&a_client->mutex); dap_client_pvt_delete_n_wait(DAP_CLIENT_PVT(a_client)); + a_client->_internal = NULL; pthread_mutex_unlock(&a_client->mutex); pthread_mutex_destroy(&a_client->mutex); DAP_DELETE(a_client); diff --git a/dap-sdk/net/client/dap_client_pvt.c b/dap-sdk/net/client/dap_client_pvt.c index 857a6bdb1f..f440ae4f5d 100644 --- a/dap-sdk/net/client/dap_client_pvt.c +++ b/dap-sdk/net/client/dap_client_pvt.c @@ -150,6 +150,7 @@ static void s_client_pvt_disconnected(dap_client_t * a_client, void * a_arg ) (void) a_arg; // To be sure thats cond waiter is waiting and unlocked mutex pthread_mutex_lock(&DAP_CLIENT_PVT(a_client)->disconnected_mutex); + DAP_CLIENT_PVT(a_client)->stage_status = STAGE_STATUS_ABORTING; pthread_cond_broadcast(&DAP_CLIENT_PVT(a_client)->disconnected_cond); pthread_mutex_unlock(&DAP_CLIENT_PVT(a_client)->disconnected_mutex); } @@ -161,16 +162,15 @@ static void s_client_pvt_disconnected(dap_client_t * a_client, void * a_arg ) */ int dap_client_pvt_disconnect_all_n_wait(dap_client_pvt_t *a_client_pvt) { - //dap_client_pvt_t *a_client_pvt = (a_client) ? DAP_CLIENT_PVT(a_client) : NULL; if(!a_client_pvt) return -1; pthread_mutex_lock(&a_client_pvt->disconnected_mutex); dap_client_go_stage(a_client_pvt->client, STAGE_BEGIN, s_client_pvt_disconnected ); - while (a_client_pvt->stage != STAGE_BEGIN) { + while (a_client_pvt->stage_status != STAGE_STATUS_ABORTING) { pthread_cond_wait(&a_client_pvt->disconnected_cond, &a_client_pvt->disconnected_mutex); } pthread_mutex_unlock(&a_client_pvt->disconnected_mutex); - + a_client_pvt->stage_status = STAGE_STATUS_DONE; return 0; } @@ -214,6 +214,7 @@ void dap_client_pvt_delete_n_wait(dap_client_pvt_t * a_client_pvt) pthread_mutex_destroy( &a_client_pvt->disconnected_mutex); pthread_cond_destroy( &a_client_pvt->disconnected_cond); + DAP_DELETE(a_client_pvt); } /** @@ -248,7 +249,7 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) case STAGE_STREAM_CONNECTED: case STAGE_STREAM_STREAMING: dap_stream_delete(a_client_pvt->stream); - dap_events_socket_remove_and_delete_mt(a_client_pvt->worker, a_client_pvt->stream_es); + dap_events_socket_remove_and_delete_unsafe(a_client_pvt->stream_es, true); a_client_pvt->stream = NULL; a_client_pvt->stream_es = NULL; break; diff --git a/dap-sdk/net/client/include/dap_client_pvt.h b/dap-sdk/net/client/include/dap_client_pvt.h index 7ca76b349c..8af4b059d9 100644 --- a/dap-sdk/net/client/include/dap_client_pvt.h +++ b/dap-sdk/net/client/include/dap_client_pvt.h @@ -107,7 +107,7 @@ void dap_client_pvt_request_enc(dap_client_pvt_t * a_client_internal, const char dap_client_callback_int_t a_error_proc); void dap_client_pvt_new(dap_client_pvt_t * a_client_internal); -void dap_client_pvt_delete_n_wait(dap_client_pvt_t * a_client_pvts); +void dap_client_pvt_delete_n_wait(dap_client_pvt_t * a_client_pvt); //int dap_client_pvt_ref(dap_client_pvt_t * a_client_internal); //int dap_client_pvt_unref(dap_client_pvt_t * a_client_internal); diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c index a1a27d08ff..419569d8a5 100644 --- a/modules/channel/chain/dap_stream_ch_chain.c +++ b/modules/channel/chain/dap_stream_ch_chain.c @@ -116,9 +116,27 @@ void s_stream_ch_delete(dap_stream_ch_t* a_ch, void* a_arg) (void) a_arg; if(DAP_STREAM_CH_CHAIN(a_ch)->request_global_db_trs) { - dap_db_log_list_delete(DAP_STREAM_CH_CHAIN(a_ch)->request_global_db_trs); //dap_list_free_full(DAP_STREAM_CH_CHAIN(a_ch)->request_global_db_trs, (dap_callback_destroyed_t) free); + dap_db_log_list_delete(DAP_STREAM_CH_CHAIN(a_ch)->request_global_db_trs); DAP_STREAM_CH_CHAIN(a_ch)->request_global_db_trs = NULL; } + + if (DAP_STREAM_CH_CHAIN(a_ch)->pkt_copy_list) { + dap_list_t *l_tmp_item = DAP_STREAM_CH_CHAIN(a_ch)->pkt_copy_list; + while(l_tmp_item) { + dap_chain_pkt_copy_t *l_pkt_copy = (dap_chain_pkt_copy_t *)l_tmp_item->data; + DAP_DELETE(l_pkt_copy->pkt_data); + DAP_DELETE(l_pkt_copy); + dap_list_t *l_trash_item = l_tmp_item; + l_tmp_item = dap_list_next(l_tmp_item); + DAP_DELETE(l_trash_item); + } + DAP_STREAM_CH_CHAIN(a_ch)->pkt_copy_list = NULL; + } + if (DAP_STREAM_CH_CHAIN(a_ch)->db_iter) { + DAP_STREAM_CH_CHAIN(a_ch)->db_iter = dap_list_first( DAP_STREAM_CH_CHAIN(a_ch)->db_iter); + dap_list_free_full( DAP_STREAM_CH_CHAIN(a_ch)->db_iter, free); + DAP_STREAM_CH_CHAIN(a_ch)->db_iter = NULL; + } } @@ -128,26 +146,30 @@ bool s_sync_chains_callback(dap_proc_thread_t *a_thread, void *a_arg) dap_stream_ch_t *l_ch = (dap_stream_ch_t *)a_arg; dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); - dap_chain_t * l_chain = dap_chain_find_by_id(l_ch_chain->request_net_id, l_ch_chain->request_chain_id); + dap_chain_t * l_chain = dap_chain_find_by_id(l_ch_chain->request_hdr.net_id, l_ch_chain->request_hdr.chain_id); l_ch_chain->request_atom_iter = l_chain->callback_atom_iter_create(l_chain); size_t l_first_size = 0; dap_chain_atom_ptr_t *l_first = l_chain->callback_atom_iter_get_first(l_ch_chain->request_atom_iter, &l_first_size); if (l_first && l_first_size) { // first packet + if (!dap_hash_fast_is_blank(&l_ch_chain->request.hash_from)) { + l_first = l_chain->callback_atom_find_by_hash(l_ch_chain->request_atom_iter, + &l_ch_chain->request.hash_from, &l_first_size); + } l_ch_chain->state = CHAIN_STATE_SYNC_CHAINS; - dap_chain_node_addr_t l_node_addr = { 0 }; - dap_chain_net_t *l_net = dap_chain_net_by_id(l_ch_chain->request_net_id); + dap_chain_node_addr_t l_node_addr = {}; + dap_chain_net_t *l_net = dap_chain_net_by_id(l_ch_chain->request_hdr.net_id); l_node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net); dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_CHAIN, - l_ch_chain->request_net_id, l_ch_chain->request_chain_id, - l_ch_chain->request_cell_id, &l_node_addr, sizeof(dap_chain_node_addr_t)); + l_ch_chain->request_hdr.net_id, l_ch_chain->request_hdr.chain_id, + l_ch_chain->request_hdr.cell_id, &l_node_addr, sizeof(dap_chain_node_addr_t)); } else { // last packet dap_stream_ch_chain_sync_request_t l_request = {}; dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS, - l_ch_chain->request_net_id, l_ch_chain->request_chain_id, - l_ch_chain->request_cell_id, &l_request, sizeof(l_request)); + l_ch_chain->request_hdr.net_id, l_ch_chain->request_hdr.chain_id, + l_ch_chain->request_hdr.cell_id, &l_request, sizeof(l_request)); DAP_DEL_Z(l_ch_chain->request_atom_iter); l_ch_chain->state = CHAIN_STATE_IDLE; if (l_ch_chain->callback_notify_packet_out) @@ -166,35 +188,31 @@ bool s_sync_gdb_callback(dap_proc_thread_t *a_thread, void *a_arg) dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); // Get log diff - l_ch_chain->request_last_ts = dap_db_log_get_last_id(); - //log_it(L_DEBUG, "Requested transactions %llu:%llu", l_request->id_start, (uint64_t ) l_ch_chain->request_last_ts); + uint64_t l_local_last_id = dap_db_log_get_last_id(); + log_it(L_DEBUG, "Requested transactions %llu:%llu", l_ch_chain->request.id_start, l_local_last_id); uint64_t l_start_item = l_ch_chain->request.id_start; // If the current global_db has been truncated, but the remote node has not known this - if(l_ch_chain->request.id_start > l_ch_chain->request_last_ts) { + if(l_ch_chain->request.id_start > l_local_last_id) { l_start_item = 0; } - dap_chain_net_t *l_net = dap_chain_net_by_id(l_ch_chain->request_net_id); + dap_chain_net_t *l_net = dap_chain_net_by_id(l_ch_chain->request_hdr.net_id); dap_list_t *l_add_groups = dap_chain_net_get_add_gdb_group(l_net, l_ch_chain->request.node_addr); dap_db_log_list_t *l_db_log = dap_db_log_list_start(l_start_item + 1, l_add_groups); dap_chain_node_addr_t l_node_addr = { 0 }; l_node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net); dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB, - l_ch_chain->request_net_id, l_ch_chain->request_chain_id, - l_ch_chain->request_cell_id, &l_node_addr, sizeof(dap_chain_node_addr_t)); + l_ch_chain->request_hdr.net_id, l_ch_chain->request_hdr.chain_id, + l_ch_chain->request_hdr.cell_id, &l_node_addr, sizeof(dap_chain_node_addr_t)); if(l_db_log) { - //log_it(L_DEBUG, "Start getting items %u:%u", l_request->id_start + 1,l_db_log->items_number);//dap_list_length(l_list)); // Add it to outgoing list l_ch_chain->request_global_db_trs = l_db_log; l_ch_chain->db_iter = NULL; l_ch_chain->state = CHAIN_STATE_SYNC_GLOBAL_DB; } else { dap_stream_ch_chain_sync_request_t l_request = {}; - //log_it(L_DEBUG, "No items to sync from %u", l_request->id_start + 1); - l_request.node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net); - l_request.id_start = dap_db_log_get_last_id_remote(l_ch_chain->request.node_addr.uint64); dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, - l_ch_chain->request_net_id, l_ch_chain->request_chain_id, - l_ch_chain->request_cell_id, &l_request, sizeof(l_request)); + l_ch_chain->request_hdr.net_id, l_ch_chain->request_hdr.chain_id, + l_ch_chain->request_hdr.cell_id, &l_request, sizeof(l_request)); l_ch_chain->state = CHAIN_STATE_IDLE; if(l_ch_chain->callback_notify_packet_out) l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, @@ -202,7 +220,7 @@ bool s_sync_gdb_callback(dap_proc_thread_t *a_thread, void *a_arg) } //log_it(L_INFO, "Prepared %u items for sync", l_db_log->items_number - l_request->id_start); // go to send data from list [in s_stream_ch_packet_out()] - // no data to send -> send one empty message DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB_SYNCED + // no data to send -> send one empty message DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB dap_stream_ch_set_ready_to_write_unsafe(l_ch, true); dap_events_socket_assign_on_worker_mt(l_ch->stream->esocket, l_ch->stream_worker->worker); return true; @@ -213,17 +231,16 @@ bool s_chain_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) UNUSED(a_thread); dap_stream_ch_t *l_ch = (dap_stream_ch_t *)a_arg; dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); - dap_chain_t * l_chain = dap_chain_find_by_id(l_ch_chain->request_net_id, l_ch_chain->request_chain_id); - if (!l_chain) { - log_it(L_WARNING, "No chain found for DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN"); - return true; - } - dap_chain_hash_fast_t l_atom_hash = {}; dap_list_t *l_pkt_copy_list = l_ch_chain->pkt_copy_list; if (l_pkt_copy_list) { l_ch_chain->pkt_copy_list = l_ch_chain->pkt_copy_list->next; dap_chain_pkt_copy_t *l_pkt_copy = (dap_chain_pkt_copy_t *)l_pkt_copy_list->data; + dap_chain_t *l_chain = dap_chain_find_by_id(l_pkt_copy->pkt_hdr.net_id, l_pkt_copy->pkt_hdr.chain_id); + if (!l_chain) { + log_it(L_WARNING, "No chain found for DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN"); + return true; + } dap_chain_atom_ptr_t l_atom_copy = (dap_chain_atom_ptr_t)l_pkt_copy->pkt_data; uint64_t l_atom_copy_size = l_pkt_copy->pkt_data_size; if ( l_atom_copy_size && l_pkt_copy && l_atom_copy ){ @@ -234,7 +251,7 @@ bool s_chain_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) dap_chain_atom_verify_res_t l_atom_add_res = l_chain->callback_atom_add(l_chain, l_atom_copy, l_atom_copy_size); if (l_atom_add_res == ATOM_ACCEPT && dap_chain_has_file_store(l_chain)) { // append to file - dap_chain_cell_t *l_cell = dap_chain_cell_create_fill(l_chain, l_ch_chain->request_cell_id); + dap_chain_cell_t *l_cell = dap_chain_cell_create_fill(l_chain, l_pkt_copy->pkt_hdr.cell_id); int l_res; if (l_cell) { // add one atom only @@ -244,6 +261,8 @@ bool s_chain_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) if(l_res < 0) { log_it(L_ERROR, "Can't save event 0x%x to the file '%s'", l_atom_hash, l_cell ? l_cell->file_storage_path : "[null]"); + } else { + dap_db_set_last_hash_remote(l_ch_chain->request.node_addr.uint64, l_chain, &l_atom_hash); } // add all atoms from treshold if (l_chain->callback_atom_add_from_treshold){ @@ -270,13 +289,14 @@ bool s_chain_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) dap_chain_cell_delete(l_cell); } else{ - log_it(L_ERROR, "Can't get cell for cell_id 0x%x for save event to file", l_ch_chain->request_cell_id); + log_it(L_ERROR, "Can't get cell for cell_id 0x%x for save event to file", l_pkt_copy->pkt_hdr.cell_id); } } if(l_atom_add_res == ATOM_PASS) DAP_DELETE(l_atom_copy); } else { + dap_db_set_last_hash_remote(l_ch_chain->request.node_addr.uint64, l_chain, &l_atom_hash); DAP_DELETE(l_atom_copy); } l_chain->callback_atom_iter_delete(l_atom_iter); @@ -356,7 +376,7 @@ bool s_gdb_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) if(!l_apply) { // If request was from defined node_addr we update its state if(l_ch_chain->request.node_addr.uint64) { - dap_db_log_set_last_id_remote(l_ch_chain->request.node_addr.uint64, l_obj->id); + dap_db_set_last_id_remote(l_ch_chain->request.node_addr.uint64, l_obj->id); } continue; } @@ -368,7 +388,7 @@ bool s_gdb_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) (char ) l_store_obj[i].type, l_store_obj[i].type, l_store_obj[i].group, l_store_obj[i].key, l_ts_str, l_store_obj[i].value_len);*/ // apply received transaction - dap_chain_t *l_chain = dap_chain_find_by_id(l_ch_chain->request_net_id, l_ch_chain->request_chain_id); + dap_chain_t *l_chain = dap_chain_find_by_id(l_pkt_copy->pkt_hdr.net_id, l_pkt_copy->pkt_hdr.chain_id); if(l_chain) { if(l_chain->callback_add_datums_with_group){ void * restrict l_store_obj_value = l_store_obj->value; @@ -379,14 +399,14 @@ bool s_gdb_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) } // save data to global_db if(!dap_chain_global_db_obj_save(l_obj, 1)) { - dap_stream_ch_chain_pkt_write_error(l_ch, l_ch_chain->request_net_id, - l_ch_chain->request_chain_id, l_ch_chain->request_cell_id, + dap_stream_ch_chain_pkt_write_error(l_ch, l_pkt_copy->pkt_hdr.net_id, + l_pkt_copy->pkt_hdr.chain_id, l_pkt_copy->pkt_hdr.cell_id, "ERROR_GLOBAL_DB_INTERNAL_NOT_SAVED"); dap_stream_ch_set_ready_to_write_unsafe(l_ch, true); } else { // If request was from defined node_addr we update its state if(l_ch_chain->request.node_addr.uint64) { - dap_db_log_set_last_id_remote(l_ch_chain->request.node_addr.uint64, l_obj->id); + dap_db_set_last_id_remote(l_ch_chain->request.node_addr.uint64, l_obj->id); } //log_it(L_DEBUG, "Added new GLOBAL_DB history pack"); } @@ -482,10 +502,8 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) dap_stream_ch_chain_sync_request_t * l_request = (dap_stream_ch_chain_sync_request_t *) l_chain_pkt->data; memcpy(&l_ch_chain->request, l_request, l_chain_pkt_data_size); - memcpy(&l_ch_chain->request_cell_id, &l_chain_pkt->hdr.cell_id, sizeof(dap_chain_cell_id_t)); - memcpy(&l_ch_chain->request_net_id, &l_chain_pkt->hdr.net_id, sizeof(dap_chain_net_id_t)); - memcpy(&l_ch_chain->request_chain_id, &l_chain_pkt->hdr.chain_id, sizeof(dap_chain_id_t)); - } + memcpy(&l_ch_chain->request_hdr, &l_chain_pkt->hdr, sizeof(l_chain_pkt->hdr)); + } dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker); dap_proc_queue_add_callback(a_ch->stream_worker->worker, s_sync_chains_callback, a_ch); } @@ -507,9 +525,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) dap_stream_ch_chain_sync_request_t * l_request = (dap_stream_ch_chain_sync_request_t *) l_chain_pkt->data; memcpy(&l_ch_chain->request, l_request, l_chain_pkt_data_size); - memcpy(&l_ch_chain->request_cell_id, &l_chain_pkt->hdr.cell_id, sizeof(dap_chain_cell_id_t)); - memcpy(&l_ch_chain->request_net_id, &l_chain_pkt->hdr.net_id, sizeof(dap_chain_net_id_t)); - memcpy(&l_ch_chain->request_chain_id, &l_chain_pkt->hdr.chain_id, sizeof(dap_chain_id_t)); + memcpy(&l_ch_chain->request_hdr, &l_chain_pkt->hdr, sizeof(l_chain_pkt->hdr)); dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker); dap_proc_queue_add_callback(a_ch->stream_worker->worker, s_sync_gdb_callback, a_ch); } @@ -536,10 +552,8 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) if(l_chain) { // Expect atom element in if(l_chain_pkt_data_size > 0) { - memcpy(&l_ch_chain->request_net_id, &l_chain_pkt->hdr.net_id, sizeof(dap_chain_net_id_t)); - memcpy(&l_ch_chain->request_chain_id, &l_chain_pkt->hdr.chain_id, sizeof(dap_chain_id_t)); - memcpy(&l_ch_chain->request_cell_id, &l_chain_pkt->hdr.cell_id, sizeof(dap_chain_cell_id_t)); dap_chain_pkt_copy_t *l_pkt_copy = DAP_NEW_Z(dap_chain_pkt_copy_t); + memcpy(&l_pkt_copy->pkt_hdr, &l_chain_pkt->hdr, sizeof(l_chain_pkt->hdr)); l_pkt_copy->pkt_data = DAP_NEW_Z_SIZE(byte_t, l_chain_pkt_data_size); memcpy(l_pkt_copy->pkt_data, l_chain_pkt->data, l_chain_pkt_data_size); l_pkt_copy->pkt_data_size = l_chain_pkt_data_size; @@ -567,10 +581,8 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) log_it(L_INFO, "In: GLOBAL_DB data_size=%d", l_chain_pkt_data_size); // get transaction and save it to global_db if(l_chain_pkt_data_size > 0) { - memcpy(&l_ch_chain->request_net_id, &l_chain_pkt->hdr.net_id, sizeof(dap_chain_net_id_t)); - memcpy(&l_ch_chain->request_chain_id, &l_chain_pkt->hdr.chain_id, sizeof(dap_chain_id_t)); - memcpy(&l_ch_chain->request_cell_id, &l_chain_pkt->hdr.cell_id, sizeof(dap_chain_cell_id_t)); dap_chain_pkt_copy_t *l_pkt_copy = DAP_NEW_Z(dap_chain_pkt_copy_t); + memcpy(&l_pkt_copy->pkt_hdr, &l_chain_pkt->hdr, sizeof(l_chain_pkt->hdr)); l_pkt_copy->pkt_data = DAP_NEW_Z_SIZE(byte_t, l_chain_pkt_data_size); memcpy(l_pkt_copy->pkt_data, l_chain_pkt->data, l_chain_pkt_data_size); l_pkt_copy->pkt_data_size = l_chain_pkt_data_size; @@ -589,18 +601,22 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB_RVRS: { dap_stream_ch_chain_sync_request_t l_sync_gdb = {}; memcpy(&l_sync_gdb, l_chain_pkt->data, l_chain_pkt_data_size); + l_sync_gdb.id_start = dap_db_get_last_id_remote(l_sync_gdb.node_addr.uint64); dap_chain_net_t *l_net = dap_chain_net_by_id(l_chain_pkt->hdr.net_id); l_sync_gdb.node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net); - // Get last timestamp in log - l_sync_gdb.id_start = (uint64_t) dap_db_log_get_last_id_remote(l_ch_chain->request.node_addr.uint64); - // no limit - l_sync_gdb.id_end = (uint64_t)0; dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB, l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, &l_sync_gdb, sizeof(l_sync_gdb)); } break; case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS_RVRS: { dap_stream_ch_chain_sync_request_t l_sync_chains = {}; + memcpy(&l_sync_chains, l_chain_pkt->data, l_chain_pkt_data_size); + dap_chain_t *l_chain = l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id); + dap_chain_hash_fast_t *l_hash = dap_db_get_last_hash_remote(l_sync_chains.node_addr.uint64, l_chain); + if (l_hash) { + memcpy(&l_sync_chains.hash_from, l_hash, sizeof(*l_hash)); + DAP_DELETE(l_hash); + } dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS, l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, &l_sync_chains, sizeof(l_sync_chains)); } @@ -630,10 +646,7 @@ void dap_stream_ch_chain_go_idle ( dap_stream_ch_chain_t * a_ch_chain) // Cleanup after request memset(&a_ch_chain->request, 0, sizeof(a_ch_chain->request)); - memset(&a_ch_chain->request_net_id, 0, sizeof(a_ch_chain->request_net_id)); - memset(&a_ch_chain->request_cell_id, 0, sizeof(a_ch_chain->request_cell_id)); - memset(&a_ch_chain->request_chain_id, 0, sizeof(a_ch_chain->request_chain_id)); - memset(&a_ch_chain->request_last_ts, 0, sizeof(a_ch_chain->request_last_ts)); + memset(&a_ch_chain->request_hdr, 0, sizeof(a_ch_chain->request_hdr)); DAP_DEL_Z(a_ch_chain->request_atom_iter); } @@ -646,8 +659,8 @@ bool s_process_gdb_iter(dap_stream_ch_t *a_ch) log_it(L_INFO, "Send one global_db record packet len=%d (rest=%d/%d items)", l_pkt_size, dap_db_log_list_get_count_rest(l_db_list), dap_db_log_list_get_count(l_db_list)); dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB, - l_ch_chain->request_net_id, l_ch_chain->request_chain_id, - l_ch_chain->request_cell_id, l_pkt, l_pkt_size); + l_ch_chain->request_hdr.net_id, l_ch_chain->request_hdr.chain_id, + l_ch_chain->request_hdr.cell_id, l_pkt, l_pkt_size); dap_list_t *l_iter = dap_list_next(l_ch_chain->db_iter); if (l_iter) { l_ch_chain->db_iter = l_iter; @@ -696,19 +709,13 @@ bool s_out_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) // free log list dap_db_log_list_delete(l_ch_chain->request_global_db_trs); l_ch_chain->request_global_db_trs = NULL; + log_it( L_DEBUG,"Syncronized database: last id %llu, items syncronyzed %llu ", dap_db_log_get_last_id(), + l_ch_chain->stats_request_gdb_processed ); // last message dap_stream_ch_chain_sync_request_t l_request = {}; - dap_chain_net_t *l_net = dap_chain_net_by_id(l_ch_chain->request_net_id); - l_request.node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net); - l_request.id_start = dap_db_log_get_last_id_remote(l_ch_chain->request.node_addr.uint64); - l_request.id_end = 0; - - log_it( L_DEBUG,"Syncronized database: last id %llu, items syncronyzed %llu ", l_request.id_start, - l_ch_chain->stats_request_gdb_processed ); - dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, - l_ch_chain->request_net_id, l_ch_chain->request_chain_id, - l_ch_chain->request_cell_id, &l_request, sizeof(l_request)); + l_ch_chain->request_hdr.net_id, l_ch_chain->request_hdr.chain_id, + l_ch_chain->request_hdr.cell_id, &l_request, sizeof(l_request)); l_packet_out = true; if(l_ch_chain->callback_notify_packet_out) @@ -726,8 +733,8 @@ bool s_out_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) dap_stream_ch_chain_sync_request_t l_request = {}; // last message dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS, - l_ch_chain->request_net_id, l_ch_chain->request_chain_id, - l_ch_chain->request_cell_id, &l_request, sizeof(l_request)); + l_ch_chain->request_hdr.net_id, l_ch_chain->request_hdr.chain_id, + l_ch_chain->request_hdr.cell_id, &l_request, sizeof(l_request)); l_packet_out = true; log_it( L_DEBUG,"Synced: %llu atoms processed", l_ch_chain->stats_request_atoms_processed); dap_stream_ch_chain_go_idle(l_ch_chain); @@ -736,8 +743,8 @@ bool s_out_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) 0, l_ch_chain->callback_notify_arg); } else { // Process one chain from l_ch_chain->request_atom_iter log_it(L_INFO, "Send one CHAIN packet len=%d", l_ch_chain->request_atom_iter->cur_size); - dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN, l_ch_chain->request_net_id, - l_ch_chain->request_chain_id, l_ch_chain->request_cell_id, + dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN, l_ch_chain->request_hdr.net_id, + l_ch_chain->request_hdr.chain_id, l_ch_chain->request_hdr.cell_id, l_ch_chain->request_atom_iter->cur, l_ch_chain->request_atom_iter->cur_size); l_packet_out = true; l_ch_chain->stats_request_atoms_processed++; diff --git a/modules/channel/chain/include/dap_stream_ch_chain.h b/modules/channel/chain/include/dap_stream_ch_chain.h index b73c509b3e..5adb61246d 100644 --- a/modules/channel/chain/include/dap_stream_ch_chain.h +++ b/modules/channel/chain/include/dap_stream_ch_chain.h @@ -46,6 +46,7 @@ typedef struct dap_chain_atom_item{ } dap_chain_atom_item_t; typedef struct dap_chain_pkt_copy { + dap_stream_ch_chain_pkt_hdr_t pkt_hdr; uint64_t pkt_data_size; byte_t *pkt_data; } dap_chain_pkt_copy_t; @@ -57,15 +58,12 @@ typedef struct dap_stream_ch_chain { dap_list_t *db_iter; dap_stream_ch_chain_state_t state; - dap_chain_atom_iter_t * request_atom_iter; + dap_chain_atom_iter_t *request_atom_iter; dap_list_t *pkt_copy_list; uint64_t stats_request_atoms_processed; uint64_t stats_request_gdb_processed; dap_stream_ch_chain_sync_request_t request; - dap_chain_net_id_t request_net_id; - dap_chain_id_t request_chain_id; - dap_chain_cell_id_t request_cell_id; - time_t request_last_ts; + dap_stream_ch_chain_pkt_hdr_t request_hdr; dap_stream_ch_chain_callback_packet_t callback_notify_packet_out; dap_stream_ch_chain_callback_packet_t callback_notify_packet_in; diff --git a/modules/channel/chain/include/dap_stream_ch_chain_pkt.h b/modules/channel/chain/include/dap_stream_ch_chain_pkt.h index a56e07c490..ae718a35bf 100644 --- a/modules/channel/chain/include/dap_stream_ch_chain_pkt.h +++ b/modules/channel/chain/include/dap_stream_ch_chain_pkt.h @@ -63,9 +63,9 @@ typedef enum dap_stream_ch_chain_state{ typedef struct dap_stream_ch_chain_sync_request{ dap_chain_node_addr_t node_addr; // Requesting node's address dap_chain_hash_fast_t hash_from; - dap_chain_hash_fast_t hash_to; + dap_chain_hash_fast_t hash_to; // unused uint64_t id_start; - uint64_t id_end; + uint64_t id_end; // unused } DAP_ALIGN_PACKED dap_stream_ch_chain_sync_request_t; diff --git a/modules/global-db/dap_chain_global_db_remote.c b/modules/global-db/dap_chain_global_db_remote.c index d8beff46ee..ed60de95fa 100644 --- a/modules/global-db/dap_chain_global_db_remote.c +++ b/modules/global-db/dap_chain_global_db_remote.c @@ -5,7 +5,7 @@ #include <dap_common.h> #include <dap_strfuncs.h> #include <dap_string.h> -//#include "dap_chain_node.h" +#include "dap_chain.h" #include "dap_chain_global_db.h" #include "dap_chain_global_db_remote.h" @@ -103,22 +103,20 @@ uint64_t dap_db_get_cur_node_addr(char *a_net_name) /** * Set last id for remote node */ -bool dap_db_log_set_last_id_remote(uint64_t a_node_addr, uint64_t a_id) +bool dap_db_set_last_id_remote(uint64_t a_node_addr, uint64_t a_id) { - dap_global_db_obj_t l_objs; - l_objs.key = dap_strdup_printf("%ju", a_node_addr); - l_objs.value = (uint8_t*) &a_id; - l_objs.value_len = sizeof(uint64_t); - bool l_ret = dap_chain_global_db_gr_save(&l_objs, 1, GROUP_LOCAL_NODE_LAST_ID); - DAP_DELETE(l_objs.key); - //log_it( L_DEBUG, "Node 0x%016X set last synced timestamp %llu",a_id); - return l_ret; + //log_it( L_DEBUG, "Node 0x%016X set last synced timestamp %llu", a_node_addr, a_id); + uint64_t *l_id = DAP_NEW(uint64_t); + *l_id = a_id; + return dap_chain_global_db_gr_set(dap_strdup_printf("%ju", a_node_addr), + l_id, sizeof(uint64_t), + GROUP_LOCAL_NODE_LAST_ID); } /** * Get last id for remote node */ -uint64_t dap_db_log_get_last_id_remote(uint64_t a_node_addr) +uint64_t dap_db_get_last_id_remote(uint64_t a_node_addr) { char *l_node_addr_str = dap_strdup_printf("%ju", a_node_addr); size_t l_timestamp_len = 0; @@ -131,3 +129,28 @@ uint64_t dap_db_log_get_last_id_remote(uint64_t a_node_addr) DAP_DELETE(l_timestamp); return l_ret_timestamp; } + +/** + * Set last hash for chain for remote node + */ +bool dap_db_set_last_hash_remote(uint64_t a_node_addr, dap_chain_t *a_chain, dap_chain_hash_fast_t *a_hash) +{ + //log_it( L_DEBUG, "Node 0x%016X set last synced timestamp %llu", a_id); + dap_chain_hash_fast_t *l_hash = DAP_NEW(dap_chain_hash_fast_t); + memcpy(l_hash, a_hash, sizeof(*a_hash)); + return dap_chain_global_db_gr_set(dap_strdup_printf("%ju%s%s", a_node_addr, a_chain->net_name, a_chain->name), + l_hash, sizeof(*l_hash), GROUP_LOCAL_NODE_LAST_ID); +} + +/** + * Get last hash for chain for remote node + */ +dap_chain_hash_fast_t *dap_db_get_last_hash_remote(uint64_t a_node_addr, dap_chain_t *a_chain) +{ + char *l_node_chain_str = dap_strdup_printf("%ju%s%s", a_node_addr, a_chain->net_name, a_chain->name); + size_t l_hash_len = 0; + uint8_t *l_hash = dap_chain_global_db_gr_get((const char*)l_node_chain_str, &l_hash_len, + GROUP_LOCAL_NODE_LAST_ID); + DAP_DELETE(l_node_chain_str); + return (dap_chain_hash_fast_t *)l_hash; +} diff --git a/modules/global-db/include/dap_chain_global_db_remote.h b/modules/global-db/include/dap_chain_global_db_remote.h index e144802446..7a58f3b1e4 100644 --- a/modules/global-db/include/dap_chain_global_db_remote.h +++ b/modules/global-db/include/dap_chain_global_db_remote.h @@ -2,7 +2,7 @@ #include <stdbool.h> #include <time.h> - +#include "dap_chain.h" #include "dap_chain_common.h" // Set addr for current node bool dap_db_set_cur_node_addr(uint64_t a_address, char *a_net_name); @@ -10,6 +10,10 @@ bool dap_db_set_cur_node_addr_exp(uint64_t a_address, char *a_net_name ); uint64_t dap_db_get_cur_node_addr(char *a_net_name); // Set last id for remote node -bool dap_db_log_set_last_id_remote(uint64_t a_node_addr, uint64_t a_id); +bool dap_db_set_last_id_remote(uint64_t a_node_addr, uint64_t a_id); // Get last id for remote node -uint64_t dap_db_log_get_last_id_remote(uint64_t a_node_addr); +uint64_t dap_db_get_last_id_remote(uint64_t a_node_addr); +// Set last hash for chain for remote node +bool dap_db_set_last_hash_remote(uint64_t a_node_addr, dap_chain_t *a_chain, dap_chain_hash_fast_t *a_hash); +// Get last hash for chain for remote node +dap_chain_hash_fast_t *dap_db_get_last_hash_remote(uint64_t a_node_addr, dap_chain_t *a_chain); diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index a565bb7489..29e2e3a087 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -527,10 +527,7 @@ static int s_net_states_proc(dap_chain_net_t *a_net) dap_stream_worker_t *l_worker = dap_client_get_stream_worker(l_node_client->client); dap_stream_ch_chain_sync_request_t l_sync_gdb = {}; // Get last timestamp in log - l_sync_gdb.id_start = (uint64_t) dap_db_log_get_last_id_remote(l_node_client->remote_node_addr.uint64); - // no limit - l_sync_gdb.id_end = (uint64_t)0; - + l_sync_gdb.id_start = (uint64_t) dap_db_get_last_id_remote(l_node_client->remote_node_addr.uint64); l_sync_gdb.node_addr.uint64 = dap_chain_net_get_cur_addr_int(a_net); log_it(L_DEBUG, "Prepared request to gdb sync from %llu to %llu", l_sync_gdb.id_start, l_sync_gdb.id_end); // find dap_chain_id_t @@ -550,7 +547,7 @@ static int s_net_states_proc(dap_chain_net_t *a_net) l_res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms); switch (l_res) { case -1: - log_it(L_WARNING,"Timeout with link sync"); + log_it(L_WARNING, "Timeout with link sync"); break; case 0: log_it(L_INFO, "Node sync completed"); @@ -601,8 +598,12 @@ static int s_net_states_proc(dap_chain_net_t *a_net) int l_res = 0; DL_FOREACH (a_net->pub.chains, l_chain) { dap_chain_node_client_reset(l_node_client); - dap_stream_ch_chain_sync_request_t l_request ; - memset(&l_request, 0, sizeof (l_request)); + dap_stream_ch_chain_sync_request_t l_request = {}; + dap_chain_hash_fast_t *l_hash = dap_db_get_last_hash_remote(l_node_client->remote_node_addr.uint64, l_chain); + if (l_hash) { + memcpy(&l_request.hash_from, l_hash, sizeof(*l_hash)); + DAP_DELETE(l_hash); + } dap_stream_ch_chain_pkt_write_mt(l_worker, l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS, a_net->pub.id, l_chain->id, a_net->pub.cell_id, &l_request, sizeof(l_request)); // wait for finishing of request @@ -611,7 +612,7 @@ static int s_net_states_proc(dap_chain_net_t *a_net) l_res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms); switch (l_res) { case -1: - log_it(L_WARNING,"Timeout with sync of chain '%s' ", l_chain->name); + log_it(L_WARNING, "Timeout with sync of chain '%s' ", l_chain->name); break; case 0: l_need_flush = true; @@ -621,12 +622,13 @@ static int s_net_states_proc(dap_chain_net_t *a_net) log_it(L_ERROR, "Sync of chain '%s' error %d", l_chain->name,l_res); } dap_chain_node_client_reset(l_node_client); + l_request.node_addr.uint64 = dap_chain_net_get_cur_addr_int(a_net); dap_stream_ch_chain_pkt_write_mt(l_worker, l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS_RVRS, a_net->pub.id, l_chain->id, a_net->pub.cell_id, &l_request, sizeof(l_request)); l_res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms); switch (l_res) { case -1: - log_it(L_WARNING,"Timeout with reverse sync of chain '%s' ", l_chain->name); + log_it(L_WARNING, "Timeout with reverse sync of chain '%s' ", l_chain->name); break; case 0: l_need_flush = true; @@ -914,29 +916,24 @@ void dap_chain_net_load_all() void s_set_reply_text_node_status(char **a_str_reply, dap_chain_net_t * a_net){ char* l_node_address_text_block = NULL; dap_chain_node_addr_t l_cur_node_addr = { 0 }; - l_cur_node_addr.uint64 = dap_chain_net_get_cur_addr(a_net) ? dap_chain_net_get_cur_addr(a_net)->uint64 : dap_db_get_cur_node_addr(a_net->pub.name); + l_cur_node_addr.uint64 = dap_chain_net_get_cur_addr_int(a_net); if(!l_cur_node_addr.uint64) l_node_address_text_block = dap_strdup_printf(", cur node address not defined"); else l_node_address_text_block = dap_strdup_printf(", cur node address " NODE_ADDR_FP_STR,NODE_ADDR_FP_ARGS_S(l_cur_node_addr)); char* l_sync_current_link_text_block = NULL; - if(PVT(a_net)->state == NET_STATE_LINKS_PREPARE || - PVT(a_net)->state == NET_STATE_LINKS_CONNECTING || - PVT(a_net)->state == NET_STATE_SYNC_GDB || - PVT(a_net)->state == NET_STATE_SYNC_CHAINS) - l_sync_current_link_text_block = dap_strdup_printf(", processing link %u from %u", + if (PVT(a_net)->state != NET_STATE_OFFLINE) + l_sync_current_link_text_block = dap_strdup_printf(", active links %u from %u", dap_list_length(PVT(a_net)->links), dap_list_length(PVT(a_net)->links_info)); - dap_chain_node_cli_set_reply_text(a_str_reply, "Network \"%s\" has state %s (target state %s)%s%s", a_net->pub.name, c_net_states[PVT(a_net)->state], - c_net_states[PVT(a_net)->state_target], - (l_sync_current_link_text_block)? l_sync_current_link_text_block: "", - l_node_address_text_block - ); - + c_net_states[PVT(a_net)->state_target], + (l_sync_current_link_text_block)? l_sync_current_link_text_block: "", + l_node_address_text_block + ); DAP_DELETE(l_sync_current_link_text_block); DAP_DELETE(l_node_address_text_block); } @@ -2005,44 +2002,6 @@ dap_list_t* dap_chain_net_get_node_list(dap_chain_net_t * l_net) } dap_chain_global_db_objs_delete(l_objs, l_nodes_count); return l_node_list; - - // get remote node list - /*dap_chain_node_info_t *l_node_info = dap_chain_node_info_read(l_net, l_node_address); - if(!l_node_info) - continue; - // start connect - //debug inet_pton( AF_INET, "192.168.100.93", &l_node_info->hdr.ext_addr_v4); - dap_chain_node_client_t *l_node_client = dap_chain_node_client_connect(l_node_info); - //dap_chain_node_client_t *l_node_client = dap_chain_client_connect(l_node_info, l_stage_target, l_active_channels); - if(!l_node_client) { - DAP_DELETE(l_node_info); - continue; - } - // wait connected - int timeout_ms = 5000; //5 sec = 5000 ms - int res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_CONNECTED, timeout_ms); - if(res) { - // clean client struct - dap_chain_node_client_close(l_node_client); - DAP_DELETE(l_node_info); - continue; - } - res = dap_chain_node_client_send_nodelist_req(l_node_client); - if(res) { - // clean client struct - dap_chain_node_client_close(l_node_client); - DAP_DELETE(l_node_info); - continue; - } - res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_NODELIST_GOT, timeout_ms); - if(res) { - // clean client struct - dap_chain_node_client_close(l_node_client); - DAP_DELETE(l_node_info); - continue; - } - DAP_DELETE(l_node_info); - */ } /** diff --git a/modules/net/dap_chain_node_cli_cmd.c b/modules/net/dap_chain_node_cli_cmd.c index 17fc21b741..808303bc42 100644 --- a/modules/net/dap_chain_node_cli_cmd.c +++ b/modules/net/dap_chain_node_cli_cmd.c @@ -1046,10 +1046,7 @@ int com_node(int a_argc, char ** a_argv, void *arg_func, char **a_str_reply) dap_stream_ch_chain_sync_request_t l_sync_request = { { 0 } }; dap_stream_ch_t * l_ch_chain = dap_client_get_stream_ch_unsafe(l_node_client->client, dap_stream_ch_chain_get_id()); // fill begin id - l_sync_request.id_start = (uint64_t) dap_db_log_get_last_id_remote( - l_remote_node_info->hdr.address.uint64); - // fill end id = 0 - no time limit - //l_sync_request.ts_end = 0; + l_sync_request.id_start = dap_db_get_last_id_remote(l_remote_node_info->hdr.address.uint64); // fill current node address l_sync_request.node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net); @@ -1162,9 +1159,14 @@ int com_node(int a_argc, char ** a_argv, void *arg_func, char **a_str_reply) DL_FOREACH(l_net->pub.chains, l_chain) { // reset state NODE_CLIENT_STATE_SYNCED - l_node_client->state = NODE_CLIENT_STATE_CONNECTED; + dap_chain_node_client_reset(l_node_client); // send request - dap_stream_ch_chain_sync_request_t l_sync_request = { { 0 } }; + dap_stream_ch_chain_sync_request_t l_sync_request = {}; + dap_chain_hash_fast_t *l_hash = dap_db_get_last_hash_remote(l_node_client->remote_node_addr.uint64, l_chain); + if (l_hash) { + memcpy(&l_sync_request.hash_from, l_hash, sizeof(*l_hash)); + DAP_DELETE(l_hash); + } if(0 == dap_stream_ch_chain_pkt_write_unsafe(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS, l_net->pub.id, l_chain->id, l_remote_node_info->hdr.cell_id, &l_sync_request, sizeof(l_sync_request))) { -- GitLab