diff --git a/dap_chain_global_db.c b/dap_chain_global_db.c index 31802e7573d78be25a39c3cd4bcc15596d797700..20d00b59c7721f76396e3e0599ae3812691db123 100755 --- a/dap_chain_global_db.c +++ b/dap_chain_global_db.c @@ -173,7 +173,10 @@ void dap_chain_global_db_deinit(void) */ void* dap_chain_global_db_obj_get(const char *a_key, const char *a_group) { - size_t count = 0; + dap_store_obj_t *l_store_data = dap_db_read_data(a_group, a_key, NULL); + return l_store_data; + +/* size_t count = 0; if(!a_key) return NULL; size_t query_len = (size_t) snprintf(NULL, 0, "(&(cn=%s)(objectClass=%s))", a_key, a_group); @@ -184,7 +187,7 @@ void* dap_chain_global_db_obj_get(const char *a_key, const char *a_group) unlock(); assert(count <= 1); DAP_DELETE(query); - return store_data; + return store_data;*/ } /** @@ -196,8 +199,15 @@ void* dap_chain_global_db_obj_get(const char *a_key, const char *a_group) */ uint8_t * dap_chain_global_db_gr_get(const char *a_key, size_t *a_data_len_out, const char *a_group) { - dap_store_obj_t *l_store_data = dap_db_read_data(a_group, a_key); - return l_store_data; + uint8_t *l_ret_value = NULL; + dap_store_obj_t *l_store_data = dap_db_read_data(a_group, a_key, a_data_len_out); + if(l_store_data) { + l_ret_value = (l_store_data->value) ? DAP_NEW_SIZE(uint8_t, l_store_data->value_len) : NULL; //ret_value = (store_data->value) ? strdup(store_data->value) : NULL; + memcpy(l_ret_value, l_store_data->value, l_store_data->value_len); + if(a_data_len_out) + *a_data_len_out = l_store_data->value_len; + } + return l_ret_value; /*ldb * uint8_t *l_ret_value = NULL; @@ -332,7 +342,7 @@ dap_global_db_obj_t** dap_chain_global_db_gr_load(const char *a_group, size_t *a size_t count = 0; // Read data lock(); - pdap_store_obj_t store_obj = dap_db_read_data(l_query, &count); + pdap_store_obj_t store_obj = dap_db_read_data(a_group, NULL, &count); unlock(); DAP_DELETE(l_query); // Serialization data @@ -389,7 +399,7 @@ bool dap_chain_global_db_obj_save(void* a_store_data, size_t a_objs_count) size_t l_count = 0; char *l_query = dap_strdup_printf("(&(cn=%s)(objectClass=%s))", l_obj->key, l_obj->group); lock(); - dap_store_obj_t *l_read_store_data = dap_db_read_data(l_query, &l_count); + dap_store_obj_t *l_read_store_data = dap_db_read_data(l_query,NULL, &l_count); unlock(); // whether to add a record if(l_obj->type == 'a' && l_read_store_data) { diff --git a/dap_chain_global_db.h b/dap_chain_global_db.h index e0d01fec390c634390f227b6ac4103e3f9e91670..e80ba73d3031015a5628d806e23921c9cde8b350 100755 --- a/dap_chain_global_db.h +++ b/dap_chain_global_db.h @@ -44,7 +44,6 @@ void dap_chain_global_db_deinit(void); /** * Setup callbacks and filters */ - void dap_chain_global_db_add_history_group_prefix(const char * a_group_prefix); // Add group prefix that will be tracking all changes void dap_chain_global_db_add_history_callback_notify(const char * a_group_prefix, dap_global_db_obj_callback_notify_t a_callback, void * a_arg); diff --git a/dap_chain_global_db_driver.c b/dap_chain_global_db_driver.c index 47455b2fafbe6016a7cd03fb280ad379c6f06430..2cd50549fd13c860a2db31f1099c330acbdc1298 100644 --- a/dap_chain_global_db_driver.c +++ b/dap_chain_global_db_driver.c @@ -46,11 +46,18 @@ static int save_write_buf(void); pthread_mutex_t s_mutex_add_start = PTHREAD_MUTEX_INITIALIZER; pthread_mutex_t s_mutex_add_end = PTHREAD_MUTEX_INITIALIZER; //pthread_rwlock_rdlock +// new data in buffer to write +pthread_mutex_t s_mutex_cond = PTHREAD_MUTEX_INITIALIZER; pthread_cond_t s_cond_add_end; // = PTHREAD_COND_INITIALIZER; +// writing ended +pthread_mutex_t s_mutex_write_end = PTHREAD_MUTEX_INITIALIZER; +pthread_cond_t s_cond_write_end; // = PTHREAD_COND_INITIALIZER; + dap_list_t *s_list_begin = NULL; dap_list_t *s_list_end = NULL; pthread_t s_write_buf_thread; +volatile static bool s_write_buf_state = 0; static void* func_write_buf(void * arg); static dap_db_driver_callbacks_t s_drv_callback; @@ -77,7 +84,9 @@ int dap_db_driver_init(const char *a_driver_name, const char *a_filename_db) pthread_condattr_init(&l_condattr); pthread_condattr_setclock(&l_condattr, CLOCK_MONOTONIC); pthread_cond_init(&s_cond_add_end, &l_condattr); + pthread_cond_init(&s_cond_write_end, &l_condattr); // thread for save buffer to database + s_write_buf_state = true; pthread_create(&s_write_buf_thread, NULL, func_write_buf, NULL); } return l_ret; @@ -89,7 +98,17 @@ int dap_db_driver_init(const char *a_driver_name, const char *a_filename_db) void dap_db_driver_deinit(void) { - save_write_buf(); + // wait for close thread + { + pthread_mutex_lock(&s_mutex_cond); + pthread_cond_broadcast(&s_cond_add_end); + pthread_mutex_unlock(&s_mutex_cond); + + s_write_buf_state = false; + pthread_join(s_write_buf_thread, NULL); + } + + //save_write_buf(); pthread_mutex_lock(&s_mutex_add_end); pthread_mutex_lock(&s_mutex_add_start); while(s_list_begin != s_list_end) { @@ -103,9 +122,12 @@ void dap_db_driver_deinit(void) s_list_begin = s_list_end = NULL; pthread_mutex_unlock(&s_mutex_add_start); pthread_mutex_unlock(&s_mutex_add_end); + // deinit driver if(s_drv_callback.deinit) s_drv_callback.deinit(); + pthread_cond_destroy(&s_cond_add_end); + } dap_store_obj_t* dap_store_obj_copy(dap_store_obj_t *a_store_obj, size_t a_store_count) @@ -163,13 +185,13 @@ char* dap_db_driver_db_hash(const uint8_t *data, size_t data_size) * Wait data to write buffer * return 0 - Ok, 1 - timeout */ -static int wait_data(int l_timeout_ms) +static int wait_data(pthread_mutex_t *a_mutex, pthread_cond_t *a_cond, int l_timeout_ms) { int l_res = 0; - pthread_mutex_lock(&s_mutex_add_end); + pthread_mutex_lock(a_mutex); // endless waiting if(l_timeout_ms == -1) - l_res = pthread_cond_wait(&s_cond_add_end, &s_mutex_add_end); + l_res = pthread_cond_wait(a_cond, a_mutex); // waiting no more than timeout in milliseconds else { struct timespec l_to; @@ -182,21 +204,55 @@ static int wait_data(int l_timeout_ms) } else l_to.tv_nsec = (long) l_nsec_new; - l_res = pthread_cond_timedwait(&s_cond_add_end, &s_mutex_add_end, &l_to); + l_res = pthread_cond_timedwait(a_cond, a_mutex, &l_to); } - pthread_mutex_unlock(&s_mutex_add_end); + pthread_mutex_unlock(a_mutex); if(l_res == ETIMEDOUT) return 1; return l_res; } -static int save_write_buf(void) +// return 0 if buffer empty, 1 data present +static bool check_fill_buf(void) { + dap_list_t *l_list_begin; dap_list_t *l_list_end; + pthread_mutex_lock(&s_mutex_add_start); pthread_mutex_lock(&s_mutex_add_end); l_list_end = s_list_end; + l_list_begin = s_list_begin; pthread_mutex_unlock(&s_mutex_add_end); + pthread_mutex_unlock(&s_mutex_add_start); + + bool l_ret = (l_list_begin != l_list_end) ? 1 : 0; +// if(l_ret) +// printf("** Wait s_beg=0x%x s_end=0x%x \n", l_list_begin, l_list_end); + return l_ret; +} + +// wait apply write buffer +static void wait_write_buf() +{ +// printf("** Start wait data\n"); + // wait data + while(1) { + if(!check_fill_buf()) + break; + if(!wait_data(&s_mutex_write_end, &s_cond_write_end, 50)) + break; + } +// printf("** End wait data\n"); +} +// save data from buffer to database +static int save_write_buf(void) +{ + dap_list_t *l_list_end; + // fix end of buffer + pthread_mutex_lock(&s_mutex_add_end); + l_list_end = s_list_end; + pthread_mutex_unlock(&s_mutex_add_end); + // save data from begin to fixed end pthread_mutex_lock(&s_mutex_add_start); if(s_list_begin != l_list_end) { if(s_drv_callback.transaction_start) @@ -216,30 +272,43 @@ static int save_write_buf(void) } s_list_begin = dap_list_next(s_list_begin); +// printf("** ap2*record *l_beg=0x%x l_nex=0x%x d_beg=0x%x l_end=0x%x d_end=0x%x sl_end=0x%x\n", s_list_begin, + // s_list_begin->next, s_list_begin->data, l_list_end, l_list_end->data, s_list_end); + + //printf("** free data=0x%x list=0x%x\n", s_list_begin->prev->data, s_list_begin->prev); // free memory dap_store_obj_free((dap_store_obj_t*) s_list_begin->prev->data, 1); dap_list_free1(s_list_begin->prev); s_list_begin->prev = NULL; cnt++; - } if(s_drv_callback.transaction_end) s_drv_callback.transaction_end(); + printf("** writing ended cnt=%d\n", cnt); + // writing ended + pthread_mutex_lock(&s_mutex_write_end); + pthread_cond_broadcast(&s_cond_write_end); + pthread_mutex_unlock(&s_mutex_write_end); } pthread_mutex_unlock(&s_mutex_add_start); - // wait data - //wait_data - + return 0; } +// thread for save data from buffer to database static void* func_write_buf(void * arg) { while(1) { + if(!s_write_buf_state) + break; //save_write_buf - if(save_write_buf() == 0) + if(save_write_buf() == 0) { + if(!s_write_buf_state) + break; // wait data - wait_data(2000); // 2 sec + wait_data(&s_mutex_cond, &s_cond_add_end, 2000); // 2 sec + } } + pthread_exit(0); } int dap_db_add(pdap_store_obj_t a_store_obj, size_t a_store_count) @@ -247,6 +316,7 @@ int dap_db_add(pdap_store_obj_t a_store_obj, size_t a_store_count) //dap_store_obj_t *l_store_obj = dap_store_obj_copy(a_store_obj, a_store_count); if(!a_store_obj || !a_store_count) return -1; + a_store_obj->type = 'a'; // add all records into write buffer pthread_mutex_lock(&s_mutex_add_end); for(size_t i = 0; i < a_store_count; i++) { @@ -257,27 +327,36 @@ int dap_db_add(pdap_store_obj_t a_store_obj, size_t a_store_count) pthread_mutex_lock(&s_mutex_add_start); s_list_begin = s_list_end; pthread_mutex_unlock(&s_mutex_add_start); + //printf("*!!add record=0x%x / 0x%x obj=0x%x / 0x%x\n", s_list_end, s_list_end->data, s_list_end->prev); } else s_list_end->data = l_store_obj_cur; - s_list_end = dap_list_append(s_list_end, NULL); + dap_list_append(s_list_end, NULL); s_list_end = dap_list_last(s_list_end); + //printf("**+add record l_cur=0x%x / 0x%x l_new=0x%x / 0x%x\n", s_list_end->prev, s_list_end->prev->data,s_list_end, s_list_end->data); } // buffer changed + pthread_mutex_lock(&s_mutex_cond); pthread_cond_broadcast(&s_cond_add_end); + pthread_mutex_unlock(&s_mutex_cond); + pthread_mutex_unlock(&s_mutex_add_end); + return 0; } int dap_db_delete(pdap_store_obj_t a_store_obj, size_t a_store_count) { - dap_db_add(a_store_obj, a_store_count); + a_store_obj->type = 'd'; + return dap_db_add(a_store_obj, a_store_count); } -dap_store_obj_t* dap_db_read_data(const char *a_group, const char *a_key) +dap_store_obj_t* dap_db_read_data(const char *a_group, const char *a_key, size_t *count_out) { - // apply write buffer - save_write_buf(); + dap_store_obj_t *l_ret = NULL; + // wait apply write buffer + wait_write_buf(); // read record if(s_drv_callback.read_store_obj) - s_drv_callback.read_store_obj(a_group, a_key); + l_ret = s_drv_callback.read_store_obj(a_group, a_key, count_out); + return l_ret; } diff --git a/dap_chain_global_db_driver.h b/dap_chain_global_db_driver.h index 8a235ec3ddf4e4ca9ebea3b7ba193d46064cd464..9d819aac2198e3a05af11c3d24c4cbc1d80963a7 100644 --- a/dap_chain_global_db_driver.h +++ b/dap_chain_global_db_driver.h @@ -47,10 +47,9 @@ typedef struct dap_store_obj_pkt { }__attribute__((packed)) dap_store_obj_pkt_t; typedef int (*dap_db_driver_write_callback_t)(dap_store_obj_t*); -typedef dap_store_obj_t* (*dap_db_driver_read_callback_t)(const char *,const char *); +typedef dap_store_obj_t* (*dap_db_driver_read_callback_t)(const char *,const char *, size_t *); typedef int (*dap_db_driver_callback_t)(void); - typedef struct dap_db_driver_callbacks { dap_db_driver_write_callback_t apply_store_obj; dap_db_driver_read_callback_t read_store_obj; @@ -70,7 +69,7 @@ char* dap_db_driver_db_hash(const uint8_t *data, size_t data_size); int dap_db_add(pdap_store_obj_t a_store_obj, size_t a_store_count); int dap_db_delete(pdap_store_obj_t a_store_obj, size_t a_store_count); -dap_store_obj_t* dap_db_read_data(const char *a_group, const char *a_key); +dap_store_obj_t* dap_db_read_data(const char *a_group, const char *a_key, size_t *count_out); pdap_store_obj_t dap_db_read_file_data(const char *a_path, const char *a_group); // state of emergency only, if LDB database is inaccessible dap_store_obj_pkt_t *dap_store_packet_single(pdap_store_obj_t a_store_obj); diff --git a/dap_chain_global_db_driver_sqlite.c b/dap_chain_global_db_driver_sqlite.c index 3ea4c5dea5ea37f3c6f3f4935acf513d9ef8f941..ef8984ba5981c5b33498f785a8e7803af63c0a7d 100644 --- a/dap_chain_global_db_driver_sqlite.c +++ b/dap_chain_global_db_driver_sqlite.c @@ -62,6 +62,8 @@ typedef struct _SQLITE_ROW_VALUE_ SQLITE_VALUE *val; // array of field values } SQLITE_ROW_VALUE; +static int dap_db_driver_sqlite_exec(sqlite3 *l_db, const char *l_query, char **l_error_message); + /** * SQLite library initialization, no thread safe * @@ -83,6 +85,16 @@ int dap_db_driver_sqlite_init(const char *a_filename_db, dap_db_driver_callbacks dap_db_driver_sqlite_free(l_error_message); } else { + if(!dap_db_driver_sqlite_set_pragma(s_db, "synchronous", "NORMAL")) // 0 | OFF | 1 | NORMAL | 2 | FULL + printf("can't set new synchronous mode\n"); + if(!dap_db_driver_sqlite_set_pragma(s_db, "journal_mode", "OFF")) // DELETE | TRUNCATE | PERSIST | MEMORY | WAL | OFF + printf("can't set new journal mode\n"); + + if(!dap_db_driver_sqlite_set_pragma(s_db, "page_size", "1024")) // DELETE | TRUNCATE | PERSIST | MEMORY | WAL | OFF + printf("can't set page_size\n"); + // *PRAGMA page_size = bytes; // page size DB; it is reasonable to make it equal to the size of the disk cluster 4096 + // *PRAGMA cache_size = -kibibytes; // by default it is equal to 2000 pages of database +// a_drv_callback->apply_store_obj = dap_db_driver_sqlite_apply_store_obj; a_drv_callback->read_store_obj = dap_db_driver_sqlite_read_store_obj; a_drv_callback->transaction_start = dap_db_driver_sqlite_start_transaction; @@ -162,6 +174,32 @@ void dap_db_driver_sqlite_free(char *memory) sqlite3_free(memory); } +/** + * Set specific pragma statements + * www.sqlite.org/pragma.html + * + *PRAGMA page_size = bytes; // page size DB; it is reasonable to make it equal to the size of the disk cluster 4096 + *PRAGMA cache_size = -kibibytes; // by default it is equal to 2000 pages of database + *PRAGMA encoding = "UTF-8"; // default = UTF-8 + *PRAGMA foreign_keys = 1; // default = 0 + *PRAGMA journal_mode = DELETE | TRUNCATE | PERSIST | MEMORY | WAL | OFF; + *PRAGMA synchronous = 0 | OFF | 1 | NORMAL | 2 | FULL; + */ +bool dap_db_driver_sqlite_set_pragma(sqlite3 *a_db, char *a_param, char *a_mode) +{ + if(!a_param || !a_mode) + { + printf("[sqlite_set_pragma] err!!! no param or mode\n"); + return false; + } + char *l_str_query = sqlite3_mprintf("PRAGMA %s = %s", a_param, a_mode); + int l_rc = dap_db_driver_sqlite_exec(a_db, l_str_query, NULL); // default synchronous=FULL + sqlite3_free(l_str_query); + if(l_rc == SQLITE_OK) + return true; + return false; +} + /** * Execute SQL query to database that does not return data * @@ -431,7 +469,7 @@ int dap_db_driver_sqlite_apply_store_obj(dap_store_obj_t *a_store_obj) log_it(L_ERROR, "Unknown store_obj type '0x%x'", a_store_obj->type); return -1; } - int l_ret = dap_db_driver_sqlite_exec(s_db, l_query, &l_error_message); + int l_ret = dap_db_driver_sqlite_exec(s_db, l_query, NULL); // missing database if(l_ret == SQLITE_ERROR) { // create table @@ -440,7 +478,7 @@ int dap_db_driver_sqlite_apply_store_obj(dap_store_obj_t *a_store_obj) } dap_db_driver_sqlite_free(l_query); // entry with the same hash is already present - if(l_ret == SQLITE_CONSTRAINT){ + if(l_ret == SQLITE_CONSTRAINT) { log_it(L_INFO, "Entry with the same hash is already present, %s", l_error_message); dap_db_driver_sqlite_free(l_error_message); return 0; @@ -456,14 +494,17 @@ int dap_db_driver_sqlite_apply_store_obj(dap_store_obj_t *a_store_obj) /** * Read data + * a_key may be NULL */ dap_store_obj_t* dap_db_driver_sqlite_read_store_obj(const char *a_group, const char *a_key) { dap_store_obj_t *l_obj = NULL; char *l_error_message = NULL; sqlite3_stmt *l_res; + if(!a_group || !a_key) + return NULL; char *l_str_query = sqlite3_mprintf("SELECT ts,value FROM '%s' WHERE key='%s' LIMIT 1", a_group, a_key); - int l_ret = dap_db_driver_sqlite_query(s_db, l_str_query, &l_res, &l_error_message); // default synchronous=FULL + int l_ret = dap_db_driver_sqlite_query(s_db, l_str_query, &l_res, &l_error_message); sqlite3_free(l_str_query); if(l_ret != SQLITE_OK) { log_it(L_ERROR, "read l_ret=%d, %s\n", sqlite3_errcode(s_db), sqlite3_errmsg(s_db)); diff --git a/dap_chain_global_db_driver_sqlite.h b/dap_chain_global_db_driver_sqlite.h index 476de9dabea3d119c56bdc5a357caa30e6404126..f75b770103443a4ebddb6c9fcbd325074976e6f1 100644 --- a/dap_chain_global_db_driver_sqlite.h +++ b/dap_chain_global_db_driver_sqlite.h @@ -31,6 +31,7 @@ int dap_db_driver_sqlite_deinit(void); sqlite3* dap_db_driver_sqlite_open(const char *a_filename_utf8, int a_flags, char **error_message); void dap_db_driver_sqlite_close(sqlite3 *l_db); void dap_db_driver_sqlite_free(char *memory); +bool dap_db_driver_sqlite_set_pragma(sqlite3 *a_db, char *a_param, char *a_mode); // ** SQLite callbacks **