Skip to content
Snippets Groups Projects
Commit 566a3d76 authored by p-const's avatar p-const
Browse files

Warnings and errors fixes, multithreading support

parent e86d6194
Branches feature-2304
No related tags found
No related merge requests found
......@@ -25,6 +25,7 @@
#include <stddef.h>
#include <string.h>
#include <dirent.h>
#include <pthread.h>
#include <sys/stat.h>
#include <uthash.h>
#include "dap_common.h"
......@@ -50,6 +51,7 @@ typedef struct _cdb_instance {
static char *s_cdb_path = NULL;
static pcdb_instance s_cdb = NULL;
static pthread_mutex_t cdb_mutex = PTHREAD_MUTEX_INITIALIZER;
static inline void dap_cdb_uint_to_hex(char *arr, uint64_t val, short size) {
short i = 0;
......@@ -80,46 +82,52 @@ static void cdb_serialize_val_to_dap_store_obj(pdap_store_obj_t a_obj, char *key
}
int offset = 0;
a_obj->key = dap_strdup(key);
unsigned char l_id[uint64_size] = {'\0'};
memcpy(l_id, val, uint64_size);
a_obj->id = dap_cdb_hex_to_uint(l_id, uint64_size);
a_obj->id = dap_cdb_hex_to_uint((unsigned char*)val, uint64_size);
offset += uint64_size;
unsigned char l_val_size[sizeof(unsigned long)] = {'\0'};
memcpy(l_val_size, val + offset, sizeof(unsigned long));
a_obj->value_len = dap_cdb_hex_to_uint(l_val_size, sizeof(unsigned long));
a_obj->value_len = dap_cdb_hex_to_uint((unsigned char*)val + offset, sizeof(unsigned long));
offset += sizeof(unsigned long);
a_obj->value = DAP_NEW_SIZE(uint8_t, a_obj->value_len);
memcpy(a_obj->value, val + offset, a_obj->value_len);
offset += a_obj->value_len;
unsigned char l_rawtime[sizeof(time_t)] = {'\0'};
memcpy(l_rawtime, val + offset, sizeof(time_t));
a_obj->timestamp = dap_cdb_hex_to_uint(l_rawtime, sizeof(time_t));
a_obj->timestamp = dap_cdb_hex_to_uint((unsigned char*)val + offset, sizeof(time_t));
}
pcdb_instance *dap_cdb_init_group(char *a_group, int a_flags) {
pcdb_instance l_cdb_i = DAP_NEW(cdb_instance);
pcdb_instance dap_cdb_init_group(char *a_group, int a_flags) {
pcdb_instance l_cdb_i = NULL;
pthread_mutex_lock(&cdb_mutex);
char l_cdb_path[strlen(s_cdb_path) + strlen(a_group) + 2];
HASH_FIND_STR(s_cdb, a_group, l_cdb_i);
if (l_cdb_i && !(a_flags & (1 << 1))) {
goto FIN;
}
l_cdb_i = DAP_NEW(cdb_instance);
l_cdb_i->local_group = dap_strdup(a_group);
l_cdb_i->cdb = cdb_new();
char l_cdb_path[strlen(s_cdb_path) + strlen(a_group) + 2];
memset(l_cdb_path, '\0', strlen(s_cdb_path) + strlen(a_group) + 2);
strcat(l_cdb_path, s_cdb_path);
strcat(l_cdb_path, "/");
strcat(l_cdb_path, a_group);
cdb_options l_opts = { 1000000, 128, 1024 };
if ((dap_db_driver_cdb_options(&l_opts, l_cdb_i->cdb) != CDB_SUCCESS) ||
cdb_open(l_cdb_i->cdb, l_cdb_path, a_flags) < 0)
{
if (cdb_option(l_cdb_i->cdb, l_opts.hsize, l_opts.pcacheMB, l_opts.rcacheMB) != CDB_SUCCESS) {
log_it(L_ERROR, "Options are inacceptable: \"%s\"", cdb_errmsg(cdb_errno(l_cdb_i->cdb)));
goto ERR;
}
if (cdb_open(l_cdb_i->cdb, l_cdb_path, a_flags) != CDB_SUCCESS) {
log_it(L_ERROR, "An error occured while opening CDB: \"%s\"", cdb_errmsg(cdb_errno(l_cdb_i->cdb)));
DAP_DELETE(l_cdb_i->local_group);
DAP_DELETE(l_cdb_i);
return NULL;
goto ERR;
}
if (!(a_flags & (1 << 1))) {
HASH_ADD_KEYPTR(hh, s_cdb, l_cdb_i->local_group, strlen(l_cdb_i->local_group), l_cdb_i);
}
HASH_ADD_KEYPTR(hh, s_cdb, l_cdb_i->local_group, strlen(l_cdb_i->local_group), l_cdb_i);
FIN:
pthread_mutex_unlock(&cdb_mutex);
return l_cdb_i;
ERR:
cdb_destroy(l_cdb_i->cdb);
DAP_DELETE(l_cdb_i->local_group);
DAP_DELETE(l_cdb_i);
pthread_mutex_unlock(&cdb_mutex);
return NULL;
}
int dap_db_driver_cdb_init(const char *a_cdb_path, dap_db_driver_callbacks_t *a_drv_callback) {
......@@ -141,7 +149,6 @@ int dap_db_driver_cdb_init(const char *a_cdb_path, dap_db_driver_callbacks_t *a_
pcdb_instance l_cdb_i = dap_cdb_init_group(d->d_name, CDB_CREAT | CDB_PAGEWARMUP);
if (!l_cdb_i) {
dap_db_driver_cdb_deinit();
DAP_DELETE(s_cdb_path);
closedir(dir);
return -2;
}
......@@ -164,7 +171,9 @@ int dap_db_driver_cdb_init(const char *a_cdb_path, dap_db_driver_callbacks_t *a_
CDB *dap_cdb_get_db_by_group(const char *a_group) {
pcdb_instance l_cdb_i = NULL;
pthread_mutex_lock(&cdb_mutex);
HASH_FIND_STR(s_cdb, a_group, l_cdb_i);
pthread_mutex_unlock(&cdb_mutex);
if (!l_cdb_i) {
return NULL;
}
......@@ -182,44 +191,44 @@ int dap_cdb_add_group(const char *a_group) {
}
bool dap_cdb_get_last_obj_iter_callback(void *arg, const char *key, int ksize, const char *val, int vsize, uint32_t expire, uint64_t oid) {
/* this is wrong! TODO: instead of 'oid' must checkout real 'arg->id' */
/*if (oid != ((pobj_arg)arg)->q) {
return true;
}*/
cdb_serialize_val_to_dap_store_obj((pdap_store_obj_t)(((pobj_arg)arg)->o), key, val);
return false;
if (--((pobj_arg)arg)->q == 0) {
cdb_serialize_val_to_dap_store_obj((pdap_store_obj_t)(((pobj_arg)arg)->o), (char*)key, (char*)val);
return false;
}
return true;
}
bool dap_cdb_get_some_obj_iter_callback(void *arg, const char *key, int ksize, const char *val, int vsize, uint32_t expire, uint64_t oid) {
((pobj_arg)arg)->q--;
uint64_t q = ((pobj_arg)arg)->q;
uint64_t n = ((pobj_arg)arg)->n;
pdap_store_obj_t l_obj = (pdap_store_obj_t)((pobj_arg)arg)->o;
cdb_serialize_val_to_dap_store_obj(&l_obj[n-q-1], key, val);
if (q == 0) {
cdb_serialize_val_to_dap_store_obj(&l_obj[((pobj_arg)arg)->n - ((pobj_arg)arg)->q], (char*)key, (char*)val);
if (--((pobj_arg)arg)->q == 0) {
return false;
}
return true;
}
bool dap_cdb_get_cond_obj_iter_callback(void *arg, const char *key, int ksize, const char *val, int vsize, uint32_t expire, uint64_t oid) {
/* No need due to this implementation design */
if (((pobj_arg)arg)->n <= ((pobj_arg)arg)->q) {
pdap_store_obj_t l_obj = (pdap_store_obj_t)((pobj_arg)arg)->o;
cdb_serialize_val_to_dap_store_obj(&l_obj[((pobj_arg)arg)->q - ((pobj_arg)arg)->n], (char*)key, (char*)val);
}
if (--((pobj_arg)arg)->n == 0) {
return false;
}
return true;
}
int dap_db_driver_cdb_deinit() {
cdb_instance *cur_cdb, *tmp;
HASH_ITER(hh, s_cdb, cur_cdb, tmp) {
HASH_DEL(s_cdb, cur_cdb);
DAP_DELETE(cur_cdb->local_group);
cdb_destroy(cur_cdb->cdb);
HASH_DEL(s_cdb, cur_cdb);
DAP_DELETE(cur_cdb);
}
if (s_cdb_path) {
DAP_DELETE(s_cdb_path);
}
return CDB_SUCCESS;
}
int dap_db_driver_cdb_options(pcdb_options l_opts, CDB* a_cdb) {
if (cdb_option(a_cdb,
l_opts->hsize,
l_opts->pcacheMB,
l_opts->rcacheMB) != CDB_SUCCESS) return -1;
return CDB_SUCCESS;
}
......@@ -227,13 +236,13 @@ dap_store_obj_t *dap_db_driver_cdb_read_last_store_obj(const char* a_group) {
if (!a_group) {
return NULL;
}
CDB *l_cdb = dap_cdb_get_db_by_group(a_group);
struct CDB *l_cdb = dap_cdb_get_db_by_group(a_group);
if (!l_cdb) {
return NULL;
}
CDBSTAT l_cdb_stat;
cdb_stat(l_cdb, &l_cdb_stat);
void *l_iter = cdb_iterate_new(l_cdb, l_cdb_stat.rnum);
void *l_iter = cdb_iterate_new(l_cdb, 0);
obj_arg l_arg;
l_arg.o = DAP_NEW_Z(dap_store_obj_t);
l_arg.q = l_cdb_stat.rnum;
......@@ -247,7 +256,6 @@ dap_store_obj_t *dap_db_driver_cdb_read_store_obj(const char *a_group, const cha
if (!a_group) {
return NULL;
}
//CDB_group l_group = dap_db_cdb_define_group(a_group);
CDB *l_cdb = dap_cdb_get_db_by_group(a_group);
if (!l_cdb) {
return NULL;
......@@ -261,7 +269,7 @@ dap_store_obj_t *dap_db_driver_cdb_read_store_obj(const char *a_group, const cha
return NULL;
}
l_obj = DAP_NEW_Z(dap_store_obj_t);
cdb_serialize_val_to_dap_store_obj(l_obj, a_key, l_value);
cdb_serialize_val_to_dap_store_obj(l_obj, (char*)a_key, l_value);
l_obj->group = dap_strdup(a_group);
cdb_free_val((void**)&l_value);
} else {
......@@ -307,15 +315,15 @@ dap_store_obj_t* dap_db_driver_cdb_read_cond_store_obj(const char *a_group, uint
}
CDBSTAT l_cdb_stat;
cdb_stat(l_cdb, &l_cdb_stat);
if ((l_count_out == 0) || (l_count_out > l_cdb_stat.rnum)) {
l_count_out = l_cdb_stat.rnum;
if ((l_count_out == 0) || (l_count_out > l_cdb_stat.rnum - a_id)) {
l_count_out = l_cdb_stat.rnum - a_id;
}
obj_arg l_arg;
l_arg.o = DAP_NEW_Z_SIZE(dap_store_obj_t, l_count_out * sizeof(dap_store_obj_t));
l_arg.q = l_count_out;
l_arg.q = l_count_out;
void *l_iter = cdb_iterate_new(l_cdb, a_id + 1); // wrong! TODO: make use of obj->id
l_count_out = cdb_iterate(l_cdb, dap_cdb_get_some_obj_iter_callback, (void*)&l_arg, l_iter);
l_arg.n = l_count_out + a_id;
void *l_iter = cdb_iterate_new(l_cdb, 0);
l_count_out = cdb_iterate(l_cdb, dap_cdb_get_cond_obj_iter_callback, (void*)&l_arg, l_iter);
cdb_iterate_destroy(l_cdb, l_iter);
if(a_count_out) {
*a_count_out = l_count_out;
......@@ -344,19 +352,17 @@ int dap_db_driver_cdb_apply_store_obj(pdap_store_obj_t a_store_obj) {
char *l_val = DAP_NEW_Z_SIZE(char, uint64_size + sizeof(unsigned long) + a_store_obj->value_len + sizeof(time_t));
dap_cdb_uint_to_hex(l_val, a_store_obj->id, uint64_size);
offset += uint64_size;
dap_cdb_uint_to_hex(l_val + offset, a_store_obj->value_len, sizeof(unsigned long));
offset += sizeof(unsigned long);
memcpy(l_val + offset, a_store_obj->value, a_store_obj->value_len);
offset += a_store_obj->value_len;
unsigned long l_time = (unsigned long)a_store_obj->timestamp;
dap_cdb_uint_to_hex(l_val + offset, l_time, sizeof(time_t));
offset += sizeof(time_t);
l_rec.val = l_val;
// log_it(L_DEBUG,"cbd_set() group: %s key: %s \tkey_size: %d\t val_size: %d",a_store_obj->group, l_rec.key, strlen(l_rec.key),offset);
cdb_set(l_cdb, l_rec.key, strlen(l_rec.key), l_rec.val, offset);
if (cdb_set2(l_cdb, l_rec.key, strlen(l_rec.key), l_rec.val, offset, CDB_INSERTCACHE | CDB_INSERTIFNOEXIST, 0) < 0) {
log_it(L_ERROR, "Couldn't add record with key [%s] to CDB: \"%s\"", l_rec.key, cdb_errmsg(cdb_errno(l_cdb)));
}
DAP_DELETE(l_rec.key);
DAP_DELETE(l_rec.val);
} else if(a_store_obj->type == 'd') {
......
......@@ -44,4 +44,3 @@ int dap_db_driver_cdb_apply_store_obj(pdap_store_obj_t);
dap_store_obj_t *dap_db_driver_cdb_read_last_store_obj(const char*);
dap_store_obj_t *dap_db_driver_cdb_read_store_obj(const char*, const char*, size_t*);
dap_store_obj_t* dap_db_driver_cdb_read_cond_store_obj(const char*, uint64_t, size_t*);
int dap_db_driver_cdb_options(pcdb_options l_opts, CDB* a_cdb);
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment