Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • cellframe/libdap-server-core
1 result
Show changes
Commits on Source (2)
......@@ -63,7 +63,7 @@ typedef struct dap_events {
typedef struct dap_worker
{
uint32_t event_sockets_count;
uint32_t event_to_kill_count;
//uint32_t event_to_kill_count;
EPOLL_HANDLE epoll_fd;
uint32_t number_thread;
......@@ -78,10 +78,6 @@ void dap_events_deinit( ); // Deinit server module
void dap_events_thread_wake_up( dap_thread_t *th );
dap_events_t* dap_events_new( );
void dap_events_delete( dap_events_t * sh );
//void dap_events_socket_remove_and_delete( dap_events_socket_t* a_es );
void dap_events_socket_remove_and_delete(dap_events_socket_t* a_es, bool preserve_inheritor );
void dap_events_kill_socket( dap_events_socket_t *a_es );
int32_t dap_events_start( dap_events_t *sh );
int32_t dap_events_wait( dap_events_t *sh );
......
......@@ -154,7 +154,12 @@ size_t dap_events_socket_write(dap_events_socket_t *sc, const void * data, size_
size_t dap_events_socket_write_f(dap_events_socket_t *sc, const char * format,...);
size_t dap_events_socket_read(dap_events_socket_t *sc, void * data, size_t data_size);
void dap_events_socket_remove( dap_events_socket_t *a_es);
void dap_events_socket_delete(dap_events_socket_t *sc,bool preserve_inheritor); // Removes the client from the list
void dap_events_socket_remove_and_delete(dap_events_socket_t* a_es, bool preserve_inheritor );
int dap_events_socket_kill_socket( dap_events_socket_t *a_es );
void dap_events_socket_shrink_buf_in(dap_events_socket_t * cl, size_t shrink_size);
......@@ -66,6 +66,7 @@
#include <sched.h>
#include "dap_common.h"
#include "dap_server.h"
#include "dap_events.h"
#define DAP_MAX_EPOLL_EVENTS 8192
......@@ -193,7 +194,7 @@ void dap_events_delete( dap_events_t *a_events )
if ( a_events ) {
HASH_ITER( hh, a_events->sockets,cur, tmp ) {
dap_events_socket_delete( cur, false );
dap_events_socket_delete( cur, true );
}
if ( a_events->_inheritor )
......@@ -206,33 +207,6 @@ void dap_events_delete( dap_events_t *a_events )
}
}
void dap_events_kill_socket( dap_events_socket_t *a_es )
{
if ( !a_es ) {
log_it( L_ERROR, "dap_events_kill_socket( NULL )" );
return;
}
uint32_t tn = a_es->dap_worker->number_thread;
dap_worker_t *w = a_es->dap_worker;
dap_events_t *d_ev = w->events;
pthread_mutex_lock( &w->locker_on_count );
if ( a_es->kill_signal ) {
pthread_mutex_unlock( &w->locker_on_count );
return;
}
log_it( L_DEBUG, "KILL %u socket! [ thread %u ]", a_es->socket, tn );
a_es->kill_signal = true;
DL_LIST_ADD_NODE_HEAD( d_ev->to_kill_sockets, a_es, kprev, knext, w->event_to_kill_count );
pthread_mutex_unlock( &w->locker_on_count );
}
/**
* @brief s_socket_info_all_check_activity
* @param n_thread
......@@ -256,7 +230,7 @@ static void s_socket_all_check_activity( dap_worker_t *dap_worker, dap_events_t
dap_worker->event_sockets_count --;
DL_DELETE( d_ev->dlsockets, a_es );
dap_events_socket_delete( a_es, false );
dap_events_socket_delete( a_es, true );
}
}
pthread_mutex_unlock( &dap_worker->locker_on_count );
......@@ -268,30 +242,30 @@ static void s_socket_all_check_activity( dap_worker_t *dap_worker, dap_events_t
* @param arg
* @return
*/
static void *thread_worker_function( void *arg )
static void *thread_worker_function(void *arg)
{
dap_events_socket_t *cur, *tmp;
dap_worker_t *w = (dap_worker_t *)arg;
time_t next_time_timeout_check = time( NULL ) + s_connection_timeout / 2;
uint32_t tn = w->number_thread;
dap_events_socket_t *cur;
dap_worker_t *w = (dap_worker_t *) arg;
time_t next_time_timeout_check = time( NULL) + s_connection_timeout / 2;
uint32_t tn = w->number_thread;
#ifndef _WIN32
#ifndef NO_POSIX_SHED
cpu_set_t mask;
CPU_ZERO( &mask );
CPU_SET( tn, &mask );
cpu_set_t mask;
CPU_ZERO(&mask);
CPU_SET(tn, &mask);
int err;
int err;
#ifndef __ANDROID__
err = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &mask);
err = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &mask);
#else
err = sched_setaffinity(pthread_self(), sizeof(cpu_set_t), &mask);
#endif
if (err)
{
log_it(L_CRITICAL, "Error pthread_setaffinity_np() You really have %d or more core in CPU?", *(int*)arg);
abort();
}
if(err)
{
log_it(L_CRITICAL, "Error pthread_setaffinity_np() You really have %d or more core in CPU?", *(int* )arg);
abort();
}
#endif
#else
......@@ -301,180 +275,185 @@ static void *thread_worker_function( void *arg )
}
#endif
log_it(L_INFO, "Worker %d started, epoll fd %d", w->number_thread, w->epoll_fd);
log_it(L_INFO, "Worker %d started, epoll fd %d", w->number_thread, w->epoll_fd);
struct epoll_event *events = &g_epoll_events[ DAP_MAX_EPOLL_EVENTS * tn ];
struct epoll_event *events = &g_epoll_events[ DAP_MAX_EPOLL_EVENTS * tn];
// memset( &ev, 0, sizeof(ev) );
// memset( &events, 0, sizeof(events) );
size_t total_sent; int bytes_sent;
size_t total_sent;
int bytes_sent;
while( 1 ) {
while(1) {
int selected_sockets = epoll_wait( w->epoll_fd, events, DAP_MAX_EPOLL_EVENTS, 1000 );
int selected_sockets = epoll_wait(w->epoll_fd, events, DAP_MAX_EPOLL_EVENTS, 1000);
if ( selected_sockets == -1 ) {
if ( errno == EINTR )
continue;
break;
}
time_t cur_time = time( NULL );
for( int32_t n = 0; n < selected_sockets; n ++ ) {
cur = (dap_events_socket_t *)events[n].data.ptr;
if ( !cur ) {
log_it( L_ERROR,"dap_events_socket NULL" );
continue;
}
//connection already closed (EPOLLHUP - shutdown has been made in both directions)
if(events[n].events & EPOLLHUP && events[n].events & EPOLLERR) {
log_it( L_DEBUG,"Socket error (EPOLLHUP): 0x%x",events[n].events );
if(!(events[n].events & EPOLLIN))
cur->no_close = false;
cur->flags |= DAP_SOCK_SIGNAL_CLOSE;
}
if ( events[n].events & EPOLLERR ) {
log_it( L_ERROR,"Socket error: %s",strerror(errno) );
cur->flags |= DAP_SOCK_SIGNAL_CLOSE;
cur->callbacks->error_callback( cur, NULL ); // Call callback to process error event
}
cur->last_time_active = cur_time;
if( events[n].events & EPOLLIN ) {
//log_it(DEBUG,"Comes connection in active read set");
if( cur->buf_in_size == sizeof(cur->buf_in) ) {
log_it( L_WARNING, "Buffer is full when there is smth to read. Its dropped!" );
cur->buf_in_size = 0;
}
int32_t bytes_read = recv( cur->socket, (char *)(cur->buf_in + cur->buf_in_size),
sizeof(cur->buf_in) - cur->buf_in_size, 0 );
if( bytes_read > 0 ) {
cur->buf_in_size += bytes_read;
//log_it(DEBUG, "Received %d bytes", bytes_read);
cur->callbacks->read_callback( cur, NULL ); // Call callback to process read event. At the end of callback buf_in_size should be zero if everything was read well
}
else if( bytes_read < 0 ) {
log_it( L_ERROR,"Some error occured in recv() function: %s",strerror(errno) );
dap_events_socket_set_readable( cur, false );
cur->flags |= DAP_SOCK_SIGNAL_CLOSE;
}
else if ( bytes_read == 0 ) {
log_it( L_INFO, "Client socket disconnected" );
dap_events_socket_set_readable( cur, false );
cur->flags |= DAP_SOCK_SIGNAL_CLOSE;
if(selected_sockets == -1) {
if( errno == EINTR)
continue;
break;
}
}
// Socket is ready to write
if( ( (events[n].events & EPOLLOUT) || (cur->flags & DAP_SOCK_READY_TO_WRITE) ) && !(cur->flags & DAP_SOCK_SIGNAL_CLOSE) ) {
///log_it(DEBUG, "Main loop output: %u bytes to send",sa_cur->buf_out_size);
cur->callbacks->write_callback( cur, NULL ); // Call callback to process write event
if( cur->flags & DAP_SOCK_READY_TO_WRITE ) {
time_t cur_time = time( NULL);
for(int32_t n = 0; n < selected_sockets; n++) {
static const uint32_t buf_out_zero_count_max = 20;
cur->buf_out[cur->buf_out_size] = 0;
cur = (dap_events_socket_t *) events[n].data.ptr;
if( !cur->buf_out_size ) {
if(!cur) {
log_it(L_WARNING, "Output: nothing to send. Why we are in write socket set?");
cur->buf_out_zero_count ++;
if( cur->buf_out_zero_count > buf_out_zero_count_max ) { // How many time buf_out on write event could be empty
log_it( L_ERROR, "Output: nothing to send %u times, remove socket from the write set", buf_out_zero_count_max );
dap_events_socket_set_writable( cur, false );
log_it(L_ERROR, "dap_events_socket NULL");
continue;
}
log_it(L_DEBUG, "Worker=%d fd=%d socket=%d event=0x%x(%d)", w->number_thread, w->epoll_fd,cur->socket, events[n].events,events[n].events);
//connection already closed (EPOLLHUP - shutdown has been made in both directions)
if(events[n].events & EPOLLHUP) { // && events[n].events & EPOLLERR) {
log_it(L_DEBUG, "Socket shutdown (EPOLLHUP): %s", strerror(errno));
//if(!(events[n].events & EPOLLIN))
//cur->no_close = false;
cur->flags |= DAP_SOCK_SIGNAL_CLOSE;
if(!(events[n].events & EPOLLERR))
cur->callbacks->error_callback(cur, NULL); // Call callback to process error event
}
}
else
cur->buf_out_zero_count = 0;
}
for ( total_sent = 0; total_sent < cur->buf_out_size; ) { // If after callback there is smth to send - we do it
bytes_sent = send( cur->socket, (char *)(cur->buf_out + total_sent),
cur->buf_out_size - total_sent, MSG_DONTWAIT | MSG_NOSIGNAL );
if ( bytes_sent < 0 ) {
log_it(L_ERROR,"Some error occured in send() function");
cur->flags |= DAP_SOCK_SIGNAL_CLOSE;
break;
}
total_sent += bytes_sent;
//log_it(L_DEBUG, "Output: %u from %u bytes are sent ", total_sent,sa_cur->buf_out_size);
}
//log_it(L_DEBUG,"Output: sent %u bytes",total_sent);
cur->buf_out_size = 0;
}
if ( (cur->flags & DAP_SOCK_SIGNAL_CLOSE) && !cur->no_close ) {
pthread_mutex_lock( &w->locker_on_count );
if ( cur->kill_signal ) {
pthread_mutex_unlock( &w->locker_on_count );
continue;
}
// pthread_mutex_unlock( &dsth->mutex_dlist_add_remove );
// dap_server_kill_socket( dap_cur );
// continue;
if(events[n].events & EPOLLERR) {
log_it(L_ERROR, "Socket error: %s", strerror(errno));
cur->flags |= DAP_SOCK_SIGNAL_CLOSE;
cur->callbacks->error_callback(cur, NULL); // Call callback to process error event
}
log_it( L_INFO, "Got signal to close %s, sock %u [thread %u]", cur->hostaddr, cur->socket, tn );
cur->last_time_active = cur_time;
if(events[n].events & EPOLLIN) {
//log_it(DEBUG,"Comes connection in active read set");
if(cur->buf_in_size == sizeof(cur->buf_in)) {
log_it(L_WARNING, "Buffer is full when there is smth to read. Its dropped!");
cur->buf_in_size = 0;
}
int32_t bytes_read = recv(cur->socket, (char *) (cur->buf_in + cur->buf_in_size),
sizeof(cur->buf_in) - cur->buf_in_size, 0);
if(bytes_read > 0) {
cur->buf_in_size += bytes_read;
//log_it(DEBUG, "Received %d bytes", bytes_read);
cur->callbacks->read_callback(cur, NULL); // Call callback to process read event. At the end of callback buf_in_size should be zero if everything was read well
}
else if(bytes_read < 0) {
log_it(L_ERROR, "Some error occured in recv() function: %s", strerror(errno));
dap_events_socket_set_readable(cur, false);
cur->flags |= DAP_SOCK_SIGNAL_CLOSE;
}
else if(bytes_read == 0) {
log_it(L_INFO, "Client socket disconnected");
dap_events_socket_set_readable(cur, false);
cur->flags |= DAP_SOCK_SIGNAL_CLOSE;
}
}
dap_events_socket_remove_and_delete( cur, false );
pthread_mutex_unlock( &w->locker_on_count );
}
} // for
// Socket is ready to write
if(((events[n].events & EPOLLOUT) || (cur->flags & DAP_SOCK_READY_TO_WRITE))
&& !(cur->flags & DAP_SOCK_SIGNAL_CLOSE)) {
///log_it(DEBUG, "Main loop output: %u bytes to send",sa_cur->buf_out_size);
cur->callbacks->write_callback(cur, NULL); // Call callback to process write event
if(cur->flags & DAP_SOCK_READY_TO_WRITE) {
static const uint32_t buf_out_zero_count_max = 20;
cur->buf_out[cur->buf_out_size] = 0;
if(!cur->buf_out_size) {
log_it(L_WARNING, "Output: nothing to send. Why we are in write socket set?");
cur->buf_out_zero_count++;
if(cur->buf_out_zero_count > buf_out_zero_count_max) { // How many time buf_out on write event could be empty
log_it(L_ERROR, "Output: nothing to send %u times, remove socket from the write set",
buf_out_zero_count_max);
dap_events_socket_set_writable(cur, false);
}
}
else
cur->buf_out_zero_count = 0;
}
for(total_sent = 0; total_sent < cur->buf_out_size;) { // If after callback there is smth to send - we do it
bytes_sent = send(cur->socket, (char *) (cur->buf_out + total_sent),
cur->buf_out_size - total_sent, MSG_DONTWAIT | MSG_NOSIGNAL);
if(bytes_sent < 0) {
log_it(L_ERROR, "Some error occured in send() function");
cur->flags |= DAP_SOCK_SIGNAL_CLOSE;
break;
}
total_sent += bytes_sent;
//log_it(L_DEBUG, "Output: %u from %u bytes are sent ", total_sent,sa_cur->buf_out_size);
}
//log_it(L_DEBUG,"Output: sent %u bytes",total_sent);
cur->buf_out_size = 0;
}
#ifndef NO_TIMER
if ( cur_time >= next_time_timeout_check ) {
pthread_mutex_lock(&w->locker_on_count);
s_socket_all_check_activity( w, w->events, cur_time );
next_time_timeout_check = cur_time + s_connection_timeout / 2;
}
#endif
if((cur->flags & DAP_SOCK_SIGNAL_CLOSE) && !cur->no_close) {
// protect against double deletion
cur->kill_signal = true;
//dap_events_socket_remove_and_delete(cur, true);
log_it(L_INFO, "Got signal to close %s, sock %u [thread %u]", cur->hostaddr, cur->socket, tn);
}
pthread_mutex_lock( &w->locker_on_count );
if ( !w->event_to_kill_count ) {
if(cur->kill_signal) {
log_it(L_INFO, "Kill %u socket (processed).... [ thread %u ]", cur->socket, tn);
dap_events_socket_remove(cur);
pthread_mutex_unlock(&w->locker_on_count);
dap_events_socket_delete( cur, true);
}
else
pthread_mutex_unlock(&w->locker_on_count);
pthread_mutex_unlock( &w->locker_on_count );
continue;
}
/*
if(!w->event_to_kill_count) {
cur = w->events->to_kill_sockets;
pthread_mutex_unlock(&w->locker_on_count);
continue;
do {
do {
// if ( cur->no_close ) {
// cur = cur->knext;
// continue;
// }
tmp = cur_del->knext;
// delete only current events_socket because others may be active in the other workers
//if(cur_del == cur)
if(cur->kill_signal) {
log_it(L_INFO, "Kill %u socket (processed).... [ thread %u ]", cur_del->socket, tn);
DL_LIST_REMOVE_NODE(w->events->to_kill_sockets, cur, kprev, knext, w->event_to_kill_count);
dap_events_socket_remove_and_delete(cur_del, true);
}
cur_del = tmp;
} while(cur_del);
log_it(L_INFO, "[ Thread %u ] coneections: %u, to kill: %u", tn, w->event_sockets_count,
w->event_to_kill_count);
pthread_mutex_unlock(&w->locker_on_count);
*/
} // for
#ifndef NO_TIMER
if(cur_time >= next_time_timeout_check) {
s_socket_all_check_activity(w, w->events, cur_time);
next_time_timeout_check = cur_time + s_connection_timeout / 2;
}
#endif
log_it( L_INFO, "Kill %u socket .... [ thread %u ]", cur->socket, tn );
tmp = cur->knext;
DL_LIST_REMOVE_NODE( w->events->to_kill_sockets, cur, kprev, knext, w->event_to_kill_count );
dap_events_socket_remove_and_delete( cur, false );
cur = tmp;
} while ( cur );
log_it( L_INFO, "[ Thread %u ] coneections: %u, to kill: %u", tn, w->event_sockets_count, w->event_to_kill_count );
pthread_mutex_unlock( &w->locker_on_count );
} // while
} // while
return NULL;
return NULL;
}
/**
......@@ -533,7 +512,7 @@ int dap_events_start( dap_events_t *a_events )
return -1;
}
s_workers[i].event_to_kill_count = 0;
//s_workers[i].event_to_kill_count = 0;
s_workers[i].event_sockets_count = 0;
s_workers[i].number_thread = i;
s_workers[i].events = a_events;
......@@ -562,8 +541,6 @@ int dap_events_wait( dap_events_t *sh )
return 0;
}
/**
* @brief dap_worker_add_events_socket
* @param a_worker
......@@ -578,23 +555,6 @@ void dap_worker_add_events_socket( dap_events_socket_t *a_es)
a_es->events = a_es->dap_worker->events;
}
/**
* @brief dap_events_socket_delete
* @param a_es
*/
void dap_events_socket_remove_and_delete( dap_events_socket_t *a_es, bool preserve_inheritor )
{
if ( epoll_ctl( a_es->dap_worker->epoll_fd, EPOLL_CTL_DEL, a_es->socket, &a_es->ev) == -1 )
log_it( L_ERROR,"Can't remove event socket's handler from the epoll_fd" );
else
log_it( L_DEBUG,"Removed epoll's event from dap_worker #%u", a_es->dap_worker->number_thread );
DL_DELETE( a_es->events->dlsockets, a_es );
a_es->dap_worker->event_sockets_count --;
dap_events_socket_delete( a_es, preserve_inheritor );
}
/**
* @brief dap_events__thread_wake_up
* @param th
......
......@@ -173,9 +173,11 @@ dap_events_socket_t * dap_events_socket_wrap2( dap_server_t *a_server, struct da
dap_events_socket_t *dap_events_socket_find( int sock, struct dap_events *a_events )
{
dap_events_socket_t *ret = NULL;
if(!a_events)
return NULL;
pthread_rwlock_rdlock( &a_events->sockets_rwlock );
HASH_FIND_INT( a_events->sockets, &sock, ret );
if(a_events->sockets)
HASH_FIND_INT( a_events->sockets, &sock, ret );
pthread_rwlock_unlock( &a_events->sockets_rwlock );
return ret;
......@@ -246,6 +248,37 @@ void dap_events_socket_set_writable( dap_events_socket_t *sc, bool is_ready )
}
/**
* @brief dap_events_socket_kill_socket
* @param sc Connection instance
*/
int dap_events_socket_kill_socket( dap_events_socket_t *a_es )
{
if ( !a_es ) {
log_it( L_ERROR, "dap_events_socket_kill_socket( NULL )" );
return -1;
}
uint32_t tn = a_es->dap_worker->number_thread;
dap_worker_t *w = a_es->dap_worker;
//dap_events_t *d_ev = w->events;
pthread_mutex_lock( &w->locker_on_count );
if ( a_es->kill_signal ) {
pthread_mutex_unlock( &w->locker_on_count );
return 0;
}
log_it( L_DEBUG, "KILL %u socket! (in queue) [ thread %u ]", a_es->socket, tn );
a_es->kill_signal = true;
//DL_LIST_ADD_NODE_HEAD( d_ev->to_kill_sockets, a_es, kprev, knext, w->event_to_kill_count );
pthread_mutex_unlock( &w->locker_on_count );
return 0;
}
/**
* @brief dap_events_socket_remove Removes the client from the list
* @param sc Connection instance
......@@ -256,8 +289,13 @@ void dap_events_socket_delete( dap_events_socket_t *a_es, bool preserve_inherito
log_it( L_DEBUG, "es is going to be removed from the lists and free the memory (0x%016X)", a_es );
if(!dap_events_socket_find(a_es->socket, a_es->events)){
log_it( L_ERROR, "dap_events_socket 0x%x already deleted", a_es);
return ;
}
pthread_rwlock_wrlock( &a_es->events->sockets_rwlock );
HASH_DEL( a_es->events->sockets, a_es );
if(a_es->events->sockets)
HASH_DEL( a_es->events->sockets, a_es );
pthread_rwlock_unlock( &a_es->events->sockets_rwlock );
log_it( L_DEBUG, "dap_events_socket wrapped around %d socket is removed", a_es->socket );
......@@ -266,7 +304,7 @@ void dap_events_socket_delete( dap_events_socket_t *a_es, bool preserve_inherito
a_es->callbacks->delete_callback( a_es, NULL ); // Init internal structure
if ( a_es->_inheritor && !preserve_inheritor )
free( a_es->_inheritor );
DAP_DELETE( a_es->_inheritor );
if ( a_es->socket ) {
#ifdef _WIN32
......@@ -279,6 +317,34 @@ void dap_events_socket_delete( dap_events_socket_t *a_es, bool preserve_inherito
free( a_es );
}
/**
* @brief dap_events_socket_delete
* @param a_es
*/
void dap_events_socket_remove( dap_events_socket_t *a_es)
{
if ( epoll_ctl( a_es->dap_worker->epoll_fd, EPOLL_CTL_DEL, a_es->socket, &a_es->ev) == -1 )
log_it( L_ERROR,"Can't remove event socket's handler from the epoll_fd" );
else
log_it( L_DEBUG,"Removed epoll's event from dap_worker #%u", a_es->dap_worker->number_thread );
DL_DELETE( a_es->events->dlsockets, a_es );
a_es->dap_worker->event_sockets_count --;
}
void dap_events_socket_remove_and_delete( dap_events_socket_t *a_es, bool preserve_inheritor )
{
if ( epoll_ctl( a_es->dap_worker->epoll_fd, EPOLL_CTL_DEL, a_es->socket, &a_es->ev) == -1 )
log_it( L_ERROR,"Can't remove event socket's handler from the epoll_fd" );
else
log_it( L_DEBUG,"Removed epoll's event from dap_worker #%u", a_es->dap_worker->number_thread );
DL_DELETE( a_es->events->dlsockets, a_es );
a_es->dap_worker->event_sockets_count --;
dap_events_socket_delete( a_es, preserve_inheritor );
}
/**
* @brief dap_events_socket_write Write data to the client
* @param sc Conn instance
......