From 381e5f44c2f6102bbd1d7c5548407df822f83337 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Al=D0=B5x=D0=B0nder=20Lysik=D0=BEv?=
 <alexander.lysikov@demlabs.net>
Date: Tue, 28 May 2019 22:43:40 +0500
Subject: [PATCH] Fixed writing in global_db. Added SQLite fine tuning

---
 dap_chain_global_db.c               |  22 ++++--
 dap_chain_global_db.h               |   1 -
 dap_chain_global_db_driver.c        | 117 +++++++++++++++++++++++-----
 dap_chain_global_db_driver.h        |   5 +-
 dap_chain_global_db_driver_sqlite.c |  47 ++++++++++-
 dap_chain_global_db_driver_sqlite.h |   1 +
 6 files changed, 161 insertions(+), 32 deletions(-)

diff --git a/dap_chain_global_db.c b/dap_chain_global_db.c
index 31802e7..20d00b5 100755
--- a/dap_chain_global_db.c
+++ b/dap_chain_global_db.c
@@ -173,7 +173,10 @@ void dap_chain_global_db_deinit(void)
  */
 void* dap_chain_global_db_obj_get(const char *a_key, const char *a_group)
 {
-    size_t count = 0;
+    dap_store_obj_t *l_store_data = dap_db_read_data(a_group, a_key, NULL);
+    return l_store_data;
+
+/*    size_t count = 0;
     if(!a_key)
         return NULL;
     size_t query_len = (size_t) snprintf(NULL, 0, "(&(cn=%s)(objectClass=%s))", a_key, a_group);
@@ -184,7 +187,7 @@ void* dap_chain_global_db_obj_get(const char *a_key, const char *a_group)
     unlock();
     assert(count <= 1);
     DAP_DELETE(query);
-    return store_data;
+    return store_data;*/
 }
 
 /**
@@ -196,8 +199,15 @@ void* dap_chain_global_db_obj_get(const char *a_key, const char *a_group)
  */
 uint8_t * dap_chain_global_db_gr_get(const char *a_key, size_t *a_data_len_out, const char *a_group)
 {
-    dap_store_obj_t *l_store_data = dap_db_read_data(a_group, a_key);
-    return l_store_data;
+    uint8_t *l_ret_value = NULL;
+    dap_store_obj_t *l_store_data = dap_db_read_data(a_group, a_key, a_data_len_out);
+    if(l_store_data) {
+        l_ret_value = (l_store_data->value) ? DAP_NEW_SIZE(uint8_t, l_store_data->value_len) : NULL; //ret_value = (store_data->value) ? strdup(store_data->value) : NULL;
+        memcpy(l_ret_value, l_store_data->value, l_store_data->value_len);
+        if(a_data_len_out)
+            *a_data_len_out = l_store_data->value_len;
+    }
+    return l_ret_value;
 
 /*ldb
  *     uint8_t *l_ret_value = NULL;
@@ -332,7 +342,7 @@ dap_global_db_obj_t** dap_chain_global_db_gr_load(const char *a_group, size_t *a
     size_t count = 0;
     // Read data
     lock();
-    pdap_store_obj_t store_obj = dap_db_read_data(l_query, &count);
+    pdap_store_obj_t store_obj = dap_db_read_data(a_group, NULL, &count);
     unlock();
     DAP_DELETE(l_query);
     // Serialization data
@@ -389,7 +399,7 @@ bool dap_chain_global_db_obj_save(void* a_store_data, size_t a_objs_count)
             size_t l_count = 0;
             char *l_query = dap_strdup_printf("(&(cn=%s)(objectClass=%s))", l_obj->key, l_obj->group);
             lock();
-            dap_store_obj_t *l_read_store_data = dap_db_read_data(l_query, &l_count);
+            dap_store_obj_t *l_read_store_data = dap_db_read_data(l_query,NULL, &l_count);
             unlock();
             // whether to add a record
             if(l_obj->type == 'a' && l_read_store_data) {
diff --git a/dap_chain_global_db.h b/dap_chain_global_db.h
index e0d01fe..e80ba73 100755
--- a/dap_chain_global_db.h
+++ b/dap_chain_global_db.h
@@ -44,7 +44,6 @@ void dap_chain_global_db_deinit(void);
 /**
  * Setup callbacks and filters
  */
-
 void dap_chain_global_db_add_history_group_prefix(const char * a_group_prefix); // Add group prefix that will be tracking all changes
 void dap_chain_global_db_add_history_callback_notify(const char * a_group_prefix,
                                                      dap_global_db_obj_callback_notify_t a_callback, void * a_arg);
diff --git a/dap_chain_global_db_driver.c b/dap_chain_global_db_driver.c
index 47455b2..2cd5054 100644
--- a/dap_chain_global_db_driver.c
+++ b/dap_chain_global_db_driver.c
@@ -46,11 +46,18 @@ static int save_write_buf(void);
 pthread_mutex_t s_mutex_add_start = PTHREAD_MUTEX_INITIALIZER;
 pthread_mutex_t s_mutex_add_end = PTHREAD_MUTEX_INITIALIZER;
 //pthread_rwlock_rdlock
+// new data in buffer to write
+pthread_mutex_t s_mutex_cond = PTHREAD_MUTEX_INITIALIZER;
 pthread_cond_t s_cond_add_end; // = PTHREAD_COND_INITIALIZER;
+// writing ended
+pthread_mutex_t s_mutex_write_end = PTHREAD_MUTEX_INITIALIZER;
+pthread_cond_t s_cond_write_end; // = PTHREAD_COND_INITIALIZER;
+
 dap_list_t *s_list_begin = NULL;
 dap_list_t *s_list_end = NULL;
 
 pthread_t s_write_buf_thread;
+volatile static bool s_write_buf_state = 0;
 static void* func_write_buf(void * arg);
 
 static dap_db_driver_callbacks_t s_drv_callback;
@@ -77,7 +84,9 @@ int dap_db_driver_init(const char *a_driver_name, const char *a_filename_db)
         pthread_condattr_init(&l_condattr);
         pthread_condattr_setclock(&l_condattr, CLOCK_MONOTONIC);
         pthread_cond_init(&s_cond_add_end, &l_condattr);
+        pthread_cond_init(&s_cond_write_end, &l_condattr);
         // thread for save buffer to database
+        s_write_buf_state = true;
         pthread_create(&s_write_buf_thread, NULL, func_write_buf, NULL);
     }
     return l_ret;
@@ -89,7 +98,17 @@ int dap_db_driver_init(const char *a_driver_name, const char *a_filename_db)
 
 void dap_db_driver_deinit(void)
 {
-    save_write_buf();
+    // wait for close thread
+    {
+        pthread_mutex_lock(&s_mutex_cond);
+        pthread_cond_broadcast(&s_cond_add_end);
+        pthread_mutex_unlock(&s_mutex_cond);
+
+        s_write_buf_state = false;
+        pthread_join(s_write_buf_thread, NULL);
+    }
+
+    //save_write_buf();
     pthread_mutex_lock(&s_mutex_add_end);
     pthread_mutex_lock(&s_mutex_add_start);
     while(s_list_begin != s_list_end) {
@@ -103,9 +122,12 @@ void dap_db_driver_deinit(void)
     s_list_begin = s_list_end = NULL;
     pthread_mutex_unlock(&s_mutex_add_start);
     pthread_mutex_unlock(&s_mutex_add_end);
+    // deinit driver
     if(s_drv_callback.deinit)
         s_drv_callback.deinit();
 
+    pthread_cond_destroy(&s_cond_add_end);
+
 }
 
 dap_store_obj_t* dap_store_obj_copy(dap_store_obj_t *a_store_obj, size_t a_store_count)
@@ -163,13 +185,13 @@ char* dap_db_driver_db_hash(const uint8_t *data, size_t data_size)
  * Wait data to write buffer
  * return 0 - Ok, 1 - timeout
  */
-static int wait_data(int l_timeout_ms)
+static int wait_data(pthread_mutex_t *a_mutex, pthread_cond_t *a_cond, int l_timeout_ms)
 {
     int l_res = 0;
-    pthread_mutex_lock(&s_mutex_add_end);
+    pthread_mutex_lock(a_mutex);
     // endless waiting
     if(l_timeout_ms == -1)
-        l_res = pthread_cond_wait(&s_cond_add_end, &s_mutex_add_end);
+        l_res = pthread_cond_wait(a_cond, a_mutex);
     // waiting no more than timeout in milliseconds
     else {
         struct timespec l_to;
@@ -182,21 +204,55 @@ static int wait_data(int l_timeout_ms)
         }
         else
             l_to.tv_nsec = (long) l_nsec_new;
-        l_res = pthread_cond_timedwait(&s_cond_add_end, &s_mutex_add_end, &l_to);
+        l_res = pthread_cond_timedwait(a_cond, a_mutex, &l_to);
     }
-    pthread_mutex_unlock(&s_mutex_add_end);
+    pthread_mutex_unlock(a_mutex);
     if(l_res == ETIMEDOUT)
         return 1;
     return l_res;
 }
 
-static int save_write_buf(void)
+// return 0 if buffer empty, 1 data present
+static bool check_fill_buf(void)
 {
+    dap_list_t *l_list_begin;
     dap_list_t *l_list_end;
+    pthread_mutex_lock(&s_mutex_add_start);
     pthread_mutex_lock(&s_mutex_add_end);
     l_list_end = s_list_end;
+    l_list_begin = s_list_begin;
     pthread_mutex_unlock(&s_mutex_add_end);
+    pthread_mutex_unlock(&s_mutex_add_start);
+
+    bool l_ret = (l_list_begin != l_list_end) ? 1 : 0;
+//    if(l_ret)
+//        printf("** Wait s_beg=0x%x s_end=0x%x \n", l_list_begin, l_list_end);
+    return l_ret;
+}
+
+// wait apply write buffer
+static void wait_write_buf()
+{
+//    printf("** Start wait data\n");
+    // wait data
+    while(1) {
+        if(!check_fill_buf())
+            break;
+        if(!wait_data(&s_mutex_write_end, &s_cond_write_end, 50))
+            break;
+    }
+//    printf("** End wait data\n");
+}
 
+// save data from buffer to database
+static int save_write_buf(void)
+{
+    dap_list_t *l_list_end;
+    // fix end of buffer
+    pthread_mutex_lock(&s_mutex_add_end);
+    l_list_end = s_list_end;
+    pthread_mutex_unlock(&s_mutex_add_end);
+    // save data from begin to fixed end
     pthread_mutex_lock(&s_mutex_add_start);
     if(s_list_begin != l_list_end) {
         if(s_drv_callback.transaction_start)
@@ -216,30 +272,43 @@ static int save_write_buf(void)
             }
 
             s_list_begin = dap_list_next(s_list_begin);
+//            printf("** ap2*record *l_beg=0x%x l_nex=0x%x d_beg=0x%x l_end=0x%x d_end=0x%x sl_end=0x%x\n", s_list_begin,
+            //                  s_list_begin->next, s_list_begin->data, l_list_end, l_list_end->data, s_list_end);
+
+            //printf("** free data=0x%x list=0x%x\n", s_list_begin->prev->data, s_list_begin->prev);
             // free memory
             dap_store_obj_free((dap_store_obj_t*) s_list_begin->prev->data, 1);
             dap_list_free1(s_list_begin->prev);
             s_list_begin->prev = NULL;
             cnt++;
-
         }
         if(s_drv_callback.transaction_end)
             s_drv_callback.transaction_end();
