From 9344670688294bc39d3fe5f65d835e7abe0ef3d2 Mon Sep 17 00:00:00 2001 From: p-const <p.const@bk.ru> Date: Thu, 27 Jun 2019 17:54:16 +0300 Subject: [PATCH] id now works properly --- dap_chain_global_db_driver_cdb.c | 173 +++++++++++++++++-------------- 1 file changed, 97 insertions(+), 76 deletions(-) diff --git a/dap_chain_global_db_driver_cdb.c b/dap_chain_global_db_driver_cdb.c index 5ea1008..67742a3 100644 --- a/dap_chain_global_db_driver_cdb.c +++ b/dap_chain_global_db_driver_cdb.c @@ -39,11 +39,13 @@ typedef struct _obj_arg { pdap_store_obj_t o; uint64_t q; uint64_t n; + uint64_t id; } obj_arg, *pobj_arg; typedef struct _cdb_instance { CDB *cdb; char *local_group; + uint64_t id; UT_hash_handle hh; } cdb_instance, *pcdb_instance; @@ -74,13 +76,13 @@ static inline uint64_t dap_cdb_hex_to_uint(char *arr, short size) { } static void cdb_serialize_val_to_dap_store_obj(pdap_store_obj_t a_obj, char *key, char *val) { - if (!key || !val ) { + if (!key || !val) { a_obj = NULL; return; } int offset = 0; a_obj->key = dap_strdup(key); - // a_obj->id = dap_cdb_hex_to_uint((unsigned char*)val, uint64_size); + a_obj->id = dap_cdb_hex_to_uint((unsigned char*)val, sizeof(uint64_t)); offset += sizeof(uint64_t); a_obj->value_len = dap_cdb_hex_to_uint((unsigned char*)val + offset, sizeof(unsigned long)); offset += sizeof(unsigned long); @@ -90,6 +92,35 @@ static void cdb_serialize_val_to_dap_store_obj(pdap_store_obj_t a_obj, char *key a_obj->timestamp = dap_cdb_hex_to_uint((unsigned char*)val + offset, sizeof(time_t)); } +bool dap_cdb_get_last_obj_iter_callback(void *arg, const char *key, int ksize, const char *val, int vsize, uint32_t expire, uint64_t oid) { + if (--((pobj_arg)arg)->q == 0) { + cdb_serialize_val_to_dap_store_obj((pdap_store_obj_t)(((pobj_arg)arg)->o), (char*)key, (char*)val); + return false; + } + return true; +} + +bool dap_cdb_get_some_obj_iter_callback(void *arg, const char *key, int ksize, const char *val, int vsize, uint32_t expire, uint64_t oid) { + pdap_store_obj_t l_obj = (pdap_store_obj_t)((pobj_arg)arg)->o; + cdb_serialize_val_to_dap_store_obj(&l_obj[((pobj_arg)arg)->n - ((pobj_arg)arg)->q], (char*)key, (char*)val); + if (--((pobj_arg)arg)->q == 0) { + return false; + } + return true; +} + +bool dap_cdb_get_cond_obj_iter_callback(void *arg, const char *key, int ksize, const char *val, int vsize, uint32_t expire, uint64_t oid) { + if (dap_cdb_hex_to_uint((unsigned char*)val, sizeof(uint64_t)) < ((pobj_arg)arg)->id) { + return true; + } + pdap_store_obj_t l_obj = (pdap_store_obj_t)((pobj_arg)arg)->o; + cdb_serialize_val_to_dap_store_obj(&l_obj[((pobj_arg)arg)->n - ((pobj_arg)arg)->q], (char*)key, (char*)val); + if (--((pobj_arg)arg)->q == 0) { + return false; + } + return true; +} + pcdb_instance dap_cdb_init_group(char *a_group, int a_flags) { pcdb_instance l_cdb_i = NULL; pthread_mutex_lock(&cdb_mutex); @@ -115,7 +146,30 @@ pcdb_instance dap_cdb_init_group(char *a_group, int a_flags) { goto ERR; } if (!(a_flags & (1 << 1))) { + CDBSTAT l_cdb_stat; + cdb_stat(l_cdb_i->cdb, &l_cdb_stat); + if (l_cdb_stat.rnum > 0) { + void *l_iter = cdb_iterate_new(l_cdb_i->cdb, 0); + obj_arg l_arg; + l_arg.o = DAP_NEW_Z(dap_store_obj_t); + l_arg.q = l_cdb_stat.rnum; + cdb_iterate(l_cdb_i->cdb, dap_cdb_get_last_obj_iter_callback, (void*)&l_arg, l_iter); + cdb_iterate_destroy(l_cdb_i->cdb, l_iter); + l_cdb_i->id = l_arg.o->id; + log_it(L_INFO, "Group \"%s\" found" , l_cdb_i->local_group); + log_it(L_INFO, "Records: %-24u" , l_cdb_stat.rnum); + log_it(L_INFO, "Average read latency: %-24u" , l_cdb_stat.rlatcy); + log_it(L_INFO, "Average write latency: %-24u" , l_cdb_stat.wlatcy); + log_it(L_INFO, "Last id: %-24u" , l_cdb_i->id); + DAP_DELETE(l_arg.o); + } else { + log_it(L_INFO, "Group \"%s\" created" , l_cdb_i->local_group); + l_cdb_i->id = 0; + } HASH_ADD_KEYPTR(hh, s_cdb, l_cdb_i->local_group, strlen(l_cdb_i->local_group), l_cdb_i); + } else { + log_it(L_INFO, "Group \"%s\" truncated" , l_cdb_i->local_group); + l_cdb_i->id = 0; } FIN: pthread_mutex_unlock(&cdb_mutex); @@ -150,12 +204,6 @@ int dap_db_driver_cdb_init(const char *a_cdb_path, dap_db_driver_callbacks_t *a_ closedir(dir); return -2; } - CDBSTAT l_cdb_stat; - cdb_stat(l_cdb_i->cdb, &l_cdb_stat); - log_it(L_INFO, "Group \"%s\" found" , l_cdb_i->local_group); - log_it(L_INFO, "Records: %-24u" , l_cdb_stat.rnum); - log_it(L_INFO, "Average read latency: %-24u" , l_cdb_stat.rlatcy); - log_it(L_INFO, "Average write latency: %-24u" , l_cdb_stat.wlatcy); } a_drv_callback->read_last_store_obj = dap_db_driver_cdb_read_last_store_obj; a_drv_callback->apply_store_obj = dap_db_driver_cdb_apply_store_obj; @@ -167,15 +215,12 @@ int dap_db_driver_cdb_init(const char *a_cdb_path, dap_db_driver_callbacks_t *a_ return CDB_SUCCESS; } -CDB *dap_cdb_get_db_by_group(const char *a_group) { +pcdb_instance dap_cdb_get_db_by_group(const char *a_group) { pcdb_instance l_cdb_i = NULL; pthread_mutex_lock(&cdb_mutex); HASH_FIND_STR(s_cdb, a_group, l_cdb_i); pthread_mutex_unlock(&cdb_mutex); - if (!l_cdb_i) { - return NULL; - } - return l_cdb_i->cdb; + return l_cdb_i; } int dap_cdb_add_group(const char *a_group) { @@ -188,34 +233,6 @@ int dap_cdb_add_group(const char *a_group) { return 0; } -bool dap_cdb_get_last_obj_iter_callback(void *arg, const char *key, int ksize, const char *val, int vsize, uint32_t expire, uint64_t oid) { - if (--((pobj_arg)arg)->q == 0) { - cdb_serialize_val_to_dap_store_obj((pdap_store_obj_t)(((pobj_arg)arg)->o), (char*)key, (char*)val); - return false; - } - return true; -} - -bool dap_cdb_get_some_obj_iter_callback(void *arg, const char *key, int ksize, const char *val, int vsize, uint32_t expire, uint64_t oid) { - pdap_store_obj_t l_obj = (pdap_store_obj_t)((pobj_arg)arg)->o; - cdb_serialize_val_to_dap_store_obj(&l_obj[((pobj_arg)arg)->n - ((pobj_arg)arg)->q], (char*)key, (char*)val); - if (--((pobj_arg)arg)->q == 0) { - return false; - } - return true; -} - -bool dap_cdb_get_cond_obj_iter_callback(void *arg, const char *key, int ksize, const char *val, int vsize, uint32_t expire, uint64_t oid) { - if (((pobj_arg)arg)->n <= ((pobj_arg)arg)->q) { - pdap_store_obj_t l_obj = (pdap_store_obj_t)((pobj_arg)arg)->o; - cdb_serialize_val_to_dap_store_obj(&l_obj[((pobj_arg)arg)->q - ((pobj_arg)arg)->n], (char*)key, (char*)val); - } - if (--((pobj_arg)arg)->n == 0) { - return false; - } - return true; -} - int dap_db_driver_cdb_deinit() { cdb_instance *cur_cdb, *tmp; HASH_ITER(hh, s_cdb, cur_cdb, tmp) { @@ -234,10 +251,11 @@ dap_store_obj_t *dap_db_driver_cdb_read_last_store_obj(const char* a_group) { if (!a_group) { return NULL; } - struct CDB *l_cdb = dap_cdb_get_db_by_group(a_group); - if (!l_cdb) { + pcdb_instance l_cdb_i = dap_cdb_get_db_by_group(a_group); + if (!l_cdb_i) { return NULL; } + CDB *l_cdb = l_cdb_i->cdb; CDBSTAT l_cdb_stat; cdb_stat(l_cdb, &l_cdb_stat); void *l_iter = cdb_iterate_new(l_cdb, 0); @@ -247,7 +265,6 @@ dap_store_obj_t *dap_db_driver_cdb_read_last_store_obj(const char* a_group) { cdb_iterate(l_cdb, dap_cdb_get_last_obj_iter_callback, (void*)&l_arg, l_iter); cdb_iterate_destroy(l_cdb, l_iter); l_arg.o->group = dap_strdup(a_group); - l_arg.o->id = l_cdb_stat.rnum; return l_arg.o; } @@ -255,10 +272,11 @@ dap_store_obj_t *dap_db_driver_cdb_read_store_obj(const char *a_group, const cha if (!a_group) { return NULL; } - CDB *l_cdb = dap_cdb_get_db_by_group(a_group); - if (!l_cdb) { + pcdb_instance l_cdb_i = dap_cdb_get_db_by_group(a_group); + if (!l_cdb_i) { return NULL; } + CDB *l_cdb = l_cdb_i->cdb; dap_store_obj_t *l_obj = NULL; if (a_key) { char *l_value; @@ -289,14 +307,13 @@ dap_store_obj_t *dap_db_driver_cdb_read_store_obj(const char *a_group, const cha l_arg.q = l_count_out; l_arg.n = l_count_out; void *l_iter = cdb_iterate_new(l_cdb, 0); - l_count_out = cdb_iterate(l_cdb, dap_cdb_get_some_obj_iter_callback, (void*)&l_arg, l_iter); + /*l_count_out = */cdb_iterate(l_cdb, dap_cdb_get_some_obj_iter_callback, (void*)&l_arg, l_iter); cdb_iterate_destroy(l_cdb, l_iter); if(a_count_out) { *a_count_out = l_count_out; } for (ulong i = 0; i < l_count_out; ++i) { l_arg.o[i].group = dap_strdup(a_group); - l_arg.o[i].id = l_cdb_stat.rnum - l_count_out + i + 1; } l_obj = l_arg.o; } @@ -307,41 +324,44 @@ dap_store_obj_t* dap_db_driver_cdb_read_cond_store_obj(const char *a_group, uint if (!a_group) { return NULL; } - CDB *l_cdb = dap_cdb_get_db_by_group(a_group); - if (!l_cdb) { + pcdb_instance l_cdb_i = dap_cdb_get_db_by_group(a_group); + if (!l_cdb_i) { return NULL; } + CDB *l_cdb = l_cdb_i->cdb; uint64_t l_count_out = 0; if(a_count_out) { l_count_out = *a_count_out; } CDBSTAT l_cdb_stat; cdb_stat(l_cdb, &l_cdb_stat); - if (a_id + l_count_out > l_cdb_stat.rnum) { - *a_count_out = 0; - return NULL; - } - if (a_id + l_count_out == 0) { - a_id = 0; + + if (l_count_out == 0 || l_count_out > l_cdb_stat.rnum) { l_count_out = l_cdb_stat.rnum; - } else if (a_id == 0) { - a_id = l_cdb_stat.rnum - l_count_out; - } else if (l_count_out == 0) { - l_count_out = l_cdb_stat.rnum - a_id; } obj_arg l_arg; l_arg.o = DAP_NEW_Z_SIZE(dap_store_obj_t, l_count_out * sizeof(dap_store_obj_t)); + l_arg.n = l_count_out; l_arg.q = l_count_out; - l_arg.n = l_count_out + a_id; + l_arg.id = a_id; void *l_iter = cdb_iterate_new(l_cdb, 0); - l_count_out = cdb_iterate(l_cdb, dap_cdb_get_cond_obj_iter_callback, (void*)&l_arg, l_iter); + /*l_count_out = */cdb_iterate(l_cdb, dap_cdb_get_cond_obj_iter_callback, (void*)&l_arg, l_iter); cdb_iterate_destroy(l_cdb, l_iter); + if (l_arg.q > 0) { + l_count_out = l_arg.n - l_arg.q; + void *tmp = DAP_REALLOC(l_arg.o, l_count_out * sizeof(dap_store_obj_t)); + if (!tmp) { + log_it(L_CRITICAL, "Couldn't re-allocate memory for portion of store objects!"); + DAP_DELETE(l_arg.o); + return NULL; + } + l_arg.o = tmp; + } if(a_count_out) { *a_count_out = l_count_out; } for (ulong i = 0; i < l_count_out; ++i) { l_arg.o[i].group = dap_strdup(a_group); - l_arg.o[i].id = a_id + i + 1; } return l_arg.o; } @@ -351,22 +371,23 @@ int dap_db_driver_cdb_apply_store_obj(pdap_store_obj_t a_store_obj) { return -1; } int ret = 0; - CDB *l_cdb = dap_cdb_get_db_by_group(a_store_obj->group); - if (!l_cdb) { + pcdb_instance l_cdb_i = dap_cdb_get_db_by_group(a_store_obj->group); + if (!l_cdb_i) { dap_cdb_add_group(a_store_obj->group); - pcdb_instance l_cdb_i = dap_cdb_init_group(a_store_obj->group, CDB_CREAT | CDB_PAGEWARMUP); - if (!l_cdb_i) { - return -1; - } - l_cdb = l_cdb_i->cdb; + l_cdb_i = dap_cdb_init_group(a_store_obj->group, CDB_CREAT | CDB_PAGEWARMUP); + } + if (!l_cdb_i) { + return -1; } if(a_store_obj->type == 'a') { - if(!a_store_obj->key || !a_store_obj->value || !a_store_obj->value_len) return -2; + if(!a_store_obj->key || !a_store_obj->value || !a_store_obj->value_len){ + return -2; + } cdb_record l_rec; l_rec.key = dap_strdup(a_store_obj->key); int offset = 0; char *l_val = DAP_NEW_Z_SIZE(char, sizeof(uint64_t) + sizeof(unsigned long) + a_store_obj->value_len + sizeof(time_t)); - // dap_cdb_uint_to_hex(l_val, a_store_obj->id, uint64_size); + dap_cdb_uint_to_hex(l_val, ++l_cdb_i->id, sizeof(uint64_t)); offset += sizeof(uint64_t); dap_cdb_uint_to_hex(l_val + offset, a_store_obj->value_len, sizeof(unsigned long)); offset += sizeof(unsigned long); @@ -376,17 +397,17 @@ int dap_db_driver_cdb_apply_store_obj(pdap_store_obj_t a_store_obj) { dap_cdb_uint_to_hex(l_val + offset, l_time, sizeof(time_t)); offset += sizeof(time_t); l_rec.val = l_val; - if (cdb_set2(l_cdb, l_rec.key, strlen(l_rec.key), l_rec.val, offset, CDB_INSERTCACHE | CDB_OVERWRITE, 0) != CDB_SUCCESS) { - log_it(L_ERROR, "Couldn't add record with key [%s] to CDB: \"%s\"", l_rec.key, cdb_errmsg(cdb_errno(l_cdb))); + if (cdb_set2(l_cdb_i->cdb, l_rec.key, strlen(l_rec.key), l_rec.val, offset, CDB_INSERTCACHE | CDB_OVERWRITE, 0) != CDB_SUCCESS) { + log_it(L_ERROR, "Couldn't add record with key [%s] to CDB: \"%s\"", l_rec.key, cdb_errmsg(cdb_errno(l_cdb_i->cdb))); ret = -1; } DAP_DELETE(l_rec.key); DAP_DELETE(l_rec.val); } else if(a_store_obj->type == 'd') { if(a_store_obj->key) { - cdb_del(l_cdb, a_store_obj->key, strlen(a_store_obj->key)); + cdb_del(l_cdb_i->cdb, a_store_obj->key, strlen(a_store_obj->key)); } else { - cdb_destroy(l_cdb); + cdb_destroy(l_cdb_i->cdb); if (!dap_cdb_init_group(a_store_obj->group, CDB_TRUNC | CDB_PAGEWARMUP)) { ret = -1; } -- GitLab