diff --git a/CMakeLists.txt b/CMakeLists.txt index ffdd0d382392a60f3576763f64459b0fd9449488..feaedcd5f623d0d211327fa69aac7c2730b1ed08 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,7 +2,7 @@ project(cellframe-sdk C) cmake_minimum_required(VERSION 2.8) set(CMAKE_C_STANDARD 11) -set(CELLFRAME_SDK_NATIVE_VERSION "2.3-4") +set(CELLFRAME_SDK_NATIVE_VERSION "2.4-0") add_definitions ("-DCELLFRAME_SDK_VERSION=\"${CELLFRAME_SDK_NATIVE_VERSION}\"") set(DAPSDK_MODULES "") diff --git a/dap-sdk/net/client/dap_client_pvt.c b/dap-sdk/net/client/dap_client_pvt.c index 56e006a5118cf9c7cfd572aac1fcc2affff89f3b..e72422a5ec218c48bd8db090e50a28b1e4c86a98 100644 --- a/dap-sdk/net/client/dap_client_pvt.c +++ b/dap-sdk/net/client/dap_client_pvt.c @@ -285,7 +285,7 @@ int dap_client_pvt_disconnect(dap_client_pvt_t *a_client_pvt) l_counter++; } if(l_counter >= 70) { - dap_events_socket_remove_and_delete(a_client_pvt->stream_es, true); + dap_events_socket_queue_remove_and_delete(a_client_pvt->stream_es); } } // if (l_client_internal->stream_socket ) { diff --git a/dap-sdk/net/core/dap_events.c b/dap-sdk/net/core/dap_events.c index 211c86ee5ab1ea0503dd640c7669c64e7ddce29c..b28fb0a43d13745a3c892f3a8d01eebb56fa66bc 100644 --- a/dap-sdk/net/core/dap_events.c +++ b/dap-sdk/net/core/dap_events.c @@ -79,17 +79,20 @@ //} dap_events_socket_info_t; //dap_events_socket_info_t **s_dap_events_sockets; +#define LOG_TAG "dap_events" static uint32_t s_threads_count = 1; -static size_t s_connection_timeout = 6000; +static time_t s_connection_timeout = 6000; static struct epoll_event *g_epoll_events = NULL; static volatile bool bEventsAreActive = true; bool s_workers_init = false; dap_worker_t *s_workers = NULL; dap_thread_t *s_threads = NULL; +static void s_new_es_callback( dap_events_socket_t * a_es, void * a_arg); +static void s_delete_es_callback( dap_events_socket_t * a_es, void * a_arg); +static void s_reassign_es_callback( dap_events_socket_t * a_es, void * a_arg); -#define LOG_TAG "dap_events" uint32_t dap_get_cpu_count( ) { @@ -216,17 +219,17 @@ void dap_events_delete( dap_events_t *a_events ) * @param n_thread * @param sh */ -static void s_socket_all_check_activity( dap_worker_t *dap_worker, dap_events_t *d_ev, time_t cur_time ) +static void s_socket_all_check_activity( dap_worker_t *dap_worker, dap_events_t *a_events, time_t cur_time ) { dap_events_socket_t *a_es, *tmp; - pthread_mutex_lock( &dap_worker->locker_on_count ); - DL_FOREACH_SAFE( d_ev->dlsockets, a_es, tmp ) { + pthread_rwlock_rdlock(&a_events->sockets_rwlock); + HASH_ITER(hh, a_events->sockets, a_es, tmp ) { if ( a_es->type == DESCRIPTOR_TYPE_FILE) continue; - if ( !a_es->kill_signal && cur_time >= a_es->last_time_active + s_connection_timeout && !a_es->no_close ) { + if ( !a_es->kill_signal && cur_time >= (time_t)a_es->last_time_active + s_connection_timeout && !a_es->no_close ) { log_it( L_INFO, "Socket %u timeout, closing...", a_es->socket ); if (a_es->callbacks.error_callback) { @@ -238,12 +241,12 @@ static void s_socket_all_check_activity( dap_worker_t *dap_worker, dap_events_t else log_it( L_DEBUG,"Removed epoll's event from dap_worker #%u", dap_worker->number_thread ); - dap_worker->event_sockets_count --; - DL_DELETE( d_ev->dlsockets, a_es ); + pthread_rwlock_unlock(&a_events->sockets_rwlock); dap_events_socket_delete( a_es, true ); + pthread_rwlock_rdlock(&a_events->sockets_rwlock); } } - pthread_mutex_unlock( &dap_worker->locker_on_count ); + pthread_rwlock_unlock(&a_events->sockets_rwlock); } @@ -254,7 +257,7 @@ static void s_socket_all_check_activity( dap_worker_t *dap_worker, dap_events_t */ static void *thread_worker_function(void *arg) { - dap_events_socket_t *cur; + dap_events_socket_t *l_cur; dap_worker_t *w = (dap_worker_t *) arg; time_t next_time_timeout_check = time( NULL) + s_connection_timeout / 2; uint32_t tn = w->number_thread; @@ -285,6 +288,9 @@ static void *thread_worker_function(void *arg) } #endif + w->event_new_es = dap_events_socket_create_type_event( w, s_new_es_callback); + w->event_delete_es = dap_events_socket_create_type_event( w, s_new_es_callback); + log_it(L_INFO, "Worker %d started, epoll fd %d", w->number_thread, w->epoll_fd); struct epoll_event *events = &g_epoll_events[ DAP_MAX_EPOLL_EVENTS * tn]; @@ -309,57 +315,68 @@ static void *thread_worker_function(void *arg) time_t cur_time = time( NULL); for(int32_t n = 0; n < selected_sockets; n++) { - cur = (dap_events_socket_t *) events[n].data.ptr; + l_cur = (dap_events_socket_t *) events[n].data.ptr; - if(!cur) { + if(!l_cur) { log_it(L_ERROR, "dap_events_socket NULL"); continue; } + l_cur->last_time_active = cur_time; //log_it(L_DEBUG, "Worker=%d fd=%d socket=%d event=0x%x(%d)", w->number_thread, w->epoll_fd,cur->socket, events[n].events,events[n].events); int l_sock_err = 0, l_sock_err_size = sizeof(l_sock_err); //connection already closed (EPOLLHUP - shutdown has been made in both directions) if(events[n].events & EPOLLHUP) { // && events[n].events & EPOLLERR) { - getsockopt(cur->socket, SOL_SOCKET, SO_ERROR, (void *)&l_sock_err, (socklen_t *)&l_sock_err_size); - //if(!(events[n].events & EPOLLIN)) - //cur->no_close = false; - if (l_sock_err) { - cur->flags |= DAP_SOCK_SIGNAL_CLOSE; - log_it(L_DEBUG, "Socket shutdown (EPOLLHUP): %s", strerror(l_sock_err)); - if(!(events[n].events & EPOLLERR)) - cur->callbacks.error_callback(cur, NULL); // Call callback to process error event + switch (l_cur->type ){ + case DESCRIPTOR_TYPE_SOCKET_LISTENING: + case DESCRIPTOR_TYPE_SOCKET: + getsockopt(l_cur->socket, SOL_SOCKET, SO_ERROR, (void *)&l_sock_err, (socklen_t *)&l_sock_err_size); + //if(!(events[n].events & EPOLLIN)) + //cur->no_close = false; + if (l_sock_err) { + l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE; + log_it(L_DEBUG, "Socket shutdown (EPOLLHUP): %s", strerror(l_sock_err)); + } + default: log_it(L_WARNING, "Unimplemented EPOLLHUP for socket type %d", l_cur->type); } } if(events[n].events & EPOLLERR) { - getsockopt(cur->socket, SOL_SOCKET, SO_ERROR, (void *)&l_sock_err, (socklen_t *)&l_sock_err_size); - log_it(L_ERROR, "Socket error: %s", strerror(l_sock_err)); - cur->flags |= DAP_SOCK_SIGNAL_CLOSE; - cur->callbacks.error_callback(cur, NULL); // Call callback to process error event + switch (l_cur->type ){ + case DESCRIPTOR_TYPE_SOCKET_LISTENING: + case DESCRIPTOR_TYPE_SOCKET: + getsockopt(l_cur->socket, SOL_SOCKET, SO_ERROR, (void *)&l_sock_err, (socklen_t *)&l_sock_err_size); + log_it(L_ERROR, "Socket error: %s", strerror(l_sock_err)); + default: ; + } + l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE; + l_cur->callbacks.error_callback(l_cur, NULL); // Call callback to process error event } - cur->last_time_active = cur_time; if(events[n].events & EPOLLIN) { //log_it(DEBUG,"Comes connection in active read set"); - if(cur->buf_in_size == sizeof(cur->buf_in)) { + if(l_cur->buf_in_size == sizeof(l_cur->buf_in)) { log_it(L_WARNING, "Buffer is full when there is smth to read. Its dropped!"); - cur->buf_in_size = 0; + l_cur->buf_in_size = 0; } int32_t bytes_read = 0; + int l_errno=0; bool l_must_read_smth = false; - switch (cur->type) { + switch (l_cur->type) { case DESCRIPTOR_TYPE_FILE: l_must_read_smth = true; - bytes_read = read(cur->socket, (char *) (cur->buf_in + cur->buf_in_size), - sizeof(cur->buf_in) - cur->buf_in_size); + bytes_read = read(l_cur->socket, (char *) (l_cur->buf_in + l_cur->buf_in_size), + sizeof(l_cur->buf_in) - l_cur->buf_in_size); + l_errno = errno; break; case DESCRIPTOR_TYPE_SOCKET: l_must_read_smth = true; - bytes_read = recv(cur->fd, (char *) (cur->buf_in + cur->buf_in_size), - sizeof(cur->buf_in) - cur->buf_in_size, 0); + bytes_read = recv(l_cur->fd, (char *) (l_cur->buf_in + l_cur->buf_in_size), + sizeof(l_cur->buf_in) - l_cur->buf_in_size, 0); + l_errno = errno; break; case DESCRIPTOR_TYPE_SOCKET_LISTENING: // Accept connection @@ -367,135 +384,124 @@ static void *thread_worker_function(void *arg) case DESCRIPTOR_TYPE_TIMER:{ uint64_t val; /* if we not reading data from socket, he triggered again */ - read( cur->fd, &val, 8); - } // Pass same actions as EVENT - mostly we're also event - case DESCRIPTOR_TYPE_EVENT: - if (cur->callbacks.action_callback) - cur->callbacks.action_callback(cur, NULL); + read( l_cur->fd, &val, 8); + if (l_cur->callbacks.timer_callback) + l_cur->callbacks.timer_callback(l_cur); else - log_it(L_ERROR, "Socket %d with action callback fired, but callback is NULL ", cur->socket); + log_it(L_ERROR, "Socket %d with timer callback fired, but callback is NULL ", l_cur->socket); + + } break; + case DESCRIPTOR_TYPE_EVENT: + if (l_cur->callbacks.event_callback){ + void * l_event_ptr = NULL; +#if defined(DAP_EVENTS_CAPS_EVENT_PIPE_PKT_MODE) + if(read( l_cur->fd, &l_event_ptr,sizeof (&l_event_ptr)) == sizeof (&l_event_ptr)) + l_cur->callbacks.event_callback(l_cur, l_event_ptr); + else if ( (errno != EAGAIN) && (errno != EWOULDBLOCK) ) // we use blocked socket for now but who knows... + log_it(L_WARNING, "Can't read packet from pipe"); +#endif + }else + log_it(L_ERROR, "Socket %d with event callback fired, but callback is NULL ", l_cur->socket); break; } if (l_must_read_smth){ // Socket/Descriptor read if(bytes_read > 0) { - cur->buf_in_size += bytes_read; + l_cur->buf_in_size += bytes_read; //log_it(DEBUG, "Received %d bytes", bytes_read); - cur->callbacks.read_callback(cur, NULL); // Call callback to process read event. At the end of callback buf_in_size should be zero if everything was read well + l_cur->callbacks.read_callback(l_cur, NULL); // Call callback to process read event. At the end of callback buf_in_size should be zero if everything was read well } else if(bytes_read < 0) { - log_it(L_ERROR, "Some error occured in recv() function: %s", strerror(errno)); - dap_events_socket_set_readable(cur, false); - cur->flags |= DAP_SOCK_SIGNAL_CLOSE; + if (l_errno != EAGAIN && l_errno != EWOULDBLOCK){ // Socket is blocked + log_it(L_ERROR, "Some error occured in recv() function: %s", strerror(errno)); + dap_events_socket_set_readable(l_cur, false); + l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE; + } } else if(bytes_read == 0) { log_it(L_INFO, "Client socket disconnected"); - dap_events_socket_set_readable(cur, false); - cur->flags |= DAP_SOCK_SIGNAL_CLOSE; + dap_events_socket_set_readable(l_cur, false); + l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE; } } } // Socket is ready to write - if(((events[n].events & EPOLLOUT) || (cur->flags & DAP_SOCK_READY_TO_WRITE)) - && !(cur->flags & DAP_SOCK_SIGNAL_CLOSE)) { + if(((events[n].events & EPOLLOUT) || (l_cur->flags & DAP_SOCK_READY_TO_WRITE)) + && !(l_cur->flags & DAP_SOCK_SIGNAL_CLOSE)) { ///log_it(DEBUG, "Main loop output: %u bytes to send",sa_cur->buf_out_size); - if(cur->callbacks.write_callback) - cur->callbacks.write_callback(cur, NULL); // Call callback to process write event + if(l_cur->callbacks.write_callback) + l_cur->callbacks.write_callback(l_cur, NULL); // Call callback to process write event - if(cur->flags & DAP_SOCK_READY_TO_WRITE) { + if(l_cur->flags & DAP_SOCK_READY_TO_WRITE) { static const uint32_t buf_out_zero_count_max = 20; - cur->buf_out[cur->buf_out_size] = 0; + l_cur->buf_out[l_cur->buf_out_size] = 0; - if(!cur->buf_out_size) { + if(!l_cur->buf_out_size) { //log_it(L_WARNING, "Output: nothing to send. Why we are in write socket set?"); - cur->buf_out_zero_count++; + l_cur->buf_out_zero_count++; - if(cur->buf_out_zero_count > buf_out_zero_count_max) { // How many time buf_out on write event could be empty + if(l_cur->buf_out_zero_count > buf_out_zero_count_max) { // How many time buf_out on write event could be empty log_it(L_ERROR, "Output: nothing to send %u times, remove socket from the write set", buf_out_zero_count_max); - dap_events_socket_set_writable(cur, false); + dap_events_socket_set_writable(l_cur, false); } } else - cur->buf_out_zero_count = 0; + l_cur->buf_out_zero_count = 0; + } + //for(total_sent = 0; total_sent < cur->buf_out_size;) { // If after callback there is smth to send - we do it + int l_errno; + if(l_cur->type == DESCRIPTOR_TYPE_SOCKET) { + bytes_sent = send(l_cur->socket, (char *) (l_cur->buf_out + total_sent), + l_cur->buf_out_size - total_sent, MSG_DONTWAIT | MSG_NOSIGNAL); + l_errno = errno; + }else if(l_cur->type == DESCRIPTOR_TYPE_FILE) { + bytes_sent = write(l_cur->socket, (char *) (l_cur->buf_out + total_sent), + l_cur->buf_out_size - total_sent); + l_errno = errno; } - for(total_sent = 0; total_sent < cur->buf_out_size;) { // If after callback there is smth to send - we do it - if(cur->type == DESCRIPTOR_TYPE_SOCKET) { - bytes_sent = send(cur->socket, (char *) (cur->buf_out + total_sent), - cur->buf_out_size - total_sent, MSG_DONTWAIT | MSG_NOSIGNAL); - }else if(cur->type == DESCRIPTOR_TYPE_FILE) { - bytes_sent = write(cur->socket, (char *) (cur->buf_out + total_sent), - cur->buf_out_size - total_sent); - } - if(bytes_sent < 0) { + if(bytes_sent < 0) { + if (l_errno != EAGAIN && l_errno != EWOULDBLOCK ){ // If we have non-blocking socket log_it(L_ERROR, "Some error occured in send(): %s", strerror(errno)); - cur->flags |= DAP_SOCK_SIGNAL_CLOSE; + l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE; break; } + }else{ total_sent += bytes_sent; //log_it(L_DEBUG, "Output: %u from %u bytes are sent ", total_sent,sa_cur->buf_out_size); - } - //log_it(L_DEBUG,"Output: sent %u bytes",total_sent); - if (total_sent) { - pthread_mutex_lock(&cur->write_hold); - cur->buf_out_size -= total_sent; - if (cur->buf_out_size) { - memmove(cur->buf_out, &cur->buf_out[total_sent], cur->buf_out_size); - } else { - cur->flags &= ~DAP_SOCK_READY_TO_WRITE; + //} + //log_it(L_DEBUG,"Output: sent %u bytes",total_sent); + if (total_sent) { + pthread_mutex_lock(&l_cur->mutex); + l_cur->buf_out_size -= total_sent; + if (l_cur->buf_out_size) { + memmove(l_cur->buf_out, &l_cur->buf_out[total_sent], l_cur->buf_out_size); + } else { + l_cur->flags &= ~DAP_SOCK_READY_TO_WRITE; + } + pthread_mutex_unlock(&l_cur->mutex); } - pthread_mutex_unlock(&cur->write_hold); } } - if((cur->flags & DAP_SOCK_SIGNAL_CLOSE) && !cur->no_close) { + if((l_cur->flags & DAP_SOCK_SIGNAL_CLOSE) && !l_cur->no_close) { // protect against double deletion - cur->kill_signal = true; + l_cur->kill_signal = true; //dap_events_socket_remove_and_delete(cur, true); - log_it(L_INFO, "Got signal to close %s, sock %u [thread %u]", cur->hostaddr, cur->socket, tn); + log_it(L_INFO, "Got signal to close %s, sock %u [thread %u]", l_cur->hostaddr, l_cur->socket, tn); } - if(cur->kill_signal) { - log_it(L_INFO, "Kill %u socket (processed).... [ thread %u ]", cur->socket, tn); - dap_events_socket_remove(cur); - dap_events_socket_delete( cur, true); + if(l_cur->kill_signal) { + log_it(L_INFO, "Kill %u socket (processed).... [ thread %u ]", l_cur->socket, tn); + s_es_remove(l_cur); + dap_events_socket_delete( l_cur, true); } - /* - if(!w->event_to_kill_count) { - - pthread_mutex_unlock(&w->locker_on_count); - continue; - - do { - -// if ( cur->no_close ) { -// cur = cur->knext; -// continue; -// } - tmp = cur_del->knext; - - // delete only current events_socket because others may be active in the other workers - //if(cur_del == cur) - if(cur->kill_signal) { - log_it(L_INFO, "Kill %u socket (processed).... [ thread %u ]", cur_del->socket, tn); - DL_LIST_REMOVE_NODE(w->events->to_kill_sockets, cur, kprev, knext, w->event_to_kill_count); - dap_events_socket_remove_and_delete(cur_del, true); - } - cur_del = tmp; - - } while(cur_del); - - log_it(L_INFO, "[ Thread %u ] coneections: %u, to kill: %u", tn, w->event_sockets_count, - w->event_to_kill_count); - - pthread_mutex_unlock(&w->locker_on_count); - */ - } // for + } #ifndef NO_TIMER if(cur_time >= next_time_timeout_check) { @@ -509,6 +515,87 @@ static void *thread_worker_function(void *arg) return NULL; } +/** + * @brief s_new_es_callback + * @param a_es + * @param a_arg + */ +static void s_new_es_callback( dap_events_socket_t * a_es, void * a_arg) +{ + dap_events_socket_t * l_es_new =(dap_events_socket_t *) a_arg; + dap_worker_t * w = a_es->dap_worker; + log_it(L_DEBUG, "Received event socket %p to add on worker", l_es_new); + l_es_new->dap_worker = w; + if ( l_es_new->type == DESCRIPTOR_TYPE_SOCKET || l_es_new->type == DESCRIPTOR_TYPE_SOCKET_LISTENING ){ + int l_cpu = w->number_thread; + setsockopt(l_es_new->socket , SOL_SOCKET, SO_INCOMING_CPU, &l_cpu, sizeof(l_cpu)); + } + + if ( ! l_es_new->is_initalized ){ + if (l_es_new->callbacks.new_callback) + l_es_new->callbacks.new_callback(l_es_new, NULL); + l_es_new->is_initalized = true; + } + + if (l_es_new->socket>0){ + pthread_rwlock_wrlock(&w->events->sockets_rwlock); + HASH_ADD_INT(w->events->sockets, socket, l_es_new ); + pthread_rwlock_unlock(&w->events->sockets_rwlock); + + struct epoll_event l_ev={0}; + l_ev.events = l_es_new->flags ; + if(l_es_new->flags & DAP_SOCK_READY_TO_READ ) + l_ev.events |= EPOLLIN; + if(l_es_new->flags & DAP_SOCK_READY_TO_WRITE ) + l_ev.events |= EPOLLOUT; + l_ev.data.ptr = l_es_new; + + if ( epoll_ctl(w->epoll_fd, EPOLL_CTL_ADD, l_es_new->socket, &l_ev) == 1 ) + log_it(L_CRITICAL,"Can't add event socket's handler to epoll_fd"); + else{ + log_it(L_DEBUG, "Added socket %d on worker %u", l_es_new->socket, w->number_thread); + if (l_es_new->callbacks.worker_assign_callback) + l_es_new->callbacks.worker_assign_callback(l_es_new, w); + + } + }else{ + log_it(L_ERROR, "Incorrect socket %d after new callback. Dropping this handler out", l_es_new->socket); + dap_events_socket_queue_on_remove_and_delete( l_es_new ); + } +} + +/** + * @brief s_delete_es_callback + * @param a_es + * @param a_arg + */ +static void s_delete_es_callback( dap_events_socket_t * a_es, void * a_arg) +{ + if ( epoll_ctl( a_es->dap_worker->epoll_fd, EPOLL_CTL_DEL, a_es->socket, &a_es->ev) == -1 ) + log_it( L_ERROR,"Can't remove event socket's handler from the epoll_fd" ); + else + log_it( L_DEBUG,"Removed epoll's event from dap_worker #%u", a_es->dap_worker->number_thread ); + + a_es->dap_worker->event_sockets_count --; + dap_events_socket_delete( a_es, false ); +} + +/** + * @brief s_reassign_es_callback + * @param a_es + * @param a_arg + */ +static void s_reassign_es_callback( dap_events_socket_t * a_es, void * a_arg) +{ + dap_events_socket_t * l_es_reassign = ((dap_events_socket_t* ) a_arg); + s_es_remove( l_es_reassign); + if (l_es_reassign->callbacks.worker_unassign_callback) + l_es_reassign->callbacks.worker_unassign_callback(l_es_reassign, a_es->dap_worker); + + dap_events_socket_assign_on_worker( l_es_reassign, l_es_reassign->dap_worker ); +} + + /** * @brief dap_worker_get_min * @return @@ -618,7 +705,7 @@ int dap_events_wait( dap_events_t *sh ) */ void dap_worker_add_events_socket(dap_events_socket_t * a_events_socket, dap_worker_t * a_worker) { - eventfd_write( a_worker->eventsfd_new, (eventfd_t) a_events_socket ); + dap_events_socket_send_event( a_worker->event_new_es, a_events_socket ); } /** diff --git a/dap-sdk/net/core/dap_events_socket.c b/dap-sdk/net/core/dap_events_socket.c index da51e5ab9cebe51cd5e9bc2278a09d95e4c89c69..f139fde24b20e6d643bf2c20fb10f5ff938a749c 100644 --- a/dap-sdk/net/core/dap_events_socket.c +++ b/dap-sdk/net/core/dap_events_socket.c @@ -27,7 +27,7 @@ #include <stdarg.h> #include <string.h> #include <assert.h> - +#include <errno.h> #ifndef _WIN32 #include <sys/epoll.h> #include <unistd.h> @@ -88,7 +88,7 @@ dap_events_socket_t *dap_events_socket_wrap_no_add( dap_events_t *a_events, ret->events = a_events; memcpy(&ret->callbacks, a_callbacks, sizeof(ret->callbacks) ); ret->flags = DAP_SOCK_READY_TO_READ; - pthread_mutex_init(&ret->write_hold, NULL); + pthread_mutex_init(&ret->mutex, NULL); log_it( L_DEBUG,"Dap event socket wrapped around %d sock a_events = %X", a_sock, a_events ); @@ -113,25 +113,63 @@ void dap_events_socket_assign_on_worker(dap_events_socket_t * a_es, struct dap_w */ dap_events_socket_t * dap_events_socket_create_type_event(dap_worker_t * a_w, dap_events_socket_callback_t a_callback) { -#if defined (DAP_EVENTS_CAPS_EVENT_EVENTFD) && defined (DAP_EVENTS_CAPS_EPOLL) - struct epoll_event l_ev={0}; dap_events_socket_t * l_es = DAP_NEW_Z(dap_events_socket_t); l_es->type = DESCRIPTOR_TYPE_EVENT; l_es->dap_worker = a_w; - l_es->callbacks.action_callback = a_callback; // Arm action callback - int l_eventfd = eventfd(0,EFD_NONBLOCK); - //log_it( L_DEBUG, "Created eventfd %d (%p)", l_eventfd, l_es); - l_es->socket = l_eventfd; + l_es->callbacks.event_callback = a_callback; // Arm event callback + +#ifdef DAP_EVENTS_CAPS_EVENT_PIPE_PKT_MODE + int l_pipe[2]; + int l_errno; + char l_errbuf[128]; + if( pipe2(l_pipe,O_DIRECT) < 0 ){ + l_errno = errno; + strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); + switch (l_errno) { + case EINVAL: log_it(L_CRITICAL, "Too old linux version thats doesn't support O_DIRECT flag for pipes (%s)", l_errbuf); break; + default: log_it( L_ERROR, "Error detected, can't create pipe(): '%s' (%d)", l_errbuf, l_errno); + } + return NULL; + }else + log_it(L_DEBUG, "Created one-way unnamed pipe %d->%d", l_pipe[0], l_pipe[1]); + l_es->fd = l_pipe[1]; + l_es->fd2 = l_pipe[0]; +#endif + +#if defined(DAP_EVENTS_CAPS_EPOLL) + struct epoll_event l_ev={0}; + int l_event_fd = l_es->fd; + log_it( L_INFO, "Create event descriptor with queue %d (%p)", l_event_fd, l_es); l_ev.events = EPOLLIN | EPOLLET; l_ev.data.ptr = l_es; - epoll_ctl(a_w->epoll_fd, EPOLL_CTL_ADD, l_eventfd, &l_ev); + epoll_ctl(a_w->epoll_fd, EPOLL_CTL_ADD, l_event_fd, &l_ev); +#endif return l_es; -#else - // Default realization with pipe - return NULL; +} + +/** + * @brief dap_events_socket_send_event + * @param a_es + * @param a_arg + */ +void dap_events_socket_send_event( dap_events_socket_t * a_es, void* a_arg) +{ +#if defined(DAP_EVENTS_CAPS_EVENT_PIPE_PKT_MODE) + write( a_es->fd2, &a_arg,sizeof(a_arg)); #endif } +/** + * @brief dap_events_socket_queue_on_remove_and_delete + * @param a_es + */ +void dap_events_socket_queue_on_remove_and_delete(dap_events_socket_t* a_es) +{ + dap_events_socket_send_event( a_es->dap_worker->event_delete_es, a_es ); +} + + + /** * @brief dap_events_socket_create_after * @param a_es @@ -148,7 +186,6 @@ void dap_events_socket_create_after( dap_events_socket_t *a_es ) pthread_mutex_lock( &a_es->dap_worker->locker_on_count ); a_es->dap_worker->event_sockets_count ++; - DL_APPEND( a_es->events->dlsockets, a_es ); pthread_rwlock_wrlock( &a_es->events->sockets_rwlock ); HASH_ADD_INT( a_es->events->sockets, socket, a_es ); @@ -228,7 +265,8 @@ void dap_events_socket_set_readable( dap_events_socket_t *sc, bool is_ready ) if( is_ready == (bool)(sc->flags & DAP_SOCK_READY_TO_READ) ) return; - sc->ev.events = EPOLLERR; + sc->ev.events = sc->ev_base_flags; + sc->ev.events |= EPOLLERR; if ( is_ready ) sc->flags |= DAP_SOCK_READY_TO_READ; @@ -258,10 +296,10 @@ void dap_events_socket_set_readable( dap_events_socket_t *sc, bool is_ready ) */ void dap_events_socket_set_writable( dap_events_socket_t *sc, bool is_ready ) { - pthread_mutex_lock(&sc->write_hold); + pthread_mutex_lock(&sc->mutex); if ( is_ready == (bool)(sc->flags & DAP_SOCK_READY_TO_WRITE) ) { - pthread_mutex_unlock(&sc->write_hold); + pthread_mutex_unlock(&sc->mutex); return; } @@ -270,7 +308,7 @@ void dap_events_socket_set_writable( dap_events_socket_t *sc, bool is_ready ) else sc->flags ^= DAP_SOCK_READY_TO_WRITE; - int events = EPOLLERR; + int events = sc->ev_base_flags | EPOLLERR; if( sc->flags & DAP_SOCK_READY_TO_READ ) events |= EPOLLIN; @@ -278,7 +316,7 @@ void dap_events_socket_set_writable( dap_events_socket_t *sc, bool is_ready ) if( sc->flags & DAP_SOCK_READY_TO_WRITE ) events |= EPOLLOUT; - pthread_mutex_unlock(&sc->write_hold); + pthread_mutex_unlock(&sc->mutex); sc->ev.events = events; @@ -354,9 +392,15 @@ void dap_events_socket_delete( dap_events_socket_t *a_es, bool preserve_inherito closesocket( a_es->socket ); #else close( a_es->socket ); +#ifdef DAP_EVENTS_CAPS_EVENT_PIPE_PKT_MODE + if( a_es->type == DESCRIPTOR_TYPE_EVENT){ + close( a_es->fd2); + } +#endif + #endif } - pthread_mutex_destroy(&a_es->write_hold); + pthread_mutex_destroy(&a_es->mutex); DAP_DELETE( a_es ); } @@ -364,32 +408,24 @@ void dap_events_socket_delete( dap_events_socket_t *a_es, bool preserve_inherito * @brief dap_events_socket_delete * @param a_es */ -void dap_events_socket_remove( dap_events_socket_t *a_es) +void s_es_remove( dap_events_socket_t *a_es) { - pthread_mutex_lock(&a_es->dap_worker->locker_on_count); if ( epoll_ctl( a_es->dap_worker->epoll_fd, EPOLL_CTL_DEL, a_es->socket, &a_es->ev) == -1 ) log_it( L_ERROR,"Can't remove event socket's handler from the epoll_fd" ); else log_it( L_DEBUG,"Removed epoll's event from dap_worker #%u", a_es->dap_worker->number_thread ); - DL_DELETE( a_es->events->dlsockets, a_es ); a_es->dap_worker->event_sockets_count --; - pthread_mutex_unlock(&a_es->dap_worker->locker_on_count); } -void dap_events_socket_remove_and_delete( dap_events_socket_t *a_es, bool preserve_inheritor ) +/** + * @brief dap_events_socket_remove_and_delete + * @param a_es + * @param preserve_inheritor + */ +void dap_events_socket_queue_remove_and_delete( dap_events_socket_t *a_es ) { - pthread_mutex_lock(&a_es->dap_worker->locker_on_count); - if ( epoll_ctl( a_es->dap_worker->epoll_fd, EPOLL_CTL_DEL, a_es->socket, &a_es->ev) == -1 ) - log_it( L_ERROR,"Can't remove event socket's handler from the epoll_fd" ); - else - log_it( L_DEBUG,"Removed epoll's event from dap_worker #%u", a_es->dap_worker->number_thread ); - - DL_DELETE( a_es->events->dlsockets, a_es ); - a_es->dap_worker->event_sockets_count --; - pthread_mutex_unlock(&a_es->dap_worker->locker_on_count); - - dap_events_socket_delete( a_es, preserve_inheritor ); + dap_events_socket_send_event( a_es->dap_worker->event_delete_es, a_es ); } /** @@ -402,11 +438,11 @@ void dap_events_socket_remove_and_delete( dap_events_socket_t *a_es, bool prese size_t dap_events_socket_write(dap_events_socket_t *sc, const void * data, size_t data_size) { //log_it(L_DEBUG,"dap_events_socket_write %u sock data %X size %u", sc->socket, data, data_size ); - pthread_mutex_lock(&sc->write_hold); + pthread_mutex_lock(&sc->mutex); data_size = ((sc->buf_out_size+data_size)<(sizeof(sc->buf_out)))?data_size:(sizeof(sc->buf_out)-sc->buf_out_size ); memcpy(sc->buf_out+sc->buf_out_size,data,data_size); sc->buf_out_size+=data_size; - pthread_mutex_unlock(&sc->write_hold); + pthread_mutex_unlock(&sc->mutex); return data_size; } @@ -420,7 +456,7 @@ size_t dap_events_socket_write_f(dap_events_socket_t *sc, const char * format,.. { log_it(L_DEBUG,"dap_events_socket_write_f %u sock", sc->socket ); - pthread_mutex_lock(&sc->write_hold); + pthread_mutex_lock(&sc->mutex); size_t max_data_size = sizeof(sc->buf_out)-sc->buf_out_size; va_list ap; va_start(ap,format); @@ -431,7 +467,7 @@ size_t dap_events_socket_write_f(dap_events_socket_t *sc, const char * format,.. }else{ log_it(L_ERROR,"Can't write out formatted data '%s'",format); } - pthread_mutex_unlock(&sc->write_hold); + pthread_mutex_unlock(&sc->mutex); return (ret > 0) ? ret : 0; } diff --git a/dap-sdk/net/core/include/dap_events.h b/dap-sdk/net/core/include/dap_events.h index a29e5bab9836493966e6e06cee7ba10b44644ab7..a194e01775430fe0d4199e39ece908a6ed0462ed 100644 --- a/dap-sdk/net/core/include/dap_events.h +++ b/dap-sdk/net/core/include/dap_events.h @@ -52,9 +52,6 @@ struct dap_worker; typedef struct dap_events { dap_events_socket_t *sockets; // Hashmap of event sockets - dap_events_socket_t *dlsockets; // Dlist of event sockets - dap_events_socket_t *to_kill_sockets; // Dlist of event sockets - pthread_rwlock_t sockets_rwlock; void *_inheritor; // Pointer to the internal data, HTTP for example dap_thread_t proc_thread; @@ -66,7 +63,8 @@ typedef struct dap_worker { atomic_uint event_sockets_count; - int eventsfd_new; // Events fd for new socket + dap_events_socket_t * event_new_es; // Events socket for new socket + dap_events_socket_t * event_delete_es; // Events socket for new socket EPOLL_HANDLE epoll_fd; uint32_t number_thread; pthread_mutex_t locker_on_count; diff --git a/dap-sdk/net/core/include/dap_events_socket.h b/dap-sdk/net/core/include/dap_events_socket.h index 840f83eebb0c326e160f446b7699006d168bccd6..11322a935f1500f599da4577c5662a9cbaf2a717 100644 --- a/dap-sdk/net/core/include/dap_events_socket.h +++ b/dap-sdk/net/core/include/dap_events_socket.h @@ -32,12 +32,13 @@ // Caps for different platforms #if defined(DAP_OS_LINUX) #define DAP_EVENTS_CAPS_EPOLL -#define DAP_EVENTS_CAPS_EVENT_EVENTFD + #define DAP_EVENTS_CAPS_EVENT_PIPE_PKT_MODE #elif defined (DAP_OS_UNIX) #define DAP_EVENTS_CAPS_POLL #define DAP_EVENTS_CAPS_EVENT_PIPE #elif defined (DAP_OS_WINDOWS) #define DAP_EVENTS_CAPS_WEPOLL + #define DAP_EVENTS_CAPS_EPOLL #define DAP_EVENTS_CAPS_EVENT_PIPE #endif @@ -52,13 +53,14 @@ typedef struct dap_events_socket dap_events_socket_t; typedef struct dap_worker dap_worker_t; typedef struct dap_server dap_server_t; -typedef void (*dap_events_socket_callback_t) (dap_events_socket_t *,void * arg); // Callback for specific client operations +typedef void (*dap_events_socket_callback_t) (dap_events_socket_t *,void * ); // Callback for specific client operations +typedef void (*dap_events_socket_callback_timer_t) (dap_events_socket_t * ); // Callback for specific client operations typedef void (*dap_events_socket_worker_callback_t) (dap_events_socket_t *,dap_worker_t * ); // Callback for specific client operations typedef struct dap_events_socket_callbacks { union{ dap_events_socket_callback_t accept_callback; // Accept callback for listening socket - dap_events_socket_callback_t timer_callback; // Timer callback for listening socket + dap_events_socket_callback_timer_t timer_callback; // Timer callback for listening socket dap_events_socket_callback_t event_callback; // Timer callback for listening socket dap_events_socket_callback_t action_callback; // Callback for action with socket // for events and timers thats pointer @@ -90,8 +92,8 @@ typedef struct dap_events_socket { int socket; int fd; }; -#ifdef DAP_EVENTS_CAPS_EVENT_PIPE - int32_t socket2; +#ifdef DAP_EVENTS_CAPS_EVENT_PIPE_PKT_MODE + int fd2; #endif dap_events_desc_type_t type; @@ -101,6 +103,7 @@ typedef struct dap_events_socket { uint32_t flags; bool no_close; atomic_bool kill_signal; + atomic_bool is_initalized; uint32_t buf_out_zero_count; @@ -120,7 +123,10 @@ typedef struct dap_events_socket { struct dap_events *events; struct dap_worker *dap_worker; +#ifdef DAP_EVENTS_CAPS_EPOLL + uint32_t ev_base_flags; struct epoll_event ev; +#endif dap_events_socket_callbacks_t callbacks; @@ -135,22 +141,20 @@ typedef struct dap_events_socket { void *_inheritor; // Inheritor data to specific client type, usualy states for state machine - pthread_mutex_t write_hold; + pthread_mutex_t mutex; } dap_events_socket_t; // Node of bidirectional list of clients +typedef struct dap_events_socket_event{ + +} dap_events_socket_event_t; + int dap_events_socket_init(); // Init clients module void dap_events_socket_deinit(); // Deinit clients module void dap_events_socket_create_after(dap_events_socket_t * a_es); dap_events_socket_t * dap_events_socket_create_type_event(dap_worker_t * a_w, dap_events_socket_callback_t a_callback); -inline dap_events_socket_t * dap_events_socket_send_event( dap_events_socket_t * a_es) -{ -#if defined(DAP_EVENTS_CAPS_EPOLL) && defined(DAP_EVENTS_CAPS_EVENT_EVENTFD) - events -#endif -} - +void dap_events_socket_send_event( dap_events_socket_t * a_es, void* a_arg); dap_events_socket_t * dap_events_socket_wrap_no_add(struct dap_events * a_events, int s, dap_events_socket_callbacks_t * a_callbacks); // Create new client and add it to the list @@ -181,9 +185,9 @@ size_t dap_events_socket_write_mt(dap_events_socket_t *sc, const void * data, si size_t dap_events_socket_write_f_mt(dap_events_socket_t *sc, const char * format,...); -void dap_events_socket_remove( dap_events_socket_t *a_es); +void s_es_remove( dap_events_socket_t *a_es); void dap_events_socket_delete(dap_events_socket_t *sc,bool preserve_inheritor); // Removes the client from the list -void dap_events_socket_remove_and_delete(dap_events_socket_t* a_es, bool preserve_inheritor ); +void dap_events_socket_queue_remove_and_delete(dap_events_socket_t* a_es); int dap_events_socket_kill_socket( dap_events_socket_t *a_es );