Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • cellframe/cellframe-sdk
  • MIKA83/cellframe-sdk
2 results
Show changes
Commits on Source (3)
Showing
with 292 additions and 317 deletions
......@@ -2,7 +2,7 @@ project(cellframe-sdk C)
cmake_minimum_required(VERSION 3.0)
set(CMAKE_C_STANDARD 11)
set(CELLFRAME_SDK_NATIVE_VERSION "2.9-16")
set(CELLFRAME_SDK_NATIVE_VERSION "2.9-20")
add_definitions ("-DCELLFRAME_SDK_VERSION=\"${CELLFRAME_SDK_NATIVE_VERSION}\"")
set(DAPSDK_MODULES "")
message("Cellframe modules: ${CELLFRAME_MODULES}")
......
......@@ -47,6 +47,7 @@ win32 {
LIBS += -lntdll -lpsapi -ljson-c -lmagic -lmqrt -lshlwapi -lregex -ltre -lintl -liconv -lbcrypt -lcrypt32 -lsecur32 -luser32 -lws2_32 -lole32
include($$PWD/../../3rdparty/wepoll/wepoll.pri)
DEFINES += DAP_OS_WINDOWS
QMAKE_CFLAGS_DEBUG += -Wall -ggdb -g3
}
# 3rd party
......
......@@ -24,5 +24,6 @@
#pragma once
#include "dap_math_ops.h"
uint128_t dap_uuid_generate_uint128(); // Produce uint64 unique id
uint128_t dap_uuid_generate_uint128(); // Produce uint128 unique id
uint64_t dap_uuid_generate_uint64(); // Produce uint64 unique id
......@@ -30,7 +30,8 @@
#define LOG_TAG "dap_uuid"
atomic_uint_fast32_t s_global_counter=0;
atomic_uint_fast32_t s_global_counter32=0;
atomic_uint_fast16_t s_global_counter16=0;
/**
* @brief dap_uuid_generate_ui64
......@@ -42,12 +43,32 @@ uint128_t dap_uuid_generate_uint128()
uint32_t l_input[4] ={
[0]=random_uint32_t(UINT32_MAX),
[1]=time(NULL),
[2]=s_global_counter++,
[2]=s_global_counter32++,
[3]=random_uint32_t(UINT32_MAX)
};
uint128_t l_output;
SHAKE128((unsigned char *) &l_output,sizeof (l_output), (unsigned char*) &l_input,sizeof (l_input));
uint64_t *l_output_u64 =(uint64_t*) &l_output;
// uint64_t *l_output_u64 =(uint64_t*) &l_output;
// log_it(L_DEBUG,"UUID generated 0x%016X%016X (0x%08X%08X%08X%08X",l_output_u64[0],l_output_u64[1],
// l_input[0],l_input[1],l_input[2],l_input[3]);
return l_output;
}
/**
* @brief dap_uuid_generate_uint64
* @return
*/
uint64_t dap_uuid_generate_uint64()
{
uint32_t l_ts = (uint32_t) time(NULL);
uint16_t l_input[4] ={
[0]=dap_random_uint16(),
[1]= l_ts % UINT16_MAX,
[2]= s_global_counter16++,
[3]= dap_random_uint16()
};
uint64_t l_output;
SHAKE128((unsigned char *) &l_output,sizeof (l_output), (unsigned char*) &l_input,sizeof (l_input));
// log_it(L_DEBUG,"UUID generated 0x%016X%016X (0x%08X%08X%08X%08X",l_output_u64[0],l_output_u64[1],
// l_input[0],l_input[1],l_input[2],l_input[3]);
return l_output;
......
......@@ -26,6 +26,29 @@ uint32_t random_uint32_t(const uint32_t MAX_NUMBER)
return ret;
}
/**
* @brief dap_random_byte
* @return
*/
byte_t dap_random_byte()
{
byte_t ret;
randombytes(&ret, 1);
return ret;
}
/**
* @brief dap_random_uint16
* @return
*/
uint16_t dap_random_uint16()
{
uint16_t l_ret;
randombytes(&l_ret, 2);
return l_ret;
}
int randombase64(void*random_array, unsigned int size)
{
int off = size - (size/4)*3;
......
......@@ -6,5 +6,7 @@
int randombytes(void* random_array, unsigned int nbytes);
int randombase64(void*random_array, unsigned int size);
uint32_t random_uint32_t(const uint32_t MAX_NUMBER);
byte_t dap_random_byte();
uint16_t dap_random_uint16();
#endif
......@@ -180,15 +180,14 @@ void dap_client_http_set_connect_timeout_ms(uint64_t a_timeout_ms)
*/
static bool s_timer_timeout_after_connected_check(void * a_arg)
{
dap_events_socket_handler_t *l_es_handler = (dap_events_socket_handler_t*) a_arg;
dap_events_socket_handle_t *l_es_handler = (dap_events_socket_handle_t*) a_arg;
assert(l_es_handler);
dap_worker_t * l_worker = dap_events_get_current_worker(dap_events_get_default()); // We're in own esocket context
assert(l_worker);
dap_events_socket_t * l_es;
if(l_es = dap_worker_esocket_find_uuid( l_worker, l_es_handler->esocket_uuid ) ){
if(dap_events_socket_check_uuid_unsafe( l_worker, l_es_handler->esocket,l_es_handler->uuid ) ){
dap_events_socket_t * l_es = l_es_handler->esocket;
assert(l_es);
dap_client_http_pvt_t * l_http_pvt = PVT(l_es);
assert(l_http_pvt);
if ( time(NULL)- l_http_pvt->ts_last_read >= (time_t) s_client_timeout_read_after_connect_ms){
......@@ -206,7 +205,7 @@ static bool s_timer_timeout_after_connected_check(void * a_arg)
}
}else{
if(s_debug_more)
log_it(L_DEBUG,"Esocket %p is finished, close check timer", l_es_handler->esocket);
log_it(L_DEBUG,"Esocket %llu is finished, close check timer", l_es_handler->esocket_uuid);
}
DAP_DEL_Z(l_es_handler)
return false;
......@@ -220,23 +219,13 @@ static bool s_timer_timeout_after_connected_check(void * a_arg)
*/
static bool s_timer_timeout_check(void * a_arg)
{
dap_events_socket_handler_t *l_es_handler = (dap_events_socket_handler_t*) a_arg;
dap_events_socket_handle_t *l_es_handler = (dap_events_socket_handle_t*) a_arg;
assert(l_es_handler);
dap_events_socket_t * l_es = l_es_handler->esocket;
assert(l_es);
dap_events_t * l_events = dap_events_get_default();
assert(l_events);
dap_worker_t * l_worker = dap_events_get_current_worker(l_events); // We're in own esocket context
dap_worker_t * l_worker = dap_events_get_current_worker(dap_events_get_default()); // We're in own esocket context
assert(l_worker);
if(dap_events_socket_check_unsafe(l_worker, l_es) ){
if (!dap_uint128_check_equal(l_es->uuid,l_es_handler->uuid)){
if(s_debug_more)
log_it(L_DEBUG,"Timer esocket wrong argument, ignore this timeout...");
DAP_DEL_Z(l_es_handler)
return false;
}
dap_events_socket_t * l_es;
if(l_es = dap_worker_esocket_find_uuid(l_worker, l_es_handler->esocket_uuid)){
if (l_es->flags & DAP_SOCK_CONNECTING ){
dap_client_http_pvt_t * l_http_pvt = PVT(l_es);
log_it(L_WARNING,"Connecting timeout for request http://%s:%u/%s, possible network problems or host is down",
......@@ -254,7 +243,7 @@ static bool s_timer_timeout_check(void * a_arg)
log_it(L_DEBUG,"Socket %d is connected, close check timer", l_es->socket);
}else
if(s_debug_more)
log_it(L_DEBUG,"Esocket %p is finished, close check timer", l_es);
log_it(L_DEBUG,"Esocket %llu is finished, close check timer", l_es_handler->esocket_uuid);
DAP_DEL_Z(l_es_handler)
return false;
......@@ -626,9 +615,9 @@ void* dap_client_http_request_custom(dap_worker_t * a_worker, const char *a_upli
log_it(L_DEBUG, "Connecting to %s:%u", a_uplink_addr, a_uplink_port);
l_http_pvt->worker = a_worker?a_worker: dap_events_worker_get_auto();
dap_worker_add_events_socket(l_ev_socket,l_http_pvt->worker);
dap_events_socket_handler_t * l_ev_socket_handler = DAP_NEW_Z(dap_events_socket_handler_t);
l_ev_socket_handler->esocket = l_ev_socket;
l_ev_socket_handler->uuid = l_ev_socket->uuid;
dap_events_socket_handle_t * l_ev_socket_handler = DAP_NEW_Z(dap_events_socket_handle_t);
l_ev_socket_handler->esocket_uuid = l_ev_socket->uuid;
dap_timerfd_start_on_worker(l_http_pvt->worker,s_client_timeout_ms, s_timer_timeout_check,l_ev_socket_handler);
return l_http_pvt;
} else {
......@@ -646,9 +635,8 @@ void* dap_client_http_request_custom(dap_worker_t * a_worker, const char *a_upli
log_it(L_DEBUG, "Connecting to %s:%u", a_uplink_addr, a_uplink_port);
l_http_pvt->worker = a_worker?a_worker: dap_events_worker_get_auto();
dap_worker_add_events_socket(l_ev_socket,l_http_pvt->worker);
dap_events_socket_handler_t * l_ev_socket_handler = DAP_NEW_Z(dap_events_socket_handler_t);
l_ev_socket_handler->esocket = l_ev_socket;
l_ev_socket_handler->uuid = l_ev_socket->uuid;
dap_events_socket_handle_t * l_ev_socket_handler = DAP_NEW_Z(dap_events_socket_handle_t);
l_ev_socket_handler->esocket_uuid = l_ev_socket->uuid;
dap_timerfd_start_on_worker(l_http_pvt->worker,s_client_timeout_ms, s_timer_timeout_check,l_ev_socket_handler);
return l_http_pvt;
}
......@@ -686,9 +674,9 @@ static void s_http_ssl_connected(dap_events_socket_t * a_esocket)
a_esocket->flags |= DAP_SOCK_CONNECTING;
a_esocket->flags |= DAP_SOCK_READY_TO_WRITE;
a_esocket->callbacks.connected_callback = s_http_connected;
dap_events_socket_handler_t * l_ev_socket_handler = DAP_NEW_Z(dap_events_socket_handler_t);
dap_events_socket_handle_t * l_ev_socket_handler = DAP_NEW_Z(dap_events_socket_handle_t);
l_ev_socket_handler->esocket = a_esocket;
l_ev_socket_handler->uuid = a_esocket->uuid;
l_ev_socket_handler->esocket_uuid = a_esocket->uuid;
dap_timerfd_start_on_worker(l_http_pvt->worker, s_client_timeout_ms, s_timer_timeout_check, l_ev_socket_handler);
}
#endif
......@@ -709,9 +697,8 @@ static void s_http_connected(dap_events_socket_t * a_esocket)
// add to dap_worker
//dap_client_pvt_t * l_client_pvt = (dap_client_pvt_t*) a_obj;
//dap_events_new();
dap_events_socket_handler_t * l_ev_socket_handler = DAP_NEW_Z(dap_events_socket_handler_t);
l_ev_socket_handler->esocket = a_esocket;
l_ev_socket_handler->uuid = a_esocket->uuid;
dap_events_socket_handle_t * l_ev_socket_handler = DAP_NEW_Z(dap_events_socket_handle_t);
l_ev_socket_handler->esocket_uuid = a_esocket->uuid;
dap_timerfd_start_on_worker(l_http_pvt->worker, (unsigned long)s_client_timeout_read_after_connect_ms * 1000, s_timer_timeout_after_connected_check, l_ev_socket_handler);
char l_request_headers[1024] = { [0]='\0' };
......
......@@ -206,9 +206,9 @@ static void s_stream_connected(dap_client_pvt_t * a_client_pvt)
a_client_pvt->uplink_port, a_client_pvt->stream_socket, a_client_pvt->stream_worker->worker->id);
a_client_pvt->stage_status = STAGE_STATUS_DONE;
s_stage_status_after(a_client_pvt);
dap_events_socket_handler_t * l_ev_socket_handler = DAP_NEW_Z(dap_events_socket_handler_t);
l_ev_socket_handler->esocket = a_client_pvt->stream_es;
l_ev_socket_handler->uuid = a_client_pvt->stream_es->uuid;
dap_events_socket_handle_t * l_ev_socket_handler = DAP_NEW_Z(dap_events_socket_handle_t);
assert(a_client_pvt->stream_es);
l_ev_socket_handler->esocket_uuid = a_client_pvt->stream_es->uuid;
dap_timerfd_start_on_worker(a_client_pvt->stream_es->worker, s_client_timeout_read_after_connect * 1000, s_stream_timer_timeout_after_connected_check ,l_ev_socket_handler);
}
......@@ -219,23 +219,14 @@ static void s_stream_connected(dap_client_pvt_t * a_client_pvt)
*/
static bool s_stream_timer_timeout_check(void * a_arg)
{
dap_events_socket_handler_t *l_es_handler = (dap_events_socket_handler_t*) a_arg;
dap_events_socket_handle_t *l_es_handler = (dap_events_socket_handle_t*) a_arg;
assert(l_es_handler);
dap_events_socket_t * l_es = l_es_handler->esocket;
assert(l_es);
dap_events_t * l_events = dap_events_get_default();
assert(l_events);
dap_worker_t * l_worker =(dap_worker_t*) pthread_getspecific(l_events->pth_key_worker); // We're in own esocket context
dap_worker_t *l_worker = dap_events_get_current_worker(dap_events_get_default());
assert(l_worker);
if(dap_events_socket_check_unsafe(l_worker, l_es) ){
if (!dap_uint128_check_equal(l_es->uuid,l_es_handler->uuid)){
if(s_debug_more)
log_it(L_DEBUG,"Timer esocket wrong argument, ignore this timeout...");
DAP_DEL_Z(l_es_handler)
return false;
}
dap_events_socket_t * l_es;
if(l_es = dap_worker_esocket_find_uuid(l_worker, l_es_handler->esocket_uuid ) ){
if (l_es->flags & DAP_SOCK_CONNECTING ){
dap_client_pvt_t * l_client_pvt =(dap_client_pvt_t *) l_es->_inheritor;//(l_client) ? DAP_CLIENT_PVT(l_client) : NULL;
......@@ -253,7 +244,7 @@ static bool s_stream_timer_timeout_check(void * a_arg)
log_it(L_DEBUG,"Socket %d is connected, close check timer", l_es->socket);
}else
if(s_debug_more)
log_it(L_DEBUG,"Esocket %p is finished, close check timer", l_es);
log_it(L_DEBUG,"Esocket %llu is finished, close check timer", l_es_handler->esocket_uuid);
DAP_DEL_Z(l_es_handler)
return false;
......@@ -266,23 +257,14 @@ static bool s_stream_timer_timeout_check(void * a_arg)
*/
static bool s_stream_timer_timeout_after_connected_check(void * a_arg)
{
dap_events_socket_handler_t *l_es_handler = (dap_events_socket_handler_t*) a_arg;
dap_events_socket_handle_t *l_es_handler = (dap_events_socket_handle_t*) a_arg;
assert(l_es_handler);
dap_events_socket_t * l_es = l_es_handler->esocket;
assert(l_es);
dap_events_t * l_events = dap_events_get_default();
assert(l_events);
dap_worker_t * l_worker =(dap_worker_t*) pthread_getspecific(l_events->pth_key_worker); // We're in own esocket context
dap_worker_t * l_worker = dap_events_get_current_worker(dap_events_get_default());
assert(l_worker);
if(dap_events_socket_check_unsafe(l_worker, l_es) ){
if (!dap_uint128_check_equal(l_es->uuid,l_es_handler->uuid)){
if(s_debug_more)
log_it(L_DEBUG,"Streaming socket timer wrong argument, ignore this timeout...");
DAP_DEL_Z(l_es_handler)
return false;
}
dap_events_socket_t * l_es;
if( l_es = dap_worker_esocket_find_uuid(l_worker, l_es_handler->esocket_uuid) ){
dap_client_pvt_t * l_client_pvt =(dap_client_pvt_t *) l_es->_inheritor;//(l_client) ? DAP_CLIENT_PVT(l_client) : NULL;
if ( time(NULL)- l_client_pvt->ts_last_read >= s_client_timeout_read_after_connect){
......@@ -300,7 +282,7 @@ static bool s_stream_timer_timeout_after_connected_check(void * a_arg)
log_it(L_DEBUG,"Streaming socket %d is connected, close check timer", l_es->socket);
}else
if(s_debug_more)
log_it(L_DEBUG,"Streaming socket %p is finished, close check timer", l_es);
log_it(L_DEBUG,"Streaming socket %llu is finished, close check timer", l_es_handler->esocket_uuid);
DAP_DEL_Z(l_es_handler)
return false;
......@@ -502,9 +484,9 @@ static bool s_stage_status_after(dap_client_pvt_t * a_client_pvt)
dap_worker_add_events_socket( a_client_pvt->stream_es, l_worker);
// Add check timer
dap_events_socket_handler_t * l_stream_es_handler = DAP_NEW_Z(dap_events_socket_handler_t);
l_stream_es_handler->esocket = a_client_pvt->stream_es;
l_stream_es_handler->uuid = a_client_pvt->stream_es->uuid;
dap_events_socket_handle_t * l_stream_es_handler = DAP_NEW_Z(dap_events_socket_handle_t);
assert(a_client_pvt->stream_es);
l_stream_es_handler->esocket_uuid = a_client_pvt->stream_es->uuid;
dap_timerfd_start_on_worker(a_client_pvt->worker, (unsigned long)s_client_timeout_read_after_connect * 1000,
s_stream_timer_timeout_check,l_stream_es_handler);
}
......@@ -531,9 +513,9 @@ static bool s_stage_status_after(dap_client_pvt_t * a_client_pvt)
log_it(L_INFO,"Connecting stream to remote %s:%u",a_client_pvt->uplink_addr, a_client_pvt->uplink_port);
// add to dap_worker
dap_worker_add_events_socket( a_client_pvt->stream_es, l_worker);
dap_events_socket_handler_t * l_stream_es_handler = DAP_NEW_Z(dap_events_socket_handler_t);
l_stream_es_handler->esocket = a_client_pvt->stream_es;
l_stream_es_handler->uuid = a_client_pvt->stream_es->uuid;
dap_events_socket_handle_t * l_stream_es_handler = DAP_NEW_Z(dap_events_socket_handle_t);
assert (a_client_pvt->stream_es);
l_stream_es_handler->esocket_uuid = a_client_pvt->stream_es->uuid;
dap_timerfd_start_on_worker(a_client_pvt->worker, (unsigned long)s_client_timeout_read_after_connect * 1000,
s_stream_timer_timeout_check,l_stream_es_handler);
}
......
......@@ -249,6 +249,7 @@ dap_events_t * dap_events_new( )
pthread_rwlock_init( &ret->sockets_rwlock, NULL );
if ( s_events_default == NULL)
s_events_default = ret;
s_events_default->sockets = NULL;
pthread_key_create( &ret->pth_key_worker, NULL);
return ret;
......@@ -268,18 +269,30 @@ void dap_events_delete( dap_events_t *a_events )
if (a_events) {
dap_events_socket_t *l_cur, *l_tmp;
HASH_ITER( hh, a_events->sockets,l_cur, l_tmp ) {
HASH_DEL(a_events->sockets, l_cur);
dap_events_socket_remove_and_delete_unsafe( l_cur, true );
}
if ( a_events->_inheritor )
DAP_DELETE( a_events->_inheritor );
pthread_rwlock_destroy( &a_events->sockets_rwlock );
DAP_DELETE( a_events );
}
}
void dap_events_remove_and_delete_socket_unsafe(dap_events_t *a_events, dap_events_socket_t *a_socket, bool preserve_inheritor) {
if (!a_events)
return;
pthread_rwlock_wrlock(&a_events->sockets_rwlock);
dap_events_socket_t * l_es_find = NULL;
HASH_FIND_INT( a_events->sockets, &a_socket->socket, l_es_find );
if (l_es_find) {
HASH_DEL(a_events->sockets, l_es_find);
dap_events_socket_remove_and_delete_unsafe(l_es_find, preserve_inheritor);
}
pthread_rwlock_unlock(&a_events->sockets_rwlock);
}
/**
* @brief sa_server_loop Main server loop
* @param sh Server instance
......@@ -293,6 +306,7 @@ int dap_events_start( dap_events_t *a_events )
l_worker->id = i;
l_worker->events = a_events;
l_worker->esockets = NULL;
pthread_rwlock_init(&l_worker->esocket_rwlock,NULL);
pthread_mutex_init(& l_worker->started_mutex, NULL);
pthread_cond_init( & l_worker->started_cond, NULL);
......@@ -324,7 +338,6 @@ int dap_events_start( dap_events_t *a_events )
clock_gettime(CLOCK_REALTIME, &l_timeout);
l_timeout.tv_sec+=15;
pthread_create( &s_threads[i].tid, NULL, dap_worker_thread, l_worker );
int l_ret;
l_ret=pthread_cond_timedwait(&l_worker->started_cond, &l_worker->started_mutex, &l_timeout);
pthread_mutex_unlock(&l_worker->started_mutex);
......
......@@ -170,7 +170,7 @@ dap_events_socket_t *dap_events_socket_wrap_no_add( dap_events_t *a_events,
l_ret->socket = a_sock;
l_ret->events = a_events;
l_ret->uuid = dap_uuid_generate_uint128();
l_ret->uuid = dap_uuid_generate_uint64();
if (a_callbacks)
memcpy(&l_ret->callbacks, a_callbacks, sizeof(l_ret->callbacks) );
l_ret->flags = DAP_SOCK_READY_TO_READ;
......@@ -196,7 +196,7 @@ dap_events_socket_t *dap_events_socket_wrap_no_add( dap_events_t *a_events,
dap_events_socket_t * l_es_find = NULL;
HASH_FIND_INT( a_events->sockets, &a_sock, l_es_find );
if(l_es_find)
log_it(L_ERROR,"Trying to add %d descriptor in hashtable but found %p esocket with same socket", a_sock, l_es_find);
log_it(L_ERROR,"Trying to add socket %d to hashtable but found %p esocket with same socket", a_sock, l_es_find);
else
HASH_ADD_INT(a_events->sockets, socket, l_ret);
pthread_rwlock_unlock(&a_events->sockets_rwlock);
......@@ -288,7 +288,7 @@ dap_events_socket_t * s_create_type_pipe(dap_worker_t * a_w, dap_events_socket_c
l_es->type = DESCRIPTOR_TYPE_PIPE;
l_es->worker = a_w;
l_es->events = a_w->events;
l_es->uuid = dap_uuid_generate_uint128();
l_es->uuid = dap_uuid_generate_uint64();
l_es->callbacks.read_callback = a_callback; // Arm event callback
#if defined(DAP_EVENTS_CAPS_EPOLL)
l_es->ev_base_flags = EPOLLIN | EPOLLERR | EPOLLRDHUP | EPOLLHUP;
......@@ -450,7 +450,7 @@ dap_events_socket_t * dap_events_socket_queue_ptr_create_input(dap_events_socket
l_es->buf_in = DAP_NEW_Z_SIZE(byte_t,l_es->buf_in_size_max );
//l_es->buf_out_size = 8 * sizeof(void*);
l_es->events = a_es->events;
l_es->uuid = dap_uuid_generate_uint128();
l_es->uuid = dap_uuid_generate_uint64();
#if defined(DAP_EVENTS_CAPS_EPOLL)
l_es->ev_base_flags = EPOLLERR | EPOLLRDHUP | EPOLLHUP;
#elif defined(DAP_EVENTS_CAPS_POLL)
......@@ -507,7 +507,7 @@ dap_events_socket_t * dap_events_socket_queue_ptr_create_input(dap_events_socket
WCHAR l_direct_name[MQ_MAX_Q_NAME_LEN] = { 0 };
int pos = 0;
#ifdef DAP_BRAND
pos = _snwprintf_s(l_direct_name, sizeof(l_direct_name)/sizeof(l_direct_name[0]), _TRUNCATE, L"DIRECT=OS:.\\PRIVATE$\\" DAP_BRAND "_esmq%d", l_es->mq_num);
pos = _snwprintf_s(l_direct_name, sizeof(l_direct_name)/sizeof(l_direct_name[0]), _TRUNCATE, L"DIRECT=OS:.\\PRIVATE$\\" DAP_BRAND "mq%d", l_es->mq_num);
#else
pos = _snwprintf_s(l_direct_name, sizeof(l_direct_name)/sizeof(l_direct_name[0]), _TRUNCATE, L"DIRECT=OS:.\\PRIVATE$\\%hs_esmq%d", dap_get_appname(), l_es->mq_num);
#endif
......@@ -560,7 +560,7 @@ dap_events_socket_t * s_create_type_queue_ptr(dap_worker_t * a_w, dap_events_soc
}
l_es->type = DESCRIPTOR_TYPE_QUEUE;
l_es->flags = DAP_SOCK_QUEUE_PTR;
l_es->uuid = dap_uuid_generate_uint128();
l_es->uuid = dap_uuid_generate_uint64();
if (a_w){
l_es->events = a_w->events;
l_es->worker = a_w;
......@@ -711,7 +711,7 @@ dap_events_socket_t * s_create_type_queue_ptr(dap_worker_t * a_w, dap_events_soc
static atomic_uint s_queue_num = 0;
int pos = 0;
#ifdef DAP_BRAND
pos = _snwprintf_s(l_pathname, sizeof(l_pathname)/sizeof(l_pathname[0]), _TRUNCATE, L".\\PRIVATE$\\" DAP_BRAND "_esmq%d", l_es->mq_num = s_queue_num++);
pos = _snwprintf_s(l_pathname, sizeof(l_pathname)/sizeof(l_pathname[0]), _TRUNCATE, L".\\PRIVATE$\\" DAP_BRAND "mq%d", l_es->mq_num = s_queue_num++);
#else
pos = _snwprintf_s(l_pathname, sizeof(l_pathname)/sizeof(l_pathname[0]), _TRUNCATE, L".\\PRIVATE$\\%hs_esmq%d", dap_get_appname(), l_es->mq_num = s_queue_num++);
#endif
......@@ -926,7 +926,7 @@ dap_events_socket_t * s_create_type_event(dap_worker_t * a_w, dap_events_socket_
l_es->buf_out_size_max = l_es->buf_in_size_max = 1;
l_es->buf_out = DAP_NEW_Z_SIZE(byte_t, l_es->buf_out_size_max);
l_es->type = DESCRIPTOR_TYPE_EVENT;
l_es->uuid = dap_uuid_generate_uint128();
l_es->uuid = dap_uuid_generate_uint64();
if (a_w){
l_es->events = a_w->events;
l_es->worker = a_w;
......@@ -1431,9 +1431,8 @@ int dap_events_socket_event_signal( dap_events_socket_t * a_es, uint64_t a_value
*/
void dap_events_socket_queue_on_remove_and_delete(dap_events_socket_t* a_es)
{
dap_events_socket_handler_t * l_es_handler= DAP_NEW_Z(dap_events_socket_handler_t);
l_es_handler->esocket = a_es;
l_es_handler->uuid = a_es->uuid;
dap_events_socket_handle_t * l_es_handler= DAP_NEW_Z(dap_events_socket_handle_t);
l_es_handler->esocket_uuid = a_es->uuid;
int l_ret= dap_events_socket_queue_ptr_send( a_es->worker->queue_es_delete, l_es_handler );
if( l_ret != 0 ){
......@@ -1463,7 +1462,7 @@ dap_events_socket_t * dap_events_socket_wrap2( dap_server_t *a_server, struct da
l_es->socket = a_sock;
l_es->events = a_events;
l_es->server = a_server;
l_es->uuid = dap_uuid_generate_uint128();
l_es->uuid = dap_uuid_generate_uint64();
memcpy(&l_es->callbacks,a_callbacks, sizeof ( l_es->callbacks) );
l_es->buf_out_size_max = l_es->buf_in_size_max = DAP_EVENTS_SOCKET_BUF;
l_es->buf_in = a_callbacks->timer_callback ? NULL : DAP_NEW_Z_SIZE(byte_t, l_es->buf_in_size_max+1);
......@@ -1479,25 +1478,24 @@ dap_events_socket_t * dap_events_socket_wrap2( dap_server_t *a_server, struct da
return l_es;
}
/**
* @brief dap_events_socket_find
* @param sock
* @param sh
* @brief dap_worker_esocket_find_uuid
* @param a_worker
* @param a_es_uuid
* @return
*/
dap_events_socket_t *dap_events_socket_find_unsafe( int sock, struct dap_events *a_events )
dap_events_socket_t *dap_worker_esocket_find_uuid(dap_worker_t * a_worker, dap_events_socket_uuid_t a_es_uuid )
{
// Why we have only unsafe socket? Because you need to lock sockets_rwlock when do any operations with
// socket that you've find in global list
dap_events_socket_t *ret = NULL;
if(!a_events)
return NULL;
if(a_events->sockets) {
pthread_rwlock_rdlock(&a_events->sockets_rwlock);
HASH_FIND_INT( a_events->sockets, &sock, ret );
pthread_rwlock_unlock(&a_events->sockets_rwlock);
assert(a_worker);
dap_events_socket_t * l_ret = NULL;
if(a_worker->esockets ) {
pthread_rwlock_rdlock(&a_worker->esocket_rwlock);
//HASH_FIND_PTR( a_worker->esockets, &a_es_uuid,l_ret );
HASH_FIND(hh_worker, a_worker->esockets, &a_es_uuid, sizeof(a_es_uuid), l_ret );
pthread_rwlock_unlock(&a_worker->esocket_rwlock );
}
return ret;
return l_ret;
}
void dap_events_socket_worker_poll_update_unsafe(dap_events_socket_t * a_esocket)
......@@ -1735,12 +1733,15 @@ void dap_events_socket_set_writable_unsafe( dap_events_socket_t *a_esocket, bool
bool s_remove_and_delete_unsafe_delayed_delete_callback(void * a_arg)
{
dap_worker_t * l_worker = dap_events_get_current_worker(dap_events_get_default());
dap_events_socket_handler_t * l_es_handler = (dap_events_socket_handler_t*) a_arg;
dap_events_socket_handle_t * l_es_handler = (dap_events_socket_handle_t*) a_arg;
assert(l_es_handler);
assert(l_worker);
if(dap_events_socket_check_uuid_unsafe(l_worker, l_es_handler->esocket, l_es_handler->uuid))
dap_events_socket_remove_and_delete_unsafe(l_es_handler->esocket,l_es_handler->value == 1);
dap_events_socket_t * l_es;
if( (l_es = dap_worker_esocket_find_uuid(l_worker, l_es_handler->esocket_uuid)) != NULL)
//dap_events_socket_remove_and_delete_unsafe(l_es,l_es_handler->value == 1);
dap_events_remove_and_delete_socket_unsafe(dap_events_get_default(), l_es, l_es_handler->value == 1);
DAP_DELETE(l_es_handler);
return false;
}
......@@ -1751,14 +1752,14 @@ bool s_remove_and_delete_unsafe_delayed_delete_callback(void * a_arg)
*/
void dap_events_socket_remove_and_delete_unsafe_delayed( dap_events_socket_t *a_es, bool a_preserve_inheritor )
{
dap_events_socket_handler_t * l_es_handler = DAP_NEW_Z(dap_events_socket_handler_t);
l_es_handler->esocket = a_es;
l_es_handler->uuid = a_es->uuid;
dap_events_socket_handle_t * l_es_handler = DAP_NEW_Z(dap_events_socket_handle_t);
l_es_handler->esocket_uuid = a_es->uuid;
l_es_handler->value = a_preserve_inheritor ? 1 : 0;
dap_events_socket_descriptor_close(a_es);
dap_worker_t * l_worker = a_es->worker;
dap_events_socket_remove_from_worker_unsafe( a_es, l_worker);
a_es->flags |= DAP_SOCK_SIGNAL_CLOSE;
dap_timerfd_start_on_worker(l_worker, s_delayed_ops_timeout_ms,
s_remove_and_delete_unsafe_delayed_delete_callback, l_es_handler );
}
......@@ -1820,31 +1821,7 @@ void dap_events_socket_descriptor_close(dap_events_socket_t *a_esocket)
*/
void dap_events_socket_delete_unsafe( dap_events_socket_t * a_esocket , bool a_preserve_inheritor)
{
#ifdef DAP_EVENTS_CAPS_KQUEUE
if( a_esocket->type != DESCRIPTOR_TYPE_EVENT
&& a_esocket->type != DESCRIPTOR_TYPE_TIMER
&& a_esocket->type != DESCRIPTOR_TYPE_QUEUE )
#endif
if (a_esocket->events){ // It could be socket NOT from events
if (a_esocket->socket != -1 && a_esocket->socket != 0 ){
if(!dap_events_socket_find_unsafe(a_esocket->socket, a_esocket->events)){
if(s_debug_reactor)
log_it(L_ERROR, "esocket %d type %d already deleted", a_esocket->socket, a_esocket->type);
/*dap_events_socket_t * es1 = NULL, *es2;
HASH_ITER(hh, a_esocket->events->sockets, es1, es2) {
log_it(L_INFO, "Table: socket %d", es1->socket);
}*/
return ;
}
if(a_esocket->events->sockets) {
pthread_rwlock_wrlock( &a_esocket->events->sockets_rwlock );
HASH_DEL( a_esocket->events->sockets, a_esocket );
pthread_rwlock_unlock( &a_esocket->events->sockets_rwlock );
}
}
}
dap_events_socket_descriptor_close(a_esocket);
if (!a_preserve_inheritor )
DAP_DEL_Z(a_esocket->_inheritor)
......@@ -1853,8 +1830,6 @@ void dap_events_socket_delete_unsafe( dap_events_socket_t * a_esocket , bool a_p
DAP_DEL_Z(a_esocket->buf_out)
DAP_DEL_Z(a_esocket->remote_addr_str)
dap_events_socket_descriptor_close(a_esocket);
DAP_DEL_Z( a_esocket )
}
......@@ -1868,6 +1843,12 @@ void dap_events_socket_remove_from_worker_unsafe( dap_events_socket_t *a_es, dap
log_it(L_INFO, "No worker assigned to esocket %d", a_es->socket);
return;
}
pthread_rwlock_wrlock(&a_worker->esocket_rwlock);
a_worker->event_sockets_count--;
HASH_DELETE(hh_worker,a_worker->esockets, a_es);
pthread_rwlock_unlock(&a_worker->esocket_rwlock);
#if defined(DAP_EVENTS_CAPS_EPOLL)
if ( epoll_ctl( a_worker->epoll_fd, EPOLL_CTL_DEL, a_es->socket, &a_es->ev) == -1 ) {
......@@ -1928,75 +1909,23 @@ void dap_events_socket_remove_from_worker_unsafe( dap_events_socket_t *a_es, dap
#else
#error "Unimplemented new esocket on worker callback for current platform"
#endif
if( a_es->socket != 0 && a_es->socket != -1 ) {
pthread_rwlock_wrlock(&a_worker->esocket_rwlock);
a_worker->event_sockets_count--;
HASH_DELETE(hh_worker,a_worker->esockets, a_es);
pthread_rwlock_unlock(&a_worker->esocket_rwlock);
}
a_es->worker = NULL;
}
/**
* @brief dap_events_socket_check_uuid_unsafe
* @param a_worker
* @param a_es
* @param a_es_uuid
* @return
*/
bool dap_events_socket_check_uuid_unsafe(dap_worker_t * a_worker,dap_events_socket_t * a_es, uint128_t a_es_uuid)
{
if (a_es){
if ( a_worker->esockets){
dap_events_socket_t * l_es = NULL;
bool l_ret;
pthread_rwlock_rdlock(&a_worker->esocket_rwlock);
HASH_FIND(hh_worker,a_worker->esockets,&a_es, sizeof(void*), l_es );
l_ret = ( l_es == a_es && dap_uint128_check_equal(l_es->uuid,a_es_uuid) );
pthread_rwlock_unlock(&a_worker->esocket_rwlock);
return l_ret;
}else
return false;
}else
return false;
}
/**
* @brief dap_events_socket_check_unsafe
* @param a_worker
* @param a_es
* @return
*/
bool dap_events_socket_check_unsafe(dap_worker_t * a_worker,dap_events_socket_t * a_es)
{
if (a_es){
if ( a_worker->esockets){
dap_events_socket_t * l_es = NULL;
pthread_rwlock_rdlock(&a_worker->esocket_rwlock);
HASH_FIND(hh_worker,a_worker->esockets,&a_es, sizeof(void*), l_es );
pthread_rwlock_unlock(&a_worker->esocket_rwlock);
return l_es == a_es;
}else
return false;
}else
return false;
}
/**
* @brief dap_events_socket_remove_and_delete
* @param a_es
* @param preserve_inheritor
* @param a_w
* @param a_es_uuid
*/
void dap_events_socket_remove_and_delete_mt(dap_worker_t * a_w, dap_events_socket_t *a_es )
void dap_events_socket_remove_and_delete_mt(dap_worker_t * a_w, dap_events_socket_uuid_t a_es_uuid )
{
assert(a_w);
dap_events_socket_handler_t * l_es_handler= DAP_NEW_Z(dap_events_socket_handler_t);
l_es_handler->esocket = a_es;
if(a_es)
l_es_handler->uuid = a_es->uuid;
dap_events_socket_handle_t * l_es_handler= DAP_NEW_Z(dap_events_socket_handle_t);
l_es_handler->esocket_uuid = a_es_uuid;
if(dap_events_socket_queue_ptr_send( a_w->queue_es_delete, l_es_handler ) != 0 ){
log_it(L_ERROR,"Can't send %d fd in queue", a_es? a_es->fd : -1);
log_it(L_ERROR,"Can't send %llu fd in queue",a_es_uuid);
DAP_DELETE(l_es_handler);
}
}
......@@ -2004,15 +1933,13 @@ void dap_events_socket_remove_and_delete_mt(dap_worker_t * a_w, dap_events_sock
/**
* @brief dap_events_socket_set_readable_mt
* @param a_w
* @param a_es
* @param a_es_uuid
* @param a_is_ready
*/
void dap_events_socket_set_readable_mt(dap_worker_t * a_w, dap_events_socket_t * a_es,bool a_is_ready)
void dap_events_socket_set_readable_mt(dap_worker_t * a_w, dap_events_socket_uuid_t a_es_uuid,bool a_is_ready)
{
dap_worker_msg_io_t * l_msg = DAP_NEW_Z(dap_worker_msg_io_t); if (! l_msg) return;
l_msg->esocket = a_es;
if(a_es)
l_msg->esocket_uuid = a_es->uuid;
l_msg->esocket_uuid = a_es_uuid;
if (a_is_ready)
l_msg->flags_set = DAP_SOCK_READY_TO_READ;
else
......@@ -2027,15 +1954,15 @@ void dap_events_socket_set_readable_mt(dap_worker_t * a_w, dap_events_socket_t *
/**
* @brief dap_events_socket_set_writable_mt
* @param sc
* @param is_ready
* @param a_w
* @param a_es_uuid
* @param a_is_ready
*/
void dap_events_socket_set_writable_mt(dap_worker_t * a_w, dap_events_socket_t * a_es,bool a_is_ready)
void dap_events_socket_set_writable_mt(dap_worker_t * a_w, dap_events_socket_uuid_t a_es_uuid, bool a_is_ready)
{
dap_worker_msg_io_t * l_msg = DAP_NEW_Z(dap_worker_msg_io_t); if (!l_msg) return;
l_msg->esocket = a_es;
if(a_es)
l_msg->esocket_uuid = a_es->uuid;
l_msg->esocket_uuid = a_es_uuid;
if (a_is_ready)
l_msg->flags_set = DAP_SOCK_READY_TO_WRITE;
else
......@@ -2051,16 +1978,14 @@ void dap_events_socket_set_writable_mt(dap_worker_t * a_w, dap_events_socket_t *
/**
* @brief dap_events_socket_write_inter
* @param a_es_input
* @param a_es
* @param a_es_uuid
* @param a_data
* @param a_data_size
* @return
*/
size_t dap_events_socket_write_inter(dap_events_socket_t * a_es_input, dap_events_socket_t *a_es,uint128_t a_es_uuid, const void * a_data, size_t a_data_size)
size_t dap_events_socket_write_inter(dap_events_socket_t * a_es_input, dap_events_socket_uuid_t a_es_uuid, const void * a_data, size_t a_data_size)
{
dap_worker_msg_io_t * l_msg = DAP_NEW_Z(dap_worker_msg_io_t); if( !l_msg) return 0;
l_msg->esocket = a_es;
l_msg->esocket_uuid = a_es_uuid;
l_msg->data = DAP_NEW_SIZE(void,a_data_size);
l_msg->data_size = a_data_size;
......@@ -2080,12 +2005,11 @@ size_t dap_events_socket_write_inter(dap_events_socket_t * a_es_input, dap_event
/**
* @brief dap_events_socket_write_f_inter
* @param a_es_input
* @param a_es
* @param a_es_uuid
* @param a_format
* @return
*/
size_t dap_events_socket_write_f_inter(dap_events_socket_t * a_es_input, dap_events_socket_t *a_es,uint128_t a_es_uuid, const char * a_format,...)
size_t dap_events_socket_write_f_inter(dap_events_socket_t * a_es_input, dap_events_socket_uuid_t a_es_uuid, const char * a_format,...)
{
va_list ap, ap_copy;
va_start(ap,a_format);
......@@ -2099,7 +2023,6 @@ size_t dap_events_socket_write_f_inter(dap_events_socket_t * a_es_input, dap_eve
}
dap_worker_msg_io_t * l_msg = DAP_NEW_Z(dap_worker_msg_io_t);
l_msg->esocket = a_es;
l_msg->esocket_uuid = a_es_uuid;
l_msg->data = DAP_NEW_SIZE(void,l_data_size);
l_msg->data_size = l_data_size;
......@@ -2118,17 +2041,16 @@ size_t dap_events_socket_write_f_inter(dap_events_socket_t * a_es_input, dap_eve
/**
* @brief dap_events_socket_write_mt
* @param sc
* @param data
* @param data_size
* @param a_w
* @param a_es_uuid
* @param a_data
* @param l_data_size
* @return
*/
size_t dap_events_socket_write_mt(dap_worker_t * a_w,dap_events_socket_t *a_es, const void * data, size_t l_data_size)
size_t dap_events_socket_write_mt(dap_worker_t * a_w,dap_events_socket_uuid_t a_es_uuid, const void * data, size_t l_data_size)
{
dap_worker_msg_io_t * l_msg = DAP_NEW_Z(dap_worker_msg_io_t); if (!l_msg) return 0;
l_msg->esocket = a_es;
if(a_es)
l_msg->esocket_uuid = a_es->uuid;
l_msg->esocket_uuid = a_es_uuid;
l_msg->data = DAP_NEW_SIZE(void,l_data_size);
l_msg->data_size = l_data_size;
l_msg->flags_set = DAP_SOCK_READY_TO_WRITE;
......@@ -2145,32 +2067,30 @@ size_t dap_events_socket_write_mt(dap_worker_t * a_w,dap_events_socket_t *a_es,
/**
* @brief dap_events_socket_write_f_mt
* @param a_es
* @param format
* @param a_es_uuid
* @param a_format
* @return
*/
size_t dap_events_socket_write_f_mt(dap_worker_t * a_w,dap_events_socket_t *a_es, const char * format,...)
size_t dap_events_socket_write_f_mt(dap_worker_t * a_w,dap_events_socket_uuid_t a_es_uuid, const char * a_format,...)
{
va_list ap, ap_copy;
va_start(ap,format);
va_start(ap,a_format);
va_copy(ap_copy, ap);
int l_data_size = dap_vsnprintf(NULL,0,format,ap);
int l_data_size = dap_vsnprintf(NULL,0,a_format,ap);
va_end(ap);
if (l_data_size <0 ){
log_it(L_ERROR,"Can't write out formatted data '%s' with values",format);
log_it(L_ERROR,"Can't write out formatted data '%s' with values",a_format);
va_end(ap_copy);
return 0;
}
dap_worker_msg_io_t * l_msg = DAP_NEW_Z(dap_worker_msg_io_t);
l_msg->esocket = a_es;
if(a_es)
l_msg->esocket_uuid = a_es->uuid;
l_msg->esocket_uuid = a_es_uuid;
l_msg->data = DAP_NEW_SIZE(void,l_data_size + 1);
l_msg->flags_set = DAP_SOCK_READY_TO_WRITE;
l_data_size = dap_vsprintf(l_msg->data,format,ap_copy);
l_data_size = dap_vsprintf(l_msg->data,a_format,ap_copy);
va_end(ap_copy);
if (l_data_size <0 ){
log_it(L_ERROR,"Write f mt: can't write out formatted data '%s' with values",format);
log_it(L_ERROR,"Write f mt: can't write out formatted data '%s' with values",a_format);
DAP_DELETE(l_msg->data);
DAP_DELETE(l_msg);
return 0;
......@@ -2187,24 +2107,23 @@ size_t dap_events_socket_write_f_mt(dap_worker_t * a_w,dap_events_socket_t *a_es
}
/**
* @brief dap_events_socket_write Write data to the client
* @param sc Conn instance
* @param data Pointer to data
* @param data_size Size of data to write
* @return Number of bytes that were placed into the buffer
* @brief dap_events_socket_write_unsafe
* @param a_es
* @param a_data
* @param a_data_size
* @return
*/
size_t dap_events_socket_write_unsafe(dap_events_socket_t *sc, const void * data, size_t data_size)
size_t dap_events_socket_write_unsafe(dap_events_socket_t *a_es, const void * a_data, size_t a_data_size)
{
if(sc->buf_out_size > sc->buf_out_size_max){
log_it(L_DEBUG,"write buffer already overflow size=%u/max=%u", sc->buf_out_size, sc->buf_out_size_max);
if(a_es->buf_out_size > a_es->buf_out_size_max){
log_it(L_DEBUG,"write buffer already overflow size=%u/max=%u", a_es->buf_out_size, a_es->buf_out_size_max);
return 0;
}
//log_it(L_DEBUG,"dap_events_socket_write %u sock data %X size %u", sc->socket, data, data_size );
data_size = (sc->buf_out_size + data_size < sc->buf_out_size_max) ? data_size : (sc->buf_out_size_max - sc->buf_out_size);
memcpy(sc->buf_out + sc->buf_out_size, data, data_size);
sc->buf_out_size += data_size;
dap_events_socket_set_writable_unsafe(sc, true);
return data_size;
a_data_size = (a_es->buf_out_size + a_data_size < a_es->buf_out_size_max) ? a_data_size : (a_es->buf_out_size_max - a_es->buf_out_size);
memcpy(a_es->buf_out + a_es->buf_out_size, a_data, a_data_size);
a_es->buf_out_size += a_data_size;
dap_events_socket_set_writable_unsafe(a_es, true);
return a_data_size;
}
/**
......@@ -2215,8 +2134,6 @@ size_t dap_events_socket_write_unsafe(dap_events_socket_t *sc, const void * data
*/
size_t dap_events_socket_write_f_unsafe(dap_events_socket_t *a_es, const char * a_format,...)
{
//log_it(L_DEBUG,"dap_events_socket_write_f %u sock", sc->socket );
size_t l_max_data_size = a_es->buf_out_size_max - a_es->buf_out_size;
if (! l_max_data_size)
return 0;
......@@ -2239,26 +2156,24 @@ size_t dap_events_socket_write_f_unsafe(dap_events_socket_t *a_es, const char *
}
/**
* @brief dap_events_socket_pop_from_buf_in Read data from input buffer
* @param sc Conn instasnce
* @param data Pointer to memory where to store the data
* @param data_size Size of data to read
* @return Actual bytes number that were read
* @brief dap_events_socket_pop_from_buf_in
* @param a_essc
* @param a_data
* @param a_data_size
* @return
*/
size_t dap_events_socket_pop_from_buf_in(dap_events_socket_t *sc, void *data, size_t data_size)
size_t dap_events_socket_pop_from_buf_in(dap_events_socket_t *a_es, void *a_data, size_t a_data_size)
{
// log_it(L_DEBUG,"dap_events_socket_read %u sock data %X size %u", sc->socket, data, data_size );
if(data_size<sc->buf_in_size){
memcpy(data,sc->buf_in,data_size);
memmove(data,sc->buf_in+data_size,sc->buf_in_size-data_size);
if(a_data_size<a_es->buf_in_size){
memcpy(a_data,a_es->buf_in,a_data_size);
memmove(a_data,a_es->buf_in+a_data_size,a_es->buf_in_size-a_data_size);
}else{
if(data_size>sc->buf_in_size)
data_size=sc->buf_in_size;
memcpy(data,sc->buf_in,data_size);
if(a_data_size>a_es->buf_in_size)
a_data_size=a_es->buf_in_size;
memcpy(a_data,a_es->buf_in,a_data_size);
}
sc->buf_in_size-=data_size;
return data_size;
a_es->buf_in_size-=a_data_size;
return a_data_size;
}
......
......@@ -950,17 +950,16 @@ bool dap_proc_thread_assign_on_worker_inter(dap_proc_thread_t * a_thread, dap_wo
* @brief dap_proc_thread_esocket_write_inter
* @param a_thread
* @param a_worker
* @param a_es
* @param a_es_uuid
* @param a_data
* @param a_data_size
* @return
*/
int dap_proc_thread_esocket_write_inter(dap_proc_thread_t * a_thread,dap_worker_t * a_worker, dap_events_socket_t *a_es,uint128_t a_es_uuid,
int dap_proc_thread_esocket_write_inter(dap_proc_thread_t * a_thread,dap_worker_t * a_worker, dap_events_socket_uuid_t a_es_uuid,
const void * a_data, size_t a_data_size)
{
dap_events_socket_t * l_es_io_input = a_thread->queue_io_input[a_worker->id];
dap_events_socket_write_inter(l_es_io_input,a_es,a_es_uuid, a_data, a_data_size);
dap_events_socket_write_inter(l_es_io_input,a_es_uuid, a_data, a_data_size);
// TODO Make this code platform-independent
#ifndef DAP_EVENTS_CAPS_EVENT_KEVENT
l_es_io_input->flags |= DAP_SOCK_READY_TO_WRITE;
......@@ -974,12 +973,11 @@ int dap_proc_thread_esocket_write_inter(dap_proc_thread_t * a_thread,dap_worker_
* @brief dap_proc_thread_esocket_write_f_inter
* @param a_thread
* @param a_worker
* @param a_es
* @param a_es_uuid,
* @param a_format
* @return
*/
int dap_proc_thread_esocket_write_f_inter(dap_proc_thread_t * a_thread,dap_worker_t * a_worker, dap_events_socket_t *a_es,uint128_t a_es_uuid,
int dap_proc_thread_esocket_write_f_inter(dap_proc_thread_t * a_thread,dap_worker_t * a_worker, dap_events_socket_uuid_t a_es_uuid,
const char * a_format,...)
{
va_list ap, ap_copy;
......@@ -1002,7 +1000,7 @@ int dap_proc_thread_esocket_write_f_inter(dap_proc_thread_t * a_thread,dap_worke
l_data_size = dap_vsprintf(l_data,a_format,ap_copy);
va_end(ap_copy);
dap_events_socket_write_inter(l_es_io_input,a_es, a_es_uuid, l_data, l_data_size);
dap_events_socket_write_inter(l_es_io_input, a_es_uuid, l_data, l_data_size);
// TODO Make this code platform-independent
#ifndef DAP_EVENTS_CAPS_EVENT_KEVENT
l_es_io_input->flags |= DAP_SOCK_READY_TO_WRITE;
......
......@@ -104,7 +104,7 @@ void dap_server_delete(dap_server_t *a_server)
{
while (a_server->es_listeners) {
dap_events_socket_t *l_es = (dap_events_socket_t *)a_server->es_listeners->data;
dap_events_socket_remove_and_delete_mt(l_es->worker, l_es);
dap_events_socket_remove_and_delete_mt(l_es->worker, l_es->uuid); // TODO unsafe moment. Replace storage to uuids
dap_list_t *l_tmp = a_server->es_listeners;
a_server->es_listeners = l_tmp->next;
DAP_DELETE(l_tmp);
......
......@@ -43,6 +43,9 @@
#define LOG_TAG "dap_timerfd"
static void s_es_callback_timer(struct dap_events_socket *a_event_sock);
#ifdef DAP_OS_WINDOWS
static HANDLE hTimerQueue = NULL;
#endif
/**
* @brief dap_events_socket_init Init clients module
......@@ -50,6 +53,13 @@ static void s_es_callback_timer(struct dap_events_socket *a_event_sock);
*/
int dap_timerfd_init()
{
#ifdef DAP_OS_WINDOWS
hTimerQueue = CreateTimerQueue();
if (!hTimerQueue) {
log_it(L_CRITICAL, "Timer queue failed, err %d", GetLastError());
return -4;
}
#endif
log_it(L_NOTICE, "Initialized timerfd");
return 0;
}
......@@ -74,6 +84,14 @@ void __stdcall TimerAPCb(void* arg, DWORD low, DWORD high) { // Timer high valu
log_it(L_CRITICAL, "Error occured on writing into socket from APC, errno: %d", WSAGetLastError());
}
}
void __stdcall TimerRoutine(void* arg, BOOLEAN flag) {
UNREFERENCED_PARAMETER(flag)
dap_timerfd_t *l_timerfd = (dap_timerfd_t *)arg;
if (dap_sendto(l_timerfd->tfd, l_timerfd->port, NULL, 0) == SOCKET_ERROR) {
log_it(L_CRITICAL, "Error occured on writing into socket from timer routine, errno: %d", WSAGetLastError());
}
}
#endif
......@@ -139,6 +157,7 @@ dap_timerfd_t* dap_timerfd_create(uint64_t a_timeout_ms, dap_timerfd_callback_t
l_timerfd->callback = a_callback;
l_timerfd->callback_arg = a_callback_arg;
l_timerfd->events_socket = l_events_socket;
l_timerfd->esocket_uuid = l_events_socket->uuid;
#if defined DAP_OS_LINUX
struct itimerspec l_ts;
......@@ -178,13 +197,13 @@ dap_timerfd_t* dap_timerfd_create(uint64_t a_timeout_ms, dap_timerfd_callback_t
#elif defined (DAP_OS_WINDOWS)
HANDLE l_th = CreateWaitableTimer(NULL, true, NULL);
/*HANDLE l_th = CreateWaitableTimer(NULL, true, NULL);
if (!l_th) {
log_it(L_CRITICAL, "Waitable timer not created, error %d", GetLastError());
DAP_DELETE(l_timerfd);
return NULL;
}
}*/
l_timerfd->th = NULL;
SOCKET l_tfd = socket(AF_INET, SOCK_DGRAM, 0);
int buffsize = 1024;
......@@ -209,13 +228,19 @@ dap_timerfd_t* dap_timerfd_create(uint64_t a_timeout_ms, dap_timerfd_callback_t
//log_it(L_DEBUG, "Bound to port %d", l_addr.sin_port);
}
LARGE_INTEGER l_due_time;
/*LARGE_INTEGER l_due_time;
l_due_time.QuadPart = (long long)a_timeout_ms * _MSEC;
if (!SetWaitableTimer(l_th, &l_due_time, 0, TimerAPCb, l_timerfd, false)) {
log_it(L_CRITICAL, "Waitable timer not set, error %d", GetLastError());
CloseHandle(l_th);
DAP_DELETE(l_timerfd);
return NULL;
} */
if (!CreateTimerQueueTimer(&l_timerfd->th, hTimerQueue,
(WAITORTIMERCALLBACK)TimerRoutine, l_timerfd, (unsigned long)a_timeout_ms, 0, 0)) {
log_it(L_CRITICAL, "Timer not set, error %d", GetLastError());
DAP_DELETE(l_timerfd);
return NULL;
}
l_events_socket->socket = l_tfd;
#endif
......@@ -223,9 +248,9 @@ dap_timerfd_t* dap_timerfd_create(uint64_t a_timeout_ms, dap_timerfd_callback_t
#if defined (DAP_OS_LINUX) || defined (DAP_OS_WINDOWS)
l_timerfd->tfd = l_tfd;
#endif
#ifdef DAP_OS_WINDOWS
l_timerfd->th = l_th;
#endif
//#ifdef DAP_OS_WINDOWS
//l_timerfd->th = l_th;
//#endif
return l_timerfd;
}
......@@ -256,12 +281,12 @@ static void s_es_callback_timer(struct dap_events_socket *a_event_sock)
//EV_SET(l_event, 0, a_event_sock->kqueue_base_filter, a_event_sock->kqueue_base_flags,a_event_sock->kqueue_base_fflags,a_event_sock->kqueue_data,a_event_sock);
//kevent(a_event_sock->worker->kqueue_fd,l_event,1,NULL,0,NULL);
#elif defined (DAP_OS_WINDOWS)
LARGE_INTEGER l_due_time;
/*LARGE_INTEGER l_due_time;
l_due_time.QuadPart = (long long)l_timerfd->timeout_ms * _MSEC;
if (!SetWaitableTimer(l_timerfd->th, &l_due_time, 0, TimerAPCb, l_timerfd, false)) {
log_it(L_CRITICAL, "Waitable timer not reset, error %d", GetLastError());
CloseHandle(l_timerfd->th);
}
}*/ // Wtf is this entire thing for?...
#else
#error "No timer callback realization for your platform"
#endif
......@@ -275,7 +300,7 @@ static void s_es_callback_timer(struct dap_events_socket *a_event_sock)
close(l_timerfd->tfd);
#elif defined(DAP_OS_WINDOWS)
closesocket(l_timerfd->tfd);
CloseHandle(l_timerfd->th);
//CloseHandle(l_timerfd->th);
#endif
l_timerfd->events_socket->flags |= DAP_SOCK_SIGNAL_CLOSE;
}
......@@ -288,5 +313,5 @@ static void s_es_callback_timer(struct dap_events_socket *a_event_sock)
*/
void dap_timerfd_delete(dap_timerfd_t *l_timerfd)
{
dap_events_socket_remove_and_delete_mt(l_timerfd->events_socket->worker, l_timerfd->events_socket);
dap_events_socket_remove_and_delete_mt(l_timerfd->events_socket->worker, l_timerfd->esocket_uuid);
}
......@@ -833,8 +833,8 @@ void *dap_worker_thread(void *arg)
// Here we expect thats event duplicates goes together in it. If not - we lose some events between.
}
}
dap_events_socket_remove_and_delete_unsafe( l_cur, false);
//dap_events_socket_remove_and_delete_unsafe( l_cur, false);
dap_events_remove_and_delete_socket_unsafe(dap_events_get_default(), l_cur, false);
#ifdef DAP_EVENTS_CAPS_KQUEUE
l_worker->kqueue_events_count--;
#endif
......@@ -916,7 +916,7 @@ static void s_queue_add_es_callback( dap_events_socket_t * a_es, void * a_arg)
if(l_es_new->socket!=0 && l_es_new->socket != -1)
#endif
if(dap_events_socket_check_unsafe( l_worker, l_es_new)){
if(dap_worker_esocket_find_uuid( l_worker, l_es_new->uuid)){
// Socket already present in worker, it's OK
return;
}
......@@ -950,12 +950,11 @@ static void s_queue_add_es_callback( dap_events_socket_t * a_es, void * a_arg)
if ( l_ret != 0 ){
log_it(L_CRITICAL,"Can't add event socket's handler to worker i/o poll mechanism with error %d", errno);
}else{
// Add in global list
// Add in worker
l_es_new->me = l_es_new;
if (l_es_new->socket!=0 && l_es_new->socket != -1){
pthread_rwlock_wrlock(&l_worker->esocket_rwlock);
HASH_ADD(hh_worker, l_worker->esockets, me, sizeof(void *), l_es_new );
HASH_ADD(hh_worker, l_worker->esockets, uuid, sizeof(l_es_new->uuid), l_es_new );
l_worker->event_sockets_count++;
pthread_rwlock_unlock(&l_worker->esocket_rwlock);
}
......@@ -973,13 +972,14 @@ static void s_queue_add_es_callback( dap_events_socket_t * a_es, void * a_arg)
*/
static void s_queue_delete_es_callback( dap_events_socket_t * a_es, void * a_arg)
{
dap_events_socket_handler_t * l_es_handler = (dap_events_socket_handler_t*) a_arg;
dap_events_socket_handle_t * l_es_handler = (dap_events_socket_handle_t*) a_arg;
assert(l_es_handler);
dap_events_socket_t * l_esocket = (dap_events_socket_t*) l_es_handler->esocket;
if (dap_events_socket_check_uuid_unsafe (a_es->worker,l_esocket, l_es_handler->uuid)){
l_esocket->flags |= DAP_SOCK_SIGNAL_CLOSE; // Send signal to socket to kill
dap_events_socket_t * l_es;
if ( (l_es = dap_worker_esocket_find_uuid(a_es->worker,l_es_handler->esocket_uuid)) != NULL ){
//l_es->flags |= DAP_SOCK_SIGNAL_CLOSE; // Send signal to socket to kill
dap_events_socket_remove_and_delete_unsafe_delayed(l_es,false);
}else
log_it(L_INFO, "While we were sending the delete() message, esocket %p has been disconnected ", l_esocket);
log_it(L_INFO, "While we were sending the delete() message, esocket %llu has been disconnected ", l_es_handler->esocket_uuid);
DAP_DELETE(l_es_handler);
}
......@@ -990,9 +990,13 @@ static void s_queue_delete_es_callback( dap_events_socket_t * a_es, void * a_arg
*/
static void s_queue_es_reassign_callback( dap_events_socket_t * a_es, void * a_arg)
{
assert(a_es);
dap_worker_t * l_worker = a_es->worker;
assert(l_worker);
dap_worker_msg_reassign_t * l_msg = (dap_worker_msg_reassign_t*) a_arg;
dap_events_socket_t * l_es_reassign = l_msg->esocket;
if (dap_events_socket_check_uuid_unsafe(a_es->worker,l_es_reassign, l_msg->esocket_uuid)){
assert(l_msg);
dap_events_socket_t * l_es_reassign;
if ( ( l_es_reassign = dap_worker_esocket_find_uuid(l_worker, l_msg->esocket_uuid))!= NULL ){
if( l_es_reassign->was_reassigned && l_es_reassign->flags & DAP_SOCK_REASSIGN_ONCE) {
log_it(L_INFO, "Reassgment request with DAP_SOCK_REASSIGN_ONCE allowed only once, declined reassigment from %u to %u",
l_es_reassign->worker->id, l_msg->worker_new->id);
......@@ -1040,15 +1044,18 @@ static void s_event_exit_callback( dap_events_socket_t * a_es, uint64_t a_flags)
*/
static void s_queue_es_io_callback( dap_events_socket_t * a_es, void * a_arg)
{
assert(a_es);
dap_worker_t * l_worker = a_es->worker;
dap_worker_msg_io_t * l_msg = a_arg;
assert(l_msg);
// Check if it was removed from the list
if ( !dap_events_socket_check_uuid_unsafe(a_es->worker,l_msg->esocket,l_msg->esocket_uuid)){
log_it(L_INFO, "We got i/o message for esocket %p thats now not in list. Lost %u data", l_msg->esocket, l_msg->data_size);
dap_events_socket_t *l_msg_es = dap_worker_esocket_find_uuid(l_worker, l_msg->esocket_uuid);
if ( l_msg_es == NULL){
log_it(L_INFO, "We got i/o message for esocket %llu thats now not in list. Lost %u data", l_msg->esocket_uuid, l_msg->data_size);
DAP_DELETE(l_msg);
return;
}
dap_events_socket_t *l_msg_es = l_msg->esocket;
if (l_msg->flags_set & DAP_SOCK_CONNECTING)
if (! (l_msg_es->flags & DAP_SOCK_CONNECTING) ){
l_msg_es->flags |= DAP_SOCK_CONNECTING;
......@@ -1082,7 +1089,7 @@ static bool s_socket_all_check_activity( void * a_arg)
{
dap_worker_t *l_worker = (dap_worker_t*) a_arg;
assert(l_worker);
dap_events_socket_t *l_es, *tmp;
dap_events_socket_t *l_es = NULL, *tmp = NULL;
char l_curtimebuf[64];
time_t l_curtime= time(NULL);
dap_ctime_r(&l_curtime, l_curtimebuf);
......@@ -1096,7 +1103,7 @@ static bool s_socket_all_check_activity( void * a_arg)
if (l_es->callbacks.error_callback) {
l_es->callbacks.error_callback(l_es, ETIMEDOUT);
}
dap_events_socket_remove_and_delete_mt( l_worker, l_es);
dap_events_socket_remove_and_delete_unsafe_delayed(l_es,false);
}
}
}
......
......@@ -55,6 +55,7 @@ void dap_events_deinit( ); // Deinit server module
dap_events_t* dap_events_new( );
dap_events_t* dap_events_get_default( );
void dap_events_delete( dap_events_t * a_events );
void dap_events_remove_and_delete_socket_unsafe(dap_events_t*, dap_events_socket_t*, bool);
int32_t dap_events_start( dap_events_t *a_events );
void dap_events_stop_all();
......
......@@ -170,6 +170,8 @@ typedef struct dap_events_socket_w_data{
size_t size;
} dap_events_socket_w_data_t;
typedef uint64_t dap_events_socket_uuid_t;
typedef struct dap_events_socket {
union {
#ifdef DAP_OS_WINDOWS
......@@ -195,7 +197,7 @@ typedef struct dap_events_socket {
int fd2;
dap_events_desc_type_t type;
uint128_t uuid; // Unique UID
dap_events_socket_uuid_t uuid; // Unique UID
// Related sockets (be careful - possible problems, delete them before )
dap_events_socket_t ** workers_es; // If not NULL - on every worker must be present
size_t workers_es_size; // events socket with same socket
......@@ -281,19 +283,18 @@ typedef struct dap_events_socket {
#define SSL(a) (a ? (WOLFSSL *) (a)->_pvt : NULL)
typedef struct dap_events_socket_handler{
dap_events_socket_t * esocket;
uint128_t uuid;
dap_events_socket_uuid_t esocket_uuid;
struct {
uint64_t value; // some custom data
void * ptr;
};
} dap_events_socket_handler_t;
} dap_events_socket_handle_t;
typedef struct dap_events_socket_handler_hh{
dap_events_socket_t * esocket;
uint128_t uuid;
dap_events_socket_uuid_t uuid;
uint32_t worker_id;
UT_hash_handle hh;
} dap_events_socket_handler_hh_t;
......@@ -338,13 +339,11 @@ void dap_events_socket_assign_on_worker_inter(dap_events_socket_t * a_es_input,
void dap_events_socket_reassign_between_workers_mt(dap_worker_t * a_worker_old, dap_events_socket_t * a_es, dap_worker_t * a_worker_new);
void dap_events_socket_reassign_between_workers_unsafe(dap_events_socket_t * a_es, dap_worker_t * a_worker_new);
dap_events_socket_t * dap_events_socket_find_unsafe(int sock, struct dap_events * sh); // Find client by socket
size_t dap_events_socket_pop_from_buf_in(dap_events_socket_t *sc, void * data, size_t data_size);
// Non-MT functions
bool dap_events_socket_check_unsafe(dap_worker_t * a_worker,dap_events_socket_t * a_es);
bool dap_events_socket_check_uuid_unsafe(dap_worker_t * a_worker,dap_events_socket_t * a_es, uint128_t a_es_uuid);
dap_events_socket_t * dap_worker_esocket_find_uuid(dap_worker_t * a_worker, dap_events_socket_uuid_t a_es_uuid);
void dap_events_socket_set_readable_unsafe(dap_events_socket_t * sc,bool is_ready);
void dap_events_socket_set_writable_unsafe(dap_events_socket_t * sc,bool is_ready);
......@@ -355,16 +354,16 @@ size_t dap_events_socket_write_unsafe(dap_events_socket_t *sc, const void * data
size_t dap_events_socket_write_f_unsafe(dap_events_socket_t *sc, const char * format,...);
// MT variants less
void dap_events_socket_set_readable_mt(dap_worker_t * a_w, dap_events_socket_t * a_es,bool a_is_ready);
void dap_events_socket_set_writable_mt(dap_worker_t * a_w, dap_events_socket_t * a_es,bool a_is_ready);
void dap_events_socket_set_readable_mt(dap_worker_t * a_w, dap_events_socket_uuid_t a_es_uuid, bool a_is_ready);
void dap_events_socket_set_writable_mt(dap_worker_t * a_w, dap_events_socket_uuid_t a_es_uuid, bool a_is_ready);
size_t dap_events_socket_write_mt(dap_worker_t * a_w, dap_events_socket_t *a_es, const void * a_data, size_t a_data_size);
size_t dap_events_socket_write_f_mt(dap_worker_t * a_w, dap_events_socket_t *a_es, const char * a_format,...);
size_t dap_events_socket_write_mt(dap_worker_t * a_w, dap_events_socket_uuid_t a_es_uuid, const void * a_data, size_t a_data_size);
size_t dap_events_socket_write_f_mt(dap_worker_t * a_w, dap_events_socket_uuid_t a_es_uuid, const char * a_format,...);
size_t dap_events_socket_write_inter(dap_events_socket_t * a_es_input, dap_events_socket_t *a_es, uint128_t a_es_uuid, const void * a_data, size_t a_data_size);
size_t dap_events_socket_write_f_inter(dap_events_socket_t * a_es_input, dap_events_socket_t *a_es,uint128_t a_es_uuid, const char * a_format,...);
size_t dap_events_socket_write_inter(dap_events_socket_t * a_es_input, dap_events_socket_uuid_t a_es_uuid, const void * a_data, size_t a_data_size);
size_t dap_events_socket_write_f_inter(dap_events_socket_t * a_es_input, dap_events_socket_uuid_t a_es_uuid,const char * a_format,...);
void dap_events_socket_remove_and_delete_mt( dap_worker_t * a_w, dap_events_socket_t* a_es);
void dap_events_socket_remove_and_delete_mt( dap_worker_t * a_w, dap_events_socket_uuid_t a_es_uuid);
void dap_events_socket_remove_and_delete_unsafe( dap_events_socket_t *a_es, bool preserve_inheritor );
// Delayed removed
......
......@@ -74,9 +74,9 @@ dap_events_socket_t * dap_proc_thread_create_queue_ptr(dap_proc_thread_t * a_thr
bool dap_proc_thread_assign_on_worker_inter(dap_proc_thread_t * a_thread, dap_worker_t * a_worker, dap_events_socket_t *a_esocket );
int dap_proc_thread_esocket_write_inter(dap_proc_thread_t * a_thread,dap_worker_t * a_worker, dap_events_socket_t *a_es,uint128_t a_es_uuid,
int dap_proc_thread_esocket_write_inter(dap_proc_thread_t * a_thread,dap_worker_t * a_worker, dap_events_socket_uuid_t a_es_uuid,
const void * a_data, size_t a_data_size);
int dap_proc_thread_esocket_write_f_inter(dap_proc_thread_t * a_thread,dap_worker_t * a_worker, dap_events_socket_t *a_es, uint128_t a_es_uuid,
int dap_proc_thread_esocket_write_f_inter(dap_proc_thread_t * a_thread,dap_worker_t * a_worker, dap_events_socket_uuid_t a_es_uuid,
const char * a_format,...);
int dap_proc_thread_esocket_update_poll_flags(dap_proc_thread_t * a_thread, dap_events_socket_t * a_esocket);
......
......@@ -54,6 +54,7 @@ typedef struct dap_timerfd {
int tfd; //timer file descriptor
#endif
dap_events_socket_t *events_socket;
dap_events_socket_uuid_t esocket_uuid;
dap_timerfd_callback_t callback;
void *callback_arg;
#ifdef DAP_OS_WINDOWS
......
......@@ -95,14 +95,13 @@ typedef struct dap_worker
// Message for reassigment
typedef struct dap_worker_msg_reassign{
dap_events_socket_t * esocket;
uint128_t esocket_uuid;
dap_events_socket_uuid_t esocket_uuid;
dap_worker_t * worker_new;
} dap_worker_msg_reassign_t;
// Message for input/output queue
typedef struct dap_worker_msg_io{
dap_events_socket_t * esocket;
uint128_t esocket_uuid;
dap_events_socket_uuid_t esocket_uuid;
size_t data_size;
void *data;
uint32_t flags_set;
......
......@@ -170,7 +170,7 @@ static void s_notify_server_callback_queue(dap_events_socket_t * a_es, void * a_
size_t l_str_len = a_arg? strlen((char*)a_arg): 0;
if(l_str_len){
dap_events_socket_write_inter(a_es->worker->queue_es_io_input[l_worker_id],
l_socket_handler->esocket, l_socket_handler->uuid,
l_socket_handler->uuid,
a_arg,l_str_len+1);
}
}
......