diff --git a/dap-sdk/core/libdap.pri b/dap-sdk/core/libdap.pri index e4288f257471cf0f51dfa068995da51644116c1d..4ba46b269cba1a4ed2fe57531b87ca0b4dfd36eb 100755 --- a/dap-sdk/core/libdap.pri +++ b/dap-sdk/core/libdap.pri @@ -47,6 +47,7 @@ win32 { LIBS += -lntdll -lpsapi -ljson-c -lmagic -lmqrt -lshlwapi -lregex -ltre -lintl -liconv -lbcrypt -lcrypt32 -lsecur32 -luser32 -lws2_32 -lole32 include($$PWD/../../3rdparty/wepoll/wepoll.pri) DEFINES += DAP_OS_WINDOWS + QMAKE_CFLAGS_DEBUG += -Wall -ggdb -g3 } # 3rd party diff --git a/dap-sdk/net/client/dap_client_http.c b/dap-sdk/net/client/dap_client_http.c index 101f94c4b230e8a3f6c401a69a184e398e2cdd8b..cbb7620d07abe44e530b4deef8c5f03cb50107cc 100644 --- a/dap-sdk/net/client/dap_client_http.c +++ b/dap-sdk/net/client/dap_client_http.c @@ -185,9 +185,9 @@ static bool s_timer_timeout_after_connected_check(void * a_arg) dap_worker_t * l_worker = dap_events_get_current_worker(dap_events_get_default()); // We're in own esocket context assert(l_worker); - dap_events_socket_t * l_es; if(l_es = dap_worker_esocket_find_uuid( l_worker, l_es_handler->esocket_uuid ) ){ + dap_client_http_pvt_t * l_http_pvt = PVT(l_es); assert(l_http_pvt); if ( time(NULL)- l_http_pvt->ts_last_read >= (time_t) s_client_timeout_read_after_connect_ms){ @@ -224,7 +224,6 @@ static bool s_timer_timeout_check(void * a_arg) dap_worker_t * l_worker = dap_events_get_current_worker(dap_events_get_default()); // We're in own esocket context assert(l_worker); - dap_events_socket_t * l_es; if(l_es = dap_worker_esocket_find_uuid(l_worker, l_es_handler->esocket_uuid)){ if (l_es->flags & DAP_SOCK_CONNECTING ){ @@ -616,9 +615,9 @@ void* dap_client_http_request_custom(dap_worker_t * a_worker, const char *a_upli log_it(L_DEBUG, "Connecting to %s:%u", a_uplink_addr, a_uplink_port); l_http_pvt->worker = a_worker?a_worker: dap_events_worker_get_auto(); dap_worker_add_events_socket(l_ev_socket,l_http_pvt->worker); - dap_events_socket_handler_t * l_ev_socket_handler = DAP_NEW_Z(dap_events_socket_handler_t); - l_ev_socket_handler->esocket = l_ev_socket; - l_ev_socket_handler->uuid = l_ev_socket->uuid; + dap_events_socket_handle_t * l_ev_socket_handler = DAP_NEW_Z(dap_events_socket_handle_t); + l_ev_socket_handler->esocket_uuid = l_ev_socket->uuid; + dap_timerfd_start_on_worker(l_http_pvt->worker,s_client_timeout_ms, s_timer_timeout_check,l_ev_socket_handler); return l_http_pvt; } else { diff --git a/dap-sdk/net/core/dap_events.c b/dap-sdk/net/core/dap_events.c index d0c057becf510463ad2e015e2396b25680baaa40..4bdb38341fc31d49cc466791c05740d12ad49fb0 100644 --- a/dap-sdk/net/core/dap_events.c +++ b/dap-sdk/net/core/dap_events.c @@ -249,6 +249,7 @@ dap_events_t * dap_events_new( ) pthread_rwlock_init( &ret->sockets_rwlock, NULL ); if ( s_events_default == NULL) s_events_default = ret; + s_events_default->sockets = NULL; pthread_key_create( &ret->pth_key_worker, NULL); return ret; @@ -268,18 +269,30 @@ void dap_events_delete( dap_events_t *a_events ) if (a_events) { dap_events_socket_t *l_cur, *l_tmp; HASH_ITER( hh, a_events->sockets,l_cur, l_tmp ) { + HASH_DEL(a_events->sockets, l_cur); dap_events_socket_remove_and_delete_unsafe( l_cur, true ); } - if ( a_events->_inheritor ) DAP_DELETE( a_events->_inheritor ); - pthread_rwlock_destroy( &a_events->sockets_rwlock ); DAP_DELETE( a_events ); } } +void dap_events_remove_and_delete_socket_unsafe(dap_events_t *a_events, dap_events_socket_t *a_socket, bool preserve_inheritor) { + if (!a_events) + return; + pthread_rwlock_wrlock(&a_events->sockets_rwlock); + dap_events_socket_t * l_es_find = NULL; + HASH_FIND_INT( a_events->sockets, &a_socket->socket, l_es_find ); + if (l_es_find) { + HASH_DEL(a_events->sockets, l_es_find); + dap_events_socket_remove_and_delete_unsafe(l_es_find, preserve_inheritor); + } + pthread_rwlock_unlock(&a_events->sockets_rwlock); +} + /** * @brief sa_server_loop Main server loop * @param sh Server instance @@ -293,6 +306,7 @@ int dap_events_start( dap_events_t *a_events ) l_worker->id = i; l_worker->events = a_events; + l_worker->esockets = NULL; pthread_rwlock_init(&l_worker->esocket_rwlock,NULL); pthread_mutex_init(& l_worker->started_mutex, NULL); pthread_cond_init( & l_worker->started_cond, NULL); @@ -324,7 +338,6 @@ int dap_events_start( dap_events_t *a_events ) clock_gettime(CLOCK_REALTIME, &l_timeout); l_timeout.tv_sec+=15; pthread_create( &s_threads[i].tid, NULL, dap_worker_thread, l_worker ); - int l_ret; l_ret=pthread_cond_timedwait(&l_worker->started_cond, &l_worker->started_mutex, &l_timeout); pthread_mutex_unlock(&l_worker->started_mutex); diff --git a/dap-sdk/net/core/dap_events_socket.c b/dap-sdk/net/core/dap_events_socket.c index be711ec2f83c4e513c025b04fa75eec7798bfc2d..20b110ad862492d30b51eaf044c2c2ef3279dc11 100644 --- a/dap-sdk/net/core/dap_events_socket.c +++ b/dap-sdk/net/core/dap_events_socket.c @@ -196,7 +196,7 @@ dap_events_socket_t *dap_events_socket_wrap_no_add( dap_events_t *a_events, dap_events_socket_t * l_es_find = NULL; HASH_FIND_INT( a_events->sockets, &a_sock, l_es_find ); if(l_es_find) - log_it(L_ERROR,"Trying to add %d descriptor in hashtable but found %p esocket with same socket", a_sock, l_es_find); + log_it(L_ERROR,"Trying to add socket %d to hashtable but found %p esocket with same socket", a_sock, l_es_find); else HASH_ADD_INT(a_events->sockets, socket, l_ret); pthread_rwlock_unlock(&a_events->sockets_rwlock); @@ -507,7 +507,7 @@ dap_events_socket_t * dap_events_socket_queue_ptr_create_input(dap_events_socket WCHAR l_direct_name[MQ_MAX_Q_NAME_LEN] = { 0 }; int pos = 0; #ifdef DAP_BRAND - pos = _snwprintf_s(l_direct_name, sizeof(l_direct_name)/sizeof(l_direct_name[0]), _TRUNCATE, L"DIRECT=OS:.\\PRIVATE$\\" DAP_BRAND "_esmq%d", l_es->mq_num); + pos = _snwprintf_s(l_direct_name, sizeof(l_direct_name)/sizeof(l_direct_name[0]), _TRUNCATE, L"DIRECT=OS:.\\PRIVATE$\\" DAP_BRAND "mq%d", l_es->mq_num); #else pos = _snwprintf_s(l_direct_name, sizeof(l_direct_name)/sizeof(l_direct_name[0]), _TRUNCATE, L"DIRECT=OS:.\\PRIVATE$\\%hs_esmq%d", dap_get_appname(), l_es->mq_num); #endif @@ -711,7 +711,7 @@ dap_events_socket_t * s_create_type_queue_ptr(dap_worker_t * a_w, dap_events_soc static atomic_uint s_queue_num = 0; int pos = 0; #ifdef DAP_BRAND - pos = _snwprintf_s(l_pathname, sizeof(l_pathname)/sizeof(l_pathname[0]), _TRUNCATE, L".\\PRIVATE$\\" DAP_BRAND "_esmq%d", l_es->mq_num = s_queue_num++); + pos = _snwprintf_s(l_pathname, sizeof(l_pathname)/sizeof(l_pathname[0]), _TRUNCATE, L".\\PRIVATE$\\" DAP_BRAND "mq%d", l_es->mq_num = s_queue_num++); #else pos = _snwprintf_s(l_pathname, sizeof(l_pathname)/sizeof(l_pathname[0]), _TRUNCATE, L".\\PRIVATE$\\%hs_esmq%d", dap_get_appname(), l_es->mq_num = s_queue_num++); #endif @@ -1491,7 +1491,8 @@ dap_events_socket_t *dap_worker_esocket_find_uuid(dap_worker_t * a_worker, dap_e dap_events_socket_t * l_ret = NULL; if(a_worker->esockets ) { pthread_rwlock_rdlock(&a_worker->esocket_rwlock); - HASH_FIND_PTR( a_worker->esockets, &a_es_uuid,l_ret ); + //HASH_FIND_PTR( a_worker->esockets, &a_es_uuid,l_ret ); + HASH_FIND(hh_worker, a_worker->esockets, &a_es_uuid, sizeof(a_es_uuid), l_ret ); pthread_rwlock_unlock(&a_worker->esocket_rwlock ); } return l_ret; @@ -1737,8 +1738,10 @@ bool s_remove_and_delete_unsafe_delayed_delete_callback(void * a_arg) assert(l_worker); dap_events_socket_t * l_es; if( (l_es = dap_worker_esocket_find_uuid(l_worker, l_es_handler->esocket_uuid)) != NULL) - dap_events_socket_remove_and_delete_unsafe(l_es,l_es_handler->value == 1); + //dap_events_socket_remove_and_delete_unsafe(l_es,l_es_handler->value == 1); + dap_events_remove_and_delete_socket_unsafe(dap_events_get_default(), l_es, l_es_handler->value == 1); DAP_DELETE(l_es_handler); + return false; } diff --git a/dap-sdk/net/core/dap_timerfd.c b/dap-sdk/net/core/dap_timerfd.c index 8e5c30c693eeddfd75230c89f0b8d3094802d867..de549626a35115d3cd7976259838ecec2f134ccb 100644 --- a/dap-sdk/net/core/dap_timerfd.c +++ b/dap-sdk/net/core/dap_timerfd.c @@ -43,6 +43,9 @@ #define LOG_TAG "dap_timerfd" static void s_es_callback_timer(struct dap_events_socket *a_event_sock); +#ifdef DAP_OS_WINDOWS + static HANDLE hTimerQueue = NULL; +#endif /** * @brief dap_events_socket_init Init clients module @@ -50,6 +53,13 @@ static void s_es_callback_timer(struct dap_events_socket *a_event_sock); */ int dap_timerfd_init() { +#ifdef DAP_OS_WINDOWS + hTimerQueue = CreateTimerQueue(); + if (!hTimerQueue) { + log_it(L_CRITICAL, "Timer queue failed, err %d", GetLastError()); + return -4; + } +#endif log_it(L_NOTICE, "Initialized timerfd"); return 0; } @@ -74,6 +84,14 @@ void __stdcall TimerAPCb(void* arg, DWORD low, DWORD high) { // Timer high valu log_it(L_CRITICAL, "Error occured on writing into socket from APC, errno: %d", WSAGetLastError()); } } + +void __stdcall TimerRoutine(void* arg, BOOLEAN flag) { + UNREFERENCED_PARAMETER(flag) + dap_timerfd_t *l_timerfd = (dap_timerfd_t *)arg; + if (dap_sendto(l_timerfd->tfd, l_timerfd->port, NULL, 0) == SOCKET_ERROR) { + log_it(L_CRITICAL, "Error occured on writing into socket from timer routine, errno: %d", WSAGetLastError()); + } +} #endif @@ -179,13 +197,13 @@ dap_timerfd_t* dap_timerfd_create(uint64_t a_timeout_ms, dap_timerfd_callback_t #elif defined (DAP_OS_WINDOWS) - HANDLE l_th = CreateWaitableTimer(NULL, true, NULL); + /*HANDLE l_th = CreateWaitableTimer(NULL, true, NULL); if (!l_th) { log_it(L_CRITICAL, "Waitable timer not created, error %d", GetLastError()); DAP_DELETE(l_timerfd); return NULL; - } - + }*/ + l_timerfd->th = NULL; SOCKET l_tfd = socket(AF_INET, SOCK_DGRAM, 0); int buffsize = 1024; @@ -210,13 +228,19 @@ dap_timerfd_t* dap_timerfd_create(uint64_t a_timeout_ms, dap_timerfd_callback_t //log_it(L_DEBUG, "Bound to port %d", l_addr.sin_port); } - LARGE_INTEGER l_due_time; + /*LARGE_INTEGER l_due_time; l_due_time.QuadPart = (long long)a_timeout_ms * _MSEC; if (!SetWaitableTimer(l_th, &l_due_time, 0, TimerAPCb, l_timerfd, false)) { log_it(L_CRITICAL, "Waitable timer not set, error %d", GetLastError()); CloseHandle(l_th); DAP_DELETE(l_timerfd); return NULL; + } */ + if (!CreateTimerQueueTimer(&l_timerfd->th, hTimerQueue, + (WAITORTIMERCALLBACK)TimerRoutine, l_timerfd, (unsigned long)a_timeout_ms, 0, 0)) { + log_it(L_CRITICAL, "Timer not set, error %d", GetLastError()); + DAP_DELETE(l_timerfd); + return NULL; } l_events_socket->socket = l_tfd; #endif @@ -224,9 +248,9 @@ dap_timerfd_t* dap_timerfd_create(uint64_t a_timeout_ms, dap_timerfd_callback_t #if defined (DAP_OS_LINUX) || defined (DAP_OS_WINDOWS) l_timerfd->tfd = l_tfd; #endif -#ifdef DAP_OS_WINDOWS - l_timerfd->th = l_th; -#endif +//#ifdef DAP_OS_WINDOWS + //l_timerfd->th = l_th; +//#endif return l_timerfd; } @@ -257,12 +281,12 @@ static void s_es_callback_timer(struct dap_events_socket *a_event_sock) //EV_SET(l_event, 0, a_event_sock->kqueue_base_filter, a_event_sock->kqueue_base_flags,a_event_sock->kqueue_base_fflags,a_event_sock->kqueue_data,a_event_sock); //kevent(a_event_sock->worker->kqueue_fd,l_event,1,NULL,0,NULL); #elif defined (DAP_OS_WINDOWS) - LARGE_INTEGER l_due_time; + /*LARGE_INTEGER l_due_time; l_due_time.QuadPart = (long long)l_timerfd->timeout_ms * _MSEC; if (!SetWaitableTimer(l_timerfd->th, &l_due_time, 0, TimerAPCb, l_timerfd, false)) { log_it(L_CRITICAL, "Waitable timer not reset, error %d", GetLastError()); CloseHandle(l_timerfd->th); - } + }*/ // Wtf is this entire thing for?... #else #error "No timer callback realization for your platform" #endif @@ -276,7 +300,7 @@ static void s_es_callback_timer(struct dap_events_socket *a_event_sock) close(l_timerfd->tfd); #elif defined(DAP_OS_WINDOWS) closesocket(l_timerfd->tfd); - CloseHandle(l_timerfd->th); + //CloseHandle(l_timerfd->th); #endif l_timerfd->events_socket->flags |= DAP_SOCK_SIGNAL_CLOSE; } diff --git a/dap-sdk/net/core/dap_worker.c b/dap-sdk/net/core/dap_worker.c index 65f5b1d456c155657c43986542de0d0937ea6f74..71e4c8931de96f7303e2db0fe125d841381a3316 100644 --- a/dap-sdk/net/core/dap_worker.c +++ b/dap-sdk/net/core/dap_worker.c @@ -833,8 +833,8 @@ void *dap_worker_thread(void *arg) // Here we expect thats event duplicates goes together in it. If not - we lose some events between. } } - - dap_events_socket_remove_and_delete_unsafe( l_cur, false); + //dap_events_socket_remove_and_delete_unsafe( l_cur, false); + dap_events_remove_and_delete_socket_unsafe(dap_events_get_default(), l_cur, false); #ifdef DAP_EVENTS_CAPS_KQUEUE l_worker->kqueue_events_count--; #endif diff --git a/dap-sdk/net/core/include/dap_events.h b/dap-sdk/net/core/include/dap_events.h index 0c5932a3e67255d647ba92952a3202349731d06c..f766af0dd3d3946e6f54706ce9db1d86b60b680c 100644 --- a/dap-sdk/net/core/include/dap_events.h +++ b/dap-sdk/net/core/include/dap_events.h @@ -55,6 +55,7 @@ void dap_events_deinit( ); // Deinit server module dap_events_t* dap_events_new( ); dap_events_t* dap_events_get_default( ); void dap_events_delete( dap_events_t * a_events ); +void dap_events_remove_and_delete_socket_unsafe(dap_events_t*, dap_events_socket_t*, bool); int32_t dap_events_start( dap_events_t *a_events ); void dap_events_stop_all();