From 2e19ddb2fb2a65085cdff20f857f216d975e7491 Mon Sep 17 00:00:00 2001 From: Dmitriy Gerasimov <naeper@demlabs.net> Date: Sat, 6 Mar 2021 17:43:45 +0700 Subject: [PATCH] [*] kevent fixes --- dap-sdk/core/libdap.pri | 3 + dap-sdk/net/core/dap_events_socket.c | 122 ++++++++++--------- dap-sdk/net/core/dap_proc_thread.c | 51 ++++---- dap-sdk/net/core/dap_worker.c | 69 +++++++---- dap-sdk/net/core/include/dap_events_socket.h | 1 + dap-sdk/net/core/include/dap_worker.h | 4 + 6 files changed, 145 insertions(+), 105 deletions(-) diff --git a/dap-sdk/core/libdap.pri b/dap-sdk/core/libdap.pri index 5dc33a68f3..ecab6682ce 100755 --- a/dap-sdk/core/libdap.pri +++ b/dap-sdk/core/libdap.pri @@ -26,6 +26,9 @@ darwin { DEFINES += DAP_OS_DARWIN DAP_OS_BSD LIBS+ = -lrt -ljson-c -lmagic QMAKE_LIBDIR += /usr/local/lib + + QMAKE_CFLAGS_DEBUG += -gdwarf-2 + QMAKE_CXXFLAGS_DEBUG += -gdwarf-2 } win32 { diff --git a/dap-sdk/net/core/dap_events_socket.c b/dap-sdk/net/core/dap_events_socket.c index 077360faf5..eada001c92 100644 --- a/dap-sdk/net/core/dap_events_socket.c +++ b/dap-sdk/net/core/dap_events_socket.c @@ -288,7 +288,7 @@ dap_events_socket_t * s_create_type_pipe(dap_worker_t * a_w, dap_events_socket_c #elif defined(DAP_EVENTS_CAPS_POLL) l_es->poll_base_flags = POLLIN | POLLERR | POLLRDHUP | POLLHUP; #elif defined(DAP_EVENTS_CAPS_KQUEUE) - l_es->kqueue_base_flags = EV_ADD | EV_ENABLE | EV_CLEAR; + l_es->kqueue_base_flags = EV_ENABLE | EV_CLEAR; l_es->kqueue_base_fflags = NOTE_DELETE | NOTE_REVOKE ; #if !defined(DAP_OS_DARWIN) l_es->kqueue_base_fflags |= NOTE_CLOSE | NOTE_CLOSE_WRITE ; @@ -449,8 +449,8 @@ dap_events_socket_t * dap_events_socket_queue_ptr_create_input(dap_events_socket #elif defined(DAP_EVENTS_CAPS_KQUEUE) // We don't create descriptor for kqueue at all l_es->fd = -1; - l_es->kqueue_base_flags = EV_ADD | EV_ENABLE | EV_CLEAR; - l_es->kqueue_base_fflags = NOTE_TRIGGER; + l_es->pipe_out = a_es; + l_es->kqueue_base_flags = EV_ENABLE | EV_CLEAR | EV_ONESHOT; l_es->kqueue_base_filter = EVFILT_USER; #else #error "Not defined s_create_type_pipe for your platform" @@ -530,6 +530,7 @@ dap_events_socket_t * dap_events_socket_queue_ptr_create_input(dap_events_socket l_es->flags = DAP_SOCK_QUEUE_PTR; l_es->_pvt = DAP_NEW_Z(struct queue_ptr_input_pvt); l_es->callbacks.delete_callback = s_socket_type_queue_ptr_input_callback_delete; + l_es->callbacks.queue_ptr_callback = a_es->callbacks.queue_ptr_callback; return l_es; } @@ -562,8 +563,7 @@ dap_events_socket_t * s_create_type_queue_ptr(dap_worker_t * a_w, dap_events_soc #elif defined(DAP_EVENTS_CAPS_POLL) l_es->poll_base_flags = POLLIN | POLLERR | POLLRDHUP | POLLHUP; #elif defined(DAP_EVENTS_CAPS_KQUEUE) - l_es->kqueue_base_flags = EV_ADD | EV_ENABLE | EV_CLEAR; - l_es->kqueue_base_fflags = NOTE_TRIGGER; + l_es->kqueue_base_flags = EV_ENABLE | EV_CLEAR |EV_ONESHOT; l_es->kqueue_base_filter = EVFILT_USER; #else #error "Not defined s_create_type_queue_ptr for your platform" @@ -871,7 +871,7 @@ int dap_events_socket_queue_proc_input_unsafe(dap_events_socket_t * a_esocket) #elif defined DAP_EVENTS_CAPS_KQUEUE l_queue_ptr = (void*) a_esocket->kqueue_event_catched->ident; if(a_esocket->callbacks.queue_ptr_callback) - a_esocket->callbacks.queue_ptr_callback (a_esocket, l_queue_ptr); + a_esocket->callbacks.queue_ptr_callback (a_esocket, l_queue_ptr); #else #error "No Queue fetch mechanism implemented on your platform" #endif @@ -920,8 +920,7 @@ dap_events_socket_t * s_create_type_event(dap_worker_t * a_w, dap_events_socket_ #elif defined(DAP_EVENTS_CAPS_POLL) l_es->poll_base_flags = POLLIN | POLLERR | POLLRDHUP | POLLHUP; #elif defined(DAP_EVENTS_CAPS_KQUEUE) - l_es->kqueue_base_flags = EV_ADD | EV_ENABLE | EV_CLEAR; - l_es->kqueue_base_fflags = NOTE_TRIGGER; + l_es->kqueue_base_flags = EV_ENABLE | EV_CLEAR; l_es->kqueue_base_filter = EVFILT_USER; #else #error "Not defined s_create_type_event for your platform" @@ -1170,18 +1169,21 @@ int dap_events_socket_queue_ptr_send_to_input(dap_events_socket_t * a_es_input, void * l_arg = a_arg; #if defined (DAP_EVENTS_CAPS_KQUEUE) if (a_es_input->pipe_out){ - int l_ret; - struct kevent l_event={0}; - dap_events_socket_t * l_es = a_es_input->pipe_out; - EV_SET(&l_event,(uintptr_t) a_arg, l_es->kqueue_base_filter,l_es->kqueue_base_flags, l_es->kqueue_base_fflags,0, l_es); + int l_ret; + struct kevent l_event={0}; + dap_events_socket_t * l_es = a_es_input->pipe_out; + assert(l_es); + EV_SET(&l_event,(uintptr_t) a_arg, l_es->kqueue_base_filter,l_es->kqueue_base_flags | EV_ADD, l_es->kqueue_base_fflags,0, l_es); if(l_es->worker) - l_ret=kevent(l_es->worker->kqueue_fd,&l_event,1,NULL,0,NULL); + l_ret=kevent(l_es->worker->kqueue_fd,&l_event,1,NULL,0,NULL); else if (l_es->proc_thread) - l_ret=kevent(l_es->proc_thread->kqueue_fd,&l_event,1,NULL,0,NULL); - return l_ret==1?0 : -1; + l_ret=kevent(l_es->proc_thread->kqueue_fd,&l_event,1,NULL,0,NULL); + else + l_ret=-100; + return l_ret!=-1?0 : -1; }else{ - log_it(L_ERROR,"No pipe_out pointer for queue socket, possible created wrong"); - return -2; + log_it(L_ERROR,"No pipe_out pointer for queue socket, possible created wrong"); + return -2; } #else @@ -1263,18 +1265,23 @@ int dap_events_socket_queue_ptr_send( dap_events_socket_t * a_es, void* a_arg) } #elif defined (DAP_EVENTS_CAPS_KQUEUE) struct kevent l_event={0}; - EV_SET(&l_event,(uintptr_t) a_arg, a_es->kqueue_base_filter,a_es->kqueue_base_flags, a_es->kqueue_base_fflags,0, a_es); + EV_SET(&l_event,(uintptr_t) a_arg, a_es->kqueue_base_filter,a_es->kqueue_base_flags| EV_ADD, a_es->kqueue_base_fflags,0, a_es); int l_n; if(a_es->worker) l_n = kevent(a_es->worker->kqueue_fd,&l_event,1,NULL,0,NULL); else if (a_es->proc_thread) l_n = kevent(a_es->proc_thread->kqueue_fd,&l_event,1,NULL,0,NULL); else { - log_it(L_WARNING,"Trying to send pointer in queue thats not assigned to any worker or proc thread"); - l_n = 0; + log_it(L_WARNING,"Trying to send pointer in queue thats not assigned to any worker or proc thread"); + l_n = 0; + } + if(l_n != -1 ){ + l_ret = sizeof (a_arg); + }else{ + log_it(L_ERROR,"Sending kevent error code %d", l_n); + l_errno = errno; + l_ret = -1; } - l_errno = errno; - l_ret = l_n==1? sizeof(a_arg) : -1; #else #error "Not implemented dap_events_socket_queue_ptr_send() for this platform" @@ -1321,7 +1328,7 @@ int dap_events_socket_event_signal( dap_events_socket_t * a_es, uint64_t a_value } #elif defined (DAP_EVENTS_CAPS_KQUEUE) struct kevent l_event={0}; - EV_SET(&l_event,0, a_es->kqueue_base_filter,a_es->kqueue_base_flags, a_es->kqueue_base_fflags,a_value, a_es); + EV_SET(&l_event,0, a_es->kqueue_base_filter,a_es->kqueue_base_flags| EV_ADD, a_es->kqueue_base_fflags,a_value, a_es); int l_n; if(a_es->worker) l_n = kevent(a_es->worker->kqueue_fd,&l_event,1,NULL,0,NULL); @@ -1358,30 +1365,30 @@ void dap_events_socket_queue_on_remove_and_delete(dap_events_socket_t* a_es) dap_events_socket_t * dap_events_socket_wrap2( dap_server_t *a_server, struct dap_events *a_events, int a_sock, dap_events_socket_callbacks_t *a_callbacks ) { - assert( a_events ); - assert( a_callbacks ); - assert( a_server ); - - //log_it( L_DEBUG,"Dap event socket wrapped around %d sock", a_sock ); - dap_events_socket_t * l_es = DAP_NEW_Z( dap_events_socket_t ); if (!l_es) return NULL; - - l_es->socket = a_sock; - l_es->events = a_events; - l_es->server = a_server; - l_es->uuid = dap_uuid_generate_uint128(); - memcpy(&l_es->callbacks,a_callbacks, sizeof ( l_es->callbacks) ); - l_es->buf_out_size_max = l_es->buf_in_size_max = DAP_EVENTS_SOCKET_BUF; - l_es->buf_in = a_callbacks->timer_callback ? NULL : DAP_NEW_Z_SIZE(byte_t, l_es->buf_in_size_max+1); - l_es->buf_out = a_callbacks->timer_callback ? NULL : DAP_NEW_Z_SIZE(byte_t, l_es->buf_out_size_max+1); - l_es->buf_in_size = l_es->buf_out_size = 0; - l_es->flags = DAP_SOCK_READY_TO_READ; - l_es->last_time_active = l_es->last_ping_request = time( NULL ); - - pthread_rwlock_wrlock( &a_events->sockets_rwlock ); - HASH_ADD_INT(a_events->sockets, socket, l_es); - pthread_rwlock_unlock( &a_events->sockets_rwlock ); - - return l_es; + assert( a_events ); + assert( a_callbacks ); + assert( a_server ); + + //log_it( L_DEBUG,"Dap event socket wrapped around %d sock", a_sock ); + dap_events_socket_t * l_es = DAP_NEW_Z( dap_events_socket_t ); if (!l_es) return NULL; + + l_es->socket = a_sock; + l_es->events = a_events; + l_es->server = a_server; + l_es->uuid = dap_uuid_generate_uint128(); + memcpy(&l_es->callbacks,a_callbacks, sizeof ( l_es->callbacks) ); + l_es->buf_out_size_max = l_es->buf_in_size_max = DAP_EVENTS_SOCKET_BUF; + l_es->buf_in = a_callbacks->timer_callback ? NULL : DAP_NEW_Z_SIZE(byte_t, l_es->buf_in_size_max+1); + l_es->buf_out = a_callbacks->timer_callback ? NULL : DAP_NEW_Z_SIZE(byte_t, l_es->buf_out_size_max+1); + l_es->buf_in_size = l_es->buf_out_size = 0; + l_es->flags = DAP_SOCK_READY_TO_READ; + l_es->last_time_active = l_es->last_ping_request = time( NULL ); + + pthread_rwlock_wrlock( &a_events->sockets_rwlock ); + HASH_ADD_INT(a_events->sockets, socket, l_es); + pthread_rwlock_unlock( &a_events->sockets_rwlock ); + + return l_es; } /** @@ -1462,18 +1469,18 @@ void dap_events_socket_worker_poll_update_unsafe(dap_events_socket_t * a_esocket if( a_esocket->flags & DAP_SOCK_READY_TO_WRITE || a_esocket->flags &DAP_SOCK_CONNECTING ) l_filter |= EVFILT_WRITE; - EV_SET(l_event, a_esocket->socket, l_filter,l_flags,l_fflags,a_esocket->kqueue_data,a_esocket); + EV_SET(l_event, a_esocket->socket, l_filter,l_flags| EV_ADD,l_fflags,a_esocket->kqueue_data,a_esocket); if( a_esocket->worker){ - if ( kevent(a_esocket->worker->kqueue_fd,l_event,1,NULL,0,NULL)!=1 ){ - int l_errno = errno; - char l_errbuf[128]; - l_errbuf[0]=0; - strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); - log_it(L_ERROR,"Can't update client socket state on kqueue fd %d: \"%s\" (%d)", - a_esocket->worker->kqueue_fd, l_errbuf, l_errno); + if ( kevent(a_esocket->worker->kqueue_fd,l_event,1,NULL,0,NULL)!=1 ){ + int l_errno = errno; + char l_errbuf[128]; + l_errbuf[0]=0; + strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); + log_it(L_ERROR,"Can't update client socket state on kqueue fd %d: \"%s\" (%d)", + a_esocket->worker->kqueue_fd, l_errbuf, l_errno); } - } } + } #else #error "Not defined dap_events_socket_set_writable_unsafe for your platform" @@ -1618,9 +1625,8 @@ void dap_events_socket_remove_from_worker_unsafe( dap_events_socket_t *a_es, dap // log_it( L_DEBUG,"Removed epoll's event from dap_worker #%u", a_worker->id ); #elif defined(DAP_EVENTS_CAPS_KQUEUE) if (a_es->socket != -1 ){ - struct kevent * l_event = &a_es->kqueue_event; + struct kevent * l_event = &a_es->kqueue_event; EV_SET(l_event, a_es->socket, 0 ,EV_DELETE, 0,0,a_es); - if ( kevent( a_worker->kqueue_fd,l_event,1,NULL,0,NULL) != 1 ) { int l_errno = errno; char l_errbuf[128]; diff --git a/dap-sdk/net/core/dap_proc_thread.c b/dap-sdk/net/core/dap_proc_thread.c index cc6a4ad277..cfa487b5a6 100644 --- a/dap-sdk/net/core/dap_proc_thread.c +++ b/dap-sdk/net/core/dap_proc_thread.c @@ -285,7 +285,7 @@ int dap_proc_thread_esocket_update_poll_flags(dap_proc_thread_t * a_thread, dap_ l_fflags |= NOTE_READ; if(a_esocket->flags & DAP_SOCK_READY_TO_WRITE ) l_fflags |= NOTE_WRITE; - EV_SET(&a_esocket->kqueue_event , a_esocket->socket, l_filter, EV_ADD| l_flags | EV_CLEAR, l_fflags,0, a_esocket); + EV_SET(&a_esocket->kqueue_event , a_esocket->socket, l_filter, EV_ADD| l_flags, l_fflags,0, a_esocket); if( kevent ( a_thread->kqueue_fd,&a_esocket->kqueue_event,1,NULL,0,NULL)!=1 ){ log_it(L_CRITICAL, "Can't add descriptor in proc thread kqueue , err: %d", errno); @@ -482,13 +482,13 @@ static void * s_proc_thread_function(void * a_arg) for (size_t n = 0; n< dap_events_worker_get_count(); n++){ // Queue asssign - dap_proc_thread_assign_esocket_unsafe(l_thread, l_thread->queue_assign_input[n]); + dap_proc_thread_assign_esocket_unsafe(l_thread, l_thread->queue_assign_input[n]); // Queue IO - dap_proc_thread_assign_esocket_unsafe(l_thread, l_thread->queue_io_input[n]); + dap_proc_thread_assign_esocket_unsafe(l_thread, l_thread->queue_io_input[n]); // Queue callback - dap_proc_thread_assign_esocket_unsafe(l_thread, l_thread->queue_callback_input[n]); + dap_proc_thread_assign_esocket_unsafe(l_thread, l_thread->queue_callback_input[n]); } #else @@ -513,7 +513,7 @@ static void * s_proc_thread_function(void * a_arg) l_sockets_max = l_thread->poll_count; #elif defined (DAP_EVENTS_CAPS_KQUEUE) l_selected_sockets = kevent(l_thread->kqueue_fd,NULL,0,l_thread->kqueue_events,l_thread->kqueue_events_count_max,NULL); - l_sockets_max = l_thread->kqueue_events_count_max; + l_sockets_max = l_selected_sockets; #else #error "Unimplemented poll wait analog for this platform" #endif @@ -564,20 +564,21 @@ static void * s_proc_thread_function(void * a_arg) l_flag_pri = l_cur_events & POLLPRI; l_flag_msg = l_cur_events & POLLMSG; #elif defined (DAP_EVENTS_CAPS_KQUEUE) - l_cur = (dap_events_socket_t*) l_thread->kqueue_events[n].udata; + struct kevent * l_kevent = &l_thread->kqueue_events[n]; + l_cur = (dap_events_socket_t*) l_kevent->udata; assert(l_cur); - l_cur->kqueue_event_catched = &l_thread->kqueue_events[n]; + l_cur->kqueue_event_catched = l_kevent; #ifndef DAP_OS_DARWIN - u_int l_cur_events = l_thread->kqueue_events[n].flags; + u_int l_cur_events = l_thread->kqueue_events[n].fflags; #else uint32_t l_cur_events = l_thread->kqueue_events[n].fflags; #endif - l_flag_write = l_cur_events & EVFILT_WRITE; - l_flag_read = l_cur_events & EVFILT_READ; - l_flag_error = l_cur_events & EVFILT_EXCEPT; + l_flag_write = l_thread->kqueue_events[n].filter & EVFILT_WRITE; + l_flag_read = l_thread->kqueue_events[n].filter & EVFILT_READ; + l_flag_error = l_thread->kqueue_events[n].flags & EV_ERROR; l_flag_nval = l_flag_pri = l_flag_msg = l_flag_hup= 0; - l_flag_rdhup = l_cur_events & EVFILT_EXCEPT & NOTE_DELETE; + l_flag_rdhup = l_thread->kqueue_events[n].filter & EVFILT_EXCEPT && l_cur_events&&NOTE_DELETE; #else #error "Unimplemented fetch esocket after poll" #endif @@ -681,18 +682,18 @@ static void * s_proc_thread_function(void * a_arg) log_it(L_WARNING,"mq_send %p errno: %d", l_ptr_in, l_errno); } #elif defined (DAP_EVENTS_CAPS_KQUEUE) - struct kevent* l_event=&l_cur->kqueue_event; - void * l_ptr; - memcpy(l_ptr,l_cur->buf_out,sizeof(l_ptr) ); - - EV_SET(l_event,(uintptr_t) l_ptr, l_cur->kqueue_base_filter,l_cur->kqueue_base_flags, l_cur->kqueue_base_fflags,0, l_cur); - int l_n = kevent(l_thread->kqueue_fd,l_event,1,NULL,0,NULL); - if (l_n == 1) - l_bytes_sent = sizeof(l_ptr); - else{ - l_errno = errno; - log_it(L_WARNING,"queue ptr send error: kevent %p errno: %d", l_ptr, l_errno); - } + struct kevent* l_event=&l_cur->kqueue_event; + void * l_ptr; + memcpy(l_ptr,l_cur->buf_out,sizeof(l_ptr) ); + + EV_SET(l_event,(uintptr_t) l_ptr, l_cur->kqueue_base_filter,l_cur->kqueue_base_flags| EV_ADD, l_cur->kqueue_base_fflags,0, l_cur); + int l_n = kevent(l_thread->kqueue_fd,l_event,1,NULL,0,NULL); + if (l_n == 1) + l_bytes_sent = sizeof(l_ptr); + else{ + l_errno = errno; + log_it(L_WARNING,"queue ptr send error: kevent %p errno: %d", l_ptr, l_errno); + } #else #error "Not implemented dap_events_socket_queue_ptr_send() for this platform" #endif @@ -745,7 +746,7 @@ static void * s_proc_thread_function(void * a_arg) if (l_cur->socket != -1 ){ struct kevent * l_event = &l_cur->kqueue_event; EV_SET(l_event, l_cur->socket, 0 ,EV_DELETE, 0,0,l_cur); - if ( kevent( l_thread->kqueue_fd,l_event,1,NULL,0,NULL) != 1 ) { + if ( kevent( l_thread->kqueue_fd,l_event,1,NULL,0,NULL) != 1 ) { int l_errno = errno; char l_errbuf[128]; strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); diff --git a/dap-sdk/net/core/dap_worker.c b/dap-sdk/net/core/dap_worker.c index c028bc7c94..47c2a774e3 100644 --- a/dap-sdk/net/core/dap_worker.c +++ b/dap-sdk/net/core/dap_worker.c @@ -121,16 +121,17 @@ void *dap_worker_thread(void *arg) log_it(L_INFO, "Worker #%d started with epoll fd %d and assigned to dedicated CPU unit", l_worker->id, l_worker->epoll_fd); #elif defined(DAP_EVENTS_CAPS_KQUEUE) l_worker->kqueue_fd = kqueue(); - if (l_worker->kqueue_fd == -1 ){ - int l_errno = errno; - char l_errbuf[255]; - strerror_r(l_errno,l_errbuf,sizeof(l_errbuf)); - log_it (L_CRITICAL,"Can't create kqueue():\"\" code %d",l_errbuf,l_errno); - pthread_cond_broadcast(&l_worker->started_cond); - return NULL; + if (l_worker->kqueue_fd == -1 ){ + int l_errno = errno; + char l_errbuf[255]; + strerror_r(l_errno,l_errbuf,sizeof(l_errbuf)); + log_it (L_CRITICAL,"Can't create kqueue():\"\" code %d",l_errbuf,l_errno); + pthread_cond_broadcast(&l_worker->started_cond); + return NULL; } + l_worker->kqueue_events_selected_count_max = 100; l_worker->kqueue_events_count_max = DAP_EVENTS_SOCKET_MAX; - l_worker->kqueue_events = DAP_NEW_Z_SIZE(struct kevent, l_worker->kqueue_events_count_max *sizeof(struct kevent)); + l_worker->kqueue_events_selected = DAP_NEW_Z_SIZE(struct kevent, l_worker->kqueue_events_selected_count_max *sizeof(struct kevent)); #elif defined(DAP_EVENTS_CAPS_POLL) l_worker->poll_count_max = DAP_EVENTS_SOCKET_MAX; l_worker->poll = DAP_NEW_Z_SIZE(struct pollfd,l_worker->poll_count_max*sizeof (struct pollfd)); @@ -166,8 +167,9 @@ void *dap_worker_thread(void *arg) l_selected_sockets = poll(l_worker->poll, l_worker->poll_count, -1); l_sockets_max = l_worker->poll_count; #elif defined(DAP_EVENTS_CAPS_KQUEUE) - l_selected_sockets = kevent(l_worker->kqueue_fd,NULL,0,l_worker->kqueue_events,l_worker->kqueue_events_count_max,NULL); - l_sockets_max = l_worker->kqueue_events_count_max; + l_selected_sockets = kevent(l_worker->kqueue_fd,NULL,0,l_worker->kqueue_events_selected,l_worker->kqueue_events_selected_count_max, + NULL); + l_sockets_max = l_selected_sockets; #else #error "Unimplemented poll wait analog for this platform" #endif @@ -219,7 +221,7 @@ void *dap_worker_thread(void *arg) l_cur = l_worker->poll_esocket[n]; //log_it(L_DEBUG, "flags: returned events 0x%0X requested events 0x%0X",l_worker->poll[n].revents,l_worker->poll[n].events ); #elif defined (DAP_EVENTS_CAPS_KQUEUE) - struct kevent * l_kevent_selected = &l_worker->kqueue_events[n]; + struct kevent * l_kevent_selected = &l_worker->kqueue_events_selected[n]; l_cur = (dap_events_socket_t*) l_kevent_selected->udata; assert(l_cur); l_cur->kqueue_event_catched = l_kevent_selected; @@ -230,11 +232,11 @@ void *dap_worker_thread(void *arg) #endif - l_flag_write = l_cur_flags & EVFILT_WRITE; - l_flag_read = l_cur_flags & EVFILT_READ; - l_flag_error = l_cur_flags & EVFILT_EXCEPT; + l_flag_write = l_kevent_selected->filter & EVFILT_WRITE; + l_flag_read = l_kevent_selected->filter & EVFILT_READ; + l_flag_error = l_kevent_selected->flags & EV_ERROR; l_flag_nval = l_flag_pri = l_flag_msg = l_flag_hup= 0; - l_flag_rdhup = l_cur_flags & EVFILT_EXCEPT & NOTE_DELETE; + l_flag_rdhup = l_kevent_selected->filter & EVFILT_EXCEPT && l_kevent_selected->fflags && NOTE_DELETE; #else @@ -696,6 +698,9 @@ void *dap_worker_thread(void *arg) log_it(L_INFO, "Process signal to close %s sock %u type %d [thread %u]", l_cur->remote_addr_str ? l_cur->remote_addr_str : "", l_cur->socket, l_cur->type, l_tn); dap_events_socket_remove_and_delete_unsafe( l_cur, false); +#ifdef DAP_EVENTS_CAPS_KQUEUE + l_worker->kqueue_events_count--; +#endif } else if (l_cur->buf_out_size ) { if(s_debug_reactor) log_it(L_INFO, "Got signal to close %s sock %u [thread %u] type %d but buffer is not empty(%zd)", @@ -1016,13 +1021,33 @@ int dap_worker_add_events_socket_unsafe( dap_events_socket_t * a_esocket, dap_wo u_short l_flags = a_esocket->kqueue_base_flags; u_int l_fflags = a_esocket->kqueue_base_fflags; short l_filter = a_esocket->kqueue_base_filter; - if(a_esocket->flags & DAP_SOCK_READY_TO_READ ) - l_fflags |= NOTE_READ; - if(a_esocket->flags & DAP_SOCK_READY_TO_WRITE ) - l_fflags |= NOTE_WRITE; - - EV_SET(&a_esocket->kqueue_event , a_esocket->socket, l_filter, EV_ADD| l_flags | EV_CLEAR, l_fflags,0, a_esocket); - return kevent ( a_worker->kqueue_fd,&a_esocket->kqueue_event,1,NULL,0,NULL)==1 ? 0 : errno; + if(a_esocket->flags & DAP_SOCK_READY_TO_READ ) + l_fflags |= NOTE_READ; + if(a_esocket->flags & DAP_SOCK_READY_TO_WRITE ) + l_fflags |= NOTE_WRITE; + + EV_SET(&a_esocket->kqueue_event , a_esocket->socket, l_filter, EV_ADD| l_flags | EV_CLEAR, l_fflags,0, a_esocket); + if(kevent ( a_worker->kqueue_fd,&a_esocket->kqueue_event,1,NULL,0,NULL) != -1 ){ + a_worker->kqueue_events_count++; +/* if ( a_worker->kqueue_events_count == (size_t) a_worker->kqueue_events_count_max ){ // realloc + if(a_worker->kqueue_events_count_max > (INT32_MAX -a_worker->kqueue_events_count_max ) ){ + log_it(L_CRITICAL,"Reached esocket count limit, can't add more than %d", a_worker->kqueue_events_count_max); + a_worker->kqueue_events_count--; + return -999; + } + a_worker->kqueue_events_count_max *= 2; + log_it(L_WARNING, "Too many descriptors (%u), resizing array twice to %u", a_worker->poll_count, a_worker->poll_count_max); + a_worker->poll =DAP_REALLOC(a_worker->poll, a_worker->poll_count_max * sizeof(*a_worker->poll)); + a_worker->poll_esocket =DAP_REALLOC(a_worker->poll_esocket, a_worker->poll_count_max * sizeof(*a_worker->poll_esocket)); + }*/ + return 0; + }else{ + int l_errno = errno; + char l_errbuf[128]={}; + strerror_r(errno,l_errbuf,sizeof (l_errbuf)); + log_it(L_ERROR,"Can't add socket on kqueue: %s (code %d)", l_errbuf,l_errno); + return l_errno; + } #else #error "Unimplemented new esocket on worker callback for current platform" #endif diff --git a/dap-sdk/net/core/include/dap_events_socket.h b/dap-sdk/net/core/include/dap_events_socket.h index 0732a3f795..e416cd9a5f 100644 --- a/dap-sdk/net/core/include/dap_events_socket.h +++ b/dap-sdk/net/core/include/dap_events_socket.h @@ -245,6 +245,7 @@ typedef struct dap_events_socket { short kqueue_base_filter; unsigned short kqueue_base_flags; unsigned int kqueue_base_fflags; + int64_t kqueue_data; #endif diff --git a/dap-sdk/net/core/include/dap_worker.h b/dap-sdk/net/core/include/dap_worker.h index df6591861d..ea74cda3c9 100644 --- a/dap-sdk/net/core/include/dap_worker.h +++ b/dap-sdk/net/core/include/dap_worker.h @@ -77,8 +77,12 @@ typedef struct dap_worker bool poll_compress; // Some of fd's became NULL so arrays need to be reassigned #elif defined (DAP_EVENTS_CAPS_KQUEUE) int kqueue_fd; + struct kevent * kqueue_events_selected; struct kevent * kqueue_events; + size_t kqueue_events_count; + int kqueue_events_count_max; + int kqueue_events_selected_count_max; #else #error "Not defined worker for your platform" #endif -- GitLab