Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • dap/dap-sdk
1 result
Show changes
Commits on Source (4)
Showing with 691 additions and 600 deletions
......@@ -35,7 +35,7 @@ int dap_strncmp(const char *a_str1, const char *a_str2, size_t a_n);
char* dap_strdup(const char *a_str);
char* dap_strdup_vprintf(const char *a_format, va_list a_args);
DAP_PRINTF_ATTR(1, 2) char *dap_strdup_printf(const char *a_format, ...);
char *dap_strncpy(char *a_dst, const char *a_src, size_t a_limit);
char* dap_stpcpy(char *a_dest, const char *a_src);
char* dap_strstr_len(const char *a_haystack, ssize_t a_haystack_len, const char *a_needle);
// concatenates all of str_array's strings, sliding in an optional separator, the returned string is newly allocated.
......
......@@ -69,8 +69,11 @@ void dap_list_free_full(dap_list_t *a_list, dap_callback_destroyed_t a_free_func
*
* Returns: either @list or the new start of the DapList if @list was %NULL
*/
dap_list_t *dap_list_append(dap_list_t *a_list, void* a_data)
dap_list_t *dap_list_append(dap_list_t *a_list, void *a_data)
{
// sanity check
dap_return_val_if_pass(!a_data, a_list);
//func work
dap_list_t *l_el = DAP_NEW_Z(dap_list_t);
if (!l_el) {
log_it(L_CRITICAL, "Out of memory");
......@@ -106,6 +109,9 @@ dap_list_t *dap_list_append(dap_list_t *a_list, void* a_data)
*/
dap_list_t *dap_list_prepend(dap_list_t *a_list, void *a_data)
{
// sanity check
dap_return_val_if_pass(!a_data, a_list);
//func work
dap_list_t *l_el = DAP_NEW_Z(dap_list_t);
if (!l_el) {
log_it(L_CRITICAL, "Out of memory");
......
......@@ -404,6 +404,32 @@ char* dap_stpcpy(char *a_dest, const char *a_src)
return l_d - 1;
}
/**
* dap_strncpy:
* @a_dst: destination buffer.
* @a_src: source string.
* @a_limit: destination buffer max size
*
* Copies a null-terminated string into the dest buffer, include the
* trailing null, limited to specified size, and return a pointer
* to the trailing null byte. Although limit reached before source string end
* the trailing NULL is inserted to the destination string
*
* Returns: a pointer to trailing null byte.
**/
char *dap_strncpy(char *a_dst, const char *a_src, size_t a_limit)
{
dap_return_val_if_fail(a_dst && a_src, NULL);
do {
*a_dst++ = *a_src;
a_limit--;
} while (*a_src++ != '\0' && a_limit);
--a_dst;
if (*a_dst != '\0')
*a_dst = '\0';
return a_dst;
}
/**
* dap_strstr_len:
* @a_haystack: a string
......
......@@ -45,7 +45,7 @@ bool save_process_pid_in_file(const char* file_path) {
pid_t get_pid_from_file(const char* file_path) {
FILE * fpid = fopen(file_path, "r");
if (fpid == NULL) {
log_it(L_ERROR, "Cant create/open file by path %s",file_path);
log_it(L_ERROR, "Cant open file by path %s",file_path);
return false;
}
......
......@@ -73,44 +73,30 @@ struct queue_io_msg{
dap_global_db_callback_results_raw_t callback_results_raw;
};
// Custom argument passed to the callback
void * callback_arg;
union{
struct{ // Raw get request
uint64_t values_raw_last_id;
void *callback_arg;
union {
struct { // Get all request
dap_global_db_driver_hash_t last_hash;
uint64_t values_page_size;
uint64_t total_records;
uint64_t processed_records;
};
struct{ //Raw set request
dap_store_obj_t * values_raw;
struct { // Raw set request
dap_store_obj_t *values_raw;
uint64_t values_raw_total;
};
struct{ //deserialized requests
// Different variant of message params
union{
// values for multile set
struct{
dap_global_db_obj_t * values;
size_t values_count;
};
// Values for get multiple request
struct{
uint64_t values_last_id; // For multiple records request here stores next request id
uint64_t values_total; // Total values
};
// Value for singe request
struct{
void * value;
size_t value_length;
bool value_is_pinned;
};
};
char * group; // Group
char * key; // Key
struct { // Set multiply zero-copy
dap_global_db_obj_t *values;
uint64_t values_count;
};
struct { // Value for singe request
void *value;
size_t value_length;
bool value_is_pinned;
char *group; // Group
char *key; // Key
};
};
dap_nanotime_t timestamp;
dap_global_db_instance_t *dbi;
};
......@@ -258,6 +244,25 @@ void dap_global_db_deinit() {
dap_db_driver_deinit();
dap_global_db_cluster_deinit();
}
bool dap_global_db_group_match_mask(const char *a_group, const char *a_mask)
{
dap_return_val_if_fail(a_group && a_mask && *a_group && *a_mask, false);
const char *l_group_tail = a_group + strlen(a_group); // Pointer to trailng zero
if (!strcmp(l_group_tail - sizeof(DAP_GLOBAL_DB_DEL_SUFFIX), DAP_GLOBAL_DB_DEL_SUFFIX))
l_group_tail -= sizeof(DAP_GLOBAL_DB_DEL_SUFFIX); // Pointer to '.' of .del group suffix
const char *l_mask_tail = a_mask + strlen(a_mask);
const char *l_group_it = a_group, *l_mask_it = a_mask;
const char *l_wildcard = strchr(a_mask, '*');
while (l_mask_it < (l_wildcard ? l_wildcard : l_mask_tail) &&
l_group_it < l_group_tail)
if (*l_group_it++ != *l_mask_it++)
return false;
if (l_mask_it == l_wildcard && ++l_mask_it < l_mask_tail)
return strstr(l_group_it, l_mask_it);
return true;
}
static int s_store_obj_apply(dap_store_obj_t *a_obj)
{
assert(a_obj->type == DAP_GLOBAL_DB_OPTYPE_ADD);
......@@ -267,7 +272,7 @@ static int s_store_obj_apply(dap_store_obj_t *a_obj)
return -11;
}
dap_global_db_driver_hash_t a_obj_drv_hash = dap_global_db_driver_hash_get(a_obj);
if (dap_global_db_driver_is_hash(a_obj->group, &a_obj_drv_hash)) {
if (dap_global_db_driver_is_hash(a_obj->group, a_obj_drv_hash)) {
debug_if(g_dap_global_db_debug_more, L_NOTICE, "Rejected duplicate object with group %s and key %s",
a_obj->group, a_obj->key);
return -12;
......@@ -827,10 +832,10 @@ dap_global_db_obj_t *dap_global_db_get_all_sync(const char *a_group, size_t *a_o
* @param a_arg
* @return
*/
int dap_global_db_get_all(const char * a_group, size_t a_results_page_size, dap_global_db_callback_results_t a_callback, void * a_arg)
int dap_global_db_get_all(const char *a_group, size_t a_results_page_size, dap_global_db_callback_results_t a_callback, void *a_arg)
{
// TODO make usable a_results_page_size
if(s_dbi == NULL){
if (s_dbi == NULL) {
log_it(L_ERROR, "GlobalDB context is not initialized, can't call dap_global_db_get_all");
return DAP_GLOBAL_DB_RC_ERROR;
}
......@@ -848,7 +853,7 @@ int dap_global_db_get_all(const char * a_group, size_t a_results_page_size, dap_
l_msg->callback_arg = a_arg;
l_msg->callback_results = a_callback;
l_msg->values_page_size = a_results_page_size;
l_msg->timestamp = 0;
l_msg->last_hash = c_dap_global_db_driver_hash_start;
l_ret = dap_proc_thread_callback_add(NULL, s_queue_io_callback, l_msg);
......@@ -870,43 +875,43 @@ static bool s_msg_opcode_get_all(struct queue_io_msg * a_msg)
{
dap_return_val_if_pass(!a_msg, false);
dap_global_db_iter_t *l_iter = dap_global_db_driver_iter_create(a_msg->group);
if (!l_iter) {
log_it(L_ERROR, "Iterator creation error");
return false;
}
bool l_ret = false;
size_t l_values_count = a_msg->values_page_size;
dap_global_db_obj_t *l_objs= NULL;
dap_store_obj_t *l_store_objs = NULL;
size_t l_total_records = dap_global_db_driver_count(a_msg->group, 0);
if (a_msg->values_page_size >= l_total_records || !a_msg->values_page_size) {
if (!a_msg->values_page_size) {
l_objs = dap_global_db_get_all_sync(a_msg->group, &l_values_count);
if (a_msg->callback_results)
l_ret = a_msg->callback_results(a_msg->dbi,
a_msg->callback_results(a_msg->dbi,
l_objs ? DAP_GLOBAL_DB_RC_SUCCESS : DAP_GLOBAL_DB_RC_NO_RESULTS,
a_msg->group, l_values_count, l_values_count,
l_objs, a_msg->callback_arg);
dap_global_db_objs_delete(l_objs, l_values_count);
} else {
for (size_t i = 0; (i < l_total_records) && l_ret; i += l_values_count) {
l_values_count = i + a_msg->values_page_size < l_total_records ? a_msg->values_page_size : l_total_records - i;
l_store_objs = dap_global_db_driver_cond_read(l_iter, &l_values_count, 0);
l_objs = s_objs_from_store_objs(l_store_objs, l_values_count);
// Call callback if present
if (a_msg->callback_results)
l_ret = a_msg->callback_results(a_msg->dbi,
l_objs ? DAP_GLOBAL_DB_RC_SUCCESS : DAP_GLOBAL_DB_RC_NO_RESULTS,
a_msg->group, l_total_records, l_values_count,
l_objs, a_msg->callback_arg);
dap_global_db_objs_delete(l_objs, l_values_count);
}
// All values are sent
return false;
}
return l_ret; // All values are sent
if (!a_msg->total_records)
a_msg->total_records = dap_global_db_driver_count(a_msg->group, c_dap_global_db_driver_hash_start);
if (a_msg->total_records)
l_store_objs = dap_global_db_driver_cond_read(a_msg->group, a_msg->last_hash, &l_values_count);
int l_rc = DAP_GLOBAL_DB_RC_NO_RESULTS;
if (l_store_objs && l_values_count) {
a_msg->processed_records += a_msg->values_page_size;
a_msg->last_hash = dap_global_db_driver_hash_get(l_store_objs + l_values_count - 1);
if (dap_global_db_driver_hash_is_blank(a_msg->last_hash)) {
l_rc = DAP_GLOBAL_DB_RC_PROGRESS;
l_values_count--;
} else
l_rc = DAP_GLOBAL_DB_RC_SUCCESS;
}
l_objs = l_store_objs ? s_objs_from_store_objs(l_store_objs, l_values_count) : NULL;
// Call callback if present
bool l_ret = false;
if (a_msg->callback_results)
l_ret = a_msg->callback_results(a_msg->dbi, l_rc,
a_msg->group, a_msg->total_records, l_values_count,
l_objs, a_msg->callback_arg);
dap_global_db_objs_delete(l_objs, l_values_count);
return l_rc == DAP_GLOBAL_DB_RC_PROGRESS && l_ret;
}
/* *** Get_all_raw functions group *** */
......@@ -951,11 +956,10 @@ int dap_global_db_get_all_raw(const char * a_group, size_t a_results_page_size,
l_msg->dbi = s_dbi;
l_msg->opcode = MSG_OPCODE_GET_ALL_RAW ;
l_msg->group = dap_strdup(a_group);
l_msg->values_raw_last_id = 0;
l_msg->values_page_size = a_results_page_size;
l_msg->callback_arg = a_arg;
l_msg->callback_results_raw = a_callback;
l_msg->timestamp = a_timestamp;
l_msg->last_hash = c_dap_global_db_driver_hash_start;
int l_ret = dap_proc_thread_callback_add(NULL, s_queue_io_callback, l_msg);
if (l_ret != 0){
......@@ -975,40 +979,41 @@ static bool s_msg_opcode_get_all_raw(struct queue_io_msg *a_msg)
{
dap_return_val_if_pass(!a_msg, false);
dap_global_db_iter_t *l_iter = dap_global_db_driver_iter_create(a_msg->group);
if (!l_iter) {
log_it(L_ERROR, "Iterator creation error");
return false;
}
bool l_ret = false;
size_t l_values_count = a_msg->values_page_size;
dap_store_obj_t *l_store_objs = NULL;
dap_nanotime_t l_timestamp = a_msg->timestamp;
size_t l_total_records = dap_global_db_driver_count(a_msg->group, l_timestamp);
if (a_msg->values_page_size >= l_total_records || !a_msg->values_page_size) {
if (!a_msg->values_page_size) {
l_store_objs = dap_global_db_get_all_raw_sync(a_msg->group, &l_values_count);
if (a_msg->callback_results)
l_ret = a_msg->callback_results_raw(s_dbi,
a_msg->callback_results_raw(s_dbi,
l_store_objs ? DAP_GLOBAL_DB_RC_SUCCESS : DAP_GLOBAL_DB_RC_NO_RESULTS,
a_msg->group, l_total_records, l_values_count,
a_msg->group, a_msg->total_records, l_values_count,
l_store_objs, a_msg->callback_arg);
dap_store_obj_free(l_store_objs, l_values_count);
} else {
for (size_t i = 0; (i < l_total_records) && l_ret; i += a_msg->values_page_size) {
l_values_count = i + a_msg->values_page_size < l_total_records ? a_msg->values_page_size : l_total_records - i;
l_store_objs = dap_global_db_driver_cond_read(l_iter, &l_values_count, l_timestamp);
// Call callback if present
if (a_msg->callback_results)
l_ret = a_msg->callback_results_raw(s_dbi,
l_store_objs ? DAP_GLOBAL_DB_RC_SUCCESS : DAP_GLOBAL_DB_RC_NO_RESULTS,
a_msg->group, l_total_records, l_values_count,
l_store_objs, a_msg->callback_arg);
dap_store_obj_free(l_store_objs, l_values_count);
}
}
return l_ret; // All values are sent
// All values are sent
return false;
}
if (!a_msg->total_records)
a_msg->total_records = dap_global_db_driver_count(a_msg->group, c_dap_global_db_driver_hash_start);
if (a_msg->total_records)
l_store_objs = dap_global_db_driver_cond_read(a_msg->group, a_msg->last_hash, &l_values_count);
int l_rc = DAP_GLOBAL_DB_RC_NO_RESULTS;
if (l_store_objs && l_values_count) {
a_msg->processed_records += a_msg->values_page_size;
a_msg->last_hash = dap_global_db_driver_hash_get(l_store_objs + l_values_count - 1);
if (dap_global_db_driver_hash_is_blank(a_msg->last_hash)) {
l_rc = DAP_GLOBAL_DB_RC_PROGRESS;
l_values_count--;
} else
l_rc = DAP_GLOBAL_DB_RC_SUCCESS;
}
// Call callback if present
bool l_ret = false;
if (a_msg->callback_results)
l_ret = a_msg->callback_results_raw(a_msg->dbi, l_rc,
a_msg->group, a_msg->total_records, l_values_count,
l_store_objs, a_msg->callback_arg);
dap_store_obj_free(l_store_objs, l_values_count);
return l_rc == DAP_GLOBAL_DB_RC_PROGRESS && l_ret;
}
static int s_set_sync_with_ts(dap_global_db_instance_t *a_dbi, const char *a_group, const char *a_key, const void *a_value,
......@@ -1619,28 +1624,23 @@ static bool s_queue_io_callback(dap_proc_thread_t UNUSED_ARG *a_thread, void * a
debug_if(g_dap_global_db_debug_more, L_NOTICE, "Received GlobalDB I/O message with opcode %s", s_msg_opcode_to_str(l_msg->opcode) );
switch(l_msg->opcode){
case MSG_OPCODE_GET: s_msg_opcode_get(l_msg); break;
case MSG_OPCODE_GET_RAW: s_msg_opcode_get_raw(l_msg); break;
case MSG_OPCODE_GET_LAST: s_msg_opcode_get_last(l_msg); break;
case MSG_OPCODE_GET_LAST_RAW: s_msg_opcode_get_last_raw(l_msg); break;
case MSG_OPCODE_GET_DEL_TS: s_msg_opcode_get_del_ts(l_msg); break;
case MSG_OPCODE_GET_ALL: if (s_msg_opcode_get_all(l_msg))
return true;
break;
case MSG_OPCODE_GET_ALL_RAW: if (s_msg_opcode_get_all(l_msg))
return true;
break;
case MSG_OPCODE_SET: s_msg_opcode_set(l_msg); break;
case MSG_OPCODE_SET_MULTIPLE: s_msg_opcode_set_multiple_zc(l_msg); break;
case MSG_OPCODE_SET_RAW: s_msg_opcode_set_raw(l_msg); break;
case MSG_OPCODE_PIN: s_msg_opcode_pin(l_msg); break;
case MSG_OPCODE_DELETE: s_msg_opcode_delete(l_msg); break;
case MSG_OPCODE_FLUSH: s_msg_opcode_flush(l_msg); break;
default:{
log_it(L_WARNING, "Message with undefined opcode %d received in queue_io",
l_msg->opcode);
}
switch (l_msg->opcode) {
case MSG_OPCODE_GET: s_msg_opcode_get(l_msg); break;
case MSG_OPCODE_GET_RAW: s_msg_opcode_get_raw(l_msg); break;
case MSG_OPCODE_GET_LAST: s_msg_opcode_get_last(l_msg); break;
case MSG_OPCODE_GET_LAST_RAW: s_msg_opcode_get_last_raw(l_msg); break;
case MSG_OPCODE_GET_DEL_TS: s_msg_opcode_get_del_ts(l_msg); break;
case MSG_OPCODE_GET_ALL: if (s_msg_opcode_get_all(l_msg)) return true;
case MSG_OPCODE_GET_ALL_RAW: if (s_msg_opcode_get_all(l_msg)) return true;
case MSG_OPCODE_SET: s_msg_opcode_set(l_msg); break;
case MSG_OPCODE_SET_MULTIPLE: s_msg_opcode_set_multiple_zc(l_msg); break;
case MSG_OPCODE_SET_RAW: s_msg_opcode_set_raw(l_msg); break;
case MSG_OPCODE_PIN: s_msg_opcode_pin(l_msg); break;
case MSG_OPCODE_DELETE: s_msg_opcode_delete(l_msg); break;
case MSG_OPCODE_FLUSH: s_msg_opcode_flush(l_msg); break;
default:
log_it(L_WARNING, "Message with undefined opcode %d received in queue_io",
l_msg->opcode);
}
s_queue_io_msg_delete(l_msg);
return false;
......
......@@ -58,29 +58,11 @@ void dap_global_db_cluster_deinit()
}
}
static bool s_group_match_mask(const char *a_group, const char *a_mask)
{
dap_return_val_if_fail(a_group && a_mask && *a_group && *a_mask, false);
const char *l_group_tail = a_group + strlen(a_group); // Pointer to trailng zero
if (!strcmp(l_group_tail - sizeof(DAP_GLOBAL_DB_DEL_SUFFIX), DAP_GLOBAL_DB_DEL_SUFFIX))
l_group_tail -= sizeof(DAP_GLOBAL_DB_DEL_SUFFIX); // Pointer to '.' of .del group suffix
const char *l_mask_tail = a_mask + strlen(a_mask);
const char *l_group_it = a_group, *l_mask_it = a_mask;
const char *l_wildcard = strchr(a_mask, '*');
while (l_mask_it < (l_wildcard ? l_wildcard : l_mask_tail) &&
l_group_it < l_group_tail)
if (*l_group_it++ != *l_mask_it++)
return false;
if (l_mask_it == l_wildcard && ++l_mask_it < l_mask_tail)
return strstr(l_group_it, l_mask_it);
return true;
}
dap_global_db_cluster_t *dap_global_db_cluster_by_group(dap_global_db_instance_t *a_dbi, const char *a_group_name)
{
dap_global_db_cluster_t *it;
DL_FOREACH(a_dbi->clusters, it)
if (s_group_match_mask(a_group_name, it->groups_mask))
if (dap_global_db_group_match_mask(a_group_name, it->groups_mask))
return it;
return NULL;
}
......
......@@ -64,10 +64,11 @@
#define LOG_TAG "db_driver"
const dap_global_db_driver_hash_t c_dap_global_db_driver_hash_start = {};
// A selected database driver.
static char s_used_driver [32]; /* Name of the driver */
static dap_db_driver_callbacks_t s_drv_callback; /* A set of interface routines for the selected
DB Driver at startup time */
......@@ -316,12 +317,12 @@ dap_store_obj_t *l_store_obj_cur = a_store_obj;
* @param a_iter data base iterator
* @return Returns a number of objects.
*/
size_t dap_global_db_driver_count(const char *a_group, dap_nanotime_t a_timestamp)
size_t dap_global_db_driver_count(const char *a_group, dap_global_db_driver_hash_t a_hash_from)
{
size_t l_count_out = 0;
// read the number of items
if(s_drv_callback.read_count_store)
l_count_out = s_drv_callback.read_count_store(a_group, a_timestamp);
if (s_drv_callback.read_count_store)
l_count_out = s_drv_callback.read_count_store(a_group, a_hash_from);
return l_count_out;
}
......@@ -362,64 +363,15 @@ dap_store_obj_t* dap_global_db_driver_read_last(const char *a_group)
* @param a_count_out elements count
* @return If successful, a pointer to the object, otherwise NULL.
*/
dap_store_obj_t* dap_global_db_driver_cond_read(dap_global_db_iter_t* a_iter, size_t *a_count_out, dap_nanotime_t a_timestamp)
dap_store_obj_t *dap_global_db_driver_cond_read(const char *a_group, dap_global_db_driver_hash_t a_hash_from, size_t *a_count_out)
{
dap_return_val_if_pass(!a_iter, NULL);
dap_store_obj_t *l_ret = NULL;
dap_return_val_if_fail(a_group, NULL);
// read records using the selected database engine
if(s_drv_callback.read_cond_store_obj)
l_ret = s_drv_callback.read_cond_store_obj(a_iter, a_count_out, a_timestamp);
return l_ret;
if (s_drv_callback.read_cond_store_obj)
return s_drv_callback.read_cond_store_obj(a_group, a_hash_from, a_count_out);
return NULL;
}
/**
* @brief Create iterator to the first element in the a_group database.
* @param a_group the group name string
* @return If successful, a pointer to an iterator, otherwise NULL.
*/
dap_global_db_iter_t *dap_global_db_driver_iter_create(const char *a_group)
{
if (!a_group || !s_drv_callback.iter_create)
return NULL;
// create return object
dap_global_db_iter_t *l_ret = DAP_NEW_Z(dap_global_db_iter_t);
if (!l_ret) {
log_it(L_CRITICAL, "Memory allocation error");
return NULL;
}
l_ret->db_group = dap_strdup(a_group);
if (!l_ret->db_group) {
log_it(L_CRITICAL, "Memory allocation error");
DAP_DELETE(l_ret);
return NULL;
}
if (s_drv_callback.iter_create(l_ret)) {
log_it(L_ERROR, "Error iterator create in %s, line %d", __PRETTY_FUNCTION__, __LINE__);
DAP_DELETE(l_ret->db_group);
DAP_DELETE(l_ret);
return NULL;
}
return l_ret;
}
/**
* @brief Delete iterator and free memory
* @param a_iter deleting itaretor
* @return -.
*/
void dap_global_db_driver_iter_delete(dap_global_db_iter_t* a_iter)
{
dap_return_if_pass(!a_iter);
DAP_DEL_Z(a_iter->db_iter);
DAP_DEL_Z(a_iter->db_group);
DAP_DEL_Z(a_iter);
}
/**
* @brief Reads several objects from a database by a_group and a_key.
* If a_key is NULL, reads whole group.
......@@ -447,14 +399,14 @@ dap_store_obj_t* dap_global_db_driver_read(const char *a_group, const char *a_ke
bool dap_global_db_driver_is(const char *a_group, const char *a_key)
{
// read records using the selected database engine
if(s_drv_callback.is_obj && a_group && a_key)
if (s_drv_callback.is_obj && a_group && a_key)
return s_drv_callback.is_obj(a_group, a_key);
else
return false;
return false;
}
bool dap_global_db_driver_is_hash(const char *a_group, const dap_global_db_driver_hash_t *a_hash)
bool dap_global_db_driver_is_hash(const char *a_group, dap_global_db_driver_hash_t a_hash)
{
// TODO 9575
if (s_drv_callback.is_hash && a_group)
return s_drv_callback.is_hash(a_group, a_hash);
return false;
}
/*
/*
* AUTHORS:
* Ruslan R. (The BadAss SysMan) Laishev <ruslan.laishev@demlabs.net>
* DeM Labs Ltd. https://demlabs.net
......@@ -74,13 +74,12 @@ typedef struct __db_ctx__ {
/*
* MDBX record structure
*/
struct DAP_ALIGN_PACKED driver_record {
dap_nanotime_t timestamp; /* Timestamp of the record */
struct DAP_ALIGN_PACKED driver_record { /* Timestamp and CRC is the driver key of the record, packed in big-endian format */
uint64_t key_len; /* Legth of global DB text key part */
uint64_t value_len; /* Length of value part */
uint8_t flags; /* Flag of the record : see RECORD_FLAGS enums */
uint32_t crc; /* Object integrity */
uint64_t sign_len; /* Size control */
byte_t value_n_sign[]; /* Serialized form */
uint8_t flags; /* Flag of the record : see RECORD_FLAGS enums */
byte_t key_n_value_n_sign[]; /* Serialized form */
};
static pthread_mutex_t s_db_ctx_mutex = PTHREAD_MUTEX_INITIALIZER; /* A mutex for working with a DB context */
......@@ -94,13 +93,13 @@ static int s_db_mdbx_deinit();
static int s_db_mdbx_flush(void);
static int s_db_mdbx_apply_store_obj (dap_store_obj_t *a_store_obj);
static dap_store_obj_t *s_db_mdbx_read_last_store_obj(const char* a_group);
static dap_store_obj_t *s_db_mdbx_get_by_hash(const char *a_group, dap_global_db_driver_hash_t a_hash);
static bool s_db_mdbx_is_obj(const char *a_group, const char *a_key);
static bool s_db_mdbx_is_hash(const char *a_group, dap_global_db_driver_hash_t a_hash);
static dap_store_obj_t *s_db_mdbx_read_store_obj(const char *a_group, const char *a_key, size_t *a_count_out);
static dap_store_obj_t *s_db_mdbx_read_cond_store_obj(dap_global_db_iter_t *a_iter, size_t *a_count_out, dap_nanotime_t a_timestamp);
static size_t s_db_mdbx_read_count_store(const char *a_group, dap_nanotime_t a_timestamp);
static dap_store_obj_t *s_db_mdbx_read_cond_store_obj(const char *a_group, dap_global_db_driver_hash_t a_hash_from, size_t *a_count_out);
static size_t s_db_mdbx_read_count_store(const char *a_group, dap_global_db_driver_hash_t a_hash_from);
static dap_list_t *s_db_mdbx_get_groups_by_mask(const char *a_group_mask);
static int s_db_mdbx_iter_create(dap_global_db_iter_t *a_iter);
static MDBX_env *s_mdbx_env; /* MDBX's context area */
static char s_subdir [] = ""; /* Name of subdir for the MDBX's database files */
......@@ -342,7 +341,7 @@ size_t l_upper_limit_of_db_size = 16;
* [resources]
* mdbx_upper_limit_of_db_size=32
*/
l_upper_limit_of_db_size = dap_config_get_item_uint32_default ( g_config, "resources", "mdbx_upper_limit_of_db_size", l_upper_limit_of_db_size);
l_upper_limit_of_db_size = dap_config_get_item_uint32_default ( g_config, "global_db", "mdbx_upper_limit_of_db_size", l_upper_limit_of_db_size);
l_upper_limit_of_db_size *= 1024*1024*1024ULL;
log_it(L_INFO, "Set MDBX Upper Limit of DB Size to %zu octets", l_upper_limit_of_db_size);
......@@ -364,7 +363,6 @@ size_t l_upper_limit_of_db_size = 16;
dap_assert ( !(l_rc = mdbx_env_set_maxdbs (s_mdbx_env, DAP_GLOBAL_DB_GROUPS_COUNT_MAX)) );/* Set maximum number of the file-tables (MDBX subDB)
according to number of supported groups */
/* We set "unlim" for all MDBX characteristics at the moment */
if ( MDBX_SUCCESS != (l_rc = mdbx_env_set_geometry(s_mdbx_env, -1, -1, l_upper_limit_of_db_size, -1, -1, -1)) )
......@@ -428,15 +426,15 @@ size_t l_upper_limit_of_db_size = 16;
*/
a_drv_dpt->apply_store_obj = s_db_mdbx_apply_store_obj;
a_drv_dpt->read_last_store_obj = s_db_mdbx_read_last_store_obj;
a_drv_dpt->get_by_hash = s_db_mdbx_get_by_hash;
a_drv_dpt->read_store_obj = s_db_mdbx_read_store_obj;
a_drv_dpt->read_cond_store_obj = s_db_mdbx_read_cond_store_obj;
a_drv_dpt->read_count_store = s_db_mdbx_read_count_store;
a_drv_dpt->get_groups_by_mask = s_db_mdbx_get_groups_by_mask;
a_drv_dpt->is_obj = s_db_mdbx_is_obj;
a_drv_dpt->is_hash = s_db_mdbx_is_hash;
a_drv_dpt->deinit = s_db_mdbx_deinit;
a_drv_dpt->flush = s_db_mdbx_flush;
a_drv_dpt->iter_create = s_db_mdbx_iter_create;
/*
* MDBX support transactions but on the current circuimstance we will not get
......@@ -496,31 +494,11 @@ static int s_db_mdbx_flush(void)
return log_it(L_DEBUG, "Flushing resident part of the MDBX to disk"), 0;
}
/**
* @brief Create iterator with position on first element
*
* @param a_group a group name string
* @return If successful, a pointer to an objects, otherwise NULL.
*/
static int s_db_mdbx_iter_create(dap_global_db_iter_t *a_iter)
{
dap_return_val_if_pass(!a_iter || !a_iter->db_group, -1); /* Sanity check */
MDBX_val *l_mdbx_iter = DAP_NEW_Z(MDBX_val);
if (!l_mdbx_iter) {
log_it(L_CRITICAL, "Memory allocation error");
return -2;
}
a_iter->db_type = DAP_GLOBAL_DB_TYPE_CURRENT;
a_iter->db_iter = (void *)l_mdbx_iter;
return 0;
}
/*
* DESCRIPTION: Action routine to read record with a give <id > from the table
* DESCRIPTION: Action routine to read record from the table
*
* INPUTS:
* a_group: A group/table name to be looked in
* a_id: An id of record to be looked for
* a_obj: An address to the <store object> with the record
*
* OUTPUTS:
......@@ -540,43 +518,80 @@ int s_fill_store_obj(const char *a_group, MDBX_val *a_key, MDBX_val *a_data, dap
if (!a_obj->group)
return log_it(L_CRITICAL, "Cannot allocate a memory for store object group"), -3;
if (!a_key->iov_len)
return log_it(L_ERROR, "Zero length of global DB record key"), -4;
if ( (a_obj->key = DAP_NEW_Z_SIZE(char, a_key->iov_len + 1)) )
memcpy((char *)a_obj->key, a_key->iov_base, a_key->iov_len);
else {
if (a_key->iov_len != sizeof(dap_global_db_driver_hash_t)) {
DAP_DELETE(a_obj->group);
return log_it(L_CRITICAL, "Cannot allocate a memory for store object key"), -5;
return log_it(L_ERROR, "Invalid length of global DB record key, expected %zu, got %zu",
sizeof(dap_global_db_driver_hash_t), a_key->iov_len), -4;
}
dap_global_db_driver_hash_t *l_driver_key = a_key->iov_base;
a_obj->timestamp = be64toh(l_driver_key->bets);
a_obj->crc = be64toh(l_driver_key->becrc);
struct driver_record *l_record = a_data->iov_base;
if (a_data->iov_len < sizeof(*l_record) || // Do not intersct bounds of readed array, check it twice
a_data->iov_len < sizeof(*l_record) + l_record->sign_len + l_record->value_len ||
l_record->sign_len == 0)
if (a_data->iov_len < sizeof(*l_record) || // Do not intersect bounds of read array, check it twice
a_data->iov_len < sizeof(*l_record) + l_record->sign_len + l_record->value_len + l_record->key_len) {
DAP_DELETE(a_obj->group);
return log_it(L_ERROR, "Corrupted global DB record internal value"), -6;
a_obj->timestamp = l_record->timestamp;
}
if (!l_record->key_len) {
DAP_DELETE(a_obj->group);
return log_it(L_ERROR, "Ivalid driver record with zero text key length"), -9;
}
if ( (a_obj->key = DAP_NEW_SIZE(char, l_record->key_len)) )
memcpy((char *)a_obj->key, l_record->key_n_value_n_sign, l_record->key_len);
else {
DAP_DELETE(a_obj->group);
return log_it(L_CRITICAL, "Cannot allocate a memory for store object key"), -5;
}
a_obj->value_len = l_record->value_len;
a_obj->flags = l_record->flags;
a_obj->crc = l_record->crc;
if (a_obj->value_len &&
!(a_obj->value = DAP_DUP_SIZE(l_record->value_n_sign, a_obj->value_len))) {
if (a_obj->value_len &&
!(a_obj->value = DAP_DUP_SIZE(l_record->key_n_value_n_sign + l_record->key_len, a_obj->value_len))) {
DAP_DELETE(a_obj->group);
DAP_DELETE(a_obj->key);
return log_it(L_CRITICAL, "Cannot allocate a memory for store object value"), -7;
}
dap_sign_t *l_sign = (dap_sign_t *)(l_record->value_n_sign + l_record->value_len);
if (dap_sign_get_size(l_sign) != l_record->sign_len ||
!(a_obj->sign = (dap_sign_t *)DAP_DUP_SIZE(l_sign, l_record->sign_len))) {
DAP_DELETE(a_obj->group);
DAP_DELETE(a_obj->key);
DAP_DEL_Z(a_obj->value);
if (dap_sign_get_size(l_sign) != l_record->sign_len)
return log_it(L_ERROR, "Corrupted global DB record internal value"), -6;
else
return log_it(L_CRITICAL, "Cannot allocate a memory for store object value"), -8;
if (l_record->sign_len >= sizeof(dap_sign_t)) {
dap_sign_t *l_sign = (dap_sign_t *)(l_record->key_n_value_n_sign + l_record->key_len + l_record->value_len);
if (dap_sign_get_size(l_sign) != l_record->sign_len ||
!(a_obj->sign = (dap_sign_t *)DAP_DUP_SIZE(l_sign, l_record->sign_len))) {
DAP_DELETE(a_obj->group);
DAP_DELETE(a_obj->key);
DAP_DEL_Z(a_obj->value);
if (dap_sign_get_size(l_sign) != l_record->sign_len)
return log_it(L_ERROR, "Corrupted global DB record internal value"), -6;
else
return log_it(L_CRITICAL, "Cannot allocate a memory for store object value"), -8;
}
}
return 0;
}
int s_get_obj_by_text_key(MDBX_txn *a_txn, MDBX_dbi a_dbi, MDBX_val *a_key, MDBX_val *a_data, const char *a_text_key)
{
int l_rc = 0;
MDBX_cursor *l_cursor = NULL;
if ( MDBX_SUCCESS != (l_rc = mdbx_cursor_open(a_txn, a_dbi, &l_cursor)) ) {
log_it(L_ERROR, "mdbx_cursor_open: (%d) %s", l_rc, mdbx_strerror(l_rc));
return l_rc;
}
if ( MDBX_SUCCESS != (l_rc = mdbx_cursor_get(l_cursor, a_key, a_data, MDBX_FIRST)) ) {
log_it(L_ERROR, "mdbx_cursor_get: (%d) %s", l_rc, mdbx_strerror(l_rc));
return l_rc;
}
size_t l_key_len = strlen(a_text_key) + 1;
do {
struct driver_record *l_record = a_data->iov_base;
if (l_key_len == l_record->key_len &&
!memcmp(l_record->key_n_value_n_sign, a_text_key, l_key_len)) {
mdbx_cursor_close(l_cursor);
return MDBX_SUCCESS;
}
} while ( MDBX_SUCCESS == (l_rc = mdbx_cursor_get(l_cursor, a_key, a_data, MDBX_NEXT)) );
mdbx_cursor_close(l_cursor);
return MDBX_NOTFOUND;
}
/*
* DESCRIPTION: Action routine - lookup in the group/table a last stored record (with the bigest Id).
* We mainatain internaly <id> of record (it's just sequence),
......@@ -601,42 +616,37 @@ MDBX_val l_key={0}, l_data={0};
MDBX_cursor *l_cursor = NULL;
dap_store_obj_t *l_obj;
/* Sanity check and Get DB Context for group/table*/
dap_return_val_if_pass(!a_group || !(l_db_ctx = s_get_db_ctx_for_group(a_group)), NULL)
/* Sanity check for group/table */
dap_return_val_if_fail(a_group, NULL);
if ( !(l_db_ctx = s_get_db_ctx_for_group(a_group)) )
return NULL;
MDBX_txn *l_txn;
if ( MDBX_SUCCESS != (l_rc = mdbx_txn_begin(s_mdbx_env, NULL, MDBX_TXN_RDONLY, &l_txn)) )
return log_it (L_ERROR, "mdbx_txn_begin: (%d) %s", l_rc, mdbx_strerror(l_rc)), NULL;
if ( MDBX_SUCCESS != (l_rc = mdbx_cursor_open(l_txn, l_db_ctx->dbi, &l_cursor)) ) {
mdbx_txn_commit(l_txn);
log_it(L_ERROR, "mdbx_cursor_open: (%d) %s", l_rc, mdbx_strerror(l_rc));
return NULL;
}
/* Iterate cursor to retieve records from DB - select a <key> and <data> pair
** with maximal <id>
*/
MDBX_val l_last_key = {}, l_last_data = {};
dap_nanotime_t l_current_timestamp = 0;
while (MDBX_SUCCESS == (l_rc = mdbx_cursor_get(l_cursor, &l_key, &l_data, MDBX_NEXT))) {
struct driver_record *l_record = l_data.iov_base;
if (l_record->timestamp < l_current_timestamp) {
l_current_timestamp = l_record->timestamp;
l_last_key = l_key;
l_last_data = l_data;
}
if ( MDBX_SUCCESS != (l_rc = mdbx_cursor_get(l_cursor, &l_key, &l_data, MDBX_LAST)) ) {
mdbx_txn_commit(l_txn);
log_it(L_ERROR, "mdbx_cursor_get: (%d) %s", l_rc, mdbx_strerror(l_rc));
return NULL;
}
if (l_cursor) // Release uncesessary MDBX cursor area,
mdbx_cursor_close(l_cursor); //but keep transaction !!!
if (!l_last_key.iov_len || !l_last_data.iov_len) { /* Not found anything - return NULL */
if (!l_key.iov_len || !l_data.iov_len) { /* Not found anything - return NULL */
mdbx_txn_commit(l_txn);
return NULL;
}
/* Found ! Allocate memory for <store object>, <key> and <value> */
if ( (l_obj = DAP_CALLOC(1, sizeof( dap_store_obj_t ))) ) {
if (s_fill_store_obj(a_group, &l_last_key, &l_last_data, l_obj)) {
if (s_fill_store_obj(a_group, &l_key, &l_data, l_obj)) {
l_rc = MDBX_PROBLEM;
DAP_DEL_Z(l_obj);
}
......@@ -644,7 +654,8 @@ dap_store_obj_t *l_obj;
l_rc = MDBX_PROBLEM, log_it (L_ERROR, "Cannot allocate a memory for store object, errno=%d", errno);
}
mdbx_txn_commit(l_txn);
return l_rc == MDBX_SUCCESS ? l_obj : NULL;
return l_obj;
}
/*
......@@ -659,31 +670,77 @@ dap_store_obj_t *l_obj;
* 1 - SUCCESS, record is exist
* 0 - Record-No-Found
*/
bool s_db_mdbx_is_obj(const char *a_group, const char *a_key)
bool s_db_mdbx_is_obj(const char *a_group, const char *a_key)
{
int l_rc, l_rc2;
dap_db_ctx_t *l_db_ctx;
MDBX_val l_key, l_data;
MDBX_val l_key, l_data;
if (!a_group || !a_key) /* Sanity check */
return 0;
dap_return_val_if_fail(a_group && a_key, NULL) /* Sanity check */
if ( !(l_db_ctx = s_get_db_ctx_for_group(a_group)) ) /* Get DB Context for group/table */
return 0;
MDBX_txn *l_txn;
if ( MDBX_SUCCESS != (l_rc = mdbx_txn_begin(s_mdbx_env, NULL, MDBX_TXN_RDONLY, &l_txn)) )
return log_it(L_ERROR, "mdbx_txn_begin: (%d) %s", l_rc, mdbx_strerror(l_rc)), 0;
l_key.iov_base = (void *)a_key; /* Fill IOV for MDBX key */
l_key.iov_len = strlen(a_key);
l_rc = mdbx_get(l_txn, l_db_ctx->dbi, &l_key, &l_data);
l_rc = s_get_obj_by_text_key(l_txn, l_db_ctx->dbi, &l_key, &l_data, a_key);
if ( MDBX_SUCCESS != (l_rc2 = mdbx_txn_commit(l_txn)) )
log_it (L_ERROR, "mdbx_txn_commit: (%d) %s", l_rc2, mdbx_strerror(l_rc2));
return ( l_rc == MDBX_SUCCESS ); /*0 - RNF, 1 - SUCCESS */
}
static bool s_db_mdbx_is_hash(const char *a_group, dap_global_db_driver_hash_t a_hash)
{
dap_return_val_if_fail(a_group, NULL); /* Sanity check */
dap_db_ctx_t *l_db_ctx = s_get_db_ctx_for_group(a_group);
if (!l_db_ctx)
return false;
int l_rc;
MDBX_txn *l_txn;
if ( MDBX_SUCCESS != (l_rc = mdbx_txn_begin(s_mdbx_env, NULL, MDBX_TXN_RDONLY, &l_txn)) )
return log_it(L_ERROR, "mdbx_txn_begin: (%d) %s", l_rc, mdbx_strerror(l_rc)), NULL;
MDBX_val l_key, l_data;
l_key.iov_base = &a_hash; /* Fill IOV for MDBX key */
l_key.iov_len = sizeof(a_hash);
l_rc = mdbx_get(l_txn, l_db_ctx->dbi, &l_key, &l_data);
if (l_rc != MDBX_NOTFOUND && l_rc != MDBX_SUCCESS)
log_it (L_ERROR, "mdbx_get: (%d) %s", l_rc, mdbx_strerror(l_rc));
mdbx_txn_commit(l_txn);
return l_rc == MDBX_SUCCESS;
}
static dap_store_obj_t *s_db_mdbx_get_by_hash(const char *a_group, dap_global_db_driver_hash_t a_hash)
{
dap_return_val_if_fail(a_group, NULL); /* Sanity check */
dap_db_ctx_t *l_db_ctx = s_get_db_ctx_for_group(a_group);
if (!l_db_ctx)
return false;
int l_rc;
MDBX_txn *l_txn;
if ( MDBX_SUCCESS != (l_rc = mdbx_txn_begin(s_mdbx_env, NULL, MDBX_TXN_RDONLY, &l_txn)) )
return log_it(L_ERROR, "mdbx_txn_begin: (%d) %s", l_rc, mdbx_strerror(l_rc)), NULL;
MDBX_val l_key, l_data;
l_key.iov_base = &a_hash; /* Fill IOV for MDBX key */
l_key.iov_len = sizeof(a_hash);
dap_store_obj_t *l_obj = NULL;
if (MDBX_SUCCESS == (l_rc = mdbx_get(l_txn, l_db_ctx->dbi, &l_key, &l_data))) {
/* Found ! Make new <store_obj> */
if ( !(l_obj = DAP_NEW_Z(dap_store_obj_t)) ) {
log_it (L_ERROR, "Cannot allocate a memory for store object key, errno=%d", errno);
l_rc = MDBX_PROBLEM;
} else if ( s_fill_store_obj(a_group, &l_key, &l_data, l_obj) ) {
l_rc = MDBX_PROBLEM;
DAP_DEL_Z(l_obj);
}
} else if ( l_rc != MDBX_NOTFOUND )
log_it (L_ERROR, "mdbx_get: (%d) %s", l_rc, mdbx_strerror(l_rc));
mdbx_txn_commit(l_txn);
return l_obj;
}
/**
* @brief Reads some objects from a database by conditions
* @param a_iter iterator to looked for item
......@@ -691,68 +748,67 @@ MDBX_val l_key, l_data;
* @param a_count_out[out] a number of objects that were read
* @return If successful, a pointer to an objects, otherwise NULL.
*/
static dap_store_obj_t *s_db_mdbx_read_cond_store_obj(dap_global_db_iter_t *a_iter, size_t *a_count_out, dap_nanotime_t a_timestamp)
static dap_store_obj_t *s_db_mdbx_read_cond_store_obj(const char *a_group, dap_global_db_driver_hash_t a_hash_from, size_t *a_count_out)
{
dap_return_val_if_pass(!a_iter || !a_iter->db_iter || !a_iter->db_group, NULL); /* Sanity check, if !a_count_out return all items*/
if (a_iter->db_type != DAP_GLOBAL_DB_TYPE_CURRENT) {
log_it(L_ERROR, "Trying use iterator from another database for MDBX");
dap_return_val_if_fail(a_group, NULL); /* Sanity check */
dap_db_ctx_t *l_db_ctx = s_get_db_ctx_for_group(a_group);
if (!l_db_ctx)
return NULL;
}
int l_rc = 0;
MDBX_val l_data = {0};
MDBX_cursor* l_cursor = NULL;
size_t l_count_current = 0,
l_count_out = a_count_out ? *a_count_out : 0;
if (!l_count_out)
l_count_out = DAP_GLOBAL_DB_COND_READ_COUNT_DEFAULT;
dap_store_obj_t *l_obj = NULL, *l_obj_arr = NULL;
size_t l_count_out = 0, l_count_current = 0;
dap_db_ctx_t *l_db_ctx = s_get_db_ctx_for_group(a_iter->db_group);
dap_return_val_if_pass(!l_db_ctx, NULL); /* Sanity check */
/* Initialize MDBX cursor context area */
MDBX_txn *l_txn;
if (MDBX_SUCCESS != (l_rc = mdbx_txn_begin(s_mdbx_env, NULL, MDBX_TXN_RDONLY, &l_txn)) ||
MDBX_SUCCESS != (l_rc = mdbx_cursor_open(l_txn, l_db_ctx->dbi, &l_cursor))) {
int l_rc = 0;
MDBX_txn *l_txn = NULL;
if (MDBX_SUCCESS != (l_rc = mdbx_txn_begin(s_mdbx_env, NULL, MDBX_TXN_RDONLY, &l_txn))) {
log_it (L_ERROR, "mdbx_txn: (%d) %s", l_rc, mdbx_strerror(l_rc));
goto safe_ret;
}
MDBX_val *l_key = (MDBX_val *)a_iter->db_iter;
if (l_key->iov_base)
l_rc = mdbx_cursor_get(l_cursor, l_key, NULL, MDBX_SET_RANGE);
if (a_count_out && *a_count_out)
l_count_out = *a_count_out;
else
l_count_current = s_db_mdbx_read_count_store(a_iter->db_group, a_timestamp);
if (!(l_obj_arr = DAP_NEW_Z_SIZE(dap_store_obj_t, sizeof(dap_store_obj_t) * l_count_out))) {
log_it(L_CRITICAL, "Memory allocation error");
MDBX_cursor *l_cursor = NULL;
if ( MDBX_SUCCESS != (l_rc = mdbx_cursor_open(l_txn, l_db_ctx->dbi, &l_cursor)) ) {
log_it(L_ERROR, "mdbx_cursor_open: (%d) %s", l_rc, mdbx_strerror(l_rc));
goto safe_ret;
}
MDBX_val l_key = { .iov_base = &a_hash_from, .iov_len = sizeof(a_hash_from) },
l_data = {};
if ( MDBX_SUCCESS != (l_rc = mdbx_cursor_get(l_cursor, &l_key, &l_data, MDBX_SET_UPPERBOUND))) {
if (l_rc != MDBX_NOTFOUND)
log_it(L_ERROR, "mdbx_cursor_get: (%d) %s", l_rc, mdbx_strerror(l_rc));
goto safe_ret;
}
l_obj_arr = DAP_NEW_Z_SIZE(dap_store_obj_t, l_count_out * sizeof(dap_store_obj_t) + 1);
if (!l_obj_arr) {
log_it(L_CRITICAL, "Can't allocate memory");
goto safe_ret;
}
/* Iterate cursor to retrieve records from DB */
for (int i = l_count_out; i && (MDBX_SUCCESS == (l_rc = mdbx_cursor_get(l_cursor, l_key, &l_data, MDBX_NEXT))); i--) {
struct driver_record *l_record = l_data.iov_base;
if (l_record->timestamp > a_timestamp) { /* Add if newer */
l_obj = l_obj_arr + l_count_current; /* Point <l_obj> to last array's element */
if (s_fill_store_obj(a_iter->db_group, l_key, &l_data, l_obj)) {
l_rc = MDBX_PROBLEM;
break;
}
l_count_current++;
do {
l_obj = l_obj_arr + l_count_current; // Point <l_obj> to last array's element
if (s_fill_store_obj(a_group, &l_key, &l_data, l_obj)) {
l_rc = MDBX_PROBLEM;
break;
}
}
} while (++l_count_current < l_count_out &&
(MDBX_SUCCESS == (l_rc = mdbx_cursor_get(l_cursor, &l_key, &l_data, MDBX_NEXT))));
// cut unused memory
if (!l_count_current) {
DAP_DEL_Z(l_obj_arr);
} else if (l_count_current < l_count_out && !(l_obj_arr = DAP_REALLOC(l_obj_arr, sizeof(dap_store_obj_t) * l_count_current))) {
log_it(L_ERROR, "Cannot cut area to keep %zu <store objects>", l_count_current);
l_rc = MDBX_PROBLEM;
if (l_rc == MDBX_NOTFOUND) {
// Add blank object to the end as marker of final itreation
l_count_current++;
if (l_count_current < l_count_out && !(l_obj_arr = DAP_REALLOC(l_obj_arr, sizeof(dap_store_obj_t) * l_count_current))) {
log_it(L_ERROR, "Cannot cut area to keep %zu <store objects>", l_count_current);
l_rc = MDBX_PROBLEM;
}
}
if ( (MDBX_SUCCESS != l_rc) && (l_rc != MDBX_NOTFOUND) )
log_it (L_ERROR, "mdbx_cursor_get: (%d) %s", l_rc, mdbx_strerror(l_rc));
log_it (L_ERROR, "mdbx_read_cond_store_obj: (%d) %s", l_rc, mdbx_strerror(l_rc));
safe_ret:
if (l_cursor)
mdbx_cursor_close(l_cursor);
mdbx_txn_commit(l_txn);
if (l_txn)
mdbx_txn_commit(l_txn);
if (a_count_out)
*a_count_out = l_count_current;
return l_obj_arr;
......@@ -763,46 +819,50 @@ safe_ret:
* @param a_iter started iterator
* @return count of has been found record.
*/
size_t s_db_mdbx_read_count_store(const char *a_group, dap_nanotime_t a_timestamp)
static size_t s_db_mdbx_read_count_store(const char *a_group, dap_global_db_driver_hash_t a_hash_from)
{
dap_return_val_if_fail(a_group, 0); /* Sanity check */
int l_rc = 0;
MDBX_val l_key = {}, l_data = {};
MDBX_cursor* l_cursor = NULL;
dap_store_obj_t *l_obj = NULL;
size_t l_ret_count = 0;
dap_db_ctx_t *l_db_ctx = s_get_db_ctx_for_group(a_group);
dap_return_val_if_pass(!l_db_ctx, 0);
if (!(l_obj = DAP_NEW_Z(dap_store_obj_t))) {
log_it(L_CRITICAL, "Memory allocation error");
if (!l_db_ctx)
return 0;
}
int l_rc = 0;
MDBX_txn *l_txn;
if (
MDBX_SUCCESS != (l_rc = mdbx_txn_begin(s_mdbx_env, NULL, MDBX_TXN_RDONLY, &l_txn)) ||
MDBX_SUCCESS != (l_rc = mdbx_cursor_open(l_txn, l_db_ctx->dbi, &l_cursor))
) {
if ( MDBX_SUCCESS != (l_rc = mdbx_txn_begin(s_mdbx_env, NULL, MDBX_TXN_RDONLY, &l_txn))) {
log_it(L_ERROR, "mdbx_txn: (%d) %s", l_rc, mdbx_strerror(l_rc));
return 0;
}
// Return all entries count
if (dap_global_db_driver_hash_is_blank(a_hash_from)) {
MDBX_stat l_stat;
l_rc = mdbx_dbi_stat(l_txn, l_db_ctx->dbi, &l_stat, sizeof(MDBX_stat));
if (l_rc != MDBX_SUCCESS)
log_it(L_ERROR, "mdbx_dbi_stat: (%d) %s", l_rc, mdbx_strerror(l_rc));
else if (!l_stat.ms_entries) /* Nothing to retrieve , table contains no record */
debug_if(g_dap_global_db_debug_more, L_WARNING, "No object (-s) to be retrieved from the group '%s'", a_group);
mdbx_txn_commit(l_txn);
return l_rc == MDBX_SUCCESS ? l_stat.ms_entries : 0;
}
// Return count of entries after specified position by driver hash
MDBX_cursor *l_cursor = NULL;
if ( MDBX_SUCCESS != (l_rc = mdbx_cursor_open(l_txn, l_db_ctx->dbi, &l_cursor)) ) {
log_it(L_ERROR, "mdbx_cursor_open: (%d) %s", l_rc, mdbx_strerror(l_rc));
mdbx_txn_commit(l_txn);
log_it (L_ERROR, "mdbx_txn: (%d) %s", l_rc, mdbx_strerror(l_rc));
DAP_DEL_Z(l_obj);
return 0;
}
while ((MDBX_SUCCESS == (l_rc = mdbx_cursor_get(l_cursor, &l_key, &l_data, MDBX_NEXT))))
if (((struct driver_record *)l_data.iov_base)->timestamp > a_timestamp)
l_ret_count++;
if ( (MDBX_SUCCESS != l_rc) && (l_rc != MDBX_NOTFOUND) ) {
log_it (L_ERROR, "mdbx_cursor_get: (%d) %s", l_rc, mdbx_strerror(l_rc));
MDBX_val l_key = { .iov_base = &a_hash_from, .iov_len = sizeof(a_hash_from) },
l_data = {};
if ( MDBX_SUCCESS != (l_rc = mdbx_cursor_get(l_cursor, &l_key, &l_data, MDBX_SET_UPPERBOUND))) {
mdbx_cursor_close(l_cursor);
mdbx_txn_commit(l_txn);
if (l_rc != MDBX_NOTFOUND)
log_it(L_ERROR, "mdbx_cursor_get: (%d) %s", l_rc, mdbx_strerror(l_rc));
return 0;
}
size_t l_ret_count = 0;
while ((MDBX_SUCCESS == (l_rc = mdbx_cursor_get(l_cursor, &l_key, &l_data, MDBX_NEXT))))
l_ret_count++;
mdbx_cursor_close(l_cursor);
mdbx_txn_commit(l_txn);
dap_store_obj_free(l_obj, 1);
return l_ret_count;
}
......@@ -822,20 +882,16 @@ size_t s_db_mdbx_read_count_store(const char *a_group, dap_nanotime_t a_timestam
static dap_list_t *s_db_mdbx_get_groups_by_mask(const char *a_group_mask)
{
dap_list_t *l_ret_list;
dap_list_t *l_ret_list = NULL;
dap_db_ctx_t *l_db_ctx, *l_db_ctx2;
if(!a_group_mask)
return NULL;
l_ret_list = NULL;
dap_return_val_if_fail(a_group_mask, NULL);
dap_assert ( !pthread_rwlock_rdlock(&s_db_ctxs_rwlock) );
HASH_ITER(hh, s_db_ctxs, l_db_ctx, l_db_ctx2) {
if (!dap_fnmatch(a_group_mask, l_db_ctx->name, 0) ) /* Name match a pattern/mask ? */
l_ret_list = dap_list_prepend(l_ret_list, l_db_ctx->name); /* Add group name to output list */
}
HASH_ITER(hh, s_db_ctxs, l_db_ctx, l_db_ctx2)
if (dap_global_db_group_match_mask(l_db_ctx->name, a_group_mask) ) /* Name match a pattern/mask ? */
l_ret_list = dap_list_append(l_ret_list, l_db_ctx->name); /* Add group name to output list */
dap_assert ( !pthread_rwlock_unlock(&s_db_ctxs_rwlock) );
......@@ -857,7 +913,7 @@ dap_db_ctx_t *l_db_ctx, *l_db_ctx2;
* 0 - SUCCESS
* 0> - <errno>
*/
static int s_db_mdbx_apply_store_obj (dap_store_obj_t *a_store_obj)
static int s_db_mdbx_apply_store_obj(dap_store_obj_t *a_store_obj)
{
int l_rc = 0, l_rc2;
dap_db_ctx_t *l_db_ctx;
......@@ -876,43 +932,52 @@ MDBX_txn *l_txn;
}
/* At this point we have got the DB Context for the table/group so we are can performs a main work */
if (a_store_obj->type == DAP_GLOBAL_DB_OPTYPE_ADD) {
if( !a_store_obj->key )
if( !a_store_obj->key || !*a_store_obj->key)
return -ENOENT;
l_key.iov_base = (void *)a_store_obj->key; /* Fill IOV for MDBX key */
l_key.iov_len = strnlen(a_store_obj->key, DAP_GLOBAL_DB_KEY_SIZE_MAX);
/* Fill IOV for MDBX key */
if (!a_store_obj->crc) {
log_it(L_ERROR, "Global DB store object corrupted");
return MDBX_EINVAL;
}
if ( MDBX_SUCCESS != (l_rc = mdbx_txn_begin(s_mdbx_env, NULL, 0, &l_txn)) )
return log_it(L_ERROR, "mdbx_txn_begin: (%d) %s", l_rc, mdbx_strerror(l_rc)), l_rc;
l_rc = s_get_obj_by_text_key(l_txn, l_db_ctx->dbi, &l_key, &l_data, a_store_obj->key);
if (l_rc == MDBX_SUCCESS) {
// Drop object with same text key
if ( MDBX_SUCCESS != (l_rc = mdbx_del(l_txn, l_db_ctx->dbi, &l_key, NULL)) && l_rc != MDBX_NOTFOUND) {
mdbx_txn_abort(l_txn);
return log_it(L_ERROR, "mdbx_del: (%d) %s", l_rc, mdbx_strerror(l_rc)), l_rc;
}
}
mdbx_txn_commit(l_txn);
dap_global_db_driver_hash_t l_driver_key = dap_global_db_driver_hash_get(a_store_obj);
l_key.iov_base = &l_driver_key;
l_key.iov_len = sizeof(l_driver_key);
/* Compute a length of the area to keep record */
size_t l_record_len = sizeof(struct driver_record) + a_store_obj->value_len + dap_sign_get_size(a_store_obj->sign);
size_t l_key_len = strnlen(a_store_obj->key, DAP_GLOBAL_DB_KEY_SIZE_MAX - 1) + 1;
size_t l_record_len = sizeof(struct driver_record) + a_store_obj->value_len + l_key_len + dap_sign_get_size(a_store_obj->sign);
struct driver_record *l_record = DAP_NEW_Z_SIZE(struct driver_record, l_record_len);
if (!l_record) {
log_it(L_ERROR, "Cannot allocate memory for new records, %zu octets, errno=%d", l_record_len, errno);
return MDBX_PANIC;
}
l_record->timestamp = a_store_obj->timestamp;
dap_strncpy((char *)l_record->key_n_value_n_sign, a_store_obj->key, DAP_GLOBAL_DB_KEY_SIZE_MAX);
l_record->key_len = l_key_len;
l_record->value_len = a_store_obj->value_len;
// Don't save NEW attribute
l_record->flags = a_store_obj->flags & ~DAP_GLOBAL_DB_RECORD_NEW;
if (!a_store_obj->crc) {
DAP_DELETE(l_record);
log_it(L_ERROR, "Global DB store object corrupted");
return MDBX_EINVAL;
}
l_record->crc = a_store_obj->crc;
if (!a_store_obj->sign) {
DAP_DELETE(l_record);
log_it(L_ERROR, "Global DB store object unsigned");
return MDBX_EINVAL;
}
l_record->sign_len = dap_sign_get_size(a_store_obj->sign);
if (!l_record->sign_len) {
DAP_DELETE(l_record);
log_it(L_ERROR, "Global DB store object sign corrupted");
return MDBX_EINVAL;
}
if (a_store_obj->value_len) /* Put <value> into the record */
memcpy(l_record->value_n_sign, a_store_obj->value, a_store_obj->value_len);
/* Put the authorization sign */
memcpy(l_record->value_n_sign + a_store_obj->value_len, a_store_obj->sign, l_record->sign_len);
memcpy(l_record->key_n_value_n_sign + l_key_len, a_store_obj->value, a_store_obj->value_len);
if (a_store_obj->sign) {
/* Put the authorization sign */
l_record->sign_len = dap_sign_get_size(a_store_obj->sign);
memcpy(l_record->key_n_value_n_sign + l_key_len + a_store_obj->value_len, a_store_obj->sign, l_record->sign_len);
if (!l_record->sign_len) {
DAP_DELETE(l_record);
log_it(L_ERROR, "Global DB store object sign corrupted");
return MDBX_EINVAL;
}
}
l_data.iov_base = l_record; /* Fill IOV for MDBX data */
l_data.iov_len = l_record_len;
......@@ -932,12 +997,22 @@ MDBX_txn *l_txn;
if (a_store_obj->type == DAP_GLOBAL_DB_OPTYPE_DEL) {
if ( MDBX_SUCCESS != (l_rc = mdbx_txn_begin(s_mdbx_env, NULL, 0, &l_txn)) )
return log_it (L_ERROR, "mdbx_txn_begin: (%d) %s", l_rc, mdbx_strerror(l_rc)), -ENOENT;
if ( a_store_obj->key ) { /* Delete record */
l_key.iov_base = (void *) a_store_obj->key;
l_key.iov_len = strnlen(a_store_obj->key, DAP_GLOBAL_DB_KEY_SIZE_MAX);
if ( MDBX_SUCCESS != (l_rc = mdbx_del(l_txn, l_db_ctx->dbi, &l_key, NULL)) && l_rc != MDBX_NOTFOUND)
log_it (L_ERROR, "mdbx_del: (%d) %s", l_rc, mdbx_strerror(l_rc));
l_rc = (l_rc == MDBX_NOTFOUND) ? 1 : l_rc; /* Not found? It's OK */
if ( a_store_obj->key ) {
/* Delete record */
if (a_store_obj->crc && a_store_obj->timestamp) {
dap_global_db_driver_hash_t l_driver_key = dap_global_db_driver_hash_get(a_store_obj);
l_key.iov_base = &l_driver_key;
l_key.iov_len = sizeof(l_driver_key);
if ( MDBX_SUCCESS != (l_rc = mdbx_del(l_txn, l_db_ctx->dbi, &l_key, NULL)) && l_rc != MDBX_NOTFOUND)
log_it(L_ERROR, "mdbx_del: (%d) %s", l_rc, mdbx_strerror(l_rc));
} else {
l_rc = s_get_obj_by_text_key(l_txn, l_db_ctx->dbi, &l_key, &l_data, a_store_obj->key);
if (l_rc == MDBX_SUCCESS) {
if ( MDBX_SUCCESS != (l_rc = mdbx_del(l_txn, l_db_ctx->dbi, &l_key, NULL)) && l_rc != MDBX_NOTFOUND)
log_it(L_ERROR, "mdbx_del: (%d) %s", l_rc, mdbx_strerror(l_rc));
}
}
l_rc = (l_rc == MDBX_NOTFOUND) ? 1 : l_rc; /* Not found? It's OK */
} else if (MDBX_SUCCESS != (l_rc = mdbx_drop(l_txn, l_db_ctx->dbi, 0))) /* Drop the whole table */
log_it (L_ERROR, "mdbx_drop: (%d) %s", l_rc, mdbx_strerror(l_rc));
l_rc2 = 0;
......@@ -976,7 +1051,6 @@ uint64_t l_count_out;
dap_db_ctx_t *l_db_ctx;
dap_store_obj_t *l_obj, *l_obj_arr = NULL;
MDBX_val l_key, l_data;
MDBX_cursor *l_cursor;
MDBX_stat l_stat;
MDBX_txn *l_txn;
......@@ -991,9 +1065,7 @@ MDBX_txn *l_txn;
* Perfroms a find/get a record with the given key
*/
if ( a_key ) {
l_key.iov_base = (void *) a_key; /* Fill IOV for MDBX key */
l_key.iov_len = strlen(a_key);
if (MDBX_SUCCESS == (l_rc = mdbx_get(l_txn, l_db_ctx->dbi, &l_key, &l_data))) {
if (MDBX_SUCCESS == (l_rc = s_get_obj_by_text_key(l_txn, l_db_ctx->dbi, &l_key, &l_data, a_key))) {
/* Found ! Make new <store_obj> */
if ( !(l_obj = DAP_CALLOC(1, sizeof(dap_store_obj_t))) ) {
log_it (L_ERROR, "Cannot allocate a memory for store object key, errno=%d", errno);
......@@ -1014,8 +1086,8 @@ MDBX_txn *l_txn;
/*
** If a_key is NULL - retrieve a requested number of records from the table
*/
do {
l_cursor = NULL;
MDBX_cursor *l_cursor = NULL; /* Initialize MDBX cursor context area */
do {
/*
* Retrieve statistic for group/table, we need to compute a number of records can be retreived
*/
......@@ -1027,7 +1099,7 @@ MDBX_txn *l_txn;
debug_if(g_dap_global_db_debug_more, L_WARNING, "No object (-s) to be retrieved from the group '%s'", a_group);
break;
}
l_count_out = l_stat.ms_entries;
l_count_out = a_count_out && *a_count_out && *a_count_out <= l_stat.ms_entries ? *a_count_out : l_stat.ms_entries;
/*
* Allocate memory for array[l_count_out] of returned objects
......@@ -1036,32 +1108,35 @@ MDBX_txn *l_txn;
log_it(L_ERROR, "Cannot allocate %zu octets for %"DAP_UINT64_FORMAT_U" store objects", l_count_out * sizeof(dap_store_obj_t), l_count_out);
break;
}
/* Initialize MDBX cursor context area */
/* Iterate cursor to retrieve records from DB */
if ( MDBX_SUCCESS != (l_rc = mdbx_cursor_open(l_txn, l_db_ctx->dbi, &l_cursor)) ) {
log_it (L_ERROR, "mdbx_cursor_open: (%d) %s", l_rc, mdbx_strerror(l_rc));
break;
}
/* Iterate cursor to retrieve records from DB */
if ( MDBX_SUCCESS != (l_rc = mdbx_cursor_get(l_cursor, &l_key, &l_data, MDBX_FIRST)) ) {
log_it (L_ERROR, "mdbx_cursor_get FIRST: (%d) %s", l_rc, mdbx_strerror(l_rc));
break;
}
l_obj = l_obj_arr;
for (int i = l_count_out;
i && (MDBX_SUCCESS == (l_rc = mdbx_cursor_get(l_cursor, &l_key, &l_data, MDBX_NEXT))); i--, l_obj++)
{
do {
if (s_fill_store_obj(a_group, &l_key, &l_data, l_obj)) {
l_rc = MDBX_PROBLEM;
break;
} else if ( a_count_out )
} else if (a_count_out)
(*a_count_out)++;
}
l_count_out--;
l_obj++;
} while (MDBX_SUCCESS == (l_rc = mdbx_cursor_get(l_cursor, &l_key, &l_data, MDBX_NEXT)) && l_count_out);
if ( (MDBX_SUCCESS != l_rc) && (l_rc != MDBX_NOTFOUND) ) {
log_it (L_ERROR, "mdbx_cursor_get: (%d) %s", l_rc, mdbx_strerror(l_rc));
log_it (L_ERROR, "mdbx_get ALL: (%d) %s", l_rc, mdbx_strerror(l_rc));
break;
}
} while (0);
if (l_cursor)
mdbx_cursor_close(l_cursor);
mdbx_txn_commit(l_txn);
return l_obj_arr;
}
......@@ -100,6 +100,7 @@ typedef bool (*dap_global_db_callback_results_raw_t) (dap_global_db_instance_t *
dap_store_obj_t *a_values, void *a_arg);
// Return codes
#define DAP_GLOBAL_DB_RC_SUCCESS 0
#define DAP_GLOBAL_DB_RC_PROGRESS 1
#define DAP_GLOBAL_DB_RC_NO_RESULTS -1
#define DAP_GLOBAL_DB_RC_ERROR -6
......@@ -161,3 +162,4 @@ int dap_global_db_del_sync(const char * a_group, const char *a_key);
int dap_global_db_flush_sync();
bool dap_global_db_isalnum_group_key(const dap_store_obj_t *a_obj);
bool dap_global_db_group_match_mask(const char *a_group, const char *a_mask);
......@@ -37,6 +37,8 @@
#define DAP_GLOBAL_DB_GROUPS_COUNT_MAX 1024UL /* A maximum number of groups */
#define DAP_GLOBAL_DB_KEY_SIZE_MAX 512UL /* A limit for the key's length in DB */
#define DAP_GLOBAL_DB_COND_READ_COUNT_DEFAULT 256UL /* Default count of records to return with conditional read */
enum dap_global_db_record_flags {
DAP_GLOBAL_DB_RECORD_COMMON = 0,
DAP_GLOBAL_DB_RECORD_PINNED = 1,
......@@ -68,7 +70,7 @@ typedef struct dap_store_obj {
DAP_STATIC_INLINE dap_global_db_driver_hash_t dap_global_db_driver_hash_get(dap_store_obj_t *a_obj)
{
dap_global_db_driver_hash_t l_ret = { .bets = htobe64(a_obj->timestamp), .becrc = htobe32(a_obj->crc) };
dap_global_db_driver_hash_t l_ret = { .bets = htobe64(a_obj->timestamp), .becrc = htobe64(a_obj->crc) };
return l_ret;
}
......@@ -98,29 +100,23 @@ DAP_STATIC_INLINE char *dap_global_db_driver_hash_print(dap_global_db_driver_has
return l_ret;
}
// db type for iterator
typedef enum dap_global_db_iter_type {
DAP_GLOBAL_DB_TYPE_UNDEFINED = 0,
DAP_GLOBAL_DB_TYPE_MDBX = 1,
DAP_GLOBAL_DB_TYPE_SQLITE
} dap_global_db_iter_type_t;
extern const dap_global_db_driver_hash_t c_dap_global_db_driver_hash_start;
// db element iterator
typedef struct dap_global_db_iter {
dap_global_db_iter_type_t db_type;
const char *db_group;
void *db_iter;
} dap_global_db_iter_t;
DAP_STATIC_INLINE bool dap_global_db_driver_hash_is_blank(dap_global_db_driver_hash_t a_blank_candidate)
{
return !memcmp(&a_blank_candidate, &c_dap_global_db_driver_hash_start, sizeof(dap_global_db_driver_hash_t));
}
typedef int (*dap_db_driver_write_callback_t)(dap_store_obj_t *a_store_obj);
typedef dap_store_obj_t* (*dap_db_driver_read_callback_t)(const char *a_group, const char *a_key, size_t *a_count_out);
typedef dap_store_obj_t* (*dap_db_driver_read_cond_callback_t)(dap_global_db_iter_t *a_iter, size_t *a_count, dap_nanotime_t a_timestamp);
typedef dap_store_obj_t* (*dap_db_driver_read_cond_callback_t)(const char *a_group, dap_global_db_driver_hash_t a_hash_from, size_t *a_count);
typedef dap_store_obj_t* (*dap_db_driver_read_last_callback_t)(const char *a_group);
typedef size_t (*dap_db_driver_read_count_callback_t)(const char * a_group, dap_nanotime_t a_timestamp);
typedef size_t (*dap_db_driver_read_count_callback_t)(const char *a_group, dap_global_db_driver_hash_t a_hash_from);
typedef dap_list_t* (*dap_db_driver_get_groups_callback_t)(const char *a_mask);
typedef bool (*dap_db_driver_is_obj_callback_t)(const char *a_group, const char *a_key);
typedef bool (*dap_db_driver_is_hash_callback_t)(const char *a_group, dap_global_db_driver_hash_t a_hash);
typedef dap_store_obj_t * (*dap_db_driver_get_by_hash_callback_t)(const char *a_group, dap_global_db_driver_hash_t a_hash);
typedef int (*dap_db_driver_callback_t)(void);
typedef int (*dap_db_driver_iter_create_callback_t)(dap_global_db_iter_t *a_iter);
typedef struct dap_db_driver_callbacks {
dap_db_driver_write_callback_t apply_store_obj; /* Performs an DB's action like: INSERT/DELETE/UPDATE for the given
......@@ -134,17 +130,17 @@ typedef struct dap_db_driver_callbacks {
dap_db_driver_is_obj_callback_t is_obj; /* Check for existence of a record in the table/group for
a given <key> */
dap_db_driver_is_hash_callback_t is_hash; /* Check for existence of a record in the table/group for
a given driver hash */
dap_db_driver_get_by_hash_callback_t get_by_hash; /* Retrieve a record from the table/group for a given driver hash */
dap_db_driver_callback_t transaction_start; /* Allocate DB context for consequtive operations */
dap_db_driver_callback_t transaction_end; /* Release DB context at end of DB consequtive operations */
dap_db_driver_callback_t deinit;
dap_db_driver_callback_t flush;
dap_db_driver_iter_create_callback_t iter_create;
} dap_db_driver_callbacks_t;
int dap_db_driver_init(const char *driver_name, const char *a_filename_db, int a_mode_async);
void dap_db_driver_deinit(void);
......@@ -159,13 +155,11 @@ int dap_db_driver_flush(void);
int dap_global_db_driver_apply(dap_store_obj_t *a_store_obj, size_t a_store_count);
int dap_global_db_driver_add(dap_store_obj_t *a_store_obj, size_t a_store_count);
int dap_global_db_driver_delete(dap_store_obj_t * a_store_obj, size_t a_store_count);
dap_store_obj_t* dap_global_db_driver_read_last(const char *a_group);
dap_store_obj_t* dap_global_db_driver_cond_read(dap_global_db_iter_t* a_iter, size_t *a_count_out, dap_nanotime_t a_timestamp);
dap_store_obj_t* dap_global_db_driver_read(const char *a_group, const char *a_key, size_t *count_out);
dap_store_obj_t *dap_global_db_driver_read_last(const char *a_group);
dap_store_obj_t *dap_global_db_driver_cond_read(const char *a_group, dap_global_db_driver_hash_t a_hash_from, size_t *a_count_out);
dap_store_obj_t *dap_global_db_driver_read(const char *a_group, const char *a_key, size_t *count_out);
dap_store_obj_t *dap_global_db_driver_get_by_hash(const char *a_group, dap_global_db_driver_hash_t a_hash);
bool dap_global_db_driver_is(const char *a_group, const char *a_key);
bool dap_global_db_driver_is_hash(const char *a_group, const dap_global_db_driver_hash_t *a_hash);
size_t dap_global_db_driver_count(const char *a_group, dap_nanotime_t a_timestamp);
dap_list_t* dap_global_db_driver_get_groups_by_mask(const char *a_group_mask);
dap_global_db_iter_t *dap_global_db_driver_iter_create(const char *a_group);
void dap_global_db_driver_iter_delete(dap_global_db_iter_t* a_iter);
bool dap_global_db_driver_is_hash(const char *a_group, dap_global_db_driver_hash_t a_hash);
size_t dap_global_db_driver_count(const char *a_group, dap_global_db_driver_hash_t a_hash_from);
dap_list_t *dap_global_db_driver_get_groups_by_mask(const char *a_group_mask);
......@@ -1376,7 +1376,7 @@ void dap_events_socket_delete_mt(dap_worker_t * a_worker, dap_events_socket_uuid
* @param a_callbacks
* @return
*/
dap_events_socket_t *dap_events_socket_wrap_listener(dap_server_t *a_server, dap_events_socket_callbacks_t *a_callbacks)
dap_events_socket_t *dap_events_socket_wrap_listener(dap_server_t *a_server, SOCKET a_sock, dap_events_socket_callbacks_t *a_callbacks)
{
if (!a_callbacks || !a_server) {
log_it(L_CRITICAL, "Invalid arguments in dap_events_socket_wrap_listener");
......@@ -1386,10 +1386,9 @@ dap_events_socket_t *dap_events_socket_wrap_listener(dap_server_t *a_server, dap
if (!l_es)
return NULL;
l_es->socket = a_server->socket_listener;
l_es->socket = a_sock;
l_es->server = a_server;
l_es->callbacks = *a_callbacks;
l_es->_inheritor = a_server;
switch (a_server->type) {
case DAP_SERVER_UDP:
l_es->type = DESCRIPTOR_TYPE_SOCKET_UDP;
......@@ -1613,9 +1612,7 @@ void dap_events_socket_delete_unsafe( dap_events_socket_t * a_esocket , bool a_p
if (!a_preserve_inheritor )
DAP_DEL_Z(a_esocket->_inheritor);
DAP_DEL_Z(a_esocket->_pvt);
DAP_DEL_Z(a_esocket->buf_in);
DAP_DEL_Z(a_esocket->buf_out);
DAP_DEL_MULTY(a_esocket->_pvt, a_esocket->buf_in, a_esocket->buf_out);
#ifdef DAP_SYS_DEBUG
atomic_fetch_add(&s_memstat[MEMSTAT$K_BUF_OUT].free_nr, 1);
......
......@@ -70,12 +70,13 @@
#include "dap_server.h"
#include "dap_worker.h"
#include "dap_events.h"
#include "dap_strfuncs.h"
#define LOG_TAG "dap_server"
static dap_events_socket_t * s_es_server_create(int a_sock,
dap_events_socket_callbacks_t * a_callbacks, dap_server_t * a_server);
static int s_server_run(dap_server_t * a_server, dap_events_socket_callbacks_t *a_callbacks );
static int s_server_run(dap_server_t * a_server);
static void s_es_server_accept(dap_events_socket_t *a_es_listener, SOCKET a_remote_socket, struct sockaddr_storage *a_remote_addr);
static void s_es_server_error(dap_events_socket_t *a_es, int a_arg);
static void s_es_server_new(dap_events_socket_t *a_es, void * a_arg);
......@@ -115,11 +116,20 @@ dap_server_t* dap_server_get_default()
*/
void dap_server_delete(dap_server_t *a_server)
{
// sanity check
dap_return_if_pass(!a_server);
// func work
while (a_server->es_listeners) {
dap_events_socket_t *l_es = (dap_events_socket_t *)a_server->es_listeners->data;
dap_events_socket_remove_and_delete_mt(l_es->worker, l_es->uuid); // TODO unsafe moment. Replace storage to uuids
dap_list_t *l_tmp = a_server->es_listeners;
a_server->es_listeners = l_tmp->next;
DAP_DELETE(l_tmp);
}
if(a_server->delete_callback)
a_server->delete_callback(a_server,NULL);
if( a_server->_inheritor )
DAP_DELETE( a_server->_inheritor );
DAP_DEL_Z( a_server->_inheritor );
pthread_mutex_destroy(&a_server->started_mutex);
pthread_cond_destroy(&a_server->started_cond);
......@@ -128,149 +138,208 @@ void dap_server_delete(dap_server_t *a_server)
}
/**
* @brief dap_server_new_local
* @param a_events
* @param a_path
* @param a_mode
* @param a_callbacks
* @brief add listen addr to server
* @param a_server - server to add addr
* @param a_addr - addr or path to local
* @param a_port - port or read mode
* @param a_callbacks - pointer to callbacks
* @return
*/
dap_server_t *dap_server_new_local(const char *a_path, const char *a_mode, dap_events_socket_callbacks_t *a_callbacks)
int dap_server_listen_addr_add(dap_server_t *a_server, const char *a_addr, uint16_t a_port, dap_events_socket_callbacks_t *a_callbacks)
{
#ifdef DAP_OS_UNIX
mode_t l_listen_unix_socket_permissions = 0770;
if (a_mode)
sscanf(a_mode,"%ou", &l_listen_unix_socket_permissions );
return dap_server_new(a_path, l_listen_unix_socket_permissions, DAP_SERVER_LOCAL, a_callbacks);
#else
log_it(L_ERROR, "Local server is not implemented for your platform");
return NULL;
#endif
}
/**
* @brief dap_server_new
* @param a_events
* @param a_addr
* @param a_port
* @param a_type
* @return
*/
dap_server_t* dap_server_new(const char * a_addr, uint16_t a_port, dap_server_type_t a_type, dap_events_socket_callbacks_t *a_callbacks)
{
dap_server_t *l_server = DAP_NEW_Z(dap_server_t);
if (!l_server) {
log_it(L_CRITICAL, "Memory allocation error");
return NULL;
// sanity check
dap_return_val_if_pass(!a_server, -1);
if (!a_addr || !a_port) {
log_it(L_ERROR, "Listener addr %s %u unspecified", a_addr, a_port);
return -4;;
}
l_server->type = a_type;
if (l_server->type != DAP_SERVER_LOCAL)
l_server->port = a_port;
// Create socket
l_server->socket_listener = INVALID_SOCKET;
switch (l_server->type) {
// preparing
SOCKET l_socket_listener = INVALID_SOCKET;
switch (a_server->type) {
case DAP_SERVER_TCP:
l_server->socket_listener = socket(AF_INET, SOCK_STREAM, 0);
l_socket_listener = socket(AF_INET, SOCK_STREAM, 0);
break;
case DAP_SERVER_TCP_V6:
l_server->socket_listener = socket(AF_INET6, SOCK_STREAM, 0);
l_socket_listener = socket(AF_INET6, SOCK_STREAM, 0);
break;
case DAP_SERVER_UDP:
l_server->socket_listener = socket(AF_INET, SOCK_DGRAM, 0);
l_socket_listener = socket(AF_INET, SOCK_DGRAM, 0);
break;
#ifdef DAP_OS_UNIX
case DAP_SERVER_LOCAL:
l_server->socket_listener = socket(AF_LOCAL, SOCK_STREAM, 0);
l_socket_listener = socket(AF_LOCAL, SOCK_STREAM, 0);
break;
#endif
default:
log_it(L_ERROR, "Specified server type %s is not implemented for your platform",
dap_server_type_str(l_server->type));
DAP_DELETE(l_server);
return NULL;
dap_server_type_str(a_server->type));
return -1;
}
#ifdef DAP_OS_WINDOWS
if (l_server->socket_listener == INVALID_SOCKET) {
if (l_socket_listener == INVALID_SOCKET) {
log_it(L_ERROR, "Socket error: %d", WSAGetLastError());
#else
if (l_server->socket_listener < 0) {
if (l_socket_listener < 0) {
int l_errno = errno;
log_it (L_ERROR,"Socket error %s (%d)", strerror(l_errno), l_errno);
#endif
DAP_DELETE(l_server);
return NULL;
return -2;
}
log_it(L_NOTICE,"Listen socket %"DAP_FORMAT_SOCKET" created...", l_socket_listener);
// func work
// Create socket
dap_events_socket_t *l_es = dap_events_socket_wrap_listener(a_server, l_socket_listener, a_callbacks);
if (a_server->type != DAP_SERVER_LOCAL)
l_es->listener_port = a_port;
else
l_es->permission = a_port;
log_it(L_NOTICE,"Listen socket %"DAP_FORMAT_SOCKET" created...", l_server->socket_listener);
int reuse = 1;
if (setsockopt(l_server->socket_listener, SOL_SOCKET, SO_REUSEADDR, (const char*)&reuse, sizeof(reuse)) < 0)
if (setsockopt(l_socket_listener, SOL_SOCKET, SO_REUSEADDR, (const char*)&reuse, sizeof(reuse)) < 0)
log_it(L_WARNING, "Can't set up REUSEADDR flag to the socket");
reuse = 1;
#ifdef SO_REUSEPORT
if (setsockopt(l_server->socket_listener, SOL_SOCKET, SO_REUSEPORT, (const char*)&reuse, sizeof(reuse)) < 0)
if (setsockopt(l_socket_listener, SOL_SOCKET, SO_REUSEPORT, (const char*)&reuse, sizeof(reuse)) < 0)
log_it(L_WARNING, "Can't set up REUSEPORT flag to the socket");
#endif
void *l_addr_ptr = NULL;
const char *l_addr = a_addr;
switch (l_server->type) {
switch (a_server->type) {
case DAP_SERVER_TCP:
case DAP_SERVER_UDP:
if (!l_addr)
l_addr = "0.0.0.0";
l_server->listener_addr.sin_family = AF_INET;
l_server->listener_addr.sin_port = htons(l_server->port);
l_addr_ptr = &l_server->listener_addr.sin_addr;
l_es->listener_addr.sin_family = AF_INET;
l_es->listener_addr.sin_port = htons(l_es->listener_port);
l_addr_ptr = &l_es->listener_addr.sin_addr;
break;
case DAP_SERVER_TCP_V6:
if (!l_addr)
l_addr = "::0";
l_server->listener_addr_v6.sin6_family = AF_INET6;
l_server->listener_addr_v6.sin6_port = htons(l_server->port);
l_addr_ptr = &l_server->listener_addr_v6.sin6_addr;
l_es->listener_addr_v6.sin6_family = AF_INET6;
l_es->listener_addr_v6.sin6_port = htons(l_es->listener_port);
l_addr_ptr = &l_es->listener_addr_v6.sin6_addr;
default:
break;
}
if (l_server->type != DAP_SERVER_LOCAL) {
if (a_server->type != DAP_SERVER_LOCAL) {
if (inet_pton(AF_INET, l_addr, &l_addr_ptr) <= 0) {
log_it(L_ERROR, "Can't convert address %s to digital form", l_addr);
goto clean_n_quit;
}
strncpy(l_server->address, l_addr, sizeof(l_server->address)); // If NULL we listen everything
strncpy(l_es->listener_addr_str, l_addr, sizeof(l_es->listener_addr_str)); // If NULL we listen everything
}
#ifdef DAP_OS_UNIX
else {
if (!a_addr) {
log_it(L_ERROR, "Listener path unspecified");
goto clean_n_quit;
}
l_server->listener_path.sun_family = AF_UNIX;
strncpy(l_server->listener_path.sun_path, a_addr, sizeof(l_server->listener_path.sun_path) - 1);
if (access(l_server->listener_path.sun_path, R_OK) == -1) {
log_it(L_ERROR, "Listener path %s is unavailable", l_server->listener_path.sun_path);
l_es->listener_path.sun_family = AF_UNIX;
strncpy(l_es->listener_path.sun_path, a_addr, sizeof(l_es->listener_path.sun_path) - 1);
if (access(l_es->listener_path.sun_path, R_OK) == -1) {
log_it(L_ERROR, "Listener path %s is unavailable", l_es->listener_path.sun_path);
goto clean_n_quit;
}
unlink(l_server->listener_path.sun_path);
unlink(l_es->listener_path.sun_path);
}
#endif
if (s_server_run(l_server,a_callbacks))
a_server->es_listeners = dap_list_prepend(a_server->es_listeners, l_es);
if (s_server_run(a_server))
goto clean_n_quit;
#ifdef DAP_OS_UNIX
if (l_server->type == DAP_SERVER_LOCAL) {
mode_t l_listen_unix_socket_permissions = a_port;
chmod(l_server->listener_path.sun_path, l_listen_unix_socket_permissions);
if (a_server->type == DAP_SERVER_LOCAL) {
chmod(l_es->listener_path.sun_path, l_es->permission);
}
#endif
return l_server;
return 0;
clean_n_quit:
closesocket(l_server->socket_listener);
DAP_DELETE(l_server);
return NULL;
a_server->es_listeners = dap_list_remove(a_server->es_listeners, l_es);
dap_events_socket_delete_unsafe(l_es, false);
return -3;
}
/**
* @brief dap_server_new
* @param a_events
* @param a_addr
* @param a_port
* @param a_type
* @return
*/
dap_server_t *dap_server_new(char **a_addrs, uint16_t a_count, dap_server_type_t a_type, dap_events_socket_callbacks_t *a_callbacks)
{
// sanity check
dap_return_val_if_pass(!a_addrs || !a_count, NULL);
// memory alloc
dap_server_t *l_server = NULL;
DAP_NEW_Z_RET_VAL(l_server, dap_server_t, NULL, NULL);
// preparing
//create callback
dap_events_socket_callbacks_t l_callbacks = {0};
l_callbacks.new_callback = s_es_server_new;
l_callbacks.accept_callback = s_es_server_accept;
l_callbacks.error_callback = s_es_server_error;
if (a_callbacks) {
l_callbacks.read_callback = a_callbacks->read_callback;
l_callbacks.write_callback = a_callbacks->write_callback;
l_callbacks.error_callback = a_callbacks->error_callback;
}
l_server->type = a_type;
char l_curr_ip[INET6_ADDRSTRLEN] = {0};
// func work
for(size_t i = 0; i < a_count; ++i) {
// parsing full addr
int l_ret = -1;
if (l_server->type != DAP_SERVER_LOCAL) {
const char *l_curr_port_str = strstr(a_addrs[i], ":");
uint16_t l_curr_port = 0;
if (l_curr_port_str) {
memset(l_curr_ip, 0, sizeof(l_curr_ip));
strncpy(l_curr_ip, a_addrs[i], dap_min((size_t)(l_curr_port_str - a_addrs[i]), sizeof(l_curr_ip) - 1));
l_curr_port = atol(++l_curr_port_str);
} else {
l_curr_port = atol(a_addrs[i]);
}
switch (l_server->type) {
case DAP_SERVER_TCP:
case DAP_SERVER_UDP:
if (!l_curr_ip[0])
strcpy(l_curr_ip, "0.0.0.0"); // If NULL we listen everything
break;
case DAP_SERVER_TCP_V6:
if (!l_curr_ip[0])
strcpy(l_curr_ip, "::0");
break;
default:
break;
}
l_ret = dap_server_listen_addr_add(l_server, l_curr_ip, l_curr_port, &l_callbacks);
}
#ifdef DAP_OS_UNIX
else {
char l_curr_path[MAX_PATH] = {0};
mode_t l_listen_unix_socket_permissions = 0770;
const char *l_curr_mode_str = strstr(a_addrs[i], ":");
if (!l_curr_mode_str) {
strncpy(l_curr_path, a_addrs[i], sizeof(l_curr_path) - 1);
} else {
l_curr_mode_str++;
strncpy(l_curr_path, a_addrs[i], dap_min((size_t)(l_curr_mode_str - a_addrs[i]), sizeof(l_curr_path) - 1));
sscanf(l_curr_mode_str,"%ou", &l_listen_unix_socket_permissions );
}
l_ret = dap_server_listen_addr_add(l_server, l_curr_path, l_listen_unix_socket_permissions, &l_callbacks);
}
#endif
if (l_ret)
continue;
}
if (!l_server->es_listeners) {
log_it(L_ERROR, "Server not created");
DAP_DELETE(l_server);
return NULL;
}
return l_server;
}
/**
......@@ -278,73 +347,61 @@ clean_n_quit:
* @param a_server
* @param a_callbacks
*/
static int s_server_run(dap_server_t *a_server, dap_events_socket_callbacks_t *a_callbacks)
static int s_server_run(dap_server_t *a_server)
{
assert(a_server);
// sanity check
dap_return_val_if_pass(!a_server || !a_server->es_listeners, -1);
// func work
dap_events_socket_t *l_es = (dap_events_socket_t *)a_server->es_listeners->data;
void *l_listener_addr = NULL;
socklen_t l_listener_addr_len = 0;
switch (a_server->type) {
case DAP_SERVER_TCP:
case DAP_SERVER_UDP:
l_listener_addr = &a_server->listener_addr;
l_listener_addr_len = sizeof(a_server->listener_addr);
l_listener_addr = &l_es->listener_addr;
l_listener_addr_len = sizeof(l_es->listener_addr);
break;
case DAP_SERVER_TCP_V6:
l_listener_addr = &a_server->listener_addr_v6;
l_listener_addr_len = sizeof(a_server->listener_addr_v6);
l_listener_addr = &l_es->listener_addr_v6;
l_listener_addr_len = sizeof(l_es->listener_addr_v6);
break;
#ifdef DAP_OS_UNIX
case DAP_SERVER_LOCAL:
l_listener_addr = &a_server->listener_path;
l_listener_addr_len = sizeof(a_server->listener_path);
l_listener_addr = &l_es->listener_path;
l_listener_addr_len = sizeof(l_es->listener_path);
#endif
default:
log_it(L_ERROR, "Can't run server: unsupported server type %s", dap_server_type_str(a_server->type));
}
if (bind(a_server->socket_listener, (struct sockaddr *)l_listener_addr, l_listener_addr_len) < 0) {
if (bind(l_es->socket, (struct sockaddr *)l_listener_addr, l_listener_addr_len) < 0) {
#ifdef DAP_OS_WINDOWS
log_it(L_ERROR,"Bind error: %d", WSAGetLastError());
closesocket(a_server->socket_listener);
closesocket(l_es->socket);
#else
log_it(L_ERROR,"Bind error: %s",strerror(errno));
close(a_server->socket_listener);
close(l_es->socket);
if ( errno == EACCES ) // EACCES=13
log_it( L_ERROR, "Server can't start. Permission denied");
#endif
DAP_DELETE(a_server);
return -1;
} else {
log_it(L_INFO, "Binded %s:%u", a_server->address, a_server->port);
listen(a_server->socket_listener, SOMAXCONN);
log_it(L_INFO,"Binded %s:%u", l_es->listener_addr_str, l_es->listener_port);
listen(l_es->socket, SOMAXCONN);
}
#ifdef DAP_OS_WINDOWS
u_long l_mode = 1;
ioctlsocket(a_server->socket_listener, (long)FIONBIO, &l_mode);
ioctlsocket(l_es->socket, (long)FIONBIO, &l_mode);
#else
fcntl( a_server->socket_listener, F_SETFL, O_NONBLOCK);
fcntl(l_es->socket, F_SETFL, O_NONBLOCK);
#endif
pthread_mutex_init(&a_server->started_mutex,NULL);
pthread_cond_init(&a_server->started_cond,NULL);
dap_events_socket_callbacks_t l_callbacks;
memset(&l_callbacks,0,sizeof (l_callbacks));
l_callbacks.new_callback = s_es_server_new;
l_callbacks.accept_callback = s_es_server_accept;
l_callbacks.error_callback = s_es_server_error;
if (a_callbacks) {
l_callbacks.read_callback = a_callbacks->read_callback;
l_callbacks.write_callback = a_callbacks->write_callback;
l_callbacks.error_callback = a_callbacks->error_callback;
}
#ifdef DAP_EVENTS_CAPS_EPOLL
for(size_t l_worker_id = 0; l_worker_id < dap_events_thread_get_count() ; l_worker_id++){
dap_worker_t *l_w = dap_events_worker_get(l_worker_id);
assert(l_w);
dap_events_socket_t * l_es = dap_events_socket_wrap_listener(a_server, &l_callbacks);
if (l_es) {
l_es->type = a_server->type == DAP_SERVER_TCP ? DESCRIPTOR_TYPE_SOCKET_LISTENING : DESCRIPTOR_TYPE_SOCKET_UDP;
// Prepare for multi thread listening
......@@ -353,21 +410,19 @@ static int s_server_run(dap_server_t *a_server, dap_events_socket_callbacks_t *a
// if we have poll exclusive
l_es->ev_base_flags |= EPOLLET | EPOLLEXCLUSIVE;
#endif
l_es->_inheritor = a_server;
pthread_mutex_lock(&a_server->started_mutex);
dap_worker_add_events_socket( l_w, l_es );
while (!a_server->started)
pthread_cond_wait(&a_server->started_cond, &a_server->started_mutex);
pthread_mutex_unlock(&a_server->started_mutex);
} else{
log_it(L_WARNING, "Can't wrap event socket for %s:%u server", a_server->address, a_server->port);
log_it(L_WARNING, "Can't wrap event socket for %s:%u server", l_es->listener_addr_str, l_es->listener_port);
return -2;
}
}
#else
dap_worker_t *l_w = dap_events_worker_get_auto();
assert(l_w);
dap_events_socket_t *l_es = dap_events_socket_wrap_listener(a_server, &l_callbacks);
if (l_es) {
pthread_mutex_lock(&a_server->started_mutex);
dap_worker_add_events_socket( l_w, l_es );
......@@ -389,8 +444,8 @@ static int s_server_run(dap_server_t *a_server, dap_events_socket_callbacks_t *a
*/
static void s_es_server_new(dap_events_socket_t *a_es, void * a_arg)
{
log_it(L_DEBUG, "Created server socket %p on worker %u", a_es, a_es->worker->id);
dap_server_t *l_server = (dap_server_t*) a_es->_inheritor;
log_it(L_DEBUG, "Created server socket %p on worker %u", a_es, a_es->worker->id);;
dap_server_t *l_server = a_es->server;
pthread_mutex_lock( &l_server->started_mutex);
l_server->started = true;
pthread_cond_broadcast( &l_server->started_cond);
......@@ -419,11 +474,11 @@ static void s_es_server_error(dap_events_socket_t *a_es, int a_arg)
*/
static void s_es_server_accept(dap_events_socket_t *a_es_listener, SOCKET a_remote_socket, struct sockaddr_storage *a_remote_addr)
{
dap_server_t *l_server = (dap_server_t*)a_es_listener->_inheritor;
dap_server_t *l_server = a_es_listener->server;
assert(l_server);
dap_events_socket_t * l_es_new = NULL;
log_it(L_DEBUG, "[es:%p] Listening socket (binded on %s:%u) got new incoming connection", a_es_listener, l_server->address, l_server->port);
log_it(L_DEBUG, "[es:%p] Listening socket (binded on %s:%u) got new incoming connection", a_es_listener, a_es_listener->listener_addr_str, a_es_listener->listener_port);
if (a_remote_socket < 0) {
#ifdef DAP_OS_WINDOWS
log_it(L_ERROR, "Accept error: %d", WSAGetLastError());
......
......@@ -60,6 +60,7 @@ typedef int SOCKET;
//#define DAP_EVENTS_CAPS_AIO
//#define DAP_EVENTS_CAPS_AIO_THREADS
#include <netinet/in.h>
#include <sys/un.h>
#include <sys/eventfd.h>
#include <mqueue.h>
#include <sys/un.h>
......@@ -69,6 +70,7 @@ typedef int SOCKET;
#define DAP_EVENTS_CAPS_EVENT_KEVENT
#define DAP_EVENTS_CAPS_QUEUE_KEVENT
#include <netinet/in.h>
#include <sys/un.h>
#include <sys/event.h>
#include <sys/un.h>
#elif defined (DAP_OS_UNIX)
......@@ -254,13 +256,24 @@ typedef struct dap_events_socket {
// Remote address, port and others
union {
struct sockaddr_in remote_addr;
struct sockaddr_in listener_addr;
struct sockaddr_in6 remote_addr_v6;
struct sockaddr_in6 listener_addr_v6;
#ifdef DAP_OS_UNIX
struct sockaddr_un remote_path;
struct sockaddr_un listener_path; // Path to UNIX socket
#endif
};
char remote_addr_str[INET6_ADDRSTRLEN];
uint16_t remote_port;
union {
char remote_addr_str[INET6_ADDRSTRLEN];
char listener_addr_str[INET6_ADDRSTRLEN];
};
union {
uint16_t remote_port;
uint16_t listener_port;
mode_t permission;
};
// Links to related objects
dap_context_t *context;
......@@ -348,7 +361,7 @@ void dap_events_socket_delete_unsafe( dap_events_socket_t * a_esocket , bool a_p
void dap_events_socket_delete_mt(dap_worker_t * a_worker, dap_events_socket_uuid_t a_es_uuid);
dap_events_socket_t *dap_events_socket_wrap_no_add(SOCKET a_sock, dap_events_socket_callbacks_t *a_callbacks);
dap_events_socket_t *dap_events_socket_wrap_listener(dap_server_t *a_server, dap_events_socket_callbacks_t *a_callbacks);
dap_events_socket_t *dap_events_socket_wrap_listener(dap_server_t *a_server, SOCKET a_sock, dap_events_socket_callbacks_t *a_callbacks);
void dap_events_socket_assign_on_worker_mt(dap_events_socket_t * a_es, struct dap_worker * a_worker);
void dap_events_socket_assign_on_worker_inter(dap_events_socket_t * a_es_input, dap_events_socket_t * a_es);
......
......@@ -78,19 +78,11 @@ struct dap_server;
typedef void (*dap_server_callback_t)( struct dap_server *,void * arg ); // Callback for specific server's operations
typedef struct dap_server {
dap_server_type_t type; // Server's type
uint16_t port; // Listen port
char address[INET6_ADDRSTRLEN]; // Listen address
SOCKET socket_listener;
union {
struct sockaddr_in listener_addr; // Kernel structure for listener's binded address
struct sockaddr_in6 listener_addr_v6; // Kernel structure for listener's binded address IPv6
#ifdef DAP_OS_UNIX
struct sockaddr_un listener_path; // Path to UNIX socket
#endif
};
void *_inheritor; // Pointer to the internal data, HTTP for example
dap_server_type_t type; // Server's type
dap_list_t *es_listeners;
void *_inheritor;
dap_cpu_stats_t cpu_stats;
......@@ -109,7 +101,7 @@ void dap_server_deinit( void ); // Deinit server module
void dap_server_set_default(dap_server_t* a_server);
dap_server_t* dap_server_get_default();
dap_server_t* dap_server_new(const char * a_addr, uint16_t a_port, dap_server_type_t a_type, dap_events_socket_callbacks_t *a_callbacks);
dap_server_t* dap_server_new_local(const char * a_path, const char * a_mode, dap_events_socket_callbacks_t *a_callbacks);
dap_server_t* dap_server_new(char **a_addrs, uint16_t a_count, dap_server_type_t a_type, dap_events_socket_callbacks_t *a_callbacks);
int dap_server_listen_addr_add(dap_server_t *a_server, const char *a_addr, uint16_t a_port, dap_events_socket_callbacks_t *a_callbacks);
void dap_server_delete(dap_server_t *a_server);
......@@ -57,29 +57,26 @@ static void s_notify_server_callback_delete(dap_events_socket_t * a_es, void * a
*/
int dap_notify_server_init()
{
const char * l_notify_socket_path = dap_config_get_item_str_default(g_config, "notify_server", "listen_path",NULL);
const char * l_notify_socket_path_mode = dap_config_get_item_str_default(g_config, "notify_server", "listen_path_mode","0600");
const char * l_notify_socket_address = dap_config_get_item_str_default(g_config, "notify_server", "listen_address",NULL);
uint16_t l_notify_socket_port = dap_config_get_item_uint16_default(g_config, "notify_server", "listen_port",0);
if(l_notify_socket_path){
s_notify_server = dap_server_new_local(l_notify_socket_path, l_notify_socket_path_mode, NULL);
}else if (l_notify_socket_address && l_notify_socket_port ){
s_notify_server = dap_server_new( l_notify_socket_address,
l_notify_socket_port, DAP_SERVER_TCP, NULL);
}else{
uint16_t l_notify_addrs_count = 0;
char **l_notify_addrs = dap_config_get_array_str(g_config, "notify_server", "listen_path", &l_notify_addrs_count);
if( l_notify_addrs ) {
s_notify_server = dap_server_new(l_notify_addrs, l_notify_addrs_count, DAP_SERVER_LOCAL, NULL);
} else if (l_notify_addrs = dap_config_get_array_str(g_config, "notify_server", "listen_address", &l_notify_addrs_count)) {
s_notify_server = dap_server_new(l_notify_addrs, l_notify_addrs_count, DAP_SERVER_TCP, NULL);
} else {
log_it(L_INFO,"Notify server is not configured, nothing to init but thats okay");
return 0;
}
if (!s_notify_server)
if (!s_notify_server) {
log_it(L_ERROR,"Notify server not initalized, check config");
return -1;
}
s_notify_server->client_callbacks.new_callback = s_notify_server_callback_new;
s_notify_server->client_callbacks.delete_callback = s_notify_server_callback_delete;
s_notify_server_queue = dap_events_socket_create_type_queue_ptr_mt(dap_events_worker_get_auto(),s_notify_server_callback_queue);
uint32_t l_workers_count = dap_events_thread_get_count();
s_notify_server_queue_inter = DAP_NEW_Z_SIZE(dap_events_socket_t*,sizeof (dap_events_socket_t*)*l_workers_count );
DAP_NEW_Z_COUNT_RET_VAL(s_notify_server_queue_inter, dap_events_socket_t *, l_workers_count, -2, NULL);
for(uint32_t i = 0; i < l_workers_count; i++){
s_notify_server_queue_inter[i] = dap_events_socket_queue_ptr_create_input(s_notify_server_queue);
dap_events_socket_assign_on_worker_mt(s_notify_server_queue_inter[i], dap_events_worker_get(i));
......