From f4f39fce5a5dbc649f25c798133f1f1a4aa6f897 Mon Sep 17 00:00:00 2001
From: p-const <p.const@bk.ru>
Date: Mon, 3 Jun 2019 12:02:06 +0300
Subject: [PATCH] Almost completely functional database, still minor fixes need

---
 dap_chain_global_db_driver_cdb.c | 281 ++++++++++++++++---------------
 1 file changed, 147 insertions(+), 134 deletions(-)

diff --git a/dap_chain_global_db_driver_cdb.c b/dap_chain_global_db_driver_cdb.c
index 7d475c7..3f62116 100644
--- a/dap_chain_global_db_driver_cdb.c
+++ b/dap_chain_global_db_driver_cdb.c
@@ -24,6 +24,9 @@
 
 #include <stddef.h>
 #include <string.h>
+#include <dirent.h>
+#include <sys/stat.h>
+#include <uthash.h>
 #include "dap_common.h"
 #include "dap_hash.h"
 #include "dap_strfuncs.h"
@@ -33,19 +36,20 @@
 
 #define uint64_size sizeof(uint64_t)
 
-typedef enum {
-    CDB_GROUP_LOCAL_HIST,
-    CDB_GROUP_LOCAL_LAST_TS,
-    CDB_GROUP_LOCAL_GEN
-} CDB_group;
-
 typedef struct _obj_arg {
     pdap_store_obj_t o;
     uint64_t q;
     uint64_t n;
 } obj_arg, *pobj_arg;
 
