From 3b927f61dcc019ee42951273fff0448a7ed608c5 Mon Sep 17 00:00:00 2001 From: Dmitriy Gerasimov <naeper@demlabs.net> Date: Tue, 27 Jul 2021 16:04:08 +0700 Subject: [PATCH] [+] New delete unsafe function with 5 sec delay --- dap-sdk/net/client/dap_client_http.c | 5 +- dap-sdk/net/client/dap_client_pvt.c | 4 +- dap-sdk/net/core/dap_events_socket.c | 73 +++++++++++++++++--- dap-sdk/net/core/dap_timerfd.c | 1 + dap-sdk/net/core/include/dap_events_socket.h | 10 +++ dap-sdk/net/core/include/dap_worker.h | 1 + modules/net/dap_chain_net.c | 10 ++- modules/net/dap_chain_node_dns_client.c | 19 ++--- 8 files changed, 92 insertions(+), 31 deletions(-) diff --git a/dap-sdk/net/client/dap_client_http.c b/dap-sdk/net/client/dap_client_http.c index e19cddd0d8..127f6eec87 100644 --- a/dap-sdk/net/client/dap_client_http.c +++ b/dap-sdk/net/client/dap_client_http.c @@ -208,7 +208,8 @@ static bool s_timer_timeout_after_connected_check(void * a_arg) l_http_pvt->is_closed_by_timeout = true; log_it(L_INFO, "Close %s sock %u type %d by timeout", l_es->remote_addr_str ? l_es->remote_addr_str : "", l_es->socket, l_es->type); - dap_events_socket_remove_and_delete_unsafe(l_es, true); + l_es->_inheritor = NULL; + dap_events_socket_remove_and_delete_unsafe_delayed(l_es, true); } }else if(s_debug_more) @@ -254,7 +255,7 @@ static bool s_timer_timeout_check(void * a_arg) l_http_pvt->is_closed_by_timeout = true; log_it(L_INFO, "Close %s sock %u type %d by timeout", l_es->remote_addr_str ? l_es->remote_addr_str : "", l_es->socket, l_es->type); - dap_events_socket_remove_and_delete_unsafe(l_es, true); + dap_events_socket_remove_and_delete_unsafe_delayed(l_es, true); }else if(s_debug_more) log_it(L_DEBUG,"Socket %d is connected, close check timer", l_es->socket); diff --git a/dap-sdk/net/client/dap_client_pvt.c b/dap-sdk/net/client/dap_client_pvt.c index b398160d15..d102aa9982 100644 --- a/dap-sdk/net/client/dap_client_pvt.c +++ b/dap-sdk/net/client/dap_client_pvt.c @@ -247,7 +247,7 @@ static bool s_stream_timer_timeout_check(void * a_arg) } log_it(L_INFO, "Close %s sock %u type %d by timeout", l_es->remote_addr_str ? l_es->remote_addr_str : "", l_es->socket, l_es->type); - dap_events_socket_remove_and_delete_unsafe(l_es, true); + dap_events_socket_remove_and_delete_unsafe_delayed(l_es,true); }else if(s_debug_more) log_it(L_DEBUG,"Socket %d is connected, close check timer", l_es->socket); @@ -294,7 +294,7 @@ static bool s_stream_timer_timeout_after_connected_check(void * a_arg) } log_it(L_INFO, "Close streaming socket %s by timeout", l_es->remote_addr_str ? l_es->remote_addr_str : "", l_es->socket); - dap_events_socket_remove_and_delete_unsafe(l_es, true); + dap_events_socket_remove_and_delete_unsafe_delayed(l_es,true); }else if(s_debug_more) log_it(L_DEBUG,"Streaming socket %d is connected, close check timer", l_es->socket); diff --git a/dap-sdk/net/core/dap_events_socket.c b/dap-sdk/net/core/dap_events_socket.c index 6e02349778..37134713f7 100644 --- a/dap-sdk/net/core/dap_events_socket.c +++ b/dap-sdk/net/core/dap_events_socket.c @@ -104,6 +104,10 @@ struct queue_ptr_input_pvt{ #define PVT_QUEUE_PTR_INPUT(a) ( (struct queue_ptr_input_pvt*) (a)->_pvt ) static bool s_debug_reactor = false; +static uint64_t s_delayed_ops_timeout_ms = 5000; +bool s_remove_and_delete_unsafe_delayed_delete_callback(void * a_arg); + + /** * @brief dap_events_socket_init Init clients module @@ -1717,6 +1721,41 @@ void dap_events_socket_set_writable_unsafe( dap_events_socket_t *a_esocket, bool } + +/** + * @brief s_remove_and_delete_unsafe_delayed_delete_callback + * @param arg + * @return + */ +bool s_remove_and_delete_unsafe_delayed_delete_callback(void * a_arg) +{ + dap_worker_t * l_worker = dap_events_get_current_worker(dap_events_get_default()); + dap_events_socket_handler_t * l_es_handler = (dap_events_socket_handler_t*) a_arg; + assert(l_es_handler); + assert(l_worker); + if(dap_events_socket_check_uuid_unsafe(l_worker, l_es_handler->esocket, l_es_handler->uuid)) + dap_events_socket_remove_and_delete_unsafe(l_es_handler->esocket,l_es_handler->value == 1); + DAP_DELETE(l_es_handler); + return false; +} + +/** + * @brief dap_events_socket_remove_and_delete_unsafe_delayed + * @param a_es + * @param a_preserve_inheritor + */ +void dap_events_socket_remove_and_delete_unsafe_delayed( dap_events_socket_t *a_es, bool a_preserve_inheritor ) +{ + dap_events_socket_handler_t * l_es_handler = DAP_NEW_Z(dap_events_socket_handler_t); + l_es_handler->esocket = a_es; + l_es_handler->uuid = a_es->uuid; + l_es_handler->value = a_preserve_inheritor ? 1 : 0; + dap_events_socket_remove_from_worker_unsafe(a_es, a_es->worker); + dap_events_socket_descriptor_close(a_es); + dap_timerfd_start_on_worker(a_es->worker, s_delayed_ops_timeout_ms, + s_remove_and_delete_unsafe_delayed_delete_callback, l_es_handler ); +} + /** * @brief dap_events_socket_remove Removes the client from the list * @param sc Connection instance @@ -1746,6 +1785,27 @@ void dap_events_socket_remove_and_delete_unsafe( dap_events_socket_t *a_es, bool } +/** + * @brief dap_events_socket_descriptor_close + * @param a_socket + */ +void dap_events_socket_descriptor_close(dap_events_socket_t *a_esocket) +{ +#ifdef DAP_OS_WINDOWS + if ( a_esocket->socket && (a_esocket->socket != INVALID_SOCKET)) { + closesocket( a_esocket->socket ); +#else + if ( a_esocket->socket && (a_esocket->socket != -1)) { + close( a_esocket->socket ); + if( a_esocket->fd2 > 0 ){ + close( a_esocket->fd2); + } +#endif + } + a_esocket->fd2 = -1; + a_esocket->fd = -1; +} + /** * @brief dap_events_socket_delete_unsafe * @param a_esocket @@ -1785,18 +1845,9 @@ void dap_events_socket_delete_unsafe( dap_events_socket_t * a_esocket , bool a_p DAP_DEL_Z(a_esocket->buf_in) DAP_DEL_Z(a_esocket->buf_out) DAP_DEL_Z(a_esocket->remote_addr_str) -#ifdef DAP_OS_WINDOWS - if ( a_esocket->socket && (a_esocket->socket != INVALID_SOCKET)) { - closesocket( a_esocket->socket ); -#else - if ( a_esocket->socket && (a_esocket->socket != -1)) { - close( a_esocket->socket ); - if( a_esocket->fd2 > 0 ){ - close( a_esocket->fd2); - } -#endif - } + dap_events_socket_descriptor_close(a_esocket); + DAP_DEL_Z( a_esocket ) } diff --git a/dap-sdk/net/core/dap_timerfd.c b/dap-sdk/net/core/dap_timerfd.c index 44c26c61fe..2328057f5d 100644 --- a/dap-sdk/net/core/dap_timerfd.c +++ b/dap-sdk/net/core/dap_timerfd.c @@ -76,6 +76,7 @@ void __stdcall TimerAPCb(void* arg, DWORD low, DWORD high) { // Timer high valu } #endif + /** * @brief dap_timerfd_start_on_worker * @param a_worker diff --git a/dap-sdk/net/core/include/dap_events_socket.h b/dap-sdk/net/core/include/dap_events_socket.h index 689246a9d5..21e3bfc10f 100644 --- a/dap-sdk/net/core/include/dap_events_socket.h +++ b/dap-sdk/net/core/include/dap_events_socket.h @@ -283,6 +283,10 @@ typedef struct dap_events_socket { typedef struct dap_events_socket_handler{ dap_events_socket_t * esocket; uint128_t uuid; + struct { + uint64_t value; // some custom data + void * ptr; + }; } dap_events_socket_handler_t; @@ -363,6 +367,12 @@ size_t dap_events_socket_write_f_inter(dap_events_socket_t * a_es_input, dap_eve void dap_events_socket_remove_and_delete_mt( dap_worker_t * a_w, dap_events_socket_t* a_es); void dap_events_socket_remove_and_delete_unsafe( dap_events_socket_t *a_es, bool preserve_inheritor ); +// Delayed removed +void dap_events_socket_remove_and_delete_unsafe_delayed( dap_events_socket_t *a_es, bool a_preserve_inheritor); + +// Just close socket descriptor +void dap_events_socket_descriptor_close(dap_events_socket_t *a_socket); + void dap_events_socket_remove_from_worker_unsafe( dap_events_socket_t *a_es, dap_worker_t * a_worker); void dap_events_socket_shrink_buf_in(dap_events_socket_t * cl, size_t shrink_size); diff --git a/dap-sdk/net/core/include/dap_worker.h b/dap-sdk/net/core/include/dap_worker.h index ca912b4346..847de76a18 100644 --- a/dap-sdk/net/core/include/dap_worker.h +++ b/dap-sdk/net/core/include/dap_worker.h @@ -130,6 +130,7 @@ void dap_worker_add_events_socket_inter(dap_events_socket_t * a_es_input, dap_ev dap_worker_t *dap_worker_add_events_socket_auto( dap_events_socket_t * a_events_socket ); void dap_worker_exec_callback_on(dap_worker_t * a_worker, dap_worker_callback_t a_callback, void * a_arg); +bool dap_worker_check_esocket_polled_now(); // Check if esocket is right now polled and present in list // Thread function void *dap_worker_thread(void *arg); diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index a3789b6ab0..c201220cb3 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -451,9 +451,17 @@ static void s_node_link_callback_connected(dap_chain_node_client_t * a_node_clie dap_chain_net_pvt_t * l_net_pvt = PVT(l_net); dap_chain_node_info_t * l_link_info = a_node_client->info; - a_node_client->state = NODE_CLIENT_STATE_ESTABLISHED; + a_node_client->stream_worker = dap_client_get_stream_worker(a_node_client->client); + if(a_node_client->stream_worker == NULL){ + log_it(L_ERROR, "Stream worker is NULL in connected() callback, do nothing"); + a_node_client->state = NODE_CLIENT_STATE_ERROR; + return; + } + + a_node_client->state = NODE_CLIENT_STATE_ESTABLISHED; + if( !a_node_client->is_reconnecting || s_debug_more ) log_it(L_NOTICE, "Established connection with %s."NODE_ADDR_FP_STR,l_net->pub.name, NODE_ADDR_FP_ARGS_S(a_node_client->remote_node_addr)); diff --git a/modules/net/dap_chain_node_dns_client.c b/modules/net/dap_chain_node_dns_client.c index ec3f651b04..8043709f12 100644 --- a/modules/net/dap_chain_node_dns_client.c +++ b/modules/net/dap_chain_node_dns_client.c @@ -138,29 +138,18 @@ static bool s_dns_client_esocket_timeout_callback(void * a_arg) dap_worker_t * l_worker = dap_events_get_current_worker(l_events); // We're in own esocket context assert(l_worker); - if(dap_events_socket_check_unsafe(l_worker ,l_es)){ - if(dap_uint128_check_equal(l_es->uuid, l_es_handler->uuid)){ // Pointer is present but alien - DAP_DELETE(l_es_handler); - return false; - }else - DAP_DELETE(l_es_handler); - }else{ // No such pointer - DAP_DELETE(l_es_handler); - return false; - } - - - struct dns_client * l_dns_client = (struct dns_client*) l_es->_inheritor; - if(dap_events_socket_check_unsafe(l_worker, l_es) ){ // If we've not closed this esocket + if(dap_events_socket_check_uuid_unsafe(l_worker ,l_es, l_es_handler->uuid) ){ // If we've not closed this esocket + struct dns_client * l_dns_client = (struct dns_client*) l_es->_inheritor; log_it(L_WARNING,"DNS request timeout, bad network?"); if(! l_dns_client->is_callbacks_called ){ l_dns_client->callback_error(l_es->worker,l_dns_client->result,l_dns_client->callbacks_arg,ETIMEDOUT); l_dns_client->is_callbacks_called = true; } - dap_events_socket_remove_and_delete_unsafe( l_es, false); + dap_events_socket_remove_and_delete_unsafe_delayed( l_es, false); } + DAP_DELETE(l_es_handler); return false; } -- GitLab