Skip to content
Snippets Groups Projects
Commit 469154de authored by alexander.lysikov's avatar alexander.lysikov
Browse files

fixed node synchronization

parent 70a44d7f
No related branches found
No related tags found
1 merge request!12Feature 3128 3232
...@@ -246,8 +246,8 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) ...@@ -246,8 +246,8 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
(uint64_t ) l_ch_chain->request_last_ts); (uint64_t ) l_ch_chain->request_last_ts);
//dap_list_t *l_list = dap_db_log_get_list(l_request->id_start + 1); //dap_list_t *l_list = dap_db_log_get_list(l_request->id_start + 1);
dap_db_log_list_t *l_db_log = dap_db_log_list_start(l_request->id_start + 1); dap_db_log_list_t *l_db_log = dap_db_log_list_start(l_request->id_start + 1);
log_it(L_DEBUG, "Start getting items %u:%u", l_request->id_start + 1,l_db_log->items_number);//dap_list_length(l_list));
if(l_db_log) { if(l_db_log) {
log_it(L_DEBUG, "Start getting items %u:%u", l_request->id_start + 1,l_db_log->items_number);//dap_list_length(l_list));
// Add it to outgoing list // Add it to outgoing list
l_ch_chain->request_global_db_trs = l_db_log;//l_list; l_ch_chain->request_global_db_trs = l_db_log;//l_list;
//dap_list_t *l_last = dap_list_last(l_list); //dap_list_t *l_last = dap_list_last(l_list);
...@@ -264,6 +264,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) ...@@ -264,6 +264,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
l_ch_chain->request_cell_id, &l_node_addr, sizeof(dap_chain_node_addr_t)); l_ch_chain->request_cell_id, &l_node_addr, sizeof(dap_chain_node_addr_t));
} else { } else {
log_it(L_DEBUG, "No items to sync from %u", l_request->id_start + 1);
dap_stream_ch_chain_sync_request_t l_request = { { 0 } }; dap_stream_ch_chain_sync_request_t l_request = { { 0 } };
l_request.id_start = dap_db_log_get_last_id_remote(l_ch_chain->request.node_addr.uint64); l_request.id_start = dap_db_log_get_last_id_remote(l_ch_chain->request.node_addr.uint64);
dap_stream_ch_chain_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, dap_stream_ch_chain_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB,
...@@ -277,7 +278,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) ...@@ -277,7 +278,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB,
NULL, 0, l_ch_chain->callback_notify_arg); NULL, 0, l_ch_chain->callback_notify_arg);
} }
log_it(L_INFO, "Prepared %u items for sync", l_db_log->items_number - l_request->id_start);//dap_list_length(l_ch_chain->request_global_db_trs)); //log_it(L_INFO, "Prepared %u items for sync", l_db_log->items_number - l_request->id_start);//dap_list_length(l_ch_chain->request_global_db_trs));
// go to send data from list [in s_stream_ch_packet_out()] // go to send data from list [in s_stream_ch_packet_out()]
// no data to send -> send one empty message DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB_SYNCED // no data to send -> send one empty message DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB_SYNCED
dap_stream_ch_set_ready_to_write(a_ch, true); dap_stream_ch_set_ready_to_write(a_ch, true);
...@@ -362,16 +363,28 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) ...@@ -362,16 +363,28 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
// log_it(L_INFO, "In: l_data_obj_count = %d", l_data_obj_count ); // log_it(L_INFO, "In: l_data_obj_count = %d", l_data_obj_count );
for(size_t i = 0; i < l_data_obj_count; i++) { for(size_t i = 0; i < l_data_obj_count; i++) {
// timestamp for exist obj
time_t l_timestamp_cur = 0;
// obj to add
dap_store_obj_t* l_obj = l_store_obj + i; dap_store_obj_t* l_obj = l_store_obj + i;
// read item from base; // read item from base;
size_t l_count_read = 0; size_t l_count_read = 0;
dap_store_obj_t *l_read_obj = dap_chain_global_db_driver_read(l_obj->group, dap_store_obj_t *l_read_obj = dap_chain_global_db_driver_read(l_obj->group,
l_obj->key, &l_count_read); l_obj->key, &l_count_read);
// get timestamp for the exist entry
if(l_read_obj)
l_timestamp_cur = l_read_obj->timestamp;
// get timestamp for the deleted entry
else
{
l_timestamp_cur = global_db_gr_del_get_timestamp(l_obj->group, l_obj->key);
}
//check whether to apply the received data into the database //check whether to apply the received data into the database
bool l_apply = true; bool l_apply = true;
if(l_obj->type == 'd') { if(l_obj->timestamp < l_timestamp_cur)
l_apply = false;
else if(l_obj->type == 'd') {
// already deleted // already deleted
if(!l_read_obj) if(!l_read_obj)
l_apply = false; l_apply = false;
...@@ -380,10 +393,10 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) ...@@ -380,10 +393,10 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
bool l_is_the_same_present = false; bool l_is_the_same_present = false;
if(l_read_obj && if(l_read_obj &&
l_read_obj->value_len == l_obj->value_len && l_read_obj->value_len == l_obj->value_len &&
!memcmp(l_read_obj->value, l_read_obj->value, l_obj->value_len)) !memcmp(l_read_obj->value, l_obj->value, l_obj->value_len))
l_is_the_same_present = true; l_is_the_same_present = true;
// this data already present in global_db // this data already present in global_db and not obsolete (out of date)
if(l_read_obj && (l_is_the_same_present || l_read_obj->timestamp < l_store_obj->timestamp)) if(l_read_obj && (l_is_the_same_present || l_read_obj->timestamp >= l_store_obj->timestamp))
l_apply = false; l_apply = false;
} }
if(l_read_obj) if(l_read_obj)
...@@ -444,7 +457,6 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) ...@@ -444,7 +457,6 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
} }
} }
}*/ }*/
// save data to global_db // save data to global_db
if(!dap_chain_global_db_obj_save(l_obj, 1)) { if(!dap_chain_global_db_obj_save(l_obj, 1)) {
dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment