diff --git a/.travis.yml b/.travis.yml index efab93726296244c6b8851580d49e7a011a94bbb..60cd883f0cfd8d703f12144422c57b5141324c37 100755 --- a/.travis.yml +++ b/.travis.yml @@ -11,7 +11,7 @@ before_install: script: - mkdir build - cd build - - cmake -DBUILD_DAP_CORE_SERVER_TESTS=ON ../ + - cmake . - make - ctest --verbose diff --git a/dap_client_remote.c b/dap_client_remote.c index 6eb8ce433102bff5dc5168dbe6f7c1c7681ffc51..ad163e09a938950d907a69ea9e4b5a3f6fb48fb4 100755 --- a/dap_client_remote.c +++ b/dap_client_remote.c @@ -103,7 +103,7 @@ dap_client_remote_t *dap_client_remote_create( dap_server_t *sh, int s, dap_serv dsc->pevent.events = EPOLLIN | EPOLLOUT | EPOLLERR; dsc->pevent.data.ptr = dsc; - dsc->_ready_to_read = true; + dsc->flags = DAP_SOCK_READY_TO_READ; dsc->buf_out_offset = 0; _save_ip_and_port( dsc ); @@ -172,27 +172,28 @@ dap_client_remote_t *dap_client_remote_find( int sock, struct dap_server *sh ) */ void dap_client_remote_ready_to_read( dap_client_remote_t *sc, bool is_ready ) { - if( is_ready != sc->_ready_to_read ) { + if( is_ready == (bool)(sc->flags & DAP_SOCK_READY_TO_READ) ) + return; -// log_it( L_ERROR, "remote_ready_to_read() %u efd %X", sc->socket, sc->efd ); +// log_it( L_ERROR, "remote_ready_to_read() %u efd %X", sc->socket, sc->efd ); - int events = EPOLLERR; - sc->_ready_to_read = is_ready; + if ( is_ready ) + sc->flags |= DAP_SOCK_READY_TO_READ; + else + sc->flags ^= DAP_SOCK_READY_TO_READ; - if( sc->_ready_to_read ) { - events |= EPOLLIN; - } + int events = EPOLLERR; - if( sc->_ready_to_write ) { - events |= EPOLLOUT; - } + if( sc->flags & DAP_SOCK_READY_TO_READ ) + events |= EPOLLIN; - sc->pevent.events = events; + if( sc->flags & DAP_SOCK_READY_TO_WRITE ) + events |= EPOLLOUT; - if( epoll_ctl(sc->efd, EPOLL_CTL_MOD, sc->socket, &sc->pevent) != 0 ) { - log_it( L_ERROR, "epoll_ctl failed 000" ); - } - } + sc->pevent.events = events; + + if( epoll_ctl(sc->efd, EPOLL_CTL_MOD, sc->socket, &sc->pevent) != 0 ) + log_it( L_ERROR, "epoll_ctl failed 000" ); } /** @@ -202,27 +203,28 @@ void dap_client_remote_ready_to_read( dap_client_remote_t *sc, bool is_ready ) */ void dap_client_remote_ready_to_write( dap_client_remote_t *sc, bool is_ready ) { - if ( is_ready != sc->_ready_to_write ) { + if ( is_ready == (bool)(sc->flags & DAP_SOCK_READY_TO_WRITE) ) + return; // log_it( L_ERROR, "remote_ready_to_write() %u efd %X", sc->socket, sc->efd ); - int events = EPOLLERR; - sc->_ready_to_write = is_ready; + if ( is_ready ) + sc->flags |= DAP_SOCK_READY_TO_WRITE; + else + sc->flags ^= DAP_SOCK_READY_TO_WRITE; - if ( sc->_ready_to_read ) { - events |= EPOLLIN; - } + int events = EPOLLERR; - if ( sc->_ready_to_write ) { - events |= EPOLLOUT; - } + if( sc->flags & DAP_SOCK_READY_TO_READ ) + events |= EPOLLIN; - sc->pevent.events = events; + if( sc->flags & DAP_SOCK_READY_TO_WRITE ) + events |= EPOLLOUT; - if( epoll_ctl(sc->efd, EPOLL_CTL_MOD, sc->socket, &sc->pevent) != 0 ) { - log_it( L_ERROR, "epoll_ctl failed 001" ); - } - } + sc->pevent.events = events; + + if( epoll_ctl(sc->efd, EPOLL_CTL_MOD, sc->socket, &sc->pevent) != 0 ) + log_it( L_ERROR, "epoll_ctl failed 001" ); } /** diff --git a/dap_client_remote.h b/dap_client_remote.h index 4bd1d45f162e9b666bb5f99a108ba7a70bcc8759..29cbfc81fe71b4ef33d5b103621d6686b127019c 100755 --- a/dap_client_remote.h +++ b/dap_client_remote.h @@ -67,9 +67,9 @@ typedef struct dap_client_remote { int socket; dap_server_client_id id; - bool signal_close; - bool _ready_to_write; - bool _ready_to_read; + uint32_t flags; + bool close_denied; + bool kill_signal; uint16_t port; str_ip s_ip; @@ -101,6 +101,7 @@ typedef struct dap_client_remote { UT_hash_handle hh; struct dap_client_remote *next, *prev; + struct dap_client_remote *knext, *kprev; void *_internal; void *_inheritor; // Internal data to specific client type, usualy states for state machine diff --git a/dap_events.c b/dap_events.c index a46737bd977fed283136657a620a0a8383aa9d04..5c6e730ce78c5017b2699cc612a6cb180442a0f7 100755 --- a/dap_events.c +++ b/dap_events.c @@ -200,11 +200,9 @@ void dap_events_delete( dap_events_t *a_events ) if ( a_events ) { - // TODO REPLACE TO DLIST HASH_ITER( hh, a_events->sockets,cur, tmp ) { dap_events_socket_delete( cur, false ); } - // TODO REPLACE TO DLIST if ( a_events->_inheritor ) free( a_events->_inheritor ); @@ -216,6 +214,33 @@ void dap_events_delete( dap_events_t *a_events ) } } +void dap_events_kill_socket( dap_events_socket_t *a_es ) +{ + if ( !a_es ) { + log_it( L_ERROR, "dap_events_kill_socket( NULL )" ); + return; + } + + uint32_t tn = a_es->dap_worker->number_thread; + + dap_worker_t *w = a_es->dap_worker; + dap_events_t *d_ev = w->events; + + pthread_mutex_lock( &w->locker_on_count ); + if ( a_es->kill_signal ) { + pthread_mutex_unlock( &w->locker_on_count ); + return; + } + + log_it( L_DEBUG, "KILL %u socket! [ thread %u ]", a_es->socket, tn ); + + a_es->kill_signal = true; + DL_LIST_ADD_NODE_HEAD( d_ev->to_kill_sockets, a_es, kprev, knext, w->event_to_kill_count ); + + pthread_mutex_unlock( &w->locker_on_count ); +} + + /** * @brief s_socket_info_all_check_activity * @param n_thread @@ -228,7 +253,7 @@ static void s_socket_all_check_activity( dap_worker_t *dap_worker, dap_events_t pthread_mutex_lock( &dap_worker->locker_on_count ); DL_FOREACH_SAFE( d_ev->dlsockets, a_es, tmp ) { - if ( cur_time >= a_es->last_time_active + s_connection_timeout ) { + if ( !a_es->kill_signal && cur_time >= a_es->last_time_active + s_connection_timeout && !a_es->close_denied ) { log_it( L_INFO, "Socket %u timeout, closing...", a_es->socket ); @@ -239,36 +264,11 @@ static void s_socket_all_check_activity( dap_worker_t *dap_worker, dap_events_t dap_worker->event_sockets_count --; DL_DELETE( d_ev->dlsockets, a_es ); - dap_events_socket_delete( a_es, true ); + dap_events_socket_delete( a_es, false ); } } pthread_mutex_unlock( &dap_worker->locker_on_count ); -/** - LL_FOREACH( s_dap_events_sockets[n_thread], ei ) { - - if( ei->es->is_pingable ){ - - if ( (time(NULL) - ei->es->last_ping_request) > (time_t)s_connection_timeout ) { // conn timeout - - log_it( L_INFO, "Connection on socket %d close by timeout", ei->es->sockets ); - - dap_events_socket_t * cur = dap_events_socket_find( ei->es->socket, sh ); - if ( cur != NULL ){ - dap_events_socket_remove_and_delete( cur ); - } - else { - log_it(L_ERROR, "Trying close socket but not find on client hash!"); - close(ei->es->socket); - } - } - else if(( time(NULL) - ei->es->last_ping_request ) > (time_t) s_connection_timeout/3 ) { - log_it(L_INFO, "Connection on socket %d last chance to remain alive", ei->es->socket); - } - } - } -**/ - } /** @@ -278,6 +278,7 @@ static void s_socket_all_check_activity( dap_worker_t *dap_worker, dap_events_t */ static void *thread_worker_function( void *arg ) { + dap_events_socket_t *cur, *tmp; dap_worker_t *w = (dap_worker_t *)arg; time_t next_time_timeout_check = time( NULL ) + s_connection_timeout / 2; uint32_t tn = w->number_thread; @@ -302,7 +303,6 @@ static void *thread_worker_function( void *arg ) } #endif - log_it(L_INFO, "Worker %d started, epoll fd %d", w->number_thread, w->epoll_fd); struct epoll_event *events = &g_epoll_events[ DAP_MAX_EPOLL_EVENTS * tn ]; @@ -322,8 +322,7 @@ static void *thread_worker_function( void *arg ) for( int32_t n = 0; n < selected_sockets; n ++ ) { - //dap_events_socket_t *cur = dap_events_socket_find( events[n].data.fd, w->events ); - dap_events_socket_t *cur = (dap_events_socket_t *)events[n].data.ptr; + cur = (dap_events_socket_t *)events[n].data.ptr; if ( !cur ) { @@ -331,9 +330,9 @@ static void *thread_worker_function( void *arg ) continue; } - if( events[n].events & EPOLLERR ) { + if ( events[n].events & EPOLLERR ) { log_it( L_ERROR,"Socket error: %s",strerror(errno) ); - cur->signal_close = true; + cur->flags |= DAP_SOCK_SIGNAL_CLOSE; cur->callbacks->error_callback( cur, NULL ); // Call callback to process error event } @@ -356,20 +355,22 @@ static void *thread_worker_function( void *arg ) } else if( bytes_read < 0 ) { log_it( L_ERROR,"Some error occured in recv() function: %s",strerror(errno) ); - cur->signal_close = true; + dap_events_socket_set_readable( cur, false ); + cur->flags |= DAP_SOCK_SIGNAL_CLOSE; } else if ( bytes_read == 0 ) { log_it( L_INFO, "Client socket disconnected" ); - cur->signal_close = true; + dap_events_socket_set_readable( cur, false ); + cur->flags |= DAP_SOCK_SIGNAL_CLOSE; } } // Socket is ready to write - if( ( events[n].events & EPOLLOUT || cur->_ready_to_write ) && !cur->signal_close ) { + if( ( (events[n].events & EPOLLOUT) || (cur->flags & DAP_SOCK_READY_TO_WRITE) ) && !(cur->flags & DAP_SOCK_SIGNAL_CLOSE) ) { ///log_it(DEBUG, "Main loop output: %u bytes to send",sa_cur->buf_out_size); cur->callbacks->write_callback( cur, NULL ); // Call callback to process write event - if( cur->_ready_to_write ) { + if( cur->flags & DAP_SOCK_READY_TO_WRITE ) { static const uint32_t buf_out_zero_count_max = 20; cur->buf_out[cur->buf_out_size] = 0; @@ -394,7 +395,8 @@ static void *thread_worker_function( void *arg ) if ( bytes_sent < 0 ) { log_it(L_ERROR,"Some error occured in send() function"); - break; + cur->flags |= DAP_SOCK_SIGNAL_CLOSE; + break; } total_sent += bytes_sent; //log_it(L_DEBUG, "Output: %u from %u bytes are sent ", total_sent,sa_cur->buf_out_size); @@ -403,9 +405,23 @@ static void *thread_worker_function( void *arg ) cur->buf_out_size = 0; } - if( cur->signal_close ) { - log_it( L_INFO, "Got signal to close from the client %s", cur->hostaddr ); + if ( (cur->flags & DAP_SOCK_SIGNAL_CLOSE) && !cur->close_denied ) { + + pthread_mutex_lock( &w->locker_on_count ); + + if ( cur->kill_signal ) { + pthread_mutex_unlock( &w->locker_on_count ); + continue; + } + +// pthread_mutex_unlock( &dsth->mutex_dlist_add_remove ); +// dap_server_kill_socket( dap_cur ); +// continue; + + log_it( L_INFO, "Got signal to close %s, sock %u [thread %u]", cur->hostaddr, cur->socket, tn ); + dap_events_socket_remove_and_delete( cur, false ); + pthread_mutex_unlock( &w->locker_on_count ); } } // for @@ -416,6 +432,36 @@ static void *thread_worker_function( void *arg ) next_time_timeout_check = cur_time + s_connection_timeout / 2; } #endif + + pthread_mutex_lock( &w->locker_on_count ); + if ( !w->event_to_kill_count ) { + + pthread_mutex_unlock( &w->locker_on_count ); + continue; + } + + cur = w->events->to_kill_sockets; + + do { + +// if ( cur->close_denied ) { +// cur = cur->knext; +// continue; +// } + + log_it( L_INFO, "Kill %u socket .... [ thread %u ]", cur->socket, tn ); + + tmp = cur->knext; + DL_LIST_REMOVE_NODE( w->events->to_kill_sockets, cur, kprev, knext, w->event_to_kill_count ); + + dap_events_socket_remove_and_delete( cur, false ); + cur = tmp; + + } while ( cur ); + + log_it( L_INFO, "[ Thread %u ] coneections: %u, to kill: %u", tn, w->event_sockets_count, w->event_to_kill_count ); + pthread_mutex_unlock( &w->locker_on_count ); + } // while return NULL; @@ -477,6 +523,7 @@ int dap_events_start( dap_events_t *a_events ) return -1; } + s_workers[i].event_to_kill_count = 0; s_workers[i].event_sockets_count = 0; s_workers[i].number_thread = i; s_workers[i].events = a_events; @@ -519,11 +566,6 @@ void dap_worker_add_events_socket( dap_events_socket_t *a_es) a_es->dap_worker = l_worker; a_es->events = a_es->dap_worker->events; - - pthread_mutex_lock( &l_worker->locker_on_count ); - l_worker->event_sockets_count ++; - DL_APPEND( a_es->events->dlsockets, a_es ); - pthread_mutex_unlock( &l_worker->locker_on_count ); } /** @@ -537,10 +579,8 @@ void dap_events_socket_remove_and_delete( dap_events_socket_t *a_es, bool prese else log_it( L_DEBUG,"Removed epoll's event from dap_worker #%u", a_es->dap_worker->number_thread ); - pthread_mutex_lock( &a_es->dap_worker->locker_on_count ); - a_es->dap_worker->event_sockets_count --; DL_DELETE( a_es->events->dlsockets, a_es ); - pthread_mutex_unlock( &a_es->dap_worker->locker_on_count ); + a_es->dap_worker->event_sockets_count --; dap_events_socket_delete( a_es, preserve_inheritor ); } diff --git a/dap_events.h b/dap_events.h index 7708fb091164364d8376067ef47d8ca5bce0b105..1b2d52ae807b080dba10cb2ad9a65c9959e3d069 100755 --- a/dap_events.h +++ b/dap_events.h @@ -51,6 +51,8 @@ typedef struct dap_events { dap_events_socket_t *sockets; // Hashmap of event sockets dap_events_socket_t *dlsockets; // Dlist of event sockets + dap_events_socket_t *to_kill_sockets; // Dlist of event sockets + pthread_rwlock_t sockets_rwlock; void *_inheritor; // Pointer to the internal data, HTTP for example dap_thread_t proc_thread; @@ -61,6 +63,8 @@ typedef struct dap_events { typedef struct dap_worker { uint32_t event_sockets_count; + uint32_t event_to_kill_count; + EPOLL_HANDLE epoll_fd; uint32_t number_thread; pthread_mutex_t locker_on_count; @@ -77,6 +81,8 @@ void dap_events_delete( dap_events_t * sh ); //void dap_events_socket_remove_and_delete( dap_events_socket_t* a_es ); void dap_events_socket_remove_and_delete(dap_events_socket_t* a_es, bool preserve_inheritor ); +void dap_events_kill_socket( dap_events_socket_t *a_es ); + int32_t dap_events_start( dap_events_t *sh ); int32_t dap_events_wait( dap_events_t *sh ); diff --git a/dap_events_socket.c b/dap_events_socket.c index 56c7a8c7ad87aa0b854f018d2734a71ddaa0db19..c5c06040aa28b24511afb37633adac990f743fb6 100755 --- a/dap_events_socket.c +++ b/dap_events_socket.c @@ -92,7 +92,10 @@ dap_events_socket_t *dap_events_socket_wrap_no_add( dap_events_t *a_events, ret->socket = a_sock; ret->events = a_events; ret->callbacks = a_callbacks; - ret->_ready_to_read = true; + ret->flags = DAP_SOCK_READY_TO_READ; + ret->close_denied = true; + + log_it( L_DEBUG,"Dap event socket wrapped around %d sock a_events = %X", a_sock, a_events ); return ret; } @@ -110,6 +113,11 @@ void dap_events_socket_create_after( dap_events_socket_t *a_es ) dap_worker_add_events_socket( a_es ); + pthread_mutex_lock( &a_es->dap_worker->locker_on_count ); + + a_es->dap_worker->event_sockets_count ++; + DL_APPEND( a_es->events->dlsockets, a_es ); + pthread_rwlock_wrlock( &a_es->events->sockets_rwlock ); HASH_ADD_INT( a_es->events->sockets, socket, a_es ); pthread_rwlock_unlock( &a_es->events->sockets_rwlock ); @@ -120,6 +128,7 @@ void dap_events_socket_create_after( dap_events_socket_t *a_es ) if ( epoll_ctl( a_es->dap_worker->epoll_fd, EPOLL_CTL_ADD, a_es->socket, &a_es->ev ) == 1 ) log_it( L_CRITICAL, "Can't add event socket's handler to epoll_fd" ); + pthread_mutex_unlock( &a_es->dap_worker->locker_on_count ); } /** @@ -144,7 +153,7 @@ dap_events_socket_t * dap_events_socket_wrap2( dap_server_t *a_server, struct da ret->events = a_events; ret->callbacks = a_callbacks; - ret->_ready_to_read = true; + ret->flags = DAP_SOCK_READY_TO_READ; ret->is_pingable = true; ret->last_time_active = ret->last_ping_request = time( NULL ); @@ -182,21 +191,30 @@ dap_events_socket_t *dap_events_socket_find( int sock, struct dap_events *a_even */ void dap_events_socket_set_readable( dap_events_socket_t *sc, bool is_ready ) { - if ( is_ready != sc->_ready_to_read ) { + if( is_ready == (bool)(sc->flags & DAP_SOCK_READY_TO_READ) ) + return; + + sc->ev.events = EPOLLERR; + + if ( is_ready ) + sc->flags |= DAP_SOCK_READY_TO_READ; + else + sc->flags ^= DAP_SOCK_READY_TO_READ; - sc->ev.events = EPOLLERR; - sc->_ready_to_read = is_ready; + int events = EPOLLERR; - if ( sc->_ready_to_read ) - sc->ev.events |= EPOLLIN; - if ( sc->_ready_to_write ) - sc->ev.events |= EPOLLOUT; + if( sc->flags & DAP_SOCK_READY_TO_READ ) + events |= EPOLLIN; - if ( epoll_ctl(sc->dap_worker->epoll_fd, EPOLL_CTL_MOD, sc->socket, &sc->ev) == -1 ) - log_it( L_ERROR,"Can't update read client socket state in the epoll_fd" ); - else - dap_events_thread_wake_up( &sc->events->proc_thread ); - } + if( sc->flags & DAP_SOCK_READY_TO_WRITE ) + events |= EPOLLOUT; + + sc->ev.events = events; + + if ( epoll_ctl(sc->dap_worker->epoll_fd, EPOLL_CTL_MOD, sc->socket, &sc->ev) == -1 ) + log_it( L_ERROR,"Can't update read client socket state in the epoll_fd" ); + else + dap_events_thread_wake_up( &sc->events->proc_thread ); } /** @@ -206,21 +224,28 @@ void dap_events_socket_set_readable( dap_events_socket_t *sc, bool is_ready ) */ void dap_events_socket_set_writable( dap_events_socket_t *sc, bool is_ready ) { - if ( is_ready != sc->_ready_to_write ) { + if ( is_ready == (bool)(sc->flags & DAP_SOCK_READY_TO_WRITE) ) + return; + + if ( is_ready ) + sc->flags |= DAP_SOCK_READY_TO_WRITE; + else + sc->flags ^= DAP_SOCK_READY_TO_WRITE; - sc->ev.events = EPOLLERR; - sc->_ready_to_write = is_ready; + int events = EPOLLERR; - if ( sc->_ready_to_read ) - sc->ev.events |= EPOLLIN; - if ( sc->_ready_to_write ) - sc->ev.events |= EPOLLOUT; + if( sc->flags & DAP_SOCK_READY_TO_READ ) + events |= EPOLLIN; - if ( epoll_ctl(sc->dap_worker->epoll_fd, EPOLL_CTL_MOD, sc->socket, &sc->ev) == -1 ) - log_it(L_ERROR,"Can't update write client socket state in the epoll_fd"); - else - dap_events_thread_wake_up( &sc->events->proc_thread ); - } + if( sc->flags & DAP_SOCK_READY_TO_WRITE ) + events |= EPOLLOUT; + + sc->ev.events = events; + + if ( epoll_ctl(sc->dap_worker->epoll_fd, EPOLL_CTL_MOD, sc->socket, &sc->ev) == -1 ) + log_it(L_ERROR,"Can't update write client socket state in the epoll_fd"); + else + dap_events_thread_wake_up( &sc->events->proc_thread ); } @@ -234,8 +259,6 @@ void dap_events_socket_delete( dap_events_socket_t *a_es, bool preserve_inherito log_it( L_DEBUG, "es is going to be removed from the lists and free the memory (0x%016X)", a_es ); -//a_es->ev - pthread_rwlock_wrlock( &a_es->events->sockets_rwlock ); HASH_DEL( a_es->events->sockets, a_es ); pthread_rwlock_unlock( &a_es->events->sockets_rwlock ); diff --git a/dap_events_socket.h b/dap_events_socket.h index 2f9dec38357dab785015811f88b7d5925c49dba2..0a3dd3bfe02b7ade9ae36f2912c12ce35340d5e0 100755 --- a/dap_events_socket.h +++ b/dap_events_socket.h @@ -92,9 +92,12 @@ typedef struct dap_events_socket { int32_t socket; - bool signal_close; - bool _ready_to_write; - bool _ready_to_read; + uint32_t flags; +// bool signal_close; + bool close_denied; + bool kill_signal; +// bool _ready_to_write; +// bool _ready_to_read; uint32_t buf_out_zero_count; union{ @@ -123,6 +126,7 @@ typedef struct dap_events_socket { UT_hash_handle hh; struct dap_events_socket *next, *prev; + struct dap_events_socket *knext, *kprev; void *_inheritor; // Inheritor data to specific client type, usualy states for state machine diff --git a/dap_server.c b/dap_server.c index da61e955e52cafa9c9e4a8646e3d170bc07df8e5..4b68a6f78e383cd5652b6be7af469e61826981f1 100755 --- a/dap_server.c +++ b/dap_server.c @@ -163,6 +163,7 @@ int32_t dap_server_init( uint32_t count_threads ) #endif dap_thread->thread_num = i; dap_thread->epoll_events = &threads_epoll_events[ i * epoll_max_events ]; + pthread_mutex_init( &dap_thread->mutex_dlist_add_remove, NULL ); } @@ -191,8 +192,9 @@ void dap_server_deinit( void ) if ( threads_epoll_events ) { free( threads_epoll_events ); - for ( uint32_t i = 0; i < _count_threads; ++i ) + for ( uint32_t i = 0; i < _count_threads; ++i ) { pthread_mutex_destroy( &dap_server_threads[i].mutex_dlist_add_remove ); + } } } @@ -270,13 +272,11 @@ static inline uint32_t get_thread_index_min_connections( ) { uint32_t min = 0; -// for( uint32_t i = 1; i < _count_threads; i ++ ) { - -// if ( atomic_load(&thread_inform[min].count_open_connections ) > -// atomic_load(&thread_inform[i].count_open_connections) ) { -// min = i; -// } -// } + for( uint32_t i = 1; i < _count_threads; i ++ ) { + if ( dap_server_threads[min].connections_count > dap_server_threads[i].connections_count ) { + min = i; + } + } return min; } @@ -295,6 +295,32 @@ static inline void print_online() // } } +void dap_server_kill_socket( dap_client_remote_t *dcr ) +{ + if ( !dcr ) { + log_it( L_ERROR, "dap_server_kill_socket( NULL )" ); + return; + } + + dap_server_thread_t *dsth = &dap_server_threads[ dcr->tn ]; + + pthread_mutex_lock( &dsth->mutex_dlist_add_remove ); + + if ( dcr->kill_signal ) { + pthread_mutex_unlock( &dsth->mutex_dlist_add_remove ); + return; + } + + log_it( L_DEBUG, "KILL %u socket! [ thread %u ]", dcr->socket, dcr->tn ); + + dcr->kill_signal = true; + + DL_LIST_ADD_NODE_HEAD( dsth->dap_clients_to_kill, dcr, kprev, knext, dsth->to_kill_count ); + pthread_mutex_unlock( &dsth->mutex_dlist_add_remove ); + + return; +} + /* ========================================================= dap_server_add_socket( ) @@ -309,18 +335,20 @@ dap_client_remote_t *dap_server_add_socket( int32_t fd, int32_t forced_thread_n if ( !dcr ) { log_it( L_ERROR, "accept %d dap_client_remote_create() == NULL", fd ); +// pthread_mutex_unlock( &dsth->mutex_dlist_add_remove ); return dcr; } log_it( L_DEBUG, "accept %d Client, thread %d", fd, tn ); pthread_mutex_lock( &dsth->mutex_dlist_add_remove ); - DL_APPEND( dsth->dap_remote_clients, dcr ); - pthread_mutex_unlock( &dsth->mutex_dlist_add_remove ); + DL_APPEND( dsth->dap_remote_clients, dcr ); + dsth->connections_count ++; if ( epoll_ctl( dsth->epoll_fd, EPOLL_CTL_ADD, fd, &dcr->pevent) != 0 ) { log_it( L_ERROR, "epoll_ctl failed 005" ); } + pthread_mutex_unlock( &dsth->mutex_dlist_add_remove ); return dcr; } @@ -346,9 +374,11 @@ void dap_server_remove_socket( dap_client_remote_t *dcr ) if ( epoll_ctl( dcr->efd, EPOLL_CTL_DEL, dcr->socket, &dcr->pevent ) == -1 ) log_it( L_ERROR,"Can't remove event socket's handler from the epoll_fd" ); - pthread_mutex_lock( &dsth->mutex_dlist_add_remove ); +// pthread_mutex_lock( &dsth->mutex_dlist_add_remove ); DL_DELETE( dsth->dap_remote_clients, dcr ); - pthread_mutex_unlock( &dsth->mutex_dlist_add_remove ); + dsth->connections_count --; + +// pthread_mutex_unlock( &dsth->mutex_dlist_add_remove ); log_it( L_DEBUG, "dcr = %X", dcr ); } @@ -358,12 +388,13 @@ static void s_socket_all_check_activity( uint32_t tn, time_t cur_time ) dap_client_remote_t *dcr, *tmp; dap_server_thread_t *dsth = &dap_server_threads[ tn ]; - log_it( L_INFO,"s_socket_info_all_check_activity() on thread %u", tn ); +// log_it( L_INFO,"s_socket_info_all_check_activity() on thread %u", tn ); pthread_mutex_lock( &dsth->mutex_dlist_add_remove ); + DL_FOREACH_SAFE( dsth->dap_remote_clients, dcr, tmp ) { - if ( cur_time >= dcr->last_time_active + SOCKET_TIMEOUT_TIME ) { + if ( !dcr->kill_signal && cur_time >= dcr->last_time_active + SOCKET_TIMEOUT_TIME && !dcr->close_denied ) { log_it( L_INFO, "Socket %u timeout, closing...", dcr->socket ); @@ -371,6 +402,8 @@ static void s_socket_all_check_activity( uint32_t tn, time_t cur_time ) log_it( L_ERROR,"Can't remove event socket's handler from the epoll_fd" ); DL_DELETE( dsth->dap_remote_clients, dcr ); + dsth->connections_count --; + dap_client_remote_remove( dcr, _current_run_server ); } } @@ -414,21 +447,21 @@ static void read_write_cb( dap_client_remote_t *dap_cur, int32_t revents ) else if ( bytes_read < 0 ) { log_it( L_ERROR,"Bytes read Error %s",strerror(errno) ); if ( strcmp(strerror(errno),"Resource temporarily unavailable") != 0 ) - dap_cur->signal_close = true; + dap_cur->flags |= DAP_SOCK_SIGNAL_CLOSE; } else { // bytes_read == 0 - dap_cur->signal_close = true; + dap_cur->flags |= DAP_SOCK_SIGNAL_CLOSE; log_it( L_DEBUG, "0 bytes read" ); } } - if( ( (revents & EPOLLOUT) || dap_cur->_ready_to_write ) && dap_cur->signal_close == false ) { + if( ( (revents & EPOLLOUT) || (dap_cur->flags & DAP_SOCK_READY_TO_WRITE) ) && !(dap_cur->flags & DAP_SOCK_SIGNAL_CLOSE) ) { // log_it(L_DEBUG, "[THREAD %u] socket write %d ", dap_cur->tn, dap_cur->socket ); _current_run_server->client_write_callback( dap_cur, NULL ); // Call callback to process write event if( dap_cur->buf_out_size == 0 ) { -// log_it(L_DEBUG, "dap_cur->buf_out_size = 0, set ev_read watcher " ); + log_it(L_DEBUG, "dap_cur->buf_out_size = 0, set ev_read watcher " ); dap_cur->pevent.events = EPOLLIN | EPOLLERR; if( epoll_ctl(dap_cur->efd, EPOLL_CTL_MOD, dap_cur->socket, &dap_cur->pevent) != 0 ) { @@ -448,7 +481,7 @@ static void read_write_cb( dap_client_remote_t *dap_cur, int32_t revents ) MSG_DONTWAIT | MSG_NOSIGNAL ); if( bytes_sent < 0 ) { log_it(L_ERROR,"[THREAD %u] Error occured in send() function %s", dap_cur->tn, strerror(errno) ); - dap_cur->signal_close = true; + dap_cur->flags |= DAP_SOCK_SIGNAL_CLOSE; break; } @@ -456,9 +489,10 @@ static void read_write_cb( dap_client_remote_t *dap_cur, int32_t revents ) dap_cur->download_stat.buf_size_total += (size_t)bytes_sent; } +// log_it( L_ERROR, "check !" ); + if( total_sent == dap_cur->buf_out_size ) { dap_cur->buf_out_offset = dap_cur->buf_out_size = 0; - //dap_cur->signal_close = true; // REMOVE ME!!!!!!!!!!!!!!11 } else { dap_cur->buf_out_offset = total_sent; @@ -466,12 +500,16 @@ static void read_write_cb( dap_client_remote_t *dap_cur, int32_t revents ) } // else } // write - if ( dap_cur->signal_close ) { - log_it(L_ERROR,"Close signal" ); - dap_server_remove_socket( dap_cur ); - dap_client_remote_remove( dap_cur, _current_run_server ); - } +// log_it(L_ERROR,"OPA !") ; +// Sleep(200); + +// if ( (dap_cur->flags & DAP_SOCK_SIGNAL_CLOSE) && !dap_cur->close_denied ) { +// log_it(L_ERROR,"Close signal" ); + +// dap_server_remove_socket( dap_cur ); +// dap_client_remote_remove( dap_cur, _current_run_server ); +// } } @@ -546,8 +584,8 @@ dap_server_t *dap_server_listen( const char *addr, uint16_t port, dap_server_typ */ void *thread_loop( void *arg ) { + dap_client_remote_t *dap_cur, *tmp; dap_server_thread_t *dsth = (dap_server_thread_t *)arg; - uint32_t tn = dsth->thread_num; EPOLL_HANDLE efd = dsth->epoll_fd; struct epoll_event *events = dsth->epoll_events; @@ -578,6 +616,7 @@ void *thread_loop( void *arg ) int32_t n = epoll_wait( efd, events, DAP_MAX_THREAD_EVENTS, 1000 ); // log_it(L_WARNING,"[THREAD %u] epoll events %u", tn, n ); +// Sleep(300); if ( n == -1 || bQuitSignal ) break; @@ -587,7 +626,7 @@ void *thread_loop( void *arg ) for ( int32_t i = 0; i < n; ++ i ) { // log_it(L_ERROR,"[THREAD %u] process epoll event %u", tn, i ); - dap_client_remote_t *dap_cur = (dap_client_remote_t *)events[i].data.ptr; + dap_cur = (dap_client_remote_t *)events[i].data.ptr; if ( !dap_cur ) { log_it( L_ERROR,"dap_client_remote_t NULL" ); @@ -598,19 +637,35 @@ void *thread_loop( void *arg ) if( events[i].events & EPOLLERR ) { log_it( L_ERROR,"Socket error: %u, remove it" , dap_cur->socket ); - dap_cur->signal_close = true; + dap_cur->flags |= DAP_SOCK_SIGNAL_CLOSE; } - if ( dap_cur->signal_close ) { - //log_it( L_INFO, "Got signal to close from the client %s", dap_cur->hostaddr ); + if ( !(dap_cur->flags & DAP_SOCK_SIGNAL_CLOSE) || dap_cur->close_denied ) + read_write_cb( dap_cur, events[i].events ); + + if ( (dap_cur->flags & DAP_SOCK_SIGNAL_CLOSE) && !dap_cur->close_denied ) { + + pthread_mutex_lock( &dsth->mutex_dlist_add_remove ); + + if ( dap_cur->kill_signal ) { + pthread_mutex_unlock( &dsth->mutex_dlist_add_remove ); + continue; + } + +// pthread_mutex_unlock( &dsth->mutex_dlist_add_remove ); +// dap_server_kill_socket( dap_cur ); +// continue; + + log_it( L_INFO, "Got signal to close %u socket, closing...[ %u ]", dap_cur->socket, tn ); dap_server_remove_socket( dap_cur ); dap_client_remote_remove( dap_cur, _current_run_server ); - continue; + + log_it( L_INFO, "[ Thread %u ] coneections: %u, to kill: %u", tn, dsth->connections_count, dsth->to_kill_count ); + pthread_mutex_unlock( &dsth->mutex_dlist_add_remove ); } - read_write_cb( dap_cur, events[i].events ); - } + } // for if ( cur_time >= next_time_timeout_check ) { @@ -618,6 +673,36 @@ void *thread_loop( void *arg ) next_time_timeout_check = cur_time + SOCKETS_TIMEOUT_CHECK_PERIOD; } + pthread_mutex_lock( &dsth->mutex_dlist_add_remove ); + if ( !dsth->to_kill_count ) { + + pthread_mutex_unlock( &dsth->mutex_dlist_add_remove ); + continue; + } + + dap_cur = dsth->dap_clients_to_kill; + + do { + + if ( dap_cur->close_denied ) { + dap_cur = dap_cur->knext; + continue; + } + + log_it( L_INFO, "Kill %u socket ...............[ thread %u ]", dap_cur->socket, tn ); + + tmp = dap_cur->knext; + DL_LIST_REMOVE_NODE( dsth->dap_clients_to_kill, dap_cur, kprev, knext, dsth->to_kill_count ); + + dap_server_remove_socket( dap_cur ); + dap_client_remote_remove( dap_cur, _current_run_server ); + dap_cur = tmp; + + } while ( dap_cur ); + + log_it( L_INFO, "[ Thread %u ] coneections: %u, to kill: %u", tn, dsth->connections_count, dsth->to_kill_count ); + pthread_mutex_unlock( &dsth->mutex_dlist_add_remove ); + } while( !bQuitSignal ); return NULL; diff --git a/dap_server.h b/dap_server.h index 98f050c4dddcaea7b97ee0a1150c9ee0d3c8b4d8..0aada31337fae99308b3f0010c6e0111a24146f1 100755 --- a/dap_server.h +++ b/dap_server.h @@ -40,14 +40,24 @@ typedef enum dap_server_type {DAP_SERVER_TCP} dap_server_type_t; +#define DAP_BIT( x ) ( 1 << x ) + +#define DAP_SOCK_READY_TO_READ DAP_BIT( 0 ) +#define DAP_SOCK_READY_TO_WRITE DAP_BIT( 1 ) +#define DAP_SOCK_SIGNAL_CLOSE DAP_BIT( 2 ) +#define DAP_SOCK_ACTIVE DAP_BIT( 3 ) + typedef struct dap_server_thread_s { EPOLL_HANDLE epoll_fd; + uint32_t thread_num; uint32_t connections_count; + uint32_t to_kill_count; struct epoll_event *epoll_events; dap_client_remote_t *dap_remote_clients; + dap_client_remote_t *dap_clients_to_kill; pthread_mutex_t mutex_dlist_add_remove; @@ -91,3 +101,47 @@ void dap_server_deinit( void ); // Deinit server module dap_server_t *dap_server_listen( const char *addr, uint16_t port, dap_server_type_t type ); int32_t dap_server_loop( dap_server_t *d_server ); + +#define DL_LIST_REMOVE_NODE( head, obj, _prev_, _next_, total ) \ + \ + if ( obj->_next_ ) { \ + \ + if ( obj->_prev_ ) \ + obj->_next_->_prev_ = obj->_prev_; \ + else { \ + \ + obj->_next_->_prev_ = NULL; \ + head = obj->_next_; \ + } \ + } \ + \ + if ( obj->_prev_ ) { \ + \ + if ( obj->_next_ ) \ + obj->_prev_->_next_ = obj->_next_; \ + else { \ + \ + obj->_prev_->_next_ = NULL; \ + } \ + } \ + -- total; + +#define DL_LIST_ADD_NODE_HEAD( head, obj, _prev_, _next_, total )\ + \ + if ( !total ) { \ + \ + obj->_prev_ = NULL; \ + obj->_next_ = NULL; \ + \ + head = obj; \ + } \ + else { \ + \ + head->_prev_ = obj; \ + \ + obj->_prev_ = NULL; \ + obj->_next_ = head; \ + \ + head = obj; \ + } \ + ++ total;