diff --git a/dap-sdk/net/core/dap_events_socket.c b/dap-sdk/net/core/dap_events_socket.c index eada001c9206369143201f95135b0b0ebbdf646d..83a9ca25d10f3f5fc922b3f32204d1b8f8cef4a0 100644 --- a/dap-sdk/net/core/dap_events_socket.c +++ b/dap-sdk/net/core/dap_events_socket.c @@ -182,6 +182,7 @@ dap_events_socket_t *dap_events_socket_wrap_no_add( dap_events_t *a_events, #elif defined(DAP_EVENTS_CAPS_POLL) l_ret->poll_base_flags = POLLERR | POLLRDHUP | POLLHUP; #elif defined(DAP_EVENTS_CAPS_KQUEUE) + l_ret->kqueue_event_catched_data.esocket = l_ret; l_ret->kqueue_base_flags = EV_ADD | EV_ENABLE | EV_CLEAR; l_ret->kqueue_base_fflags = NOTE_DELETE | NOTE_REVOKE ; @@ -195,9 +196,7 @@ dap_events_socket_t *dap_events_socket_wrap_no_add( dap_events_t *a_events, pthread_rwlock_wrlock(&a_events->sockets_rwlock); HASH_ADD_INT(a_events->sockets, socket, l_ret); pthread_rwlock_unlock(&a_events->sockets_rwlock); - }else if(s_debug_reactor) - log_it(L_WARNING, "Be carefull, you've wrapped socket 0 or -1 so it wasn't added to global list. Do it yourself when possible"); - + } //log_it( L_DEBUG,"Dap event socket wrapped around %d sock a_events = %X", a_sock, a_events ); return l_ret; @@ -288,6 +287,7 @@ dap_events_socket_t * s_create_type_pipe(dap_worker_t * a_w, dap_events_socket_c #elif defined(DAP_EVENTS_CAPS_POLL) l_es->poll_base_flags = POLLIN | POLLERR | POLLRDHUP | POLLHUP; #elif defined(DAP_EVENTS_CAPS_KQUEUE) + l_es->kqueue_event_catched_data.esocket = l_es; l_es->kqueue_base_flags = EV_ENABLE | EV_CLEAR; l_es->kqueue_base_fflags = NOTE_DELETE | NOTE_REVOKE ; #if !defined(DAP_OS_DARWIN) @@ -447,11 +447,14 @@ dap_events_socket_t * dap_events_socket_queue_ptr_create_input(dap_events_socket #elif defined(DAP_EVENTS_CAPS_POLL) l_es->poll_base_flags = POLLERR | POLLRDHUP | POLLHUP; #elif defined(DAP_EVENTS_CAPS_KQUEUE) - // We don't create descriptor for kqueue at all - l_es->fd = -1; + // Here we have event identy thats we copy + l_es->fd = a_es->fd; // l_es->pipe_out = a_es; - l_es->kqueue_base_flags = EV_ENABLE | EV_CLEAR | EV_ONESHOT; + l_es->kqueue_base_flags = EV_ENABLE; + l_es->kqueue_base_fflags = NOTE_TRIGGER; l_es->kqueue_base_filter = EVFILT_USER; + l_es->kqueue_event_catched_data.esocket = l_es; + #else #error "Not defined s_create_type_pipe for your platform" #endif @@ -522,7 +525,6 @@ dap_events_socket_t * dap_events_socket_queue_ptr_create_input(dap_events_socket } #elif defined (DAP_EVENTS_CAPS_KQUEUE) // We don't create descriptor for kqueue at all - l_es->fd = l_es->fd2 = -1; #else #error "Not defined dap_events_socket_queue_ptr_create_input() for this platform" #endif @@ -563,8 +565,11 @@ dap_events_socket_t * s_create_type_queue_ptr(dap_worker_t * a_w, dap_events_soc #elif defined(DAP_EVENTS_CAPS_POLL) l_es->poll_base_flags = POLLIN | POLLERR | POLLRDHUP | POLLHUP; #elif defined(DAP_EVENTS_CAPS_KQUEUE) - l_es->kqueue_base_flags = EV_ENABLE | EV_CLEAR |EV_ONESHOT; + l_es->kqueue_event_catched_data.esocket = l_es; + l_es->kqueue_base_flags = EV_CLEAR; + l_es->kqueue_base_fflags = 0; l_es->kqueue_base_filter = EVFILT_USER; + l_es->socket = arc4random(); #else #error "Not defined s_create_type_queue_ptr for your platform" #endif @@ -754,8 +759,6 @@ dap_events_socket_t * s_create_type_queue_ptr(dap_worker_t * a_w, dap_events_soc } #elif defined (DAP_EVENTS_CAPS_KQUEUE) // We don't create descriptor for kqueue at all - l_es->fd = l_es->fd2 = -1; - #else #error "Not implemented s_create_type_queue_ptr() on your platform" #endif @@ -869,7 +872,7 @@ int dap_events_socket_queue_proc_input_unsafe(dap_events_socket_t * a_esocket) a_esocket->callbacks.queue_ptr_callback (a_esocket, l_queue_ptr); } #elif defined DAP_EVENTS_CAPS_KQUEUE - l_queue_ptr = (void*) a_esocket->kqueue_event_catched->ident; + l_queue_ptr = (void*) a_esocket->kqueue_event_catched_data.data; if(a_esocket->callbacks.queue_ptr_callback) a_esocket->callbacks.queue_ptr_callback (a_esocket, l_queue_ptr); #else @@ -883,8 +886,8 @@ int dap_events_socket_queue_proc_input_unsafe(dap_events_socket_t * a_esocket) return -1; } #elif defined (DAP_EVENTS_CAPS_KQUEUE) - void * l_queue_ptr = (void*) a_esocket->kqueue_event_catched->ident; - size_t l_queue_ptr_size = (size_t) a_esocket->kqueue_event_catched->data; + void * l_queue_ptr = a_esocket->kqueue_event_catched_data.data; + size_t l_queue_ptr_size = a_esocket->kqueue_event_catched_data.size; a_esocket->callbacks.queue_callback(a_esocket, l_queue_ptr, l_queue_ptr_size); #else size_t l_read = read(a_esocket->socket, a_esocket->buf_in, a_esocket->buf_in_size_max ); @@ -920,8 +923,10 @@ dap_events_socket_t * s_create_type_event(dap_worker_t * a_w, dap_events_socket_ #elif defined(DAP_EVENTS_CAPS_POLL) l_es->poll_base_flags = POLLIN | POLLERR | POLLRDHUP | POLLHUP; #elif defined(DAP_EVENTS_CAPS_KQUEUE) - l_es->kqueue_base_flags = EV_ENABLE | EV_CLEAR; + l_es->kqueue_base_flags = EV_CLEAR; l_es->kqueue_base_filter = EVFILT_USER; + l_es->socket = arc4random(); + l_es->kqueue_event_catched_data.esocket = l_es; #else #error "Not defined s_create_type_event for your platform" #endif @@ -984,7 +989,7 @@ dap_events_socket_t * s_create_type_event(dap_worker_t * a_w, dap_events_socket_ //log_it(L_DEBUG, "Bound to port %d", l_addr.sin_port); } #elif defined(DAP_EVENTS_CAPS_KQUEUE) - l_es->fd2 = l_es->fd = -1; + // nothing to do #else #error "Not defined s_create_type_event() on your platform" #endif @@ -1055,8 +1060,7 @@ void dap_events_socket_event_proc_input_unsafe(dap_events_socket_t *a_esocket) return; } #elif defined (DAP_EVENTS_CAPS_KQUEUE) - unsigned int l_value = (unsigned int) a_esocket->kqueue_event_catched->data ; - a_esocket->callbacks.event_callback(a_esocket, l_value); + a_esocket->callbacks.event_callback(a_esocket, a_esocket->kqueue_event_catched_data.value); #else #error "No Queue fetch mechanism implemented on your platform" @@ -1166,27 +1170,32 @@ static void add_ptr_to_buf(dap_events_socket_t * a_es, void* a_arg) */ int dap_events_socket_queue_ptr_send_to_input(dap_events_socket_t * a_es_input, void * a_arg) { - void * l_arg = a_arg; #if defined (DAP_EVENTS_CAPS_KQUEUE) if (a_es_input->pipe_out){ int l_ret; struct kevent l_event={0}; dap_events_socket_t * l_es = a_es_input->pipe_out; assert(l_es); - EV_SET(&l_event,(uintptr_t) a_arg, l_es->kqueue_base_filter,l_es->kqueue_base_flags | EV_ADD, l_es->kqueue_base_fflags,0, l_es); + + dap_events_socket_w_data_t * l_es_w_data = DAP_NEW_Z(dap_events_socket_w_data_t); + l_es_w_data->esocket = l_es; + l_es_w_data->ptr = a_arg; + + EV_SET(&l_event,a_es_input->socket, EVFILT_USER,EV_ENABLE, NOTE_FFCOPY|NOTE_TRIGGER ,0, l_es_w_data); if(l_es->worker) l_ret=kevent(l_es->worker->kqueue_fd,&l_event,1,NULL,0,NULL); else if (l_es->proc_thread) l_ret=kevent(l_es->proc_thread->kqueue_fd,&l_event,1,NULL,0,NULL); else l_ret=-100; - return l_ret!=-1?0 : -1; + return l_ret!=-1 ?0 : -1; }else{ log_it(L_ERROR,"No pipe_out pointer for queue socket, possible created wrong"); return -2; } #else + void * l_arg = a_arg; /*if (a_es_input->buf_out_size >= sizeof(void*)) { if (memcmp(a_es_input->buf_out + a_es_input->buf_out_size - sizeof(void*), a_arg, sizeof(void*))) { log_it(L_INFO, "Ptr 0x%x already present in input, drop it", a_arg); @@ -1265,7 +1274,13 @@ int dap_events_socket_queue_ptr_send( dap_events_socket_t * a_es, void* a_arg) } #elif defined (DAP_EVENTS_CAPS_KQUEUE) struct kevent l_event={0}; - EV_SET(&l_event,(uintptr_t) a_arg, a_es->kqueue_base_filter,a_es->kqueue_base_flags| EV_ADD, a_es->kqueue_base_fflags,0, a_es); + dap_events_socket_w_data_t * l_es_w_data = DAP_NEW_Z(dap_events_socket_w_data_t); + if(!l_es_w_data ) // Out of memory + return -666; + + l_es_w_data->esocket = a_es; + l_es_w_data->ptr = a_arg; + EV_SET(&l_event,a_es->socket, EVFILT_USER,EV_ENABLE, NOTE_FFCOPY|NOTE_TRIGGER ,0, l_es_w_data); int l_n; if(a_es->worker) l_n = kevent(a_es->worker->kqueue_fd,&l_event,1,NULL,0,NULL); @@ -1275,12 +1290,14 @@ int dap_events_socket_queue_ptr_send( dap_events_socket_t * a_es, void* a_arg) log_it(L_WARNING,"Trying to send pointer in queue thats not assigned to any worker or proc thread"); l_n = 0; } - if(l_n != -1 ){ + + if(l_n == 0 ){ l_ret = sizeof (a_arg); }else{ - log_it(L_ERROR,"Sending kevent error code %d", l_n); l_errno = errno; + log_it(L_ERROR,"Sending kevent error code %d", l_n); l_ret = -1; + DAP_DELETE(l_es_w_data); } #else @@ -1328,7 +1345,12 @@ int dap_events_socket_event_signal( dap_events_socket_t * a_es, uint64_t a_value } #elif defined (DAP_EVENTS_CAPS_KQUEUE) struct kevent l_event={0}; - EV_SET(&l_event,0, a_es->kqueue_base_filter,a_es->kqueue_base_flags| EV_ADD, a_es->kqueue_base_fflags,a_value, a_es); + dap_events_socket_w_data_t * l_es_w_data = DAP_NEW_Z(dap_events_socket_w_data_t); + l_es_w_data->esocket = a_es; + l_es_w_data->value = a_value; + + EV_SET(&l_event,a_es->socket, EVFILT_USER,EV_ENABLE | EV_ONESHOT, NOTE_FFCOPY|NOTE_TRIGGER ,(intptr_t) a_es->socket, l_es_w_data); + int l_n; if(a_es->worker) l_n = kevent(a_es->worker->kqueue_fd,&l_event,1,NULL,0,NULL); @@ -1456,7 +1478,7 @@ void dap_events_socket_worker_poll_update_unsafe(dap_events_socket_t * a_esocket } } #elif defined (DAP_EVENTS_CAPS_KQUEUE) - if (a_esocket->socket != -1 ){ // Not everything we add in poll + if (a_esocket->socket != -1 ){ // Not everything we add in poll struct kevent * l_event = &a_esocket->kqueue_event; short l_filter =a_esocket->kqueue_base_filter; u_short l_flags =a_esocket->kqueue_base_flags; @@ -1465,11 +1487,16 @@ void dap_events_socket_worker_poll_update_unsafe(dap_events_socket_t * a_esocket // Check & add if( a_esocket->flags & DAP_SOCK_READY_TO_READ ) - l_filter |= EVFILT_READ; + l_filter |= EVFILT_READ; if( a_esocket->flags & DAP_SOCK_READY_TO_WRITE || a_esocket->flags &DAP_SOCK_CONNECTING ) - l_filter |= EVFILT_WRITE; + l_filter |= EVFILT_WRITE; - EV_SET(l_event, a_esocket->socket, l_filter,l_flags| EV_ADD,l_fflags,a_esocket->kqueue_data,a_esocket); + if (a_esocket->type == DESCRIPTOR_TYPE_EVENT || a_esocket->type == DESCRIPTOR_TYPE_QUEUE) + EV_SET(l_event, a_esocket->socket, EVFILT_USER,EV_ADD ,NOTE_FFCOPY,0, &a_esocket->kqueue_event_catched_data ); + else + EV_SET(l_event, a_esocket->socket, l_filter,l_flags| EV_ADD,l_fflags,a_esocket->kqueue_data,a_esocket); + + if( a_esocket->worker){ if ( kevent(a_esocket->worker->kqueue_fd,l_event,1,NULL,0,NULL)!=1 ){ int l_errno = errno; diff --git a/dap-sdk/net/core/dap_proc_queue.c b/dap-sdk/net/core/dap_proc_queue.c index 8172eb8d60b660e6eab79afc09a12cf3bd83fc0f..780e939d0ea7bb202cd7e1bb1fcef3529341391c 100644 --- a/dap-sdk/net/core/dap_proc_queue.c +++ b/dap-sdk/net/core/dap_proc_queue.c @@ -45,6 +45,7 @@ dap_proc_queue_t * dap_proc_queue_create(dap_proc_thread_t * a_thread) dap_proc_queue_t * l_queue = DAP_NEW_Z(dap_proc_queue_t); if (!l_queue) return NULL; l_queue->proc_thread = a_thread; l_queue->esocket = dap_events_socket_create_type_queue_ptr_unsafe(NULL,s_queue_esocket_callback); + l_queue->esocket->proc_thread = a_thread; l_queue->esocket->_inheritor = l_queue; return l_queue; } diff --git a/dap-sdk/net/core/dap_proc_thread.c b/dap-sdk/net/core/dap_proc_thread.c index a38fcfdb53cd980223a224d805da9a2815c06d56..7da6ecd4b21c9b4b1c553a90958397abbace1027 100644 --- a/dap-sdk/net/core/dap_proc_thread.c +++ b/dap-sdk/net/core/dap_proc_thread.c @@ -198,6 +198,10 @@ static void s_proc_event_callback(dap_events_socket_t * a_esocket, uint64_t a_va */ int dap_proc_thread_assign_esocket_unsafe(dap_proc_thread_t * a_thread, dap_events_socket_t * a_esocket) { + assert(a_esocket); + assert(a_thread); + a_esocket->proc_thread = a_thread; + #ifdef DAP_EVENTS_CAPS_EPOLL // Init events for EPOLL a_esocket->ev.events = a_esocket->ev_base_flags ; @@ -220,7 +224,7 @@ int dap_proc_thread_assign_esocket_unsafe(dap_proc_thread_t * a_thread, dap_even a_thread->esockets[a_thread->poll_count] = a_thread->proc_queue->esocket; a_thread->poll_count++; #elif defined (DAP_EVENTS_CAPS_KQUEUE) - u_short l_flags = a_esocket->kqueue_base_flags; +/* u_short l_flags = a_esocket->kqueue_base_flags; u_int l_fflags = a_esocket->kqueue_base_fflags; short l_filter = a_esocket->kqueue_base_filter; if(a_esocket->flags & DAP_SOCK_READY_TO_READ ) @@ -230,7 +234,8 @@ int dap_proc_thread_assign_esocket_unsafe(dap_proc_thread_t * a_thread, dap_even EV_SET(&a_esocket->kqueue_event , a_esocket->socket, l_filter, EV_ADD| l_flags | EV_CLEAR, l_fflags,0, a_esocket); return kevent ( a_thread->kqueue_fd,&a_esocket->kqueue_event,1,NULL,0,NULL)==1 ? 0 : -1 ; - +*/ + // Nothing to do #else #error "Unimplemented new esocket on worker callback for current platform" #endif @@ -282,17 +287,28 @@ int dap_proc_thread_esocket_update_poll_flags(dap_proc_thread_t * a_thread, dap_ a_thread->poll[a_esocket->poll_index].revents |= POLLOUT; #elif defined (DAP_EVENTS_CAPS_KQUEUE) + u_short l_flags = a_esocket->kqueue_base_flags; u_int l_fflags = a_esocket->kqueue_base_fflags; short l_filter = a_esocket->kqueue_base_filter; if(a_esocket->flags & DAP_SOCK_READY_TO_READ ) - l_fflags |= NOTE_READ; + l_fflags |= NOTE_READ; if(a_esocket->flags & DAP_SOCK_READY_TO_WRITE ) - l_fflags |= NOTE_WRITE; - EV_SET(&a_esocket->kqueue_event , a_esocket->socket, l_filter, EV_ADD| l_flags, l_fflags,0, a_esocket); - - if( kevent ( a_thread->kqueue_fd,&a_esocket->kqueue_event,1,NULL,0,NULL)!=1 ){ - log_it(L_CRITICAL, "Can't add descriptor in proc thread kqueue , err: %d", errno); + l_fflags |= NOTE_WRITE; + if (a_esocket->type == DESCRIPTOR_TYPE_EVENT || a_esocket->type == DESCRIPTOR_TYPE_QUEUE) + EV_SET(&a_esocket->kqueue_event , a_esocket->socket, EVFILT_USER,EV_ADD ,NOTE_FFCOPY, + a_esocket->kqueue_data, + &a_esocket->kqueue_event_catched_data ); + else + EV_SET(&a_esocket->kqueue_event , a_esocket->socket, l_filter, EV_ADD| l_flags, l_fflags, + a_esocket->kqueue_data, a_esocket); + int l_ret = kevent ( a_thread->kqueue_fd,&a_esocket->kqueue_event,1,NULL,0,NULL); + if( l_ret !=1 ){ + int l_errno = errno; + char l_errbuf[128]; + l_errbuf[0]='\0'; + strerror_r(l_errno,l_errbuf,sizeof (l_errbuf)); + log_it(L_CRITICAL, "Can't add descriptor in proc thread kqueue , err: %d (%s)", l_errno, l_errbuf); return -1; } @@ -569,8 +585,21 @@ static void * s_proc_thread_function(void * a_arg) l_flag_msg = l_cur_events & POLLMSG; #elif defined (DAP_EVENTS_CAPS_KQUEUE) struct kevent * l_kevent = &l_thread->kqueue_events[n]; - l_cur = (dap_events_socket_t*) l_kevent->udata; - assert(l_cur); + if (l_kevent->filter & EVFILT_USER){ + dap_events_socket_w_data_t * l_es_w_data = (dap_events_socket_w_data_t*) l_kevent->udata; + assert(l_es_w_data); + l_cur = l_es_w_data->esocket; + assert(l_cur); + memcpy(&l_cur->kqueue_event_catched_data,l_es_w_data,sizeof(*l_es_w_data)); + if(l_es_w_data != &l_cur->kqueue_event_catched_data ) + DAP_DELETE(l_es_w_data); + else if (s_debug_reactor) + log_it(L_DEBUG,"Own event signal without actual event data"); + l_kevent->filter |= EVFILT_READ; + }else{ + l_cur = (dap_events_socket_t*) l_kevent->udata; + assert(l_cur); + } l_cur->kqueue_event_catched = l_kevent; #ifndef DAP_OS_DARWIN u_int l_cur_events = l_thread->kqueue_events[n].fflags; @@ -578,11 +607,11 @@ static void * s_proc_thread_function(void * a_arg) uint32_t l_cur_events = l_thread->kqueue_events[n].fflags; #endif - l_flag_write = l_thread->kqueue_events[n].filter & EVFILT_WRITE; - l_flag_read = l_thread->kqueue_events[n].filter & EVFILT_READ; - l_flag_error = l_thread->kqueue_events[n].flags & EV_ERROR; + l_flag_write = l_kevent->filter & EVFILT_WRITE; + l_flag_read = l_kevent->filter & EVFILT_READ; + l_flag_error = l_kevent->flags & EV_ERROR; l_flag_nval = l_flag_pri = l_flag_msg = l_flag_hup= 0; - l_flag_rdhup = l_thread->kqueue_events[n].filter & EVFILT_EXCEPT && l_cur_events&&NOTE_DELETE; + l_flag_rdhup = l_kevent->filter & EVFILT_EXCEPT && l_cur_events&&NOTE_DELETE; #else #error "Unimplemented fetch esocket after poll" #endif @@ -687,16 +716,17 @@ static void * s_proc_thread_function(void * a_arg) } #elif defined (DAP_EVENTS_CAPS_KQUEUE) struct kevent* l_event=&l_cur->kqueue_event; - void * l_ptr; - memcpy(l_ptr,l_cur->buf_out,sizeof(l_ptr) ); - - EV_SET(l_event,(uintptr_t) l_ptr, l_cur->kqueue_base_filter,l_cur->kqueue_base_flags| EV_ADD, l_cur->kqueue_base_fflags,0, l_cur); + dap_events_socket_w_data_t * l_es_w_data = DAP_NEW_Z(dap_events_socket_w_data_t); + l_es_w_data->esocket = l_cur; + memcpy(&l_es_w_data->ptr,l_cur->buf_out,sizeof(l_es_w_data->ptr) ); + EV_SET(l_event,l_cur->socket, l_cur->kqueue_base_filter,l_cur->kqueue_base_flags| EV_ADD, l_cur->kqueue_base_fflags,0, l_es_w_data); int l_n = kevent(l_thread->kqueue_fd,l_event,1,NULL,0,NULL); if (l_n == 1) - l_bytes_sent = sizeof(l_ptr); + l_bytes_sent = sizeof(l_es_w_data->ptr); else{ - l_errno = errno; - log_it(L_WARNING,"queue ptr send error: kevent %p errno: %d", l_ptr, l_errno); + l_errno = errno; + log_it(L_WARNING,"queue ptr send error: kevent %p errno: %d", l_es_w_data->ptr, l_errno); + DAP_DELETE(l_es_w_data->ptr); } #else #error "Not implemented dap_events_socket_queue_ptr_send() for this platform" @@ -751,12 +781,12 @@ static void * s_proc_thread_function(void * a_arg) struct kevent * l_event = &l_cur->kqueue_event; EV_SET(l_event, l_cur->socket, 0 ,EV_DELETE, 0,0,l_cur); if ( kevent( l_thread->kqueue_fd,l_event,1,NULL,0,NULL) != 1 ) { - int l_errno = errno; - char l_errbuf[128]; - strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); - log_it( L_ERROR,"Can't remove event socket's handler %d from the epoll_fd %d \"%s\" (%d)", l_cur->socket, + int l_errno = errno; + char l_errbuf[128]; + strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); + log_it( L_ERROR,"Can't remove event socket's handler %d from the epoll_fd %d \"%s\" (%d)", l_cur->socket, l_thread->kqueue_fd, l_errbuf, l_errno); - } + } } #else diff --git a/dap-sdk/net/core/dap_timerfd.c b/dap-sdk/net/core/dap_timerfd.c index 008bde977763b4a395fee2c464430be941e303d1..d1be2687d867c1cc410fe30a23f5c0feddcd0237 100644 --- a/dap-sdk/net/core/dap_timerfd.c +++ b/dap-sdk/net/core/dap_timerfd.c @@ -154,16 +154,21 @@ dap_timerfd_t* dap_timerfd_create(uint64_t a_timeout_ms, dap_timerfd_callback_t } l_events_socket->socket = l_tfd; #elif defined (DAP_OS_BSD) - l_events_socket->kqueue_base_flags = EV_ADD | EV_ONESHOT | EV_DISPATCH; + l_events_socket->kqueue_base_flags = EV_ONESHOT; l_events_socket->kqueue_base_filter = EVFILT_TIMER; + l_events_socket->socket = arc4random(); #ifdef DAP_OS_DARWIN - l_events_socket->kqueue_base_fflags = NOTE_USECONDS; - l_events_socket->kqueue_data =(int64_t) a_timeout_ms*1000; + // We have all timers not accurate but more power safe + // Usualy we don't need exactly 1-5-10 seconds so let it be so + // TODO make absolute timer without power-saving flags + l_events_socket->kqueue_base_fflags = NOTE_BACKGROUND; + l_events_socket->kqueue_data =(int64_t) a_timeout_ms; #else l_events_socket->kqueue_base_fflags = NOTE_MSECONDS; l_events_socket->kqueue_data =(int64_t) a_timeout_ms; #endif - l_events_socket->socket = rand(); + + #elif defined (DAP_OS_WINDOWS) HANDLE l_th = CreateWaitableTimer(NULL, true, NULL); if (!l_th) { diff --git a/dap-sdk/net/core/dap_worker.c b/dap-sdk/net/core/dap_worker.c index 9467b64728d922b74b17629f43ceb468cdd56eb0..02dcc1f05142ec38257910becb9382a8959b42e6 100644 --- a/dap-sdk/net/core/dap_worker.c +++ b/dap-sdk/net/core/dap_worker.c @@ -158,8 +158,9 @@ void *dap_worker_thread(void *arg) l_worker->queue_callback = dap_events_socket_create_type_queue_ptr_unsafe(l_worker, s_queue_callback_callback); l_worker->event_exit = dap_events_socket_create_type_event_unsafe(l_worker, s_event_exit_callback); - l_worker->timer_check_activity = dap_timerfd_start_on_worker( l_worker, s_connection_timeout * 1000 / 2, - s_socket_all_check_activity, l_worker); + l_worker->timer_check_activity = dap_timerfd_create(s_connection_timeout * 1000 / 2, + s_socket_all_check_activity, l_worker); + dap_worker_add_events_socket_unsafe( l_worker->timer_check_activity->events_socket, l_worker); pthread_cond_broadcast(&l_worker->started_cond); bool s_loop_is_active = true; while(s_loop_is_active) { @@ -227,7 +228,21 @@ void *dap_worker_thread(void *arg) //log_it(L_DEBUG, "flags: returned events 0x%0X requested events 0x%0X",l_worker->poll[n].revents,l_worker->poll[n].events ); #elif defined (DAP_EVENTS_CAPS_KQUEUE) struct kevent * l_kevent_selected = &l_worker->kqueue_events_selected[n]; - l_cur = (dap_events_socket_t*) l_kevent_selected->udata; + if ( l_kevent_selected->filter & EVFILT_USER){ // If we have USER event it sends little different pointer + dap_events_socket_w_data_t * l_es_w_data = (dap_events_socket_w_data_t *) l_kevent_selected->udata; + l_cur = l_es_w_data->esocket; + assert(l_cur); + memcpy(&l_cur->kqueue_event_catched_data, l_es_w_data, sizeof (*l_es_w_data)); // Copy event info for further processing + l_kevent_selected->filter |= EVFILT_READ; // We have all logic expected read flag on because of different realizations + void * l_ptr = &l_cur->kqueue_event_catched_data; + if(l_es_w_data != l_ptr){ + DAP_DELETE(l_es_w_data); + }else if (s_debug_reactor){ + log_it(L_DEBUG,"Own event signal without actual event data"); + } + }else{ + l_cur = (dap_events_socket_t*) l_kevent_selected->udata; + } assert(l_cur); l_cur->kqueue_event_catched = l_kevent_selected; #ifndef DAP_OS_DARWIN @@ -638,22 +653,27 @@ void *dap_worker_thread(void *arg) if (l_bytes_sent == -1 && l_errno == EINVAL) // To make compatible with other l_errno = EAGAIN; // non-blocking sockets #elif defined (DAP_EVENTS_CAPS_KQUEUE) - struct kevent* l_event=&l_cur->kqueue_event; - void * l_ptr; - memcpy(l_ptr,l_cur->buf_out,sizeof(l_ptr) ); - EV_SET(l_event,(uintptr_t) l_ptr, l_cur->kqueue_base_filter,l_cur->kqueue_base_flags, l_cur->kqueue_base_fflags,l_cur->kqueue_data, l_cur); - int l_n = kevent(l_worker->kqueue_fd,l_event,1,NULL,0,NULL); - if (l_n == 1) - l_bytes_sent = sizeof(l_ptr); - else{ - l_errno = errno; - log_it(L_WARNING,"queue ptr send error: kevent %p errno: %d", l_ptr, l_errno); - } + struct kevent* l_event=&l_cur->kqueue_event; + dap_events_socket_w_data_t * l_es_w_data = DAP_NEW_Z(dap_events_socket_w_data_t); + l_es_w_data->esocket = l_cur; + memcpy(&l_es_w_data->ptr, l_cur->buf_out,sizeof(l_cur)); + EV_SET(l_event,l_cur->socket, l_cur->kqueue_base_filter,l_cur->kqueue_base_flags, l_cur->kqueue_base_fflags,l_cur->kqueue_data, l_es_w_data); + int l_n = kevent(l_worker->kqueue_fd,l_event,1,NULL,0,NULL); + if (l_n == 1){ + l_bytes_sent = sizeof(l_cur); + }else{ + l_errno = errno; + log_it(L_WARNING,"queue ptr send error: kevent %p errno: %d", l_es_w_data, l_errno); + DAP_DELETE(l_es_w_data); + } #else #error "Not implemented dap_events_socket_queue_ptr_send() for this platform" #endif - } + }else{ + assert("Not implemented non-ptr queue send from outgoing buffer"); + // TODO Implement non-ptr queue output + } break; case DESCRIPTOR_TYPE_PIPE: case DESCRIPTOR_TYPE_FILE: @@ -765,7 +785,9 @@ static void s_queue_add_es_callback( dap_events_socket_t * a_es, void * a_arg) return; } - //log_it(L_DEBUG, "Received event socket %p to add on worker", l_es_new); + if(s_debug_reactor) + log_it(L_DEBUG, "Received event socket %p to add on worker", l_es_new); + if(dap_events_socket_check_unsafe( l_worker, l_es_new)){ // Socket already present in worker, it's OK return; @@ -1031,7 +1053,8 @@ int dap_worker_add_events_socket_unsafe( dap_events_socket_t * a_esocket, dap_wo if(a_esocket->flags & DAP_SOCK_READY_TO_WRITE ) l_fflags |= NOTE_WRITE; - EV_SET(&a_esocket->kqueue_event , a_esocket->socket, l_filter, EV_ADD| l_flags | EV_CLEAR, l_fflags,0, a_esocket); + EV_SET(&a_esocket->kqueue_event , a_esocket->socket, l_filter, EV_ADD | l_flags , l_fflags, + a_esocket->kqueue_data, a_esocket); if(kevent ( a_worker->kqueue_fd,&a_esocket->kqueue_event,1,NULL,0,NULL) != -1 ){ a_worker->kqueue_events_count++; /* if ( a_worker->kqueue_events_count == (size_t) a_worker->kqueue_events_count_max ){ // realloc @@ -1045,6 +1068,9 @@ int dap_worker_add_events_socket_unsafe( dap_events_socket_t * a_esocket, dap_wo a_worker->poll =DAP_REALLOC(a_worker->poll, a_worker->poll_count_max * sizeof(*a_worker->poll)); a_worker->poll_esocket =DAP_REALLOC(a_worker->poll_esocket, a_worker->poll_count_max * sizeof(*a_worker->poll_esocket)); }*/ + if(s_debug_reactor) + log_it(L_DEBUG,"Added ident %d in kqueue()", a_esocket->socket); + return 0; }else{ int l_errno = errno; diff --git a/dap-sdk/net/core/include/dap_events_socket.h b/dap-sdk/net/core/include/dap_events_socket.h index e416cd9a5f0ab0a4f5214a10895f611f472e64f8..e54ab21cf87fc8d32a3cbe60f25bbdb04ff7975b 100644 --- a/dap-sdk/net/core/include/dap_events_socket.h +++ b/dap-sdk/net/core/include/dap_events_socket.h @@ -156,6 +156,18 @@ typedef enum { DESCRIPTOR_TYPE_SOCKET_LOCAL_CLIENT, } dap_events_desc_type_t; + +// To transfer esocket link with some pre-sized data +typedef struct dap_events_socket_w_data{ + struct dap_events_socket * esocket; + union{ + byte_t * data; + void * ptr; + uint64_t value; + }; + size_t size; +} dap_events_socket_w_data_t; + typedef struct dap_events_socket { union { #ifdef DAP_OS_WINDOWS @@ -241,7 +253,9 @@ typedef struct dap_events_socket { #elif defined (DAP_EVENTS_CAPS_KQUEUE) struct kevent kqueue_event; struct kevent *kqueue_event_catched; - + + dap_events_socket_w_data_t kqueue_event_catched_data; + short kqueue_base_filter; unsigned short kqueue_base_flags; unsigned int kqueue_base_fflags; @@ -267,6 +281,8 @@ typedef struct dap_events_socket_handler{ uint128_t uuid; } dap_events_socket_handler_t; + + typedef struct dap_events_socket_handler_hh{ dap_events_socket_t * esocket; uint128_t uuid; diff --git a/modules/global-db/dap_chain_global_db_hist.c b/modules/global-db/dap_chain_global_db_hist.c index 1ec16018fcde68f8ef86f01cae6859ced99b3407..5b280698bf1fac474ee8d57e7503fceca03d9e1a 100644 --- a/modules/global-db/dap_chain_global_db_hist.c +++ b/modules/global-db/dap_chain_global_db_hist.c @@ -1246,7 +1246,7 @@ static void *s_list_thread_proc(void *arg) if(!is_process) break; // calculating how many items required to read - uint64_t l_item_count =(uint64_t) min(10, (int64_t)l_item_last - (int64_t)l_item_start + 1); + size_t l_item_count =(uint64_t) min(10, (int64_t)l_item_last - (int64_t)l_item_start + 1); dap_store_obj_t *l_objs = NULL; // read next 1...10 items if(l_item_count > 0)