From 4f13485ad3c1f99ae96f41a7c6460fffe7c9e40d Mon Sep 17 00:00:00 2001 From: Dmitriy Naidolinskiy <dmitriy.naidolinskiy@demlabs.net> Date: Wed, 5 Jan 2022 05:04:39 +0000 Subject: [PATCH] Feature 5343 --- .../dap_chain_global_db_driver_pgsql.c | 1 + .../dap_chain_global_db_driver_sqlite.c | 151 +++++++++++++----- .../dap_chain_global_db_driver_sqlite.h | 2 + modules/net/dap_chain_node_client.c | 3 +- 4 files changed, 113 insertions(+), 44 deletions(-) diff --git a/modules/global-db/dap_chain_global_db_driver_pgsql.c b/modules/global-db/dap_chain_global_db_driver_pgsql.c index 2722bed420..389e2fa091 100644 --- a/modules/global-db/dap_chain_global_db_driver_pgsql.c +++ b/modules/global-db/dap_chain_global_db_driver_pgsql.c @@ -77,6 +77,7 @@ static void s_pgsql_free_connection(PGconn *a_conn) for (int i = 0; i < DAP_PGSQL_POOL_COUNT; i++) { if (s_conn_pool[i].conn == a_conn) { s_conn_pool[i].busy = 0; + break; } } pthread_rwlock_unlock(&s_db_rwlock); diff --git a/modules/global-db/dap_chain_global_db_driver_sqlite.c b/modules/global-db/dap_chain_global_db_driver_sqlite.c index a853e58f93..a234a70615 100644 --- a/modules/global-db/dap_chain_global_db_driver_sqlite.c +++ b/modules/global-db/dap_chain_global_db_driver_sqlite.c @@ -41,8 +41,15 @@ #define LOG_TAG "db_sqlite" -static sqlite3 *s_db = NULL; +struct dap_sqlite_conn_pool_item { + sqlite3 *conn; + int busy; +}; + +//static sqlite3 *s_db = NULL; +static sqlite3 *s_trans = NULL; static char *s_filename_db = NULL; +static struct dap_sqlite_conn_pool_item s_conn_pool[DAP_SQLITE_POOL_COUNT]; static pthread_rwlock_t s_db_rwlock = PTHREAD_RWLOCK_INITIALIZER; // Value of one field in the table typedef struct _SQLITE_VALUE_ @@ -77,6 +84,41 @@ typedef struct _SQLITE_ROW_VALUE_ static int dap_db_driver_sqlite_exec(sqlite3 *l_db, const char *l_query, char **l_error_message); +static sqlite3 *s_sqlite_get_connection(void) +{ + if (pthread_rwlock_wrlock(&s_db_rwlock) == EDEADLK) { + return NULL; + } + + sqlite3 *l_ret = NULL; + for (int i = 0; i < DAP_SQLITE_POOL_COUNT; i++) { + if (!s_conn_pool[i].busy) { + l_ret = s_conn_pool[i].conn; + s_conn_pool[i].busy = 1; + break; + } + } + + pthread_rwlock_unlock(&s_db_rwlock); + return l_ret; +} + +static void s_sqlite_free_connection(sqlite3 *a_conn) +{ + if (pthread_rwlock_wrlock(&s_db_rwlock) == EDEADLK) { + return; + } + + for (int i = 0; i < DAP_SQLITE_POOL_COUNT; i++) { + if (s_conn_pool[i].conn == a_conn) { + s_conn_pool[i].busy = 0; + break; + } + } + + pthread_rwlock_unlock(&s_db_rwlock); +} + /** * @brief Initializes a SQLite database. * @note no thread safe @@ -112,13 +154,18 @@ int dap_db_driver_sqlite_init(const char *a_filename_db, dap_db_driver_callbacks DAP_DEL_Z(l_filename_dir); // Open Sqlite file, create if nessesary char *l_error_message = NULL; - s_db = dap_db_driver_sqlite_open(a_filename_db, SQLITE_OPEN_READWRITE|SQLITE_OPEN_CREATE, &l_error_message); - if(!s_db) { - log_it(L_ERROR, "Can't init sqlite err: \"%s\"", l_error_message); - dap_db_driver_sqlite_free(l_error_message); - l_ret = -3; - } - else { + for (int i = 0; i < DAP_SQLITE_POOL_COUNT; i++) { + s_conn_pool[i].conn = dap_db_driver_sqlite_open(a_filename_db, SQLITE_OPEN_READWRITE|SQLITE_OPEN_CREATE, &l_error_message); + if (!s_conn_pool[i].conn) { + log_it(L_ERROR, "Can't init sqlite err: \"%s\"", l_error_message); + dap_db_driver_sqlite_free(l_error_message); + l_ret = -3; + for(int ii = i - 1; ii >= 0; ii--) { + dap_db_driver_sqlite_close(s_conn_pool[ii].conn); + } + goto end; + } + sqlite3 *s_db = s_conn_pool[i].conn; if(!dap_db_driver_sqlite_set_pragma(s_db, "synchronous", "NORMAL")) // 0 | OFF | 1 | NORMAL | 2 | FULL printf("can't set new synchronous mode\n"); if(!dap_db_driver_sqlite_set_pragma(s_db, "journal_mode", "OFF")) // DELETE | TRUNCATE | PERSIST | MEMORY | WAL | OFF @@ -126,6 +173,9 @@ int dap_db_driver_sqlite_init(const char *a_filename_db, dap_db_driver_callbacks if(!dap_db_driver_sqlite_set_pragma(s_db, "page_size", "1024")) // DELETE | TRUNCATE | PERSIST | MEMORY | WAL | OFF printf("can't set page_size\n"); + s_conn_pool[i].busy = 0; + } + // *PRAGMA page_size = bytes; // page size DB; it is reasonable to make it equal to the size of the disk cluster 4096 // *PRAGMA cache_size = -kibibytes; // by default it is equal to 2000 pages of database // @@ -141,7 +191,7 @@ int dap_db_driver_sqlite_init(const char *a_filename_db, dap_db_driver_callbacks a_drv_callback->deinit = dap_db_driver_sqlite_deinit; a_drv_callback->flush = dap_db_driver_sqlite_flush; s_filename_db = strdup(a_filename_db); - } +end: return l_ret; } @@ -153,13 +203,14 @@ int dap_db_driver_sqlite_init(const char *a_filename_db, dap_db_driver_callbacks int dap_db_driver_sqlite_deinit(void) { pthread_rwlock_wrlock(&s_db_rwlock); - if(!s_db){ - pthread_rwlock_unlock(&s_db_rwlock); - return -666; - } - dap_db_driver_sqlite_close(s_db); + for (int i = 0; i < DAP_SQLITE_POOL_COUNT; i++) { + if (s_conn_pool[i].conn) { + dap_db_driver_sqlite_close(s_conn_pool[i].conn); + s_conn_pool[i].busy = 0; + } + } pthread_rwlock_unlock(&s_db_rwlock); - s_db = NULL; + //s_db = NULL; return sqlite3_shutdown(); } @@ -191,14 +242,14 @@ sqlite3* dap_db_driver_sqlite_open(const char *a_filename_utf8, int a_flags, cha { sqlite3 *l_db = NULL; - int l_rc = sqlite3_open_v2(a_filename_utf8, &l_db, a_flags | SQLITE_OPEN_FULLMUTEX, NULL); + int l_rc = sqlite3_open_v2(a_filename_utf8, &l_db, a_flags | SQLITE_OPEN_NOMUTEX, NULL); // if unable to open the database file if(l_rc == SQLITE_CANTOPEN) { log_it(L_WARNING,"No database on path %s, creating one from scratch", a_filename_utf8); if(l_db) sqlite3_close(l_db); // try to create database - l_rc = sqlite3_open_v2(a_filename_utf8, &l_db, a_flags | SQLITE_OPEN_FULLMUTEX| SQLITE_OPEN_CREATE, NULL); + l_rc = sqlite3_open_v2(a_filename_utf8, &l_db, a_flags | SQLITE_OPEN_NOMUTEX| SQLITE_OPEN_CREATE, NULL); } if(l_rc != SQLITE_OK) { @@ -269,16 +320,14 @@ bool dap_db_driver_sqlite_set_pragma(sqlite3 *a_db, char *a_param, char *a_mode) int dap_db_driver_sqlite_flush() { log_it(L_DEBUG, "Start flush sqlite data base."); - pthread_rwlock_wrlock(&s_db_rwlock); + sqlite3 *s_db = s_sqlite_get_connection(); if(!s_db){ - pthread_rwlock_unlock(&s_db_rwlock); return -666; } dap_db_driver_sqlite_close(s_db); char *l_error_message = NULL; s_db = dap_db_driver_sqlite_open(s_filename_db, SQLITE_OPEN_READWRITE, &l_error_message); if(!s_db) { - pthread_rwlock_unlock(&s_db_rwlock); log_it(L_ERROR, "Can't init sqlite err: \"%s\"", l_error_message? l_error_message: "UNKNOWN"); dap_db_driver_sqlite_free(l_error_message); return -3; @@ -293,7 +342,6 @@ int dap_db_driver_sqlite_flush() if(!dap_db_driver_sqlite_set_pragma(s_db, "page_size", "1024")) // DELETE | TRUNCATE | PERSIST | MEMORY | WAL | OFF log_it(L_WARNING, "Can't set page_size\n"); - pthread_rwlock_unlock(&s_db_rwlock); return 0; } @@ -331,6 +379,7 @@ static int dap_db_driver_sqlite_exec(sqlite3 *l_db, const char *l_query, char ** */ static int dap_db_driver_sqlite_create_group_table(const char *a_table_name) { + sqlite3 *s_db = s_sqlite_get_connection(); char *l_error_message = NULL; if(!s_db || !a_table_name) return -1; @@ -343,6 +392,7 @@ static int dap_db_driver_sqlite_create_group_table(const char *a_table_name) log_it(L_ERROR, "Creatу_table : %s\n", l_error_message); dap_db_driver_sqlite_free(l_error_message); DAP_DELETE(l_query); + s_sqlite_free_connection(s_db); return -1; } DAP_DELETE(l_query); @@ -353,9 +403,11 @@ static int dap_db_driver_sqlite_create_group_table(const char *a_table_name) log_it(L_ERROR, "Create unique index : %s\n", l_error_message); dap_db_driver_sqlite_free(l_error_message); DAP_DELETE(l_query); + s_sqlite_free_connection(s_db); return -1; } DAP_DELETE(l_query); + s_sqlite_free_connection(s_db); return 0; } @@ -512,13 +564,14 @@ int dap_db_driver_sqlite_vacuum(sqlite3 *l_db) */ int dap_db_driver_sqlite_start_transaction(void) { + s_trans = s_sqlite_get_connection(); pthread_rwlock_wrlock(&s_db_rwlock); - if(!s_db){ + if(!s_trans){ pthread_rwlock_unlock(&s_db_rwlock); return -666; } - if(SQLITE_OK == dap_db_driver_sqlite_exec(s_db, "BEGIN", NULL)){ + if(SQLITE_OK == dap_db_driver_sqlite_exec(s_trans, "BEGIN", NULL)){ pthread_rwlock_unlock(&s_db_rwlock); return 0; }else{ @@ -535,11 +588,11 @@ int dap_db_driver_sqlite_start_transaction(void) int dap_db_driver_sqlite_end_transaction(void) { pthread_rwlock_wrlock(&s_db_rwlock); - if(!s_db){ + if(!s_trans){ pthread_rwlock_unlock(&s_db_rwlock); return -666; } - if(SQLITE_OK == dap_db_driver_sqlite_exec(s_db, "COMMIT", NULL)){ + if(SQLITE_OK == dap_db_driver_sqlite_exec(s_trans, "COMMIT", NULL)){ pthread_rwlock_unlock(&s_db_rwlock); return 0; }else{ @@ -630,9 +683,8 @@ int dap_db_driver_sqlite_apply_store_obj(dap_store_obj_t *a_store_obj) return -1; } // execute request - pthread_rwlock_wrlock(&s_db_rwlock); + sqlite3 *s_db = s_sqlite_get_connection(); if(!s_db){ - pthread_rwlock_unlock(&s_db_rwlock); return -666; } @@ -662,13 +714,13 @@ int dap_db_driver_sqlite_apply_store_obj(dap_store_obj_t *a_store_obj) // repeat request l_ret = dap_db_driver_sqlite_exec(s_db, l_query, &l_error_message); } - pthread_rwlock_unlock(&s_db_rwlock); // missing database if(l_ret != SQLITE_OK) { log_it(L_ERROR, "sqlite apply error: %s", l_error_message); dap_db_driver_sqlite_free(l_error_message); l_ret = -1; } + s_sqlite_free_connection(s_db); if (a_store_obj->key) DAP_DELETE(a_store_obj->key); dap_db_driver_sqlite_free(l_query); @@ -731,19 +783,18 @@ dap_store_obj_t* dap_db_driver_sqlite_read_last_store_obj(const char *a_group) return NULL; char * l_table_name = dap_db_driver_sqlite_make_table_name(a_group); char *l_str_query = sqlite3_mprintf("SELECT id,ts,key,value FROM '%s' ORDER BY id DESC LIMIT 1", l_table_name); - pthread_rwlock_wrlock(&s_db_rwlock); + sqlite3 *s_db = s_sqlite_get_connection(); if(!s_db){ - pthread_rwlock_unlock(&s_db_rwlock); return NULL; } int l_ret = dap_db_driver_sqlite_query(s_db, l_str_query, &l_res, &l_error_message); - pthread_rwlock_unlock(&s_db_rwlock); sqlite3_free(l_str_query); DAP_DEL_Z(l_table_name); if(l_ret != SQLITE_OK) { //log_it(L_ERROR, "read last l_ret=%d, %s\n", sqlite3_errcode(s_db), sqlite3_errmsg(s_db)); dap_db_driver_sqlite_free(l_error_message); + s_sqlite_free_connection(s_db); return NULL; } @@ -760,6 +811,8 @@ dap_store_obj_t* dap_db_driver_sqlite_read_last_store_obj(const char *a_group) dap_db_driver_sqlite_row_free(l_row); dap_db_driver_sqlite_query_free(l_res); + s_sqlite_free_connection(s_db); + return l_obj; } @@ -785,27 +838,27 @@ dap_store_obj_t* dap_db_driver_sqlite_read_cond_store_obj(const char *a_group, u int l_count_out = 0; if(a_count_out) l_count_out = (int)*a_count_out; - char *l_str_query; + char *l_str_query = NULL; if(l_count_out) l_str_query = sqlite3_mprintf("SELECT id,ts,key,value FROM '%s' WHERE id>='%lld' ORDER BY id ASC LIMIT %d", l_table_name, a_id, l_count_out); else l_str_query = sqlite3_mprintf("SELECT id,ts,key,value FROM '%s' WHERE id>='%lld' ORDER BY id ASC", l_table_name, a_id); - pthread_rwlock_wrlock(&s_db_rwlock); + sqlite3 *s_db = s_sqlite_get_connection(); if(!s_db){ - pthread_rwlock_unlock(&s_db_rwlock); + if (l_str_query) sqlite3_free(l_str_query); return NULL; } int l_ret = dap_db_driver_sqlite_query(s_db, l_str_query, &l_res, &l_error_message); - pthread_rwlock_unlock(&s_db_rwlock); sqlite3_free(l_str_query); DAP_DEL_Z(l_table_name); if(l_ret != SQLITE_OK) { //log_it(L_ERROR, "read l_ret=%d, %s\n", sqlite3_errcode(s_db), sqlite3_errmsg(s_db)); dap_db_driver_sqlite_free(l_error_message); + s_sqlite_free_connection(s_db); return NULL; } @@ -835,9 +888,11 @@ dap_store_obj_t* dap_db_driver_sqlite_read_cond_store_obj(const char *a_group, u } while(l_row); dap_db_driver_sqlite_query_free(l_res); + s_sqlite_free_connection(s_db); if(a_count_out) *a_count_out = (size_t)l_count_out; + return l_obj; } @@ -851,6 +906,7 @@ dap_store_obj_t* dap_db_driver_sqlite_read_cond_store_obj(const char *a_group, u */ dap_store_obj_t* dap_db_driver_sqlite_read_store_obj(const char *a_group, const char *a_key, size_t *a_count_out) { + sqlite3 *s_db = s_sqlite_get_connection(); if(!a_group || !s_db) return NULL; dap_store_obj_t *l_obj = NULL; @@ -876,14 +932,13 @@ dap_store_obj_t* dap_db_driver_sqlite_read_store_obj(const char *a_group, const else l_str_query = sqlite3_mprintf("SELECT id,ts,key,value FROM '%s' ORDER BY id ASC", l_table_name); } - pthread_rwlock_wrlock(&s_db_rwlock); int l_ret = dap_db_driver_sqlite_query(s_db, l_str_query, &l_res, NULL); - pthread_rwlock_unlock(&s_db_rwlock); sqlite3_free(l_str_query); DAP_DEL_Z(l_table_name); if(l_ret != SQLITE_OK) { //log_it(L_ERROR, "read l_ret=%d, %s\n", sqlite3_errcode(s_db), sqlite3_errmsg(s_db)); + s_sqlite_free_connection(s_db); return NULL; } @@ -913,6 +968,7 @@ dap_store_obj_t* dap_db_driver_sqlite_read_store_obj(const char *a_group, const } while(l_row); dap_db_driver_sqlite_query_free(l_res); + s_sqlite_free_connection(s_db); if(a_count_out) *a_count_out = l_count_out; @@ -927,16 +983,16 @@ dap_store_obj_t* dap_db_driver_sqlite_read_store_obj(const char *a_group, const */ dap_list_t* dap_db_driver_sqlite_get_groups_by_mask(const char *a_group_mask) { + sqlite3 *s_db = s_sqlite_get_connection(); if(!a_group_mask || !s_db) return NULL; sqlite3_stmt *l_res; const char *l_str_query = "SELECT name FROM sqlite_master WHERE type ='table' AND name NOT LIKE 'sqlite_%'"; dap_list_t *l_ret_list = NULL; - pthread_rwlock_wrlock(&s_db_rwlock); int l_ret = dap_db_driver_sqlite_query(s_db, (char *)l_str_query, &l_res, NULL); - pthread_rwlock_unlock(&s_db_rwlock); if(l_ret != SQLITE_OK) { //log_it(L_ERROR, "Get tables l_ret=%d, %s\n", sqlite3_errcode(s_db), sqlite3_errmsg(s_db)); + s_sqlite_free_connection(s_db); return NULL; } char * l_mask = dap_db_driver_sqlite_make_table_name(a_group_mask); @@ -948,6 +1004,9 @@ dap_list_t* dap_db_driver_sqlite_get_groups_by_mask(const char *a_group_mask) dap_db_driver_sqlite_row_free(l_row); } dap_db_driver_sqlite_query_free(l_res); + + s_sqlite_free_connection(s_db); + return l_ret_list; } @@ -960,20 +1019,20 @@ dap_list_t* dap_db_driver_sqlite_get_groups_by_mask(const char *a_group_mask) */ size_t dap_db_driver_sqlite_read_count_store(const char *a_group, uint64_t a_id) { + sqlite3 *s_db = s_sqlite_get_connection(); sqlite3_stmt *l_res; if(!a_group || ! s_db) return 0; char * l_table_name = dap_db_driver_sqlite_make_table_name(a_group); char *l_str_query = sqlite3_mprintf("SELECT COUNT(*) FROM '%s' WHERE id>='%lld'", l_table_name, a_id); - pthread_rwlock_wrlock(&s_db_rwlock); int l_ret = dap_db_driver_sqlite_query(s_db, l_str_query, &l_res, NULL); - pthread_rwlock_unlock(&s_db_rwlock); sqlite3_free(l_str_query); DAP_DEL_Z(l_table_name); if(l_ret != SQLITE_OK) { //log_it(L_ERROR, "Count l_ret=%d, %s\n", sqlite3_errcode(s_db), sqlite3_errmsg(s_db)); + s_sqlite_free_connection(s_db); return 0; } size_t l_ret_val = 0; @@ -983,6 +1042,9 @@ size_t dap_db_driver_sqlite_read_count_store(const char *a_group, uint64_t a_id) dap_db_driver_sqlite_row_free(l_row); } dap_db_driver_sqlite_query_free(l_res); + + s_sqlite_free_connection(s_db); + return l_ret_val; } @@ -995,20 +1057,20 @@ size_t dap_db_driver_sqlite_read_count_store(const char *a_group, uint64_t a_id) */ bool dap_db_driver_sqlite_is_obj(const char *a_group, const char *a_key) { + sqlite3 *s_db = s_sqlite_get_connection(); sqlite3_stmt *l_res; if(!a_group || ! s_db) return false; char * l_table_name = dap_db_driver_sqlite_make_table_name(a_group); char *l_str_query = sqlite3_mprintf("SELECT EXISTS(SELECT * FROM '%s' WHERE key='%s')", l_table_name, a_key); - pthread_rwlock_wrlock(&s_db_rwlock); int l_ret = dap_db_driver_sqlite_query(s_db, l_str_query, &l_res, NULL); - pthread_rwlock_unlock(&s_db_rwlock); sqlite3_free(l_str_query); DAP_DEL_Z(l_table_name); if(l_ret != SQLITE_OK) { //log_it(L_ERROR, "Exists l_ret=%d, %s\n", sqlite3_errcode(s_db), sqlite3_errmsg(s_db)); + s_sqlite_free_connection(s_db); return false; } bool l_ret_val = false; @@ -1018,5 +1080,8 @@ bool dap_db_driver_sqlite_is_obj(const char *a_group, const char *a_key) dap_db_driver_sqlite_row_free(l_row); } dap_db_driver_sqlite_query_free(l_res); + + s_sqlite_free_connection(s_db); + return l_ret_val; } diff --git a/modules/global-db/include/dap_chain_global_db_driver_sqlite.h b/modules/global-db/include/dap_chain_global_db_driver_sqlite.h index 7c05f5fd33..aa5e50219b 100644 --- a/modules/global-db/include/dap_chain_global_db_driver_sqlite.h +++ b/modules/global-db/include/dap_chain_global_db_driver_sqlite.h @@ -27,6 +27,8 @@ #include <sqlite3.h> #include "dap_chain_global_db_driver.h" +#define DAP_SQLITE_POOL_COUNT 16 + int dap_db_driver_sqlite_init(const char *a_filename_db, dap_db_driver_callbacks_t *a_drv_callback); int dap_db_driver_sqlite_deinit(void); diff --git a/modules/net/dap_chain_node_client.c b/modules/net/dap_chain_node_client.c index f615b7a82d..f748cb3e05 100644 --- a/modules/net/dap_chain_node_client.c +++ b/modules/net/dap_chain_node_client.c @@ -233,7 +233,8 @@ static bool s_timer_update_states_callback(void *a_arg) dap_chain_net_t * l_net = l_node_client->net; assert(l_net); // If we do nothing - init sync process - if (l_ch_chain->state == CHAIN_STATE_IDLE && dap_chain_net_sync_trylock(l_net, l_me)) { + //if (l_ch_chain->state == CHAIN_STATE_IDLE && dap_chain_net_sync_trylock(l_net, l_me)) { + if (l_ch_chain->state == CHAIN_STATE_IDLE) { log_it(L_INFO, "Start synchronization process with "NODE_ADDR_FP_STR, NODE_ADDR_FP_ARGS_S(l_node_client->remote_node_addr)); dap_stream_ch_chain_sync_request_t l_sync_gdb = {}; l_sync_gdb.node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net); -- GitLab