diff --git a/CMakeLists.txt b/CMakeLists.txt index e0a037830865a3f790808a31a24ecc48b886f841..a8b1295351b04566809b680cf7ecd707c94c4b62 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -51,7 +51,7 @@ if (CELLFRAME_MODULES MATCHES "srv-stake") endif() if (CELLFRAME_MODULES MATCHES "core") - SET(DAPSDK_MODULES "${DAPSDK_MODULES} core crypto") + SET(DAPSDK_MODULES "${DAPSDK_MODULES} core crypto io") endif() @@ -61,7 +61,7 @@ endif() if (CELLFRAME_MODULES MATCHES "dap-sdk-net-client") set(DAPSDK_MODULES "core crypto network-core network-client network-server") - set(CELLFRAME_LIBS ${CELLFRAME_LIBS} dap_core dap_crypto dap_server_core dap_server dap_client m pthread) + set(CELLFRAME_LIBS ${CELLFRAME_LIBS} dap_core dap_crypto dap_io dap_server dap_client m pthread) if (SUPPORT_PYTHON_PLUGINS) set(CELLFRAME_MODULES "${CELLFRAME_MODULES} core chains network cs-none srv-") endif() @@ -129,7 +129,7 @@ endif() # Networking if (CELLFRAME_MODULES MATCHES "network") message("[+] Module 'network'") - set(CELLFRAME_LIBS ${CELLFRAME_LIBS} dap_server_core dap_json_rpc dap_enc_server dap_notify_srv dap_http_server dap_session + set(CELLFRAME_LIBS ${CELLFRAME_LIBS} dap_io dap_json_rpc dap_enc_server dap_notify_srv dap_http_server dap_session dap_stream dap_stream_ch dap_client dap_stream_ch_chain dap_stream_ch_chain_net dap_chain_net dap_chain_net_srv dap_stream_ch_chain_voting dap_chain_mempool magic) endif() diff --git a/dap-sdk/CMakeLists.txt b/dap-sdk/CMakeLists.txt index 9a6f3fa6394bd90cbc3d3d93c2aae300b843a209..0781b5dec7acebc17ae9a3b356220bb8efc7f688 100644 --- a/dap-sdk/CMakeLists.txt +++ b/dap-sdk/CMakeLists.txt @@ -14,13 +14,18 @@ if (DAPSDK_MODULES MATCHES "crypto") add_subdirectory(crypto) endif() +# I/O subsystem +if (DAPSDK_MODULES MATCHES "io") + add_subdirectory(io) +endif() + # Networking core if (DAPSDK_MODULES MATCHES "network-core") - add_subdirectory(net/core) add_subdirectory(net/stream) endif() if (DAPSDK_MODULES MATCHES "network-pure") + add_subdirectory(io) add_subdirectory(net/core) endif() diff --git a/dap-sdk/net/core/CMakeLists.txt b/dap-sdk/io/CMakeLists.txt similarity index 68% rename from dap-sdk/net/core/CMakeLists.txt rename to dap-sdk/io/CMakeLists.txt index ca3fe8666b0f24b9575ffa8e9454e9e720e8bcfd..fc87bf0605419e5222abfae71c2cbc18dffcfa2e 100644 --- a/dap-sdk/net/core/CMakeLists.txt +++ b/dap-sdk/io/CMakeLists.txt @@ -1,23 +1,23 @@ cmake_minimum_required(VERSION 3.10) -project (dap_server_core C) +project (dap_io C) set(CMAKE_C_STANDARD 11) add_definitions ("-D_GNU_SOURCE") if(WIN32) - file(GLOB DAP_SERVER_CORE_SOURCES *.c ../../../3rdparty/wepoll/*.c) - file(GLOB DAP_SERVER_CORE_HEADERS include/*.h ../../../3rdparty/wepoll/*.h) + file(GLOB DAP_IO_SOURCES *.c ../../../3rdparty/wepoll/*.c) + file(GLOB DAP_IO_HEADERS include/*.h ../../../3rdparty/wepoll/*.h) else() - file(GLOB DAP_SERVER_CORE_SOURCES *.c) - file(GLOB DAP_SERVER_CORE_HEADERS include/*.h) + file(GLOB DAP_IO_SOURCES *.c) + file(GLOB DAP_IO_HEADERS include/*.h) endif() if(WIN32) include_directories(../../../modules/net/win32) endif() -add_library(${PROJECT_NAME} STATIC ${DAP_SERVER_CORE_HEADERS} ${DAP_SERVER_CORE_SOURCES}) +add_library(${PROJECT_NAME} STATIC ${DAP_IO_HEADERS} ${DAP_IO_SOURCES}) if(DAPSDK_MODULES MATCHES "ssl-support") target_link_libraries(${PROJECT_NAME} dap_core dap_crypto wolfssl) @@ -40,7 +40,7 @@ if (WIN32) target_include_directories(${PROJECT_NAME} PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/../../../3rdparty/wepoll) endif() -if (${BUILD_DAP_SERVER_CORE_TESTS} MATCHES ON) +if (${BUILD_DAP_IO_TESTS} MATCHES ON) enable_testing() add_subdirectory(test) endif() diff --git a/dap-sdk/net/core/dap_worker.c b/dap-sdk/io/dap_context.c similarity index 67% rename from dap-sdk/net/core/dap_worker.c rename to dap-sdk/io/dap_context.c index e25a00b357dc4720c8db982e7656d8b3b38fe8c3..7c0b353e15fd0b501a8473a1618aec79b958a5a1 100644 --- a/dap-sdk/net/core/dap_worker.c +++ b/dap-sdk/io/dap_context.c @@ -2,7 +2,7 @@ * Authors: * Dmitriy A. Gearasimov <gerasimov.dmitriy@demlabs.net> * DeM Labs Ltd. https://demlabs.net - * Copyright (c) 2017 + * Copyright (c) 2022 * All rights reserved. This file is part of DAP SDK the open source project @@ -20,9 +20,7 @@ You should have received a copy of the GNU General Public License along with any DAP SDK based project. If not, see <http://www.gnu.org/licenses/>. */ -#include <time.h> -#include <errno.h> -#include <unistd.h> + #if ! defined (_GNU_SOURCE) #define _GNU_SOURCE /* See feature_test_macros(7) */ #endif @@ -45,43 +43,20 @@ #endif +#define LOG_TAG "dap_context" + #include "dap_common.h" -#include "dap_config.h" -#include "dap_math_ops.h" +#include "dap_context.h" #include "dap_worker.h" -#include "dap_timerfd.h" -#include "dap_events.h" -#include "dap_enc_base64.h" -#include "dap_proc_queue.h" - -#ifndef DAP_NET_CLIENT_NO_SSL -#include <wolfssl/options.h> -#include "wolfssl/ssl.h" -#endif -#define LOG_TAG "dap_worker" - -static time_t s_connection_timeout = 60; // seconds - -static bool s_socket_all_check_activity( void * a_arg); -static void s_queue_add_es_callback( dap_events_socket_t * a_es, void * a_arg); -static void s_queue_delete_es_callback( dap_events_socket_t * a_es, void * a_arg); -static void s_queue_es_reassign_callback( dap_events_socket_t * a_es, void * a_arg); -static void s_queue_callback_callback( dap_events_socket_t * a_es, void * a_arg); -static void s_queue_es_io_callback( dap_events_socket_t * a_es, void * a_arg); -static void s_event_exit_callback( dap_events_socket_t * a_es, uint64_t a_flags); +pthread_key_t g_dap_context_pth_key; /** - * @brief dap_worker_init - * @param a_threads_count - * @param conn_timeout + * @brief dap_context_init * @return */ -int dap_worker_init( size_t a_conn_timeout ) +int dap_context_init() { - if ( a_conn_timeout ) - s_connection_timeout = a_conn_timeout; - #ifdef DAP_OS_UNIX struct rlimit l_fdlimit; if (getrlimit(RLIMIT_NOFILE, &l_fdlimit)) @@ -96,101 +71,75 @@ int dap_worker_init( size_t a_conn_timeout ) return 0; } -void dap_worker_deinit( ) +/** + * @brief dap_context_new + * @return + */ +dap_context_t * dap_context_new() { + dap_context_t * l_context = DAP_NEW_Z(dap_context_t); + static uint32_t s_context_id_max = 0; + l_context->id = s_context_id_max; + s_context_id_max++; + return l_context; } /** - * @brief dap_worker_thread - * @param arg + * @brief dap_context_thread_init + * @param a_context * @return */ -void *dap_worker_thread(void *arg) +int dap_context_thread_init(dap_context_t * a_context) { - dap_events_socket_t *l_cur; - dap_worker_t *l_worker = (dap_worker_t *) arg; - uint32_t l_tn = l_worker->id; - int l_errno = 0, l_selected_sockets; - socklen_t l_error_len = sizeof(l_errno); - char l_error_buf[128] = {0}; - ssize_t l_bytes_sent = 0, l_bytes_read = 0, l_sockets_max; - const struct sched_param l_shed_params = {0}; - + pthread_setspecific(g_dap_context_pth_key, a_context); - dap_cpu_assign_thread_on(l_worker->id); - pthread_setspecific(l_worker->events->pth_key_worker, l_worker); +#if defined(DAP_EVENTS_CAPS_KQUEUE) + a_context->kqueue_fd = kqueue(); -#ifdef DAP_OS_WINDOWS - if (!SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_BELOW_NORMAL)) - log_it(L_ERROR, "Couldn'r set thread priority, err: %lu", GetLastError()); -#else - pthread_setschedparam(pthread_self(),SCHED_FIFO ,&l_shed_params); -#endif - -#ifdef DAP_EVENTS_CAPS_EPOLL - struct epoll_event l_epoll_events[ DAP_EVENTS_SOCKET_MAX]= {{0}}; - log_it(L_INFO, "Worker #%d started with epoll fd %"DAP_FORMAT_HANDLE" and assigned to dedicated CPU unit", l_worker->id, l_worker->epoll_fd); -#elif defined(DAP_EVENTS_CAPS_KQUEUE) - l_worker->kqueue_fd = kqueue(); - - if (l_worker->kqueue_fd == -1 ){ - int l_errno = errno; - char l_errbuf[255]; - strerror_r(l_errno,l_errbuf,sizeof(l_errbuf)); - log_it (L_CRITICAL,"Can't create kqueue(): '%s' code %d",l_errbuf,l_errno); - pthread_cond_broadcast(&l_worker->started_cond); - return NULL; + if (a_context->kqueue_fd == -1 ){ + int l_errno = errno; + char l_errbuf[255]; + strerror_r(l_errno,l_errbuf,sizeof(l_errbuf)); + log_it (L_CRITICAL,"Can't create kqueue(): '%s' code %d",l_errbuf,l_errno); + return -1; } - l_worker->kqueue_events_selected_count_max = 100; - l_worker->kqueue_events_count_max = DAP_EVENTS_SOCKET_MAX; - l_worker->kqueue_events_selected = DAP_NEW_Z_SIZE(struct kevent, l_worker->kqueue_events_selected_count_max *sizeof(struct kevent)); + a_context->kqueue_events_selected_count_max = 100; + a_context->kqueue_events_count_max = DAP_EVENTS_SOCKET_MAX; + a_context->kqueue_events_selected = DAP_NEW_Z_SIZE(struct kevent, a_context->kqueue_events_selected_count_max *sizeof(struct kevent)); #elif defined(DAP_EVENTS_CAPS_POLL) - l_worker->poll_count_max = DAP_EVENTS_SOCKET_MAX; - l_worker->poll = DAP_NEW_Z_SIZE(struct pollfd,l_worker->poll_count_max*sizeof (struct pollfd)); - l_worker->poll_esocket = DAP_NEW_Z_SIZE(dap_events_socket_t*,l_worker->poll_count_max*sizeof (dap_events_socket_t*)); + a_context->poll_count_max = DAP_EVENTS_SOCKET_MAX; + a_context->poll = DAP_NEW_Z_SIZE(struct pollfd,a_context->poll_count_max*sizeof (struct pollfd)); + a_context->poll_esocket = DAP_NEW_Z_SIZE(dap_events_socket_t*,a_context->poll_count_max*sizeof (dap_events_socket_t*)); #else #error "Unimplemented socket array for this platform" #endif + return 0; +} - l_worker->queue_es_new_input = DAP_NEW_Z_SIZE(dap_events_socket_t*, sizeof (dap_events_socket_t*)* dap_events_worker_get_count() ); - l_worker->queue_es_delete_input = DAP_NEW_Z_SIZE(dap_events_socket_t*, sizeof (dap_events_socket_t*)* dap_events_worker_get_count() ); - l_worker->queue_es_io_input = DAP_NEW_Z_SIZE(dap_events_socket_t*, sizeof (dap_events_socket_t*)* dap_events_worker_get_count() ); - l_worker->queue_es_reassign_input = DAP_NEW_Z_SIZE(dap_events_socket_t*, sizeof (dap_events_socket_t*)* dap_events_worker_get_count() ); - - - l_worker->queue_es_new = dap_events_socket_create_type_queue_ptr_unsafe(l_worker, s_queue_add_es_callback); - l_worker->queue_es_delete = dap_events_socket_create_type_queue_ptr_unsafe(l_worker, s_queue_delete_es_callback); - l_worker->queue_es_io = dap_events_socket_create_type_queue_ptr_unsafe(l_worker, s_queue_es_io_callback); - l_worker->queue_es_reassign = dap_events_socket_create_type_queue_ptr_unsafe(l_worker, s_queue_es_reassign_callback ); - - - for( size_t n = 0; n < dap_events_worker_get_count(); n++) { - l_worker->queue_es_new_input[n] = dap_events_socket_queue_ptr_create_input(l_worker->queue_es_new); - l_worker->queue_es_delete_input[n] = dap_events_socket_queue_ptr_create_input(l_worker->queue_es_delete); - l_worker->queue_es_io_input[n] = dap_events_socket_queue_ptr_create_input(l_worker->queue_es_io); - l_worker->queue_es_reassign_input[n] = dap_events_socket_queue_ptr_create_input(l_worker->queue_es_reassign); - } - - l_worker->queue_callback = dap_events_socket_create_type_queue_ptr_unsafe(l_worker, s_queue_callback_callback); - l_worker->event_exit = dap_events_socket_create_type_event_unsafe(l_worker, s_event_exit_callback); +/** + * @brief dap_context_thread_loop + * @param a_context + * @return + */ +int dap_context_thread_loop(dap_context_t * a_context) +{ + int l_errno = 0, l_selected_sockets = 0; + dap_events_socket_t *l_cur = NULL; - l_worker->timer_check_activity = dap_timerfd_create(s_connection_timeout * 1000 / 2, - s_socket_all_check_activity, l_worker); - dap_worker_add_events_socket_unsafe( l_worker->timer_check_activity->events_socket, l_worker); - pthread_mutex_lock(&l_worker->started_mutex); - pthread_cond_broadcast(&l_worker->started_cond); - pthread_mutex_unlock(&l_worker->started_mutex); + socklen_t l_error_len = sizeof(l_errno); + char l_error_buf[128] = {0}; + ssize_t l_bytes_sent = 0, l_bytes_read = 0, l_sockets_max; - while (1) { + do { #ifdef DAP_EVENTS_CAPS_EPOLL - l_selected_sockets = epoll_wait(l_worker->epoll_fd, l_epoll_events, DAP_EVENTS_SOCKET_MAX, -1); + l_selected_sockets = epoll_wait(a_context->epoll_fd, l_epoll_events, DAP_EVENTS_SOCKET_MAX, -1); l_sockets_max = l_selected_sockets; #elif defined(DAP_EVENTS_CAPS_POLL) - l_selected_sockets = poll(l_worker->poll, l_worker->poll_count, -1); - l_sockets_max = l_worker->poll_count; + l_selected_sockets = poll(a_context->poll, a_context->poll_count, -1); + l_sockets_max = a_context->poll_count; #elif defined(DAP_EVENTS_CAPS_KQUEUE) - l_selected_sockets = kevent(l_worker->kqueue_fd,NULL,0,l_worker->kqueue_events_selected,l_worker->kqueue_events_selected_count_max, + l_selected_sockets = kevent(a_context->kqueue_fd,NULL,0,a_context->kqueue_events_selected,a_context->kqueue_events_selected_count_max, NULL); l_sockets_max = l_selected_sockets; #else @@ -200,10 +149,10 @@ void *dap_worker_thread(void *arg) if( errno == EINTR) continue; #ifdef DAP_OS_WINDOWS - log_it(L_ERROR, "Worker thread %d got errno %d", l_worker->id, WSAGetLastError()); + log_it(L_ERROR, "Context thread %d got errno %d", a_context->id, WSAGetLastError()); #else strerror_r(l_errno, l_error_buf, sizeof (l_error_buf) - 1); - log_it(L_ERROR, "Worker thread %d got errno:\"%s\" (%d)", l_worker->id, l_error_buf, l_errno); + log_it(L_ERROR, "Context thread %d got errno:\"%s\" (%d)", a_context->id, l_error_buf, l_errno); assert(l_errno); #endif break; @@ -225,9 +174,9 @@ void *dap_worker_thread(void *arg) l_flag_nval = false; l_flag_msg = false; #elif defined ( DAP_EVENTS_CAPS_POLL) - short l_cur_flags =l_worker->poll[n].revents; + short l_cur_flags =a_context->poll[n].revents; - if (l_worker->poll[n].fd == -1) // If it was deleted on previous iterations + if (a_context->poll[n].fd == -1) // If it was deleted on previous iterations continue; if (!l_cur_flags) // No events for this socket @@ -241,11 +190,11 @@ void *dap_worker_thread(void *arg) l_flag_nval = l_cur_flags & POLLNVAL; l_flag_pri = l_cur_flags & POLLPRI; l_flag_msg = l_cur_flags & POLLMSG; - l_cur = l_worker->poll_esocket[n]; - //log_it(L_DEBUG, "flags: returned events 0x%0X requested events 0x%0X",l_worker->poll[n].revents,l_worker->poll[n].events ); + l_cur = a_context->poll_esocket[n]; + //log_it(L_DEBUG, "flags: returned events 0x%0X requested events 0x%0X",a_context->poll[n].revents,a_context->poll[n].events ); #elif defined (DAP_EVENTS_CAPS_KQUEUE) l_flag_hup=l_flag_rdhup=l_flag_read=l_flag_write=l_flag_error=l_flag_nval=l_flag_msg =l_flag_pri = false; - struct kevent * l_kevent_selected = &l_worker->kqueue_events_selected[n]; + struct kevent * l_kevent_selected = &a_context->kqueue_events_selected[n]; if ( l_kevent_selected->filter == EVFILT_USER){ // If we have USER event it sends little different pointer dap_events_socket_w_data_t * l_es_w_data = (dap_events_socket_w_data_t *) l_kevent_selected->udata; //if(g_debug_reactor) @@ -273,7 +222,9 @@ void *dap_worker_thread(void *arg) case EVFILT_READ: l_flag_read = true; break; case EVFILT_WRITE: l_flag_write = true; break; case EVFILT_EXCEPT : l_flag_rdhup = true; break; - default: log_it(L_CRITICAL,"Unknown filter type in polling, exit thread"); return NULL; + default: + log_it(L_CRITICAL,"Unknown filter type in polling, exit thread"); + return -1; } if (l_kevent_selected->flags & EV_EOF) l_flag_rdhup = true; @@ -296,13 +247,13 @@ void *dap_worker_thread(void *arg) #else #error "Unimplemented fetch esocket after poll" #endif - if(!l_cur || (l_cur->worker && l_cur->worker != l_worker)) { + if(!l_cur || (l_cur->context && l_cur->context != a_context)) { log_it(L_WARNING, "dap_events_socket was destroyed earlier"); continue; } if(g_debug_reactor) { - log_it(L_DEBUG, "--Worker #%u esocket %p uuid 0x%016"DAP_UINT64_FORMAT_x" type %d fd=%"DAP_FORMAT_SOCKET" flags=0x%0X (%s:%s:%s:%s:%s:%s:%s:%s)--", - l_worker->id, l_cur, l_cur->uuid, l_cur->type, l_cur->socket, + log_it(L_DEBUG, "--Context #%u esocket %p uuid 0x%016"DAP_UINT64_FORMAT_x" type %d fd=%"DAP_FORMAT_SOCKET" flags=0x%0X (%s:%s:%s:%s:%s:%s:%s:%s)--", + a_context->id, l_cur, l_cur->uuid, l_cur->type, l_cur->socket, l_cur_flags, l_flag_read?"read":"", l_flag_write?"write":"", l_flag_error?"error":"", l_flag_hup?"hup":"", l_flag_rdhup?"rdhup":"", l_flag_msg?"msg":"", l_flag_nval?"nval":"", l_flag_pri?"pri":""); @@ -523,7 +474,7 @@ void *dap_worker_thread(void *arg) log_it(L_DEBUG, "Received %zd bytes for fd %d ", l_bytes_read, l_cur->fd); if(l_cur->callbacks.read_callback){ l_cur->callbacks.read_callback(l_cur, NULL); // Call callback to process read event. At the end of callback buf_in_size should be zero if everything was read well - if (l_cur->worker == NULL ){ // esocket was unassigned in callback, we don't need any ops with it now, + if (l_cur->context == NULL ){ // esocket was unassigned in callback, we don't need any ops with it now, // continue to poll another esockets continue; } @@ -606,7 +557,7 @@ void *dap_worker_thread(void *arg) l_cur->flags ^= DAP_SOCK_CONNECTING; if (l_cur->callbacks.connected_callback) l_cur->callbacks.connected_callback(l_cur); - dap_events_socket_worker_poll_update_unsafe(l_cur); + dap_context_poll_update(l_cur); } #endif } else { @@ -627,7 +578,7 @@ void *dap_worker_thread(void *arg) l_cur->flags ^= DAP_SOCK_CONNECTING; if (l_cur->callbacks.connected_callback) l_cur->callbacks.connected_callback(l_cur); - dap_events_socket_worker_poll_update_unsafe(l_cur); + dap_context_poll_update(l_cur); } } } @@ -652,7 +603,7 @@ void *dap_worker_thread(void *arg) if (l_cur->callbacks.write_callback) l_cur->callbacks.write_callback(l_cur, NULL); /* Call callback to process write event */ - if ( l_cur->worker && l_flag_write ){ // esocket wasn't unassigned in callback, we need some other ops with it + if ( l_cur->context && l_flag_write ){ // esocket wasn't unassigned in callback, we need some other ops with it switch (l_cur->type){ case DESCRIPTOR_TYPE_SOCKET_CLIENT: { l_bytes_sent = send(l_cur->socket, (const char *)l_cur->buf_out, @@ -736,7 +687,7 @@ void *dap_worker_thread(void *arg) l_es_w_data->esocket = l_cur; memcpy(&l_es_w_data->ptr, l_cur->buf_out,sizeof(l_cur)); EV_SET(l_event,l_cur->socket, l_cur->kqueue_base_filter,l_cur->kqueue_base_flags, l_cur->kqueue_base_fflags,l_cur->kqueue_data, l_es_w_data); - int l_n = kevent(l_worker->kqueue_fd,l_event,1,NULL,0,NULL); + int l_n = kevent(a_context->kqueue_fd,l_event,1,NULL,0,NULL); if (l_n == 1){ l_bytes_sent = sizeof(l_cur); }else{ @@ -818,18 +769,18 @@ void *dap_worker_thread(void *arg) { if (l_cur->buf_out_size == 0) { if(g_debug_reactor) - log_it(L_INFO, "Process signal to close %s sock %"DAP_FORMAT_SOCKET" (ptr 0x%p uuid 0x%016"DAP_UINT64_FORMAT_x") type %d [thread %u]", + log_it(L_INFO, "Process signal to close %s sock %"DAP_FORMAT_SOCKET" (ptr 0x%p uuid 0x%016"DAP_UINT64_FORMAT_x") type %d [context #%u]", l_cur->remote_addr_str ? l_cur->remote_addr_str : "", l_cur->socket, l_cur, l_cur->uuid, - l_cur->type, l_tn); + l_cur->type, a_context->id); for (ssize_t nn = n + 1; nn < l_sockets_max; nn++) { // Check for current selection if it has event duplication dap_events_socket_t *l_es_selected = NULL; #ifdef DAP_EVENTS_CAPS_EPOLL l_es_selected = (dap_events_socket_t *) l_epoll_events[nn].data.ptr; #elif defined ( DAP_EVENTS_CAPS_POLL) - l_es_selected = l_worker->poll_esocket[nn]; + l_es_selected = a_context->poll_esocket[nn]; #elif defined (DAP_EVENTS_CAPS_KQUEUE) - struct kevent * l_kevent_selected = &l_worker->kqueue_events_selected[n]; + struct kevent * l_kevent_selected = &a_context->kqueue_events_selected[n]; if ( l_kevent_selected->filter == EVFILT_USER){ // If we have USER event it sends little different pointer dap_events_socket_w_data_t * l_es_w_data = (dap_events_socket_w_data_t *) l_kevent_selected->udata; l_es_selected = l_es_w_data->esocket; @@ -851,21 +802,16 @@ void *dap_worker_thread(void *arg) //dap_events_socket_remove_and_delete_unsafe( l_cur, false); dap_events_remove_and_delete_socket_unsafe(dap_events_get_default(), l_cur, false); #ifdef DAP_EVENTS_CAPS_KQUEUE - l_worker->kqueue_events_count--; + a_context->kqueue_events_count--; #endif } else if (l_cur->buf_out_size ) { if(g_debug_reactor) - log_it(L_INFO, "Got signal to close %s sock %"DAP_FORMAT_SOCKET" [thread %u] type %d but buffer is not empty(%zu)", - l_cur->remote_addr_str ? l_cur->remote_addr_str : "", l_cur->socket, l_cur->type, l_tn, + log_it(L_INFO, "Got signal to close %s sock %"DAP_FORMAT_SOCKET" [context #%u] type %d but buffer is not empty(%zu)", + l_cur->remote_addr_str ? l_cur->remote_addr_str : "", l_cur->socket, l_cur->type, a_context->id, l_cur->buf_out_size); } } - if( l_worker->signal_exit){ - log_it(L_ATT,"Worker :%u finished", l_worker->id); - return NULL; - } - } #ifdef DAP_EVENTS_CAPS_POLL /***********************************************************/ @@ -873,312 +819,147 @@ void *dap_worker_thread(void *arg) /* to squeeze together the array and decrement the number */ /* of file descriptors. */ /***********************************************************/ - if ( l_worker->poll_compress){ - l_worker->poll_compress = false; - for (size_t i = 0; i < l_worker->poll_count ; i++) { - if ( l_worker->poll[i].fd == -1){ - if( l_worker->poll_count){ - for(size_t j = i; j < l_worker->poll_count-1; j++){ - l_worker->poll[j].fd = l_worker->poll[j+1].fd; - l_worker->poll[j].events = l_worker->poll[j+1].events; - l_worker->poll[j].revents = l_worker->poll[j+1].revents; - l_worker->poll_esocket[j] = l_worker->poll_esocket[j+1]; - if(l_worker->poll_esocket[j]) - l_worker->poll_esocket[j]->poll_index = j; + if ( a_context->poll_compress){ + a_context->poll_compress = false; + for (size_t i = 0; i < a_context->poll_count ; i++) { + if ( a_context->poll[i].fd == -1){ + if( a_context->poll_count){ + for(size_t j = i; j < a_context->poll_count-1; j++){ + a_context->poll[j].fd = a_context->poll[j+1].fd; + a_context->poll[j].events = a_context->poll[j+1].events; + a_context->poll[j].revents = a_context->poll[j+1].revents; + a_context->poll_esocket[j] = a_context->poll_esocket[j+1]; + if(a_context->poll_esocket[j]) + a_context->poll_esocket[j]->poll_index = j; } } i--; - l_worker->poll_count--; + a_context->poll_count--; } } } #endif - } // while - log_it(L_NOTICE,"Exiting thread #%u", l_worker->id); - return NULL; -} - -/** - * @brief s_new_es_callback - * @param a_es - * @param a_arg - */ -static void s_queue_add_es_callback( dap_events_socket_t * a_es, void * a_arg) -{ - dap_worker_t * l_worker = a_es->worker; - dap_events_socket_t * l_es_new =(dap_events_socket_t *) a_arg; - if (!l_es_new){ - log_it(L_ERROR,"NULL esocket accepted to add on worker #%u", l_worker->id); - return; - } - - if(g_debug_reactor) - log_it(L_NOTICE, "Received event socket %p (ident %"DAP_FORMAT_SOCKET" type %d) to add on worker", l_es_new, l_es_new->socket, l_es_new->type); - - switch( l_es_new->type){ - case DESCRIPTOR_TYPE_SOCKET_UDP: break; - case DESCRIPTOR_TYPE_SOCKET_CLIENT: break; - default:{} - } - -#ifdef DAP_EVENTS_CAPS_KQUEUE - if(l_es_new->socket!=0 && l_es_new->socket != -1 && - l_es_new->type != DESCRIPTOR_TYPE_EVENT && - l_es_new->type != DESCRIPTOR_TYPE_QUEUE && - l_es_new->type != DESCRIPTOR_TYPE_TIMER - ) -#else - if(l_es_new->socket!=0 && l_es_new->socket != INVALID_SOCKET) - -#endif - if(dap_worker_esocket_find_uuid( l_worker, l_es_new->uuid)){ - // Socket already present in worker, it's OK - return; - } - - switch( l_es_new->type){ - - case DESCRIPTOR_TYPE_SOCKET_UDP: - case DESCRIPTOR_TYPE_SOCKET_CLIENT: - case DESCRIPTOR_TYPE_SOCKET_LISTENING:{ + } while(!a_context->signal_exit); -#ifdef DAP_OS_UNIX -#if defined (SO_INCOMING_CPU) - int l_cpu = l_worker->id; - setsockopt(l_es_new->socket , SOL_SOCKET, SO_INCOMING_CPU, &l_cpu, sizeof(l_cpu)); -#endif -#endif - } break; - default: {} - } - - l_es_new->worker = l_worker; - l_es_new->last_time_active = time(NULL); - // We need to differ new and reassigned esockets. If its new - is_initialized is false - if ( ! l_es_new->is_initalized ){ - if (l_es_new->callbacks.new_callback) - l_es_new->callbacks.new_callback(l_es_new, NULL); - l_es_new->is_initalized = true; - } - - int l_ret = dap_worker_add_events_socket_unsafe(l_es_new,l_worker); - if ( l_ret != 0 ){ - log_it(L_CRITICAL,"Can't add event socket's handler to worker i/o poll mechanism with error %d", errno); - }else{ - // Add in worker - l_es_new->me = l_es_new; - if (l_es_new->socket!=0 && l_es_new->socket != INVALID_SOCKET){ - pthread_rwlock_wrlock(&l_worker->esocket_rwlock); - HASH_ADD(hh_worker, l_worker->esockets, uuid, sizeof(l_es_new->uuid), l_es_new ); - l_worker->event_sockets_count++; - pthread_rwlock_unlock(&l_worker->esocket_rwlock); - } - //log_it(L_DEBUG, "Added socket %d on worker %u", l_es_new->socket, w->id); - if (l_es_new->callbacks.worker_assign_callback) - l_es_new->callbacks.worker_assign_callback(l_es_new, l_worker); - - } + log_it(L_ATT,"Context :%u finished", a_context->id); + return 0; } -/** - * @brief s_delete_es_callback - * @param a_es - * @param a_arg - */ -static void s_queue_delete_es_callback( dap_events_socket_t * a_es, void * a_arg) -{ - assert(a_arg); - dap_events_socket_uuid_t * l_es_uuid_ptr = (dap_events_socket_uuid_t*) a_arg; - dap_events_socket_t * l_es; - if ( (l_es = dap_worker_esocket_find_uuid(a_es->worker,*l_es_uuid_ptr)) != NULL ){ - //l_es->flags |= DAP_SOCK_SIGNAL_CLOSE; // Send signal to socket to kill - dap_events_socket_remove_and_delete_unsafe(l_es,false); - }else - log_it(L_INFO, "While we were sending the delete() message, esocket %"DAP_UINT64_FORMAT_U" has been disconnected ", *l_es_uuid_ptr); - DAP_DELETE(l_es_uuid_ptr); -} /** - * @brief s_reassign_es_callback - * @param a_es - * @param a_arg + * @brief dap_context_poll_update + * @param a_esocket */ -static void s_queue_es_reassign_callback( dap_events_socket_t * a_es, void * a_arg) +void dap_context_poll_update(dap_events_socket_t * a_esocket) { - assert(a_es); - dap_worker_t * l_worker = a_es->worker; - assert(l_worker); - dap_worker_msg_reassign_t * l_msg = (dap_worker_msg_reassign_t*) a_arg; - assert(l_msg); - dap_events_socket_t * l_es_reassign; - if ( ( l_es_reassign = dap_worker_esocket_find_uuid(l_worker, l_msg->esocket_uuid))!= NULL ){ - if( l_es_reassign->was_reassigned && l_es_reassign->flags & DAP_SOCK_REASSIGN_ONCE) { - log_it(L_INFO, "Reassgment request with DAP_SOCK_REASSIGN_ONCE allowed only once, declined reassigment from %u to %u", - l_es_reassign->worker->id, l_msg->worker_new->id); - - }else{ - dap_events_socket_reassign_between_workers_unsafe(l_es_reassign,l_msg->worker_new); - } - }else{ - log_it(L_INFO, "While we were sending the reassign message, esocket %p has been disconnected", l_msg->esocket); - } - DAP_DELETE(l_msg); -} + #if defined (DAP_EVENTS_CAPS_EPOLL) + int events = a_esocket->ev_base_flags | EPOLLERR; -/** - * @brief s_queue_callback - * @param a_es - * @param a_arg - */ -static void s_queue_callback_callback( dap_events_socket_t * a_es, void * a_arg) -{ - dap_worker_msg_callback_t * l_msg = (dap_worker_msg_callback_t *) a_arg; - assert(l_msg); - assert(l_msg->callback); - l_msg->callback(a_es->worker, l_msg->arg); - DAP_DELETE(l_msg); -} + // Check & add + if( a_esocket->flags & DAP_SOCK_READY_TO_READ ) + events |= EPOLLIN; -/** - * @brief s_event_exit_callback - * @param a_es - * @param a_flags - */ -static void s_event_exit_callback( dap_events_socket_t * a_es, uint64_t a_flags) -{ - (void) a_flags; - a_es->worker->signal_exit = true; - if(g_debug_reactor) - log_it(L_DEBUG, "Worker :%u signaled to exit", a_es->worker->id); -} + if( a_esocket->flags & DAP_SOCK_READY_TO_WRITE || a_esocket->flags &DAP_SOCK_CONNECTING ) + events |= EPOLLOUT; -/** - * @brief s_pipe_data_out_read_callback - * @param a_es - * @param a_arg - */ -static void s_queue_es_io_callback( dap_events_socket_t * a_es, void * a_arg) -{ - assert(a_es); - dap_worker_t * l_worker = a_es->worker; - dap_worker_msg_io_t * l_msg = a_arg; - assert(l_msg); - // Check if it was removed from the list - dap_events_socket_t *l_msg_es = dap_worker_esocket_find_uuid(l_worker, l_msg->esocket_uuid); - if ( l_msg_es == NULL){ - log_it(L_INFO, "We got i/o message for esocket %"DAP_UINT64_FORMAT_U" thats now not in list. Lost %zu data", l_msg->esocket_uuid, l_msg->data_size); - DAP_DELETE(l_msg); - return; - } + a_esocket->ev.events = events; - if (l_msg->flags_set & DAP_SOCK_CONNECTING) - if (! (l_msg_es->flags & DAP_SOCK_CONNECTING) ){ - l_msg_es->flags |= DAP_SOCK_CONNECTING; - dap_events_socket_worker_poll_update_unsafe(l_msg_es); + if( a_esocket->context){ + if ( epoll_ctl(a_esocket->context->epoll_fd, EPOLL_CTL_MOD, a_esocket->socket, &a_esocket->ev) ){ +#ifdef DAP_OS_WINDOWS + int l_errno = WSAGetLastError(); +#else + int l_errno = errno; +#endif + char l_errbuf[128]; + l_errbuf[0]=0; + strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); + log_it(L_ERROR,"Can't update client socket state in the epoll_fd %"DAP_FORMAT_HANDLE": \"%s\" (%d)", + a_esocket->context->epoll_fd, l_errbuf, l_errno); + } } - - if (l_msg->flags_set & DAP_SOCK_CONNECTING) - if (! (l_msg_es->flags & DAP_SOCK_CONNECTING) ){ - l_msg_es->flags ^= DAP_SOCK_CONNECTING; - dap_events_socket_worker_poll_update_unsafe(l_msg_es); + #elif defined (DAP_EVENTS_CAPS_POLL) + if( a_esocket->context && a_esocket->is_initalized){ + if (a_esocket->poll_index < a_esocket->context->poll_count ){ + struct pollfd * l_poll = &a_esocket->context->poll[a_esocket->poll_index]; + l_poll->events = a_esocket->poll_base_flags | POLLERR ; + // Check & add + if( a_esocket->flags & DAP_SOCK_READY_TO_READ ) + l_poll->events |= POLLIN; + if( a_esocket->flags & DAP_SOCK_READY_TO_WRITE || a_esocket->flags &DAP_SOCK_CONNECTING ) + l_poll->events |= POLLOUT; + }else{ + log_it(L_ERROR, "Wrong poll index when remove from context (unsafe): %u when total count %u", a_esocket->poll_index, + a_esocket->context->poll_count); + } + } + #elif defined (DAP_EVENTS_CAPS_KQUEUE) + if (a_esocket->socket != -1 ){ // Not everything we add in poll + struct kevent * l_event = &a_esocket->kqueue_event; + short l_filter =a_esocket->kqueue_base_filter; + u_short l_flags =a_esocket->kqueue_base_flags; + u_int l_fflags =a_esocket->kqueue_base_fflags; + + int l_kqueue_fd = a_esocket->context? a_esocket->context->kqueue_fd : -1; + if ( l_kqueue_fd == -1 ){ + log_it(L_ERROR, "Esocket is not assigned with anything ,exit"); } - if (l_msg->flags_set & DAP_SOCK_READY_TO_READ) - dap_events_socket_set_readable_unsafe(l_msg_es, true); - if (l_msg->flags_unset & DAP_SOCK_READY_TO_READ) - dap_events_socket_set_readable_unsafe(l_msg_es, false); - if (l_msg->flags_set & DAP_SOCK_READY_TO_WRITE) - dap_events_socket_set_writable_unsafe(l_msg_es, true); - if (l_msg->flags_unset & DAP_SOCK_READY_TO_WRITE) - dap_events_socket_set_writable_unsafe(l_msg_es, false); - if (l_msg->data_size && l_msg->data) { - dap_events_socket_write_unsafe(l_msg_es, l_msg->data,l_msg->data_size); - DAP_DELETE(l_msg->data); - } - DAP_DELETE(l_msg); -} - -/** - * @brief s_socket_all_check_activity - * @param a_arg - */ -static bool s_socket_all_check_activity( void * a_arg) -{ - dap_worker_t *l_worker = (dap_worker_t*) a_arg; - assert(l_worker); - dap_events_socket_t *l_es = NULL, *tmp = NULL; - char l_curtimebuf[64]; - time_t l_curtime= time(NULL); - //dap_ctime_r(&l_curtime, l_curtimebuf); - //log_it(L_DEBUG,"Check sockets activity on worker #%u at %s", l_worker->id, l_curtimebuf); - pthread_rwlock_rdlock(&l_worker->esocket_rwlock); - HASH_ITER(hh_worker, l_worker->esockets, l_es, tmp ) { - pthread_rwlock_unlock(&l_worker->esocket_rwlock); - if (l_es->type == DESCRIPTOR_TYPE_SOCKET_CLIENT){ - if ( !(l_es->flags & DAP_SOCK_SIGNAL_CLOSE) && - ( l_curtime >= (l_es->last_time_active + s_connection_timeout) ) && !l_es->no_close ) { - log_it( L_INFO, "Socket %"DAP_FORMAT_SOCKET" timeout (diff %"DAP_UINT64_FORMAT_U" ), closing...", - l_es->socket, l_curtime - (time_t)l_es->last_time_active - s_connection_timeout ); - if (l_es->callbacks.error_callback) { - l_es->callbacks.error_callback(l_es, ETIMEDOUT); + // Check & add + bool l_is_error=false; + int l_errno=0; + if (a_esocket->type == DESCRIPTOR_TYPE_EVENT ){ + EV_SET(l_event, a_esocket->socket, EVFILT_USER,EV_ADD| EV_CLEAR ,0,0, &a_esocket->kqueue_event_catched_data ); + if( kevent( l_kqueue_fd,l_event,1,NULL,0,NULL) == -1){ + l_is_error = true; + l_errno = errno; + } + }else{ + EV_SET(l_event, a_esocket->socket, l_filter,l_flags| EV_ADD,l_fflags,a_esocket->kqueue_data,a_esocket); + if( a_esocket->flags & DAP_SOCK_READY_TO_READ ){ + EV_SET(l_event, a_esocket->socket, EVFILT_READ,l_flags| EV_ADD,l_fflags,a_esocket->kqueue_data,a_esocket); + if( kevent( l_kqueue_fd,l_event,1,NULL,0,NULL) == -1 ){ + l_is_error = true; + l_errno = errno; + } + } + if( !l_is_error){ + if( a_esocket->flags & DAP_SOCK_READY_TO_WRITE || a_esocket->flags &DAP_SOCK_CONNECTING ){ + EV_SET(l_event, a_esocket->socket, EVFILT_WRITE,l_flags| EV_ADD,l_fflags,a_esocket->kqueue_data,a_esocket); + if(kevent( l_kqueue_fd,l_event,1,NULL,0,NULL) == -1){ + l_is_error = true; + l_errno = errno; + } } - dap_events_socket_remove_and_delete_unsafe(l_es,false); } } - pthread_rwlock_rdlock(&l_worker->esocket_rwlock); - } - pthread_rwlock_unlock(&l_worker->esocket_rwlock); - return true; -} + if (l_is_error && l_errno == EBADF){ + log_it(L_ATT,"Poll update: socket %d (%p ) disconnected, rise CLOSE flag to remove from queue, lost %"DAP_UINT64_FORMAT_U":%" DAP_UINT64_FORMAT_U + " bytes",a_esocket->socket,a_esocket,a_esocket->buf_in_size,a_esocket->buf_out_size); + a_esocket->flags |= DAP_SOCK_SIGNAL_CLOSE; + a_esocket->buf_in_size = a_esocket->buf_out_size = 0; // Reset everything from buffer, we close it now all + }else if ( l_is_error && l_errno != EINPROGRESS && l_errno != ENOENT){ + char l_errbuf[128]; + l_errbuf[0]=0; + strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); + log_it(L_ERROR,"Can't update client socket state on kqueue fd %d: \"%s\" (%d)", + l_kqueue_fd, l_errbuf, l_errno); + } + } -/** - * @brief sap_worker_add_events_socket - * @param a_events_socket - * @param a_worker - */ -void dap_worker_add_events_socket(dap_events_socket_t * a_events_socket, dap_worker_t * a_worker) -{ -/* -#ifdef DAP_EVENTS_CAPS_KQUEUE - a_events_socket->worker = a_worker; - if(dap_worker_add_events_socket_unsafe(a_events_socket, a_worker)!=0) - a_events_socket->worker = NULL; - -#else*/ - if(g_debug_reactor) - log_it(L_DEBUG,"Worker add esocket %"DAP_FORMAT_SOCKET, a_events_socket->socket); - int l_ret = dap_events_socket_queue_ptr_send( a_worker->queue_es_new, a_events_socket ); - if(l_ret != 0 ){ - char l_errbuf[128]; - *l_errbuf = 0; - strerror_r(l_ret, l_errbuf, sizeof(l_errbuf)); - log_it(L_ERROR, "Can't send pointer in queue: \"%s\"(code %d)", l_errbuf, l_ret); - } -//#endif -} + #else + #error "Not defined dap_events_socket_set_writable_unsafe for your platform" + #endif -/** - * @brief dap_worker_add_events_socket_inter - * @param a_es_input - * @param a_events_socket - */ -void dap_worker_add_events_socket_inter(dap_events_socket_t * a_es_input, dap_events_socket_t * a_events_socket) -{ - if( dap_events_socket_queue_ptr_send_to_input( a_es_input, a_events_socket ) != 0 ){ - int l_errno = errno; - char l_errbuf[128]; - *l_errbuf = 0; - strerror_r(l_errno,l_errbuf,sizeof (l_errbuf)); - log_it(L_ERROR, "Cant send pointer to interthread queue input: \"%s\"(code %d)", l_errbuf, l_errno); - } } + /** - * @brief dap_worker_add_events_socket_unsafe - * @param a_worker + * @brief dap_context_add_events_socket_unsafe + * @param IOa_context * @param a_esocket */ -int dap_worker_add_events_socket_unsafe( dap_events_socket_t * a_esocket, dap_worker_t * a_worker ) +int dap_context_add_esocket(dap_context_t * a_context, dap_events_socket_t * a_esocket ) { if(g_debug_reactor){ log_it(L_DEBUG,"Add event socket %p (socket %"DAP_FORMAT_SOCKET")", a_esocket, a_esocket->socket); @@ -1191,41 +972,45 @@ int dap_worker_add_events_socket_unsafe( dap_events_socket_t * a_esocket, dap_wo if(a_esocket->flags & DAP_SOCK_READY_TO_WRITE ) a_esocket->ev.events |= EPOLLOUT; a_esocket->ev.data.ptr = a_esocket; - return epoll_ctl(a_worker->epoll_fd, EPOLL_CTL_ADD, a_esocket->socket, &a_esocket->ev); + a_esocket->context = a_context; + return epoll_ctl(a_context->epoll_fd, EPOLL_CTL_ADD, a_esocket->socket, &a_esocket->ev); #elif defined (DAP_EVENTS_CAPS_POLL) - if ( a_worker->poll_count == a_worker->poll_count_max ){ // realloc - a_worker->poll_count_max *= 2; - log_it(L_WARNING, "Too many descriptors (%u), resizing array twice to %zu", a_worker->poll_count, a_worker->poll_count_max); - a_worker->poll =DAP_REALLOC(a_worker->poll, a_worker->poll_count_max * sizeof(*a_worker->poll)); - a_worker->poll_esocket =DAP_REALLOC(a_worker->poll_esocket, a_worker->poll_count_max * sizeof(*a_worker->poll_esocket)); + if ( a_context->poll_count == a_context->poll_count_max ){ // realloc + a_context->poll_count_max *= 2; + log_it(L_WARNING, "Too many descriptors (%u), resizing array twice to %zu", a_context->poll_count, a_context->poll_count_max); + a_context->poll =DAP_REALLOC(a_context->poll, a_context->poll_count_max * sizeof(*a_context->poll)); + a_context->poll_esocket =DAP_REALLOC(a_context->poll_esocket, a_context->poll_count_max * sizeof(*a_context->poll_esocket)); } - a_worker->poll[a_worker->poll_count].fd = a_esocket->socket; - a_esocket->poll_index = a_worker->poll_count; - a_worker->poll[a_worker->poll_count].events = a_esocket->poll_base_flags; + a_context->poll[a_context->poll_count].fd = a_esocket->socket; + a_esocket->poll_index = a_context->poll_count; + a_context->poll[a_context->poll_count].events = a_esocket->poll_base_flags; if( a_esocket->flags & DAP_SOCK_READY_TO_READ ) - a_worker->poll[a_worker->poll_count].events |= POLLIN; + a_context->poll[a_context->poll_count].events |= POLLIN; if( (a_esocket->flags & DAP_SOCK_READY_TO_WRITE) || (a_esocket->flags & DAP_SOCK_CONNECTING) ) - a_worker->poll[a_worker->poll_count].events |= POLLOUT; + a_context->poll[a_context->poll_count].events |= POLLOUT; - a_worker->poll_esocket[a_worker->poll_count] = a_esocket; - a_worker->poll_count++; + a_context->poll_esocket[a_context->poll_count] = a_esocket; + a_context->poll_count++; + a_esocket->context = a_context; return 0; #elif defined (DAP_EVENTS_CAPS_KQUEUE) - a_esocket->worker = a_worker; if ( a_esocket->type == DESCRIPTOR_TYPE_QUEUE ){ + a_esocket->context = a_context; return 0; } if ( a_esocket->type == DESCRIPTOR_TYPE_EVENT && a_esocket->pipe_out){ + a_esocket->context = a_context; return 0; } struct kevent l_event; u_short l_flags = a_esocket->kqueue_base_flags; u_int l_fflags = a_esocket->kqueue_base_fflags; short l_filter = a_esocket->kqueue_base_filter; - int l_kqueue_fd =a_worker->kqueue_fd; + int l_kqueue_fd =a_context->kqueue_fd; if ( l_kqueue_fd == -1 ){ log_it(L_ERROR, "Esocket is not assigned with anything ,exit"); + return -1; } // Check & add bool l_is_error=false; @@ -1277,48 +1062,33 @@ int dap_worker_add_events_socket_unsafe( dap_events_socket_t * a_esocket, dap_wo log_it(L_ERROR,"Can't update client socket state on kqueue fd %d: \"%s\" (%d)", a_esocket->socket, l_errbuf, l_errno); return l_errno; - }else + }else{ + a_esocket->context = a_context; return 0; - + } #else -#error "Unimplemented new esocket on worker callback for current platform" +#error "Unimplemented new esocket on context callback for current platform" #endif } -/** - * @brief dap_worker_exec_callback_on - */ -void dap_worker_exec_callback_on(dap_worker_t * a_worker, dap_worker_callback_t a_callback, void * a_arg) -{ - dap_worker_msg_callback_t * l_msg = DAP_NEW_Z(dap_worker_msg_callback_t); - l_msg->callback = a_callback; - l_msg->arg = a_arg; - int l_ret=dap_events_socket_queue_ptr_send( a_worker->queue_callback,l_msg ); - if(l_ret != 0 ){ - char l_errbuf[128]; - *l_errbuf = 0; - strerror_r(l_ret,l_errbuf,sizeof (l_errbuf)); - log_it(L_ERROR, "Cant send pointer in queue: \"%s\"(code %d)", l_errbuf, l_ret); - } - -} - /** - * @brief dap_worker_add_events_socket - * @param a_worker - * @param a_events_socket + * @brief dap_context_esocket_find_by_uuid + * @param a_context + * @param a_es_uuid + * @return */ -dap_worker_t *dap_worker_add_events_socket_auto( dap_events_socket_t *a_es) +dap_events_socket_t *dap_context_esocket_find_by_uuid(dap_context_t * a_context, dap_events_socket_uuid_t a_es_uuid ) { -// struct epoll_event ev = {0}; - dap_worker_t *l_worker = dap_events_worker_get_auto( ); - - a_es->events = l_worker->events; - dap_worker_add_events_socket( a_es, l_worker); - return l_worker; + if(a_context == NULL){ + log_it(L_ERROR, "Worker is NULL, can't fund esocket by UUID"); + return NULL; + } + dap_events_socket_t * l_ret = NULL; + if(a_context->esockets ) { + //HASH_FIND_PTR( a_worker->context->esockets, &a_es_uuid,l_ret ); + HASH_FIND(hh_worker, a_context->esockets, &a_es_uuid, sizeof(a_es_uuid), l_ret ); + } + return l_ret; } - - - diff --git a/dap-sdk/net/core/dap_events.c b/dap-sdk/io/dap_events.c similarity index 98% rename from dap-sdk/net/core/dap_events.c rename to dap-sdk/io/dap_events.c index 25a4d7fbeaf6e2ac83d8f79f19b53d0244276edd..25a58a0679cae4203feef8a806dcec16f2e3d2f2 100644 --- a/dap-sdk/net/core/dap_events.c +++ b/dap-sdk/io/dap_events.c @@ -93,6 +93,7 @@ typedef cpuset_t cpu_set_t; // Adopt BSD CPU setstructure to POSIX variant #include "dap_strfuncs.h" #include "dap_server.h" #include "dap_events.h" +#include "dap_context.h" #include "dap_events_socket.h" #include "dap_proc_thread.h" #include "dap_config.h" @@ -223,6 +224,12 @@ int dap_events_init( uint32_t a_threads_count, size_t a_conn_timeout ) if ( !s_workers || !s_threads ) return -1; + if(dap_context_init() != 0){ + log_it( L_CRITICAL, "Can't init client submodule dap_context( )" ); + goto err; + + } + dap_worker_init(a_conn_timeout); if ( dap_events_socket_init() != 0 ) { log_it( L_CRITICAL, "Can't init client submodule dap_events_socket_init( )" ); @@ -333,8 +340,7 @@ int dap_events_start( dap_events_t *a_events ) l_worker->id = i; l_worker->events = a_events; - l_worker->esockets = NULL; - pthread_rwlock_init(&l_worker->esocket_rwlock,NULL); + l_worker->context = dap_context_new(); pthread_mutex_init(& l_worker->started_mutex, NULL); pthread_cond_init( & l_worker->started_cond, NULL); diff --git a/dap-sdk/net/core/dap_events_socket.c b/dap-sdk/io/dap_events_socket.c similarity index 89% rename from dap-sdk/net/core/dap_events_socket.c rename to dap-sdk/io/dap_events_socket.c index 7dc6f1fe619fa29244046321e9417134b129de1d..5a0b5205f1edc85bf203484c094e3cb1b8f1c13d 100644 --- a/dap-sdk/net/core/dap_events_socket.c +++ b/dap-sdk/io/dap_events_socket.c @@ -1236,10 +1236,8 @@ int dap_events_socket_queue_ptr_send_to_input(dap_events_socket_t * a_es_input, l_es_w_data->esocket = l_es; l_es_w_data->ptr = a_arg; EV_SET(&l_event,a_es_input->socket+arc4random() , EVFILT_USER,EV_ADD | EV_CLEAR | EV_ONESHOT, NOTE_FFCOPY | NOTE_TRIGGER ,0, l_es_w_data); - if(l_es->worker) - l_ret=kevent(l_es->worker->kqueue_fd,&l_event,1,NULL,0,NULL); - else if (l_es->proc_thread) - l_ret=kevent(l_es->proc_thread->kqueue_fd,&l_event,1,NULL,0,NULL); + if(l_es->context) + l_ret=kevent(l_es->context->kqueue_fd,&l_event,1,NULL,0,NULL); else l_ret=-100; if(l_ret != -1 ){ @@ -1274,7 +1272,7 @@ int dap_events_socket_queue_ptr_send_to_input(dap_events_socket_t * a_es_input, */ int dap_events_socket_queue_ptr_send( dap_events_socket_t *a_es, void *a_arg) { - int l_ret = -1024, l_errno; + int l_ret = -1024, l_errno=0; if (g_debug_reactor) log_it(L_DEBUG,"Sent ptr %p to esocket queue %p (%d)", a_arg, a_es, a_es? a_es->fd : -1); @@ -1295,7 +1293,7 @@ int dap_events_socket_queue_ptr_send( dap_events_socket_t *a_es, void *a_arg) if (l_errno == EINVAL || l_errno == EINTR || l_errno == ETIMEDOUT) l_errno = EAGAIN; if (l_ret == 0) - l_ret = sizeof (a_arg); + l_ret = 0; else if (l_ret > 0) l_ret = -l_ret; @@ -1352,24 +1350,18 @@ int dap_events_socket_queue_ptr_send( dap_events_socket_t *a_es, void *a_arg) EV_SET(&l_event,a_es->socket+arc4random() , EVFILT_USER,EV_ADD | EV_CLEAR | EV_ONESHOT, NOTE_FFCOPY | NOTE_TRIGGER ,0, l_es_w_data); int l_n; if(a_es->pipe_out){ // If we have pipe out - we send events directly to the pipe out kqueue fd - if(a_es->pipe_out->worker){ + if(a_es->pipe_out->context){ if( g_debug_reactor) log_it(L_DEBUG, "Sent kevent() with ptr %p to pipe_out worker on esocket %d",a_arg,a_es); - l_n = kevent(a_es->pipe_out->worker->kqueue_fd,&l_event,1,NULL,0,NULL); - }else if (a_es->pipe_out->proc_thread){ - l_n = kevent(a_es->pipe_out->proc_thread->kqueue_fd,&l_event,1,NULL,0,NULL); - if( g_debug_reactor) log_it(L_DEBUG, "Sent kevent() with ptr %p to pipe_out proc_thread on esocket %d",a_arg,a_es); + l_n = kevent(a_es->pipe_out->context->kqueue_fd,&l_event,1,NULL,0,NULL); } else { log_it(L_WARNING,"Trying to send pointer in pipe out queue thats not assigned to any worker or proc thread"); l_n = 0; DAP_DELETE(l_es_w_data); } - }else if(a_es->worker){ - l_n = kevent(a_es->worker->kqueue_fd,&l_event,1,NULL,0,NULL); + }else if(a_es->context){ + l_n = kevent(a_es->context->kqueue_fd,&l_event,1,NULL,0,NULL); if( g_debug_reactor) log_it(L_DEBUG, "Sent kevent() with ptr %p to worker on esocket %d",a_arg,a_es); - }else if (a_es->proc_thread){ - l_n = kevent(a_es->proc_thread->kqueue_fd,&l_event,1,NULL,0,NULL); - if( g_debug_reactor) log_it(L_DEBUG, "Sent kevent() with ptr %p to proc_thread on esocket %d",a_arg,a_es); }else { log_it(L_WARNING,"Trying to send pointer in queue thats not assigned to any worker or proc thread"); l_n = 0; @@ -1377,7 +1369,7 @@ int dap_events_socket_queue_ptr_send( dap_events_socket_t *a_es, void *a_arg) } if(l_n != -1 ){ - return sizeof(a_arg); + return 0; }else{ l_errno = errno; log_it(L_ERROR,"Sending kevent error code %d", l_errno); @@ -1438,19 +1430,15 @@ int dap_events_socket_event_signal( dap_events_socket_t * a_es, uint64_t a_value int l_n; if(a_es->pipe_out){ // If we have pipe out - we send events directly to the pipe out kqueue fd - if(a_es->pipe_out->worker) - l_n = kevent(a_es->pipe_out->worker->kqueue_fd,&l_event,1,NULL,0,NULL); - else if (a_es->pipe_out->proc_thread) - l_n = kevent(a_es->pipe_out->proc_thread->kqueue_fd,&l_event,1,NULL,0,NULL); + if(a_es->pipe_out->context) + l_n = kevent(a_es->pipe_out->context->kqueue_fd,&l_event,1,NULL,0,NULL); else { log_it(L_WARNING,"Trying to send pointer in pipe out queue thats not assigned to any worker or proc thread"); l_n = -1; } - }else if(a_es->worker) - l_n = kevent(a_es->worker->kqueue_fd,&l_event,1,NULL,0,NULL); - else if (a_es->proc_thread) - l_n = kevent(a_es->proc_thread->kqueue_fd,&l_event,1,NULL,0,NULL); - else + }else if(a_es->context) + l_n = kevent(a_es->context->kqueue_fd,&l_event,1,NULL,0,NULL); + else l_n = -1; if(l_n == -1){ @@ -1512,130 +1500,6 @@ dap_events_socket_t * dap_events_socket_wrap2( dap_server_t *a_server, struct da return l_es; } - -/** - * @brief dap_worker_esocket_find_uuid - * @param a_worker - * @param a_es_uuid - * @return - */ -dap_events_socket_t *dap_worker_esocket_find_uuid(dap_worker_t * a_worker, dap_events_socket_uuid_t a_es_uuid ) -{ - assert(a_worker); - dap_events_socket_t * l_ret = NULL; - if(a_worker->esockets ) { - pthread_rwlock_rdlock(&a_worker->esocket_rwlock); - //HASH_FIND_PTR( a_worker->esockets, &a_es_uuid,l_ret ); - HASH_FIND(hh_worker, a_worker->esockets, &a_es_uuid, sizeof(a_es_uuid), l_ret ); - pthread_rwlock_unlock(&a_worker->esocket_rwlock ); - } - return l_ret; -} - -void dap_events_socket_worker_poll_update_unsafe(dap_events_socket_t * a_esocket) -{ - #if defined (DAP_EVENTS_CAPS_EPOLL) - int events = a_esocket->ev_base_flags | EPOLLERR; - - // Check & add - if( a_esocket->flags & DAP_SOCK_READY_TO_READ ) - events |= EPOLLIN; - - if( a_esocket->flags & DAP_SOCK_READY_TO_WRITE || a_esocket->flags &DAP_SOCK_CONNECTING ) - events |= EPOLLOUT; - - a_esocket->ev.events = events; - - if( a_esocket->worker){ - if ( epoll_ctl(a_esocket->worker->epoll_fd, EPOLL_CTL_MOD, a_esocket->socket, &a_esocket->ev) ){ -#ifdef DAP_OS_WINDOWS - int l_errno = WSAGetLastError(); -#else - int l_errno = errno; -#endif - char l_errbuf[128]; - l_errbuf[0]=0; - strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); - log_it(L_ERROR,"Can't update client socket state in the epoll_fd %"DAP_FORMAT_HANDLE": \"%s\" (%d)", - a_esocket->worker->epoll_fd, l_errbuf, l_errno); - } - } - #elif defined (DAP_EVENTS_CAPS_POLL) - if( a_esocket->worker && a_esocket->is_initalized){ - if (a_esocket->poll_index < a_esocket->worker->poll_count ){ - struct pollfd * l_poll = &a_esocket->worker->poll[a_esocket->poll_index]; - l_poll->events = a_esocket->poll_base_flags | POLLERR ; - // Check & add - if( a_esocket->flags & DAP_SOCK_READY_TO_READ ) - l_poll->events |= POLLIN; - if( a_esocket->flags & DAP_SOCK_READY_TO_WRITE || a_esocket->flags &DAP_SOCK_CONNECTING ) - l_poll->events |= POLLOUT; - }else{ - log_it(L_ERROR, "Wrong poll index when remove from worker (unsafe): %u when total count %u", a_esocket->poll_index, - a_esocket->worker->poll_count); - } - } - #elif defined (DAP_EVENTS_CAPS_KQUEUE) - if (a_esocket->socket != -1 ){ // Not everything we add in poll - struct kevent * l_event = &a_esocket->kqueue_event; - short l_filter =a_esocket->kqueue_base_filter; - u_short l_flags =a_esocket->kqueue_base_flags; - u_int l_fflags =a_esocket->kqueue_base_fflags; - - int l_kqueue_fd = a_esocket->worker? a_esocket->worker->kqueue_fd : - a_esocket->proc_thread ? a_esocket->proc_thread->kqueue_fd : -1; - if ( l_kqueue_fd == -1 ){ - log_it(L_ERROR, "Esocket is not assigned with anything ,exit"); - } - - // Check & add - bool l_is_error=false; - int l_errno=0; - if (a_esocket->type == DESCRIPTOR_TYPE_EVENT ){ - EV_SET(l_event, a_esocket->socket, EVFILT_USER,EV_ADD| EV_CLEAR ,0,0, &a_esocket->kqueue_event_catched_data ); - if( kevent( l_kqueue_fd,l_event,1,NULL,0,NULL) == -1){ - l_is_error = true; - l_errno = errno; - } - }else{ - EV_SET(l_event, a_esocket->socket, l_filter,l_flags| EV_ADD,l_fflags,a_esocket->kqueue_data,a_esocket); - if( a_esocket->flags & DAP_SOCK_READY_TO_READ ){ - EV_SET(l_event, a_esocket->socket, EVFILT_READ,l_flags| EV_ADD,l_fflags,a_esocket->kqueue_data,a_esocket); - if( kevent( l_kqueue_fd,l_event,1,NULL,0,NULL) == -1 ){ - l_is_error = true; - l_errno = errno; - } - } - if( !l_is_error){ - if( a_esocket->flags & DAP_SOCK_READY_TO_WRITE || a_esocket->flags &DAP_SOCK_CONNECTING ){ - EV_SET(l_event, a_esocket->socket, EVFILT_WRITE,l_flags| EV_ADD,l_fflags,a_esocket->kqueue_data,a_esocket); - if(kevent( l_kqueue_fd,l_event,1,NULL,0,NULL) == -1){ - l_is_error = true; - l_errno = errno; - } - } - } - } - if (l_is_error && l_errno == EBADF){ - log_it(L_ATT,"Poll update: socket %d (%p ) disconnected, rise CLOSE flag to remove from queue, lost %"DAP_UINT64_FORMAT_U":%" DAP_UINT64_FORMAT_U - " bytes",a_esocket->socket,a_esocket,a_esocket->buf_in_size,a_esocket->buf_out_size); - a_esocket->flags |= DAP_SOCK_SIGNAL_CLOSE; - a_esocket->buf_in_size = a_esocket->buf_out_size = 0; // Reset everything from buffer, we close it now all - }else if ( l_is_error && l_errno != EINPROGRESS && l_errno != ENOENT){ - char l_errbuf[128]; - l_errbuf[0]=0; - strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); - log_it(L_ERROR,"Can't update client socket state on kqueue fd %d: \"%s\" (%d)", - l_kqueue_fd, l_errbuf, l_errno); - } - } - - #else - #error "Not defined dap_events_socket_set_writable_unsafe for your platform" - #endif - -} - /** * @brief dap_events_socket_ready_to_read * @param sc @@ -1659,8 +1523,7 @@ void dap_events_socket_set_readable_unsafe( dap_events_socket_t *a_esocket, bool EV_SET(&l_event, a_esocket->socket, EVFILT_READ, a_esocket->kqueue_base_flags | l_op_flag,a_esocket->kqueue_base_fflags , a_esocket->kqueue_data,a_esocket); - int l_kqueue_fd = a_esocket->worker? a_esocket->worker->kqueue_fd : - a_esocket->proc_thread ? a_esocket->proc_thread->kqueue_fd : -1; + int l_kqueue_fd = a_esocket->context? a_esocket->context->kqueue_fd : -1; if( l_kqueue_fd>0 ){ int l_kevent_ret = kevent(l_kqueue_fd,&l_event,1,NULL,0,NULL); int l_errno = errno; @@ -1682,10 +1545,7 @@ void dap_events_socket_set_readable_unsafe( dap_events_socket_t *a_esocket, bool }else log_it(L_WARNING,"Trying to set readable/writable event, queue or timer thats you shouldnt do"); #else - if( a_esocket->worker) - dap_events_socket_worker_poll_update_unsafe( a_esocket); - else if( a_esocket->proc_thread) - dap_proc_thread_esocket_update_poll_flags(a_esocket->proc_thread,a_esocket ); + dap_context_poll_update(a_esocket); #endif } @@ -1716,8 +1576,7 @@ void dap_events_socket_set_writable_unsafe( dap_events_socket_t *a_esocket, bool EV_SET(&l_event, a_esocket->socket, EVFILT_WRITE, a_esocket->kqueue_base_flags | l_op_flag,a_esocket->kqueue_base_fflags , a_esocket->kqueue_data,a_esocket); - int l_kqueue_fd = a_esocket->worker? a_esocket->worker->kqueue_fd : - a_esocket->proc_thread ? a_esocket->proc_thread->kqueue_fd : -1; + int l_kqueue_fd = a_esocket->context? a_esocket->context->kqueue_fd : -1; if( l_kqueue_fd>0 ){ int l_kevent_ret=kevent(l_kqueue_fd,&l_event,1,NULL,0,NULL); int l_errno = errno; @@ -1739,10 +1598,7 @@ void dap_events_socket_set_writable_unsafe( dap_events_socket_t *a_esocket, bool }else log_it(L_WARNING,"Trying to set readable/writable event, queue or timer thats you shouldnt do"); #else - if( a_esocket->worker) - dap_events_socket_worker_poll_update_unsafe(a_esocket); - else if( a_esocket->proc_thread) - dap_proc_thread_esocket_update_poll_flags(a_esocket->proc_thread,a_esocket ); + dap_context_poll_update(a_esocket); #endif } @@ -1760,7 +1616,7 @@ bool s_remove_and_delete_unsafe_delayed_delete_callback(void * a_arg) assert(l_es_handler); assert(l_worker); dap_events_socket_t * l_es; - if( (l_es = dap_worker_esocket_find_uuid(l_worker, l_es_handler->esocket_uuid)) != NULL) + if( (l_es = dap_context_esocket_find_by_uuid(l_worker->context, l_es_handler->esocket_uuid)) != NULL) //dap_events_socket_remove_and_delete_unsafe(l_es,l_es_handler->value == 1); dap_events_remove_and_delete_socket_unsafe(dap_events_get_default(), l_es, l_es_handler->value == 1); DAP_DELETE(l_es_handler); @@ -1868,10 +1724,8 @@ void dap_events_socket_remove_from_worker_unsafe( dap_events_socket_t *a_es, dap return; } - pthread_rwlock_wrlock(&a_worker->esocket_rwlock); a_worker->event_sockets_count--; - HASH_DELETE(hh_worker,a_worker->esockets, a_es); - pthread_rwlock_unlock(&a_worker->esocket_rwlock); + HASH_DELETE(hh_worker,a_worker->context->esockets, a_es); #if defined(DAP_EVENTS_CAPS_EPOLL) @@ -1888,36 +1742,36 @@ void dap_events_socket_remove_from_worker_unsafe( dap_events_socket_t *a_es, dap struct kevent * l_event = &a_es->kqueue_event; if (a_es->kqueue_base_filter){ EV_SET(l_event, a_es->socket, a_es->kqueue_base_filter ,EV_DELETE, 0,0,a_es); - if ( kevent( a_worker->kqueue_fd,l_event,1,NULL,0,NULL) == -1 ) { + if ( kevent( a_worker->context->kqueue_fd,l_event,1,NULL,0,NULL) == -1 ) { int l_errno = errno; char l_errbuf[128]; strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); log_it( L_ERROR,"Can't remove event socket's handler %d from the kqueue %d filter %d \"%s\" (%d)", a_es->socket, - a_worker->kqueue_fd,a_es->kqueue_base_filter, l_errbuf, l_errno); + a_worker->context->kqueue_fd,a_es->kqueue_base_filter, l_errbuf, l_errno); } }else{ EV_SET(l_event, a_es->socket, EVFILT_EXCEPT ,EV_DELETE, 0,0,a_es); - kevent( a_worker->kqueue_fd,l_event,1,NULL,0,NULL); // If this filter is not set up - no warnings + kevent( a_worker->context->kqueue_fd,l_event,1,NULL,0,NULL); // If this filter is not set up - no warnings if(a_es->flags & DAP_SOCK_READY_TO_WRITE){ EV_SET(l_event, a_es->socket, EVFILT_WRITE ,EV_DELETE, 0,0,a_es); - if ( kevent( a_worker->kqueue_fd,l_event,1,NULL,0,NULL) == -1 ) { + if ( kevent( a_worker->context->kqueue_fd,l_event,1,NULL,0,NULL) == -1 ) { int l_errno = errno; char l_errbuf[128]; strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); log_it( L_ERROR,"Can't remove event socket's handler %d from the kqueue %d filter EVFILT_WRITE \"%s\" (%d)", a_es->socket, - a_worker->kqueue_fd, l_errbuf, l_errno); + a_worker->context->kqueue_fd, l_errbuf, l_errno); } } if(a_es->flags & DAP_SOCK_READY_TO_READ){ EV_SET(l_event, a_es->socket, EVFILT_READ ,EV_DELETE, 0,0,a_es); - if ( kevent( a_worker->kqueue_fd,l_event,1,NULL,0,NULL) == -1 ) { + if ( kevent( a_worker->context->kqueue_fd,l_event,1,NULL,0,NULL) == -1 ) { int l_errno = errno; char l_errbuf[128]; strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); log_it( L_ERROR,"Can't remove event socket's handler %d from the kqueue %d filter EVFILT_READ \"%s\" (%d)", a_es->socket, - a_worker->kqueue_fd, l_errbuf, l_errno); + a_worker->context->kqueue_fd, l_errbuf, l_errno); } } @@ -2084,7 +1938,7 @@ size_t dap_events_socket_write_f_inter(dap_events_socket_t * a_es_input, dap_eve int l_ret= dap_events_socket_queue_ptr_send_to_input(a_es_input, l_msg ); if (l_ret!=0){ - log_it(L_ERROR, "wite f inter: wasn't send pointer to queue input: code %d", l_ret); + log_it(L_ERROR, "write f inter: wasn't send pointer to queue input: code %d", l_ret); DAP_DELETE(l_msg); return 0; } diff --git a/dap-sdk/net/core/dap_net.c b/dap-sdk/io/dap_net.c similarity index 100% rename from dap-sdk/net/core/dap_net.c rename to dap-sdk/io/dap_net.c diff --git a/dap-sdk/net/core/dap_proc_queue.c b/dap-sdk/io/dap_proc_queue.c similarity index 85% rename from dap-sdk/net/core/dap_proc_queue.c rename to dap-sdk/io/dap_proc_queue.c index ae7c1daaf9ed717ab8804cf5abf2959582b92f04..6ffc4b63874549252857dbf5760c285117fd1592 100644 --- a/dap-sdk/net/core/dap_proc_queue.c +++ b/dap-sdk/io/dap_proc_queue.c @@ -51,7 +51,7 @@ dap_proc_queue_t * dap_proc_queue_create_ext(dap_proc_thread_t * a_thread) if (!l_queue) return NULL; - for (int i = 0; i < DAP_QUE$K_PRIMAX; i++) { + for (int i = 0; i < DAP_PROC_PRI_MAX; i++) { assert ( !(pthread_mutex_init(&l_queue->list[i].lock, 0 )) ); } @@ -78,7 +78,7 @@ dap_proc_queue_t * dap_proc_queue_create(dap_proc_thread_t * a_thread) if (!l_queue) return NULL; - for (int i = 0; i < DAP_QUE$K_PRIMAX; i++) { + for (int i = 0; i < DAP_PROC_PRI_MAX; i++) { assert ( !(pthread_mutex_init(&l_queue->list[i].lock, 0 )) ); } @@ -104,7 +104,7 @@ int dap_proc_queue_delete(dap_proc_queue_t * a_queue) return -ENOMEM; l_msg->signal_kill = 1; /* TRUE */ - l_msg->pri = DAP_QUE$K_PRI_HIGH; /* Assume that KILL must be delivered ASAP */ + l_msg->pri = DAP_PROC_PRI_HIGH; /* Assume that KILL must be delivered ASAP */ return dap_events_socket_queue_ptr_send( a_queue->esocket, l_msg ); } @@ -157,7 +157,7 @@ static void s_queue_esocket_callback( dap_events_socket_t * a_es, void * a_msg) * So, all checks has been finished, now we can prepare new entry */ pri = l_msg->pri; /* Validate priority */ - pri = MIN(pri, DAP_QUE$K_PRIMAX - 1); + pri = MIN(pri, DAP_PROC_PRI_MAX - 1); pri = MAX(pri, 0); l_item->callback = l_msg->callback; @@ -197,7 +197,7 @@ int dap_proc_queue_add_callback(dap_worker_t * a_worker,dap_proc_queue_callback_ l_msg->callback = a_callback; l_msg->callback_arg = a_callback_arg; - l_msg->pri = DAP_QUE$K_PRI_NORMAL; + l_msg->pri = DAP_PROC_PRI_NORMAL; /* * Send message to queue with the DEFAULT priority */ @@ -219,10 +219,10 @@ int dap_proc_queue_add_callback_ext(dap_worker_t * a_worker,dap_proc_queue_callb { dap_proc_queue_msg_t *l_msg; - if ( !(a_pri < DAP_QUE$K_PRIMAX) ) /* Check that priority level is in legal range */ + if ( !(a_pri < DAP_PROC_PRI_MAX) ) /* Check that priority level is in legal range */ { - log_it(L_WARNING, "Priority level %d is incorrect (should be is in range %d-%d)", a_pri, DAP_QUE$K_PRI0 + 1, DAP_QUE$K_PRIMAX - 1); - a_pri = DAP_QUE$K_PRI_NORMAL; + log_it(L_WARNING, "Priority level %d is incorrect (should be is in range %d-%d)", a_pri, DAP_PROC_PRI_0 + 1, DAP_PROC_PRI_MAX - 1); + a_pri = DAP_PROC_PRI_NORMAL; } if ( !(l_msg = DAP_NEW_Z(dap_proc_queue_msg_t)) ) /* Allocate memory for a new message */ @@ -256,7 +256,7 @@ int dap_proc_queue_add_callback_inter( dap_events_socket_t * a_es_input, dap_pro l_msg->callback = a_callback; l_msg->callback_arg = a_callback_arg; - l_msg->pri = DAP_QUE$K_PRI_NORMAL; + l_msg->pri = DAP_PROC_PRI_NORMAL; return dap_events_socket_queue_ptr_send_to_input( a_es_input , l_msg ); } @@ -275,10 +275,10 @@ int dap_proc_queue_add_callback_inter_ext( dap_events_socket_t * a_es_input, dap { dap_proc_queue_msg_t *l_msg; - if ( !(a_pri < DAP_QUE$K_PRIMAX) ) /* Check that priority level is in legal range */ + if ( !(a_pri < DAP_PROC_PRI_MAX) ) /* Check that priority level is in legal range */ { - log_it(L_WARNING, "Priority level %d is incorrect (should be is in range %d-%d)", a_pri, DAP_QUE$K_PRI0 + 1, DAP_QUE$K_PRIMAX - 1); - a_pri = DAP_QUE$K_PRI_NORMAL; + log_it(L_WARNING, "Priority level %d is incorrect (should be is in range %d-%d)", a_pri, DAP_PROC_PRI_0 + 1, DAP_PROC_PRI_MAX - 1); + a_pri = DAP_PROC_PRI_NORMAL; } if ( !(l_msg = DAP_NEW_Z(dap_proc_queue_msg_t)) ) /* Allocate memory for a new message */ @@ -290,3 +290,31 @@ dap_proc_queue_msg_t *l_msg; return dap_events_socket_queue_ptr_send_to_input( a_es_input , l_msg ); } + +/** + * @brief dap_proc_thread_add_callback_mt + * @param a_thread + * @param a_callback + * @param a_callback_arg + * @return + */ +int dap_proc_thread_add_callback_mt(dap_proc_thread_t * a_thread, dap_proc_queue_callback_t a_callback, void * a_callback_arg, int a_pri) +{ + dap_proc_queue_msg_t *l_msg; + + if ( !(a_pri < DAP_PROC_PRI_MAX) ) /* Check that priority level is in legal range */ + { + log_it(L_WARNING, "Priority level %d is incorrect (should be is in range %d-%d)", a_pri, DAP_PROC_PRI_0 + 1, DAP_PROC_PRI_MAX - 1); + a_pri = DAP_PROC_PRI_NORMAL; + } + + if ( !(l_msg = DAP_NEW_Z(dap_proc_queue_msg_t)) ) /* Allocate memory for a new message */ + return -ENOMEM; + + l_msg->callback = a_callback; + l_msg->callback_arg = a_callback_arg; + l_msg->pri = a_pri; + + dap_events_socket_queue_ptr_send(a_thread->proc_queue->esocket, l_msg); +} + diff --git a/dap-sdk/net/core/dap_proc_thread.c b/dap-sdk/io/dap_proc_thread.c similarity index 99% rename from dap-sdk/net/core/dap_proc_thread.c rename to dap-sdk/io/dap_proc_thread.c index 6a4151e277e7d370a45d2fd73560a166802337bf..d492f7dbe531e6fee13efada91c90f32d1f294d3 100644 --- a/dap-sdk/net/core/dap_proc_thread.c +++ b/dap-sdk/io/dap_proc_thread.c @@ -227,7 +227,7 @@ dap_proc_queue_t *l_queue; /*@RRL: l_iter_cnt = DAP_QUE$K_ITER_NR; */ l_queue = l_thread->proc_queue; - for (l_cur_pri = (DAP_QUE$K_PRIMAX - 1); l_cur_pri; l_cur_pri--, l_iter_cnt++ ) /* Run from higest to lowest ... */ + for (l_cur_pri = (DAP_PROC_PRI_MAX - 1); l_cur_pri; l_cur_pri--, l_iter_cnt++ ) /* Run from higest to lowest ... */ { if ( !l_queue->list[l_cur_pri].items.nr ) /* A lockless quick check */ continue; @@ -264,7 +264,7 @@ dap_proc_queue_t *l_queue; DAP_DELETE(l_item); } } - for (l_cur_pri = (DAP_QUE$K_PRIMAX - 1); l_cur_pri; l_cur_pri--) /* Really ?! */ + for (l_cur_pri = (DAP_PROC_PRI_MAX - 1); l_cur_pri; l_cur_pri--) /* Really ?! */ l_is_anybody_in_queue += l_queue->list[l_cur_pri].items.nr; if ( l_is_anybody_in_queue ) /* Arm event if we have something to proc again */ @@ -858,7 +858,7 @@ static void * s_proc_thread_function(void * a_arg) // Select socket and kqueue fd to send the event dap_events_socket_t * l_es_output = l_cur->pipe_out ? l_cur->pipe_out : l_cur; - int l_kqueue_fd = l_es_output->worker ? l_es_output->worker->kqueue_fd : l_es_output->proc_thread ? l_es_output->proc_thread->kqueue_fd : -1; + int l_kqueue_fd = l_es_output->context ? l_es_output->context->kqueue_fd : -1; struct kevent l_event; dap_events_socket_w_data_t * l_es_w_data = DAP_NEW_Z(dap_events_socket_w_data_t); @@ -1086,7 +1086,7 @@ int dap_proc_thread_esocket_write_f_inter(dap_proc_thread_t * a_thread,dap_worke * @param a_callback * @param a_arg */ -void dap_proc_thread_worker_exec_callback(dap_proc_thread_t * a_thread, size_t a_worker_id, dap_worker_callback_t a_callback, void * a_arg) +void dap_proc_thread_worker_exec_callback_inter(dap_proc_thread_t * a_thread, size_t a_worker_id, dap_worker_callback_t a_callback, void * a_arg) { dap_worker_msg_callback_t * l_msg = DAP_NEW_Z(dap_worker_msg_callback_t); l_msg->callback = a_callback; @@ -1100,6 +1100,7 @@ void dap_proc_thread_worker_exec_callback(dap_proc_thread_t * a_thread, size_t a #endif } + static void s_event_exit_callback( dap_events_socket_t * a_es, uint64_t a_flags) { (void) a_flags; @@ -1109,3 +1110,6 @@ static void s_event_exit_callback( dap_events_socket_t * a_es, uint64_t a_flags) log_it(L_DEBUG, "Proc_thread :%u signaled to exit", l_thread->cpu_id); } + + + diff --git a/dap-sdk/net/core/dap_server.c b/dap-sdk/io/dap_server.c similarity index 100% rename from dap-sdk/net/core/dap_server.c rename to dap-sdk/io/dap_server.c diff --git a/dap-sdk/net/core/dap_timerfd.c b/dap-sdk/io/dap_timerfd.c similarity index 89% rename from dap-sdk/net/core/dap_timerfd.c rename to dap-sdk/io/dap_timerfd.c index c38f9e27e3cc6cec551b0244e544237fdee5cca0..c358ed402c7e7b183ec26f61c71216c9f7bff3c5 100644 --- a/dap-sdk/net/core/dap_timerfd.c +++ b/dap-sdk/io/dap_timerfd.c @@ -255,7 +255,7 @@ dap_timerfd_t* dap_timerfd_create(uint64_t a_timeout_ms, dap_timerfd_callback_t return l_timerfd; } -static void s_timerfd_reset(dap_timerfd_t *a_timerfd, dap_events_socket_t *a_event_sock) +static inline void s_timerfd_reset(dap_timerfd_t *a_timerfd, dap_events_socket_t *a_event_sock) { #if defined DAP_OS_LINUX struct itimerspec l_ts; @@ -289,6 +289,9 @@ static void s_timerfd_reset(dap_timerfd_t *a_timerfd, dap_events_socket_t *a_eve #endif } + + + /** * @brief s_es_callback_timer * @param a_event_sock @@ -304,6 +307,38 @@ static void s_es_callback_timer(struct dap_events_socket *a_event_sock) } } +/** + * @brief s_timerfd_reset_worker_callback + * @param a_worker + * @param a_arg + */ +static void s_timerfd_reset_worker_callback( dap_worker_t * a_worker, void * a_arg ) +{ + dap_timerfd_t *l_timerfd = (dap_timerfd_t *) a_arg; + dap_events_socket_t *l_sock = NULL; + l_sock = dap_context_esocket_find_by_uuid(a_worker->context, l_timerfd->esocket_uuid); + if (l_sock) + s_timerfd_reset(l_timerfd, l_sock); + +} + +/** + * @brief s_timerfd_reset_proc_thread_callback + * @param a_thread + * @param a_arg + * @return + */ +static bool s_timerfd_reset_proc_thread_callback( dap_proc_thread_t * a_thread, void * a_arg ) +{ + dap_timerfd_t *l_timerfd = (dap_timerfd_t *) a_arg; + dap_events_socket_t *l_sock = NULL; + l_sock = dap_context_esocket_find_by_uuid(a_thread->context, l_timerfd->esocket_uuid); + if (l_sock) + s_timerfd_reset(l_timerfd, l_sock); + return true; +} + + /** * @brief dap_timerfd_reset * @param a_tfd @@ -312,13 +347,12 @@ void dap_timerfd_reset(dap_timerfd_t *a_timerfd) { if (!a_timerfd) return; - dap_events_socket_t *l_sock = NULL; - if (a_timerfd->worker) - l_sock = dap_worker_esocket_find_uuid(a_timerfd->worker, a_timerfd->esocket_uuid); - else if (a_timerfd->proc_thread) - l_sock = a_timerfd->events_socket; - if (l_sock) - s_timerfd_reset(a_timerfd, l_sock); + if (a_timerfd->worker){ + dap_worker_exec_callback_on(a_timerfd->worker,s_timerfd_reset_worker_callback, a_timerfd); + }else if (a_timerfd->proc_thread) + dap_proc_thread_add_callback_mt(a_timerfd->proc_thread,s_timerfd_reset_proc_thread_callback, a_timerfd, DAP_PROC_PRI_NORMAL ); + else + log_it(L_WARNING,"Timer's context undefined, cant' reset it"); } /** diff --git a/dap-sdk/io/dap_worker.c b/dap-sdk/io/dap_worker.c new file mode 100644 index 0000000000000000000000000000000000000000..d70092b68b5aac2b73d392f679f56afdee6ed924 --- /dev/null +++ b/dap-sdk/io/dap_worker.c @@ -0,0 +1,437 @@ +/* + * Authors: + * Dmitriy A. Gearasimov <gerasimov.dmitriy@demlabs.net> + * DeM Labs Ltd. https://demlabs.net + * Copyright (c) 2017 + * All rights reserved. + + This file is part of DAP SDK the open source project + + DAP SDK is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + DAP SDK is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with any DAP SDK based project. If not, see <http://www.gnu.org/licenses/>. +*/ +#include <time.h> +#include <errno.h> +#include <unistd.h> + +#include "dap_common.h" +#include "dap_config.h" +#include "dap_context.h" +#include "dap_math_ops.h" +#include "dap_worker.h" +#include "dap_timerfd.h" +#include "dap_events.h" +#include "dap_enc_base64.h" +#include "dap_proc_queue.h" + +#ifndef DAP_NET_CLIENT_NO_SSL +#include <wolfssl/options.h> +#include "wolfssl/ssl.h" +#endif + +#define LOG_TAG "dap_worker" + +static time_t s_connection_timeout = 60; // seconds + +static bool s_socket_all_check_activity( void * a_arg); +static void s_queue_add_es_callback( dap_events_socket_t * a_es, void * a_arg); +static void s_queue_delete_es_callback( dap_events_socket_t * a_es, void * a_arg); +static void s_queue_es_reassign_callback( dap_events_socket_t * a_es, void * a_arg); +static void s_queue_callback_callback( dap_events_socket_t * a_es, void * a_arg); +static void s_queue_es_io_callback( dap_events_socket_t * a_es, void * a_arg); +static void s_event_exit_callback( dap_events_socket_t * a_es, uint64_t a_flags); + +/** + * @brief dap_worker_init + * @param a_threads_count + * @param conn_timeout + * @return + */ +int dap_worker_init( size_t a_conn_timeout ) +{ + if ( a_conn_timeout ) + s_connection_timeout = a_conn_timeout; + + return 0; +} + +void dap_worker_deinit( ) +{ +} + +/** + * @brief dap_worker_thread + * @param arg + * @return + */ +void *dap_worker_thread(void *arg) +{ + dap_worker_t *l_worker = (dap_worker_t *) arg; + uint32_t l_tn = l_worker->id; + const struct sched_param l_shed_params = {0}; + + + dap_cpu_assign_thread_on(l_worker->id); + pthread_setspecific(l_worker->events->pth_key_worker, l_worker); + +#ifdef DAP_OS_WINDOWS + if (!SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_BELOW_NORMAL)) + log_it(L_ERROR, "Couldn'r set thread priority, err: %lu", GetLastError()); +#else + pthread_setschedparam(pthread_self(),SCHED_FIFO ,&l_shed_params); +#endif + + if(dap_context_thread_init(l_worker->context)!=0){ + pthread_cond_broadcast(&l_worker->started_cond); + return NULL; + } + + l_worker->queue_es_new_input = DAP_NEW_Z_SIZE(dap_events_socket_t*, sizeof (dap_events_socket_t*)* dap_events_worker_get_count() ); + l_worker->queue_es_delete_input = DAP_NEW_Z_SIZE(dap_events_socket_t*, sizeof (dap_events_socket_t*)* dap_events_worker_get_count() ); + l_worker->queue_es_io_input = DAP_NEW_Z_SIZE(dap_events_socket_t*, sizeof (dap_events_socket_t*)* dap_events_worker_get_count() ); + l_worker->queue_es_reassign_input = DAP_NEW_Z_SIZE(dap_events_socket_t*, sizeof (dap_events_socket_t*)* dap_events_worker_get_count() ); + + + l_worker->queue_es_new = dap_events_socket_create_type_queue_ptr_unsafe(l_worker, s_queue_add_es_callback); + l_worker->queue_es_delete = dap_events_socket_create_type_queue_ptr_unsafe(l_worker, s_queue_delete_es_callback); + l_worker->queue_es_io = dap_events_socket_create_type_queue_ptr_unsafe(l_worker, s_queue_es_io_callback); + l_worker->queue_es_reassign = dap_events_socket_create_type_queue_ptr_unsafe(l_worker, s_queue_es_reassign_callback ); + + + for( size_t n = 0; n < dap_events_worker_get_count(); n++) { + l_worker->queue_es_new_input[n] = dap_events_socket_queue_ptr_create_input(l_worker->queue_es_new); + l_worker->queue_es_delete_input[n] = dap_events_socket_queue_ptr_create_input(l_worker->queue_es_delete); + l_worker->queue_es_io_input[n] = dap_events_socket_queue_ptr_create_input(l_worker->queue_es_io); + l_worker->queue_es_reassign_input[n] = dap_events_socket_queue_ptr_create_input(l_worker->queue_es_reassign); + } + + l_worker->queue_callback = dap_events_socket_create_type_queue_ptr_unsafe(l_worker, s_queue_callback_callback); + l_worker->event_exit = dap_events_socket_create_type_event_unsafe(l_worker, s_event_exit_callback); + + l_worker->timer_check_activity = dap_timerfd_create(s_connection_timeout * 1000 / 2, + s_socket_all_check_activity, l_worker); + dap_worker_add_events_socket_unsafe( l_worker->timer_check_activity->events_socket, l_worker); + pthread_mutex_lock(&l_worker->started_mutex); + pthread_cond_broadcast(&l_worker->started_cond); + pthread_mutex_unlock(&l_worker->started_mutex); + + dap_context_thread_loop(l_worker->context); + log_it(L_NOTICE,"Exiting thread #%u", l_worker->id); + return NULL; +} + +/** + * @brief s_new_es_callback + * @param a_es + * @param a_arg + */ +static void s_queue_add_es_callback( dap_events_socket_t * a_es, void * a_arg) +{ + dap_worker_t * l_worker = a_es->worker; + dap_events_socket_t * l_es_new =(dap_events_socket_t *) a_arg; + if (!l_es_new){ + log_it(L_ERROR,"NULL esocket accepted to add on worker #%u", l_worker->id); + return; + } + + if(g_debug_reactor) + log_it(L_NOTICE, "Received event socket %p (ident %"DAP_FORMAT_SOCKET" type %d) to add on worker", l_es_new, l_es_new->socket, l_es_new->type); + + switch( l_es_new->type){ + case DESCRIPTOR_TYPE_SOCKET_UDP: break; + case DESCRIPTOR_TYPE_SOCKET_CLIENT: break; + default:{} + } + +#ifdef DAP_EVENTS_CAPS_KQUEUE + if(l_es_new->socket!=0 && l_es_new->socket != -1 && + l_es_new->type != DESCRIPTOR_TYPE_EVENT && + l_es_new->type != DESCRIPTOR_TYPE_QUEUE && + l_es_new->type != DESCRIPTOR_TYPE_TIMER + ) +#else + if(l_es_new->socket!=0 && l_es_new->socket != INVALID_SOCKET) + +#endif + if(dap_context_esocket_find_by_uuid( l_worker->context, l_es_new->uuid)){ + // Socket already present in worker, it's OK + return; + } + + switch( l_es_new->type){ + + case DESCRIPTOR_TYPE_SOCKET_UDP: + case DESCRIPTOR_TYPE_SOCKET_CLIENT: + case DESCRIPTOR_TYPE_SOCKET_LISTENING:{ + +#ifdef DAP_OS_UNIX +#if defined (SO_INCOMING_CPU) + int l_cpu = l_worker->id; + setsockopt(l_es_new->socket , SOL_SOCKET, SO_INCOMING_CPU, &l_cpu, sizeof(l_cpu)); +#endif +#endif + } break; + default: {} + } + + l_es_new->worker = l_worker; + l_es_new->last_time_active = time(NULL); + // We need to differ new and reassigned esockets. If its new - is_initialized is false + if ( ! l_es_new->is_initalized ){ + if (l_es_new->callbacks.new_callback) + l_es_new->callbacks.new_callback(l_es_new, NULL); + l_es_new->is_initalized = true; + } + + int l_ret = dap_worker_add_events_socket_unsafe(l_es_new,l_worker); + if ( l_ret != 0 ){ + log_it(L_CRITICAL,"Can't add event socket's handler to worker i/o poll mechanism with error %d", errno); + }else{ + // Add in worker + l_es_new->me = l_es_new; + if (l_es_new->socket!=0 && l_es_new->socket != INVALID_SOCKET){ + HASH_ADD(hh_worker, l_worker->context->esockets, uuid, sizeof(l_es_new->uuid), l_es_new ); + l_worker->event_sockets_count++; + } + //log_it(L_DEBUG, "Added socket %d on worker %u", l_es_new->socket, w->id); + if (l_es_new->callbacks.worker_assign_callback) + l_es_new->callbacks.worker_assign_callback(l_es_new, l_worker); + + } +} + +/** + * @brief s_delete_es_callback + * @param a_es + * @param a_arg + */ +static void s_queue_delete_es_callback( dap_events_socket_t * a_es, void * a_arg) +{ + assert(a_arg); + dap_events_socket_uuid_t * l_es_uuid_ptr = (dap_events_socket_uuid_t*) a_arg; + dap_events_socket_t * l_es; + if ( (l_es = dap_context_esocket_find_by_uuid(a_es->worker->context,*l_es_uuid_ptr)) != NULL ){ + //l_es->flags |= DAP_SOCK_SIGNAL_CLOSE; // Send signal to socket to kill + dap_events_socket_remove_and_delete_unsafe(l_es,false); + }else + log_it(L_INFO, "While we were sending the delete() message, esocket %"DAP_UINT64_FORMAT_U" has been disconnected ", *l_es_uuid_ptr); + DAP_DELETE(l_es_uuid_ptr); +} + +/** + * @brief s_reassign_es_callback + * @param a_es + * @param a_arg + */ +static void s_queue_es_reassign_callback( dap_events_socket_t * a_es, void * a_arg) +{ + assert(a_es); + dap_worker_t * l_worker = a_es->worker; + assert(l_worker); + dap_worker_msg_reassign_t * l_msg = (dap_worker_msg_reassign_t*) a_arg; + assert(l_msg); + dap_events_socket_t * l_es_reassign; + if ( ( l_es_reassign = dap_context_esocket_find_by_uuid(l_worker->context, l_msg->esocket_uuid))!= NULL ){ + if( l_es_reassign->was_reassigned && l_es_reassign->flags & DAP_SOCK_REASSIGN_ONCE) { + log_it(L_INFO, "Reassgment request with DAP_SOCK_REASSIGN_ONCE allowed only once, declined reassigment from %u to %u", + l_es_reassign->worker->id, l_msg->worker_new->id); + + }else{ + dap_events_socket_reassign_between_workers_unsafe(l_es_reassign,l_msg->worker_new); + } + }else{ + log_it(L_INFO, "While we were sending the reassign message, esocket %p has been disconnected", l_msg->esocket); + } + DAP_DELETE(l_msg); +} + +/** + * @brief s_queue_callback + * @param a_es + * @param a_arg + */ +static void s_queue_callback_callback( dap_events_socket_t * a_es, void * a_arg) +{ + dap_worker_msg_callback_t * l_msg = (dap_worker_msg_callback_t *) a_arg; + assert(l_msg); + assert(l_msg->callback); + l_msg->callback(a_es->worker, l_msg->arg); + DAP_DELETE(l_msg); +} + +/** + * @brief s_event_exit_callback + * @param a_es + * @param a_flags + */ +static void s_event_exit_callback( dap_events_socket_t * a_es, uint64_t a_flags) +{ + (void) a_flags; + a_es->worker->context->signal_exit = true; + if(g_debug_reactor) + log_it(L_DEBUG, "Worker :%u signaled to exit", a_es->worker->id); +} + +/** + * @brief s_pipe_data_out_read_callback + * @param a_es + * @param a_arg + */ +static void s_queue_es_io_callback( dap_events_socket_t * a_es, void * a_arg) +{ + assert(a_es); + dap_worker_t * l_worker = a_es->worker; + dap_worker_msg_io_t * l_msg = a_arg; + assert(l_msg); + // Check if it was removed from the list + dap_events_socket_t *l_msg_es = dap_context_esocket_find_by_uuid(l_worker->context, l_msg->esocket_uuid); + if ( l_msg_es == NULL){ + log_it(L_INFO, "We got i/o message for esocket %"DAP_UINT64_FORMAT_U" thats now not in list. Lost %zu data", l_msg->esocket_uuid, l_msg->data_size); + DAP_DELETE(l_msg); + return; + } + + if (l_msg->flags_set & DAP_SOCK_CONNECTING) + if (! (l_msg_es->flags & DAP_SOCK_CONNECTING) ){ + l_msg_es->flags |= DAP_SOCK_CONNECTING; + dap_context_poll_update(l_msg_es); + } + + if (l_msg->flags_set & DAP_SOCK_CONNECTING) + if (! (l_msg_es->flags & DAP_SOCK_CONNECTING) ){ + l_msg_es->flags ^= DAP_SOCK_CONNECTING; + dap_context_poll_update(l_msg_es); + } + + if (l_msg->flags_set & DAP_SOCK_READY_TO_READ) + dap_events_socket_set_readable_unsafe(l_msg_es, true); + if (l_msg->flags_unset & DAP_SOCK_READY_TO_READ) + dap_events_socket_set_readable_unsafe(l_msg_es, false); + if (l_msg->flags_set & DAP_SOCK_READY_TO_WRITE) + dap_events_socket_set_writable_unsafe(l_msg_es, true); + if (l_msg->flags_unset & DAP_SOCK_READY_TO_WRITE) + dap_events_socket_set_writable_unsafe(l_msg_es, false); + if (l_msg->data_size && l_msg->data) { + dap_events_socket_write_unsafe(l_msg_es, l_msg->data,l_msg->data_size); + DAP_DELETE(l_msg->data); + } + DAP_DELETE(l_msg); +} + +/** + * @brief s_socket_all_check_activity + * @param a_arg + */ +static bool s_socket_all_check_activity( void * a_arg) +{ + dap_worker_t *l_worker = (dap_worker_t*) a_arg; + assert(l_worker); + dap_events_socket_t *l_es = NULL, *tmp = NULL; + char l_curtimebuf[64]; + time_t l_curtime= time(NULL); + //dap_ctime_r(&l_curtime, l_curtimebuf); + //log_it(L_DEBUG,"Check sockets activity on worker #%u at %s", l_worker->id, l_curtimebuf); + HASH_ITER(hh_worker, l_worker->context->esockets, l_es, tmp ) { + if (l_es->type == DESCRIPTOR_TYPE_SOCKET_CLIENT){ + if ( !(l_es->flags & DAP_SOCK_SIGNAL_CLOSE) && + ( l_curtime >= (l_es->last_time_active + s_connection_timeout) ) && !l_es->no_close ) { + log_it( L_INFO, "Socket %"DAP_FORMAT_SOCKET" timeout (diff %"DAP_UINT64_FORMAT_U" ), closing...", + l_es->socket, l_curtime - (time_t)l_es->last_time_active - s_connection_timeout ); + if (l_es->callbacks.error_callback) { + l_es->callbacks.error_callback(l_es, ETIMEDOUT); + } + dap_events_socket_remove_and_delete_unsafe(l_es,false); + } + } + } + return true; +} + +/** + * @brief sap_worker_add_events_socket + * @param a_events_socket + * @param a_worker + */ +void dap_worker_add_events_socket(dap_events_socket_t * a_events_socket, dap_worker_t * a_worker) +{ +/*#ifdef DAP_EVENTS_CAPS_KQUEUE + a_events_socket->worker = a_worker; + if(dap_worker_add_events_socket_unsafe(a_events_socket, a_worker)!=0) + a_events_socket->worker = NULL; + +#else*/ + if(g_debug_reactor) + log_it(L_DEBUG,"Worker add esocket %"DAP_FORMAT_SOCKET, a_events_socket->socket); + int l_ret = dap_events_socket_queue_ptr_send( a_worker->queue_es_new, a_events_socket ); + if(l_ret != 0 ){ + char l_errbuf[128]; + *l_errbuf = 0; + strerror_r(l_ret, l_errbuf, sizeof(l_errbuf)); + log_it(L_ERROR, "Can't send pointer in queue: \"%s\"(code %d)", l_errbuf, l_ret); + } +//#endif +} + +/** + * @brief dap_worker_add_events_socket_inter + * @param a_es_input + * @param a_events_socket + */ +void dap_worker_add_events_socket_inter(dap_events_socket_t * a_es_input, dap_events_socket_t * a_events_socket) +{ + if( dap_events_socket_queue_ptr_send_to_input( a_es_input, a_events_socket ) != 0 ){ + int l_errno = errno; + char l_errbuf[128]; + *l_errbuf = 0; + strerror_r(l_errno,l_errbuf,sizeof (l_errbuf)); + log_it(L_ERROR, "Cant send pointer to interthread queue input: \"%s\"(code %d)", l_errbuf, l_errno); + } +} + + +/** + * @brief dap_worker_exec_callback_on + */ +void dap_worker_exec_callback_on(dap_worker_t * a_worker, dap_worker_callback_t a_callback, void * a_arg) +{ + dap_worker_msg_callback_t * l_msg = DAP_NEW_Z(dap_worker_msg_callback_t); + l_msg->callback = a_callback; + l_msg->arg = a_arg; + int l_ret=dap_events_socket_queue_ptr_send( a_worker->queue_callback,l_msg ); + if(l_ret != 0 ){ + char l_errbuf[128]; + *l_errbuf = 0; + strerror_r(l_ret,l_errbuf,sizeof (l_errbuf)); + log_it(L_ERROR, "Cant send pointer in queue: \"%s\"(code %d)", l_errbuf, l_ret); + } + +} + + +/** + * @brief dap_worker_add_events_socket + * @param a_worker + * @param a_events_socket + */ +dap_worker_t *dap_worker_add_events_socket_auto( dap_events_socket_t *a_es) +{ +// struct epoll_event ev = {0}; + dap_worker_t *l_worker = dap_events_worker_get_auto( ); + + a_es->events = l_worker->events; + dap_worker_add_events_socket( a_es, l_worker); + return l_worker; +} + + + diff --git a/dap-sdk/io/include/dap_context.h b/dap-sdk/io/include/dap_context.h new file mode 100644 index 0000000000000000000000000000000000000000..51099b9b04a1ae407baf7790a18396b4e81874e2 --- /dev/null +++ b/dap-sdk/io/include/dap_context.h @@ -0,0 +1,84 @@ +/* + * Authors: + * Dmitriy A. Gearasimov <gerasimov.dmitriy@demlabs.net> + * DeM Labs Ltd. https://demlabs.net + * Copyright (c) 2022 + * All rights reserved. + + This file is part of DAP SDK the open source project + + DAP SDK is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + DAP SDK is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with any DAP SDK based project. If not, see <http://www.gnu.org/licenses/>. +*/ +#pragma once + +#include <pthread.h> +#include <uthash.h> +#include "dap_common.h" +#include "dap_events_socket.h" +#include "dap_proc_queue.h" + +typedef struct dap_context { + uint32_t id; + +#if defined DAP_EVENTS_CAPS_MSMQ + HANDLE msmq_events[MAXIMUM_WAIT_OBJECTS]; +#endif + +#if defined DAP_EVENTS_CAPS_EPOLL + EPOLL_HANDLE epoll_fd; + struct epoll_event epoll_events[ DAP_EVENTS_SOCKET_MAX]; +#elif defined ( DAP_EVENTS_CAPS_POLL) + int poll_fd; + struct pollfd * poll; + dap_events_socket_t ** poll_esocket; + atomic_uint poll_count; + size_t poll_count_max; + bool poll_compress; // Some of fd's became NULL so arrays need to be reassigned +#elif defined (DAP_EVENTS_CAPS_KQUEUE) + int kqueue_fd; + struct kevent * kqueue_events_selected; + struct kevent * kqueue_events; + size_t kqueue_events_count; + + int kqueue_events_count_max; + int kqueue_events_selected_count_max; +#else +#error "Not defined worker for your platform" +#endif + + dap_events_socket_t *esockets; // Hashmap of event sockets + + // Signal to exit + bool signal_exit; + +} dap_context_t; + +extern pthread_key_t g_dap_context_pth_key; +static inline dap_context_t * dap_context_current(){ + return (dap_context_t*) pthread_getspecific(g_dap_context_pth_key); +} + + +int dap_context_init(); // Init + +// New context create. Thread-safe functions +dap_context_t * dap_context_new(); + +/// ALL THIS FUNCTIONS ARE UNSAFE ! CALL THEM ONLY INSIDE THEIR OWN CONTEXT!! +int dap_context_thread_init(dap_context_t * a_context); +int dap_context_thread_loop(dap_context_t * a_context); + +int dap_context_add_esocket(dap_context_t * a_context, dap_events_socket_t * a_esocket ); +void dap_context_poll_update(dap_events_socket_t * a_esocket); +dap_events_socket_t *dap_context_esocket_find_by_uuid(dap_context_t * a_context, dap_events_socket_uuid_t a_es_uuid ); diff --git a/dap-sdk/net/core/include/dap_events.h b/dap-sdk/io/include/dap_events.h similarity index 100% rename from dap-sdk/net/core/include/dap_events.h rename to dap-sdk/io/include/dap_events.h diff --git a/dap-sdk/net/core/include/dap_events_socket.h b/dap-sdk/io/include/dap_events_socket.h similarity index 98% rename from dap-sdk/net/core/include/dap_events_socket.h rename to dap-sdk/io/include/dap_events_socket.h index d0b406a1c7c31ee99fe3625f716dcd939b022f5a..0151e76708bfb786dc8d3b936bfb5bea15374263 100644 --- a/dap-sdk/net/core/include/dap_events_socket.h +++ b/dap-sdk/io/include/dap_events_socket.h @@ -115,6 +115,7 @@ typedef struct dap_events dap_events_t; typedef struct dap_events_socket dap_events_socket_t; typedef struct dap_worker dap_worker_t; typedef struct dap_proc_thread dap_proc_thread_t ; +typedef struct dap_context dap_context_t; typedef struct dap_server dap_server_t; typedef void (*dap_events_socket_callback_t) (dap_events_socket_t *,void * ); // Callback for specific client operations @@ -245,6 +246,7 @@ typedef struct dap_events_socket { // Links to related objects dap_events_t *events; + dap_context_t * context; dap_worker_t *worker; dap_proc_thread_t * proc_thread; // If assigned on dap_proc_thread_t object dap_server_t *server; // If this socket assigned with server @@ -341,13 +343,8 @@ void dap_events_socket_assign_on_worker_inter(dap_events_socket_t * a_es_input, void dap_events_socket_reassign_between_workers_mt(dap_worker_t * a_worker_old, dap_events_socket_t * a_es, dap_worker_t * a_worker_new); void dap_events_socket_reassign_between_workers_unsafe(dap_events_socket_t * a_es, dap_worker_t * a_worker_new); -// Non-MT functions -dap_events_socket_t * dap_worker_esocket_find_uuid(dap_worker_t * a_worker, dap_events_socket_uuid_t a_es_uuid); - void dap_events_socket_set_readable_unsafe(dap_events_socket_t * sc,bool is_ready); void dap_events_socket_set_writable_unsafe(dap_events_socket_t * sc,bool is_ready); -void dap_events_socket_worker_poll_update_unsafe(dap_events_socket_t * a_esocket); - size_t dap_events_socket_write_unsafe(dap_events_socket_t *sc, const void * data, size_t data_size); size_t dap_events_socket_write_f_unsafe(dap_events_socket_t *sc, const char * format,...); diff --git a/dap-sdk/net/core/include/dap_net.h b/dap-sdk/io/include/dap_net.h similarity index 100% rename from dap-sdk/net/core/include/dap_net.h rename to dap-sdk/io/include/dap_net.h diff --git a/dap-sdk/net/core/include/dap_proc_queue.h b/dap-sdk/io/include/dap_proc_queue.h similarity index 74% rename from dap-sdk/net/core/include/dap_proc_queue.h rename to dap-sdk/io/include/dap_proc_queue.h index cd75526c8c096e86a2ed689a34063ef638a22763..97ca1f262860bed2731502593bd9de38f7ef7a49 100644 --- a/dap-sdk/net/core/include/dap_proc_queue.h +++ b/dap-sdk/io/include/dap_proc_queue.h @@ -31,21 +31,21 @@ typedef bool (*dap_proc_queue_callback_t)(dap_proc_thread_t *, void *); // C // we want to stop callback execution and // not to go on next loop enum { - DAP_QUE$K_PRI0 = 0, /* Lowest priority (Idle) */ - DAP_QUE$K_PRI_IDLE = DAP_QUE$K_PRI0, /* Don't use Idle if u are not sure that understand how it works */ + DAP_PROC_PRI_0 = 0, /* Lowest priority (Idle) */ + DAP_PROC_PRI_IDLE = DAP_PROC_PRI_0, /* Don't use Idle if u are not sure that understand how it works */ - DAP_QUE$K_PRI1 = 1, /* Low priority */ - DAP_QUE$K_PRI_LOW = DAP_QUE$K_PRI1, + DAP_PROC_PRI_1 = 1, /* Low priority */ + DAP_PROC_PRI_LOW = DAP_PROC_PRI_1, - DAP_QUE$K_PRI2 = 2, - DAP_QUE$K_PRI_NORMAL = DAP_QUE$K_PRI2, /* Default priority for any queue's entry; + DAP_PROC_PRI_2 = 2, + DAP_PROC_PRI_NORMAL = DAP_PROC_PRI_2, /* Default priority for any queue's entry; has assigned implicitly */ - DAP_QUE$K_PRI3 = 3, /* Higest priority */ - DAP_QUE$K_PRI_HIGH = DAP_QUE$K_PRI3, + DAP_PROC_PRI_3 = 3, /* Higest priority */ + DAP_PROC_PRI_HIGH = DAP_PROC_PRI_3, - DAP_QUE$K_PRIMAX = 4 /* End-of-list marker */ + DAP_PROC_PRI_MAX = 4 /* End-of-list marker */ }; #define DAP_QUE$K_ITER_NR 7 @@ -63,9 +63,9 @@ typedef struct dap_proc_queue{ dap_events_socket_t *esocket; struct { - pthread_mutex_t lock; /* To coordinate access to the queuee's entries */ - dap_slist_t items; /* List of the queue' entries */ - } list [DAP_QUE$K_PRIMAX]; /* An array of list according of priority numbers */ + pthread_mutex_t lock; /* To coordinate access to the queuee's entries */ + dap_slist_t items; /* List of the queue' entries */ + } list [DAP_PROC_PRI_MAX]; /* An array of list according of priority numbers */ } dap_proc_queue_t; dap_proc_queue_t *dap_proc_queue_create(dap_proc_thread_t * a_thread); @@ -77,4 +77,5 @@ int dap_proc_queue_add_callback_inter(dap_events_socket_t * a_es_input, dap_proc int dap_proc_queue_add_callback_ext(dap_worker_t * a_worker, dap_proc_queue_callback_t a_callback, void * a_callback_arg, int a_pri); int dap_proc_queue_add_callback_inter_ext(dap_events_socket_t * a_es_input, dap_proc_queue_callback_t a_callback, void * a_callback_arg, int ); +int dap_proc_thread_add_callback_mt(dap_proc_thread_t * a_thread, dap_proc_queue_callback_t a_callback, void * a_callback_arg, int a_pri); diff --git a/dap-sdk/net/core/include/dap_proc_thread.h b/dap-sdk/io/include/dap_proc_thread.h similarity index 94% rename from dap-sdk/net/core/include/dap_proc_thread.h rename to dap-sdk/io/include/dap_proc_thread.h index fb592d30cfe66020901a7aeba41bc4c0a8b5240c..5eb2df20900e0ec46d76b2d19e2d496d93d51dfc 100644 --- a/dap-sdk/net/core/include/dap_proc_thread.h +++ b/dap-sdk/io/include/dap_proc_thread.h @@ -27,7 +27,7 @@ #include "dap_proc_queue.h" #include "dap_worker.h" #include "dap_common.h" - +#include "dap_context.h" typedef struct dap_proc_thread{ uint32_t cpu_id; pthread_t thread_id; /* TID has been returned by pthread_create() */ @@ -43,6 +43,7 @@ typedef struct dap_proc_thread{ dap_events_socket_t *queue_gdb_input; /* Inputs for request to GDB, @RRL: #6238 */ + dap_context_t * context; int signal_kill; int signal_exit; @@ -87,6 +88,8 @@ int dap_proc_thread_esocket_update_poll_flags(dap_proc_thread_t * a_thread, dap_ typedef void (*dap_proc_worker_callback_t)(dap_worker_t *,void *); -void dap_proc_thread_worker_exec_callback(dap_proc_thread_t * a_thread, size_t a_worker_id, dap_proc_worker_callback_t a_callback, void * a_arg); +void dap_proc_thread_worker_exec_callback_inter(dap_proc_thread_t * a_thread, size_t a_worker_id, dap_proc_worker_callback_t a_callback, void * a_arg); int dap_proc_thread_assign_esocket_unsafe(dap_proc_thread_t * a_thread, dap_events_socket_t * a_esocket); + + diff --git a/dap-sdk/net/core/include/dap_server.h b/dap-sdk/io/include/dap_server.h similarity index 100% rename from dap-sdk/net/core/include/dap_server.h rename to dap-sdk/io/include/dap_server.h diff --git a/dap-sdk/net/core/include/dap_timerfd.h b/dap-sdk/io/include/dap_timerfd.h similarity index 100% rename from dap-sdk/net/core/include/dap_timerfd.h rename to dap-sdk/io/include/dap_timerfd.h diff --git a/dap-sdk/net/core/include/dap_worker.h b/dap-sdk/io/include/dap_worker.h similarity index 78% rename from dap-sdk/net/core/include/dap_worker.h rename to dap-sdk/io/include/dap_worker.h index 4ddf9f9d30f40fc7e1caa1727e2d5c7ab96e2578..6e9ed819923bc8ae14a5e116cf864f6d13754884 100644 --- a/dap-sdk/net/core/include/dap_worker.h +++ b/dap-sdk/io/include/dap_worker.h @@ -23,26 +23,24 @@ #pragma once #include <pthread.h> +#include <stdatomic.h> #include "dap_events_socket.h" #include "dap_proc_queue.h" #include "dap_common.h" #include "dap_events.h" +#include "dap_context.h" +typedef struct dap_context dap_context_t; //typedef struct dap_proc_queue dap_proc_queue_t; typedef struct dap_timerfd dap_timerfd_t; typedef struct dap_worker { - uint32_t id; + uint32_t id; dap_events_t* events; dap_proc_queue_t* proc_queue; dap_events_socket_t *proc_queue_input; - uint32_t event_sockets_count; - pthread_rwlock_t esocket_rwlock; - dap_events_socket_t *esockets; // Hashmap of event sockets - - // Signal to exit - int signal_exit; + atomic_uint event_sockets_count; // worker control queues dap_events_socket_t *queue_es_new; // Queue socket for new socket @@ -64,30 +62,8 @@ typedef struct dap_worker dap_events_socket_t *queue_gdb_input; /* Inputs for request to GDB, @RRL: #6238 */ dap_timerfd_t * timer_check_activity; -#if defined DAP_EVENTS_CAPS_MSMQ - HANDLE msmq_events[MAXIMUM_WAIT_OBJECTS]; -#endif -#if defined DAP_EVENTS_CAPS_EPOLL - EPOLL_HANDLE epoll_fd; -#elif defined ( DAP_EVENTS_CAPS_POLL) - int poll_fd; - struct pollfd * poll; - dap_events_socket_t ** poll_esocket; - atomic_uint poll_count; - size_t poll_count_max; - bool poll_compress; // Some of fd's became NULL so arrays need to be reassigned -#elif defined (DAP_EVENTS_CAPS_KQUEUE) - int kqueue_fd; - struct kevent * kqueue_events_selected; - struct kevent * kqueue_events; - size_t kqueue_events_count; - - int kqueue_events_count_max; - int kqueue_events_selected_count_max; -#else -#error "Not defined worker for your platform" -#endif + dap_context_t *context; pthread_cond_t started_cond; pthread_mutex_t started_mutex; void * _inheritor; @@ -124,7 +100,14 @@ extern "C" { int dap_worker_init( size_t a_conn_timeout ); void dap_worker_deinit(); -int dap_worker_add_events_socket_unsafe( dap_events_socket_t * a_esocket, dap_worker_t * a_worker); +static inline int dap_worker_add_events_socket_unsafe( dap_events_socket_t * a_esocket, dap_worker_t * a_worker) +{ + int l_ret = dap_context_add_esocket(a_worker->context, a_esocket); + if (l_ret == 0 ) + a_esocket->worker = a_worker; + return l_ret; +} + void dap_worker_add_events_socket(dap_events_socket_t * a_events_socket, dap_worker_t * a_worker); void dap_worker_add_events_socket_inter(dap_events_socket_t * a_es_input, dap_events_socket_t * a_events_socket); dap_worker_t *dap_worker_add_events_socket_auto( dap_events_socket_t * a_events_socket ); diff --git a/dap-sdk/net/core/libdap-net-core.pri b/dap-sdk/io/libdap-io.pri similarity index 80% rename from dap-sdk/net/core/libdap-net-core.pri rename to dap-sdk/io/libdap-io.pri index 2aa609cf66bb8c400a58bb40c9941aca317c1dbd..37dbe8509c076376eb21f8eb626019b0cc6a9cc2 100755 --- a/dap-sdk/net/core/libdap-net-core.pri +++ b/dap-sdk/io/libdap-io.pri @@ -5,7 +5,8 @@ HEADERS += $$PWD/include/dap_events.h \ $$PWD/include/dap_proc_thread.h \ $$PWD/include/dap_server.h \ $$PWD/include/dap_timerfd.h \ - $$PWD/include/dap_worker.h + $$PWD/include/dap_context.h \ + $$PWD/include/dap_worker.h SOURCES += $$PWD/dap_events.c \ $$PWD/dap_events_socket.c \ @@ -14,6 +15,7 @@ SOURCES += $$PWD/dap_events.c \ $$PWD/dap_proc_thread.c \ $$PWD/dap_server.c \ $$PWD/dap_timerfd.c \ - $$PWD/dap_worker.c + $$PWD/dap_context.c \ + $$PWD/dap_worker.c INCLUDEPATH += $$PWD/include diff --git a/dap-sdk/io/test/CMakeLists.txt b/dap-sdk/io/test/CMakeLists.txt new file mode 100755 index 0000000000000000000000000000000000000000..a5a6ffe1a3685b504d2ff6938e80ad27c3fd855f --- /dev/null +++ b/dap-sdk/io/test/CMakeLists.txt @@ -0,0 +1,17 @@ +project(server_io_test) + +if ( NOT ( TARGET dap_test ) ) + add_subdirectory(libdap-test) +endif() + +file(GLOB DAP_IO_TEST_SOURCES *.c) +file(GLOB DAP_IO_TEST_HEADERS *.h) + +add_executable(${PROJECT_NAME} ${DAP_IO_TEST_SOURCES} ${DAP_IO_TEST_HEADERS}) + +target_link_libraries(${PROJECT_NAME} dap_test dap_core dap_crypto dap_io) + +add_test( + NAME server_io_test + COMMAND server_io_test +) diff --git a/dap-sdk/net/core/test/dap_traffic_track_test.c b/dap-sdk/io/test/dap_traffic_track_test.c similarity index 100% rename from dap-sdk/net/core/test/dap_traffic_track_test.c rename to dap-sdk/io/test/dap_traffic_track_test.c diff --git a/dap-sdk/net/core/test/dap_traffic_track_test.h b/dap-sdk/io/test/dap_traffic_track_test.h similarity index 100% rename from dap-sdk/net/core/test/dap_traffic_track_test.h rename to dap-sdk/io/test/dap_traffic_track_test.h diff --git a/dap-sdk/net/core/test/main.c b/dap-sdk/io/test/main.c similarity index 100% rename from dap-sdk/net/core/test/main.c rename to dap-sdk/io/test/main.c diff --git a/dap-sdk/net/client/CMakeLists.txt b/dap-sdk/net/client/CMakeLists.txt index 4c1645ab84c8b52b1adb456dca7bc21175fba756..a96449ad5fc2ad37f85bd075aca0e9ccbc8814c0 100644 --- a/dap-sdk/net/client/CMakeLists.txt +++ b/dap-sdk/net/client/CMakeLists.txt @@ -9,9 +9,9 @@ add_library(${PROJECT_NAME} STATIC ${DAP_CLIENT_HEADERS} ${DAP_CLIENT_SOURCES}) if(DAPSDK_MODULES MATCHES "ssl-support") - target_link_libraries(${PROJECT_NAME} dap_core dap_crypto dap_server_core dap_http_server dap_enc_server dap_stream dap_session dap_stream_ch json-c wolfssl) + target_link_libraries(${PROJECT_NAME} dap_core dap_crypto dap_io dap_http_server dap_enc_server dap_stream dap_session dap_stream_ch json-c wolfssl) else() - target_link_libraries(${PROJECT_NAME} dap_core dap_crypto dap_server_core dap_http_server dap_enc_server dap_stream dap_session dap_stream_ch json-c) + target_link_libraries(${PROJECT_NAME} dap_core dap_crypto dap_io dap_http_server dap_enc_server dap_stream dap_session dap_stream_ch json-c) endif() if(UNIX AND NOT ANDROID AND NOT DARWIN) target_link_libraries(${PROJECT_NAME} rt) diff --git a/dap-sdk/net/client/dap_client_http.c b/dap-sdk/net/client/dap_client_http.c index 562f89d634f97a8158ed5c6b2a3c52c4ada70105..c321f6ab142e9920f10bf1b331aa478135a74b80 100644 --- a/dap-sdk/net/client/dap_client_http.c +++ b/dap-sdk/net/client/dap_client_http.c @@ -187,7 +187,7 @@ static bool s_timer_timeout_after_connected_check(void * a_arg) dap_worker_t * l_worker = dap_events_get_current_worker(dap_events_get_default()); // We're in own esocket context assert(l_worker); - dap_events_socket_t * l_es = dap_worker_esocket_find_uuid( l_worker, *l_es_uuid_ptr); + dap_events_socket_t * l_es = dap_context_esocket_find_by_uuid( l_worker->context, *l_es_uuid_ptr); if(l_es){ dap_client_http_pvt_t * l_http_pvt = PVT(l_es); assert(l_http_pvt); @@ -224,7 +224,7 @@ static bool s_timer_timeout_check(void * a_arg) dap_worker_t * l_worker = dap_events_get_current_worker(dap_events_get_default()); // We're in own esocket context assert(l_worker); - dap_events_socket_t * l_es = dap_worker_esocket_find_uuid(l_worker, *l_es_uuid); + dap_events_socket_t * l_es = dap_context_esocket_find_by_uuid(l_worker->context, *l_es_uuid); if(l_es){ if (l_es->flags & DAP_SOCK_CONNECTING ){ dap_client_http_pvt_t * l_http_pvt = PVT(l_es); diff --git a/dap-sdk/net/client/dap_client_pvt.c b/dap-sdk/net/client/dap_client_pvt.c index 75d8b06c0b2550e289243b6f775b534184c408ac..bc9237ba22084b9fa93ff65ab0b5b27e9d1fcb78 100644 --- a/dap-sdk/net/client/dap_client_pvt.c +++ b/dap-sdk/net/client/dap_client_pvt.c @@ -233,7 +233,7 @@ static bool s_stream_timer_timeout_check(void * a_arg) dap_worker_t *l_worker = dap_events_get_current_worker(dap_events_get_default()); assert(l_worker); - dap_events_socket_t * l_es = dap_worker_esocket_find_uuid(l_worker, *l_es_uuid_ptr); + dap_events_socket_t * l_es = dap_context_esocket_find_by_uuid(l_worker->context, *l_es_uuid_ptr); if(l_es){ if (l_es->flags & DAP_SOCK_CONNECTING ){ dap_client_pvt_t *l_client_pvt = DAP_ESOCKET_CLIENT_PVT(l_es); @@ -275,7 +275,7 @@ static bool s_stream_timer_timeout_after_connected_check(void * a_arg) dap_worker_t * l_worker = dap_events_get_current_worker(dap_events_get_default()); assert(l_worker); - dap_events_socket_t * l_es = dap_worker_esocket_find_uuid(l_worker, *l_es_uuid_ptr); + dap_events_socket_t * l_es = dap_context_esocket_find_by_uuid(l_worker->context, *l_es_uuid_ptr); if( l_es ){ dap_client_pvt_t *l_client_pvt = DAP_ESOCKET_CLIENT_PVT(l_es); if (dap_client_pvt_find(l_client_pvt->uuid)) { @@ -315,7 +315,7 @@ static bool s_enc_init_delay_before_request_timer_callback(void * a_arg) assert (a_arg); dap_events_socket_uuid_t* l_es_uuid_ptr = (dap_events_socket_uuid_t*) a_arg; dap_worker_t * l_worker = dap_events_get_current_worker(dap_events_get_default()); - dap_events_socket_t * l_es = dap_worker_esocket_find_uuid(l_worker, *l_es_uuid_ptr); + dap_events_socket_t * l_es = dap_context_esocket_find_by_uuid(l_worker->context, *l_es_uuid_ptr); if(l_es){ dap_client_pvt_t *l_client_pvt = DAP_ESOCKET_CLIENT_PVT(l_es); s_stage_status_after(l_client_pvt); diff --git a/dap-sdk/net/client/test/CMakeLists.txt b/dap-sdk/net/client/test/CMakeLists.txt index 065db522c9ca6c0ce661dc47e244e3f7c16cf6dc..db92897878bba8f5c160c4ad3dd2fd24ea4d324d 100755 --- a/dap-sdk/net/client/test/CMakeLists.txt +++ b/dap-sdk/net/client/test/CMakeLists.txt @@ -10,7 +10,7 @@ file(GLOB DAP_CLIENT_TEST_HEADERS *.h) add_executable(${PROJECT_NAME} ${DAP_CLIENT_TEST_SOURCES} ${DAP_CLIENT_TEST_HEADERS}) -target_link_libraries(${PROJECT_NAME} dap_test dap_core dap_crypto dap_server_core ev pthread) +target_link_libraries(${PROJECT_NAME} dap_test dap_core dap_crypto dap_io ev pthread) add_test( NAME dap_client_test diff --git a/dap-sdk/net/core/test/CMakeLists.txt b/dap-sdk/net/core/test/CMakeLists.txt deleted file mode 100755 index 1149697b7fd05108808487b46e94be484231cdd7..0000000000000000000000000000000000000000 --- a/dap-sdk/net/core/test/CMakeLists.txt +++ /dev/null @@ -1,17 +0,0 @@ -project(server_core_test) - -if ( NOT ( TARGET dap_test ) ) - add_subdirectory(libdap-test) -endif() - -file(GLOB DAP_SERVER_CORE_TEST_SOURCES *.c) -file(GLOB DAP_SERVER_CORE_TEST_HEADERS *.h) - -add_executable(${PROJECT_NAME} ${DAP_SERVER_CORE_TEST_SOURCES} ${DAP_SERVER_CORE_TEST_HEADERS}) - -target_link_libraries(${PROJECT_NAME} dap_test dap_core dap_crypto dap_server_core ev) - -add_test( - NAME server_core_test - COMMAND server_core_test -) diff --git a/dap-sdk/net/libdap-net.pri b/dap-sdk/net/libdap-net.pri index f753e5f14e8b91509e61f8dbe4e81e441219bf29..bd828a94f94b863d6214f6375434fb6ff0ef3e99 100755 --- a/dap-sdk/net/libdap-net.pri +++ b/dap-sdk/net/libdap-net.pri @@ -1,6 +1,5 @@ include (../net/client/libdap-net-client.pri) -include (../net/core/libdap-net-core.pri) include (../net/server/libdap-net-server.pri) include (../net/stream/libdap-net-stream.pri) diff --git a/dap-sdk/net/server/enc_server/CMakeLists.txt b/dap-sdk/net/server/enc_server/CMakeLists.txt index 4f95407a85ec0b43c836ac9f5da15bb12ae3fe64..8aaba2be3cae6e9d374e519d10c15d462df4228e 100644 --- a/dap-sdk/net/server/enc_server/CMakeLists.txt +++ b/dap-sdk/net/server/enc_server/CMakeLists.txt @@ -19,7 +19,7 @@ target_include_directories(${PROJECT_NAME} PUBLIC include) target_include_directories(${PROJECT_NAME} PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/../../../../3rdparty/uthash/src) if(WIN32) - target_link_libraries(${PROJECT_NAME} dap_core dap_crypto dap_server_core dap_http_server json-c + target_link_libraries(${PROJECT_NAME} dap_core dap_crypto dap_io dap_http_server json-c KERNEL32 USER32 SHELL32 @@ -41,6 +41,6 @@ if(WIN32) endif() if(UNIX) - target_link_libraries(${PROJECT_NAME} dap_core dap_crypto dap_server_core dap_http_server json-c ) + target_link_libraries(${PROJECT_NAME} dap_core dap_crypto dap_io dap_http_server json-c ) endif() diff --git a/dap-sdk/net/server/http_server/CMakeLists.txt b/dap-sdk/net/server/http_server/CMakeLists.txt index e4f3397887bd535d710929565b5021bca94cd3d7..2af0aec59f1e03d422c56188ec11e02fe7aa031d 100644 --- a/dap-sdk/net/server/http_server/CMakeLists.txt +++ b/dap-sdk/net/server/http_server/CMakeLists.txt @@ -26,7 +26,7 @@ if(DARWIN) endif() if(WIN32) - target_link_libraries(dap_http_server dap_core dap_crypto dap_server_core magic regex tre intl iconv + target_link_libraries(dap_http_server dap_core dap_crypto dap_io magic regex tre intl iconv KERNEL32 USER32 SHELL32 @@ -48,7 +48,7 @@ if(WIN32) endif() if(UNIX) - target_link_libraries(${PROJECT_NAME} dap_core dap_server_core magic json-c) + target_link_libraries(${PROJECT_NAME} dap_core dap_io magic json-c) target_include_directories(${PROJECT_NAME} PUBLIC ${IPUTILS_INCLUDE_DIRS}) endif() diff --git a/dap-sdk/net/server/json_rpc/CMakeLists.txt b/dap-sdk/net/server/json_rpc/CMakeLists.txt index 30207dcaa5881382df12e9c164e4bea61e7d0771..5dcedf1f91891868501c7d969f2b6c1f3984a9b6 100644 --- a/dap-sdk/net/server/json_rpc/CMakeLists.txt +++ b/dap-sdk/net/server/json_rpc/CMakeLists.txt @@ -14,5 +14,5 @@ target_include_directories(${PROJECT_NAME} INTERFACE . include/) target_include_directories(${PROJECT_NAME} PUBLIC include) target_include_directories(${PROJECT_NAME} PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/../../../../3rdparty/uthash/src) -target_link_libraries(${PROJECT_NAME} dap_core dap_crypto dap_server_core dap_http_server dap_client json-c ) +target_link_libraries(${PROJECT_NAME} dap_core dap_crypto dap_io dap_http_server dap_client json-c ) #target_link_libraries(${PROJECT_NAME} dap_core)# dap_http_server json-c) diff --git a/dap-sdk/net/server/notify_server/CMakeLists.txt b/dap-sdk/net/server/notify_server/CMakeLists.txt index 8390370099ff6c10b1d12bf7be69d16e16f23d3c..de3816c16ef052fe0e3b688b58bd0d34c373925e 100644 --- a/dap-sdk/net/server/notify_server/CMakeLists.txt +++ b/dap-sdk/net/server/notify_server/CMakeLists.txt @@ -8,5 +8,5 @@ add_library(${PROJECT_NAME} STATIC ${DAP_NOTIFY_SRV_SRCS} ${DAP_NOTIFY_SRV_HDRS} target_include_directories(${PROJECT_NAME} INTERFACE . include/) target_include_directories(${PROJECT_NAME} PUBLIC include) -target_link_libraries(${PROJECT_NAME} dap_core dap_server_core json-c ) +target_link_libraries(${PROJECT_NAME} dap_core dap_io json-c ) diff --git a/dap-sdk/net/stream/stream/CMakeLists.txt b/dap-sdk/net/stream/stream/CMakeLists.txt index 11d4b6896a85fa96512ad37d6a9c72cf06a36831..5cc4ec636542842c0eb1720158484e25beaa016c 100755 --- a/dap-sdk/net/stream/stream/CMakeLists.txt +++ b/dap-sdk/net/stream/stream/CMakeLists.txt @@ -6,7 +6,7 @@ file(GLOB STREAM_HDRS include/*.h) add_library(${PROJECT_NAME} STATIC ${STREAM_SRCS} ${STREAM_HDRS}) -target_link_libraries(dap_stream dap_core dap_server_core dap_crypto +target_link_libraries(dap_stream dap_core dap_io dap_crypto dap_http_server dap_enc_server dap_session dap_stream_ch dap_client) target_include_directories(dap_stream INTERFACE .) diff --git a/dap-sdk/net/stream/stream/dap_stream.c b/dap-sdk/net/stream/stream/dap_stream.c index 359d81b5348b175d8c0aa853388aa0b3ea153682..6e781e35001c63e390f7ec24a8ef9d3cf131d440 100644 --- a/dap-sdk/net/stream/stream/dap_stream.c +++ b/dap-sdk/net/stream/stream/dap_stream.c @@ -889,7 +889,7 @@ static bool s_callback_keepalive(void *a_arg, bool a_server_side) return false; dap_events_socket_uuid_t * l_es_uuid = (dap_events_socket_uuid_t*) a_arg; dap_worker_t * l_worker = dap_events_get_current_worker(dap_events_get_default()); - dap_events_socket_t * l_es = dap_worker_esocket_find_uuid(l_worker, *l_es_uuid); + dap_events_socket_t * l_es = dap_context_esocket_find_by_uuid(l_worker->context, *l_es_uuid); if(l_es) { dap_stream_t *l_stream = NULL; if (a_server_side) { diff --git a/modules/chain/btc_rpc/CMakeLists.txt b/modules/chain/btc_rpc/CMakeLists.txt index 930181f7283cee04960465242c2c5448d8ee00b7..5a8bfff563162fe9ef2eb56b524418a2bb29b1ea 100644 --- a/modules/chain/btc_rpc/CMakeLists.txt +++ b/modules/chain/btc_rpc/CMakeLists.txt @@ -14,5 +14,5 @@ target_include_directories(${PROJECT_NAME} INTERFACE . include/) target_include_directories(${PROJECT_NAME} PUBLIC include) target_link_libraries(${PROJECT_NAME} dap_core dap_json_rpc) -#target_link_libraries(${PROJECT_NAME} dap_core dap_crypto dap_server_core dap_http_server dap_client json-c ) +#target_link_libraries(${PROJECT_NAME} dap_core dap_crypto dap_io dap_http_server dap_client json-c ) #target_link_libraries(${PROJECT_NAME} dap_core)# dap_http_server json-c) diff --git a/modules/channel/chain-net-srv/CMakeLists.txt b/modules/channel/chain-net-srv/CMakeLists.txt index 08b1ea5c1996bb155320f30f87a658a9759dab1d..e9c147d210b11713e39472c28ee466d62fe1089a 100644 --- a/modules/channel/chain-net-srv/CMakeLists.txt +++ b/modules/channel/chain-net-srv/CMakeLists.txt @@ -6,7 +6,7 @@ file(GLOB DAP_STREAM_CH_CHAIN_NET_SRV_HDRS include/*.h) add_library(${PROJECT_NAME} STATIC ${DAP_STREAM_CH_CHAIN_NET_SRV_SRCS} ${DAP_STREAM_CH_CHAIN_NET_SRV_HDRS}) -target_link_libraries(${PROJECT_NAME} dap_core dap_crypto dap_chain_common dap_chain dap_chain_mempool dap_chain_net dap_chain_net_srv dap_server_core dap_stream dap_stream_ch dap_stream_ch_chain dap_stream_ch_chain_net) +target_link_libraries(${PROJECT_NAME} dap_core dap_crypto dap_chain_common dap_chain dap_chain_mempool dap_chain_net dap_chain_net_srv dap_io dap_stream dap_stream_ch dap_stream_ch_chain dap_stream_ch_chain_net) target_include_directories(${PROJECT_NAME} INTERFACE .) target_include_directories(${PROJECT_NAME} PUBLIC include) diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c index add1f75c8d46dddb91ef9aafedcd1434df9341eb..617dc0178693ffc00a04955e9b21a18fc194eb71 100644 --- a/modules/channel/chain/dap_stream_ch_chain.c +++ b/modules/channel/chain/dap_stream_ch_chain.c @@ -305,10 +305,10 @@ static bool s_sync_out_chains_proc_callback(dap_proc_thread_t *a_thread, void *a //pthread_rwlock_unlock(&l_chain->atoms_rwlock); - dap_proc_thread_worker_exec_callback(a_thread, l_sync_request->worker->id, s_sync_out_chains_first_worker_callback, l_sync_request ); + dap_proc_thread_worker_exec_callback_inter(a_thread, l_sync_request->worker->id, s_sync_out_chains_first_worker_callback, l_sync_request ); } else { //pthread_rwlock_unlock(&l_chain->atoms_rwlock); - dap_proc_thread_worker_exec_callback(a_thread, l_sync_request->worker->id,s_sync_out_chains_last_worker_callback, l_sync_request ); + dap_proc_thread_worker_exec_callback_inter(a_thread, l_sync_request->worker->id,s_sync_out_chains_last_worker_callback, l_sync_request ); } return true; } @@ -415,9 +415,9 @@ static bool s_sync_out_gdb_proc_callback(dap_proc_thread_t *a_thread, void *a_ar log_it(L_DEBUG, "Sync out gdb proc, requested %"DAP_UINT64_FORMAT_U" transactions from address "NODE_ADDR_FP_STR, l_ch_chain->request_db_log->items_number, NODE_ADDR_FP_ARGS_S(l_sync_request->request.node_addr)); l_sync_request->gdb.db_log = l_ch_chain->request_db_log; - dap_proc_thread_worker_exec_callback(a_thread, l_sync_request->worker->id, s_sync_out_gdb_first_worker_callback, l_sync_request ); + dap_proc_thread_worker_exec_callback_inter(a_thread, l_sync_request->worker->id, s_sync_out_gdb_first_worker_callback, l_sync_request ); } else { - dap_proc_thread_worker_exec_callback(a_thread, l_sync_request->worker->id, s_sync_out_gdb_last_worker_callback, l_sync_request ); + dap_proc_thread_worker_exec_callback_inter(a_thread, l_sync_request->worker->id, s_sync_out_gdb_last_worker_callback, l_sync_request ); } return true; } @@ -472,7 +472,7 @@ static bool s_sync_update_gdb_proc_callback(dap_proc_thread_t *a_thread, void *a l_ch_chain->state = CHAIN_STATE_UPDATE_GLOBAL_DB; l_sync_request->gdb.db_log = l_ch_chain->request_db_log; l_sync_request->request.node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net); - dap_proc_thread_worker_exec_callback(a_thread, l_sync_request->worker->id, s_sync_update_gdb_start_worker_callback, l_sync_request); + dap_proc_thread_worker_exec_callback_inter(a_thread, l_sync_request->worker->id, s_sync_update_gdb_start_worker_callback, l_sync_request); return true; } @@ -743,7 +743,7 @@ static bool s_gdb_in_pkt_proc_callback(dap_proc_thread_t *a_thread, void *a_arg) l_sync_req_tsd->request.id_end = l_last_id; l_sync_req_tsd->gdb.sync_group = l_obj->type == DAP_DB$K_OPTYPE_ADD ? dap_strdup(l_last_group) : dap_strdup_printf("%s.del", l_last_group); - dap_proc_thread_worker_exec_callback(a_thread, l_sync_request->worker->id, + dap_proc_thread_worker_exec_callback_inter(a_thread, l_sync_request->worker->id, s_gdb_sync_tsd_worker_callback, l_sync_req_tsd); } l_last_id = l_obj->id; @@ -806,7 +806,7 @@ static bool s_gdb_in_pkt_proc_callback(dap_proc_thread_t *a_thread, void *a_arg) // save data to global_db if(!dap_chain_global_db_obj_save(l_obj, 1)) { struct sync_request *l_sync_req_err = DAP_DUP(l_sync_request); - dap_proc_thread_worker_exec_callback(a_thread, l_sync_request->worker->id, + dap_proc_thread_worker_exec_callback_inter(a_thread, l_sync_request->worker->id, s_gdb_in_pkt_error_worker_callback, l_sync_req_err); } else if (s_debug_more) log_it(L_DEBUG, "Added new GLOBAL_DB synchronization record"); diff --git a/modules/global-db/dap_chain_global_db.c b/modules/global-db/dap_chain_global_db.c index 7546c2e6add927f5105944a26b7678c7d5f2b1d0..3220ef0871176f8c571066ca7ffc5eef04a5d12a 100644 --- a/modules/global-db/dap_chain_global_db.c +++ b/modules/global-db/dap_chain_global_db.c @@ -827,8 +827,6 @@ dap_events_socket_t *l_es; if ( l_wrk && l_proc_thd ) return log_it(L_ERROR, "Both <worker> or <proc_thread> contexts are NOT NULL"), -EINVAL; - /* So, at this point we should decide a what <event socket> context will be used */ - l_es = l_wrk ? l_wrk->esockets : l_proc_thd->esockets[0]; if ( !(l_db_req = DAP_NEW_Z(dap_grobal_db_req_t)) ) /* Allocate memory for new DB Request context */ return log_it(L_ERROR, "Cannot allocate memory for DB Request, errno=%d", errno), -errno; diff --git a/modules/net/CMakeLists.txt b/modules/net/CMakeLists.txt index 0c5b0075b36ca522fdb0b47ea8ae20b82e01ef44..ea6f12058037b9e67982566cdfd3b2d75a312dca 100644 --- a/modules/net/CMakeLists.txt +++ b/modules/net/CMakeLists.txt @@ -37,16 +37,16 @@ endif() add_library(${PROJECT_NAME} STATIC ${DAP_CHAIN_NET_SRCS} ${DAP_CHAIN_NET_HEADERS} ${IPUTILS_SRCS} ${IPUTILS_HEADERS}) if(WIN32) - target_link_libraries(${PROJECT_NAME} dap_core dap_crypto dap_client dap_server_core dap_notify_srv dap_stream_ch_chain dap_stream_ch_chain_net dap_stream_ch_chain_net_srv dap_chain dap_chain_wallet dap_chain_net_srv dap_stream_ch_chain_voting + target_link_libraries(${PROJECT_NAME} dap_core dap_crypto dap_client dap_io dap_notify_srv dap_stream_ch_chain dap_stream_ch_chain_net dap_stream_ch_chain_net_srv dap_chain dap_chain_wallet dap_chain_net_srv dap_stream_ch_chain_voting dap_chain_mempool dap_chain_global_db dap_chain_cs_none) endif() if(LINUX) - target_link_libraries(${PROJECT_NAME} dap_core dap_crypto dap_server_core dap_notify_srv dap_client dap_stream_ch_chain dap_stream_ch_chain_net dap_stream_ch_chain_net_srv dap_stream_ch_chain_voting dap_chain + target_link_libraries(${PROJECT_NAME} dap_core dap_crypto dap_io dap_notify_srv dap_client dap_stream_ch_chain dap_stream_ch_chain_net dap_stream_ch_chain_net_srv dap_stream_ch_chain_voting dap_chain dap_chain_wallet dap_chain_net_srv dap_chain_mempool dap_chain_global_db dap_chain_cs_none resolv ) elseif(BSD) - target_link_libraries(${PROJECT_NAME} dap_core dap_crypto dap_server_core dap_notify_srv dap_client dap_stream_ch_chain dap_stream_ch_chain_net dap_stream_ch_chain_net_srv dap_stream_ch_chain_voting dap_chain + target_link_libraries(${PROJECT_NAME} dap_core dap_crypto dap_io dap_notify_srv dap_client dap_stream_ch_chain dap_stream_ch_chain_net dap_stream_ch_chain_net_srv dap_stream_ch_chain_voting dap_chain dap_chain_wallet dap_chain_net_srv dap_chain_mempool dap_chain_global_db dap_chain_cs_none ) endif() diff --git a/modules/net/dap_chain_node_client.c b/modules/net/dap_chain_node_client.c index 6a53d0cca80f206dc8851e3d5db0b64417cfa3b2..e27494f7e5cc68d52f93a2441316b624c81d5480 100644 --- a/modules/net/dap_chain_node_client.c +++ b/modules/net/dap_chain_node_client.c @@ -211,7 +211,7 @@ dap_chain_node_sync_status_t dap_chain_node_client_start_sync(dap_events_socket_ dap_events_socket_t * l_es = NULL; dap_events_socket_uuid_t l_es_uuid = l_me->esocket_uuid; // check if esocket still in worker - if( (l_es = dap_worker_esocket_find_uuid(l_worker, l_es_uuid)) != NULL ) { + if( (l_es = dap_context_esocket_find_by_uuid(l_worker->context, l_es_uuid)) != NULL ) { dap_client_t * l_client = dap_client_from_esocket(l_es); if (l_client ) { dap_chain_node_client_t * l_node_client = (dap_chain_node_client_t*) l_client->_inheritor; diff --git a/modules/net/dap_chain_node_dns_client.c b/modules/net/dap_chain_node_dns_client.c index 1122be737e9606639e4952727a6ea6d224ad2523..3ed6c7f12ba94d2abf364501547a8455e7bec1fc 100644 --- a/modules/net/dap_chain_node_dns_client.c +++ b/modules/net/dap_chain_node_dns_client.c @@ -139,7 +139,7 @@ static bool s_dns_client_esocket_timeout_callback(void * a_arg) assert(l_worker); dap_events_socket_t * l_es; - if((l_es = dap_worker_esocket_find_uuid(l_worker ,*l_es_uuid_ptr) ) != NULL){ // If we've not closed this esocket + if((l_es = dap_context_esocket_find_by_uuid(l_worker->context,*l_es_uuid_ptr) ) != NULL){ // If we've not closed this esocket struct dns_client * l_dns_client = (struct dns_client*) l_es->_inheritor; log_it(L_WARNING,"DNS request timeout, bad network?"); if(! l_dns_client->is_callbacks_called ){ diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 3376bcd0556feb9e8ee0a20180d575ecb401f66b..5b9ff4421933a85b2ff6b13f0c6e9210d89309bf 100755 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -50,7 +50,7 @@ target_link_libraries(dap_global_db_test -pthread -lrt) target_link_libraries(dap_global_db_test ${DAP_LIBSDK_ROOT}/modules/global-db/libdap_chain_global_db.a) target_link_libraries(dap_global_db_test ${DAP_LIBSDK_ROOT}/dap-sdk/crypto/libdap_crypto.a) -target_link_libraries(dap_global_db_test ${DAP_LIBSDK_ROOT}/dap-sdk/net/core/libdap_server_core.a) +target_link_libraries(dap_global_db_test ${DAP_LIBSDK_ROOT}/dap-sdk/net/core/libdap_io.a) target_link_libraries(dap_global_db_test ${DAP_LIBSDK_ROOT}/dap-sdk/core/libdap_core.a) target_link_libraries(dap_global_db_test ${DAP_LIBSDK_ROOT}/3rdparty/cuttdb/libdap_cuttdb.a) target_link_libraries(dap_global_db_test -lsqlite3)