From 6d9b9b3406b0bbec66a479dd26e871427e6dbdc9 Mon Sep 17 00:00:00 2001 From: "Ruslan (The BadAss SysMan) Laishev" <ruslan.laishev@demlabs.net> Date: Fri, 6 May 2022 23:44:31 +0300 Subject: [PATCH] [*] features-4751-MDBX - Maintenance commit --- .gitmodules | 3 + 3rdparty/libmdbx | 1 + CMakeLists.txt | 1 + .../global-db/dap_chain_global_db_driver.c | 4 +- .../dap_chain_global_db_driver_mdbx.c | 768 +++++++++++++++--- .../include/dap_chain_global_db_driver.h | 21 +- 6 files changed, 689 insertions(+), 109 deletions(-) create mode 100644 .gitmodules create mode 160000 3rdparty/libmdbx diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000000..c592e5eab1 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "3rdparty/libmdbx"] + path = 3rdparty/libmdbx + url = https://gitflic.ru/project/erthink/libmdbx diff --git a/3rdparty/libmdbx b/3rdparty/libmdbx new file mode 160000 index 0000000000..9230201ca9 --- /dev/null +++ b/3rdparty/libmdbx @@ -0,0 +1 @@ +Subproject commit 9230201ca9f0bd383deeb829fa83fe6fd22a7b1c diff --git a/CMakeLists.txt b/CMakeLists.txt index b22bb7f224..a959efb0bb 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -100,6 +100,7 @@ endif() add_subdirectory(dap-sdk) add_subdirectory(3rdparty/monero_crypto) add_subdirectory(3rdparty/cuttdb) +add_subdirectory(3rdparty/libmdbx) if(DAPSDK_MODULES MATCHES "ssl-support") add_subdirectory(3rdparty/wolfssl) endif() diff --git a/modules/global-db/dap_chain_global_db_driver.c b/modules/global-db/dap_chain_global_db_driver.c index e62052accc..3cbee088b3 100644 --- a/modules/global-db/dap_chain_global_db_driver.c +++ b/modules/global-db/dap_chain_global_db_driver.c @@ -109,11 +109,9 @@ int l_ret = -1; l_ret = dap_db_driver_sqlite_init(l_db_path_ext, &s_drv_callback); else if(!dap_strcmp(s_used_driver, "cdb")) l_ret = dap_db_driver_cdb_init(l_db_path_ext, &s_drv_callback); - -#ifdef DAP_CHAIN_GDB_ENGINE_MDBX else if(!dap_strcmp(s_used_driver, "mdbx")) l_ret = dap_db_driver_mdbx_init(l_db_path_ext, &s_drv_callback); -#endif + #ifdef DAP_CHAIN_GDB_ENGINE_PGSQL else if(!dap_strcmp(s_used_driver, "pgsql")) l_ret = dap_db_driver_pgsql_init(l_db_path_ext, &s_drv_callback); diff --git a/modules/global-db/dap_chain_global_db_driver_mdbx.c b/modules/global-db/dap_chain_global_db_driver_mdbx.c index b5007fd69e..48eb767468 100644 --- a/modules/global-db/dap_chain_global_db_driver_mdbx.c +++ b/modules/global-db/dap_chain_global_db_driver_mdbx.c @@ -1,8 +1,8 @@ /* - * Authors: - * Gerasimov Dmitriy <gerasimov.dmitriy@demlabs.net> + * AUTHORS: + * Ruslan R. (The BadAss SysMan) Laishev <ruslan.laishev@demlabs.net> * DeM Labs Ltd. https://demlabs.net - * Copyright (c) 2021 + * Copyright (c) 2022 * All rights reserved. This file is part of DAP SDK the open source project @@ -19,6 +19,13 @@ You should have received a copy of the GNU General Public License along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. + + DESCRIPTION: + + DESIGN ISSUE: + + MODIFICATION HISTORY: + 4-MAY-2022 RRL Developing for actual version of the LibMDBX */ #include <stddef.h> @@ -26,7 +33,10 @@ #include <dirent.h> #include <pthread.h> #include <sys/stat.h> +#include <errno.h> #include <uthash.h> +#include <stdatomic.h> + #define _GNU_SOURCE #include "dap_chain_global_db_driver_mdbx.h" @@ -35,14 +45,43 @@ #include "dap_file_utils.h" #include "dap_common.h" -#ifdef DAP_CHAIN_GDB_ENGINE_MDBX +#define DAP_CHAIN_GDB_ENGINE_MDBX 1 +//#ifdef DAP_CHAIN_GDB_ENGINE_MDBX +#include "mdbx.h" /* LibMDBX API */ #define LOG_TAG "dap_chain_global_db_mdbx" -static char *s_cdb_path = NULL; +/** Struct for a MDBX instanse */ +typedef struct __db_ctx__ { + atomic_ullong id; /* Just a counter of */ + size_t namelen; /* Group name length */ + char name[DAP$SZ_MAXGROUPNAME + 1]; /* Group's name */ + MDBX_dbi dbi; /* MDBX's internal context id */ + MDBX_txn *txn; /* Current MDBX's transaction */ + UT_hash_handle hh; +} dap_db_ctx_t; + + +/** Struct for a item */ +typedef struct _obj_arg { + pdap_store_obj_t o; + uint64_t q; + uint64_t n; + uint64_t id; +} obj_arg; + + +static dap_db_ctx_t *s_db_ctxs = NULL; /* A pointer to a CDB instance. */ +static pthread_mutex_t s_db_mutex = PTHREAD_MUTEX_INITIALIZER; /* A mutex for working with a DB instanse. */ +static pthread_rwlock_t s_db_rwlock = PTHREAD_RWLOCK_INITIALIZER; /* A read-write lock for working with a DB instanse. */ + +static char s_db_path[MAX_PATH]; /* A root directory for the MDBX files */ + static int s_driver_callback_deinit(); static int s_driver_callback_flush(void); -static int s_driver_callback_apply_store_obj(pdap_store_obj_t a_store_obj); + +static int s_apply_store_obj (dap_store_obj_t *a_store_obj); + static dap_store_obj_t *s_driver_callback_read_last_store_obj(const char* a_group); static bool s_driver_callback_is_obj(const char *a_group, const char *a_key); static dap_store_obj_t *s_driver_callback_read_store_obj(const char *a_group, const char *a_key, size_t *a_count_out); @@ -50,158 +89,687 @@ static dap_store_obj_t* s_driver_callback_read_cond_store_obj(const char *a_grou static size_t s_driver_callback_read_count_store(const char *a_group, uint64_t a_id); static dap_list_t* s_driver_callback_get_groups_by_mask(const char *a_group_mask); -/** - * @brief dap_db_driver_mdbx_init - * @param a_cdb_path - * @param a_drv_callback - * @return - */ -int dap_db_driver_mdbx_init(const char *a_cdb_path, dap_db_driver_callbacks_t *a_drv_callback) + +static MDBX_env *s_mdbx_env; /* MDBX's context area */ +static char s_subdir [] = "mdbx-db"; /* Name of subdir for the MDBX's database files */ + + +static dap_db_ctx_t *s_db_ctx_init_by_group(const char *a_group, int a_flags) { - s_cdb_path = dap_strdup(a_cdb_path); - if(s_cdb_path[strlen(s_cdb_path)] == '/') { - s_cdb_path[strlen(s_cdb_path)] = '\0'; - } - dap_mkdir_with_parents(s_cdb_path); - struct dirent *d; - DIR *dir = opendir(s_cdb_path); - if (!dir) { - log_it(L_ERROR, "Couldn't open db directory"); - return -1; +int l_rc; +dap_db_ctx_t *l_db_ctx, *l_db_ctx2; + + assert( !pthread_rwlock_rdlock(&s_db_rwlock) ); /* Get RD lock for lookup only */ + HASH_FIND_STR(s_db_ctxs, a_group, l_db_ctx); /* Is there exist context for the group ? */ + assert( !pthread_rwlock_unlock(&s_db_rwlock) ); + + if ( l_db_ctx ) /* Found! Good job - return DB context */ + return log_it(L_INFO, "Found DB context: %p for group: '%s'", l_db_ctx, a_group), l_db_ctx; + + if ( !(a_flags & MDBX_CREATE) ) /* Not found and we don't need to create it ? */ + return NULL; + + /* So , at this point we are going to create 'table' for new group */ + + if ( (l_rc = strlen(a_group)) > DAP$SZ_MAXGROUPNAME ) /* Check length of the group name */ + return log_it(L_ERROR, "Group name '%s' is too long (%d>%d)", a_group, l_rc, DAP$SZ_MAXGROUPNAME), NULL; + + if ( !(l_db_ctx = DAP_NEW_Z(dap_db_ctx_t)) ) /* Allocate zeroed memory for new context */ + return log_it(L_ERROR, "Cannot allocate DB context for '%s', errno=%d", a_group, errno), NULL; + + memcpy(l_db_ctx->name, a_group, l_db_ctx->namelen = l_rc); /* Store group name in the DB context */ + + /* Start transaction, create table, commit. */ + if ( MDBX_SUCCESS != (l_rc = mdbx_txn_begin(s_mdbx_env, NULL, 0, &l_db_ctx->txn)) ) + return log_it(L_CRITICAL, "mdbx_txn_begin: (%d) %s", l_rc, mdbx_strerror(l_rc)), NULL; + + if ( MDBX_SUCCESS != (l_rc = mdbx_dbi_open(l_db_ctx->txn, a_group, a_flags, &l_db_ctx->dbi)) ) + return log_it(L_CRITICAL, "mdbx_dbi_open: (%d) %s", l_rc, mdbx_strerror(l_rc)), NULL; + + if ( MDBX_SUCCESS != (l_rc = mdbx_txn_commit(l_db_ctx->txn)) ) + return log_it(L_CRITICAL, "mdbx_txn_commit: (%d) %s", l_rc, mdbx_strerror(l_rc)), NULL; + + + /* Add new DB Context for the group into the hash for quick access */ + assert( !pthread_rwlock_wrlock(&s_db_rwlock) ); /* Get WR lock for the hash-table */ + + HASH_FIND_STR(s_db_ctxs, a_group, l_db_ctx2 = NULL); /* Check for existence of group again!!! */ + + if ( !l_db_ctx2) /* Still not exist - fine, add new record */ + HASH_ADD_KEYPTR(hh, s_db_ctxs, l_db_ctx->name, l_db_ctx->namelen, l_db_ctx); + + assert( !pthread_rwlock_unlock(&s_db_rwlock) ); + + if ( l_db_ctx2 ) /* Relese unnecessary new context */ + DAP_DEL_Z(l_db_ctx); + + return l_db_ctx2 ? l_db_ctx2 : l_db_ctx; +} + + + +static int s_deinit(void) +{ +dap_db_ctx_t *l_db_ctx = NULL, *l_tmp; + + assert ( !pthread_rwlock_wrlock(&s_db_rwlock) ); /* Prelock for WR */ + + HASH_ITER(hh, s_db_ctxs, l_db_ctx, l_tmp) + { + if (l_db_ctx->txn) + mdbx_txn_abort(l_db_ctx->txn); + + if (l_db_ctx->dbi) + mdbx_dbi_close(s_mdbx_env, l_db_ctx->dbi); + + if (s_mdbx_env) + mdbx_env_close(s_mdbx_env); + + HASH_DEL(s_db_ctxs, l_db_ctx); + DAP_DELETE(l_db_ctx); } - for (d = readdir(dir); d; d = readdir(dir)) { + + assert ( !pthread_rwlock_unlock(&s_db_rwlock) ); + + return 0; +} + + +int dap_db_driver_mdbx_init(const char *a_mdbx_path, dap_db_driver_callbacks_t *a_drv_callback) +{ +int l_rc; +struct dirent *d; +DIR *dir; + + snprintf(s_db_path, sizeof(s_db_path), "%s/%s", a_mdbx_path, s_subdir );/* Make a path to MDBX root */ + dap_mkdir_with_parents(s_db_path); /* Create directory for the MDBX storage */ + + log_it(L_NOTICE, "Directory '%s' will be used as an location for MDBX database files", s_db_path); + if ( MDBX_SUCCESS != (l_rc = mdbx_env_create(&s_mdbx_env)) ) + return log_it(L_CRITICAL, "mdbx_env_create: (%d) %s", l_rc, mdbx_strerror(l_rc)), -ENOENT; + + log_it(L_NOTICE, "Set maximum number of local groups: %d", DAP$K_MAXGROUPS); + assert ( !mdbx_env_set_maxdbs (s_mdbx_env, DAP$K_MAXGROUPS) ); /* Set maximum number of the file-tables (MDBX subDB) + according to number of supported groupes */ + + + /* We set "unlim" for all MDBX characteristics at the moment */ + if ( MDBX_SUCCESS != (l_rc = mdbx_env_set_geometry(s_mdbx_env, 0, -1, -1, -1,-1, -1)) ) + return log_it (L_CRITICAL, "mdbx_env_set_geometry (%s): (%d) %s", s_db_path, l_rc, mdbx_strerror(l_rc)), -EINVAL; + + if ( MDBX_SUCCESS != (l_rc = mdbx_env_open(s_mdbx_env, s_db_path, MDBX_CREATE | MDBX_COALESCE | MDBX_LIFORECLAIM, 0664)) ) + return log_it (L_CRITICAL, "mdbx_env_open (%s): (%d) %s", s_db_path, l_rc, mdbx_strerror(l_rc)), -EINVAL; + + + /* Scan target MDBX directory and open MDBX database for every file-local-group */ + if ( !(dir = opendir(s_db_path)) ) + return log_it(L_ERROR, "Couldn't open DB directory '%s', errno=%d", s_db_path, errno), -errno; + + while ( (d = readdir(dir))) + { #ifdef _DIRENT_HAVE_D_TYPE if (d->d_type != DT_DIR) continue; -#else +#elif defined(DAP_OS_LINUX) struct _stat buf; int res = _stat(d->d_name, &buf); if (!S_ISDIR(buf.st_mode) || !res) { continue; } -#endif - if (!dap_strcmp(d->d_name, ".") || !dap_strcmp(d->d_name, "..")) { +#elif defined (DAP_OS_BSD) + struct stat buf; + int res = stat(d->d_name, &buf); + if (!S_ISDIR(buf.st_mode) || !res) { continue; } - if (0) { - s_driver_callback_deinit(); +#endif + if ( (d->d_name[0] == '.') || !dap_strcmp(d->d_name, "..")) + continue; + + if ( !s_db_ctx_init_by_group(d->d_name, 0) ) + { + dap_db_driver_mdbx_deinit(); closedir(dir); - return -2; + return -ENOENT; } } - a_drv_callback->read_last_store_obj = s_driver_callback_read_last_store_obj; - a_drv_callback->apply_store_obj = s_driver_callback_apply_store_obj; - a_drv_callback->read_store_obj = s_driver_callback_read_store_obj; - a_drv_callback->read_cond_store_obj = s_driver_callback_read_cond_store_obj; - a_drv_callback->read_count_store = s_driver_callback_read_count_store; - a_drv_callback->get_groups_by_mask = s_driver_callback_get_groups_by_mask; - a_drv_callback->is_obj = s_driver_callback_is_obj; - a_drv_callback->deinit = s_driver_callback_deinit; - a_drv_callback->flush = s_driver_callback_flush; - closedir(dir); - return 0; + +#if __SYS$STARLET__ + s_db_ctx_init_by_group("test", 0); + s_db_ctx_init_by_group("test2", 0); +#endif /* __SYS$STARLET__ */ + + /* Fill the Driver Interface Table */ + a_drv_callback->apply_store_obj = s_apply_store_obj; + a_drv_callback->read_last_store_obj = s_read_last_store_obj; + + a_drv_callback->read_store_obj = s_read_store_obj; + a_drv_callback->read_cond_store_obj = s_read_cond_store_obj; + a_drv_callback->read_count_store = s_read_count_store; + a_drv_callback->get_groups_by_mask = s_get_groups_by_mask; + a_drv_callback->is_obj = s_is_obj; + a_drv_callback->deinit = s_deinit; + a_drv_callback->flush = s_flush; + + + + return MDBX_SUCCESS; } + + /** - * @brief s_driver_callback_deinit - * @return + * @brief Serialize key and val to a item + * key -> key + * val[0..8] => id + * val[..] => value_len + * val[..] => value + * val[..] => timestamp + * @param a_obj a pointer to a item + * @param key a key + * @param val a serialize string */ -static int s_driver_callback_deinit() +static void s_serialize_val_to_dap_store_obj( + dap_store_obj_t *a_obj, + const char *key, + const char *val) { - return 0; +int offset = 0; + + if (!key) + return; + + a_obj->key = dap_strdup(key); + a_obj->id = dap_hex_to_uint(val, sizeof(uint64_t)); + offset += sizeof(uint64_t); + + a_obj->flags = dap_hex_to_uint(val + offset, sizeof(uint8_t)); + offset += sizeof(uint8_t); + + a_obj->value_len = dap_hex_to_uint(val + offset, sizeof(uint64_t)); + offset += sizeof(uint64_t); + + if (a_obj->value_len) { + a_obj->value = DAP_NEW_SIZE(uint8_t, a_obj->value_len); + memcpy((byte_t *)a_obj->value, val + offset, a_obj->value_len); + } + + offset += a_obj->value_len; + a_obj->timestamp = dap_hex_to_uint(val + offset, sizeof(uint64_t)); } -/** - * @brief s_driver_callback_flush - * @return - */ -static int s_driver_callback_flush(void) +/** A callback function designed for finding a last item */ +static int dap_mdbx_get_last_obj_iter_callback( + void *arg, + const char *key, + int ksize, + const char *val, + int vsize, + uint32_t expire, + uint64_t oid) { - int ret = 0; - log_it(L_DEBUG, "Flushing MDBX on the disk"); - return ret; + UNUSED(ksize); + UNUSED(val); + UNUSED(vsize); + UNUSED(expire); + UNUSED(oid); + + if (--((obj_arg *)arg)->q == 0) { + s_serialize_val_to_dap_store_obj((pdap_store_obj_t)(((obj_arg *)arg)->o), key, val); + return false; + } + + return true; +} + +//** A callback function designed for finding a some items */ +static int dap_mdbx_get_some_obj_iter_callback( + void *arg, + const char *key, + int ksize, + const char *val, + int vsize, + uint32_t expire, + uint64_t oid) +{ + UNUSED(ksize); + UNUSED(val); + UNUSED(vsize); + UNUSED(expire); + UNUSED(oid); + + dap_store_obj_t *l_obj = (dap_store_obj_t *)((obj_arg *)arg)->o; + s_serialize_val_to_dap_store_obj(&l_obj[((obj_arg *)arg)->n - ((obj_arg *)arg)->q], key, val); + if (--((obj_arg *)arg)->q == 0) { + return false; + } + return true; +} + +//** A callback function designed for finding a some items by conditionals */ +static int dap_mdbx_get_cond_obj_iter_callback( + void *arg, + const char *key, + int ksize, + const char *val, + int vsize, + uint32_t expire, + uint64_t oid) +{ + UNUSED(ksize); + UNUSED(val); + UNUSED(vsize); + UNUSED(expire); + UNUSED(oid); + + if (dap_hex_to_uint(val, sizeof(uint64_t)) < ((obj_arg *)arg)->id) { + return true; + } + pdap_store_obj_t l_obj = (pdap_store_obj_t)((obj_arg *)arg)->o; + s_serialize_val_to_dap_store_obj(&l_obj[((obj_arg *)arg)->n - ((obj_arg *)arg)->q], key, val); + if (--((obj_arg *)arg)->q == 0) { + return false; + } + return true; } +//** A callback function designed for counting items*/ +bool dap_mdbx_get_count_iter_callback( + void *arg, + const char *key, + int ksize, + const char *val, + int vsize, + uint32_t expire, + uint64_t oid) +{ + UNUSED(ksize); + UNUSED(val); + UNUSED(vsize); + UNUSED(expire); + UNUSED(oid); + UNUSED(key); + + if (dap_hex_to_uint(val, sizeof(uint64_t)) < ((obj_arg *)arg)->id) + return true; + + if (--((obj_arg *)arg)->q == 0) + return false; + + return true; +} + + /** - * @brief s_driver_callback_read_last_store_obj - * @param a_group - * @return + * @brief Gets CDB by a_group. + * @param a_group a group name + * @return if CDB is found, a pointer to CDB, otherwise NULL. */ -static dap_store_obj_t *s_driver_callback_read_last_store_obj(const char* a_group) +static dap_db_ctx_t *s_get_db_ctx_by_group(const char *a_group) { - return NULL; +dap_db_ctx_t *l_db_ctx = NULL; + + assert ( !pthread_rwlock_rdlock(&s_db_rwlock) ); + HASH_FIND_STR(s_db_ctxs, a_group, l_db_ctx); + assert ( !pthread_rwlock_unlock(&s_db_rwlock) ); + + return l_db_ctx; } /** - * @brief s_driver_callback_is_obj - * @param a_group - * @param a_key - * @return + * @brief Flushing CDB to the disk. + * @return 0 */ -static bool s_driver_callback_is_obj(const char *a_group, const char *a_key) +int dap_db_driver_flush(void) { - return false; + return log_it(L_DEBUG, "Flushing CDB to disk"), 0; } /** - * @brief s_driver_callback_read_store_obj - * @param a_group - * @param a_key - * @param a_count_out - * @return + * @brief Read last store item from CDB. + * @param a_group a group name + * @return If successful, a pointer to item, otherwise NULL. */ -static dap_store_obj_t *s_driver_callback_read_store_obj(const char *a_group, const char *a_key, size_t *a_count_out) +dap_store_obj_t *dap_db_driver_mdbx_read_last_store_obj(const char* a_group) { - return NULL; +dap_db_ctx_t *l_db_ctx; + + if (!a_group) + return NULL; + + if (!(l_db_ctx = s_get_db_ctx_by_group(a_group)) ) + return NULL; + + void *l_iter = cdb_iterate_new(l_cdb, 0); + obj_arg l_arg; + + l_arg.o = DAP_NEW_Z(dap_store_obj_t); + l_arg.q = l_cdb_stat.rnum; + + cdb_iterate(l_cdb, dap_mdbx_get_last_obj_iter_callback, (void*)&l_arg, l_iter); + cdb_iterate_destroy(l_cdb, l_iter); + + l_arg.o->group = dap_strdup(a_group); + + return l_arg.o; } /** - * @brief s_driver_callback_read_cond_store_obj - * @param a_group - * @param a_id - * @param a_count_out - * @return + * @brief Checks if CDB has a_key + * @param a_group the group name + * @param a_key the key + * @return true or false */ -static dap_store_obj_t* s_driver_callback_read_cond_store_obj(const char *a_group, uint64_t a_id, size_t *a_count_out) +int dap_db_driver_mdbx_is_obj(const char *a_group, const char *a_key) { - return NULL; +bool l_ret = false; +dap_db_ctx_t l_db_ctx; +CDB *l_cdb = l_db_ctx->cdb; + + if(!a_group) + return false; + + if ( !(l_db_ctx = dap_mdbx_get_db_by_group(a_group)) ) + return false; + + if(a_key) { + //int l_vsize; + if( !cdb_is(l_cdb, a_key, (int) dap_strlen(a_key)) ) + l_ret = true; + } + + return l_ret; } /** - * @brief s_driver_callback_read_count_store - * @param a_group - * @param a_id - * @return + * @brief Gets items from CDB by a_group and a_key. If a_key=NULL then gets a_count_out items. + * @param a_group the group name + * @param a_key the key or NULL + * @param a_count_out IN. Count of read items. OUT Count of items was read + * @return If successful, pointer to items; otherwise NULL. */ -static size_t s_driver_callback_read_count_store(const char *a_group, uint64_t a_id) +static dap_store_obj_t *s_read_store_obj(const char *a_group, const char *a_key, size_t *a_count_out) { - return 0; +dap_db_ctx_t l_db_ctx; +dap_store_obj_t *l_obj; + + if (!a_group) + return NULL; + + if ( l_db_ctx = dap_mdbx_get_db_by_group(a_group) ) + return NULL; + + CDB *l_cdb = l_db_ctx->cdb; + dap_store_obj_t *l_obj = NULL; + + + if (a_key) { + char *l_value; + int l_vsize; + cdb_get(l_cdb, a_key, (int)strlen(a_key), (void**)&l_value, &l_vsize); + if (!l_value) { + return NULL; + } + l_obj = DAP_NEW_Z(dap_store_obj_t); + s_serialize_val_to_dap_store_obj(l_obj, a_key, l_value); + l_obj->group = dap_strdup(a_group); + cdb_free_val((void**)&l_value); + if(a_count_out) { + *a_count_out = 1; + } + } else { + uint64_t l_count_out = 0; + if(a_count_out) { + l_count_out = *a_count_out; + } + CDBSTAT l_cdb_stat; + cdb_stat(l_cdb, &l_cdb_stat); + if ((l_count_out == 0) || (l_count_out > l_cdb_stat.rnum)) { + l_count_out = l_cdb_stat.rnum; + } + obj_arg l_arg; + l_arg.o = DAP_NEW_Z_SIZE(dap_store_obj_t, l_count_out * sizeof(dap_store_obj_t)); + l_arg.q = l_count_out; + l_arg.n = l_count_out; + void *l_iter = cdb_iterate_new(l_cdb, 0); + /*l_count_out = */cdb_iterate(l_cdb, dap_mdbx_get_some_obj_iter_callback, (void*)&l_arg, l_iter); + cdb_iterate_destroy(l_cdb, l_iter); + if(a_count_out) { + *a_count_out = l_count_out; + } + for (uint64_t i = 0; i < l_count_out; ++i) { + l_arg.o[i].group = dap_strdup(a_group); + } + l_obj = l_arg.o; + } + + return l_obj; } /** - * @brief s_driver_callback_get_groups_by_mask - * @details Check whether the groups match the pattern a_group_mask, which is a shell wildcard pattern - * @param a_group_mask - * @return + * @brief Gets items from CDB by a_group and a_id. + * @param a_group the group name + * @param a_id id + * @param a_count_out[in] a count of items + * @param a_count[out] a count of items were got + * @return If successful, pointer to items, otherwise NULL. */ -static dap_list_t* s_driver_callback_get_groups_by_mask(const char *a_group_mask) -{ - return NULL; +dap_store_obj_t* dap_db_driver_mdbx_read_cond_store_obj(const char *a_group, uint64_t a_id, size_t *a_count_out) { + if (!a_group) { + return NULL; + } + dap_db_ctx_t l_db_ctx = dap_mdbx_get_db_by_group(a_group); + if (!l_db_ctx) { + return NULL; + } + CDB *l_cdb = l_db_ctx->cdb; + uint64_t l_count_out = 0; + if(a_count_out) { + l_count_out = *a_count_out; + } + CDBSTAT l_cdb_stat; + cdb_stat(l_cdb, &l_cdb_stat); + + if (l_count_out == 0 || l_count_out > l_cdb_stat.rnum) { + l_count_out = l_cdb_stat.rnum; + } + obj_arg l_arg; + l_arg.o = DAP_NEW_Z_SIZE(dap_store_obj_t, l_count_out * sizeof(dap_store_obj_t)); + l_arg.n = l_count_out; + l_arg.q = l_count_out; + l_arg.id = a_id; + void *l_iter = cdb_iterate_new(l_cdb, 0); + /*l_count_out = */cdb_iterate(l_cdb, dap_mdbx_get_cond_obj_iter_callback, (void*)&l_arg, l_iter); + cdb_iterate_destroy(l_cdb, l_iter); + if (l_arg.q > 0) { + l_count_out = l_arg.n - l_arg.q; + void *tmp = DAP_REALLOC(l_arg.o, l_count_out * sizeof(dap_store_obj_t)); + if (!tmp && l_count_out) { + log_it(L_CRITICAL, "Couldn't re-allocate memory for portion of store objects!"); + DAP_DELETE(l_arg.o); + return NULL; + } + l_arg.o = tmp; + } + if(a_count_out) { + *a_count_out = l_count_out; + } + for (uint64_t i = 0; i < l_count_out; ++i) { + l_arg.o[i].group = dap_strdup(a_group); + } + return l_arg.o; } /** - * @brief s_driver_callback_apply_store_obj - * @param a_store_obj - * @return + * @brief Reads count of items in CDB by a_group and a_id. + * @param a_group the group name + * @param a_id id + * @return If successful, count of store items; otherwise 0. */ -static int s_driver_callback_apply_store_obj(pdap_store_obj_t a_store_obj) +size_t dap_db_driver_mdbx_read_count_store(const char *a_group, uint64_t a_id) { - if(!a_store_obj || !a_store_obj->group) { - return -1; + if (!a_group) { + return 0; + } + dap_db_ctx_t l_db_ctx = dap_mdbx_get_db_by_group(a_group); + if (!l_db_ctx) { + return 0; } - int ret = 0; - return ret; + CDB *l_cdb = l_db_ctx->cdb; + CDBSTAT l_cdb_stat; + cdb_stat(l_cdb, &l_cdb_stat); + obj_arg l_arg; + l_arg.q = l_cdb_stat.rnum; + l_arg.id = a_id; + void *l_iter = cdb_iterate_new(l_cdb, 0); + cdb_iterate(l_cdb, dap_mdbx_get_count_iter_callback, (void*)&l_arg, l_iter); + cdb_iterate_destroy(l_cdb, l_iter); + return l_cdb_stat.rnum - l_arg.q; } -#endif +static dap_list_t *s_get_groups_by_mask(const char *a_group_mask) +{ +dap_list_t *l_ret_list; +dap_db_ctx_t *l_db_ctx, *l_db_ctx2; + + if(!a_group_mask) + return NULL; + + assert ( !pthread_rwlock_rdlock(&s_db_rwlock) ); + + HASH_ITER(hh, s_db_ctxs, l_db_ctx, l_db_ctx2) { + if (!dap_fnmatch(a_group_mask, l_db_ctx->name, 0) ) /* Name match a pattern/mask ? */ + l_ret_list = dap_list_prepend(l_ret_list, dap_strdup(l_db_ctx->name)); /* Add group name to output list */ + } + + assert ( !pthread_rwlock_rdlock(&s_db_rwlock) ); + + return l_ret_list; +} + + +/* + * Follows suffix structure is supposed to be added at end of MDBX record, so : + * <value> + <suffix> + */ +struct __record_suffix__ { + uint64_t id; + uint8_t flags; + uint64_t ts; +}; + +static int s_apply_store_obj (dap_store_obj_t *a_store_obj) +{ +int l_rc = 0, l_rc2; +dap_db_ctx_t *l_db_ctx; +MDBX_val l_key, l_data; +char *l_val; +struct __record_suffix__ *l_suff; + + if ( !a_store_obj || !a_store_obj->group) /* Sanity checks ... */ + return -EINVAL; + + + if ( !(l_db_ctx = s_get_db_ctx_by_group(a_store_obj->group)) ) { /* Get a DB context for the group */ + log_it(L_WARNING, "No DB context for the group '%s', create it ...", a_store_obj->group); + /* Group is not found ? Try to create table for new group */ + if ( !(l_db_ctx = s_db_ctx_init_by_group(a_store_obj->group, MDBX_CREATE)) ) + return log_it(L_WARNING, "Cannot create DB context for the group '%s'", a_store_obj->group), -EIO; + + log_it(L_NOTICE, "DB context for the group '%s' has been created", a_store_obj->group); + + + if ( a_store_obj->type == DAP_DB$K_OPTYPE_DEL ) /* Nothing to do anymore */ + return 0; + } + + + /* At this point we have got the DB Context for the table/group + * so we are can performs a main work + */ + + + if (a_store_obj->type == DAP_DB$K_OPTYPE_ADD ) { + if( !a_store_obj->key ) + return -ENOENT; + + l_key.iov_base = (void *) a_store_obj->key; /* Fill IOV for MDBX key */ + l_key.iov_len = a_store_obj->key_len ? a_store_obj->key_len : strnlen(a_store_obj->key, DAP$SZ_MAXKEY); + + /* + * Now we are ready to form a record in next format: + * <value> + <suffix> + */ + l_rc = a_store_obj->value_len + sizeof(struct __record_suffix__); /* Compute a length of the arrea to keep value+suffix */ + + if ( !(l_val = DAP_NEW_Z_SIZE(char, l_rc)) ) + return log_it(L_ERROR, "Cannot allocate memory for new records, %d octets, errno=%d", l_rc, errno), -errno; + + l_data.iov_base = l_val; /* Fill IOV for MDBX data */ + l_data.iov_len = l_rc; + + /* + * Fill suffix's fields + */ + l_suff = (struct __record_suffix__ *) (l_val + a_store_obj->value_len); + l_suff->id = atomic_fetch_add(&l_db_ctx->id, 1); + l_suff->flags = a_store_obj->flags; + l_suff->ts = a_store_obj->timestamp; + + memcpy(l_val, a_store_obj->value, a_store_obj->value_len); /* Put <value> into the record */ + + /* So, finaly: start transaction, do INSERT, COMMIT or ABORT ... */ + if ( MDBX_SUCCESS != (l_rc = mdbx_txn_begin(s_mdbx_env, NULL, 0, &l_db_ctx->txn)) ) + return DAP_FREE(l_val), log_it (L_ERROR, "mdbx_txn_begin: (%d) %s", l_rc, mdbx_strerror(l_rc)), -EIO; + + + if ( MDBX_SUCCESS != (l_rc = mdbx_put(l_db_ctx->txn, l_db_ctx->dbi, &l_key, &l_data, 0)) ) + { + log_it (L_ERROR, "mdbx_put: (%d) %s", l_rc, mdbx_strerror(l_rc)); + + if ( MDBX_SUCCESS != (l_rc2 = mdbx_txn_abort(l_db_ctx->txn)) ) + log_it (L_ERROR, "mdbx_abort: (%d) %s", l_rc2, mdbx_strerror(l_rc2)); + } + else if ( MDBX_SUCCESS != (l_rc = mdbx_txn_commit(l_db_ctx->txn)) ) + log_it (L_ERROR, "mdbx_txn_begin: (%d) %s", l_rc, mdbx_strerror(l_rc)); + + if ( l_rc != MDBX_SUCCESS ) + DAP_FREE(l_val); + + return ( l_rc == MDBX_SUCCESS ) ? 0 : -EIO; + } + + + + if (a_store_obj->type == DAP_DB$K_OPTYPE_DEL) { + if ( MDBX_SUCCESS != (l_rc = mdbx_txn_begin(s_mdbx_env, NULL, 0, &l_db_ctx->txn)) ) + return log_it (L_ERROR, "mdbx_txn_begin: (%d) %s", l_rc, mdbx_strerror(l_rc)), -ENOENT; + + + if ( a_store_obj->key ) { /* Delete record */ + l_key.iov_base = (void *) a_store_obj->key; + l_key.iov_len = a_store_obj->key_len ? a_store_obj->key_len : strnlen(a_store_obj->key, DAP$SZ_MAXKEY); + + if ( MDBX_SUCCESS != (l_rc = mdbx_del(l_db_ctx->txn, l_db_ctx->dbi, &l_key, NULL)) ) + l_rc2 = -EIO, log_it (L_ERROR, "mdbx_del: (%d) %s", l_rc, mdbx_strerror(l_rc)); + } + else { /* Truncate whole table */ + if ( MDBX_SUCCESS != (l_rc = mdbx_drop(l_db_ctx->txn, l_db_ctx->dbi, 0)) ) + l_rc2 = -EIO, log_it (L_ERROR, "mdbx_drop: (%d) %s", l_rc, mdbx_strerror(l_rc)); + } + + if ( l_rc != MDBX_SUCCESS ) { /* Check result of mdbx_drop/del */ + if ( MDBX_SUCCESS != (l_rc = mdbx_txn_abort(l_db_ctx->txn)) ) + l_rc2 = -EIO, log_it (L_ERROR, "mdbx_txn_abort: (%d) %s", l_rc, mdbx_strerror(l_rc)); + } + else if ( MDBX_SUCCESS != (l_rc = mdbx_txn_commit(l_db_ctx->txn)) ) + l_rc2 = -EIO, log_it (L_ERROR, "mdbx_txn_abort: (%d) %s", l_rc, mdbx_strerror(l_rc)); + + return ( l_rc2 == MDBX_SUCCESS ) ? 0 : -EIO; + } + + log_it (L_ERROR, "Unhandle/unknow DB opcode (%d/%#x)", a_store_obj->type, a_store_obj->type); + + return -EIO; +} diff --git a/modules/global-db/include/dap_chain_global_db_driver.h b/modules/global-db/include/dap_chain_global_db_driver.h index fd0463e091..c6072a3b38 100644 --- a/modules/global-db/include/dap_chain_global_db_driver.h +++ b/modules/global-db/include/dap_chain_global_db_driver.h @@ -31,9 +31,13 @@ #include <stddef.h> #include <stdint.h> +#define DAP$SZ_MAXGROUPNAME 128 /* A maximum size of group name */ +#define DAP$K_MAXGROUPS 1024 /* A maximum number of groups */ +#define DAP$SZ_MAXKEY 512 /* A limit for the key's length in DB */ + enum { - DAP_DB$K_OPTYPE_ADD = 'a', /* Operation Type = INSERT/ADD */ - DAP_DB$K_OPTYPE_DEL = 'd', /* -- // -- DELETE */ + DAP_DB$K_OPTYPE_ADD = 'a', /* Operation Type = INSERT/ADD */ + DAP_DB$K_OPTYPE_DEL = 'd', /* -- // -- DELETE */ }; @@ -45,15 +49,20 @@ enum RECORD_FLAGS { typedef struct dap_store_obj { uint64_t id; dap_gdb_time_t timestamp; - uint32_t type; /* Operation type: ADD/DELETE, see DAP_DB$K_OPTYPE_* constants */ - uint8_t flags; /* RECORD_FLAGS */ + uint32_t type; /* Operation type: ADD/DELETE, see DAP_DB$K_OPTYPE_* constants */ + uint8_t flags; /* RECORD_FLAGS */ + char *group; + uint64_t group_len; + const char *key; + uint64_t key_len; + uint8_t *value; uint64_t value_len; - dap_proc_queue_callback_t cb; /* (Async mode only!) A call back to be called on request completion */ - const void *cb_arg; /* (Async mode only!) An argument of the callback rotine */ + dap_proc_queue_callback_t cb; /* (Async mode only!) A call back to be called on request completion */ + const void *cb_arg; /* (Async mode only!) An argument of the callback rotine */ } dap_store_obj_t, *pdap_store_obj_t; typedef struct dap_store_obj_pkt { -- GitLab