Commit ee8c3030 authored by dmitriy.gerasimov's avatar dmitriy.gerasimov

Merge branch 'features-4616-d' into 'develop'

features-4616

See merge request !2
parents 4a5a6c6b 80cf6b34
set(DAP_SDK_NATIVE_VERSION "2.0-13")
set(DAP_SDK_NATIVE_VERSION "2.0-14")
# Core
if (DAPSDK_MODULES MATCHES "core")
# Core
......
......@@ -77,7 +77,7 @@
#define DAP_ENC_KS_KEY_ID_SIZE 33
#endif
static void s_stage_status_after(dap_client_pvt_t * a_client_internal);
static bool s_stage_status_after(dap_client_pvt_t * a_client_internal);
// ENC stage callbacks
static void s_enc_init_response(dap_client_t *, void *, size_t);
......@@ -188,7 +188,7 @@ static void s_stream_connected(dap_client_pvt_t * a_client_pvt)
* @brief s_client_internal_stage_status_proc
* @param a_client
*/
static void s_stage_status_after(dap_client_pvt_t * a_client_pvt)
static bool s_stage_status_after(dap_client_pvt_t * a_client_pvt)
{
dap_worker_t * l_worker= a_client_pvt->worker;
assert(l_worker);
......@@ -213,7 +213,7 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt)
}
a_client_pvt->stage_status = STAGE_STATUS_DONE;
s_stage_status_after(a_client_pvt);
return;
return false;
}
switch (l_stage) {
case STAGE_ENC_INIT: {
......@@ -475,8 +475,8 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt)
log_it(L_INFO, "Connection attempt %d in 0.3 seconds", a_client_pvt->stage_errors);
// small delay before next request
dap_timerfd_start_on_worker( l_worker, 300, (dap_timerfd_callback_t)s_stage_status_after,
a_client_pvt, false );
dap_timerfd_start_on_worker(l_worker, 300, (dap_timerfd_callback_t)s_stage_status_after,
a_client_pvt);
}
else{
log_it(L_INFO, "Too many connection attempts. Tries are over.");
......@@ -513,7 +513,7 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt)
if(a_client_pvt->stage_status_callback)
a_client_pvt->stage_status_callback(a_client_pvt->client, NULL);
return false;
}
/**
......@@ -686,7 +686,6 @@ void dap_client_pvt_request_enc(dap_client_pvt_t * a_client_internal, const char
// if(l_url_full)
// DAP_DELETE(l_url_full);
}
/**
......
......@@ -70,9 +70,9 @@ void __stdcall TimerAPCb(void* arg, DWORD low, DWORD high) { // Timer high valu
* @param a_callback
* @return new allocated dap_timerfd_t structure or NULL if error
*/
dap_timerfd_t* dap_timerfd_start(uint64_t a_timeout_ms, dap_timerfd_callback_t a_callback, void *a_callback_arg, bool a_repeated)
dap_timerfd_t* dap_timerfd_start(uint64_t a_timeout_ms, dap_timerfd_callback_t a_callback, void *a_callback_arg)
{
return dap_timerfd_start_on_worker(dap_events_worker_get_auto(), a_timeout_ms, a_callback, a_callback_arg, a_repeated);
return dap_timerfd_start_on_worker(dap_events_worker_get_auto(), a_timeout_ms, a_callback, a_callback_arg);
}
/**
......@@ -83,7 +83,7 @@ dap_timerfd_t* dap_timerfd_start(uint64_t a_timeout_ms, dap_timerfd_callback_t a
* @param a_callback_arg
* @return
*/
dap_timerfd_t* dap_timerfd_start_on_worker(dap_worker_t * a_worker, uint64_t a_timeout_ms, dap_timerfd_callback_t a_callback, void *a_callback_arg, bool a_repeated)
dap_timerfd_t* dap_timerfd_start_on_worker(dap_worker_t * a_worker, uint64_t a_timeout_ms, dap_timerfd_callback_t a_callback, void *a_callback_arg)
{
dap_timerfd_t *l_timerfd = DAP_NEW(dap_timerfd_t);
......@@ -149,7 +149,6 @@ dap_timerfd_t* dap_timerfd_start_on_worker(dap_worker_t * a_worker, uint64_t a_t
l_timerfd->events_socket = l_events_socket;
l_timerfd->callback = a_callback;
l_timerfd->callback_arg = a_callback_arg;
l_timerfd->repeated = a_repeated;
#ifdef DAP_OS_WINDOWS
l_timerfd->th = l_th;
l_timerfd->pipe_in = l_pipe[1];
......@@ -166,9 +165,7 @@ static void s_es_callback_timer(struct dap_events_socket *a_event_sock)
{
dap_timerfd_t *l_timerfd = a_event_sock->_inheritor;
// run user's callback
if(l_timerfd->callback)
l_timerfd->callback(l_timerfd->callback_arg);
if (l_timerfd->repeated) {
if (l_timerfd->callback && l_timerfd->callback(l_timerfd->callback_arg)) {
#if defined DAP_OS_UNIX
struct itimerspec l_ts;
// repeat never
......@@ -190,7 +187,9 @@ static void s_es_callback_timer(struct dap_events_socket *a_event_sock)
#endif
dap_events_socket_set_readable_unsafe(a_event_sock, true);
} else {
#if defined DAP_OS_WINDOWS
#if defined DAP_OS_UNIX
close(l_timerfd->tfd);
#elif defined DAP_OS_WINDOWS
CloseHandle(l_timerfd->th);
#endif
dap_events_socket_remove_and_delete_unsafe(l_timerfd->events_socket, false);
......
......@@ -40,10 +40,10 @@
#define LOG_TAG "dap_worker"
// temporary too big timout for no closing sockets opened to not keep alive peers
static time_t s_connection_timeout = 20000; // 60; // seconds
static time_t s_connection_timeout = 60; // seconds
static void s_socket_all_check_activity( void * a_arg);
static bool s_socket_all_check_activity( void * a_arg);
static void s_queue_add_es_callback( dap_events_socket_t * a_es, void * a_arg);
static void s_queue_delete_es_callback( dap_events_socket_t * a_es, void * a_arg);
static void s_queue_es_reassign_callback( dap_events_socket_t * a_es, void * a_arg);
......@@ -112,7 +112,7 @@ void *dap_worker_thread(void *arg)
l_worker->queue_callback= dap_events_socket_create_type_queue_ptr_unsafe( l_worker, s_queue_callback_callback);
l_worker->event_exit = dap_events_socket_create_type_event_unsafe(l_worker, s_event_exit_callback );
l_worker->timer_check_activity = dap_timerfd_start_on_worker( l_worker, s_connection_timeout * 1000 / 2,
s_socket_all_check_activity, l_worker, true );
(dap_timerfd_callback_t)s_socket_all_check_activity, l_worker );
pthread_setspecific(l_worker->events->pth_key_worker, l_worker);
pthread_cond_broadcast(&l_worker->started_cond);
......@@ -641,7 +641,7 @@ static void s_queue_es_io_callback( dap_events_socket_t * a_es, void * a_arg)
* @brief s_socket_all_check_activity
* @param a_arg
*/
static void s_socket_all_check_activity( void * a_arg)
static bool s_socket_all_check_activity( void * a_arg)
{
dap_worker_t *l_worker = (dap_worker_t*) a_arg;
assert(l_worker);
......@@ -663,6 +663,7 @@ static void s_socket_all_check_activity( void * a_arg)
}
}
}
return true;
}
/**
......
......@@ -38,7 +38,7 @@
#include "dap_common.h"
#include "dap_events_socket.h"
typedef void (*dap_timerfd_callback_t)(void* ); // Callback for timer
typedef bool (*dap_timerfd_callback_t)(void* ); // Callback for timer. If return true, it will be called after next timeout
typedef struct dap_timerfd {
uint64_t timeout_ms;
......@@ -46,7 +46,6 @@ typedef struct dap_timerfd {
dap_events_socket_t *events_socket;
dap_timerfd_callback_t callback;
void *callback_arg;
bool repeated;
#ifdef DAP_OS_WINDOWS
HANDLE th;
int pipe_in;
......@@ -54,7 +53,7 @@ typedef struct dap_timerfd {
} dap_timerfd_t;
int dap_timerfd_init();
dap_timerfd_t* dap_timerfd_start(uint64_t a_timeout_ms, dap_timerfd_callback_t a_callback, void *callback_arg, bool a_repeated);
dap_timerfd_t* dap_timerfd_start_on_worker(dap_worker_t * a_worker, uint64_t a_timeout_ms, dap_timerfd_callback_t a_callback, void *a_callback_arg, bool a_repeated);
dap_timerfd_t* dap_timerfd_start(uint64_t a_timeout_ms, dap_timerfd_callback_t a_callback, void *callback_arg);
dap_timerfd_t* dap_timerfd_start_on_worker(dap_worker_t * a_worker, uint64_t a_timeout_ms, dap_timerfd_callback_t a_callback, void *a_callback_arg);
void dap_timerfd_delete(dap_timerfd_t *l_timerfd);
......@@ -162,8 +162,8 @@ bool dap_stream_ch_check_unsafe(dap_stream_worker_t * a_worker,dap_stream_ch_t *
*/
size_t dap_stream_ch_pkt_write_unsafe(dap_stream_ch_t * a_ch, uint8_t a_type, const void * a_data, size_t a_data_size)
{
if (!a_data_size || !a_ch || !a_data) {
log_it(L_WARNING, "NULL ptr or zero data size to write out in channel");
if (!a_ch) {
log_it(L_WARNING, "Channel is NULL ptr");
return 0;
}
//log_it(L_DEBUG,"Output: Has %u bytes of %c type for %c channel id",data_size, (char)type, (char) ch->proc->id );
......@@ -200,7 +200,7 @@ size_t dap_stream_ch_pkt_write_unsafe(dap_stream_ch_t * a_ch, uint8_t a_type, c
size_t l_ret=dap_stream_pkt_write_unsafe(a_ch->stream,l_buf_selected,a_data_size+sizeof(l_hdr));
a_ch->stat.bytes_write+=a_data_size;
a_ch->ready_to_write=true;
dap_stream_ch_set_ready_to_write_unsafe(a_ch, true);
if(l_buf_allocated)
DAP_DELETE(l_buf_allocated);
......
......@@ -75,13 +75,11 @@ static void s_http_client_delete(dap_http_client_t * a_esocket, void * a_arg);
static dap_stream_t *s_stream_keepalive_list = NULL;
static pthread_mutex_t s_mutex_keepalive_list;
static void s_keepalive_cb( void );
static bool s_keepalive_cb( void );
static bool s_dump_packet_headers = false;
bool dap_stream_get_dump_packet_headers(){ return s_dump_packet_headers; }
static struct timespec keepalive_loop_sleep = { 0, STREAM_KEEPALIVE_TIMEOUT * 1000 * 1000 };
static bool s_detect_loose_packet(dap_stream_t * a_stream);
dap_enc_key_type_t s_stream_get_preferred_encryption_type = DAP_ENC_KEY_TYPE_IAES;
......@@ -119,7 +117,7 @@ int dap_stream_init(dap_config_t * a_config)
s_dap_stream_load_preferred_encryption_type(a_config);
s_dump_packet_headers = dap_config_get_item_bool_default(g_config,"general","debug_dump_stream_headers",false);
pthread_mutex_init( &s_mutex_keepalive_list, NULL );
dap_timerfd_start(STREAM_KEEPALIVE_TIMEOUT * 1000, (dap_timerfd_callback_t)s_keepalive_cb, NULL, true);
dap_timerfd_start(STREAM_KEEPALIVE_TIMEOUT * 1000, (dap_timerfd_callback_t)s_keepalive_cb, NULL);
log_it(L_NOTICE,"Init streaming module");
return 0;
......@@ -786,7 +784,7 @@ static bool s_detect_loose_packet(dap_stream_t * a_stream)
}
static void s_keepalive_cb( void )
static bool s_keepalive_cb( void )
{
dap_stream_t *l_stream, *tmp;
pthread_mutex_lock( &s_mutex_keepalive_list );
......@@ -797,5 +795,6 @@ static void s_keepalive_cb( void )
dap_events_socket_write_mt(l_stream->stream_worker->worker, l_stream->esocket, &l_pkt, sizeof(l_pkt));
}
pthread_mutex_unlock( &s_mutex_keepalive_list );
return true;
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment