diff --git a/dap_chain_global_db_driver.c b/dap_chain_global_db_driver.c index dc464e50f4174e0c65b79a6b5e29e456a77957d9..822b52347db6210dd8486f841f1dcfc010b07b8d 100755 --- a/dap_chain_global_db_driver.c +++ b/dap_chain_global_db_driver.c @@ -41,6 +41,9 @@ static char *s_used_driver = NULL; +//#define USE_WRITE_BUFFER + +#ifdef USE_WRITE_BUFFER static int save_write_buf(void); // for write buffer @@ -60,6 +63,7 @@ dap_list_t *s_list_end = NULL; pthread_t s_write_buf_thread; volatile static bool s_write_buf_state = 0; static void* func_write_buf(void * arg); +#endif //USE_WRITE_BUFFER static dap_db_driver_callbacks_t s_drv_callback; @@ -82,6 +86,7 @@ int dap_db_driver_init(const char *a_driver_name, const char *a_filename_db) l_ret = dap_db_driver_sqlite_init(a_filename_db, &s_drv_callback); if(!dap_strcmp(s_used_driver, "cdb")) l_ret = dap_db_driver_cdb_init(a_filename_db, &s_drv_callback); +#ifdef USE_WRITE_BUFFER if(!l_ret) { pthread_condattr_t l_condattr; pthread_condattr_init(&l_condattr); @@ -92,6 +97,7 @@ int dap_db_driver_init(const char *a_driver_name, const char *a_filename_db) s_write_buf_state = true; pthread_create(&s_write_buf_thread, NULL, func_write_buf, NULL); } +#endif return l_ret; } @@ -101,6 +107,7 @@ int dap_db_driver_init(const char *a_driver_name, const char *a_filename_db) void dap_db_driver_deinit(void) { +#ifdef USE_WRITE_BUFFER // wait for close thread { pthread_mutex_lock(&s_mutex_cond); @@ -125,12 +132,12 @@ void dap_db_driver_deinit(void) s_list_begin = s_list_end = NULL; pthread_mutex_unlock(&s_mutex_add_start); pthread_mutex_unlock(&s_mutex_add_end); + pthread_cond_destroy(&s_cond_add_end); +#endif // deinit driver if(s_drv_callback.deinit) s_drv_callback.deinit(); - pthread_cond_destroy(&s_cond_add_end); - } dap_store_obj_t* dap_store_obj_copy(dap_store_obj_t *a_store_obj, size_t a_store_count) @@ -165,7 +172,8 @@ void dap_store_obj_free(dap_store_obj_t *a_store_obj, size_t a_store_count) static size_t dap_db_get_size_pdap_store_obj_t(pdap_store_obj_t store_obj) { - size_t size = sizeof(uint32_t) + 2 * sizeof(uint16_t) + sizeof(size_t) + sizeof(time_t) + dap_strlen(store_obj->group) + + size_t size = sizeof(uint32_t) + 2 * sizeof(uint16_t) + sizeof(size_t) + sizeof(time_t) + + dap_strlen(store_obj->group) + dap_strlen(store_obj->key) + store_obj->value_len; return size; } @@ -177,7 +185,8 @@ static size_t dap_db_get_size_pdap_store_obj_t(pdap_store_obj_t store_obj) * @param a_size_out[out] size of output structure * @return NULL in case of an error */ -dap_store_obj_pkt_t *dap_store_packet_multiple(pdap_store_obj_t a_store_obj, time_t a_timestamp, size_t a_store_obj_count) +dap_store_obj_pkt_t *dap_store_packet_multiple(pdap_store_obj_t a_store_obj, time_t a_timestamp, + size_t a_store_obj_count) { if(!a_store_obj || a_store_obj_count < 1) return NULL; @@ -193,7 +202,7 @@ dap_store_obj_pkt_t *dap_store_packet_multiple(pdap_store_obj_t a_store_obj, tim uint32_t l_count = (uint32_t) a_store_obj_count; memcpy(l_pkt->data + l_offset, &l_count, sizeof(uint32_t)); l_offset += sizeof(uint32_t); - for( size_t l_q = 0; l_q < a_store_obj_count; ++l_q) { + for(size_t l_q = 0; l_q < a_store_obj_count; ++l_q) { dap_store_obj_t obj = a_store_obj[l_q]; //uint16_t section_size = (uint16_t) dap_strlen(obj.section); uint16_t group_size = (uint16_t) dap_strlen(obj.group); @@ -287,6 +296,7 @@ char* dap_chain_global_db_driver_hash(const uint8_t *data, size_t data_size) if(!data || data_size <= 0) return NULL; dap_chain_hash_fast_t l_hash; + memset(&l_hash, 0, sizeof(dap_chain_hash_fast_t)); dap_hash_fast(data, data_size, &l_hash); size_t a_str_max = (sizeof(l_hash.raw) + 1) * 2 + 2; /* heading 0x */ char *a_str = DAP_NEW_Z_SIZE(char, a_str_max); @@ -329,6 +339,7 @@ static int wait_data(pthread_mutex_t *a_mutex, pthread_cond_t *a_cond, int l_tim return l_res; } +#ifdef USE_WRITE_BUFFER // return 0 if buffer empty, 1 data present static bool check_fill_buf(void) { @@ -427,12 +438,14 @@ static void* func_write_buf(void * arg) } pthread_exit(0); } +#endif //USE_WRITE_BUFFER int dap_chain_global_db_driver_appy(pdap_store_obj_t a_store_obj, size_t a_store_count) { //dap_store_obj_t *l_store_obj = dap_store_obj_copy(a_store_obj, a_store_count); if(!a_store_obj || !a_store_count) return -1; +#ifdef USE_WRITE_BUFFER // add all records into write buffer pthread_mutex_lock(&s_mutex_add_end); for(size_t i = 0; i < a_store_count; i++) { @@ -455,9 +468,32 @@ int dap_chain_global_db_driver_appy(pdap_store_obj_t a_store_obj, size_t a_store pthread_mutex_lock(&s_mutex_cond); pthread_cond_broadcast(&s_cond_add_end); pthread_mutex_unlock(&s_mutex_cond); - pthread_mutex_unlock(&s_mutex_add_end); return 0; +#else + int l_ret = 0; + // apply to database + if(a_store_count > 1 && s_drv_callback.transaction_start) + s_drv_callback.transaction_start(); + + if(s_drv_callback.apply_store_obj) + for(size_t i = 0; i < a_store_count; i++) { + dap_store_obj_t *l_obj = dap_store_obj_copy(a_store_obj + i, 1); + assert(l_obj); + if(!s_drv_callback.apply_store_obj(a_store_obj)) { + //log_it(L_INFO, "Write item Ok %s/%s\n", l_obj->group, l_obj->key); + } + else { + log_it(L_ERROR, "Can't write item %s/%s\n", l_obj->group, l_obj->key); + l_ret -= 1; + } + } + + if(a_store_count > 1 && s_drv_callback.transaction_end) + s_drv_callback.transaction_end(); + return l_ret; +#endif + } int dap_chain_global_db_driver_add(pdap_store_obj_t a_store_obj, size_t a_store_count) @@ -482,8 +518,10 @@ int dap_chain_global_db_driver_delete(pdap_store_obj_t a_store_obj, size_t a_sto dap_store_obj_t* dap_chain_global_db_driver_read_last(const char *a_group) { dap_store_obj_t *l_ret = NULL; +#ifdef USE_WRITE_BUFFER // wait apply write buffer wait_write_buf(); +#endif // read records using the selected database engine if(s_drv_callback.read_last_store_obj) l_ret = s_drv_callback.read_last_store_obj(a_group); @@ -502,8 +540,10 @@ dap_store_obj_t* dap_chain_global_db_driver_read_last(const char *a_group) dap_store_obj_t* dap_chain_global_db_driver_cond_read(const char *a_group, uint64_t id, size_t *a_count_out) { dap_store_obj_t *l_ret = NULL; +#ifdef USE_WRITE_BUFFER // wait apply write buffer wait_write_buf(); +#endif // read records using the selected database engine if(s_drv_callback.read_cond_store_obj) l_ret = s_drv_callback.read_cond_store_obj(a_group, id, a_count_out); @@ -521,8 +561,10 @@ dap_store_obj_t* dap_chain_global_db_driver_cond_read(const char *a_group, uint6 dap_store_obj_t* dap_chain_global_db_driver_read(const char *a_group, const char *a_key, size_t *a_count_out) { dap_store_obj_t *l_ret = NULL; +#ifdef USE_WRITE_BUFFER // wait apply write buffer wait_write_buf(); +#endif // read records using the selected database engine if(s_drv_callback.read_store_obj) l_ret = s_drv_callback.read_store_obj(a_group, a_key, a_count_out);