Skip to content
Snippets Groups Projects
Commit 06626162 authored by armatusmiles's avatar armatusmiles
Browse files

[+] Callback worker in dap_traffic_track

parent 409d7cd0
No related branches found
No related tags found
No related merge requests found
...@@ -15,5 +15,5 @@ file(GLOB HEADERS *.h) ...@@ -15,5 +15,5 @@ file(GLOB HEADERS *.h)
add_library(${PROJECT_NAME} STATIC ${SOURCES} ${HEADERS}) add_library(${PROJECT_NAME} STATIC ${SOURCES} ${HEADERS})
target_link_libraries(${PROJECT_NAME} ev dap_crypto) target_link_libraries(${PROJECT_NAME} pthread ev dap_crypto)
target_include_directories(${PROJECT_NAME} INTERFACE .) target_include_directories(${PROJECT_NAME} INTERFACE .)
#include "dap_traffic_track.h" #include "dap_traffic_track.h"
#include "dap_common.h" #include "dap_common.h"
#include <pthread.h>
#define LOG_TAG "dap_traffic_track" #define LOG_TAG "dap_traffic_track"
#define BITS_IN_BYTE 8 #define BITS_IN_BYTE 8
#define ALLOC_STEP 100 #define ALLOC_STEP 100
static dap_traffic_callback_t callback = NULL; static dap_traffic_callback_t _callback = NULL;
static dap_server_t * _dap_server; static dap_server_t * _dap_server;
static ev_timer _timeout_watcher; static ev_timer _timeout_watcher;
static struct ev_loop *loop; static struct ev_loop *loop;
static pthread_mutex_t _mutex = PTHREAD_MUTEX_INITIALIZER;
static struct callback_result { static pthread_cond_t _cond = PTHREAD_COND_INITIALIZER;
dap_traffic_track_result_t * res; static pthread_t worker_thread;
size_t allocated_counter; static bool _stop_worker_signal = false;
} _callback_result;
/** /**
* @brief calculate_mbits_speed * @brief calculate_mbits_speed
...@@ -29,17 +30,43 @@ static double _calculate_mbits_speed(size_t count_bytes) ...@@ -29,17 +30,43 @@ static double _calculate_mbits_speed(size_t count_bytes)
return bits_per_second / 1000000.0; // convert to mbits return bits_per_second / 1000000.0; // convert to mbits
} }
void* _worker_run(void * a)
{
(void)a;
pthread_mutex_lock(&_mutex);
while (true) {
pthread_cond_wait(&_cond, &_mutex);
if(_stop_worker_signal) {
log_it(L_INFO, "Dap traffic track worker stopped");
_stop_worker_signal = false;
break;
}
_callback(_dap_server);
}
pthread_mutex_unlock(&_mutex);
pthread_exit(NULL);
}
void _worker_start()
{
pthread_mutex_init(&_mutex, NULL);
pthread_cond_init(&_cond, NULL);
pthread_create(&worker_thread, NULL, _worker_run, NULL);
}
static void _realloc_callback_result(size_t count_users) void _worker_stop()
{ {
// rounding to multiple ALLOC_STEP pthread_mutex_lock(&_mutex);
size_t new_size = (count_users + ALLOC_STEP) - (count_users % ALLOC_STEP); _stop_worker_signal = true;
pthread_cond_signal(&_cond);
pthread_mutex_unlock(&_mutex);
_callback_result.res = (dap_traffic_track_result_t *) realloc // wait for exit worker_thread
(_callback_result.res, new_size * sizeof(dap_traffic_track_result_t)); pthread_join(worker_thread, NULL);
_callback_result.allocated_counter = new_size;
log_it(L_DEBUG, "Reallocated memory for _callback_result to: %d", _callback_result.allocated_counter); pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
_callback = NULL;
} }
static void _timeout_cb() static void _timeout_cb()
...@@ -48,11 +75,6 @@ static void _timeout_cb() ...@@ -48,11 +75,6 @@ static void _timeout_cb()
size_t count_users = HASH_COUNT(_dap_server->clients); size_t count_users = HASH_COUNT(_dap_server->clients);
if(_callback_result.allocated_counter < count_users ||
_callback_result.allocated_counter - ALLOC_STEP > count_users) {
_realloc_callback_result(count_users);
}
if(count_users) { if(count_users) {
size_t idx = 0; size_t idx = 0;
dap_server_client_t *dap_cur, *tmp; dap_server_client_t *dap_cur, *tmp;
...@@ -60,37 +82,31 @@ static void _timeout_cb() ...@@ -60,37 +82,31 @@ static void _timeout_cb()
dap_cur->upload_stat.speed_mbs = dap_cur->upload_stat.speed_mbs =
_calculate_mbits_speed(dap_cur->upload_stat.buf_size_total - _calculate_mbits_speed(dap_cur->upload_stat.buf_size_total -
dap_cur->upload_stat.buf_size_total_old); dap_cur->upload_stat.buf_size_total_old);
dap_cur->upload_stat.buf_size_total_old = dap_cur->upload_stat.buf_size_total; dap_cur->upload_stat.buf_size_total_old = dap_cur->upload_stat.buf_size_total;
dap_cur->download_stat.speed_mbs = dap_cur->download_stat.speed_mbs =
_calculate_mbits_speed(dap_cur->download_stat.buf_size_total - _calculate_mbits_speed(dap_cur->download_stat.buf_size_total -
dap_cur->download_stat.buf_size_total_old); dap_cur->download_stat.buf_size_total_old);
dap_cur->download_stat.buf_size_total_old = dap_cur->download_stat.buf_size_total; dap_cur->download_stat.buf_size_total_old = dap_cur->download_stat.buf_size_total;
// log_it(L_DEBUG, "upload_mbs: %f download_mbs: %f", dap_cur->upload_stat.speed_mbs,
// dap_cur->download_stat.speed_mbs);
strcpy(_callback_result.res[idx].client_id, dap_cur->id);
_callback_result.res[idx].upload_speed_mbs = dap_cur->upload_stat.speed_mbs;
_callback_result.res[idx].download_speed_mbs = dap_cur->download_stat.speed_mbs;
idx++; idx++;
} }
} }
pthread_mutex_unlock(&_dap_server->mutex_on_hash); pthread_mutex_unlock(&_dap_server->mutex_on_hash);
if(callback != NULL) { if(_callback != NULL) {
callback(_callback_result.res, count_users); pthread_mutex_lock(&_mutex);
pthread_cond_signal(&_cond);
pthread_mutex_unlock(&_mutex);
} }
} }
void dap_traffic_track_init(dap_server_t * server, void dap_traffic_track_init(dap_server_t * server,
time_t timeout) time_t timeout)
{ {
_callback_result.allocated_counter = ALLOC_STEP;
_callback_result.res = calloc(_callback_result.allocated_counter, sizeof (dap_traffic_track_result_t));
_dap_server = server; _dap_server = server;
_timeout_watcher.repeat = timeout; _timeout_watcher.repeat = timeout;
loop = EV_DEFAULT; loop = EV_DEFAULT;
...@@ -101,14 +117,29 @@ void dap_traffic_track_init(dap_server_t * server, ...@@ -101,14 +117,29 @@ void dap_traffic_track_init(dap_server_t * server,
void dap_traffic_track_deinit() void dap_traffic_track_deinit()
{ {
_callback_result.allocated_counter = 0; if(_callback != NULL)
free(_callback_result.res); _worker_stop();
ev_timer_stop(loop, &_timeout_watcher); ev_timer_stop(loop, &_timeout_watcher);
ev_loop_destroy(loop); ev_loop_destroy(loop);
log_it(L_NOTICE, "Deinitialized traffic track module"); log_it(L_NOTICE, "Deinitialized traffic track module");
} }
void dap_traffic_set_callback(dap_traffic_callback_t cb) void dap_traffic_callback_stop() {
if(_callback == NULL) {
log_it(L_WARNING, "worker not running");
return;
}
_worker_stop();
}
void dap_traffic_callback_set(dap_traffic_callback_t cb)
{ {
callback = cb; if(_callback == NULL) {
_callback = cb;
_worker_start();
return;
}
log_it(L_WARNING, "Callback already setted");
} }
...@@ -3,13 +3,7 @@ ...@@ -3,13 +3,7 @@
#include "dap_server_client.h" #include "dap_server_client.h"
#include "dap_server.h" #include "dap_server.h"
typedef struct dap_traffic_track_result { typedef void (*dap_traffic_callback_t) (dap_server_t*);
dap_server_client_id client_id;
double download_speed_mbs;
double upload_speed_mbs;
} dap_traffic_track_result_t;
typedef void (*dap_traffic_callback_t) (dap_traffic_track_result_t[], size_t result_length); // Callback for specific server's operations
/** /**
* @brief dap_traffic_track_init * @brief dap_traffic_track_init
...@@ -27,5 +21,7 @@ void dap_traffic_track_deinit(void); ...@@ -27,5 +21,7 @@ void dap_traffic_track_deinit(void);
/** /**
* @brief dap_traffic_add_callback * @brief dap_traffic_add_callback
*/ */
void dap_traffic_set_callback(dap_traffic_callback_t); void dap_traffic_callback_set(dap_traffic_callback_t);
void dap_traffic_callback_stop(void);
#endif #endif
Subproject commit 8da7f6c68b0d0bb17190dc581091073f50f4296d Subproject commit 3d50b609ae8168570d20da3efeef7a475d3cd35f
#include "dap_traffic_track_test.h" #include "dap_traffic_track_test.h"
#include <unistd.h>
#include <ev.h> #include <ev.h>
#include <math.h> #include <math.h>
...@@ -10,33 +11,30 @@ static struct moc_dap_server_clients { ...@@ -10,33 +11,30 @@ static struct moc_dap_server_clients {
} moc_dap_server_clients; } moc_dap_server_clients;
static dap_server_t * _dap_server; static dap_server_t * _dap_server;
static void success_callback(dap_traffic_track_result_t res[], size_t result_length) { // false == test failed
dap_assert(result_length == moc_dap_server_clients.count, "dap server amount clients"); static bool is_callback_result_success = false;
for(size_t i = 0, j = 0; (i < result_length) && (j = i + 1); i++) { static void success_callback(dap_server_t* server)
dap_assert_PIF(rint(res[i].download_speed_mbs) == (j * 8), "Calculate download traffic speed"); {
dap_assert_PIF(rint(res[i].upload_speed_mbs) == (j * 8), "Calculate upload traffic speed"); dap_pass_msg("call success_callback");
} pthread_mutex_lock(&_dap_server->mutex_on_hash);
ev_break (EV_A_ EVBREAK_ONE); size_t cnt = HASH_COUNT(server->clients);
} pthread_mutex_unlock(&_dap_server->mutex_on_hash);
dap_assert(cnt == moc_dap_server_clients.count, "dap server amount clients");
_Noreturn static void error_callback() { is_callback_result_success = true;
dap_fail("Error callback call, success_callback has no been called");
} }
static void test_callback() { static void test_callback() {
time_t timeout_sucess = 1; time_t timeout_sucess = 1;
loop = EV_DEFAULT;
/* timeout sucess_callback must be 1 for sucess calculating result*/
dap_traffic_track_init(_dap_server, timeout_sucess); dap_traffic_track_init(_dap_server, timeout_sucess);
dap_traffic_set_callback(success_callback); dap_traffic_callback_set(success_callback);
loop = EV_DEFAULT;
ev_run(loop, EVRUN_ONCE);
/* Add error watcher*/ usleep(1000); // wait for callback
static ev_timer timeout_error_watcher; dap_assert(is_callback_result_success, "callback_result");
ev_init(&timeout_error_watcher, error_callback); dap_traffic_callback_stop();
ev_timer_init (&timeout_error_watcher, error_callback, timeout_sucess * 2, 0.);
ev_timer_start (loop, &timeout_error_watcher);
ev_run (loop, 0);
} }
...@@ -46,9 +44,8 @@ void init_test_case() { ...@@ -46,9 +44,8 @@ void init_test_case() {
moc_dap_server_clients.clients = calloc(moc_dap_server_clients.count, moc_dap_server_clients.clients = calloc(moc_dap_server_clients.count,
sizeof(dap_server_client_t *)); sizeof(dap_server_client_t *));
for(size_t i = 0, j = 0; (i < moc_dap_server_clients.count) && (j = i + 1); i++) { for(size_t i = 0, j = 0; (i < moc_dap_server_clients.count) && (j = i + 1); i++) {
moc_dap_server_clients.clients[i] = dap_client_create(_dap_server, j, NULL); moc_dap_server_clients.clients[i] =
moc_dap_server_clients.clients[i]->upload_stat.buf_size_total = j * 1000000; dap_client_create(_dap_server, j, NULL);
moc_dap_server_clients.clients[i]->download_stat.buf_size_total = j * 1000000;
} }
} }
......
Subproject commit c6580b9c49f79e0cd6f8dff3465a91d7aefa0428 Subproject commit 2a18a8da0117b1d5f2d951ad3f50ad4e8a8b2fdd
...@@ -5,4 +5,5 @@ int main(void) { ...@@ -5,4 +5,5 @@ int main(void) {
// switch off debug info from library // switch off debug info from library
set_log_level(L_CRITICAL); set_log_level(L_CRITICAL);
dap_traffic_track_tests_run(); dap_traffic_track_tests_run();
return 0;
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment