/* * Authors: * Roman Khlopkov <roman.khlopkov@demlabs.net> * DeM Labs Inc. https://demlabs.net * CellFrame https://cellframe.net * Sources https://gitlab.demlabs.net/cellframe * Copyright (c) 2017-2021 * All rights reserved. This file is part of CellFrame SDK the open source project CellFrame SDK is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. CellFrame SDK is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with any CellFrame SDK based project. If not, see <http://www.gnu.org/licenses/>. */ #include <stddef.h> #include <string.h> #include <pthread.h> #include <errno.h> #include <pwd.h> #include <fcntl.h> #include <unistd.h> #include <sys/stat.h> #include "dap_common.h" #include "dap_hash.h" #include "dap_file_utils.h" #include "dap_strfuncs.h" #include "dap_file_utils.h" #include "dap_chain_global_db_driver_pgsql.h" #define LOG_TAG "db_pgsql" #ifdef DAP_CHAIN_GDB_ENGINE_PGSQL struct dap_pgsql_conn_pool_item { PGconn *conn; int busy; }; static PGconn *s_trans_conn = NULL; static struct dap_pgsql_conn_pool_item s_conn_pool[DAP_PGSQL_POOL_COUNT]; static pthread_rwlock_t s_db_rwlock = PTHREAD_RWLOCK_INITIALIZER; static PGconn *s_pgsql_get_connection(void) { if (pthread_rwlock_rdlock(&s_db_rwlock) == EDEADLK) { return s_trans_conn; } PGconn *l_ret = NULL; for (int i = 0; i < DAP_PGSQL_POOL_COUNT; i++) { if (!s_conn_pool[i].busy) { l_ret = s_conn_pool[i].conn; s_conn_pool[i].busy = 1; break; } } pthread_rwlock_unlock(&s_db_rwlock); return l_ret; } static void s_pgsql_free_connection(PGconn *a_conn) { if (pthread_rwlock_rdlock(&s_db_rwlock) == EDEADLK) { return; } for (int i = 0; i < DAP_PGSQL_POOL_COUNT; i++) { if (s_conn_pool[i].conn == a_conn) { s_conn_pool[i].busy = 0; } } pthread_rwlock_unlock(&s_db_rwlock); } /** * SQLite library initialization, no thread safe * * return 0 if Ok, else error code >0 */ int dap_db_driver_pgsql_init(const char *a_filename_dir, dap_db_driver_callbacks_t *a_drv_callback) { dap_hash_fast_t l_dir_hash; dap_hash_fast(a_filename_dir, strlen(a_filename_dir), &l_dir_hash); char l_db_name[DAP_PGSQL_DBHASHNAME_LEN + 1]; dap_htoa64(l_db_name, l_dir_hash.raw, DAP_PGSQL_DBHASHNAME_LEN); l_db_name[DAP_PGSQL_DBHASHNAME_LEN] = '\0'; if (!dap_dir_test(a_filename_dir) || !readdir(opendir(a_filename_dir))) { // Create PostgreSQL database const char *l_base_conn_str = "dbname = postgres"; PGconn *l_base_conn = PQconnectdb(l_base_conn_str); if (PQstatus(l_base_conn) != CONNECTION_OK) { log_it(L_ERROR, "Can't init PostgreSQL database: \"%s\"", PQerrorMessage(l_base_conn)); PQfinish(l_base_conn); return -2; } char *l_query_str = dap_strdup_printf("DROP DATABASE IF EXISTS \"%s\"", l_db_name); PGresult *l_res = PQexec(l_base_conn, l_query_str); DAP_DELETE(l_query_str); if (PQresultStatus(l_res) != PGRES_COMMAND_OK) { log_it(L_ERROR, "Drop database failed: \"%s\"", PQresultErrorMessage(l_res)); PQclear(l_res); PQfinish(l_base_conn); return -3; } PQclear(l_res); l_query_str = dap_strdup_printf("DROP TABLESPACE IF EXISTS \"%s\"", l_db_name); l_res = PQexec(l_base_conn, l_query_str); DAP_DELETE(l_query_str); if (PQresultStatus(l_res) != PGRES_COMMAND_OK) { log_it(L_ERROR, "Drop tablespace failed with message: \"%s\"", PQresultErrorMessage(l_res)); PQclear(l_res); PQfinish(l_base_conn); return -4; } PQclear(l_res); // Check paths and create them if nessesary if (!dap_dir_test(a_filename_dir)) { log_it(L_NOTICE, "No directory %s, trying to create...", a_filename_dir); dap_mkdir_with_parents(a_filename_dir); if (!dap_dir_test(a_filename_dir)) { char l_errbuf[255]; l_errbuf[0] = '\0'; strerror_r(errno, l_errbuf, sizeof(l_errbuf)); log_it(L_ERROR, "Can't create directory, error code %d, error string \"%s\"", errno, l_errbuf); return -1; } log_it(L_NOTICE,"Directory created"); chown(a_filename_dir, getpwnam("postgres")->pw_uid, -1); } l_query_str = dap_strdup_printf("CREATE TABLESPACE \"%s\" LOCATION '%s'", l_db_name, a_filename_dir); l_res = PQexec(l_base_conn, l_query_str); DAP_DELETE(l_query_str); if (PQresultStatus(l_res) != PGRES_COMMAND_OK) { log_it(L_ERROR, "Create tablespace failed with message: \"%s\"", PQresultErrorMessage(l_res)); PQclear(l_res); PQfinish(l_base_conn); return -5; } chmod(a_filename_dir, S_IRWXU | S_IRWXG | S_IRWXO); PQclear(l_res); l_query_str = dap_strdup_printf("CREATE DATABASE \"%s\" WITH TABLESPACE \"%s\"", l_db_name, l_db_name); l_res = PQexec(l_base_conn, l_query_str); DAP_DELETE(l_query_str); if (PQresultStatus(l_res) != PGRES_COMMAND_OK) { log_it(L_ERROR, "Create database failed with message: \"%s\"", PQresultErrorMessage(l_res)); PQclear(l_res); PQfinish(l_base_conn); return -6; } PQclear(l_res); PQfinish(l_base_conn); } // Create connection pool for the DAP database char *l_conn_str = dap_strdup_printf("dbname = %s", l_db_name); for (int i = 0; i < DAP_PGSQL_POOL_COUNT; i++) { s_conn_pool[i].conn = PQconnectdb(l_conn_str); s_conn_pool[i].busy = 0; if (PQstatus(s_conn_pool[i].conn) != CONNECTION_OK) { log_it(L_ERROR, "Can't connect PostgreSQL database: \"%s\"", PQerrorMessage(s_conn_pool[i].conn)); DAP_DELETE(l_conn_str); for (int j = 0; j <= i; j++) PQfinish(s_conn_pool[j].conn); return -7; } } DAP_DELETE(l_conn_str); pthread_rwlock_init(&s_db_rwlock, 0); a_drv_callback->transaction_start = dap_db_driver_pgsql_start_transaction; a_drv_callback->transaction_end = dap_db_driver_pgsql_end_transaction; a_drv_callback->apply_store_obj = dap_db_driver_pgsql_apply_store_obj; a_drv_callback->read_store_obj = dap_db_driver_pgsql_read_store_obj; a_drv_callback->read_cond_store_obj = dap_db_driver_pgsql_read_cond_store_obj; a_drv_callback->read_last_store_obj = dap_db_driver_pgsql_read_last_store_obj; a_drv_callback->get_groups_by_mask = dap_db_driver_pgsql_get_groups_by_mask; a_drv_callback->read_count_store = dap_db_driver_pgsql_read_count_store; a_drv_callback->is_obj = dap_db_driver_pgsql_is_obj; a_drv_callback->deinit = dap_db_driver_pgsql_deinit; a_drv_callback->flush = dap_db_driver_pgsql_flush; return 0; } int dap_db_driver_pgsql_deinit(void) { pthread_rwlock_wrlock(&s_db_rwlock); for (int j = 0; j <= DAP_PGSQL_POOL_COUNT; j++) PQfinish(s_conn_pool[j].conn); pthread_rwlock_unlock(&s_db_rwlock); pthread_rwlock_destroy(&s_db_rwlock); return 0; } /** * Start a transaction */ int dap_db_driver_pgsql_start_transaction(void) { s_trans_conn = s_pgsql_get_connection(); if (!s_trans_conn) return -1; pthread_rwlock_rdlock(&s_db_rwlock); PGresult *l_res = PQexec(s_trans_conn, "BEGIN"); if (PQresultStatus(l_res) != PGRES_COMMAND_OK) { log_it(L_ERROR, "Begin transaction failed with message: \"%s\"", PQresultErrorMessage(l_res)); pthread_rwlock_unlock(&s_db_rwlock); s_pgsql_free_connection(s_trans_conn); s_trans_conn = NULL; } return 0; } /** * End of transaction */ int dap_db_driver_pgsql_end_transaction(void) { if (s_trans_conn) return -1; PGresult *l_res = PQexec(s_trans_conn, "COMMIT"); if (PQresultStatus(l_res) != PGRES_COMMAND_OK) { log_it(L_ERROR, "End transaction failed with message: \"%s\"", PQresultErrorMessage(l_res)); } pthread_rwlock_unlock(&s_db_rwlock); s_pgsql_free_connection(s_trans_conn); s_trans_conn = NULL; return 0; } /** * Create table * * return 0 if Ok, else error code */ static int s_pgsql_create_group_table(const char *a_table_name, PGconn *a_conn) { if (!a_table_name) return -1; int l_ret = 0; char *l_query_str = dap_strdup_printf("CREATE TABLE \"%s\"" "(obj_id BIGSERIAL PRIMARY KEY, obj_ts BIGINT, " "obj_key TEXT UNIQUE, obj_val BYTEA)", a_table_name); PGresult *l_res = PQexec(a_conn, l_query_str); DAP_DELETE(l_query_str); if (PQresultStatus(l_res) != PGRES_COMMAND_OK) { log_it(L_ERROR, "Create table failed with message: \"%s\"", PQresultErrorMessage(l_res)); l_ret = -3; } PQclear(l_res); return l_ret; } /** * Apply data (write or delete) * */ int dap_db_driver_pgsql_apply_store_obj(dap_store_obj_t *a_store_obj) { if (!a_store_obj || !a_store_obj->group) return -1; char *l_query_str = NULL; int l_ret = 0; PGresult *l_res = NULL; PGconn *l_conn = s_pgsql_get_connection(); if (!l_conn) { log_it(L_ERROR, "Can't pick PostgreSQL connection from pool"); return -2; } if (a_store_obj->type == 'a') { const char *l_param_vals[2]; time_t l_ts_to_store = htobe64(a_store_obj->timestamp); l_param_vals[0] = (const char *)&l_ts_to_store; l_param_vals[1] = (const char *)a_store_obj->value; int l_param_lens[2] = {sizeof(time_t), a_store_obj->value_len}; int l_param_formats[2] = {1, 1}; l_query_str = dap_strdup_printf("INSERT INTO \"%s\" (obj_ts, obj_key, obj_val) VALUES ($1, '%s', $2) " "ON CONFLICT (obj_key) DO UPDATE SET " "obj_id = EXCLUDED.obj_id, obj_ts = EXCLUDED.obj_ts, obj_val = EXCLUDED.obj_val;", a_store_obj->group, a_store_obj->key); // execute add request l_res = PQexecParams(l_conn, l_query_str, 2, NULL, l_param_vals, l_param_lens, l_param_formats, 0); DAP_DELETE(a_store_obj->value); DAP_DELETE(a_store_obj->key); if (PQresultStatus(l_res) != PGRES_COMMAND_OK) { if (a_store_obj->type == 'a' && s_pgsql_create_group_table(a_store_obj->group, l_conn) == 0) { PQclear(l_res); l_res = PQexecParams(l_conn, l_query_str, 2, NULL, l_param_vals, l_param_lens, l_param_formats, 0); } if (PQresultStatus(l_res) != PGRES_COMMAND_OK) { log_it(L_ERROR, "Add object failed with message: \"%s\"", PQresultErrorMessage(l_res)); l_ret = -3; } } } else if (a_store_obj->type == 'd') { // delete one record if (a_store_obj->key) l_query_str = dap_strdup_printf("DELETE FROM \"%s\" WHERE obj_key = '%s'", a_store_obj->group, a_store_obj->key); // remove all group else l_query_str = dap_strdup_printf("DROP TABLE \"%s\"", a_store_obj->group); DAP_DELETE(a_store_obj->key); // execute delete request l_res = PQexec(l_conn, l_query_str); if (PQresultStatus(l_res) != PGRES_COMMAND_OK) { if (strcmp(PQresultErrorField(l_res, PG_DIAG_SQLSTATE), PGSQL_INVALID_TABLE)) log_it(L_ERROR, "Delete object failed with message: \"%s\"", PQresultErrorMessage(l_res)); l_ret = -4; } } else { log_it(L_ERROR, "Unknown store_obj type '0x%x'", a_store_obj->type); s_pgsql_free_connection(l_conn); return -5; } DAP_DELETE(l_query_str); PQclear(l_res); s_pgsql_free_connection(l_conn); return l_ret; } static void s_pgsql_fill_object(const char *a_group, dap_store_obj_t *a_obj, PGresult *a_res, int a_row) { a_obj->group = dap_strdup(a_group); for (int i = 0; i < PQnfields(a_res); i++) { if (i == PQfnumber(a_res, "obj_id")) { a_obj->id = be64toh(*(uint64_t *)PQgetvalue(a_res, a_row, i)); } else if (i == PQfnumber(a_res, "obj_ts")) { a_obj->timestamp = be64toh(*(time_t *)PQgetvalue(a_res, a_row, i)); } else if ((i == PQfnumber(a_res, "obj_key"))) { a_obj->key = dap_strdup(PQgetvalue(a_res, a_row, i)); } else if ((i == PQfnumber(a_res, "obj_val"))) { a_obj->value_len = PQgetlength(a_res, a_row, i); a_obj->value = DAP_DUP_SIZE(PQgetvalue(a_res, a_row, i), a_obj->value_len); } } } /** * Read several items * * a_group - group name * a_key - key name, may by NULL, it means reading the whole group * a_count_out[in], how many items to read, 0 - no limits * a_count_out[out], how many items was read */ dap_store_obj_t *dap_db_driver_pgsql_read_store_obj(const char *a_group, const char *a_key, size_t *a_count_out) { if (!a_group) return NULL; PGconn *l_conn = s_pgsql_get_connection(); if (!l_conn) { log_it(L_ERROR, "Can't pick PostgreSQL connection from pool"); return NULL; } char *l_query_str; if (a_key) { l_query_str = dap_strdup_printf("SELECT * FROM \"%s\" WHERE obj_key = '%s'", a_group, a_key); } else { if (a_count_out && *a_count_out) l_query_str = dap_strdup_printf("SELECT * FROM \"%s\" ORDER BY obj_id ASC LIMIT %d", a_group, *a_count_out); else l_query_str = dap_strdup_printf("SELECT * FROM \"%s\" ORDER BY obj_id ASC", a_group); } PGresult *l_res = PQexecParams(l_conn, l_query_str, 0, NULL, NULL, NULL, NULL, 1); s_pgsql_free_connection(l_conn); DAP_DELETE(l_query_str); if (PQresultStatus(l_res) != PGRES_TUPLES_OK) { if (strcmp(PQresultErrorField(l_res, PG_DIAG_SQLSTATE), PGSQL_INVALID_TABLE)) log_it(L_ERROR, "Read objects failed with message: \"%s\"", PQresultErrorMessage(l_res)); PQclear(l_res); return NULL; } // parse reply size_t l_count = PQntuples(l_res); dap_store_obj_t *l_obj = l_count ? DAP_NEW_Z_SIZE(dap_store_obj_t, sizeof(dap_store_obj_t) * l_count) : NULL; for (int i = 0; i < l_count; i++) { // fill currrent item dap_store_obj_t *l_obj_cur = l_obj + i; s_pgsql_fill_object(a_group, l_obj_cur, l_res, i); } PQclear(l_res); if (a_count_out) *a_count_out = l_count; return l_obj; } /** * Read last item * * a_group - group name */ dap_store_obj_t *dap_db_driver_pgsql_read_last_store_obj(const char *a_group) { if (!a_group) return NULL; PGconn *l_conn = s_pgsql_get_connection(); if (!l_conn) { log_it(L_ERROR, "Can't pick PostgreSQL connection from pool"); return NULL; } char *l_query_str = dap_strdup_printf("SELECT * FROM \"%s\" ORDER BY obj_id DESC LIMIT 1", a_group); PGresult *l_res = PQexecParams(l_conn, l_query_str, 0, NULL, NULL, NULL, NULL, 1); s_pgsql_free_connection(l_conn); DAP_DELETE(l_query_str); if (PQresultStatus(l_res) != PGRES_TUPLES_OK) { if (strcmp(PQresultErrorField(l_res, PG_DIAG_SQLSTATE), PGSQL_INVALID_TABLE)) log_it(L_ERROR, "Read last object failed with message: \"%s\"", PQresultErrorMessage(l_res)); PQclear(l_res); return NULL; } dap_store_obj_t *l_obj = NULL; if (PQntuples(l_res)) { l_obj = DAP_NEW_Z(dap_store_obj_t); s_pgsql_fill_object(a_group, l_obj, l_res, 0); } PQclear(l_res); return l_obj; } /** * Read several items with conditoin * * a_group - group name * a_id - read from this id * a_count_out[in], how many items to read, 0 - no limits * a_count_out[out], how many items was read */ dap_store_obj_t *dap_db_driver_pgsql_read_cond_store_obj(const char *a_group, uint64_t a_id, size_t *a_count_out) { if (!a_group) return NULL; PGconn *l_conn = s_pgsql_get_connection(); if (!l_conn) { log_it(L_ERROR, "Can't pick PostgreSQL connection from pool"); return NULL; } char *l_query_str; if (a_count_out && *a_count_out) l_query_str = dap_strdup_printf("SELECT * FROM \"%s\" WHERE obj_id >= '%"DAP_UINT64_FORMAT_U"' " "ORDER BY obj_id ASC LIMIT %d", a_group, a_id, *a_count_out); else l_query_str = dap_strdup_printf("SELECT * FROM \"%s\" WHERE obj_id >= '%"DAP_UINT64_FORMAT_U"' " "ORDER BY obj_id ASC", a_group, a_id); PGresult *l_res = PQexecParams(l_conn, l_query_str, 0, NULL, NULL, NULL, NULL, 1); s_pgsql_free_connection(l_conn); DAP_DELETE(l_query_str); if (PQresultStatus(l_res) != PGRES_TUPLES_OK) { if (strcmp(PQresultErrorField(l_res, PG_DIAG_SQLSTATE), PGSQL_INVALID_TABLE)) log_it(L_ERROR, "Conditional read objects failed with message: \"%s\"", PQresultErrorMessage(l_res)); PQclear(l_res); return NULL; } // parse reply size_t l_count = PQntuples(l_res); dap_store_obj_t *l_obj = l_count ? DAP_NEW_Z_SIZE(dap_store_obj_t, sizeof(dap_store_obj_t) * l_count) : NULL; for (int i = 0; i < l_count; i++) { // fill currrent item dap_store_obj_t *l_obj_cur = l_obj + i; s_pgsql_fill_object(a_group, l_obj_cur, l_res, i); } PQclear(l_res); if (a_count_out) *a_count_out = l_count; return l_obj; } dap_list_t *dap_db_driver_pgsql_get_groups_by_mask(const char *a_group_mask) { if (!a_group_mask) return NULL; PGconn *l_conn = s_pgsql_get_connection(); if (!l_conn) { log_it(L_ERROR, "Can't pick PostgreSQL connection from pool"); return NULL; } const char *l_query_str = "SELECT tablename FROM pg_catalog.pg_tables WHERE " "schemaname != 'information_schema' AND schemaname != 'pg_catalog'"; PGresult *l_res = PQexec(l_conn, l_query_str); s_pgsql_free_connection(l_conn); if (PQresultStatus(l_res) != PGRES_TUPLES_OK) { log_it(L_ERROR, "Read tables failed with message: \"%s\"", PQresultErrorMessage(l_res)); PQclear(l_res); return NULL; } dap_list_t *l_ret_list = NULL; for (int i = 0; i < PQntuples(l_res); i++) { char *l_table_name = (char *)PQgetvalue(l_res, i, 0); if(!dap_fnmatch(a_group_mask, l_table_name, 0)) l_ret_list = dap_list_prepend(l_ret_list, dap_strdup(l_table_name)); } PQclear(l_res); return l_ret_list; } size_t dap_db_driver_pgsql_read_count_store(const char *a_group, uint64_t a_id) { if (!a_group) return 0; PGconn *l_conn = s_pgsql_get_connection(); if (!l_conn) { log_it(L_ERROR, "Can't pick PostgreSQL connection from pool"); return 0; } char *l_query_str = dap_strdup_printf("SELECT count(*) FROM \"%s\" WHERE obj_id >= '%"DAP_UINT64_FORMAT_U"'", a_group, a_id); PGresult *l_res = PQexecParams(l_conn, l_query_str, 0, NULL, NULL, NULL, NULL, 1); s_pgsql_free_connection(l_conn); DAP_DELETE(l_query_str); if (PQresultStatus(l_res) != PGRES_TUPLES_OK) { if (strcmp(PQresultErrorField(l_res, PG_DIAG_SQLSTATE), PGSQL_INVALID_TABLE)) log_it(L_ERROR, "Count objects failed with message: \"%s\"", PQresultErrorMessage(l_res)); PQclear(l_res); return 0; } size_t l_ret = be64toh(*(uint64_t *)PQgetvalue(l_res, 0, 0)); PQclear(l_res); return l_ret; } bool dap_db_driver_pgsql_is_obj(const char *a_group, const char *a_key) { if (!a_group) return NULL; PGconn *l_conn = s_pgsql_get_connection(); if (!l_conn) { log_it(L_ERROR, "Can't pick PostgreSQL connection from pool"); return NULL; } char *l_query_str = dap_strdup_printf("SELECT EXISTS(SELECT * FROM \"%s\" WHERE obj_key = '%s')", a_group, a_key); PGresult *l_res = PQexecParams(l_conn, l_query_str, 0, NULL, NULL, NULL, NULL, 1); s_pgsql_free_connection(l_conn); DAP_DELETE(l_query_str); if (PQresultStatus(l_res) != PGRES_TUPLES_OK) { if (strcmp(PQresultErrorField(l_res, PG_DIAG_SQLSTATE), PGSQL_INVALID_TABLE)) log_it(L_ERROR, "Existance check of object failed with message: \"%s\"", PQresultErrorMessage(l_res)); PQclear(l_res); return 0; } int l_ret = *PQgetvalue(l_res, 0, 0); PQclear(l_res); return l_ret; } int dap_db_driver_pgsql_flush() { PGconn *l_conn = s_pgsql_get_connection(); if (!l_conn) { log_it(L_ERROR, "Can't pick PostgreSQL connection from pool"); return -4; } int l_ret = 0; PGresult *l_res = PQexec(l_conn, "CHECKPOINT"); if (PQresultStatus(l_res) != PGRES_COMMAND_OK) { log_it(L_ERROR, "Flushing database on disk failed with message: \"%s\"", PQresultErrorMessage(l_res)); l_ret = -5; } PQclear(l_res); if (!l_ret) { PGresult *l_res = PQexec(l_conn, "VACUUM"); if (PQresultStatus(l_res) != PGRES_COMMAND_OK) { log_it(L_ERROR, "Vaccuming database failed with message: \"%s\"", PQresultErrorMessage(l_res)); l_ret = -6; } PQclear(l_res); } s_pgsql_free_connection(l_conn); return l_ret; } #endif