diff --git a/CMakeLists.txt b/CMakeLists.txt index 6ba5ba01588a009b8e19fd9dadadf27ed01d0e1b..3c18deda471cd5aa2eb6246bb2a3b06ecd70edb3 100755 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -6,7 +6,7 @@ file(GLOB DAP_CHAIN_GLOBAL_DB_HDR *.h) add_library(${PROJECT_NAME} STATIC ${DAP_CHAIN_GLOBAL_DB_SRC} ${DAP_CHAIN_GLOBAL_DB_HDR}) -target_link_libraries(dap_chain_global_db dap_core dap_crypto dap_chain dap_chain_crypto ldb talloc tevent) +target_link_libraries(dap_chain_global_db dap_core dap_crypto dap_chain dap_chain_crypto ldb talloc tevent sqlite3) target_include_directories(dap_chain_global_db INTERFACE .) set(${PROJECT_NAME}_DEFINITIONS CACHE INTERNAL "${PROJECT_NAME}: Definitions" FORCE) diff --git a/dap_chain_global_db.c b/dap_chain_global_db.c index 76d669f625b7eb06155a0e938702070db13a9fd9..71b053b8efe5176800210862335b751b3b2565aa 100755 --- a/dap_chain_global_db.c +++ b/dap_chain_global_db.c @@ -3,29 +3,29 @@ #include <stdint.h> #include <pthread.h> #include <time.h> +#include <assert.h> #include "uthash.h" -#include "dap_hash.h" #include "dap_chain_common.h" #include "dap_strfuncs.h" -#include "dap_chain_global_db_pvt.h" +//#include "dap_chain_global_db_pvt.h" #include "dap_chain_global_db_hist.h" #include "dap_chain_global_db.h" #define LOG_TAG "dap_global_db" // for access from several streams -static pthread_mutex_t ldb_mutex_ = PTHREAD_MUTEX_INITIALIZER; +//static pthread_mutex_t ldb_mutex_ = PTHREAD_MUTEX_INITIALIZER; static inline void lock() { - pthread_mutex_lock(&ldb_mutex_); + //pthread_mutex_lock(&ldb_mutex_); } static inline void unlock() { - pthread_mutex_unlock(&ldb_mutex_); + //pthread_mutex_unlock(&ldb_mutex_); } // Callback table item @@ -42,28 +42,27 @@ typedef struct history_group_item // Tacked group callbacks static history_group_item_t * s_history_group_items = NULL; -char * extract_group_prefix (const char * a_group); - +char * extract_group_prefix(const char * a_group); /** * @brief extract_group_prefix * @param a_group * @return */ -char * extract_group_prefix (const char * a_group) +char * extract_group_prefix(const char * a_group) { - char * l_group_prefix = NULL, *l_delimeter ; + char * l_group_prefix = NULL, *l_delimeter; size_t l_group_prefix_size; - l_delimeter = index (a_group,'.'); - if ( l_delimeter == NULL ){ - l_group_prefix = strdup(a_group); - l_group_prefix_size = strlen( l_group_prefix)+1; + l_delimeter = index(a_group, '.'); + if(l_delimeter == NULL) { + l_group_prefix = dap_strdup(a_group); + l_group_prefix_size = dap_strlen(l_group_prefix) + 1; } else { - l_group_prefix_size = (size_t)l_delimeter- (size_t) a_group; - if ( l_group_prefix_size > 1 ) + l_group_prefix_size = (size_t) l_delimeter - (size_t) a_group; + if(l_group_prefix_size > 1) l_group_prefix = strndup(a_group, l_group_prefix_size); } - return l_group_prefix; + return l_group_prefix; } /** @@ -74,9 +73,9 @@ char * extract_group_prefix (const char * a_group) void dap_chain_global_db_add_history_group_prefix(const char * a_group_prefix) { history_group_item_t * l_item = DAP_NEW_Z(history_group_item_t); - snprintf(l_item->prefix,sizeof (l_item->prefix),"%s",a_group_prefix); + snprintf(l_item->prefix, sizeof(l_item->prefix), "%s", a_group_prefix); l_item->auto_track = true; - HASH_ADD_STR(s_history_group_items , prefix, l_item ); + HASH_ADD_STR(s_history_group_items, prefix, l_item); } /** @@ -84,18 +83,19 @@ void dap_chain_global_db_add_history_group_prefix(const char * a_group_prefix) * @param a_group_prefix * @param a_callback */ -void dap_chain_global_db_add_history_callback_notify(const char * a_group_prefix, dap_global_db_obj_callback_notify_t a_callback, void * a_arg) +void dap_chain_global_db_add_history_callback_notify(const char * a_group_prefix, + dap_global_db_obj_callback_notify_t a_callback, void * a_arg) { history_group_item_t * l_item = NULL; HASH_FIND_STR(s_history_group_items, a_group_prefix, l_item); - if ( l_item ){ + if(l_item) { l_item->callback_notify = a_callback; l_item->callback_arg = a_arg; - }else - log_it(L_WARNING, "Can't setup notify callback for groups with prefix %s. Possible not in history track state", a_group_prefix); + } else + log_it(L_WARNING, "Can't setup notify callback for groups with prefix %s. Possible not in history track state", + a_group_prefix); } - /** * Clean struct dap_global_db_obj_t */ @@ -138,14 +138,14 @@ void dap_chain_global_db_objs_delete(dap_global_db_obj_t **objs) */ int dap_chain_global_db_init(dap_config_t * g_config) { - const char *a_storage_path = dap_config_get_item_str(g_config, "resources", "dap_global_db_path"); - if(a_storage_path){ - lock(); - int res = dap_db_init(a_storage_path); - unlock(); - return res; - } - return -1; + const char *l_storage_path = dap_config_get_item_str(g_config, "resources", "dap_global_db_path"); + const char *l_driver_name = dap_config_get_item_str_default(g_config, "resources", "dap_global_db_driver", + "sqlite"); + lock(); + int res = dap_db_driver_init(l_driver_name, l_storage_path); + //int res = dap_db_init(a_storage_path); + unlock(); + return res; } /** @@ -154,10 +154,12 @@ int dap_chain_global_db_init(dap_config_t * g_config) void dap_chain_global_db_deinit(void) { lock(); - dap_db_deinit(); + dap_db_driver_deinit(); + //dap_db_deinit(); unlock(); history_group_item_t * l_item = NULL, *l_item_tmp = NULL; - HASH_ITER(hh, s_history_group_items, l_item, l_item_tmp){ + HASH_ITER(hh, s_history_group_items, l_item, l_item_tmp) + { DAP_DELETE(l_item); } s_history_group_items = NULL; @@ -171,18 +173,23 @@ void dap_chain_global_db_deinit(void) */ void* dap_chain_global_db_obj_get(const char *a_key, const char *a_group) { - size_t count = 0; - if(!a_key) - return NULL; - size_t query_len = (size_t) snprintf(NULL, 0, "(&(cn=%s)(objectClass=%s))", a_key, a_group); - char *query = DAP_NEW_Z_SIZE(char, query_len + 1); //char query[32 + strlen(a_key)]; - snprintf(query, query_len + 1, "(&(cn=%s)(objectClass=%s))", a_key, a_group); // objectClass != ou - lock(); - dap_store_obj_t *store_data = dap_db_read_data(query, &count); - unlock(); - assert(count <= 1); - DAP_DELETE(query); - return store_data; + size_t l_count = 1; + // read one item + dap_store_obj_t *l_store_data = dap_chain_global_db_driver_read(a_group, a_key, &l_count); + return l_store_data; + + /* size_t count = 0; + if(!a_key) + return NULL; + size_t query_len = (size_t) snprintf(NULL, 0, "(&(cn=%s)(objectClass=%s))", a_key, a_group); + char *query = DAP_NEW_Z_SIZE(char, query_len + 1); //char query[32 + strlen(a_key)]; + snprintf(query, query_len + 1, "(&(cn=%s)(objectClass=%s))", a_key, a_group); // objectClass != ou + lock(); + dap_store_obj_t *store_data = dap_db_read_data(query, &count); + unlock(); + assert(count <= 1); + DAP_DELETE(query); + return store_data;*/ } /** @@ -192,28 +199,42 @@ void* dap_chain_global_db_obj_get(const char *a_key, const char *a_group) * @param a_group * @return */ -uint8_t * dap_chain_global_db_gr_get(const char *a_key, size_t *a_data_out, const char *a_group) +uint8_t * dap_chain_global_db_gr_get(const char *a_key, size_t *a_data_len_out, const char *a_group) { uint8_t *l_ret_value = NULL; - size_t l_count = 0; - if(!a_key) - return NULL; - size_t l_query_len =(size_t) snprintf(NULL, 0, "(&(cn=%s)(objectClass=%s))", a_key, a_group); - - char *l_query = DAP_NEW_Z_SIZE(char, l_query_len + 1); //char query[32 + strlen(a_key)]; - snprintf(l_query, l_query_len + 1, "(&(cn=%s)(objectClass=%s))", a_key, a_group); // objectClass != ou - lock(); - pdap_store_obj_t store_data = dap_db_read_data(l_query, &l_count); - unlock(); - if(l_count == 1 && store_data && !strcmp(store_data->key, a_key)) { - l_ret_value = (store_data->value) ? DAP_NEW_SIZE(uint8_t, store_data->value_len) : NULL; //ret_value = (store_data->value) ? strdup(store_data->value) : NULL; - memcpy(l_ret_value, store_data->value, store_data->value_len); - if(a_data_out) - *a_data_out = store_data->value_len; + // read several items, 0 - no limits + if(a_data_len_out) + *a_data_len_out = 0; + dap_store_obj_t *l_store_data = dap_chain_global_db_driver_read(a_group, a_key, a_data_len_out); + if(l_store_data) { + l_ret_value = (l_store_data->value) ? DAP_NEW_SIZE(uint8_t, l_store_data->value_len) : NULL; //ret_value = (store_data->value) ? strdup(store_data->value) : NULL; + memcpy(l_ret_value, l_store_data->value, l_store_data->value_len); + if(a_data_len_out) + *a_data_len_out = l_store_data->value_len; } - dab_db_free_pdap_store_obj_t(store_data, l_count); - DAP_DELETE(l_query); return l_ret_value; + + /*ldb + * uint8_t *l_ret_value = NULL; + size_t l_count = 0; + if(!a_key) + return NULL; + size_t l_query_len =(size_t) snprintf(NULL, 0, "(&(cn=%s)(objectClass=%s))", a_key, a_group); + + char *l_query = DAP_NEW_Z_SIZE(char, l_query_len + 1); //char query[32 + strlen(a_key)]; + snprintf(l_query, l_query_len + 1, "(&(cn=%s)(objectClass=%s))", a_key, a_group); // objectClass != ou + lock(); + pdap_store_obj_t store_data = dap_db_read_data(l_query, &l_count); + unlock(); + if(l_count == 1 && store_data && !strcmp(store_data->key, a_key)) { + l_ret_value = (store_data->value) ? DAP_NEW_SIZE(uint8_t, store_data->value_len) : NULL; //ret_value = (store_data->value) ? strdup(store_data->value) : NULL; + memcpy(l_ret_value, store_data->value, store_data->value_len); + if(a_data_out) + *a_data_out = store_data->value_len; + } + dap_store_obj_free(store_data, l_count); + DAP_DELETE(l_query); + return l_ret_value;*/ } uint8_t * dap_chain_global_db_get(const char *a_key, size_t *a_data_out) @@ -221,45 +242,46 @@ uint8_t * dap_chain_global_db_get(const char *a_key, size_t *a_data_out) return dap_chain_global_db_gr_get(a_key, a_data_out, GROUP_LOCAL_GENERAL); } - /** * Set one entry to base */ bool dap_chain_global_db_gr_set(const char *a_key, const void *a_value, size_t a_value_len, const char *a_group) { pdap_store_obj_t store_data = DAP_NEW_Z_SIZE(dap_store_obj_t, sizeof(struct dap_store_obj)); - store_data->key = strdup(a_key ); - store_data->value = DAP_NEW_Z_SIZE(uint8_t,a_value_len); + store_data->type = 'a'; + store_data->key = dap_strdup(a_key); + store_data->value = DAP_NEW_Z_SIZE(uint8_t, a_value_len); memcpy(store_data->value, a_value, a_value_len); - store_data->value_len = (a_value_len == (size_t) -1) ? strlen((const char* ) a_value) : a_value_len; - store_data->group = strdup(a_group); + store_data->value_len = (a_value_len == (size_t) -1) ? dap_strlen((const char*) a_value) : a_value_len; + store_data->group = dap_strdup(a_group); store_data->timestamp = time(NULL); lock(); - int l_res = dap_db_add(store_data, 1); + int l_res = dap_chain_global_db_driver_add(store_data, 1); unlock(); // Extract prefix if added successfuly, add history log and call notify callback if present - if (!l_res ){ - char * l_group_prefix = extract_group_prefix (a_group); + if(!l_res) { + char * l_group_prefix = extract_group_prefix(a_group); history_group_item_t * l_history_group_item = NULL; - if ( l_group_prefix ) + if(l_group_prefix) HASH_FIND_STR(s_history_group_items, l_group_prefix, l_history_group_item); - if ( l_history_group_item ){ - if ( l_history_group_item->auto_track ){ + if(l_history_group_item) { + if(l_history_group_item->auto_track) { lock(); dap_db_history_add('a', store_data, 1); unlock(); } - if ( l_history_group_item->callback_notify ) - l_history_group_item->callback_notify(l_history_group_item->callback_arg, 'a',l_group_prefix,a_group,a_key,a_value,a_value_len); + if(l_history_group_item->callback_notify) + l_history_group_item->callback_notify(l_history_group_item->callback_arg, 'a', l_group_prefix, a_group, + a_key, a_value, a_value_len); } - if ( l_group_prefix) - DAP_DELETE( l_group_prefix); - }else { - log_it(L_ERROR,"Save error: %d",l_res); + if(l_group_prefix) + DAP_DELETE(l_group_prefix); + } else { + log_it(L_ERROR, "Save error: %d", l_res); } DAP_DELETE(store_data); @@ -278,27 +300,28 @@ bool dap_chain_global_db_gr_del(const char *a_key, const char *a_group) if(!a_key) return NULL; pdap_store_obj_t store_data = DAP_NEW_Z_SIZE(dap_store_obj_t, sizeof(struct dap_store_obj)); - store_data->key = strdup( a_key); - store_data->group = strdup( a_group); + store_data->key = dap_strdup(a_key); + store_data->group = dap_strdup(a_group); lock(); - int l_res = dap_db_delete(store_data, 1); + int l_res = dap_chain_global_db_driver_delete(store_data, 1); unlock(); - if (!l_res){ + if(!l_res) { // Extract prefix - char * l_group_prefix = extract_group_prefix (a_group); + char * l_group_prefix = extract_group_prefix(a_group); history_group_item_t * l_history_group_item = NULL; - if ( l_group_prefix ) + if(l_group_prefix) HASH_FIND_STR(s_history_group_items, l_group_prefix, l_history_group_item); - if (l_history_group_item ){ + if(l_history_group_item) { if(l_history_group_item->auto_track) { lock(); dap_db_history_add('d', store_data, 1); unlock(); } - if ( l_history_group_item->callback_notify ) - l_history_group_item->callback_notify(l_history_group_item->callback_arg,'d',l_group_prefix,a_group, a_key, NULL, 0); + if(l_history_group_item->callback_notify) + l_history_group_item->callback_notify(l_history_group_item->callback_arg, 'd', l_group_prefix, a_group, + a_key, NULL, 0); } - if ( l_group_prefix ) + if(l_group_prefix) DAP_DELETE(l_group_prefix); } DAP_DELETE(store_data); @@ -310,6 +333,41 @@ bool dap_chain_global_db_del(const char *a_key) { return dap_chain_global_db_gr_del(a_key, GROUP_LOCAL_GENERAL); } + + +/** + * Read last item in global_db + * + * @param data_size[out] size of output array + * @return array (note:not Null-terminated string) on NULL in case of an error + */ +dap_store_obj_t* dap_chain_global_db_get_last(const char *a_group) +{ + // Read data + lock(); + dap_store_obj_t *l_store_obj = dap_chain_global_db_driver_read_last(a_group); + unlock(); + return l_store_obj; +} + + + + +/** + * Read the entire database with condition into an array of size bytes + * + * @param data_size[out] size of output array + * @return array (note:not Null-terminated string) on NULL in case of an error + */ +dap_store_obj_t* dap_chain_global_db_cond_load(const char *a_group, uint64_t a_first_id, size_t *a_data_size_out) +{ + // Read data + lock(); + dap_store_obj_t *l_store_obj = dap_chain_global_db_driver_cond_read(a_group, a_first_id, a_data_size_out); + unlock(); + return l_store_obj; +} + /** * Read the entire database into an array of size bytes * @@ -318,45 +376,65 @@ bool dap_chain_global_db_del(const char *a_key) */ dap_global_db_obj_t** dap_chain_global_db_gr_load(const char *a_group, size_t *a_data_size_out) { - size_t l_query_len = (size_t) snprintf(NULL, 0, "(objectClass=%s)", a_group); - char *l_query = DAP_NEW_Z_SIZE(char, l_query_len + 1); - //const char *query = "(objectClass=addr_leased)"; - snprintf(l_query, l_query_len + 1, "(objectClass=%s)", a_group); size_t count = 0; // Read data lock(); - pdap_store_obj_t store_obj = dap_db_read_data(l_query, &count); + dap_store_obj_t *l_store_obj = dap_chain_global_db_driver_read(a_group, NULL, &count); unlock(); - DAP_DELETE(l_query); - // Serialization data - dap_store_obj_pkt_t *pkt = dap_store_packet_multiple(store_obj, 0, count); - dab_db_free_pdap_store_obj_t(store_obj, count); - if(pkt) - { - size_t count_new = 0; - pdap_store_obj_t store_data = dap_store_unpacket(pkt, &count_new); - assert(count_new == count); - //char **data = DAP_NEW_SIZE(char*, (count_new + 1) * sizeof(char*)); - dap_global_db_obj_t **data = DAP_NEW_Z_SIZE(dap_global_db_obj_t*, - (count_new + 1) * sizeof(dap_global_db_obj_t*)); // last item in mass must be zero - for(size_t i = 0; i < count_new; i++) { - pdap_store_obj_t store_data_cur = store_data + i; - assert(store_data_cur); - data[i] = DAP_NEW(dap_global_db_obj_t); - data[i]->key = strdup(store_data_cur->key); - data[i]->value_len = store_data_cur->value_len; - data[i]->value = DAP_NEW_Z_SIZE(uint8_t, store_data_cur->value_len + 1); - memcpy(data[i]->value, store_data_cur->value, store_data_cur->value_len); - } - DAP_DELETE(store_data); - DAP_DELETE(pkt); - if(a_data_size_out) - *a_data_size_out = count_new; - return data; + if(!l_store_obj || !count) + return NULL; + dap_global_db_obj_t **l_data = DAP_NEW_Z_SIZE(dap_global_db_obj_t*, + (count + 1) * sizeof(dap_global_db_obj_t*)); // last item in mass must be zero + for(size_t i = 0; i < count; i++) { + dap_store_obj_t *l_store_obj_cur = l_store_obj + i; + assert(l_store_obj_cur); + l_data[i] = DAP_NEW(dap_global_db_obj_t); + l_data[i]->key = dap_strdup(l_store_obj_cur->key); + l_data[i]->value_len = l_store_obj_cur->value_len; + l_data[i]->value = DAP_NEW_Z_SIZE(uint8_t, l_store_obj_cur->value_len + 1); + memcpy(l_data[i]->value, l_store_obj_cur->value, l_store_obj_cur->value_len); } + dap_store_obj_free(l_store_obj, count); if(a_data_size_out) - *a_data_size_out = 0; - return NULL; + *a_data_size_out = count; + return l_data; + /*size_t l_query_len = (size_t) snprintf(NULL, 0, "(objectClass=%s)", a_group); + char *l_query = DAP_NEW_Z_SIZE(char, l_query_len + 1); + //const char *query = "(objectClass=addr_leased)"; + snprintf(l_query, l_query_len + 1, "(objectClass=%s)", a_group); + size_t count = 0; + // Read data + lock(); + pdap_store_obj_t store_obj = dap_chain_global_db_driver_read(a_group, NULL, &count); + unlock(); + DAP_DELETE(l_query); + // Serialization data + dap_store_obj_pkt_t *pkt = dap_store_packet_multiple(store_obj, 0, count); + dap_store_obj_free(store_obj, count); + if(pkt) + { + size_t count_new = 0; + pdap_store_obj_t store_data = dap_store_unpacket_multiple(pkt, &count_new); + assert(count_new == count); + //char **data = DAP_NEW_SIZE(char*, (count_new + 1) * sizeof(char*)); + dap_global_db_obj_t **data = DAP_NEW_Z_SIZE(dap_global_db_obj_t*, + (count_new + 1) * sizeof(dap_global_db_obj_t*)); // last item in mass must be zero + for(size_t i = 0; i < count_new; i++) { + pdap_store_obj_t store_data_cur = store_data + i; + assert(store_data_cur); + data[i] = DAP_NEW(dap_global_db_obj_t); + data[i]->key = strdup(store_data_cur->key); + data[i]->value_len = store_data_cur->value_len; + data[i]->value = DAP_NEW_Z_SIZE(uint8_t, store_data_cur->value_len + 1); + memcpy(data[i]->value, store_data_cur->value, store_data_cur->value_len); + } + DAP_DELETE(store_data); + DAP_DELETE(pkt); + if(a_data_size_out) + *a_data_size_out = count_new; + return data; + }*/ + } dap_global_db_obj_t** dap_chain_global_db_load(size_t *a_data_size_out) @@ -370,7 +448,7 @@ dap_global_db_obj_t** dap_chain_global_db_load(size_t *a_data_size_out) */ bool dap_chain_global_db_obj_save(void* a_store_data, size_t a_objs_count) { - dap_store_obj_t* l_store_data = (dap_store_obj_t*) a_store_data; +/* dap_store_obj_t* l_store_data = (dap_store_obj_t*) a_store_data; if(l_store_data && a_objs_count > 0) { // real records size_t l_objs_count = a_objs_count; @@ -382,7 +460,7 @@ bool dap_chain_global_db_obj_save(void* a_store_data, size_t a_objs_count) size_t l_count = 0; char *l_query = dap_strdup_printf("(&(cn=%s)(objectClass=%s))", l_obj->key, l_obj->group); lock(); - dap_store_obj_t *l_read_store_data = dap_db_read_data(l_query, &l_count); + dap_store_obj_t *l_read_store_data = dap_chain_global_db_driver_read(l_query, NULL, &l_count); unlock(); // whether to add a record if(l_obj->type == 'a' && l_read_store_data) { @@ -402,57 +480,50 @@ bool dap_chain_global_db_obj_save(void* a_store_data, size_t a_objs_count) // reduce the number of real records l_objs_count--; } - dab_db_free_pdap_store_obj_t(l_read_store_data, l_count); + dap_store_obj_free(l_read_store_data, l_count); DAP_DELETE(l_query); - } + }*/ - // save/delete data - if(l_objs_count > 0) { + // save/delete data + if(!a_objs_count) + return true; - lock(); - int l_res = -1; - //add a record - if(l_store_data->type == 'a') - l_res = dap_db_add(l_store_data, a_objs_count); - //delete a record - if(l_store_data->type == 'd') - l_res = dap_db_delete(l_store_data, a_objs_count); - unlock(); - // Extract prefix if added successfuly, add history log and call notify callback if present - if (!l_res){ - for (size_t i =0; i< l_objs_count; i++ ){ - history_group_item_t * l_history_group_item = NULL; - dap_store_obj_t* l_obj = l_store_data + i; - char * l_group_prefix = extract_group_prefix (l_obj->group ); - if ( l_group_prefix ) - HASH_FIND_STR(s_history_group_items, l_group_prefix, l_history_group_item); - - if ( l_history_group_item ){ - if ( l_history_group_item->auto_track ){ - lock(); - dap_db_history_add(l_store_data->type, l_store_data, 1); - unlock(); - } - if ( l_history_group_item->callback_notify ){ - if (l_obj){ - l_history_group_item->callback_notify( l_history_group_item->callback_arg, l_store_data->type, - l_group_prefix, l_obj->group , l_obj->key, - l_obj->value, l_obj->value_len ); - }else { - break; - } - } + lock(); + int l_res = dap_chain_global_db_driver_appy(a_store_data, a_objs_count); + unlock(); + + // Extract prefix if added successfuly, add history log and call notify callback if present + if(!l_res) { + for(size_t i = 0; i < a_objs_count; i++) { + history_group_item_t * l_history_group_item = NULL; + dap_store_obj_t* l_obj = a_store_data + i; + char * l_group_prefix = extract_group_prefix(l_obj->group); + if(l_group_prefix) + HASH_FIND_STR(s_history_group_items, l_group_prefix, l_history_group_item); + + if(l_history_group_item) { + if(l_history_group_item->auto_track) { + lock(); + dap_db_history_add(l_obj->type, l_obj, 1); + unlock(); + } + if(l_history_group_item->callback_notify) { + if(l_obj) { + l_history_group_item->callback_notify(l_history_group_item->callback_arg, + l_obj->type, + l_group_prefix, l_obj->group, l_obj->key, + l_obj->value, l_obj->value_len); + } else { + break; } - DAP_DELETE( l_group_prefix); } - } - if(!l_res) - return true; + DAP_DELETE(l_group_prefix); } - else - return true; + } + if(!l_res) + return true; return false; } @@ -460,51 +531,56 @@ bool dap_chain_global_db_gr_save(dap_global_db_obj_t* a_objs, size_t a_objs_coun { dap_store_obj_t *l_store_data = DAP_NEW_Z_SIZE(dap_store_obj_t, a_objs_count * sizeof(struct dap_store_obj)); time_t l_timestamp = time(NULL); + char *l_group = dap_strdup(a_group); for(size_t q = 0; q < a_objs_count; ++q) { dap_store_obj_t *store_data_cur = l_store_data + q; dap_global_db_obj_t *a_obj_cur = a_objs + q; store_data_cur->key = a_obj_cur->key; - store_data_cur->group = strdup(a_group); + store_data_cur->group = l_group; store_data_cur->value = a_obj_cur->value; store_data_cur->value_len = a_obj_cur->value_len; store_data_cur->timestamp = l_timestamp; } if(l_store_data) { lock(); - int l_res = dap_db_add(l_store_data, a_objs_count); + int l_res = dap_chain_global_db_driver_add(l_store_data, a_objs_count); unlock(); - if (!l_res){ - for (size_t i =0; i< a_objs_count; i++ ){ + if(!l_res) { + for(size_t i = 0; i < a_objs_count; i++) { history_group_item_t * l_history_group_item = NULL; dap_store_obj_t *l_obj = l_store_data + i; - char * l_group_prefix = extract_group_prefix (l_obj->group ); - if ( l_group_prefix ) + char * l_group_prefix = extract_group_prefix(l_obj->group); + if(l_group_prefix) HASH_FIND_STR(s_history_group_items, l_group_prefix, l_history_group_item); - if ( l_history_group_item ){ - if ( l_history_group_item->auto_track ){ + if(l_history_group_item) { + if(l_history_group_item->auto_track) { lock(); dap_db_history_add('a', l_store_data, 1); unlock(); } - if ( l_history_group_item->callback_notify ){ - if (l_obj){ - l_history_group_item->callback_notify(l_history_group_item->callback_arg,'a',l_group_prefix, l_obj->group , l_obj->key, - l_obj->value, l_obj->value_len ); - }else { - break; - } + if(l_history_group_item->callback_notify) { + if(l_obj) { + l_history_group_item->callback_notify(l_history_group_item->callback_arg, 'a', + l_group_prefix, l_obj->group, l_obj->key, + l_obj->value, l_obj->value_len); + } else { + break; + } } } - DAP_DELETE( l_group_prefix); + DAP_DELETE(l_group_prefix); } } - DAP_DELETE(l_store_data); //dab_db_free_pdap_store_obj_t(store_data, a_objs_count); - if(!l_res) + DAP_DELETE(l_store_data); //dap_store_obj_free(store_data, a_objs_count); + if(!l_res){ + DAP_DELETE(l_group); return true; + } } + DAP_DELETE(l_group); return false; } @@ -520,32 +596,21 @@ bool dap_chain_global_db_save(dap_global_db_obj_t* a_objs, size_t a_objs_count) */ char* dap_chain_global_db_hash(const uint8_t *data, size_t data_size) { - if(!data || data_size <= 0) - return NULL; - dap_chain_hash_fast_t l_hash; - dap_hash_fast( data, data_size, &l_hash); - size_t a_str_max = (sizeof(l_hash.raw) + 1) * 2 + 2; /* heading 0x */ - char *a_str = DAP_NEW_Z_SIZE(char, a_str_max); - size_t hash_len = dap_chain_hash_fast_to_str(&l_hash, a_str, a_str_max); - if(!hash_len) { - DAP_DELETE(a_str); - return NULL; - } - return a_str; + return dap_chain_global_db_driver_hash(data, data_size); } /** -* Parse data from dap_db_log_pack() -* -* return dap_store_obj_t* -*/ + * Parse data from dap_db_log_pack() + * + * return dap_store_obj_t* + */ void* dap_db_log_unpack(const void *a_data, size_t a_data_size, size_t *a_store_obj_count) { const dap_store_obj_pkt_t *l_pkt = (const dap_store_obj_pkt_t*) a_data; - if(!l_pkt || l_pkt->data_size != ( (size_t ) a_data_size - sizeof(dap_store_obj_pkt_t))) + if(!l_pkt || l_pkt->data_size != ((size_t) a_data_size - sizeof(dap_store_obj_pkt_t))) return NULL; size_t l_store_obj_count = 0; - dap_store_obj_t *l_obj = dap_store_unpacket(l_pkt, &l_store_obj_count); + dap_store_obj_t *l_obj = dap_store_unpacket_multiple(l_pkt, &l_store_obj_count); if(a_store_obj_count) *a_store_obj_count = l_store_obj_count; @@ -582,7 +647,7 @@ char* dap_db_log_get_diff(size_t *a_data_size_out) l_keys_vals[i] = l_obj_cur->key; l_keys_vals[i + l_data_size_out] = (char*) l_obj_cur->value; } - if (a_data_size_out) + if(a_data_size_out) *a_data_size_out = l_data_size_out; // last element - NULL (marker) l_keys_vals[l_data_size_out * 2] = NULL; diff --git a/dap_chain_global_db.h b/dap_chain_global_db.h index e0d01fec390c634390f227b6ac4103e3f9e91670..17a9aae76976d9e63ec4e63ad74dc89e060fa155 100755 --- a/dap_chain_global_db.h +++ b/dap_chain_global_db.h @@ -7,6 +7,8 @@ #include "dap_common.h" #include "dap_config.h" #include "dap_list.h" +#include "dap_chain_global_db_driver.h" + #define GROUP_LOCAL_HISTORY "global.history" #define GROUP_LOCAL_NODE_LAST_TS "local.node.last_ts" @@ -44,7 +46,6 @@ void dap_chain_global_db_deinit(void); /** * Setup callbacks and filters */ - void dap_chain_global_db_add_history_group_prefix(const char * a_group_prefix); // Add group prefix that will be tracking all changes void dap_chain_global_db_add_history_callback_notify(const char * a_group_prefix, dap_global_db_obj_callback_notify_t a_callback, void * a_arg); @@ -74,6 +75,8 @@ bool dap_chain_global_db_del(const char *a_key); * @param data_size[out] size of output array * @return array (note:not Null-terminated string) on NULL in case of an error */ +dap_store_obj_t* dap_chain_global_db_get_last(const char *a_group); +dap_store_obj_t* dap_chain_global_db_cond_load(const char *a_group, uint64_t a_first_id, size_t *a_data_size_out); dap_global_db_obj_t** dap_chain_global_db_gr_load(const char *a_group, size_t *a_data_size_out); dap_global_db_obj_t** dap_chain_global_db_load(size_t *a_data_size_out); @@ -99,7 +102,7 @@ uint8_t* dap_db_log_pack(dap_global_db_obj_t *a_obj, size_t *a_data_size_out); // Parse data from dap_db_log_pack() void* dap_db_log_unpack(const void *a_data, size_t a_data_size, size_t *a_store_obj_count); // Get timestamp from dap_db_log_pack() -time_t dap_db_log_unpack_get_timestamp(uint8_t *a_data, size_t a_data_size); +//time_t dap_db_log_unpack_get_timestamp(uint8_t *a_data, size_t a_data_size); // Get last timestamp in log time_t dap_db_log_get_last_timestamp(void); diff --git a/dap_chain_global_db_driver.c b/dap_chain_global_db_driver.c new file mode 100755 index 0000000000000000000000000000000000000000..bb456a1f4595abc27f16b436d07fa2fb7d9fd605 --- /dev/null +++ b/dap_chain_global_db_driver.c @@ -0,0 +1,525 @@ +/* + * Authors: + * Alexander Lysikov <alexander.lysikov@demlabs.net> + * DeM Labs Inc. https://demlabs.net + * Kelvin Project https://github.com/kelvinblockchain + * Copyright (c) 2019 + * All rights reserved. + + This file is part of DAP (Deus Applications Prototypes) the open source project + + DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + DAP is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. + */ + +#include <stddef.h> +#include <errno.h> +#include <stdint.h> +#include <string.h> +#include <pthread.h> + +#include "dap_common.h" +#include "dap_strfuncs.h" +#include "dap_list.h" +#include "dap_hash.h" + +#include "dap_chain_global_db_driver_sqlite.h" +#include "dap_chain_global_db_driver.h" + +#define LOG_TAG "db_driver" + +static char *s_used_driver = NULL; + +static int save_write_buf(void); + +// for write buffer +pthread_mutex_t s_mutex_add_start = PTHREAD_MUTEX_INITIALIZER; +pthread_mutex_t s_mutex_add_end = PTHREAD_MUTEX_INITIALIZER; +//pthread_rwlock_rdlock +// new data in buffer to write +pthread_mutex_t s_mutex_cond = PTHREAD_MUTEX_INITIALIZER; +pthread_cond_t s_cond_add_end; // = PTHREAD_COND_INITIALIZER; +// writing ended +pthread_mutex_t s_mutex_write_end = PTHREAD_MUTEX_INITIALIZER; +pthread_cond_t s_cond_write_end; // = PTHREAD_COND_INITIALIZER; + +dap_list_t *s_list_begin = NULL; +dap_list_t *s_list_end = NULL; + +pthread_t s_write_buf_thread; +volatile static bool s_write_buf_state = 0; +static void* func_write_buf(void * arg); + +static dap_db_driver_callbacks_t s_drv_callback; + +/** + * Select driver + * driver_name may be "ldb", "sqlite" + * + * return 0 OK, <0 Error + */ +int dap_db_driver_init(const char *a_driver_name, const char *a_filename_db) +{ + int l_ret = -1; + if(s_used_driver) + dap_db_driver_deinit(); + s_used_driver = dap_strdup(a_driver_name); + memset(&s_drv_callback, 0, sizeof(dap_db_driver_callbacks_t)); + if(!dap_strcmp(s_used_driver, "ldb")) + l_ret = -1; + if(!dap_strcmp(s_used_driver, "sqlite")) + l_ret = dap_db_driver_sqlite_init(a_filename_db, &s_drv_callback); + if(!l_ret) { + pthread_condattr_t l_condattr; + pthread_condattr_init(&l_condattr); + pthread_condattr_setclock(&l_condattr, CLOCK_MONOTONIC); + pthread_cond_init(&s_cond_add_end, &l_condattr); + pthread_cond_init(&s_cond_write_end, &l_condattr); + // thread for save buffer to database + s_write_buf_state = true; + pthread_create(&s_write_buf_thread, NULL, func_write_buf, NULL); + } + return l_ret; +} + +/** + * Shutting down the db library + */ + +void dap_db_driver_deinit(void) +{ + // wait for close thread + { + pthread_mutex_lock(&s_mutex_cond); + pthread_cond_broadcast(&s_cond_add_end); + pthread_mutex_unlock(&s_mutex_cond); + + s_write_buf_state = false; + pthread_join(s_write_buf_thread, NULL); + } + + //save_write_buf(); + pthread_mutex_lock(&s_mutex_add_end); + pthread_mutex_lock(&s_mutex_add_start); + while(s_list_begin != s_list_end) { + // free memory + dap_store_obj_free((dap_store_obj_t*) s_list_begin->data, 1); + dap_list_free1(s_list_begin); + s_list_begin = dap_list_next(s_list_begin); + } + //dap_store_obj_free((dap_store_obj_t*) s_list_begin->data, 1); + dap_list_free1(s_list_begin); + s_list_begin = s_list_end = NULL; + pthread_mutex_unlock(&s_mutex_add_start); + pthread_mutex_unlock(&s_mutex_add_end); + // deinit driver + if(s_drv_callback.deinit) + s_drv_callback.deinit(); + + pthread_cond_destroy(&s_cond_add_end); + +} + +dap_store_obj_t* dap_store_obj_copy(dap_store_obj_t *a_store_obj, size_t a_store_count) +{ + if(!a_store_obj || !a_store_count) + return NULL; + dap_store_obj_t *l_store_obj = DAP_NEW_SIZE(dap_store_obj_t, sizeof(dap_store_obj_t) * a_store_count); + for(size_t i = 0; i < a_store_count; i++) { + dap_store_obj_t *l_store_obj_dst = l_store_obj + i; + dap_store_obj_t *l_store_obj_src = a_store_obj + i; + memcpy(l_store_obj_dst, l_store_obj_src, sizeof(dap_store_obj_t)); + l_store_obj_dst->group = dap_strdup(l_store_obj_src->group); + l_store_obj_dst->key = dap_strdup(l_store_obj_src->key); + l_store_obj_dst->value = DAP_NEW_SIZE(uint8_t, l_store_obj_dst->value_len); + memcpy(l_store_obj_dst->value, l_store_obj_src->value, l_store_obj_dst->value_len); + } + return l_store_obj; +} + +void dap_store_obj_free(dap_store_obj_t *a_store_obj, size_t a_store_count) +{ + if(!a_store_obj) + return; + for(size_t i = 0; i < a_store_count; i++) { + dap_store_obj_t *l_store_obj_cur = a_store_obj + i; + DAP_DELETE(l_store_obj_cur->group); + DAP_DELETE(l_store_obj_cur->key); + DAP_DELETE(l_store_obj_cur->value); + } + DAP_DELETE(a_store_obj); +} + +static size_t dap_db_get_size_pdap_store_obj_t(pdap_store_obj_t store_obj) +{ + size_t size = sizeof(uint32_t) + 2 * sizeof(uint16_t) + sizeof(size_t) + sizeof(time_t) + dap_strlen(store_obj->group) + + dap_strlen(store_obj->key) + store_obj->value_len; + return size; +} + +/** + * serialization + * @param a_store_obj_count count of structures store_obj + * @param a_timestamp create data time + * @param a_size_out[out] size of output structure + * @return NULL in case of an error + */ +dap_store_obj_pkt_t *dap_store_packet_multiple(pdap_store_obj_t a_store_obj, time_t a_timestamp, size_t a_store_obj_count) +{ + if(!a_store_obj || a_store_obj_count < 1) + return NULL; + size_t l_data_size_out = sizeof(uint32_t); // size of output data + // calculate output structure size + for(size_t l_q = 0; l_q < a_store_obj_count; ++l_q) + l_data_size_out += dap_db_get_size_pdap_store_obj_t(&a_store_obj[l_q]); + + dap_store_obj_pkt_t *l_pkt = DAP_NEW_Z_SIZE(dap_store_obj_pkt_t, sizeof(dap_store_obj_pkt_t) + l_data_size_out); + l_pkt->data_size = l_data_size_out; + l_pkt->timestamp = a_timestamp; + uint64_t l_offset = 0; + uint32_t l_count = (uint32_t) a_store_obj_count; + memcpy(l_pkt->data + l_offset, &l_count, sizeof(uint32_t)); + l_offset += sizeof(uint32_t); + for( size_t l_q = 0; l_q < a_store_obj_count; ++l_q) { + dap_store_obj_t obj = a_store_obj[l_q]; + //uint16_t section_size = (uint16_t) dap_strlen(obj.section); + uint16_t group_size = (uint16_t) dap_strlen(obj.group); + uint16_t key_size = (uint16_t) dap_strlen(obj.key); + memcpy(l_pkt->data + l_offset, &obj.type, sizeof(int)); + l_offset += sizeof(int); + //memcpy(l_pkt->data + l_offset, §ion_size, sizeof(uint16_t)); + //l_offset += sizeof(uint16_t); + //memcpy(l_pkt->data + l_offset, obj.section, section_size); + //l_offset += section_size; + memcpy(l_pkt->data + l_offset, &group_size, sizeof(uint16_t)); + l_offset += sizeof(uint16_t); + memcpy(l_pkt->data + l_offset, obj.group, group_size); + l_offset += group_size; + memcpy(l_pkt->data + l_offset, &obj.timestamp, sizeof(time_t)); + l_offset += sizeof(time_t); + memcpy(l_pkt->data + l_offset, &key_size, sizeof(uint16_t)); + l_offset += sizeof(uint16_t); + memcpy(l_pkt->data + l_offset, obj.key, key_size); + l_offset += key_size; + memcpy(l_pkt->data + l_offset, &obj.value_len, sizeof(size_t)); + l_offset += sizeof(size_t); + memcpy(l_pkt->data + l_offset, obj.value, obj.value_len); + l_offset += obj.value_len; + } + assert(l_data_size_out == l_offset); + return l_pkt; +} +/** + * deserialization + * @param store_obj_count[out] count of the output structures store_obj + * @return NULL in case of an error* + */ + +dap_store_obj_t *dap_store_unpacket_multiple(const dap_store_obj_pkt_t *pkt, size_t *store_obj_count) +{ + if(!pkt || pkt->data_size < 1) + return NULL; + uint64_t offset = 0; + uint32_t count; + memcpy(&count, pkt->data, sizeof(uint32_t)); + offset += sizeof(uint32_t); + dap_store_obj_t *store_obj = DAP_NEW_Z_SIZE(dap_store_obj_t, count * sizeof(struct dap_store_obj)); + for(size_t q = 0; q < count; ++q) { + dap_store_obj_t *obj = store_obj + q; + uint16_t str_size; + memcpy(&obj->type, pkt->data + offset, sizeof(int)); + offset += sizeof(int); + + //memcpy(&str_size, pkt->data + offset, sizeof(uint16_t)); + //offset += sizeof(uint16_t); + //obj->section = DAP_NEW_Z_SIZE(char, str_size + 1); + //memcpy(obj->section, pkt->data + offset, str_size); + //offset += str_size; + + memcpy(&str_size, pkt->data + offset, sizeof(uint16_t)); + offset += sizeof(uint16_t); + obj->group = DAP_NEW_Z_SIZE(char, str_size + 1); + memcpy(obj->group, pkt->data + offset, str_size); + offset += str_size; + + memcpy(&obj->timestamp, pkt->data + offset, sizeof(time_t)); + offset += sizeof(time_t); + + memcpy(&str_size, pkt->data + offset, sizeof(uint16_t)); + offset += sizeof(uint16_t); + obj->key = DAP_NEW_Z_SIZE(char, str_size + 1); + memcpy(obj->key, pkt->data + offset, str_size); + offset += str_size; + + memcpy(&obj->value_len, pkt->data + offset, sizeof(size_t)); + offset += sizeof(size_t); + + obj->value = DAP_NEW_Z_SIZE(uint8_t, obj->value_len + 1); + memcpy(obj->value, pkt->data + offset, obj->value_len); + offset += obj->value_len; + } + assert(pkt->data_size == offset); + if(store_obj_count) + *store_obj_count = count; + return store_obj; +} + +/** + * Calc hash for data + * + * return hash or NULL + */ +char* dap_chain_global_db_driver_hash(const uint8_t *data, size_t data_size) +{ + if(!data || data_size <= 0) + return NULL; + dap_chain_hash_fast_t l_hash; + dap_hash_fast(data, data_size, &l_hash); + size_t a_str_max = (sizeof(l_hash.raw) + 1) * 2 + 2; /* heading 0x */ + char *a_str = DAP_NEW_Z_SIZE(char, a_str_max); + size_t hash_len = dap_chain_hash_fast_to_str(&l_hash, a_str, a_str_max); + if(!hash_len) { + DAP_DELETE(a_str); + return NULL; + } + return a_str; +} + +/** + * Wait data to write buffer + * return 0 - Ok, 1 - timeout + */ +static int wait_data(pthread_mutex_t *a_mutex, pthread_cond_t *a_cond, int l_timeout_ms) +{ + int l_res = 0; + pthread_mutex_lock(a_mutex); + // endless waiting + if(l_timeout_ms == -1) + l_res = pthread_cond_wait(a_cond, a_mutex); + // waiting no more than timeout in milliseconds + else { + struct timespec l_to; + clock_gettime(CLOCK_MONOTONIC, &l_to); + int64_t l_nsec_new = l_to.tv_nsec + l_timeout_ms * 1000000ll; + // if the new number of nanoseconds is more than a second + if(l_nsec_new > (long) 1e9) { + l_to.tv_sec += l_nsec_new / (long) 1e9; + l_to.tv_nsec = l_nsec_new % (long) 1e9; + } + else + l_to.tv_nsec = (long) l_nsec_new; + l_res = pthread_cond_timedwait(a_cond, a_mutex, &l_to); + } + pthread_mutex_unlock(a_mutex); + if(l_res == ETIMEDOUT) + return 1; + return l_res; +} + +// return 0 if buffer empty, 1 data present +static bool check_fill_buf(void) +{ + dap_list_t *l_list_begin; + dap_list_t *l_list_end; + pthread_mutex_lock(&s_mutex_add_start); + pthread_mutex_lock(&s_mutex_add_end); + l_list_end = s_list_end; + l_list_begin = s_list_begin; + pthread_mutex_unlock(&s_mutex_add_end); + pthread_mutex_unlock(&s_mutex_add_start); + + bool l_ret = (l_list_begin != l_list_end) ? 1 : 0; +// if(l_ret) +// printf("** Wait s_beg=0x%x s_end=0x%x \n", l_list_begin, l_list_end); + return l_ret; +} + +// wait apply write buffer +static void wait_write_buf() +{ +// printf("** Start wait data\n"); + // wait data + while(1) { + if(!check_fill_buf()) + break; + if(!wait_data(&s_mutex_write_end, &s_cond_write_end, 50)) + break; + } +// printf("** End wait data\n"); +} + +// save data from buffer to database +static int save_write_buf(void) +{ + dap_list_t *l_list_end; + // fix end of buffer + pthread_mutex_lock(&s_mutex_add_end); + l_list_end = s_list_end; + pthread_mutex_unlock(&s_mutex_add_end); + // save data from begin to fixed end + pthread_mutex_lock(&s_mutex_add_start); + if(s_list_begin != l_list_end) { + if(s_drv_callback.transaction_start) + s_drv_callback.transaction_start(); + int cnt = 0; + while(s_list_begin != l_list_end) { + // apply to database + dap_store_obj_t *l_obj = s_list_begin->data; + assert(l_obj); + if(s_drv_callback.apply_store_obj) { + if(!s_drv_callback.apply_store_obj(l_obj)) { + log_it(L_INFO, "Write item Ok %s/%s\n", l_obj->group, l_obj->key); + } + else { + log_it(L_ERROR, "Can't write item %s/%s\n", l_obj->group, l_obj->key); + } + } + + s_list_begin = dap_list_next(s_list_begin); +// printf("** ap2*record *l_beg=0x%x l_nex=0x%x d_beg=0x%x l_end=0x%x d_end=0x%x sl_end=0x%x\n", s_list_begin, + // s_list_begin->next, s_list_begin->data, l_list_end, l_list_end->data, s_list_end); + + //printf("** free data=0x%x list=0x%x\n", s_list_begin->prev->data, s_list_begin->prev); + // free memory + dap_store_obj_free((dap_store_obj_t*) s_list_begin->prev->data, 1); + dap_list_free1(s_list_begin->prev); + s_list_begin->prev = NULL; + cnt++; + } + if(s_drv_callback.transaction_end) + s_drv_callback.transaction_end(); + //printf("** writing ended cnt=%d\n", cnt); + // writing ended + pthread_mutex_lock(&s_mutex_write_end); + pthread_cond_broadcast(&s_cond_write_end); + pthread_mutex_unlock(&s_mutex_write_end); + } + pthread_mutex_unlock(&s_mutex_add_start); + return 0; +} + +// thread for save data from buffer to database +static void* func_write_buf(void * arg) +{ + while(1) { + if(!s_write_buf_state) + break; + //save_write_buf + if(save_write_buf() == 0) { + if(!s_write_buf_state) + break; + // wait data + wait_data(&s_mutex_cond, &s_cond_add_end, 2000); // 2 sec + } + } + pthread_exit(0); +} + +int dap_chain_global_db_driver_appy(pdap_store_obj_t a_store_obj, size_t a_store_count) +{ + //dap_store_obj_t *l_store_obj = dap_store_obj_copy(a_store_obj, a_store_count); + if(!a_store_obj || !a_store_count) + return -1; + // add all records into write buffer + pthread_mutex_lock(&s_mutex_add_end); + for(size_t i = 0; i < a_store_count; i++) { + dap_store_obj_t *l_store_obj_cur = dap_store_obj_copy(a_store_obj + i, 1); + // first record in buf + if(!s_list_end) { + s_list_end = dap_list_append(s_list_end, l_store_obj_cur); + pthread_mutex_lock(&s_mutex_add_start); + s_list_begin = s_list_end; + pthread_mutex_unlock(&s_mutex_add_start); + //printf("*!!add record=0x%x / 0x%x obj=0x%x / 0x%x\n", s_list_end, s_list_end->data, s_list_end->prev); + } + else + s_list_end->data = l_store_obj_cur; + dap_list_append(s_list_end, NULL); + s_list_end = dap_list_last(s_list_end); + //printf("**+add record l_cur=0x%x / 0x%x l_new=0x%x / 0x%x\n", s_list_end->prev, s_list_end->prev->data,s_list_end, s_list_end->data); + } + // buffer changed + pthread_mutex_lock(&s_mutex_cond); + pthread_cond_broadcast(&s_cond_add_end); + pthread_mutex_unlock(&s_mutex_cond); + + pthread_mutex_unlock(&s_mutex_add_end); + return 0; +} + +int dap_chain_global_db_driver_add(pdap_store_obj_t a_store_obj, size_t a_store_count) +{ + a_store_obj->type = 'a'; + return dap_chain_global_db_driver_appy(a_store_obj, a_store_count); +} + +int dap_chain_global_db_driver_delete(pdap_store_obj_t a_store_obj, size_t a_store_count) +{ + a_store_obj->type = 'd'; + return dap_chain_global_db_driver_appy(a_store_obj, a_store_count); +} + +/** + * Read last items + * + * a_group - group name + */ +dap_store_obj_t* dap_chain_global_db_driver_read_last(const char *a_group) +{ + dap_store_obj_t *l_ret = NULL; + // wait apply write buffer + wait_write_buf(); + // read records using the selected database engine + if(s_drv_callback.read_last_store_obj) + l_ret = s_drv_callback.read_last_store_obj(a_group); + return l_ret; +} + +/** + * Read several items + * + * a_group - group name + * a_key - key name, may by NULL, it means reading the whole group + * a_id - from this id + * a_count_out[in], how many items to read, 0 - no limits + * a_count_out[out], how many items was read + */ +dap_store_obj_t* dap_chain_global_db_driver_cond_read(const char *a_group, uint64_t id, size_t *a_count_out) +{ + dap_store_obj_t *l_ret = NULL; + // wait apply write buffer + wait_write_buf(); + // read records using the selected database engine + if(s_drv_callback.read_cond_store_obj) + l_ret = s_drv_callback.read_cond_store_obj(a_group, id, a_count_out); + return l_ret; +} + +/** + * Read several items + * + * a_group - group name + * a_key - key name, may by NULL, it means reading the whole group + * a_count_out[in], how many items to read, 0 - no limits + * a_count_out[out], how many items was read + */ +dap_store_obj_t* dap_chain_global_db_driver_read(const char *a_group, const char *a_key, size_t *a_count_out) +{ + dap_store_obj_t *l_ret = NULL; + // wait apply write buffer + wait_write_buf(); + // read records using the selected database engine + if(s_drv_callback.read_store_obj) + l_ret = s_drv_callback.read_store_obj(a_group, a_key, a_count_out); + return l_ret; +} diff --git a/dap_chain_global_db_driver.h b/dap_chain_global_db_driver.h new file mode 100755 index 0000000000000000000000000000000000000000..b0e099d41408ceea3d77ba7eab03cedfdb525a5b --- /dev/null +++ b/dap_chain_global_db_driver.h @@ -0,0 +1,86 @@ +/* + * Authors: + * Alexander Lysikov <alexander.lysikov@demlabs.net> + * DeM Labs Inc. https://demlabs.net + * Kelvin Project https://github.com/kelvinblockchain + * Copyright (c) 2019 + * All rights reserved. + + This file is part of DAP (Deus Applications Prototypes) the open source project + + DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + DAP is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. + */ + +#ifndef _GLOBAL_DB_DRIVER_H_ +#define _GLOBAL_DB_DRIVER_H_ + +#include <stddef.h> +#include <stdint.h> +#include "dap_common.h" + +typedef struct dap_store_obj { + uint64_t id; + time_t timestamp; + uint8_t type; + char *group; + char *key; + uint8_t *value; + size_t value_len; +}DAP_ALIGN_PACKED dap_store_obj_t, *pdap_store_obj_t; + +typedef struct dap_store_obj_pkt { + time_t timestamp; + size_t data_size; + uint8_t data[]; +}__attribute__((packed)) dap_store_obj_pkt_t; + +typedef int (*dap_db_driver_write_callback_t)(dap_store_obj_t*); +typedef dap_store_obj_t* (*dap_db_driver_read_callback_t)(const char *,const char *, size_t *); +typedef dap_store_obj_t* (*dap_db_driver_read_cond_callback_t)(const char *,uint64_t , size_t *); +typedef dap_store_obj_t* (*dap_db_driver_read_last_callback_t)(const char *); +typedef int (*dap_db_driver_callback_t)(void); + +typedef struct dap_db_driver_callbacks { + dap_db_driver_write_callback_t apply_store_obj; + dap_db_driver_read_callback_t read_store_obj; + dap_db_driver_read_last_callback_t read_last_store_obj; + dap_db_driver_read_cond_callback_t read_cond_store_obj; + dap_db_driver_callback_t transaction_start; + dap_db_driver_callback_t transaction_end; + dap_db_driver_callback_t deinit; +} dap_db_driver_callbacks_t; + + +int dap_db_driver_init(const char *driver_name, const char *a_filename_db); +void dap_db_driver_deinit(void); + +dap_store_obj_t* dap_store_obj_copy(dap_store_obj_t *a_store_obj, size_t a_store_count); +void dap_store_obj_free(dap_store_obj_t *a_store_obj, size_t a_store_count); + +char* dap_chain_global_db_driver_hash(const uint8_t *data, size_t data_size); + +int dap_chain_global_db_driver_appy(pdap_store_obj_t a_store_obj, size_t a_store_count); +int dap_chain_global_db_driver_add(pdap_store_obj_t a_store_obj, size_t a_store_count); +int dap_chain_global_db_driver_delete(pdap_store_obj_t a_store_obj, size_t a_store_count); +dap_store_obj_t* dap_chain_global_db_driver_read_last(const char *a_group); +dap_store_obj_t* dap_chain_global_db_driver_cond_read(const char *a_group, uint64_t id, size_t *a_count_out); +dap_store_obj_t* dap_chain_global_db_driver_read(const char *a_group, const char *a_key, size_t *count_out); + +dap_store_obj_pkt_t *dap_store_packet_multiple(pdap_store_obj_t a_store_obj, + time_t a_timestamp, size_t a_store_obj_count); +dap_store_obj_t *dap_store_unpacket_multiple(const dap_store_obj_pkt_t *a_pkt, + size_t *a_store_obj_count); + + +#endif //_GLOBAL_DB_DRIVER_H_ diff --git a/dap_chain_global_db_driver_sqlite.c b/dap_chain_global_db_driver_sqlite.c new file mode 100755 index 0000000000000000000000000000000000000000..0a9ad3e311ce13940db012729df929fc44b02048 --- /dev/null +++ b/dap_chain_global_db_driver_sqlite.c @@ -0,0 +1,709 @@ +/* + * Authors: + * Alexander Lysikov <alexander.lysikov@demlabs.net> + * DeM Labs Inc. https://demlabs.net + * Kelvin Project https://github.com/kelvinblockchain + * Copyright (c) 2019 + * All rights reserved. + + This file is part of DAP (Deus Applications Prototypes) the open source project + + DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + DAP is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. + */ + +#include <stddef.h> +#include <string.h> +#include "dap_common.h" +#include "dap_hash.h" +#include "dap_strfuncs.h" +#include "dap_chain_global_db_driver_sqlite.h" + +#define LOG_TAG "db_sqlite" + +static sqlite3 *s_db = NULL; + +// Value of one field in the table +typedef struct _SQLITE_VALUE_ +{ + int32_t len; + char type; + /* + #define SQLITE_INTEGER 1 + #define SQLITE_FLOAT 2 + #define SQLITE_TEXT 3 + #define SQLITE_BLOB 4 + #define SQLITE_NULL 5 + */ + union + { + int val_int; + long long val_int64; + double val_float; + char *val_str; + unsigned char *val_blob; + } val; +} SQLITE_VALUE; + +// Content of one row in the table +typedef struct _SQLITE_ROW_VALUE_ +{ + int count; // number of columns in a row + SQLITE_VALUE *val; // array of field values +} SQLITE_ROW_VALUE; + +static int dap_db_driver_sqlite_exec(sqlite3 *l_db, const char *l_query, char **l_error_message); + +/** + * SQLite library initialization, no thread safe + * + * return 0 if Ok, else error code >0 + */ +int dap_db_driver_sqlite_init(const char *a_filename_db, dap_db_driver_callbacks_t *a_drv_callback) +{ + int l_ret = -1; + if(sqlite3_threadsafe() && !sqlite3_config(SQLITE_CONFIG_SERIALIZED)) + l_ret = sqlite3_initialize(); + if(l_ret != SQLITE_OK) { + log_it(L_ERROR, "Can't init sqlite err=%d", l_ret); + return l_ret; + } + char *l_error_message = NULL; + s_db = dap_db_driver_sqlite_open(a_filename_db, SQLITE_OPEN_READWRITE, &l_error_message); + if(!s_db) { + log_it(L_ERROR, "Can't init sqlite err=%d", l_error_message); + dap_db_driver_sqlite_free(l_error_message); + } + else { + if(!dap_db_driver_sqlite_set_pragma(s_db, "synchronous", "NORMAL")) // 0 | OFF | 1 | NORMAL | 2 | FULL + printf("can't set new synchronous mode\n"); + if(!dap_db_driver_sqlite_set_pragma(s_db, "journal_mode", "OFF")) // DELETE | TRUNCATE | PERSIST | MEMORY | WAL | OFF + printf("can't set new journal mode\n"); + + if(!dap_db_driver_sqlite_set_pragma(s_db, "page_size", "1024")) // DELETE | TRUNCATE | PERSIST | MEMORY | WAL | OFF + printf("can't set page_size\n"); + // *PRAGMA page_size = bytes; // page size DB; it is reasonable to make it equal to the size of the disk cluster 4096 + // *PRAGMA cache_size = -kibibytes; // by default it is equal to 2000 pages of database +// + a_drv_callback->apply_store_obj = dap_db_driver_sqlite_apply_store_obj; + a_drv_callback->read_store_obj = dap_db_driver_sqlite_read_store_obj; + a_drv_callback->read_cond_store_obj = dap_db_driver_sqlite_read_cond_store_obj; + a_drv_callback->read_last_store_obj = dap_db_driver_sqlite_read_last_store_obj; + a_drv_callback->transaction_start = dap_db_driver_sqlite_start_transaction; + a_drv_callback->transaction_end = dap_db_driver_sqlite_end_transaction; + a_drv_callback->deinit = dap_db_driver_sqlite_deinit; + } + return l_ret; +} + +int dap_db_driver_sqlite_deinit(void) +{ + dap_db_driver_sqlite_close(s_db); + s_db = NULL; + return sqlite3_shutdown(); +} + +// additional function for sqlite to convert byte to number +static void byte_to_bin(sqlite3_context *l_context, int a_argc, sqlite3_value **a_argv) +{ + unsigned char *l_text; + if(a_argc != 1) + sqlite3_result_null(l_context); + l_text = (unsigned char *) sqlite3_value_blob(a_argv[0]); + if(l_text && l_text[0]) + { + int l_result = (int) l_text[0]; + sqlite3_result_int(l_context, l_result); + return; + } + sqlite3_result_null(l_context); +} + +/** + * Open SQLite database + * a_filename_utf8 - database file name + * a_flags - database access flags (SQLITE_OPEN_READONLY, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE) + * a_error_message[out] - Error messages (the memory requires deletion via sqlite_free ()) + * + * return: database identifier, NULL when an error occurs. + */ +sqlite3* dap_db_driver_sqlite_open(const char *a_filename_utf8, int a_flags, char **a_error_message) +{ + sqlite3 *l_db = NULL; + + int l_rc = sqlite3_open_v2(a_filename_utf8, &l_db, a_flags | SQLITE_OPEN_FULLMUTEX, NULL); + // if unable to open the database file + if(l_rc == SQLITE_CANTOPEN) { + // try to create database + l_rc = sqlite3_open_v2(a_filename_utf8, &l_db, a_flags | SQLITE_OPEN_FULLMUTEX | SQLITE_OPEN_CREATE, NULL); + } + if(l_rc != SQLITE_OK) + { + if(a_error_message) + *a_error_message = sqlite3_mprintf("Can't open database: %s\n", sqlite3_errmsg(l_db)); + sqlite3_close(l_db); + return NULL; + } + // added user functions + sqlite3_create_function(l_db, "byte_to_bin", 1, SQLITE_UTF8, NULL, &byte_to_bin, NULL, NULL); + return l_db; +} + +/** + * Close the database + */ +void dap_db_driver_sqlite_close(sqlite3 *l_db) +{ + if(l_db) + sqlite3_close(l_db); +} +/* + * Clear the memory allocated via sqlite3_mprintf() + */ +void dap_db_driver_sqlite_free(char *memory) +{ + if(memory) + sqlite3_free(memory); +} + +/** + * Set specific pragma statements + * www.sqlite.org/pragma.html + * + *PRAGMA page_size = bytes; // page size DB; it is reasonable to make it equal to the size of the disk cluster 4096 + *PRAGMA cache_size = -kibibytes; // by default it is equal to 2000 pages of database + *PRAGMA encoding = "UTF-8"; // default = UTF-8 + *PRAGMA foreign_keys = 1; // default = 0 + *PRAGMA journal_mode = DELETE | TRUNCATE | PERSIST | MEMORY | WAL | OFF; + *PRAGMA synchronous = 0 | OFF | 1 | NORMAL | 2 | FULL; + */ +bool dap_db_driver_sqlite_set_pragma(sqlite3 *a_db, char *a_param, char *a_mode) +{ + if(!a_param || !a_mode) + { + printf("[sqlite_set_pragma] err!!! no param or mode\n"); + return false; + } + char *l_str_query = sqlite3_mprintf("PRAGMA %s = %s", a_param, a_mode); + int l_rc = dap_db_driver_sqlite_exec(a_db, l_str_query, NULL); // default synchronous=FULL + sqlite3_free(l_str_query); + if(l_rc == SQLITE_OK) + return true; + return false; +} + +/** + * Execute SQL query to database that does not return data + * + * return 0 if Ok, else error code >0 + */ +static int dap_db_driver_sqlite_exec(sqlite3 *l_db, const char *l_query, char **l_error_message) +{ + char *l_zErrMsg = NULL; + int l_rc = sqlite3_exec(l_db, l_query, NULL, 0, &l_zErrMsg); + //printf("%s\n",l_query); + if(l_rc != SQLITE_OK) + { + if(l_error_message && l_zErrMsg) + *l_error_message = sqlite3_mprintf("SQL error: %s", l_zErrMsg); + if(l_zErrMsg) + sqlite3_free(l_zErrMsg); + return l_rc; + } + if(l_zErrMsg) + sqlite3_free(l_zErrMsg); + return l_rc; +} + +/** + * Create table + * + * return 0 if Ok, else error code + */ +static int dap_db_driver_sqlite_create_group_table(const char *a_table_name) +{ + char *l_error_message = NULL; + if(!s_db || !a_table_name) + return -1; + char *l_query = + dap_strdup_printf( + "create table if not exists '%s'(id INTEGER NOT NULL PRIMARY KEY, key TEXT KEY, hash BLOB, ts INTEGER KEY, value BLOB)", + a_table_name); + if(dap_db_driver_sqlite_exec(s_db, (const char*) l_query, &l_error_message) != SQLITE_OK) + { + log_it(L_ERROR, "Creatу_table : %s\n", l_error_message); + dap_db_driver_sqlite_free(l_error_message); + DAP_DELETE(l_query); + return -1; + } + DAP_DELETE(l_query); + // create unique index - key + l_query = dap_strdup_printf("create unique index if not exists 'idx_key_%s' ON '%s' (key)", a_table_name, + a_table_name); + if(dap_db_driver_sqlite_exec(s_db, (const char*) l_query, &l_error_message) != SQLITE_OK) { + log_it(L_ERROR, "Create unique index : %s\n", l_error_message); + dap_db_driver_sqlite_free(l_error_message); + DAP_DELETE(l_query); + return -1; + } + DAP_DELETE(l_query); + return 0; +} + +/** + * Prepare SQL query for database + * l_query [in] SQL-string with a query to database, example: + * SELECT * FROM data + * SELECT id, sd FROM data LIMIT 300 + * SELECT id, sd FROM data ORDER BY id ASC/DESC + * SELECT * FROM data WHERE time>449464766900000 and time<449464766910000" + * SELECT * FROM data WHERE hex(sd) LIKE '%370%' + * hex(x'0806') -> '08f6' или quote(sd) -> X'08f6' + * substr(x'031407301210361320690000',3,2) -> x'0730' + * + * CAST(substr(sd,5,2) as TEXT) + * additional function of line to number _uint8 + * byte_to_bin(x'ff') -> 255 + */ +static int dap_db_driver_sqlite_query(sqlite3 *db, char *query, sqlite3_stmt **l_res, char **l_error_message) +{ + const char *pzTail; // OUT: Pointer to unused portion of zSql + int l_rc = sqlite3_prepare_v2(db, query, -1, l_res, &pzTail); + if(l_rc != SQLITE_OK) + { + if(l_error_message) + { + const char *zErrMsg = sqlite3_errmsg(db); + if(zErrMsg) + *l_error_message = sqlite3_mprintf("SQL Query error: %s\n", zErrMsg); + } + return l_rc; + } + return l_rc; +} + +/** + * Clear memory after fetching a string + * + * return 0 if Ok, else -1 + */ +static void dap_db_driver_sqlite_row_free(SQLITE_ROW_VALUE *row) +{ + if(row) { + // delete the whole string + sqlite3_free(row->val); + // delete structure + sqlite3_free(row); + } +} + +/** + * Selects the next entry from the result of the query and returns an array + * + * l_res: identifier received in sqlite_query () + * l_row_out [out]: pointer to a column or NULL + * + * return: + * SQLITE_ROW(100) has another row ready + * SQLITE_DONE(101) finished executing, + * SQLITE_CONSTRAINT(19) data is not unique and will not be added + */ +static int dap_db_driver_sqlite_fetch_array(sqlite3_stmt *l_res, SQLITE_ROW_VALUE **l_row_out) +{ + SQLITE_ROW_VALUE *l_row = NULL; + // go to next the string + int l_rc = sqlite3_step(l_res); + if(l_rc == SQLITE_ROW) // SQLITE_ROW(100) or SQLITE_DONE(101) or SQLITE_BUSY(5) + { + int l_iCol; // number of the column in the row + // allocate memory for a row with data + l_row = (SQLITE_ROW_VALUE*) sqlite3_malloc(sizeof(SQLITE_ROW_VALUE)); + int l_count = sqlite3_column_count(l_res); // get the number of columns + // allocate memory for all columns + l_row->val = (SQLITE_VALUE*) sqlite3_malloc(l_count * sizeof(SQLITE_VALUE)); + if(l_row->val) + { + l_row->count = l_count; // number of columns + for(l_iCol = 0; l_iCol < l_row->count; l_iCol++) + { + SQLITE_VALUE *cur_val = l_row->val + l_iCol; + cur_val->len = sqlite3_column_bytes(l_res, l_iCol); // how many bytes will be needed + cur_val->type = sqlite3_column_type(l_res, l_iCol); // field type + if(cur_val->type == SQLITE_INTEGER) + { + cur_val->val.val_int64 = sqlite3_column_int64(l_res, l_iCol); + cur_val->val.val_int = sqlite3_column_int(l_res, l_iCol); + } + else if(cur_val->type == SQLITE_FLOAT) + cur_val->val.val_float = sqlite3_column_double(l_res, l_iCol); + else if(cur_val->type == SQLITE_BLOB) + cur_val->val.val_blob = (unsigned char*) sqlite3_column_blob(l_res, l_iCol); + else if(cur_val->type == SQLITE_TEXT) + cur_val->val.val_str = (char*) sqlite3_column_text(l_res, l_iCol); //sqlite3_mprintf("%s",sqlite3_column_text(l_res,iCol)); + else + cur_val->val.val_str = NULL; + } + } + else + l_row->count = 0; // number of columns + } + if(l_row_out) + *l_row_out = l_row; + else + dap_db_driver_sqlite_row_free(l_row); + return l_rc; +} + +/** + * Clear memory when request processing is complete + */ +static bool dap_db_driver_sqlite_query_free(sqlite3_stmt *l_res) +{ + if(!l_res) + return false; + int rc = sqlite3_finalize(l_res); + if(rc != SQLITE_OK) + return false; + return true; +} + +/** + * Convert the array into a string to save to blob + */ +static char* dap_db_driver_get_string_from_blob(uint8_t *blob, int len) +{ + char *str_out; + int ret; + if(!blob) + return NULL; + str_out = (char*) sqlite3_malloc(len * 2 + 1); + ret = dap_bin2hex((char*) str_out, blob, len); + str_out[len * 2] = 0; + return str_out; + +} + +/** + * Cleaning the database from the deleted data + * + * return 0 if Ok, else error code >0 + */ +int dap_db_driver_sqlite_vacuum(sqlite3 *l_db) +{ + if(!s_db) + return -1; + int l_rc = dap_db_driver_sqlite_exec(l_db, "VACUUM", NULL); + return l_rc; +} + +/** + * Start a transaction + */ +int dap_db_driver_sqlite_start_transaction(void) +{ + if(s_db) + { + if(SQLITE_OK == dap_db_driver_sqlite_exec(s_db, "BEGIN", NULL)) + return 0; + else + return -1; + } + else + return -1; +} + +/** + * End of transaction + */ +int dap_db_driver_sqlite_end_transaction(void) +{ + if(s_db) + { + if(SQLITE_OK == dap_db_driver_sqlite_exec(s_db, "COMMIT", NULL)) + return 0; + else + return -1; + } + else + return -1; +} + +/** + * Apply data (write or delete) + * + */ +int dap_db_driver_sqlite_apply_store_obj(dap_store_obj_t *a_store_obj) +{ + if(!a_store_obj || !a_store_obj->group) + return -1; + char *l_query = NULL; + char *l_error_message = NULL; + if(a_store_obj->type == 'a') { + if(!a_store_obj->key || !a_store_obj->value || !a_store_obj->value_len) + return -1; + dap_chain_hash_fast_t l_hash; + dap_hash_fast(a_store_obj->value, a_store_obj->value_len, &l_hash); + + char *l_blob_hash = dap_db_driver_get_string_from_blob((uint8_t*) &l_hash, sizeof(dap_chain_hash_fast_t)); + char *l_blob_value = dap_db_driver_get_string_from_blob(a_store_obj->value, a_store_obj->value_len); + //add one record + l_query = sqlite3_mprintf("insert into '%s' values(NULL, '%s', x'%s', '%lld', x'%s')", + a_store_obj->group, a_store_obj->key, l_blob_hash, a_store_obj->timestamp, l_blob_value); + dap_db_driver_sqlite_free(l_blob_hash); + dap_db_driver_sqlite_free(l_blob_value); + } + else if(a_store_obj->type == 'd') { + //delete one record + if(a_store_obj->key) + l_query = sqlite3_mprintf("delete from '%s' where key = '%s'", + a_store_obj->group, a_store_obj->key); + // remove all group + else + l_query = sqlite3_mprintf("drop table if exists '%s'", a_store_obj->group); + } + else { + log_it(L_ERROR, "Unknown store_obj type '0x%x'", a_store_obj->type); + return -1; + } + int l_ret = dap_db_driver_sqlite_exec(s_db, l_query, NULL); + // missing database + if(l_ret == SQLITE_ERROR) { + // create table + dap_db_driver_sqlite_create_group_table(a_store_obj->group); + l_ret = dap_db_driver_sqlite_exec(s_db, l_query, &l_error_message); + } + dap_db_driver_sqlite_free(l_query); + // entry with the same hash is already present + if(l_ret == SQLITE_CONSTRAINT) { + log_it(L_INFO, "Entry with the same key is already present, %s", l_error_message); + dap_db_driver_sqlite_free(l_error_message); + return 0; + } + if(l_ret != SQLITE_OK) + { + log_it(L_ERROR, "sqlite apply error: %s", l_error_message); + dap_db_driver_sqlite_free(l_error_message); + return -1; + } + return 0; +} + +static void fill_one_item(const char *a_group, dap_store_obj_t *a_obj, SQLITE_ROW_VALUE *a_row) +{ + a_obj->group = dap_strdup(a_group); + + for(int l_iCol = 0; l_iCol < a_row->count; l_iCol++) { + SQLITE_VALUE *l_cur_val = a_row->val + l_iCol; + switch (l_iCol) { + case 0: + if(l_cur_val->type == SQLITE_INTEGER) + a_obj->id = l_cur_val->val.val_int64; + break; // id + case 1: + if(l_cur_val->type == SQLITE_INTEGER) + a_obj->timestamp = l_cur_val->val.val_int64; + break; // ts + case 2: + if(l_cur_val->type == SQLITE_TEXT) + a_obj->key = dap_strdup(l_cur_val->val.val_str); + break; // key + case 3: + if(l_cur_val->type == SQLITE_BLOB) + { + a_obj->value_len = (size_t) l_cur_val->len; + a_obj->value = DAP_NEW_SIZE(uint8_t, a_obj->value_len); + memcpy(a_obj->value, l_cur_val->val.val_blob, a_obj->value_len); + } + break; // value + } + } + +} + +/** + * Read last items + * + * a_group - group name + */ +dap_store_obj_t* dap_db_driver_sqlite_read_last_store_obj(const char *a_group) +{ + dap_store_obj_t *l_obj = NULL; + char *l_error_message = NULL; + sqlite3_stmt *l_res; + if(!a_group) + return NULL; + char *l_str_query = sqlite3_mprintf("SELECT id,ts,key,value FROM '%s' ORDER BY id DESC LIMIT 1", a_group); + int l_ret = dap_db_driver_sqlite_query(s_db, l_str_query, &l_res, &l_error_message); + sqlite3_free(l_str_query); + if(l_ret != SQLITE_OK) { + log_it(L_ERROR, "read last l_ret=%d, %s\n", sqlite3_errcode(s_db), sqlite3_errmsg(s_db)); + dap_db_driver_sqlite_free(l_error_message); + return NULL; + } + + SQLITE_ROW_VALUE *l_row = NULL; + l_ret = dap_db_driver_sqlite_fetch_array(l_res, &l_row); + if(l_ret != SQLITE_ROW && l_ret != SQLITE_DONE) + { + log_it(L_ERROR, "read l_ret=%d, %s\n", sqlite3_errcode(s_db), sqlite3_errmsg(s_db)); + } + if(l_ret == SQLITE_ROW && l_row) { + l_obj = DAP_NEW_Z(dap_store_obj_t); + fill_one_item(a_group, l_obj, l_row); + } + dap_db_driver_sqlite_row_free(l_row); + dap_db_driver_sqlite_query_free(l_res); + + return l_obj; +} + +/** + * Read several items with conditoin + * + * a_group - group name + * a_id - read from this id + * a_count_out[in], how many items to read, 0 - no limits + * a_count_out[out], how many items was read + */ +dap_store_obj_t* dap_db_driver_sqlite_read_cond_store_obj(const char *a_group, uint64_t a_id, size_t *a_count_out) +{ + dap_store_obj_t *l_obj = NULL; + char *l_error_message = NULL; + sqlite3_stmt *l_res; + if(!a_group) + return NULL; + // no limit + int l_count_out = 0; + if(a_count_out) + l_count_out = *a_count_out; + char *l_str_query; + if(l_count_out) + l_str_query = sqlite3_mprintf("SELECT id,ts,key,value FROM '%s' WHERE id>'%lld' ORDER BY id ASC LIMIT %d", + a_group, a_id, l_count_out); + else + l_str_query = sqlite3_mprintf("SELECT id,ts,key,value FROM '%s' WHERE id>'%lld' ORDER BY id ASC", + a_group, a_id); + int l_ret = dap_db_driver_sqlite_query(s_db, l_str_query, &l_res, &l_error_message); + sqlite3_free(l_str_query); + if(l_ret != SQLITE_OK) { + log_it(L_ERROR, "read l_ret=%d, %s\n", sqlite3_errcode(s_db), sqlite3_errmsg(s_db)); + dap_db_driver_sqlite_free(l_error_message); + return NULL; + } + + //int b = qlite3_column_count(s_db); + SQLITE_ROW_VALUE *l_row = NULL; + l_count_out = 0; + int l_count_sized = 0; + do { + l_ret = dap_db_driver_sqlite_fetch_array(l_res, &l_row); + if(l_ret != SQLITE_ROW && l_ret != SQLITE_DONE) + { + log_it(L_ERROR, "read l_ret=%d, %s\n", sqlite3_errcode(s_db), sqlite3_errmsg(s_db)); + } + if(l_ret == SQLITE_ROW && l_row) { + // realloc memory + if(l_count_out >= l_count_sized) { + l_count_sized += 10; + l_obj = DAP_REALLOC(l_obj, sizeof(dap_store_obj_t) * l_count_sized); + memset(l_obj + l_count_out, 0, sizeof(dap_store_obj_t) * (l_count_sized - l_count_out)); + } + // fill current item + dap_store_obj_t *l_obj_cur = l_obj + l_count_out; + fill_one_item(a_group, l_obj_cur, l_row); + l_count_out++; + } + dap_db_driver_sqlite_row_free(l_row); + } while(l_row); + + dap_db_driver_sqlite_query_free(l_res); + + if(a_count_out) + *a_count_out = l_count_out; + return l_obj; +} + +/** + * Read several items + * + * a_group - group name + * a_key - key name, may by NULL, it means reading the whole group + * a_count_out[in], how many items to read, 0 - no limits + * a_count_out[out], how many items was read + */ +dap_store_obj_t* dap_db_driver_sqlite_read_store_obj(const char *a_group, const char *a_key, size_t *a_count_out) +{ + dap_store_obj_t *l_obj = NULL; + char *l_error_message = NULL; + sqlite3_stmt *l_res; + if(!a_group) + return NULL; + // no limit + int l_count_out = 0; + if(a_count_out) + l_count_out = *a_count_out; + char *l_str_query; + if(a_key) { + if(l_count_out) + l_str_query = sqlite3_mprintf("SELECT id,ts,key,value FROM '%s' WHERE key='%s' ORDER BY id ASC LIMIT %d", + a_group, a_key, l_count_out); + else + l_str_query = sqlite3_mprintf("SELECT id,ts,key,value FROM '%s' WHERE key='%s' ORDER BY id ASC", + a_group, a_key); + } + else { + if(l_count_out) + l_str_query = sqlite3_mprintf("SELECT id,ts,key,value FROM '%s' ORDER BY id ASC LIMIT %d", + a_group, l_count_out); + else + l_str_query = sqlite3_mprintf("SELECT id,ts,key,value FROM '%s' ORDER BY id ASC", a_group); + } + int l_ret = dap_db_driver_sqlite_query(s_db, l_str_query, &l_res, &l_error_message); + sqlite3_free(l_str_query); + if(l_ret != SQLITE_OK) { + log_it(L_ERROR, "read l_ret=%d, %s\n", sqlite3_errcode(s_db), sqlite3_errmsg(s_db)); + dap_db_driver_sqlite_free(l_error_message); + return NULL; + } + + //int b = qlite3_column_count(s_db); + SQLITE_ROW_VALUE *l_row = NULL; + l_count_out = 0; + int l_count_sized = 0; + do { + l_ret = dap_db_driver_sqlite_fetch_array(l_res, &l_row); + if(l_ret != SQLITE_ROW && l_ret != SQLITE_DONE) + { + log_it(L_ERROR, "read l_ret=%d, %s\n", sqlite3_errcode(s_db), sqlite3_errmsg(s_db)); + } + if(l_ret == SQLITE_ROW && l_row) { + // realloc memory + if(l_count_out >= l_count_sized) { + l_count_sized += 10; + l_obj = DAP_REALLOC(l_obj, sizeof(dap_store_obj_t) * l_count_sized); + memset(l_obj + l_count_out, 0, sizeof(dap_store_obj_t) * (l_count_sized - l_count_out)); + } + // fill currrent item + dap_store_obj_t *l_obj_cur = l_obj + l_count_out; + fill_one_item(a_group, l_obj_cur, l_row); + l_count_out++; + } + dap_db_driver_sqlite_row_free(l_row); + } while(l_row); + + dap_db_driver_sqlite_query_free(l_res); + + if(a_count_out) + *a_count_out = l_count_out; + return l_obj; +} diff --git a/dap_chain_global_db_driver_sqlite.h b/dap_chain_global_db_driver_sqlite.h new file mode 100755 index 0000000000000000000000000000000000000000..202bf7fc0e2ca50c45bea2bb8ddbf9d9d8b118b9 --- /dev/null +++ b/dap_chain_global_db_driver_sqlite.h @@ -0,0 +1,49 @@ +/* + * Authors: + * Alexander Lysikov <alexander.lysikov@demlabs.net> + * DeM Labs Inc. https://demlabs.net + * Kelvin Project https://github.com/kelvinblockchain + * Copyright (c) 2019 + * All rights reserved. + + This file is part of DAP (Deus Applications Prototypes) the open source project + + DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + DAP is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "sqlite3.h" +#include "dap_chain_global_db_driver.h" + +int dap_db_driver_sqlite_init(const char *a_filename_db, dap_db_driver_callbacks_t *a_drv_callback); +int dap_db_driver_sqlite_deinit(void); + +sqlite3* dap_db_driver_sqlite_open(const char *a_filename_utf8, int a_flags, char **error_message); +void dap_db_driver_sqlite_close(sqlite3 *l_db); +void dap_db_driver_sqlite_free(char *memory); +bool dap_db_driver_sqlite_set_pragma(sqlite3 *a_db, char *a_param, char *a_mode); + + +// ** SQLite callbacks ** + +// Start a transaction +int dap_db_driver_sqlite_start_transaction(void); +// End of transaction +int dap_db_driver_sqlite_end_transaction(void); + +// Apply data (write or delete) +int dap_db_driver_sqlite_apply_store_obj(dap_store_obj_t *a_store_obj); +// Read data +dap_store_obj_t* dap_db_driver_sqlite_read_last_store_obj(const char *a_group); +dap_store_obj_t* dap_db_driver_sqlite_read_cond_store_obj(const char *a_group, uint64_t a_id, size_t *a_count_out); +dap_store_obj_t* dap_db_driver_sqlite_read_store_obj(const char *a_group, const char *a_key, size_t *a_count_out); diff --git a/dap_chain_global_db_hist.c b/dap_chain_global_db_hist.c index eefc4778a64f628487c087eefa3ae82a4f85b268..b3704aa2c6f29b8ddeb944bf9e0afa5d79823b27 100755 --- a/dap_chain_global_db_hist.c +++ b/dap_chain_global_db_hist.c @@ -24,7 +24,7 @@ static int dap_db_history_unpack_hist(char *l_str_in, dap_global_db_hist_t *a_re if(l_count != 4) return -1; a_rec_out->type = l_strv[0][0]; - a_rec_out->keys_count = strtoul(l_strv[1], NULL,10); + a_rec_out->keys_count = strtoul(l_strv[1], NULL, 10); a_rec_out->group = dap_strdup(l_strv[2]); a_rec_out->keys = dap_strdup(l_strv[3]); dap_strfreev(l_strv); @@ -73,16 +73,16 @@ uint8_t* dap_db_log_pack(dap_global_db_obj_t *a_obj, size_t *a_data_size_out) while(l_keys[i]) { dap_store_obj_t *l_obj = NULL; // add record - read record - if(l_rec.type=='a') + if(l_rec.type == 'a') l_obj = (dap_store_obj_t*) dap_chain_global_db_obj_get(l_keys[i], l_rec.group); // delete record - save only key for record - else if(l_rec.type=='d'){// //section=strdup("kelvin_nodes"); - l_obj = (dap_store_obj_t*)DAP_NEW_Z(dap_store_obj_t); + else if(l_rec.type == 'd') { // //section=strdup("kelvin_nodes"); + l_obj = (dap_store_obj_t*) DAP_NEW_Z(dap_store_obj_t); l_obj->group = dap_strdup(l_rec.group); l_obj->key = dap_strdup(l_keys[i]); } - if (l_obj == NULL){ - dab_db_free_pdap_store_obj_t(l_store_obj, l_count); + if(l_obj == NULL) { + dap_store_obj_free(l_store_obj, l_count); dap_strfreev(l_keys); return NULL; } @@ -96,7 +96,7 @@ uint8_t* dap_db_log_pack(dap_global_db_obj_t *a_obj, size_t *a_data_size_out) // serialize data dap_store_obj_pkt_t *l_data_out = dap_store_packet_multiple(l_store_obj, l_timestamp, l_count); - dab_db_free_pdap_store_obj_t(l_store_obj, l_count); + dap_store_obj_free(l_store_obj, l_count); dap_strfreev(l_keys); if(l_data_out && a_data_size_out) { @@ -106,8 +106,6 @@ uint8_t* dap_db_log_pack(dap_global_db_obj_t *a_obj, size_t *a_data_size_out) } - - /** * Add data to the history log */ @@ -125,7 +123,7 @@ bool dap_db_history_add(char a_type, pdap_store_obj_t a_store_obj, size_t a_dap_ l_rec.keys = a_store_obj->key; else { // make keys vector - char **l_keys = DAP_NEW_Z_SIZE(char*, sizeof(char*) * ( ((size_t) a_dap_store_count) + 1)); + char **l_keys = DAP_NEW_Z_SIZE(char*, sizeof(char*) * (((size_t ) a_dap_store_count) + 1)); size_t i; for(i = 0; i < a_dap_store_count; i++) { // if it is marked, the data has not been saved @@ -144,12 +142,11 @@ bool dap_db_history_add(char a_type, pdap_store_obj_t a_store_obj, size_t a_dap_ // key - timestamp // value - keys of added/deleted data l_store_data.key = dap_db_new_history_timestamp(); - l_store_data.value = (uint8_t*) strdup(l_str) ; - l_store_data.value_len = l_str_len+1; + l_store_data.value = (uint8_t*) strdup(l_str); + l_store_data.value_len = l_str_len + 1; l_store_data.group = GROUP_LOCAL_HISTORY; l_store_data.timestamp = time(NULL); - int l_res = dap_db_add(&l_store_data, 1); - //printf("!!!\n!!!HISTORY store save l_res=%d ts=%lld text=%s\n!!!\n",l_res,l_store_data.timestamp,l_str); + int l_res = dap_chain_global_db_driver_add(&l_store_data, 1); if(l_rec.keys_count > 1) DAP_DELETE(l_rec.keys); DAP_DELETE(l_str); @@ -172,22 +169,27 @@ bool dap_db_history_truncate(void) */ time_t dap_db_log_get_last_timestamp(void) { - char *last_key = NULL; - size_t l_data_size_out = 0; - dap_global_db_obj_t **l_objs = dap_chain_global_db_gr_load(GROUP_LOCAL_HISTORY, &l_data_size_out); - if(l_data_size_out > 0) - last_key = l_objs[0]->key; - for(size_t i = 1; i < l_data_size_out; i++) { - dap_global_db_obj_t *l_obj_cur = l_objs[i]; - if(strcmp(last_key, l_obj_cur->key) < 0) { - last_key = l_obj_cur->key; - //printf("l_obj_cur->key=%s last_key\n", l_obj_cur->key); - } - //printf("l_obj_cur->key=%s\n", l_obj_cur->key); + //dap_store_obj_t *l_last_obj = dap_chain_global_db_driver_read_last( + dap_store_obj_t *l_last_obj = dap_chain_global_db_get_last(GROUP_LOCAL_HISTORY); + if(l_last_obj) { + return l_last_obj->id; } - time_t l_ret_time = last_key? strtoll(last_key, NULL, 10): 0; - dap_chain_global_db_objs_delete(l_objs); - return l_ret_time; + /* char *last_key = NULL; + size_t l_data_size_out = 0; + dap_global_db_obj_t **l_objs = dap_chain_global_db_gr_load(GROUP_LOCAL_HISTORY, &l_data_size_out); + if(l_data_size_out > 0) + last_key = l_objs[0]->key; + for(size_t i = 1; i < l_data_size_out; i++) { + dap_global_db_obj_t *l_obj_cur = l_objs[i]; + if(strcmp(last_key, l_obj_cur->key) < 0) { + last_key = l_obj_cur->key; + //printf("l_obj_cur->key=%s last_key\n", l_obj_cur->key); + } + //printf("l_obj_cur->key=%s\n", l_obj_cur->key); + } + time_t l_ret_time = last_key? strtoll(last_key, NULL, 10): 0; + dap_chain_global_db_objs_delete(l_objs); + return l_ret_time;*/ } static int compare_items(const void * l_a, const void * l_b) @@ -204,24 +206,41 @@ static int compare_items(const void * l_a, const void * l_b) dap_list_t* dap_db_log_get_list(time_t first_timestamp) { dap_list_t *l_list = NULL; - size_t l_list_count = 0; - char *l_first_key_str = dap_strdup_printf("%lld", (int64_t) first_timestamp); size_t l_data_size_out = 0; - dap_global_db_obj_t **l_objs = dap_chain_global_db_gr_load(GROUP_LOCAL_HISTORY, &l_data_size_out); + dap_store_obj_t *l_objs = dap_chain_global_db_cond_load(GROUP_LOCAL_HISTORY, first_timestamp, &l_data_size_out); + //dap_global_db_obj_t **l_objs = dap_chain_global_db_gr_load(GROUP_LOCAL_HISTORY, first_timestamp, &l_data_size_out); for(size_t i = 0; i < l_data_size_out; i++) { - dap_global_db_obj_t *l_obj_cur = l_objs[i]; -// log_it(L_DEBUG,"%lld and %lld tr",strtoll(l_obj_cur->key,NULL,10), first_timestamp ); - if( strtoll(l_obj_cur->key,NULL,10) > (long long) first_timestamp ) { - dap_global_db_obj_t *l_item = DAP_NEW(dap_global_db_obj_t); - l_item->key = dap_strdup(l_obj_cur->key); - l_item->value =(uint8_t*) dap_strdup((char*) l_obj_cur->value); - l_list = dap_list_append(l_list, l_item); - l_list_count++; - } + dap_store_obj_t *l_obj_cur = l_objs + i; + dap_global_db_obj_t *l_item = DAP_NEW(dap_global_db_obj_t); + l_item->key = dap_strdup(l_obj_cur->key); + l_item->value = (uint8_t*) dap_strdup((char*) l_obj_cur->value); + l_list = dap_list_append(l_list, l_item); } - // sort list by key (time str) - dap_list_sort(l_list, (dap_callback_compare_t) compare_items); - log_it(L_DEBUG,"Prepared %u items (list size %u)", l_list_count, dap_list_length(l_list)); + dap_store_obj_free(l_objs, l_data_size_out); + + return l_list; + /* + size_t l_list_count = 0; + char *l_first_key_str = dap_strdup_printf("%lld", (int64_t) first_timestamp); + size_t l_data_size_out = 0; + + for(size_t i = 0; i < l_data_size_out; i++) { + dap_global_db_obj_t *l_obj_cur = l_objs[i]; + // log_it(L_DEBUG,"%lld and %lld tr",strtoll(l_obj_cur->key,NULL,10), first_timestamp ); + if( strtoll(l_obj_cur->key,NULL,10) > (long long) first_timestamp ) { + dap_global_db_obj_t *l_item = DAP_NEW(dap_global_db_obj_t); + l_item->key = dap_strdup(l_obj_cur->key); + l_item->value =(uint8_t*) dap_strdup((char*) l_obj_cur->value); + l_list = dap_list_append(l_list, l_item); + l_list_count++; + } + } + // sort list by key (time str) + //dap_list_sort(l_list, (dap_callback_compare_t) compare_items); + log_it(L_DEBUG,"Prepared %u items (list size %u)", l_list_count, dap_list_length(l_list)); + DAP_DELETE(l_first_key_str); + dap_chain_global_db_objs_delete(l_objs); + */ /*/ dbg - sort result l_data_size_out = dap_list_length(l_list); for(size_t i = 0; i < l_data_size_out; i++) { @@ -230,9 +249,6 @@ dap_list_t* dap_db_log_get_list(time_t first_timestamp) printf("2 %d %s\n", i, l_item->key); }*/ - DAP_DELETE(l_first_key_str); - dap_chain_global_db_objs_delete(l_objs); - return l_list; } /** diff --git a/dap_chain_global_db_hist.h b/dap_chain_global_db_hist.h index b21d6341919d89f021b6384cebf05151db39e041..62e98c5844bf6cc84f79563db42850d416a41761 100755 --- a/dap_chain_global_db_hist.h +++ b/dap_chain_global_db_hist.h @@ -1,7 +1,7 @@ #pragma once #include <stdbool.h> -#include "dap_chain_global_db_pvt.h" +#include "dap_chain_global_db_driver.h" #define GLOBAL_DB_HIST_REC_SEPARATOR "\r;" #define GLOBAL_DB_HIST_KEY_SEPARATOR "\a;" diff --git a/dap_chain_global_db_pvt.c b/dap_chain_global_db_pvt.c deleted file mode 100755 index 69099cbbeb4def30885f18ac80eaf25c8e2af595..0000000000000000000000000000000000000000 --- a/dap_chain_global_db_pvt.c +++ /dev/null @@ -1,575 +0,0 @@ -#include <string.h> -#include <stdio.h> -#include <assert.h> -#include <time.h> - -#include "dap_chain_global_db.h" -#include "dap_chain_global_db_pvt.h" -#include "dap_strfuncs.h" -#include "dap_list.h" - -#define LOG_TAG "dap_global_db" -#define TDB_PREFIX_LEN 7 - -static struct ldb_context *s_ldb = NULL; -static TALLOC_CTX *s_mem_ctx = NULL; - -//static int dap_store_len = 0; // initialized only when reading from local db -static char *dap_db_path = NULL; - -#define CALL2(a, b, ...) (a), (b) -#define dap_db_add_record(...) dap_db_merge(CALL2(__VA_ARGS__, 0)) - -static int dap_db_add_msg(struct ldb_message *a_msg) -{ - if(ldb_msg_sanity_check(s_ldb, a_msg) != LDB_SUCCESS) { - log_it(L_ERROR, "LDB message is inconsistent: %s", ldb_errstring(s_ldb)); - return -1; - } - ldb_transaction_start(s_ldb); - int status = ldb_add(s_ldb, a_msg); - // Delete the entry if it already exist and add again - if(status == LDB_ERR_ENTRY_ALREADY_EXISTS) { - ldb_delete(s_ldb, a_msg->dn); - status = ldb_add(s_ldb, a_msg); - } - if(status != LDB_SUCCESS) { - if(status == LDB_ERR_ENTRY_ALREADY_EXISTS) { - log_it(L_INFO, "Entry %s already present, skipped", ldb_dn_get_linearized(a_msg->dn)); - } - else { - log_it(L_ERROR, "LDB adding error: %s", ldb_errstring(s_ldb)); - } - ldb_transaction_cancel(s_ldb); - return -2; - } - else { - ldb_transaction_commit(s_ldb); - //log_it(L_INFO, "Entry %s added", ldb_dn_get_linearized(a_msg->dn)); - return 0; - } -} - -/** - * @brief dap_db_group_create - * @param a_group - */ -void dap_db_group_create(const char * a_group) -{ - struct ldb_message *msg; - - // level 2: group record - msg = ldb_msg_new(s_ldb); - msg->dn = ldb_dn_new(s_mem_ctx, s_ldb, "ou=addrs_leased,dc=kelvin_nodes"); - ldb_msg_add_string(msg, "ou", a_group ); - ldb_msg_add_string(msg, "objectClass", "group"); - ldb_msg_add_string(msg, "section", "kelvin_nodes"); - ldb_msg_add_string(msg, "description", "Whitelist of Kelvin blockchain nodes"); - dap_db_add_msg(msg); - talloc_free(msg->dn); - talloc_free(msg); - -} - -/** - * @brief dap_db_init - * @param path - * @return - */ -int dap_db_init(const char *path) -{ - s_mem_ctx = talloc_new(NULL); - if(ldb_global_init() != 0) { - log_it(L_ERROR, "Couldn't initialize LDB's global information"); - return -1; - }; - if((s_ldb = ldb_init(s_mem_ctx, NULL)) != LDB_SUCCESS) { - log_it(L_INFO, "ldb context initialized"); - char *l_tdb_path = DAP_NEW_Z_SIZE(char,strlen(path) + TDB_PREFIX_LEN ); - memset(l_tdb_path, '\0', strlen(path) + TDB_PREFIX_LEN); - strcat(l_tdb_path, "tdb://"); // using tdb for simplicity, no need for separate LDAP server - strcat(l_tdb_path, path); - struct ldb_result *data_message; - if(ldb_connect(s_ldb, l_tdb_path, 0, NULL) != LDB_SUCCESS) { - log_it(L_ERROR, "Couldn't connect to database"); - DAP_DELETE(l_tdb_path); - return 1; - } - dap_db_path = strdup(l_tdb_path); - const char *query = "(dn=*)"; - if(ldb_search(s_ldb, s_mem_ctx, &data_message, NULL, LDB_SCOPE_DEFAULT, NULL, "%s", query) != LDB_SUCCESS) { - log_it(L_ERROR, "Database querying failed"); - DAP_DELETE(l_tdb_path); - return 2; - } - struct ldb_message *msg; - if(data_message->count == 0) { - // level 1: section record - msg = ldb_msg_new(s_ldb); - msg->dn = ldb_dn_new(s_mem_ctx, s_ldb, "dc=kelvin_nodes"); - ldb_msg_add_string(msg, "dc", "kelvin_nodes"); - ldb_msg_add_string(msg, "objectClass", "top"); - ldb_msg_add_string(msg, "objectClass", "section"); - dap_db_add_msg(msg); - - // level 2: groups - dap_db_group_create( GROUP_LOCAL_HISTORY); - dap_db_group_create( GROUP_LOCAL_GENERAL ); - dap_db_group_create( GROUP_LOCAL_NODE_LAST_TS); - - } - talloc_free(data_message); - DAP_DELETE(l_tdb_path); - return 0; - } - else { - log_it(L_ERROR, "Couldn't initialize LDB context"); - return -2; - } -} - -int dap_db_del_msg(struct ldb_dn *ldn) -{ - ldb_transaction_start(s_ldb); - int status = ldb_delete(s_ldb, ldn); - if(status != LDB_SUCCESS) { - log_it(L_ERROR, "LDB deleting error: %s", ldb_errstring(s_ldb)); - ldb_transaction_cancel(s_ldb); - return -2; - } - else { - ldb_transaction_commit(s_ldb); - //log_it(L_INFO, "Entry %s deleted", ldb_dn_get_linearized(ldn)); - return 0; - } -} - -static int compare_message_items(const void * l_a, const void * l_b) -{ - const struct ldb_message *l_item_a = (const struct ldb_message*) l_a; - const struct ldb_message *l_item_b = (const struct ldb_message*) l_b; - const struct ldb_val *l_val_a = ldb_msg_find_ldb_val(l_item_a, "time"); - const struct ldb_val *l_val_b = ldb_msg_find_ldb_val(l_item_b, "time"); - time_t timestamp_a = 0; - time_t timestamp_b = 0; - if(l_val_a) - memcpy(×tamp_a, l_val_a->data, min(sizeof(time_t), l_val_a->length)); - if(l_val_b) - memcpy(×tamp_b, l_val_b->data, min(sizeof(time_t), l_val_b->length)); - if(timestamp_a == timestamp_b) - return 0; - if(timestamp_a < timestamp_b) - return -1; - return 1; -} - -/** - * Get data from base - * - * query RFC2254 (The String Representation of LDAP Search Filters) - * sample: - * (uid=testuser) Matches to all users that have exactly the value testuser for the attribute uid. - * (uid=test*) Matches to all users that have values for the attribute uid that start with test. - * (!(uid=test*)) Matches to all users that have values for the attribute uid that do not start with test. - * (&(department=1234)(city=Paris)) Matches to all users that have exactly the value 1234 for the attribute department and exactly the value Paris for the attribute city . - * - */ -pdap_store_obj_t dap_db_read_data(const char *a_query, size_t *a_count) -{ - struct ldb_result *data_message; - /* - CN commonName (2.5.4.3) - L localityName (2.5.4.7) - ST stateOrProvinceName (2.5.4.8) - O organizationName (2.5.4.10) - OU organizationalUnitName (2.5.4.11) - C countryName (2.5.4.6) - STREET streetAddress (2.5.4.9) - DC domainComponent (0.9.2342.19200300.100.1.25) - UID userId (0.9.2342.19200300.100.1.1) - */ - if(ldb_connect(s_ldb, dap_db_path, LDB_FLG_RDONLY, NULL) != LDB_SUCCESS) { - log_it(L_ERROR, "Couldn't connect to database"); - return NULL; - } - //sample: query = "(objectClass=addr_leased)"; - if(ldb_search(s_ldb, NULL, &data_message, NULL, LDB_SCOPE_DEFAULT, NULL, a_query) != LDB_SUCCESS) { - log_it(L_ERROR, "Database querying failed"); - return NULL; - } - //log_it(L_INFO, "Obtained binary data, %d entries", data_message->count); - - // not found data - if(!data_message->count ) { - talloc_free(data_message); - return NULL; - } - - pdap_store_obj_t store_data = DAP_NEW_Z_SIZE(dap_store_obj_t, data_message->count * sizeof(struct dap_store_obj)); - if(!store_data) { - log_it(L_ERROR, "Couldn't allocate memory, store objects unobtained"); - talloc_free(data_message); - return NULL; - } - - dap_list_t *l_list_items = NULL; - // fill list - for(size_t i = 0; i < data_message->count; i++) { - l_list_items = dap_list_prepend(l_list_items, data_message->msgs[i]); - } - // sort list by time - l_list_items = dap_list_sort(l_list_items, (dap_callback_compare_t) compare_message_items); - - dap_list_t *l_list = l_list_items; - size_t q = 0; - while(l_list) { - const struct ldb_message *l_msgs = l_list->data; - store_data[q].section = strdup(ldb_msg_find_attr_as_string(l_msgs, "section", "")); //strdup("kelvin_nodes"); - store_data[q].group = strdup(ldb_msg_find_attr_as_string(l_msgs, "objectClass", "")); //strdup(group); - store_data[q].type = 1; - store_data[q].key = strdup(ldb_msg_find_attr_as_string(l_msgs, "cn", "")); - // get timestamp - const struct ldb_val *l_val = ldb_msg_find_ldb_val(l_msgs, "time"); - if(l_val) { - memcpy(&store_data[q].timestamp, l_val->data, min(sizeof(time_t), l_val->length)); - } - // get value - l_val = ldb_msg_find_ldb_val(l_msgs, "val"); - if(l_val) { - store_data[q].value_len = l_val->length; - store_data[q].value = DAP_NEW_SIZE(uint8_t, l_val->length); - memcpy(store_data[q].value, l_val->data, l_val->length); - } - q++; - l_list = dap_list_next(l_list); - //log_it(L_INFO, "Record %s read successfully", ldb_dn_get_linearized(data_message->msgs[q]->dn)); - } - size_t dap_store_len = data_message->count; - /*for(size_t q = 0; q < dap_store_len; ++q) { - store_data[q].section = strdup(ldb_msg_find_attr_as_string(data_message->msgs[q], "section", "")); //strdup("kelvin_nodes"); - store_data[q].group = strdup(ldb_msg_find_attr_as_string(data_message->msgs[q], "objectClass", "")); //strdup(group); - store_data[q].type = 1; - store_data[q].key = strdup(ldb_msg_find_attr_as_string(data_message->msgs[q], "cn", "")); - // get timestamp - const struct ldb_val *l_val = ldb_msg_find_ldb_val(data_message->msgs[q], "time"); - if(l_val) { - memcpy(&store_data[q].timestamp, l_val->data, min(sizeof(time_t), l_val->length)); - } - // get value - l_val = ldb_msg_find_ldb_val(data_message->msgs[q], "val"); - if(l_val) { - store_data[q].value_len = l_val->length; - store_data[q].value = DAP_NEW_SIZE(uint8_t, l_val->length); - memcpy(store_data[q].value, l_val->data, l_val->length); - } - //log_it(L_INFO, "Record %s read successfully", ldb_dn_get_linearized(data_message->msgs[q]->dn)); - }*/ - talloc_free(data_message); - dap_list_free(l_list_items); - if(a_count) - *a_count = dap_store_len; - return store_data; -} - -/** - * clean memory - */ -void dab_db_free_pdap_store_obj_t(pdap_store_obj_t store_data, size_t count) -{ - if(!store_data) - return; - for(size_t i = 0; i < count; i++) { - pdap_store_obj_t store_one = store_data + i; - DAP_DELETE(store_one->section); - DAP_DELETE(store_one->group); - DAP_DELETE(store_one->key); - DAP_DELETE(store_one->value); - } - DAP_DELETE(store_data); -} - -/* Get the entire content without using query expression - * This function is highly dissuaded from being used - * */ -pdap_store_obj_t dap_db_read_file_data(const char *path, const char *group) -{ - struct ldb_ldif *ldif_msg; - FILE *fs = fopen(path, "r"); - if(!fs) { - log_it(L_ERROR, "Can't open file %s", path); - return NULL; - } - pdap_store_obj_t store_data = (pdap_store_obj_t) malloc(256 * sizeof(dap_store_obj_t)); - if(store_data != NULL) { - log_it(L_INFO, "We're about to put entries in store objects"); - } - else { - log_it(L_ERROR, "Couldn't allocate memory, store objects unobtained"); - fclose(fs); - return NULL; - } - - size_t q = 0; - for(ldif_msg = ldb_ldif_read_file(s_ldb, fs); ldif_msg; ldif_msg = ldb_ldif_read_file(s_ldb, fs), q++) { - if(q % 256 == 0) { - store_data = (pdap_store_obj_t) realloc(store_data, (q + 256) * sizeof(dap_store_obj_t)); - } - /* if (ldif_msg->changetype == LDB_CHANGETYPE_ADD) { - / ... / - } */ // in case we gonna use extra LDIF functionality - const char *key = ldb_msg_find_attr_as_string(ldif_msg->msg, "cn", NULL); - if(key != NULL) { - store_data[q].section = strdup("kelvin-testnet"); - store_data[q].group = strdup(group); - store_data[q].type = 1; - store_data[q].key = strdup(key); - store_data[q].value =(uint8_t*) strdup( ldb_msg_find_attr_as_string(ldif_msg->msg, "time", NULL)); - store_data[q].value_len = strlen ( (char*) store_data[q].value) +1; - log_it(L_INFO, "Record %s stored successfully", ldb_dn_get_linearized(ldif_msg->msg->dn)); - } - ldb_ldif_read_free(s_ldb, ldif_msg); - } - fclose(fs); - return store_data; -} - -/* - * Add multiple entries received from remote node to local database. - * Since we don't know the size, it must be supplied too - * - * dap_store_size the count records - * return 0 if Ok, <0 if errors - */ -int dap_db_add(pdap_store_obj_t a_store_obj, size_t a_store_count) -{ - int l_ret = 0; - if(a_store_obj == NULL) { - log_it(L_ERROR, "Invalid Dap store objects passed"); - return -1; - } - if(ldb_connect(s_ldb, dap_db_path, 0, NULL) != LDB_SUCCESS) { - log_it(L_ERROR, "Couldn't connect to database"); - return -2; - } - //log_it(L_INFO, "We're about to put %d records into database", a_store_count); - struct ldb_message *l_msg; - if(a_store_count == 0) { - a_store_count = 1; - } - for(size_t q = 0; q < a_store_count; q++) { - // level 3: leased address, single whitelist entity - - // if it is marked, don't save - if(a_store_obj[q].timestamp == (time_t) -1) - continue; - - l_msg = ldb_msg_new(s_ldb); - char dn[256]; - memset(dn, '\0', 256); - strcat(dn, "cn="); - strcat(dn, a_store_obj[q].key); - //strcat(dn, ",ou=addrs_leased,dc=kelvin_nodes"); - strcat(dn, ",ou="); - strcat(dn, a_store_obj[q].group); - strcat(dn, ",dc=kelvin_nodes"); - l_msg->dn = ldb_dn_new(s_mem_ctx, s_ldb, dn); - int l_res = ldb_msg_add_string(l_msg, "cn", a_store_obj[q].key); - ldb_msg_add_string(l_msg, "objectClass", a_store_obj[q].group); - ldb_msg_add_string(l_msg, "section", "kelvin_nodes"); - ldb_msg_add_string(l_msg, "description", "Approved Kelvin node"); - - struct ldb_val l_val; - struct ldb_message_element *return_el; - l_val.data = (uint8_t*) &a_store_obj[q].timestamp; - l_val.length = sizeof(time_t); - l_res = ldb_msg_add_value(l_msg, "time", &l_val, &return_el); - - l_val.data = a_store_obj[q].value; - l_val.length = a_store_obj[q].value_len; - l_res = ldb_msg_add_value(l_msg, "val", &l_val, &return_el); - - l_ret += dap_db_add_msg(l_msg); // accumulation error codes - talloc_free(l_msg->dn); - talloc_free(l_msg); - } - return l_ret; -} - -/* - * Delete multiple entries from local database. - * - * dap_store_size the count records - * return 0 if Ok, <0 if errors - */ -int dap_db_delete(pdap_store_obj_t store_obj, size_t a_store_count) -{ - int ret = 0; - if(store_obj == NULL) { - log_it(L_ERROR, "Invalid Dap store objects passed"); - return -1; - } - if(ldb_connect(s_ldb, dap_db_path, 0, NULL) != LDB_SUCCESS) { - log_it(L_ERROR, "Couldn't connect to database"); - return -2; - } - //log_it(L_INFO, "We're delete %d records from database", a_store_count); - if(a_store_count == 0) { - a_store_count = 1; - } - for(size_t q = 0; q < a_store_count; q++) { - char dn[128]; - memset(dn, '\0', 128); - strcat(dn, "cn="); - strcat(dn, store_obj[q].key); - //strcat(dn, ",ou=addrs_leased,dc=kelvin_nodes"); - strcat(dn, ",ou="); - strcat(dn, store_obj[q].group); - strcat(dn, ",dc=kelvin_nodes"); - struct ldb_dn *ldn = ldb_dn_new(s_mem_ctx, s_ldb, dn); - ret += dap_db_del_msg(ldn); // accumulation error codes - talloc_free(ldn); - } - return ret; - -} - -/* serialization */ -/*dap_store_obj_pkt_t *dap_store_packet_single(pdap_store_obj_t store_obj) - { - dap_store_obj_pkt_t *pkt = DAP_NEW_Z_SIZE(dap_store_obj_pkt_t, - sizeof(int) + 4 + strlen(store_obj->group) + strlen(store_obj->key) + strlen(store_obj->section) - + strlen(store_obj->value)); - pkt->grp_size = strlen(store_obj->group) + 1; - pkt->name_size = strlen(store_obj->key) + 1; - pkt->sec_size = strlen(store_obj->section) + 1; - pkt->type = store_obj->type; - memcpy(pkt->data, &store_obj->section, pkt->sec_size); - memcpy(pkt->data + pkt->sec_size, &store_obj->group, pkt->grp_size); - memcpy(pkt->data + pkt->sec_size + pkt->grp_size, &store_obj->key, pkt->name_size); - memcpy(pkt->data + pkt->sec_size + pkt->grp_size + pkt->name_size, &store_obj->value, strlen(store_obj->value) + 1); - return pkt; - }*/ - -static size_t dap_db_get_size_pdap_store_obj_t(pdap_store_obj_t store_obj) -{ - size_t size = sizeof(uint32_t) + 3 * sizeof(uint16_t) + sizeof(size_t) + sizeof(time_t) + dap_strlen(store_obj->group) + - dap_strlen(store_obj->key) + dap_strlen(store_obj->section) + store_obj->value_len; - return size; -} - -/** - * serialization - * @param a_store_obj_count count of structures store_obj - * @param a_timestamp create data time - * @param a_size_out[out] size of output structure - * @return NULL in case of an error - */ -dap_store_obj_pkt_t *dap_store_packet_multiple(pdap_store_obj_t a_store_obj, time_t a_timestamp, size_t a_store_obj_count) -{ - if(!a_store_obj || a_store_obj_count < 1) - return NULL; - size_t l_data_size_out = sizeof(uint32_t); // size of output data - // calculate output structure size - for(size_t l_q = 0; l_q < a_store_obj_count; ++l_q) - l_data_size_out += dap_db_get_size_pdap_store_obj_t(&a_store_obj[l_q]); - - dap_store_obj_pkt_t *l_pkt = DAP_NEW_Z_SIZE(dap_store_obj_pkt_t, sizeof(dap_store_obj_pkt_t) + l_data_size_out); - l_pkt->data_size = l_data_size_out; - l_pkt->timestamp = a_timestamp; - uint64_t l_offset = 0; - uint32_t l_count = (uint32_t) a_store_obj_count; - memcpy(l_pkt->data + l_offset, &l_count, sizeof(uint32_t)); - l_offset += sizeof(uint32_t); - for( size_t l_q = 0; l_q < a_store_obj_count; ++l_q) { - dap_store_obj_t obj = a_store_obj[l_q]; - uint16_t section_size = (uint16_t) dap_strlen(obj.section); - uint16_t group_size = (uint16_t) dap_strlen(obj.group); - uint16_t key_size = (uint16_t) dap_strlen(obj.key); - memcpy(l_pkt->data + l_offset, &obj.type, sizeof(int)); - l_offset += sizeof(int); - memcpy(l_pkt->data + l_offset, §ion_size, sizeof(uint16_t)); - l_offset += sizeof(uint16_t); - memcpy(l_pkt->data + l_offset, obj.section, section_size); - l_offset += section_size; - memcpy(l_pkt->data + l_offset, &group_size, sizeof(uint16_t)); - l_offset += sizeof(uint16_t); - memcpy(l_pkt->data + l_offset, obj.group, group_size); - l_offset += group_size; - memcpy(l_pkt->data + l_offset, &obj.timestamp, sizeof(time_t)); - l_offset += sizeof(time_t); - memcpy(l_pkt->data + l_offset, &key_size, sizeof(uint16_t)); - l_offset += sizeof(uint16_t); - memcpy(l_pkt->data + l_offset, obj.key, key_size); - l_offset += key_size; - memcpy(l_pkt->data + l_offset, &obj.value_len, sizeof(size_t)); - l_offset += sizeof(size_t); - memcpy(l_pkt->data + l_offset, obj.value, obj.value_len); - l_offset += obj.value_len; - } - assert(l_data_size_out == l_offset); - return l_pkt; -} -/** - * deserialization - * @param store_obj_count[out] count of the output structures store_obj - * @return NULL in case of an error* - */ - -dap_store_obj_t *dap_store_unpacket(const dap_store_obj_pkt_t *pkt, size_t *store_obj_count) -{ - if(!pkt || pkt->data_size < 1) - return NULL; - uint64_t offset = 0; - uint32_t count; - memcpy(&count, pkt->data, sizeof(uint32_t)); - offset += sizeof(uint32_t); - dap_store_obj_t *store_obj = DAP_NEW_Z_SIZE(dap_store_obj_t, count * sizeof(struct dap_store_obj)); - for(size_t q = 0; q < count; ++q) { - dap_store_obj_t *obj = store_obj + q; - uint16_t str_size; - memcpy(&obj->type, pkt->data + offset, sizeof(int)); - offset += sizeof(int); - - memcpy(&str_size, pkt->data + offset, sizeof(uint16_t)); - offset += sizeof(uint16_t); - obj->section = DAP_NEW_Z_SIZE(char, str_size + 1); - memcpy(obj->section, pkt->data + offset, str_size); - offset += str_size; - - memcpy(&str_size, pkt->data + offset, sizeof(uint16_t)); - offset += sizeof(uint16_t); - obj->group = DAP_NEW_Z_SIZE(char, str_size + 1); - memcpy(obj->group, pkt->data + offset, str_size); - offset += str_size; - - memcpy(&obj->timestamp, pkt->data + offset, sizeof(time_t)); - offset += sizeof(time_t); - - memcpy(&str_size, pkt->data + offset, sizeof(uint16_t)); - offset += sizeof(uint16_t); - obj->key = DAP_NEW_Z_SIZE(char, str_size + 1); - memcpy(obj->key, pkt->data + offset, str_size); - offset += str_size; - - memcpy(&obj->value_len, pkt->data + offset, sizeof(size_t)); - offset += sizeof(size_t); - - obj->value = DAP_NEW_Z_SIZE(uint8_t, obj->value_len + 1); - memcpy(obj->value, pkt->data + offset, obj->value_len); - offset += obj->value_len; - } - assert(pkt->data_size == offset); - if(store_obj_count) - *store_obj_count = count; - return store_obj; -} - -void dap_db_deinit() -{ - talloc_free(s_ldb); - talloc_free(s_mem_ctx); - free(dap_db_path); - s_ldb = NULL; - s_mem_ctx = NULL; - dap_db_path = NULL; -} diff --git a/dap_chain_global_db_pvt.h b/dap_chain_global_db_pvt.h deleted file mode 100755 index 7fab7e35dbd05751c2d8dc05e95ca88e2e4fb9e9..0000000000000000000000000000000000000000 --- a/dap_chain_global_db_pvt.h +++ /dev/null @@ -1,41 +0,0 @@ -#pragma once - -#include <stdint.h> -#include "dap_common.h" -#include "ldb.h" - -typedef struct dap_store_obj { - time_t timestamp; - uint8_t type; - char *section; - char *group; - char *key; - uint8_t *value; - size_t value_len; -}DAP_ALIGN_PACKED dap_store_obj_t, *pdap_store_obj_t; - -typedef struct dap_store_obj_pkt { - /*uint8_t type; - uint8_t sec_size; - uint8_t grp_size; - uint8_t name_size;*/ - time_t timestamp; - size_t data_size; - uint8_t data[]; -}__attribute__((packed)) dap_store_obj_pkt_t; - -int dap_db_init(const char*); -void dap_db_group_create(const char *); -void dap_db_deinit(void); - -int dap_db_add(pdap_store_obj_t a_store_obj, size_t a_store_count); -int dap_db_delete(pdap_store_obj_t a_store_obj, size_t a_store_count); - -pdap_store_obj_t dap_db_read_data(const char *a_query, size_t *a_count); -pdap_store_obj_t dap_db_read_file_data(const char *a_path, const char *a_group); // state of emergency only, if LDB database is inaccessible -dap_store_obj_pkt_t *dap_store_packet_single(pdap_store_obj_t a_store_obj); -dap_store_obj_pkt_t *dap_store_packet_multiple(pdap_store_obj_t a_store_obj, time_t a_timestamp, size_t a_store_obj_count); -dap_store_obj_t *dap_store_unpacket(const dap_store_obj_pkt_t *a_pkt, size_t *a_store_obj_count); - -void dab_db_free_pdap_store_obj_t(pdap_store_obj_t a_store_data, size_t a_count); -