Skip to content
Snippets Groups Projects
Commit 93446706 authored by p-const's avatar p-const
Browse files

id now works properly

parent e6baf415
Branches bug-2325
No related tags found
No related merge requests found
......@@ -39,11 +39,13 @@ typedef struct _obj_arg {
pdap_store_obj_t o;
uint64_t q;
uint64_t n;
uint64_t id;
} obj_arg, *pobj_arg;
typedef struct _cdb_instance {
CDB *cdb;
char *local_group;
uint64_t id;
UT_hash_handle hh;
} cdb_instance, *pcdb_instance;
......@@ -74,13 +76,13 @@ static inline uint64_t dap_cdb_hex_to_uint(char *arr, short size) {
}
static void cdb_serialize_val_to_dap_store_obj(pdap_store_obj_t a_obj, char *key, char *val) {
if (!key || !val ) {
if (!key || !val) {
a_obj = NULL;
return;
}
int offset = 0;
a_obj->key = dap_strdup(key);
// a_obj->id = dap_cdb_hex_to_uint((unsigned char*)val, uint64_size);
a_obj->id = dap_cdb_hex_to_uint((unsigned char*)val, sizeof(uint64_t));
offset += sizeof(uint64_t);
a_obj->value_len = dap_cdb_hex_to_uint((unsigned char*)val + offset, sizeof(unsigned long));
offset += sizeof(unsigned long);
......@@ -90,6 +92,35 @@ static void cdb_serialize_val_to_dap_store_obj(pdap_store_obj_t a_obj, char *key
a_obj->timestamp = dap_cdb_hex_to_uint((unsigned char*)val + offset, sizeof(time_t));
}
bool dap_cdb_get_last_obj_iter_callback(void *arg, const char *key, int ksize, const char *val, int vsize, uint32_t expire, uint64_t oid) {
if (--((pobj_arg)arg)->q == 0) {
cdb_serialize_val_to_dap_store_obj((pdap_store_obj_t)(((pobj_arg)arg)->o), (char*)key, (char*)val);
return false;
}
return true;
}
bool dap_cdb_get_some_obj_iter_callback(void *arg, const char *key, int ksize, const char *val, int vsize, uint32_t expire, uint64_t oid) {
pdap_store_obj_t l_obj = (pdap_store_obj_t)((pobj_arg)arg)->o;
cdb_serialize_val_to_dap_store_obj(&l_obj[((pobj_arg)arg)->n - ((pobj_arg)arg)->q], (char*)key, (char*)val);
if (--((pobj_arg)arg)->q == 0) {
return false;
}
return true;
}
bool dap_cdb_get_cond_obj_iter_callback(void *arg, const char *key, int ksize, const char *val, int vsize, uint32_t expire, uint64_t oid) {
if (dap_cdb_hex_to_uint((unsigned char*)val, sizeof(uint64_t)) < ((pobj_arg)arg)->id) {
return true;
}
pdap_store_obj_t l_obj = (pdap_store_obj_t)((pobj_arg)arg)->o;
cdb_serialize_val_to_dap_store_obj(&l_obj[((pobj_arg)arg)->n - ((pobj_arg)arg)->q], (char*)key, (char*)val);
if (--((pobj_arg)arg)->q == 0) {
return false;
}
return true;
}
pcdb_instance dap_cdb_init_group(char *a_group, int a_flags) {
pcdb_instance l_cdb_i = NULL;
pthread_mutex_lock(&cdb_mutex);
......@@ -115,7 +146,30 @@ pcdb_instance dap_cdb_init_group(char *a_group, int a_flags) {
goto ERR;
}
if (!(a_flags & (1 << 1))) {
CDBSTAT l_cdb_stat;
cdb_stat(l_cdb_i->cdb, &l_cdb_stat);
if (l_cdb_stat.rnum > 0) {
void *l_iter = cdb_iterate_new(l_cdb_i->cdb, 0);
obj_arg l_arg;
l_arg.o = DAP_NEW_Z(dap_store_obj_t);
l_arg.q = l_cdb_stat.rnum;
cdb_iterate(l_cdb_i->cdb, dap_cdb_get_last_obj_iter_callback, (void*)&l_arg, l_iter);
cdb_iterate_destroy(l_cdb_i->cdb, l_iter);
l_cdb_i->id = l_arg.o->id;
log_it(L_INFO, "Group \"%s\" found" , l_cdb_i->local_group);
log_it(L_INFO, "Records: %-24u" , l_cdb_stat.rnum);
log_it(L_INFO, "Average read latency: %-24u" , l_cdb_stat.rlatcy);
log_it(L_INFO, "Average write latency: %-24u" , l_cdb_stat.wlatcy);
log_it(L_INFO, "Last id: %-24u" , l_cdb_i->id);
DAP_DELETE(l_arg.o);
} else {
log_it(L_INFO, "Group \"%s\" created" , l_cdb_i->local_group);
l_cdb_i->id = 0;
}
HASH_ADD_KEYPTR(hh, s_cdb, l_cdb_i->local_group, strlen(l_cdb_i->local_group), l_cdb_i);
} else {
log_it(L_INFO, "Group \"%s\" truncated" , l_cdb_i->local_group);
l_cdb_i->id = 0;
}
FIN:
pthread_mutex_unlock(&cdb_mutex);
......@@ -150,12 +204,6 @@ int dap_db_driver_cdb_init(const char *a_cdb_path, dap_db_driver_callbacks_t *a_
closedir(dir);
return -2;
}
CDBSTAT l_cdb_stat;
cdb_stat(l_cdb_i->cdb, &l_cdb_stat);
log_it(L_INFO, "Group \"%s\" found" , l_cdb_i->local_group);
log_it(L_INFO, "Records: %-24u" , l_cdb_stat.rnum);
log_it(L_INFO, "Average read latency: %-24u" , l_cdb_stat.rlatcy);
log_it(L_INFO, "Average write latency: %-24u" , l_cdb_stat.wlatcy);
}
a_drv_callback->read_last_store_obj = dap_db_driver_cdb_read_last_store_obj;
a_drv_callback->apply_store_obj = dap_db_driver_cdb_apply_store_obj;
......@@ -167,15 +215,12 @@ int dap_db_driver_cdb_init(const char *a_cdb_path, dap_db_driver_callbacks_t *a_
return CDB_SUCCESS;
}
CDB *dap_cdb_get_db_by_group(const char *a_group) {
pcdb_instance dap_cdb_get_db_by_group(const char *a_group) {
pcdb_instance l_cdb_i = NULL;
pthread_mutex_lock(&cdb_mutex);
HASH_FIND_STR(s_cdb, a_group, l_cdb_i);
pthread_mutex_unlock(&cdb_mutex);
if (!l_cdb_i) {
return NULL;
}
return l_cdb_i->cdb;
return l_cdb_i;
}
int dap_cdb_add_group(const char *a_group) {
......@@ -188,34 +233,6 @@ int dap_cdb_add_group(const char *a_group) {
return 0;
}
bool dap_cdb_get_last_obj_iter_callback(void *arg, const char *key, int ksize, const char *val, int vsize, uint32_t expire, uint64_t oid) {
if (--((pobj_arg)arg)->q == 0) {
cdb_serialize_val_to_dap_store_obj((pdap_store_obj_t)(((pobj_arg)arg)->o), (char*)key, (char*)val);
return false;
}
return true;
}
bool dap_cdb_get_some_obj_iter_callback(void *arg, const char *key, int ksize, const char *val, int vsize, uint32_t expire, uint64_t oid) {
pdap_store_obj_t l_obj = (pdap_store_obj_t)((pobj_arg)arg)->o;
cdb_serialize_val_to_dap_store_obj(&l_obj[((pobj_arg)arg)->n - ((pobj_arg)arg)->q], (char*)key, (char*)val);
if (--((pobj_arg)arg)->q == 0) {
return false;
}
return true;
}
bool dap_cdb_get_cond_obj_iter_callback(void *arg, const char *key, int ksize, const char *val, int vsize, uint32_t expire, uint64_t oid) {
if (((pobj_arg)arg)->n <= ((pobj_arg)arg)->q) {
pdap_store_obj_t l_obj = (pdap_store_obj_t)((pobj_arg)arg)->o;
cdb_serialize_val_to_dap_store_obj(&l_obj[((pobj_arg)arg)->q - ((pobj_arg)arg)->n], (char*)key, (char*)val);
}
if (--((pobj_arg)arg)->n == 0) {
return false;
}
return true;
}
int dap_db_driver_cdb_deinit() {
cdb_instance *cur_cdb, *tmp;
HASH_ITER(hh, s_cdb, cur_cdb, tmp) {
......@@ -234,10 +251,11 @@ dap_store_obj_t *dap_db_driver_cdb_read_last_store_obj(const char* a_group) {
if (!a_group) {
return NULL;
}
struct CDB *l_cdb = dap_cdb_get_db_by_group(a_group);
if (!l_cdb) {
pcdb_instance l_cdb_i = dap_cdb_get_db_by_group(a_group);
if (!l_cdb_i) {
return NULL;
}
CDB *l_cdb = l_cdb_i->cdb;
CDBSTAT l_cdb_stat;
cdb_stat(l_cdb, &l_cdb_stat);
void *l_iter = cdb_iterate_new(l_cdb, 0);
......@@ -247,7 +265,6 @@ dap_store_obj_t *dap_db_driver_cdb_read_last_store_obj(const char* a_group) {
cdb_iterate(l_cdb, dap_cdb_get_last_obj_iter_callback, (void*)&l_arg, l_iter);
cdb_iterate_destroy(l_cdb, l_iter);
l_arg.o->group = dap_strdup(a_group);
l_arg.o->id = l_cdb_stat.rnum;
return l_arg.o;
}
......@@ -255,10 +272,11 @@ dap_store_obj_t *dap_db_driver_cdb_read_store_obj(const char *a_group, const cha
if (!a_group) {
return NULL;
}
CDB *l_cdb = dap_cdb_get_db_by_group(a_group);
if (!l_cdb) {
pcdb_instance l_cdb_i = dap_cdb_get_db_by_group(a_group);
if (!l_cdb_i) {
return NULL;
}
CDB *l_cdb = l_cdb_i->cdb;
dap_store_obj_t *l_obj = NULL;
if (a_key) {
char *l_value;
......@@ -289,14 +307,13 @@ dap_store_obj_t *dap_db_driver_cdb_read_store_obj(const char *a_group, const cha
l_arg.q = l_count_out;
l_arg.n = l_count_out;
void *l_iter = cdb_iterate_new(l_cdb, 0);
l_count_out = cdb_iterate(l_cdb, dap_cdb_get_some_obj_iter_callback, (void*)&l_arg, l_iter);
/*l_count_out = */cdb_iterate(l_cdb, dap_cdb_get_some_obj_iter_callback, (void*)&l_arg, l_iter);
cdb_iterate_destroy(l_cdb, l_iter);
if(a_count_out) {
*a_count_out = l_count_out;
}
for (ulong i = 0; i < l_count_out; ++i) {
l_arg.o[i].group = dap_strdup(a_group);
l_arg.o[i].id = l_cdb_stat.rnum - l_count_out + i + 1;
}
l_obj = l_arg.o;
}
......@@ -307,41 +324,44 @@ dap_store_obj_t* dap_db_driver_cdb_read_cond_store_obj(const char *a_group, uint
if (!a_group) {
return NULL;
}
CDB *l_cdb = dap_cdb_get_db_by_group(a_group);
if (!l_cdb) {
pcdb_instance l_cdb_i = dap_cdb_get_db_by_group(a_group);
if (!l_cdb_i) {
return NULL;
}
CDB *l_cdb = l_cdb_i->cdb;
uint64_t l_count_out = 0;
if(a_count_out) {
l_count_out = *a_count_out;
}
CDBSTAT l_cdb_stat;
cdb_stat(l_cdb, &l_cdb_stat);
if (a_id + l_count_out > l_cdb_stat.rnum) {
*a_count_out = 0;
return NULL;
}
if (a_id + l_count_out == 0) {
a_id = 0;
if (l_count_out == 0 || l_count_out > l_cdb_stat.rnum) {
l_count_out = l_cdb_stat.rnum;
} else if (a_id == 0) {
a_id = l_cdb_stat.rnum - l_count_out;
} else if (l_count_out == 0) {
l_count_out = l_cdb_stat.rnum - a_id;
}
obj_arg l_arg;
l_arg.o = DAP_NEW_Z_SIZE(dap_store_obj_t, l_count_out * sizeof(dap_store_obj_t));
l_arg.n = l_count_out;
l_arg.q = l_count_out;
l_arg.n = l_count_out + a_id;
l_arg.id = a_id;
void *l_iter = cdb_iterate_new(l_cdb, 0);
l_count_out = cdb_iterate(l_cdb, dap_cdb_get_cond_obj_iter_callback, (void*)&l_arg, l_iter);
/*l_count_out = */cdb_iterate(l_cdb, dap_cdb_get_cond_obj_iter_callback, (void*)&l_arg, l_iter);
cdb_iterate_destroy(l_cdb, l_iter);
if (l_arg.q > 0) {
l_count_out = l_arg.n - l_arg.q;
void *tmp = DAP_REALLOC(l_arg.o, l_count_out * sizeof(dap_store_obj_t));
if (!tmp) {
log_it(L_CRITICAL, "Couldn't re-allocate memory for portion of store objects!");
DAP_DELETE(l_arg.o);
return NULL;
}
l_arg.o = tmp;
}
if(a_count_out) {
*a_count_out = l_count_out;
}
for (ulong i = 0; i < l_count_out; ++i) {
l_arg.o[i].group = dap_strdup(a_group);
l_arg.o[i].id = a_id + i + 1;
}
return l_arg.o;
}
......@@ -351,22 +371,23 @@ int dap_db_driver_cdb_apply_store_obj(pdap_store_obj_t a_store_obj) {
return -1;
}
int ret = 0;
CDB *l_cdb = dap_cdb_get_db_by_group(a_store_obj->group);
if (!l_cdb) {
pcdb_instance l_cdb_i = dap_cdb_get_db_by_group(a_store_obj->group);
if (!l_cdb_i) {
dap_cdb_add_group(a_store_obj->group);
pcdb_instance l_cdb_i = dap_cdb_init_group(a_store_obj->group, CDB_CREAT | CDB_PAGEWARMUP);
if (!l_cdb_i) {
return -1;
}
l_cdb = l_cdb_i->cdb;
l_cdb_i = dap_cdb_init_group(a_store_obj->group, CDB_CREAT | CDB_PAGEWARMUP);
}
if (!l_cdb_i) {
return -1;
}
if(a_store_obj->type == 'a') {
if(!a_store_obj->key || !a_store_obj->value || !a_store_obj->value_len) return -2;
if(!a_store_obj->key || !a_store_obj->value || !a_store_obj->value_len){
return -2;
}
cdb_record l_rec;
l_rec.key = dap_strdup(a_store_obj->key);
int offset = 0;
char *l_val = DAP_NEW_Z_SIZE(char, sizeof(uint64_t) + sizeof(unsigned long) + a_store_obj->value_len + sizeof(time_t));
// dap_cdb_uint_to_hex(l_val, a_store_obj->id, uint64_size);
dap_cdb_uint_to_hex(l_val, ++l_cdb_i->id, sizeof(uint64_t));
offset += sizeof(uint64_t);
dap_cdb_uint_to_hex(l_val + offset, a_store_obj->value_len, sizeof(unsigned long));
offset += sizeof(unsigned long);
......@@ -376,17 +397,17 @@ int dap_db_driver_cdb_apply_store_obj(pdap_store_obj_t a_store_obj) {
dap_cdb_uint_to_hex(l_val + offset, l_time, sizeof(time_t));
offset += sizeof(time_t);
l_rec.val = l_val;
if (cdb_set2(l_cdb, l_rec.key, strlen(l_rec.key), l_rec.val, offset, CDB_INSERTCACHE | CDB_OVERWRITE, 0) != CDB_SUCCESS) {
log_it(L_ERROR, "Couldn't add record with key [%s] to CDB: \"%s\"", l_rec.key, cdb_errmsg(cdb_errno(l_cdb)));
if (cdb_set2(l_cdb_i->cdb, l_rec.key, strlen(l_rec.key), l_rec.val, offset, CDB_INSERTCACHE | CDB_OVERWRITE, 0) != CDB_SUCCESS) {
log_it(L_ERROR, "Couldn't add record with key [%s] to CDB: \"%s\"", l_rec.key, cdb_errmsg(cdb_errno(l_cdb_i->cdb)));
ret = -1;
}
DAP_DELETE(l_rec.key);
DAP_DELETE(l_rec.val);
} else if(a_store_obj->type == 'd') {
if(a_store_obj->key) {
cdb_del(l_cdb, a_store_obj->key, strlen(a_store_obj->key));
cdb_del(l_cdb_i->cdb, a_store_obj->key, strlen(a_store_obj->key));
} else {
cdb_destroy(l_cdb);
cdb_destroy(l_cdb_i->cdb);
if (!dap_cdb_init_group(a_store_obj->group, CDB_TRUNC | CDB_PAGEWARMUP)) {
ret = -1;
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment