From 16fc98466c00a88899698428c3be32652cbad076 Mon Sep 17 00:00:00 2001 From: "Ruslan (The BadAss SysMan) Laishev" <ruslan.laishev@demlabs.net> Date: Thu, 19 May 2022 18:56:18 +0300 Subject: [PATCH] [*] Async GDB : maintenance commit --- modules/global-db/dap_chain_global_db.c | 194 +++++++++++++----------- 1 file changed, 103 insertions(+), 91 deletions(-) diff --git a/modules/global-db/dap_chain_global_db.c b/modules/global-db/dap_chain_global_db.c index b14d214b8d..04269f5861 100644 --- a/modules/global-db/dap_chain_global_db.c +++ b/modules/global-db/dap_chain_global_db.c @@ -59,7 +59,7 @@ int s_db_drvmode_async , /* S static dap_proc_thread_t *s_global_db_proc_thread; /* Dedicated thread to process GDB Asyn requests */ - +dap_proc_queue_t *s_global_db_proc_queue; int s_db_add_sync_group(dap_list_t **a_grp_list, dap_sync_group_item_t *a_item) @@ -239,72 +239,7 @@ static int s_check_db_version(dap_config_t *g_config) return res; } - -void s_dap_chain_global_db_request_processor(void) -{ - -} - -/** - * @brief Initializes a database by g_config structure. - * @note You should call this function before calling any other functions in this library. - * @param g_config a pointer to the configuration structure - * @return Returns 0 if successful; otherwise, <0. - */ -int dap_chain_global_db_init(dap_config_t * g_config) -{ -int l_rc; -static int is_check_version = 0; -dap_proc_queue_t *l_queue; - - - const char *l_storage_path = dap_config_get_item_str(g_config, "resources", "dap_global_db_path"); - const char *l_driver_name = dap_config_get_item_str_default(g_config, "resources", "dap_global_db_driver", "sqlite"); - - s_track_history = dap_config_get_item_bool_default(g_config, "resources", "dap_global_db_track_history", s_track_history); - - s_db_drvmode_async = dap_config_get_item_bool(g_config, "resources", "dap_global_db_drvmode_async"); - log_it(L_NOTICE,"DB Driver Async mode: %s", s_db_drvmode_async ? "ON": "OFF"); - - s_dap_global_db_debug_more = dap_config_get_item_bool(g_config, "resources", "debug_more"); - - - if( (l_rc = dap_db_driver_init(l_driver_name, l_storage_path, s_db_drvmode_async)) ) - return log_it(L_CRITICAL, "Hadn't initialized DB driver \"%s\" on path \"%s\", code: %d", l_driver_name, l_storage_path, l_rc), l_rc; - - if(!is_check_version){ - - if ( (l_rc = s_check_db_version(g_config)) ) - return log_it(L_ERROR, "GlobalDB version changed, please export or remove old version!"), l_rc; - - is_check_version = true; - } - - log_it(L_NOTICE, "GlobalDB initialized"); - - /* - * Create a dedicated thread to process request to GDB - */ - if ( !(s_global_db_proc_thread = dap_proc_thread_run_custom()) ) - { - log_it(L_ERROR, "Error create dedicated thread for GDB request processing"); - log_it(L_ERROR, "Async DB processing is switched OFF"); - return -EIO; - } - - if ( !(l_queue = dap_proc_queue_create_ext (s_global_db_proc_thread)) ) - { - log_it(L_ERROR, "Error create queue for GDB request processing"); - log_it(L_ERROR, "Async DB processing is switched OFF"); - return -EIO; - } - - l_queue->esocket = dap_events_socket_create_type_queue_ptr_unsafe(NULL, s_dap_chain_global_db_request_processor); - - return l_rc; -} - -static void s_clear_sync_grp(void *a_elm) +static inline void s_clear_sync_grp(void *a_elm) { dap_sync_group_item_t *l_item = (dap_sync_group_item_t *)a_elm; DAP_DELETE(l_item->group_mask); @@ -331,9 +266,7 @@ void dap_chain_global_db_deinit(void) */ int dap_chain_global_db_flush(void) { - int res = dap_db_driver_flush(); - - return res; + return dap_db_driver_flush(); } /** @@ -517,11 +450,11 @@ int l_res = 0; */ uint64_t global_db_gr_del_get_timestamp(const char *a_group, const char *a_key) { - uint64_t l_timestamp = 0; - dap_store_obj_t store_data = { 0 }; - char l_group[DAP_DB$SZ_MAXGROUPNAME]; - size_t l_count_out = 0; - dap_store_obj_t *l_obj; +uint64_t l_timestamp = 0; +dap_store_obj_t store_data = { 0 }; +char l_group[DAP_DB$SZ_MAXGROUPNAME]; +size_t l_count_out = 0; +dap_store_obj_t *l_obj; if(!a_key) return l_timestamp; @@ -593,15 +526,15 @@ dap_store_obj_t* dap_chain_global_db_cond_load(const char *a_group, uint64_t a_f * @param a_records_count_out[out] a number of data * @return If successful, a pointer to data; otherwise NULL. */ -dap_global_db_obj_t* dap_chain_global_db_gr_load(const char *a_group, size_t *a_records_count_out) +dap_global_db_obj_t *dap_chain_global_db_gr_load(const char *a_group, size_t *a_records_count_out) { size_t l_count = 0; dap_store_obj_t *l_store_obj = dap_chain_global_db_driver_read(a_group, NULL, &l_count); + if(!l_store_obj) return NULL; - dap_global_db_obj_t *l_data = DAP_NEW_Z_SIZE(dap_global_db_obj_t, - l_count * sizeof(dap_global_db_obj_t)); + dap_global_db_obj_t *l_data = DAP_NEW_Z_SIZE(dap_global_db_obj_t, l_count * sizeof(dap_global_db_obj_t)); if (!l_data) { dap_store_obj_free(l_store_obj, l_count); return NULL; @@ -618,9 +551,12 @@ dap_global_db_obj_t* dap_chain_global_db_gr_load(const char *a_group, size_t *a_ }; l_valid++; } + dap_store_obj_free(l_store_obj, l_count); + if (a_records_count_out) *a_records_count_out = l_valid; + return l_data; } @@ -642,6 +578,7 @@ void dap_global_db_change_notify(dap_store_obj_t *a_store_data) { dap_store_obj_t *l_obj = a_store_data; dap_list_t *it = s_sync_group_items; + while (it) { for (; it; it = it->next) { dap_sync_group_item_t *l_sync_group_item = (dap_sync_group_item_t *)it->data; @@ -711,14 +648,12 @@ bool dap_chain_global_db_gr_set_ext(const char *a_key, const void *a_value, size */ bool dap_chain_global_db_gr_set(const char *a_key, const void *a_value, size_t a_value_len, const char *a_group) { - uint8_t l_flags = RECORD_COMMON; - return dap_chain_global_db_gr_set_ext(a_key, a_value, a_value_len, a_group, l_flags); + return dap_chain_global_db_gr_set_ext(a_key, a_value, a_value_len, a_group, RECORD_COMMON); } bool dap_chain_global_db_gr_pinned_set(const char *a_key, const void *a_value, size_t a_value_len, const char *a_group) { - uint8_t l_flags = RECORD_PINNED; - return dap_chain_global_db_gr_set_ext(a_key, a_value, a_value_len, a_group, l_flags); + return dap_chain_global_db_gr_set_ext(a_key, a_value, a_value_len, a_group, RECORD_PINNED); } /** @@ -745,10 +680,8 @@ bool dap_chain_global_db_gr_del(const char *a_key, const char *a_group) store_data.key = a_key; store_data.group = (char*)a_group; - int l_res = dap_chain_global_db_driver_delete(&store_data, 1); - if (a_key) { if (l_res >= 0) { // add to Del group @@ -760,6 +693,7 @@ bool dap_chain_global_db_gr_del(const char *a_key, const char *a_group) dap_global_db_change_notify(&store_data); } } + return !l_res; } @@ -777,10 +711,8 @@ dap_store_obj_t *l_store_obj; if(!a_objs_count) return true; - int l_res = dap_chain_global_db_driver_apply(a_store_data, a_objs_count); - l_store_obj = (dap_store_obj_t *)a_store_data; int l_res_del = 0; @@ -854,7 +786,7 @@ char* dap_chain_global_db_hash(const uint8_t *data, size_t data_size) -int dap_global_db_req_enqueue ( +int dap_global_db_req_enqueue ( int a_req_type, const char *a_group, const char *a_key, @@ -864,8 +796,8 @@ int dap_global_db_req_enqueue ( void *a_cb_arg ) { -dap_grobal_db_req_t *l_db_req; int l_rc; +dap_grobal_db_req_t *l_db_req; dap_proc_queue_t *l_que; dap_worker_t *l_wrk; dap_proc_thread_t *l_proc_thd; @@ -887,10 +819,20 @@ dap_events_socket_t *l_es; l_wrk = l_que->esocket->worker; l_proc_thd = l_que->esocket->proc_thread; + if ( !l_wrk && !l_proc_thd ) + return log_it(L_ERROR, "Both <worker> or <proc_thread> contexts are NULL"), -EINVAL; + + if ( l_wrk && l_proc_thd ) + return log_it(L_ERROR, "Both <worker> or <proc_thread> contexts are NOT NULL"), -EINVAL; + + /* So, at this point we should decide a what <event socket> context will be used */ + l_es = l_wrk ? l_wrk->esockets : l_proc_thd->esockets[0]; + if ( !(l_db_req = DAP_NEW_Z(dap_grobal_db_req_t)) ) /* Allocate memory for new DB Request context */ return log_it(L_ERROR, "Cannot allocate memory for DB Request, errno=%d", errno), -errno; - l_db_req->req = a_req_type; /* Fill by inputs data ... */ + /* Fill by data DB Request context - is supposed to be used at executor end */ + l_db_req->req = a_req_type; l_db_req->cb_rtn = a_cb_rtn; l_db_req->cb_arg = a_cb_arg; l_db_req->es = l_es; @@ -900,7 +842,13 @@ dap_events_socket_t *l_es; l_db_req->value = (void *) a_data; l_db_req->value_len = a_data_size; - if ( (l_rc = dap_events_socket_queue_ptr_send_to_input(l_es, l_db_req)) )/* Enqueue DB Request to processor */ + /* + dap_events_socket_queue_ptr_send_to_input( l_worker->queue_callback_gdb_input, ...) + Либо + dap_events_socket_queue_ptr_send_to_input( l_thread->queue_callback_gdb_input, ...) + */ + + if ( (l_rc = dap_events_socket_queue_ptr_send_to_input(l_es->callbacks., l_db_req)) )/* Enqueue DB Request to processor */ { DAP_DELETE(l_db_req); log_it(L_ERROR, "Wasn't send pointer to queue: code %d", l_rc); @@ -908,6 +856,70 @@ dap_events_socket_t *l_es; a_req_type, a_group, a_key, a_data, a_data_size); } - return l_rc; } + + + + +static void s_dap_chain_global_db_request_processor(dap_events_socket_t *a_es, void *a_arg) +{ + +} + +/** + * @brief Initializes a database by g_config structure. + * @note You should call this function before calling any other functions in this library. + * @param g_config a pointer to the configuration structure + * @return Returns 0 if successful; otherwise, <0. + */ +int dap_chain_global_db_init(dap_config_t * g_config) +{ +int l_rc; +static int is_check_version = 0; + + const char *l_storage_path = dap_config_get_item_str(g_config, "resources", "dap_global_db_path"); + const char *l_driver_name = dap_config_get_item_str_default(g_config, "resources", "dap_global_db_driver", "sqlite"); + + s_track_history = dap_config_get_item_bool_default(g_config, "resources", "dap_global_db_track_history", s_track_history); + + s_db_drvmode_async = dap_config_get_item_bool(g_config, "resources", "dap_global_db_drvmode_async"); + log_it(L_NOTICE,"DB Driver Async mode: %s", s_db_drvmode_async ? "ON": "OFF"); + + s_dap_global_db_debug_more = dap_config_get_item_bool(g_config, "resources", "debug_more"); + + + if( (l_rc = dap_db_driver_init(l_driver_name, l_storage_path, s_db_drvmode_async)) ) + return log_it(L_CRITICAL, "Hadn't initialized DB driver \"%s\" on path \"%s\", code: %d", l_driver_name, l_storage_path, l_rc), l_rc; + + if(!is_check_version){ + + if ( (l_rc = s_check_db_version(g_config)) ) + return log_it(L_ERROR, "GlobalDB version changed, please export or remove old version!"), l_rc; + + is_check_version = true; + } + + log_it(L_NOTICE, "GlobalDB initialized"); + + /* + * Create a dedicated thread to process request to GDB + */ + if ( !(s_global_db_proc_thread = dap_proc_thread_run_custom()) ) + { + log_it(L_ERROR, "Error create dedicated thread for GDB request processing"); + log_it(L_ERROR, "Async DB processing is switched OFF"); + return -EIO; + } + + if ( !(s_global_db_proc_queue = dap_proc_queue_create_ext (s_global_db_proc_thread)) ) + { + log_it(L_ERROR, "Error create queue for GDB request processing"); + log_it(L_ERROR, "Async DB processing is switched OFF"); + return -EIO; + } + + s_global_db_proc_queue->esocket = dap_events_socket_create_type_queue_ptr_unsafe(NULL, s_dap_chain_global_db_request_processor); + + return l_rc; +} -- GitLab