diff --git a/3rdparty/uthash/src/utarray.h b/3rdparty/uthash/src/utarray.h index 223f950f1a0739c9c2ac47e5adf79fa2b25dad11..dc1cf8e13cea045315f7b7cecd9ff73ec88c3742 100644 --- a/3rdparty/uthash/src/utarray.h +++ b/3rdparty/uthash/src/utarray.h @@ -1,5 +1,5 @@ /* -Copyright (c) 2008-2018, Troy D. Hanson http://troydhanson.github.com/uthash/ +Copyright (c) 2008-2022, Troy D. Hanson https://troydhanson.github.io/uthash/ All rights reserved. Redistribution and use in source and binary forms, with or without @@ -26,7 +26,7 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #ifndef UTARRAY_H #define UTARRAY_H -#define UTARRAY_VERSION 2.1.0 +#define UTARRAY_VERSION 2.3.0 #include <stddef.h> /* size_t */ #include <string.h> /* memset, etc */ diff --git a/3rdparty/uthash/src/uthash.h b/3rdparty/uthash/src/uthash.h index 39eeca73abac61db33ecad100c653845a483ec3b..49c69df03010c23b2bc53833afe69dc802e38531 100644 --- a/3rdparty/uthash/src/uthash.h +++ b/3rdparty/uthash/src/uthash.h @@ -1,5 +1,5 @@ /* -Copyright (c) 2003-2018, Troy D. Hanson http://troydhanson.github.com/uthash/ +Copyright (c) 2003-2022, Troy D. Hanson https://troydhanson.github.io/uthash/ All rights reserved. Redistribution and use in source and binary forms, with or without @@ -24,12 +24,22 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #ifndef UTHASH_H #define UTHASH_H -#define UTHASH_VERSION 2.1.0 +#define UTHASH_VERSION 2.3.0 #include <string.h> /* memcmp, memset, strlen */ #include <stddef.h> /* ptrdiff_t */ #include <stdlib.h> /* exit */ +#if defined(HASH_DEFINE_OWN_STDINT) && HASH_DEFINE_OWN_STDINT +/* This codepath is provided for backward compatibility, but I plan to remove it. */ +#warning "HASH_DEFINE_OWN_STDINT is deprecated; please use HASH_NO_STDINT instead" +typedef unsigned int uint32_t; +typedef unsigned char uint8_t; +#elif defined(HASH_NO_STDINT) && HASH_NO_STDINT +#else +#include <stdint.h> /* uint8_t, uint32_t */ +#endif + /* These macros use decltype or the earlier __typeof GNU extension. As decltype is only available in newer compilers (VS2010 or gcc 4.3+ when compiling c++ source) this code uses whatever method is needed @@ -62,23 +72,6 @@ do { } while (0) #endif -/* a number of the hash function use uint32_t which isn't defined on Pre VS2010 */ -#if defined(_WIN32) -#if defined(_MSC_VER) && _MSC_VER >= 1600 -#include <stdint.h> -#elif defined(__WATCOMC__) || defined(__MINGW32__) || defined(__CYGWIN__) -#include <stdint.h> -#else -typedef unsigned int uint32_t; -typedef unsigned char uint8_t; -#endif -#elif defined(__GNUC__) && !defined(__VXWORKS__) -#include <stdint.h> -#else -typedef unsigned int uint32_t; -typedef unsigned char uint8_t; -#endif - #ifndef uthash_malloc #define uthash_malloc(sz) malloc(sz) /* malloc fcn */ #endif @@ -92,15 +85,12 @@ typedef unsigned char uint8_t; #define uthash_strlen(s) strlen(s) #endif -#ifdef uthash_memcmp -/* This warning will not catch programs that define uthash_memcmp AFTER including uthash.h. 
*/ -#warning "uthash_memcmp is deprecated; please use HASH_KEYCMP instead" -#else -#define uthash_memcmp(a,b,n) memcmp(a,b,n) +#ifndef HASH_FUNCTION +#define HASH_FUNCTION(keyptr,keylen,hashv) HASH_JEN(keyptr, keylen, hashv) #endif #ifndef HASH_KEYCMP -#define HASH_KEYCMP(a,b,n) uthash_memcmp(a,b,n) +#define HASH_KEYCMP(a,b,n) memcmp(a,b,n) #endif #ifndef uthash_noexpand_fyi @@ -158,7 +148,7 @@ do { #define HASH_VALUE(keyptr,keylen,hashv) \ do { \ - HASH_FCN(keyptr, keylen, hashv); \ + HASH_FUNCTION(keyptr, keylen, hashv); \ } while (0) #define HASH_FIND_BYHASHVALUE(hh,head,keyptr,keylen,hashval,out) \ @@ -590,13 +580,6 @@ do { #define HASH_EMIT_KEY(hh,head,keyptr,fieldlen) #endif -/* default to Jenkin's hash unless overridden e.g. DHASH_FUNCTION=HASH_SAX */ -#ifdef HASH_FUNCTION -#define HASH_FCN HASH_FUNCTION -#else -#define HASH_FCN HASH_JEN -#endif - /* The Bernstein hash function, used in Perl prior to v5.6. Note (x<<5+x)=x*33. */ #define HASH_BER(key,keylen,hashv) \ do { \ @@ -610,7 +593,9 @@ do { /* SAX/FNV/OAT/JEN hash functions are macro variants of those listed at - * http://eternallyconfuzzled.com/tuts/algorithms/jsw_tut_hashing.aspx */ + * http://eternallyconfuzzled.com/tuts/algorithms/jsw_tut_hashing.aspx + * (archive link: https://archive.is/Ivcan ) + */ #define HASH_SAX(key,keylen,hashv) \ do { \ unsigned _sx_i; \ @@ -695,7 +680,8 @@ do { case 4: _hj_i += ( (unsigned)_hj_key[3] << 24 ); /* FALLTHROUGH */ \ case 3: _hj_i += ( (unsigned)_hj_key[2] << 16 ); /* FALLTHROUGH */ \ case 2: _hj_i += ( (unsigned)_hj_key[1] << 8 ); /* FALLTHROUGH */ \ - case 1: _hj_i += _hj_key[0]; \ + case 1: _hj_i += _hj_key[0]; /* FALLTHROUGH */ \ + default: ; \ } \ HASH_JEN_MIX(_hj_i, _hj_j, hashv); \ } while (0) @@ -743,6 +729,8 @@ do { case 1: hashv += *_sfh_key; \ hashv ^= hashv << 10; \ hashv += hashv >> 1; \ + break; \ + default: ; \ } \ \ /* Force "avalanching" of final 127 bits */ \ @@ -764,7 +752,7 @@ do { } \ while ((out) != NULL) { \ if ((out)->hh.hashv == (hashval) && (out)->hh.keylen == (keylen_in)) { \ - if (HASH_KEYCMP((out)->hh.key, keyptr, keylen_in) == 0) { \ + if (HASH_KEYCMP((out)->hh.key, keyptr, keylen_in) == 0) { \ break; \ } \ } \ @@ -850,12 +838,12 @@ do { struct UT_hash_handle *_he_thh, *_he_hh_nxt; \ UT_hash_bucket *_he_new_buckets, *_he_newbkt; \ _he_new_buckets = (UT_hash_bucket*)uthash_malloc( \ - 2UL * (tbl)->num_buckets * sizeof(struct UT_hash_bucket)); \ + sizeof(struct UT_hash_bucket) * (tbl)->num_buckets * 2U); \ if (!_he_new_buckets) { \ HASH_RECORD_OOM(oomed); \ } else { \ uthash_bzero(_he_new_buckets, \ - 2UL * (tbl)->num_buckets * sizeof(struct UT_hash_bucket)); \ + sizeof(struct UT_hash_bucket) * (tbl)->num_buckets * 2U); \ (tbl)->ideal_chain_maxlen = \ ((tbl)->num_items >> ((tbl)->log2_num_buckets+1U)) + \ ((((tbl)->num_items & (((tbl)->num_buckets*2U)-1U)) != 0U) ? 1U : 0U); \ diff --git a/3rdparty/uthash/src/utlist.h b/3rdparty/uthash/src/utlist.h index 5bb1ac9b72e556ea35d50a2fa2377bd50640e7f6..492908cf106ee6da8f10e76387822e8768fa80f2 100644 --- a/3rdparty/uthash/src/utlist.h +++ b/3rdparty/uthash/src/utlist.h @@ -1,5 +1,5 @@ /* -Copyright (c) 2007-2018, Troy D. Hanson http://troydhanson.github.com/uthash/ +Copyright (c) 2007-2022, Troy D. Hanson https://troydhanson.github.io/uthash/ All rights reserved. Redistribution and use in source and binary forms, with or without @@ -24,7 +24,7 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
#ifndef UTLIST_H #define UTLIST_H -#define UTLIST_VERSION 2.1.0 +#define UTLIST_VERSION 2.3.0 #include <assert.h> diff --git a/3rdparty/uthash/src/utringbuffer.h b/3rdparty/uthash/src/utringbuffer.h index ce2890e60cd4a94b17424dab522b39bc0995cc7c..60341179853ff371407250c8e502fd76e8e23976 100644 --- a/3rdparty/uthash/src/utringbuffer.h +++ b/3rdparty/uthash/src/utringbuffer.h @@ -1,5 +1,5 @@ /* -Copyright (c) 2015-2018, Troy D. Hanson http://troydhanson.github.com/uthash/ +Copyright (c) 2015-2022, Troy D. Hanson https://troydhanson.github.io/uthash/ All rights reserved. Redistribution and use in source and binary forms, with or without @@ -26,7 +26,7 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #ifndef UTRINGBUFFER_H #define UTRINGBUFFER_H -#define UTRINGBUFFER_VERSION 2.1.0 +#define UTRINGBUFFER_VERSION 2.3.0 #include <stdlib.h> #include <string.h> diff --git a/3rdparty/uthash/src/utstack.h b/3rdparty/uthash/src/utstack.h index 3b0c1a0dff1860a0383ecfe5946de3a378f28ffe..94b8c513336f157b8ef937409247120ca016a713 100644 --- a/3rdparty/uthash/src/utstack.h +++ b/3rdparty/uthash/src/utstack.h @@ -1,5 +1,5 @@ /* -Copyright (c) 2018-2018, Troy D. Hanson http://troydhanson.github.com/uthash/ +Copyright (c) 2018-2022, Troy D. Hanson https://troydhanson.github.io/uthash/ All rights reserved. Redistribution and use in source and binary forms, with or without @@ -24,7 +24,7 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #ifndef UTSTACK_H #define UTSTACK_H -#define UTSTACK_VERSION 2.1.0 +#define UTSTACK_VERSION 2.3.0 /* * This file contains macros to manipulate a singly-linked list as a stack. @@ -35,9 +35,9 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * struct item { * int id; * struct item *next; - * } + * }; * - * struct item *stack = NULL: + * struct item *stack = NULL; * * int main() { * int count; diff --git a/3rdparty/uthash/src/utstring.h b/3rdparty/uthash/src/utstring.h index 4cf5ffd3dd42128ee8f46c56153ad572930f6ba2..f0270fb632324b4ce962e5c51c8db50ee9edcc30 100644 --- a/3rdparty/uthash/src/utstring.h +++ b/3rdparty/uthash/src/utstring.h @@ -1,5 +1,5 @@ /* -Copyright (c) 2008-2018, Troy D. Hanson http://troydhanson.github.com/uthash/ +Copyright (c) 2008-2022, Troy D. Hanson https://troydhanson.github.io/uthash/ All rights reserved. Redistribution and use in source and binary forms, with or without @@ -26,7 +26,7 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
#ifndef UTSTRING_H #define UTSTRING_H -#define UTSTRING_VERSION 2.1.0 +#define UTSTRING_VERSION 2.3.0 #include <stdlib.h> #include <string.h> diff --git a/dap-sdk/core/src/dap_string.c b/dap-sdk/core/src/dap_string.c index eedf616d07972a2ff30fbfa43aa13075a503f64b..e71f82ae4f13777baeb3d7e4f59fec080acdcd36 100755 --- a/dap-sdk/core/src/dap_string.c +++ b/dap-sdk/core/src/dap_string.c @@ -69,7 +69,8 @@ dap_string_t * dap_string_sized_new(size_t a_dfl_size) l_string->str = NULL; dap_string_maybe_expand(l_string, max(a_dfl_size, 2)); - l_string->str[0] = 0; + if(l_string->str) + l_string->str[0] = 0; return l_string; } diff --git a/dap-sdk/io/dap_context.c b/dap-sdk/io/dap_context.c index 132b36ee290699fccd396beaf314845cb0dba4e8..bbcd6faedc8420d67453a2c640d4e63ccfedf1b6 100644 --- a/dap-sdk/io/dap_context.c +++ b/dap-sdk/io/dap_context.c @@ -101,6 +101,7 @@ dap_list_t * s_contexts; */ int dap_context_init() { + pthread_key_create(&g_dap_context_pth_key,NULL); #ifdef DAP_OS_UNIX struct rlimit l_fdlimit; if (getrlimit(RLIMIT_NOFILE, &l_fdlimit)) @@ -115,6 +116,11 @@ int dap_context_init() return 0; } +void dap_context_deinit() +{ + pthread_key_delete(g_dap_context_pth_key); +} + /** * @brief dap_context_new * @return @@ -1368,7 +1374,7 @@ int dap_context_remove( dap_events_socket_t * a_es) #if defined(DAP_EVENTS_CAPS_EPOLL) //Check if its present on current selection - for (ssize_t n = l_context->esocket_current; n< l_context->esockets_selected; n++ ){ + for (ssize_t n = l_context->esocket_current + 1; n< l_context->esockets_selected; n++ ){ struct epoll_event * l_event = &l_context->epoll_events[n]; if ( l_event->data.ptr == a_es ) // Found in selection l_event->data.ptr = NULL; // signal to skip on its iteration @@ -1388,7 +1394,7 @@ int dap_context_remove( dap_events_socket_t * a_es) if (a_es->socket != -1 && a_es->type != DESCRIPTOR_TYPE_TIMER){ // Check if its present on current selection - for (ssize_t n = l_context->esocket_current; n< l_context->esockets_selected; n++ ){ + for (ssize_t n = l_context->esocket_current+1; n< l_context->esockets_selected; n++ ){ struct kevent * l_kevent_selected = &l_context->kqueue_events_selected[n]; dap_events_socket_t * l_cur = NULL; diff --git a/dap-sdk/io/dap_events.c b/dap-sdk/io/dap_events.c index 368f81dec3945f68f1e3e09c86e8bfe5e1560bc5..ab17ac81330504e31a732b2c07078f70bd7faf6b 100644 --- a/dap-sdk/io/dap_events.c +++ b/dap-sdk/io/dap_events.c @@ -216,7 +216,7 @@ int dap_events_init( uint32_t a_threads_count, size_t a_conn_timeout ) a_threads_count = l_cpu_count; s_threads_count = a_threads_count ? 
a_threads_count : l_cpu_count; - + assert(s_threads_count); s_workers = DAP_NEW_Z_SIZE(dap_worker_t*,s_threads_count*sizeof (dap_worker_t*) ); if ( !s_workers ) return -1; diff --git a/dap-sdk/io/dap_events_socket.c b/dap-sdk/io/dap_events_socket.c index 5ba1761611d225c784b9c593f1348cbb6d5df475..b5eef34f91f7bda8834b1e9566027afd8f887056 100644 --- a/dap-sdk/io/dap_events_socket.c +++ b/dap-sdk/io/dap_events_socket.c @@ -505,9 +505,9 @@ dap_events_socket_t * dap_events_socket_queue_ptr_create_input(dap_events_socket #endif l_es->flags = DAP_SOCK_QUEUE_PTR; - l_es->_pvt = DAP_NEW_Z(struct queue_ptr_input_pvt); - l_es->callbacks.delete_callback = s_socket_type_queue_ptr_input_callback_delete; - l_es->callbacks.queue_ptr_callback = a_es->callbacks.queue_ptr_callback; + //l_es->_pvt = DAP_NEW_Z(struct queue_ptr_input_pvt); + //l_es->callbacks.delete_callback = s_socket_type_queue_ptr_input_callback_delete; + //l_es->callbacks.queue_ptr_callback = a_es->callbacks.queue_ptr_callback; return l_es; } diff --git a/dap-sdk/io/dap_worker.c b/dap-sdk/io/dap_worker.c index 9f389b5a931a7a3016554deefa77d763dcb003d8..c5eba48f2f6480d13739707b3da216835cb23173 100644 --- a/dap-sdk/io/dap_worker.c +++ b/dap-sdk/io/dap_worker.c @@ -104,7 +104,7 @@ void dap_worker_context_callback_started( dap_context_t * a_context, void *a_arg l_worker->queue_callback = dap_context_create_queue(a_context, s_queue_callback_callback); - l_worker->timer_check_activity = dap_timerfd_create(s_connection_timeout * 1000 / 2, + l_worker->timer_check_activity = dap_timerfd_create (s_connection_timeout * 1000 / 2, s_socket_all_check_activity, l_worker); dap_worker_add_events_socket_unsafe( l_worker->timer_check_activity->events_socket, l_worker); diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c index 2290c9224ba89f7425cb5271c2a6dbaa930053d4..2cc11c374888ed2632f7e09fc68138d453a43bd0 100644 --- a/modules/channel/chain/dap_stream_ch_chain.c +++ b/modules/channel/chain/dap_stream_ch_chain.c @@ -754,7 +754,7 @@ static void s_gdb_in_pkt_proc_callback_get_ts_callback(dap_global_db_context_t * return; } - dap_store_obj_free_one(l_sync_request->obj); + dap_store_obj_free_one(l_obj); DAP_DELETE(l_sync_request); } /** diff --git a/modules/common/include/dap_chain_common.h b/modules/common/include/dap_chain_common.h index 50061ba043fc7ffeb21d44fab0c6751e3e845cec..059f11648713d6899721d54e336bb50959886d9f 100644 --- a/modules/common/include/dap_chain_common.h +++ b/modules/common/include/dap_chain_common.h @@ -89,6 +89,10 @@ typedef union dap_chain_node_addr { inline static int dap_chain_node_addr_from_str( dap_chain_node_addr_t * a_addr, const char * a_addr_str){ return (int) sscanf(a_addr_str,NODE_ADDR_FP_STR,NODE_ADDR_FPS_ARGS(a_addr) )-4; } + +inline static bool dap_chain_node_addr_not_null(dap_chain_node_addr_t * a_addr){ + return a_addr->uint64 != 0; +} /** * * diff --git a/modules/global-db/dap_global_db.c b/modules/global-db/dap_global_db.c index 75be266eb1934b1f5f8ee698fffe5ba34d30199f..65cf3db7248092e88a29279863a1d4e48cef62bf 100644 --- a/modules/global-db/dap_global_db.c +++ b/modules/global-db/dap_global_db.c @@ -78,8 +78,8 @@ struct queue_io_msg{ union{ struct{ // Raw request dap_store_obj_t * values_raw; - size_t values_raw_count; - size_t values_raw_shift; + uint64_t values_raw_total; + uint64_t values_raw_shift; }; struct{ //deserialized requests // Different variant of message params @@ -113,6 +113,8 @@ struct queue_io_msg{ }; + + static uint32_t s_global_db_version = 0; // Current 
GlobalDB version static pthread_cond_t s_check_db_cond = PTHREAD_COND_INITIALIZER; // Check version condition static pthread_mutex_t s_check_db_mutex = PTHREAD_MUTEX_INITIALIZER; // Check version condition mutex @@ -137,6 +139,10 @@ static void s_check_db_version_callback_set (dap_global_db_context_t * a_global_ static void s_context_callback_started( dap_context_t * a_context, void *a_arg); static void s_context_callback_stopped( dap_context_t * a_context, void *a_arg); + +// Opcode to string +static const char *s_msg_opcode_to_str(enum queue_io_msg_opcode a_opcode); + // Queue i/o processing callback static void s_queue_io_callback( dap_events_socket_t * a_es, void * a_arg); @@ -150,7 +156,7 @@ static bool s_msg_opcode_get_all_raw(struct queue_io_msg * a_msg); static bool s_msg_opcode_set(struct queue_io_msg * a_msg); static bool s_msg_opcode_set_raw(struct queue_io_msg * a_msg); -static bool s_msg_opcode_set_multiple(struct queue_io_msg * a_msg); +static bool s_msg_opcode_set_multiple_zc(struct queue_io_msg * a_msg); static bool s_msg_opcode_pin(struct queue_io_msg * a_msg); static bool s_msg_opcode_unpin(struct queue_io_msg * a_msg); static bool s_msg_opcode_delete(struct queue_io_msg * a_msg); @@ -188,7 +194,7 @@ int dap_global_db_init(const char * a_storage_path, const char * a_driver_name) log_it(L_CRITICAL, "Can't initialize GlobalDB without storage path"); } - if ( a_driver_name == NULL && a_driver_name == NULL ){ + if ( a_driver_name == NULL && s_driver_name == NULL ){ log_it(L_CRITICAL, "Can't initialize GlobalDB without driver name"); } @@ -274,7 +280,8 @@ int dap_global_db_get(const char * a_group, const char *a_key, dap_global_db_cal if (l_ret != 0){ log_it(L_ERROR, "Can't exec get request, code %d", l_ret); s_queue_io_msg_delete(l_msg); - } + }else + debug_if(g_dap_global_db_debug_more, L_DEBUG, "Have sent get request for %s:%s", a_group, a_key); return l_ret; } @@ -291,7 +298,7 @@ static bool s_msg_opcode_get(struct queue_io_msg * a_msg) &l_count_records); if(l_store_obj != NULL && l_count_records>=1){ if(a_msg->callback_result) - a_msg->callback_result(s_context_global_db, DAP_GLOBAL_DB_RC_SUCCESS, l_store_obj->group, l_store_obj->key, + a_msg->callback_result(s_context_global_db, DAP_GLOBAL_DB_RC_SUCCESS, l_store_obj->group, l_store_obj->key, l_store_obj->value, l_store_obj->value_len, l_store_obj->timestamp, l_store_obj->flags & RECORD_PINNED, a_msg->callback_arg ); dap_store_obj_free(l_store_obj,l_count_records); @@ -326,7 +333,8 @@ int dap_global_db_get_raw(const char * a_group, const char *a_key,dap_global_db_ if (l_ret != 0){ log_it(L_ERROR, "Can't exec get request, code %d", l_ret); s_queue_io_msg_delete(l_msg); - } + }else + debug_if(g_dap_global_db_debug_more, L_DEBUG, "Have sent get_raw request for %s:%s", a_group, a_key); return l_ret; } @@ -379,7 +387,8 @@ int dap_global_db_get_del_ts(const char * a_group, const char *a_key,dap_global_ if (l_ret != 0){ log_it(L_ERROR, "Can't exec get_del_ts request, code %d", l_ret); s_queue_io_msg_delete(l_msg); - } + }else + debug_if(g_dap_global_db_debug_more, L_DEBUG, "Have sent get_del_ts request for \"%s\" group \"%s\" key" , a_group, a_key); return l_ret; } @@ -449,7 +458,8 @@ int dap_global_db_get_last(const char * a_group, dap_global_db_callback_result_t if (l_ret != 0){ log_it(L_ERROR, "Can't exec get_last request, code %d", l_ret); s_queue_io_msg_delete(l_msg); - } + }else + debug_if(g_dap_global_db_debug_more, L_DEBUG, "Have sent get_last request for \"%s\" group", a_group); return l_ret; } @@ -499,7 +509,8 @@ int 
dap_global_db_get_all(const char * a_group,size_t a_results_page_size, dap_g if (l_ret != 0){ log_it(L_ERROR, "Can't exec get_all request, code %d", l_ret); s_queue_io_msg_delete(l_msg); - } + }else + debug_if(g_dap_global_db_debug_more, L_DEBUG, "Have sent get_all request for \"%s\" group", a_group); return l_ret; } @@ -516,15 +527,17 @@ static bool s_msg_opcode_get_all(struct queue_io_msg * a_msg) } dap_store_obj_t *l_store_objs = dap_chain_global_db_driver_cond_read(a_msg->group, a_msg->values_shift , &l_values_count); dap_global_db_obj_t *l_objs = NULL; - + debug_if(g_dap_global_db_debug_more, L_DEBUG,"Get all request from group %s recieved %zu values from total %zu",a_msg->group, + l_values_count, a_msg->value_length ); // Form objs from store_objs if(l_store_objs){ l_objs = DAP_NEW_Z_SIZE(dap_global_db_obj_t,sizeof(dap_global_db_obj_t)*l_values_count); for(int i = 0; i < l_values_count; i++){ l_objs[i].id = l_store_objs[i].id; l_objs[i].is_pinned = l_store_objs[i].flags & RECORD_PINNED; - l_objs[i].key = l_store_objs[i].key; - l_objs[i].value = l_store_objs[i].value; + l_objs[i].key = dap_strdup(l_store_objs[i].key); + l_objs[i].value = DAP_DUP_SIZE(l_store_objs[i].value, l_store_objs[i].value_len); + l_objs[i].value_len = l_store_objs[i].value_len; l_objs[i].timestamp = l_store_objs[i].timestamp; } } @@ -533,32 +546,36 @@ static bool s_msg_opcode_get_all(struct queue_io_msg * a_msg) // Call callback if present if(a_msg->callback_results) l_delete_objs = a_msg->callback_results(s_context_global_db, l_objs? DAP_GLOBAL_DB_RC_SUCCESS:DAP_GLOBAL_DB_RC_NO_RESULTS - , a_msg->group, a_msg->key, a_msg->values_total, l_values_count, - a_msg->values_shift, + , a_msg->group, a_msg->key, + a_msg->values_total,a_msg->values_shift, l_values_count, l_objs, a_msg->callback_arg ); // Clean memory if(l_store_objs) dap_store_obj_free(l_store_objs,l_values_count); if(l_objs && l_delete_objs) - DAP_DELETE(l_objs); + dap_global_db_objs_delete(l_objs,l_values_count); - // Check for values_shift overflow and update it - if(l_values_count && a_msg->values_shift< UINT64_MAX - l_values_count && - l_values_count + a_msg->values_shift < a_msg->values_total ){ - a_msg->values_shift += l_values_count; + // Here we also check if the reply was with zero values. 
To prevent endless loop we don't resend query request in such cases + if(a_msg->values_total && l_values_count){ + // Check for values_shift overflow and update it + if( a_msg->values_shift < (UINT64_MAX - l_values_count) && + l_values_count + a_msg->values_shift <= a_msg->values_total ){ + a_msg->values_shift += l_values_count; - } - - if( a_msg->values_shift < a_msg->values_total){ // Have to process callback again - int l_ret = dap_events_socket_queue_ptr_send(s_context_global_db->queue_io,a_msg); - if ( l_ret ){ - log_it(L_ERROR,"Can't resend i/o message for opcode GET_ALL after value shift %" - DAP_UINT64_FORMAT_U" error code %d", a_msg->values_shift,l_ret); - return true; }else - return false; // Don't delete it because it just sent again to the queue - }else // All values are sent - return true; + log_it(L_WARNING, "Values overflow, can't grow up, values_shift:%"DAP_UINT64_FORMAT_U" values_total:%"DAP_UINT64_FORMAT_U" values_current:%zu", a_msg->values_shift, a_msg->values_total, l_values_count ); + + if(a_msg->values_shift < a_msg->values_total ){ // Have to process callback again + int l_ret = dap_events_socket_queue_ptr_send(s_context_global_db->queue_io,a_msg); + debug_if(g_dap_global_db_debug_more, L_NOTICE, "Resending get all request values_shift:%"DAP_UINT64_FORMAT_U" values_total:%"DAP_UINT64_FORMAT_U" values_current:%zu", a_msg->values_shift, a_msg->values_total, l_values_count ); + if ( l_ret ){ + log_it(L_ERROR,"Can't resend i/o message for opcode GET_ALL after value shift %" + DAP_UINT64_FORMAT_U" (total values %" DAP_UINT64_FORMAT_U") error code %d", a_msg->values_shift, a_msg->values_total, l_ret); + }else + return false; // Don't delete it because it just sent again to the queue + } + } + return true; // All values are sent } /** @@ -589,7 +606,8 @@ int dap_global_db_get_all_raw(const char * a_group, uint64_t a_first_id,size_t a if (l_ret != 0){ log_it(L_ERROR, "Can't exec get_all_raw request, code %d", l_ret); s_queue_io_msg_delete(l_msg); - } + }else + debug_if(g_dap_global_db_debug_more, L_DEBUG, "Have sent get_all request for \"%s\" group", a_group); return l_ret; } @@ -602,38 +620,46 @@ static bool s_msg_opcode_get_all_raw(struct queue_io_msg * a_msg) { size_t l_values_count = 0; if(! a_msg->values_total){ // First msg process - a_msg->values_raw_count = dap_chain_global_db_driver_count(a_msg->group,0); + a_msg->values_raw_total = dap_chain_global_db_driver_count(a_msg->group,0); } dap_store_obj_t *l_store_objs = dap_chain_global_db_driver_cond_read(a_msg->group, a_msg->values_raw_shift , &l_values_count); - + debug_if(g_dap_global_db_debug_more, L_DEBUG,"Get all raw request from group %s recieved %zu values from total %zu",a_msg->group, + l_values_count, a_msg->value_length ); // Call callback if present if(a_msg->callback_results_raw) a_msg->callback_results_raw(s_context_global_db, l_store_objs? DAP_GLOBAL_DB_RC_SUCCESS:DAP_GLOBAL_DB_RC_NO_RESULTS - , a_msg->group, a_msg->key, a_msg->values_raw_count, l_values_count, - a_msg->values_raw_count, + , a_msg->group, a_msg->key,l_values_count, a_msg->values_raw_shift, + a_msg->values_raw_total, l_store_objs, a_msg->callback_arg ); // Clean memory if(l_store_objs) dap_store_obj_free(l_store_objs,l_values_count); - // Check for values_shift overflow and update it - if(l_values_count && a_msg->values_raw_count< UINT64_MAX - l_values_count && - l_values_count + a_msg->values_raw_count < a_msg->values_raw_count ){ - a_msg->values_raw_count += l_values_count; + // Here we also check if the reply was with zero values. 
To prevent endless loop we don't resend query request in such cases - } + if( a_msg->values_raw_total && l_values_count){ + // Check for values_shift overflow and update it + if( a_msg->values_raw_total && l_values_count && a_msg->values_raw_shift< UINT64_MAX - l_values_count && + l_values_count + a_msg->values_raw_shift <= a_msg->values_raw_total ){ + a_msg->values_raw_shift += l_values_count; - if( a_msg->values_shift < a_msg->values_raw_count){ // Have to process callback again - int l_ret = dap_events_socket_queue_ptr_send(s_context_global_db->queue_io,a_msg); - if ( l_ret ){ - log_it(L_ERROR,"Can't resend i/o message for opcode GET_ALL_RAW after value shift %" - DAP_UINT64_FORMAT_U" error code %d", a_msg->values_shift,l_ret); - return true; - }else - return false; // Don't delete it because it just sent again to the queue - }else // All values are sent - return true; + } else + log_it(L_WARNING, "Values overflow, can't grow up, values_raw_shift:%"DAP_UINT64_FORMAT_U" values_raw_total:%"DAP_UINT64_FORMAT_U" values_raw_current:%zu", + a_msg->values_raw_shift, a_msg->values_raw_total, l_values_count ); + + if( a_msg->values_raw_total && l_values_count && a_msg->values_raw_shift < a_msg->values_raw_total){ // Have to process callback again + int l_ret = dap_events_socket_queue_ptr_send(s_context_global_db->queue_io,a_msg); + debug_if(g_dap_global_db_debug_more, L_NOTICE, "Resending get all request values_raw_shift:%"DAP_UINT64_FORMAT_U" values_raw_total:%"DAP_UINT64_FORMAT_U" values_raw_current:%zu", a_msg->values_raw_shift, a_msg->values_raw_total, l_values_count ); + + if ( l_ret ){ + log_it(L_ERROR,"Can't resend i/o message for opcode GET_ALL_RAW after value shift %" + DAP_UINT64_FORMAT_U" error code %d", a_msg->values_shift,l_ret); + }else + return false; // Don't delete it because it just sent again to the queue + } + } + return true; // All values are sent } @@ -659,6 +685,9 @@ int dap_global_db_set(const char * a_group, const char *a_key, const void * a_va l_msg->opcode = MSG_OPCODE_SET; l_msg->group = dap_strdup(a_group); l_msg->key = dap_strdup(a_key); + l_msg->value = DAP_DUP_SIZE(a_value, a_value_length); + l_msg->value_length = a_value_length; + l_msg->value_is_pinned = a_pin_value; l_msg->callback_arg = a_arg; l_msg->callback_result = a_callback; @@ -666,7 +695,9 @@ int dap_global_db_set(const char * a_group, const char *a_key, const void * a_va if (l_ret != 0){ log_it(L_ERROR, "Can't exec set request, code %d", l_ret); s_queue_io_msg_delete(l_msg); - } + }else + debug_if(g_dap_global_db_debug_more, L_DEBUG, "Have sent sent request for \"%s\" group \"%s\" key" , a_group, a_key); + return l_ret; } @@ -681,9 +712,8 @@ static bool s_msg_opcode_set(struct queue_io_msg * a_msg) dap_nanotime_t l_ts_now = dap_nanotime_now(); l_store_data.key = a_msg->key ; l_store_data.flags =a_msg->value_is_pinned ? RECORD_PINNED : 0 ; - l_store_data.value_len = ( a_msg->value_length == (size_t) -1) ? - dap_strlen( a_msg->value) : a_msg->value_length; - l_store_data.value = a_msg->value? 
a_msg->value : NULL; + l_store_data.value_len = a_msg->value_length; + l_store_data.value = a_msg->value; l_store_data.group = (char*) a_msg->group; l_store_data.timestamp = l_ts_now; @@ -705,18 +735,29 @@ static bool s_msg_opcode_set(struct queue_io_msg * a_msg) a_msg->value, a_msg->value_length, l_ts_now, a_msg->value_is_pinned , a_msg->callback_arg ); } + if(a_msg->value) + DAP_DELETE(a_msg->value); return true; } +/** + * @brief dap_global_db_set_unsafe + * @param a_global_db_context + * @param a_group + * @param a_key + * @param a_value + * @param a_value_length + * @param a_pin_value + * @return + */ int dap_global_db_set_unsafe(dap_global_db_context_t * a_global_db_context, const char * a_group, const char *a_key, const void * a_value, const size_t a_value_length, bool a_pin_value ) { dap_store_obj_t l_store_data = { 0 }; dap_nanotime_t l_ts_now = dap_nanotime_now(); l_store_data.key = a_key ; l_store_data.flags = a_pin_value ? RECORD_PINNED : 0 ; - l_store_data.value_len = ( a_value_length == (size_t) -1) ? - dap_strlen( a_value) : a_value_length; - l_store_data.value = a_value? a_value : NULL; + l_store_data.value_len = a_value_length; + l_store_data.value = a_value; l_store_data.group = (char*) a_group; l_store_data.timestamp = l_ts_now; @@ -748,14 +789,15 @@ int dap_global_db_set_raw(dap_store_obj_t * a_store_objs, size_t a_store_objs_co l_msg->callback_arg = a_arg; l_msg->callback_results_raw = a_callback; - l_msg->values_raw = a_store_objs; - l_msg->values_raw_count = a_store_objs_count; + l_msg->values_raw = dap_store_obj_copy(a_store_objs,a_store_objs_count) ; + l_msg->values_raw_total = a_store_objs_count; int l_ret = dap_events_socket_queue_ptr_send(s_context_global_db->queue_io,l_msg); if (l_ret != 0){ log_it(L_ERROR, "Can't exec set_raw request, code %d", l_ret); s_queue_io_msg_delete(l_msg); - } + }else + debug_if(g_dap_global_db_debug_more, L_DEBUG, "Have sent set_raw request for %zu objects" , a_store_objs_count); return l_ret; } @@ -769,10 +811,10 @@ static bool s_msg_opcode_set_raw(struct queue_io_msg * a_msg) { int l_ret = -1; size_t i=0; - if(a_msg->values_raw_count>0){ - l_ret = dap_chain_global_db_driver_add(a_msg->values_raw,a_msg->values_raw_count); + if(a_msg->values_raw_total>0){ + l_ret = dap_chain_global_db_driver_add(a_msg->values_raw,a_msg->values_raw_total); if(l_ret == 0){ - for(; i < a_msg->values_raw_count ; i++ ) { + for(; i < a_msg->values_raw_total ; i++ ) { s_record_del_history_del(a_msg->values_raw[i].key , a_msg->values_raw[i].group); s_change_notify(&a_msg->values_raw[i] , a_msg->values_raw[i].type ); } @@ -782,15 +824,15 @@ static bool s_msg_opcode_set_raw(struct queue_io_msg * a_msg) if(a_msg->callback_results_raw){ a_msg->callback_results_raw (s_context_global_db, l_ret==0 ? 
DAP_GLOBAL_DB_RC_SUCCESS: DAP_GLOBAL_DB_RC_ERROR, a_msg->group, a_msg->key, - a_msg->values_raw_count, 0, a_msg->values_raw_count, a_msg->values_raw , + a_msg->values_raw_total, 0, a_msg->values_raw_total, a_msg->values_raw , a_msg->callback_arg ); } - + dap_store_obj_free(a_msg->values_raw, a_msg->values_raw_total); return true; } /** - * @brief dap_global_db_set_multiple + * @brief dap_global_db_set_multiple_zc Set multiple values, without duplication (zero copy, values are freed after set callback execution ) * @param a_group * @param a_values * @param a_values_count @@ -798,7 +840,7 @@ static bool s_msg_opcode_set_raw(struct queue_io_msg * a_msg) * @param a_arg * @return */ -int dap_global_db_set_multiple(const char * a_group, dap_global_db_obj_t * a_values, size_t a_values_count, dap_global_db_callback_results_t a_callback, void * a_arg ) +int dap_global_db_set_multiple_zc(const char * a_group, dap_global_db_obj_t * a_values, size_t a_values_count, dap_global_db_callback_results_t a_callback, void * a_arg ) { if(s_context_global_db == NULL){ log_it(L_ERROR, "GlobalDB context is not initialized, can't call dap_global_db_set"); @@ -816,7 +858,8 @@ int dap_global_db_set_multiple(const char * a_group, dap_global_db_obj_t * a_val if (l_ret != 0){ log_it(L_ERROR, "Can't exec set_multiple request, code %d", l_ret); s_queue_io_msg_delete(l_msg); - } + }else + debug_if(g_dap_global_db_debug_more, L_DEBUG, "Have sent set_multiple request for \"%s\" group with %zu values" , a_group, a_values_count); return l_ret; } @@ -825,7 +868,7 @@ int dap_global_db_set_multiple(const char * a_group, dap_global_db_obj_t * a_val * @param a_msg * @return */ -static bool s_msg_opcode_set_multiple(struct queue_io_msg * a_msg) +static bool s_msg_opcode_set_multiple_zc(struct queue_io_msg * a_msg) { int l_ret = -1; size_t i=0; @@ -847,12 +890,14 @@ static bool s_msg_opcode_set_multiple(struct queue_io_msg * a_msg) } } + bool l_delete_values = true; if(a_msg->callback_results){ - a_msg->callback_results(s_context_global_db, l_ret==0 ? DAP_GLOBAL_DB_RC_SUCCESS: + l_delete_values = a_msg->callback_results(s_context_global_db, l_ret==0 ? DAP_GLOBAL_DB_RC_SUCCESS: DAP_GLOBAL_DB_RC_ERROR, a_msg->group, a_msg->key, i, 0, a_msg->values_count, a_msg->values , a_msg->callback_arg ); } + dap_global_db_objs_delete( a_msg->values, a_msg->values_count); return true; } @@ -882,7 +927,8 @@ int dap_global_db_pin(const char * a_group, const char *a_key, dap_global_db_cal if (l_ret != 0){ log_it(L_ERROR, "Can't exec pin request, code %d", l_ret); s_queue_io_msg_delete(l_msg); - } + }else + debug_if(g_dap_global_db_debug_more, L_DEBUG, "Have sent pin request for \"%s\" group \"%s\" key" , a_group, a_key); return l_ret; } @@ -893,6 +939,27 @@ int dap_global_db_pin(const char * a_group, const char *a_key, dap_global_db_cal */ static bool s_msg_opcode_pin(struct queue_io_msg * a_msg) { + size_t l_count_records = 0; + dap_store_obj_t *l_store_obj = dap_chain_global_db_driver_read( a_msg->group, + a_msg->key, + &l_count_records); + if(l_store_obj != NULL && l_count_records>=1){ + l_store_obj->flags |= RECORD_PINNED; + int l_res = dap_chain_global_db_driver_add(l_store_obj,1); + if(l_res == 0){ + s_record_del_history_del(a_msg->key, a_msg->group); + s_change_notify(l_store_obj , l_store_obj->type ); + }else + log_it(L_ERROR,"Can't save pinned gdb data, code %d ", l_res); + + if(a_msg->callback_result) + a_msg->callback_result(s_context_global_db, l_res == 0? 
DAP_GLOBAL_DB_RC_SUCCESS : DAP_GLOBAL_DB_RC_ERROR, l_store_obj->group, l_store_obj->key, + l_store_obj->value, l_store_obj->value_len, l_store_obj->timestamp, + l_store_obj->flags & RECORD_PINNED, a_msg->callback_arg ); + dap_store_obj_free(l_store_obj,l_count_records); + }else if(a_msg->callback_result) + a_msg->callback_result(s_context_global_db, DAP_GLOBAL_DB_RC_NO_RESULTS, a_msg->group, a_msg->key, + NULL, 0, 0,0, a_msg->callback_arg ); return true; } @@ -921,7 +988,9 @@ int dap_global_db_unpin(const char * a_group, const char *a_key, dap_global_db_c if (l_ret != 0){ log_it(L_ERROR, "Can't exec unpin request, code %d", l_ret); s_queue_io_msg_delete(l_msg); - } + }else + debug_if(g_dap_global_db_debug_more, L_DEBUG, "Have sent unpin request for \"%s\" group \"%s\" key" , a_group, a_key); + return l_ret; } @@ -932,6 +1001,27 @@ int dap_global_db_unpin(const char * a_group, const char *a_key, dap_global_db_c */ static bool s_msg_opcode_unpin(struct queue_io_msg * a_msg) { + size_t l_count_records = 0; + dap_store_obj_t *l_store_obj = dap_chain_global_db_driver_read( a_msg->group, + a_msg->key, + &l_count_records); + if(l_store_obj != NULL && l_count_records>=1){ + l_store_obj->flags ^= RECORD_PINNED; + int l_res = dap_chain_global_db_driver_add(l_store_obj,1); + if(l_res == 0){ + s_record_del_history_del(a_msg->key, a_msg->group); + s_change_notify(l_store_obj , l_store_obj->type ); + }else + log_it(L_ERROR,"Can't save pinned gdb data, code %d ", l_res); + + if(a_msg->callback_result) + a_msg->callback_result(s_context_global_db, l_res == 0? DAP_GLOBAL_DB_RC_SUCCESS : DAP_GLOBAL_DB_RC_ERROR, l_store_obj->group, l_store_obj->key, + l_store_obj->value, l_store_obj->value_len, l_store_obj->timestamp, + l_store_obj->flags & RECORD_PINNED, a_msg->callback_arg ); + dap_store_obj_free(l_store_obj,l_count_records); + }else if(a_msg->callback_result) + a_msg->callback_result(s_context_global_db, DAP_GLOBAL_DB_RC_NO_RESULTS, a_msg->group, a_msg->key, + NULL, 0, 0,0, a_msg->callback_arg ); return true; } @@ -960,7 +1050,9 @@ int dap_global_db_del(const char * a_group, const char *a_key, dap_global_db_cal if (l_ret != 0){ log_it(L_ERROR, "Can't exec del request, code %d", l_ret); s_queue_io_msg_delete(l_msg); - } + }else + debug_if(g_dap_global_db_debug_more, L_DEBUG, "Have sent del request for \"%s\" group \"%s\" key" , a_group, a_key); + return l_ret; } @@ -1035,8 +1127,8 @@ static bool s_objs_get_callback (dap_global_db_context_t * a_global_db_context,i l_args->objs = a_values; l_args->objs_count = a_values_count; pthread_mutex_lock(&l_args->mutex); - pthread_cond_broadcast(&l_args->cond); pthread_mutex_unlock(&l_args->mutex); + pthread_cond_broadcast(&l_args->cond); return false; } @@ -1049,6 +1141,7 @@ static bool s_objs_get_callback (dap_global_db_context_t * a_global_db_context,i */ dap_global_db_obj_t* dap_global_db_get_all_sync(const char *a_group, size_t *a_objs_count) { + debug_if(g_dap_global_db_debug_more, L_DEBUG, "get_all sync call executes for group \"%s\"", a_group); struct objs_get * l_args = DAP_NEW_Z(struct objs_get); pthread_mutex_init(&l_args->mutex,NULL); pthread_cond_init(&l_args->cond,NULL); @@ -1093,8 +1186,8 @@ static void s_sync_op_result_callback (dap_global_db_context_t * a_global_db_con struct sync_op_result * l_args = (struct sync_op_result *) a_arg; l_args->result = a_rc; pthread_mutex_lock(&l_args->mutex); - pthread_cond_broadcast(&l_args->cond); pthread_mutex_unlock(&l_args->mutex); + pthread_cond_broadcast(&l_args->cond); } /** @@ -1109,6 +1202,8 @@ static void 
s_sync_op_result_callback (dap_global_db_context_t * a_global_db_con int dap_global_db_set_sync(const char * a_group, const char *a_key, const void * a_value, const size_t a_value_length, bool a_pin_value ) { struct sync_op_result * l_args = DAP_NEW_Z(struct sync_op_result); + debug_if(g_dap_global_db_debug_more, L_DEBUG, "set sync call executes for group \"%s\" key \"%s\"", a_group, a_key); + pthread_mutex_init(&l_args->mutex,NULL); pthread_cond_init(&l_args->cond,NULL); pthread_mutex_lock(&l_args->mutex); @@ -1135,8 +1230,10 @@ int dap_global_db_del_sync(const char * a_group, const char *a_key ) pthread_mutex_init(&l_args->mutex,NULL); pthread_cond_init(&l_args->cond,NULL); pthread_mutex_lock(&l_args->mutex); - dap_global_db_del(a_group, a_key, s_sync_op_result_callback, l_args); - pthread_cond_wait(&l_args->cond, &l_args->mutex); + if (dap_global_db_del(a_group, a_key, s_sync_op_result_callback, l_args) == 0) + pthread_cond_wait(&l_args->cond, &l_args->mutex); + else + l_args->result = -777; pthread_mutex_unlock(&l_args->mutex); pthread_mutex_destroy(&l_args->mutex); pthread_cond_destroy(&l_args->cond); @@ -1184,8 +1281,8 @@ static void s_store_obj_get_callback (dap_global_db_context_t * a_global_db_cont } pthread_mutex_lock(&l_args->mutex); - pthread_cond_broadcast(&l_args->cond); pthread_mutex_unlock(&l_args->mutex); + pthread_cond_broadcast(&l_args->cond); } /** @@ -1243,14 +1340,16 @@ static bool s_get_all_raw_sync_callback (dap_global_db_context_t * a_global_db_c l_args->objs = a_values; l_args->objs_count = a_values_count; pthread_mutex_lock(&l_args->mutex); - pthread_cond_broadcast(&l_args->cond); pthread_mutex_unlock(&l_args->mutex); + pthread_cond_broadcast(&l_args->cond); return false; } dap_store_obj_t* dap_global_db_get_all_raw_sync(const char *a_group, uint64_t a_first_id, size_t *a_objs_count) { struct store_objs_get * l_args = DAP_NEW_Z(struct store_objs_get); + debug_if(g_dap_global_db_debug_more, L_DEBUG, "get_all_raw sync call executes for group %s", a_group); + pthread_mutex_init(&l_args->mutex,NULL); pthread_cond_init(&l_args->cond,NULL); pthread_mutex_lock(&l_args->mutex); @@ -1400,7 +1499,30 @@ static bool s_msg_opcode_context_exec(struct queue_io_msg * a_msg) return true; } - +/** + * @brief s_msg_opcode_to_str + * @param a_opcode + * @return + */ +static const char *s_msg_opcode_to_str(enum queue_io_msg_opcode a_opcode) +{ + switch(a_opcode){ + case MSG_OPCODE_GET: return "GET"; + case MSG_OPCODE_GET_RAW: return "GET_RAW"; + case MSG_OPCODE_GET_LAST: return "GET_LAST"; + case MSG_OPCODE_GET_ALL: return "GET_ALL"; + case MSG_OPCODE_GET_ALL_RAW: return "GET_ALL_RAW"; + case MSG_OPCODE_SET: return "SET"; + case MSG_OPCODE_SET_MULTIPLE: return "SET_MULTIPLE"; + case MSG_OPCODE_SET_RAW: return "SET_RAW"; + case MSG_OPCODE_PIN: return "PIN"; + case MSG_OPCODE_UNPIN: return "UNPIN"; + case MSG_OPCODE_DELETE: return "DELETE"; + case MSG_OPCODE_FLUSH: return "FLUSH"; + case MSG_OPCODE_CONTEXT_EXEC: return "CONTEXT_EXEC"; + default: return "UNKNOWN"; + } +} /** * @brief s_queue_io_callback @@ -1415,6 +1537,8 @@ static void s_queue_io_callback( dap_events_socket_t * a_es, void * a_arg) bool l_msg_delete = false; // if msg resent again it shouldn't be deleted in the end of callback assert(l_msg); + debug_if(g_dap_global_db_debug_more, L_NOTICE, "Received GlobalDB I/O message with opcode %s", s_msg_opcode_to_str(l_msg->opcode) ); + switch(l_msg->opcode){ case MSG_OPCODE_GET: l_msg_delete = s_msg_opcode_get(l_msg); break; case MSG_OPCODE_GET_RAW: l_msg_delete = 
s_msg_opcode_get_raw(l_msg); break; @@ -1422,7 +1546,7 @@ static void s_queue_io_callback( dap_events_socket_t * a_es, void * a_arg) case MSG_OPCODE_GET_ALL: l_msg_delete = s_msg_opcode_get_all(l_msg); break; case MSG_OPCODE_GET_ALL_RAW: l_msg_delete = s_msg_opcode_get_all_raw(l_msg); break; case MSG_OPCODE_SET: l_msg_delete = s_msg_opcode_set(l_msg); break; - case MSG_OPCODE_SET_MULTIPLE: l_msg_delete = s_msg_opcode_set_multiple(l_msg); break; + case MSG_OPCODE_SET_MULTIPLE: l_msg_delete = s_msg_opcode_set_multiple_zc(l_msg); break; case MSG_OPCODE_SET_RAW: l_msg_delete = s_msg_opcode_set_raw(l_msg); break; case MSG_OPCODE_PIN: l_msg_delete = s_msg_opcode_pin(l_msg); break; case MSG_OPCODE_UNPIN: l_msg_delete = s_msg_opcode_unpin(l_msg); break; @@ -1523,10 +1647,74 @@ int l_res = -1; */ static void s_queue_io_msg_delete( struct queue_io_msg * a_msg) { - if (a_msg->group) - DAP_DELETE(a_msg->group); - if (a_msg->key) - DAP_DELETE(a_msg->key); + switch(a_msg->opcode){ + case MSG_OPCODE_GET: + if (a_msg->group) + DAP_DELETE(a_msg->group); + if (a_msg->key) + DAP_DELETE(a_msg->key); + break; + case MSG_OPCODE_GET_RAW: + if (a_msg->group) + DAP_DELETE(a_msg->group); + if (a_msg->key) + DAP_DELETE(a_msg->key); + break; + case MSG_OPCODE_GET_DEL_TS: + if (a_msg->group) + DAP_DELETE(a_msg->group); + if (a_msg->key) + DAP_DELETE(a_msg->key); + break; + case MSG_OPCODE_GET_LAST: + if (a_msg->group) + DAP_DELETE(a_msg->group); + break; + case MSG_OPCODE_GET_LAST_RAW: + if (a_msg->group) + DAP_DELETE(a_msg->group); + break; + case MSG_OPCODE_GET_ALL: + if (a_msg->group) + DAP_DELETE(a_msg->group); + break; + case MSG_OPCODE_GET_ALL_RAW: + if (a_msg->group) + DAP_DELETE(a_msg->group); + break; + case MSG_OPCODE_SET: + if (a_msg->group) + DAP_DELETE(a_msg->group); + if (a_msg->key) + DAP_DELETE(a_msg->key); + break; + case MSG_OPCODE_SET_RAW: + break; + case MSG_OPCODE_SET_MULTIPLE: + if (a_msg->group) + DAP_DELETE(a_msg->group); + break; + case MSG_OPCODE_PIN: + if (a_msg->group) + DAP_DELETE(a_msg->group); + if (a_msg->key) + DAP_DELETE(a_msg->key); + break; + case MSG_OPCODE_UNPIN: + if (a_msg->group) + DAP_DELETE(a_msg->group); + if (a_msg->key) + DAP_DELETE(a_msg->key); + break; + case MSG_OPCODE_DELETE: + if (a_msg->group) + DAP_DELETE(a_msg->group); + if (a_msg->key) + DAP_DELETE(a_msg->key); + break; + default:; + } + DAP_DELETE(a_msg); } diff --git a/modules/global-db/include/dap_global_db.h b/modules/global-db/include/dap_global_db.h index 8e237b265a867397140e86375755b586459dbbd0..5f2bb95daad4085872545224df7ac31391dd2615 100644 --- a/modules/global-db/include/dap_global_db.h +++ b/modules/global-db/include/dap_global_db.h @@ -82,21 +82,51 @@ typedef struct dap_global_db_obj { typedef void (*dap_global_db_callback_t) (dap_global_db_context_t * a_global_db_context, void * a_arg); -typedef void (*dap_global_db_callback_result_t) (dap_global_db_context_t * a_global_db_context,int a_rc, const char * a_group, const char * a_key, const void * a_value, const size_t a_value_size, dap_nanotime_t a_value_ts, bool a_is_pinned, void * a_arg); +/** + * @brief callback for single result + * @arg a_rc DAP_GLOBAL_DB_RC_SUCCESS if success others if not + */ +typedef void (*dap_global_db_callback_result_t) (dap_global_db_context_t * a_global_db_context,int a_rc, const char * a_group, const char * a_key, const void * a_value, + const size_t a_value_size, dap_nanotime_t a_value_ts, bool a_is_pinned, void * a_arg); + +/** + * @brief callback for single raw result + * @arg a_rc DAP_GLOBAL_DB_RC_SUCCESS if success 
others if not + * @return true if we need to free a_store_obj, false otherwise. + */ typedef bool (*dap_global_db_callback_result_raw_t) (dap_global_db_context_t * a_global_db_context,int a_rc, dap_store_obj_t * a_store_obj, void * a_arg); -typedef bool (*dap_global_db_callback_results_t) (dap_global_db_context_t * a_global_db_context,int a_rc, const char * a_group, const char * a_key, const size_t a_values_total, const size_t a_values_shift, +/** + * @brief callback for multiple result, with pagination + * @arg a_rc DAP_GLOBAL_DB_RC_SUCCESS if success others if not + * @arg a_values_total Total values number + * @arg a_values_shift Current shift from beginning of values set + * @arg a_values_count Current number of items in a_values + * @arg a_values Current items (page of items) + * @arg a_arg Custom argument + * @return true if we need to free a_store_obj, false otherwise. + */ +typedef bool (*dap_global_db_callback_results_t) (dap_global_db_context_t * a_global_db_context,int a_rc, const char * a_group, const char * a_key, + const size_t a_values_total, const size_t a_values_shift, const size_t a_values_count, dap_global_db_obj_t * a_values, void * a_arg); -typedef bool (*dap_global_db_callback_results_raw_t) (dap_global_db_context_t * a_global_db_context,int a_rc, const char * a_group, const char * a_key, const size_t a_values_current, const size_t a_values_shift, +/** + * @brief callback for multiple raw result, with pagination + * @arg a_rc DAP_GLOBAL_DB_RC_SUCCESS if success others if not + * @arg a_values_total Total values number + * @arg a_values_shift Current shift from beginning of values set + * @arg a_values_count Current number of items in a_values + * @arg a_values Current items (page of items) + * @return true if we need to free a_store_obj, false otherwise. + */ +typedef bool (*dap_global_db_callback_results_raw_t) (dap_global_db_context_t * a_global_db_context,int a_rc, const char * a_group, const char * a_key, + const size_t a_values_current, const size_t a_values_shift, const size_t a_values_count, dap_store_obj_t * a_values, void * a_arg); // Return codes #define DAP_GLOBAL_DB_RC_SUCCESS 0 #define DAP_GLOBAL_DB_RC_NO_RESULTS -1 #define DAP_GLOBAL_DB_RC_ERROR -666 - - extern bool g_dap_global_db_debug_more; int dap_global_db_init(const char * a_path, const char * a_driver); @@ -116,7 +146,7 @@ int dap_global_db_set(const char * a_group, const char *a_key, const void * a_va int dap_global_db_set_raw(dap_store_obj_t * a_store_objs, size_t a_store_objs_count, dap_global_db_callback_results_raw_t a_callback, void * a_arg ); // Set multiple.
In callback writes total processed objects to a_values_total and a_values_count to the a_values_count as well -int dap_global_db_set_multiple(const char * a_group, dap_global_db_obj_t * a_values, size_t a_values_count, dap_global_db_callback_results_t a_callback, void * a_arg ); +int dap_global_db_set_multiple_zc(const char * a_group, dap_global_db_obj_t * a_values, size_t a_values_count, dap_global_db_callback_results_t a_callback, void * a_arg ); int dap_global_db_pin(const char * a_group, const char *a_key, dap_global_db_callback_result_t a_callback, void * a_arg ); int dap_global_db_unpin(const char * a_group, const char *a_key, dap_global_db_callback_result_t a_callback, void * a_arg ); int dap_global_db_del(const char * a_group, const char *a_key, dap_global_db_callback_result_t a_callback, void * a_arg ); diff --git a/modules/mempool/dap_chain_mempool.c b/modules/mempool/dap_chain_mempool.c index 4d1c8a047a3a8e85895ec6f2ba23e53049d65983..4c469ca2fd16dabc69edb91405af182a5bf56aeb 100644 --- a/modules/mempool/dap_chain_mempool.c +++ b/modules/mempool/dap_chain_mempool.c @@ -360,7 +360,7 @@ int dap_chain_mempool_tx_create_massive( dap_chain_t * a_chain, dap_enc_key_t *a char * l_gdb_group = dap_chain_net_get_gdb_group_mempool_new(a_chain); //return 0; - dap_global_db_set_multiple(l_gdb_group, l_objs,a_tx_num, s_tx_create_massive_gdb_save_callback , NULL ); + dap_global_db_set_multiple_zc(l_gdb_group, l_objs,a_tx_num, s_tx_create_massive_gdb_save_callback , NULL ); DAP_DELETE(l_gdb_group); return 0; } diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index ddd4952c955487e694a22ea33050055ce29fac27..dd66ea0691728d69a2b88554d1138910760a02f8 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -750,8 +750,10 @@ static dap_chain_node_info_t *s_get_dns_link_from_cfg(dap_chain_net_t *a_net) l_addr = l_net_pvt->bootstrap_nodes_addrs[i]; l_port = l_net_pvt->bootstrap_nodes_ports[i]; } - if (!l_addr.s_addr) + if (!l_addr.s_addr){ + log_it(L_WARNING,"Can't find address to connect at all"); return NULL; + } dap_chain_node_info_t *l_link_node_info = DAP_NEW_Z(dap_chain_node_info_t); if(! 
l_link_node_info){ log_it(L_CRITICAL,"Can't allocate memory for node link info"); diff --git a/modules/net/dap_chain_node.c b/modules/net/dap_chain_node.c index dba3a0a5b6fc5301ec79fdcfede3b9c9b3dfaa33..02402d03ad58f3e23a31fe80da06c9b5678f27e6 100644 --- a/modules/net/dap_chain_node.c +++ b/modules/net/dap_chain_node.c @@ -83,8 +83,7 @@ bool dap_chain_node_check_addr(dap_chain_net_t *a_net, dap_chain_node_addr_t *a_ */ bool dap_chain_node_alias_register(dap_chain_net_t *a_net, const char *a_alias, dap_chain_node_addr_t *a_addr) { - return dap_global_db_set(a_net->pub.gdb_nodes_aliases, a_alias, a_addr, sizeof(dap_chain_node_addr_t),true, - NULL, NULL)==0; + return dap_global_db_set_sync(a_net->pub.gdb_nodes_aliases, a_alias, a_addr, sizeof(dap_chain_node_addr_t),true)==0; } /** @@ -159,7 +158,7 @@ int dap_chain_node_info_save(dap_chain_net_t * a_net, dap_chain_node_info_t *a_n } //char *a_value = dap_chain_node_info_serialize(node_info, NULL); size_t l_node_info_size = dap_chain_node_info_get_size(a_node_info); - int l_res = dap_global_db_set( a_net->pub.gdb_nodes, l_key, a_node_info, l_node_info_size, true, NULL, NULL); + int l_res = dap_global_db_set_sync( a_net->pub.gdb_nodes, l_key, a_node_info, l_node_info_size, true); DAP_DELETE(l_key); diff --git a/modules/net/dap_chain_node_cli_cmd.c b/modules/net/dap_chain_node_cli_cmd.c index a787a58fe1fa4ec9bfbd44ae5a793ab315158932..e990f9e88561c0306f8dc04c5cfb07545082a428 100644 --- a/modules/net/dap_chain_node_cli_cmd.c +++ b/modules/net/dap_chain_node_cli_cmd.c @@ -633,6 +633,11 @@ static int node_info_dump_with_reply(dap_chain_net_t * a_net, dap_chain_node_add for(size_t i = 0; i < l_nodes_count; i++) { dap_chain_node_info_t *l_node_info = (dap_chain_node_info_t *)l_objs[i].value; // read node + if ( !dap_chain_node_addr_not_null(&l_node_info->hdr.address)){ + log_it(L_ERROR, "Node address is NULL"); + continue; + } + dap_chain_node_info_t *l_node_info_read = node_info_read_and_reply(a_net, &l_node_info->hdr.address, NULL); if (!l_node_info_read) { log_it(L_ERROR, "Invalid node info object, remove it"); diff --git a/modules/net/include/dap_chain_node.h b/modules/net/include/dap_chain_node.h index d14c6afbe7998d0ce6a9bf8950307c1e263c4f16..ea796943cd098277b8e1d2d9ba555d4baf9640b1 100644 --- a/modules/net/include/dap_chain_node.h +++ b/modules/net/include/dap_chain_node.h @@ -106,6 +106,9 @@ typedef dap_list_t dap_chain_node_info_list_t; #define DAP_CHAIN_NODE_MEMPOOL_INTERVAL 1000 // milliseconds + + + /** * Calculate size of struct dap_chain_node_info_t */ diff --git a/modules/type/blocks/dap_chain_cs_blocks.c b/modules/type/blocks/dap_chain_cs_blocks.c index 9bcf617fc3210237c5931f2f681aab702096ebc5..57b00cbed2edaec1bf4f2390027e85551f6e4386 100644 --- a/modules/type/blocks/dap_chain_cs_blocks.c +++ b/modules/type/blocks/dap_chain_cs_blocks.c @@ -690,8 +690,11 @@ static int s_add_atom_to_ledger(dap_chain_cs_blocks_t * a_blocks, dap_ledger_t * l_res=-1; } if (l_res != 1 ){ - log_it(L_WARNING, "Can't load datum #%zu (%s) from block %s to ledger: code %d", i, - dap_chain_datum_type_id_to_str(l_datum->header.type_id), + char l_time_str[256]; + l_time_str[0] = '\0'; + dap_time_to_str_rfc822(l_time_str, sizeof(l_time_str)-1, l_datum->header.ts_create ); + log_it(L_WARNING, "Can't load datum #%zu %s with hash %s from block %s to ledger: code %d", i, + dap_chain_datum_type_id_to_str(l_datum->header.type_id ), l_time_str, a_block_cache->block_hash_str, l_res); break; }
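Usage note on the uthash 2.3.0 changes above: the HASH_FCN indirection is removed, so a custom hash is now supplied as the function-like macro HASH_FUNCTION(keyptr,keylen,hashv) before including uthash.h, and HASH_NO_STDINT replaces the old per-platform uint32_t typedefs. A minimal consumer sketch follows; the file and struct names are illustrative and not part of the patch.

/* example_hash_cfg.c -- illustrative sketch, not part of the patch above */
#include <stdlib.h>

/* Select the SAX hash instead of the default Jenkins hash; must be defined
 * before uthash.h is included. Toolchains without <stdint.h> would also
 * define HASH_NO_STDINT and supply uint8_t/uint32_t themselves. */
#define HASH_FUNCTION(keyptr,keylen,hashv) HASH_SAX(keyptr, keylen, hashv)
#include "uthash.h"

struct item {
    int id;                /* key field */
    UT_hash_handle hh;     /* uthash bookkeeping handle */
};

static struct item *s_items = NULL;

void item_add(int a_id)
{
    struct item *l_it = (struct item *)calloc(1, sizeof(*l_it));
    if (!l_it)
        return;
    l_it->id = a_id;
    HASH_ADD_INT(s_items, id, l_it);   /* key is now hashed with HASH_SAX */
}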
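The rename to dap_global_db_set_multiple_zc reflects the zero-copy contract introduced here: the dap_global_db_obj_t array handed in is owned by the GlobalDB queue and released with dap_global_db_objs_delete() after the results callback runs. A hedged caller sketch, assuming dap_strdup_printf from dap_strfuncs and with hypothetical function names:

/* Illustrative sketch, not part of the patch above. */
static bool s_on_batch_saved(dap_global_db_context_t *a_ctx, int a_rc,
                             const char *a_group, const char *a_key,
                             const size_t a_values_total, const size_t a_values_shift,
                             const size_t a_values_count, dap_global_db_obj_t *a_values,
                             void *a_arg)
{
    (void)a_ctx; (void)a_key; (void)a_values_shift; (void)a_values; (void)a_arg;
    log_it(a_rc == DAP_GLOBAL_DB_RC_SUCCESS ? L_INFO : L_ERROR,
           "Saved %zu of %zu objects to \"%s\", rc=%d",
           a_values_count, a_values_total, a_group, a_rc);
    return true;  /* the _zc path frees a_values after this callback in any case */
}

int example_save_batch(const char *a_group, size_t a_count)
{
    dap_global_db_obj_t *l_objs = DAP_NEW_Z_SIZE(dap_global_db_obj_t,
                                                 sizeof(dap_global_db_obj_t) * a_count);
    if (!l_objs)
        return -1;
    for (size_t i = 0; i < a_count; i++) {
        l_objs[i].key = dap_strdup_printf("record_%zu", i);   /* heap-owned key */
        l_objs[i].value = DAP_NEW_Z_SIZE(uint8_t, 32);        /* heap-owned value */
        l_objs[i].value_len = 32;
    }
    /* Ownership of l_objs passes to GlobalDB; do not free or reuse it here. */
    return dap_global_db_set_multiple_zc(a_group, l_objs, a_count,
                                         s_on_batch_saved, NULL);
}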
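The new doc comments on dap_global_db_callback_results_t describe the pagination contract used by dap_global_db_get_all(): the callback fires once per page until values_shift plus values_count reaches values_total, and returning true lets the library free the page with dap_global_db_objs_delete(). A hedged sketch; the group name and page size below are placeholders:

/* Illustrative sketch, not part of the patch above. */
static bool s_on_page(dap_global_db_context_t *a_ctx, int a_rc,
                      const char *a_group, const char *a_key,
                      const size_t a_values_total, const size_t a_values_shift,
                      const size_t a_values_count, dap_global_db_obj_t *a_values,
                      void *a_arg)
{
    (void)a_ctx; (void)a_key; (void)a_arg;
    if (a_rc != DAP_GLOBAL_DB_RC_SUCCESS)
        return true;                               /* no page to keep */
    for (size_t i = 0; i < a_values_count; i++)
        log_it(L_DEBUG, "group \"%s\" key \"%s\", record %zu of %zu",
               a_group, a_values[i].key, a_values_shift + i + 1, a_values_total);
    return true;                                   /* let the library free this page */
}

/* usage: dap_global_db_get_all("local.general", 100, s_on_page, NULL); */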