Skip to content
Snippets Groups Projects
Commit 015d222f authored by Dmitriy A. Gerasimov's avatar Dmitriy A. Gerasimov
Browse files

[+] mqueue-based realization for QUEUE_PTR (unfinished)

[*] Fixed callback_queue_ptr() call
parent 380143bc
No related branches found
No related tags found
3 merge requests!251Master,!250Master,!187Release 2.5 version
Pipeline #4785 passed with stage
in 10 seconds
...@@ -15,7 +15,7 @@ add_library(${PROJECT_NAME} STATIC ${DAP_CLIENT_HEADERS} ${DAP_CLIENT_SOURCES}) ...@@ -15,7 +15,7 @@ add_library(${PROJECT_NAME} STATIC ${DAP_CLIENT_HEADERS} ${DAP_CLIENT_SOURCES})
target_link_libraries(${PROJECT_NAME} dap_core dap_crypto dap_server_core dap_udp_server dap_http_server dap_enc_server dap_stream dap_session dap_stream_ch json-c) target_link_libraries(${PROJECT_NAME} dap_core dap_crypto dap_server_core dap_udp_server dap_http_server dap_enc_server dap_stream dap_session dap_stream_ch json-c)
if(UNIX AND NOT ANDROID) if(UNIX AND NOT ANDROID)
target_link_libraries(${PROJECT_NAME} ev) target_link_libraries(${PROJECT_NAME} rt)
endif() endif()
......
...@@ -57,6 +57,17 @@ ...@@ -57,6 +57,17 @@
int dap_events_socket_init( ) int dap_events_socket_init( )
{ {
log_it(L_NOTICE,"Initialized events socket module"); log_it(L_NOTICE,"Initialized events socket module");
#if defined (DAP_EVENTS_CAPS_QUEUE_POSIX)
#ifdef DAP_OS_LINUX
#include <sys/time.h>
#include <sys/resource.h>
struct rlimit l_mqueue_limit;
l_mqueue_limit.rlim_cur = 1024;
l_mqueue_limit.rlim_max = 1024;
// setrlimit(RLIMIT_MSGQUEUE,&l_mqueue_limit);
#endif
#endif
return 0; return 0;
} }
...@@ -115,7 +126,7 @@ dap_events_socket_t *dap_events_socket_wrap_no_add( dap_events_t *a_events, ...@@ -115,7 +126,7 @@ dap_events_socket_t *dap_events_socket_wrap_no_add( dap_events_t *a_events,
void dap_events_socket_assign_on_worker_mt(dap_events_socket_t * a_es, struct dap_worker * a_worker) void dap_events_socket_assign_on_worker_mt(dap_events_socket_t * a_es, struct dap_worker * a_worker)
{ {
a_es->last_ping_request = time(NULL); a_es->last_ping_request = time(NULL);
log_it(L_DEBUG, "Assigned %p on worker %u", a_es, a_worker->id); // log_it(L_DEBUG, "Assigned %p on worker %u", a_es, a_worker->id);
dap_worker_add_events_socket(a_es,a_worker); dap_worker_add_events_socket(a_es,a_worker);
} }
...@@ -157,6 +168,7 @@ dap_events_socket_t * s_create_type_pipe(dap_worker_t * a_w, dap_events_socket_c ...@@ -157,6 +168,7 @@ dap_events_socket_t * s_create_type_pipe(dap_worker_t * a_w, dap_events_socket_c
l_es->callbacks.read_callback = a_callback; // Arm event callback l_es->callbacks.read_callback = a_callback; // Arm event callback
l_es->ev_base_flags = EPOLLIN | EPOLLERR | EPOLLRDHUP | EPOLLHUP; l_es->ev_base_flags = EPOLLIN | EPOLLERR | EPOLLRDHUP | EPOLLHUP;
#if defined(DAP_EVENTS_CAPS_PIPE_POSIX)
int l_pipe[2]; int l_pipe[2];
int l_errno; int l_errno;
char l_errbuf[128]; char l_errbuf[128];
...@@ -170,6 +182,9 @@ dap_events_socket_t * s_create_type_pipe(dap_worker_t * a_w, dap_events_socket_c ...@@ -170,6 +182,9 @@ dap_events_socket_t * s_create_type_pipe(dap_worker_t * a_w, dap_events_socket_c
// log_it(L_DEBUG, "Created one-way unnamed bytestream pipe %d->%d", l_pipe[0], l_pipe[1]); // log_it(L_DEBUG, "Created one-way unnamed bytestream pipe %d->%d", l_pipe[0], l_pipe[1]);
l_es->fd = l_pipe[0]; l_es->fd = l_pipe[0];
l_es->fd2 = l_pipe[1]; l_es->fd2 = l_pipe[1];
#else
#error "No defined s_create_type_pipe() for your platform"
#endif
return l_es; return l_es;
} }
...@@ -236,6 +251,26 @@ dap_events_socket_t * s_create_type_queue_ptr(dap_worker_t * a_w, dap_events_soc ...@@ -236,6 +251,26 @@ dap_events_socket_t * s_create_type_queue_ptr(dap_worker_t * a_w, dap_events_soc
// log_it(L_DEBUG, "Created one-way unnamed packet pipe %d->%d", l_pipe[0], l_pipe[1]); // log_it(L_DEBUG, "Created one-way unnamed packet pipe %d->%d", l_pipe[0], l_pipe[1]);
l_es->fd = l_pipe[0]; l_es->fd = l_pipe[0];
l_es->fd2 = l_pipe[1]; l_es->fd2 = l_pipe[1];
#elif defined (DAP_EVENTS_CAPS_QUEUE_POSIX)
char l_mq_name[64];
struct mq_attr l_mq_attr ={0};
l_mq_attr.mq_curmsgs = 9;
l_mq_attr.mq_maxmsg = 9; // Don't think we need to hold more than 1024 messages
l_mq_attr.mq_msgsize = sizeof (void *); // We send only pointer on memory,
// so use it with shared memory if you do access from another process
snprintf(l_mq_name,sizeof (l_mq_name),"/dap-%d-esocket-0x%p",getpid(),l_es);
l_es->mqd = mq_open(l_mq_name,O_CREAT|O_RDWR,S_IRWXU, &l_mq_attr);
if (l_es->mqd == -1 ){
int l_errno = errno;
char l_errbuf[128]={0};
strerror_r(l_errno,l_errbuf,sizeof (l_errbuf) );
DAP_DELETE(l_es);
l_es = NULL;
log_it(L_CRITICAL,"Can't create mqueue descriptor %s: \"%s\" code %d",l_mq_name, l_errbuf, l_errno);
}
#else
#error "Not implemented s_create_type_queue_ptr() on your platform"
#endif #endif
return l_es; return l_es;
} }
...@@ -250,6 +285,7 @@ dap_events_socket_t * s_create_type_queue_ptr(dap_worker_t * a_w, dap_events_soc ...@@ -250,6 +285,7 @@ dap_events_socket_t * s_create_type_queue_ptr(dap_worker_t * a_w, dap_events_soc
dap_events_socket_t * dap_events_socket_create_type_queue_ptr_mt(dap_worker_t * a_w, dap_events_socket_callback_queue_ptr_t a_callback) dap_events_socket_t * dap_events_socket_create_type_queue_ptr_mt(dap_worker_t * a_w, dap_events_socket_callback_queue_ptr_t a_callback)
{ {
dap_events_socket_t * l_es = s_create_type_queue_ptr(a_w, a_callback); dap_events_socket_t * l_es = s_create_type_queue_ptr(a_w, a_callback);
assert(l_es);
// If no worker - don't assign // If no worker - don't assign
if ( a_w) if ( a_w)
dap_events_socket_assign_on_worker_mt(l_es,a_w); dap_events_socket_assign_on_worker_mt(l_es,a_w);
...@@ -266,6 +302,7 @@ dap_events_socket_t * dap_events_socket_create_type_queue_ptr_mt(dap_worker_t * ...@@ -266,6 +302,7 @@ dap_events_socket_t * dap_events_socket_create_type_queue_ptr_mt(dap_worker_t *
dap_events_socket_t * dap_events_socket_create_type_queue_ptr_unsafe(dap_worker_t * a_w, dap_events_socket_callback_queue_ptr_t a_callback) dap_events_socket_t * dap_events_socket_create_type_queue_ptr_unsafe(dap_worker_t * a_w, dap_events_socket_callback_queue_ptr_t a_callback)
{ {
dap_events_socket_t * l_es = s_create_type_queue_ptr(a_w, a_callback); dap_events_socket_t * l_es = s_create_type_queue_ptr(a_w, a_callback);
assert(l_es);
// If no worker - don't assign // If no worker - don't assign
if ( a_w) if ( a_w)
dap_events_socket_assign_on_worker_unsafe(l_es,a_w); dap_events_socket_assign_on_worker_unsafe(l_es,a_w);
...@@ -276,16 +313,29 @@ dap_events_socket_t * dap_events_socket_create_type_queue_ptr_unsafe(dap_worker_ ...@@ -276,16 +313,29 @@ dap_events_socket_t * dap_events_socket_create_type_queue_ptr_unsafe(dap_worker_
* @brief dap_events_socket_queue_proc_input * @brief dap_events_socket_queue_proc_input
* @param a_esocket * @param a_esocket
*/ */
void dap_events_socket_queue_proc_input_unsafe(dap_events_socket_t * a_esocket) int dap_events_socket_queue_proc_input_unsafe(dap_events_socket_t * a_esocket)
{ {
if (a_esocket->callbacks.queue_callback){ if (a_esocket->callbacks.queue_callback){
if (a_esocket->flags & DAP_SOCK_QUEUE_PTR){ if (a_esocket->flags & DAP_SOCK_QUEUE_PTR){
void * l_queue_ptr = NULL; void * l_queue_ptr = NULL;
#if defined(DAP_EVENTS_CAPS_QUEUE_PIPE2) #if defined(DAP_EVENTS_CAPS_QUEUE_PIPE2)
if(read( a_esocket->fd, &l_queue_ptr,sizeof (void *)) == sizeof (void *)) if(read( a_esocket->fd, &l_queue_ptr,sizeof (void *)) == sizeof (void *))
a_esocket->callbacks.queue_callback(a_esocket, l_queue_ptr,sizeof(void *)); a_esocket->callbacks.queue_ptr_callback(a_esocket, l_queue_ptr);
else if ( (errno != EAGAIN) && (errno != EWOULDBLOCK) ) // we use blocked socket for now but who knows... else if ( (errno != EAGAIN) && (errno != EWOULDBLOCK) ) // we use blocked socket for now but who knows...
log_it(L_WARNING, "Can't read packet from pipe"); log_it(L_WARNING, "Can't read packet from pipe");
#elif defined (DAP_EVENTS_CAPS_QUEUE_POSIX)
struct timespec s_timeout;
clock_gettime(CLOCK_REALTIME, &s_timeout);
s_timeout.tv_sec+=1;
ssize_t l_ret = mq_timedreceive(a_esocket->mqd,(char*) &l_queue_ptr, sizeof (l_queue_ptr),NULL,&s_timeout );
if (l_ret == -1){
int l_errno = errno;
char l_errbuf[128]={0};
strerror_r(l_errno, l_errbuf, sizeof (l_errbuf));
log_it(L_ERROR, "Error in esocket queue_ptr:\"%s\" code %d", l_errbuf, l_errno);
return -1;
}
a_esocket->callbacks.queue_ptr_callback (a_esocket, l_queue_ptr);
#else #else
#error "No Queue fetch mechanism implemented on your platform" #error "No Queue fetch mechanism implemented on your platform"
#endif #endif
...@@ -293,9 +343,11 @@ void dap_events_socket_queue_proc_input_unsafe(dap_events_socket_t * a_esocket) ...@@ -293,9 +343,11 @@ void dap_events_socket_queue_proc_input_unsafe(dap_events_socket_t * a_esocket)
size_t l_read = read(a_esocket->socket, a_esocket->buf_in,sizeof(a_esocket->buf_in)); size_t l_read = read(a_esocket->socket, a_esocket->buf_in,sizeof(a_esocket->buf_in));
a_esocket->callbacks.queue_callback(a_esocket,a_esocket->buf_in,l_read ); a_esocket->callbacks.queue_callback(a_esocket,a_esocket->buf_in,l_read );
} }
}else }else{
log_it(L_ERROR, "Queue socket %d accepted data but callback is NULL ", a_esocket->socket); log_it(L_ERROR, "Queue socket %d accepted data but callback is NULL ", a_esocket->socket);
return -1;
}
return 0;
} }
/** /**
...@@ -405,6 +457,16 @@ int dap_events_socket_queue_ptr_send( dap_events_socket_t * a_es, void* a_arg) ...@@ -405,6 +457,16 @@ int dap_events_socket_queue_ptr_send( dap_events_socket_t * a_es, void* a_arg)
return 0; return 0;
else else
return l_errno; return l_errno;
#elif defined (DAP_EVENTS_CAPS_QUEUE_POSIX)
struct timespec l_timeout;
clock_gettime(CLOCK_REALTIME, &l_timeout);
l_timeout.tv_sec+=2; // Not wait more than 1 second to get and 2 to send
int ret = mq_timedsend(a_es->mqd, (const char *)&a_arg,sizeof (a_arg),0, &l_timeout );
int l_errno = errno;
if (ret == sizeof(a_arg) )
return 0;
else
return l_errno;
#else #else
#error "Not implemented dap_events_socket_queue_ptr_send() for this platform" #error "Not implemented dap_events_socket_queue_ptr_send() for this platform"
#endif #endif
......
...@@ -33,12 +33,16 @@ ...@@ -33,12 +33,16 @@
// Caps for different platforms // Caps for different platforms
#if defined(DAP_OS_LINUX) #if defined(DAP_OS_LINUX)
#define DAP_EVENTS_CAPS_EPOLL #define DAP_EVENTS_CAPS_EPOLL
#define DAP_EVENTS_CAPS_PIPE_POSIX
#define DAP_EVENTS_CAPS_QUEUE_PIPE2 #define DAP_EVENTS_CAPS_QUEUE_PIPE2
//#define DAP_EVENTS_CAPS_QUEUE_POSIX
#define DAP_EVENTS_CAPS_EVENT_EVENTFD #define DAP_EVENTS_CAPS_EVENT_EVENTFD
#include <netinet/in.h> #include <netinet/in.h>
#include <sys/eventfd.h> #include <sys/eventfd.h>
#include <mqueue.h>
#elif defined (DAP_OS_UNIX) #elif defined (DAP_OS_UNIX)
#define DAP_EVENTS_CAPS_KQUEUE #define DAP_EVENTS_CAPS_KQUEUE
#define DAP_EVENTS_CAPS_PIPE_POSIX
#define DAP_EVENTS_CAPS_EVENT_KEVENT #define DAP_EVENTS_CAPS_EVENT_KEVENT
#define DAP_EVENTS_CAPS_QUEUE_SOCKETPAIR #define DAP_EVENTS_CAPS_QUEUE_SOCKETPAIR
#include <netinet/in.h> #include <netinet/in.h>
...@@ -47,6 +51,7 @@ ...@@ -47,6 +51,7 @@
#define DAP_EVENTS_CAPS_EPOLL #define DAP_EVENTS_CAPS_EPOLL
#define DAP_EVENTS_CAPS_QUEUE_WEVENT #define DAP_EVENTS_CAPS_QUEUE_WEVENT
#define DAP_EVENTS_CAPS_EVENT_WEVENT #define DAP_EVENTS_CAPS_EVENT_WEVENT
#define DAP_EVENTS_CAPS_PIPE_POSIX
#endif #endif
#if defined(DAP_EVENTS_CAPS_EPOLL) #if defined(DAP_EVENTS_CAPS_EPOLL)
...@@ -119,8 +124,11 @@ typedef struct dap_events_socket { ...@@ -119,8 +124,11 @@ typedef struct dap_events_socket {
union{ union{
int socket; int socket;
int fd; int fd;
#if defined(DAP_EVENTS_CAPS_QUEUE_POSIX)
mqd_t mqd;
#endif
}; };
#ifdef DAP_EVENTS_CAPS_QUEUE_PIPE2 #ifdef DAP_EVENTS_CAPS_PIPE_POSIX
int fd2; int fd2;
#endif #endif
dap_events_desc_type_t type; dap_events_desc_type_t type;
...@@ -187,7 +195,7 @@ void dap_events_socket_deinit(); // Deinit clients module ...@@ -187,7 +195,7 @@ void dap_events_socket_deinit(); // Deinit clients module
dap_events_socket_t * dap_events_socket_create_type_queue_ptr_unsafe(dap_worker_t * a_w, dap_events_socket_callback_queue_ptr_t a_callback); dap_events_socket_t * dap_events_socket_create_type_queue_ptr_unsafe(dap_worker_t * a_w, dap_events_socket_callback_queue_ptr_t a_callback);
dap_events_socket_t * dap_events_socket_create_type_queue_ptr_mt(dap_worker_t * a_w, dap_events_socket_callback_queue_ptr_t a_callback); dap_events_socket_t * dap_events_socket_create_type_queue_ptr_mt(dap_worker_t * a_w, dap_events_socket_callback_queue_ptr_t a_callback);
void dap_events_socket_queue_proc_input_unsafe(dap_events_socket_t * a_esocket); int dap_events_socket_queue_proc_input_unsafe(dap_events_socket_t * a_esocket);
dap_events_socket_t * dap_events_socket_create_type_event_unsafe(dap_worker_t * a_w, dap_events_socket_callback_event_t a_callback); dap_events_socket_t * dap_events_socket_create_type_event_unsafe(dap_worker_t * a_w, dap_events_socket_callback_event_t a_callback);
dap_events_socket_t * dap_events_socket_create_type_event_mt(dap_worker_t * a_w, dap_events_socket_callback_event_t a_callback); dap_events_socket_t * dap_events_socket_create_type_event_mt(dap_worker_t * a_w, dap_events_socket_callback_event_t a_callback);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment