From f73f18514da084f0451bab359b0fd4b614aed977 Mon Sep 17 00:00:00 2001 From: Roman Khlopkov <roman.khlopkov@demlabs.net> Date: Thu, 23 Jun 2022 19:53:17 +0000 Subject: [PATCH] bugs-6383 --- 3rdparty/cuttdb/CMakeLists.txt | 13 +- 3rdparty/cuttdb/src/vio_apnd2.c | 2 +- CMakeLists.txt | 31 +---- cmake/OS_Detection.cmake | 6 +- dap-sdk/core/include/dap_common.h | 55 +++----- dap-sdk/core/include/dap_strfuncs.h | 2 +- dap-sdk/core/include/dap_time.h | 1 + dap-sdk/core/src/dap_common.c | 122 ++---------------- dap-sdk/core/src/dap_strfuncs.c | 12 +- dap-sdk/core/src/dap_time.c | 20 ++- dap-sdk/crypto/src/dap_cert_file.c | 16 +-- dap-sdk/io/dap_context.c | 94 ++++++-------- dap-sdk/io/dap_events_socket.c | 65 +++++----- dap-sdk/io/dap_proc_queue.c | 51 +------- dap-sdk/io/dap_timerfd.c | 52 +++----- dap-sdk/io/include/dap_events_socket.h | 2 +- dap-sdk/io/include/dap_proc_queue.h | 10 +- modules/chain/dap_chain_ledger.c | 2 +- modules/channel/chain/dap_stream_ch_chain.c | 109 ++++++++-------- .../chain/include/dap_stream_ch_chain.h | 2 +- .../block-ton/dap_chain_cs_block_ton.c | 12 +- modules/global-db/dap_chain_global_db.c | 14 +- .../dap_chain_global_db_driver_mdbx.c | 14 +- .../global-db/include/dap_chain_global_db.h | 6 - .../include/dap_chain_global_db_driver.h | 10 +- .../xchange/dap_chain_net_srv_xchange.c | 2 +- 26 files changed, 256 insertions(+), 469 deletions(-) diff --git a/3rdparty/cuttdb/CMakeLists.txt b/3rdparty/cuttdb/CMakeLists.txt index fcb7e8a592..948b986642 100755 --- a/3rdparty/cuttdb/CMakeLists.txt +++ b/3rdparty/cuttdb/CMakeLists.txt @@ -5,27 +5,22 @@ project(dap_cuttdb C) add_definitions ("-D_GNU_SOURCE") set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fPIC") -file(GLOB cuttdb_src src/*.c) -file(GLOB cuttdb_h src/*.h) +file(GLOB cuttdb_src RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} src/*.c) +file(GLOB cuttdb_h RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} src/*.h) +list(APPEND cuttdb_src ${cuttdb_h}) # the server part ain't ported, and thus not built, so are tests. list(FILTER cuttdb_src EXCLUDE REGEX "ae_") -list(FILTER cuttdb_h EXCLUDE REGEX "ae_") list(FILTER cuttdb_src EXCLUDE REGEX "server.") -list(FILTER cuttdb_h EXCLUDE REGEX "server.") list(FILTER cuttdb_src EXCLUDE REGEX "dump.") -list(FILTER cuttdb_h EXCLUDE REGEX "dump.") list(FILTER cuttdb_src EXCLUDE REGEX "builddb.") -list(FILTER cuttdb_h EXCLUDE REGEX "builddb.") list(FILTER cuttdb_src EXCLUDE REGEX "test_mt.") -list(FILTER cuttdb_h EXCLUDE REGEX "test_mt.") if(UNIX) list(FILTER cuttdb_src EXCLUDE REGEX "mman.") - list(FILTER cuttdb_h EXCLUDE REGEX "mman.") endif() -add_library(${PROJECT_NAME} STATIC ${cuttdb_src} ${cuttdb_h}) +add_library(${PROJECT_NAME} STATIC ${cuttdb_src}) set_target_properties(${PROJECT_NAME} PROPERTIES LINKER_LANGUAGE C) set_target_properties(${PROJECT_NAME} PROPERTIES COMPILER_LANGUAGE C) diff --git a/3rdparty/cuttdb/src/vio_apnd2.c b/3rdparty/cuttdb/src/vio_apnd2.c index fca40584a8..68dc326df1 100644 --- a/3rdparty/cuttdb/src/vio_apnd2.c +++ b/3rdparty/cuttdb/src/vio_apnd2.c @@ -423,7 +423,7 @@ static int _vio_apnd2_open(CDBVIO *vio, const char *filepath, int flags) if (flags & CDB_TRUNC) rflags |= O_TRUNC; - if (_vio_apnd2_checkpid(vio) < 0) { + if (/*_vio_apnd2_checkpid(vio) <*/ 0) { goto ERRRET; } diff --git a/CMakeLists.txt b/CMakeLists.txt index 8f3f2e6fce..8c27ff6aee 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -8,35 +8,14 @@ add_definitions ("-DCELLFRAME_SDK_VERSION=\"${CELLFRAME_SDK_NATIVE_VERSION}\"") set(BUILD_CRYPTO_TESTS ON) -if(NOT DEFINED ${CELLFRAME_MODULES}) - include (cmake/OS_Detection.cmake) - if (WIN32) - set(CELLFRAME_MODULES "core chains mining network srv cs-dag-poa cs-block-poa cs-dag-pos cs-block-pos cs-block-ton cs-none srv-app srv-app-db srv-datum srv-stake srv-xchange") - elseif(BSD) - set(CELLFRAME_MODULES "core chains mining network srv cs-dag-poa cs-block-poa cs-dag-pos cs-block-pos cs-block-ton cs-none srv-app srv-app-db srv-datum srv-stake srv-xchange") - elseif(DARWIN) - set(CELLFRAME_MODULES "core chains mining network srv cs-dag-poa cs-block-poa cs-dag-pos cs-block-pos cs-block-ton cs-none srv-app srv-app-db srv-datum srv-stake srv-xchange") - elseif(ANDROID) - set(CELLFRAME_MODULES "core chains mining network srv cs-dag-poa cs-block-poa cs-dag-pos cs-block-pos cs-block-ton cs-none srv-app srv-app-db srv-datum srv-stake srv-xchange") - elseif(LINUX) - set(CELLFRAME_MODULES "core chains mining network srv cs-dag-poa cs-block-poa cs-dag-pos cs-block-pos cs-block-ton cs-none srv-app srv-app-db srv-datum srv-stake srv-xchange modules-dynamic srv-vpn") - endif() -endif() - set(DAPSDK_MODULES "") -if(NOT DEFINED ${CELLFRAME_MODULES}) - include (cmake/OS_Detection.cmake) - if (WIN32) - set(CELLFRAME_MODULES "core chains mining network srv cs-dag-poa cs-block-poa cs-dag-pos cs-block-pos cs-block-ton cs-none srv-app srv-app-db srv-datum srv-stake srv-xchange") - elseif(BSD) - set(CELLFRAME_MODULES "core chains mining network srv cs-dag-poa cs-block-poa cs-dag-pos cs-block-pos cs-block-ton cs-none srv-app srv-app-db srv-datum srv-stake srv-xchange") - elseif(DARWIN) - set(CELLFRAME_MODULES "core chains mining network srv cs-dag-poa cs-block-poa cs-dag-pos cs-block-pos cs-block-ton cs-none srv-app srv-app-db srv-datum srv-stake srv-xchange") - elseif(ANDROID) - set(CELLFRAME_MODULES "core chains mining network srv cs-dag-poa cs-block-poa cs-dag-pos cs-block-pos cs-block-ton cs-none srv-app srv-app-db srv-datum srv-stake srv-xchange") - elseif(LINUX) +if(NOT DEFINED CELLFRAME_MODULES) + include (cmake/OS_Detection.cmake) + if(LINUX) set(CELLFRAME_MODULES "core chains mining network srv cs-dag-poa cs-block-poa cs-dag-pos cs-block-pos cs-block-ton cs-none srv-app srv-app-db srv-datum srv-stake srv-xchange modules-dynamic srv-vpn") + elseif(WIN32 OR BSD OR DARWIN OR ANDROID) + set(CELLFRAME_MODULES "core chains mining network srv cs-dag-poa cs-block-poa cs-dag-pos cs-block-pos cs-block-ton cs-none srv-app srv-app-db srv-datum srv-stake srv-xchange") endif() endif() diff --git a/cmake/OS_Detection.cmake b/cmake/OS_Detection.cmake index 8271975aac..3a3af67d2a 100644 --- a/cmake/OS_Detection.cmake +++ b/cmake/OS_Detection.cmake @@ -138,12 +138,7 @@ endif() if(WIN32) message(STATUS "[*] Building for Windows") - add_definitions("-DHAVE_PREAD") - add_definitions("-DHAVE_MMAP") - add_definitions("-DHAVE_STRNDUP") - add_definitions ("-DUNDEBUG") - add_definitions ("-DNDEBUG") add_definitions ("-DWIN32") add_definitions ("-D_WINDOWS") add_definitions ("-D__WINDOWS__") @@ -162,6 +157,7 @@ if(WIN32) set(_CCOPT "-mconsole -static -Wall -std=gnu11 -Wextra -Wno-deprecated-declarations -Wno-unused-local-typedefs -Wno-unused-function -Wno-implicit-fallthrough -Wno-unused-variable -Wno-unused-parameter -Wno-unused-but-set-variable -g3 -ggdb -fno-strict-aliasing -fno-eliminate-unused-debug-symbols -pg") set(_LOPT "-mconsole -static -pg") else() + add_definitions ("-DNDEBUG") set(_CCOPT "-static -std=gnu11 -Wall -Wextra -Wno-deprecated-declarations -Wno-unused-local-typedefs -Wno-unused-function -Wno-implicit-fallthrough -Wno-unused-variable -Wno-unused-parameter -Wno-unused-but-set-variable -O3 -fno-ident -ffast-math -fno-strict-aliasing -ftree-vectorize -mfpmath=sse -mmmx -msse2 -fno-asynchronous-unwind-tables -ffunction-sections -Wl,--gc-sections -Wl,--strip-all") endif() diff --git a/dap-sdk/core/include/dap_common.h b/dap-sdk/core/include/dap_common.h index 308eb1b16e..ec943d9c92 100755 --- a/dap-sdk/core/include/dap_common.h +++ b/dap-sdk/core/include/dap_common.h @@ -386,8 +386,10 @@ static const uint16_t s_ascii_table_data[256] = { //const uint16_t * const c_dap_ascii_table = s_ascii_table_data; -#define dap_ascii_isspace(c) (s_ascii_table_data[(unsigned char) (c)] & DAP_ASCII_SPACE) != 0 -#define dap_ascii_isalpha(c) (s_ascii_table_data[(unsigned char) (c)] & DAP_ASCII_ALPHA) != 0 +#define dap_ascii_isspace(c) (s_ascii_table_data[(unsigned char) (c)] & DAP_ASCII_SPACE) +#define dap_ascii_isalpha(c) (s_ascii_table_data[(unsigned char) (c)] & DAP_ASCII_ALPHA) +#define dap_ascii_isalnum(c) (s_ascii_table_data[(unsigned char) (c)] & DAP_ASCII_ALNUM) +#define dap_ascii_isdigit(c) (s_ascii_table_data[(unsigned char) (c)] & DAP_ASCII_DIGIT) void dap_sleep( uint32_t ms ); @@ -452,15 +454,18 @@ char *dap_log_get_item(time_t a_start_time, int a_limit); DAP_PRINTF_ATTR(3, 4) void _log_it( const char * log_tag, enum dap_log_level, const char * format, ... ); +#define log_it(_log_level, ...) _log_it(LOG_TAG, _log_level, ##__VA_ARGS__) +#define debug_if(flg, lvl, ...) _log_it(((flg) ? LOG_TAG : NULL), (lvl), ##__VA_ARGS__) + +#ifdef DAP_SYS_DEBUG void _log_it_ext( const char *, unsigned, enum dap_log_level, const char * format, ... ); void _dump_it (const char *, unsigned , const char *a_var_name, const void *src, unsigned short srclen); -#ifndef SYS_DEBUG -#define log_it( _log_level, ...) _log_it( LOG_TAG, _log_level, ##__VA_ARGS__) -#else +#undef log_it #define log_it( _log_level, ...) _log_it_ext( __func__, __LINE__, (_log_level), ##__VA_ARGS__) -#endif -#define debug_if( flg, lvl, ...) _log_it( ((flg) ? LOG_TAG : NULL), (lvl), ##__VA_ARGS__) #define dump_it(v,s,l) _dump_it( __func__, __LINE__, (v), (s), (l)) +#else +#define dump_it(v,s,l) +#endif @@ -490,45 +495,17 @@ void *dap_interval_timer_create(unsigned int a_msec, dap_timer_callback_t a_call int dap_interval_timer_delete(void *a_timer); void dap_interval_timer_deinit(); -uint16_t dap_lendian_get16(const uint8_t *a_buf); -void dap_lendian_put16(uint8_t *a_buf, uint16_t a_val); -uint32_t dap_lendian_get32(const uint8_t *a_buf); -void dap_lendian_put32(uint8_t *a_buf, uint32_t a_val); -uint64_t dap_lendian_get64(const uint8_t *a_buf); -void dap_lendian_put64(uint8_t *a_buf, uint64_t a_val); - - static inline void * dap_mempcpy(void * a_dest,const void * a_src,size_t n) { return ((byte_t*) memcpy(a_dest,a_src,n))+n; } -int dap_is_alpha_and_(char e); -int dap_is_alpha(char e); -int dap_is_digit(char e); +DAP_STATIC_INLINE int dap_is_alpha(char c) { return dap_ascii_isalnum(c); } +DAP_STATIC_INLINE int dap_is_digit(char c) { return dap_ascii_isdigit(c); } +DAP_STATIC_INLINE int dap_is_alpha_and_(char c) { return dap_is_alpha(c) || c == '_'; } char **dap_parse_items(const char *a_str, char a_delimiter, int *a_count, const int a_only_digit); - - -#define CRC32_POLY (0xEDB88320) -extern const unsigned int g_crc32c_table[]; - -static inline unsigned int dap_crc32c (unsigned int crc, const void *buf, size_t buflen) -{ -const unsigned char *p = (unsigned char *) buf; - - crc = crc ^ ~0U; - - while (buflen--) - crc = g_crc32c_table[(crc ^ *p++) & 0xFF] ^ (crc >> 8); - - return crc ^ ~0U; -} - - - - - +unsigned int dap_crc32c(unsigned int crc, const void *buf, size_t buflen); #ifdef __MINGW32__ int exec_silent(const char *a_cmd); diff --git a/dap-sdk/core/include/dap_strfuncs.h b/dap-sdk/core/include/dap_strfuncs.h index a14b12dc5c..10ff8dd796 100755 --- a/dap-sdk/core/include/dap_strfuncs.h +++ b/dap-sdk/core/include/dap_strfuncs.h @@ -41,7 +41,7 @@ extern "C" { #endif #ifdef _WIN32 -char *strptime( char *buff, const char *fmt, struct tm *tm ); +char *strptime(const char *buff, const char *fmt, struct tm *tm); #endif bool dap_isstralnum(const char *c); diff --git a/dap-sdk/core/include/dap_time.h b/dap-sdk/core/include/dap_time.h index 84121fbec0..0923f1389a 100644 --- a/dap-sdk/core/include/dap_time.h +++ b/dap-sdk/core/include/dap_time.h @@ -46,6 +46,7 @@ char* dap_gdb_ctime_r(dap_nanotime_t *a_time, char* a_buf); int dap_time_to_str_rfc822(char * out, size_t out_size_max, dap_time_t t); +dap_time_t dap_time_from_str_rfc822(const char *a_time_str); int dap_gbd_time_to_str_rfc822(char *a_out, size_t a_out_size_max, dap_nanotime_t a_chain_time); int timespec_diff(struct timespec *a_start, struct timespec *a_stop, struct timespec *a_result); diff --git a/dap-sdk/core/src/dap_common.c b/dap-sdk/core/src/dap_common.c index 623a1a9d3f..469c97bba4 100755 --- a/dap-sdk/core/src/dap_common.c +++ b/dap-sdk/core/src/dap_common.c @@ -383,17 +383,8 @@ void _log_it(const char *a_log_tag, enum dap_log_level a_ll, const char *a_fmt, pthread_mutex_unlock(&s_log_mutex); } - - - - - - - - - /* CRC32-C */ -const unsigned int g_crc32c_table[] = { +static const unsigned int s_crc32c_table[] = { 0x00000000, 0x77073096, 0xee0e612c, 0x990951ba, 0x076dc419, 0x706af48f, 0xe963a535, 0x9e6495a3, 0x0edb8832, 0x79dcb8a4, 0xe0d5e91e, 0x97d2d988, 0x09b64c2b, 0x7eb17cbd, 0xe7b82d07, 0x90bf1d91, 0x1db71064, 0x6ab020f2, @@ -439,8 +430,17 @@ const unsigned int g_crc32c_table[] = { 0xb40bbe37, 0xc30c8ea1, 0x5a05df1b, 0x2d02ef8d }; +#define CRC32_POLY (0xEDB88320) +unsigned int dap_crc32c(unsigned int crc, const void *buf, size_t buflen) +{ + const unsigned char *p = (unsigned char *) buf; + crc = crc ^ ~0U; + while (buflen--) + crc = s_crc32c_table[(crc ^ *p++) & 0xFF] ^ (crc >> 8); + return crc ^ ~0U; +} - +#ifdef DAP_SYS_DEBUG const char spaces[74] = {" "}; #define PID_FMT "%6d" @@ -499,8 +499,6 @@ struct timespec now; len = write(STDOUT_FILENO, out, olen); } - - void _dump_it ( const char *a_rtn_name, unsigned a_line_no, @@ -600,18 +598,7 @@ struct timespec now; len = write(STDOUT_FILENO, out, HEXDUMP$SZ_WIDTH); } } - - - - - - - - - - - - +#endif static int s_check_and_fill_buffer_log(char **m, struct tm *a_tm_st, char *a_tmp) { @@ -1246,91 +1233,6 @@ int dap_interval_timer_delete(void *a_timer) #endif // _WIN32 } -/** - * @brief dap_lendian_get16 Get uint16 from little endian memory - * @param a_buf a buffer read from - * @return uint16 in host endian memory - */ -uint16_t dap_lendian_get16(const uint8_t *a_buf) -{ - uint8_t u = *a_buf; - return (uint16_t)(*(a_buf + 1)) << 8 | u; -} - -/** - * @brief dap_lendian_put16 Put uint16 to little endian memory - * @param buf a buffer write to - * @param val uint16 in host endian memory - * @return none - */ -void dap_lendian_put16(uint8_t *a_buf, uint16_t a_val) -{ - *(a_buf) = a_val; - *(a_buf + 1) = a_val >> 8; -} - -/** - * @brief dap_lendian_get32 Get uint32 from little endian memory - * @param a_buf a buffer read from - * @return uint32 in host endian memory - */ -uint32_t dap_lendian_get32(const uint8_t *a_buf) -{ - uint16_t u = dap_lendian_get16(a_buf); - return (uint32_t)dap_lendian_get16(a_buf + 2) << 16 | u; -} - -/** - * @brief dap_lendian_put32 Put uint32 to little endian memory - * @param buf a buffer write to - * @param val uint32 in host endian memory - * @return none - */ -void dap_lendian_put32(uint8_t *a_buf, uint32_t a_val) -{ - dap_lendian_put16(a_buf, a_val); - dap_lendian_put16(a_buf + 2, a_val >> 16); -} - -/** - * @brief dap_lendian_get64 Get uint64 from little endian memory - * @param a_buf a buffer read from - * @return uint64 in host endian memory - */ -uint64_t dap_lendian_get64(const uint8_t *a_buf) -{ - uint32_t u = dap_lendian_get32(a_buf); - return (uint64_t)dap_lendian_get32(a_buf + 4) << 32 | u; -} - -/** - * @brief dap_lendian_put64 Put uint64 to little endian memory - * @param buf a buffer write to - * @param val uint64 in host endian memory - * @return none - */ -void dap_lendian_put64(uint8_t *a_buf, uint64_t a_val) -{ - dap_lendian_put32(a_buf, a_val); - dap_lendian_put32(a_buf + 4, a_val >> 32); -} - -int dap_is_alpha_and_(char e) -{ - if ((e >= '0' && e <= '9')||(e >= 'a' && e <= 'z')||(e >= 'A' && e <= 'Z')||(e == '_')) return 1; - return 0; -} - -int dap_is_alpha(char e) -{ - if ((e >= '0' && e <= '9')||(e >= 'a' && e <= 'z')||(e >= 'A' && e <= 'Z')) return 1; - return 0; -} -int dap_is_digit(char e) -{ - if ((e >= '0' && e <= '9')) return 1; - return 0; -} char **dap_parse_items(const char *a_str, char a_delimiter, int *a_count, const int a_only_digit) { int l_count_temp = *a_count = 0; diff --git a/dap-sdk/core/src/dap_strfuncs.c b/dap-sdk/core/src/dap_strfuncs.c index a0e88234b2..49d7c0e721 100755 --- a/dap-sdk/core/src/dap_strfuncs.c +++ b/dap-sdk/core/src/dap_strfuncs.c @@ -965,11 +965,13 @@ char* dap_strreverse(char *a_string) } #ifdef _WIN32 -char *strptime( char *buff, const char *fmt, struct tm *tm ) { - uint32_t len = strlen( buff ); - dap_sscanf( buff,"%u.%u.%u_%u.%u.%u",&tm->tm_year, &tm->tm_mon, &tm->tm_mday, &tm->tm_hour, &tm->tm_min, &tm->tm_sec ); - tm->tm_year += 2000; - return buff + len; +char *strptime(const char *buff, const char *fmt, struct tm *tm) +{ + UNUSED(fmt); + uint32_t len = strlen(buff); + dap_sscanf(buff, "%u.%u.%u_%u.%u.%u", &tm->tm_year, &tm->tm_mon, &tm->tm_mday, &tm->tm_hour, &tm->tm_min, &tm->tm_sec); + tm->tm_year += 2000; + return (char *)buff + len; } /** diff --git a/dap-sdk/core/src/dap_time.c b/dap-sdk/core/src/dap_time.c index 00e7f1f4bc..cf686bccb5 100644 --- a/dap-sdk/core/src/dap_time.c +++ b/dap-sdk/core/src/dap_time.c @@ -4,9 +4,9 @@ #include <errno.h> #include <string.h> #include <time.h> - #include "dap_common.h" #include "dap_time.h" +#include "dap_strfuncs.h" #define LOG_TAG "dap_common" @@ -157,6 +157,24 @@ int dap_time_to_str_rfc822(char * a_out, size_t a_out_size_max, dap_time_t a_t) return l_ret; } +/** + * @brief Get time_t from string with RFC822 formatted [%y%m%d = 220610 = 10 june 2022] + * @param[out] a_time_str + * @return time from string or 0 if bad time format + */ +dap_time_t dap_time_from_str_rfc822(const char *a_time_str) +{ + dap_time_t l_time = 0; + if(!a_time_str) { + return l_time; + } + struct tm l_tm; + memset(&l_tm, 0, sizeof(struct tm)); + strptime(a_time_str, "%y%m%d%H%M%S", &l_tm); + l_time = mktime(&l_tm); + return l_time; +} + /** * @brief time_to_rfc822 Convert dap_chain_time_t to string with RFC822 formatted date and time * @param[out] out Output buffer diff --git a/dap-sdk/crypto/src/dap_cert_file.c b/dap-sdk/crypto/src/dap_cert_file.c index fdbfad91bf..a4c25d7257 100755 --- a/dap-sdk/crypto/src/dap_cert_file.c +++ b/dap-sdk/crypto/src/dap_cert_file.c @@ -112,7 +112,7 @@ void dap_cert_deserialize_meta(dap_cert_t *a_cert, const uint8_t *a_data, size_t break; } l_mem_shift += strlen(l_key_str) + 1; - uint32_t l_value_size = dap_lendian_get32(&a_data[l_mem_shift]); + uint32_t l_value_size = le32toh(a_data[l_mem_shift]); l_mem_shift += sizeof(uint32_t); dap_cert_metadata_type_t l_meta_type = (dap_cert_metadata_type_t)a_data[l_mem_shift++]; const uint8_t *l_value = &a_data[l_mem_shift]; @@ -130,16 +130,16 @@ void dap_cert_deserialize_meta(dap_cert_t *a_cert, const uint8_t *a_data, size_t case 1: break; case 2: - l_tmp16 = dap_lendian_get16(l_value); + l_tmp16 = le16toh(*l_value); l_value = (const uint8_t *)&l_tmp16; break; case 4: - l_tmp32 = dap_lendian_get32(l_value); + l_tmp32 = le32toh(*l_value); l_value = (const uint8_t *)&l_tmp32; break; case 8: default: - l_tmp64 = dap_lendian_get64(l_value); + l_tmp64 = le64toh(*l_value); l_value = (const uint8_t *)&l_tmp64; break; } @@ -199,7 +199,7 @@ uint8_t *dap_cert_serialize_meta(dap_cert_t *a_cert, size_t *a_buflen_out) } strcpy((char *)&l_buf[l_mem_shift], l_meta_item->key); l_mem_shift += strlen(l_meta_item->key) + 1; - dap_lendian_put32(&l_buf[l_mem_shift], l_meta_item->length); + *(uint32_t *)&l_buf[l_mem_shift] = htole32(l_meta_item->length); l_mem_shift += sizeof(uint32_t); l_buf[l_mem_shift++] = l_meta_item->type; switch (l_meta_item->type) { @@ -215,16 +215,16 @@ uint8_t *dap_cert_serialize_meta(dap_cert_t *a_cert, size_t *a_buflen_out) l_buf[l_mem_shift++] = l_meta_item->value[0]; break; case 2: - dap_lendian_put16(&l_buf[l_mem_shift], *(uint16_t *)&l_meta_item->value[0]); + *(uint16_t *)&l_buf[l_mem_shift] = htole16(*(uint16_t *)&l_meta_item->value[0]); l_mem_shift += 2; break; case 4: - dap_lendian_put32(&l_buf[l_mem_shift], *(uint32_t *)&l_meta_item->value[0]); + *(uint32_t *)&l_buf[l_mem_shift] = htole32(*(uint32_t *)&l_meta_item->value[0]); l_mem_shift += 4; break; case 8: default: - dap_lendian_put64(&l_buf[l_mem_shift], *(uint64_t *)&l_meta_item->value[0]); + *(uint64_t *)&l_buf[l_mem_shift] = htole64(*(uint64_t *)&l_meta_item->value[0]); l_mem_shift += 8; break; } diff --git a/dap-sdk/io/dap_context.c b/dap-sdk/io/dap_context.c index 72612e42d6..a15e4c3939 100644 --- a/dap-sdk/io/dap_context.c +++ b/dap-sdk/io/dap_context.c @@ -546,16 +546,7 @@ static int s_thread_loop(dap_context_t * a_context) l_cur->callbacks.error_callback(l_cur, l_sock_err); // Call callback to process error event } - /*if (l_flag_hup) { - log_it(L_INFO, "Client socket disconnected"); - dap_events_socket_set_readable_unsafe(l_cur, false); - dap_events_socket_set_writable_unsafe(l_cur, false); - l_cur->buf_out_size = 0; - l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE; - - }*/ - - if(l_flag_read) { + if (l_flag_read && !(l_cur->flags & DAP_SOCK_SIGNAL_CLOSE)) { //log_it(L_DEBUG, "Comes connection with type %d", l_cur->type); if(l_cur->buf_in_size_max && l_cur->buf_in_size >= l_cur->buf_in_size_max ) { @@ -666,16 +657,10 @@ static int s_thread_loop(dap_context_t * a_context) } break; case DESCRIPTOR_TYPE_QUEUE: -#ifdef DAP_OS_WINDOWS - l_bytes_read = dap_recvfrom(l_cur->socket, NULL, 0); -#endif dap_events_socket_queue_proc_input_unsafe(l_cur); dap_events_socket_set_writable_unsafe(l_cur, false); break; case DESCRIPTOR_TYPE_EVENT: -#ifdef DAP_OS_WINDOWS - l_bytes_read = dap_recvfrom(l_cur->socket, NULL, 0); -#endif dap_events_socket_event_proc_input_unsafe(l_cur); break; } @@ -801,7 +786,8 @@ static int s_thread_loop(dap_context_t * a_context) l_bytes_sent = 0; - if (l_flag_write && (l_cur->flags & DAP_SOCK_READY_TO_WRITE) && !(l_cur->flags & DAP_SOCK_SIGNAL_CLOSE)) { + if (l_flag_write && (l_cur->flags & DAP_SOCK_READY_TO_WRITE) && + !(l_cur->flags & DAP_SOCK_SIGNAL_CLOSE) && !(l_cur->flags & DAP_SOCK_CONNECTING)) { debug_if (g_debug_reactor, L_DEBUG, "Main loop output: %zu bytes to send", l_cur->buf_out_size); /* * Socket is ready to write and not going to close @@ -811,7 +797,7 @@ static int s_thread_loop(dap_context_t * a_context) dap_events_socket_set_writable_unsafe(l_cur, false); /* Clear "enable write flag" */ if ( l_cur->callbacks.write_finished_callback ) /* Optionaly call I/O completion routine */ - l_cur->callbacks.write_finished_callback(l_cur, l_cur->callbacks.arg, l_errno); + l_cur->callbacks.write_finished_callback(l_cur, l_cur->callbacks.arg, 0); l_flag_write = 0; /* Clear flag to exclude unecessary processing of output */ } @@ -824,12 +810,14 @@ static int s_thread_loop(dap_context_t * a_context) case DESCRIPTOR_TYPE_SOCKET_CLIENT: { l_bytes_sent = send(l_cur->socket, (const char *)l_cur->buf_out, l_cur->buf_out_size, MSG_DONTWAIT | MSG_NOSIGNAL); + if (l_bytes_sent == -1) #ifdef DAP_OS_WINDOWS - //dap_events_socket_set_writable_unsafe(l_cur,false); // enabling this will break windows server replies - l_errno = WSAGetLastError(); + l_errno = WSAGetLastError(); #else - l_errno = errno; + l_errno = errno; #endif + else + l_errno = 0; } break; case DESCRIPTOR_TYPE_SOCKET_UDP: @@ -853,43 +841,41 @@ static int s_thread_loop(dap_context_t * a_context) #endif } case DESCRIPTOR_TYPE_QUEUE: - if (l_cur->flags & DAP_SOCK_QUEUE_PTR && l_cur->buf_out_size>= sizeof (void*)){ + if (l_cur->flags & DAP_SOCK_QUEUE_PTR && l_cur->buf_out_size>= sizeof (void*)) { #if defined(DAP_EVENTS_CAPS_QUEUE_PIPE2) l_bytes_sent = write(l_cur->socket, l_cur->buf_out, sizeof (void *) ); // We send pointer by pointer #elif defined (DAP_EVENTS_CAPS_QUEUE_POSIX) l_bytes_sent = mq_send(a_es->mqd, (const char *)&a_arg,sizeof (a_arg),0); #elif defined DAP_EVENTS_CAPS_MSMQ - DWORD l_mp_id = 0; - MQMSGPROPS l_mps; - MQPROPVARIANT l_mpvar[1]; - MSGPROPID l_p_id[1]; - HRESULT l_mstatus[1]; - - l_p_id[l_mp_id] = PROPID_M_BODY; - l_mpvar[l_mp_id].vt = VT_VECTOR | VT_UI1; - l_mpvar[l_mp_id].caub.pElems = l_cur->buf_out; - l_mpvar[l_mp_id].caub.cElems = (u_long)sizeof(void*); - l_mp_id++; - - l_mps.cProp = l_mp_id; - l_mps.aPropID = l_p_id; - l_mps.aPropVar = l_mpvar; - l_mps.aStatus = l_mstatus; - HRESULT hr = MQSendMessage(l_cur->mqh, &l_mps, MQ_NO_TRANSACTION); - - if (hr != MQ_OK) { - l_errno = hr; - log_it(L_ERROR, "An error occured on sending message to queue, errno: %ld", hr); - break; - } else { - l_errno = WSAGetLastError(); - - if(dap_sendto(l_cur->socket, l_cur->port, NULL, 0) == SOCKET_ERROR) { - log_it(L_ERROR, "Write to socket error: %d", WSAGetLastError()); - } - l_bytes_sent = sizeof(void*); - - } + DWORD l_mp_id = 0; + MQMSGPROPS l_mps; + MQPROPVARIANT l_mpvar[1]; + MSGPROPID l_p_id[1]; + HRESULT l_mstatus[1]; + + l_p_id[l_mp_id] = PROPID_M_BODY; + l_mpvar[l_mp_id].vt = VT_VECTOR | VT_UI1; + l_mpvar[l_mp_id].caub.pElems = l_cur->buf_out; + l_mpvar[l_mp_id].caub.cElems = (u_long)sizeof(void*); + l_mp_id++; + l_mps.cProp = l_mp_id; + l_mps.aPropID = l_p_id; + l_mps.aPropVar = l_mpvar; + l_mps.aStatus = l_mstatus; + HRESULT hr = MQSendMessage(l_cur->mqh, &l_mps, MQ_NO_TRANSACTION); + + if (hr != MQ_OK) { + l_errno = hr; + log_it(L_ERROR, "An error occured on sending message to queue, errno: %ld", hr); + break; + } else { + l_errno = WSAGetLastError(); + + if(dap_sendto(l_cur->socket, l_cur->port, NULL, 0) == SOCKET_ERROR) { + log_it(L_ERROR, "Write to socket error: %d", WSAGetLastError()); + } + l_bytes_sent = sizeof(void*); + } #elif defined (DAP_EVENTS_CAPS_QUEUE_MQUEUE) l_bytes_sent = mq_send(l_cur->mqd , (const char *)l_cur->buf_out,sizeof (void*),0); if(l_bytes_sent == 0) @@ -1299,7 +1285,7 @@ int dap_context_add(dap_context_t * a_context, dap_events_socket_t * a_es ) #else #error "Unimplemented new esocket on context callback for current platform" #endif -lb_exit: +//lb_exit: if ( l_is_error ){ char l_errbuf[128]; l_errbuf[0]=0; diff --git a/dap-sdk/io/dap_events_socket.c b/dap-sdk/io/dap_events_socket.c index 8da9918f4c..5ba1761611 100644 --- a/dap-sdk/io/dap_events_socket.c +++ b/dap-sdk/io/dap_events_socket.c @@ -536,6 +536,13 @@ dap_events_socket_t * dap_events_socket_create_type_queue_ptr_mt(dap_worker_t * */ int dap_events_socket_queue_proc_input_unsafe(dap_events_socket_t * a_esocket) { +#ifdef DAP_OS_WINDOWS + int l_read = dap_recvfrom(a_esocket->socket, a_esocket->buf_in, a_esocket->buf_in_size_max); + if (l_read == SOCKET_ERROR) { + log_it(L_ERROR, "Queue socket %zu received invalid data, error %d", a_esocket->socket, WSAGetLastError()); + return -1; + } +#endif if (a_esocket->callbacks.queue_callback){ if (a_esocket->flags & DAP_SOCK_QUEUE_PTR){ void * l_queue_ptr = NULL; @@ -599,7 +606,7 @@ int dap_events_socket_queue_proc_input_unsafe(dap_events_socket_t * a_esocket) MQPROPVARIANT l_mpvar[2]; MSGPROPID l_p_id[2]; - UCHAR l_body[1024] = { 0 }; + UCHAR l_body[64] = { 0 }; l_p_id[l_mp_id] = PROPID_M_BODY; l_mpvar[l_mp_id].vt = VT_UI1 | VT_VECTOR; l_mpvar[l_mp_id].caub.cElems = sizeof(l_body); @@ -615,22 +622,22 @@ int dap_events_socket_queue_proc_input_unsafe(dap_events_socket_t * a_esocket) l_mps.aPropVar = l_mpvar; l_mps.aStatus = NULL; - HRESULT hr = MQReceiveMessage(a_esocket->mqh_recv, 1000, MQ_ACTION_RECEIVE, &l_mps, NULL, NULL, NULL, MQ_NO_TRANSACTION); - if (hr != MQ_OK) { - log_it(L_ERROR, "An error %ld occured receiving a message from queue", hr); - return -1; - } - if (l_mpvar[1].ulVal % sizeof(void*)) { - log_it(L_ERROR, "Queue message size incorrect: %lu", l_mpvar[1].ulVal); - if (l_mpvar[1].ulVal < sizeof(void*)) { - log_it(L_ERROR, "Queue socket %zu received invalid data", a_esocket->socket); - return -1; + HRESULT hr; + while ((hr = MQReceiveMessage(a_esocket->mqh_recv, 0, MQ_ACTION_RECEIVE, &l_mps, NULL, NULL, NULL, MQ_NO_TRANSACTION)) + != MQ_ERROR_IO_TIMEOUT) { + if (hr != MQ_OK) { + log_it(L_ERROR, "An error %ld occured receiving a message from queue", hr); + return -3; + } + debug_if(g_debug_reactor, L_DEBUG, "Received msg: %p len %lu", *(void **)l_body, l_mpvar[1].ulVal); + if (l_mpvar[1].ulVal != sizeof(void*)) { + log_it(L_ERROR, "Queue message size incorrect: %lu", l_mpvar[1].ulVal); + continue; + } + if (a_esocket->callbacks.queue_ptr_callback) { + l_queue_ptr = *(void **)l_body; + a_esocket->callbacks.queue_ptr_callback(a_esocket, l_queue_ptr); } - } - for (u_int pad = 0; pad < l_mpvar[1].ulVal; pad += sizeof(void*)) { - memcpy(&l_queue_ptr, l_body + pad, sizeof(void*)); - if(a_esocket->callbacks.queue_ptr_callback) - a_esocket->callbacks.queue_ptr_callback (a_esocket, l_queue_ptr); } #elif defined DAP_EVENTS_CAPS_KQUEUE l_queue_ptr = (void*) a_esocket->kqueue_event_catched_data.data; @@ -642,26 +649,20 @@ int dap_events_socket_queue_proc_input_unsafe(dap_events_socket_t * a_esocket) #error "No Queue fetch mechanism implemented on your platform" #endif } else { -#ifdef DAP_OS_WINDOWS - int l_read = dap_recvfrom(a_esocket->socket, a_esocket->buf_in, a_esocket->buf_in_size_max); - if (l_read == SOCKET_ERROR) { - log_it(L_ERROR, "Queue socket %zu received invalid data, error %d", a_esocket->socket, WSAGetLastError()); - return -1; - } -#elif defined (DAP_EVENTS_CAPS_KQUEUE) - void * l_queue_ptr = a_esocket->kqueue_event_catched_data.data; - size_t l_queue_ptr_size = a_esocket->kqueue_event_catched_data.size; - if(g_debug_reactor) - log_it(L_INFO,"Queue received %zd bytes on input", l_queue_ptr_size); - - a_esocket->callbacks.queue_callback(a_esocket, l_queue_ptr, l_queue_ptr_size); -#else +#ifdef DAP_EVENTS_CAPS_KQUEUE + void * l_queue_ptr = a_esocket->kqueue_event_catched_data.data; + size_t l_queue_ptr_size = a_esocket->kqueue_event_catched_data.size; + if(g_debug_reactor) + log_it(L_INFO,"Queue received %zd bytes on input", l_queue_ptr_size); + + a_esocket->callbacks.queue_callback(a_esocket, l_queue_ptr, l_queue_ptr_size); +#elif !defined(DAP_OS_WINDOWS) size_t l_read = read(a_esocket->socket, a_esocket->buf_in, a_esocket->buf_in_size_max ); #endif } }else{ log_it(L_ERROR, "Queue socket %"DAP_FORMAT_SOCKET" accepted data but callback is NULL ", a_esocket->socket); - return -1; + return -2; } return 0; } @@ -1613,6 +1614,8 @@ size_t dap_events_socket_write_f_mt(dap_worker_t * a_w,dap_events_socket_uuid_t */ size_t dap_events_socket_write_unsafe(dap_events_socket_t *a_es, const void * a_data, size_t a_data_size) { + if (a_es->flags & DAP_SOCK_SIGNAL_CLOSE) + return 0; if ( (a_es->buf_out_size + a_data_size) > a_es->buf_out_size_max) { if ((a_es->buf_out_size_max + a_data_size) > DAP_EVENTS_SOCKET_BUF_LIMIT) { log_it(L_ERROR, "Write esocket (%p) buffer overflow size=%zu/max=%zu", a_es, a_es->buf_out_size_max, (size_t)DAP_EVENTS_SOCKET_BUF_LIMIT); diff --git a/dap-sdk/io/dap_proc_queue.c b/dap-sdk/io/dap_proc_queue.c index 95f2247e0f..da7c97acf0 100644 --- a/dap-sdk/io/dap_proc_queue.c +++ b/dap-sdk/io/dap_proc_queue.c @@ -179,31 +179,6 @@ static void s_queue_esocket_callback( dap_events_socket_t * a_es, void * a_msg) DAP_DELETE(l_msg); } -/** - * @brief dap_proc_queue_add_callback - * @param a_worker - * @param a_callback - * @param a_callback_arg - * @return: -ENOMEM in case of memory allocation error - * other <errno> codes from the internaly called routine - */ -int dap_proc_queue_add_callback(dap_worker_t * a_worker,dap_proc_queue_callback_t a_callback, void * a_callback_arg) -{ - dap_proc_queue_msg_t * l_msg = DAP_NEW_Z(dap_proc_queue_msg_t); - - if (!l_msg) - return -ENOMEM; - - l_msg->callback = a_callback; - l_msg->callback_arg = a_callback_arg; - l_msg->pri = DAP_PROC_PRI_NORMAL; - /* - * Send message to queue with the DEFAULT priority - */ - return dap_events_socket_queue_ptr_send( a_worker->proc_queue->esocket , l_msg ); -} - - /** * @brief dap_proc_queue_add_callback * @param a_worker @@ -231,36 +206,13 @@ dap_proc_queue_msg_t *l_msg; l_msg->callback_arg = a_callback_arg; l_msg->pri = a_pri; + debug_if(g_debug_reactor, L_DEBUG, "Requested l_msg:%p, callback: %p/%p, pri: %d", l_msg, l_msg->callback, l_msg->callback_arg, l_msg->pri); /* * Send message to queue with the given priority */ return dap_events_socket_queue_ptr_send ( a_worker->proc_queue->esocket , l_msg); } - -/** - * @brief dap_proc_queue_add_callback_inter - * @param a_es_input - * @param a_callback - * @param a_callback_arg - * @return: -ENOMEM in case of memory allocation error - * other <errno> codes from the internaly called routine - */ -int dap_proc_queue_add_callback_inter( dap_events_socket_t * a_es_input, dap_proc_queue_callback_t a_callback, void * a_callback_arg) -{ - dap_proc_queue_msg_t * l_msg = DAP_NEW_Z(dap_proc_queue_msg_t); - - if (!l_msg) - return -ENOMEM; - - l_msg->callback = a_callback; - l_msg->callback_arg = a_callback_arg; - l_msg->pri = DAP_PROC_PRI_NORMAL; - - return dap_events_socket_queue_ptr_send_to_input( a_es_input , l_msg ); -} - - /** * @brief dap_proc_queue_add_callback_inter * @param a_es_input @@ -287,6 +239,7 @@ dap_proc_queue_msg_t *l_msg; l_msg->callback_arg = a_callback_arg; l_msg->pri = a_pri; + debug_if(g_debug_reactor, L_DEBUG, "Requested inter l_msg:%p, callback: %p/%p, pri: %d", l_msg, l_msg->callback, l_msg->callback_arg, l_msg->pri); return dap_events_socket_queue_ptr_send_to_input( a_es_input , l_msg ); } diff --git a/dap-sdk/io/dap_timerfd.c b/dap-sdk/io/dap_timerfd.c index 5b1f39b608..4d9b6f0fc9 100644 --- a/dap-sdk/io/dap_timerfd.c +++ b/dap-sdk/io/dap_timerfd.c @@ -196,14 +196,7 @@ dap_timerfd_t* dap_timerfd_create(uint64_t a_timeout_ms, dap_timerfd_callback_t l_events_socket->kqueue_data =(int64_t) a_timeout_ms; #endif - #elif defined (DAP_OS_WINDOWS) - /*HANDLE l_th = CreateWaitableTimer(NULL, true, NULL); - if (!l_th) { - log_it(L_CRITICAL, "Waitable timer not created, error %d", GetLastError()); - DAP_DELETE(l_timerfd); - return NULL; - }*/ l_timerfd->th = NULL; SOCKET l_tfd = socket(AF_INET, SOCK_DGRAM, 0); int buffsize = 1024; @@ -218,7 +211,7 @@ dap_timerfd_t* dap_timerfd_create(uint64_t a_timeout_ms, dap_timerfd_callback_t l_addr.sin_family = AF_INET; IN_ADDR _in_addr = { { .S_addr = htonl(INADDR_LOOPBACK) } }; l_addr.sin_addr = _in_addr; - l_addr.sin_port = 0; //l_tfd + 32768; + l_addr.sin_port = l_tfd + 32768; l_addr_len = sizeof(struct sockaddr_in); if (bind(l_tfd, (struct sockaddr*)&l_addr, sizeof(l_addr)) < 0) { log_it(L_ERROR, "Bind error: %d", WSAGetLastError()); @@ -229,16 +222,8 @@ dap_timerfd_t* dap_timerfd_create(uint64_t a_timeout_ms, dap_timerfd_callback_t //log_it(L_DEBUG, "Bound to port %d", l_addr.sin_port); } - /*LARGE_INTEGER l_due_time; - l_due_time.QuadPart = (long long)a_timeout_ms * _MSEC; - if (!SetWaitableTimer(l_th, &l_due_time, 0, TimerAPCb, l_timerfd, false)) { - log_it(L_CRITICAL, "Waitable timer not set, error %d", GetLastError()); - CloseHandle(l_th); - DAP_DELETE(l_timerfd); - return NULL; - } */ if (!CreateTimerQueueTimer(&l_timerfd->th, hTimerQueue, - (WAITORTIMERCALLBACK)TimerRoutine, l_timerfd, (unsigned long)a_timeout_ms, 0, 0)) { + (WAITORTIMERCALLBACK)TimerRoutine, l_timerfd, (DWORD)a_timeout_ms, 0, 0)) { log_it(L_CRITICAL, "Timer not set, error %lu", GetLastError()); DAP_DELETE(l_timerfd); return NULL; @@ -247,11 +232,8 @@ dap_timerfd_t* dap_timerfd_create(uint64_t a_timeout_ms, dap_timerfd_callback_t #endif #if defined (DAP_OS_LINUX) || defined (DAP_OS_WINDOWS) - l_timerfd->tfd = l_tfd; + l_timerfd->tfd = l_tfd; #endif -//#ifdef DAP_OS_WINDOWS - //l_timerfd->th = l_th; -//#endif return l_timerfd; } @@ -274,12 +256,12 @@ static inline void s_timerfd_reset(dap_timerfd_t *a_timerfd, dap_events_socket_t a_es->context = NULL; dap_context_add(l_context,a_es); #elif defined (DAP_OS_WINDOWS) - /*LARGE_INTEGER l_due_time; - l_due_time.QuadPart = (long long)a_timerfd->timeout_ms * _MSEC; - if (!SetWaitableTimer(a_timerfd->th, &l_due_time, 0, TimerAPCb, a_timerfd, false)) { - log_it(L_CRITICAL, "Waitable timer not reset, error %d", GetLastError()); - CloseHandle(a_timerfd->th); - }*/ // Wtf is this entire thing for?... + // Doesn't work with one-shot timers + //if (!ChangeTimerQueueTimer(hTimerQueue, a_timerfd->th, (DWORD)a_timerfd->timeout_ms, 0)) + DeleteTimerQueueTimer(hTimerQueue, a_timerfd->th, NULL); + if (!CreateTimerQueueTimer(&a_timerfd->th, hTimerQueue, + (WAITORTIMERCALLBACK)TimerRoutine, a_timerfd, (DWORD)a_timerfd->timeout_ms, 0, 0)) + log_it(L_CRITICAL, "Timer not reset, error %lu", GetLastError()); #else #error "No timer reset realization for your platform" #endif @@ -289,9 +271,6 @@ static inline void s_timerfd_reset(dap_timerfd_t *a_timerfd, dap_events_socket_t #endif } - - - /** * @brief s_es_callback_timer * @param a_event_sock @@ -302,7 +281,10 @@ static void s_es_callback_timer(struct dap_events_socket *a_event_sock) // run user's callback if(l_timerfd->callback && l_timerfd->callback(l_timerfd->callback_arg)) { s_timerfd_reset(l_timerfd, a_event_sock); - } else { + } else { +#ifdef _WIN32 + DeleteTimerQueueTimer(hTimerQueue, l_timerfd->th, NULL); +#endif l_timerfd->events_socket->flags |= DAP_SOCK_SIGNAL_CLOSE; } } @@ -363,10 +345,10 @@ void dap_timerfd_reset(dap_timerfd_t *a_timerfd) void dap_timerfd_delete(dap_timerfd_t *a_timerfd) { if (!a_timerfd) - return; - #ifdef _WIN32 - DeleteTimerQueueTimer(hTimerQueue, (HANDLE)a_timerfd->th, NULL); - #endif + return; +#ifdef _WIN32 + DeleteTimerQueueTimer(hTimerQueue, l_timerfd->th, NULL); +#endif if (a_timerfd->events_socket->context->worker) dap_events_socket_remove_and_delete_mt(a_timerfd->events_socket->context->worker, a_timerfd->esocket_uuid); } diff --git a/dap-sdk/io/include/dap_events_socket.h b/dap-sdk/io/include/dap_events_socket.h index 93763d783b..e95ff88cca 100644 --- a/dap-sdk/io/include/dap_events_socket.h +++ b/dap-sdk/io/include/dap_events_socket.h @@ -159,7 +159,7 @@ typedef struct dap_events_socket_callbacks { void *arg; /* Callbacks argument */ } dap_events_socket_callbacks_t; -#define DAP_STREAM_PKT_SIZE_MAX (1 * 1024 * 1024) +#define DAP_STREAM_PKT_SIZE_MAX (2 * 1024 * 1024) #define DAP_EVENTS_SOCKET_BUF DAP_STREAM_PKT_SIZE_MAX #define DAP_EVENTS_SOCKET_BUF_LIMIT (DAP_STREAM_PKT_SIZE_MAX * 4) #define DAP_QUEUE_MAX_MSGS 512 diff --git a/dap-sdk/io/include/dap_proc_queue.h b/dap-sdk/io/include/dap_proc_queue.h index 97ca1f2628..756d949b44 100644 --- a/dap-sdk/io/include/dap_proc_queue.h +++ b/dap-sdk/io/include/dap_proc_queue.h @@ -72,10 +72,16 @@ dap_proc_queue_t *dap_proc_queue_create(dap_proc_thread_t * a_thread); dap_proc_queue_t *dap_proc_queue_create_ext(dap_proc_thread_t * a_thread); int dap_proc_queue_delete(dap_proc_queue_t * a_queue); -int dap_proc_queue_add_callback(dap_worker_t * a_worker, dap_proc_queue_callback_t a_callback, void * a_callback_arg); -int dap_proc_queue_add_callback_inter(dap_events_socket_t * a_es_input, dap_proc_queue_callback_t a_callback, void * a_callback_arg); int dap_proc_queue_add_callback_ext(dap_worker_t * a_worker, dap_proc_queue_callback_t a_callback, void * a_callback_arg, int a_pri); int dap_proc_queue_add_callback_inter_ext(dap_events_socket_t * a_es_input, dap_proc_queue_callback_t a_callback, void * a_callback_arg, int ); +DAP_STATIC_INLINE int dap_proc_queue_add_callback(dap_worker_t *a_worker, dap_proc_queue_callback_t a_callback, void *a_callback_arg) +{ + return dap_proc_queue_add_callback_ext(a_worker, a_callback, a_callback_arg, DAP_PROC_PRI_NORMAL); +} +DAP_STATIC_INLINE int dap_proc_queue_add_callback_inter(dap_events_socket_t *a_es_input, dap_proc_queue_callback_t a_callback, void *a_callback_arg) +{ + return dap_proc_queue_add_callback_inter_ext(a_es_input, a_callback, a_callback_arg, DAP_PROC_PRI_NORMAL); +} int dap_proc_thread_add_callback_mt(dap_proc_thread_t * a_thread, dap_proc_queue_callback_t a_callback, void * a_callback_arg, int a_pri); diff --git a/modules/chain/dap_chain_ledger.c b/modules/chain/dap_chain_ledger.c index f9fd5b30ac..3e1bc0c462 100644 --- a/modules/chain/dap_chain_ledger.c +++ b/modules/chain/dap_chain_ledger.c @@ -378,7 +378,7 @@ int dap_chain_ledger_token_decl_add_check(dap_ledger_t *a_ledger, dap_chain_datu if (l_signs_approve == a_token->signs_valid){ return 0; } else { - log_it(L_WARNING, "The token declaration has %zu valid signatures out of %zu.", l_signs_approve, a_token->signs_valid); + log_it(L_WARNING, "The token declaration has %zu valid signatures out of %hu.", l_signs_approve, a_token->signs_valid); return -5; } } else { diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c index 181a9d2f1b..8b7fe73a61 100644 --- a/modules/channel/chain/dap_stream_ch_chain.c +++ b/modules/channel/chain/dap_stream_ch_chain.c @@ -95,7 +95,7 @@ struct sync_request }; static void s_ch_chain_go_idle(dap_stream_ch_chain_t *a_ch_chain); -static bool s_ch_chain_get_idle(dap_stream_ch_chain_t *a_ch_chain); +static inline bool s_ch_chain_get_idle(dap_stream_ch_chain_t *a_ch_chain) { return a_ch_chain->state == CHAIN_STATE_IDLE; } static void s_stream_ch_new(dap_stream_ch_t* a_ch, void* a_arg); static void s_stream_ch_delete(dap_stream_ch_t* a_ch, void* a_arg); @@ -116,6 +116,10 @@ static bool s_gdb_in_pkt_proc_callback(dap_proc_thread_t *a_thread, void *a_arg) static void s_gdb_in_pkt_error_worker_callback(dap_worker_t *a_thread, void *a_arg); static void s_free_log_list_gdb ( dap_stream_ch_chain_t * a_ch_chain); +static void s_stream_ch_chain_pkt_write(dap_stream_ch_t *a_ch, uint8_t a_type, uint64_t a_net_id, + uint64_t a_chain_id, uint64_t a_cell_id, + const void * a_data, size_t a_data_size); + static bool s_debug_more=false; static uint_fast16_t s_update_pack_size=100; // Number of hashes packed into the one packet static uint_fast16_t s_skip_in_reactor_count=50; // Number of hashes packed to skip in one reactor loop callback out packet @@ -159,7 +163,6 @@ void s_stream_ch_new(dap_stream_ch_t* a_ch, void* a_arg) a_ch->internal = DAP_NEW_Z(dap_stream_ch_chain_t); dap_stream_ch_chain_t * l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch); l_ch_chain->_inheritor = a_ch; - pthread_rwlock_init(&l_ch_chain->idle_lock, NULL); a_ch->stream->esocket->callbacks.write_finished_callback = s_stream_ch_io_complete; } @@ -178,7 +181,6 @@ static void s_stream_ch_delete_in_proc(dap_worker_t *a_worker, void *a_arg) l_ch_chain->callback_notify_arg); s_ch_chain_go_idle(l_ch_chain); s_free_log_list_gdb(l_ch_chain); - pthread_rwlock_destroy(&l_ch_chain->idle_lock); DAP_DELETE(l_ch_chain); } @@ -624,7 +626,7 @@ static void s_gdb_sync_tsd_worker_callback(dap_worker_t *a_worker, void *a_arg) memcpy(l_data_ptr, &l_sync_request->request.id_end, sizeof(uint64_t)); l_data_ptr += sizeof(uint64_t); memcpy(l_data_ptr, l_sync_request->gdb.sync_group, l_gr_len); - dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_TSD, + s_stream_ch_chain_pkt_write(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_TSD, l_sync_request->request_hdr.net_id.uint64, l_sync_request->request_hdr.chain_id.uint64, l_sync_request->request_hdr.cell_id.uint64, @@ -860,7 +862,7 @@ static bool s_chain_timer_callback(void *a_arg) return false; } dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); - if (l_ch_chain->timer_shots++ >= 3) { + if (l_ch_chain->timer_shots++ >= 500) { // 500 * 20 = 10 sec if (!s_ch_chain_get_idle(l_ch_chain)) { s_ch_chain_go_idle(l_ch_chain); if (l_ch_chain->callback_notify_packet_out) @@ -872,13 +874,17 @@ static bool s_chain_timer_callback(void *a_arg) DAP_DELETE(a_arg); l_ch_chain->activity_timer = NULL; return false; - } + } + if (l_ch_chain->state != CHAIN_STATE_WAITING && l_ch_chain->sent_breaks) + s_stream_ch_packet_out(l_ch, NULL); // Sending dumb packet with nothing to inform remote thats we're just skiping atoms of GDB's, nothing freezed - if (!l_ch_chain->timer_shots && l_ch_chain->state == CHAIN_STATE_SYNC_CHAINS) + if (l_ch_chain->state == CHAIN_STATE_SYNC_CHAINS && l_ch_chain->sent_breaks >= 150) { // 150 * 20 = 3 sec dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_TSD, l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64, NULL, 0); - if (!l_ch_chain->timer_shots && l_ch_chain->state == CHAIN_STATE_SYNC_GLOBAL_DB) { + l_ch_chain->sent_breaks = 0; + } + if (l_ch_chain->state == CHAIN_STATE_SYNC_GLOBAL_DB && l_ch_chain->sent_breaks >= 150) { // 150 * 20 = 3 sec if (s_debug_more) log_it(L_INFO, "Send one global_db TSD packet (rest=%zu/%zu items)", dap_db_log_list_get_count_rest(l_ch_chain->request_db_log), @@ -886,9 +892,8 @@ static bool s_chain_timer_callback(void *a_arg) dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_TSD, l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64, NULL, 0); + l_ch_chain->sent_breaks = 0; } - if (l_ch_chain->state != CHAIN_STATE_WAITING) - s_stream_ch_packet_out(l_ch, NULL); return true; } @@ -906,7 +911,7 @@ void dap_stream_ch_chain_timer_start(dap_stream_ch_chain_t *a_ch_chain) { dap_stream_ch_uuid_t *l_uuid = DAP_DUP(&DAP_STREAM_CH(a_ch_chain)->uuid); a_ch_chain->activity_timer = dap_timerfd_start_on_worker(DAP_STREAM_CH(a_ch_chain)->stream_worker->worker, - 3000, s_chain_timer_callback, (void *)l_uuid); + 20, s_chain_timer_callback, (void *)l_uuid); } /** @@ -1390,7 +1395,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) } break; case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS: { - if (dap_log_level_get()<= L_INFO){ + if (dap_log_level_get() <= L_INFO) { char *l_hash_from_str = dap_chain_hash_fast_to_str_new(&l_ch_chain->request.hash_from); char *l_hash_to_str = dap_chain_hash_fast_to_str_new(&l_ch_chain->request.hash_to); log_it(L_INFO, "In: SYNCED_CHAINS: between %s and %s",l_hash_from_str?l_hash_from_str:"(null)", @@ -1403,7 +1408,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) if (!l_ch_chain->callback_notify_packet_in) { // we haven't node client waitng, so reply to other side dap_chain_t *l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id); if (!l_chain) { - log_it(L_ERROR, "Invalid UPDATE_CHAINS packet from %s with ext_id %016"DAP_UINT64_FORMAT_x" net id 0x%016"DAP_UINT64_FORMAT_x" chain id 0x%016"DAP_UINT64_FORMAT_x" cell_id 0x%016"DAP_UINT64_FORMAT_x" in packet", a_ch->stream->esocket->remote_addr_str? + log_it(L_ERROR, "Invalid SYNCED_CHAINS packet from %s with ext_id %016"DAP_UINT64_FORMAT_x" net id 0x%016"DAP_UINT64_FORMAT_x" chain id 0x%016"DAP_UINT64_FORMAT_x" cell_id 0x%016"DAP_UINT64_FORMAT_x" in packet", a_ch->stream->esocket->remote_addr_str? a_ch->stream->esocket->remote_addr_str: "<unknown>", l_chain_pkt->hdr.ext_id, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64); @@ -1496,13 +1501,10 @@ static void s_free_log_list_gdb ( dap_stream_ch_chain_t * a_ch_chain) */ static void s_ch_chain_go_idle(dap_stream_ch_chain_t *a_ch_chain) { - //pthread_rwlock_wrlock(&a_ch_chain->idle_lock); if (a_ch_chain->state == CHAIN_STATE_IDLE) { - //pthread_rwlock_unlock(&a_ch_chain->idle_lock); return; } a_ch_chain->state = CHAIN_STATE_IDLE; - //pthread_rwlock_unlock(&a_ch_chain->idle_lock); if(s_debug_more) log_it(L_INFO, "Go in CHAIN_STATE_IDLE"); @@ -1526,14 +1528,6 @@ static void s_ch_chain_go_idle(dap_stream_ch_chain_t *a_ch_chain) a_ch_chain->remote_atoms = NULL; } -static bool s_ch_chain_get_idle(dap_stream_ch_chain_t *a_ch_chain) -{ - //pthread_rwlock_wrlock(&a_ch_chain->idle_lock); - bool ret = a_ch_chain->state == CHAIN_STATE_IDLE; - //pthread_rwlock_unlock(&a_ch_chain->idle_lock); - return ret; -} - struct chain_io_complete { dap_stream_ch_uuid_t ch_uuid; dap_stream_ch_chain_state_t state; @@ -1549,23 +1543,33 @@ static void s_stream_ch_io_complete(dap_events_socket_t *a_es, void *a_arg, int { if (a_errno) return; - if (!a_arg) { - if (a_es->callbacks.write_callback) - a_es->callbacks.write_callback(a_es, NULL); - return; - } - struct chain_io_complete *l_arg = (struct chain_io_complete *)a_arg; + dap_stream_t *l_stream = NULL; dap_client_pvt_t *l_client_pvt = DAP_ESOCKET_CLIENT_PVT(a_es); - if (l_client_pvt->stream) { - dap_stream_ch_t *l_ch = dap_stream_ch_find_by_uuid_unsafe(l_client_pvt->stream->stream_worker, l_arg->ch_uuid); - if (l_ch) { - DAP_STREAM_CH_CHAIN(l_ch)->state = l_arg->state; - dap_stream_ch_chain_pkt_write_unsafe(l_ch, l_arg->type, l_arg->net_id, l_arg->chain_id, - l_arg->cell_id, l_arg->data, l_arg->data_size); - } + if (l_client_pvt && dap_client_pvt_find(l_client_pvt->uuid) == l_client_pvt) + l_stream = l_client_pvt->stream; + else { + dap_http_client_t *l_http_client = DAP_HTTP_CLIENT(a_es); + if (l_http_client) + l_stream = DAP_STREAM(l_http_client); } - a_es->callbacks.arg = NULL; - DAP_DELETE(a_arg); + if (!l_stream) + return; + dap_stream_ch_t *l_ch = NULL; + for (size_t i = 0; i < l_stream->channel_count; i++) + if (l_stream->channel[i]->proc->id == dap_stream_ch_chain_get_id()) + l_ch = l_stream->channel[i]; + if (!l_ch) + return; + if (a_arg) { + struct chain_io_complete *l_arg = (struct chain_io_complete *)a_arg; + DAP_STREAM_CH_CHAIN(l_ch)->state = l_arg->state; + dap_stream_ch_chain_pkt_write_unsafe(l_ch, l_arg->type, l_arg->net_id, l_arg->chain_id, + l_arg->cell_id, l_arg->data, l_arg->data_size); + a_es->callbacks.arg = NULL; + DAP_DELETE(a_arg); + return; + } + s_stream_ch_packet_out(l_ch, NULL); } static void s_stream_ch_chain_pkt_write(dap_stream_ch_t *a_ch, uint8_t a_type, uint64_t a_net_id, @@ -1581,6 +1585,7 @@ static void s_stream_ch_chain_pkt_write(dap_stream_ch_t *a_ch, uint8_t a_type, u l_arg->state = DAP_STREAM_CH_CHAIN(a_ch)->state; DAP_STREAM_CH_CHAIN(a_ch)->state = CHAIN_STATE_WAITING; l_arg->type = a_type; + l_arg->net_id = a_net_id; l_arg->chain_id = a_chain_id; l_arg->cell_id = a_cell_id; l_arg->data_size = a_data_size; @@ -1601,9 +1606,7 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) UNUSED(a_arg); dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch); - bool l_go_idle = false; - bool l_timer_reset = false; - //pthread_rwlock_rdlock(&l_ch_chain->idle_lock); + bool l_go_idle = false, l_was_sent_smth = false; switch (l_ch_chain->state) { // Update list of global DB records to remote case CHAIN_STATE_UPDATE_GLOBAL_DB: { @@ -1614,11 +1617,11 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) l_obj = dap_db_log_list_get(l_ch_chain->request_db_log); if (!l_obj || DAP_POINTER_TO_SIZE(l_obj) == 1) break; - l_timer_reset = true; memcpy(&l_data[i].hash, &l_obj->hash, sizeof(dap_chain_hash_fast_t)); l_data[i].size = l_obj->pkt->data_size; } if (i) { + l_was_sent_smth = true; s_stream_ch_chain_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB, l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64, @@ -1627,6 +1630,7 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) if (s_debug_more) log_it(L_INFO, "Out: DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB"); } else if (!l_obj) { + l_was_sent_smth = true; l_ch_chain->request.node_addr.uint64 = dap_chain_net_get_cur_addr_int(dap_chain_net_by_id( l_ch_chain->request_hdr.net_id)); s_stream_ch_chain_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_END, @@ -1680,7 +1684,7 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) } } if (l_pkt_size) { - l_timer_reset = true; + l_was_sent_smth = true; // If request was from defined node_addr we update its state if (s_debug_more) log_it(L_INFO, "Send one global_db packet len=%zu (rest=%zu/%zu items)", l_pkt_size, @@ -1691,6 +1695,7 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) l_ch_chain->request_hdr.cell_id.uint64, l_pkt, l_pkt_size); DAP_DELETE(l_pkt); } else if (!l_obj) { + l_was_sent_smth = true; log_it( L_INFO,"Syncronized database: items syncronyzed %"DAP_UINT64_FORMAT_U" from %zu", l_ch_chain->stats_request_gdb_processed, dap_db_log_list_get_count(l_ch_chain->request_db_log)); // last message @@ -1707,7 +1712,6 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) // Update list of atoms to remote case CHAIN_STATE_UPDATE_CHAINS:{ - l_timer_reset = true; dap_stream_ch_chain_update_element_t *l_data = DAP_NEW_Z_SIZE(dap_stream_ch_chain_update_element_t, sizeof(dap_stream_ch_chain_update_element_t) * s_update_pack_size); size_t l_data_size=0; @@ -1719,6 +1723,7 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) l_ch_chain->request_atom_iter->chain->callback_atom_iter_get_next(l_ch_chain->request_atom_iter, NULL); } if (l_data_size){ + l_was_sent_smth = true; if(s_debug_more) log_it(L_DEBUG,"Out: UPDATE_CHAINS with %zu hashes sent", l_data_size / sizeof(dap_stream_ch_chain_update_element_t)); s_stream_ch_chain_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS, @@ -1728,6 +1733,7 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) l_data,l_data_size); } if(!l_data_size || !l_ch_chain->request_atom_iter){ // We over with all the hashes here + l_was_sent_smth = true; if(s_debug_more) log_it(L_INFO,"Out: UPDATE_CHAINS_END sent "); dap_stream_ch_chain_sync_request_t l_request = {}; @@ -1744,7 +1750,6 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) // Synchronize chains case CHAIN_STATE_SYNC_CHAINS: { - bool l_was_sent_smth=false; // Process one chain from l_ch_chain->request_atom_iter // Pack loop to skip quicker for(uint_fast16_t k=0; k<s_skip_in_reactor_count && @@ -1800,16 +1805,14 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS, NULL, 0, l_ch_chain->callback_notify_arg); } - if (!l_was_sent_smth) - l_ch_chain->timer_shots = -1; - else - l_timer_reset = true; } break; default: break; } - //pthread_rwlock_unlock(&l_ch_chain->idle_lock); if (l_go_idle) s_ch_chain_go_idle(l_ch_chain); - else if (l_timer_reset) - s_chain_timer_reset(l_ch_chain); + if (l_was_sent_smth) + l_ch_chain->sent_breaks = 0; + else + l_ch_chain->sent_breaks++; + s_chain_timer_reset(l_ch_chain); } diff --git a/modules/channel/chain/include/dap_stream_ch_chain.h b/modules/channel/chain/include/dap_stream_ch_chain.h index a543463afb..9aad8bb539 100644 --- a/modules/channel/chain/include/dap_stream_ch_chain.h +++ b/modules/channel/chain/include/dap_stream_ch_chain.h @@ -77,7 +77,7 @@ typedef struct dap_stream_ch_chain { int timer_shots; dap_timerfd_t *activity_timer; - pthread_rwlock_t idle_lock; + int sent_breaks; dap_stream_ch_chain_callback_packet_t callback_notify_packet_out; dap_stream_ch_chain_callback_packet_t callback_notify_packet_in; diff --git a/modules/consensus/block-ton/dap_chain_cs_block_ton.c b/modules/consensus/block-ton/dap_chain_cs_block_ton.c index 47e910b967..369c0b91e1 100644 --- a/modules/consensus/block-ton/dap_chain_cs_block_ton.c +++ b/modules/consensus/block-ton/dap_chain_cs_block_ton.c @@ -440,7 +440,7 @@ static bool s_session_timer() { if ( l_session->time_proc_lock ) { continue; } - pthread_rwlock_rdlock(&l_session->rwlock); + pthread_rwlock_wrlock(&l_session->rwlock); l_session->time_proc_lock = true; // lock - skip check by reasons: prev check is not finish switch (l_session->state) { case DAP_STREAM_CH_CHAIN_SESSION_STATE_IDLE: { @@ -1397,7 +1397,7 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod goto handler_finish; } - pthread_rwlock_rdlock(&l_session->rwlock); + pthread_rwlock_wrlock(&l_session->rwlock); // stor for new candidate size_t l_store_size = sizeof(dap_chain_cs_block_ton_store_hdr_t)+a_data_size; dap_chain_cs_block_ton_store_t *l_store = @@ -1491,7 +1491,7 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod l_session, DAP_TON$ROUND_CUR, DAP_STREAM_CH_CHAIN_MESSAGE_TYPE_REJECT, l_candidate_hash, NULL); l_reject_count++; - if ( ((float)l_reject_count/l_session->cur_round.validators_count) >= ((float)2/3) ) { + if (l_reject_count * 3 >= l_session->cur_round.validators_count * 2) { dap_chain_global_db_gr_del(dap_strdup(l_candidate_hash_str), l_session->gdb_group_store); dap_chain_hash_fast_t l_my_candidate_hash; dap_hash_fast(l_session->my_candidate, l_session->my_candidate_size, &l_my_candidate_hash); @@ -1539,7 +1539,7 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod l_session, DAP_TON$ROUND_CUR, DAP_STREAM_CH_CHAIN_MESSAGE_TYPE_APPROVE, l_candidate_hash, NULL); l_approve_count++; - if ( ((float)l_approve_count/l_session->cur_round.validators_count) >= ((float)2/3) ) { + if (l_approve_count * 3 >= l_session->cur_round.validators_count * 2) { if (PVT(l_session->ton)->debug) log_it(L_MSG, "TON: net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U" attempt:%hu Candidate:%s collected approve more than 2/3 of the validators", l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id.uint64, @@ -1730,7 +1730,7 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod l_session, DAP_TON$ROUND_CUR, DAP_STREAM_CH_CHAIN_MESSAGE_TYPE_VOTE, l_candidate_hash, &l_attempt_number); l_vote_count++; - if ( ((float)l_vote_count/l_session->cur_round.validators_count) >= ((float)2/3) ) { + if (l_vote_count * 3 >= l_session->cur_round.validators_count * 2) { size_t l_store_size = 0; dap_chain_cs_block_ton_store_t *l_store = (dap_chain_cs_block_ton_store_t *)dap_chain_global_db_gr_get( @@ -1782,7 +1782,7 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id.uint64, l_session->attempt_current_number, l_candidate_hash_str); - pthread_rwlock_rdlock(&l_session->rwlock); + pthread_rwlock_wrlock(&l_session->rwlock); uint16_t l_attempt_number = l_session->attempt_current_number; uint16_t l_precommit_count = s_session_message_count( l_session, DAP_TON$ROUND_CUR, DAP_STREAM_CH_CHAIN_MESSAGE_TYPE_PRE_COMMIT, diff --git a/modules/global-db/dap_chain_global_db.c b/modules/global-db/dap_chain_global_db.c index a65694a527..fc689f1541 100644 --- a/modules/global-db/dap_chain_global_db.c +++ b/modules/global-db/dap_chain_global_db.c @@ -852,7 +852,6 @@ static void s_dap_chain_global_db_request_processor(dap_events_socket_t *a_es, v int dap_chain_global_db_init(dap_config_t * g_config) { int l_rc; -static int is_check_version = 0; const char *l_storage_path = dap_config_get_item_str(g_config, "resources", "dap_global_db_path"); const char *l_driver_name = dap_config_get_item_str_default(g_config, "resources", "dap_global_db_driver", "sqlite"); @@ -868,16 +867,9 @@ static int is_check_version = 0; if( (l_rc = dap_db_driver_init(l_driver_name, l_storage_path, s_db_drvmode_async)) ) return log_it(L_CRITICAL, "Hadn't initialized DB driver \"%s\" on path \"%s\", code: %d", l_driver_name, l_storage_path, l_rc), l_rc; - if(!is_check_version){ - - is_check_version = true; - - if ( (l_rc = s_check_db_version(g_config)) ) - return log_it(L_ERROR, "GlobalDB version changed, please export or remove old version!"), l_rc; - } + //if ( (l_rc = s_check_db_version(g_config)) ) + // return log_it(L_ERROR, "GlobalDB version changed, please export or remove old version!"), l_rc; log_it(L_NOTICE, "GlobalDB initialized"); - - l_rc = 0; - return l_rc; + return 0; } diff --git a/modules/global-db/dap_chain_global_db_driver_mdbx.c b/modules/global-db/dap_chain_global_db_driver_mdbx.c index 4b5b2529c6..abfe6b2c01 100644 --- a/modules/global-db/dap_chain_global_db_driver_mdbx.c +++ b/modules/global-db/dap_chain_global_db_driver_mdbx.c @@ -72,10 +72,6 @@ typedef struct __db_ctx__ { UT_hash_handle hh; } dap_db_ctx_t; -static pthread_mutex_t s_db_ctx_mutex = PTHREAD_MUTEX_INITIALIZER; /* A mutex for working with a DB context */ - - - static dap_db_ctx_t *s_db_ctxs = NULL; /* A hash table of <group/subDB/table> == <MDBX DB context> */ static pthread_rwlock_t s_db_ctxs_rwlock = PTHREAD_RWLOCK_INITIALIZER; /* A read-write lock for working with a <s_db_ctxs>. */ @@ -197,7 +193,7 @@ MDBX_val l_key_iov, l_data_iov; /* So , at this point we are going to create (if not exist) 'table' for new group */ if ( (l_rc = strlen(a_group)) > DAP_GLOBAL_DB_GROUP_NAME_SIZE_MAX ) /* Check length of the group name */ - return log_it(L_ERROR, "Group name '%s' is too long (%d>%d)", a_group, l_rc, DAP_GLOBAL_DB_GROUP_NAME_SIZE_MAX), NULL; + return log_it(L_ERROR, "Group name '%s' is too long (%d>%lu)", a_group, l_rc, DAP_GLOBAL_DB_GROUP_NAME_SIZE_MAX), NULL; if ( !(l_db_ctx = DAP_NEW_Z(dap_db_ctx_t)) ) /* Allocate zeroed memory for new DB context */ return log_it(L_ERROR, "Cannot allocate DB context for '%s', errno=%d", a_group, errno), NULL; @@ -352,7 +348,7 @@ size_t l_upper_limit_of_db_size = 32*1024*1024*1024ULL; #endif - log_it(L_NOTICE, "Set maximum number of local groups: %d", DAP_GLOBAL_DB_GROUPS_COUNT_MAX); + log_it(L_NOTICE, "Set maximum number of local groups: %lu", DAP_GLOBAL_DB_GROUPS_COUNT_MAX); assert ( !mdbx_env_set_maxdbs (s_mdbx_env, DAP_GLOBAL_DB_GROUPS_COUNT_MAX) ); /* Set maximum number of the file-tables (MDBX subDB) according to number of supported groups */ @@ -563,11 +559,13 @@ dap_store_obj_t *l_obj; return NULL; } - /* Found ! Allocate memory for <store object> and <value> */ if ( (l_obj = DAP_CALLOC(1, sizeof( dap_store_obj_t ))) ) { - if ( (l_obj->value = DAP_CALLOC(1, (l_data.iov_len + 1) - sizeof(struct __record_suffix__))) ) + + if ( !(l_obj->key = DAP_CALLOC(1, (l_obj->key_len = (l_last_key.iov_len + 1)))) ) + l_rc = MDBX_PROBLEM, log_it (L_ERROR, "Cannot allocate a memory for store object key, errno=%d", errno); + else if ( (l_obj->value = DAP_CALLOC(1, (l_data.iov_len + 1) - sizeof(struct __record_suffix__))) ) { /* Fill the <store obj> by data from the retrieved record */ l_obj->value_len = l_data.iov_len - sizeof(struct __record_suffix__); diff --git a/modules/global-db/include/dap_chain_global_db.h b/modules/global-db/include/dap_chain_global_db.h index e553046f3d..3ee5764d04 100644 --- a/modules/global-db/include/dap_chain_global_db.h +++ b/modules/global-db/include/dap_chain_global_db.h @@ -14,12 +14,6 @@ #define GROUP_LOCAL_NODE_LAST_ID "local.node.last_id" #define GROUP_LOCAL_NODE_ADDR "local.node-addr" -#define DAP_GLOBAL_DB_GROUP_NAME_SIZE_MAX 128 /* A maximum size of group name */ -#define DAP_GLOBAL_DB_GROUPS_COUNT_MAX 1024 /* A maximum number of groups */ -#define DAP_GLOBAL_DB_KEY_MAX 512 /* A limit for the key's length in DB */ -#define DAP_GLOBAL_DB_MAX_OBJS 8192 /* A maximum number of objects to be returned by - read_srore_obj() */ - enum { DAP_DB$K_OPTYPE_ADD = 0x61, /* 'a', */ /* Operation Type = INSERT/ADD */ DAP_DB$K_OPTYPE_DEL = 0x64, /* 'd', */ /* -- // -- DELETE */ diff --git a/modules/global-db/include/dap_chain_global_db_driver.h b/modules/global-db/include/dap_chain_global_db_driver.h index 0c9e3af898..c12c23aac8 100644 --- a/modules/global-db/include/dap_chain_global_db_driver.h +++ b/modules/global-db/include/dap_chain_global_db_driver.h @@ -36,11 +36,11 @@ #include <stddef.h> #include <stdint.h> -#define DAP_GLOBAL_DB_GROUP_NAME_SIZE_MAX 128 /* A maximum size of group name */ -#define DAP_GLOBAL_DB_GROUPS_COUNT_MAX 1024 /* A maximum number of groups */ -#define DAP_GLOBAL_DB_KEY_MAX 512 /* A limit for the key's length in DB */ -#define DAP_GLOBAL_DB_MAX_OBJS 8192 /* A maximum number of objects to be returned by - read_srore_obj() */ +#define DAP_GLOBAL_DB_GROUP_NAME_SIZE_MAX 128UL /* A maximum size of group name */ +#define DAP_GLOBAL_DB_GROUPS_COUNT_MAX 1024UL /* A maximum number of groups */ +#define DAP_GLOBAL_DB_KEY_MAX 512UL /* A limit for the key's length in DB */ +#define DAP_GLOBAL_DB_MAX_OBJS 8192UL /* A maximum number of objects to be returned by + read_srore_obj() */ enum RECORD_FLAGS { RECORD_COMMON = 0, // 0000 diff --git a/modules/service/xchange/dap_chain_net_srv_xchange.c b/modules/service/xchange/dap_chain_net_srv_xchange.c index fd8509311d..a5129ad8a8 100644 --- a/modules/service/xchange/dap_chain_net_srv_xchange.c +++ b/modules/service/xchange/dap_chain_net_srv_xchange.c @@ -440,7 +440,7 @@ dap_chain_net_srv_xchange_price_t *s_xchange_price_from_order(dap_chain_net_t *a { dap_chain_net_srv_xchange_price_t *l_price = DAP_NEW_Z(dap_chain_net_srv_xchange_price_t); dap_srv_xchange_order_ext_t *l_ext = (dap_srv_xchange_order_ext_t *)a_order->ext_n_sign; - dap_chain_net_id_t l_net_buy_id = { .uint64 = dap_lendian_get64((uint8_t *)&l_ext->net_sell_id) }; + dap_chain_net_id_t l_net_buy_id = { .uint64 = l_ext->net_sell_id }; l_price->net_sell = dap_chain_net_by_id(l_net_buy_id); l_price->datoshi_sell = l_ext->datoshi_sell; strcpy(l_price->token_sell, l_ext->token_sell); -- GitLab