diff --git a/dap-sdk/net/core/dap_events.c b/dap-sdk/net/core/dap_events.c index 6d28b361699a2a11cb6393a226e12a711950770f..211c86ee5ab1ea0503dd640c7669c64e7ddce29c 100644 --- a/dap-sdk/net/core/dap_events.c +++ b/dap-sdk/net/core/dap_events.c @@ -229,8 +229,8 @@ static void s_socket_all_check_activity( dap_worker_t *dap_worker, dap_events_t if ( !a_es->kill_signal && cur_time >= a_es->last_time_active + s_connection_timeout && !a_es->no_close ) { log_it( L_INFO, "Socket %u timeout, closing...", a_es->socket ); - if (a_es->callbacks->error_callback) { - a_es->callbacks->error_callback(a_es, (void *)ETIMEDOUT); + if (a_es->callbacks.error_callback) { + a_es->callbacks.error_callback(a_es, (void *)ETIMEDOUT); } if ( epoll_ctl( dap_worker->epoll_fd, EPOLL_CTL_DEL, a_es->socket, &a_es->ev) == -1 ) @@ -327,7 +327,7 @@ static void *thread_worker_function(void *arg) cur->flags |= DAP_SOCK_SIGNAL_CLOSE; log_it(L_DEBUG, "Socket shutdown (EPOLLHUP): %s", strerror(l_sock_err)); if(!(events[n].events & EPOLLERR)) - cur->callbacks->error_callback(cur, NULL); // Call callback to process error event + cur->callbacks.error_callback(cur, NULL); // Call callback to process error event } } @@ -335,7 +335,7 @@ static void *thread_worker_function(void *arg) getsockopt(cur->socket, SOL_SOCKET, SO_ERROR, (void *)&l_sock_err, (socklen_t *)&l_sock_err_size); log_it(L_ERROR, "Socket error: %s", strerror(l_sock_err)); cur->flags |= DAP_SOCK_SIGNAL_CLOSE; - cur->callbacks->error_callback(cur, NULL); // Call callback to process error event + cur->callbacks.error_callback(cur, NULL); // Call callback to process error event } cur->last_time_active = cur_time; @@ -349,28 +349,50 @@ static void *thread_worker_function(void *arg) } int32_t bytes_read = 0; - if(cur->type == DESCRIPTOR_TYPE_SOCKET) { - bytes_read = recv(cur->socket, (char *) (cur->buf_in + cur->buf_in_size), - sizeof(cur->buf_in) - cur->buf_in_size, 0); - }else if(cur->type == DESCRIPTOR_TYPE_FILE) { - bytes_read = read(cur->socket, (char *) (cur->buf_in + cur->buf_in_size), - sizeof(cur->buf_in) - cur->buf_in_size); + bool l_must_read_smth = false; + switch (cur->type) { + case DESCRIPTOR_TYPE_FILE: + l_must_read_smth = true; + bytes_read = read(cur->socket, (char *) (cur->buf_in + cur->buf_in_size), + sizeof(cur->buf_in) - cur->buf_in_size); + break; + case DESCRIPTOR_TYPE_SOCKET: + l_must_read_smth = true; + bytes_read = recv(cur->fd, (char *) (cur->buf_in + cur->buf_in_size), + sizeof(cur->buf_in) - cur->buf_in_size, 0); + break; + case DESCRIPTOR_TYPE_SOCKET_LISTENING: + // Accept connection + break; + case DESCRIPTOR_TYPE_TIMER:{ + uint64_t val; + /* if we not reading data from socket, he triggered again */ + read( cur->fd, &val, 8); + } // Pass same actions as EVENT - mostly we're also event + case DESCRIPTOR_TYPE_EVENT: + if (cur->callbacks.action_callback) + cur->callbacks.action_callback(cur, NULL); + else + log_it(L_ERROR, "Socket %d with action callback fired, but callback is NULL ", cur->socket); + break; } - if(bytes_read > 0) { - cur->buf_in_size += bytes_read; - //log_it(DEBUG, "Received %d bytes", bytes_read); - cur->callbacks->read_callback(cur, NULL); // Call callback to process read event. At the end of callback buf_in_size should be zero if everything was read well - } - else if(bytes_read < 0) { - log_it(L_ERROR, "Some error occured in recv() function: %s", strerror(errno)); - dap_events_socket_set_readable(cur, false); - cur->flags |= DAP_SOCK_SIGNAL_CLOSE; - } - else if(bytes_read == 0) { - log_it(L_INFO, "Client socket disconnected"); - dap_events_socket_set_readable(cur, false); - cur->flags |= DAP_SOCK_SIGNAL_CLOSE; + if (l_must_read_smth){ // Socket/Descriptor read + if(bytes_read > 0) { + cur->buf_in_size += bytes_read; + //log_it(DEBUG, "Received %d bytes", bytes_read); + cur->callbacks.read_callback(cur, NULL); // Call callback to process read event. At the end of callback buf_in_size should be zero if everything was read well + } + else if(bytes_read < 0) { + log_it(L_ERROR, "Some error occured in recv() function: %s", strerror(errno)); + dap_events_socket_set_readable(cur, false); + cur->flags |= DAP_SOCK_SIGNAL_CLOSE; + } + else if(bytes_read == 0) { + log_it(L_INFO, "Client socket disconnected"); + dap_events_socket_set_readable(cur, false); + cur->flags |= DAP_SOCK_SIGNAL_CLOSE; + } } } @@ -378,8 +400,8 @@ static void *thread_worker_function(void *arg) if(((events[n].events & EPOLLOUT) || (cur->flags & DAP_SOCK_READY_TO_WRITE)) && !(cur->flags & DAP_SOCK_SIGNAL_CLOSE)) { ///log_it(DEBUG, "Main loop output: %u bytes to send",sa_cur->buf_out_size); - if(cur->callbacks->write_callback) - cur->callbacks->write_callback(cur, NULL); // Call callback to process write event + if(cur->callbacks.write_callback) + cur->callbacks.write_callback(cur, NULL); // Call callback to process write event if(cur->flags & DAP_SOCK_READY_TO_WRITE) { diff --git a/dap-sdk/net/core/dap_events_socket.c b/dap-sdk/net/core/dap_events_socket.c index d637a157e90b2b66d2d1e4070be11231da493540..da51e5ab9cebe51cd5e9bc2278a09d95e4c89c69 100644 --- a/dap-sdk/net/core/dap_events_socket.c +++ b/dap-sdk/net/core/dap_events_socket.c @@ -86,9 +86,8 @@ dap_events_socket_t *dap_events_socket_wrap_no_add( dap_events_t *a_events, ret->socket = a_sock; ret->events = a_events; - ret->callbacks = a_callbacks; + memcpy(&ret->callbacks, a_callbacks, sizeof(ret->callbacks) ); ret->flags = DAP_SOCK_READY_TO_READ; - ret->no_close = false; pthread_mutex_init(&ret->write_hold, NULL); log_it( L_DEBUG,"Dap event socket wrapped around %d sock a_events = %X", a_sock, a_events ); @@ -107,14 +106,40 @@ void dap_events_socket_assign_on_worker(dap_events_socket_t * a_es, struct dap_w dap_worker_add_events_socket(a_es,a_worker); } +/** + * @brief dap_events_socket_create_type_event + * @param a_w + * @return + */ +dap_events_socket_t * dap_events_socket_create_type_event(dap_worker_t * a_w, dap_events_socket_callback_t a_callback) +{ +#if defined (DAP_EVENTS_CAPS_EVENT_EVENTFD) && defined (DAP_EVENTS_CAPS_EPOLL) + struct epoll_event l_ev={0}; + dap_events_socket_t * l_es = DAP_NEW_Z(dap_events_socket_t); + l_es->type = DESCRIPTOR_TYPE_EVENT; + l_es->dap_worker = a_w; + l_es->callbacks.action_callback = a_callback; // Arm action callback + int l_eventfd = eventfd(0,EFD_NONBLOCK); + //log_it( L_DEBUG, "Created eventfd %d (%p)", l_eventfd, l_es); + l_es->socket = l_eventfd; + l_ev.events = EPOLLIN | EPOLLET; + l_ev.data.ptr = l_es; + epoll_ctl(a_w->epoll_fd, EPOLL_CTL_ADD, l_eventfd, &l_ev); + return l_es; +#else + // Default realization with pipe + return NULL; +#endif +} + /** * @brief dap_events_socket_create_after * @param a_es */ void dap_events_socket_create_after( dap_events_socket_t *a_es ) { - if ( a_es->callbacks->new_callback ) - a_es->callbacks->new_callback( a_es, NULL ); // Init internal structure + if ( a_es->callbacks.new_callback ) + a_es->callbacks.new_callback( a_es, NULL ); // Init internal structure a_es->last_time_active = a_es->last_ping_request = time( NULL ); @@ -158,7 +183,7 @@ dap_events_socket_t * dap_events_socket_wrap2( dap_server_t *a_server, struct da ret->socket = a_sock; ret->events = a_events; - ret->callbacks = a_callbacks; + memcpy(&ret->callbacks,a_callbacks, sizeof ( ret->callbacks) ); ret->flags = DAP_SOCK_READY_TO_READ; ret->is_pingable = true; @@ -318,8 +343,8 @@ void dap_events_socket_delete( dap_events_socket_t *a_es, bool preserve_inherito log_it( L_DEBUG, "dap_events_socket wrapped around %d socket is removed", a_es->socket ); - if( a_es->callbacks->delete_callback ) - a_es->callbacks->delete_callback( a_es, NULL ); // Init internal structure + if( a_es->callbacks.delete_callback ) + a_es->callbacks.delete_callback( a_es, NULL ); // Init internal structure if ( a_es->_inheritor && !preserve_inheritor ) DAP_DELETE( a_es->_inheritor ); diff --git a/dap-sdk/net/core/include/dap_events.h b/dap-sdk/net/core/include/dap_events.h index c63b78bb53347f159d941ae5fa42c15d82225548..a29e5bab9836493966e6e06cee7ba10b44644ab7 100644 --- a/dap-sdk/net/core/include/dap_events.h +++ b/dap-sdk/net/core/include/dap_events.h @@ -92,6 +92,7 @@ uint32_t dap_get_cpu_count( ); dap_worker_t * dap_worker_get_index(uint8_t a_index); void dap_events_socket_assign_on_worker(dap_events_socket_t * a_es, struct dap_worker * a_worker); +void dap_worker_add_events_socket(dap_events_socket_t * a_events_socket, dap_worker_t * a_worker); void dap_worker_add_events_socket_auto( dap_events_socket_t * a_events_socket ); void dap_worker_print_all( ); diff --git a/dap-sdk/net/core/include/dap_events_socket.h b/dap-sdk/net/core/include/dap_events_socket.h index 99ebe57b2bc355c41ecf4545b16a6547867bae23..5aed12e7c86942cd5fbf057f65555543ed495520 100644 --- a/dap-sdk/net/core/include/dap_events_socket.h +++ b/dap-sdk/net/core/include/dap_events_socket.h @@ -34,14 +34,30 @@ #include "wepoll.h" #endif #include <pthread.h> -struct dap_events; -struct dap_events_socket; -struct dap_worker; + +// Caps for different platforms +#if defined(DAP_OS_LINUX) + #define DAP_EVENTS_CAPS_EPOLL +#define DAP_EVENTS_CAPS_EVENT_EVENTFD +#elif defined (DAP_OS_UNIX) + #define DAP_EVENTS_CAPS_POLL + #define DAP_EVENTS_CAPS_EVENT_PIPE +#elif defined (DAP_OS_WINDOWS) + #define DAP_EVENTS_CAPS_EVENT_PIPE +#endif + +typedef struct dap_events dap_events_t; +typedef struct dap_events_socket dap_events_socket_t; +typedef struct dap_worker dap_worker_t; typedef struct dap_server dap_server_t; -typedef void (*dap_events_socket_callback_t) (struct dap_events_socket *,void * arg); // Callback for specific client operations +typedef void (*dap_events_socket_callback_t) (dap_events_socket_t *,void * arg); // Callback for specific client operations +typedef void (*dap_events_socket_worker_callback_t) (dap_events_socket_t *,dap_worker_t * ); // Callback for specific client operations typedef struct dap_events_socket_callbacks { + dap_events_socket_callback_t action_callback; // Callback for action with socket + // for events and timers thats pointer + // to processing callback dap_events_socket_callback_t new_callback; // Create new client callback dap_events_socket_callback_t delete_callback; // Delete client callback @@ -49,24 +65,45 @@ typedef struct dap_events_socket_callbacks { dap_events_socket_callback_t write_callback; // Write function dap_events_socket_callback_t error_callback; // Error processing function + dap_events_socket_worker_callback_t worker_assign_callback; // After successful worker assign + dap_events_socket_worker_callback_t worker_unassign_callback; // After successful worker unassign + } dap_events_socket_callbacks_t; #define DAP_EVENTS_SOCKET_BUF 100000 -#if 0 +typedef enum { + DESCRIPTOR_TYPE_SOCKET = 0, + DESCRIPTOR_TYPE_SOCKET_LISTENING, + DESCRIPTOR_TYPE_EVENT, + DESCRIPTOR_TYPE_TIMER, + DESCRIPTOR_TYPE_FILE +} dap_events_desc_type_t; + typedef struct dap_events_socket { + union{ + int socket; + int fd; + }; +#ifdef DAP_EVENTS_CAPS_EVENT_PIPE + int32_t socket2; +#endif + dap_events_desc_type_t type; - int socket; - bool signal_close; + dap_events_socket_t ** workers_es; // If not NULL - on every worker must be present + size_t workers_es_size; // events socket with same socket - bool _ready_to_write; - bool _ready_to_read; + uint32_t flags; + bool no_close; + atomic_bool kill_signal; uint32_t buf_out_zero_count; + union{ uint8_t buf_in[DAP_EVENTS_SOCKET_BUF+1]; // Internal buffer for input data char buf_in_str[DAP_EVENTS_SOCKET_BUF+1]; }; + size_t buf_in_size; // size of data that is in the input buffer uint8_t buf_out[DAP_EVENTS_SOCKET_BUF+1]; // Internal buffer for output data @@ -77,69 +114,23 @@ typedef struct dap_events_socket { size_t buf_out_size; // size of data that is in the output buffer struct dap_events *events; - struct dap_worker *dap_worker; - dap_events_socket_callbacks_t *callbacks; + struct epoll_event ev; + + dap_events_socket_callbacks_t callbacks; time_t time_connection; + time_t last_time_active; time_t last_ping_request; bool is_pingable; UT_hash_handle hh; + struct dap_events_socket *next, *prev; + struct dap_events_socket *knext, *kprev; - void * _inheritor; // Inheritor data to specific client type, usualy states for state machine -} dap_events_socket_t; // Node of bidirectional list of clients -#endif + void *_inheritor; // Inheritor data to specific client type, usualy states for state machine -typedef enum { - DESCRIPTOR_TYPE_SOCKET = 0, - DESCRIPTOR_TYPE_FILE -} dap_events_desc_type_t; - -typedef struct dap_events_socket { - - int32_t socket; - dap_events_desc_type_t type; - - uint32_t flags; -// bool signal_close; - bool no_close; - atomic_bool kill_signal; -// bool _ready_to_write; -// bool _ready_to_read; - - uint32_t buf_out_zero_count; - union{ - uint8_t buf_in[DAP_EVENTS_SOCKET_BUF+1]; // Internal buffer for input data - char buf_in_str[DAP_EVENTS_SOCKET_BUF+1]; - }; - size_t buf_in_size; // size of data that is in the input buffer - - uint8_t buf_out[DAP_EVENTS_SOCKET_BUF+1]; // Internal buffer for output data - - char hostaddr[1024]; // Address - char service[128]; - - size_t buf_out_size; // size of data that is in the output buffer - - struct dap_events *events; - struct dap_worker *dap_worker; - struct epoll_event ev; - - dap_events_socket_callbacks_t *callbacks; - - time_t time_connection; - time_t last_time_active; - time_t last_ping_request; - bool is_pingable; - - UT_hash_handle hh; - struct dap_events_socket *next, *prev; - struct dap_events_socket *knext, *kprev; - - void *_inheritor; // Inheritor data to specific client type, usualy states for state machine - - pthread_mutex_t write_hold; + pthread_mutex_t write_hold; } dap_events_socket_t; // Node of bidirectional list of clients int dap_events_socket_init(); // Init clients module @@ -147,6 +138,8 @@ void dap_events_socket_deinit(); // Deinit clients module void dap_events_socket_create_after(dap_events_socket_t * a_es); +dap_events_socket_t * dap_events_socket_create_type_event(dap_worker_t * a_w, dap_events_socket_callback_t a_callback); + dap_events_socket_t * dap_events_socket_wrap_no_add(struct dap_events * a_events, int s, dap_events_socket_callbacks_t * a_callbacks); // Create new client and add it to the list diff --git a/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.h b/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.h index f6b83f171919a1d99abfff45a5cfa156cb3a5380..ced85e9ac1bb8bba8a12a71edd3208982f33d8f0 100644 --- a/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.h +++ b/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.h @@ -44,4 +44,4 @@ typedef struct dap_stream_ch_chain_net_srv { void dap_stream_ch_chain_net_srv_set_srv_uid(dap_stream_ch_t* a_ch, dap_chain_net_srv_uid_t a_srv_uid); uint8_t dap_stream_ch_chain_net_srv_get_id(); - +int dap_stream_ch_chain_net_srv_init(void);