Skip to content
Snippets Groups Projects
Commit 70a44d7f authored by dmitriy.gerasimov's avatar dmitriy.gerasimov
Browse files

Merge branch 'feature-3128' into 'master'

added getting of history  in background

See merge request !11
parents 496ff626 3082124b
No related branches found
No related tags found
1 merge request!11added getting of history in background
......@@ -52,7 +52,6 @@
#include "dap_chain_cell.h"
#include "dap_chain_global_db.h"
#include "dap_chain_global_db_hist.h"
#include "dap_chain_global_db_remote.h"
#include "dap_stream.h"
......@@ -112,8 +111,10 @@ void s_stream_ch_delete(dap_stream_ch_t* a_ch, void* a_arg)
{
(void) a_arg;
if(DAP_STREAM_CH_CHAIN(a_ch)->request_global_db_trs)
dap_list_free_full(DAP_STREAM_CH_CHAIN(a_ch)->request_global_db_trs, (dap_callback_destroyed_t) free);
if(DAP_STREAM_CH_CHAIN(a_ch)->request_global_db_trs) {
dap_db_log_list_delete(DAP_STREAM_CH_CHAIN(a_ch)->request_global_db_trs); //dap_list_free_full(DAP_STREAM_CH_CHAIN(a_ch)->request_global_db_trs, (dap_callback_destroyed_t) free);
DAP_STREAM_CH_CHAIN(a_ch)->request_global_db_trs = NULL;
}
pthread_mutex_destroy(&DAP_STREAM_CH_CHAIN(a_ch)->mutex);
}
......@@ -243,14 +244,16 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
l_ch_chain->request_last_ts = dap_db_log_get_last_id();
log_it(L_DEBUG, "Requested transactions %llu:%llu", l_request->id_start,
(uint64_t ) l_ch_chain->request_last_ts);
dap_list_t *l_list = dap_db_log_get_list(l_request->id_start + 1);
log_it(L_DEBUG, "Got %u items", dap_list_length(l_list));
if(l_list) {
//dap_list_t *l_list = dap_db_log_get_list(l_request->id_start + 1);
dap_db_log_list_t *l_db_log = dap_db_log_list_start(l_request->id_start + 1);
log_it(L_DEBUG, "Start getting items %u:%u", l_request->id_start + 1,l_db_log->items_number);//dap_list_length(l_list));
if(l_db_log) {
// Add it to outgoing list
dap_list_t *l_last = dap_list_last(l_list);
if(l_last)
l_last->next = l_ch_chain->request_global_db_trs;
l_ch_chain->request_global_db_trs = l_list;
l_ch_chain->request_global_db_trs = l_db_log;//l_list;
//dap_list_t *l_last = dap_list_last(l_list);
//if(l_last)
// l_last->next = l_ch_chain->request_global_db_trs;
//l_ch_chain->request_global_db_trs = l_list;
l_ch_chain->state = CHAIN_STATE_SYNC_GLOBAL_DB;
dap_chain_node_addr_t l_node_addr = { 0 };
......@@ -274,7 +277,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB,
NULL, 0, l_ch_chain->callback_notify_arg);
}
log_it(L_INFO, "Prepared %u items for sync", dap_list_length(l_ch_chain->request_global_db_trs));
log_it(L_INFO, "Prepared %u items for sync", l_db_log->items_number - l_request->id_start);//dap_list_length(l_ch_chain->request_global_db_trs));
// go to send data from list [in s_stream_ch_packet_out()]
// no data to send -> send one empty message DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB_SYNCED
dap_stream_ch_set_ready_to_write(a_ch, true);
......@@ -367,11 +370,11 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
l_obj->key, &l_count_read);
//check whether to apply the received data into the database
bool l_no_apply = false;
bool l_apply = true;
if(l_obj->type == 'd') {
// already deleted
if(l_read_obj)
l_no_apply = true;
if(!l_read_obj)
l_apply = false;
}
else if(l_obj->type == 'a') {
bool l_is_the_same_present = false;
......@@ -381,12 +384,12 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
l_is_the_same_present = true;
// this data already present in global_db
if(l_read_obj && (l_is_the_same_present || l_read_obj->timestamp < l_store_obj->timestamp))
l_no_apply = true;
l_apply = false;
}
if(l_read_obj)
dap_store_obj_free(l_read_obj, l_count_read);
if(l_no_apply) {
if(!l_apply) {
// If request was from defined node_addr we update its state
if(l_ch_chain->request.node_addr.uint64) {
dap_db_log_set_last_id_remote(l_ch_chain->request.node_addr.uint64, l_obj->id);
......@@ -525,54 +528,52 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg)
// Get log diff
//size_t l_data_size_out = 0;
dap_list_t *l_list = l_ch_chain->request_global_db_trs; //dap_list_last( l_ch_chain->request_global_db_trs );
//log_it( L_NOTICE,"last of request_global_db_trs l_list = %X", l_list );
//log_it( L_WARNING,"Listing request_global_db_trs: ========" );
/* {
dap_list_t *l_item = l_ch_chain->request_global_db_trs;
if ( l_item ) {
//log_it( L_NOTICE,"list request_global_db_trs %X", l_item );
while( l_item ) {
//log_it( L_DEBUG,"list item %X data = %X", l_item, l_item->data );
l_item = l_item->next;
}
}
else {
//log_it( L_NOTICE,"request_global_db_trs EMPTY %X", l_item );
}
}
*/
bool l_is_stop = true;//l_list ? false : true;
while(l_list) {
dap_db_log_list_t *l_db_list = l_ch_chain->request_global_db_trs; //dap_list_last( l_ch_chain->request_global_db_trs );
dap_global_db_obj_t *l_obj = dap_db_log_list_get(l_db_list);
//dap_list_t *l_list = l_ch_chain->request_global_db_trs; //dap_list_last( l_ch_chain->request_global_db_trs );
bool l_is_stop = true; //l_list ? false : true;
while(l_obj) {
//log_it(L_DEBUG, "l_list = %X", l_list);
size_t len = dap_list_length( l_list );
//size_t len = dap_list_length( l_list );
//log_it( L_DEBUG,"len = dap_list_length(l_list ); = %u", len );
//log_it( L_DEBUG,"l_list->data = %p", l_list->data );
size_t l_items_total = dap_db_log_list_get_count(l_db_list);
size_t l_items_rest = dap_db_log_list_get_count_rest(l_db_list);
size_t l_item_size_out = 0;
uint8_t *l_item = dap_db_log_pack( (dap_global_db_obj_t *) l_list->data, &l_item_size_out );
uint8_t *l_item = dap_db_log_pack(l_obj, &l_item_size_out);
if(!l_item || !l_item_size_out) {
log_it(L_WARNING, "Log pack returned NULL??? data=0x%x (nothing to send) (rest=%d records)", l_list->data, len-1);
log_it(L_WARNING, "Log pack returned NULL??? data=0x%x (nothing to send) (rest=%d records)", l_obj,
l_items_rest);
l_item_size_out = 0;
//dap_stream_ch_set_ready_to_write(a_ch, false);
// go to next item
l_obj = dap_db_log_list_get(l_db_list);
//if(l_obj)
// continue;
// stop global_db sync
//else
// break;
}
else {
log_it(L_INFO, "Send one global_dr record data=0x%x len=%d (rest=%d records)", l_item, l_item_size_out, len-1);
dap_stream_ch_chain_pkt_write( a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB,
log_it(L_INFO, "Send one global_db record data=0x%x len=%d (rest=%d/%d items)", l_item, l_item_size_out,
l_items_rest, l_items_total);
dap_stream_ch_chain_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB,
l_ch_chain->request_net_id, l_ch_chain->request_chain_id,
l_ch_chain->request_cell_id, l_item, l_item_size_out );
l_ch_chain->request_cell_id, l_item, l_item_size_out);
//dap_stream_ch_set_ready_to_write(a_ch, true);
//sleep(1);
DAP_DELETE(l_item);
// sent the record, another will be sent
l_is_stop = false;
break;
}
// remove current item from list and go to next item
dap_chain_global_db_obj_delete((dap_global_db_obj_t *) l_list->data);
/*dap_chain_global_db_obj_delete((dap_global_db_obj_t *) l_list->data);
l_ch_chain->request_global_db_trs = dap_list_delete_link(l_ch_chain->request_global_db_trs, l_list);
// nothing was sent
if(!l_item_size_out) {
......@@ -583,15 +584,17 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg)
// stop global_db sync
else
break;
}
// sent the record, another will be sent
l_is_stop = false;
break;
}*/
}
if(l_is_stop){
log_it(L_DEBUG, "l_list == 0, STOP");
log_it(L_DEBUG, "l_obj == 0, STOP");
if(l_ch_chain->state == CHAIN_STATE_SYNC_GLOBAL_DB) {
// free log list
l_ch_chain->request_global_db_trs = NULL;
dap_db_log_list_delete(l_db_list);
// last message
log_it( L_DEBUG,"dap_db_log_get_last_timestamp_remote");
......
......@@ -28,6 +28,7 @@
#include "dap_chain_common.h"
#include "dap_chain.h"
#include "dap_chain_global_db_hist.h"
#include "dap_list.h"
#include "dap_stream_ch_chain_pkt.h"
#include "uthash.h"
......@@ -47,7 +48,7 @@ typedef struct dap_stream_ch_chain {
pthread_mutex_t mutex;
dap_stream_ch_t * ch;
dap_list_t *request_global_db_trs; // list of transactions
dap_db_log_list_t *request_global_db_trs; // list of transactions
dap_stream_ch_chain_state_t state;
dap_chain_atom_iter_t * request_atom_iter;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment