diff --git a/dap_chain_global_db_driver.c b/dap_chain_global_db_driver.c index aa9dbecd62b43fa40cf1a62ce19bcaef972a6e32..b14f1e62fbb2fb540781cd72b4feab2c7348341f 100755 --- a/dap_chain_global_db_driver.c +++ b/dap_chain_global_db_driver.c @@ -526,6 +526,21 @@ int dap_chain_global_db_driver_delete(pdap_store_obj_t a_store_obj, size_t a_sto return dap_chain_global_db_driver_appy(a_store_obj, a_store_count); } +/** + * Read the number of items + * + * a_group - group name + * a_id - from this id + */ +size_t dap_chain_global_db_driver_count(const char *a_group, uint64_t id) +{ + size_t l_count_out = 0; + // read the number of items + if(s_drv_callback.read_count_store) + l_count_out = s_drv_callback.read_count_store(a_group, id); + return l_count_out; +} + /** * Read last items * diff --git a/dap_chain_global_db_driver.h b/dap_chain_global_db_driver.h index 7af2e04273b7413f2925484592791bb41d8ef9cb..f078d51570f50c49580c58a3973d5eb795975dc6 100755 --- a/dap_chain_global_db_driver.h +++ b/dap_chain_global_db_driver.h @@ -50,6 +50,7 @@ typedef int (*dap_db_driver_write_callback_t)(dap_store_obj_t*); typedef dap_store_obj_t* (*dap_db_driver_read_callback_t)(const char *,const char *, size_t *); typedef dap_store_obj_t* (*dap_db_driver_read_cond_callback_t)(const char *,uint64_t , size_t *); typedef dap_store_obj_t* (*dap_db_driver_read_last_callback_t)(const char *); +typedef size_t (*dap_db_driver_read_count_callback_t)(const char *,uint64_t); typedef int (*dap_db_driver_callback_t)(void); typedef struct dap_db_driver_callbacks { @@ -57,6 +58,7 @@ typedef struct dap_db_driver_callbacks { dap_db_driver_read_callback_t read_store_obj; dap_db_driver_read_last_callback_t read_last_store_obj; dap_db_driver_read_cond_callback_t read_cond_store_obj; + dap_db_driver_read_count_callback_t read_count_store; dap_db_driver_callback_t transaction_start; dap_db_driver_callback_t transaction_end; dap_db_driver_callback_t deinit; @@ -79,6 +81,7 @@ int dap_chain_global_db_driver_delete(pdap_store_obj_t a_store_obj, size_t a_sto dap_store_obj_t* dap_chain_global_db_driver_read_last(const char *a_group); dap_store_obj_t* dap_chain_global_db_driver_cond_read(const char *a_group, uint64_t id, size_t *a_count_out); dap_store_obj_t* dap_chain_global_db_driver_read(const char *a_group, const char *a_key, size_t *count_out); +size_t dap_chain_global_db_driver_count(const char *a_group, uint64_t id); dap_store_obj_pkt_t *dap_store_packet_multiple(pdap_store_obj_t a_store_obj, time_t a_timestamp, size_t a_store_obj_count); diff --git a/dap_chain_global_db_driver_cdb.c b/dap_chain_global_db_driver_cdb.c index fedb2c003c05c272c7352fb4f1274b72c3a2c201..09961f515ebf3f9c5c6eb17118c1dcad590fca6a 100644 --- a/dap_chain_global_db_driver_cdb.c +++ b/dap_chain_global_db_driver_cdb.c @@ -227,6 +227,7 @@ int dap_db_driver_cdb_init(const char *a_cdb_path, dap_db_driver_callbacks_t *a_ a_drv_callback->apply_store_obj = dap_db_driver_cdb_apply_store_obj; a_drv_callback->read_store_obj = dap_db_driver_cdb_read_store_obj; a_drv_callback->read_cond_store_obj = dap_db_driver_cdb_read_cond_store_obj; + a_drv_callback->read_count_store = dap_db_driver_cdb_read_count_store; a_drv_callback->deinit = dap_db_driver_cdb_deinit; a_drv_callback->flush = dap_db_driver_cdb_flush; @@ -418,6 +419,21 @@ dap_store_obj_t* dap_db_driver_cdb_read_cond_store_obj(const char *a_group, uint return l_arg.o; } +size_t dap_db_driver_cdb_read_count_store(const char *a_group, uint64_t a_id) +{ + if(!a_group) { + return 0; + } + pcdb_instance l_cdb_i = dap_cdb_get_db_by_group(a_group); + if(!l_cdb_i) { + return NULL; + } + CDB *l_cdb = l_cdb_i->cdb; + CDBSTAT l_cdb_stat; + cdb_stat(l_cdb, &l_cdb_stat); + return (size_t) l_cdb_stat.rnum; +} + int dap_db_driver_cdb_apply_store_obj(pdap_store_obj_t a_store_obj) { if(!a_store_obj || !a_store_obj->group) { return -1; diff --git a/dap_chain_global_db_driver_cdb.h b/dap_chain_global_db_driver_cdb.h index e4309d64482fdd34c77724b818934570147f3842..44997bc4954fac4ec14454d07bef04231a9b7f43 100644 --- a/dap_chain_global_db_driver_cdb.h +++ b/dap_chain_global_db_driver_cdb.h @@ -45,4 +45,5 @@ int dap_db_driver_cdb_apply_store_obj(pdap_store_obj_t); dap_store_obj_t *dap_db_driver_cdb_read_last_store_obj(const char*); dap_store_obj_t *dap_db_driver_cdb_read_store_obj(const char*, const char*, size_t*); +size_t dap_db_driver_cdb_read_count_store(const char *a_group, uint64_t a_id); dap_store_obj_t* dap_db_driver_cdb_read_cond_store_obj(const char*, uint64_t, size_t*); diff --git a/dap_chain_global_db_hist.c b/dap_chain_global_db_hist.c index da8e809d7792cfd0ab524c11ab72c3f942b4598b..a48020163e9e74a260692a73aafe4b1237235ea2 100755 --- a/dap_chain_global_db_hist.c +++ b/dap_chain_global_db_hist.c @@ -5,12 +5,10 @@ #include <dap_common.h> #include <dap_strfuncs.h> -#include <dap_list.h> #include <dap_string.h> #include <dap_hash.h> #include "dap_chain_datum_tx_items.h" -#include "dap_chain_global_db.h" #include "dap_chain_global_db_hist.h" #include "uthash.h" @@ -52,19 +50,25 @@ static int dap_db_history_unpack_hist(char *l_str_in, dap_global_db_hist_t *a_re static char* dap_db_new_history_timestamp() { static pthread_mutex_t s_mutex = PTHREAD_MUTEX_INITIALIZER; + uint64_t l_suffix = 0; + time_t l_cur_time; // get unique key pthread_mutex_lock(&s_mutex); static time_t s_last_time = 0; static uint64_t s_suffix = 0; - time_t l_cur_time = time(NULL); - if(s_last_time == l_cur_time) + time_t l_cur_time_tmp = time(NULL); + if(s_last_time == l_cur_time_tmp) s_suffix++; else { s_suffix = 0; - s_last_time = l_cur_time; + s_last_time = l_cur_time_tmp; } - char *l_str = dap_strdup_printf("%lld_%lld", (uint64_t) l_cur_time, s_suffix); + // save tmp values + l_cur_time = l_cur_time_tmp; + l_suffix = s_suffix; pthread_mutex_unlock(&s_mutex); + + char *l_str = dap_strdup_printf("%lld_%lld", (uint64_t) l_cur_time, l_suffix); return l_str; } @@ -1218,3 +1222,184 @@ void dap_db_log_del_list(dap_list_t *a_list) { dap_list_free_full(a_list, (dap_callback_destroyed_t) dap_chain_global_db_obj_delete); } + + + + +/** + * Thread for reading log list + * instead dap_db_log_get_list() + */ +static void *s_list_thread_proc(void *arg) +{ + dap_db_log_list_t *l_dap_db_log_list = (dap_db_log_list_t*) arg; + size_t l_items_number = 0; + while(1) { + bool is_process; + // check for break process + pthread_mutex_lock(&l_dap_db_log_list->list_mutex); + is_process = l_dap_db_log_list->is_process; + size_t l_item_start = l_dap_db_log_list->item_start; + size_t l_data_size_out = l_dap_db_log_list->item_last; + pthread_mutex_unlock(&l_dap_db_log_list->list_mutex); + if(!is_process) + break; + // calculating how many items required to read + l_data_size_out = min(10, l_data_size_out - l_item_start); + if(!l_data_size_out) + break; + // read next 1...10 items + dap_store_obj_t *l_objs = dap_chain_global_db_cond_load(GROUP_LOCAL_HISTORY, l_item_start, &l_data_size_out); + if(!l_objs) + break; + dap_list_t *l_list = NULL; + for(size_t i = 0; i < l_data_size_out; i++) { + dap_store_obj_t *l_obj_cur = l_objs + i; + dap_global_db_obj_t *l_item = DAP_NEW(dap_global_db_obj_t); + l_item->id = l_obj_cur->id; + l_item->key = dap_strdup(l_obj_cur->key); + l_item->value = (uint8_t*) dap_strdup((char*) l_obj_cur->value); + l_list = dap_list_append(l_list, l_item); + } + pthread_mutex_lock(&l_dap_db_log_list->list_mutex); + // add l_list to list_write + l_dap_db_log_list->list_write = dap_list_concat(l_dap_db_log_list->list_write, l_list); + // init read list if it ended already + if(!l_dap_db_log_list->list_read) + l_dap_db_log_list->list_read = l_list; + l_dap_db_log_list->item_start += l_data_size_out; + pthread_mutex_unlock(&l_dap_db_log_list->list_mutex); + l_items_number += l_data_size_out; + log_it(L_DEBUG, "loaded items n=%u/%u", l_data_size_out, l_items_number); + dap_store_obj_free(l_objs, l_data_size_out); + // ... + } + + pthread_mutex_lock(&l_dap_db_log_list->list_mutex); + l_dap_db_log_list->is_process = false; + pthread_mutex_unlock(&l_dap_db_log_list->list_mutex); + pthread_exit(0); + return 0; +} + +/** + * instead dap_db_log_get_list() + */ +dap_db_log_list_t* dap_db_log_list_start(uint64_t first_id) +{ + + log_it(L_DEBUG, "Start loading db list_write..."); + dap_db_log_list_t *l_dap_db_log_list = DAP_NEW_Z(dap_db_log_list_t); + + size_t l_data_size_out = dap_chain_global_db_driver_count(GROUP_LOCAL_HISTORY, first_id); + // debug +// if(l_data_size_out>11) +// l_data_size_out = 11; + l_dap_db_log_list->item_start = first_id; + l_dap_db_log_list->item_last = first_id + l_data_size_out; + l_dap_db_log_list->items_number = l_data_size_out; + l_dap_db_log_list->items_rest = l_data_size_out; + // there are too few items, read items right now + if(l_data_size_out <= 10) { + dap_list_t *l_list = NULL; + // read first items + dap_store_obj_t *l_objs = dap_chain_global_db_cond_load(GROUP_LOCAL_HISTORY, first_id, &l_data_size_out); + for(size_t i = 0; i < l_data_size_out; i++) { + dap_store_obj_t *l_obj_cur = l_objs + i; + dap_global_db_obj_t *l_item = DAP_NEW(dap_global_db_obj_t); + l_item->id = l_obj_cur->id; + l_item->key = dap_strdup(l_obj_cur->key); + l_item->value = (uint8_t*) dap_strdup((char*) l_obj_cur->value); + l_list = dap_list_append(l_list, l_item); + } + l_dap_db_log_list->list_write = l_list; + l_dap_db_log_list->list_read = l_list; + log_it(L_DEBUG, "loaded items n=%d", l_data_size_out); + dap_store_obj_free(l_objs, l_data_size_out); + } + // start thread for items loading + else { + l_dap_db_log_list->is_process = true; + pthread_mutex_init(&l_dap_db_log_list->list_mutex, NULL); + pthread_create(&l_dap_db_log_list->thread, NULL, s_list_thread_proc, l_dap_db_log_list); + } + return l_dap_db_log_list; +} + +/** + * Get number of items + */ +size_t dap_db_log_list_get_count(dap_db_log_list_t *a_db_log_list) +{ + if(!a_db_log_list) + return NULL; + size_t l_items_number; + pthread_mutex_lock(&a_db_log_list->list_mutex); + l_items_number = a_db_log_list->items_number; + pthread_mutex_unlock(&a_db_log_list->list_mutex); + return l_items_number; +} + +size_t dap_db_log_list_get_count_rest(dap_db_log_list_t *a_db_log_list) +{ + if(!a_db_log_list) + return NULL; + size_t l_items_rest; + pthread_mutex_lock(&a_db_log_list->list_mutex); + l_items_rest = a_db_log_list->items_rest; + pthread_mutex_unlock(&a_db_log_list->list_mutex); + return l_items_rest; +} +/** + * Get one item from log_list + */ +dap_global_db_obj_t* dap_db_log_list_get(dap_db_log_list_t *a_db_log_list) +{ + if(!a_db_log_list) + return NULL; + dap_list_t *l_list; + bool l_is_process; + int l_count = 0; + while(1) { + pthread_mutex_lock(&a_db_log_list->list_mutex); + l_is_process = a_db_log_list->is_process; + // check next item + l_list = a_db_log_list->list_read; + if (l_list){ + a_db_log_list->list_read = dap_list_next(a_db_log_list->list_read); + a_db_log_list->items_rest--; + } + pthread_mutex_unlock(&a_db_log_list->list_mutex); + // wait reading next item, no more 1 sec (50 ms * 100 times) + if(!l_list && l_is_process) { + dap_usleep(DAP_USEC_PER_SEC / 200); + l_count++; + if(l_count > 100) + break; + } + else + break; + } + log_it(L_DEBUG, "get item n=%d", a_db_log_list->items_number - a_db_log_list->items_rest); + return (dap_global_db_obj_t*) l_list ? l_list->data : NULL; + //return l_list; +} + +/** + * Get log diff as list_write + */ +void dap_db_log_list_delete(dap_db_log_list_t *a_db_log_list) +{ + if(!a_db_log_list) + return; + // stop thread if it has created + if(a_db_log_list->thread) { + pthread_mutex_lock(&a_db_log_list->list_mutex); + a_db_log_list->is_process = false; + pthread_mutex_unlock(&a_db_log_list->list_mutex); + pthread_join(a_db_log_list->thread, NULL); + } + dap_list_free_full(a_db_log_list->list_write, (dap_callback_destroyed_t) dap_chain_global_db_obj_delete); + pthread_mutex_destroy(&a_db_log_list->list_mutex); + DAP_DELETE(a_db_log_list); +} diff --git a/dap_chain_global_db_hist.h b/dap_chain_global_db_hist.h index 6f9788d1a534cf297e132d0ccb5a6f36fefa1f81..851cfad6fe96dabbf1bbaa409ba0e8a340c5de2d 100755 --- a/dap_chain_global_db_hist.h +++ b/dap_chain_global_db_hist.h @@ -1,6 +1,8 @@ #pragma once #include <stdbool.h> +#include <dap_list.h> +#include "dap_chain_global_db.h" #include "dap_chain_global_db_driver.h" #define GLOBAL_DB_HIST_REC_SEPARATOR "\r;" @@ -19,3 +21,23 @@ bool dap_db_history_add(char a_type, pdap_store_obj_t a_store_obj, size_t a_dap_ // Truncate the history log bool dap_db_history_truncate(void); + +// for dap_db_log_list_xxx() +typedef struct dap_db_log_list { + dap_list_t *list_write; // writed list + dap_list_t *list_read; // readed list (inside list_write) + bool is_process; + size_t item_start; + size_t item_last; + size_t items_rest; + size_t items_number; + pthread_t thread; + pthread_mutex_t list_mutex; +} dap_db_log_list_t; + +dap_db_log_list_t* dap_db_log_list_start(uint64_t first_id); +size_t dap_db_log_list_get_count(dap_db_log_list_t *a_db_log_list); +size_t dap_db_log_list_get_count_rest(dap_db_log_list_t *a_db_log_list); +dap_global_db_obj_t* dap_db_log_list_get(dap_db_log_list_t *a_db_log_list); +void dap_db_log_list_delete(dap_db_log_list_t *a_db_log_list); +