diff --git a/dap-sdk/net/core/dap_events_socket.c b/dap-sdk/net/core/dap_events_socket.c index 468c99d74c927095d016391f2a0ac61c1b9907d7..6d5817df2d4aae9edad35539b3d76831d4760720 100644 --- a/dap-sdk/net/core/dap_events_socket.c +++ b/dap-sdk/net/core/dap_events_socket.c @@ -184,7 +184,7 @@ dap_events_socket_t *dap_events_socket_wrap_no_add( dap_events_t *a_events, pthread_rwlock_wrlock(&a_events->sockets_rwlock); HASH_ADD_INT(a_events->sockets, socket, l_ret); pthread_rwlock_unlock(&a_events->sockets_rwlock); - }else + }else if(s_debug_reactor) log_it(L_WARNING, "Be carefull, you've wrapped socket 0 or -1 so it wasn't added to global list. Do it yourself when possible"); //log_it( L_DEBUG,"Dap event socket wrapped around %d sock a_events = %X", a_sock, a_events ); diff --git a/dap-sdk/net/core/dap_proc_thread.c b/dap-sdk/net/core/dap_proc_thread.c index 87108fc04cd7ec30dab678151b66c380615f6de7..260428d710ee19fcbd8aa0858c5036caa1368954 100644 --- a/dap-sdk/net/core/dap_proc_thread.c +++ b/dap-sdk/net/core/dap_proc_thread.c @@ -178,6 +178,54 @@ static void s_proc_event_callback(dap_events_socket_t * a_esocket, uint64_t a_va // log_it(L_DEBUG, "<-- Proc event callback end"); } +/** + * @brief dap_proc_thread_assign_esocket_unsafe + * @param a_thread + * @param a_esocket + * @return + */ +int dap_proc_thread_assign_esocket_unsafe(dap_proc_thread_t * a_thread, dap_events_socket_t * a_esocket) +{ +#ifdef DAP_EVENTS_CAPS_EPOLL + // Init events for EPOLL + a_esocket->ev.events = a_esocket->ev_base_flags ; + if(a_esocket->flags & DAP_SOCK_READY_TO_READ ) + a_esocket->ev.events |= EPOLLIN; + if(a_esocket->flags & DAP_SOCK_READY_TO_WRITE ) + a_esocket->ev.events |= EPOLLOUT; + a_esocket->ev.data.ptr = a_esocket; + return epoll_ctl(a_worker->epoll_fd, EPOLL_CTL_ADD, a_esocket->socket, &a_esocket->ev); +#elif defined (DAP_EVENTS_CAPS_POLL) + if ( a_thread->poll_count == a_thread->poll_count_max ){ // realloc + a_thread->poll_count_max *= 2; + log_it(L_WARNING, "Too many descriptors (%u), resizing array twice to %u", a_thread->poll_count, a_thread->poll_count_max); + a_thread->poll =DAP_REALLOC(a_thread->poll, a_thread->poll_count_max * sizeof(*a_thread->poll)); + a_thread->esockets =DAP_REALLOC(a_thread->esockets, a_thread->poll_count_max * sizeof(*a_thread->esockets)); + } + + a_thread->poll[a_thread->poll_count].fd = a_thread->proc_queue->esocket->fd; + a_thread->poll[a_thread->poll_count].events = a_thread->proc_queue->esocket->poll_base_flags; + a_thread->esockets[a_thread->poll_count] = a_thread->proc_queue->esocket; + a_thread->poll_count++; +#elif defined (DAP_EVENTS_CAPS_KQUEUE) + u_short l_flags = a_esocket->kqueue_base_flags; + u_int l_fflags = a_esocket->kqueue_base_fflags; + short l_filter = a_esocket->kqueue_base_filter; + if(a_esocket->flags & DAP_SOCK_READY_TO_READ ) + l_fflags |= NOTE_READ; + if(a_esocket->flags & DAP_SOCK_READY_TO_WRITE ) + l_fflags |= NOTE_WRITE; + + EV_SET(&a_esocket->kqueue_event , a_esocket->socket, l_filter, EV_ADD| l_flags | EV_CLEAR, l_fflags,0, a_esocket); + return kevent ( a_worker->kqueue_fd,&a_esocket->kqueue_event,1,NULL,0,NULL)==1 ? 0 : -1 ; + +#else +#error "Unimplemented new esocket on worker callback for current platform" +#endif + + return dap_proc_thread_esocket_update_poll_flags(a_thread,a_esocket); +} + /** * @brief dap_proc_thread_esocket_update_poll_flags * @param a_thread @@ -373,10 +421,7 @@ static void * s_proc_thread_function(void * a_arg) l_thread->esockets = DAP_NEW_Z_SIZE(dap_events_socket_t*,l_thread->poll_count_max *sizeof (*l_thread->esockets)); // Add proc queue - l_thread->poll[l_thread->poll_count].fd = l_thread->proc_queue->esocket->fd; - l_thread->poll[l_thread->poll_count].events = l_thread->proc_queue->esocket->poll_base_flags; - l_thread->esockets[l_thread->poll_count] = l_thread->proc_queue->esocket; - l_thread->poll_count++; + dap_proc_thread_assign_esocket_unsafe(l_thread,l_thread->proc_queue->esocket); // Add proc event l_thread->poll[l_thread->poll_count].fd = l_thread->proc_event->fd; diff --git a/dap-sdk/net/core/dap_worker.c b/dap-sdk/net/core/dap_worker.c index b1f3a4f03aeb8a56c900bb9d059e9c5a317aada1..298e86923143140a612ba143c5ddb7e070496670 100644 --- a/dap-sdk/net/core/dap_worker.c +++ b/dap-sdk/net/core/dap_worker.c @@ -149,7 +149,7 @@ void *dap_worker_thread(void *arg) l_worker->timer_check_activity = dap_timerfd_start_on_worker( l_worker, s_connection_timeout * 1000 / 2, s_socket_all_check_activity, l_worker); - + pthread_cond_broadcast(&l_worker->started_cond); bool s_loop_is_active = true; while(s_loop_is_active) { int l_selected_sockets; @@ -187,30 +187,30 @@ void *dap_worker_thread(void *arg) bool l_flag_hup, l_flag_rdhup, l_flag_read, l_flag_write, l_flag_error, l_flag_nval, l_flag_msg, l_flag_pri; #ifdef DAP_EVENTS_CAPS_EPOLL l_cur = (dap_events_socket_t *) l_epoll_events[n].data.ptr; - uint32_t l_cur_events = l_epoll_events[n].events; - l_flag_hup = l_cur_events & EPOLLHUP; - l_flag_rdhup = l_cur_events & EPOLLRDHUP; - l_flag_write = l_cur_events & EPOLLOUT; - l_flag_read = l_cur_events & EPOLLIN; - l_flag_error = l_cur_events & EPOLLERR; - l_flag_pri = l_cur_events & EPOLLPRI; + uint32_t l_cur_flags = l_epoll_events[n].events; + l_flag_hup = l_cur_flags & EPOLLHUP; + l_flag_rdhup = l_cur_flags & EPOLLRDHUP; + l_flag_write = l_cur_flags & EPOLLOUT; + l_flag_read = l_cur_flags & EPOLLIN; + l_flag_error = l_cur_flags & EPOLLERR; + l_flag_pri = l_cur_flags & EPOLLPRI; l_flag_nval = false; #elif defined ( DAP_EVENTS_CAPS_POLL) - short l_cur_events =l_worker->poll[n].revents; + short l_cur_flags =l_worker->poll[n].revents; if (l_worker->poll[n].fd == -1) // If it was deleted on previous iterations continue; - if (!l_cur_events) // No events for this socket + if (!l_cur_flags) // No events for this socket continue; - l_flag_hup = l_cur_events& POLLHUP; - l_flag_rdhup = l_cur_events & POLLRDHUP; - l_flag_write = (l_cur_events & POLLOUT) || (l_cur_events &POLLRDNORM)|| (l_cur_events &POLLRDBAND ) ; - l_flag_read = l_cur_events & POLLIN || (l_cur_events &POLLWRNORM)|| (l_cur_events &POLLWRBAND ); - l_flag_error = l_cur_events & POLLERR; - l_flag_nval = l_cur_events & POLLNVAL; - l_flag_pri = l_cur_events & POLLPRI; - l_flag_msg = l_cur_events & POLLMSG; + l_flag_hup = l_cur_flags& POLLHUP; + l_flag_rdhup = l_cur_flags & POLLRDHUP; + l_flag_write = (l_cur_flags & POLLOUT) || (l_cur_flags &POLLRDNORM)|| (l_cur_flags &POLLRDBAND ) ; + l_flag_read = l_cur_flags & POLLIN || (l_cur_flags &POLLWRNORM)|| (l_cur_flags &POLLWRBAND ); + l_flag_error = l_cur_flags & POLLERR; + l_flag_nval = l_cur_flags & POLLNVAL; + l_flag_pri = l_cur_flags & POLLPRI; + l_flag_msg = l_cur_flags & POLLMSG; l_cur = l_worker->poll_esocket[n]; //log_it(L_DEBUG, "flags: returned events 0x%0X requested events 0x%0X",l_worker->poll[n].revents,l_worker->poll[n].events ); #elif defined (DAP_EVENTS_CAPS_KQUEUE) diff --git a/dap-sdk/net/core/include/dap_proc_thread.h b/dap-sdk/net/core/include/dap_proc_thread.h index 12a7fc0ab4b5eab330a43cdfd2f6552e8f44cfbe..caf4ea2e5a155c64996c185025c7432a47a61f46 100644 --- a/dap-sdk/net/core/include/dap_proc_thread.h +++ b/dap-sdk/net/core/include/dap_proc_thread.h @@ -80,4 +80,4 @@ typedef void (*dap_proc_worker_callback_t)(dap_worker_t *,void *); void dap_proc_thread_worker_exec_callback(dap_proc_thread_t * a_thread, size_t a_worker_id, dap_proc_worker_callback_t a_callback, void * a_arg); -dap_proc_thread_t * dap_proc_thread_assign_esocket_unsafe(dap_proc_thread_t * a_thread, dap_events_socket_t * a_esocket); +int dap_proc_thread_assign_esocket_unsafe(dap_proc_thread_t * a_thread, dap_events_socket_t * a_esocket); diff --git a/dap-sdk/net/server/notify_server/src/dap_notify_srv.c b/dap-sdk/net/server/notify_server/src/dap_notify_srv.c index 722603fa1aefe6d99e2491756ff14a9431df370f..fde9b2d2833c6ea3c63cb0b4ba536f583ee443f7 100644 --- a/dap-sdk/net/server/notify_server/src/dap_notify_srv.c +++ b/dap-sdk/net/server/notify_server/src/dap_notify_srv.c @@ -54,9 +54,11 @@ int dap_notify_server_init(const char * a_notify_socket_path) if (!s_notify_server) return -1; +#ifdef DAP_OS_UNIX struct sockaddr_un l_sock_un={0}; l_sock_un.sun_family = AF_LOCAL; strncpy(l_sock_un.sun_path, a_notify_socket_path, sizeof(l_sock_un.sun_path) - 1); +#endif return 0; } @@ -75,7 +77,7 @@ void dap_notify_server_deinit() */ struct dap_events_socket * dap_notify_server_create_inter() { - + return NULL; } /** @@ -101,7 +103,7 @@ int dap_notify_server_send_f_inter(struct dap_events_socket * a_input, const cha */ int dap_notify_server_send_f_mt(const char * a_format,...) { - + return -1; } /**