diff --git a/dap-sdk/core/include/dap_common.h b/dap-sdk/core/include/dap_common.h index d86474fc8db33f75da5db0b1acc4ebc50921b215..769f9c254b53b4e4dfaa41035062c6f7c91f363b 100755 --- a/dap-sdk/core/include/dap_common.h +++ b/dap-sdk/core/include/dap_common.h @@ -123,6 +123,7 @@ typedef uint8_t byte_t; #define DAP_REALLOC(a, b) rprealloc(a,b) #define DAP_DELETE(a) rpfree(a) #define DAP_DUP(a) memcpy(rpmalloc(sizeof(*a)), a, sizeof(*a)) + #define DAP_DUP_SIZE(a, s) memcpy(rpmalloc(s), a, s) #else #define DAP_MALLOC(a) malloc(a) #define DAP_FREE(a) free(a) @@ -139,6 +140,7 @@ typedef uint8_t byte_t; #define DAP_REALLOC(a, b) realloc(a,b) #define DAP_DELETE(a) free(a) #define DAP_DUP(a) memcpy(malloc(sizeof(*a)), a, sizeof(*a)) + #define DAP_DUP_SIZE(a, s) memcpy(malloc(s), a, s) #endif #define DAP_DEL_Z(a) if(a) { DAP_DELETE(a); a=NULL;} diff --git a/modules/global-db/dap_chain_global_db.c b/modules/global-db/dap_chain_global_db.c index 829c573e043ad52c2c514062ad86dadc3243d31b..991c83a21d5fec114b05b29a3a4b080074660798 100644 --- a/modules/global-db/dap_chain_global_db.c +++ b/modules/global-db/dap_chain_global_db.c @@ -277,9 +277,9 @@ uint8_t * dap_chain_global_db_gr_get(const char *a_key, size_t *a_data_len_out, return l_ret_value; } -uint8_t * dap_chain_global_db_get(const char *a_key, size_t *a_data_out) +uint8_t * dap_chain_global_db_get(const char *a_key, size_t *a_data_len_out) { - return dap_chain_global_db_gr_get(a_key, a_data_out, GROUP_LOCAL_GENERAL); + return dap_chain_global_db_gr_get(a_key, a_data_len_out, GROUP_LOCAL_GENERAL); } diff --git a/modules/global-db/dap_chain_global_db_driver_pgsql.c b/modules/global-db/dap_chain_global_db_driver_pgsql.c index fc56c2fc3261b518355f25d0a55b3c0b5cb8fbf6..88553149c20c459287f578309ba531b4f3be55d1 100644 --- a/modules/global-db/dap_chain_global_db_driver_pgsql.c +++ b/modules/global-db/dap_chain_global_db_driver_pgsql.c @@ -1,11 +1,10 @@ /* * Authors: - * Dmitriy A. Gearasimov <gerasimov.dmitriy@demlabs.net> - * Alexander Lysikov <alexander.lysikov@demlabs.net> + * Roman Khlopkov <roman.khlopkov@demlabs.net> * DeM Labs Inc. https://demlabs.net * CellFrame https://cellframe.net * Sources https://gitlab.demlabs.net/cellframe - * Copyright (c) 2017-2019 + * Copyright (c) 2017-2021 * All rights reserved. This file is part of CellFrame SDK the open source project @@ -28,6 +27,7 @@ #include <string.h> #include <pthread.h> #include <errno.h> +#include <pwd.h> #ifdef DAP_OS_UNIX #include <unistd.h> @@ -41,9 +41,40 @@ #define LOG_TAG "db_pgsql" -static PGconn *s_conn_pool; +struct dap_pgsql_conn_pool_item { + PGconn *conn; + int busy; +}; + +static struct dap_pgsql_conn_pool_item s_conn_pool[DAP_PGSQL_POOL_COUNT]; static pthread_rwlock_t s_db_rwlock = PTHREAD_RWLOCK_INITIALIZER; +static PGconn *s_pgsql_get_connection(void) +{ + PGconn *l_ret = NULL; + pthread_rwlock_rdlock(&s_db_rwlock); + for (int i = 0; i < DAP_PGSQL_POOL_COUNT; i++) { + if (!s_conn_pool[i].busy) { + l_ret = s_conn_pool[i].conn; + s_conn_pool[i].busy = 1; + break; + } + } + pthread_rwlock_unlock(&s_db_rwlock); + return l_ret; +} + +static void s_pgsql_free_connection(PGconn *a_conn) +{ + pthread_rwlock_rdlock(&s_db_rwlock); + for (int i = 0; i < DAP_PGSQL_POOL_COUNT; i++) { + if (s_conn_pool[i].conn == a_conn) { + s_conn_pool[i].busy = 0; + } + } + pthread_rwlock_unlock(&s_db_rwlock); +} + /** * SQLite library initialization, no thread safe * @@ -52,244 +83,296 @@ static pthread_rwlock_t s_db_rwlock = PTHREAD_RWLOCK_INITIALIZER; int dap_db_driver_pgsql_init(const char *a_filename_dir, dap_db_driver_callbacks_t *a_drv_callback) { // Check paths and create them if nessesary - if(!dap_dir_test(a_filename_dir)){ + if (!dap_dir_test(a_filename_dir)) { log_it(L_NOTICE, "No directory %s, trying to create...", a_filename_dir); int l_mkdir_ret = dap_mkdir_with_parents(a_filename_dir); int l_errno = errno; - if(!dap_dir_test(a_filename_dir)){ + if (!dap_dir_test(a_filename_dir)) { char l_errbuf[255]; l_errbuf[0] = '\0'; - strerror_r(l_errno,l_errbuf,sizeof(l_errbuf)); + strerror_r(l_errno, l_errbuf, sizeof(l_errbuf)); log_it(L_ERROR, "Can't create directory, error code %d, error string \"%s\"", l_mkdir_ret, l_errbuf); return -1; - }else + } else log_it(L_NOTICE,"Directory created"); } dap_hash_fast_t l_dir_hash; - dap_chain_hash_fast_from_str(a_filename_dir, &l_dir_hash); - char *l_db_name = dap_hash_fast_to_str_new(&l_dir_hash); + dap_hash_fast(a_filename_dir, strlen(a_filename_dir), &l_dir_hash); + char l_db_name[DAP_PGSQL_DBHASHNAME_LEN + 1]; + dap_htoa64(l_db_name, l_dir_hash.raw, DAP_PGSQL_DBHASHNAME_LEN); + l_db_name[DAP_PGSQL_DBHASHNAME_LEN] = '\0'; // Open PostgreSQL database, create if nessesary const char *l_base_conn_str = "dbname = postgres"; - char *l_error_message = NULL; PGconn *l_base_conn = PQconnectdb(l_base_conn_str); if (PQstatus(l_base_conn) != CONNECTION_OK) { - l_error_message = PQerrorMessage(l_base_conn); - log_it(L_ERROR, "Can't init PostgreSQL database: \"%s\"", l_error_message); - DAP_DELETE(l_error_message); + log_it(L_ERROR, "Can't init PostgreSQL database: \"%s\"", PQerrorMessage(l_base_conn)); + PQfinish(l_base_conn); return -2; } - char *l_query_str = dap_strdup_printf("SELECT EXISTS (SELECT * FROM pg_database WHERE datname = 'ololo')");//, l_db_name); + char *l_query_str = dap_strdup_printf("SELECT EXISTS (SELECT * FROM pg_database WHERE datname = '%s')", l_db_name); PGresult *l_res = PQexec(l_base_conn, l_query_str); + DAP_DELETE(l_query_str); if (PQresultStatus(l_res) != PGRES_TUPLES_OK) { log_it(L_ERROR, "Can't read PostgreSQL database: \"%s\"", PQresultErrorMessage(l_res)); + PQclear(l_res); return -3; } + if (*PQgetvalue(l_res, 0, 0) == 'f') { //false, database not exists, than create it + PQclear(l_res); + l_query_str = dap_strdup_printf("DROP TABLESPACE IF EXISTS \"%s\"", l_db_name); + l_res = PQexec(l_base_conn, l_query_str); + DAP_DELETE(l_query_str); + if (PQresultStatus(l_res) != PGRES_COMMAND_OK) { + log_it(L_ERROR, "Drop tablespace failed with message: \"%s\"", PQresultErrorMessage(l_res)); + PQclear(l_res); + return -4; + } + PQclear(l_res); + dap_mkdir_with_parents(a_filename_dir); + chown(a_filename_dir, getpwnam("postgres")->pw_uid, -1); + l_query_str = dap_strdup_printf("CREATE TABLESPACE \"%s\" LOCATION '%s'", l_db_name, a_filename_dir); + l_res = PQexec(l_base_conn, l_query_str); + DAP_DELETE(l_query_str); + if (PQresultStatus(l_res) != PGRES_COMMAND_OK) { + log_it(L_ERROR, "Create tablespace failed with message: \"%s\"", PQresultErrorMessage(l_res)); + PQclear(l_res); + return -5; + } + PQclear(l_res); + l_query_str = dap_strdup_printf("CREATE DATABASE \"%s\" WITH TABLESPACE \"%s\"", l_db_name, l_db_name); + l_res = PQexec(l_base_conn, l_query_str); + DAP_DELETE(l_query_str); + if (PQresultStatus(l_res) != PGRES_COMMAND_OK) { + log_it(L_ERROR, "Create database failed with message: \"%s\"", PQresultErrorMessage(l_res)); + PQclear(l_res); + return -6; + } + } + PQclear(l_res); + PQfinish(l_base_conn); + // Create connection pool for the DAP database + char *l_conn_str = dap_strdup_printf("dbname = %s", l_db_name); + for (int i = 0; i < DAP_PGSQL_POOL_COUNT; i++) { + s_conn_pool[i].conn = PQconnectdb(l_conn_str); + s_conn_pool[i].busy = 0; + if (PQstatus(s_conn_pool[i].conn) != CONNECTION_OK) { + log_it(L_ERROR, "Can't connect PostgreSQL database: \"%s\"", PQerrorMessage(s_conn_pool[i].conn)); + DAP_DELETE(l_conn_str); + for (int j = 0; j <= i; j++) + PQfinish(s_conn_pool[j].conn); + return -7; + } + } + DAP_DELETE(l_conn_str); + pthread_rwlock_init(&s_db_rwlock, 0); + a_drv_callback->transaction_start = dap_db_driver_pgsql_start_transaction; + a_drv_callback->transaction_end = dap_db_driver_pgsql_end_transaction; + a_drv_callback->apply_store_obj = dap_db_driver_pgsql_apply_store_obj; + a_drv_callback->read_store_obj = dap_db_driver_pgsql_read_store_obj; + //a_drv_callback->read_cond_store_obj = dap_db_driver_sqlite_read_cond_store_obj; + //a_drv_callback->read_last_store_obj = dap_db_driver_sqlite_read_last_store_obj; + //a_drv_callback->get_groups_by_mask = dap_db_driver_sqlite_get_groups_by_mask; + //a_drv_callback->read_count_store = dap_db_driver_sqlite_read_count_store; + //a_drv_callback->is_obj = dap_db_driver_sqlite_is_obj; + //a_drv_callback->deinit = dap_db_driver_sqlite_deinit; + //a_drv_callback->flush = dap_db_driver_sqlite_flush; return 0; } -#if 0 -a_drv_callback->apply_store_obj = dap_db_driver_sqlite_apply_store_obj; -a_drv_callback->read_store_obj = dap_db_driver_sqlite_read_store_obj; -a_drv_callback->read_cond_store_obj = dap_db_driver_sqlite_read_cond_store_obj; -a_drv_callback->read_last_store_obj = dap_db_driver_sqlite_read_last_store_obj; -a_drv_callback->transaction_start = dap_db_driver_sqlite_start_transaction; -a_drv_callback->transaction_end = dap_db_driver_sqlite_end_transaction; -a_drv_callback->get_groups_by_mask = dap_db_driver_sqlite_get_groups_by_mask; -a_drv_callback->read_count_store = dap_db_driver_sqlite_read_count_store; -a_drv_callback->is_obj = dap_db_driver_sqlite_is_obj; -a_drv_callback->deinit = dap_db_driver_sqlite_deinit; -a_drv_callback->flush = dap_db_driver_sqlite_flush; - -/* Set always-secure search path, so malicious users can't take -control. */ -res = PQexec(conn, -"SELECT pg_catalog.set_config('search_path', '', -false)"); -if (PQresultStatus(res) != PGRES_TUPLES_OK) -{ -fprintf(stderr, "SET failed: %s", PQerrorMessage(conn)); -PQclear(res); -exit_nicely(conn); -} -/* -* Should PQclear PGresult whenever it is no longer needed to -avoid memory -* leaks -*/ -PQclear(res); -/* -* Our test case here involves using a cursor, for which we -must be inside -* a transaction block. We could do the whole thing with a -single -* PQexec() of "select * from pg_database", but that's too -trivial to make -* a good example. -*/ -/* Start a transaction block */ -res = PQexec(conn, "BEGIN"); -if (PQresultStatus(res) != PGRES_COMMAND_OK) -{ -fprintf(stderr, "BEGIN command failed: %s", -PQerrorMessage(conn)); -PQclear(res); -exit_nicely(conn); -} -893 -libpq — C Library -PQclear(res); -/* -* Fetch rows from pg_database, the system catalog of databases -*/ -res = PQexec(conn, "DECLARE myportal CURSOR FOR select * from -pg_database"); -if (PQresultStatus(res) != PGRES_COMMAND_OK) -{ -fprintf(stderr, "DECLARE CURSOR failed: %s", -PQerrorMessage(conn)); -PQclear(res); -exit_nicely(conn); -} -PQclear(res); -res = PQexec(conn, "FETCH ALL in myportal"); -if (PQresultStatus(res) != PGRES_TUPLES_OK) -{ -fprintf(stderr, "FETCH ALL failed: %s", -PQerrorMessage(conn)); -PQclear(res); -exit_nicely(conn); -} -/* first, print out the attribute names */ -nFields = PQnfields(res); -for (i = 0; i < nFields; i++) -printf("%-15s", PQfname(res, i)); -printf("\n\n"); -/* next, print out the rows */ -for (i = 0; i < PQntuples(res); i++) + + +int dap_db_driver_pgsql_deinit(void) { -for (j = 0; j < nFields; j++) -printf("%-15s", PQgetvalue(res, i, j)); -printf("\n"); -} -PQclear(res); -/* close the portal ... we don't bother to check for errors ... -*/ -res = PQexec(conn, "CLOSE myportal"); -PQclear(res); -/* end the transaction */ -res = PQexec(conn, "END"); -PQclear(res); -/* close the connection to the database and cleanup */ -PQfinish(conn); -return 0; + pthread_rwlock_wrlock(&s_db_rwlock); + for (int j = 0; j <= DAP_PGSQL_POOL_COUNT; j++) + PQfinish(s_conn_pool[j].conn); + pthread_rwlock_unlock(&s_db_rwlock); + pthread_rwlock_destroy(&s_db_rwlock); + return 0; } - -int dap_db_driver_sqlite_deinit(void) +/** + * Start a transaction + */ +int dap_db_driver_pgsql_start_transaction(void) { - pthread_rwlock_wrlock(&s_db_rwlock); - if(!s_db){ - pthread_rwlock_unlock(&s_db_rwlock); - return -666; - } - dap_db_driver_sqlite_close(s_db); - pthread_rwlock_unlock(&s_db_rwlock); - s_db = NULL; - return sqlite3_shutdown(); + // TODO make a transaction with a single connection from pool + //PGresult *l_res = PQexec(l_conn, "BEGIN"); + return 0; } -// additional function for sqlite to convert byte to number -static void byte_to_bin(sqlite3_context *l_context, int a_argc, sqlite3_value **a_argv) +/** + * End of transaction + */ +int dap_db_driver_pgsql_end_transaction(void) { - const unsigned char *l_text; - if(a_argc != 1) - sqlite3_result_null(l_context); - l_text = (const unsigned char *) sqlite3_value_blob(a_argv[0]); - if(l_text && l_text[0]) - { - int l_result = (int) l_text[0]; - sqlite3_result_int(l_context, l_result); - return; - } - sqlite3_result_null(l_context); + // TODO make a transaction with a single connection from pool + //PGresult *l_res = PQexec(l_conn, "COMMIT"); + return 0; } /** - * Open SQLite database - * a_filename_utf8 - database file name - * a_flags - database access flags (SQLITE_OPEN_READONLY, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE) - * a_error_message[out] - Error messages (the memory requires deletion via sqlite_free ()) + * Create table * - * return: database identifier, NULL when an error occurs. + * return 0 if Ok, else error code */ -sqlite3* dap_db_driver_sqlite_open(const char *a_filename_utf8, int a_flags, char **a_error_message) +static int s_pgsql_create_group_table(const char *a_table_name) { - sqlite3 *l_db = NULL; - - int l_rc = sqlite3_open_v2(a_filename_utf8, &l_db, a_flags | SQLITE_OPEN_NOMUTEX, NULL); - // if unable to open the database file - if(l_rc == SQLITE_CANTOPEN) { - log_it(L_WARNING,"No database on path %s, creating one from scratch", a_filename_utf8); - if(l_db) - sqlite3_close(l_db); - // try to create database - l_rc = sqlite3_open_v2(a_filename_utf8, &l_db, a_flags | SQLITE_OPEN_NOMUTEX| SQLITE_OPEN_CREATE, NULL); - } - - if(l_rc != SQLITE_OK) { - log_it(L_CRITICAL,"Can't open database on path %s (code %d: \"%s\" )", a_filename_utf8, l_rc, sqlite3_errstr(l_rc)); - if(a_error_message) - *a_error_message = sqlite3_mprintf("Can't open database: %s\n", sqlite3_errmsg(l_db)); - sqlite3_close(l_db); - return NULL; + if (!a_table_name) + return -1; + PGconn *l_conn = s_pgsql_get_connection(); + if (!l_conn) { + log_it(L_ERROR, "Can't pick PostgreSQL connection from pool"); + return -2; } - // added user functions - sqlite3_create_function(l_db, "byte_to_bin", 1, SQLITE_UTF8, NULL, &byte_to_bin, NULL, NULL); - return l_db; + int l_ret = 0; + char *l_query_str = dap_strdup_printf("CREATE TABLE \"%s\"" + "(obj_id SERIAL PRIMARY KEY, obj_ts BIGINT, obj_key TEXT UNIQUE, obj_val BYTEA)", + a_table_name); + PGresult *l_res = PQexec(l_conn, l_query_str); + DAP_DELETE(l_query_str); + if (PQresultStatus(l_res) != PGRES_COMMAND_OK) { + log_it(L_ERROR, "Create table failed with message: \"%s\"", PQresultErrorMessage(l_res)); + l_ret = -3; + } + PQclear(l_res); + s_pgsql_free_connection(l_conn); + return l_ret; } /** - * Close the database + * Apply data (write or delete) + * */ -void dap_db_driver_sqlite_close(sqlite3 *l_db) +int dap_db_driver_pgsql_apply_store_obj(dap_store_obj_t *a_store_obj) { - if(l_db) - sqlite3_close(l_db); + if (!a_store_obj || !a_store_obj->group) + return -1; + char *l_query_str = NULL; + int l_ret = 0; + PGresult *l_res = NULL; + PGconn *l_conn = s_pgsql_get_connection(); + if (!l_conn) { + log_it(L_ERROR, "Can't pick PostgreSQL connection from pool"); + return -2; + } + if (a_store_obj->type == 'a') { + const char *l_param_vals[2]; + time_t l_ts_to_store = htobe64(a_store_obj->timestamp); + l_param_vals[0] = (const char *)&l_ts_to_store; + l_param_vals[1] = (const char *)a_store_obj->value; + int l_param_lens[2] = {sizeof(time_t), a_store_obj->value_len}; + int l_param_formats[2] = {1, 1}; + l_query_str = dap_strdup_printf("INSERT INTO \"%s\" (obj_ts, obj_key, obj_val) VALUES ($1, '%s', $2) " + "ON CONFLICT (obj_key) DO UPDATE SET " + "obj_ts = EXCLUDED.obj_ts, obj_val = EXCLUDED.obj_val;", + a_store_obj->group, a_store_obj->key); + + // execute add request + l_res = PQexecParams(l_conn, l_query_str, 2, NULL, l_param_vals, l_param_lens, l_param_formats, 0); + if (PQresultStatus(l_res) != PGRES_COMMAND_OK) { + if (a_store_obj->type == 'a' && s_pgsql_create_group_table(a_store_obj->group) == 0) { + PQclear(l_res); + l_res = PQexecParams(l_conn, l_query_str, 2, NULL, l_param_vals, l_param_lens, l_param_formats, 0); + } + if (PQresultStatus(l_res) != PGRES_COMMAND_OK) { + log_it(L_ERROR, "Add object failed with message: \"%s\"", PQresultErrorMessage(l_res)); + l_ret = -3; + } + } + } else if (a_store_obj->type == 'd') { + // delete one record + if (a_store_obj->key) + l_query_str = dap_strdup_printf("DELETE FROM \"%s\" WHERE key = \"%s\"", + a_store_obj->group, a_store_obj->key); + // remove all group + else + l_query_str = dap_strdup_printf("DROP TABLE \"%s\"", a_store_obj->group); + // execute delete request + l_res = PQexec(l_conn, l_query_str); + if (PQresultStatus(l_res) != PGRES_COMMAND_OK) { + log_it(L_ERROR, "Delete object failed with message: \"%s\"", PQresultErrorMessage(l_res)); + l_ret = -4; + } + } + else { + log_it(L_ERROR, "Unknown store_obj type '0x%x'", a_store_obj->type); + s_pgsql_free_connection(l_conn); + return -5; + } + DAP_DELETE(l_query_str); + PQclear(l_res); + s_pgsql_free_connection(l_conn); + return l_ret; } -/* - * Clear the memory allocated via sqlite3_mprintf() - */ -void dap_db_driver_sqlite_free(char *memory) + +static void s_pgsql_fill_object(const char *a_group, dap_store_obj_t *a_obj, PGresult *a_res, int a_row) { - if(memory) - sqlite3_free(memory); + a_obj->group = dap_strdup(a_group); + + for (int i = 0; i < PQnfields(a_res); i++) { + if (i == PQfnumber(a_res, "obj_id")) { + a_obj->id = be32toh(*(uint32_t *)PQgetvalue(a_res, a_row, i)); + } else if (i == PQfnumber(a_res, "obj_ts")) { + a_obj->timestamp = be64toh(*(time_t *)PQgetvalue(a_res, a_row, i)); + } else if ((i == PQfnumber(a_res, "obj_key"))) { + a_obj->key = dap_strdup(PQgetvalue(a_res, a_row, i)); + } else if ((i == PQfnumber(a_res, "obj_val"))) { + a_obj->value_len = PQgetlength(a_res, a_row, i); + a_obj->value = DAP_DUP_SIZE(PQgetvalue(a_res, a_row, i), a_obj->value_len); + } + } } /** - * Set specific pragma statements - * www.sqlite.org/pragma.html + * Read several items * - *PRAGMA page_size = bytes; // page size DB; it is reasonable to make it equal to the size of the disk cluster 4096 - *PRAGMA cache_size = -kibibytes; // by default it is equal to 2000 pages of database - *PRAGMA encoding = "UTF-8"; // default = UTF-8 - *PRAGMA foreign_keys = 1; // default = 0 - *PRAGMA journal_mode = DELETE | TRUNCATE | PERSIST | MEMORY | WAL | OFF; - *PRAGMA synchronous = 0 | OFF | 1 | NORMAL | 2 | FULL; + * a_group - group name + * a_key - key name, may by NULL, it means reading the whole group + * a_count_out[in], how many items to read, 0 - no limits + * a_count_out[out], how many items was read */ -bool dap_db_driver_sqlite_set_pragma(sqlite3 *a_db, char *a_param, char *a_mode) +dap_store_obj_t *dap_db_driver_pgsql_read_store_obj(const char *a_group, const char *a_key, size_t *a_count_out) { - if(!a_param || !a_mode) - { - printf("[sqlite_set_pragma] err!!! no param or mode\n"); - return false; + if (!a_group) + return NULL; + PGconn *l_conn = s_pgsql_get_connection(); + if (!l_conn) { + log_it(L_ERROR, "Can't pick PostgreSQL connection from pool"); + return NULL; } - char *l_str_query = sqlite3_mprintf("PRAGMA %s = %s", a_param, a_mode); - int l_rc = dap_db_driver_sqlite_exec(a_db, l_str_query, NULL); // default synchronous=FULL - sqlite3_free(l_str_query); - if(l_rc == SQLITE_OK) - return true; - return false; + char *l_query_str; + if (a_key) { + l_query_str = dap_strdup_printf("SELECT * FROM \"%s\" WHERE obj_key='%s'", a_group, a_key); + } else { + if (a_count_out) + l_query_str = dap_strdup_printf("SELECT * FROM \"%s\" ORDER BY obj_id ASC LIMIT %d", a_group, *a_count_out); + else + l_query_str = dap_strdup_printf("SELECT * FROM \"%s\" ORDER BY obj_id ASC", a_group); + } + + PGresult *l_res = PQexecParams(l_conn, l_query_str, 0, NULL, NULL, NULL, NULL, 1); + DAP_DELETE(l_query_str); + if (PQresultStatus(l_res) != PGRES_TUPLES_OK) { + log_it(L_ERROR, "Read objects failed with message: \"%s\"", PQresultErrorMessage(l_res)); + PQclear(l_res); + return NULL; + } + + // parse reply + size_t l_count = PQntuples(l_res); + dap_store_obj_t *l_obj = DAP_NEW_Z_SIZE(dap_store_obj_t, sizeof(dap_store_obj_t) * l_count); + for (int i = 0; i < l_count; i++) { + // fill currrent item + dap_store_obj_t *l_obj_cur = l_obj + i; + s_pgsql_fill_object(a_group, l_obj_cur, l_res, i); + } + PQclear(l_res); + if (a_count_out) + *a_count_out = l_count; + return l_obj; } +#if 0 int dap_db_driver_sqlite_flush() { log_it(L_DEBUG, "Start flush sqlite data base."); @@ -321,111 +404,6 @@ int dap_db_driver_sqlite_flush() return 0; } -/** - * Execute SQL query to database that does not return data - * - * return 0 if Ok, else error code >0 - */ -static int dap_db_driver_sqlite_exec(sqlite3 *l_db, const char *l_query, char **l_error_message) -{ - char *l_zErrMsg = NULL; - int l_rc = sqlite3_exec(l_db, l_query, NULL, 0, &l_zErrMsg); - //printf("%s\n",l_query); - if(l_rc != SQLITE_OK) - { - if(l_error_message && l_zErrMsg) - *l_error_message = sqlite3_mprintf("SQL error: %s", l_zErrMsg); - if(l_zErrMsg) - sqlite3_free(l_zErrMsg); - return l_rc; - } - if(l_zErrMsg) - sqlite3_free(l_zErrMsg); - return l_rc; -} - -/** - * Create table - * - * return 0 if Ok, else error code - */ -static int dap_db_driver_sqlite_create_group_table(const char *a_table_name) -{ - char *l_error_message = NULL; - if(!s_db || !a_table_name) - return -1; - char *l_query = - dap_strdup_printf( - "create table if not exists '%s'(id INTEGER NOT NULL PRIMARY KEY, key TEXT KEY, hash BLOB, ts INTEGER KEY, value BLOB)", - a_table_name); - if(dap_db_driver_sqlite_exec(s_db, (const char*) l_query, &l_error_message) != SQLITE_OK) - { - log_it(L_ERROR, "Creatу_table : %s\n", l_error_message); - dap_db_driver_sqlite_free(l_error_message); - DAP_DELETE(l_query); - return -1; - } - DAP_DELETE(l_query); - // create unique index - key - l_query = dap_strdup_printf("create unique index if not exists 'idx_key_%s' ON '%s' (key)", a_table_name, - a_table_name); - if(dap_db_driver_sqlite_exec(s_db, (const char*) l_query, &l_error_message) != SQLITE_OK) { - log_it(L_ERROR, "Create unique index : %s\n", l_error_message); - dap_db_driver_sqlite_free(l_error_message); - DAP_DELETE(l_query); - return -1; - } - DAP_DELETE(l_query); - return 0; -} - -/** - * Prepare SQL query for database - * l_query [in] SQL-string with a query to database, example: - * SELECT * FROM data - * SELECT id, sd FROM data LIMIT 300 - * SELECT id, sd FROM data ORDER BY id ASC/DESC - * SELECT * FROM data WHERE time>449464766900000 and time<449464766910000" - * SELECT * FROM data WHERE hex(sd) LIKE '%370%' - * hex(x'0806') -> '08f6' или quote(sd) -> X'08f6' - * substr(x'031407301210361320690000',3,2) -> x'0730' - * - * CAST(substr(sd,5,2) as TEXT) - * additional function of line to number _uint8 - * byte_to_bin(x'ff') -> 255 - */ -static int dap_db_driver_sqlite_query(sqlite3 *db, char *query, sqlite3_stmt **l_res, char **l_error_message) -{ - const char *pzTail; // OUT: Pointer to unused portion of zSql - int l_rc = sqlite3_prepare_v2(db, query, -1, l_res, &pzTail); - if(l_rc != SQLITE_OK) - { - if(l_error_message) - { - const char *zErrMsg = sqlite3_errmsg(db); - if(zErrMsg) - *l_error_message = sqlite3_mprintf("SQL Query error: %s\n", zErrMsg); - } - return l_rc; - } - return l_rc; -} - -/** - * Clear memory after fetching a string - * - * return 0 if Ok, else -1 - */ -static void dap_db_driver_sqlite_row_free(SQLITE_ROW_VALUE *row) -{ - if(row) { - // delete the whole string - sqlite3_free(row->val); - // delete structure - sqlite3_free(row); - } -} - /** * Selects the next entry from the result of the query and returns an array * @@ -482,235 +460,6 @@ static int dap_db_driver_sqlite_fetch_array(sqlite3_stmt *l_res, SQLITE_ROW_VALU return l_rc; } -/** - * Clear memory when request processing is complete - */ -static bool dap_db_driver_sqlite_query_free(sqlite3_stmt *l_res) -{ - if(!l_res) - return false; - int rc = sqlite3_finalize(l_res); - if(rc != SQLITE_OK) - return false; - return true; -} - -/** - * Convert the array into a string to save to blob - */ -static char* dap_db_driver_get_string_from_blob(uint8_t *blob, int len) -{ - char *str_out; - int ret; - if(!blob) - return NULL; - str_out = (char*) sqlite3_malloc(len * 2 + 1); - ret = (int)dap_bin2hex(str_out, (const void*)blob, (size_t)len); - str_out[len * 2] = 0; - return str_out; - -} - -/** - * Cleaning the database from the deleted data - * - * return 0 if Ok, else error code >0 - */ -int dap_db_driver_sqlite_vacuum(sqlite3 *l_db) -{ - if(!s_db) - return -1; - int l_rc = dap_db_driver_sqlite_exec(l_db, "VACUUM", NULL); - return l_rc; -} - -/** - * Start a transaction - */ -int dap_db_driver_sqlite_start_transaction(void) -{ - pthread_rwlock_wrlock(&s_db_rwlock); - if(!s_db){ - pthread_rwlock_unlock(&s_db_rwlock); - return -666; - } - - if(SQLITE_OK == dap_db_driver_sqlite_exec(s_db, "BEGIN", NULL)){ - pthread_rwlock_unlock(&s_db_rwlock); - return 0; - }else{ - pthread_rwlock_unlock(&s_db_rwlock); - return -1; - } -} - -/** - * End of transaction - */ -int dap_db_driver_sqlite_end_transaction(void) -{ - pthread_rwlock_wrlock(&s_db_rwlock); - if(!s_db){ - pthread_rwlock_unlock(&s_db_rwlock); - return -666; - } - if(SQLITE_OK == dap_db_driver_sqlite_exec(s_db, "COMMIT", NULL)){ - pthread_rwlock_unlock(&s_db_rwlock); - return 0; - }else{ - pthread_rwlock_unlock(&s_db_rwlock); - return -1; - } -} - -char *dap_db_driver_sqlite_make_group_name(const char *a_table_name) -{ - char *l_table_name = dap_strdup(a_table_name); - ssize_t l_table_name_len = (ssize_t)dap_strlen(l_table_name); - const char *l_needle = "_"; - // replace '_' to '.' - while(1){ - char *l_str = dap_strstr_len(l_table_name, l_table_name_len, l_needle); - if(l_str) - *l_str = '.'; - else - break; - } - return l_table_name; -} - -char *dap_db_driver_sqlite_make_table_name(const char *a_group_name) -{ - char *l_group_name = dap_strdup(a_group_name); - ssize_t l_group_name_len = (ssize_t)dap_strlen(l_group_name); - const char *l_needle = "."; - // replace '.' to '_' - while(1){ - char *l_str = dap_strstr_len(l_group_name, l_group_name_len, l_needle); - if(l_str) - *l_str = '_'; - else - break; - } - return l_group_name; -} - -/** - * Apply data (write or delete) - * - */ -int dap_db_driver_sqlite_apply_store_obj(dap_store_obj_t *a_store_obj) -{ - if(!a_store_obj || !a_store_obj->group ) - return -1; - char *l_query = NULL; - char *l_error_message = NULL; - char *l_table_name = dap_db_driver_sqlite_make_table_name(a_store_obj->group); - if(a_store_obj->type == 'a') { - if(!a_store_obj->key || !a_store_obj->value || !a_store_obj->value_len) - return -1; - //dap_chain_hash_fast_t l_hash; - //dap_hash_fast(a_store_obj->value, a_store_obj->value_len, &l_hash); - - char *l_blob_hash = "";//dap_db_driver_get_string_from_blob((uint8_t*) &l_hash, sizeof(dap_chain_hash_fast_t)); - char *l_blob_value = dap_db_driver_get_string_from_blob(a_store_obj->value, (int)a_store_obj->value_len); - //add one record - l_query = sqlite3_mprintf("insert into '%s' values(NULL, '%s', x'%s', '%lld', x'%s')", - l_table_name, a_store_obj->key, l_blob_hash, a_store_obj->timestamp, l_blob_value); - //dap_db_driver_sqlite_free(l_blob_hash); - dap_db_driver_sqlite_free(l_blob_value); - } - else if(a_store_obj->type == 'd') { - //delete one record - if(a_store_obj->key) - l_query = sqlite3_mprintf("delete from '%s' where key = '%s'", - l_table_name, a_store_obj->key); - // remove all group - else { - l_query = sqlite3_mprintf("drop table if exists '%s'", l_table_name); - } - } - else { - log_it(L_ERROR, "Unknown store_obj type '0x%x'", a_store_obj->type); - return -1; - } - // execute request - pthread_rwlock_wrlock(&s_db_rwlock); - if(!s_db){ - pthread_rwlock_unlock(&s_db_rwlock); - return -666; - } - - int l_ret = dap_db_driver_sqlite_exec(s_db, l_query, &l_error_message); - if(l_ret == SQLITE_ERROR) { - dap_db_driver_sqlite_free(l_error_message); - l_error_message = NULL; - // create table - dap_db_driver_sqlite_create_group_table(l_table_name); - // repeat request - l_ret = dap_db_driver_sqlite_exec(s_db, l_query, &l_error_message); - - } - // entry with the same hash is already present - if(l_ret == SQLITE_CONSTRAINT) { - dap_db_driver_sqlite_free(l_error_message); - l_error_message = NULL; - //delete exist record - char *l_query_del = sqlite3_mprintf("delete from '%s' where key = '%s'", l_table_name, a_store_obj->key); - l_ret = dap_db_driver_sqlite_exec(s_db, l_query_del, &l_error_message); - dap_db_driver_sqlite_free(l_query_del); - if(l_ret != SQLITE_OK) { - log_it(L_INFO, "Entry with the same key is already present and can't delete, %s", l_error_message); - dap_db_driver_sqlite_free(l_error_message); - l_error_message = NULL; - } - // repeat request - l_ret = dap_db_driver_sqlite_exec(s_db, l_query, &l_error_message); - } - pthread_rwlock_unlock(&s_db_rwlock); - // missing database - if(l_ret != SQLITE_OK) { - log_it(L_ERROR, "sqlite apply error: %s", l_error_message); - dap_db_driver_sqlite_free(l_error_message); - l_ret = -1; - } - dap_db_driver_sqlite_free(l_query); - DAP_DELETE(l_table_name); - return l_ret; -} - -static void fill_one_item(const char *a_group, dap_store_obj_t *a_obj, SQLITE_ROW_VALUE *a_row) -{ - a_obj->group = dap_strdup(a_group); - - for(int l_iCol = 0; l_iCol < a_row->count; l_iCol++) { - SQLITE_VALUE *l_cur_val = a_row->val + l_iCol; - switch (l_iCol) { - case 0: - if(l_cur_val->type == SQLITE_INTEGER) - a_obj->id = (uint64_t)l_cur_val->val.val_int64; - break; // id - case 1: - if(l_cur_val->type == SQLITE_INTEGER) - a_obj->timestamp = l_cur_val->val.val_int64; - break; // ts - case 2: - if(l_cur_val->type == SQLITE_TEXT) - a_obj->key = dap_strdup(l_cur_val->val.val_str); - break; // key - case 3: - if(l_cur_val->type == SQLITE_BLOB) - { - a_obj->value_len = (size_t) l_cur_val->len; - a_obj->value = DAP_NEW_SIZE(uint8_t, a_obj->value_len); - memcpy(a_obj->value, l_cur_val->val.val_blob, a_obj->value_len); - } - break; // value - } - } - -} - /** * Read last items * @@ -835,83 +584,7 @@ dap_store_obj_t* dap_db_driver_sqlite_read_cond_store_obj(const char *a_group, u return l_obj; } -/** - * Read several items - * - * a_group - group name - * a_key - key name, may by NULL, it means reading the whole group - * a_count_out[in], how many items to read, 0 - no limits - * a_count_out[out], how many items was read - */ -dap_store_obj_t* dap_db_driver_sqlite_read_store_obj(const char *a_group, const char *a_key, size_t *a_count_out) -{ - if(!a_group || !s_db) - return NULL; - dap_store_obj_t *l_obj = NULL; - sqlite3_stmt *l_res; - char * l_table_name = dap_db_driver_sqlite_make_table_name(a_group); - // no limit - uint64_t l_count_out = 0; - if(a_count_out) - l_count_out = *a_count_out; - char *l_str_query; - if(a_key) { - if(l_count_out) - l_str_query = sqlite3_mprintf("SELECT id,ts,key,value FROM '%s' WHERE key='%s' ORDER BY id ASC LIMIT %d", - l_table_name, a_key, l_count_out); - else - l_str_query = sqlite3_mprintf("SELECT id,ts,key,value FROM '%s' WHERE key='%s' ORDER BY id ASC", - l_table_name, a_key); - } - else { - if(l_count_out) - l_str_query = sqlite3_mprintf("SELECT id,ts,key,value FROM '%s' ORDER BY id ASC LIMIT %d", - l_table_name, l_count_out); - else - l_str_query = sqlite3_mprintf("SELECT id,ts,key,value FROM '%s' ORDER BY id ASC", l_table_name); - } - pthread_rwlock_wrlock(&s_db_rwlock); - int l_ret = dap_db_driver_sqlite_query(s_db, l_str_query, &l_res, NULL); - pthread_rwlock_unlock(&s_db_rwlock); - - sqlite3_free(l_str_query); - DAP_DEL_Z(l_table_name); - if(l_ret != SQLITE_OK) { - //log_it(L_ERROR, "read l_ret=%d, %s\n", sqlite3_errcode(s_db), sqlite3_errmsg(s_db)); - return NULL; - } - //int b = qlite3_column_count(s_db); - SQLITE_ROW_VALUE *l_row = NULL; - l_count_out = 0; - uint64_t l_count_sized = 0; - do { - l_ret = dap_db_driver_sqlite_fetch_array(l_res, &l_row); - if(l_ret != SQLITE_ROW && l_ret != SQLITE_DONE) - { - // log_it(L_ERROR, "read l_ret=%d, %s\n", sqlite3_errcode(s_db), sqlite3_errmsg(s_db)); - } - if(l_ret == SQLITE_ROW && l_row) { - // realloc memory - if(l_count_out >= l_count_sized) { - l_count_sized += 10; - l_obj = DAP_REALLOC(l_obj, sizeof(dap_store_obj_t) * l_count_sized); - memset(l_obj + l_count_out, 0, sizeof(dap_store_obj_t) * (l_count_sized - l_count_out)); - } - // fill currrent item - dap_store_obj_t *l_obj_cur = l_obj + l_count_out; - fill_one_item(a_group, l_obj_cur, l_row); - l_count_out++; - } - dap_db_driver_sqlite_row_free(l_row); - } while(l_row); - - dap_db_driver_sqlite_query_free(l_res); - - if(a_count_out) - *a_count_out = l_count_out; - return l_obj; -} dap_list_t* dap_db_driver_sqlite_get_groups_by_mask(const char *a_group_mask) { diff --git a/modules/global-db/include/dap_chain_global_db.h b/modules/global-db/include/dap_chain_global_db.h index 0cefa57fac338ecbf91aceb9e88f51814dd23091..98129b6a05c45f107c2bd08fdcc747527afdc470 100644 --- a/modules/global-db/include/dap_chain_global_db.h +++ b/modules/global-db/include/dap_chain_global_db.h @@ -62,8 +62,8 @@ void dap_global_db_obj_track_history(void* a_store_data); */ void* dap_chain_global_db_obj_get(const char *a_key, const char *a_group); dap_store_obj_t* dap_chain_global_db_obj_gr_get(const char *a_key, size_t *a_data_len_out, const char *a_group); -uint8_t * dap_chain_global_db_gr_get(const char *a_key, size_t *a_data_out, const char *a_group); -uint8_t * dap_chain_global_db_get(const char *a_key, size_t *a_data_out); +uint8_t * dap_chain_global_db_gr_get(const char *a_key, size_t *a_data_len_out, const char *a_group); +uint8_t * dap_chain_global_db_get(const char *a_key, size_t *a_data_len_out); /** * Set one entry to base diff --git a/modules/global-db/include/dap_chain_global_db_driver_pgsql.h b/modules/global-db/include/dap_chain_global_db_driver_pgsql.h index 72fc70db648d45ebea2a94e357b8127589ff25fb..59f25eae4e300af6aec55ea7719bc25820812fb3 100644 --- a/modules/global-db/include/dap_chain_global_db_driver_pgsql.h +++ b/modules/global-db/include/dap_chain_global_db_driver_pgsql.h @@ -5,4 +5,12 @@ #include "/usr/include/postgresql/libpq-fe.h" #endif +#define DAP_PGSQL_DBHASHNAME_LEN 8 +#define DAP_PGSQL_POOL_COUNT 16 + int dap_db_driver_pgsql_init(const char *a_filename_dir, dap_db_driver_callbacks_t *a_drv_callback); +int dap_db_driver_pgsql_deinit(); +int dap_db_driver_pgsql_start_transaction(void); +int dap_db_driver_pgsql_end_transaction(void); +int dap_db_driver_pgsql_apply_store_obj(dap_store_obj_t *a_store_obj); +dap_store_obj_t *dap_db_driver_pgsql_read_store_obj(const char *a_group, const char *a_key, size_t *a_count_out);