diff --git a/modules/global-db/dap_chain_global_db_driver_mdbx.c b/modules/global-db/dap_chain_global_db_driver_mdbx.c index 4eacadc2ea7d1b8c46cbdd3546299d7b67eeecc0..18ee7ec90d6186a9b945e4ff8e65d0092dc01b66 100644 --- a/modules/global-db/dap_chain_global_db_driver_mdbx.c +++ b/modules/global-db/dap_chain_global_db_driver_mdbx.c @@ -51,29 +51,29 @@ #include "mdbx.h" /* LibMDBX API */ #define LOG_TAG "dap_chain_global_db_mdbx" -/** Struct for a MDBX instanse */ +extern int s_db_drvmode_async , /* Set a kind of processing requests to DB: + <> 0 - Async mode should be used */ + s_dap_global_db_debug_more; /* Enable extensible debug output */ + + +/** Struct for a MDBX DB context */ typedef struct __db_ctx__ { - atomic_ullong id; /* Just a counter of */ size_t namelen; /* Group name length */ char name[DAP_DB$SZ_MAXGROUPNAME + 1]; /* Group's name */ + + pthread_mutex_t dbi_mutex; /* Coordinate access the MDBX's <dbi> */ MDBX_dbi dbi; /* MDBX's internal context id */ MDBX_txn *txn; /* Current MDBX's transaction */ + UT_hash_handle hh; } dap_db_ctx_t; +static pthread_mutex_t s_db_ctx_mutex = PTHREAD_MUTEX_INITIALIZER; /* A mutex for working with a DB context */ -/** Struct for a item */ -typedef struct _obj_arg { - pdap_store_obj_t o; - uint64_t q; - uint64_t n; - uint64_t id; -} obj_arg; -static dap_db_ctx_t *s_db_ctxs = NULL; /* A pointer to a CDB instance. */ -static pthread_mutex_t s_db_mutex = PTHREAD_MUTEX_INITIALIZER; /* A mutex for working with a DB instanse. */ -static pthread_rwlock_t s_db_rwlock = PTHREAD_RWLOCK_INITIALIZER; /* A read-write lock for working with a DB instanse. */ +static dap_db_ctx_t *s_db_ctxs = NULL; /* A hash table of <group/subDB/table> == MDBX DB context */ +static pthread_rwlock_t s_db_ctxs_rwlock = PTHREAD_RWLOCK_INITIALIZER; /* A read-write lock for working with a <s_db_ctxs>. */ static char s_db_path[MAX_PATH]; /* A root directory for the MDBX files */ @@ -81,9 +81,9 @@ static char s_db_path[MAX_PATH]; /* A static int s_deinit(); static int s_flush(void); static int s_apply_store_obj (dap_store_obj_t *a_store_obj); -static dap_store_obj_t *s_read_last_store_obj(const char* a_group); +static dap_store_obj_t *s_read_last_store_obj(const char* a_group); static int s_is_obj(const char *a_group, const char *a_key); -static dap_store_obj_t *s_read_store_obj(const char *a_group, const char *a_key, size_t *a_count_out); +static dap_store_obj_t *s_read_store_obj(const char *a_group, const char *a_key, size_t *a_count_out); static dap_store_obj_t *s_read_cond_store_obj(const char *a_group, uint64_t a_id, size_t *a_count_out); static size_t s_read_count_store(const char *a_group, uint64_t a_id); static dap_list_t *s_get_groups_by_mask(const char *a_group_mask); @@ -92,10 +92,16 @@ static dap_list_t *s_get_groups_by_mask(const char *a_group_mask); static MDBX_env *s_mdbx_env; /* MDBX's context area */ static char s_subdir [] = "mdbx-db"; /* Name of subdir for the MDBX's database files */ - - - - +/* + * Suffix structure is supposed to be added at end of MDBX record, so : + * <value> + <suffix> + */ +struct __record_suffix__ { + uint64_t mbz; /* Must Be Zero ! */ + uint64_t id; /* An uniqe-like Id of the record - internaly created and maintained */ + uint64_t flags; /* Flag of the record : see RECORD_FLAGS enums */ + dap_time_t ts; /* Timestamp of the record */ +}; static dap_db_ctx_t *s_db_ctx_init_by_group(const char *a_group, int a_flags) @@ -103,9 +109,12 @@ static dap_db_ctx_t *s_db_ctx_init_by_group(const char *a_group, int a_flags) int l_rc; dap_db_ctx_t *l_db_ctx, *l_db_ctx2; - assert( !pthread_rwlock_rdlock(&s_db_rwlock) ); /* Get RD lock for lookup only */ + debug_if(s_dap_global_db_debug_more, L_DEBUG, "Init group/table '%s', flags: %#x ...", a_group, a_flags); + + + assert( !pthread_rwlock_rdlock(&s_db_ctxs_rwlock) ); /* Get RD lock for lookup only */ HASH_FIND_STR(s_db_ctxs, a_group, l_db_ctx); /* Is there exist context for the group ? */ - assert( !pthread_rwlock_unlock(&s_db_rwlock) ); + assert( !pthread_rwlock_unlock(&s_db_ctxs_rwlock) ); if ( l_db_ctx ) /* Found! Good job - return DB context */ return log_it(L_INFO, "Found DB context: %p for group: '%s'", l_db_ctx, a_group), l_db_ctx; @@ -122,6 +131,7 @@ dap_db_ctx_t *l_db_ctx, *l_db_ctx2; return log_it(L_ERROR, "Cannot allocate DB context for '%s', errno=%d", a_group, errno), NULL; memcpy(l_db_ctx->name, a_group, l_db_ctx->namelen = l_rc); /* Store group name in the DB context */ + pthread_mutex_init(&l_db_ctx->dbi_mutex, NULL); /* ** Start transaction, create table, commit. @@ -139,7 +149,7 @@ dap_db_ctx_t *l_db_ctx, *l_db_ctx2; /* ** Add new DB Context for the group into the hash for quick access */ - assert( !pthread_rwlock_wrlock(&s_db_rwlock) ); /* Get WR lock for the hash-table */ + assert( !pthread_rwlock_wrlock(&s_db_ctxs_rwlock) ); /* Get WR lock for the hash-table */ l_db_ctx2 = NULL; HASH_FIND_STR(s_db_ctxs, a_group, l_db_ctx2); /* Check for existence of group again!!! */ @@ -147,7 +157,7 @@ dap_db_ctx_t *l_db_ctx, *l_db_ctx2; if ( !l_db_ctx2) /* Still not exist - fine, add new record */ HASH_ADD_KEYPTR(hh, s_db_ctxs, l_db_ctx->name, l_db_ctx->namelen, l_db_ctx); - assert( !pthread_rwlock_unlock(&s_db_rwlock) ); + assert( !pthread_rwlock_unlock(&s_db_ctxs_rwlock) ); if ( l_db_ctx2 ) /* Relese unnecessary new context */ DAP_DEL_Z(l_db_ctx); @@ -161,12 +171,12 @@ static int s_deinit(void) { dap_db_ctx_t *l_db_ctx = NULL, *l_tmp; - assert ( !pthread_rwlock_wrlock(&s_db_rwlock) ); /* Prelock for WR */ + assert ( !pthread_rwlock_wrlock(&s_db_ctxs_rwlock) ); /* Prelock for WR */ HASH_ITER(hh, s_db_ctxs, l_db_ctx, l_tmp) { if (l_db_ctx->txn) - mdbx_txn_abort(l_db_ctx->txn); + mdbx_txn_commit(l_db_ctx->txn); if (l_db_ctx->dbi) mdbx_dbi_close(s_mdbx_env, l_db_ctx->dbi); @@ -178,7 +188,7 @@ dap_db_ctx_t *l_db_ctx = NULL, *l_tmp; DAP_DELETE(l_db_ctx); } - assert ( !pthread_rwlock_unlock(&s_db_rwlock) ); + assert ( !pthread_rwlock_unlock(&s_db_ctxs_rwlock) ); return 0; } @@ -262,156 +272,11 @@ DIR *dir; a_drv_callback->is_obj = s_is_obj; a_drv_callback->deinit = s_deinit; a_drv_callback->flush = s_flush; - a_drv_callback->db_ctx = NULL; return MDBX_SUCCESS; } - -/** - * @brief Serialize key and val to a item - * key -> key - * val[0..8] => id - * val[..] => value_len - * val[..] => value - * val[..] => timestamp - * @param a_obj a pointer to a item - * @param key a key - * @param val a serialize string - */ -static void s_serialize_val_to_dap_store_obj -( - dap_store_obj_t *a_obj, - const char *key, - const char *val) -{ -int offset = 0; - - if (!key) - return; - - a_obj->key = dap_strdup(key); - a_obj->id = dap_hex_to_uint(val, sizeof(uint64_t)); - offset += sizeof(uint64_t); - - a_obj->flags = dap_hex_to_uint(val + offset, sizeof(uint8_t)); - offset += sizeof(uint8_t); - - a_obj->value_len = dap_hex_to_uint(val + offset, sizeof(uint64_t)); - offset += sizeof(uint64_t); - - if (a_obj->value_len) { - a_obj->value = DAP_NEW_SIZE(uint8_t, a_obj->value_len); - memcpy((byte_t *)a_obj->value, val + offset, a_obj->value_len); - } - - offset += a_obj->value_len; - a_obj->timestamp = dap_hex_to_uint(val + offset, sizeof(uint64_t)); -} - -/** A callback function designed for finding a last item */ -static int s_get_last_obj_iter_callback( - void *arg, - const char *key, - int ksize, - const char *val, - int vsize, - uint32_t expire, - uint64_t oid) -{ - UNUSED(ksize); - UNUSED(val); - UNUSED(vsize); - UNUSED(expire); - UNUSED(oid); - - if (--((obj_arg *)arg)->q == 0) { - s_serialize_val_to_dap_store_obj((pdap_store_obj_t)(((obj_arg *)arg)->o), key, val); - return false; - } - - return true; -} - -//** A callback function designed for finding a some items */ -static int dap_mdbx_get_some_obj_iter_callback( - void *arg, - const char *key, - int ksize, - const char *val, - int vsize, - uint32_t expire, - uint64_t oid) -{ - UNUSED(ksize); - UNUSED(val); - UNUSED(vsize); - UNUSED(expire); - UNUSED(oid); - - dap_store_obj_t *l_obj = (dap_store_obj_t *)((obj_arg *)arg)->o; - s_serialize_val_to_dap_store_obj(&l_obj[((obj_arg *)arg)->n - ((obj_arg *)arg)->q], key, val); - if (--((obj_arg *)arg)->q == 0) { - return false; - } - return true; -} - -//** A callback function designed for finding a some items by conditionals */ -static int dap_mdbx_get_cond_obj_iter_callback( - void *arg, - const char *key, - int ksize, - const char *val, - int vsize, - uint32_t expire, - uint64_t oid) -{ - UNUSED(ksize); - UNUSED(val); - UNUSED(vsize); - UNUSED(expire); - UNUSED(oid); - - if (dap_hex_to_uint(val, sizeof(uint64_t)) < ((obj_arg *)arg)->id) { - return true; - } - pdap_store_obj_t l_obj = (pdap_store_obj_t)((obj_arg *)arg)->o; - s_serialize_val_to_dap_store_obj(&l_obj[((obj_arg *)arg)->n - ((obj_arg *)arg)->q], key, val); - if (--((obj_arg *)arg)->q == 0) { - return false; - } - return true; -} - -//** A callback function designed for counting items*/ -bool dap_mdbx_get_count_iter_callback( - void *arg, - const char *key, - int ksize, - const char *val, - int vsize, - uint32_t expire, - uint64_t oid) -{ - UNUSED(ksize); - UNUSED(val); - UNUSED(vsize); - UNUSED(expire); - UNUSED(oid); - UNUSED(key); - - if (dap_hex_to_uint(val, sizeof(uint64_t)) < ((obj_arg *)arg)->id) - return true; - - if (--((obj_arg *)arg)->q == 0) - return false; - - return true; -} - - /** * @brief Gets CDB by a_group. * @param a_group a group name @@ -421,9 +286,9 @@ static dap_db_ctx_t *s_get_db_ctx_by_group(const char *a_group) { dap_db_ctx_t *l_db_ctx = NULL; - assert ( !pthread_rwlock_rdlock(&s_db_rwlock) ); + assert ( !pthread_rwlock_rdlock(&s_db_ctxs_rwlock) ); HASH_FIND_STR(s_db_ctxs, a_group, l_db_ctx); - assert ( !pthread_rwlock_unlock(&s_db_rwlock) ); + assert ( !pthread_rwlock_unlock(&s_db_ctxs_rwlock) ); if ( !l_db_ctx ) log_it(L_ERROR, "No DB context for the group '%s'", a_group); @@ -447,28 +312,82 @@ static int s_flush(void) */ dap_store_obj_t *s_read_last_store_obj(const char* a_group) { +int l_rc; dap_db_ctx_t *l_db_ctx; +MDBX_val l_key, l_data, l_last_data, l_last_key; +MDBX_cursor *l_cursor = NULL; +struct __record_suffix__ *l_suff; +uint64_t l_id; +dap_store_obj_t *l_obj; - if (!a_group) + if (!a_group) /* Sanity check */ return NULL; - if (!(l_db_ctx = s_get_db_ctx_by_group(a_group)) ) + if ( !(l_db_ctx = s_get_db_ctx_by_group(a_group)) ) /* Get free DB Context */ return NULL; -#if 0 - void *l_iter = cdb_iterate_new(l_cdb, 0); - obj_arg l_arg; - l_arg.o = DAP_NEW_Z(dap_store_obj_t); - l_arg.q = l_cdb_stat.rnum; - cdb_iterate(l_cdb, dap_mdbx_get_last_obj_iter_callback, (void*)&l_arg, l_iter); - cdb_iterate_destroy(l_cdb, l_iter); - l_arg.o->group = dap_strdup(a_group); + if ( MDBX_SUCCESS != (l_rc = mdbx_txn_begin(s_mdbx_env, NULL, MDBX_TXN_RDONLY, &l_db_ctx->txn)) ) + return log_it (L_ERROR, "mdbx_txn_begin: (%d) %s", l_rc, mdbx_strerror(l_rc)), NULL; + do { + l_cursor = NULL; + l_id = 0; + l_last_key = l_last_data = (MDBX_val) {0, 0}; - return l_arg.o; -#endif + if ( MDBX_SUCCESS != (l_rc = mdbx_cursor_open(l_db_ctx->txn, l_db_ctx->dbi, &l_cursor)) ) { + log_it (L_ERROR, "mdbx_cursor_open: (%d) %s", l_rc, mdbx_strerror(l_rc)); + break; + } + + /* Iterate cursor to retieve records from DB - slect a <key> and <data> pair + ** with maximal <id> + */ + while ( MDBX_SUCCESS == (l_rc = mdbx_cursor_get(l_cursor, &l_key, &l_data, MDBX_NEXT)) ) + { + l_suff = (struct __record_suffix__ *) (l_data.iov_base + l_data.iov_len - sizeof(struct __record_suffix__)); + if ( l_id < l_suff->id ) + { + l_id = l_suff->id; + l_last_key = l_key; + l_last_data = l_data; + } + } + + } while (0); + + if (l_cursor) + mdbx_cursor_close(l_cursor); + if (l_db_ctx->txn) + mdbx_txn_commit(l_db_ctx->txn); + + + if ( !(l_last_key.iov_len | l_data.iov_len) ) /* Not found anything - return NULL */ + return NULL; + + + /* Found ! Allocate memory to <store object> < and <value> */ + if ( (l_obj = DAP_CALLOC(1, sizeof( dap_store_obj_t ))) ) + { + if ( (l_obj->value = DAP_MALLOC((l_data.iov_len + 1) - sizeof(struct __record_suffix__))) ) + { + /* Fill the <store obj> by data from the retreived record */ + l_obj->value = ((uint8_t *) l_obj) + sizeof( dap_store_obj_t ); + + l_obj->value_len = l_data.iov_len - sizeof(struct __record_suffix__); + memcpy(l_obj->value, l_data.iov_base, l_obj->value_len); + l_suff = (struct __record_suffix__ *) (l_data.iov_base + l_obj->value_len); + l_obj->id = l_suff->id; + l_obj->timestamp = l_suff->ts; + l_obj->flags = l_suff->flags; + assert ( !(l_obj->group = dap_strdup(a_group)) ); + } + else l_rc = MDBX_PROBLEM, log_it (L_ERROR, "Cannot allocate a memory for store object value, errno=%d", errno); + } + else l_rc = MDBX_PROBLEM, log_it (L_ERROR, "Cannot allocate a memory for store object, errno=%d", errno); + + return l_obj; } /** @@ -477,26 +396,30 @@ dap_db_ctx_t *l_db_ctx; * @param a_key the key * @return true or false */ -int s_is_obj(const char *a_group, const char *a_key) +int s_is_obj(const char *a_group, const char *a_key) { -bool l_ret = false; -dap_db_ctx_t l_db_ctx; -#if 0 -CDB *l_cdb = l_db_ctx->cdb; +int l_rc, l_rc2; +dap_db_ctx_t *l_db_ctx; +MDBX_val l_key, l_data; + + if (!a_group) /* Sanity check */ + return 0; - if(!a_group) - return false; + if ( !(l_db_ctx = s_get_db_ctx_by_group(a_group)) ) /* Get free DB Context */ + return 0; - if ( !(l_db_ctx = s_get_db_ctx_by_group(a_group)) ) - return false; + if ( MDBX_SUCCESS != (l_rc = mdbx_txn_begin(s_mdbx_env, NULL, MDBX_TXN_RDONLY, &l_db_ctx->txn)) ) + return log_it (L_ERROR, "mdbx_txn_begin: (%d) %s", l_rc, mdbx_strerror(l_rc)), 0; - if(a_key) { - //int l_vsize; - if( !cdb_is(l_cdb, a_key, (int) dap_strlen(a_key)) ) - l_ret = true; - } -#endif - return l_ret; + l_key.iov_base = (void *) a_key; /* Fill IOV for MDBX key */ + l_key.iov_len = strlen(a_key); + + l_rc = mdbx_get(l_db_ctx->txn, l_db_ctx->dbi, &l_key, &l_data); + + if ( MDBX_SUCCESS != (l_rc2 = mdbx_txn_commit(l_db_ctx->txn)) ) + log_it (L_ERROR, "mdbx_txn_commit: (%d) %s", l_rc2, mdbx_strerror(l_rc2)); + + return ( l_rc == MDBX_SUCCESS ); /*0 - RNF, 1 - SUCCESS */ } @@ -510,53 +433,81 @@ CDB *l_cdb = l_db_ctx->cdb; */ static dap_store_obj_t *s_read_cond_store_obj(const char *a_group, uint64_t a_id, size_t *a_count_out) { - if (!a_group) { +int l_rc; +dap_db_ctx_t *l_db_ctx; +MDBX_val l_key, l_data; +MDBX_cursor *l_cursor = NULL; +struct __record_suffix__ *l_suff; +dap_store_obj_t *l_obj; + + if (!a_group) /* Sanity check */ return NULL; - } - dap_db_ctx_t *l_db_ctx = s_get_db_ctx_by_group(a_group); - if (!l_db_ctx) { + + if ( !(l_db_ctx = s_get_db_ctx_by_group(a_group)) ) /* Get free DB Context */ return NULL; - } -#if 0 - CDB *l_cdb = l_db_ctx->cdb; - uint64_t l_count_out = 0; - if(a_count_out) { - l_count_out = *a_count_out; - } - CDBSTAT l_cdb_stat; - cdb_stat(l_cdb, &l_cdb_stat); + if ( MDBX_SUCCESS != (l_rc = mdbx_txn_begin(s_mdbx_env, NULL, MDBX_TXN_RDONLY, &l_db_ctx->txn)) ) + return log_it (L_ERROR, "mdbx_txn_begin: (%d) %s", l_rc, mdbx_strerror(l_rc)), NULL; - if (l_count_out == 0 || l_count_out > l_cdb_stat.rnum) { - l_count_out = l_cdb_stat.rnum; - } - obj_arg l_arg; - l_arg.o = DAP_NEW_Z_SIZE(dap_store_obj_t, l_count_out * sizeof(dap_store_obj_t)); - l_arg.n = l_count_out; - l_arg.q = l_count_out; - l_arg.id = a_id; - void *l_iter = cdb_iterate_new(l_cdb, 0); - /*l_count_out = */cdb_iterate(l_cdb, dap_mdbx_get_cond_obj_iter_callback, (void*)&l_arg, l_iter); - cdb_iterate_destroy(l_cdb, l_iter); - if (l_arg.q > 0) { - l_count_out = l_arg.n - l_arg.q; - void *tmp = DAP_REALLOC(l_arg.o, l_count_out * sizeof(dap_store_obj_t)); - if (!tmp && l_count_out) { - log_it(L_CRITICAL, "Couldn't re-allocate memory for portion of store objects!"); - DAP_DELETE(l_arg.o); - return NULL; + + do { + l_cursor = NULL; + *a_count_out = 0; + + if ( MDBX_SUCCESS != (l_rc = mdbx_cursor_open(l_db_ctx->txn, l_db_ctx->dbi, &l_cursor)) ) { + log_it (L_ERROR, "mdbx_cursor_open: (%d) %s", l_rc, mdbx_strerror(l_rc)); + break; } - l_arg.o = tmp; - } - if(a_count_out) { - *a_count_out = l_count_out; - } - for (uint64_t i = 0; i < l_count_out; ++i) { - l_arg.o[i].group = dap_strdup(a_group); + + /* Iterate cursor to retieve records from DB - slect a <key> and <data> pair + ** with maximal <id> + */ + l_key.iov_base = ""; + l_key.iov_len = 0; + + while ( MDBX_SUCCESS == (l_rc = mdbx_cursor_get(l_cursor, &l_key, &l_data, MDBX_NEXT)) ) + { + l_suff = (struct __record_suffix__ *) (l_data.iov_base + l_data.iov_len - sizeof(struct __record_suffix__)); + if ( a_id < l_suff->id ) { + *a_count_out = 1; + break; + } + } + + } while (0); + + if (l_cursor) + mdbx_cursor_close(l_cursor); + if (l_db_ctx->txn) + mdbx_txn_commit(l_db_ctx->txn); + + if ( l_rc != MDBX_SUCCESS ) + return NULL; + + /* Found ! Allocate memory to <store object> < and <value> */ + if ( (l_obj = DAP_CALLOC(1, sizeof( dap_store_obj_t ))) ) + { + if ( (l_obj->value = DAP_MALLOC((l_data.iov_len + 1) - sizeof(struct __record_suffix__))) ) + { + /* Fill the <store obj> by data from the retreived record */ + l_obj->value = ((uint8_t *) l_obj) + sizeof( dap_store_obj_t ); + + l_obj->value_len = l_data.iov_len - sizeof(struct __record_suffix__); + memcpy(l_obj->value, l_data.iov_base, l_obj->value_len); + + l_suff = (struct __record_suffix__ *) (l_data.iov_base + l_obj->value_len); + l_obj->id = l_suff->id; + l_obj->timestamp = l_suff->ts; + l_obj->flags = l_suff->flags; + assert ( !(l_obj->group = dap_strdup(a_group)) ); + } + else l_rc = MDBX_PROBLEM, log_it (L_ERROR, "Cannot allocate a memory for store object value, errno=%d", errno); } - return l_arg.o; -#endif + else l_rc = MDBX_PROBLEM, log_it (L_ERROR, "Cannot allocate a memory for store object, errno=%d", errno); + + + return l_obj; } @@ -568,27 +519,52 @@ static dap_store_obj_t *s_read_cond_store_obj(const char *a_group, uint64_t a_i */ size_t s_read_count_store(const char *a_group, uint64_t a_id) { - if (!a_group) { +int l_rc, l_count_out; +dap_db_ctx_t *l_db_ctx; +MDBX_val l_key, l_data; +MDBX_cursor *l_cursor = NULL; +struct __record_suffix__ *l_suff; + + if (!a_group) /* Sanity check */ return 0; - } -#if 0 - dap_db_ctx_t l_db_ctx = s_get_db_by_group(a_group); - if (!l_db_ctx) { + + if ( !(l_db_ctx = s_get_db_ctx_by_group(a_group)) ) /* Get free DB Context */ return 0; - } - CDB *l_cdb = l_db_ctx->cdb; - CDBSTAT l_cdb_stat; - cdb_stat(l_cdb, &l_cdb_stat); - obj_arg l_arg; - l_arg.q = l_cdb_stat.rnum; - l_arg.id = a_id; - void *l_iter = cdb_iterate_new(l_cdb, 0); - cdb_iterate(l_cdb, dap_mdbx_get_count_iter_callback, (void*)&l_arg, l_iter); - cdb_iterate_destroy(l_cdb, l_iter); - return l_cdb_stat.rnum - l_arg.q; -#endif + + + if ( MDBX_SUCCESS != (l_rc = mdbx_txn_begin(s_mdbx_env, NULL, MDBX_TXN_RDONLY, &l_db_ctx->txn)) ) + return log_it (L_ERROR, "mdbx_txn_begin: (%d) %s", l_rc, mdbx_strerror(l_rc)), 0; + + do { + l_cursor = NULL; + l_count_out = 0; + + if ( MDBX_SUCCESS != (l_rc = mdbx_cursor_open(l_db_ctx->txn, l_db_ctx->dbi, &l_cursor)) ) { + log_it (L_ERROR, "mdbx_cursor_open: (%d) %s", l_rc, mdbx_strerror(l_rc)); + break; + } + + /* Iterate cursor to retieve records from DB */ + while ( MDBX_SUCCESS == (l_rc = mdbx_cursor_get(l_cursor, &l_key, &l_data, MDBX_NEXT)) ) + { + l_suff = (struct __record_suffix__ *) (l_data.iov_base + l_data.iov_len - sizeof(struct __record_suffix__)); + l_count_out += (a_id == l_suff->id ); + } + + } while (0); + + if (l_cursor) + mdbx_cursor_close(l_cursor); + if (l_db_ctx->txn) + mdbx_txn_commit(l_db_ctx->txn); + + if ( l_count_out ) + log_it(L_WARNING, "A count of records with id: %zu - more then 1 (%d)", a_id, l_count_out); + + return l_count_out; } + static dap_list_t *s_get_groups_by_mask(const char *a_group_mask) { dap_list_t *l_ret_list; @@ -597,28 +573,19 @@ dap_db_ctx_t *l_db_ctx, *l_db_ctx2; if(!a_group_mask) return NULL; - assert ( !pthread_rwlock_rdlock(&s_db_rwlock) ); + assert ( !pthread_rwlock_rdlock(&s_db_ctxs_rwlock) ); HASH_ITER(hh, s_db_ctxs, l_db_ctx, l_db_ctx2) { if (!dap_fnmatch(a_group_mask, l_db_ctx->name, 0) ) /* Name match a pattern/mask ? */ l_ret_list = dap_list_prepend(l_ret_list, dap_strdup(l_db_ctx->name)); /* Add group name to output list */ } - assert ( !pthread_rwlock_rdlock(&s_db_rwlock) ); + assert ( !pthread_rwlock_rdlock(&s_db_ctxs_rwlock) ); return l_ret_list; } -/* - * Follows suffix structure is supposed to be added at end of MDBX record, so : - * <value> + <suffix> - */ -struct __record_suffix__ { - uint64_t id; - uint64_t flags; - uint64_t ts; -}; static int s_apply_store_obj (dap_store_obj_t *a_store_obj) { @@ -651,6 +618,8 @@ struct __record_suffix__ *l_suff; * so we are can performs a main work */ + assert ( !pthread_mutex_lock(&l_db_ctx->dbi_mutex) ); + if (a_store_obj->type == DAP_DB$K_OPTYPE_ADD ) { if( !a_store_obj->key ) @@ -682,17 +651,22 @@ struct __record_suffix__ *l_suff; /* So, finaly: start transaction, do INSERT, COMMIT or ABORT ... */ if ( MDBX_SUCCESS != (l_rc = mdbx_txn_begin(s_mdbx_env, NULL, 0, &l_db_ctx->txn)) ) + { + assert ( !pthread_mutex_unlock(&l_db_ctx->dbi_mutex) ); return DAP_FREE(l_val), log_it (L_ERROR, "mdbx_txn_begin: (%d) %s", l_rc, mdbx_strerror(l_rc)), -EIO; + } /* Generate <sequence number> for new record */ - if ( MDBX_RESULT_TRUE != mdbx_dbi_sequence (l_db_ctx->txn, l_db_ctx->dbi, &l_suff->id, 1) ) + if ( MDBX_SUCCESS != mdbx_dbi_sequence (l_db_ctx->txn, l_db_ctx->dbi, &l_suff->id, 1) ) { log_it (L_CRITICAL, "mdbx_dbi_sequence: (%d) %s", l_rc, mdbx_strerror(l_rc)); if ( MDBX_SUCCESS != (l_rc = mdbx_txn_abort(l_db_ctx->txn)) ) log_it (L_ERROR, "mdbx_txn_abort: (%d) %s", l_rc, mdbx_strerror(l_rc)); + assert ( !pthread_mutex_unlock(&l_db_ctx->dbi_mutex) ); + return DAP_FREE(l_val), -EIO; } @@ -702,7 +676,7 @@ struct __record_suffix__ *l_suff; { log_it (L_ERROR, "mdbx_put: (%d) %s", l_rc, mdbx_strerror(l_rc)); - if ( MDBX_SUCCESS != (l_rc2 = mdbx_txn_abort(l_db_ctx->txn)) ) + if ( MDBX_SUCCESS != (l_rc2 = mdbx_txn_commit(l_db_ctx->txn)) ) log_it (L_ERROR, "mdbx_abort: (%d) %s", l_rc2, mdbx_strerror(l_rc2)); } else if ( MDBX_SUCCESS != (l_rc = mdbx_txn_commit(l_db_ctx->txn)) ) @@ -711,6 +685,8 @@ struct __record_suffix__ *l_suff; if ( l_rc != MDBX_SUCCESS ) DAP_FREE(l_val); + assert ( !pthread_mutex_unlock(&l_db_ctx->dbi_mutex) ); + return ( l_rc == MDBX_SUCCESS ) ? 0 : -EIO; } /* DAP_DB$K_OPTYPE_ADD */ @@ -718,7 +694,11 @@ struct __record_suffix__ *l_suff; if (a_store_obj->type == DAP_DB$K_OPTYPE_DEL) { if ( MDBX_SUCCESS != (l_rc = mdbx_txn_begin(s_mdbx_env, NULL, 0, &l_db_ctx->txn)) ) + { + assert ( !pthread_mutex_unlock(&l_db_ctx->dbi_mutex) ); + return log_it (L_ERROR, "mdbx_txn_begin: (%d) %s", l_rc, mdbx_strerror(l_rc)), -ENOENT; + } if ( a_store_obj->key ) { /* Delete record */ @@ -734,15 +714,20 @@ struct __record_suffix__ *l_suff; } if ( l_rc != MDBX_SUCCESS ) { /* Check result of mdbx_drop/del */ - if ( MDBX_SUCCESS != (l_rc = mdbx_txn_abort(l_db_ctx->txn)) ) - l_rc2 = -EIO, log_it (L_ERROR, "mdbx_txn_abort: (%d) %s", l_rc, mdbx_strerror(l_rc)); + if ( MDBX_SUCCESS != (l_rc = mdbx_txn_commit(l_db_ctx->txn)) ) + l_rc2 = -EIO, log_it (L_ERROR, "mdbx_txn_commit: (%d) %s", l_rc, mdbx_strerror(l_rc)); } else if ( MDBX_SUCCESS != (l_rc = mdbx_txn_commit(l_db_ctx->txn)) ) - l_rc2 = -EIO, log_it (L_ERROR, "mdbx_txn_abort: (%d) %s", l_rc, mdbx_strerror(l_rc)); + l_rc2 = -EIO, log_it (L_ERROR, "mdbx_txn_commit: (%d) %s", l_rc, mdbx_strerror(l_rc)); + + assert ( !pthread_mutex_unlock(&l_db_ctx->dbi_mutex) ); return ( l_rc2 == MDBX_SUCCESS ) ? 0 : -EIO; } /* DAP_DB$K_OPTYPE_DEL */ + + assert ( !pthread_mutex_unlock(&l_db_ctx->dbi_mutex) ); + log_it (L_ERROR, "Unhandle/unknow DB opcode (%d/%#x)", a_store_obj->type, a_store_obj->type); return -EIO; @@ -776,13 +761,17 @@ struct __record_suffix__ *l_suff; if (!a_group) /* Sanity check */ return NULL; - if ( (l_db_ctx = s_get_db_ctx_by_group(a_group)) ) /* Get free DB Context */ + if ( !(l_db_ctx = s_get_db_ctx_by_group(a_group)) ) /* Get free DB Context */ return NULL; + assert ( !pthread_mutex_lock(&l_db_ctx->dbi_mutex) ); if ( MDBX_SUCCESS != (l_rc = mdbx_txn_begin(s_mdbx_env, NULL, MDBX_TXN_RDONLY, &l_db_ctx->txn)) ) + { + assert ( !pthread_mutex_unlock(&l_db_ctx->dbi_mutex) ); return log_it (L_ERROR, "mdbx_txn_begin: (%d) %s", l_rc, mdbx_strerror(l_rc)), NULL; + } @@ -794,13 +783,14 @@ struct __record_suffix__ *l_suff; { l_key.iov_base = (void *) a_key; /* Fill IOV for MDBX key */ l_key.iov_len = strlen(a_key); + l_obj = NULL; if ( MDBX_SUCCESS == (l_rc = mdbx_get(l_db_ctx->txn, l_db_ctx->dbi, &l_key, &l_data)) ) { /* Found ! Allocate memory to <store object> < and <value> */ if ( (l_obj = DAP_CALLOC(1, sizeof( dap_store_obj_t ))) ) { - if ( (l_obj->value = DAP_MALLOC((l_data.iov_len + 1) - sizeof(struct __record_suffix__))) ) + if ( (l_obj->value = DAP_CALLOC(1, (l_data.iov_len + 1) - sizeof(struct __record_suffix__))) ) { /* Fill the <store obj> by data from the retreived record */ l_obj->value = ((uint8_t *) l_obj) + sizeof( dap_store_obj_t ); @@ -812,6 +802,7 @@ struct __record_suffix__ *l_suff; l_obj->id = l_suff->id; l_obj->timestamp = l_suff->ts; l_obj->flags = l_suff->flags; + assert ( (l_obj->group = dap_strdup(a_group)) ); } else l_rc = MDBX_PROBLEM, log_it (L_ERROR, "Cannot allocate a memory for store object value, errno=%d", errno); } @@ -819,14 +810,17 @@ struct __record_suffix__ *l_suff; } else if ( l_rc != MDBX_NOTFOUND) log_it (L_ERROR, "mdbx_get: (%d) %s", l_rc, mdbx_strerror(l_rc)); - if ( MDBX_SUCCESS != (l_rc2 = mdbx_txn_abort(l_db_ctx->txn)) ) - log_it (L_ERROR, "mdbx_txn_abort: (%d) %s", l_rc2, mdbx_strerror(l_rc2)); + if ( MDBX_SUCCESS != (l_rc2 = mdbx_txn_commit(l_db_ctx->txn)) ) + log_it (L_ERROR, "mdbx_txn_commit: (%d) %s", l_rc2, mdbx_strerror(l_rc2)); - if ( l_rc != MDBX_SUCCESS ) { + if ( (l_rc != MDBX_SUCCESS) && l_obj ) { DAP_FREE(l_obj->value); DAP_DEL_Z(l_obj); } + + assert ( !pthread_mutex_unlock(&l_db_ctx->dbi_mutex) ); + return ( l_rc == MDBX_SUCCESS ) ? l_obj : NULL; } @@ -840,6 +834,8 @@ struct __record_suffix__ *l_suff; do { l_count_out = a_count_out? *a_count_out : DAP_DB$K_MAXOBJS; /* Limit a number of objects to be returned */ *a_count_out = 0; + l_cursor = NULL; + l_obj = NULL; /* * Retrive statistic for group/table, we need to compute a number of records can be retreived @@ -859,9 +855,8 @@ struct __record_suffix__ *l_suff; * Allocate memory for array of returned objects */ l_rc2 = l_count_out * sizeof(dap_store_obj_t); - if ( !(l_obj_arr = DAP_NEW_Z_SIZE(char, l_rc2)) ) { - log_it(L_ERROR, "Cannot allocate %zu octets for %d store objects", - l_count_out * sizeof(dap_store_obj_t), l_count_out); + if ( !(l_obj_arr = (dap_store_obj_t *) DAP_NEW_Z_SIZE(char, l_rc2)) ) { + log_it(L_ERROR, "Cannot allocate %zu octets for %d store objects", l_count_out * sizeof(dap_store_obj_t), l_count_out); break; } @@ -875,7 +870,7 @@ struct __record_suffix__ *l_suff; l_obj = l_obj_arr; for (int i = l_count_out; i && (l_rc = mdbx_cursor_get(l_cursor, &l_key, &l_data, MDBX_NEXT)); i--, l_obj++) { - if ( (l_obj->value = DAP_MALLOC((l_data.iov_len + 1) - sizeof(struct __record_suffix__))) ) + if ( (l_obj->value = DAP_CALLOC(1, (l_data.iov_len + 1) - sizeof(struct __record_suffix__))) ) { /* Fill the <store obj> by data from the retreived record */ l_obj->value = ((uint8_t *) l_obj) + sizeof( dap_store_obj_t ); @@ -887,6 +882,7 @@ struct __record_suffix__ *l_suff; l_obj->id = l_suff->id; l_obj->timestamp = l_suff->ts; l_obj->flags = l_suff->flags; + assert ( (l_obj->group = dap_strdup(a_group)) ); } else l_rc = MDBX_PROBLEM, log_it (L_ERROR, "Cannot allocate a memory for store object value, errno=%d", errno); } @@ -898,10 +894,16 @@ struct __record_suffix__ *l_suff; } } while (0); + if ( MDBX_SUCCESS != (l_rc2 = mdbx_txn_commit(l_db_ctx->txn)) ) + log_it (L_ERROR, "mdbx_txn_commit: (%d) %s", l_rc2, mdbx_strerror(l_rc2)); + if (l_cursor) mdbx_cursor_close(l_cursor); if (l_db_ctx->txn) - mdbx_txn_abort(l_db_ctx->txn); + mdbx_txn_commit(l_db_ctx->txn); + + + assert ( !pthread_mutex_unlock(&l_db_ctx->dbi_mutex) ); return l_obj_arr; } diff --git a/modules/global-db/include/dap_chain_global_db_driver.h b/modules/global-db/include/dap_chain_global_db_driver.h index aa7e4a53db49e44d627de9db638c5bea3be9d3ee..03baf208dc34a5f085c8884d6d5fd8aaf8d8cabb 100644 --- a/modules/global-db/include/dap_chain_global_db_driver.h +++ b/modules/global-db/include/dap_chain_global_db_driver.h @@ -106,9 +106,6 @@ typedef struct dap_db_driver_callbacks { dap_db_driver_callback_t deinit; dap_db_driver_callback_t flush; - - void *db_ctx; /* A context is created and maintained internaly by the - partucular DB driver - don't touch until u understand how it works at all */ } dap_db_driver_callbacks_t; diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index 667555634c3bc377284f5ea970a80913937675b1..8adc7cf4dc9dd7294ee77465e01734b018a9b9a6 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -1376,7 +1376,7 @@ static bool s_net_states_proc(dap_proc_thread_t *a_thread, void *a_arg) { l_node_list_cur = dap_list_next(l_node_list_cur); } dap_chain_node_info_list_free(l_node_list); - + } else { log_it(L_ATT, "Not use bootstrap addresses, fill seed nodelist from root aliases"); pthread_rwlock_unlock(&l_net_pvt->rwlock); @@ -1665,9 +1665,9 @@ void s_set_reply_text_node_status(char **a_str_reply, dap_chain_net_t * a_net){ /** * @brief get type of chain - * - * @param l_chain - * @return char* + * + * @param l_chain + * @return char* */ const char* dap_chain_net_get_type(dap_chain_t *l_chain) { @@ -1689,14 +1689,14 @@ void s_chain_net_ledger_cache_reload(dap_chain_net_t *l_net) { dap_chain_ledger_purge(l_net->pub.ledger, false); dap_chain_t *l_chain = NULL; - DL_FOREACH(l_net->pub.chains, l_chain) + DL_FOREACH(l_net->pub.chains, l_chain) { - if (l_chain->callback_purge) + if (l_chain->callback_purge) l_chain->callback_purge(l_chain); - if (!strcmp(DAP_CHAIN_PVT(l_chain)->cs_name, "none")) + if (!strcmp(DAP_CHAIN_PVT(l_chain)->cs_name, "none")) dap_chain_gdb_ledger_load((char *)dap_chain_gdb_get_group(l_chain), l_chain); - else + else dap_chain_load_all(l_chain); } bool l_processed; @@ -2562,7 +2562,7 @@ int s_net_load(const char * a_net_name, uint16_t a_acl_idx) } char * l_chains_path = dap_strdup_printf("%s/network/%s", dap_config_path(), l_net->pub.name); DIR * l_chains_dir = opendir(l_chains_path); - DAP_DELETE (l_chains_path); + DAP_DEL_Z(l_chains_path); if ( l_chains_dir ){ // for sequential loading chains dap_list_t *l_prior_list = NULL; @@ -2624,7 +2624,7 @@ int s_net_load(const char * a_net_name, uint16_t a_acl_idx) l_chain->id.uint64, l_chain->name,l_chain02->name); log_it(L_ERROR, "Please, fix your configs and restart node"); return -2; - } + } if (!dap_strcmp(l_chain->name, l_chain02->name)) { log_it(L_ERROR, "Your network %s has chains with duplicate names %s: chain01 id = 0x%"DAP_UINT64_FORMAT_U", chain02 id = 0x%"DAP_UINT64_FORMAT_U"",l_chain->net_name, @@ -2632,9 +2632,9 @@ int s_net_load(const char * a_net_name, uint16_t a_acl_idx) log_it(L_ERROR, "Please, fix your configs and restart node"); return -2; } - } + } } - } + } bool l_processed; do { @@ -2678,7 +2678,7 @@ int s_net_load(const char * a_net_name, uint16_t a_acl_idx) if (l_chain ) l_chain->is_datum_pool_proc = true; l_net_pvt->only_static_links = true; - l_target_state = NET_STATE_ONLINE; + l_target_state = NET_STATE_ONLINE; log_it(L_INFO,"Root node role established"); } break; case NODE_ROLE_CELL_MASTER: