diff --git a/dap-sdk/net/client/dap_client_http.c b/dap-sdk/net/client/dap_client_http.c index 6b6e39d2dbde4e15fd0626db98e5f0f5a8c7b0c8..79ca6f4484c0ac269259a660ef45e9956eb04f3d 100644 --- a/dap-sdk/net/client/dap_client_http.c +++ b/dap-sdk/net/client/dap_client_http.c @@ -354,7 +354,7 @@ void* dap_client_http_request_custom(const char *a_uplink_addr, uint16_t a_uplin setsockopt(l_socket, SOL_SOCKET, SO_SNDBUF, (void*) &buffsize, sizeof(buffsize)); setsockopt(l_socket, SOL_SOCKET, SO_RCVBUF, (void*) &buffsize, sizeof(buffsize)); #endif - dap_events_socket_t *l_ev_socket = dap_events_socket_wrap_no_add(NULL, l_socket, &l_s_callbacks); + dap_events_socket_t *l_ev_socket = dap_events_socket_wrap_no_add(dap_events_get_default(), l_socket, &l_s_callbacks); // create private struct dap_client_http_internal_t *l_client_http_internal = DAP_NEW_Z(dap_client_http_internal_t); diff --git a/dap-sdk/net/client/dap_client_pvt.c b/dap-sdk/net/client/dap_client_pvt.c index eea06da02990a4486f604601bf423fed74ead038..7556bda3826158ac80c7debe33acdfd757a55dd5 100644 --- a/dap-sdk/net/client/dap_client_pvt.c +++ b/dap-sdk/net/client/dap_client_pvt.c @@ -126,6 +126,7 @@ void dap_client_pvt_new(dap_client_pvt_t * a_client_internal) a_client_internal->stage = STAGE_BEGIN; // start point of state machine a_client_internal->stage_status = STAGE_STATUS_DONE; a_client_internal->uplink_protocol_version = DAP_PROTOCOL_VERSION; + a_client_internal->events = dap_events_get_default(); // add to list dap_client_pvt_hh_add(a_client_internal); } diff --git a/dap-sdk/net/core/dap_events.c b/dap-sdk/net/core/dap_events.c index 3df6512bc73aa363e9a394704ac7aaa3e0fa2deb..fd786c671c1b1831f49cb734c6239ffa6f35185e 100644 --- a/dap-sdk/net/core/dap_events.c +++ b/dap-sdk/net/core/dap_events.c @@ -80,7 +80,7 @@ static bool s_workers_init = false; static uint32_t s_threads_count = 1; static dap_worker_t **s_workers = NULL; static dap_thread_t *s_threads = NULL; - +static dap_events_t * s_events_default = NULL; uint32_t dap_get_cpu_count( ) { @@ -201,11 +201,17 @@ void dap_events_deinit( ) */ dap_events_t * dap_events_new( ) { - dap_events_t *ret = DAP_NEW_Z(dap_events_t); + dap_events_t *ret = DAP_NEW_Z(dap_events_t); - pthread_rwlock_init( &ret->sockets_rwlock, NULL ); + pthread_rwlock_init( &ret->sockets_rwlock, NULL ); + if ( s_events_default == NULL) + s_events_default = ret; + return ret; +} - return ret; +dap_events_t* dap_events_get_default( ) +{ + return s_events_default; } /** diff --git a/dap-sdk/net/core/dap_events_socket.c b/dap-sdk/net/core/dap_events_socket.c index d9f71bf55b7616a494df2ff96fec8dda392088b1..876bad62598ad2c1f5936354666d3a5e27c93434 100644 --- a/dap-sdk/net/core/dap_events_socket.c +++ b/dap-sdk/net/core/dap_events_socket.c @@ -80,23 +80,30 @@ void dap_events_socket_deinit( ) dap_events_socket_t *dap_events_socket_wrap_no_add( dap_events_t *a_events, int a_sock, dap_events_socket_callbacks_t *a_callbacks ) { -// assert(a_events); - assert(a_callbacks); + assert(a_events); + assert(a_callbacks); - dap_events_socket_t *ret = DAP_NEW_Z( dap_events_socket_t ); + dap_events_socket_t *ret = DAP_NEW_Z( dap_events_socket_t ); - ret->socket = a_sock; - ret->events = a_events; - memcpy(&ret->callbacks, a_callbacks, sizeof(ret->callbacks) ); - ret->flags = DAP_SOCK_READY_TO_READ; + ret->socket = a_sock; + ret->events = a_events; + memcpy(&ret->callbacks, a_callbacks, sizeof(ret->callbacks) ); + ret->flags = DAP_SOCK_READY_TO_READ; -#if defined(DAP_EVENTS_CAPS_EPOLL) + #if defined(DAP_EVENTS_CAPS_EPOLL) ret->ev_base_flags = EPOLLERR | EPOLLRDHUP | EPOLLHUP; -#endif + #endif - // log_it( L_DEBUG,"Dap event socket wrapped around %d sock a_events = %X", a_sock, a_events ); + if ( a_sock!= 0 && a_sock != -1){ + pthread_rwlock_wrlock(&a_events->sockets_rwlock); + HASH_ADD(hh,a_events->sockets, socket, sizeof (int), ret); + pthread_rwlock_unlock(&a_events->sockets_rwlock); + }else + log_it(L_WARNING, "Be carefull, you've wrapped socket 0 or -1 so it wasn't added to global list. Do it yourself when possible"); - return ret; + // log_it( L_DEBUG,"Dap event socket wrapped around %d sock a_events = %X", a_sock, a_events ); + + return ret; } /** @@ -107,6 +114,7 @@ dap_events_socket_t *dap_events_socket_wrap_no_add( dap_events_t *a_events, void dap_events_socket_assign_on_worker_mt(dap_events_socket_t * a_es, struct dap_worker * a_worker) { a_es->last_ping_request = time(NULL); + log_it(L_DEBUG, "Assigned %p on worker %u", a_es, a_worker->id); dap_worker_add_events_socket(a_es,a_worker); } diff --git a/dap-sdk/net/core/dap_worker.c b/dap-sdk/net/core/dap_worker.c index 79371f667e94068fb0d7f051abc059c949298a22..14f61eb10693a73bcbe7bb9d6e1ca7c96a7e2482 100644 --- a/dap-sdk/net/core/dap_worker.c +++ b/dap-sdk/net/core/dap_worker.c @@ -78,7 +78,7 @@ void *dap_worker_thread(void *arg) dap_cpu_assign_thread_on(l_worker->id); struct sched_param l_shed_params; l_shed_params.sched_priority = 0; - pthread_setschedparam(pthread_self(),SCHED_OTHER ,&l_shed_params); + pthread_setschedparam(pthread_self(),SCHED_FIFO ,&l_shed_params); l_worker->queue_es_new = dap_events_socket_create_type_queue_ptr_unsafe( l_worker, s_queue_new_es_callback); l_worker->queue_es_delete = dap_events_socket_create_type_queue_ptr_unsafe( l_worker, s_queue_delete_es_callback); @@ -123,7 +123,7 @@ void *dap_worker_thread(void *arg) l_cur->last_time_active = l_cur_time; //log_it(L_DEBUG, "Worker=%d fd=%d socket=%d event=0x%x(%d)", l_worker->id, - // l_worker->epoll_fd,l_cur->socket, l_epoll_events[n].events,l_epoll_events[n].events); + // l_worker->epoll_fd,l_cur->socket, l_epoll_events[n].events,l_epoll_events[n].events); int l_sock_err = 0, l_sock_err_size = sizeof(l_sock_err); //connection already closed (EPOLLHUP - shutdown has been made in both directions) if(l_epoll_events[n].events & EPOLLHUP) { // && events[n].events & EPOLLERR) { @@ -355,6 +355,11 @@ static void s_queue_new_es_callback( dap_events_socket_t * a_es, void * a_arg) dap_events_socket_t * l_es_new =(dap_events_socket_t *) a_arg; dap_worker_t * w = a_es->worker; //log_it(L_DEBUG, "Received event socket %p to add on worker", l_es_new); + if(dap_events_socket_check_unsafe( w, a_es)){ + log_it(L_ERROR, "Already assigned %d (%p), you're doing smth wrong", a_es->socket, a_es); + return; + } + if ( l_es_new->type == DESCRIPTOR_TYPE_SOCKET || l_es_new->type == DESCRIPTOR_TYPE_SOCKET_LISTENING ){ int l_cpu = w->id; setsockopt(l_es_new->socket , SOL_SOCKET, SO_INCOMING_CPU, &l_cpu, sizeof(l_cpu)); @@ -390,9 +395,6 @@ static void s_queue_new_es_callback( dap_events_socket_t * a_es, void * a_arg) log_it(L_CRITICAL,"Can't add event socket's handler to worker i/o poll mechanism with error %d", errno); }else{ // Add in global list - pthread_rwlock_wrlock(&w->events->sockets_rwlock); - HASH_ADD(hh, w->events->sockets, socket, sizeof (int), l_es_new ); - pthread_rwlock_unlock(&w->events->sockets_rwlock); // Add in worker l_es_new->me = l_es_new; HASH_ADD(hh_worker, w->esockets, me, sizeof(void *), l_es_new ); diff --git a/dap-sdk/net/core/include/dap_events.h b/dap-sdk/net/core/include/dap_events.h index 4ba95b55293f78c93945887eb2cd7429d1e48c1d..c8e5471fb9e525c88370d01b1fa8c2661269f680 100644 --- a/dap-sdk/net/core/include/dap_events.h +++ b/dap-sdk/net/core/include/dap_events.h @@ -47,6 +47,7 @@ int dap_events_init( uint32_t a_threads_count, size_t a_conn_timeout ); // Init void dap_events_deinit( ); // Deinit server module dap_events_t* dap_events_new( ); +dap_events_t* dap_events_get_default( ); void dap_events_delete( dap_events_t * a_events ); int32_t dap_events_start( dap_events_t *a_events ); diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c index 859485cc2bf781b7ac2bd69a143257a54410c457..bb691b9dd51b5bfefa52d42237dc4fa67f2e9cd9 100644 --- a/modules/channel/chain/dap_stream_ch_chain.c +++ b/modules/channel/chain/dap_stream_ch_chain.c @@ -779,6 +779,8 @@ bool s_out_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) { (void) a_arg; + dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch); dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker); - dap_proc_queue_add_callback(a_ch->stream_worker->worker->proc_queue, s_out_pkt_callback, a_ch); + if (l_ch_chain->state != CHAIN_STATE_IDLE) + dap_proc_queue_add_callback(a_ch->stream_worker->worker->proc_queue, s_out_pkt_callback, a_ch); }