+        printf("** writing ended cnt=%d\n", cnt);
+        // writing ended
+        pthread_mutex_lock(&s_mutex_write_end);
+        pthread_cond_broadcast(&s_cond_write_end);
+        pthread_mutex_unlock(&s_mutex_write_end);
     }
     pthread_mutex_unlock(&s_mutex_add_start);
-    // wait data
-    //wait_data
-
+    return 0;
 }
 
+// thread for save data from buffer to database
 static void* func_write_buf(void * arg)
 {
     while(1) {
+        if(!s_write_buf_state)
+            break;
         //save_write_buf
-        if(save_write_buf() == 0)
+        if(save_write_buf() == 0) {
+            if(!s_write_buf_state)
+                break;
             // wait data
-            wait_data(2000); // 2 sec
+            wait_data(&s_mutex_cond, &s_cond_add_end, 2000); // 2 sec
+        }
     }
+    pthread_exit(0);
 }
 
 int dap_db_add(pdap_store_obj_t a_store_obj, size_t a_store_count)
@@ -247,6 +316,7 @@ int dap_db_add(pdap_store_obj_t a_store_obj, size_t a_store_count)
     //dap_store_obj_t *l_store_obj = dap_store_obj_copy(a_store_obj, a_store_count);
     if(!a_store_obj || !a_store_count)
         return -1;
+    a_store_obj->type = 'a';
     // add all records into write buffer
     pthread_mutex_lock(&s_mutex_add_end);
     for(size_t i = 0; i < a_store_count; i++) {
@@ -257,27 +327,36 @@ int dap_db_add(pdap_store_obj_t a_store_obj, size_t a_store_count)
             pthread_mutex_lock(&s_mutex_add_start);
             s_list_begin = s_list_end;
             pthread_mutex_unlock(&s_mutex_add_start);
+            //printf("*!!add record=0x%x / 0x%x    obj=0x%x / 0x%x\n", s_list_end, s_list_end->data, s_list_end->prev);
         }
         else
             s_list_end->data = l_store_obj_cur;
-        s_list_end = dap_list_append(s_list_end, NULL);
+        dap_list_append(s_list_end, NULL);
         s_list_end = dap_list_last(s_list_end);
+        //printf("**+add record l_cur=0x%x / 0x%x l_new=0x%x / 0x%x\n", s_list_end->prev, s_list_end->prev->data,s_list_end, s_list_end->data);
     }
     // buffer changed
+    pthread_mutex_lock(&s_mutex_cond);
     pthread_cond_broadcast(&s_cond_add_end);
+    pthread_mutex_unlock(&s_mutex_cond);
+
     pthread_mutex_unlock(&s_mutex_add_end);
+    return 0;
 }
 
 int dap_db_delete(pdap_store_obj_t a_store_obj, size_t a_store_count)
 {
-    dap_db_add(a_store_obj, a_store_count);
+    a_store_obj->type = 'd';
+    return dap_db_add(a_store_obj, a_store_count);
 }
 
-dap_store_obj_t* dap_db_read_data(const char *a_group, const char *a_key)
+dap_store_obj_t* dap_db_read_data(const char *a_group, const char *a_key, size_t *count_out)
 {
-    // apply write buffer
-    save_write_buf();
+    dap_store_obj_t *l_ret = NULL;
+    // wait apply write buffer
+    wait_write_buf();
     // read record
     if(s_drv_callback.read_store_obj)
-        s_drv_callback.read_store_obj(a_group, a_key);
+        l_ret = s_drv_callback.read_store_obj(a_group, a_key, count_out);
+    return l_ret;
 }
diff --git a/dap_chain_global_db_driver.h b/dap_chain_global_db_driver.h
index 8a235ec..9d819aa 100644
--- a/dap_chain_global_db_driver.h
+++ b/dap_chain_global_db_driver.h
@@ -47,10 +47,9 @@ typedef struct dap_store_obj_pkt {
 }__attribute__((packed)) dap_store_obj_pkt_t;
 
 typedef int (*dap_db_driver_write_callback_t)(dap_store_obj_t*);
