Skip to content
Snippets Groups Projects
Unverified Commit 0188ea0f authored by Dmitriy A. Gerasimov's avatar Dmitriy A. Gerasimov Committed by GitHub
Browse files

Merge pull request #6 from cellframe/feature-2313

disabled write buffer
parents 271c4f39 f547ea43
No related branches found
No related tags found
No related merge requests found
...@@ -41,6 +41,9 @@ ...@@ -41,6 +41,9 @@
static char *s_used_driver = NULL; static char *s_used_driver = NULL;
//#define USE_WRITE_BUFFER
#ifdef USE_WRITE_BUFFER
static int save_write_buf(void); static int save_write_buf(void);
// for write buffer // for write buffer
...@@ -60,6 +63,7 @@ dap_list_t *s_list_end = NULL; ...@@ -60,6 +63,7 @@ dap_list_t *s_list_end = NULL;
pthread_t s_write_buf_thread; pthread_t s_write_buf_thread;
volatile static bool s_write_buf_state = 0; volatile static bool s_write_buf_state = 0;
static void* func_write_buf(void * arg); static void* func_write_buf(void * arg);
#endif //USE_WRITE_BUFFER
static dap_db_driver_callbacks_t s_drv_callback; static dap_db_driver_callbacks_t s_drv_callback;
...@@ -82,6 +86,7 @@ int dap_db_driver_init(const char *a_driver_name, const char *a_filename_db) ...@@ -82,6 +86,7 @@ int dap_db_driver_init(const char *a_driver_name, const char *a_filename_db)
l_ret = dap_db_driver_sqlite_init(a_filename_db, &s_drv_callback); l_ret = dap_db_driver_sqlite_init(a_filename_db, &s_drv_callback);
if(!dap_strcmp(s_used_driver, "cdb")) if(!dap_strcmp(s_used_driver, "cdb"))
l_ret = dap_db_driver_cdb_init(a_filename_db, &s_drv_callback); l_ret = dap_db_driver_cdb_init(a_filename_db, &s_drv_callback);
#ifdef USE_WRITE_BUFFER
if(!l_ret) { if(!l_ret) {
pthread_condattr_t l_condattr; pthread_condattr_t l_condattr;
pthread_condattr_init(&l_condattr); pthread_condattr_init(&l_condattr);
...@@ -92,6 +97,7 @@ int dap_db_driver_init(const char *a_driver_name, const char *a_filename_db) ...@@ -92,6 +97,7 @@ int dap_db_driver_init(const char *a_driver_name, const char *a_filename_db)
s_write_buf_state = true; s_write_buf_state = true;
pthread_create(&s_write_buf_thread, NULL, func_write_buf, NULL); pthread_create(&s_write_buf_thread, NULL, func_write_buf, NULL);
} }
#endif
return l_ret; return l_ret;
} }
...@@ -101,6 +107,7 @@ int dap_db_driver_init(const char *a_driver_name, const char *a_filename_db) ...@@ -101,6 +107,7 @@ int dap_db_driver_init(const char *a_driver_name, const char *a_filename_db)
void dap_db_driver_deinit(void) void dap_db_driver_deinit(void)
{ {
#ifdef USE_WRITE_BUFFER
// wait for close thread // wait for close thread
{ {
pthread_mutex_lock(&s_mutex_cond); pthread_mutex_lock(&s_mutex_cond);
...@@ -125,12 +132,12 @@ void dap_db_driver_deinit(void) ...@@ -125,12 +132,12 @@ void dap_db_driver_deinit(void)
s_list_begin = s_list_end = NULL; s_list_begin = s_list_end = NULL;
pthread_mutex_unlock(&s_mutex_add_start); pthread_mutex_unlock(&s_mutex_add_start);
pthread_mutex_unlock(&s_mutex_add_end); pthread_mutex_unlock(&s_mutex_add_end);
pthread_cond_destroy(&s_cond_add_end);
#endif
// deinit driver // deinit driver
if(s_drv_callback.deinit) if(s_drv_callback.deinit)
s_drv_callback.deinit(); s_drv_callback.deinit();
pthread_cond_destroy(&s_cond_add_end);
} }
dap_store_obj_t* dap_store_obj_copy(dap_store_obj_t *a_store_obj, size_t a_store_count) dap_store_obj_t* dap_store_obj_copy(dap_store_obj_t *a_store_obj, size_t a_store_count)
...@@ -165,7 +172,8 @@ void dap_store_obj_free(dap_store_obj_t *a_store_obj, size_t a_store_count) ...@@ -165,7 +172,8 @@ void dap_store_obj_free(dap_store_obj_t *a_store_obj, size_t a_store_count)
static size_t dap_db_get_size_pdap_store_obj_t(pdap_store_obj_t store_obj) static size_t dap_db_get_size_pdap_store_obj_t(pdap_store_obj_t store_obj)
{ {
size_t size = sizeof(uint32_t) + 2 * sizeof(uint16_t) + sizeof(size_t) + sizeof(time_t) + dap_strlen(store_obj->group) + size_t size = sizeof(uint32_t) + 2 * sizeof(uint16_t) + sizeof(size_t) + sizeof(time_t)
+ dap_strlen(store_obj->group) +
dap_strlen(store_obj->key) + store_obj->value_len; dap_strlen(store_obj->key) + store_obj->value_len;
return size; return size;
} }
...@@ -177,7 +185,8 @@ static size_t dap_db_get_size_pdap_store_obj_t(pdap_store_obj_t store_obj) ...@@ -177,7 +185,8 @@ static size_t dap_db_get_size_pdap_store_obj_t(pdap_store_obj_t store_obj)
* @param a_size_out[out] size of output structure * @param a_size_out[out] size of output structure
* @return NULL in case of an error * @return NULL in case of an error
*/ */
dap_store_obj_pkt_t *dap_store_packet_multiple(pdap_store_obj_t a_store_obj, time_t a_timestamp, size_t a_store_obj_count) dap_store_obj_pkt_t *dap_store_packet_multiple(pdap_store_obj_t a_store_obj, time_t a_timestamp,
size_t a_store_obj_count)
{ {
if(!a_store_obj || a_store_obj_count < 1) if(!a_store_obj || a_store_obj_count < 1)
return NULL; return NULL;
...@@ -193,7 +202,7 @@ dap_store_obj_pkt_t *dap_store_packet_multiple(pdap_store_obj_t a_store_obj, tim ...@@ -193,7 +202,7 @@ dap_store_obj_pkt_t *dap_store_packet_multiple(pdap_store_obj_t a_store_obj, tim
uint32_t l_count = (uint32_t) a_store_obj_count; uint32_t l_count = (uint32_t) a_store_obj_count;
memcpy(l_pkt->data + l_offset, &l_count, sizeof(uint32_t)); memcpy(l_pkt->data + l_offset, &l_count, sizeof(uint32_t));
l_offset += sizeof(uint32_t); l_offset += sizeof(uint32_t);
for( size_t l_q = 0; l_q < a_store_obj_count; ++l_q) { for(size_t l_q = 0; l_q < a_store_obj_count; ++l_q) {
dap_store_obj_t obj = a_store_obj[l_q]; dap_store_obj_t obj = a_store_obj[l_q];
//uint16_t section_size = (uint16_t) dap_strlen(obj.section); //uint16_t section_size = (uint16_t) dap_strlen(obj.section);
uint16_t group_size = (uint16_t) dap_strlen(obj.group); uint16_t group_size = (uint16_t) dap_strlen(obj.group);
...@@ -287,6 +296,7 @@ char* dap_chain_global_db_driver_hash(const uint8_t *data, size_t data_size) ...@@ -287,6 +296,7 @@ char* dap_chain_global_db_driver_hash(const uint8_t *data, size_t data_size)
if(!data || data_size <= 0) if(!data || data_size <= 0)
return NULL; return NULL;
dap_chain_hash_fast_t l_hash; dap_chain_hash_fast_t l_hash;
memset(&l_hash, 0, sizeof(dap_chain_hash_fast_t));
dap_hash_fast(data, data_size, &l_hash); dap_hash_fast(data, data_size, &l_hash);
size_t a_str_max = (sizeof(l_hash.raw) + 1) * 2 + 2; /* heading 0x */ size_t a_str_max = (sizeof(l_hash.raw) + 1) * 2 + 2; /* heading 0x */
char *a_str = DAP_NEW_Z_SIZE(char, a_str_max); char *a_str = DAP_NEW_Z_SIZE(char, a_str_max);
...@@ -329,6 +339,7 @@ static int wait_data(pthread_mutex_t *a_mutex, pthread_cond_t *a_cond, int l_tim ...@@ -329,6 +339,7 @@ static int wait_data(pthread_mutex_t *a_mutex, pthread_cond_t *a_cond, int l_tim
return l_res; return l_res;
} }
#ifdef USE_WRITE_BUFFER
// return 0 if buffer empty, 1 data present // return 0 if buffer empty, 1 data present
static bool check_fill_buf(void) static bool check_fill_buf(void)
{ {
...@@ -427,12 +438,14 @@ static void* func_write_buf(void * arg) ...@@ -427,12 +438,14 @@ static void* func_write_buf(void * arg)
} }
pthread_exit(0); pthread_exit(0);
} }
#endif //USE_WRITE_BUFFER
int dap_chain_global_db_driver_appy(pdap_store_obj_t a_store_obj, size_t a_store_count) int dap_chain_global_db_driver_appy(pdap_store_obj_t a_store_obj, size_t a_store_count)
{ {
//dap_store_obj_t *l_store_obj = dap_store_obj_copy(a_store_obj, a_store_count); //dap_store_obj_t *l_store_obj = dap_store_obj_copy(a_store_obj, a_store_count);
if(!a_store_obj || !a_store_count) if(!a_store_obj || !a_store_count)
return -1; return -1;
#ifdef USE_WRITE_BUFFER
// add all records into write buffer // add all records into write buffer
pthread_mutex_lock(&s_mutex_add_end); pthread_mutex_lock(&s_mutex_add_end);
for(size_t i = 0; i < a_store_count; i++) { for(size_t i = 0; i < a_store_count; i++) {
...@@ -455,9 +468,32 @@ int dap_chain_global_db_driver_appy(pdap_store_obj_t a_store_obj, size_t a_store ...@@ -455,9 +468,32 @@ int dap_chain_global_db_driver_appy(pdap_store_obj_t a_store_obj, size_t a_store
pthread_mutex_lock(&s_mutex_cond); pthread_mutex_lock(&s_mutex_cond);
pthread_cond_broadcast(&s_cond_add_end); pthread_cond_broadcast(&s_cond_add_end);
pthread_mutex_unlock(&s_mutex_cond); pthread_mutex_unlock(&s_mutex_cond);
pthread_mutex_unlock(&s_mutex_add_end); pthread_mutex_unlock(&s_mutex_add_end);
return 0; return 0;
#else
int l_ret = 0;
// apply to database
if(a_store_count > 1 && s_drv_callback.transaction_start)
s_drv_callback.transaction_start();
if(s_drv_callback.apply_store_obj)
for(size_t i = 0; i < a_store_count; i++) {
dap_store_obj_t *l_obj = dap_store_obj_copy(a_store_obj + i, 1);
assert(l_obj);
if(!s_drv_callback.apply_store_obj(a_store_obj)) {
//log_it(L_INFO, "Write item Ok %s/%s\n", l_obj->group, l_obj->key);
}
else {
log_it(L_ERROR, "Can't write item %s/%s\n", l_obj->group, l_obj->key);
l_ret -= 1;
}
}
if(a_store_count > 1 && s_drv_callback.transaction_end)
s_drv_callback.transaction_end();
return l_ret;
#endif
} }
int dap_chain_global_db_driver_add(pdap_store_obj_t a_store_obj, size_t a_store_count) int dap_chain_global_db_driver_add(pdap_store_obj_t a_store_obj, size_t a_store_count)
...@@ -482,8 +518,10 @@ int dap_chain_global_db_driver_delete(pdap_store_obj_t a_store_obj, size_t a_sto ...@@ -482,8 +518,10 @@ int dap_chain_global_db_driver_delete(pdap_store_obj_t a_store_obj, size_t a_sto
dap_store_obj_t* dap_chain_global_db_driver_read_last(const char *a_group) dap_store_obj_t* dap_chain_global_db_driver_read_last(const char *a_group)
{ {
dap_store_obj_t *l_ret = NULL; dap_store_obj_t *l_ret = NULL;
#ifdef USE_WRITE_BUFFER
// wait apply write buffer // wait apply write buffer
wait_write_buf(); wait_write_buf();
#endif
// read records using the selected database engine // read records using the selected database engine
if(s_drv_callback.read_last_store_obj) if(s_drv_callback.read_last_store_obj)
l_ret = s_drv_callback.read_last_store_obj(a_group); l_ret = s_drv_callback.read_last_store_obj(a_group);
...@@ -502,8 +540,10 @@ dap_store_obj_t* dap_chain_global_db_driver_read_last(const char *a_group) ...@@ -502,8 +540,10 @@ dap_store_obj_t* dap_chain_global_db_driver_read_last(const char *a_group)
dap_store_obj_t* dap_chain_global_db_driver_cond_read(const char *a_group, uint64_t id, size_t *a_count_out) dap_store_obj_t* dap_chain_global_db_driver_cond_read(const char *a_group, uint64_t id, size_t *a_count_out)
{ {
dap_store_obj_t *l_ret = NULL; dap_store_obj_t *l_ret = NULL;
#ifdef USE_WRITE_BUFFER
// wait apply write buffer // wait apply write buffer
wait_write_buf(); wait_write_buf();
#endif
// read records using the selected database engine // read records using the selected database engine
if(s_drv_callback.read_cond_store_obj) if(s_drv_callback.read_cond_store_obj)
l_ret = s_drv_callback.read_cond_store_obj(a_group, id, a_count_out); l_ret = s_drv_callback.read_cond_store_obj(a_group, id, a_count_out);
...@@ -521,8 +561,10 @@ dap_store_obj_t* dap_chain_global_db_driver_cond_read(const char *a_group, uint6 ...@@ -521,8 +561,10 @@ dap_store_obj_t* dap_chain_global_db_driver_cond_read(const char *a_group, uint6
dap_store_obj_t* dap_chain_global_db_driver_read(const char *a_group, const char *a_key, size_t *a_count_out) dap_store_obj_t* dap_chain_global_db_driver_read(const char *a_group, const char *a_key, size_t *a_count_out)
{ {
dap_store_obj_t *l_ret = NULL; dap_store_obj_t *l_ret = NULL;
#ifdef USE_WRITE_BUFFER
// wait apply write buffer // wait apply write buffer
wait_write_buf(); wait_write_buf();
#endif
// read records using the selected database engine // read records using the selected database engine
if(s_drv_callback.read_store_obj) if(s_drv_callback.read_store_obj)
l_ret = s_drv_callback.read_store_obj(a_group, a_key, a_count_out); l_ret = s_drv_callback.read_store_obj(a_group, a_key, a_count_out);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment