From d29908c1b660d6ab91d671f210116395ff4bdbd4 Mon Sep 17 00:00:00 2001 From: Aleksandr Lysikov <lysikov@inbox.ru> Date: Sat, 23 Mar 2019 23:09:37 +0500 Subject: [PATCH] modified adding worker process for events_socket --- dap_events.c | 17 +++++++++-------- dap_events_socket.c | 11 ++++++++++- 2 files changed, 19 insertions(+), 9 deletions(-) diff --git a/dap_events.c b/dap_events.c index 1e94ed2..61a6495 100755 --- a/dap_events.c +++ b/dap_events.c @@ -248,7 +248,6 @@ static void* thread_worker_function(void *arg) } #endif - log_it(L_INFO, "Worker %d started, epoll fd %d", w->number_thread, w->epoll_fd); struct epoll_event ev, events[MAX_EPOLL_EVENTS]; memzero(&ev,sizeof(ev)); memzero(&events,sizeof(events)); @@ -260,6 +259,7 @@ static void* thread_worker_function(void *arg) abort(); } #endif + log_it(L_INFO, "Worker 0x%x %d started, epoll fd %d timerfd %d", w, w->number_thread, w->epoll_fd, timerfd); struct itimerspec timerValue; memzero(&timerValue, sizeof(timerValue)); @@ -284,7 +284,7 @@ static void* thread_worker_function(void *arg) size_t total_sent; int bytes_sent; while(1) { int selected_sockets = epoll_wait(w->epoll_fd, events, MAX_EPOLL_EVENTS, -1); - // log_it(INFO, "Epoll pwait trigered worker %d", w->number_worker); + //log_it(L_INFO, "Epoll pwait trigered worker %d", w->number_thread); for(int n = 0; n < selected_sockets; n++) { #ifndef NO_TIMER if (events[n].data.fd == timerfd) { @@ -295,13 +295,14 @@ static void* thread_worker_function(void *arg) } else #endif if ( ( cur = dap_events_socket_find(events[n].data.fd, w->events) ) != NULL ) { + log_it(L_DEBUG, "Epoll event n=%d/%d fd=%d events=%d cur=%d", n, selected_sockets, events[n].data.fd, events[n].events, cur); if( events[n].events & EPOLLERR ) { log_it(L_ERROR,"Socket error: %s",strerror(errno)); cur->signal_close=true; cur->callbacks->error_callback(cur,NULL); // Call callback to process error event } else { if( events[n].events & EPOLLIN ) { - //log_it(DEBUG,"Comes connection in active read set"); + log_it(L_DEBUG,"Comes connection in active read set"); if(cur->buf_in_size == sizeof(cur->buf_in)) { log_it(L_WARNING, "Buffer is full when there is smth to read. Its dropped!"); @@ -314,7 +315,7 @@ static void* thread_worker_function(void *arg) if(bytes_read > 0) { cur->buf_in_size += bytes_read; - //log_it(DEBUG, "Received %d bytes", bytes_read); + log_it(L_DEBUG, "Received %d bytes", bytes_read); cur->callbacks->read_callback(cur, NULL); // Call callback to process read event. At the end of callback buf_in_size should be zero if everything was read well } else if(bytes_read < 0) { @@ -329,7 +330,7 @@ static void* thread_worker_function(void *arg) // Socket is ready to write if( ( events[n].events & EPOLLOUT || cur->_ready_to_write ) && ( !cur->signal_close ) ) { - ///log_it(DEBUG, "Main loop output: %u bytes to send",sa_cur->buf_out_size); + //log_it(L_DEBUG, "Main loop output: %u bytes to send", cur->buf_out_size); cur->callbacks->write_callback(cur, NULL); // Call callback to process write event if(cur->_ready_to_write) @@ -362,10 +363,10 @@ static void* thread_worker_function(void *arg) break; } total_sent+= bytes_sent; - //log_it(L_DEBUG, "Output: %u from %u bytes are sent ", total_sent,sa_cur->buf_out_size); + //log_it(L_DEBUG, "Output: %u from %u bytes are sent ", total_sent,cur->buf_out_size); } - //log_it(L_DEBUG,"Output: sent %u bytes",total_sent); + log_it(L_DEBUG,"Output: sent %u bytes",total_sent); cur->buf_out_size = 0; } } @@ -480,7 +481,7 @@ void dap_worker_add_events_socket(dap_events_socket_t * a_events_socket) struct epoll_event ev = {0}; dap_worker_t *l_worker =dap_worker_get_min(); - ev.events = EPOLLIN | EPOLLERR | EPOLLOUT; + ev.events = EPOLLIN | EPOLLERR;// | EPOLLOUT; ev.data.fd = a_events_socket->socket; diff --git a/dap_events_socket.c b/dap_events_socket.c index da97454..fbaa25f 100755 --- a/dap_events_socket.c +++ b/dap_events_socket.c @@ -68,7 +68,7 @@ void dap_events_socket_deinit() dap_events_socket_t * dap_events_socket_wrap_no_add( dap_events_t * a_events, int a_sock, dap_events_socket_callbacks_t * a_callbacks) { - assert(a_events); + //assert(a_events); assert(a_callbacks); log_it(L_DEBUG,"Dap event socket wrapped around %d sock", a_sock); dap_events_socket_t * ret = DAP_NEW_Z(dap_events_socket_t); @@ -88,12 +88,21 @@ void dap_events_socket_create_after(dap_events_socket_t * a_es) if(a_es->callbacks->new_callback) a_es->callbacks->new_callback(a_es,NULL); // Init internal structure +/* pthread_rwlock_wrlock(&a_es->events->sockets_rwlock); a_es->last_ping_request = time(NULL); HASH_ADD_INT(a_es->events->sockets, socket, a_es); pthread_rwlock_unlock(&a_es->events->sockets_rwlock); dap_worker_add_events_socket(a_es); +*/ + dap_worker_add_events_socket(a_es); + + a_es->events = a_es->dap_worker->events; + pthread_rwlock_wrlock(&a_es->events->sockets_rwlock); + a_es->last_ping_request = time(NULL); + HASH_ADD_INT(a_es->events->sockets, socket, a_es); + pthread_rwlock_unlock(&a_es->events->sockets_rwlock); } /** -- GitLab