-typedef dap_store_obj_t* (*dap_db_driver_read_callback_t)(const char *,const char *);
+typedef dap_store_obj_t* (*dap_db_driver_read_callback_t)(const char *,const char *, size_t *);
 typedef int (*dap_db_driver_callback_t)(void);
 
-
 typedef struct dap_db_driver_callbacks {
     dap_db_driver_write_callback_t apply_store_obj;
     dap_db_driver_read_callback_t read_store_obj;
@@ -70,7 +69,7 @@ char* dap_db_driver_db_hash(const uint8_t *data, size_t data_size);
 
 int dap_db_add(pdap_store_obj_t a_store_obj, size_t a_store_count);
 int dap_db_delete(pdap_store_obj_t a_store_obj, size_t a_store_count);
-dap_store_obj_t* dap_db_read_data(const char *a_group, const char *a_key);
+dap_store_obj_t* dap_db_read_data(const char *a_group, const char *a_key, size_t *count_out);
 
 pdap_store_obj_t dap_db_read_file_data(const char *a_path, const char *a_group); // state of emergency only, if LDB database is inaccessible
 dap_store_obj_pkt_t *dap_store_packet_single(pdap_store_obj_t a_store_obj);
diff --git a/dap_chain_global_db_driver_sqlite.c b/dap_chain_global_db_driver_sqlite.c
index 3ea4c5d..ef8984b 100644
--- a/dap_chain_global_db_driver_sqlite.c
+++ b/dap_chain_global_db_driver_sqlite.c
@@ -62,6 +62,8 @@ typedef struct _SQLITE_ROW_VALUE_
     SQLITE_VALUE *val; // array of field values
 } SQLITE_ROW_VALUE;
 
+static int dap_db_driver_sqlite_exec(sqlite3 *l_db, const char *l_query, char **l_error_message);
+
 /**
  * SQLite library initialization, no thread safe
  *
@@ -83,6 +85,16 @@ int dap_db_driver_sqlite_init(const char *a_filename_db, dap_db_driver_callbacks
         dap_db_driver_sqlite_free(l_error_message);
     }
     else {
+        if(!dap_db_driver_sqlite_set_pragma(s_db, "synchronous", "NORMAL")) // 0 | OFF | 1 | NORMAL | 2 | FULL
+            printf("can't set new synchronous mode\n");
+        if(!dap_db_driver_sqlite_set_pragma(s_db, "journal_mode", "OFF")) // DELETE | TRUNCATE | PERSIST | MEMORY | WAL | OFF
+            printf("can't set new journal mode\n");
+
+        if(!dap_db_driver_sqlite_set_pragma(s_db, "page_size", "1024")) // DELETE | TRUNCATE | PERSIST | MEMORY | WAL | OFF
+                    printf("can't set page_size\n");
+  //      *PRAGMA page_size = bytes; // page size DB; it is reasonable to make it equal to the size of the disk cluster 4096
+    //     *PRAGMA cache_size = -kibibytes; // by default it is equal to 2000 pages of database
+//
         a_drv_callback->apply_store_obj = dap_db_driver_sqlite_apply_store_obj;
         a_drv_callback->read_store_obj = dap_db_driver_sqlite_read_store_obj;
         a_drv_callback->transaction_start = dap_db_driver_sqlite_start_transaction;
@@ -162,6 +174,32 @@ void dap_db_driver_sqlite_free(char *memory)
         sqlite3_free(memory);
 }
 
+/**
+ * Set specific pragma statements
+ * www.sqlite.org/pragma.html
+ *
+ *PRAGMA page_size = bytes; // page size DB; it is reasonable to make it equal to the size of the disk cluster 4096
+ *PRAGMA cache_size = -kibibytes; // by default it is equal to 2000 pages of database
+ *PRAGMA encoding = "UTF-8";  // default = UTF-8
+ *PRAGMA foreign_keys = 1; // default = 0
+ *PRAGMA journal_mode = DELETE | TRUNCATE | PERSIST | MEMORY | WAL | OFF;
+ *PRAGMA synchronous = 0 | OFF | 1 | NORMAL | 2 | FULL;
+ */
+bool dap_db_driver_sqlite_set_pragma(sqlite3 *a_db, char *a_param, char *a_mode)
+{
+    if(!a_param || !a_mode)
+            {
+        printf("[sqlite_set_pragma] err!!! no param or mode\n");
+        return false;
+    }
+    char *l_str_query = sqlite3_mprintf("PRAGMA %s = %s", a_param, a_mode);
+    int l_rc = dap_db_driver_sqlite_exec(a_db, l_str_query, NULL); // default synchronous=FULL
+    sqlite3_free(l_str_query);
+    if(l_rc == SQLITE_OK)
+        return true;
+    return false;
+}
+
 /**
  * Execute SQL query to database that does not return data
  *
@@ -431,7 +469,7 @@ int dap_db_driver_sqlite_apply_store_obj(dap_store_obj_t *a_store_obj)
         log_it(L_ERROR, "Unknown store_obj type '0x%x'", a_store_obj->type);
         return -1;
     }
-    int l_ret = dap_db_driver_sqlite_exec(s_db, l_query, &l_error_message);
+    int l_ret = dap_db_driver_sqlite_exec(s_db, l_query, NULL);
     // missing database
     if(l_ret == SQLITE_ERROR) {
         // create table
@@ -440,7 +478,7 @@ int dap_db_driver_sqlite_apply_store_obj(dap_store_obj_t *a_store_obj)
     }
     dap_db_driver_sqlite_free(l_query);
     // entry with the same hash is already present
-    if(l_ret == SQLITE_CONSTRAINT){
+    if(l_ret == SQLITE_CONSTRAINT) {
         log_it(L_INFO, "Entry with the same hash is already present, %s", l_error_message);
         dap_db_driver_sqlite_free(l_error_message);
         return 0;
@@ -456,14 +494,17 @@ int dap_db_driver_sqlite_apply_store_obj(dap_store_obj_t *a_store_obj)
 
 /**
  * Read data
+ * a_key may be NULL
  */
 dap_store_obj_t* dap_db_driver_sqlite_read_store_obj(const char *a_group, const char *a_key)
 {
     dap_store_obj_t *l_obj = NULL;
     char *l_error_message = NULL;
     sqlite3_stmt *l_res;
+    if(!a_group || !a_key)
+        return NULL;
     char *l_str_query = sqlite3_mprintf("SELECT ts,value FROM '%s' WHERE key='%s' LIMIT 1", a_group, a_key);
-    int l_ret = dap_db_driver_sqlite_query(s_db, l_str_query, &l_res, &l_error_message); // default synchronous=FULL
+    int l_ret = dap_db_driver_sqlite_query(s_db, l_str_query, &l_res, &l_error_message);
     sqlite3_free(l_str_query);
     if(l_ret != SQLITE_OK) {
         log_it(L_ERROR, "read l_ret=%d, %s\n", sqlite3_errcode(s_db), sqlite3_errmsg(s_db));
diff --git a/dap_chain_global_db_driver_sqlite.h b/dap_chain_global_db_driver_sqlite.h
index 476de9d..f75b770 100644
--- a/dap_chain_global_db_driver_sqlite.h
+++ b/dap_chain_global_db_driver_sqlite.h
@@ -31,6 +31,7 @@ int dap_db_driver_sqlite_deinit(void);
 sqlite3* dap_db_driver_sqlite_open(const char *a_filename_utf8, int a_flags, char **error_message);
 void dap_db_driver_sqlite_close(sqlite3 *l_db);
 void dap_db_driver_sqlite_free(char *memory);
+bool dap_db_driver_sqlite_set_pragma(sqlite3 *a_db, char *a_param, char *a_mode);
 
 
 // ** SQLite callbacks **
-- 
GitLab