From 73c8c06cb3853629826d0e4f1069e862fd0523ad Mon Sep 17 00:00:00 2001 From: "Dmitriy A. Gerasimov" <dmitriy.gerasimov@demlabs.net> Date: Mon, 2 Nov 2020 19:15:08 +0700 Subject: [PATCH] [!] Some updates and rework in reactor --- CMakeLists.txt | 2 +- dap-sdk/net/client/dap_client.c | 4 +- dap-sdk/net/client/dap_client_http.c | 11 +- dap-sdk/net/client/dap_client_pvt.c | 5 +- dap-sdk/net/core/dap_events_socket.c | 128 ++++++++++-------- dap-sdk/net/core/dap_proc_queue.c | 2 +- dap-sdk/net/core/dap_proc_thread.c | 2 +- dap-sdk/net/core/dap_server.c | 2 +- dap-sdk/net/core/dap_worker.c | 54 ++++---- dap-sdk/net/core/include/dap_events_socket.h | 18 ++- dap-sdk/net/core/include/dap_worker.h | 1 - .../net/server/http_server/dap_http_folder.c | 4 +- .../net/server/http_server/dap_http_simple.c | 4 +- .../http_server/http_client/dap_http_client.c | 2 +- dap-sdk/net/stream/ch/dap_stream_ch_pkt.c | 4 +- dap-sdk/net/stream/stream/dap_stream.c | 2 +- dap-sdk/net/stream/stream/dap_stream_worker.c | 8 +- .../dap_chain_global_db_driver_cdb.c | 10 +- modules/net/dap_chain_net.c | 3 +- modules/net/dap_chain_node_client.c | 2 +- modules/service/vpn/dap_chain_net_srv_vpn.c | 2 +- 21 files changed, 142 insertions(+), 128 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 2e4c57375c..73fbcb53e5 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,7 +2,7 @@ project(cellframe-sdk C) cmake_minimum_required(VERSION 2.8) set(CMAKE_C_STANDARD 11) -set(CELLFRAME_SDK_NATIVE_VERSION "2.6-29") +set(CELLFRAME_SDK_NATIVE_VERSION "2.7-0") add_definitions ("-DCELLFRAME_SDK_VERSION=\"${CELLFRAME_SDK_NATIVE_VERSION}\"") set(DAPSDK_MODULES "") diff --git a/dap-sdk/net/client/dap_client.c b/dap-sdk/net/client/dap_client.c index f70f5132dc..5b62c16506 100644 --- a/dap-sdk/net/client/dap_client.c +++ b/dap-sdk/net/client/dap_client.c @@ -314,13 +314,15 @@ void dap_client_go_stage(dap_client_t * a_client, dap_client_stage_t a_stage_tar return; } dap_client_pvt_t * l_client_pvt = DAP_CLIENT_PVT(a_client); - assert(l_client_pvt); + log_it(L_DEBUG,"Client %p go to stage %s", a_client, dap_client_stage_str(a_stage_target) ); + struct go_stage_arg *l_stage_arg = DAP_NEW_Z(struct go_stage_arg); l_stage_arg->stage_end_callback = a_stage_end_callback; l_stage_arg->stage_target = a_stage_target; l_stage_arg->client_pvt = l_client_pvt; + dap_worker_exec_callback_on(l_client_pvt->worker, s_go_stage_on_client_worker_unsafe, l_stage_arg); } diff --git a/dap-sdk/net/client/dap_client_http.c b/dap-sdk/net/client/dap_client_http.c index 444a10b456..c94fe5f370 100644 --- a/dap-sdk/net/client/dap_client_http.c +++ b/dap-sdk/net/client/dap_client_http.c @@ -166,7 +166,7 @@ static void s_http_read(dap_events_socket_t * a_es, void * arg) s_client_http_delete(l_client_http_internal); a_es->_inheritor = NULL; - a_es->flags |= DAP_SOCK_SIGNAL_CLOSE; + a_es->flags |= DAP_ESOCK_SIGNAL_CLOSE; } } @@ -183,7 +183,7 @@ static void s_http_error(dap_events_socket_t * a_es, int a_errno) strerror_r(a_errno, l_errbuf, sizeof (l_errbuf)); else strncpy(l_errbuf,"Unknown Error", sizeof (l_errbuf)-1); - if (a_es->flags & DAP_SOCK_CONNECTING) + if (a_es->flags & DAP_ESOCK_CONNECTING) log_it(L_WARNING, "Socket connecting error: %s (code %d)" , l_errbuf, a_errno); else log_it(L_WARNING, "Socket error: %s (code %d)" , l_errbuf, a_errno); @@ -199,7 +199,7 @@ static void s_http_error(dap_events_socket_t * a_es, int a_errno) s_client_http_delete(l_client_http_internal); a_es->_inheritor = NULL; // close connection. - a_es->flags |= DAP_SOCK_SIGNAL_CLOSE; + a_es->flags |= DAP_ESOCK_SIGNAL_CLOSE; } /** @@ -253,7 +253,7 @@ void* dap_client_http_request_custom(dap_worker_t * a_worker,const char *a_uplin void *a_obj, char **a_custom, size_t a_custom_count) { - //log_it(L_DEBUG, "HTTP request on url '%s:%d'", a_uplink_addr, a_uplink_port); + log_it(L_DEBUG, "HTTP request on url '%s:%d'", a_uplink_addr, a_uplink_port); static dap_events_socket_callbacks_t l_s_callbacks = { .connected_callback = s_http_connected, .read_callback = s_http_read, @@ -326,7 +326,7 @@ void* dap_client_http_request_custom(dap_worker_t * a_worker,const char *a_uplin // connect l_ev_socket->remote_addr.sin_family = AF_INET; l_ev_socket->remote_addr.sin_port = htons(a_uplink_port); - l_ev_socket->flags |= DAP_SOCK_CONNECTING; + l_ev_socket->flags |= DAP_ESOCK_CONNECTING; int l_err = connect(l_socket, (struct sockaddr *) &l_ev_socket->remote_addr, sizeof(struct sockaddr_in)); if (l_err == 0){ log_it(L_DEBUG, "Connected momentaly with %s:%u!", a_uplink_addr, a_uplink_port); @@ -421,6 +421,7 @@ static void s_http_connected(dap_events_socket_t * a_esocket) if(!l_get_str) dap_events_socket_write_unsafe( a_esocket, l_http_pvt->request, l_http_pvt->request_size); DAP_DELETE(l_get_str); + dap_string_free(l_request_headers, true); } diff --git a/dap-sdk/net/client/dap_client_pvt.c b/dap-sdk/net/client/dap_client_pvt.c index 456294610c..ee75dac1cf 100644 --- a/dap-sdk/net/client/dap_client_pvt.c +++ b/dap-sdk/net/client/dap_client_pvt.c @@ -331,8 +331,7 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) };// a_client_pvt->stream_es = dap_events_socket_wrap_no_add(a_client_pvt->events, a_client_pvt->stream_socket, &l_s_callbacks); - a_client_pvt->stream_es->flags |= DAP_SOCK_CONNECTING ; // To catch non-blocking error when connecting we should ar WRITE flag - + a_client_pvt->stream_es->flags = DAP_ESOCK_CONNECTING ; // To catch non-blocking error when connecting we should ar WRITE flag a_client_pvt->stream_es->_inheritor = a_client_pvt; a_client_pvt->stream = dap_stream_new_es_client(a_client_pvt->stream_es); assert(a_client_pvt->stream); @@ -1218,7 +1217,7 @@ static void s_stream_es_callback_error(dap_events_socket_t * a_es, int a_error) log_it(L_WARNING, "STREAM error \"%s\" (code %d)", l_errbuf, a_error); - l_client_pvt->stream_es->flags |= DAP_SOCK_SIGNAL_CLOSE; + l_client_pvt->stream_es->flags |= DAP_ESOCK_SIGNAL_CLOSE; if (a_error == ETIMEDOUT) { l_client_pvt->last_error = ERROR_NETWORK_CONNECTION_TIMEOUT; diff --git a/dap-sdk/net/core/dap_events_socket.c b/dap-sdk/net/core/dap_events_socket.c index 1985ba9985..a52695c88a 100644 --- a/dap-sdk/net/core/dap_events_socket.c +++ b/dap-sdk/net/core/dap_events_socket.c @@ -53,6 +53,20 @@ #define LOG_TAG "dap_events_socket" +typedef struct dap_events_socket_pvt +{ + dap_events_socket_t * pipe_out; // Pipe socket with data for output + // Related sockets (be careful - possible problems, delete them before ) + dap_events_socket_t ** workers_es; // If not NULL - on every worker must be present + size_t workers_es_size; // events socket with same socket + +} dap_events_socket_pvt_t; + +static dap_events_socket_t *s_new(); +static void s_delete(dap_events_socket_t *); + +#define PVT(a) ((dap_events_socket_pvt_t*)(a)->_pvt) + /** * @brief dap_events_socket_init Init clients module * @return Zero if ok others if no @@ -82,6 +96,36 @@ void dap_events_socket_deinit( ) { } +/** + * @brief s_new + * @return + */ +static dap_events_socket_t *s_new() +{ + dap_events_socket_t *l_ret = DAP_NEW_Z_SIZE( dap_events_socket_t , sizeof (dap_events_socket_t)+ sizeof (dap_events_socket_pvt_t)); + PVT(l_ret)->workers_es_size = dap_events_worker_get_count(); + PVT(l_ret)->workers_es = DAP_NEW_Z_SIZE(dap_events_socket_t*,PVT(l_ret)->workers_es_size*sizeof (dap_events_socket_t*) ); + l_ret->flags = DAP_ESOCK_READY_TO_READ; +#if defined(DAP_EVENTS_CAPS_EPOLL) + l_ret->ev_base_flags = EPOLLERR | EPOLLRDHUP | EPOLLHUP; +#elif defined(DAP_EVENTS_CAPS_POLL) + l_ret->poll_base_flags = POLLERR | POLLRDHUP | POLLHUP; +#else +#error "Not defined dap_events_socket_t::s_new() for your platform" +#endif + return l_ret; +} + +/** + * @brief s_delete + * @param a_esocket + */ +static void s_delete(dap_events_socket_t * a_esocket) +{ + if ( PVT(a_esocket)->workers_es && PVT(a_esocket)->workers_es_size ) + DAP_DELETE( PVT(a_esocket)->workers_es); + DAP_DELETE(a_esocket); +} /** * @brief dap_events_socket_wrap @@ -97,18 +141,11 @@ dap_events_socket_t *dap_events_socket_wrap_no_add( dap_events_t *a_events, assert(a_events); assert(a_callbacks); - dap_events_socket_t *ret = DAP_NEW_Z( dap_events_socket_t ); + dap_events_socket_t *ret = s_new(); ret->socket = a_sock; ret->events = a_events; memcpy(&ret->callbacks, a_callbacks, sizeof(ret->callbacks) ); - ret->flags = DAP_SOCK_READY_TO_READ; - - #if defined(DAP_EVENTS_CAPS_EPOLL) - ret->ev_base_flags = EPOLLERR | EPOLLRDHUP | EPOLLHUP; - #elif defined(DAP_EVENTS_CAPS_POLL) - ret->poll_base_flags = POLLERR | POLLRDHUP | POLLHUP; - #endif if ( a_sock!= 0 && a_sock != -1){ pthread_rwlock_wrlock(&a_events->sockets_rwlock); @@ -137,7 +174,7 @@ void dap_events_socket_assign_on_worker_mt(dap_events_socket_t * a_es, struct da void dap_events_socket_reassign_between_workers_unsafe(dap_events_socket_t * a_es, dap_worker_t * a_worker_new) { - log_it(L_DEBUG, "reassign between workers"); + log_it(L_DEBUG, "Reassign between workers"); dap_events_socket_remove_from_worker_unsafe( a_es, a_es->worker ); a_es->was_reassigned = true; if (a_es->callbacks.worker_unassign_callback) @@ -164,18 +201,11 @@ void dap_events_socket_reassign_between_workers_mt(dap_worker_t * a_worker_old, dap_events_socket_t * s_create_type_pipe(dap_worker_t * a_w, dap_events_socket_callback_t a_callback, uint32_t a_flags) { UNUSED(a_flags); - dap_events_socket_t * l_es = DAP_NEW_Z(dap_events_socket_t); + dap_events_socket_t * l_es = s_new(); l_es->type = DESCRIPTOR_TYPE_PIPE; l_es->worker = a_w; l_es->events = a_w->events; l_es->callbacks.read_callback = a_callback; // Arm event callback -#if defined(DAP_EVENTS_CAPS_EPOLL) - l_es->ev_base_flags = EPOLLIN | EPOLLERR | EPOLLRDHUP | EPOLLHUP; -#elif defined(DAP_EVENTS_CAPS_POLL) - l_es->poll_base_flags = POLLIN | POLLERR | POLLRDHUP | POLLHUP; -#else -#error "Not defined s_create_type_pipe for your platform" -#endif #if defined(DAP_EVENTS_CAPS_PIPE_POSIX) int l_pipe[2]; @@ -237,24 +267,16 @@ dap_events_socket_t * dap_events_socket_create_type_pipe_unsafe(dap_worker_t * a */ dap_events_socket_t * s_create_type_queue_ptr(dap_worker_t * a_w, dap_events_socket_callback_queue_ptr_t a_callback) { - dap_events_socket_t * l_es = DAP_NEW_Z(dap_events_socket_t); + dap_events_socket_t * l_es = s_new(); l_es->type = DESCRIPTOR_TYPE_QUEUE; - l_es->flags = DAP_SOCK_QUEUE_PTR; + l_es->flags |= DAP_SOCK_QUEUE_PTR; if (a_w){ l_es->events = a_w->events; l_es->worker = a_w; - } - l_es->callbacks.queue_ptr_callback = a_callback; // Arm event callback - -#if defined(DAP_EVENTS_CAPS_EPOLL) - l_es->ev_base_flags = EPOLLIN | EPOLLERR | EPOLLRDHUP | EPOLLHUP; -#elif defined(DAP_EVENTS_CAPS_POLL) - l_es->poll_base_flags = POLLIN | POLLERR | POLLRDHUP | POLLHUP; -#else -#error "Not defined s_create_type_queue_ptr for your platform" -#endif + } + l_es->callbacks.queue_ptr_callback = a_callback; // Arm event callback #ifdef DAP_EVENTS_CAPS_QUEUE_PIPE2 int l_pipe[2]; int l_errno; @@ -283,7 +305,7 @@ dap_events_socket_t * s_create_type_queue_ptr(dap_worker_t * a_w, dap_events_soc fread(l_file_buf, l_file_buf_size, 1, l_sys_max_pipe_size_fd); uint64_t l_sys_max_pipe_size = strtoull(l_file_buf, 0, 10); if (l_sys_max_pipe_size && fcntl(l_pipe[0], F_SETPIPE_SZ, l_sys_max_pipe_size) == l_sys_max_pipe_size) { - log_it(L_DEBUG, "Successfully resized pipe buffer to %lld", l_sys_max_pipe_size); + //log_it(L_DEBUG, "Successfully resized pipe buffer to %lld", l_sys_max_pipe_size); } #elif defined (DAP_EVENTS_CAPS_QUEUE_POSIX) char l_mq_name[64]; @@ -394,20 +416,13 @@ int dap_events_socket_queue_proc_input_unsafe(dap_events_socket_t * a_esocket) */ dap_events_socket_t * s_create_type_event(dap_worker_t * a_w, dap_events_socket_callback_event_t a_callback) { - dap_events_socket_t * l_es = DAP_NEW_Z(dap_events_socket_t); + dap_events_socket_t * l_es = s_new(); l_es->type = DESCRIPTOR_TYPE_EVENT; if (a_w){ l_es->events = a_w->events; l_es->worker = a_w; } l_es->callbacks.event_callback = a_callback; // Arm event callback -#if defined(DAP_EVENTS_CAPS_EPOLL) - l_es->ev_base_flags = EPOLLIN | EPOLLERR | EPOLLRDHUP | EPOLLHUP; -#elif defined(DAP_EVENTS_CAPS_POLL) - l_es->poll_base_flags = POLLIN | POLLERR | POLLRDHUP | POLLHUP; -#else -#error "Not defined s_create_type_event for your platform" -#endif #ifdef DAP_EVENTS_CAPS_EVENT_EVENTFD if((l_es->fd = eventfd(0,EFD_NONBLOCK) ) < 0 ){ @@ -425,8 +440,8 @@ dap_events_socket_t * s_create_type_event(dap_worker_t * a_w, dap_events_socket_ } DAP_DELETE(l_es); return NULL; - }else - log_it(L_DEBUG, "Created eventfd descriptor %d", l_es->fd ); + }//else + // log_it(L_DEBUG, "Created eventfd descriptor %d", l_es->fd ); #endif return l_es; } @@ -660,7 +675,7 @@ dap_events_socket_t * dap_events_socket_wrap2( dap_server_t *a_server, struct da assert( a_server ); //log_it( L_DEBUG,"Dap event socket wrapped around %d sock", a_sock ); - dap_events_socket_t * ret = DAP_NEW_Z( dap_events_socket_t ); + dap_events_socket_t * ret = s_new(); ret->socket = a_sock; ret->events = a_events; @@ -668,7 +683,6 @@ dap_events_socket_t * dap_events_socket_wrap2( dap_server_t *a_server, struct da memcpy(&ret->callbacks,a_callbacks, sizeof ( ret->callbacks) ); - ret->flags = DAP_SOCK_READY_TO_READ; ret->last_time_active = ret->last_ping_request = time( NULL ); pthread_rwlock_wrlock( &a_events->sockets_rwlock ); @@ -724,9 +738,9 @@ void dap_events_socket_worker_poll_update_unsafe(dap_events_socket_t * a_esocket struct pollfd * l_poll = &a_esocket->worker->poll[a_esocket->poll_index]; l_poll->events = a_esocket->poll_base_flags | POLLERR ; // Check & add - if( a_esocket->flags & DAP_SOCK_READY_TO_READ ) + if( a_esocket->flags & DAP_ESOCK_READY_TO_READ ) l_poll->events |= POLLIN; - if( a_esocket->flags & DAP_SOCK_READY_TO_WRITE || a_esocket->flags &DAP_SOCK_CONNECTING ) + if( a_esocket->flags & DAP_ESOCK_READY_TO_WRITE || a_esocket->flags &DAP_ESOCK_CONNECTING ) l_poll->events |= POLLOUT; }else{ log_it(L_ERROR, "Wrong poll index when remove from worker (unsafe): %u when total count %u", a_esocket->poll_index, @@ -745,13 +759,13 @@ void dap_events_socket_worker_poll_update_unsafe(dap_events_socket_t * a_esocket */ void dap_events_socket_set_readable_unsafe( dap_events_socket_t *a_esocket, bool is_ready ) { - if( is_ready == (bool)(a_esocket->flags & DAP_SOCK_READY_TO_READ)) + if( is_ready == (bool)(a_esocket->flags & DAP_ESOCK_READY_TO_READ)) return; if ( is_ready ) - a_esocket->flags |= DAP_SOCK_READY_TO_READ; + a_esocket->flags |= DAP_ESOCK_READY_TO_READ; else - a_esocket->flags ^= DAP_SOCK_READY_TO_READ; + a_esocket->flags ^= DAP_ESOCK_READY_TO_READ; if( a_esocket->worker) dap_events_socket_worker_poll_update_unsafe( a_esocket); @@ -764,14 +778,14 @@ void dap_events_socket_set_readable_unsafe( dap_events_socket_t *a_esocket, bool */ void dap_events_socket_set_writable_unsafe( dap_events_socket_t *a_esocket, bool a_is_ready ) { - if ( a_is_ready == (bool)(a_esocket->flags & DAP_SOCK_READY_TO_WRITE)) { + if ( a_is_ready == (bool)(a_esocket->flags & DAP_ESOCK_READY_TO_WRITE)) { return; } if ( a_is_ready ) - a_esocket->flags |= DAP_SOCK_READY_TO_WRITE; + a_esocket->flags |= DAP_ESOCK_READY_TO_WRITE; else - a_esocket->flags ^= DAP_SOCK_READY_TO_WRITE; + a_esocket->flags ^= DAP_ESOCK_READY_TO_WRITE; if( a_esocket->worker ) dap_events_socket_worker_poll_update_unsafe(a_esocket); @@ -834,7 +848,7 @@ void dap_events_socket_delete_unsafe( dap_events_socket_t * a_esocket , bool a_p #endif } - DAP_DELETE( a_esocket ); + s_delete(a_esocket); } /** @@ -914,9 +928,9 @@ void dap_events_socket_set_readable_mt(dap_worker_t * a_w, dap_events_socket_t * dap_worker_msg_io_t * l_msg = DAP_NEW_Z(dap_worker_msg_io_t); l_msg->esocket = a_es; if (a_is_ready) - l_msg->flags_set = DAP_SOCK_READY_TO_READ; + l_msg->flags_set = DAP_ESOCK_READY_TO_READ; else - l_msg->flags_unset = DAP_SOCK_READY_TO_READ; + l_msg->flags_unset = DAP_ESOCK_READY_TO_READ; int l_ret= dap_events_socket_queue_ptr_send(a_w->queue_es_io, l_msg ); if (l_ret!=0){ @@ -935,9 +949,9 @@ void dap_events_socket_set_writable_mt(dap_worker_t * a_w, dap_events_socket_t * dap_worker_msg_io_t * l_msg = DAP_NEW_Z(dap_worker_msg_io_t); l_msg->esocket = a_es; if (a_is_ready) - l_msg->flags_set = DAP_SOCK_READY_TO_WRITE; + l_msg->flags_set = DAP_ESOCK_READY_TO_WRITE; else - l_msg->flags_unset = DAP_SOCK_READY_TO_WRITE; + l_msg->flags_unset = DAP_ESOCK_READY_TO_WRITE; int l_ret= dap_events_socket_queue_ptr_send(a_w->queue_es_io, l_msg ); if (l_ret!=0){ @@ -959,7 +973,7 @@ size_t dap_events_socket_write_mt(dap_worker_t * a_w,dap_events_socket_t *a_es, l_msg->esocket = a_es; l_msg->data = DAP_NEW_SIZE(void,l_data_size); l_msg->data_size = l_data_size; - l_msg->flags_set = DAP_SOCK_READY_TO_WRITE; + l_msg->flags_set = DAP_ESOCK_READY_TO_WRITE; memcpy( l_msg->data, data, l_data_size); int l_ret= dap_events_socket_queue_ptr_send(a_w->queue_es_io, l_msg ); @@ -992,7 +1006,7 @@ size_t dap_events_socket_write_f_mt(dap_worker_t * a_w,dap_events_socket_t *a_es dap_worker_msg_io_t * l_msg = DAP_NEW_Z(dap_worker_msg_io_t); l_msg->esocket = a_es; l_msg->data = DAP_NEW_SIZE(void,l_data_size + 1); - l_msg->flags_set = DAP_SOCK_READY_TO_WRITE; + l_msg->flags_set = DAP_ESOCK_READY_TO_WRITE; l_data_size = dap_vsprintf(l_msg->data,format,ap_copy); va_end(ap_copy); if (l_data_size <0 ){ diff --git a/dap-sdk/net/core/dap_proc_queue.c b/dap-sdk/net/core/dap_proc_queue.c index 2a6374b2a1..171a121cf2 100644 --- a/dap-sdk/net/core/dap_proc_queue.c +++ b/dap-sdk/net/core/dap_proc_queue.c @@ -82,7 +82,7 @@ static void s_queue_esocket_callback( dap_events_socket_t * a_es, void * a_msg) //log_it( L_DEBUG, "Sent signal to proc thread that we have callbacks on board"); } if (l_msg->signal_kill){ // Say to kill this object and delete its inherior dap_proc_queue_t - a_es->flags |= DAP_SOCK_SIGNAL_CLOSE; + a_es->flags |= DAP_ESOCK_SIGNAL_CLOSE; } DAP_DELETE(l_msg); } diff --git a/dap-sdk/net/core/dap_proc_thread.c b/dap-sdk/net/core/dap_proc_thread.c index 86afea3b6d..6fa891804f 100644 --- a/dap-sdk/net/core/dap_proc_thread.c +++ b/dap-sdk/net/core/dap_proc_thread.c @@ -287,7 +287,7 @@ static void * s_proc_thread_function(void * a_arg) default:{ log_it(L_ERROR, "Unprocessed descriptor type accepted in proc thread loop"); } } } - if(l_cur->flags & DAP_SOCK_SIGNAL_CLOSE){ + if(l_cur->flags & DAP_ESOCK_SIGNAL_CLOSE){ #ifdef DAP_EVENTS_CAPS_EPOLL if ( epoll_ctl( l_thread->epoll_ctl, EPOLL_CTL_DEL, l_cur->fd, &l_cur->ev ) == -1 ) log_it( L_ERROR,"Can't remove event socket's handler from the epoll ctl" ); diff --git a/dap-sdk/net/core/dap_server.c b/dap-sdk/net/core/dap_server.c index 504f027fc7..cc42d5dd6e 100644 --- a/dap-sdk/net/core/dap_server.c +++ b/dap-sdk/net/core/dap_server.c @@ -273,7 +273,7 @@ static dap_events_socket_t * s_es_server_create(dap_events_t * a_events, int a_s dap_events_socket_t * ret = NULL; if (a_sock > 0) { // set it nonblock - //fcntl(a_sock, F_SETFL, O_NONBLOCK); + fcntl(a_sock, F_SETFL, O_NONBLOCK); ret = dap_events_socket_wrap_no_add(a_events, a_sock, a_callbacks); ret->type = DESCRIPTOR_TYPE_SOCKET; diff --git a/dap-sdk/net/core/dap_worker.c b/dap-sdk/net/core/dap_worker.c index 88d31927a2..d3ae25bdfe 100644 --- a/dap-sdk/net/core/dap_worker.c +++ b/dap-sdk/net/core/dap_worker.c @@ -100,11 +100,11 @@ void *dap_worker_thread(void *arg) l_worker->poll_count_max = DAP_EVENTS_SOCKET_MAX; l_worker->poll = DAP_NEW_Z_SIZE(struct pollfd,l_worker->poll_count_max*sizeof (struct pollfd)); l_worker->poll_esocket = DAP_NEW_Z_SIZE(dap_events_socket_t*,l_worker->poll_count_max*sizeof (dap_events_socket_t*)); + log_it(L_INFO, "Worker #%d started with poll() and assigned to dedicated CPU unit", l_worker->id ); #else #error "Unimplemented socket array for this platform" #endif - l_worker->queue_es_new = dap_events_socket_create_type_queue_ptr_unsafe( l_worker, s_queue_add_es_callback); l_worker->queue_es_delete = dap_events_socket_create_type_queue_ptr_unsafe( l_worker, s_queue_delete_es_callback); l_worker->queue_es_io = dap_events_socket_create_type_queue_ptr_unsafe( l_worker, s_queue_es_io_callback); @@ -179,7 +179,7 @@ void *dap_worker_thread(void *arg) dap_events_socket_set_readable_unsafe(l_cur, false); dap_events_socket_set_writable_unsafe(l_cur, false); l_cur->buf_out_size = 0; - l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE; + l_cur->flags |= DAP_ESOCK_SIGNAL_CLOSE; l_flag_error = l_flag_read = l_flag_write = false; l_cur->callbacks.error_callback(l_cur, l_sock_err); // Call callback to process error event log_it(L_INFO, "Socket shutdown (EPOLLHUP): %s", strerror(l_sock_err)); @@ -200,7 +200,7 @@ void *dap_worker_thread(void *arg) dap_events_socket_set_readable_unsafe(l_cur, false); dap_events_socket_set_writable_unsafe(l_cur, false); l_cur->buf_out_size = 0; - l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE; + l_cur->flags |= DAP_ESOCK_SIGNAL_CLOSE; l_cur->callbacks.error_callback(l_cur, l_sock_err); // Call callback to process error event } @@ -313,11 +313,11 @@ void *dap_worker_thread(void *arg) if (l_errno != EAGAIN && l_errno != EWOULDBLOCK){ // Socket is blocked log_it(L_ERROR, "Some error occured in recv() function: %s", strerror(errno)); dap_events_socket_set_readable_unsafe(l_cur, false); - l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE; + l_cur->flags |= DAP_ESOCK_SIGNAL_CLOSE; l_cur->buf_out_size = 0; } } - else if ( (! l_flag_rdhup || !l_flag_error ) && (!(l_cur->flags& DAP_SOCK_CONNECTING )) ) { + else if ( (! l_flag_rdhup || !l_flag_error ) && (!(l_cur->flags& DAP_ESOCK_CONNECTING )) ) { log_it(L_WARNING, "EPOLLIN triggered but nothing to read"); dap_events_socket_set_readable_unsafe(l_cur,false); } @@ -328,7 +328,7 @@ void *dap_worker_thread(void *arg) // log_it(L_DEBUG,"Alarmed write flag for remote %s", l_cur->remote_addr_str[0]?l_cur->remote_addr_str:"(null)"); // If its outgoing connection - if ( l_flag_write && ! l_cur->server && l_cur->flags& DAP_SOCK_CONNECTING && + if ( l_flag_write && ! l_cur->server && l_cur->flags& DAP_ESOCK_CONNECTING && (l_cur->type == DESCRIPTOR_TYPE_SOCKET || l_cur->type == DESCRIPTOR_TYPE_SOCKET_UDP )){ int l_error = 0; socklen_t l_error_len = sizeof(l_error); @@ -346,7 +346,7 @@ void *dap_worker_thread(void *arg) log_it(L_DEBUG, "Connecting with %s in progress...", l_cur->remote_addr_str[0]? l_cur->remote_addr_str: "(NULL)"); }else{ // log_it(L_NOTICE, "Connected with %s",l_cur->remote_addr_str[0]? l_cur->remote_addr_str: "(NULL)"); - l_cur->flags ^= DAP_SOCK_CONNECTING; + l_cur->flags ^= DAP_ESOCK_CONNECTING; if (l_cur->callbacks.connected_callback) l_cur->callbacks.connected_callback(l_cur); dap_events_socket_worker_poll_update_unsafe(l_cur); @@ -354,14 +354,14 @@ void *dap_worker_thread(void *arg) } // Socket is ready to write and not going to close - if( ( l_flag_write&&(l_cur->flags & DAP_SOCK_READY_TO_WRITE) ) || - ( (l_cur->flags & DAP_SOCK_READY_TO_WRITE) && !(l_cur->flags & DAP_SOCK_SIGNAL_CLOSE) ) ) { + if( ( l_flag_write&&(l_cur->flags & DAP_ESOCK_READY_TO_WRITE) ) || + ( (l_cur->flags & DAP_ESOCK_READY_TO_WRITE) && !(l_cur->flags & DAP_ESOCK_SIGNAL_CLOSE) ) ) { //log_it(L_DEBUG, "Main loop output: %u bytes to send", l_cur->buf_out_size); if(l_cur->callbacks.write_callback) l_cur->callbacks.write_callback(l_cur, NULL); // Call callback to process write event if ( l_cur->worker ){ // esocket wasn't unassigned in callback, we need some other ops with it - if(l_cur->flags & DAP_SOCK_READY_TO_WRITE) { + if(l_cur->flags & DAP_ESOCK_READY_TO_WRITE) { static const uint32_t buf_out_zero_count_max = 2; //l_cur->buf_out[l_cur->buf_out_size] = 0; @@ -409,7 +409,7 @@ void *dap_worker_thread(void *arg) if(l_bytes_sent < 0) { if (l_errno != EAGAIN && l_errno != EWOULDBLOCK ){ // If we have non-blocking socket log_it(L_ERROR, "Some error occured in send(): %s", strerror(errno)); - l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE; + l_cur->flags |= DAP_ESOCK_SIGNAL_CLOSE; l_cur->buf_out_size = 0; } }else{ @@ -432,7 +432,7 @@ void *dap_worker_thread(void *arg) dap_events_socket_set_writable_unsafe(l_cur,true); } - if ((l_cur->flags & DAP_SOCK_SIGNAL_CLOSE) && !l_cur->no_close) + if ((l_cur->flags & DAP_ESOCK_SIGNAL_CLOSE) && !l_cur->no_close) { if (l_cur->buf_out_size == 0) { log_it(L_INFO, "Process signal to close %s, sock %u [thread %u]", l_cur->hostaddr, l_cur->socket, l_tn); @@ -541,7 +541,7 @@ static void s_queue_delete_es_callback( dap_events_socket_t * a_es, void * a_arg { dap_events_socket_t * l_esocket = (dap_events_socket_t*) a_arg; if (dap_events_socket_check_unsafe(a_es->worker,l_esocket)){ - ((dap_events_socket_t*)a_arg)->flags |= DAP_SOCK_SIGNAL_CLOSE; // Send signal to socket to kill + ((dap_events_socket_t*)a_arg)->flags |= DAP_ESOCK_SIGNAL_CLOSE; // Send signal to socket to kill }else log_it(L_INFO, "While we were sending the delete() message, esocket %p has been disconnected", l_esocket); } @@ -556,7 +556,7 @@ static void s_queue_es_reassign_callback( dap_events_socket_t * a_es, void * a_a dap_worker_msg_reassign_t * l_msg = (dap_worker_msg_reassign_t*) a_arg; dap_events_socket_t * l_es_reassign = l_msg->esocket; if (dap_events_socket_check_unsafe(a_es->worker,l_es_reassign)){ - if( l_es_reassign->was_reassigned && l_es_reassign->flags & DAP_SOCK_REASSIGN_ONCE) { + if( l_es_reassign->was_reassigned && l_es_reassign->flags & DAP_ESOCK_REASSIGN_ONCE) { log_it(L_INFO, "Reassgment request with DAP_SOCK_REASSIGN_ONCE allowed only once, declined reassigment from %u to %u", l_es_reassign->worker->id, l_msg->worker_new->id); @@ -612,25 +612,25 @@ static void s_queue_es_io_callback( dap_events_socket_t * a_es, void * a_arg) DAP_DELETE(l_msg); return; } - if (l_msg->flags_set & DAP_SOCK_CONNECTING) - if (! (l_msg_es->flags & DAP_SOCK_CONNECTING) ){ - l_msg_es->flags |= DAP_SOCK_CONNECTING; + if (l_msg->flags_set & DAP_ESOCK_CONNECTING) + if (! (l_msg_es->flags & DAP_ESOCK_CONNECTING) ){ + l_msg_es->flags |= DAP_ESOCK_CONNECTING; dap_events_socket_worker_poll_update_unsafe(l_msg_es); } - if (l_msg->flags_set & DAP_SOCK_CONNECTING) - if (! (l_msg_es->flags & DAP_SOCK_CONNECTING) ){ - l_msg_es->flags ^= DAP_SOCK_CONNECTING; + if (l_msg->flags_set & DAP_ESOCK_CONNECTING) + if (! (l_msg_es->flags & DAP_ESOCK_CONNECTING) ){ + l_msg_es->flags ^= DAP_ESOCK_CONNECTING; dap_events_socket_worker_poll_update_unsafe(l_msg_es); } - if (l_msg->flags_set & DAP_SOCK_READY_TO_READ) + if (l_msg->flags_set & DAP_ESOCK_READY_TO_READ) dap_events_socket_set_readable_unsafe(l_msg_es, true); - if (l_msg->flags_unset & DAP_SOCK_READY_TO_READ) + if (l_msg->flags_unset & DAP_ESOCK_READY_TO_READ) dap_events_socket_set_readable_unsafe(l_msg_es, false); - if (l_msg->flags_set & DAP_SOCK_READY_TO_WRITE) + if (l_msg->flags_set & DAP_ESOCK_READY_TO_WRITE) dap_events_socket_set_writable_unsafe(l_msg_es, true); - if (l_msg->flags_unset & DAP_SOCK_READY_TO_WRITE) + if (l_msg->flags_unset & DAP_ESOCK_READY_TO_WRITE) dap_events_socket_set_writable_unsafe(l_msg_es, false); if (l_msg->data_size && l_msg->data) dap_events_socket_write_unsafe(l_msg_es, l_msg->data,l_msg->data_size); @@ -653,7 +653,7 @@ static void s_socket_all_check_activity( void * a_arg) HASH_ITER(hh_worker, l_worker->esockets, l_es, tmp ) { if ( l_es->type == DESCRIPTOR_TYPE_SOCKET || l_es->type == DESCRIPTOR_TYPE_SOCKET_UDP ){ - if ( !(l_es->flags & DAP_SOCK_SIGNAL_CLOSE) && + if ( !(l_es->flags & DAP_ESOCK_SIGNAL_CLOSE) && ( l_curtime >= (l_es->last_time_active + s_connection_timeout) ) && !l_es->no_close ) { log_it( L_INFO, "Socket %u timeout (diff %u ), closing...", l_es->socket, l_curtime - (time_t)l_es->last_time_active - s_connection_timeout ); if (l_es->callbacks.error_callback) { @@ -708,9 +708,9 @@ int dap_worker_add_events_socket_unsafe( dap_events_socket_t * a_esocket, dap_wo a_worker->poll[a_worker->poll_count].fd = a_esocket->socket; a_esocket->poll_index = a_worker->poll_count; a_worker->poll[a_worker->poll_count].events = a_esocket->poll_base_flags; - if( a_esocket->flags & DAP_SOCK_READY_TO_READ ) + if( a_esocket->flags & DAP_ESOCK_READY_TO_READ ) a_worker->poll[a_worker->poll_count].events |= POLLIN; - if( (a_esocket->flags & DAP_SOCK_READY_TO_WRITE) || (a_esocket->flags & DAP_SOCK_CONNECTING) ) + if( (a_esocket->flags & DAP_ESOCK_READY_TO_WRITE) || (a_esocket->flags & DAP_ESOCK_CONNECTING) ) a_worker->poll[a_worker->poll_count].events |= POLLOUT; diff --git a/dap-sdk/net/core/include/dap_events_socket.h b/dap-sdk/net/core/include/dap_events_socket.h index 3e7d44854f..077fbae507 100644 --- a/dap-sdk/net/core/include/dap_events_socket.h +++ b/dap-sdk/net/core/include/dap_events_socket.h @@ -68,11 +68,11 @@ #endif #define BIT( x ) ( 1 << x ) -#define DAP_SOCK_READY_TO_READ BIT( 0 ) -#define DAP_SOCK_READY_TO_WRITE BIT( 1 ) -#define DAP_SOCK_SIGNAL_CLOSE BIT( 2 ) -#define DAP_SOCK_CONNECTING BIT( 3 ) // When connection happens this flag is armed for outgoing connections until its establish the connection -#define DAP_SOCK_REASSIGN_ONCE BIT( 4 ) // This usable for FlowControl to prevent multiple reassigment +#define DAP_ESOCK_READY_TO_READ BIT( 0 ) +#define DAP_ESOCK_READY_TO_WRITE BIT( 1 ) +#define DAP_ESOCK_SIGNAL_CLOSE BIT( 2 ) +#define DAP_ESOCK_CONNECTING BIT( 3 ) // When connection happens this flag is armed for outgoing connections until its establish the connection +#define DAP_ESOCK_REASSIGN_ONCE BIT( 4 ) // This usable for FlowControl to prevent multiple reassigment // If set - queue limited to sizeof(void*) size of data transmitted #define DAP_SOCK_QUEUE_PTR BIT( 8 ) @@ -139,9 +139,6 @@ typedef struct dap_events_socket { int fd2; #endif dap_events_desc_type_t type; - // Related sockets (be careful - possible problems, delete them before ) - dap_events_socket_t ** workers_es; // If not NULL - on every worker must be present - size_t workers_es_size; // events socket with same socket // Flags. TODO - rework in bool fields uint32_t flags; @@ -162,7 +159,6 @@ typedef struct dap_events_socket { uint8_t buf_out[DAP_EVENTS_SOCKET_BUF+1]; // Internal buffer for output data size_t buf_out_size; // size of data that is in the output buffer - dap_events_socket_t * pipe_out; // Pipe socket with data for output // Stored string representation char hostaddr[1024]; // Address @@ -195,11 +191,13 @@ typedef struct dap_events_socket { time_t last_time_active; time_t last_ping_request; - void *_inheritor; // Inheritor data to specific client type, usualy states for state machine struct dap_events_socket * me; // pointer on itself UT_hash_handle hh; UT_hash_handle hh_worker; // Handle for local CPU storage on worker + + void *_inheritor; // Inheritor data to specific client type, usualy states for state machine + byte_t _pvt[]; // Private section } dap_events_socket_t; // Node of bidirectional list of clients int dap_events_socket_init(); // Init clients module diff --git a/dap-sdk/net/core/include/dap_worker.h b/dap-sdk/net/core/include/dap_worker.h index f8c88d66a6..931130c489 100644 --- a/dap-sdk/net/core/include/dap_worker.h +++ b/dap-sdk/net/core/include/dap_worker.h @@ -51,7 +51,6 @@ typedef struct dap_worker #ifdef DAP_EVENTS_CAPS_EPOLL EPOLL_HANDLE epoll_fd; #elif defined ( DAP_EVENTS_CAPS_POLL) - int poll_fd; struct pollfd * poll; dap_events_socket_t ** poll_esocket; size_t poll_count; diff --git a/dap-sdk/net/server/http_server/dap_http_folder.c b/dap-sdk/net/server/http_server/dap_http_folder.c index 857f397752..41d7427d52 100644 --- a/dap-sdk/net/server/http_server/dap_http_folder.c +++ b/dap-sdk/net/server/http_server/dap_http_folder.c @@ -258,7 +258,7 @@ void dap_http_folder_headers_write( dap_http_client_t *cl_ht, void * arg) } else { cl_ht->reply_status_code=Http_Status_NotFound; - cl_ht->esocket->flags |= DAP_SOCK_SIGNAL_CLOSE; + cl_ht->esocket->flags |= DAP_ESOCK_SIGNAL_CLOSE; log_it(L_WARNING,"Can't detect MIME type of %s file: %s",cl_ht_file->local_path,magic_error(up_folder->mime_detector)); } } @@ -306,7 +306,7 @@ void dap_http_folder_data_write(dap_http_client_t * cl_ht, void * arg) dap_events_socket_set_writable_unsafe(cl_ht->esocket,false); if ( !cl_ht->keep_alive ) - cl_ht->esocket->flags |= DAP_SOCK_SIGNAL_CLOSE; + cl_ht->esocket->flags |= DAP_ESOCK_SIGNAL_CLOSE; cl_ht->state_write=DAP_HTTP_CLIENT_STATE_NONE; } diff --git a/dap-sdk/net/server/http_server/dap_http_simple.c b/dap-sdk/net/server/http_server/dap_http_simple.c index 17f0cf83bc..003b88871b 100644 --- a/dap-sdk/net/server/http_server/dap_http_simple.c +++ b/dap-sdk/net/server/http_server/dap_http_simple.c @@ -368,7 +368,7 @@ static void s_http_client_data_write( dap_http_client_t * a_http_client, void *a // Sleep(300); if ( !l_http_simple->reply ) { - a_http_client->esocket->flags |= DAP_SOCK_SIGNAL_CLOSE; + a_http_client->esocket->flags |= DAP_ESOCK_SIGNAL_CLOSE; log_it( L_WARNING, "No reply to write, close connection" ); return; } @@ -380,7 +380,7 @@ static void s_http_client_data_write( dap_http_client_t * a_http_client, void *a if ( l_http_simple->reply_sent >= a_http_client->out_content_length ) { log_it(L_INFO, "All the reply (%u) is sent out", a_http_client->out_content_length ); //cl_ht->client->signal_close=cl_ht->keep_alive; - a_http_client->esocket->flags |= DAP_SOCK_SIGNAL_CLOSE; + a_http_client->esocket->flags |= DAP_ESOCK_SIGNAL_CLOSE; //dap_client_ready_to_write(cl_ht->client,false); DAP_DELETE(l_http_simple->reply ); } diff --git a/dap-sdk/net/server/http_server/http_client/dap_http_client.c b/dap-sdk/net/server/http_server/http_client/dap_http_client.c index 8ee096e68d..ba21944730 100644 --- a/dap-sdk/net/server/http_server/http_client/dap_http_client.c +++ b/dap-sdk/net/server/http_server/http_client/dap_http_client.c @@ -517,7 +517,7 @@ void dap_http_client_write( dap_events_socket_t * cl, void *arg ) log_it( L_DEBUG, "Nothing to output" ); l_http_client->state_write = DAP_HTTP_CLIENT_STATE_NONE; dap_events_socket_set_writable_unsafe( cl, false ); - cl->flags |= DAP_SOCK_SIGNAL_CLOSE; + cl->flags |= DAP_ESOCK_SIGNAL_CLOSE; } dap_events_socket_set_readable_unsafe( cl, true ); } else { diff --git a/dap-sdk/net/stream/ch/dap_stream_ch_pkt.c b/dap-sdk/net/stream/ch/dap_stream_ch_pkt.c index ab46209234..bb3c06c38e 100644 --- a/dap-sdk/net/stream/ch/dap_stream_ch_pkt.c +++ b/dap-sdk/net/stream/ch/dap_stream_ch_pkt.c @@ -87,7 +87,7 @@ size_t dap_stream_ch_pkt_write_f_mt(dap_stream_worker_t * a_worker , dap_stream_ l_msg->ch = a_ch; l_msg->ch_pkt_type = a_type; l_msg->data = DAP_NEW_SIZE(void,l_data_size); - l_msg->flags_set = DAP_SOCK_READY_TO_WRITE; + l_msg->flags_set = DAP_ESOCK_READY_TO_WRITE; l_data_size = dap_vsnprintf(l_msg->data,0,a_format,ap); if (l_data_size <0 ){ log_it(L_ERROR,"Can't write out formatted data '%s' with values",a_format); @@ -120,7 +120,7 @@ size_t dap_stream_ch_pkt_write_mt(dap_stream_worker_t * a_worker , dap_stream_ch l_msg->ch = a_ch; l_msg->ch_pkt_type = a_type; l_msg->data = DAP_NEW_SIZE(void,a_data_size); - l_msg->flags_set = DAP_SOCK_READY_TO_WRITE; + l_msg->flags_set = DAP_ESOCK_READY_TO_WRITE; l_msg->data_size = a_data_size; memcpy( l_msg->data, a_data, a_data_size); int l_ret= dap_events_socket_queue_ptr_send(a_worker->queue_ch_io , l_msg ); diff --git a/dap-sdk/net/stream/stream/dap_stream.c b/dap-sdk/net/stream/stream/dap_stream.c index f9e254a586..a45599fd39 100644 --- a/dap-sdk/net/stream/stream/dap_stream.c +++ b/dap-sdk/net/stream/stream/dap_stream.c @@ -447,7 +447,7 @@ static void s_esocket_data_read(dap_events_socket_t* a_client, void * a_arg) if (s_dump_packet_headers ) { log_it(L_DEBUG,"dap_stream_data_read: ready_to_write=%s, client->buf_in_size=%u" , - (a_client->flags & DAP_SOCK_READY_TO_WRITE)?"true":"false", a_client->buf_in_size ); + (a_client->flags & DAP_ESOCK_READY_TO_WRITE)?"true":"false", a_client->buf_in_size ); } *l_ret = dap_stream_data_proc_read( l_stream); } diff --git a/dap-sdk/net/stream/stream/dap_stream_worker.c b/dap-sdk/net/stream/stream/dap_stream_worker.c index fe374507f3..0291f29b9b 100644 --- a/dap-sdk/net/stream/stream/dap_stream_worker.c +++ b/dap-sdk/net/stream/stream/dap_stream_worker.c @@ -74,13 +74,13 @@ static void s_ch_io_callback(dap_events_socket_t * a_es, void * a_msg) return; } - if (l_msg->flags_set & DAP_SOCK_READY_TO_READ) + if (l_msg->flags_set & DAP_ESOCK_READY_TO_READ) dap_stream_ch_set_ready_to_read_unsafe(l_msg_ch, true); - if (l_msg->flags_unset & DAP_SOCK_READY_TO_READ) + if (l_msg->flags_unset & DAP_ESOCK_READY_TO_READ) dap_stream_ch_set_ready_to_read_unsafe(l_msg_ch, false); - if (l_msg->flags_set & DAP_SOCK_READY_TO_WRITE) + if (l_msg->flags_set & DAP_ESOCK_READY_TO_WRITE) dap_stream_ch_set_ready_to_write_unsafe(l_msg_ch, true); - if (l_msg->flags_unset & DAP_SOCK_READY_TO_WRITE) + if (l_msg->flags_unset & DAP_ESOCK_READY_TO_WRITE) dap_stream_ch_set_ready_to_write_unsafe(l_msg_ch, false); if (l_msg->data_size && l_msg->data) dap_stream_ch_pkt_write_unsafe(l_msg_ch, l_msg->ch_pkt_type, l_msg->data,l_msg->data_size); diff --git a/modules/global-db/dap_chain_global_db_driver_cdb.c b/modules/global-db/dap_chain_global_db_driver_cdb.c index 5adb80c769..7104702915 100644 --- a/modules/global-db/dap_chain_global_db_driver_cdb.c +++ b/modules/global-db/dap_chain_global_db_driver_cdb.c @@ -175,11 +175,11 @@ pcdb_instance dap_cdb_init_group(char *a_group, int a_flags) { cdb_iterate(l_cdb_i->cdb, dap_cdb_get_last_obj_iter_callback, (void*)&l_arg, l_iter); cdb_iterate_destroy(l_cdb_i->cdb, l_iter); l_cdb_i->id = l_arg.o->id; - log_it(L_INFO, "Group \"%s\" found" , l_cdb_i->local_group); - log_it(L_INFO, "Records: %-24u" , l_cdb_stat.rnum); - log_it(L_INFO, "Average read latency: %-24u" , l_cdb_stat.rlatcy); - log_it(L_INFO, "Average write latency: %-24u" , l_cdb_stat.wlatcy); - log_it(L_INFO, "Last id: %-24u" , l_cdb_i->id); + //log_it(L_INFO, "Group \"%s\" found" , l_cdb_i->local_group); + //log_it(L_INFO, "Records: %-24u" , l_cdb_stat.rnum); + //log_it(L_INFO, "Average read latency: %-24u" , l_cdb_stat.rlatcy); + //log_it(L_INFO, "Average write latency: %-24u" , l_cdb_stat.wlatcy); + //log_it(L_INFO, "Last id: %-24u" , l_cdb_i->id); DAP_DELETE(l_arg.o); } else { log_it(L_INFO, "Group \"%s\" created" , l_cdb_i->local_group); diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index c98f272665..902d9619e9 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -539,7 +539,8 @@ static int s_net_states_proc(dap_chain_net_t *a_net) if (l_res == 0) { log_it(L_WARNING, "Can't send GDB sync request"); continue; - } + }else + log_it(L_DEBUG, "Sent DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB (size %zd)", l_res); // wait for finishing of request int timeout_ms = 300000; // 5 min = 300 sec = 300 000 ms diff --git a/modules/net/dap_chain_node_client.c b/modules/net/dap_chain_node_client.c index e0e99708f4..ec08944edc 100644 --- a/modules/net/dap_chain_node_client.c +++ b/modules/net/dap_chain_node_client.c @@ -424,7 +424,7 @@ dap_chain_node_client_t* dap_chain_client_connect(dap_chain_node_info_t *a_node_ #endif pthread_mutex_init(&l_node_client->wait_mutex, NULL); - l_node_client->events = NULL; //dap_events_new(); + l_node_client->events = dap_events_get_default(); l_node_client->client = dap_client_new(l_node_client->events, s_stage_status_callback, s_stage_status_error_callback); l_node_client->client->_inheritor = l_node_client; diff --git a/modules/service/vpn/dap_chain_net_srv_vpn.c b/modules/service/vpn/dap_chain_net_srv_vpn.c index d27b45c921..c8092a2eda 100644 --- a/modules/service/vpn/dap_chain_net_srv_vpn.c +++ b/modules/service/vpn/dap_chain_net_srv_vpn.c @@ -906,7 +906,7 @@ static int s_callback_response_error(dap_chain_net_srv_t * a_srv, uint32_t a_usa void s_ch_vpn_new(dap_stream_ch_t* a_ch, void* a_arg) { (void) a_arg; - a_ch->stream->esocket->flags |= DAP_SOCK_REASSIGN_ONCE; // We will try to reassign on another worker + a_ch->stream->esocket->flags |= DAP_ESOCK_REASSIGN_ONCE; // We will try to reassign on another worker // to use FlowControl if its present in system // If not - we prevent jumping between workers with this trick a_ch->internal = DAP_NEW_Z(dap_chain_net_srv_ch_vpn_t); -- GitLab