Skip to content
Snippets Groups Projects
Commit 95bd2e70 authored by Aleksandr Lysikov's avatar Aleksandr Lysikov
Browse files

made a unique key to write to history

fixed some bugs
parent 6c26a857
No related branches found
No related tags found
No related merge requests found
......@@ -16,8 +16,17 @@
#define LOG_TAG "dap_global_db"
// for access from several streams
static pthread_mutex_t ldb_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t ldb_mutex_ = PTHREAD_MUTEX_INITIALIZER;
static inline void lock()
{
pthread_mutex_lock(&ldb_mutex_);
}
static inline void unlock()
{
pthread_mutex_unlock(&ldb_mutex_);
}
// Callback table item
typedef struct history_group_item
......@@ -131,9 +140,9 @@ int dap_chain_global_db_init(dap_config_t * g_config)
{
const char *a_storage_path = dap_config_get_item_str(g_config, "resources", "dap_global_db_path");
if(a_storage_path){
pthread_mutex_lock(&ldb_mutex);
lock();
int res = dap_db_init(a_storage_path);
pthread_mutex_unlock(&ldb_mutex);
unlock();
return res;
}
return -1;
......@@ -144,15 +153,15 @@ int dap_chain_global_db_init(dap_config_t * g_config)
*/
void dap_chain_global_db_deinit(void)
{
pthread_mutex_lock(&ldb_mutex);
lock();
dap_db_deinit();
unlock();
history_group_item_t * l_item = NULL, *l_item_tmp = NULL;
HASH_ITER(hh, s_history_group_items, l_item, l_item_tmp){
DAP_DELETE(l_item);
}
s_history_group_items = NULL;
pthread_mutex_unlock(&ldb_mutex);
}
/**
......@@ -168,9 +177,9 @@ void* dap_chain_global_db_obj_get(const char *a_key, const char *a_group)
size_t query_len = (size_t) snprintf(NULL, 0, "(&(cn=%s)(objectClass=%s))", a_key, a_group);
char *query = DAP_NEW_Z_SIZE(char, query_len + 1); //char query[32 + strlen(a_key)];
snprintf(query, query_len + 1, "(&(cn=%s)(objectClass=%s))", a_key, a_group); // objectClass != ou
pthread_mutex_lock(&ldb_mutex);
lock();
dap_store_obj_t *store_data = dap_db_read_data(query, &count);
pthread_mutex_unlock(&ldb_mutex);
unlock();
assert(count <= 1);
DAP_DELETE(query);
return store_data;
......@@ -193,9 +202,9 @@ uint8_t * dap_chain_global_db_gr_get(const char *a_key, size_t *a_data_out, cons
char *l_query = DAP_NEW_Z_SIZE(char, l_query_len + 1); //char query[32 + strlen(a_key)];
snprintf(l_query, l_query_len + 1, "(&(cn=%s)(objectClass=%s))", a_key, a_group); // objectClass != ou
pthread_mutex_lock(&ldb_mutex);
lock();
pdap_store_obj_t store_data = dap_db_read_data(l_query, &l_count);
pthread_mutex_unlock(&ldb_mutex);
unlock();
if(l_count == 1 && store_data && !strcmp(store_data->key, a_key)) {
l_ret_value = (store_data->value) ? DAP_NEW_SIZE(uint8_t, store_data->value_len) : NULL; //ret_value = (store_data->value) ? strdup(store_data->value) : NULL;
memcpy(l_ret_value, store_data->value, store_data->value_len);
......@@ -227,8 +236,9 @@ bool dap_chain_global_db_gr_set(const char *a_key, const void *a_value, size_t a
store_data->value_len = (a_value_len == (size_t) -1) ? strlen((const char* ) a_value) : a_value_len;
store_data->group = strdup(a_group);
store_data->timestamp = time(NULL);
pthread_mutex_lock(&ldb_mutex);
lock();
int l_res = dap_db_add(store_data, 1);
unlock();
// Extract prefix if added successfuly, add history log and call notify callback if present
if (!l_res ){
......@@ -238,8 +248,11 @@ bool dap_chain_global_db_gr_set(const char *a_key, const void *a_value, size_t a
HASH_FIND_STR(s_history_group_items, l_group_prefix, l_history_group_item);
if ( l_history_group_item ){
if ( l_history_group_item->auto_track )
if ( l_history_group_item->auto_track ){
lock();
dap_db_history_add('a', store_data, 1);
unlock();
}
if ( l_history_group_item->callback_notify )
l_history_group_item->callback_notify(l_history_group_item->callback_arg, 'a',l_group_prefix,a_group,a_key,a_value,a_value_len);
}
......@@ -248,7 +261,6 @@ bool dap_chain_global_db_gr_set(const char *a_key, const void *a_value, size_t a
}else {
log_it(L_ERROR,"Save error: %d",l_res);
}
pthread_mutex_unlock(&ldb_mutex);
DAP_DELETE(store_data);
return !l_res;
......@@ -268,8 +280,9 @@ bool dap_chain_global_db_gr_del(const char *a_key, const char *a_group)
pdap_store_obj_t store_data = DAP_NEW_Z_SIZE(dap_store_obj_t, sizeof(struct dap_store_obj));
store_data->key = strdup( a_key);
store_data->group = strdup( a_group);
pthread_mutex_lock(&ldb_mutex);
lock();
int l_res = dap_db_delete(store_data, 1);
unlock();
if (!l_res){
// Extract prefix
char * l_group_prefix = extract_group_prefix (a_group);
......@@ -277,15 +290,17 @@ bool dap_chain_global_db_gr_del(const char *a_key, const char *a_group)
if ( l_group_prefix )
HASH_FIND_STR(s_history_group_items, l_group_prefix, l_history_group_item);
if (l_history_group_item ){
if ( l_history_group_item->auto_track )
if(l_history_group_item->auto_track) {
lock();
dap_db_history_add('d', store_data, 1);
unlock();
}
if ( l_history_group_item->callback_notify )
l_history_group_item->callback_notify(l_history_group_item->callback_arg,'d',l_group_prefix,a_group, a_key, NULL, 0);
}
if ( l_group_prefix )
DAP_DELETE(l_group_prefix);
}
pthread_mutex_unlock(&ldb_mutex);
DAP_DELETE(store_data);
if(!l_res)
return true;
......@@ -309,9 +324,9 @@ dap_global_db_obj_t** dap_chain_global_db_gr_load(const char *a_group, size_t *a
snprintf(l_query, l_query_len + 1, "(objectClass=%s)", a_group);
size_t count = 0;
// Read data
pthread_mutex_lock(&ldb_mutex);
lock();
pdap_store_obj_t store_obj = dap_db_read_data(l_query, &count);
pthread_mutex_unlock(&ldb_mutex);
unlock();
DAP_DELETE(l_query);
// Serialization data
dap_store_obj_pkt_t *pkt = dap_store_packet_multiple(store_obj, 0, count);
......@@ -366,28 +381,43 @@ bool dap_chain_global_db_obj_save(void* a_store_data, size_t a_objs_count)
dap_store_obj_t* l_obj = l_store_data + i;
size_t l_count = 0;
char *l_query = dap_strdup_printf("(&(cn=%s)(objectClass=%s))", l_obj->key, l_obj->group);
pthread_mutex_lock(&ldb_mutex);
lock();
dap_store_obj_t *l_read_store_data = dap_db_read_data(l_query, &l_count);
pthread_mutex_unlock(&ldb_mutex);
// don't save obj if (present timestamp) > (new timestamp)
if(l_read_store_data) {
unlock();
// whether to add a record
if(l_obj->type == 'a' && l_read_store_data) {
// don't save obj if (present timestamp) > (new timestamp)
if(l_count == 1 && l_read_store_data->timestamp >= l_obj->timestamp) {
// mark to not save
l_obj->timestamp = (time_t) -1;
// reduce the number of real records
l_objs_count--;
}
dab_db_free_pdap_store_obj_t(l_read_store_data, l_count);
}
// whether to delete a record
else if(l_obj->type == 'd' && !l_read_store_data) {
// mark to not apply because record already deleted
l_obj->timestamp = (time_t) -1;
// reduce the number of real records
l_objs_count--;
}
dab_db_free_pdap_store_obj_t(l_read_store_data, l_count);
DAP_DELETE(l_query);
}
// save data
// save/delete data
if(l_objs_count > 0) {
pthread_mutex_lock(&ldb_mutex);
int l_res = dap_db_add(l_store_data, a_objs_count);
lock();
int l_res = -1;
//add a record
if(l_store_data->type == 'a')
l_res = dap_db_add(l_store_data, a_objs_count);
//delete a record
if(l_store_data->type == 'd')
l_res = dap_db_delete(l_store_data, a_objs_count);
unlock();
// Extract prefix if added successfuly, add history log and call notify callback if present
if (!l_res){
for (size_t i =0; i< l_objs_count; i++ ){
......@@ -398,11 +428,14 @@ bool dap_chain_global_db_obj_save(void* a_store_data, size_t a_objs_count)
HASH_FIND_STR(s_history_group_items, l_group_prefix, l_history_group_item);
if ( l_history_group_item ){
if ( l_history_group_item->auto_track )
dap_db_history_add('a', l_store_data, 1);
if ( l_history_group_item->auto_track ){
lock();
dap_db_history_add(l_store_data->type, l_store_data, 1);
unlock();
}
if ( l_history_group_item->callback_notify ){
if (l_obj){
l_history_group_item->callback_notify( l_history_group_item->callback_arg, 'a',
l_history_group_item->callback_notify( l_history_group_item->callback_arg, l_store_data->type,
l_group_prefix, l_obj->group , l_obj->key,
l_obj->value, l_obj->value_len );
}else {
......@@ -414,7 +447,6 @@ bool dap_chain_global_db_obj_save(void* a_store_data, size_t a_objs_count)
}
}
pthread_mutex_unlock(&ldb_mutex);
if(!l_res)
return true;
}
......@@ -438,8 +470,9 @@ bool dap_chain_global_db_gr_save(dap_global_db_obj_t* a_objs, size_t a_objs_coun
store_data_cur->timestamp = l_timestamp;
}
if(l_store_data) {
pthread_mutex_lock(&ldb_mutex);
lock();
int l_res = dap_db_add(l_store_data, a_objs_count);
unlock();
if (!l_res){
for (size_t i =0; i< a_objs_count; i++ ){
history_group_item_t * l_history_group_item = NULL;
......@@ -450,8 +483,11 @@ bool dap_chain_global_db_gr_save(dap_global_db_obj_t* a_objs, size_t a_objs_coun
HASH_FIND_STR(s_history_group_items, l_group_prefix, l_history_group_item);
if ( l_history_group_item ){
if ( l_history_group_item->auto_track )
if ( l_history_group_item->auto_track ){
lock();
dap_db_history_add('a', l_store_data, 1);
unlock();
}
if ( l_history_group_item->callback_notify ){
if (l_obj){
l_history_group_item->callback_notify('a',l_group_prefix, l_obj->group , l_obj->key,
......@@ -465,7 +501,6 @@ bool dap_chain_global_db_gr_save(dap_global_db_obj_t* a_objs, size_t a_objs_coun
}
}
pthread_mutex_unlock(&ldb_mutex);
DAP_DELETE(l_store_data); //dab_db_free_pdap_store_obj_t(store_data, a_objs_count);
if(!l_res)
return true;
......
#include <string.h>
#include <stdlib.h>
#include <time.h>
#include <pthread.h>
#include <dap_common.h>
#include <dap_strfuncs.h>
......@@ -30,10 +31,23 @@ static int dap_db_history_unpack_hist(char *l_str_in, dap_global_db_hist_t *a_re
return 1;
}
static char* dap_db_history_timestamp()
static char* dap_db_new_history_timestamp()
{
static pthread_mutex_t s_mutex = PTHREAD_MUTEX_INITIALIZER;
// get unique key
pthread_mutex_lock(&s_mutex);
static time_t s_last_time = 0;
static uint64_t s_suffix = 0;
time_t l_cur_time = time(NULL);
return dap_strdup_printf("%lld", (uint64_t) l_cur_time);
if(s_last_time == l_cur_time)
s_suffix++;
else {
s_suffix = 0;
s_last_time = l_cur_time;
}
char *l_str = dap_strdup_printf("%lld_%lld", (uint64_t) l_cur_time, s_suffix);
pthread_mutex_unlock(&s_mutex);
return l_str;
}
/**
......@@ -57,13 +71,24 @@ uint8_t* dap_db_log_pack(dap_global_db_obj_t *a_obj, size_t *a_data_size_out)
int i = 0;
dap_store_obj_t *l_store_obj = DAP_NEW_Z_SIZE(dap_store_obj_t, l_count * sizeof(dap_store_obj_t));
while(l_keys[i]) {
dap_store_obj_t *l_obj = (dap_store_obj_t*) dap_chain_global_db_obj_get(l_keys[i], l_rec.group);
dap_store_obj_t *l_obj = NULL;
// add record - read record
if(l_rec.type=='a')
l_obj = (dap_store_obj_t*) dap_chain_global_db_obj_get(l_keys[i], l_rec.group);
// delete record - save only key for record
else if(l_rec.type=='d'){// //section=strdup("kelvin_nodes");
l_obj = (dap_store_obj_t*)DAP_NEW_Z(dap_store_obj_t);
l_obj->group = dap_strdup(l_rec.group);
l_obj->key = dap_strdup(l_keys[i]);
}
if (l_obj == NULL){
dab_db_free_pdap_store_obj_t(l_store_obj, l_count);
dap_strfreev(l_keys);
return NULL;
}
// save record type: 'a' or 'd'
l_obj->type = l_rec.type;
memcpy(l_store_obj + i, l_obj, sizeof(dap_store_obj_t));
DAP_DELETE(l_obj);
i++;
......@@ -118,12 +143,13 @@ bool dap_db_history_add(char a_type, pdap_store_obj_t a_store_obj, size_t a_dap_
dap_store_obj_t l_store_data;
// key - timestamp
// value - keys of added/deleted data
l_store_data.key = dap_db_history_timestamp();
l_store_data.key = dap_db_new_history_timestamp();
l_store_data.value = (uint8_t*) strdup(l_str) ;
l_store_data.value_len = l_str_len+1;
l_store_data.group = GROUP_GLOBAL_HISTORY;
l_store_data.timestamp = time(NULL);
int l_res = dap_db_add(&l_store_data, 1);
printf("!!!\n!!!HISTORY store save l_res=%d ts=%lld text=%s\n!!!\n",l_res,l_store_data.timestamp,l_str);
if(l_rec.keys_count > 1)
DAP_DELETE(l_rec.keys);
DAP_DELETE(l_str);
......@@ -216,5 +242,3 @@ void dap_db_log_del_list(dap_list_t *a_list)
{
dap_list_free_full(a_list, (dap_callback_destroyed_t) dap_chain_global_db_obj_delete);
}
......@@ -111,8 +111,6 @@ int dap_db_init(const char *path)
ldb_msg_add_string(msg, "objectClass", "top");
ldb_msg_add_string(msg, "objectClass", "section");
dap_db_add_msg(msg);
talloc_free(msg->dn);
talloc_free(msg);
// level 2: groups
dap_db_group_create( GROUP_GLOBAL_ADDRS_LEASED);
......@@ -458,8 +456,8 @@ int dap_db_delete(pdap_store_obj_t store_obj, size_t a_store_count)
static size_t dap_db_get_size_pdap_store_obj_t(pdap_store_obj_t store_obj)
{
size_t size = sizeof(uint32_t) + 3 * sizeof(uint16_t) + sizeof(size_t) + sizeof(time_t) + strlen(store_obj->group) +
strlen(store_obj->key) + strlen(store_obj->section) + store_obj->value_len;
size_t size = sizeof(uint32_t) + 3 * sizeof(uint16_t) + sizeof(size_t) + sizeof(time_t) + dap_strlen(store_obj->group) +
dap_strlen(store_obj->key) + dap_strlen(store_obj->section) + store_obj->value_len;
return size;
}
......@@ -488,9 +486,9 @@ dap_store_obj_pkt_t *dap_store_packet_multiple(pdap_store_obj_t a_store_obj, tim
l_offset += sizeof(uint32_t);
for( size_t l_q = 0; l_q < a_store_obj_count; ++l_q) {
dap_store_obj_t obj = a_store_obj[l_q];
uint16_t section_size = (uint16_t) strlen(obj.section);
uint16_t group_size = (uint16_t) strlen(obj.group);
uint16_t key_size = (uint16_t) strlen(obj.key);
uint16_t section_size = (uint16_t) dap_strlen(obj.section);
uint16_t group_size = (uint16_t) dap_strlen(obj.group);
uint16_t key_size = (uint16_t) dap_strlen(obj.key);
memcpy(l_pkt->data + l_offset, &obj.type, sizeof(int));
l_offset += sizeof(int);
memcpy(l_pkt->data + l_offset, &section_size, sizeof(uint16_t));
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment