Skip to content
Snippets Groups Projects
dap_worker.c 32.93 KiB
/*
 * Authors:
 * Dmitriy A. Gearasimov <gerasimov.dmitriy@demlabs.net>
 * DeM Labs Ltd.   https://demlabs.net
 * Copyright  (c) 2017
 * All rights reserved.

 This file is part of DAP SDK the open source project

    DAP SDK is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    DAP SDK is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with any DAP SDK based project.  If not, see <http://www.gnu.org/licenses/>.
*/
#include <time.h>
#include <errno.h>
#include <unistd.h>
#if ! defined (_GNU_SOURCE)
#define _GNU_SOURCE         /* See feature_test_macros(7) */
#endif
#include <fcntl.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>

#include "dap_common.h"
#include "dap_math_ops.h"
#include "dap_worker.h"
#include "dap_events.h"

#define LOG_TAG "dap_worker"


static time_t s_connection_timeout = 20000;


static void s_socket_all_check_activity( void * a_arg);
static void s_queue_new_es_callback( dap_events_socket_t * a_es, void * a_arg);
static void s_queue_delete_es_callback( dap_events_socket_t * a_es, void * a_arg);
static void s_queue_es_reassign_callback( dap_events_socket_t * a_es, void * a_arg);
static void s_queue_callback_callback( dap_events_socket_t * a_es, void * a_arg);
static void s_queue_es_io_callback( dap_events_socket_t * a_es, void * a_arg);
static void s_event_exit_callback( dap_events_socket_t * a_es, uint64_t a_flags);

/**
 * @brief dap_worker_init
 * @param a_threads_count
 * @param conn_timeout
 * @return
 */
int dap_worker_init( size_t a_conn_timeout )
{
    if ( a_conn_timeout )
      s_connection_timeout = a_conn_timeout;
    return 0;
}

void dap_worker_deinit( )
{
}

/**
 * @brief dap_worker_thread
 * @param arg
 * @return
 */
void *dap_worker_thread(void *arg)
{
    dap_events_socket_t *l_cur;
    dap_worker_t *l_worker = (dap_worker_t *) arg;
    //time_t l_next_time_timeout_check = time( NULL) + s_connection_timeout / 2;
    uint32_t l_tn = l_worker->id;

    dap_cpu_assign_thread_on(l_worker->id);
    struct sched_param l_shed_params;
    l_shed_params.sched_priority = 0;
    pthread_setschedparam(pthread_self(),SCHED_FIFO ,&l_shed_params);

#ifdef DAP_EVENTS_CAPS_EPOLL
    struct epoll_event l_epoll_events[ DAP_EVENTS_SOCKET_MAX]= {{0}};
    log_it(L_INFO, "Worker #%d started with epoll fd %d and assigned to dedicated CPU unit", l_worker->id, l_worker->epoll_fd);
#elif defined(DAP_EVENTS_CAPS_POLL)
    l_worker->poll_count_max = DAP_EVENTS_SOCKET_MAX;
    l_worker->poll = DAP_NEW_Z_SIZE(struct pollfd,l_worker->poll_count_max*sizeof (struct pollfd));
    l_worker->poll_esocket = DAP_NEW_Z_SIZE(dap_events_socket_t*,l_worker->poll_count_max*sizeof (dap_events_socket_t*));
#else
#error "Unimplemented socket array for this platform"
#endif


    l_worker->queue_es_new = dap_events_socket_create_type_queue_ptr_unsafe( l_worker, s_queue_new_es_callback);
    l_worker->queue_es_delete = dap_events_socket_create_type_queue_ptr_unsafe( l_worker, s_queue_delete_es_callback);
    l_worker->queue_es_io = dap_events_socket_create_type_queue_ptr_unsafe( l_worker, s_queue_es_io_callback);
    l_worker->queue_es_reassign = dap_events_socket_create_type_queue_ptr_unsafe( l_worker, s_queue_es_reassign_callback );
    l_worker->queue_callback= dap_events_socket_create_type_queue_ptr_unsafe( l_worker, s_queue_callback_callback);
    l_worker->event_exit = dap_events_socket_create_type_event_unsafe(l_worker, s_event_exit_callback );
    l_worker->timer_check_activity = dap_timerfd_start_on_worker( l_worker,s_connection_timeout / 2,s_socket_all_check_activity,l_worker);


    pthread_cond_broadcast(&l_worker->started_cond);
    bool s_loop_is_active = true;
    while(s_loop_is_active) {
#ifdef DAP_EVENTS_CAPS_EPOLL
        int l_selected_sockets = epoll_wait(l_worker->epoll_fd, l_epoll_events, DAP_EVENTS_SOCKET_MAX, -1);
        size_t l_sockets_max = l_selected_sockets;
#elif defined(DAP_EVENTS_CAPS_POLL)
        int l_selected_sockets = poll(l_worker->poll, l_worker->poll_count, -1);
        size_t l_sockets_max = l_worker->poll_count;
#else
#error "Unimplemented poll wait analog for this platform"
#endif
        if(l_selected_sockets == -1) {
            if( errno == EINTR)
                continue;
            int l_errno = errno;
            char l_errbuf[128];
            strerror_r(l_errno, l_errbuf, sizeof (l_errbuf));
            log_it(L_ERROR, "Worker thread %d got errno:\"%s\" (%d)", l_worker->id, l_errbuf, l_errno);
            break;
        }

        time_t l_cur_time = time( NULL);
        for(size_t n = 0; n < l_sockets_max; n++) {

            bool l_flag_hup, l_flag_rdhup, l_flag_read, l_flag_write, l_flag_error;
#ifdef DAP_EVENTS_CAPS_EPOLL
            l_cur = (dap_events_socket_t *) l_epoll_events[n].data.ptr;
            l_flag_hup = l_epoll_events[n].events & EPOLLHUP;
            l_flag_rdhup = l_epoll_events[n].events & EPOLLHUP;
            l_flag_write = l_epoll_events[n].events & EPOLLOUT;
            l_flag_read = l_epoll_events[n].events & EPOLLIN;
            l_flag_error = l_epoll_events[n].events & EPOLLERR;
#elif defined ( DAP_EVENTS_CAPS_POLL)
            short l_cur_events =l_worker->poll[n].revents;
            if (!l_cur_events) // No events for this socket
                continue;
            l_flag_hup =  l_cur_events& POLLHUP;
            l_flag_rdhup = l_cur_events & POLLHUP;
            l_flag_write = l_cur_events & POLLOUT;
            l_flag_read = l_cur_events & POLLIN;
            l_flag_error = l_cur_events & POLLERR;
            l_cur = l_worker->poll_esocket[n];
            //log_it(L_DEBUG, "flags: returned events 0x%0X requested events 0x%0X",l_worker->poll[n].revents,l_worker->poll[n].events );
#else
#error "Unimplemented fetch esocket after poll"
#endif
            if(!l_cur) {
                log_it(L_ERROR, "dap_events_socket NULL");
                continue;
            }
            l_cur->last_time_active = l_cur_time;
//            log_it(L_DEBUG, "Worker=%d fd=%d", l_worker->id, l_cur->socket);

            int l_sock_err = 0, l_sock_err_size = sizeof(l_sock_err);
            //connection already closed (EPOLLHUP - shutdown has been made in both directions)
            if( l_flag_hup) { // && events[n].events & EPOLLERR) {
                switch (l_cur->type ){
                    case DESCRIPTOR_TYPE_SOCKET_LISTENING:
                    case DESCRIPTOR_TYPE_SOCKET:
                        getsockopt(l_cur->socket, SOL_SOCKET, SO_ERROR, (void *)&l_sock_err, (socklen_t *)&l_sock_err_size);
                        //if(!(events[n].events & EPOLLIN))
                        //cur->no_close = false;
                        if (l_sock_err) {
                            dap_events_socket_set_readable_unsafe(l_cur, false);
                            dap_events_socket_set_writable_unsafe(l_cur, false);
                            l_cur->buf_out_size = 0;
                            l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE;
                            log_it(L_INFO, "Socket shutdown (EPOLLHUP): %s", strerror(l_sock_err));
                        }
                    break;
                    default: log_it(L_WARNING, "Unimplemented EPOLLHUP for socket type %d", l_cur->type);
                }
            }

            if(l_flag_error) {
                switch (l_cur->type ){
                    case DESCRIPTOR_TYPE_SOCKET_LISTENING:
                    case DESCRIPTOR_TYPE_SOCKET:
                        getsockopt(l_cur->socket, SOL_SOCKET, SO_ERROR, (void *)&l_sock_err, (socklen_t *)&l_sock_err_size);
                        log_it(L_ERROR, "Socket error: %s", strerror(l_sock_err));
                    default: ;
                }
                dap_events_socket_set_readable_unsafe(l_cur, false);
                dap_events_socket_set_writable_unsafe(l_cur, false);
                l_cur->buf_out_size = 0;
                l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE;
                l_cur->callbacks.error_callback(l_cur, l_sock_err); // Call callback to process error event
            }

            if (l_flag_hup) {
                log_it(L_INFO, "Client socket disconnected");
                dap_events_socket_set_readable_unsafe(l_cur, false);
                dap_events_socket_set_writable_unsafe(l_cur, false);
                l_cur->buf_out_size = 0;
                l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE;

            }

            if(l_flag_read) {

                //log_it(L_DEBUG, "Comes connection with type %d", l_cur->type);
                if(l_cur->buf_in_size == sizeof(l_cur->buf_in)) {
                    log_it(L_WARNING, "Buffer is full when there is smth to read. Its dropped!");
                    l_cur->buf_in_size = 0;
                }

                int32_t l_bytes_read = 0;
                int l_errno=0;
                bool l_must_read_smth = false;
                switch (l_cur->type) {
                    case DESCRIPTOR_TYPE_PIPE:
                    case DESCRIPTOR_TYPE_FILE:
                        l_must_read_smth = true;
                        l_bytes_read = read(l_cur->socket, (char *) (l_cur->buf_in + l_cur->buf_in_size),
                                sizeof(l_cur->buf_in) - l_cur->buf_in_size);
                        l_errno = errno;
                    break;
                    case DESCRIPTOR_TYPE_SOCKET:
                        l_must_read_smth = true;
                        l_bytes_read = recv(l_cur->fd, (char *) (l_cur->buf_in + l_cur->buf_in_size),
                                            sizeof(l_cur->buf_in) - l_cur->buf_in_size, 0);
                        l_errno = errno;
                    break;
                    case DESCRIPTOR_TYPE_SOCKET_UDP: {
                        l_must_read_smth = true;
                        socklen_t l_size = sizeof(l_cur->remote_addr);
                        l_bytes_read = recvfrom(l_cur->fd, (char *) (l_cur->buf_in + l_cur->buf_in_size),
                                                sizeof(l_cur->buf_in) - l_cur->buf_in_size, 0,
                                                (struct sockaddr *)&l_cur->remote_addr, &l_size);

                        l_errno = errno;
                    }
                    break;
                    case DESCRIPTOR_TYPE_SOCKET_LISTENING:
                        // Accept connection
                        if ( l_cur->callbacks.accept_callback){
                            struct sockaddr l_remote_addr;
                            socklen_t l_remote_addr_size= sizeof (l_remote_addr);
                            int l_remote_socket= accept(l_cur->socket ,&l_remote_addr,&l_remote_addr_size);
                            fcntl( l_remote_socket, F_SETFL, O_NONBLOCK);

                            int l_errno = errno;
                            if ( l_remote_socket == -1 ){
                                if( l_errno == EAGAIN || l_errno == EWOULDBLOCK){// Everything is good, we'll receive ACCEPT on next poll
                                    continue;
                                }else{
                                    char l_errbuf[128];
                                    strerror_r(l_errno, l_errbuf, sizeof (l_errbuf));
                                    log_it(L_WARNING,"accept() on socket %d error:\"%s\"(%d)",l_cur->socket, l_errbuf,l_errno);
                                    break;
                                }
                            }

                            l_cur->callbacks.accept_callback(l_cur,l_remote_socket,&l_remote_addr);
                        }else
                            log_it(L_ERROR,"No accept_callback on listening socket");
                    break;
                    case DESCRIPTOR_TYPE_TIMER:{
                        uint64_t val;
                        /* if we not reading data from socket, he triggered again */
                        read( l_cur->fd, &val, 8);
                        if (l_cur->callbacks.timer_callback)
                            l_cur->callbacks.timer_callback(l_cur);
                        else
                            log_it(L_ERROR, "Socket %d with timer callback fired, but callback is NULL ", l_cur->socket);

                    } break;
                    case DESCRIPTOR_TYPE_QUEUE:
                        dap_events_socket_queue_proc_input_unsafe(l_cur);
                    break;
                    case DESCRIPTOR_TYPE_EVENT:
                        dap_events_socket_event_proc_input_unsafe(l_cur);
                    break;
                }

                if (l_must_read_smth){ // Socket/Descriptor read
                    if(l_bytes_read > 0) {
                        l_cur->buf_in_size += l_bytes_read;
                        //log_it(L_DEBUG, "Received %d bytes", l_bytes_read);
                        if(l_cur->callbacks.read_callback){
                            l_cur->callbacks.read_callback(l_cur, NULL); // Call callback to process read event. At the end of callback buf_in_size should be zero if everything was read well
                            if (l_cur->worker == NULL ){ // esocket was unassigned in callback, we don't need any ops with it now,
                                                         // continue to poll another esockets
                                continue;
                            }
                        }else{
                            log_it(L_WARNING, "We have incomming %u data but no read callback on socket %d, removing from read set",
                                   l_bytes_read, l_cur->socket);
                            dap_events_socket_set_readable_unsafe(l_cur,false);
                        }
                    }
                    else if(l_bytes_read < 0) {
                        if (l_errno != EAGAIN && l_errno != EWOULDBLOCK){ // Socket is blocked
                            log_it(L_ERROR, "Some error occured in recv() function: %s", strerror(errno));
                            dap_events_socket_set_readable_unsafe(l_cur, false);
                            l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE;
                            l_cur->buf_out_size = 0;
                        }
                    }
                    else if (! l_flag_rdhup || !l_flag_error) {
                        log_it(L_WARNING, "EPOLLIN triggered but nothing to read");
                        dap_events_socket_set_readable_unsafe(l_cur,false);
                    }
                }
            }

            // If its outgoing connection
            if ( l_flag_write &&  ! l_cur->server &&  l_cur->flags& DAP_SOCK_CONNECTING &&
                 (l_cur->type == DESCRIPTOR_TYPE_SOCKET || l_cur->type == DESCRIPTOR_TYPE_SOCKET_UDP )){
                int l_error = 0;
                socklen_t l_error_len = sizeof(l_error);
                char l_error_buf[128];
                l_error_buf[0]='\0';
                getsockopt(l_cur->socket, SOL_SOCKET, SO_ERROR, &l_error, &l_error_len);
                if (l_error){
                    strerror_r(l_error, l_error_buf, sizeof (l_error_buf));
                    log_it(L_ERROR,"Connecting error with %s: \"%s\" (code %d)", l_cur->remote_addr_str[0]? l_cur->remote_addr_str: "(NULL)"
                                                                                 ,
                           l_error_buf, l_error);
                    if ( l_cur->callbacks.error_callback )
                        l_cur->callbacks.error_callback(l_cur, l_error);
                }else if(l_error == EINPROGRESS) {
                    log_it(L_DEBUG, "Connecting with %s in progress...", l_cur->remote_addr_str[0]? l_cur->remote_addr_str: "(NULL)");
                }else{
                    log_it(L_NOTICE, "Connected with %s",l_cur->remote_addr_str[0]? l_cur->remote_addr_str: "(NULL)");
                    l_cur->flags ^= DAP_SOCK_CONNECTING;
                    if (l_cur->callbacks.connected_callback)
                        l_cur->callbacks.connected_callback(l_cur);
                }
            }

            // Socket is ready to write and not going to close
            if(   ( l_flag_write&&(l_cur->flags & DAP_SOCK_READY_TO_WRITE) ) ||
                 (    (l_cur->flags & DAP_SOCK_READY_TO_WRITE) && !(l_cur->flags & DAP_SOCK_SIGNAL_CLOSE) ) ) {
                //log_it(L_DEBUG, "Main loop output: %u bytes to send", l_cur->buf_out_size);
                if(l_cur->callbacks.write_callback)
                    l_cur->callbacks.write_callback(l_cur, NULL); // Call callback to process write event

                if ( l_cur->worker ){ // esocket wasn't unassigned in callback, we need some other ops with it
                    if(l_cur->flags & DAP_SOCK_READY_TO_WRITE) {

                        static const uint32_t buf_out_zero_count_max = 2;
                        //l_cur->buf_out[l_cur->buf_out_size] = 0;

                        if(!l_cur->buf_out_size) {

                            //log_it(L_WARNING, "Output: nothing to send. Why we are in write socket set?");
                            l_cur->buf_out_zero_count++;

                            if(l_cur->buf_out_zero_count > buf_out_zero_count_max) { // How many time buf_out on write event could be empty
                                //log_it(L_WARNING, "Output: nothing to send %u times, remove socket from the write set",
                                //        buf_out_zero_count_max);
                                dap_events_socket_set_writable_unsafe(l_cur, false);
                            }
                        }
                        else
                            l_cur->buf_out_zero_count = 0;
                    }
                    //for(total_sent = 0; total_sent < cur->buf_out_size;) { // If after callback there is smth to send - we do it
                    ssize_t l_bytes_sent =0;
                    int l_errno;
                    switch (l_cur->type){
                        case DESCRIPTOR_TYPE_SOCKET:
                            l_bytes_sent = send(l_cur->socket, (const char *)l_cur->buf_out,
                                                l_cur->buf_out_size, MSG_DONTWAIT | MSG_NOSIGNAL);
                            l_errno = errno;
                        break;
                        case DESCRIPTOR_TYPE_SOCKET_UDP:
                            l_bytes_sent = sendto(l_cur->socket, (const char *)l_cur->buf_out,
                                                  l_cur->buf_out_size, MSG_DONTWAIT | MSG_NOSIGNAL,
                                                  (struct sockaddr *)&l_cur->remote_addr, sizeof(l_cur->remote_addr));
                            l_errno = errno;
                        break;
                        case DESCRIPTOR_TYPE_PIPE:
                        case DESCRIPTOR_TYPE_FILE:
                            l_bytes_sent = write(l_cur->socket, (char *) (l_cur->buf_out + l_bytes_sent),
                                    l_cur->buf_out_size );
                            l_errno = errno;
                        break;
                        default:
                            log_it(L_WARNING, "Socket %d is not SOCKET, PIPE or FILE but has WRITE state on. Switching it off");
                            dap_events_socket_set_writable_unsafe(l_cur,false);
                    }

                    if(l_bytes_sent < 0) {
                        if (l_errno != EAGAIN && l_errno != EWOULDBLOCK ){ // If we have non-blocking socket
                            log_it(L_ERROR, "Some error occured in send(): %s", strerror(errno));
                            l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE;
                            l_cur->buf_out_size = 0;
                        }
                    }else{
                        //log_it(L_DEBUG, "Output: %u from %u bytes are sent ", l_bytes_sent,l_cur->buf_out_size);
                        if (l_bytes_sent) {
                            if ( l_bytes_sent <= (ssize_t) l_cur->buf_out_size ){
                                l_cur->buf_out_size -= l_bytes_sent;
                                if (l_cur->buf_out_size ) {
                                    memmove(l_cur->buf_out, &l_cur->buf_out[l_bytes_sent], l_cur->buf_out_size);
                                }
                            }else{
                                log_it(L_ERROR, "Wrong bytes sent, %zd more then was in buffer %zd",l_bytes_sent, l_cur->buf_out_size);
                                l_cur->buf_out_size = 0;
                            }
                        }
                    }
                }
            }
            if (l_cur->buf_out_size) {
                dap_events_socket_set_writable_unsafe(l_cur,true);
            }
            if((l_cur->flags & DAP_SOCK_SIGNAL_CLOSE) && !l_cur->no_close  && l_cur->buf_out_size == 0) {
                // protect against double deletion
                l_cur->kill_signal = true;
                //dap_events_socket_remove_and_delete(cur, true);
                log_it(L_INFO, "Got signal to close %s, sock %u [thread %u]", l_cur->hostaddr, l_cur->socket, l_tn);
            } else if (l_cur->buf_out_size ){
                log_it(L_INFO, "Got signal to close %s, sock %u [thread %u] but buffer is not empty(%zd)", l_cur->hostaddr, l_cur->socket, l_tn,
                       l_cur->buf_out_size);
            }

            if(l_cur->kill_signal && l_cur->buf_out_size == 0) {
                log_it(L_INFO, "Kill %u socket (processed).... [ thread %u ]", l_cur->socket, l_tn);
                dap_events_socket_remove_and_delete_unsafe( l_cur, false);
            }else if (l_cur->buf_out_size ){
                log_it(L_INFO, "Kill %u socket (processed).... [ thread %u ] but buffer is not empty(%zd)", l_cur->socket, l_tn,
                       l_cur->buf_out_size);
            }


            if( l_worker->signal_exit){
                log_it(L_NOTICE,"Worker :%u finished", l_worker->id);
                return NULL;
            }
        }
#ifdef DAP_EVENTS_CAPS_POLL
      /***********************************************************/
       /* If the compress_array flag was turned on, we need       */
       /* to squeeze together the array and decrement the number  */
       /* of file descriptors. We do not need to move back the    */
       /* events and revents fields because the events will always*/
       /* be POLLIN in this case, and revents is output.          */
       /***********************************************************/
       if ( l_worker->poll_compress){
           l_worker->poll_compress = false;
           for (size_t i = 0; i < l_worker->poll_count ; i++)  {
               if ( l_worker->poll[i].fd == -1){
                   for(size_t j = i; j < l_worker->poll_count-1; j++){
                       l_worker->poll[j].fd = l_worker->poll[j+1].fd;
                       l_worker->poll_esocket[j] = l_worker->poll_esocket[j+1];
                       l_worker->poll_esocket[j]->poll_index = j;
                   }
                   i--;
                   l_worker->poll_count--;
               }
           }
       }
#endif
    } // while
    log_it(L_NOTICE,"Exiting thread #%u", l_worker->id);
    return NULL;
}

