From 642e28990274664936e87b1265832e3a94458fa0 Mon Sep 17 00:00:00 2001
From: armatusmiles <akurotych@gmail.com>
Date: Thu, 30 Aug 2018 16:55:24 +0300
Subject: [PATCH] [+] Multithread loop

---
 CMakeLists.txt |   2 +
 dap_server.c   | 198 +++++++++++++++++++++++++++++++++++--------------
 dap_server.h   |   3 +-
 3 files changed, 144 insertions(+), 59 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index a6dc514..97b0cb8 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1,6 +1,8 @@
 cmake_minimum_required(VERSION 3.0)
 project (dap_core_server C)
 
+add_definitions ("-D_GNU_SOURCE")
+
 add_subdirectory(libdap)
 
 if(BUILD_DAP_CORE_SERVER_TESTS)
diff --git a/dap_server.c b/dap_server.c
index 1b7084f..58c0c4a 100644
--- a/dap_server.c
+++ b/dap_server.c
@@ -26,20 +26,16 @@
 #include <sys/select.h>
 
 #define NI_NUMERICHOST	1	/* Don't try to look up hostname.  */
-#define NI_NUMERICSERV 2	/* Don't convert port number to name.  */
+#define NI_NUMERICSERV  2	/* Don't convert port number to name.  */
 #define NI_NOFQDN	    4	/* Only return nodename portion.  */
-#define NI_NAMEREQD	8	/* Don't return numeric addresses.  */
+#define NI_NAMEREQD	    8	/* Don't return numeric addresses.  */
 #define NI_DGRAM	    16	/* Look up UDP service rather than TCP.  */
 
-#include <sys/epoll.h>
-
 #include <netdb.h>
 #include <unistd.h>
 #include <fcntl.h>
 #include <string.h>
 #include <time.h>
-
-#include <stdio.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <stddef.h>
@@ -50,27 +46,61 @@
 #include "dap_server.h"
 #include <ev.h>
 
-#define LOG_TAG "dap_server"
+#define LOG_TAG "server"
+
+static void read_write_cb (struct ev_loop* loop, struct ev_io* watcher, int revents);
 
-static void read_write_cb (struct ev_loop* _loop, struct ev_io* watcher, int revents);
+static struct ev_loop** listener_clients_loops;
+static ev_async* async_watchers;
 
-static struct ev_loop* listener_clients_loop;
-static ev_async async_watcher;
+static size_t _count_threads = 0;
 
 typedef struct ev_async_data
 {
     int client_fd;
+    int number_thread;
     dap_server_t *dap_server;
 } ev_async_data_t;
 
+static struct thread_information {
+    int thread_number;
+    int count_open_connections;
+} *thread_inform;
+
+static pthread_mutex_t mutex_set_client_thread_cb;
+static pthread_mutex_t mutex_on_cnt_connections;
+
+#define DAP_EV_DATA(a) ((ev_async_data_t*)a->data)
+
+// n - number thread, input opertor: ++ or --
+#define MUTEX_CNT_CONN(n, operator) pthread_mutex_lock(&mutex_on_cnt_connections); \
+                                    thread_inform[n].count_open_connections operator; \
+                                    pthread_mutex_unlock(&mutex_on_cnt_connections);
+
+
 /**
  * @brief dap_server_init Init server module
  * @return Zero if ok others if no
  */
-int dap_server_init()
+int dap_server_init(size_t count_threads)
 {
+    _count_threads = count_threads;
+
     signal(SIGPIPE, SIG_IGN);
-    async_watcher.data = malloc(sizeof(ev_async_data_t));
+
+    async_watchers = malloc(sizeof(ev_async) * _count_threads);
+    listener_clients_loops = malloc(sizeof(struct ev_loop*) * _count_threads);
+    thread_inform = malloc (sizeof(struct thread_information) * _count_threads);
+
+    for(size_t i = 0; i < _count_threads; i++)
+    {
+        thread_inform[i].thread_number = (int)i;
+        thread_inform[i].count_open_connections = 0;
+        async_watchers[i].data = malloc(sizeof(ev_async_data_t));
+    }
+
+    pthread_mutex_init(&mutex_set_client_thread_cb, NULL);
+    pthread_mutex_init(&mutex_on_cnt_connections, NULL);
 
     log_it(L_NOTICE,"Initialized socket server module");
 
@@ -82,6 +112,9 @@ int dap_server_init()
  */
 void dap_server_deinit()
 {
+    free(async_watchers);
+    free(listener_clients_loops);
+    free(thread_inform);
 }
 
 
@@ -121,30 +154,36 @@ int set_nonblock_socket(int fd)
     return fcntl(fd, F_SETFL, flags);
 }
 
-
-static void async_cb (EV_P_ ev_async *w, int revents)
+static void set_client_thread_cb (EV_P_ ev_async *w, int revents)
 {
-    int fd = ((ev_async_data_t*)w->data)->client_fd;
-    dap_server_t *sh = ((ev_async_data_t*)w->data)->dap_server;
+    pthread_mutex_lock(&mutex_set_client_thread_cb);
+
+    int fd = DAP_EV_DATA(w)->client_fd;
+    dap_server_t *sh = DAP_EV_DATA(w)->dap_server;
 
     struct ev_io* w_client = (struct ev_io*) malloc (sizeof(struct ev_io));
 
     ev_io_init(w_client, read_write_cb, fd, EV_READ);
     ev_io_set(w_client, fd, EV_READ  | EV_WRITE);
-    w_client->data = ((ev_async_data_t*)w->data)->dap_server;
+    w_client->data = malloc(sizeof(ev_async_data_t));
+
+    memcpy(w_client->data, w->data, sizeof(ev_async_data_t));
 
     dap_client_create(sh, fd, w_client);
 
-    ev_io_start(listener_clients_loop, w_client);
+    ev_io_start(listener_clients_loops[DAP_EV_DATA(w)->number_thread], w_client);
+
+    pthread_mutex_unlock(&mutex_set_client_thread_cb);
 }
 
 static void read_write_cb (struct ev_loop* loop, struct ev_io* watcher, int revents)
 {
-    dap_server_t* sh = watcher->data;
+    dap_server_t* sh = DAP_EV_DATA(watcher)->dap_server;
     dap_server_client_t* dap_cur = dap_client_find(watcher->fd, sh);
 
     if ( revents & EV_READ )
     {
+    //    log_it(INFO, "socket read %d thread %d", watcher->fd, thread);
         if(dap_cur)
         {
             ssize_t bytes_read = recv(dap_cur->socket,
@@ -153,15 +192,14 @@ static void read_write_cb (struct ev_loop* loop, struct ev_io* watcher, int reve
                                   0);
             if(bytes_read > 0)
             {
-                dap_cur->upload_stat.buf_size_total += bytes_read;
-                dap_cur->buf_in_size += bytes_read;
-                sh->client_read_callback(dap_cur, NULL);
+                dap_cur->buf_in_size += (size_t)bytes_read;
+                sh->client_read_callback(dap_cur,NULL);
             }
             else if(bytes_read < 0)
             {
                 log_it(L_ERROR,"Bytes read Error %s",strerror(errno));
-                dap_cur->signal_close = true;
-
+                if ( strcmp(strerror(errno),"Resource temporarily unavailable") != 0 )
+                    dap_cur->signal_close = true;
             }
             else if (bytes_read == 0)
             {
@@ -170,7 +208,8 @@ static void read_write_cb (struct ev_loop* loop, struct ev_io* watcher, int reve
         }
     }
 
-    if( ( revents & EV_WRITE ) || dap_cur->_ready_to_write ) {
+    if( ( (revents & EV_WRITE) || dap_cur->_ready_to_write ) &&
+            dap_cur->signal_close == false ) {
 
         sh->client_write_callback(dap_cur, NULL); // Call callback to process write event
 
@@ -181,18 +220,17 @@ static void read_write_cb (struct ev_loop* loop, struct ev_io* watcher, int reve
         else
         {
             for(size_t total_sent = 0; total_sent < dap_cur->buf_out_size;) {
-                //log_it(L_DEBUG, "Output: %u from %u bytes are sent ", total_sent, dap_cur->buf_out_size);
+                //log_it(DEBUG, "Output: %u from %u bytes are sent ", total_sent, dap_cur->buf_out_size);
                 ssize_t bytes_sent = send(dap_cur->socket,
                                       dap_cur->buf_out + total_sent,
                                       dap_cur->buf_out_size - total_sent,
                                       MSG_DONTWAIT | MSG_NOSIGNAL );
                 if(bytes_sent < 0) {
-                    log_it(L_ERROR, "Some error occured in send() function");
+                    log_it(L_ERROR,"Some error occured in send() function");
                     break;
                 }
-                total_sent += bytes_sent;
+                total_sent += (size_t)bytes_sent;
             }
-            dap_cur->download_stat.buf_size_total += dap_cur->buf_out_size;
             dap_cur->buf_out_size = 0;
         }
     }
@@ -200,28 +238,66 @@ static void read_write_cb (struct ev_loop* loop, struct ev_io* watcher, int reve
     if(dap_cur->signal_close)
     {
         log_it(L_INFO, "Close Socket %d", watcher->fd);
+
+        MUTEX_CNT_CONN(DAP_EV_DATA(watcher)->number_thread, --);
+
+        ev_io_stop(listener_clients_loops[DAP_EV_DATA(watcher)->number_thread], watcher);
         dap_client_remove(dap_cur, sh);
-        ev_io_stop(listener_clients_loop, watcher);
         free(watcher);
         return;
     }
 }
 
+/**
+ * @brief get_thread_min_connections
+ * @return number thread which has minimum open connections
+ */
+static inline uint8_t get_thread_index_min_connections()
+{
+    uint8_t min = 0;
+    for(uint8_t i = 1; i < _count_threads; i++)
+    {
+        if ( thread_inform[min].count_open_connections >
+             thread_inform[i].count_open_connections )
+        {
+            min = i;
+        }
+    }
+    return min;
+}
+
+static inline void print_online()
+{
+    for(uint8_t i = 0; i < _count_threads; i++)
+    {
+        log_it(L_INFO, "Thread number: %d, count: %d",
+               thread_inform[i].thread_number, thread_inform[i].count_open_connections);
+    }
+}
+
 static void accept_cb (struct ev_loop* loop, struct ev_io* watcher, int revents)
 {
     int client_fd = accept(watcher->fd, 0, 0);
-    log_it(L_INFO, "Client accept socket %", client_fd);
+    log_it(L_INFO, "Client accept socket %d", client_fd);
     if( client_fd < 0 )
         log_it(L_ERROR, "error accept");
     set_nonblock_socket(client_fd);
 
-    ev_async_data_t *ev_data = async_watcher.data;
+    uint8_t indx_min = get_thread_index_min_connections();
+    ev_async_data_t *ev_data = async_watchers[indx_min].data;
     ev_data->client_fd = client_fd;
     ev_data->dap_server = watcher->data;
+    ev_data->number_thread = indx_min;
+
+    MUTEX_CNT_CONN(ev_data->number_thread, ++);
 
-    if ( ev_async_pending(&async_watcher) == false ) { //the event has not yet been processed (or even noted) by the event loop? (i.e. Is it serviced? If yes then proceed to)
-        ev_async_send(listener_clients_loop, &async_watcher); //Sends/signals/activates the given ev_async watcher, that is, feeds an EV_ASYNC event on the watcher into the event loop.
+    log_it(L_DEBUG, "Client send to thread %d", ev_data->number_thread);
+    if ( ev_async_pending(&async_watchers[ev_data->number_thread]) == false ) { //the event has not yet been processed (or even noted) by the event loop? (i.e. Is it serviced? If yes then proceed to)
+        log_it(L_INFO, "ev_async_pending");
+        ev_async_send(listener_clients_loops[ev_data->number_thread], &async_watchers[ev_data->number_thread]); //Sends/signals/activates the given ev_async watcher, that is, feeds an EV_ASYNC event on the watcher into the event loop.
     }
+    else
+        log_it(L_ERROR, "Ev async error pending");
 }
 
 /**
@@ -270,20 +346,33 @@ dap_server_t* dap_server_listen(const char * addr, uint16_t port, dap_server_typ
         dap_server_delete(sh);
         return NULL;
     }else {
-        log_it(L_INFO,"Binded %s:%u",addr,port);
-
+        log_it(L_INFO,"Binded %s:%u", addr, port);
         listen(sh->socket_listener, 100000);
         pthread_mutex_init(&sh->mutex_on_hash, NULL);
-
         return sh;
     }
 }
 
+/**
+ * @brief thread_loop
+ * @param arg
+ * @return
+ */
 void* thread_loop(void * arg)
 {
-    (void)arg;
-    log_it(L_NOTICE, "Start loop listener socket thread");
-    ev_loop(listener_clients_loop, 0);
+    log_it(L_NOTICE, "Start loop listener socket thread %d", *(int*)arg);
+
+    cpu_set_t mask;
+    CPU_ZERO(&mask);
+    CPU_SET(*(int*)arg, &mask);
+
+    if (pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &mask) != 0)
+    {
+        log_it(L_CRITICAL, "Error pthread_setaffinity_np() You really have %d or more core in CPU?", *(int*)arg);
+        abort();
+    }
+
+    ev_loop(listener_clients_loops[*(int*)arg], 0);
     return NULL;
 }
 
@@ -294,14 +383,19 @@ void* thread_loop(void * arg)
  */
 int dap_server_loop(dap_server_t * sh)
 {
-    pthread_t thread;
-    listener_clients_loop = ev_loop_new(0);
-    async_watcher.data = sh;
-    ev_async_init(&async_watcher, async_cb);
-    ev_async_start(listener_clients_loop, &async_watcher);
-    pthread_create(&thread, NULL, thread_loop, NULL);
+    int thread_arg[_count_threads];
+
+    pthread_t thread_listener[_count_threads];
 
-    sh->proc_thread.tid = pthread_self();
+    for(size_t i = 0; i < _count_threads; i++)
+    {
+        thread_arg[i] = (int)i;
+        listener_clients_loops[i] = ev_loop_new(0);
+        async_watchers[i].data = sh;
+        ev_async_init(&async_watchers[i], set_client_thread_cb);
+        ev_async_start(listener_clients_loops[i], &async_watchers[i]);
+        pthread_create(&thread_listener[i], NULL, thread_loop, &thread_arg[i]);
+    }
 
     struct ev_loop * ev_main_loop = ev_default_loop(0);
     struct ev_io w_accept; w_accept.data = sh;
@@ -311,13 +405,3 @@ int dap_server_loop(dap_server_t * sh)
 
     return 0;
 }
-
-
-/**
- * @brief dap_thread_wake_up
- * @param th
- */
-void dap_thread_wake_up(dap_thread_t * th)
-{
-   //pthread_kill(th->tid,SIGUSR1);
-}
diff --git a/dap_server.h b/dap_server.h
index 3ce0162..9f0a156 100644
--- a/dap_server.h
+++ b/dap_server.h
@@ -66,10 +66,9 @@ typedef struct dap_server{
 
 } dap_server_t;
 
-extern int dap_server_init(void); // Init server module
+extern int dap_server_init(size_t count_threads); // Init server module
 extern void dap_server_deinit(void); // Deinit server module
 
-extern void dap_thread_wake_up(dap_thread_t * th);
 extern dap_server_t* dap_server_listen(const char * addr, uint16_t port, dap_server_type_t type);
 
 extern int dap_server_loop(dap_server_t * sh);
-- 
GitLab