Skip to content
Snippets Groups Projects
Commit 64d58d03 authored by armatusmiles's avatar armatusmiles
Browse files

[+] Atomic variable for calculate count open connections

parent 642e2899
No related branches found
No related tags found
No related merge requests found
cmake_minimum_required(VERSION 3.0)
project (dap_core_server C)
set(CMAKE_C_STANDARD 11)
add_definitions ("-D_GNU_SOURCE")
add_subdirectory(libdap)
......
......@@ -41,6 +41,7 @@
#include <stddef.h>
#include <errno.h>
#include <signal.h>
#include <stdatomic.h>
#include "dap_common.h"
#include "dap_server.h"
......@@ -51,20 +52,20 @@
static void read_write_cb (struct ev_loop* loop, struct ev_io* watcher, int revents);
static struct ev_loop** listener_clients_loops;
static ev_async* async_watchers;
static ev_async* async_watchers;
static dap_server_t * _current_run_server;
static size_t _count_threads = 0;
typedef struct ev_async_data
{
int client_fd;
int number_thread;
dap_server_t *dap_server;
int thread_number;
} ev_async_data_t;
static struct thread_information {
int thread_number;
int count_open_connections;
atomic_size_t count_open_connections;
} *thread_inform;
static pthread_mutex_t mutex_set_client_thread_cb;
......@@ -72,12 +73,6 @@ static pthread_mutex_t mutex_on_cnt_connections;
#define DAP_EV_DATA(a) ((ev_async_data_t*)a->data)
// n - number thread, input opertor: ++ or --
#define MUTEX_CNT_CONN(n, operator) pthread_mutex_lock(&mutex_on_cnt_connections); \
thread_inform[n].count_open_connections operator; \
pthread_mutex_unlock(&mutex_on_cnt_connections);
/**
* @brief dap_server_init Init server module
* @return Zero if ok others if no
......@@ -95,7 +90,7 @@ int dap_server_init(size_t count_threads)
for(size_t i = 0; i < _count_threads; i++)
{
thread_inform[i].thread_number = (int)i;
thread_inform[i].count_open_connections = 0;
atomic_init(&thread_inform[i].count_open_connections, 0);
async_watchers[i].data = malloc(sizeof(ev_async_data_t));
}
......@@ -112,6 +107,9 @@ int dap_server_init(size_t count_threads)
*/
void dap_server_deinit()
{
for(size_t i = 0; i < _count_threads; i++)
free (async_watchers[i].data);
free(async_watchers);
free(listener_clients_loops);
free(thread_inform);
......@@ -159,7 +157,6 @@ static void set_client_thread_cb (EV_P_ ev_async *w, int revents)
pthread_mutex_lock(&mutex_set_client_thread_cb);
int fd = DAP_EV_DATA(w)->client_fd;
dap_server_t *sh = DAP_EV_DATA(w)->dap_server;
struct ev_io* w_client = (struct ev_io*) malloc (sizeof(struct ev_io));
......@@ -169,17 +166,16 @@ static void set_client_thread_cb (EV_P_ ev_async *w, int revents)
memcpy(w_client->data, w->data, sizeof(ev_async_data_t));
dap_client_create(sh, fd, w_client);
dap_client_create(_current_run_server, fd, w_client);
ev_io_start(listener_clients_loops[DAP_EV_DATA(w)->number_thread], w_client);
ev_io_start(listener_clients_loops[DAP_EV_DATA(w)->thread_number], w_client);
pthread_mutex_unlock(&mutex_set_client_thread_cb);
}
static void read_write_cb (struct ev_loop* loop, struct ev_io* watcher, int revents)
{
dap_server_t* sh = DAP_EV_DATA(watcher)->dap_server;
dap_server_client_t* dap_cur = dap_client_find(watcher->fd, sh);
dap_server_client_t* dap_cur = dap_client_find(watcher->fd, _current_run_server);
if ( revents & EV_READ )
{
......@@ -193,7 +189,7 @@ static void read_write_cb (struct ev_loop* loop, struct ev_io* watcher, int reve
if(bytes_read > 0)
{
dap_cur->buf_in_size += (size_t)bytes_read;
sh->client_read_callback(dap_cur,NULL);
_current_run_server->client_read_callback(dap_cur,NULL);
}
else if(bytes_read < 0)
{
......@@ -211,7 +207,7 @@ static void read_write_cb (struct ev_loop* loop, struct ev_io* watcher, int reve
if( ( (revents & EV_WRITE) || dap_cur->_ready_to_write ) &&
dap_cur->signal_close == false ) {
sh->client_write_callback(dap_cur, NULL); // Call callback to process write event
_current_run_server->client_write_callback(dap_cur, NULL); // Call callback to process write event
if(dap_cur->buf_out_size == 0)
{
......@@ -239,10 +235,10 @@ static void read_write_cb (struct ev_loop* loop, struct ev_io* watcher, int reve
{
log_it(L_INFO, "Close Socket %d", watcher->fd);
MUTEX_CNT_CONN(DAP_EV_DATA(watcher)->number_thread, --);
atomic_fetch_sub(&thread_inform[DAP_EV_DATA(watcher)->thread_number].count_open_connections, 1);
ev_io_stop(listener_clients_loops[DAP_EV_DATA(watcher)->number_thread], watcher);
dap_client_remove(dap_cur, sh);
ev_io_stop(listener_clients_loops[DAP_EV_DATA(watcher)->thread_number], watcher);
dap_client_remove(dap_cur, _current_run_server);
free(watcher);
return;
}
......@@ -257,8 +253,8 @@ static inline uint8_t get_thread_index_min_connections()
uint8_t min = 0;
for(uint8_t i = 1; i < _count_threads; i++)
{
if ( thread_inform[min].count_open_connections >
thread_inform[i].count_open_connections )
if (atomic_load(&thread_inform[min].count_open_connections) >
atomic_load(&thread_inform[i].count_open_connections))
{
min = i;
}
......@@ -268,10 +264,11 @@ static inline uint8_t get_thread_index_min_connections()
static inline void print_online()
{
// TODO ATOMIC VARIABLE
for(uint8_t i = 0; i < _count_threads; i++)
{
log_it(L_INFO, "Thread number: %d, count: %d",
thread_inform[i].thread_number, thread_inform[i].count_open_connections);
thread_inform[i].thread_number, atomic_load(&thread_inform[i].count_open_connections));
}
}
......@@ -286,18 +283,19 @@ static void accept_cb (struct ev_loop* loop, struct ev_io* watcher, int revents)
uint8_t indx_min = get_thread_index_min_connections();
ev_async_data_t *ev_data = async_watchers[indx_min].data;
ev_data->client_fd = client_fd;
ev_data->dap_server = watcher->data;
ev_data->number_thread = indx_min;
ev_data->thread_number = indx_min;
MUTEX_CNT_CONN(ev_data->number_thread, ++);
atomic_fetch_add(&thread_inform[ev_data->thread_number].count_open_connections, 1);
log_it(L_DEBUG, "Client send to thread %d", ev_data->number_thread);
if ( ev_async_pending(&async_watchers[ev_data->number_thread]) == false ) { //the event has not yet been processed (or even noted) by the event loop? (i.e. Is it serviced? If yes then proceed to)
// print_online();
log_it(L_DEBUG, "Client send to thread %d", ev_data->thread_number);
if ( ev_async_pending(&async_watchers[ev_data->thread_number]) == false ) { //the event has not yet been processed (or even noted) by the event loop? (i.e. Is it serviced? If yes then proceed to)
log_it(L_INFO, "ev_async_pending");
ev_async_send(listener_clients_loops[ev_data->number_thread], &async_watchers[ev_data->number_thread]); //Sends/signals/activates the given ev_async watcher, that is, feeds an EV_ASYNC event on the watcher into the event loop.
ev_async_send(listener_clients_loops[ev_data->thread_number], &async_watchers[ev_data->thread_number]); //Sends/signals/activates the given ev_async watcher, that is, feeds an EV_ASYNC event on the watcher into the event loop.
}
else
else {
log_it(L_ERROR, "Ev async error pending");
}
}
/**
......@@ -384,7 +382,6 @@ void* thread_loop(void * arg)
int dap_server_loop(dap_server_t * sh)
{
int thread_arg[_count_threads];
pthread_t thread_listener[_count_threads];
for(size_t i = 0; i < _count_threads; i++)
......@@ -397,6 +394,7 @@ int dap_server_loop(dap_server_t * sh)
pthread_create(&thread_listener[i], NULL, thread_loop, &thread_arg[i]);
}
_current_run_server = sh;
struct ev_loop * ev_main_loop = ev_default_loop(0);
struct ev_io w_accept; w_accept.data = sh;
ev_io_init(&w_accept, accept_cb, sh->socket_listener, EV_READ);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment