Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • cellframe/cellframe-sdk
  • MIKA83/cellframe-sdk
2 results
Show changes
Commits on Source (14)
Showing
with 571 additions and 324 deletions
......@@ -15,6 +15,7 @@
#include "cdb_bgtask.h"
#include <stdlib.h>
#include <errno.h>
#ifndef _WIN32
#include <sys/signal.h>
#else
......@@ -69,6 +70,7 @@ static void *_cdb_bgtask_func(void *arg)
while(bt->run) {
time_t now = time(NULL);
struct timespec timeout;
int l_cond_rc;
/* check should run some tasks every 1 second */
timeout.tv_sec = now + 1;
......@@ -82,7 +84,9 @@ static void *_cdb_bgtask_func(void *arg)
task->ltime = now;
}
}
pthread_cond_timedwait(&bt->scond, &bt->smutex, &timeout);
pthread_mutex_lock(&bt->smutex);
l_cond_rc = pthread_cond_timedwait(&bt->scond, &bt->smutex, &timeout);
pthread_mutex_unlock(&bt->smutex);
}
return NULL;
......
......@@ -2,7 +2,7 @@ project(cellframe-sdk C)
cmake_minimum_required(VERSION 2.8)
set(CMAKE_C_STANDARD 11)
set(CELLFRAME_SDK_NATIVE_VERSION "2.3-4")
set(CELLFRAME_SDK_NATIVE_VERSION "2.4-0")
add_definitions ("-DCELLFRAME_SDK_VERSION=\"${CELLFRAME_SDK_NATIVE_VERSION}\"")
set(DAPSDK_MODULES "")
......
......@@ -341,6 +341,10 @@ void* dap_client_http_request_custom(const char *a_uplink_addr, uint16_t a_uplin
// create socket
int l_socket = socket( PF_INET, SOCK_STREAM, 0);
if (l_socket == -1) {
log_it(L_ERROR, "Error %d with socket create", errno);
return NULL;
}
// set socket param
int buffsize = DAP_CLIENT_HTTP_RESPONSE_SIZE_MAX;
#ifdef _WIN32
......
......@@ -277,7 +277,7 @@ int dap_client_pvt_disconnect(dap_client_pvt_t *a_client_pvt)
// l_client_internal->stream_es->signal_close = true;
// start stopping connection
if(!dap_events_socket_kill_socket(a_client_pvt->stream_es)) {
if(a_client_pvt->stream_es && !dap_events_socket_kill_socket(a_client_pvt->stream_es)) {
int l_counter = 0;
// wait for stop of connection (max 0.7 sec.)
while(a_client_pvt->stream_es && l_counter < 70) {
......@@ -285,7 +285,7 @@ int dap_client_pvt_disconnect(dap_client_pvt_t *a_client_pvt)
l_counter++;
}
if(l_counter >= 70) {
dap_events_socket_remove_and_delete(a_client_pvt->stream_es, true);
dap_events_socket_queue_remove_and_delete(a_client_pvt->stream_es);
}
}
// if (l_client_internal->stream_socket ) {
......@@ -452,6 +452,11 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt)
log_it(L_INFO, "Go to stage STREAM_SESSION: process the state ops");
a_client_pvt->stream_socket = socket( PF_INET, SOCK_STREAM, 0);
if (a_client_pvt->stream_socket == -1) {
log_it(L_ERROR, "Error %d with socket create", errno);
a_client_pvt->stage_status = STAGE_STATUS_ERROR;
break;
}
#ifdef _WIN32
{
int buffsize = 65536;
......@@ -480,10 +485,9 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt)
a_client_pvt->stream_es->_inheritor = a_client_pvt;//->client;
a_client_pvt->stream = dap_stream_new_es(a_client_pvt->stream_es);
a_client_pvt->stream->is_client_to_uplink = true;
a_client_pvt->stream_session = dap_stream_session_pure_new(); // may be from in packet?
a_client_pvt->stream->session = dap_stream_session_pure_new(); // may be from in packet?
// new added, whether it is necessary?
a_client_pvt->stream->session = a_client_pvt->stream_session;
a_client_pvt->stream->session->key = a_client_pvt->stream_key;
// connect
......@@ -1196,39 +1200,21 @@ void m_es_stream_delete(dap_events_socket_t *a_es, void *arg)
{
log_it(L_INFO, "================= stream delete/peer reconnect");
//dap_client_t *l_client = DAP_CLIENT(a_es);
dap_client_pvt_t * l_client_pvt = a_es->_inheritor;
if(l_client_pvt == NULL) {
log_it(L_ERROR, "dap_client_pvt_t is not initialized");
return;
}
//pthread_mutex_lock(&l_client->mutex);
//dap_client_pvt_t * l_client_pvt = DAP_CLIENT_PVT(l_client);
log_it(L_DEBUG, "client_pvt=0x%x", l_client_pvt);
if(l_client_pvt == NULL) {
log_it(L_ERROR, "dap_client_pvt is not initialized");
//pthread_mutex_unlock(&l_client->mutex);
return;
}
if (l_client_pvt->stage_status_error_callback) {
l_client_pvt->stage_status_error_callback(l_client_pvt->client, (void *)true);
}
dap_stream_delete(l_client_pvt->stream);
l_client_pvt->stream = NULL;
// if(l_client_pvt->client && l_client_pvt->client == l_client)
// dap_client_reset(l_client_pvt->client);
// l_client_pvt->client= NULL;
// log_it(L_DEBUG, "dap_stream_session_close()");
// sleep(3);
dap_stream_session_close(l_client_pvt->stream_session->id);
l_client_pvt->stream_session = NULL;
// signal to permit deleting of l_client_pvt
l_client_pvt->stream_es = NULL;
//pthread_mutex_unlock(&l_client->mutex);
/* disable reconnect from here
if(l_client_pvt->is_reconnect) {
......
......@@ -41,7 +41,6 @@ typedef struct dap_client_internal
dap_events_socket_t * stream_es;
int stream_socket;
dap_stream_session_t * stream_session;
dap_stream_t* stream;
dap_events_t * events;
......
......@@ -79,19 +79,22 @@
//} dap_events_socket_info_t;
//dap_events_socket_info_t **s_dap_events_sockets;
#define LOG_TAG "dap_events"
static uint32_t s_threads_count = 1;
static size_t s_connection_timeout = 6000;
static time_t s_connection_timeout = 6000;
static struct epoll_event *g_epoll_events = NULL;
static volatile bool bEventsAreActive = true;
bool s_workers_init = false;
dap_worker_t *s_workers = NULL;
dap_thread_t *s_threads = NULL;
static void s_new_es_callback( dap_events_socket_t * a_es, void * a_arg);
static void s_delete_es_callback( dap_events_socket_t * a_es, void * a_arg);
static void s_reassign_es_callback( dap_events_socket_t * a_es, void * a_arg);
#define LOG_TAG "dap_events"
uint32_t s_get_cpu_count( )
uint32_t dap_get_cpu_count( )
{
#ifdef _WIN32
SYSTEM_INFO si;
......@@ -123,7 +126,7 @@ uint32_t s_get_cpu_count( )
*/
int32_t dap_events_init( uint32_t a_threads_count, size_t conn_timeout )
{
s_threads_count = a_threads_count ? a_threads_count : s_get_cpu_count( );
s_threads_count = a_threads_count ? a_threads_count : dap_get_cpu_count( );
if ( conn_timeout )
s_connection_timeout = conn_timeout;
......@@ -216,21 +219,21 @@ void dap_events_delete( dap_events_t *a_events )
* @param n_thread
* @param sh
*/
static void s_socket_all_check_activity( dap_worker_t *dap_worker, dap_events_t *d_ev, time_t cur_time )
static void s_socket_all_check_activity( dap_worker_t *dap_worker, dap_events_t *a_events, time_t cur_time )
{
dap_events_socket_t *a_es, *tmp;
pthread_mutex_lock( &dap_worker->locker_on_count );
DL_FOREACH_SAFE( d_ev->dlsockets, a_es, tmp ) {
pthread_rwlock_rdlock(&a_events->sockets_rwlock);
HASH_ITER(hh, a_events->sockets, a_es, tmp ) {
if ( a_es->type == DESCRIPTOR_TYPE_FILE)
continue;
if ( !a_es->kill_signal && cur_time >= a_es->last_time_active + s_connection_timeout && !a_es->no_close ) {
if ( !a_es->kill_signal && cur_time >= (time_t)a_es->last_time_active + s_connection_timeout && !a_es->no_close ) {
log_it( L_INFO, "Socket %u timeout, closing...", a_es->socket );
if (a_es->callbacks->error_callback) {
a_es->callbacks->error_callback(a_es, (void *)ETIMEDOUT);
if (a_es->callbacks.error_callback) {
a_es->callbacks.error_callback(a_es, (void *)ETIMEDOUT);
}
if ( epoll_ctl( dap_worker->epoll_fd, EPOLL_CTL_DEL, a_es->socket, &a_es->ev) == -1 )
......@@ -238,12 +241,12 @@ static void s_socket_all_check_activity( dap_worker_t *dap_worker, dap_events_t
else
log_it( L_DEBUG,"Removed epoll's event from dap_worker #%u", dap_worker->number_thread );
dap_worker->event_sockets_count --;
DL_DELETE( d_ev->dlsockets, a_es );
pthread_rwlock_unlock(&a_events->sockets_rwlock);
dap_events_socket_delete( a_es, true );
pthread_rwlock_rdlock(&a_events->sockets_rwlock);
}
}
pthread_mutex_unlock( &dap_worker->locker_on_count );
pthread_rwlock_unlock(&a_events->sockets_rwlock);
}
......@@ -254,7 +257,7 @@ static void s_socket_all_check_activity( dap_worker_t *dap_worker, dap_events_t
*/
static void *thread_worker_function(void *arg)
{
dap_events_socket_t *cur;
dap_events_socket_t *l_cur;
dap_worker_t *w = (dap_worker_t *) arg;
time_t next_time_timeout_check = time( NULL) + s_connection_timeout / 2;
uint32_t tn = w->number_thread;
......@@ -285,6 +288,9 @@ static void *thread_worker_function(void *arg)
}
#endif
w->event_new_es = dap_events_socket_create_type_event( w, s_new_es_callback);
w->event_delete_es = dap_events_socket_create_type_event( w, s_new_es_callback);
log_it(L_INFO, "Worker %d started, epoll fd %d", w->number_thread, w->epoll_fd);
struct epoll_event *events = &g_epoll_events[ DAP_MAX_EPOLL_EVENTS * tn];
......@@ -309,176 +315,193 @@ static void *thread_worker_function(void *arg)
time_t cur_time = time( NULL);
for(int32_t n = 0; n < selected_sockets; n++) {
cur = (dap_events_socket_t *) events[n].data.ptr;
l_cur = (dap_events_socket_t *) events[n].data.ptr;
if(!cur) {
if(!l_cur) {
log_it(L_ERROR, "dap_events_socket NULL");
continue;
}
l_cur->last_time_active = cur_time;
//log_it(L_DEBUG, "Worker=%d fd=%d socket=%d event=0x%x(%d)", w->number_thread, w->epoll_fd,cur->socket, events[n].events,events[n].events);
int l_sock_err = 0, l_sock_err_size = sizeof(l_sock_err);
//connection already closed (EPOLLHUP - shutdown has been made in both directions)
if(events[n].events & EPOLLHUP) { // && events[n].events & EPOLLERR) {
getsockopt(cur->socket, SOL_SOCKET, SO_ERROR, (void *)&l_sock_err, (socklen_t *)&l_sock_err_size);
//if(!(events[n].events & EPOLLIN))
//cur->no_close = false;
if (l_sock_err) {
cur->flags |= DAP_SOCK_SIGNAL_CLOSE;
log_it(L_DEBUG, "Socket shutdown (EPOLLHUP): %s", strerror(l_sock_err));
if(!(events[n].events & EPOLLERR))
cur->callbacks->error_callback(cur, NULL); // Call callback to process error event
switch (l_cur->type ){
case DESCRIPTOR_TYPE_SOCKET_LISTENING:
case DESCRIPTOR_TYPE_SOCKET:
getsockopt(l_cur->socket, SOL_SOCKET, SO_ERROR, (void *)&l_sock_err, (socklen_t *)&l_sock_err_size);
//if(!(events[n].events & EPOLLIN))
//cur->no_close = false;
if (l_sock_err) {
l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE;
log_it(L_DEBUG, "Socket shutdown (EPOLLHUP): %s", strerror(l_sock_err));
}
default: log_it(L_WARNING, "Unimplemented EPOLLHUP for socket type %d", l_cur->type);
}
}
if(events[n].events & EPOLLERR) {
getsockopt(cur->socket, SOL_SOCKET, SO_ERROR, (void *)&l_sock_err, (socklen_t *)&l_sock_err_size);
log_it(L_ERROR, "Socket error: %s", strerror(l_sock_err));
cur->flags |= DAP_SOCK_SIGNAL_CLOSE;
cur->callbacks->error_callback(cur, NULL); // Call callback to process error event
switch (l_cur->type ){
case DESCRIPTOR_TYPE_SOCKET_LISTENING:
case DESCRIPTOR_TYPE_SOCKET:
getsockopt(l_cur->socket, SOL_SOCKET, SO_ERROR, (void *)&l_sock_err, (socklen_t *)&l_sock_err_size);
log_it(L_ERROR, "Socket error: %s", strerror(l_sock_err));
default: ;
}
l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE;
l_cur->callbacks.error_callback(l_cur, NULL); // Call callback to process error event
}
cur->last_time_active = cur_time;
if(events[n].events & EPOLLIN) {
//log_it(DEBUG,"Comes connection in active read set");
if(cur->buf_in_size == sizeof(cur->buf_in)) {
if(l_cur->buf_in_size == sizeof(l_cur->buf_in)) {
log_it(L_WARNING, "Buffer is full when there is smth to read. Its dropped!");
cur->buf_in_size = 0;
l_cur->buf_in_size = 0;
}
int32_t bytes_read = 0;
if(cur->type == DESCRIPTOR_TYPE_SOCKET) {
bytes_read = recv(cur->socket, (char *) (cur->buf_in + cur->buf_in_size),
sizeof(cur->buf_in) - cur->buf_in_size, 0);
}else if(cur->type == DESCRIPTOR_TYPE_FILE) {
bytes_read = read(cur->socket, (char *) (cur->buf_in + cur->buf_in_size),
sizeof(cur->buf_in) - cur->buf_in_size);
int l_errno=0;
bool l_must_read_smth = false;
switch (l_cur->type) {
case DESCRIPTOR_TYPE_FILE:
l_must_read_smth = true;
bytes_read = read(l_cur->socket, (char *) (l_cur->buf_in + l_cur->buf_in_size),
sizeof(l_cur->buf_in) - l_cur->buf_in_size);
l_errno = errno;
break;
case DESCRIPTOR_TYPE_SOCKET:
l_must_read_smth = true;
bytes_read = recv(l_cur->fd, (char *) (l_cur->buf_in + l_cur->buf_in_size),
sizeof(l_cur->buf_in) - l_cur->buf_in_size, 0);
l_errno = errno;
break;
case DESCRIPTOR_TYPE_SOCKET_LISTENING:
// Accept connection
break;
case DESCRIPTOR_TYPE_TIMER:{
uint64_t val;
/* if we not reading data from socket, he triggered again */
read( l_cur->fd, &val, 8);
if (l_cur->callbacks.timer_callback)
l_cur->callbacks.timer_callback(l_cur);
else
log_it(L_ERROR, "Socket %d with timer callback fired, but callback is NULL ", l_cur->socket);
} break;
case DESCRIPTOR_TYPE_EVENT:
if (l_cur->callbacks.event_callback){
void * l_event_ptr = NULL;
#if defined(DAP_EVENTS_CAPS_EVENT_PIPE_PKT_MODE)
if(read( l_cur->fd, &l_event_ptr,sizeof (&l_event_ptr)) == sizeof (&l_event_ptr))
l_cur->callbacks.event_callback(l_cur, l_event_ptr);
else if ( (errno != EAGAIN) && (errno != EWOULDBLOCK) ) // we use blocked socket for now but who knows...
log_it(L_WARNING, "Can't read packet from pipe");
#endif
}else
log_it(L_ERROR, "Socket %d with event callback fired, but callback is NULL ", l_cur->socket);
break;
}
if(bytes_read > 0) {
cur->buf_in_size += bytes_read;
//log_it(DEBUG, "Received %d bytes", bytes_read);
cur->callbacks->read_callback(cur, NULL); // Call callback to process read event. At the end of callback buf_in_size should be zero if everything was read well
}
else if(bytes_read < 0) {
log_it(L_ERROR, "Some error occured in recv() function: %s", strerror(errno));
dap_events_socket_set_readable(cur, false);
cur->flags |= DAP_SOCK_SIGNAL_CLOSE;
}
else if(bytes_read == 0) {
log_it(L_INFO, "Client socket disconnected");
dap_events_socket_set_readable(cur, false);
cur->flags |= DAP_SOCK_SIGNAL_CLOSE;
if (l_must_read_smth){ // Socket/Descriptor read
if(bytes_read > 0) {
l_cur->buf_in_size += bytes_read;
//log_it(DEBUG, "Received %d bytes", bytes_read);
l_cur->callbacks.read_callback(l_cur, NULL); // Call callback to process read event. At the end of callback buf_in_size should be zero if everything was read well
}
else if(bytes_read < 0) {
if (l_errno != EAGAIN && l_errno != EWOULDBLOCK){ // Socket is blocked
log_it(L_ERROR, "Some error occured in recv() function: %s", strerror(errno));
dap_events_socket_set_readable(l_cur, false);
l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE;
}
}
else if(bytes_read == 0) {
log_it(L_INFO, "Client socket disconnected");
dap_events_socket_set_readable(l_cur, false);
l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE;
}
}
}
// Socket is ready to write
if(((events[n].events & EPOLLOUT) || (cur->flags & DAP_SOCK_READY_TO_WRITE))
&& !(cur->flags & DAP_SOCK_SIGNAL_CLOSE)) {
if(((events[n].events & EPOLLOUT) || (l_cur->flags & DAP_SOCK_READY_TO_WRITE))
&& !(l_cur->flags & DAP_SOCK_SIGNAL_CLOSE)) {
///log_it(DEBUG, "Main loop output: %u bytes to send",sa_cur->buf_out_size);
if(cur->callbacks->write_callback)
cur->callbacks->write_callback(cur, NULL); // Call callback to process write event
if(l_cur->callbacks.write_callback)
l_cur->callbacks.write_callback(l_cur, NULL); // Call callback to process write event
if(cur->flags & DAP_SOCK_READY_TO_WRITE) {
if(l_cur->flags & DAP_SOCK_READY_TO_WRITE) {
static const uint32_t buf_out_zero_count_max = 20;
cur->buf_out[cur->buf_out_size] = 0;
l_cur->buf_out[l_cur->buf_out_size] = 0;
if(!cur->buf_out_size) {
if(!l_cur->buf_out_size) {
//log_it(L_WARNING, "Output: nothing to send. Why we are in write socket set?");
cur->buf_out_zero_count++;
l_cur->buf_out_zero_count++;
if(cur->buf_out_zero_count > buf_out_zero_count_max) { // How many time buf_out on write event could be empty
if(l_cur->buf_out_zero_count > buf_out_zero_count_max) { // How many time buf_out on write event could be empty
log_it(L_ERROR, "Output: nothing to send %u times, remove socket from the write set",
buf_out_zero_count_max);
dap_events_socket_set_writable(cur, false);
dap_events_socket_set_writable(l_cur, false);
}
}
else
cur->buf_out_zero_count = 0;
l_cur->buf_out_zero_count = 0;
}
//for(total_sent = 0; total_sent < cur->buf_out_size;) { // If after callback there is smth to send - we do it
int l_errno;
if(l_cur->type == DESCRIPTOR_TYPE_SOCKET) {
bytes_sent = send(l_cur->socket, (char *) (l_cur->buf_out + total_sent),
l_cur->buf_out_size - total_sent, MSG_DONTWAIT | MSG_NOSIGNAL);
l_errno = errno;
}else if(l_cur->type == DESCRIPTOR_TYPE_FILE) {
bytes_sent = write(l_cur->socket, (char *) (l_cur->buf_out + total_sent),
l_cur->buf_out_size - total_sent);
l_errno = errno;
}
for(total_sent = 0; total_sent < cur->buf_out_size;) { // If after callback there is smth to send - we do it
if(cur->type == DESCRIPTOR_TYPE_SOCKET) {
bytes_sent = send(cur->socket, (char *) (cur->buf_out + total_sent),
cur->buf_out_size - total_sent, MSG_DONTWAIT | MSG_NOSIGNAL);
}else if(cur->type == DESCRIPTOR_TYPE_FILE) {
bytes_sent = write(cur->socket, (char *) (cur->buf_out + total_sent),
cur->buf_out_size - total_sent);
}
if(bytes_sent < 0) {
if(bytes_sent < 0) {
if (l_errno != EAGAIN && l_errno != EWOULDBLOCK ){ // If we have non-blocking socket
log_it(L_ERROR, "Some error occured in send(): %s", strerror(errno));
cur->flags |= DAP_SOCK_SIGNAL_CLOSE;
l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE;
break;
}
}else{
total_sent += bytes_sent;
//log_it(L_DEBUG, "Output: %u from %u bytes are sent ", total_sent,sa_cur->buf_out_size);
}
//log_it(L_DEBUG,"Output: sent %u bytes",total_sent);
if (total_sent) {
pthread_mutex_lock(&cur->write_hold);
cur->buf_out_size -= total_sent;
if (cur->buf_out_size) {
memmove(cur->buf_out, &cur->buf_out[total_sent], cur->buf_out_size);
} else {
cur->flags &= ~DAP_SOCK_READY_TO_WRITE;
//}
//log_it(L_DEBUG,"Output: sent %u bytes",total_sent);
if (total_sent) {
pthread_mutex_lock(&l_cur->mutex);
l_cur->buf_out_size -= total_sent;
if (l_cur->buf_out_size) {
memmove(l_cur->buf_out, &l_cur->buf_out[total_sent], l_cur->buf_out_size);
} else {
l_cur->flags &= ~DAP_SOCK_READY_TO_WRITE;
}
pthread_mutex_unlock(&l_cur->mutex);
}
pthread_mutex_unlock(&cur->write_hold);
}
}
pthread_mutex_lock(&w->locker_on_count);
if((cur->flags & DAP_SOCK_SIGNAL_CLOSE) && !cur->no_close) {
if((l_cur->flags & DAP_SOCK_SIGNAL_CLOSE) && !l_cur->no_close) {
// protect against double deletion
cur->kill_signal = true;
l_cur->kill_signal = true;
//dap_events_socket_remove_and_delete(cur, true);
log_it(L_INFO, "Got signal to close %s, sock %u [thread %u]", cur->hostaddr, cur->socket, tn);
log_it(L_INFO, "Got signal to close %s, sock %u [thread %u]", l_cur->hostaddr, l_cur->socket, tn);
}
if(cur->kill_signal) {
log_it(L_INFO, "Kill %u socket (processed).... [ thread %u ]", cur->socket, tn);
dap_events_socket_remove(cur);
pthread_mutex_unlock(&w->locker_on_count);
dap_events_socket_delete( cur, true);
if(l_cur->kill_signal) {
log_it(L_INFO, "Kill %u socket (processed).... [ thread %u ]", l_cur->socket, tn);
s_es_remove(l_cur);
dap_events_socket_delete( l_cur, true);
}
else
pthread_mutex_unlock(&w->locker_on_count);
/*
if(!w->event_to_kill_count) {
pthread_mutex_unlock(&w->locker_on_count);
continue;
do {
// if ( cur->no_close ) {
// cur = cur->knext;
// continue;
// }
tmp = cur_del->knext;
// delete only current events_socket because others may be active in the other workers
//if(cur_del == cur)
if(cur->kill_signal) {
log_it(L_INFO, "Kill %u socket (processed).... [ thread %u ]", cur_del->socket, tn);
DL_LIST_REMOVE_NODE(w->events->to_kill_sockets, cur, kprev, knext, w->event_to_kill_count);
dap_events_socket_remove_and_delete(cur_del, true);
}
cur_del = tmp;
} while(cur_del);
log_it(L_INFO, "[ Thread %u ] coneections: %u, to kill: %u", tn, w->event_sockets_count,
w->event_to_kill_count);
pthread_mutex_unlock(&w->locker_on_count);
*/
} // for
}
#ifndef NO_TIMER
if(cur_time >= next_time_timeout_check) {
......@@ -492,6 +515,87 @@ static void *thread_worker_function(void *arg)
return NULL;
}
/**
* @brief s_new_es_callback
* @param a_es
* @param a_arg
*/
static void s_new_es_callback( dap_events_socket_t * a_es, void * a_arg)
{
dap_events_socket_t * l_es_new =(dap_events_socket_t *) a_arg;
dap_worker_t * w = a_es->dap_worker;
log_it(L_DEBUG, "Received event socket %p to add on worker", l_es_new);
l_es_new->dap_worker = w;
if ( l_es_new->type == DESCRIPTOR_TYPE_SOCKET || l_es_new->type == DESCRIPTOR_TYPE_SOCKET_LISTENING ){
int l_cpu = w->number_thread;
setsockopt(l_es_new->socket , SOL_SOCKET, SO_INCOMING_CPU, &l_cpu, sizeof(l_cpu));
}
if ( ! l_es_new->is_initalized ){
if (l_es_new->callbacks.new_callback)
l_es_new->callbacks.new_callback(l_es_new, NULL);
l_es_new->is_initalized = true;
}
if (l_es_new->socket>0){
pthread_rwlock_wrlock(&w->events->sockets_rwlock);
HASH_ADD_INT(w->events->sockets, socket, l_es_new );
pthread_rwlock_unlock(&w->events->sockets_rwlock);
struct epoll_event l_ev={0};
l_ev.events = l_es_new->flags ;
if(l_es_new->flags & DAP_SOCK_READY_TO_READ )
l_ev.events |= EPOLLIN;
if(l_es_new->flags & DAP_SOCK_READY_TO_WRITE )
l_ev.events |= EPOLLOUT;
l_ev.data.ptr = l_es_new;
if ( epoll_ctl(w->epoll_fd, EPOLL_CTL_ADD, l_es_new->socket, &l_ev) == 1 )
log_it(L_CRITICAL,"Can't add event socket's handler to epoll_fd");
else{
log_it(L_DEBUG, "Added socket %d on worker %u", l_es_new->socket, w->number_thread);
if (l_es_new->callbacks.worker_assign_callback)
l_es_new->callbacks.worker_assign_callback(l_es_new, w);
}
}else{
log_it(L_ERROR, "Incorrect socket %d after new callback. Dropping this handler out", l_es_new->socket);
dap_events_socket_queue_on_remove_and_delete( l_es_new );
}
}
/**
* @brief s_delete_es_callback
* @param a_es
* @param a_arg
*/
static void s_delete_es_callback( dap_events_socket_t * a_es, void * a_arg)
{
if ( epoll_ctl( a_es->dap_worker->epoll_fd, EPOLL_CTL_DEL, a_es->socket, &a_es->ev) == -1 )
log_it( L_ERROR,"Can't remove event socket's handler from the epoll_fd" );
else
log_it( L_DEBUG,"Removed epoll's event from dap_worker #%u", a_es->dap_worker->number_thread );
a_es->dap_worker->event_sockets_count --;
dap_events_socket_delete( a_es, false );
}
/**
* @brief s_reassign_es_callback
* @param a_es
* @param a_arg
*/
static void s_reassign_es_callback( dap_events_socket_t * a_es, void * a_arg)
{
dap_events_socket_t * l_es_reassign = ((dap_events_socket_t* ) a_arg);
s_es_remove( l_es_reassign);
if (l_es_reassign->callbacks.worker_unassign_callback)
l_es_reassign->callbacks.worker_unassign_callback(l_es_reassign, a_es->dap_worker);
dap_events_socket_assign_on_worker( l_es_reassign, l_es_reassign->dap_worker );
}
/**
* @brief dap_worker_get_min
* @return
......@@ -526,6 +630,11 @@ uint32_t dap_worker_get_index_min( )
return min;
}
dap_worker_t * dap_worker_get_index(uint8_t a_index)
{
return a_index < s_threads_count ? &s_workers[a_index] : NULL;
}
/**
* @brief dap_worker_print_all
*/
......@@ -589,12 +698,22 @@ int dap_events_wait( dap_events_t *sh )
return 0;
}
/**
* @brief sap_worker_add_events_socket
* @param a_events_socket
* @param a_worker
*/
void dap_worker_add_events_socket(dap_events_socket_t * a_events_socket, dap_worker_t * a_worker)
{
dap_events_socket_send_event( a_worker->event_new_es, a_events_socket );
}
/**
* @brief dap_worker_add_events_socket
* @param a_worker
* @param a_events_socket
*/
void dap_worker_add_events_socket( dap_events_socket_t *a_es)
void dap_worker_add_events_socket_auto( dap_events_socket_t *a_es)
{
// struct epoll_event ev = {0};
dap_worker_t *l_worker = dap_worker_get_min( );
......
......@@ -27,7 +27,7 @@
#include <stdarg.h>
#include <string.h>
#include <assert.h>
#include <errno.h>
#ifndef _WIN32
#include <sys/epoll.h>
#include <unistd.h>
......@@ -86,33 +86,106 @@ dap_events_socket_t *dap_events_socket_wrap_no_add( dap_events_t *a_events,
ret->socket = a_sock;
ret->events = a_events;
ret->callbacks = a_callbacks;
memcpy(&ret->callbacks, a_callbacks, sizeof(ret->callbacks) );
ret->flags = DAP_SOCK_READY_TO_READ;
ret->no_close = false;
pthread_mutex_init(&ret->write_hold, NULL);
pthread_mutex_init(&ret->mutex, NULL);
log_it( L_DEBUG,"Dap event socket wrapped around %d sock a_events = %X", a_sock, a_events );
return ret;
}
/**
* @brief dap_events_socket_assign_on_worker
* @param a_es
* @param a_worker
*/
void dap_events_socket_assign_on_worker(dap_events_socket_t * a_es, struct dap_worker * a_worker)
{
a_es->last_ping_request = time(NULL);
dap_worker_add_events_socket(a_es,a_worker);
}
/**
* @brief dap_events_socket_create_type_event
* @param a_w
* @return
*/
dap_events_socket_t * dap_events_socket_create_type_event(dap_worker_t * a_w, dap_events_socket_callback_t a_callback)
{
dap_events_socket_t * l_es = DAP_NEW_Z(dap_events_socket_t);
l_es->type = DESCRIPTOR_TYPE_EVENT;
l_es->dap_worker = a_w;
l_es->callbacks.event_callback = a_callback; // Arm event callback
#ifdef DAP_EVENTS_CAPS_EVENT_PIPE_PKT_MODE
int l_pipe[2];
int l_errno;
char l_errbuf[128];
if( pipe2(l_pipe,O_DIRECT) < 0 ){
l_errno = errno;
strerror_r(l_errno, l_errbuf, sizeof (l_errbuf));
switch (l_errno) {
case EINVAL: log_it(L_CRITICAL, "Too old linux version thats doesn't support O_DIRECT flag for pipes (%s)", l_errbuf); break;
default: log_it( L_ERROR, "Error detected, can't create pipe(): '%s' (%d)", l_errbuf, l_errno);
}
return NULL;
}else
log_it(L_DEBUG, "Created one-way unnamed pipe %d->%d", l_pipe[0], l_pipe[1]);
l_es->fd = l_pipe[1];
l_es->fd2 = l_pipe[0];
#endif
#if defined(DAP_EVENTS_CAPS_EPOLL)
struct epoll_event l_ev={0};
int l_event_fd = l_es->fd;
log_it( L_INFO, "Create event descriptor with queue %d (%p)", l_event_fd, l_es);
l_ev.events = EPOLLIN | EPOLLET;
l_ev.data.ptr = l_es;
epoll_ctl(a_w->epoll_fd, EPOLL_CTL_ADD, l_event_fd, &l_ev);
#endif
return l_es;
}
/**
* @brief dap_events_socket_send_event
* @param a_es
* @param a_arg
*/
void dap_events_socket_send_event( dap_events_socket_t * a_es, void* a_arg)
{
#if defined(DAP_EVENTS_CAPS_EVENT_PIPE_PKT_MODE)
write( a_es->fd2, &a_arg,sizeof(a_arg));
#endif
}
/**
* @brief dap_events_socket_queue_on_remove_and_delete
* @param a_es
*/
void dap_events_socket_queue_on_remove_and_delete(dap_events_socket_t* a_es)
{
dap_events_socket_send_event( a_es->dap_worker->event_delete_es, a_es );
}
/**
* @brief dap_events_socket_create_after
* @param a_es
*/
void dap_events_socket_create_after( dap_events_socket_t *a_es )
{
if ( a_es->callbacks->new_callback )
a_es->callbacks->new_callback( a_es, NULL ); // Init internal structure
if ( a_es->callbacks.new_callback )
a_es->callbacks.new_callback( a_es, NULL ); // Init internal structure
a_es->last_time_active = a_es->last_ping_request = time( NULL );
dap_worker_add_events_socket( a_es );
dap_worker_add_events_socket_auto( a_es );
pthread_mutex_lock( &a_es->dap_worker->locker_on_count );
a_es->dap_worker->event_sockets_count ++;
DL_APPEND( a_es->events->dlsockets, a_es );
pthread_rwlock_wrlock( &a_es->events->sockets_rwlock );
HASH_ADD_INT( a_es->events->sockets, socket, a_es );
......@@ -147,7 +220,7 @@ dap_events_socket_t * dap_events_socket_wrap2( dap_server_t *a_server, struct da
ret->socket = a_sock;
ret->events = a_events;
ret->callbacks = a_callbacks;
memcpy(&ret->callbacks,a_callbacks, sizeof ( ret->callbacks) );
ret->flags = DAP_SOCK_READY_TO_READ;
ret->is_pingable = true;
......@@ -192,7 +265,8 @@ void dap_events_socket_set_readable( dap_events_socket_t *sc, bool is_ready )
if( is_ready == (bool)(sc->flags & DAP_SOCK_READY_TO_READ) )
return;
sc->ev.events = EPOLLERR;
sc->ev.events = sc->ev_base_flags;
sc->ev.events |= EPOLLERR;
if ( is_ready )
sc->flags |= DAP_SOCK_READY_TO_READ;
......@@ -222,10 +296,10 @@ void dap_events_socket_set_readable( dap_events_socket_t *sc, bool is_ready )
*/
void dap_events_socket_set_writable( dap_events_socket_t *sc, bool is_ready )
{
pthread_mutex_lock(&sc->write_hold);
pthread_mutex_lock(&sc->mutex);
if ( is_ready == (bool)(sc->flags & DAP_SOCK_READY_TO_WRITE) ) {
pthread_mutex_unlock(&sc->write_hold);
pthread_mutex_unlock(&sc->mutex);
return;
}
......@@ -234,7 +308,7 @@ void dap_events_socket_set_writable( dap_events_socket_t *sc, bool is_ready )
else
sc->flags ^= DAP_SOCK_READY_TO_WRITE;
int events = EPOLLERR;
int events = sc->ev_base_flags | EPOLLERR;
if( sc->flags & DAP_SOCK_READY_TO_READ )
events |= EPOLLIN;
......@@ -242,7 +316,7 @@ void dap_events_socket_set_writable( dap_events_socket_t *sc, bool is_ready )
if( sc->flags & DAP_SOCK_READY_TO_WRITE )
events |= EPOLLOUT;
pthread_mutex_unlock(&sc->write_hold);
pthread_mutex_unlock(&sc->mutex);
sc->ev.events = events;
......@@ -274,9 +348,7 @@ int dap_events_socket_kill_socket( dap_events_socket_t *a_es )
//dap_events_t *d_ev = w->events;
pthread_mutex_lock( &a_es->dap_worker->locker_on_count );
if ( a_es->kill_signal ) {
pthread_mutex_unlock( &a_es->dap_worker ->locker_on_count );
return 0;
}
......@@ -285,7 +357,6 @@ int dap_events_socket_kill_socket( dap_events_socket_t *a_es )
a_es->kill_signal = true;
//DL_LIST_ADD_NODE_HEAD( d_ev->to_kill_sockets, a_es, kprev, knext, w->event_to_kill_count );
pthread_mutex_unlock( &a_es->dap_worker->locker_on_count );
return 0;
}
......@@ -310,8 +381,8 @@ void dap_events_socket_delete( dap_events_socket_t *a_es, bool preserve_inherito
log_it( L_DEBUG, "dap_events_socket wrapped around %d socket is removed", a_es->socket );
if( a_es->callbacks->delete_callback )
a_es->callbacks->delete_callback( a_es, NULL ); // Init internal structure
if( a_es->callbacks.delete_callback )
a_es->callbacks.delete_callback( a_es, NULL ); // Init internal structure
if ( a_es->_inheritor && !preserve_inheritor )
DAP_DELETE( a_es->_inheritor );
......@@ -321,9 +392,15 @@ void dap_events_socket_delete( dap_events_socket_t *a_es, bool preserve_inherito
closesocket( a_es->socket );
#else
close( a_es->socket );
#ifdef DAP_EVENTS_CAPS_EVENT_PIPE_PKT_MODE
if( a_es->type == DESCRIPTOR_TYPE_EVENT){
close( a_es->fd2);
}
#endif
#endif
}
pthread_mutex_destroy(&a_es->write_hold);
pthread_mutex_destroy(&a_es->mutex);
DAP_DELETE( a_es );
}
......@@ -331,28 +408,24 @@ void dap_events_socket_delete( dap_events_socket_t *a_es, bool preserve_inherito
* @brief dap_events_socket_delete
* @param a_es
*/
void dap_events_socket_remove( dap_events_socket_t *a_es)
void s_es_remove( dap_events_socket_t *a_es)
{
if ( epoll_ctl( a_es->dap_worker->epoll_fd, EPOLL_CTL_DEL, a_es->socket, &a_es->ev) == -1 )
log_it( L_ERROR,"Can't remove event socket's handler from the epoll_fd" );
else
log_it( L_DEBUG,"Removed epoll's event from dap_worker #%u", a_es->dap_worker->number_thread );
DL_DELETE( a_es->events->dlsockets, a_es );
a_es->dap_worker->event_sockets_count --;
}
void dap_events_socket_remove_and_delete( dap_events_socket_t *a_es, bool preserve_inheritor )
/**
* @brief dap_events_socket_remove_and_delete
* @param a_es
* @param preserve_inheritor
*/
void dap_events_socket_queue_remove_and_delete( dap_events_socket_t *a_es )
{
if ( epoll_ctl( a_es->dap_worker->epoll_fd, EPOLL_CTL_DEL, a_es->socket, &a_es->ev) == -1 )
log_it( L_ERROR,"Can't remove event socket's handler from the epoll_fd" );
else
log_it( L_DEBUG,"Removed epoll's event from dap_worker #%u", a_es->dap_worker->number_thread );
DL_DELETE( a_es->events->dlsockets, a_es );
a_es->dap_worker->event_sockets_count --;
dap_events_socket_delete( a_es, preserve_inheritor );
dap_events_socket_send_event( a_es->dap_worker->event_delete_es, a_es );
}
/**
......@@ -365,11 +438,11 @@ void dap_events_socket_remove_and_delete( dap_events_socket_t *a_es, bool prese
size_t dap_events_socket_write(dap_events_socket_t *sc, const void * data, size_t data_size)
{
//log_it(L_DEBUG,"dap_events_socket_write %u sock data %X size %u", sc->socket, data, data_size );
pthread_mutex_lock(&sc->write_hold);
pthread_mutex_lock(&sc->mutex);
data_size = ((sc->buf_out_size+data_size)<(sizeof(sc->buf_out)))?data_size:(sizeof(sc->buf_out)-sc->buf_out_size );
memcpy(sc->buf_out+sc->buf_out_size,data,data_size);
sc->buf_out_size+=data_size;
pthread_mutex_unlock(&sc->write_hold);
pthread_mutex_unlock(&sc->mutex);
return data_size;
}
......@@ -383,7 +456,7 @@ size_t dap_events_socket_write_f(dap_events_socket_t *sc, const char * format,..
{
log_it(L_DEBUG,"dap_events_socket_write_f %u sock", sc->socket );
pthread_mutex_lock(&sc->write_hold);
pthread_mutex_lock(&sc->mutex);
size_t max_data_size = sizeof(sc->buf_out)-sc->buf_out_size;
va_list ap;
va_start(ap,format);
......@@ -394,7 +467,7 @@ size_t dap_events_socket_write_f(dap_events_socket_t *sc, const char * format,..
}else{
log_it(L_ERROR,"Can't write out formatted data '%s'",format);
}
pthread_mutex_unlock(&sc->write_hold);
pthread_mutex_unlock(&sc->mutex);
return (ret > 0) ? ret : 0;
}
......
......@@ -642,6 +642,7 @@ void *thread_loop( void *arg )
CPU_SET( tn, &mask );
int err;
int l_sock_err = 0, l_sock_err_size = sizeof(l_sock_err);
#ifndef ANDROID
err = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &mask);
#else
......@@ -690,7 +691,8 @@ void *thread_loop( void *arg )
dap_cur->last_time_active = cur_time;
if( events[i].events & EPOLLERR ) {
log_it( L_ERROR,"Socket error: %u, remove it" , dap_cur->socket );
getsockopt(dap_cur->socket, SOL_SOCKET, SO_ERROR, (void *)&l_sock_err, (socklen_t *)&l_sock_err_size);
log_it( L_ERROR,"Socket %u error: %s, remove it" , dap_cur->socket, strerror(l_sock_err));
dap_cur->flags |= DAP_SOCK_SIGNAL_CLOSE;
}
set_nonblock_socket(dap_cur->socket);
......
......@@ -21,7 +21,7 @@
You should have received a copy of the GNU General Public License
along with any DAP based project. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef WIN32
#include <stdint.h>
#include <stdbool.h>
#include <errno.h>
......@@ -149,4 +149,4 @@ int dap_timerfd_delete(dap_timerfd_t *l_timerfd)
DAP_DELETE(l_timerfd);
return 0;
}
#endif
......@@ -28,6 +28,8 @@
#include <stdint.h>
#include <pthread.h>
#include <stdatomic.h>
#include <sys/eventfd.h>
#define EPOLL_HANDLE int
#else
#define EPOLL_HANDLE HANDLE
......@@ -50,9 +52,6 @@ struct dap_worker;
typedef struct dap_events {
dap_events_socket_t *sockets; // Hashmap of event sockets
dap_events_socket_t *dlsockets; // Dlist of event sockets
dap_events_socket_t *to_kill_sockets; // Dlist of event sockets
pthread_rwlock_t sockets_rwlock;
void *_inheritor; // Pointer to the internal data, HTTP for example
dap_thread_t proc_thread;
......@@ -62,9 +61,10 @@ typedef struct dap_events {
typedef struct dap_worker
{
uint32_t event_sockets_count;
//uint32_t event_to_kill_count;
atomic_uint event_sockets_count;
dap_events_socket_t * event_new_es; // Events socket for new socket
dap_events_socket_t * event_delete_es; // Events socket for new socket
EPOLL_HANDLE epoll_fd;
uint32_t number_thread;
pthread_mutex_t locker_on_count;
......@@ -86,6 +86,11 @@ int32_t dap_events_wait( dap_events_t *sh );
uint32_t dap_worker_get_index_min( );
dap_worker_t *dap_worker_get_min( );
void dap_worker_add_events_socket( dap_events_socket_t * a_events_socket );
uint32_t dap_get_cpu_count( );
dap_worker_t * dap_worker_get_index(uint8_t a_index);
void dap_events_socket_assign_on_worker(dap_events_socket_t * a_es, struct dap_worker * a_worker);
void dap_worker_add_events_socket(dap_events_socket_t * a_events_socket, dap_worker_t * a_worker);
void dap_worker_add_events_socket_auto( dap_events_socket_t * a_events_socket );
void dap_worker_print_all( );
......@@ -22,50 +22,96 @@
along with any DAP based project. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <stdint.h>
#include <stddef.h>
#include <stdbool.h>
#include <stdatomic.h>
#include <pthread.h>
#include "uthash.h"
#ifndef _WIN32
// Caps for different platforms
#if defined(DAP_OS_LINUX)
#define DAP_EVENTS_CAPS_EPOLL
#define DAP_EVENTS_CAPS_EVENT_PIPE_PKT_MODE
#elif defined (DAP_OS_UNIX)
#define DAP_EVENTS_CAPS_POLL
#define DAP_EVENTS_CAPS_EVENT_PIPE
#elif defined (DAP_OS_WINDOWS)
#define DAP_EVENTS_CAPS_WEPOLL
#define DAP_EVENTS_CAPS_EPOLL
#define DAP_EVENTS_CAPS_EVENT_PIPE
#endif
#if defined(DAP_EVENTS_CAPS_EPOLL)
#include <sys/epoll.h>
#else
#elif defined (DAP_EVENTS_CAPS_WEPOLL)
#include "wepoll.h"
#endif
#include <pthread.h>
struct dap_events;
struct dap_events_socket;
struct dap_worker;
typedef struct dap_events dap_events_t;
typedef struct dap_events_socket dap_events_socket_t;
typedef struct dap_worker dap_worker_t;
typedef struct dap_server dap_server_t;
typedef void (*dap_events_socket_callback_t) (struct dap_events_socket *,void * arg); // Callback for specific client operations
typedef void (*dap_events_socket_callback_t) (dap_events_socket_t *,void * ); // Callback for specific client operations
typedef void (*dap_events_socket_callback_timer_t) (dap_events_socket_t * ); // Callback for specific client operations
typedef void (*dap_events_socket_worker_callback_t) (dap_events_socket_t *,dap_worker_t * ); // Callback for specific client operations
typedef struct dap_events_socket_callbacks {
union{
dap_events_socket_callback_t accept_callback; // Accept callback for listening socket
dap_events_socket_callback_timer_t timer_callback; // Timer callback for listening socket
dap_events_socket_callback_t event_callback; // Timer callback for listening socket
dap_events_socket_callback_t action_callback; // Callback for action with socket
// for events and timers thats pointer
// to processing callback
};
dap_events_socket_callback_t new_callback; // Create new client callback
dap_events_socket_callback_t delete_callback; // Delete client callback
dap_events_socket_callback_t read_callback; // Read function
dap_events_socket_callback_t write_callback; // Write function
dap_events_socket_callback_t error_callback; // Error processing function
dap_events_socket_worker_callback_t worker_assign_callback; // After successful worker assign
dap_events_socket_worker_callback_t worker_unassign_callback; // After successful worker unassign
} dap_events_socket_callbacks_t;
#define DAP_EVENTS_SOCKET_BUF 100000
#if 0
typedef enum {
DESCRIPTOR_TYPE_SOCKET = 0,
DESCRIPTOR_TYPE_SOCKET_LISTENING,
DESCRIPTOR_TYPE_EVENT,
DESCRIPTOR_TYPE_TIMER,
DESCRIPTOR_TYPE_FILE
} dap_events_desc_type_t;
typedef struct dap_events_socket {
union{
int socket;
int fd;
};
#ifdef DAP_EVENTS_CAPS_EVENT_PIPE_PKT_MODE
int fd2;
#endif
dap_events_desc_type_t type;
int socket;
bool signal_close;
dap_events_socket_t ** workers_es; // If not NULL - on every worker must be present
size_t workers_es_size; // events socket with same socket
bool _ready_to_write;
bool _ready_to_read;
uint32_t flags;
bool no_close;
atomic_bool kill_signal;
atomic_bool is_initalized;
uint32_t buf_out_zero_count;
union{
uint8_t buf_in[DAP_EVENTS_SOCKET_BUF+1]; // Internal buffer for input data
char buf_in_str[DAP_EVENTS_SOCKET_BUF+1];
};
size_t buf_in_size; // size of data that is in the input buffer
uint8_t buf_out[DAP_EVENTS_SOCKET_BUF+1]; // Internal buffer for output data
......@@ -76,82 +122,46 @@ typedef struct dap_events_socket {
size_t buf_out_size; // size of data that is in the output buffer
struct dap_events *events;
struct dap_worker *dap_worker;
dap_events_socket_callbacks_t *callbacks;
#ifdef DAP_EVENTS_CAPS_EPOLL
uint32_t ev_base_flags;
struct epoll_event ev;
#endif
dap_events_socket_callbacks_t callbacks;
time_t time_connection;
time_t last_time_active;
time_t last_ping_request;
bool is_pingable;
UT_hash_handle hh;
struct dap_events_socket *next, *prev;
struct dap_events_socket *knext, *kprev;
void * _inheritor; // Inheritor data to specific client type, usualy states for state machine
} dap_events_socket_t; // Node of bidirectional list of clients
#endif
typedef enum {
DESCRIPTOR_TYPE_SOCKET = 0,
DESCRIPTOR_TYPE_FILE
} dap_events_desc_type_t;
typedef struct dap_events_socket {
int32_t socket;
dap_events_desc_type_t type;
uint32_t flags;
// bool signal_close;
bool no_close;
bool kill_signal;
// bool _ready_to_write;
// bool _ready_to_read;
uint32_t buf_out_zero_count;
union{
uint8_t buf_in[DAP_EVENTS_SOCKET_BUF+1]; // Internal buffer for input data
char buf_in_str[DAP_EVENTS_SOCKET_BUF+1];
};
size_t buf_in_size; // size of data that is in the input buffer
void *_inheritor; // Inheritor data to specific client type, usualy states for state machine
uint8_t buf_out[DAP_EVENTS_SOCKET_BUF+1]; // Internal buffer for output data
char hostaddr[1024]; // Address
char service[128];
size_t buf_out_size; // size of data that is in the output buffer
struct dap_events *events;
struct dap_worker *dap_worker;
struct epoll_event ev;
dap_events_socket_callbacks_t *callbacks;
time_t time_connection;
time_t last_time_active;
time_t last_ping_request;
bool is_pingable;
UT_hash_handle hh;
struct dap_events_socket *next, *prev;
struct dap_events_socket *knext, *kprev;
pthread_mutex_t mutex;
} dap_events_socket_t; // Node of bidirectional list of clients
void *_inheritor; // Inheritor data to specific client type, usualy states for state machine
typedef struct dap_events_socket_event{
pthread_mutex_t write_hold;
} dap_events_socket_t; // Node of bidirectional list of clients
} dap_events_socket_event_t;
int dap_events_socket_init(); // Init clients module
void dap_events_socket_deinit(); // Deinit clients module
void dap_events_socket_create_after(dap_events_socket_t * a_es);
dap_events_socket_t * dap_events_socket_create_type_event(dap_worker_t * a_w, dap_events_socket_callback_t a_callback);
void dap_events_socket_send_event( dap_events_socket_t * a_es, void* a_arg);
dap_events_socket_t * dap_events_socket_wrap_no_add(struct dap_events * a_events,
int s, dap_events_socket_callbacks_t * a_callbacks); // Create new client and add it to the list
dap_events_socket_t * dap_events_socket_find(int sock, struct dap_events * sh); // Find client by socket
// Non-MT functions
bool dap_events_socket_is_ready_to_read(dap_events_socket_t * sc);
bool dap_events_socket_is_ready_to_write(dap_events_socket_t * sc);
void dap_events_socket_set_readable(dap_events_socket_t * sc,bool is_ready);
......@@ -161,9 +171,23 @@ size_t dap_events_socket_write(dap_events_socket_t *sc, const void * data, size_
size_t dap_events_socket_write_f(dap_events_socket_t *sc, const char * format,...);
size_t dap_events_socket_read(dap_events_socket_t *sc, void * data, size_t data_size);
void dap_events_socket_remove( dap_events_socket_t *a_es);
// MT variants less
bool dap_events_socket_is_ready_to_read_mt(dap_events_socket_t * sc);
bool dap_events_socket_is_ready_to_write_mt(dap_events_socket_t * sc);
void dap_events_socket_set_readable_mt(dap_events_socket_t * sc,bool is_ready);
void dap_events_socket_set_writable_mt(dap_events_socket_t * sc,bool is_ready);
size_t dap_events_socket_write_mt(dap_events_socket_t *sc, const void * data, size_t data_size);
size_t dap_events_socket_write_f_mt(dap_events_socket_t *sc, const char * format,...);
size_t dap_events_socket_read_mt(dap_events_socket_t *sc, void * data, size_t data_size);
size_t dap_events_socket_write_mt(dap_events_socket_t *sc, const void * data, size_t data_size);
size_t dap_events_socket_write_f_mt(dap_events_socket_t *sc, const char * format,...);
void s_es_remove( dap_events_socket_t *a_es);
void dap_events_socket_delete(dap_events_socket_t *sc,bool preserve_inheritor); // Removes the client from the list
void dap_events_socket_remove_and_delete(dap_events_socket_t* a_es, bool preserve_inheritor );
void dap_events_socket_queue_remove_and_delete(dap_events_socket_t* a_es);
int dap_events_socket_kill_socket( dap_events_socket_t *a_es );
......
......@@ -115,13 +115,16 @@ dap_stream_ch_t* dap_stream_ch_new(dap_stream_t* a_stream, uint8_t id)
}
}
bool dap_stream_ch_valid(dap_stream_ch_t *a_ch)
struct dap_stream_ch_table_t *dap_stream_ch_valid(dap_stream_ch_t *a_ch)
{
struct dap_stream_ch_table_t *l_ret;
if(!a_ch)
return false;
pthread_mutex_lock(&s_ch_table_lock);
HASH_FIND_PTR(s_ch_table, &a_ch, l_ret);
if (l_ret) {
pthread_mutex_lock(&a_ch->mutex);
}
pthread_mutex_unlock(&s_ch_table_lock);
return l_ret;
}
......@@ -133,18 +136,27 @@ bool dap_stream_ch_valid(dap_stream_ch_t *a_ch)
void dap_stream_ch_delete(dap_stream_ch_t *a_ch)
{
pthread_mutex_lock(&s_ch_table_lock);
struct dap_stream_ch_table_t *l_ret;
struct dap_stream_ch_table_t *l_ret;;
HASH_FIND_PTR(s_ch_table, &a_ch, l_ret);
if (!l_ret) {
pthread_mutex_unlock(&s_ch_table_lock);
return;
}
HASH_DEL(s_ch_table, l_ret);
pthread_mutex_lock(&a_ch->mutex);
pthread_mutex_unlock(&s_ch_table_lock);
DAP_DELETE(l_ret);
pthread_mutex_lock(&a_ch->mutex);
if (a_ch->proc)
if (a_ch->proc->delete_callback)
a_ch->proc->delete_callback(a_ch, NULL);
pthread_mutex_unlock(&a_ch->mutex);
pthread_mutex_destroy(&a_ch->mutex);
//pthread_rwlock_wrlock(&a_ch->stream->rwlock);
a_ch->stream->channel[a_ch->stream->channel_count--] = NULL;
//pthread_rwlock_unlock(&a_ch->stream->rwlock);
/* fixed raise, but probably may be memory leak!
if(ch->internal){
free(ch->internal);
......@@ -163,7 +175,6 @@ void dap_stream_ch_set_ready_to_read(dap_stream_ch_t * a_ch,bool a_is_ready)
if (!dap_stream_ch_valid(a_ch)) {
return;
}
pthread_mutex_lock(&a_ch->mutex);
if( a_ch->ready_to_read != a_is_ready){
//log_it(L_DEBUG,"Change channel '%c' to %s", (char) ch->proc->id, is_ready?"true":"false");
a_ch->ready_to_read=a_is_ready;
......@@ -189,7 +200,6 @@ void dap_stream_ch_set_ready_to_write(dap_stream_ch_t * ch,bool is_ready)
if (!dap_stream_ch_valid(ch)) {
return;
}
pthread_mutex_lock(&ch->mutex);
if(ch->ready_to_write!=is_ready){
//log_it(L_DEBUG,"Change channel '%c' to %s", (char) ch->proc->id, is_ready?"true":"false");
ch->ready_to_write=is_ready;
......@@ -218,7 +228,6 @@ bool dap_stream_ch_get_ready_to_read(dap_stream_ch_t * a_ch)
return false;
}
bool l_ret;
pthread_mutex_lock(&a_ch->mutex);
l_ret = a_ch->ready_to_read;
pthread_mutex_unlock(&a_ch->mutex);
return l_ret;
......@@ -235,7 +244,6 @@ bool dap_stream_ch_get_ready_to_write(dap_stream_ch_t * a_ch)
return false;
}
bool l_ret;
pthread_mutex_lock(&a_ch->mutex);
l_ret = a_ch->ready_to_write;
pthread_mutex_unlock(&a_ch->mutex);
return l_ret;
......
......@@ -82,7 +82,6 @@ size_t dap_stream_ch_pkt_write(struct dap_stream_ch * a_ch, uint8_t a_type, con
if (!dap_stream_ch_valid(a_ch)) {
return 0;
}
pthread_mutex_lock( &a_ch->mutex);
//log_it(L_DEBUG,"Output: Has %u bytes of %c type for %c channel id",data_size, (char)type, (char) ch->proc->id );
......
......@@ -63,6 +63,6 @@ bool dap_stream_ch_get_ready_to_write(dap_stream_ch_t *a_ch);
void dap_stream_ch_delete(dap_stream_ch_t *a_ch);
bool dap_stream_ch_valid(dap_stream_ch_t *a_ch);
struct dap_stream_ch_table_t *dap_stream_ch_valid(dap_stream_ch_t *a_ch);
#endif
......@@ -338,19 +338,50 @@ dap_stream_t * stream_new(dap_http_client_t * a_sh)
return ret;
}
void dap_stream_delete( dap_stream_t *a_stream )
void dap_stream_delete(dap_stream_t *a_stream)
{
// log_it(L_DEBUG,"dap_stream_delete( )");
if(a_stream == NULL) {
log_it(L_ERROR,"stream delete NULL instance");
return;
}
pthread_rwlock_destroy(&a_stream->rwlock);
stream_dap_delete(a_stream->conn, NULL);
pthread_mutex_lock(&s_mutex_keepalive_list);
if(s_stream_keepalive_list){
DL_DELETE(s_stream_keepalive_list, a_stream);
}
a_stream->conn_udp = NULL;
a_stream->conn = NULL;
a_stream->events_socket = NULL;
pthread_mutex_unlock(&s_mutex_keepalive_list);
while (a_stream->channel_count) {
dap_stream_ch_delete(a_stream->channel[a_stream->channel_count - 1]);
}
pthread_rwlock_wrlock(&a_stream->rwlock);
if(a_stream->session)
dap_stream_session_close(a_stream->session->id);
a_stream->session = NULL;
pthread_rwlock_unlock(&a_stream->rwlock);
pthread_rwlock_destroy(&a_stream->rwlock);
DAP_DELETE(a_stream);
log_it(L_NOTICE,"Stream connection is over");
}
/**
* @brief stream_dap_delete Delete callback for UDP client
* @param sh DAP client instance
* @param arg Not used
*/
void stream_dap_delete(dap_client_remote_t* sh, void * arg)
{
UNUSED(arg);
if (!sh)
return;
dap_stream_t *l_stream = DAP_STREAM(sh);
dap_stream_delete(l_stream);
}
/**
* @brief dap_stream_new_es
* @param a_es
......@@ -678,42 +709,6 @@ void stream_dap_data_write(dap_client_remote_t* a_client , void * arg){
//log_it(L_DEBUG,"stream_dap_data_write ok");
}
/**
* @brief stream_dap_delete Delete callback for UDP client
* @param sh DAP client instance
* @param arg Not used
*/
void stream_dap_delete(dap_client_remote_t* sh, void * arg){
if(!sh)
return;
dap_stream_t * l_stream = DAP_STREAM(sh);
if(l_stream == NULL)
return;
(void) arg;
pthread_mutex_lock(&s_mutex_keepalive_list);
if(s_stream_keepalive_list){
DL_DELETE(s_stream_keepalive_list, l_stream);
}
pthread_mutex_unlock(&s_mutex_keepalive_list);
/* Until channel is closed, it may still need l_stream->rwlock, so we can't lock it here yet.
In case of races on stream closing think about making this place more robust;
or forbid locking l_stream->rwlock from inside of channels. */
for( ;l_stream->channel_count; l_stream->channel_count--){
dap_stream_ch_delete(l_stream->channel[l_stream->channel_count - 1]);
l_stream->channel[l_stream->channel_count - 1] = NULL;
}
pthread_rwlock_wrlock(&l_stream->rwlock);
if(l_stream->session)
dap_stream_session_close(l_stream->session->id);
l_stream->session = NULL;
pthread_rwlock_unlock(&l_stream->rwlock);
//free(sid);
log_it(L_NOTICE,"Stream connection is over");
}
/**
* @brief stream_dap_new New connection callback for UDP client
* @param sh DAP client instance
......
......@@ -29,6 +29,7 @@
#include <sys/types.h>
#include <assert.h>
#include <errno.h>
#include <stdbool.h>
#ifdef _WIN32
#include <winsock2.h>
......@@ -47,6 +48,7 @@
#include "dap_chain_node_cli.h" // for UNIX_SOCKET_FILE
#include "dap_app_cli.h"
#include "dap_app_cli_net.h"
#include "dap_enc_base64.h"
static int s_status;
......@@ -155,6 +157,27 @@ dap_app_cli_connect_param_t* dap_app_cli_connect(const char *a_socket_path)
return l_ret;
}
/* if cli command argument contains one of the following symbol
argument is going to be encoded to base64 */
static const char* s_dap_app_cli_forbidden_symbols[] = {"\r\n", ";", ""};
bool s_dap_app_cli_cmd_contains_forbidden_symbol(const char * a_cmd_param){
for(int i = 0; s_dap_app_cli_forbidden_symbols[i][0] != '\0'; i++){
if(strstr(a_cmd_param, s_dap_app_cli_forbidden_symbols[i]))
return true;
}
return false;
}
char * s_dap_app_cli_strdup_to_base64(const char * a_cmd_param){
size_t l_cmd_param_len = strlen(a_cmd_param);
size_t l_cmd_param_base64_len = DAP_ENC_BASE64_ENCODE_SIZE(l_cmd_param_len) + 1;
char * l_cmd_param_base64 = DAP_NEW_SIZE(char, l_cmd_param_base64_len);
size_t l_cmd_param_base64_len_res = dap_enc_base64_encode(a_cmd_param, l_cmd_param_len, l_cmd_param_base64, DAP_ENC_DATA_TYPE_B64);
l_cmd_param_base64[l_cmd_param_base64_len_res] = '\0';
return l_cmd_param_base64;
}
/**
* Send request to kelvin-node
*
......@@ -174,7 +197,13 @@ int dap_app_cli_post_command( dap_app_cli_connect_param_t *a_socket, dap_app_cli
for (int i = 0; i < a_cmd->cmd_param_count; i++) {
if (a_cmd->cmd_param[i]) {
dap_string_append(l_cmd_data, "\r\n");
dap_string_append(l_cmd_data, a_cmd->cmd_param[i]);
if(s_dap_app_cli_cmd_contains_forbidden_symbol(a_cmd->cmd_param[i])){
char * l_cmd_param_base64 = s_dap_app_cli_strdup_to_base64(a_cmd->cmd_param[i]);
dap_string_append(l_cmd_data, l_cmd_param_base64);
DAP_DELETE(l_cmd_param_base64);
}else{
dap_string_append(l_cmd_data, a_cmd->cmd_param[i]);
}
}
}
}
......
......@@ -255,9 +255,9 @@ dap_chain_t * dap_chain_load_from_cfg(dap_ledger_t* a_ledger, const char * a_cha
// Recognize chains id
if ( (l_chain_id_str = dap_config_get_item_str(l_cfg,"chain","id")) != NULL ){
if ( sscanf(l_chain_id_str,"0x%016lX",& l_chain_id_u ) !=1 ){
if ( sscanf(l_chain_id_str,"0x%016lx",&l_chain_id_u) !=1 ) {
if ( sscanf(l_chain_id_str,"%lu",&l_chain_id_u ) !=1 ){
if ( sscanf(l_chain_id_str,"0x%016llX",& l_chain_id_u ) !=1 ){
if ( sscanf(l_chain_id_str,"0x%016llx",&l_chain_id_u) !=1 ) {
if ( sscanf(l_chain_id_str,"%llu",&l_chain_id_u ) !=1 ){
log_it (L_ERROR,"Can't recognize '%s' string as chain net id, hex or dec",l_chain_id_str);
dap_config_close(l_cfg);
return NULL;
......
......@@ -213,7 +213,7 @@ int dap_chain_cell_file_append( dap_chain_cell_t * a_cell, const void* a_atom, s
l_total_wrote_bytes += a_atom_size;
// change in chain happened -> nodes synchronization required
if(a_cell->chain && a_cell->chain->callback_notify)
a_cell->chain->callback_notify(a_cell->chain->callback_notify_arg, a_cell->chain, a_cell->id);
a_cell->chain->callback_notify(a_cell->chain->callback_notify_arg, a_cell->chain, a_cell->id, (void *)a_atom, a_atom_size);
} else {
log_it (L_ERROR, "Can't write data from cell 0x%016X to the file \"%s\"",
a_cell->id.uint64,
......
......@@ -83,7 +83,7 @@ typedef void (*dap_chain_callback_atom_iter_delete_t)(dap_chain_atom_iter_t * )
typedef size_t (*dap_chain_datum_callback_datum_pool_proc_add_t)(dap_chain_t * , dap_chain_datum_t **, size_t );
typedef size_t (*dap_chain_datum_callback_datum_pool_proc_add_with_group_t)(dap_chain_t * , dap_chain_datum_t **, size_t, const char *);
typedef void (*dap_chain_callback_notify_t)(void * a_arg, dap_chain_t *a_chain, dap_chain_cell_id_t a_id); //change in chain happened
typedef void (*dap_chain_callback_notify_t)(void * a_arg, dap_chain_t *a_chain, dap_chain_cell_id_t a_id, void* a_atom, size_t a_atom_size); //change in chain happened
typedef enum dap_chain_type
{
......
......@@ -44,4 +44,4 @@ typedef struct dap_stream_ch_chain_net_srv {
void dap_stream_ch_chain_net_srv_set_srv_uid(dap_stream_ch_t* a_ch, dap_chain_net_srv_uid_t a_srv_uid);
uint8_t dap_stream_ch_chain_net_srv_get_id();
int dap_stream_ch_chain_net_srv_init(void);