From 99dfebc7c32e511c8a0e1b319393b928ff9c2b26 Mon Sep 17 00:00:00 2001
From: Roman Khlopkov <roman.khlopkov@demlabs.net>
Date: Wed, 6 Oct 2021 18:09:40 +0300
Subject: [PATCH] [+] PostgreSQL driver for GDB

---
 dap-sdk/core/src/dap_file_utils.c             |   4 +-
 .../dap_chain_global_db_driver_pgsql.c        | 515 +++++++-----------
 .../dap_chain_global_db_driver_pgsql.h        |   6 +
 3 files changed, 216 insertions(+), 309 deletions(-)

diff --git a/dap-sdk/core/src/dap_file_utils.c b/dap-sdk/core/src/dap_file_utils.c
index 30b3f7dc98..ef6c15699c 100755
--- a/dap-sdk/core/src/dap_file_utils.c
+++ b/dap-sdk/core/src/dap_file_utils.c
@@ -160,8 +160,8 @@ int dap_mkdir_with_parents(const char *a_dir_path)
 #ifdef _WIN32
             int result = mkdir(path);
 #else
-            int result = mkdir(path, S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH | S_IWOTH);
-                         chmod(path, S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH | S_IWOTH);
+            int result = mkdir(path, S_IRWXU | S_IRWXG | S_IRWXO);
+                         chmod(path, S_IRWXU | S_IRWXG | S_IRWXO);
 #endif
             if(result == -1) {
                 errno = ENOTDIR;
diff --git a/modules/global-db/dap_chain_global_db_driver_pgsql.c b/modules/global-db/dap_chain_global_db_driver_pgsql.c
index 88553149c2..fa5e6e8fd6 100644
--- a/modules/global-db/dap_chain_global_db_driver_pgsql.c
+++ b/modules/global-db/dap_chain_global_db_driver_pgsql.c
@@ -28,10 +28,10 @@
 #include <pthread.h>
 #include <errno.h>
 #include <pwd.h>
-
-#ifdef DAP_OS_UNIX
+#include <fcntl.h>
 #include <unistd.h>
-#endif
+#include <sys/stat.h>
+
 #include "dap_common.h"
 #include "dap_hash.h"
 #include "dap_file_utils.h"
@@ -46,13 +46,16 @@ struct dap_pgsql_conn_pool_item {
     int busy;
 };
 
+static PGconn *s_trans_conn = NULL;
 static struct dap_pgsql_conn_pool_item s_conn_pool[DAP_PGSQL_POOL_COUNT];
 static pthread_rwlock_t s_db_rwlock = PTHREAD_RWLOCK_INITIALIZER;
 
 static PGconn *s_pgsql_get_connection(void)
 {
+    if (pthread_rwlock_rdlock(s&_db_rwlock) == EDEADLK) {
+        return s_trans_conn;
+    }
     PGconn *l_ret = NULL;
-    pthread_rwlock_rdlock(&s_db_rwlock);
     for (int i = 0; i < DAP_PGSQL_POOL_COUNT; i++) {
         if (!s_conn_pool[i].busy) {
             l_ret = s_conn_pool[i].conn;
@@ -66,7 +69,9 @@ static PGconn *s_pgsql_get_connection(void)
 
 static void s_pgsql_free_connection(PGconn *a_conn)
 {
-    pthread_rwlock_rdlock(&s_db_rwlock);
+    if (pthread_rwlock_rdlock(s&_db_rwlock) == EDEADLK) {
+        return;
+    }
     for (int i = 0; i < DAP_PGSQL_POOL_COUNT; i++) {
         if (s_conn_pool[i].conn == a_conn) {
             s_conn_pool[i].busy = 0;
@@ -82,42 +87,29 @@ static void s_pgsql_free_connection(PGconn *a_conn)
  */
 int dap_db_driver_pgsql_init(const char *a_filename_dir, dap_db_driver_callbacks_t *a_drv_callback)
 {
-    // Check paths and create them if nessesary
-    if (!dap_dir_test(a_filename_dir)) {
-        log_it(L_NOTICE, "No directory %s, trying to create...", a_filename_dir);
-        int l_mkdir_ret = dap_mkdir_with_parents(a_filename_dir);
-        int l_errno = errno;
-        if (!dap_dir_test(a_filename_dir)) {
-            char l_errbuf[255];
-            l_errbuf[0] = '\0';
-            strerror_r(l_errno, l_errbuf, sizeof(l_errbuf));
-            log_it(L_ERROR, "Can't create directory, error code %d, error string \"%s\"", l_mkdir_ret, l_errbuf);
-            return -1;
-        } else
-            log_it(L_NOTICE,"Directory created");
-    }
     dap_hash_fast_t l_dir_hash;
     dap_hash_fast(a_filename_dir, strlen(a_filename_dir), &l_dir_hash);
     char l_db_name[DAP_PGSQL_DBHASHNAME_LEN + 1];
     dap_htoa64(l_db_name, l_dir_hash.raw, DAP_PGSQL_DBHASHNAME_LEN);
     l_db_name[DAP_PGSQL_DBHASHNAME_LEN] = '\0';
-    // Open PostgreSQL database, create if nessesary
-    const char *l_base_conn_str = "dbname = postgres";
-    PGconn *l_base_conn = PQconnectdb(l_base_conn_str);
-    if (PQstatus(l_base_conn) != CONNECTION_OK) {
-        log_it(L_ERROR, "Can't init PostgreSQL database: \"%s\"", PQerrorMessage(l_base_conn));
-        PQfinish(l_base_conn);
-        return -2;
-    }
-    char *l_query_str = dap_strdup_printf("SELECT EXISTS (SELECT * FROM pg_database WHERE datname = '%s')", l_db_name);
-    PGresult *l_res = PQexec(l_base_conn, l_query_str);
-    DAP_DELETE(l_query_str);
-    if (PQresultStatus(l_res) != PGRES_TUPLES_OK) {
-        log_it(L_ERROR, "Can't read PostgreSQL database: \"%s\"", PQresultErrorMessage(l_res));
-        PQclear(l_res);
-        return -3;
-    }
-    if (*PQgetvalue(l_res, 0, 0) == 'f') {  //false, database not exists, than create it
+    if (!dap_dir_test(a_filename_dir) || !readdir(opendir(a_filename_dir))) {
+        // Create PostgreSQL database
+        const char *l_base_conn_str = "dbname = postgres";
+        PGconn *l_base_conn = PQconnectdb(l_base_conn_str);
+        if (PQstatus(l_base_conn) != CONNECTION_OK) {
+            log_it(L_ERROR, "Can't init PostgreSQL database: \"%s\"", PQerrorMessage(l_base_conn));
+            PQfinish(l_base_conn);
+            return -2;
+        }
+        char *l_query_str = dap_strdup_printf("DROP DATABASE IF EXISTS \"%s\"", l_db_name);
+        PGresult *l_res = PQexec(l_base_conn, l_query_str);
+        DAP_DELETE(l_query_str);
+        if (PQresultStatus(l_res) != PGRES_COMMAND_OK) {
+            log_it(L_ERROR, "Drop database failed: \"%s\"", PQresultErrorMessage(l_res));
+            PQclear(l_res);
+            PQfinish(l_base_conn);
+            return -3;
+        }
         PQclear(l_res);
         l_query_str = dap_strdup_printf("DROP TABLESPACE IF EXISTS \"%s\"", l_db_name);
         l_res = PQexec(l_base_conn, l_query_str);
@@ -125,19 +117,34 @@ int dap_db_driver_pgsql_init(const char *a_filename_dir, dap_db_driver_callbacks
         if (PQresultStatus(l_res) != PGRES_COMMAND_OK) {
             log_it(L_ERROR, "Drop tablespace failed with message: \"%s\"", PQresultErrorMessage(l_res));
             PQclear(l_res);
+            PQfinish(l_base_conn);
             return -4;
         }
         PQclear(l_res);
-        dap_mkdir_with_parents(a_filename_dir);
-        chown(a_filename_dir, getpwnam("postgres")->pw_uid, -1);
+        // Check paths and create them if nessesary
+        if (!dap_dir_test(a_filename_dir)) {
+            log_it(L_NOTICE, "No directory %s, trying to create...", a_filename_dir);
+            dap_mkdir_with_parents(a_filename_dir);
+            if (!dap_dir_test(a_filename_dir)) {
+                char l_errbuf[255];
+                l_errbuf[0] = '\0';
+                strerror_r(errno, l_errbuf, sizeof(l_errbuf));
+                log_it(L_ERROR, "Can't create directory, error code %d, error string \"%s\"", errno, l_errbuf);
+                return -1;
+            }
+            log_it(L_NOTICE,"Directory created");
+            chown(a_filename_dir, getpwnam("postgres")->pw_uid, -1);
+        }
         l_query_str = dap_strdup_printf("CREATE TABLESPACE \"%s\" LOCATION '%s'", l_db_name, a_filename_dir);
         l_res = PQexec(l_base_conn, l_query_str);
         DAP_DELETE(l_query_str);
         if (PQresultStatus(l_res) != PGRES_COMMAND_OK) {
             log_it(L_ERROR, "Create tablespace failed with message: \"%s\"", PQresultErrorMessage(l_res));
             PQclear(l_res);
+            PQfinish(l_base_conn);
             return -5;
         }
+        chmod(a_filename_dir, S_IRWXU | S_IRWXG | S_IRWXO);
         PQclear(l_res);
         l_query_str = dap_strdup_printf("CREATE DATABASE \"%s\" WITH TABLESPACE \"%s\"", l_db_name, l_db_name);
         l_res = PQexec(l_base_conn, l_query_str);
@@ -145,11 +152,12 @@ int dap_db_driver_pgsql_init(const char *a_filename_dir, dap_db_driver_callbacks
         if (PQresultStatus(l_res) != PGRES_COMMAND_OK) {
             log_it(L_ERROR, "Create database failed with message: \"%s\"", PQresultErrorMessage(l_res));
             PQclear(l_res);
+            PQfinish(l_base_conn);
             return -6;
         }
+        PQclear(l_res);
+        PQfinish(l_base_conn);
     }
-    PQclear(l_res);
-    PQfinish(l_base_conn);
     // Create connection pool for the DAP database
     char *l_conn_str = dap_strdup_printf("dbname = %s", l_db_name);
     for (int i = 0; i < DAP_PGSQL_POOL_COUNT; i++) {
@@ -169,13 +177,13 @@ int dap_db_driver_pgsql_init(const char *a_filename_dir, dap_db_driver_callbacks
     a_drv_callback->transaction_end = dap_db_driver_pgsql_end_transaction;
     a_drv_callback->apply_store_obj = dap_db_driver_pgsql_apply_store_obj;
     a_drv_callback->read_store_obj = dap_db_driver_pgsql_read_store_obj;
-    //a_drv_callback->read_cond_store_obj = dap_db_driver_sqlite_read_cond_store_obj;
-    //a_drv_callback->read_last_store_obj = dap_db_driver_sqlite_read_last_store_obj;
-    //a_drv_callback->get_groups_by_mask  = dap_db_driver_sqlite_get_groups_by_mask;
-    //a_drv_callback->read_count_store = dap_db_driver_sqlite_read_count_store;
-    //a_drv_callback->is_obj = dap_db_driver_sqlite_is_obj;
-    //a_drv_callback->deinit = dap_db_driver_sqlite_deinit;
-    //a_drv_callback->flush = dap_db_driver_sqlite_flush;
+    a_drv_callback->read_cond_store_obj = dap_db_driver_pgsql_read_cond_store_obj;
+    a_drv_callback->read_last_store_obj = dap_db_driver_pgsql_read_last_store_obj;
+    a_drv_callback->get_groups_by_mask  = dap_db_driver_pgsql_get_groups_by_mask;
+    a_drv_callback->read_count_store = dap_db_driver_pgsql_read_count_store;
+    a_drv_callback->is_obj = dap_db_driver_pgsql_is_obj;
+    a_drv_callback->deinit = dap_db_driver_pgsql_deinit;
+    a_drv_callback->flush = dap_db_driver_pgsql_flush;
     return 0;
 }
 
@@ -195,8 +203,17 @@ int dap_db_driver_pgsql_deinit(void)
  */
 int dap_db_driver_pgsql_start_transaction(void)
 {
-    // TODO make a transaction with a single connection from pool
-    //PGresult *l_res = PQexec(l_conn, "BEGIN");
+    s_trans_conn = s_pgsql_get_connection();
+    if (!s_trans_conn)
+        return -1;
+    pthread_rwlock_rdlock(&s_db_rwlock);
+    PGresult *l_res = PQexec(s_trans_conn, "BEGIN");
+    if (PQresultStatus(l_res) != PGRES_COMMAND_OK) {
+        log_it(L_ERROR, "Begin transaction failed with message: \"%s\"", PQresultErrorMessage(l_res));
+        pthread_rwlock_unlock(&s_db_rwlock);
+        s_pgsql_free_connection(s_trans_conn);
+        s_trans_conn = NULL;
+    }
     return 0;
 }
 
@@ -205,8 +222,15 @@ int dap_db_driver_pgsql_start_transaction(void)
  */
 int dap_db_driver_pgsql_end_transaction(void)
 {
-    // TODO make a transaction with a single connection from pool
-    //PGresult *l_res = PQexec(l_conn, "COMMIT");
+    if (s_trans_conn)
+        return -1;
+    PGresult *l_res = PQexec(l_trans_conn, "COMMIT");
+    if (PQresultStatus(l_res) != PGRES_COMMAND_OK) {
+        log_it(L_ERROR, "End transaction failed with message: \"%s\"", PQresultErrorMessage(l_res));
+    }
+    pthread_rwlock_unlock(&s_db_rwlock);
+    s_pgsql_free_connection(s_trans_conn);
+    s_trans_conn = NULL;
     return 0;
 }
 
@@ -215,27 +239,21 @@ int dap_db_driver_pgsql_end_transaction(void)
  *
  * return 0 if Ok, else error code
  */
-static int s_pgsql_create_group_table(const char *a_table_name)
+static int s_pgsql_create_group_table(const char *a_table_name, PGconn *a_conn)
 {
     if (!a_table_name)
         return -1;
-    PGconn *l_conn = s_pgsql_get_connection();
-    if (!l_conn) {
-        log_it(L_ERROR, "Can't pick PostgreSQL connection from pool");
-        return -2;
-    }
     int l_ret = 0;
     char *l_query_str = dap_strdup_printf("CREATE TABLE \"%s\""
                                           "(obj_id SERIAL PRIMARY KEY, obj_ts BIGINT, obj_key TEXT UNIQUE, obj_val BYTEA)",
                                           a_table_name);
-    PGresult *l_res = PQexec(l_conn, l_query_str);
+    PGresult *l_res = PQexec(a_conn, l_query_str);
     DAP_DELETE(l_query_str);
     if (PQresultStatus(l_res) != PGRES_COMMAND_OK) {
         log_it(L_ERROR, "Create table failed with message: \"%s\"", PQresultErrorMessage(l_res));
         l_ret = -3;
     }
     PQclear(l_res);
-    s_pgsql_free_connection(l_conn);
     return l_ret;
 }
 
@@ -264,13 +282,13 @@ int dap_db_driver_pgsql_apply_store_obj(dap_store_obj_t *a_store_obj)
         int l_param_formats[2] = {1, 1};
         l_query_str = dap_strdup_printf("INSERT INTO \"%s\" (obj_ts, obj_key, obj_val) VALUES ($1, '%s', $2) "
                                         "ON CONFLICT (obj_key) DO UPDATE SET "
-                                        "obj_ts = EXCLUDED.obj_ts, obj_val = EXCLUDED.obj_val;",
+                                        "obj_id = EXCLUDED.obj_id, obj_ts = EXCLUDED.obj_ts, obj_val = EXCLUDED.obj_val;",
                                         a_store_obj->group,  a_store_obj->key);
 
         // execute add request
         l_res = PQexecParams(l_conn, l_query_str, 2, NULL, l_param_vals, l_param_lens, l_param_formats, 0);
         if (PQresultStatus(l_res) != PGRES_COMMAND_OK) {
-            if (a_store_obj->type == 'a' && s_pgsql_create_group_table(a_store_obj->group) == 0) {
+            if (a_store_obj->type == 'a' && s_pgsql_create_group_table(a_store_obj->group, l_conn) == 0) {
                 PQclear(l_res);
                 l_res = PQexecParams(l_conn, l_query_str, 2, NULL, l_param_vals, l_param_lens, l_param_formats, 0);
             }
@@ -342,15 +360,16 @@ dap_store_obj_t *dap_db_driver_pgsql_read_store_obj(const char *a_group, const c
     }
     char *l_query_str;
     if (a_key) {
-       l_query_str = dap_strdup_printf("SELECT * FROM \"%s\" WHERE obj_key='%s'", a_group, a_key);
+       l_query_str = dap_strdup_printf("SELECT * FROM \"%s\" WHERE obj_key = '%s'", a_group, a_key);
     } else {
-        if (a_count_out)
+        if (a_count_out && *a_count_out)
             l_query_str = dap_strdup_printf("SELECT * FROM \"%s\" ORDER BY obj_id ASC LIMIT %d", a_group, *a_count_out);
         else
             l_query_str = dap_strdup_printf("SELECT * FROM \"%s\" ORDER BY obj_id ASC", a_group);
     }
 
     PGresult *l_res = PQexecParams(l_conn, l_query_str, 0, NULL, NULL, NULL, NULL, 1);
+    s_pgsql_free_connection(l_conn);
     DAP_DELETE(l_query_str);
     if (PQresultStatus(l_res) != PGRES_TUPLES_OK) {
         log_it(L_ERROR, "Read objects failed with message: \"%s\"", PQresultErrorMessage(l_res));
@@ -372,138 +391,32 @@ dap_store_obj_t *dap_db_driver_pgsql_read_store_obj(const char *a_group, const c
     return l_obj;
 }
 
-#if 0
-int dap_db_driver_sqlite_flush()
-{
-    log_it(L_DEBUG, "Start flush sqlite data base.");
-    pthread_rwlock_wrlock(&s_db_rwlock);
-    if(!s_db){
-        pthread_rwlock_unlock(&s_db_rwlock);
-        return -666;
-    }
-    dap_db_driver_sqlite_close(s_db);
-    char *l_error_message = NULL;
-    s_db = dap_db_driver_sqlite_open(s_filename_db, SQLITE_OPEN_READWRITE, &l_error_message);
-    if(!s_db) {
-        pthread_rwlock_unlock(&s_db_rwlock);
-        log_it(L_ERROR, "Can't init sqlite err: \"%s\"", l_error_message? l_error_message: "UNKNOWN");
-        dap_db_driver_sqlite_free(l_error_message);
-        return -3;
-    }
-#ifndef _WIN32
-    sync();
-#endif
-    if(!dap_db_driver_sqlite_set_pragma(s_db, "synchronous", "NORMAL")) // 0 | OFF | 1 | NORMAL | 2 | FULL
-        log_it(L_WARNING, "Can't set new synchronous mode\n");
-    if(!dap_db_driver_sqlite_set_pragma(s_db, "journal_mode", "OFF")) // DELETE | TRUNCATE | PERSIST | MEMORY | WAL | OFF
-        log_it(L_WARNING, "Can't set new journal mode\n");
-
-    if(!dap_db_driver_sqlite_set_pragma(s_db, "page_size", "1024")) // DELETE | TRUNCATE | PERSIST | MEMORY | WAL | OFF
-        log_it(L_WARNING, "Can't set page_size\n");
-    pthread_rwlock_unlock(&s_db_rwlock);
-    return 0;
-}
-
 /**
- * Selects the next entry from the result of the query and returns an array
- *
- * l_res: identifier received in sqlite_query ()
- * l_row_out [out]: pointer to a column or NULL
- *
- * return:
- * SQLITE_ROW(100) has another row ready
- * SQLITE_DONE(101) finished executing,
- * SQLITE_CONSTRAINT(19) data is not unique and will not be added
- */
-static int dap_db_driver_sqlite_fetch_array(sqlite3_stmt *l_res, SQLITE_ROW_VALUE **l_row_out)
-{
-    SQLITE_ROW_VALUE *l_row = NULL;
-    // go to next the string
-    int l_rc = sqlite3_step(l_res);
-    if(l_rc == SQLITE_ROW) // SQLITE_ROW(100) or SQLITE_DONE(101) or SQLITE_BUSY(5)
-    {
-        int l_iCol; // number of the column in the row
-        // allocate memory for a row with data
-        l_row = (SQLITE_ROW_VALUE*) sqlite3_malloc(sizeof(SQLITE_ROW_VALUE));
-        int l_count = sqlite3_column_count(l_res); // get the number of columns
-        // allocate memory for all columns
-        l_row->val = (SQLITE_VALUE*) sqlite3_malloc(l_count * (int)sizeof(SQLITE_VALUE));
-        if(l_row->val)
-        {
-            l_row->count = l_count; // number of columns
-            for(l_iCol = 0; l_iCol < l_row->count; l_iCol++)
-                    {
-                SQLITE_VALUE *cur_val = l_row->val + l_iCol;
-                cur_val->len = sqlite3_column_bytes(l_res, l_iCol); // how many bytes will be needed
-                cur_val->type = (signed char)sqlite3_column_type(l_res, l_iCol); // field type
-                if(cur_val->type == SQLITE_INTEGER)
-                {
-                    cur_val->val.val_int64 = sqlite3_column_int64(l_res, l_iCol);
-                }
-                else if(cur_val->type == SQLITE_FLOAT)
-                    cur_val->val.val_float = sqlite3_column_double(l_res, l_iCol);
-                else if(cur_val->type == SQLITE_BLOB)
-                    cur_val->val.val_blob = (const unsigned char*) sqlite3_column_blob(l_res, l_iCol);
-                else if(cur_val->type == SQLITE_TEXT)
-                    cur_val->val.val_str = (const char*) sqlite3_column_text(l_res, l_iCol); //sqlite3_mprintf("%s",sqlite3_column_text(l_res,iCol));
-                else
-                    cur_val->val.val_str = NULL;
-            }
-        }
-        else
-            l_row->count = 0; // number of columns
-    }
-    if(l_row_out)
-        *l_row_out = l_row;
-    else
-        dap_db_driver_sqlite_row_free(l_row);
-    return l_rc;
-}
-
-/**
- * Read last items
+ * Read last item
  *
  * a_group - group name
  */
-dap_store_obj_t* dap_db_driver_sqlite_read_last_store_obj(const char *a_group)
+dap_store_obj_t *dap_db_driver_pgsql_read_last_store_obj(const char *a_group)
 {
-
-    dap_store_obj_t *l_obj = NULL;
-    char *l_error_message = NULL;
-    sqlite3_stmt *l_res;
-    if(!a_group)
+    if (!a_group)
         return NULL;
-    char * l_table_name = dap_db_driver_sqlite_make_table_name(a_group);
-    char *l_str_query = sqlite3_mprintf("SELECT id,ts,key,value FROM '%s' ORDER BY id DESC LIMIT 1", l_table_name);
-    pthread_rwlock_wrlock(&s_db_rwlock);
-    if(!s_db){
-        pthread_rwlock_unlock(&s_db_rwlock);
+    PGconn *l_conn = s_pgsql_get_connection();
+    if (!l_conn) {
+        log_it(L_ERROR, "Can't pick PostgreSQL connection from pool");
         return NULL;
     }
-
-    int l_ret = dap_db_driver_sqlite_query(s_db, l_str_query, &l_res, &l_error_message);
-    pthread_rwlock_unlock(&s_db_rwlock);
-    sqlite3_free(l_str_query);
-    DAP_DEL_Z(l_table_name);
-    if(l_ret != SQLITE_OK) {
-        //log_it(L_ERROR, "read last l_ret=%d, %s\n", sqlite3_errcode(s_db), sqlite3_errmsg(s_db));
-        dap_db_driver_sqlite_free(l_error_message);
+    char *l_query_str = dap_strdup_printf("SELECT * FROM \"%s\" ORDER BY obj_id DESC LIMIT 1", a_group);
+    PGresult *l_res = PQexecParams(l_conn, l_query_str, 0, NULL, NULL, NULL, NULL, 1);
+    s_pgsql_free_connection(l_conn);
+    DAP_DELETE(l_query_str);
+    if (PQresultStatus(l_res) != PGRES_TUPLES_OK) {
+        log_it(L_ERROR, "Read last object failed with message: \"%s\"", PQresultErrorMessage(l_res));
+        PQclear(l_res);
         return NULL;
     }
-
-    SQLITE_ROW_VALUE *l_row = NULL;
-    l_ret = dap_db_driver_sqlite_fetch_array(l_res, &l_row);
-    if(l_ret != SQLITE_ROW && l_ret != SQLITE_DONE)
-    {
-        //log_it(L_ERROR, "read l_ret=%d, %s\n", sqlite3_errcode(s_db), sqlite3_errmsg(s_db));
-    }
-    if(l_ret == SQLITE_ROW && l_row) {
-        l_obj = DAP_NEW_Z(dap_store_obj_t);
-        fill_one_item(a_group, l_obj, l_row);
-    }
-    dap_db_driver_sqlite_row_free(l_row);
-    dap_db_driver_sqlite_query_free(l_res);
-
+    dap_store_obj_t *l_obj = DAP_NEW_Z(dap_store_obj_t);
+    s_pgsql_fill_object(a_group, l_obj, l_res, 0);
+    PQclear(l_res);
     return l_obj;
 }
 
@@ -515,156 +428,144 @@ dap_store_obj_t* dap_db_driver_sqlite_read_last_store_obj(const char *a_group)
  * a_count_out[in], how many items to read, 0 - no limits
  * a_count_out[out], how many items was read
  */
-dap_store_obj_t* dap_db_driver_sqlite_read_cond_store_obj(const char *a_group, uint64_t a_id, size_t *a_count_out)
+dap_store_obj_t *dap_db_driver_pgsql_read_cond_store_obj(const char *a_group, uint64_t a_id, size_t *a_count_out)
 {
-    dap_store_obj_t *l_obj = NULL;
-    char *l_error_message = NULL;
-    sqlite3_stmt *l_res;
-    if(!a_group)
+    if (!a_group)
         return NULL;
-
-    char * l_table_name = dap_db_driver_sqlite_make_table_name(a_group);
-    // no limit
-    int l_count_out = 0;
-    if(a_count_out)
-        l_count_out = (int)*a_count_out;
-    char *l_str_query;
-    if(l_count_out)
-        l_str_query = sqlite3_mprintf("SELECT id,ts,key,value FROM '%s' WHERE id>='%lld' ORDER BY id ASC LIMIT %d",
-                l_table_name, a_id, l_count_out);
-    else
-        l_str_query = sqlite3_mprintf("SELECT id,ts,key,value FROM '%s' WHERE id>='%lld' ORDER BY id ASC",
-                l_table_name, a_id);
-    pthread_rwlock_wrlock(&s_db_rwlock);
-    if(!s_db){
-        pthread_rwlock_unlock(&s_db_rwlock);
+    PGconn *l_conn = s_pgsql_get_connection();
+    if (!l_conn) {
+        log_it(L_ERROR, "Can't pick PostgreSQL connection from pool");
         return NULL;
     }
-
-    int l_ret = dap_db_driver_sqlite_query(s_db, l_str_query, &l_res, &l_error_message);
-    pthread_rwlock_unlock(&s_db_rwlock);
-    sqlite3_free(l_str_query);
-    DAP_DEL_Z(l_table_name);
-
-    if(l_ret != SQLITE_OK) {
-        //log_it(L_ERROR, "read l_ret=%d, %s\n", sqlite3_errcode(s_db), sqlite3_errmsg(s_db));
-        dap_db_driver_sqlite_free(l_error_message);
+    char *l_query_str;
+    if (a_count_out && *a_count_out)
+        l_query_str = dap_strdup_printf("SELECT * FROM \"%s\" WHERE obj_id >= '%"DAP_UINT64_FORMAT_U"' "
+                                        "ORDER BY obj_id ASC LIMIT %d", a_group, a_id, *a_count_out);
+    else
+        l_query_str = dap_strdup_printf("SELECT * FROM \"%s\" WHERE obj_id >= '%"DAP_UINT64_FORMAT_U"' "
+                                        "ORDER BY obj_id ASC", a_group, a_id);
+    PGresult *l_res = PQexecParams(l_conn, l_query_str, 0, NULL, NULL, NULL, NULL, 1);
+    s_pgsql_free_connection(l_conn);
+    DAP_DELETE(l_query_str);
+    if (PQresultStatus(l_res) != PGRES_TUPLES_OK) {
+        log_it(L_ERROR, "Conditional read objects failed with message: \"%s\"", PQresultErrorMessage(l_res));
+        PQclear(l_res);
         return NULL;
     }
 
-    //int b = qlite3_column_count(s_db);
-    SQLITE_ROW_VALUE *l_row = NULL;
-    l_count_out = 0;
-    int l_count_sized = 0;
-    do {
-        l_ret = dap_db_driver_sqlite_fetch_array(l_res, &l_row);
-        if(l_ret != SQLITE_ROW && l_ret != SQLITE_DONE)
-        {
-           // log_it(L_ERROR, "read l_ret=%d, %s\n", sqlite3_errcode(s_db), sqlite3_errmsg(s_db));
-        }
-        if(l_ret == SQLITE_ROW && l_row) {
-            // realloc memory
-            if(l_count_out >= l_count_sized) {
-                l_count_sized += 10;
-                l_obj = DAP_REALLOC(l_obj, sizeof(dap_store_obj_t) * (uint64_t)l_count_sized);
-                memset(l_obj + l_count_out, 0, sizeof(dap_store_obj_t) * (uint64_t)(l_count_sized - l_count_out));
-            }
-            // fill current item
-            dap_store_obj_t *l_obj_cur = l_obj + l_count_out;
-            fill_one_item(a_group, l_obj_cur, l_row);
-            l_count_out++;
-        }
-        dap_db_driver_sqlite_row_free(l_row);
-    } while(l_row);
-
-    dap_db_driver_sqlite_query_free(l_res);
-
-    if(a_count_out)
-        *a_count_out = (size_t)l_count_out;
+    // parse reply
+    size_t l_count = PQntuples(l_res);
+    dap_store_obj_t *l_obj = DAP_NEW_Z_SIZE(dap_store_obj_t, sizeof(dap_store_obj_t) * l_count);
+    for (int i = 0; i < l_count; i++) {
+        // fill currrent item
+        dap_store_obj_t *l_obj_cur = l_obj + i;
+        s_pgsql_fill_object(a_group, l_obj_cur, l_res, i);
+    }
+    PQclear(l_res);
+    if (a_count_out)
+        *a_count_out = l_count;
     return l_obj;
 }
 
 
-
-dap_list_t* dap_db_driver_sqlite_get_groups_by_mask(const char *a_group_mask)
+dap_list_t *dap_db_driver_pgsql_get_groups_by_mask(const char *a_group_mask)
 {
-    if(!a_group_mask || !s_db)
+    if (!a_group_mask)
         return NULL;
-    sqlite3_stmt *l_res;
-    const char *l_str_query = "SELECT name FROM sqlite_master WHERE type ='table' AND name NOT LIKE 'sqlite_%'";
-    dap_list_t *l_ret_list = NULL;
-    pthread_rwlock_wrlock(&s_db_rwlock);
-    int l_ret = dap_db_driver_sqlite_query(s_db, (char *)l_str_query, &l_res, NULL);
-    pthread_rwlock_unlock(&s_db_rwlock);
-    if(l_ret != SQLITE_OK) {
-        //log_it(L_ERROR, "Get tables l_ret=%d, %s\n", sqlite3_errcode(s_db), sqlite3_errmsg(s_db));
+    PGconn *l_conn = s_pgsql_get_connection();
+    if (!l_conn) {
+        log_it(L_ERROR, "Can't pick PostgreSQL connection from pool");
         return NULL;
     }
-    char * l_mask = dap_db_driver_sqlite_make_table_name(a_group_mask);
-    SQLITE_ROW_VALUE *l_row = NULL;
-    while (dap_db_driver_sqlite_fetch_array(l_res, &l_row) == SQLITE_ROW && l_row) {
-        char *l_table_name = (char *)l_row->val->val.val_str;
-        if(!dap_fnmatch(l_mask, l_table_name, 0))
-            l_ret_list = dap_list_prepend(l_ret_list, dap_db_driver_sqlite_make_group_name(l_table_name));
-        dap_db_driver_sqlite_row_free(l_row);
+    const char *l_query_str = "SELECT tablename FROM pg_catalog.pg_tables WHERE "
+                              "schemaname != 'information_schema' AND schemaname != 'pg_catalog'";
+    PGresult *l_res = PQexec(l_conn, l_query_str);
+    s_pgsql_free_connection(l_conn);
+    if (PQresultStatus(l_res) != PGRES_TUPLES_OK) {
+        log_it(L_ERROR, "Read tables failed with message: \"%s\"", PQresultErrorMessage(l_res));
+        PQclear(l_res);
+        return NULL;
     }
-    dap_db_driver_sqlite_query_free(l_res);
+
+    dap_list_t *l_ret_list = NULL;
+    for (int i = 0; i < PQntuples(l_res); i++) {
+        char *l_table_name = (char *)PQgetvalue(l_res, i, 0);
+        if(!dap_fnmatch(a_group_mask, l_table_name, 0))
+            l_ret_list = dap_list_prepend(l_ret_list, l_table_name);
+    }
+    PQclear(l_res);
     return l_ret_list;
 }
 
-size_t dap_db_driver_sqlite_read_count_store(const char *a_group, uint64_t a_id)
+size_t dap_db_driver_pgsql_read_count_store(const char *a_group, uint64_t a_id)
 {
-    sqlite3_stmt *l_res;
-    if(!a_group || ! s_db)
+    if (!a_group)
         return 0;
-
-    char * l_table_name = dap_db_driver_sqlite_make_table_name(a_group);
-    char *l_str_query = sqlite3_mprintf("SELECT COUNT(*) FROM '%s' WHERE id>='%lld'", l_table_name, a_id);
-    pthread_rwlock_wrlock(&s_db_rwlock);
-    int l_ret = dap_db_driver_sqlite_query(s_db, l_str_query, &l_res, NULL);
-    pthread_rwlock_unlock(&s_db_rwlock);
-    sqlite3_free(l_str_query);
-    DAP_DEL_Z(l_table_name);
-
-    if(l_ret != SQLITE_OK) {
-        //log_it(L_ERROR, "Count l_ret=%d, %s\n", sqlite3_errcode(s_db), sqlite3_errmsg(s_db));
+    PGconn *l_conn = s_pgsql_get_connection();
+    if (!l_conn) {
+        log_it(L_ERROR, "Can't pick PostgreSQL connection from pool");
         return 0;
     }
-    size_t l_ret_val;
-    SQLITE_ROW_VALUE *l_row = NULL;
-    if (dap_db_driver_sqlite_fetch_array(l_res, &l_row) == SQLITE_ROW && l_row) {
-        l_ret_val = (size_t)l_row->val->val.val_int64;
-        dap_db_driver_sqlite_row_free(l_row);
+    char *l_query_str=  dap_strdup_printf("SELECT count(*) FROM \"%s\" WHERE obj_id >= '%"DAP_UINT64_FORMAT_U"'",
+                                          a_group, a_id);
+    PGresult *l_res = PQexecParams(l_conn, l_query_str, 0, NULL, NULL, NULL, NULL, 1);
+    s_pgsql_free_connection(l_conn);
+    DAP_DELETE(l_query_str);
+    if (PQresultStatus(l_res) != PGRES_TUPLES_OK) {
+        log_it(L_ERROR, "Count objects failed with message: \"%s\"", PQresultErrorMessage(l_res));
+        PQclear(l_res);
+        return 0;
     }
-    dap_db_driver_sqlite_query_free(l_res);
-    return l_ret_val;
+    size_t l_ret = be64toh(*(uint64_t *)PQgetvalue(l_res, 0, 0));
+    PQclear(l_res);
+    return l_ret;
 }
 
-bool dap_db_driver_sqlite_is_obj(const char *a_group, const char *a_key)
+bool dap_db_driver_pgsql_is_obj(const char *a_group, const char *a_key)
 {
-    sqlite3_stmt *l_res;
-    if(!a_group || ! s_db)
-        return false;
-
-    char * l_table_name = dap_db_driver_sqlite_make_table_name(a_group);
-    char *l_str_query = sqlite3_mprintf("SELECT EXISTS(SELECT * FROM '%s' WHERE key='%s')", l_table_name, a_key);
-    pthread_rwlock_wrlock(&s_db_rwlock);
-    int l_ret = dap_db_driver_sqlite_query(s_db, l_str_query, &l_res, NULL);
-    pthread_rwlock_unlock(&s_db_rwlock);
-    sqlite3_free(l_str_query);
-    DAP_DEL_Z(l_table_name);
+    if (!a_group)
+        return NULL;
+    PGconn *l_conn = s_pgsql_get_connection();
+    if (!l_conn) {
+        log_it(L_ERROR, "Can't pick PostgreSQL connection from pool");
+        return NULL;
+    }
+    char *l_query_str = dap_strdup_printf("SELECT EXISTS(SELECT * FROM \"%s\" WHERE obj_key = '%s')", a_group, a_key);
+    PGresult *l_res = PQexecParams(l_conn, l_query_str, 0, NULL, NULL, NULL, NULL, 1);
+    s_pgsql_free_connection(l_conn);
+    DAP_DELETE(l_query_str);
+    if (PQresultStatus(l_res) != PGRES_TUPLES_OK) {
+        log_it(L_ERROR, "Count objects failed with message: \"%s\"", PQresultErrorMessage(l_res));
+        PQclear(l_res);
+        return 0;
+    }
+    int l_ret = *PQgetvalue(l_res, 0, 0);
+    PQclear(l_res);
+    return l_ret;
+}
 
-    if(l_ret != SQLITE_OK) {
-        //log_it(L_ERROR, "Exists l_ret=%d, %s\n", sqlite3_errcode(s_db), sqlite3_errmsg(s_db));
-        return false;
+int dap_db_driver_pgsql_flush()
+{
+    PGconn *l_conn = s_pgsql_get_connection();
+    if (!l_conn) {
+        log_it(L_ERROR, "Can't pick PostgreSQL connection from pool");
+        return -4;
     }
-    bool l_ret_val;
-    SQLITE_ROW_VALUE *l_row = NULL;
-    if (dap_db_driver_sqlite_fetch_array(l_res, &l_row) == SQLITE_ROW && l_row) {
-        l_ret_val = (size_t)l_row->val->val.val_int64;
-        dap_db_driver_sqlite_row_free(l_row);
+    int l_ret = 0;
+    PGresult *l_res = PQexec(l_conn, "CHECKPOINT");
+    if (PQresultStatus(l_res) != PGRES_COMMAND_OK) {
+        log_it(L_ERROR, "Flushing database on disk failed with message: \"%s\"", PQresultErrorMessage(l_res));
+        l_ret = -5;
     }
-    dap_db_driver_sqlite_query_free(l_res);
-    return l_ret_val;
+    PQclear(l_res);
+    if (!l_ret) {
+        PGresult *l_res = PQexec(l_conn, "VACUUM");
+        if (PQresultStatus(l_res) != PGRES_COMMAND_OK) {
+            log_it(L_ERROR, "Vaccuming database failed with message: \"%s\"", PQresultErrorMessage(l_res));
+            l_ret = -6;
+        }
+        PQclear(l_res);
+    }
+    s_pgsql_free_connection(l_conn);
+    return l_ret;
 }
-#endif
diff --git a/modules/global-db/include/dap_chain_global_db_driver_pgsql.h b/modules/global-db/include/dap_chain_global_db_driver_pgsql.h
index 59f25eae4e..d6c3beb800 100644
--- a/modules/global-db/include/dap_chain_global_db_driver_pgsql.h
+++ b/modules/global-db/include/dap_chain_global_db_driver_pgsql.h
@@ -14,3 +14,9 @@ int dap_db_driver_pgsql_start_transaction(void);
 int dap_db_driver_pgsql_end_transaction(void);
 int dap_db_driver_pgsql_apply_store_obj(dap_store_obj_t *a_store_obj);
 dap_store_obj_t *dap_db_driver_pgsql_read_store_obj(const char *a_group, const char *a_key, size_t *a_count_out);
+dap_store_obj_t *dap_db_driver_pgsql_read_last_store_obj(const char *a_group);
+dap_store_obj_t *dap_db_driver_pgsql_read_cond_store_obj(const char *a_group, uint64_t a_id, size_t *a_count_out);
+dap_list_t *dap_db_driver_pgsql_get_groups_by_mask(const char *a_group_mask);
+size_t dap_db_driver_pgsql_read_count_store(const char *a_group, uint64_t a_id);
+bool dap_db_driver_pgsql_is_obj(const char *a_group, const char *a_key);
+int dap_db_driver_pgsql_flush();
-- 
GitLab