diff --git a/dap-sdk/net/core/dap_events.c b/dap-sdk/net/core/dap_events.c index 85637f01faeeab6a6db109ceb18869c5bddbc5c3..d8bf479eff2148f637eafcdf4b9c02ce07398ba7 100644 --- a/dap-sdk/net/core/dap_events.c +++ b/dap-sdk/net/core/dap_events.c @@ -106,8 +106,8 @@ static dap_events_t * s_events_default = NULL; /** * @brief dap_get_cpu_count - * - * @return uint32_t + * + * @return uint32_t */ uint32_t dap_get_cpu_count( ) { @@ -123,7 +123,7 @@ uint32_t dap_get_cpu_count( ) CPU_ZERO( &cs ); #endif -#if defined (DAP_OS_ANDROID) +#if defined (DAP_OS_ANDROID) sched_getaffinity( 0, sizeof(cs), &cs ); #elif defined (DAP_OS_DARWIN) int count=0; @@ -151,8 +151,8 @@ uint32_t dap_get_cpu_count( ) /** * @brief dap_cpu_assign_thread_on - * - * @param a_cpu_id + * + * @param a_cpu_id */ void dap_cpu_assign_thread_on(uint32_t a_cpu_id) { @@ -236,11 +236,11 @@ err: */ void dap_events_deinit( ) { - dap_proc_thread_deinit(); + dap_proc_thread_deinit(); dap_events_socket_deinit(); dap_worker_deinit(); - - dap_events_wait(s_events_default); + + dap_events_wait(s_events_default); if ( s_threads ) DAP_DELETE( s_threads ); @@ -267,7 +267,7 @@ dap_events_t * dap_events_new( ) /** * @brief dap_events_get_default * simply return s_events_default - * @return dap_events_t* + * @return dap_events_t* */ dap_events_t* dap_events_get_default( ) { @@ -291,9 +291,9 @@ void dap_events_delete( dap_events_t *a_events ) /** * @brief dap_events_remove_and_delete_socket_unsafe * calls dap_events_socket_remove_and_delete_unsafe - * @param a_events - * @param a_socket - * @param a_preserve_inheritor + * @param a_events + * @param a_socket + * @param a_preserve_inheritor */ void dap_events_remove_and_delete_socket_unsafe(dap_events_t *a_events, dap_events_socket_t *a_socket, bool a_preserve_inheritor) { @@ -349,10 +349,12 @@ int dap_events_start( dap_events_t *a_events ) struct timespec l_timeout; clock_gettime(CLOCK_REALTIME, &l_timeout); l_timeout.tv_sec+=15; + pthread_create( &s_threads[i].tid, NULL, dap_worker_thread, l_worker ); int l_ret; l_ret=pthread_cond_timedwait(&l_worker->started_cond, &l_worker->started_mutex, &l_timeout); pthread_mutex_unlock(&l_worker->started_mutex); + if ( l_ret== ETIMEDOUT ){ log_it(L_CRITICAL, "Timeout 15 seconds is out: worker #%u thread don't respond", i); return -2; @@ -361,13 +363,17 @@ int dap_events_start( dap_events_t *a_events ) return -3; } } + +#if 0 // @RRL: Bugfix-5434 // Link queues between for( uint32_t i = 0; i < s_threads_count; i++) { dap_worker_t * l_worker = s_workers[i]; + l_worker->queue_es_new_input = DAP_NEW_Z_SIZE(dap_events_socket_t*, sizeof (dap_events_socket_t*)* s_threads_count); l_worker->queue_es_delete_input = DAP_NEW_Z_SIZE(dap_events_socket_t*, sizeof (dap_events_socket_t*)* s_threads_count); l_worker->queue_es_reassign_input = DAP_NEW_Z_SIZE(dap_events_socket_t*, sizeof (dap_events_socket_t*)* s_threads_count); l_worker->queue_es_io_input = DAP_NEW_Z_SIZE(dap_events_socket_t*, sizeof (dap_events_socket_t*)* s_threads_count); + for( uint32_t n = 0; n < s_threads_count; n++) { l_worker->queue_es_new_input[n] = dap_events_socket_queue_ptr_create_input(s_workers[n]->queue_es_new); l_worker->queue_es_delete_input[n] = dap_events_socket_queue_ptr_create_input(s_workers[n]->queue_es_delete); @@ -375,6 +381,7 @@ int dap_events_start( dap_events_t *a_events ) l_worker->queue_es_io_input[n] = dap_events_socket_queue_ptr_create_input(s_workers[n]->queue_es_io); } } +#endif // Init callback processor if (dap_proc_thread_init(s_threads_count) != 0 ){ @@ -452,10 +459,7 @@ dap_worker_t *dap_events_worker_get_auto( ) */ dap_worker_t * dap_events_worker_get(uint8_t a_index) { - if (a_index < s_threads_count){ - return s_workers[a_index]; - }else - return NULL; + return (a_index < s_threads_count) ? s_workers[a_index] : NULL; } /** @@ -463,8 +467,8 @@ dap_worker_t * dap_events_worker_get(uint8_t a_index) */ void dap_events_worker_print_all( ) { - uint32_t i; - for( i = 0; i < s_threads_count; i ++ ) { + + for( int i = 0; i < s_threads_count; i ++ ) { log_it( L_INFO, "Worker: %d, count open connections: %d", s_workers[i]->id, s_workers[i]->event_sockets_count ); } diff --git a/dap-sdk/net/core/dap_worker.c b/dap-sdk/net/core/dap_worker.c index 68c7bcc6e95066191c91fa8b08303d54755c4da8..f5281d3ed4e767c786312105e45b7f5216b19e58 100644 --- a/dap-sdk/net/core/dap_worker.c +++ b/dap-sdk/net/core/dap_worker.c @@ -119,7 +119,7 @@ void *dap_worker_thread(void *arg) struct sched_param l_shed_params; l_shed_params.sched_priority = 0; #ifdef DAP_OS_WINDOWS - if (!SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_BELOW_NORMAL)) + if (!SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_BELOW_NORMAL)) log_it(L_ERROR, "Couldn'r set thread priority, err: %lu", GetLastError()); #else pthread_setschedparam(pthread_self(),SCHED_FIFO ,&l_shed_params); @@ -148,6 +148,7 @@ void *dap_worker_thread(void *arg) #else #error "Unimplemented socket array for this platform" #endif + l_worker->queue_es_new_input = DAP_NEW_Z_SIZE(dap_events_socket_t*, sizeof (dap_events_socket_t*)* dap_events_worker_get_count() ); l_worker->queue_es_delete_input = DAP_NEW_Z_SIZE(dap_events_socket_t*, sizeof (dap_events_socket_t*)* dap_events_worker_get_count() ); l_worker->queue_es_io_input = DAP_NEW_Z_SIZE(dap_events_socket_t*, sizeof (dap_events_socket_t*)* dap_events_worker_get_count() ); @@ -159,9 +160,18 @@ void *dap_worker_thread(void *arg) l_worker->queue_es_io = dap_events_socket_create_type_queue_ptr_unsafe(l_worker, s_queue_es_io_callback); l_worker->queue_es_reassign = dap_events_socket_create_type_queue_ptr_unsafe(l_worker, s_queue_es_reassign_callback ); + + for( int n = 0; n < dap_events_worker_get_count(); n++) { + l_worker->queue_es_new_input[n] = dap_events_socket_queue_ptr_create_input(l_worker->queue_es_new); + l_worker->queue_es_delete_input[n] = dap_events_socket_queue_ptr_create_input(l_worker->queue_es_delete); + l_worker->queue_es_io_input[n] = dap_events_socket_queue_ptr_create_input(l_worker->queue_es_io); + l_worker->queue_es_reassign_input[n] = dap_events_socket_queue_ptr_create_input(l_worker->queue_es_reassign); + } + + l_worker->queue_callback = dap_events_socket_create_type_queue_ptr_unsafe(l_worker, s_queue_callback_callback); l_worker->event_exit = dap_events_socket_create_type_event_unsafe(l_worker, s_event_exit_callback); - + l_worker->timer_check_activity = dap_timerfd_create(s_connection_timeout * 1000 / 2, s_socket_all_check_activity, l_worker); dap_worker_add_events_socket_unsafe( l_worker->timer_check_activity->events_socket, l_worker); @@ -170,8 +180,8 @@ void *dap_worker_thread(void *arg) pthread_mutex_unlock(&l_worker->started_mutex); bool s_loop_is_active = true; while(s_loop_is_active) { - int l_selected_sockets; - size_t l_sockets_max; + int l_selected_sockets; + size_t l_sockets_max; #ifdef DAP_EVENTS_CAPS_EPOLL l_selected_sockets = epoll_wait(l_worker->epoll_fd, l_epoll_events, DAP_EVENTS_SOCKET_MAX, -1); l_sockets_max = l_selected_sockets; @@ -657,9 +667,9 @@ void *dap_worker_thread(void *arg) l_cur->buf_out_size, MSG_DONTWAIT | MSG_NOSIGNAL); #ifdef DAP_OS_WINDOWS //dap_events_socket_set_writable_unsafe(l_cur,false); // enabling this will break windows server replies - l_errno = WSAGetLastError(); + l_errno = WSAGetLastError(); #else - l_errno = errno; + l_errno = errno; #endif } break; @@ -729,7 +739,7 @@ void *dap_worker_thread(void *arg) l_errno = errno; if (l_bytes_sent == -1 && l_errno == EINVAL) // To make compatible with other l_errno = EAGAIN; // non-blocking sockets -#elif defined (DAP_EVENTS_CAPS_KQUEUE) +#elif defined (DAP_EVENTS_CAPS_KQUEUE) struct kevent* l_event=&l_cur->kqueue_event; dap_events_socket_w_data_t * l_es_w_data = DAP_NEW_Z(dap_events_socket_w_data_t); l_es_w_data->esocket = l_cur; @@ -743,7 +753,7 @@ void *dap_worker_thread(void *arg) log_it(L_WARNING,"queue ptr send error: kevent %p errno: %d", l_es_w_data, l_errno); DAP_DELETE(l_es_w_data); } - + #else #error "Not implemented dap_events_socket_queue_ptr_send() for this platform" #endif @@ -1212,8 +1222,8 @@ int dap_worker_add_events_socket_unsafe( dap_events_socket_t * a_esocket, dap_wo } struct kevent l_event; u_short l_flags = a_esocket->kqueue_base_flags; - u_int l_fflags = a_esocket->kqueue_base_fflags; - short l_filter = a_esocket->kqueue_base_filter; + u_int l_fflags = a_esocket->kqueue_base_fflags; + short l_filter = a_esocket->kqueue_base_filter; int l_kqueue_fd =a_worker->kqueue_fd; if ( l_kqueue_fd == -1 ){ log_it(L_ERROR, "Esocket is not assigned with anything ,exit");