diff --git a/core/src/dap_file_utils.c b/core/src/dap_file_utils.c index 13113b124bfeb8dd78f9208c6cc112e4a109cfde..c7085846fce8ac4809e3fd387c631caeaa236c11 100755 --- a/core/src/dap_file_utils.c +++ b/core/src/dap_file_utils.c @@ -43,7 +43,7 @@ #include <sys/stat.h> #include <stdarg.h> -#ifdef _WIN32 +#ifdef DAP_OS_WINDOWS #include <windows.h> #include <io.h> #endif @@ -93,7 +93,7 @@ bool dap_file_test(const char * a_file_path) { if(!a_file_path) return false; -#ifdef _WIN32 +#ifdef DAP_OS_WINDOWS int attr = GetFileAttributesA(a_file_path); if(attr != -1 && (attr & FILE_ATTRIBUTE_NORMAL)) return true; @@ -117,7 +117,7 @@ bool dap_file_simple_test(const char * a_file_path) { if(!a_file_path) return false; -#ifdef _WIN32 +#ifdef DAP_OS_WINDOWS int attr = GetFileAttributesA(a_file_path); if(attr != -1) return true; @@ -141,7 +141,7 @@ bool dap_dir_test(const char * a_dir_path) { if(!a_dir_path) return false; -#ifdef _WIN32 +#ifdef DAP_OS_WINDOWS int attr = GetFileAttributesA(a_dir_path); if(attr != -1 && (attr & FILE_ATTRIBUTE_DIRECTORY)) return true; @@ -175,7 +175,7 @@ int dap_mkdir_with_parents(const char *a_dir_path) memcpy(path, a_dir_path, strlen(a_dir_path)); char *p; // skip the root component if it is present, i.e. the "/" in Unix or "C:\" in Windows -#ifdef _WIN32 +#ifdef DAP_OS_WINDOWS if(((path[0] >= 'a' && path[0] <= 'z') || (path[0] >= 'A' && path[0] <= 'Z')) && (path[1] == ':') && DAP_IS_DIR_SEPARATOR(path[2])) { p = path + 3; @@ -198,7 +198,7 @@ int dap_mkdir_with_parents(const char *a_dir_path) *p = '\0'; if(!dap_dir_test(path)) { -#ifdef _WIN32 +#ifdef DAP_OS_WINDOWS int result = mkdir(path); #else int result = mkdir(path, S_IRWXU | S_IRWXG | S_IRWXO); @@ -252,7 +252,7 @@ char* dap_path_get_basename(const char *a_file_name) // string only containing slashes return dap_strdup(DAP_DIR_SEPARATOR_S); -#ifdef _WIN32 +#ifdef DAP_OS_WINDOWS if (l_last_nonslash == 1 && dap_ascii_isalpha(a_file_name[0]) && a_file_name[1] == ':') @@ -264,7 +264,7 @@ char* dap_path_get_basename(const char *a_file_name) while(l_base >= 0 && !DAP_IS_DIR_SEPARATOR(a_file_name[l_base])) l_base--; -#ifdef _WIN32 +#ifdef DAP_OS_WINDOWS if (l_base == -1 && dap_ascii_isalpha(a_file_name[0]) && a_file_name[1] == ':') @@ -304,7 +304,7 @@ bool dap_path_is_absolute(const char *a_file_name) if(DAP_IS_DIR_SEPARATOR(a_file_name[0])) return true; -#ifdef _WIN32 +#ifdef DAP_OS_WINDOWS // Recognize drive letter on native Windows if (dap_ascii_isalpha(a_file_name[0]) && a_file_name[1] == ':' && DAP_IS_DIR_SEPARATOR (a_file_name[2])) @@ -338,7 +338,7 @@ const char *dap_path_skip_root (const char *file_name) char *p; p = strchr(file_name + 2, DAP_DIR_SEPARATOR); -#ifdef _WIN32 +#ifdef DAP_OS_WINDOWS { char *q; q = strchr (file_name + 2, '/'); @@ -370,7 +370,7 @@ const char *dap_path_skip_root (const char *file_name) return (char*) file_name; } -#ifdef _WIN32 +#ifdef DAP_OS_WINDOWS /* Skip X:\ */ if (dap_ascii_isalpha (file_name[0]) && file_name[1] == ':' && @@ -403,7 +403,7 @@ char* dap_path_get_dirname(const char *a_file_name) l_base = strrchr(a_file_name, DAP_DIR_SEPARATOR); -#ifdef _WIN32 +#ifdef DAP_OS_WINDOWS { char *l_q; l_q = strrchr (a_file_name, '/'); @@ -414,7 +414,7 @@ char* dap_path_get_dirname(const char *a_file_name) if(!l_base) { -#ifdef _WIN32 +#ifdef DAP_OS_WINDOWS if (dap_ascii_isalpha(a_file_name[0]) && a_file_name[1] == ':') { char l_drive_colon_dot[4]; @@ -434,7 +434,7 @@ char* dap_path_get_dirname(const char *a_file_name) while(l_base > a_file_name && DAP_IS_DIR_SEPARATOR(*l_base)) l_base--; -#ifdef _WIN32 +#ifdef DAP_OS_WINDOWS /* base points to the char before the last slash. * * In case file_name is the root of a drive (X:\) or a child of the @@ -503,7 +503,7 @@ void dap_subs_free(dap_list_name_directories_t *subs_list){ dap_list_name_directories_t *dap_get_subs(const char *a_path_dir){ dap_list_name_directories_t *list = NULL; dap_list_name_directories_t *element; -#ifdef _WIN32 +#ifdef DAP_OS_WINDOWS size_t m_size = strlen(a_path_dir); char *m_path = DAP_NEW_SIZE(char, m_size + 2); memcpy(m_path, a_path_dir, m_size); @@ -639,7 +639,7 @@ static bool get_contents_stdio(const char *filename, FILE *f, char **contents, s return false; } -#ifndef _WIN32 +#ifndef DAP_OS_WINDOWS static bool dap_get_contents_regfile(const char *filename, struct stat *stat_buf, int fd, char **contents, size_t *length) @@ -730,7 +730,7 @@ static bool dap_get_contents_posix(const char *filename, char **contents, size_t } } -#else /* _WIN32 */ +#else /* DAP_OS_WINDOWS */ static bool dap_get_contents_win32(const char *filename, char **contents, size_t *length) { @@ -761,7 +761,7 @@ bool dap_file_get_contents(const char *filename, char **contents, size_t *length if(length) *length = 0; -#ifdef _WIN32 +#ifdef DAP_OS_WINDOWS return dap_get_contents_win32 (filename, contents, length); #else return dap_get_contents_posix(filename, contents, length); @@ -940,7 +940,7 @@ char* dap_build_path(const char *separator, const char *first_element, ...) return str; } -#ifdef _WIN32 +#ifdef DAP_OS_WINDOWS static char* dap_build_pathname_va(const char *first_element, va_list *args, char **str_array) { @@ -1055,7 +1055,7 @@ static char* dap_build_filename_va(const char *first_argument, va_list *args, ch { char *str; -#ifndef _WIN32 +#ifndef DAP_OS_WINDOWS str = dap_build_path_va(DAP_DIR_SEPARATOR_S, first_argument, args, str_array); #else str = dap_build_pathname_va(first_argument, args, str_array); @@ -1269,7 +1269,7 @@ char* dap_canonicalize_filename(const char *filename, const char *relative_to) */ char* dap_get_current_dir(void) { -#ifdef _WIN32 +#ifdef DAP_OS_WINDOWS char *dir = NULL; wchar_t dummy[2], *wdir; diff --git a/global-db/dap_global_db.c b/global-db/dap_global_db.c index 68f73e840f60bc861afb6e8750ef56494634254d..712ed9e2f29343bc3d2f50362cb601e154ea6f7d 100644 --- a/global-db/dap_global_db.c +++ b/global-db/dap_global_db.c @@ -184,8 +184,8 @@ int dap_global_db_init() const char **l_white_list = dap_config_get_array_str(g_config, "global_db", "white_list_sync_groups", &l_size_white_list); for (int i = 0; i < l_size_white_list; i++) s_dbi->whitelist = dap_list_append(s_dbi->whitelist, dap_strdup(l_white_list[i])); - // One year for objects lifetime by default - s_dbi->store_time_limit = dap_config_get_item_uint32_default(g_config, "global_db", "store_time_limit", 365 * 24); + // One week for objects lifetime by default + s_dbi->store_time_limit = dap_config_get_item_uint64(g_config, "global_db", "ttl"); // Time between sync attempts, in seconds s_dbi->sync_idle_time = dap_config_get_item_uint32_default(g_config, "global_db", "sync_idle_time", 30); } @@ -267,15 +267,24 @@ static int s_store_obj_apply(dap_global_db_instance_t *a_dbi, dap_store_obj_t *a a_obj->group, a_obj->key); return -12; } - // Limit time - uint64_t l_time_store_lim_sec = l_cluster->ttl ? l_cluster->ttl : l_cluster->dbi->store_time_limit * 3600ULL; - uint64_t l_limit_time = l_time_store_lim_sec ? dap_nanotime_now() - dap_nanotime_from_sec(l_time_store_lim_sec) : 0; - if (l_limit_time && a_obj->timestamp < l_limit_time) { + // Check time + dap_nanotime_t l_ttl = dap_nanotime_from_sec(l_cluster->ttl), + l_now = dap_nanotime_now(); + if ( a_obj->timestamp > l_now ) { if (g_dap_global_db_debug_more) { char l_ts_str[DAP_TIME_STR_SIZE]; dap_time_to_str_rfc822(l_ts_str, sizeof(l_ts_str), dap_nanotime_to_sec(a_obj->timestamp)); - log_it(L_NOTICE, "Rejected too old object with group %s and key %s and timestamp %s", - a_obj->group, a_obj->key, l_ts_str); + log_it(L_NOTICE, "Rejected record \"%s : %s\" from future ts %s", + a_obj->group, a_obj->key, l_ts_str); + } + return -13; + } + if ( l_ttl && a_obj->timestamp + l_ttl < l_now ) { + if (g_dap_global_db_debug_more) { + char l_ts_str[DAP_TIME_STR_SIZE]; + dap_time_to_str_rfc822(l_ts_str, sizeof(l_ts_str), dap_nanotime_to_sec(a_obj->timestamp)); + log_it(L_NOTICE, "Rejected too old record \"%s : %s\" ts %s", + a_obj->group, a_obj->key, l_ts_str); } return -13; } diff --git a/global-db/dap_global_db_ch.c b/global-db/dap_global_db_ch.c index 384dc604ecd6dc9e91f44dd9e4c84286c79c0eed..23fe998c9e86100c3a29c4bad91c5dd9daf81716 100644 --- a/global-db/dap_global_db_ch.c +++ b/global-db/dap_global_db_ch.c @@ -104,11 +104,11 @@ bool s_proc_thread_reader(void *a_arg) dap_global_db_hash_pkt_t *l_hashes_pkt = dap_global_db_driver_hashes_read(l_group, l_pkt->last_hash); if (l_hashes_pkt && l_hashes_pkt->hashes_count) { dap_global_db_driver_hash_t *l_hashes_diff = (dap_global_db_driver_hash_t *)(l_hashes_pkt->group_n_hashses + l_hashes_pkt->group_name_len); - uint64_t l_time_store_lim_sec = l_cluster->ttl ? l_cluster->ttl : l_cluster->dbi->store_time_limit * 3600ULL; - uint64_t l_limit_time = l_time_store_lim_sec ? dap_nanotime_now() - dap_nanotime_from_sec(l_time_store_lim_sec) : 0; - if (l_limit_time) { + dap_nanotime_t l_ttl = dap_nanotime_from_sec(l_cluster->ttl); + if (l_ttl) { + dap_nanotime_t l_now = dap_nanotime_now(); uint32_t i; - for (i = 0; i < l_hashes_pkt->hashes_count && be64toh((l_hashes_diff + i)->bets) < l_limit_time; i++) { + for (i = 0; i < l_hashes_pkt->hashes_count && be64toh((l_hashes_diff + i)->bets) + l_ttl < l_now; i++) { if (dap_global_db_driver_hash_is_blank(l_hashes_diff + i)) break; dap_store_obj_t l_to_del = { .timestamp = be64toh((l_hashes_diff + i)->bets), diff --git a/global-db/dap_global_db_cluster.c b/global-db/dap_global_db_cluster.c index 9db48a834db40bd17a2a3563493a4cb7ae88e83f..508c7da86aedc66e805c3ba357f6203cebb9204d 100644 --- a/global-db/dap_global_db_cluster.c +++ b/global-db/dap_global_db_cluster.c @@ -41,13 +41,12 @@ static dap_global_db_cluster_t *s_local_cluster = NULL, *s_global_cluster = NULL int dap_global_db_cluster_init() { dap_global_db_ch_init(); - // Pseudo-cluster for global scope if ( !(s_global_cluster = dap_global_db_cluster_add( dap_global_db_instance_get_default(), DAP_STREAM_CLUSTER_GLOBAL, - *(dap_guuid_t *)&uint128_0, DAP_GLOBAL_DB_CLUSTER_GLOBAL, - DAP_GLOBAL_DB_UNCLUSTERED_TTL, true, - DAP_GDB_MEMBER_ROLE_GUEST, DAP_CLUSTER_TYPE_SYSTEM))) + *(dap_guuid_t*)&uint128_0, DAP_GLOBAL_DB_CLUSTER_GLOBAL, + dap_config_get_item_uint64_default(g_config, "global_db", "ttl_unclustered", DAP_GLOBAL_DB_UNCLUSTERED_TTL), + true, DAP_GDB_MEMBER_ROLE_GUEST, DAP_CLUSTER_TYPE_SYSTEM))) return -1; // Pseudo-cluster for local scope (unsynced groups). @@ -94,7 +93,7 @@ void dap_global_db_cluster_broadcast(dap_global_db_cluster_t *a_cluster, dap_sto } dap_global_db_cluster_t *dap_global_db_cluster_add(dap_global_db_instance_t *a_dbi, const char *a_mnemonim, dap_guuid_t a_guuid, - const char *a_group_mask, uint32_t a_ttl, bool a_owner_root_access, + const char *a_group_mask, uint64_t a_ttl, bool a_owner_root_access, dap_global_db_role_t a_default_role, dap_cluster_type_t a_links_cluster_role) { dap_global_db_cluster_t *it; @@ -140,7 +139,7 @@ dap_global_db_cluster_t *dap_global_db_cluster_add(dap_global_db_instance_t *a_d DAP_DELETE(l_cluster); return NULL; } - l_cluster->ttl = (uint64_t)a_ttl * 3600; // Convert to seconds + l_cluster->ttl = a_dbi->store_time_limit ? a_ttl ? dap_min(a_dbi->store_time_limit, a_ttl) : a_dbi->store_time_limit : a_ttl; l_cluster->default_role = a_default_role; l_cluster->owner_root_access = a_owner_root_access; l_cluster->dbi = a_dbi; diff --git a/global-db/include/dap_global_db.h b/global-db/include/dap_global_db.h index cdb370014f80799f2ed241d838d84fb7578b3e70..b020cfaa0df74d2d9cbf3ef60d660771da0d8a0a 100644 --- a/global-db/include/dap_global_db.h +++ b/global-db/include/dap_global_db.h @@ -41,7 +41,7 @@ typedef struct dap_global_db_instance { const char *driver_name; // GlobalDB driver name dap_list_t *whitelist; dap_list_t *blacklist; - uint32_t store_time_limit; + uint64_t store_time_limit; dap_global_db_cluster_t *clusters; dap_enc_key_t *signing_key; uint32_t sync_idle_time; diff --git a/global-db/include/dap_global_db_cluster.h b/global-db/include/dap_global_db_cluster.h index 9ce5c2eb1c8f9130078d93477bfcf1203b71d7a9..642803433b44ca16c80ba3d101360753d93bc175 100644 --- a/global-db/include/dap_global_db_cluster.h +++ b/global-db/include/dap_global_db_cluster.h @@ -32,7 +32,7 @@ along with any DAP SDK based project. If not, see <http://www.gnu.org/licenses/ #define DAP_GLOBAL_DB_CLUSTER_GLOBAL DAP_STREAM_CLUSTER_GLOBAL ".*" // This groups mask is for globally broadcasting grops #define DAP_GLOBAL_DB_CLUSTER_LOCAL DAP_STREAM_CLUSTER_LOCAL ".*" // This groups mask is for not broadcasting groups -#define DAP_GLOBAL_DB_UNCLUSTERED_TTL 1 // Time-to-life for "global.*" mask, 1 hour by default +#define DAP_GLOBAL_DB_UNCLUSTERED_TTL 3600 // Time-to-life for "global.*" mask, 1 hour by default typedef enum dap_global_db_role { DAP_GDB_MEMBER_ROLE_NOBODY = 0, // No access @@ -92,7 +92,7 @@ void dap_global_db_cluster_deinit(); dap_global_db_cluster_t *dap_global_db_cluster_by_group(dap_global_db_instance_t *a_dbi, const char *a_group_name); void dap_global_db_cluster_broadcast(dap_global_db_cluster_t *a_cluster, dap_store_obj_t *a_store_obj); dap_global_db_cluster_t *dap_global_db_cluster_add(dap_global_db_instance_t *a_dbi, const char *a_mnemonim, dap_guuid_t a_guuid, - const char *a_group_mask, uint32_t a_ttl, bool a_owner_root_access, + const char *a_group_mask, uint64_t a_ttl, bool a_owner_root_access, dap_global_db_role_t a_default_role, dap_cluster_type_t a_links_cluster_role); DAP_STATIC_INLINE int dap_global_db_cluster_member_delete(dap_global_db_cluster_t *a_cluster, dap_stream_node_addr_t *a_member_addr) { diff --git a/io/dap_context.c b/io/dap_context.c index 1968b7c477942559979877525d3d78f93466bcd1..c2ac6741799125cab63a5fede46db714aae00600 100644 --- a/io/dap_context.c +++ b/io/dap_context.c @@ -60,6 +60,7 @@ #include <mswsock.h> #include <io.h> #include <winternl.h> +#include <ntstatus.h> #endif #ifdef DAP_OS_DARWIN @@ -86,19 +87,15 @@ #include "dap_proc_thread.h" #include "dap_worker.h" -pthread_key_t g_dap_context_pth_key; // Thread-specific object with pointer on current context +static _Thread_local dap_context_t *s_context = NULL; static void *s_context_thread(void *arg); // Context thread - -pthread_rwlock_t s_contexts_rwlock = PTHREAD_RWLOCK_INITIALIZER; -dap_list_t * s_contexts = NULL; /** * @brief dap_context_init * @return */ int dap_context_init() { - pthread_key_create(&g_dap_context_pth_key,NULL); #ifdef DAP_OS_UNIX struct rlimit l_fdlimit; if (getrlimit(RLIMIT_NOFILE, &l_fdlimit)) @@ -113,14 +110,12 @@ int dap_context_init() return 0; } -void dap_context_current_print(pthread_key_t g_dap_context_pth_key) +void dap_context_deinit() { - log_it(L_ATT, "[!] Not found data by key %d in LTS", g_dap_context_pth_key); } -void dap_context_deinit() -{ - pthread_key_delete(g_dap_context_pth_key); +dap_context_t* dap_context_current() { + return s_context; } /** @@ -138,10 +133,6 @@ dap_context_t * dap_context_new(int a_type) l_context->id = s_context_id_max; l_context->type = a_type; s_context_id_max++; - - pthread_rwlock_wrlock(&s_contexts_rwlock); - s_contexts = dap_list_prepend(s_contexts,l_context); - pthread_rwlock_unlock(&s_contexts_rwlock); return l_context; } @@ -262,7 +253,9 @@ static void *s_context_thread(void *a_arg) dap_context_msg_run_t * l_msg = (dap_context_msg_run_t*) a_arg; dap_context_t * l_context = l_msg->context; assert(l_context); - + if (s_context) + return log_it( L_ERROR, "Context %d already bound to current thread", s_context->id ), NULL; + s_context = l_context; l_context->cpu_id = l_msg->cpu_id; int l_priority = l_msg->priority; #ifdef DAP_OS_WINDOWS @@ -320,8 +313,6 @@ static void *s_context_thread(void *a_arg) pthread_setschedparam(pthread_self(), l_sched_policy, &l_sched_params);; } #endif // DAP_OS_WINDOWS - - pthread_setspecific(g_dap_context_pth_key, l_context); // Now we're running and initalized for sure, so we can assign flags to the current context l_context->running_flags = l_msg->flags; l_context->is_running = true; @@ -354,11 +345,6 @@ static void *s_context_thread(void *a_arg) log_it(L_NOTICE,"Exiting context #%u", l_context->id); - // Removes from the list - pthread_rwlock_wrlock(&s_contexts_rwlock); - s_contexts = dap_list_remove (s_contexts,l_context); - pthread_rwlock_unlock(&s_contexts_rwlock); - // Free memory. Because nobody expected to work with context outside itself it have to be safe pthread_cond_destroy(&l_context->started_cond); pthread_mutex_destroy(&l_context->started_mutex); @@ -397,10 +383,11 @@ int dap_worker_thread_loop(dap_context_t * a_context) dap_overlapped_t *ol; HANDLE ev; WINBOOL ev_signaled; per_io_type_t op; + DWORD flags; debug_if(g_debug_reactor, L_INFO, "Completed %lu items in context #%d", l_entries_num, a_context->id); - for (ULONG i = 0; i < l_entries_num; dap_overlapped_free(ol), ++i, op = 0, ev = NULL, ev_signaled = FALSE) { - // DWORD flags; + for ( ULONG i = 0; i < l_entries_num; dap_overlapped_free(ol), op = '\0', ev = NULL, ev_signaled = FALSE, ++i ) { l_errno = 0; + l_bytes = ol_entries[i].dwNumberOfBytesTransferred; if (( ol = (dap_overlapped_t*)ol_entries[i].lpOverlapped )) { op = ol->op; ev = ol->ol.hEvent; @@ -417,17 +404,22 @@ int dap_worker_thread_loop(dap_context_t * a_context) case io_write: l_cur = ev ? (dap_events_socket_t*)ol_entries[i].lpCompletionKey : dap_context_find(a_context, (dap_events_socket_uuid_t)ol_entries[i].lpCompletionKey); + if ( !l_cur ) { + if (ev) log_it(L_ERROR, "Completion of op '%c', but key is null! Lost %lu bytes", op, l_bytes); + else log_it(L_ERROR, "Completion of op '%c', but key "DAP_FORMAT_ESOCKET_UUID" not found! Lost %lu bytes", + op, (dap_events_socket_uuid_t)ol_entries[i].lpCompletionKey, l_bytes); + continue; + } break; default: l_cur = (dap_events_socket_t*)ol_entries[i].lpCompletionKey; + if ( !l_cur ) { + log_it(L_ERROR, "Completion with null key! Dump it"); + continue; + } break; } - if ( !l_cur ) { - log_it(L_ERROR, "NULL esocket completion, nothing to do"); - continue; - } - l_bytes = ol_entries[i].dwNumberOfBytesTransferred; uint32_t l_cur_flags = l_cur->flags; DWORD l_buf_in_size = l_cur->buf_in_size, l_buf_out_size = l_cur->buf_out_size; debug_if(g_debug_reactor, L_DEBUG, "\n\tCompletion on \"%s\" "DAP_FORMAT_ESOCKET_UUID", bytes: %lu, operation: '%c', " @@ -442,7 +434,6 @@ int dap_worker_thread_loop(dap_context_t * a_context) l_buf_in_size, l_buf_out_size, ev ? ev_signaled ? "SET" : "UNSET" : "N/A", l_cur->pending_read, l_cur->pending_write); - if ( /*!l_cur->context && */ FLAG_CLOSE(l_cur->flags) ) { // Pending delete, socket will be closed if ( op == io_read || l_cur->type == DESCRIPTOR_TYPE_TIMER ) l_cur->pending_read = 0; @@ -457,11 +448,24 @@ int dap_worker_thread_loop(dap_context_t * a_context) // AcceptEx completed l_cur->pending_read = 0; if ( NT_ERROR(ol->ol.Internal) ) { - log_it(L_ERROR, "\"AcceptEx\" on "DAP_FORMAT_ESOCKET_UUID" : %zu failed, ntstatus %llx : %s", - l_cur->uuid, l_cur->socket, ol->ol.Internal, dap_str_ntstatus(ol->ol.Internal)); - l_errno = pfnRtlNtStatusToDosError(ol->ol.Internal); - dap_events_socket_set_readable_unsafe_ex(l_cur, true, ol); - ol = NULL; + log_it(L_ERROR, "\"AcceptEx\" on "DAP_FORMAT_ESOCKET_UUID" : %zu failed, ntstatus 0x%llx : %s", + l_cur->uuid, l_cur->socket, ol->ol.Internal, dap_str_ntstatus(ol->ol.Internal)); + closesocket(l_cur->socket2); + if ( ol->ol.Internal == STATUS_CONNECTION_RESET ) { + l_errno = WSAECONNRESET; // It's ok, just continue accept()'ing + dap_events_socket_set_readable_unsafe_ex(l_cur, true, ol); + ol = NULL; + } else { + // l_errno = pfnRtlNtStatusToDosError(ol->ol.Internal); + /* + TODO: though another syscall is discouraged here, there's no way to obtain WSA last error + which the cross-platform error-handling functions rely on, since NtStatusToDosError() + returns irrelevant error code for completed WSA*(). NTSTATUS propagation needs tweaking + error handlers for Windows. We'll probably get down to it later + */ + WSAGetOverlappedResult(l_cur->socket, &ol->ol, &l_bytes, FALSE, &flags); + l_errno = WSAGetLastError(); + } break; } @@ -523,10 +527,18 @@ int dap_worker_thread_loop(dap_context_t * a_context) l_cur->pending_read = 0; if ( !l_bytes ) { if ( NT_ERROR(ol->ol.Internal) ) { - l_errno = pfnRtlNtStatusToDosError(ol->ol.Internal); - log_it(L_ERROR, "Connection to %s : %u closed with error %d, ntstatus %llx: %s", - l_cur->remote_addr_str, l_cur->remote_port, l_errno, ol->ol.Internal, - dap_str_ntstatus(ol->ol.Internal)); + // l_errno = pfnRtlNtStatusToDosError(ol->ol.Internal); + /* + TODO: though another syscall is discouraged here, there's no way to obtain WSA last error + which the cross-platform error-handling functions rely on, since NtStatusToDosError() + returns irrelevant error code for completed WSA*(). NTSTATUS propagation needs tweaking + error handlers for Windows. We'll probably get down to it later + */ + WSAGetOverlappedResult(l_cur->socket, &ol->ol, &l_bytes, FALSE, &flags); + l_errno = WSAGetLastError(); + log_it(L_ERROR, "Connection to %s : %u closed with error %d: \"%s\", ntstatus 0x%llx", + l_cur->remote_addr_str, l_cur->remote_port, l_errno, dap_strerror(l_errno), + ol->ol.Internal); } else { log_it(L_INFO, "Connection to %s : %u closed", l_cur->remote_addr_str, l_cur->remote_port); if (!l_cur->no_close) @@ -564,10 +576,27 @@ int dap_worker_thread_loop(dap_context_t * a_context) --l_cur->pending_write; if ( !l_cur->server && l_cur->flags & DAP_SOCK_CONNECTING ) { if ( NT_ERROR(ol->ol.Internal) ) { - l_errno = pfnRtlNtStatusToDosError(ol->ol.Internal); - log_it(L_ERROR, "ConnectEx to %s : %u failed, ntstatus %llx: %s", - l_cur->remote_addr_str, l_cur->remote_port, ol->ol.Internal, - dap_str_ntstatus(ol->ol.Internal)); + // l_errno = pfnRtlNtStatusToDosError(ol->ol.Internal); + /* + TODO: though another syscall is discouraged here, there's no way to obtain WSA last error + which the cross-platform error-handling functions rely on, since NtStatusToDosError() + returns irrelevant error code for completed WSA*(). NTSTATUS propagation needs tweaking + error handlers for Windows. We'll probably get down to it later + */ + WSAGetOverlappedResult(l_cur->socket, &ol->ol, &l_bytes, FALSE, &flags); + l_errno = WSAGetLastError(); + log_it(L_ERROR, "ConnectEx to %s : %u failed with error %d: \"%s\", ntstatus 0x%llx", + l_cur->remote_addr_str, l_cur->remote_port, l_errno, dap_strerror(l_errno), + ol->ol.Internal); + /* TODO: optimization + switch (l_errno) { + case WSAECONNREFUSED: + case WSAENETUNREACH: + case WSAETIMEDOUT: + // Retry with the same socket! + break; + default: break; + } */ break; } else if ( setsockopt(l_cur->socket, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0) ) { l_errno = WSAGetLastError(); @@ -586,10 +615,18 @@ int dap_worker_thread_loop(dap_context_t * a_context) break; } else if ( !l_bytes ) { if ( NT_ERROR(ol->ol.Internal) ) { - l_errno = pfnRtlNtStatusToDosError(ol->ol.Internal); - log_it(L_ERROR, "Connection on es %zu to remote %s : %u closed with ntstatus %llx: %s", - l_cur->socket, l_cur->remote_addr_str, l_cur->remote_port, ol->ol.Internal, - dap_str_ntstatus(ol->ol.Internal)); + // l_errno = pfnRtlNtStatusToDosError(ol->ol.Internal); + /* + TODO: though another syscall is discouraged here, there's no way to obtain WSA last error + which the cross-platform error-handling functions rely on, since NtStatusToDosError() + returns irrelevant error code for completed WSA*(). NTSTATUS propagation needs tweaking + error handlers for Windows. We'll probably get down to it later + */ + WSAGetOverlappedResult(l_cur->socket, &ol->ol, &l_bytes, FALSE, &flags); + l_errno = WSAGetLastError(); + log_it(L_ERROR, "Connection on es %zu to remote %s : %u closed with error %d: %s, ntstatus 0x%llx", + l_cur->socket, l_cur->remote_addr_str, l_cur->remote_port, l_errno, dap_strerror(l_errno), + ol->ol.Internal); } else log_it(L_INFO, "Connection on es %zu to remote %s : %u closed", l_cur->socket, l_cur->remote_addr_str, l_cur->remote_port); @@ -1507,7 +1544,7 @@ int dap_context_add(dap_context_t * a_context, dap_events_socket_t * a_es ) } #ifdef DAP_EVENTS_CAPS_IOCP - // TODO: reassignment requires some extra calls to WDK: + // TODO: reassignment requires some extra calls to WDK. Also there must be no pending I/O ops /* #include <ntifs.h> int len = sizeof(FILE_COMPLETION_INFORMATION); diff --git a/io/dap_net.c b/io/dap_net.c index 36062e77c83eee62e370b433e3e6255bd05e9c2b..6d865590592556d63fa22654eb10fc6f0e9284d0 100644 --- a/io/dap_net.c +++ b/io/dap_net.c @@ -51,18 +51,26 @@ int dap_net_resolve_host(const char *a_host, const char *a_port, bool a_numeric_ struct addrinfo *l_res = NULL, l_hints = { .ai_flags = l_ai_flags, .ai_family = AF_UNSPEC, .ai_socktype = SOCK_STREAM }; if (a_family) *a_family = AF_UNSPEC; - if (( l_ret = getaddrinfo(a_host, a_port, &l_hints, &l_res) ) || !l_res ) - return log_it(L_ERROR, "getaddrinfo() failed, error %d \"%s\"", l_ret, gai_strerror(l_ret) ), -2; - if (a_family) - *a_family = l_res->ai_family; - l_ret = l_res->ai_addrlen; - memcpy(a_addr_out, l_res->ai_addr, l_res->ai_addrlen); - freeaddrinfo(l_res); + if (( l_ret = getaddrinfo(a_host, a_port, &l_hints, &l_res) ) || !l_res ) { +#ifdef DAP_OS_WINDOWS + _set_errno( WSAGetLastError() ); + log_it( L_ERROR, "getaddrinfo() failed, error %d: \"%s\"", errno, dap_strerror(errno) ); +#else + log_it( L_ERROR, "getaddrinfo() failed, error %d: \"%s\"", l_ret, gai_strerror(l_ret) ); +#endif + l_ret = -2; + } else { + if (a_family) + *a_family = l_res->ai_family; + l_ret = l_res->ai_addrlen; + memcpy(a_addr_out, l_res->ai_addr, l_res->ai_addrlen); + freeaddrinfo(l_res); + } return l_ret; } int dap_net_parse_config_address(const char *a_src, char *a_addr, uint16_t *a_port, struct sockaddr_storage *a_saddr, int *a_family) { - dap_return_val_if_fail_err( !!a_src && ( !!a_addr || !!a_port ), 0, "Required args are not provided"); + dap_return_val_if_fail_err( !!a_src && ( !!a_addr || !!a_port ), -1, "Required args are not provided"); int l_len = 0, l_type = 0; char *l_bpos = NULL, *l_cpos = NULL; /* diff --git a/io/dap_server.c b/io/dap_server.c index 48c1e0674f94b883091d0258055d799f608dc16f..99406bce6618b3766e30b647b79fe0ba29d95959 100644 --- a/io/dap_server.c +++ b/io/dap_server.c @@ -225,9 +225,9 @@ int dap_server_listen_addr_add( dap_server_t *a_server, const char *a_addr, uint l_es->listener_port = a_port; l_es->addr_storage = l_saddr; l_es->type = a_type; - dap_worker_add_events_socket_auto(l_es); + l_es->no_close = true; a_server->es_listeners = dap_list_prepend(a_server->es_listeners, l_es); - return 0; + return !!dap_worker_add_events_socket_auto(l_es); } int dap_server_callbacks_set(dap_server_t* a_server, dap_events_socket_callbacks_t *a_server_cbs, dap_events_socket_callbacks_t *a_client_cbs) { @@ -374,31 +374,7 @@ static void s_es_server_accept(dap_events_socket_t *a_es_listener, SOCKET a_remo log_it(L_ERROR, "Unsupported protocol family %hu from accept()", l_family); break; } -#ifdef DAP_EVENTS_CAPS_IOCP dap_worker_add_events_socket( dap_events_worker_get_auto(), l_es_new ); -#else - dap_worker_t *l_worker = dap_events_worker_get_auto(); - if (l_worker->id == a_es_listener->worker->id) { -#ifdef DAP_OS_UNIX -#if defined (SO_INCOMING_CPU) - int l_cpu = l_worker->id; - setsockopt(l_es_new->socket , SOL_SOCKET, SO_INCOMING_CPU, &l_cpu, sizeof(l_cpu)); -#endif -#endif - l_es_new->worker = l_worker; - l_es_new->last_time_active = time(NULL); - if (dap_worker_add_events_socket_unsafe(l_worker, l_es_new)) { - log_it(L_CRITICAL, "Can't add event socket's handler to worker i/o poll mechanism with error %d", errno); - return; - } - if (l_es_new->callbacks.new_callback) - l_es_new->callbacks.new_callback(l_es_new, NULL); - l_es_new->is_initalized = true; - debug_if(g_debug_reactor, L_INFO, "Direct addition of esocket %p uuid 0x%"DAP_UINT64_FORMAT_x" to worker %d", - l_es_new, l_es_new->uuid, l_worker->id); - } else - dap_worker_add_events_socket(l_worker, l_es_new); -#endif } /** diff --git a/io/dap_worker.c b/io/dap_worker.c index 2f78107886cb0b3f49e267f66a4783944bc6450d..5abfa68a9fc088b33b1e4fdfc4180779284d4b03 100644 --- a/io/dap_worker.c +++ b/io/dap_worker.c @@ -45,7 +45,7 @@ typedef struct dap_worker_msg_callback { void * arg; } dap_worker_msg_callback_t; -pthread_key_t g_pth_key_worker; +static _Thread_local dap_worker_t* s_worker = NULL; static time_t s_connection_timeout = 60; // seconds @@ -58,6 +58,10 @@ static void s_queue_es_io_callback( dap_events_socket_t * a_es, void * a_arg); #endif static void s_queue_callback_callback( dap_events_socket_t * a_es, void * a_arg); +dap_worker_t *dap_worker_get_current() { + return s_worker; +} + /** * @brief dap_worker_init * @param a_threads_count @@ -69,8 +73,6 @@ int dap_worker_init( size_t a_conn_timeout ) if ( a_conn_timeout ) s_connection_timeout = a_conn_timeout; - pthread_key_create( &g_pth_key_worker, NULL); - return 0; } @@ -99,10 +101,11 @@ static void s_event_exit_callback( dap_events_socket_t * a_es, uint64_t a_flags) */ int dap_worker_context_callback_started(dap_context_t * a_context, void *a_arg) { - dap_worker_t *l_worker = (dap_worker_t *) a_arg; + dap_worker_t *l_worker = (dap_worker_t*) a_arg; assert(l_worker); - pthread_setspecific(g_pth_key_worker, l_worker); - + if (s_worker) + return log_it(L_ERROR, "Worker %d is already assigned to current thread %u", s_worker->id), -1; + s_worker = l_worker; #if defined(DAP_EVENTS_CAPS_KQUEUE) a_context->kqueue_fd = kqueue(); @@ -181,15 +184,16 @@ int dap_worker_add_events_socket_unsafe(dap_worker_t *a_worker, dap_events_socke { int err = dap_context_add(a_worker->context, a_esocket); if (!err) { -#ifndef DAP_EVENTS_CAPS_IOCP - a_esocket->is_initalized = true; -#endif switch (a_esocket->type) { case DESCRIPTOR_TYPE_SOCKET_UDP: case DESCRIPTOR_TYPE_SOCKET_CLIENT: case DESCRIPTOR_TYPE_SOCKET_LISTENING: a_esocket->last_time_active = time(NULL); - default:; +#ifdef SO_INCOMING_CPU + int l_cpu = a_worker->context->cpu_id; + setsockopt(a_esocket->socket , SOL_SOCKET, SO_INCOMING_CPU, &l_cpu, sizeof(l_cpu)); +#endif + default: break; } } return err; @@ -209,11 +213,9 @@ static int s_queue_es_add(dap_events_socket_t *a_es, void * a_arg) assert(l_context); dap_worker_t * l_worker = a_es->worker; assert(l_worker); + if (!a_arg) + return log_it(L_ERROR,"NULL esocket accepted to add on worker #%u", l_worker->id), -1; dap_events_socket_t * l_es_new =(dap_events_socket_t *) a_arg; - if (!l_es_new){ - log_it(L_ERROR,"NULL esocket accepted to add on worker #%u", l_worker->id); - return -1; - } debug_if(g_debug_reactor, L_DEBUG, "Added es %p \"%s\" [%s] to worker #%d", l_es_new, dap_events_socket_get_type_str(l_es_new), @@ -234,20 +236,7 @@ static int s_queue_es_add(dap_events_socket_t *a_es, void * a_arg) return -2; } - switch( l_es_new->type){ - - case DESCRIPTOR_TYPE_SOCKET_UDP: - case DESCRIPTOR_TYPE_SOCKET_CLIENT: - case DESCRIPTOR_TYPE_SOCKET_LISTENING:{ - l_es_new->last_time_active = time(NULL); -#if defined (DAP_OS_UNIX) && defined (SO_INCOMING_CPU) - int l_cpu = l_worker->context->cpu_id; - setsockopt(l_es_new->socket , SOL_SOCKET, SO_INCOMING_CPU, &l_cpu, sizeof(l_cpu)); -#endif - } break; - default: {} - } - if (dap_context_add(l_context, l_es_new)) { + if ( dap_worker_add_events_socket_unsafe(l_worker, l_es_new) ) { log_it(L_ERROR, "Can't add event socket's handler to worker i/o poll mechanism with error %d", errno); return -3; } @@ -261,7 +250,6 @@ static int s_queue_es_add(dap_events_socket_t *a_es, void * a_arg) l_es_new->callbacks.worker_assign_callback(l_es_new, l_worker); l_es_new->is_initalized = true; - return 0; } diff --git a/io/include/dap_context.h b/io/include/dap_context.h index 960a2f013df683fc3938301df9009d652c1f39c0..9f719dda6b2fecb60398d72a2e1752dcc41d64e0 100644 --- a/io/include/dap_context.h +++ b/io/include/dap_context.h @@ -136,9 +136,6 @@ struct { #define DAP_CONTEXT_PRIORITY_LOW -3 #endif -// pthread kernel object for current context pointer -extern pthread_key_t g_dap_context_pth_key; - /// Next functions are thread-safe int dap_context_init(); // Init void dap_context_deinit(); // Deinit @@ -156,21 +153,12 @@ int dap_context_run(dap_context_t * a_context,int a_cpu_id, int a_sched_policy, void dap_context_stop_n_kill(dap_context_t * a_context); void dap_context_wait(dap_context_t * a_context); -void dap_context_current_print(pthread_key_t g_dap_context_pth_key); /** * @brief dap_context_current Get current context * @return Returns current context(if present, if not returns NULL) */ -static inline dap_context_t * dap_context_current() -{ - dap_context_t* l_ret = (dap_context_t*) pthread_getspecific(g_dap_context_pth_key); - if (!l_ret) { - dap_context_current_print(g_dap_context_pth_key); - } - return l_ret; - -} +dap_context_t* dap_context_current(); /// ALL THIS FUNCTIONS ARE UNSAFE AND SHOULD BE MOVED TO DAP_WORKER SUBTYPE! CALL THEM ONLY INSIDE THEIR OWN CONTEXT!! diff --git a/io/include/dap_worker.h b/io/include/dap_worker.h index 36fb86543e61efd8294fd15005c6efe4dd34b2de..45860131d9ae60e03fdb4bbed9c12f5341a2f6ae 100644 --- a/io/include/dap_worker.h +++ b/io/include/dap_worker.h @@ -75,8 +75,6 @@ typedef struct dap_worker_msg_io{ // Message for callback execution typedef void (*dap_worker_callback_t)(void *a_arg); -extern pthread_key_t g_pth_key_worker; - #define DAP_WORKER(a) (dap_worker_t *)((a)->_inheritor) #ifdef __cplusplus @@ -86,9 +84,7 @@ extern "C" { int dap_worker_init( size_t a_conn_timeout ); void dap_worker_deinit(); -static inline dap_worker_t * dap_worker_get_current(){ - return (dap_worker_t*) pthread_getspecific(g_pth_key_worker); -} +dap_worker_t *dap_worker_get_current(); #define dap_worker_get_auto dap_events_worker_get_auto int dap_worker_add_events_socket_unsafe(dap_worker_t *a_worker, dap_events_socket_t *a_esocket); diff --git a/net/client/dap_client_http.c b/net/client/dap_client_http.c index 6fe51c3bbfeefe52d7efdcf986294bf63e57cd9d..66aca4e1d577fd729edf0e38de22d6018652cc41 100644 --- a/net/client/dap_client_http.c +++ b/net/client/dap_client_http.c @@ -415,27 +415,12 @@ static void s_http_read(dap_events_socket_t * a_es, void * arg) */ static void s_http_error(dap_events_socket_t * a_es, int a_errno) { - if (a_es == NULL) { - log_it(L_ERROR,"Esocket is NULL for s_http_error"); - return; - } - char l_errbuf[128]; - l_errbuf[0] = '\0'; - if (a_errno == ETIMEDOUT){ - strncpy(l_errbuf,"Connection timeout", sizeof (l_errbuf)-1); - }else if (a_errno == ECONNREFUSED){ - strncpy(l_errbuf,"Host is down", sizeof (l_errbuf)-1); - }else if (a_errno == EHOSTUNREACH){ - strncpy(l_errbuf,"No route to host", sizeof (l_errbuf)-1); - }else if(a_errno) - strerror_r(a_errno, l_errbuf, sizeof (l_errbuf)); - else - strncpy(l_errbuf,"Unknown Error", sizeof (l_errbuf)-1); - - if (a_es->flags & DAP_SOCK_CONNECTING){ - log_it(L_WARNING, "Socket %"DAP_FORMAT_SOCKET" connecting error: %s (code %d)" , a_es->socket, l_errbuf, a_errno); - }else - log_it(L_WARNING, "Socket %"DAP_FORMAT_SOCKET" error: %s (code %d)", a_es->socket, l_errbuf, a_errno); + if (!a_es) + return log_it(L_ERROR, "s_http_error: es is null!"); + + log_it( L_WARNING, "Socket %"DAP_FORMAT_SOCKET" %serror %d: %s", + a_es->socket, a_es->flags & DAP_SOCK_CONNECTING ? "connecting " : "", + a_errno, dap_strerror(a_errno) ); dap_client_http_t * l_client_http = DAP_CLIENT_HTTP(a_es); diff --git a/net/client/dap_client_pvt.c b/net/client/dap_client_pvt.c index 483a908eb550501a003c3de24c13faede35940a8..eb7aea285c6596fa5e7380787972f29172875a18 100644 --- a/net/client/dap_client_pvt.c +++ b/net/client/dap_client_pvt.c @@ -1394,7 +1394,7 @@ static void s_stream_es_callback_error(dap_events_socket_t * a_es, int a_error) dap_client_pvt_t *l_client_pvt = DAP_CLIENT_PVT(l_client); log_it(L_WARNING, "STREAM error %d: \"%s\"", a_error, dap_strerror(a_error)); #ifdef DAP_OS_WINDOWS - if (a_error == ERROR_SEM_TIMEOUT) + if (a_error == WSAETIMEDOUT || a_error == ERROR_SEM_TIMEOUT) a_error = ETIMEDOUT; #endif l_client_pvt->last_error = a_error == ETIMEDOUT