/**
 * @brief s_new_es_callback
 * @param a_es
 * @param a_arg
 */
static void s_queue_new_es_callback( dap_events_socket_t * a_es, void * a_arg)
{
    dap_events_socket_t * l_es_new =(dap_events_socket_t *) a_arg;
    dap_worker_t * w = a_es->worker;
    //log_it(L_DEBUG, "Received event socket %p to add on worker", l_es_new);
    if(dap_events_socket_check_unsafe( w, l_es_new)){
        //log_it(L_ERROR, "Already assigned %d (%p), you're doing smth wrong", l_es_new->socket, l_es_new);
        // Socket already present in worker, it's OK
        return;
    }

    switch( l_es_new->type){

        case DESCRIPTOR_TYPE_SOCKET_UDP:
        case DESCRIPTOR_TYPE_SOCKET:
        case DESCRIPTOR_TYPE_SOCKET_LISTENING:{
            int l_cpu = w->id;
            setsockopt(l_es_new->socket , SOL_SOCKET, SO_INCOMING_CPU, &l_cpu, sizeof(l_cpu));
        }break;
        default: {}
    }

    l_es_new->worker = w;
    // We need to differ new and reassigned esockets. If its new - is_initialized is false
    if ( ! l_es_new->is_initalized ){
        if (l_es_new->callbacks.new_callback)
            l_es_new->callbacks.new_callback(l_es_new, NULL);
        l_es_new->is_initalized = true;
    }

    if (l_es_new->socket>0){
        int l_ret = dap_worker_add_events_socket_unsafe(l_es_new,w);
        if (  l_ret != 0 ){
            log_it(L_CRITICAL,"Can't add event socket's handler to worker i/o poll mechanism with error %d", errno);
        }else{
            // Add in global list
            // Add in worker
            l_es_new->me = l_es_new;
            HASH_ADD(hh_worker, w->esockets, me, sizeof(void *), l_es_new );
            w->event_sockets_count++;
            //log_it(L_DEBUG, "Added socket %d on worker %u", l_es_new->socket, w->id);
            if (l_es_new->callbacks.worker_assign_callback)
                l_es_new->callbacks.worker_assign_callback(l_es_new, w);

        }
    }else{
        log_it(L_ERROR, "Incorrect socket %d after new callback. Dropping this handler out", l_es_new->socket);
        dap_events_socket_remove_and_delete_unsafe( l_es_new, false );
    }
}

/**
 * @brief s_delete_es_callback
 * @param a_es
 * @param a_arg
 */
static void s_queue_delete_es_callback( dap_events_socket_t * a_es, void * a_arg)
{
    dap_events_socket_t * l_esocket = (dap_events_socket_t*) a_arg;
    if (dap_events_socket_check_unsafe(a_es->worker,l_esocket)){
        ((dap_events_socket_t*)a_arg)->kill_signal = true; // Send signal to socket to kill
    }else
        log_it(L_INFO, "While we were sending the delete() message, esocket %p has been disconnected", l_esocket);
}

/**
 * @brief s_reassign_es_callback
 * @param a_es
 * @param a_arg
 */
static void s_queue_es_reassign_callback( dap_events_socket_t * a_es, void * a_arg)
{
    dap_worker_msg_reassign_t * l_msg = (dap_worker_msg_reassign_t*) a_arg;
    dap_events_socket_t * l_es_reassign = l_msg->esocket;
    if (dap_events_socket_check_unsafe(a_es->worker,l_es_reassign)){
        if( l_es_reassign->was_reassigned && l_es_reassign->flags & DAP_SOCK_REASSIGN_ONCE) {
            log_it(L_INFO, "Reassgment request with DAP_SOCK_REASSIGN_ONCE allowed only once, declined reassigment from %u to %u",
                   l_es_reassign->worker->id, l_msg->worker_new->id);

        }else{
            dap_events_socket_reassign_between_workers_unsafe(l_es_reassign,l_msg->worker_new);
        }
    }else{
        log_it(L_INFO, "While we were sending the reassign message, esocket %p has been disconnected", l_msg->esocket);
    }
    DAP_DELETE(l_msg);
}

/**
 * @brief s_queue_callback
 * @param a_es
 * @param a_arg
 */
static void s_queue_callback_callback( dap_events_socket_t * a_es, void * a_arg)
{
    dap_worker_msg_callback_t * l_msg = (dap_worker_msg_callback_t *) a_arg;
    assert(l_msg);
    assert(l_msg->callback);
    l_msg->callback(a_es->worker, l_msg->arg);
    DAP_DELETE(l_msg);
}

/**
 * @brief s_event_exit_callback
 * @param a_es
 * @param a_flags
 */
static void s_event_exit_callback( dap_events_socket_t * a_es, uint64_t a_flags)
{
    (void) a_flags;
    a_es->worker->signal_exit = true;
    log_it(L_NOTICE, "Worker :%u signaled to exit", a_es->worker->id);
}

/**
 * @brief s_pipe_data_out_read_callback
 * @param a_es
 * @param a_arg
 */
static void s_queue_es_io_callback( dap_events_socket_t * a_es, void * a_arg)
{
    dap_worker_msg_io_t * l_msg = a_arg;

    // Check if it was removed from the list
    dap_events_socket_t *l_msg_es = NULL;
    HASH_FIND(hh_worker, a_es->worker->esockets, &l_msg->esocket , sizeof (void*), l_msg_es );
    if ( l_msg_es == NULL){
        log_it(L_INFO, "We got i/o message for client thats now not in list. Lost %u data", l_msg->data_size);
        DAP_DELETE(l_msg);
        return;
    }
    if (l_msg->flags_set & DAP_SOCK_CONNECTING)
        if (!  (l_msg_es->flags & DAP_SOCK_CONNECTING) ){
            l_msg_es->flags |= DAP_SOCK_CONNECTING;
            dap_events_socket_worker_poll_update_unsafe(l_msg_es);
        }

    if (l_msg->flags_set & DAP_SOCK_CONNECTING)
        if (!  (l_msg_es->flags & DAP_SOCK_CONNECTING) ){
            l_msg_es->flags ^= DAP_SOCK_CONNECTING;
            dap_events_socket_worker_poll_update_unsafe(l_msg_es);
        }

    if (l_msg->flags_set & DAP_SOCK_READY_TO_READ)
        dap_events_socket_set_readable_unsafe(l_msg_es, true);
    if (l_msg->flags_unset & DAP_SOCK_READY_TO_READ)
        dap_events_socket_set_readable_unsafe(l_msg_es, false);
    if (l_msg->flags_set & DAP_SOCK_READY_TO_WRITE)
        dap_events_socket_set_writable_unsafe(l_msg_es, true);
    if (l_msg->flags_unset & DAP_SOCK_READY_TO_WRITE)
        dap_events_socket_set_writable_unsafe(l_msg_es, false);
    if (l_msg->data_size && l_msg->data)
        dap_events_socket_write_unsafe(l_msg_es, l_msg->data,l_msg->data_size);
    DAP_DELETE(l_msg);
}
/**
 * @brief s_socket_all_check_activity
 * @param a_arg
 */
static void s_socket_all_check_activity( void * a_arg)
{
    dap_worker_t *l_worker = (dap_worker_t*) a_arg;
    assert(l_worker);
    dap_events_socket_t *l_es, *tmp;
    char l_curtimebuf[64];
    time_t l_curtime= time(NULL);
    ctime_r(&l_curtime, l_curtimebuf);
    //log_it(L_DEBUG,"Check sockets activity on worker #%u at %s", l_worker->id, l_curtimebuf);

    HASH_ITER(hh_worker, l_worker->esockets, l_es, tmp ) {
        if ( l_es->type == DESCRIPTOR_TYPE_SOCKET  ){
            if ( !l_es->kill_signal && l_curtime >=  (time_t)l_es->last_time_active + s_connection_timeout && !l_es->no_close ) {
                log_it( L_INFO, "Socket %u timeout, closing...", l_es->socket );
                if (l_es->callbacks.error_callback) {
                    l_es->callbacks.error_callback(l_es, ETIMEDOUT);
                }
                dap_events_socket_remove_and_delete_mt( l_worker, l_es);
            }
        }
    }
}

/**
 * @brief sap_worker_add_events_socket
 * @param a_events_socket
 * @param a_worker
 */
void dap_worker_add_events_socket(dap_events_socket_t * a_events_socket, dap_worker_t * a_worker)
{
    int l_ret = dap_events_socket_queue_ptr_send( a_worker->queue_es_new, a_events_socket );
    if(l_ret != 0 ){
        char l_errbuf[128];
        *l_errbuf = 0;
        strerror_r(l_ret,l_errbuf,sizeof (l_errbuf));
        log_it(L_ERROR, "Cant send pointer in queue: \"%s\"(code %d)", l_errbuf, l_ret);
    }
}

