diff --git a/dap-sdk/net/core/dap_events_socket.c b/dap-sdk/net/core/dap_events_socket.c index d9f243e987a7ef624c3e3c44e64018f57a749aa1..bdcfe0a9bc55b13b78059073a07cfe2528c904ea 100644 --- a/dap-sdk/net/core/dap_events_socket.c +++ b/dap-sdk/net/core/dap_events_socket.c @@ -107,6 +107,7 @@ dap_events_socket_t *dap_events_socket_wrap_no_add( dap_events_t *a_events, void dap_events_socket_assign_on_worker_mt(dap_events_socket_t * a_es, struct dap_worker * a_worker) { a_es->last_ping_request = time(NULL); + a_es->worker = a_worker; dap_worker_add_events_socket(a_es,a_worker); } @@ -140,6 +141,7 @@ void dap_events_socket_assign_on_worker_unsafe(dap_events_socket_t * a_es, struc */ dap_events_socket_t * s_create_type_pipe(dap_worker_t * a_w, dap_events_socket_callback_t a_callback, uint32_t a_flags) { + UNUSED(a_flags); dap_events_socket_t * l_es = DAP_NEW_Z(dap_events_socket_t); l_es->type = DESCRIPTOR_TYPE_PIPE; l_es->worker = a_w; @@ -724,32 +726,32 @@ size_t dap_events_socket_write_mt(dap_worker_t * a_w,dap_events_socket_t *a_es, */ size_t dap_events_socket_write_f_mt(dap_worker_t * a_w,dap_events_socket_t *a_es, const char * format,...) { - va_list ap; + va_list ap, ap_copy; va_start(ap,format); + va_copy(ap_copy, ap); int l_data_size = dap_vsnprintf(NULL,0,format,ap); va_end(ap); if (l_data_size <0 ){ log_it(L_ERROR,"Can't write out formatted data '%s' with values",format); return 0; } - l_data_size++; // To calc trailing zero dap_worker_msg_io_t * l_msg = DAP_NEW_Z(dap_worker_msg_io_t); l_msg->esocket = a_es; l_msg->data = DAP_NEW_SIZE(void,l_data_size + 1); l_msg->flags_set = DAP_SOCK_READY_TO_WRITE; - va_start(ap, format); - l_data_size = dap_vsnprintf(l_msg->data,0,format,ap); - va_end(ap); + l_data_size = dap_vsprintf(l_msg->data,format,ap_copy); + va_end(ap_copy); if (l_data_size <0 ){ log_it(L_ERROR,"Can't write out formatted data '%s' with values",format); + DAP_DELETE(l_msg->data); DAP_DELETE(l_msg); return 0; } - l_data_size++; l_msg->data_size = l_data_size; int l_ret= dap_events_socket_queue_ptr_send(a_w->queue_es_io, l_msg ); if (l_ret!=0){ log_it(L_ERROR, "Wasn't send pointer to queue: code %d", l_ret); + DAP_DELETE(l_msg->data); DAP_DELETE(l_msg); return 0; } diff --git a/dap-sdk/net/core/dap_proc_thread.c b/dap-sdk/net/core/dap_proc_thread.c index 97cf057c5efa3676c5f6fb4a5207c995b5aa0e08..a4bcbc7ae66c1856f827641f062753716627b193 100644 --- a/dap-sdk/net/core/dap_proc_thread.c +++ b/dap-sdk/net/core/dap_proc_thread.c @@ -55,7 +55,12 @@ int dap_proc_thread_init(uint32_t a_threads_count){ pthread_cond_init( &s_threads[i].started_cond, NULL ); pthread_mutex_init( &s_threads[i].started_mutex, NULL ); pthread_mutex_lock( &s_threads[i].started_mutex ); - pthread_create( &s_threads[i].thread_id,NULL, s_proc_thread_function, &s_threads[i] ); + int res = pthread_create( &s_threads[i].thread_id,NULL, s_proc_thread_function, &s_threads[i] ); + if (res) { + log_it(L_CRITICAL, "Create thread failed with code %d", res); + pthread_mutex_unlock( &s_threads[i].started_mutex ); + return -1; + } pthread_cond_wait( &s_threads[i].started_cond, &s_threads[i].started_mutex ); pthread_mutex_unlock( &s_threads[i].started_mutex ); } @@ -138,12 +143,10 @@ static void * s_proc_thread_function(void * a_arg) { dap_proc_thread_t * l_thread = (dap_proc_thread_t*) a_arg; assert(l_thread); - dap_cpu_assign_thread_on(l_thread->cpu_id); struct sched_param l_shed_params; l_shed_params.sched_priority = 0; pthread_setschedparam(pthread_self(),SCHED_BATCH ,&l_shed_params); - #ifdef DAP_EVENTS_CAPS_EPOLL struct epoll_event l_epoll_events[DAP_MAX_EPOLL_EVENTS] = {{0}}; l_thread->epoll_ctl = epoll_create( DAP_MAX_EPOLL_EVENTS ); diff --git a/dap-sdk/net/core/dap_worker.c b/dap-sdk/net/core/dap_worker.c index c8f94ebf8612fc2f9c91337d4386234f3ddbe6a0..5965ce773dbe21a036985be44e6241cfa58bf016 100644 --- a/dap-sdk/net/core/dap_worker.c +++ b/dap-sdk/net/core/dap_worker.c @@ -153,10 +153,16 @@ void *dap_worker_thread(void *arg) l_cur->callbacks.error_callback(l_cur, NULL); // Call callback to process error event } + if (l_epoll_events[n].events & EPOLLRDHUP) { + log_it(L_INFO, "Client socket disconnected"); + dap_events_socket_set_readable_unsafe(l_cur, false); + l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE; + //l_epoll_events[n].events &= ~EPOLLIN; + } if(l_epoll_events[n].events & EPOLLIN) { - //log_it(DEBUG,"Comes connection in active read set"); + //log_it(L_DEBUG, "Comes connection with type %d", l_cur->type); if(l_cur->buf_in_size == sizeof(l_cur->buf_in)) { log_it(L_WARNING, "Buffer is full when there is smth to read. Its dropped!"); l_cur->buf_in_size = 0; @@ -221,7 +227,7 @@ void *dap_worker_thread(void *arg) if (l_must_read_smth){ // Socket/Descriptor read if(l_bytes_read > 0) { l_cur->buf_in_size += l_bytes_read; - //log_it(DEBUG, "Received %d bytes", bytes_read); + //log_it(L_DEBUG, "Received %d bytes", l_bytes_read); if(l_cur->callbacks.read_callback) l_cur->callbacks.read_callback(l_cur, NULL); // Call callback to process read event. At the end of callback buf_in_size should be zero if everything was read well else{ @@ -236,10 +242,9 @@ void *dap_worker_thread(void *arg) l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE; } } - else if(l_bytes_read == 0) { - log_it(L_INFO, "Client socket disconnected"); - dap_events_socket_set_readable_unsafe(l_cur, false); - l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE; + else { + log_it(L_WARNING, "EPOLLIN triggered but nothing to read"); + dap_events_socket_set_readable_unsafe(l_cur,false); } } } diff --git a/dap-sdk/net/stream/stream/dap_stream.c b/dap-sdk/net/stream/stream/dap_stream.c index 2374eedcaa2bf51e0ecbb07e96416f5b792396a1..f47b347b670f638c66d790573583fcdc33e00dde 100644 --- a/dap-sdk/net/stream/stream/dap_stream.c +++ b/dap-sdk/net/stream/stream/dap_stream.c @@ -347,7 +347,6 @@ void dap_stream_delete(dap_stream_t *a_stream) if(s_stream_keepalive_list){ DL_DELETE(s_stream_keepalive_list, a_stream); } - a_stream->esocket = NULL; pthread_mutex_unlock(&s_mutex_keepalive_list); while (a_stream->channel_count) { @@ -358,6 +357,7 @@ void dap_stream_delete(dap_stream_t *a_stream) if(a_stream->session) dap_stream_session_close_mt(a_stream->session->id); // TODO make stream close after timeout, not momentaly a_stream->session = NULL; + a_stream->esocket = NULL; pthread_rwlock_unlock(&a_stream->rwlock); pthread_rwlock_destroy(&a_stream->rwlock); DAP_DELETE(a_stream); diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index 39f82f919e49a75a52a96b22c5be6267c3324ce2..9263c5cfcf70ec6a4441053fd7c4a58d50dabd38 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -481,9 +481,8 @@ static int s_net_states_proc(dap_chain_net_t * l_net) dap_chain_t *l_chain = dap_chain_net_get_chain_by_name(l_net, "gdb"); dap_chain_id_t l_chain_id = l_chain ? l_chain->id : (dap_chain_id_t ) {}; - size_t l_res = dap_stream_ch_chain_pkt_write(dap_client_get_stream_ch(l_node_client->client, dap_stream_ch_chain_get_id()), - DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB, l_net->pub.id, l_chain_id, - l_net->pub.cell_id, &l_sync_gdb, sizeof(l_sync_gdb)); + size_t l_res = dap_stream_ch_chain_pkt_write(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB, l_net->pub.id, + l_chain_id, l_net->pub.cell_id, &l_sync_gdb, sizeof(l_sync_gdb)); if (l_res == 0) { log_it(L_WARNING, "Can't send GDB sync request"); continue; @@ -503,10 +502,9 @@ static int s_net_states_proc(dap_chain_net_t * l_net) default: log_it(L_INFO, "Node sync error %d",l_res); } - if (!dap_client_get_stream_ch(l_node_client->client, dap_stream_ch_chain_get_id())) { - l_res = dap_stream_ch_chain_pkt_write(dap_client_get_stream_ch(l_node_client->client, dap_stream_ch_chain_get_id()), - DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB_RVRS, l_net->pub.id, l_chain_id, - l_net->pub.cell_id, &l_sync_gdb, sizeof(l_sync_gdb)); + if (dap_client_get_stream_ch(l_node_client->client, dap_stream_ch_chain_get_id())) { + l_res = dap_stream_ch_chain_pkt_write(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB_RVRS, l_net->pub.id, + l_chain_id, l_net->pub.cell_id, &l_sync_gdb, sizeof(l_sync_gdb)); l_res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms); switch (l_res) { case -1: