From d7106fee1828bd7853d47d0b9f2f1328d160d3bb Mon Sep 17 00:00:00 2001 From: Dmitriy Gerasimov <naeper@demlabs.net> Date: Tue, 9 Feb 2021 21:17:56 +0700 Subject: [PATCH] [+] More kqueue polling --- dap-sdk/net/core/dap_events.c | 15 +++- dap-sdk/net/core/dap_events_socket.c | 79 ++++++++++++++++++-- dap-sdk/net/core/dap_proc_thread.c | 58 +++++++++++++- dap-sdk/net/core/dap_worker.c | 47 ++++++++++-- dap-sdk/net/core/include/dap_events_socket.h | 21 +++++- dap-sdk/net/core/include/dap_proc_thread.h | 4 + dap-sdk/net/core/include/dap_server.h | 2 + dap-sdk/net/core/include/dap_timerfd.h | 6 +- dap-sdk/net/core/include/dap_worker.h | 7 ++ 9 files changed, 213 insertions(+), 26 deletions(-) diff --git a/dap-sdk/net/core/dap_events.c b/dap-sdk/net/core/dap_events.c index aea5f2b593..80e5ff6277 100644 --- a/dap-sdk/net/core/dap_events.c +++ b/dap-sdk/net/core/dap_events.c @@ -50,11 +50,16 @@ #include <sys/timerfd.h> #endif + #include <pthread.h> #ifdef DAP_OS_BSD #include <pthread_np.h> +#include <sys/event.h> +#include <err.h> +#include <fcntl.h> typedef cpuset_t cpu_set_t; // Adopt BSD CPU setstructure to POSIX variant + #endif #if defined(DAP_OS_ANDROID) @@ -257,11 +262,11 @@ int dap_events_start( dap_events_t *a_events ) l_worker->id = i; l_worker->events = a_events; pthread_rwlock_init(&l_worker->esocket_rwlock,NULL); - -#ifdef DAP_EVENTS_CAPS_EPOLL - l_worker->epoll_fd = epoll_create( DAP_MAX_EVENTS_COUNT ); pthread_mutex_init(& l_worker->started_mutex, NULL); pthread_cond_init( & l_worker->started_cond, NULL); + +#if defined(DAP_EVENTS_CAPS_EPOLL) + l_worker->epoll_fd = epoll_create( DAP_MAX_EVENTS_COUNT ); //log_it(L_DEBUG, "Created event_fd %d for worker %u", l_worker->epoll_fd,i); #ifdef DAP_OS_WINDOWS if (!l_worker->epoll_fd) { @@ -276,6 +281,10 @@ int dap_events_start( dap_events_t *a_events ) DAP_DELETE(l_worker); return -1; } +#elif defined(DAP_EVENTS_CAPS_POLL) +#elif defined(DAP_EVENTS_CAPS_KQUEUE) +#else +#error "Not defined worker init for your platform" #endif s_workers[i] = l_worker; pthread_mutex_lock(&l_worker->started_mutex); diff --git a/dap-sdk/net/core/dap_events_socket.c b/dap-sdk/net/core/dap_events_socket.c index 1479af7174..802cc1ba57 100644 --- a/dap-sdk/net/core/dap_events_socket.c +++ b/dap-sdk/net/core/dap_events_socket.c @@ -29,20 +29,26 @@ #include <assert.h> #include <errno.h> -#ifndef _WIN32 +#if defined (DAP_OS_LINUX) #include <sys/epoll.h> #include <sys/types.h> #include <sys/select.h> #include <unistd.h> #include <sys/socket.h> #include <arpa/inet.h> +#elif defined (DAP_OS_BSD) +#include <sys/types.h> +#include <sys/select.h> +#include <unistd.h> +#include <sys/socket.h> +#include <arpa/inet.h> - -#else +#elif defined (DAP_OS_WINDOWS) #include <winsock2.h> #include <windows.h> #include <mswsock.h> #include <io.h> + #endif @@ -52,6 +58,14 @@ #include <sys/resource.h> #endif +#ifdef DAP_OS_BSD +#include <pthread_np.h> +#include <sys/event.h> +#include <err.h> +typedef cpuset_t cpu_set_t; // Adopt BSD CPU setstructure to POSIX variant +#endif + + #include <fcntl.h> #include <pthread.h> @@ -95,7 +109,6 @@ int dap_events_socket_init( ) #if defined (DAP_EVENTS_CAPS_QUEUE_MQUEUE) #include <sys/time.h> #include <sys/resource.h> - struct rlimit l_mqueue_limit; l_mqueue_limit.rlim_cur = RLIM_INFINITY; l_mqueue_limit.rlim_max = RLIM_INFINITY; @@ -161,6 +174,10 @@ dap_events_socket_t *dap_events_socket_wrap_no_add( dap_events_t *a_events, l_ret->ev_base_flags = EPOLLERR | EPOLLRDHUP | EPOLLHUP; #elif defined(DAP_EVENTS_CAPS_POLL) l_ret->poll_base_flags = POLLERR | POLLRDHUP | POLLHUP; + #elif defined(DAP_EVENTS_CAPS_KQUEUE) + l_ret->kqueue_base_flags = EV_ADD | EV_ENABLE | EV_CLEAR; + l_ret->kqueue_base_fflags = NOTE_CLOSE | NOTE_CLOSE_WRITE | NOTE_DELETE | NOTE_REVOKE ; + l_ret->kqueue_base_filter = EVFILT_VNODE; #endif if ( a_sock!= 0 && a_sock != -1){ @@ -259,6 +276,10 @@ dap_events_socket_t * s_create_type_pipe(dap_worker_t * a_w, dap_events_socket_c l_es->ev_base_flags = EPOLLIN | EPOLLERR | EPOLLRDHUP | EPOLLHUP; #elif defined(DAP_EVENTS_CAPS_POLL) l_es->poll_base_flags = POLLIN | POLLERR | POLLRDHUP | POLLHUP; +#elif defined(DAP_EVENTS_CAPS_KQUEUE) + l_es->kqueue_base_flags = EV_ADD | EV_ENABLE | EV_CLEAR; + l_es->kqueue_base_fflags = NOTE_CLOSE | NOTE_CLOSE_WRITE | NOTE_DELETE | NOTE_REVOKE ; + l_es->kqueue_base_filter = EVFILT_VNODE; #else #error "Not defined s_create_type_pipe for your platform" #endif @@ -326,7 +347,7 @@ dap_events_socket_t * dap_events_socket_create(dap_events_desc_type_t a_type, da #ifdef DAP_OS_UNIX l_sock_class = AF_LOCAL; #elif defined DAP_OS_WINDOWS - l_sock_class = AF_UNIX; + l_sock_class = AF_INET; #endif break; default: @@ -407,9 +428,16 @@ dap_events_socket_t * dap_events_socket_queue_ptr_create_input(dap_events_socket l_es->ev_base_flags = EPOLLERR | EPOLLRDHUP | EPOLLHUP; #elif defined(DAP_EVENTS_CAPS_POLL) l_es->poll_base_flags = POLLERR | POLLRDHUP | POLLHUP; +#elif defined(DAP_EVENTS_CAPS_KQUEUE) + // We don't create descriptor for kqueue at all + l_es->fd = -1; + l_es->kqueue_base_flags = EV_ADD | EV_ENABLE | EV_CLEAR; + l_es->kqueue_base_fflags = NOTE_TRIGGER; + l_es->kqueue_base_filter = EVFILT_USER; #else #error "Not defined s_create_type_pipe for your platform" #endif + #ifdef DAP_EVENTS_CAPS_QUEUE_MQUEUE l_es->mqd = a_es->mqd; char l_mq_name[64]; @@ -436,7 +464,7 @@ dap_events_socket_t * dap_events_socket_queue_ptr_create_input(dap_events_socket return NULL; } assert(l_es->mqd); -#elif defined (DAP_EVENTS_CAPS_QUEUE_PIPE2) +#elif defined (DAP_EVENTS_CAPS_QUEUE_PIPE2) || defined (DAP_EVENTS_CAPS_QUEUE_PIPE) l_es->fd = a_es->fd2; #elif defined DAP_EVENTS_CAPS_MSMQ l_es->mqh = a_es->mqh; @@ -474,6 +502,9 @@ dap_events_socket_t * dap_events_socket_queue_ptr_create_input(dap_events_socket log_it(L_ERROR, "Can't open message queue for queue type, error: 0x%x", hr); return NULL; } +#elif defined (DAP_EVENTS_CAPS_KQUEUE) + // We don't create descriptor for kqueue at all + l_es->fd = l_es->fd2 = -1; #else #error "Not defined dap_events_socket_queue_ptr_create_input() for this platform" #endif @@ -512,17 +543,25 @@ dap_events_socket_t * s_create_type_queue_ptr(dap_worker_t * a_w, dap_events_soc l_es->ev_base_flags = EPOLLIN | EPOLLERR | EPOLLRDHUP | EPOLLHUP; #elif defined(DAP_EVENTS_CAPS_POLL) l_es->poll_base_flags = POLLIN | POLLERR | POLLRDHUP | POLLHUP; +#elif defined(DAP_EVENTS_CAPS_KQUEUE) + l_es->kqueue_base_flags = EV_ADD | EV_ENABLE | EV_CLEAR; + l_es->kqueue_base_fflags = NOTE_TRIGGER; + l_es->kqueue_base_filter = EVFILT_USER; #else #error "Not defined s_create_type_queue_ptr for your platform" #endif -#ifdef DAP_EVENTS_CAPS_QUEUE_PIPE2 +#if defined(DAP_EVENTS_CAPS_QUEUE_PIPE2) || defined(DAP_EVENTS_CAPS_QUEUE_PIPE) int l_pipe[2]; int l_errno; char l_errbuf[128]; l_errbuf[0]=0; +#if defined(DAP_EVENTS_CAPS_QUEUE_PIPE2) if( pipe2(l_pipe,O_DIRECT | O_NONBLOCK ) < 0 ){ +#elif defined(DAP_EVENTS_CAPS_QUEUE_PIPE) + if( pipe(l_pipe) < 0 ){ +#endif l_errno = errno; strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); switch (l_errno) { @@ -531,10 +570,27 @@ dap_events_socket_t * s_create_type_queue_ptr(dap_worker_t * a_w, dap_events_soc } DAP_DELETE(l_es); return NULL; - }//else + } + //else // log_it(L_DEBUG, "Created one-way unnamed packet pipe %d->%d", l_pipe[0], l_pipe[1]); l_es->fd = l_pipe[0]; l_es->fd2 = l_pipe[1]; + +#if defined(DAP_EVENTS_CAPS_QUEUE_PIPE) + // If we have no pipe2() we should set nonblock mode via fcntl + if (l_es->fd > 0 && l_es->fd2 > 0 ) { + int l_flags = fcntl(l_es->fd, F_GETFL, 0); + if (l_flags != -1){ + l_flags |= O_NONBLOCK); + fcntl(l_es->fd, F_SETFL, l_flags) == 0); + } + l_flags = fcntl(l_es->fd2, F_GETFL, 0); + if (l_flags != -1){ + l_flags |= O_NONBLOCK); + fcntl(l_es->fd2, F_SETFL, l_flags) == 0); + } + } +#endif #if !defined (DAP_OS_ANDROID) FILE* l_sys_max_pipe_size_fd = fopen("/proc/sys/fs/pipe-max-size", "r"); @@ -678,6 +734,9 @@ dap_events_socket_t * s_create_type_queue_ptr(dap_worker_t * a_w, dap_events_soc if (hr != MQ_OK) { log_it(L_DEBUG, "Message queue %d NOT purged, possible data corruption, err %d", l_es->mq_num, hr); } +#elif defined (DAP_EVENTS_CAPS_KQUEUE) + // We don't create descriptor for kqueue at all + l_es->fd = l_es->fd2 = -1; #else #error "Not implemented s_create_type_queue_ptr() on your platform" @@ -834,6 +893,10 @@ dap_events_socket_t * s_create_type_event(dap_worker_t * a_w, dap_events_socket_ l_es->ev_base_flags = EPOLLIN | EPOLLERR | EPOLLRDHUP | EPOLLHUP; #elif defined(DAP_EVENTS_CAPS_POLL) l_es->poll_base_flags = POLLIN | POLLERR | POLLRDHUP | POLLHUP; +#elif defined(DAP_EVENTS_CAPS_KQUEUE) + l_es->kqueue_base_flags = EV_ADD | EV_ENABLE | EV_CLEAR; + l_es->kqueue_base_fflags = NOTE_CLOSE | NOTE_CLOSE_WRITE | NOTE_DELETE | NOTE_REVOKE ; + l_es->kqueue_base_filter = EVFILT_VNODE; #else #error "Not defined s_create_type_event for your platform" #endif diff --git a/dap-sdk/net/core/dap_proc_thread.c b/dap-sdk/net/core/dap_proc_thread.c index ee68c3c437..ad16735a88 100644 --- a/dap-sdk/net/core/dap_proc_thread.c +++ b/dap-sdk/net/core/dap_proc_thread.c @@ -31,6 +31,11 @@ #include "wepoll.h" #elif defined (DAP_EVENTS_CAPS_POLL) #include <poll.h> +#elif defined (DAP_EVENTS_CAPS_KQUEUE) +#include <pthread_np.h> +#include <sys/event.h> +#include <err.h> +typedef cpuset_t cpu_set_t; // Adopt BSD CPU setstructure to POSIX variant #else #error "Unimplemented poll for this platform" #endif @@ -209,6 +214,22 @@ int dap_proc_thread_esocket_update_poll_flags(dap_proc_thread_t * a_thread, dap_ a_thread->poll[a_esocket->poll_index].revents |= POLLIN; if( a_esocket->flags & DAP_SOCK_READY_TO_WRITE) a_thread->poll[a_esocket->poll_index].revents |= POLLOUT; + +#elif defined (DAP_EVENTS_CAPS_KQUEUE) + u_short l_flags = a_esocket->kqueue_base_flags; + u_int l_fflags = a_esocket->kqueue_base_fflags; + short l_filter = a_esocket->kqueue_base_filter; + if(a_esocket->flags & DAP_SOCK_READY_TO_READ ) + l_fflags |= NOTE_READ; + if(a_esocket->flags & DAP_SOCK_READY_TO_WRITE ) + l_fflags |= NOTE_WRITE; + EV_SET(&a_esocket->kqueue_event , a_esocket->socket, l_filter, EV_ADD| l_flags | EV_CLEAR, l_fflags,0, a_esocket); + + if( kevent ( a_thread->kqueue_fd,&a_esocket->kqueue_event,1,NULL,0,NULL)!=1 ){ + log_it(L_CRITICAL, "Can't add descriptor in proc thread kqueue , err: %d", errno); + return -1; + } + #else #error "Not defined dap_proc_thread.c::s_update_poll_flags() on your platform" #endif @@ -404,6 +425,25 @@ static void * s_proc_thread_function(void * a_arg) l_thread->poll_count++; } } +#elif defined (DAP_EVENTS_CAPS_KQUEUE) + // Create kqueue fd + l_thread->kqueue_fd = kqueue(); + l_thread->kqueue_events_count_max = DAP_EVENTS_SOCKET_MAX; + l_thread->kqueue_events = DAP_NEW_Z_SIZE(struct kevent, l_worker->kqueue_events_count_max *sizeof(struct kevent)); + + dap_proc_thread_assign_esocket_unsafe(l_thread,l_thread->proc_queue->esocket); + dap_proc_thread_assign_esocket_unsafe(l_thread,l_thread->proc_event); + + for (size_t n = 0; n< dap_events_worker_get_count(); n++){ + // Queue asssign + dap_proc_thread_assign_esocket_unsafe(l_thread, l_thread->queue_assign_input[n]); + + // Queue IO + dap_proc_thread_assign_esocket_unsafe(l_thread, l_thread->queue_io_input[n]); + + // Queue callback + dap_proc_thread_assign_esocket_unsafe(l_thread, l_thread->queue_callback_input[n]); + } #else #error "Unimplemented poll events analog for this platform" @@ -416,13 +456,18 @@ static void * s_proc_thread_function(void * a_arg) // Main loop while (! l_thread->signal_kill){ + int l_selected_sockets; + size_t l_sockets_max; #ifdef DAP_EVENTS_CAPS_EPOLL //log_it(L_DEBUG, "Epoll_wait call"); - int l_selected_sockets = epoll_wait(l_thread->epoll_ctl, l_epoll_events, DAP_EVENTS_SOCKET_MAX, -1); - size_t l_sockets_max = (size_t)l_selected_sockets; + l_selected_sockets = epoll_wait(l_thread->epoll_ctl, l_epoll_events, DAP_EVENTS_SOCKET_MAX, -1); + l_sockets_max = (size_t)l_selected_sockets; #elif defined (DAP_EVENTS_CAPS_POLL) - int l_selected_sockets = poll(l_thread->poll,l_thread->poll_count,-1); - size_t l_sockets_max = l_thread->poll_count; + l_selected_sockets = poll(l_thread->poll,l_thread->poll_count,-1); + l_sockets_max = l_thread->poll_count; +#elif defined (DAP_EVENTS_CAPS_KQUEUE) + l_selected_sockets = kevent(l_thread->kqueue_fd,NULL,0,l_thread->kqueue_events,l_thread->kqueue_events_count_max,NULL); + l_sockets_max = l_thread->kqueue_events_count_max; #else #error "Unimplemented poll wait analog for this platform" #endif @@ -472,6 +517,11 @@ static void * s_proc_thread_function(void * a_arg) l_flag_nval = l_cur_events & POLLNVAL; l_flag_pri = l_cur_events & POLLPRI; l_flag_msg = l_cur_events & POLLMSG; +#elif defined (DAP_EVENTS_CAPS_KQUEUE) + l_cur = (dap_events_socket_t*) l_thread->kqueue_events[n].udata; + u_int l_cur_flags = l_thread->kqueue_events[n].fflags; + l_flag_write = l_cur_flags & EVFILT_WRITE; + l_flag_read = l_cur_flags & EVFILT_READ; #else #error "Unimplemented fetch esocket after poll" #endif diff --git a/dap-sdk/net/core/dap_worker.c b/dap-sdk/net/core/dap_worker.c index 81e769d424..a0b7d72134 100644 --- a/dap-sdk/net/core/dap_worker.c +++ b/dap-sdk/net/core/dap_worker.c @@ -101,6 +101,7 @@ void *dap_worker_thread(void *arg) uint32_t l_tn = l_worker->id; dap_cpu_assign_thread_on(l_worker->id); + pthread_setspecific(l_worker->events->pth_key_worker, l_worker); struct sched_param l_shed_params; l_shed_params.sched_priority = 0; #ifdef DAP_OS_WINDOWS @@ -113,6 +114,18 @@ void *dap_worker_thread(void *arg) #ifdef DAP_EVENTS_CAPS_EPOLL struct epoll_event l_epoll_events[ DAP_EVENTS_SOCKET_MAX]= {{0}}; log_it(L_INFO, "Worker #%d started with epoll fd %d and assigned to dedicated CPU unit", l_worker->id, l_worker->epoll_fd); +#elif defined(DAP_EVENTS_CAPS_KQUEUE) + l_worker->kqueue_fd = kqueue(); + if (l_worker->kqueue_fd == -1 ){ + int l_errno = errno; + char l_errbuf[255]; + strerror_r(l_errno,l_errbuf,sizeof(l_errbuf)); + log_it (L_CRITICAL,"Can't create kqueue():\"\" code %d",l_errbuf,l_errno); + pthread_cond_broadcast(&l_worker->started_cond); + return NULL; + } + l_worker->kqueue_events_count_max = DAP_EVENTS_SOCKET_MAX; + l_worker->kqueue_events = DAP_NEW_Z_SIZE(struct kevent, l_worker->kqueue_events_count_max *sizeof(struct kevent)); #elif defined(DAP_EVENTS_CAPS_POLL) l_worker->poll_count_max = DAP_EVENTS_SOCKET_MAX; l_worker->poll = DAP_NEW_Z_SIZE(struct pollfd,l_worker->poll_count_max*sizeof (struct pollfd)); @@ -137,16 +150,19 @@ void *dap_worker_thread(void *arg) l_worker->timer_check_activity = dap_timerfd_start_on_worker( l_worker, s_connection_timeout * 1000 / 2, s_socket_all_check_activity, l_worker); - pthread_setspecific(l_worker->events->pth_key_worker, l_worker); - pthread_cond_broadcast(&l_worker->started_cond); bool s_loop_is_active = true; while(s_loop_is_active) { + int l_selected_sockets; + size_t l_sockets_max; #ifdef DAP_EVENTS_CAPS_EPOLL - int l_selected_sockets = epoll_wait(l_worker->epoll_fd, l_epoll_events, DAP_EVENTS_SOCKET_MAX, -1); - size_t l_sockets_max = l_selected_sockets; + l_selected_sockets = epoll_wait(l_worker->epoll_fd, l_epoll_events, DAP_EVENTS_SOCKET_MAX, -1); + l_sockets_max = l_selected_sockets; #elif defined(DAP_EVENTS_CAPS_POLL) - int l_selected_sockets = poll(l_worker->poll, l_worker->poll_count, -1); - size_t l_sockets_max = l_worker->poll_count; + l_selected_sockets = poll(l_worker->poll, l_worker->poll_count, -1); + l_sockets_max = l_worker->poll_count; +#elif defined(DAP_EVENTS_CAPS_KQUEUE) + l_selected_sockets = kevent(l_worker->kqueue_fd,NULL,0,l_worker->kqueue_events,l_worker->kqueue_events_count_max,NULL); + l_sockets_max = l_worker->kqueue_events_count_max; #else #error "Unimplemented poll wait analog for this platform" #endif @@ -197,6 +213,12 @@ void *dap_worker_thread(void *arg) l_flag_msg = l_cur_events & POLLMSG; l_cur = l_worker->poll_esocket[n]; //log_it(L_DEBUG, "flags: returned events 0x%0X requested events 0x%0X",l_worker->poll[n].revents,l_worker->poll[n].events ); +#elif defined (DAP_EVENTS_CAPS_KQUEUE) + l_cur = (dap_events_socket_t*) l_worker->kqueue_events[n].udata; + u_int l_cur_flags = l_worker->kqueue_events[n].fflags; + l_flag_write = l_cur_flags & EVFILT_WRITE; + l_flag_read = l_cur_flags & EVFILT_READ; + #else #error "Unimplemented fetch esocket after poll" #endif @@ -546,7 +568,6 @@ void *dap_worker_thread(void *arg) case DESCRIPTOR_TYPE_QUEUE: if (l_cur->flags & DAP_SOCK_QUEUE_PTR && l_cur->buf_out_size>= sizeof (void*)){ #if defined(DAP_EVENTS_CAPS_QUEUE_PIPE2) - l_bytes_sent = write(l_cur->socket, l_cur->buf_out, sizeof (void *) ); // We send pointer by pointer #elif defined (DAP_EVENTS_CAPS_QUEUE_POSIX) l_bytes_sent = mq_send(a_es->mqd, (const char *)&a_arg,sizeof (a_arg),0); @@ -960,6 +981,18 @@ int dap_worker_add_events_socket_unsafe( dap_events_socket_t * a_esocket, dap_wo a_worker->poll_esocket[a_worker->poll_count] = a_esocket; a_worker->poll_count++; return 0; +#elif defined (DAP_EVENTS_CAPS_KQUEUE) + u_short l_flags = a_esocket->kqueue_base_flags; + u_int l_fflags = a_esocket->kqueue_base_fflags; + short l_filter = a_esocket->kqueue_base_filter; + if(a_esocket->flags & DAP_SOCK_READY_TO_READ ) + l_fflags |= NOTE_READ; + if(a_esocket->flags & DAP_SOCK_READY_TO_WRITE ) + l_fflags |= NOTE_WRITE; + + EV_SET(&a_esocket->kqueue_event , a_esocket->socket, l_filter, EV_ADD| l_flags | EV_CLEAR, l_fflags,0, a_esocket); + return kevent ( a_worker->kqueue_fd,&a_esocket->kqueue_event,1,NULL,0,NULL)==1 ? 0 : -1 ; + #else #error "Unimplemented new esocket on worker callback for current platform" #endif diff --git a/dap-sdk/net/core/include/dap_events_socket.h b/dap-sdk/net/core/include/dap_events_socket.h index 07b45c23ef..8c1a2b4a15 100644 --- a/dap-sdk/net/core/include/dap_events_socket.h +++ b/dap-sdk/net/core/include/dap_events_socket.h @@ -55,10 +55,16 @@ typedef int SOCKET; #include <netinet/in.h> #include <sys/eventfd.h> #include <mqueue.h> -#elif defined (DAP_OS_UNIX) +#elif defined (DAP_OS_BSD) #define DAP_EVENTS_CAPS_KQUEUE #define DAP_EVENTS_CAPS_PIPE_POSIX #define DAP_EVENTS_CAPS_EVENT_KEVENT + #define DAP_EVENTS_CAPS_QUEUE_KEVENT + #include <netinet/in.h> +#elif defined (DAP_OS_UNIX) + #define DAP_EVENTS_CAPS_POLL + #define DAP_EVENTS_CAPS_PIPE_POSIX + #define DAP_EVENTS_CAPS_EVENT_PIPE #define DAP_EVENTS_CAPS_QUEUE_SOCKETPAIR #include <netinet/in.h> #elif defined (DAP_OS_WINDOWS) @@ -161,7 +167,7 @@ typedef struct dap_events_socket { mqd_t mqd; }; uint32_t mqd_id; -#elif defined DAP_EVENTS_CAPS_MSMQ +#elif defined(DAP_EVENTS_CAPS_MSMQ) }; QUEUEHANDLE mqh, mqh_recv; u_int mq_num; @@ -171,9 +177,11 @@ typedef struct dap_events_socket { }; #endif -#if defined DAP_EVENTS_CAPS_PIPE_POSIX +#if defined(DAP_EVENTS_CAPS_PIPE_POSIX) int fd2; #endif + + dap_events_desc_type_t type; uint128_t uuid; // Unique UID // Related sockets (be careful - possible problems, delete them before ) @@ -232,6 +240,13 @@ typedef struct dap_events_socket { #elif defined (DAP_EVENTS_CAPS_POLL) short poll_base_flags; uint32_t poll_index; // index in poll array on worker +#elif defined (DAP_EVENTS_CAPS_KQUEUE) + struct kevent kqueue_event; + short kqueue_base_filter; + u_short kqueue_base_flags; + u_int kqueue_base_fflags; + int64_t kqueue_data; + uint64_t kqueue_ext[4]; #endif dap_events_socket_callbacks_t callbacks; diff --git a/dap-sdk/net/core/include/dap_proc_thread.h b/dap-sdk/net/core/include/dap_proc_thread.h index 8f307aff59..7c46f7805b 100644 --- a/dap-sdk/net/core/include/dap_proc_thread.h +++ b/dap-sdk/net/core/include/dap_proc_thread.h @@ -52,6 +52,10 @@ typedef struct dap_proc_thread{ dap_events_socket_t ** esockets; size_t poll_count; size_t poll_count_max; +#elif defined (DAP_EVENTS_CAPS_KQUEUE) + int kqueue_fd; + struct kevent * kqueue_events; + int kqueue_events_count; #else #error "No poll for proc thread for your platform" #endif diff --git a/dap-sdk/net/core/include/dap_server.h b/dap-sdk/net/core/include/dap_server.h index 4db1868453..d201033231 100644 --- a/dap-sdk/net/core/include/dap_server.h +++ b/dap-sdk/net/core/include/dap_server.h @@ -39,6 +39,8 @@ #define MSG_DONTWAIT 0 #define MSG_NOSIGNAL 0 #include "winsock.h" +#elif defined(DAP_OS_BSD) + #else #error "No poll headers for your platform" #endif diff --git a/dap-sdk/net/core/include/dap_timerfd.h b/dap-sdk/net/core/include/dap_timerfd.h index d6c7f003f7..0bf5dff9eb 100644 --- a/dap-sdk/net/core/include/dap_timerfd.h +++ b/dap-sdk/net/core/include/dap_timerfd.h @@ -27,9 +27,13 @@ #include <stdio.h> #include <stdlib.h> #include <unistd.h> -#if defined DAP_OS_UNIX + + +#if defined DAP_OS_LINUX #include <sys/time.h> #include <sys/timerfd.h> +#elif defined DAP_OS_BSD +#include <sys/event.h> #elif defined DAP_OS_WINDOWS #define _MSEC -10000 #endif diff --git a/dap-sdk/net/core/include/dap_worker.h b/dap-sdk/net/core/include/dap_worker.h index a88a9e4599..df6591861d 100644 --- a/dap-sdk/net/core/include/dap_worker.h +++ b/dap-sdk/net/core/include/dap_worker.h @@ -65,6 +65,7 @@ typedef struct dap_worker #if defined DAP_EVENTS_CAPS_MSMQ HANDLE msmq_events[MAXIMUM_WAIT_OBJECTS]; #endif + #if defined DAP_EVENTS_CAPS_EPOLL EPOLL_HANDLE epoll_fd; #elif defined ( DAP_EVENTS_CAPS_POLL) @@ -74,6 +75,12 @@ typedef struct dap_worker size_t poll_count; size_t poll_count_max; bool poll_compress; // Some of fd's became NULL so arrays need to be reassigned +#elif defined (DAP_EVENTS_CAPS_KQUEUE) + int kqueue_fd; + struct kevent * kqueue_events; + int kqueue_events_count_max; +#else +#error "Not defined worker for your platform" #endif pthread_cond_t started_cond; pthread_mutex_t started_mutex; -- GitLab