Skip to content
Snippets Groups Projects
Commit 11dde2b4 authored by Roman Khlopkov's avatar Roman Khlopkov 🔜
Browse files

[+] PostgreSQL base funcs

parent 15c85621
No related branches found
No related tags found
1 merge request!419Feature 5220
...@@ -123,6 +123,7 @@ typedef uint8_t byte_t; ...@@ -123,6 +123,7 @@ typedef uint8_t byte_t;
#define DAP_REALLOC(a, b) rprealloc(a,b) #define DAP_REALLOC(a, b) rprealloc(a,b)
#define DAP_DELETE(a) rpfree(a) #define DAP_DELETE(a) rpfree(a)
#define DAP_DUP(a) memcpy(rpmalloc(sizeof(*a)), a, sizeof(*a)) #define DAP_DUP(a) memcpy(rpmalloc(sizeof(*a)), a, sizeof(*a))
#define DAP_DUP_SIZE(a, s) memcpy(rpmalloc(s), a, s)
#else #else
#define DAP_MALLOC(a) malloc(a) #define DAP_MALLOC(a) malloc(a)
#define DAP_FREE(a) free(a) #define DAP_FREE(a) free(a)
...@@ -139,6 +140,7 @@ typedef uint8_t byte_t; ...@@ -139,6 +140,7 @@ typedef uint8_t byte_t;
#define DAP_REALLOC(a, b) realloc(a,b) #define DAP_REALLOC(a, b) realloc(a,b)
#define DAP_DELETE(a) free(a) #define DAP_DELETE(a) free(a)
#define DAP_DUP(a) memcpy(malloc(sizeof(*a)), a, sizeof(*a)) #define DAP_DUP(a) memcpy(malloc(sizeof(*a)), a, sizeof(*a))
#define DAP_DUP_SIZE(a, s) memcpy(malloc(s), a, s)
#endif #endif
#define DAP_DEL_Z(a) if(a) { DAP_DELETE(a); a=NULL;} #define DAP_DEL_Z(a) if(a) { DAP_DELETE(a); a=NULL;}
......
...@@ -277,9 +277,9 @@ uint8_t * dap_chain_global_db_gr_get(const char *a_key, size_t *a_data_len_out, ...@@ -277,9 +277,9 @@ uint8_t * dap_chain_global_db_gr_get(const char *a_key, size_t *a_data_len_out,
return l_ret_value; return l_ret_value;
} }
uint8_t * dap_chain_global_db_get(const char *a_key, size_t *a_data_out) uint8_t * dap_chain_global_db_get(const char *a_key, size_t *a_data_len_out)
{ {
return dap_chain_global_db_gr_get(a_key, a_data_out, GROUP_LOCAL_GENERAL); return dap_chain_global_db_gr_get(a_key, a_data_len_out, GROUP_LOCAL_GENERAL);
} }
......
/* /*
* Authors: * Authors:
* Dmitriy A. Gearasimov <gerasimov.dmitriy@demlabs.net> * Roman Khlopkov <roman.khlopkov@demlabs.net>
* Alexander Lysikov <alexander.lysikov@demlabs.net>
* DeM Labs Inc. https://demlabs.net * DeM Labs Inc. https://demlabs.net
* CellFrame https://cellframe.net * CellFrame https://cellframe.net
* Sources https://gitlab.demlabs.net/cellframe * Sources https://gitlab.demlabs.net/cellframe
* Copyright (c) 2017-2019 * Copyright (c) 2017-2021
* All rights reserved. * All rights reserved.
This file is part of CellFrame SDK the open source project This file is part of CellFrame SDK the open source project
...@@ -28,6 +27,7 @@ ...@@ -28,6 +27,7 @@
#include <string.h> #include <string.h>
#include <pthread.h> #include <pthread.h>
#include <errno.h> #include <errno.h>
#include <pwd.h>
#ifdef DAP_OS_UNIX #ifdef DAP_OS_UNIX
#include <unistd.h> #include <unistd.h>
...@@ -41,9 +41,40 @@ ...@@ -41,9 +41,40 @@
#define LOG_TAG "db_pgsql" #define LOG_TAG "db_pgsql"
static PGconn *s_conn_pool; struct dap_pgsql_conn_pool_item {
PGconn *conn;
int busy;
};
static struct dap_pgsql_conn_pool_item s_conn_pool[DAP_PGSQL_POOL_COUNT];
static pthread_rwlock_t s_db_rwlock = PTHREAD_RWLOCK_INITIALIZER; static pthread_rwlock_t s_db_rwlock = PTHREAD_RWLOCK_INITIALIZER;
static PGconn *s_pgsql_get_connection(void)
{
PGconn *l_ret = NULL;
pthread_rwlock_rdlock(&s_db_rwlock);
for (int i = 0; i < DAP_PGSQL_POOL_COUNT; i++) {
if (!s_conn_pool[i].busy) {
l_ret = s_conn_pool[i].conn;
s_conn_pool[i].busy = 1;
break;
}
}
pthread_rwlock_unlock(&s_db_rwlock);
return l_ret;
}
static void s_pgsql_free_connection(PGconn *a_conn)
{
pthread_rwlock_rdlock(&s_db_rwlock);
for (int i = 0; i < DAP_PGSQL_POOL_COUNT; i++) {
if (s_conn_pool[i].conn == a_conn) {
s_conn_pool[i].busy = 0;
}
}
pthread_rwlock_unlock(&s_db_rwlock);
}
/** /**
* SQLite library initialization, no thread safe * SQLite library initialization, no thread safe
* *
...@@ -52,244 +83,296 @@ static pthread_rwlock_t s_db_rwlock = PTHREAD_RWLOCK_INITIALIZER; ...@@ -52,244 +83,296 @@ static pthread_rwlock_t s_db_rwlock = PTHREAD_RWLOCK_INITIALIZER;
int dap_db_driver_pgsql_init(const char *a_filename_dir, dap_db_driver_callbacks_t *a_drv_callback) int dap_db_driver_pgsql_init(const char *a_filename_dir, dap_db_driver_callbacks_t *a_drv_callback)
{ {
// Check paths and create them if nessesary // Check paths and create them if nessesary
if(!dap_dir_test(a_filename_dir)){ if (!dap_dir_test(a_filename_dir)) {
log_it(L_NOTICE, "No directory %s, trying to create...", a_filename_dir); log_it(L_NOTICE, "No directory %s, trying to create...", a_filename_dir);
int l_mkdir_ret = dap_mkdir_with_parents(a_filename_dir); int l_mkdir_ret = dap_mkdir_with_parents(a_filename_dir);
int l_errno = errno; int l_errno = errno;
if(!dap_dir_test(a_filename_dir)){ if (!dap_dir_test(a_filename_dir)) {
char l_errbuf[255]; char l_errbuf[255];
l_errbuf[0] = '\0'; l_errbuf[0] = '\0';
strerror_r(l_errno,l_errbuf,sizeof(l_errbuf)); strerror_r(l_errno, l_errbuf, sizeof(l_errbuf));
log_it(L_ERROR, "Can't create directory, error code %d, error string \"%s\"", l_mkdir_ret, l_errbuf); log_it(L_ERROR, "Can't create directory, error code %d, error string \"%s\"", l_mkdir_ret, l_errbuf);
return -1; return -1;
}else } else
log_it(L_NOTICE,"Directory created"); log_it(L_NOTICE,"Directory created");
} }
dap_hash_fast_t l_dir_hash; dap_hash_fast_t l_dir_hash;
dap_chain_hash_fast_from_str(a_filename_dir, &l_dir_hash); dap_hash_fast(a_filename_dir, strlen(a_filename_dir), &l_dir_hash);
char *l_db_name = dap_hash_fast_to_str_new(&l_dir_hash); char l_db_name[DAP_PGSQL_DBHASHNAME_LEN + 1];
dap_htoa64(l_db_name, l_dir_hash.raw, DAP_PGSQL_DBHASHNAME_LEN);
l_db_name[DAP_PGSQL_DBHASHNAME_LEN] = '\0';
// Open PostgreSQL database, create if nessesary // Open PostgreSQL database, create if nessesary
const char *l_base_conn_str = "dbname = postgres"; const char *l_base_conn_str = "dbname = postgres";
char *l_error_message = NULL;
PGconn *l_base_conn = PQconnectdb(l_base_conn_str); PGconn *l_base_conn = PQconnectdb(l_base_conn_str);
if (PQstatus(l_base_conn) != CONNECTION_OK) { if (PQstatus(l_base_conn) != CONNECTION_OK) {
l_error_message = PQerrorMessage(l_base_conn); log_it(L_ERROR, "Can't init PostgreSQL database: \"%s\"", PQerrorMessage(l_base_conn));
log_it(L_ERROR, "Can't init PostgreSQL database: \"%s\"", l_error_message); PQfinish(l_base_conn);
DAP_DELETE(l_error_message);
return -2; return -2;
} }
char *l_query_str = dap_strdup_printf("SELECT EXISTS (SELECT * FROM pg_database WHERE datname = 'ololo')");//, l_db_name); char *l_query_str = dap_strdup_printf("SELECT EXISTS (SELECT * FROM pg_database WHERE datname = '%s')", l_db_name);
PGresult *l_res = PQexec(l_base_conn, l_query_str); PGresult *l_res = PQexec(l_base_conn, l_query_str);
DAP_DELETE(l_query_str);
if (PQresultStatus(l_res) != PGRES_TUPLES_OK) { if (PQresultStatus(l_res) != PGRES_TUPLES_OK) {
log_it(L_ERROR, "Can't read PostgreSQL database: \"%s\"", PQresultErrorMessage(l_res)); log_it(L_ERROR, "Can't read PostgreSQL database: \"%s\"", PQresultErrorMessage(l_res));
PQclear(l_res);
return -3; return -3;
} }
if (*PQgetvalue(l_res, 0, 0) == 'f') { //false, database not exists, than create it
PQclear(l_res);
l_query_str = dap_strdup_printf("DROP TABLESPACE IF EXISTS \"%s\"", l_db_name);
l_res = PQexec(l_base_conn, l_query_str);
DAP_DELETE(l_query_str);
if (PQresultStatus(l_res) != PGRES_COMMAND_OK) {
log_it(L_ERROR, "Drop tablespace failed with message: \"%s\"", PQresultErrorMessage(l_res));
PQclear(l_res);
return -4;
}
PQclear(l_res);
dap_mkdir_with_parents(a_filename_dir);
chown(a_filename_dir, getpwnam("postgres")->pw_uid, -1);
l_query_str = dap_strdup_printf("CREATE TABLESPACE \"%s\" LOCATION '%s'", l_db_name, a_filename_dir);
l_res = PQexec(l_base_conn, l_query_str);
DAP_DELETE(l_query_str);
if (PQresultStatus(l_res) != PGRES_COMMAND_OK) {
log_it(L_ERROR, "Create tablespace failed with message: \"%s\"", PQresultErrorMessage(l_res));
PQclear(l_res);
return -5;
}
PQclear(l_res);
l_query_str = dap_strdup_printf("CREATE DATABASE \"%s\" WITH TABLESPACE \"%s\"", l_db_name, l_db_name);
l_res = PQexec(l_base_conn, l_query_str);
DAP_DELETE(l_query_str);
if (PQresultStatus(l_res) != PGRES_COMMAND_OK) {
log_it(L_ERROR, "Create database failed with message: \"%s\"", PQresultErrorMessage(l_res));
PQclear(l_res);
return -6;
}
}
PQclear(l_res);
PQfinish(l_base_conn);
// Create connection pool for the DAP database
char *l_conn_str = dap_strdup_printf("dbname = %s", l_db_name);
for (int i = 0; i < DAP_PGSQL_POOL_COUNT; i++) {
s_conn_pool[i].conn = PQconnectdb(l_conn_str);
s_conn_pool[i].busy = 0;
if (PQstatus(s_conn_pool[i].conn) != CONNECTION_OK) {
log_it(L_ERROR, "Can't connect PostgreSQL database: \"%s\"", PQerrorMessage(s_conn_pool[i].conn));
DAP_DELETE(l_conn_str);
for (int j = 0; j <= i; j++)
PQfinish(s_conn_pool[j].conn);
return -7;
}
}
DAP_DELETE(l_conn_str);
pthread_rwlock_init(&s_db_rwlock, 0);
a_drv_callback->transaction_start = dap_db_driver_pgsql_start_transaction;
a_drv_callback->transaction_end = dap_db_driver_pgsql_end_transaction;
a_drv_callback->apply_store_obj = dap_db_driver_pgsql_apply_store_obj;
a_drv_callback->read_store_obj = dap_db_driver_pgsql_read_store_obj;
//a_drv_callback->read_cond_store_obj = dap_db_driver_sqlite_read_cond_store_obj;
//a_drv_callback->read_last_store_obj = dap_db_driver_sqlite_read_last_store_obj;
//a_drv_callback->get_groups_by_mask = dap_db_driver_sqlite_get_groups_by_mask;
//a_drv_callback->read_count_store = dap_db_driver_sqlite_read_count_store;
//a_drv_callback->is_obj = dap_db_driver_sqlite_is_obj;
//a_drv_callback->deinit = dap_db_driver_sqlite_deinit;
//a_drv_callback->flush = dap_db_driver_sqlite_flush;
return 0; return 0;
} }
#if 0
a_drv_callback->apply_store_obj = dap_db_driver_sqlite_apply_store_obj;
a_drv_callback->read_store_obj = dap_db_driver_sqlite_read_store_obj; int dap_db_driver_pgsql_deinit(void)
a_drv_callback->read_cond_store_obj = dap_db_driver_sqlite_read_cond_store_obj;
a_drv_callback->read_last_store_obj = dap_db_driver_sqlite_read_last_store_obj;
a_drv_callback->transaction_start = dap_db_driver_sqlite_start_transaction;
a_drv_callback->transaction_end = dap_db_driver_sqlite_end_transaction;
a_drv_callback->get_groups_by_mask = dap_db_driver_sqlite_get_groups_by_mask;
a_drv_callback->read_count_store = dap_db_driver_sqlite_read_count_store;
a_drv_callback->is_obj = dap_db_driver_sqlite_is_obj;
a_drv_callback->deinit = dap_db_driver_sqlite_deinit;
a_drv_callback->flush = dap_db_driver_sqlite_flush;
/* Set always-secure search path, so malicious users can't take
control. */
res = PQexec(conn,
"SELECT pg_catalog.set_config('search_path', '',
false)");
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
fprintf(stderr, "SET failed: %s", PQerrorMessage(conn));
PQclear(res);
exit_nicely(conn);
}
/*
* Should PQclear PGresult whenever it is no longer needed to
avoid memory
* leaks
*/
PQclear(res);
/*
* Our test case here involves using a cursor, for which we
must be inside
* a transaction block. We could do the whole thing with a
single
* PQexec() of "select * from pg_database", but that's too
trivial to make
* a good example.
*/
/* Start a transaction block */
res = PQexec(conn, "BEGIN");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
fprintf(stderr, "BEGIN command failed: %s",
PQerrorMessage(conn));
PQclear(res);
exit_nicely(conn);
}
893
libpq — C Library
PQclear(res);
/*
* Fetch rows from pg_database, the system catalog of databases
*/
res = PQexec(conn, "DECLARE myportal CURSOR FOR select * from
pg_database");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
fprintf(stderr, "DECLARE CURSOR failed: %s",
PQerrorMessage(conn));
PQclear(res);
exit_nicely(conn);
}
PQclear(res);
res = PQexec(conn, "FETCH ALL in myportal");
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
fprintf(stderr, "FETCH ALL failed: %s",
PQerrorMessage(conn));
PQclear(res);
exit_nicely(conn);
}
/* first, print out the attribute names */
nFields = PQnfields(res);
for (i = 0; i < nFields; i++)
printf("%-15s", PQfname(res, i));
printf("\n\n");
/* next, print out the rows */
for (i = 0; i < PQntuples(res); i++)
{ {
for (j = 0; j < nFields; j++) pthread_rwlock_wrlock(&s_db_rwlock);
printf("%-15s", PQgetvalue(res, i, j)); for (int j = 0; j <= DAP_PGSQL_POOL_COUNT; j++)
printf("\n"); PQfinish(s_conn_pool[j].conn);
} pthread_rwlock_unlock(&s_db_rwlock);
PQclear(res); pthread_rwlock_destroy(&s_db_rwlock);
/* close the portal ... we don't bother to check for errors ... return 0;
*/
res = PQexec(conn, "CLOSE myportal");
PQclear(res);
/* end the transaction */
res = PQexec(conn, "END");
PQclear(res);
/* close the connection to the database and cleanup */
PQfinish(conn);
return 0;
} }
/**
int dap_db_driver_sqlite_deinit(void) * Start a transaction
*/
int dap_db_driver_pgsql_start_transaction(void)
{ {
pthread_rwlock_wrlock(&s_db_rwlock); // TODO make a transaction with a single connection from pool
if(!s_db){ //PGresult *l_res = PQexec(l_conn, "BEGIN");
pthread_rwlock_unlock(&s_db_rwlock); return 0;
return -666;
}
dap_db_driver_sqlite_close(s_db);
pthread_rwlock_unlock(&s_db_rwlock);
s_db = NULL;
return sqlite3_shutdown();
} }
// additional function for sqlite to convert byte to number /**
static void byte_to_bin(sqlite3_context *l_context, int a_argc, sqlite3_value **a_argv) * End of transaction
*/
int dap_db_driver_pgsql_end_transaction(void)
{ {
const unsigned char *l_text; // TODO make a transaction with a single connection from pool
if(a_argc != 1) //PGresult *l_res = PQexec(l_conn, "COMMIT");
sqlite3_result_null(l_context); return 0;
l_text = (const unsigned char *) sqlite3_value_blob(a_argv[0]);
if(l_text && l_text[0])
{
int l_result = (int) l_text[0];
sqlite3_result_int(l_context, l_result);
return;
}
sqlite3_result_null(l_context);
} }
/** /**
* Open SQLite database * Create table
* a_filename_utf8 - database file name
* a_flags - database access flags (SQLITE_OPEN_READONLY, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE)
* a_error_message[out] - Error messages (the memory requires deletion via sqlite_free ())
* *
* return: database identifier, NULL when an error occurs. * return 0 if Ok, else error code
*/ */
sqlite3* dap_db_driver_sqlite_open(const char *a_filename_utf8, int a_flags, char **a_error_message) static int s_pgsql_create_group_table(const char *a_table_name)
{ {
sqlite3 *l_db = NULL; if (!a_table_name)
return -1;
int l_rc = sqlite3_open_v2(a_filename_utf8, &l_db, a_flags | SQLITE_OPEN_NOMUTEX, NULL); PGconn *l_conn = s_pgsql_get_connection();
// if unable to open the database file if (!l_conn) {
if(l_rc == SQLITE_CANTOPEN) { log_it(L_ERROR, "Can't pick PostgreSQL connection from pool");
log_it(L_WARNING,"No database on path %s, creating one from scratch", a_filename_utf8); return -2;
if(l_db)
sqlite3_close(l_db);
// try to create database
l_rc = sqlite3_open_v2(a_filename_utf8, &l_db, a_flags | SQLITE_OPEN_NOMUTEX| SQLITE_OPEN_CREATE, NULL);
}
if(l_rc != SQLITE_OK) {
log_it(L_CRITICAL,"Can't open database on path %s (code %d: \"%s\" )", a_filename_utf8, l_rc, sqlite3_errstr(l_rc));
if(a_error_message)
*a_error_message = sqlite3_mprintf("Can't open database: %s\n", sqlite3_errmsg(l_db));
sqlite3_close(l_db);
return NULL;
} }
// added user functions int l_ret = 0;
sqlite3_create_function(l_db, "byte_to_bin", 1, SQLITE_UTF8, NULL, &byte_to_bin, NULL, NULL); char *l_query_str = dap_strdup_printf("CREATE TABLE \"%s\""
return l_db; "(obj_id SERIAL PRIMARY KEY, obj_ts BIGINT, obj_key TEXT UNIQUE, obj_val BYTEA)",
a_table_name);
PGresult *l_res = PQexec(l_conn, l_query_str);
DAP_DELETE(l_query_str);
if (PQresultStatus(l_res) != PGRES_COMMAND_OK) {
log_it(L_ERROR, "Create table failed with message: \"%s\"", PQresultErrorMessage(l_res));
l_ret = -3;
}
PQclear(l_res);
s_pgsql_free_connection(l_conn);
return l_ret;
} }
/** /**
* Close the database * Apply data (write or delete)
*
*/ */
void dap_db_driver_sqlite_close(sqlite3 *l_db) int dap_db_driver_pgsql_apply_store_obj(dap_store_obj_t *a_store_obj)
{ {
if(l_db) if (!a_store_obj || !a_store_obj->group)
sqlite3_close(l_db); return -1;
char *l_query_str = NULL;
int l_ret = 0;
PGresult *l_res = NULL;
PGconn *l_conn = s_pgsql_get_connection();
if (!l_conn) {
log_it(L_ERROR, "Can't pick PostgreSQL connection from pool");
return -2;
}
if (a_store_obj->type == 'a') {
const char *l_param_vals[2];
time_t l_ts_to_store = htobe64(a_store_obj->timestamp);
l_param_vals[0] = (const char *)&l_ts_to_store;
l_param_vals[1] = (const char *)a_store_obj->value;
int l_param_lens[2] = {sizeof(time_t), a_store_obj->value_len};
int l_param_formats[2] = {1, 1};
l_query_str = dap_strdup_printf("INSERT INTO \"%s\" (obj_ts, obj_key, obj_val) VALUES ($1, '%s', $2) "
"ON CONFLICT (obj_key) DO UPDATE SET "
"obj_ts = EXCLUDED.obj_ts, obj_val = EXCLUDED.obj_val;",
a_store_obj->group, a_store_obj->key);
// execute add request
l_res = PQexecParams(l_conn, l_query_str, 2, NULL, l_param_vals, l_param_lens, l_param_formats, 0);
if (PQresultStatus(l_res) != PGRES_COMMAND_OK) {
if (a_store_obj->type == 'a' && s_pgsql_create_group_table(a_store_obj->group) == 0) {
PQclear(l_res);
l_res = PQexecParams(l_conn, l_query_str, 2, NULL, l_param_vals, l_param_lens, l_param_formats, 0);
}
if (PQresultStatus(l_res) != PGRES_COMMAND_OK) {
log_it(L_ERROR, "Add object failed with message: \"%s\"", PQresultErrorMessage(l_res));
l_ret = -3;
}
}
} else if (a_store_obj->type == 'd') {
// delete one record
if (a_store_obj->key)
l_query_str = dap_strdup_printf("DELETE FROM \"%s\" WHERE key = \"%s\"",
a_store_obj->group, a_store_obj->key);
// remove all group
else
l_query_str = dap_strdup_printf("DROP TABLE \"%s\"", a_store_obj->group);
// execute delete request
l_res = PQexec(l_conn, l_query_str);
if (PQresultStatus(l_res) != PGRES_COMMAND_OK) {
log_it(L_ERROR, "Delete object failed with message: \"%s\"", PQresultErrorMessage(l_res));
l_ret = -4;
}
}
else {
log_it(L_ERROR, "Unknown store_obj type '0x%x'", a_store_obj->type);
s_pgsql_free_connection(l_conn);
return -5;
}
DAP_DELETE(l_query_str);
PQclear(l_res);
s_pgsql_free_connection(l_conn);
return l_ret;
} }
/*
* Clear the memory allocated via sqlite3_mprintf() static void s_pgsql_fill_object(const char *a_group, dap_store_obj_t *a_obj, PGresult *a_res, int a_row)
*/
void dap_db_driver_sqlite_free(char *memory)
{ {
if(memory) a_obj->group = dap_strdup(a_group);
sqlite3_free(memory);
for (int i = 0; i < PQnfields(a_res); i++) {
if (i == PQfnumber(a_res, "obj_id")) {
a_obj->id = be32toh(*(uint32_t *)PQgetvalue(a_res, a_row, i));
} else if (i == PQfnumber(a_res, "obj_ts")) {
a_obj->timestamp = be64toh(*(time_t *)PQgetvalue(a_res, a_row, i));
} else if ((i == PQfnumber(a_res, "obj_key"))) {
a_obj->key = dap_strdup(PQgetvalue(a_res, a_row, i));
} else if ((i == PQfnumber(a_res, "obj_val"))) {
a_obj->value_len = PQgetlength(a_res, a_row, i);
a_obj->value = DAP_DUP_SIZE(PQgetvalue(a_res, a_row, i), a_obj->value_len);
}
}
} }
/** /**
* Set specific pragma statements * Read several items
* www.sqlite.org/pragma.html
* *
*PRAGMA page_size = bytes; // page size DB; it is reasonable to make it equal to the size of the disk cluster 4096 * a_group - group name
*PRAGMA cache_size = -kibibytes; // by default it is equal to 2000 pages of database * a_key - key name, may by NULL, it means reading the whole group
*PRAGMA encoding = "UTF-8"; // default = UTF-8 * a_count_out[in], how many items to read, 0 - no limits
*PRAGMA foreign_keys = 1; // default = 0 * a_count_out[out], how many items was read
*PRAGMA journal_mode = DELETE | TRUNCATE | PERSIST | MEMORY | WAL | OFF;
*PRAGMA synchronous = 0 | OFF | 1 | NORMAL | 2 | FULL;
*/ */
bool dap_db_driver_sqlite_set_pragma(sqlite3 *a_db, char *a_param, char *a_mode) dap_store_obj_t *dap_db_driver_pgsql_read_store_obj(const char *a_group, const char *a_key, size_t *a_count_out)
{ {
if(!a_param || !a_mode) if (!a_group)
{ return NULL;
printf("[sqlite_set_pragma] err!!! no param or mode\n"); PGconn *l_conn = s_pgsql_get_connection();
return false; if (!l_conn) {
log_it(L_ERROR, "Can't pick PostgreSQL connection from pool");
return NULL;
} }
char *l_str_query = sqlite3_mprintf("PRAGMA %s = %s", a_param, a_mode); char *l_query_str;
int l_rc = dap_db_driver_sqlite_exec(a_db, l_str_query, NULL); // default synchronous=FULL if (a_key) {
sqlite3_free(l_str_query); l_query_str = dap_strdup_printf("SELECT * FROM \"%s\" WHERE obj_key='%s'", a_group, a_key);
if(l_rc == SQLITE_OK) } else {
return true; if (a_count_out)
return false; l_query_str = dap_strdup_printf("SELECT * FROM \"%s\" ORDER BY obj_id ASC LIMIT %d", a_group, *a_count_out);
else
l_query_str = dap_strdup_printf("SELECT * FROM \"%s\" ORDER BY obj_id ASC", a_group);
}
PGresult *l_res = PQexecParams(l_conn, l_query_str, 0, NULL, NULL, NULL, NULL, 1);
DAP_DELETE(l_query_str);
if (PQresultStatus(l_res) != PGRES_TUPLES_OK) {
log_it(L_ERROR, "Read objects failed with message: \"%s\"", PQresultErrorMessage(l_res));
PQclear(l_res);
return NULL;
}
// parse reply
size_t l_count = PQntuples(l_res);
dap_store_obj_t *l_obj = DAP_NEW_Z_SIZE(dap_store_obj_t, sizeof(dap_store_obj_t) * l_count);
for (int i = 0; i < l_count; i++) {
// fill currrent item
dap_store_obj_t *l_obj_cur = l_obj + i;
s_pgsql_fill_object(a_group, l_obj_cur, l_res, i);
}
PQclear(l_res);
if (a_count_out)
*a_count_out = l_count;
return l_obj;
} }
#if 0
int dap_db_driver_sqlite_flush() int dap_db_driver_sqlite_flush()
{ {
log_it(L_DEBUG, "Start flush sqlite data base."); log_it(L_DEBUG, "Start flush sqlite data base.");
...@@ -321,111 +404,6 @@ int dap_db_driver_sqlite_flush() ...@@ -321,111 +404,6 @@ int dap_db_driver_sqlite_flush()
return 0; return 0;
} }
/**
* Execute SQL query to database that does not return data
*
* return 0 if Ok, else error code >0
*/
static int dap_db_driver_sqlite_exec(sqlite3 *l_db, const char *l_query, char **l_error_message)
{
char *l_zErrMsg = NULL;
int l_rc = sqlite3_exec(l_db, l_query, NULL, 0, &l_zErrMsg);
//printf("%s\n",l_query);
if(l_rc != SQLITE_OK)
{
if(l_error_message && l_zErrMsg)
*l_error_message = sqlite3_mprintf("SQL error: %s", l_zErrMsg);
if(l_zErrMsg)
sqlite3_free(l_zErrMsg);
return l_rc;
}
if(l_zErrMsg)
sqlite3_free(l_zErrMsg);
return l_rc;
}
/**
* Create table
*
* return 0 if Ok, else error code
*/
static int dap_db_driver_sqlite_create_group_table(const char *a_table_name)
{
char *l_error_message = NULL;
if(!s_db || !a_table_name)
return -1;
char *l_query =
dap_strdup_printf(
"create table if not exists '%s'(id INTEGER NOT NULL PRIMARY KEY, key TEXT KEY, hash BLOB, ts INTEGER KEY, value BLOB)",
a_table_name);
if(dap_db_driver_sqlite_exec(s_db, (const char*) l_query, &l_error_message) != SQLITE_OK)
{
log_it(L_ERROR, "Creatу_table : %s\n", l_error_message);
dap_db_driver_sqlite_free(l_error_message);
DAP_DELETE(l_query);
return -1;
}
DAP_DELETE(l_query);
// create unique index - key
l_query = dap_strdup_printf("create unique index if not exists 'idx_key_%s' ON '%s' (key)", a_table_name,
a_table_name);
if(dap_db_driver_sqlite_exec(s_db, (const char*) l_query, &l_error_message) != SQLITE_OK) {
log_it(L_ERROR, "Create unique index : %s\n", l_error_message);
dap_db_driver_sqlite_free(l_error_message);
DAP_DELETE(l_query);
return -1;
}
DAP_DELETE(l_query);
return 0;
}
/**
* Prepare SQL query for database
* l_query [in] SQL-string with a query to database, example:
* SELECT * FROM data
* SELECT id, sd FROM data LIMIT 300
* SELECT id, sd FROM data ORDER BY id ASC/DESC
* SELECT * FROM data WHERE time>449464766900000 and time<449464766910000"
* SELECT * FROM data WHERE hex(sd) LIKE '%370%'
* hex(x'0806') -> '08f6' или quote(sd) -> X'08f6'
* substr(x'031407301210361320690000',3,2) -> x'0730'
*
* CAST(substr(sd,5,2) as TEXT)
* additional function of line to number _uint8
* byte_to_bin(x'ff') -> 255
*/
static int dap_db_driver_sqlite_query(sqlite3 *db, char *query, sqlite3_stmt **l_res, char **l_error_message)
{
const char *pzTail; // OUT: Pointer to unused portion of zSql
int l_rc = sqlite3_prepare_v2(db, query, -1, l_res, &pzTail);
if(l_rc != SQLITE_OK)
{
if(l_error_message)
{
const char *zErrMsg = sqlite3_errmsg(db);
if(zErrMsg)
*l_error_message = sqlite3_mprintf("SQL Query error: %s\n", zErrMsg);
}
return l_rc;
}
return l_rc;
}
/**
* Clear memory after fetching a string
*
* return 0 if Ok, else -1
*/
static void dap_db_driver_sqlite_row_free(SQLITE_ROW_VALUE *row)
{
if(row) {
// delete the whole string
sqlite3_free(row->val);
// delete structure
sqlite3_free(row);
}
}
/** /**
* Selects the next entry from the result of the query and returns an array * Selects the next entry from the result of the query and returns an array
* *
...@@ -482,235 +460,6 @@ static int dap_db_driver_sqlite_fetch_array(sqlite3_stmt *l_res, SQLITE_ROW_VALU ...@@ -482,235 +460,6 @@ static int dap_db_driver_sqlite_fetch_array(sqlite3_stmt *l_res, SQLITE_ROW_VALU
return l_rc; return l_rc;
} }
/**
* Clear memory when request processing is complete
*/
static bool dap_db_driver_sqlite_query_free(sqlite3_stmt *l_res)
{
if(!l_res)
return false;
int rc = sqlite3_finalize(l_res);
if(rc != SQLITE_OK)
return false;
return true;
}
/**
* Convert the array into a string to save to blob
*/
static char* dap_db_driver_get_string_from_blob(uint8_t *blob, int len)
{
char *str_out;
int ret;
if(!blob)
return NULL;
str_out = (char*) sqlite3_malloc(len * 2 + 1);
ret = (int)dap_bin2hex(str_out, (const void*)blob, (size_t)len);
str_out[len * 2] = 0;
return str_out;
}
/**
* Cleaning the database from the deleted data
*
* return 0 if Ok, else error code >0
*/
int dap_db_driver_sqlite_vacuum(sqlite3 *l_db)
{
if(!s_db)
return -1;
int l_rc = dap_db_driver_sqlite_exec(l_db, "VACUUM", NULL);
return l_rc;
}
/**
* Start a transaction
*/
int dap_db_driver_sqlite_start_transaction(void)
{
pthread_rwlock_wrlock(&s_db_rwlock);
if(!s_db){
pthread_rwlock_unlock(&s_db_rwlock);
return -666;
}
if(SQLITE_OK == dap_db_driver_sqlite_exec(s_db, "BEGIN", NULL)){
pthread_rwlock_unlock(&s_db_rwlock);
return 0;
}else{
pthread_rwlock_unlock(&s_db_rwlock);
return -1;
}
}
/**
* End of transaction
*/
int dap_db_driver_sqlite_end_transaction(void)
{
pthread_rwlock_wrlock(&s_db_rwlock);
if(!s_db){
pthread_rwlock_unlock(&s_db_rwlock);
return -666;
}
if(SQLITE_OK == dap_db_driver_sqlite_exec(s_db, "COMMIT", NULL)){
pthread_rwlock_unlock(&s_db_rwlock);
return 0;
}else{
pthread_rwlock_unlock(&s_db_rwlock);
return -1;
}
}
char *dap_db_driver_sqlite_make_group_name(const char *a_table_name)
{
char *l_table_name = dap_strdup(a_table_name);
ssize_t l_table_name_len = (ssize_t)dap_strlen(l_table_name);
const char *l_needle = "_";
// replace '_' to '.'
while(1){
char *l_str = dap_strstr_len(l_table_name, l_table_name_len, l_needle);
if(l_str)
*l_str = '.';
else
break;
}
return l_table_name;
}
char *dap_db_driver_sqlite_make_table_name(const char *a_group_name)
{
char *l_group_name = dap_strdup(a_group_name);
ssize_t l_group_name_len = (ssize_t)dap_strlen(l_group_name);
const char *l_needle = ".";
// replace '.' to '_'
while(1){
char *l_str = dap_strstr_len(l_group_name, l_group_name_len, l_needle);
if(l_str)
*l_str = '_';
else
break;
}
return l_group_name;
}
/**
* Apply data (write or delete)
*
*/
int dap_db_driver_sqlite_apply_store_obj(dap_store_obj_t *a_store_obj)
{
if(!a_store_obj || !a_store_obj->group )
return -1;
char *l_query = NULL;
char *l_error_message = NULL;
char *l_table_name = dap_db_driver_sqlite_make_table_name(a_store_obj->group);
if(a_store_obj->type == 'a') {
if(!a_store_obj->key || !a_store_obj->value || !a_store_obj->value_len)
return -1;
//dap_chain_hash_fast_t l_hash;
//dap_hash_fast(a_store_obj->value, a_store_obj->value_len, &l_hash);
char *l_blob_hash = "";//dap_db_driver_get_string_from_blob((uint8_t*) &l_hash, sizeof(dap_chain_hash_fast_t));
char *l_blob_value = dap_db_driver_get_string_from_blob(a_store_obj->value, (int)a_store_obj->value_len);
//add one record
l_query = sqlite3_mprintf("insert into '%s' values(NULL, '%s', x'%s', '%lld', x'%s')",
l_table_name, a_store_obj->key, l_blob_hash, a_store_obj->timestamp, l_blob_value);
//dap_db_driver_sqlite_free(l_blob_hash);
dap_db_driver_sqlite_free(l_blob_value);
}
else if(a_store_obj->type == 'd') {
//delete one record
if(a_store_obj->key)
l_query = sqlite3_mprintf("delete from '%s' where key = '%s'",
l_table_name, a_store_obj->key);
// remove all group
else {
l_query = sqlite3_mprintf("drop table if exists '%s'", l_table_name);
}
}
else {
log_it(L_ERROR, "Unknown store_obj type '0x%x'", a_store_obj->type);
return -1;
}
// execute request
pthread_rwlock_wrlock(&s_db_rwlock);
if(!s_db){
pthread_rwlock_unlock(&s_db_rwlock);
return -666;
}
int l_ret = dap_db_driver_sqlite_exec(s_db, l_query, &l_error_message);
if(l_ret == SQLITE_ERROR) {
dap_db_driver_sqlite_free(l_error_message);
l_error_message = NULL;
// create table
dap_db_driver_sqlite_create_group_table(l_table_name);
// repeat request
l_ret = dap_db_driver_sqlite_exec(s_db, l_query, &l_error_message);
}
// entry with the same hash is already present
if(l_ret == SQLITE_CONSTRAINT) {
dap_db_driver_sqlite_free(l_error_message);
l_error_message = NULL;
//delete exist record
char *l_query_del = sqlite3_mprintf("delete from '%s' where key = '%s'", l_table_name, a_store_obj->key);
l_ret = dap_db_driver_sqlite_exec(s_db, l_query_del, &l_error_message);
dap_db_driver_sqlite_free(l_query_del);
if(l_ret != SQLITE_OK) {
log_it(L_INFO, "Entry with the same key is already present and can't delete, %s", l_error_message);
dap_db_driver_sqlite_free(l_error_message);
l_error_message = NULL;
}
// repeat request
l_ret = dap_db_driver_sqlite_exec(s_db, l_query, &l_error_message);
}
pthread_rwlock_unlock(&s_db_rwlock);
// missing database
if(l_ret != SQLITE_OK) {
log_it(L_ERROR, "sqlite apply error: %s", l_error_message);
dap_db_driver_sqlite_free(l_error_message);
l_ret = -1;
}
dap_db_driver_sqlite_free(l_query);
DAP_DELETE(l_table_name);
return l_ret;
}
static void fill_one_item(const char *a_group, dap_store_obj_t *a_obj, SQLITE_ROW_VALUE *a_row)
{
a_obj->group = dap_strdup(a_group);
for(int l_iCol = 0; l_iCol < a_row->count; l_iCol++) {
SQLITE_VALUE *l_cur_val = a_row->val + l_iCol;
switch (l_iCol) {
case 0:
if(l_cur_val->type == SQLITE_INTEGER)
a_obj->id = (uint64_t)l_cur_val->val.val_int64;
break; // id
case 1:
if(l_cur_val->type == SQLITE_INTEGER)
a_obj->timestamp = l_cur_val->val.val_int64;
break; // ts
case 2:
if(l_cur_val->type == SQLITE_TEXT)
a_obj->key = dap_strdup(l_cur_val->val.val_str);
break; // key
case 3:
if(l_cur_val->type == SQLITE_BLOB)
{
a_obj->value_len = (size_t) l_cur_val->len;
a_obj->value = DAP_NEW_SIZE(uint8_t, a_obj->value_len);
memcpy(a_obj->value, l_cur_val->val.val_blob, a_obj->value_len);
}
break; // value
}
}
}
/** /**
* Read last items * Read last items
* *
...@@ -835,83 +584,7 @@ dap_store_obj_t* dap_db_driver_sqlite_read_cond_store_obj(const char *a_group, u ...@@ -835,83 +584,7 @@ dap_store_obj_t* dap_db_driver_sqlite_read_cond_store_obj(const char *a_group, u
return l_obj; return l_obj;
} }
/**
* Read several items
*
* a_group - group name
* a_key - key name, may by NULL, it means reading the whole group
* a_count_out[in], how many items to read, 0 - no limits
* a_count_out[out], how many items was read
*/
dap_store_obj_t* dap_db_driver_sqlite_read_store_obj(const char *a_group, const char *a_key, size_t *a_count_out)
{
if(!a_group || !s_db)
return NULL;
dap_store_obj_t *l_obj = NULL;
sqlite3_stmt *l_res;
char * l_table_name = dap_db_driver_sqlite_make_table_name(a_group);
// no limit
uint64_t l_count_out = 0;
if(a_count_out)
l_count_out = *a_count_out;
char *l_str_query;
if(a_key) {
if(l_count_out)
l_str_query = sqlite3_mprintf("SELECT id,ts,key,value FROM '%s' WHERE key='%s' ORDER BY id ASC LIMIT %d",
l_table_name, a_key, l_count_out);
else
l_str_query = sqlite3_mprintf("SELECT id,ts,key,value FROM '%s' WHERE key='%s' ORDER BY id ASC",
l_table_name, a_key);
}
else {
if(l_count_out)
l_str_query = sqlite3_mprintf("SELECT id,ts,key,value FROM '%s' ORDER BY id ASC LIMIT %d",
l_table_name, l_count_out);
else
l_str_query = sqlite3_mprintf("SELECT id,ts,key,value FROM '%s' ORDER BY id ASC", l_table_name);
}
pthread_rwlock_wrlock(&s_db_rwlock);
int l_ret = dap_db_driver_sqlite_query(s_db, l_str_query, &l_res, NULL);
pthread_rwlock_unlock(&s_db_rwlock);
sqlite3_free(l_str_query);
DAP_DEL_Z(l_table_name);
if(l_ret != SQLITE_OK) {
//log_it(L_ERROR, "read l_ret=%d, %s\n", sqlite3_errcode(s_db), sqlite3_errmsg(s_db));
return NULL;
}
//int b = qlite3_column_count(s_db);
SQLITE_ROW_VALUE *l_row = NULL;
l_count_out = 0;
uint64_t l_count_sized = 0;
do {
l_ret = dap_db_driver_sqlite_fetch_array(l_res, &l_row);
if(l_ret != SQLITE_ROW && l_ret != SQLITE_DONE)
{
// log_it(L_ERROR, "read l_ret=%d, %s\n", sqlite3_errcode(s_db), sqlite3_errmsg(s_db));
}
if(l_ret == SQLITE_ROW && l_row) {
// realloc memory
if(l_count_out >= l_count_sized) {
l_count_sized += 10;
l_obj = DAP_REALLOC(l_obj, sizeof(dap_store_obj_t) * l_count_sized);
memset(l_obj + l_count_out, 0, sizeof(dap_store_obj_t) * (l_count_sized - l_count_out));
}
// fill currrent item
dap_store_obj_t *l_obj_cur = l_obj + l_count_out;
fill_one_item(a_group, l_obj_cur, l_row);
l_count_out++;
}
dap_db_driver_sqlite_row_free(l_row);
} while(l_row);
dap_db_driver_sqlite_query_free(l_res);
if(a_count_out)
*a_count_out = l_count_out;
return l_obj;
}
dap_list_t* dap_db_driver_sqlite_get_groups_by_mask(const char *a_group_mask) dap_list_t* dap_db_driver_sqlite_get_groups_by_mask(const char *a_group_mask)
{ {
......
...@@ -62,8 +62,8 @@ void dap_global_db_obj_track_history(void* a_store_data); ...@@ -62,8 +62,8 @@ void dap_global_db_obj_track_history(void* a_store_data);
*/ */
void* dap_chain_global_db_obj_get(const char *a_key, const char *a_group); void* dap_chain_global_db_obj_get(const char *a_key, const char *a_group);
dap_store_obj_t* dap_chain_global_db_obj_gr_get(const char *a_key, size_t *a_data_len_out, const char *a_group); dap_store_obj_t* dap_chain_global_db_obj_gr_get(const char *a_key, size_t *a_data_len_out, const char *a_group);
uint8_t * dap_chain_global_db_gr_get(const char *a_key, size_t *a_data_out, const char *a_group); uint8_t * dap_chain_global_db_gr_get(const char *a_key, size_t *a_data_len_out, const char *a_group);
uint8_t * dap_chain_global_db_get(const char *a_key, size_t *a_data_out); uint8_t * dap_chain_global_db_get(const char *a_key, size_t *a_data_len_out);
/** /**
* Set one entry to base * Set one entry to base
......
...@@ -5,4 +5,12 @@ ...@@ -5,4 +5,12 @@
#include "/usr/include/postgresql/libpq-fe.h" #include "/usr/include/postgresql/libpq-fe.h"
#endif #endif
#define DAP_PGSQL_DBHASHNAME_LEN 8
#define DAP_PGSQL_POOL_COUNT 16
int dap_db_driver_pgsql_init(const char *a_filename_dir, dap_db_driver_callbacks_t *a_drv_callback); int dap_db_driver_pgsql_init(const char *a_filename_dir, dap_db_driver_callbacks_t *a_drv_callback);
int dap_db_driver_pgsql_deinit();
int dap_db_driver_pgsql_start_transaction(void);
int dap_db_driver_pgsql_end_transaction(void);
int dap_db_driver_pgsql_apply_store_obj(dap_store_obj_t *a_store_obj);
dap_store_obj_t *dap_db_driver_pgsql_read_store_obj(const char *a_group, const char *a_key, size_t *a_count_out);
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment