From 066261620816ee9cc48216a70f995183e4d0d61c Mon Sep 17 00:00:00 2001
From: armatusmiles <akurotych@gmail.com>
Date: Wed, 5 Sep 2018 17:41:24 +0300
Subject: [PATCH] [+] Callback worker in dap_traffic_track

---
 CMakeLists.txt                |  2 +-
 dap_traffic_track.c           | 99 +++++++++++++++++++++++------------
 dap_traffic_track.h           | 12 ++---
 libdap                        |  2 +-
 test/dap_traffic_track_test.c | 43 +++++++--------
 test/libdap-test              |  2 +-
 test/main.c                   |  1 +
 7 files changed, 93 insertions(+), 68 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index faf180f..6fe6d63 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -15,5 +15,5 @@ file(GLOB HEADERS *.h)
 
 add_library(${PROJECT_NAME} STATIC ${SOURCES} ${HEADERS})
 
-target_link_libraries(${PROJECT_NAME} ev dap_crypto)
+target_link_libraries(${PROJECT_NAME} pthread ev dap_crypto)
 target_include_directories(${PROJECT_NAME} INTERFACE .)
diff --git a/dap_traffic_track.c b/dap_traffic_track.c
index 6b3738b..ce7042e 100644
--- a/dap_traffic_track.c
+++ b/dap_traffic_track.c
@@ -1,19 +1,20 @@
 #include "dap_traffic_track.h"
 #include "dap_common.h"
 
+#include <pthread.h>
+
 #define LOG_TAG "dap_traffic_track"
 #define BITS_IN_BYTE 8
 #define ALLOC_STEP 100
 
-static dap_traffic_callback_t callback = NULL;
+static dap_traffic_callback_t _callback = NULL;
 static dap_server_t * _dap_server;
 static ev_timer _timeout_watcher;
 static struct ev_loop *loop;
-
-static struct callback_result {
-    dap_traffic_track_result_t * res;
-    size_t allocated_counter;
-} _callback_result;
+static pthread_mutex_t _mutex = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t _cond = PTHREAD_COND_INITIALIZER;
+static pthread_t worker_thread;
+static bool _stop_worker_signal = false;
 
 /**
  * @brief calculate_mbits_speed
@@ -29,17 +30,43 @@ static double _calculate_mbits_speed(size_t count_bytes)
     return bits_per_second / 1000000.0; // convert to mbits
 }
 
+void* _worker_run(void * a)
+{
+    (void)a;
+    pthread_mutex_lock(&_mutex);
+    while (true) {
+        pthread_cond_wait(&_cond, &_mutex);
+        if(_stop_worker_signal) {
+            log_it(L_INFO, "Dap traffic track worker stopped");
+            _stop_worker_signal = false;
+            break;
+        }
+        _callback(_dap_server);
+    }
+    pthread_mutex_unlock(&_mutex);
+    pthread_exit(NULL);
+}
+
+void _worker_start()
+{
+    pthread_mutex_init(&_mutex, NULL);
+    pthread_cond_init(&_cond, NULL);
+    pthread_create(&worker_thread, NULL, _worker_run, NULL);
+}
 
-static void _realloc_callback_result(size_t count_users)
+void _worker_stop()
 {
-    // rounding to multiple ALLOC_STEP
-    size_t new_size = (count_users + ALLOC_STEP) - (count_users % ALLOC_STEP);
+    pthread_mutex_lock(&_mutex);
+    _stop_worker_signal = true;
+    pthread_cond_signal(&_cond);
+    pthread_mutex_unlock(&_mutex);
 
-    _callback_result.res = (dap_traffic_track_result_t *) realloc
-            (_callback_result.res, new_size * sizeof(dap_traffic_track_result_t));
-    _callback_result.allocated_counter = new_size;
+    // wait for exit worker_thread
+    pthread_join(worker_thread, NULL);
 
-    log_it(L_DEBUG, "Reallocated memory for _callback_result to: %d", _callback_result.allocated_counter);
+    pthread_mutex_destroy(&_mutex);
+    pthread_cond_destroy(&_cond);
+    _callback = NULL;
 }
 
 static void _timeout_cb()
@@ -48,11 +75,6 @@ static void _timeout_cb()
 
     size_t count_users = HASH_COUNT(_dap_server->clients);
 
-    if(_callback_result.allocated_counter < count_users ||
-            _callback_result.allocated_counter - ALLOC_STEP > count_users) {
-        _realloc_callback_result(count_users);
-    }
-
     if(count_users) {
         size_t idx = 0;
         dap_server_client_t *dap_cur, *tmp;
@@ -60,37 +82,31 @@ static void _timeout_cb()
 
             dap_cur->upload_stat.speed_mbs =
                     _calculate_mbits_speed(dap_cur->upload_stat.buf_size_total -
-                                          dap_cur->upload_stat.buf_size_total_old);
+                                           dap_cur->upload_stat.buf_size_total_old);
             dap_cur->upload_stat.buf_size_total_old = dap_cur->upload_stat.buf_size_total;
 
             dap_cur->download_stat.speed_mbs =
                     _calculate_mbits_speed(dap_cur->download_stat.buf_size_total -
-                                          dap_cur->download_stat.buf_size_total_old);
+                                           dap_cur->download_stat.buf_size_total_old);
 
             dap_cur->download_stat.buf_size_total_old = dap_cur->download_stat.buf_size_total;
 
-            //        log_it(L_DEBUG, "upload_mbs: %f download_mbs: %f", dap_cur->upload_stat.speed_mbs,
-            //               dap_cur->download_stat.speed_mbs);
-            strcpy(_callback_result.res[idx].client_id, dap_cur->id);
-            _callback_result.res[idx].upload_speed_mbs = dap_cur->upload_stat.speed_mbs;
-            _callback_result.res[idx].download_speed_mbs = dap_cur->download_stat.speed_mbs;
             idx++;
         }
     }
 
     pthread_mutex_unlock(&_dap_server->mutex_on_hash);
 
-    if(callback != NULL) {
-        callback(_callback_result.res, count_users);
+    if(_callback != NULL) {
+        pthread_mutex_lock(&_mutex);
+        pthread_cond_signal(&_cond);
+        pthread_mutex_unlock(&_mutex);
     }
 }
 
 void dap_traffic_track_init(dap_server_t * server,
                             time_t timeout)
 {
-    _callback_result.allocated_counter = ALLOC_STEP;
-    _callback_result.res = calloc(_callback_result.allocated_counter, sizeof (dap_traffic_track_result_t));
-
     _dap_server = server;
     _timeout_watcher.repeat = timeout;
     loop = EV_DEFAULT;
@@ -101,14 +117,29 @@ void dap_traffic_track_init(dap_server_t * server,
 
 void dap_traffic_track_deinit()
 {
-    _callback_result.allocated_counter = 0;
-    free(_callback_result.res);
+    if(_callback != NULL)
+        _worker_stop();
+
     ev_timer_stop(loop, &_timeout_watcher);
     ev_loop_destroy(loop);
     log_it(L_NOTICE, "Deinitialized traffic track module");
 }
 
-void dap_traffic_set_callback(dap_traffic_callback_t cb)
+void dap_traffic_callback_stop() {
+    if(_callback == NULL) {
+        log_it(L_WARNING, "worker not running");
+        return;
+    }
+    _worker_stop();
+}
+
+void dap_traffic_callback_set(dap_traffic_callback_t cb)
 {
-    callback = cb;
+    if(_callback == NULL) {
+        _callback = cb;
+        _worker_start();
+        return;
+    }
+
+    log_it(L_WARNING, "Callback already setted");
 }
diff --git a/dap_traffic_track.h b/dap_traffic_track.h
index 90de22e..10516ba 100644
--- a/dap_traffic_track.h
+++ b/dap_traffic_track.h
@@ -3,13 +3,7 @@
 #include "dap_server_client.h"
 #include "dap_server.h"
 
-typedef struct dap_traffic_track_result {
-    dap_server_client_id client_id;
-    double download_speed_mbs;
-    double upload_speed_mbs;
-} dap_traffic_track_result_t;
-
-typedef void (*dap_traffic_callback_t) (dap_traffic_track_result_t[], size_t result_length); // Callback for specific server's operations
+typedef void (*dap_traffic_callback_t) (dap_server_t*);
 
 /**
  * @brief dap_traffic_track_init
@@ -27,5 +21,7 @@ void dap_traffic_track_deinit(void);
 /**
  * @brief dap_traffic_add_callback
  */
-void dap_traffic_set_callback(dap_traffic_callback_t);
+void dap_traffic_callback_set(dap_traffic_callback_t);
+
+void dap_traffic_callback_stop(void);
 #endif
diff --git a/libdap b/libdap
index 8da7f6c..3d50b60 160000
--- a/libdap
+++ b/libdap
@@ -1 +1 @@
-Subproject commit 8da7f6c68b0d0bb17190dc581091073f50f4296d
+Subproject commit 3d50b609ae8168570d20da3efeef7a475d3cd35f
diff --git a/test/dap_traffic_track_test.c b/test/dap_traffic_track_test.c
index dd40820..57859f6 100644
--- a/test/dap_traffic_track_test.c
+++ b/test/dap_traffic_track_test.c
@@ -1,4 +1,5 @@
 #include "dap_traffic_track_test.h"
+#include <unistd.h>
 #include <ev.h>
 #include <math.h>
 
@@ -10,33 +11,30 @@ static struct moc_dap_server_clients {
 } moc_dap_server_clients;
 static dap_server_t * _dap_server;
 
-static void success_callback(dap_traffic_track_result_t res[], size_t result_length) {
-    dap_assert(result_length == moc_dap_server_clients.count, "dap server amount clients");
+// false == test failed
+static bool is_callback_result_success = false;
 
-    for(size_t i = 0, j = 0; (i < result_length) && (j = i + 1); i++) {
-        dap_assert_PIF(rint(res[i].download_speed_mbs) == (j * 8), "Calculate download traffic speed");
-        dap_assert_PIF(rint(res[i].upload_speed_mbs) == (j * 8), "Calculate upload traffic speed");
-    }
-    ev_break (EV_A_ EVBREAK_ONE);
-}
-
-_Noreturn static void error_callback() {
-    dap_fail("Error callback call, success_callback has no been called");
+static void success_callback(dap_server_t* server)
+{
+    dap_pass_msg("call success_callback");
+    pthread_mutex_lock(&_dap_server->mutex_on_hash);
+    size_t cnt = HASH_COUNT(server->clients);
+    pthread_mutex_unlock(&_dap_server->mutex_on_hash);
+    dap_assert(cnt == moc_dap_server_clients.count, "dap server amount clients");
+    is_callback_result_success = true;
 }
 
 static void test_callback() {
     time_t timeout_sucess = 1;
-    loop = EV_DEFAULT;
-    /* timeout sucess_callback must be 1 for sucess calculating result*/
     dap_traffic_track_init(_dap_server, timeout_sucess);
-    dap_traffic_set_callback(success_callback);
+    dap_traffic_callback_set(success_callback);
+
+    loop = EV_DEFAULT;
+    ev_run(loop, EVRUN_ONCE);
 
-    /* Add error watcher*/
-    static ev_timer timeout_error_watcher;
-    ev_init(&timeout_error_watcher, error_callback);
-    ev_timer_init (&timeout_error_watcher, error_callback, timeout_sucess * 2, 0.);
-    ev_timer_start (loop, &timeout_error_watcher);
-    ev_run (loop, 0);
+    usleep(1000); // wait for callback
+    dap_assert(is_callback_result_success, "callback_result");
+    dap_traffic_callback_stop();
 }
 
 
@@ -46,9 +44,8 @@ void init_test_case() {
     moc_dap_server_clients.clients = calloc(moc_dap_server_clients.count,
                                             sizeof(dap_server_client_t *));
     for(size_t i = 0, j = 0; (i < moc_dap_server_clients.count) && (j = i + 1); i++) {
-        moc_dap_server_clients.clients[i] = dap_client_create(_dap_server, j, NULL);
-        moc_dap_server_clients.clients[i]->upload_stat.buf_size_total = j * 1000000;
-        moc_dap_server_clients.clients[i]->download_stat.buf_size_total = j * 1000000;
+        moc_dap_server_clients.clients[i] =
+                dap_client_create(_dap_server, j, NULL);
     }
 }
 
diff --git a/test/libdap-test b/test/libdap-test
index c6580b9..2a18a8d 160000
--- a/test/libdap-test
+++ b/test/libdap-test
@@ -1 +1 @@
-Subproject commit c6580b9c49f79e0cd6f8dff3465a91d7aefa0428
+Subproject commit 2a18a8da0117b1d5f2d951ad3f50ad4e8a8b2fdd
diff --git a/test/main.c b/test/main.c
index 502b770..bb9cdaf 100644
--- a/test/main.c
+++ b/test/main.c
@@ -5,4 +5,5 @@ int main(void) {
     // switch off debug info from library
     set_log_level(L_CRITICAL);
     dap_traffic_track_tests_run();
+    return 0;
 }
-- 
GitLab