Skip to content
Snippets Groups Projects
Commit d7106fee authored by dmitriy.gerasimov's avatar dmitriy.gerasimov
Browse files

[+] More kqueue polling

parent 1c5718b3
No related branches found
No related tags found
No related merge requests found
Pipeline #7176 passed with stage
in 10 seconds
......@@ -50,11 +50,16 @@
#include <sys/timerfd.h>
#endif
#include <pthread.h>
#ifdef DAP_OS_BSD
#include <pthread_np.h>
#include <sys/event.h>
#include <err.h>
#include <fcntl.h>
typedef cpuset_t cpu_set_t; // Adopt BSD CPU setstructure to POSIX variant
#endif
#if defined(DAP_OS_ANDROID)
......@@ -257,11 +262,11 @@ int dap_events_start( dap_events_t *a_events )
l_worker->id = i;
l_worker->events = a_events;
pthread_rwlock_init(&l_worker->esocket_rwlock,NULL);
#ifdef DAP_EVENTS_CAPS_EPOLL
l_worker->epoll_fd = epoll_create( DAP_MAX_EVENTS_COUNT );
pthread_mutex_init(& l_worker->started_mutex, NULL);
pthread_cond_init( & l_worker->started_cond, NULL);
#if defined(DAP_EVENTS_CAPS_EPOLL)
l_worker->epoll_fd = epoll_create( DAP_MAX_EVENTS_COUNT );
//log_it(L_DEBUG, "Created event_fd %d for worker %u", l_worker->epoll_fd,i);
#ifdef DAP_OS_WINDOWS
if (!l_worker->epoll_fd) {
......@@ -276,6 +281,10 @@ int dap_events_start( dap_events_t *a_events )
DAP_DELETE(l_worker);
return -1;
}
#elif defined(DAP_EVENTS_CAPS_POLL)
#elif defined(DAP_EVENTS_CAPS_KQUEUE)
#else
#error "Not defined worker init for your platform"
#endif
s_workers[i] = l_worker;
pthread_mutex_lock(&l_worker->started_mutex);
......
......@@ -29,20 +29,26 @@
#include <assert.h>
#include <errno.h>
#ifndef _WIN32
#if defined (DAP_OS_LINUX)
#include <sys/epoll.h>
#include <sys/types.h>
#include <sys/select.h>
#include <unistd.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#elif defined (DAP_OS_BSD)
#include <sys/types.h>
#include <sys/select.h>
#include <unistd.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#else
#elif defined (DAP_OS_WINDOWS)
#include <winsock2.h>
#include <windows.h>
#include <mswsock.h>
#include <io.h>
#endif
......@@ -52,6 +58,14 @@
#include <sys/resource.h>
#endif
#ifdef DAP_OS_BSD
#include <pthread_np.h>
#include <sys/event.h>
#include <err.h>
typedef cpuset_t cpu_set_t; // Adopt BSD CPU setstructure to POSIX variant
#endif
#include <fcntl.h>
#include <pthread.h>
......@@ -95,7 +109,6 @@ int dap_events_socket_init( )
#if defined (DAP_EVENTS_CAPS_QUEUE_MQUEUE)
#include <sys/time.h>
#include <sys/resource.h>
struct rlimit l_mqueue_limit;
l_mqueue_limit.rlim_cur = RLIM_INFINITY;
l_mqueue_limit.rlim_max = RLIM_INFINITY;
......@@ -161,6 +174,10 @@ dap_events_socket_t *dap_events_socket_wrap_no_add( dap_events_t *a_events,
l_ret->ev_base_flags = EPOLLERR | EPOLLRDHUP | EPOLLHUP;
#elif defined(DAP_EVENTS_CAPS_POLL)
l_ret->poll_base_flags = POLLERR | POLLRDHUP | POLLHUP;
#elif defined(DAP_EVENTS_CAPS_KQUEUE)
l_ret->kqueue_base_flags = EV_ADD | EV_ENABLE | EV_CLEAR;
l_ret->kqueue_base_fflags = NOTE_CLOSE | NOTE_CLOSE_WRITE | NOTE_DELETE | NOTE_REVOKE ;
l_ret->kqueue_base_filter = EVFILT_VNODE;
#endif
if ( a_sock!= 0 && a_sock != -1){
......@@ -259,6 +276,10 @@ dap_events_socket_t * s_create_type_pipe(dap_worker_t * a_w, dap_events_socket_c
l_es->ev_base_flags = EPOLLIN | EPOLLERR | EPOLLRDHUP | EPOLLHUP;
#elif defined(DAP_EVENTS_CAPS_POLL)
l_es->poll_base_flags = POLLIN | POLLERR | POLLRDHUP | POLLHUP;
#elif defined(DAP_EVENTS_CAPS_KQUEUE)
l_es->kqueue_base_flags = EV_ADD | EV_ENABLE | EV_CLEAR;
l_es->kqueue_base_fflags = NOTE_CLOSE | NOTE_CLOSE_WRITE | NOTE_DELETE | NOTE_REVOKE ;
l_es->kqueue_base_filter = EVFILT_VNODE;
#else
#error "Not defined s_create_type_pipe for your platform"
#endif
......@@ -326,7 +347,7 @@ dap_events_socket_t * dap_events_socket_create(dap_events_desc_type_t a_type, da
#ifdef DAP_OS_UNIX
l_sock_class = AF_LOCAL;
#elif defined DAP_OS_WINDOWS
l_sock_class = AF_UNIX;
l_sock_class = AF_INET;
#endif
break;
default:
......@@ -407,9 +428,16 @@ dap_events_socket_t * dap_events_socket_queue_ptr_create_input(dap_events_socket
l_es->ev_base_flags = EPOLLERR | EPOLLRDHUP | EPOLLHUP;
#elif defined(DAP_EVENTS_CAPS_POLL)
l_es->poll_base_flags = POLLERR | POLLRDHUP | POLLHUP;
#elif defined(DAP_EVENTS_CAPS_KQUEUE)
// We don't create descriptor for kqueue at all
l_es->fd = -1;
l_es->kqueue_base_flags = EV_ADD | EV_ENABLE | EV_CLEAR;
l_es->kqueue_base_fflags = NOTE_TRIGGER;
l_es->kqueue_base_filter = EVFILT_USER;
#else
#error "Not defined s_create_type_pipe for your platform"
#endif
#ifdef DAP_EVENTS_CAPS_QUEUE_MQUEUE
l_es->mqd = a_es->mqd;
char l_mq_name[64];
......@@ -436,7 +464,7 @@ dap_events_socket_t * dap_events_socket_queue_ptr_create_input(dap_events_socket
return NULL;
}
assert(l_es->mqd);
#elif defined (DAP_EVENTS_CAPS_QUEUE_PIPE2)
#elif defined (DAP_EVENTS_CAPS_QUEUE_PIPE2) || defined (DAP_EVENTS_CAPS_QUEUE_PIPE)
l_es->fd = a_es->fd2;
#elif defined DAP_EVENTS_CAPS_MSMQ
l_es->mqh = a_es->mqh;
......@@ -474,6 +502,9 @@ dap_events_socket_t * dap_events_socket_queue_ptr_create_input(dap_events_socket
log_it(L_ERROR, "Can't open message queue for queue type, error: 0x%x", hr);
return NULL;
}
#elif defined (DAP_EVENTS_CAPS_KQUEUE)
// We don't create descriptor for kqueue at all
l_es->fd = l_es->fd2 = -1;
#else
#error "Not defined dap_events_socket_queue_ptr_create_input() for this platform"
#endif
......@@ -512,17 +543,25 @@ dap_events_socket_t * s_create_type_queue_ptr(dap_worker_t * a_w, dap_events_soc
l_es->ev_base_flags = EPOLLIN | EPOLLERR | EPOLLRDHUP | EPOLLHUP;
#elif defined(DAP_EVENTS_CAPS_POLL)
l_es->poll_base_flags = POLLIN | POLLERR | POLLRDHUP | POLLHUP;
#elif defined(DAP_EVENTS_CAPS_KQUEUE)
l_es->kqueue_base_flags = EV_ADD | EV_ENABLE | EV_CLEAR;
l_es->kqueue_base_fflags = NOTE_TRIGGER;
l_es->kqueue_base_filter = EVFILT_USER;
#else
#error "Not defined s_create_type_queue_ptr for your platform"
#endif
#ifdef DAP_EVENTS_CAPS_QUEUE_PIPE2
#if defined(DAP_EVENTS_CAPS_QUEUE_PIPE2) || defined(DAP_EVENTS_CAPS_QUEUE_PIPE)
int l_pipe[2];
int l_errno;
char l_errbuf[128];
l_errbuf[0]=0;
#if defined(DAP_EVENTS_CAPS_QUEUE_PIPE2)
if( pipe2(l_pipe,O_DIRECT | O_NONBLOCK ) < 0 ){
#elif defined(DAP_EVENTS_CAPS_QUEUE_PIPE)
if( pipe(l_pipe) < 0 ){
#endif
l_errno = errno;
strerror_r(l_errno, l_errbuf, sizeof (l_errbuf));
switch (l_errno) {
......@@ -531,10 +570,27 @@ dap_events_socket_t * s_create_type_queue_ptr(dap_worker_t * a_w, dap_events_soc
}
DAP_DELETE(l_es);
return NULL;
}//else
}
//else
// log_it(L_DEBUG, "Created one-way unnamed packet pipe %d->%d", l_pipe[0], l_pipe[1]);
l_es->fd = l_pipe[0];
l_es->fd2 = l_pipe[1];
#if defined(DAP_EVENTS_CAPS_QUEUE_PIPE)
// If we have no pipe2() we should set nonblock mode via fcntl
if (l_es->fd > 0 && l_es->fd2 > 0 ) {
int l_flags = fcntl(l_es->fd, F_GETFL, 0);
if (l_flags != -1){
l_flags |= O_NONBLOCK);
fcntl(l_es->fd, F_SETFL, l_flags) == 0);
}
l_flags = fcntl(l_es->fd2, F_GETFL, 0);
if (l_flags != -1){
l_flags |= O_NONBLOCK);
fcntl(l_es->fd2, F_SETFL, l_flags) == 0);
}
}
#endif
#if !defined (DAP_OS_ANDROID)
FILE* l_sys_max_pipe_size_fd = fopen("/proc/sys/fs/pipe-max-size", "r");
......@@ -678,6 +734,9 @@ dap_events_socket_t * s_create_type_queue_ptr(dap_worker_t * a_w, dap_events_soc
if (hr != MQ_OK) {
log_it(L_DEBUG, "Message queue %d NOT purged, possible data corruption, err %d", l_es->mq_num, hr);
}
#elif defined (DAP_EVENTS_CAPS_KQUEUE)
// We don't create descriptor for kqueue at all
l_es->fd = l_es->fd2 = -1;
#else
#error "Not implemented s_create_type_queue_ptr() on your platform"
......@@ -834,6 +893,10 @@ dap_events_socket_t * s_create_type_event(dap_worker_t * a_w, dap_events_socket_
l_es->ev_base_flags = EPOLLIN | EPOLLERR | EPOLLRDHUP | EPOLLHUP;
#elif defined(DAP_EVENTS_CAPS_POLL)
l_es->poll_base_flags = POLLIN | POLLERR | POLLRDHUP | POLLHUP;
#elif defined(DAP_EVENTS_CAPS_KQUEUE)
l_es->kqueue_base_flags = EV_ADD | EV_ENABLE | EV_CLEAR;
l_es->kqueue_base_fflags = NOTE_CLOSE | NOTE_CLOSE_WRITE | NOTE_DELETE | NOTE_REVOKE ;
l_es->kqueue_base_filter = EVFILT_VNODE;
#else
#error "Not defined s_create_type_event for your platform"
#endif
......
......@@ -31,6 +31,11 @@
#include "wepoll.h"
#elif defined (DAP_EVENTS_CAPS_POLL)
#include <poll.h>
#elif defined (DAP_EVENTS_CAPS_KQUEUE)
#include <pthread_np.h>
#include <sys/event.h>
#include <err.h>
typedef cpuset_t cpu_set_t; // Adopt BSD CPU setstructure to POSIX variant
#else
#error "Unimplemented poll for this platform"
#endif
......@@ -209,6 +214,22 @@ int dap_proc_thread_esocket_update_poll_flags(dap_proc_thread_t * a_thread, dap_
a_thread->poll[a_esocket->poll_index].revents |= POLLIN;
if( a_esocket->flags & DAP_SOCK_READY_TO_WRITE)
a_thread->poll[a_esocket->poll_index].revents |= POLLOUT;
#elif defined (DAP_EVENTS_CAPS_KQUEUE)
u_short l_flags = a_esocket->kqueue_base_flags;
u_int l_fflags = a_esocket->kqueue_base_fflags;
short l_filter = a_esocket->kqueue_base_filter;
if(a_esocket->flags & DAP_SOCK_READY_TO_READ )
l_fflags |= NOTE_READ;
if(a_esocket->flags & DAP_SOCK_READY_TO_WRITE )
l_fflags |= NOTE_WRITE;
EV_SET(&a_esocket->kqueue_event , a_esocket->socket, l_filter, EV_ADD| l_flags | EV_CLEAR, l_fflags,0, a_esocket);
if( kevent ( a_thread->kqueue_fd,&a_esocket->kqueue_event,1,NULL,0,NULL)!=1 ){
log_it(L_CRITICAL, "Can't add descriptor in proc thread kqueue , err: %d", errno);
return -1;
}
#else
#error "Not defined dap_proc_thread.c::s_update_poll_flags() on your platform"
#endif
......@@ -404,6 +425,25 @@ static void * s_proc_thread_function(void * a_arg)
l_thread->poll_count++;
}
}
#elif defined (DAP_EVENTS_CAPS_KQUEUE)
// Create kqueue fd
l_thread->kqueue_fd = kqueue();
l_thread->kqueue_events_count_max = DAP_EVENTS_SOCKET_MAX;
l_thread->kqueue_events = DAP_NEW_Z_SIZE(struct kevent, l_worker->kqueue_events_count_max *sizeof(struct kevent));
dap_proc_thread_assign_esocket_unsafe(l_thread,l_thread->proc_queue->esocket);
dap_proc_thread_assign_esocket_unsafe(l_thread,l_thread->proc_event);
for (size_t n = 0; n< dap_events_worker_get_count(); n++){
// Queue asssign
dap_proc_thread_assign_esocket_unsafe(l_thread, l_thread->queue_assign_input[n]);
// Queue IO
dap_proc_thread_assign_esocket_unsafe(l_thread, l_thread->queue_io_input[n]);
// Queue callback
dap_proc_thread_assign_esocket_unsafe(l_thread, l_thread->queue_callback_input[n]);
}
#else
#error "Unimplemented poll events analog for this platform"
......@@ -416,13 +456,18 @@ static void * s_proc_thread_function(void * a_arg)
// Main loop
while (! l_thread->signal_kill){
int l_selected_sockets;
size_t l_sockets_max;
#ifdef DAP_EVENTS_CAPS_EPOLL
//log_it(L_DEBUG, "Epoll_wait call");
int l_selected_sockets = epoll_wait(l_thread->epoll_ctl, l_epoll_events, DAP_EVENTS_SOCKET_MAX, -1);
size_t l_sockets_max = (size_t)l_selected_sockets;
l_selected_sockets = epoll_wait(l_thread->epoll_ctl, l_epoll_events, DAP_EVENTS_SOCKET_MAX, -1);
l_sockets_max = (size_t)l_selected_sockets;
#elif defined (DAP_EVENTS_CAPS_POLL)
int l_selected_sockets = poll(l_thread->poll,l_thread->poll_count,-1);
size_t l_sockets_max = l_thread->poll_count;
l_selected_sockets = poll(l_thread->poll,l_thread->poll_count,-1);
l_sockets_max = l_thread->poll_count;
#elif defined (DAP_EVENTS_CAPS_KQUEUE)
l_selected_sockets = kevent(l_thread->kqueue_fd,NULL,0,l_thread->kqueue_events,l_thread->kqueue_events_count_max,NULL);
l_sockets_max = l_thread->kqueue_events_count_max;
#else
#error "Unimplemented poll wait analog for this platform"
#endif
......@@ -472,6 +517,11 @@ static void * s_proc_thread_function(void * a_arg)
l_flag_nval = l_cur_events & POLLNVAL;
l_flag_pri = l_cur_events & POLLPRI;
l_flag_msg = l_cur_events & POLLMSG;
#elif defined (DAP_EVENTS_CAPS_KQUEUE)
l_cur = (dap_events_socket_t*) l_thread->kqueue_events[n].udata;
u_int l_cur_flags = l_thread->kqueue_events[n].fflags;
l_flag_write = l_cur_flags & EVFILT_WRITE;
l_flag_read = l_cur_flags & EVFILT_READ;
#else
#error "Unimplemented fetch esocket after poll"
#endif
......
......@@ -101,6 +101,7 @@ void *dap_worker_thread(void *arg)
uint32_t l_tn = l_worker->id;
dap_cpu_assign_thread_on(l_worker->id);
pthread_setspecific(l_worker->events->pth_key_worker, l_worker);
struct sched_param l_shed_params;
l_shed_params.sched_priority = 0;
#ifdef DAP_OS_WINDOWS
......@@ -113,6 +114,18 @@ void *dap_worker_thread(void *arg)
#ifdef DAP_EVENTS_CAPS_EPOLL
struct epoll_event l_epoll_events[ DAP_EVENTS_SOCKET_MAX]= {{0}};
log_it(L_INFO, "Worker #%d started with epoll fd %d and assigned to dedicated CPU unit", l_worker->id, l_worker->epoll_fd);
#elif defined(DAP_EVENTS_CAPS_KQUEUE)
l_worker->kqueue_fd = kqueue();
if (l_worker->kqueue_fd == -1 ){
int l_errno = errno;
char l_errbuf[255];
strerror_r(l_errno,l_errbuf,sizeof(l_errbuf));
log_it (L_CRITICAL,"Can't create kqueue():\"\" code %d",l_errbuf,l_errno);
pthread_cond_broadcast(&l_worker->started_cond);
return NULL;
}
l_worker->kqueue_events_count_max = DAP_EVENTS_SOCKET_MAX;
l_worker->kqueue_events = DAP_NEW_Z_SIZE(struct kevent, l_worker->kqueue_events_count_max *sizeof(struct kevent));
#elif defined(DAP_EVENTS_CAPS_POLL)
l_worker->poll_count_max = DAP_EVENTS_SOCKET_MAX;
l_worker->poll = DAP_NEW_Z_SIZE(struct pollfd,l_worker->poll_count_max*sizeof (struct pollfd));
......@@ -137,16 +150,19 @@ void *dap_worker_thread(void *arg)
l_worker->timer_check_activity = dap_timerfd_start_on_worker( l_worker, s_connection_timeout * 1000 / 2,
s_socket_all_check_activity, l_worker);
pthread_setspecific(l_worker->events->pth_key_worker, l_worker);
pthread_cond_broadcast(&l_worker->started_cond);
bool s_loop_is_active = true;
while(s_loop_is_active) {
int l_selected_sockets;
size_t l_sockets_max;
#ifdef DAP_EVENTS_CAPS_EPOLL
int l_selected_sockets = epoll_wait(l_worker->epoll_fd, l_epoll_events, DAP_EVENTS_SOCKET_MAX, -1);
size_t l_sockets_max = l_selected_sockets;
l_selected_sockets = epoll_wait(l_worker->epoll_fd, l_epoll_events, DAP_EVENTS_SOCKET_MAX, -1);
l_sockets_max = l_selected_sockets;
#elif defined(DAP_EVENTS_CAPS_POLL)
int l_selected_sockets = poll(l_worker->poll, l_worker->poll_count, -1);
size_t l_sockets_max = l_worker->poll_count;
l_selected_sockets = poll(l_worker->poll, l_worker->poll_count, -1);
l_sockets_max = l_worker->poll_count;
#elif defined(DAP_EVENTS_CAPS_KQUEUE)
l_selected_sockets = kevent(l_worker->kqueue_fd,NULL,0,l_worker->kqueue_events,l_worker->kqueue_events_count_max,NULL);
l_sockets_max = l_worker->kqueue_events_count_max;
#else
#error "Unimplemented poll wait analog for this platform"
#endif
......@@ -197,6 +213,12 @@ void *dap_worker_thread(void *arg)
l_flag_msg = l_cur_events & POLLMSG;
l_cur = l_worker->poll_esocket[n];
//log_it(L_DEBUG, "flags: returned events 0x%0X requested events 0x%0X",l_worker->poll[n].revents,l_worker->poll[n].events );
#elif defined (DAP_EVENTS_CAPS_KQUEUE)
l_cur = (dap_events_socket_t*) l_worker->kqueue_events[n].udata;
u_int l_cur_flags = l_worker->kqueue_events[n].fflags;
l_flag_write = l_cur_flags & EVFILT_WRITE;
l_flag_read = l_cur_flags & EVFILT_READ;
#else
#error "Unimplemented fetch esocket after poll"
#endif
......@@ -546,7 +568,6 @@ void *dap_worker_thread(void *arg)
case DESCRIPTOR_TYPE_QUEUE:
if (l_cur->flags & DAP_SOCK_QUEUE_PTR && l_cur->buf_out_size>= sizeof (void*)){
#if defined(DAP_EVENTS_CAPS_QUEUE_PIPE2)
l_bytes_sent = write(l_cur->socket, l_cur->buf_out, sizeof (void *) ); // We send pointer by pointer
#elif defined (DAP_EVENTS_CAPS_QUEUE_POSIX)
l_bytes_sent = mq_send(a_es->mqd, (const char *)&a_arg,sizeof (a_arg),0);
......@@ -960,6 +981,18 @@ int dap_worker_add_events_socket_unsafe( dap_events_socket_t * a_esocket, dap_wo
a_worker->poll_esocket[a_worker->poll_count] = a_esocket;
a_worker->poll_count++;
return 0;
#elif defined (DAP_EVENTS_CAPS_KQUEUE)
u_short l_flags = a_esocket->kqueue_base_flags;
u_int l_fflags = a_esocket->kqueue_base_fflags;
short l_filter = a_esocket->kqueue_base_filter;
if(a_esocket->flags & DAP_SOCK_READY_TO_READ )
l_fflags |= NOTE_READ;
if(a_esocket->flags & DAP_SOCK_READY_TO_WRITE )
l_fflags |= NOTE_WRITE;
EV_SET(&a_esocket->kqueue_event , a_esocket->socket, l_filter, EV_ADD| l_flags | EV_CLEAR, l_fflags,0, a_esocket);
return kevent ( a_worker->kqueue_fd,&a_esocket->kqueue_event,1,NULL,0,NULL)==1 ? 0 : -1 ;
#else
#error "Unimplemented new esocket on worker callback for current platform"
#endif
......
......@@ -55,10 +55,16 @@ typedef int SOCKET;
#include <netinet/in.h>
#include <sys/eventfd.h>
#include <mqueue.h>
#elif defined (DAP_OS_UNIX)
#elif defined (DAP_OS_BSD)
#define DAP_EVENTS_CAPS_KQUEUE
#define DAP_EVENTS_CAPS_PIPE_POSIX
#define DAP_EVENTS_CAPS_EVENT_KEVENT
#define DAP_EVENTS_CAPS_QUEUE_KEVENT
#include <netinet/in.h>
#elif defined (DAP_OS_UNIX)
#define DAP_EVENTS_CAPS_POLL
#define DAP_EVENTS_CAPS_PIPE_POSIX
#define DAP_EVENTS_CAPS_EVENT_PIPE
#define DAP_EVENTS_CAPS_QUEUE_SOCKETPAIR
#include <netinet/in.h>
#elif defined (DAP_OS_WINDOWS)
......@@ -161,7 +167,7 @@ typedef struct dap_events_socket {
mqd_t mqd;
};
uint32_t mqd_id;
#elif defined DAP_EVENTS_CAPS_MSMQ
#elif defined(DAP_EVENTS_CAPS_MSMQ)
};
QUEUEHANDLE mqh, mqh_recv;
u_int mq_num;
......@@ -171,9 +177,11 @@ typedef struct dap_events_socket {
};
#endif
#if defined DAP_EVENTS_CAPS_PIPE_POSIX
#if defined(DAP_EVENTS_CAPS_PIPE_POSIX)
int fd2;
#endif
dap_events_desc_type_t type;
uint128_t uuid; // Unique UID
// Related sockets (be careful - possible problems, delete them before )
......@@ -232,6 +240,13 @@ typedef struct dap_events_socket {
#elif defined (DAP_EVENTS_CAPS_POLL)
short poll_base_flags;
uint32_t poll_index; // index in poll array on worker
#elif defined (DAP_EVENTS_CAPS_KQUEUE)
struct kevent kqueue_event;
short kqueue_base_filter;
u_short kqueue_base_flags;
u_int kqueue_base_fflags;
int64_t kqueue_data;
uint64_t kqueue_ext[4];
#endif
dap_events_socket_callbacks_t callbacks;
......
......@@ -52,6 +52,10 @@ typedef struct dap_proc_thread{
dap_events_socket_t ** esockets;
size_t poll_count;
size_t poll_count_max;
#elif defined (DAP_EVENTS_CAPS_KQUEUE)
int kqueue_fd;
struct kevent * kqueue_events;
int kqueue_events_count;
#else
#error "No poll for proc thread for your platform"
#endif
......
......@@ -39,6 +39,8 @@
#define MSG_DONTWAIT 0
#define MSG_NOSIGNAL 0
#include "winsock.h"
#elif defined(DAP_OS_BSD)
#else
#error "No poll headers for your platform"
#endif
......
......@@ -27,9 +27,13 @@
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#if defined DAP_OS_UNIX
#if defined DAP_OS_LINUX
#include <sys/time.h>
#include <sys/timerfd.h>
#elif defined DAP_OS_BSD
#include <sys/event.h>
#elif defined DAP_OS_WINDOWS
#define _MSEC -10000
#endif
......
......@@ -65,6 +65,7 @@ typedef struct dap_worker
#if defined DAP_EVENTS_CAPS_MSMQ
HANDLE msmq_events[MAXIMUM_WAIT_OBJECTS];
#endif
#if defined DAP_EVENTS_CAPS_EPOLL
EPOLL_HANDLE epoll_fd;
#elif defined ( DAP_EVENTS_CAPS_POLL)
......@@ -74,6 +75,12 @@ typedef struct dap_worker
size_t poll_count;
size_t poll_count_max;
bool poll_compress; // Some of fd's became NULL so arrays need to be reassigned
#elif defined (DAP_EVENTS_CAPS_KQUEUE)
int kqueue_fd;
struct kevent * kqueue_events;
int kqueue_events_count_max;
#else
#error "Not defined worker for your platform"
#endif
pthread_cond_t started_cond;
pthread_mutex_t started_mutex;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment