From 93f6b7f54ed00873c159da00c134dc6a2621a18e Mon Sep 17 00:00:00 2001 From: "Dmitriy A. Gerasimov" <dmitriy.gerasimov@demlabs.net> Date: Sun, 23 Aug 2020 01:39:36 +0700 Subject: [PATCH] [*] 3rdparty: Updated uthash, utarray and utstrings [*] Fixed workers allocation [*] Some more renames --- 3rdparty/uthash/src/utarray.h | 25 ++++-- 3rdparty/uthash/src/uthash.h | 97 +++------------------ 3rdparty/uthash/src/utstring.h | 21 +++-- dap-sdk/net/core/dap_events.c | 62 +++++++++---- dap-sdk/net/core/dap_events_socket.c | 15 ++-- dap-sdk/net/core/dap_server.c | 7 +- dap-sdk/net/core/dap_worker.c | 25 ++++-- dap-sdk/net/core/include/dap_events.h | 2 +- dap-sdk/net/core/include/dap_worker.h | 5 +- modules/service/vpn/dap_chain_net_srv_vpn.c | 2 +- 10 files changed, 121 insertions(+), 140 deletions(-) diff --git a/3rdparty/uthash/src/utarray.h b/3rdparty/uthash/src/utarray.h index 6ed0dcebcb..6b6201820e 100644 --- a/3rdparty/uthash/src/utarray.h +++ b/3rdparty/uthash/src/utarray.h @@ -38,8 +38,13 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #define UTARRAY_UNUSED #endif -#ifndef oom -#define oom() exit(-1) +#ifdef oom +#error "The name of macro 'oom' has been changed to 'utarray_oom'. Please update your code." +#define utarray_oom() oom() +#endif + +#ifndef utarray_oom +#define utarray_oom() exit(-1) #endif typedef void (ctor_f)(void *dst, const void *src); @@ -78,7 +83,9 @@ typedef struct { #define utarray_new(a,_icd) do { \ (a) = (UT_array*)malloc(sizeof(UT_array)); \ - if ((a) == NULL) oom(); \ + if ((a) == NULL) { \ + utarray_oom(); \ + } \ utarray_init(a,_icd); \ } while(0) @@ -92,7 +99,9 @@ typedef struct { char *utarray_tmp; \ while (((a)->i+(by)) > (a)->n) { (a)->n = ((a)->n ? (2*(a)->n) : 8); } \ utarray_tmp=(char*)realloc((a)->d, (a)->n*(a)->icd.sz); \ - if (utarray_tmp == NULL) oom(); \ + if (utarray_tmp == NULL) { \ + utarray_oom(); \ + } \ (a)->d=utarray_tmp; \ } \ } while(0) @@ -118,7 +127,7 @@ typedef struct { #define utarray_len(a) ((a)->i) #define utarray_eltptr(a,j) (((j) < (a)->i) ? _utarray_eltptr(a,j) : NULL) -#define _utarray_eltptr(a,j) ((a)->d + ((a)->icd.sz * (j))) +#define _utarray_eltptr(a,j) ((void*)((a)->d + ((a)->icd.sz * (j)))) #define utarray_insert(a,p,j) do { \ if ((j) > (a)->i) utarray_resize(a,j); \ @@ -216,10 +225,10 @@ typedef struct { #define utarray_find(a,v,cmp) bsearch((v),(a)->d,(a)->i,(a)->icd.sz,cmp) #define utarray_front(a) (((a)->i) ? (_utarray_eltptr(a,0)) : NULL) -#define utarray_next(a,e) (((e)==NULL) ? utarray_front(a) : ((((a)->i) > (utarray_eltidx(a,e)+1)) ? _utarray_eltptr(a,utarray_eltidx(a,e)+1) : NULL)) -#define utarray_prev(a,e) (((e)==NULL) ? utarray_back(a) : ((utarray_eltidx(a,e) > 0) ? _utarray_eltptr(a,utarray_eltidx(a,e)-1) : NULL)) +#define utarray_next(a,e) (((e)==NULL) ? utarray_front(a) : (((a)->i != utarray_eltidx(a,e)+1) ? _utarray_eltptr(a,utarray_eltidx(a,e)+1) : NULL)) +#define utarray_prev(a,e) (((e)==NULL) ? utarray_back(a) : ((utarray_eltidx(a,e) != 0) ? _utarray_eltptr(a,utarray_eltidx(a,e)-1) : NULL)) #define utarray_back(a) (((a)->i) ? (_utarray_eltptr(a,(a)->i-1)) : NULL) -#define utarray_eltidx(a,e) (((char*)(e) >= (a)->d) ? (((char*)(e) - (a)->d)/(a)->icd.sz) : -1) +#define utarray_eltidx(a,e) (((char*)(e) - (a)->d) / (a)->icd.sz) /* last we pre-define a few icd for common utarrays of ints and strings */ static void utarray_str_cpy(void *dst, const void *src) { diff --git a/3rdparty/uthash/src/uthash.h b/3rdparty/uthash/src/uthash.h index 76bdca6419..5e5866a353 100644 --- a/3rdparty/uthash/src/uthash.h +++ b/3rdparty/uthash/src/uthash.h @@ -144,7 +144,7 @@ typedef unsigned char uint8_t; /* calculate the element whose hash handle address is hhp */ #define ELMT_FROM_HH(tbl,hhp) ((void*)(((char*)(hhp)) - ((tbl)->hho))) /* calculate the hash handle from element address elp */ -#define HH_FROM_ELMT(tbl,elp) ((UT_hash_handle *)(((char*)(elp)) + ((tbl)->hho))) +#define HH_FROM_ELMT(tbl,elp) ((UT_hash_handle*)(void*)(((char*)(elp)) + ((tbl)->hho))) #define HASH_ROLLBACK_BKT(hh, head, itemptrhh) \ do { \ @@ -175,9 +175,12 @@ do { #define HASH_FIND(hh,head,keyptr,keylen,out) \ do { \ - unsigned _hf_hashv; \ - HASH_VALUE(keyptr, keylen, _hf_hashv); \ - HASH_FIND_BYHASHVALUE(hh, head, keyptr, keylen, _hf_hashv, out); \ + (out) = NULL; \ + if (head) { \ + unsigned _hf_hashv; \ + HASH_VALUE(keyptr, keylen, _hf_hashv); \ + HASH_FIND_BYHASHVALUE(hh, head, keyptr, keylen, _hf_hashv, out); \ + } \ } while (0) #ifdef HASH_BLOOM @@ -519,7 +522,8 @@ do { * This is for uthash developer only; it compiles away if HASH_DEBUG isn't defined. */ #ifdef HASH_DEBUG -#define HASH_OOPS(...) do { fprintf(stderr,__VA_ARGS__); exit(-1); } while (0) +#include <stdio.h> /* fprintf, stderr */ +#define HASH_OOPS(...) do { fprintf(stderr, __VA_ARGS__); exit(-1); } while (0) #define HASH_FSCK(hh,head,where) \ do { \ struct UT_hash_handle *_thh; \ @@ -750,87 +754,6 @@ do { hashv += hashv >> 6; \ } while (0) -#ifdef HASH_USING_NO_STRICT_ALIASING -/* The MurmurHash exploits some CPU's (x86,x86_64) tolerance for unaligned reads. - * For other types of CPU's (e.g. Sparc) an unaligned read causes a bus error. - * MurmurHash uses the faster approach only on CPU's where we know it's safe. - * - * Note the preprocessor built-in defines can be emitted using: - * - * gcc -m64 -dM -E - < /dev/null (on gcc) - * cc -## a.c (where a.c is a simple test file) (Sun Studio) - */ -#if (defined(__i386__) || defined(__x86_64__) || defined(_M_IX86)) -#define MUR_GETBLOCK(p,i) p[i] -#else /* non intel */ -#define MUR_PLUS0_ALIGNED(p) (((unsigned long)p & 3UL) == 0UL) -#define MUR_PLUS1_ALIGNED(p) (((unsigned long)p & 3UL) == 1UL) -#define MUR_PLUS2_ALIGNED(p) (((unsigned long)p & 3UL) == 2UL) -#define MUR_PLUS3_ALIGNED(p) (((unsigned long)p & 3UL) == 3UL) -#define WP(p) ((uint32_t*)((unsigned long)(p) & ~3UL)) -#if (defined(__BIG_ENDIAN__) || defined(SPARC) || defined(__ppc__) || defined(__ppc64__)) -#define MUR_THREE_ONE(p) ((((*WP(p))&0x00ffffff) << 8) | (((*(WP(p)+1))&0xff000000) >> 24)) -#define MUR_TWO_TWO(p) ((((*WP(p))&0x0000ffff) <<16) | (((*(WP(p)+1))&0xffff0000) >> 16)) -#define MUR_ONE_THREE(p) ((((*WP(p))&0x000000ff) <<24) | (((*(WP(p)+1))&0xffffff00) >> 8)) -#else /* assume little endian non-intel */ -#define MUR_THREE_ONE(p) ((((*WP(p))&0xffffff00) >> 8) | (((*(WP(p)+1))&0x000000ff) << 24)) -#define MUR_TWO_TWO(p) ((((*WP(p))&0xffff0000) >>16) | (((*(WP(p)+1))&0x0000ffff) << 16)) -#define MUR_ONE_THREE(p) ((((*WP(p))&0xff000000) >>24) | (((*(WP(p)+1))&0x00ffffff) << 8)) -#endif -#define MUR_GETBLOCK(p,i) (MUR_PLUS0_ALIGNED(p) ? ((p)[i]) : \ - (MUR_PLUS1_ALIGNED(p) ? MUR_THREE_ONE(p) : \ - (MUR_PLUS2_ALIGNED(p) ? MUR_TWO_TWO(p) : \ - MUR_ONE_THREE(p)))) -#endif -#define MUR_ROTL32(x,r) (((x) << (r)) | ((x) >> (32 - (r)))) -#define MUR_FMIX(_h) \ -do { \ - _h ^= _h >> 16; \ - _h *= 0x85ebca6bu; \ - _h ^= _h >> 13; \ - _h *= 0xc2b2ae35u; \ - _h ^= _h >> 16; \ -} while (0) - -#define HASH_MUR(key,keylen,hashv) \ -do { \ - const uint8_t *_mur_data = (const uint8_t*)(key); \ - const int _mur_nblocks = (int)(keylen) / 4; \ - uint32_t _mur_h1 = 0xf88D5353u; \ - uint32_t _mur_c1 = 0xcc9e2d51u; \ - uint32_t _mur_c2 = 0x1b873593u; \ - uint32_t _mur_k1 = 0; \ - const uint8_t *_mur_tail; \ - const uint32_t *_mur_blocks = (const uint32_t*)(_mur_data+(_mur_nblocks*4)); \ - int _mur_i; \ - for (_mur_i = -_mur_nblocks; _mur_i != 0; _mur_i++) { \ - _mur_k1 = MUR_GETBLOCK(_mur_blocks,_mur_i); \ - _mur_k1 *= _mur_c1; \ - _mur_k1 = MUR_ROTL32(_mur_k1,15); \ - _mur_k1 *= _mur_c2; \ - \ - _mur_h1 ^= _mur_k1; \ - _mur_h1 = MUR_ROTL32(_mur_h1,13); \ - _mur_h1 = (_mur_h1*5U) + 0xe6546b64u; \ - } \ - _mur_tail = (const uint8_t*)(_mur_data + (_mur_nblocks*4)); \ - _mur_k1=0; \ - switch ((keylen) & 3U) { \ - case 0: break; \ - case 3: _mur_k1 ^= (uint32_t)_mur_tail[2] << 16; /* FALLTHROUGH */ \ - case 2: _mur_k1 ^= (uint32_t)_mur_tail[1] << 8; /* FALLTHROUGH */ \ - case 1: _mur_k1 ^= (uint32_t)_mur_tail[0]; \ - _mur_k1 *= _mur_c1; \ - _mur_k1 = MUR_ROTL32(_mur_k1,15); \ - _mur_k1 *= _mur_c2; \ - _mur_h1 ^= _mur_k1; \ - } \ - _mur_h1 ^= (uint32_t)(keylen); \ - MUR_FMIX(_mur_h1); \ - hashv = _mur_h1; \ -} while (0) -#endif /* HASH_USING_NO_STRICT_ALIASING */ - /* iterate over items in a known bucket to find desired item */ #define HASH_FIND_IN_BKT(tbl,hh,head,keyptr,keylen_in,hashval,out) \ do { \ @@ -1080,7 +1003,7 @@ do { _elt = ELMT_FROM_HH((src)->hh_src.tbl, _src_hh); \ if (cond(_elt)) { \ IF_HASH_NONFATAL_OOM( int _hs_oomed = 0; ) \ - _dst_hh = (UT_hash_handle*)(((char*)_elt) + _dst_hho); \ + _dst_hh = (UT_hash_handle*)(void*)(((char*)_elt) + _dst_hho); \ _dst_hh->key = _src_hh->key; \ _dst_hh->keylen = _src_hh->keylen; \ _dst_hh->hashv = _src_hh->hashv; \ diff --git a/3rdparty/uthash/src/utstring.h b/3rdparty/uthash/src/utstring.h index ca25c902ca..4cf5ffd3dd 100644 --- a/3rdparty/uthash/src/utstring.h +++ b/3rdparty/uthash/src/utstring.h @@ -39,8 +39,13 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #define UTSTRING_UNUSED #endif -#ifndef oom -#define oom() exit(-1) +#ifdef oom +#error "The name of macro 'oom' has been changed to 'utstring_oom'. Please update your code." +#define utstring_oom() oom() +#endif + +#ifndef utstring_oom +#define utstring_oom() exit(-1) #endif typedef struct { @@ -54,7 +59,9 @@ do { \ if (((s)->n - (s)->i) < (size_t)(amt)) { \ char *utstring_tmp = (char*)realloc( \ (s)->d, (s)->n + (amt)); \ - if (utstring_tmp == NULL) oom(); \ + if (!utstring_tmp) { \ + utstring_oom(); \ + } \ (s)->d = utstring_tmp; \ (s)->n += (amt); \ } \ @@ -81,9 +88,11 @@ do { \ #define utstring_new(s) \ do { \ - (s) = (UT_string*)malloc(sizeof(UT_string)); \ - if (!(s)) oom(); \ - utstring_init(s); \ + (s) = (UT_string*)malloc(sizeof(UT_string)); \ + if (!(s)) { \ + utstring_oom(); \ + } \ + utstring_init(s); \ } while(0) #define utstring_renew(s) \ diff --git a/dap-sdk/net/core/dap_events.c b/dap-sdk/net/core/dap_events.c index c54c5472a5..1e6dc70662 100644 --- a/dap-sdk/net/core/dap_events.c +++ b/dap-sdk/net/core/dap_events.c @@ -76,10 +76,10 @@ #define LOG_TAG "dap_events" -bool s_workers_init = false; +static bool s_workers_init = false; static uint32_t s_threads_count = 1; -dap_worker_t *s_workers = NULL; -dap_thread_t *s_threads = NULL; +static dap_worker_t * volatile *s_workers = NULL; +static dap_thread_t *s_threads = NULL; uint32_t dap_get_cpu_count( ) @@ -116,7 +116,7 @@ int dap_events_init( uint32_t a_threads_count, size_t a_conn_timeout ) { s_threads_count = a_threads_count ? a_threads_count : dap_get_cpu_count( ); - s_workers = DAP_NEW_S_SIZE(dap_worker_t,sizeof(dap_worker_t) * s_threads_count ); + s_workers = DAP_NEW_Z_SIZE(dap_worker_t*,s_threads_count*sizeof (dap_worker_t*) ); s_threads = DAP_NEW_Z_SIZE(dap_thread_t, sizeof(dap_thread_t) * s_threads_count ); if ( !s_workers || !s_threads ) return -1; @@ -129,14 +129,15 @@ int dap_events_init( uint32_t a_threads_count, size_t a_conn_timeout ) goto err; } - log_it( L_NOTICE, "Initialized socket server module" ); + log_it( L_NOTICE, "Initialized event socket reactor for %u threads", s_threads_count ); #ifdef DAP_OS_UNIX - signal( SIGPIPE, SIG_IGN ); +// signal( SIGPIPE, SIG_IGN ); // ? #endif return 0; err: + log_it(L_ERROR,"Deinit events subsystem"); dap_events_deinit(); dap_worker_deinit(); return -1; @@ -162,7 +163,7 @@ void dap_events_deinit( ) */ dap_events_t * dap_events_new( ) { - dap_events_t *ret = (dap_events_t *)calloc( 1, sizeof(dap_events_t) ); + dap_events_t *ret = DAP_NEW_Z(dap_events_t); pthread_rwlock_init( &ret->sockets_rwlock, NULL ); pthread_rwlock_init( &ret->servers_rwlock, NULL ); @@ -199,21 +200,42 @@ void dap_events_delete( dap_events_t *a_events ) */ int dap_events_start( dap_events_t *a_events ) { + for( uint32_t i = 0; i < s_threads_count; i++) { - s_workers[i].id = i; - s_workers[i].events = a_events; + dap_worker_t * l_worker = DAP_NEW_Z(dap_worker_t); + + l_worker->id = i; + l_worker->events = a_events; #ifdef DAP_EVENTS_CAPS_EPOLL - s_workers[i].epoll_fd = epoll_create( DAP_MAX_EPOLL_EVENTS ); - if ( (intptr_t)s_workers[i].epoll_fd == -1 ) { + l_worker->epoll_fd = epoll_create( DAP_MAX_EPOLL_EVENTS ); + pthread_mutex_init(& l_worker->started_mutex, NULL); + pthread_cond_init( & l_worker->started_cond, NULL); + //log_it(L_DEBUG, "Created event_fd %d for worker %u", l_worker->epoll_fd,i); + if ( l_worker->epoll_fd == -1 ) { int l_errno = errno; char l_errbuf[128]; strerror_r(l_errno, l_errbuf, sizeof ( l_errbuf) ); log_it(L_CRITICAL, "Error create epoll fd: %s (%d)", l_errbuf, l_errno); + DAP_DELETE(l_worker); return -1; } #endif - pthread_mutex_init( &s_workers[i].locker_on_count, NULL ); - pthread_create( &s_threads[i].tid, NULL, dap_worker_thread, &s_workers[i] ); + s_workers[i] = l_worker; + pthread_mutex_lock(&l_worker->started_mutex); + struct timespec l_timeout; + clock_gettime(CLOCK_REALTIME, &l_timeout); + l_timeout.tv_sec+=5; + pthread_create( &s_threads[i].tid, NULL, dap_worker_thread, l_worker ); + + int l_ret; + l_ret=pthread_cond_timedwait(&l_worker->started_cond, &l_worker->started_mutex, &l_timeout); + if ( l_ret== ETIMEDOUT ){ + log_it(L_CRITICAL, "Timeout 5 seconds is out: worker #%u thread don't respond", i); + return -2; + } else if (l_ret != 0){ + log_it(L_CRITICAL, "Can't wait on condition: %d error code", l_ret); + return -3; + } } return 0; } @@ -254,7 +276,7 @@ uint32_t dap_events_worker_get_index_min( ) for( i = 1; i < s_threads_count; i++ ) { - if ( s_workers[min].event_sockets_count > s_workers[i].event_sockets_count ) + if ( s_workers[min]->event_sockets_count > s_workers[i]->event_sockets_count ) min = i; } @@ -272,8 +294,7 @@ uint32_t dap_events_worker_get_count() */ dap_worker_t *dap_events_worker_get_min( ) { - dap_worker_t *l_workers = &s_workers[dap_events_worker_get_index_min()]; - return l_workers; + return s_workers[dap_events_worker_get_index_min()]; } /** @@ -281,9 +302,12 @@ dap_worker_t *dap_events_worker_get_min( ) * @param a_index * @return */ -dap_worker_t * dap_events_worker_get_index(uint8_t a_index) +dap_worker_t * dap_events_worker_get(uint8_t a_index) { - return a_index < s_threads_count ? &s_workers[a_index] : NULL; + if (a_index < s_threads_count){ + return s_workers[a_index]; + }else + return NULL; } /** @@ -294,6 +318,6 @@ void dap_events_worker_print_all( ) uint32_t i; for( i = 0; i < s_threads_count; i ++ ) { log_it( L_INFO, "Worker: %d, count open connections: %d", - s_workers[i].id, s_workers[i].event_sockets_count ); + s_workers[i]->id, s_workers[i]->event_sockets_count ); } } diff --git a/dap-sdk/net/core/dap_events_socket.c b/dap-sdk/net/core/dap_events_socket.c index 76f24690f6..ec4fe3cdc2 100644 --- a/dap-sdk/net/core/dap_events_socket.c +++ b/dap-sdk/net/core/dap_events_socket.c @@ -55,7 +55,7 @@ */ int dap_events_socket_init( ) { - log_it(L_NOTICE,"Initialized socket client module"); + log_it(L_NOTICE,"Initialized events socket module"); return 0; } @@ -129,6 +129,7 @@ dap_events_socket_t * dap_events_socket_create_type_event(dap_worker_t * a_w, da case EINVAL: log_it(L_CRITICAL, "Too old linux version thats doesn't support O_DIRECT flag for pipes (%s)", l_errbuf); break; default: log_it( L_ERROR, "Error detected, can't create pipe(): '%s' (%d)", l_errbuf, l_errno); } + DAP_DELETE(l_es); return NULL; }else log_it(L_DEBUG, "Created one-way unnamed pipe %d->%d", l_pipe[0], l_pipe[1]); @@ -139,10 +140,10 @@ dap_events_socket_t * dap_events_socket_create_type_event(dap_worker_t * a_w, da #if defined(DAP_EVENTS_CAPS_EPOLL) struct epoll_event l_ev={0}; int l_event_fd = l_es->fd; - log_it( L_INFO, "Create event descriptor with queue %d (%p)", l_event_fd, l_es); - l_ev.events = EPOLLIN | EPOLLET; - l_ev.data.ptr = l_es; - epoll_ctl(a_w->epoll_fd, EPOLL_CTL_ADD, l_event_fd, &l_ev); + //log_it( L_INFO, "Create event descriptor with queue %d (%p) and add it on epoll fd %d", l_event_fd, l_es, a_w->epoll_fd); + l_es->ev.events = EPOLLIN | EPOLLET | EPOLLRDHUP | EPOLLHUP; + l_es->ev.data.ptr = l_es; + epoll_ctl(a_w->epoll_fd, EPOLL_CTL_ADD, l_event_fd, &l_es->ev); #endif return l_es; } @@ -183,7 +184,6 @@ void dap_events_socket_create_after( dap_events_socket_t *a_es ) dap_worker_add_events_socket_auto( a_es ); - pthread_mutex_lock( &a_es->worker->locker_on_count ); a_es->worker->event_sockets_count ++; @@ -197,7 +197,6 @@ void dap_events_socket_create_after( dap_events_socket_t *a_es ) if ( epoll_ctl( a_es->worker->epoll_fd, EPOLL_CTL_ADD, a_es->socket, &a_es->ev ) == 1 ) log_it( L_CRITICAL, "Can't add event socket's handler to epoll_fd" ); - pthread_mutex_unlock( &a_es->worker->locker_on_count ); } /** @@ -215,7 +214,7 @@ dap_events_socket_t * dap_events_socket_wrap2( dap_server_t *a_server, struct da assert( a_callbacks ); assert( a_server ); - log_it( L_DEBUG,"Sap event socket wrapped around %d sock", a_sock ); + log_it( L_DEBUG,"Dap event socket wrapped around %d sock", a_sock ); dap_events_socket_t * ret = DAP_NEW_Z( dap_events_socket_t ); ret->socket = a_sock; diff --git a/dap-sdk/net/core/dap_server.c b/dap-sdk/net/core/dap_server.c index f8d3756140..45f89872c1 100644 --- a/dap-sdk/net/core/dap_server.c +++ b/dap-sdk/net/core/dap_server.c @@ -53,6 +53,7 @@ #include "dap_common.h" #include "dap_config.h" #include "dap_server.h" +#include "dap_worker.h" #include "dap_events.h" #define LOG_TAG "dap_server" @@ -117,6 +118,7 @@ dap_server_t* dap_server_new(dap_events_t *a_events, const char * a_addr, uint16 if (l_server->socket_listener < 0) { int l_errno = errno; log_it (L_ERROR,"Socket error %s (%d)",strerror(l_errno), l_errno); + DAP_DELETE(l_server); return NULL; } @@ -151,6 +153,9 @@ dap_server_t* dap_server_new(dap_events_t *a_events, const char * a_addr, uint16 for(size_t l_worker_id = 0; l_worker_id < dap_events_worker_get_count() ; l_worker_id++){ dap_events_socket_t * l_es = dap_events_socket_wrap_no_add( a_events, l_server->socket_listener, &l_callbacks); + dap_worker_t *l_w = dap_events_worker_get(l_worker_id); + assert(l_w); + if ( l_es){ log_it(L_DEBUG, "Wrapped server socket %p on worker %u", l_es, l_worker_id); l_es->_inheritor = l_server; @@ -160,7 +165,7 @@ dap_server_t* dap_server_new(dap_events_t *a_events, const char * a_addr, uint16 // Prepare for multi thread listening l_es->ev_base_flags = EPOLLET| EPOLLIN | EPOLLEXCLUSIVE; #endif - dap_worker_add_events_socket( l_es, dap_events_worker_get_index(l_worker_id) ); + dap_worker_add_events_socket( l_es, l_w ); } else{ log_it(L_WARNING, "Can't wrap event socket for %s:%u server", a_addr, a_port); return NULL; diff --git a/dap-sdk/net/core/dap_worker.c b/dap-sdk/net/core/dap_worker.c index 6477a4b205..534ab5671a 100644 --- a/dap-sdk/net/core/dap_worker.c +++ b/dap-sdk/net/core/dap_worker.c @@ -67,22 +67,28 @@ void *dap_worker_thread(void *arg) dap_worker_t *l_worker = (dap_worker_t *) arg; time_t l_next_time_timeout_check = time( NULL) + s_connection_timeout / 2; uint32_t l_tn = l_worker->id; - #ifndef _WIN32 #ifndef NO_POSIX_SHED cpu_set_t mask; CPU_ZERO(&mask); CPU_SET(l_tn, &mask); - int err; + int l_retcode; #ifndef __ANDROID__ - err = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &mask); + l_retcode = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &mask); #else err = sched_setaffinity(pthread_self(), sizeof(cpu_set_t), &mask); #endif - if(err) + if(l_retcode != 0) { - log_it(L_CRITICAL, "Error pthread_setaffinity_np() You really have %d or more core in CPU?", *(int* )arg); + char l_errbuf[128]={0}; + switch (l_retcode) { + case EFAULT: strncpy(l_errbuf,"A supplied memory address was invalid.",sizeof (l_errbuf)-1); break; + case EINVAL: strncpy(l_errbuf,"The affinity bit mask mask contains no processors that are currently physically on the system and permitted to the thread",sizeof (l_errbuf)-1); break; + case ESRCH: strncpy(l_errbuf,"No thread with the ID thread could be found",sizeof (l_errbuf)-1); break; + default: strncpy(l_errbuf,"Unknown error",sizeof (l_errbuf)-1); + } + log_it(L_CRITICAL, "Worker #%u: error pthread_setaffinity_np(): %s (%d)", l_errbuf , l_retcode); abort(); } #endif @@ -97,15 +103,15 @@ void *dap_worker_thread(void *arg) l_worker->event_new_es = dap_events_socket_create_type_event( l_worker, s_new_es_callback); l_worker->event_delete_es = dap_events_socket_create_type_event( l_worker, s_new_es_callback); - log_it(L_INFO, "Worker %d started, epoll fd %d", l_worker->id, l_worker->epoll_fd); #ifdef DAP_EVENTS_CAPS_EPOLL struct epoll_event l_epoll_events[ DAP_MAX_EPOLL_EVENTS]= {{0}}; + log_it(L_INFO, "Worker #%d started with epoll fd %d and assigned to dedicated CPU unit", l_worker->id, l_worker->epoll_fd); #else #error "Unimplemented socket array for this platform" #endif + pthread_cond_broadcast(&l_worker->started_cond); bool s_loop_is_active = true; - while(s_loop_is_active) { #ifdef DAP_EVENTS_CAPS_EPOLL int l_selected_sockets = epoll_wait(l_worker->epoll_fd, l_epoll_events, DAP_MAX_EPOLL_EVENTS, -1); @@ -115,7 +121,10 @@ void *dap_worker_thread(void *arg) if(l_selected_sockets == -1) { if( errno == EINTR) continue; - log_it(L_ERROR, "Worker thread %d got errno: %d", l_worker->id, errno); + int l_errno = errno; + char l_errbuf[128]; + strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); + log_it(L_ERROR, "Worker thread %d got errno:\"%s\" (%d)", l_worker->id, l_errbuf, l_errno); break; } diff --git a/dap-sdk/net/core/include/dap_events.h b/dap-sdk/net/core/include/dap_events.h index 7e4adebcfd..f5fd5a3d84 100644 --- a/dap-sdk/net/core/include/dap_events.h +++ b/dap-sdk/net/core/include/dap_events.h @@ -61,5 +61,5 @@ uint32_t dap_events_worker_get_index_min( ); uint32_t dap_events_worker_get_count(); dap_worker_t *dap_events_worker_get_min( ); -dap_worker_t * dap_events_worker_get_index(uint8_t a_index); +dap_worker_t * dap_events_worker_get(uint8_t a_index); uint32_t dap_get_cpu_count(); diff --git a/dap-sdk/net/core/include/dap_worker.h b/dap-sdk/net/core/include/dap_worker.h index 558cb3ea74..ce593a3d2e 100644 --- a/dap-sdk/net/core/include/dap_worker.h +++ b/dap-sdk/net/core/include/dap_worker.h @@ -23,6 +23,7 @@ #pragma once #include <stdint.h> #include <stdatomic.h> +#include <pthread.h> #include "dap_events_socket.h" typedef struct dap_worker @@ -33,8 +34,10 @@ typedef struct dap_worker dap_events_socket_t * event_new_es; // Events socket for new socket dap_events_socket_t * event_delete_es; // Events socket for new socket EPOLL_HANDLE epoll_fd; - pthread_mutex_t locker_on_count; dap_events_t *events; + + pthread_cond_t started_cond; + pthread_mutex_t started_mutex; } dap_worker_t; int dap_worker_init( size_t a_conn_timeout ); diff --git a/modules/service/vpn/dap_chain_net_srv_vpn.c b/modules/service/vpn/dap_chain_net_srv_vpn.c index 93dee5bad4..141197295f 100644 --- a/modules/service/vpn/dap_chain_net_srv_vpn.c +++ b/modules/service/vpn/dap_chain_net_srv_vpn.c @@ -399,7 +399,7 @@ int s_vpn_tun_create(dap_config_t * g_config) int err = -1; for( uint8_t i =0; i< l_cpu_count; i++){ - dap_worker_t * l_worker = dap_events_worker_get_index(i); + dap_worker_t * l_worker = dap_events_worker_get(i); assert( l_worker ); int l_tun_fd; if( (l_tun_fd = open("/dev/net/tun", O_RDWR| O_NONBLOCK)) < 0 ) { -- GitLab