diff --git a/modules/global-db/dap_chain_global_db.c b/modules/global-db/dap_chain_global_db.c index d425ad77c57a515c1d5251ce0fdbe13b9aa16eec..a87f43fed0b743b11f79747cc2547f2b42d851c3 100644 --- a/modules/global-db/dap_chain_global_db.c +++ b/modules/global-db/dap_chain_global_db.c @@ -157,8 +157,10 @@ void dap_chain_global_db_obj_clean(dap_global_db_obj_t *obj) { if(!obj) return; + DAP_DELETE(obj->key); DAP_DELETE(obj->value); + obj->key = NULL; obj->value = NULL; } @@ -180,12 +182,18 @@ void dap_chain_global_db_obj_delete(dap_global_db_obj_t *obj) * @param a_count a number of objects in the array * @return (none) */ -void dap_chain_global_db_objs_delete(dap_global_db_obj_t *objs, size_t a_count) +void dap_chain_global_db_objs_delete(dap_global_db_obj_t *a_objs, size_t a_count) { - for(size_t i = 0; i < a_count; i++) { - dap_chain_global_db_obj_clean(objs + i); - } - DAP_DELETE(objs); +dap_global_db_obj_t *l_objs; +size_t i; + + if ( !a_objs || !a_count ) /* Sanity checks */ + return; + + for(l_objs = a_objs, i = a_count; i--; l_objs++) /* Run over array's elements */ + dap_chain_global_db_obj_clean(a_objs); + + DAP_DELETE(a_objs); /* Finaly kill the the array */ } /** @@ -517,10 +525,11 @@ static sync_group_item_t *find_item_by_mask(sync_group_item_t *a_items, const ch * @param a_store_data a pointer to an object * @return (none) */ -void dap_global_db_obj_track_history(void* a_store_data) +void dap_global_db_obj_track_history(dap_store_obj_t *a_store_data) { - dap_store_obj_t *l_obj = (dap_store_obj_t *)a_store_data; + dap_store_obj_t *l_obj = a_store_data; sync_group_item_t *l_sync_group_item = find_item_by_mask(s_sync_group_items, l_obj->group); + if (l_sync_group_item) { if(l_sync_group_item->callback_notify) { l_sync_group_item->callback_notify(l_sync_group_item->callback_arg, diff --git a/modules/global-db/dap_chain_global_db_driver.c b/modules/global-db/dap_chain_global_db_driver.c index 74ddac6d9a0b9bda0e7b9c3c5053a83decb65cd8..be3492451e96d9d16c5845bea1475b45b71f9fd8 100644 --- a/modules/global-db/dap_chain_global_db_driver.c +++ b/modules/global-db/dap_chain_global_db_driver.c @@ -20,6 +20,10 @@ You should have received a copy of the GNU General Public License along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. + * + * MODIFICATION HISTORY: + * + * 24-FEB-2022 RRL Added Async I/O functionality for DB request processing */ #include <stddef.h> @@ -28,12 +32,15 @@ #include <string.h> #include <pthread.h> #include <assert.h> +#include <unistd.h> #include "dap_common.h" #include "dap_file_utils.h" #include "dap_strfuncs.h" -#include "dap_list.h" #include "dap_hash.h" +#include "dap_worker.h" +#include "dap_proc_queue.h" +#include "dap_events.h" #include "dap_chain_global_db_driver_sqlite.h" #include "dap_chain_global_db_driver_cdb.h" @@ -44,10 +51,83 @@ #define LOG_TAG "db_driver" // A selected database driver. -static char *s_used_driver = NULL; +static char s_used_driver [32]; /* Name of the driver */ + + +static dap_db_driver_callbacks_t s_drv_callback; /* A set of interface routines for the selected + DB Driver at startup time */ + +static int s_db_drvmode_async = 0; /* Set a kind of processing requests to DB: + <> 0 - Async mode should be used */ + + + +/* + * Nano API for Simple linked list - by BadAss SysMan + * Attention!!! No internaly locking is performed ! + */ +typedef struct __dap_slist_elm__ { + struct __dap_slist_elm__ *flink; /* Forward link */ + void *data; /* Pointer to carried data area */ + size_t datasz; /* A data portion size */ +} DAP_SLIST_ELM; + +typedef struct __dap_slist__ { + DAP_SLIST_ELM *head, /* An address of first element */ + *tail; /* An address of last element */ + int nr; /* A number of elements in list */ +} DAP_SLIST; + + +static pthread_mutex_t s_db_reqs_list_lock = PTHREAD_MUTEX_INITIALIZER; /* Lock to coordinate access to the <s_db_reqs_queue> */ +static DAP_SLIST s_db_reqs_list = {0}; /* A queue of request to DB - maintained in + the Async mode only */ + +static inline int s_dap_insqtail ( DAP_SLIST *q, dap_store_obj_t *data, int datasz) +{ +DAP_SLIST_ELM *elm; + + if ( !(elm = DAP_MALLOC(sizeof(DAP_SLIST_ELM))) ) /* Allocate memory for new element */ + return -ENOMEM; + + elm->flink = NULL; /* This element is terminal */ + elm->data = data; /* Store pointer to carried data */ + elm->datasz= datasz; /* A size of daa metric */ + + if ( q->tail ) /* Queue is not empty ? */ + (q->tail)->flink = elm; /* Correct forward link of "previous last" element + to point to new element */ + + q->tail = elm; /* Pointe list's tail to new element also */ + + if ( !q->head ) /* This is a first element in the list ? */ + q->head = elm; /* point head to the new element */ + + q->nr++; /* Adjust entries counter */ + log_it(L_DEBUG, "Put data: %p, size: %d (qlen: %d)", data, datasz, q->nr); + return 0; +} + +static inline int s_dap_remqhead ( DAP_SLIST *q, dap_store_obj_t **data, size_t *datasz) +{ +DAP_SLIST_ELM *elm; + + if ( !(elm = q->head) ) /* Queue is empty - just return error code */ + return -ENOENT; + + if ( !(q->head = elm->flink) ) /* Last element in the queue ? */ + q->tail = NULL; /* Reset tail to NULL */ + + *data = elm->data; + *datasz = elm->datasz; + + DAP_FREE(elm); /* Release memory has been allocated for the queue's element */ + + q->nr--; /* Adjust entries counter */ + log_it(L_DEBUG, "Get data: %p, size: %d (qlen: %d)", *data, *datasz, q->nr); + return 0; +} -static dap_db_driver_callbacks_t s_drv_callback; -static bool s_db_drvmode_async = false; /** * @brief Initializes a database driver. @@ -61,16 +141,22 @@ static bool s_db_drvmode_async = false; int dap_db_driver_init(const char *a_driver_name, const char *a_filename_db, bool db_drvmode_async) { - int l_ret = -1; - if(s_used_driver) +int l_ret = -1; + + if (s_used_driver[0] ) dap_db_driver_deinit(); // Fill callbacks with zeros memset(&s_drv_callback, 0, sizeof(dap_db_driver_callbacks_t)); - s_db_drvmode_async = db_drvmode_async; + + if ( (s_db_drvmode_async = db_drvmode_async) ) /* Set a kind of processing requests to DB: <> 0 - Async mode should be used */ + { + s_db_reqs_list.head = s_db_reqs_list.tail = NULL; + s_db_reqs_list.nr = 0; + } // Setup driver name - s_used_driver = dap_strdup(a_driver_name); + strncpy( s_used_driver, a_driver_name, sizeof(s_used_driver) - 1); dap_mkdir_with_parents(a_filename_db); @@ -82,19 +168,21 @@ int dap_db_driver_init(const char *a_driver_name, const char *a_filename_db, if(!dap_strcmp(s_used_driver, "ldb")) l_ret = -1; else if(!dap_strcmp(s_used_driver, "sqlite") || !dap_strcmp(s_used_driver, "sqlite3") ) - l_ret = dap_db_driver_sqlite_init(l_db_path_ext, &s_drv_callback, db_drvmode_async); + l_ret = dap_db_driver_sqlite_init(l_db_path_ext, &s_drv_callback, db_drvmode_async); else if(!dap_strcmp(s_used_driver, "cdb")) - l_ret = dap_db_driver_cdb_init(l_db_path_ext, &s_drv_callback, db_drvmode_async); + l_ret = dap_db_driver_cdb_init(l_db_path_ext, &s_drv_callback, db_drvmode_async); + #ifdef DAP_CHAIN_GDB_ENGINE_MDBX else if(!dap_strcmp(s_used_driver, "mdbx")) - l_ret = dap_db_driver_mdbx_init(l_db_path_ext, &s_drv_callback, db_drvmode_async); + l_ret = dap_db_driver_mdbx_init(l_db_path_ext, &s_drv_callback, db_drvmode_async); #endif #ifdef DAP_CHAIN_GDB_ENGINE_PGSQL else if(!dap_strcmp(s_used_driver, "pgsql")) - l_ret = dap_db_driver_pgsql_init(l_db_path_ext, &s_drv_callback, db_drvmode_async); + l_ret = dap_db_driver_pgsql_init(l_db_path_ext, &s_drv_callback, db_drvmode_async); #endif else log_it(L_ERROR, "Unknown global_db driver \"%s\"", a_driver_name); + return l_ret; } @@ -105,13 +193,24 @@ int dap_db_driver_init(const char *a_driver_name, const char *a_filename_db, */ void dap_db_driver_deinit(void) { + log_it(L_NOTICE, "DeInit for %s ...", s_used_driver); + + if ( s_db_drvmode_async ) /* Let's finishing outstanding DB request ... */ + { + for ( int i = 7; i-- && s_db_reqs_list.nr; ) + { + log_it(L_WARNING, "Let's finished outstanding DB requests (%d) ... ", s_db_reqs_list.nr); + for ( int j = 3; (j = sleep(j)); ); /* Hibernate for 3 seconds ... */ + } + + log_it(L_INFO, "Number of outstanding DB requests: %d", s_db_reqs_list.nr); + } + // deinit driver if(s_drv_callback.deinit) s_drv_callback.deinit(); - if(s_used_driver){ - DAP_DELETE(s_used_driver); - s_used_driver = NULL; - } + + s_used_driver [ 0 ] = '\0'; } /** @@ -120,7 +219,7 @@ void dap_db_driver_deinit(void) */ int dap_db_driver_flush(void) { - return s_drv_callback.flush(); + return s_db_drvmode_async ? 0 : s_drv_callback.flush(); } /** @@ -131,17 +230,26 @@ int dap_db_driver_flush(void) */ dap_store_obj_t* dap_store_obj_copy(dap_store_obj_t *a_store_obj, size_t a_store_count) { +dap_store_obj_t *l_store_obj, *l_store_obj_dst, *l_store_obj_src; + if(!a_store_obj || !a_store_count) return NULL; - dap_store_obj_t *l_store_obj = DAP_NEW_SIZE(dap_store_obj_t, sizeof(dap_store_obj_t) * a_store_count); - for(size_t i = 0; i < a_store_count; i++) { - dap_store_obj_t *l_store_obj_dst = l_store_obj + i; - dap_store_obj_t *l_store_obj_src = a_store_obj + i; - memcpy(l_store_obj_dst, l_store_obj_src, sizeof(dap_store_obj_t)); + + if ( !(l_store_obj = DAP_NEW_SIZE(dap_store_obj_t, sizeof(dap_store_obj_t) * a_store_count)) ) + return NULL; + + l_store_obj_dst = l_store_obj; + l_store_obj_src = a_store_obj; + + for( int i = a_store_count; i--; l_store_obj_dst++, l_store_obj_src++) + { + *l_store_obj_dst = *l_store_obj_src; + l_store_obj_dst->group = dap_strdup(l_store_obj_src->group); l_store_obj_dst->key = dap_strdup(l_store_obj_src->key); l_store_obj_dst->value = DAP_DUP_SIZE(l_store_obj_src->value, l_store_obj_src->value_len); } + return l_store_obj; } @@ -153,14 +261,19 @@ dap_store_obj_t* dap_store_obj_copy(dap_store_obj_t *a_store_obj, size_t a_store */ void dap_store_obj_free(dap_store_obj_t *a_store_obj, size_t a_store_count) { +dap_store_obj_t *l_store_obj_cur; + if(!a_store_obj) return; - for(size_t i = 0; i < a_store_count; i++) { - dap_store_obj_t *l_store_obj_cur = a_store_obj + i; + + l_store_obj_cur = a_store_obj; + + for (int i = a_store_count; i--; l_store_obj_cur++ ) { DAP_DELETE((char *)l_store_obj_cur->group); DAP_DELETE((char *)l_store_obj_cur->key); DAP_DELETE((char *)l_store_obj_cur->value); } + DAP_DELETE(a_store_obj); } @@ -188,13 +301,14 @@ char* dap_chain_global_db_driver_hash(const uint8_t *data, size_t data_size) return a_str; } + /** * @brief Applies objects to database. * @param a_store an pointer to the objects * @param a_store_count a number of objectss * @return Returns 0, if successful. */ -int dap_chain_global_db_driver_apply(pdap_store_obj_t a_store_obj, size_t a_store_count) +static inline int s_dap_chain_global_db_driver_apply_do(dap_store_obj_t *a_store_obj, size_t a_store_count) { int l_ret; dap_store_obj_t *l_store_obj_cur; @@ -202,29 +316,126 @@ dap_store_obj_t *l_store_obj_cur; if(!a_store_obj || !a_store_count) return -1; - // apply to database - if(a_store_count > 1 && s_drv_callback.transaction_start) - s_drv_callback.transaction_start(); + log_it(L_DEBUG, "[%p] Process DB Request ...", a_store_obj); - l_store_obj_cur = a_store_obj; /* We have to use a power of the address's incremental arithmetic */ - l_ret = 0; /* Preset return code to OK */ + l_store_obj_cur = a_store_obj; /* We have to use a power of the address's incremental arithmetic */ + l_ret = 0; /* Preset return code to OK */ + + if (a_store_count > 1 && s_drv_callback.transaction_start) + s_drv_callback.transaction_start(); if(s_drv_callback.apply_store_obj) for(int i = a_store_count; (!l_ret) && i--; l_store_obj_cur++) { assert(l_store_obj_cur); /* Sanity check */ if ( 1 == (l_ret = s_drv_callback.apply_store_obj(l_store_obj_cur)) ) - log_it(L_INFO, "Item is missing (may be already deleted) %s/%s\n", l_store_obj_cur->group, l_store_obj_cur->key); + log_it(L_INFO, "[%p] Item is missing (may be already deleted) %s/%s", a_store_obj, l_store_obj_cur->group, l_store_obj_cur->key); else if (l_ret < 0) - log_it(L_ERROR, "Can't write item %s/%s (code %d)\n", l_store_obj_cur->group, l_store_obj_cur->key, l_ret); + log_it(L_ERROR, "[%p] Can't write item %s/%s (code %d)", a_store_obj, l_store_obj_cur->group, l_store_obj_cur->key, l_ret); } if(a_store_count > 1 && s_drv_callback.transaction_end) s_drv_callback.transaction_end(); + log_it(L_DEBUG, "[%p] Finished DB Request (code %d)", a_store_obj, l_ret); return l_ret; } + +static bool s_dap_driver_req_exec_complete (struct dap_proc_thread *a_dap_thd __attribute__((unused)), + void *arg) +{ + log_it(L_DEBUG, "[%p] Process callback on completion DB request ...", arg); + + return 1; /* 1 - Don't call it again */ +} + +static bool s_dap_driver_req_exec (struct dap_proc_thread *a_dap_thd __attribute__((unused)), + void *arg __attribute__((unused)) ) +{ +int l_ret; +dap_store_obj_t *l_store_obj_cur; +dap_worker_t *l_dap_worker; +size_t l_store_obj_cnt; + + log_it(L_DEBUG, "Entering, %d entries in the queue ...", s_db_reqs_list.nr); + + assert ( !pthread_mutex_lock(&s_db_reqs_list_lock) ); /* Get exclusive access to the request list */ + + if ( !s_db_reqs_list.nr ) /* Nothing to do ?! Just exit */ + { + assert ( !pthread_mutex_unlock(&s_db_reqs_list_lock) ); + return 1; /* 1 - Don't call it again */ + } + + if ( (l_ret = s_dap_remqhead (&s_db_reqs_list, &l_store_obj_cur, &l_store_obj_cnt)) ) + { + assert ( !pthread_mutex_unlock(&s_db_reqs_list_lock) ); + log_it(L_ERROR, "DB Request list is in incosistence state (code %d)", l_ret); + return 1; /* 1 - Don't call it again */ + } + + /* So at this point we are ready to do work in the DB */ + s_dap_chain_global_db_driver_apply_do(l_store_obj_cur, l_store_obj_cnt); + + assert ( !pthread_mutex_unlock(&s_db_reqs_list_lock) ); + + /* Enqueue "Exec Complete" callback routine */ + l_dap_worker = dap_events_worker_get_auto (); + if ( (l_ret = dap_proc_queue_add_callback(l_dap_worker, s_dap_driver_req_exec_complete, l_store_obj_cur)) ) + log_it(L_ERROR, "Enqueue completion callback for item %s/%s (code %d)", l_store_obj_cur->group, l_store_obj_cur->key, l_ret); + + return 1; /* 1 - Don't call it again */ +} + + +/** + * @brief Applies objects to database. + * @param a_store an pointer to the objects + * @param a_store_count a number of objectss + * @return Returns 0, if successful. + */ +int dap_chain_global_db_driver_apply(dap_store_obj_t *a_store_obj, size_t a_store_count) +{ +int l_ret; +dap_store_obj_t *l_store_obj_cur; +dap_worker_t *l_dap_worker; + + if(!a_store_obj || !a_store_count) + return -1; + + if ( !s_db_drvmode_async ) + s_dap_chain_global_db_driver_apply_do(a_store_obj, a_store_count); + + /* Async mode - put request into the list for deffered processing */ + l_ret = -ENOMEM; /* Preset return code to non-OK */ + + assert ( !pthread_mutex_lock(&s_db_reqs_list_lock) ); /* Get exclusive access to the request list */ + + if ( !(l_store_obj_cur = dap_store_obj_copy(a_store_obj, a_store_count)) ) + l_ret = - ENOMEM, log_it(L_ERROR, "[%p] No memory for DB Request for item %s/%s", a_store_obj, a_store_obj->group, a_store_obj->key); + else if ( (l_ret = s_dap_insqtail (&s_db_reqs_list, l_store_obj_cur, a_store_count)) ) + log_it(L_ERROR, "[%p] Can't enqueue DB request for item %s/%s (code %d)", a_store_obj, a_store_obj->group, a_store_obj->key, l_ret); + + assert ( !pthread_mutex_unlock(&s_db_reqs_list_lock) ); + + if ( !l_ret ) + { /* So finaly enqueue an execution routine */ + if ( !(l_dap_worker = dap_events_worker_get_auto ()) ) + l_ret = -EBUSY, log_it(L_ERROR, "[%p] Error process DB request for %s/%s, dap_events_worker_get_auto()->NULL", a_store_obj, l_store_obj_cur->group, l_store_obj_cur->key); + else l_ret = dap_proc_queue_add_callback(l_dap_worker, s_dap_driver_req_exec, NULL); + } + + log_it(L_DEBUG, "[%p DB Request has been enqueued (code %d)", l_store_obj_cur, l_ret); + + return l_ret; +} + + + + + + /** * @brief Adds objects to a database. * @param a_store_obj objects to be added diff --git a/modules/global-db/dap_chain_global_db_hist.c b/modules/global-db/dap_chain_global_db_hist.c index d6e93f0569627dd4cc3630d3ad5f73b8fab4628d..83f33885d3a0e407b7dd3579a2e149e2f2091756 100644 --- a/modules/global-db/dap_chain_global_db_hist.c +++ b/modules/global-db/dap_chain_global_db_hist.c @@ -37,7 +37,7 @@ static uint16_t s_size_white_list = 0; /** * @brief Packs members of a_rec structure into a single string. - * + * * @param a_rec a pointer to the structure * @return Returns the string. */ @@ -50,10 +50,10 @@ static char* dap_db_history_pack_hist(dap_global_db_hist_t *a_rec) /** * @brief Unpacks a single string into a structure. - * + * * @param l_str_in the string * @param a_rec_out the structure - * @return Returns 1 if successful, otherwise -1. + * @return Returns 1 if successful, otherwise -1. */ static int dap_db_history_unpack_hist(char *l_str_in, dap_global_db_hist_t *a_rec_out) { @@ -71,7 +71,7 @@ static int dap_db_history_unpack_hist(char *l_str_in, dap_global_db_hist_t *a_re /** * @brief Gets a current time with a suffix. - * + * * @return Returns a string containing a current time and a suffix. */ static char* dap_db_new_history_timestamp() @@ -101,18 +101,20 @@ static char* dap_db_new_history_timestamp() /** * @brief Adds data to the history log. - * + * * @param a_type a type of record * @param a_store_obj a pointer to the object structure * @param a_dap_store_count a number of objects * @param a_group a group name string * @return Returns true if successful, otherwise false. */ -bool dap_db_history_add(char a_type, pdap_store_obj_t a_store_obj, size_t a_dap_store_count, const char *a_group) +bool dap_db_history_add(uint32_t a_type, dap_store_obj_t *a_store_obj, size_t a_dap_store_count, const char *a_group) { if(!a_store_obj || a_dap_store_count <= 0) return false; + dap_global_db_hist_t l_rec; + l_rec.keys_count = a_dap_store_count; l_rec.type = a_type; // group name should be always the same @@ -154,7 +156,7 @@ bool dap_db_history_add(char a_type, pdap_store_obj_t a_store_obj, size_t a_dap_ /** * @brief Gets last id of the log. - * + * * @param a_group_name a group name string * @return Returns id if succeessful. */ @@ -171,7 +173,7 @@ uint64_t dap_db_log_get_group_last_id(const char *a_group_name) /** * @brief Gets last id of local.history group. - * + * * @return Returns id if succeess. */ uint64_t dap_db_log_get_last_id(void) @@ -181,7 +183,7 @@ uint64_t dap_db_log_get_last_id(void) /** * @brief A function for a thread for reading a log list - * + * * @param arg a pointer to the log list structure * @return Returns NULL. */ @@ -250,7 +252,7 @@ static void *s_list_thread_proc(void *arg) /** * @brief Starts a thread that readding a log list * @note instead dap_db_log_get_list() - * + * * @param a_addr a pointer to the structure * @param a_flags flags * @return Returns a pointer to the log list structure if successful, otherwise NULL pointer. @@ -276,23 +278,23 @@ dap_db_log_list_t* dap_db_log_list_start(dap_chain_node_addr_t a_addr, int a_fla static uint16_t s_size_ban_list = 0; static char **s_ban_list = NULL; - static uint16_t s_size_white_list = 0; - static char **s_white_list = NULL; + static uint16_t s_size_white_list = 0; + static char **s_white_list = NULL; static bool l_try_read_ban_list = false; - static bool l_try_read_white_list = false; + static bool l_try_read_white_list = false; if (!l_try_read_ban_list) { s_ban_list = dap_config_get_array_str(g_config, "stream_ch_chain", "ban_list_sync_groups", &s_size_ban_list); l_try_read_ban_list = true; } - if (!l_try_read_white_list) { - s_white_list = dap_config_get_array_str(g_config, "stream_ch_chain", "white_list_sync_groups", &s_size_white_list); - l_try_read_white_list = true; - } + if (!l_try_read_white_list) { + s_white_list = dap_config_get_array_str(g_config, "stream_ch_chain", "white_list_sync_groups", &s_size_white_list); + l_try_read_white_list = true; + } - /* delete if not condition */ - if (s_size_white_list > 0) { + /* delete if not condition */ + if (s_size_white_list > 0) { for (dap_list_t *l_groups = l_dap_db_log_list->groups; l_groups; ) { bool l_found = false; for (int i = 0; i < s_size_white_list; i++) { @@ -305,10 +307,10 @@ dap_db_log_list_t* dap_db_log_list_start(dap_chain_node_addr_t a_addr, int a_fla dap_list_t *l_tmp = l_groups->next; l_dap_db_log_list->groups = dap_list_delete_link(l_dap_db_log_list->groups, l_groups); l_groups = l_tmp; - } + } l_groups = dap_list_next(l_groups); } - } else if (s_size_ban_list > 0) { + } else if (s_size_ban_list > 0) { for (dap_list_t *l_groups = l_dap_db_log_list->groups; l_groups; ) { bool l_found = false; for (int i = 0; i < s_size_ban_list; i++) { @@ -349,7 +351,7 @@ dap_db_log_list_t* dap_db_log_list_start(dap_chain_node_addr_t a_addr, int a_fla /** * @brief Gets a number of objects from a log list. - * + * * @param a_db_log_list a pointer to the log list structure * @return Returns the number if successful, otherwise 0. */ @@ -366,7 +368,7 @@ size_t dap_db_log_list_get_count(dap_db_log_list_t *a_db_log_list) /** * @brief Gets a number of rest objects from a log list. - * + * * @param a_db_log_list a pointer to the log list structure * @return Returns the number if successful, otherwise 0. */ @@ -383,7 +385,7 @@ size_t dap_db_log_list_get_count_rest(dap_db_log_list_t *a_db_log_list) /** * @brief Gets an object from a list. - * + * * @param a_db_log_list a pointer to the log list * @return Returns a pointer to the object. */ @@ -406,7 +408,7 @@ dap_db_log_list_obj_t *dap_db_log_list_get(dap_db_log_list_t *a_db_log_list) /** * @brief Deallocates memory of a list item - * + * * @param a_item a pointer to the list item * @returns (none) */ @@ -419,7 +421,7 @@ void dap_db_log_list_delete_item(void *a_item) /** * @brief Deallocates memory of a log list. - * + * * @param a_db_log_list a pointer to the log list structure * @returns (none) */ diff --git a/modules/global-db/dap_chain_global_db_remote.c b/modules/global-db/dap_chain_global_db_remote.c index a0bb5fbdef6f52de7b2982f7479a065c5138a1bb..f5b149e9503e44bb3d655bd88ea4195c583b0428 100644 --- a/modules/global-db/dap_chain_global_db_remote.c +++ b/modules/global-db/dap_chain_global_db_remote.c @@ -59,8 +59,7 @@ bool dap_db_set_cur_node_addr(uint64_t a_address, char *a_net_name ) */ bool dap_db_set_cur_node_addr_exp(uint64_t a_address, char *a_net_name ) { - time_t l_cur_time = time(NULL); - return dap_db_set_cur_node_addr_common(a_address,a_net_name,l_cur_time); + return dap_db_set_cur_node_addr_common(a_address,a_net_name, time(NULL)); } /** @@ -73,7 +72,7 @@ uint64_t dap_db_get_cur_node_addr(char *a_net_name) { char l_key[DAP_DB_K_MAXKEYLEN], l_key_time[DAP_DB_K_MAXKEYLEN]; uint8_t *l_node_addr_data, *l_node_time_data; - size_t l_node_addr_len = 0, l_node_time_len = 0; +size_t l_node_addr_len = 0, l_node_time_len = 0; uint64_t l_node_addr_ret = 0; time_t l_node_time = 0; @@ -86,10 +85,10 @@ time_t l_node_time = 0; l_node_addr_data = dap_chain_global_db_gr_get(l_key, &l_node_addr_len, GROUP_LOCAL_GENERAL); l_node_time_data = dap_chain_global_db_gr_get(l_key_time, &l_node_time_len, GROUP_LOCAL_GENERAL); - if(l_node_addr_data && l_node_addr_len == sizeof(uint64_t)) + if(l_node_addr_data && (l_node_addr_len == sizeof(uint64_t)) ) l_node_addr_ret = *( (uint64_t *) l_node_addr_data ); - if(l_node_time_data && l_node_time_len == sizeof(time_t)) + if(l_node_time_data && (l_node_time_len == sizeof(time_t)) ) l_node_time = *( (time_t *) l_node_time_data ); DAP_DELETE(l_node_addr_data); @@ -252,38 +251,41 @@ void dap_store_packet_change_id(dap_store_obj_pkt_t *a_pkt, uint64_t a_id) * @param a_store_obj a pointer to the object to be serialized * @return Returns a pointer to the packed sructure if successful, otherwise NULL. */ -dap_store_obj_pkt_t *dap_store_packet_single(pdap_store_obj_t a_store_obj) +dap_store_obj_pkt_t *dap_store_packet_single(dap_store_obj_t *a_store_obj) { +int len; +unsigned char *pdata; + if (!a_store_obj) return NULL; uint32_t l_data_size_out = dap_db_get_size_pdap_store_obj_t(a_store_obj); dap_store_obj_pkt_t *l_pkt = DAP_NEW_SIZE(dap_store_obj_pkt_t, l_data_size_out + sizeof(dap_store_obj_pkt_t)); + + /* Fill packet header */ l_pkt->data_size = l_data_size_out; l_pkt->obj_count = 1; l_pkt->timestamp = 0; - uint32_t l_type = a_store_obj->type; - memcpy(l_pkt->data, &l_type, sizeof(uint32_t)); - uint64_t l_offset = sizeof(uint32_t); - uint16_t l_group_size = (uint16_t) dap_strlen(a_store_obj->group); - memcpy(l_pkt->data + l_offset, &l_group_size, sizeof(uint16_t)); - l_offset += sizeof(uint16_t); - memcpy(l_pkt->data + l_offset, a_store_obj->group, l_group_size); - l_offset += l_group_size; - memcpy(l_pkt->data + l_offset, &a_store_obj->id, sizeof(uint64_t)); - l_offset += sizeof(uint64_t); - memcpy(l_pkt->data + l_offset, &a_store_obj->timestamp, sizeof(uint64_t)); - l_offset += sizeof(uint64_t); - uint16_t l_key_size = (uint16_t) dap_strlen(a_store_obj->key); - memcpy(l_pkt->data + l_offset, &l_key_size, sizeof(uint16_t)); - l_offset += sizeof(uint16_t); - memcpy(l_pkt->data + l_offset, a_store_obj->key, l_key_size); - l_offset += l_key_size; - memcpy(l_pkt->data + l_offset, &a_store_obj->value_len, sizeof(uint64_t)); - l_offset += sizeof(uint64_t); - memcpy(l_pkt->data + l_offset, a_store_obj->value, a_store_obj->value_len); - l_offset += a_store_obj->value_len; - assert(l_offset == l_data_size_out); + + /* Put serialized data into the payload part of the packet */ + pdata = l_pkt->data; + *( (uint32_t *) pdata) = a_store_obj->type; pdata += sizeof(uint32_t); + + len = dap_strlen(a_store_obj->group); + *( (uint16_t *) pdata) = (uint16_t) len; pdata += sizeof(uint16_t); + memcpy(pdata, a_store_obj->group, len); pdata += len; + + *( (uint64_t *) pdata) = a_store_obj->id; pdata += sizeof(uint64_t); + *( (uint64_t *) pdata) = a_store_obj->timestamp; pdata += sizeof(uint64_t); + + len = dap_strlen(a_store_obj->key); + *( (uint16_t *) pdata) = (uint16_t) len; pdata += sizeof(uint16_t); + memcpy(pdata, a_store_obj->key, len); pdata += len; + + *( (uint64_t *) pdata) = a_store_obj->value_len; pdata += sizeof(uint64_t); + memcpy(pdata, a_store_obj->value, a_store_obj->value_len); pdata += a_store_obj->value_len; + + assert( (pdata - l_pkt->data) == l_data_size_out); return l_pkt; } @@ -347,8 +349,11 @@ dap_store_obj_t *dap_store_unpacket_multiple(const dap_store_obj_pkt_t *pkt, siz memcpy((char *)obj->value, pkt->data + offset, obj->value_len); offset += obj->value_len; } + assert(pkt->data_size == offset); + if(store_obj_count) *store_obj_count = count; + return store_obj; } diff --git a/modules/global-db/include/dap_chain_global_db.h b/modules/global-db/include/dap_chain_global_db.h index 4bdbffbd0d337e4efb4d55baa981a50989008e61..dd9eb11e0357b7b58b5535899ff79a7dd7c5da6b 100644 --- a/modules/global-db/include/dap_chain_global_db.h +++ b/modules/global-db/include/dap_chain_global_db.h @@ -48,7 +48,7 @@ void dap_chain_global_db_obj_delete(dap_global_db_obj_t *obj); /** * Delete mass of struct dap_global_db_obj_t */ -void dap_chain_global_db_objs_delete(dap_global_db_obj_t *objs, size_t a_count); +void dap_chain_global_db_objs_delete(dap_global_db_obj_t *a_objs, size_t a_count); int dap_chain_global_db_init(dap_config_t * a_config); @@ -61,7 +61,7 @@ void dap_chain_global_db_add_sync_group(const char *a_group_prefix, dap_global_d void dap_chain_global_db_add_sync_extra_group(const char *a_group_mask, dap_global_db_obj_callback_notify_t a_callback, void *a_arg); dap_list_t *dap_chain_db_get_sync_groups(); dap_list_t *dap_chain_db_get_sync_extra_groups(); -void dap_global_db_obj_track_history(void* a_store_data); +void dap_global_db_obj_track_history(dap_store_obj_t *a_store_data); /** * Get entry from base */ diff --git a/modules/global-db/include/dap_chain_global_db_driver.h b/modules/global-db/include/dap_chain_global_db_driver.h index ce523785ad25e9b9b94f5ceae34eb340d9cf6e9b..ab986ced3fbc764815cf52a6c0840518f2cd8af2 100644 --- a/modules/global-db/include/dap_chain_global_db_driver.h +++ b/modules/global-db/include/dap_chain_global_db_driver.h @@ -45,7 +45,9 @@ typedef struct dap_store_obj { const char *c_key; const uint8_t *value; uint64_t value_len; -}DAP_ALIGN_PACKED dap_store_obj_t, *pdap_store_obj_t; + + +} DAP_ALIGN_PACKED dap_store_obj_t, *pdap_store_obj_t; typedef struct dap_store_obj_pkt { uint64_t timestamp; @@ -87,7 +89,7 @@ int dap_db_driver_flush(void); char* dap_chain_global_db_driver_hash(const uint8_t *data, size_t data_size); -int dap_chain_global_db_driver_apply(pdap_store_obj_t a_store_obj, size_t a_store_count); +int dap_chain_global_db_driver_apply(dap_store_obj_t *a_store_obj, size_t a_store_count); int dap_chain_global_db_driver_add(pdap_store_obj_t a_store_obj, size_t a_store_count); int dap_chain_global_db_driver_delete(pdap_store_obj_t a_store_obj, size_t a_store_count); dap_store_obj_t* dap_chain_global_db_driver_read_last(const char *a_group); diff --git a/modules/global-db/include/dap_chain_global_db_hist.h b/modules/global-db/include/dap_chain_global_db_hist.h index a74fe304f15609d822c33b0bdca6aff353f94339..58e61888e072882061ec97d549e5ed2c1886b205 100644 --- a/modules/global-db/include/dap_chain_global_db_hist.h +++ b/modules/global-db/include/dap_chain_global_db_hist.h @@ -19,7 +19,7 @@ typedef struct dap_global_db_hist { } dap_global_db_hist_t; //Add data to the history log -bool dap_db_history_add(char a_type, pdap_store_obj_t a_store_obj, size_t a_dap_store_count, const char *a_group); +bool dap_db_history_add(uint32_t a_type, dap_store_obj_t *a_store_obj, size_t a_dap_store_count, const char *a_group); // for dap_db_log_list_xxx() diff --git a/modules/global-db/include/dap_chain_global_db_remote.h b/modules/global-db/include/dap_chain_global_db_remote.h index 7e63ac02b733e85c6f07e49433e403065908a176..a54ee5f9e751c84da258fb1213c17df3fb24a9b3 100644 --- a/modules/global-db/include/dap_chain_global_db_remote.h +++ b/modules/global-db/include/dap_chain_global_db_remote.h @@ -19,7 +19,7 @@ bool dap_db_set_last_hash_remote(uint64_t a_node_addr, dap_chain_t *a_chain, dap // Get last hash for chain for remote node dap_chain_hash_fast_t *dap_db_get_last_hash_remote(uint64_t a_node_addr, dap_chain_t *a_chain); -dap_store_obj_pkt_t *dap_store_packet_single(pdap_store_obj_t a_store_obj); +dap_store_obj_pkt_t *dap_store_packet_single(dap_store_obj_t *a_store_obj); dap_store_obj_pkt_t *dap_store_packet_multiple(dap_store_obj_pkt_t *a_old_pkt, dap_store_obj_pkt_t *a_new_pkt); dap_store_obj_t *dap_store_unpacket_multiple(const dap_store_obj_pkt_t *a_pkt, size_t *a_store_obj_count); char *dap_store_packet_get_group(dap_store_obj_pkt_t *a_pkt);