Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • cellframe/libdap-chain-global-db
1 result
Show changes
Commits on Source (2)
......@@ -3,7 +3,7 @@
* Alexander Lysikov <alexander.lysikov@demlabs.net>
* DeM Labs Inc. https://demlabs.net
* Kelvin Project https://github.com/kelvinblockchain
* Copyright (c) 2019
* Copyright (c) 2019-2020
* All rights reserved.
This file is part of DAP (Deus Applications Prototypes) the open source project
......@@ -139,6 +139,8 @@ void dap_chain_global_db_add_history_callback_notify(const char * a_group_prefix
*/
void dap_chain_global_db_obj_clean(dap_global_db_obj_t *obj)
{
if(!obj)
return;
DAP_DELETE(obj->key);
DAP_DELETE(obj->value);
obj->key = NULL;
......@@ -244,6 +246,31 @@ void* dap_chain_global_db_obj_get(const char *a_key, const char *a_group)
return store_data;*/
}
/**
* @brief dap_chain_global_db_obj_gr_get
* @param a_key
* @param a_data_out
* @param a_group
* @return
*/
dap_store_obj_t* dap_chain_global_db_obj_gr_get(const char *a_key, size_t *a_data_len_out, const char *a_group)
{
uint8_t *l_ret_value = NULL;
// read several items, 0 - no limits
size_t l_data_len_out = 0;
if(a_data_len_out)
l_data_len_out = *a_data_len_out;
dap_store_obj_t *l_store_data = dap_chain_global_db_driver_read(a_group, a_key, &l_data_len_out);
if(l_store_data) {
l_ret_value = (l_store_data->value) ? DAP_NEW_SIZE(uint8_t, l_store_data->value_len) : NULL; //ret_value = (store_data->value) ? strdup(store_data->value) : NULL;
memcpy(l_ret_value, l_store_data->value, l_store_data->value_len);
if(a_data_len_out)
*a_data_len_out = l_store_data->value_len;
//dap_store_obj_free(l_store_data, l_data_len_out);
}
return l_store_data;
}
/**
* @brief dap_chain_global_db_gr_get
* @param a_key
......@@ -296,6 +323,82 @@ uint8_t * dap_chain_global_db_get(const char *a_key, size_t *a_data_out)
return dap_chain_global_db_gr_get(a_key, a_data_out, GROUP_LOCAL_GENERAL);
}
/**
* Add info about the deleted entry to the base
*/
static bool global_db_gr_del_add(char *a_key,const char *a_group, time_t a_timestamp)
{
dap_store_obj_t store_data;// = DAP_NEW_Z_SIZE(dap_store_obj_t, sizeof(struct dap_store_obj));
memset(&store_data, 0, sizeof(dap_store_obj_t));
store_data.type = 'a';
store_data.key = a_key;//dap_strdup(a_key);
// no data
store_data.value = NULL;
store_data.value_len = 0;
// group = parent group + '.del'
store_data.group = dap_strdup_printf("%s.del", a_group);
store_data.timestamp = a_timestamp;//time(NULL);
lock();
int l_res = dap_chain_global_db_driver_add(&store_data, 1);
unlock();
DAP_DELETE(store_data.group);
if(l_res>=0)
return true;
return false;
}
/**
* Delete info about the deleted entry from the base
*/
static bool global_db_gr_del_del(char *a_key,const char *a_group)
{
if(!a_key)
return NULL;
dap_store_obj_t store_data;// = DAP_NEW_Z_SIZE(dap_store_obj_t, sizeof(struct dap_store_obj));
memset(&store_data, 0, sizeof(dap_store_obj_t));
store_data.key = a_key;
// store_data->c_key = a_key;
store_data.group = dap_strdup_printf("%s.del", a_group);
//store_data->c_group = a_group;
lock();
int l_res = 0;
if(dap_chain_global_db_driver_is(store_data.group, store_data.key))
l_res = dap_chain_global_db_driver_delete(&store_data, 1);
unlock();
DAP_DELETE(store_data.group);
if(l_res>=0)
return true;
return false;
}
/**
* Get timestamp of the deleted entry
*/
time_t global_db_gr_del_get_timestamp(const char *a_group, char *a_key)
{
time_t l_timestamp = 0;
if(!a_key)
return l_timestamp;
dap_store_obj_t store_data;
memset(&store_data, 0, sizeof(dap_store_obj_t));
store_data.key = a_key;
// store_data->c_key = a_key;
store_data.group = dap_strdup_printf("%s.del", a_group);
//store_data->c_group = a_group;
lock();
if(dap_chain_global_db_driver_is(store_data.group, store_data.key)) {
size_t l_count_out = 0;
dap_store_obj_t *l_obj = dap_chain_global_db_driver_read(store_data.group, store_data.key, &l_count_out);
assert(l_count_out <= 1);
l_timestamp = l_obj->timestamp;
dap_store_obj_free(l_obj, l_count_out);
}
unlock();
DAP_DELETE(store_data.group);
return l_timestamp;
}
/**
*
*/
......@@ -328,6 +431,9 @@ bool dap_chain_global_db_gr_set(char *a_key, void *a_value, size_t a_value_len,
// Extract prefix if added successfuly, add history log and call notify callback if present
if(!l_res) {
// Delete info about the deleted entry from the base if one present
global_db_gr_del_del(a_key, a_group);
char * l_group_prefix = extract_group_prefix(a_group);
history_group_item_t * l_history_group_item = NULL;
if(l_group_prefix)
......@@ -357,6 +463,7 @@ bool dap_chain_global_db_set( char *a_key, void *a_value, size_t a_value_len)
{
return dap_chain_global_db_gr_set(a_key, a_value, a_value_len, GROUP_LOCAL_GENERAL);
}
/**
* Delete entry from base
*/
......@@ -368,11 +475,14 @@ bool dap_chain_global_db_gr_del(char *a_key,const char *a_group)
store_data->key = a_key;
// store_data->c_key = a_key;
store_data->group = dap_strdup(a_group);
store_data->c_group = a_group;
//store_data->c_group = a_group;
lock();
int l_res = dap_chain_global_db_driver_delete(store_data, 1);
unlock();
// do not add to history if l_res=1 (already deleted)
if(!l_res) {
// added to Del group
global_db_gr_del_add(a_key, a_group, time(NULL));
// Extract prefix
char * l_group_prefix = extract_group_prefix(a_group);
history_group_item_t * l_history_group_item = NULL;
......@@ -392,8 +502,21 @@ bool dap_chain_global_db_gr_del(char *a_key,const char *a_group)
DAP_DELETE(l_group_prefix);
}
//DAP_DELETE(store_data);
if(!l_res)
if(l_res>=0){
// added to Del group
global_db_gr_del_add(a_key, a_group, time(NULL));
/*/ read del info
char *l_group = dap_strdup_printf("%s.del", a_group);
size_t l_data_size_out = 0;
dap_store_obj_t *l_objs = dap_chain_global_db_obj_gr_get(a_key, &l_data_size_out,l_group);
// update timestamp
if(l_objs){
if(l_objs->timestamp<time(NULL))
dap_store_obj_free(l_objs, l_data_size_out);
}
DAP_DELETE(l_group);*/
return true;
}
return false;
}
bool dap_chain_global_db_del(char *a_key)
......@@ -496,6 +619,16 @@ bool dap_chain_global_db_obj_save(void* a_store_data, size_t a_objs_count)
// Extract prefix if added successfuly, add history log and call notify callback if present
if(!l_res) {
for(size_t i = 0; i < a_objs_count; i++) {
dap_store_obj_t *a_store_obj = a_store_data + i;
if(a_store_obj->type == 'a')
// delete info about the deleted entry from the base if one present
global_db_gr_del_del(a_store_obj->key, a_store_obj->group);
else if(a_store_obj->type == 'd')
// add to Del group
global_db_gr_del_add(a_store_obj->key, a_store_obj->group, a_store_obj->timestamp);
history_group_item_t * l_history_group_item = NULL;
dap_store_obj_t* l_obj = (dap_store_obj_t*)a_store_data + i;
char * l_group_prefix = extract_group_prefix(l_obj->group);
......@@ -523,8 +656,9 @@ bool dap_chain_global_db_obj_save(void* a_store_data, size_t a_objs_count)
}
}
if(!l_res)
if(l_res >= 0) {
return true;
}
return false;
}
......
......@@ -61,6 +61,7 @@ void dap_chain_global_db_add_history_callback_notify(const char * a_group_prefix
* Get entry from base
*/
void* dap_chain_global_db_obj_get(const char *a_key, const char *a_group);
dap_store_obj_t* dap_chain_global_db_obj_gr_get(const char *a_key, size_t *a_data_len_out, const char *a_group);
uint8_t * dap_chain_global_db_gr_get(const char *a_key, size_t *a_data_out, const char *a_group);
uint8_t * dap_chain_global_db_get(const char *a_key, size_t *a_data_out);
......@@ -76,6 +77,11 @@ bool dap_chain_global_db_set( char *a_key, void *a_value, size_t a_value_len);
bool dap_chain_global_db_gr_del(char *a_key, const char *a_group);
bool dap_chain_global_db_del(char *a_key);
/**
* Get timestamp of the deleted entry
*/
time_t global_db_gr_del_get_timestamp(const char *a_group, char *a_key);
/**
* Read the entire database into an array of size bytes
*
......
......@@ -407,12 +407,21 @@ static int save_write_buf(void)
dap_store_obj_t *l_obj = s_list_begin->data;
assert(l_obj);
if(s_drv_callback.apply_store_obj) {
if(!s_drv_callback.apply_store_obj(l_obj)) {
int l_ret_tmp = s_drv_callback.apply_store_obj(l_obj);
if(l_ret_tmp == 1) {
log_it(L_INFO, "item is missing (may be already deleted) %s/%s\n", l_obj->group, l_obj->key);
l_ret = 1;
}
if(l_ret_tmp < 0) {
log_it(L_ERROR, "Can't write item %s/%s\n", l_obj->group, l_obj->key);
l_ret -= 1;
}
/*if(!s_drv_callback.apply_store_obj(l_obj)) {
//log_it(L_INFO, "Write item Ok %s/%s\n", l_obj->group, l_obj->key);
}
else {
log_it(L_ERROR, "Can't write item %s/%s\n", l_obj->group, l_obj->key);
}
}*/
}
s_list_begin = dap_list_next(s_list_begin);
......@@ -496,10 +505,12 @@ int dap_chain_global_db_driver_appy(pdap_store_obj_t a_store_obj, size_t a_store
for(size_t i = 0; i < a_store_count; i++) {
dap_store_obj_t *l_store_obj_cur = a_store_obj + i;
assert(l_store_obj_cur);
if(!s_drv_callback.apply_store_obj(l_store_obj_cur)) {
//log_it(L_INFO, "Write item Ok %s/%s\n", l_obj->group, l_obj->key);
int l_ret_tmp = s_drv_callback.apply_store_obj(l_store_obj_cur);
if(l_ret_tmp == 1) {
log_it(L_INFO, "item is missing (may be already deleted) %s/%s\n", l_store_obj_cur->group, l_store_obj_cur->key);
l_ret = 1;
}
else {
if(l_ret_tmp < 0) {
log_it(L_ERROR, "Can't write item %s/%s\n", l_store_obj_cur->group, l_store_obj_cur->key);
l_ret -= 1;
}
......@@ -601,3 +612,18 @@ dap_store_obj_t* dap_chain_global_db_driver_read(const char *a_group, const char
l_ret = s_drv_callback.read_store_obj(a_group, a_key, a_count_out);
return l_ret;
}
/**
* Check an element in the database
*
* a_group - group name
* a_key - key name
*/
bool dap_chain_global_db_driver_is(const char *a_group, const char *a_key)
{
bool l_ret = NULL;
// read records using the selected database engine
if(s_drv_callback.is_obj)
l_ret = s_drv_callback.is_obj(a_group, a_key);
return l_ret;
}
......@@ -34,7 +34,7 @@ typedef struct dap_store_obj {
uint8_t type;
char *group;
char *key;
const char *c_group;
//const char *c_group;
const char *c_key;
uint8_t *value;
size_t value_len;
......@@ -51,6 +51,7 @@ typedef dap_store_obj_t* (*dap_db_driver_read_callback_t)(const char *,const cha
typedef dap_store_obj_t* (*dap_db_driver_read_cond_callback_t)(const char *,uint64_t , size_t *);
typedef dap_store_obj_t* (*dap_db_driver_read_last_callback_t)(const char *);
typedef size_t (*dap_db_driver_read_count_callback_t)(const char *,uint64_t);
typedef bool (*dap_db_driver_is_obj_callback_t)(const char *, const char *);
typedef int (*dap_db_driver_callback_t)(void);
typedef struct dap_db_driver_callbacks {
......@@ -59,6 +60,7 @@ typedef struct dap_db_driver_callbacks {
dap_db_driver_read_last_callback_t read_last_store_obj;
dap_db_driver_read_cond_callback_t read_cond_store_obj;
dap_db_driver_read_count_callback_t read_count_store;
dap_db_driver_is_obj_callback_t is_obj;
dap_db_driver_callback_t transaction_start;
dap_db_driver_callback_t transaction_end;
dap_db_driver_callback_t deinit;
......@@ -81,6 +83,7 @@ int dap_chain_global_db_driver_delete(pdap_store_obj_t a_store_obj, size_t a_sto
dap_store_obj_t* dap_chain_global_db_driver_read_last(const char *a_group);
dap_store_obj_t* dap_chain_global_db_driver_cond_read(const char *a_group, uint64_t id, size_t *a_count_out);
dap_store_obj_t* dap_chain_global_db_driver_read(const char *a_group, const char *a_key, size_t *count_out);
bool dap_chain_global_db_driver_is(const char *a_group, const char *a_key);
size_t dap_chain_global_db_driver_count(const char *a_group, uint64_t id);
dap_store_obj_pkt_t *dap_store_packet_multiple(pdap_store_obj_t a_store_obj,
......
......@@ -228,6 +228,7 @@ int dap_db_driver_cdb_init(const char *a_cdb_path, dap_db_driver_callbacks_t *a_
a_drv_callback->read_store_obj = dap_db_driver_cdb_read_store_obj;
a_drv_callback->read_cond_store_obj = dap_db_driver_cdb_read_cond_store_obj;
a_drv_callback->read_count_store = dap_db_driver_cdb_read_count_store;
a_drv_callback->is_obj = dap_db_driver_cdb_is_obj;
a_drv_callback->deinit = dap_db_driver_cdb_deinit;
a_drv_callback->flush = dap_db_driver_cdb_flush;
......@@ -321,6 +322,26 @@ dap_store_obj_t *dap_db_driver_cdb_read_last_store_obj(const char* a_group) {
return l_arg.o;
}
bool dap_db_driver_cdb_is_obj(const char *a_group, const char *a_key)
{
bool l_ret = false;
if(!a_group) {
return false;
}
pcdb_instance l_cdb_i = dap_cdb_get_db_by_group(a_group);
if(!l_cdb_i) {
return false;
}
CDB *l_cdb = l_cdb_i->cdb;
if(a_key) {
int l_vsize;
if(!cdb_is(l_cdb, a_key, (int) dap_strlen(a_key)))
l_ret = true;
}
return l_ret;
}
dap_store_obj_t *dap_db_driver_cdb_read_store_obj(const char *a_group, const char *a_key, size_t *a_count_out) {
if (!a_group) {
return NULL;
......@@ -426,12 +447,14 @@ size_t dap_db_driver_cdb_read_count_store(const char *a_group, uint64_t a_id)
}
pcdb_instance l_cdb_i = dap_cdb_get_db_by_group(a_group);
if(!l_cdb_i) {
return NULL;
return 0;
}
CDB *l_cdb = l_cdb_i->cdb;
CDBSTAT l_cdb_stat;
cdb_stat(l_cdb, &l_cdb_stat);
return (size_t) l_cdb_stat.rnum;
if(a_id > l_cdb_stat.rnum)
return 0;
return (size_t) l_cdb_stat.rnum - a_id + 1;
}
int dap_db_driver_cdb_apply_store_obj(pdap_store_obj_t a_store_obj) {
......@@ -448,7 +471,7 @@ int dap_db_driver_cdb_apply_store_obj(pdap_store_obj_t a_store_obj) {
return -1;
}
if(a_store_obj->type == 'a') {
if(!a_store_obj->key || !a_store_obj->value || !a_store_obj->value_len){
if(!a_store_obj->key) {// || !a_store_obj->value || !a_store_obj->value_len){
return -2;
}
cdb_record l_rec;
......@@ -459,7 +482,9 @@ int dap_db_driver_cdb_apply_store_obj(pdap_store_obj_t a_store_obj) {
offset += sizeof(uint64_t);
dap_cdb_uint_to_hex(l_val + offset, a_store_obj->value_len, sizeof(unsigned long));
offset += sizeof(unsigned long);
memcpy(l_val + offset, a_store_obj->value, a_store_obj->value_len);
if(a_store_obj->value && a_store_obj->value_len){
memcpy(l_val + offset, a_store_obj->value, a_store_obj->value_len);
}
offset += a_store_obj->value_len;
unsigned long l_time = (unsigned long)a_store_obj->timestamp;
dap_cdb_uint_to_hex(l_val + offset, l_time, sizeof(time_t));
......@@ -473,7 +498,8 @@ int dap_db_driver_cdb_apply_store_obj(pdap_store_obj_t a_store_obj) {
DAP_DELETE(l_rec.val);
} else if(a_store_obj->type == 'd') {
if(a_store_obj->key) {
cdb_del(l_cdb_i->cdb, a_store_obj->key, (int)strlen(a_store_obj->key));
if(cdb_del(l_cdb_i->cdb, a_store_obj->key, (int) strlen(a_store_obj->key)) == -3)
ret = 1;
} else {
cdb_destroy(l_cdb_i->cdb);
if (!dap_cdb_init_group(a_store_obj->group, CDB_TRUNC | CDB_PAGEWARMUP)) {
......
......@@ -47,3 +47,4 @@ dap_store_obj_t *dap_db_driver_cdb_read_last_store_obj(const char*);
dap_store_obj_t *dap_db_driver_cdb_read_store_obj(const char*, const char*, size_t*);
size_t dap_db_driver_cdb_read_count_store(const char *a_group, uint64_t a_id);
dap_store_obj_t* dap_db_driver_cdb_read_cond_store_obj(const char*, uint64_t, size_t*);
bool dap_db_driver_cdb_is_obj(const char *a_group, const char *a_key);
......@@ -107,6 +107,7 @@ uint8_t* dap_db_log_pack(dap_global_db_obj_t *a_obj, size_t *a_data_size_out)
l_obj->id = a_obj->id;
l_obj->group = dap_strdup(l_rec.group);
l_obj->key = dap_strdup(l_keys[i]);
l_obj->timestamp = global_db_gr_del_get_timestamp(l_obj->group, l_obj->key);
}
if(l_obj == NULL) {
dap_store_obj_free(l_store_obj, l_count);
......@@ -1292,6 +1293,8 @@ dap_db_log_list_t* dap_db_log_list_start(uint64_t first_id)
dap_db_log_list_t *l_dap_db_log_list = DAP_NEW_Z(dap_db_log_list_t);
size_t l_data_size_out = dap_chain_global_db_driver_count(GROUP_LOCAL_HISTORY, first_id);
if(!l_data_size_out)
return NULL;
// debug
// if(l_data_size_out>11)
// l_data_size_out = 11;
......
......@@ -25,12 +25,12 @@ bool dap_db_history_truncate(void);
// for dap_db_log_list_xxx()
typedef struct dap_db_log_list {
dap_list_t *list_write; // writed list
dap_list_t *list_read; // readed list (inside list_write)
dap_list_t *list_read; // readed list (inside list_write)
bool is_process;
size_t item_start;
size_t item_last;
size_t items_rest;
size_t items_number;
size_t item_start; // first item to read from db
size_t item_last; // last item to read from db
size_t items_rest; // rest items to read from list_read
size_t items_number; // remaining items in list_write after reading from db
pthread_t thread;
pthread_mutex_t list_mutex;
} dap_db_log_list_t;
......
......@@ -106,7 +106,7 @@ uint64_t dap_db_get_cur_node_addr(char *a_net_name)
bool dap_db_log_set_last_id_remote(uint64_t a_node_addr, uint64_t a_id)
{
dap_global_db_obj_t l_objs;
l_objs.key = dap_strdup_printf("%lld", a_node_addr);
l_objs.key = dap_strdup_printf("%ju", a_node_addr);
l_objs.value = (uint8_t*) &a_id;
l_objs.value_len = sizeof(uint64_t);
bool l_ret = dap_chain_global_db_gr_save(&l_objs, 1, GROUP_LOCAL_NODE_LAST_ID);
......@@ -120,7 +120,7 @@ bool dap_db_log_set_last_id_remote(uint64_t a_node_addr, uint64_t a_id)
*/
uint64_t dap_db_log_get_last_id_remote(uint64_t a_node_addr)
{
char *l_node_addr_str = dap_strdup_printf("%lld", a_node_addr);
char *l_node_addr_str = dap_strdup_printf("%ju", a_node_addr);
size_t l_timestamp_len = 0;
uint8_t *l_timestamp = dap_chain_global_db_gr_get((const char*) l_node_addr_str, &l_timestamp_len,
GROUP_LOCAL_NODE_LAST_ID);
......
......@@ -1086,6 +1086,61 @@ int cdb_set2(CDB *db, const char *key, int ksize, const char *val, int vsize, in
}
int cdb_is(CDB *db, const char *key, int ksize)
{
FOFF soffs[SFOFFNUM];
FOFF *offs;
int dupnum, ret = -3;
uint64_t hash;
uint32_t now = time(NULL);
uint32_t lockid;
if (db->rcache) {
char *cval;
cdb_lock_lock(db->rclock);
cval = cdb_ht_get(db->rcache, key, ksize, 0, true);
if (cval) {
db->rchit++;
cdb_lock_unlock(db->rclock);
return 0;
} else {
db->rcmiss++;
if (db->vio == NULL) {
cdb_lock_unlock(db->rclock);
return -3;
}
}
cdb_lock_unlock(db->rclock);
}
offs = soffs;
hash = CDBHASH64(key, ksize);
lockid = (hash >> 24) % db->hsize % MLOCKNUM;
cdb_lock_lock(db->mlock[lockid]);
dupnum = cdb_getoff(db, hash, &offs, CDB_LOCKED);
if (dupnum <= 0) {
cdb_lock_unlock(db->mlock[lockid]);
return -1;
}
else
ret = 0;
cdb_lock_unlock(db->mlock[lockid]);
if (RCOVERFLOW(db))
_cdb_recout(db);
if (offs != soffs)
free(offs);
if (ret < 0)
cdb_seterrno(db, CDB_NOTFOUND, __FILE__, __LINE__);
else {
db->rcmiss++;
cdb_seterrno(db, CDB_SUCCESS, __FILE__, __LINE__);
}
return ret;
}
int cdb_get(CDB *db, const char *key, int ksize, void **val, int *vsize)
{
......
......@@ -225,10 +225,12 @@ void *cdb_ht_get(CDBHASHTABLE *ht, const void *key, int ksize, int *vsize, bool
res = cdb_ht_get3(ht, key, ksize, mtf);
if (res) {
*vsize = res->vsize;
if(vsize)
*vsize = res->vsize;
return cdb_ht_itemval(ht, res);
} else {
*vsize = 0;
if(vsize)
*vsize = 0;
return NULL;
}
}
......
......@@ -155,6 +155,8 @@ int cdb_set(CDB *db, const char *key, int ksize, const char *val, int vsize);
int cdb_set2(CDB *db, const char *key, int ksize, const char *val, int vsize, int opt, int expire);
int cdb_is(CDB *db, const char *key, int ksize);
/* get an record by 'key', the value will be allocated and passed out by 'val', its size is
'vsize'. return 0 if success, or -1 at failure. */
int cdb_get(CDB *db, const char *key, int ksize, void **val, int *vsize);
......