diff --git a/CMakeLists.txt b/CMakeLists.txt index 2be4800018a3677191900ec9c6301f0c2d9ff2ce..c7364facbb7b82333be9e11d04d351162f4f5796 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -132,7 +132,7 @@ endif() if (WIN32) set(CELLFRAME_LIBS ${CELLFRAME_LIBS} KERNEL32 USER32 SHELL32 WINMM GDI32 ADVAPI32 Ole32 Version Imm32 OleAut32 ws2_32 ntdll psapi - Shlwapi Bcrypt Crypt32 Secur32 userenv ) + Shlwapi Bcrypt Crypt32 Secur32 userenv mqrt) endif() target_link_libraries(${PROJECT_NAME} ${CELLFRAME_LIBS}) diff --git a/dap-sdk/core/include/dap_common.h b/dap-sdk/core/include/dap_common.h index 3f1c18578befce106b52f8fd53b7ea6b24c7af38..cd127584380a056b9d868a6d56a9f3ee0c5c8e22 100755 --- a/dap-sdk/core/include/dap_common.h +++ b/dap-sdk/core/include/dap_common.h @@ -37,6 +37,8 @@ #ifdef DAP_OS_WINDOWS #include <fcntl.h> #define pipe(pfds) _pipe(pfds, 4096, _O_BINARY) +#define strerror_r(arg1, arg2, arg3) strerror_s(arg2, arg3, arg1) +#define ctime_r(arg1, arg2) ctime_s(arg2, sizeof(arg2), arg1) #endif #ifdef __MACH__ #include <dispatch/dispatch.h> diff --git a/dap-sdk/net/client/dap_client.c b/dap-sdk/net/client/dap_client.c index 2aa777ee3a50f13fded89a2cc728bb4cdf26f66f..d1a84860fb0e1582498500fc89048f961190fba3 100644 --- a/dap-sdk/net/client/dap_client.c +++ b/dap-sdk/net/client/dap_client.c @@ -188,7 +188,7 @@ void dap_client_delete_unsafe(dap_client_t * a_client) if ( DAP_CLIENT_PVT(a_client)->refs_count ==0 ){ dap_client_pvt_delete( DAP_CLIENT_PVT(a_client) ); pthread_mutex_destroy(&a_client->mutex); - DAP_DELETE(a_client); + DAP_DEL_Z(a_client) } else DAP_CLIENT_PVT(a_client)->is_to_delete = true; } @@ -265,7 +265,7 @@ static void s_go_stage_on_client_worker_unsafe(dap_worker_t * a_worker,void * a_ dap_client_delete_unsafe(l_client_pvt->client); return; } - log_it(L_DEBUG, "Start transitions chain for client %p from %s to %s", l_client_pvt->client, dap_client_stage_str(l_cur_stage ) , dap_client_stage_str(l_stage_target)); + log_it(L_DEBUG, "Start transitions chain for client %p -> %p from %s to %s", l_client_pvt, l_client_pvt->client, dap_client_stage_str(l_cur_stage ) , dap_client_stage_str(l_stage_target)); l_client_pvt->stage_target = l_stage_target; l_client_pvt->stage_target_done_callback = l_stage_end_callback; if (l_stage_target < l_cur_stage) { diff --git a/dap-sdk/net/client/dap_client_http.c b/dap-sdk/net/client/dap_client_http.c index 202b9b1e46201e83ea5efb7f7a19ed5c406d34b1..488e55c324939e61a79fb16ff0953ba8a8b629a0 100644 --- a/dap-sdk/net/client/dap_client_http.c +++ b/dap-sdk/net/client/dap_client_http.c @@ -33,6 +33,7 @@ #include "dap_client.h" #include "dap_client_pvt.h" #include "dap_client_http.h" +#include "dap_enc_base64.h" #define LOG_TAG "dap_client_http" @@ -63,10 +64,9 @@ typedef struct dap_http_client_internal { uint16_t uplink_port; const char *method; const char *request_content_type; - const char * path; + char * path; char *cookie; - char **request_custom_headers; // Custom headers - size_t request_custom_headers_count; + char *request_custom_headers; // Custom headers // Request vars dap_worker_t * worker; @@ -88,7 +88,6 @@ static void s_http_error(dap_events_socket_t * a_es, int a_arg); */ static void s_http_read(dap_events_socket_t * a_es, void * arg) { -// log_it(L_DEBUG, "s_http_read "); dap_client_http_pvt_t * l_client_http_internal = PVT(a_es); if(!l_client_http_internal) { log_it(L_ERROR, "s_http_read: l_client_http_internal is NULL!"); @@ -182,22 +181,18 @@ static void s_http_error(dap_events_socket_t * a_es, int a_errno) if (a_errno == ETIMEDOUT){ strncpy(l_errbuf,"Connection timeout", sizeof (l_errbuf)-1); }else if (a_errno == ECONNREFUSED){ - strncpy(l_errbuf,"Connection refused", sizeof (l_errbuf)-1); - }else if (a_errno == EHOSTDOWN){ strncpy(l_errbuf,"Host is down", sizeof (l_errbuf)-1); }else if (a_errno == EHOSTUNREACH){ strncpy(l_errbuf,"No route to host", sizeof (l_errbuf)-1); - }else if (a_errno == EREMOTEIO){ - strncpy(l_errbuf,"Remote I/O error", sizeof (l_errbuf)-1); }else if(a_errno) strerror_r(a_errno, l_errbuf, sizeof (l_errbuf)); else strncpy(l_errbuf,"Unknown Error", sizeof (l_errbuf)-1); if (a_es->flags & DAP_SOCK_CONNECTING){ - log_it(L_WARNING, "Socket connecting error: %s (code %d)" , l_errbuf, a_errno); + log_it(L_WARNING, "Socket %d connecting error: %s (code %d)" , a_es->socket, l_errbuf, a_errno); }else - log_it(L_WARNING, "Socket error: %s (code %d)" , l_errbuf, a_errno); + log_it(L_WARNING, "Socket %d error: %s (code %d)", a_es->socket, l_errbuf, a_errno); dap_client_http_pvt_t * l_client_http_internal = PVT(a_es); @@ -228,19 +223,10 @@ static void s_client_http_delete(dap_client_http_pvt_t * a_http_pvt) return; } - if (a_http_pvt->response){ - DAP_DELETE(a_http_pvt->response); - a_http_pvt->response = NULL; - } - - if(a_http_pvt->request_custom_headers != NULL) { - for( size_t i = 0; i < a_http_pvt->request_custom_headers_count; i++) { - DAP_DELETE( a_http_pvt->request_custom_headers[i]); - } - a_http_pvt->request_custom_headers = NULL; - //DAP_DELETE( l_client_http_pvt->request_custom_headers); - } - + DAP_DEL_Z(a_http_pvt->response) + DAP_DEL_Z(a_http_pvt->path) + DAP_DEL_Z(a_http_pvt->request) + DAP_DEL_Z(a_http_pvt->request_custom_headers) } @@ -264,7 +250,7 @@ static void s_client_http_delete(dap_client_http_pvt_t * a_http_pvt) void* dap_client_http_request_custom(dap_worker_t * a_worker,const char *a_uplink_addr, uint16_t a_uplink_port, const char *a_method, const char *a_request_content_type, const char * a_path, void *a_request, size_t a_request_size, char *a_cookie, dap_client_http_callback_data_t a_response_callback, dap_client_http_callback_error_t a_error_callback, - void *a_obj, char **a_custom, size_t a_custom_count) + void *a_obj, char *a_custom) { //log_it(L_DEBUG, "HTTP request on url '%s:%d'", a_uplink_addr, a_uplink_port); @@ -275,12 +261,23 @@ void* dap_client_http_request_custom(dap_worker_t * a_worker,const char *a_uplin }; // create socket - int l_socket = socket( PF_INET, SOCK_STREAM, 0); +#ifdef DAP_OS_WINDOWS + SOCKET l_socket = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); + if (l_socket == INVALID_SOCKET) { + log_it(L_ERROR, "Socket create error: %d", WSAGetLastError()); +#else + int l_socket = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); if (l_socket == -1) { log_it(L_ERROR, "Error %d with socket create", errno); +#endif return NULL; } // Get socket flags +#if defined DAP_OS_WINDOWS + u_long l_socket_flags = 0; + if (ioctlsocket((SOCKET)l_socket, (long)FIONBIO, &l_socket_flags)) + log_it(L_ERROR, "Error ioctl %d", WSAGetLastError()); +#else int l_socket_flags = fcntl(l_socket, F_GETFL); if (l_socket_flags == -1){ log_it(L_ERROR, "Error %d can't get socket flags", errno); @@ -291,6 +288,7 @@ void* dap_client_http_request_custom(dap_worker_t * a_worker,const char *a_uplin log_it(L_ERROR, "Error %d can't get socket flags", errno); return NULL; } +#endif // set socket param int buffsize = DAP_CLIENT_HTTP_RESPONSE_SIZE_MAX; #ifdef _WIN32 @@ -310,15 +308,14 @@ void* dap_client_http_request_custom(dap_worker_t * a_worker,const char *a_uplin //l_client_http_internal->socket = l_socket; l_http_pvt->obj = a_obj; l_http_pvt->method = a_method; - l_http_pvt->path = a_path; + l_http_pvt->path = dap_strdup(a_path); l_http_pvt->request_content_type = a_request_content_type; - l_http_pvt->request = a_request; + l_http_pvt->request = (u_char*)dap_strdup(a_request); l_http_pvt->request_size = a_request_size; l_http_pvt->uplink_addr = a_uplink_addr; l_http_pvt->uplink_port = a_uplink_port; l_http_pvt->cookie = a_cookie; - l_http_pvt->request_custom_headers = a_custom; - l_http_pvt->request_custom_headers_count = a_custom_count; + l_http_pvt->request_custom_headers = dap_strdup(a_custom); l_http_pvt->response_size_max = DAP_CLIENT_HTTP_RESPONSE_SIZE_MAX; l_http_pvt->response = (uint8_t*) DAP_NEW_Z_SIZE(uint8_t, DAP_CLIENT_HTTP_RESPONSE_SIZE_MAX); @@ -341,7 +338,8 @@ void* dap_client_http_request_custom(dap_worker_t * a_worker,const char *a_uplin l_ev_socket->remote_addr.sin_family = AF_INET; l_ev_socket->remote_addr.sin_port = htons(a_uplink_port); l_ev_socket->flags |= DAP_SOCK_CONNECTING; - l_ev_socket->flags |= DAP_SOCK_READY_TO_WRITE ; + l_ev_socket->type = DESCRIPTOR_TYPE_SOCKET; + l_ev_socket->flags |= DAP_SOCK_READY_TO_WRITE; int l_err = connect(l_socket, (struct sockaddr *) &l_ev_socket->remote_addr, sizeof(struct sockaddr_in)); if (l_err == 0){ @@ -349,12 +347,31 @@ void* dap_client_http_request_custom(dap_worker_t * a_worker,const char *a_uplin l_http_pvt->worker = a_worker?a_worker: dap_events_worker_get_auto(); dap_worker_add_events_socket(l_ev_socket,l_http_pvt->worker); return l_http_pvt; - }else if( errno == EINPROGRESS && l_err == -1){ + } +#ifdef DAP_OS_WINDOWS + else if(l_err == SOCKET_ERROR) { + int l_err2 = WSAGetLastError(); + if (l_err2 == EWOULDBLOCK || l_err2 == EAGAIN) { + log_it(L_DEBUG, "Connecting to %s:%u", a_uplink_addr, a_uplink_port); + l_http_pvt->worker = a_worker?a_worker: dap_events_worker_get_auto(); + dap_worker_add_events_socket(l_ev_socket,l_http_pvt->worker); + return l_http_pvt; + } else { + log_it(L_ERROR, "Socket %d connecting error: %d", l_ev_socket->socket, WSAGetLastError()); + s_client_http_delete( l_http_pvt); + l_ev_socket->_inheritor = NULL; + dap_events_socket_delete_unsafe( l_ev_socket, true); + return NULL; + } + } +#else + else if( errno == EINPROGRESS && l_err == -1){ log_it(L_DEBUG, "Connecting to %s:%u", a_uplink_addr, a_uplink_port); l_http_pvt->worker = a_worker?a_worker: dap_events_worker_get_auto(); dap_worker_add_events_socket(l_ev_socket,l_http_pvt->worker); return l_http_pvt; - }else{ + } + else{ char l_errbuf[128]; l_errbuf[0] = '\0'; strerror_r(l_err, l_errbuf, sizeof (l_errbuf)); @@ -364,6 +381,7 @@ void* dap_client_http_request_custom(dap_worker_t * a_worker,const char *a_uplin dap_events_socket_delete_unsafe( l_ev_socket, true); return NULL; } +#endif } /** @@ -382,49 +400,42 @@ static void s_http_connected(dap_events_socket_t * a_esocket) // add to dap_worker //dap_client_pvt_t * l_client_pvt = (dap_client_pvt_t*) a_obj; //dap_events_new(); - dap_string_t *l_request_headers = dap_string_new(NULL); + char l_request_headers[1024] = { '\0' }; + int l_offset = 0; + size_t l_offset2 = sizeof(l_request_headers); if(l_http_pvt->request && (dap_strcmp(l_http_pvt->method, "POST") == 0 || dap_strcmp(l_http_pvt->method, "POST_ENC") == 0)) { - char l_buf[1024]; //log_it(L_DEBUG, "POST request with %u bytes of decoded data", a_request_size); - if(l_http_pvt->request_content_type) { - dap_snprintf(l_buf, sizeof(l_buf), "Content-Type: %s\r\n", l_http_pvt->request_content_type); - l_request_headers = dap_string_append(l_request_headers, l_buf); - } + l_offset += l_http_pvt->request_content_type + ? dap_snprintf(l_request_headers, l_offset2, "Content-Type: %s\r\n", l_http_pvt->request_content_type) + : 0; // Add custom headers - if(l_http_pvt->request_custom_headers) { - for( size_t i = 0; i < l_http_pvt->request_custom_headers_count; i++) { - l_request_headers = dap_string_append(l_request_headers, l_http_pvt->request_custom_headers[i]); - l_request_headers = dap_string_append(l_request_headers, "\r\n"); - } - - } + l_offset += l_http_pvt->request_custom_headers + ? dap_snprintf(l_request_headers + l_offset, l_offset2 -= l_offset, "%s", l_http_pvt->request_custom_headers) + : 0; // Setup cookie header - if(l_http_pvt->cookie) { - dap_snprintf(l_buf, sizeof(l_buf), "Cookie: %s\r\n", l_http_pvt->cookie); - l_request_headers = dap_string_append(l_request_headers, l_buf); - } + l_offset += l_http_pvt->cookie + ? dap_snprintf(l_request_headers + l_offset, l_offset2 -= l_offset, "Cookie: %s\r\n", l_http_pvt->cookie) + : 0; // Set request size as Content-Length header - dap_snprintf(l_buf, sizeof(l_buf), "Content-Length: %lu\r\n", l_http_pvt->request_size); - l_request_headers = dap_string_append(l_request_headers, l_buf); + l_offset += dap_snprintf(l_request_headers + l_offset, l_offset2 -= l_offset, "Content-Length: %lu\r\n", l_http_pvt->request_size); } // adding string for GET request - char *l_get_str = NULL; + char l_get_str[l_http_pvt->request_size + 2]; + memset(l_get_str, 0, sizeof(l_get_str)); if(!dap_strcmp(l_http_pvt->method, "GET")) { - char l_buf[1024]; - dap_snprintf(l_buf, sizeof(l_buf), "User-Agent: Mozilla\r\n"); // We hide our request and mask them as possible - if(l_http_pvt->cookie) { - dap_snprintf(l_buf, sizeof(l_buf), "Cookie: %s\r\n", l_http_pvt->cookie); - l_request_headers = dap_string_append(l_request_headers, l_buf); - } + // We hide our request and mask them as possible + l_offset += dap_snprintf(l_request_headers + l_offset, l_offset2 -= l_offset, "User-Agent: Mozilla\r\n"); + l_offset += l_http_pvt->cookie + ? dap_snprintf(l_request_headers + l_offset, l_offset2 -= l_offset, "Cookie: %s\r\n", l_http_pvt->cookie) + : 0; - if(l_http_pvt->request) - l_get_str = dap_strdup_printf("?%s", l_http_pvt->request); + l_offset = l_http_pvt->request ? dap_snprintf(l_get_str, sizeof(l_get_str), "?%s", l_http_pvt->request) : 0; } // send header @@ -432,14 +443,11 @@ static void s_http_connected(dap_events_socket_t * a_esocket) "Host: %s\r\n" "%s" "\r\n", - l_http_pvt->method, l_http_pvt->path, l_get_str ? l_get_str : "", l_http_pvt->uplink_addr, l_request_headers->str); + l_http_pvt->method, l_http_pvt->path, strlen(l_get_str) ? l_get_str : "", l_http_pvt->uplink_addr, l_request_headers); // send data for POST request - if(l_get_str) - DAP_DELETE(l_get_str); - else if ( l_http_pvt->request_size) + if (l_http_pvt->request_size) { dap_events_socket_write_unsafe( a_esocket, l_http_pvt->request, l_http_pvt->request_size); - dap_string_free(l_request_headers, true); - + } } @@ -464,14 +472,7 @@ void* dap_client_http_request(dap_worker_t * a_worker,const char *a_uplink_addr, char * a_cookie, dap_client_http_callback_data_t a_response_callback, dap_client_http_callback_error_t a_error_callback, void *a_obj, void * a_custom) { - char *a_custom_new[1]; - size_t a_custom_count = 0; - // use no more then one custom item only - a_custom_new[0] = (char*) a_custom; - if(a_custom) - a_custom_count = 1; - return dap_client_http_request_custom(a_worker, a_uplink_addr, a_uplink_port, a_method, a_request_content_type, a_path, a_request, a_request_size, a_cookie, a_response_callback, a_error_callback, a_obj, - a_custom_new, a_custom_count); + (char*)a_custom); } diff --git a/dap-sdk/net/client/dap_client_pvt.c b/dap-sdk/net/client/dap_client_pvt.c index 43c40404c619fdf1fca519af657be595874af737..b0f1072c9f6b6e259f22a31fc0960d3844e61ad5 100644 --- a/dap-sdk/net/client/dap_client_pvt.c +++ b/dap-sdk/net/client/dap_client_pvt.c @@ -169,7 +169,7 @@ void dap_client_pvt_delete(dap_client_pvt_t * a_client_pvt) if(a_client_pvt->stream_key) dap_enc_key_delete(a_client_pvt->stream_key); - DAP_DELETE(a_client_pvt); + DAP_DEL_Z(a_client_pvt) } /** @@ -236,21 +236,23 @@ static bool s_stage_status_after(dap_client_pvt_t * a_client_pvt) l_sign_size = dap_sign_get_size(l_sign); } uint8_t l_data[l_key_size + l_sign_size]; + memset(l_data, 0, sizeof(l_data)); memcpy(l_data,a_client_pvt->session_key_open->pub_key_data, l_key_size); if (l_sign) { memcpy(l_data + l_key_size, l_sign, l_sign_size); } size_t l_data_str_size_max = DAP_ENC_BASE64_ENCODE_SIZE(l_key_size + l_sign_size); char l_data_str[l_data_str_size_max + 1]; + memset(l_data_str, 0, sizeof(l_data_str)); // DAP_ENC_DATA_TYPE_B64_URLSAFE not need because send it by POST request size_t l_data_str_enc_size = dap_enc_base64_encode(l_data, l_key_size + l_sign_size, l_data_str, DAP_ENC_DATA_TYPE_B64); log_it(L_DEBUG, "ENC request size %u", l_data_str_enc_size); - char * l_enc_init_url = dap_strdup_printf(DAP_UPLINK_PATH_ENC_INIT - "/gd4y5yh78w42aaagh" "?enc_type=%d,pkey_exchange_type=%d,pkey_exchange_size=%zd", - a_client_pvt->session_key_type, a_client_pvt->session_key_open_type, - l_key_size ); - + char l_enc_init_url[1024] = { '\0' }; + dap_snprintf(l_enc_init_url, sizeof(l_enc_init_url), DAP_UPLINK_PATH_ENC_INIT + "/gd4y5yh78w42aaagh" "?enc_type=%d,pkey_exchange_type=%d,pkey_exchange_size=%zd", + a_client_pvt->session_key_type, a_client_pvt->session_key_open_type, + l_key_size ); int l_res = dap_client_pvt_request(a_client_pvt, l_enc_init_url, l_data_str, l_data_str_enc_size, s_enc_init_response, s_enc_init_error); // bad request @@ -290,13 +292,31 @@ static bool s_stage_status_after(dap_client_pvt_t * a_client_pvt) case STAGE_STREAM_SESSION: { log_it(L_INFO, "Go to stage STREAM_SESSION: process the state ops"); - a_client_pvt->stream_socket = socket( PF_INET, SOCK_STREAM, 0); - + a_client_pvt->stream_socket = socket(PF_INET, SOCK_STREAM, 0); +#ifdef DAP_OS_WINDOWS + if (a_client_pvt->stream_socket == INVALID_SOCKET) { + log_it(L_ERROR, "Socket create error %d", WSAGetLastError()); +#else if (a_client_pvt->stream_socket == -1) { log_it(L_ERROR, "Error %d with socket create", errno); +#endif a_client_pvt->stage_status = STAGE_STATUS_ERROR; break; } +#ifdef DAP_OS_WINDOWS + u_long l_socket_flags = 0; + if (ioctlsocket(a_client_pvt->stream_socket, (long)FIONBIO, &l_socket_flags) == SOCKET_ERROR) { + log_it(L_ERROR, "Can't set socket %d to nonblocking mode, error %d", a_client_pvt->stream_socket, WSAGetLastError()); + } + int buffsize = 0x40000; + int optsize = sizeof( int ); + if (setsockopt(a_client_pvt->stream_socket, SOL_SOCKET, SO_SNDBUF, (char *)&buffsize, &optsize ) < 0) { + log_it(L_ERROR, "Cant' set send buf size on socket %d, error %d", a_client_pvt->stream_socket, WSAGetLastError()); + } + if (setsockopt(a_client_pvt->stream_socket, SOL_SOCKET, SO_RCVBUF, (char *)&buffsize, &optsize ) < 0) { + log_it(L_ERROR, "Cant' set recv buf size on socket %d, error %d", a_client_pvt->stream_socket, WSAGetLastError()); + } +#else // Get socket flags int l_socket_flags = fcntl(a_client_pvt->stream_socket, F_GETFL); if (l_socket_flags == -1){ @@ -306,16 +326,8 @@ static bool s_stage_status_after(dap_client_pvt_t * a_client_pvt) // Make it non-block if (fcntl( a_client_pvt->stream_socket, F_SETFL,l_socket_flags| O_NONBLOCK) == -1){ log_it(L_ERROR, "Error %d can't get socket flags", errno); - break;; - } -#ifdef _WIN32 - { - int buffsize = 65536*4; - int optsize = sizeof( int ); - setsockopt(a_client_pvt->stream_socket, SOL_SOCKET, SO_SNDBUF, (char *)&buffsize, &optsize ); - setsockopt(a_client_pvt->stream_socket, SOL_SOCKET, SO_RCVBUF, (char *)&buffsize, &optsize ); + break; } -#else int buffsize = 65536*4; setsockopt(a_client_pvt->stream_socket, SOL_SOCKET, SO_SNDBUF, ( void *) &buffsize, sizeof(int)); setsockopt(a_client_pvt->stream_socket, SOL_SOCKET, SO_RCVBUF, ( void *) &buffsize, sizeof(int)); @@ -330,9 +342,9 @@ static bool s_stage_status_after(dap_client_pvt_t * a_client_pvt) .connected_callback = s_stream_es_callback_connected };// a_client_pvt->stream_es = dap_events_socket_wrap_no_add(a_client_pvt->events, - a_client_pvt->stream_socket, &l_s_callbacks); + (int)a_client_pvt->stream_socket, &l_s_callbacks); a_client_pvt->stream_es->flags |= DAP_SOCK_CONNECTING ; // To catch non-blocking error when connecting we should ar WRITE flag - + a_client_pvt->stream_es->flags |= DAP_SOCK_READY_TO_WRITE; a_client_pvt->stream_es->_inheritor = a_client_pvt; a_client_pvt->stream = dap_stream_new_es_client(a_client_pvt->stream_es); assert(a_client_pvt->stream); @@ -346,6 +358,7 @@ static bool s_stage_status_after(dap_client_pvt_t * a_client_pvt) // connect memset(&a_client_pvt->stream_es->remote_addr, 0, sizeof(a_client_pvt->stream_es->remote_addr)); + a_client_pvt->stream_es->remote_addr_str6 = NULL; //DAP_NEW_Z_SIZE(char, INET6_ADDRSTRLEN); a_client_pvt->stream_es->remote_addr.sin_family = AF_INET; a_client_pvt->stream_es->remote_addr.sin_port = htons(a_client_pvt->uplink_port); if(inet_pton(AF_INET, a_client_pvt->uplink_addr, &(a_client_pvt->stream_es->remote_addr.sin_addr)) < 0) { @@ -356,8 +369,7 @@ static bool s_stage_status_after(dap_client_pvt_t * a_client_pvt) } else { int l_err = 0; - strncpy( a_client_pvt->stream_es->remote_addr_str, a_client_pvt->uplink_addr, - sizeof (a_client_pvt->stream_es->remote_addr_str)-1 ); + a_client_pvt->stream_es->remote_addr_str = dap_strdup(a_client_pvt->uplink_addr); if((l_err = connect(a_client_pvt->stream_socket, (struct sockaddr *) &a_client_pvt->stream_es->remote_addr, sizeof(struct sockaddr_in))) ==0) { @@ -374,7 +386,11 @@ static bool s_stage_status_after(dap_client_pvt_t * a_client_pvt) strncpy(l_errbuf,"Unknown Error",sizeof(l_errbuf)-1); log_it(L_ERROR, "Remote address can't connect (%s:%u) with sock_id %d: \"%s\" (code %d)", a_client_pvt->uplink_addr, a_client_pvt->uplink_port, l_errbuf, l_err); +#ifdef DAP_OS_WINDOWS + closesocket(a_client_pvt->stream_socket); +#else close(a_client_pvt->stream_socket); +#endif a_client_pvt->stream_socket = 0; a_client_pvt->stage_status = STAGE_STATUS_ERROR; a_client_pvt->last_error = ERROR_STREAM_CONNECT ; @@ -588,17 +604,17 @@ void dap_client_pvt_request_enc(dap_client_pvt_t * a_client_internal, const char // l_url_size = strlen(l_url); size_t l_sub_url_enc_size_max = l_sub_url_size ? (5 * l_sub_url_size + 16) : 0; - char *l_sub_url_enc = l_sub_url_size ? DAP_NEW_Z_SIZE(char, l_sub_url_enc_size_max + 1) : NULL; + char *l_sub_url_enc = l_sub_url_size ? DAP_NEW_S_SIZE(char, l_sub_url_enc_size_max + 1) : NULL; size_t l_query_enc_size_max = (is_query_enc) ? (l_query_size * 5 + 16) : l_query_size; char *l_query_enc = - (is_query_enc) ? (l_query_size ? DAP_NEW_Z_SIZE(char, l_query_enc_size_max + 1) : NULL) : (char*) a_query; + (is_query_enc) ? (l_query_size ? DAP_NEW_S_SIZE(char, l_query_enc_size_max + 1) : NULL) : (char*) a_query; // size_t l_url_full_size_max = 5 * l_sub_url_size + 5 * l_query_size + 16 + l_url_size + 2; // char * l_url_full = DAP_NEW_Z_SIZE(char, l_url_full_size_max + 1); size_t l_request_enc_size_max = a_request_size ? a_request_size * 2 + 16 : 0; - char * l_request_enc = a_request_size ? DAP_NEW_Z_SIZE(char, l_request_enc_size_max + 1) : NULL; + char * l_request_enc = a_request_size ? DAP_NEW_S_SIZE(char, l_request_enc_size_max + 1) : NULL; size_t l_request_enc_size = 0; a_client_internal->request_response_callback = a_response_proc; @@ -647,48 +663,32 @@ void dap_client_pvt_request_enc(dap_client_pvt_t * a_client_internal, const char snprintf(l_url_full, l_url_full_size_max, "%s", l_url); } */ - char *l_path = NULL; + int l_off; + char *l_path = DAP_NEW_S_SIZE(char, l_query_enc_size_max + l_sub_url_enc_size_max + 1); if(a_path) { - if(l_sub_url_size) { - if(l_query_size) { - l_path = dap_strdup_printf("%s/%s?%s", a_path, l_sub_url_enc, l_query_enc); - - } else { - l_path = dap_strdup_printf("%s/%s", a_path, l_sub_url_enc); - } + if(l_sub_url_size) + { + l_off = l_query_size + ? dap_snprintf(l_path, l_query_enc_size_max + l_sub_url_enc_size_max + 1, "%s/%s?%s", a_path, l_sub_url_enc, l_query_enc) + : dap_snprintf(l_path, l_sub_url_enc_size_max + 1, "%s/%s", a_path, l_sub_url_enc); } else { - l_path = dap_strdup(a_path); + dap_stpcpy(l_path, a_path); } } - size_t l_custom_count = 1; - char **l_custom_new = DAP_NEW_Z_SIZE(char*,2*sizeof (char*)); - l_custom_new[0] = dap_strdup_printf("KeyID: %s", a_client_internal->session_key_id ? - a_client_internal->session_key_id : "NULL"); - // close session - if(a_client_internal->is_close_session) { - l_custom_new[1] = dap_strdup("SessionCloseAfterRequest: true"); - l_custom_count++; - } + size_t size_required = a_client_internal->session_key_id ? strlen(a_client_internal->session_key_id) + 40 : 40; + char *l_custom = DAP_NEW_S_SIZE(char, size_required); + size_t l_off2 = size_required; + + l_off = dap_snprintf(l_custom, l_off2, "KeyID: %s\r\n", a_client_internal->session_key_id ? a_client_internal->session_key_id : "NULL"); + l_off += a_client_internal->is_close_session + ? dap_snprintf(l_custom + l_off, l_off2 -= l_off, "%s\r\n", "SessionCloseAfterRequest: true") + : 0; + a_client_internal->refs_count++; dap_client_http_request_custom(a_client_internal->worker, a_client_internal->uplink_addr, a_client_internal->uplink_port, a_request ? "POST" : "GET", "text/text", l_path, l_request_enc, l_request_enc_size, NULL, - s_request_response, s_request_error, a_client_internal, l_custom_new, l_custom_count); -// dap_http_client_simple_request_custom(l_url_full, a_request ? "POST" : "GET", "text/text", -// l_request_enc, l_request_enc_size, NULL, -// m_request_response, a_client_internal->curl_sockfd ,m_request_error, a_client_internal, a_custom_new, a_custom_count); - - if(l_sub_url_enc) - DAP_DELETE(l_sub_url_enc); - - if(is_query_enc && l_query_enc) - DAP_DELETE(l_query_enc); - -// if(l_url_full) -// DAP_DELETE(l_url_full); - - if(l_request_enc) - DAP_DELETE(l_request_enc); + s_request_response, s_request_error, a_client_internal, l_custom); } /** @@ -1090,6 +1090,8 @@ static void s_stream_es_callback_delete(dap_events_socket_t *a_es, void *arg) } } dap_stream_delete(l_client_pvt->stream); + DAP_DEL_Z(l_client_pvt->stream_es->remote_addr_str) + DAP_DEL_Z(l_client_pvt->stream_es->remote_addr_str6) l_client_pvt->stream = NULL; l_client_pvt->stream_es = NULL; } @@ -1118,7 +1120,7 @@ static void s_stream_es_callback_read(dap_events_socket_t * a_es, void * arg) l_pos_endl = (char*) memchr(a_es->buf_in, '\r', a_es->buf_in_size - 1); if(l_pos_endl) { if(*(l_pos_endl + 1) == '\n') { - dap_events_socket_shrink_buf_in(a_es, l_pos_endl - a_es->buf_in_str); + dap_events_socket_shrink_buf_in(a_es, l_pos_endl - (char*)a_es->buf_in); log_it(L_DEBUG, "Header passed, go to streaming (%lu bytes already are in input buffer", a_es->buf_in_size); diff --git a/dap-sdk/net/client/include/dap_client_http.h b/dap-sdk/net/client/include/dap_client_http.h index f158cd04c4108a8a4e15691ab2f4d7e8f400bbd8..f980ebdbd9cc5b36f0c3e83b36dbe5c1b4bb13cc 100644 --- a/dap-sdk/net/client/include/dap_client_http.h +++ b/dap-sdk/net/client/include/dap_client_http.h @@ -34,7 +34,7 @@ typedef void (*dap_client_http_callback_data_t)(void *, size_t, void *); // Call void* dap_client_http_request_custom(dap_worker_t * a_worker, const char *a_uplink_addr, uint16_t a_uplink_port, const char *a_method, const char *a_request_content_type, const char * a_path, void *a_request, size_t a_request_size, char *a_cookie, dap_client_http_callback_data_t a_response_callback, dap_client_http_callback_error_t a_error_callback, - void *a_obj, char **a_custom, size_t a_custom_count); + void *a_obj, char *a_custom); void* dap_client_http_request(dap_worker_t * a_worker,const char *a_uplink_addr, uint16_t a_uplink_port, const char * a_method, const char* a_request_content_type, const char * a_path, void *a_request, size_t a_request_size, diff --git a/dap-sdk/net/client/include/dap_client_pvt.h b/dap-sdk/net/client/include/dap_client_pvt.h index 7a4cf011a3dd3bc025b7de98d1f4b11939c300c9..e4e390e4b27b9125d089068ed6e3a543c5ad86c2 100644 --- a/dap-sdk/net/client/include/dap_client_pvt.h +++ b/dap-sdk/net/client/include/dap_client_pvt.h @@ -39,7 +39,11 @@ typedef struct dap_client_internal dap_http_client_t * http_client; dap_events_socket_t * stream_es; +#ifdef DAP_OS_WINDOWS + SOCKET stream_socket; +#else int stream_socket; +#endif dap_stream_t* stream; dap_stream_worker_t* stream_worker; dap_worker_t * worker; diff --git a/dap-sdk/net/core/CMakeLists.txt b/dap-sdk/net/core/CMakeLists.txt index 3b31208beeade39e3e94b7321b44d516a82c694d..efb66ea9107a8a712c077b885b15ff44d27310f6 100644 --- a/dap-sdk/net/core/CMakeLists.txt +++ b/dap-sdk/net/core/CMakeLists.txt @@ -16,6 +16,7 @@ endif() if(WIN32) include_directories(../../../3rdparty/uthash/src/) include_directories(../../../3rdparty/wepoll/) + include_directories(../../../modules/net/win32) endif() add_library(${PROJECT_NAME} STATIC ${DAP_SERVER_CORE_HEADERS} ${DAP_SERVER_CORE_SOURCES}) diff --git a/dap-sdk/net/core/dap_events.c b/dap-sdk/net/core/dap_events.c index 73494a615cd9de01bf285b013348031781209941..38fbb6f96251d90a4b15a0a35d81f8e385622976 100644 --- a/dap-sdk/net/core/dap_events.c +++ b/dap-sdk/net/core/dap_events.c @@ -109,7 +109,7 @@ uint32_t dap_get_cpu_count( ) void dap_cpu_assign_thread_on(uint32_t a_cpu_id) { -#ifndef _WIN32 +#ifndef DAP_OS_WINDOWS #ifndef NO_POSIX_SHED cpu_set_t mask; CPU_ZERO(&mask); @@ -250,10 +250,15 @@ int dap_events_start( dap_events_t *a_events ) pthread_mutex_init(& l_worker->started_mutex, NULL); pthread_cond_init( & l_worker->started_cond, NULL); //log_it(L_DEBUG, "Created event_fd %d for worker %u", l_worker->epoll_fd,i); +#ifdef DAP_OS_WINDOWS + if (!l_worker->epoll_fd) { + int l_errno = WSAGetLastError(); +#else if ( l_worker->epoll_fd == -1 ) { int l_errno = errno; +#endif char l_errbuf[128]; - strerror_r(l_errno, l_errbuf, sizeof ( l_errbuf) ); + strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); log_it(L_CRITICAL, "Error create epoll fd: %s (%d)", l_errbuf, l_errno); DAP_DELETE(l_worker); return -1; diff --git a/dap-sdk/net/core/dap_events_socket.c b/dap-sdk/net/core/dap_events_socket.c index b753cc8918820e6569fbfa1e3a75eb3771626dcc..0923759b374c03e11ab64073e0bc58af03109688 100644 --- a/dap-sdk/net/core/dap_events_socket.c +++ b/dap-sdk/net/core/dap_events_socket.c @@ -108,6 +108,16 @@ void dap_events_socket_deinit( ) { } +#ifdef DAP_OS_WINDOWS +void __stdcall mq_receive_cb(HRESULT hr, QUEUEHANDLE qh, DWORD timeout + , DWORD action, MQMSGPROPS *pmsgprops, LPOVERLAPPED pov, HANDLE cursor) { + switch (hr) { + case MQ_OK: + SetEvent(pov->hEvent); + break; + } +} +#endif /** * @brief dap_events_socket_wrap @@ -129,7 +139,9 @@ dap_events_socket_t *dap_events_socket_wrap_no_add( dap_events_t *a_events, ret->events = a_events; memcpy(&ret->callbacks, a_callbacks, sizeof(ret->callbacks) ); ret->flags = DAP_SOCK_READY_TO_READ; - + ret->buf_in = a_callbacks->timer_callback ? NULL : DAP_NEW_Z_SIZE(byte_t, DAP_EVENTS_SOCKET_BUF + 1); + ret->buf_out = a_callbacks->timer_callback ? NULL : DAP_NEW_Z_SIZE(byte_t, DAP_EVENTS_SOCKET_BUF + 1); + ret->buf_in_size = ret->buf_out_size = 0; #if defined(DAP_EVENTS_CAPS_EPOLL) ret->ev_base_flags = EPOLLERR | EPOLLRDHUP | EPOLLHUP; #elif defined(DAP_EVENTS_CAPS_POLL) @@ -138,7 +150,12 @@ dap_events_socket_t *dap_events_socket_wrap_no_add( dap_events_t *a_events, if ( a_sock!= 0 && a_sock != -1){ pthread_rwlock_wrlock(&a_events->sockets_rwlock); - HASH_ADD(hh,a_events->sockets, socket, sizeof (int), ret); +#ifdef DAP_OS_WINDOWS + log_it(L_WARNING, "Hash add 0x%x", ret); + HASH_ADD(hh,a_events->sockets, socket, sizeof(SOCKET), ret); +#else + HASH_ADD_INT(hh,a_events->sockets, socket, ret); +#endif pthread_rwlock_unlock(&a_events->sockets_rwlock); }else log_it(L_WARNING, "Be carefull, you've wrapped socket 0 or -1 so it wasn't added to global list. Do it yourself when possible"); @@ -205,6 +222,9 @@ void dap_events_socket_reassign_between_workers_mt(dap_worker_t * a_worker_old, */ dap_events_socket_t * s_create_type_pipe(dap_worker_t * a_w, dap_events_socket_callback_t a_callback, uint32_t a_flags) { +#ifdef DAP_OS_WINDOWS + return NULL; +#else UNUSED(a_flags); dap_events_socket_t * l_es = DAP_NEW_Z(dap_events_socket_t); l_es->type = DESCRIPTOR_TYPE_PIPE; @@ -226,12 +246,8 @@ dap_events_socket_t * s_create_type_pipe(dap_worker_t * a_w, dap_events_socket_c l_errbuf[0]=0; if( pipe(l_pipe) < 0 ){ l_errno = errno; -#if defined DAP_OS_UNIX strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); log_it( L_ERROR, "Error detected, can't create pipe(): '%s' (%d)", l_errbuf, l_errno); -#elif defined DAP_OS_WINDOWS - log_it( L_ERROR, "Can't create pipe, errno: %d", l_errno); -#endif DAP_DELETE(l_es); return NULL; }//else @@ -248,6 +264,7 @@ dap_events_socket_t * s_create_type_pipe(dap_worker_t * a_w, dap_events_socket_c #error "No defined s_create_type_pipe() for your platform" #endif return l_es; +#endif } /** @@ -304,6 +321,8 @@ dap_events_socket_t * dap_events_socket_queue_ptr_create_input(dap_events_socket { dap_events_socket_t * l_es = DAP_NEW_Z(dap_events_socket_t); l_es->type = DESCRIPTOR_TYPE_QUEUE; + l_es->buf_out = DAP_NEW_Z_SIZE(byte_t, 8 * sizeof(void*)); + //l_es->buf_out_size = 8 * sizeof(void*); l_es->events = a_es->events; #if defined(DAP_EVENTS_CAPS_EPOLL) l_es->ev_base_flags = EPOLLERR | EPOLLRDHUP | EPOLLHUP; @@ -312,8 +331,6 @@ dap_events_socket_t * dap_events_socket_queue_ptr_create_input(dap_events_socket #else #error "Not defined s_create_type_pipe for your platform" #endif - - l_es->type = DESCRIPTOR_TYPE_QUEUE; #ifdef DAP_EVENTS_CAPS_QUEUE_MQUEUE l_es->mqd = a_es->mqd; char l_mq_name[64]; @@ -342,6 +359,36 @@ dap_events_socket_t * dap_events_socket_queue_ptr_create_input(dap_events_socket assert(l_es->mqd); #elif defined (DAP_EVENTS_CAPS_QUEUE_PIPE2) l_es->fd = a_es->fd2; +#elif defined DAP_EVENTS_CAPS_MSMQ + l_es->mqh = a_es->mqh; + l_es->mqh_recv = a_es->mqh_recv; + + l_es->socket = a_es->socket; + + WCHAR l_direct_name[MQ_MAX_Q_NAME_LEN + 1] = { 0 }; + size_t l_sz_in_words = sizeof(l_direct_name)/sizeof(l_direct_name[0]); + int pos = _snwprintf_s(l_direct_name, l_sz_in_words, l_sz_in_words - 1, L"DIRECT=OS:.\\PRIVATE$\\DapEventSocketQueue%d", l_es->socket); + if (pos < 0) { + log_it(L_ERROR, "Message queue path error"); + DAP_DELETE(l_es); + return NULL; + } + + HRESULT hr = MQOpenQueue(l_direct_name, MQ_SEND_ACCESS, MQ_DENY_NONE, &(l_es->mqh)); + if (hr == MQ_ERROR_QUEUE_NOT_FOUND) { + log_it(L_INFO, "Queue still not created, wait a bit..."); + Sleep(300); + hr = MQOpenQueue(l_direct_name, MQ_SEND_ACCESS, MQ_DENY_NONE, &(l_es->mqh)); + if (hr != MQ_OK) { + log_it(L_ERROR, "Can't open message queue for queue type, error: 0x%x", hr); + return NULL; + } + } + hr = MQOpenQueue(l_direct_name, MQ_RECEIVE_ACCESS, MQ_DENY_NONE, &(l_es->mqh_recv)); + if (hr != MQ_OK) { + log_it(L_ERROR, "Can't open message queue for queue type, error: 0x%x", hr); + return NULL; + } #else #error "Not defined dap_events_socket_queue_ptr_create_input() for this platform" #endif @@ -372,6 +419,7 @@ dap_events_socket_t * s_create_type_queue_ptr(dap_worker_t * a_w, dap_events_soc l_es->worker = a_w; } l_es->callbacks.queue_ptr_callback = a_callback; // Arm event callback + l_es->buf_out = NULL; #if defined(DAP_EVENTS_CAPS_EPOLL) l_es->ev_base_flags = EPOLLIN | EPOLLERR | EPOLLRDHUP | EPOLLHUP; @@ -439,6 +487,97 @@ dap_events_socket_t * s_create_type_queue_ptr(dap_worker_t * a_w, dap_events_soc l_mq_last_number++; } assert(l_es->mqd); +#elif defined DAP_EVENTS_CAPS_MSMQ + l_es->socket = socket(AF_INET, SOCK_DGRAM, 0); + + if (l_es->socket == INVALID_SOCKET) { + log_it(L_ERROR, "Error creating socket for TYPE_QUEUE: %d", WSAGetLastError()); + DAP_DELETE(l_es); + return NULL; + } + + int buffsize = 1024; + setsockopt(l_es->socket, SOL_SOCKET, SO_RCVBUF, (char *)&buffsize, sizeof(int)); + + int reuse = 1; + if (setsockopt(l_es->socket, SOL_SOCKET, SO_REUSEADDR, (const char*)&reuse, sizeof(reuse)) < 0) + log_it(L_WARNING, "Can't set up REUSEADDR flag to the socket, err: %d", WSAGetLastError()); + + unsigned long l_mode = 0; + ioctlsocket(l_es->socket, FIONBIO, &l_mode); + + int l_addr_len; + struct sockaddr_in l_addr; + l_addr.sin_family = AF_INET; + IN_ADDR _in_addr = { { .S_addr = htonl(INADDR_LOOPBACK) } }; + l_addr.sin_addr = _in_addr; + l_addr.sin_port = l_es->socket + 32768; + l_addr_len = sizeof(struct sockaddr_in); + + if (bind(l_es->socket, (struct sockaddr*)&l_addr, sizeof(l_addr)) < 0) { + log_it(L_ERROR, "Bind error: %d", WSAGetLastError()); + } else { + log_it(L_INFO, "Binded %d", l_es->socket); + } + + MQQUEUEPROPS l_qps; + MQPROPVARIANT l_qp_var[1]; + QUEUEPROPID l_qp_id[1]; + HRESULT l_q_status[1]; + + WCHAR l_pathname[MAX_PATH] = { 0 }; + size_t l_sz_in_words = sizeof(l_pathname)/sizeof(l_pathname[0]); + int pos = _snwprintf_s(l_pathname, l_sz_in_words, l_sz_in_words - 1, L".\\PRIVATE$\\DapEventSocketQueue%d", l_es->socket); + if (pos < 0) { + log_it(L_ERROR, "Message queue path error"); + DAP_DELETE(l_es); + return NULL; + } + u_long l_p_id = 0; + l_qp_id[l_p_id] = PROPID_Q_PATHNAME; + l_qp_var[l_p_id].vt = VT_LPWSTR; + l_qp_var[l_p_id].pwszVal = l_pathname; + l_p_id++; + + l_qps.cProp = l_p_id; + l_qps.aPropID = l_qp_id; + l_qps.aPropVar = l_qp_var; + l_qps.aStatus = l_q_status; + + WCHAR l_direct_name[MQ_MAX_Q_NAME_LEN + 1] = { 0 }; + WCHAR l_format_name[sizeof(l_direct_name) - 10] = { 0 }; + DWORD l_buflen = sizeof(l_format_name); + HRESULT hr = MQCreateQueue(NULL, &l_qps, l_format_name, &l_buflen); + if ((hr != MQ_OK) && (hr != MQ_ERROR_QUEUE_EXISTS) && (hr != MQ_INFORMATION_PROPERTY)) { + log_it(L_ERROR, "Can't create message queue for queue type, error: 0x%x", hr); + DAP_DELETE(l_es); + return NULL; + } + + wcsncpy(l_direct_name, L"DIRECT=OS:", 10); + wcscat_s(l_direct_name, l_buflen, l_pathname); + + hr = MQOpenQueue(l_direct_name, MQ_SEND_ACCESS, MQ_DENY_NONE, &(l_es->mqh)); + if (hr == MQ_ERROR_QUEUE_NOT_FOUND) { + log_it(L_INFO, "Queue still not created, wait a bit..."); + Sleep(300); + hr = MQOpenQueue(l_direct_name, MQ_SEND_ACCESS, MQ_DENY_NONE, &(l_es->mqh)); + if (hr != MQ_OK) { + log_it(L_ERROR, "Can't open message queue for queue type, error: 0x%x", hr); + DAP_DELETE(l_es); + MQDeleteQueue(l_format_name); + return NULL; + } + } + hr = MQOpenQueue(l_direct_name, MQ_RECEIVE_ACCESS, MQ_DENY_NONE, &(l_es->mqh_recv)); + if (hr != MQ_OK) { + log_it(L_ERROR, "Can't open message queue for queue type, error: 0x%x", hr); + DAP_DELETE(l_es); + MQCloseQueue(l_es->mqh); + MQDeleteQueue(l_format_name); + return NULL; + } + #else #error "Not implemented s_create_type_queue_ptr() on your platform" #endif @@ -474,8 +613,14 @@ dap_events_socket_t * dap_events_socket_create_type_queue_ptr_unsafe(dap_worker_ dap_events_socket_t * l_es = s_create_type_queue_ptr(a_w, a_callback); assert(l_es); // If no worker - don't assign - if ( a_w) - dap_worker_add_events_socket_unsafe(l_es,a_w); + if ( a_w) { + if(dap_worker_add_events_socket_unsafe(l_es,a_w)) { +#ifdef DAP_OS_WINDOWS + errno = WSAGetLastError(); +#endif + log_it(L_ERROR, "Can't add esocket %d to polling, err %d", l_es->socket, errno); + } + } return l_es; } @@ -506,12 +651,58 @@ int dap_events_socket_queue_proc_input_unsafe(dap_events_socket_t * a_esocket) return -1; } a_esocket->callbacks.queue_ptr_callback (a_esocket, l_queue_ptr); +#elif defined DAP_EVENTS_CAPS_MSMQ + DWORD l_mp_id = 0; + MQMSGPROPS l_mps; + MQPROPVARIANT l_mpvar[2]; + MSGPROPID l_p_id[2]; + + UCHAR l_body[1024] = { 0 }; + l_p_id[l_mp_id] = PROPID_M_BODY; + l_mpvar[l_mp_id].vt = VT_UI1 | VT_VECTOR; + l_mpvar[l_mp_id].caub.cElems = sizeof(l_body); + l_mpvar[l_mp_id].caub.pElems = l_body; + l_mp_id++; + + l_p_id[l_mp_id] = PROPID_M_BODY_SIZE; + l_mpvar[l_mp_id].vt = VT_UI4; + l_mp_id++; + + l_mps.cProp = l_mp_id; + l_mps.aPropID = l_p_id; + l_mps.aPropVar = l_mpvar; + l_mps.aStatus = NULL; + + HRESULT hr = MQReceiveMessage(a_esocket->mqh_recv, 1000, MQ_ACTION_RECEIVE, &l_mps, NULL, NULL, NULL, MQ_NO_TRANSACTION); + if (hr != MQ_OK) { + log_it(L_ERROR, "An error 0x%x occured receiving a message from queue", hr); + return -1; + } + if (l_mpvar[1].ulVal % sizeof(void*)) { + log_it(L_ERROR, "Queue message size incorrect: %d", l_mpvar[1].ulVal); + if (l_mpvar[1].ulVal < sizeof(void*)) { + log_it(L_ERROR, "Queue socket %d received invalid data", a_esocket->socket); + return -1; + } + } + for (u_int pad = 0; pad < l_mpvar[1].ulVal; pad += sizeof(void*)) { + memcpy(&l_queue_ptr, l_body + pad, sizeof(void*)); + a_esocket->callbacks.queue_ptr_callback (a_esocket, l_queue_ptr); + } #else #error "No Queue fetch mechanism implemented on your platform" #endif - }else{ + } else { +#ifdef DAP_OS_WINDOWS + int l_read = dap_recvfrom(a_esocket->socket, a_esocket->buf_in, DAP_EVENTS_SOCKET_BUF); + if (l_read == SOCKET_ERROR) { + log_it(L_ERROR, "Queue socket %d received invalid data, error %d", a_esocket->socket, WSAGetLastError()); + return -1; + } +#else size_t l_read = read(a_esocket->socket, a_esocket->buf_in,sizeof(a_esocket->buf_in)); - a_esocket->callbacks.queue_callback(a_esocket,a_esocket->buf_in,l_read ); +#endif + a_esocket->callbacks.queue_callback(a_esocket, a_esocket->buf_in, l_read); } }else{ log_it(L_ERROR, "Queue socket %d accepted data but callback is NULL ", a_esocket->socket); @@ -529,6 +720,8 @@ int dap_events_socket_queue_proc_input_unsafe(dap_events_socket_t * a_esocket) dap_events_socket_t * s_create_type_event(dap_worker_t * a_w, dap_events_socket_callback_event_t a_callback) { dap_events_socket_t * l_es = DAP_NEW_Z(dap_events_socket_t); if (!l_es) return NULL; + l_es->buf_out = DAP_NEW_Z_SIZE(byte_t, 1); + l_es->buf_out_size = 1; l_es->type = DESCRIPTOR_TYPE_EVENT; if (a_w){ l_es->events = a_w->events; @@ -564,15 +757,39 @@ dap_events_socket_t * s_create_type_event(dap_worker_t * a_w, dap_events_socket_ //log_it(L_DEBUG, "Created eventfd descriptor %d", l_es->fd ); } #elif defined DAP_OS_WINDOWS - int l_pipe[2]; - if (pipe(l_pipe) < 0) { - log_it(L_ERROR, "Can't create pipe for event type, error: %d", errno); + + + l_es->socket = socket(AF_INET, SOCK_DGRAM, 0); + + if (l_es->socket == INVALID_SOCKET) { + log_it(L_ERROR, "Error creating socket for TYPE_QUEUE: %d", WSAGetLastError()); DAP_DELETE(l_es); return NULL; } - l_es->fd2 = l_pipe[0]; - l_es->fd = l_pipe[1]; - log_it(L_DEBUG, "Created pipe for event type, %d -> %d", l_es->fd2, l_es->fd); + + int buffsize = 1024; + setsockopt(l_es->socket, SOL_SOCKET, SO_RCVBUF, (char *)&buffsize, sizeof(int)); + + unsigned long l_mode = 0; + ioctlsocket(l_es->socket, FIONBIO, &l_mode); + + int reuse = 1; + if (setsockopt(l_es->socket, SOL_SOCKET, SO_REUSEADDR, (const char*)&reuse, sizeof(reuse)) < 0) + log_it(L_WARNING, "Can't set up REUSEADDR flag to the socket, err: %d", WSAGetLastError()); + + int l_addr_len; + struct sockaddr_in l_addr; + l_addr.sin_family = AF_INET; + IN_ADDR _in_addr = { { .S_addr = htonl(INADDR_LOOPBACK) } }; + l_addr.sin_addr = _in_addr; + l_addr.sin_port = l_es->socket + 32768; + l_addr_len = sizeof(struct sockaddr_in); + + if (bind(l_es->socket, (struct sockaddr*)&l_addr, sizeof(l_addr)) < 0) { + log_it(L_ERROR, "Bind error: %d", WSAGetLastError()); + } else { + log_it(L_INFO, "Binded %d", l_es->socket); + } #endif return l_es; } @@ -627,22 +844,24 @@ void dap_events_socket_event_proc_input_unsafe(dap_events_socket_t *a_esocket) }else return; // do nothing #elif defined DAP_OS_WINDOWS - uint64_t l_value; + u_short l_value; int l_ret; - switch (l_ret = read(a_esocket->fd, &l_value, 8)) { - case -1: - log_it(L_CRITICAL, "Can't read from event socket pipe, error: %d", errno); + switch (l_ret = dap_recvfrom(a_esocket->socket, a_esocket->buf_in, a_esocket->buf_in_size)) { + case SOCKET_ERROR: + log_it(L_CRITICAL, "Can't read from event socket, error: %d", WSAGetLastError()); break; case 0: return; default: + l_value = a_esocket->buf_out[0]; + log_it(L_INFO, "Proc input event %d, val %d", a_esocket->socket, l_value); a_esocket->callbacks.event_callback(a_esocket, l_value); - break; + return; } #else #error "No Queue fetch mechanism implemented on your platform" #endif - }else + } else log_it(L_ERROR, "Event socket %d accepted data but callback is NULL ", a_esocket->socket); } @@ -703,7 +922,7 @@ static int wait_send_socket(int a_sockfd, long timeout_ms) * @param arg * @return */ -void *dap_events_socket_buf_thread(void *arg) +static void *dap_events_socket_buf_thread(void *arg) { dap_events_socket_buf_item_t *l_item = (dap_events_socket_buf_item_t*) arg; if(!l_item) { @@ -713,7 +932,12 @@ void *dap_events_socket_buf_thread(void *arg) int l_count = 0; while(l_res < 1 && l_count < 3) { // wait max 5 min +#ifdef DAP_OS_WINDOWS + log_it(L_INFO, "Wait 5 minutes"); + l_res = wait_send_socket(l_item->es->socket, 300000); +#else l_res = wait_send_socket(l_item->es->fd2, 300000); +#endif if (l_res == 0){ dap_events_socket_queue_ptr_send(l_item->es, l_item->arg); break; @@ -745,8 +969,14 @@ static void add_ptr_to_buf(dap_events_socket_t * a_es, void* a_arg) int dap_events_socket_queue_ptr_send_to_input(dap_events_socket_t * a_es_input, void * a_arg) { volatile void * l_arg = a_arg; - int ret= dap_events_socket_write_unsafe(a_es_input,&l_arg,sizeof (l_arg) )==sizeof (l_arg)?0:1 ; - return ret; + if (a_es_input->buf_out_size >= sizeof(void*)) { + if (memcmp(a_es_input->buf_out + a_es_input->buf_out_size - sizeof(void*), a_arg, sizeof(void*))) { + log_it(L_INFO, "Ptr 0x%x already present in input, drop it", a_arg); + return 2; + } + } + return dap_events_socket_write_unsafe(a_es_input, &l_arg, sizeof(l_arg)) + == sizeof(l_arg) ? 0 : 1; } /** @@ -772,6 +1002,49 @@ int dap_events_socket_queue_ptr_send( dap_events_socket_t * a_es, void* a_arg) l_errno = EAGAIN; if (l_ret == 0) l_ret = sizeof (a_arg); +#elif defined (DAP_EVENTS_CAPS_QUEUE_POSIX) + struct timespec l_timeout; + clock_gettime(CLOCK_REALTIME, &l_timeout); + l_timeout.tv_sec+=2; // Not wait more than 1 second to get and 2 to send + int ret = mq_timedsend(a_es->mqd, (const char *)&a_arg,sizeof (a_arg),0, &l_timeout ); + int l_errno = errno; + if (ret == sizeof(a_arg) ) + return 0; + else + return l_errno; +#elif defined DAP_EVENTS_CAPS_MSMQ + + char *pbuf = (char *)&a_arg; + + DWORD l_mp_id = 0; + MQMSGPROPS l_mps; + MQPROPVARIANT l_mpvar[1]; + MSGPROPID l_p_id[1]; + HRESULT l_mstatus[1]; + + l_p_id[l_mp_id] = PROPID_M_BODY; + l_mpvar[l_mp_id].vt = VT_VECTOR | VT_UI1; + l_mpvar[l_mp_id].caub.pElems = (unsigned char*)(pbuf); + l_mpvar[l_mp_id].caub.cElems = sizeof(void*); + l_mp_id++; + + l_mps.cProp = l_mp_id; + l_mps.aPropID = l_p_id; + l_mps.aPropVar = l_mpvar; + l_mps.aStatus = l_mstatus; + HRESULT hr = MQSendMessage(a_es->mqh, &l_mps, MQ_NO_TRANSACTION); + + if (hr != MQ_OK) { + log_it(L_ERROR, "An error occured on sending message to queue, errno: 0x%x", hr); + return hr; + } + + if(dap_sendto(a_es->socket, NULL, 0) == SOCKET_ERROR) { + return WSAGetLastError(); + } else { + return 0; + } + #else #error "Not implemented dap_events_socket_queue_ptr_send() for this platform" #endif @@ -809,9 +1082,9 @@ int dap_events_socket_event_signal( dap_events_socket_t * a_es, uint64_t a_value else return 1; #elif defined DAP_OS_WINDOWS - byte_t l_bytes[sizeof(void*)] = { 0 }; - if(write(a_es->fd2, l_bytes, sizeof(l_bytes)) == -1) { - return errno; + a_es->buf_out[0] = (u_short)a_value; + if(dap_sendto(a_es->socket, a_es->buf_out, sizeof(uint64_t)) == SOCKET_ERROR) { + return WSAGetLastError(); } else { return 0; } @@ -855,7 +1128,8 @@ dap_events_socket_t * dap_events_socket_wrap2( dap_server_t *a_server, struct da ret->server = a_server; memcpy(&ret->callbacks,a_callbacks, sizeof ( ret->callbacks) ); - + ret->buf_in = ret->buf_out = a_callbacks->timer_callback ? NULL : DAP_NEW_Z_SIZE(byte_t, DAP_EVENTS_SOCKET_BUF+1); + ret->buf_in_size = ret->buf_out_size = 0; ret->flags = DAP_SOCK_READY_TO_READ; ret->last_time_active = ret->last_ping_request = time( NULL ); @@ -880,7 +1154,11 @@ dap_events_socket_t *dap_events_socket_find_unsafe( int sock, struct dap_events if(!a_events) return NULL; if(a_events->sockets) +#ifdef DAP_OS_WINDOWS + HASH_FIND(hh, a_events->sockets, &sock, sizeof(SOCKET), ret ); +#else HASH_FIND_INT( a_events->sockets, &sock, ret ); +#endif return ret; } @@ -901,7 +1179,11 @@ void dap_events_socket_worker_poll_update_unsafe(dap_events_socket_t * a_esocket if( a_esocket->worker){ if ( epoll_ctl(a_esocket->worker->epoll_fd, EPOLL_CTL_MOD, a_esocket->socket, &a_esocket->ev) ){ +#ifdef DAP_OS_WINDOWS + int l_errno = WSAGetLastError(); +#else int l_errno = errno; +#endif char l_errbuf[128]; l_errbuf[0]=0; strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); @@ -960,11 +1242,20 @@ void dap_events_socket_set_writable_unsafe( dap_events_socket_t *a_esocket, bool return; } - if ( a_is_ready ) + if ( a_is_ready ) { a_esocket->flags |= DAP_SOCK_READY_TO_WRITE; - else +#ifdef DAP_OS_WINDOWS + if (a_esocket->type == DESCRIPTOR_TYPE_QUEUE) + a_esocket->flags |= EPOLLONESHOT; +#endif + } + else { a_esocket->flags ^= DAP_SOCK_READY_TO_WRITE; - +#ifdef DAP_OS_WINDOWS + if (a_esocket->type == DESCRIPTOR_TYPE_QUEUE) + a_esocket->flags ^= EPOLLONESHOT; +#endif + } if( a_esocket->worker ) dap_events_socket_worker_poll_update_unsafe(a_esocket); } @@ -1010,25 +1301,27 @@ void dap_events_socket_delete_unsafe( dap_events_socket_t * a_esocket , bool a_p pthread_rwlock_unlock( &a_esocket->events->sockets_rwlock ); } - if ( a_esocket->_inheritor && !a_preserve_inheritor ) - DAP_DELETE( a_esocket->_inheritor ); - if (a_esocket->_pvt) - DAP_DELETE(a_esocket->_pvt); + if (!a_preserve_inheritor ) + DAP_DEL_Z(a_esocket->_inheritor) - if ( a_esocket->socket && a_esocket->socket != -1) { - #ifdef _WIN32 + DAP_DEL_Z(a_esocket->_pvt) + DAP_DEL_Z(a_esocket->buf_in) + DAP_DEL_Z(a_esocket->buf_out) +#ifdef DAP_OS_WINDOWS + if ( a_esocket->socket && a_esocket->socket != SOCKET_ERROR) { closesocket( a_esocket->socket ); - #else +#else + if ( a_esocket->socket && a_esocket->socket != -1) { close( a_esocket->socket ); - #ifdef DAP_EVENTS_CAPS_QUEUE_PIPE2 +#ifdef DAP_EVENTS_CAPS_QUEUE_PIPE2 if( a_esocket->type == DESCRIPTOR_TYPE_QUEUE){ close( a_esocket->fd2); } - #endif +#endif - #endif +#endif } - DAP_DELETE( a_esocket ); + DAP_DEL_Z( a_esocket ) } /** @@ -1062,8 +1355,11 @@ void dap_events_socket_remove_from_worker_unsafe( dap_events_socket_t *a_es, dap #endif a_worker->event_sockets_count--; - if(a_worker->esockets) + if(a_worker->esockets) { + pthread_rwlock_wrlock(&a_worker->esocket_rwlock); HASH_DELETE(hh_worker,a_worker->esockets, a_es); + pthread_rwlock_unlock(&a_worker->esocket_rwlock); + } a_es->worker = NULL; } @@ -1078,7 +1374,9 @@ bool dap_events_socket_check_unsafe(dap_worker_t * a_worker,dap_events_socket_t if (a_es){ if ( a_worker->esockets){ dap_events_socket_t * l_es = NULL; + pthread_rwlock_rdlock(&a_worker->esocket_rwlock); HASH_FIND(hh_worker,a_worker->esockets,&a_es, sizeof(a_es), l_es ); + pthread_rwlock_unlock(&a_worker->esocket_rwlock); return l_es == a_es; }else return false; @@ -1278,14 +1576,14 @@ size_t dap_events_socket_write_f_mt(dap_worker_t * a_w,dap_events_socket_t *a_es */ size_t dap_events_socket_write_unsafe(dap_events_socket_t *sc, const void * data, size_t data_size) { - if(sc->buf_out_size>sizeof(sc->buf_out)){ - log_it(L_DEBUG,"write buffer already overflow size=%u/max=%u", sc->buf_out_size, sizeof(sc->buf_out)); + if(sc->buf_out_size > DAP_EVENTS_SOCKET_BUF){ + log_it(L_DEBUG,"write buffer already overflow size=%u/max=%u", sc->buf_out_size, DAP_EVENTS_SOCKET_BUF); return 0; } - //log_it(L_DEBUG,"dap_events_socket_write %u sock data %X size %u", sc->socket, data, data_size ); - data_size = ((sc->buf_out_size+data_size)<(sizeof(sc->buf_out)))?data_size:(sizeof(sc->buf_out)-sc->buf_out_size ); - memcpy(sc->buf_out+sc->buf_out_size,data,data_size); - sc->buf_out_size+=data_size; + //log_it(L_DEBUG,"dap_events_socket_write %u sock data %X size %u", sc->socket, data, data_size ); + data_size = (sc->buf_out_size + data_size < DAP_EVENTS_SOCKET_BUF) ? data_size : (DAP_EVENTS_SOCKET_BUF - sc->buf_out_size); + memcpy(sc->buf_out + sc->buf_out_size, data, data_size); + sc->buf_out_size += data_size; dap_events_socket_set_writable_unsafe(sc, true); return data_size; } @@ -1300,18 +1598,18 @@ size_t dap_events_socket_write_f_unsafe(dap_events_socket_t *sc, const char * fo { //log_it(L_DEBUG,"dap_events_socket_write_f %u sock", sc->socket ); - size_t max_data_size = sizeof(sc->buf_out)-sc->buf_out_size; + size_t max_data_size = DAP_EVENTS_SOCKET_BUF - sc->buf_out_size; va_list ap; - va_start(ap,format); - int ret=dap_vsnprintf((char*) sc->buf_out+sc->buf_out_size,max_data_size,format,ap); + va_start(ap, format); + int ret=dap_vsnprintf((char*)sc->buf_out + sc->buf_out_size, max_data_size, format, ap); va_end(ap); - if(ret>0){ - sc->buf_out_size+=ret; - }else{ - log_it(L_ERROR,"Can't write out formatted data '%s'",format); + if(ret > 0) { + sc->buf_out_size += (unsigned int)ret; + } else { + log_it(L_ERROR,"Can't write out formatted data '%s'", format); } dap_events_socket_set_writable_unsafe(sc, true); - return (ret > 0) ? ret : 0; + return (ret > 0) ? (unsigned int)ret : 0; } /** @@ -1358,3 +1656,37 @@ void dap_events_socket_shrink_buf_in(dap_events_socket_t * cl, size_t shrink_siz } } + +#ifdef DAP_OS_WINDOWS +int dap_recvfrom(SOCKET s, void* buf_in, size_t buf_size) { + struct sockaddr_in l_dummy; + socklen_t l_size = sizeof(l_dummy); + int ret; + if (buf_in) { + memset(buf_in, 0, buf_size); + ret = recvfrom(s, (char*)buf_in, (long)buf_size, 0, (struct sockaddr *)&l_dummy, &l_size); + } else { + char l_tempbuf[sizeof(void*)]; + ret = recvfrom(s, l_tempbuf, sizeof(l_tempbuf), 0, (struct sockaddr *)&l_dummy, &l_size); + } + return ret; +} + +int dap_sendto(SOCKET s, void* buf_out, size_t buf_out_size) { + int l_addr_len; + struct sockaddr_in l_addr; + l_addr.sin_family = AF_INET; + IN_ADDR _in_addr = { { .S_addr = htonl(INADDR_LOOPBACK) } }; + l_addr.sin_addr = _in_addr; + l_addr.sin_port = s + 32768; + l_addr_len = sizeof(struct sockaddr_in); + int ret; + if (buf_out) { + ret = sendto(s, (char*)buf_out, (long)buf_out_size, MSG_DONTWAIT | MSG_NOSIGNAL, (struct sockaddr *)&l_addr, l_addr_len); + } else { + char l_bytes[sizeof(void*)] = { 0 }; + ret = sendto(s, l_bytes, sizeof(l_bytes), MSG_DONTWAIT | MSG_NOSIGNAL, (struct sockaddr *)&l_addr, l_addr_len); + } + return ret; +} +#endif diff --git a/dap-sdk/net/core/dap_proc_queue.c b/dap-sdk/net/core/dap_proc_queue.c index f932bc6c14dc46c9a66b629df1785375ed066eae..e04a8cfdd122d309b8a8b133ed42ef11b1485bc6 100644 --- a/dap-sdk/net/core/dap_proc_queue.c +++ b/dap-sdk/net/core/dap_proc_queue.c @@ -71,13 +71,13 @@ static void s_queue_esocket_callback( dap_events_socket_t * a_es, void * a_msg) dap_proc_queue_msg_t * l_msg = (dap_proc_queue_msg_t*) a_msg; assert(l_msg); // We have callback to add in list - if (l_msg->callback){ + if (l_msg->callback) { dap_proc_queue_item_t * l_item = DAP_NEW_Z(dap_proc_queue_item_t); if (! l_item) return; l_item->callback = l_msg->callback; l_item->callback_arg = l_msg->callback_arg; if ( l_queue->item_last) - l_queue->item_last->prev = l_item; + l_queue->item_last = l_item; l_item->next=l_queue->item_last ; @@ -86,7 +86,6 @@ static void s_queue_esocket_callback( dap_events_socket_t * a_es, void * a_msg) if( !l_queue->item_first) l_queue->item_first = l_item; - // Add on top so after call this callback will be executed first dap_events_socket_event_signal(l_queue->proc_thread->proc_event,1); //log_it( L_DEBUG, "Sent signal to proc thread that we have callback %p/%p on board", l_msg->callback,l_msg->callback_arg); @@ -94,7 +93,7 @@ static void s_queue_esocket_callback( dap_events_socket_t * a_es, void * a_msg) if (l_msg->signal_kill){ // Say to kill this object and delete its inherior dap_proc_queue_t a_es->flags |= DAP_SOCK_SIGNAL_CLOSE; } - DAP_DELETE(l_msg); + DAP_DEL_Z(l_msg) } /** diff --git a/dap-sdk/net/core/dap_proc_thread.c b/dap-sdk/net/core/dap_proc_thread.c index d675b3544ec1abc412fb6854715eb433bd129743..028f1aec4e54ce6984c9818af13cac322b902e32 100644 --- a/dap-sdk/net/core/dap_proc_thread.c +++ b/dap-sdk/net/core/dap_proc_thread.c @@ -24,8 +24,11 @@ #include <assert.h> #include "dap_server.h" -#if defined(DAP_EVENTS_CAPS_EPOLL) +#if defined(DAP_EVENTS_CAPS_EPOLL) && !defined(DAP_OS_WINDOWS) #include <sys/epoll.h> +#elif defined DAP_OS_WINDOWS +#include "wepoll.h" +#include "ws2tcpip.h" #elif defined (DAP_EVENTS_CAPS_POLL) #include <poll.h> #else @@ -167,14 +170,25 @@ static void s_proc_event_callback(dap_events_socket_t * a_esocket, uint64_t a_va static int s_update_poll_flags(dap_proc_thread_t * a_thread, dap_events_socket_t * a_esocket) { #ifdef DAP_EVENTS_CAPS_EPOLL - l_ev.events = a_esocket->ev_base_events; - if( a_esocket->flags & DAP_SOCK_READY_TO_READ) - l_ev.events |= EPOLLIN; - if( a_esocket->flags & DAP_SOCK_READY_TO_WRITE) - l_ev.events |= EPOLLOUT; - l_ev.data.ptr = a_esocket ; - if( epoll_ctl(a_thread->epoll_ctl, EPOLL_CTL_MOD, a_esocket->fd , &l_ev) != 0 ){ - log_it(L_CRITICAL, "Can't add proc queue on epoll ctl"); + u_int events = a_esocket->ev_base_flags; + if( a_esocket->flags & DAP_SOCK_READY_TO_READ) { + events |= EPOLLIN; +#ifdef DAP_OS_WINDOWS + events ^= EPOLLONESHOT; +#endif + } + if( a_esocket->flags & DAP_SOCK_READY_TO_WRITE) { + events |= EPOLLOUT; +#ifdef DAP_OS_WINDOWS + events |= EPOLLONESHOT; +#endif + } + a_esocket->ev.events = events; + if( epoll_ctl(a_thread->epoll_ctl, EPOLL_CTL_MOD, a_esocket->socket, &a_esocket->ev) != 0 ){ +#ifdef DAP_OS_WINDOWS + errno = WSAGetLastError(); +#endif + log_it(L_CRITICAL, "Can't add proc queue on epoll ctl, err: %d", errno); return -1; } #elif defined (DAP_EVENTS_CAPS_POLL) @@ -195,7 +209,12 @@ static void * s_proc_thread_function(void * a_arg) dap_cpu_assign_thread_on(l_thread->cpu_id); struct sched_param l_shed_params; l_shed_params.sched_priority = 0; +#ifdef DAP_OS_WINDOWS + if (!SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_HIGHEST)) + log_it(L_ERROR, "Couldn't set thread priority, err: %d", GetLastError()); +#else pthread_setschedparam(pthread_self(),SCHED_BATCH ,&l_shed_params); +#endif l_thread->proc_queue = dap_proc_queue_create(l_thread); // Init proc_queue for related worker @@ -221,40 +240,51 @@ static void * s_proc_thread_function(void * a_arg) l_thread->queue_io_input[n] = dap_events_socket_queue_ptr_create_input(l_worker->queue_es_io ); } #ifdef DAP_EVENTS_CAPS_EPOLL - struct epoll_event *l_epoll_events = l_thread->epoll_events, l_ev; - memset(l_thread->epoll_events, 0,sizeof (l_thread->epoll_events)); + struct epoll_event l_epoll_events[ DAP_EVENTS_SOCKET_MAX]= { { 0 } }; // Create epoll ctl l_thread->epoll_ctl = epoll_create( DAP_EVENTS_SOCKET_MAX ); // add proc queue - l_ev.events = l_thread->proc_queue->esocket->ev_base_flags; - l_ev.data.ptr = l_thread->proc_queue->esocket; - if( epoll_ctl(l_thread->epoll_ctl, EPOLL_CTL_ADD, l_thread->proc_queue->esocket->socket , &l_ev) != 0 ){ - log_it(L_CRITICAL, "Can't add proc queue on epoll ctl"); + l_thread->proc_queue->esocket->ev.events = l_thread->proc_queue->esocket->ev_base_flags; + l_thread->proc_queue->esocket->ev.data.ptr = l_thread->proc_queue->esocket; + if( epoll_ctl(l_thread->epoll_ctl, EPOLL_CTL_ADD, l_thread->proc_queue->esocket->socket , &l_thread->proc_queue->esocket->ev) != 0 ){ +#ifdef DAP_OS_WINDOWS + errno = WSAGetLastError(); +#endif + log_it(L_CRITICAL, "Can't add proc queue %d on epoll ctl, error", l_thread->proc_queue->esocket->socket, errno); return NULL; } // Add proc event - l_ev.events = l_thread->proc_event->ev_base_flags ; - l_ev.data.ptr = l_thread->proc_event; - if( epoll_ctl(l_thread->epoll_ctl, EPOLL_CTL_ADD, l_thread->proc_event->fd , &l_ev) != 0 ){ - log_it(L_CRITICAL, "Can't add proc queue on epoll ctl"); + l_thread->proc_event->ev.events = l_thread->proc_event->ev_base_flags ; + l_thread->proc_event->ev.data.ptr = l_thread->proc_event; + if( epoll_ctl(l_thread->epoll_ctl, EPOLL_CTL_ADD, l_thread->proc_event->socket , &l_thread->proc_event->ev) != 0 ){ +#ifdef DAP_OS_WINDOWS + errno = WSAGetLastError(); +#endif + log_it(L_CRITICAL, "Can't add proc event on epoll ctl, err: %d", errno); return NULL; } for (size_t n = 0; n< dap_events_worker_get_count(); n++){ - l_ev.events = l_thread->queue_assign_input[n]->ev_base_flags ; - l_ev.data.ptr = l_thread->queue_assign_input[n]; - if( epoll_ctl(l_thread->epoll_ctl, EPOLL_CTL_ADD, l_thread->queue_assign_input[n]->fd , &l_ev) != 0 ){ - log_it(L_CRITICAL, "Can't add proc queue on epoll ctl"); + l_thread->queue_assign_input[n]->ev.events = l_thread->queue_assign_input[n]->ev_base_flags ; + l_thread->queue_assign_input[n]->ev.data.ptr = l_thread->queue_assign_input[n]; + if( epoll_ctl(l_thread->epoll_ctl, EPOLL_CTL_ADD, l_thread->queue_assign_input[n]->socket, &l_thread->queue_assign_input[n]->ev) != 0 ){ +#ifdef DAP_OS_WINDOWS + errno = WSAGetLastError(); +#endif + log_it(L_CRITICAL, "Can't add queue input on epoll ctl, err: %d", errno); return NULL; } - l_ev.events = l_thread->queue_io_input[n]->ev_base_flags ; - l_ev.data.ptr = l_thread->queue_io_input[n]; - if( epoll_ctl(l_thread->epoll_ctl, EPOLL_CTL_ADD, l_thread->queue_io_input[n]->fd , &l_ev) != 0 ){ - log_it(L_CRITICAL, "Can't add proc queue on epoll ctl"); + l_thread->queue_io_input[n]->ev.events = l_thread->queue_io_input[n]->ev_base_flags ; + l_thread->queue_io_input[n]->ev.data.ptr = l_thread->queue_io_input[n]; + if( epoll_ctl(l_thread->epoll_ctl, EPOLL_CTL_ADD, l_thread->queue_io_input[n]->fd , &l_thread->queue_io_input[n]->ev) != 0 ){ +#ifdef DAP_OS_WINDOWS + errno = WSAGetLastError(); +#endif + log_it(L_CRITICAL, "Can't add proc io input on epoll ctl, err: %d", errno); return NULL; } } @@ -371,8 +401,8 @@ static void * s_proc_thread_function(void * a_arg) continue; } if(s_debug_reactor) - log_it(L_DEBUG, "Poc thread #%u esocket %p fd=%d flags=0x%0X (%s:%s:%s:%s:%s:%s:%s:%s)", l_thread->cpu_id, l_cur, l_cur->socket, - l_cur_events, l_flag_read?"read":"", l_flag_write?"write":"", l_flag_error?"error":"", + log_it(L_DEBUG, "Proc thread #%u esocket %p fd=%d type=%d flags=0x%0X (%s:%s:%s:%s:%s:%s:%s:%s)", l_thread->cpu_id, l_cur, l_cur->socket, + l_cur->type, l_cur_events, l_flag_read?"read":"", l_flag_write?"write":"", l_flag_error?"error":"", l_flag_hup?"hup":"", l_flag_rdhup?"rdhup":"", l_flag_msg?"msg":"", l_flag_nval?"nval":"", l_flag_pri?"pri":""); //log_it(L_DEBUG,"Waked up esocket %p (socket %d) {read:%s,write:%s,error:%s} ", l_cur, l_cur->fd, @@ -380,21 +410,25 @@ static void * s_proc_thread_function(void * a_arg) time_t l_cur_time = time( NULL); l_cur->last_time_active = l_cur_time; if (l_flag_error){ -#if defined DAP_OS_UNIX +#ifdef DAP_OS_WINDOWS + int l_errno = WSAGetLastError(); +#else int l_errno = errno; +#endif char l_errbuf[128]; strerror_r(l_errno, l_errbuf,sizeof (l_errbuf)); log_it(L_ERROR,"Some error on proc thread #%u with %d socket: %s(%d)",l_thread->cpu_id, l_cur->socket, l_errbuf, l_errno); -#elif defined DAP_OS_WINDOWS - log_it(L_ERROR,"Some error occured on thread #%u with socket %d, errno: %d",l_thread->cpu_id, l_cur->socket, errno); -#endif if(l_cur->callbacks.error_callback) l_cur->callbacks.error_callback(l_cur, errno); } if (l_flag_read ){ + int32_t l_bytes_read = 0; switch (l_cur->type) { case DESCRIPTOR_TYPE_QUEUE: dap_events_socket_queue_proc_input_unsafe(l_cur); +#ifdef DAP_OS_WINDOWS + l_bytes_read = dap_recvfrom(l_cur->socket, NULL, 0); +#endif break; case DESCRIPTOR_TYPE_EVENT: dap_events_socket_event_proc_input_unsafe (l_cur); @@ -402,6 +436,9 @@ static void * s_proc_thread_function(void * a_arg) default: log_it(L_ERROR, "Unprocessed descriptor type accepted in proc thread loop"); +#ifdef DAP_OS_WINDOWS + l_bytes_read = dap_recvfrom(l_cur->socket, NULL, 0); +#endif break; } } @@ -414,6 +451,39 @@ static void * s_proc_thread_function(void * a_arg) if (l_cur->flags & DAP_SOCK_QUEUE_PTR){ #if defined(DAP_EVENTS_CAPS_QUEUE_PIPE2) l_bytes_sent = write(l_cur->socket, l_cur->buf_out, sizeof (void *) ); // We send pointer by pointer + #elif defined DAP_EVENTS_CAPS_QUEUE_POSIX + l_bytes_sent = mq_send(a_es->mqd, (const char *) l_cur->buf_out, sizeof (void *),0); + #elif defined DAP_EVENTS_CAPS_MSMQ + DWORD l_mp_id = 0; + MQMSGPROPS l_mps; + MQPROPVARIANT l_mpvar[1]; + MSGPROPID l_p_id[1]; + HRESULT l_mstatus[1]; + + l_p_id[l_mp_id] = PROPID_M_BODY; + l_mpvar[l_mp_id].vt = VT_VECTOR | VT_UI1; + l_mpvar[l_mp_id].caub.pElems = l_cur->buf_out; + l_mpvar[l_mp_id].caub.cElems = (u_long)l_cur->buf_out_size; + l_mp_id++; + + l_mps.cProp = l_mp_id; + l_mps.aPropID = l_p_id; + l_mps.aPropVar = l_mpvar; + l_mps.aStatus = l_mstatus; + HRESULT hr = MQSendMessage(l_cur->mqh, &l_mps, MQ_NO_TRANSACTION); + + if (hr != MQ_OK) { + log_it(L_ERROR, "An error occured on sending message to queue, errno: 0x%x", hr); + break; + } else { + if(dap_sendto(l_cur->socket, NULL, 0) == SOCKET_ERROR) { + log_it(L_ERROR, "Write to sock error: %d", WSAGetLastError()); + } + l_cur->buf_out_size = 0; + dap_events_socket_set_writable_unsafe(l_cur,false); + + break; + } #elif defined (DAP_EVENTS_CAPS_QUEUE_MQUEUE) l_bytes_sent = mq_send(l_cur->mqd, (const char *) l_cur->buf_out, sizeof (void *),0); if (l_bytes_sent==0) @@ -455,7 +525,8 @@ static void * s_proc_thread_function(void * a_arg) } if(l_cur->flags & DAP_SOCK_SIGNAL_CLOSE){ #ifdef DAP_EVENTS_CAPS_EPOLL - if ( epoll_ctl( l_thread->epoll_ctl, EPOLL_CTL_DEL, l_cur->fd, &l_cur->ev ) == -1 ) + log_it(L_WARNING, "Deleting esocket %d from proc thread?...", l_cur->fd); + if ( epoll_ctl( l_thread->epoll_ctl, EPOLL_CTL_DEL, l_cur->fd, &l_cur->ev) == -1 ) log_it( L_ERROR,"Can't remove event socket's handler from the epoll ctl" ); //else // log_it( L_DEBUG,"Removed epoll's event from proc thread #%u", l_thread->cpu_id ); diff --git a/dap-sdk/net/core/dap_server.c b/dap-sdk/net/core/dap_server.c index 2443ee3ae644e95d0d3396e954ed11dec7f61acc..ca9101f7ae3e4342166dfb1159579da3e2c1ea9b 100644 --- a/dap-sdk/net/core/dap_server.c +++ b/dap-sdk/net/core/dap_server.c @@ -21,15 +21,22 @@ along with any DAP SDK based project. If not, see <http://www.gnu.org/licenses/>. */ +#ifdef DAP_OS_WINDOWS +#include "wepoll.h" +#include <ws2tcpip.h> +#elif defined DAP_OS_UNIX #include <arpa/inet.h> #include <netinet/in.h> #include <sys/socket.h> +#include <sys/epoll.h> +#include <netdb.h> +#include <sys/timerfd.h> +#endif + #include <sys/types.h> #include <sys/stat.h> -#include <sys/epoll.h> -#include <netdb.h> #include <unistd.h> #include <fcntl.h> #include <string.h> @@ -41,7 +48,6 @@ #include <stddef.h> #include <errno.h> #include <signal.h> -#include <sys/timerfd.h> #include <utlist.h> #if ! defined(_GNU_SOURCE) #define _GNU_SOURCE @@ -117,9 +123,10 @@ dap_server_t* dap_server_new(dap_events_t *a_events, const char * a_addr, uint16 { assert(a_events); dap_server_t *l_server = DAP_NEW_Z(dap_server_t); - +#ifndef DAP_OS_WINDOWS l_server->socket_listener=-1; // To diff it from 0 fd - l_server->address = a_addr? strdup( a_addr) : strdup("0.0.0.0"); // If NULL we listen everything +#endif + l_server->address = a_addr ? strdup(a_addr) : strdup("0.0.0.0"); // If NULL we listen everything l_server->port = a_port; l_server->type = a_type; @@ -127,10 +134,14 @@ dap_server_t* dap_server_new(dap_events_t *a_events, const char * a_addr, uint16 l_server->socket_listener = socket(AF_INET, SOCK_STREAM, 0); else if (l_server->type == DAP_SERVER_UDP) l_server->socket_listener = socket(AF_INET, SOCK_DGRAM, 0); - +#ifdef DAP_OS_WINDOWS + if (l_server->socket_listener == INVALID_SOCKET) { + log_it(L_ERROR, "Socket error: %d", WSAGetLastError()); +#else if (l_server->socket_listener < 0) { int l_errno = errno; log_it (L_ERROR,"Socket error %s (%d)",strerror(l_errno), l_errno); +#endif DAP_DELETE(l_server); return NULL; } @@ -140,6 +151,7 @@ dap_server_t* dap_server_new(dap_events_t *a_events, const char * a_addr, uint16 if (setsockopt(l_server->socket_listener, SOL_SOCKET, SO_REUSEADDR, (const char*)&reuse, sizeof(reuse)) < 0) log_it(L_WARNING, "Can't set up REUSEADDR flag to the socket"); + reuse=1; #ifdef SO_REUSEPORT if (setsockopt(l_server->socket_listener, SOL_SOCKET, SO_REUSEPORT, (const char*)&reuse, sizeof(reuse)) < 0) log_it(L_WARNING, "Can't set up REUSEPORT flag to the socket"); @@ -150,17 +162,26 @@ dap_server_t* dap_server_new(dap_events_t *a_events, const char * a_addr, uint16 l_server->listener_addr.sin_port = htons(l_server->port); inet_pton(AF_INET, l_server->address, &(l_server->listener_addr.sin_addr)); - if(bind (l_server->socket_listener, (struct sockaddr *) &(l_server->listener_addr), sizeof(l_server->listener_addr)) < 0){ + if(bind (l_server->socket_listener, (struct sockaddr *) &(l_server->listener_addr), sizeof(l_server->listener_addr)) < 0) { +#ifdef DAP_OS_WINDOWS + log_it(L_ERROR,"Bind error: %d", WSAGetLastError()); + closesocket(l_server->socket_listener); +#else log_it(L_ERROR,"Bind error: %s",strerror(errno)); close(l_server->socket_listener); +#endif DAP_DELETE(l_server); return NULL; - }else{ + } else { log_it(L_INFO,"Binded %s:%u",l_server->address,l_server->port); listen(l_server->socket_listener, SOMAXCONN); } - +#ifdef DAP_OS_WINDOWS + u_long l_mode = 0; + ioctlsocket(l_server->socket_listener, (long)FIONBIO, &l_mode); +#else fcntl( l_server->socket_listener, F_SETFL, O_NONBLOCK); +#endif pthread_mutex_init(&l_server->started_mutex,NULL); pthread_cond_init(&l_server->started_cond,NULL); @@ -179,7 +200,7 @@ dap_server_t* dap_server_new(dap_events_t *a_events, const char * a_addr, uint16 } // if we have poll exclusive -#if defined(DAP_EVENTS_CAPS_EPOLL) +#ifdef DAP_EVENTS_CAPS_EPOLL for(size_t l_worker_id = 0; l_worker_id < dap_events_worker_get_count() ; l_worker_id++){ dap_worker_t *l_w = dap_events_worker_get(l_worker_id); assert(l_w); @@ -188,9 +209,10 @@ dap_server_t* dap_server_new(dap_events_t *a_events, const char * a_addr, uint16 if (l_es) { l_es->type = l_server->type == DAP_SERVER_TCP ? DESCRIPTOR_TYPE_SOCKET_LISTENING : DESCRIPTOR_TYPE_SOCKET_UDP; -#ifdef DAP_EVENTS_CAPS_EPOLL // Prepare for multi thread listening - l_es->ev_base_flags = EPOLLET| EPOLLIN | EPOLLEXCLUSIVE; + l_es->ev_base_flags = EPOLLIN; +#ifdef EPOLLEXCLUSIVE + l_es->ev_base_flags |= EPOLLET | EPOLLEXCLUSIVE; #endif l_es->_inheritor = l_server; pthread_mutex_lock(&l_server->started_mutex); @@ -215,7 +237,7 @@ dap_server_t* dap_server_new(dap_events_t *a_events, const char * a_addr, uint16 dap_worker_add_events_socket( l_es, l_w ); pthread_cond_wait(&l_server->started_cond, &l_server->started_mutex); pthread_mutex_unlock(&l_server->started_mutex); - } else{ + } else { log_it(L_WARNING, "Can't wrap event socket for %s:%u server", a_addr, a_port); return NULL; } @@ -271,7 +293,7 @@ static void s_es_server_accept(dap_events_socket_t *a_es, int a_remote_socket, s l_es_new = s_es_server_create(a_es->events,a_remote_socket,&l_server->client_callbacks,l_server); //l_es_new->is_dont_reset_write_flag = true; // By default all income connection has this flag getnameinfo(a_remote_addr,a_remote_addr_size, l_es_new->hostaddr - , sizeof(l_es_new->hostaddr),l_es_new->service,sizeof(l_es_new->service), + ,256, l_es_new->service,sizeof(l_es_new->service), NI_NUMERICHOST | NI_NUMERICSERV); log_it(L_INFO,"Connection accepted from %s (%s)", l_es_new->hostaddr, l_es_new->service ); @@ -298,6 +320,8 @@ static dap_events_socket_t * s_es_server_create(dap_events_t * a_events, int a_s ret = dap_events_socket_wrap_no_add(a_events, a_sock, a_callbacks); ret->type = DESCRIPTOR_TYPE_SOCKET; ret->server = a_server; + ret->hostaddr = DAP_NEW_Z_SIZE(char, 256); + ret->service = DAP_NEW_Z_SIZE(char, 54); } else { log_it(L_CRITICAL,"Accept error: %s",strerror(errno)); diff --git a/dap-sdk/net/core/dap_timerfd.c b/dap-sdk/net/core/dap_timerfd.c index e259e889803c16cc89031ae46411717b55c18131..01b0b706b4030d81a10b8dbb4129f874b441727e 100644 --- a/dap-sdk/net/core/dap_timerfd.c +++ b/dap-sdk/net/core/dap_timerfd.c @@ -69,9 +69,8 @@ void __stdcall TimerAPCb(void* arg, DWORD low, DWORD high) { // Timer high valu UNREFERENCED_PARAMETER(low) UNREFERENCED_PARAMETER(high) dap_timerfd_t *l_timerfd = (dap_timerfd_t *)arg; - byte_t l_bytes[sizeof(void*)] = { 0 }; - if (write(l_timerfd->pipe_in, l_bytes, sizeof(l_bytes)) == -1) { - log_it(L_CRITICAL, "Error occured on writing into pipe from APC, errno: %d", errno); + if (dap_sendto(l_timerfd->tfd, NULL, 0) == SOCKET_ERROR) { + log_it(L_CRITICAL, "Error occured on writing into socket from APC, errno: %d", WSAGetLastError()); } } #endif @@ -115,14 +114,29 @@ dap_timerfd_t* dap_timerfd_start_on_worker(dap_worker_t * a_worker, uint64_t a_t DAP_DELETE(l_timerfd); return NULL; } - int l_pipe[2]; - if (pipe(l_pipe) < 0) { - log_it(L_ERROR, "Can't create pipe, error: %d", errno); - DAP_DELETE(l_timerfd); - return NULL; - } + + SOCKET l_tfd = socket(AF_INET, SOCK_DGRAM, 0); + int buffsize = 1024; + + setsockopt(l_tfd, SOL_SOCKET, SO_RCVBUF, (char *)&buffsize, sizeof(int)); + unsigned long l_mode = 0; - int l_tfd = l_pipe[0]; + ioctlsocket(l_tfd, FIONBIO, &l_mode); + + int l_addr_len; + struct sockaddr_in l_addr; + l_addr.sin_family = AF_INET; + IN_ADDR _in_addr = { { .S_addr = htonl(INADDR_LOOPBACK) } }; + l_addr.sin_addr = _in_addr; + l_addr.sin_port = l_tfd + 32768; + l_addr_len = sizeof(struct sockaddr_in); + + if (bind(l_tfd, (struct sockaddr*)&l_addr, sizeof(l_addr)) < 0) { + log_it(L_ERROR, "Bind error: %d", WSAGetLastError()); + } else { + log_it(L_INFO, "Binded %d", l_tfd); + } + LARGE_INTEGER l_due_time; l_due_time.QuadPart = (long long)a_timeout_ms * _MSEC; if (!SetWaitableTimer(l_th, &l_due_time, 0, TimerAPCb, l_timerfd, false)) { @@ -151,10 +165,6 @@ dap_timerfd_t* dap_timerfd_start_on_worker(dap_worker_t * a_worker, uint64_t a_t l_timerfd->callback_arg = a_callback_arg; #ifdef DAP_OS_WINDOWS l_timerfd->th = l_th; - l_timerfd->pipe_in = l_pipe[1]; - /*ioctlsocket(l_pipe[0], FIONBIO, &l_mode); - l_mode = 0; - ioctlsocket(l_pipe[1], FIONBIO, &l_mode);*/ #endif dap_worker_add_events_socket(l_events_socket, a_worker); return l_timerfd; @@ -191,8 +201,10 @@ static void s_es_callback_timer(struct dap_events_socket *a_event_sock) #endif dap_events_socket_set_readable_unsafe(a_event_sock, true); } else { +#ifndef DAP_OS_WINDOWS close(l_timerfd->tfd); -#if defined DAP_OS_WINDOWS +#else + closesocket(l_timerfd->tfd); CloseHandle(l_timerfd->th); #endif l_timerfd->events_socket->flags |= DAP_SOCK_SIGNAL_CLOSE; diff --git a/dap-sdk/net/core/dap_worker.c b/dap-sdk/net/core/dap_worker.c index 66d79e3b52e905abbe84a57bc4b372a65c0f8673..92e6c98869a71ccf9630e0de0ad48f231f092e4a 100644 --- a/dap-sdk/net/core/dap_worker.c +++ b/dap-sdk/net/core/dap_worker.c @@ -28,15 +28,19 @@ #endif #include <fcntl.h> #include <sys/types.h> +#ifdef DAP_OS_UNIX #include <sys/socket.h> #include <arpa/inet.h> #include <sys/resource.h> - +#elif defined DAP_OS_WINDOWS +#include <ws2tcpip.h> +#endif #include "dap_common.h" #include "dap_config.h" #include "dap_math_ops.h" #include "dap_worker.h" #include "dap_events.h" +#include "dap_enc_base64.h" #define LOG_TAG "dap_worker" @@ -63,6 +67,9 @@ int dap_worker_init( size_t a_conn_timeout ) if ( a_conn_timeout ) s_connection_timeout = a_conn_timeout; + s_debug_reactor = dap_config_get_item_bool_default(g_config,"general","debug_reactor",false); +#ifdef DAP_OS_UNIX +======= s_debug_reactor =g_config? dap_config_get_item_bool_default(g_config,"general","debug_reactor",false) : false; struct rlimit l_fdlimit; @@ -74,6 +81,7 @@ int dap_worker_init( size_t a_conn_timeout ) if (setrlimit(RLIMIT_NOFILE, &l_fdlimit)) return -2; log_it(L_INFO, "Set maximum opened descriptors from %d to %d", l_oldlimit, l_fdlimit.rlim_cur); +#endif return 0; } @@ -96,7 +104,12 @@ void *dap_worker_thread(void *arg) dap_cpu_assign_thread_on(l_worker->id); struct sched_param l_shed_params; l_shed_params.sched_priority = 0; +#ifdef DAP_OS_WINDOWS + if (!SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_BELOW_NORMAL)) + log_it(L_ERROR, "Couldn'r set thread priority, err: %d", GetLastError()); +#else pthread_setschedparam(pthread_self(),SCHED_FIFO ,&l_shed_params); +#endif #ifdef DAP_EVENTS_CAPS_EPOLL struct epoll_event l_epoll_events[ DAP_EVENTS_SOCKET_MAX]= {{0}}; @@ -109,15 +122,13 @@ void *dap_worker_thread(void *arg) #error "Unimplemented socket array for this platform" #endif - - l_worker->queue_es_new = dap_events_socket_create_type_queue_ptr_unsafe( l_worker, s_queue_add_es_callback); - l_worker->queue_es_delete = dap_events_socket_create_type_queue_ptr_unsafe( l_worker, s_queue_delete_es_callback); - l_worker->queue_es_io = dap_events_socket_create_type_queue_ptr_unsafe( l_worker, s_queue_es_io_callback); - l_worker->queue_es_reassign = dap_events_socket_create_type_queue_ptr_unsafe( l_worker, s_queue_es_reassign_callback ); - l_worker->queue_callback= dap_events_socket_create_type_queue_ptr_unsafe( l_worker, s_queue_callback_callback); - - - l_worker->event_exit = dap_events_socket_create_type_event_unsafe(l_worker, s_event_exit_callback ); + l_worker->queue_es_new = dap_events_socket_create_type_queue_ptr_unsafe(l_worker, s_queue_add_es_callback); + l_worker->queue_es_delete = dap_events_socket_create_type_queue_ptr_unsafe(l_worker, s_queue_delete_es_callback); + l_worker->queue_es_io = dap_events_socket_create_type_queue_ptr_unsafe(l_worker, s_queue_es_io_callback); + l_worker->queue_es_reassign = dap_events_socket_create_type_queue_ptr_unsafe(l_worker, s_queue_es_reassign_callback ); + l_worker->queue_callback = dap_events_socket_create_type_queue_ptr_unsafe(l_worker, s_queue_callback_callback); + l_worker->event_exit = dap_events_socket_create_type_event_unsafe(l_worker, s_event_exit_callback); + l_worker->timer_check_activity = dap_timerfd_start_on_worker( l_worker, s_connection_timeout * 1000 / 2, s_socket_all_check_activity, l_worker); @@ -137,10 +148,14 @@ void *dap_worker_thread(void *arg) if(l_selected_sockets == -1) { if( errno == EINTR) continue; +#ifdef DAP_OS_WINDOWS + log_it(L_ERROR, "Worker thread %d got errno %d", l_worker->id, WSAGetLastError()); +#else int l_errno = errno; char l_errbuf[128]; strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); log_it(L_ERROR, "Worker thread %d got errno:\"%s\" (%d)", l_worker->id, l_errbuf, l_errno); +#endif break; } @@ -150,14 +165,14 @@ void *dap_worker_thread(void *arg) bool l_flag_hup, l_flag_rdhup, l_flag_read, l_flag_write, l_flag_error, l_flag_nval, l_flag_msg, l_flag_pri; #ifdef DAP_EVENTS_CAPS_EPOLL l_cur = (dap_events_socket_t *) l_epoll_events[n].data.ptr; - uint32_t l_cur_events = l_epoll_events[n]; - l_flag_hup = l_epoll_events[n].events & EPOLLHUP; - l_flag_rdhup = l_epoll_events[n].events & EPOLLRDHUP; - l_flag_write = l_epoll_events[n].events & EPOLLOUT; - l_flag_read = l_epoll_events[n].events & EPOLLIN; - l_flag_error = l_epoll_events[n].events & EPOLLERR; - l_flag_pri = l_epoll_events[n].events & EPOLLPRI; - l_flag_nval = false; + uint32_t l_cur_events = l_epoll_events[n].events; + l_flag_hup = l_cur_events & EPOLLHUP; + l_flag_rdhup = l_cur_events & EPOLLRDHUP; + l_flag_write = l_cur_events & EPOLLOUT; + l_flag_read = l_cur_events & EPOLLIN; + l_flag_error = l_cur_events & EPOLLERR; + l_flag_pri = l_cur_events & EPOLLPRI; + l_flag_nval = false; #elif defined ( DAP_EVENTS_CAPS_POLL) short l_cur_events =l_worker->poll[n].revents; if (!l_cur_events) // No events for this socket @@ -179,46 +194,42 @@ void *dap_worker_thread(void *arg) log_it(L_ERROR, "dap_events_socket NULL"); continue; } - if(s_debug_reactor) - log_it(L_DEBUG, "Worker #%u esocket %p fd=%d flags=0x%0X (%s:%s:%s:%s:%s:%s:%s:%s)", l_worker->id, l_cur, l_cur->socket, + if(s_debug_reactor) { + log_it(L_DEBUG, "Worker #%u esocket %p type %d fd=%d flags=0x%0X (%s:%s:%s:%s:%s:%s:%s:%s)", l_worker->id, l_cur, l_cur->type, l_cur->socket, l_cur_events, l_flag_read?"read":"", l_flag_write?"write":"", l_flag_error?"error":"", l_flag_hup?"hup":"", l_flag_rdhup?"rdhup":"", l_flag_msg?"msg":"", l_flag_nval?"nval":"", l_flag_pri?"pri":""); + } int l_sock_err = 0, l_sock_err_size = sizeof(l_sock_err); //connection already closed (EPOLLHUP - shutdown has been made in both directions) - if (l_flag_rdhup){ - switch (l_cur->type ){ - case DESCRIPTOR_TYPE_SOCKET_UDP: - case DESCRIPTOR_TYPE_SOCKET: - dap_events_socket_set_readable_unsafe(l_cur, false); - dap_events_socket_set_writable_unsafe(l_cur, false); - l_cur->buf_out_size = 0; - l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE; - l_flag_error = l_flag_write = false; - break; - default:{} - } - if(s_debug_reactor) - log_it(L_INFO,"RDHUP event on esocket %p (%d) type %d", l_cur, l_cur->socket, l_cur->type ); - } + if( l_flag_hup ) { switch (l_cur->type ){ case DESCRIPTOR_TYPE_SOCKET_UDP: - case DESCRIPTOR_TYPE_SOCKET: - getsockopt(l_cur->socket, SOL_SOCKET, SO_ERROR, (void *)&l_sock_err, (socklen_t *)&l_sock_err_size); - if (l_sock_err) { - dap_events_socket_set_readable_unsafe(l_cur, false); - dap_events_socket_set_writable_unsafe(l_cur, false); - l_cur->buf_out_size = 0; - l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE; - l_flag_error = l_flag_write = false; - l_cur->callbacks.error_callback(l_cur, l_sock_err); // Call callback to process error event - log_it(L_INFO, "Socket shutdown (EPOLLHUP): %s", strerror(l_sock_err)); - } + case DESCRIPTOR_TYPE_SOCKET: { + int l_err = getsockopt(l_cur->socket, SOL_SOCKET, SO_ERROR, (void *)&l_sock_err, (socklen_t *)&l_sock_err_size); +#ifndef DAP_OS_WINDOWS + if (l_sock_err) { + log_it(L_DEBUG, "Socket %d error %d", l_cur->socket, l_sock_err); +#else + if (l_err == SOCKET_ERROR) { + log_it(L_DEBUG, "Socket %d will be shutdown (EPOLLHUP), error %d", l_cur->socket, WSAGetLastError()); +#endif + dap_events_socket_set_readable_unsafe(l_cur, false); + dap_events_socket_set_writable_unsafe(l_cur, false); + l_cur->buf_out_size = 0; + l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE; + l_flag_error = l_flag_write = false; + l_cur->callbacks.error_callback(l_cur, l_sock_err); // Call callback to process error event +#ifndef DAP_OS_WINDOWS + log_it(L_INFO, "Socket shutdown (EPOLLHUP): %s", strerror(l_sock_err)); +#endif + } break; - default: - if(s_debug_reactor) - log_it(L_INFO,"RDHUP event on esocket %p (%d) type %d", l_cur, l_cur->socket, l_cur->type ); + } + default: + if(s_debug_reactor) + log_it(L_INFO,"RDHUP event on esocket %p (%d) type %d", l_cur, l_cur->socket, l_cur->type ); } } @@ -234,6 +245,9 @@ void *dap_worker_thread(void *arg) case DESCRIPTOR_TYPE_SOCKET_LISTENING: case DESCRIPTOR_TYPE_SOCKET: getsockopt(l_cur->socket, SOL_SOCKET, SO_ERROR, (void *)&l_sock_err, (socklen_t *)&l_sock_err_size); +#ifdef DAP_OS_WINDOWS + log_it(L_ERROR, "Winsock error: %d", WSAGetLastError()); +#endif log_it(L_ERROR, "Socket error: %s", strerror(l_sock_err)); default: ; } @@ -256,7 +270,7 @@ void *dap_worker_thread(void *arg) if(l_flag_read) { //log_it(L_DEBUG, "Comes connection with type %d", l_cur->type); - if(l_cur->buf_in_size == sizeof(l_cur->buf_in)) { + if(l_cur->buf_in_size >= DAP_EVENTS_SOCKET_BUF) { log_it(L_WARNING, "Buffer is full when there is smth to read. Its dropped!"); l_cur->buf_in_size = 0; } @@ -268,21 +282,25 @@ void *dap_worker_thread(void *arg) case DESCRIPTOR_TYPE_PIPE: case DESCRIPTOR_TYPE_FILE: l_must_read_smth = true; +#ifdef DAP_OS_WINDOWS + l_bytes_read = dap_recvfrom(l_cur->socket, l_cur->buf_in + l_cur->buf_in_size, DAP_EVENTS_SOCKET_BUF - l_cur->buf_in_size); +#else l_bytes_read = read(l_cur->socket, (char *) (l_cur->buf_in + l_cur->buf_in_size), sizeof(l_cur->buf_in) - l_cur->buf_in_size); +#endif l_errno = errno; break; case DESCRIPTOR_TYPE_SOCKET: l_must_read_smth = true; l_bytes_read = recv(l_cur->fd, (char *) (l_cur->buf_in + l_cur->buf_in_size), - sizeof(l_cur->buf_in) - l_cur->buf_in_size, 0); + DAP_EVENTS_SOCKET_BUF - l_cur->buf_in_size, 0); l_errno = errno; break; case DESCRIPTOR_TYPE_SOCKET_UDP: { l_must_read_smth = true; socklen_t l_size = sizeof(l_cur->remote_addr); l_bytes_read = recvfrom(l_cur->fd, (char *) (l_cur->buf_in + l_cur->buf_in_size), - sizeof(l_cur->buf_in) - l_cur->buf_in_size, 0, + DAP_EVENTS_SOCKET_BUF - l_cur->buf_in_size, 0, (struct sockaddr *)&l_cur->remote_addr, &l_size); l_errno = errno; @@ -294,7 +312,12 @@ void *dap_worker_thread(void *arg) struct sockaddr l_remote_addr; socklen_t l_remote_addr_size= sizeof (l_remote_addr); int l_remote_socket= accept(l_cur->socket ,&l_remote_addr,&l_remote_addr_size); +#ifdef DAP_OS_WINDOWS + u_long l_mode = 0; + ioctlsocket((SOCKET)l_remote_socket, (long)FIONBIO, &l_mode); +#else fcntl( l_remote_socket, F_SETFL, O_NONBLOCK); +#endif int l_errno = errno; if ( l_remote_socket == -1 ){ @@ -315,7 +338,11 @@ void *dap_worker_thread(void *arg) case DESCRIPTOR_TYPE_TIMER:{ uint64_t val; /* if we not reading data from socket, he triggered again */ +#ifdef DAP_OS_WINDOWS + l_bytes_read = dap_recvfrom(l_cur->socket, NULL, 0); +#else read( l_cur->fd, &val, 8); +#endif if (l_cur->callbacks.timer_callback) l_cur->callbacks.timer_callback(l_cur); else @@ -323,9 +350,16 @@ void *dap_worker_thread(void *arg) } break; case DESCRIPTOR_TYPE_QUEUE: +#ifdef DAP_OS_WINDOWS + l_bytes_read = dap_recvfrom(l_cur->socket, NULL, 0); +#endif dap_events_socket_queue_proc_input_unsafe(l_cur); + dap_events_socket_set_writable_unsafe(l_cur, false); break; case DESCRIPTOR_TYPE_EVENT: +#ifdef DAP_OS_WINDOWS + l_bytes_read = dap_recvfrom(l_cur->socket, NULL, 0); +#endif dap_events_socket_event_proc_input_unsafe(l_cur); break; } @@ -365,29 +399,42 @@ void *dap_worker_thread(void *arg) } } - //if (l_flag_write) - // log_it(L_DEBUG,"Alarmed write flag for remote %s", l_cur->remote_addr_str[0]?l_cur->remote_addr_str:"(null)"); + // Possibly have data to read despite EPOLLRDHUP + if (l_flag_rdhup){ + switch (l_cur->type ){ + case DESCRIPTOR_TYPE_SOCKET_UDP: + case DESCRIPTOR_TYPE_SOCKET: + dap_events_socket_set_readable_unsafe(l_cur, false); + dap_events_socket_set_writable_unsafe(l_cur, false); + l_cur->buf_out_size = 0; + l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE; + l_flag_error = l_flag_write = false; + break; + default:{} + } + if(s_debug_reactor) + log_it(L_INFO,"RDHUP event on esocket %p (%d) type %d", l_cur, l_cur->socket, l_cur->type ); + } // If its outgoing connection - if ( l_flag_write && ! l_cur->server && l_cur->flags& DAP_SOCK_CONNECTING && + if ( l_flag_write && !l_cur->server && (l_cur->flags & DAP_SOCK_CONNECTING) && (l_cur->type == DESCRIPTOR_TYPE_SOCKET || l_cur->type == DESCRIPTOR_TYPE_SOCKET_UDP )){ int l_error = 0; socklen_t l_error_len = sizeof(l_error); char l_error_buf[128]; l_error_buf[0]='\0'; - getsockopt(l_cur->socket, SOL_SOCKET, SO_ERROR, &l_error, &l_error_len); + getsockopt(l_cur->socket, SOL_SOCKET, SO_ERROR, (void *)&l_error, &l_error_len); if (l_error){ strerror_r(l_error, l_error_buf, sizeof (l_error_buf)); - log_it(L_ERROR,"Connecting error with %s: \"%s\" (code %d)", l_cur->remote_addr_str[0]? l_cur->remote_addr_str: "(NULL)" - , + log_it(L_ERROR,"Connecting error with %s: \"%s\" (code %d)", l_cur->remote_addr_str ? l_cur->remote_addr_str: "(NULL)", l_error_buf, l_error); if ( l_cur->callbacks.error_callback ) l_cur->callbacks.error_callback(l_cur, l_error); }else if(l_error == EINPROGRESS) { - log_it(L_DEBUG, "Connecting with %s in progress...", l_cur->remote_addr_str[0]? l_cur->remote_addr_str: "(NULL)"); + log_it(L_DEBUG, "Connecting with %s in progress...", l_cur->remote_addr_str ? l_cur->remote_addr_str: "(NULL)"); }else{ if(s_debug_reactor) - log_it(L_NOTICE, "Connected with %s",l_cur->remote_addr_str[0]? l_cur->remote_addr_str: "(NULL)"); + log_it(L_NOTICE, "Connected with %s",l_cur->remote_addr_str ? l_cur->remote_addr_str: "(NULL)"); l_cur->flags ^= DAP_SOCK_CONNECTING; if (l_cur->callbacks.connected_callback) l_cur->callbacks.connected_callback(l_cur); @@ -427,12 +474,20 @@ void *dap_worker_thread(void *arg) //for(total_sent = 0; total_sent < cur->buf_out_size;) { // If after callback there is smth to send - we do it ssize_t l_bytes_sent =0; int l_errno; + switch (l_cur->type){ - case DESCRIPTOR_TYPE_SOCKET: + case DESCRIPTOR_TYPE_SOCKET: { l_bytes_sent = send(l_cur->socket, (const char *)l_cur->buf_out, l_cur->buf_out_size, MSG_DONTWAIT | MSG_NOSIGNAL); - l_errno = errno; +#ifdef DAP_OS_WINDOWS + dap_events_socket_set_writable_unsafe(l_cur,false); + l_errno = WSAGetLastError(); +#else + l_errno = errno; +#endif + break; + } case DESCRIPTOR_TYPE_SOCKET_UDP: l_bytes_sent = sendto(l_cur->socket, (const char *)l_cur->buf_out, l_cur->buf_out_size, MSG_DONTWAIT | MSG_NOSIGNAL, @@ -444,7 +499,45 @@ void *dap_worker_thread(void *arg) #if defined(DAP_EVENTS_CAPS_QUEUE_PIPE2) l_bytes_sent = write(l_cur->socket, l_cur->buf_out, sizeof (void *) ); // We send pointer by pointer - l_errno = errno; +#elif defined (DAP_EVENTS_CAPS_QUEUE_POSIX) + l_bytes_sent = mq_send(a_es->mqd, (const char *)&a_arg,sizeof (a_arg),0); +#elif defined DAP_EVENTS_CAPS_MSMQ + DWORD l_mp_id = 0; + MQMSGPROPS l_mps; + MQPROPVARIANT l_mpvar[1]; + MSGPROPID l_p_id[1]; + HRESULT l_mstatus[1]; + + l_p_id[l_mp_id] = PROPID_M_BODY; + l_mpvar[l_mp_id].vt = VT_VECTOR | VT_UI1; + l_mpvar[l_mp_id].caub.pElems = l_cur->buf_out; + l_mpvar[l_mp_id].caub.cElems = (u_long)l_cur->buf_out_size; + l_mp_id++; + + l_mps.cProp = l_mp_id; + l_mps.aPropID = l_p_id; + l_mps.aPropVar = l_mpvar; + l_mps.aStatus = l_mstatus; + log_it(L_INFO, "Sent to SOCKET %d", l_cur->socket); + HRESULT hr = MQSendMessage(l_cur->mqh, &l_mps, MQ_NO_TRANSACTION); + + if (hr != MQ_OK) { + log_it(L_ERROR, "An error occured on sending message to queue, errno: 0x%x", hr); + break; + } else { + l_errno = WSAGetLastError(); + + if(dap_sendto(l_cur->socket, NULL, 0) == SOCKET_ERROR) { + log_it(L_ERROR, "Write to socket error: %d", WSAGetLastError()); + } + l_cur->buf_out_size = 0; + dap_events_socket_set_writable_unsafe(l_cur,false); + + break; + } +#ifndef DAP_OS_WINDOWS + l_errno = errno; +#endif #elif defined (DAP_EVENTS_CAPS_QUEUE_MQUEUE) l_bytes_sent = mq_send(l_cur->mqd , (const char *)l_cur->buf_out,sizeof (void*),0); if(l_bytes_sent == 0) @@ -559,15 +652,18 @@ static void s_queue_add_es_callback( dap_events_socket_t * a_es, void * a_arg) // Socket already present in worker, it's OK return; } - + l_worker->esocket_rwlock = PTHREAD_RWLOCK_INITIALIZER; switch( l_es_new->type){ case DESCRIPTOR_TYPE_SOCKET_UDP: case DESCRIPTOR_TYPE_SOCKET: case DESCRIPTOR_TYPE_SOCKET_LISTENING:{ + +#ifdef DAP_OS_UNIX int l_cpu = l_worker->id; setsockopt(l_es_new->socket , SOL_SOCKET, SO_INCOMING_CPU, &l_cpu, sizeof(l_cpu)); - }break; +#endif + } break; default: {} } @@ -588,7 +684,9 @@ static void s_queue_add_es_callback( dap_events_socket_t * a_es, void * a_arg) // Add in global list // Add in worker l_es_new->me = l_es_new; + pthread_rwlock_wrlock(&l_worker->esocket_rwlock); HASH_ADD(hh_worker, l_worker->esockets, me, sizeof(void *), l_es_new ); + pthread_rwlock_unlock(&l_worker->esocket_rwlock); l_worker->event_sockets_count++; //log_it(L_DEBUG, "Added socket %d on worker %u", l_es_new->socket, w->id); if (l_es_new->callbacks.worker_assign_callback) @@ -675,7 +773,9 @@ static void s_queue_es_io_callback( dap_events_socket_t * a_es, void * a_arg) // Check if it was removed from the list dap_events_socket_t *l_msg_es = NULL; + pthread_rwlock_rdlock(&a_es->worker->esocket_rwlock); HASH_FIND(hh_worker, a_es->worker->esockets, &l_msg->esocket , sizeof (void*), l_msg_es ); + pthread_rwlock_unlock(&a_es->worker->esocket_rwlock); if ( l_msg_es == NULL){ log_it(L_INFO, "We got i/o message for client thats now not in list. Lost %u data", l_msg->data_size); DAP_DELETE(l_msg); @@ -719,7 +819,7 @@ static bool s_socket_all_check_activity( void * a_arg) time_t l_curtime= time(NULL); ctime_r(&l_curtime, l_curtimebuf); //log_it(L_DEBUG,"Check sockets activity on worker #%u at %s", l_worker->id, l_curtimebuf); - + pthread_rwlock_rdlock(&l_worker->esocket_rwlock); HASH_ITER(hh_worker, l_worker->esockets, l_es, tmp ) { if ( l_es->type == DESCRIPTOR_TYPE_SOCKET || l_es->type == DESCRIPTOR_TYPE_SOCKET_UDP ){ if ( !(l_es->flags & DAP_SOCK_SIGNAL_CLOSE) && @@ -732,6 +832,7 @@ static bool s_socket_all_check_activity( void * a_arg) } } } + pthread_rwlock_unlock(&l_worker->esocket_rwlock); return true; } @@ -746,8 +847,8 @@ void dap_worker_add_events_socket(dap_events_socket_t * a_events_socket, dap_wor if(l_ret != 0 ){ char l_errbuf[128]; *l_errbuf = 0; - strerror_r(l_ret,l_errbuf,sizeof (l_errbuf)); - log_it(L_ERROR, "Cant send pointer in queue: \"%s\"(code %d)", l_errbuf, l_ret); + strerror_r(l_ret, l_errbuf, sizeof(l_errbuf)); + log_it(L_ERROR, "Can't send pointer in queue: \"%s\"(code %d)", l_errbuf, l_ret); } } diff --git a/dap-sdk/net/core/include/dap_events_socket.h b/dap-sdk/net/core/include/dap_events_socket.h index c8186129a6fe7b9f9cebcebd3178ce3446d2eea4..2547d1cddb1b69824f139711a1e240166b3905b6 100644 --- a/dap-sdk/net/core/include/dap_events_socket.h +++ b/dap-sdk/net/core/include/dap_events_socket.h @@ -54,7 +54,14 @@ #define DAP_EVENTS_CAPS_EPOLL #define DAP_EVENTS_CAPS_QUEUE_WEVENT #define DAP_EVENTS_CAPS_EVENT_WEVENT - #define DAP_EVENTS_CAPS_PIPE_POSIX + //#define DAP_EVENTS_CAPS_PIPE_POSIX + #define DAP_EVENTS_CAPS_MSMQ + #define INET_ADDRSTRLEN 16 + #define INET6_ADDRSTRLEN 46 +#include <mq.h> +#include <ws2tcpip.h> +#define MSG_DONTWAIT 0 +#define MSG_NOSIGNAL 0 #endif #if defined(DAP_EVENTS_CAPS_WEPOLL) @@ -97,12 +104,12 @@ typedef struct dap_events_socket_callbacks { union{ // Specific callbacks dap_events_socket_callback_connected_t connected_callback; // Connected callback for client socket dap_events_socket_callback_accept_t accept_callback; // Accept callback for listening socket - dap_events_socket_callback_timer_t timer_callback; // Timer callback for listening socket dap_events_socket_callback_event_t event_callback; // Event callback for listening socket dap_events_socket_callback_queue_t queue_callback; // Queue callback for listening socket dap_events_socket_callback_queue_ptr_t queue_ptr_callback; // queue_ptr callback for listening socket }; + dap_events_socket_callback_timer_t timer_callback; // Timer callback for listening socket dap_events_socket_callback_t new_callback; // Create new client callback dap_events_socket_callback_t delete_callback; // Delete client callback dap_events_socket_callback_t read_callback; // Read function @@ -128,18 +135,26 @@ typedef enum { } dap_events_desc_type_t; typedef struct dap_events_socket { - union{ + union { +#ifdef DAP_OS_WINDOWS + SOCKET socket; +#else int socket; +#endif int fd; #if defined(DAP_EVENTS_CAPS_QUEUE_MQUEUE) mqd_t mqd; -#endif + } +#elif defined DAP_EVENTS_CAPS_MSMQ }; + QUEUEHANDLE mqh, mqh_recv; + HANDLE ev_timeout, ev_recv; #if defined(DAP_EVENTS_CAPS_QUEUE_MQUEUE) uint32_t mqd_id; #endif +#endif -#ifdef DAP_EVENTS_CAPS_PIPE_POSIX +#if defined DAP_EVENTS_CAPS_PIPE_POSIX int fd2; #endif dap_events_desc_type_t type; @@ -156,26 +171,31 @@ typedef struct dap_events_socket { uint32_t buf_out_zero_count; // Input section - union{ - uint8_t buf_in[DAP_EVENTS_SOCKET_BUF+1]; // Internal buffer for input data - char buf_in_str[DAP_EVENTS_SOCKET_BUF+1]; - }; + //uint8_t buf_in[DAP_EVENTS_SOCKET_BUF+1]; // Internal buffer for input data + //char buf_in_str[DAP_EVENTS_SOCKET_BUF+1]; + byte_t *buf_in; + //char *buf_in_str; size_t buf_in_size; // size of data that is in the input buffer // Output section - byte_t buf_out[DAP_EVENTS_SOCKET_BUF+1]; // Internal buffer for output data + //byte_t buf_out[DAP_EVENTS_SOCKET_BUF+1]; // Internal buffer for output data + byte_t *buf_out; size_t buf_out_size; // size of data that is in the output buffer dap_events_socket_t * pipe_out; // Pipe socket with data for output // Stored string representation - char hostaddr[1024]; // Address - char service[128]; + //char hostaddr[1024]; // Address + //char service[128]; + char *hostaddr; + char *service; // Remote address, port and others struct sockaddr_in remote_addr; - char remote_addr_str[INET_ADDRSTRLEN]; - char remote_addr_str6[INET6_ADDRSTRLEN]; + //char remote_addr_str[INET_ADDRSTRLEN]; + //char remote_addr_str6[INET6_ADDRSTRLEN]; + char *remote_addr_str; + char *remote_addr_str6; short remote_port; @@ -202,7 +222,6 @@ typedef struct dap_events_socket { void *_inheritor; // Inheritor data to specific client type, usualy states for state machine void *_pvt; //Private section, different for different types struct dap_events_socket * me; // pointer on itself - UT_hash_handle hh; UT_hash_handle hh_worker; // Handle for local CPU storage on worker } dap_events_socket_t; // Node of bidirectional list of clients @@ -271,4 +290,7 @@ void dap_events_socket_remove_and_delete_unsafe( dap_events_socket_t *a_es, bool void dap_events_socket_remove_from_worker_unsafe( dap_events_socket_t *a_es, dap_worker_t * a_worker); void dap_events_socket_shrink_buf_in(dap_events_socket_t * cl, size_t shrink_size); - +#ifdef DAP_OS_WINDOWS +int dap_recvfrom(SOCKET s, void* buf_in, size_t buf_size); +int dap_sendto(SOCKET s, void* buf_in, size_t buf_size); +#endif diff --git a/dap-sdk/net/core/include/dap_net.h b/dap-sdk/net/core/include/dap_net.h index ea9e07e4b65a0322f7eb03d40d9a055376287ce2..92ce284136b8f309577f2a1f41445776f77c4949 100644 --- a/dap-sdk/net/core/include/dap_net.h +++ b/dap-sdk/net/core/include/dap_net.h @@ -31,9 +31,6 @@ #include <ws2tcpip.h> #include <io.h> -#include "win32/ip.h" -#include "win32/iphdr.h" - #define s6_addr32 s6_addr #define herror perror #else diff --git a/dap-sdk/net/core/include/dap_proc_thread.h b/dap-sdk/net/core/include/dap_proc_thread.h index 02374e8e25f03529aaf557a717fa3325ccde979d..f15d601cddd8cfe41b0803fd51bc9b1ff6fdd113 100644 --- a/dap-sdk/net/core/include/dap_proc_thread.h +++ b/dap-sdk/net/core/include/dap_proc_thread.h @@ -45,7 +45,6 @@ typedef struct dap_proc_thread{ #ifdef DAP_EVENTS_CAPS_EPOLL EPOLL_HANDLE epoll_ctl; - struct epoll_event epoll_events[DAP_EVENTS_SOCKET_MAX]; #elif defined (DAP_EVENTS_CAPS_POLL) int poll_fd; struct pollfd * poll; diff --git a/dap-sdk/net/core/include/dap_server.h b/dap-sdk/net/core/include/dap_server.h index 33e7c5ca43e5aedffbe971e3a8549d66ed0babf7..183e1f6a453a2cb0f3082141307ad9f3846a1230 100644 --- a/dap-sdk/net/core/include/dap_server.h +++ b/dap-sdk/net/core/include/dap_server.h @@ -65,8 +65,11 @@ typedef struct dap_server { dap_server_type_t type; // Server's type uint16_t port; // Listen port char *address; // Listen address - +#ifdef DAP_OS_WINDOWS + SOCKET socket_listener; +#else int32_t socket_listener; // Socket for listener +#endif dap_list_t *es_listeners; struct sockaddr_in listener_addr; // Kernel structure for listener's binded address diff --git a/dap-sdk/net/core/include/dap_timerfd.h b/dap-sdk/net/core/include/dap_timerfd.h index b578d02ccf884e6bdb5abf27fc4888bd2a53522e..8a17faad90d070b392bd52f8a6dde0e2312595e9 100644 --- a/dap-sdk/net/core/include/dap_timerfd.h +++ b/dap-sdk/net/core/include/dap_timerfd.h @@ -42,13 +42,17 @@ typedef bool (*dap_timerfd_callback_t)(void* ); // Callback for timer. If return typedef struct dap_timerfd { uint64_t timeout_ms; +#ifdef DAP_OS_WINDOWS + SOCKET tfd; +#else int tfd; //timer file descriptor +#endif dap_events_socket_t *events_socket; dap_timerfd_callback_t callback; void *callback_arg; #ifdef DAP_OS_WINDOWS HANDLE th; - int pipe_in; + SOCKET pipe_in; #endif } dap_timerfd_t; diff --git a/dap-sdk/net/core/include/dap_worker.h b/dap-sdk/net/core/include/dap_worker.h index a19ef1c393f6e779edb777a8b1021c6eef72e019..4fab1f676f577dd61bc29992bed317dda624f3a2 100644 --- a/dap-sdk/net/core/include/dap_worker.h +++ b/dap-sdk/net/core/include/dap_worker.h @@ -36,6 +36,7 @@ typedef struct dap_worker dap_events_socket_t *proc_queue_input; atomic_uint event_sockets_count; + pthread_rwlock_t esocket_rwlock; dap_events_socket_t *esockets; // Hashmap of event sockets // Signal to exit @@ -52,7 +53,10 @@ typedef struct dap_worker dap_events_socket_t * queue_callback; // Queue for pure callback on worker dap_timerfd_t * timer_check_activity; -#ifdef DAP_EVENTS_CAPS_EPOLL +#if defined DAP_EVENTS_CAPS_MSMQ + HANDLE msmq_events[MAXIMUM_WAIT_OBJECTS]; +#endif +#if defined DAP_EVENTS_CAPS_EPOLL EPOLL_HANDLE epoll_fd; #elif defined ( DAP_EVENTS_CAPS_POLL) int poll_fd; diff --git a/dap-sdk/net/server/http_server/dap_http_folder.c b/dap-sdk/net/server/http_server/dap_http_folder.c index 857f39775268a9c2affa017e80cd2bfa22e5c52e..fa2286d3e98d40d4ab103e964acc54288fa59deb 100644 --- a/dap-sdk/net/server/http_server/dap_http_folder.c +++ b/dap-sdk/net/server/http_server/dap_http_folder.c @@ -295,7 +295,7 @@ void dap_http_folder_data_write(dap_http_client_t * cl_ht, void * arg) { (void) arg; dap_http_file_t * cl_ht_file= DAP_HTTP_FILE(cl_ht); - cl_ht->esocket->buf_out_size=fread(cl_ht->esocket->buf_out,1,sizeof(cl_ht->esocket->buf_out),cl_ht_file->fd); + cl_ht->esocket->buf_out_size=fread(cl_ht->esocket->buf_out, 1, DAP_EVENTS_SOCKET_BUF + 1, cl_ht_file->fd); cl_ht_file->position+=cl_ht->esocket->buf_out_size; dap_events_socket_set_writable_unsafe(cl_ht->esocket, true); diff --git a/dap-sdk/net/server/http_server/http_client/dap_http_client.c b/dap-sdk/net/server/http_server/http_client/dap_http_client.c index 8ee096e68d2b2fc0ad95c50161e7394e300c7a64..0a04bb2e3bcd11cba81ee5d5d85a796a9a815df7 100644 --- a/dap-sdk/net/server/http_server/http_client/dap_http_client.c +++ b/dap-sdk/net/server/http_server/http_client/dap_http_client.c @@ -340,7 +340,7 @@ void dap_http_client_read( dap_events_socket_t *a_esocket, void *arg ) } if (peol) { - eol = peol - a_esocket->buf_in_str; + eol = peol - (char*)a_esocket->buf_in; if (eol <= 0) { eol = a_esocket->buf_in_size - 2; } @@ -422,7 +422,7 @@ void dap_http_client_read( dap_events_socket_t *a_esocket, void *arg ) break; } - l_eol_pos = l_str_eol - a_esocket->buf_in_str; + l_eol_pos = l_str_eol - (char*)a_esocket->buf_in; int parse_ret; memcpy( l_buf_line, a_esocket->buf_in, l_eol_pos + 1 ); diff --git a/dap-sdk/net/stream/ch/dap_stream_ch.c b/dap-sdk/net/stream/ch/dap_stream_ch.c index b28f693ab8148ae67d6eaaf399d612a694a8fcf6..d87c5833f46506f78e7e6fa10a6d816c9d297976 100644 --- a/dap-sdk/net/stream/ch/dap_stream_ch.c +++ b/dap-sdk/net/stream/ch/dap_stream_ch.c @@ -98,8 +98,9 @@ dap_stream_ch_t* dap_stream_ch_new(dap_stream_t* a_stream, uint8_t id) // Init on stream worker dap_stream_worker_t * l_stream_worker = a_stream->stream_worker; l_ch_new->stream_worker = l_stream_worker; + pthread_rwlock_wrlock(&l_stream_worker->channels_rwlock); HASH_ADD(hh_worker,l_stream_worker->channels, me,sizeof (void*),l_ch_new); - + pthread_rwlock_unlock(&l_stream_worker->channels_rwlock); pthread_mutex_init(&(l_ch_new->mutex),NULL); // Proc new callback @@ -130,8 +131,9 @@ dap_stream_ch_t* dap_stream_ch_new(dap_stream_t* a_stream, uint8_t id) void dap_stream_ch_delete(dap_stream_ch_t *a_ch) { dap_stream_worker_t * l_stream_worker = a_ch->stream_worker; + pthread_rwlock_wrlock(&l_stream_worker->channels_rwlock); HASH_DELETE(hh_worker,l_stream_worker->channels, a_ch); - + pthread_rwlock_unlock(&l_stream_worker->channels_rwlock); pthread_mutex_lock(&s_ch_table_lock); struct dap_stream_ch_table_t *l_ret;; diff --git a/dap-sdk/net/stream/ch/dap_stream_ch_pkt.c b/dap-sdk/net/stream/ch/dap_stream_ch_pkt.c index b13e46355dfe1e14a1952126f9a76d0619a17f9e..c906f92c190f9f6fa26af3008edf073966de4c81 100644 --- a/dap-sdk/net/stream/ch/dap_stream_ch_pkt.c +++ b/dap-sdk/net/stream/ch/dap_stream_ch_pkt.c @@ -143,7 +143,9 @@ bool dap_stream_ch_check_unsafe(dap_stream_worker_t * a_worker,dap_stream_ch_t * if (a_ch){ if ( a_worker->channels){ dap_stream_ch_t * l_ch = NULL; + pthread_rwlock_wrlock(&a_worker->channels_rwlock); HASH_FIND(hh_worker,a_worker->channels ,&a_ch, sizeof(a_ch), l_ch ); + pthread_rwlock_unlock(&a_worker->channels_rwlock); return l_ch == a_ch; }else return false; diff --git a/dap-sdk/net/stream/stream/dap_stream.c b/dap-sdk/net/stream/stream/dap_stream.c index 864f247f438dc7735be5f05ea6de85322d77c7a3..39561ca0af59ab4824074ab1c0a8ce2ba41b44d2 100644 --- a/dap-sdk/net/stream/stream/dap_stream.c +++ b/dap-sdk/net/stream/stream/dap_stream.c @@ -26,15 +26,15 @@ #include <stdint.h> #include <unistd.h> -#include <mqueue.h> - -#ifdef _WIN32 +#ifdef DAP_OS_WINDOWS #include <winsock2.h> #include <windows.h> #include <mswsock.h> #include <ws2tcpip.h> #include <io.h> #include <pthread.h> +#else +#include <mqueue.h> #endif #include "dap_common.h" diff --git a/dap-sdk/net/stream/stream/dap_stream_worker.c b/dap-sdk/net/stream/stream/dap_stream_worker.c index fe374507f386098af0efc7fad49e6f5973ad5c6b..83751efcd5530aa0b6ccb7edfd44d592a3c90bee 100644 --- a/dap-sdk/net/stream/stream/dap_stream_worker.c +++ b/dap-sdk/net/stream/stream/dap_stream_worker.c @@ -50,7 +50,9 @@ int dap_stream_worker_init() dap_stream_worker_t *l_stream_worker = DAP_NEW_Z(dap_stream_worker_t); l_worker->_inheritor = l_stream_worker; l_stream_worker->worker = l_worker; + l_stream_worker->channels_rwlock = PTHREAD_RWLOCK_INITIALIZER; l_stream_worker->queue_ch_io = dap_events_socket_create_type_queue_ptr_mt( l_worker, s_ch_io_callback); + log_it(L_WARNING, "Queue ch io socket: %d, worker %p", l_stream_worker->queue_ch_io->socket, l_stream_worker->worker); } return 0; } @@ -67,7 +69,9 @@ static void s_ch_io_callback(dap_events_socket_t * a_es, void * a_msg) // Check if it was removed from the list dap_stream_ch_t *l_msg_ch = NULL; + pthread_rwlock_wrlock(&l_stream_worker->channels_rwlock); HASH_FIND(hh_worker, l_stream_worker->channels , &l_msg->ch , sizeof (void*), l_msg_ch ); + pthread_rwlock_unlock(&l_stream_worker->channels_rwlock); if ( l_msg_ch == NULL){ log_it(L_DEBUG, "We got i/o message for client thats now not in list. Lost %u data", l_msg->data_size); DAP_DELETE(l_msg); diff --git a/dap-sdk/net/stream/stream/include/dap_stream_worker.h b/dap-sdk/net/stream/stream/include/dap_stream_worker.h index 3058cdf8d68e39e48e88e5cd0f6d72a7de3caf93..2e84a650559d8546b1a675a47fa42c2161009da1 100644 --- a/dap-sdk/net/stream/stream/include/dap_stream_worker.h +++ b/dap-sdk/net/stream/stream/include/dap_stream_worker.h @@ -28,6 +28,7 @@ typedef struct dap_stream_worker { dap_worker_t * worker; dap_events_socket_t *queue_ch_io; // IO queue for channels dap_stream_ch_t * channels; // Client channels assigned on worker. Unsafe list, operate only in worker's context + pthread_rwlock_t channels_rwlock; } dap_stream_worker_t; #define DAP_STREAM_WORKER(a) ((dap_stream_worker_t*) (a->_inheritor) ) diff --git a/modules/chain/dap_chain.c b/modules/chain/dap_chain.c index 4d68717b698a12978760201db76551b8cabacb80..958c1f74067c977f936b934f9879e1c703414543 100644 --- a/modules/chain/dap_chain.c +++ b/modules/chain/dap_chain.c @@ -24,7 +24,9 @@ #include <dap_chain_ledger.h> #include <sys/types.h> #include <dirent.h> +#ifdef DAP_OS_UNIX #include <stdc-predef.h> +#endif #include <unistd.h> #include "dap_common.h" diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c index 422cbd9965aa14f8c883ca7e96173b93170d5139..ece819203ebd530c8123f3ceb2e93018165f921f 100644 --- a/modules/channel/chain/dap_stream_ch_chain.c +++ b/modules/channel/chain/dap_stream_ch_chain.c @@ -191,7 +191,6 @@ bool s_sync_chains_callback(dap_proc_thread_t *a_thread, void *a_arg) bool s_sync_gdb_callback(dap_proc_thread_t *a_thread, void *a_arg) { - UNUSED(a_thread); dap_stream_ch_t *l_ch = (dap_stream_ch_t *)a_arg; dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); @@ -338,7 +337,6 @@ bool s_chain_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) bool s_gdb_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) { - UNUSED(a_thread); dap_stream_ch_t *l_ch = (dap_stream_ch_t *)a_arg; dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); @@ -557,6 +555,9 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); } else { dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker); +#ifdef DAP_OS_WINDOWS + if (a_ch->stream_worker->worker->proc_queue_input->buf_out_size == 0) +#endif dap_proc_queue_add_callback_inter( a_ch->stream_worker->worker->proc_queue_input, s_sync_chains_callback, a_ch); } } @@ -593,6 +594,9 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) log_it(L_INFO, "Got DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB request", a_ch->stream->session->id); dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker); +#ifdef DAP_OS_WINDOWS + if (a_ch->stream_worker->worker->proc_queue_input->buf_out_size == 0) +#endif dap_proc_queue_add_callback_inter(a_ch->stream_worker->worker->proc_queue_input, s_sync_gdb_callback, a_ch); } }else{ @@ -639,6 +643,9 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) } dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker); +#ifdef DAP_OS_WINDOWS + if (a_ch->stream_worker->worker->proc_queue_input->buf_out_size == 0) +#endif dap_proc_queue_add_callback_inter(a_ch->stream_worker->worker->proc_queue_input, s_chain_pkt_callback, a_ch); } else { log_it(L_WARNING, "Empty chain packet"); @@ -677,6 +684,9 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) l_pkt_copy->pkt_data_size = l_chain_pkt_data_size; l_ch_chain->pkt_copy_list = dap_list_append(l_ch_chain->pkt_copy_list, l_pkt_copy); dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker); +//#ifdef DAP_OS_WINDOWS +// if (a_ch->stream_worker->worker->proc_queue_input->buf_out_size == 0) +//#endif dap_proc_queue_add_callback_inter(a_ch->stream_worker->worker->proc_queue_input, s_gdb_pkt_callback, a_ch); } else { log_it(L_WARNING, "Packet with GLOBAL_DB atom has zero body size"); diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index 712698c052937f7fb6a986d6a95ce6382b985bcc..2e74d96e57404faa215dfd29d6ba8a7e1348af52 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -459,6 +459,8 @@ static int s_net_states_proc(dap_chain_net_t *a_net) } if (l_addr.s_addr) { dap_chain_node_info_t *l_link_node_info = DAP_NEW_Z(dap_chain_node_info_t); + struct in_addr _in_addr = { { .S_addr = l_addr.S_un.S_addr } }; + log_it(L_INFO, "dns get addrs %s : %d, net %s", inet_ntoa(_in_addr), l_port, a_net->pub.name); int l_res = dap_dns_client_get_addr(l_addr, l_port, a_net->pub.name, l_link_node_info); if (!l_res && l_link_node_info->hdr.address.uint64 != l_own_addr) { l_pvt_net->links_info = dap_list_append(l_pvt_net->links_info, l_link_node_info); diff --git a/modules/net/dap_chain_node_client.c b/modules/net/dap_chain_node_client.c index 3dd932297a3a2214407b91a7e325fb74b903df0a..61a8d6eac6f50ba36c1251c5e66f30c6b9f87fa8 100644 --- a/modules/net/dap_chain_node_client.c +++ b/modules/net/dap_chain_node_client.c @@ -548,7 +548,7 @@ int dap_chain_node_client_wait(dap_chain_node_client_t *a_client, int a_waited_s return -2; } -#ifndef _WIN32 +#ifndef DAP_OS_WINDOWS // prepare for signal waiting struct timespec l_cond_timeout; clock_gettime( CLOCK_MONOTONIC, &l_cond_timeout); @@ -558,9 +558,10 @@ int dap_chain_node_client_wait(dap_chain_node_client_t *a_client, int a_waited_s #endif // signal waiting - do { -#ifndef _WIN32 + +#ifndef DAP_OS_WINDOWS + do { int l_ret_wait = pthread_cond_timedwait(&a_client->wait_cond, &a_client->wait_mutex, &l_cond_timeout); if(l_ret_wait == 0 && ( a_client->state == a_waited_state || @@ -579,27 +580,23 @@ int dap_chain_node_client_wait(dap_chain_node_client_t *a_client, int a_waited_s strerror_r(l_ret_wait,l_errbuf,sizeof (l_errbuf)); log_it(L_ERROR, "Pthread condition timed wait returned \"%s\"(code %d)", l_errbuf, l_ret_wait); } + } while(1); #else - int wait = WaitForSingleObject( a_client->wait_cond, (uint32_t)a_timeout_s*1000 ); - pthread_mutex_lock( &a_client->wait_mutex ); - - if ( wait == WAIT_OBJECT_0 && ( - a_client->state == a_waited_state || - a_client->state == NODE_CLIENT_STATE_ERROR || a_client->state == NODE_CLIENT_STATE_DISCONNECTED - )) { - ret = a_client->state == a_waited_state ? 0 : -2; - break; - } - - else if ( wait == WAIT_TIMEOUT || wait == WAIT_FAILED ) { - ret = -1; - break; - } + DWORD wait = WaitForSingleObject( a_client->wait_cond, (uint32_t)a_timeout_ms); + if ( wait == WAIT_OBJECT_0 && ( + a_client->state == a_waited_state || + a_client->state == NODE_CLIENT_STATE_ERROR || + a_client->state == NODE_CLIENT_STATE_DISCONNECTED)) + { + return a_client->state == a_waited_state ? 0 : -2; + } else if ( wait == WAIT_TIMEOUT || wait == WAIT_FAILED ) { + return -1; + } #endif - } while(1); - +#ifndef DAP_OS_WINDOWS pthread_mutex_unlock(&a_client->wait_mutex); +#endif return ret; } diff --git a/modules/net/dap_dns_server.c b/modules/net/dap_dns_server.c index 4eab75bd15b332ce92e77892bc6c4ac1058f34f4..1121fd50eba2eef831c53090f7a992538d6c2000 100644 --- a/modules/net/dap_dns_server.c +++ b/modules/net/dap_dns_server.c @@ -33,6 +33,7 @@ #include "dap_chain_global_db_remote.h" #define LOG_TAG "dap_dns_server" +#define BUF_SIZE 1024 #ifndef _WIN32 #include <unistd.h> // for close diff --git a/modules/net/srv/dap_chain_net_srv_geoip.c b/modules/net/srv/dap_chain_net_srv_geoip.c index e79d0d48f85a0ab6bd2580698e6d67ab9bb525fd..eb793bca9addd2f70ca220b37fd5ae15cb7fe3e6 100644 --- a/modules/net/srv/dap_chain_net_srv_geoip.c +++ b/modules/net/srv/dap_chain_net_srv_geoip.c @@ -73,11 +73,12 @@ geoip_info_t *chain_net_geoip_get_ip_info_by_web(const char *a_ip_str) const char *license_key = "1JGvRmd3Ux1kcBkb"; char *l_auth = dap_strdup_printf("%s:%s", user_id, license_key); size_t l_out_len = dap_enc_base64_encode(l_auth, strlen(l_auth), l_out, DAP_ENC_DATA_TYPE_B64); - char * l_custom = l_out_len > 0 ? dap_strdup_printf("Authorization: Basic %s", l_out) : NULL; - size_t l_custom_count = 1; + size_t l_size_req = l_out_len > 0 ? l_out_len + 32 : 0; + char * l_custom = l_out_len > 0 ? DAP_NEW_S_SIZE(char, l_size_req) : NULL; + int l_offset = l_out_len ? dap_snprintf(l_custom, l_size_req, "Authorization: Basic %s\r\n", l_out) : 0; // todo just need to finish up https request dap_client_http_request_custom(NULL,"geoip.maxmind.com", 443, "GET", "application/json", l_path, NULL, - 0, NULL, m_request_getip_response, m_request_getip_request_error, NULL, &l_custom, l_custom_count); + 0, NULL, m_request_getip_response, m_request_getip_request_error, NULL, l_custom); return NULL; }