diff --git a/dap-sdk/core/src/dap_file_utils.c b/dap-sdk/core/src/dap_file_utils.c index 30b3f7dc98effc0e4a69c5e6a0de728dcf81386f..ef6c15699c5bd3cd016580589b7205aa30220a97 100755 --- a/dap-sdk/core/src/dap_file_utils.c +++ b/dap-sdk/core/src/dap_file_utils.c @@ -160,8 +160,8 @@ int dap_mkdir_with_parents(const char *a_dir_path) #ifdef _WIN32 int result = mkdir(path); #else - int result = mkdir(path, S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH | S_IWOTH); - chmod(path, S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH | S_IWOTH); + int result = mkdir(path, S_IRWXU | S_IRWXG | S_IRWXO); + chmod(path, S_IRWXU | S_IRWXG | S_IRWXO); #endif if(result == -1) { errno = ENOTDIR; diff --git a/modules/global-db/dap_chain_global_db_driver_pgsql.c b/modules/global-db/dap_chain_global_db_driver_pgsql.c index 88553149c20c459287f578309ba531b4f3be55d1..fa5e6e8fd6e61a2ae89e319d8e2207d07bdc5181 100644 --- a/modules/global-db/dap_chain_global_db_driver_pgsql.c +++ b/modules/global-db/dap_chain_global_db_driver_pgsql.c @@ -28,10 +28,10 @@ #include <pthread.h> #include <errno.h> #include <pwd.h> - -#ifdef DAP_OS_UNIX +#include <fcntl.h> #include <unistd.h> -#endif +#include <sys/stat.h> + #include "dap_common.h" #include "dap_hash.h" #include "dap_file_utils.h" @@ -46,13 +46,16 @@ struct dap_pgsql_conn_pool_item { int busy; }; +static PGconn *s_trans_conn = NULL; static struct dap_pgsql_conn_pool_item s_conn_pool[DAP_PGSQL_POOL_COUNT]; static pthread_rwlock_t s_db_rwlock = PTHREAD_RWLOCK_INITIALIZER; static PGconn *s_pgsql_get_connection(void) { + if (pthread_rwlock_rdlock(s&_db_rwlock) == EDEADLK) { + return s_trans_conn; + } PGconn *l_ret = NULL; - pthread_rwlock_rdlock(&s_db_rwlock); for (int i = 0; i < DAP_PGSQL_POOL_COUNT; i++) { if (!s_conn_pool[i].busy) { l_ret = s_conn_pool[i].conn; @@ -66,7 +69,9 @@ static PGconn *s_pgsql_get_connection(void) static void s_pgsql_free_connection(PGconn *a_conn) { - pthread_rwlock_rdlock(&s_db_rwlock); + if (pthread_rwlock_rdlock(s&_db_rwlock) == EDEADLK) { + return; + } for (int i = 0; i < DAP_PGSQL_POOL_COUNT; i++) { if (s_conn_pool[i].conn == a_conn) { s_conn_pool[i].busy = 0; @@ -82,42 +87,29 @@ static void s_pgsql_free_connection(PGconn *a_conn) */ int dap_db_driver_pgsql_init(const char *a_filename_dir, dap_db_driver_callbacks_t *a_drv_callback) { - // Check paths and create them if nessesary - if (!dap_dir_test(a_filename_dir)) { - log_it(L_NOTICE, "No directory %s, trying to create...", a_filename_dir); - int l_mkdir_ret = dap_mkdir_with_parents(a_filename_dir); - int l_errno = errno; - if (!dap_dir_test(a_filename_dir)) { - char l_errbuf[255]; - l_errbuf[0] = '\0'; - strerror_r(l_errno, l_errbuf, sizeof(l_errbuf)); - log_it(L_ERROR, "Can't create directory, error code %d, error string \"%s\"", l_mkdir_ret, l_errbuf); - return -1; - } else - log_it(L_NOTICE,"Directory created"); - } dap_hash_fast_t l_dir_hash; dap_hash_fast(a_filename_dir, strlen(a_filename_dir), &l_dir_hash); char l_db_name[DAP_PGSQL_DBHASHNAME_LEN + 1]; dap_htoa64(l_db_name, l_dir_hash.raw, DAP_PGSQL_DBHASHNAME_LEN); l_db_name[DAP_PGSQL_DBHASHNAME_LEN] = '\0'; - // Open PostgreSQL database, create if nessesary - const char *l_base_conn_str = "dbname = postgres"; - PGconn *l_base_conn = PQconnectdb(l_base_conn_str); - if (PQstatus(l_base_conn) != CONNECTION_OK) { - log_it(L_ERROR, "Can't init PostgreSQL database: \"%s\"", PQerrorMessage(l_base_conn)); - PQfinish(l_base_conn); - return -2; - } - char *l_query_str = dap_strdup_printf("SELECT EXISTS (SELECT * FROM pg_database WHERE datname = '%s')", l_db_name); - PGresult *l_res = PQexec(l_base_conn, l_query_str); - DAP_DELETE(l_query_str); - if (PQresultStatus(l_res) != PGRES_TUPLES_OK) { - log_it(L_ERROR, "Can't read PostgreSQL database: \"%s\"", PQresultErrorMessage(l_res)); - PQclear(l_res); - return -3; - } - if (*PQgetvalue(l_res, 0, 0) == 'f') { //false, database not exists, than create it + if (!dap_dir_test(a_filename_dir) || !readdir(opendir(a_filename_dir))) { + // Create PostgreSQL database + const char *l_base_conn_str = "dbname = postgres"; + PGconn *l_base_conn = PQconnectdb(l_base_conn_str); + if (PQstatus(l_base_conn) != CONNECTION_OK) { + log_it(L_ERROR, "Can't init PostgreSQL database: \"%s\"", PQerrorMessage(l_base_conn)); + PQfinish(l_base_conn); + return -2; + } + char *l_query_str = dap_strdup_printf("DROP DATABASE IF EXISTS \"%s\"", l_db_name); + PGresult *l_res = PQexec(l_base_conn, l_query_str); + DAP_DELETE(l_query_str); + if (PQresultStatus(l_res) != PGRES_COMMAND_OK) { + log_it(L_ERROR, "Drop database failed: \"%s\"", PQresultErrorMessage(l_res)); + PQclear(l_res); + PQfinish(l_base_conn); + return -3; + } PQclear(l_res); l_query_str = dap_strdup_printf("DROP TABLESPACE IF EXISTS \"%s\"", l_db_name); l_res = PQexec(l_base_conn, l_query_str); @@ -125,19 +117,34 @@ int dap_db_driver_pgsql_init(const char *a_filename_dir, dap_db_driver_callbacks if (PQresultStatus(l_res) != PGRES_COMMAND_OK) { log_it(L_ERROR, "Drop tablespace failed with message: \"%s\"", PQresultErrorMessage(l_res)); PQclear(l_res); + PQfinish(l_base_conn); return -4; } PQclear(l_res); - dap_mkdir_with_parents(a_filename_dir); - chown(a_filename_dir, getpwnam("postgres")->pw_uid, -1); + // Check paths and create them if nessesary + if (!dap_dir_test(a_filename_dir)) { + log_it(L_NOTICE, "No directory %s, trying to create...", a_filename_dir); + dap_mkdir_with_parents(a_filename_dir); + if (!dap_dir_test(a_filename_dir)) { + char l_errbuf[255]; + l_errbuf[0] = '\0'; + strerror_r(errno, l_errbuf, sizeof(l_errbuf)); + log_it(L_ERROR, "Can't create directory, error code %d, error string \"%s\"", errno, l_errbuf); + return -1; + } + log_it(L_NOTICE,"Directory created"); + chown(a_filename_dir, getpwnam("postgres")->pw_uid, -1); + } l_query_str = dap_strdup_printf("CREATE TABLESPACE \"%s\" LOCATION '%s'", l_db_name, a_filename_dir); l_res = PQexec(l_base_conn, l_query_str); DAP_DELETE(l_query_str); if (PQresultStatus(l_res) != PGRES_COMMAND_OK) { log_it(L_ERROR, "Create tablespace failed with message: \"%s\"", PQresultErrorMessage(l_res)); PQclear(l_res); + PQfinish(l_base_conn); return -5; } + chmod(a_filename_dir, S_IRWXU | S_IRWXG | S_IRWXO); PQclear(l_res); l_query_str = dap_strdup_printf("CREATE DATABASE \"%s\" WITH TABLESPACE \"%s\"", l_db_name, l_db_name); l_res = PQexec(l_base_conn, l_query_str); @@ -145,11 +152,12 @@ int dap_db_driver_pgsql_init(const char *a_filename_dir, dap_db_driver_callbacks if (PQresultStatus(l_res) != PGRES_COMMAND_OK) { log_it(L_ERROR, "Create database failed with message: \"%s\"", PQresultErrorMessage(l_res)); PQclear(l_res); + PQfinish(l_base_conn); return -6; } + PQclear(l_res); + PQfinish(l_base_conn); } - PQclear(l_res); - PQfinish(l_base_conn); // Create connection pool for the DAP database char *l_conn_str = dap_strdup_printf("dbname = %s", l_db_name); for (int i = 0; i < DAP_PGSQL_POOL_COUNT; i++) { @@ -169,13 +177,13 @@ int dap_db_driver_pgsql_init(const char *a_filename_dir, dap_db_driver_callbacks a_drv_callback->transaction_end = dap_db_driver_pgsql_end_transaction; a_drv_callback->apply_store_obj = dap_db_driver_pgsql_apply_store_obj; a_drv_callback->read_store_obj = dap_db_driver_pgsql_read_store_obj; - //a_drv_callback->read_cond_store_obj = dap_db_driver_sqlite_read_cond_store_obj; - //a_drv_callback->read_last_store_obj = dap_db_driver_sqlite_read_last_store_obj; - //a_drv_callback->get_groups_by_mask = dap_db_driver_sqlite_get_groups_by_mask; - //a_drv_callback->read_count_store = dap_db_driver_sqlite_read_count_store; - //a_drv_callback->is_obj = dap_db_driver_sqlite_is_obj; - //a_drv_callback->deinit = dap_db_driver_sqlite_deinit; - //a_drv_callback->flush = dap_db_driver_sqlite_flush; + a_drv_callback->read_cond_store_obj = dap_db_driver_pgsql_read_cond_store_obj; + a_drv_callback->read_last_store_obj = dap_db_driver_pgsql_read_last_store_obj; + a_drv_callback->get_groups_by_mask = dap_db_driver_pgsql_get_groups_by_mask; + a_drv_callback->read_count_store = dap_db_driver_pgsql_read_count_store; + a_drv_callback->is_obj = dap_db_driver_pgsql_is_obj; + a_drv_callback->deinit = dap_db_driver_pgsql_deinit; + a_drv_callback->flush = dap_db_driver_pgsql_flush; return 0; } @@ -195,8 +203,17 @@ int dap_db_driver_pgsql_deinit(void) */ int dap_db_driver_pgsql_start_transaction(void) { - // TODO make a transaction with a single connection from pool - //PGresult *l_res = PQexec(l_conn, "BEGIN"); + s_trans_conn = s_pgsql_get_connection(); + if (!s_trans_conn) + return -1; + pthread_rwlock_rdlock(&s_db_rwlock); + PGresult *l_res = PQexec(s_trans_conn, "BEGIN"); + if (PQresultStatus(l_res) != PGRES_COMMAND_OK) { + log_it(L_ERROR, "Begin transaction failed with message: \"%s\"", PQresultErrorMessage(l_res)); + pthread_rwlock_unlock(&s_db_rwlock); + s_pgsql_free_connection(s_trans_conn); + s_trans_conn = NULL; + } return 0; } @@ -205,8 +222,15 @@ int dap_db_driver_pgsql_start_transaction(void) */ int dap_db_driver_pgsql_end_transaction(void) { - // TODO make a transaction with a single connection from pool - //PGresult *l_res = PQexec(l_conn, "COMMIT"); + if (s_trans_conn) + return -1; + PGresult *l_res = PQexec(l_trans_conn, "COMMIT"); + if (PQresultStatus(l_res) != PGRES_COMMAND_OK) { + log_it(L_ERROR, "End transaction failed with message: \"%s\"", PQresultErrorMessage(l_res)); + } + pthread_rwlock_unlock(&s_db_rwlock); + s_pgsql_free_connection(s_trans_conn); + s_trans_conn = NULL; return 0; } @@ -215,27 +239,21 @@ int dap_db_driver_pgsql_end_transaction(void) * * return 0 if Ok, else error code */ -static int s_pgsql_create_group_table(const char *a_table_name) +static int s_pgsql_create_group_table(const char *a_table_name, PGconn *a_conn) { if (!a_table_name) return -1; - PGconn *l_conn = s_pgsql_get_connection(); - if (!l_conn) { - log_it(L_ERROR, "Can't pick PostgreSQL connection from pool"); - return -2; - } int l_ret = 0; char *l_query_str = dap_strdup_printf("CREATE TABLE \"%s\"" "(obj_id SERIAL PRIMARY KEY, obj_ts BIGINT, obj_key TEXT UNIQUE, obj_val BYTEA)", a_table_name); - PGresult *l_res = PQexec(l_conn, l_query_str); + PGresult *l_res = PQexec(a_conn, l_query_str); DAP_DELETE(l_query_str); if (PQresultStatus(l_res) != PGRES_COMMAND_OK) { log_it(L_ERROR, "Create table failed with message: \"%s\"", PQresultErrorMessage(l_res)); l_ret = -3; } PQclear(l_res); - s_pgsql_free_connection(l_conn); return l_ret; } @@ -264,13 +282,13 @@ int dap_db_driver_pgsql_apply_store_obj(dap_store_obj_t *a_store_obj) int l_param_formats[2] = {1, 1}; l_query_str = dap_strdup_printf("INSERT INTO \"%s\" (obj_ts, obj_key, obj_val) VALUES ($1, '%s', $2) " "ON CONFLICT (obj_key) DO UPDATE SET " - "obj_ts = EXCLUDED.obj_ts, obj_val = EXCLUDED.obj_val;", + "obj_id = EXCLUDED.obj_id, obj_ts = EXCLUDED.obj_ts, obj_val = EXCLUDED.obj_val;", a_store_obj->group, a_store_obj->key); // execute add request l_res = PQexecParams(l_conn, l_query_str, 2, NULL, l_param_vals, l_param_lens, l_param_formats, 0); if (PQresultStatus(l_res) != PGRES_COMMAND_OK) { - if (a_store_obj->type == 'a' && s_pgsql_create_group_table(a_store_obj->group) == 0) { + if (a_store_obj->type == 'a' && s_pgsql_create_group_table(a_store_obj->group, l_conn) == 0) { PQclear(l_res); l_res = PQexecParams(l_conn, l_query_str, 2, NULL, l_param_vals, l_param_lens, l_param_formats, 0); } @@ -342,15 +360,16 @@ dap_store_obj_t *dap_db_driver_pgsql_read_store_obj(const char *a_group, const c } char *l_query_str; if (a_key) { - l_query_str = dap_strdup_printf("SELECT * FROM \"%s\" WHERE obj_key='%s'", a_group, a_key); + l_query_str = dap_strdup_printf("SELECT * FROM \"%s\" WHERE obj_key = '%s'", a_group, a_key); } else { - if (a_count_out) + if (a_count_out && *a_count_out) l_query_str = dap_strdup_printf("SELECT * FROM \"%s\" ORDER BY obj_id ASC LIMIT %d", a_group, *a_count_out); else l_query_str = dap_strdup_printf("SELECT * FROM \"%s\" ORDER BY obj_id ASC", a_group); } PGresult *l_res = PQexecParams(l_conn, l_query_str, 0, NULL, NULL, NULL, NULL, 1); + s_pgsql_free_connection(l_conn); DAP_DELETE(l_query_str); if (PQresultStatus(l_res) != PGRES_TUPLES_OK) { log_it(L_ERROR, "Read objects failed with message: \"%s\"", PQresultErrorMessage(l_res)); @@ -372,138 +391,32 @@ dap_store_obj_t *dap_db_driver_pgsql_read_store_obj(const char *a_group, const c return l_obj; } -#if 0 -int dap_db_driver_sqlite_flush() -{ - log_it(L_DEBUG, "Start flush sqlite data base."); - pthread_rwlock_wrlock(&s_db_rwlock); - if(!s_db){ - pthread_rwlock_unlock(&s_db_rwlock); - return -666; - } - dap_db_driver_sqlite_close(s_db); - char *l_error_message = NULL; - s_db = dap_db_driver_sqlite_open(s_filename_db, SQLITE_OPEN_READWRITE, &l_error_message); - if(!s_db) { - pthread_rwlock_unlock(&s_db_rwlock); - log_it(L_ERROR, "Can't init sqlite err: \"%s\"", l_error_message? l_error_message: "UNKNOWN"); - dap_db_driver_sqlite_free(l_error_message); - return -3; - } -#ifndef _WIN32 - sync(); -#endif - if(!dap_db_driver_sqlite_set_pragma(s_db, "synchronous", "NORMAL")) // 0 | OFF | 1 | NORMAL | 2 | FULL - log_it(L_WARNING, "Can't set new synchronous mode\n"); - if(!dap_db_driver_sqlite_set_pragma(s_db, "journal_mode", "OFF")) // DELETE | TRUNCATE | PERSIST | MEMORY | WAL | OFF - log_it(L_WARNING, "Can't set new journal mode\n"); - - if(!dap_db_driver_sqlite_set_pragma(s_db, "page_size", "1024")) // DELETE | TRUNCATE | PERSIST | MEMORY | WAL | OFF - log_it(L_WARNING, "Can't set page_size\n"); - pthread_rwlock_unlock(&s_db_rwlock); - return 0; -} - /** - * Selects the next entry from the result of the query and returns an array - * - * l_res: identifier received in sqlite_query () - * l_row_out [out]: pointer to a column or NULL - * - * return: - * SQLITE_ROW(100) has another row ready - * SQLITE_DONE(101) finished executing, - * SQLITE_CONSTRAINT(19) data is not unique and will not be added - */ -static int dap_db_driver_sqlite_fetch_array(sqlite3_stmt *l_res, SQLITE_ROW_VALUE **l_row_out) -{ - SQLITE_ROW_VALUE *l_row = NULL; - // go to next the string - int l_rc = sqlite3_step(l_res); - if(l_rc == SQLITE_ROW) // SQLITE_ROW(100) or SQLITE_DONE(101) or SQLITE_BUSY(5) - { - int l_iCol; // number of the column in the row - // allocate memory for a row with data - l_row = (SQLITE_ROW_VALUE*) sqlite3_malloc(sizeof(SQLITE_ROW_VALUE)); - int l_count = sqlite3_column_count(l_res); // get the number of columns - // allocate memory for all columns - l_row->val = (SQLITE_VALUE*) sqlite3_malloc(l_count * (int)sizeof(SQLITE_VALUE)); - if(l_row->val) - { - l_row->count = l_count; // number of columns - for(l_iCol = 0; l_iCol < l_row->count; l_iCol++) - { - SQLITE_VALUE *cur_val = l_row->val + l_iCol; - cur_val->len = sqlite3_column_bytes(l_res, l_iCol); // how many bytes will be needed - cur_val->type = (signed char)sqlite3_column_type(l_res, l_iCol); // field type - if(cur_val->type == SQLITE_INTEGER) - { - cur_val->val.val_int64 = sqlite3_column_int64(l_res, l_iCol); - } - else if(cur_val->type == SQLITE_FLOAT) - cur_val->val.val_float = sqlite3_column_double(l_res, l_iCol); - else if(cur_val->type == SQLITE_BLOB) - cur_val->val.val_blob = (const unsigned char*) sqlite3_column_blob(l_res, l_iCol); - else if(cur_val->type == SQLITE_TEXT) - cur_val->val.val_str = (const char*) sqlite3_column_text(l_res, l_iCol); //sqlite3_mprintf("%s",sqlite3_column_text(l_res,iCol)); - else - cur_val->val.val_str = NULL; - } - } - else - l_row->count = 0; // number of columns - } - if(l_row_out) - *l_row_out = l_row; - else - dap_db_driver_sqlite_row_free(l_row); - return l_rc; -} - -/** - * Read last items + * Read last item * * a_group - group name */ -dap_store_obj_t* dap_db_driver_sqlite_read_last_store_obj(const char *a_group) +dap_store_obj_t *dap_db_driver_pgsql_read_last_store_obj(const char *a_group) { - - dap_store_obj_t *l_obj = NULL; - char *l_error_message = NULL; - sqlite3_stmt *l_res; - if(!a_group) + if (!a_group) return NULL; - char * l_table_name = dap_db_driver_sqlite_make_table_name(a_group); - char *l_str_query = sqlite3_mprintf("SELECT id,ts,key,value FROM '%s' ORDER BY id DESC LIMIT 1", l_table_name); - pthread_rwlock_wrlock(&s_db_rwlock); - if(!s_db){ - pthread_rwlock_unlock(&s_db_rwlock); + PGconn *l_conn = s_pgsql_get_connection(); + if (!l_conn) { + log_it(L_ERROR, "Can't pick PostgreSQL connection from pool"); return NULL; } - - int l_ret = dap_db_driver_sqlite_query(s_db, l_str_query, &l_res, &l_error_message); - pthread_rwlock_unlock(&s_db_rwlock); - sqlite3_free(l_str_query); - DAP_DEL_Z(l_table_name); - if(l_ret != SQLITE_OK) { - //log_it(L_ERROR, "read last l_ret=%d, %s\n", sqlite3_errcode(s_db), sqlite3_errmsg(s_db)); - dap_db_driver_sqlite_free(l_error_message); + char *l_query_str = dap_strdup_printf("SELECT * FROM \"%s\" ORDER BY obj_id DESC LIMIT 1", a_group); + PGresult *l_res = PQexecParams(l_conn, l_query_str, 0, NULL, NULL, NULL, NULL, 1); + s_pgsql_free_connection(l_conn); + DAP_DELETE(l_query_str); + if (PQresultStatus(l_res) != PGRES_TUPLES_OK) { + log_it(L_ERROR, "Read last object failed with message: \"%s\"", PQresultErrorMessage(l_res)); + PQclear(l_res); return NULL; } - - SQLITE_ROW_VALUE *l_row = NULL; - l_ret = dap_db_driver_sqlite_fetch_array(l_res, &l_row); - if(l_ret != SQLITE_ROW && l_ret != SQLITE_DONE) - { - //log_it(L_ERROR, "read l_ret=%d, %s\n", sqlite3_errcode(s_db), sqlite3_errmsg(s_db)); - } - if(l_ret == SQLITE_ROW && l_row) { - l_obj = DAP_NEW_Z(dap_store_obj_t); - fill_one_item(a_group, l_obj, l_row); - } - dap_db_driver_sqlite_row_free(l_row); - dap_db_driver_sqlite_query_free(l_res); - + dap_store_obj_t *l_obj = DAP_NEW_Z(dap_store_obj_t); + s_pgsql_fill_object(a_group, l_obj, l_res, 0); + PQclear(l_res); return l_obj; } @@ -515,156 +428,144 @@ dap_store_obj_t* dap_db_driver_sqlite_read_last_store_obj(const char *a_group) * a_count_out[in], how many items to read, 0 - no limits * a_count_out[out], how many items was read */ -dap_store_obj_t* dap_db_driver_sqlite_read_cond_store_obj(const char *a_group, uint64_t a_id, size_t *a_count_out) +dap_store_obj_t *dap_db_driver_pgsql_read_cond_store_obj(const char *a_group, uint64_t a_id, size_t *a_count_out) { - dap_store_obj_t *l_obj = NULL; - char *l_error_message = NULL; - sqlite3_stmt *l_res; - if(!a_group) + if (!a_group) return NULL; - - char * l_table_name = dap_db_driver_sqlite_make_table_name(a_group); - // no limit - int l_count_out = 0; - if(a_count_out) - l_count_out = (int)*a_count_out; - char *l_str_query; - if(l_count_out) - l_str_query = sqlite3_mprintf("SELECT id,ts,key,value FROM '%s' WHERE id>='%lld' ORDER BY id ASC LIMIT %d", - l_table_name, a_id, l_count_out); - else - l_str_query = sqlite3_mprintf("SELECT id,ts,key,value FROM '%s' WHERE id>='%lld' ORDER BY id ASC", - l_table_name, a_id); - pthread_rwlock_wrlock(&s_db_rwlock); - if(!s_db){ - pthread_rwlock_unlock(&s_db_rwlock); + PGconn *l_conn = s_pgsql_get_connection(); + if (!l_conn) { + log_it(L_ERROR, "Can't pick PostgreSQL connection from pool"); return NULL; } - - int l_ret = dap_db_driver_sqlite_query(s_db, l_str_query, &l_res, &l_error_message); - pthread_rwlock_unlock(&s_db_rwlock); - sqlite3_free(l_str_query); - DAP_DEL_Z(l_table_name); - - if(l_ret != SQLITE_OK) { - //log_it(L_ERROR, "read l_ret=%d, %s\n", sqlite3_errcode(s_db), sqlite3_errmsg(s_db)); - dap_db_driver_sqlite_free(l_error_message); + char *l_query_str; + if (a_count_out && *a_count_out) + l_query_str = dap_strdup_printf("SELECT * FROM \"%s\" WHERE obj_id >= '%"DAP_UINT64_FORMAT_U"' " + "ORDER BY obj_id ASC LIMIT %d", a_group, a_id, *a_count_out); + else + l_query_str = dap_strdup_printf("SELECT * FROM \"%s\" WHERE obj_id >= '%"DAP_UINT64_FORMAT_U"' " + "ORDER BY obj_id ASC", a_group, a_id); + PGresult *l_res = PQexecParams(l_conn, l_query_str, 0, NULL, NULL, NULL, NULL, 1); + s_pgsql_free_connection(l_conn); + DAP_DELETE(l_query_str); + if (PQresultStatus(l_res) != PGRES_TUPLES_OK) { + log_it(L_ERROR, "Conditional read objects failed with message: \"%s\"", PQresultErrorMessage(l_res)); + PQclear(l_res); return NULL; } - //int b = qlite3_column_count(s_db); - SQLITE_ROW_VALUE *l_row = NULL; - l_count_out = 0; - int l_count_sized = 0; - do { - l_ret = dap_db_driver_sqlite_fetch_array(l_res, &l_row); - if(l_ret != SQLITE_ROW && l_ret != SQLITE_DONE) - { - // log_it(L_ERROR, "read l_ret=%d, %s\n", sqlite3_errcode(s_db), sqlite3_errmsg(s_db)); - } - if(l_ret == SQLITE_ROW && l_row) { - // realloc memory - if(l_count_out >= l_count_sized) { - l_count_sized += 10; - l_obj = DAP_REALLOC(l_obj, sizeof(dap_store_obj_t) * (uint64_t)l_count_sized); - memset(l_obj + l_count_out, 0, sizeof(dap_store_obj_t) * (uint64_t)(l_count_sized - l_count_out)); - } - // fill current item - dap_store_obj_t *l_obj_cur = l_obj + l_count_out; - fill_one_item(a_group, l_obj_cur, l_row); - l_count_out++; - } - dap_db_driver_sqlite_row_free(l_row); - } while(l_row); - - dap_db_driver_sqlite_query_free(l_res); - - if(a_count_out) - *a_count_out = (size_t)l_count_out; + // parse reply + size_t l_count = PQntuples(l_res); + dap_store_obj_t *l_obj = DAP_NEW_Z_SIZE(dap_store_obj_t, sizeof(dap_store_obj_t) * l_count); + for (int i = 0; i < l_count; i++) { + // fill currrent item + dap_store_obj_t *l_obj_cur = l_obj + i; + s_pgsql_fill_object(a_group, l_obj_cur, l_res, i); + } + PQclear(l_res); + if (a_count_out) + *a_count_out = l_count; return l_obj; } - -dap_list_t* dap_db_driver_sqlite_get_groups_by_mask(const char *a_group_mask) +dap_list_t *dap_db_driver_pgsql_get_groups_by_mask(const char *a_group_mask) { - if(!a_group_mask || !s_db) + if (!a_group_mask) return NULL; - sqlite3_stmt *l_res; - const char *l_str_query = "SELECT name FROM sqlite_master WHERE type ='table' AND name NOT LIKE 'sqlite_%'"; - dap_list_t *l_ret_list = NULL; - pthread_rwlock_wrlock(&s_db_rwlock); - int l_ret = dap_db_driver_sqlite_query(s_db, (char *)l_str_query, &l_res, NULL); - pthread_rwlock_unlock(&s_db_rwlock); - if(l_ret != SQLITE_OK) { - //log_it(L_ERROR, "Get tables l_ret=%d, %s\n", sqlite3_errcode(s_db), sqlite3_errmsg(s_db)); + PGconn *l_conn = s_pgsql_get_connection(); + if (!l_conn) { + log_it(L_ERROR, "Can't pick PostgreSQL connection from pool"); return NULL; } - char * l_mask = dap_db_driver_sqlite_make_table_name(a_group_mask); - SQLITE_ROW_VALUE *l_row = NULL; - while (dap_db_driver_sqlite_fetch_array(l_res, &l_row) == SQLITE_ROW && l_row) { - char *l_table_name = (char *)l_row->val->val.val_str; - if(!dap_fnmatch(l_mask, l_table_name, 0)) - l_ret_list = dap_list_prepend(l_ret_list, dap_db_driver_sqlite_make_group_name(l_table_name)); - dap_db_driver_sqlite_row_free(l_row); + const char *l_query_str = "SELECT tablename FROM pg_catalog.pg_tables WHERE " + "schemaname != 'information_schema' AND schemaname != 'pg_catalog'"; + PGresult *l_res = PQexec(l_conn, l_query_str); + s_pgsql_free_connection(l_conn); + if (PQresultStatus(l_res) != PGRES_TUPLES_OK) { + log_it(L_ERROR, "Read tables failed with message: \"%s\"", PQresultErrorMessage(l_res)); + PQclear(l_res); + return NULL; } - dap_db_driver_sqlite_query_free(l_res); + + dap_list_t *l_ret_list = NULL; + for (int i = 0; i < PQntuples(l_res); i++) { + char *l_table_name = (char *)PQgetvalue(l_res, i, 0); + if(!dap_fnmatch(a_group_mask, l_table_name, 0)) + l_ret_list = dap_list_prepend(l_ret_list, l_table_name); + } + PQclear(l_res); return l_ret_list; } -size_t dap_db_driver_sqlite_read_count_store(const char *a_group, uint64_t a_id) +size_t dap_db_driver_pgsql_read_count_store(const char *a_group, uint64_t a_id) { - sqlite3_stmt *l_res; - if(!a_group || ! s_db) + if (!a_group) return 0; - - char * l_table_name = dap_db_driver_sqlite_make_table_name(a_group); - char *l_str_query = sqlite3_mprintf("SELECT COUNT(*) FROM '%s' WHERE id>='%lld'", l_table_name, a_id); - pthread_rwlock_wrlock(&s_db_rwlock); - int l_ret = dap_db_driver_sqlite_query(s_db, l_str_query, &l_res, NULL); - pthread_rwlock_unlock(&s_db_rwlock); - sqlite3_free(l_str_query); - DAP_DEL_Z(l_table_name); - - if(l_ret != SQLITE_OK) { - //log_it(L_ERROR, "Count l_ret=%d, %s\n", sqlite3_errcode(s_db), sqlite3_errmsg(s_db)); + PGconn *l_conn = s_pgsql_get_connection(); + if (!l_conn) { + log_it(L_ERROR, "Can't pick PostgreSQL connection from pool"); return 0; } - size_t l_ret_val; - SQLITE_ROW_VALUE *l_row = NULL; - if (dap_db_driver_sqlite_fetch_array(l_res, &l_row) == SQLITE_ROW && l_row) { - l_ret_val = (size_t)l_row->val->val.val_int64; - dap_db_driver_sqlite_row_free(l_row); + char *l_query_str= dap_strdup_printf("SELECT count(*) FROM \"%s\" WHERE obj_id >= '%"DAP_UINT64_FORMAT_U"'", + a_group, a_id); + PGresult *l_res = PQexecParams(l_conn, l_query_str, 0, NULL, NULL, NULL, NULL, 1); + s_pgsql_free_connection(l_conn); + DAP_DELETE(l_query_str); + if (PQresultStatus(l_res) != PGRES_TUPLES_OK) { + log_it(L_ERROR, "Count objects failed with message: \"%s\"", PQresultErrorMessage(l_res)); + PQclear(l_res); + return 0; } - dap_db_driver_sqlite_query_free(l_res); - return l_ret_val; + size_t l_ret = be64toh(*(uint64_t *)PQgetvalue(l_res, 0, 0)); + PQclear(l_res); + return l_ret; } -bool dap_db_driver_sqlite_is_obj(const char *a_group, const char *a_key) +bool dap_db_driver_pgsql_is_obj(const char *a_group, const char *a_key) { - sqlite3_stmt *l_res; - if(!a_group || ! s_db) - return false; - - char * l_table_name = dap_db_driver_sqlite_make_table_name(a_group); - char *l_str_query = sqlite3_mprintf("SELECT EXISTS(SELECT * FROM '%s' WHERE key='%s')", l_table_name, a_key); - pthread_rwlock_wrlock(&s_db_rwlock); - int l_ret = dap_db_driver_sqlite_query(s_db, l_str_query, &l_res, NULL); - pthread_rwlock_unlock(&s_db_rwlock); - sqlite3_free(l_str_query); - DAP_DEL_Z(l_table_name); + if (!a_group) + return NULL; + PGconn *l_conn = s_pgsql_get_connection(); + if (!l_conn) { + log_it(L_ERROR, "Can't pick PostgreSQL connection from pool"); + return NULL; + } + char *l_query_str = dap_strdup_printf("SELECT EXISTS(SELECT * FROM \"%s\" WHERE obj_key = '%s')", a_group, a_key); + PGresult *l_res = PQexecParams(l_conn, l_query_str, 0, NULL, NULL, NULL, NULL, 1); + s_pgsql_free_connection(l_conn); + DAP_DELETE(l_query_str); + if (PQresultStatus(l_res) != PGRES_TUPLES_OK) { + log_it(L_ERROR, "Count objects failed with message: \"%s\"", PQresultErrorMessage(l_res)); + PQclear(l_res); + return 0; + } + int l_ret = *PQgetvalue(l_res, 0, 0); + PQclear(l_res); + return l_ret; +} - if(l_ret != SQLITE_OK) { - //log_it(L_ERROR, "Exists l_ret=%d, %s\n", sqlite3_errcode(s_db), sqlite3_errmsg(s_db)); - return false; +int dap_db_driver_pgsql_flush() +{ + PGconn *l_conn = s_pgsql_get_connection(); + if (!l_conn) { + log_it(L_ERROR, "Can't pick PostgreSQL connection from pool"); + return -4; } - bool l_ret_val; - SQLITE_ROW_VALUE *l_row = NULL; - if (dap_db_driver_sqlite_fetch_array(l_res, &l_row) == SQLITE_ROW && l_row) { - l_ret_val = (size_t)l_row->val->val.val_int64; - dap_db_driver_sqlite_row_free(l_row); + int l_ret = 0; + PGresult *l_res = PQexec(l_conn, "CHECKPOINT"); + if (PQresultStatus(l_res) != PGRES_COMMAND_OK) { + log_it(L_ERROR, "Flushing database on disk failed with message: \"%s\"", PQresultErrorMessage(l_res)); + l_ret = -5; } - dap_db_driver_sqlite_query_free(l_res); - return l_ret_val; + PQclear(l_res); + if (!l_ret) { + PGresult *l_res = PQexec(l_conn, "VACUUM"); + if (PQresultStatus(l_res) != PGRES_COMMAND_OK) { + log_it(L_ERROR, "Vaccuming database failed with message: \"%s\"", PQresultErrorMessage(l_res)); + l_ret = -6; + } + PQclear(l_res); + } + s_pgsql_free_connection(l_conn); + return l_ret; } -#endif diff --git a/modules/global-db/include/dap_chain_global_db_driver_pgsql.h b/modules/global-db/include/dap_chain_global_db_driver_pgsql.h index 59f25eae4e300af6aec55ea7719bc25820812fb3..d6c3beb8003616d7a5f403ca1c2e3737bba6ddd4 100644 --- a/modules/global-db/include/dap_chain_global_db_driver_pgsql.h +++ b/modules/global-db/include/dap_chain_global_db_driver_pgsql.h @@ -14,3 +14,9 @@ int dap_db_driver_pgsql_start_transaction(void); int dap_db_driver_pgsql_end_transaction(void); int dap_db_driver_pgsql_apply_store_obj(dap_store_obj_t *a_store_obj); dap_store_obj_t *dap_db_driver_pgsql_read_store_obj(const char *a_group, const char *a_key, size_t *a_count_out); +dap_store_obj_t *dap_db_driver_pgsql_read_last_store_obj(const char *a_group); +dap_store_obj_t *dap_db_driver_pgsql_read_cond_store_obj(const char *a_group, uint64_t a_id, size_t *a_count_out); +dap_list_t *dap_db_driver_pgsql_get_groups_by_mask(const char *a_group_mask); +size_t dap_db_driver_pgsql_read_count_store(const char *a_group, uint64_t a_id); +bool dap_db_driver_pgsql_is_obj(const char *a_group, const char *a_key); +int dap_db_driver_pgsql_flush();