diff --git a/dap_stream_ch_chain.c b/dap_stream_ch_chain.c index 7bda747debe18cfb989e6c06a4283502c54c4ec0..ef7bcc9df9e9b68cd43475579c7e48c0129b1e55 100644 --- a/dap_stream_ch_chain.c +++ b/dap_stream_ch_chain.c @@ -245,9 +245,14 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) log_it(L_DEBUG, "Requested transactions %llu:%llu", l_request->id_start, (uint64_t ) l_ch_chain->request_last_ts); //dap_list_t *l_list = dap_db_log_get_list(l_request->id_start + 1); - dap_db_log_list_t *l_db_log = dap_db_log_list_start(l_request->id_start + 1); - log_it(L_DEBUG, "Start getting items %u:%u", l_request->id_start + 1,l_db_log->items_number);//dap_list_length(l_list)); + uint64_t l_start_item = l_request->id_start; + // If the current global_db has been truncated, but the remote node has not known this + if(l_request->id_start > l_ch_chain->request_last_ts) { + l_start_item = 0; + } + dap_db_log_list_t *l_db_log = dap_db_log_list_start(l_start_item + 1); if(l_db_log) { + log_it(L_DEBUG, "Start getting items %u:%u", l_request->id_start + 1,l_db_log->items_number);//dap_list_length(l_list)); // Add it to outgoing list l_ch_chain->request_global_db_trs = l_db_log;//l_list; //dap_list_t *l_last = dap_list_last(l_list); @@ -264,6 +269,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) l_ch_chain->request_cell_id, &l_node_addr, sizeof(dap_chain_node_addr_t)); } else { + log_it(L_DEBUG, "No items to sync from %u", l_request->id_start + 1); dap_stream_ch_chain_sync_request_t l_request = { { 0 } }; l_request.id_start = dap_db_log_get_last_id_remote(l_ch_chain->request.node_addr.uint64); dap_stream_ch_chain_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, @@ -277,7 +283,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, NULL, 0, l_ch_chain->callback_notify_arg); } - log_it(L_INFO, "Prepared %u items for sync", l_db_log->items_number - l_request->id_start);//dap_list_length(l_ch_chain->request_global_db_trs)); + //log_it(L_INFO, "Prepared %u items for sync", l_db_log->items_number - l_request->id_start);//dap_list_length(l_ch_chain->request_global_db_trs)); // go to send data from list [in s_stream_ch_packet_out()] // no data to send -> send one empty message DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB_SYNCED dap_stream_ch_set_ready_to_write(a_ch, true); @@ -362,16 +368,28 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) // log_it(L_INFO, "In: l_data_obj_count = %d", l_data_obj_count ); for(size_t i = 0; i < l_data_obj_count; i++) { - + // timestamp for exist obj + time_t l_timestamp_cur = 0; + // obj to add dap_store_obj_t* l_obj = l_store_obj + i; // read item from base; size_t l_count_read = 0; dap_store_obj_t *l_read_obj = dap_chain_global_db_driver_read(l_obj->group, l_obj->key, &l_count_read); + // get timestamp for the exist entry + if(l_read_obj) + l_timestamp_cur = l_read_obj->timestamp; + // get timestamp for the deleted entry + else + { + l_timestamp_cur = global_db_gr_del_get_timestamp(l_obj->group, l_obj->key); + } //check whether to apply the received data into the database bool l_apply = true; - if(l_obj->type == 'd') { + if(l_obj->timestamp < l_timestamp_cur) + l_apply = false; + else if(l_obj->type == 'd') { // already deleted if(!l_read_obj) l_apply = false; @@ -380,10 +398,10 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) bool l_is_the_same_present = false; if(l_read_obj && l_read_obj->value_len == l_obj->value_len && - !memcmp(l_read_obj->value, l_read_obj->value, l_obj->value_len)) + !memcmp(l_read_obj->value, l_obj->value, l_obj->value_len)) l_is_the_same_present = true; - // this data already present in global_db - if(l_read_obj && (l_is_the_same_present || l_read_obj->timestamp < l_store_obj->timestamp)) + // this data already present in global_db and not obsolete (out of date) + if(l_read_obj && (l_is_the_same_present || l_read_obj->timestamp >= l_store_obj->timestamp)) l_apply = false; } if(l_read_obj) @@ -444,7 +462,6 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) } } }*/ - // save data to global_db if(!dap_chain_global_db_obj_save(l_obj, 1)) { dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id,