Skip to content
Snippets Groups Projects
Commit e8e260cd authored by Roman Khlopkov's avatar Roman Khlopkov 🔜
Browse files

[+] TSD receipt for last_id in GDB groups

parent 72bb6a1b
No related branches found
No related tags found
2 merge requests!419Feature 5220,!339features-4903
Pipeline #8315 passed with stage
in 5 seconds
......@@ -122,8 +122,7 @@ typedef uint8_t byte_t;
#define DAP_NEW_Z_SIZE(a, b) DAP_CAST_REINT(a, rpcalloc(1,b))
#define DAP_REALLOC(a, b) rprealloc(a,b)
#define DAP_DELETE(a) rpfree(a)
#define DAP_DUP(a) ( __typeof(a) ret = memcpy(ret,a,sizeof(*a)) )
#define DAP_DUP_SIZE(a,b) ( __typeof(a) ret = memcpy(ret,a,b) )
#define DAP_DUP(a) memcpy(rpmalloc(sizeof(*a)), a, sizeof(*a))
#else
#define DAP_MALLOC(a) malloc(a)
#define DAP_FREE(a) free(a)
......@@ -139,8 +138,7 @@ typedef uint8_t byte_t;
#define DAP_NEW_Z_SIZE(a, b) DAP_CAST_REINT(a, calloc(1,b))
#define DAP_REALLOC(a, b) realloc(a,b)
#define DAP_DELETE(a) free(a)
#define DAP_DUP(a) ( __typeof(a) ret = memcpy(ret,a,sizeof(*a)) )
#define DAP_DUP_SIZE(a,b) ( __typeof(a) memcpy(ret,a,b) )
#define DAP_DUP(a) memcpy(malloc(sizeof(*a)), a, sizeof(*a))
#endif
#define DAP_DEL_Z(a) if(a) { DAP_DELETE(a); a=NULL;}
......
......@@ -23,11 +23,12 @@ This file is part of DAP SDK the open source project
#pragma once
#include "dap_common.h"
#include "dap_strfuncs.h"
typedef struct dap_tsd{
typedef struct dap_tsd {
uint16_t type;
uint32_t size;
byte_t data[];
} dap_tsd_t;
} DAP_ALIGN_PACKED dap_tsd_t;
dap_tsd_t * dap_tsd_create(uint16_t a_type, const void * a_data, size_t a_data_size);
dap_tsd_t* dap_tsd_find(byte_t * a_data, size_t a_data_size,uint16_t a_type);
......
......@@ -85,6 +85,7 @@ struct sync_request
struct{
dap_db_log_list_t *db_log; // db log
dap_list_t *db_iter;
char *sync_group;
} gdb;
struct{
dap_chain_atom_iter_t *request_atom_iter;
......@@ -572,19 +573,51 @@ static void s_gdb_in_pkt_error_worker_callback(dap_worker_t *a_worker, void *a_a
struct sync_request *l_sync_request = (struct sync_request *) a_arg;
dap_stream_ch_t *l_ch = dap_stream_ch_find_by_uuid_unsafe(DAP_STREAM_WORKER(a_worker), l_sync_request->ch_uuid);
if( l_ch == NULL ){
if( l_ch == NULL ) {
log_it(L_INFO,"Client disconnected before we sent the reply");
DAP_DELETE(l_sync_request);
return;
} else {
dap_stream_ch_chain_pkt_write_error_unsafe(l_ch, l_sync_request->request_hdr.net_id.uint64,
l_sync_request->request_hdr.chain_id.uint64,
l_sync_request->request_hdr.cell_id.uint64,
"ERROR_GLOBAL_DB_INTERNAL_NOT_SAVED");
}
dap_stream_ch_chain_pkt_write_error_unsafe(l_ch, l_sync_request->request_hdr.net_id.uint64,
l_sync_request->request_hdr.chain_id.uint64,
l_sync_request->request_hdr.cell_id.uint64,
"ERROR_GLOBAL_DB_INTERNAL_NOT_SAVED");
DAP_DELETE(l_sync_request);
}
static void s_gdb_sync_tsd_worker_callback(dap_worker_t *a_worker, void *a_arg)
{
struct sync_request *l_sync_request = (struct sync_request *) a_arg;
dap_stream_ch_t *l_ch = dap_stream_ch_find_by_uuid_unsafe(DAP_STREAM_WORKER(a_worker), l_sync_request->ch_uuid);
if( l_ch == NULL ) {
log_it(L_INFO,"Client disconnected before we sent the reply");
} else {
size_t l_gr_len = strlen(l_sync_request->gdb.sync_group) + 1;
size_t l_data_size = 2 * sizeof(uint64_t) + l_gr_len;
dap_tsd_t *l_tsd_rec = DAP_NEW_SIZE(dap_tsd_t, l_data_size + sizeof(dap_tsd_t));
l_tsd_rec->type = DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_TSD_LAST_ID;
l_tsd_rec->size = l_data_size;
uint64_t l_node_addr = dap_chain_net_get_cur_addr_int(dap_chain_net_by_id(l_sync_request->request_hdr.net_id));
void *l_data_ptr = l_tsd_rec->data;
memcpy(l_data_ptr, &l_node_addr, sizeof(uint64_t));
l_data_ptr += sizeof(uint64_t);
memcpy(l_data_ptr, &l_sync_request->request.id_end, sizeof(uint64_t));
l_data_ptr += sizeof(uint64_t);
memcpy(l_data_ptr, l_sync_request->gdb.sync_group, l_gr_len);
log_it(L_INFO, "Allocated %d bytes, copied %d bytes, sent %d bytes",
l_data_size + sizeof(dap_tsd_t),
(long int)((byte_t *)l_data_ptr - l_tsd_rec->data) + l_gr_len,
l_tsd_rec->size + sizeof(dap_tsd_t));
dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_TSD,
l_sync_request->request_hdr.net_id.uint64,
l_sync_request->request_hdr.chain_id.uint64,
l_sync_request->request_hdr.cell_id.uint64,
l_tsd_rec, l_tsd_rec->size + sizeof(dap_tsd_t));
DAP_DELETE(l_tsd_rec);
}
DAP_DELETE(l_sync_request->gdb.sync_group);
DAP_DELETE(l_sync_request);
}
/**
* @brief s_gdb_in_pkt_callback
......@@ -610,9 +643,28 @@ static bool s_gdb_in_pkt_proc_callback(dap_proc_thread_t *a_thread, void *a_arg)
log_it(L_WARNING, "In: GLOBAL_DB parse: packet in list with NULL data(pkt_data_size:%zd)", l_pkt_item->pkt_data_size);
}
uint64_t l_last_id = 0;
char *l_last_group = NULL;
char l_last_type = '\0';
bool l_group_changed = false;
for (size_t i = 0; i < l_data_obj_count; i++) {
// obj to add
dap_store_obj_t *l_obj = l_store_obj + i;
l_group_changed = l_last_group && (strcmp(l_last_group, l_obj->group) || l_last_type != l_obj->type);
// Send remote side notification about received obj
if (l_sync_request->request.node_addr.uint64 &&
(l_group_changed || i == l_data_obj_count - 1)) {
struct sync_request *l_sync_req_tsd = DAP_DUP(l_sync_request);
l_sync_req_tsd->request.id_end = l_last_id;
l_sync_req_tsd->gdb.sync_group = l_obj->type == 'a' ? dap_strdup(l_obj->group) :
dap_strdup_printf("%s.del", l_obj->group);
dap_proc_thread_worker_exec_callback(a_thread, l_sync_request->worker->id,
s_gdb_sync_tsd_worker_callback, l_sync_req_tsd);
}
l_last_id = l_obj->id;
l_last_group = l_obj->group;
l_last_type = l_obj->type;
//check whether to apply the received data into the database
bool l_apply = false;
// timestamp for exist obj
......@@ -653,8 +705,7 @@ static bool s_gdb_in_pkt_proc_callback(dap_proc_thread_t *a_thread, void *a_arg)
}
// save data to global_db
if(!dap_chain_global_db_obj_save(l_obj, 1)) {
struct sync_request *l_sync_req_err = DAP_NEW_Z(struct sync_request);
memcpy(l_sync_req_err, l_sync_request, sizeof(struct sync_request));
struct sync_request *l_sync_req_err = DAP_DUP(l_sync_request);
dap_proc_thread_worker_exec_callback(a_thread, l_sync_request->worker->id,
s_gdb_in_pkt_error_worker_callback, l_sync_req_err);
} else {
......@@ -662,8 +713,9 @@ static bool s_gdb_in_pkt_proc_callback(dap_proc_thread_t *a_thread, void *a_arg)
log_it(L_DEBUG, "Added new GLOBAL_DB synchronization record");
}
}
if(l_store_obj)
if(l_store_obj) {
dap_store_obj_free(l_store_obj, l_data_obj_count);
}
} else {
log_it(L_WARNING, "In proc thread got GDB stream ch packet with zero data");
}
......@@ -782,8 +834,21 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
} break;
// Response with metadata organized in TSD
case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_TSD:{
if (s_debug_more)
case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_TSD: {
if (l_chain_pkt_data_size) {
dap_tsd_t *l_tsd_rec = (dap_tsd_t *)l_chain_pkt->data;
if (l_tsd_rec->type != DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_TSD_LAST_ID ||
l_tsd_rec->size < 2 * sizeof(uint64_t) + 2) {
break;
}
void *l_data_ptr = l_tsd_rec->data;
uint64_t l_node_addr = *(uint64_t *)l_data_ptr;
l_data_ptr += sizeof(uint64_t);
uint64_t l_last_id = *(uint64_t *)l_data_ptr;
l_data_ptr += sizeof(uint64_t);
char *l_group = (char *)l_data_ptr;
dap_db_set_last_id_remote(l_node_addr, l_last_id, l_group);
} else if (s_debug_more)
log_it(L_DEBUG, "Global DB TSD packet detected");
} break;
......@@ -1333,11 +1398,6 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg)
}*/
l_skip_count++;
} else {
if (l_ch_chain->request.node_addr.uint64) {
dap_db_set_last_id_remote(l_ch_chain->request.node_addr.uint64,
dap_store_packet_get_id(l_obj->pkt),
dap_store_packet_get_group(l_obj->pkt));
}
l_hash_item = DAP_NEW(dap_stream_ch_chain_hash_item_t);
memcpy(&l_hash_item->hash, &l_obj->hash, sizeof(dap_chain_hash_fast_t));
l_hash_item->size = l_obj->pkt->data_size;
......
......@@ -75,6 +75,7 @@
#define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_TSD_COUNT 0x0002 // Items count
#define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_TSD_HASH_LAST 0x0003 // Hash of last(s) item
#define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_TSD_HASH_FIRST 0x0004 // Hash of first(s) item
#define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_TSD_LAST_ID 0x0100 // Last ID of GDB synced group
typedef enum dap_stream_ch_chain_state{
CHAIN_STATE_IDLE=0,
......
......@@ -10,8 +10,10 @@
#include "dap_chain_datum_tx_items.h"
#include "dap_chain_global_db_remote.h"
#include "dap_chain_global_db_hist.h"
#include "uthash.h"
#define GDB_SYNC_ALWAYS_FROM_ZERO
// for dap_db_history()
typedef struct dap_tx_data{
dap_chain_hash_fast_t tx_hash;
......
......@@ -183,27 +183,6 @@ dap_store_obj_pkt_t *dap_store_packet_multiple(dap_store_obj_pkt_t *a_old_pkt, d
return a_old_pkt;
}
char *dap_store_packet_get_group(dap_store_obj_pkt_t *a_pkt)
{
uint16_t l_gr_len;
memcpy(&l_gr_len, a_pkt->data + sizeof(uint32_t), sizeof(uint16_t));
char *l_ret_str = DAP_NEW_SIZE(char, l_gr_len + 1);
size_t l_gr_offset = sizeof(uint32_t) + sizeof(uint16_t);
memcpy(l_ret_str, a_pkt->data + l_gr_offset, l_gr_len);
l_ret_str[l_gr_len] = '\0';
return l_ret_str;
}
uint64_t dap_store_packet_get_id(dap_store_obj_pkt_t *a_pkt)
{
uint16_t l_gr_len;
memcpy(&l_gr_len, a_pkt->data + sizeof(uint32_t), sizeof(uint16_t));
size_t l_id_offset = sizeof(uint32_t) + sizeof(uint16_t) + l_gr_len;
uint64_t l_ret_id;
memcpy(&l_ret_id, a_pkt->data + l_id_offset, sizeof(uint64_t));
return l_ret_id;
}
void dap_store_packet_change_id(dap_store_obj_pkt_t *a_pkt, uint64_t a_id)
{
uint16_t l_gr_len;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment