From 265faae2c2346c1b81dc958f25e2433933584c44 Mon Sep 17 00:00:00 2001 From: "Dmitriy A. Gerasimov" <dmitriy.gerasimov@demlabs.net> Date: Mon, 9 Nov 2020 20:16:39 +0700 Subject: [PATCH] [+] Inter thread queues&functions --- dap-sdk/net/core/dap_events.c | 9 + dap-sdk/net/core/dap_events_socket.c | 144 ++++++++++++++- dap-sdk/net/core/dap_proc_queue.c | 21 ++- dap-sdk/net/core/dap_proc_thread.c | 167 +++++++++++++++--- dap-sdk/net/core/dap_worker.c | 32 ++++ dap-sdk/net/core/include/dap_events_socket.h | 18 +- dap-sdk/net/core/include/dap_proc_queue.h | 1 + dap-sdk/net/core/include/dap_proc_thread.h | 9 + dap-sdk/net/core/include/dap_worker.h | 13 +- .../net/server/http_server/dap_http_simple.c | 10 +- modules/channel/chain/dap_stream_ch_chain.c | 20 +-- 11 files changed, 391 insertions(+), 53 deletions(-) diff --git a/dap-sdk/net/core/dap_events.c b/dap-sdk/net/core/dap_events.c index 6913a18d95..10eb4fdcf0 100644 --- a/dap-sdk/net/core/dap_events.c +++ b/dap-sdk/net/core/dap_events.c @@ -251,6 +251,7 @@ int dap_events_start( dap_events_t *a_events ) l_worker->id = i; l_worker->events = a_events; l_worker->proc_queue = dap_proc_thread_get(i)->proc_queue; + l_worker->proc_queue_input = dap_events_socket_queue_ptr_create_input(l_worker->proc_queue->esocket); #ifdef DAP_EVENTS_CAPS_EPOLL l_worker->epoll_fd = epoll_create( DAP_MAX_EVENTS_COUNT ); pthread_mutex_init(& l_worker->started_mutex, NULL); @@ -282,6 +283,14 @@ int dap_events_start( dap_events_t *a_events ) return -3; } } + // Link queues between + for( uint32_t i = 0; i < s_threads_count; i++) { + dap_worker_t * l_worker = s_workers[i]; + l_worker->queue_es_io_input = DAP_NEW_S_SIZE(dap_events_socket_t*, sizeof (dap_events_socket_t*)* s_threads_count); + for( uint32_t n = 0; n < s_threads_count; n++) { + l_worker->queue_es_io_input[n] = dap_events_socket_queue_ptr_create_input(s_workers[n]->queue_es_io); + } + } return 0; } diff --git a/dap-sdk/net/core/dap_events_socket.c b/dap-sdk/net/core/dap_events_socket.c index 1985ba9985..821f7728d9 100644 --- a/dap-sdk/net/core/dap_events_socket.c +++ b/dap-sdk/net/core/dap_events_socket.c @@ -53,6 +53,21 @@ #define LOG_TAG "dap_events_socket" +// Item for QUEUE_PTR input esocket +struct queue_ptr_input_item{ + dap_events_socket_t * esocket; + void * ptr; + struct queue_ptr_input_item * next; +}; + +// QUEUE_PTR input esocket pvt section +struct queue_ptr_input_pvt{ + dap_events_socket_t * esocket; + struct queue_ptr_input_item * items_first; + struct queue_ptr_input_item * items_last; +}; +#define PVT_QUEUE_PTR_INPUT(a) ( (struct queue_ptr_input_pvt*) (a)->_pvt ) + /** * @brief dap_events_socket_init Init clients module * @return Zero if ok others if no @@ -134,6 +149,14 @@ void dap_events_socket_assign_on_worker_mt(dap_events_socket_t * a_es, struct da dap_worker_add_events_socket(a_es,a_worker); } +void dap_events_socket_assign_on_worker_inter(dap_events_socket_t * a_es_input, dap_events_socket_t * a_es) +{ + a_es->last_ping_request = time(NULL); + // log_it(L_DEBUG, "Assigned %p on worker %u", a_es, a_worker->id); + dap_worker_add_events_socket_inter(a_es_input,a_es); + +} + void dap_events_socket_reassign_between_workers_unsafe(dap_events_socket_t * a_es, dap_worker_t * a_worker_new) { @@ -229,6 +252,49 @@ dap_events_socket_t * dap_events_socket_create_type_pipe_unsafe(dap_worker_t * a return l_es; } +/** + * @brief s_socket_type_queue_ptr_input_callback_delete + * @param a_es + * @param a_arg + */ +static void s_socket_type_queue_ptr_input_callback_delete(dap_events_socket_t * a_es, void * a_arg) +{ + (void) a_arg; + for (struct queue_ptr_input_item * l_item = PVT_QUEUE_PTR_INPUT(a_es)->items_first; l_item; ){ + struct queue_ptr_input_item * l_item_next= l_item->next; + DAP_DELETE(l_item); + l_item= l_item_next; + } + PVT_QUEUE_PTR_INPUT(a_es)->items_first = PVT_QUEUE_PTR_INPUT(a_es)->items_last = NULL; +} + + +/** + * @brief dap_events_socket_queue_ptr_create_input + * @param a_es + * @return + */ +dap_events_socket_t * dap_events_socket_queue_ptr_create_input(dap_events_socket_t* a_es) +{ + dap_events_socket_t * l_es = DAP_NEW_Z(dap_events_socket_t); + l_es->type = DESCRIPTOR_TYPE_QUEUE; + l_es->events = a_es->events; +#if defined(DAP_EVENTS_CAPS_EPOLL) + l_es->ev_base_flags = EPOLLERR | EPOLLRDHUP | EPOLLHUP; +#elif defined(DAP_EVENTS_CAPS_POLL) + l_es->poll_base_flags = POLLERR | POLLRDHUP | POLLHUP; +#else +#error "Not defined s_create_type_pipe for your platform" +#endif + + l_es->type = DESCRIPTOR_TYPE_QUEUE; + l_es->fd = a_es->fd2; + l_es->flags = DAP_SOCK_QUEUE_PTR; + l_es->_pvt = DAP_NEW_Z(struct queue_ptr_input_pvt); + l_es->callbacks.delete_callback = s_socket_type_queue_ptr_input_callback_delete; + return l_es; +} + /** * @brief s_create_type_queue * @param a_w @@ -571,6 +637,17 @@ static void add_ptr_to_buf(dap_events_socket_t * a_es, void* a_arg) pthread_create(&l_thread, NULL, dap_events_socket_buf_thread, l_item); } +/** + * @brief dap_events_socket_queue_ptr_send_to_input + * @param a_es_input + * @param a_arg + * @return + */ +int dap_events_socket_queue_ptr_send_to_input(dap_events_socket_t * a_es_input, void * a_arg) +{ + return dap_events_socket_write_unsafe(a_es_input,&a_arg,sizeof (a_arg) )==sizeof (a_arg) ; +} + /** * @brief dap_events_socket_send_event * @param a_es @@ -820,6 +897,8 @@ void dap_events_socket_delete_unsafe( dap_events_socket_t * a_esocket , bool a_p if ( a_esocket->_inheritor && !a_preserve_inheritor ) DAP_DELETE( a_esocket->_inheritor ); + if (a_esocket->_pvt) + DAP_DELETE(a_esocket->_pvt); if ( a_esocket->socket && a_esocket->socket != -1) { #ifdef _WIN32 @@ -946,6 +1025,69 @@ void dap_events_socket_set_writable_mt(dap_worker_t * a_w, dap_events_socket_t * } } +/** + * @brief dap_events_socket_write_inter + * @param a_es_input + * @param a_es + * @param a_data + * @param a_data_size + * @return + */ +size_t dap_events_socket_write_inter(dap_events_socket_t * a_es_input, dap_events_socket_t *a_es, const void * a_data, size_t a_data_size) +{ + dap_worker_msg_io_t * l_msg = DAP_NEW_Z(dap_worker_msg_io_t); + l_msg->esocket = a_es; + l_msg->data = DAP_NEW_SIZE(void,a_data_size); + l_msg->data_size = a_data_size; + l_msg->flags_set = DAP_SOCK_READY_TO_WRITE; + memcpy( l_msg->data, a_data, a_data_size); + + int l_ret= dap_events_socket_queue_ptr_send_to_input( a_es_input, l_msg ); + if (l_ret!=0){ + log_it(L_ERROR, "Wasn't send pointer to queue: code %d", l_ret); + DAP_DELETE(l_msg); + return 0; + } + return a_data_size; +} + +/** + * @brief dap_events_socket_write_f_inter + * @param a_es_input + * @param sc + * @param format + * @return + */ +size_t dap_events_socket_write_f_inter(dap_events_socket_t * a_es_input, dap_events_socket_t *a_es, const char * a_format,...) +{ + va_list ap, ap_copy; + va_start(ap,a_format); + va_copy(ap_copy, ap); + int l_data_size = dap_vsnprintf(NULL,0,a_format,ap); + va_end(ap); + if (l_data_size <0 ){ + log_it(L_ERROR,"Can't write out formatted data '%s' with values",a_format); + va_end(ap_copy); + return 0; + } + + dap_worker_msg_io_t * l_msg = DAP_NEW_Z(dap_worker_msg_io_t); + l_msg->esocket = a_es; + l_msg->data = DAP_NEW_SIZE(void,l_data_size); + l_msg->data_size = l_data_size; + l_msg->flags_set = DAP_SOCK_READY_TO_WRITE; + l_data_size = dap_vsprintf(l_msg->data,a_format,ap_copy); + va_end(ap_copy); + + int l_ret= dap_events_socket_queue_ptr_send_to_input(a_es_input, l_msg ); + if (l_ret!=0){ + log_it(L_ERROR, "Wasn't send pointer to queue input: code %d", l_ret); + DAP_DELETE(l_msg); + return 0; + } + return l_data_size; +} + /** * @brief dap_events_socket_write_mt * @param sc @@ -964,7 +1106,7 @@ size_t dap_events_socket_write_mt(dap_worker_t * a_w,dap_events_socket_t *a_es, int l_ret= dap_events_socket_queue_ptr_send(a_w->queue_es_io, l_msg ); if (l_ret!=0){ - log_it(L_ERROR, "Wasn't send pointer to queue: code %d", l_ret); + log_it(L_ERROR, "Wasn't send pointer to queue input: code %d", l_ret); DAP_DELETE(l_msg); return 0; } diff --git a/dap-sdk/net/core/dap_proc_queue.c b/dap-sdk/net/core/dap_proc_queue.c index 2a6374b2a1..e9ae5baa8b 100644 --- a/dap-sdk/net/core/dap_proc_queue.c +++ b/dap-sdk/net/core/dap_proc_queue.c @@ -87,7 +87,12 @@ static void s_queue_esocket_callback( dap_events_socket_t * a_es, void * a_msg) DAP_DELETE(l_msg); } - +/** + * @brief dap_proc_queue_add_callback + * @param a_worker + * @param a_callback + * @param a_callback_arg + */ void dap_proc_queue_add_callback(dap_worker_t * a_worker,dap_proc_queue_callback_t a_callback, void * a_callback_arg) { dap_proc_queue_msg_t * l_msg = DAP_NEW_Z(dap_proc_queue_msg_t); @@ -95,3 +100,17 @@ void dap_proc_queue_add_callback(dap_worker_t * a_worker,dap_proc_queue_callback l_msg->callback_arg = a_callback_arg; dap_events_socket_queue_ptr_send( a_worker->proc_queue->esocket , l_msg ); } + +/** + * @brief dap_proc_queue_add_callback_inter + * @param a_es_input + * @param a_callback + * @param a_callback_arg + */ +void dap_proc_queue_add_callback_inter( dap_events_socket_t * a_es_input, dap_proc_queue_callback_t a_callback, void * a_callback_arg) +{ + dap_proc_queue_msg_t * l_msg = DAP_NEW_Z(dap_proc_queue_msg_t); + l_msg->callback = a_callback; + l_msg->callback_arg = a_callback_arg; + dap_events_socket_queue_ptr_send_to_input( a_es_input , l_msg ); +} diff --git a/dap-sdk/net/core/dap_proc_thread.c b/dap-sdk/net/core/dap_proc_thread.c index 86afea3b6d..6f75494795 100644 --- a/dap-sdk/net/core/dap_proc_thread.c +++ b/dap-sdk/net/core/dap_proc_thread.c @@ -159,10 +159,15 @@ static void * s_proc_thread_function(void * a_arg) l_thread->proc_event = dap_events_socket_create_type_queue_ptr_unsafe(NULL, s_proc_event_callback); l_thread->proc_event->_inheritor = l_thread; // we pass thread through it - + l_thread->queue_assign_input = DAP_NEW_Z_SIZE(dap_events_socket_t*, sizeof (dap_events_socket_t*)*dap_events_worker_get_count() ); + l_thread->queue_io_input = DAP_NEW_Z_SIZE(dap_events_socket_t*, sizeof (dap_events_socket_t*)*dap_events_worker_get_count() ); + for (size_t n=0; n<dap_events_worker_get_count(); n++){ + l_thread->queue_assign_input[n] = dap_events_socket_queue_ptr_create_input(dap_events_worker_get(n)->queue_es_new ); + l_thread->queue_io_input[n] = dap_events_socket_queue_ptr_create_input(dap_events_worker_get(n)->queue_es_io ); + } #ifdef DAP_EVENTS_CAPS_EPOLL - struct epoll_event l_epoll_events[DAP_EVENTS_SOCKET_MAX], l_ev; - memset(l_epoll_events, 0,sizeof (l_epoll_events)); + struct epoll_event l_epoll_events = l_thread->epoll_events, l_ev; + memset(l_thread->epoll_events, 0,sizeof (l_thread->epoll_events)); // Create epoll ctl l_thread->epoll_ctl = epoll_create( DAP_EVENTS_SOCKET_MAX ); @@ -182,24 +187,54 @@ static void * s_proc_thread_function(void * a_arg) log_it(L_CRITICAL, "Can't add proc queue on epoll ctl"); return NULL; } + + for (size_t n = 0; n< dap_events_worker_get_count(); n++){ + l_ev.events = l_thread->queue_assign_input[n]->ev_base_flags ; + l_ev.data.ptr = l_thread->queue_assign_input[n]; + if( epoll_ctl(l_thread->epoll_ctl, EPOLL_CTL_ADD, l_thread->queue_assign_input[n]->fd , &l_ev) != 0 ){ + log_it(L_CRITICAL, "Can't add proc queue on epoll ctl"); + return NULL; + } + + l_ev.events = l_thread->queue_io_input[n]->ev_base_flags ; + l_ev.data.ptr = l_thread->queue_io_input[n]; + if( epoll_ctl(l_thread->epoll_ctl, EPOLL_CTL_ADD, l_thread->queue_io_input[n]->fd , &l_ev) != 0 ){ + log_it(L_CRITICAL, "Can't add proc queue on epoll ctl"); + return NULL; + } + } #elif defined(DAP_EVENTS_CAPS_POLL) - size_t l_poll_count_max = DAP_EVENTS_SOCKET_MAX; - size_t l_poll_count = 0; + l_thread->poll_count_max = DAP_EVENTS_SOCKET_MAX; + l_thread->poll_count = 0; bool l_poll_compress = false; - struct pollfd * l_poll = DAP_NEW_Z_SIZE(struct pollfd,l_poll_count_max *sizeof (*l_poll)); - dap_events_socket_t ** l_esockets = DAP_NEW_Z_SIZE(dap_events_socket_t*,l_poll_count_max *sizeof (*l_esockets)); + l_thread->poll = DAP_NEW_Z_SIZE(struct pollfd,l_thread->poll_count_max *sizeof (*l_thread->poll)); + l_thread->esockets = DAP_NEW_Z_SIZE(dap_events_socket_t*,l_thread->poll_count_max *sizeof (*l_thread->esockets)); // Add proc queue - l_poll[0].fd = l_thread->proc_queue->esocket->fd; - l_poll[0].events = l_thread->proc_queue->esocket->poll_base_flags; - l_esockets[0] = l_thread->proc_queue->esocket; - l_poll_count++; + l_thread->poll[0].fd = l_thread->proc_queue->esocket->fd; + l_thread->poll[0].events = l_thread->proc_queue->esocket->poll_base_flags; + l_thread->esockets[0] = l_thread->proc_queue->esocket; + l_thread->poll_count++; // Add proc event - l_poll[1].fd = l_thread->proc_event->fd; - l_poll[1].events = l_thread->proc_event->poll_base_flags; - l_esockets[1] = l_thread->proc_event; - l_poll_count++; + l_thread->poll[1].fd = l_thread->proc_event->fd; + l_thread->poll[1].events = l_thread->proc_event->poll_base_flags; + l_thread->esockets[1] = l_thread->proc_event; + l_thread->poll_count++; + + for (size_t n = 0; n< dap_events_worker_get_count(); n++){ + l_thread->queue_assign_input[n]->poll_index = l_thread->poll_count; + l_thread->poll[l_thread->poll_count].fd = l_thread->queue_assign_input[n]->fd; + l_thread->poll[l_thread->poll_count].events = l_thread->queue_assign_input[n]->poll_base_flags; + l_thread->esockets[l_thread->poll_count] = l_thread->queue_assign_input[n]; + l_thread->poll_count++; + + l_thread->queue_io_input[n]->poll_index = l_thread->poll_count; + l_thread->poll[l_thread->poll_count].fd = l_thread->queue_io_input[n]->fd; + l_thread->poll[l_thread->poll_count].events = l_thread->queue_io_input[n]->poll_base_flags; + l_thread->esockets[l_thread->poll_count] = l_thread->queue_io_input[n]; + l_thread->poll_count++; + } #else #error "Unimplemented poll events analog for this platform" @@ -217,8 +252,8 @@ static void * s_proc_thread_function(void * a_arg) int l_selected_sockets = epoll_wait(l_thread->epoll_ctl, l_epoll_events, DAP_EVENTS_SOCKET_MAX, -1); size_t l_sockets_max = l_selected_sockets; #elif defined (DAP_EVENTS_CAPS_POLL) - int l_selected_sockets = poll(l_poll,l_poll_count,-1); - size_t l_sockets_max = l_poll_count; + int l_selected_sockets = poll(l_thread->poll,l_thread->poll_count,-1); + size_t l_sockets_max = l_thread->poll_count; #else #error "Unimplemented poll wait analog for this platform" #endif @@ -244,14 +279,14 @@ static void * s_proc_thread_function(void * a_arg) l_flag_read = l_cur_events & EPOLLIN; l_flag_error = l_cur_events & EPOLLERR; #elif defined ( DAP_EVENTS_CAPS_POLL) - if(n>=(int32_t)l_poll_count){ - log_it(L_WARNING,"selected_sockets(%d) is bigger then poll count (%u)", l_selected_sockets, l_poll_count); + if(n>=l_thread->poll_count){ + log_it(L_WARNING,"selected_sockets(%d) is bigger then poll count (%u)", l_selected_sockets, l_thread->poll_count); break; } - short l_cur_events = l_poll[n].revents ; + short l_cur_events = l_thread->poll[n].revents ; if (!l_cur_events) continue; - l_cur = l_esockets[n]; + l_cur = l_thread->esockets[n]; l_flag_hup = l_cur_events & POLLHUP; l_flag_rdhup = l_cur_events & POLLHUP; l_flag_write = l_cur_events & POLLOUT; @@ -287,6 +322,45 @@ static void * s_proc_thread_function(void * a_arg) default:{ log_it(L_ERROR, "Unprocessed descriptor type accepted in proc thread loop"); } } } + if (l_flag_write ){ + ssize_t l_bytes_sent = -1; + switch (l_cur->type) { + case DESCRIPTOR_TYPE_QUEUE: + if (l_cur->flags & DAP_SOCK_QUEUE_PTR){ +#if defined(DAP_EVENTS_CAPS_QUEUE_PIPE2) + l_bytes_sent = write(l_cur->socket, l_cur->buf_out, sizeof (void *) ); // We send pointer by pointer +#elif defined (DAP_EVENTS_CAPS_QUEUE_POSIX) + l_bytes_sent = mq_send(a_es->mqd, (const char *) l_cur->buf_out, sizeof (void *),0); +#else +#error "Not implemented dap_events_socket_queue_ptr_send() for this platform" +#endif + int l_errno = errno; + break; + }break; + default: + log_it(L_ERROR, "Dont process write flags for this socket %d in proc thread", l_cur->fd); + + } + if(l_bytes_sent>0){ + l_cur->buf_out_size -= l_bytes_sent; + if (l_cur->buf_out_size ){ // Shrink output buffer + memmove(l_cur->buf_out, l_cur->buf_out+l_bytes_sent, l_cur->buf_out_size ); + }else{ + #ifdef DAP_EVENTS_CAPS_EPOLL + l_ev.events = l_epoll_events[n].events ^ EPOLLOUT ; + l_ev.data.ptr = l_cur; + if( epoll_ctl(l_thread->epoll_ctl, EPOLL_CTL_MOD, l_cur->fd , &l_ev) != 0 ){ + log_it(L_CRITICAL, "Can't update queue_ptr on epoll ctl on proc thread"); + return NULL; + } + #elif defined ( DAP_EVENTS_CAPS_POLL) + l_thread->poll[n].events ^= POLLOUT; + #else + #error "Not implemented poll/epoll here" + #endif + } + } + } if(l_cur->flags & DAP_SOCK_SIGNAL_CLOSE){ #ifdef DAP_EVENTS_CAPS_EPOLL if ( epoll_ctl( l_thread->epoll_ctl, EPOLL_CTL_DEL, l_cur->fd, &l_cur->ev ) == -1 ) @@ -299,7 +373,7 @@ static void * s_proc_thread_function(void * a_arg) DAP_DELETE(l_cur->_inheritor); DAP_DELETE(l_cur); #elif defined (DAP_EVENTS_CAPS_POLL) - l_poll[n].fd = -1; + l_thread->poll[n].fd = -1; l_poll_compress = true; #else #error "Unimplemented poll ctl analog for this platform" @@ -317,22 +391,57 @@ static void * s_proc_thread_function(void * a_arg) /***********************************************************/ if ( l_poll_compress){ l_poll_compress = false; - for (size_t i = 0; i < l_poll_count ; i++) { - if ( l_poll[i].fd == -1){ - for(size_t j = i; j < l_poll_count-1; j++){ - l_poll[j].fd = l_poll[j+1].fd; - l_esockets[j] = l_esockets[j+1]; - l_esockets[j]->poll_index = j; + for (size_t i = 0; i < l_thread->poll_count ; i++) { + if ( l_thread->poll[i].fd == -1){ + for(size_t j = i; j < l_thread->poll_count-1; j++){ + l_thread->poll[j].fd = l_thread->poll[j+1].fd; + l_thread->esockets[j] = l_thread->esockets[j+1]; + l_thread->esockets[j]->poll_index = j; } i--; - l_poll_count--; + l_thread->poll_count--; } } } #endif } log_it(L_NOTICE, "Stop processing thread #%u", l_thread->cpu_id); + + // cleanip inputs + for (size_t n=0; n<dap_events_worker_get_count(); n++){ + dap_events_socket_delete_unsafe(l_thread->queue_assign_input[n], false); + dap_events_socket_delete_unsafe(l_thread->queue_io_input[n], false); + } + return NULL; } +/** + * @brief dap_proc_thread_assign_on_worker_inter + * @param a_thread + * @param a_worker + * @param a_esocket + * @return + */ +bool dap_proc_thread_assign_on_worker_inter(dap_proc_thread_t * a_thread, dap_worker_t * a_worker, dap_events_socket_t *a_esocket ) +{ + dap_events_socket_assign_on_worker_inter(a_thread->queue_assign_input[a_worker->id], a_esocket); + +#ifdef DAP_EVENTS_CAPS_EPOLL + struct epoll_event l_ev; + l_ev.events = a_esocket->ev_base_flags | EPOLLOUT ; + l_ev.data.ptr = a_esocket; + if( epoll_ctl(l_thread->epoll_ctl, EPOLL_CTL_MOD, e_socket->fd , &l_ev) != 0 ){ + log_it(L_ERROR, "Can't update queue_ptr on epoll ctl on proc thread"); + return false; + } +#elif defined ( DAP_EVENTS_CAPS_POLL) + a_thread->poll[a_esocket->poll_index].events |= POLLOUT; +#else +#error "Not implemented poll/epoll here" +#endif + + return true; +} + diff --git a/dap-sdk/net/core/dap_worker.c b/dap-sdk/net/core/dap_worker.c index 3f67a46156..414c63079d 100644 --- a/dap-sdk/net/core/dap_worker.c +++ b/dap-sdk/net/core/dap_worker.c @@ -110,6 +110,8 @@ void *dap_worker_thread(void *arg) l_worker->queue_es_io = dap_events_socket_create_type_queue_ptr_unsafe( l_worker, s_queue_es_io_callback); l_worker->queue_es_reassign = dap_events_socket_create_type_queue_ptr_unsafe( l_worker, s_queue_es_reassign_callback ); l_worker->queue_callback= dap_events_socket_create_type_queue_ptr_unsafe( l_worker, s_queue_callback_callback); + + l_worker->event_exit = dap_events_socket_create_type_event_unsafe(l_worker, s_event_exit_callback ); l_worker->timer_check_activity = dap_timerfd_start_on_worker( l_worker, s_connection_timeout * 1000 / 2, s_socket_all_check_activity, l_worker, true ); @@ -395,6 +397,20 @@ void *dap_worker_thread(void *arg) (struct sockaddr *)&l_cur->remote_addr, sizeof(l_cur->remote_addr)); l_errno = errno; break; + case DESCRIPTOR_TYPE_QUEUE: + if (l_cur->flags & DAP_SOCK_QUEUE_PTR){ +#if defined(DAP_EVENTS_CAPS_QUEUE_PIPE2) + + l_bytes_sent = write(l_cur->socket, (char *) (l_cur->buf_out + l_bytes_sent), + sizeof (void *) ); // We send pointer by pointer +#elif defined (DAP_EVENTS_CAPS_QUEUE_POSIX) + l_bytes_sent = mq_send(a_es->mqd, (const char *)&a_arg,sizeof (a_arg),0); +#else +#error "Not implemented dap_events_socket_queue_ptr_send() for this platform" +#endif + l_errno = errno; + break; + } case DESCRIPTOR_TYPE_PIPE: case DESCRIPTOR_TYPE_FILE: l_bytes_sent = write(l_cur->socket, (char *) (l_cur->buf_out + l_bytes_sent), @@ -681,6 +697,22 @@ void dap_worker_add_events_socket(dap_events_socket_t * a_events_socket, dap_wor } } +/** + * @brief dap_worker_add_events_socket_inter + * @param a_es_input + * @param a_events_socket + */ +void dap_worker_add_events_socket_inter(dap_events_socket_t * a_es_input, dap_events_socket_t * a_events_socket) +{ + if( dap_events_socket_queue_ptr_send_to_input( a_es_input, a_events_socket ) != sizeof (a_events_socket) ){ + int l_errno = errno; + char l_errbuf[128]; + *l_errbuf = 0; + strerror_r(l_errno,l_errbuf,sizeof (l_errbuf)); + log_it(L_ERROR, "Cant send pointer to interthread queue input: \"%s\"(code %d)", l_errbuf, l_errno); + } +} + /** * @brief dap_worker_add_events_socket_unsafe * @param a_worker diff --git a/dap-sdk/net/core/include/dap_events_socket.h b/dap-sdk/net/core/include/dap_events_socket.h index 3e7d44854f..7dd192535a 100644 --- a/dap-sdk/net/core/include/dap_events_socket.h +++ b/dap-sdk/net/core/include/dap_events_socket.h @@ -160,7 +160,7 @@ typedef struct dap_events_socket { // Output section - uint8_t buf_out[DAP_EVENTS_SOCKET_BUF+1]; // Internal buffer for output data + byte_t buf_out[DAP_EVENTS_SOCKET_BUF+1]; // Internal buffer for output data size_t buf_out_size; // size of data that is in the output buffer dap_events_socket_t * pipe_out; // Pipe socket with data for output @@ -196,6 +196,7 @@ typedef struct dap_events_socket { time_t last_ping_request; void *_inheritor; // Inheritor data to specific client type, usualy states for state machine + void *_pvt; //Private section, different for different types struct dap_events_socket * me; // pointer on itself UT_hash_handle hh; @@ -215,7 +216,12 @@ void dap_events_socket_event_proc_input_unsafe(dap_events_socket_t *a_esocket); dap_events_socket_t * dap_events_socket_create_type_pipe_unsafe(dap_worker_t * a_w, dap_events_socket_callback_t a_callback, uint32_t a_flags); dap_events_socket_t * dap_events_socket_create_type_pipe_mt(dap_worker_t * a_w, dap_events_socket_callback_t a_callback, uint32_t a_flags); + +dap_events_socket_t * dap_events_socket_queue_ptr_create_input(dap_events_socket_t* a_es); +int dap_events_socket_queue_ptr_send_to_input( dap_events_socket_t * a_es, void* a_arg); int dap_events_socket_queue_ptr_send( dap_events_socket_t * a_es, void* a_arg); + + int dap_events_socket_event_signal( dap_events_socket_t * a_es, uint64_t a_value); void dap_events_socket_delete_unsafe( dap_events_socket_t * a_esocket , bool a_preserve_inheritor); @@ -226,6 +232,7 @@ dap_events_socket_t * dap_events_socket_wrap2( dap_server_t *a_server, struct da int a_sock, dap_events_socket_callbacks_t *a_callbacks ); void dap_events_socket_assign_on_worker_mt(dap_events_socket_t * a_es, struct dap_worker * a_worker); +void dap_events_socket_assign_on_worker_inter(dap_events_socket_t * a_es_input, dap_events_socket_t * a_es); void dap_events_socket_reassign_between_workers_mt(dap_worker_t * a_worker_old, dap_events_socket_t * a_es, dap_worker_t * a_worker_new); void dap_events_socket_reassign_between_workers_unsafe(dap_events_socket_t * a_es, dap_worker_t * a_worker_new); @@ -247,8 +254,13 @@ size_t dap_events_socket_write_f_unsafe(dap_events_socket_t *sc, const char * fo // MT variants less void dap_events_socket_set_readable_mt(dap_worker_t * a_w, dap_events_socket_t * a_es,bool a_is_ready); void dap_events_socket_set_writable_mt(dap_worker_t * a_w, dap_events_socket_t * a_es,bool a_is_ready); -size_t dap_events_socket_write_mt(dap_worker_t * a_w, dap_events_socket_t *sc, const void * data, size_t data_size); -size_t dap_events_socket_write_f_mt(dap_worker_t * a_w, dap_events_socket_t *sc, const char * format,...); + +size_t dap_events_socket_write_mt(dap_worker_t * a_w, dap_events_socket_t *a_es, const void * a_data, size_t a_data_size); +size_t dap_events_socket_write_f_mt(dap_worker_t * a_w, dap_events_socket_t *a_es, const char * a_format,...); + +size_t dap_events_socket_write_inter(dap_events_socket_t * a_es_input, dap_events_socket_t *a_es, const void * a_data, size_t a_data_size); +size_t dap_events_socket_write_f_inter(dap_events_socket_t * a_es_input, dap_events_socket_t *a_es, const char * a_format,...); + void dap_events_socket_remove_and_delete_mt( dap_worker_t * a_w, dap_events_socket_t* a_es); void dap_events_socket_remove_and_delete_unsafe( dap_events_socket_t *a_es, bool preserve_inheritor ); diff --git a/dap-sdk/net/core/include/dap_proc_queue.h b/dap-sdk/net/core/include/dap_proc_queue.h index 097a44c179..0807f49546 100644 --- a/dap-sdk/net/core/include/dap_proc_queue.h +++ b/dap-sdk/net/core/include/dap_proc_queue.h @@ -43,4 +43,5 @@ dap_proc_queue_t * dap_proc_queue_create(dap_proc_thread_t * a_thread); void dap_proc_queue_delete(dap_proc_queue_t * a_queue); void dap_proc_queue_add_callback(dap_worker_t * a_worker, dap_proc_queue_callback_t a_callback, void * a_callback_arg); +void dap_proc_queue_add_callback_inter( dap_events_socket_t * a_es_input, dap_proc_queue_callback_t a_callback, void * a_callback_arg); diff --git a/dap-sdk/net/core/include/dap_proc_thread.h b/dap-sdk/net/core/include/dap_proc_thread.h index b637cc8159..b269e0d1e0 100644 --- a/dap-sdk/net/core/include/dap_proc_thread.h +++ b/dap-sdk/net/core/include/dap_proc_thread.h @@ -33,6 +33,9 @@ typedef struct dap_proc_thread{ pthread_t thread_id; dap_proc_queue_t * proc_queue; dap_events_socket_t * proc_event; // Should be armed if we have to deal with it + + dap_events_socket_t ** queue_assign_input; // Inputs for assign queues + dap_events_socket_t ** queue_io_input; // Inputs for assign queues atomic_uint proc_queue_size; pthread_cond_t started_cond; @@ -42,8 +45,13 @@ typedef struct dap_proc_thread{ #ifdef DAP_EVENTS_CAPS_EPOLL EPOLL_HANDLE epoll_ctl; + struct epoll_event epoll_events[DAP_EVENTS_SOCKET_MAX]; #elif defined (DAP_EVENTS_CAPS_POLL) int poll_fd; + struct pollfd * poll; + dap_events_socket_t ** esockets; + size_t poll_count; + size_t poll_count_max; #else #error "No poll for proc thread for your platform" #endif @@ -52,3 +60,4 @@ typedef struct dap_proc_thread{ int dap_proc_thread_init(uint32_t a_threads_count); dap_proc_thread_t * dap_proc_thread_get(uint32_t a_thread_number); dap_proc_thread_t * dap_proc_thread_get_auto(); +bool dap_proc_thread_assign_on_worker_inter(dap_proc_thread_t * a_thread, dap_worker_t * a_worker, dap_events_socket_t *a_esocket ); diff --git a/dap-sdk/net/core/include/dap_worker.h b/dap-sdk/net/core/include/dap_worker.h index f8c88d66a6..a19ef1c393 100644 --- a/dap-sdk/net/core/include/dap_worker.h +++ b/dap-sdk/net/core/include/dap_worker.h @@ -33,16 +33,20 @@ typedef struct dap_worker uint32_t id; dap_events_t* events; dap_proc_queue_t* proc_queue; + dap_events_socket_t *proc_queue_input; + atomic_uint event_sockets_count; dap_events_socket_t *esockets; // Hashmap of event sockets // Signal to exit bool signal_exit; // worker control queues - dap_events_socket_t * queue_es_new; // Events socket for new socket - dap_events_socket_t * queue_es_delete; // Events socke - dap_events_socket_t * queue_es_reassign; // Reassign between workers - dap_events_socket_t * queue_es_io; // Events socket for new socket + dap_events_socket_t * queue_es_new; // Queue socket for new socket + + dap_events_socket_t * queue_es_delete; // Queue socke + dap_events_socket_t * queue_es_reassign; // Queue for reassign between workers + dap_events_socket_t * queue_es_io; // Queue socket for io ops + dap_events_socket_t ** queue_es_io_input; // Queue socket for io ops between workers dap_events_socket_t * event_exit; // Events socket for exit dap_events_socket_t * queue_callback; // Queue for pure callback on worker @@ -90,6 +94,7 @@ void dap_worker_deinit(); int dap_worker_add_events_socket_unsafe( dap_events_socket_t * a_esocket, dap_worker_t * a_worker); void dap_worker_add_events_socket(dap_events_socket_t * a_events_socket, dap_worker_t * a_worker); +void dap_worker_add_events_socket_inter(dap_events_socket_t * a_es_input, dap_events_socket_t * a_events_socket); dap_worker_t *dap_worker_add_events_socket_auto( dap_events_socket_t * a_events_socket ); void dap_worker_exec_callback_on(dap_worker_t * a_worker, dap_worker_callback_t a_callback, void * a_arg); diff --git a/dap-sdk/net/server/http_server/dap_http_simple.c b/dap-sdk/net/server/http_server/dap_http_simple.c index 17f0cf83bc..60abea7f92 100644 --- a/dap-sdk/net/server/http_server/dap_http_simple.c +++ b/dap-sdk/net/server/http_server/dap_http_simple.c @@ -292,7 +292,7 @@ bool s_proc_queue_callback(dap_proc_thread_t * a_thread, void * a_arg ) const char error_msg[] = "Not found User-Agent HTTP header"; _write_response_bad_request(l_http_simple, error_msg); _set_only_write_http_client_state( l_http_simple->http_client); - dap_events_socket_assign_on_worker_mt(l_http_simple->esocket, l_http_simple->worker); + dap_proc_thread_assign_on_worker_inter(a_thread, l_http_simple->worker, l_http_simple->esocket ); return true; } @@ -301,7 +301,7 @@ bool s_proc_queue_callback(dap_proc_thread_t * a_thread, void * a_arg ) const char* error_msg = "User-Agent version not supported. Update your software"; _write_response_bad_request(l_http_simple, error_msg); _set_only_write_http_client_state( l_http_simple->http_client); - dap_events_socket_assign_on_worker_mt(l_http_simple->esocket, l_http_simple->worker); + dap_proc_thread_assign_on_worker_inter(a_thread, l_http_simple->worker, l_http_simple->esocket ); return true; } } @@ -318,7 +318,7 @@ bool s_proc_queue_callback(dap_proc_thread_t * a_thread, void * a_arg ) } _set_only_write_http_client_state( l_http_simple->http_client); - dap_events_socket_assign_on_worker_mt(l_http_simple->esocket, l_http_simple->worker); + dap_proc_thread_assign_on_worker_inter(a_thread, l_http_simple->worker, l_http_simple->esocket ); return true; } @@ -354,7 +354,7 @@ static void s_http_client_headers_read( dap_http_client_t *a_http_client, void * } else { log_it( L_DEBUG, "No data section, execution proc callback" ); dap_events_socket_remove_from_worker_unsafe(a_http_client->esocket,a_http_client->esocket->worker); - dap_proc_queue_add_callback( l_http_simple->worker, s_proc_queue_callback, l_http_simple); + dap_proc_queue_add_callback_inter( l_http_simple->worker->proc_queue_input, s_proc_queue_callback, l_http_simple); } } @@ -417,7 +417,7 @@ void s_http_client_data_read( dap_http_client_t *a_http_client, void * a_arg ) // bool isOK=true; log_it( L_INFO,"Data for http_simple_request collected" ); dap_events_socket_remove_from_worker_unsafe(a_http_client->esocket,a_http_client->esocket->worker); - dap_proc_queue_add_callback( l_http_simple->worker , s_proc_queue_callback, l_http_simple); + dap_proc_queue_add_callback_inter( l_http_simple->worker->proc_queue_input , s_proc_queue_callback, l_http_simple); } } diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c index 386db368a3..01bab756e8 100644 --- a/modules/channel/chain/dap_stream_ch_chain.c +++ b/modules/channel/chain/dap_stream_ch_chain.c @@ -177,7 +177,7 @@ bool s_sync_chains_callback(dap_proc_thread_t *a_thread, void *a_arg) NULL, 0, l_ch_chain->callback_notify_arg); } dap_stream_ch_set_ready_to_write_unsafe(l_ch, true); - dap_events_socket_assign_on_worker_mt(l_ch->stream->esocket, l_ch->stream_worker->worker); + dap_proc_thread_assign_on_worker_inter(a_thread, l_ch->stream_worker->worker, l_ch->stream->esocket ); return true; } @@ -222,7 +222,7 @@ bool s_sync_gdb_callback(dap_proc_thread_t *a_thread, void *a_arg) // go to send data from list [in s_stream_ch_packet_out()] // no data to send -> send one empty message DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB dap_stream_ch_set_ready_to_write_unsafe(l_ch, true); - dap_events_socket_assign_on_worker_mt(l_ch->stream->esocket, l_ch->stream_worker->worker); + dap_proc_thread_assign_on_worker_inter(a_thread, l_ch->stream_worker->worker, l_ch->stream->esocket ); return true; } @@ -322,7 +322,7 @@ bool s_chain_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) DAP_DELETE(l_pkt_copy_list); }else log_it(L_WARNING, "In proc thread got CHAINS stream ch packet with zero data"); - dap_events_socket_assign_on_worker_mt(l_ch->stream->esocket, l_ch->stream_worker->worker); + dap_proc_thread_assign_on_worker_inter(a_thread, l_ch->stream_worker->worker, l_ch->stream->esocket ); return true; } @@ -426,7 +426,7 @@ bool s_gdb_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) } else { log_it(L_WARNING, "In proc thread got GDB stream ch packet with zero data"); } - dap_events_socket_assign_on_worker_mt(l_ch->stream->esocket, l_ch->stream_worker->worker); + dap_proc_thread_assign_on_worker_inter(a_thread, l_ch->stream_worker->worker, l_ch->stream->esocket ); return true; } @@ -513,7 +513,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) memcpy(&l_ch_chain->request_hdr, &l_chain_pkt->hdr, sizeof(l_chain_pkt->hdr)); } dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker); - dap_proc_queue_add_callback(a_ch->stream_worker->worker, s_sync_chains_callback, a_ch); + dap_proc_queue_add_callback_inter( a_ch->stream_worker->worker->proc_queue_input, s_sync_chains_callback, a_ch); } } } @@ -535,7 +535,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) memcpy(&l_ch_chain->request, l_request, l_chain_pkt_data_size); memcpy(&l_ch_chain->request_hdr, &l_chain_pkt->hdr, sizeof(l_chain_pkt->hdr)); dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker); - dap_proc_queue_add_callback(a_ch->stream_worker->worker, s_sync_gdb_callback, a_ch); + dap_proc_queue_add_callback_inter(a_ch->stream_worker->worker->proc_queue_input, s_sync_gdb_callback, a_ch); } else { log_it(L_ERROR, "Get DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB session_id=%u bad request", @@ -567,7 +567,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) l_pkt_copy->pkt_data_size = l_chain_pkt_data_size; l_ch_chain->pkt_copy_list = dap_list_append(l_ch_chain->pkt_copy_list, l_pkt_copy); dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker); - dap_proc_queue_add_callback(a_ch->stream_worker->worker, s_chain_pkt_callback, a_ch); + dap_proc_queue_add_callback_inter(a_ch->stream_worker->worker->proc_queue_input, s_chain_pkt_callback, a_ch); } else { log_it(L_WARNING, "Empty chain packet"); dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, @@ -596,7 +596,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) l_pkt_copy->pkt_data_size = l_chain_pkt_data_size; l_ch_chain->pkt_copy_list = dap_list_append(l_ch_chain->pkt_copy_list, l_pkt_copy); dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker); - dap_proc_queue_add_callback(a_ch->stream_worker->worker, s_gdb_pkt_callback, a_ch); + dap_proc_queue_add_callback_inter(a_ch->stream_worker->worker->proc_queue_input, s_gdb_pkt_callback, a_ch); } else { log_it(L_WARNING, "Packet with GLOBAL_DB atom has zero body size"); dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, @@ -759,7 +759,7 @@ static bool s_out_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) if (l_ch->stream->esocket->buf_out_size) { dap_stream_ch_set_ready_to_write_unsafe(l_ch, true); } - dap_events_socket_assign_on_worker_mt(l_ch->stream->esocket, l_ch->stream_worker->worker); + dap_proc_thread_assign_on_worker_inter(a_thread, l_ch->stream_worker->worker, l_ch->stream->esocket ); return true; } return false; @@ -780,6 +780,6 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch); if (l_ch_chain && l_ch_chain->state != CHAIN_STATE_IDLE) { dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker); - dap_proc_queue_add_callback(a_ch->stream_worker->worker, s_out_pkt_callback, a_ch); + dap_proc_queue_add_callback_inter(a_ch->stream_worker->worker->proc_queue_input, s_out_pkt_callback, a_ch); } } -- GitLab