Skip to content
Snippets Groups Projects
Commit a91f9c4f authored by alexander.lysikov's avatar alexander.lysikov
Browse files

fixed multiple transaction transfers

parent 75cfe993
No related branches found
No related tags found
No related merge requests found
......@@ -108,7 +108,7 @@ void s_stream_ch_delete(dap_stream_ch_t* a_ch, void* a_arg)
*/
void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
{
static char *s_net_name = NULL;
//static char *s_net_name = NULL;
dap_stream_ch_chain_t * l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch);
if(l_ch_chain) {
dap_stream_ch_pkt_t * l_ch_pkt = (dap_stream_ch_pkt_t *) a_arg;
......@@ -122,10 +122,10 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
break;
case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB: {
log_it(L_INFO, "In: SYNCED_GLOBAL_DB pkt");
if(s_net_name) {
DAP_DELETE(s_net_name);
s_net_name = NULL; //"kelvin-testnet"
}
/*if(s_net_name) {
DAP_DELETE(s_net_name);
s_net_name = NULL; //"kelvin-testnet"
}*/
}
break;
case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS: {
......@@ -183,12 +183,15 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
dap_stream_ch_chain_sync_request_t * l_request =
(dap_stream_ch_chain_sync_request_t *) l_chain_pkt->data;
memcpy(&l_ch_chain->request, l_request, l_chain_pkt_data_size);
memcpy(&l_ch_chain->request_cell_id, &l_chain_pkt->hdr.cell_id, sizeof(dap_chain_cell_id_t));
memcpy(&l_ch_chain->request_net_id, &l_chain_pkt->hdr.net_id, sizeof(dap_chain_net_id_t));
memcpy(&l_ch_chain->request_chain_id, &l_chain_pkt->hdr.chain_id, sizeof(dap_chain_id_t));
// Get log diff
l_ch_chain->request_last_ts = dap_db_log_get_last_id();
log_it(L_DEBUG, "Requested transactions %llu:%llu", l_request->ts_start,
log_it(L_DEBUG, "Requested transactions %llu:%llu", l_request->id_start,
(uint64_t ) l_ch_chain->request_last_ts);
dap_list_t *l_list = dap_db_log_get_list((time_t) l_request->ts_start);
dap_list_t *l_list = dap_db_log_get_list((time_t) l_request->id_start);
log_it(L_DEBUG, "Got %u items", dap_list_length(l_list));
if(l_list) {
// Add it to outgoing list
......@@ -198,9 +201,15 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
l_ch_chain->request_global_db_trs = l_list;
l_ch_chain->state = CHAIN_STATE_SYNC_GLOBAL_DB;
dap_chain_node_addr_t l_node_addr = { 0 };
l_node_addr.uint64 = dap_db_get_cur_node_addr();
dap_stream_ch_chain_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB,
l_ch_chain->request_net_id, l_ch_chain->request_chain_id,
l_ch_chain->request_cell_id, &l_node_addr, sizeof(dap_chain_node_addr_t));
} else {
dap_stream_ch_chain_sync_request_t l_request = { { 0 } };
l_request.ts_start = dap_db_log_get_last_timestamp_remote(l_ch_chain->request.node_addr.uint64);
l_request.id_start = dap_db_log_get_last_id_remote(l_ch_chain->request.node_addr.uint64);
dap_stream_ch_chain_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB,
l_ch_chain->request_net_id, l_ch_chain->request_chain_id,
l_ch_chain->request_cell_id, &l_request, sizeof(l_request));
......@@ -248,6 +257,16 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
dap_stream_ch_set_ready_to_write(a_ch, true);
}
}
}
break;
// first packet of data with source node address
case DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB: {
log_it(L_INFO, "In: FIRST_GLOBAL_DB data_size=%d", l_chain_pkt_data_size);
if(l_chain_pkt_data_size == sizeof(dap_chain_node_addr_t))
memcpy(&l_ch_chain->request.node_addr, l_chain_pkt->data, l_chain_pkt_data_size);
//memcpy(&l_ch_chain->request_cell_id, &l_chain_pkt->hdr.cell_id, sizeof(dap_chain_cell_id_t));
//memcpy(&l_ch_chain->request_net_id, &l_chain_pkt->hdr.net_id, sizeof(dap_chain_net_id_t));
//memcpy(&l_ch_chain->request_chain_id, &l_chain_pkt->hdr.chain_id, sizeof(dap_chain_id_t));
}
break;
case DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB: {
......@@ -265,8 +284,40 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
//if ( dap_log_level_get()== L_DEBUG )
//if ( l_data_obj_count && l_store_obj )
// l_store_obj_reversed = DAP_NEW_Z_SIZE(dap_store_obj_t,l_data_obj_count+1);
// Reverse order
for(size_t i = 0; i < l_data_obj_count; i++) {
dap_store_obj_t* l_obj = l_store_obj + i;
// read item from base;
size_t l_count_read = 0;
dap_store_obj_t *l_read_obj = dap_chain_global_db_driver_read(l_obj->group,
l_obj->key, &l_count_read);
//check whether to apply the received data into the database
bool l_no_apply = false;
if(l_obj->type == 'd' && !l_read_obj) {
l_no_apply = true;
}
else if(l_obj->type == 'a') {
bool l_is_the_same_present = false;
if(l_read_obj &&
l_read_obj->value_len == l_obj->value_len &&
!memcmp(l_read_obj->value, l_read_obj->value, l_obj->value_len))
l_is_the_same_present = true;
// this data already present in global_db
if(l_read_obj && (l_is_the_same_present || l_read_obj->timestamp < l_store_obj->timestamp))
l_no_apply = true;
}
if(l_read_obj)
dap_store_obj_free(l_read_obj, l_count_read);
if(l_no_apply) {
// If request was from defined node_addr we update its state
if(l_ch_chain->request.node_addr.uint64) {
dap_db_log_set_last_id_remote(l_ch_chain->request.node_addr.uint64, l_obj->id);
}
continue;
}
char l_ts_str[50];
dap_time_to_str_rfc822(l_ts_str, sizeof(l_ts_str), l_store_obj[i].timestamp);
log_it(L_DEBUG, "Unpacked log history: type='%c' (0x%02hhX) group=\"%s\" key=\"%s\""
......@@ -275,87 +326,63 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
l_store_obj[i].key,
l_ts_str,
l_store_obj[i].value_len);
// read net_name
if(!s_net_name)
{
static dap_config_t *l_cfg = NULL;
if((l_cfg = dap_config_open("network/default")) == NULL) {
log_it(L_ERROR, "Can't open default network config");
} else {
s_net_name = dap_strdup(dap_config_get_item_str(l_cfg, "general", "name"));
dap_config_close(l_cfg);
}
}
// add datum in ledger if necessary
{
dap_chain_net_t *l_net = dap_chain_net_by_name(s_net_name);
dap_chain_t * l_chain;
if(l_net) {
DL_FOREACH(l_net->pub.chains, l_chain)
{
const char *l_chain_name = l_chain->name; //l_chain_name = dap_strdup("gdb");
dap_chain_t *l_chain = dap_chain_net_get_chain_by_name(l_net, l_chain_name);
//const char *l_group_name = "chain-gdb.kelvin-testnet.chain-F00000000000000F";//dap_chain_gdb_get_group(l_chain);
if(l_chain->callback_datums_pool_proc_with_group)
l_chain->callback_datums_pool_proc_with_group(l_chain,
(dap_chain_datum_t**) &(l_store_obj->value), 1, l_store_obj[i].group);
}
}
}
}
// check if there is an l_store_obj in the base
for(size_t n_obj = 0; n_obj < l_data_obj_count; n_obj++) {
dap_store_obj_t* l_obj = l_store_obj + n_obj;
// read item from base;
size_t l_count_read = 0;
dap_store_obj_t *l_read_obj = dap_chain_global_db_driver_read(l_obj->group,
l_obj->key, &l_count_read);
// save data to global_db
if(!l_read_obj || (l_read_obj && l_read_obj->timestamp > l_store_obj->timestamp)) {
if(!dap_chain_global_db_obj_save(l_obj, 1)) {
dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
"ERROR_GLOBAL_DB_INTERNAL_NOT_SAVED");
dap_stream_ch_set_ready_to_write(a_ch, true);
} else {
// If request was from defined node_addr we update its state
if(l_ch_chain->request.node_addr.uint64) {
dap_db_log_set_last_timestamp_remote(l_ch_chain->request.node_addr.uint64,
l_obj->id);
}
log_it(L_DEBUG, "Added new GLOBAL_DB history pack");
}
// apply received transaction
dap_chain_t *l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id);
if(l_chain) {
if(l_chain->callback_datums_pool_proc_with_group)
l_chain->callback_datums_pool_proc_with_group(l_chain,
(dap_chain_datum_t**) &(l_store_obj->value), 1,
l_store_obj[i].group);
}
if(l_read_obj)
dap_store_obj_free(l_read_obj, l_count_read);
/*else {
// read net_name
if(!s_net_name)
{
static dap_config_t *l_cfg = NULL;
if((l_cfg = dap_config_open("network/default")) == NULL) {
log_it(L_ERROR, "Can't open default network config");
} else {
s_net_name = dap_strdup(dap_config_get_item_str(l_cfg, "general", "name"));
dap_config_close(l_cfg);
}
}
// add datum in ledger if necessary
{
dap_chain_net_t *l_net = dap_chain_net_by_name(s_net_name);
dap_chain_t * l_chain;
if(l_net) {
DL_FOREACH(l_net->pub.chains, l_chain)
{
const char *l_chain_name = l_chain->name; //l_chain_name = dap_strdup("gdb");
dap_chain_t *l_chain = dap_chain_net_get_chain_by_name(l_net, l_chain_name);
//const char *l_group_name = "chain-gdb.kelvin-testnet.chain-F00000000000000F";//dap_chain_gdb_get_group(l_chain);
if(l_chain->callback_datums_pool_proc_with_group)
l_chain->callback_datums_pool_proc_with_group(l_chain,
(dap_chain_datum_t**) &(l_store_obj->value), 1,
l_store_obj[i].group);
}
}
}
}*/
}
if (l_store_obj)
dap_store_obj_free(l_store_obj, l_data_obj_count);
/*/ save data to global_db
if(l_read_obj && l_read_obj->timestamp > l_store_obj->timestamp) {
if(!dap_chain_global_db_obj_save(l_store_obj, l_data_obj_count)) {
log_it(L_ERROR, "Not saved to global_db objs=0x%x count=%d", l_store_obj,
l_data_obj_count);
// save data to global_db
if(!dap_chain_global_db_obj_save(l_obj, 1)) {
dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
"ERROR_GLOBAL_DB_INTERNAL_NOT_SAVED");
dap_stream_ch_set_ready_to_write(a_ch, true);
} else {
// If request was from defined node_addr we update its state
if(l_ch_chain->request.node_addr.uint64) {
dap_db_log_set_last_timestamp_remote(l_ch_chain->request.node_addr.uint64,
dap_db_log_get_last_timestamp());
dap_db_log_set_last_id_remote(l_ch_chain->request.node_addr.uint64,
l_obj->id);
}
log_it(L_DEBUG, "Added new GLOBAL_DB history pack");
}
}*/
// DAP_DELETE(l_store_obj);
//if (l_store_obj_reversed)
// DAP_DELETE(l_store_obj_reversed);
}
if(l_store_obj)
dap_store_obj_free(l_store_obj, l_data_obj_count);
} else {
log_it(L_WARNING, "Packet with GLOBAL_DB atom has zero body size");
......@@ -394,7 +421,7 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg)
log_it(L_DEBUG, "CHAIN_STATE_IDLE");
// Cleanup after request
//memset(&l_ch_chain->request, 0, sizeof(l_ch_chain->request));
memset(&l_ch_chain->request, 0, sizeof(l_ch_chain->request));
memset(&l_ch_chain->request_net_id, 0, sizeof(l_ch_chain->request_net_id));
memset(&l_ch_chain->request_cell_id, 0, sizeof(l_ch_chain->request_cell_id));
memset(&l_ch_chain->request_chain_id, 0, sizeof(l_ch_chain->request_chain_id));
......@@ -476,8 +503,8 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg)
dap_stream_ch_chain_sync_request_t l_request = { { 0 } };
l_request.node_addr.uint64 = dap_db_get_cur_node_addr();
l_request.ts_start = dap_db_log_get_last_timestamp_remote(l_ch_chain->request.node_addr.uint64);
l_request.ts_end = 0;
l_request.id_start = dap_db_log_get_last_id_remote(l_ch_chain->request.node_addr.uint64);
l_request.id_end = 0;
dap_stream_ch_chain_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB,
l_ch_chain->request_net_id, l_ch_chain->request_chain_id,
......
......@@ -35,6 +35,7 @@
#define DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN 0x01
#define DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB 0x11
#define DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB 0x21
#define DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS 0x02
#define DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB 0x12
......@@ -51,8 +52,8 @@ typedef struct dap_stream_ch_chain_sync_request{
dap_chain_node_addr_t node_addr; // Requesting node's address
dap_chain_hash_fast_t hash_from;
dap_chain_hash_fast_t hash_to;
uint64_t ts_start;
uint64_t ts_end;
uint64_t id_start;
uint64_t id_end;
} DAP_ALIGN_PACKED dap_stream_ch_chain_sync_request_t;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment