Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • cellframe/cellframe-sdk
  • MIKA83/cellframe-sdk
2 results
Show changes
Commits on Source (8)
Showing
with 936 additions and 360 deletions
......@@ -2,7 +2,7 @@ project(cellframe-sdk C)
cmake_minimum_required(VERSION 2.8)
set(CMAKE_C_STANDARD 11)
set(CELLFRAME_SDK_NATIVE_VERSION "2.6-66")
set(CELLFRAME_SDK_NATIVE_VERSION "2.6-72")
add_definitions ("-DCELLFRAME_SDK_VERSION=\"${CELLFRAME_SDK_NATIVE_VERSION}\"")
set(DAPSDK_MODULES "")
......@@ -132,7 +132,7 @@ endif()
if (WIN32)
set(CELLFRAME_LIBS ${CELLFRAME_LIBS} KERNEL32 USER32 SHELL32 WINMM GDI32 ADVAPI32
Ole32 Version Imm32 OleAut32 ws2_32 ntdll psapi
Shlwapi Bcrypt Crypt32 Secur32 userenv )
Shlwapi Bcrypt Crypt32 Secur32 userenv mqrt)
endif()
target_link_libraries(${PROJECT_NAME} ${CELLFRAME_LIBS})
......
......@@ -37,6 +37,8 @@
#ifdef DAP_OS_WINDOWS
#include <fcntl.h>
#define pipe(pfds) _pipe(pfds, 4096, _O_BINARY)
#define strerror_r(arg1, arg2, arg3) strerror_s(arg2, arg3, arg1)
#define ctime_r(arg1, arg2) ctime_s(arg2, sizeof(arg2), arg1)
#endif
#ifdef __MACH__
#include <dispatch/dispatch.h>
......
......@@ -188,7 +188,7 @@ void dap_client_delete_unsafe(dap_client_t * a_client)
if ( DAP_CLIENT_PVT(a_client)->refs_count ==0 ){
dap_client_pvt_delete( DAP_CLIENT_PVT(a_client) );
pthread_mutex_destroy(&a_client->mutex);
DAP_DELETE(a_client);
DAP_DEL_Z(a_client)
} else
DAP_CLIENT_PVT(a_client)->is_to_delete = true;
}
......@@ -265,7 +265,7 @@ static void s_go_stage_on_client_worker_unsafe(dap_worker_t * a_worker,void * a_
dap_client_delete_unsafe(l_client_pvt->client);
return;
}
log_it(L_DEBUG, "Start transitions chain for client %p from %s to %s", l_client_pvt->client, dap_client_stage_str(l_cur_stage ) , dap_client_stage_str(l_stage_target));
log_it(L_DEBUG, "Start transitions chain for client %p -> %p from %s to %s", l_client_pvt, l_client_pvt->client, dap_client_stage_str(l_cur_stage ) , dap_client_stage_str(l_stage_target));
l_client_pvt->stage_target = l_stage_target;
l_client_pvt->stage_target_done_callback = l_stage_end_callback;
if (l_stage_target < l_cur_stage) {
......
......@@ -33,6 +33,7 @@
#include "dap_client.h"
#include "dap_client_pvt.h"
#include "dap_client_http.h"
#include "dap_enc_base64.h"
#define LOG_TAG "dap_client_http"
......@@ -63,10 +64,9 @@ typedef struct dap_http_client_internal {
uint16_t uplink_port;
const char *method;
const char *request_content_type;
const char * path;
char * path;
char *cookie;
char **request_custom_headers; // Custom headers
size_t request_custom_headers_count;
char *request_custom_headers; // Custom headers
// Request vars
dap_worker_t * worker;
......@@ -88,7 +88,6 @@ static void s_http_error(dap_events_socket_t * a_es, int a_arg);
*/
static void s_http_read(dap_events_socket_t * a_es, void * arg)
{
// log_it(L_DEBUG, "s_http_read ");
dap_client_http_pvt_t * l_client_http_internal = PVT(a_es);
if(!l_client_http_internal) {
log_it(L_ERROR, "s_http_read: l_client_http_internal is NULL!");
......@@ -182,22 +181,18 @@ static void s_http_error(dap_events_socket_t * a_es, int a_errno)
if (a_errno == ETIMEDOUT){
strncpy(l_errbuf,"Connection timeout", sizeof (l_errbuf)-1);
}else if (a_errno == ECONNREFUSED){
strncpy(l_errbuf,"Connection refused", sizeof (l_errbuf)-1);
}else if (a_errno == EHOSTDOWN){
strncpy(l_errbuf,"Host is down", sizeof (l_errbuf)-1);
}else if (a_errno == EHOSTUNREACH){
strncpy(l_errbuf,"No route to host", sizeof (l_errbuf)-1);
}else if (a_errno == EREMOTEIO){
strncpy(l_errbuf,"Remote I/O error", sizeof (l_errbuf)-1);
}else if(a_errno)
strerror_r(a_errno, l_errbuf, sizeof (l_errbuf));
else
strncpy(l_errbuf,"Unknown Error", sizeof (l_errbuf)-1);
if (a_es->flags & DAP_SOCK_CONNECTING){
log_it(L_WARNING, "Socket connecting error: %s (code %d)" , l_errbuf, a_errno);
log_it(L_WARNING, "Socket %d connecting error: %s (code %d)" , a_es->socket, l_errbuf, a_errno);
}else
log_it(L_WARNING, "Socket error: %s (code %d)" , l_errbuf, a_errno);
log_it(L_WARNING, "Socket %d error: %s (code %d)", a_es->socket, l_errbuf, a_errno);
dap_client_http_pvt_t * l_client_http_internal = PVT(a_es);
......@@ -228,19 +223,10 @@ static void s_client_http_delete(dap_client_http_pvt_t * a_http_pvt)
return;
}
if (a_http_pvt->response){
DAP_DELETE(a_http_pvt->response);
a_http_pvt->response = NULL;
}
if(a_http_pvt->request_custom_headers != NULL) {
for( size_t i = 0; i < a_http_pvt->request_custom_headers_count; i++) {
DAP_DELETE( a_http_pvt->request_custom_headers[i]);
}
a_http_pvt->request_custom_headers = NULL;
//DAP_DELETE( l_client_http_pvt->request_custom_headers);
}
DAP_DEL_Z(a_http_pvt->response)
DAP_DEL_Z(a_http_pvt->path)
DAP_DEL_Z(a_http_pvt->request)
DAP_DEL_Z(a_http_pvt->request_custom_headers)
}
......@@ -264,7 +250,7 @@ static void s_client_http_delete(dap_client_http_pvt_t * a_http_pvt)
void* dap_client_http_request_custom(dap_worker_t * a_worker,const char *a_uplink_addr, uint16_t a_uplink_port, const char *a_method,
const char *a_request_content_type, const char * a_path, void *a_request, size_t a_request_size, char *a_cookie,
dap_client_http_callback_data_t a_response_callback, dap_client_http_callback_error_t a_error_callback,
void *a_obj, char **a_custom, size_t a_custom_count)
void *a_obj, char *a_custom)
{
//log_it(L_DEBUG, "HTTP request on url '%s:%d'", a_uplink_addr, a_uplink_port);
......@@ -275,12 +261,23 @@ void* dap_client_http_request_custom(dap_worker_t * a_worker,const char *a_uplin
};
// create socket
int l_socket = socket( PF_INET, SOCK_STREAM, 0);
#ifdef DAP_OS_WINDOWS
SOCKET l_socket = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
if (l_socket == INVALID_SOCKET) {
log_it(L_ERROR, "Socket create error: %d", WSAGetLastError());
#else
int l_socket = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
if (l_socket == -1) {
log_it(L_ERROR, "Error %d with socket create", errno);
#endif
return NULL;
}
// Get socket flags
#if defined DAP_OS_WINDOWS
u_long l_socket_flags = 0;
if (ioctlsocket((SOCKET)l_socket, (long)FIONBIO, &l_socket_flags))
log_it(L_ERROR, "Error ioctl %d", WSAGetLastError());
#else
int l_socket_flags = fcntl(l_socket, F_GETFL);
if (l_socket_flags == -1){
log_it(L_ERROR, "Error %d can't get socket flags", errno);
......@@ -291,6 +288,7 @@ void* dap_client_http_request_custom(dap_worker_t * a_worker,const char *a_uplin
log_it(L_ERROR, "Error %d can't get socket flags", errno);
return NULL;
}
#endif
// set socket param
int buffsize = DAP_CLIENT_HTTP_RESPONSE_SIZE_MAX;
#ifdef _WIN32
......@@ -310,15 +308,14 @@ void* dap_client_http_request_custom(dap_worker_t * a_worker,const char *a_uplin
//l_client_http_internal->socket = l_socket;
l_http_pvt->obj = a_obj;
l_http_pvt->method = a_method;
l_http_pvt->path = a_path;
l_http_pvt->path = dap_strdup(a_path);
l_http_pvt->request_content_type = a_request_content_type;
l_http_pvt->request = a_request;
l_http_pvt->request = (u_char*)dap_strdup(a_request);
l_http_pvt->request_size = a_request_size;
l_http_pvt->uplink_addr = a_uplink_addr;
l_http_pvt->uplink_port = a_uplink_port;
l_http_pvt->cookie = a_cookie;
l_http_pvt->request_custom_headers = a_custom;
l_http_pvt->request_custom_headers_count = a_custom_count;
l_http_pvt->request_custom_headers = dap_strdup(a_custom);
l_http_pvt->response_size_max = DAP_CLIENT_HTTP_RESPONSE_SIZE_MAX;
l_http_pvt->response = (uint8_t*) DAP_NEW_Z_SIZE(uint8_t, DAP_CLIENT_HTTP_RESPONSE_SIZE_MAX);
......@@ -341,7 +338,8 @@ void* dap_client_http_request_custom(dap_worker_t * a_worker,const char *a_uplin
l_ev_socket->remote_addr.sin_family = AF_INET;
l_ev_socket->remote_addr.sin_port = htons(a_uplink_port);
l_ev_socket->flags |= DAP_SOCK_CONNECTING;
l_ev_socket->flags |= DAP_SOCK_READY_TO_WRITE ;
l_ev_socket->type = DESCRIPTOR_TYPE_SOCKET;
l_ev_socket->flags |= DAP_SOCK_READY_TO_WRITE;
int l_err = connect(l_socket, (struct sockaddr *) &l_ev_socket->remote_addr, sizeof(struct sockaddr_in));
if (l_err == 0){
......@@ -349,12 +347,31 @@ void* dap_client_http_request_custom(dap_worker_t * a_worker,const char *a_uplin
l_http_pvt->worker = a_worker?a_worker: dap_events_worker_get_auto();
dap_worker_add_events_socket(l_ev_socket,l_http_pvt->worker);
return l_http_pvt;
}else if( errno == EINPROGRESS && l_err == -1){
}
#ifdef DAP_OS_WINDOWS
else if(l_err == SOCKET_ERROR) {
int l_err2 = WSAGetLastError();
if (l_err2 == EWOULDBLOCK || l_err2 == EAGAIN) {
log_it(L_DEBUG, "Connecting to %s:%u", a_uplink_addr, a_uplink_port);
l_http_pvt->worker = a_worker?a_worker: dap_events_worker_get_auto();
dap_worker_add_events_socket(l_ev_socket,l_http_pvt->worker);
return l_http_pvt;
} else {
log_it(L_ERROR, "Socket %d connecting error: %d", l_ev_socket->socket, WSAGetLastError());
s_client_http_delete( l_http_pvt);
l_ev_socket->_inheritor = NULL;
dap_events_socket_delete_unsafe( l_ev_socket, true);
return NULL;
}
}
#else
else if( errno == EINPROGRESS && l_err == -1){
log_it(L_DEBUG, "Connecting to %s:%u", a_uplink_addr, a_uplink_port);
l_http_pvt->worker = a_worker?a_worker: dap_events_worker_get_auto();
dap_worker_add_events_socket(l_ev_socket,l_http_pvt->worker);
return l_http_pvt;
}else{
}
else{
char l_errbuf[128];
l_errbuf[0] = '\0';
strerror_r(l_err, l_errbuf, sizeof (l_errbuf));
......@@ -364,6 +381,7 @@ void* dap_client_http_request_custom(dap_worker_t * a_worker,const char *a_uplin
dap_events_socket_delete_unsafe( l_ev_socket, true);
return NULL;
}
#endif
}
/**
......@@ -382,49 +400,42 @@ static void s_http_connected(dap_events_socket_t * a_esocket)
// add to dap_worker
//dap_client_pvt_t * l_client_pvt = (dap_client_pvt_t*) a_obj;
//dap_events_new();
dap_string_t *l_request_headers = dap_string_new(NULL);
char l_request_headers[1024] = { '\0' };
int l_offset = 0;
size_t l_offset2 = sizeof(l_request_headers);
if(l_http_pvt->request && (dap_strcmp(l_http_pvt->method, "POST") == 0 || dap_strcmp(l_http_pvt->method, "POST_ENC") == 0)) {
char l_buf[1024];
//log_it(L_DEBUG, "POST request with %u bytes of decoded data", a_request_size);
if(l_http_pvt->request_content_type) {
dap_snprintf(l_buf, sizeof(l_buf), "Content-Type: %s\r\n", l_http_pvt->request_content_type);
l_request_headers = dap_string_append(l_request_headers, l_buf);
}
l_offset += l_http_pvt->request_content_type
? dap_snprintf(l_request_headers, l_offset2, "Content-Type: %s\r\n", l_http_pvt->request_content_type)
: 0;
// Add custom headers
if(l_http_pvt->request_custom_headers) {
for( size_t i = 0; i < l_http_pvt->request_custom_headers_count; i++) {
l_request_headers = dap_string_append(l_request_headers, l_http_pvt->request_custom_headers[i]);
l_request_headers = dap_string_append(l_request_headers, "\r\n");
}
}
l_offset += l_http_pvt->request_custom_headers
? dap_snprintf(l_request_headers + l_offset, l_offset2 -= l_offset, "%s", l_http_pvt->request_custom_headers)
: 0;
// Setup cookie header
if(l_http_pvt->cookie) {
dap_snprintf(l_buf, sizeof(l_buf), "Cookie: %s\r\n", l_http_pvt->cookie);
l_request_headers = dap_string_append(l_request_headers, l_buf);
}
l_offset += l_http_pvt->cookie
? dap_snprintf(l_request_headers + l_offset, l_offset2 -= l_offset, "Cookie: %s\r\n", l_http_pvt->cookie)
: 0;
// Set request size as Content-Length header
dap_snprintf(l_buf, sizeof(l_buf), "Content-Length: %lu\r\n", l_http_pvt->request_size);
l_request_headers = dap_string_append(l_request_headers, l_buf);
l_offset += dap_snprintf(l_request_headers + l_offset, l_offset2 -= l_offset, "Content-Length: %lu\r\n", l_http_pvt->request_size);
}
// adding string for GET request
char *l_get_str = NULL;
char l_get_str[l_http_pvt->request_size + 2];
memset(l_get_str, 0, sizeof(l_get_str));
if(!dap_strcmp(l_http_pvt->method, "GET")) {
char l_buf[1024];
dap_snprintf(l_buf, sizeof(l_buf), "User-Agent: Mozilla\r\n"); // We hide our request and mask them as possible
if(l_http_pvt->cookie) {
dap_snprintf(l_buf, sizeof(l_buf), "Cookie: %s\r\n", l_http_pvt->cookie);
l_request_headers = dap_string_append(l_request_headers, l_buf);
}
// We hide our request and mask them as possible
l_offset += dap_snprintf(l_request_headers + l_offset, l_offset2 -= l_offset, "User-Agent: Mozilla\r\n");
l_offset += l_http_pvt->cookie
? dap_snprintf(l_request_headers + l_offset, l_offset2 -= l_offset, "Cookie: %s\r\n", l_http_pvt->cookie)
: 0;
if(l_http_pvt->request)
l_get_str = dap_strdup_printf("?%s", l_http_pvt->request);
l_offset = l_http_pvt->request ? dap_snprintf(l_get_str, sizeof(l_get_str), "?%s", l_http_pvt->request) : 0;
}
// send header
......@@ -432,14 +443,11 @@ static void s_http_connected(dap_events_socket_t * a_esocket)
"Host: %s\r\n"
"%s"
"\r\n",
l_http_pvt->method, l_http_pvt->path, l_get_str ? l_get_str : "", l_http_pvt->uplink_addr, l_request_headers->str);
l_http_pvt->method, l_http_pvt->path, strlen(l_get_str) ? l_get_str : "", l_http_pvt->uplink_addr, l_request_headers);
// send data for POST request
if(l_get_str)
DAP_DELETE(l_get_str);
else if ( l_http_pvt->request_size)
if (l_http_pvt->request_size) {
dap_events_socket_write_unsafe( a_esocket, l_http_pvt->request, l_http_pvt->request_size);
dap_string_free(l_request_headers, true);
}
}
......@@ -464,14 +472,7 @@ void* dap_client_http_request(dap_worker_t * a_worker,const char *a_uplink_addr,
char * a_cookie, dap_client_http_callback_data_t a_response_callback,
dap_client_http_callback_error_t a_error_callback, void *a_obj, void * a_custom)
{
char *a_custom_new[1];
size_t a_custom_count = 0;
// use no more then one custom item only
a_custom_new[0] = (char*) a_custom;
if(a_custom)
a_custom_count = 1;
return dap_client_http_request_custom(a_worker, a_uplink_addr, a_uplink_port, a_method, a_request_content_type, a_path,
a_request, a_request_size, a_cookie, a_response_callback, a_error_callback, a_obj,
a_custom_new, a_custom_count);
(char*)a_custom);
}
......@@ -169,7 +169,7 @@ void dap_client_pvt_delete(dap_client_pvt_t * a_client_pvt)
if(a_client_pvt->stream_key)
dap_enc_key_delete(a_client_pvt->stream_key);
DAP_DELETE(a_client_pvt);
DAP_DEL_Z(a_client_pvt)
}
/**
......@@ -236,21 +236,23 @@ static bool s_stage_status_after(dap_client_pvt_t * a_client_pvt)
l_sign_size = dap_sign_get_size(l_sign);
}
uint8_t l_data[l_key_size + l_sign_size];
memset(l_data, 0, sizeof(l_data));
memcpy(l_data,a_client_pvt->session_key_open->pub_key_data, l_key_size);
if (l_sign) {
memcpy(l_data + l_key_size, l_sign, l_sign_size);
}
size_t l_data_str_size_max = DAP_ENC_BASE64_ENCODE_SIZE(l_key_size + l_sign_size);
char l_data_str[l_data_str_size_max + 1];
memset(l_data_str, 0, sizeof(l_data_str));
// DAP_ENC_DATA_TYPE_B64_URLSAFE not need because send it by POST request
size_t l_data_str_enc_size = dap_enc_base64_encode(l_data, l_key_size + l_sign_size, l_data_str, DAP_ENC_DATA_TYPE_B64);
log_it(L_DEBUG, "ENC request size %u", l_data_str_enc_size);
char * l_enc_init_url = dap_strdup_printf(DAP_UPLINK_PATH_ENC_INIT
"/gd4y5yh78w42aaagh" "?enc_type=%d,pkey_exchange_type=%d,pkey_exchange_size=%zd",
a_client_pvt->session_key_type, a_client_pvt->session_key_open_type,
l_key_size );
char l_enc_init_url[1024] = { '\0' };
dap_snprintf(l_enc_init_url, sizeof(l_enc_init_url), DAP_UPLINK_PATH_ENC_INIT
"/gd4y5yh78w42aaagh" "?enc_type=%d,pkey_exchange_type=%d,pkey_exchange_size=%zd",
a_client_pvt->session_key_type, a_client_pvt->session_key_open_type,
l_key_size );
int l_res = dap_client_pvt_request(a_client_pvt, l_enc_init_url,
l_data_str, l_data_str_enc_size, s_enc_init_response, s_enc_init_error);
// bad request
......@@ -290,13 +292,31 @@ static bool s_stage_status_after(dap_client_pvt_t * a_client_pvt)
case STAGE_STREAM_SESSION: {
log_it(L_INFO, "Go to stage STREAM_SESSION: process the state ops");
a_client_pvt->stream_socket = socket( PF_INET, SOCK_STREAM, 0);
a_client_pvt->stream_socket = socket(PF_INET, SOCK_STREAM, 0);
#ifdef DAP_OS_WINDOWS
if (a_client_pvt->stream_socket == INVALID_SOCKET) {
log_it(L_ERROR, "Socket create error %d", WSAGetLastError());
#else
if (a_client_pvt->stream_socket == -1) {
log_it(L_ERROR, "Error %d with socket create", errno);
#endif
a_client_pvt->stage_status = STAGE_STATUS_ERROR;
break;
}
#ifdef DAP_OS_WINDOWS
u_long l_socket_flags = 0;
if (ioctlsocket(a_client_pvt->stream_socket, (long)FIONBIO, &l_socket_flags) == SOCKET_ERROR) {
log_it(L_ERROR, "Can't set socket %d to nonblocking mode, error %d", a_client_pvt->stream_socket, WSAGetLastError());
}
int buffsize = 0x40000;
int optsize = sizeof( int );
if (setsockopt(a_client_pvt->stream_socket, SOL_SOCKET, SO_SNDBUF, (char *)&buffsize, &optsize ) < 0) {
log_it(L_ERROR, "Cant' set send buf size on socket %d, error %d", a_client_pvt->stream_socket, WSAGetLastError());
}
if (setsockopt(a_client_pvt->stream_socket, SOL_SOCKET, SO_RCVBUF, (char *)&buffsize, &optsize ) < 0) {
log_it(L_ERROR, "Cant' set recv buf size on socket %d, error %d", a_client_pvt->stream_socket, WSAGetLastError());
}
#else
// Get socket flags
int l_socket_flags = fcntl(a_client_pvt->stream_socket, F_GETFL);
if (l_socket_flags == -1){
......@@ -306,16 +326,8 @@ static bool s_stage_status_after(dap_client_pvt_t * a_client_pvt)
// Make it non-block
if (fcntl( a_client_pvt->stream_socket, F_SETFL,l_socket_flags| O_NONBLOCK) == -1){
log_it(L_ERROR, "Error %d can't get socket flags", errno);
break;;
}
#ifdef _WIN32
{
int buffsize = 65536*4;
int optsize = sizeof( int );
setsockopt(a_client_pvt->stream_socket, SOL_SOCKET, SO_SNDBUF, (char *)&buffsize, &optsize );
setsockopt(a_client_pvt->stream_socket, SOL_SOCKET, SO_RCVBUF, (char *)&buffsize, &optsize );
break;
}
#else
int buffsize = 65536*4;
setsockopt(a_client_pvt->stream_socket, SOL_SOCKET, SO_SNDBUF, ( void *) &buffsize, sizeof(int));
setsockopt(a_client_pvt->stream_socket, SOL_SOCKET, SO_RCVBUF, ( void *) &buffsize, sizeof(int));
......@@ -330,9 +342,9 @@ static bool s_stage_status_after(dap_client_pvt_t * a_client_pvt)
.connected_callback = s_stream_es_callback_connected
};//
a_client_pvt->stream_es = dap_events_socket_wrap_no_add(a_client_pvt->events,
a_client_pvt->stream_socket, &l_s_callbacks);
(int)a_client_pvt->stream_socket, &l_s_callbacks);
a_client_pvt->stream_es->flags |= DAP_SOCK_CONNECTING ; // To catch non-blocking error when connecting we should ar WRITE flag
a_client_pvt->stream_es->flags |= DAP_SOCK_READY_TO_WRITE;
a_client_pvt->stream_es->_inheritor = a_client_pvt;
a_client_pvt->stream = dap_stream_new_es_client(a_client_pvt->stream_es);
assert(a_client_pvt->stream);
......@@ -346,6 +358,7 @@ static bool s_stage_status_after(dap_client_pvt_t * a_client_pvt)
// connect
memset(&a_client_pvt->stream_es->remote_addr, 0, sizeof(a_client_pvt->stream_es->remote_addr));
a_client_pvt->stream_es->remote_addr_str6 = NULL; //DAP_NEW_Z_SIZE(char, INET6_ADDRSTRLEN);
a_client_pvt->stream_es->remote_addr.sin_family = AF_INET;
a_client_pvt->stream_es->remote_addr.sin_port = htons(a_client_pvt->uplink_port);
if(inet_pton(AF_INET, a_client_pvt->uplink_addr, &(a_client_pvt->stream_es->remote_addr.sin_addr)) < 0) {
......@@ -356,8 +369,7 @@ static bool s_stage_status_after(dap_client_pvt_t * a_client_pvt)
}
else {
int l_err = 0;
strncpy( a_client_pvt->stream_es->remote_addr_str, a_client_pvt->uplink_addr,
sizeof (a_client_pvt->stream_es->remote_addr_str)-1 );
a_client_pvt->stream_es->remote_addr_str = dap_strdup(a_client_pvt->uplink_addr);
if((l_err = connect(a_client_pvt->stream_socket, (struct sockaddr *) &a_client_pvt->stream_es->remote_addr,
sizeof(struct sockaddr_in))) ==0) {
......@@ -374,7 +386,11 @@ static bool s_stage_status_after(dap_client_pvt_t * a_client_pvt)
strncpy(l_errbuf,"Unknown Error",sizeof(l_errbuf)-1);
log_it(L_ERROR, "Remote address can't connect (%s:%u) with sock_id %d: \"%s\" (code %d)", a_client_pvt->uplink_addr,
a_client_pvt->uplink_port, l_errbuf, l_err);
#ifdef DAP_OS_WINDOWS
closesocket(a_client_pvt->stream_socket);
#else
close(a_client_pvt->stream_socket);
#endif
a_client_pvt->stream_socket = 0;
a_client_pvt->stage_status = STAGE_STATUS_ERROR;
a_client_pvt->last_error = ERROR_STREAM_CONNECT ;
......@@ -588,17 +604,17 @@ void dap_client_pvt_request_enc(dap_client_pvt_t * a_client_internal, const char
// l_url_size = strlen(l_url);
size_t l_sub_url_enc_size_max = l_sub_url_size ? (5 * l_sub_url_size + 16) : 0;
char *l_sub_url_enc = l_sub_url_size ? DAP_NEW_Z_SIZE(char, l_sub_url_enc_size_max + 1) : NULL;
char *l_sub_url_enc = l_sub_url_size ? DAP_NEW_S_SIZE(char, l_sub_url_enc_size_max + 1) : NULL;
size_t l_query_enc_size_max = (is_query_enc) ? (l_query_size * 5 + 16) : l_query_size;
char *l_query_enc =
(is_query_enc) ? (l_query_size ? DAP_NEW_Z_SIZE(char, l_query_enc_size_max + 1) : NULL) : (char*) a_query;
(is_query_enc) ? (l_query_size ? DAP_NEW_S_SIZE(char, l_query_enc_size_max + 1) : NULL) : (char*) a_query;
// size_t l_url_full_size_max = 5 * l_sub_url_size + 5 * l_query_size + 16 + l_url_size + 2;
// char * l_url_full = DAP_NEW_Z_SIZE(char, l_url_full_size_max + 1);
size_t l_request_enc_size_max = a_request_size ? a_request_size * 2 + 16 : 0;
char * l_request_enc = a_request_size ? DAP_NEW_Z_SIZE(char, l_request_enc_size_max + 1) : NULL;
char * l_request_enc = a_request_size ? DAP_NEW_S_SIZE(char, l_request_enc_size_max + 1) : NULL;
size_t l_request_enc_size = 0;
a_client_internal->request_response_callback = a_response_proc;
......@@ -647,48 +663,32 @@ void dap_client_pvt_request_enc(dap_client_pvt_t * a_client_internal, const char
snprintf(l_url_full, l_url_full_size_max, "%s", l_url);
}
*/
char *l_path = NULL;
int l_off;
char *l_path = DAP_NEW_S_SIZE(char, l_query_enc_size_max + l_sub_url_enc_size_max + 1);
if(a_path) {
if(l_sub_url_size) {
if(l_query_size) {
l_path = dap_strdup_printf("%s/%s?%s", a_path, l_sub_url_enc, l_query_enc);
} else {
l_path = dap_strdup_printf("%s/%s", a_path, l_sub_url_enc);
}
if(l_sub_url_size)
{
l_off = l_query_size
? dap_snprintf(l_path, l_query_enc_size_max + l_sub_url_enc_size_max + 1, "%s/%s?%s", a_path, l_sub_url_enc, l_query_enc)
: dap_snprintf(l_path, l_sub_url_enc_size_max + 1, "%s/%s", a_path, l_sub_url_enc);
} else {
l_path = dap_strdup(a_path);
dap_stpcpy(l_path, a_path);
}
}
size_t l_custom_count = 1;
char **l_custom_new = DAP_NEW_Z_SIZE(char*,2*sizeof (char*));
l_custom_new[0] = dap_strdup_printf("KeyID: %s", a_client_internal->session_key_id ?
a_client_internal->session_key_id : "NULL");
// close session
if(a_client_internal->is_close_session) {
l_custom_new[1] = dap_strdup("SessionCloseAfterRequest: true");
l_custom_count++;
}
size_t size_required = a_client_internal->session_key_id ? strlen(a_client_internal->session_key_id) + 40 : 40;
char *l_custom = DAP_NEW_S_SIZE(char, size_required);
size_t l_off2 = size_required;
l_off = dap_snprintf(l_custom, l_off2, "KeyID: %s\r\n", a_client_internal->session_key_id ? a_client_internal->session_key_id : "NULL");
l_off += a_client_internal->is_close_session
? dap_snprintf(l_custom + l_off, l_off2 -= l_off, "%s\r\n", "SessionCloseAfterRequest: true")
: 0;
a_client_internal->refs_count++;
dap_client_http_request_custom(a_client_internal->worker, a_client_internal->uplink_addr, a_client_internal->uplink_port, a_request ? "POST" : "GET", "text/text",
l_path, l_request_enc, l_request_enc_size, NULL,
s_request_response, s_request_error, a_client_internal, l_custom_new, l_custom_count);
// dap_http_client_simple_request_custom(l_url_full, a_request ? "POST" : "GET", "text/text",
// l_request_enc, l_request_enc_size, NULL,
// m_request_response, a_client_internal->curl_sockfd ,m_request_error, a_client_internal, a_custom_new, a_custom_count);
if(l_sub_url_enc)
DAP_DELETE(l_sub_url_enc);
if(is_query_enc && l_query_enc)
DAP_DELETE(l_query_enc);
// if(l_url_full)
// DAP_DELETE(l_url_full);
if(l_request_enc)
DAP_DELETE(l_request_enc);
s_request_response, s_request_error, a_client_internal, l_custom);
}
/**
......@@ -1090,6 +1090,8 @@ static void s_stream_es_callback_delete(dap_events_socket_t *a_es, void *arg)
}
}
dap_stream_delete(l_client_pvt->stream);
DAP_DEL_Z(l_client_pvt->stream_es->remote_addr_str)
DAP_DEL_Z(l_client_pvt->stream_es->remote_addr_str6)
l_client_pvt->stream = NULL;
l_client_pvt->stream_es = NULL;
}
......@@ -1118,7 +1120,7 @@ static void s_stream_es_callback_read(dap_events_socket_t * a_es, void * arg)
l_pos_endl = (char*) memchr(a_es->buf_in, '\r', a_es->buf_in_size - 1);
if(l_pos_endl) {
if(*(l_pos_endl + 1) == '\n') {
dap_events_socket_shrink_buf_in(a_es, l_pos_endl - a_es->buf_in_str);
dap_events_socket_shrink_buf_in(a_es, l_pos_endl - (char*)a_es->buf_in);
log_it(L_DEBUG, "Header passed, go to streaming (%lu bytes already are in input buffer",
a_es->buf_in_size);
......
......@@ -34,7 +34,7 @@ typedef void (*dap_client_http_callback_data_t)(void *, size_t, void *); // Call
void* dap_client_http_request_custom(dap_worker_t * a_worker, const char *a_uplink_addr, uint16_t a_uplink_port, const char *a_method,
const char *a_request_content_type, const char * a_path, void *a_request, size_t a_request_size, char *a_cookie,
dap_client_http_callback_data_t a_response_callback, dap_client_http_callback_error_t a_error_callback,
void *a_obj, char **a_custom, size_t a_custom_count);
void *a_obj, char *a_custom);
void* dap_client_http_request(dap_worker_t * a_worker,const char *a_uplink_addr, uint16_t a_uplink_port, const char * a_method,
const char* a_request_content_type, const char * a_path, void *a_request, size_t a_request_size,
......
......@@ -39,7 +39,11 @@ typedef struct dap_client_internal
dap_http_client_t * http_client;
dap_events_socket_t * stream_es;
#ifdef DAP_OS_WINDOWS
SOCKET stream_socket;
#else
int stream_socket;
#endif
dap_stream_t* stream;
dap_stream_worker_t* stream_worker;
dap_worker_t * worker;
......
......@@ -16,6 +16,7 @@ endif()
if(WIN32)
include_directories(../../../3rdparty/uthash/src/)
include_directories(../../../3rdparty/wepoll/)
include_directories(../../../modules/net/win32)
endif()
add_library(${PROJECT_NAME} STATIC ${DAP_SERVER_CORE_HEADERS} ${DAP_SERVER_CORE_SOURCES})
......
......@@ -109,7 +109,7 @@ uint32_t dap_get_cpu_count( )
void dap_cpu_assign_thread_on(uint32_t a_cpu_id)
{
#ifndef _WIN32
#ifndef DAP_OS_WINDOWS
#ifndef NO_POSIX_SHED
cpu_set_t mask;
CPU_ZERO(&mask);
......@@ -245,15 +245,22 @@ int dap_events_start( dap_events_t *a_events )
l_worker->id = i;
l_worker->events = a_events;
pthread_rwlock_init(&l_worker->esocket_rwlock,NULL);
#ifdef DAP_EVENTS_CAPS_EPOLL
l_worker->epoll_fd = epoll_create( DAP_MAX_EVENTS_COUNT );
pthread_mutex_init(& l_worker->started_mutex, NULL);
pthread_cond_init( & l_worker->started_cond, NULL);
//log_it(L_DEBUG, "Created event_fd %d for worker %u", l_worker->epoll_fd,i);
#ifdef DAP_OS_WINDOWS
if (!l_worker->epoll_fd) {
int l_errno = WSAGetLastError();
#else
if ( l_worker->epoll_fd == -1 ) {
int l_errno = errno;
#endif
char l_errbuf[128];
strerror_r(l_errno, l_errbuf, sizeof ( l_errbuf) );
strerror_r(l_errno, l_errbuf, sizeof (l_errbuf));
log_it(L_CRITICAL, "Error create epoll fd: %s (%d)", l_errbuf, l_errno);
DAP_DELETE(l_worker);
return -1;
......
......@@ -84,7 +84,7 @@ static bool s_debug_reactor = false;
int dap_events_socket_init( )
{
log_it(L_NOTICE,"Initialized events socket module");
s_debug_reactor = dap_config_get_item_bool_default(g_config, "general","debug_reactor", false);
s_debug_reactor = g_config? dap_config_get_item_bool_default(g_config, "general","debug_reactor", false) : false;
#if defined (DAP_EVENTS_CAPS_QUEUE_MQUEUE)
#include <sys/time.h>
#include <sys/resource.h>
......@@ -108,6 +108,16 @@ void dap_events_socket_deinit( )
{
}
#ifdef DAP_OS_WINDOWS
void __stdcall mq_receive_cb(HRESULT hr, QUEUEHANDLE qh, DWORD timeout
, DWORD action, MQMSGPROPS *pmsgprops, LPOVERLAPPED pov, HANDLE cursor) {
switch (hr) {
case MQ_OK:
SetEvent(pov->hEvent);
break;
}
}
#endif
/**
* @brief dap_events_socket_wrap
......@@ -129,7 +139,9 @@ dap_events_socket_t *dap_events_socket_wrap_no_add( dap_events_t *a_events,
ret->events = a_events;
memcpy(&ret->callbacks, a_callbacks, sizeof(ret->callbacks) );
ret->flags = DAP_SOCK_READY_TO_READ;
ret->buf_in = a_callbacks->timer_callback ? NULL : DAP_NEW_Z_SIZE(byte_t, DAP_EVENTS_SOCKET_BUF + 1);
ret->buf_out = a_callbacks->timer_callback ? NULL : DAP_NEW_Z_SIZE(byte_t, DAP_EVENTS_SOCKET_BUF + 1);
ret->buf_in_size = ret->buf_out_size = 0;
#if defined(DAP_EVENTS_CAPS_EPOLL)
ret->ev_base_flags = EPOLLERR | EPOLLRDHUP | EPOLLHUP;
#elif defined(DAP_EVENTS_CAPS_POLL)
......@@ -138,7 +150,12 @@ dap_events_socket_t *dap_events_socket_wrap_no_add( dap_events_t *a_events,
if ( a_sock!= 0 && a_sock != -1){
pthread_rwlock_wrlock(&a_events->sockets_rwlock);
HASH_ADD(hh,a_events->sockets, socket, sizeof (int), ret);
#ifdef DAP_OS_WINDOWS
log_it(L_WARNING, "Hash add 0x%x", ret);
HASH_ADD(hh,a_events->sockets, socket, sizeof(SOCKET), ret);
#else
HASH_ADD_INT(a_events->sockets, socket, ret);
#endif
pthread_rwlock_unlock(&a_events->sockets_rwlock);
}else
log_it(L_WARNING, "Be carefull, you've wrapped socket 0 or -1 so it wasn't added to global list. Do it yourself when possible");
......@@ -205,6 +222,9 @@ void dap_events_socket_reassign_between_workers_mt(dap_worker_t * a_worker_old,
*/
dap_events_socket_t * s_create_type_pipe(dap_worker_t * a_w, dap_events_socket_callback_t a_callback, uint32_t a_flags)
{
#ifdef DAP_OS_WINDOWS
return NULL;
#else
UNUSED(a_flags);
dap_events_socket_t * l_es = DAP_NEW_Z(dap_events_socket_t);
l_es->type = DESCRIPTOR_TYPE_PIPE;
......@@ -226,12 +246,8 @@ dap_events_socket_t * s_create_type_pipe(dap_worker_t * a_w, dap_events_socket_c
l_errbuf[0]=0;
if( pipe(l_pipe) < 0 ){
l_errno = errno;
#if defined DAP_OS_UNIX
strerror_r(l_errno, l_errbuf, sizeof (l_errbuf));
log_it( L_ERROR, "Error detected, can't create pipe(): '%s' (%d)", l_errbuf, l_errno);
#elif defined DAP_OS_WINDOWS
log_it( L_ERROR, "Can't create pipe, errno: %d", l_errno);
#endif
DAP_DELETE(l_es);
return NULL;
}//else
......@@ -248,6 +264,7 @@ dap_events_socket_t * s_create_type_pipe(dap_worker_t * a_w, dap_events_socket_c
#error "No defined s_create_type_pipe() for your platform"
#endif
return l_es;
#endif
}
/**
......@@ -304,6 +321,8 @@ dap_events_socket_t * dap_events_socket_queue_ptr_create_input(dap_events_socket
{
dap_events_socket_t * l_es = DAP_NEW_Z(dap_events_socket_t);
l_es->type = DESCRIPTOR_TYPE_QUEUE;
l_es->buf_out = DAP_NEW_Z_SIZE(byte_t, 8 * sizeof(void*));
//l_es->buf_out_size = 8 * sizeof(void*);
l_es->events = a_es->events;
#if defined(DAP_EVENTS_CAPS_EPOLL)
l_es->ev_base_flags = EPOLLERR | EPOLLRDHUP | EPOLLHUP;
......@@ -312,8 +331,6 @@ dap_events_socket_t * dap_events_socket_queue_ptr_create_input(dap_events_socket
#else
#error "Not defined s_create_type_pipe for your platform"
#endif
l_es->type = DESCRIPTOR_TYPE_QUEUE;
#ifdef DAP_EVENTS_CAPS_QUEUE_MQUEUE
l_es->mqd = a_es->mqd;
char l_mq_name[64];
......@@ -342,6 +359,36 @@ dap_events_socket_t * dap_events_socket_queue_ptr_create_input(dap_events_socket
assert(l_es->mqd);
#elif defined (DAP_EVENTS_CAPS_QUEUE_PIPE2)
l_es->fd = a_es->fd2;
#elif defined DAP_EVENTS_CAPS_MSMQ
l_es->mqh = a_es->mqh;
l_es->mqh_recv = a_es->mqh_recv;
l_es->socket = a_es->socket;
WCHAR l_direct_name[MQ_MAX_Q_NAME_LEN + 1] = { 0 };
size_t l_sz_in_words = sizeof(l_direct_name)/sizeof(l_direct_name[0]);
int pos = _snwprintf_s(l_direct_name, l_sz_in_words, l_sz_in_words - 1, L"DIRECT=OS:.\\PRIVATE$\\DapEventSocketQueue%d", l_es->socket);
if (pos < 0) {
log_it(L_ERROR, "Message queue path error");
DAP_DELETE(l_es);
return NULL;
}
HRESULT hr = MQOpenQueue(l_direct_name, MQ_SEND_ACCESS, MQ_DENY_NONE, &(l_es->mqh));
if (hr == MQ_ERROR_QUEUE_NOT_FOUND) {
log_it(L_INFO, "Queue still not created, wait a bit...");
Sleep(300);
hr = MQOpenQueue(l_direct_name, MQ_SEND_ACCESS, MQ_DENY_NONE, &(l_es->mqh));
if (hr != MQ_OK) {
log_it(L_ERROR, "Can't open message queue for queue type, error: 0x%x", hr);
return NULL;
}
}
hr = MQOpenQueue(l_direct_name, MQ_RECEIVE_ACCESS, MQ_DENY_NONE, &(l_es->mqh_recv));
if (hr != MQ_OK) {
log_it(L_ERROR, "Can't open message queue for queue type, error: 0x%x", hr);
return NULL;
}
#else
#error "Not defined dap_events_socket_queue_ptr_create_input() for this platform"
#endif
......@@ -372,6 +419,7 @@ dap_events_socket_t * s_create_type_queue_ptr(dap_worker_t * a_w, dap_events_soc
l_es->worker = a_w;
}
l_es->callbacks.queue_ptr_callback = a_callback; // Arm event callback
l_es->buf_out = NULL;
#if defined(DAP_EVENTS_CAPS_EPOLL)
l_es->ev_base_flags = EPOLLIN | EPOLLERR | EPOLLRDHUP | EPOLLHUP;
......@@ -439,6 +487,97 @@ dap_events_socket_t * s_create_type_queue_ptr(dap_worker_t * a_w, dap_events_soc
l_mq_last_number++;
}
assert(l_es->mqd);
#elif defined DAP_EVENTS_CAPS_MSMQ
l_es->socket = socket(AF_INET, SOCK_DGRAM, 0);
if (l_es->socket == INVALID_SOCKET) {
log_it(L_ERROR, "Error creating socket for TYPE_QUEUE: %d", WSAGetLastError());
DAP_DELETE(l_es);
return NULL;
}
int buffsize = 1024;
setsockopt(l_es->socket, SOL_SOCKET, SO_RCVBUF, (char *)&buffsize, sizeof(int));
int reuse = 1;
if (setsockopt(l_es->socket, SOL_SOCKET, SO_REUSEADDR, (const char*)&reuse, sizeof(reuse)) < 0)
log_it(L_WARNING, "Can't set up REUSEADDR flag to the socket, err: %d", WSAGetLastError());
unsigned long l_mode = 0;
ioctlsocket(l_es->socket, FIONBIO, &l_mode);
int l_addr_len;
struct sockaddr_in l_addr;
l_addr.sin_family = AF_INET;
IN_ADDR _in_addr = { { .S_addr = htonl(INADDR_LOOPBACK) } };
l_addr.sin_addr = _in_addr;
l_addr.sin_port = l_es->socket + 32768;
l_addr_len = sizeof(struct sockaddr_in);
if (bind(l_es->socket, (struct sockaddr*)&l_addr, sizeof(l_addr)) < 0) {
log_it(L_ERROR, "Bind error: %d", WSAGetLastError());
} else {
log_it(L_INFO, "Binded %d", l_es->socket);
}
MQQUEUEPROPS l_qps;
MQPROPVARIANT l_qp_var[1];
QUEUEPROPID l_qp_id[1];
HRESULT l_q_status[1];
WCHAR l_pathname[MAX_PATH] = { 0 };
size_t l_sz_in_words = sizeof(l_pathname)/sizeof(l_pathname[0]);
int pos = _snwprintf_s(l_pathname, l_sz_in_words, l_sz_in_words - 1, L".\\PRIVATE$\\DapEventSocketQueue%d", l_es->socket);
if (pos < 0) {
log_it(L_ERROR, "Message queue path error");
DAP_DELETE(l_es);
return NULL;
}
u_long l_p_id = 0;
l_qp_id[l_p_id] = PROPID_Q_PATHNAME;
l_qp_var[l_p_id].vt = VT_LPWSTR;
l_qp_var[l_p_id].pwszVal = l_pathname;
l_p_id++;
l_qps.cProp = l_p_id;
l_qps.aPropID = l_qp_id;
l_qps.aPropVar = l_qp_var;
l_qps.aStatus = l_q_status;
WCHAR l_direct_name[MQ_MAX_Q_NAME_LEN + 1] = { 0 };
WCHAR l_format_name[sizeof(l_direct_name) - 10] = { 0 };
DWORD l_buflen = sizeof(l_format_name);
HRESULT hr = MQCreateQueue(NULL, &l_qps, l_format_name, &l_buflen);
if ((hr != MQ_OK) && (hr != MQ_ERROR_QUEUE_EXISTS) && (hr != MQ_INFORMATION_PROPERTY)) {
log_it(L_ERROR, "Can't create message queue for queue type, error: 0x%x", hr);
DAP_DELETE(l_es);
return NULL;
}
wcsncpy(l_direct_name, L"DIRECT=OS:", 10);
wcscat_s(l_direct_name, l_buflen, l_pathname);
hr = MQOpenQueue(l_direct_name, MQ_SEND_ACCESS, MQ_DENY_NONE, &(l_es->mqh));
if (hr == MQ_ERROR_QUEUE_NOT_FOUND) {
log_it(L_INFO, "Queue still not created, wait a bit...");
Sleep(300);
hr = MQOpenQueue(l_direct_name, MQ_SEND_ACCESS, MQ_DENY_NONE, &(l_es->mqh));
if (hr != MQ_OK) {
log_it(L_ERROR, "Can't open message queue for queue type, error: 0x%x", hr);
DAP_DELETE(l_es);
MQDeleteQueue(l_format_name);
return NULL;
}
}
hr = MQOpenQueue(l_direct_name, MQ_RECEIVE_ACCESS, MQ_DENY_NONE, &(l_es->mqh_recv));
if (hr != MQ_OK) {
log_it(L_ERROR, "Can't open message queue for queue type, error: 0x%x", hr);
DAP_DELETE(l_es);
MQCloseQueue(l_es->mqh);
MQDeleteQueue(l_format_name);
return NULL;
}
#else
#error "Not implemented s_create_type_queue_ptr() on your platform"
#endif
......@@ -474,8 +613,14 @@ dap_events_socket_t * dap_events_socket_create_type_queue_ptr_unsafe(dap_worker_
dap_events_socket_t * l_es = s_create_type_queue_ptr(a_w, a_callback);
assert(l_es);
// If no worker - don't assign
if ( a_w)
dap_worker_add_events_socket_unsafe(l_es,a_w);
if ( a_w) {
if(dap_worker_add_events_socket_unsafe(l_es,a_w)) {
#ifdef DAP_OS_WINDOWS
errno = WSAGetLastError();
#endif
log_it(L_ERROR, "Can't add esocket %d to polling, err %d", l_es->socket, errno);
}
}
return l_es;
}
......@@ -506,12 +651,58 @@ int dap_events_socket_queue_proc_input_unsafe(dap_events_socket_t * a_esocket)
return -1;
}
a_esocket->callbacks.queue_ptr_callback (a_esocket, l_queue_ptr);
#elif defined DAP_EVENTS_CAPS_MSMQ
DWORD l_mp_id = 0;
MQMSGPROPS l_mps;
MQPROPVARIANT l_mpvar[2];
MSGPROPID l_p_id[2];
UCHAR l_body[1024] = { 0 };
l_p_id[l_mp_id] = PROPID_M_BODY;
l_mpvar[l_mp_id].vt = VT_UI1 | VT_VECTOR;
l_mpvar[l_mp_id].caub.cElems = sizeof(l_body);
l_mpvar[l_mp_id].caub.pElems = l_body;
l_mp_id++;
l_p_id[l_mp_id] = PROPID_M_BODY_SIZE;
l_mpvar[l_mp_id].vt = VT_UI4;
l_mp_id++;
l_mps.cProp = l_mp_id;
l_mps.aPropID = l_p_id;
l_mps.aPropVar = l_mpvar;
l_mps.aStatus = NULL;
HRESULT hr = MQReceiveMessage(a_esocket->mqh_recv, 1000, MQ_ACTION_RECEIVE, &l_mps, NULL, NULL, NULL, MQ_NO_TRANSACTION);
if (hr != MQ_OK) {
log_it(L_ERROR, "An error 0x%x occured receiving a message from queue", hr);
return -1;
}
if (l_mpvar[1].ulVal % sizeof(void*)) {
log_it(L_ERROR, "Queue message size incorrect: %d", l_mpvar[1].ulVal);
if (l_mpvar[1].ulVal < sizeof(void*)) {
log_it(L_ERROR, "Queue socket %d received invalid data", a_esocket->socket);
return -1;
}
}
for (u_int pad = 0; pad < l_mpvar[1].ulVal; pad += sizeof(void*)) {
memcpy(&l_queue_ptr, l_body + pad, sizeof(void*));
a_esocket->callbacks.queue_ptr_callback (a_esocket, l_queue_ptr);
}
#else
#error "No Queue fetch mechanism implemented on your platform"
#endif
}else{
} else {
#ifdef DAP_OS_WINDOWS
int l_read = dap_recvfrom(a_esocket->socket, a_esocket->buf_in, DAP_EVENTS_SOCKET_BUF);
if (l_read == SOCKET_ERROR) {
log_it(L_ERROR, "Queue socket %d received invalid data, error %d", a_esocket->socket, WSAGetLastError());
return -1;
}
#else
size_t l_read = read(a_esocket->socket, a_esocket->buf_in,sizeof(a_esocket->buf_in));
a_esocket->callbacks.queue_callback(a_esocket,a_esocket->buf_in,l_read );
#endif
a_esocket->callbacks.queue_callback(a_esocket, a_esocket->buf_in, l_read);
}
}else{
log_it(L_ERROR, "Queue socket %d accepted data but callback is NULL ", a_esocket->socket);
......@@ -529,6 +720,8 @@ int dap_events_socket_queue_proc_input_unsafe(dap_events_socket_t * a_esocket)
dap_events_socket_t * s_create_type_event(dap_worker_t * a_w, dap_events_socket_callback_event_t a_callback)
{
dap_events_socket_t * l_es = DAP_NEW_Z(dap_events_socket_t); if (!l_es) return NULL;
l_es->buf_out = DAP_NEW_Z_SIZE(byte_t, 1);
l_es->buf_out_size = 1;
l_es->type = DESCRIPTOR_TYPE_EVENT;
if (a_w){
l_es->events = a_w->events;
......@@ -564,15 +757,39 @@ dap_events_socket_t * s_create_type_event(dap_worker_t * a_w, dap_events_socket_
//log_it(L_DEBUG, "Created eventfd descriptor %d", l_es->fd );
}
#elif defined DAP_OS_WINDOWS
int l_pipe[2];
if (pipe(l_pipe) < 0) {
log_it(L_ERROR, "Can't create pipe for event type, error: %d", errno);
l_es->socket = socket(AF_INET, SOCK_DGRAM, 0);
if (l_es->socket == INVALID_SOCKET) {
log_it(L_ERROR, "Error creating socket for TYPE_QUEUE: %d", WSAGetLastError());
DAP_DELETE(l_es);
return NULL;
}
l_es->fd2 = l_pipe[0];
l_es->fd = l_pipe[1];
log_it(L_DEBUG, "Created pipe for event type, %d -> %d", l_es->fd2, l_es->fd);
int buffsize = 1024;
setsockopt(l_es->socket, SOL_SOCKET, SO_RCVBUF, (char *)&buffsize, sizeof(int));
unsigned long l_mode = 0;
ioctlsocket(l_es->socket, FIONBIO, &l_mode);
int reuse = 1;
if (setsockopt(l_es->socket, SOL_SOCKET, SO_REUSEADDR, (const char*)&reuse, sizeof(reuse)) < 0)
log_it(L_WARNING, "Can't set up REUSEADDR flag to the socket, err: %d", WSAGetLastError());
int l_addr_len;
struct sockaddr_in l_addr;
l_addr.sin_family = AF_INET;
IN_ADDR _in_addr = { { .S_addr = htonl(INADDR_LOOPBACK) } };
l_addr.sin_addr = _in_addr;
l_addr.sin_port = l_es->socket + 32768;
l_addr_len = sizeof(struct sockaddr_in);
if (bind(l_es->socket, (struct sockaddr*)&l_addr, sizeof(l_addr)) < 0) {
log_it(L_ERROR, "Bind error: %d", WSAGetLastError());
} else {
log_it(L_INFO, "Binded %d", l_es->socket);
}
#endif
return l_es;
}
......@@ -627,22 +844,24 @@ void dap_events_socket_event_proc_input_unsafe(dap_events_socket_t *a_esocket)
}else
return; // do nothing
#elif defined DAP_OS_WINDOWS
uint64_t l_value;
u_short l_value;
int l_ret;
switch (l_ret = read(a_esocket->fd, &l_value, 8)) {
case -1:
log_it(L_CRITICAL, "Can't read from event socket pipe, error: %d", errno);
switch (l_ret = dap_recvfrom(a_esocket->socket, a_esocket->buf_in, a_esocket->buf_in_size)) {
case SOCKET_ERROR:
log_it(L_CRITICAL, "Can't read from event socket, error: %d", WSAGetLastError());
break;
case 0:
return;
default:
l_value = a_esocket->buf_out[0];
log_it(L_INFO, "Proc input event %d, val %d", a_esocket->socket, l_value);
a_esocket->callbacks.event_callback(a_esocket, l_value);
break;
return;
}
#else
#error "No Queue fetch mechanism implemented on your platform"
#endif
}else
} else
log_it(L_ERROR, "Event socket %d accepted data but callback is NULL ", a_esocket->socket);
}
......@@ -703,7 +922,7 @@ static int wait_send_socket(int a_sockfd, long timeout_ms)
* @param arg
* @return
*/
void *dap_events_socket_buf_thread(void *arg)
static void *dap_events_socket_buf_thread(void *arg)
{
dap_events_socket_buf_item_t *l_item = (dap_events_socket_buf_item_t*) arg;
if(!l_item) {
......@@ -713,7 +932,12 @@ void *dap_events_socket_buf_thread(void *arg)
int l_count = 0;
while(l_res < 1 && l_count < 3) {
// wait max 5 min
#ifdef DAP_OS_WINDOWS
log_it(L_INFO, "Wait 5 minutes");
l_res = wait_send_socket(l_item->es->socket, 300000);
#else
l_res = wait_send_socket(l_item->es->fd2, 300000);
#endif
if (l_res == 0){
dap_events_socket_queue_ptr_send(l_item->es, l_item->arg);
break;
......@@ -745,8 +969,14 @@ static void add_ptr_to_buf(dap_events_socket_t * a_es, void* a_arg)
int dap_events_socket_queue_ptr_send_to_input(dap_events_socket_t * a_es_input, void * a_arg)
{
volatile void * l_arg = a_arg;
int ret= dap_events_socket_write_unsafe(a_es_input,&l_arg,sizeof (l_arg) )==sizeof (l_arg)?0:1 ;
return ret;
if (a_es_input->buf_out_size >= sizeof(void*)) {
if (memcmp(a_es_input->buf_out + a_es_input->buf_out_size - sizeof(void*), a_arg, sizeof(void*))) {
log_it(L_INFO, "Ptr 0x%x already present in input, drop it", a_arg);
return 2;
}
}
return dap_events_socket_write_unsafe(a_es_input, &l_arg, sizeof(l_arg))
== sizeof(l_arg) ? 0 : 1;
}
/**
......@@ -772,6 +1002,49 @@ int dap_events_socket_queue_ptr_send( dap_events_socket_t * a_es, void* a_arg)
l_errno = EAGAIN;
if (l_ret == 0)
l_ret = sizeof (a_arg);
#elif defined (DAP_EVENTS_CAPS_QUEUE_POSIX)
struct timespec l_timeout;
clock_gettime(CLOCK_REALTIME, &l_timeout);
l_timeout.tv_sec+=2; // Not wait more than 1 second to get and 2 to send
int ret = mq_timedsend(a_es->mqd, (const char *)&a_arg,sizeof (a_arg),0, &l_timeout );
int l_errno = errno;
if (ret == sizeof(a_arg) )
return 0;
else
return l_errno;
#elif defined DAP_EVENTS_CAPS_MSMQ
char *pbuf = (char *)&a_arg;
DWORD l_mp_id = 0;
MQMSGPROPS l_mps;
MQPROPVARIANT l_mpvar[1];
MSGPROPID l_p_id[1];
HRESULT l_mstatus[1];
l_p_id[l_mp_id] = PROPID_M_BODY;
l_mpvar[l_mp_id].vt = VT_VECTOR | VT_UI1;
l_mpvar[l_mp_id].caub.pElems = (unsigned char*)(pbuf);
l_mpvar[l_mp_id].caub.cElems = sizeof(void*);
l_mp_id++;
l_mps.cProp = l_mp_id;
l_mps.aPropID = l_p_id;
l_mps.aPropVar = l_mpvar;
l_mps.aStatus = l_mstatus;
HRESULT hr = MQSendMessage(a_es->mqh, &l_mps, MQ_NO_TRANSACTION);
if (hr != MQ_OK) {
log_it(L_ERROR, "An error occured on sending message to queue, errno: 0x%x", hr);
return hr;
}
if(dap_sendto(a_es->socket, NULL, 0) == SOCKET_ERROR) {
return WSAGetLastError();
} else {
return 0;
}
#else
#error "Not implemented dap_events_socket_queue_ptr_send() for this platform"
#endif
......@@ -809,9 +1082,9 @@ int dap_events_socket_event_signal( dap_events_socket_t * a_es, uint64_t a_value
else
return 1;
#elif defined DAP_OS_WINDOWS
byte_t l_bytes[sizeof(void*)] = { 0 };
if(write(a_es->fd2, l_bytes, sizeof(l_bytes)) == -1) {
return errno;
a_es->buf_out[0] = (u_short)a_value;
if(dap_sendto(a_es->socket, a_es->buf_out, sizeof(uint64_t)) == SOCKET_ERROR) {
return WSAGetLastError();
} else {
return 0;
}
......@@ -855,7 +1128,8 @@ dap_events_socket_t * dap_events_socket_wrap2( dap_server_t *a_server, struct da
ret->server = a_server;
memcpy(&ret->callbacks,a_callbacks, sizeof ( ret->callbacks) );
ret->buf_in = ret->buf_out = a_callbacks->timer_callback ? NULL : DAP_NEW_Z_SIZE(byte_t, DAP_EVENTS_SOCKET_BUF+1);
ret->buf_in_size = ret->buf_out_size = 0;
ret->flags = DAP_SOCK_READY_TO_READ;
ret->last_time_active = ret->last_ping_request = time( NULL );
......@@ -880,7 +1154,11 @@ dap_events_socket_t *dap_events_socket_find_unsafe( int sock, struct dap_events
if(!a_events)
return NULL;
if(a_events->sockets)
#ifdef DAP_OS_WINDOWS
HASH_FIND(hh, a_events->sockets, &sock, sizeof(SOCKET), ret );
#else
HASH_FIND_INT( a_events->sockets, &sock, ret );
#endif
return ret;
}
......@@ -901,7 +1179,11 @@ void dap_events_socket_worker_poll_update_unsafe(dap_events_socket_t * a_esocket
if( a_esocket->worker){
if ( epoll_ctl(a_esocket->worker->epoll_fd, EPOLL_CTL_MOD, a_esocket->socket, &a_esocket->ev) ){
#ifdef DAP_OS_WINDOWS
int l_errno = WSAGetLastError();
#else
int l_errno = errno;
#endif
char l_errbuf[128];
l_errbuf[0]=0;
strerror_r(l_errno, l_errbuf, sizeof (l_errbuf));
......@@ -960,11 +1242,20 @@ void dap_events_socket_set_writable_unsafe( dap_events_socket_t *a_esocket, bool
return;
}
if ( a_is_ready )
if ( a_is_ready ) {
a_esocket->flags |= DAP_SOCK_READY_TO_WRITE;
else
#ifdef DAP_OS_WINDOWS
if (a_esocket->type == DESCRIPTOR_TYPE_QUEUE)
a_esocket->flags |= EPOLLONESHOT;
#endif
}
else {
a_esocket->flags ^= DAP_SOCK_READY_TO_WRITE;
#ifdef DAP_OS_WINDOWS
if (a_esocket->type == DESCRIPTOR_TYPE_QUEUE)
a_esocket->flags ^= EPOLLONESHOT;
#endif
}
if( a_esocket->worker )
dap_events_socket_worker_poll_update_unsafe(a_esocket);
}
......@@ -1010,25 +1301,27 @@ void dap_events_socket_delete_unsafe( dap_events_socket_t * a_esocket , bool a_p
pthread_rwlock_unlock( &a_esocket->events->sockets_rwlock );
}
if ( a_esocket->_inheritor && !a_preserve_inheritor )
DAP_DELETE( a_esocket->_inheritor );
if (a_esocket->_pvt)
DAP_DELETE(a_esocket->_pvt);
if (!a_preserve_inheritor )
DAP_DEL_Z(a_esocket->_inheritor)
if ( a_esocket->socket && a_esocket->socket != -1) {
#ifdef _WIN32
DAP_DEL_Z(a_esocket->_pvt)
DAP_DEL_Z(a_esocket->buf_in)
DAP_DEL_Z(a_esocket->buf_out)
#ifdef DAP_OS_WINDOWS
if ( a_esocket->socket && a_esocket->socket != SOCKET_ERROR) {
closesocket( a_esocket->socket );
#else
#else
if ( a_esocket->socket && a_esocket->socket != -1) {
close( a_esocket->socket );
#ifdef DAP_EVENTS_CAPS_QUEUE_PIPE2
#ifdef DAP_EVENTS_CAPS_QUEUE_PIPE2
if( a_esocket->type == DESCRIPTOR_TYPE_QUEUE){
close( a_esocket->fd2);
}
#endif
#endif
#endif
#endif
}
DAP_DELETE( a_esocket );
DAP_DEL_Z( a_esocket )
}
/**
......@@ -1062,8 +1355,11 @@ void dap_events_socket_remove_from_worker_unsafe( dap_events_socket_t *a_es, dap
#endif
a_worker->event_sockets_count--;
if(a_worker->esockets)
if(a_worker->esockets) {
pthread_rwlock_wrlock(&a_worker->esocket_rwlock);
HASH_DELETE(hh_worker,a_worker->esockets, a_es);
pthread_rwlock_unlock(&a_worker->esocket_rwlock);
}
a_es->worker = NULL;
}
......@@ -1078,7 +1374,9 @@ bool dap_events_socket_check_unsafe(dap_worker_t * a_worker,dap_events_socket_t
if (a_es){
if ( a_worker->esockets){
dap_events_socket_t * l_es = NULL;
pthread_rwlock_rdlock(&a_worker->esocket_rwlock);
HASH_FIND(hh_worker,a_worker->esockets,&a_es, sizeof(a_es), l_es );
pthread_rwlock_unlock(&a_worker->esocket_rwlock);
return l_es == a_es;
}else
return false;
......@@ -1278,14 +1576,14 @@ size_t dap_events_socket_write_f_mt(dap_worker_t * a_w,dap_events_socket_t *a_es
*/
size_t dap_events_socket_write_unsafe(dap_events_socket_t *sc, const void * data, size_t data_size)
{
if(sc->buf_out_size>sizeof(sc->buf_out)){
log_it(L_DEBUG,"write buffer already overflow size=%u/max=%u", sc->buf_out_size, sizeof(sc->buf_out));
if(sc->buf_out_size > DAP_EVENTS_SOCKET_BUF){
log_it(L_DEBUG,"write buffer already overflow size=%u/max=%u", sc->buf_out_size, DAP_EVENTS_SOCKET_BUF);
return 0;
}
//log_it(L_DEBUG,"dap_events_socket_write %u sock data %X size %u", sc->socket, data, data_size );
data_size = ((sc->buf_out_size+data_size)<(sizeof(sc->buf_out)))?data_size:(sizeof(sc->buf_out)-sc->buf_out_size );
memcpy(sc->buf_out+sc->buf_out_size,data,data_size);
sc->buf_out_size+=data_size;
//log_it(L_DEBUG,"dap_events_socket_write %u sock data %X size %u", sc->socket, data, data_size );
data_size = (sc->buf_out_size + data_size < DAP_EVENTS_SOCKET_BUF) ? data_size : (DAP_EVENTS_SOCKET_BUF - sc->buf_out_size);
memcpy(sc->buf_out + sc->buf_out_size, data, data_size);
sc->buf_out_size += data_size;
dap_events_socket_set_writable_unsafe(sc, true);
return data_size;
}
......@@ -1300,18 +1598,18 @@ size_t dap_events_socket_write_f_unsafe(dap_events_socket_t *sc, const char * fo
{
//log_it(L_DEBUG,"dap_events_socket_write_f %u sock", sc->socket );
size_t max_data_size = sizeof(sc->buf_out)-sc->buf_out_size;
size_t max_data_size = DAP_EVENTS_SOCKET_BUF - sc->buf_out_size;
va_list ap;
va_start(ap,format);
int ret=dap_vsnprintf((char*) sc->buf_out+sc->buf_out_size,max_data_size,format,ap);
va_start(ap, format);
int ret=dap_vsnprintf((char*)sc->buf_out + sc->buf_out_size, max_data_size, format, ap);
va_end(ap);
if(ret>0){
sc->buf_out_size+=ret;
}else{
log_it(L_ERROR,"Can't write out formatted data '%s'",format);
if(ret > 0) {
sc->buf_out_size += (unsigned int)ret;
} else {
log_it(L_ERROR,"Can't write out formatted data '%s'", format);
}
dap_events_socket_set_writable_unsafe(sc, true);
return (ret > 0) ? ret : 0;
return (ret > 0) ? (unsigned int)ret : 0;
}
/**
......@@ -1358,3 +1656,37 @@ void dap_events_socket_shrink_buf_in(dap_events_socket_t * cl, size_t shrink_siz
}
}
#ifdef DAP_OS_WINDOWS
int dap_recvfrom(SOCKET s, void* buf_in, size_t buf_size) {
struct sockaddr_in l_dummy;
socklen_t l_size = sizeof(l_dummy);
int ret;
if (buf_in) {
memset(buf_in, 0, buf_size);
ret = recvfrom(s, (char*)buf_in, (long)buf_size, 0, (struct sockaddr *)&l_dummy, &l_size);
} else {
char l_tempbuf[sizeof(void*)];
ret = recvfrom(s, l_tempbuf, sizeof(l_tempbuf), 0, (struct sockaddr *)&l_dummy, &l_size);
}
return ret;
}
int dap_sendto(SOCKET s, void* buf_out, size_t buf_out_size) {
int l_addr_len;
struct sockaddr_in l_addr;
l_addr.sin_family = AF_INET;
IN_ADDR _in_addr = { { .S_addr = htonl(INADDR_LOOPBACK) } };
l_addr.sin_addr = _in_addr;
l_addr.sin_port = s + 32768;
l_addr_len = sizeof(struct sockaddr_in);
int ret;
if (buf_out) {
ret = sendto(s, (char*)buf_out, (long)buf_out_size, MSG_DONTWAIT | MSG_NOSIGNAL, (struct sockaddr *)&l_addr, l_addr_len);
} else {
char l_bytes[sizeof(void*)] = { 0 };
ret = sendto(s, l_bytes, sizeof(l_bytes), MSG_DONTWAIT | MSG_NOSIGNAL, (struct sockaddr *)&l_addr, l_addr_len);
}
return ret;
}
#endif
......@@ -71,13 +71,13 @@ static void s_queue_esocket_callback( dap_events_socket_t * a_es, void * a_msg)
dap_proc_queue_msg_t * l_msg = (dap_proc_queue_msg_t*) a_msg;
assert(l_msg);
// We have callback to add in list
if (l_msg->callback){
if (l_msg->callback) {
dap_proc_queue_item_t * l_item = DAP_NEW_Z(dap_proc_queue_item_t); if (! l_item) return;
l_item->callback = l_msg->callback;
l_item->callback_arg = l_msg->callback_arg;
if ( l_queue->item_last)
l_queue->item_last->prev = l_item;
l_queue->item_last = l_item;
l_item->next=l_queue->item_last ;
......@@ -86,7 +86,6 @@ static void s_queue_esocket_callback( dap_events_socket_t * a_es, void * a_msg)
if( !l_queue->item_first)
l_queue->item_first = l_item;
// Add on top so after call this callback will be executed first
dap_events_socket_event_signal(l_queue->proc_thread->proc_event,1);
//log_it( L_DEBUG, "Sent signal to proc thread that we have callback %p/%p on board", l_msg->callback,l_msg->callback_arg);
......@@ -94,7 +93,7 @@ static void s_queue_esocket_callback( dap_events_socket_t * a_es, void * a_msg)
if (l_msg->signal_kill){ // Say to kill this object and delete its inherior dap_proc_queue_t
a_es->flags |= DAP_SOCK_SIGNAL_CLOSE;
}
DAP_DELETE(l_msg);
DAP_DEL_Z(l_msg)
}
/**
......
......@@ -24,8 +24,11 @@
#include <assert.h>
#include "dap_server.h"
#if defined(DAP_EVENTS_CAPS_EPOLL)
#if defined(DAP_EVENTS_CAPS_EPOLL) && !defined(DAP_OS_WINDOWS)
#include <sys/epoll.h>
#elif defined DAP_OS_WINDOWS
#include "wepoll.h"
#include "ws2tcpip.h"
#elif defined (DAP_EVENTS_CAPS_POLL)
#include <poll.h>
#else
......@@ -167,14 +170,25 @@ static void s_proc_event_callback(dap_events_socket_t * a_esocket, uint64_t a_va
static int s_update_poll_flags(dap_proc_thread_t * a_thread, dap_events_socket_t * a_esocket)
{
#ifdef DAP_EVENTS_CAPS_EPOLL
l_ev.events = a_esocket->ev_base_events;
if( a_esocket->flags & DAP_SOCK_READY_TO_READ)
l_ev.events |= EPOLLIN;
if( a_esocket->flags & DAP_SOCK_READY_TO_WRITE)
l_ev.events |= EPOLLOUT;
l_ev.data.ptr = a_esocket ;
if( epoll_ctl(a_thread->epoll_ctl, EPOLL_CTL_MOD, a_esocket->fd , &l_ev) != 0 ){
log_it(L_CRITICAL, "Can't add proc queue on epoll ctl");
u_int events = a_esocket->ev_base_flags;
if( a_esocket->flags & DAP_SOCK_READY_TO_READ) {
events |= EPOLLIN;
#ifdef DAP_OS_WINDOWS
events ^= EPOLLONESHOT;
#endif
}
if( a_esocket->flags & DAP_SOCK_READY_TO_WRITE) {
events |= EPOLLOUT;
#ifdef DAP_OS_WINDOWS
events |= EPOLLONESHOT;
#endif
}
a_esocket->ev.events = events;
if( epoll_ctl(a_thread->epoll_ctl, EPOLL_CTL_MOD, a_esocket->socket, &a_esocket->ev) != 0 ){
#ifdef DAP_OS_WINDOWS
errno = WSAGetLastError();
#endif
log_it(L_CRITICAL, "Can't add proc queue on epoll ctl, err: %d", errno);
return -1;
}
#elif defined (DAP_EVENTS_CAPS_POLL)
......@@ -195,7 +209,12 @@ static void * s_proc_thread_function(void * a_arg)
dap_cpu_assign_thread_on(l_thread->cpu_id);
struct sched_param l_shed_params;
l_shed_params.sched_priority = 0;
#ifdef DAP_OS_WINDOWS
if (!SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_HIGHEST))
log_it(L_ERROR, "Couldn't set thread priority, err: %d", GetLastError());
#else
pthread_setschedparam(pthread_self(),SCHED_BATCH ,&l_shed_params);
#endif
l_thread->proc_queue = dap_proc_queue_create(l_thread);
// Init proc_queue for related worker
......@@ -221,40 +240,51 @@ static void * s_proc_thread_function(void * a_arg)
l_thread->queue_io_input[n] = dap_events_socket_queue_ptr_create_input(l_worker->queue_es_io );
}
#ifdef DAP_EVENTS_CAPS_EPOLL
struct epoll_event *l_epoll_events = l_thread->epoll_events, l_ev;
memset(l_thread->epoll_events, 0,sizeof (l_thread->epoll_events));
struct epoll_event l_epoll_events[ DAP_EVENTS_SOCKET_MAX]= { { 0 } };
// Create epoll ctl
l_thread->epoll_ctl = epoll_create( DAP_EVENTS_SOCKET_MAX );
// add proc queue
l_ev.events = l_thread->proc_queue->esocket->ev_base_flags;
l_ev.data.ptr = l_thread->proc_queue->esocket;
if( epoll_ctl(l_thread->epoll_ctl, EPOLL_CTL_ADD, l_thread->proc_queue->esocket->socket , &l_ev) != 0 ){
log_it(L_CRITICAL, "Can't add proc queue on epoll ctl");
l_thread->proc_queue->esocket->ev.events = l_thread->proc_queue->esocket->ev_base_flags;
l_thread->proc_queue->esocket->ev.data.ptr = l_thread->proc_queue->esocket;
if( epoll_ctl(l_thread->epoll_ctl, EPOLL_CTL_ADD, l_thread->proc_queue->esocket->socket , &l_thread->proc_queue->esocket->ev) != 0 ){
#ifdef DAP_OS_WINDOWS
errno = WSAGetLastError();
#endif
log_it(L_CRITICAL, "Can't add proc queue %d on epoll ctl, error", l_thread->proc_queue->esocket->socket, errno);
return NULL;
}
// Add proc event
l_ev.events = l_thread->proc_event->ev_base_flags ;
l_ev.data.ptr = l_thread->proc_event;
if( epoll_ctl(l_thread->epoll_ctl, EPOLL_CTL_ADD, l_thread->proc_event->fd , &l_ev) != 0 ){
log_it(L_CRITICAL, "Can't add proc queue on epoll ctl");
l_thread->proc_event->ev.events = l_thread->proc_event->ev_base_flags ;
l_thread->proc_event->ev.data.ptr = l_thread->proc_event;
if( epoll_ctl(l_thread->epoll_ctl, EPOLL_CTL_ADD, l_thread->proc_event->socket , &l_thread->proc_event->ev) != 0 ){
#ifdef DAP_OS_WINDOWS
errno = WSAGetLastError();
#endif
log_it(L_CRITICAL, "Can't add proc event on epoll ctl, err: %d", errno);
return NULL;
}
for (size_t n = 0; n< dap_events_worker_get_count(); n++){
l_ev.events = l_thread->queue_assign_input[n]->ev_base_flags ;
l_ev.data.ptr = l_thread->queue_assign_input[n];
if( epoll_ctl(l_thread->epoll_ctl, EPOLL_CTL_ADD, l_thread->queue_assign_input[n]->fd , &l_ev) != 0 ){
log_it(L_CRITICAL, "Can't add proc queue on epoll ctl");
l_thread->queue_assign_input[n]->ev.events = l_thread->queue_assign_input[n]->ev_base_flags ;
l_thread->queue_assign_input[n]->ev.data.ptr = l_thread->queue_assign_input[n];
if( epoll_ctl(l_thread->epoll_ctl, EPOLL_CTL_ADD, l_thread->queue_assign_input[n]->socket, &l_thread->queue_assign_input[n]->ev) != 0 ){
#ifdef DAP_OS_WINDOWS
errno = WSAGetLastError();
#endif
log_it(L_CRITICAL, "Can't add queue input on epoll ctl, err: %d", errno);
return NULL;
}
l_ev.events = l_thread->queue_io_input[n]->ev_base_flags ;
l_ev.data.ptr = l_thread->queue_io_input[n];
if( epoll_ctl(l_thread->epoll_ctl, EPOLL_CTL_ADD, l_thread->queue_io_input[n]->fd , &l_ev) != 0 ){
log_it(L_CRITICAL, "Can't add proc queue on epoll ctl");
l_thread->queue_io_input[n]->ev.events = l_thread->queue_io_input[n]->ev_base_flags ;
l_thread->queue_io_input[n]->ev.data.ptr = l_thread->queue_io_input[n];
if( epoll_ctl(l_thread->epoll_ctl, EPOLL_CTL_ADD, l_thread->queue_io_input[n]->fd , &l_thread->queue_io_input[n]->ev) != 0 ){
#ifdef DAP_OS_WINDOWS
errno = WSAGetLastError();
#endif
log_it(L_CRITICAL, "Can't add proc io input on epoll ctl, err: %d", errno);
return NULL;
}
}
......@@ -371,8 +401,8 @@ static void * s_proc_thread_function(void * a_arg)
continue;
}
if(s_debug_reactor)
log_it(L_DEBUG, "Poc thread #%u esocket %p fd=%d flags=0x%0X (%s:%s:%s:%s:%s:%s:%s:%s)", l_thread->cpu_id, l_cur, l_cur->socket,
l_cur_events, l_flag_read?"read":"", l_flag_write?"write":"", l_flag_error?"error":"",
log_it(L_DEBUG, "Proc thread #%u esocket %p fd=%d type=%d flags=0x%0X (%s:%s:%s:%s:%s:%s:%s:%s)", l_thread->cpu_id, l_cur, l_cur->socket,
l_cur->type, l_cur_events, l_flag_read?"read":"", l_flag_write?"write":"", l_flag_error?"error":"",
l_flag_hup?"hup":"", l_flag_rdhup?"rdhup":"", l_flag_msg?"msg":"", l_flag_nval?"nval":"", l_flag_pri?"pri":"");
//log_it(L_DEBUG,"Waked up esocket %p (socket %d) {read:%s,write:%s,error:%s} ", l_cur, l_cur->fd,
......@@ -380,21 +410,25 @@ static void * s_proc_thread_function(void * a_arg)
time_t l_cur_time = time( NULL);
l_cur->last_time_active = l_cur_time;
if (l_flag_error){
#if defined DAP_OS_UNIX
#ifdef DAP_OS_WINDOWS
int l_errno = WSAGetLastError();
#else
int l_errno = errno;
#endif
char l_errbuf[128];
strerror_r(l_errno, l_errbuf,sizeof (l_errbuf));
log_it(L_ERROR,"Some error on proc thread #%u with %d socket: %s(%d)",l_thread->cpu_id, l_cur->socket, l_errbuf, l_errno);
#elif defined DAP_OS_WINDOWS
log_it(L_ERROR,"Some error occured on thread #%u with socket %d, errno: %d",l_thread->cpu_id, l_cur->socket, errno);
#endif
if(l_cur->callbacks.error_callback)
l_cur->callbacks.error_callback(l_cur, errno);
}
if (l_flag_read ){
int32_t l_bytes_read = 0;
switch (l_cur->type) {
case DESCRIPTOR_TYPE_QUEUE:
dap_events_socket_queue_proc_input_unsafe(l_cur);
#ifdef DAP_OS_WINDOWS
l_bytes_read = dap_recvfrom(l_cur->socket, NULL, 0);
#endif
break;
case DESCRIPTOR_TYPE_EVENT:
dap_events_socket_event_proc_input_unsafe (l_cur);
......@@ -402,6 +436,9 @@ static void * s_proc_thread_function(void * a_arg)
default:
log_it(L_ERROR, "Unprocessed descriptor type accepted in proc thread loop");
#ifdef DAP_OS_WINDOWS
l_bytes_read = dap_recvfrom(l_cur->socket, NULL, 0);
#endif
break;
}
}
......@@ -414,6 +451,39 @@ static void * s_proc_thread_function(void * a_arg)
if (l_cur->flags & DAP_SOCK_QUEUE_PTR){
#if defined(DAP_EVENTS_CAPS_QUEUE_PIPE2)
l_bytes_sent = write(l_cur->socket, l_cur->buf_out, sizeof (void *) ); // We send pointer by pointer
#elif defined DAP_EVENTS_CAPS_QUEUE_POSIX
l_bytes_sent = mq_send(a_es->mqd, (const char *) l_cur->buf_out, sizeof (void *),0);
#elif defined DAP_EVENTS_CAPS_MSMQ
DWORD l_mp_id = 0;
MQMSGPROPS l_mps;
MQPROPVARIANT l_mpvar[1];
MSGPROPID l_p_id[1];
HRESULT l_mstatus[1];
l_p_id[l_mp_id] = PROPID_M_BODY;
l_mpvar[l_mp_id].vt = VT_VECTOR | VT_UI1;
l_mpvar[l_mp_id].caub.pElems = l_cur->buf_out;
l_mpvar[l_mp_id].caub.cElems = (u_long)l_cur->buf_out_size;
l_mp_id++;
l_mps.cProp = l_mp_id;
l_mps.aPropID = l_p_id;
l_mps.aPropVar = l_mpvar;
l_mps.aStatus = l_mstatus;
HRESULT hr = MQSendMessage(l_cur->mqh, &l_mps, MQ_NO_TRANSACTION);
if (hr != MQ_OK) {
log_it(L_ERROR, "An error occured on sending message to queue, errno: 0x%x", hr);
break;
} else {
if(dap_sendto(l_cur->socket, NULL, 0) == SOCKET_ERROR) {
log_it(L_ERROR, "Write to sock error: %d", WSAGetLastError());
}
l_cur->buf_out_size = 0;
dap_events_socket_set_writable_unsafe(l_cur,false);
break;
}
#elif defined (DAP_EVENTS_CAPS_QUEUE_MQUEUE)
l_bytes_sent = mq_send(l_cur->mqd, (const char *) l_cur->buf_out, sizeof (void *),0);
if (l_bytes_sent==0)
......@@ -455,7 +525,8 @@ static void * s_proc_thread_function(void * a_arg)
}
if(l_cur->flags & DAP_SOCK_SIGNAL_CLOSE){
#ifdef DAP_EVENTS_CAPS_EPOLL
if ( epoll_ctl( l_thread->epoll_ctl, EPOLL_CTL_DEL, l_cur->fd, &l_cur->ev ) == -1 )
log_it(L_WARNING, "Deleting esocket %d from proc thread?...", l_cur->fd);
if ( epoll_ctl( l_thread->epoll_ctl, EPOLL_CTL_DEL, l_cur->fd, &l_cur->ev) == -1 )
log_it( L_ERROR,"Can't remove event socket's handler from the epoll ctl" );
//else
// log_it( L_DEBUG,"Removed epoll's event from proc thread #%u", l_thread->cpu_id );
......
......@@ -21,15 +21,22 @@
along with any DAP SDK based project. If not, see <http://www.gnu.org/licenses/>.
*/
#ifdef DAP_OS_WINDOWS
#include "wepoll.h"
#include <ws2tcpip.h>
#elif defined DAP_OS_UNIX
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netdb.h>
#include <sys/timerfd.h>
#endif
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/epoll.h>
#include <netdb.h>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
......@@ -41,7 +48,6 @@
#include <stddef.h>
#include <errno.h>
#include <signal.h>
#include <sys/timerfd.h>
#include <utlist.h>
#if ! defined(_GNU_SOURCE)
#define _GNU_SOURCE
......@@ -117,9 +123,10 @@ dap_server_t* dap_server_new(dap_events_t *a_events, const char * a_addr, uint16
{
assert(a_events);
dap_server_t *l_server = DAP_NEW_Z(dap_server_t);
#ifndef DAP_OS_WINDOWS
l_server->socket_listener=-1; // To diff it from 0 fd
l_server->address = a_addr? strdup( a_addr) : strdup("0.0.0.0"); // If NULL we listen everything
#endif
l_server->address = a_addr ? strdup(a_addr) : strdup("0.0.0.0"); // If NULL we listen everything
l_server->port = a_port;
l_server->type = a_type;
......@@ -127,10 +134,14 @@ dap_server_t* dap_server_new(dap_events_t *a_events, const char * a_addr, uint16
l_server->socket_listener = socket(AF_INET, SOCK_STREAM, 0);
else if (l_server->type == DAP_SERVER_UDP)
l_server->socket_listener = socket(AF_INET, SOCK_DGRAM, 0);
#ifdef DAP_OS_WINDOWS
if (l_server->socket_listener == INVALID_SOCKET) {
log_it(L_ERROR, "Socket error: %d", WSAGetLastError());
#else
if (l_server->socket_listener < 0) {
int l_errno = errno;
log_it (L_ERROR,"Socket error %s (%d)",strerror(l_errno), l_errno);
#endif
DAP_DELETE(l_server);
return NULL;
}
......@@ -140,6 +151,7 @@ dap_server_t* dap_server_new(dap_events_t *a_events, const char * a_addr, uint16
if (setsockopt(l_server->socket_listener, SOL_SOCKET, SO_REUSEADDR, (const char*)&reuse, sizeof(reuse)) < 0)
log_it(L_WARNING, "Can't set up REUSEADDR flag to the socket");
reuse=1;
#ifdef SO_REUSEPORT
if (setsockopt(l_server->socket_listener, SOL_SOCKET, SO_REUSEPORT, (const char*)&reuse, sizeof(reuse)) < 0)
log_it(L_WARNING, "Can't set up REUSEPORT flag to the socket");
......@@ -150,17 +162,26 @@ dap_server_t* dap_server_new(dap_events_t *a_events, const char * a_addr, uint16
l_server->listener_addr.sin_port = htons(l_server->port);
inet_pton(AF_INET, l_server->address, &(l_server->listener_addr.sin_addr));
if(bind (l_server->socket_listener, (struct sockaddr *) &(l_server->listener_addr), sizeof(l_server->listener_addr)) < 0){
if(bind (l_server->socket_listener, (struct sockaddr *) &(l_server->listener_addr), sizeof(l_server->listener_addr)) < 0) {
#ifdef DAP_OS_WINDOWS
log_it(L_ERROR,"Bind error: %d", WSAGetLastError());
closesocket(l_server->socket_listener);
#else
log_it(L_ERROR,"Bind error: %s",strerror(errno));
close(l_server->socket_listener);
#endif
DAP_DELETE(l_server);
return NULL;
}else{
} else {
log_it(L_INFO,"Binded %s:%u",l_server->address,l_server->port);
listen(l_server->socket_listener, SOMAXCONN);
}
#ifdef DAP_OS_WINDOWS
u_long l_mode = 0;
ioctlsocket(l_server->socket_listener, (long)FIONBIO, &l_mode);
#else
fcntl( l_server->socket_listener, F_SETFL, O_NONBLOCK);
#endif
pthread_mutex_init(&l_server->started_mutex,NULL);
pthread_cond_init(&l_server->started_cond,NULL);
......@@ -179,7 +200,7 @@ dap_server_t* dap_server_new(dap_events_t *a_events, const char * a_addr, uint16
}
// if we have poll exclusive
#if defined(DAP_EVENTS_CAPS_EPOLL)
#ifdef DAP_EVENTS_CAPS_EPOLL
for(size_t l_worker_id = 0; l_worker_id < dap_events_worker_get_count() ; l_worker_id++){
dap_worker_t *l_w = dap_events_worker_get(l_worker_id);
assert(l_w);
......@@ -188,9 +209,10 @@ dap_server_t* dap_server_new(dap_events_t *a_events, const char * a_addr, uint16
if (l_es) {
l_es->type = l_server->type == DAP_SERVER_TCP ? DESCRIPTOR_TYPE_SOCKET_LISTENING : DESCRIPTOR_TYPE_SOCKET_UDP;
#ifdef DAP_EVENTS_CAPS_EPOLL
// Prepare for multi thread listening
l_es->ev_base_flags = EPOLLET| EPOLLIN | EPOLLEXCLUSIVE;
l_es->ev_base_flags = EPOLLIN;
#ifdef EPOLLEXCLUSIVE
l_es->ev_base_flags |= EPOLLET | EPOLLEXCLUSIVE;
#endif
l_es->_inheritor = l_server;
pthread_mutex_lock(&l_server->started_mutex);
......@@ -215,7 +237,7 @@ dap_server_t* dap_server_new(dap_events_t *a_events, const char * a_addr, uint16
dap_worker_add_events_socket( l_es, l_w );
pthread_cond_wait(&l_server->started_cond, &l_server->started_mutex);
pthread_mutex_unlock(&l_server->started_mutex);
} else{
} else {
log_it(L_WARNING, "Can't wrap event socket for %s:%u server", a_addr, a_port);
return NULL;
}
......@@ -271,7 +293,7 @@ static void s_es_server_accept(dap_events_socket_t *a_es, int a_remote_socket, s
l_es_new = s_es_server_create(a_es->events,a_remote_socket,&l_server->client_callbacks,l_server);
//l_es_new->is_dont_reset_write_flag = true; // By default all income connection has this flag
getnameinfo(a_remote_addr,a_remote_addr_size, l_es_new->hostaddr
, sizeof(l_es_new->hostaddr),l_es_new->service,sizeof(l_es_new->service),
,256, l_es_new->service,sizeof(l_es_new->service),
NI_NUMERICHOST | NI_NUMERICSERV);
log_it(L_INFO,"Connection accepted from %s (%s)", l_es_new->hostaddr, l_es_new->service );
......@@ -298,6 +320,8 @@ static dap_events_socket_t * s_es_server_create(dap_events_t * a_events, int a_s
ret = dap_events_socket_wrap_no_add(a_events, a_sock, a_callbacks);
ret->type = DESCRIPTOR_TYPE_SOCKET;
ret->server = a_server;
ret->hostaddr = DAP_NEW_Z_SIZE(char, 256);
ret->service = DAP_NEW_Z_SIZE(char, 54);
} else {
log_it(L_CRITICAL,"Accept error: %s",strerror(errno));
......
......@@ -69,9 +69,8 @@ void __stdcall TimerAPCb(void* arg, DWORD low, DWORD high) { // Timer high valu
UNREFERENCED_PARAMETER(low)
UNREFERENCED_PARAMETER(high)
dap_timerfd_t *l_timerfd = (dap_timerfd_t *)arg;
byte_t l_bytes[sizeof(void*)] = { 0 };
if (write(l_timerfd->pipe_in, l_bytes, sizeof(l_bytes)) == -1) {
log_it(L_CRITICAL, "Error occured on writing into pipe from APC, errno: %d", errno);
if (dap_sendto(l_timerfd->tfd, NULL, 0) == SOCKET_ERROR) {
log_it(L_CRITICAL, "Error occured on writing into socket from APC, errno: %d", WSAGetLastError());
}
}
#endif
......@@ -115,14 +114,29 @@ dap_timerfd_t* dap_timerfd_start_on_worker(dap_worker_t * a_worker, uint64_t a_t
DAP_DELETE(l_timerfd);
return NULL;
}
int l_pipe[2];
if (pipe(l_pipe) < 0) {
log_it(L_ERROR, "Can't create pipe, error: %d", errno);
DAP_DELETE(l_timerfd);
return NULL;
}
SOCKET l_tfd = socket(AF_INET, SOCK_DGRAM, 0);
int buffsize = 1024;
setsockopt(l_tfd, SOL_SOCKET, SO_RCVBUF, (char *)&buffsize, sizeof(int));
unsigned long l_mode = 0;
int l_tfd = l_pipe[0];
ioctlsocket(l_tfd, FIONBIO, &l_mode);
int l_addr_len;
struct sockaddr_in l_addr;
l_addr.sin_family = AF_INET;
IN_ADDR _in_addr = { { .S_addr = htonl(INADDR_LOOPBACK) } };
l_addr.sin_addr = _in_addr;
l_addr.sin_port = l_tfd + 32768;
l_addr_len = sizeof(struct sockaddr_in);
if (bind(l_tfd, (struct sockaddr*)&l_addr, sizeof(l_addr)) < 0) {
log_it(L_ERROR, "Bind error: %d", WSAGetLastError());
} else {
log_it(L_INFO, "Binded %d", l_tfd);
}
LARGE_INTEGER l_due_time;
l_due_time.QuadPart = (long long)a_timeout_ms * _MSEC;
if (!SetWaitableTimer(l_th, &l_due_time, 0, TimerAPCb, l_timerfd, false)) {
......@@ -151,10 +165,6 @@ dap_timerfd_t* dap_timerfd_start_on_worker(dap_worker_t * a_worker, uint64_t a_t
l_timerfd->callback_arg = a_callback_arg;
#ifdef DAP_OS_WINDOWS
l_timerfd->th = l_th;
l_timerfd->pipe_in = l_pipe[1];
/*ioctlsocket(l_pipe[0], FIONBIO, &l_mode);
l_mode = 0;
ioctlsocket(l_pipe[1], FIONBIO, &l_mode);*/
#endif
dap_worker_add_events_socket(l_events_socket, a_worker);
return l_timerfd;
......@@ -191,8 +201,10 @@ static void s_es_callback_timer(struct dap_events_socket *a_event_sock)
#endif
dap_events_socket_set_readable_unsafe(a_event_sock, true);
} else {
#ifndef DAP_OS_WINDOWS
close(l_timerfd->tfd);
#if defined DAP_OS_WINDOWS
#else
closesocket(l_timerfd->tfd);
CloseHandle(l_timerfd->th);
#endif
l_timerfd->events_socket->flags |= DAP_SOCK_SIGNAL_CLOSE;
......
......@@ -28,15 +28,19 @@
#endif
#include <fcntl.h>
#include <sys/types.h>
#ifdef DAP_OS_UNIX
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/resource.h>
#elif defined DAP_OS_WINDOWS
#include <ws2tcpip.h>
#endif
#include "dap_common.h"
#include "dap_config.h"
#include "dap_math_ops.h"
#include "dap_worker.h"
#include "dap_events.h"
#include "dap_enc_base64.h"
#define LOG_TAG "dap_worker"
......@@ -63,7 +67,8 @@ int dap_worker_init( size_t a_conn_timeout )
if ( a_conn_timeout )
s_connection_timeout = a_conn_timeout;
s_debug_reactor = dap_config_get_item_bool_default(g_config,"general","debug_reactor",false);
s_debug_reactor =g_config? dap_config_get_item_bool_default(g_config,"general","debug_reactor",false) : false;
#ifdef DAP_OS_UNIX
struct rlimit l_fdlimit;
if (getrlimit(RLIMIT_NOFILE, &l_fdlimit))
return -1;
......@@ -73,6 +78,7 @@ int dap_worker_init( size_t a_conn_timeout )
if (setrlimit(RLIMIT_NOFILE, &l_fdlimit))
return -2;
log_it(L_INFO, "Set maximum opened descriptors from %d to %d", l_oldlimit, l_fdlimit.rlim_cur);
#endif
return 0;
}
......@@ -95,7 +101,12 @@ void *dap_worker_thread(void *arg)
dap_cpu_assign_thread_on(l_worker->id);
struct sched_param l_shed_params;
l_shed_params.sched_priority = 0;
#ifdef DAP_OS_WINDOWS
if (!SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_BELOW_NORMAL))
log_it(L_ERROR, "Couldn'r set thread priority, err: %d", GetLastError());
#else
pthread_setschedparam(pthread_self(),SCHED_FIFO ,&l_shed_params);
#endif
#ifdef DAP_EVENTS_CAPS_EPOLL
struct epoll_event l_epoll_events[ DAP_EVENTS_SOCKET_MAX]= {{0}};
......@@ -108,15 +119,13 @@ void *dap_worker_thread(void *arg)
#error "Unimplemented socket array for this platform"
#endif
l_worker->queue_es_new = dap_events_socket_create_type_queue_ptr_unsafe( l_worker, s_queue_add_es_callback);
l_worker->queue_es_delete = dap_events_socket_create_type_queue_ptr_unsafe( l_worker, s_queue_delete_es_callback);
l_worker->queue_es_io = dap_events_socket_create_type_queue_ptr_unsafe( l_worker, s_queue_es_io_callback);
l_worker->queue_es_reassign = dap_events_socket_create_type_queue_ptr_unsafe( l_worker, s_queue_es_reassign_callback );
l_worker->queue_callback= dap_events_socket_create_type_queue_ptr_unsafe( l_worker, s_queue_callback_callback);
l_worker->event_exit = dap_events_socket_create_type_event_unsafe(l_worker, s_event_exit_callback );
l_worker->queue_es_new = dap_events_socket_create_type_queue_ptr_unsafe(l_worker, s_queue_add_es_callback);
l_worker->queue_es_delete = dap_events_socket_create_type_queue_ptr_unsafe(l_worker, s_queue_delete_es_callback);
l_worker->queue_es_io = dap_events_socket_create_type_queue_ptr_unsafe(l_worker, s_queue_es_io_callback);
l_worker->queue_es_reassign = dap_events_socket_create_type_queue_ptr_unsafe(l_worker, s_queue_es_reassign_callback );
l_worker->queue_callback = dap_events_socket_create_type_queue_ptr_unsafe(l_worker, s_queue_callback_callback);
l_worker->event_exit = dap_events_socket_create_type_event_unsafe(l_worker, s_event_exit_callback);
l_worker->timer_check_activity = dap_timerfd_start_on_worker( l_worker, s_connection_timeout * 1000 / 2,
s_socket_all_check_activity, l_worker);
......@@ -136,10 +145,14 @@ void *dap_worker_thread(void *arg)
if(l_selected_sockets == -1) {
if( errno == EINTR)
continue;
#ifdef DAP_OS_WINDOWS
log_it(L_ERROR, "Worker thread %d got errno %d", l_worker->id, WSAGetLastError());
#else
int l_errno = errno;
char l_errbuf[128];
strerror_r(l_errno, l_errbuf, sizeof (l_errbuf));
log_it(L_ERROR, "Worker thread %d got errno:\"%s\" (%d)", l_worker->id, l_errbuf, l_errno);
#endif
break;
}
......@@ -149,14 +162,14 @@ void *dap_worker_thread(void *arg)
bool l_flag_hup, l_flag_rdhup, l_flag_read, l_flag_write, l_flag_error, l_flag_nval, l_flag_msg, l_flag_pri;
#ifdef DAP_EVENTS_CAPS_EPOLL
l_cur = (dap_events_socket_t *) l_epoll_events[n].data.ptr;
uint32_t l_cur_events = l_epoll_events[n];
l_flag_hup = l_epoll_events[n].events & EPOLLHUP;
l_flag_rdhup = l_epoll_events[n].events & EPOLLRDHUP;
l_flag_write = l_epoll_events[n].events & EPOLLOUT;
l_flag_read = l_epoll_events[n].events & EPOLLIN;
l_flag_error = l_epoll_events[n].events & EPOLLERR;
l_flag_pri = l_epoll_events[n].events & EPOLLPRI;
l_flag_nval = false;
uint32_t l_cur_events = l_epoll_events[n].events;
l_flag_hup = l_cur_events & EPOLLHUP;
l_flag_rdhup = l_cur_events & EPOLLRDHUP;
l_flag_write = l_cur_events & EPOLLOUT;
l_flag_read = l_cur_events & EPOLLIN;
l_flag_error = l_cur_events & EPOLLERR;
l_flag_pri = l_cur_events & EPOLLPRI;
l_flag_nval = false;
#elif defined ( DAP_EVENTS_CAPS_POLL)
short l_cur_events =l_worker->poll[n].revents;
if (!l_cur_events) // No events for this socket
......@@ -178,46 +191,42 @@ void *dap_worker_thread(void *arg)
log_it(L_ERROR, "dap_events_socket NULL");
continue;
}
if(s_debug_reactor)
log_it(L_DEBUG, "Worker #%u esocket %p fd=%d flags=0x%0X (%s:%s:%s:%s:%s:%s:%s:%s)", l_worker->id, l_cur, l_cur->socket,
if(s_debug_reactor) {
log_it(L_DEBUG, "Worker #%u esocket %p type %d fd=%d flags=0x%0X (%s:%s:%s:%s:%s:%s:%s:%s)", l_worker->id, l_cur, l_cur->type, l_cur->socket,
l_cur_events, l_flag_read?"read":"", l_flag_write?"write":"", l_flag_error?"error":"",
l_flag_hup?"hup":"", l_flag_rdhup?"rdhup":"", l_flag_msg?"msg":"", l_flag_nval?"nval":"", l_flag_pri?"pri":"");
}
int l_sock_err = 0, l_sock_err_size = sizeof(l_sock_err);
//connection already closed (EPOLLHUP - shutdown has been made in both directions)
if (l_flag_rdhup){
switch (l_cur->type ){
case DESCRIPTOR_TYPE_SOCKET_UDP:
case DESCRIPTOR_TYPE_SOCKET:
dap_events_socket_set_readable_unsafe(l_cur, false);
dap_events_socket_set_writable_unsafe(l_cur, false);
l_cur->buf_out_size = 0;
l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE;
l_flag_error = l_flag_write = false;
break;
default:{}
}
if(s_debug_reactor)
log_it(L_INFO,"RDHUP event on esocket %p (%d) type %d", l_cur, l_cur->socket, l_cur->type );
}
if( l_flag_hup ) {
switch (l_cur->type ){
case DESCRIPTOR_TYPE_SOCKET_UDP:
case DESCRIPTOR_TYPE_SOCKET:
getsockopt(l_cur->socket, SOL_SOCKET, SO_ERROR, (void *)&l_sock_err, (socklen_t *)&l_sock_err_size);
if (l_sock_err) {
dap_events_socket_set_readable_unsafe(l_cur, false);
dap_events_socket_set_writable_unsafe(l_cur, false);
l_cur->buf_out_size = 0;
l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE;
l_flag_error = l_flag_write = false;
l_cur->callbacks.error_callback(l_cur, l_sock_err); // Call callback to process error event
log_it(L_INFO, "Socket shutdown (EPOLLHUP): %s", strerror(l_sock_err));
}
case DESCRIPTOR_TYPE_SOCKET: {
int l_err = getsockopt(l_cur->socket, SOL_SOCKET, SO_ERROR, (void *)&l_sock_err, (socklen_t *)&l_sock_err_size);
#ifndef DAP_OS_WINDOWS
if (l_sock_err) {
log_it(L_DEBUG, "Socket %d error %d", l_cur->socket, l_sock_err);
#else
if (l_err == SOCKET_ERROR) {
log_it(L_DEBUG, "Socket %d will be shutdown (EPOLLHUP), error %d", l_cur->socket, WSAGetLastError());
#endif
dap_events_socket_set_readable_unsafe(l_cur, false);
dap_events_socket_set_writable_unsafe(l_cur, false);
l_cur->buf_out_size = 0;
l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE;
l_flag_error = l_flag_write = false;
l_cur->callbacks.error_callback(l_cur, l_sock_err); // Call callback to process error event
#ifndef DAP_OS_WINDOWS
log_it(L_INFO, "Socket shutdown (EPOLLHUP): %s", strerror(l_sock_err));
#endif
}
break;
default:
if(s_debug_reactor)
log_it(L_INFO,"RDHUP event on esocket %p (%d) type %d", l_cur, l_cur->socket, l_cur->type );
}
default:
if(s_debug_reactor)
log_it(L_INFO,"RDHUP event on esocket %p (%d) type %d", l_cur, l_cur->socket, l_cur->type );
}
}
......@@ -233,6 +242,9 @@ void *dap_worker_thread(void *arg)
case DESCRIPTOR_TYPE_SOCKET_LISTENING:
case DESCRIPTOR_TYPE_SOCKET:
getsockopt(l_cur->socket, SOL_SOCKET, SO_ERROR, (void *)&l_sock_err, (socklen_t *)&l_sock_err_size);
#ifdef DAP_OS_WINDOWS
log_it(L_ERROR, "Winsock error: %d", WSAGetLastError());
#endif
log_it(L_ERROR, "Socket error: %s", strerror(l_sock_err));
default: ;
}
......@@ -255,7 +267,7 @@ void *dap_worker_thread(void *arg)
if(l_flag_read) {
//log_it(L_DEBUG, "Comes connection with type %d", l_cur->type);
if(l_cur->buf_in_size == sizeof(l_cur->buf_in)) {
if(l_cur->buf_in_size >= DAP_EVENTS_SOCKET_BUF) {
log_it(L_WARNING, "Buffer is full when there is smth to read. Its dropped!");
l_cur->buf_in_size = 0;
}
......@@ -267,21 +279,25 @@ void *dap_worker_thread(void *arg)
case DESCRIPTOR_TYPE_PIPE:
case DESCRIPTOR_TYPE_FILE:
l_must_read_smth = true;
#ifdef DAP_OS_WINDOWS
l_bytes_read = dap_recvfrom(l_cur->socket, l_cur->buf_in + l_cur->buf_in_size, DAP_EVENTS_SOCKET_BUF - l_cur->buf_in_size);
#else
l_bytes_read = read(l_cur->socket, (char *) (l_cur->buf_in + l_cur->buf_in_size),
sizeof(l_cur->buf_in) - l_cur->buf_in_size);
#endif
l_errno = errno;
break;
case DESCRIPTOR_TYPE_SOCKET:
l_must_read_smth = true;
l_bytes_read = recv(l_cur->fd, (char *) (l_cur->buf_in + l_cur->buf_in_size),
sizeof(l_cur->buf_in) - l_cur->buf_in_size, 0);
DAP_EVENTS_SOCKET_BUF - l_cur->buf_in_size, 0);
l_errno = errno;
break;
case DESCRIPTOR_TYPE_SOCKET_UDP: {
l_must_read_smth = true;
socklen_t l_size = sizeof(l_cur->remote_addr);
l_bytes_read = recvfrom(l_cur->fd, (char *) (l_cur->buf_in + l_cur->buf_in_size),
sizeof(l_cur->buf_in) - l_cur->buf_in_size, 0,
DAP_EVENTS_SOCKET_BUF - l_cur->buf_in_size, 0,
(struct sockaddr *)&l_cur->remote_addr, &l_size);
l_errno = errno;
......@@ -293,7 +309,12 @@ void *dap_worker_thread(void *arg)
struct sockaddr l_remote_addr;
socklen_t l_remote_addr_size= sizeof (l_remote_addr);
int l_remote_socket= accept(l_cur->socket ,&l_remote_addr,&l_remote_addr_size);
#ifdef DAP_OS_WINDOWS
u_long l_mode = 0;
ioctlsocket((SOCKET)l_remote_socket, (long)FIONBIO, &l_mode);
#else
fcntl( l_remote_socket, F_SETFL, O_NONBLOCK);
#endif
int l_errno = errno;
if ( l_remote_socket == -1 ){
......@@ -314,7 +335,11 @@ void *dap_worker_thread(void *arg)
case DESCRIPTOR_TYPE_TIMER:{
uint64_t val;
/* if we not reading data from socket, he triggered again */
#ifdef DAP_OS_WINDOWS
l_bytes_read = dap_recvfrom(l_cur->socket, NULL, 0);
#else
read( l_cur->fd, &val, 8);
#endif
if (l_cur->callbacks.timer_callback)
l_cur->callbacks.timer_callback(l_cur);
else
......@@ -322,9 +347,16 @@ void *dap_worker_thread(void *arg)
} break;
case DESCRIPTOR_TYPE_QUEUE:
#ifdef DAP_OS_WINDOWS
l_bytes_read = dap_recvfrom(l_cur->socket, NULL, 0);
#endif
dap_events_socket_queue_proc_input_unsafe(l_cur);
dap_events_socket_set_writable_unsafe(l_cur, false);
break;
case DESCRIPTOR_TYPE_EVENT:
#ifdef DAP_OS_WINDOWS
l_bytes_read = dap_recvfrom(l_cur->socket, NULL, 0);
#endif
dap_events_socket_event_proc_input_unsafe(l_cur);
break;
}
......@@ -364,29 +396,42 @@ void *dap_worker_thread(void *arg)
}
}
//if (l_flag_write)
// log_it(L_DEBUG,"Alarmed write flag for remote %s", l_cur->remote_addr_str[0]?l_cur->remote_addr_str:"(null)");
// Possibly have data to read despite EPOLLRDHUP
if (l_flag_rdhup){
switch (l_cur->type ){
case DESCRIPTOR_TYPE_SOCKET_UDP:
case DESCRIPTOR_TYPE_SOCKET:
dap_events_socket_set_readable_unsafe(l_cur, false);
dap_events_socket_set_writable_unsafe(l_cur, false);
l_cur->buf_out_size = 0;
l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE;
l_flag_error = l_flag_write = false;
break;
default:{}
}
if(s_debug_reactor)
log_it(L_INFO,"RDHUP event on esocket %p (%d) type %d", l_cur, l_cur->socket, l_cur->type );
}
// If its outgoing connection
if ( l_flag_write && ! l_cur->server && l_cur->flags& DAP_SOCK_CONNECTING &&
if ( l_flag_write && !l_cur->server && (l_cur->flags & DAP_SOCK_CONNECTING) &&
(l_cur->type == DESCRIPTOR_TYPE_SOCKET || l_cur->type == DESCRIPTOR_TYPE_SOCKET_UDP )){
int l_error = 0;
socklen_t l_error_len = sizeof(l_error);
char l_error_buf[128];
l_error_buf[0]='\0';
getsockopt(l_cur->socket, SOL_SOCKET, SO_ERROR, &l_error, &l_error_len);
getsockopt(l_cur->socket, SOL_SOCKET, SO_ERROR, (void *)&l_error, &l_error_len);
if (l_error){
strerror_r(l_error, l_error_buf, sizeof (l_error_buf));
log_it(L_ERROR,"Connecting error with %s: \"%s\" (code %d)", l_cur->remote_addr_str[0]? l_cur->remote_addr_str: "(NULL)"
,
log_it(L_ERROR,"Connecting error with %s: \"%s\" (code %d)", l_cur->remote_addr_str ? l_cur->remote_addr_str: "(NULL)",
l_error_buf, l_error);
if ( l_cur->callbacks.error_callback )
l_cur->callbacks.error_callback(l_cur, l_error);
}else if(l_error == EINPROGRESS) {
log_it(L_DEBUG, "Connecting with %s in progress...", l_cur->remote_addr_str[0]? l_cur->remote_addr_str: "(NULL)");
log_it(L_DEBUG, "Connecting with %s in progress...", l_cur->remote_addr_str ? l_cur->remote_addr_str: "(NULL)");
}else{
if(s_debug_reactor)
log_it(L_NOTICE, "Connected with %s",l_cur->remote_addr_str[0]? l_cur->remote_addr_str: "(NULL)");
log_it(L_NOTICE, "Connected with %s",l_cur->remote_addr_str ? l_cur->remote_addr_str: "(NULL)");
l_cur->flags ^= DAP_SOCK_CONNECTING;
if (l_cur->callbacks.connected_callback)
l_cur->callbacks.connected_callback(l_cur);
......@@ -426,12 +471,20 @@ void *dap_worker_thread(void *arg)
//for(total_sent = 0; total_sent < cur->buf_out_size;) { // If after callback there is smth to send - we do it
ssize_t l_bytes_sent =0;
int l_errno;
switch (l_cur->type){
case DESCRIPTOR_TYPE_SOCKET:
case DESCRIPTOR_TYPE_SOCKET: {
l_bytes_sent = send(l_cur->socket, (const char *)l_cur->buf_out,
l_cur->buf_out_size, MSG_DONTWAIT | MSG_NOSIGNAL);
l_errno = errno;
#ifdef DAP_OS_WINDOWS
dap_events_socket_set_writable_unsafe(l_cur,false);
l_errno = WSAGetLastError();
#else
l_errno = errno;
#endif
break;
}
case DESCRIPTOR_TYPE_SOCKET_UDP:
l_bytes_sent = sendto(l_cur->socket, (const char *)l_cur->buf_out,
l_cur->buf_out_size, MSG_DONTWAIT | MSG_NOSIGNAL,
......@@ -443,7 +496,45 @@ void *dap_worker_thread(void *arg)
#if defined(DAP_EVENTS_CAPS_QUEUE_PIPE2)
l_bytes_sent = write(l_cur->socket, l_cur->buf_out, sizeof (void *) ); // We send pointer by pointer
l_errno = errno;
#elif defined (DAP_EVENTS_CAPS_QUEUE_POSIX)
l_bytes_sent = mq_send(a_es->mqd, (const char *)&a_arg,sizeof (a_arg),0);
#elif defined DAP_EVENTS_CAPS_MSMQ
DWORD l_mp_id = 0;
MQMSGPROPS l_mps;
MQPROPVARIANT l_mpvar[1];
MSGPROPID l_p_id[1];
HRESULT l_mstatus[1];
l_p_id[l_mp_id] = PROPID_M_BODY;
l_mpvar[l_mp_id].vt = VT_VECTOR | VT_UI1;
l_mpvar[l_mp_id].caub.pElems = l_cur->buf_out;
l_mpvar[l_mp_id].caub.cElems = (u_long)l_cur->buf_out_size;
l_mp_id++;
l_mps.cProp = l_mp_id;
l_mps.aPropID = l_p_id;
l_mps.aPropVar = l_mpvar;
l_mps.aStatus = l_mstatus;
log_it(L_INFO, "Sent to SOCKET %d", l_cur->socket);
HRESULT hr = MQSendMessage(l_cur->mqh, &l_mps, MQ_NO_TRANSACTION);
if (hr != MQ_OK) {
log_it(L_ERROR, "An error occured on sending message to queue, errno: 0x%x", hr);
break;
} else {
l_errno = WSAGetLastError();
if(dap_sendto(l_cur->socket, NULL, 0) == SOCKET_ERROR) {
log_it(L_ERROR, "Write to socket error: %d", WSAGetLastError());
}
l_cur->buf_out_size = 0;
dap_events_socket_set_writable_unsafe(l_cur,false);
break;
}
#ifndef DAP_OS_WINDOWS
l_errno = errno;
#endif
#elif defined (DAP_EVENTS_CAPS_QUEUE_MQUEUE)
l_bytes_sent = mq_send(l_cur->mqd , (const char *)l_cur->buf_out,sizeof (void*),0);
if(l_bytes_sent == 0)
......@@ -558,15 +649,17 @@ static void s_queue_add_es_callback( dap_events_socket_t * a_es, void * a_arg)
// Socket already present in worker, it's OK
return;
}
switch( l_es_new->type){
case DESCRIPTOR_TYPE_SOCKET_UDP:
case DESCRIPTOR_TYPE_SOCKET:
case DESCRIPTOR_TYPE_SOCKET_LISTENING:{
#ifdef DAP_OS_UNIX
int l_cpu = l_worker->id;
setsockopt(l_es_new->socket , SOL_SOCKET, SO_INCOMING_CPU, &l_cpu, sizeof(l_cpu));
}break;
#endif
} break;
default: {}
}
......@@ -587,7 +680,9 @@ static void s_queue_add_es_callback( dap_events_socket_t * a_es, void * a_arg)
// Add in global list
// Add in worker
l_es_new->me = l_es_new;
pthread_rwlock_wrlock(&l_worker->esocket_rwlock);
HASH_ADD(hh_worker, l_worker->esockets, me, sizeof(void *), l_es_new );
pthread_rwlock_unlock(&l_worker->esocket_rwlock);
l_worker->event_sockets_count++;
//log_it(L_DEBUG, "Added socket %d on worker %u", l_es_new->socket, w->id);
if (l_es_new->callbacks.worker_assign_callback)
......@@ -674,7 +769,9 @@ static void s_queue_es_io_callback( dap_events_socket_t * a_es, void * a_arg)
// Check if it was removed from the list
dap_events_socket_t *l_msg_es = NULL;
pthread_rwlock_rdlock(&a_es->worker->esocket_rwlock);
HASH_FIND(hh_worker, a_es->worker->esockets, &l_msg->esocket , sizeof (void*), l_msg_es );
pthread_rwlock_unlock(&a_es->worker->esocket_rwlock);
if ( l_msg_es == NULL){
log_it(L_INFO, "We got i/o message for client thats now not in list. Lost %u data", l_msg->data_size);
DAP_DELETE(l_msg);
......@@ -718,7 +815,7 @@ static bool s_socket_all_check_activity( void * a_arg)
time_t l_curtime= time(NULL);
ctime_r(&l_curtime, l_curtimebuf);
//log_it(L_DEBUG,"Check sockets activity on worker #%u at %s", l_worker->id, l_curtimebuf);
pthread_rwlock_rdlock(&l_worker->esocket_rwlock);
HASH_ITER(hh_worker, l_worker->esockets, l_es, tmp ) {
if ( l_es->type == DESCRIPTOR_TYPE_SOCKET || l_es->type == DESCRIPTOR_TYPE_SOCKET_UDP ){
if ( !(l_es->flags & DAP_SOCK_SIGNAL_CLOSE) &&
......@@ -731,6 +828,7 @@ static bool s_socket_all_check_activity( void * a_arg)
}
}
}
pthread_rwlock_unlock(&l_worker->esocket_rwlock);
return true;
}
......@@ -745,8 +843,8 @@ void dap_worker_add_events_socket(dap_events_socket_t * a_events_socket, dap_wor
if(l_ret != 0 ){
char l_errbuf[128];
*l_errbuf = 0;
strerror_r(l_ret,l_errbuf,sizeof (l_errbuf));
log_it(L_ERROR, "Cant send pointer in queue: \"%s\"(code %d)", l_errbuf, l_ret);
strerror_r(l_ret, l_errbuf, sizeof(l_errbuf));
log_it(L_ERROR, "Can't send pointer in queue: \"%s\"(code %d)", l_errbuf, l_ret);
}
}
......
......@@ -54,7 +54,14 @@
#define DAP_EVENTS_CAPS_EPOLL
#define DAP_EVENTS_CAPS_QUEUE_WEVENT
#define DAP_EVENTS_CAPS_EVENT_WEVENT
#define DAP_EVENTS_CAPS_PIPE_POSIX
//#define DAP_EVENTS_CAPS_PIPE_POSIX
#define DAP_EVENTS_CAPS_MSMQ
#define INET_ADDRSTRLEN 16
#define INET6_ADDRSTRLEN 46
#include <mq.h>
#include <ws2tcpip.h>
#define MSG_DONTWAIT 0
#define MSG_NOSIGNAL 0
#endif
#if defined(DAP_EVENTS_CAPS_WEPOLL)
......@@ -97,12 +104,12 @@ typedef struct dap_events_socket_callbacks {
union{ // Specific callbacks
dap_events_socket_callback_connected_t connected_callback; // Connected callback for client socket
dap_events_socket_callback_accept_t accept_callback; // Accept callback for listening socket
dap_events_socket_callback_timer_t timer_callback; // Timer callback for listening socket
dap_events_socket_callback_event_t event_callback; // Event callback for listening socket
dap_events_socket_callback_queue_t queue_callback; // Queue callback for listening socket
dap_events_socket_callback_queue_ptr_t queue_ptr_callback; // queue_ptr callback for listening socket
};
dap_events_socket_callback_timer_t timer_callback; // Timer callback for listening socket
dap_events_socket_callback_t new_callback; // Create new client callback
dap_events_socket_callback_t delete_callback; // Delete client callback
dap_events_socket_callback_t read_callback; // Read function
......@@ -128,18 +135,24 @@ typedef enum {
} dap_events_desc_type_t;
typedef struct dap_events_socket {
union{
union {
#ifdef DAP_OS_WINDOWS
SOCKET socket;
#else
int socket;
#endif
int fd;
#if defined(DAP_EVENTS_CAPS_QUEUE_MQUEUE)
mqd_t mqd;
#endif
};
#if defined(DAP_EVENTS_CAPS_QUEUE_MQUEUE)
uint32_t mqd_id;
#elif defined DAP_EVENTS_CAPS_MSMQ
};
QUEUEHANDLE mqh, mqh_recv;
HANDLE ev_timeout, ev_recv;
#endif
#ifdef DAP_EVENTS_CAPS_PIPE_POSIX
#if defined DAP_EVENTS_CAPS_PIPE_POSIX
int fd2;
#endif
dap_events_desc_type_t type;
......@@ -156,26 +169,31 @@ typedef struct dap_events_socket {
uint32_t buf_out_zero_count;
// Input section
union{
uint8_t buf_in[DAP_EVENTS_SOCKET_BUF+1]; // Internal buffer for input data
char buf_in_str[DAP_EVENTS_SOCKET_BUF+1];
};
//uint8_t buf_in[DAP_EVENTS_SOCKET_BUF+1]; // Internal buffer for input data
//char buf_in_str[DAP_EVENTS_SOCKET_BUF+1];
byte_t *buf_in;
//char *buf_in_str;
size_t buf_in_size; // size of data that is in the input buffer
// Output section
byte_t buf_out[DAP_EVENTS_SOCKET_BUF+1]; // Internal buffer for output data
//byte_t buf_out[DAP_EVENTS_SOCKET_BUF+1]; // Internal buffer for output data
byte_t *buf_out;
size_t buf_out_size; // size of data that is in the output buffer
dap_events_socket_t * pipe_out; // Pipe socket with data for output
// Stored string representation
char hostaddr[1024]; // Address
char service[128];
//char hostaddr[1024]; // Address
//char service[128];
char *hostaddr;
char *service;
// Remote address, port and others
struct sockaddr_in remote_addr;
char remote_addr_str[INET_ADDRSTRLEN];
char remote_addr_str6[INET6_ADDRSTRLEN];
//char remote_addr_str[INET_ADDRSTRLEN];
//char remote_addr_str6[INET6_ADDRSTRLEN];
char *remote_addr_str;
char *remote_addr_str6;
short remote_port;
......@@ -202,7 +220,6 @@ typedef struct dap_events_socket {
void *_inheritor; // Inheritor data to specific client type, usualy states for state machine
void *_pvt; //Private section, different for different types
struct dap_events_socket * me; // pointer on itself
UT_hash_handle hh;
UT_hash_handle hh_worker; // Handle for local CPU storage on worker
} dap_events_socket_t; // Node of bidirectional list of clients
......@@ -271,4 +288,7 @@ void dap_events_socket_remove_and_delete_unsafe( dap_events_socket_t *a_es, bool
void dap_events_socket_remove_from_worker_unsafe( dap_events_socket_t *a_es, dap_worker_t * a_worker);
void dap_events_socket_shrink_buf_in(dap_events_socket_t * cl, size_t shrink_size);
#ifdef DAP_OS_WINDOWS
int dap_recvfrom(SOCKET s, void* buf_in, size_t buf_size);
int dap_sendto(SOCKET s, void* buf_in, size_t buf_size);
#endif
......@@ -31,9 +31,6 @@
#include <ws2tcpip.h>
#include <io.h>
#include "win32/ip.h"
#include "win32/iphdr.h"
#define s6_addr32 s6_addr
#define herror perror
#else
......
......@@ -45,7 +45,6 @@ typedef struct dap_proc_thread{
#ifdef DAP_EVENTS_CAPS_EPOLL
EPOLL_HANDLE epoll_ctl;
struct epoll_event epoll_events[DAP_EVENTS_SOCKET_MAX];
#elif defined (DAP_EVENTS_CAPS_POLL)
int poll_fd;
struct pollfd * poll;
......
......@@ -65,8 +65,11 @@ typedef struct dap_server {
dap_server_type_t type; // Server's type
uint16_t port; // Listen port
char *address; // Listen address
#ifdef DAP_OS_WINDOWS
SOCKET socket_listener;
#else
int32_t socket_listener; // Socket for listener
#endif
dap_list_t *es_listeners;
struct sockaddr_in listener_addr; // Kernel structure for listener's binded address
......
......@@ -42,13 +42,17 @@ typedef bool (*dap_timerfd_callback_t)(void* ); // Callback for timer. If return
typedef struct dap_timerfd {
uint64_t timeout_ms;
#ifdef DAP_OS_WINDOWS
SOCKET tfd;
#else
int tfd; //timer file descriptor
#endif
dap_events_socket_t *events_socket;
dap_timerfd_callback_t callback;
void *callback_arg;
#ifdef DAP_OS_WINDOWS
HANDLE th;
int pipe_in;
SOCKET pipe_in;
#endif
} dap_timerfd_t;
......