From e0556fb1524b6ac402ecfc284aa945c13c162562 Mon Sep 17 00:00:00 2001 From: Roman Khlopkov <roman.khlopkov@demlabs.net> Date: Fri, 16 Oct 2020 16:40:37 +0000 Subject: [PATCH] bugs-4572 --- CMakeLists.txt | 2 +- dap-sdk/core/src/dap_common.c | 24 ++--- dap-sdk/net/client/dap_client.c | 31 +++--- dap-sdk/net/client/dap_client_http.c | 7 +- dap-sdk/net/client/dap_client_pvt.c | 47 ++++++---- dap-sdk/net/core/dap_proc_queue.c | 2 +- dap-sdk/net/core/dap_proc_thread.c | 2 +- dap-sdk/net/core/dap_timerfd.c | 37 ++++---- dap-sdk/net/core/dap_worker.c | 64 +++++++------ dap-sdk/net/core/include/dap_events_socket.h | 1 - dap-sdk/net/core/include/dap_timerfd.h | 5 +- .../net/stream/ch/include/dap_stream_ch_pkt.h | 2 +- dap-sdk/net/stream/stream/dap_stream.c | 94 +++++++------------ dap-sdk/net/stream/stream/dap_stream_pkt.c | 13 --- .../net/stream/stream/include/dap_stream.h | 4 - .../stream/stream/include/dap_stream_pkt.h | 3 +- modules/channel/chain/dap_stream_ch_chain.c | 1 + modules/net/dap_chain_net.c | 16 ++-- modules/net/dap_chain_node_client.c | 13 +++ modules/net/include/dap_chain_node_client.h | 5 +- 20 files changed, 183 insertions(+), 190 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 098d030f2b..8c86602c76 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,7 +2,7 @@ project(cellframe-sdk C) cmake_minimum_required(VERSION 2.8) set(CMAKE_C_STANDARD 11) -set(CELLFRAME_SDK_NATIVE_VERSION "2.6-16") +set(CELLFRAME_SDK_NATIVE_VERSION "2.6-17") add_definitions ("-DCELLFRAME_SDK_VERSION=\"${CELLFRAME_SDK_NATIVE_VERSION}\"") set(DAPSDK_MODULES "") diff --git a/dap-sdk/core/src/dap_common.c b/dap-sdk/core/src/dap_common.c index 8004259700..ded4fd75bd 100755 --- a/dap-sdk/core/src/dap_common.c +++ b/dap-sdk/core/src/dap_common.c @@ -879,7 +879,7 @@ void *dap_interval_timer_create(unsigned int a_msec, dap_timer_callback_t a_call if (s_timers_count == DAP_INTERVAL_TIMERS_MAX) { return NULL; } -#ifdef WIN32 +#if (defined _WIN32) if (s_timers_count == 0) { InitializeCriticalSection(&s_timers_lock); } @@ -888,7 +888,7 @@ void *dap_interval_timer_create(unsigned int a_msec, dap_timer_callback_t a_call return NULL; } EnterCriticalSection(&s_timers_lock); -#elif defined DAP_OS_DARWIN +#elif (defined DAP_OS_DARWIN) if (s_timers_count == 0) { pthread_mutex_init(&s_timers_lock, NULL); } @@ -922,9 +922,9 @@ void *dap_interval_timer_create(unsigned int a_msec, dap_timer_callback_t a_call s_timers[s_timers_count].callback = a_callback; s_timers[s_timers_count].param = a_param; s_timers_count++; -#ifdef _WIN32 +#ifdef WIN32 LeaveCriticalSection(&s_timers_lock); -#elif DAP_OS_UNIX +#else pthread_mutex_unlock(&s_timers_lock); #endif return (void *)l_timer; @@ -940,9 +940,9 @@ int dap_interval_timer_delete(void *a_timer) if (!s_timers_count) { return -1; } -#ifdef _WIN32 +#if (defined _WIN32) EnterCriticalSection(&s_timers_lock); -#elif DAP_OS_UNIX +#elif (defined DAP_OS_UNIX) pthread_mutex_lock(&s_timers_lock); #endif int l_timer_idx = s_timer_find(a_timer); @@ -959,18 +959,18 @@ int dap_interval_timer_delete(void *a_timer) DeleteCriticalSection(&s_timers_lock); } return !DeleteTimerQueueTimer(NULL, (HANDLE)a_timer, NULL); -#elif DAP_OS_UNIX +#else pthread_mutex_unlock(&s_timers_lock); if (s_timers_count == 0) { pthread_mutex_destroy(&s_timers_lock); } - #ifdef DAP_OS_DARWIN +#ifdef DAP_OS_UNIX + return timer_delete((timer_t)a_timer); +#else dispatch_source_cancel(a_timer); return 0; - #else - return timer_delete((timer_t)a_timer); - #endif -#endif +#endif // DAP_OS_UNIX +#endif // _WIN32 } /** diff --git a/dap-sdk/net/client/dap_client.c b/dap-sdk/net/client/dap_client.c index 09f32b4940..e1cf468828 100644 --- a/dap-sdk/net/client/dap_client.c +++ b/dap-sdk/net/client/dap_client.c @@ -254,14 +254,25 @@ static void s_go_stage_on_client_worker_unsafe(dap_worker_t * a_worker,void * a_ dap_client_pvt_t * l_client_pvt = ((struct go_stage_arg*) a_arg)->client_pvt; DAP_DELETE(a_arg); - l_client_pvt->stage_target = a_stage_target; - l_client_pvt->stage_target_done_callback = a_stage_end_callback; - dap_client_stage_t l_cur_stage = l_client_pvt->stage; dap_client_stage_status_t l_cur_stage_status= l_client_pvt->stage_status; - - if(a_stage_target != l_cur_stage ){ // Going to stages downstairs + if (a_stage_target == l_cur_stage) { + log_it(L_DEBUG, "Already have target state %s", dap_client_stage_str(a_stage_target)); + if (a_stage_end_callback) { + a_stage_end_callback(l_client_pvt->client, NULL); + } + return; + } + log_it(L_DEBUG, "Start transitions chain to %s", dap_client_stage_str(a_stage_target)); + l_client_pvt->stage_target = a_stage_target; + l_client_pvt->stage_target_done_callback = a_stage_end_callback; + if (a_stage_target < l_cur_stage) { + dap_client_pvt_stage_transaction_begin(l_client_pvt, STAGE_BEGIN, NULL); + } + l_cur_stage = l_client_pvt->stage; + l_cur_stage_status= l_client_pvt->stage_status; + if (a_stage_target != l_cur_stage ){ // Going to stages downstairs switch(l_cur_stage_status ){ case STAGE_STATUS_ABORTING: log_it(L_ERROR, "Already aborting the stage %s" @@ -274,17 +285,11 @@ static void s_go_stage_on_client_worker_unsafe(dap_worker_t * a_worker,void * a_ case STAGE_STATUS_DONE: case STAGE_STATUS_ERROR: default: { - log_it(L_DEBUG, "Start transitions chain to %s" - ,dap_client_stage_str(a_stage_target) ); - int step = (a_stage_target > l_cur_stage)?1:-1; dap_client_pvt_stage_transaction_begin(l_client_pvt, - l_cur_stage+step, - m_stage_fsm_operator_unsafe - ); + l_cur_stage + 1, + m_stage_fsm_operator_unsafe); } } - }else{ // Same stage - log_it(L_ERROR,"We're already on stage %s",dap_client_stage_str(a_stage_target)); } } /** diff --git a/dap-sdk/net/client/dap_client_http.c b/dap-sdk/net/client/dap_client_http.c index 8ad1f99eb5..444a10b456 100644 --- a/dap-sdk/net/client/dap_client_http.c +++ b/dap-sdk/net/client/dap_client_http.c @@ -166,8 +166,7 @@ static void s_http_read(dap_events_socket_t * a_es, void * arg) s_client_http_delete(l_client_http_internal); a_es->_inheritor = NULL; - a_es->kill_signal=true; - a_es->flags &= DAP_SOCK_SIGNAL_CLOSE; + a_es->flags |= DAP_SOCK_SIGNAL_CLOSE; } } @@ -200,9 +199,7 @@ static void s_http_error(dap_events_socket_t * a_es, int a_errno) s_client_http_delete(l_client_http_internal); a_es->_inheritor = NULL; // close connection. - // TODO merge this things into the one (I expect better it would be flag ) - a_es->flags &= DAP_SOCK_SIGNAL_CLOSE; - a_es->kill_signal = true; + a_es->flags |= DAP_SOCK_SIGNAL_CLOSE; } /** diff --git a/dap-sdk/net/client/dap_client_pvt.c b/dap-sdk/net/client/dap_client_pvt.c index 2814a5bfb6..5e89ef28cf 100644 --- a/dap-sdk/net/client/dap_client_pvt.c +++ b/dap-sdk/net/client/dap_client_pvt.c @@ -150,9 +150,8 @@ static void s_client_pvt_disconnected(dap_client_t * a_client, void * a_arg ) (void) a_arg; // To be sure thats cond waiter is waiting and unlocked mutex pthread_mutex_lock(&DAP_CLIENT_PVT(a_client)->disconnected_mutex); - pthread_mutex_unlock(&DAP_CLIENT_PVT(a_client)->disconnected_mutex); - pthread_cond_broadcast(&DAP_CLIENT_PVT(a_client)->disconnected_cond); + pthread_mutex_unlock(&DAP_CLIENT_PVT(a_client)->disconnected_mutex); } /** @@ -167,7 +166,9 @@ int dap_client_pvt_disconnect_all_n_wait(dap_client_pvt_t *a_client_pvt) return -1; pthread_mutex_lock(&a_client_pvt->disconnected_mutex); dap_client_go_stage(a_client_pvt->client, STAGE_BEGIN, s_client_pvt_disconnected ); - pthread_cond_wait(&a_client_pvt->disconnected_cond, &a_client_pvt->disconnected_mutex); + while (a_client_pvt->stage != STAGE_BEGIN) { + pthread_cond_wait(&a_client_pvt->disconnected_cond, &a_client_pvt->disconnected_mutex); + } pthread_mutex_unlock(&a_client_pvt->disconnected_mutex); return 0; @@ -242,6 +243,21 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) switch (l_stage_status) { case STAGE_STATUS_IN_PROGRESS: { + if (a_client_pvt->stage_target == STAGE_BEGIN) { + switch(l_stage) { + case STAGE_STREAM_CONNECTED: + case STAGE_STREAM_STREAMING: + dap_stream_delete(a_client_pvt->stream); + a_client_pvt->stream = NULL; + a_client_pvt->stream_es = NULL; + break; + default: + break; + } + a_client_pvt->stage_status = STAGE_STATUS_DONE; + s_stage_status_after(a_client_pvt); + return; + } switch (l_stage) { case STAGE_ENC_INIT: { log_it(L_INFO, "Go to stage ENC: prepare the request"); @@ -471,7 +487,9 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) int MAX_ATTEMPTS = 3; a_client_pvt->stage_errors++; bool l_is_last_attempt = a_client_pvt->stage_errors > MAX_ATTEMPTS ? true : false; - + if (a_client_pvt->last_error == ERROR_NETWORK_CONNECTION_TIMEOUT) { + l_is_last_attempt = true; + } log_it(L_ERROR, "Error state, doing callback if present"); if(a_client_pvt->stage_status_error_callback) a_client_pvt->stage_status_error_callback(a_client_pvt->client, (void*) l_is_last_attempt); @@ -500,8 +518,8 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) log_it(L_INFO, "Connection attempt %d in 0.3 seconds", a_client_pvt->stage_errors); // small delay before next request - dap_timerfd_start_on_worker( l_worker, 300, - (dap_timerfd_callback_t) s_stage_status_after,a_client_pvt ); + dap_timerfd_start_on_worker( l_worker, 300, (dap_timerfd_callback_t)s_stage_status_after, + a_client_pvt, false ); } else{ log_it(L_INFO, "Too many connection attempts. Tries are over."); @@ -515,13 +533,6 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) break; case STAGE_STATUS_DONE: { log_it(L_INFO, "Stage status %s is done", dap_client_stage_str(a_client_pvt->stage)); - // go to next stage - if(a_client_pvt->stage_status_done_callback) { - a_client_pvt->stage_status_done_callback(a_client_pvt->client, NULL); - // Expecting that its one-shot callback - //a_client_internal->stage_status_done_callback = NULL; - } else - log_it(L_WARNING, "Stage done callback is not present"); bool l_is_last_stage = (a_client_pvt->stage == a_client_pvt->stage_target); if(l_is_last_stage) { //l_is_unref = true; @@ -532,8 +543,10 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) // Expecting that its one-shot callback a_client_pvt->stage_target_done_callback = NULL; } + } else if (a_client_pvt->stage_status_done_callback) { + // go to next stage + a_client_pvt->stage_status_done_callback(a_client_pvt->client, NULL); } - //l_is_unref = true; } break; default: @@ -1200,11 +1213,7 @@ static void s_stream_es_callback_error(dap_events_socket_t * a_es, int a_error) log_it(L_WARNING, "STREAM error \"%s\" (code %d)", l_errbuf, a_error); - - // TODO merge flag and field - l_client_pvt->stream_es->kill_signal = true; - l_client_pvt->stream_es->flags &= DAP_SOCK_SIGNAL_CLOSE; - + l_client_pvt->stream_es->flags |= DAP_SOCK_SIGNAL_CLOSE; if (a_error == ETIMEDOUT) { l_client_pvt->last_error = ERROR_NETWORK_CONNECTION_TIMEOUT; diff --git a/dap-sdk/net/core/dap_proc_queue.c b/dap-sdk/net/core/dap_proc_queue.c index 4b91b2d996..2a6374b2a1 100644 --- a/dap-sdk/net/core/dap_proc_queue.c +++ b/dap-sdk/net/core/dap_proc_queue.c @@ -82,7 +82,7 @@ static void s_queue_esocket_callback( dap_events_socket_t * a_es, void * a_msg) //log_it( L_DEBUG, "Sent signal to proc thread that we have callbacks on board"); } if (l_msg->signal_kill){ // Say to kill this object and delete its inherior dap_proc_queue_t - a_es->kill_signal = true; + a_es->flags |= DAP_SOCK_SIGNAL_CLOSE; } DAP_DELETE(l_msg); } diff --git a/dap-sdk/net/core/dap_proc_thread.c b/dap-sdk/net/core/dap_proc_thread.c index 93c4ab15cc..4c3a2e62f3 100644 --- a/dap-sdk/net/core/dap_proc_thread.c +++ b/dap-sdk/net/core/dap_proc_thread.c @@ -287,7 +287,7 @@ static void * s_proc_thread_function(void * a_arg) default:{ log_it(L_ERROR, "Unprocessed descriptor type accepted in proc thread loop"); } } } - if(l_cur->kill_signal){ + if(l_cur->flags & DAP_SOCK_SIGNAL_CLOSE){ #ifdef DAP_EVENTS_CAPS_EPOLL if ( epoll_ctl( l_thread->epoll_ctl, EPOLL_CTL_DEL, l_cur->fd, &l_cur->ev ) == -1 ) log_it( L_ERROR,"Can't remove event socket's handler from the epoll ctl" ); diff --git a/dap-sdk/net/core/dap_timerfd.c b/dap-sdk/net/core/dap_timerfd.c index d8526a500a..51be755a2c 100644 --- a/dap-sdk/net/core/dap_timerfd.c +++ b/dap-sdk/net/core/dap_timerfd.c @@ -57,9 +57,9 @@ int dap_timerfd_init() * @param a_callback * @return new allocated dap_timerfd_t structure or NULL if error */ -dap_timerfd_t* dap_timerfd_start(uint64_t a_timeout_ms, dap_timerfd_callback_t a_callback, void *a_callback_arg) +dap_timerfd_t* dap_timerfd_start(uint64_t a_timeout_ms, dap_timerfd_callback_t a_callback, void *a_callback_arg, bool a_repeated) { - return dap_timerfd_start_on_worker(dap_events_worker_get_auto(), a_timeout_ms, a_callback, a_callback_arg ); + return dap_timerfd_start_on_worker(dap_events_worker_get_auto(), a_timeout_ms, a_callback, a_callback_arg, a_repeated); } /** @@ -70,7 +70,7 @@ dap_timerfd_t* dap_timerfd_start(uint64_t a_timeout_ms, dap_timerfd_callback_t a * @param a_callback_arg * @return */ -dap_timerfd_t* dap_timerfd_start_on_worker(dap_worker_t * a_worker, uint64_t a_timeout_ms, dap_timerfd_callback_t a_callback, void *a_callback_arg) +dap_timerfd_t* dap_timerfd_start_on_worker(dap_worker_t * a_worker, uint64_t a_timeout_ms, dap_timerfd_callback_t a_callback, void *a_callback_arg, bool a_repeated) { struct itimerspec l_ts; @@ -79,7 +79,7 @@ dap_timerfd_t* dap_timerfd_start_on_worker(dap_worker_t * a_worker, uint64_t a_t log_it(L_WARNING, "dap_timerfd_start() failed: timerfd_create() errno=%d\n", errno); return NULL; } - // first expiration in 0 seconds after times start + // repeat never l_ts.it_interval.tv_sec = 0; l_ts.it_interval.tv_nsec = 0; // timeout for timer @@ -110,6 +110,7 @@ dap_timerfd_t* dap_timerfd_start_on_worker(dap_worker_t * a_worker, uint64_t a_t l_timerfd->events_socket = l_events_socket; l_timerfd->callback = a_callback; l_timerfd->callback_arg = a_callback_arg; + l_timerfd->repeated = a_repeated; dap_worker_add_events_socket(l_events_socket, a_worker); return l_timerfd; @@ -123,21 +124,25 @@ static void s_es_callback_timer(struct dap_events_socket *a_event_sock) { uint64_t l_ptiu64; dap_timerfd_t *l_timerfd = a_event_sock->_inheritor; - //printf("\nread() returned %d, %d\n", l_ptiu64, l_read_ret); - struct itimerspec l_ts; - // first expiration in 0 seconds after times start - l_ts.it_interval.tv_sec = 0; - l_ts.it_interval.tv_nsec = 0; - // timeout for timer - l_ts.it_value.tv_sec = l_timerfd->timeout_ms / 1000; - l_ts.it_value.tv_nsec = (l_timerfd->timeout_ms % 1000) * 1000000; - if(timerfd_settime(l_timerfd->tfd, 0, &l_ts, NULL) < 0) { - log_it(L_WARNING, "callback_timerfd_read() failed: timerfd_settime() errno=%d\n", errno); - } // run user's callback if(l_timerfd->callback) l_timerfd->callback(l_timerfd->callback_arg); - dap_events_socket_set_readable_unsafe(a_event_sock, true); + if (l_timerfd->repeated) { + //printf("\nread() returned %d, %d\n", l_ptiu64, l_read_ret); + struct itimerspec l_ts; + // repeat never + l_ts.it_interval.tv_sec = 0; + l_ts.it_interval.tv_nsec = 0; + // timeout for timer + l_ts.it_value.tv_sec = l_timerfd->timeout_ms / 1000; + l_ts.it_value.tv_nsec = (l_timerfd->timeout_ms % 1000) * 1000000; + if(timerfd_settime(l_timerfd->tfd, 0, &l_ts, NULL) < 0) { + log_it(L_WARNING, "callback_timerfd_read() failed: timerfd_settime() errno=%d\n", errno); + } + dap_events_socket_set_readable_unsafe(a_event_sock, true); + } else { + dap_events_socket_remove_and_delete_unsafe(l_timerfd->events_socket, false); + } } /** diff --git a/dap-sdk/net/core/dap_worker.c b/dap-sdk/net/core/dap_worker.c index 5187a36dfd..aa7cf33af4 100644 --- a/dap-sdk/net/core/dap_worker.c +++ b/dap-sdk/net/core/dap_worker.c @@ -30,6 +30,7 @@ #include <sys/types.h> #include <sys/socket.h> #include <arpa/inet.h> +#include <sys/resource.h> #include "dap_common.h" #include "dap_math_ops.h" @@ -38,8 +39,8 @@ #define LOG_TAG "dap_worker" - -static time_t s_connection_timeout = 60; +// temporary too big timout for no closing sockets opened to not keep alive peers +static time_t s_connection_timeout = 20000; // 60; // seconds static void s_socket_all_check_activity( void * a_arg); @@ -60,6 +61,14 @@ int dap_worker_init( size_t a_conn_timeout ) { if ( a_conn_timeout ) s_connection_timeout = a_conn_timeout; + struct rlimit l_fdlimit; + if (getrlimit(RLIMIT_NOFILE, &l_fdlimit)) + return -1; + rlim_t l_oldlimit = l_fdlimit.rlim_cur; + l_fdlimit.rlim_cur = l_fdlimit.rlim_max; + if (setrlimit(RLIMIT_NOFILE, &l_fdlimit)) + return -2; + log_it(L_INFO, "Set maximum opened descriptors from %d to %d", l_oldlimit, l_fdlimit.rlim_cur); return 0; } @@ -102,7 +111,8 @@ void *dap_worker_thread(void *arg) l_worker->queue_es_reassign = dap_events_socket_create_type_queue_ptr_unsafe( l_worker, s_queue_es_reassign_callback ); l_worker->queue_callback= dap_events_socket_create_type_queue_ptr_unsafe( l_worker, s_queue_callback_callback); l_worker->event_exit = dap_events_socket_create_type_event_unsafe(l_worker, s_event_exit_callback ); - l_worker->timer_check_activity = dap_timerfd_start_on_worker( l_worker,s_connection_timeout / 2,s_socket_all_check_activity,l_worker); + l_worker->timer_check_activity = dap_timerfd_start_on_worker( l_worker, s_connection_timeout * 1000 / 2, + s_socket_all_check_activity, l_worker, true ); pthread_setspecific(l_worker->events->pth_key_worker, l_worker); pthread_cond_broadcast(&l_worker->started_cond); @@ -156,23 +166,22 @@ void *dap_worker_thread(void *arg) log_it(L_ERROR, "dap_events_socket NULL"); continue; } - l_cur->last_time_active = l_cur_time; -// log_it(L_DEBUG, "Worker=%d fd=%d", l_worker->id, l_cur->socket); + // log_it(L_DEBUG, "Worker=%d fd=%d", l_worker->id, l_cur->socket); int l_sock_err = 0, l_sock_err_size = sizeof(l_sock_err); //connection already closed (EPOLLHUP - shutdown has been made in both directions) - if( l_flag_hup) { // && events[n].events & EPOLLERR) { + if( l_flag_hup) { switch (l_cur->type ){ case DESCRIPTOR_TYPE_SOCKET_LISTENING: case DESCRIPTOR_TYPE_SOCKET: getsockopt(l_cur->socket, SOL_SOCKET, SO_ERROR, (void *)&l_sock_err, (socklen_t *)&l_sock_err_size); - //if(!(events[n].events & EPOLLIN)) - //cur->no_close = false; if (l_sock_err) { dap_events_socket_set_readable_unsafe(l_cur, false); dap_events_socket_set_writable_unsafe(l_cur, false); l_cur->buf_out_size = 0; l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE; + l_flag_error = l_flag_read = l_flag_write = false; + l_cur->callbacks.error_callback(l_cur, l_sock_err); // Call callback to process error event log_it(L_INFO, "Socket shutdown (EPOLLHUP): %s", strerror(l_sock_err)); } break; @@ -195,14 +204,14 @@ void *dap_worker_thread(void *arg) l_cur->callbacks.error_callback(l_cur, l_sock_err); // Call callback to process error event } - if (l_flag_hup) { + /*if (l_flag_hup) { log_it(L_INFO, "Client socket disconnected"); dap_events_socket_set_readable_unsafe(l_cur, false); dap_events_socket_set_writable_unsafe(l_cur, false); l_cur->buf_out_size = 0; l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE; - } + }*/ if(l_flag_read) { @@ -283,6 +292,9 @@ void *dap_worker_thread(void *arg) if (l_must_read_smth){ // Socket/Descriptor read if(l_bytes_read > 0) { + if (l_cur->type == DESCRIPTOR_TYPE_SOCKET || l_cur->type == DESCRIPTOR_TYPE_SOCKET_UDP) { + l_cur->last_time_active = l_cur_time; + } l_cur->buf_in_size += l_bytes_read; //log_it(L_DEBUG, "Received %d bytes", l_bytes_read); if(l_cur->callbacks.read_callback){ @@ -419,25 +431,18 @@ void *dap_worker_thread(void *arg) if (l_cur->buf_out_size) { dap_events_socket_set_writable_unsafe(l_cur,true); } - if((l_cur->flags & DAP_SOCK_SIGNAL_CLOSE) && !l_cur->no_close && l_cur->buf_out_size == 0) { - // protect against double deletion - l_cur->kill_signal = true; - //dap_events_socket_remove_and_delete(cur, true); - log_it(L_INFO, "Got signal to close %s, sock %u [thread %u]", l_cur->hostaddr, l_cur->socket, l_tn); - } else if (l_cur->buf_out_size ){ - log_it(L_INFO, "Got signal to close %s, sock %u [thread %u] but buffer is not empty(%zd)", l_cur->hostaddr, l_cur->socket, l_tn, - l_cur->buf_out_size); - } - if(l_cur->kill_signal && l_cur->buf_out_size == 0) { - log_it(L_INFO, "Kill %u socket (processed).... [ thread %u ]", l_cur->socket, l_tn); - dap_events_socket_remove_and_delete_unsafe( l_cur, false); - }else if (l_cur->buf_out_size ){ - log_it(L_INFO, "Kill %u socket (processed).... [ thread %u ] but buffer is not empty(%zd)", l_cur->socket, l_tn, - l_cur->buf_out_size); + if ((l_cur->flags & DAP_SOCK_SIGNAL_CLOSE) && !l_cur->no_close) + { + if (l_cur->buf_out_size == 0) { + log_it(L_INFO, "Process signal to close %s, sock %u [thread %u]", l_cur->hostaddr, l_cur->socket, l_tn); + dap_events_socket_remove_and_delete_unsafe( l_cur, false); + } else if (l_cur->buf_out_size ) { + log_it(L_INFO, "Got signal to close %s, sock %u [thread %u] but buffer is not empty(%zd)", l_cur->hostaddr, l_cur->socket, l_tn, + l_cur->buf_out_size); + } } - if( l_worker->signal_exit){ log_it(L_NOTICE,"Worker :%u finished", l_worker->id); return NULL; @@ -482,7 +487,6 @@ static void s_queue_add_es_callback( dap_events_socket_t * a_es, void * a_arg) dap_worker_t * w = a_es->worker; //log_it(L_DEBUG, "Received event socket %p to add on worker", l_es_new); if(dap_events_socket_check_unsafe( w, l_es_new)){ - log_it(L_WARNING, "Already assigned %d (%p), you're doing smth wrong", l_es_new->socket, l_es_new); // Socket already present in worker, it's OK return; } @@ -537,7 +541,7 @@ static void s_queue_delete_es_callback( dap_events_socket_t * a_es, void * a_arg { dap_events_socket_t * l_esocket = (dap_events_socket_t*) a_arg; if (dap_events_socket_check_unsafe(a_es->worker,l_esocket)){ - ((dap_events_socket_t*)a_arg)->kill_signal = true; // Send signal to socket to kill + ((dap_events_socket_t*)a_arg)->flags |= DAP_SOCK_SIGNAL_CLOSE; // Send signal to socket to kill }else log_it(L_INFO, "While we were sending the delete() message, esocket %p has been disconnected", l_esocket); } @@ -648,8 +652,8 @@ static void s_socket_all_check_activity( void * a_arg) //log_it(L_DEBUG,"Check sockets activity on worker #%u at %s", l_worker->id, l_curtimebuf); HASH_ITER(hh_worker, l_worker->esockets, l_es, tmp ) { - if ( l_es->type == DESCRIPTOR_TYPE_SOCKET ){ - if ( !l_es->kill_signal && + if ( l_es->type == DESCRIPTOR_TYPE_SOCKET || l_es->type == DESCRIPTOR_TYPE_SOCKET_UDP ){ + if ( !(l_es->flags & DAP_SOCK_SIGNAL_CLOSE) && ( l_curtime >= (l_es->last_time_active + s_connection_timeout) ) && !l_es->no_close ) { log_it( L_INFO, "Socket %u timeout (diff %u ), closing...", l_es->socket, l_curtime - (time_t)l_es->last_time_active - s_connection_timeout ); if (l_es->callbacks.error_callback) { diff --git a/dap-sdk/net/core/include/dap_events_socket.h b/dap-sdk/net/core/include/dap_events_socket.h index feb65aa366..3e7d44854f 100644 --- a/dap-sdk/net/core/include/dap_events_socket.h +++ b/dap-sdk/net/core/include/dap_events_socket.h @@ -146,7 +146,6 @@ typedef struct dap_events_socket { // Flags. TODO - rework in bool fields uint32_t flags; bool no_close; - atomic_bool kill_signal; atomic_bool is_initalized; bool was_reassigned; // Was reassigment at least once diff --git a/dap-sdk/net/core/include/dap_timerfd.h b/dap-sdk/net/core/include/dap_timerfd.h index 716e721871..0f600fe557 100644 --- a/dap-sdk/net/core/include/dap_timerfd.h +++ b/dap-sdk/net/core/include/dap_timerfd.h @@ -42,10 +42,11 @@ typedef struct dap_timerfd { dap_events_socket_t *events_socket; dap_timerfd_callback_t callback; void *callback_arg; + bool repeated; } dap_timerfd_t; int dap_timerfd_init(); -dap_timerfd_t* dap_timerfd_start(uint64_t a_timeout_ms, dap_timerfd_callback_t a_callback, void *callback_arg); -dap_timerfd_t* dap_timerfd_start_on_worker(dap_worker_t * a_worker, uint64_t a_timeout_ms, dap_timerfd_callback_t a_callback, void *a_callback_arg); +dap_timerfd_t* dap_timerfd_start(uint64_t a_timeout_ms, dap_timerfd_callback_t a_callback, void *callback_arg, bool a_repeated); +dap_timerfd_t* dap_timerfd_start_on_worker(dap_worker_t * a_worker, uint64_t a_timeout_ms, dap_timerfd_callback_t a_callback, void *a_callback_arg, bool a_repeated); void dap_timerfd_delete(dap_timerfd_t *l_timerfd); diff --git a/dap-sdk/net/stream/ch/include/dap_stream_ch_pkt.h b/dap-sdk/net/stream/ch/include/dap_stream_ch_pkt.h index 47dfac6ebf..43c8949c09 100644 --- a/dap-sdk/net/stream/ch/include/dap_stream_ch_pkt.h +++ b/dap-sdk/net/stream/ch/include/dap_stream_ch_pkt.h @@ -22,7 +22,7 @@ #pragma once #define STREAM_CH_PKT_TYPE_REQUEST 0x0 -#define STREAM_CH_PKT_TYPE_KEEPALIVE 0x11 +//#define STREAM_CH_PKT_TYPE_KEEPALIVE 0x11 #include <stdint.h> #include <stddef.h> diff --git a/dap-sdk/net/stream/stream/dap_stream.c b/dap-sdk/net/stream/stream/dap_stream.c index e57dc4e405..ebe00c1c5a 100644 --- a/dap-sdk/net/stream/stream/dap_stream.c +++ b/dap-sdk/net/stream/stream/dap_stream.c @@ -73,15 +73,10 @@ static void s_udp_esocket_new(dap_events_socket_t* a_esocket,void * a_arg); static dap_stream_t * s_stream_new(dap_http_client_t * a_http_client); // Create new stream static void s_http_client_delete(dap_http_client_t * a_esocket, void * a_arg); -//struct ev_loop *keepalive_loop; -pthread_t keepalive_thread; - static dap_stream_t *s_stream_keepalive_list = NULL; static pthread_mutex_t s_mutex_keepalive_list; - static void s_keepalive_cb( void ); -static bool s_keep_alive_loop_quit_signal = false; static bool s_dump_packet_headers = false; bool dap_stream_get_dump_packet_headers(){ return s_dump_packet_headers; } @@ -123,10 +118,8 @@ int dap_stream_init(dap_config_t * a_config) s_dap_stream_load_preferred_encryption_type(a_config); s_dump_packet_headers = dap_config_get_item_bool_default(g_config,"general","debug_dump_stream_headers",false); - s_keep_alive_loop_quit_signal = false; pthread_mutex_init( &s_mutex_keepalive_list, NULL ); - //pthread_create( &keepalive_thread, NULL, stream_loop, NULL ); - + dap_timerfd_start(STREAM_KEEPALIVE_TIMEOUT * 1000, (dap_timerfd_callback_t)s_keepalive_cb, NULL, true); log_it(L_NOTICE,"Init streaming module"); return 0; @@ -137,11 +130,7 @@ int dap_stream_init(dap_config_t * a_config) */ void dap_stream_deinit() { - s_keep_alive_loop_quit_signal = true; - pthread_join( keepalive_thread, NULL ); - pthread_mutex_destroy( &s_mutex_keepalive_list ); - dap_stream_ch_deinit( ); } @@ -292,11 +281,11 @@ void dap_stream_delete(dap_stream_t *a_stream) log_it(L_ERROR,"stream delete NULL instance"); return; } - pthread_mutex_lock(&s_mutex_keepalive_list); - if(s_stream_keepalive_list){ + if (a_stream->prev) { + pthread_mutex_lock(&s_mutex_keepalive_list); DL_DELETE(s_stream_keepalive_list, a_stream); + pthread_mutex_unlock(&s_mutex_keepalive_list); } - pthread_mutex_unlock(&s_mutex_keepalive_list); while (a_stream->channel_count) { dap_stream_ch_delete(a_stream->channel[a_stream->channel_count - 1]); @@ -338,30 +327,13 @@ dap_stream_t* dap_stream_new_es_client(dap_events_socket_t * a_esocket) ret->esocket = a_esocket; ret->buf_defrag_size=0; ret->is_client_to_uplink = true; + pthread_mutex_lock(&s_mutex_keepalive_list); + DL_APPEND(s_stream_keepalive_list, ret); + pthread_mutex_unlock(&s_mutex_keepalive_list); return ret; } -/** - Function for keepalive loop -static void keepalive_cb (EV_P_ ev_timer *w, int revents) -{ - struct dap_stream *sid = w->data; - if(sid->keepalive_passed < STREAM_KEEPALIVE_PASSES) - { - dap_stream_send_keepalive(sid); - sid->keepalive_passed+=1; - } - else{ - log_it(L_INFO, "Client disconnected"); - ev_timer_stop (keepalive_loop, &sid->keepalive_watcher); - void * arg; - stream_dap_delete(sid->conn,arg); - } -} -**/ - - /** * @brief stream_header_read Read headers callback for HTTP * @param a_http_client HTTP client structure @@ -724,10 +696,9 @@ static void s_stream_proc_pkt_in(dap_stream_t * a_stream) size_t l_pkt_size = a_stream->pkt_buf_in_data_size; a_stream->pkt_buf_in=NULL; a_stream->pkt_buf_in_data_size=0; - a_stream->keepalive_passed = 0; - if(l_pkt->hdr.type == STREAM_PKT_TYPE_DATA_PACKET) - { + switch (l_pkt->hdr.type) { + case STREAM_PKT_TYPE_DATA_PACKET: { dap_stream_ch_pkt_t * l_ch_pkt = (dap_stream_ch_pkt_t *) a_stream->pkt_cache; if(dap_stream_pkt_read_unsafe(a_stream,l_pkt, l_ch_pkt, sizeof(a_stream->pkt_cache))==0){ @@ -750,27 +721,35 @@ static void s_stream_proc_pkt_in(dap_stream_t * a_stream) if(l_ch){ l_ch->stat.bytes_read+=l_ch_pkt->hdr.size; - if(l_ch->proc) - if(l_ch->proc->packet_in_callback){ - if ( s_dump_packet_headers ){ - log_it(L_INFO,"Income channel packet: id='%c' size=%u type=0x%02Xu seq_id=0x%016X enc_type=0x%02X",(char) l_ch_pkt->hdr.id, - l_ch_pkt->hdr.size, l_ch_pkt->hdr.type, l_ch_pkt->hdr.seq_id , l_ch_pkt->hdr.enc_type); - } - l_ch->proc->packet_in_callback(l_ch,l_ch_pkt); + if(l_ch->proc && l_ch->proc->packet_in_callback){ + if ( s_dump_packet_headers ){ + log_it(L_INFO,"Income channel packet: id='%c' size=%u type=0x%02Xu seq_id=0x%016X enc_type=0x%02X",(char) l_ch_pkt->hdr.id, + l_ch_pkt->hdr.size, l_ch_pkt->hdr.type, l_ch_pkt->hdr.seq_id , l_ch_pkt->hdr.enc_type); } - - } else if(l_ch_pkt->hdr.id == TECHICAL_CHANNEL_ID && l_ch_pkt->hdr.type == STREAM_CH_PKT_TYPE_KEEPALIVE){ - dap_stream_send_keepalive(a_stream); + l_ch->proc->packet_in_callback(l_ch,l_ch_pkt); + } } else{ log_it(L_WARNING, "Input: unprocessed channel packet id '%c'",(char) l_ch_pkt->hdr.id ); } - } else if(l_pkt->hdr.type == STREAM_PKT_TYPE_SERVICE_PACKET) { + } break; + case STREAM_PKT_TYPE_SERVICE_PACKET: { stream_srv_pkt_t * srv_pkt = DAP_NEW(stream_srv_pkt_t); memcpy(srv_pkt, l_pkt->data,sizeof(stream_srv_pkt_t)); uint32_t session_id = srv_pkt->session_id; check_session(session_id,a_stream->esocket); DAP_DELETE(srv_pkt); - } else { + } break; + case STREAM_PKT_TYPE_KEEPALIVE: { + //log_it(L_DEBUG, "Keep alive check recieved"); + stream_pkt_hdr_t l_ret_pkt = {}; + l_ret_pkt.type = STREAM_PKT_TYPE_ALIVE; + memcpy(l_ret_pkt.sig, c_dap_stream_sig, sizeof(l_ret_pkt.sig)); + dap_events_socket_write_unsafe(a_stream->esocket, &l_ret_pkt, sizeof(l_ret_pkt)); + } break; + case STREAM_PKT_TYPE_ALIVE: + //log_it(L_DEBUG, "Keep alive response recieved"); + break; + default: log_it(L_WARNING, "Unknown header type"); } @@ -810,20 +789,13 @@ static bool s_detect_loose_packet(dap_stream_t * a_stream) static void s_keepalive_cb( void ) { dap_stream_t *l_stream, *tmp; - return; pthread_mutex_lock( &s_mutex_keepalive_list ); + stream_pkt_hdr_t l_pkt = {}; + l_pkt.type = STREAM_PKT_TYPE_KEEPALIVE; + memcpy(l_pkt.sig, c_dap_stream_sig, sizeof(l_pkt.sig)); DL_FOREACH_SAFE( s_stream_keepalive_list, l_stream, tmp ) { - if ( l_stream->keepalive_passed < STREAM_KEEPALIVE_PASSES ) { - dap_stream_send_keepalive( l_stream ); - l_stream->keepalive_passed += 1; - } - else { - log_it( L_INFO, "Client disconnected" ); - DL_DELETE( s_stream_keepalive_list, l_stream ); - s_esocket_callback_delete( l_stream->esocket, NULL ); - } + dap_events_socket_write_mt(l_stream->stream_worker->worker, l_stream->esocket, &l_pkt, sizeof(l_pkt)); } - pthread_mutex_unlock( &s_mutex_keepalive_list ); } diff --git a/dap-sdk/net/stream/stream/dap_stream_pkt.c b/dap-sdk/net/stream/stream/dap_stream_pkt.c index 13f3b4224d..1394eb16a8 100644 --- a/dap-sdk/net/stream/stream/dap_stream_pkt.c +++ b/dap-sdk/net/stream/stream/dap_stream_pkt.c @@ -186,16 +186,3 @@ size_t dap_stream_pkt_write_mt(dap_worker_t * a_w,dap_events_socket_t *a_es, dap return a_data_size; } - -/** - * @brief dap_stream_send_keepalive - * @param a_stream - */ -void dap_stream_send_keepalive(dap_stream_t * a_stream) -{ - dap_stream_ch_pkt_hdr_t l_pkt={0}; - l_pkt.id = TECHICAL_CHANNEL_ID; - l_pkt.type=STREAM_CH_PKT_TYPE_KEEPALIVE; - - dap_stream_pkt_write_unsafe( a_stream, &l_pkt, sizeof(l_pkt) ); -} diff --git a/dap-sdk/net/stream/stream/include/dap_stream.h b/dap-sdk/net/stream/stream/include/dap_stream.h index 71fb321518..39f076f1b0 100644 --- a/dap-sdk/net/stream/stream/include/dap_stream.h +++ b/dap-sdk/net/stream/stream/include/dap_stream.h @@ -46,7 +46,6 @@ typedef struct dap_stream_pkt dap_stream_pkt_t; typedef struct dap_events_socket dap_events_socket_t; #define STREAM_BUF_SIZE_MAX 500000 #define STREAM_KEEPALIVE_TIMEOUT 3 // How often send keeplive messages (seconds) -#define STREAM_KEEPALIVE_PASSES 3 // How many messagges without answers need for disconnect client and close session typedef void (*dap_stream_callback)( dap_stream_t *,void*); @@ -62,9 +61,6 @@ typedef struct dap_stream { bool is_live; bool is_client_to_uplink ; -// ev_timer keepalive_watcher; // Watcher for keepalive loop - uint8_t keepalive_passed; // Number of sended keepalive messages - struct dap_stream_pkt * in_pkt; struct dap_stream_pkt *pkt_buf_in; size_t pkt_buf_in_data_size; diff --git a/dap-sdk/net/stream/stream/include/dap_stream_pkt.h b/dap-sdk/net/stream/stream/include/dap_stream_pkt.h index 34beb4e381..6f15299eeb 100644 --- a/dap-sdk/net/stream/stream/include/dap_stream_pkt.h +++ b/dap-sdk/net/stream/stream/include/dap_stream_pkt.h @@ -28,7 +28,8 @@ typedef struct dap_stream dap_stream_t; typedef struct dap_stream_session dap_stream_session_t; #define STREAM_PKT_TYPE_DATA_PACKET 0x00 #define STREAM_PKT_TYPE_SERVICE_PACKET 0xff -//#define STREAM_PKT_TYPE_KEEPALIVE 0x11 +#define STREAM_PKT_TYPE_KEEPALIVE 0x11 +#define STREAM_PKT_TYPE_ALIVE 0x12 typedef struct stream_pkt_hdr{ uint8_t sig[8]; // Signature to find out beginning of the frame diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c index 6ce9e8078b..cab4576d56 100644 --- a/modules/channel/chain/dap_stream_ch_chain.c +++ b/modules/channel/chain/dap_stream_ch_chain.c @@ -724,6 +724,7 @@ bool s_out_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS, NULL, 0, l_ch_chain->callback_notify_arg); } else { // Process one chain from l_ch_chain->request_atom_iter + log_it(L_INFO, "Send one CHAIN packet len=%d", l_ch_chain->request_atom_iter->cur_size); dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN, l_ch_chain->request_net_id, l_ch_chain->request_chain_id, l_ch_chain->request_cell_id, l_ch_chain->request_atom_iter->cur, l_ch_chain->request_atom_iter->cur_size); diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index e5e13eccf2..9644019a1f 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -539,7 +539,7 @@ static int s_net_states_proc(dap_chain_net_t * l_net) // find dap_chain_id_t dap_chain_t *l_chain = dap_chain_net_get_chain_by_name(l_net, "gdb"); dap_chain_id_t l_chain_id = l_chain ? l_chain->id : (dap_chain_id_t ) {}; - l_node_client->state = NODE_CLIENT_STATE_CONNECTED; + dap_chain_node_client_reset(l_node_client); size_t l_res = dap_stream_ch_chain_pkt_write_mt(l_worker, l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB, l_net->pub.id, l_chain_id, l_net->pub.cell_id, &l_sync_gdb, sizeof(l_sync_gdb)); if (l_res == 0) { @@ -548,7 +548,7 @@ static int s_net_states_proc(dap_chain_net_t * l_net) } // wait for finishing of request - int timeout_ms = 30000; // 5 min = 300 sec = 300 000 ms + int timeout_ms = 300000; // 5 min = 300 sec = 300 000 ms // TODO add progress info to console l_res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms); switch (l_res) { @@ -561,7 +561,7 @@ static int s_net_states_proc(dap_chain_net_t * l_net) default: log_it(L_INFO, "Node sync error %d",l_res); } - l_node_client->state = NODE_CLIENT_STATE_CONNECTED; + dap_chain_node_client_reset(l_node_client); l_res = dap_stream_ch_chain_pkt_write_mt(l_worker, l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB_RVRS, l_net->pub.id, l_chain_id, l_net->pub.cell_id, &l_sync_gdb, sizeof(l_sync_gdb)); l_res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms); @@ -603,13 +603,13 @@ static int s_net_states_proc(dap_chain_net_t * l_net) dap_chain_t * l_chain = NULL; int l_res = 0; DL_FOREACH (l_net->pub.chains, l_chain) { - l_node_client->state = NODE_CLIENT_STATE_CONNECTED; + dap_chain_node_client_reset(l_node_client); dap_stream_ch_chain_sync_request_t l_request ; memset(&l_request, 0, sizeof (l_request)); dap_stream_ch_chain_pkt_write_mt(l_worker, l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS, l_net->pub.id, l_chain->id, l_net->pub.cell_id, &l_request, sizeof(l_request)); // wait for finishing of request - int timeout_ms = 20000; // 2 min = 120 sec = 120 000 ms + int timeout_ms = 120000; // 2 min = 120 sec = 120 000 ms // TODO add progress info to console l_res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms); switch (l_res) { @@ -623,7 +623,7 @@ static int s_net_states_proc(dap_chain_net_t * l_net) default: log_it(L_ERROR, "Sync of chain '%s' error %d", l_chain->name,l_res); } - l_node_client->state = NODE_CLIENT_STATE_CONNECTED; + dap_chain_node_client_reset(l_node_client); dap_stream_ch_chain_pkt_write_mt(l_worker, l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS_RVRS, l_net->pub.id, l_chain->id, l_net->pub.cell_id, &l_request, sizeof(l_request)); l_res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms); @@ -711,7 +711,7 @@ static void *s_net_check_thread ( void *a_net ) return NULL; } - log_it(L_DEBUG, "Check net states"); + //log_it(L_DEBUG, "Check net states"); // check or start sync s_net_states_proc( l_net ); @@ -730,7 +730,7 @@ static void *s_net_check_thread ( void *a_net ) l_net_pvt->flags |= F_DAP_CHAIN_NET_GO_SYNC; s_net_states_proc( l_net ); } - log_it(L_DEBUG, "Sleep on 10 seconds..."); + //log_it(L_DEBUG, "Sleep on 10 seconds..."); sleep(10); } return NULL; diff --git a/modules/net/dap_chain_node_client.c b/modules/net/dap_chain_node_client.c index 2037b86213..0000663b4a 100644 --- a/modules/net/dap_chain_node_client.c +++ b/modules/net/dap_chain_node_client.c @@ -474,6 +474,13 @@ dap_chain_node_client_t* dap_chain_node_client_connect(dap_chain_node_info_t *a_ return dap_chain_client_connect(a_node_info, l_stage_target, l_active_channels); } +void dap_chain_node_client_reset(dap_chain_node_client_t *a_client) +{ + if (a_client->state > NODE_CLIENT_STATE_CONNECTED) { + a_client->state = NODE_CLIENT_STATE_CONNECTED; + } +} + /** * Close connection to server, delete chain_node_client_t *client */ @@ -535,6 +542,12 @@ int dap_chain_node_client_wait(dap_chain_node_client_t *a_client, int a_waited_s return 0; } + if (a_client->state < NODE_CLIENT_STATE_CONNECTED && a_waited_state > NODE_CLIENT_STATE_CONNECTED) { + log_it(L_WARNING, "Waited state can't be achieved"); + pthread_mutex_unlock(&a_client->wait_mutex); + return -2; + } + #ifndef _WIN32 // prepare for signal waiting struct timespec l_cond_timeout; diff --git a/modules/net/include/dap_chain_node_client.h b/modules/net/include/dap_chain_node_client.h index 265bb37a72..11d4b6ce67 100644 --- a/modules/net/include/dap_chain_node_client.h +++ b/modules/net/include/dap_chain_node_client.h @@ -90,7 +90,10 @@ dap_chain_node_client_t* dap_chain_client_connect(dap_chain_node_info_t *a_node_ * return a connection handle, or NULL, if an error */ dap_chain_node_client_t* dap_chain_node_client_connect(dap_chain_node_info_t *node_info); - +/** + * Reset client state to connected state if it is connected + */ +void dap_chain_node_client_reset(dap_chain_node_client_t *a_client); /** * Close connection to server, delete chain_node_client_t *client */ -- GitLab