diff --git a/dap-sdk/net/core/dap_events_socket.c b/dap-sdk/net/core/dap_events_socket.c index 27cd66e8fcc2faea49b269c06d27cd2d25671f8b..ca1770fbdfdef07fab900a696b03cd8d7d735145 100644 --- a/dap-sdk/net/core/dap_events_socket.c +++ b/dap-sdk/net/core/dap_events_socket.c @@ -361,10 +361,12 @@ dap_events_socket_t * dap_events_socket_queue_ptr_create_input(dap_events_socket l_es->mqh_recv = a_es->mqh_recv; l_es->socket = a_es->socket; + l_es->port = a_es->port; + l_es->mq_num = a_es->mq_num; WCHAR l_direct_name[MQ_MAX_Q_NAME_LEN + 1] = { 0 }; size_t l_sz_in_words = sizeof(l_direct_name)/sizeof(l_direct_name[0]); - int pos = _snwprintf_s(l_direct_name, l_sz_in_words, l_sz_in_words - 1, L"DIRECT=OS:.\\PRIVATE$\\DapEventSocketQueue%d", l_es->socket); + int pos = _snwprintf_s(l_direct_name, l_sz_in_words, l_sz_in_words - 1, L"DIRECT=OS:.\\PRIVATE$\\DapEventSocketQueue%d", l_es->mq_num); if (pos < 0) { log_it(L_ERROR, "Message queue path error"); DAP_DELETE(l_es); @@ -508,13 +510,16 @@ dap_events_socket_t * s_create_type_queue_ptr(dap_worker_t * a_w, dap_events_soc l_addr.sin_family = AF_INET; IN_ADDR _in_addr = { { .S_addr = htonl(INADDR_LOOPBACK) } }; l_addr.sin_addr = _in_addr; - l_addr.sin_port = l_es->socket + 32768; + l_addr.sin_port = 0; //l_es->socket + 32768; l_addr_len = sizeof(struct sockaddr_in); if (bind(l_es->socket, (struct sockaddr*)&l_addr, sizeof(l_addr)) < 0) { log_it(L_ERROR, "Bind error: %d", WSAGetLastError()); } else { - log_it(L_INFO, "Binded %d", l_es->socket); + int dummy = 100; + getsockname(l_es->socket, (struct sockaddr*)&l_addr, &dummy); + l_es->port = l_addr.sin_port; + //log_it(L_DEBUG, "Bound to port %d", l_addr.sin_port); } MQQUEUEPROPS l_qps; @@ -523,8 +528,9 @@ dap_events_socket_t * s_create_type_queue_ptr(dap_worker_t * a_w, dap_events_soc HRESULT l_q_status[1]; WCHAR l_pathname[MAX_PATH] = { 0 }; + static atomic_uint s_queue_num = 0; size_t l_sz_in_words = sizeof(l_pathname)/sizeof(l_pathname[0]); - int pos = _snwprintf_s(l_pathname, l_sz_in_words, l_sz_in_words - 1, L".\\PRIVATE$\\DapEventSocketQueue%d", l_es->socket); + int pos = _snwprintf_s(l_pathname, l_sz_in_words, l_sz_in_words - 1, L".\\PRIVATE$\\DapEventSocketQueue%d", l_es->mq_num = s_queue_num++); if (pos < 0) { log_it(L_ERROR, "Message queue path error"); DAP_DELETE(l_es); @@ -574,6 +580,10 @@ dap_events_socket_t * s_create_type_queue_ptr(dap_worker_t * a_w, dap_events_soc MQDeleteQueue(l_format_name); return NULL; } + hr = MQPurgeQueue(l_es->mqh_recv); + if (hr != MQ_OK) { + log_it(L_DEBUG, "Message queue %d NOT purged, possible data corruption, err %d", l_es->mq_num, hr); + } #else #error "Not implemented s_create_type_queue_ptr() on your platform" @@ -779,13 +789,16 @@ dap_events_socket_t * s_create_type_event(dap_worker_t * a_w, dap_events_socket_ l_addr.sin_family = AF_INET; IN_ADDR _in_addr = { { .S_addr = htonl(INADDR_LOOPBACK) } }; l_addr.sin_addr = _in_addr; - l_addr.sin_port = l_es->socket + 32768; + l_addr.sin_port = 0; //l_es->socket + 32768; l_addr_len = sizeof(struct sockaddr_in); if (bind(l_es->socket, (struct sockaddr*)&l_addr, sizeof(l_addr)) < 0) { log_it(L_ERROR, "Bind error: %d", WSAGetLastError()); } else { - log_it(L_INFO, "Binded %d", l_es->socket); + int dummy = 100; + getsockname(l_es->socket, (struct sockaddr*)&l_addr, &dummy); + l_es->port = l_addr.sin_port; + //log_it(L_DEBUG, "Bound to port %d", l_addr.sin_port); } #endif return l_es; @@ -1035,7 +1048,7 @@ int dap_events_socket_queue_ptr_send( dap_events_socket_t * a_es, void* a_arg) return hr; } - if(dap_sendto(a_es->socket, NULL, 0) == SOCKET_ERROR) { + if(dap_sendto(a_es->socket, a_es->port, NULL, 0) == SOCKET_ERROR) { return WSAGetLastError(); } else { return 0; @@ -1079,7 +1092,7 @@ int dap_events_socket_event_signal( dap_events_socket_t * a_es, uint64_t a_value return 1; #elif defined DAP_OS_WINDOWS a_es->buf_out[0] = (u_short)a_value; - if(dap_sendto(a_es->socket, a_es->buf_out, sizeof(uint64_t)) == SOCKET_ERROR) { + if(dap_sendto(a_es->socket, a_es->port, a_es->buf_out, sizeof(uint64_t)) == SOCKET_ERROR) { return WSAGetLastError(); } else { return 0; @@ -1675,13 +1688,13 @@ inline int dap_recvfrom(SOCKET s, void* buf_in, size_t buf_size) { return ret; } -inline int dap_sendto(SOCKET s, void* buf_out, size_t buf_out_size) { +inline int dap_sendto(SOCKET s, u_short port, void* buf_out, size_t buf_out_size) { int l_addr_len; struct sockaddr_in l_addr; l_addr.sin_family = AF_INET; IN_ADDR _in_addr = { { .S_addr = htonl(INADDR_LOOPBACK) } }; l_addr.sin_addr = _in_addr; - l_addr.sin_port = s + 32768; + l_addr.sin_port = port; l_addr_len = sizeof(struct sockaddr_in); int ret; if (buf_out) { diff --git a/dap-sdk/net/core/dap_proc_thread.c b/dap-sdk/net/core/dap_proc_thread.c index d234cfde624c30c3f75d15eff873dda6324b6a98..761c375018c94a389c42648ce92882f212b7ec67 100644 --- a/dap-sdk/net/core/dap_proc_thread.c +++ b/dap-sdk/net/core/dap_proc_thread.c @@ -475,7 +475,7 @@ static void * s_proc_thread_function(void * a_arg) log_it(L_ERROR, "An error occured on sending message to queue, errno: 0x%x", hr); break; } else { - if(dap_sendto(l_cur->socket, NULL, 0) == SOCKET_ERROR) { + if(dap_sendto(l_cur->socket, l_cur->port, NULL, 0) == SOCKET_ERROR) { log_it(L_ERROR, "Write to sock error: %d", WSAGetLastError()); } l_cur->buf_out_size = 0; diff --git a/dap-sdk/net/core/dap_timerfd.c b/dap-sdk/net/core/dap_timerfd.c index 01b0b706b4030d81a10b8dbb4129f874b441727e..584c0e2c8ff49a5e9e8b170f89a576f1eef5b1ee 100644 --- a/dap-sdk/net/core/dap_timerfd.c +++ b/dap-sdk/net/core/dap_timerfd.c @@ -69,7 +69,7 @@ void __stdcall TimerAPCb(void* arg, DWORD low, DWORD high) { // Timer high valu UNREFERENCED_PARAMETER(low) UNREFERENCED_PARAMETER(high) dap_timerfd_t *l_timerfd = (dap_timerfd_t *)arg; - if (dap_sendto(l_timerfd->tfd, NULL, 0) == SOCKET_ERROR) { + if (dap_sendto(l_timerfd->tfd, l_timerfd->port, NULL, 0) == SOCKET_ERROR) { log_it(L_CRITICAL, "Error occured on writing into socket from APC, errno: %d", WSAGetLastError()); } } @@ -128,13 +128,15 @@ dap_timerfd_t* dap_timerfd_start_on_worker(dap_worker_t * a_worker, uint64_t a_t l_addr.sin_family = AF_INET; IN_ADDR _in_addr = { { .S_addr = htonl(INADDR_LOOPBACK) } }; l_addr.sin_addr = _in_addr; - l_addr.sin_port = l_tfd + 32768; + l_addr.sin_port = 0; //l_tfd + 32768; l_addr_len = sizeof(struct sockaddr_in); - if (bind(l_tfd, (struct sockaddr*)&l_addr, sizeof(l_addr)) < 0) { log_it(L_ERROR, "Bind error: %d", WSAGetLastError()); } else { - log_it(L_INFO, "Binded %d", l_tfd); + int dummy = 100; + getsockname(l_tfd, (struct sockaddr*)&l_addr, &dummy); + l_timerfd->port = l_addr.sin_port; + //log_it(L_DEBUG, "Bound to port %d", l_addr.sin_port); } LARGE_INTEGER l_due_time; diff --git a/dap-sdk/net/core/dap_worker.c b/dap-sdk/net/core/dap_worker.c index 67e9eebad90062854869b8c634e12ff6ec9a2c3c..c8efe36d2a4c8b2500093095967c66ca38f475ec 100644 --- a/dap-sdk/net/core/dap_worker.c +++ b/dap-sdk/net/core/dap_worker.c @@ -529,7 +529,7 @@ void *dap_worker_thread(void *arg) } else { l_errno = WSAGetLastError(); - if(dap_sendto(l_cur->socket, NULL, 0) == SOCKET_ERROR) { + if(dap_sendto(l_cur->socket, l_cur->port, NULL, 0) == SOCKET_ERROR) { log_it(L_ERROR, "Write to socket error: %d", WSAGetLastError()); } l_cur->buf_out_size = 0; diff --git a/dap-sdk/net/core/include/dap_events_socket.h b/dap-sdk/net/core/include/dap_events_socket.h index b83875f4e4b34fcc0ac11ea2a0165be697d5e6c8..aea7be30d5ddb50ddd81997447ad8f23e06f3263 100644 --- a/dap-sdk/net/core/include/dap_events_socket.h +++ b/dap-sdk/net/core/include/dap_events_socket.h @@ -153,6 +153,8 @@ typedef struct dap_events_socket { #elif defined DAP_EVENTS_CAPS_MSMQ }; QUEUEHANDLE mqh, mqh_recv; + u_int mq_num; + u_short port; HANDLE ev_timeout, ev_recv; #endif @@ -299,7 +301,7 @@ void dap_events_socket_shrink_buf_in(dap_events_socket_t * cl, size_t shrink_siz #ifdef DAP_OS_WINDOWS extern inline int dap_recvfrom(SOCKET s, void* buf_in, size_t buf_size); -extern inline int dap_sendto(SOCKET s, void* buf_in, size_t buf_size); +extern inline int dap_sendto(SOCKET s, u_short port, void* buf_in, size_t buf_size); #endif diff --git a/dap-sdk/net/core/include/dap_timerfd.h b/dap-sdk/net/core/include/dap_timerfd.h index 8a17faad90d070b392bd52f8a6dde0e2312595e9..671e8b42ce6b41abf92138e32e6b576a01f07d9e 100644 --- a/dap-sdk/net/core/include/dap_timerfd.h +++ b/dap-sdk/net/core/include/dap_timerfd.h @@ -44,6 +44,7 @@ typedef struct dap_timerfd { uint64_t timeout_ms; #ifdef DAP_OS_WINDOWS SOCKET tfd; + u_short port; #else int tfd; //timer file descriptor #endif