-
Roman Khlopkov authored91f94f85
dap_chain_global_db_driver.c 20.02 KiB
/*
* Authors:
* Alexander Lysikov <alexander.lysikov@demlabs.net>
* DeM Labs Inc. https://demlabs.net
* Kelvin Project https://github.com/kelvinblockchain
* Copyright (c) 2019
* All rights reserved.
This file is part of DAP (Deus Applications Prototypes) the open source project
DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
DAP is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with any DAP based project. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stddef.h>
#include <errno.h>
#include <stdint.h>
#include <string.h>
#include <pthread.h>
#include <assert.h>
#include "dap_common.h"
#include "dap_file_utils.h"
#include "dap_strfuncs.h"
#include "dap_list.h"
#include "dap_hash.h"
#include "dap_chain_global_db_driver_sqlite.h"
#include "dap_chain_global_db_driver_cdb.h"
#include "dap_chain_global_db_driver_mdbx.h"
#include "dap_chain_global_db_driver_pgsql.h"
#include "dap_chain_global_db_driver.h"
#define LOG_TAG "db_driver"
// A selected database driver.
static char *s_used_driver = NULL;
//#define USE_WRITE_BUFFER
#ifdef USE_WRITE_BUFFER
static int save_write_buf(void);
// for write buffer
pthread_mutex_t s_mutex_add_start = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t s_mutex_add_end = PTHREAD_MUTEX_INITIALIZER;
//pthread_rwlock_rdlock
// new data in buffer to write
pthread_mutex_t s_mutex_cond = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t s_cond_add_end; // = PTHREAD_COND_INITIALIZER;
// writing ended
pthread_mutex_t s_mutex_write_end = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t s_cond_write_end; // = PTHREAD_COND_INITIALIZER;
dap_list_t *s_list_begin = NULL;
dap_list_t *s_list_end = NULL;
pthread_t s_write_buf_thread;
volatile static bool s_write_buf_state = 0;
static void* func_write_buf(void * arg);
#endif //USE_WRITE_BUFFER
static dap_db_driver_callbacks_t s_drv_callback;
/**
* @brief Initializes a database driver.
* @note You should Call this function before using the driver.
* @param driver_name a string determining a type of database driver:
* "сdb", "sqlite" ("sqlite3") or "pgsql"
* @param a_filename_db a path to a database file
* @return Returns 0, if successful; otherwise <0.
*/
int dap_db_driver_init(const char *a_driver_name, const char *a_filename_db)
{
int l_ret = -1;
if(s_used_driver)
dap_db_driver_deinit();
// Fill callbacks with zeros
memset(&s_drv_callback, 0, sizeof(dap_db_driver_callbacks_t));
// Setup driver name
s_used_driver = dap_strdup(a_driver_name);
dap_mkdir_with_parents(a_filename_db);
// Compose path
char l_db_path_ext[strlen(a_driver_name) + strlen(a_filename_db) + 6];
dap_snprintf(l_db_path_ext, sizeof(l_db_path_ext), "%s/gdb-%s", a_filename_db, a_driver_name);
// Check for engine
if(!dap_strcmp(s_used_driver, "ldb"))
l_ret = -1;
else if(!dap_strcmp(s_used_driver, "sqlite") || !dap_strcmp(s_used_driver, "sqlite3") )
l_ret = dap_db_driver_sqlite_init(l_db_path_ext, &s_drv_callback);
else if(!dap_strcmp(s_used_driver, "cdb"))
l_ret = dap_db_driver_cdb_init(l_db_path_ext, &s_drv_callback);
#ifdef DAP_CHAIN_GDB_ENGINE_MDBX
else if(!dap_strcmp(s_used_driver, "mdbx"))
l_ret = dap_db_driver_mdbx_init(l_db_path_ext, &s_drv_callback);
#endif
#ifdef DAP_CHAIN_GDB_ENGINE_PGSQL
else if(!dap_strcmp(s_used_driver, "pgsql"))
l_ret = dap_db_driver_pgsql_init(l_db_path_ext, &s_drv_callback);
#endif
else
log_it(L_ERROR, "Unknown global_db driver \"%s\"", a_driver_name);
#ifdef USE_WRITE_BUFFER
if(!l_ret) {
pthread_condattr_t l_condattr;
pthread_condattr_init(&l_condattr);
pthread_condattr_setclock(&l_condattr, CLOCK_MONOTONIC);
pthread_cond_init(&s_cond_add_end, &l_condattr);
pthread_cond_init(&s_cond_write_end, &l_condattr);
// thread for save buffer to database
s_write_buf_state = true;
pthread_create(&s_write_buf_thread, NULL, func_write_buf, NULL);
}
#endif
return l_ret;
}
/**
* @brief Deinitializes a database driver.
* @note You should call this function after using the driver.
* @return (none)
*/
void dap_db_driver_deinit(void)
{
#ifdef USE_WRITE_BUFFER
// wait for close thread
{
pthread_mutex_lock(&s_mutex_cond);
pthread_cond_broadcast(&s_cond_add_end);
pthread_mutex_unlock(&s_mutex_cond);
s_write_buf_state = false;
pthread_join(s_write_buf_thread, NULL);
}
//save_write_buf();
pthread_mutex_lock(&s_mutex_add_end);
pthread_mutex_lock(&s_mutex_add_start);
while(s_list_begin != s_list_end) {
// free memory
dap_store_obj_free((dap_store_obj_t*) s_list_begin->data, 1);
dap_list_free1(s_list_begin);
s_list_begin = dap_list_next(s_list_begin);
}
//dap_store_obj_free((dap_store_obj_t*) s_list_begin->data, 1);
dap_list_free1(s_list_begin);
s_list_begin = s_list_end = NULL;
pthread_mutex_unlock(&s_mutex_add_start);
pthread_mutex_unlock(&s_mutex_add_end);
pthread_cond_destroy(&s_cond_add_end);
#endif
// deinit driver
if(s_drv_callback.deinit)
s_drv_callback.deinit();
if(s_used_driver){
DAP_DELETE(s_used_driver);
s_used_driver = NULL;
}
}
/**
* @brief Flushes a database cahce to disk.
* @return Returns 0, if successful; otherwise <0.
*/
int dap_db_driver_flush(void)
{
return s_drv_callback.flush();
}
/**
* @brief Copies objects from a_store_obj.
* @param a_store_obj a pointer to the source objects
* @param a_store_count a number of objects
* @return A pointer to the copied objects.
*/
dap_store_obj_t* dap_store_obj_copy(dap_store_obj_t *a_store_obj, size_t a_store_count)
{
if(!a_store_obj || !a_store_count)
return NULL;
dap_store_obj_t *l_store_obj = DAP_NEW_SIZE(dap_store_obj_t, sizeof(dap_store_obj_t) * a_store_count);
for(size_t i = 0; i < a_store_count; i++) {
dap_store_obj_t *l_store_obj_dst = l_store_obj + i;
dap_store_obj_t *l_store_obj_src = a_store_obj + i;
memcpy(l_store_obj_dst, l_store_obj_src, sizeof(dap_store_obj_t));
l_store_obj_dst->group = dap_strdup(l_store_obj_src->group);
l_store_obj_dst->key = dap_strdup(l_store_obj_src->key);
l_store_obj_dst->value = DAP_DUP_SIZE(l_store_obj_src->value, l_store_obj_src->value_len);
}
return l_store_obj;
}
/**
* @brief Deallocates memory of objects.
* @param a_store_obj a pointer to objects
* @param a_store_count a number of objects
* @return (none)
*/
void dap_store_obj_free(dap_store_obj_t *a_store_obj, size_t a_store_count)
{
if(!a_store_obj)
return;
for(size_t i = 0; i < a_store_count; i++) {
dap_store_obj_t *l_store_obj_cur = a_store_obj + i;
DAP_DELETE(l_store_obj_cur->group);
DAP_DELETE(l_store_obj_cur->key);
DAP_DELETE(l_store_obj_cur->value);
}
DAP_DELETE(a_store_obj);
}
/**
* @brief Calculates a hash of data.
* @param data a pointer to data
* @param data_size a size of data
* @return Returns a hash string if successful; otherwise NULL.
*/
char* dap_chain_global_db_driver_hash(const uint8_t *data, size_t data_size)
{
if(!data || !data_size)
return NULL;
dap_chain_hash_fast_t l_hash;
memset(&l_hash, 0, sizeof(dap_chain_hash_fast_t));
dap_hash_fast(data, data_size, &l_hash);
size_t a_str_max = (sizeof(l_hash.raw) + 1) * 2 + 2; /* heading 0x */
char *a_str = DAP_NEW_Z_SIZE(char, a_str_max);
size_t hash_len = (size_t)dap_chain_hash_fast_to_str(&l_hash, a_str, a_str_max);
if(!hash_len) {
DAP_DELETE(a_str);
return NULL;
}
return a_str;
}
/**
* @brief Waits for a buffer to be written.
* @param a_mutex a mutex
* @param a_cond a condition
* @param l_timeout_ms timeout in ms, if -1 endless waiting
* @return Returns 0 if successful or 1 when the timeout is due.
*/
static int wait_data(pthread_mutex_t *a_mutex, pthread_cond_t *a_cond, int l_timeout_ms)
{
int l_res = 0;
pthread_mutex_lock(a_mutex);
// endless waiting
if(l_timeout_ms == -1)
l_res = pthread_cond_wait(a_cond, a_mutex);
// waiting no more than timeout in milliseconds
else {
struct timespec l_to;
clock_gettime(CLOCK_MONOTONIC, &l_to);
int64_t l_nsec_new = l_to.tv_nsec + l_timeout_ms * 1000000ll;
// if the new number of nanoseconds is more than a second
if(l_nsec_new > (long) 1e9) {
l_to.tv_sec += l_nsec_new / (long) 1e9;
l_to.tv_nsec = l_nsec_new % (long) 1e9;
}
else
l_to.tv_nsec = (long) l_nsec_new;
l_res = pthread_cond_timedwait(a_cond, a_mutex, &l_to);
}
pthread_mutex_unlock(a_mutex);
if(l_res == ETIMEDOUT)
return 1;
return l_res;
}
#ifdef USE_WRITE_BUFFER
/**
* @brief Checks if a buffer is not empty.
* @return Returns true if the buffer is not empty, false if it is empty.
*/
static bool check_fill_buf(void)
{
dap_list_t *l_list_begin;
dap_list_t *l_list_end;
pthread_mutex_lock(&s_mutex_add_start);
pthread_mutex_lock(&s_mutex_add_end);
l_list_end = s_list_end;
l_list_begin = s_list_begin;
pthread_mutex_unlock(&s_mutex_add_end);
pthread_mutex_unlock(&s_mutex_add_start);
bool l_ret = (l_list_begin != l_list_end) ? 1 : 0;
// if(l_ret)
// printf("** Wait s_beg=0x%x s_end=0x%x \n", l_list_begin, l_list_end);
return l_ret;
}
/**
* @brief Waits until the buffer is not empty.
* @return (none)
*/
static void wait_write_buf()
{
// printf("** Start wait data\n");
// wait data
while(1) {
if(!check_fill_buf())
break;
if(!wait_data(&s_mutex_write_end, &s_cond_write_end, 50))
break;
}
// printf("** End wait data\n");
}
/**
* @brief Saves data from a buffer to a database.
* @return 0
*/
static int save_write_buf(void)
{
dap_list_t *l_list_end;
// fix end of buffer
pthread_mutex_lock(&s_mutex_add_end);
l_list_end = s_list_end;
pthread_mutex_unlock(&s_mutex_add_end);
// save data from begin to fixed end
pthread_mutex_lock(&s_mutex_add_start);
if(s_list_begin != l_list_end) {
if(s_drv_callback.transaction_start)
s_drv_callback.transaction_start();
int cnt = 0;
while(s_list_begin != l_list_end) {
// apply to database
dap_store_obj_t *l_obj = s_list_begin->data;
assert(l_obj);
if(s_drv_callback.apply_store_obj) {
int l_ret_tmp = s_drv_callback.apply_store_obj(l_obj);
if(l_ret_tmp == 1) {
log_it(L_INFO, "item is missing (may be already deleted) %s/%s\n", l_obj->group, l_obj->key);
l_ret = 1;
}
if(l_ret_tmp < 0) {
log_it(L_ERROR, "Can't write item %s/%s\n", l_obj->group, l_obj->key);
l_ret -= 1;
}
/*if(!s_drv_callback.apply_store_obj(l_obj)) {
//log_it(L_INFO, "Write item Ok %s/%s\n", l_obj->group, l_obj->key);
}
else {
log_it(L_ERROR, "Can't write item %s/%s\n", l_obj->group, l_obj->key);
}*/
}
s_list_begin = dap_list_next(s_list_begin);
// printf("** ap2*record *l_beg=0x%x l_nex=0x%x d_beg=0x%x l_end=0x%x d_end=0x%x sl_end=0x%x\n", s_list_begin,
// s_list_begin->next, s_list_begin->data, l_list_end, l_list_end->data, s_list_end);
//printf("** free data=0x%x list=0x%x\n", s_list_begin->prev->data, s_list_begin->prev);
// free memory
dap_store_obj_free((dap_store_obj_t*) s_list_begin->prev->data, 1);
dap_list_free1(s_list_begin->prev);
s_list_begin->prev = NULL;
cnt++;
}
if(s_drv_callback.transaction_end)
s_drv_callback.transaction_end();
//printf("** writing ended cnt=%d\n", cnt);
// writing ended
pthread_mutex_lock(&s_mutex_write_end);
pthread_cond_broadcast(&s_cond_write_end);
pthread_mutex_unlock(&s_mutex_write_end);
}
pthread_mutex_unlock(&s_mutex_add_start);
return 0;
}
/**
* @brief A thread for saving data from buffer to a database
* @param arg NULL, is not used
* @return NULL
*/
static void* func_write_buf(void * arg)
{
while(1) {
if(!s_write_buf_state)
break;
//save_write_buf
if(save_write_buf() == 0) {
if(!s_write_buf_state)
break;
// wait data
wait_data(&s_mutex_cond, &s_cond_add_end, 2000); // 2 sec
}
}
return NULL;
}
#endif //USE_WRITE_BUFFER
/**
* @brief Applies objects to database.
* @param a_store an pointer to the objects
* @param a_store_count a number of objectss
* @return Returns 0, if successful.
*/
int dap_chain_global_db_driver_appy(pdap_store_obj_t a_store_obj, size_t a_store_count)
{
//dap_store_obj_t *l_store_obj = dap_store_obj_copy(a_store_obj, a_store_count);
if(!a_store_obj || !a_store_count)
return -1;
#ifdef USE_WRITE_BUFFER
// add all records into write buffer
pthread_mutex_lock(&s_mutex_add_end);
for(size_t i = 0; i < a_store_count; i++) {
dap_store_obj_t *l_store_obj_cur = dap_store_obj_copy(a_store_obj + i, 1);
// first record in buf
if(!s_list_end) {
s_list_end = dap_list_append(s_list_end, l_store_obj_cur);
pthread_mutex_lock(&s_mutex_add_start);
s_list_begin = s_list_end;
pthread_mutex_unlock(&s_mutex_add_start);
//log_it(L_DEBUG,"First record in list: *!!add record=0x%x / 0x%x obj=0x%x / 0x%x\n", s_list_end, s_list_end->data, s_list_end->prev);
}
else
s_list_end->data = l_store_obj_cur;
dap_list_append(s_list_end, NULL);
s_list_end = dap_list_last(s_list_end);
//log_it(L_DEBUG, "**+add record l_cur=0x%x / 0x%x l_new=0x%x / 0x%x\n", s_list_end->prev, s_list_end->prev->data,s_list_end, s_list_end->data);
}
// buffer changed
pthread_mutex_lock(&s_mutex_cond);
pthread_cond_broadcast(&s_cond_add_end);
pthread_mutex_unlock(&s_mutex_cond);
pthread_mutex_unlock(&s_mutex_add_end);
return 0;
#else
int l_ret = 0;
// apply to database
if(a_store_count > 1 && s_drv_callback.transaction_start)
s_drv_callback.transaction_start();
if(s_drv_callback.apply_store_obj)
for(size_t i = 0; i < a_store_count; i++) {
dap_store_obj_t *l_store_obj_cur = a_store_obj + i;
assert(l_store_obj_cur);
char *l_cur_key = dap_strdup(l_store_obj_cur->key);
int l_ret_tmp = s_drv_callback.apply_store_obj(l_store_obj_cur);
if(l_ret_tmp == 1) {
log_it(L_INFO, "Item is missing (may be already deleted) %s/%s\n", l_store_obj_cur->group, l_cur_key);
l_ret = 1;
}
if(l_ret_tmp < 0) {
log_it(L_ERROR, "Can't write item %s/%s (code %d)\n", l_store_obj_cur->group, l_cur_key, l_ret_tmp);
l_ret -= 1;
}
DAP_DEL_Z(l_cur_key);
if (l_ret)
break;
}
if(a_store_count > 1 && s_drv_callback.transaction_end)
s_drv_callback.transaction_end();
return l_ret;
#endif
}
/**
* @brief Adds objects to a database.
* @param a_store_obj objects to be added
* @param a_store_count a number of added objects
* @return Returns 0 if sucseesful.
*/
int dap_chain_global_db_driver_add(pdap_store_obj_t a_store_obj, size_t a_store_count)
{
for(size_t i = 0; i < a_store_count; i++)
a_store_obj[i].type = 'a';
return dap_chain_global_db_driver_appy(a_store_obj, a_store_count);
}
/**
* @brief Deletes objects from a database.
* @param a_store_obj objects to be deleted
* @param a_store_count a number of deleted objects
* @return Returns 0 if sucseesful.
*/
int dap_chain_global_db_driver_delete(pdap_store_obj_t a_store_obj, size_t a_store_count)
{
for(size_t i = 0; i < a_store_count; i++)
a_store_obj[i].type = 'd';
return dap_chain_global_db_driver_appy(a_store_obj, a_store_count);
}
/**
* @brief Gets a number of stored objects in a database by a_group and id.
* @param a_group the group name string
* @param a_id id
* @return Returns a number of objects.
*/
size_t dap_chain_global_db_driver_count(const char *a_group, uint64_t id)
{
size_t l_count_out = 0;
// read the number of items
if(s_drv_callback.read_count_store)
l_count_out = s_drv_callback.read_count_store(a_group, id);
return l_count_out;
}
/**
* @brief Gets a list of group names matching the pattern.
* Check whether the groups match the pattern a_group_mask, which is a shell wildcard pattern
* patterns: [] {} [!] * ? https://en.wikipedia.org/wiki/Glob_(programming).
* @param a_group_mask the group mask string
* @return If successful, returns the list of group names, otherwise NULL.
*/
dap_list_t *dap_chain_global_db_driver_get_groups_by_mask(const char *a_group_mask)
{
dap_list_t *l_list = NULL;
if(s_drv_callback.get_groups_by_mask)
l_list = s_drv_callback.get_groups_by_mask(a_group_mask);
return l_list;
}
/**
* @brief Reads last object in the database.
* @param a_group the group name
* @return If successful, a pointer to the object, otherwise NULL.
*/
dap_store_obj_t* dap_chain_global_db_driver_read_last(const char *a_group)
{
dap_store_obj_t *l_ret = NULL;
#ifdef USE_WRITE_BUFFER
// wait apply write buffer
wait_write_buf();
#endif
// read records using the selected database engine
if(s_drv_callback.read_last_store_obj)
l_ret = s_drv_callback.read_last_store_obj(a_group);
return l_ret;
}
/**
* @brief Reads several objects from a database by a_group and id.
* @param a_group the group name string
* @param a_id id
* @param a_count_out[in] a number of objects to be read, if 0 - no limits
* @param a_count_out[out] a count of objects that were read
* @return If successful, a pointer to an objects, otherwise NULL.
*/
dap_store_obj_t* dap_chain_global_db_driver_cond_read(const char *a_group, uint64_t id, size_t *a_count_out)
{
dap_store_obj_t *l_ret = NULL;
#ifdef USE_WRITE_BUFFER
// wait apply write buffer
wait_write_buf();
#endif
// read records using the selected database engine
if(s_drv_callback.read_cond_store_obj)
l_ret = s_drv_callback.read_cond_store_obj(a_group, id, a_count_out);
return l_ret;
}
/**
* @brief Reads several objects from a database by a_group and a_key.
* If a_key is NULL, reads whole group.
* @param a_group a group name string
* @param a_key an object key string. If equal NULL, it means reading the whole group
* @param a_count_out[in] a number of objects to be read, if 0 - no limits
* @param a_count_out[out] a number of objects that were read
* @return If successful, a pointer to an objects, otherwise NULL.
*/
dap_store_obj_t* dap_chain_global_db_driver_read(const char *a_group, const char *a_key, size_t *a_count_out)
{
dap_store_obj_t *l_ret = NULL;
#ifdef USE_WRITE_BUFFER
// wait apply write buffer
wait_write_buf();
#endif
// read records using the selected database engine
if(s_drv_callback.read_store_obj)
l_ret = s_drv_callback.read_store_obj(a_group, a_key, a_count_out);
return l_ret;
}
/**
* @brief Checks if an object is in a database by a_group and a_key.
* @param a_group a group name string
* @param a_key a object key string
* @return Returns true if it is, false otherwise.
*/
bool dap_chain_global_db_driver_is(const char *a_group, const char *a_key)
{
bool l_ret = NULL;
// read records using the selected database engine
if(s_drv_callback.is_obj)
l_ret = s_drv_callback.is_obj(a_group, a_key);
return l_ret;
}