diff --git a/CMakeLists.txt b/CMakeLists.txt index 4fde6a830131534bbb517067f6eaec3262b50e8b..a84c8b4610fd0355bffc26d097ddeb8915f6c723 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,7 +2,7 @@ project(cellframe-sdk C) cmake_minimum_required(VERSION 3.0) set(CMAKE_C_STANDARD 11) -set(CELLFRAME_SDK_NATIVE_VERSION "2.9-5") +set(CELLFRAME_SDK_NATIVE_VERSION "2.9-6") add_definitions ("-DCELLFRAME_SDK_VERSION=\"${CELLFRAME_SDK_NATIVE_VERSION}\"") set(DAPSDK_MODULES "") message("Cellframe modules: ${CELLFRAME_MODULES}") diff --git a/dap-sdk/net/core/dap_events.c b/dap-sdk/net/core/dap_events.c index 5d64b4b80b6dc2cdedba58829c598106c6eaaa05..83cd14845f3626e56a155a73a0aa0790c049b0a7 100644 --- a/dap-sdk/net/core/dap_events.c +++ b/dap-sdk/net/core/dap_events.c @@ -217,7 +217,6 @@ int dap_events_init( uint32_t a_threads_count, size_t a_conn_timeout ) err: log_it(L_ERROR,"Deinit events subsystem"); dap_events_deinit(); - dap_worker_deinit(); return -1; } @@ -226,8 +225,11 @@ err: */ void dap_events_deinit( ) { + dap_proc_thread_deinit(); dap_events_socket_deinit(); dap_worker_deinit(); + + dap_events_wait(s_events_default); if ( s_threads ) DAP_DELETE( s_threads ); @@ -379,7 +381,7 @@ int dap_events_wait( dap_events_t *a_events ) void dap_events_stop_all( ) { for( uint32_t i = 0; i < s_threads_count; i ++ ) { - dap_events_socket_event_signal( s_workers[i]->event_exit, 0); + dap_events_socket_event_signal( s_workers[i]->event_exit, 1); } // TODO implement signal to stop the workers } diff --git a/dap-sdk/net/core/dap_proc_thread.c b/dap-sdk/net/core/dap_proc_thread.c index 9f5b326d2719ac7f2c2c6e538a05e6276ec0d016..da7b629cdd0f61f9f9d60f5c8a609ec9bbd0402f 100644 --- a/dap-sdk/net/core/dap_proc_thread.c +++ b/dap-sdk/net/core/dap_proc_thread.c @@ -62,6 +62,7 @@ static size_t s_threads_count = 0; static bool s_debug_reactor = false; static dap_proc_thread_t * s_threads = NULL; static void * s_proc_thread_function(void * a_arg); +static void s_event_exit_callback( dap_events_socket_t * a_es, uint64_t a_flags); /** * @brief dap_proc_thread_init @@ -97,14 +98,19 @@ int dap_proc_thread_init(uint32_t a_threads_count){ */ void dap_proc_thread_deinit() { - // Signal to cancel working threads and wait for finish - // TODO: Android realization -#ifndef DAP_OS_ANDROID - for (size_t i = 0; i < s_threads_count; i++ ){ - pthread_cancel(s_threads[i].thread_id); + for (uint32_t i = 0; i < s_threads_count; i++){ + dap_events_socket_event_signal(s_threads[i].event_exit, 1); pthread_join(s_threads[i].thread_id, NULL); } -#endif + + // Signal to cancel working threads and wait for finish + // TODO: Android realization +//#ifndef DAP_OS_ANDROID +// for (size_t i = 0; i < s_threads_count; i++ ){ +// pthread_cancel(s_threads[i].thread_id); +// pthread_join(s_threads[i].thread_id, NULL); +// } +//#endif } @@ -145,13 +151,15 @@ dap_proc_thread_t * dap_proc_thread_get_auto() static void s_proc_event_callback(dap_events_socket_t * a_esocket, uint64_t a_value) { (void) a_value; -// log_it(L_DEBUG, "--> Proc event callback start"); + if(s_debug_reactor) + log_it(L_DEBUG, "--> Proc event callback start"); dap_proc_thread_t * l_thread = (dap_proc_thread_t *) a_esocket->_inheritor; dap_proc_queue_item_t * l_item = l_thread->proc_queue->item_first; dap_proc_queue_item_t * l_item_old = NULL; bool l_is_anybody_for_repeat=false; while(l_item){ -// log_it(L_INFO, "Proc event callback: %p/%p", l_item->callback, l_item->callback_arg); + if(s_debug_reactor) + log_it(L_INFO, "Proc event callback: %p/%p", l_item->callback, l_item->callback_arg); bool l_is_finished = l_item->callback(l_thread, l_item->callback_arg); if (l_is_finished){ if ( l_item->prev ){ @@ -176,9 +184,11 @@ static void s_proc_event_callback(dap_events_socket_t * a_esocket, uint64_t a_va DAP_DELETE(l_item); l_item = l_thread->proc_queue->item_first; } -// log_it(L_DEBUG, "Proc event finished"); + if(s_debug_reactor) + log_it(L_DEBUG, "Proc event finished"); }else{ -// log_it(L_DEBUG, "Proc event not finished"); + if(s_debug_reactor) + log_it(L_DEBUG, "Proc event not finished"); l_item_old = l_item; l_item=l_item->prev; } @@ -186,7 +196,8 @@ static void s_proc_event_callback(dap_events_socket_t * a_esocket, uint64_t a_va } if(l_is_anybody_for_repeat) // Arm event if we have smth to proc again dap_events_socket_event_signal(a_esocket,1); -// log_it(L_DEBUG, "<-- Proc event callback end"); + if(s_debug_reactor) + log_it(L_DEBUG, "<-- Proc event callback end"); } @@ -393,7 +404,11 @@ static void * s_proc_thread_function(void * a_arg) dap_events_socket_assign_on_worker_mt(l_worker_related->proc_queue_input,l_worker_related); l_thread->proc_event = dap_events_socket_create_type_event_unsafe(NULL, s_proc_event_callback); + l_thread->event_exit = dap_events_socket_create_type_event_unsafe(NULL, s_event_exit_callback); + l_thread->proc_event->_inheritor = l_thread; // we pass thread through it + l_thread->event_exit->_inheritor = l_thread; + size_t l_workers_count= dap_events_worker_get_count(); assert(l_workers_count); l_thread->queue_assign_input = DAP_NEW_Z_SIZE(dap_events_socket_t*, sizeof (dap_events_socket_t*)*l_workers_count ); @@ -435,6 +450,18 @@ static void * s_proc_thread_function(void * a_arg) log_it(L_CRITICAL, "Can't add proc event on epoll ctl, err: %d", errno); return NULL; } + + // Add exit event + l_thread->event_exit->ev.events = l_thread->event_exit->ev_base_flags; + l_thread->event_exit->ev.data.ptr = l_thread->event_exit; + if( epoll_ctl(l_thread->epoll_ctl, EPOLL_CTL_ADD, l_thread->event_exit->socket , &l_thread->event_exit->ev) != 0 ){ +#ifdef DAP_OS_WINDOWS + errno = WSAGetLastError(); +#endif + log_it(L_CRITICAL, "Can't add exit event on epoll ctl, err: %d", errno); + return NULL; + } + for (size_t n = 0; n< dap_events_worker_get_count(); n++){ // Queue asssign @@ -485,6 +512,13 @@ static void * s_proc_thread_function(void * a_arg) l_thread->poll[l_thread->poll_count].events = l_thread->proc_event->poll_base_flags; l_thread->esockets[l_thread->poll_count] = l_thread->proc_event; l_thread->poll_count++; + + // Add exit event + l_thread->poll[l_thread->poll_count].fd = l_thread->event_exit->fd; + l_thread->poll[l_thread->poll_count].events = l_thread->event_exit->poll_base_flags; + l_thread->esockets[l_thread->poll_count] = l_thread->event_exit; + l_thread->poll_count++; + for (size_t n = 0; n< dap_events_worker_get_count(); n++){ dap_events_socket_t * l_queue_assign_input = l_thread->queue_assign_input[n]; @@ -522,6 +556,7 @@ static void * s_proc_thread_function(void * a_arg) dap_proc_thread_assign_esocket_unsafe(l_thread,l_thread->proc_queue->esocket); dap_proc_thread_assign_esocket_unsafe(l_thread,l_thread->proc_event); + dap_proc_thread_assign_esocket_unsafe(l_thread,l_thread->event_exit); for (size_t n = 0; n< dap_events_worker_get_count(); n++){ // Queue asssign @@ -542,8 +577,11 @@ static void * s_proc_thread_function(void * a_arg) pthread_mutex_lock(&l_thread->started_mutex); pthread_mutex_unlock(&l_thread->started_mutex); pthread_cond_broadcast(&l_thread->started_cond); + + l_thread->signal_exit = false; + // Main loop - while (! l_thread->signal_kill){ + while (!l_thread->signal_kill && !l_thread->signal_exit){ int l_selected_sockets; size_t l_sockets_max; @@ -874,7 +912,8 @@ static void * s_proc_thread_function(void * a_arg) } #endif } - log_it(L_NOTICE, "Stop processing thread #%u", l_thread->cpu_id); + log_it(L_ATT, "Stop processing thread #%u", l_thread->cpu_id); + fflush(stdout); // cleanip inputs for (size_t n=0; n<dap_events_worker_get_count(); n++){ @@ -986,3 +1025,13 @@ void dap_proc_thread_worker_exec_callback(dap_proc_thread_t * a_thread, size_t a dap_proc_thread_esocket_update_poll_flags(a_thread, a_thread->queue_callback_input[a_worker_id]); #endif } + +static void s_event_exit_callback( dap_events_socket_t * a_es, uint64_t a_flags) +{ + (void) a_flags; + dap_proc_thread_t * l_thread = (dap_proc_thread_t *) a_es->_inheritor; + l_thread->signal_exit = true; + if(s_debug_reactor) + log_it(L_DEBUG, "Proc_thread :%u signaled to exit", l_thread->cpu_id); +} + diff --git a/dap-sdk/net/core/dap_worker.c b/dap-sdk/net/core/dap_worker.c index 834ac79a863434d0bed6cf07335628af011fabcf..d57521e7d3c11c89134f8ea819ba67764f13ae35 100644 --- a/dap-sdk/net/core/dap_worker.c +++ b/dap-sdk/net/core/dap_worker.c @@ -998,7 +998,8 @@ static void s_event_exit_callback( dap_events_socket_t * a_es, uint64_t a_flags) { (void) a_flags; a_es->worker->signal_exit = true; - log_it(L_NOTICE, "Worker :%u signaled to exit", a_es->worker->id); + if(s_debug_reactor) + log_it(L_DEBUG, "Worker :%u signaled to exit", a_es->worker->id); } /** diff --git a/dap-sdk/net/core/include/dap_proc_thread.h b/dap-sdk/net/core/include/dap_proc_thread.h index caf4ea2e5a155c64996c185025c7432a47a61f46..68537c3dbec148b82389b80e3967260132e0ac51 100644 --- a/dap-sdk/net/core/include/dap_proc_thread.h +++ b/dap-sdk/net/core/include/dap_proc_thread.h @@ -43,6 +43,10 @@ typedef struct dap_proc_thread{ pthread_mutex_t started_mutex; bool signal_kill; + bool signal_exit; + + dap_events_socket_t * event_exit; + #ifdef DAP_EVENTS_CAPS_EPOLL EPOLL_HANDLE epoll_ctl;