/* * Authors: * Dmitriy A. Gearasimov <gerasimov.dmitriy@demlabs.net> * DeM Labs Ltd. https://demlabs.net * Copyright (c) 2020 * All rights reserved. This file is part of DAP SDK the open source project DAP SDK is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. DAP SDK is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with any DAP SDK based project. If not, see <http://www.gnu.org/licenses/>. */ #include <assert.h> #include <errno.h> #include "dap_server.h" #if defined(DAP_EVENTS_CAPS_EPOLL) && !defined(DAP_OS_WINDOWS) #include <sys/epoll.h> #elif defined DAP_OS_WINDOWS #include "wepoll.h" #elif defined (DAP_EVENTS_CAPS_POLL) #include <poll.h> #else #error "Unimplemented poll for this platform" #endif #include "dap_config.h" #include "dap_events.h" #include "dap_events_socket.h" #include "dap_proc_thread.h" #define LOG_TAG "dap_proc_thread" static size_t s_threads_count = 0; static bool s_debug_reactor = false; static dap_proc_thread_t * s_threads = NULL; static void * s_proc_thread_function(void * a_arg); /** * @brief dap_proc_thread_init * @param a_cpu_count 0 means autodetect * @return */ int dap_proc_thread_init(uint32_t a_threads_count){ s_threads_count = a_threads_count ? a_threads_count : dap_get_cpu_count( ); s_threads = DAP_NEW_Z_SIZE(dap_proc_thread_t, sizeof (dap_proc_thread_t)* s_threads_count); s_debug_reactor = g_config? dap_config_get_item_bool_default(g_config,"general","debug_reactor",false) : false; for (size_t i = 0; i < s_threads_count; i++ ){ s_threads[i].cpu_id = i; pthread_cond_init( &s_threads[i].started_cond, NULL ); pthread_mutex_init( &s_threads[i].started_mutex, NULL ); pthread_mutex_lock( &s_threads[i].started_mutex ); int res = pthread_create( &s_threads[i].thread_id,NULL, s_proc_thread_function, &s_threads[i] ); if (res) { log_it(L_CRITICAL, "Create thread failed with code %d", res); pthread_mutex_unlock( &s_threads[i].started_mutex ); return -1; } pthread_cond_wait( &s_threads[i].started_cond, &s_threads[i].started_mutex ); pthread_mutex_unlock( &s_threads[i].started_mutex ); } return 0; } /** * @brief dap_proc_thread_deinit */ void dap_proc_thread_deinit() { // Signal to cancel working threads and wait for finish for (size_t i = 0; i < s_threads_count; i++ ){ pthread_cancel(s_threads[i].thread_id); pthread_join(s_threads[i].thread_id, NULL); } } /** * @brief dap_proc_thread_get * @param a_cpu_id * @return */ dap_proc_thread_t * dap_proc_thread_get(uint32_t a_cpu_id) { return a_cpu_id<s_threads_count? &s_threads[a_cpu_id] : NULL; } /** * @brief dap_proc_thread_get_auto * @return */ dap_proc_thread_t * dap_proc_thread_get_auto() { size_t l_id_min=0; size_t l_size_min=UINT32_MAX; for (size_t i = 0; i < s_threads_count; i++ ){ size_t l_queue_size = s_threads[i].proc_queue_size; if( l_queue_size < l_size_min ){ l_size_min = l_queue_size; l_id_min = i; } } return &s_threads[l_id_min]; } /** * @brief s_proc_event_callback * @param a_esocket * @param a_value */ static void s_proc_event_callback(dap_events_socket_t * a_esocket, uint64_t a_value) { (void) a_value; // log_it(L_DEBUG, "--> Proc event callback start"); dap_proc_thread_t * l_thread = (dap_proc_thread_t *) a_esocket->_inheritor; dap_proc_queue_item_t * l_item = l_thread->proc_queue->item_first; dap_proc_queue_item_t * l_item_old = NULL; bool l_is_anybody_for_repeat=false; while(l_item){ // log_it(L_INFO, "Proc event callback: %p/%p", l_item->callback, l_item->callback_arg); bool l_is_finished = l_item->callback(l_thread, l_item->callback_arg); if (l_is_finished){ if ( l_item->prev ){ l_item->prev->next = l_item_old; } if(l_item_old){ l_item_old->prev = l_item->prev; if ( ! l_item->prev ) { // We deleted tail l_thread->proc_queue->item_last = l_item_old; } DAP_DELETE(l_item); l_item = l_item_old->prev; }else{ l_thread->proc_queue->item_first = l_item->prev; if ( l_item->prev){ l_item->prev->next = NULL; // Prev if it was - now its NULL }else l_thread->proc_queue->item_last = NULL; // NULL last item DAP_DELETE(l_item); l_item = l_thread->proc_queue->item_first; } // log_it(L_DEBUG, "Proc event finished"); }else{ // log_it(L_DEBUG, "Proc event not finished"); l_item_old = l_item; l_item=l_item->prev; } l_is_anybody_for_repeat = !l_is_finished; } if(l_is_anybody_for_repeat) // Arm event if we have smth to proc again dap_events_socket_event_signal(a_esocket,1); // log_it(L_DEBUG, "<-- Proc event callback end"); } /** * @brief s_update_poll_flags * @param a_thread * @param a_esocket * @return */ static int s_update_poll_flags(dap_proc_thread_t * a_thread, dap_events_socket_t * a_esocket) { #ifdef DAP_EVENTS_CAPS_EPOLL u_int events = a_esocket->ev_base_flags; if( a_esocket->flags & DAP_SOCK_READY_TO_READ) { events |= EPOLLIN; #ifdef DAP_OS_WINDOWS events ^= EPOLLONESHOT; #endif } if( a_esocket->flags & DAP_SOCK_READY_TO_WRITE) { events |= EPOLLOUT; #ifdef DAP_OS_WINDOWS events |= EPOLLONESHOT; #endif } a_esocket->ev.events = events; if( epoll_ctl(a_thread->epoll_ctl, EPOLL_CTL_MOD, a_esocket->socket, &a_esocket->ev) != 0 ){ #ifdef DAP_OS_WINDOWS errno = WSAGetLastError(); #endif log_it(L_CRITICAL, "Can't add proc queue on epoll ctl, err: %d", errno); return -1; } #elif defined (DAP_EVENTS_CAPS_POLL) a_thread->poll[a_esocket->poll_index].events= a_esocket->poll_base_flags; if( a_esocket->flags & DAP_SOCK_READY_TO_READ) a_thread->poll[a_esocket->poll_index].revents |= POLLIN; if( a_esocket->flags & DAP_SOCK_READY_TO_WRITE) a_thread->poll[a_esocket->poll_index].revents |= POLLOUT; #else #error "Not defined dap_proc_thread.c::s_update_poll_flags() on your platform" #endif return 0; } /** * @brief dap_proc_thread_create_queue_ptr * @details Call this function as others only from safe situation, or, thats better, from a_thread's context * @param a_thread * @param a_callback * @return */ dap_events_socket_t * dap_proc_thread_create_queue_ptr(dap_proc_thread_t * a_thread, dap_events_socket_callback_queue_ptr_t a_callback) { dap_events_socket_t * l_es = dap_events_socket_create_type_queue_ptr_unsafe(NULL,a_callback); if(l_es == NULL) return NULL; l_es->proc_thread = a_thread; #ifdef DAP_EVENTS_CAPS_EPOLL l_es->ev.events = l_es->ev_base_flags ; l_es->ev.data.ptr = l_es; if( epoll_ctl(a_thread->epoll_ctl, EPOLL_CTL_ADD, l_es->socket, &l_es->ev) != 0 ){ #ifdef DAP_OS_WINDOWS errno = WSAGetLastError(); #endif log_it(L_CRITICAL, "Can't add queue input on epoll ctl, err: %d", errno); return NULL; } #elif defined(DAP_EVENTS_CAPS_POLL) l_es->poll_index = a_thread->poll_count; a_thread->poll[a_thread->poll_count].fd = l_es->fd; a_thread->poll[a_thread->poll_count].events = l_es->poll_base_flags; a_thread->esockets[a_thread->poll_count] = l_es; a_thread->poll_count++; #else #error "Not defined dap_proc_thread_create_queue_ptr() on your platform" #endif return l_es; } /** * @brief s_proc_thread_function * @param a_arg * @return */ static void * s_proc_thread_function(void * a_arg) { dap_proc_thread_t * l_thread = (dap_proc_thread_t*) a_arg; assert(l_thread); dap_cpu_assign_thread_on(l_thread->cpu_id); struct sched_param l_shed_params; l_shed_params.sched_priority = 0; #ifdef DAP_OS_WINDOWS if (!SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_HIGHEST)) log_it(L_ERROR, "Couldn't set thread priority, err: %d", GetLastError()); #else pthread_setschedparam(pthread_self(),SCHED_BATCH ,&l_shed_params); #endif l_thread->proc_queue = dap_proc_queue_create(l_thread); // Init proc_queue for related worker dap_worker_t * l_worker_related = dap_events_worker_get(l_thread->cpu_id); assert(l_worker_related); l_worker_related->proc_queue = l_thread->proc_queue; l_worker_related->proc_queue_input = dap_events_socket_queue_ptr_create_input(l_worker_related->proc_queue->esocket); dap_events_socket_assign_on_worker_mt(l_worker_related->proc_queue_input,l_worker_related); l_thread->proc_event = dap_events_socket_create_type_event_unsafe(NULL, s_proc_event_callback); l_thread->proc_event->_inheritor = l_thread; // we pass thread through it size_t l_workers_count= dap_events_worker_get_count(); assert(l_workers_count); l_thread->queue_assign_input = DAP_NEW_Z_SIZE(dap_events_socket_t*, sizeof (dap_events_socket_t*)*l_workers_count ); l_thread->queue_io_input = DAP_NEW_Z_SIZE(dap_events_socket_t*, sizeof (dap_events_socket_t*)*l_workers_count ); l_thread->queue_callback_input = DAP_NEW_Z_SIZE(dap_events_socket_t*, sizeof (dap_events_socket_t*)*l_workers_count ); assert(l_thread->queue_assign_input); assert(l_thread->queue_io_input); for (size_t n=0; n<l_workers_count; n++){ dap_worker_t * l_worker =dap_events_worker_get(n); l_thread->queue_assign_input[n] = dap_events_socket_queue_ptr_create_input(l_worker->queue_es_new ); l_thread->queue_io_input[n] = dap_events_socket_queue_ptr_create_input(l_worker->queue_es_io ); l_thread->queue_callback_input[n] = dap_events_socket_queue_ptr_create_input(l_worker->queue_callback ); } #ifdef DAP_EVENTS_CAPS_EPOLL struct epoll_event l_epoll_events[ DAP_EVENTS_SOCKET_MAX]= { { 0 } }; // Create epoll ctl l_thread->epoll_ctl = epoll_create( DAP_EVENTS_SOCKET_MAX ); // add proc queue l_thread->proc_queue->esocket->ev.events = l_thread->proc_queue->esocket->ev_base_flags; l_thread->proc_queue->esocket->ev.data.ptr = l_thread->proc_queue->esocket; if( epoll_ctl(l_thread->epoll_ctl, EPOLL_CTL_ADD, l_thread->proc_queue->esocket->socket , &l_thread->proc_queue->esocket->ev) != 0 ){ #ifdef DAP_OS_WINDOWS errno = WSAGetLastError(); #endif log_it(L_CRITICAL, "Can't add proc queue %d on epoll ctl, error", l_thread->proc_queue->esocket->socket, errno); return NULL; } // Add proc event l_thread->proc_event->ev.events = l_thread->proc_event->ev_base_flags ; l_thread->proc_event->ev.data.ptr = l_thread->proc_event; if( epoll_ctl(l_thread->epoll_ctl, EPOLL_CTL_ADD, l_thread->proc_event->socket , &l_thread->proc_event->ev) != 0 ){ #ifdef DAP_OS_WINDOWS errno = WSAGetLastError(); #endif log_it(L_CRITICAL, "Can't add proc event on epoll ctl, err: %d", errno); return NULL; } for (size_t n = 0; n< dap_events_worker_get_count(); n++){ // Queue asssign l_thread->queue_assign_input[n]->ev.events = l_thread->queue_assign_input[n]->ev_base_flags ; l_thread->queue_assign_input[n]->ev.data.ptr = l_thread->queue_assign_input[n]; if( epoll_ctl(l_thread->epoll_ctl, EPOLL_CTL_ADD, l_thread->queue_assign_input[n]->socket, &l_thread->queue_assign_input[n]->ev) != 0 ){ #ifdef DAP_OS_WINDOWS errno = WSAGetLastError(); #endif log_it(L_CRITICAL, "Can't add queue input on epoll ctl, err: %d", errno); return NULL; } // Queue IO l_thread->queue_io_input[n]->ev.events = l_thread->queue_io_input[n]->ev_base_flags ; l_thread->queue_io_input[n]->ev.data.ptr = l_thread->queue_io_input[n]; if( epoll_ctl(l_thread->epoll_ctl, EPOLL_CTL_ADD, l_thread->queue_io_input[n]->fd , &l_thread->queue_io_input[n]->ev) != 0 ){ #ifdef DAP_OS_WINDOWS errno = WSAGetLastError(); #endif log_it(L_CRITICAL, "Can't add proc io input on epoll ctl, err: %d", errno); return NULL; } // Queue callback l_thread->queue_callback_input[n]->ev.events = l_thread->queue_callback_input[n]->ev_base_flags ; l_thread->queue_callback_input[n]->ev.data.ptr = l_thread->queue_callback_input[n]; if( epoll_ctl(l_thread->epoll_ctl, EPOLL_CTL_ADD, l_thread->queue_callback_input[n]->fd , &l_thread->queue_callback_input[n]->ev) != 0 ){ #ifdef DAP_OS_WINDOWS errno = WSAGetLastError(); #endif log_it(L_CRITICAL, "Can't add proc io input on epoll ctl, err: %d", errno); return NULL; } } #elif defined(DAP_EVENTS_CAPS_POLL) l_thread->poll_count_max = DAP_EVENTS_SOCKET_MAX; l_thread->poll_count = 0; bool l_poll_compress = false; l_thread->poll = DAP_NEW_Z_SIZE(struct pollfd,l_thread->poll_count_max *sizeof (*l_thread->poll)); l_thread->esockets = DAP_NEW_Z_SIZE(dap_events_socket_t*,l_thread->poll_count_max *sizeof (*l_thread->esockets)); // Add proc queue l_thread->poll[l_thread->poll_count].fd = l_thread->proc_queue->esocket->fd; l_thread->poll[l_thread->poll_count].events = l_thread->proc_queue->esocket->poll_base_flags; l_thread->esockets[l_thread->poll_count] = l_thread->proc_queue->esocket; l_thread->poll_count++; // Add proc event l_thread->poll[l_thread->poll_count].fd = l_thread->proc_event->fd; l_thread->poll[l_thread->poll_count].events = l_thread->proc_event->poll_base_flags; l_thread->esockets[l_thread->poll_count] = l_thread->proc_event; l_thread->poll_count++; for (size_t n = 0; n< dap_events_worker_get_count(); n++){ dap_events_socket_t * l_queue_assign_input = l_thread->queue_assign_input[n]; dap_events_socket_t * l_queue_io_input = l_thread->queue_io_input[n]; dap_events_socket_t * l_queue_callback_input = l_thread->queue_callback_input[n]; if (l_queue_assign_input&&l_queue_io_input){ // Queue assign input l_queue_assign_input->poll_index = l_thread->poll_count; l_thread->poll[l_thread->poll_count].fd = l_queue_assign_input->fd; l_thread->poll[l_thread->poll_count].events = l_queue_assign_input->poll_base_flags; l_thread->esockets[l_thread->poll_count] = l_queue_assign_input; l_thread->poll_count++; // Queue io input l_queue_io_input->poll_index = l_thread->poll_count; l_thread->poll[l_thread->poll_count].fd = l_queue_io_input->fd; l_thread->poll[l_thread->poll_count].events = l_queue_io_input->poll_base_flags; l_thread->esockets[l_thread->poll_count] = l_queue_io_input; l_thread->poll_count++; // Queue callback input l_queue_callback_input->poll_index = l_thread->poll_count; l_thread->poll[l_thread->poll_count].fd = l_queue_callback_input->fd; l_thread->poll[l_thread->poll_count].events = l_queue_callback_input->poll_base_flags; l_thread->esockets[l_thread->poll_count] = l_queue_callback_input; l_thread->poll_count++; } } #else #error "Unimplemented poll events analog for this platform" #endif //We've started! pthread_mutex_lock(&l_thread->started_mutex); pthread_mutex_unlock(&l_thread->started_mutex); pthread_cond_broadcast(&l_thread->started_cond); // Main loop while (! l_thread->signal_kill){ #ifdef DAP_EVENTS_CAPS_EPOLL //log_it(L_DEBUG, "Epoll_wait call"); int l_selected_sockets = epoll_wait(l_thread->epoll_ctl, l_epoll_events, DAP_EVENTS_SOCKET_MAX, -1); size_t l_sockets_max = (size_t)l_selected_sockets; #elif defined (DAP_EVENTS_CAPS_POLL) int l_selected_sockets = poll(l_thread->poll,l_thread->poll_count,-1); size_t l_sockets_max = l_thread->poll_count; #else #error "Unimplemented poll wait analog for this platform" #endif if(l_selected_sockets == -1) { if( errno == EINTR) continue; #if defined DAP_OS_UNIX int l_errno = errno; char l_errbuf[128]; strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); log_it(L_ERROR, "Proc thread #%d got errno:\"%s\" (%d)", l_thread->cpu_id , l_errbuf, l_errno); #elif DAP_OS_WINDOWS log_it(L_ERROR, "Error occured on thread #%d, errno: %d", l_thread->cpu_id , errno); #endif break; } for(size_t n = 0; n < l_sockets_max; n++) { dap_events_socket_t * l_cur; bool l_flag_hup, l_flag_rdhup, l_flag_read, l_flag_write, l_flag_error, l_flag_nval,l_flag_pri,l_flag_msg; #ifdef DAP_EVENTS_CAPS_EPOLL l_cur = (dap_events_socket_t *) l_epoll_events[n].data.ptr; uint32_t l_cur_events = l_epoll_events[n].events; l_flag_hup = l_cur_events & EPOLLHUP; l_flag_rdhup = l_cur_events & EPOLLHUP; l_flag_write = l_cur_events & EPOLLOUT; l_flag_read = l_cur_events & EPOLLIN; l_flag_error = l_cur_events & EPOLLERR; l_flag_nval = false; l_flag_pri = false; l_flag_msg = false; #elif defined ( DAP_EVENTS_CAPS_POLL) if(n>=l_thread->poll_count){ log_it(L_WARNING,"selected_sockets(%d) is bigger then poll count (%u)", l_selected_sockets, l_thread->poll_count); break; } short l_cur_events = l_thread->poll[n].revents ; if (!l_cur_events) continue; l_cur = l_thread->esockets[n]; l_flag_hup = l_cur_events& POLLHUP; l_flag_rdhup = l_cur_events & POLLRDHUP; l_flag_write = (l_cur_events & POLLOUT) || (l_cur_events &POLLRDNORM)|| (l_cur_events &POLLRDBAND ) ; l_flag_read = l_cur_events & POLLIN || (l_cur_events &POLLWRNORM)|| (l_cur_events &POLLWRBAND ); l_flag_error = l_cur_events & POLLERR; l_flag_nval = l_cur_events & POLLNVAL; l_flag_pri = l_cur_events & POLLPRI; l_flag_msg = l_cur_events & POLLMSG; #else #error "Unimplemented fetch esocket after poll" #endif if(!l_cur) { log_it(L_ERROR, "dap_events_socket NULL"); continue; } if(s_debug_reactor) log_it(L_DEBUG, "Proc thread #%u esocket %p fd=%d type=%d flags=0x%0X (%s:%s:%s:%s:%s:%s:%s:%s)", l_thread->cpu_id, l_cur, l_cur->socket, l_cur->type, l_cur_events, l_flag_read?"read":"", l_flag_write?"write":"", l_flag_error?"error":"", l_flag_hup?"hup":"", l_flag_rdhup?"rdhup":"", l_flag_msg?"msg":"", l_flag_nval?"nval":"", l_flag_pri?"pri":""); //log_it(L_DEBUG,"Waked up esocket %p (socket %d) {read:%s,write:%s,error:%s} ", l_cur, l_cur->fd, // l_flag_read?"true":"false", l_flag_write?"true":"false", l_flag_error?"true":"false" ); time_t l_cur_time = time( NULL); l_cur->last_time_active = l_cur_time; if (l_flag_error){ #ifdef DAP_OS_WINDOWS int l_errno = WSAGetLastError(); #else int l_errno = errno; #endif char l_errbuf[128]; strerror_r(l_errno, l_errbuf,sizeof (l_errbuf)); log_it(L_ERROR,"Some error on proc thread #%u with %d socket: %s(%d)",l_thread->cpu_id, l_cur->socket, l_errbuf, l_errno); if(l_cur->callbacks.error_callback) l_cur->callbacks.error_callback(l_cur, errno); } if (l_flag_read ){ int32_t l_bytes_read = 0; switch (l_cur->type) { case DESCRIPTOR_TYPE_QUEUE: dap_events_socket_queue_proc_input_unsafe(l_cur); #ifdef DAP_OS_WINDOWS l_bytes_read = dap_recvfrom(l_cur->socket, NULL, 0); #endif break; case DESCRIPTOR_TYPE_EVENT: dap_events_socket_event_proc_input_unsafe (l_cur); break; default: log_it(L_ERROR, "Unprocessed descriptor type accepted in proc thread loop"); #ifdef DAP_OS_WINDOWS l_bytes_read = dap_recvfrom(l_cur->socket, NULL, 0); #endif break; } } if (l_flag_write ){ int l_errno=0; if (l_cur->buf_out_size){ ssize_t l_bytes_sent = -1; switch (l_cur->type) { case DESCRIPTOR_TYPE_QUEUE: if (l_cur->flags & DAP_SOCK_QUEUE_PTR){ #if defined(DAP_EVENTS_CAPS_QUEUE_PIPE2) l_bytes_sent = write(l_cur->socket, l_cur->buf_out, sizeof (void *) ); // We send pointer by pointer #elif defined DAP_EVENTS_CAPS_MSMQ DWORD l_mp_id = 0; MQMSGPROPS l_mps; MQPROPVARIANT l_mpvar[1]; MSGPROPID l_p_id[1]; HRESULT l_mstatus[1]; l_p_id[l_mp_id] = PROPID_M_BODY; l_mpvar[l_mp_id].vt = VT_VECTOR | VT_UI1; l_mpvar[l_mp_id].caub.pElems = l_cur->buf_out; l_mpvar[l_mp_id].caub.cElems = (u_long)l_cur->buf_out_size; l_mp_id++; l_mps.cProp = l_mp_id; l_mps.aPropID = l_p_id; l_mps.aPropVar = l_mpvar; l_mps.aStatus = l_mstatus; HRESULT hr = MQSendMessage(l_cur->mqh, &l_mps, MQ_NO_TRANSACTION); if (hr != MQ_OK) { log_it(L_ERROR, "An error occured on sending message to queue, errno: 0x%x", hr); break; } else { if(dap_sendto(l_cur->socket, l_cur->port, NULL, 0) == SOCKET_ERROR) { log_it(L_ERROR, "Write to sock error: %d", WSAGetLastError()); } l_cur->buf_out_size = 0; dap_events_socket_set_writable_unsafe(l_cur,false); break; } #elif defined (DAP_EVENTS_CAPS_QUEUE_MQUEUE) volatile char * l_ptr = (char *) l_cur->buf_out; volatile void *l_ptr_in; memcpy(&l_ptr_in,l_ptr, sizeof (l_ptr_in) ); l_bytes_sent = mq_send(l_cur->mqd, l_ptr, sizeof (l_ptr),0); if (l_bytes_sent==0){ // log_it(L_DEBUG,"mq_send %p success", l_ptr_in); l_bytes_sent = sizeof (void *); }else if (l_bytes_sent == -1 && errno == EINVAL){ // To make compatible with other l_errno = EAGAIN; // non-blocking sockets // log_it(L_DEBUG,"mq_send %p EAGAIN", l_ptr_in); }else{ l_errno = errno; log_it(L_WARNING,"mq_send %p errno: %d", l_ptr_in, l_errno); } #else #error "Not implemented dap_events_socket_queue_ptr_send() for this platform" #endif //int l_errno = errno; break; }break; default: log_it(L_ERROR, "Dont process write flags for this socket %d in proc thread", l_cur->fd); } l_errno = errno; if(l_bytes_sent>0){ l_cur->buf_out_size -= l_bytes_sent; //log_it(L_DEBUG,"Sent %zd bytes out, left %zd in buf out", l_bytes_sent, l_cur->buf_out); if (l_cur->buf_out_size ){ // Shrink output buffer memmove(l_cur->buf_out, l_cur->buf_out+l_bytes_sent, l_cur->buf_out_size ); }else{ l_cur->flags ^= DAP_SOCK_READY_TO_WRITE; s_update_poll_flags(l_thread, l_cur); } } }else{ log_it(L_DEBUG,"(!) Write event receieved but nothing in buffer, switching off this flag"); l_cur->flags ^= DAP_SOCK_READY_TO_WRITE; s_update_poll_flags(l_thread, l_cur); } } if(l_cur->flags & DAP_SOCK_SIGNAL_CLOSE){ #ifdef DAP_EVENTS_CAPS_EPOLL log_it(L_WARNING, "Deleting esocket %d from proc thread?...", l_cur->fd); if ( epoll_ctl( l_thread->epoll_ctl, EPOLL_CTL_DEL, l_cur->fd, &l_cur->ev) == -1 ) log_it( L_ERROR,"Can't remove event socket's handler from the epoll ctl" ); //else // log_it( L_DEBUG,"Removed epoll's event from proc thread #%u", l_thread->cpu_id ); if (l_cur->callbacks.delete_callback) l_cur->callbacks.delete_callback(l_cur, l_thread); if(l_cur->_inheritor) DAP_DELETE(l_cur->_inheritor); DAP_DELETE(l_cur); #elif defined (DAP_EVENTS_CAPS_POLL) l_thread->poll[n].fd = -1; l_poll_compress = true; #else #error "Unimplemented poll ctl analog for this platform" #endif } } #ifdef DAP_EVENTS_CAPS_POLL /***********************************************************/ /* If the compress_array flag was turned on, we need */ /* to squeeze together the array and decrement the number */ /* of file descriptors. We do not need to move back the */ /* events and revents fields because the events will always*/ /* be POLLIN in this case, and revents is output. */ /***********************************************************/ if ( l_poll_compress){ l_poll_compress = false; for (size_t i = 0; i < l_thread->poll_count ; i++) { if ( l_thread->poll[i].fd == -1){ for(size_t j = i; j +1 < l_thread->poll_count; j++){ l_thread->poll[j].fd = l_thread->poll[j+1].fd; l_thread->esockets[j] = l_thread->esockets[j+1]; if(l_thread->esockets[j]) l_thread->esockets[j]->poll_index = j; } i--; l_thread->poll_count--; } } } #endif } log_it(L_NOTICE, "Stop processing thread #%u", l_thread->cpu_id); // cleanip inputs for (size_t n=0; n<dap_events_worker_get_count(); n++){ dap_events_socket_delete_unsafe(l_thread->queue_assign_input[n], false); dap_events_socket_delete_unsafe(l_thread->queue_io_input[n], false); } return NULL; } /** * @brief dap_proc_thread_assign_on_worker_inter * @param a_thread * @param a_worker * @param a_esocket * @return */ bool dap_proc_thread_assign_on_worker_inter(dap_proc_thread_t * a_thread, dap_worker_t * a_worker, dap_events_socket_t *a_esocket ) { dap_events_socket_t * l_es_assign_input = a_thread->queue_assign_input[a_worker->id]; //log_it(L_DEBUG,"Remove esocket %p from proc thread and send it to worker #%u",a_esocket, a_worker->id); dap_events_socket_assign_on_worker_inter(l_es_assign_input, a_esocket); l_es_assign_input->flags |= DAP_SOCK_READY_TO_WRITE; s_update_poll_flags(a_thread, l_es_assign_input); return true; } /** * @brief dap_proc_thread_esocket_write_inter * @param a_thread * @param a_worker * @param a_esocket * @param a_data * @param a_data_size * @return */ int dap_proc_thread_esocket_write_inter(dap_proc_thread_t * a_thread,dap_worker_t * a_worker, dap_events_socket_t *a_esocket, const void * a_data, size_t a_data_size) { dap_events_socket_t * l_es_io_input = a_thread->queue_io_input[a_worker->id]; dap_events_socket_write_inter(l_es_io_input,a_esocket, a_data, a_data_size); l_es_io_input->flags |= DAP_SOCK_READY_TO_WRITE; s_update_poll_flags(a_thread, l_es_io_input); return 0; } /** * @brief dap_proc_thread_esocket_write_f_inter * @param a_thread * @param a_worker * @param a_esocket * @param a_format * @return */ int dap_proc_thread_esocket_write_f_inter(dap_proc_thread_t * a_thread,dap_worker_t * a_worker, dap_events_socket_t *a_esocket, const char * a_format,...) { va_list ap, ap_copy; va_start(ap,a_format); va_copy(ap_copy, ap); int l_data_size = dap_vsnprintf(NULL,0,a_format,ap); va_end(ap); if (l_data_size <0 ){ log_it(L_ERROR,"Can't write out formatted data '%s' with values",a_format); va_end(ap_copy); return 0; } dap_events_socket_t * l_es_io_input = a_thread->queue_io_input[a_worker->id]; char * l_data = DAP_NEW_SIZE(char,l_data_size+1); if (!l_data) return -1; l_data_size = dap_vsprintf(l_data,a_format,ap_copy); va_end(ap_copy); dap_events_socket_write_inter(l_es_io_input,a_esocket, l_data, l_data_size); l_es_io_input->flags |= DAP_SOCK_READY_TO_WRITE; s_update_poll_flags(a_thread, l_es_io_input); return 0; } /** * @brief dap_proc_thread_worker_exec_callback * @param a_thread * @param a_worker_id * @param a_callback * @param a_arg */ void dap_proc_thread_worker_exec_callback(dap_proc_thread_t * a_thread, size_t a_worker_id, dap_worker_callback_t a_callback, void * a_arg) { dap_worker_msg_callback_t * l_msg = DAP_NEW_Z(dap_worker_msg_callback_t); l_msg->callback = a_callback; l_msg->arg = a_arg; dap_events_socket_queue_ptr_send_to_input(a_thread->queue_callback_input[a_worker_id],l_msg ); a_thread->queue_callback_input[a_worker_id]->flags |= DAP_SOCK_READY_TO_WRITE; s_update_poll_flags(a_thread, a_thread->queue_callback_input[a_worker_id]); }