From cb1456c2995832cd894d7f7ca02ea6cdfec0f864 Mon Sep 17 00:00:00 2001 From: Dmitry Gerasimov <dmitriy.gerasimov@demlabs.net> Date: Tue, 31 May 2022 19:38:35 +0700 Subject: [PATCH] [*] Intermediate step, smth is working now Former-commit-id: 7fc56e3b642f814c8d339422e8215f889cf9f926 --- dap-sdk/io/dap_context.c | 443 ++++++++++++- dap-sdk/io/dap_events.c | 1 + dap-sdk/io/dap_events_socket.c | 446 +------------ dap-sdk/io/dap_proc_queue.c | 5 +- dap-sdk/io/dap_proc_thread.c | 619 +----------------- dap-sdk/io/dap_server.c | 6 +- dap-sdk/io/dap_timerfd.c | 16 +- dap-sdk/io/dap_worker.c | 53 +- dap-sdk/io/include/dap_context.h | 13 +- dap-sdk/io/include/dap_events_socket.h | 5 +- dap-sdk/io/include/dap_proc_thread.h | 21 +- dap-sdk/io/include/dap_worker.h | 5 +- dap-sdk/net/client/dap_client_pvt.c | 2 +- .../net/server/http_server/dap_http_simple.c | 4 +- .../server/notify_server/src/dap_notify_srv.c | 6 +- dap-sdk/net/stream/stream/dap_stream.c | 4 +- dap-sdk/net/stream/stream/dap_stream_worker.c | 2 +- modules/global-db/dap_chain_global_db.c | 4 +- modules/net/dap_chain_node_client.c | 4 +- modules/net/dap_chain_node_dns_client.c | 12 +- 20 files changed, 573 insertions(+), 1098 deletions(-) diff --git a/dap-sdk/io/dap_context.c b/dap-sdk/io/dap_context.c index 7c0b353e15..3c7b735846 100644 --- a/dap-sdk/io/dap_context.c +++ b/dap-sdk/io/dap_context.c @@ -20,6 +20,13 @@ You should have received a copy of the GNU General Public License along with any DAP SDK based project. If not, see <http://www.gnu.org/licenses/>. */ +#include <stdlib.h> +#include <stdio.h> +#include <stdarg.h> +#include <string.h> +#include <assert.h> +#include <errno.h> +#include <stdatomic.h> #if ! defined (_GNU_SOURCE) #define _GNU_SOURCE /* See feature_test_macros(7) */ @@ -34,6 +41,28 @@ #include <ws2tcpip.h> #endif +#if defined (DAP_OS_LINUX) +#include <sys/epoll.h> +#include <sys/types.h> +#include <sys/select.h> +#include <unistd.h> +#include <sys/socket.h> +#include <arpa/inet.h> +#elif defined (DAP_OS_BSD) +#include <sys/types.h> +#include <sys/select.h> +#include <unistd.h> +#include <sys/socket.h> +#include <arpa/inet.h> + +#elif defined (DAP_OS_WINDOWS) +#include <winsock2.h> +#include <windows.h> +#include <mswsock.h> +#include <io.h> + +#endif + #ifdef DAP_OS_DARWIN #define NOTE_READ NOTE_LOWAT @@ -43,14 +72,21 @@ #endif +#if defined (DAP_EVENTS_CAPS_QUEUE_MQUEUE) +#include <sys/time.h> +#include <sys/resource.h> +#endif + #define LOG_TAG "dap_context" #include "dap_common.h" +#include "dap_uuid.h" #include "dap_context.h" #include "dap_worker.h" - +#include "dap_events_socket.h" pthread_key_t g_dap_context_pth_key; + /** * @brief dap_context_init * @return @@ -850,7 +886,7 @@ int dap_context_thread_loop(dap_context_t * a_context) * @brief dap_context_poll_update * @param a_esocket */ -void dap_context_poll_update(dap_events_socket_t * a_esocket) +int dap_context_poll_update(dap_events_socket_t * a_esocket) { #if defined (DAP_EVENTS_CAPS_EPOLL) int events = a_esocket->ev_base_flags | EPOLLERR; @@ -876,6 +912,7 @@ void dap_context_poll_update(dap_events_socket_t * a_esocket) strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); log_it(L_ERROR,"Can't update client socket state in the epoll_fd %"DAP_FORMAT_HANDLE": \"%s\" (%d)", a_esocket->context->epoll_fd, l_errbuf, l_errno); + return l_errno; } } #elif defined (DAP_EVENTS_CAPS_POLL) @@ -891,6 +928,7 @@ void dap_context_poll_update(dap_events_socket_t * a_esocket) }else{ log_it(L_ERROR, "Wrong poll index when remove from context (unsafe): %u when total count %u", a_esocket->poll_index, a_esocket->context->poll_count); + return -666; } } #elif defined (DAP_EVENTS_CAPS_KQUEUE) @@ -903,6 +941,7 @@ void dap_context_poll_update(dap_events_socket_t * a_esocket) int l_kqueue_fd = a_esocket->context? a_esocket->context->kqueue_fd : -1; if ( l_kqueue_fd == -1 ){ log_it(L_ERROR, "Esocket is not assigned with anything ,exit"); + return -667; } // Check & add @@ -944,13 +983,14 @@ void dap_context_poll_update(dap_events_socket_t * a_esocket) strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); log_it(L_ERROR,"Can't update client socket state on kqueue fd %d: \"%s\" (%d)", l_kqueue_fd, l_errbuf, l_errno); + return l_errno; } } #else #error "Not defined dap_events_socket_set_writable_unsafe for your platform" #endif - + return 0; } @@ -961,6 +1001,9 @@ void dap_context_poll_update(dap_events_socket_t * a_esocket) */ int dap_context_add_esocket(dap_context_t * a_context, dap_events_socket_t * a_esocket ) { + if(a_context == NULL || a_esocket == NULL) + return -1; + if(g_debug_reactor){ log_it(L_DEBUG,"Add event socket %p (socket %"DAP_FORMAT_SOCKET")", a_esocket, a_esocket->socket); } @@ -1092,3 +1135,397 @@ dap_events_socket_t *dap_context_esocket_find_by_uuid(dap_context_t * a_context, } return l_ret; } + +/** + * @brief dap_context_create_esocket_queue + * @param a_context + * @param a_callback + * @return + */ + dap_events_socket_t * dap_context_create_esocket_queue(dap_context_t * a_context, dap_events_socket_callback_queue_ptr_t a_callback) +{ + dap_events_socket_t * l_es = DAP_NEW_Z(dap_events_socket_t); + if(!l_es){ + log_it(L_ERROR,"Can't allocate esocket!"); + return NULL; + } + + l_es->type = DESCRIPTOR_TYPE_QUEUE; + l_es->flags = DAP_SOCK_QUEUE_PTR; + l_es->uuid = dap_uuid_generate_uint64(); + + l_es->callbacks.queue_ptr_callback = a_callback; // Arm event callback + l_es->buf_in_size_max = DAP_QUEUE_MAX_MSGS * sizeof(void*); + l_es->buf_in = DAP_NEW_Z_SIZE(byte_t,l_es->buf_in_size_max); + l_es->buf_out = NULL; + +#if defined(DAP_EVENTS_CAPS_EPOLL) + l_es->ev_base_flags = EPOLLIN | EPOLLERR | EPOLLRDHUP | EPOLLHUP; +#elif defined(DAP_EVENTS_CAPS_POLL) + l_es->poll_base_flags = POLLIN | POLLERR | POLLRDHUP | POLLHUP; +#elif defined(DAP_EVENTS_CAPS_KQUEUE) + l_es->kqueue_event_catched_data.esocket = l_es; + l_es->kqueue_base_flags = EV_CLEAR; + l_es->kqueue_base_fflags = 0; + l_es->kqueue_base_filter = EVFILT_USER; + l_es->socket = arc4random(); +#else +#error "Not defined s_create_type_queue_ptr for your platform" +#endif + + +#if defined(DAP_EVENTS_CAPS_QUEUE_PIPE2) || defined(DAP_EVENTS_CAPS_QUEUE_PIPE) + int l_pipe[2]; + l_errbuf[0]=0; +#if defined(DAP_EVENTS_CAPS_QUEUE_PIPE2) + if( pipe2(l_pipe,O_DIRECT | O_NONBLOCK ) < 0 ){ +#elif defined(DAP_EVENTS_CAPS_QUEUE_PIPE) + if( pipe(l_pipe) < 0 ){ +#endif + l_errno = errno; + strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); + switch (l_errno) { + case EINVAL: log_it(L_CRITICAL, "Too old linux version thats doesn't support O_DIRECT flag for pipes (%s)", l_errbuf); break; + default: log_it( L_ERROR, "Error detected, can't create pipe(): '%s' (%d)", l_errbuf, l_errno); + } + DAP_DELETE(l_es); + return NULL; + } + //else + // log_it(L_DEBUG, "Created one-way unnamed packet pipe %d->%d", l_pipe[0], l_pipe[1]); + l_es->fd = l_pipe[0]; + l_es->fd2 = l_pipe[1]; + +#if defined(DAP_EVENTS_CAPS_QUEUE_PIPE) + // If we have no pipe2() we should set nonblock mode via fcntl + if (l_es->fd > 0 && l_es->fd2 > 0 ) { + int l_flags = fcntl(l_es->fd, F_GETFL, 0); + if (l_flags != -1){ + l_flags |= O_NONBLOCK); + fcntl(l_es->fd, F_SETFL, l_flags) == 0); + } + l_flags = fcntl(l_es->fd2, F_GETFL, 0); + if (l_flags != -1){ + l_flags |= O_NONBLOCK); + fcntl(l_es->fd2, F_SETFL, l_flags) == 0); + } + } +#endif + +#if !defined (DAP_OS_ANDROID) + FILE* l_sys_max_pipe_size_fd = fopen("/proc/sys/fs/pipe-max-size", "r"); + if (l_sys_max_pipe_size_fd) { + const int l_file_buf_size = 64; + char l_file_buf[l_file_buf_size]; + memset(l_file_buf, 0, l_file_buf_size); + fread(l_file_buf, l_file_buf_size, 1, l_sys_max_pipe_size_fd); + uint64_t l_sys_max_pipe_size = strtoull(l_file_buf, 0, 10); + fcntl(l_pipe[0], F_SETPIPE_SZ, l_sys_max_pipe_size); + fclose(l_sys_max_pipe_size_fd); + } +#endif + +#elif defined (DAP_EVENTS_CAPS_QUEUE_MQUEUE) + int l_errno; + char l_errbuf[128] = {0}, l_mq_name[64] = {0}; + struct mq_attr l_mq_attr; + static atomic_uint l_mq_last_number = 0; + + + l_mq_attr.mq_maxmsg = DAP_QUEUE_MAX_MSGS; // Don't think we need to hold more than 1024 messages + l_mq_attr.mq_msgsize = sizeof (void*); // We send only pointer on memory (???!!!), + // so use it with shared memory if you do access from another process + + l_es->mqd_id = atomic_fetch_add( &l_mq_last_number, 1); + snprintf(l_mq_name,sizeof (l_mq_name), "/%s-queue_ptr-%u", dap_get_appname(), l_es->mqd_id ); + // if ( (l_errno = mq_unlink(l_mq_name)) ) /* Mark this MQ to be deleted as the process will be terminated */ + // log_it(L_DEBUG, "mq_unlink(%s)->%d", l_mq_name, l_errno); + + if ( 0 >= (l_es->mqd = mq_open(l_mq_name, O_CREAT|O_RDWR |O_NONBLOCK, 0700, &l_mq_attr)) ) + { + log_it(L_CRITICAL,"Can't create mqueue descriptor %s: \"%s\" code %d (%s)", l_mq_name, l_errbuf, errno, + (strerror_r(errno, l_errbuf, sizeof (l_errbuf)), l_errbuf) ); + + DAP_DELETE(l_es->buf_in); + DAP_DELETE(l_es); + return NULL; + } + +#elif defined DAP_EVENTS_CAPS_MSMQ + l_es->socket = socket(AF_INET, SOCK_DGRAM, 0); + + if (l_es->socket == INVALID_SOCKET) { + log_it(L_ERROR, "Error creating socket for TYPE_QUEUE: %d", WSAGetLastError()); + DAP_DELETE(l_es); + return NULL; + } + + int buffsize = 1024; + setsockopt(l_es->socket, SOL_SOCKET, SO_RCVBUF, (char *)&buffsize, sizeof(int)); + + int reuse = 1; + if (setsockopt(l_es->socket, SOL_SOCKET, SO_REUSEADDR, (const char*)&reuse, sizeof(reuse)) < 0) + log_it(L_WARNING, "Can't set up REUSEADDR flag to the socket, err: %d", WSAGetLastError()); + + unsigned long l_mode = 1; + ioctlsocket(l_es->socket, FIONBIO, &l_mode); + + int l_addr_len; + struct sockaddr_in l_addr; + l_addr.sin_family = AF_INET; + IN_ADDR _in_addr = { { .S_addr = htonl(INADDR_LOOPBACK) } }; + l_addr.sin_addr = _in_addr; + l_addr.sin_port = 0; //l_es->socket + 32768; + l_addr_len = sizeof(struct sockaddr_in); + + if (bind(l_es->socket, (struct sockaddr*)&l_addr, sizeof(l_addr)) < 0) { + log_it(L_ERROR, "Bind error: %d", WSAGetLastError()); + } else { + int dummy = 100; + getsockname(l_es->socket, (struct sockaddr*)&l_addr, &dummy); + l_es->port = l_addr.sin_port; + //log_it(L_DEBUG, "Bound to port %d", l_addr.sin_port); + } + + MQQUEUEPROPS l_qps; + MQPROPVARIANT l_qp_var[1]; + QUEUEPROPID l_qp_id[1]; + HRESULT l_q_status[1]; + + WCHAR l_pathname[MQ_MAX_Q_NAME_LEN - 10] = { 0 }; + static atomic_uint s_queue_num = 0; + int pos = 0; +#ifdef DAP_BRAND + pos = _snwprintf_s(l_pathname, sizeof(l_pathname)/sizeof(l_pathname[0]), _TRUNCATE, L".\\PRIVATE$\\" DAP_BRAND "mq%d", l_es->mq_num = s_queue_num++); +#else + pos = _snwprintf_s(l_pathname, sizeof(l_pathname)/sizeof(l_pathname[0]), _TRUNCATE, L".\\PRIVATE$\\%hs_esmq%d", dap_get_appname(), l_es->mq_num = s_queue_num++); +#endif + if (pos < 0) { + log_it(L_ERROR, "Message queue path error"); + DAP_DELETE(l_es); + return NULL; + } + u_long l_p_id = 0; + l_qp_id[l_p_id] = PROPID_Q_PATHNAME; + l_qp_var[l_p_id].vt = VT_LPWSTR; + l_qp_var[l_p_id].pwszVal = l_pathname; + l_p_id++; + + l_qps.cProp = l_p_id; + l_qps.aPropID = l_qp_id; + l_qps.aPropVar = l_qp_var; + l_qps.aStatus = l_q_status; + + WCHAR l_direct_name[MQ_MAX_Q_NAME_LEN] = { 0 }; + WCHAR l_format_name[sizeof(l_direct_name) - 10] = { 0 }; + DWORD l_buflen = sizeof(l_format_name); + HRESULT hr = MQCreateQueue(NULL, &l_qps, l_format_name, &l_buflen); + if ((hr != MQ_OK) && (hr != MQ_ERROR_QUEUE_EXISTS) && (hr != MQ_INFORMATION_PROPERTY)) { + log_it(L_ERROR, "Can't create message queue for queue type, error: %ld", hr); + DAP_DELETE(l_es); + return NULL; + } + _snwprintf_s(l_direct_name, sizeof(l_direct_name)/sizeof(l_direct_name[0]), _TRUNCATE, L"DIRECT=OS:%ls", l_pathname); + + hr = MQOpenQueue(l_direct_name, MQ_SEND_ACCESS, MQ_DENY_NONE, &(l_es->mqh)); + if (hr == MQ_ERROR_QUEUE_NOT_FOUND) { + log_it(L_INFO, "Queue still not created, wait a bit..."); + Sleep(300); + hr = MQOpenQueue(l_direct_name, MQ_SEND_ACCESS, MQ_DENY_NONE, &(l_es->mqh)); + if (hr != MQ_OK) { + log_it(L_ERROR, "Can't open message queue for queue type, error: %ld", hr); + DAP_DELETE(l_es); + MQDeleteQueue(l_format_name); + return NULL; + } + } + hr = MQOpenQueue(l_direct_name, MQ_RECEIVE_ACCESS, MQ_DENY_NONE, &(l_es->mqh_recv)); + if (hr != MQ_OK) { + log_it(L_ERROR, "Can't open message queue for queue type, error: %ld", hr); + DAP_DELETE(l_es); + MQCloseQueue(l_es->mqh); + MQDeleteQueue(l_format_name); + return NULL; + } + hr = MQPurgeQueue(l_es->mqh_recv); + if (hr != MQ_OK) { + log_it(L_DEBUG, "Message queue %u NOT purged, possible data corruption, err %ld", l_es->mq_num, hr); + } +#elif defined (DAP_EVENTS_CAPS_KQUEUE) + // We don't create descriptor for kqueue at all +#else +#error "Not implemented s_create_type_queue_ptr() on your platform" +#endif + + if ( a_context) { + if(dap_context_add_esocket(a_context, l_es)) { +#ifdef DAP_OS_WINDOWS + errno = WSAGetLastError(); +#endif + log_it(L_ERROR, "Can't add esocket %"DAP_FORMAT_SOCKET" to polling, err %d", l_es->socket, errno); + } + } + + return l_es; +} + +/** + * @brief s_create_type_event + * @param a_context + * @param a_callback + * @return + */ +dap_events_socket_t * dap_context_create_esocket_event(dap_context_t * a_context, dap_events_socket_callback_event_t a_callback) +{ + dap_events_socket_t * l_es = DAP_NEW_Z(dap_events_socket_t); if (!l_es) return NULL; + l_es->buf_out_size_max = l_es->buf_in_size_max = 1; + l_es->buf_out = DAP_NEW_Z_SIZE(byte_t, l_es->buf_out_size_max); + l_es->type = DESCRIPTOR_TYPE_EVENT; + l_es->uuid = dap_uuid_generate_uint64(); + + l_es->callbacks.event_callback = a_callback; // Arm event callback +#if defined(DAP_EVENTS_CAPS_EPOLL) + l_es->ev_base_flags = EPOLLIN | EPOLLERR | EPOLLRDHUP | EPOLLHUP; +#elif defined(DAP_EVENTS_CAPS_POLL) + l_es->poll_base_flags = POLLIN | POLLERR | POLLRDHUP | POLLHUP; +#elif defined(DAP_EVENTS_CAPS_KQUEUE) + l_es->kqueue_base_flags = EV_CLEAR; + l_es->kqueue_base_filter = EVFILT_USER; + l_es->socket = arc4random(); + l_es->kqueue_event_catched_data.esocket = l_es; +#else +#error "Not defined s_create_type_event for your platform" +#endif + +#ifdef DAP_EVENTS_CAPS_EVENT_EVENTFD + if((l_es->fd = eventfd(0,EFD_NONBLOCK) ) < 0 ){ + int l_errno = errno; + char l_errbuf[128]; + l_errbuf[0]=0; + strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); + switch (l_errno) { + case EINVAL: log_it(L_CRITICAL, "An unsupported value was specified in flags: \"%s\" (%d)", l_errbuf, l_errno); break; + case EMFILE: log_it(L_CRITICAL, "The per-process limit on the number of open file descriptors has been reached: \"%s\" (%d)", l_errbuf, l_errno); break; + case ENFILE: log_it(L_CRITICAL, "The system-wide limit on the total number of open files has been reached: \"%s\" (%d)", l_errbuf, l_errno); break; + case ENODEV: log_it(L_CRITICAL, "Could not mount (internal) anonymous inode device: \"%s\" (%d)", l_errbuf, l_errno); break; + case ENOMEM: log_it(L_CRITICAL, "There was insufficient memory to create a new eventfd file descriptor: \"%s\" (%d)", l_errbuf, l_errno); break; + default: log_it( L_ERROR, "Error detected, can't create eventfd: '%s' (%d)", l_errbuf, l_errno); + } + DAP_DELETE(l_es); + return NULL; + }else { + l_es->fd2 = l_es->fd; + //log_it(L_DEBUG, "Created eventfd descriptor %d", l_es->fd ); + } +#elif defined DAP_OS_WINDOWS + + + l_es->socket = socket(AF_INET, SOCK_DGRAM, 0); + + if (l_es->socket == INVALID_SOCKET) { + log_it(L_ERROR, "Error creating socket for TYPE_QUEUE: %d", WSAGetLastError()); + DAP_DELETE(l_es); + return NULL; + } + + int buffsize = 1024; + setsockopt(l_es->socket, SOL_SOCKET, SO_RCVBUF, (char *)&buffsize, sizeof(int)); + + unsigned long l_mode = 1; + ioctlsocket(l_es->socket, FIONBIO, &l_mode); + + int reuse = 1; + if (setsockopt(l_es->socket, SOL_SOCKET, SO_REUSEADDR, (const char*)&reuse, sizeof(reuse)) < 0) + log_it(L_WARNING, "Can't set up REUSEADDR flag to the socket, err: %d", WSAGetLastError()); + + int l_addr_len; + struct sockaddr_in l_addr; + l_addr.sin_family = AF_INET; + IN_ADDR _in_addr = { { .S_addr = htonl(INADDR_LOOPBACK) } }; + l_addr.sin_addr = _in_addr; + l_addr.sin_port = 0; //l_es->socket + 32768; + l_addr_len = sizeof(struct sockaddr_in); + + if (bind(l_es->socket, (struct sockaddr*)&l_addr, sizeof(l_addr)) < 0) { + log_it(L_ERROR, "Bind error: %d", WSAGetLastError()); + } else { + int dummy = 100; + getsockname(l_es->socket, (struct sockaddr*)&l_addr, &dummy); + l_es->port = l_addr.sin_port; + //log_it(L_DEBUG, "Bound to port %d", l_addr.sin_port); + } +#elif defined(DAP_EVENTS_CAPS_KQUEUE) + // nothing to do +#else +#error "Not defined dap_context_create_esocket_event() on your platform" +#endif + if(a_context) + dap_context_add_esocket(a_context,l_es); + return l_es; +} + + +/** + * @brief dap_context_create_esocket_pipe + * @param a_context + * @param a_callback + * @param a_flags + * @return + */ +dap_events_socket_t * dap_context_create_esocket_pipe(dap_context_t * a_context, dap_events_socket_callback_t a_callback, uint32_t a_flags) +{ +#ifdef DAP_OS_WINDOWS + UNUSED(a_w); + UNUSED(a_callback); + UNUSED(a_flags); + return NULL; +#else + UNUSED(a_flags); + dap_events_socket_t * l_es = DAP_NEW_Z(dap_events_socket_t); + l_es->type = DESCRIPTOR_TYPE_PIPE; + l_es->uuid = dap_uuid_generate_uint64(); + l_es->callbacks.read_callback = a_callback; // Arm event callback +#if defined(DAP_EVENTS_CAPS_EPOLL) + l_es->ev_base_flags = EPOLLIN | EPOLLERR | EPOLLRDHUP | EPOLLHUP; +#elif defined(DAP_EVENTS_CAPS_POLL) + l_es->poll_base_flags = POLLIN | POLLERR | POLLRDHUP | POLLHUP; +#elif defined(DAP_EVENTS_CAPS_KQUEUE) + l_es->kqueue_event_catched_data.esocket = l_es; + l_es->kqueue_base_flags = EV_ENABLE | EV_CLEAR; + l_es->kqueue_base_fflags = NOTE_DELETE | NOTE_REVOKE ; +#if !defined(DAP_OS_DARWIN) + l_es->kqueue_base_fflags |= NOTE_CLOSE | NOTE_CLOSE_WRITE ; +#endif + l_es->kqueue_base_filter = EVFILT_VNODE; +#else +#error "Not defined s_create_type_pipe for your platform" +#endif + +#if defined(DAP_EVENTS_CAPS_PIPE_POSIX) + int l_pipe[2]; + int l_errno; + char l_errbuf[128]; + l_errbuf[0]=0; + if( pipe(l_pipe) < 0 ){ + l_errno = errno; + strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); + log_it( L_ERROR, "Error detected, can't create pipe(): '%s' (%d)", l_errbuf, l_errno); + DAP_DELETE(l_es); + return NULL; + }//else + // log_it(L_DEBUG, "Created one-way unnamed bytestream pipe %d->%d", l_pipe[0], l_pipe[1]); + l_es->fd = l_pipe[0]; + l_es->fd2 = l_pipe[1]; +#if defined DAP_OS_UNIX + fcntl( l_pipe[0], F_SETFL, O_NONBLOCK); + fcntl( l_pipe[1], F_SETFL, O_NONBLOCK); + // this sort of fd doesn't suit ioctlsocket()... +#endif + +#else +#error "No defined s_create_type_pipe() for your platform" +#endif + dap_context_add_esocket(a_context,l_es); + return l_es; +#endif +} diff --git a/dap-sdk/io/dap_events.c b/dap-sdk/io/dap_events.c index 25a58a0679..64f75ff883 100644 --- a/dap-sdk/io/dap_events.c +++ b/dap-sdk/io/dap_events.c @@ -341,6 +341,7 @@ int dap_events_start( dap_events_t *a_events ) l_worker->id = i; l_worker->events = a_events; l_worker->context = dap_context_new(); + l_worker->context->worker = l_worker; pthread_mutex_init(& l_worker->started_mutex, NULL); pthread_cond_init( & l_worker->started_cond, NULL); diff --git a/dap-sdk/io/dap_events_socket.c b/dap-sdk/io/dap_events_socket.c index 5a0b5205f1..7ea684c728 100644 --- a/dap-sdk/io/dap_events_socket.c +++ b/dap-sdk/io/dap_events_socket.c @@ -85,6 +85,7 @@ typedef cpuset_t cpu_set_t; // Adopt BSD CPU setstructure to POSIX variant #include "dap_events.h" #include "dap_timerfd.h" +#include "dap_context.h" #include "dap_events_socket.h" #define LOG_TAG "dap_events_socket" @@ -192,7 +193,6 @@ dap_events_socket_t *dap_events_socket_wrap_no_add( dap_events_t *a_events, return NULL; l_ret->socket = a_sock; - l_ret->events = a_events; l_ret->uuid = dap_uuid_generate_uint64(); if (a_callbacks) memcpy(&l_ret->callbacks, a_callbacks, sizeof(l_ret->callbacks) ); @@ -254,7 +254,7 @@ void dap_events_socket_assign_on_worker_inter(dap_events_socket_t * a_es_input, */ void dap_events_socket_reassign_between_workers_unsafe(dap_events_socket_t * a_es, dap_worker_t * a_worker_new) { - dap_worker_t * l_worker = a_es->worker; + dap_worker_t * l_worker = a_es->context->worker; dap_events_socket_t * l_queue_input= l_worker->queue_es_new_input[a_worker_new->id]; log_it(L_DEBUG, "Reassign between %u->%u workers: %p (%d) ", l_worker->id, a_worker_new->id, a_es, a_es->fd ); @@ -288,71 +288,7 @@ void dap_events_socket_reassign_between_workers_mt(dap_worker_t * a_worker_old, } } -/** - * @brief s_create_type_pipe - * @param a_w - * @param a_callback - * @param a_flags - * @return - */ -dap_events_socket_t * s_create_type_pipe(dap_worker_t * a_w, dap_events_socket_callback_t a_callback, uint32_t a_flags) -{ -#ifdef DAP_OS_WINDOWS - UNUSED(a_w); - UNUSED(a_callback); - UNUSED(a_flags); - return NULL; -#else - UNUSED(a_flags); - dap_events_socket_t * l_es = DAP_NEW_Z(dap_events_socket_t); - l_es->type = DESCRIPTOR_TYPE_PIPE; - l_es->worker = a_w; - l_es->events = a_w->events; - l_es->uuid = dap_uuid_generate_uint64(); - l_es->callbacks.read_callback = a_callback; // Arm event callback -#if defined(DAP_EVENTS_CAPS_EPOLL) - l_es->ev_base_flags = EPOLLIN | EPOLLERR | EPOLLRDHUP | EPOLLHUP; -#elif defined(DAP_EVENTS_CAPS_POLL) - l_es->poll_base_flags = POLLIN | POLLERR | POLLRDHUP | POLLHUP; -#elif defined(DAP_EVENTS_CAPS_KQUEUE) - l_es->kqueue_event_catched_data.esocket = l_es; - l_es->kqueue_base_flags = EV_ENABLE | EV_CLEAR; - l_es->kqueue_base_fflags = NOTE_DELETE | NOTE_REVOKE ; -#if !defined(DAP_OS_DARWIN) - l_es->kqueue_base_fflags |= NOTE_CLOSE | NOTE_CLOSE_WRITE ; -#endif - l_es->kqueue_base_filter = EVFILT_VNODE; -#else -#error "Not defined s_create_type_pipe for your platform" -#endif - -#if defined(DAP_EVENTS_CAPS_PIPE_POSIX) - int l_pipe[2]; - int l_errno; - char l_errbuf[128]; - l_errbuf[0]=0; - if( pipe(l_pipe) < 0 ){ - l_errno = errno; - strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); - log_it( L_ERROR, "Error detected, can't create pipe(): '%s' (%d)", l_errbuf, l_errno); - DAP_DELETE(l_es); - return NULL; - }//else - // log_it(L_DEBUG, "Created one-way unnamed bytestream pipe %d->%d", l_pipe[0], l_pipe[1]); - l_es->fd = l_pipe[0]; - l_es->fd2 = l_pipe[1]; -#if defined DAP_OS_UNIX - fcntl( l_pipe[0], F_SETFL, O_NONBLOCK); - fcntl( l_pipe[1], F_SETFL, O_NONBLOCK); - // this sort of fd doesn't suit ioctlsocket()... -#endif -#else -#error "No defined s_create_type_pipe() for your platform" -#endif - return l_es; -#endif -} /** * @brief dap_events_socket_create_type_pipe_mt @@ -363,7 +299,7 @@ dap_events_socket_t * s_create_type_pipe(dap_worker_t * a_w, dap_events_socket_c */ dap_events_socket_t * dap_events_socket_create_type_pipe_mt(dap_worker_t * a_w, dap_events_socket_callback_t a_callback, uint32_t a_flags) { - dap_events_socket_t * l_es = s_create_type_pipe(a_w, a_callback, a_flags); + dap_events_socket_t * l_es = dap_context_create_esocket_pipe(NULL, a_callback, a_flags); dap_worker_add_events_socket_unsafe(l_es,a_w); return l_es; } @@ -433,7 +369,7 @@ dap_events_socket_t * dap_events_socket_create(dap_events_desc_type_t a_type, da */ dap_events_socket_t * dap_events_socket_create_type_pipe_unsafe(dap_worker_t * a_w, dap_events_socket_callback_t a_callback, uint32_t a_flags) { - dap_events_socket_t * l_es = s_create_type_pipe(a_w, a_callback, a_flags); + dap_events_socket_t * l_es = dap_context_create_esocket_pipe(NULL, a_callback, a_flags); dap_worker_add_events_socket_unsafe(l_es,a_w); return l_es; } @@ -470,7 +406,6 @@ dap_events_socket_t * dap_events_socket_queue_ptr_create_input(dap_events_socket l_es->buf_in_size_max = DAP_QUEUE_MAX_MSGS * sizeof(void*); l_es->buf_in = DAP_NEW_Z_SIZE(byte_t,l_es->buf_in_size_max ); //l_es->buf_out_size = 8 * sizeof(void*); - l_es->events = a_es->events; l_es->uuid = dap_uuid_generate_uint64(); #if defined(DAP_EVENTS_CAPS_EPOLL) l_es->ev_base_flags = EPOLLERR | EPOLLRDHUP | EPOLLHUP; @@ -566,232 +501,6 @@ dap_events_socket_t * dap_events_socket_queue_ptr_create_input(dap_events_socket return l_es; } -/** - * @brief s_create_type_queue - * @param a_w - * @param a_flags - * @return - */ -dap_events_socket_t * s_create_type_queue_ptr(dap_worker_t * a_w, dap_events_socket_callback_queue_ptr_t a_callback) -{ - dap_events_socket_t * l_es = DAP_NEW_Z(dap_events_socket_t); - if(!l_es){ - log_it(L_ERROR,"Can't allocate esocket!"); - return NULL; - } - - l_es->type = DESCRIPTOR_TYPE_QUEUE; - l_es->flags = DAP_SOCK_QUEUE_PTR; - l_es->uuid = dap_uuid_generate_uint64(); - if (a_w){ - l_es->events = a_w->events; - l_es->worker = a_w; - } - - l_es->callbacks.queue_ptr_callback = a_callback; // Arm event callback - l_es->buf_in_size_max = DAP_QUEUE_MAX_MSGS * sizeof(void*); - l_es->buf_in = DAP_NEW_Z_SIZE(byte_t,l_es->buf_in_size_max); - l_es->buf_out = NULL; - -#if defined(DAP_EVENTS_CAPS_EPOLL) - l_es->ev_base_flags = EPOLLIN | EPOLLERR | EPOLLRDHUP | EPOLLHUP; -#elif defined(DAP_EVENTS_CAPS_POLL) - l_es->poll_base_flags = POLLIN | POLLERR | POLLRDHUP | POLLHUP; -#elif defined(DAP_EVENTS_CAPS_KQUEUE) - l_es->kqueue_event_catched_data.esocket = l_es; - l_es->kqueue_base_flags = EV_CLEAR; - l_es->kqueue_base_fflags = 0; - l_es->kqueue_base_filter = EVFILT_USER; - l_es->socket = arc4random(); -#else -#error "Not defined s_create_type_queue_ptr for your platform" -#endif - - -#if defined(DAP_EVENTS_CAPS_QUEUE_PIPE2) || defined(DAP_EVENTS_CAPS_QUEUE_PIPE) - int l_pipe[2]; - l_errbuf[0]=0; -#if defined(DAP_EVENTS_CAPS_QUEUE_PIPE2) - if( pipe2(l_pipe,O_DIRECT | O_NONBLOCK ) < 0 ){ -#elif defined(DAP_EVENTS_CAPS_QUEUE_PIPE) - if( pipe(l_pipe) < 0 ){ -#endif - l_errno = errno; - strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); - switch (l_errno) { - case EINVAL: log_it(L_CRITICAL, "Too old linux version thats doesn't support O_DIRECT flag for pipes (%s)", l_errbuf); break; - default: log_it( L_ERROR, "Error detected, can't create pipe(): '%s' (%d)", l_errbuf, l_errno); - } - DAP_DELETE(l_es); - return NULL; - } - //else - // log_it(L_DEBUG, "Created one-way unnamed packet pipe %d->%d", l_pipe[0], l_pipe[1]); - l_es->fd = l_pipe[0]; - l_es->fd2 = l_pipe[1]; - -#if defined(DAP_EVENTS_CAPS_QUEUE_PIPE) - // If we have no pipe2() we should set nonblock mode via fcntl - if (l_es->fd > 0 && l_es->fd2 > 0 ) { - int l_flags = fcntl(l_es->fd, F_GETFL, 0); - if (l_flags != -1){ - l_flags |= O_NONBLOCK); - fcntl(l_es->fd, F_SETFL, l_flags) == 0); - } - l_flags = fcntl(l_es->fd2, F_GETFL, 0); - if (l_flags != -1){ - l_flags |= O_NONBLOCK); - fcntl(l_es->fd2, F_SETFL, l_flags) == 0); - } - } -#endif - -#if !defined (DAP_OS_ANDROID) - FILE* l_sys_max_pipe_size_fd = fopen("/proc/sys/fs/pipe-max-size", "r"); - if (l_sys_max_pipe_size_fd) { - const int l_file_buf_size = 64; - char l_file_buf[l_file_buf_size]; - memset(l_file_buf, 0, l_file_buf_size); - fread(l_file_buf, l_file_buf_size, 1, l_sys_max_pipe_size_fd); - uint64_t l_sys_max_pipe_size = strtoull(l_file_buf, 0, 10); - fcntl(l_pipe[0], F_SETPIPE_SZ, l_sys_max_pipe_size); - fclose(l_sys_max_pipe_size_fd); - } -#endif - -#elif defined (DAP_EVENTS_CAPS_QUEUE_MQUEUE) - int l_errno; - char l_errbuf[128] = {0}, l_mq_name[64] = {0}; - struct mq_attr l_mq_attr; - static atomic_uint l_mq_last_number = 0; - - - l_mq_attr.mq_maxmsg = DAP_QUEUE_MAX_MSGS; // Don't think we need to hold more than 1024 messages - l_mq_attr.mq_msgsize = sizeof (void*); // We send only pointer on memory (???!!!), - // so use it with shared memory if you do access from another process - - l_es->mqd_id = atomic_fetch_add( &l_mq_last_number, 1); - snprintf(l_mq_name,sizeof (l_mq_name), "/%s-queue_ptr-%u", dap_get_appname(), l_es->mqd_id ); - // if ( (l_errno = mq_unlink(l_mq_name)) ) /* Mark this MQ to be deleted as the process will be terminated */ - // log_it(L_DEBUG, "mq_unlink(%s)->%d", l_mq_name, l_errno); - - if ( 0 >= (l_es->mqd = mq_open(l_mq_name, O_CREAT|O_RDWR |O_NONBLOCK, 0700, &l_mq_attr)) ) - { - log_it(L_CRITICAL,"Can't create mqueue descriptor %s: \"%s\" code %d (%s)", l_mq_name, l_errbuf, errno, - (strerror_r(errno, l_errbuf, sizeof (l_errbuf)), l_errbuf) ); - - DAP_DELETE(l_es->buf_in); - DAP_DELETE(l_es); - return NULL; - } - -#elif defined DAP_EVENTS_CAPS_MSMQ - l_es->socket = socket(AF_INET, SOCK_DGRAM, 0); - - if (l_es->socket == INVALID_SOCKET) { - log_it(L_ERROR, "Error creating socket for TYPE_QUEUE: %d", WSAGetLastError()); - DAP_DELETE(l_es); - return NULL; - } - - int buffsize = 1024; - setsockopt(l_es->socket, SOL_SOCKET, SO_RCVBUF, (char *)&buffsize, sizeof(int)); - - int reuse = 1; - if (setsockopt(l_es->socket, SOL_SOCKET, SO_REUSEADDR, (const char*)&reuse, sizeof(reuse)) < 0) - log_it(L_WARNING, "Can't set up REUSEADDR flag to the socket, err: %d", WSAGetLastError()); - - unsigned long l_mode = 1; - ioctlsocket(l_es->socket, FIONBIO, &l_mode); - - int l_addr_len; - struct sockaddr_in l_addr; - l_addr.sin_family = AF_INET; - IN_ADDR _in_addr = { { .S_addr = htonl(INADDR_LOOPBACK) } }; - l_addr.sin_addr = _in_addr; - l_addr.sin_port = 0; //l_es->socket + 32768; - l_addr_len = sizeof(struct sockaddr_in); - - if (bind(l_es->socket, (struct sockaddr*)&l_addr, sizeof(l_addr)) < 0) { - log_it(L_ERROR, "Bind error: %d", WSAGetLastError()); - } else { - int dummy = 100; - getsockname(l_es->socket, (struct sockaddr*)&l_addr, &dummy); - l_es->port = l_addr.sin_port; - //log_it(L_DEBUG, "Bound to port %d", l_addr.sin_port); - } - - MQQUEUEPROPS l_qps; - MQPROPVARIANT l_qp_var[1]; - QUEUEPROPID l_qp_id[1]; - HRESULT l_q_status[1]; - - WCHAR l_pathname[MQ_MAX_Q_NAME_LEN - 10] = { 0 }; - static atomic_uint s_queue_num = 0; - int pos = 0; -#ifdef DAP_BRAND - pos = _snwprintf_s(l_pathname, sizeof(l_pathname)/sizeof(l_pathname[0]), _TRUNCATE, L".\\PRIVATE$\\" DAP_BRAND "mq%d", l_es->mq_num = s_queue_num++); -#else - pos = _snwprintf_s(l_pathname, sizeof(l_pathname)/sizeof(l_pathname[0]), _TRUNCATE, L".\\PRIVATE$\\%hs_esmq%d", dap_get_appname(), l_es->mq_num = s_queue_num++); -#endif - if (pos < 0) { - log_it(L_ERROR, "Message queue path error"); - DAP_DELETE(l_es); - return NULL; - } - u_long l_p_id = 0; - l_qp_id[l_p_id] = PROPID_Q_PATHNAME; - l_qp_var[l_p_id].vt = VT_LPWSTR; - l_qp_var[l_p_id].pwszVal = l_pathname; - l_p_id++; - - l_qps.cProp = l_p_id; - l_qps.aPropID = l_qp_id; - l_qps.aPropVar = l_qp_var; - l_qps.aStatus = l_q_status; - - WCHAR l_direct_name[MQ_MAX_Q_NAME_LEN] = { 0 }; - WCHAR l_format_name[sizeof(l_direct_name) - 10] = { 0 }; - DWORD l_buflen = sizeof(l_format_name); - HRESULT hr = MQCreateQueue(NULL, &l_qps, l_format_name, &l_buflen); - if ((hr != MQ_OK) && (hr != MQ_ERROR_QUEUE_EXISTS) && (hr != MQ_INFORMATION_PROPERTY)) { - log_it(L_ERROR, "Can't create message queue for queue type, error: %ld", hr); - DAP_DELETE(l_es); - return NULL; - } - _snwprintf_s(l_direct_name, sizeof(l_direct_name)/sizeof(l_direct_name[0]), _TRUNCATE, L"DIRECT=OS:%ls", l_pathname); - - hr = MQOpenQueue(l_direct_name, MQ_SEND_ACCESS, MQ_DENY_NONE, &(l_es->mqh)); - if (hr == MQ_ERROR_QUEUE_NOT_FOUND) { - log_it(L_INFO, "Queue still not created, wait a bit..."); - Sleep(300); - hr = MQOpenQueue(l_direct_name, MQ_SEND_ACCESS, MQ_DENY_NONE, &(l_es->mqh)); - if (hr != MQ_OK) { - log_it(L_ERROR, "Can't open message queue for queue type, error: %ld", hr); - DAP_DELETE(l_es); - MQDeleteQueue(l_format_name); - return NULL; - } - } - hr = MQOpenQueue(l_direct_name, MQ_RECEIVE_ACCESS, MQ_DENY_NONE, &(l_es->mqh_recv)); - if (hr != MQ_OK) { - log_it(L_ERROR, "Can't open message queue for queue type, error: %ld", hr); - DAP_DELETE(l_es); - MQCloseQueue(l_es->mqh); - MQDeleteQueue(l_format_name); - return NULL; - } - hr = MQPurgeQueue(l_es->mqh_recv); - if (hr != MQ_OK) { - log_it(L_DEBUG, "Message queue %u NOT purged, possible data corruption, err %ld", l_es->mq_num, hr); - } -#elif defined (DAP_EVENTS_CAPS_KQUEUE) - // We don't create descriptor for kqueue at all -#else -#error "Not implemented s_create_type_queue_ptr() on your platform" -#endif - return l_es; -} /** * @brief dap_events_socket_create_type_queue_mt @@ -802,7 +511,7 @@ dap_events_socket_t * s_create_type_queue_ptr(dap_worker_t * a_w, dap_events_soc */ dap_events_socket_t * dap_events_socket_create_type_queue_ptr_mt(dap_worker_t * a_w, dap_events_socket_callback_queue_ptr_t a_callback) { - dap_events_socket_t * l_es = s_create_type_queue_ptr(a_w, a_callback); + dap_events_socket_t * l_es = dap_context_create_esocket_queue(NULL, a_callback); assert(l_es); // If no worker - don't assign if ( a_w) @@ -811,28 +520,6 @@ dap_events_socket_t * dap_events_socket_create_type_queue_ptr_mt(dap_worker_t * } -/** - * @brief dap_events_socket_create_type_queue - * @param a_w - * @param a_callback - * @return - */ -dap_events_socket_t * dap_events_socket_create_type_queue_ptr_unsafe(dap_worker_t * a_w, dap_events_socket_callback_queue_ptr_t a_callback) -{ - dap_events_socket_t * l_es = s_create_type_queue_ptr(a_w, a_callback); - assert(l_es); - // If no worker - don't assign - if ( a_w) { - if(dap_worker_add_events_socket_unsafe(l_es,a_w)) { -#ifdef DAP_OS_WINDOWS - errno = WSAGetLastError(); -#endif - log_it(L_ERROR, "Can't add esocket %"DAP_FORMAT_SOCKET" to polling, err %d", l_es->socket, errno); - } - } - return l_es; -} - /** * @brief dap_events_socket_queue_proc_input * @param a_esocket @@ -919,7 +606,7 @@ int dap_events_socket_queue_proc_input_unsafe(dap_events_socket_t * a_esocket) void * l_queue_ptr = a_esocket->kqueue_event_catched_data.data; size_t l_queue_ptr_size = a_esocket->kqueue_event_catched_data.size; if(g_debug_reactor) - log_it(L_INFO,"Queue received %z bytes on input", l_queue_ptr_size); + log_it(L_INFO,"Queue received %zd bytes on input", l_queue_ptr_size); a_esocket->callbacks.queue_callback(a_esocket, l_queue_ptr, l_queue_ptr_size); #else @@ -933,101 +620,6 @@ int dap_events_socket_queue_proc_input_unsafe(dap_events_socket_t * a_esocket) return 0; } -/** - * @brief s_create_type_event - * @param a_w - * @param a_callback - * @return - */ -dap_events_socket_t * s_create_type_event(dap_worker_t * a_w, dap_events_socket_callback_event_t a_callback) -{ - dap_events_socket_t * l_es = DAP_NEW_Z(dap_events_socket_t); if (!l_es) return NULL; - l_es->buf_out_size_max = l_es->buf_in_size_max = 1; - l_es->buf_out = DAP_NEW_Z_SIZE(byte_t, l_es->buf_out_size_max); - l_es->type = DESCRIPTOR_TYPE_EVENT; - l_es->uuid = dap_uuid_generate_uint64(); - if (a_w){ - l_es->events = a_w->events; - l_es->worker = a_w; - } - l_es->callbacks.event_callback = a_callback; // Arm event callback -#if defined(DAP_EVENTS_CAPS_EPOLL) - l_es->ev_base_flags = EPOLLIN | EPOLLERR | EPOLLRDHUP | EPOLLHUP; -#elif defined(DAP_EVENTS_CAPS_POLL) - l_es->poll_base_flags = POLLIN | POLLERR | POLLRDHUP | POLLHUP; -#elif defined(DAP_EVENTS_CAPS_KQUEUE) - l_es->kqueue_base_flags = EV_CLEAR; - l_es->kqueue_base_filter = EVFILT_USER; - l_es->socket = arc4random(); - l_es->kqueue_event_catched_data.esocket = l_es; -#else -#error "Not defined s_create_type_event for your platform" -#endif - -#ifdef DAP_EVENTS_CAPS_EVENT_EVENTFD - if((l_es->fd = eventfd(0,EFD_NONBLOCK) ) < 0 ){ - int l_errno = errno; - char l_errbuf[128]; - l_errbuf[0]=0; - strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); - switch (l_errno) { - case EINVAL: log_it(L_CRITICAL, "An unsupported value was specified in flags: \"%s\" (%d)", l_errbuf, l_errno); break; - case EMFILE: log_it(L_CRITICAL, "The per-process limit on the number of open file descriptors has been reached: \"%s\" (%d)", l_errbuf, l_errno); break; - case ENFILE: log_it(L_CRITICAL, "The system-wide limit on the total number of open files has been reached: \"%s\" (%d)", l_errbuf, l_errno); break; - case ENODEV: log_it(L_CRITICAL, "Could not mount (internal) anonymous inode device: \"%s\" (%d)", l_errbuf, l_errno); break; - case ENOMEM: log_it(L_CRITICAL, "There was insufficient memory to create a new eventfd file descriptor: \"%s\" (%d)", l_errbuf, l_errno); break; - default: log_it( L_ERROR, "Error detected, can't create eventfd: '%s' (%d)", l_errbuf, l_errno); - } - DAP_DELETE(l_es); - return NULL; - }else { - l_es->fd2 = l_es->fd; - //log_it(L_DEBUG, "Created eventfd descriptor %d", l_es->fd ); - } -#elif defined DAP_OS_WINDOWS - - - l_es->socket = socket(AF_INET, SOCK_DGRAM, 0); - - if (l_es->socket == INVALID_SOCKET) { - log_it(L_ERROR, "Error creating socket for TYPE_QUEUE: %d", WSAGetLastError()); - DAP_DELETE(l_es); - return NULL; - } - - int buffsize = 1024; - setsockopt(l_es->socket, SOL_SOCKET, SO_RCVBUF, (char *)&buffsize, sizeof(int)); - - unsigned long l_mode = 1; - ioctlsocket(l_es->socket, FIONBIO, &l_mode); - - int reuse = 1; - if (setsockopt(l_es->socket, SOL_SOCKET, SO_REUSEADDR, (const char*)&reuse, sizeof(reuse)) < 0) - log_it(L_WARNING, "Can't set up REUSEADDR flag to the socket, err: %d", WSAGetLastError()); - - int l_addr_len; - struct sockaddr_in l_addr; - l_addr.sin_family = AF_INET; - IN_ADDR _in_addr = { { .S_addr = htonl(INADDR_LOOPBACK) } }; - l_addr.sin_addr = _in_addr; - l_addr.sin_port = 0; //l_es->socket + 32768; - l_addr_len = sizeof(struct sockaddr_in); - - if (bind(l_es->socket, (struct sockaddr*)&l_addr, sizeof(l_addr)) < 0) { - log_it(L_ERROR, "Bind error: %d", WSAGetLastError()); - } else { - int dummy = 100; - getsockname(l_es->socket, (struct sockaddr*)&l_addr, &dummy); - l_es->port = l_addr.sin_port; - //log_it(L_DEBUG, "Bound to port %d", l_addr.sin_port); - } -#elif defined(DAP_EVENTS_CAPS_KQUEUE) - // nothing to do -#else -#error "Not defined s_create_type_event() on your platform" -#endif - return l_es; -} /** * @brief dap_events_socket_create_type_event_mt @@ -1037,7 +629,7 @@ dap_events_socket_t * s_create_type_event(dap_worker_t * a_w, dap_events_socket_ */ dap_events_socket_t * dap_events_socket_create_type_event_mt(dap_worker_t * a_w, dap_events_socket_callback_event_t a_callback) { - dap_events_socket_t * l_es = s_create_type_event(a_w, a_callback); + dap_events_socket_t * l_es = dap_context_create_esocket_event(NULL, a_callback); // If no worker - don't assign if ( a_w) dap_events_socket_assign_on_worker_mt(l_es,a_w); @@ -1052,7 +644,8 @@ dap_events_socket_t * dap_events_socket_create_type_event_mt(dap_worker_t * a_w, */ dap_events_socket_t * dap_events_socket_create_type_event_unsafe(dap_worker_t * a_w, dap_events_socket_callback_event_t a_callback) { - dap_events_socket_t * l_es = s_create_type_event(a_w, a_callback); + + dap_events_socket_t * l_es = dap_context_create_esocket_event(NULL, a_callback); // If no worker - don't assign if ( a_w) dap_worker_add_events_socket_unsafe(l_es,a_w); @@ -1455,12 +1048,12 @@ int dap_events_socket_event_signal( dap_events_socket_t * a_es, uint64_t a_value * @brief dap_events_socket_queue_on_remove_and_delete * @param a_es */ -void dap_events_socket_queue_on_remove_and_delete(dap_events_socket_t* a_es) +void dap_events_socket_delete_mt(dap_worker_t * a_worker, dap_events_socket_uuid_t a_es_uuid) { dap_events_socket_uuid_t * l_es_uuid_ptr= DAP_NEW_Z(dap_events_socket_uuid_t); - *l_es_uuid_ptr = a_es->uuid; + *l_es_uuid_ptr = a_es_uuid; - int l_ret= dap_events_socket_queue_ptr_send( a_es->worker->queue_es_delete, l_es_uuid_ptr ); + int l_ret= dap_events_socket_queue_ptr_send( a_worker->queue_es_delete, l_es_uuid_ptr ); if( l_ret != 0 ){ log_it(L_ERROR, "Queue send returned %d", l_ret); DAP_DELETE(l_es_uuid_ptr); @@ -1486,7 +1079,6 @@ dap_events_socket_t * dap_events_socket_wrap2( dap_server_t *a_server, struct da dap_events_socket_t * l_es = DAP_NEW_Z( dap_events_socket_t ); if (!l_es) return NULL; l_es->socket = a_sock; - l_es->events = a_events; l_es->server = a_server; l_es->uuid = dap_uuid_generate_uint64(); memcpy(&l_es->callbacks,a_callbacks, sizeof ( l_es->callbacks) ); @@ -1636,7 +1228,7 @@ void dap_events_socket_remove_and_delete_unsafe_delayed( dap_events_socket_t *a_ l_es_handler->value = a_preserve_inheritor ? 1 : 0; dap_events_socket_descriptor_close(a_es); - dap_worker_t * l_worker = a_es->worker; + dap_worker_t * l_worker = a_es->context->worker; dap_events_socket_remove_from_worker_unsafe( a_es, l_worker); a_es->flags |= DAP_SOCK_SIGNAL_CLOSE; dap_timerfd_start_on_worker(l_worker, s_delayed_ops_timeout_ms, @@ -1660,7 +1252,7 @@ void dap_events_socket_remove_and_delete_unsafe( dap_events_socket_t *a_es, bool #endif //log_it( L_DEBUG, "es is going to be removed from the lists and free the memory (0x%016X)", a_es ); - dap_events_socket_remove_from_worker_unsafe(a_es, a_es->worker); + dap_events_socket_remove_from_worker_unsafe(a_es, a_es->context->worker); if( a_es->callbacks.delete_callback ) a_es->callbacks.delete_callback( a_es, NULL ); // Init internal structure @@ -1719,7 +1311,7 @@ void dap_events_socket_delete_unsafe( dap_events_socket_t * a_esocket , bool a_p */ void dap_events_socket_remove_from_worker_unsafe( dap_events_socket_t *a_es, dap_worker_t * a_worker) { - if (!a_es->worker) { + if (!a_es->context->worker) { log_it(L_INFO, "No worker assigned to esocket %"DAP_FORMAT_SOCKET, a_es->socket); return; } @@ -1778,16 +1370,16 @@ void dap_events_socket_remove_from_worker_unsafe( dap_events_socket_t *a_es, dap } } #elif defined (DAP_EVENTS_CAPS_POLL) - if (a_es->poll_index < a_worker->poll_count ){ - a_worker->poll[a_es->poll_index].fd = -1; - a_worker->poll_compress = true; + if (a_es->poll_index < a_worker->context->poll_count ){ + a_worker->context->poll[a_es->poll_index].fd = -1; + a_worker->context->poll_compress = true; }else{ log_it(L_ERROR, "Wrong poll index when remove from worker (unsafe): %u when total count %u", a_es->poll_index, a_worker->poll_count); } #else #error "Unimplemented new esocket on worker callback for current platform" #endif - a_es->worker = NULL; + a_es->context = NULL; } /** diff --git a/dap-sdk/io/dap_proc_queue.c b/dap-sdk/io/dap_proc_queue.c index 6ffc4b6387..5ad0766c74 100644 --- a/dap-sdk/io/dap_proc_queue.c +++ b/dap-sdk/io/dap_proc_queue.c @@ -83,10 +83,9 @@ dap_proc_queue_t * dap_proc_queue_create(dap_proc_thread_t * a_thread) } l_queue->proc_thread = a_thread; - l_queue->esocket = dap_events_socket_create_type_queue_ptr_unsafe(NULL,s_queue_esocket_callback); + l_queue->esocket = dap_context_create_esocket_queue(a_thread->context,s_queue_esocket_callback); l_queue->esocket->proc_thread = a_thread; l_queue->esocket->_inheritor = l_queue; - return l_queue; } @@ -315,6 +314,6 @@ int dap_proc_thread_add_callback_mt(dap_proc_thread_t * a_thread, dap_proc_queue l_msg->callback_arg = a_callback_arg; l_msg->pri = a_pri; - dap_events_socket_queue_ptr_send(a_thread->proc_queue->esocket, l_msg); + return dap_events_socket_queue_ptr_send(a_thread->proc_queue->esocket, l_msg); } diff --git a/dap-sdk/io/dap_proc_thread.c b/dap-sdk/io/dap_proc_thread.c index d492f7dbe5..70d2e1653f 100644 --- a/dap-sdk/io/dap_proc_thread.c +++ b/dap-sdk/io/dap_proc_thread.c @@ -73,8 +73,6 @@ static void s_event_exit_callback( dap_events_socket_t * a_es, uint64_t a_flags) * @param a_cpu_count 0 means autodetect * @return */ -static pthread_cond_t s_started_cond = PTHREAD_COND_INITIALIZER; -static pthread_mutex_t s_started_mutex = PTHREAD_MUTEX_INITIALIZER; int dap_proc_thread_init(uint32_t a_threads_count) { @@ -85,17 +83,23 @@ int l_ret = 0; for (uint32_t i = 0; i < s_threads_count; i++ ) { - s_threads[i].cpu_id = i; - pthread_mutex_lock( &s_started_mutex ); + dap_proc_thread_t * l_thread = s_threads + i; + l_thread->cpu_id = i; + l_thread->context = dap_context_new(); + l_thread->context->proc_thread = l_thread; + pthread_cond_init(&l_thread->started_cond, NULL); + pthread_mutex_init( &l_thread->started_mutex,NULL); - if ( (l_ret = pthread_create( &s_threads[i].thread_id,NULL, s_proc_thread_function, &s_threads[i] )) ) { + pthread_mutex_lock( &l_thread->started_mutex ); + + if ( (l_ret = pthread_create( &l_thread->thread_id,NULL, s_proc_thread_function, &s_threads[i] )) ) { log_it(L_CRITICAL, "Create thread failed with code %d", l_ret); - pthread_mutex_unlock( &s_started_mutex ); + pthread_mutex_unlock( &l_thread->started_mutex ); return l_ret; } - pthread_cond_wait( &s_started_cond, &s_started_mutex); - pthread_mutex_unlock( &s_started_mutex); + pthread_cond_wait( &l_thread->started_cond, &l_thread->started_mutex); + pthread_mutex_unlock( &l_thread->started_mutex); } return l_ret; @@ -106,9 +110,9 @@ int l_ret = 0; */ void dap_proc_thread_deinit() { -int l_rc; -size_t l_sz; -dap_proc_thread_t *l_proc_thread; + int l_rc = 0; + size_t l_sz = 0; + dap_proc_thread_t *l_proc_thread = NULL; for (uint32_t i = s_threads_count; i--; ){ dap_events_socket_event_signal(s_threads[i].event_exit, 1); @@ -177,25 +181,25 @@ unsigned l_id_min = 0, l_size_min = UINT32_MAX, l_queue_size; */ dap_proc_thread_t * dap_proc_thread_run_custom(void) { -dap_proc_thread_t * l_proc_thread = DAP_NEW_Z(dap_proc_thread_t); -int l_ret; + dap_proc_thread_t * l_proc_thread = DAP_NEW_Z(dap_proc_thread_t); + int l_ret; if (l_proc_thread == NULL) return log_it(L_CRITICAL,"Out of memory, can't create new proc thread, errno=%d", errno), NULL; - pthread_mutex_lock( &s_started_mutex ); + pthread_mutex_lock( &l_proc_thread->started_mutex ); if ( (l_ret = pthread_create( &l_proc_thread->thread_id ,NULL, s_proc_thread_function, l_proc_thread )) ) { log_it(L_CRITICAL, "Create thread failed with code %d", l_ret); DAP_DEL_Z (l_proc_thread); }else{ - pthread_cond_wait( &s_started_cond, &s_started_mutex); + pthread_cond_wait( &l_proc_thread->started_cond, &l_proc_thread->started_mutex); pthread_rwlock_wrlock(&s_custom_threads_rwlock); assert ( !s_dap_slist_add2tail (&s_custom_threads, l_proc_thread, sizeof(l_proc_thread)) ); pthread_rwlock_unlock(&s_custom_threads_rwlock); } - pthread_mutex_unlock( &s_started_mutex ); + pthread_mutex_unlock( &l_proc_thread->started_mutex ); return l_proc_thread; } @@ -326,104 +330,10 @@ int dap_proc_thread_assign_esocket_unsafe(dap_proc_thread_t * a_thread, dap_even #error "Unimplemented new esocket on worker callback for current platform" #endif - return dap_proc_thread_esocket_update_poll_flags(a_thread,a_esocket); + return dap_context_poll_update(a_esocket); } -/** - * @brief dap_proc_thread_esocket_update_poll_flags - * @param a_thread - * @param a_esocket - * @return - */ -int dap_proc_thread_esocket_update_poll_flags(dap_proc_thread_t * a_thread, dap_events_socket_t * a_esocket) -{ -#ifdef DAP_EVENTS_CAPS_EPOLL - u_int events = a_esocket->ev_base_flags; - if( a_esocket->flags & DAP_SOCK_READY_TO_READ) { - events |= EPOLLIN; -#ifdef DAP_OS_WINDOWS - events ^= EPOLLONESHOT; -#endif - } - if( a_esocket->flags & DAP_SOCK_READY_TO_WRITE) { - events |= EPOLLOUT; -#ifdef DAP_OS_WINDOWS - events |= EPOLLONESHOT; -#endif - } - a_esocket->ev.events = events; - if( epoll_ctl(a_thread->epoll_ctl, EPOLL_CTL_MOD, a_esocket->socket, &a_esocket->ev) != 0 ){ -#ifdef DAP_OS_WINDOWS - errno = WSAGetLastError(); -#endif - log_it(L_CRITICAL, "Can't add proc queue on epoll ctl, err: %d", errno); - return -1; - } -#elif defined (DAP_EVENTS_CAPS_POLL) - if ( a_thread->poll_count == a_thread->poll_count_max ){ // realloc - a_thread->poll_count_max *= 2; - log_it(L_WARNING, "Too many descriptors (%zu), resizing array twice to %zu", a_thread->poll_count, a_thread->poll_count_max); - a_thread->poll =DAP_REALLOC(a_thread->poll, a_thread->poll_count_max * sizeof(*a_thread->poll)); - a_thread->esockets =DAP_REALLOC(a_thread->esockets, a_thread->poll_count_max * sizeof(*a_thread->esockets)); - } - a_thread->poll[a_esocket->poll_index].events= a_esocket->poll_base_flags; - if( a_esocket->flags & DAP_SOCK_READY_TO_READ) - a_thread->poll[a_esocket->poll_index].events |= POLLIN; - if( a_esocket->flags & DAP_SOCK_READY_TO_WRITE) - a_thread->poll[a_esocket->poll_index].events |= POLLOUT; - -#elif defined (DAP_EVENTS_CAPS_KQUEUE) - - u_short l_flags = a_esocket->kqueue_base_flags; - u_int l_fflags = a_esocket->kqueue_base_fflags; - short l_filter = a_esocket->kqueue_base_filter; - int l_kqueue_fd = a_esocket->proc_thread ? a_esocket->proc_thread->kqueue_fd : -1; - if ( l_kqueue_fd == -1 ){ - log_it(L_ERROR, "Esocket is not assigned with anything ,exit"); - } - struct kevent * l_event = &a_esocket->kqueue_event; - // Check & add - int l_is_error=false; - int l_errno=0; - if (a_esocket->type == DESCRIPTOR_TYPE_EVENT || a_esocket->type == DESCRIPTOR_TYPE_QUEUE){ - EV_SET(l_event, a_esocket->socket, EVFILT_USER,EV_ADD| EV_CLEAR ,0,0, &a_esocket->kqueue_event_catched_data ); - if( kevent( l_kqueue_fd,l_event,1,NULL,0,NULL)!=0){ - l_is_error = true; - l_errno = errno; - } - }else{ - EV_SET(l_event, a_esocket->socket, l_filter,l_flags| EV_ADD,l_fflags,a_esocket->kqueue_data,a_esocket); - if( a_esocket->flags & DAP_SOCK_READY_TO_READ ){ - EV_SET(l_event, a_esocket->socket, EVFILT_READ,l_flags| EV_ADD,l_fflags,a_esocket->kqueue_data,a_esocket); - if( kevent( l_kqueue_fd,l_event,1,NULL,0,NULL) != 1 ){ - l_is_error = true; - l_errno = errno; - } - } - if( !l_is_error){ - if( a_esocket->flags & DAP_SOCK_READY_TO_WRITE || a_esocket->flags &DAP_SOCK_CONNECTING ){ - EV_SET(l_event, a_esocket->socket, EVFILT_WRITE,l_flags| EV_ADD,l_fflags,a_esocket->kqueue_data,a_esocket); - if(kevent( l_kqueue_fd,l_event,1,NULL,0,NULL) != 1){ - l_is_error = true; - l_errno = errno; - } - } - } - } - - if ( l_is_error){ - char l_errbuf[128]; - l_errbuf[0]=0; - strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); - log_it(L_ERROR,"Can't update client socket state on kqueue fd %d: \"%s\" (%d)", - l_kqueue_fd, l_errbuf, l_errno); - } -#else -#error "Not defined dap_proc_thread.c::s_update_poll_flags() on your platform" -#endif - return 0; -} /** * @brief dap_proc_thread_create_queue_ptr @@ -434,11 +344,10 @@ int dap_proc_thread_esocket_update_poll_flags(dap_proc_thread_t * a_thread, dap_ */ dap_events_socket_t * dap_proc_thread_create_queue_ptr(dap_proc_thread_t * a_thread, dap_events_socket_callback_queue_ptr_t a_callback) { - dap_events_socket_t * l_es = dap_events_socket_create_type_queue_ptr_unsafe(NULL,a_callback); + dap_events_socket_t * l_es = dap_context_create_esocket_queue(a_thread->context,a_callback); if(l_es == NULL) return NULL; l_es->proc_thread = a_thread; - dap_proc_thread_assign_esocket_unsafe (a_thread, l_es); return l_es; } @@ -452,6 +361,8 @@ static void * s_proc_thread_function(void * a_arg) dap_proc_thread_t * l_thread = (dap_proc_thread_t*) a_arg; assert(l_thread); + dap_context_t * l_context = l_thread->context; + assert(l_context); dap_cpu_assign_thread_on(l_thread->cpu_id); struct sched_param l_shed_params; @@ -466,6 +377,11 @@ static void * s_proc_thread_function(void * a_arg) #else #error "Undefined set sched param" #endif + if(dap_context_thread_init(l_thread->context)!=0){ + pthread_cond_broadcast(&l_thread->started_cond); + return NULL; + } + l_thread->proc_queue = dap_proc_queue_create(l_thread); // Init proc_queue for related worker @@ -476,8 +392,10 @@ static void * s_proc_thread_function(void * a_arg) dap_events_socket_assign_on_worker_mt(l_worker_related->proc_queue_input,l_worker_related); - l_thread->proc_event = dap_events_socket_create_type_event_unsafe(NULL, s_proc_event_callback); - l_thread->event_exit = dap_events_socket_create_type_event_unsafe(NULL, s_event_exit_callback); + l_thread->proc_event = dap_context_create_esocket_event( l_context , s_proc_event_callback); + l_thread->proc_event->proc_thread = l_thread; + l_thread->event_exit = dap_context_create_esocket_event(l_context, s_event_exit_callback); + l_thread->event_exit->proc_thread = l_thread; l_thread->proc_event->_inheritor = l_thread; // we pass thread through it l_thread->event_exit->_inheritor = l_thread; @@ -496,138 +414,7 @@ static void * s_proc_thread_function(void * a_arg) l_thread->queue_io_input[n] = dap_events_socket_queue_ptr_create_input(l_worker->queue_es_io ); l_thread->queue_callback_input[n] = dap_events_socket_queue_ptr_create_input(l_worker->queue_callback ); } -#ifdef DAP_EVENTS_CAPS_EPOLL - struct epoll_event l_epoll_events[ DAP_EVENTS_SOCKET_MAX]= { { 0 } }; - - // Create epoll ctl - l_thread->epoll_ctl = epoll_create( DAP_EVENTS_SOCKET_MAX ); - - // add proc queue - l_thread->proc_queue->esocket->ev.events = l_thread->proc_queue->esocket->ev_base_flags; - l_thread->proc_queue->esocket->ev.data.ptr = l_thread->proc_queue->esocket; - if( epoll_ctl(l_thread->epoll_ctl, EPOLL_CTL_ADD, l_thread->proc_queue->esocket->socket , &l_thread->proc_queue->esocket->ev) != 0 ){ -#ifdef DAP_OS_WINDOWS - errno = WSAGetLastError(); -#endif - log_it(L_CRITICAL, "Can't add proc queue %zu on epoll ctl, error %d", l_thread->proc_queue->esocket->socket, errno); - return NULL; - } - - // Add proc event - l_thread->proc_event->ev.events = l_thread->proc_event->ev_base_flags ; - l_thread->proc_event->ev.data.ptr = l_thread->proc_event; - if( epoll_ctl(l_thread->epoll_ctl, EPOLL_CTL_ADD, l_thread->proc_event->socket , &l_thread->proc_event->ev) != 0 ){ -#ifdef DAP_OS_WINDOWS - errno = WSAGetLastError(); -#endif - log_it(L_CRITICAL, "Can't add proc event on epoll ctl, err: %d", errno); - return NULL; - } - - // Add exit event - l_thread->event_exit->ev.events = l_thread->event_exit->ev_base_flags; - l_thread->event_exit->ev.data.ptr = l_thread->event_exit; - if( epoll_ctl(l_thread->epoll_ctl, EPOLL_CTL_ADD, l_thread->event_exit->socket , &l_thread->event_exit->ev) != 0 ){ -#ifdef DAP_OS_WINDOWS - errno = WSAGetLastError(); -#endif - log_it(L_CRITICAL, "Can't add exit event on epoll ctl, err: %d", errno); - return NULL; - } - - for (size_t n = 0; n< dap_events_worker_get_count(); n++){ - // Queue asssign - l_thread->queue_assign_input[n]->ev.events = l_thread->queue_assign_input[n]->ev_base_flags ; - l_thread->queue_assign_input[n]->ev.data.ptr = l_thread->queue_assign_input[n]; - if( epoll_ctl(l_thread->epoll_ctl, EPOLL_CTL_ADD, l_thread->queue_assign_input[n]->socket, &l_thread->queue_assign_input[n]->ev) != 0 ){ -#ifdef DAP_OS_WINDOWS - errno = WSAGetLastError(); -#endif - log_it(L_CRITICAL, "Can't add queue input on epoll ctl, err: %d", errno); - return NULL; - } - - // Queue IO - l_thread->queue_io_input[n]->ev.events = l_thread->queue_io_input[n]->ev_base_flags ; - l_thread->queue_io_input[n]->ev.data.ptr = l_thread->queue_io_input[n]; - if( epoll_ctl(l_thread->epoll_ctl, EPOLL_CTL_ADD, l_thread->queue_io_input[n]->fd , &l_thread->queue_io_input[n]->ev) != 0 ){ -#ifdef DAP_OS_WINDOWS - errno = WSAGetLastError(); -#endif - log_it(L_CRITICAL, "Can't add proc io input on epoll ctl, err: %d", errno); - return NULL; - } - - // Queue callback - l_thread->queue_callback_input[n]->ev.events = l_thread->queue_callback_input[n]->ev_base_flags ; - l_thread->queue_callback_input[n]->ev.data.ptr = l_thread->queue_callback_input[n]; - if( epoll_ctl(l_thread->epoll_ctl, EPOLL_CTL_ADD, l_thread->queue_callback_input[n]->fd , &l_thread->queue_callback_input[n]->ev) != 0 ){ -#ifdef DAP_OS_WINDOWS - errno = WSAGetLastError(); -#endif - log_it(L_CRITICAL, "Can't add proc io input on epoll ctl, err: %d", errno); - return NULL; - } - } -#elif defined(DAP_EVENTS_CAPS_POLL) - l_thread->poll_count_max = DAP_EVENTS_SOCKET_MAX; - l_thread->poll_count = 0; - int l_poll_compress = false; - l_thread->poll = DAP_NEW_Z_SIZE(struct pollfd,l_thread->poll_count_max *sizeof (*l_thread->poll)); - l_thread->esockets = DAP_NEW_Z_SIZE(dap_events_socket_t*,l_thread->poll_count_max *sizeof (*l_thread->esockets)); - - // Add proc queue - dap_proc_thread_assign_esocket_unsafe(l_thread,l_thread->proc_queue->esocket); - - // Add proc event - l_thread->poll[l_thread->poll_count].fd = l_thread->proc_event->fd; - l_thread->poll[l_thread->poll_count].events = l_thread->proc_event->poll_base_flags; - l_thread->esockets[l_thread->poll_count] = l_thread->proc_event; - l_thread->poll_count++; - - // Add exit event - l_thread->poll[l_thread->poll_count].fd = l_thread->event_exit->fd; - l_thread->poll[l_thread->poll_count].events = l_thread->event_exit->poll_base_flags; - l_thread->esockets[l_thread->poll_count] = l_thread->event_exit; - l_thread->poll_count++; - for (size_t n = 0; n< dap_events_worker_get_count(); n++){ - dap_events_socket_t * l_queue_assign_input = l_thread->queue_assign_input[n]; - dap_events_socket_t * l_queue_io_input = l_thread->queue_io_input[n]; - dap_events_socket_t * l_queue_callback_input = l_thread->queue_callback_input[n]; - if (l_queue_assign_input&&l_queue_io_input){ - - // Queue assign input - l_queue_assign_input->poll_index = l_thread->poll_count; - l_thread->poll[l_thread->poll_count].fd = l_queue_assign_input->fd; - l_thread->poll[l_thread->poll_count].events = l_queue_assign_input->poll_base_flags; - l_thread->esockets[l_thread->poll_count] = l_queue_assign_input; - l_thread->poll_count++; - - // Queue io input - l_queue_io_input->poll_index = l_thread->poll_count; - l_thread->poll[l_thread->poll_count].fd = l_queue_io_input->fd; - l_thread->poll[l_thread->poll_count].events = l_queue_io_input->poll_base_flags; - l_thread->esockets[l_thread->poll_count] = l_queue_io_input; - l_thread->poll_count++; - - // Queue callback input - l_queue_callback_input->poll_index = l_thread->poll_count; - l_thread->poll[l_thread->poll_count].fd = l_queue_callback_input->fd; - l_thread->poll[l_thread->poll_count].events = l_queue_callback_input->poll_base_flags; - l_thread->esockets[l_thread->poll_count] = l_queue_callback_input; - l_thread->poll_count++; - } - } -#elif defined (DAP_EVENTS_CAPS_KQUEUE) - // Create kqueue fd - l_thread->kqueue_fd = kqueue(); - l_thread->kqueue_events_count_max = DAP_EVENTS_SOCKET_MAX; - l_thread->kqueue_events = DAP_NEW_Z_SIZE(struct kevent, l_thread->kqueue_events_count_max *sizeof(struct kevent)); - - dap_proc_thread_assign_esocket_unsafe(l_thread,l_thread->proc_queue->esocket); - dap_proc_thread_assign_esocket_unsafe(l_thread,l_thread->proc_event); - dap_proc_thread_assign_esocket_unsafe(l_thread,l_thread->event_exit); for (size_t n = 0; n< dap_events_worker_get_count(); n++){ // Queue asssign @@ -640,349 +427,15 @@ static void * s_proc_thread_function(void * a_arg) dap_proc_thread_assign_esocket_unsafe(l_thread, l_thread->queue_callback_input[n]); } -#else -#error "Unimplemented poll events analog for this platform" -#endif //We've started! - pthread_mutex_lock(&s_started_mutex); - pthread_mutex_unlock(&s_started_mutex); - pthread_cond_broadcast(&s_started_cond); - - l_thread->signal_exit = false; - - // Main loop - while (!l_thread->signal_kill && !l_thread->signal_exit){ - - int l_selected_sockets; - size_t l_sockets_max; -#ifdef DAP_EVENTS_CAPS_EPOLL - //log_it(L_DEBUG, "Epoll_wait call"); - l_selected_sockets = epoll_wait(l_thread->epoll_ctl, l_epoll_events, DAP_EVENTS_SOCKET_MAX, -1); - l_sockets_max = (size_t)l_selected_sockets; -#elif defined (DAP_EVENTS_CAPS_POLL) - l_selected_sockets = poll(l_thread->poll,l_thread->poll_count,-1); - l_sockets_max = l_thread->poll_count; -#elif defined (DAP_EVENTS_CAPS_KQUEUE) - l_selected_sockets = kevent(l_thread->kqueue_fd,NULL,0,l_thread->kqueue_events,l_thread->kqueue_events_count_max,NULL); - l_sockets_max = l_selected_sockets; -#else -#error "Unimplemented poll wait analog for this platform" -#endif - - if(l_selected_sockets == -1) { - if( errno == EINTR) - continue; -#if defined DAP_OS_UNIX - int l_errno = errno; - char l_errbuf[128]; - strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); - log_it(L_ERROR, "Proc thread #%d got errno:\"%s\" (%d)", l_thread->cpu_id , l_errbuf, l_errno); -#elif DAP_OS_WINDOWS - log_it(L_ERROR, "Error occured on thread #%d, errno: %d", l_thread->cpu_id , errno); -#endif - break; - } - for(size_t n = 0; n < l_sockets_max; n++) { - dap_events_socket_t * l_cur; - int l_flag_hup, l_flag_rdhup, l_flag_read, l_flag_write, l_flag_error, - l_flag_nval,l_flag_pri,l_flag_msg; -#ifdef DAP_EVENTS_CAPS_EPOLL - l_cur = (dap_events_socket_t *) l_epoll_events[n].data.ptr; - uint32_t l_cur_events = l_epoll_events[n].events; - l_flag_hup = l_cur_events & EPOLLHUP; - l_flag_rdhup = l_cur_events & EPOLLHUP; - l_flag_write = l_cur_events & EPOLLOUT; - l_flag_read = l_cur_events & EPOLLIN; - l_flag_error = l_cur_events & EPOLLERR; - l_flag_nval = false; - l_flag_pri = false; - l_flag_msg = false; -#elif defined ( DAP_EVENTS_CAPS_POLL) - if(n>=l_thread->poll_count){ - log_it(L_WARNING,"selected_sockets(%d) is bigger then poll count (%zu)", l_selected_sockets, l_thread->poll_count); - break; - } - short l_cur_events = l_thread->poll[n].revents ; - if (!l_cur_events) - continue; - l_cur = l_thread->esockets[n]; - l_flag_hup = l_cur_events& POLLHUP; - l_flag_rdhup = l_cur_events & POLLRDHUP; - l_flag_write = (l_cur_events & POLLOUT) || (l_cur_events &POLLRDNORM)|| (l_cur_events &POLLRDBAND ) ; - l_flag_read = l_cur_events & POLLIN || (l_cur_events &POLLWRNORM)|| (l_cur_events &POLLWRBAND ); - l_flag_error = l_cur_events & POLLERR; - l_flag_nval = l_cur_events & POLLNVAL; - l_flag_pri = l_cur_events & POLLPRI; - l_flag_msg = l_cur_events & POLLMSG; -#elif defined (DAP_EVENTS_CAPS_KQUEUE) - struct kevent * l_kevent = &l_thread->kqueue_events[n]; - l_flag_hup=l_flag_rdhup=l_flag_read=l_flag_write=l_flag_error=l_flag_nval=l_flag_msg =l_flag_pri = false; - - if (l_kevent->filter & EVFILT_USER){ - dap_events_socket_w_data_t * l_es_w_data = (dap_events_socket_w_data_t*) l_kevent->udata; - assert(l_es_w_data); - l_cur = l_es_w_data->esocket; - assert(l_cur); - memcpy(&l_cur->kqueue_event_catched_data,l_es_w_data,sizeof(*l_es_w_data)); - if(l_es_w_data != &l_cur->kqueue_event_catched_data ) - DAP_DELETE(l_es_w_data); - else if (g_debug_reactor) - log_it(L_DEBUG,"Own event signal without actual event data"); - if ( l_cur->pipe_out == NULL){ // If we're not the input for pipe or queue - // we must drop write flag and set read flag - l_flag_read = true; - }else{ - l_flag_write = true; - } - }else{ - l_cur = (dap_events_socket_t*) l_kevent->udata; - assert(l_cur); - - switch (l_kevent->filter) { - case EVFILT_TIMER: - case EVFILT_READ: l_flag_read = true; break; - case EVFILT_WRITE: l_flag_write = true; break; - case EVFILT_EXCEPT : l_flag_rdhup = true; break; - default: log_it(L_CRITICAL,"Unknown filter type in polling, exit thread"); return NULL; - } - - } - l_cur->kqueue_event_catched = l_kevent; -#ifndef DAP_OS_DARWIN - u_int l_cur_events = l_thread->kqueue_events[n].fflags; -#else - uint32_t l_cur_events = l_thread->kqueue_events[n].fflags; -#endif - -#else -#error "Unimplemented fetch esocket after poll" -#endif - assert(l_cur); - if(g_debug_reactor) - log_it(L_DEBUG, "Proc thread #%u esocket %p fd=%"DAP_FORMAT_SOCKET" type=%d flags=0x%0X (%s:%s:%s:%s:%s:%s:%s:%s)", l_thread->cpu_id, l_cur, l_cur->socket, - l_cur->type, l_cur_events, l_flag_read?"read":"", l_flag_write?"write":"", l_flag_error?"error":"", - l_flag_hup?"hup":"", l_flag_rdhup?"rdhup":"", l_flag_msg?"msg":"", l_flag_nval?"nval":"", l_flag_pri?"pri":""); - - //log_it(L_DEBUG,"Waked up esocket %p (socket %d) {read:%s,write:%s,error:%s} ", l_cur, l_cur->fd, - // l_flag_read?"true":"false", l_flag_write?"true":"false", l_flag_error?"true":"false" ); - time_t l_cur_time = time( NULL); - l_cur->last_time_active = l_cur_time; - if (l_flag_error){ -#ifdef DAP_OS_WINDOWS - int l_errno = WSAGetLastError(); -#else - int l_errno = errno; -#endif - char l_errbuf[128]; - strerror_r(l_errno, l_errbuf,sizeof (l_errbuf)); - log_it(L_ERROR,"Some error on proc thread #%u with %"DAP_FORMAT_SOCKET" socket: %s(%d)",l_thread->cpu_id, l_cur->socket, l_errbuf, l_errno); - if(l_cur->callbacks.error_callback) - l_cur->callbacks.error_callback(l_cur, errno); - } - if (l_flag_read ){ - int32_t l_bytes_read = 0; - switch (l_cur->type) { - case DESCRIPTOR_TYPE_QUEUE: - dap_events_socket_queue_proc_input_unsafe(l_cur); -#ifdef DAP_OS_WINDOWS - l_bytes_read = dap_recvfrom(l_cur->socket, NULL, 0); -#endif - break; - case DESCRIPTOR_TYPE_EVENT: - dap_events_socket_event_proc_input_unsafe (l_cur); - break; - - default: - log_it(L_ERROR, "Unprocessed descriptor type accepted in proc thread loop"); -#ifdef DAP_OS_WINDOWS - l_bytes_read = dap_recvfrom(l_cur->socket, NULL, 0); -#endif - break; - } - } - if (l_flag_write ){ - int l_errno=0; - if (l_cur->buf_out_size){ - ssize_t l_bytes_sent = -1; - switch (l_cur->type) { - case DESCRIPTOR_TYPE_QUEUE: - if (l_cur->flags & DAP_SOCK_QUEUE_PTR){ - #if defined(DAP_EVENTS_CAPS_QUEUE_PIPE2) - l_bytes_sent = write(l_cur->socket, l_cur->buf_out, sizeof (void *) ); // We send pointer by pointer - #elif defined DAP_EVENTS_CAPS_MSMQ - DWORD l_mp_id = 0; - MQMSGPROPS l_mps; - MQPROPVARIANT l_mpvar[1]; - MSGPROPID l_p_id[1]; - HRESULT l_mstatus[1]; - - l_p_id[l_mp_id] = PROPID_M_BODY; - l_mpvar[l_mp_id].vt = VT_VECTOR | VT_UI1; - l_mpvar[l_mp_id].caub.pElems = l_cur->buf_out; - l_mpvar[l_mp_id].caub.cElems = (u_long)l_cur->buf_out_size; - l_mp_id++; - - l_mps.cProp = l_mp_id; - l_mps.aPropID = l_p_id; - l_mps.aPropVar = l_mpvar; - l_mps.aStatus = l_mstatus; - HRESULT hr = MQSendMessage(l_cur->mqh, &l_mps, MQ_NO_TRANSACTION); - - if (hr != MQ_OK) { - log_it(L_ERROR, "An error occured on sending message to queue, errno: %ld", hr); - break; - } else { - if(dap_sendto(l_cur->socket, l_cur->port, NULL, 0) == SOCKET_ERROR) { - log_it(L_ERROR, "Write to sock error: %d", WSAGetLastError()); - } - l_cur->buf_out_size = 0; - dap_events_socket_set_writable_unsafe(l_cur,false); - - break; - } - #elif defined (DAP_EVENTS_CAPS_QUEUE_MQUEUE) - char * l_ptr = (char *) l_cur->buf_out; - l_bytes_sent = mq_send(l_cur->mqd, l_ptr, sizeof (l_ptr),0); - if (l_bytes_sent==0){ -// log_it(L_DEBUG,"mq_send %p success", l_ptr_in); - l_bytes_sent = sizeof (void *); - }else if (l_bytes_sent == -1 && errno == EINVAL){ // To make compatible with other - l_errno = EAGAIN; // non-blocking sockets -// log_it(L_DEBUG,"mq_send %p EAGAIN", l_ptr_in); - }else{ - l_errno = errno; - log_it(L_WARNING,"mq_send %p errno: %d", l_ptr, l_errno); - } - #elif defined (DAP_EVENTS_CAPS_KQUEUE) - - // Select socket and kqueue fd to send the event - dap_events_socket_t * l_es_output = l_cur->pipe_out ? l_cur->pipe_out : l_cur; - int l_kqueue_fd = l_es_output->context ? l_es_output->context->kqueue_fd : -1; - - struct kevent l_event; - dap_events_socket_w_data_t * l_es_w_data = DAP_NEW_Z(dap_events_socket_w_data_t); - l_es_w_data->esocket = l_es_output; - - memcpy(&l_es_w_data->ptr,l_cur->buf_out,sizeof(l_es_w_data->ptr) ); - EV_SET(&l_event,l_es_output->socket, EVFILT_USER,0,NOTE_TRIGGER ,0, l_es_w_data); - - - int l_n = l_kqueue_fd==-1 ? -1 : kevent(l_kqueue_fd,&l_event,1,NULL,0,NULL); - if (l_n != -1) - l_bytes_sent = sizeof(l_es_w_data->ptr); - else{ - l_errno = errno; - log_it(L_WARNING,"queue ptr send error: kevent %p errno: %d", l_es_w_data->ptr, l_errno); - DAP_DELETE(l_es_w_data); - } - #else - #error "Not implemented dap_events_socket_queue_ptr_send() for this platform" - #endif - //int l_errno = errno; - - break; - }break; - default: - log_it(L_ERROR, "Dont process write flags for this socket %d in proc thread", l_cur->fd); - - } - l_errno = errno; - - if(l_bytes_sent>0){ - l_cur->buf_out_size -= l_bytes_sent; - //log_it(L_DEBUG,"Sent %zd bytes out, left %zd in buf out", l_bytes_sent, l_cur->buf_out); - if (l_cur->buf_out_size ){ // Shrink output buffer - - memmove(l_cur->buf_out, l_cur->buf_out+l_bytes_sent, l_cur->buf_out_size ); - }else{ -#ifndef DAP_EVENTS_CAPS_KQUEUE - l_cur->flags ^= DAP_SOCK_READY_TO_WRITE; - dap_proc_thread_esocket_update_poll_flags(l_thread, l_cur); -#else - log_it(L_WARNING,"(!) Write event receieved but nothing in buffer"); - sleep(500); // to prevent shitting the log files -#endif - } - } - - }else{ - // TODO Make this code platform-independent -#ifndef DAP_EVENTS_CAPS_EVENT_KEVENT - log_it(L_DEBUG,"(!) Write event receieved but nothing in buffer, switching off this flag"); - l_cur->flags ^= DAP_SOCK_READY_TO_WRITE; - dap_proc_thread_esocket_update_poll_flags(l_thread, l_cur); - // TODO Make this code platform-independent -#else - log_it(L_WARNING,"(!) Write event receieved but nothing in buffer"); - sleep(500); // to prevent shitting the log files -#endif - } + pthread_cond_broadcast(&l_thread->started_cond); - } - if(l_cur->flags & DAP_SOCK_SIGNAL_CLOSE){ -#ifdef DAP_EVENTS_CAPS_EPOLL - log_it(L_WARNING, "Deleting esocket %d from proc thread?...", l_cur->fd); - if ( epoll_ctl( l_thread->epoll_ctl, EPOLL_CTL_DEL, l_cur->fd, &l_cur->ev) == -1 ) - log_it( L_ERROR,"Can't remove event socket's handler from the epoll ctl" ); - //else - // log_it( L_DEBUG,"Removed epoll's event from proc thread #%u", l_thread->cpu_id ); - if (l_cur->callbacks.delete_callback) - l_cur->callbacks.delete_callback(l_cur, l_thread); - if(l_cur->_inheritor) - DAP_DELETE(l_cur->_inheritor); - DAP_DELETE(l_cur); -#elif defined (DAP_EVENTS_CAPS_POLL) - l_thread->poll[n].fd = -1; - l_poll_compress = true; -#elif defined (DAP_EVENTS_CAPS_KQUEUE) - if (l_cur->socket != -1 ){ - struct kevent * l_event = &l_cur->kqueue_event; - EV_SET(l_event, l_cur->socket, 0 ,EV_DELETE, 0,0,l_cur); - if ( kevent( l_thread->kqueue_fd,l_event,1,NULL,0,NULL) != 1 ) { - int l_errno = errno; - char l_errbuf[128]; - strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); - log_it( L_ERROR,"Can't remove event socket's handler %d from the epoll_fd %d \"%s\" (%d)", l_cur->socket, - l_thread->kqueue_fd, l_errbuf, l_errno); - } - } -#else -#error "Unimplemented poll ctl analog for this platform" -#endif - } + dap_context_thread_loop(l_thread->context); - } -#ifdef DAP_EVENTS_CAPS_POLL - /***********************************************************/ - /* If the compress_array flag was turned on, we need */ - /* to squeeze together the array and decrement the number */ - /* of file descriptors. */ - /***********************************************************/ - if ( l_poll_compress){ - l_poll_compress = false; - for (size_t i = 0; i < l_thread->poll_count ; i++) { - if ( l_thread->poll[i].fd == -1){ - for(size_t j = i; j +1 < l_thread->poll_count; j++){ - l_thread->poll[j].fd = l_thread->poll[j+1].fd; - l_thread->poll[j].events = l_thread->poll[j+1].events; - l_thread->poll[j].revents = l_thread->poll[j+1].revents; - l_thread->esockets[j] = l_thread->esockets[j+1]; - if(l_thread->esockets[j]) - l_thread->esockets[j]->poll_index = j; - } - i--; - l_thread->poll_count--; - } - } - } -#endif - } log_it(L_ATT, "Stop processing thread #%u", l_thread->cpu_id); - fflush(stdout); - // cleanip inputs for (size_t n=0; n<dap_events_worker_get_count(); n++){ dap_events_socket_delete_unsafe(l_thread->queue_assign_input[n], false); @@ -1105,7 +558,7 @@ static void s_event_exit_callback( dap_events_socket_t * a_es, uint64_t a_flags) { (void) a_flags; dap_proc_thread_t * l_thread = (dap_proc_thread_t *) a_es->_inheritor; - l_thread->signal_exit = true; + l_thread->context->signal_exit = true; if(g_debug_reactor) log_it(L_DEBUG, "Proc_thread :%u signaled to exit", l_thread->cpu_id); } diff --git a/dap-sdk/io/dap_server.c b/dap-sdk/io/dap_server.c index 792af68632..84471b9c63 100644 --- a/dap-sdk/io/dap_server.c +++ b/dap-sdk/io/dap_server.c @@ -104,7 +104,7 @@ void dap_server_delete(dap_server_t *a_server) { while (a_server->es_listeners) { dap_events_socket_t *l_es = (dap_events_socket_t *)a_server->es_listeners->data; - dap_events_socket_remove_and_delete_mt(l_es->worker, l_es->uuid); // TODO unsafe moment. Replace storage to uuids + dap_events_socket_remove_and_delete_mt(l_es->context->worker, l_es->uuid); // TODO unsafe moment. Replace storage to uuids dap_list_t *l_tmp = a_server->es_listeners; a_server->es_listeners = l_tmp->next; DAP_DELETE(l_tmp); @@ -345,7 +345,7 @@ static int s_server_run(dap_server_t * a_server, dap_events_socket_callbacks_t * */ static void s_es_server_new(dap_events_socket_t *a_es, void * a_arg) { - log_it(L_DEBUG, "Created server socket %p on worker %u", a_es, a_es->worker->id); + log_it(L_DEBUG, "Created server socket %p on worker %u", a_es, a_es->context->worker->id); dap_server_t *l_server = (dap_server_t*) a_es->_inheritor; pthread_mutex_lock( &l_server->started_mutex); pthread_mutex_unlock( &l_server->started_mutex); @@ -383,7 +383,7 @@ static void s_es_server_accept(dap_events_socket_t *a_es, SOCKET a_remote_socket dap_events_socket_t * l_es_new = NULL; log_it(L_DEBUG, "Listening socket (binded on %s:%u) got new incomming connection",l_server->address,l_server->port); log_it(L_DEBUG, "Accepted new connection (sock %"DAP_FORMAT_SOCKET" from %"DAP_FORMAT_SOCKET")", a_remote_socket, a_es->socket); - l_es_new = s_es_server_create(a_es->events,a_remote_socket,&l_server->client_callbacks,l_server); + l_es_new = s_es_server_create(dap_events_get_default(),a_remote_socket,&l_server->client_callbacks,l_server); //l_es_new->is_dont_reset_write_flag = true; // By default all income connection has this flag getnameinfo(a_remote_addr,a_remote_addr_size, l_es_new->hostaddr ,256, l_es_new->service,sizeof(l_es_new->service), diff --git a/dap-sdk/io/dap_timerfd.c b/dap-sdk/io/dap_timerfd.c index c358ed402c..55d6b02dd0 100644 --- a/dap-sdk/io/dap_timerfd.c +++ b/dap-sdk/io/dap_timerfd.c @@ -255,7 +255,7 @@ dap_timerfd_t* dap_timerfd_create(uint64_t a_timeout_ms, dap_timerfd_callback_t return l_timerfd; } -static inline void s_timerfd_reset(dap_timerfd_t *a_timerfd, dap_events_socket_t *a_event_sock) +static inline void s_timerfd_reset(dap_timerfd_t *a_timerfd, dap_events_socket_t *a_es) { #if defined DAP_OS_LINUX struct itimerspec l_ts; @@ -269,10 +269,10 @@ static inline void s_timerfd_reset(dap_timerfd_t *a_timerfd, dap_events_socket_t log_it(L_WARNING, "Reset timerfd failed: timerfd_settime() errno=%d\n", errno); } #elif defined (DAP_OS_BSD) - dap_worker_add_events_socket_unsafe(a_event_sock,a_event_sock->worker); -//struct kevent * l_event = &a_event_sock->kqueue_event; -//EV_SET(l_event, 0, a_event_sock->kqueue_base_filter, a_event_sock->kqueue_base_flags,a_event_sock->kqueue_base_fflags,a_event_sock->kqueue_data,a_event_sock); -//kevent(a_event_sock->worker->kqueue_fd,l_event,1,NULL,0,NULL); + // Re-add timer in context + dap_context_t * l_context = a_es->context; + a_es->context = NULL; + dap_context_add_esocket(l_context,a_es); #elif defined (DAP_OS_WINDOWS) /*LARGE_INTEGER l_due_time; l_due_time.QuadPart = (long long)a_timerfd->timeout_ms * _MSEC; @@ -285,7 +285,7 @@ static inline void s_timerfd_reset(dap_timerfd_t *a_timerfd, dap_events_socket_t #endif #ifndef DAP_OS_BSD - dap_events_socket_set_readable_unsafe(a_event_sock, true); + dap_events_socket_set_readable_unsafe(a_es, true); #endif } @@ -367,6 +367,6 @@ void dap_timerfd_delete(dap_timerfd_t *a_timerfd) #ifdef _WIN32 DeleteTimerQueueTimer(hTimerQueue, (HANDLE)a_timerfd->th, NULL); #endif - if (a_timerfd->events_socket->worker) - dap_events_socket_remove_and_delete_mt(a_timerfd->events_socket->worker, a_timerfd->esocket_uuid); + if (a_timerfd->events_socket->context->worker) + dap_events_socket_remove_and_delete_mt(a_timerfd->events_socket->context->worker, a_timerfd->esocket_uuid); } diff --git a/dap-sdk/io/dap_worker.c b/dap-sdk/io/dap_worker.c index d70092b68b..7c5293241a 100644 --- a/dap-sdk/io/dap_worker.c +++ b/dap-sdk/io/dap_worker.c @@ -77,7 +77,8 @@ void dap_worker_deinit( ) void *dap_worker_thread(void *arg) { dap_worker_t *l_worker = (dap_worker_t *) arg; - uint32_t l_tn = l_worker->id; + assert(l_worker); + dap_context_t * l_context = l_worker->context; const struct sched_param l_shed_params = {0}; @@ -91,7 +92,7 @@ void *dap_worker_thread(void *arg) pthread_setschedparam(pthread_self(),SCHED_FIFO ,&l_shed_params); #endif - if(dap_context_thread_init(l_worker->context)!=0){ + if(dap_context_thread_init(l_context)!=0){ pthread_cond_broadcast(&l_worker->started_cond); return NULL; } @@ -102,10 +103,10 @@ void *dap_worker_thread(void *arg) l_worker->queue_es_reassign_input = DAP_NEW_Z_SIZE(dap_events_socket_t*, sizeof (dap_events_socket_t*)* dap_events_worker_get_count() ); - l_worker->queue_es_new = dap_events_socket_create_type_queue_ptr_unsafe(l_worker, s_queue_add_es_callback); - l_worker->queue_es_delete = dap_events_socket_create_type_queue_ptr_unsafe(l_worker, s_queue_delete_es_callback); - l_worker->queue_es_io = dap_events_socket_create_type_queue_ptr_unsafe(l_worker, s_queue_es_io_callback); - l_worker->queue_es_reassign = dap_events_socket_create_type_queue_ptr_unsafe(l_worker, s_queue_es_reassign_callback ); + l_worker->queue_es_new = dap_context_create_esocket_queue(l_context, s_queue_add_es_callback); + l_worker->queue_es_delete = dap_context_create_esocket_queue(l_context, s_queue_delete_es_callback); + l_worker->queue_es_io = dap_context_create_esocket_queue(l_context, s_queue_es_io_callback); + l_worker->queue_es_reassign = dap_context_create_esocket_queue(l_context, s_queue_es_reassign_callback ); for( size_t n = 0; n < dap_events_worker_get_count(); n++) { @@ -115,17 +116,15 @@ void *dap_worker_thread(void *arg) l_worker->queue_es_reassign_input[n] = dap_events_socket_queue_ptr_create_input(l_worker->queue_es_reassign); } - l_worker->queue_callback = dap_events_socket_create_type_queue_ptr_unsafe(l_worker, s_queue_callback_callback); - l_worker->event_exit = dap_events_socket_create_type_event_unsafe(l_worker, s_event_exit_callback); + l_worker->queue_callback = dap_context_create_esocket_queue(l_context, s_queue_callback_callback); + l_worker->event_exit = dap_context_create_esocket_event(l_context, s_event_exit_callback); l_worker->timer_check_activity = dap_timerfd_create(s_connection_timeout * 1000 / 2, s_socket_all_check_activity, l_worker); dap_worker_add_events_socket_unsafe( l_worker->timer_check_activity->events_socket, l_worker); - pthread_mutex_lock(&l_worker->started_mutex); pthread_cond_broadcast(&l_worker->started_cond); - pthread_mutex_unlock(&l_worker->started_mutex); - dap_context_thread_loop(l_worker->context); + dap_context_thread_loop(l_context); log_it(L_NOTICE,"Exiting thread #%u", l_worker->id); return NULL; } @@ -137,7 +136,11 @@ void *dap_worker_thread(void *arg) */ static void s_queue_add_es_callback( dap_events_socket_t * a_es, void * a_arg) { - dap_worker_t * l_worker = a_es->worker; + assert(a_es); + dap_context_t * l_context = a_es->context; + assert(l_context); + dap_worker_t * l_worker = l_context->worker; + assert(l_worker); dap_events_socket_t * l_es_new =(dap_events_socket_t *) a_arg; if (!l_es_new){ log_it(L_ERROR,"NULL esocket accepted to add on worker #%u", l_worker->id); @@ -163,7 +166,7 @@ static void s_queue_add_es_callback( dap_events_socket_t * a_es, void * a_arg) if(l_es_new->socket!=0 && l_es_new->socket != INVALID_SOCKET) #endif - if(dap_context_esocket_find_by_uuid( l_worker->context, l_es_new->uuid)){ + if(dap_context_esocket_find_by_uuid( l_context, l_es_new->uuid)){ // Socket already present in worker, it's OK return; } @@ -184,7 +187,6 @@ static void s_queue_add_es_callback( dap_events_socket_t * a_es, void * a_arg) default: {} } - l_es_new->worker = l_worker; l_es_new->last_time_active = time(NULL); // We need to differ new and reassigned esockets. If its new - is_initialized is false if ( ! l_es_new->is_initalized ){ @@ -193,7 +195,7 @@ static void s_queue_add_es_callback( dap_events_socket_t * a_es, void * a_arg) l_es_new->is_initalized = true; } - int l_ret = dap_worker_add_events_socket_unsafe(l_es_new,l_worker); + int l_ret =dap_context_add_esocket(l_context,l_es_new); if ( l_ret != 0 ){ log_it(L_CRITICAL,"Can't add event socket's handler to worker i/o poll mechanism with error %d", errno); }else{ @@ -220,7 +222,7 @@ static void s_queue_delete_es_callback( dap_events_socket_t * a_es, void * a_arg assert(a_arg); dap_events_socket_uuid_t * l_es_uuid_ptr = (dap_events_socket_uuid_t*) a_arg; dap_events_socket_t * l_es; - if ( (l_es = dap_context_esocket_find_by_uuid(a_es->worker->context,*l_es_uuid_ptr)) != NULL ){ + if ( (l_es = dap_context_esocket_find_by_uuid(a_es->context,*l_es_uuid_ptr)) != NULL ){ //l_es->flags |= DAP_SOCK_SIGNAL_CLOSE; // Send signal to socket to kill dap_events_socket_remove_and_delete_unsafe(l_es,false); }else @@ -236,15 +238,17 @@ static void s_queue_delete_es_callback( dap_events_socket_t * a_es, void * a_arg static void s_queue_es_reassign_callback( dap_events_socket_t * a_es, void * a_arg) { assert(a_es); - dap_worker_t * l_worker = a_es->worker; + dap_context_t * l_context = a_es->context; + assert(l_context); + dap_worker_t * l_worker = l_context->worker; assert(l_worker); dap_worker_msg_reassign_t * l_msg = (dap_worker_msg_reassign_t*) a_arg; assert(l_msg); dap_events_socket_t * l_es_reassign; - if ( ( l_es_reassign = dap_context_esocket_find_by_uuid(l_worker->context, l_msg->esocket_uuid))!= NULL ){ + if ( ( l_es_reassign = dap_context_esocket_find_by_uuid(l_context, l_msg->esocket_uuid))!= NULL ){ if( l_es_reassign->was_reassigned && l_es_reassign->flags & DAP_SOCK_REASSIGN_ONCE) { log_it(L_INFO, "Reassgment request with DAP_SOCK_REASSIGN_ONCE allowed only once, declined reassigment from %u to %u", - l_es_reassign->worker->id, l_msg->worker_new->id); + l_es_reassign->context->worker->id, l_msg->worker_new->id); }else{ dap_events_socket_reassign_between_workers_unsafe(l_es_reassign,l_msg->worker_new); @@ -265,7 +269,7 @@ static void s_queue_callback_callback( dap_events_socket_t * a_es, void * a_arg) dap_worker_msg_callback_t * l_msg = (dap_worker_msg_callback_t *) a_arg; assert(l_msg); assert(l_msg->callback); - l_msg->callback(a_es->worker, l_msg->arg); + l_msg->callback(a_es->context->worker, l_msg->arg); DAP_DELETE(l_msg); } @@ -277,9 +281,9 @@ static void s_queue_callback_callback( dap_events_socket_t * a_es, void * a_arg) static void s_event_exit_callback( dap_events_socket_t * a_es, uint64_t a_flags) { (void) a_flags; - a_es->worker->context->signal_exit = true; + a_es->context->signal_exit = true; if(g_debug_reactor) - log_it(L_DEBUG, "Worker :%u signaled to exit", a_es->worker->id); + log_it(L_DEBUG, "Worker :%u signaled to exit", a_es->context->worker->id); } /** @@ -290,7 +294,9 @@ static void s_event_exit_callback( dap_events_socket_t * a_es, uint64_t a_flags) static void s_queue_es_io_callback( dap_events_socket_t * a_es, void * a_arg) { assert(a_es); - dap_worker_t * l_worker = a_es->worker; + dap_context_t * l_context = a_es->context; + assert(l_context); + dap_worker_t * l_worker = a_es->context->worker; dap_worker_msg_io_t * l_msg = a_arg; assert(l_msg); // Check if it was removed from the list @@ -428,7 +434,6 @@ dap_worker_t *dap_worker_add_events_socket_auto( dap_events_socket_t *a_es) // struct epoll_event ev = {0}; dap_worker_t *l_worker = dap_events_worker_get_auto( ); - a_es->events = l_worker->events; dap_worker_add_events_socket( a_es, l_worker); return l_worker; } diff --git a/dap-sdk/io/include/dap_context.h b/dap-sdk/io/include/dap_context.h index 51099b9b04..182a1fb421 100644 --- a/dap-sdk/io/include/dap_context.h +++ b/dap-sdk/io/include/dap_context.h @@ -28,8 +28,14 @@ #include "dap_events_socket.h" #include "dap_proc_queue.h" +typedef struct dap_worker dap_worker_t; +typedef struct dap_proc_thread dap_proc_thread_t; typedef struct dap_context { - uint32_t id; + uint32_t id; // Context ID + + // Compatibility fields, in future should be replaced with _inheritor + dap_proc_thread_t * proc_thread; // If the context belongs to proc_thread + dap_worker_t * worker; // If the context belongs to worker #if defined DAP_EVENTS_CAPS_MSMQ HANDLE msmq_events[MAXIMUM_WAIT_OBJECTS]; @@ -80,5 +86,8 @@ int dap_context_thread_init(dap_context_t * a_context); int dap_context_thread_loop(dap_context_t * a_context); int dap_context_add_esocket(dap_context_t * a_context, dap_events_socket_t * a_esocket ); -void dap_context_poll_update(dap_events_socket_t * a_esocket); +int dap_context_poll_update(dap_events_socket_t * a_esocket); dap_events_socket_t *dap_context_esocket_find_by_uuid(dap_context_t * a_context, dap_events_socket_uuid_t a_es_uuid ); +dap_events_socket_t * dap_context_create_esocket_queue(dap_context_t * a_context, dap_events_socket_callback_queue_ptr_t a_callback); +dap_events_socket_t * dap_context_create_esocket_event(dap_context_t * a_context, dap_events_socket_callback_event_t a_callback); +dap_events_socket_t * dap_context_create_esocket_pipe(dap_context_t * a_context, dap_events_socket_callback_t a_callback, uint32_t a_flags); diff --git a/dap-sdk/io/include/dap_events_socket.h b/dap-sdk/io/include/dap_events_socket.h index 0151e76708..52086076a9 100644 --- a/dap-sdk/io/include/dap_events_socket.h +++ b/dap-sdk/io/include/dap_events_socket.h @@ -245,9 +245,8 @@ typedef struct dap_events_socket { // Links to related objects - dap_events_t *events; +// dap_events_t *events; dap_context_t * context; - dap_worker_t *worker; dap_proc_thread_t * proc_thread; // If assigned on dap_proc_thread_t object dap_server_t *server; // If this socket assigned with server @@ -312,7 +311,6 @@ void dap_events_socket_deinit(void); // Deinit clients module dap_events_socket_t * dap_events_socket_create(dap_events_desc_type_t a_type, dap_events_socket_callbacks_t* a_callbacks); -dap_events_socket_t * dap_events_socket_create_type_queue_ptr_unsafe(dap_worker_t * a_w, dap_events_socket_callback_queue_ptr_t a_callback); dap_events_socket_t * dap_events_socket_create_type_queue_ptr_mt(dap_worker_t * a_w, dap_events_socket_callback_queue_ptr_t a_callback); int dap_events_socket_queue_proc_input_unsafe(dap_events_socket_t * a_esocket); @@ -331,6 +329,7 @@ int dap_events_socket_queue_ptr_send( dap_events_socket_t * a_es, void* a_arg); int dap_events_socket_event_signal( dap_events_socket_t * a_es, uint64_t a_value); void dap_events_socket_delete_unsafe( dap_events_socket_t * a_esocket , bool a_preserve_inheritor); +void dap_events_socket_delete_mt(dap_worker_t * a_worker, dap_events_socket_uuid_t a_es_uuid); dap_events_socket_t *dap_events_socket_wrap_no_add( dap_events_t *a_events, int a_sock, dap_events_socket_callbacks_t *a_callbacks ); diff --git a/dap-sdk/io/include/dap_proc_thread.h b/dap-sdk/io/include/dap_proc_thread.h index 5eb2df2090..e2b1e9fb5e 100644 --- a/dap-sdk/io/include/dap_proc_thread.h +++ b/dap-sdk/io/include/dap_proc_thread.h @@ -44,27 +44,12 @@ typedef struct dap_proc_thread{ dap_events_socket_t *queue_gdb_input; /* Inputs for request to GDB, @RRL: #6238 */ dap_context_t * context; - int signal_kill; - int signal_exit; dap_events_socket_t * event_exit; + pthread_cond_t started_cond; + pthread_mutex_t started_mutex ; -#ifdef DAP_EVENTS_CAPS_EPOLL - EPOLL_HANDLE epoll_ctl; -#elif defined (DAP_EVENTS_CAPS_POLL) - int poll_fd; - struct pollfd * poll; - dap_events_socket_t ** esockets; - size_t poll_count; - size_t poll_count_max; -#elif defined (DAP_EVENTS_CAPS_KQUEUE) - int kqueue_fd; - struct kevent * kqueue_events; - int kqueue_events_count_max; -#else -#error "No poll for proc thread for your platform" -#endif void * _inheritor; } dap_proc_thread_t; @@ -84,8 +69,6 @@ int dap_proc_thread_esocket_write_inter(dap_proc_thread_t * a_thread,dap_worker_ int dap_proc_thread_esocket_write_f_inter(dap_proc_thread_t * a_thread,dap_worker_t * a_worker, dap_events_socket_uuid_t a_es_uuid, const char * a_format,...); -int dap_proc_thread_esocket_update_poll_flags(dap_proc_thread_t * a_thread, dap_events_socket_t * a_esocket); - typedef void (*dap_proc_worker_callback_t)(dap_worker_t *,void *); void dap_proc_thread_worker_exec_callback_inter(dap_proc_thread_t * a_thread, size_t a_worker_id, dap_proc_worker_callback_t a_callback, void * a_arg); diff --git a/dap-sdk/io/include/dap_worker.h b/dap-sdk/io/include/dap_worker.h index 6e9ed81992..c9cf8201d4 100644 --- a/dap-sdk/io/include/dap_worker.h +++ b/dap-sdk/io/include/dap_worker.h @@ -102,10 +102,7 @@ void dap_worker_deinit(); static inline int dap_worker_add_events_socket_unsafe( dap_events_socket_t * a_esocket, dap_worker_t * a_worker) { - int l_ret = dap_context_add_esocket(a_worker->context, a_esocket); - if (l_ret == 0 ) - a_esocket->worker = a_worker; - return l_ret; + return dap_context_add_esocket(a_worker->context, a_esocket); } void dap_worker_add_events_socket(dap_events_socket_t * a_events_socket, dap_worker_t * a_worker); diff --git a/dap-sdk/net/client/dap_client_pvt.c b/dap-sdk/net/client/dap_client_pvt.c index bc9237ba22..56a7fdae61 100644 --- a/dap-sdk/net/client/dap_client_pvt.c +++ b/dap-sdk/net/client/dap_client_pvt.c @@ -215,7 +215,7 @@ static void s_stream_connected(dap_client_pvt_t * a_client_pvt) dap_events_socket_uuid_t * l_es_uuid_ptr = DAP_NEW_Z(dap_events_socket_uuid_t); assert(a_client_pvt->stream_es); *l_es_uuid_ptr = a_client_pvt->stream_es->uuid; - if( dap_timerfd_start_on_worker(a_client_pvt->stream_es->worker, s_client_timeout_active_after_connect_seconds * 1000, s_stream_timer_timeout_after_connected_check ,l_es_uuid_ptr) == NULL ){ + if( dap_timerfd_start_on_worker(a_client_pvt->stream_es->context->worker, s_client_timeout_active_after_connect_seconds * 1000, s_stream_timer_timeout_after_connected_check ,l_es_uuid_ptr) == NULL ){ log_it(L_ERROR,"Can't run timer for stream after connect check for esocket uuid %"DAP_UINT64_FORMAT_U, *l_es_uuid_ptr); DAP_DEL_Z(l_es_uuid_ptr); } diff --git a/dap-sdk/net/server/http_server/dap_http_simple.c b/dap-sdk/net/server/http_server/dap_http_simple.c index c37bd1ab71..6dead19f96 100644 --- a/dap-sdk/net/server/http_server/dap_http_simple.c +++ b/dap-sdk/net/server/http_server/dap_http_simple.c @@ -343,7 +343,7 @@ static void s_http_client_headers_read( dap_http_client_t *a_http_client, void * l_http_simple->esocket = a_http_client->esocket; l_http_simple->http_client = a_http_client; - l_http_simple->worker = a_http_client->esocket->worker; + l_http_simple->worker = a_http_client->esocket->context->worker; l_http_simple->reply_size_max = DAP_HTTP_SIMPLE_URL_PROC( a_http_client->proc )->reply_size_max; l_http_simple->reply_byte = DAP_NEW_Z_SIZE(uint8_t, DAP_HTTP_SIMPLE(a_http_client)->reply_size_max ); @@ -411,7 +411,7 @@ void s_http_client_data_read( dap_http_client_t *a_http_client, void * a_arg ) // bool isOK=true; log_it( L_INFO,"Data for http_simple_request collected" ); - dap_events_socket_remove_from_worker_unsafe(a_http_client->esocket,a_http_client->esocket->worker); + dap_events_socket_remove_from_worker_unsafe(a_http_client->esocket,a_http_client->esocket->context->worker); dap_proc_queue_add_callback_inter( l_http_simple->worker->proc_queue_input , s_proc_queue_callback, l_http_simple); } } diff --git a/dap-sdk/net/server/notify_server/src/dap_notify_srv.c b/dap-sdk/net/server/notify_server/src/dap_notify_srv.c index 9b79bd6e7b..37df9da127 100644 --- a/dap-sdk/net/server/notify_server/src/dap_notify_srv.c +++ b/dap-sdk/net/server/notify_server/src/dap_notify_srv.c @@ -182,7 +182,7 @@ static void s_notify_server_callback_queue(dap_events_socket_t * a_es, void * a_ } size_t l_str_len = a_arg? strlen((char*)a_arg): 0; if(l_str_len){ - dap_events_socket_write_mt(l_socket_handler->esocket->worker, //_inter(a_es->worker->queue_es_io_input[l_worker_id], + dap_events_socket_write_mt(l_socket_handler->esocket->context->worker, //_inter(a_es->worker->queue_es_io_input[l_worker_id], l_socket_handler->uuid, a_arg, l_str_len + 1); } @@ -206,12 +206,12 @@ static void s_notify_server_callback_new(dap_events_socket_t * a_es, void * a_ar uint64_t *l_uuid_u64 =(uint64_t*) &a_es->uuid; log_it(L_WARNING,"Trying to add notify client with uuid 0x%016"DAP_UINT64_FORMAT_X" but already present this UUID in list, updating only esocket pointer if so", *l_uuid_u64); l_hh_new->esocket = a_es; - l_hh_new->worker_id = a_es->worker->id; + l_hh_new->worker_id = a_es->context->worker->id; }else { l_hh_new = DAP_NEW_Z(dap_events_socket_handler_hh_t); l_hh_new->esocket = a_es; l_hh_new->uuid = a_es->uuid; - l_hh_new->worker_id = a_es->worker->id; + l_hh_new->worker_id = a_es->context->worker->id; a_es->no_close = true; HASH_ADD(hh, s_notify_server_clients, uuid, sizeof (l_hh_new->uuid), l_hh_new); } diff --git a/dap-sdk/net/stream/stream/dap_stream.c b/dap-sdk/net/stream/stream/dap_stream.c index 6e781e3500..d49d08488c 100644 --- a/dap-sdk/net/stream/stream/dap_stream.c +++ b/dap-sdk/net/stream/stream/dap_stream.c @@ -266,7 +266,7 @@ dap_stream_t *s_stream_new(dap_http_client_t *a_http_client) dap_stream_t *l_ret = DAP_NEW_Z(dap_stream_t); l_ret->esocket = a_http_client->esocket; - l_ret->stream_worker = (dap_stream_worker_t *)a_http_client->esocket->worker->_inheritor; + l_ret->stream_worker = (dap_stream_worker_t *)a_http_client->esocket->context->worker->_inheritor; l_ret->conn_http = a_http_client; l_ret->buf_defrag_size = 0; l_ret->seq_id = 0; @@ -274,7 +274,7 @@ dap_stream_t *s_stream_new(dap_http_client_t *a_http_client) // Start server keep-alive timer dap_events_socket_uuid_t *l_es_uuid = DAP_NEW_Z(dap_events_socket_uuid_t); *l_es_uuid = l_ret->esocket->uuid; - l_ret->keepalive_timer = dap_timerfd_start_on_worker(l_ret->esocket->worker, + l_ret->keepalive_timer = dap_timerfd_start_on_worker(l_ret->esocket->context->worker, STREAM_KEEPALIVE_TIMEOUT * 1000, (dap_timerfd_callback_t)s_callback_server_keepalive, l_es_uuid); diff --git a/dap-sdk/net/stream/stream/dap_stream_worker.c b/dap-sdk/net/stream/stream/dap_stream_worker.c index 8e553a3da9..8d1fa9e515 100644 --- a/dap-sdk/net/stream/stream/dap_stream_worker.c +++ b/dap-sdk/net/stream/stream/dap_stream_worker.c @@ -96,7 +96,7 @@ int dap_stream_worker_init() */ static void s_ch_io_callback(dap_events_socket_t * a_es, void * a_msg) { - dap_stream_worker_t * l_stream_worker = DAP_STREAM_WORKER( a_es->worker ); + dap_stream_worker_t * l_stream_worker = DAP_STREAM_WORKER( a_es->context->worker ); dap_stream_worker_msg_io_t * l_msg = (dap_stream_worker_msg_io_t*) a_msg; assert(l_msg); diff --git a/modules/global-db/dap_chain_global_db.c b/modules/global-db/dap_chain_global_db.c index 3220ef0871..331c6a9bea 100644 --- a/modules/global-db/dap_chain_global_db.c +++ b/modules/global-db/dap_chain_global_db.c @@ -818,8 +818,8 @@ dap_events_socket_t *l_es; l_proc_que = s_global_db_proc_thread->_inheritor; l_es = l_proc_que->esocket; - l_wrk = l_proc_que->esocket->worker; - l_proc_thd = l_proc_que->esocket->proc_thread; + l_wrk = l_proc_que->esocket->context->worker; + l_proc_thd = l_proc_que->esocket->context->proc_thread; if ( !l_wrk && !l_proc_thd ) return log_it(L_ERROR, "Both <worker> or <proc_thread> contexts are NULL"), -EINVAL; diff --git a/modules/net/dap_chain_node_client.c b/modules/net/dap_chain_node_client.c index e27494f7e5..298c919251 100644 --- a/modules/net/dap_chain_node_client.c +++ b/modules/net/dap_chain_node_client.c @@ -330,9 +330,9 @@ static void s_stage_connected_callback(dap_client_t *a_client, void *a_arg) l_node_client->stream_worker = l_stream->stream_worker; if (l_node_client->keep_connection) { dap_events_socket_uuid_t *l_uuid = DAP_DUP(&l_node_client->uuid); - dap_worker_exec_callback_on(l_stream->esocket->worker, s_node_client_connected_synchro_start_callback, l_uuid); + dap_worker_exec_callback_on(l_stream->esocket->context->worker, s_node_client_connected_synchro_start_callback, l_uuid); dap_events_socket_uuid_t *l_uuid_timer = DAP_DUP(&l_node_client->uuid); - l_node_client->sync_timer = dap_timerfd_start_on_worker(l_stream->esocket->worker, + l_node_client->sync_timer = dap_timerfd_start_on_worker(l_stream->esocket->context->worker, s_timer_update_states * 1000, s_timer_update_states_callback, l_uuid_timer); diff --git a/modules/net/dap_chain_node_dns_client.c b/modules/net/dap_chain_node_dns_client.c index 3ed6c7f12b..ee4fdc06aa 100644 --- a/modules/net/dap_chain_node_dns_client.c +++ b/modules/net/dap_chain_node_dns_client.c @@ -68,7 +68,7 @@ static void s_dns_client_esocket_read_callback(dap_events_socket_t * a_esocket, size_t l_addr_point = DNS_HEADER_SIZE + strlen(l_dns_client->name) + 2 + 2 * sizeof(uint16_t) + DNS_ANSWER_SIZE - sizeof(uint32_t); if (l_recieved < l_addr_point + sizeof(uint32_t)) { log_it(L_WARNING, "DNS answer incomplete"); - l_dns_client->callback_error(a_esocket->worker, l_dns_client->result,l_dns_client->callbacks_arg,EIO ); + l_dns_client->callback_error(a_esocket->context->worker, l_dns_client->result,l_dns_client->callbacks_arg,EIO ); l_dns_client->is_callbacks_called = true; a_esocket->flags |= DAP_SOCK_SIGNAL_CLOSE; a_esocket->buf_in_size = a_esocket->buf_out_size = 0; @@ -78,7 +78,7 @@ static void s_dns_client_esocket_read_callback(dap_events_socket_t * a_esocket, int l_answers_count = ntohs(*(uint16_t *)l_cur); if (l_answers_count != 1) { log_it(L_WARNING, "Incorrect DNS answer format"); - l_dns_client->callback_error(a_esocket->worker, l_dns_client->result,l_dns_client->callbacks_arg,EINVAL); + l_dns_client->callback_error(a_esocket->context->worker, l_dns_client->result,l_dns_client->callbacks_arg,EINVAL); l_dns_client->is_callbacks_called = true; a_esocket->flags |= DAP_SOCK_SIGNAL_CLOSE; a_esocket->buf_in_size = a_esocket->buf_out_size = 0; @@ -101,7 +101,7 @@ static void s_dns_client_esocket_read_callback(dap_events_socket_t * a_esocket, } } - l_dns_client->callback_success(a_esocket->worker,l_dns_client->result,l_dns_client->callbacks_arg); + l_dns_client->callback_success(a_esocket->context->worker,l_dns_client->result,l_dns_client->callbacks_arg); l_dns_client->is_callbacks_called = true; a_esocket->flags |= DAP_SOCK_SIGNAL_CLOSE; a_esocket->buf_in_size = a_esocket->buf_out_size = 0; @@ -116,7 +116,7 @@ static void s_dns_client_esocket_error_callback(dap_events_socket_t * a_esocket, { struct dns_client * l_dns_client = (struct dns_client*) a_esocket->_inheritor; log_it(L_ERROR,"DNS client esocket error %d", a_error); - l_dns_client->callback_error(a_esocket->worker, l_dns_client->result,l_dns_client->callbacks_arg,a_error); + l_dns_client->callback_error(a_esocket->context->worker, l_dns_client->result,l_dns_client->callbacks_arg,a_error); l_dns_client->is_callbacks_called = true; } @@ -143,7 +143,7 @@ static bool s_dns_client_esocket_timeout_callback(void * a_arg) struct dns_client * l_dns_client = (struct dns_client*) l_es->_inheritor; log_it(L_WARNING,"DNS request timeout, bad network?"); if(! l_dns_client->is_callbacks_called ){ - l_dns_client->callback_error(l_es->worker,l_dns_client->result,l_dns_client->callbacks_arg,ETIMEDOUT); + l_dns_client->callback_error(l_es->context->worker,l_dns_client->result,l_dns_client->callbacks_arg,ETIMEDOUT); l_dns_client->is_callbacks_called = true; } @@ -163,7 +163,7 @@ static void s_dns_client_esocket_delete_callback(dap_events_socket_t * a_esocket (void) a_arg; struct dns_client * l_dns_client = (struct dns_client*) a_esocket->_inheritor; if(! l_dns_client->is_callbacks_called ) - l_dns_client->callback_error(a_esocket->worker,l_dns_client->result,l_dns_client->callbacks_arg,EBUSY); + l_dns_client->callback_error(a_esocket->context->worker,l_dns_client->result,l_dns_client->callbacks_arg,EBUSY); if(l_dns_client->name) DAP_DELETE(l_dns_client->name); DAP_DEL_Z(l_dns_client->buf); -- GitLab