From 95bd2e70b940b38a95570bff285cd717d70956a5 Mon Sep 17 00:00:00 2001 From: Aleksandr Lysikov <lysikov@inbox.ru> Date: Wed, 22 May 2019 21:30:01 +0500 Subject: [PATCH] made a unique key to write to history fixed some bugs --- dap_chain_global_db.c | 99 ++++++++++++++++++++++++++------------ dap_chain_global_db_hist.c | 38 ++++++++++++--- dap_chain_global_db_pvt.c | 12 ++--- 3 files changed, 103 insertions(+), 46 deletions(-) diff --git a/dap_chain_global_db.c b/dap_chain_global_db.c index f7a1fe8..5097093 100755 --- a/dap_chain_global_db.c +++ b/dap_chain_global_db.c @@ -16,8 +16,17 @@ #define LOG_TAG "dap_global_db" // for access from several streams -static pthread_mutex_t ldb_mutex = PTHREAD_MUTEX_INITIALIZER; +static pthread_mutex_t ldb_mutex_ = PTHREAD_MUTEX_INITIALIZER; +static inline void lock() +{ + pthread_mutex_lock(&ldb_mutex_); +} + +static inline void unlock() +{ + pthread_mutex_unlock(&ldb_mutex_); +} // Callback table item typedef struct history_group_item @@ -131,9 +140,9 @@ int dap_chain_global_db_init(dap_config_t * g_config) { const char *a_storage_path = dap_config_get_item_str(g_config, "resources", "dap_global_db_path"); if(a_storage_path){ - pthread_mutex_lock(&ldb_mutex); + lock(); int res = dap_db_init(a_storage_path); - pthread_mutex_unlock(&ldb_mutex); + unlock(); return res; } return -1; @@ -144,15 +153,15 @@ int dap_chain_global_db_init(dap_config_t * g_config) */ void dap_chain_global_db_deinit(void) { - pthread_mutex_lock(&ldb_mutex); + lock(); dap_db_deinit(); + unlock(); history_group_item_t * l_item = NULL, *l_item_tmp = NULL; HASH_ITER(hh, s_history_group_items, l_item, l_item_tmp){ DAP_DELETE(l_item); } s_history_group_items = NULL; - pthread_mutex_unlock(&ldb_mutex); } /** @@ -168,9 +177,9 @@ void* dap_chain_global_db_obj_get(const char *a_key, const char *a_group) size_t query_len = (size_t) snprintf(NULL, 0, "(&(cn=%s)(objectClass=%s))", a_key, a_group); char *query = DAP_NEW_Z_SIZE(char, query_len + 1); //char query[32 + strlen(a_key)]; snprintf(query, query_len + 1, "(&(cn=%s)(objectClass=%s))", a_key, a_group); // objectClass != ou - pthread_mutex_lock(&ldb_mutex); + lock(); dap_store_obj_t *store_data = dap_db_read_data(query, &count); - pthread_mutex_unlock(&ldb_mutex); + unlock(); assert(count <= 1); DAP_DELETE(query); return store_data; @@ -193,9 +202,9 @@ uint8_t * dap_chain_global_db_gr_get(const char *a_key, size_t *a_data_out, cons char *l_query = DAP_NEW_Z_SIZE(char, l_query_len + 1); //char query[32 + strlen(a_key)]; snprintf(l_query, l_query_len + 1, "(&(cn=%s)(objectClass=%s))", a_key, a_group); // objectClass != ou - pthread_mutex_lock(&ldb_mutex); + lock(); pdap_store_obj_t store_data = dap_db_read_data(l_query, &l_count); - pthread_mutex_unlock(&ldb_mutex); + unlock(); if(l_count == 1 && store_data && !strcmp(store_data->key, a_key)) { l_ret_value = (store_data->value) ? DAP_NEW_SIZE(uint8_t, store_data->value_len) : NULL; //ret_value = (store_data->value) ? strdup(store_data->value) : NULL; memcpy(l_ret_value, store_data->value, store_data->value_len); @@ -227,8 +236,9 @@ bool dap_chain_global_db_gr_set(const char *a_key, const void *a_value, size_t a store_data->value_len = (a_value_len == (size_t) -1) ? strlen((const char* ) a_value) : a_value_len; store_data->group = strdup(a_group); store_data->timestamp = time(NULL); - pthread_mutex_lock(&ldb_mutex); + lock(); int l_res = dap_db_add(store_data, 1); + unlock(); // Extract prefix if added successfuly, add history log and call notify callback if present if (!l_res ){ @@ -238,8 +248,11 @@ bool dap_chain_global_db_gr_set(const char *a_key, const void *a_value, size_t a HASH_FIND_STR(s_history_group_items, l_group_prefix, l_history_group_item); if ( l_history_group_item ){ - if ( l_history_group_item->auto_track ) + if ( l_history_group_item->auto_track ){ + lock(); dap_db_history_add('a', store_data, 1); + unlock(); + } if ( l_history_group_item->callback_notify ) l_history_group_item->callback_notify(l_history_group_item->callback_arg, 'a',l_group_prefix,a_group,a_key,a_value,a_value_len); } @@ -248,7 +261,6 @@ bool dap_chain_global_db_gr_set(const char *a_key, const void *a_value, size_t a }else { log_it(L_ERROR,"Save error: %d",l_res); } - pthread_mutex_unlock(&ldb_mutex); DAP_DELETE(store_data); return !l_res; @@ -268,8 +280,9 @@ bool dap_chain_global_db_gr_del(const char *a_key, const char *a_group) pdap_store_obj_t store_data = DAP_NEW_Z_SIZE(dap_store_obj_t, sizeof(struct dap_store_obj)); store_data->key = strdup( a_key); store_data->group = strdup( a_group); - pthread_mutex_lock(&ldb_mutex); + lock(); int l_res = dap_db_delete(store_data, 1); + unlock(); if (!l_res){ // Extract prefix char * l_group_prefix = extract_group_prefix (a_group); @@ -277,15 +290,17 @@ bool dap_chain_global_db_gr_del(const char *a_key, const char *a_group) if ( l_group_prefix ) HASH_FIND_STR(s_history_group_items, l_group_prefix, l_history_group_item); if (l_history_group_item ){ - if ( l_history_group_item->auto_track ) + if(l_history_group_item->auto_track) { + lock(); dap_db_history_add('d', store_data, 1); + unlock(); + } if ( l_history_group_item->callback_notify ) l_history_group_item->callback_notify(l_history_group_item->callback_arg,'d',l_group_prefix,a_group, a_key, NULL, 0); } if ( l_group_prefix ) DAP_DELETE(l_group_prefix); } - pthread_mutex_unlock(&ldb_mutex); DAP_DELETE(store_data); if(!l_res) return true; @@ -309,9 +324,9 @@ dap_global_db_obj_t** dap_chain_global_db_gr_load(const char *a_group, size_t *a snprintf(l_query, l_query_len + 1, "(objectClass=%s)", a_group); size_t count = 0; // Read data - pthread_mutex_lock(&ldb_mutex); + lock(); pdap_store_obj_t store_obj = dap_db_read_data(l_query, &count); - pthread_mutex_unlock(&ldb_mutex); + unlock(); DAP_DELETE(l_query); // Serialization data dap_store_obj_pkt_t *pkt = dap_store_packet_multiple(store_obj, 0, count); @@ -366,28 +381,43 @@ bool dap_chain_global_db_obj_save(void* a_store_data, size_t a_objs_count) dap_store_obj_t* l_obj = l_store_data + i; size_t l_count = 0; char *l_query = dap_strdup_printf("(&(cn=%s)(objectClass=%s))", l_obj->key, l_obj->group); - pthread_mutex_lock(&ldb_mutex); + lock(); dap_store_obj_t *l_read_store_data = dap_db_read_data(l_query, &l_count); - pthread_mutex_unlock(&ldb_mutex); - // don't save obj if (present timestamp) > (new timestamp) - if(l_read_store_data) { + unlock(); + // whether to add a record + if(l_obj->type == 'a' && l_read_store_data) { + // don't save obj if (present timestamp) > (new timestamp) if(l_count == 1 && l_read_store_data->timestamp >= l_obj->timestamp) { // mark to not save l_obj->timestamp = (time_t) -1; // reduce the number of real records l_objs_count--; } - dab_db_free_pdap_store_obj_t(l_read_store_data, l_count); } + // whether to delete a record + else if(l_obj->type == 'd' && !l_read_store_data) { + // mark to not apply because record already deleted + l_obj->timestamp = (time_t) -1; + // reduce the number of real records + l_objs_count--; + } + dab_db_free_pdap_store_obj_t(l_read_store_data, l_count); DAP_DELETE(l_query); } - // save data + // save/delete data if(l_objs_count > 0) { - pthread_mutex_lock(&ldb_mutex); - int l_res = dap_db_add(l_store_data, a_objs_count); + lock(); + int l_res = -1; + //add a record + if(l_store_data->type == 'a') + l_res = dap_db_add(l_store_data, a_objs_count); + //delete a record + if(l_store_data->type == 'd') + l_res = dap_db_delete(l_store_data, a_objs_count); + unlock(); // Extract prefix if added successfuly, add history log and call notify callback if present if (!l_res){ for (size_t i =0; i< l_objs_count; i++ ){ @@ -398,11 +428,14 @@ bool dap_chain_global_db_obj_save(void* a_store_data, size_t a_objs_count) HASH_FIND_STR(s_history_group_items, l_group_prefix, l_history_group_item); if ( l_history_group_item ){ - if ( l_history_group_item->auto_track ) - dap_db_history_add('a', l_store_data, 1); + if ( l_history_group_item->auto_track ){ + lock(); + dap_db_history_add(l_store_data->type, l_store_data, 1); + unlock(); + } if ( l_history_group_item->callback_notify ){ if (l_obj){ - l_history_group_item->callback_notify( l_history_group_item->callback_arg, 'a', + l_history_group_item->callback_notify( l_history_group_item->callback_arg, l_store_data->type, l_group_prefix, l_obj->group , l_obj->key, l_obj->value, l_obj->value_len ); }else { @@ -414,7 +447,6 @@ bool dap_chain_global_db_obj_save(void* a_store_data, size_t a_objs_count) } } - pthread_mutex_unlock(&ldb_mutex); if(!l_res) return true; } @@ -438,8 +470,9 @@ bool dap_chain_global_db_gr_save(dap_global_db_obj_t* a_objs, size_t a_objs_coun store_data_cur->timestamp = l_timestamp; } if(l_store_data) { - pthread_mutex_lock(&ldb_mutex); + lock(); int l_res = dap_db_add(l_store_data, a_objs_count); + unlock(); if (!l_res){ for (size_t i =0; i< a_objs_count; i++ ){ history_group_item_t * l_history_group_item = NULL; @@ -450,8 +483,11 @@ bool dap_chain_global_db_gr_save(dap_global_db_obj_t* a_objs, size_t a_objs_coun HASH_FIND_STR(s_history_group_items, l_group_prefix, l_history_group_item); if ( l_history_group_item ){ - if ( l_history_group_item->auto_track ) + if ( l_history_group_item->auto_track ){ + lock(); dap_db_history_add('a', l_store_data, 1); + unlock(); + } if ( l_history_group_item->callback_notify ){ if (l_obj){ l_history_group_item->callback_notify('a',l_group_prefix, l_obj->group , l_obj->key, @@ -465,7 +501,6 @@ bool dap_chain_global_db_gr_save(dap_global_db_obj_t* a_objs, size_t a_objs_coun } } - pthread_mutex_unlock(&ldb_mutex); DAP_DELETE(l_store_data); //dab_db_free_pdap_store_obj_t(store_data, a_objs_count); if(!l_res) return true; diff --git a/dap_chain_global_db_hist.c b/dap_chain_global_db_hist.c index aa025ef..a84bdf6 100755 --- a/dap_chain_global_db_hist.c +++ b/dap_chain_global_db_hist.c @@ -1,6 +1,7 @@ #include <string.h> #include <stdlib.h> #include <time.h> +#include <pthread.h> #include <dap_common.h> #include <dap_strfuncs.h> @@ -30,10 +31,23 @@ static int dap_db_history_unpack_hist(char *l_str_in, dap_global_db_hist_t *a_re return 1; } -static char* dap_db_history_timestamp() +static char* dap_db_new_history_timestamp() { + static pthread_mutex_t s_mutex = PTHREAD_MUTEX_INITIALIZER; + // get unique key + pthread_mutex_lock(&s_mutex); + static time_t s_last_time = 0; + static uint64_t s_suffix = 0; time_t l_cur_time = time(NULL); - return dap_strdup_printf("%lld", (uint64_t) l_cur_time); + if(s_last_time == l_cur_time) + s_suffix++; + else { + s_suffix = 0; + s_last_time = l_cur_time; + } + char *l_str = dap_strdup_printf("%lld_%lld", (uint64_t) l_cur_time, s_suffix); + pthread_mutex_unlock(&s_mutex); + return l_str; } /** @@ -57,13 +71,24 @@ uint8_t* dap_db_log_pack(dap_global_db_obj_t *a_obj, size_t *a_data_size_out) int i = 0; dap_store_obj_t *l_store_obj = DAP_NEW_Z_SIZE(dap_store_obj_t, l_count * sizeof(dap_store_obj_t)); while(l_keys[i]) { - - dap_store_obj_t *l_obj = (dap_store_obj_t*) dap_chain_global_db_obj_get(l_keys[i], l_rec.group); + dap_store_obj_t *l_obj = NULL; + // add record - read record + if(l_rec.type=='a') + l_obj = (dap_store_obj_t*) dap_chain_global_db_obj_get(l_keys[i], l_rec.group); + // delete record - save only key for record + else if(l_rec.type=='d'){// //section=strdup("kelvin_nodes"); + l_obj = (dap_store_obj_t*)DAP_NEW_Z(dap_store_obj_t); + l_obj->group = dap_strdup(l_rec.group); + l_obj->key = dap_strdup(l_keys[i]); + } if (l_obj == NULL){ dab_db_free_pdap_store_obj_t(l_store_obj, l_count); dap_strfreev(l_keys); return NULL; } + // save record type: 'a' or 'd' + l_obj->type = l_rec.type; + memcpy(l_store_obj + i, l_obj, sizeof(dap_store_obj_t)); DAP_DELETE(l_obj); i++; @@ -118,12 +143,13 @@ bool dap_db_history_add(char a_type, pdap_store_obj_t a_store_obj, size_t a_dap_ dap_store_obj_t l_store_data; // key - timestamp // value - keys of added/deleted data - l_store_data.key = dap_db_history_timestamp(); + l_store_data.key = dap_db_new_history_timestamp(); l_store_data.value = (uint8_t*) strdup(l_str) ; l_store_data.value_len = l_str_len+1; l_store_data.group = GROUP_GLOBAL_HISTORY; l_store_data.timestamp = time(NULL); int l_res = dap_db_add(&l_store_data, 1); + printf("!!!\n!!!HISTORY store save l_res=%d ts=%lld text=%s\n!!!\n",l_res,l_store_data.timestamp,l_str); if(l_rec.keys_count > 1) DAP_DELETE(l_rec.keys); DAP_DELETE(l_str); @@ -216,5 +242,3 @@ void dap_db_log_del_list(dap_list_t *a_list) { dap_list_free_full(a_list, (dap_callback_destroyed_t) dap_chain_global_db_obj_delete); } - - diff --git a/dap_chain_global_db_pvt.c b/dap_chain_global_db_pvt.c index 0f29a49..edd50a8 100755 --- a/dap_chain_global_db_pvt.c +++ b/dap_chain_global_db_pvt.c @@ -111,8 +111,6 @@ int dap_db_init(const char *path) ldb_msg_add_string(msg, "objectClass", "top"); ldb_msg_add_string(msg, "objectClass", "section"); dap_db_add_msg(msg); - talloc_free(msg->dn); - talloc_free(msg); // level 2: groups dap_db_group_create( GROUP_GLOBAL_ADDRS_LEASED); @@ -458,8 +456,8 @@ int dap_db_delete(pdap_store_obj_t store_obj, size_t a_store_count) static size_t dap_db_get_size_pdap_store_obj_t(pdap_store_obj_t store_obj) { - size_t size = sizeof(uint32_t) + 3 * sizeof(uint16_t) + sizeof(size_t) + sizeof(time_t) + strlen(store_obj->group) + - strlen(store_obj->key) + strlen(store_obj->section) + store_obj->value_len; + size_t size = sizeof(uint32_t) + 3 * sizeof(uint16_t) + sizeof(size_t) + sizeof(time_t) + dap_strlen(store_obj->group) + + dap_strlen(store_obj->key) + dap_strlen(store_obj->section) + store_obj->value_len; return size; } @@ -488,9 +486,9 @@ dap_store_obj_pkt_t *dap_store_packet_multiple(pdap_store_obj_t a_store_obj, tim l_offset += sizeof(uint32_t); for( size_t l_q = 0; l_q < a_store_obj_count; ++l_q) { dap_store_obj_t obj = a_store_obj[l_q]; - uint16_t section_size = (uint16_t) strlen(obj.section); - uint16_t group_size = (uint16_t) strlen(obj.group); - uint16_t key_size = (uint16_t) strlen(obj.key); + uint16_t section_size = (uint16_t) dap_strlen(obj.section); + uint16_t group_size = (uint16_t) dap_strlen(obj.group); + uint16_t key_size = (uint16_t) dap_strlen(obj.key); memcpy(l_pkt->data + l_offset, &obj.type, sizeof(int)); l_offset += sizeof(int); memcpy(l_pkt->data + l_offset, §ion_size, sizeof(uint16_t)); -- GitLab