diff --git a/dap-sdk/io/dap_context.c b/dap-sdk/io/dap_context.c index 3c7b7358469a9ddd8d53398420cb104941a08068..8271e4d9fd974010f6ce2384240fa1cbf33e6baa 100644 --- a/dap-sdk/io/dap_context.c +++ b/dap-sdk/io/dap_context.c @@ -84,8 +84,12 @@ #include "dap_context.h" #include "dap_worker.h" #include "dap_events_socket.h" -pthread_key_t g_dap_context_pth_key; +pthread_key_t g_dap_context_pth_key; // Thread-specific object with pointer on current context + +static void *s_context_thread(void *arg); // Context thread +static int s_thread_init(dap_context_t * a_context); +static int s_thread_loop(dap_context_t * a_context); /** * @brief dap_context_init @@ -114,18 +118,163 @@ int dap_context_init() dap_context_t * dap_context_new() { dap_context_t * l_context = DAP_NEW_Z(dap_context_t); - static uint32_t s_context_id_max = 0; + static atomic_uint_fast64_t s_context_id_max = 0; l_context->id = s_context_id_max; s_context_id_max++; return l_context; } +/** + * @brief dap_context_run Run new context in dedicated thread. + * @param a_context Context object + * @param a_cpu_id CPU id on wich it will be assigned (if platform allows). -1 means no CPU affinity + * @param a_sched_policy Schedule policy + * @param a_priority Thread priority. 0 means default + * @param a_flags Flags specified context. 0 if default + * @param a_callback_loop_before Callback thats executes in thread just after initializetion but before main loop begins + * @param a_callback_loop_after Callback thats executes in thread just after main loop stops + * @param a_callback_arg Custom argument for callbacks + * @return Returns zero if succes, others if error (pthread_create() return code) + */ +int dap_context_run(dap_context_t * a_context,int a_cpu_id, int a_sched_policy, int a_priority, + uint32_t a_flags, + dap_context_callback_t a_callback_loop_before, + dap_context_callback_t a_callback_loop_after, + void * a_callback_arg ) +{ + dap_context_msg_run_t * l_msg = DAP_NEW_Z(dap_context_msg_run_t); + int l_ret; + + // Check for OOM + if(! l_msg){ + log_it(L_CRITICAL, "Can't allocate memory for context create message"); + return ENOMEM; + } + + // Prefill message structure for new context's thread + l_msg->context = a_context; + l_msg->priority = a_priority; + l_msg->sched_policy = a_sched_policy; + l_msg->cpu_id = a_cpu_id; + l_msg->flags = a_flags; + l_msg->callback_started = a_callback_loop_before; + l_msg->callback_stopped = a_callback_loop_after; + l_msg->callback_arg = a_callback_arg; + + // If we have to wait for started thread (and initialization inside ) + if( a_flags & DAP_CONTEXT_FLAG_WAIT_FOR_STARTED){ + // Init kernel objects + pthread_mutex_init(&a_context->started_mutex, NULL); + pthread_cond_init( &a_context->started_cond, NULL); + + // Prepare timer + struct timespec l_timeout; + clock_gettime(CLOCK_REALTIME, &l_timeout); + l_timeout.tv_sec+=DAP_CONTEXT_WAIT_FOR_STARTED_TIME; + // Lock started mutex and try to run a thread + pthread_mutex_lock(&a_context->started_mutex); + + l_ret = pthread_create( &a_context->thread_id , NULL, s_context_thread, l_msg); + + if(l_ret == 0){ // If everything is good we're waiting for DAP_CONTEXT_WAIT_FOR_STARTED_TIME seconds + l_ret=pthread_cond_timedwait(&a_context->started_cond, &a_context->started_mutex, &l_timeout); + if ( l_ret== ETIMEDOUT ){ // Timeout + log_it(L_CRITICAL, "Timeout %d seconds is out: context #%u thread don't respond", DAP_CONTEXT_WAIT_FOR_STARTED_TIME,a_context->id); + } else if (l_ret != 0){ // Another error + log_it(L_CRITICAL, "Can't wait on condition: %d error code", l_ret); + } else // All is good + log_it(L_NOTICE, "Context %u started", a_context->id); + }else{ // Thread haven't started + log_it(L_ERROR,"Can't create new thread for context %u", a_context->id ); + DAP_DELETE(l_msg); + } + pthread_mutex_unlock(&a_context->started_mutex); + }else{ // Here we wait for nothing, just run it + l_ret = pthread_create( &a_context->thread_id , NULL, s_context_thread, l_msg); + if(l_ret != 0){ // Check for error, if present lets cleanup the memory for l_msg + log_it(L_ERROR,"Can't create new thread for context %u", a_context->id ); + DAP_DELETE(l_msg); + } + } + return l_ret; +} + +/** + * @brief s_context_thread Context working thread + * @param arg + * @return + */ +static void *s_context_thread(void *a_arg) +{ + dap_context_msg_run_t * l_msg = (dap_context_msg_run_t*) a_arg; + dap_context_t * l_context = l_msg->context; + + l_context->cpu_id = l_msg->cpu_id; + if(l_msg->cpu_id!=-1) + dap_cpu_assign_thread_on(l_msg->cpu_id ); + + +#ifdef DAP_OS_WINDOWS + if (!SetThreadPriority(GetCurrentThread(), l_msg->priority )) + log_it(L_ERROR, "Couldn'r set thread priority, err: %lu", GetLastError()); +#else + if(l_msg->priority != 0 && l_msg->sched_policy != DAP_CONTEXT_POLICY_DEFAUT ){ + struct sched_param l_sched_params = {0}; +#if defined (DAP_OS_LINUX) + int l_sched_policy= SCHED_BATCH; +#else + int l_sched_policy= SCHED_OTHER; +#endif + + l_sched_params.sched_priority = l_msg->priority; + switch(l_msg->sched_policy){ + case DAP_CONTEXT_POLICY_FIFO: l_sched_policy = SCHED_FIFO; break; + case DAP_CONTEXT_POLICY_ROUND_ROBIN: l_sched_policy = SCHED_RR; break; + default:; + } + + pthread_setschedparam(pthread_self(), l_sched_policy,&l_sched_params); + } +#endif + + if(s_thread_init(l_context)!=0){ + // Can't initialize + if(l_msg->flags & DAP_CONTEXT_FLAG_WAIT_FOR_STARTED ) + pthread_cond_broadcast(&l_context->started_cond); + return NULL; + } + // Now we're running and initalized for sure, so we can assign flags to the current context + l_context->running_flags = l_msg->flags; + + // Started callback execution + l_msg->callback_started(l_context, l_msg->callback_arg); + + // Initialization success + if(l_msg->flags & DAP_CONTEXT_FLAG_WAIT_FOR_STARTED ) + pthread_cond_broadcast(&l_context->started_cond); + + s_thread_loop(l_context); + + // Stopped callback execution + l_msg->callback_stopped(l_context, l_msg->callback_arg); + + log_it(L_NOTICE,"Exiting context #%u", l_context->id); + + // Free memory. Because nobody expected to work with context outside itself it have to be safe + pthread_cond_destroy(&l_context->started_cond); + pthread_mutex_destroy(&l_context->started_mutex); + DAP_DELETE(l_context); + + return NULL; +} + + /** * @brief dap_context_thread_init * @param a_context * @return */ -int dap_context_thread_init(dap_context_t * a_context) +static int s_thread_init(dap_context_t * a_context) { pthread_setspecific(g_dap_context_pth_key, a_context); @@ -147,18 +296,33 @@ int dap_context_thread_init(dap_context_t * a_context) a_context->poll_count_max = DAP_EVENTS_SOCKET_MAX; a_context->poll = DAP_NEW_Z_SIZE(struct pollfd,a_context->poll_count_max*sizeof (struct pollfd)); a_context->poll_esocket = DAP_NEW_Z_SIZE(dap_events_socket_t*,a_context->poll_count_max*sizeof (dap_events_socket_t*)); +#elif defined(DAP_EVENTS_CAPS_EPOLL) + a_context->epoll_fd = epoll_create( DAP_MAX_EVENTS_COUNT ); + //log_it(L_DEBUG, "Created event_fd %d for context %u", a_context->epoll_fd,i); +#ifdef DAP_OS_WINDOWS + if (!a_context->epoll_fd) { + int l_errno = WSAGetLastError(); +#else + if ( a_context->epoll_fd == -1 ) { + int l_errno = errno; +#endif + char l_errbuf[128]; + strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); + log_it(L_CRITICAL, "Error create epoll fd: %s (%d)", l_errbuf, l_errno); + return -1; + } #else -#error "Unimplemented socket array for this platform" +#error "Unimplemented dap_context_init for this platform" #endif return 0; } /** - * @brief dap_context_thread_loop + * @brief s_thread_loop * @param a_context * @return */ -int dap_context_thread_loop(dap_context_t * a_context) +static int s_thread_loop(dap_context_t * a_context) { int l_errno = 0, l_selected_sockets = 0; dap_events_socket_t *l_cur = NULL; @@ -835,8 +999,7 @@ int dap_context_thread_loop(dap_context_t * a_context) // Here we expect thats event duplicates goes together in it. If not - we lose some events between. } } - //dap_events_socket_remove_and_delete_unsafe( l_cur, false); - dap_events_remove_and_delete_socket_unsafe(dap_events_get_default(), l_cur, false); + dap_events_socket_remove_and_delete_unsafe( l_cur, false); #ifdef DAP_EVENTS_CAPS_KQUEUE a_context->kqueue_events_count--; #endif diff --git a/dap-sdk/io/dap_events.c b/dap-sdk/io/dap_events.c index 64f75ff883f7997c0b5106ba434b2715ae07bddb..8b28bc03b483d585ce1044566cf1046f2675c77b 100644 --- a/dap-sdk/io/dap_events.c +++ b/dap-sdk/io/dap_events.c @@ -104,8 +104,6 @@ bool g_debug_reactor = false; static int s_workers_init = 0; static uint32_t s_threads_count = 1; static dap_worker_t **s_workers = NULL; -static dap_thread_t *s_threads = NULL; -static dap_events_t * s_events_default = NULL; /** * @brief dap_get_cpu_count @@ -220,8 +218,7 @@ int dap_events_init( uint32_t a_threads_count, size_t a_conn_timeout ) s_threads_count = a_threads_count ? a_threads_count : l_cpu_count; s_workers = DAP_NEW_Z_SIZE(dap_worker_t*,s_threads_count*sizeof (dap_worker_t*) ); - s_threads = DAP_NEW_Z_SIZE(dap_thread_t, sizeof(dap_thread_t) * s_threads_count ); - if ( !s_workers || !s_threads ) + if ( !s_workers ) return -1; if(dap_context_init() != 0){ @@ -257,9 +254,7 @@ void dap_events_deinit( ) dap_events_socket_deinit(); dap_worker_deinit(); - dap_events_wait(s_events_default); - if ( s_threads ) - DAP_DELETE( s_threads ); + dap_events_wait(); if ( s_workers ) DAP_DELETE( s_workers ); @@ -267,145 +262,59 @@ void dap_events_deinit( ) s_workers_init = 0; } -/** - * @brief server_new Creates new empty instance of server_t - * Additionally checking s_events_default and create thread (pthread_key_create) - * @return New instance - */ -dap_events_t * dap_events_new( ) -{ - dap_events_t *ret = DAP_NEW_Z(dap_events_t); - - if ( s_events_default == NULL) - s_events_default = ret; - pthread_key_create( &ret->pth_key_worker, NULL); - - return ret; -} - -/** - * @brief dap_events_get_default - * simply return s_events_default - * @return dap_events_t* - */ -dap_events_t* dap_events_get_default( ) -{ - return s_events_default; -} - -/** - * @brief server_delete Delete event processor instance - * @param sh Pointer to the server instance - */ -void dap_events_delete( dap_events_t *a_events ) -{ - if (a_events) { - if ( a_events->_inheritor ) - DAP_DELETE( a_events->_inheritor ); - - DAP_DELETE( a_events ); - } -} /** - * @brief dap_events_remove_and_delete_socket_unsafe - * calls dap_events_socket_remove_and_delete_unsafe - * @param a_events - * @param a_socket - * @param a_preserve_inheritor - */ -void dap_events_remove_and_delete_socket_unsafe(dap_events_t *a_events, dap_events_socket_t *a_socket, bool a_preserve_inheritor) -{ - (void) a_events; -// int l_sock = a_socket->socket; -// if( a_socket->type == DESCRIPTOR_TYPE_TIMER) -// log_it(L_DEBUG,"Remove timer %d", l_sock); - - dap_events_socket_remove_and_delete_unsafe(a_socket, a_preserve_inheritor); -} - -/** - * @brief sa_server_loop Main server loop - * @param sh Server instance + * @brief dap_events_start Run main server loop * @return Zero if ok others if not */ -int dap_events_start( dap_events_t *a_events ) +int dap_events_start() { - - if ( !s_workers_init ) + int l_ret = -1; + if ( !s_workers_init ){ log_it(L_CRITICAL, "Event socket reactor has not been fired, use dap_events_init() first"); + goto lb_err; + } for( uint32_t i = 0; i < s_threads_count; i++) { dap_worker_t * l_worker = DAP_NEW_Z(dap_worker_t); l_worker->id = i; - l_worker->events = a_events; l_worker->context = dap_context_new(); l_worker->context->worker = l_worker; - pthread_mutex_init(& l_worker->started_mutex, NULL); - pthread_cond_init( & l_worker->started_cond, NULL); -#if defined(DAP_EVENTS_CAPS_EPOLL) - l_worker->epoll_fd = epoll_create( DAP_MAX_EVENTS_COUNT ); - //log_it(L_DEBUG, "Created event_fd %d for worker %u", l_worker->epoll_fd,i); -#ifdef DAP_OS_WINDOWS - if (!l_worker->epoll_fd) { - int l_errno = WSAGetLastError(); -#else - if ( l_worker->epoll_fd == -1 ) { - int l_errno = errno; -#endif - char l_errbuf[128]; - strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); - log_it(L_CRITICAL, "Error create epoll fd: %s (%d)", l_errbuf, l_errno); - DAP_DELETE(l_worker); - return -1; - } -#elif defined(DAP_EVENTS_CAPS_POLL) -#elif defined(DAP_EVENTS_CAPS_KQUEUE) -#else -#error "Not defined worker init for your platform" -#endif + s_workers[i] = l_worker; - pthread_mutex_lock(&l_worker->started_mutex); - struct timespec l_timeout; - clock_gettime(CLOCK_REALTIME, &l_timeout); - l_timeout.tv_sec+=15; - - pthread_create( &s_threads[i].tid, NULL, dap_worker_thread, l_worker ); - int l_ret; - l_ret=pthread_cond_timedwait(&l_worker->started_cond, &l_worker->started_mutex, &l_timeout); - pthread_mutex_unlock(&l_worker->started_mutex); - - if ( l_ret== ETIMEDOUT ){ - log_it(L_CRITICAL, "Timeout 15 seconds is out: worker #%u thread don't respond", i); - return -2; - } else if (l_ret != 0){ - log_it(L_CRITICAL, "Can't wait on condition: %d error code", l_ret); - return -3; + + l_ret = dap_context_run(l_worker->context,i,DAP_CONTEXT_POLICY_FIFO,-1, 0, dap_worker_context_callback_started, + dap_worker_context_callback_stopped, l_worker); + if(l_ret != 0){ + log_it(L_CRITICAL, "Can't run worker #%u",i); + goto lb_err; } } // Init callback processor if (dap_proc_thread_init(s_threads_count) != 0 ){ log_it( L_CRITICAL, "Can't init proc threads" ); - return -4; + l_ret = -4; + goto lb_err; } return 0; +lb_err: + log_it(L_CRITICAL,"Events init failed with code %d", l_ret); + return l_ret; } /** * @brief dap_events_wait - * @param dap_events_t *a_events * @return */ -int dap_events_wait( dap_events_t *a_events ) +int dap_events_wait( ) { - (void) a_events; for( uint32_t i = 0; i < s_threads_count; i++ ) { void *ret; - pthread_join( s_threads[i].tid, &ret ); + pthread_join( s_workers[i]->context->thread_id , &ret ); } return 0; } @@ -430,7 +339,7 @@ void dap_events_stop_all( ) * @brief dap_worker_get_index_min * @return */ -uint32_t dap_events_worker_get_index_min( ) +uint32_t dap_events_thread_get_index_min( ) { uint32_t min = 0; @@ -445,7 +354,7 @@ uint32_t dap_events_worker_get_index_min( ) return min; } -uint32_t dap_events_worker_get_count() +uint32_t dap_events_thread_get_count() { return s_threads_count; } @@ -459,7 +368,7 @@ dap_worker_t *dap_events_worker_get_auto( ) if ( !s_workers_init ) log_it(L_CRITICAL, "Event socket reactor has not been fired, use dap_events_init() first"); - return s_workers[dap_events_worker_get_index_min()]; + return s_workers[dap_events_thread_get_index_min()]; } /** @@ -478,7 +387,7 @@ dap_worker_t * dap_events_worker_get(uint8_t a_index) /** * @brief dap_worker_print_all */ -void dap_events_worker_print_all( ) +void dap_worker_print_all( ) { if ( !s_workers_init ) log_it(L_CRITICAL, "Event socket reactor has not been fired, use dap_events_init() first"); diff --git a/dap-sdk/io/dap_events_socket.c b/dap-sdk/io/dap_events_socket.c index 7ea684c7282c11731e7f0177963ae8cb6f5fbc96..b1b50db460ea2d30512b11ef5639ca9071ee0140 100644 --- a/dap-sdk/io/dap_events_socket.c +++ b/dap-sdk/io/dap_events_socket.c @@ -182,10 +182,8 @@ void __stdcall mq_receive_cb(HRESULT hr, QUEUEHANDLE qh, DWORD timeout * @param a_callbacks * @return */ -dap_events_socket_t *dap_events_socket_wrap_no_add( dap_events_t *a_events, - int a_sock, dap_events_socket_callbacks_t *a_callbacks ) +dap_events_socket_t *dap_events_socket_wrap_no_add( int a_sock, dap_events_socket_callbacks_t *a_callbacks ) { - assert(a_events); assert(a_callbacks); dap_events_socket_t *l_ret = DAP_NEW_Z( dap_events_socket_t ); @@ -349,7 +347,7 @@ dap_events_socket_t * dap_events_socket_create(dap_events_desc_type_t a_type, da return NULL; } #endif - dap_events_socket_t * l_es =dap_events_socket_wrap_no_add(dap_events_get_default(),l_sock,a_callbacks); + dap_events_socket_t * l_es =dap_events_socket_wrap_no_add(l_sock,a_callbacks); if(!l_es){ log_it(L_CRITICAL,"Can't allocate memory for the new esocket"); return NULL; @@ -1061,17 +1059,14 @@ void dap_events_socket_delete_mt(dap_worker_t * a_worker, dap_events_socket_uuid } /** - * @brief dap_events_socket_wrap - * @param a_events - * @param w - * @param s + * @brief dap_events_socket_wrap2 + * @param a_server + * @param a_sock * @param a_callbacks * @return */ -dap_events_socket_t * dap_events_socket_wrap2( dap_server_t *a_server, struct dap_events *a_events, - int a_sock, dap_events_socket_callbacks_t *a_callbacks ) +dap_events_socket_t * dap_events_socket_wrap2( dap_server_t *a_server, int a_sock, dap_events_socket_callbacks_t *a_callbacks ) { - assert( a_events ); assert( a_callbacks ); assert( a_server ); @@ -1203,14 +1198,14 @@ void dap_events_socket_set_writable_unsafe( dap_events_socket_t *a_esocket, bool */ bool s_remove_and_delete_unsafe_delayed_delete_callback(void * a_arg) { - dap_worker_t * l_worker = dap_events_get_current_worker(dap_events_get_default()); + dap_worker_t * l_worker = dap_worker_get_current(); dap_events_socket_uuid_w_data_t * l_es_handler = (dap_events_socket_uuid_w_data_t*) a_arg; assert(l_es_handler); assert(l_worker); dap_events_socket_t * l_es; if( (l_es = dap_context_esocket_find_by_uuid(l_worker->context, l_es_handler->esocket_uuid)) != NULL) //dap_events_socket_remove_and_delete_unsafe(l_es,l_es_handler->value == 1); - dap_events_remove_and_delete_socket_unsafe(dap_events_get_default(), l_es, l_es_handler->value == 1); + dap_events_socket_remove_and_delete_unsafe( l_es, l_es_handler->value == 1); DAP_DELETE(l_es_handler); return false; diff --git a/dap-sdk/io/dap_proc_thread.c b/dap-sdk/io/dap_proc_thread.c index 70d2e1653fd9721e0eaafccae67903c84ccee39b..a40a0d53b1dba67f4ec9514e903bc93268c57955 100644 --- a/dap-sdk/io/dap_proc_thread.c +++ b/dap-sdk/io/dap_proc_thread.c @@ -61,13 +61,12 @@ typedef cpuset_t cpu_set_t; // Adopt BSD CPU setstructure to POSIX variant static size_t s_threads_count = 0; static dap_proc_thread_t * s_threads = NULL; -static dap_slist_t s_custom_threads = $DAP_SLIST_INITALIZER; /* Customized proc threads out of the pool */ -static pthread_rwlock_t s_custom_threads_rwlock = PTHREAD_RWLOCK_INITIALIZER; /* Lock to protect <s_custom_threads> */ - - static void *s_proc_thread_function(void * a_arg); static void s_event_exit_callback( dap_events_socket_t * a_es, uint64_t a_flags); +static void s_context_callback_started( dap_context_t * a_context, void *a_arg); +static void s_context_callback_stopped( dap_context_t * a_context, void *a_arg); + /** * @brief dap_proc_thread_init * @param a_cpu_count 0 means autodetect @@ -84,27 +83,22 @@ int l_ret = 0; for (uint32_t i = 0; i < s_threads_count; i++ ) { dap_proc_thread_t * l_thread = s_threads + i; - l_thread->cpu_id = i; l_thread->context = dap_context_new(); l_thread->context->proc_thread = l_thread; - pthread_cond_init(&l_thread->started_cond, NULL); - pthread_mutex_init( &l_thread->started_mutex,NULL); - pthread_mutex_lock( &l_thread->started_mutex ); - - if ( (l_ret = pthread_create( &l_thread->thread_id,NULL, s_proc_thread_function, &s_threads[i] )) ) { + if ( (l_ret = dap_context_run(l_thread->context,i,DAP_CONTEXT_POLICY_TIMESHARING,2, + DAP_CONTEXT_FLAG_WAIT_FOR_STARTED, s_context_callback_started, + s_context_callback_stopped,l_thread) ) ) { log_it(L_CRITICAL, "Create thread failed with code %d", l_ret); - pthread_mutex_unlock( &l_thread->started_mutex ); return l_ret; } - pthread_cond_wait( &l_thread->started_cond, &l_thread->started_mutex); - pthread_mutex_unlock( &l_thread->started_mutex); } return l_ret; } + /** * @brief dap_proc_thread_deinit */ @@ -116,22 +110,9 @@ void dap_proc_thread_deinit() for (uint32_t i = s_threads_count; i--; ){ dap_events_socket_event_signal(s_threads[i].event_exit, 1); - pthread_join(s_threads[i].thread_id, NULL); + pthread_join(s_threads[i].context->thread_id, NULL); } - // Cleaning custom proc threads - pthread_rwlock_wrlock(&s_custom_threads_rwlock); - - for ( uint32_t i = s_custom_threads.nr; i--; ) { - if ( s_dap_slist_get4head (&s_custom_threads, (void **) &l_proc_thread, &l_sz) ) - break; - - dap_events_socket_event_signal(l_proc_thread->event_exit, 1); - pthread_join(l_proc_thread->thread_id, NULL); - DAP_DELETE(l_proc_thread); - } - - pthread_rwlock_unlock(&s_custom_threads_rwlock); // Signal to cancel working threads and wait for finish // TODO: Android realization @@ -175,34 +156,6 @@ unsigned l_id_min = 0, l_size_min = UINT32_MAX, l_queue_size; return &s_threads[l_id_min]; } -/** - * @brief dap_proc_thread_run_custom Create custom proc thread for specified task - * @return - */ -dap_proc_thread_t * dap_proc_thread_run_custom(void) -{ - dap_proc_thread_t * l_proc_thread = DAP_NEW_Z(dap_proc_thread_t); - int l_ret; - - if (l_proc_thread == NULL) - return log_it(L_CRITICAL,"Out of memory, can't create new proc thread, errno=%d", errno), NULL; - - pthread_mutex_lock( &l_proc_thread->started_mutex ); - - if ( (l_ret = pthread_create( &l_proc_thread->thread_id ,NULL, s_proc_thread_function, l_proc_thread )) ) { - log_it(L_CRITICAL, "Create thread failed with code %d", l_ret); - DAP_DEL_Z (l_proc_thread); - }else{ - pthread_cond_wait( &l_proc_thread->started_cond, &l_proc_thread->started_mutex); - pthread_rwlock_wrlock(&s_custom_threads_rwlock); - assert ( !s_dap_slist_add2tail (&s_custom_threads, l_proc_thread, sizeof(l_proc_thread)) ); - pthread_rwlock_unlock(&s_custom_threads_rwlock); - } - - pthread_mutex_unlock( &l_proc_thread->started_mutex ); - return l_proc_thread; -} - /** * @brief s_proc_event_callback - get from queue next element and execute action routine, * repeat execution depending on status is returned by action routine. @@ -352,55 +305,34 @@ dap_events_socket_t * dap_proc_thread_create_queue_ptr(dap_proc_thread_t * a_thr } /** - * @brief s_proc_thread_function + * @brief s_context_callback_started + * @param a_context * @param a_arg - * @return */ -static void * s_proc_thread_function(void * a_arg) +static void s_context_callback_started( dap_context_t * a_context, void *a_arg) { - dap_proc_thread_t * l_thread = (dap_proc_thread_t*) a_arg; assert(l_thread); - dap_context_t * l_context = l_thread->context; - assert(l_context); - dap_cpu_assign_thread_on(l_thread->cpu_id); - - struct sched_param l_shed_params; - l_shed_params.sched_priority = 0; -#if defined(DAP_OS_WINDOWS) - if (!SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_HIGHEST)) - log_it(L_ERROR, "Couldn't set thread priority, err: %lu", GetLastError()); -#elif defined (DAP_OS_LINUX) - pthread_setschedparam(pthread_self(),SCHED_BATCH ,&l_shed_params); -#elif defined (DAP_OS_BSD) - pthread_setschedparam(pthread_self(),SCHED_OTHER ,&l_shed_params); -#else -#error "Undefined set sched param" -#endif - if(dap_context_thread_init(l_thread->context)!=0){ - pthread_cond_broadcast(&l_thread->started_cond); - return NULL; - } - l_thread->proc_queue = dap_proc_queue_create(l_thread); // Init proc_queue for related worker - dap_worker_t * l_worker_related = dap_events_worker_get(l_thread->cpu_id); + dap_worker_t * l_worker_related = dap_events_worker_get(l_thread->context->cpu_id); assert(l_worker_related); + l_worker_related->proc_queue = l_thread->proc_queue; l_worker_related->proc_queue_input = dap_events_socket_queue_ptr_create_input(l_worker_related->proc_queue->esocket); dap_events_socket_assign_on_worker_mt(l_worker_related->proc_queue_input,l_worker_related); - l_thread->proc_event = dap_context_create_esocket_event( l_context , s_proc_event_callback); + l_thread->proc_event = dap_context_create_esocket_event( a_context , s_proc_event_callback); l_thread->proc_event->proc_thread = l_thread; - l_thread->event_exit = dap_context_create_esocket_event(l_context, s_event_exit_callback); + l_thread->event_exit = dap_context_create_esocket_event( a_context, s_event_exit_callback); l_thread->event_exit->proc_thread = l_thread; l_thread->proc_event->_inheritor = l_thread; // we pass thread through it l_thread->event_exit->_inheritor = l_thread; - size_t l_workers_count= dap_events_worker_get_count(); + size_t l_workers_count= dap_events_thread_get_count(); assert(l_workers_count); l_thread->queue_assign_input = DAP_NEW_Z_SIZE(dap_events_socket_t*, sizeof (dap_events_socket_t*)*l_workers_count ); l_thread->queue_io_input = DAP_NEW_Z_SIZE(dap_events_socket_t*, sizeof (dap_events_socket_t*)*l_workers_count ); @@ -416,7 +348,7 @@ static void * s_proc_thread_function(void * a_arg) } - for (size_t n = 0; n< dap_events_worker_get_count(); n++){ + for (size_t n = 0; n< dap_events_thread_get_count(); n++){ // Queue asssign dap_proc_thread_assign_esocket_unsafe(l_thread, l_thread->queue_assign_input[n]); @@ -427,25 +359,27 @@ static void * s_proc_thread_function(void * a_arg) dap_proc_thread_assign_esocket_unsafe(l_thread, l_thread->queue_callback_input[n]); } +} - //We've started! - pthread_cond_broadcast(&l_thread->started_cond); - - - - dap_context_thread_loop(l_thread->context); - - log_it(L_ATT, "Stop processing thread #%u", l_thread->cpu_id); +/** + * @brief s_context_callback_stopped + * @param a_context + * @param a_arg + */ +static void s_context_callback_stopped( dap_context_t * a_context, void *a_arg) +{ + dap_proc_thread_t * l_thread = (dap_proc_thread_t*) a_arg; + assert(l_thread); + log_it(L_ATT, "Stop processing thread #%u", l_thread->context->cpu_id); // cleanip inputs - for (size_t n=0; n<dap_events_worker_get_count(); n++){ + for (size_t n=0; n<dap_events_thread_get_count(); n++){ dap_events_socket_delete_unsafe(l_thread->queue_assign_input[n], false); dap_events_socket_delete_unsafe(l_thread->queue_io_input[n], false); dap_events_socket_delete_unsafe(l_thread->queue_callback_input[n], false); } - - return NULL; } + /** * @brief dap_proc_thread_assign_on_worker_inter * @param a_thread @@ -560,7 +494,7 @@ static void s_event_exit_callback( dap_events_socket_t * a_es, uint64_t a_flags) dap_proc_thread_t * l_thread = (dap_proc_thread_t *) a_es->_inheritor; l_thread->context->signal_exit = true; if(g_debug_reactor) - log_it(L_DEBUG, "Proc_thread :%u signaled to exit", l_thread->cpu_id); + log_it(L_DEBUG, "Proc_thread :%u signaled to exit", l_thread->context->cpu_id); } diff --git a/dap-sdk/io/dap_server.c b/dap-sdk/io/dap_server.c index 84471b9c639105addb64cd7d368817654466c27b..c355da21e5c6a55240aa2b0ed5836cafe90dc427 100644 --- a/dap-sdk/io/dap_server.c +++ b/dap-sdk/io/dap_server.c @@ -73,7 +73,7 @@ #define LOG_TAG "dap_server" -static dap_events_socket_t * s_es_server_create(dap_events_t * a_events, int a_sock, +static dap_events_socket_t * s_es_server_create(int a_sock, dap_events_socket_callbacks_t * a_callbacks, dap_server_t * a_server); static int s_server_run(dap_server_t * a_server, dap_events_socket_callbacks_t *a_callbacks ); static void s_es_server_accept(dap_events_socket_t *a_es, SOCKET a_remote_socket, struct sockaddr* a_remote_addr); @@ -128,14 +128,12 @@ void dap_server_delete(dap_server_t *a_server) * @param a_callbacks * @return */ -dap_server_t* dap_server_new_local(dap_events_t *a_events, const char * a_path, const char* a_mode, dap_events_socket_callbacks_t *a_callbacks) +dap_server_t* dap_server_new_local(const char * a_path, const char* a_mode, dap_events_socket_callbacks_t *a_callbacks) { - assert(a_events); #ifdef DAP_OS_UNIX dap_server_t *l_server = DAP_NEW_Z(dap_server_t); l_server->socket_listener=-1; // To diff it from 0 fd l_server->type = SERVER_LOCAL; - l_server->events = a_events; l_server->socket_listener = socket(AF_LOCAL, SOCK_STREAM, 0); if (l_server->socket_listener < 0) { int l_errno = errno; @@ -180,9 +178,8 @@ dap_server_t* dap_server_new_local(dap_events_t *a_events, const char * a_path, * @param a_type * @return */ -dap_server_t* dap_server_new(dap_events_t *a_events, const char * a_addr, uint16_t a_port, dap_server_type_t a_type, dap_events_socket_callbacks_t *a_callbacks) +dap_server_t* dap_server_new(const char * a_addr, uint16_t a_port, dap_server_type_t a_type, dap_events_socket_callbacks_t *a_callbacks) { - assert(a_events); dap_server_t *l_server = DAP_NEW_Z(dap_server_t); #ifndef DAP_OS_WINDOWS l_server->socket_listener=-1; // To diff it from 0 fd @@ -190,7 +187,6 @@ dap_server_t* dap_server_new(dap_events_t *a_events, const char * a_addr, uint16 l_server->address = a_addr ? strdup(a_addr) : strdup("0.0.0.0"); // If NULL we listen everything l_server->port = a_port; l_server->type = a_type; - l_server->events = a_events; if(l_server->type == SERVER_TCP) l_server->socket_listener = socket(AF_INET, SOCK_STREAM, 0); @@ -321,7 +317,7 @@ static int s_server_run(dap_server_t * a_server, dap_events_socket_callbacks_t * // or not dap_worker_t *l_w = dap_events_worker_get_auto(); assert(l_w); - dap_events_socket_t * l_es = dap_events_socket_wrap2( a_server, a_server->events, a_server->socket_listener, &l_callbacks); + dap_events_socket_t * l_es = dap_events_socket_wrap2( a_server, a_server->socket_listener, &l_callbacks); if (l_es) { a_server->es_listeners = dap_list_append(a_server->es_listeners, l_es); l_es->type = a_server->type == SERVER_TCP ? DESCRIPTOR_TYPE_SOCKET_LISTENING : DESCRIPTOR_TYPE_SOCKET_UDP; @@ -383,7 +379,7 @@ static void s_es_server_accept(dap_events_socket_t *a_es, SOCKET a_remote_socket dap_events_socket_t * l_es_new = NULL; log_it(L_DEBUG, "Listening socket (binded on %s:%u) got new incomming connection",l_server->address,l_server->port); log_it(L_DEBUG, "Accepted new connection (sock %"DAP_FORMAT_SOCKET" from %"DAP_FORMAT_SOCKET")", a_remote_socket, a_es->socket); - l_es_new = s_es_server_create(dap_events_get_default(),a_remote_socket,&l_server->client_callbacks,l_server); + l_es_new = s_es_server_create(a_remote_socket,&l_server->client_callbacks,l_server); //l_es_new->is_dont_reset_write_flag = true; // By default all income connection has this flag getnameinfo(a_remote_addr,a_remote_addr_size, l_es_new->hostaddr ,256, l_es_new->service,sizeof(l_es_new->service), @@ -406,15 +402,15 @@ static void s_es_server_accept(dap_events_socket_t *a_es, SOCKET a_remote_socket * @param a_server * @return */ -static dap_events_socket_t * s_es_server_create(dap_events_t * a_events, int a_sock, - dap_events_socket_callbacks_t * a_callbacks, dap_server_t * a_server) +static dap_events_socket_t * s_es_server_create(int a_sock, dap_events_socket_callbacks_t * a_callbacks, + dap_server_t * a_server) { dap_events_socket_t * ret = NULL; if (a_sock > 0) { // set it nonblock //fcntl(a_sock, F_SETFL, O_NONBLOCK); - ret = dap_events_socket_wrap_no_add(a_events, a_sock, a_callbacks); + ret = dap_events_socket_wrap_no_add(a_sock, a_callbacks); ret->type = DESCRIPTOR_TYPE_SOCKET_CLIENT; ret->server = a_server; ret->hostaddr = DAP_NEW_Z_SIZE(char, 256); diff --git a/dap-sdk/io/dap_timerfd.c b/dap-sdk/io/dap_timerfd.c index 55d6b02dd0994323c46ac4e84cc8fe9e3928fd1e..9c49828ad9b24c5718a974f728c94e984de12aa4 100644 --- a/dap-sdk/io/dap_timerfd.c +++ b/dap-sdk/io/dap_timerfd.c @@ -147,7 +147,7 @@ dap_timerfd_t* dap_timerfd_create(uint64_t a_timeout_ms, dap_timerfd_callback_t memset(&l_s_callbacks,0,sizeof (l_s_callbacks)); l_s_callbacks.timer_callback = s_es_callback_timer; - dap_events_socket_t * l_events_socket = dap_events_socket_wrap_no_add(dap_events_get_default(), -1, &l_s_callbacks); + dap_events_socket_t * l_events_socket = dap_events_socket_wrap_no_add(-1, &l_s_callbacks); l_events_socket->type = DESCRIPTOR_TYPE_TIMER; // pass l_timerfd to events_socket diff --git a/dap-sdk/io/dap_worker.c b/dap-sdk/io/dap_worker.c index 7c5293241abbdadd3077d44c04f5f9c6e7639c26..22b72b1ca88e256b6379aaecd9b4f14c15828906 100644 --- a/dap-sdk/io/dap_worker.c +++ b/dap-sdk/io/dap_worker.c @@ -41,6 +41,8 @@ #define LOG_TAG "dap_worker" +pthread_key_t g_pth_key_worker; + static time_t s_connection_timeout = 60; // seconds static bool s_socket_all_check_activity( void * a_arg); @@ -62,6 +64,8 @@ int dap_worker_init( size_t a_conn_timeout ) if ( a_conn_timeout ) s_connection_timeout = a_conn_timeout; + pthread_key_create( &g_pth_key_worker, NULL); + return 0; } @@ -70,65 +74,58 @@ void dap_worker_deinit( ) } /** - * @brief dap_worker_thread - * @param arg + * @brief dap_worker_context_callback_started + * @param a_context + * @param a_arg * @return */ -void *dap_worker_thread(void *arg) +void dap_worker_context_callback_started( dap_context_t * a_context, void *a_arg) { - dap_worker_t *l_worker = (dap_worker_t *) arg; + dap_worker_t *l_worker = (dap_worker_t *) a_arg; assert(l_worker); - dap_context_t * l_context = l_worker->context; - const struct sched_param l_shed_params = {0}; - - - dap_cpu_assign_thread_on(l_worker->id); - pthread_setspecific(l_worker->events->pth_key_worker, l_worker); - -#ifdef DAP_OS_WINDOWS - if (!SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_BELOW_NORMAL)) - log_it(L_ERROR, "Couldn'r set thread priority, err: %lu", GetLastError()); -#else - pthread_setschedparam(pthread_self(),SCHED_FIFO ,&l_shed_params); -#endif - - if(dap_context_thread_init(l_context)!=0){ - pthread_cond_broadcast(&l_worker->started_cond); - return NULL; - } + pthread_setspecific(g_pth_key_worker, l_worker); + l_worker->queue_es_new_input = DAP_NEW_Z_SIZE(dap_events_socket_t*, sizeof (dap_events_socket_t*)* dap_events_thread_get_count() ); + l_worker->queue_es_delete_input = DAP_NEW_Z_SIZE(dap_events_socket_t*, sizeof (dap_events_socket_t*)* dap_events_thread_get_count() ); + l_worker->queue_es_io_input = DAP_NEW_Z_SIZE(dap_events_socket_t*, sizeof (dap_events_socket_t*)* dap_events_thread_get_count() ); + l_worker->queue_es_reassign_input = DAP_NEW_Z_SIZE(dap_events_socket_t*, sizeof (dap_events_socket_t*)* dap_events_thread_get_count() ); - l_worker->queue_es_new_input = DAP_NEW_Z_SIZE(dap_events_socket_t*, sizeof (dap_events_socket_t*)* dap_events_worker_get_count() ); - l_worker->queue_es_delete_input = DAP_NEW_Z_SIZE(dap_events_socket_t*, sizeof (dap_events_socket_t*)* dap_events_worker_get_count() ); - l_worker->queue_es_io_input = DAP_NEW_Z_SIZE(dap_events_socket_t*, sizeof (dap_events_socket_t*)* dap_events_worker_get_count() ); - l_worker->queue_es_reassign_input = DAP_NEW_Z_SIZE(dap_events_socket_t*, sizeof (dap_events_socket_t*)* dap_events_worker_get_count() ); + l_worker->queue_es_new = dap_context_create_esocket_queue(a_context, s_queue_add_es_callback); + l_worker->queue_es_delete = dap_context_create_esocket_queue(a_context, s_queue_delete_es_callback); + l_worker->queue_es_io = dap_context_create_esocket_queue(a_context, s_queue_es_io_callback); + l_worker->queue_es_reassign = dap_context_create_esocket_queue(a_context, s_queue_es_reassign_callback ); - l_worker->queue_es_new = dap_context_create_esocket_queue(l_context, s_queue_add_es_callback); - l_worker->queue_es_delete = dap_context_create_esocket_queue(l_context, s_queue_delete_es_callback); - l_worker->queue_es_io = dap_context_create_esocket_queue(l_context, s_queue_es_io_callback); - l_worker->queue_es_reassign = dap_context_create_esocket_queue(l_context, s_queue_es_reassign_callback ); - - for( size_t n = 0; n < dap_events_worker_get_count(); n++) { + for( size_t n = 0; n < dap_events_thread_get_count(); n++) { l_worker->queue_es_new_input[n] = dap_events_socket_queue_ptr_create_input(l_worker->queue_es_new); l_worker->queue_es_delete_input[n] = dap_events_socket_queue_ptr_create_input(l_worker->queue_es_delete); l_worker->queue_es_io_input[n] = dap_events_socket_queue_ptr_create_input(l_worker->queue_es_io); l_worker->queue_es_reassign_input[n] = dap_events_socket_queue_ptr_create_input(l_worker->queue_es_reassign); } - l_worker->queue_callback = dap_context_create_esocket_queue(l_context, s_queue_callback_callback); - l_worker->event_exit = dap_context_create_esocket_event(l_context, s_event_exit_callback); + l_worker->queue_callback = dap_context_create_esocket_queue(a_context, s_queue_callback_callback); + l_worker->event_exit = dap_context_create_esocket_event(a_context, s_event_exit_callback); l_worker->timer_check_activity = dap_timerfd_create(s_connection_timeout * 1000 / 2, s_socket_all_check_activity, l_worker); dap_worker_add_events_socket_unsafe( l_worker->timer_check_activity->events_socket, l_worker); - pthread_cond_broadcast(&l_worker->started_cond); - dap_context_thread_loop(l_context); +} + +/** + * @brief dap_worker_context_callback_stopped + * @param a_context + * @param a_arg + * @return + */ +void dap_worker_context_callback_stopped( dap_context_t * a_context, void *a_arg) +{ + dap_worker_t *l_worker = (dap_worker_t *) a_arg; + assert(l_worker); log_it(L_NOTICE,"Exiting thread #%u", l_worker->id); - return NULL; } + /** * @brief s_new_es_callback * @param a_es diff --git a/dap-sdk/io/include/dap_context.h b/dap-sdk/io/include/dap_context.h index 182a1fb4213dfc93074bc803526968ec2eb7b7f9..b4f8601480256008f7373d83970adc7f32de63b3 100644 --- a/dap-sdk/io/include/dap_context.h +++ b/dap-sdk/io/include/dap_context.h @@ -30,14 +30,41 @@ typedef struct dap_worker dap_worker_t; typedef struct dap_proc_thread dap_proc_thread_t; +typedef struct dap_context dap_context_t; +typedef void (*dap_context_callback_t) (dap_context_t *,void * ); // Callback for specific client operations +typedef struct dap_context_msg_callback_{ + dap_context_t * context; + dap_context_callback_t callback; + void * arg; +} dap_context_msg_callback_t; + +typedef struct dap_context_msg_run{ + dap_context_t * context; + dap_context_callback_t callback_started; + dap_context_callback_t callback_stopped; + int priority; + int sched_policy; + int cpu_id; + int flags; + void * callback_arg; +} dap_context_msg_run_t; + typedef struct dap_context { uint32_t id; // Context ID + int cpu_id; // CPU id (if assigned) // Compatibility fields, in future should be replaced with _inheritor dap_proc_thread_t * proc_thread; // If the context belongs to proc_thread dap_worker_t * worker; // If the context belongs to worker -#if defined DAP_EVENTS_CAPS_MSMQ + // pthread-related fields + pthread_cond_t started_cond; // Fires when thread started and pre-loop callback executes + pthread_mutex_t started_mutex; // related with started_cond + pthread_t thread_id; // Thread id + + + /// Platform-specific fields +#if defined DAP_EVENTS_CAPS_MSMQqq HANDLE msmq_events[MAXIMUM_WAIT_OBJECTS]; #endif @@ -67,23 +94,57 @@ typedef struct dap_context { // Signal to exit bool signal_exit; - + // Flags + bool is_running; // Is running + uint32_t running_flags; // Flags passed for _run function } dap_context_t; + +// Waiting for started before exit _run function/ +// ATTENTION: callback_started() executed just before exit a _run function + +#define DAP_CONTEXT_FLAG_WAIT_FOR_STARTED 0x00000001 +#define DAP_CONTEXT_FLAG_EXIT_IF_ERROR 0x00000100 + +// Usual policies +#define DAP_CONTEXT_POLICY_DEFAUT 0 +#define DAP_CONTEXT_POLICY_TIMESHARING 1 +// Real-time policies. +#define DAP_CONTEXT_POLICY_FIFO 2 +#define DAP_CONTEXT_POLICY_ROUND_ROBIN 3 + +// If set DAP_CONTEXT_FLAG_WAIT_FOR_STARTED thats time for waiting for (in seconds) +#define DAP_CONTEXT_WAIT_FOR_STARTED_TIME 15 + +// pthread kernel object for current context pointer extern pthread_key_t g_dap_context_pth_key; -static inline dap_context_t * dap_context_current(){ - return (dap_context_t*) pthread_getspecific(g_dap_context_pth_key); -} +/// Next functions are thread-safe int dap_context_init(); // Init +void dap_context_deinit(); // Deinit -// New context create. Thread-safe functions +// New context create and run. dap_context_t * dap_context_new(); +// Run new context in dedicated thread. +// ATTENTION: after running the context nobody have to access it outside its own running thread +// Please use queues for access if need it +int dap_context_run(dap_context_t * a_context,int a_cpu_id, int a_sched_policy, int a_priority,uint32_t a_flags, + dap_context_callback_t a_callback_started, + dap_context_callback_t a_callback_stopped, + void * a_callback_arg ); + +/** + * @brief dap_context_current Get current context + * @return Returns current context(if present, if not returns NULL) + */ +static inline dap_context_t * dap_context_current() +{ + return (dap_context_t*) pthread_getspecific(g_dap_context_pth_key); +} + /// ALL THIS FUNCTIONS ARE UNSAFE ! CALL THEM ONLY INSIDE THEIR OWN CONTEXT!! -int dap_context_thread_init(dap_context_t * a_context); -int dap_context_thread_loop(dap_context_t * a_context); int dap_context_add_esocket(dap_context_t * a_context, dap_events_socket_t * a_esocket ); int dap_context_poll_update(dap_events_socket_t * a_esocket); diff --git a/dap-sdk/io/include/dap_events.h b/dap-sdk/io/include/dap_events.h index c1071d5aac1a3efa8d5002ed6ccdf0f8fe14c1bd..8c45cef0efd5c36040d4c25c73a332cf20847efd 100644 --- a/dap-sdk/io/include/dap_events.h +++ b/dap-sdk/io/include/dap_events.h @@ -27,21 +27,8 @@ #include "dap_events_socket.h" #include "dap_server.h" #include "dap_worker.h" -struct dap_events; #define DAP_MAX_EVENTS_COUNT 8192 -typedef void (*dap_events_callback_t) (struct dap_events *, void *arg); // Callback for specific server's operations - -typedef struct dap_thread { - pthread_t tid; -} dap_thread_t; - -typedef struct dap_events { - pthread_key_t pth_key_worker; - void *_inheritor; // Pointer to the internal data, HTTP for example - dap_thread_t proc_thread; -} dap_events_t; - #ifdef __cplusplus extern "C" { @@ -49,30 +36,23 @@ extern "C" { extern bool g_debug_reactor; -int dap_events_init( uint32_t a_threads_count, size_t a_conn_timeout ); // Init server module -void dap_events_deinit( ); // Deinit server module +int dap_events_init( uint32_t a_threads_count, size_t a_conn_timeout ); // Init events module +void dap_events_deinit( ); // Deinit events module -dap_events_t* dap_events_new( ); -dap_events_t* dap_events_get_default( ); -void dap_events_delete( dap_events_t * a_events ); -void dap_events_remove_and_delete_socket_unsafe(dap_events_t*, dap_events_socket_t*, bool); -int32_t dap_events_start( dap_events_t *a_events ); +int32_t dap_events_start( ); void dap_events_stop_all(); -int32_t dap_events_wait( dap_events_t *a_events ); +int32_t dap_events_wait(); -void dap_events_worker_print_all( ); -uint32_t dap_events_worker_get_index_min( ); -uint32_t dap_events_worker_get_count(); +void dap_worker_print_all( ); +uint32_t dap_events_thread_get_index_min( ); +uint32_t dap_events_thread_get_count(); dap_worker_t *dap_events_worker_get_auto( ); dap_worker_t * dap_events_worker_get(uint8_t a_index); uint32_t dap_get_cpu_count(); void dap_cpu_assign_thread_on(uint32_t a_cpu_id); -static inline dap_worker_t * dap_events_get_current_worker(dap_events_t * a_events){ - return (dap_worker_t*) pthread_getspecific(a_events->pth_key_worker); -} #ifdef __cplusplus } diff --git a/dap-sdk/io/include/dap_events_socket.h b/dap-sdk/io/include/dap_events_socket.h index 52086076a9a63d1f9543f685d818e638686d9e48..50f5688b4431a499eaa9fd97a4a59fb09a5a3ecf 100644 --- a/dap-sdk/io/include/dap_events_socket.h +++ b/dap-sdk/io/include/dap_events_socket.h @@ -111,7 +111,6 @@ typedef int SOCKET; // If set - queue limited to sizeof(void*) size of data transmitted #define DAP_SOCK_QUEUE_PTR BIT( 8 ) -typedef struct dap_events dap_events_t; typedef struct dap_events_socket dap_events_socket_t; typedef struct dap_worker dap_worker_t; typedef struct dap_proc_thread dap_proc_thread_t ; @@ -245,7 +244,6 @@ typedef struct dap_events_socket { // Links to related objects -// dap_events_t *events; dap_context_t * context; dap_proc_thread_t * proc_thread; // If assigned on dap_proc_thread_t object dap_server_t *server; // If this socket assigned with server @@ -331,10 +329,8 @@ int dap_events_socket_event_signal( dap_events_socket_t * a_es, uint64_t a_value void dap_events_socket_delete_unsafe( dap_events_socket_t * a_esocket , bool a_preserve_inheritor); void dap_events_socket_delete_mt(dap_worker_t * a_worker, dap_events_socket_uuid_t a_es_uuid); -dap_events_socket_t *dap_events_socket_wrap_no_add( dap_events_t *a_events, - int a_sock, dap_events_socket_callbacks_t *a_callbacks ); -dap_events_socket_t * dap_events_socket_wrap2( dap_server_t *a_server, struct dap_events *a_events, - int a_sock, dap_events_socket_callbacks_t *a_callbacks ); +dap_events_socket_t *dap_events_socket_wrap_no_add( int a_sock, dap_events_socket_callbacks_t *a_callbacks ); +dap_events_socket_t * dap_events_socket_wrap2( dap_server_t *a_server, int a_sock, dap_events_socket_callbacks_t *a_callbacks ); void dap_events_socket_assign_on_worker_mt(dap_events_socket_t * a_es, struct dap_worker * a_worker); void dap_events_socket_assign_on_worker_inter(dap_events_socket_t * a_es_input, dap_events_socket_t * a_es); diff --git a/dap-sdk/io/include/dap_proc_thread.h b/dap-sdk/io/include/dap_proc_thread.h index e2b1e9fb5ec8efd2d70553b04fb9245a640fc845..573397d12475273006a087e3ca17e57d73ed7b5b 100644 --- a/dap-sdk/io/include/dap_proc_thread.h +++ b/dap-sdk/io/include/dap_proc_thread.h @@ -29,8 +29,6 @@ #include "dap_common.h" #include "dap_context.h" typedef struct dap_proc_thread{ - uint32_t cpu_id; - pthread_t thread_id; /* TID has been returned by pthread_create() */ dap_proc_queue_t *proc_queue; /* Queues */ atomic_uint proc_queue_size; /* Thread's load factor - is not supported at the moment */ @@ -47,16 +45,12 @@ typedef struct dap_proc_thread{ dap_events_socket_t * event_exit; - pthread_cond_t started_cond; - pthread_mutex_t started_mutex ; - void * _inheritor; } dap_proc_thread_t; int dap_proc_thread_init(uint32_t a_threads_count); void dap_proc_thread_deinit(); -dap_proc_thread_t *dap_proc_thread_run_custom(void); dap_proc_thread_t *dap_proc_thread_get(uint32_t a_thread_number); dap_proc_thread_t *dap_proc_thread_get_auto(); diff --git a/dap-sdk/io/include/dap_server.h b/dap-sdk/io/include/dap_server.h index 606a9b6263ac2a9864b6a6d0f604a27f95a1d565..d2dc9b9fe3d64733f800ef34343cfc2ab5837ff7 100644 --- a/dap-sdk/io/include/dap_server.h +++ b/dap-sdk/io/include/dap_server.h @@ -75,8 +75,6 @@ typedef struct dap_server { #endif dap_list_t *es_listeners; - dap_events_t * events; - struct sockaddr_in listener_addr; // Kernel structure for listener's binded address #ifdef DAP_OS_UNIX @@ -98,7 +96,7 @@ typedef struct dap_server { int dap_server_init( ); // Init server module void dap_server_deinit( void ); // Deinit server module -dap_server_t* dap_server_new(dap_events_t *a_events, const char * a_addr, uint16_t a_port, dap_server_type_t a_type, dap_events_socket_callbacks_t *a_callbacks); -dap_server_t* dap_server_new_local(dap_events_t *a_events, const char * a_path, const char * a_mode, dap_events_socket_callbacks_t *a_callbacks); +dap_server_t* dap_server_new(const char * a_addr, uint16_t a_port, dap_server_type_t a_type, dap_events_socket_callbacks_t *a_callbacks); +dap_server_t* dap_server_new_local(const char * a_path, const char * a_mode, dap_events_socket_callbacks_t *a_callbacks); void dap_server_delete(dap_server_t *a_server); diff --git a/dap-sdk/io/include/dap_worker.h b/dap-sdk/io/include/dap_worker.h index c9cf8201d45d171c089b95ea6f9fe59f538a647d..fadaf29ad1c3014d04d21d68a45b5d55bfd7e48c 100644 --- a/dap-sdk/io/include/dap_worker.h +++ b/dap-sdk/io/include/dap_worker.h @@ -36,7 +36,6 @@ typedef struct dap_timerfd dap_timerfd_t; typedef struct dap_worker { uint32_t id; - dap_events_t* events; dap_proc_queue_t* proc_queue; dap_events_socket_t *proc_queue_input; @@ -64,8 +63,6 @@ typedef struct dap_worker dap_timerfd_t * timer_check_activity; dap_context_t *context; - pthread_cond_t started_cond; - pthread_mutex_t started_mutex; void * _inheritor; } dap_worker_t; @@ -93,6 +90,8 @@ typedef struct dap_worker_msg_callback{ } dap_worker_msg_callback_t; +extern pthread_key_t g_pth_key_worker; + #ifdef __cplusplus extern "C" { #endif @@ -100,6 +99,10 @@ extern "C" { int dap_worker_init( size_t a_conn_timeout ); void dap_worker_deinit(); +static inline dap_worker_t * dap_worker_get_current(){ + return (dap_worker_t*) pthread_getspecific(g_pth_key_worker); +} + static inline int dap_worker_add_events_socket_unsafe( dap_events_socket_t * a_esocket, dap_worker_t * a_worker) { return dap_context_add_esocket(a_worker->context, a_esocket); @@ -111,8 +114,10 @@ dap_worker_t *dap_worker_add_events_socket_auto( dap_events_socket_t * a_events_ void dap_worker_exec_callback_on(dap_worker_t * a_worker, dap_worker_callback_t a_callback, void * a_arg); bool dap_worker_check_esocket_polled_now(); // Check if esocket is right now polled and present in list -// Thread function +// Context callbacks void *dap_worker_thread(void *arg); +void dap_worker_context_callback_started( dap_context_t * a_context, void *a_arg); +void dap_worker_context_callback_stopped( dap_context_t * a_context, void *a_arg); #ifdef __cplusplus diff --git a/dap-sdk/net/client/dap_client.c b/dap-sdk/net/client/dap_client.c index d09fd83fb6f5eb35886df8b270d60ed60118343b..a919f969c5717e81bf4faae723f30b989f60f2a3 100644 --- a/dap-sdk/net/client/dap_client.c +++ b/dap-sdk/net/client/dap_client.c @@ -71,7 +71,7 @@ void dap_client_deinit() * @param a_stage_status_error_callback * @return */ -dap_client_t * dap_client_new(dap_events_t * a_events, dap_client_callback_t a_stage_status_callback +dap_client_t * dap_client_new( dap_client_callback_t a_stage_status_callback ,dap_client_callback_t a_stage_status_error_callback ) { // ALLOC MEM FOR dap_client @@ -88,7 +88,6 @@ dap_client_t * dap_client_new(dap_events_t * a_events, dap_client_callback_t a_s // CONSTRUCT dap_client object dap_client_pvt_t *l_client_pvt = DAP_CLIENT_PVT(l_client); l_client_pvt->client = l_client; - l_client_pvt->events = a_events; l_client_pvt->stage_status_callback = a_stage_status_callback; l_client_pvt->stage_status_error_callback = a_stage_status_error_callback; l_client_pvt->worker = dap_events_worker_get_auto(); diff --git a/dap-sdk/net/client/dap_client_http.c b/dap-sdk/net/client/dap_client_http.c index c321f6ab142e9920f10bf1b331aa478135a74b80..6767dc661c3c9a4c0e0bab214764c0a43aec2454 100644 --- a/dap-sdk/net/client/dap_client_http.c +++ b/dap-sdk/net/client/dap_client_http.c @@ -185,7 +185,7 @@ static bool s_timer_timeout_after_connected_check(void * a_arg) assert(a_arg); dap_events_socket_uuid_t * l_es_uuid_ptr = (dap_events_socket_uuid_t *) a_arg; - dap_worker_t * l_worker = dap_events_get_current_worker(dap_events_get_default()); // We're in own esocket context + dap_worker_t * l_worker = dap_worker_get_current(); // We're in own esocket context assert(l_worker); dap_events_socket_t * l_es = dap_context_esocket_find_by_uuid( l_worker->context, *l_es_uuid_ptr); if(l_es){ @@ -222,7 +222,7 @@ static bool s_timer_timeout_check(void * a_arg) dap_events_socket_uuid_t *l_es_uuid = (dap_events_socket_uuid_t*) a_arg; assert(l_es_uuid); - dap_worker_t * l_worker = dap_events_get_current_worker(dap_events_get_default()); // We're in own esocket context + dap_worker_t * l_worker = dap_worker_get_current(); // We're in own esocket context assert(l_worker); dap_events_socket_t * l_es = dap_context_esocket_find_by_uuid(l_worker->context, *l_es_uuid); if(l_es){ @@ -521,7 +521,7 @@ void* dap_client_http_request_custom(dap_worker_t * a_worker, const char *a_upli timeout.tv_sec = 10; timeout.tv_usec = 0; - dap_events_socket_t *l_ev_socket = dap_events_socket_wrap_no_add(dap_events_get_default(), l_socket, &l_s_callbacks); + dap_events_socket_t *l_ev_socket = dap_events_socket_wrap_no_add(l_socket, &l_s_callbacks); log_it(L_DEBUG,"Created client request socket %"DAP_FORMAT_SOCKET, l_socket); // create private struct diff --git a/dap-sdk/net/client/dap_client_pool.c b/dap-sdk/net/client/dap_client_pool.c index 86648cca0e5848e9a36057d1a9c2f299183fe967..100472e5896ea571bd0e277a0bcb15b5276059ee 100644 --- a/dap-sdk/net/client/dap_client_pool.c +++ b/dap-sdk/net/client/dap_client_pool.c @@ -40,15 +40,13 @@ struct dap_client_list{ void s_stage_status_callback(dap_client_t * a_client, void* a_arg); void s_stage_status_error_callback(dap_client_t * a_client, void* a_arg); -dap_events_t * s_events = NULL; /** * @brief dap_client_pool_init * @param a_events * @return */ -int dap_client_pool_init(dap_events_t * a_events) +int dap_client_pool_init() { - s_events = a_events; return 0; } @@ -63,7 +61,7 @@ void dap_client_pool_deinit() */ dap_client_t * dap_client_pool_new(const char * a_client_id) { - dap_client_t * l_client = dap_client_new(s_events, s_stage_status_callback + dap_client_t * l_client = dap_client_new(s_stage_status_callback , s_stage_status_error_callback ); return l_client; } diff --git a/dap-sdk/net/client/dap_client_pvt.c b/dap-sdk/net/client/dap_client_pvt.c index 56a7fdae61c6ae27efd521d7a265e13793b3d24b..4b748e33f0657566fd335c249b2586a34668c69e 100644 --- a/dap-sdk/net/client/dap_client_pvt.c +++ b/dap-sdk/net/client/dap_client_pvt.c @@ -153,7 +153,6 @@ void dap_client_pvt_new(dap_client_pvt_t * a_client_pvt) a_client_pvt->stage = STAGE_BEGIN; // start point of state machine a_client_pvt->stage_status = STAGE_STATUS_DONE; a_client_pvt->uplink_protocol_version = DAP_PROTOCOL_VERSION; - a_client_pvt->events = dap_events_get_default(); // add to list dap_client_pvt_hh_add_unsafe(a_client_pvt); } @@ -230,7 +229,7 @@ static bool s_stream_timer_timeout_check(void * a_arg) { assert(a_arg); dap_events_socket_uuid_t *l_es_uuid_ptr = (dap_events_socket_uuid_t*) a_arg; - dap_worker_t *l_worker = dap_events_get_current_worker(dap_events_get_default()); + dap_worker_t *l_worker = dap_worker_get_current(); assert(l_worker); dap_events_socket_t * l_es = dap_context_esocket_find_by_uuid(l_worker->context, *l_es_uuid_ptr); @@ -272,7 +271,7 @@ static bool s_stream_timer_timeout_after_connected_check(void * a_arg) assert(a_arg); dap_events_socket_uuid_t *l_es_uuid_ptr = (dap_events_socket_uuid_t*) a_arg; - dap_worker_t * l_worker = dap_events_get_current_worker(dap_events_get_default()); + dap_worker_t * l_worker = dap_worker_get_current(); assert(l_worker); dap_events_socket_t * l_es = dap_context_esocket_find_by_uuid(l_worker->context, *l_es_uuid_ptr); @@ -314,7 +313,7 @@ static bool s_enc_init_delay_before_request_timer_callback(void * a_arg) { assert (a_arg); dap_events_socket_uuid_t* l_es_uuid_ptr = (dap_events_socket_uuid_t*) a_arg; - dap_worker_t * l_worker = dap_events_get_current_worker(dap_events_get_default()); + dap_worker_t * l_worker = dap_worker_get_current(); dap_events_socket_t * l_es = dap_context_esocket_find_by_uuid(l_worker->context, *l_es_uuid_ptr); if(l_es){ dap_client_pvt_t *l_client_pvt = DAP_ESOCKET_CLIENT_PVT(l_es); @@ -483,8 +482,8 @@ static bool s_stage_status_after(dap_client_pvt_t * a_client_pvt) .delete_callback = s_stream_es_callback_delete, .connected_callback = s_stream_es_callback_connected };// - a_client_pvt->stream_es = dap_events_socket_wrap_no_add(a_client_pvt->events, - (int)a_client_pvt->stream_socket, &l_s_callbacks); + a_client_pvt->stream_es = dap_events_socket_wrap_no_add( (int) a_client_pvt->stream_socket, + &l_s_callbacks); a_client_pvt->stream_es->flags |= DAP_SOCK_CONNECTING ; // To catch non-blocking error when connecting we should up WRITE flag //a_client_pvt->stream_es->flags |= DAP_SOCK_READY_TO_WRITE; a_client_pvt->stream_es->_inheritor = a_client_pvt; diff --git a/dap-sdk/net/client/include/dap_client.h b/dap-sdk/net/client/include/dap_client.h index 0173b8cc673cb8db251c47f18b0bf3f78b308cb0..9ac1b35a5eac880ce359e4ffcdbfc969731fa7fd 100644 --- a/dap-sdk/net/client/include/dap_client.h +++ b/dap-sdk/net/client/include/dap_client.h @@ -97,7 +97,7 @@ extern "C" { int dap_client_init(); void dap_client_deinit(); -dap_client_t * dap_client_new(dap_events_t * a_events, dap_client_callback_t a_stage_status_callback +dap_client_t * dap_client_new( dap_client_callback_t a_stage_status_callback , dap_client_callback_t a_stage_status_error_callback ); void dap_client_set_uplink_unsafe(dap_client_t * a_client,const char* a_addr, uint16_t a_port); diff --git a/dap-sdk/net/client/include/dap_client_pool.h b/dap-sdk/net/client/include/dap_client_pool.h index 310a3d9517496084cefa9c44a4cb45b8d6c5a082..1a435cc58734a32342fc4316a27f80e9c90a6dd1 100644 --- a/dap-sdk/net/client/include/dap_client_pool.h +++ b/dap-sdk/net/client/include/dap_client_pool.h @@ -25,7 +25,7 @@ #include "dap_client.h" #include "dap_events.h" -int dap_client_pool_init(dap_events_t * a_events); +int dap_client_pool_init(); void dap_client_pool_deinit(); dap_client_t * dap_client_pool_new (const char * a_client_id); diff --git a/dap-sdk/net/client/include/dap_client_pvt.h b/dap-sdk/net/client/include/dap_client_pvt.h index d8b9c20b65e611ac745c17c005d39f3a2d354471..13c61f8b1c11646261fcc94441180be83000658a 100644 --- a/dap-sdk/net/client/include/dap_client_pvt.h +++ b/dap-sdk/net/client/include/dap_client_pvt.h @@ -48,7 +48,6 @@ typedef struct dap_client_internal dap_stream_t* stream; dap_stream_worker_t* stream_worker; dap_worker_t * worker; - dap_events_t * events; dap_enc_key_type_t session_key_type; dap_enc_key_type_t session_key_open_type; diff --git a/dap-sdk/net/server/notify_server/src/dap_notify_srv.c b/dap-sdk/net/server/notify_server/src/dap_notify_srv.c index 37df9da127904d3d47e5ff5deff8956c545d82b7..4560a6129d3c24d6fe0e7b0918f982c107d0ba95 100644 --- a/dap-sdk/net/server/notify_server/src/dap_notify_srv.c +++ b/dap-sdk/net/server/notify_server/src/dap_notify_srv.c @@ -64,9 +64,9 @@ int dap_notify_server_init() uint16_t l_notify_socket_port = dap_config_get_item_uint16_default(g_config, "notify_server", "listen_port",0); if(l_notify_socket_path){ - s_notify_server = dap_server_new_local(dap_events_get_default(), l_notify_socket_path, l_notify_socket_path_mode, NULL); + s_notify_server = dap_server_new_local(l_notify_socket_path, l_notify_socket_path_mode, NULL); }else if (l_notify_socket_address && l_notify_socket_port ){ - s_notify_server = dap_server_new(dap_events_get_default(), l_notify_socket_address, + s_notify_server = dap_server_new( l_notify_socket_address, l_notify_socket_port, SERVER_TCP, NULL); }else{ log_it(L_INFO,"Notify server is not configured, nothing to init but thats okay"); @@ -78,7 +78,7 @@ int dap_notify_server_init() s_notify_server->client_callbacks.new_callback = s_notify_server_callback_new; s_notify_server->client_callbacks.delete_callback = s_notify_server_callback_delete; s_notify_server_queue = dap_events_socket_create_type_queue_ptr_mt(dap_events_worker_get_auto(),s_notify_server_callback_queue); - uint32_t l_workers_count = dap_events_worker_get_count(); + uint32_t l_workers_count = dap_events_thread_get_count(); s_notify_server_queue_inter = DAP_NEW_Z_SIZE(dap_events_socket_t*,sizeof (dap_events_socket_t*)*l_workers_count ); for(uint32_t i = 0; i < l_workers_count; i++){ s_notify_server_queue_inter[i] = dap_events_socket_queue_ptr_create_input(s_notify_server_queue); @@ -116,7 +116,7 @@ int dap_notify_server_send_f_inter(uint32_t a_worker_id, const char * a_format,. { if(!s_notify_server_queue_inter) // If not initialized - nothing to notify return 0; - if(a_worker_id>= dap_events_worker_get_count()){ + if(a_worker_id>= dap_events_thread_get_count()){ log_it(L_ERROR,"Wrong worker id %u for send_f_inter() function", a_worker_id); return -10; } @@ -176,7 +176,7 @@ static void s_notify_server_callback_queue(dap_events_socket_t * a_es, void * a_ dap_events_socket_handler_hh_t * l_socket_handler = NULL,* l_tmp = NULL; HASH_ITER(hh, s_notify_server_clients, l_socket_handler, l_tmp){ uint32_t l_worker_id = l_socket_handler->worker_id; - if(l_worker_id>= dap_events_worker_get_count()){ + if(l_worker_id>= dap_events_thread_get_count()){ log_it(L_ERROR,"Wrong worker id %u for send_inter() function", l_worker_id); continue; } diff --git a/dap-sdk/net/stream/ch/dap_stream_ch.c b/dap-sdk/net/stream/ch/dap_stream_ch.c index 9743a669d5f39688de8e484646a7cf29ca3c95f8..6b16269b77ea02d52af5031fd173929ea6e59ae8 100644 --- a/dap-sdk/net/stream/ch/dap_stream_ch.c +++ b/dap-sdk/net/stream/ch/dap_stream_ch.c @@ -200,11 +200,11 @@ void dap_stream_ch_set_ready_to_write_unsafe(dap_stream_ch_t * ch,bool is_ready) static void s_print_workers_channels() { - uint32_t l_worker_count = dap_events_worker_get_count(); + uint32_t l_worker_count = dap_events_thread_get_count(); dap_stream_ch_t* l_msg_ch = NULL; dap_stream_ch_t* l_msg_ch_tmp = NULL; //print all worker connections - dap_events_worker_print_all(); + dap_worker_print_all(); for (uint32_t i = 0; i < l_worker_count; i++){ uint32_t l_channel_count = 0; dap_worker_t* l_worker = dap_events_worker_get(i); diff --git a/dap-sdk/net/stream/stream/dap_stream.c b/dap-sdk/net/stream/stream/dap_stream.c index d49d08488c5dae84e10d305487dc04c99850af87..1723311f0c54808551fcf1e13f983a411868803f 100644 --- a/dap-sdk/net/stream/stream/dap_stream.c +++ b/dap-sdk/net/stream/stream/dap_stream.c @@ -888,7 +888,7 @@ static bool s_callback_keepalive(void *a_arg, bool a_server_side) if (!a_arg) return false; dap_events_socket_uuid_t * l_es_uuid = (dap_events_socket_uuid_t*) a_arg; - dap_worker_t * l_worker = dap_events_get_current_worker(dap_events_get_default()); + dap_worker_t * l_worker = dap_worker_get_current(); dap_events_socket_t * l_es = dap_context_esocket_find_by_uuid(l_worker->context, *l_es_uuid); if(l_es) { dap_stream_t *l_stream = NULL; diff --git a/dap-sdk/net/stream/stream/dap_stream_worker.c b/dap-sdk/net/stream/stream/dap_stream_worker.c index 8d1fa9e5158a0dc4251fe0b2442e4b5eaa51d4d7..4fc5631ff00411aaaf5f2dec35b30e13e8d5bae3 100644 --- a/dap-sdk/net/stream/stream/dap_stream_worker.c +++ b/dap-sdk/net/stream/stream/dap_stream_worker.c @@ -43,7 +43,7 @@ static void s_ch_io_callback(dap_events_socket_t * a_es, void * a_msg); */ int dap_stream_worker_init() { - uint32_t l_worker_count = dap_events_worker_get_count(); + uint32_t l_worker_count = dap_events_thread_get_count(); for (uint32_t i = 0; i < l_worker_count; i++){ dap_worker_t * l_worker = dap_events_worker_get(i); if (!l_worker) { diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c index 617dc0178693ffc00a04955e9b21a18fc194eb71..c34e107b1100450ce1639268ddef6e77316d29af 100644 --- a/modules/channel/chain/dap_stream_ch_chain.c +++ b/modules/channel/chain/dap_stream_ch_chain.c @@ -853,7 +853,7 @@ static void s_stream_ch_write_error_unsafe(dap_stream_ch_t *a_ch, uint64_t a_net static bool s_chain_timer_callback(void *a_arg) { - dap_worker_t *l_worker = dap_events_get_current_worker(dap_events_get_default()); + dap_worker_t *l_worker = dap_worker_get_current(); dap_stream_ch_t *l_ch = dap_stream_ch_find_by_uuid_unsafe(DAP_STREAM_WORKER(l_worker), *(dap_stream_ch_uuid_t *)a_arg); if (!l_ch) { DAP_DELETE(a_arg); diff --git a/modules/net/dap_chain_node_client.c b/modules/net/dap_chain_node_client.c index 298c91925144f909093996fc46fb59425de09cef..6cc468cb7b5239efd910b683455def512632b05c 100644 --- a/modules/net/dap_chain_node_client.c +++ b/modules/net/dap_chain_node_client.c @@ -204,7 +204,7 @@ dap_chain_node_sync_status_t dap_chain_node_client_start_sync(dap_events_socket_ } dap_chain_node_client_t *l_me = l_client_found->client; - dap_worker_t * l_worker = dap_events_get_current_worker(dap_events_get_default()); + dap_worker_t * l_worker = dap_worker_get_current(); if (!l_worker) return NODE_SYNC_STATUS_FAILED; assert(l_me); @@ -721,7 +721,6 @@ dap_chain_node_client_t* dap_chain_node_client_create_n_connect(dap_chain_net_t #endif pthread_mutex_init(&l_node_client->wait_mutex, NULL); - l_node_client->events = NULL; //dap_events_new(); l_node_client->remote_node_addr.uint64 = a_node_info->hdr.address.uint64; if (dap_chain_node_client_connect_internal(l_node_client, a_active_channels)) return l_node_client; @@ -738,7 +737,7 @@ dap_chain_node_client_t* dap_chain_node_client_create_n_connect(dap_chain_net_t */ static bool dap_chain_node_client_connect_internal(dap_chain_node_client_t *a_node_client, const char *a_active_channels) { - a_node_client->client = dap_client_new(a_node_client->events, s_stage_status_callback, + a_node_client->client = dap_client_new( s_stage_status_callback, s_stage_status_error_callback); dap_client_set_is_always_reconnect(a_node_client->client, false); a_node_client->client->_inheritor = a_node_client; diff --git a/modules/net/dap_chain_node_dns_client.c b/modules/net/dap_chain_node_dns_client.c index ee4fdc06aa3df44418e47f3eef3ca0b5df92e18a..7208405d9117e54e7525ac90678e0c60f4bf8644 100644 --- a/modules/net/dap_chain_node_dns_client.c +++ b/modules/net/dap_chain_node_dns_client.c @@ -132,10 +132,8 @@ static bool s_dns_client_esocket_timeout_callback(void * a_arg) dap_events_socket_uuid_t * l_es_uuid_ptr = (dap_events_socket_uuid_t *) a_arg; assert(l_es_uuid_ptr); - dap_events_t * l_events = dap_events_get_default(); - assert(l_events); - dap_worker_t * l_worker = dap_events_get_current_worker(l_events); // We're in own esocket context + dap_worker_t * l_worker = dap_worker_get_current(); // We're in own esocket context assert(l_worker); dap_events_socket_t * l_es; diff --git a/modules/net/dap_chain_node_dns_server.c b/modules/net/dap_chain_node_dns_server.c index 27d0ec56744c1600339209df9d147e13234a7e5a..857e29512436c74d8e1c9f79223966c6808a6645 100644 --- a/modules/net/dap_chain_node_dns_server.c +++ b/modules/net/dap_chain_node_dns_server.c @@ -256,12 +256,12 @@ cleanup: return; } -void dap_dns_server_start(dap_events_t *a_ev, uint16_t a_port) +void dap_dns_server_start( uint16_t a_port) { s_dns_server = DAP_NEW_Z(dap_dns_server_t); dap_events_socket_callbacks_t l_cb = {}; l_cb.read_callback = dap_dns_client_read; - s_dns_server->instance = dap_server_new(a_ev, NULL, a_port, SERVER_UDP, &l_cb); + s_dns_server->instance = dap_server_new( NULL, a_port, SERVER_UDP, &l_cb); if (!s_dns_server->instance) { log_it(L_ERROR, "Can't start DNS server"); return; diff --git a/modules/net/include/dap_chain_node_client.h b/modules/net/include/dap_chain_node_client.h index b0bdbc79c352f34422b791d78e58e743270d01ce..3026a70ba0da367e49304a64f484dd9aeafad376 100644 --- a/modules/net/include/dap_chain_node_client.h +++ b/modules/net/include/dap_chain_node_client.h @@ -108,7 +108,6 @@ typedef struct dap_chain_node_client { dap_stream_ch_uuid_t ch_chain_net_srv_uuid; dap_chain_node_info_t * info; - dap_events_t *events; dap_chain_net_t * net; char last_error[128]; diff --git a/modules/net/include/dap_chain_node_dns_server.h b/modules/net/include/dap_chain_node_dns_server.h index ea72b39cb71a677e93606ed59572a67f75c63107..fbdae1f225095fa0b589aed0e8682b25fcb18b93 100644 --- a/modules/net/include/dap_chain_node_dns_server.h +++ b/modules/net/include/dap_chain_node_dns_server.h @@ -117,7 +117,7 @@ typedef struct _dap_dns_server_t { -void dap_dns_server_start(dap_events_t *a_ev, uint16_t a_port); +void dap_dns_server_start(uint16_t a_port); void dap_dns_server_stop(); int dap_dns_zone_register(char *zone, dap_dns_zone_callback_t callback); int dap_dns_zone_unregister(char *zone);