diff --git a/CMakeLists.txt b/CMakeLists.txt index 7339ab97eb421ee23fd10e1e8a9c18829f194972..30ea512b783692f78a3b6d31d378b877b6925a79 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,7 +2,7 @@ project(cellframe-sdk C) cmake_minimum_required(VERSION 3.0) set(CMAKE_C_STANDARD 11) -set(CELLFRAME_SDK_NATIVE_VERSION "2.8-25") +set(CELLFRAME_SDK_NATIVE_VERSION "2.8-26") add_definitions ("-DCELLFRAME_SDK_VERSION=\"${CELLFRAME_SDK_NATIVE_VERSION}\"") set(DAPSDK_MODULES "") message("Cellframe modules: ${CELLFRAME_MODULES}") diff --git a/dap-sdk/net/core/dap_events.c b/dap-sdk/net/core/dap_events.c index a9fd6bc5e45c7c38f315a5aca7ef06d8d1765687..48feb6d1daf4c036a3c0aee67d4fa75ea812f01a 100644 --- a/dap-sdk/net/core/dap_events.c +++ b/dap-sdk/net/core/dap_events.c @@ -217,7 +217,6 @@ int dap_events_init( uint32_t a_threads_count, size_t a_conn_timeout ) err: log_it(L_ERROR,"Deinit events subsystem"); dap_events_deinit(); - dap_worker_deinit(); return -1; } @@ -226,8 +225,11 @@ err: */ void dap_events_deinit( ) { + dap_proc_thread_deinit(); dap_events_socket_deinit(); dap_worker_deinit(); + + dap_events_wait(s_events_default); if ( s_threads ) DAP_DELETE( s_threads ); @@ -378,7 +380,7 @@ int dap_events_wait( dap_events_t *a_events ) void dap_events_stop_all( ) { for( uint32_t i = 0; i < s_threads_count; i ++ ) { - dap_events_socket_event_signal( s_workers[i]->event_exit, 0); + dap_events_socket_event_signal( s_workers[i]->event_exit, 1); } // TODO implement signal to stop the workers } diff --git a/dap-sdk/net/core/dap_proc_thread.c b/dap-sdk/net/core/dap_proc_thread.c index 9f5b326d2719ac7f2c2c6e538a05e6276ec0d016..ca78b085c7ddce05aad0a1692697468609327d33 100644 --- a/dap-sdk/net/core/dap_proc_thread.c +++ b/dap-sdk/net/core/dap_proc_thread.c @@ -62,6 +62,7 @@ static size_t s_threads_count = 0; static bool s_debug_reactor = false; static dap_proc_thread_t * s_threads = NULL; static void * s_proc_thread_function(void * a_arg); +static void s_event_exit_callback( dap_events_socket_t * a_es, uint64_t a_flags); /** * @brief dap_proc_thread_init @@ -97,14 +98,18 @@ int dap_proc_thread_init(uint32_t a_threads_count){ */ void dap_proc_thread_deinit() { - // Signal to cancel working threads and wait for finish - // TODO: Android realization -#ifndef DAP_OS_ANDROID - for (size_t i = 0; i < s_threads_count; i++ ){ - pthread_cancel(s_threads[i].thread_id); + for (uint32_t i = 0; i < s_threads_count; i++){ + dap_events_socket_event_signal(s_threads[i].event_exit, 1); pthread_join(s_threads[i].thread_id, NULL); } -#endif + // Signal to cancel working threads and wait for finish + // TODO: Android realization +//#ifndef DAP_OS_ANDROID +// for (size_t i = 0; i < s_threads_count; i++ ){ +// pthread_cancel(s_threads[i].thread_id); +// pthread_join(s_threads[i].thread_id, NULL); +// } +//#endif } @@ -145,13 +150,15 @@ dap_proc_thread_t * dap_proc_thread_get_auto() static void s_proc_event_callback(dap_events_socket_t * a_esocket, uint64_t a_value) { (void) a_value; -// log_it(L_DEBUG, "--> Proc event callback start"); + if(s_debug_reactor) + log_it(L_DEBUG, "--> Proc event callback start"); dap_proc_thread_t * l_thread = (dap_proc_thread_t *) a_esocket->_inheritor; dap_proc_queue_item_t * l_item = l_thread->proc_queue->item_first; dap_proc_queue_item_t * l_item_old = NULL; bool l_is_anybody_for_repeat=false; while(l_item){ -// log_it(L_INFO, "Proc event callback: %p/%p", l_item->callback, l_item->callback_arg); + if(s_debug_reactor) + log_it(L_INFO, "Proc event callback: %p/%p", l_item->callback, l_item->callback_arg); bool l_is_finished = l_item->callback(l_thread, l_item->callback_arg); if (l_is_finished){ if ( l_item->prev ){ @@ -176,9 +183,11 @@ static void s_proc_event_callback(dap_events_socket_t * a_esocket, uint64_t a_va DAP_DELETE(l_item); l_item = l_thread->proc_queue->item_first; } -// log_it(L_DEBUG, "Proc event finished"); + if(s_debug_reactor) + log_it(L_DEBUG, "Proc event finished"); }else{ -// log_it(L_DEBUG, "Proc event not finished"); + if(s_debug_reactor) + log_it(L_DEBUG, "Proc event not finished"); l_item_old = l_item; l_item=l_item->prev; } @@ -186,7 +195,8 @@ static void s_proc_event_callback(dap_events_socket_t * a_esocket, uint64_t a_va } if(l_is_anybody_for_repeat) // Arm event if we have smth to proc again dap_events_socket_event_signal(a_esocket,1); -// log_it(L_DEBUG, "<-- Proc event callback end"); + if(s_debug_reactor) + log_it(L_DEBUG, "<-- Proc event callback end"); } @@ -369,7 +379,7 @@ static void * s_proc_thread_function(void * a_arg) dap_proc_thread_t * l_thread = (dap_proc_thread_t*) a_arg; assert(l_thread); dap_cpu_assign_thread_on(l_thread->cpu_id); - + struct sched_param l_shed_params; l_shed_params.sched_priority = 0; #if defined(DAP_OS_WINDOWS) @@ -393,7 +403,11 @@ static void * s_proc_thread_function(void * a_arg) dap_events_socket_assign_on_worker_mt(l_worker_related->proc_queue_input,l_worker_related); l_thread->proc_event = dap_events_socket_create_type_event_unsafe(NULL, s_proc_event_callback); + l_thread->event_exit = dap_events_socket_create_type_event_unsafe(NULL, s_event_exit_callback); + l_thread->proc_event->_inheritor = l_thread; // we pass thread through it + l_thread->event_exit->_inheritor = l_thread; + size_t l_workers_count= dap_events_worker_get_count(); assert(l_workers_count); l_thread->queue_assign_input = DAP_NEW_Z_SIZE(dap_events_socket_t*, sizeof (dap_events_socket_t*)*l_workers_count ); @@ -436,6 +450,17 @@ static void * s_proc_thread_function(void * a_arg) return NULL; } + // Add exit event + l_thread->event_exit->ev.events = l_thread->event_exit->ev_base_flags; + l_thread->event_exit->ev.data.ptr = l_thread->event_exit; + if( epoll_ctl(l_thread->epoll_ctl, EPOLL_CTL_ADD, l_thread->event_exit->socket , &l_thread->event_exit->ev) != 0 ){ +#ifdef DAP_OS_WINDOWS + errno = WSAGetLastError(); +#endif + log_it(L_CRITICAL, "Can't add exit event on epoll ctl, err: %d", errno); + return NULL; + } + for (size_t n = 0; n< dap_events_worker_get_count(); n++){ // Queue asssign l_thread->queue_assign_input[n]->ev.events = l_thread->queue_assign_input[n]->ev_base_flags ; @@ -486,6 +511,12 @@ static void * s_proc_thread_function(void * a_arg) l_thread->esockets[l_thread->poll_count] = l_thread->proc_event; l_thread->poll_count++; + // Add exit event + l_thread->poll[l_thread->poll_count].fd = l_thread->event_exit->fd; + l_thread->poll[l_thread->poll_count].events = l_thread->event_exit->poll_base_flags; + l_thread->esockets[l_thread->poll_count] = l_thread->event_exit; + l_thread->poll_count++; + for (size_t n = 0; n< dap_events_worker_get_count(); n++){ dap_events_socket_t * l_queue_assign_input = l_thread->queue_assign_input[n]; dap_events_socket_t * l_queue_io_input = l_thread->queue_io_input[n]; @@ -522,6 +553,7 @@ static void * s_proc_thread_function(void * a_arg) dap_proc_thread_assign_esocket_unsafe(l_thread,l_thread->proc_queue->esocket); dap_proc_thread_assign_esocket_unsafe(l_thread,l_thread->proc_event); + dap_proc_thread_assign_esocket_unsafe(l_thread,l_thread->event_exit); for (size_t n = 0; n< dap_events_worker_get_count(); n++){ // Queue asssign @@ -542,8 +574,11 @@ static void * s_proc_thread_function(void * a_arg) pthread_mutex_lock(&l_thread->started_mutex); pthread_mutex_unlock(&l_thread->started_mutex); pthread_cond_broadcast(&l_thread->started_cond); + + l_thread->signal_exit = false; + // Main loop - while (! l_thread->signal_kill){ + while (!l_thread->signal_kill && !l_thread->signal_exit){ int l_selected_sockets; size_t l_sockets_max; @@ -874,7 +909,8 @@ static void * s_proc_thread_function(void * a_arg) } #endif } - log_it(L_NOTICE, "Stop processing thread #%u", l_thread->cpu_id); + log_it(L_ATT, "Stop processing thread #%u", l_thread->cpu_id); + fflush(stdout); // cleanip inputs for (size_t n=0; n<dap_events_worker_get_count(); n++){ @@ -986,3 +1022,12 @@ void dap_proc_thread_worker_exec_callback(dap_proc_thread_t * a_thread, size_t a dap_proc_thread_esocket_update_poll_flags(a_thread, a_thread->queue_callback_input[a_worker_id]); #endif } + +static void s_event_exit_callback( dap_events_socket_t * a_es, uint64_t a_flags) +{ + (void) a_flags; + dap_proc_thread_t * l_thread = (dap_proc_thread_t *) a_es->_inheritor; + l_thread->signal_exit = true; + if(s_debug_reactor) + log_it(L_DEBUG, "Proc_thread :%u signaled to exit", l_thread->cpu_id); +} diff --git a/dap-sdk/net/core/dap_worker.c b/dap-sdk/net/core/dap_worker.c index 1f4e1812115e1d84f4c41a6fcfdb955cfbba7e51..635ba44d8ca4a7d2a68e0f85891f83ca55749a24 100644 --- a/dap-sdk/net/core/dap_worker.c +++ b/dap-sdk/net/core/dap_worker.c @@ -926,7 +926,8 @@ static void s_event_exit_callback( dap_events_socket_t * a_es, uint64_t a_flags) { (void) a_flags; a_es->worker->signal_exit = true; - log_it(L_NOTICE, "Worker :%u signaled to exit", a_es->worker->id); + if(s_debug_reactor) + log_it(L_DEBUG, "Worker :%u signaled to exit", a_es->worker->id); } /** diff --git a/dap-sdk/net/core/include/dap_proc_thread.h b/dap-sdk/net/core/include/dap_proc_thread.h index caf4ea2e5a155c64996c185025c7432a47a61f46..6775165a8697519002e39903e25a6ffb95d50360 100644 --- a/dap-sdk/net/core/include/dap_proc_thread.h +++ b/dap-sdk/net/core/include/dap_proc_thread.h @@ -43,6 +43,9 @@ typedef struct dap_proc_thread{ pthread_mutex_t started_mutex; bool signal_kill; + bool signal_exit; + + dap_events_socket_t * event_exit; #ifdef DAP_EVENTS_CAPS_EPOLL EPOLL_HANDLE epoll_ctl;