diff --git a/dap_stream_ch_chain.c b/dap_stream_ch_chain.c index 229573869c0f200b896c5863c4f5688458886439..7bda747debe18cfb989e6c06a4283502c54c4ec0 100644 --- a/dap_stream_ch_chain.c +++ b/dap_stream_ch_chain.c @@ -52,7 +52,6 @@ #include "dap_chain_cell.h" #include "dap_chain_global_db.h" -#include "dap_chain_global_db_hist.h" #include "dap_chain_global_db_remote.h" #include "dap_stream.h" @@ -112,8 +111,10 @@ void s_stream_ch_delete(dap_stream_ch_t* a_ch, void* a_arg) { (void) a_arg; - if(DAP_STREAM_CH_CHAIN(a_ch)->request_global_db_trs) - dap_list_free_full(DAP_STREAM_CH_CHAIN(a_ch)->request_global_db_trs, (dap_callback_destroyed_t) free); + if(DAP_STREAM_CH_CHAIN(a_ch)->request_global_db_trs) { + dap_db_log_list_delete(DAP_STREAM_CH_CHAIN(a_ch)->request_global_db_trs); //dap_list_free_full(DAP_STREAM_CH_CHAIN(a_ch)->request_global_db_trs, (dap_callback_destroyed_t) free); + DAP_STREAM_CH_CHAIN(a_ch)->request_global_db_trs = NULL; + } pthread_mutex_destroy(&DAP_STREAM_CH_CHAIN(a_ch)->mutex); } @@ -243,14 +244,16 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) l_ch_chain->request_last_ts = dap_db_log_get_last_id(); log_it(L_DEBUG, "Requested transactions %llu:%llu", l_request->id_start, (uint64_t ) l_ch_chain->request_last_ts); - dap_list_t *l_list = dap_db_log_get_list(l_request->id_start + 1); - log_it(L_DEBUG, "Got %u items", dap_list_length(l_list)); - if(l_list) { + //dap_list_t *l_list = dap_db_log_get_list(l_request->id_start + 1); + dap_db_log_list_t *l_db_log = dap_db_log_list_start(l_request->id_start + 1); + log_it(L_DEBUG, "Start getting items %u:%u", l_request->id_start + 1,l_db_log->items_number);//dap_list_length(l_list)); + if(l_db_log) { // Add it to outgoing list - dap_list_t *l_last = dap_list_last(l_list); - if(l_last) - l_last->next = l_ch_chain->request_global_db_trs; - l_ch_chain->request_global_db_trs = l_list; + l_ch_chain->request_global_db_trs = l_db_log;//l_list; + //dap_list_t *l_last = dap_list_last(l_list); + //if(l_last) + // l_last->next = l_ch_chain->request_global_db_trs; + //l_ch_chain->request_global_db_trs = l_list; l_ch_chain->state = CHAIN_STATE_SYNC_GLOBAL_DB; dap_chain_node_addr_t l_node_addr = { 0 }; @@ -274,7 +277,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, NULL, 0, l_ch_chain->callback_notify_arg); } - log_it(L_INFO, "Prepared %u items for sync", dap_list_length(l_ch_chain->request_global_db_trs)); + log_it(L_INFO, "Prepared %u items for sync", l_db_log->items_number - l_request->id_start);//dap_list_length(l_ch_chain->request_global_db_trs)); // go to send data from list [in s_stream_ch_packet_out()] // no data to send -> send one empty message DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB_SYNCED dap_stream_ch_set_ready_to_write(a_ch, true); @@ -367,11 +370,11 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) l_obj->key, &l_count_read); //check whether to apply the received data into the database - bool l_no_apply = false; + bool l_apply = true; if(l_obj->type == 'd') { // already deleted - if(l_read_obj) - l_no_apply = true; + if(!l_read_obj) + l_apply = false; } else if(l_obj->type == 'a') { bool l_is_the_same_present = false; @@ -381,12 +384,12 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) l_is_the_same_present = true; // this data already present in global_db if(l_read_obj && (l_is_the_same_present || l_read_obj->timestamp < l_store_obj->timestamp)) - l_no_apply = true; + l_apply = false; } if(l_read_obj) dap_store_obj_free(l_read_obj, l_count_read); - if(l_no_apply) { + if(!l_apply) { // If request was from defined node_addr we update its state if(l_ch_chain->request.node_addr.uint64) { dap_db_log_set_last_id_remote(l_ch_chain->request.node_addr.uint64, l_obj->id); @@ -525,54 +528,52 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) // Get log diff //size_t l_data_size_out = 0; - dap_list_t *l_list = l_ch_chain->request_global_db_trs; //dap_list_last( l_ch_chain->request_global_db_trs ); - - - //log_it( L_NOTICE,"last of request_global_db_trs l_list = %X", l_list ); - - //log_it( L_WARNING,"Listing request_global_db_trs: ========" ); - /* { - dap_list_t *l_item = l_ch_chain->request_global_db_trs; - if ( l_item ) { - //log_it( L_NOTICE,"list request_global_db_trs %X", l_item ); - - while( l_item ) { - //log_it( L_DEBUG,"list item %X data = %X", l_item, l_item->data ); - l_item = l_item->next; - } - } - else { - - //log_it( L_NOTICE,"request_global_db_trs EMPTY %X", l_item ); - } - } - */ - bool l_is_stop = true;//l_list ? false : true; - while(l_list) { - + dap_db_log_list_t *l_db_list = l_ch_chain->request_global_db_trs; //dap_list_last( l_ch_chain->request_global_db_trs ); + dap_global_db_obj_t *l_obj = dap_db_log_list_get(l_db_list); + //dap_list_t *l_list = l_ch_chain->request_global_db_trs; //dap_list_last( l_ch_chain->request_global_db_trs ); + bool l_is_stop = true; //l_list ? false : true; + while(l_obj) { //log_it(L_DEBUG, "l_list = %X", l_list); - size_t len = dap_list_length( l_list ); + //size_t len = dap_list_length( l_list ); //log_it( L_DEBUG,"len = dap_list_length(l_list ); = %u", len ); //log_it( L_DEBUG,"l_list->data = %p", l_list->data ); + size_t l_items_total = dap_db_log_list_get_count(l_db_list); + size_t l_items_rest = dap_db_log_list_get_count_rest(l_db_list); + size_t l_item_size_out = 0; - uint8_t *l_item = dap_db_log_pack( (dap_global_db_obj_t *) l_list->data, &l_item_size_out ); + uint8_t *l_item = dap_db_log_pack(l_obj, &l_item_size_out); if(!l_item || !l_item_size_out) { - log_it(L_WARNING, "Log pack returned NULL??? data=0x%x (nothing to send) (rest=%d records)", l_list->data, len-1); + log_it(L_WARNING, "Log pack returned NULL??? data=0x%x (nothing to send) (rest=%d records)", l_obj, + l_items_rest); l_item_size_out = 0; //dap_stream_ch_set_ready_to_write(a_ch, false); + + // go to next item + l_obj = dap_db_log_list_get(l_db_list); + //if(l_obj) + // continue; + // stop global_db sync + //else + // break; } else { - log_it(L_INFO, "Send one global_dr record data=0x%x len=%d (rest=%d records)", l_item, l_item_size_out, len-1); - dap_stream_ch_chain_pkt_write( a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB, + log_it(L_INFO, "Send one global_db record data=0x%x len=%d (rest=%d/%d items)", l_item, l_item_size_out, + l_items_rest, l_items_total); + dap_stream_ch_chain_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB, l_ch_chain->request_net_id, l_ch_chain->request_chain_id, - l_ch_chain->request_cell_id, l_item, l_item_size_out ); + l_ch_chain->request_cell_id, l_item, l_item_size_out); + //dap_stream_ch_set_ready_to_write(a_ch, true); + //sleep(1); DAP_DELETE(l_item); + // sent the record, another will be sent + l_is_stop = false; + break; } // remove current item from list and go to next item - dap_chain_global_db_obj_delete((dap_global_db_obj_t *) l_list->data); + /*dap_chain_global_db_obj_delete((dap_global_db_obj_t *) l_list->data); l_ch_chain->request_global_db_trs = dap_list_delete_link(l_ch_chain->request_global_db_trs, l_list); // nothing was sent if(!l_item_size_out) { @@ -583,15 +584,17 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) // stop global_db sync else break; - } - // sent the record, another will be sent - l_is_stop = false; - break; + }*/ } + if(l_is_stop){ - log_it(L_DEBUG, "l_list == 0, STOP"); + log_it(L_DEBUG, "l_obj == 0, STOP"); if(l_ch_chain->state == CHAIN_STATE_SYNC_GLOBAL_DB) { + // free log list + l_ch_chain->request_global_db_trs = NULL; + dap_db_log_list_delete(l_db_list); + // last message log_it( L_DEBUG,"dap_db_log_get_last_timestamp_remote"); diff --git a/dap_stream_ch_chain.h b/dap_stream_ch_chain.h index a85e9287b67cccd7b39167c1f4227a3b2719bf84..419053483bf28894dcbb1b0145111cc1180d502c 100755 --- a/dap_stream_ch_chain.h +++ b/dap_stream_ch_chain.h @@ -28,6 +28,7 @@ #include "dap_chain_common.h" #include "dap_chain.h" +#include "dap_chain_global_db_hist.h" #include "dap_list.h" #include "dap_stream_ch_chain_pkt.h" #include "uthash.h" @@ -47,7 +48,7 @@ typedef struct dap_stream_ch_chain { pthread_mutex_t mutex; dap_stream_ch_t * ch; - dap_list_t *request_global_db_trs; // list of transactions + dap_db_log_list_t *request_global_db_trs; // list of transactions dap_stream_ch_chain_state_t state; dap_chain_atom_iter_t * request_atom_iter;