From c774fe3d9497a61d73d3608161738f6c27d3c3bb Mon Sep 17 00:00:00 2001 From: "Dmitriy A. Gerasimov" <dmitriy.gerasimov@demlabs.net> Date: Mon, 24 Aug 2020 23:19:57 +0700 Subject: [PATCH] [+] message io queue for dap_stream_ch_t objects [+] _mt stream channel write functions [+] default descriptor_type_queue is now with flag DAP_SOCKET_QUEUE_PTR. If without - fired callback with additional size argument [!] Switched uthash to MurMurHash3 thats added gcc flag "-fno-strict-aliasing" in set --- dap-sdk/core/CMakeLists.txt | 2 +- dap-sdk/net/client/dap_client_http.c | 8 +- dap-sdk/net/client/dap_client_pvt.c | 6 +- dap-sdk/net/core/dap_events.c | 2 +- dap-sdk/net/core/dap_events_socket.c | 139 +++++++++++++----- dap-sdk/net/core/dap_timerfd.c | 2 +- dap-sdk/net/core/dap_worker.c | 101 ++++++++----- dap-sdk/net/core/include/dap_events.h | 2 +- dap-sdk/net/core/include/dap_events_socket.h | 43 +++--- dap-sdk/net/core/include/dap_server.h | 1 + dap-sdk/net/core/include/dap_worker.h | 21 ++- dap-sdk/net/server-udp/dap_udp_server.c | 2 +- dap-sdk/net/stream/ch/dap_stream_ch.c | 34 +++-- dap-sdk/net/stream/ch/dap_stream_ch_pkt.c | 50 ++++++- dap-sdk/net/stream/ch/include/dap_stream_ch.h | 6 +- .../net/stream/ch/include/dap_stream_ch_pkt.h | 6 +- dap-sdk/net/stream/stream/dap_stream.c | 13 +- dap-sdk/net/stream/stream/dap_stream_pkt.c | 13 +- dap-sdk/net/stream/stream/dap_stream_worker.c | 84 +++++++++++ .../stream/stream/include/dap_stream_pkt.h | 6 +- .../stream/stream/include/dap_stream_worker.h | 44 ++++++ modules/service/vpn/dap_chain_net_srv_vpn.c | 4 +- .../vpn/dap_chain_net_vpn_client_tun.c | 2 +- 23 files changed, 447 insertions(+), 144 deletions(-) create mode 100644 dap-sdk/net/stream/stream/dap_stream_worker.c create mode 100644 dap-sdk/net/stream/stream/include/dap_stream_worker.h diff --git a/dap-sdk/core/CMakeLists.txt b/dap-sdk/core/CMakeLists.txt index b9c659ff12..c2bce90532 100755 --- a/dap-sdk/core/CMakeLists.txt +++ b/dap-sdk/core/CMakeLists.txt @@ -2,7 +2,7 @@ cmake_minimum_required(VERSION 3.0) project (dap_core) # fix implicit declaration warnings -add_definitions ("-D_GNU_SOURCE") +add_definitions ("-D_GNU_SOURCE -DDHASH_USING_NO_STRICT_ALIASING -DHASH_FUNCTION=HASH_MUR -fno-strict-aliasing") if(UNIX) file(GLOB CORE_SRCS diff --git a/dap-sdk/net/client/dap_client_http.c b/dap-sdk/net/client/dap_client_http.c index 01c82fc08b..0ee825a3ad 100644 --- a/dap-sdk/net/client/dap_client_http.c +++ b/dap-sdk/net/client/dap_client_http.c @@ -208,7 +208,7 @@ static void s_http_read(dap_events_socket_t * a_es, void * arg) } else { // close connection - dap_events_socket_remove_and_delete_mt(a_es); + a_es->kill_signal=true; //dap_events_socket_remove_and_delete(a_es, true); //dap_events_socket_delete(a_es, true); } } @@ -231,7 +231,7 @@ static void s_http_error(dap_events_socket_t * a_es, void * arg) l_client_http_internal->error_callback((int)arg, l_client_http_internal->obj); // close connection - dap_events_socket_remove_and_delete_mt(a_es); + a_es->kill_signal = true; //dap_events_socket_remove_and_delete(a_es, true); //dap_events_thread_wake_up( &a_es->events->proc_thread); //dap_events_socket_delete(a_es, false); @@ -372,7 +372,7 @@ void* dap_client_http_request_custom(const char *a_uplink_addr, uint16_t a_uplin if(!l_remote_addr.sin_addr.s_addr) { if(resolve_host(a_uplink_addr, AF_INET, (struct sockaddr*) &l_remote_addr.sin_addr) < 0) { log_it(L_ERROR, "Wrong remote address '%s:%u'", a_uplink_addr, a_uplink_port); - dap_events_socket_remove_and_delete_mt(l_ev_socket); + dap_events_socket_remove_and_delete_mt(l_ev_socket->worker, l_ev_socket); return NULL; } } @@ -390,7 +390,7 @@ void* dap_client_http_request_custom(const char *a_uplink_addr, uint16_t a_uplin log_it(L_ERROR, "Remote address can't connected (%s:%u) with sock_id %d err: %s", a_uplink_addr, a_uplink_port, l_socket, strerror(errno)); //l_ev_socket->no_close = false; - dap_events_socket_remove_and_delete_mt(l_ev_socket); + dap_events_socket_remove_and_delete_mt(l_ev_socket->worker, l_ev_socket); //shutdown(l_ev_socket->socket, SHUT_RDWR); //dap_events_socket_remove_and_delete(l_ev_socket, true); //l_ev_socket->socket = 0; diff --git a/dap-sdk/net/client/dap_client_pvt.c b/dap-sdk/net/client/dap_client_pvt.c index bacd1e2f4f..6b73438e89 100644 --- a/dap-sdk/net/client/dap_client_pvt.c +++ b/dap-sdk/net/client/dap_client_pvt.c @@ -278,7 +278,7 @@ int dap_client_pvt_disconnect(dap_client_pvt_t *a_client_pvt) // l_client_internal->stream_es->signal_close = true; // start stopping connection if(a_client_pvt->stream_es ) { - dap_events_socket_remove_and_delete_mt(a_client_pvt->stream_es); + dap_events_socket_remove_and_delete_mt(a_client_pvt->stream_es->worker, a_client_pvt->stream_es); int l_counter = 0; // wait for stop of connection (max 0.7 sec.) while(a_client_pvt->stream_es && l_counter < 70) { @@ -496,7 +496,7 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) if(inet_pton(AF_INET, a_client_pvt->uplink_addr, &(l_remote_addr.sin_addr)) < 0) { log_it(L_ERROR, "Wrong remote address '%s:%u'", a_client_pvt->uplink_addr, a_client_pvt->uplink_port); //close(a_client_pvt->stream_socket); - dap_events_socket_remove_and_delete_mt(a_client_pvt->stream_es); + dap_events_socket_remove_and_delete_mt(a_client_pvt->stream_es->worker, a_client_pvt->stream_es); //a_client_pvt->stream_socket = 0; a_client_pvt->stage_status = STAGE_STATUS_ERROR; } @@ -513,7 +513,7 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) else { log_it(L_ERROR, "Remote address can't connected (%s:%u) with sock_id %d", a_client_pvt->uplink_addr, a_client_pvt->uplink_port); - dap_events_socket_remove_and_delete_mt(a_client_pvt->stream_es); + dap_events_socket_remove_and_delete_mt(a_client_pvt->stream_es->worker, a_client_pvt->stream_es); //close(a_client_pvt->stream_socket); a_client_pvt->stream_socket = 0; a_client_pvt->stage_status = STAGE_STATUS_ERROR; diff --git a/dap-sdk/net/core/dap_events.c b/dap-sdk/net/core/dap_events.c index ba26c53675..4edff20bf8 100644 --- a/dap-sdk/net/core/dap_events.c +++ b/dap-sdk/net/core/dap_events.c @@ -179,7 +179,7 @@ void dap_events_delete( dap_events_t *a_events ) if (a_events) { dap_events_socket_t *l_cur, *l_tmp; HASH_ITER( hh, a_events->sockets,l_cur, l_tmp ) { - dap_events_socket_delete_unsafe( l_cur, true ); + dap_events_socket_remove_and_delete_unsafe( l_cur, true ); } if ( a_events->_inheritor ) diff --git a/dap-sdk/net/core/dap_events_socket.c b/dap-sdk/net/core/dap_events_socket.c index 74fd3690a3..32ff007877 100644 --- a/dap-sdk/net/core/dap_events_socket.c +++ b/dap-sdk/net/core/dap_events_socket.c @@ -43,6 +43,7 @@ #endif #include "dap_common.h" +#include "dap_worker.h" #include "dap_events.h" #include "dap_events_socket.h" @@ -126,13 +127,13 @@ void dap_events_socket_assign_on_worker_unsafe(dap_events_socket_t * a_es, struc } /** - * @brief dap_events_socket_create_type_queue + * @brief s_create_type_pipe * @param a_w * @param a_callback - * @param a_buf_in_size + * @param a_flags * @return */ -dap_events_socket_t * dap_events_socket_create_type_pipe(dap_worker_t * a_w, dap_events_socket_callback_t a_callback) +dap_events_socket_t * s_create_type_pipe(dap_worker_t * a_w, dap_events_socket_callback_t a_callback, uint32_t a_flags) { dap_events_socket_t * l_es = DAP_NEW_Z(dap_events_socket_t); l_es->type = DESCRIPTOR_TYPE_PIPE; @@ -154,24 +155,51 @@ dap_events_socket_t * dap_events_socket_create_type_pipe(dap_worker_t * a_w, dap log_it(L_DEBUG, "Created one-way unnamed bytestream pipe %d->%d", l_pipe[0], l_pipe[1]); l_es->fd = l_pipe[0]; l_es->fd2 = l_pipe[1]; + return l_es; +} +/** + * @brief dap_events_socket_create_type_pipe_mt + * @param a_w + * @param a_callback + * @param a_flags + * @return + */ +dap_events_socket_t * dap_events_socket_create_type_pipe_mt(dap_worker_t * a_w, dap_events_socket_callback_t a_callback, uint32_t a_flags) +{ + dap_events_socket_t * l_es = s_create_type_pipe(a_w, a_callback, a_flags); dap_events_socket_assign_on_worker_unsafe(l_es,a_w); return l_es; } /** - * @brief dap_events_socket_create_type_queue + * @brief dap_events_socket_create_type_pipe_unsafe * @param a_w * @param a_callback + * @param a_flags + * @return + */ +dap_events_socket_t * dap_events_socket_create_type_pipe_unsafe(dap_worker_t * a_w, dap_events_socket_callback_t a_callback, uint32_t a_flags) +{ + dap_events_socket_t * l_es = s_create_type_pipe(a_w, a_callback, a_flags); + dap_events_socket_assign_on_worker_unsafe(l_es,a_w); + return l_es; +} + +/** + * @brief s_create_type_queue + * @param a_w + * @param a_flags * @return */ -dap_events_socket_t * dap_events_socket_create_type_queue(dap_worker_t * a_w, dap_events_socket_callback_t a_callback ) +dap_events_socket_t * s_create_type_queue_ptr(dap_worker_t * a_w, dap_events_socket_callback_queue_ptr_t a_callback) { dap_events_socket_t * l_es = DAP_NEW_Z(dap_events_socket_t); l_es->type = DESCRIPTOR_TYPE_QUEUE; l_es->worker = a_w; + l_es->flags = DAP_SOCK_QUEUE_PTR; l_es->events = a_w->events; - l_es->callbacks.queue_callback = a_callback; // Arm event callback + l_es->callbacks.queue_ptr_callback = a_callback; // Arm event callback l_es->ev_base_flags = EPOLLIN | EPOLLERR | EPOLLRDHUP | EPOLLHUP; #ifdef DAP_EVENTS_CAPS_EVENT_PIPE2 @@ -192,8 +220,33 @@ dap_events_socket_t * dap_events_socket_create_type_queue(dap_worker_t * a_w, da l_es->fd = l_pipe[0]; l_es->fd2 = l_pipe[1]; #endif + return l_es; +} + +/** + * @brief dap_events_socket_create_type_queue_mt + * @param a_w + * @param a_callback + * @param a_flags + * @return + */ +dap_events_socket_t * dap_events_socket_create_type_queue_ptr_mt(dap_worker_t * a_w, dap_events_socket_callback_queue_ptr_t a_callback) +{ + dap_events_socket_t * l_es = s_create_type_queue_ptr(a_w, a_callback); + dap_events_socket_assign_on_worker_mt(l_es,a_w); + return l_es; +} +/** + * @brief dap_events_socket_create_type_queue + * @param a_w + * @param a_callback + * @return + */ +dap_events_socket_t * dap_events_socket_create_type_queue_ptr_unsafe(dap_worker_t * a_w, dap_events_socket_callback_queue_ptr_t a_callback) +{ + dap_events_socket_t * l_es = s_create_type_queue_ptr(a_w, a_callback); dap_events_socket_assign_on_worker_unsafe(l_es,a_w); return l_es; } @@ -203,10 +256,17 @@ dap_events_socket_t * dap_events_socket_create_type_queue(dap_worker_t * a_w, da * @param a_es * @param a_arg */ -void dap_events_socket_queue_send( dap_events_socket_t * a_es, void* a_arg) +int dap_worker_queue_send_ptr( dap_events_socket_t * a_es, void* a_arg) { #if defined(DAP_EVENTS_CAPS_EVENT_PIPE2) - write( a_es->fd2, &a_arg,sizeof(a_arg)); + int ret = write( a_es->fd2, &a_arg,sizeof(a_arg)); + int l_errno = errno; + if (ret == 0 ) + return 0; + else if ( ret < 0) + return l_errno; + else + return 1; #endif } @@ -216,7 +276,10 @@ void dap_events_socket_queue_send( dap_events_socket_t * a_es, void* a_arg) */ void dap_events_socket_queue_on_remove_and_delete(dap_events_socket_t* a_es) { - dap_events_socket_queue_send( a_es->worker->queue_es_delete, a_es ); + int l_ret= dap_worker_queue_send_ptr( a_es->worker->queue_es_delete, a_es ); + if( l_ret != 0 ){ + log_it(L_ERROR, "Queue send returned %d", l_ret); + } } @@ -376,7 +439,7 @@ void dap_events_socket_set_writable_unsafe( dap_events_socket_t *sc, bool is_rea * @brief dap_events_socket_remove Removes the client from the list * @param sc Connection instance */ -void dap_events_socket_delete_unsafe( dap_events_socket_t *a_es, bool preserve_inheritor ) +void dap_events_socket_remove_and_delete_unsafe( dap_events_socket_t *a_es, bool preserve_inheritor ) { if ( !a_es ) return; @@ -429,8 +492,8 @@ void dap_events_socket_remove_from_worker_unsafe( dap_events_socket_t *a_es, dap else log_it( L_DEBUG,"Removed epoll's event from dap_worker #%u", a_worker->id ); a_worker->event_sockets_count--; - if(a_worker->sockets) - HASH_DELETE(hh_worker,a_worker->sockets, a_es); + if(a_worker->esockets) + HASH_DELETE(hh_worker,a_worker->esockets, a_es); } /** @@ -438,29 +501,31 @@ void dap_events_socket_remove_from_worker_unsafe( dap_events_socket_t *a_es, dap * @param a_es * @param preserve_inheritor */ -void dap_events_socket_remove_and_delete_mt( dap_events_socket_t *a_es ) +void dap_events_socket_remove_and_delete_mt(dap_worker_t * a_w, dap_events_socket_t *a_es ) { - dap_events_socket_queue_send( a_es->worker->queue_es_delete, a_es ); + dap_worker_queue_send_ptr( a_w->queue_es_delete, a_es ); } /** * @brief dap_events_socket_set_readable_mt - * @param sc - * @param is_ready + * @param a_w + * @param a_es + * @param a_is_ready */ -void dap_events_socket_set_readable_mt(dap_events_socket_t * a_es,bool is_ready) +void dap_events_socket_set_readable_mt(dap_worker_t * a_w, dap_events_socket_t * a_es,bool a_is_ready) { - dap_events_socket_mgs_t * l_msg = DAP_NEW_Z(dap_events_socket_mgs_t); + dap_worker_msg_io_t * l_msg = DAP_NEW_Z(dap_worker_msg_io_t); l_msg->esocket = a_es; - if (is_ready) + if (a_is_ready) l_msg->flags_set = DAP_SOCK_READY_TO_READ; else l_msg->flags_unset = DAP_SOCK_READY_TO_READ; - if (write(a_es->fd, l_msg,sizeof (l_msg)) != sizeof (l_msg) ){ - log_it(L_ERROR, "Wasn't send pointer to queue"); + + int l_ret= dap_worker_queue_send_ptr(a_w->queue_es_io, l_msg ); + if (l_ret!=0){ + log_it(L_ERROR, "Wasn't send pointer to queue: code %d", l_ret); DAP_DELETE(l_msg); } - } /** @@ -468,16 +533,18 @@ void dap_events_socket_set_readable_mt(dap_events_socket_t * a_es,bool is_ready) * @param sc * @param is_ready */ -void dap_events_socket_set_writable_mt(dap_events_socket_t * a_es,bool is_ready) +void dap_events_socket_set_writable_mt(dap_worker_t * a_w, dap_events_socket_t * a_es,bool a_is_ready) { - dap_events_socket_mgs_t * l_msg = DAP_NEW_Z(dap_events_socket_mgs_t); + dap_worker_msg_io_t * l_msg = DAP_NEW_Z(dap_worker_msg_io_t); l_msg->esocket = a_es; - if (is_ready) + if (a_is_ready) l_msg->flags_set = DAP_SOCK_READY_TO_WRITE; else l_msg->flags_unset = DAP_SOCK_READY_TO_WRITE; - if (write(a_es->fd, l_msg,sizeof (l_msg)) != sizeof (l_msg) ){ - log_it(L_ERROR, "Wasn't send pointer to queue"); + + int l_ret= dap_worker_queue_send_ptr(a_w->queue_es_io, l_msg ); + if (l_ret!=0){ + log_it(L_ERROR, "Wasn't send pointer to queue: code %d", l_ret); DAP_DELETE(l_msg); } } @@ -489,17 +556,18 @@ void dap_events_socket_set_writable_mt(dap_events_socket_t * a_es,bool is_ready) * @param data_size * @return */ -size_t dap_events_socket_write_mt(dap_events_socket_t *a_es, const void * data, size_t l_data_size) +size_t dap_events_socket_write_mt(dap_worker_t * a_w,dap_events_socket_t *a_es, const void * data, size_t l_data_size) { - dap_events_socket_mgs_t * l_msg = DAP_NEW_Z(dap_events_socket_mgs_t); + dap_worker_msg_io_t * l_msg = DAP_NEW_Z(dap_worker_msg_io_t); l_msg->esocket = a_es; l_msg->data = DAP_NEW_SIZE(void,l_data_size); l_msg->data_size = l_data_size; l_msg->flags_set = DAP_SOCK_READY_TO_WRITE; memcpy( l_msg->data, data, l_data_size); - if (write(a_es->fd, l_msg,sizeof (l_msg)) != sizeof (l_msg) ){ - log_it(L_ERROR, "Wasn't send pointer to queue"); + int l_ret= dap_worker_queue_send_ptr(a_w->queue_es_io, l_msg ); + if (l_ret!=0){ + log_it(L_ERROR, "Wasn't send pointer to queue: code %d", l_ret); DAP_DELETE(l_msg); return 0; } @@ -512,7 +580,7 @@ size_t dap_events_socket_write_mt(dap_events_socket_t *a_es, const void * data, * @param format * @return */ -size_t dap_events_socket_write_f_mt(dap_events_socket_t *a_es, const char * format,...) +size_t dap_events_socket_write_f_mt(dap_worker_t * a_w,dap_events_socket_t *a_es, const char * format,...) { va_list ap; va_start(ap,format); @@ -522,7 +590,7 @@ size_t dap_events_socket_write_f_mt(dap_events_socket_t *a_es, const char * form return 0; } l_data_size++; // To calc trailing zero - dap_events_socket_mgs_t * l_msg = DAP_NEW_Z(dap_events_socket_mgs_t); + dap_worker_msg_io_t * l_msg = DAP_NEW_Z(dap_worker_msg_io_t); l_msg->esocket = a_es; l_msg->data = DAP_NEW_SIZE(void,l_data_size); l_msg->flags_set = DAP_SOCK_READY_TO_WRITE; @@ -534,8 +602,9 @@ size_t dap_events_socket_write_f_mt(dap_events_socket_t *a_es, const char * form } l_data_size++; l_msg->data_size = l_data_size; - if (write(a_es->fd, l_msg,sizeof (l_msg)) != sizeof (l_msg) ){ - log_it(L_ERROR, "Wasn't send pointer to queue"); + int l_ret= dap_worker_queue_send_ptr(a_w->queue_es_io, l_msg ); + if (l_ret!=0){ + log_it(L_ERROR, "Wasn't send pointer to queue: code %d", l_ret); DAP_DELETE(l_msg); return 0; } diff --git a/dap-sdk/net/core/dap_timerfd.c b/dap-sdk/net/core/dap_timerfd.c index 0f95d148e5..f2cc746622 100644 --- a/dap-sdk/net/core/dap_timerfd.c +++ b/dap-sdk/net/core/dap_timerfd.c @@ -147,7 +147,7 @@ static void s_es_callback_timer(struct dap_events_socket *a_event_sock) */ void dap_timerfd_delete(dap_timerfd_t *l_timerfd) { - dap_events_socket_remove_and_delete_mt(l_timerfd->events_socket); + dap_events_socket_remove_and_delete_mt(l_timerfd->events_socket->worker, l_timerfd->events_socket); } #else #error "No dap_timerfd realization for your platform" diff --git a/dap-sdk/net/core/dap_worker.c b/dap-sdk/net/core/dap_worker.c index ad4f9446b9..d1f7a58054 100644 --- a/dap-sdk/net/core/dap_worker.c +++ b/dap-sdk/net/core/dap_worker.c @@ -43,7 +43,8 @@ static void s_socket_all_check_activity( void * a_arg); static void s_queue_new_es_callback( dap_events_socket_t * a_es, void * a_arg); static void s_queue_delete_es_callback( dap_events_socket_t * a_es, void * a_arg); static void s_queue_reassign_es_callback( dap_events_socket_t * a_es, void * a_arg); -static void s_queue_es_write_callback( dap_events_socket_t * a_es, void * a_arg); +static void s_queue_callback_callback( dap_events_socket_t * a_es, void * a_arg); +static void s_queue_es_io_callback( dap_events_socket_t * a_es, void * a_arg); /** * @brief dap_worker_init @@ -106,9 +107,10 @@ void *dap_worker_thread(void *arg) } #endif - l_worker->queue_es_new = dap_events_socket_create_type_queue( l_worker, s_queue_new_es_callback); - l_worker->queue_es_delete = dap_events_socket_create_type_queue( l_worker, s_queue_delete_es_callback); - l_worker->queue_es_write = dap_events_socket_create_type_queue( l_worker, s_queue_es_write_callback ); + l_worker->queue_es_new = dap_events_socket_create_type_queue_ptr_unsafe( l_worker, s_queue_new_es_callback); + l_worker->queue_es_delete = dap_events_socket_create_type_queue_ptr_unsafe( l_worker, s_queue_delete_es_callback); + l_worker->queue_es_io = dap_events_socket_create_type_queue_ptr_unsafe( l_worker, s_queue_es_io_callback); + l_worker->queue_callback= dap_events_socket_create_type_queue_ptr_unsafe( l_worker, s_queue_callback_callback); l_worker->timer_check_activity = dap_timerfd_start_on_worker( l_worker,s_connection_timeout / 2,s_socket_all_check_activity,l_worker); #ifdef DAP_EVENTS_CAPS_EPOLL @@ -236,15 +238,20 @@ void *dap_worker_thread(void *arg) } break; case DESCRIPTOR_TYPE_QUEUE: if (l_cur->callbacks.queue_callback){ - void * l_queue_ptr = NULL; + if (l_cur->flags & DAP_SOCK_QUEUE_PTR){ + void * l_queue_ptr = NULL; #if defined(DAP_EVENTS_CAPS_EVENT_PIPE2) - if(read( l_cur->fd, &l_queue_ptr,sizeof (&l_queue_ptr)) == sizeof (&l_queue_ptr)) - l_cur->callbacks.queue_callback(l_cur, l_queue_ptr); - else if ( (errno != EAGAIN) && (errno != EWOULDBLOCK) ) // we use blocked socket for now but who knows... - log_it(L_WARNING, "Can't read packet from pipe"); + if(read( l_cur->fd, &l_queue_ptr,sizeof (void *)) == sizeof (void *)) + l_cur->callbacks.queue_callback(l_cur, l_queue_ptr,sizeof(void *)); + else if ( (errno != EAGAIN) && (errno != EWOULDBLOCK) ) // we use blocked socket for now but who knows... + log_it(L_WARNING, "Can't read packet from pipe"); #else #error "No Queue fetch mechanism implemented on your platform" #endif + }else{ + size_t l_read = read(l_cur->socket, l_cur->buf_in,sizeof(l_cur->buf_in)); + l_cur->callbacks.queue_callback(l_cur,l_cur->buf_in,l_read ); + } }else log_it(L_ERROR, "Queue socket %d accepted data but callback is NULL ", l_cur->socket); break; @@ -353,7 +360,7 @@ void *dap_worker_thread(void *arg) if(l_cur->kill_signal) { log_it(L_INFO, "Kill %u socket (processed).... [ thread %u ]", l_cur->socket, l_tn); - dap_events_socket_delete_unsafe( l_cur, false); + dap_events_socket_remove_and_delete_unsafe( l_cur, false); } } @@ -408,7 +415,8 @@ static void s_queue_new_es_callback( dap_events_socket_t * a_es, void * a_arg) HASH_ADD(hh, w->events->sockets, socket, sizeof (int), l_es_new ); pthread_rwlock_unlock(&w->events->sockets_rwlock); // Add in worker - HASH_ADD(hh_worker, w->sockets, socket, sizeof (int), l_es_new ); + l_es_new->me = l_es_new; + HASH_ADD(hh_worker, w->esockets, me, sizeof(void *), l_es_new ); log_it(L_DEBUG, "Added socket %d on worker %u", l_es_new->socket, w->id); if (l_es_new->callbacks.worker_assign_callback) @@ -417,7 +425,7 @@ static void s_queue_new_es_callback( dap_events_socket_t * a_es, void * a_arg) } }else{ log_it(L_ERROR, "Incorrect socket %d after new callback. Dropping this handler out", l_es_new->socket); - dap_events_socket_delete_unsafe( l_es_new, false ); + dap_events_socket_remove_and_delete_unsafe( l_es_new, false ); } } @@ -446,31 +454,48 @@ static void s_queue_reassign_es_callback( dap_events_socket_t * a_es, void * a_a dap_events_socket_assign_on_worker_mt( l_es_reassign, l_es_reassign->worker ); } +/** + * @brief s_queue_callback + * @param a_es + * @param a_arg + */ +static void s_queue_callback_callback( dap_events_socket_t * a_es, void * a_arg) +{ + dap_worker_msg_callback_t * l_msg = (dap_worker_msg_callback_t *) a_arg; + assert(l_msg); + assert(l_msg->callback); + l_msg->callback(a_es->worker); +} + /** * @brief s_pipe_data_out_read_callback * @param a_es * @param a_arg */ -static void s_queue_es_write_callback( dap_events_socket_t * a_es, void * a_arg) +static void s_queue_es_io_callback( dap_events_socket_t * a_es, void * a_arg) { - dap_events_socket_t * l_es_data_out; - if( a_es->buf_in_size < sizeof(l_es_data_out) ){ - dap_events_socket_mgs_t * l_msg = a_arg; - dap_events_socket_t * l_msg_es = l_msg->esocket; - // TODO add check if it was deleted - if (l_msg->flags_set & DAP_SOCK_READY_TO_READ) - dap_events_socket_set_readable_unsafe(l_msg_es, true); - if (l_msg->flags_unset & DAP_SOCK_READY_TO_READ) - dap_events_socket_set_readable_unsafe(l_msg_es, false); - if (l_msg->flags_set & DAP_SOCK_READY_TO_WRITE) - dap_events_socket_set_writable_unsafe(l_msg_es, true); - if (l_msg->flags_unset & DAP_SOCK_READY_TO_WRITE) - dap_events_socket_set_writable_unsafe(l_msg_es, false); - if (l_msg->data_size && l_msg->data) - dap_events_socket_write_unsafe(l_msg_es, l_msg->data,l_msg->data_size); + dap_worker_msg_io_t * l_msg = a_arg; + + // Check if it was removed from the list + dap_events_socket_t *l_msg_es = NULL; + HASH_FIND(hh_worker, a_es->worker->esockets, &l_msg->esocket , sizeof (void*), l_msg_es ); + if ( l_msg_es == NULL){ + log_it(L_DEBUG, "We got i/o message for client thats now not in list. Lost %u data", l_msg->data_size); DAP_DELETE(l_msg); - //log_it() + return; } + + if (l_msg->flags_set & DAP_SOCK_READY_TO_READ) + dap_events_socket_set_readable_unsafe(l_msg_es, true); + if (l_msg->flags_unset & DAP_SOCK_READY_TO_READ) + dap_events_socket_set_readable_unsafe(l_msg_es, false); + if (l_msg->flags_set & DAP_SOCK_READY_TO_WRITE) + dap_events_socket_set_writable_unsafe(l_msg_es, true); + if (l_msg->flags_unset & DAP_SOCK_READY_TO_WRITE) + dap_events_socket_set_writable_unsafe(l_msg_es, false); + if (l_msg->data_size && l_msg->data) + dap_events_socket_write_unsafe(l_msg_es, l_msg->data,l_msg->data_size); + DAP_DELETE(l_msg); } /** @@ -481,20 +506,20 @@ static void s_socket_all_check_activity( void * a_arg) { dap_worker_t *l_worker = (dap_worker_t*) a_arg; assert(l_worker); - dap_events_socket_t *a_es, *tmp; + dap_events_socket_t *l_es, *tmp; char l_curtimebuf[64]; time_t l_curtime= time(NULL); ctime_r(&l_curtime, l_curtimebuf); log_it(L_DEBUG,"Check sockets activity on worker #%u at %s", l_worker->id, l_curtimebuf); - HASH_ITER(hh_worker, l_worker->sockets, a_es, tmp ) { - if ( a_es->type == DESCRIPTOR_TYPE_SOCKET ){ - if ( !a_es->kill_signal && l_curtime >= (time_t)a_es->last_time_active + s_connection_timeout && !a_es->no_close ) { - log_it( L_INFO, "Socket %u timeout, closing...", a_es->socket ); - if (a_es->callbacks.error_callback) { - a_es->callbacks.error_callback(a_es, (void *)ETIMEDOUT); + HASH_ITER(hh_worker, l_worker->esockets, l_es, tmp ) { + if ( l_es->type == DESCRIPTOR_TYPE_SOCKET ){ + if ( !l_es->kill_signal && l_curtime >= (time_t)l_es->last_time_active + s_connection_timeout && !l_es->no_close ) { + log_it( L_INFO, "Socket %u timeout, closing...", l_es->socket ); + if (l_es->callbacks.error_callback) { + l_es->callbacks.error_callback(l_es, (void *)ETIMEDOUT); } - dap_events_socket_remove_and_delete_mt( a_es); + dap_events_socket_remove_and_delete_mt( l_worker, l_es); } } } @@ -507,7 +532,7 @@ static void s_socket_all_check_activity( void * a_arg) */ void dap_worker_add_events_socket(dap_events_socket_t * a_events_socket, dap_worker_t * a_worker) { - dap_events_socket_queue_send( a_worker->queue_es_new, a_events_socket ); + dap_worker_queue_send_ptr( a_worker->queue_es_new, a_events_socket ); } /** diff --git a/dap-sdk/net/core/include/dap_events.h b/dap-sdk/net/core/include/dap_events.h index b8dcab692f..c31fdc2e5a 100644 --- a/dap-sdk/net/core/include/dap_events.h +++ b/dap-sdk/net/core/include/dap_events.h @@ -3,7 +3,7 @@ * Dmitriy A. Gearasimov <gerasimov.dmitriy@demlabs.net> * DeM Labs Inc. https://demlabs.net * Kelvin Project https://github.com/kelvinblockchain - * Copyright (c) 2017-2019 + * Copyright (c) 2017-2020 * All rights reserved. This file is part of DAP (Deus Applications Prototypes) the open source project diff --git a/dap-sdk/net/core/include/dap_events_socket.h b/dap-sdk/net/core/include/dap_events_socket.h index 580938cab9..e5817514f5 100644 --- a/dap-sdk/net/core/include/dap_events_socket.h +++ b/dap-sdk/net/core/include/dap_events_socket.h @@ -60,12 +60,18 @@ #define DAP_SOCK_SIGNAL_CLOSE BIT( 2 ) #define DAP_SOCK_ACTIVE BIT( 3 ) +// If set - queue limited to sizeof(void*) size of data transmitted +#define DAP_SOCK_QUEUE_PTR BIT( 8 ) + typedef struct dap_events dap_events_t; typedef struct dap_events_socket dap_events_socket_t; typedef struct dap_worker dap_worker_t; typedef struct dap_server dap_server_t; typedef void (*dap_events_socket_callback_t) (dap_events_socket_t *,void * ); // Callback for specific client operations +typedef void (*dap_events_socket_callback_queue_t) (dap_events_socket_t *,const void * , size_t); // Callback for specific client operations +typedef void (*dap_events_socket_callback_pipe_t) (dap_events_socket_t *,const void * , size_t); // Callback for specific client operations +typedef void (*dap_events_socket_callback_queue_ptr_t) (dap_events_socket_t *, void *); // Callback for specific client operations typedef void (*dap_events_socket_callback_timer_t) (dap_events_socket_t * ); // Callback for specific client operations typedef void (*dap_events_socket_callback_accept_t) (dap_events_socket_t * , int, struct sockaddr* ); // Callback for accept of new connection typedef void (*dap_events_socket_worker_callback_t) (dap_events_socket_t *,dap_worker_t * ); // Callback for specific client operations @@ -74,7 +80,8 @@ typedef struct dap_events_socket_callbacks { union{ dap_events_socket_callback_accept_t accept_callback; // Accept callback for listening socket dap_events_socket_callback_timer_t timer_callback; // Timer callback for listening socket - dap_events_socket_callback_t queue_callback; // Timer callback for listening socket + dap_events_socket_callback_queue_t queue_callback; // Timer callback for listening socket + dap_events_socket_callback_queue_ptr_t queue_ptr_callback; // Timer callback for listening socket dap_events_socket_callback_t action_callback; // Callback for action with socket // for events and timers thats pointer // to processing callback @@ -108,11 +115,8 @@ typedef struct dap_events_socket { }; #ifdef DAP_EVENTS_CAPS_EVENT_PIPE2 int fd2; - - int write_pipe; #endif dap_events_desc_type_t type; - // Related sockets (be careful - possible problems, delete them before ) dap_events_socket_t ** workers_es; // If not NULL - on every worker must be present size_t workers_es_size; // events socket with same socket @@ -165,26 +169,22 @@ typedef struct dap_events_socket { time_t last_ping_request; void *_inheritor; // Inheritor data to specific client type, usualy states for state machine + struct dap_events_socket * me; // pointer on itself + UT_hash_handle hh; UT_hash_handle hh_worker; // Handle for local CPU storage on worker } dap_events_socket_t; // Node of bidirectional list of clients -typedef struct dap_events_socket_mgs{ - dap_events_socket_t * esocket; - size_t data_size; - void *data; - uint32_t flags_set; - uint32_t flags_unset; -} dap_events_socket_mgs_t; - int dap_events_socket_init(); // Init clients module void dap_events_socket_deinit(); // Deinit clients module void dap_events_socket_create_after(dap_events_socket_t * a_es); -dap_events_socket_t * dap_events_socket_create_type_queue(dap_worker_t * a_w, dap_events_socket_callback_t a_callback); -dap_events_socket_t * dap_events_socket_create_type_pipe(dap_worker_t * a_w, dap_events_socket_callback_t a_callback); -void dap_events_socket_queue_send( dap_events_socket_t * a_es, void* a_arg); +dap_events_socket_t * dap_events_socket_create_type_queue_ptr_unsafe(dap_worker_t * a_w, dap_events_socket_callback_queue_ptr_t a_callback); +dap_events_socket_t * dap_events_socket_create_type_queue_ptr_mt(dap_worker_t * a_w, dap_events_socket_callback_queue_ptr_t a_callback); +dap_events_socket_t * dap_events_socket_create_type_pipe_unsafe(dap_worker_t * a_w, dap_events_socket_callback_t a_callback, uint32_t a_flags); +dap_events_socket_t * dap_events_socket_create_type_pipe_mt(dap_worker_t * a_w, dap_events_socket_callback_t a_callback, uint32_t a_flags); +int dap_worker_queue_send_ptr( dap_events_socket_t * a_es, void* a_arg); dap_events_socket_t * dap_events_socket_wrap_no_add(struct dap_events * a_events, int s, dap_events_socket_callbacks_t * a_callbacks); // Create new client and add it to the list dap_events_socket_t * dap_events_socket_wrap2( dap_server_t *a_server, struct dap_events *a_events, @@ -205,14 +205,13 @@ size_t dap_events_socket_write_unsafe(dap_events_socket_t *sc, const void * data size_t dap_events_socket_write_f_unsafe(dap_events_socket_t *sc, const char * format,...); // MT variants less -void dap_events_socket_set_readable_mt(dap_events_socket_t * sc,bool is_ready); -void dap_events_socket_set_writable_mt(dap_events_socket_t * sc,bool is_ready); -size_t dap_events_socket_write_mt(dap_events_socket_t *sc, const void * data, size_t data_size); -size_t dap_events_socket_write_f_mt(dap_events_socket_t *sc, const char * format,...); +void dap_events_socket_set_readable_mt(dap_worker_t * a_w, dap_events_socket_t * a_es,bool a_is_ready); +void dap_events_socket_set_writable_mt(dap_worker_t * a_w, dap_events_socket_t * a_es,bool a_is_ready); +size_t dap_events_socket_write_mt(dap_worker_t * a_w, dap_events_socket_t *sc, const void * data, size_t data_size); +size_t dap_events_socket_write_f_mt(dap_worker_t * a_w, dap_events_socket_t *sc, const char * format,...); +void dap_events_socket_remove_and_delete_mt( dap_worker_t * a_w, dap_events_socket_t* a_es); +void dap_events_socket_remove_and_delete_unsafe( dap_events_socket_t *a_es, bool preserve_inheritor ); -void dap_events_socket_remove_and_delete_mt(dap_events_socket_t* a_es); -void dap_events_socket_delete_unsafe( dap_events_socket_t *a_es, bool preserve_inheritor ); void dap_events_socket_remove_from_worker_unsafe( dap_events_socket_t *a_es, dap_worker_t * a_worker); - void dap_events_socket_shrink_buf_in(dap_events_socket_t * cl, size_t shrink_size); diff --git a/dap-sdk/net/core/include/dap_server.h b/dap-sdk/net/core/include/dap_server.h index 17fec15974..be2d3b6368 100644 --- a/dap-sdk/net/core/include/dap_server.h +++ b/dap-sdk/net/core/include/dap_server.h @@ -21,6 +21,7 @@ along with any DAP SDK based project. If not, see <http://www.gnu.org/licenses/>. */ + #pragma once #ifndef _WIN32 diff --git a/dap-sdk/net/core/include/dap_worker.h b/dap-sdk/net/core/include/dap_worker.h index 3fae36d3dd..c6a83bc0e0 100644 --- a/dap-sdk/net/core/include/dap_worker.h +++ b/dap-sdk/net/core/include/dap_worker.h @@ -2,7 +2,7 @@ * Authors: * Dmitriy A. Gearasimov <gerasimov.dmitriy@demlabs.net> * DeM Labs Ltd. https://demlabs.net - * Copyright (c) 2017 + * Copyright (c) 2020 * All rights reserved. This file is part of DAP SDK the open source project @@ -31,20 +31,35 @@ typedef struct dap_worker uint32_t id; dap_events_t *events; atomic_uint event_sockets_count; - dap_events_socket_t *sockets; // Hashmap of event sockets + dap_events_socket_t *esockets; // Hashmap of event sockets // worker control queues dap_events_socket_t * queue_es_new; // Events socket for new socket dap_events_socket_t * queue_es_delete; // Events socke - dap_events_socket_t * queue_es_write; // Events socket for new socket + dap_events_socket_t * queue_es_io; // Events socket for new socket + + dap_events_socket_t * queue_callback; // Queue for pure callback on worker dap_timerfd_t * timer_check_activity; EPOLL_HANDLE epoll_fd; pthread_cond_t started_cond; pthread_mutex_t started_mutex; + void * _inheritor; } dap_worker_t; +typedef struct dap_worker_msg_io{ + dap_events_socket_t * esocket; + size_t data_size; + void *data; + uint32_t flags_set; + uint32_t flags_unset; +} dap_worker_msg_io_t; + +typedef struct dap_worker_msg_callback{ + void (*callback) (dap_worker_t *); // Callback for specific client operations +} dap_worker_msg_callback_t; + int dap_worker_init( size_t a_conn_timeout ); void dap_worker_deinit(); diff --git a/dap-sdk/net/server-udp/dap_udp_server.c b/dap-sdk/net/server-udp/dap_udp_server.c index 6e8c40915e..d02ac6a629 100644 --- a/dap-sdk/net/server-udp/dap_udp_server.c +++ b/dap-sdk/net/server-udp/dap_udp_server.c @@ -243,7 +243,7 @@ int check_close( dap_events_socket_t *client ) LL_DELETE( udp_server->waiting_clients, client_check ); } - dap_events_socket_remove_and_delete_mt( client ); + dap_events_socket_remove_and_delete_mt(client->worker, client ); return 1; } diff --git a/dap-sdk/net/stream/ch/dap_stream_ch.c b/dap-sdk/net/stream/ch/dap_stream_ch.c index 2dd940e218..70577caa6a 100644 --- a/dap-sdk/net/stream/ch/dap_stream_ch.c +++ b/dap-sdk/net/stream/ch/dap_stream_ch.c @@ -42,6 +42,7 @@ #include "dap_stream_ch.h" #include "dap_stream_ch_proc.h" #include "dap_stream_ch_pkt.h" +#include "dap_stream_worker.h" #define LOG_TAG "dap_stream_ch" @@ -88,27 +89,36 @@ dap_stream_ch_t* dap_stream_ch_new(dap_stream_t* a_stream, uint8_t id) { stream_ch_proc_t * proc=stream_ch_proc_find(id); if(proc){ - dap_stream_ch_t* ret = DAP_NEW_Z(dap_stream_ch_t); - ret->stream = a_stream; - ret->proc = proc; - ret->ready_to_read = true; + dap_stream_ch_t* l_ch_new = DAP_NEW_Z(dap_stream_ch_t); + l_ch_new->me = l_ch_new; + l_ch_new->stream = a_stream; + l_ch_new->proc = proc; + l_ch_new->ready_to_read = true; - pthread_mutex_init(&(ret->mutex),NULL); - if(ret->proc->new_callback) - ret->proc->new_callback(ret,NULL); + // Init on stream worker + dap_stream_worker_t * l_stream_worker = DAP_STREAM_WORKER( a_stream->esocket->worker ); + l_ch_new->stream_worker = l_stream_worker; + HASH_ADD(hh_worker,l_stream_worker->channels, me,sizeof (void*),l_ch_new); + + pthread_mutex_init(&(l_ch_new->mutex),NULL); + + // Proc new callback + if(l_ch_new->proc->new_callback) + l_ch_new->proc->new_callback(l_ch_new,NULL); pthread_rwlock_wrlock(&a_stream->rwlock); - a_stream->channel[ret->stream->channel_count] = ret; + a_stream->channel[l_ch_new->stream->channel_count] = l_ch_new; a_stream->channel_count++; pthread_rwlock_unlock(&a_stream->rwlock); struct dap_stream_ch_table_t *l_new_ch = DAP_NEW_Z(struct dap_stream_ch_table_t); - l_new_ch->ch = ret; + l_new_ch->ch = l_ch_new; pthread_mutex_lock(&s_ch_table_lock); HASH_ADD_PTR(s_ch_table, ch, l_new_ch); pthread_mutex_unlock(&s_ch_table_lock); - return ret; + + return l_ch_new; }else{ log_it(L_WARNING, "Unknown stream processor with id %uc",id); return NULL; @@ -135,6 +145,10 @@ struct dap_stream_ch_table_t *dap_stream_ch_valid(dap_stream_ch_t *a_ch) */ void dap_stream_ch_delete(dap_stream_ch_t *a_ch) { + dap_stream_worker_t * l_stream_worker = DAP_STREAM_WORKER( a_ch->stream->esocket->worker ); + HASH_DELETE(hh_worker,l_stream_worker->channels, a_ch); + + pthread_mutex_lock(&s_ch_table_lock); struct dap_stream_ch_table_t *l_ret;; HASH_FIND_PTR(s_ch_table, &a_ch, l_ret); diff --git a/dap-sdk/net/stream/ch/dap_stream_ch_pkt.c b/dap-sdk/net/stream/ch/dap_stream_ch_pkt.c index fc2f24cb9d..850a388a79 100644 --- a/dap-sdk/net/stream/ch/dap_stream_ch_pkt.c +++ b/dap-sdk/net/stream/ch/dap_stream_ch_pkt.c @@ -47,6 +47,7 @@ #include "dap_stream_ch_pkt.h" #include "dap_stream_ch_proc.h" #include "dap_stream_pkt.h" +#include "dap_stream_worker.h" #define LOG_TAG "dap_stream_ch_pkt" @@ -72,8 +73,36 @@ void dap_stream_ch_pkt_deinit() * @param a_str * @return */ -size_t dap_stream_ch_pkt_write_f_mt(dap_events_socket_t *a_es, dap_enc_key_t *a_key, uint8_t a_type, const char * a_str,...) +size_t dap_stream_ch_pkt_write_f_mt(dap_stream_worker_t * a_worker , dap_stream_ch_t *a_ch, uint8_t a_type, const char * a_format,...) { + va_list ap; + va_start(ap,a_format); + int l_data_size = dap_vsnprintf(NULL,0,a_format,ap); + if (l_data_size <0 ){ + log_it(L_ERROR,"Can't write out formatted data '%s' with values",a_format); + return 0; + } + l_data_size++; // To calc trailing zero + dap_stream_worker_msg_io_t * l_msg = DAP_NEW_Z(dap_stream_worker_msg_io_t); + l_msg->ch = a_ch; + l_msg->ch_pkt_type = a_type; + l_msg->data = DAP_NEW_SIZE(void,l_data_size); + l_msg->flags_set = DAP_SOCK_READY_TO_WRITE; + l_data_size = dap_vsnprintf(l_msg->data,0,a_format,ap); + if (l_data_size <0 ){ + log_it(L_ERROR,"Can't write out formatted data '%s' with values",a_format); + DAP_DELETE(l_msg); + return 0; + } + l_data_size++; + l_msg->data_size = l_data_size; + int l_ret= dap_worker_queue_send_ptr(a_worker->queue_ch_io , l_msg ); + if (l_ret!=0){ + log_it(L_ERROR, "Wasn't send pointer to queue: code %d", l_ret); + DAP_DELETE(l_msg); + return 0; + } + return l_data_size; } @@ -85,9 +114,22 @@ size_t dap_stream_ch_pkt_write_f_mt(dap_events_socket_t *a_es, dap_enc_key_t *a_ * @param a_data_size * @return */ -size_t dap_stream_ch_pkt_write_mt(dap_events_socket_t *a_es, dap_enc_key_t *a_key, uint8_t a_type, const void * a_data, size_t a_data_size) +size_t dap_stream_ch_pkt_write_mt(dap_stream_worker_t * a_worker , dap_stream_ch_t *a_ch, uint8_t a_type, const void * a_data, size_t a_data_size) { - + dap_stream_worker_msg_io_t * l_msg = DAP_NEW_Z(dap_stream_worker_msg_io_t); + l_msg->ch = a_ch; + l_msg->ch_pkt_type = a_type; + l_msg->data = DAP_NEW_SIZE(void,a_data_size); + l_msg->flags_set = DAP_SOCK_READY_TO_WRITE; + l_msg->data_size = a_data_size; + memcpy( l_msg->data, a_data, a_data_size); + int l_ret= dap_worker_queue_send_ptr(a_worker->queue_ch_io , l_msg ); + if (l_ret!=0){ + log_it(L_ERROR, "Wasn't send pointer to queue: code %d", l_ret); + DAP_DELETE(l_msg); + return 0; + } + return a_data_size; } @@ -98,7 +140,7 @@ size_t dap_stream_ch_pkt_write_mt(dap_events_socket_t *a_es, dap_enc_key_t *a_ke * @param data_size * @return */ -size_t dap_stream_ch_pkt_write_unsafe(struct dap_stream_ch * a_ch, uint8_t a_type, const void * a_data, size_t a_data_size) +size_t dap_stream_ch_pkt_write_unsafe(dap_stream_ch_t * a_ch, uint8_t a_type, const void * a_data, size_t a_data_size) { if (! a_data_size){ log_it(L_WARNING,"Zero data size to write out in channel"); diff --git a/dap-sdk/net/stream/ch/include/dap_stream_ch.h b/dap-sdk/net/stream/ch/include/dap_stream_ch.h index 87136dbbb5..dc1e862802 100644 --- a/dap-sdk/net/stream/ch/include/dap_stream_ch.h +++ b/dap-sdk/net/stream/ch/include/dap_stream_ch.h @@ -23,8 +23,9 @@ #include <stdbool.h> #include <pthread.h> #include <stdint.h> - +#include "uthash.h" typedef struct dap_stream dap_stream_t; +typedef struct dap_stream_worker dap_stream_worker_t; typedef struct dap_stream_pkt dap_stream_pkt_t; typedef struct dap_stream_ch_proc dap_stream_ch_proc_t; typedef struct dap_stream_ch dap_stream_ch_t; @@ -39,6 +40,7 @@ typedef struct dap_stream_ch{ bool ready_to_write; bool ready_to_read; dap_stream_t * stream; + dap_stream_worker_t * stream_worker; struct{ uint64_t bytes_write; uint64_t bytes_read; @@ -48,6 +50,8 @@ typedef struct dap_stream_ch{ dap_stream_ch_proc_t * proc; void * internal; + struct dap_stream_ch *me; + UT_hash_handle hh_worker; } dap_stream_ch_t; int dap_stream_ch_init(); diff --git a/dap-sdk/net/stream/ch/include/dap_stream_ch_pkt.h b/dap-sdk/net/stream/ch/include/dap_stream_ch_pkt.h index 4550566c63..f2c986ca2f 100644 --- a/dap-sdk/net/stream/ch/include/dap_stream_ch_pkt.h +++ b/dap-sdk/net/stream/ch/include/dap_stream_ch_pkt.h @@ -28,9 +28,11 @@ #include <stddef.h> #include "dap_enc_key.h" + typedef struct dap_stream_ch dap_stream_ch_t; typedef struct dap_stream_session dap_stream_session_t; typedef struct dap_events_socket dap_events_socket_t; +typedef struct dap_stream_worker dap_stream_worker_t; typedef struct dap_stream_ch_pkt_hdr{ uint8_t id; // Channel id uint8_t enc_type; // Zero if not encrypted @@ -52,5 +54,5 @@ void dap_stream_ch_pkt_deinit(); size_t dap_stream_ch_pkt_write_f_unsafe(struct dap_stream_ch * a_ch, uint8_t a_type, const char * a_str,...); size_t dap_stream_ch_pkt_write_unsafe(struct dap_stream_ch * a_ch, uint8_t a_type, const void * a_data, size_t a_data_size); -size_t dap_stream_ch_pkt_write_f_mt(dap_events_socket_t *a_es, dap_enc_key_t *a_key, uint8_t a_type, const char * a_str,...); -size_t dap_stream_ch_pkt_write_mt(dap_events_socket_t *a_es, dap_enc_key_t *a_key, uint8_t a_type, const void * a_data, size_t a_data_size); +size_t dap_stream_ch_pkt_write_f_mt(dap_stream_worker_t * a_worker , dap_stream_ch_t *a_ch, uint8_t a_type, const char * a_str,...); +size_t dap_stream_ch_pkt_write_mt(dap_stream_worker_t * a_worker , dap_stream_ch_t *a_ch, uint8_t a_type, const void * a_data, size_t a_data_size); diff --git a/dap-sdk/net/stream/stream/dap_stream.c b/dap-sdk/net/stream/stream/dap_stream.c index a4fa03a5e8..d98586360a 100644 --- a/dap-sdk/net/stream/stream/dap_stream.c +++ b/dap-sdk/net/stream/stream/dap_stream.c @@ -49,9 +49,9 @@ #include "dap_http_client.h" #include "dap_http_header.h" #include "dap_udp_server.h" +#include "dap_stream_worker.h" - -#define LOG_TAG "stream" +#define LOG_TAG "dap_stream" #define HEADER_WITH_SIZE_FIELD 12 //This count of bytes enough for allocate memory for stream packet void stream_proc_pkt_in(dap_stream_t * sid); @@ -120,10 +120,13 @@ int dap_stream_init( bool a_dump_packet_headers) log_it(L_CRITICAL, "Can't init channel types submodule"); return -1; } - s_dump_packet_headers = a_dump_packet_headers; + if( dap_stream_worker_init() != 0 ){ + log_it(L_CRITICAL, "Can't init stream worker extention submodule"); + return -2; + } + s_dump_packet_headers = a_dump_packet_headers; s_keep_alive_loop_quit_signal = false; - pthread_mutex_init( &s_mutex_keepalive_list, NULL ); //pthread_create( &keepalive_thread, NULL, stream_loop, NULL ); @@ -754,7 +757,7 @@ void stream_proc_pkt_in(dap_stream_t * a_stream) { dap_stream_ch_pkt_t * l_ch_pkt = (dap_stream_ch_pkt_t *) a_stream->pkt_cache; - if(dap_stream_pkt_read(a_stream,l_pkt, l_ch_pkt, sizeof(a_stream->pkt_cache))==0){ + if(dap_stream_pkt_read_unsafe(a_stream,l_pkt, l_ch_pkt, sizeof(a_stream->pkt_cache))==0){ log_it(L_WARNING, "Input: can't decode packet size=%d",l_pkt_size); DAP_DELETE(l_pkt); return; diff --git a/dap-sdk/net/stream/stream/dap_stream_pkt.c b/dap-sdk/net/stream/stream/dap_stream_pkt.c index 38a72dc3cb..135ee830ff 100644 --- a/dap-sdk/net/stream/stream/dap_stream_pkt.c +++ b/dap-sdk/net/stream/stream/dap_stream_pkt.c @@ -41,6 +41,7 @@ #include "dap_events_socket.h" +#include "dap_worker.h" #include "dap_http_client.h" #include "dap_enc.h" @@ -106,7 +107,7 @@ static size_t s_encode_dummy(const void * a_buf, size_t a_buf_size, void * a_buf * @param pkt * @param buf_out */ -size_t dap_stream_pkt_read( dap_stream_t * a_stream, dap_stream_pkt_t * a_pkt, void * a_buf_out, size_t a_buf_out_size) +size_t dap_stream_pkt_read_unsafe( dap_stream_t * a_stream, dap_stream_pkt_t * a_pkt, void * a_buf_out, size_t a_buf_out_size) { size_t ds = a_stream->session->key->dec_na(a_stream->session->key,a_pkt->data,a_pkt->hdr.size,a_buf_out, a_buf_out_size); // log_it(L_DEBUG,"Stream decoded %lu bytes ( last bytes 0x%02x 0x%02x 0x%02x 0x%02x ) ", ds, @@ -161,9 +162,9 @@ size_t dap_stream_pkt_write_unsafe(dap_stream_t * a_stream, const void * a_data, * @param a_data_size * @return */ -size_t dap_stream_pkt_write_mt(dap_events_socket_t *a_es, dap_enc_key_t *a_key, const void * a_data, size_t a_data_size) +size_t dap_stream_pkt_write_mt(dap_worker_t * a_w,dap_events_socket_t *a_es, dap_enc_key_t *a_key, const void * a_data, size_t a_data_size) { - dap_events_socket_mgs_t * l_msg = DAP_NEW_Z(dap_events_socket_mgs_t); + dap_worker_msg_io_t * l_msg = DAP_NEW_Z(dap_worker_msg_io_t); stream_pkt_hdr_t *l_pkt_hdr; l_msg->data_size = 16-a_data_size%16+a_data_size+sizeof(*l_pkt_hdr); l_msg->data = DAP_NEW_SIZE(void,l_msg->data_size); @@ -171,8 +172,10 @@ size_t dap_stream_pkt_write_mt(dap_events_socket_t *a_es, dap_enc_key_t *a_key, memset(l_pkt_hdr,0,sizeof(*l_pkt_hdr)); memcpy(l_pkt_hdr->sig,c_dap_stream_sig,sizeof(l_pkt_hdr->sig)); l_msg->data_size=sizeof (*l_pkt_hdr) +dap_enc_code(a_key, a_data,a_data_size, ((byte_t*)l_msg->data)+sizeof (*l_pkt_hdr),l_msg->data_size-sizeof (*l_pkt_hdr),DAP_ENC_DATA_TYPE_RAW); - if (write(a_es->fd, l_msg,sizeof (l_msg)) != sizeof (l_msg) ){ - log_it(L_ERROR, "Wasn't send msg pointer to queue"); + + int l_ret= dap_worker_queue_send_ptr(a_w->queue_es_io, l_msg ); + if (l_ret!=0){ + log_it(L_ERROR, "Wasn't send pointer to queue: code %d", l_ret); DAP_DELETE(l_msg); return 0; } diff --git a/dap-sdk/net/stream/stream/dap_stream_worker.c b/dap-sdk/net/stream/stream/dap_stream_worker.c new file mode 100644 index 0000000000..5479b71753 --- /dev/null +++ b/dap-sdk/net/stream/stream/dap_stream_worker.c @@ -0,0 +1,84 @@ +/* + * Authors: + * Dmitriy A. Gearasimov <gerasimov.dmitriy@demlabs.net> + * DeM Labs Ltd. https://demlabs.net + * Copyright (c) 2020 + * All rights reserved. + + This file is part of DAP SDK the open source project + + DAP SDK is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + DAP SDK is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with any DAP SDK based project. If not, see <http://www.gnu.org/licenses/>. +*/ +#include "dap_common.h" +#include "dap_events.h" +#include "dap_events_socket.h" +#include "dap_stream_worker.h" +#include "dap_stream_ch_pkt.h" + +#define LOG_TAG "dap_stream_worker" + +static void s_ch_io_callback(dap_events_socket_t * a_es, void * a_msg); + +/** + * @brief dap_stream_worker_init + * @return + */ +int dap_stream_worker_init() +{ + uint32_t l_worker_count = dap_events_worker_get_count(); + for (uint32_t i = 0; i < l_worker_count; i++){ + dap_worker_t * l_worker = dap_events_worker_get(i); + if (l_worker->_inheritor){ + log_it(L_CRITICAL,"Can't init stream worker, core worker has already inheritor"); + return -1; + } + dap_stream_worker_t *l_stream_worker = DAP_NEW_Z(dap_stream_worker_t); + l_worker->_inheritor = l_stream_worker; + l_stream_worker->worker = l_worker; + l_stream_worker->queue_ch_io = dap_events_socket_create_type_queue_ptr_mt( l_worker, s_ch_io_callback); + } + return 0; +} + +/** + * @brief s_ch_io_callback + * @param a_es + * @param a_msg + */ +static void s_ch_io_callback(dap_events_socket_t * a_es, void * a_msg) +{ + dap_stream_worker_t * l_stream_worker = DAP_STREAM_WORKER( a_es->worker ); + dap_stream_worker_msg_io_t * l_msg = (dap_stream_worker_msg_io_t*) a_msg; + + // Check if it was removed from the list + dap_stream_ch_t *l_msg_ch = NULL; + HASH_FIND(hh_worker, l_stream_worker->channels , &l_msg->ch , sizeof (void*), l_msg_ch ); + if ( l_msg_ch == NULL){ + log_it(L_DEBUG, "We got i/o message for client thats now not in list. Lost %u data", l_msg->data_size); + DAP_DELETE(l_msg); + return; + } + + if (l_msg->flags_set & DAP_SOCK_READY_TO_READ) + dap_stream_ch_set_ready_to_read_unsafe(l_msg_ch, true); + if (l_msg->flags_unset & DAP_SOCK_READY_TO_READ) + dap_stream_ch_set_ready_to_read_unsafe(l_msg_ch, false); + if (l_msg->flags_set & DAP_SOCK_READY_TO_WRITE) + dap_stream_ch_set_ready_to_write_unsafe(l_msg_ch, true); + if (l_msg->flags_unset & DAP_SOCK_READY_TO_WRITE) + dap_stream_ch_set_ready_to_write_unsafe(l_msg_ch, false); + if (l_msg->data_size && l_msg->data) + dap_stream_ch_pkt_write_unsafe(l_msg_ch, l_msg->ch_pkt_type, l_msg->data,l_msg->data_size); + DAP_DELETE(l_msg); +} diff --git a/dap-sdk/net/stream/stream/include/dap_stream_pkt.h b/dap-sdk/net/stream/stream/include/dap_stream_pkt.h index c16cc5efb4..34beb4e381 100644 --- a/dap-sdk/net/stream/stream/include/dap_stream_pkt.h +++ b/dap-sdk/net/stream/stream/include/dap_stream_pkt.h @@ -23,7 +23,7 @@ #include <stddef.h> #include "dap_enc_key.h" #include "dap_events_socket.h" -#define STREAM_PKT_SIZE_MAX 500000 +#define STREAM_PKT_SIZE_MAX 100000 typedef struct dap_stream dap_stream_t; typedef struct dap_stream_session dap_stream_session_t; #define STREAM_PKT_TYPE_DATA_PACKET 0x00 @@ -54,10 +54,10 @@ extern const uint8_t c_dap_stream_sig[8]; dap_stream_pkt_t * dap_stream_pkt_detect(void * a_data, size_t data_size); -size_t dap_stream_pkt_read(dap_stream_t * a_stream, dap_stream_pkt_t * a_pkt, void * a_buf_out, size_t a_buf_out_size); +size_t dap_stream_pkt_read_unsafe(dap_stream_t * a_stream, dap_stream_pkt_t * a_pkt, void * a_buf_out, size_t a_buf_out_size); size_t dap_stream_pkt_write_unsafe(dap_stream_t * a_stream, const void * data, size_t a_data_size); -size_t dap_stream_pkt_write_mt (dap_events_socket_t *a_es, dap_enc_key_t *a_key, const void * data, size_t a_data_size); +size_t dap_stream_pkt_write_mt (dap_worker_t * a_w, dap_events_socket_t *a_es, dap_enc_key_t *a_key, const void * data, size_t a_data_size); void dap_stream_send_keepalive( dap_stream_t * a_stream); diff --git a/dap-sdk/net/stream/stream/include/dap_stream_worker.h b/dap-sdk/net/stream/stream/include/dap_stream_worker.h new file mode 100644 index 0000000000..3058cdf8d6 --- /dev/null +++ b/dap-sdk/net/stream/stream/include/dap_stream_worker.h @@ -0,0 +1,44 @@ +/* + * Authors: + * Dmitriy A. Gearasimov <gerasimov.dmitriy@demlabs.net> + * DeM Labs Ltd. https://demlabs.net + * Copyright (c) 2020 + * All rights reserved. + + This file is part of DAP SDK the open source project + + DAP SDK is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + DAP SDK is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with any DAP SDK based project. If not, see <http://www.gnu.org/licenses/>. +*/ +#pragma once +#include "dap_worker.h" +#include "dap_stream_ch.h" + +typedef struct dap_stream_worker { + dap_worker_t * worker; + dap_events_socket_t *queue_ch_io; // IO queue for channels + dap_stream_ch_t * channels; // Client channels assigned on worker. Unsafe list, operate only in worker's context +} dap_stream_worker_t; + +#define DAP_STREAM_WORKER(a) ((dap_stream_worker_t*) (a->_inheritor) ) + +typedef struct dap_stream_worker_msg_io { + dap_stream_ch_t * ch; // Channel that has operations with + uint32_t flags_set; // set flags + uint32_t flags_unset; // unset flags + uint8_t ch_pkt_type; + void * data; + size_t data_size; +} dap_stream_worker_msg_io_t; + +int dap_stream_worker_init(); diff --git a/modules/service/vpn/dap_chain_net_srv_vpn.c b/modules/service/vpn/dap_chain_net_srv_vpn.c index 497df93328..dc817eaa14 100644 --- a/modules/service/vpn/dap_chain_net_srv_vpn.c +++ b/modules/service/vpn/dap_chain_net_srv_vpn.c @@ -753,8 +753,7 @@ static void s_tun_create(void) static void s_tun_destroy(void) { pthread_rwlock_wrlock(& s_raw_server_rwlock); - dap_events_socket_remove_and_delete_mt(s_raw_server->tun_events_socket); - close(s_raw_server->tun_fd); + dap_events_socket_remove_and_delete_mt(s_raw_server->tun_events_socket->worker, s_raw_server->tun_events_socket); s_raw_server->tun_fd = -1; pthread_rwlock_unlock(& s_raw_server_rwlock); } @@ -1764,7 +1763,6 @@ void m_es_tun_delete(dap_events_socket_t * a_es, void * arg) { log_it(L_WARNING, __PRETTY_FUNCTION__); log_it(L_NOTICE, "Raw sockets listen thread is stopped"); - dap_events_socket_remove_and_delete_mt(s_raw_server->tun_events_socket); s_tun_destroy(); } diff --git a/modules/service/vpn/dap_chain_net_vpn_client_tun.c b/modules/service/vpn/dap_chain_net_vpn_client_tun.c index 1b7213821d..b5012c6ece 100644 --- a/modules/service/vpn/dap_chain_net_vpn_client_tun.c +++ b/modules/service/vpn/dap_chain_net_vpn_client_tun.c @@ -572,7 +572,7 @@ int dap_chain_net_vpn_client_tun_delete(void) if(is_dap_tun_in_worker()) { pthread_mutex_lock(&s_clients_mutex); - dap_events_socket_remove_and_delete_mt(s_tun_events_socket); + dap_events_socket_remove_and_delete_mt(s_tun_events_socket->worker, s_tun_events_socket); s_tun_events_socket = NULL; pthread_mutex_unlock(&s_clients_mutex); } -- GitLab