From f807ca8a71458986c7a419dbba1a76bd5a566f52 Mon Sep 17 00:00:00 2001 From: "Dmitriy A. Gerasimov" <dmitriy.gerasimov@demlabs.net> Date: Thu, 22 Jul 2021 22:04:50 +0700 Subject: [PATCH] [*] Fixed leaks [!] Added a_es_uuid param for interworkers communications --- CMakeLists.txt | 2 +- dap-sdk/net/client/dap_client_http.c | 3 +- dap-sdk/net/client/dap_client_pvt.c | 7 +++- dap-sdk/net/core/dap_events_socket.c | 40 ++++++++++++------- dap-sdk/net/core/dap_proc_thread.c | 15 ++++--- dap-sdk/net/core/include/dap_events_socket.h | 4 +- dap-sdk/net/core/include/dap_proc_thread.h | 4 +- .../server/notify_server/src/dap_notify_srv.c | 3 +- dap-sdk/net/stream/stream/dap_stream.c | 11 +++-- .../net/stream/stream/include/dap_stream.h | 1 + modules/net/dap_chain_node_dns_client.c | 1 + 11 files changed, 59 insertions(+), 32 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 539b47b63b..18af59136b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,7 +2,7 @@ project(cellframe-sdk C) cmake_minimum_required(VERSION 3.0) set(CMAKE_C_STANDARD 11) -set(CELLFRAME_SDK_NATIVE_VERSION "2.9-12") +set(CELLFRAME_SDK_NATIVE_VERSION "2.9-13") add_definitions ("-DCELLFRAME_SDK_VERSION=\"${CELLFRAME_SDK_NATIVE_VERSION}\"") set(DAPSDK_MODULES "") message("Cellframe modules: ${CELLFRAME_MODULES}") diff --git a/dap-sdk/net/client/dap_client_http.c b/dap-sdk/net/client/dap_client_http.c index d43fa788ac..e19cddd0d8 100644 --- a/dap-sdk/net/client/dap_client_http.c +++ b/dap-sdk/net/client/dap_client_http.c @@ -349,7 +349,8 @@ static void s_http_read(dap_events_socket_t * a_es, void * arg) l_http_pvt->header_length = 0; l_http_pvt->content_length = 0; l_http_pvt->were_callbacks_called = true; - dap_events_socket_remove_and_delete_unsafe(a_es, true); + a_es->flags |= DAP_SOCK_SIGNAL_CLOSE; + //dap_events_socket_remove_and_delete_unsafe(a_es, true); } } diff --git a/dap-sdk/net/client/dap_client_pvt.c b/dap-sdk/net/client/dap_client_pvt.c index 2d67da94eb..a12b12888e 100644 --- a/dap-sdk/net/client/dap_client_pvt.c +++ b/dap-sdk/net/client/dap_client_pvt.c @@ -327,7 +327,9 @@ static bool s_stage_status_after(dap_client_pvt_t * a_client_pvt) case STAGE_STREAM_CONNECTED: case STAGE_STREAM_STREAMING: dap_stream_delete(a_client_pvt->stream); - dap_events_socket_remove_and_delete_unsafe(a_client_pvt->stream_es, true); + if(a_client_pvt->stream_es) + a_client_pvt->stream_es->flags |= DAP_SOCK_SIGNAL_CLOSE; + //dap_events_socket_remove_and_delete_unsafe(a_client_pvt->stream_es, true); a_client_pvt->stream = NULL; a_client_pvt->stream_es = NULL; break; @@ -808,7 +810,8 @@ void dap_client_pvt_request_enc(dap_client_pvt_t * a_client_internal, const char if(a_path) { if(l_sub_url_size){ if(l_query_size){ - dap_snprintf(l_path, l_path_size, "%s/%s?%s", a_path, l_sub_url_enc, + dap_snprintf(l_path, l_path_size, "%s/%s?%s", a_path?a_path:"", + l_sub_url_enc?l_sub_url_enc:"", l_query_enc?l_query_enc:""); }else{ dap_snprintf(l_path, l_path_size, "%s/%s", a_path, l_sub_url_enc); diff --git a/dap-sdk/net/core/dap_events_socket.c b/dap-sdk/net/core/dap_events_socket.c index 6ad78c7be5..6e02349778 100644 --- a/dap-sdk/net/core/dap_events_socket.c +++ b/dap-sdk/net/core/dap_events_socket.c @@ -1218,7 +1218,7 @@ int dap_events_socket_queue_ptr_send_to_input(dap_events_socket_t * a_es_input, } }*/ return dap_events_socket_write_unsafe(a_es_input, &l_arg, sizeof(l_arg)) - == sizeof(l_arg) ? 0 : 1; + == sizeof(l_arg) ? 0 : -1; #endif } @@ -1241,10 +1241,16 @@ int dap_events_socket_queue_ptr_send( dap_events_socket_t * a_es, void* a_arg) assert(a_es->mqd); l_ret = mq_send(a_es->mqd, (const char *)&a_arg,sizeof (a_arg),0); l_errno = errno; + if ( l_ret == EPERM){ + log_it(L_ERROR,"No permissions to send data in mqueue"); + } + if (l_errno == EINVAL || l_errno == EINTR || l_errno == ETIMEDOUT) l_errno = EAGAIN; if (l_ret == 0) l_ret = sizeof (a_arg); + else if (l_ret >0) + l_ret = -l_ret; #elif defined (DAP_EVENTS_CAPS_QUEUE_POSIX) struct timespec l_timeout; clock_gettime(CLOCK_REALTIME, &l_timeout); @@ -1956,7 +1962,7 @@ void dap_events_socket_set_readable_mt(dap_worker_t * a_w, dap_events_socket_t * int l_ret= dap_events_socket_queue_ptr_send(a_w->queue_es_io, l_msg ); if (l_ret!=0){ - log_it(L_ERROR, "Wasn't send pointer to queue: code %d", l_ret); + log_it(L_ERROR, "set readable mt: wasn't send pointer to queue with set readble flag: code %d", l_ret); DAP_DELETE(l_msg); } } @@ -1979,7 +1985,7 @@ void dap_events_socket_set_writable_mt(dap_worker_t * a_w, dap_events_socket_t * int l_ret= dap_events_socket_queue_ptr_send(a_w->queue_es_io, l_msg ); if (l_ret!=0){ - log_it(L_ERROR, "Wasn't send pointer to queue: code %d", l_ret); + log_it(L_ERROR, "set writable mt: wasn't send pointer to queue: code %d", l_ret); DAP_DELETE(l_msg); } } @@ -1988,16 +1994,16 @@ void dap_events_socket_set_writable_mt(dap_worker_t * a_w, dap_events_socket_t * * @brief dap_events_socket_write_inter * @param a_es_input * @param a_es + * @param a_es_uuid * @param a_data * @param a_data_size * @return */ -size_t dap_events_socket_write_inter(dap_events_socket_t * a_es_input, dap_events_socket_t *a_es, const void * a_data, size_t a_data_size) +size_t dap_events_socket_write_inter(dap_events_socket_t * a_es_input, dap_events_socket_t *a_es,uint128_t a_es_uuid, const void * a_data, size_t a_data_size) { dap_worker_msg_io_t * l_msg = DAP_NEW_Z(dap_worker_msg_io_t); if( !l_msg) return 0; l_msg->esocket = a_es; - if(a_es) - l_msg->esocket_uuid = a_es->uuid; + l_msg->esocket_uuid = a_es_uuid; l_msg->data = DAP_NEW_SIZE(void,a_data_size); l_msg->data_size = a_data_size; l_msg->flags_set = DAP_SOCK_READY_TO_WRITE; @@ -2006,7 +2012,7 @@ size_t dap_events_socket_write_inter(dap_events_socket_t * a_es_input, dap_event int l_ret= dap_events_socket_queue_ptr_send_to_input( a_es_input, l_msg ); if (l_ret!=0){ - log_it(L_ERROR, "Wasn't send pointer to queue: code %d", l_ret); + log_it(L_ERROR, "write inter: wasn't send pointer to queue: code %d", l_ret); DAP_DELETE(l_msg); return 0; } @@ -2016,11 +2022,12 @@ size_t dap_events_socket_write_inter(dap_events_socket_t * a_es_input, dap_event /** * @brief dap_events_socket_write_f_inter * @param a_es_input - * @param sc - * @param format + * @param a_es + * @param a_es_uuid + * @param a_format * @return */ -size_t dap_events_socket_write_f_inter(dap_events_socket_t * a_es_input, dap_events_socket_t *a_es, const char * a_format,...) +size_t dap_events_socket_write_f_inter(dap_events_socket_t * a_es_input, dap_events_socket_t *a_es,uint128_t a_es_uuid, const char * a_format,...) { va_list ap, ap_copy; va_start(ap,a_format); @@ -2035,6 +2042,7 @@ size_t dap_events_socket_write_f_inter(dap_events_socket_t * a_es_input, dap_eve dap_worker_msg_io_t * l_msg = DAP_NEW_Z(dap_worker_msg_io_t); l_msg->esocket = a_es; + l_msg->esocket_uuid = a_es_uuid; l_msg->data = DAP_NEW_SIZE(void,l_data_size); l_msg->data_size = l_data_size; l_msg->flags_set = DAP_SOCK_READY_TO_WRITE; @@ -2043,7 +2051,7 @@ size_t dap_events_socket_write_f_inter(dap_events_socket_t * a_es_input, dap_eve int l_ret= dap_events_socket_queue_ptr_send_to_input(a_es_input, l_msg ); if (l_ret!=0){ - log_it(L_ERROR, "Wasn't send pointer to queue input: code %d", l_ret); + log_it(L_ERROR, "wite f inter: wasn't send pointer to queue input: code %d", l_ret); DAP_DELETE(l_msg); return 0; } @@ -2070,7 +2078,7 @@ size_t dap_events_socket_write_mt(dap_worker_t * a_w,dap_events_socket_t *a_es, int l_ret= dap_events_socket_queue_ptr_send(a_w->queue_es_io, l_msg ); if (l_ret!=0){ - log_it(L_ERROR, "Wasn't send pointer to queue input: code %d", l_ret); + log_it(L_ERROR, "wite mt: wasn't send pointer to queue input: code %d", l_ret); DAP_DELETE(l_msg); return 0; } @@ -2104,7 +2112,7 @@ size_t dap_events_socket_write_f_mt(dap_worker_t * a_w,dap_events_socket_t *a_es l_data_size = dap_vsprintf(l_msg->data,format,ap_copy); va_end(ap_copy); if (l_data_size <0 ){ - log_it(L_ERROR,"Can't write out formatted data '%s' with values",format); + log_it(L_ERROR,"Write f mt: can't write out formatted data '%s' with values",format); DAP_DELETE(l_msg->data); DAP_DELETE(l_msg); return 0; @@ -2112,7 +2120,7 @@ size_t dap_events_socket_write_f_mt(dap_worker_t * a_w,dap_events_socket_t *a_es l_msg->data_size = l_data_size; int l_ret= dap_events_socket_queue_ptr_send(a_w->queue_es_io, l_msg ); if (l_ret!=0){ - log_it(L_ERROR, "Wasn't send pointer to queue: code %d", l_ret); + log_it(L_ERROR, "Wrrite f mt: wasn't send pointer to queue: code %d", l_ret); DAP_DELETE(l_msg->data); DAP_DELETE(l_msg); return 0; @@ -2154,6 +2162,10 @@ size_t dap_events_socket_write_f_unsafe(dap_events_socket_t *a_es, const char * size_t l_max_data_size = a_es->buf_out_size_max - a_es->buf_out_size; if (! l_max_data_size) return 0; + if(!a_es->buf_out){ + log_it(L_ERROR,"Can't write formatted data to NULL buffer output"); + return 0; + } va_list l_ap; va_start(l_ap, a_format); diff --git a/dap-sdk/net/core/dap_proc_thread.c b/dap-sdk/net/core/dap_proc_thread.c index c1014bab5c..6123e662e7 100644 --- a/dap-sdk/net/core/dap_proc_thread.c +++ b/dap-sdk/net/core/dap_proc_thread.c @@ -950,16 +950,17 @@ bool dap_proc_thread_assign_on_worker_inter(dap_proc_thread_t * a_thread, dap_wo * @brief dap_proc_thread_esocket_write_inter * @param a_thread * @param a_worker - * @param a_esocket + * @param a_es + * @param a_es_uuid * @param a_data * @param a_data_size * @return */ -int dap_proc_thread_esocket_write_inter(dap_proc_thread_t * a_thread,dap_worker_t * a_worker, dap_events_socket_t *a_esocket, +int dap_proc_thread_esocket_write_inter(dap_proc_thread_t * a_thread,dap_worker_t * a_worker, dap_events_socket_t *a_es,uint128_t a_es_uuid, const void * a_data, size_t a_data_size) { dap_events_socket_t * l_es_io_input = a_thread->queue_io_input[a_worker->id]; - dap_events_socket_write_inter(l_es_io_input,a_esocket, a_data, a_data_size); + dap_events_socket_write_inter(l_es_io_input,a_es,a_es_uuid, a_data, a_data_size); // TODO Make this code platform-independent #ifndef DAP_EVENTS_CAPS_EVENT_KEVENT l_es_io_input->flags |= DAP_SOCK_READY_TO_WRITE; @@ -973,11 +974,12 @@ int dap_proc_thread_esocket_write_inter(dap_proc_thread_t * a_thread,dap_worker_ * @brief dap_proc_thread_esocket_write_f_inter * @param a_thread * @param a_worker - * @param a_esocket + * @param a_es + * @param a_es_uuid, * @param a_format * @return */ -int dap_proc_thread_esocket_write_f_inter(dap_proc_thread_t * a_thread,dap_worker_t * a_worker, dap_events_socket_t *a_esocket, +int dap_proc_thread_esocket_write_f_inter(dap_proc_thread_t * a_thread,dap_worker_t * a_worker, dap_events_socket_t *a_es,uint128_t a_es_uuid, const char * a_format,...) { va_list ap, ap_copy; @@ -1000,12 +1002,13 @@ int dap_proc_thread_esocket_write_f_inter(dap_proc_thread_t * a_thread,dap_worke l_data_size = dap_vsprintf(l_data,a_format,ap_copy); va_end(ap_copy); - dap_events_socket_write_inter(l_es_io_input,a_esocket, l_data, l_data_size); + dap_events_socket_write_inter(l_es_io_input,a_es, a_es_uuid, l_data, l_data_size); // TODO Make this code platform-independent #ifndef DAP_EVENTS_CAPS_EVENT_KEVENT l_es_io_input->flags |= DAP_SOCK_READY_TO_WRITE; dap_proc_thread_esocket_update_poll_flags(a_thread, l_es_io_input); #endif + DAP_DELETE(l_data); return 0; } diff --git a/dap-sdk/net/core/include/dap_events_socket.h b/dap-sdk/net/core/include/dap_events_socket.h index 207a1067c1..689246a9d5 100644 --- a/dap-sdk/net/core/include/dap_events_socket.h +++ b/dap-sdk/net/core/include/dap_events_socket.h @@ -357,8 +357,8 @@ void dap_events_socket_set_writable_mt(dap_worker_t * a_w, dap_events_socket_t * size_t dap_events_socket_write_mt(dap_worker_t * a_w, dap_events_socket_t *a_es, const void * a_data, size_t a_data_size); size_t dap_events_socket_write_f_mt(dap_worker_t * a_w, dap_events_socket_t *a_es, const char * a_format,...); -size_t dap_events_socket_write_inter(dap_events_socket_t * a_es_input, dap_events_socket_t *a_es, const void * a_data, size_t a_data_size); -size_t dap_events_socket_write_f_inter(dap_events_socket_t * a_es_input, dap_events_socket_t *a_es, const char * a_format,...); +size_t dap_events_socket_write_inter(dap_events_socket_t * a_es_input, dap_events_socket_t *a_es, uint128_t a_es_uuid, const void * a_data, size_t a_data_size); +size_t dap_events_socket_write_f_inter(dap_events_socket_t * a_es_input, dap_events_socket_t *a_es,uint128_t a_es_uuid, const char * a_format,...); void dap_events_socket_remove_and_delete_mt( dap_worker_t * a_w, dap_events_socket_t* a_es); void dap_events_socket_remove_and_delete_unsafe( dap_events_socket_t *a_es, bool preserve_inheritor ); diff --git a/dap-sdk/net/core/include/dap_proc_thread.h b/dap-sdk/net/core/include/dap_proc_thread.h index f4436778b1..af56582e02 100644 --- a/dap-sdk/net/core/include/dap_proc_thread.h +++ b/dap-sdk/net/core/include/dap_proc_thread.h @@ -74,9 +74,9 @@ dap_events_socket_t * dap_proc_thread_create_queue_ptr(dap_proc_thread_t * a_thr bool dap_proc_thread_assign_on_worker_inter(dap_proc_thread_t * a_thread, dap_worker_t * a_worker, dap_events_socket_t *a_esocket ); -int dap_proc_thread_esocket_write_inter(dap_proc_thread_t * a_thread,dap_worker_t * a_worker, dap_events_socket_t *a_esocket, +int dap_proc_thread_esocket_write_inter(dap_proc_thread_t * a_thread,dap_worker_t * a_worker, dap_events_socket_t *a_es,uint128_t a_es_uuid, const void * a_data, size_t a_data_size); -int dap_proc_thread_esocket_write_f_inter(dap_proc_thread_t * a_thread,dap_worker_t * a_worker, dap_events_socket_t *a_esocket, +int dap_proc_thread_esocket_write_f_inter(dap_proc_thread_t * a_thread,dap_worker_t * a_worker, dap_events_socket_t *a_es, uint128_t a_es_uuid, const char * a_format,...); int dap_proc_thread_esocket_update_poll_flags(dap_proc_thread_t * a_thread, dap_events_socket_t * a_esocket); diff --git a/dap-sdk/net/server/notify_server/src/dap_notify_srv.c b/dap-sdk/net/server/notify_server/src/dap_notify_srv.c index c42380a56e..e438913f50 100644 --- a/dap-sdk/net/server/notify_server/src/dap_notify_srv.c +++ b/dap-sdk/net/server/notify_server/src/dap_notify_srv.c @@ -169,7 +169,8 @@ static void s_notify_server_callback_queue(dap_events_socket_t * a_es, void * a_ } size_t l_str_len = a_arg? strlen((char*)a_arg): 0; if(l_str_len){ - dap_events_socket_write_inter(a_es->worker->queue_es_io_input[l_worker_id],l_socket_handler->esocket, + dap_events_socket_write_inter(a_es->worker->queue_es_io_input[l_worker_id], + l_socket_handler->esocket, l_socket_handler->uuid, a_arg,l_str_len+1); } } diff --git a/dap-sdk/net/stream/stream/dap_stream.c b/dap-sdk/net/stream/stream/dap_stream.c index c418c1883f..3d60bfd33c 100644 --- a/dap-sdk/net/stream/stream/dap_stream.c +++ b/dap-sdk/net/stream/stream/dap_stream.c @@ -280,11 +280,11 @@ void dap_stream_delete(dap_stream_t *a_stream) log_it(L_ERROR,"stream delete NULL instance"); return; } - if (a_stream->prev) { + //if (a_stream->prev) { pthread_mutex_lock(&s_mutex_keepalive_list); DL_DELETE(s_stream_keepalive_list, a_stream); pthread_mutex_unlock(&s_mutex_keepalive_list); - } + //} while (a_stream->channel_count) { dap_stream_ch_delete(a_stream->channel[a_stream->channel_count - 1]); @@ -324,6 +324,7 @@ dap_stream_t* dap_stream_new_es_client(dap_events_socket_t * a_esocket) { dap_stream_t * ret= DAP_NEW_Z(dap_stream_t); ret->esocket = a_esocket; + ret->esocket_uuid = a_esocket->uuid; ret->buf_defrag_size=0; ret->is_client_to_uplink = true; pthread_mutex_lock(&s_mutex_keepalive_list); @@ -791,12 +792,16 @@ static bool s_detect_loose_packet(dap_stream_t * a_stream) static bool s_keepalive_cb( void ) { dap_stream_t *l_stream, *tmp; + dap_worker_t * l_worker = dap_events_get_current_worker(dap_events_get_default()); pthread_mutex_lock( &s_mutex_keepalive_list ); stream_pkt_hdr_t l_pkt = {0}; l_pkt.type = STREAM_PKT_TYPE_KEEPALIVE; memcpy(l_pkt.sig, c_dap_stream_sig, sizeof(l_pkt.sig)); DL_FOREACH_SAFE( s_stream_keepalive_list, l_stream, tmp ) { - dap_events_socket_write_inter( l_stream->stream_worker->worker, l_stream->esocket, &l_pkt, sizeof(l_pkt)); + dap_events_socket_t * l_input = l_worker->queue_es_io_input [l_stream->stream_worker->worker->id]; + dap_events_socket_write_inter( l_input, + l_stream->esocket, l_stream->esocket_uuid, + &l_pkt, sizeof(l_pkt)); } pthread_mutex_unlock( &s_mutex_keepalive_list ); return true; diff --git a/dap-sdk/net/stream/stream/include/dap_stream.h b/dap-sdk/net/stream/stream/include/dap_stream.h index 39f076f1b0..21b3aee36a 100644 --- a/dap-sdk/net/stream/stream/include/dap_stream.h +++ b/dap-sdk/net/stream/stream/include/dap_stream.h @@ -53,6 +53,7 @@ typedef struct dap_stream { int id; dap_stream_session_t * session; dap_events_socket_t * esocket; // Connection + uint128_t esocket_uuid; dap_stream_worker_t * stream_worker; struct dap_http_client * conn_http; // HTTP-specific diff --git a/modules/net/dap_chain_node_dns_client.c b/modules/net/dap_chain_node_dns_client.c index 9a56211658..ec3f651b04 100644 --- a/modules/net/dap_chain_node_dns_client.c +++ b/modules/net/dap_chain_node_dns_client.c @@ -279,6 +279,7 @@ int dap_chain_node_info_dns_request(struct in_addr a_addr, uint16_t a_port, char dap_worker_t * l_worker = dap_events_worker_get_auto(); dap_events_socket_assign_on_worker_mt(l_esocket,l_worker); + return 0; } -- GitLab