diff --git a/dap-sdk/net/client/dap_client_http.c b/dap-sdk/net/client/dap_client_http.c index e19cddd0d807b6b69d61e2124789571e60f23ce8..127f6eec8757c4c999af299e088930b33635f158 100644 --- a/dap-sdk/net/client/dap_client_http.c +++ b/dap-sdk/net/client/dap_client_http.c @@ -208,7 +208,8 @@ static bool s_timer_timeout_after_connected_check(void * a_arg) l_http_pvt->is_closed_by_timeout = true; log_it(L_INFO, "Close %s sock %u type %d by timeout", l_es->remote_addr_str ? l_es->remote_addr_str : "", l_es->socket, l_es->type); - dap_events_socket_remove_and_delete_unsafe(l_es, true); + l_es->_inheritor = NULL; + dap_events_socket_remove_and_delete_unsafe_delayed(l_es, true); } }else if(s_debug_more) @@ -254,7 +255,7 @@ static bool s_timer_timeout_check(void * a_arg) l_http_pvt->is_closed_by_timeout = true; log_it(L_INFO, "Close %s sock %u type %d by timeout", l_es->remote_addr_str ? l_es->remote_addr_str : "", l_es->socket, l_es->type); - dap_events_socket_remove_and_delete_unsafe(l_es, true); + dap_events_socket_remove_and_delete_unsafe_delayed(l_es, true); }else if(s_debug_more) log_it(L_DEBUG,"Socket %d is connected, close check timer", l_es->socket); diff --git a/dap-sdk/net/client/dap_client_pvt.c b/dap-sdk/net/client/dap_client_pvt.c index b398160d15901eb32819e0053275a1443f3dc042..d102aa9982ae1b8356ce2b00c83d292aa6906f54 100644 --- a/dap-sdk/net/client/dap_client_pvt.c +++ b/dap-sdk/net/client/dap_client_pvt.c @@ -247,7 +247,7 @@ static bool s_stream_timer_timeout_check(void * a_arg) } log_it(L_INFO, "Close %s sock %u type %d by timeout", l_es->remote_addr_str ? l_es->remote_addr_str : "", l_es->socket, l_es->type); - dap_events_socket_remove_and_delete_unsafe(l_es, true); + dap_events_socket_remove_and_delete_unsafe_delayed(l_es,true); }else if(s_debug_more) log_it(L_DEBUG,"Socket %d is connected, close check timer", l_es->socket); @@ -294,7 +294,7 @@ static bool s_stream_timer_timeout_after_connected_check(void * a_arg) } log_it(L_INFO, "Close streaming socket %s by timeout", l_es->remote_addr_str ? l_es->remote_addr_str : "", l_es->socket); - dap_events_socket_remove_and_delete_unsafe(l_es, true); + dap_events_socket_remove_and_delete_unsafe_delayed(l_es,true); }else if(s_debug_more) log_it(L_DEBUG,"Streaming socket %d is connected, close check timer", l_es->socket); diff --git a/dap-sdk/net/core/dap_events_socket.c b/dap-sdk/net/core/dap_events_socket.c index 6e02349778c575181680c99595727ecaa6a29aa3..37134713f77c333b48fc48b50af8745172b920a8 100644 --- a/dap-sdk/net/core/dap_events_socket.c +++ b/dap-sdk/net/core/dap_events_socket.c @@ -104,6 +104,10 @@ struct queue_ptr_input_pvt{ #define PVT_QUEUE_PTR_INPUT(a) ( (struct queue_ptr_input_pvt*) (a)->_pvt ) static bool s_debug_reactor = false; +static uint64_t s_delayed_ops_timeout_ms = 5000; +bool s_remove_and_delete_unsafe_delayed_delete_callback(void * a_arg); + + /** * @brief dap_events_socket_init Init clients module @@ -1717,6 +1721,41 @@ void dap_events_socket_set_writable_unsafe( dap_events_socket_t *a_esocket, bool } + +/** + * @brief s_remove_and_delete_unsafe_delayed_delete_callback + * @param arg + * @return + */ +bool s_remove_and_delete_unsafe_delayed_delete_callback(void * a_arg) +{ + dap_worker_t * l_worker = dap_events_get_current_worker(dap_events_get_default()); + dap_events_socket_handler_t * l_es_handler = (dap_events_socket_handler_t*) a_arg; + assert(l_es_handler); + assert(l_worker); + if(dap_events_socket_check_uuid_unsafe(l_worker, l_es_handler->esocket, l_es_handler->uuid)) + dap_events_socket_remove_and_delete_unsafe(l_es_handler->esocket,l_es_handler->value == 1); + DAP_DELETE(l_es_handler); + return false; +} + +/** + * @brief dap_events_socket_remove_and_delete_unsafe_delayed + * @param a_es + * @param a_preserve_inheritor + */ +void dap_events_socket_remove_and_delete_unsafe_delayed( dap_events_socket_t *a_es, bool a_preserve_inheritor ) +{ + dap_events_socket_handler_t * l_es_handler = DAP_NEW_Z(dap_events_socket_handler_t); + l_es_handler->esocket = a_es; + l_es_handler->uuid = a_es->uuid; + l_es_handler->value = a_preserve_inheritor ? 1 : 0; + dap_events_socket_remove_from_worker_unsafe(a_es, a_es->worker); + dap_events_socket_descriptor_close(a_es); + dap_timerfd_start_on_worker(a_es->worker, s_delayed_ops_timeout_ms, + s_remove_and_delete_unsafe_delayed_delete_callback, l_es_handler ); +} + /** * @brief dap_events_socket_remove Removes the client from the list * @param sc Connection instance @@ -1746,6 +1785,27 @@ void dap_events_socket_remove_and_delete_unsafe( dap_events_socket_t *a_es, bool } +/** + * @brief dap_events_socket_descriptor_close + * @param a_socket + */ +void dap_events_socket_descriptor_close(dap_events_socket_t *a_esocket) +{ +#ifdef DAP_OS_WINDOWS + if ( a_esocket->socket && (a_esocket->socket != INVALID_SOCKET)) { + closesocket( a_esocket->socket ); +#else + if ( a_esocket->socket && (a_esocket->socket != -1)) { + close( a_esocket->socket ); + if( a_esocket->fd2 > 0 ){ + close( a_esocket->fd2); + } +#endif + } + a_esocket->fd2 = -1; + a_esocket->fd = -1; +} + /** * @brief dap_events_socket_delete_unsafe * @param a_esocket @@ -1785,18 +1845,9 @@ void dap_events_socket_delete_unsafe( dap_events_socket_t * a_esocket , bool a_p DAP_DEL_Z(a_esocket->buf_in) DAP_DEL_Z(a_esocket->buf_out) DAP_DEL_Z(a_esocket->remote_addr_str) -#ifdef DAP_OS_WINDOWS - if ( a_esocket->socket && (a_esocket->socket != INVALID_SOCKET)) { - closesocket( a_esocket->socket ); -#else - if ( a_esocket->socket && (a_esocket->socket != -1)) { - close( a_esocket->socket ); - if( a_esocket->fd2 > 0 ){ - close( a_esocket->fd2); - } -#endif - } + dap_events_socket_descriptor_close(a_esocket); + DAP_DEL_Z( a_esocket ) } diff --git a/dap-sdk/net/core/dap_timerfd.c b/dap-sdk/net/core/dap_timerfd.c index 44c26c61fe405ff1ea42f2225b91e3ec55ad2ba2..2328057f5de7efedde93639615846c52c3ace0b3 100644 --- a/dap-sdk/net/core/dap_timerfd.c +++ b/dap-sdk/net/core/dap_timerfd.c @@ -76,6 +76,7 @@ void __stdcall TimerAPCb(void* arg, DWORD low, DWORD high) { // Timer high valu } #endif + /** * @brief dap_timerfd_start_on_worker * @param a_worker diff --git a/dap-sdk/net/core/include/dap_events_socket.h b/dap-sdk/net/core/include/dap_events_socket.h index 689246a9d5c876a118f81759c79d564f19d52a4c..21e3bfc10fceb0cf984ccc830c00357bd02a780e 100644 --- a/dap-sdk/net/core/include/dap_events_socket.h +++ b/dap-sdk/net/core/include/dap_events_socket.h @@ -283,6 +283,10 @@ typedef struct dap_events_socket { typedef struct dap_events_socket_handler{ dap_events_socket_t * esocket; uint128_t uuid; + struct { + uint64_t value; // some custom data + void * ptr; + }; } dap_events_socket_handler_t; @@ -363,6 +367,12 @@ size_t dap_events_socket_write_f_inter(dap_events_socket_t * a_es_input, dap_eve void dap_events_socket_remove_and_delete_mt( dap_worker_t * a_w, dap_events_socket_t* a_es); void dap_events_socket_remove_and_delete_unsafe( dap_events_socket_t *a_es, bool preserve_inheritor ); +// Delayed removed +void dap_events_socket_remove_and_delete_unsafe_delayed( dap_events_socket_t *a_es, bool a_preserve_inheritor); + +// Just close socket descriptor +void dap_events_socket_descriptor_close(dap_events_socket_t *a_socket); + void dap_events_socket_remove_from_worker_unsafe( dap_events_socket_t *a_es, dap_worker_t * a_worker); void dap_events_socket_shrink_buf_in(dap_events_socket_t * cl, size_t shrink_size); diff --git a/dap-sdk/net/core/include/dap_worker.h b/dap-sdk/net/core/include/dap_worker.h index ca912b4346036e7d6aef71d233a1fb28083aedba..847de76a181b755869e73ab8af98113d18522566 100644 --- a/dap-sdk/net/core/include/dap_worker.h +++ b/dap-sdk/net/core/include/dap_worker.h @@ -130,6 +130,7 @@ void dap_worker_add_events_socket_inter(dap_events_socket_t * a_es_input, dap_ev dap_worker_t *dap_worker_add_events_socket_auto( dap_events_socket_t * a_events_socket ); void dap_worker_exec_callback_on(dap_worker_t * a_worker, dap_worker_callback_t a_callback, void * a_arg); +bool dap_worker_check_esocket_polled_now(); // Check if esocket is right now polled and present in list // Thread function void *dap_worker_thread(void *arg); diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index a3789b6ab0ecda52d33fcefcd3ff217c2b9e6ab7..c201220cb39be8395f7e32b1a8baa9a0faa3fcaa 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -451,9 +451,17 @@ static void s_node_link_callback_connected(dap_chain_node_client_t * a_node_clie dap_chain_net_pvt_t * l_net_pvt = PVT(l_net); dap_chain_node_info_t * l_link_info = a_node_client->info; - a_node_client->state = NODE_CLIENT_STATE_ESTABLISHED; + a_node_client->stream_worker = dap_client_get_stream_worker(a_node_client->client); + if(a_node_client->stream_worker == NULL){ + log_it(L_ERROR, "Stream worker is NULL in connected() callback, do nothing"); + a_node_client->state = NODE_CLIENT_STATE_ERROR; + return; + } + + a_node_client->state = NODE_CLIENT_STATE_ESTABLISHED; + if( !a_node_client->is_reconnecting || s_debug_more ) log_it(L_NOTICE, "Established connection with %s."NODE_ADDR_FP_STR,l_net->pub.name, NODE_ADDR_FP_ARGS_S(a_node_client->remote_node_addr)); diff --git a/modules/net/dap_chain_node_dns_client.c b/modules/net/dap_chain_node_dns_client.c index ec3f651b04cbddfa5b324fa9a680d6ecbb38abc2..8043709f12fccac2930d185a8d3521f54b62d8a9 100644 --- a/modules/net/dap_chain_node_dns_client.c +++ b/modules/net/dap_chain_node_dns_client.c @@ -138,29 +138,18 @@ static bool s_dns_client_esocket_timeout_callback(void * a_arg) dap_worker_t * l_worker = dap_events_get_current_worker(l_events); // We're in own esocket context assert(l_worker); - if(dap_events_socket_check_unsafe(l_worker ,l_es)){ - if(dap_uint128_check_equal(l_es->uuid, l_es_handler->uuid)){ // Pointer is present but alien - DAP_DELETE(l_es_handler); - return false; - }else - DAP_DELETE(l_es_handler); - }else{ // No such pointer - DAP_DELETE(l_es_handler); - return false; - } - - - struct dns_client * l_dns_client = (struct dns_client*) l_es->_inheritor; - if(dap_events_socket_check_unsafe(l_worker, l_es) ){ // If we've not closed this esocket + if(dap_events_socket_check_uuid_unsafe(l_worker ,l_es, l_es_handler->uuid) ){ // If we've not closed this esocket + struct dns_client * l_dns_client = (struct dns_client*) l_es->_inheritor; log_it(L_WARNING,"DNS request timeout, bad network?"); if(! l_dns_client->is_callbacks_called ){ l_dns_client->callback_error(l_es->worker,l_dns_client->result,l_dns_client->callbacks_arg,ETIMEDOUT); l_dns_client->is_callbacks_called = true; } - dap_events_socket_remove_and_delete_unsafe( l_es, false); + dap_events_socket_remove_and_delete_unsafe_delayed( l_es, false); } + DAP_DELETE(l_es_handler); return false; }