diff --git a/cmake/OS_Detection.cmake b/cmake/OS_Detection.cmake index 590a6a6f4226fcb1ff47b328f26942280ab3c116..79331705661057451440ea432d78e48833d9c567 100644 --- a/cmake/OS_Detection.cmake +++ b/cmake/OS_Detection.cmake @@ -95,12 +95,13 @@ if(UNIX) elseif (DARWIN) set(CCOPT_SYSTEM "-L/usr/local/lib -L/opt/homebrew/lib -I/opt/homebrew/include -I/usr/local/include") set(LDOPT_SYSTEM "-L/usr/local/lib -L/opt/homebrew/lib -lintl") + set(CCFLAGS_COMMON "-std=c11 -Wall -Wno-address-of-packed-member -Wno-unused-command-line-argument -Wno-deprecated-declarations -Wno-unused-local-typedefs -Wno-unused-function -Wno-implicit-fallthrough -Wno-unused-variable -Wno-unused-but-set-variable -Wno-unused-parameter") if(DAP_DEBUG) - set(_CCOPT "${CCOPT_SYSTEM} -DDAP_DEBUG -Wall -Wno-address-of-packed-member -Wno-unused-command-line-argument -Wno-deprecated-declarations -Wno-unused-local-typedefs -Wno-unused-function -Wno-implicit-fallthrough -Wno-unused-variable -Wno-unused-parameter -g3 -ggdb -fno-eliminate-unused-debug-symbols -fno-strict-aliasing -std=c11") + set(_CCOPT "${CCOPT_SYSTEM} -DDAP_DEBUG ${CCFLAGS_COMMON} -g3 -ggdb -fno-eliminate-unused-debug-symbols -fno-strict-aliasing") set(_LOPT "${LDOPT_SYSTEM}") SET(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS}") else() - set(_CCOPT "${CCOPT_SYSTEM} -Wno-address-of-packed-member -Wno-deprecated-declarations -Wno-unused-command-line-argument -Wno-unused-local-typedefs -Wno-unused-function -Wno-implicit-fallthrough -Wno-unused-variable -Wno-unused-parameter -O3 -fPIC -fno-strict-aliasing -fno-ident -ffast-math -ftree-vectorize -fno-asynchronous-unwind-tables -ffunction-sections -std=c11") + set(_CCOPT "${CCOPT_SYSTEM} ${CCFLAGS_COMMON} -O3 -fPIC -fno-strict-aliasing -fno-ident -ffast-math -ftree-vectorize -fno-asynchronous-unwind-tables -ffunction-sections") set(_LOPT "${LDOPT_SYSTEM}") SET(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS}") endif() diff --git a/dap-sdk/core/include/dap_common.h b/dap-sdk/core/include/dap_common.h index aea85d7025556ecd154f718ae5ceac428f515586..3f686dad6620a3d7e31666c13996d95353b8c6ae 100755 --- a/dap-sdk/core/include/dap_common.h +++ b/dap-sdk/core/include/dap_common.h @@ -188,11 +188,11 @@ DAP_STATIC_INLINE void _dap_aligned_free( void *ptr ) #define DAP_CLIENT_PROTOCOL_VERSION 24 -#if __SIZEOF_LONG__==8 +#if __SIZEOF_LONG__==8 && !defined(DAP_OS_DARWIN) #define DAP_UINT64_FORMAT_X "lX" #define DAP_UINT64_FORMAT_x "lx" #define DAP_UINT64_FORMAT_U "lu" -#elif __SIZEOF_LONG__==4 +#elif __SIZEOF_LONG__==4 || defined (DAP_OS_DARWIN) #define DAP_UINT64_FORMAT_X "llX" #define DAP_UINT64_FORMAT_x "llx" #define DAP_UINT64_FORMAT_U "llu" diff --git a/dap-sdk/io/dap_context.c b/dap-sdk/io/dap_context.c index 36b1d104b8d3354ee1b98c1d8b7c3a6f54a7e527..5d55aa830c169204cc84a7cbaf465656bbe748ec 100644 --- a/dap-sdk/io/dap_context.c +++ b/dap-sdk/io/dap_context.c @@ -106,7 +106,7 @@ int dap_context_init() l_fdlimit.rlim_cur = l_fdlimit.rlim_max; if (setrlimit(RLIMIT_NOFILE, &l_fdlimit)) return -2; - log_it(L_INFO, "Set maximum opened descriptors from %lu to %lu", l_oldlimit, l_fdlimit.rlim_cur); + log_it(L_INFO, "Set maximum opened descriptors from %" DAP_UINT64_FORMAT_U " to %"DAP_UINT64_FORMAT_U, l_oldlimit, l_fdlimit.rlim_cur); #endif return 0; } @@ -1139,8 +1139,8 @@ int dap_context_poll_update(dap_events_socket_t * a_esocket) } } if (l_is_error && l_errno == EBADF){ - log_it(L_ATT,"Poll update: socket %d (%p ) disconnected, rise CLOSE flag to remove from queue, lost %"DAP_UINT64_FORMAT_U":%" DAP_UINT64_FORMAT_U - " bytes",a_esocket->socket,a_esocket,a_esocket->buf_in_size,a_esocket->buf_out_size); + log_it(L_ATT,"Poll update: socket %d (%p ) disconnected, rise CLOSE flag to remove from queue, lost %zd:%zd bytes", + a_esocket->socket,a_esocket,a_esocket->buf_in_size,a_esocket->buf_out_size); a_esocket->flags |= DAP_SOCK_SIGNAL_CLOSE; a_esocket->buf_in_size = a_esocket->buf_out_size = 0; // Reset everything from buffer, we close it now all }else if ( l_is_error && l_errno != EINPROGRESS && l_errno != ENOENT){ @@ -1165,24 +1165,39 @@ int dap_context_poll_update(dap_events_socket_t * a_esocket) * @param IOa_context * @param a_esocket */ -int dap_context_add_esocket(dap_context_t * a_context, dap_events_socket_t * a_esocket ) +int dap_context_add(dap_context_t * a_context, dap_events_socket_t * a_es ) { - if(a_context == NULL || a_esocket == NULL) + // Check & add + bool l_is_error=false; + int l_errno=0; + + if(a_es == NULL){ + log_it(L_WARNING, "Can't add NULL esocket to the context"); return -1; + } + if(a_context == NULL){ + log_it(L_WARNING, "Can't add esocket to the NULL context"); + return -1; + } if(g_debug_reactor){ - log_it(L_DEBUG,"Add event socket %p (socket %"DAP_FORMAT_SOCKET")", a_esocket, a_esocket->socket); + log_it(L_DEBUG,"Add event socket %p (socket %"DAP_FORMAT_SOCKET")", a_es, a_es->socket); } #ifdef DAP_EVENTS_CAPS_EPOLL // Init events for EPOLL - a_esocket->ev.events = a_esocket->ev_base_flags ; - if(a_esocket->flags & DAP_SOCK_READY_TO_READ ) - a_esocket->ev.events |= EPOLLIN; - if(a_esocket->flags & DAP_SOCK_READY_TO_WRITE ) - a_esocket->ev.events |= EPOLLOUT; - a_esocket->ev.data.ptr = a_esocket; - a_esocket->context = a_context; - return epoll_ctl(a_context->epoll_fd, EPOLL_CTL_ADD, a_esocket->socket, &a_esocket->ev); + a_es->ev.events = a_es->ev_base_flags ; + if(a_es->flags & DAP_SOCK_READY_TO_READ ) + a_es->ev.events |= EPOLLIN; + if(a_es->flags & DAP_SOCK_READY_TO_WRITE ) + a_es->ev.events |= EPOLLOUT; + a_es->ev.data.ptr = a_es; + a_es->context = a_context; + int l_ret = epoll_ctl(a_context->epoll_fd, EPOLL_CTL_ADD, a_es->socket, &a_es->ev); + if (l_ret != 0 ){ + l_is_error = true; + l_errno = l_ret; + } + #elif defined (DAP_EVENTS_CAPS_POLL) if ( a_context->poll_count == a_context->poll_count_max ){ // realloc a_context->poll_count_max *= 2; @@ -1190,109 +1205,197 @@ int dap_context_add_esocket(dap_context_t * a_context, dap_events_socket_t * a_e a_context->poll =DAP_REALLOC(a_context->poll, a_context->poll_count_max * sizeof(*a_context->poll)); a_context->poll_esocket =DAP_REALLOC(a_context->poll_esocket, a_context->poll_count_max * sizeof(*a_context->poll_esocket)); } - a_context->poll[a_context->poll_count].fd = a_esocket->socket; - a_esocket->poll_index = a_context->poll_count; - a_context->poll[a_context->poll_count].events = a_esocket->poll_base_flags; - if( a_esocket->flags & DAP_SOCK_READY_TO_READ ) + a_context->poll[a_context->poll_count].fd = a_es->socket; + a_es->poll_index = a_context->poll_count; + a_context->poll[a_context->poll_count].events = a_es->poll_base_flags; + if( a_es->flags & DAP_SOCK_READY_TO_READ ) a_context->poll[a_context->poll_count].events |= POLLIN; - if( (a_esocket->flags & DAP_SOCK_READY_TO_WRITE) || (a_esocket->flags & DAP_SOCK_CONNECTING) ) + if( (a_es->flags & DAP_SOCK_READY_TO_WRITE) || (a_es->flags & DAP_SOCK_CONNECTING) ) a_context->poll[a_context->poll_count].events |= POLLOUT; - a_context->poll_esocket[a_context->poll_count] = a_esocket; + a_context->poll_esocket[a_context->poll_count] = a_es; a_context->poll_count++; - a_esocket->context = a_context; - return 0; + a_es->context = a_context; #elif defined (DAP_EVENTS_CAPS_KQUEUE) - if ( a_esocket->type == DESCRIPTOR_TYPE_QUEUE ){ - a_esocket->context = a_context; - return 0; + if ( a_es->type == DESCRIPTOR_TYPE_QUEUE ){ + goto lb_exit; } - if ( a_esocket->type == DESCRIPTOR_TYPE_EVENT && a_esocket->pipe_out){ - a_esocket->context = a_context; - return 0; + if ( a_es->type == DESCRIPTOR_TYPE_EVENT && a_es->pipe_out){ + goto lb_exit; } struct kevent l_event; - u_short l_flags = a_esocket->kqueue_base_flags; - u_int l_fflags = a_esocket->kqueue_base_fflags; - short l_filter = a_esocket->kqueue_base_filter; + u_short l_flags = a_es->kqueue_base_flags; + u_int l_fflags = a_es->kqueue_base_fflags; + short l_filter = a_es->kqueue_base_filter; int l_kqueue_fd =a_context->kqueue_fd; if ( l_kqueue_fd == -1 ){ log_it(L_ERROR, "Esocket is not assigned with anything ,exit"); - return -1; + l_is_error = true; + l_errno = -1; + goto lb_exit; } - // Check & add - bool l_is_error=false; - int l_errno=0; - if (a_esocket->type == DESCRIPTOR_TYPE_EVENT ){ - EV_SET(&l_event, a_esocket->socket, EVFILT_USER,EV_ADD| EV_CLEAR ,0,0, &a_esocket->kqueue_event_catched_data ); + if (a_es->type == DESCRIPTOR_TYPE_EVENT ){ + EV_SET(&l_event, a_es->socket, EVFILT_USER,EV_ADD| EV_CLEAR ,0,0, &a_es->kqueue_event_catched_data ); if( kevent( l_kqueue_fd,&l_event,1,NULL,0,NULL)!=0){ l_is_error = true; l_errno = errno; + goto lb_exit; } }else{ if( l_filter){ - EV_SET(&l_event, a_esocket->socket, l_filter,l_flags| EV_ADD,l_fflags,a_esocket->kqueue_data,a_esocket); + EV_SET(&l_event, a_es->socket, l_filter,l_flags| EV_ADD,l_fflags,a_es->kqueue_data,a_es); if( kevent( l_kqueue_fd,&l_event,1,NULL,0,NULL) != 0 ){ l_is_error = true; l_errno = errno; + goto lb_exit; }else if (g_debug_reactor){ - log_it(L_DEBUG, "kevent set custom filter %d on fd %d",l_filter, a_esocket->socket); + log_it(L_DEBUG, "kevent set custom filter %d on fd %d",l_filter, a_es->socket); } }else{ - if( a_esocket->flags & DAP_SOCK_READY_TO_READ ){ - EV_SET(&l_event, a_esocket->socket, EVFILT_READ,l_flags| EV_ADD,l_fflags,a_esocket->kqueue_data,a_esocket); + if( a_es->flags & DAP_SOCK_READY_TO_READ ){ + EV_SET(&l_event, a_es->socket, EVFILT_READ,l_flags| EV_ADD,l_fflags,a_es->kqueue_data,a_es); if( kevent( l_kqueue_fd,&l_event,1,NULL,0,NULL) != 0 ){ l_is_error = true; l_errno = errno; + goto lb_exit; }else if (g_debug_reactor){ - log_it(L_DEBUG, "kevent set EVFILT_READ on fd %d", a_esocket->socket); + log_it(L_DEBUG, "kevent set EVFILT_READ on fd %d", a_es->socket); } } if( !l_is_error){ - if( a_esocket->flags & DAP_SOCK_READY_TO_WRITE || a_esocket->flags &DAP_SOCK_CONNECTING ){ - EV_SET(&l_event, a_esocket->socket, EVFILT_WRITE,l_flags| EV_ADD,l_fflags,a_esocket->kqueue_data,a_esocket); + if( a_es->flags & DAP_SOCK_READY_TO_WRITE || a_es->flags &DAP_SOCK_CONNECTING ){ + EV_SET(&l_event, a_es->socket, EVFILT_WRITE,l_flags| EV_ADD,l_fflags,a_es->kqueue_data,a_es); if(kevent( l_kqueue_fd,&l_event,1,NULL,0,NULL) != 0){ l_is_error = true; l_errno = errno; + goto lb_exit; }else if (g_debug_reactor){ - log_it(L_DEBUG, "kevent set EVFILT_WRITE on fd %d", a_esocket->socket); + log_it(L_DEBUG, "kevent set EVFILT_WRITE on fd %d", a_es->socket); } } } } } + +#else +#error "Unimplemented new esocket on context callback for current platform" +#endif +lb_exit: if ( l_is_error ){ char l_errbuf[128]; l_errbuf[0]=0; strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); log_it(L_ERROR,"Can't update client socket state on kqueue fd %d: \"%s\" (%d)", - a_esocket->socket, l_errbuf, l_errno); + a_es->socket, l_errbuf, l_errno); return l_errno; + }else{ + a_es->context = a_context; + // Add in context HT + if (a_es->socket!=0 && a_es->socket != INVALID_SOCKET){ + HASH_ADD(hh, a_context->esockets, uuid, sizeof(a_es->uuid), a_es ); + a_context->event_sockets_count++; + } + return 0; } +} +/** + * @brief dap_context_remove Removes esocket from its own context + * @param a_esocket Esocket to remove from its own context (if present + * @return Zero if everything is ok, others if error + */ +int dap_context_remove( dap_events_socket_t * a_es) +{ + dap_context_t * l_context = a_es->context; + int l_ret = 0; + if (!l_context) { + log_it(L_WARNING, "No context assigned to esocket %"DAP_FORMAT_SOCKET, a_es->socket); + return -1; + } + + l_context->event_sockets_count--; + if(a_es->socket != 0 && a_es->socket != INVALID_SOCKET ) + HASH_DELETE(hh,l_context->esockets, a_es); + +#if defined(DAP_EVENTS_CAPS_EPOLL) + + if ( epoll_ctl( l_context->epoll_fd, EPOLL_CTL_DEL, a_es->socket, &a_es->ev) == -1 ) { + int l_errno = errno; + char l_errbuf[128]; + strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); + log_it( L_ERROR,"Can't remove event socket's handler from the epoll_fd %"DAP_FORMAT_HANDLE" \"%s\" (%d)", + l_context->epoll_fd, l_errbuf, l_errno); + l_ret = l_errno; + } //else + // log_it( L_DEBUG,"Removed epoll's event from context #%u", l_context->id ); +#elif defined(DAP_EVENTS_CAPS_KQUEUE) + if (a_es->socket != -1 && a_es->type != DESCRIPTOR_TYPE_TIMER){ + struct kevent * l_event = &a_es->kqueue_event; + if (a_es->kqueue_base_filter){ + EV_SET(l_event, a_es->socket, a_es->kqueue_base_filter ,EV_DELETE, 0,0,a_es); + if ( kevent( l_context->kqueue_fd,l_event,1,NULL,0,NULL) == -1 ) { + int l_errno = errno; + char l_errbuf[128]; + strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); + log_it( L_ERROR,"Can't remove event socket's handler %d from the kqueue %d filter %d \"%s\" (%d)", a_es->socket, + l_context->kqueue_fd,a_es->kqueue_base_filter, l_errbuf, l_errno); + l_ret = l_errno; + } + }else{ + EV_SET(l_event, a_es->socket, EVFILT_EXCEPT ,EV_DELETE, 0,0,a_es); + kevent( l_context->kqueue_fd,l_event,1,NULL,0,NULL); // If this filter is not set up - no warnings + + + if(a_es->flags & DAP_SOCK_READY_TO_WRITE){ + EV_SET(l_event, a_es->socket, EVFILT_WRITE ,EV_DELETE, 0,0,a_es); + if ( kevent( l_context->kqueue_fd,l_event,1,NULL,0,NULL) == -1 ) { + int l_errno = errno; + char l_errbuf[128]; + strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); + log_it( L_ERROR,"Can't remove event socket's handler %d from the kqueue %d filter EVFILT_WRITE \"%s\" (%d)", a_es->socket, + l_context->kqueue_fd, l_errbuf, l_errno); + l_ret = l_errno; + } + } + if(a_es->flags & DAP_SOCK_READY_TO_READ){ + EV_SET(l_event, a_es->socket, EVFILT_READ ,EV_DELETE, 0,0,a_es); + if ( kevent( l_context->kqueue_fd,l_event,1,NULL,0,NULL) == -1 ) { + int l_errno = errno; + char l_errbuf[128]; + strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); + log_it( L_ERROR,"Can't remove event socket's handler %d from the kqueue %d filter EVFILT_READ \"%s\" (%d)", a_es->socket, + l_context->kqueue_fd, l_errbuf, l_errno); + l_ret = l_errno; + } + } + + } + } +#elif defined (DAP_EVENTS_CAPS_POLL) + if (a_es->poll_index < l_context->poll_count ){ + l_context->poll[a_es->poll_index].fd = -1; + l_context->poll_compress = true; + }else{ + log_it(L_ERROR, "Wrong poll index when remove from worker (unsafe): %u when total count %u", a_es->poll_index, l_context->poll_count); + l_ret = -2; + } #else -#error "Unimplemented new esocket on context callback for current platform" +#error "Unimplemented new esocket on worker callback for current platform" #endif - a_esocket->context = a_context; - // Add in context HT - if (a_esocket->socket!=0 && a_esocket->socket != INVALID_SOCKET){ - HASH_ADD(hh, a_context->esockets, uuid, sizeof(a_esocket->uuid), a_esocket ); - a_context->event_sockets_count++; - } - return 0; + a_es->context = NULL; + return l_ret; } - /** - * @brief dap_context_esocket_find_by_uuid + * @brief dap_context_find * @param a_context * @param a_es_uuid * @return */ -dap_events_socket_t *dap_context_esocket_find_by_uuid(dap_context_t * a_context, dap_events_socket_uuid_t a_es_uuid ) +dap_events_socket_t *dap_context_find(dap_context_t * a_context, dap_events_socket_uuid_t a_es_uuid ) { if(a_context == NULL){ log_it(L_ERROR, "Worker is NULL, can't fund esocket by UUID"); @@ -1307,12 +1410,12 @@ dap_events_socket_t *dap_context_esocket_find_by_uuid(dap_context_t * a_context, } /** - * @brief dap_context_create_esocket_queue + * @brief dap_context_create_queue * @param a_context * @param a_callback * @return */ - dap_events_socket_t * dap_context_create_esocket_queue(dap_context_t * a_context, dap_events_socket_callback_queue_ptr_t a_callback) + dap_events_socket_t * dap_context_create_queue(dap_context_t * a_context, dap_events_socket_callback_queue_ptr_t a_callback) { dap_events_socket_t * l_es = DAP_NEW_Z(dap_events_socket_t); if(!l_es){ @@ -1528,7 +1631,7 @@ dap_events_socket_t *dap_context_esocket_find_by_uuid(dap_context_t * a_context, #endif if ( a_context) { - if(dap_context_add_esocket(a_context, l_es)) { + if(dap_context_add(a_context, l_es)) { #ifdef DAP_OS_WINDOWS errno = WSAGetLastError(); #endif @@ -1545,7 +1648,7 @@ dap_events_socket_t *dap_context_esocket_find_by_uuid(dap_context_t * a_context, * @param a_callback * @return */ -dap_events_socket_t * dap_context_create_esocket_event(dap_context_t * a_context, dap_events_socket_callback_event_t a_callback) +dap_events_socket_t * dap_context_create_event(dap_context_t * a_context, dap_events_socket_callback_event_t a_callback) { dap_events_socket_t * l_es = DAP_NEW_Z(dap_events_socket_t); if (!l_es) return NULL; l_es->buf_out_size_max = l_es->buf_in_size_max = 1; @@ -1627,22 +1730,22 @@ dap_events_socket_t * dap_context_create_esocket_event(dap_context_t * a_context #elif defined(DAP_EVENTS_CAPS_KQUEUE) // nothing to do #else -#error "Not defined dap_context_create_esocket_event() on your platform" +#error "Not defined dap_context_create_event() on your platform" #endif if(a_context) - dap_context_add_esocket(a_context,l_es); + dap_context_add(a_context,l_es); return l_es; } /** - * @brief dap_context_create_esocket_pipe + * @brief dap_context_create_pipe * @param a_context * @param a_callback * @param a_flags * @return */ -dap_events_socket_t * dap_context_create_esocket_pipe(dap_context_t * a_context, dap_events_socket_callback_t a_callback, uint32_t a_flags) +dap_events_socket_t * dap_context_create_pipe(dap_context_t * a_context, dap_events_socket_callback_t a_callback, uint32_t a_flags) { #ifdef DAP_OS_WINDOWS UNUSED(a_w); @@ -1695,7 +1798,7 @@ dap_events_socket_t * dap_context_create_esocket_pipe(dap_context_t * a_context, #else #error "No defined s_create_type_pipe() for your platform" #endif - dap_context_add_esocket(a_context,l_es); + dap_context_add(a_context,l_es); return l_es; #endif } diff --git a/dap-sdk/io/dap_events_socket.c b/dap-sdk/io/dap_events_socket.c index 44e4e0f0960e32dd32a7d116543a8e94d95722fb..9e08e5102a1935b1f933f18d03bc2c04b6596f9c 100644 --- a/dap-sdk/io/dap_events_socket.c +++ b/dap-sdk/io/dap_events_socket.c @@ -256,7 +256,7 @@ void dap_events_socket_reassign_between_workers_unsafe(dap_events_socket_t * a_e dap_events_socket_t * l_queue_input= l_worker->queue_es_new_input[a_worker_new->id]; log_it(L_DEBUG, "Reassign between %u->%u workers: %p (%d) ", l_worker->id, a_worker_new->id, a_es, a_es->fd ); - dap_events_socket_remove_from_worker_unsafe( a_es, l_worker ); + dap_context_remove(a_es); a_es->was_reassigned = true; if (a_es->callbacks.worker_unassign_callback) a_es->callbacks.worker_unassign_callback(a_es, l_worker); @@ -297,7 +297,7 @@ void dap_events_socket_reassign_between_workers_mt(dap_worker_t * a_worker_old, */ dap_events_socket_t * dap_events_socket_create_type_pipe_mt(dap_worker_t * a_w, dap_events_socket_callback_t a_callback, uint32_t a_flags) { - dap_events_socket_t * l_es = dap_context_create_esocket_pipe(NULL, a_callback, a_flags); + dap_events_socket_t * l_es = dap_context_create_pipe(NULL, a_callback, a_flags); dap_worker_add_events_socket_unsafe(l_es,a_w); return l_es; } @@ -367,7 +367,7 @@ dap_events_socket_t * dap_events_socket_create(dap_events_desc_type_t a_type, da */ dap_events_socket_t * dap_events_socket_create_type_pipe_unsafe(dap_worker_t * a_w, dap_events_socket_callback_t a_callback, uint32_t a_flags) { - dap_events_socket_t * l_es = dap_context_create_esocket_pipe(NULL, a_callback, a_flags); + dap_events_socket_t * l_es = dap_context_create_pipe(NULL, a_callback, a_flags); dap_worker_add_events_socket_unsafe(l_es,a_w); return l_es; } @@ -509,7 +509,7 @@ dap_events_socket_t * dap_events_socket_queue_ptr_create_input(dap_events_socket */ dap_events_socket_t * dap_events_socket_create_type_queue_ptr_mt(dap_worker_t * a_w, dap_events_socket_callback_queue_ptr_t a_callback) { - dap_events_socket_t * l_es = dap_context_create_esocket_queue(NULL, a_callback); + dap_events_socket_t * l_es = dap_context_create_queue(NULL, a_callback); assert(l_es); // If no worker - don't assign if ( a_w) @@ -627,7 +627,7 @@ int dap_events_socket_queue_proc_input_unsafe(dap_events_socket_t * a_esocket) */ dap_events_socket_t * dap_events_socket_create_type_event_mt(dap_worker_t * a_w, dap_events_socket_callback_event_t a_callback) { - dap_events_socket_t * l_es = dap_context_create_esocket_event(NULL, a_callback); + dap_events_socket_t * l_es = dap_context_create_event(NULL, a_callback); // If no worker - don't assign if ( a_w) dap_events_socket_assign_on_worker_mt(l_es,a_w); @@ -643,7 +643,7 @@ dap_events_socket_t * dap_events_socket_create_type_event_mt(dap_worker_t * a_w, dap_events_socket_t * dap_events_socket_create_type_event_unsafe(dap_worker_t * a_w, dap_events_socket_callback_event_t a_callback) { - dap_events_socket_t * l_es = dap_context_create_esocket_event(NULL, a_callback); + dap_events_socket_t * l_es = dap_context_create_event(NULL, a_callback); // If no worker - don't assign if ( a_w) dap_worker_add_events_socket_unsafe(l_es,a_w); @@ -1203,7 +1203,7 @@ bool s_remove_and_delete_unsafe_delayed_delete_callback(void * a_arg) assert(l_es_handler); assert(l_worker); dap_events_socket_t * l_es; - if( (l_es = dap_context_esocket_find_by_uuid(l_worker->context, l_es_handler->esocket_uuid)) != NULL) + if( (l_es = dap_context_find(l_worker->context, l_es_handler->esocket_uuid)) != NULL) //dap_events_socket_remove_and_delete_unsafe(l_es,l_es_handler->value == 1); dap_events_socket_remove_and_delete_unsafe( l_es, l_es_handler->value == 1); DAP_DELETE(l_es_handler); @@ -1224,7 +1224,7 @@ void dap_events_socket_remove_and_delete_unsafe_delayed( dap_events_socket_t *a_ dap_events_socket_descriptor_close(a_es); dap_worker_t * l_worker = a_es->context->worker; - dap_events_socket_remove_from_worker_unsafe( a_es, l_worker); + dap_context_remove(a_es); a_es->flags |= DAP_SOCK_SIGNAL_CLOSE; dap_timerfd_start_on_worker(l_worker, s_delayed_ops_timeout_ms, s_remove_and_delete_unsafe_delayed_delete_callback, l_es_handler ); @@ -1247,7 +1247,7 @@ void dap_events_socket_remove_and_delete_unsafe( dap_events_socket_t *a_es, bool #endif //log_it( L_DEBUG, "es is going to be removed from the lists and free the memory (0x%016X)", a_es ); - dap_events_socket_remove_from_worker_unsafe(a_es, a_es->context->worker); + dap_context_remove(a_es); if( a_es->callbacks.delete_callback ) a_es->callbacks.delete_callback( a_es, NULL ); // Init internal structure @@ -1300,83 +1300,6 @@ void dap_events_socket_delete_unsafe( dap_events_socket_t * a_esocket , bool a_p DAP_DEL_Z( a_esocket ) } -/** - * @brief dap_events_socket_delete - * @param a_es - */ -void dap_events_socket_remove_from_worker_unsafe( dap_events_socket_t *a_es, dap_worker_t * a_worker) -{ - if (!a_es->context->worker) { - log_it(L_INFO, "No worker assigned to esocket %"DAP_FORMAT_SOCKET, a_es->socket); - return; - } - - a_worker->context->event_sockets_count--; - if(a_es->socket != 0 && a_es->socket != INVALID_SOCKET ) - HASH_DELETE(hh,a_worker->context->esockets, a_es); - -#if defined(DAP_EVENTS_CAPS_EPOLL) - - if ( epoll_ctl( a_worker->epoll_fd, EPOLL_CTL_DEL, a_es->socket, &a_es->ev) == -1 ) { - int l_errno = errno; - char l_errbuf[128]; - strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); - log_it( L_ERROR,"Can't remove event socket's handler from the epoll_fd %"DAP_FORMAT_HANDLE" \"%s\" (%d)", - a_worker->epoll_fd, l_errbuf, l_errno); - } //else - // log_it( L_DEBUG,"Removed epoll's event from dap_worker #%u", a_worker->id ); -#elif defined(DAP_EVENTS_CAPS_KQUEUE) - if (a_es->socket != -1 && a_es->type != DESCRIPTOR_TYPE_TIMER){ - struct kevent * l_event = &a_es->kqueue_event; - if (a_es->kqueue_base_filter){ - EV_SET(l_event, a_es->socket, a_es->kqueue_base_filter ,EV_DELETE, 0,0,a_es); - if ( kevent( a_worker->context->kqueue_fd,l_event,1,NULL,0,NULL) == -1 ) { - int l_errno = errno; - char l_errbuf[128]; - strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); - log_it( L_ERROR,"Can't remove event socket's handler %d from the kqueue %d filter %d \"%s\" (%d)", a_es->socket, - a_worker->context->kqueue_fd,a_es->kqueue_base_filter, l_errbuf, l_errno); - } - }else{ - EV_SET(l_event, a_es->socket, EVFILT_EXCEPT ,EV_DELETE, 0,0,a_es); - kevent( a_worker->context->kqueue_fd,l_event,1,NULL,0,NULL); // If this filter is not set up - no warnings - - - if(a_es->flags & DAP_SOCK_READY_TO_WRITE){ - EV_SET(l_event, a_es->socket, EVFILT_WRITE ,EV_DELETE, 0,0,a_es); - if ( kevent( a_worker->context->kqueue_fd,l_event,1,NULL,0,NULL) == -1 ) { - int l_errno = errno; - char l_errbuf[128]; - strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); - log_it( L_ERROR,"Can't remove event socket's handler %d from the kqueue %d filter EVFILT_WRITE \"%s\" (%d)", a_es->socket, - a_worker->context->kqueue_fd, l_errbuf, l_errno); - } - } - if(a_es->flags & DAP_SOCK_READY_TO_READ){ - EV_SET(l_event, a_es->socket, EVFILT_READ ,EV_DELETE, 0,0,a_es); - if ( kevent( a_worker->context->kqueue_fd,l_event,1,NULL,0,NULL) == -1 ) { - int l_errno = errno; - char l_errbuf[128]; - strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); - log_it( L_ERROR,"Can't remove event socket's handler %d from the kqueue %d filter EVFILT_READ \"%s\" (%d)", a_es->socket, - a_worker->context->kqueue_fd, l_errbuf, l_errno); - } - } - - } - } -#elif defined (DAP_EVENTS_CAPS_POLL) - if (a_es->poll_index < a_worker->context->poll_count ){ - a_worker->context->poll[a_es->poll_index].fd = -1; - a_worker->context->poll_compress = true; - }else{ - log_it(L_ERROR, "Wrong poll index when remove from worker (unsafe): %u when total count %u", a_es->poll_index, a_worker->context->poll_count); - } -#else -#error "Unimplemented new esocket on worker callback for current platform" -#endif - a_es->context = NULL; -} /** * @brief dap_events_socket_remove_and_delete diff --git a/dap-sdk/io/dap_proc_queue.c b/dap-sdk/io/dap_proc_queue.c index 5ad0766c744fad4d40e3547820dcc2bb44870051..95f2247e0f1f5d102672c1eff0ac48e897e82e72 100644 --- a/dap-sdk/io/dap_proc_queue.c +++ b/dap-sdk/io/dap_proc_queue.c @@ -83,7 +83,7 @@ dap_proc_queue_t * dap_proc_queue_create(dap_proc_thread_t * a_thread) } l_queue->proc_thread = a_thread; - l_queue->esocket = dap_context_create_esocket_queue(a_thread->context,s_queue_esocket_callback); + l_queue->esocket = dap_context_create_queue(a_thread->context,s_queue_esocket_callback); l_queue->esocket->proc_thread = a_thread; l_queue->esocket->_inheritor = l_queue; return l_queue; diff --git a/dap-sdk/io/dap_proc_thread.c b/dap-sdk/io/dap_proc_thread.c index d84119ebb1b7ac3ecaa8ff25aa4cb667fadd710d..7988cbb7ebe618cfc5d595a0cb8264189a25b74a 100644 --- a/dap-sdk/io/dap_proc_thread.c +++ b/dap-sdk/io/dap_proc_thread.c @@ -289,7 +289,7 @@ int dap_proc_thread_assign_esocket_unsafe(dap_proc_thread_t * a_thread, dap_even */ dap_events_socket_t * dap_proc_thread_create_queue_ptr(dap_proc_thread_t * a_thread, dap_events_socket_callback_queue_ptr_t a_callback) { - dap_events_socket_t * l_es = dap_context_create_esocket_queue(a_thread->context,a_callback); + dap_events_socket_t * l_es = dap_context_create_queue(a_thread->context,a_callback); if(l_es == NULL) return NULL; l_es->proc_thread = a_thread; @@ -316,9 +316,9 @@ static void s_context_callback_started( dap_context_t * a_context, void *a_arg) dap_events_socket_assign_on_worker_mt(l_worker_related->proc_queue_input,l_worker_related); - l_thread->proc_event = dap_context_create_esocket_event( a_context , s_proc_event_callback); + l_thread->proc_event = dap_context_create_event( a_context , s_proc_event_callback); l_thread->proc_event->proc_thread = l_thread; - l_thread->event_exit = dap_context_create_esocket_event( a_context, s_event_exit_callback); + l_thread->event_exit = dap_context_create_event( a_context, s_event_exit_callback); l_thread->event_exit->proc_thread = l_thread; l_thread->proc_event->_inheritor = l_thread; // we pass thread through it diff --git a/dap-sdk/io/dap_timerfd.c b/dap-sdk/io/dap_timerfd.c index 9c49828ad9b24c5718a974f728c94e984de12aa4..5b1f39b60886770af2618f8ba6001e8d5e79e3f1 100644 --- a/dap-sdk/io/dap_timerfd.c +++ b/dap-sdk/io/dap_timerfd.c @@ -272,7 +272,7 @@ static inline void s_timerfd_reset(dap_timerfd_t *a_timerfd, dap_events_socket_t // Re-add timer in context dap_context_t * l_context = a_es->context; a_es->context = NULL; - dap_context_add_esocket(l_context,a_es); + dap_context_add(l_context,a_es); #elif defined (DAP_OS_WINDOWS) /*LARGE_INTEGER l_due_time; l_due_time.QuadPart = (long long)a_timerfd->timeout_ms * _MSEC; @@ -316,7 +316,7 @@ static void s_timerfd_reset_worker_callback( dap_worker_t * a_worker, void * a_a { dap_timerfd_t *l_timerfd = (dap_timerfd_t *) a_arg; dap_events_socket_t *l_sock = NULL; - l_sock = dap_context_esocket_find_by_uuid(a_worker->context, l_timerfd->esocket_uuid); + l_sock = dap_context_find(a_worker->context, l_timerfd->esocket_uuid); if (l_sock) s_timerfd_reset(l_timerfd, l_sock); @@ -332,7 +332,7 @@ static bool s_timerfd_reset_proc_thread_callback( dap_proc_thread_t * a_thread, { dap_timerfd_t *l_timerfd = (dap_timerfd_t *) a_arg; dap_events_socket_t *l_sock = NULL; - l_sock = dap_context_esocket_find_by_uuid(a_thread->context, l_timerfd->esocket_uuid); + l_sock = dap_context_find(a_thread->context, l_timerfd->esocket_uuid); if (l_sock) s_timerfd_reset(l_timerfd, l_sock); return true; diff --git a/dap-sdk/io/dap_worker.c b/dap-sdk/io/dap_worker.c index 794ef1e2eed353a70cddc44486f16c72c359d7f9..004d8dca696e8c7a20e67b9d962d334723bb1e9b 100644 --- a/dap-sdk/io/dap_worker.c +++ b/dap-sdk/io/dap_worker.c @@ -90,10 +90,10 @@ void dap_worker_context_callback_started( dap_context_t * a_context, void *a_arg l_worker->queue_es_reassign_input = DAP_NEW_Z_SIZE(dap_events_socket_t*, sizeof (dap_events_socket_t*)* dap_events_thread_get_count() ); - l_worker->queue_es_new = dap_context_create_esocket_queue(a_context, s_queue_add_es_callback); - l_worker->queue_es_delete = dap_context_create_esocket_queue(a_context, s_queue_delete_es_callback); - l_worker->queue_es_io = dap_context_create_esocket_queue(a_context, s_queue_es_io_callback); - l_worker->queue_es_reassign = dap_context_create_esocket_queue(a_context, s_queue_es_reassign_callback ); + l_worker->queue_es_new = dap_context_create_queue(a_context, s_queue_add_es_callback); + l_worker->queue_es_delete = dap_context_create_queue(a_context, s_queue_delete_es_callback); + l_worker->queue_es_io = dap_context_create_queue(a_context, s_queue_es_io_callback); + l_worker->queue_es_reassign = dap_context_create_queue(a_context, s_queue_es_reassign_callback ); for( size_t n = 0; n < dap_events_thread_get_count(); n++) { @@ -103,8 +103,8 @@ void dap_worker_context_callback_started( dap_context_t * a_context, void *a_arg l_worker->queue_es_reassign_input[n] = dap_events_socket_queue_ptr_create_input(l_worker->queue_es_reassign); } - l_worker->queue_callback = dap_context_create_esocket_queue(a_context, s_queue_callback_callback); - l_worker->event_exit = dap_context_create_esocket_event(a_context, s_event_exit_callback); + l_worker->queue_callback = dap_context_create_queue(a_context, s_queue_callback_callback); + l_worker->event_exit = dap_context_create_event(a_context, s_event_exit_callback); l_worker->timer_check_activity = dap_timerfd_create(s_connection_timeout * 1000 / 2, s_socket_all_check_activity, l_worker); @@ -163,7 +163,7 @@ static void s_queue_add_es_callback( dap_events_socket_t * a_es, void * a_arg) if(l_es_new->socket!=0 && l_es_new->socket != INVALID_SOCKET) #endif - if(dap_context_esocket_find_by_uuid( l_context, l_es_new->uuid)){ + if(dap_context_find( l_context, l_es_new->uuid)){ // Socket already present in worker, it's OK return; } @@ -192,7 +192,7 @@ static void s_queue_add_es_callback( dap_events_socket_t * a_es, void * a_arg) l_es_new->is_initalized = true; } - int l_ret = dap_context_add_esocket(l_context, l_es_new); + int l_ret = dap_context_add(l_context, l_es_new); if ( l_ret != 0 ){ log_it(L_CRITICAL,"Can't add event socket's handler to worker i/o poll mechanism with error %d", errno); }else{ @@ -212,7 +212,7 @@ static void s_queue_delete_es_callback( dap_events_socket_t * a_es, void * a_arg assert(a_arg); dap_events_socket_uuid_t * l_es_uuid_ptr = (dap_events_socket_uuid_t*) a_arg; dap_events_socket_t * l_es; - if ( (l_es = dap_context_esocket_find_by_uuid(a_es->context,*l_es_uuid_ptr)) != NULL ){ + if ( (l_es = dap_context_find(a_es->context,*l_es_uuid_ptr)) != NULL ){ //l_es->flags |= DAP_SOCK_SIGNAL_CLOSE; // Send signal to socket to kill dap_events_socket_remove_and_delete_unsafe(l_es,false); }else @@ -235,7 +235,7 @@ static void s_queue_es_reassign_callback( dap_events_socket_t * a_es, void * a_a dap_worker_msg_reassign_t * l_msg = (dap_worker_msg_reassign_t*) a_arg; assert(l_msg); dap_events_socket_t * l_es_reassign; - if ( ( l_es_reassign = dap_context_esocket_find_by_uuid(l_context, l_msg->esocket_uuid))!= NULL ){ + if ( ( l_es_reassign = dap_context_find(l_context, l_msg->esocket_uuid))!= NULL ){ if( l_es_reassign->was_reassigned && l_es_reassign->flags & DAP_SOCK_REASSIGN_ONCE) { log_it(L_INFO, "Reassgment request with DAP_SOCK_REASSIGN_ONCE allowed only once, declined reassigment from %u to %u", l_es_reassign->context->worker->id, l_msg->worker_new->id); @@ -290,7 +290,7 @@ static void s_queue_es_io_callback( dap_events_socket_t * a_es, void * a_arg) dap_worker_msg_io_t * l_msg = a_arg; assert(l_msg); // Check if it was removed from the list - dap_events_socket_t *l_msg_es = dap_context_esocket_find_by_uuid(l_worker->context, l_msg->esocket_uuid); + dap_events_socket_t *l_msg_es = dap_context_find(l_worker->context, l_msg->esocket_uuid); if ( l_msg_es == NULL){ log_it(L_INFO, "We got i/o message for esocket %"DAP_UINT64_FORMAT_U" thats now not in list. Lost %zu data", l_msg->esocket_uuid, l_msg->data_size); DAP_DELETE(l_msg); diff --git a/dap-sdk/io/include/dap_context.h b/dap-sdk/io/include/dap_context.h index 7cd25adc685427082ebd8b5d6393620bdae9b4f1..32f861ad434859fbc518668792f2f7e53ed56f78 100644 --- a/dap-sdk/io/include/dap_context.h +++ b/dap-sdk/io/include/dap_context.h @@ -147,9 +147,10 @@ static inline dap_context_t * dap_context_current() /// ALL THIS FUNCTIONS ARE UNSAFE ! CALL THEM ONLY INSIDE THEIR OWN CONTEXT!! -int dap_context_add_esocket(dap_context_t * a_context, dap_events_socket_t * a_esocket ); -int dap_context_poll_update(dap_events_socket_t * a_esocket); -dap_events_socket_t *dap_context_esocket_find_by_uuid(dap_context_t * a_context, dap_events_socket_uuid_t a_es_uuid ); -dap_events_socket_t * dap_context_create_esocket_queue(dap_context_t * a_context, dap_events_socket_callback_queue_ptr_t a_callback); -dap_events_socket_t * dap_context_create_esocket_event(dap_context_t * a_context, dap_events_socket_callback_event_t a_callback); -dap_events_socket_t * dap_context_create_esocket_pipe(dap_context_t * a_context, dap_events_socket_callback_t a_callback, uint32_t a_flags); +int dap_context_add(dap_context_t * a_context, dap_events_socket_t * a_es ); +int dap_context_remove( dap_events_socket_t * a_es); +int dap_context_poll_update(dap_events_socket_t * a_es); +dap_events_socket_t *dap_context_find(dap_context_t * a_context, dap_events_socket_uuid_t a_es_uuid ); +dap_events_socket_t * dap_context_create_queue(dap_context_t * a_context, dap_events_socket_callback_queue_ptr_t a_callback); +dap_events_socket_t * dap_context_create_event(dap_context_t * a_context, dap_events_socket_callback_event_t a_callback); +dap_events_socket_t * dap_context_create_pipe(dap_context_t * a_context, dap_events_socket_callback_t a_callback, uint32_t a_flags); diff --git a/dap-sdk/io/include/dap_events_socket.h b/dap-sdk/io/include/dap_events_socket.h index 95f17cbd0d05dc5679050cc7c8ba4686d01767dc..f6a3a8699bc2e3a4b331ef68e6c3ace7df161b47 100644 --- a/dap-sdk/io/include/dap_events_socket.h +++ b/dap-sdk/io/include/dap_events_socket.h @@ -187,6 +187,7 @@ typedef struct dap_events_socket_w_data{ } dap_events_socket_w_data_t; typedef uint64_t dap_events_socket_uuid_t; +#define DAP_FORMAT_ESOCKET_UUID "0x%016"DAP_UINT64_FORMAT_X typedef struct dap_events_socket { union { diff --git a/dap-sdk/io/include/dap_worker.h b/dap-sdk/io/include/dap_worker.h index 0bdf64e1898fb8113c5d54c6f177891bfdd791bf..6d95beca282cd984f29a648277c08f6c2a54c521 100644 --- a/dap-sdk/io/include/dap_worker.h +++ b/dap-sdk/io/include/dap_worker.h @@ -102,7 +102,7 @@ static inline dap_worker_t * dap_worker_get_current(){ static inline int dap_worker_add_events_socket_unsafe( dap_events_socket_t * a_esocket, dap_worker_t * a_worker) { - return dap_context_add_esocket(a_worker->context, a_esocket); + return dap_context_add(a_worker->context, a_esocket); } void dap_worker_add_events_socket(dap_events_socket_t * a_events_socket, dap_worker_t * a_worker); diff --git a/dap-sdk/net/client/dap_client_http.c b/dap-sdk/net/client/dap_client_http.c index 6767dc661c3c9a4c0e0bab214764c0a43aec2454..5e217177dbb5f8cf94f0032fbdd12046f46277dc 100644 --- a/dap-sdk/net/client/dap_client_http.c +++ b/dap-sdk/net/client/dap_client_http.c @@ -187,7 +187,7 @@ static bool s_timer_timeout_after_connected_check(void * a_arg) dap_worker_t * l_worker = dap_worker_get_current(); // We're in own esocket context assert(l_worker); - dap_events_socket_t * l_es = dap_context_esocket_find_by_uuid( l_worker->context, *l_es_uuid_ptr); + dap_events_socket_t * l_es = dap_context_find( l_worker->context, *l_es_uuid_ptr); if(l_es){ dap_client_http_pvt_t * l_http_pvt = PVT(l_es); assert(l_http_pvt); @@ -224,7 +224,7 @@ static bool s_timer_timeout_check(void * a_arg) dap_worker_t * l_worker = dap_worker_get_current(); // We're in own esocket context assert(l_worker); - dap_events_socket_t * l_es = dap_context_esocket_find_by_uuid(l_worker->context, *l_es_uuid); + dap_events_socket_t * l_es = dap_context_find(l_worker->context, *l_es_uuid); if(l_es){ if (l_es->flags & DAP_SOCK_CONNECTING ){ dap_client_http_pvt_t * l_http_pvt = PVT(l_es); diff --git a/dap-sdk/net/client/dap_client_pvt.c b/dap-sdk/net/client/dap_client_pvt.c index 37e7abd3babc8f4161807e83033574e6d341552c..7e5e3c3fe08e163e562f05e7808a1990fb02e485 100644 --- a/dap-sdk/net/client/dap_client_pvt.c +++ b/dap-sdk/net/client/dap_client_pvt.c @@ -232,7 +232,7 @@ static bool s_stream_timer_timeout_check(void * a_arg) dap_worker_t *l_worker = dap_worker_get_current(); assert(l_worker); - dap_events_socket_t * l_es = dap_context_esocket_find_by_uuid(l_worker->context, *l_es_uuid_ptr); + dap_events_socket_t * l_es = dap_context_find(l_worker->context, *l_es_uuid_ptr); if(l_es){ if (l_es->flags & DAP_SOCK_CONNECTING ){ dap_client_pvt_t *l_client_pvt = DAP_ESOCKET_CLIENT_PVT(l_es); @@ -274,7 +274,7 @@ static bool s_stream_timer_timeout_after_connected_check(void * a_arg) dap_worker_t * l_worker = dap_worker_get_current(); assert(l_worker); - dap_events_socket_t * l_es = dap_context_esocket_find_by_uuid(l_worker->context, *l_es_uuid_ptr); + dap_events_socket_t * l_es = dap_context_find(l_worker->context, *l_es_uuid_ptr); if( l_es ){ dap_client_pvt_t *l_client_pvt = DAP_ESOCKET_CLIENT_PVT(l_es); if (dap_client_pvt_find(l_client_pvt->uuid)) { @@ -314,7 +314,7 @@ static bool s_enc_init_delay_before_request_timer_callback(void * a_arg) assert (a_arg); dap_events_socket_uuid_t* l_es_uuid_ptr = (dap_events_socket_uuid_t*) a_arg; dap_worker_t * l_worker = dap_worker_get_current(); - dap_events_socket_t * l_es = dap_context_esocket_find_by_uuid(l_worker->context, *l_es_uuid_ptr); + dap_events_socket_t * l_es = dap_context_find(l_worker->context, *l_es_uuid_ptr); if(l_es){ dap_client_pvt_t *l_client_pvt = DAP_ESOCKET_CLIENT_PVT(l_es); s_stage_status_after(l_client_pvt); diff --git a/dap-sdk/net/server/http_server/dap_http_simple.c b/dap-sdk/net/server/http_server/dap_http_simple.c index 6dead19f9665feff878750090f3a08effb63fc1a..ebe20fefb7796e8fd001da25db6d4570764921db 100644 --- a/dap-sdk/net/server/http_server/dap_http_simple.c +++ b/dap-sdk/net/server/http_server/dap_http_simple.c @@ -369,7 +369,7 @@ static void s_http_client_headers_read( dap_http_client_t *a_http_client, void * log_it(L_ERROR, "Not defined content-length %zu in request", a_http_client->in_content_length); } else { log_it( L_DEBUG, "No data section, execution proc callback" ); - dap_events_socket_remove_from_worker_unsafe(l_http_simple->esocket ,l_http_simple->worker); + dap_context_remove(l_http_simple->esocket ); dap_proc_queue_add_callback_inter( l_http_simple->worker->proc_queue_input, s_proc_queue_callback, l_http_simple); } @@ -411,7 +411,7 @@ void s_http_client_data_read( dap_http_client_t *a_http_client, void * a_arg ) // bool isOK=true; log_it( L_INFO,"Data for http_simple_request collected" ); - dap_events_socket_remove_from_worker_unsafe(a_http_client->esocket,a_http_client->esocket->context->worker); + dap_context_remove(a_http_client->esocket); dap_proc_queue_add_callback_inter( l_http_simple->worker->proc_queue_input , s_proc_queue_callback, l_http_simple); } } diff --git a/dap-sdk/net/stream/stream/dap_stream.c b/dap-sdk/net/stream/stream/dap_stream.c index 1723311f0c54808551fcf1e13f983a411868803f..0d8da5824e51fd8bd146cc0fa91d4e3f6090eb0d 100644 --- a/dap-sdk/net/stream/stream/dap_stream.c +++ b/dap-sdk/net/stream/stream/dap_stream.c @@ -889,7 +889,7 @@ static bool s_callback_keepalive(void *a_arg, bool a_server_side) return false; dap_events_socket_uuid_t * l_es_uuid = (dap_events_socket_uuid_t*) a_arg; dap_worker_t * l_worker = dap_worker_get_current(); - dap_events_socket_t * l_es = dap_context_esocket_find_by_uuid(l_worker->context, *l_es_uuid); + dap_events_socket_t * l_es = dap_context_find(l_worker->context, *l_es_uuid); if(l_es) { dap_stream_t *l_stream = NULL; if (a_server_side) { diff --git a/modules/net/dap_chain_node_client.c b/modules/net/dap_chain_node_client.c index 6cc468cb7b5239efd910b683455def512632b05c..ca5dc94075de0ca67de4e7892cb2a308c5abe351 100644 --- a/modules/net/dap_chain_node_client.c +++ b/modules/net/dap_chain_node_client.c @@ -211,7 +211,7 @@ dap_chain_node_sync_status_t dap_chain_node_client_start_sync(dap_events_socket_ dap_events_socket_t * l_es = NULL; dap_events_socket_uuid_t l_es_uuid = l_me->esocket_uuid; // check if esocket still in worker - if( (l_es = dap_context_esocket_find_by_uuid(l_worker->context, l_es_uuid)) != NULL ) { + if( (l_es = dap_context_find(l_worker->context, l_es_uuid)) != NULL ) { dap_client_t * l_client = dap_client_from_esocket(l_es); if (l_client ) { dap_chain_node_client_t * l_node_client = (dap_chain_node_client_t*) l_client->_inheritor; diff --git a/modules/net/dap_chain_node_dns_client.c b/modules/net/dap_chain_node_dns_client.c index 7208405d9117e54e7525ac90678e0c60f4bf8644..13cf93e03299b83e002a721e2c36cdb1c68f04ee 100644 --- a/modules/net/dap_chain_node_dns_client.c +++ b/modules/net/dap_chain_node_dns_client.c @@ -137,7 +137,7 @@ static bool s_dns_client_esocket_timeout_callback(void * a_arg) assert(l_worker); dap_events_socket_t * l_es; - if((l_es = dap_context_esocket_find_by_uuid(l_worker->context,*l_es_uuid_ptr) ) != NULL){ // If we've not closed this esocket + if((l_es = dap_context_find(l_worker->context,*l_es_uuid_ptr) ) != NULL){ // If we've not closed this esocket struct dns_client * l_dns_client = (struct dns_client*) l_es->_inheritor; log_it(L_WARNING,"DNS request timeout, bad network?"); if(! l_dns_client->is_callbacks_called ){