Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • cellframe/cellframe-sdk
  • MIKA83/cellframe-sdk
2 results
Show changes
Commits on Source (8)
Showing
with 862 additions and 2197 deletions
......@@ -16,14 +16,7 @@
#include "cdb_bgtask.h"
#include <stdlib.h>
#include <errno.h>
#if defined(DAP_OS_DARWIN)
#include <signal.h>
#elif defined(_WIN32)
#include <sys/signal.h>
#else
#include <signal.h>
#endif
/* where thread begins */
static void *_cdb_bgtask_func(void *arg);
......
......@@ -2,7 +2,7 @@ project(cellframe-sdk C)
cmake_minimum_required(VERSION 3.0)
set(CMAKE_C_STANDARD 11)
set(CELLFRAME_SDK_NATIVE_VERSION "2.9-40")
set(CELLFRAME_SDK_NATIVE_VERSION "2.9-41")
add_definitions ("-DCELLFRAME_SDK_VERSION=\"${CELLFRAME_SDK_NATIVE_VERSION}\"")
set(DAPSDK_MODULES "")
message("Cellframe modules: ${CELLFRAME_MODULES}")
......
......@@ -122,8 +122,8 @@ typedef uint8_t byte_t;
#define DAP_NEW_Z_SIZE(a, b) DAP_CAST_REINT(a, rpcalloc(1,b))
#define DAP_REALLOC(a, b) rprealloc(a,b)
#define DAP_DELETE(a) rpfree(a)
#define DAP_DUP(a) ( __typeof(a) ret = memcpy(ret,a,sizeof(*a)) )
#define DAP_DUP_SIZE(a,b) ( __typeof(a) ret = memcpy(ret,a,b) )
#define DAP_DUP(a, b) memcpy(a, b, sizeof(*b))
#define DAP_DUP_SIZE(a, b, s) memcpy(a, b, s)
#else
#define DAP_MALLOC(a) malloc(a)
#define DAP_FREE(a) free(a)
......@@ -139,8 +139,8 @@ typedef uint8_t byte_t;
#define DAP_NEW_Z_SIZE(a, b) DAP_CAST_REINT(a, calloc(1,b))
#define DAP_REALLOC(a, b) realloc(a,b)
#define DAP_DELETE(a) free(a)
#define DAP_DUP(a) ( __typeof(a) ret = memcpy(ret,a,sizeof(*a)) )
#define DAP_DUP_SIZE(a,b) ( __typeof(a) memcpy(ret,a,b) )
#define DAP_DUP(a, b) memcpy(a, b, sizeof(*b))
#define DAP_DUP_SIZE(a, b, s) memcpy(a, b, s)
#endif
#define DAP_DEL_Z(a) if(a) { DAP_DELETE(a); a=NULL;}
......
......@@ -115,14 +115,13 @@ void dap_chain_cell_delete(dap_chain_cell_t *a_cell)
int dap_chain_cell_load(dap_chain_t * a_chain, const char * a_cell_file_path)
{
dap_chain_cell_t * l_cell = dap_chain_cell_create();
l_cell->file_storage_path = dap_strdup( a_cell_file_path );
char *l_file_path = dap_strdup_printf("%s/%s", DAP_CHAIN_PVT (a_chain)->file_storage_dir,
l_cell->file_storage_path);
l_cell->file_storage = fopen(l_file_path,"rb");
DAP_DELETE(l_file_path);
{
char l_file_path[MAX_PATH] = {'\0'};
dap_snprintf(l_file_path, MAX_PATH, "%s/%s", DAP_CHAIN_PVT(a_chain)->file_storage_dir,
l_cell->file_storage_path);
l_cell->file_storage = fopen(l_file_path,"rb");
}
if ( l_cell->file_storage ){
dap_chain_cell_file_header_t l_hdr = {0};
if ( fread( &l_hdr,1,sizeof(l_hdr),l_cell->file_storage ) == sizeof (l_hdr) ) {
......@@ -186,12 +185,12 @@ int dap_chain_cell_file_append( dap_chain_cell_t * a_cell, const void* a_atom, s
size_t l_total_wrote_bytes = 0;
// file need to be opened or created
if(a_cell->file_storage == NULL) {
char *l_file_path = dap_strdup_printf("%s/%s", DAP_CHAIN_PVT ( a_cell->chain)->file_storage_dir,
a_cell->file_storage_path);
char l_file_path[MAX_PATH] = {'\0'};
dap_snprintf(l_file_path, MAX_PATH, "%s/%s", DAP_CHAIN_PVT(a_cell->chain)->file_storage_dir,
a_cell->file_storage_path);
a_cell->file_storage = fopen(l_file_path, "ab");
if(a_cell->file_storage == NULL)
a_cell->file_storage = fopen(l_file_path, "wb");
DAP_DELETE(l_file_path);
}
if(!a_cell->file_storage) {
log_it(L_ERROR, "File \"%s\" not opened for write cell 0x%016X",
......@@ -255,10 +254,10 @@ int dap_chain_cell_file_update( dap_chain_cell_t * a_cell)
return -1;
}
if(a_cell->file_storage == NULL ){ // File need to be created
char *l_file_path = dap_strdup_printf("%s/%s", DAP_CHAIN_PVT ( a_cell->chain)->file_storage_dir,
a_cell->file_storage_path);
char l_file_path[MAX_PATH] = {'\0'};
dap_snprintf(l_file_path, MAX_PATH, "%s/%s", DAP_CHAIN_PVT(a_cell->chain)->file_storage_dir,
a_cell->file_storage_path);
a_cell->file_storage = fopen(l_file_path, "wb");
DAP_DELETE(l_file_path);
if(a_cell->file_storage) {
dap_chain_cell_file_header_t l_hdr = {
.signature = DAP_CHAIN_CELL_FILE_SIGNATURE,
......
......@@ -77,8 +77,6 @@ struct sync_request
dap_stream_ch_chain_pkt_hdr_t request_hdr;
dap_chain_pkt_item_t pkt;
dap_stream_ch_chain_update_element_t *local_gdbs;
uint64_t local_gdbs_count;
dap_stream_ch_chain_hash_item_t *remote_atoms; // Remote atoms
dap_stream_ch_chain_hash_item_t *remote_gdbs; // Remote gdbs
......@@ -331,8 +329,7 @@ static void s_sync_out_gdb_first_worker_callback(dap_worker_t *a_worker, void *a
dap_chain_net_t *l_net = dap_chain_net_by_id(l_ch_chain->request_hdr.net_id);
// Add it to outgoing list
l_ch_chain->request_global_db_trs = l_sync_request->gdb.db_log;
l_ch_chain->request_db_iter = NULL;
l_ch_chain->request_db_log = l_sync_request->gdb.db_log;
l_ch_chain->state = CHAIN_STATE_SYNC_GLOBAL_DB;
dap_chain_node_addr_t l_node_addr = { 0 };
l_node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net);
......@@ -347,7 +344,6 @@ static void s_sync_out_gdb_first_worker_callback(dap_worker_t *a_worker, void *a
if( a_worker){ // We send NULL to prevent delete
s_sync_request_delete(l_sync_request);
l_ch_chain->is_on_request = false;
}
}
......@@ -369,19 +365,16 @@ static void s_sync_out_gdb_last_worker_callback(dap_worker_t *a_worker, void *a_
dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN( l_ch );
s_sync_out_gdb_first_worker_callback(NULL,a_arg); // NULL to say callback not to delete request
dap_stream_ch_chain_sync_request_t l_request = {0};
if (s_debug_more )
log_it(L_INFO,"Out: DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB");
dap_stream_ch_chain_pkt_write_unsafe(l_ch_chain->ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB,
l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64,
l_ch_chain->request_hdr.cell_id.uint64, &l_request, sizeof(l_request));
l_ch_chain->request_hdr.cell_id.uint64, NULL, 0);
l_ch_chain->state = CHAIN_STATE_IDLE;
if(l_ch_chain->callback_notify_packet_out)
l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB,
NULL, 0, l_ch_chain->callback_notify_arg);
l_ch_chain->is_on_request = false;
s_sync_request_delete(l_sync_request);
}
/**
......@@ -392,27 +385,23 @@ static void s_sync_out_gdb_last_worker_callback(dap_worker_t *a_worker, void *a_
*/
static bool s_sync_out_gdb_proc_callback(dap_proc_thread_t *a_thread, void *a_arg)
{
struct sync_request *l_sync_request = (struct sync_request *) a_arg;
// Get log diff
uint64_t l_local_last_id = dap_db_log_get_last_id();
if (s_debug_more)
log_it(L_DEBUG, "Sync out gdb proc, requested transactions %"DAP_UINT64_FORMAT_u":%"DAP_UINT64_FORMAT_u" from address "NODE_ADDR_FP_STR,
l_sync_request->request.id_start, l_local_last_id, NODE_ADDR_FP_ARGS_S(l_sync_request->request.node_addr));
uint64_t l_start_item = l_sync_request->request.id_start;
// If the current global_db has been truncated, but the remote node has not known this
if(l_sync_request->request.id_start > l_local_last_id) {
l_start_item = 0;
}
struct sync_request *l_sync_request = (struct sync_request *)a_arg;
dap_chain_net_t *l_net = dap_chain_net_by_id(l_sync_request->request_hdr.net_id);
dap_list_t *l_add_groups = dap_chain_net_get_add_gdb_group(l_net, l_sync_request->request.node_addr);
dap_db_log_list_t *l_db_log = dap_db_log_list_start(l_start_item + 1, l_add_groups);
int l_flags = 0;
if (dap_chain_net_get_add_gdb_group(l_net, l_sync_request->request.node_addr))
l_flags |= F_DB_LOG_ADD_EXTRA_GROUPS;
if (!l_sync_request->request.id_start)
l_flags |= F_DB_LOG_SYNC_FROM_ZERO;
dap_db_log_list_t *l_db_log = dap_db_log_list_start(l_sync_request->request.node_addr, l_flags);
if(l_db_log) {
if (s_debug_more)
log_it(L_DEBUG, "Sync out gdb proc, requested %"DAP_UINT64_FORMAT_u" transactions from address "NODE_ADDR_FP_STR,
l_db_log->items_number, NODE_ADDR_FP_ARGS_S(l_sync_request->request.node_addr));
l_sync_request->gdb.db_log = l_db_log;
dap_proc_thread_worker_exec_callback(a_thread, l_sync_request->worker->id, s_sync_out_gdb_first_worker_callback,l_sync_request );
dap_proc_thread_worker_exec_callback(a_thread, l_sync_request->worker->id, s_sync_out_gdb_first_worker_callback, l_sync_request );
} else {
dap_proc_thread_worker_exec_callback(a_thread, l_sync_request->worker->id, s_sync_out_gdb_last_worker_callback,l_sync_request );
dap_proc_thread_worker_exec_callback(a_thread, l_sync_request->worker->id, s_sync_out_gdb_last_worker_callback, l_sync_request );
}
return true;
}
......@@ -427,7 +416,6 @@ static void s_sync_update_gdb_start_worker_callback(dap_worker_t *a_worker, void
s_sync_request_delete(l_sync_request);
return;
}
dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_START,
l_sync_request->request_hdr.net_id.uint64, l_sync_request->request_hdr.chain_id.uint64,
l_sync_request->request_hdr.cell_id.uint64, NULL, 0);
......@@ -438,21 +426,24 @@ static void s_sync_update_gdb_start_worker_callback(dap_worker_t *a_worker, void
DAP_DELETE(l_sync_request);
}
static void s_sync_update_gdb_proc_callback(dap_worker_t *a_worker, void *a_arg)
static bool s_sync_update_gdb_proc_callback(dap_proc_thread_t *a_thread, void *a_arg)
{
struct sync_request *l_sync_request = (struct sync_request *) a_arg;
dap_stream_ch_t *l_ch = dap_stream_ch_find_by_uuid_unsafe(DAP_STREAM_WORKER(a_worker), l_sync_request->ch_uuid);
if( l_ch == NULL ){
log_it(L_INFO,"Client disconnected before we sent the reply");
s_sync_request_delete(l_sync_request);
return ;
}
// TODO make a local_gdbs hash table
struct sync_request *l_sync_request = (struct sync_request *)a_arg;
log_it(L_DEBUG, "Prepare request to gdb sync from %s", l_sync_request->request.id_start ? "last sync" : "zero");
dap_chain_net_t *l_net = dap_chain_net_by_id(l_sync_request->request_hdr.net_id);
dap_stream_ch_t *l_ch = dap_stream_ch_find_by_uuid_unsafe(DAP_STREAM_WORKER(l_sync_request->worker), l_sync_request->ch_uuid);
dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch);
l_ch_chain->local_gdbs = l_sync_request->local_gdbs;
l_ch_chain->local_gdbs_count = l_sync_request->local_gdbs_count;
dap_worker_exec_callback_on( l_sync_request->worker, s_sync_update_gdb_start_worker_callback, l_sync_request);
int l_flags = 0;
if (dap_chain_net_get_add_gdb_group(l_net, l_sync_request->request.node_addr))
l_flags |= F_DB_LOG_ADD_EXTRA_GROUPS;
if (!l_sync_request->request.id_start)
l_flags |= F_DB_LOG_SYNC_FROM_ZERO;
dap_db_log_list_t *l_db_log = dap_db_log_list_start(l_sync_request->request.node_addr, l_flags);
l_ch_chain->request_db_log = l_db_log;
l_ch_chain->state = CHAIN_STATE_UPDATE_GLOBAL_DB;
l_sync_request->request.node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net);
dap_proc_thread_worker_exec_callback(a_thread, l_sync_request->worker->id, s_sync_update_gdb_start_worker_callback, l_sync_request);
return true;
}
/**
......@@ -583,7 +574,7 @@ static void s_gdb_in_pkt_error_worker_callback(dap_worker_t *a_worker, void *a_a
dap_stream_ch_t *l_ch = dap_stream_ch_find_by_uuid_unsafe(DAP_STREAM_WORKER(a_worker), l_sync_request->ch_uuid);
if( l_ch == NULL ){
log_it(L_INFO,"Client disconnected before we sent the reply");
s_sync_request_delete(l_sync_request);
DAP_DELETE(l_sync_request);
return;
}
......@@ -609,7 +600,7 @@ static bool s_gdb_in_pkt_proc_callback(dap_proc_thread_t *a_thread, void *a_arg)
if (l_pkt_item->pkt_data_size) {
size_t l_data_obj_count = 0;
// deserialize data & Parse data from dap_db_log_pack()
dap_store_obj_t *l_store_obj = dap_db_log_unpack(l_pkt_item->pkt_data, l_pkt_item->pkt_data_size, &l_data_obj_count);
dap_store_obj_t *l_store_obj = dap_store_unpacket_multiple((dap_store_obj_pkt_t *)l_pkt_item->pkt_data, &l_data_obj_count);
if (s_debug_more){
if (l_data_obj_count)
log_it(L_INFO, "In: GLOBAL_DB parse: pkt_data_size=%zd, l_data_obj_count = %d",l_pkt_item->pkt_data_size, l_data_obj_count );
......@@ -619,46 +610,25 @@ static bool s_gdb_in_pkt_proc_callback(dap_proc_thread_t *a_thread, void *a_arg)
log_it(L_WARNING, "In: GLOBAL_DB parse: packet in list with NULL data(pkt_data_size:%zd)", l_pkt_item->pkt_data_size);
}
for(size_t i = 0; i < l_data_obj_count; i++) {
// timestamp for exist obj
time_t l_timestamp_cur = 0;
for (size_t i = 0; i < l_data_obj_count; i++) {
// obj to add
dap_store_obj_t* l_obj = l_store_obj + i;
// read item from base;
size_t l_count_read = 0;
dap_store_obj_t *l_read_obj = dap_chain_global_db_driver_read(l_obj->group,
l_obj->key, &l_count_read);
// get timestamp for the exist entry
if(l_read_obj)
l_timestamp_cur = l_read_obj->timestamp;
// get timestamp for the deleted entry
else
{
l_timestamp_cur = global_db_gr_del_get_timestamp(l_obj->group, l_obj->key);
}
dap_store_obj_t *l_obj = l_store_obj + i;
//check whether to apply the received data into the database
bool l_apply = true;
if(l_obj->timestamp < l_timestamp_cur)
l_apply = false;
else if(l_obj->type == 'd') {
// already deleted
if(!l_read_obj)
l_apply = false;
bool l_apply = false;
// timestamp for exist obj
time_t l_timestamp_cur = 0;
if (dap_chain_global_db_driver_is(l_obj->group, l_obj->key)) {
dap_store_obj_t *l_read_obj = dap_chain_global_db_driver_read(l_obj->group, l_obj->key, NULL);
if (l_read_obj) {
l_timestamp_cur = l_read_obj->timestamp;
dap_store_obj_free(l_read_obj, 1);
}
}
else if(l_obj->type == 'a') {
bool l_is_the_same_present = false;
if(l_read_obj &&
l_read_obj->value_len == l_obj->value_len &&
!memcmp(l_read_obj->value, l_obj->value, l_obj->value_len))
l_is_the_same_present = true;
// this data already present in global_db and not obsolete (out of date)
if(l_read_obj && (l_is_the_same_present || l_read_obj->timestamp >= l_store_obj->timestamp))
l_apply = false;
// check the applied object newer that we have stored or erased
if (l_obj->timestamp > global_db_gr_del_get_timestamp(l_obj->group, l_obj->key) &&
l_obj->timestamp > l_timestamp_cur) {
l_apply = true;
}
if(l_read_obj)
dap_store_obj_free(l_read_obj, l_count_read);
if (s_debug_more){
char l_ts_str[50];
dap_time_to_str_rfc822(l_ts_str, sizeof(l_ts_str), l_store_obj[i].timestamp);
......@@ -667,12 +637,7 @@ static bool s_gdb_in_pkt_proc_callback(dap_proc_thread_t *a_thread, void *a_arg)
(char ) l_store_obj[i].type, l_store_obj[i].type, l_store_obj[i].group,
l_store_obj[i].key, l_ts_str, l_store_obj[i].value_len);
}
if(!l_apply) {
// If request was from defined node_addr we update its state
if(l_sync_request->request.node_addr.uint64) {
dap_db_set_last_id_remote(l_sync_request->request.node_addr.uint64, l_obj->id);
}
if (!l_apply) {
continue;
}
......@@ -680,7 +645,7 @@ static bool s_gdb_in_pkt_proc_callback(dap_proc_thread_t *a_thread, void *a_arg)
dap_chain_t *l_chain = dap_chain_find_by_id(l_sync_request->request_hdr.net_id, l_sync_request->request_hdr.chain_id);
if(l_chain) {
if(l_chain->callback_add_datums_with_group){
void * restrict l_store_obj_value = l_store_obj->value;
void * restrict l_store_obj_value = l_store_obj[i].value;
l_chain->callback_add_datums_with_group(l_chain,
(dap_chain_datum_t** restrict) l_store_obj_value, 1,
l_store_obj[i].group);
......@@ -688,17 +653,13 @@ static bool s_gdb_in_pkt_proc_callback(dap_proc_thread_t *a_thread, void *a_arg)
}
// save data to global_db
if(!dap_chain_global_db_obj_save(l_obj, 1)) {
if(l_store_obj)
dap_store_obj_free(l_store_obj, l_data_obj_count);
dap_proc_thread_worker_exec_callback(a_thread, l_sync_request->worker->id, s_gdb_in_pkt_error_worker_callback, l_sync_request);
return true;
struct sync_request *l_sync_req_err = DAP_NEW_Z(struct sync_request);
memcpy(l_sync_req_err, l_sync_request, sizeof(struct sync_request));
dap_proc_thread_worker_exec_callback(a_thread, l_sync_request->worker->id,
s_gdb_in_pkt_error_worker_callback, l_sync_req_err);
} else {
// If request was from defined node_addr we update its state
if(l_sync_request->request.node_addr.uint64) {
dap_db_set_last_id_remote(l_sync_request->request.node_addr.uint64, l_obj->id);
}
if (s_debug_more)
log_it(L_DEBUG, "Added new GLOBAL_DB history pack");
log_it(L_DEBUG, "Added new GLOBAL_DB synchronization record");
}
}
if(l_store_obj)
......@@ -721,7 +682,6 @@ static bool s_gdb_in_pkt_proc_callback(dap_proc_thread_t *a_thread, void *a_arg)
struct sync_request *dap_stream_ch_chain_create_sync_request(dap_stream_ch_chain_pkt_t *a_chain_pkt, dap_stream_ch_t* a_ch)
{
dap_stream_ch_chain_t * l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch);
memcpy(&l_ch_chain->request, a_chain_pkt->data, sizeof(l_ch_chain->request));
memcpy(&l_ch_chain->request_hdr, &a_chain_pkt->hdr, sizeof(l_ch_chain->request_hdr));
struct sync_request *l_sync_request = DAP_NEW_Z(struct sync_request);
l_sync_request->ch_uuid = a_ch->uuid;
......@@ -749,11 +709,13 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
{
dap_stream_ch_chain_t * l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch);
if (!l_ch_chain) {
log_it(L_ERROR, "No chain in channel, returning");
return;
}
dap_stream_ch_pkt_t * l_ch_pkt = (dap_stream_ch_pkt_t *) a_arg;
dap_stream_ch_chain_pkt_t * l_chain_pkt = (dap_stream_ch_chain_pkt_t *) l_ch_pkt->data;
if (!l_chain_pkt) {
log_it(L_ERROR, "No chain packet in channel packet, returning");
return;
}
if (l_ch_pkt->hdr.size< sizeof (l_chain_pkt->hdr) ){
......@@ -807,21 +769,30 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
if(s_debug_more)
log_it(L_INFO, "In: UPDATE_GLOBAL_DB_REQ pkt: net 0x%016x chain 0x%016x cell 0x%016x", l_chain_pkt->hdr.net_id.uint64 ,
l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64);
l_ch_chain->state = CHAIN_STATE_UPDATE_GLOBAL_DB;
if (l_chain_pkt_data_size == sizeof(dap_stream_ch_chain_sync_request_t))
memcpy(&l_ch_chain->request, l_chain_pkt->data, sizeof(dap_stream_ch_chain_sync_request_t));
dap_chain_node_client_t *l_client = (dap_chain_node_client_t *)l_ch_chain->callback_notify_arg;
if (l_client && l_client->resync_gdb)
l_ch_chain->request.id_start = 0;
else
l_ch_chain->request.id_start = 1; // incremental sync by default
struct sync_request *l_sync_request = dap_stream_ch_chain_create_sync_request(l_chain_pkt, a_ch);
l_ch_chain->stats_request_gdb_processed = 0;
dap_worker_exec_callback_on( a_ch->stream_worker->worker, s_sync_update_gdb_proc_callback, l_sync_request);
}break;
dap_proc_queue_add_callback_inter(a_ch->stream_worker->worker->proc_queue_input, s_sync_update_gdb_proc_callback, l_sync_request);
} break;
// Response with metadata organized in TSD
case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_TSD:{
if (s_debug_more)
log_it(L_DEBUG, "Global DB TSD packet detected");
} break;
}break;
// If requested - begin to recieve record's hashes
case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_START:{
if (s_debug_more)
log_it(L_INFO, "In: UPDATE_GLOBAL_DB_START pkt net 0x%016x chain 0x%016x cell 0x%016x", l_chain_pkt->hdr.net_id.uint64 ,
l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64);
if(l_ch_chain->state != CHAIN_STATE_IDLE){
if (l_ch_chain->state != CHAIN_STATE_IDLE){
log_it(L_WARNING, "Can't process UPDATE_GLOBAL_DB_START request because its already busy with syncronization");
s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64,
l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64,
......@@ -830,12 +801,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
}
memcpy(&l_ch_chain->request_hdr, &l_chain_pkt->hdr, sizeof(dap_stream_ch_chain_pkt_t));
l_ch_chain->state = CHAIN_STATE_UPDATE_GLOBAL_DB_REMOTE;
dap_stream_ch_chain_hash_item_t *l_hash_item = NULL, *l_tmp = NULL;
HASH_ITER(hh, l_ch_chain->remote_gdbs, l_hash_item, l_tmp) {
HASH_DEL(l_ch_chain->remote_gdbs, l_hash_item);
DAP_DELETE(l_hash_item);
}
}break;
} break;
// Response with gdb element hashes and sizes
case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB:{
if(s_debug_more)
......@@ -858,14 +824,14 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
memcpy(&l_hash_item->hash, &l_element->hash, sizeof (l_element->hash));
l_hash_item->size = l_element->size;
HASH_ADD(hh, l_ch_chain->remote_gdbs, hash, sizeof (l_hash_item->hash), l_hash_item);
if (s_debug_more){
/*if (s_debug_more){
char l_hash_str[72]={ [0]='\0'};
dap_chain_hash_fast_to_str(&l_hash_item->hash,l_hash_str,sizeof (l_hash_str));
log_it(L_INFO,"In: Updated remote hash gdb list with %s ", l_hash_str);
}
log_it(L_DEBUG,"In: Updated remote hash gdb list with %s ", l_hash_str);
}*/
}
}
}break;
} break;
// End of response with starting of DB sync
case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB:
case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_END: {
......@@ -893,8 +859,9 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
else
log_it(L_INFO, "In: SYNC_GLOBAL_DB pkt");
}
if (l_chain_pkt_data_size == sizeof(dap_stream_ch_chain_sync_request_t))
memcpy(&l_ch_chain->request, l_chain_pkt->data, sizeof(dap_stream_ch_chain_sync_request_t));
struct sync_request *l_sync_request = dap_stream_ch_chain_create_sync_request(l_chain_pkt, a_ch);
l_ch_chain->state = CHAIN_STATE_SYNC_GLOBAL_DB;
dap_proc_queue_add_callback_inter(a_ch->stream_worker->worker->proc_queue_input, s_sync_out_gdb_proc_callback, l_sync_request);
}else{
log_it(L_WARNING, "DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB: Wrong chain packet size %zd when expected %zd", l_chain_pkt_data_size, sizeof(l_ch_chain->request));
......@@ -943,7 +910,6 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64);
if (!l_ch_chain->callback_notify_packet_in) { // we haven't node client waitng, so reply to other side
dap_stream_ch_chain_sync_request_t l_sync_gdb = {};
l_sync_gdb.id_start = dap_db_get_last_id_remote(l_sync_gdb.node_addr.uint64);
dap_chain_net_t *l_net = dap_chain_net_by_id(l_chain_pkt->hdr.net_id);
l_sync_gdb.node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net);
dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_REQ, l_chain_pkt->hdr.net_id.uint64,
......@@ -953,7 +919,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB_RVRS: {
dap_stream_ch_chain_sync_request_t l_sync_gdb = {};
l_sync_gdb.id_start = dap_db_get_last_id_remote(l_sync_gdb.node_addr.uint64);
l_sync_gdb.id_start = 1;
dap_chain_net_t *l_net = dap_chain_net_by_id(l_chain_pkt->hdr.net_id);
l_sync_gdb.node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net);
log_it(L_INFO, "In: SYNC_GLOBAL_DB_RVRS pkt: net 0x%016x chain 0x%016x cell 0x%016x, request gdb sync from %u", l_chain_pkt->hdr.net_id.uint64 ,
......@@ -969,7 +935,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
} break;
case DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB_GROUP: {
if (s_debug_more)
log_it(L_INFO, "In: SYNCED_GLOBAL_DB_GROUP pkt net 0x%016x chain 0x%016x cell 0x%016x", l_chain_pkt->hdr.net_id.uint64 ,
log_it(L_INFO, "In: FIRST_GLOBAL_DB_GROUP pkt net 0x%016x chain 0x%016x cell 0x%016x", l_chain_pkt->hdr.net_id.uint64 ,
l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64);
} break;
......@@ -999,11 +965,12 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
l_chain_pkt->hdr.net_id.uint64,l_chain_pkt->hdr.chain_id.uint64,
l_chain_pkt->hdr.cell_id.uint64, NULL, 0);
}
}break;
} break;
// Response with metadata organized in TSD
case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_TSD :{
}break;
if (s_debug_more)
log_it(L_DEBUG, "Chain TSD packet detected");
} break;
// If requested - begin to send atom hashes
case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_START:{
......@@ -1029,19 +996,14 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
}
l_ch_chain->state = CHAIN_STATE_UPDATE_CHAINS_REMOTE;
memcpy(&l_ch_chain->request_hdr, &l_chain_pkt->hdr, sizeof(dap_stream_ch_chain_pkt_hdr_t));
dap_stream_ch_chain_hash_item_t *l_hash_item = NULL, *l_tmp = NULL;
HASH_ITER(hh, l_ch_chain->remote_atoms, l_hash_item, l_tmp) {
HASH_DEL(l_ch_chain->remote_atoms, l_hash_item);
DAP_DELETE(l_hash_item);
}
if(s_debug_more)
log_it(L_INFO,"In: UPDATE_CHAINS_START pkt");
} break;
// Response with atom hashes and sizes
case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS :{
uint l_count_added=0;
uint l_count_total=0;
unsigned int l_count_added=0;
unsigned int l_count_total=0;
dap_chain_t * l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id);
if (! l_chain){
......@@ -1119,13 +1081,16 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
}
struct sync_request *l_sync_request = dap_stream_ch_chain_create_sync_request(l_chain_pkt, a_ch);
l_ch_chain->state = CHAIN_STATE_SYNC_CHAINS;
char *l_hash_from_str = dap_chain_hash_fast_to_str_new(&l_ch_chain->request.hash_from);
char *l_hash_to_str = dap_chain_hash_fast_to_str_new(&l_ch_chain->request.hash_to);
log_it(L_INFO, "In: SYNC_CHAINS pkt: net 0x%016x chain 0x%016x cell 0x%016x between %s and %s", l_ch_chain->request_hdr.net_id.uint64 ,
l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64,
l_hash_from_str? l_hash_from_str: "(null)", l_hash_to_str?l_hash_to_str:"(null)");
DAP_DELETE(l_hash_from_str);
DAP_DELETE(l_hash_to_str);
l_ch_chain->stats_request_atoms_processed = 0;
if (l_ch_pkt->hdr.type == DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS) {
char *l_hash_from_str = dap_chain_hash_fast_to_str_new(&l_ch_chain->request.hash_from);
char *l_hash_to_str = dap_chain_hash_fast_to_str_new(&l_ch_chain->request.hash_to);
log_it(L_INFO, "In: SYNC_CHAINS pkt: net 0x%016x chain 0x%016x cell 0x%016x between %s and %s", l_ch_chain->request_hdr.net_id.uint64 ,
l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64,
l_hash_from_str? l_hash_from_str: "(null)", l_hash_to_str?l_hash_to_str:"(null)");
DAP_DELETE(l_hash_from_str);
DAP_DELETE(l_hash_to_str);
}
dap_proc_queue_add_callback_inter(a_ch->stream_worker->worker->proc_queue_input, s_sync_out_chains_proc_callback, l_sync_request);
} else {
log_it(L_WARNING, "DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS: Wrong chain packet size %zd when expected %zd",
......@@ -1144,7 +1109,6 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
NODE_ADDR_FP_ARGS_S(l_ch_chain->request.node_addr),
l_chain_pkt_data_size, l_ch_chain->request_hdr.net_id.uint64 ,
l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64);
l_ch_chain->stats_request_atoms_processed = 0;
}else{
log_it(L_WARNING,"Incorrect data size %zd in packet DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_CHAIN", l_chain_pkt_data_size);
s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64,
......@@ -1205,6 +1169,9 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
"ERROR_NET_INVALID_ID");
break;
}
if (s_debug_more) {
log_it(L_INFO, "Out: UPDATE_CHAINS_REQ pkt");
}
dap_stream_ch_chain_sync_request_t l_request= {};
dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_REQ, l_chain_pkt->hdr.net_id.uint64,
l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, &l_request, sizeof(l_request));
......@@ -1241,14 +1208,13 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
l_error_str[l_chain_pkt_data_size-1]='\0'; // To be sure that nobody sends us garbage
// without trailing zero
log_it(L_WARNING,"In from remote addr %s chain id 0x%016x got error on his side: '%s'",
l_ch_chain->ch->stream->esocket->remote_addr_str?
l_ch_chain->ch->stream->esocket->remote_addr_str: "<no addr>",
l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt_data_size>1? l_error_str:"<empty>");
l_ch_chain->ch->stream->esocket->remote_addr_str ? l_ch_chain->ch->stream->esocket->remote_addr_str: "<no addr>",
l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt_data_size > 1 ? l_error_str:"<empty>");
} break;
case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_ALL: {
log_it(L_INFO, "In from "NODE_ADDR_FP_STR": SYNCED_ALL net 0x%016x chain 0x%016x cell 0x%016x",NODE_ADDR_FP_ARGS_S(l_ch_chain->node_client->remote_node_addr), l_chain_pkt->hdr.net_id.uint64 ,
l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64);
l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64);
} break;
default: {
......@@ -1285,29 +1251,17 @@ void dap_stream_ch_chain_go_idle ( dap_stream_ch_chain_t * a_ch_chain)
}
DAP_DEL_Z(a_ch_chain->request_atom_iter);
}
}
static void s_process_gdb_iter(dap_stream_ch_t *a_ch)
{
dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch);
dap_db_log_list_t *l_db_list = l_ch_chain->request_global_db_trs;
dap_store_obj_pkt_t *l_pkt = (dap_store_obj_pkt_t *)l_ch_chain->request_db_iter->data;
uint32_t l_pkt_size = sizeof(dap_store_obj_pkt_t) + l_pkt->data_size;
// TODO find current record hash and compare it with hash table
if( s_debug_more)
log_it(L_INFO, "Send one global_db record packet len=%d (rest=%d/%d items)", l_pkt_size,
dap_db_log_list_get_count_rest(l_db_list), dap_db_log_list_get_count(l_db_list));
dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB,
l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64,
l_ch_chain->request_hdr.cell_id.uint64, l_pkt, l_pkt_size);
dap_list_t *l_iter = dap_list_next(l_ch_chain->request_db_iter);
if (l_iter) {
l_ch_chain->request_db_iter = l_iter;
} else {
l_ch_chain->stats_request_gdb_processed++;
l_ch_chain->request_db_iter = dap_list_first(l_ch_chain->request_db_iter);
dap_list_free_full(l_ch_chain->request_db_iter, free);
l_ch_chain->request_db_iter = NULL;
// free log list
dap_db_log_list_delete(a_ch_chain->request_db_log);
a_ch_chain->request_db_log = NULL;
dap_stream_ch_chain_hash_item_t *l_hash_item = NULL, *l_tmp = NULL;
HASH_ITER(hh, a_ch_chain->remote_gdbs, l_hash_item, l_tmp) {
HASH_DEL(a_ch_chain->remote_gdbs, l_hash_item);
DAP_DELETE(l_hash_item);
}
HASH_ITER(hh, a_ch_chain->remote_atoms, l_hash_item, l_tmp) {
HASH_DEL(a_ch_chain->remote_atoms, l_hash_item);
DAP_DELETE(l_hash_item);
}
}
......@@ -1325,69 +1279,104 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg)
dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch);
switch (l_ch_chain->state) {
// Update list of global DB records to remote
case CHAIN_STATE_UPDATE_GLOBAL_DB: {
if (l_ch_chain->stats_request_gdb_processed == l_ch_chain->local_gdbs_count) {
dap_stream_ch_chain_sync_request_t l_sync_gdb = {};
l_sync_gdb.id_start = dap_db_get_last_id_remote(l_ch_chain->request.node_addr.uint64);
dap_chain_net_t *l_net = dap_chain_net_by_id(l_ch_chain->request_hdr.net_id);
l_sync_gdb.node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net);
dap_stream_ch_chain_pkt_write_unsafe(l_ch_chain->ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_END,
l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64,
l_ch_chain->request_hdr.cell_id.uint64, &l_sync_gdb, sizeof(dap_stream_ch_chain_sync_request_t));
if (s_debug_more )
log_it(L_INFO, "Out: DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_END");
dap_stream_ch_chain_go_idle(l_ch_chain);
} else {
uint_fast16_t l_count = l_ch_chain->local_gdbs_count - l_ch_chain->stats_request_gdb_processed;
if (l_count > s_update_pack_size)
l_count = s_update_pack_size;
dap_stream_ch_chain_update_element_t l_data[s_update_pack_size];
uint_fast16_t i;
for (i = 0; i < s_update_pack_size; i++) {
dap_db_log_list_obj_t *l_obj = dap_db_log_list_get(l_ch_chain->request_db_log);
if (!l_obj)
break;
memcpy(&l_data[i].hash, &l_obj->hash, sizeof(dap_chain_hash_fast_t));
l_data[i].size = l_obj->pkt->data_size;
}
if (i) {
dap_stream_ch_chain_pkt_write_unsafe(l_ch_chain->ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB,
l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64,
l_ch_chain->request_hdr.cell_id.uint64,
&l_ch_chain->local_gdbs[l_ch_chain->stats_request_gdb_processed],
l_count * sizeof(dap_stream_ch_chain_update_element_t));
l_ch_chain->stats_request_gdb_processed += l_count;
l_data, i * sizeof(dap_stream_ch_chain_update_element_t));
l_ch_chain->stats_request_gdb_processed += i;
if (s_debug_more)
log_it(L_INFO, "Out: DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB");
} else {
l_ch_chain->request.node_addr.uint64 = dap_chain_net_get_cur_addr_int(dap_chain_net_by_id(
l_ch_chain->request_hdr.net_id));
dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_END,
l_ch_chain->request_hdr.net_id.uint64,
l_ch_chain->request_hdr.chain_id.uint64,
l_ch_chain->request_hdr.cell_id.uint64,
&l_ch_chain->request, sizeof(dap_stream_ch_chain_sync_request_t));
if (s_debug_more )
log_it(L_INFO, "Out: DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_END");
dap_stream_ch_chain_go_idle(l_ch_chain);
}
} break;
// Synchronize GDB
case CHAIN_STATE_SYNC_GLOBAL_DB: {
if (l_ch_chain->request_db_iter) {
s_process_gdb_iter(a_ch);
} else {
dap_global_db_obj_t *l_obj;
do { // Get log diff
size_t l_item_size_out = 0;
l_obj = dap_db_log_list_get(l_ch_chain->request_global_db_trs);
l_ch_chain->request_db_iter = dap_db_log_pack(l_obj, &l_item_size_out);
if (l_ch_chain->request_db_iter && l_item_size_out) {
break;
}
// Item not found, maybe it has deleted? Then go to the next item
} while (l_obj);
if (l_ch_chain->request_db_iter) {
s_process_gdb_iter(a_ch);
// Get global DB record
dap_store_obj_pkt_t *l_pkt = NULL;
dap_db_log_list_obj_t *l_obj = NULL;
size_t l_pkt_size = 0;
for (uint_fast16_t l_skip_count = 0; l_skip_count < s_skip_in_reactor_count; ) {
l_obj = dap_db_log_list_get(l_ch_chain->request_db_log);
if (!l_obj)
break;
dap_stream_ch_chain_hash_item_t *l_hash_item = NULL;
HASH_FIND(hh, l_ch_chain->remote_gdbs, &l_obj->hash, sizeof(dap_hash_fast_t), l_hash_item);
if (l_hash_item) { // If found - skip it
/*if (s_debug_more) {
char l_request_atom_hash_str[DAP_CHAIN_HASH_FAST_STR_SIZE];
dap_chain_hash_fast_to_str(&l_obj->hash, l_request_atom_hash_str, DAP_CHAIN_HASH_FAST_STR_SIZE);
log_it(L_DEBUG, "Out CHAIN: skip GDB hash %s because its already present in remote GDB hash table",
l_request_atom_hash_str);
}*/
l_skip_count++;
} else {
// free log list
dap_db_log_list_delete(l_ch_chain->request_global_db_trs);
l_ch_chain->request_global_db_trs = NULL;
log_it( L_INFO,"Syncronized database: last id %"DAP_UINT64_FORMAT_U", items syncronyzed %"DAP_UINT64_FORMAT_U" ", dap_db_log_get_last_id(),
l_ch_chain->stats_request_gdb_processed );
// last message
dap_stream_ch_chain_sync_request_t l_request = {};
dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB,
l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64,
l_ch_chain->request_hdr.cell_id.uint64, &l_request, sizeof(l_request));
dap_stream_ch_chain_go_idle(l_ch_chain);
if (l_ch_chain->callback_notify_packet_out)
l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB,
NULL, 0, l_ch_chain->callback_notify_arg);
if (l_ch_chain->request.node_addr.uint64) {
dap_db_set_last_id_remote(l_ch_chain->request.node_addr.uint64,
dap_store_packet_get_id(l_obj->pkt),
dap_store_packet_get_group(l_obj->pkt));
}
l_hash_item = DAP_NEW(dap_stream_ch_chain_hash_item_t);
memcpy(&l_hash_item->hash, &l_obj->hash, sizeof(dap_chain_hash_fast_t));
l_hash_item->size = l_obj->pkt->data_size;
HASH_ADD(hh, l_ch_chain->remote_gdbs, hash, sizeof(dap_chain_hash_fast_t), l_hash_item);
l_pkt = dap_store_packet_multiple(l_pkt, l_obj->pkt);
l_ch_chain->stats_request_gdb_processed++;
l_pkt_size = sizeof(dap_store_obj_pkt_t) + l_pkt->data_size;
if (l_pkt_size >= DAP_CHAIN_PKT_EXPECT_SIZE)
break;
}
}
if (l_pkt_size) {
// If request was from defined node_addr we update its state
if( s_debug_more)
log_it(L_INFO, "Send one global_db packet len=%d (rest=%d/%d items)", l_pkt_size,
dap_db_log_list_get_count_rest(l_ch_chain->request_db_log),
dap_db_log_list_get_count(l_ch_chain->request_db_log));
dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB,
l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64,
l_ch_chain->request_hdr.cell_id.uint64, l_pkt, l_pkt_size);
DAP_DELETE(l_pkt);
} else if (l_obj) {
// Sending dumb packet with nothing to inform remote thats we're just skiping GDBs, nothing freezed
dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_TSD,
l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64,
l_ch_chain->request_hdr.cell_id.uint64, NULL, 0);
} else {
log_it( L_INFO,"Syncronized database: items syncronyzed %"DAP_UINT64_FORMAT_u" from %"DAP_UINT64_FORMAT_u"",
l_ch_chain->stats_request_gdb_processed, dap_db_log_list_get_count(l_ch_chain->request_db_log));
// last message
dap_stream_ch_chain_sync_request_t l_request = {};
dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB,
l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64,
l_ch_chain->request_hdr.cell_id.uint64, &l_request, sizeof(l_request));
dap_stream_ch_chain_go_idle(l_ch_chain);
if (l_ch_chain->callback_notify_packet_out)
l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB,
NULL, 0, l_ch_chain->callback_notify_arg);
}
} break;
// Update list of atoms to remote
......@@ -1478,6 +1467,8 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg)
}
// Then get next atom and populate new last
l_ch_chain->request_atom_iter->chain->callback_atom_iter_get_next(l_ch_chain->request_atom_iter, NULL);
if (l_was_sent_smth)
break;
}
if(!l_ch_chain->request_atom_iter || !l_ch_chain->request_atom_iter->cur) { // All chains synced
dap_stream_ch_chain_sync_request_t l_request = {0};
......
......@@ -65,31 +65,23 @@ typedef struct dap_stream_ch_chain {
uint64_t stats_request_atoms_processed;
uint64_t stats_request_gdb_processed;
dap_stream_ch_chain_update_element_t *local_gdbs;
uint64_t local_gdbs_count;
dap_stream_ch_chain_hash_item_t * remote_atoms; // Remote atoms
dap_stream_ch_chain_hash_item_t * remote_gdbs; // Remote gdbs
// request section
dap_chain_atom_iter_t *request_atom_iter;
dap_db_log_list_t *request_global_db_trs; // list of global db records
dap_db_log_list_t *request_db_log; // list of global db records
dap_stream_ch_chain_sync_request_t request;
dap_stream_ch_chain_pkt_hdr_t request_hdr;
dap_list_t *request_db_iter;
bool request_updates_complete;
bool is_on_request; // Protects request section
bool is_on_reverse_request;
dap_stream_ch_chain_callback_packet_t callback_notify_packet_out;
dap_stream_ch_chain_callback_packet_t callback_notify_packet_in;
void *callback_notify_arg;
} dap_stream_ch_chain_t;
#define DAP_STREAM_CH_CHAIN(a) ((dap_stream_ch_chain_t *) ((a)->internal) )
#define DAP_CHAIN_PKT_EXPECT_SIZE 7168
int dap_stream_ch_chain_init(void);
void dap_stream_ch_chain_deinit(void);
......
......@@ -152,14 +152,10 @@ dap_chain_tx_token_t* dap_chain_datum_tx_item_token_create(dap_chain_hash_fast_t
{
if(!a_ticker)
return NULL;
size_t a_ticker_len = strlen(a_ticker);
dap_chain_tx_token_t *l_item = DAP_NEW_Z(dap_chain_tx_token_t);
l_item->header.type = TX_ITEM_TYPE_TOKEN;
memcpy (& l_item->header.token_emission_hash, a_datum_token_hash, sizeof ( *a_datum_token_hash ) );
if(a_ticker_len >= sizeof(l_item->header.ticker))
a_ticker_len = sizeof(l_item->header.ticker) - 1;
strncpy(l_item->header.ticker, a_ticker, a_ticker_len);
strncpy(l_item->header.ticker, a_ticker, sizeof(l_item->header.ticker) - 1);
return l_item;
}
......
......@@ -128,7 +128,7 @@ int dap_chain_gdb_init(void)
* @param a_value
* @param a_value_len
*/
static void s_history_callback_notify(void * a_arg, const char a_op_code, const char * a_prefix, const char * a_group,
static void s_history_callback_notify(void * a_arg, const char a_op_code, const char * a_group,
const char * a_key, const void * a_value, const size_t a_value_size)
{
if (a_arg){
......@@ -136,8 +136,8 @@ static void s_history_callback_notify(void * a_arg, const char a_op_code, const
dap_chain_net_t *l_net = dap_chain_net_by_id( l_gdb->chain->net_id);
log_it(L_DEBUG,"%s.%s: op_code='%c' group=\"%s\" key=\"%s\" value_size=%u",l_net->pub.name,
l_gdb->chain->name, a_op_code, a_group, a_key, a_value_size);
dap_chain_node_mempool_autoproc_notify((void *)l_net, a_op_code, a_prefix, a_group, a_key, a_value, a_value_size);
dap_chain_net_sync_gdb_broadcast((void *)l_net, a_op_code, a_prefix, a_group, a_key, a_value, a_value_size);
dap_chain_node_mempool_autoproc_notify((void *)l_net, a_op_code, a_group, a_key, a_value, a_value_size);
dap_chain_net_sync_gdb_broadcast((void *)l_net, a_op_code, a_group, a_key, a_value, a_value_size);
}
}
......@@ -174,9 +174,7 @@ int dap_chain_gdb_new(dap_chain_t * a_chain, dap_config_t * a_chain_cfg)
}
// Add group prefix that will be tracking all changes
dap_chain_global_db_add_history_group_prefix("chain-gdb", GROUP_LOCAL_HISTORY);
dap_chain_global_db_add_history_callback_notify("chain-gdb", s_history_callback_notify,l_gdb);
dap_chain_global_db_add_sync_group("chain-gdb", s_history_callback_notify, l_gdb);
// load ledger
l_gdb_priv->is_load_mode = true;
......
......@@ -62,135 +62,68 @@ static inline void unlock()
}
// Callback table item
typedef struct history_group_item
typedef struct sync_group_item
{
char prefix[32];
uint8_t padding[7];
bool auto_track; // Track history actions automaticly
dap_global_db_obj_callback_notify_t callback_notify;
void * callback_arg;
char *group_name_for_history;
UT_hash_handle hh;
} history_group_item_t;
// Callback table item
typedef struct history_extra_group_item
{
char *group_name;
char *group_mask;
char *group_name_for_history;
dap_global_db_obj_callback_notify_t callback_notify;
void * callback_arg;
UT_hash_handle hh;
} history_extra_group_item_t;
} sync_group_item_t;
// Tacked group callbacks
static history_group_item_t * s_history_group_items = NULL;
static history_extra_group_item_t * s_history_extra_group_items = NULL;
char * extract_group_prefix(const char * a_group);
static sync_group_item_t *s_sync_group_items = NULL;
static sync_group_item_t *s_sync_group_extra_items = NULL;
static bool s_track_history = false;
/**
* @brief extract_group_prefix
* @param a_group
* @return
*/
char * extract_group_prefix(const char* a_group)
{
char * l_group_prefix = NULL, *l_delimeter;
size_t l_group_prefix_size;
// l_delimeter = index(a_group, '.');
l_delimeter = strchr(a_group, '.');
if(l_delimeter == NULL) {
l_group_prefix = dap_strdup(a_group);
l_group_prefix_size = dap_strlen(l_group_prefix) + 1;
} else {
l_group_prefix_size = (size_t) l_delimeter - (size_t) a_group;
if(l_group_prefix_size > 1)
l_group_prefix = strndup(a_group, l_group_prefix_size);
}
return l_group_prefix;
}
/*
* Get history group by group name
* @brief dap_chain_global_db_add_sync_group
* @details Add group name for synchronization
* @param a_group_prefix
*/
char* dap_chain_global_db_get_history_group_by_group_name(const char * a_group_name)
void dap_chain_global_db_add_sync_group(const char *a_group_prefix, dap_global_db_obj_callback_notify_t a_callback, void *a_arg)
{
if(!s_history_extra_group_items || !a_group_name)
return NULL;
history_extra_group_item_t * l_history_extra_group_item = NULL;
HASH_FIND_STR(s_history_extra_group_items, a_group_name, l_history_extra_group_item);
if(l_history_extra_group_item) {
return dap_strdup(l_history_extra_group_item->group_name_for_history);
}else
return NULL;
sync_group_item_t * l_item = DAP_NEW_Z(sync_group_item_t);
l_item->group_mask = dap_strdup_printf("%s.*", a_group_prefix);
l_item->group_name_for_history = dap_strdup(GROUP_LOCAL_HISTORY);
l_item->callback_notify = a_callback;
l_item->callback_arg = a_arg;
HASH_ADD_STR(s_sync_group_items, group_mask, l_item);
}
/**
* @brief dap_chain_global_db_add_history_group_prefix
* @details Add group prefix that will be tracking all changes
* @brief dap_chain_global_db_add_sync_extra_group
* @details Add group name for synchronization with especially node addresses
* @param a_group_prefix
* @param a_group_name_for_history
*/
void dap_chain_global_db_add_history_group_prefix(const char * a_group_prefix, const char * a_group_name_for_history)
void dap_chain_global_db_add_sync_extra_group(const char *a_group_mask, dap_global_db_obj_callback_notify_t a_callback, void *a_arg)
{
history_group_item_t * l_item = DAP_NEW_Z(history_group_item_t);
snprintf(l_item->prefix, sizeof(l_item->prefix), "%s", a_group_prefix);
l_item->group_name_for_history = dap_strdup(a_group_name_for_history);//GROUP_LOCAL_HISTORY
l_item->auto_track = true;
HASH_ADD_STR(s_history_group_items, prefix, l_item);
sync_group_item_t* l_item = DAP_NEW_Z(sync_group_item_t);
l_item->group_mask = dap_strdup(a_group_mask);
l_item->group_name_for_history = dap_strdup(GROUP_LOCAL_HISTORY".extra");
l_item->callback_notify = a_callback;
l_item->callback_arg = a_arg;
HASH_ADD_STR(s_sync_group_extra_items, group_mask, l_item);
}
/**
* @brief dap_chain_global_db_add_history_callback_notify
* @param a_group_prefix
* @param a_callback
*/
void dap_chain_global_db_add_history_callback_notify(const char * a_group_prefix,
dap_global_db_obj_callback_notify_t a_callback, void * a_arg)
dap_list_t *dap_chain_db_get_sync_groups_internal(sync_group_item_t *a_table)
{
history_group_item_t * l_item = NULL;
HASH_FIND_STR(s_history_group_items, a_group_prefix, l_item);
if(l_item) {
l_item->callback_notify = a_callback;
l_item->callback_arg = a_arg;
} else
log_it(L_WARNING, "Can't setup notify callback for groups with prefix %s. Possible not in history track state",
a_group_prefix);
dap_list_t *l_ret = NULL;
sync_group_item_t *l_item = NULL, *l_item_tmp = NULL;
HASH_ITER(hh, a_table, l_item, l_item_tmp) {
l_ret = dap_list_append(l_ret, l_item->group_mask);
}
return l_ret;
}
/**
* @brief dap_chain_global_db_add_history_extra_group
* @details Add group prefix that will be tracking all changes
* @param a_group_prefix
*/
const char* dap_chain_global_db_add_history_extra_group(const char * a_group_name, dap_chain_node_addr_t *a_nodes, uint16_t *a_nodes_count)
dap_list_t *dap_chain_db_get_sync_groups()
{
history_extra_group_item_t* l_item = DAP_NEW_Z(history_extra_group_item_t);
l_item->group_name = dap_strdup(a_group_name);
l_item->group_name_for_history = dap_strdup_printf("local.history.%s", a_group_name);
HASH_ADD_STR(s_history_extra_group_items, group_name, l_item);
return (const char*)l_item->group_name_for_history;
return dap_chain_db_get_sync_groups_internal(s_sync_group_items);
}
/**
* @brief dap_chain_global_db_add_history_extra_group_callback_notify
* @param a_group_prefix
* @param a_callback
*/
void dap_chain_global_db_add_history_extra_group_callback_notify(const char * a_group_prefix,
dap_global_db_obj_callback_notify_t a_callback, void * a_arg)
dap_list_t *dap_chain_db_get_sync_extra_groups()
{
history_extra_group_item_t * l_item = NULL;
HASH_FIND_STR(s_history_extra_group_items, a_group_prefix, l_item);
if(l_item) {
l_item->callback_notify = a_callback;
l_item->callback_arg = a_arg;
} else
log_it(L_WARNING, "Can't setup notify callback for extra groups with prefix %s. Possible not in history track state",
a_group_prefix);
return dap_chain_db_get_sync_groups_internal(s_sync_group_extra_items);
}
/**
......@@ -220,13 +153,8 @@ void dap_chain_global_db_obj_delete(dap_global_db_obj_t *obj)
*/
void dap_chain_global_db_objs_delete(dap_global_db_obj_t *objs, size_t a_count)
{
//int i = 0;
//while(objs) {
for(size_t i = 0; i < a_count; i++) {
//if(!(objs[i]))
// break;
dap_chain_global_db_obj_clean(objs + i);
//i++;
}
DAP_DELETE(objs);
}
......@@ -239,8 +167,9 @@ void dap_chain_global_db_objs_delete(dap_global_db_obj_t *objs, size_t a_count)
int dap_chain_global_db_init(dap_config_t * g_config)
{
const char *l_storage_path = dap_config_get_item_str(g_config, "resources", "dap_global_db_path");
//const char *l_driver_name = dap_config_get_item_str_default(g_config, "resources", "dap_global_db_driver", "sqlite");
const char *l_driver_name = dap_config_get_item_str_default(g_config, "resources", "dap_global_db_driver", "cdb");
const char *l_driver_name = dap_config_get_item_str_default(g_config, "resources", "dap_global_db_driver", "sqlite");
//const char *l_driver_name = dap_config_get_item_str_default(g_config, "resources", "dap_global_db_driver", "cdb");
s_track_history = dap_config_get_item_bool_default(g_config, "resources", "dap_global_db_track_history", s_track_history);
lock();
int res = dap_db_driver_init(l_driver_name, l_storage_path);
unlock();
......@@ -260,20 +189,20 @@ void dap_chain_global_db_deinit(void)
dap_db_driver_deinit();
//dap_db_deinit();
unlock();
history_group_item_t * l_item = NULL, *l_item_tmp = NULL;
HASH_ITER(hh, s_history_group_items, l_item, l_item_tmp)
sync_group_item_t * l_item = NULL, *l_item_tmp = NULL;
HASH_ITER(hh, s_sync_group_items, l_item, l_item_tmp)
{
DAP_DELETE(l_item->group_name_for_history);
DAP_DELETE(l_item);
}
history_extra_group_item_t * l_add_item = NULL, *l_add_item_tmp = NULL;
HASH_ITER(hh, s_history_extra_group_items, l_add_item, l_add_item_tmp)
sync_group_item_t * l_add_item = NULL, *l_add_item_tmp = NULL;
HASH_ITER(hh, s_sync_group_extra_items, l_add_item, l_add_item_tmp)
{
DAP_DELETE(l_add_item->group_name);
DAP_DELETE(l_add_item->group_mask);
DAP_DELETE(l_add_item->group_name_for_history);
DAP_DELETE(l_add_item);
}
s_history_group_items = NULL;
s_sync_group_items = NULL;
}
......@@ -299,19 +228,6 @@ void* dap_chain_global_db_obj_get(const char *a_key, const char *a_group)
// read one item
dap_store_obj_t *l_store_data = dap_chain_global_db_driver_read(a_group, a_key, &l_count);
return l_store_data;
/* size_t count = 0;
if(!a_key)
return NULL;
size_t query_len = (size_t) snprintf(NULL, 0, "(&(cn=%s)(objectClass=%s))", a_key, a_group);
char *query = DAP_NEW_Z_SIZE(char, query_len + 1); //char query[32 + strlen(a_key)];
snprintf(query, query_len + 1, "(&(cn=%s)(objectClass=%s))", a_key, a_group); // objectClass != ou
lock();
dap_store_obj_t *store_data = dap_db_read_data(query, &count);
unlock();
assert(count <= 1);
DAP_DELETE(query);
return store_data;*/
}
/**
......@@ -323,18 +239,14 @@ void* dap_chain_global_db_obj_get(const char *a_key, const char *a_group)
*/
dap_store_obj_t* dap_chain_global_db_obj_gr_get(const char *a_key, size_t *a_data_len_out, const char *a_group)
{
//uint8_t *l_ret_value = NULL;
// read several items, 0 - no limits
size_t l_data_len_out = 0;
if(a_data_len_out)
l_data_len_out = *a_data_len_out;
dap_store_obj_t *l_store_data = dap_chain_global_db_driver_read(a_group, a_key, &l_data_len_out);
if(l_store_data) {
//l_ret_value = (l_store_data->value) ? DAP_NEW_SIZE(uint8_t, l_store_data->value_len) : NULL; //ret_value = (store_data->value) ? strdup(store_data->value) : NULL;
//memcpy(l_ret_value, l_store_data->value, l_store_data->value_len);
if(a_data_len_out)
*a_data_len_out = l_data_len_out;//l_store_data->value_len;
//dap_store_obj_free(l_store_data, l_data_len_out);
*a_data_len_out = l_data_len_out;
}
return l_store_data;
}
......@@ -363,28 +275,6 @@ uint8_t * dap_chain_global_db_gr_get(const char *a_key, size_t *a_data_len_out,
dap_store_obj_free(l_store_data, l_data_len_out);
}
return l_ret_value;
/*ldb
* uint8_t *l_ret_value = NULL;
size_t l_count = 0;
if(!a_key)
return NULL;
size_t l_query_len =(size_t) snprintf(NULL, 0, "(&(cn=%s)(objectClass=%s))", a_key, a_group);
char *l_query = DAP_NEW_Z_SIZE(char, l_query_len + 1); //char query[32 + strlen(a_key)];
snprintf(l_query, l_query_len + 1, "(&(cn=%s)(objectClass=%s))", a_key, a_group); // objectClass != ou
lock();
pdap_store_obj_t store_data = dap_db_read_data(l_query, &l_count);
unlock();
if(l_count == 1 && store_data && !strcmp(store_data->key, a_key)) {
l_ret_value = (store_data->value) ? DAP_NEW_SIZE(uint8_t, store_data->value_len) : NULL; //ret_value = (store_data->value) ? strdup(store_data->value) : NULL;
memcpy(l_ret_value, store_data->value, store_data->value_len);
if(a_data_out)
*a_data_out = store_data->value_len;
}
dap_store_obj_free(store_data, l_count);
DAP_DELETE(l_query);
return l_ret_value;*/
}
uint8_t * dap_chain_global_db_get(const char *a_key, size_t *a_data_out)
......@@ -398,18 +288,20 @@ uint8_t * dap_chain_global_db_get(const char *a_key, size_t *a_data_out)
*/
static bool global_db_gr_del_add(char *a_key,const char *a_group, time_t a_timestamp)
{
dap_store_obj_t store_data;// = DAP_NEW_Z_SIZE(dap_store_obj_t, sizeof(struct dap_store_obj));
dap_store_obj_t store_data;
memset(&store_data, 0, sizeof(dap_store_obj_t));
store_data.type = 'a';
store_data.key = a_key;//dap_strdup(a_key);
store_data.key = a_key;
// no data
store_data.value = NULL;
store_data.value_len = 0;
// group = parent group + '.del'
store_data.group = dap_strdup_printf("%s.del", a_group);
store_data.timestamp = a_timestamp;//time(NULL);
store_data.timestamp = a_timestamp;
lock();
int l_res = dap_chain_global_db_driver_add(&store_data, 1);
int l_res = 0;
if (!dap_chain_global_db_driver_is(store_data.group, store_data.key))
l_res = dap_chain_global_db_driver_add(&store_data, 1);
unlock();
DAP_DELETE(store_data.group);
if(l_res>=0)
......@@ -420,11 +312,11 @@ static bool global_db_gr_del_add(char *a_key,const char *a_group, time_t a_times
/**
* Delete info about the deleted entry from the base
*/
static bool global_db_gr_del_del(char *a_key,const char *a_group)
static bool global_db_gr_del_del(char *a_key, const char *a_group)
{
if(!a_key)
return NULL;
dap_store_obj_t store_data;// = DAP_NEW_Z_SIZE(dap_store_obj_t, sizeof(struct dap_store_obj));
dap_store_obj_t store_data;
memset(&store_data, 0, sizeof(dap_store_obj_t));
store_data.key = a_key;
// store_data->c_key = a_key;
......@@ -456,7 +348,7 @@ time_t global_db_gr_del_get_timestamp(const char *a_group, char *a_key)
store_data.group = dap_strdup_printf("%s.del", a_group);
//store_data->c_group = a_group;
lock();
if(dap_chain_global_db_driver_is(store_data.group, store_data.key)) {
if (dap_chain_global_db_driver_is(store_data.group, store_data.key)) {
size_t l_count_out = 0;
dap_store_obj_t *l_obj = dap_chain_global_db_driver_read(store_data.group, store_data.key, &l_count_out);
assert(l_count_out <= 1);
......@@ -468,155 +360,7 @@ time_t global_db_gr_del_get_timestamp(const char *a_group, char *a_key)
return l_timestamp;
}
/**
*
*/
/**
* @brief dap_chain_global_db_gr_set
* @param a_key
* @param a_value
* @param a_value_len
* @param a_group
* @details Set one entry to base. IMPORTANT: a_key and a_value should be passed without free after (it will be released by gdb itself)
* @return
*/
bool dap_chain_global_db_gr_set(char *a_key, void *a_value, size_t a_value_len, const char *a_group)
{
dap_store_obj_t store_data;// = DAP_NEW_Z_SIZE(dap_store_obj_t, sizeof(struct dap_store_obj));
memset(&store_data, 0, sizeof(dap_store_obj_t));
store_data.type = 'a';
store_data.key = a_key;//dap_strdup(a_key);
store_data.value = a_value;//DAP_NEW_Z_SIZE(uint8_t, a_value_len);
//memcpy(store_data.value, a_value, a_value_len);
store_data.value_len = (a_value_len == (size_t) -1) ? dap_strlen((const char*) a_value) : a_value_len;
store_data.group = (char*)a_group;//dap_strdup(a_group);
store_data.timestamp = time(NULL);
lock();
int l_res = dap_chain_global_db_driver_add(&store_data, 1);
unlock();
// Extract prefix if added successfuly, add history log and call notify callback if present
if(!l_res) {
// Delete info about the deleted entry from the base if one present
global_db_gr_del_del(a_key, a_group);
char * l_group_prefix = extract_group_prefix(a_group);
history_group_item_t * l_history_group_item = NULL;
if(l_group_prefix)
HASH_FIND_STR(s_history_group_items, l_group_prefix, l_history_group_item);
if(l_history_group_item) {
if(l_history_group_item->auto_track) {
lock();
dap_db_history_add('a', &store_data, 1, l_history_group_item->group_name_for_history);
unlock();
}
if(l_history_group_item->callback_notify)
l_history_group_item->callback_notify(l_history_group_item->callback_arg, 'a', l_group_prefix, a_group,
a_key, a_value, a_value_len);
}
// looking for extra group
else {
history_extra_group_item_t * l_history_extra_group_item = NULL;
HASH_FIND_STR(s_history_extra_group_items, a_group, l_history_extra_group_item);
if(l_history_extra_group_item) {
lock();
dap_db_history_add('a', &store_data, 1, l_history_extra_group_item->group_name_for_history);
unlock();
if(l_history_extra_group_item->callback_notify)
l_history_extra_group_item->callback_notify(l_history_extra_group_item->callback_arg, 'a',
l_group_prefix,
a_group,
a_key, a_value, a_value_len);
}
}
if(l_group_prefix)
DAP_DELETE(l_group_prefix);
} else {
log_it(L_ERROR, "Save error: %d", l_res);
}
//DAP_DELETE(store_data);
return !l_res;
}
bool dap_chain_global_db_set( char *a_key, void *a_value, size_t a_value_len)
{
return dap_chain_global_db_gr_set(a_key, a_value, a_value_len, GROUP_LOCAL_GENERAL);
}
/**
* Delete entry from base
*/
bool dap_chain_global_db_gr_del(char *a_key,const char *a_group)
{
if(!a_key)
return NULL;
dap_store_obj_t store_data;// = DAP_NEW_Z_SIZE(dap_store_obj_t, sizeof(struct dap_store_obj));
memset(&store_data, 0, sizeof(dap_store_obj_t));
store_data.key = a_key;
store_data.group = (char*)a_group;
lock();
int l_res = dap_chain_global_db_driver_delete(&store_data, 1);
unlock();
// do not add to history if l_res=1 (already deleted)
if(!l_res) {
// added to Del group
global_db_gr_del_add(a_key, a_group, time(NULL));
// Extract prefix
char * l_group_prefix = extract_group_prefix(a_group);
history_group_item_t * l_history_group_item = NULL;
if(l_group_prefix)
HASH_FIND_STR(s_history_group_items, l_group_prefix, l_history_group_item);
if(l_history_group_item) {
if(l_history_group_item->auto_track) {
lock();
dap_db_history_add('d', &store_data, 1, l_history_group_item->group_name_for_history);
unlock();
}
if(l_history_group_item->callback_notify)
l_history_group_item->callback_notify(l_history_group_item->callback_arg, 'd', l_group_prefix, a_group,
a_key, NULL, 0);
}
// looking for extra group
else {
history_extra_group_item_t * l_history_extra_group_item = NULL;
HASH_FIND_STR(s_history_extra_group_items, a_group, l_history_extra_group_item);
if(l_history_extra_group_item) {
lock();
dap_db_history_add('d', &store_data, 1, l_history_extra_group_item->group_name_for_history);
unlock();
if(l_history_extra_group_item->callback_notify)
l_history_extra_group_item->callback_notify(l_history_extra_group_item->callback_arg, 'd',
l_group_prefix, a_group, a_key, NULL, 0);
}
}
if(l_group_prefix)
DAP_DELETE(l_group_prefix);
}
//DAP_DELETE(store_data);
if(l_res>=0){
// added to Del group
global_db_gr_del_add(a_key, a_group, time(NULL));
/*/ read del info
char *l_group = dap_strdup_printf("%s.del", a_group);
size_t l_data_size_out = 0;
dap_store_obj_t *l_objs = dap_chain_global_db_obj_gr_get(a_key, &l_data_size_out,l_group);
// update timestamp
if(l_objs){
if(l_objs->timestamp<time(NULL))
dap_store_obj_free(l_objs, l_data_size_out);
}
DAP_DELETE(l_group);*/
return true;
}
return false;
}
bool dap_chain_global_db_del(char *a_key)
{
return dap_chain_global_db_gr_del(a_key, GROUP_LOCAL_GENERAL);
......@@ -687,6 +431,120 @@ dap_global_db_obj_t* dap_chain_global_db_load(size_t *a_data_size_out)
{
return dap_chain_global_db_gr_load(GROUP_LOCAL_GENERAL, a_data_size_out);
}
/**
* @brief extract_group_mask
* @param a_group
* @return
*/
static sync_group_item_t *find_item_by_mask(sync_group_item_t *a_items, const char *a_group)
{
sync_group_item_t * l_item = NULL, *l_item_tmp = NULL;
HASH_ITER(hh, a_items, l_item, l_item_tmp) {
if (!dap_fnmatch(l_item->group_mask, a_group, 0))
return l_item;
}
return NULL;
}
void dap_global_db_obj_track_history(void* a_store_data)
{
if (!s_track_history)
return;
dap_store_obj_t *l_obj = (dap_store_obj_t *)a_store_data;
sync_group_item_t *l_sync_group_item = find_item_by_mask(s_sync_group_items, l_obj->group);
if(l_sync_group_item) {
lock();
dap_db_history_add((char)l_obj->type, l_obj, 1, l_sync_group_item->group_name_for_history);
unlock();
if(l_sync_group_item->callback_notify) {
if(l_obj) {
l_sync_group_item->callback_notify(l_sync_group_item->callback_arg,
(const char)l_obj->type,
l_obj->group, l_obj->key,
l_obj->value, l_obj->value_len);
}
}
} else { // looking for extra group
sync_group_item_t *l_sync_extra_group_item = find_item_by_mask(s_sync_group_extra_items, l_obj->group);
if(l_sync_extra_group_item) {
lock();
dap_db_history_add((char)l_obj->type, l_obj, 1, l_sync_extra_group_item->group_name_for_history);
unlock();
if(l_sync_extra_group_item->callback_notify)
l_sync_extra_group_item->callback_notify(l_sync_extra_group_item->callback_arg,
(const char)l_obj->type, l_obj->group, l_obj->key,
l_obj->value, l_obj->value_len);
}
}
}
/**
* @brief dap_chain_global_db_gr_set
* @param a_key
* @param a_value
* @param a_value_len
* @param a_group
* @details Set one entry to base. IMPORTANT: a_key and a_value should be passed without free after (it will be released by gdb itself)
* @return
*/
bool dap_chain_global_db_gr_set(char *a_key, void *a_value, size_t a_value_len, const char *a_group)
{
dap_store_obj_t store_data;
memset(&store_data, 0, sizeof(dap_store_obj_t));
store_data.key = a_key;
store_data.value = a_value;
store_data.value_len = (a_value_len == (size_t) -1) ? dap_strlen((const char*) a_value) : a_value_len;
store_data.group = (char*)a_group;
store_data.timestamp = time(NULL);
lock();
int l_res = dap_chain_global_db_driver_add(&store_data, 1);
unlock();
// Extract prefix if added successfuly, add history log and call notify callback if present
if(!l_res) {
// delete info about the deleted entry from the base if one present
global_db_gr_del_del(store_data.key, store_data.group);
dap_global_db_obj_track_history(&store_data);
} else {
log_it(L_ERROR, "Save error: %d", l_res);
}
return !l_res;
}
bool dap_chain_global_db_set( char *a_key, void *a_value, size_t a_value_len)
{
return dap_chain_global_db_gr_set(a_key, a_value, a_value_len, GROUP_LOCAL_GENERAL);
}
/**
* Delete entry from base
*/
bool dap_chain_global_db_gr_del(char *a_key,const char *a_group)
{
if(!a_key)
return NULL;
dap_store_obj_t store_data;
memset(&store_data, 0, sizeof(dap_store_obj_t));
store_data.key = a_key;
store_data.group = (char*)a_group;
lock();
int l_res = dap_chain_global_db_driver_delete(&store_data, 1);
unlock();
if(l_res >= 0) {
// add to Del group
global_db_gr_del_add(store_data.key, store_data.group, store_data.timestamp);
}
// do not add to history if l_res=1 (already deleted)
if (!l_res) {
dap_global_db_obj_track_history(&store_data);
}
return !l_res;
}
/**
* Write to the database from an array of data_size bytes
*
......@@ -702,67 +560,19 @@ bool dap_chain_global_db_obj_save(void* a_store_data, size_t a_objs_count)
int l_res = dap_chain_global_db_driver_appy(a_store_data, a_objs_count);
unlock();
// Extract prefix if added successfuly, add history log and call notify callback if present
if(!l_res) {
for(size_t i = 0; i < a_objs_count; i++) {
dap_store_obj_t *a_store_obj = a_store_data + i;
if(a_store_obj->type == 'a')
// delete info about the deleted entry from the base if one present
global_db_gr_del_del(a_store_obj->key, a_store_obj->group);
else if(a_store_obj->type == 'd')
// add to Del group
global_db_gr_del_add(a_store_obj->key, a_store_obj->group, a_store_obj->timestamp);
history_group_item_t * l_history_group_item = NULL;
dap_store_obj_t* l_obj = (dap_store_obj_t*)a_store_data + i;
char * l_group_prefix = extract_group_prefix(l_obj->group);
if(l_group_prefix)
HASH_FIND_STR(s_history_group_items, l_group_prefix, l_history_group_item);
if(l_history_group_item) {
if(l_history_group_item->auto_track) {
lock();
dap_db_history_add((char)l_obj->type, l_obj, 1, l_history_group_item->group_name_for_history);
unlock();
}
if(l_history_group_item->callback_notify) {
if(l_obj) {
l_history_group_item->callback_notify(l_history_group_item->callback_arg,
(const char)l_obj->type,
l_group_prefix, l_obj->group, l_obj->key,
l_obj->value, l_obj->value_len);
} else {
break;
}
}
}
// looking for extra group
else {
history_extra_group_item_t * l_history_extra_group_item = NULL;
HASH_FIND_STR(s_history_extra_group_items, l_obj->group, l_history_extra_group_item);
if(l_history_extra_group_item) {
lock();
dap_db_history_add((char)l_obj->type, l_obj, 1, l_history_extra_group_item->group_name_for_history);
unlock();
if(l_history_extra_group_item->callback_notify)
l_history_extra_group_item->callback_notify(l_history_extra_group_item->callback_arg,
(const char)l_obj->type,
l_group_prefix, l_obj->group, l_obj->key,
l_obj->value, l_obj->value_len);
}
}
DAP_DELETE(l_group_prefix);
}
}
if(l_res >= 0) {
return true;
for(size_t i = 0; i < a_objs_count; i++) {
dap_store_obj_t *a_store_obj = (dap_store_obj_t *)a_store_data + i;
if (a_store_obj->type == 'a' && !l_res)
// delete info about the deleted entry from the base if one present
global_db_gr_del_del(a_store_obj->key, a_store_obj->group);
else if (a_store_obj->type == 'd' && l_res >= 0)
// add to Del group
global_db_gr_del_add(a_store_obj->key, a_store_obj->group, a_store_obj->timestamp);
if (!l_res)
// Extract prefix if added successfuly, add history log and call notify callback if present
dap_global_db_obj_track_history(a_store_obj);
}
return false;
return !l_res;
}
bool dap_chain_global_db_gr_save(dap_global_db_obj_t* a_objs, size_t a_objs_count, const char *a_group)
......@@ -783,39 +593,13 @@ bool dap_chain_global_db_gr_save(dap_global_db_obj_t* a_objs, size_t a_objs_coun
//log_it(L_DEBUG,"Added %u objects", a_objs_count);
int l_res = dap_chain_global_db_driver_add(l_store_data, a_objs_count);
unlock();
if(!l_res) {
if(!l_res) {
for(size_t i = 0; i < a_objs_count; i++) {
history_group_item_t * l_history_group_item = NULL;
dap_store_obj_t *l_obj = l_store_data + i;
char * l_group_prefix = extract_group_prefix(l_obj->group);
if(l_group_prefix)
HASH_FIND_STR(s_history_group_items, l_group_prefix, l_history_group_item);
if(l_history_group_item) {
if(l_history_group_item->auto_track) {
lock();
dap_db_history_add('a', l_store_data, 1, l_history_group_item->group_name_for_history);
unlock();
}
if(l_history_group_item->callback_notify) {
if(l_obj) {
l_history_group_item->callback_notify(l_history_group_item->callback_arg, 'a',
l_group_prefix, l_obj->group, l_obj->key,
l_obj->value, l_obj->value_len);
} else {
break;
}
}
}
DAP_DELETE(l_group_prefix);
dap_global_db_obj_track_history(l_store_data + i);
}
}
DAP_DELETE(l_store_data); //dap_store_obj_free(store_data, a_objs_count);
if(!l_res) {
return true;
}
DAP_DELETE(l_store_data);
return !l_res;
}
return false;
}
......@@ -834,66 +618,3 @@ char* dap_chain_global_db_hash(const uint8_t *data, size_t data_size)
{
return dap_chain_global_db_driver_hash(data, data_size);
}
/**
* Parse data from dap_db_log_pack()
*
* return dap_store_obj_t*
*/
void* dap_db_log_unpack(const void *a_data, size_t a_data_size, size_t *a_store_obj_count)
{
const dap_store_obj_pkt_t *l_pkt = (const dap_store_obj_pkt_t*) a_data;
if (! l_pkt || ! a_data_size)
return NULL;
if( (l_pkt->data_size+ sizeof(dap_store_obj_pkt_t)) != ((size_t) a_data_size ))
return NULL;
size_t l_store_obj_count = 0;
dap_store_obj_t *l_obj = dap_store_unpacket_multiple(l_pkt, &l_store_obj_count);
if(a_store_obj_count)
*a_store_obj_count = l_store_obj_count;
return l_obj;
}
/**
* Get timestamp from dap_db_log_pack()
*/
time_t dap_db_log_unpack_get_timestamp(uint8_t *a_data, size_t a_data_size)
{
dap_store_obj_pkt_t *l_pkt = (dap_store_obj_pkt_t*) a_data;
if(!l_pkt || l_pkt->data_size != (a_data_size - sizeof(dap_store_obj_pkt_t)))
return 0;
return l_pkt->timestamp;
}
/**
* Get log diff as string
*/
char* dap_db_log_get_diff(size_t *a_data_size_out)
{
//DapList *l_group_list = dap_list_append(l_group_list,GROUP_HISTORY);
size_t l_data_size_out = 0;
dap_global_db_obj_t *l_objs = dap_chain_global_db_gr_load(GROUP_LOCAL_HISTORY, &l_data_size_out);
// make keys & val vector
char **l_keys_vals0 = DAP_NEW_SIZE(char*, sizeof(char*) * (l_data_size_out * 2 + 2));
char **l_keys_vals = l_keys_vals0 + 1;
size_t i;
// first element - number of records
l_keys_vals0[0] = dap_strdup_printf("%d", l_data_size_out);
for(i = 0; i < l_data_size_out; i++) {
dap_global_db_obj_t *l_obj_cur = l_objs + i;
l_keys_vals[i] = l_obj_cur->key;
l_keys_vals[i + l_data_size_out] = (char*) l_obj_cur->value;
}
if(a_data_size_out)
*a_data_size_out = l_data_size_out;
// last element - NULL (marker)
l_keys_vals[l_data_size_out * 2] = NULL;
char *l_keys_vals_flat = dap_strjoinv(GLOBAL_DB_HIST_KEY_SEPARATOR, l_keys_vals0);
DAP_DELETE(l_keys_vals0[0]);
DAP_DELETE(l_keys_vals0);
//dap_strfreev(l_keys_vals0);
dap_chain_global_db_objs_delete(l_objs, l_data_size_out);
return l_keys_vals_flat;
}
......@@ -198,148 +198,6 @@ void dap_store_obj_free(dap_store_obj_t *a_store_obj, size_t a_store_count)
DAP_DELETE(a_store_obj);
}
static size_t dap_db_get_size_pdap_store_obj_t(pdap_store_obj_t store_obj)
{
size_t size = sizeof(uint32_t) + 2 * sizeof(uint16_t) + sizeof(size_t) + sizeof(time_t)
+ sizeof(uint64_t) + dap_strlen(store_obj->group) +
dap_strlen(store_obj->key) + store_obj->value_len;
return size;
}
/**
* serialization
* @param a_store_obj_count count of structures store_obj
* @param a_timestamp create data time
* @param a_size_out[out] size of output structure
* @return NULL in case of an error
*/
dap_list_t *dap_store_packet_multiple(pdap_store_obj_t a_store_obj, time_t a_timestamp,
size_t a_store_obj_count)
{
if (!a_store_obj || a_store_obj_count < 1)
return NULL;
// calculate output structure size
dap_list_t *l_ret = NULL;
dap_store_obj_pkt_t *l_pkt;
uint32_t l_obj_count = 0, l_data_size_out = 0;
for (size_t l_q = 0; l_q < a_store_obj_count; ++l_q) {
l_data_size_out += dap_db_get_size_pdap_store_obj_t(&a_store_obj[l_q]);
if (l_data_size_out > DAP_CHAIN_PKT_EXPECT_SIZE || (l_q == a_store_obj_count - 1 && l_data_size_out)) {
l_pkt = DAP_NEW_Z_SIZE(dap_store_obj_pkt_t, sizeof(dap_store_obj_pkt_t) + l_data_size_out);
l_pkt->data_size = l_data_size_out;
l_pkt->timestamp = a_timestamp;
l_pkt->obj_count = l_q + 1 - l_obj_count;
l_ret = dap_list_append(l_ret, l_pkt);
l_data_size_out = 0;
l_obj_count = l_q + 1;
}
}
l_obj_count = 0;
for (dap_list_t *l_iter = l_ret; l_iter; l_iter = dap_list_next(l_iter)) {
l_pkt = (dap_store_obj_pkt_t *)l_iter->data;
uint64_t l_offset = 0;
for(size_t l_q = 0; l_q < l_pkt->obj_count; ++l_q) {
dap_store_obj_t obj = a_store_obj[l_obj_count + l_q];
//uint16_t section_size = (uint16_t) dap_strlen(obj.section);
uint16_t group_size = (uint16_t) dap_strlen(obj.group);
uint16_t key_size = (uint16_t) dap_strlen(obj.key);
memcpy(l_pkt->data + l_offset, &obj.type, sizeof(int));
l_offset += sizeof(int);
//memcpy(l_pkt->data + l_offset, &section_size, sizeof(uint16_t));
//l_offset += sizeof(uint16_t);
//memcpy(l_pkt->data + l_offset, obj.section, section_size);
//l_offset += section_size;
memcpy(l_pkt->data + l_offset, &group_size, sizeof(uint16_t));
l_offset += sizeof(uint16_t);
memcpy(l_pkt->data + l_offset, obj.group, group_size);
l_offset += group_size;
memcpy(l_pkt->data + l_offset, &obj.id, sizeof(uint64_t));
l_offset += sizeof(uint64_t);
memcpy(l_pkt->data + l_offset, &obj.timestamp, sizeof(time_t));
l_offset += sizeof(time_t);
memcpy(l_pkt->data + l_offset, &key_size, sizeof(uint16_t));
l_offset += sizeof(uint16_t);
memcpy(l_pkt->data + l_offset, obj.key, key_size);
l_offset += key_size;
memcpy(l_pkt->data + l_offset, &obj.value_len, sizeof(size_t));
l_offset += sizeof(size_t);
memcpy(l_pkt->data + l_offset, obj.value, obj.value_len);
l_offset += obj.value_len;
}
l_obj_count += l_pkt->obj_count;
assert(l_pkt->data_size == l_offset);
}
return l_ret;
}
/**
* deserialization
* @param store_obj_count[out] count of the output structures store_obj
* @return NULL in case of an error*
*/
dap_store_obj_t *dap_store_unpacket_multiple(const dap_store_obj_pkt_t *pkt, size_t *store_obj_count)
{
if(!pkt || pkt->data_size < 1)
return NULL;
uint64_t offset = 0;
uint32_t count = pkt->obj_count;
dap_store_obj_t *store_obj = DAP_NEW_Z_SIZE(dap_store_obj_t, count * sizeof(struct dap_store_obj));
for(size_t q = 0; q < count; ++q) {
dap_store_obj_t *obj = store_obj + q;
uint16_t str_length;
if (offset+sizeof (int)> pkt->data_size) {log_it(L_ERROR, "Broken GDB element: can't read 'type' field"); break;} // Check for buffer boundries
memcpy(&obj->type, pkt->data + offset, sizeof(int));
offset += sizeof(int);
//memcpy(&str_size, pkt->data + offset, sizeof(uint16_t));
//offset += sizeof(uint16_t);
//obj->section = DAP_NEW_Z_SIZE(char, str_size + 1);
//memcpy(obj->section, pkt->data + offset, str_size);
//offset += str_size;
if (offset+sizeof (uint16_t)> pkt->data_size) {log_it(L_ERROR, "Broken GDB element: can't read 'group_length' field"); break;} // Check for buffer boundries
memcpy(&str_length, pkt->data + offset, sizeof(uint16_t));
offset += sizeof(uint16_t);
if (offset+str_length> pkt->data_size) {log_it(L_ERROR, "Broken GDB element: can't read 'group' field"); break;} // Check for buffer boundries
obj->group = DAP_NEW_Z_SIZE(char, str_length + 1);
memcpy(obj->group, pkt->data + offset, str_length);
offset += str_length;
if (offset+sizeof (uint64_t)> pkt->data_size) {log_it(L_ERROR, "Broken GDB element: can't read 'id' field"); break;} // Check for buffer boundries
memcpy(&obj->id, pkt->data + offset, sizeof(uint64_t));
offset += sizeof(uint64_t);
if (offset+sizeof (time_t)> pkt->data_size) {log_it(L_ERROR, "Broken GDB element: can't read 'timestamp' field"); break;} // Check for buffer boundries
memcpy(&obj->timestamp, pkt->data + offset, sizeof(time_t));
offset += sizeof(time_t);
if (offset+sizeof (uint16_t)> pkt->data_size) {log_it(L_ERROR, "Broken GDB element: can't read 'key_length' field"); break;} // Check for buffer boundries
memcpy(&str_length, pkt->data + offset, sizeof(uint16_t));
offset += sizeof(uint16_t);
if (offset+ str_length > pkt->data_size) {log_it(L_ERROR, "Broken GDB element: can't read 'key' field"); break;} // Check for buffer boundries
obj->key = DAP_NEW_Z_SIZE(char, str_length + 1);
memcpy(obj->key, pkt->data + offset, str_length);
offset += str_length;
if (offset+sizeof (size_t)> pkt->data_size) {log_it(L_ERROR, "Broken GDB element: can't read 'value_length' field"); break;} // Check for buffer boundries
memcpy(&obj->value_len, pkt->data + offset, sizeof(size_t));
offset += sizeof(size_t);
if (offset+obj->value_len> pkt->data_size) {log_it(L_ERROR, "Broken GDB element: can't read 'value' field"); break;} // Check for buffer boundries
obj->value = DAP_NEW_Z_SIZE(uint8_t, obj->value_len + 1);
memcpy(obj->value, pkt->data + offset, obj->value_len);
offset += obj->value_len;
}
//assert(pkt->data_size == offset);
if(store_obj_count)
*store_obj_count = count;
return store_obj;
}
/**
* Calc hash for data
*
......
......@@ -121,6 +121,23 @@ bool dap_cdb_get_cond_obj_iter_callback(void *arg, const char *key, int ksize, c
return true;
}
bool dap_cdb_get_count_iter_callback(void *arg, const char *key, int ksize, const char *val, int vsize, uint32_t expire, uint64_t oid) {
UNUSED(ksize);
UNUSED(val);
UNUSED(vsize);
UNUSED(expire);
UNUSED(oid);
UNUSED(key);
if (dap_hex_to_uint(val, sizeof(uint64_t)) < ((pobj_arg)arg)->id) {
return true;
}
if (--((pobj_arg)arg)->q == 0) {
return false;
}
return true;
}
pcdb_instance dap_cdb_init_group(char *a_group, int a_flags) {
pcdb_instance l_cdb_i = NULL;
pthread_mutex_lock(&cdb_mutex);
......@@ -421,19 +438,23 @@ dap_store_obj_t* dap_db_driver_cdb_read_cond_store_obj(const char *a_group, uint
size_t dap_db_driver_cdb_read_count_store(const char *a_group, uint64_t a_id)
{
if(!a_group) {
if (!a_group) {
return 0;
}
pcdb_instance l_cdb_i = dap_cdb_get_db_by_group(a_group);
if(!l_cdb_i) {
if (!l_cdb_i) {
return 0;
}
CDB *l_cdb = l_cdb_i->cdb;
CDBSTAT l_cdb_stat;
cdb_stat(l_cdb, &l_cdb_stat);
if(a_id > l_cdb_stat.rnum)
return 0;
return (size_t) l_cdb_stat.rnum - a_id + 1;
obj_arg l_arg;
l_arg.q = l_cdb_stat.rnum;
l_arg.id = a_id;
void *l_iter = cdb_iterate_new(l_cdb, 0);
cdb_iterate(l_cdb, dap_cdb_get_count_iter_callback, (void*)&l_arg, l_iter);
cdb_iterate_destroy(l_cdb, l_iter);
return l_cdb_stat.rnum - l_arg.q;
}
/**
......@@ -446,11 +467,10 @@ dap_list_t* dap_db_driver_cdb_get_groups_by_mask(const char *a_group_mask)
return NULL;
cdb_instance *cur_cdb, *tmp;
pthread_rwlock_rdlock(&cdb_rwlock);
HASH_ITER(hh, s_cdb, cur_cdb, tmp)
{
if(!dap_fnmatch(a_group_mask, cur_cdb->local_group, 0))
if(dap_fnmatch("*.del", cur_cdb->local_group, 0))
l_ret_list = dap_list_prepend(l_ret_list, dap_strdup(cur_cdb->local_group));
HASH_ITER(hh, s_cdb, cur_cdb, tmp) {
char *l_table_name = cur_cdb->local_group;
if(!dap_fnmatch(a_group_mask, l_table_name, 0))
l_ret_list = dap_list_prepend(l_ret_list, dap_strdup(l_table_name));
}
pthread_rwlock_unlock(&cdb_rwlock);
return l_ret_list;
......
......@@ -36,6 +36,7 @@
#include "dap_hash.h"
#include "dap_file_utils.h"
#include "dap_strfuncs.h"
#include "dap_file_utils.h"
#include "dap_chain_global_db_driver_sqlite.h"
#define LOG_TAG "db_sqlite"
......@@ -90,6 +91,7 @@ int dap_db_driver_sqlite_init(const char *a_filename_db, dap_db_driver_callbacks
log_it(L_ERROR, "Can't init sqlite err=%d (%s)", l_ret, sqlite3_errstr(l_ret));
return -2;
}
// Check paths and create them if nessesary
char * l_filename_dir = dap_path_get_dirname(a_filename_db);
if(!dap_dir_test(l_filename_dir)){
log_it(L_NOTICE, "No directory %s, trying to create...",l_filename_dir);
......@@ -106,7 +108,7 @@ int dap_db_driver_sqlite_init(const char *a_filename_db, dap_db_driver_callbacks
log_it(L_NOTICE,"Directory created");
}
DAP_DEL_Z(l_filename_dir);
// Open Sqlite file, create if nessesary
char *l_error_message = NULL;
s_db = dap_db_driver_sqlite_open(a_filename_db, SQLITE_OPEN_READWRITE|SQLITE_OPEN_CREATE, &l_error_message);
if(!s_db) {
......@@ -131,6 +133,9 @@ int dap_db_driver_sqlite_init(const char *a_filename_db, dap_db_driver_callbacks
a_drv_callback->read_last_store_obj = dap_db_driver_sqlite_read_last_store_obj;
a_drv_callback->transaction_start = dap_db_driver_sqlite_start_transaction;
a_drv_callback->transaction_end = dap_db_driver_sqlite_end_transaction;
a_drv_callback->get_groups_by_mask = dap_db_driver_sqlite_get_groups_by_mask;
a_drv_callback->read_count_store = dap_db_driver_sqlite_read_count_store;
a_drv_callback->is_obj = dap_db_driver_sqlite_is_obj;
a_drv_callback->deinit = dap_db_driver_sqlite_deinit;
a_drv_callback->flush = dap_db_driver_sqlite_flush;
s_filename_db = strdup(a_filename_db);
......@@ -415,7 +420,6 @@ static int dap_db_driver_sqlite_fetch_array(sqlite3_stmt *l_res, SQLITE_ROW_VALU
if(cur_val->type == SQLITE_INTEGER)
{
cur_val->val.val_int64 = sqlite3_column_int64(l_res, l_iCol);
cur_val->val.val_int = sqlite3_column_int(l_res, l_iCol);
}
else if(cur_val->type == SQLITE_FLOAT)
cur_val->val.val_float = sqlite3_column_double(l_res, l_iCol);
......@@ -518,6 +522,22 @@ int dap_db_driver_sqlite_end_transaction(void)
}
}
char *dap_db_driver_sqlite_make_group_name(const char *a_table_name)
{
char *l_table_name = dap_strdup(a_table_name);
ssize_t l_table_name_len = (ssize_t)dap_strlen(l_table_name);
const char *l_needle = "_";
// replace '_' to '.'
while(1){
char *l_str = dap_strstr_len(l_table_name, l_table_name_len, l_needle);
if(l_str)
*l_str = '.';
else
break;
}
return l_table_name;
}
char *dap_db_driver_sqlite_make_table_name(const char *a_group_name)
{
char *l_group_name = dap_strdup(a_group_name);
......@@ -726,10 +746,10 @@ dap_store_obj_t* dap_db_driver_sqlite_read_cond_store_obj(const char *a_group, u
l_count_out = (int)*a_count_out;
char *l_str_query;
if(l_count_out)
l_str_query = sqlite3_mprintf("SELECT id,ts,key,value FROM '%s' WHERE id>'%lld' ORDER BY id ASC LIMIT %d",
l_str_query = sqlite3_mprintf("SELECT id,ts,key,value FROM '%s' WHERE id>='%lld' ORDER BY id ASC LIMIT %d",
l_table_name, a_id, l_count_out);
else
l_str_query = sqlite3_mprintf("SELECT id,ts,key,value FROM '%s' WHERE id>'%lld' ORDER BY id ASC",
l_str_query = sqlite3_mprintf("SELECT id,ts,key,value FROM '%s' WHERE id>='%lld' ORDER BY id ASC",
l_table_name, a_id);
pthread_rwlock_wrlock(&s_db_rwlock);
if(!s_db){
......@@ -790,12 +810,10 @@ dap_store_obj_t* dap_db_driver_sqlite_read_cond_store_obj(const char *a_group, u
*/
dap_store_obj_t* dap_db_driver_sqlite_read_store_obj(const char *a_group, const char *a_key, size_t *a_count_out)
{
if(!a_group || !s_db)
return NULL;
dap_store_obj_t *l_obj = NULL;
char *l_error_message = NULL;
sqlite3_stmt *l_res;
if(!a_group)
return NULL;
char * l_table_name = dap_db_driver_sqlite_make_table_name(a_group);
// no limit
uint64_t l_count_out = 0;
......@@ -818,18 +836,13 @@ dap_store_obj_t* dap_db_driver_sqlite_read_store_obj(const char *a_group, const
l_str_query = sqlite3_mprintf("SELECT id,ts,key,value FROM '%s' ORDER BY id ASC", l_table_name);
}
pthread_rwlock_wrlock(&s_db_rwlock);
if(!s_db){
pthread_rwlock_unlock(&s_db_rwlock);
return NULL;
}
int l_ret = dap_db_driver_sqlite_query(s_db, l_str_query, &l_res, &l_error_message);
int l_ret = dap_db_driver_sqlite_query(s_db, l_str_query, &l_res, NULL);
pthread_rwlock_unlock(&s_db_rwlock);
sqlite3_free(l_str_query);
DAP_DEL_Z(l_table_name);
if(l_ret != SQLITE_OK) {
//log_it(L_ERROR, "read l_ret=%d, %s\n", sqlite3_errcode(s_db), sqlite3_errmsg(s_db));
dap_db_driver_sqlite_free(l_error_message);
return NULL;
}
......@@ -864,3 +877,85 @@ dap_store_obj_t* dap_db_driver_sqlite_read_store_obj(const char *a_group, const
*a_count_out = l_count_out;
return l_obj;
}
dap_list_t* dap_db_driver_sqlite_get_groups_by_mask(const char *a_group_mask)
{
if(!a_group_mask || !s_db)
return NULL;
sqlite3_stmt *l_res;
const char *l_str_query = "SELECT name FROM sqlite_master WHERE type ='table' AND name NOT LIKE 'sqlite_%'";
dap_list_t *l_ret_list = NULL;
pthread_rwlock_wrlock(&s_db_rwlock);
int l_ret = dap_db_driver_sqlite_query(s_db, (char *)l_str_query, &l_res, NULL);
pthread_rwlock_unlock(&s_db_rwlock);
if(l_ret != SQLITE_OK) {
//log_it(L_ERROR, "Get tables l_ret=%d, %s\n", sqlite3_errcode(s_db), sqlite3_errmsg(s_db));
return NULL;
}
char * l_mask = dap_db_driver_sqlite_make_table_name(a_group_mask);
SQLITE_ROW_VALUE *l_row = NULL;
while (dap_db_driver_sqlite_fetch_array(l_res, &l_row) == SQLITE_ROW && l_row) {
char *l_table_name = (char *)l_row->val->val.val_str;
if(!dap_fnmatch(l_mask, l_table_name, 0))
l_ret_list = dap_list_prepend(l_ret_list, dap_db_driver_sqlite_make_group_name(l_table_name));
dap_db_driver_sqlite_row_free(l_row);
}
dap_db_driver_sqlite_query_free(l_res);
return l_ret_list;
}
size_t dap_db_driver_sqlite_read_count_store(const char *a_group, uint64_t a_id)
{
sqlite3_stmt *l_res;
if(!a_group || ! s_db)
return 0;
char * l_table_name = dap_db_driver_sqlite_make_table_name(a_group);
char *l_str_query = sqlite3_mprintf("SELECT COUNT(*) FROM '%s' WHERE id>='%lld'", l_table_name, a_id);
pthread_rwlock_wrlock(&s_db_rwlock);
int l_ret = dap_db_driver_sqlite_query(s_db, l_str_query, &l_res, NULL);
pthread_rwlock_unlock(&s_db_rwlock);
sqlite3_free(l_str_query);
DAP_DEL_Z(l_table_name);
if(l_ret != SQLITE_OK) {
//log_it(L_ERROR, "Count l_ret=%d, %s\n", sqlite3_errcode(s_db), sqlite3_errmsg(s_db));
return 0;
}
size_t l_ret_val;
SQLITE_ROW_VALUE *l_row = NULL;
if (dap_db_driver_sqlite_fetch_array(l_res, &l_row) == SQLITE_ROW && l_row) {
l_ret_val = (size_t)l_row->val->val.val_int64;
dap_db_driver_sqlite_row_free(l_row);
}
dap_db_driver_sqlite_query_free(l_res);
return l_ret_val;
}
bool dap_db_driver_sqlite_is_obj(const char *a_group, const char *a_key)
{
sqlite3_stmt *l_res;
if(!a_group || ! s_db)
return false;
char * l_table_name = dap_db_driver_sqlite_make_table_name(a_group);
char *l_str_query = sqlite3_mprintf("SELECT EXISTS(SELECT * FROM '%s' WHERE key='%s')", l_table_name, a_key);
pthread_rwlock_wrlock(&s_db_rwlock);
int l_ret = dap_db_driver_sqlite_query(s_db, l_str_query, &l_res, NULL);
pthread_rwlock_unlock(&s_db_rwlock);
sqlite3_free(l_str_query);
DAP_DEL_Z(l_table_name);
if(l_ret != SQLITE_OK) {
//log_it(L_ERROR, "Exists l_ret=%d, %s\n", sqlite3_errcode(s_db), sqlite3_errmsg(s_db));
return false;
}
bool l_ret_val;
SQLITE_ROW_VALUE *l_row = NULL;
if (dap_db_driver_sqlite_fetch_array(l_res, &l_row) == SQLITE_ROW && l_row) {
l_ret_val = (size_t)l_row->val->val.val_int64;
dap_db_driver_sqlite_row_free(l_row);
}
dap_db_driver_sqlite_query_free(l_res);
return l_ret_val;
}
......@@ -90,7 +90,6 @@ uint64_t dap_db_get_cur_node_addr(char *a_net_name)
time_t l_dt = time(NULL) - l_node_time;
//NODE_TIME_EXPIRED
if(l_node_time && l_dt > addr_time_expired) {
//log_it(L_NOTICE, "Node 0x%016X set last synced timestamp %"DAP_UINT64_FORMAT_U"", a_id);
l_node_addr_ret = 0;
}
DAP_DELETE(l_key);
......@@ -103,31 +102,33 @@ uint64_t dap_db_get_cur_node_addr(char *a_net_name)
/**
* Set last id for remote node
*/
bool dap_db_set_last_id_remote(uint64_t a_node_addr, uint64_t a_id)
bool dap_db_set_last_id_remote(uint64_t a_node_addr, uint64_t a_id, char *a_group)
{
//log_it( L_DEBUG, "Node 0x%016X set last synced timestamp %"DAP_UINT64_FORMAT_U"", a_node_addr, a_id);
uint64_t *l_id = DAP_NEW(uint64_t);
*l_id = a_id;
return dap_chain_global_db_gr_set(dap_strdup_printf("%ju", a_node_addr),
l_id, sizeof(uint64_t),
GROUP_LOCAL_NODE_LAST_ID);
//log_it( L_DEBUG, "Node 0x%016X set last synced id %"DAP_UINT64_FORMAT_u"", a_node_addr, a_id);
char *l_node_addr_str = dap_strdup_printf("%ju%s", a_node_addr, a_group);
bool l_ret = dap_chain_global_db_gr_set(l_node_addr_str, &a_id, sizeof(uint64_t),
GROUP_LOCAL_NODE_LAST_ID);
DAP_DELETE(l_node_addr_str);
return l_ret;
}
/**
* Get last id for remote node
*/
uint64_t dap_db_get_last_id_remote(uint64_t a_node_addr)
uint64_t dap_db_get_last_id_remote(uint64_t a_node_addr, char *a_group)
{
char *l_node_addr_str = dap_strdup_printf("%ju", a_node_addr);
size_t l_timestamp_len = 0;
uint8_t *l_timestamp = dap_chain_global_db_gr_get((const char*) l_node_addr_str, &l_timestamp_len,
GROUP_LOCAL_NODE_LAST_ID);
uint64_t l_ret_timestamp = 0;
if(l_timestamp && l_timestamp_len == sizeof(uint64_t))
memcpy(&l_ret_timestamp, l_timestamp, l_timestamp_len);
char *l_node_addr_str = dap_strdup_printf("%ju%s", a_node_addr, a_group);
size_t l_id_len = 0;
uint8_t *l_id = dap_chain_global_db_gr_get((const char*) l_node_addr_str, &l_id_len,
GROUP_LOCAL_NODE_LAST_ID);
uint64_t l_ret_id = 0;
if (l_id) {
if (l_id_len == sizeof(uint64_t))
memcpy(&l_ret_id, l_id, l_id_len);
DAP_DELETE(l_id);
}
DAP_DELETE(l_node_addr_str);
DAP_DELETE(l_timestamp);
return l_ret_timestamp;
return l_ret_id;
}
/**
......@@ -152,3 +153,166 @@ dap_chain_hash_fast_t *dap_db_get_last_hash_remote(uint64_t a_node_addr, dap_cha
DAP_DELETE(l_node_chain_str);
return (dap_chain_hash_fast_t *)l_hash;
}
static size_t dap_db_get_size_pdap_store_obj_t(pdap_store_obj_t store_obj)
{
size_t size = sizeof(uint32_t) + 2 * sizeof(uint16_t) + sizeof(time_t)
+ 2 * sizeof(uint64_t) + dap_strlen(store_obj->group) +
dap_strlen(store_obj->key) + store_obj->value_len;
return size;
}
/**
* serialization
* @param a_old_pkt an object for multiplexation
* @param a_new_pkt an object for multiplexation
* @return NULL in case of an error
*/
dap_store_obj_pkt_t *dap_store_packet_multiple(dap_store_obj_pkt_t *a_old_pkt, dap_store_obj_pkt_t *a_new_pkt)
{
if (!a_new_pkt)
return a_old_pkt;
if (a_old_pkt)
a_old_pkt = (dap_store_obj_pkt_t *)DAP_REALLOC(a_old_pkt,
a_old_pkt->data_size + a_new_pkt->data_size + sizeof(dap_store_obj_pkt_t));
else
a_old_pkt = DAP_NEW_Z_SIZE(dap_store_obj_pkt_t, a_new_pkt->data_size + sizeof(dap_store_obj_pkt_t));
memcpy(a_old_pkt->data + a_old_pkt->data_size, a_new_pkt->data, a_new_pkt->data_size);
a_old_pkt->data_size += a_new_pkt->data_size;
a_old_pkt->obj_count++;
return a_old_pkt;
}
char *dap_store_packet_get_group(dap_store_obj_pkt_t *a_pkt)
{
uint16_t l_gr_len;
memcpy(&l_gr_len, a_pkt->data + sizeof(uint32_t), sizeof(uint16_t));
char *l_ret_str = DAP_NEW_SIZE(char, l_gr_len + 1);
size_t l_gr_offset = sizeof(uint32_t) + sizeof(uint16_t);
memcpy(l_ret_str, a_pkt->data + l_gr_offset, l_gr_len);
l_ret_str[l_gr_len] = '\0';
return l_ret_str;
}
uint64_t dap_store_packet_get_id(dap_store_obj_pkt_t *a_pkt)
{
uint16_t l_gr_len;
memcpy(&l_gr_len, a_pkt->data + sizeof(uint32_t), sizeof(uint16_t));
size_t l_id_offset = sizeof(uint32_t) + sizeof(uint16_t) + l_gr_len;
uint64_t l_ret_id;
memcpy(&l_ret_id, a_pkt->data + l_id_offset, sizeof(uint64_t));
return l_ret_id;
}
void dap_store_packet_change_id(dap_store_obj_pkt_t *a_pkt, uint64_t a_id)
{
uint16_t l_gr_len;
memcpy(&l_gr_len, a_pkt->data + sizeof(uint32_t), sizeof(uint16_t));
size_t l_id_offset = sizeof(uint32_t) + sizeof(uint16_t) + l_gr_len;
memcpy(a_pkt->data + l_id_offset, &a_id, sizeof(uint64_t));
}
/**
* serialization
* @param a_store_obj an object for serialization
* @return NULL in case of an error
*/
dap_store_obj_pkt_t *dap_store_packet_single(pdap_store_obj_t a_store_obj)
{
if (!a_store_obj)
return NULL;
uint32_t l_data_size_out = dap_db_get_size_pdap_store_obj_t(a_store_obj);
dap_store_obj_pkt_t *l_pkt = DAP_NEW_SIZE(dap_store_obj_pkt_t, l_data_size_out + sizeof(dap_store_obj_pkt_t));
l_pkt->data_size = l_data_size_out;
l_pkt->obj_count = 1;
l_pkt->timestamp = 0;
uint32_t l_type = a_store_obj->type;
memcpy(l_pkt->data, &l_type, sizeof(uint32_t));
uint64_t l_offset = sizeof(uint32_t);
uint16_t group_size = (uint16_t) dap_strlen(a_store_obj->group);
memcpy(l_pkt->data + l_offset, &group_size, sizeof(uint16_t));
l_offset += sizeof(uint16_t);
memcpy(l_pkt->data + l_offset, a_store_obj->group, group_size);
l_offset += group_size;
memcpy(l_pkt->data + l_offset, &a_store_obj->id, sizeof(uint64_t));
l_offset += sizeof(uint64_t);
memcpy(l_pkt->data + l_offset, &a_store_obj->timestamp, sizeof(time_t));
l_offset += sizeof(time_t);
uint16_t key_size = (uint16_t) dap_strlen(a_store_obj->key);
memcpy(l_pkt->data + l_offset, &key_size, sizeof(uint16_t));
l_offset += sizeof(uint16_t);
memcpy(l_pkt->data + l_offset, a_store_obj->key, key_size);
l_offset += key_size;
memcpy(l_pkt->data + l_offset, &a_store_obj->value_len, sizeof(uint64_t));
l_offset += sizeof(uint64_t);
memcpy(l_pkt->data + l_offset, a_store_obj->value, a_store_obj->value_len);
l_offset += a_store_obj->value_len;
assert(l_offset == l_data_size_out);
return l_pkt;
}
/**
* deserialization
* @param store_obj_count[out] count of the output structures store_obj
* @return NULL in case of an error*
*/
dap_store_obj_t *dap_store_unpacket_multiple(const dap_store_obj_pkt_t *pkt, size_t *store_obj_count)
{
if(!pkt || pkt->data_size < 1)
return NULL;
uint64_t offset = 0;
uint32_t count = pkt->obj_count;
dap_store_obj_t *store_obj = DAP_NEW_SIZE(dap_store_obj_t, count * sizeof(struct dap_store_obj));
for(size_t q = 0; q < count; ++q) {
dap_store_obj_t *obj = store_obj + q;
uint16_t str_length;
uint32_t l_type;
if (offset+sizeof (uint32_t)> pkt->data_size) {log_it(L_ERROR, "Broken GDB element: can't read 'type' field"); break;} // Check for buffer boundries
memcpy(&l_type, pkt->data + offset, sizeof(uint32_t));
obj->type = l_type;
offset += sizeof(uint32_t);
if (offset+sizeof (uint16_t)> pkt->data_size) {log_it(L_ERROR, "Broken GDB element: can't read 'group_length' field"); break;} // Check for buffer boundries
memcpy(&str_length, pkt->data + offset, sizeof(uint16_t));
offset += sizeof(uint16_t);
if (offset+str_length> pkt->data_size) {log_it(L_ERROR, "Broken GDB element: can't read 'group' field"); break;} // Check for buffer boundries
obj->group = DAP_NEW_SIZE(char, str_length + 1);
memcpy(obj->group, pkt->data + offset, str_length);
obj->group[str_length] = '\0';
offset += str_length;
if (offset+sizeof (uint64_t)> pkt->data_size) {log_it(L_ERROR, "Broken GDB element: can't read 'id' field"); break;} // Check for buffer boundries
memcpy(&obj->id, pkt->data + offset, sizeof(uint64_t));
offset += sizeof(uint64_t);
if (offset+sizeof (time_t)> pkt->data_size) {log_it(L_ERROR, "Broken GDB element: can't read 'timestamp' field"); break;} // Check for buffer boundries
memcpy(&obj->timestamp, pkt->data + offset, sizeof(time_t));
offset += sizeof(time_t);
if (offset+sizeof (uint16_t)> pkt->data_size) {log_it(L_ERROR, "Broken GDB element: can't read 'key_length' field"); break;} // Check for buffer boundries
memcpy(&str_length, pkt->data + offset, sizeof(uint16_t));
offset += sizeof(uint16_t);
if (offset+ str_length > pkt->data_size) {log_it(L_ERROR, "Broken GDB element: can't read 'key' field"); break;} // Check for buffer boundries
obj->key = DAP_NEW_SIZE(char, str_length + 1);
memcpy(obj->key, pkt->data + offset, str_length);
obj->key[str_length] = '\0';
offset += str_length;
if (offset+sizeof (uint64_t)> pkt->data_size) {log_it(L_ERROR, "Broken GDB element: can't read 'value_length' field"); break;} // Check for buffer boundries
memcpy(&obj->value_len, pkt->data + offset, sizeof(uint64_t));
offset += sizeof(uint64_t);
if (offset+obj->value_len> pkt->data_size) {log_it(L_ERROR, "Broken GDB element: can't read 'value' field"); break;} // Check for buffer boundries
obj->value = DAP_NEW_SIZE(uint8_t, obj->value_len);
memcpy(obj->value, pkt->data + offset, obj->value_len);
offset += obj->value_len;
}
//assert(pkt->data_size == offset);
if(store_obj_count)
*store_obj_count = count;
return store_obj;
}
......@@ -23,9 +23,8 @@ typedef struct dap_global_db_obj {
size_t value_len;
}DAP_ALIGN_PACKED dap_global_db_obj_t, *pdap_global_db_obj_t;
typedef void (*dap_global_db_obj_callback_notify_t) (void * a_arg, const char a_op_code, const char * a_prefix, const char * a_group,
const char * a_key, const void * a_value,
const size_t a_value_len);
typedef void (*dap_global_db_obj_callback_notify_t) (void * a_arg, const char a_op_code, const char * a_group,
const char * a_key, const void * a_value, const size_t a_value_len);
/**
* Flush DB
......@@ -49,22 +48,15 @@ void dap_chain_global_db_objs_delete(dap_global_db_obj_t *objs, size_t a_count);
int dap_chain_global_db_init(dap_config_t * a_config);
void dap_chain_global_db_deinit(void);
/*
* Get history group by group name
*/
char* dap_chain_global_db_get_history_group_by_group_name(const char * a_group_name);
/**
* Setup callbacks and filters
*/
// Add group prefix that will be tracking all changes
void dap_chain_global_db_add_history_group_prefix(const char * a_group_prefix, const char * a_group_name_for_history);
void dap_chain_global_db_add_history_callback_notify(const char * a_group_prefix,
dap_global_db_obj_callback_notify_t a_callback, void * a_arg);
const char* dap_chain_global_db_add_history_extra_group(const char * a_group_name, dap_chain_node_addr_t *a_nodes, uint16_t *a_nodes_count);
void dap_chain_global_db_add_history_extra_group_callback_notify(const char * a_group_prefix,
dap_global_db_obj_callback_notify_t a_callback, void * a_arg);
// Add group name that will be synchronized
void dap_chain_global_db_add_sync_group(const char *a_group_prefix, dap_global_db_obj_callback_notify_t a_callback, void *a_arg);
void dap_chain_global_db_add_sync_extra_group(const char *a_group_mask, dap_global_db_obj_callback_notify_t a_callback, void *a_arg);
dap_list_t *dap_chain_db_get_sync_groups();
dap_list_t *dap_chain_db_get_sync_extra_groups();
void dap_global_db_obj_track_history(void* a_store_data);
/**
* Get entry from base
*/
......@@ -116,26 +108,3 @@ bool dap_chain_global_db_save(dap_global_db_obj_t* a_objs, size_t a_objs_count);
*/
char* dap_chain_global_db_hash(const uint8_t *data, size_t data_size);
char* dap_chain_global_db_hash_fast(const uint8_t *data, size_t data_size);
// Get data according the history log
dap_list_t* dap_db_log_pack(dap_global_db_obj_t *a_obj, size_t *a_data_size_out);
// Get data according the history log
//char* dap_db_history_tx(dap_chain_hash_fast_t * a_tx_hash, const char *a_group_mempool);
//char* dap_db_history_addr(dap_chain_addr_t * a_addr, const char *a_group_mempool);
//char* dap_db_history(dap_chain_addr_t * a_addr, const char *a_group_mempool);
// Parse data from dap_db_log_pack()
void* dap_db_log_unpack(const void *a_data, size_t a_data_size, size_t *a_store_obj_count);
// Get timestamp from dap_db_log_pack()
//time_t dap_db_log_unpack_get_timestamp(uint8_t *a_data, size_t a_data_size);
// Get last id in log
uint64_t dap_db_log_get_group_history_last_id(const char *a_history_group_name);
uint64_t dap_db_log_get_last_id(void);
// Get log diff as list
dap_list_t* dap_db_log_get_list(uint64_t first_id);
// Free list getting from dap_db_log_get_list()
void dap_db_log_del_list(dap_list_t *a_list);
// Get log diff as string
char* dap_db_log_get_diff(size_t *a_data_size_out);
......@@ -29,15 +29,12 @@
#include "dap_common.h"
#include "dap_list.h"
#define DAP_CHAIN_PKT_EXPECT_SIZE 7168
typedef struct dap_store_obj {
uint64_t id;
time_t timestamp;
uint8_t type;
char *group;
char *key;
//const char *c_group;
const char *c_key;
uint8_t *value;
size_t value_len;
......@@ -92,8 +89,3 @@ dap_store_obj_t* dap_chain_global_db_driver_read(const char *a_group, const char
bool dap_chain_global_db_driver_is(const char *a_group, const char *a_key);
size_t dap_chain_global_db_driver_count(const char *a_group, uint64_t id);
dap_list_t* dap_chain_global_db_driver_get_groups_by_mask(const char *a_group_mask);
dap_list_t *dap_store_packet_multiple(pdap_store_obj_t a_store_obj,
time_t a_timestamp, size_t a_store_obj_count);
dap_store_obj_t *dap_store_unpacket_multiple(const dap_store_obj_pkt_t *a_pkt,
size_t *a_store_obj_count);
......@@ -50,3 +50,6 @@ int dap_db_driver_sqlite_apply_store_obj(dap_store_obj_t *a_store_obj);
dap_store_obj_t* dap_db_driver_sqlite_read_last_store_obj(const char *a_group);
dap_store_obj_t* dap_db_driver_sqlite_read_cond_store_obj(const char *a_group, uint64_t a_id, size_t *a_count_out);
dap_store_obj_t* dap_db_driver_sqlite_read_store_obj(const char *a_group, const char *a_key, size_t *a_count_out);
dap_list_t* dap_db_driver_sqlite_get_groups_by_mask(const char *a_group_mask);
size_t dap_db_driver_sqlite_read_count_store(const char *a_group, uint64_t a_id);
bool dap_db_driver_sqlite_is_obj(const char *a_group, const char *a_key);
......@@ -8,6 +8,9 @@
#define GLOBAL_DB_HIST_REC_SEPARATOR "\r;"
#define GLOBAL_DB_HIST_KEY_SEPARATOR "\a;"
#define F_DB_LOG_ADD_EXTRA_GROUPS 1
#define F_DB_LOG_SYNC_FROM_ZERO 2
typedef struct dap_global_db_hist {
char type;// 'a' add or 'd' delete
const char *group;
......@@ -18,34 +21,36 @@ typedef struct dap_global_db_hist {
//Add data to the history log
bool dap_db_history_add(char a_type, pdap_store_obj_t a_store_obj, size_t a_dap_store_count, const char *a_group);
// Truncate the history log
bool dap_db_history_truncate(void);
// for dap_db_log_list_xxx()
typedef struct dap_db_log_list_group {
char *name;
uint64_t last_id_synced;
uint64_t count;
} dap_db_log_list_group_t;
typedef struct dap_db_log_list_obj {
dap_store_obj_pkt_t *pkt;
dap_hash_fast_t hash;
} dap_db_log_list_obj_t;
// for dap_db_log_list_xxx()
typedef struct dap_db_log_list {
dap_list_t *list_write; // writed list
dap_list_t *list_read; // readed list (inside list_write)
bool is_process;
size_t item_start; // first item to read from db
size_t item_last; // last item to read from db
size_t items_rest; // rest items to read from list_read
size_t items_number_main;
size_t items_number_add;
size_t items_number; // remaining items in list_write after reading from db
char **group_names;
int64_t group_number; // number of group
int64_t group_cur; // current group number, -1 for the main group, 0 ... group_count for the additional group
size_t *group_number_items; // number of items for each group
uint64_t *group_last_id;
dap_list_t *add_groups; // additional group for sync
size_t items_number; // total items in list_write after reading from db
dap_list_t *groups;
pthread_t thread;
pthread_mutex_t list_mutex;
} dap_db_log_list_t;
dap_db_log_list_t* dap_db_log_list_start(uint64_t first_id, dap_list_t *a_add_groups);
dap_db_log_list_t* dap_db_log_list_start(dap_chain_node_addr_t a_addr, int flags);
size_t dap_db_log_list_get_count(dap_db_log_list_t *a_db_log_list);
size_t dap_db_log_list_get_count_rest(dap_db_log_list_t *a_db_log_list);
dap_global_db_obj_t* dap_db_log_list_get(dap_db_log_list_t *a_db_log_list);
dap_db_log_list_obj_t *dap_db_log_list_get(dap_db_log_list_t *a_db_log_list);
void dap_db_log_list_delete(dap_db_log_list_t *a_db_log_list);
// Get last id in log
uint64_t dap_db_log_get_group_last_id(const char *a_group_name);
uint64_t dap_db_log_get_last_id(void);
......@@ -4,16 +4,24 @@
#include <time.h>
#include "dap_chain.h"
#include "dap_chain_common.h"
#include "dap_chain_global_db_driver.h"
// Set addr for current node
bool dap_db_set_cur_node_addr(uint64_t a_address, char *a_net_name);
bool dap_db_set_cur_node_addr_exp(uint64_t a_address, char *a_net_name );
uint64_t dap_db_get_cur_node_addr(char *a_net_name);
// Set last id for remote node
bool dap_db_set_last_id_remote(uint64_t a_node_addr, uint64_t a_id);
bool dap_db_set_last_id_remote(uint64_t a_node_addr, uint64_t a_id, char *a_group);
// Get last id for remote node
uint64_t dap_db_get_last_id_remote(uint64_t a_node_addr);
uint64_t dap_db_get_last_id_remote(uint64_t a_node_addr, char *a_group);
// Set last hash for chain for remote node
bool dap_db_set_last_hash_remote(uint64_t a_node_addr, dap_chain_t *a_chain, dap_chain_hash_fast_t *a_hash);
// Get last hash for chain for remote node
dap_chain_hash_fast_t *dap_db_get_last_hash_remote(uint64_t a_node_addr, dap_chain_t *a_chain);
dap_store_obj_pkt_t *dap_store_packet_single(pdap_store_obj_t a_store_obj);
dap_store_obj_pkt_t *dap_store_packet_multiple(dap_store_obj_pkt_t *a_old_pkt, dap_store_obj_pkt_t *a_new_pkt);
dap_store_obj_t *dap_store_unpacket_multiple(const dap_store_obj_pkt_t *a_pkt, size_t *a_store_obj_count);
char *dap_store_packet_get_group(dap_store_obj_pkt_t *a_pkt);
uint64_t dap_store_packet_get_id(dap_store_obj_pkt_t *a_pkt);
void dap_store_packet_change_id(dap_store_obj_pkt_t *a_pkt, uint64_t a_id);
......@@ -37,7 +37,7 @@ endif()
add_library(${PROJECT_NAME} STATIC ${DAP_CHAIN_NET_SRCS} ${DAP_CHAIN_NET_HEADERS} ${IPUTILS_SRCS} ${IPUTILS_HEADERS})
if(WIN32)
target_link_libraries(dap_core dap_chain_net dap_crypto dap_client dap_server_core dap_stream_ch_chain dap_stream_ch_chain_net dap_stream_ch_chain_net_srv dap_chain dap_chain_wallet dap_chain_net_srv
target_link_libraries(${PROJECT_NAME} dap_core dap_crypto dap_client dap_server_core dap_notify_srv dap_stream_ch_chain dap_stream_ch_chain_net dap_stream_ch_chain_net_srv dap_chain dap_chain_wallet dap_chain_net_srv
dap_chain_mempool dap_chain_global_db dap_chain_cs_none)
endif()
......