/**
 * @brief dap_worker_add_events_socket_unsafe
 * @param a_worker
 * @param a_esocket
 */
int dap_worker_add_events_socket_unsafe( dap_events_socket_t * a_esocket, dap_worker_t * a_worker )
{

#ifdef DAP_EVENTS_CAPS_EPOLL
        // Init events for EPOLL
        a_esocket->ev.events = a_esocket->ev_base_flags ;
        if(a_esocket->flags & DAP_SOCK_READY_TO_READ )
            a_esocket->ev.events |= EPOLLIN;
        if(a_esocket->flags & DAP_SOCK_READY_TO_WRITE )
            a_esocket->ev.events |= EPOLLOUT;
        a_esocket->ev.data.ptr = a_esocket;
        return epoll_ctl(a_worker->epoll_fd, EPOLL_CTL_ADD, a_esocket->socket, &a_esocket->ev);
#elif defined (DAP_EVENTS_CAPS_POLL)
    if (  a_worker->poll_count == a_worker->poll_count_max ){ // realloc
        a_worker->poll_count_max *= 2;
        log_it(L_WARNING, "Too many descriptors (%u), resizing array twice to %u", a_worker->poll_count, a_worker->poll_count_max);
        a_worker->poll =DAP_REALLOC(a_worker->poll, a_worker->poll_count_max * sizeof(*a_worker->poll));
        a_worker->poll_esocket =DAP_REALLOC(a_worker->poll_esocket, a_worker->poll_count_max * sizeof(*a_worker->poll_esocket));
    }
    a_worker->poll[a_worker->poll_count].fd = a_esocket->socket;
    a_esocket->poll_index = a_worker->poll_count;
    a_worker->poll[a_worker->poll_count].events = a_esocket->poll_base_flags;
    if( a_esocket->flags & DAP_SOCK_READY_TO_READ )
        a_worker->poll[a_worker->poll_count].events |= POLLIN;
    if( a_esocket->flags & DAP_SOCK_READY_TO_WRITE || a_esocket->flags & DAP_SOCK_CONNECTING )
        a_worker->poll[a_worker->poll_count].events |= POLLOUT;

    a_worker->poll_esocket[a_worker->poll_count] = a_esocket;
    a_worker->poll_count++;
    return 0;
#else
#error "Unimplemented new esocket on worker callback for current platform"
#endif

}

/**
 * @brief dap_worker_exec_callback_on
 */
void dap_worker_exec_callback_on(dap_worker_t * a_worker, dap_worker_callback_t a_callback, void * a_arg)
{
    dap_worker_msg_callback_t * l_msg = DAP_NEW_Z(dap_worker_msg_callback_t);
    l_msg->callback = a_callback;
    l_msg->arg = a_arg;
    int l_ret=dap_events_socket_queue_ptr_send( a_worker->queue_callback,l_msg );
    if(l_ret != 0 ){
        char l_errbuf[128];
        *l_errbuf = 0;
        strerror_r(l_ret,l_errbuf,sizeof (l_errbuf));
        log_it(L_ERROR, "Cant send pointer in queue: \"%s\"(code %d)", l_errbuf, l_ret);
    }

}


/**
 * @brief dap_worker_add_events_socket
 * @param a_worker
 * @param a_events_socket
 */
dap_worker_t *dap_worker_add_events_socket_auto( dap_events_socket_t *a_es)
{
//  struct epoll_event ev = {0};
  dap_worker_t *l_worker = dap_events_worker_get_auto( );

  a_es->events = l_worker->events;
  dap_worker_add_events_socket( a_es, l_worker);
  return l_worker;
}