From b662be56b6b2d214d4bb55d5551567ced943643f Mon Sep 17 00:00:00 2001 From: "Dmitriy A. Gerasimov" <dmitriy.gerasimov@demlabs.net> Date: Thu, 1 Oct 2020 02:54:51 +0700 Subject: [PATCH] [!] Changed default for linux from epoll to poll [*] Fixed few small issues [*] Merges and renamed few functions --- CMakeLists.txt | 2 +- dap-sdk/net/core/dap_events.c | 2 +- dap-sdk/net/core/dap_events_socket.c | 166 ++++++++++-------- dap-sdk/net/core/dap_proc_queue.c | 10 +- dap-sdk/net/core/dap_proc_thread.c | 93 ++++++++-- dap-sdk/net/core/dap_worker.c | 149 ++++++++++++---- dap-sdk/net/core/include/dap_events.h | 2 +- dap-sdk/net/core/include/dap_events_socket.h | 9 +- dap-sdk/net/core/include/dap_proc_queue.h | 4 +- dap-sdk/net/core/include/dap_proc_thread.h | 2 + dap-sdk/net/core/include/dap_worker.h | 8 + dap-sdk/net/server/enc_server/dap_enc_http.c | 55 +++--- .../net/server/http_server/dap_http_simple.c | 4 +- modules/channel/chain/dap_stream_ch_chain.c | 10 +- 14 files changed, 356 insertions(+), 160 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 41865ce285..56bcfbe61d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,7 +2,7 @@ project(cellframe-sdk C) cmake_minimum_required(VERSION 2.8) set(CMAKE_C_STANDARD 11) -set(CELLFRAME_SDK_NATIVE_VERSION "2.5-21") +set(CELLFRAME_SDK_NATIVE_VERSION "2.5-22") add_definitions ("-DCELLFRAME_SDK_VERSION=\"${CELLFRAME_SDK_NATIVE_VERSION}\"") set(DAPSDK_MODULES "") diff --git a/dap-sdk/net/core/dap_events.c b/dap-sdk/net/core/dap_events.c index f91dd6a422..2df4436432 100644 --- a/dap-sdk/net/core/dap_events.c +++ b/dap-sdk/net/core/dap_events.c @@ -250,7 +250,7 @@ int dap_events_start( dap_events_t *a_events ) l_worker->events = a_events; l_worker->proc_queue = dap_proc_thread_get(i)->proc_queue; #ifdef DAP_EVENTS_CAPS_EPOLL - l_worker->epoll_fd = epoll_create( DAP_MAX_EPOLL_EVENTS ); + l_worker->epoll_fd = epoll_create( DAP_MAX_EVENTS_COUNT ); pthread_mutex_init(& l_worker->started_mutex, NULL); pthread_cond_init( & l_worker->started_cond, NULL); //log_it(L_DEBUG, "Created event_fd %d for worker %u", l_worker->epoll_fd,i); diff --git a/dap-sdk/net/core/dap_events_socket.c b/dap-sdk/net/core/dap_events_socket.c index 4ede03d902..5a9d6458a9 100644 --- a/dap-sdk/net/core/dap_events_socket.c +++ b/dap-sdk/net/core/dap_events_socket.c @@ -106,6 +106,8 @@ dap_events_socket_t *dap_events_socket_wrap_no_add( dap_events_t *a_events, #if defined(DAP_EVENTS_CAPS_EPOLL) ret->ev_base_flags = EPOLLERR | EPOLLRDHUP | EPOLLHUP; + #elif defined(DAP_EVENTS_CAPS_POLL) + ret->poll_base_flags = POLLERR | POLLRDHUP | POLLHUP; #endif if ( a_sock!= 0 && a_sock != -1){ @@ -152,22 +154,6 @@ void dap_events_socket_reassign_between_workers_mt(dap_worker_t * a_worker_old, dap_events_socket_queue_ptr_send(a_worker_old->queue_es_reassign, l_msg); } -/** - * @brief dap_events_socket_assign_on_worker_unsafe - * @param a_es - * @param a_worker - */ -void dap_events_socket_assign_on_worker_unsafe(dap_events_socket_t * a_es, struct dap_worker * a_worker) -{ -#if defined(DAP_EVENTS_CAPS_EPOLL) - int l_event_fd = a_es->fd; - //log_it( L_INFO, "Create event descriptor with queue %d (%p) and add it on epoll fd %d", l_event_fd, l_es, a_w->epoll_fd); - a_es->ev.events = a_es->ev_base_flags; - a_es->ev.data.ptr = a_es; - epoll_ctl(a_worker->epoll_fd, EPOLL_CTL_ADD, l_event_fd, &a_es->ev); -#endif -} - /** * @brief s_create_type_pipe * @param a_w @@ -183,7 +169,13 @@ dap_events_socket_t * s_create_type_pipe(dap_worker_t * a_w, dap_events_socket_c l_es->worker = a_w; l_es->events = a_w->events; l_es->callbacks.read_callback = a_callback; // Arm event callback +#if defined(DAP_EVENTS_CAPS_EPOLL) l_es->ev_base_flags = EPOLLIN | EPOLLERR | EPOLLRDHUP | EPOLLHUP; +#elif defined(DAP_EVENTS_CAPS_POLL) + l_es->poll_base_flags = POLLIN | POLLERR | POLLRDHUP | POLLHUP; +#else +#error "Not defined s_create_type_pipe for your platform" +#endif #if defined(DAP_EVENTS_CAPS_PIPE_POSIX) int l_pipe[2]; @@ -216,7 +208,7 @@ dap_events_socket_t * s_create_type_pipe(dap_worker_t * a_w, dap_events_socket_c dap_events_socket_t * dap_events_socket_create_type_pipe_mt(dap_worker_t * a_w, dap_events_socket_callback_t a_callback, uint32_t a_flags) { dap_events_socket_t * l_es = s_create_type_pipe(a_w, a_callback, a_flags); - dap_events_socket_assign_on_worker_unsafe(l_es,a_w); + dap_worker_add_events_socket_unsafe(l_es,a_w); return l_es; } @@ -230,7 +222,7 @@ dap_events_socket_t * dap_events_socket_create_type_pipe_mt(dap_worker_t * a_w, dap_events_socket_t * dap_events_socket_create_type_pipe_unsafe(dap_worker_t * a_w, dap_events_socket_callback_t a_callback, uint32_t a_flags) { dap_events_socket_t * l_es = s_create_type_pipe(a_w, a_callback, a_flags); - dap_events_socket_assign_on_worker_unsafe(l_es,a_w); + dap_worker_add_events_socket_unsafe(l_es,a_w); return l_es; } @@ -250,7 +242,15 @@ dap_events_socket_t * s_create_type_queue_ptr(dap_worker_t * a_w, dap_events_soc l_es->worker = a_w; } l_es->callbacks.queue_ptr_callback = a_callback; // Arm event callback + +#if defined(DAP_EVENTS_CAPS_EPOLL) l_es->ev_base_flags = EPOLLIN | EPOLLERR | EPOLLRDHUP | EPOLLHUP; +#elif defined(DAP_EVENTS_CAPS_POLL) + l_es->poll_base_flags = POLLIN | POLLERR | POLLRDHUP | POLLHUP; +#else +#error "Not defined s_create_type_queue_ptr for your platform" +#endif + #ifdef DAP_EVENTS_CAPS_QUEUE_PIPE2 int l_pipe[2]; @@ -337,7 +337,7 @@ dap_events_socket_t * dap_events_socket_create_type_queue_ptr_unsafe(dap_worker_ assert(l_es); // If no worker - don't assign if ( a_w) - dap_events_socket_assign_on_worker_unsafe(l_es,a_w); + dap_worker_add_events_socket_unsafe(l_es,a_w); return l_es; } @@ -398,7 +398,13 @@ dap_events_socket_t * s_create_type_event(dap_worker_t * a_w, dap_events_socket_ l_es->worker = a_w; } l_es->callbacks.event_callback = a_callback; // Arm event callback +#if defined(DAP_EVENTS_CAPS_EPOLL) l_es->ev_base_flags = EPOLLIN | EPOLLERR | EPOLLRDHUP | EPOLLHUP; +#elif defined(DAP_EVENTS_CAPS_POLL) + l_es->poll_base_flags = POLLIN | POLLERR | POLLRDHUP | POLLHUP; +#else +#error "Not defined s_create_type_event for your platform" +#endif #ifdef DAP_EVENTS_CAPS_EVENT_EVENTFD if((l_es->fd = eventfd(0,0) ) < 0 ){ @@ -448,7 +454,7 @@ dap_events_socket_t * dap_events_socket_create_type_event_unsafe(dap_worker_t * dap_events_socket_t * l_es = s_create_type_event(a_w, a_callback); // If no worker - don't assign if ( a_w) - dap_events_socket_assign_on_worker_unsafe(l_es,a_w); + dap_worker_add_events_socket_unsafe(l_es,a_w); return l_es; } @@ -597,6 +603,8 @@ int dap_events_socket_queue_ptr_send( dap_events_socket_t * a_es, void* a_arg) #endif } + + /** * @brief dap_events_socket_event_signal * @param a_es @@ -684,79 +692,85 @@ dap_events_socket_t *dap_events_socket_find_unsafe( int sock, struct dap_events return ret; } -/** - * @brief dap_events_socket_ready_to_read - * @param sc - * @param isReady - */ -void dap_events_socket_set_readable_unsafe( dap_events_socket_t *sc, bool is_ready ) +static void s_worker_poll_mod(dap_events_socket_t * a_esocket) { - if( is_ready == (bool)(sc->flags & DAP_SOCK_READY_TO_READ)) - return; +#if defined (DAP_EVENTS_CAPS_EPOLL) + int events = a_esocket->ev_base_flags | EPOLLERR; - sc->ev.events = sc->ev_base_flags; - sc->ev.events |= EPOLLERR; - - if ( is_ready ) - sc->flags |= DAP_SOCK_READY_TO_READ; - else - sc->flags ^= DAP_SOCK_READY_TO_READ; - - int events = EPOLLERR; + // Check & add + if( a_esocket->flags & DAP_SOCK_READY_TO_READ ) + events |= EPOLLIN; - if( sc->flags & DAP_SOCK_READY_TO_READ ) - events |= EPOLLIN; + if( a_esocket->flags & DAP_SOCK_READY_TO_WRITE ) + events |= EPOLLOUT; - if( sc->flags & DAP_SOCK_READY_TO_WRITE ) - events |= EPOLLOUT; + a_esocket->ev.events = events; - sc->ev.events = events; - if (sc->worker) - if ( epoll_ctl(sc->worker->epoll_fd, EPOLL_CTL_MOD, sc->socket, &sc->ev) == -1 ){ + if ( epoll_ctl(a_esocket->worker->epoll_fd, EPOLL_CTL_MOD, a_esocket->socket, &a_esocket->ev) ){ int l_errno = errno; char l_errbuf[128]; l_errbuf[0]=0; - strerror_r( l_errno, l_errbuf, sizeof (l_errbuf)); - log_it( L_ERROR,"Can't update read client socket state in the epoll_fd: \"%s\" (%d)", l_errbuf, l_errno ); + strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); + log_it(L_ERROR,"Can't update client socket state in the epoll_fd %d: \"%s\" (%d)", + a_esocket->worker->epoll_fd, l_errbuf, l_errno); + } +#elif defined (DAP_EVENTS_CAPS_POLL) + if (a_esocket->poll_index < a_esocket->worker->poll_count ){ + struct pollfd * l_poll = &a_esocket->worker->poll[a_esocket->poll_index]; + l_poll->events = a_esocket->poll_base_flags | POLLERR ; + // Check & add + if( a_esocket->flags & DAP_SOCK_READY_TO_READ ) + l_poll->events |= POLLIN; + if( a_esocket->flags & DAP_SOCK_READY_TO_WRITE ) + l_poll->events |= POLLOUT; + }else{ + log_it(L_ERROR, "Wrong poll index when remove from worker (unsafe): %u when total count %u", a_esocket->poll_index, + a_esocket->worker->poll_count); } + +#else +#error "Not defined dap_events_socket_set_writable_unsafe for your platform" +#endif + } /** - * @brief dap_events_socket_ready_to_write + * @brief dap_events_socket_ready_to_read * @param sc * @param isReady */ -void dap_events_socket_set_writable_unsafe( dap_events_socket_t *sc, bool a_is_ready ) +void dap_events_socket_set_readable_unsafe( dap_events_socket_t *a_esocket, bool is_ready ) { - if ( a_is_ready == (bool)(sc->flags & DAP_SOCK_READY_TO_WRITE)) { + if( is_ready == (bool)(a_esocket->flags & DAP_SOCK_READY_TO_READ)) return; - } - if ( a_is_ready ) - sc->flags |= DAP_SOCK_READY_TO_WRITE; + if ( is_ready ) + a_esocket->flags |= DAP_SOCK_READY_TO_READ; else - sc->flags ^= DAP_SOCK_READY_TO_WRITE; - - int events = sc->ev_base_flags | EPOLLERR; + a_esocket->flags ^= DAP_SOCK_READY_TO_READ; - // Check & add - if( sc->flags & DAP_SOCK_READY_TO_READ ) - events |= EPOLLIN; + if( a_esocket->worker) + s_worker_poll_mod( a_esocket); +} - if( sc->flags & DAP_SOCK_READY_TO_WRITE ) - events |= EPOLLOUT; +/** + * @brief dap_events_socket_ready_to_write + * @param a_esocket + * @param isReady + */ +void dap_events_socket_set_writable_unsafe( dap_events_socket_t *a_esocket, bool a_is_ready ) +{ + if ( a_is_ready == (bool)(a_esocket->flags & DAP_SOCK_READY_TO_WRITE)) { + return; + } - sc->ev.events = events; + if ( a_is_ready ) + a_esocket->flags |= DAP_SOCK_READY_TO_WRITE; + else + a_esocket->flags ^= DAP_SOCK_READY_TO_WRITE; - if (sc->worker && sc->type != DESCRIPTOR_TYPE_SOCKET_UDP) - if ( epoll_ctl(sc->worker->epoll_fd, EPOLL_CTL_MOD, sc->socket, &sc->ev) ){ - int l_errno = errno; - char l_errbuf[128]; - l_errbuf[0]=0; - strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); - log_it(L_ERROR,"Can't update write client socket state in the epoll_fd %d: \"%s\" (%d)", - sc->worker->epoll_fd, l_errbuf, l_errno); - } + if( a_esocket->worker ) + s_worker_poll_mod(a_esocket); } /** @@ -817,6 +831,8 @@ void dap_events_socket_remove_from_worker_unsafe( dap_events_socket_t *a_es, dap // Socket already removed from worker return; } +#ifdef DAP_EVENTS_CAPS_EPOLL + if ( epoll_ctl( a_worker->epoll_fd, EPOLL_CTL_DEL, a_es->socket, &a_es->ev) == -1 ) { int l_errno = errno; char l_errbuf[128]; @@ -825,6 +841,16 @@ void dap_events_socket_remove_from_worker_unsafe( dap_events_socket_t *a_es, dap a_worker->epoll_fd, l_errbuf, l_errno); } //else // log_it( L_DEBUG,"Removed epoll's event from dap_worker #%u", a_worker->id ); +#elif defined (DAP_EVENTS_CAPS_POLL) + if (a_es->poll_index < a_worker->poll_count ){ + a_worker->poll[a_es->poll_index].fd = -1; + }else{ + log_it(L_ERROR, "Wrong poll index when remove from worker (unsafe): %u when total count %u", a_es->poll_index, a_worker->poll_count); + } +#else +#error "Unimplemented new esocket on worker callback for current platform" +#endif + a_worker->event_sockets_count--; if(a_worker->esockets) HASH_DELETE(hh_worker,a_worker->esockets, a_es); diff --git a/dap-sdk/net/core/dap_proc_queue.c b/dap-sdk/net/core/dap_proc_queue.c index d41f6604ca..4b91b2d996 100644 --- a/dap-sdk/net/core/dap_proc_queue.c +++ b/dap-sdk/net/core/dap_proc_queue.c @@ -20,6 +20,7 @@ You should have received a copy of the GNU General Public License along with any DAP SDK based project. If not, see <http://www.gnu.org/licenses/>. */ +#include "dap_worker.h" #include "dap_proc_queue.h" #include "dap_proc_thread.h" #define LOG_TAG "dap_proc_queue" @@ -87,15 +88,10 @@ static void s_queue_esocket_callback( dap_events_socket_t * a_es, void * a_msg) } -void dap_proc_queue_add_callback(dap_proc_queue_t * a_queue,dap_proc_queue_callback_t a_callback, void * a_callback_arg) +void dap_proc_queue_add_callback(dap_worker_t * a_worker,dap_proc_queue_callback_t a_callback, void * a_callback_arg) { dap_proc_queue_msg_t * l_msg = DAP_NEW_Z(dap_proc_queue_msg_t); l_msg->callback = a_callback; l_msg->callback_arg = a_callback_arg; - dap_events_socket_queue_ptr_send( a_queue->esocket, l_msg ); -} - -void dap_proc_queue_add_callback_auto(dap_proc_queue_callback_t a_callback, void * a_callback_arg) -{ - dap_proc_queue_add_callback( dap_proc_thread_get_auto()->proc_queue ,a_callback,a_callback_arg); + dap_events_socket_queue_ptr_send( a_worker->proc_queue->esocket , l_msg ); } diff --git a/dap-sdk/net/core/dap_proc_thread.c b/dap-sdk/net/core/dap_proc_thread.c index 133bf81001..203875d1d1 100644 --- a/dap-sdk/net/core/dap_proc_thread.c +++ b/dap-sdk/net/core/dap_proc_thread.c @@ -24,9 +24,10 @@ #include <assert.h> #include "dap_server.h" -#if defined(DAP_EVENTS_CAPS_WEPOLL) -#elif defined(DAP_EVENTS_CAPS_EPOLL) +#if defined(DAP_EVENTS_CAPS_EPOLL) #include <sys/epoll.h> +#elif defined (DAP_EVENTS_CAPS_POLL) +#include <poll.h> #else #error "Unimplemented poll for this platform" #endif @@ -181,6 +182,24 @@ static void * s_proc_thread_function(void * a_arg) log_it(L_CRITICAL, "Can't add proc queue on epoll ctl"); return NULL; } +#elif defined(DAP_EVENTS_CAPS_POLL) + size_t l_poll_count_max = DAP_MAX_EVENTS_COUNT; + size_t l_poll_count = 0; + bool l_poll_compress = false; + struct pollfd * l_poll = DAP_NEW_Z_SIZE(struct pollfd,l_poll_count_max *sizeof (*l_poll)); + dap_events_socket_t ** l_esockets = DAP_NEW_Z_SIZE(dap_events_socket_t*,l_poll_count_max *sizeof (*l_esockets)); + + // Add proc queue + l_poll[0].fd = l_thread->proc_queue->esocket->fd; + l_poll[0].events = l_thread->proc_queue->esocket->poll_base_flags; + l_esockets[0] = l_thread->proc_queue->esocket; + l_poll_count++; + + // Add proc event + l_poll[1].fd = l_thread->proc_event->fd; + l_poll[1].events = l_thread->proc_event->poll_base_flags; + l_esockets[1] = l_thread->proc_event; + l_poll_count++; #else #error "Unimplemented poll events analog for this platform" @@ -196,6 +215,10 @@ static void * s_proc_thread_function(void * a_arg) #ifdef DAP_EVENTS_CAPS_EPOLL //log_it(L_DEBUG, "Epoll_wait call"); int l_selected_sockets = epoll_wait(l_thread->epoll_ctl, l_epoll_events, DAP_MAX_EPOLL_EVENTS, -1); + size_t l_sockets_max = l_selected_sockets; +#elif defined (DAP_EVENTS_CAPS_POLL) + int l_selected_sockets = poll(l_poll,l_poll_count,-1); + size_t l_sockets_max = l_poll_count; #else #error "Unimplemented poll wait analog for this platform" #endif @@ -209,30 +232,50 @@ static void * s_proc_thread_function(void * a_arg) log_it(L_ERROR, "Proc thread #%d got errno:\"%s\" (%d)", l_thread->cpu_id , l_errbuf, l_errno); break; } - time_t l_cur_time = time( NULL); - for(int32_t n = 0; n < l_selected_sockets; n++) { + for(size_t n = 0; n < l_sockets_max; n++) { dap_events_socket_t * l_cur; + bool l_flag_hup, l_flag_rdhup, l_flag_read, l_flag_write, l_flag_error; +#ifdef DAP_EVENTS_CAPS_EPOLL l_cur = (dap_events_socket_t *) l_epoll_events[n].data.ptr; uint32_t l_cur_events = l_epoll_events[n].events; + l_flag_hup = l_cur_events & EPOLLHUP; + l_flag_rdhup = l_cur_events & EPOLLHUP; + l_flag_write = l_cur_events & EPOLLOUT; + l_flag_read = l_cur_events & EPOLLIN; + l_flag_error = l_cur_events & EPOLLERR; +#elif defined ( DAP_EVENTS_CAPS_POLL) + if(n>=(int32_t)l_poll_count){ + log_it(L_WARNING,"selected_sockets(%d) is bigger then poll count (%u)", l_selected_sockets, l_poll_count); + break; + } + short l_cur_events = l_poll[n].revents ; + if (!l_cur_events) + continue; + l_cur = l_esockets[n]; + l_flag_hup = l_cur_events & POLLHUP; + l_flag_rdhup = l_cur_events & POLLHUP; + l_flag_write = l_cur_events & POLLOUT; + l_flag_read = l_cur_events & POLLIN; + l_flag_error = l_cur_events & POLLERR; +#else +#error "Unimplemented fetch esocket after poll" +#endif + if(!l_cur) { log_it(L_ERROR, "dap_events_socket NULL"); continue; } + time_t l_cur_time = time( NULL); l_cur->last_time_active = l_cur_time; - if (l_cur_events & EPOLLERR ){ - char l_buferr[128]; - strerror_r(errno,l_buferr, sizeof (l_buferr)); - log_it(L_ERROR,"Some error happend in proc thread #%u: %s", l_thread->cpu_id, l_buferr); - } - if (l_cur_events & EPOLLERR ){ + if (l_flag_error){ int l_errno = errno; char l_errbuf[128]; strerror_r(l_errno, l_errbuf,sizeof (l_errbuf)); - log_it(L_ERROR,"Some error with %d socket: %s(%d)", l_cur->socket, l_errbuf, l_errno); + log_it(L_ERROR,"Some error on proc thread #%u with %d socket: %s(%d)",l_thread->cpu_id, l_cur->socket, l_errbuf, l_errno); if(l_cur->callbacks.error_callback) l_cur->callbacks.error_callback(l_cur,errno); } - if (l_cur_events & EPOLLIN ){ + if (l_flag_read ){ switch (l_cur->type) { case DESCRIPTOR_TYPE_QUEUE: dap_events_socket_queue_proc_input_unsafe(l_cur); @@ -255,12 +298,38 @@ static void * s_proc_thread_function(void * a_arg) if(l_cur->_inheritor) DAP_DELETE(l_cur->_inheritor); DAP_DELETE(l_cur); +#elif defined (DAP_EVENTS_CAPS_POLL) + l_poll[n].fd = -1; + l_poll_compress = true; #else #error "Unimplemented poll ctl analog for this platform" #endif } } +#ifdef DAP_EVENTS_CAPS_POLL + /***********************************************************/ + /* If the compress_array flag was turned on, we need */ + /* to squeeze together the array and decrement the number */ + /* of file descriptors. We do not need to move back the */ + /* events and revents fields because the events will always*/ + /* be POLLIN in this case, and revents is output. */ + /***********************************************************/ + if ( l_poll_compress){ + l_poll_compress = false; + for (size_t i = 0; i < l_poll_count ; i++) { + if ( l_poll[i].fd == -1){ + for(size_t j = i; j < l_poll_count-1; j++){ + l_poll[j].fd = l_poll[j+1].fd; + l_esockets[j] = l_esockets[j+1]; + l_esockets[j]->poll_index = j; + } + i--; + l_poll_count--; + } + } + } +#endif } log_it(L_NOTICE, "Stop processing thread #%u", l_thread->cpu_id); return NULL; diff --git a/dap-sdk/net/core/dap_worker.c b/dap-sdk/net/core/dap_worker.c index a79688ead9..d740dcd794 100644 --- a/dap-sdk/net/core/dap_worker.c +++ b/dap-sdk/net/core/dap_worker.c @@ -76,7 +76,7 @@ void *dap_worker_thread(void *arg) { dap_events_socket_t *l_cur; dap_worker_t *l_worker = (dap_worker_t *) arg; - time_t l_next_time_timeout_check = time( NULL) + s_connection_timeout / 2; + //time_t l_next_time_timeout_check = time( NULL) + s_connection_timeout / 2; uint32_t l_tn = l_worker->id; dap_cpu_assign_thread_on(l_worker->id); @@ -84,6 +84,18 @@ void *dap_worker_thread(void *arg) l_shed_params.sched_priority = 0; pthread_setschedparam(pthread_self(),SCHED_FIFO ,&l_shed_params); +#ifdef DAP_EVENTS_CAPS_EPOLL + struct epoll_event l_epoll_events[ DAP_MAX_EPOLL_EVENTS]= {{0}}; + log_it(L_INFO, "Worker #%d started with epoll fd %d and assigned to dedicated CPU unit", l_worker->id, l_worker->epoll_fd); +#elif defined(DAP_EVENTS_CAPS_POLL) + l_worker->poll_count_max = _SC_PAGE_SIZE; + l_worker->poll = DAP_NEW_Z_SIZE(struct pollfd,l_worker->poll_count_max*sizeof (struct pollfd)); + l_worker->poll_esocket = DAP_NEW_Z_SIZE(dap_events_socket_t*,l_worker->poll_count_max*sizeof (dap_events_socket_t*)); +#else +#error "Unimplemented socket array for this platform" +#endif + + l_worker->queue_es_new = dap_events_socket_create_type_queue_ptr_unsafe( l_worker, s_queue_new_es_callback); l_worker->queue_es_delete = dap_events_socket_create_type_queue_ptr_unsafe( l_worker, s_queue_delete_es_callback); l_worker->queue_es_io = dap_events_socket_create_type_queue_ptr_unsafe( l_worker, s_queue_es_io_callback); @@ -92,18 +104,16 @@ void *dap_worker_thread(void *arg) l_worker->event_exit = dap_events_socket_create_type_event_unsafe(l_worker, s_event_exit_callback ); l_worker->timer_check_activity = dap_timerfd_start_on_worker( l_worker,s_connection_timeout / 2,s_socket_all_check_activity,l_worker); -#ifdef DAP_EVENTS_CAPS_EPOLL - struct epoll_event l_epoll_events[ DAP_MAX_EPOLL_EVENTS]= {{0}}; - log_it(L_INFO, "Worker #%d started with epoll fd %d and assigned to dedicated CPU unit", l_worker->id, l_worker->epoll_fd); -#else -#error "Unimplemented socket array for this platform" -#endif pthread_cond_broadcast(&l_worker->started_cond); bool s_loop_is_active = true; while(s_loop_is_active) { #ifdef DAP_EVENTS_CAPS_EPOLL int l_selected_sockets = epoll_wait(l_worker->epoll_fd, l_epoll_events, DAP_MAX_EPOLL_EVENTS, -1); + size_t l_sockets_max = l_selected_sockets; +#elif defined(DAP_EVENTS_CAPS_POLL) + int l_selected_sockets = poll(l_worker->poll, l_worker->poll_count, -1); + size_t l_sockets_max = l_worker->poll_count; #else #error "Unimplemented poll wait analog for this platform" #endif @@ -118,20 +128,40 @@ void *dap_worker_thread(void *arg) } time_t l_cur_time = time( NULL); - for(int32_t n = 0; n < l_selected_sockets; n++) { + for(size_t n = 0; n < l_sockets_max; n++) { + bool l_flag_hup, l_flag_rdhup, l_flag_read, l_flag_write, l_flag_error; +#ifdef DAP_EVENTS_CAPS_EPOLL l_cur = (dap_events_socket_t *) l_epoll_events[n].data.ptr; + l_flag_hup = l_epoll_events[n].events & EPOLLHUP; + l_flag_rdhup = l_epoll_events[n].events & EPOLLHUP; + l_flag_write = l_epoll_events[n].events & EPOLLOUT; + l_flag_read = l_epoll_events[n].events & EPOLLIN; + l_flag_error = l_epoll_events[n].events & EPOLLERR; +#elif defined ( DAP_EVENTS_CAPS_POLL) + short l_cur_events =l_worker->poll[n].revents; + if (!l_cur_events) // No events for this socket + continue; + l_flag_hup = l_cur_events& POLLHUP; + l_flag_rdhup = l_cur_events & POLLHUP; + l_flag_write = l_cur_events & POLLOUT; + l_flag_read = l_cur_events & POLLIN; + l_flag_error = l_cur_events & POLLERR; + l_cur = l_worker->poll_esocket[n]; + //log_it(L_DEBUG, "flags: returned events 0x%0X requested events 0x%0X",l_worker->poll[n].revents,l_worker->poll[n].events ); +#else +#error "Unimplemented fetch esocket after poll" +#endif if(!l_cur) { log_it(L_ERROR, "dap_events_socket NULL"); continue; } l_cur->last_time_active = l_cur_time; +// log_it(L_DEBUG, "Worker=%d fd=%d", l_worker->id, l_cur->socket); - //log_it(L_DEBUG, "Worker=%d fd=%d socket=%d event=0x%x(%d)", l_worker->id, - // l_worker->epoll_fd,l_cur->socket, l_epoll_events[n].events,l_epoll_events[n].events); int l_sock_err = 0, l_sock_err_size = sizeof(l_sock_err); //connection already closed (EPOLLHUP - shutdown has been made in both directions) - if(l_epoll_events[n].events & EPOLLHUP) { // && events[n].events & EPOLLERR) { + if( l_flag_hup) { // && events[n].events & EPOLLERR) { switch (l_cur->type ){ case DESCRIPTOR_TYPE_SOCKET_LISTENING: case DESCRIPTOR_TYPE_SOCKET: @@ -150,7 +180,7 @@ void *dap_worker_thread(void *arg) } } - if(l_epoll_events[n].events & EPOLLERR) { + if(l_flag_error) { switch (l_cur->type ){ case DESCRIPTOR_TYPE_SOCKET_LISTENING: case DESCRIPTOR_TYPE_SOCKET: @@ -165,7 +195,7 @@ void *dap_worker_thread(void *arg) l_cur->callbacks.error_callback(l_cur, 0); // Call callback to process error event } - if (l_epoll_events[n].events & EPOLLRDHUP) { + if (l_flag_hup) { log_it(L_INFO, "Client socket disconnected"); dap_events_socket_set_readable_unsafe(l_cur, false); dap_events_socket_set_writable_unsafe(l_cur, false); @@ -174,7 +204,7 @@ void *dap_worker_thread(void *arg) } - if(l_epoll_events[n].events & EPOLLIN) { + if(l_flag_read) { //log_it(L_DEBUG, "Comes connection with type %d", l_cur->type); if(l_cur->buf_in_size == sizeof(l_cur->buf_in)) { @@ -273,7 +303,7 @@ void *dap_worker_thread(void *arg) l_cur->buf_out_size = 0; } } - else if (!(l_epoll_events[n].events & EPOLLRDHUP) || !(l_epoll_events[n].events & EPOLLERR)) { + else if (! l_flag_rdhup || !l_flag_error) { log_it(L_WARNING, "EPOLLIN triggered but nothing to read"); dap_events_socket_set_readable_unsafe(l_cur,false); } @@ -281,7 +311,7 @@ void *dap_worker_thread(void *arg) } // Socket is ready to write - if(((l_epoll_events[n].events & EPOLLOUT) || (l_cur->flags & DAP_SOCK_READY_TO_WRITE)) + if(( l_flag_write || (l_cur->flags & DAP_SOCK_READY_TO_WRITE)) && !(l_cur->flags & DAP_SOCK_SIGNAL_CLOSE)) { //log_it(L_DEBUG, "Main loop output: %u bytes to send", l_cur->buf_out_size); @@ -348,7 +378,7 @@ void *dap_worker_thread(void *arg) //log_it(L_DEBUG, "Output: %u from %u bytes are sent ", l_bytes_sent,l_cur->buf_out_size); if (l_bytes_sent) { - if ( l_bytes_sent <= l_cur->buf_out_size ){ + if ( l_bytes_sent <= (ssize_t) l_cur->buf_out_size ){ l_cur->buf_out_size -= l_bytes_sent; if (l_cur->buf_out_size ) { memmove(l_cur->buf_out, &l_cur->buf_out[l_bytes_sent], l_cur->buf_out_size); @@ -387,7 +417,29 @@ void *dap_worker_thread(void *arg) return NULL; } } - +#ifdef DAP_EVENTS_CAPS_POLL + /***********************************************************/ + /* If the compress_array flag was turned on, we need */ + /* to squeeze together the array and decrement the number */ + /* of file descriptors. We do not need to move back the */ + /* events and revents fields because the events will always*/ + /* be POLLIN in this case, and revents is output. */ + /***********************************************************/ + if ( l_worker->poll_compress){ + l_worker->poll_compress = false; + for (size_t i = 0; i < l_worker->poll_count ; i++) { + if ( l_worker->poll[i].fd == -1){ + for(size_t j = i; j < l_worker->poll_count-1; j++){ + l_worker->poll[j].fd = l_worker->poll[j+1].fd; + l_worker->poll_esocket[j] = l_worker->poll_esocket[j+1]; + l_worker->poll_esocket[j]->poll_index = j; + } + i--; + l_worker->poll_count--; + } + } + } +#endif } // while log_it(L_NOTICE,"Exiting thread #%u", l_worker->id); return NULL; @@ -412,7 +464,7 @@ static void s_queue_new_es_callback( dap_events_socket_t * a_es, void * a_arg) int l_cpu = w->id; setsockopt(l_es_new->socket , SOL_SOCKET, SO_INCOMING_CPU, &l_cpu, sizeof(l_cpu)); } - bool l_socket_present = (l_es_new->worker && l_es_new->is_initalized) ? true : false; + l_es_new->worker = w; // We need to differ new and reassigned esockets. If its new - is_initialized is false if ( ! l_es_new->is_initalized ){ @@ -422,23 +474,7 @@ static void s_queue_new_es_callback( dap_events_socket_t * a_es, void * a_arg) } if (l_es_new->socket>0){ - int l_ret = -1; -#ifdef DAP_EVENTS_CAPS_EPOLL - // Init events for EPOLL - l_es_new->ev.events = l_es_new->ev_base_flags ; - if(l_es_new->flags & DAP_SOCK_READY_TO_READ ) - l_es_new->ev.events |= EPOLLIN; - if(l_es_new->flags & DAP_SOCK_READY_TO_WRITE ) - l_es_new->ev.events |= EPOLLOUT; - l_es_new->ev.data.ptr = l_es_new; - if (l_socket_present) { - // Update only flags, socket already present in worker - return; - } - l_ret = epoll_ctl(w->epoll_fd, EPOLL_CTL_ADD, l_es_new->socket, &l_es_new->ev); -#else -#error "Unimplemented new esocket on worker callback for current platform" -#endif + int l_ret = dap_worker_add_events_socket_unsafe(l_es_new,w); if ( l_ret != 0 ){ log_it(L_CRITICAL,"Can't add event socket's handler to worker i/o poll mechanism with error %d", errno); }else{ @@ -595,6 +631,47 @@ void dap_worker_add_events_socket(dap_events_socket_t * a_events_socket, dap_wor } } +/** + * @brief dap_worker_add_events_socket_unsafe + * @param a_worker + * @param a_esocket + */ +int dap_worker_add_events_socket_unsafe( dap_events_socket_t * a_esocket, dap_worker_t * a_worker ) +{ + +#ifdef DAP_EVENTS_CAPS_EPOLL + // Init events for EPOLL + a_esocket->ev.events = a_esocket->ev_base_flags ; + if(a_esocket->flags & DAP_SOCK_READY_TO_READ ) + a_esocket->ev.events |= EPOLLIN; + if(a_esocket->flags & DAP_SOCK_READY_TO_WRITE ) + a_esocket->ev.events |= EPOLLOUT; + a_esocket->ev.data.ptr = a_esocket; + return epoll_ctl(a_worker->epoll_fd, EPOLL_CTL_ADD, a_esocket->socket, &a_esocket->ev); +#elif defined (DAP_EVENTS_CAPS_POLL) + if ( a_worker->poll_count == a_worker->poll_count_max ){ // realloc + log_it(L_WARNING, "Too many descriptors, resizing array twice"); + a_worker->poll_count_max *= 2; + a_worker->poll =DAP_REALLOC(a_worker->poll, a_worker->poll_count_max * sizeof(*a_worker->poll)); + a_worker->poll_esocket =DAP_REALLOC(a_worker->poll_esocket, a_worker->poll_count_max * sizeof(*a_worker->poll_esocket)); + } + a_worker->poll[a_worker->poll_count].fd = a_esocket->socket; + a_esocket->poll_index = a_worker->poll_count; + a_worker->poll[a_worker->poll_count].events = a_esocket->poll_base_flags; + if( a_esocket->flags & DAP_SOCK_READY_TO_READ ) + a_worker->poll[a_worker->poll_count].events |= POLLIN; + if( a_esocket->flags & DAP_SOCK_READY_TO_WRITE ) + a_worker->poll[a_worker->poll_count].events |= POLLOUT; + + a_worker->poll_esocket[a_worker->poll_count] = a_esocket; + a_worker->poll_count++; + return 0; +#else +#error "Unimplemented new esocket on worker callback for current platform" +#endif + +} + /** * @brief dap_worker_exec_callback_on */ diff --git a/dap-sdk/net/core/include/dap_events.h b/dap-sdk/net/core/include/dap_events.h index c8e5471fb9..0f119ee031 100644 --- a/dap-sdk/net/core/include/dap_events.h +++ b/dap-sdk/net/core/include/dap_events.h @@ -28,7 +28,7 @@ #include "dap_server.h" #include "dap_worker.h" struct dap_events; -#define DAP_MAX_EPOLL_EVENTS 8192 +#define DAP_MAX_EVENTS_COUNT 8192 typedef void (*dap_events_callback_t) (struct dap_events *, void *arg); // Callback for specific server's operations diff --git a/dap-sdk/net/core/include/dap_events_socket.h b/dap-sdk/net/core/include/dap_events_socket.h index 474c26ad4c..ceb4bc717e 100644 --- a/dap-sdk/net/core/include/dap_events_socket.h +++ b/dap-sdk/net/core/include/dap_events_socket.h @@ -32,7 +32,8 @@ // Caps for different platforms #if defined(DAP_OS_LINUX) - #define DAP_EVENTS_CAPS_EPOLL +// #define DAP_EVENTS_CAPS_EPOLL + #define DAP_EVENTS_CAPS_POLL #define DAP_EVENTS_CAPS_PIPE_POSIX #define DAP_EVENTS_CAPS_QUEUE_PIPE2 //#define DAP_EVENTS_CAPS_QUEUE_POSIX @@ -60,6 +61,8 @@ #elif defined (DAP_EVENTS_CAPS_WEPOLL) #include "wepoll.h" #define EPOLL_HANDLE HANDLE +#elif defined (DAP_EVENTS_CAPS_POLL) +#include <poll.h> #endif #define BIT( x ) ( 1 << x ) @@ -172,6 +175,9 @@ typedef struct dap_events_socket { #ifdef DAP_EVENTS_CAPS_EPOLL uint32_t ev_base_flags; struct epoll_event ev; +#elif defined (DAP_EVENTS_CAPS_POLL) + short poll_base_flags; + uint32_t poll_index; // index in poll array on worker #endif dap_events_socket_callbacks_t callbacks; @@ -208,7 +214,6 @@ dap_events_socket_t *dap_events_socket_wrap_no_add( dap_events_t *a_events, dap_events_socket_t * dap_events_socket_wrap2( dap_server_t *a_server, struct dap_events *a_events, int a_sock, dap_events_socket_callbacks_t *a_callbacks ); -void dap_events_socket_assign_on_worker_unsafe(dap_events_socket_t * a_es, struct dap_worker * a_worker); void dap_events_socket_assign_on_worker_mt(dap_events_socket_t * a_es, struct dap_worker * a_worker); void dap_events_socket_reassign_between_workers_mt(dap_worker_t * a_worker_old, dap_events_socket_t * a_es, dap_worker_t * a_worker_new); diff --git a/dap-sdk/net/core/include/dap_proc_queue.h b/dap-sdk/net/core/include/dap_proc_queue.h index e38c2d82c3..097a44c179 100644 --- a/dap-sdk/net/core/include/dap_proc_queue.h +++ b/dap-sdk/net/core/include/dap_proc_queue.h @@ -42,7 +42,5 @@ typedef struct dap_proc_queue{ dap_proc_queue_t * dap_proc_queue_create(dap_proc_thread_t * a_thread); void dap_proc_queue_delete(dap_proc_queue_t * a_queue); -void dap_proc_queue_add_callback(dap_proc_queue_t * a_queue,dap_proc_queue_callback_t a_callback, void * a_callback_arg); - -void dap_proc_queue_add_callback_auto(dap_proc_queue_callback_t a_callback, void * a_callback_arg); +void dap_proc_queue_add_callback(dap_worker_t * a_worker, dap_proc_queue_callback_t a_callback, void * a_callback_arg); diff --git a/dap-sdk/net/core/include/dap_proc_thread.h b/dap-sdk/net/core/include/dap_proc_thread.h index 0682f285b2..b637cc8159 100644 --- a/dap-sdk/net/core/include/dap_proc_thread.h +++ b/dap-sdk/net/core/include/dap_proc_thread.h @@ -42,6 +42,8 @@ typedef struct dap_proc_thread{ #ifdef DAP_EVENTS_CAPS_EPOLL EPOLL_HANDLE epoll_ctl; +#elif defined (DAP_EVENTS_CAPS_POLL) + int poll_fd; #else #error "No poll for proc thread for your platform" #endif diff --git a/dap-sdk/net/core/include/dap_worker.h b/dap-sdk/net/core/include/dap_worker.h index 5a00bf7008..f8c88d66a6 100644 --- a/dap-sdk/net/core/include/dap_worker.h +++ b/dap-sdk/net/core/include/dap_worker.h @@ -50,6 +50,13 @@ typedef struct dap_worker dap_timerfd_t * timer_check_activity; #ifdef DAP_EVENTS_CAPS_EPOLL EPOLL_HANDLE epoll_fd; +#elif defined ( DAP_EVENTS_CAPS_POLL) + int poll_fd; + struct pollfd * poll; + dap_events_socket_t ** poll_esocket; + size_t poll_count; + size_t poll_count_max; + bool poll_compress; // Some of fd's became NULL so arrays need to be reassigned #endif pthread_cond_t started_cond; pthread_mutex_t started_mutex; @@ -81,6 +88,7 @@ typedef struct dap_worker_msg_callback{ int dap_worker_init( size_t a_conn_timeout ); void dap_worker_deinit(); +int dap_worker_add_events_socket_unsafe( dap_events_socket_t * a_esocket, dap_worker_t * a_worker); void dap_worker_add_events_socket(dap_events_socket_t * a_events_socket, dap_worker_t * a_worker); dap_worker_t *dap_worker_add_events_socket_auto( dap_events_socket_t * a_events_socket ); void dap_worker_exec_callback_on(dap_worker_t * a_worker, dap_worker_callback_t a_callback, void * a_arg); diff --git a/dap-sdk/net/server/enc_server/dap_enc_http.c b/dap-sdk/net/server/enc_server/dap_enc_http.c index d8db585694..dc64d018b7 100644 --- a/dap-sdk/net/server/enc_server/dap_enc_http.c +++ b/dap-sdk/net/server/enc_server/dap_enc_http.c @@ -100,51 +100,66 @@ void enc_http_proc(struct dap_http_simple *cl_st, void * arg) http_status_code_t * return_code = (http_status_code_t*)arg; if(strcmp(cl_st->http_client->url_path,"gd4y5yh78w42aaagh") == 0 ) { + dap_enc_key_type_t l_pkey_exchange_type =DAP_ENC_KEY_TYPE_MSRLN ; + dap_enc_key_type_t l_enc_type = DAP_ENC_KEY_TYPE_IAES; + size_t l_pkey_exchange_size=MSRLN_PKA_BYTES; + sscanf(cl_st->http_client->in_query_string, "enc_type=%d,pkey_exchange_type=%d,pkey_exchange_size=%zd", + &l_pkey_exchange_type,&l_enc_type,&l_pkey_exchange_size); + uint8_t alice_msg[cl_st->request_size]; - size_t decode_len = dap_enc_base64_decode(cl_st->request, cl_st->request_size, alice_msg, DAP_ENC_DATA_TYPE_B64); + size_t l_decode_len = dap_enc_base64_decode(cl_st->request, cl_st->request_size, alice_msg, DAP_ENC_DATA_TYPE_B64); dap_chain_hash_fast_t l_sign_hash = {}; - if (decode_len < MSRLN_PKA_BYTES) { - log_it(L_WARNING, "Wrong http_enc request. Key not equal MSRLN_PKA_BYTES"); + if (l_decode_len < l_pkey_exchange_size) { + log_it(L_WARNING, "Wrong http_enc request. Key not equal pkey exchange size %zd", l_pkey_exchange_size); *return_code = Http_Status_BadRequest; return; - } else if (decode_len > MSRLN_PKA_BYTES) { - dap_sign_t *l_sign = (dap_sign_t *)&alice_msg[MSRLN_PKA_BYTES]; - if (dap_sign_verify(l_sign, alice_msg, MSRLN_PKA_BYTES) != 1) { + } else if (l_decode_len > l_pkey_exchange_size+ sizeof (dap_sign_hdr_t)) { + dap_sign_t *l_sign = (dap_sign_t *)&alice_msg[l_pkey_exchange_size]; + if (l_sign->header.sign_size == 0 || l_sign->header.sign_size > (l_decode_len - l_pkey_exchange_size) ){ + log_it(L_WARNING,"Wrong signature size %zd (decoded length %zd)",l_sign->header.sign_size, l_decode_len); + *return_code = Http_Status_BadRequest; + return; + } + if (dap_sign_verify(l_sign, alice_msg, l_pkey_exchange_size) != 1) { *return_code = Http_Status_Unauthorized; return; } dap_sign_get_pkey_hash(l_sign, &l_sign_hash); + }else if( l_decode_len != l_pkey_exchange_size){ + log_it(L_WARNING, "Wrong http_enc request. Data after pkey exchange is lesser or equal then signature's header"); + *return_code = Http_Status_BadRequest; + return; } - dap_enc_key_t* msrln_key = dap_enc_key_new(DAP_ENC_KEY_TYPE_MSRLN); - msrln_key->gen_bob_shared_key(msrln_key, alice_msg, MSRLN_PKA_BYTES, (void**)&msrln_key->pub_key_data); + dap_enc_key_t* l_pkey_exchange_key = dap_enc_key_new(l_pkey_exchange_type); + l_pkey_exchange_key->gen_bob_shared_key(l_pkey_exchange_key, alice_msg, l_pkey_exchange_size, (void**)&l_pkey_exchange_key->pub_key_data); - dap_enc_ks_key_t * key_ks = dap_enc_ks_new(); + dap_enc_ks_key_t * l_enc_key_ks = dap_enc_ks_new(); if (s_acl_callback) { - key_ks->acl_list = s_acl_callback(&l_sign_hash); + l_enc_key_ks->acl_list = s_acl_callback(&l_sign_hash); } else { - log_it(L_WARNING, "Callback for ACL is not set, pass anauthorized"); + log_it(L_DEBUG, "Callback for ACL is not set, pass anauthorized"); } - char encrypt_msg[DAP_ENC_BASE64_ENCODE_SIZE(msrln_key->pub_key_data_size) + 1]; - size_t encrypt_msg_size = dap_enc_base64_encode(msrln_key->pub_key_data, msrln_key->pub_key_data_size, encrypt_msg, DAP_ENC_DATA_TYPE_B64); + char encrypt_msg[DAP_ENC_BASE64_ENCODE_SIZE(l_pkey_exchange_key->pub_key_data_size) + 1]; + size_t encrypt_msg_size = dap_enc_base64_encode(l_pkey_exchange_key->pub_key_data, l_pkey_exchange_key->pub_key_data_size, encrypt_msg, DAP_ENC_DATA_TYPE_B64); encrypt_msg[encrypt_msg_size] = '\0'; - key_ks->key = dap_enc_key_new_generate(DAP_ENC_KEY_TYPE_IAES, - msrln_key->priv_key_data, // shared key - msrln_key->priv_key_data_size, - key_ks->id, DAP_ENC_KS_KEY_ID_SIZE, 0); - dap_enc_ks_save_in_storage(key_ks); + l_enc_key_ks->key = dap_enc_key_new_generate(l_enc_type, + l_pkey_exchange_key->priv_key_data, // shared key + l_pkey_exchange_key->priv_key_data_size, + l_enc_key_ks->id, DAP_ENC_KS_KEY_ID_SIZE, 0); + dap_enc_ks_save_in_storage(l_enc_key_ks); char encrypt_id[DAP_ENC_BASE64_ENCODE_SIZE(DAP_ENC_KS_KEY_ID_SIZE) + 1]; - size_t encrypt_id_size = dap_enc_base64_encode(key_ks->id, DAP_ENC_KS_KEY_ID_SIZE, encrypt_id, DAP_ENC_DATA_TYPE_B64); + size_t encrypt_id_size = dap_enc_base64_encode(l_enc_key_ks->id, DAP_ENC_KS_KEY_ID_SIZE, encrypt_id, DAP_ENC_DATA_TYPE_B64); encrypt_id[encrypt_id_size] = '\0'; _enc_http_write_reply(cl_st, encrypt_id, encrypt_msg); - dap_enc_key_delete(msrln_key); + dap_enc_key_delete(l_pkey_exchange_key); *return_code = Http_Status_OK; diff --git a/dap-sdk/net/server/http_server/dap_http_simple.c b/dap-sdk/net/server/http_server/dap_http_simple.c index 0bc3b51be6..17f0cf83bc 100644 --- a/dap-sdk/net/server/http_server/dap_http_simple.c +++ b/dap-sdk/net/server/http_server/dap_http_simple.c @@ -354,7 +354,7 @@ static void s_http_client_headers_read( dap_http_client_t *a_http_client, void * } else { log_it( L_DEBUG, "No data section, execution proc callback" ); dap_events_socket_remove_from_worker_unsafe(a_http_client->esocket,a_http_client->esocket->worker); - dap_proc_queue_add_callback( l_http_simple->worker->proc_queue, s_proc_queue_callback, l_http_simple); + dap_proc_queue_add_callback( l_http_simple->worker, s_proc_queue_callback, l_http_simple); } } @@ -417,7 +417,7 @@ void s_http_client_data_read( dap_http_client_t *a_http_client, void * a_arg ) // bool isOK=true; log_it( L_INFO,"Data for http_simple_request collected" ); dap_events_socket_remove_from_worker_unsafe(a_http_client->esocket,a_http_client->esocket->worker); - dap_proc_queue_add_callback( l_http_simple->worker->proc_queue , s_proc_queue_callback, l_http_simple); + dap_proc_queue_add_callback( l_http_simple->worker , s_proc_queue_callback, l_http_simple); } } diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c index dcf3ad9356..26db4a4a6c 100644 --- a/modules/channel/chain/dap_stream_ch_chain.c +++ b/modules/channel/chain/dap_stream_ch_chain.c @@ -467,7 +467,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) memcpy(&l_ch_chain->request_chain_id, &l_chain_pkt->hdr.chain_id, sizeof(dap_chain_id_t)); } dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker); - dap_proc_queue_add_callback(a_ch->stream_worker->worker->proc_queue, s_sync_chains_callback, a_ch); + dap_proc_queue_add_callback(a_ch->stream_worker->worker, s_sync_chains_callback, a_ch); } } } @@ -491,7 +491,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) memcpy(&l_ch_chain->request_net_id, &l_chain_pkt->hdr.net_id, sizeof(dap_chain_net_id_t)); memcpy(&l_ch_chain->request_chain_id, &l_chain_pkt->hdr.chain_id, sizeof(dap_chain_id_t)); dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker); - dap_proc_queue_add_callback(a_ch->stream_worker->worker->proc_queue, s_sync_gdb_callback, a_ch); + dap_proc_queue_add_callback(a_ch->stream_worker->worker, s_sync_gdb_callback, a_ch); } else { log_it(L_ERROR, "Get DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB session_id=%u bad request", @@ -523,7 +523,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) memcpy(l_ch_chain->pkt_data, l_chain_pkt->data, l_chain_pkt_data_size); l_ch_chain->pkt_data_size = l_chain_pkt_data_size; dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker); - dap_proc_queue_add_callback(a_ch->stream_worker->worker->proc_queue, s_chain_pkt_callback, a_ch); + dap_proc_queue_add_callback(a_ch->stream_worker->worker, s_chain_pkt_callback, a_ch); } else { log_it(L_WARNING, "Empty chain packet"); dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, @@ -552,7 +552,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) memcpy(l_ch_chain->pkt_data, l_chain_pkt->data, l_chain_pkt_data_size); l_ch_chain->pkt_data_size = l_chain_pkt_data_size; dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker); - dap_proc_queue_add_callback(a_ch->stream_worker->worker->proc_queue, s_gdb_pkt_callback, a_ch); + dap_proc_queue_add_callback(a_ch->stream_worker->worker, s_gdb_pkt_callback, a_ch); } else { log_it(L_WARNING, "Packet with GLOBAL_DB atom has zero body size"); dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, @@ -744,6 +744,6 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch); if (l_ch_chain && l_ch_chain->state != CHAIN_STATE_IDLE) { dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker); - dap_proc_queue_add_callback(a_ch->stream_worker->worker->proc_queue, s_out_pkt_callback, a_ch); + dap_proc_queue_add_callback(a_ch->stream_worker->worker, s_out_pkt_callback, a_ch); } } -- GitLab