-static CDB **s_cdb = NULL; // TODO: instead of this, there must be a local group manager with assigned paths
+typedef struct _cdb_instance {
+    CDB *cdb;
+    char *local_group;
+    UT_hash_handle hh;
+} cdb_instance, *pcdb_instance;
+
+static char *s_cdb_path = NULL;
+static pcdb_instance s_cdb = NULL;
 
 static inline void dap_cdb_uint_to_hex(char *arr, uint64_t val, short size) {
     short i = 0;
@@ -69,7 +73,7 @@ static inline uint64_t dap_cdb_hex_to_uint(char *arr, short size) {
     return val;
 }
 
-static void cdb_serialize_val_to_dap_store_obj2(pdap_store_obj_t a_obj, char *key, char *val) {
+static void cdb_serialize_val_to_dap_store_obj(pdap_store_obj_t a_obj, char *key, char *val) {
     if (!key || !val) {
         a_obj = NULL;
         return;
@@ -96,12 +100,94 @@ static void cdb_serialize_val_to_dap_store_obj2(pdap_store_obj_t a_obj, char *ke
     a_obj->timestamp = dap_cdb_hex_to_uint(l_rawtime, sizeof(time_t));
 }
 
-bool dap_cdb_get_last_obj_iter_callback2(void *arg, const char *key, int ksize, const char *val, int vsize, uint32_t expire, uint64_t oid) {
+pcdb_instance *dap_cdb_init_group(char *a_group, int a_flags) {
+    pcdb_instance l_cdb_i = DAP_NEW(cdb_instance);
+    l_cdb_i->local_group = dap_strdup(a_group);
+    l_cdb_i->cdb = cdb_new();
+    char l_cdb_path[strlen(s_cdb_path) + strlen(a_group) + 2];
+    memset(l_cdb_path, '\0', strlen(s_cdb_path) + strlen(a_group) + 2);
+    strcat(l_cdb_path, s_cdb_path);
+    strcat(l_cdb_path, "/");
+    strcat(l_cdb_path, a_group);
+    cdb_options l_opts = { 1000000, 128, 1024 };
+    if ((dap_db_driver_cdb_options(&l_opts, l_cdb_i->cdb) != CDB_SUCCESS) ||
+            cdb_open(l_cdb_i->cdb, l_cdb_path, a_flags) < 0)
+    {
+        log_it(L_ERROR, "An error occured while opening CDB: \"%s\"", cdb_errmsg(cdb_errno(l_cdb_i->cdb)));
+        DAP_DELETE(l_cdb_i->cdb);
+        DAP_DELETE(l_cdb_i->local_group);
+        DAP_DELETE(l_cdb_i);
+        return NULL;
+    }
+    HASH_ADD_KEYPTR(hh, s_cdb, l_cdb_i->local_group, strlen(l_cdb_i->local_group), l_cdb_i);
+    return l_cdb_i;
+}
+
+int dap_db_driver_cdb_init(const char *a_cdb_path, dap_db_driver_callbacks_t *a_drv_callback) {
+    s_cdb_path = dap_strdup(a_cdb_path);
+    if(s_cdb_path[strlen(s_cdb_path)] == '/') {
+        s_cdb_path[strlen(s_cdb_path)] = '\0';
+    }
+    mkdir(s_cdb_path, 0755);
+    struct dirent *d;
+    DIR *dir = opendir(s_cdb_path);
+    if (!dir) {
+        log_it(L_ERROR, "Couldn't open db directory");
+        return -1;
+    }
+    for (d = readdir(dir); d; d = readdir(dir)) {
+        if (!dap_strcmp(d->d_name, ".") || !dap_strcmp(d->d_name, "..")) {
+            continue;
+        }
+        pcdb_instance l_cdb_i = dap_cdb_init_group(d->d_name, CDB_CREAT | CDB_PAGEWARMUP);
+        if (!l_cdb_i) {
+            dap_db_driver_cdb_deinit();
+            DAP_DELETE(s_cdb_path);
+            closedir(dir);
+            return -2;
+        }
+        CDBSTAT l_cdb_stat;
+        cdb_stat(l_cdb_i->cdb, &l_cdb_stat);
+        log_it(L_INFO, "Group \"%s\" found"             , l_cdb_i->local_group);
+        log_it(L_INFO, "Records: %-24u"                 , l_cdb_stat.rnum);
+        log_it(L_INFO, "Average read latency: %-24u"    , l_cdb_stat.rlatcy);
+        log_it(L_INFO, "Average write latency: %-24u"   , l_cdb_stat.wlatcy);
+    }
+    a_drv_callback->read_last_store_obj = dap_db_driver_cdb_read_last_store_obj;
+    a_drv_callback->apply_store_obj     = dap_db_driver_cdb_apply_store_obj;
+    a_drv_callback->read_store_obj      = dap_db_driver_cdb_read_store_obj;
+    a_drv_callback->read_cond_store_obj = dap_db_driver_cdb_read_cond_store_obj;
+    a_drv_callback->deinit              = dap_db_driver_cdb_deinit;
+
+    closedir(dir);
+    return CDB_SUCCESS;
+}
+
+CDB *dap_cdb_get_db_by_group(const char *a_group) {
+    pcdb_instance l_cdb_i = NULL;
+    HASH_FIND_STR(s_cdb, a_group, l_cdb_i);
+    if (!l_cdb_i) {
+        return NULL;
+    }
+    return l_cdb_i->cdb;
+}
+
+int dap_cdb_add_group(const char *a_group) {
+    char l_cdb_path[strlen(s_cdb_path) + strlen(a_group) + 2];
+    memset(l_cdb_path, '\0', strlen(s_cdb_path) + strlen(a_group) + 2);
+    strcat(l_cdb_path, s_cdb_path);
+    strcat(l_cdb_path, "/");
+    strcat(l_cdb_path, a_group);
+    mkdir(l_cdb_path, 0755);
+    return 0;
+}
+
+bool dap_cdb_get_last_obj_iter_callback(void *arg, const char *key, int ksize, const char *val, int vsize, uint32_t expire, uint64_t oid) {
     /* this is wrong! TODO: instead of 'oid' must checkout real 'arg->id' */
     /*if (oid != ((pobj_arg)arg)->q) {
         return true;
     }*/
-    cdb_serialize_val_to_dap_store_obj2((pdap_store_obj_t)(((pobj_arg)arg)->o), key, val);
+    cdb_serialize_val_to_dap_store_obj((pdap_store_obj_t)(((pobj_arg)arg)->o), key, val);
     return false;
 }
 
@@ -110,7 +196,7 @@ bool dap_cdb_get_some_obj_iter_callback(void *arg, const char *key, int ksize, c
     uint64_t q = ((pobj_arg)arg)->q;
     uint64_t n = ((pobj_arg)arg)->n;
     pdap_store_obj_t l_obj = (pdap_store_obj_t)((pobj_arg)arg)->o;
-    cdb_serialize_val_to_dap_store_obj2(&l_obj[n-q-1], key, val);
+    cdb_serialize_val_to_dap_store_obj(&l_obj[n-q-1], key, val);
     if (q == 0) {
         return false;
     }
@@ -121,127 +207,39 @@ bool dap_cdb_get_cond_obj_iter_callback(void *arg, const char *key, int ksize, c
     /* No need due to this implementation design */
 }
 
-CDB_group dap_db_cdb_define_group(const char* a_group) {
-    /* turns out to be useless thing...
-      TODO: create local groups on-demand */
-    if (!dap_strcmp(a_group, "global.history")) {
-        return CDB_GROUP_LOCAL_HIST;
-    } else if (!dap_strcmp(a_group, "local.node.last_ts")) {
-        return CDB_GROUP_LOCAL_LAST_TS;
-    } else return CDB_GROUP_LOCAL_GEN;
+int dap_db_driver_cdb_deinit() {
+    cdb_instance *cur_cdb, *tmp;
+    HASH_ITER(hh, s_cdb, cur_cdb, tmp) {
+        HASH_DEL(s_cdb, cur_cdb);
+        cdb_destroy(cur_cdb->cdb);
+    }
+    return CDB_SUCCESS;
 }
 
-int dap_db_driver_cdb_options(pcdb_options l_opts, CDB_group a_group) {
-        if (cdb_option(s_cdb[a_group],
+int dap_db_driver_cdb_options(pcdb_options l_opts, CDB* a_cdb) {
+        if (cdb_option(a_cdb,
                        l_opts->hsize,
                        l_opts->pcacheMB,
                        l_opts->rcacheMB) != CDB_SUCCESS) return -1;
     return CDB_SUCCESS;
 }
 
-int dap_db_driver_cdb_init(const char *a_path_db, dap_db_driver_callbacks_t *a_drv_callback) {
-    s_cdb = DAP_NEW_Z_SIZE(CDB*, 3 * sizeof(CDB*));
-    if (!(s_cdb[CDB_GROUP_LOCAL_HIST]           = cdb_new())
-            || !(s_cdb[CDB_GROUP_LOCAL_LAST_TS] = cdb_new())
-            || !(s_cdb[CDB_GROUP_LOCAL_GEN]     = cdb_new()))
-    {
-        log_it(L_ERROR, "Couldn't initialize CDB");
-        DAP_DELETE(s_cdb);
-        return -1;
-    }
-    cdb_options l_opts = { 1000000,
-                           128,
-                           1024
-                         };
-    if ((dap_db_driver_cdb_options(&l_opts, CDB_GROUP_LOCAL_HIST) != CDB_SUCCESS)
-            || (dap_db_driver_cdb_options(&l_opts, CDB_GROUP_LOCAL_LAST_TS) != CDB_SUCCESS)
-            || (dap_db_driver_cdb_options(&l_opts, CDB_GROUP_LOCAL_GEN) != CDB_SUCCESS))
-    {
-        log_it(L_ERROR, "Specified CDB options unapplicable");
-        return -2;
-    }
-    log_it(L_INFO, "CDB initialized");
-    char *l_cdb_path = DAP_NEW_Z_SIZE(char, strlen(a_path_db) + 32);
-    int ret = CDB_SUCCESS;
-
-    memset(l_cdb_path, '\0', strlen(a_path_db) + 32);
-    strcat(l_cdb_path, a_path_db);
-    strcat(l_cdb_path + strlen(a_path_db), "_global_history");
-    if (cdb_open(s_cdb[CDB_GROUP_LOCAL_HIST], l_cdb_path, CDB_CREAT | CDB_PAGEWARMUP) < 0) {
-        log_it(L_ERROR, "CDB database couldn't be opened: \"%s\"", cdb_errmsg(cdb_errno(s_cdb[CDB_GROUP_LOCAL_HIST])));
-        ret = -3;
-        goto ERR;
-    }
-
-    memset(l_cdb_path + strlen(a_path_db), '\0', 32);
-    strcat(l_cdb_path, "_local_node_last");
-    if (cdb_open(s_cdb[CDB_GROUP_LOCAL_LAST_TS], l_cdb_path, CDB_CREAT | CDB_PAGEWARMUP) < 0) {
-        log_it(L_ERROR, "CDB database couldn't be opened: \"%s\"", cdb_errmsg(cdb_errno(s_cdb[CDB_GROUP_LOCAL_LAST_TS])));
-        ret = -4;
-        goto ERR;
+dap_store_obj_t *dap_db_driver_cdb_read_last_store_obj(const char* a_group) {
+    if (!a_group) {
+        return NULL;
     }
-
-    memset(l_cdb_path + strlen(a_path_db), '\0', 32);
-    strcat(l_cdb_path, "_local_general");
-    if (cdb_open(s_cdb[CDB_GROUP_LOCAL_GEN], l_cdb_path, CDB_CREAT | CDB_PAGEWARMUP) < 0) {
-        log_it(L_ERROR, "CDB database couldn't be opened: \"%s\"", cdb_errmsg(cdb_errno(s_cdb[CDB_GROUP_LOCAL_GEN])));
-        ret = -5;
-        goto ERR;
+    CDB *l_cdb = dap_cdb_get_db_by_group(a_group);
+    if (!l_cdb) {
+        return NULL;
     }
-
     CDBSTAT l_cdb_stat;
-    cdb_stat(s_cdb[CDB_GROUP_LOCAL_GEN], &l_cdb_stat);
-    log_it(L_INFO, "Database \"%s\" opened"         , l_cdb_path);
-    log_it(L_INFO, "Records: %-24u"                 , l_cdb_stat.rnum);
-    log_it(L_INFO, "Average read latency: %-24u"    , l_cdb_stat.rlatcy);
-    log_it(L_INFO, "Average write latency: %-24u"   , l_cdb_stat.wlatcy);
-    cdb_stat(s_cdb[CDB_GROUP_LOCAL_HIST], &l_cdb_stat);
-    log_it(L_INFO, "Database \"%s\" opened"         , l_cdb_path);
-    log_it(L_INFO, "Records: %-24u"                 , l_cdb_stat.rnum);
-    log_it(L_INFO, "Average read latency: %-24u"    , l_cdb_stat.rlatcy);
-    log_it(L_INFO, "Average write latency: %-24u"   , l_cdb_stat.wlatcy);
-    cdb_stat(s_cdb[CDB_GROUP_LOCAL_LAST_TS], &l_cdb_stat);
-    log_it(L_INFO, "Database \"%s\" opened"         , l_cdb_path);
-    log_it(L_INFO, "Records: %-24u"                 , l_cdb_stat.rnum);
-    log_it(L_INFO, "Average read latency: %-24u"    , l_cdb_stat.rlatcy);
-    log_it(L_INFO, "Average write latency: %-24u"   , l_cdb_stat.wlatcy);
-    DAP_DELETE(l_cdb_path);
-
-    a_drv_callback->read_last_store_obj = dap_db_driver_cdb_read_last_store_obj;
-    a_drv_callback->apply_store_obj     = dap_db_driver_cdb_apply_store_obj;
-    a_drv_callback->read_store_obj      = dap_db_driver_cdb_read_store_obj;
-    a_drv_callback->read_cond_store_obj = dap_db_driver_cdb_read_cond_store_obj;
-    a_drv_callback->deinit              = dap_db_driver_cdb_deinit;
-
-    return CDB_SUCCESS;
-ERR:
-    cdb_destroy(s_cdb[0]);
-    cdb_destroy(s_cdb[1]);
-    cdb_destroy(s_cdb[2]);
-    DAP_DELETE(l_cdb_path);
-    DAP_DELETE(s_cdb);
-    return ret;
-}
-
-int dap_db_driver_cdb_deinit() {
-    cdb_destroy(s_cdb[0]);
-    cdb_destroy(s_cdb[1]);
-    cdb_destroy(s_cdb[2]);
-    DAP_DELETE(s_cdb);
-    return CDB_SUCCESS;
-}
-
-dap_store_obj_t *dap_db_driver_cdb_read_last_store_obj(const char* a_group) {
-    if (!a_group) return NULL;
-    CDB_group l_group = dap_db_cdb_define_group(a_group);
-    CDBSTAT l_cdb_stat;
-    cdb_stat(s_cdb[l_group], &l_cdb_stat);
-    void *l_iter = cdb_iterate_new(s_cdb[l_group], l_cdb_stat.rnum);
+    cdb_stat(l_cdb, &l_cdb_stat);
+    void *l_iter = cdb_iterate_new(l_cdb, l_cdb_stat.rnum);
     obj_arg l_arg;
     l_arg.o = DAP_NEW_Z(dap_store_obj_t);
     l_arg.q = l_cdb_stat.rnum;
-    cdb_iterate(s_cdb[l_group], dap_cdb_get_last_obj_iter_callback2, (void*)&l_arg, l_iter);
-    cdb_iterate_destroy(s_cdb[l_group], l_iter);
+    cdb_iterate(l_cdb, dap_cdb_get_last_obj_iter_callback, (void*)&l_arg, l_iter);
+    cdb_iterate_destroy(l_cdb, l_iter);
     l_arg.o->group = dap_strdup(a_group);
     return l_arg.o;
 }
@@ -250,17 +248,21 @@ dap_store_obj_t *dap_db_driver_cdb_read_store_obj(const char *a_group, const cha
     if (!a_group) {
         return NULL;
     }
-    CDB_group l_group = dap_db_cdb_define_group(a_group);
+    //CDB_group l_group = dap_db_cdb_define_group(a_group);
+    CDB *l_cdb = dap_cdb_get_db_by_group(a_group);
+    if (!l_cdb) {
+        return NULL;
+    }
     dap_store_obj_t *l_obj = NULL;
     if (a_key) {
         char *l_value;
         int l_vsize;
-        cdb_get(s_cdb[l_group], a_key, strlen(a_key), (void**)&l_value, &l_vsize);
+        cdb_get(l_cdb, a_key, strlen(a_key), (void**)&l_value, &l_vsize);
         if (!l_value) {
             return NULL;
         }
         l_obj = DAP_NEW_Z(dap_store_obj_t);
-        cdb_serialize_val_to_dap_store_obj2(l_obj, a_key, l_value);
+        cdb_serialize_val_to_dap_store_obj(l_obj, a_key, l_value);
         l_obj->group = dap_strdup(a_group);
         cdb_free_val((void**)&l_value);
     } else {
@@ -269,7 +271,7 @@ dap_store_obj_t *dap_db_driver_cdb_read_store_obj(const char *a_group, const cha
             l_count_out = *a_count_out;
         }
         CDBSTAT l_cdb_stat;
-        cdb_stat(s_cdb[l_group], &l_cdb_stat);
+        cdb_stat(l_cdb, &l_cdb_stat);
         if ((l_count_out == 0) || (l_count_out > l_cdb_stat.rnum)) {
             l_count_out = l_cdb_stat.rnum;
         }
@@ -277,9 +279,9 @@ dap_store_obj_t *dap_db_driver_cdb_read_store_obj(const char *a_group, const cha
         l_arg.o = DAP_NEW_Z_SIZE(dap_store_obj_t, l_count_out * sizeof(dap_store_obj_t));
         l_arg.q = l_count_out;
         l_arg.n = l_count_out;
-        void *l_iter = cdb_iterate_new(s_cdb[l_group], 0);
-        l_count_out = cdb_iterate(s_cdb[l_group], dap_cdb_get_some_obj_iter_callback, (void*)&l_arg, l_iter);
-        cdb_iterate_destroy(s_cdb[l_group], l_iter);
+        void *l_iter = cdb_iterate_new(l_cdb, 0);
+        l_count_out = cdb_iterate(l_cdb, dap_cdb_get_some_obj_iter_callback, (void*)&l_arg, l_iter);
+        cdb_iterate_destroy(l_cdb, l_iter);
         if(a_count_out) {
             *a_count_out = l_count_out;
         }
@@ -295,14 +297,17 @@ dap_store_obj_t* dap_db_driver_cdb_read_cond_store_obj(const char *a_group, uint
     if (!a_group) {
         return NULL;
     }
-    CDB_group l_group = dap_db_cdb_define_group(a_group);
+    CDB *l_cdb = dap_cdb_get_db_by_group(a_group);
+    if (!l_cdb) {
+        return NULL;
+    }
     dap_store_obj_t *l_obj = NULL;
     uint64_t l_count_out = 0;
     if(a_count_out) {
         l_count_out = *a_count_out;
     }
     CDBSTAT l_cdb_stat;
-    cdb_stat(s_cdb[l_group], &l_cdb_stat);
+    cdb_stat(l_cdb, &l_cdb_stat);
     if ((l_count_out == 0) || (l_count_out > l_cdb_stat.rnum)) {
         l_count_out = l_cdb_stat.rnum;
     }
@@ -310,9 +315,9 @@ dap_store_obj_t* dap_db_driver_cdb_read_cond_store_obj(const char *a_group, uint
     l_arg.o = DAP_NEW_Z_SIZE(dap_store_obj_t, l_count_out * sizeof(dap_store_obj_t));
     l_arg.q = l_count_out;
     l_arg.q = l_count_out;
-    void *l_iter = cdb_iterate_new(s_cdb[l_group], a_id + 1); // wrong! TODO: make use of obj->id
-    l_count_out = cdb_iterate(s_cdb[l_group], dap_cdb_get_some_obj_iter_callback, (void*)&l_arg, l_iter);
-    cdb_iterate_destroy(s_cdb[l_group], l_iter);
+    void *l_iter = cdb_iterate_new(l_cdb, a_id + 1); // wrong! TODO: make use of obj->id
+    l_count_out = cdb_iterate(l_cdb, dap_cdb_get_some_obj_iter_callback, (void*)&l_arg, l_iter);
+    cdb_iterate_destroy(l_cdb, l_iter);
     if(a_count_out) {
         *a_count_out = l_count_out;
     }
@@ -326,7 +331,12 @@ int dap_db_driver_cdb_apply_store_obj(pdap_store_obj_t a_store_obj) {
     if(!a_store_obj || !a_store_obj->group) {
         return -1;
     }
-    CDB_group l_group = dap_db_cdb_define_group(a_store_obj->group);
+    CDB *l_cdb = dap_cdb_get_db_by_group(a_store_obj->group);
+    if (!l_cdb) {
+        dap_cdb_add_group(a_store_obj->group);
+        pcdb_instance l_cdb_i = dap_cdb_init_group(a_store_obj->group, CDB_CREAT | CDB_PAGEWARMUP);
+        l_cdb = l_cdb_i->cdb;
+    }
     if(a_store_obj->type == 'a') {
         if(!a_store_obj->key || !a_store_obj->value || !a_store_obj->value_len) return -2;
         cdb_record l_rec;
@@ -346,12 +356,15 @@ int dap_db_driver_cdb_apply_store_obj(pdap_store_obj_t a_store_obj) {
         dap_cdb_uint_to_hex(l_val + offset, l_time, sizeof(time_t));
         offset += sizeof(time_t);
         l_rec.val = l_val;
-        cdb_set(s_cdb[l_group], l_rec.key, strlen(l_rec.key), l_rec.val, offset);
+        cdb_set(l_cdb, l_rec.key, strlen(l_rec.key), l_rec.val, offset);
+        DAP_DELETE(l_rec.key);
+        DAP_DELETE(l_rec.val);
     } else if(a_store_obj->type == 'd') {
         if(a_store_obj->key) {
-            cdb_del(s_cdb[l_group], a_store_obj->key, strlen(a_store_obj->key));
+            cdb_del(l_cdb, a_store_obj->key, strlen(a_store_obj->key));
         } else {
-            cdb_close(s_cdb[l_group]); // TODO: re-open a cdb the appropriate path using the TRUNCATE arg
+            cdb_destroy(l_cdb);
+            dap_cdb_init_group(a_store_obj->group, CDB_TRUNC | CDB_PAGEWARMUP);
         }
     }
     return 0;
-- 
GitLab