Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • cellframe/libdap-stream-ch-chain
1 result
Show changes
Commits on Source (3)
......@@ -245,9 +245,14 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
log_it(L_DEBUG, "Requested transactions %llu:%llu", l_request->id_start,
(uint64_t ) l_ch_chain->request_last_ts);
//dap_list_t *l_list = dap_db_log_get_list(l_request->id_start + 1);
dap_db_log_list_t *l_db_log = dap_db_log_list_start(l_request->id_start + 1);
log_it(L_DEBUG, "Start getting items %u:%u", l_request->id_start + 1,l_db_log->items_number);//dap_list_length(l_list));
uint64_t l_start_item = l_request->id_start;
// If the current global_db has been truncated, but the remote node has not known this
if(l_request->id_start > l_ch_chain->request_last_ts) {
l_start_item = 0;
}
dap_db_log_list_t *l_db_log = dap_db_log_list_start(l_start_item + 1);
if(l_db_log) {
log_it(L_DEBUG, "Start getting items %u:%u", l_request->id_start + 1,l_db_log->items_number);//dap_list_length(l_list));
// Add it to outgoing list
l_ch_chain->request_global_db_trs = l_db_log;//l_list;
//dap_list_t *l_last = dap_list_last(l_list);
......@@ -264,6 +269,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
l_ch_chain->request_cell_id, &l_node_addr, sizeof(dap_chain_node_addr_t));
} else {
log_it(L_DEBUG, "No items to sync from %u", l_request->id_start + 1);
dap_stream_ch_chain_sync_request_t l_request = { { 0 } };
l_request.id_start = dap_db_log_get_last_id_remote(l_ch_chain->request.node_addr.uint64);
dap_stream_ch_chain_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB,
......@@ -277,7 +283,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB,
NULL, 0, l_ch_chain->callback_notify_arg);
}
log_it(L_INFO, "Prepared %u items for sync", l_db_log->items_number - l_request->id_start);//dap_list_length(l_ch_chain->request_global_db_trs));
//log_it(L_INFO, "Prepared %u items for sync", l_db_log->items_number - l_request->id_start);//dap_list_length(l_ch_chain->request_global_db_trs));
// go to send data from list [in s_stream_ch_packet_out()]
// no data to send -> send one empty message DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB_SYNCED
dap_stream_ch_set_ready_to_write(a_ch, true);
......@@ -362,16 +368,28 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
// log_it(L_INFO, "In: l_data_obj_count = %d", l_data_obj_count );
for(size_t i = 0; i < l_data_obj_count; i++) {
// timestamp for exist obj
time_t l_timestamp_cur = 0;
// obj to add
dap_store_obj_t* l_obj = l_store_obj + i;
// read item from base;
size_t l_count_read = 0;
dap_store_obj_t *l_read_obj = dap_chain_global_db_driver_read(l_obj->group,
l_obj->key, &l_count_read);
// get timestamp for the exist entry
if(l_read_obj)
l_timestamp_cur = l_read_obj->timestamp;
// get timestamp for the deleted entry
else
{
l_timestamp_cur = global_db_gr_del_get_timestamp(l_obj->group, l_obj->key);
}
//check whether to apply the received data into the database
bool l_apply = true;
if(l_obj->type == 'd') {
if(l_obj->timestamp < l_timestamp_cur)
l_apply = false;
else if(l_obj->type == 'd') {
// already deleted
if(!l_read_obj)
l_apply = false;
......@@ -380,10 +398,10 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
bool l_is_the_same_present = false;
if(l_read_obj &&
l_read_obj->value_len == l_obj->value_len &&
!memcmp(l_read_obj->value, l_read_obj->value, l_obj->value_len))
!memcmp(l_read_obj->value, l_obj->value, l_obj->value_len))
l_is_the_same_present = true;
// this data already present in global_db
if(l_read_obj && (l_is_the_same_present || l_read_obj->timestamp < l_store_obj->timestamp))
// this data already present in global_db and not obsolete (out of date)
if(l_read_obj && (l_is_the_same_present || l_read_obj->timestamp >= l_store_obj->timestamp))
l_apply = false;
}
if(l_read_obj)
......@@ -444,7 +462,6 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
}
}
}*/
// save data to global_db
if(!dap_chain_global_db_obj_save(l_obj, 1)) {
dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id,
......