Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • dap/dap-sdk
1 result
Show changes
Commits on Source (4)
set(DAP_SDK_NATIVE_VERSION "2.0-13")
set(DAP_SDK_NATIVE_VERSION "2.0-14")
# Core
if (DAPSDK_MODULES MATCHES "core")
# Core
......
......@@ -33,6 +33,10 @@
#include <stdlib.h>
#include <stdio.h>
#include <time.h>
#ifdef DAP_OS_WINDOWS
#include <fcntl.h>
#define pipe(pfds) _pipe(pfds, 4096, _O_BINARY)
#endif
#ifdef __MACH__
#include <dispatch/dispatch.h>
#endif
......
......@@ -55,8 +55,6 @@
#define popen _popen
#define pclose _pclose
#define pipe(pfds) _pipe(pfds, 4096, 0x8000)
#endif
#include "dap_common.h"
......
......@@ -77,7 +77,7 @@
#define DAP_ENC_KS_KEY_ID_SIZE 33
#endif
static void s_stage_status_after(dap_client_pvt_t * a_client_internal);
static bool s_stage_status_after(dap_client_pvt_t * a_client_internal);
// ENC stage callbacks
static void s_enc_init_response(dap_client_t *, void *, size_t);
......@@ -188,7 +188,7 @@ static void s_stream_connected(dap_client_pvt_t * a_client_pvt)
* @brief s_client_internal_stage_status_proc
* @param a_client
*/
static void s_stage_status_after(dap_client_pvt_t * a_client_pvt)
static bool s_stage_status_after(dap_client_pvt_t * a_client_pvt)
{
dap_worker_t * l_worker= a_client_pvt->worker;
assert(l_worker);
......@@ -213,7 +213,7 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt)
}
a_client_pvt->stage_status = STAGE_STATUS_DONE;
s_stage_status_after(a_client_pvt);
return;
return false;
}
switch (l_stage) {
case STAGE_ENC_INIT: {
......@@ -475,8 +475,8 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt)
log_it(L_INFO, "Connection attempt %d in 0.3 seconds", a_client_pvt->stage_errors);
// small delay before next request
dap_timerfd_start_on_worker( l_worker, 300, (dap_timerfd_callback_t)s_stage_status_after,
a_client_pvt, false );
dap_timerfd_start_on_worker(l_worker, 300, (dap_timerfd_callback_t)s_stage_status_after,
a_client_pvt);
}
else{
log_it(L_INFO, "Too many connection attempts. Tries are over.");
......@@ -513,7 +513,7 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt)
if(a_client_pvt->stage_status_callback)
a_client_pvt->stage_status_callback(a_client_pvt->client, NULL);
return false;
}
/**
......@@ -686,7 +686,6 @@ void dap_client_pvt_request_enc(dap_client_pvt_t * a_client_internal, const char
// if(l_url_full)
// DAP_DELETE(l_url_full);
}
/**
......
......@@ -28,11 +28,10 @@
#include <string.h>
#include <assert.h>
#include <errno.h>
#ifndef _WIN32
#ifndef DAP_OS_WINDOWS
#include <sys/epoll.h>
#include <sys/select.h>
#include <unistd.h>
#include <fcntl.h>
#else
#include <winsock2.h>
#include <windows.h>
......@@ -42,7 +41,7 @@
#include "wepoll.h"
#endif
#include <pthread.h>
#include <fcntl.h>
#include "dap_common.h"
#include "dap_list.h"
#include "dap_worker.h"
......@@ -214,16 +213,22 @@ dap_events_socket_t * s_create_type_pipe(dap_worker_t * a_w, dap_events_socket_c
l_errbuf[0]=0;
if( pipe(l_pipe) < 0 ){
l_errno = errno;
#if defined DAP_OS_UNIX
strerror_r(l_errno, l_errbuf, sizeof (l_errbuf));
log_it( L_ERROR, "Error detected, can't create pipe(): '%s' (%d)", l_errbuf, l_errno);
#elif defined DAP_OS_WINDOWS
log_it( L_ERROR, "Can't create pipe, errno: %d", l_errno);
#endif
DAP_DELETE(l_es);
return NULL;
}//else
// log_it(L_DEBUG, "Created one-way unnamed bytestream pipe %d->%d", l_pipe[0], l_pipe[1]);
l_es->fd = l_pipe[0];
l_es->fd2 = l_pipe[1];
#if defined DAP_OS_UNIX
fcntl( l_pipe[0], F_SETFL, O_NONBLOCK);
fcntl( l_pipe[1], F_SETFL, O_NONBLOCK);
#endif
#else
#error "No defined s_create_type_pipe() for your platform"
......@@ -440,8 +445,21 @@ dap_events_socket_t * s_create_type_event(dap_worker_t * a_w, dap_events_socket_
}
DAP_DELETE(l_es);
return NULL;
}//else
// log_it(L_DEBUG, "Created eventfd descriptor %d", l_es->fd );
} else {
l_es->fd2 = l_es->fd;
log_it(L_DEBUG, "Created eventfd descriptor %d", l_es->fd );
}
#elif defined DAP_OS_WINDOWS
int l_pipe[2];
if (pipe(l_pipe) < 0) {
log_it(L_ERROR, "Can't create pipe for event type, error: %d", errno);
DAP_DELETE(l_es);
return NULL;
}
l_es->fd2 = l_pipe[0];
l_es->fd = l_pipe[1];
log_it(L_DEBUG, "Created pipe for event type, %d -> %d", l_es->fd2, l_es->fd);
#endif
return l_es;
}
......@@ -495,11 +513,24 @@ void dap_events_socket_event_proc_input_unsafe(dap_events_socket_t *a_esocket)
log_it(L_WARNING, "Can't read packet from event fd: \"%s\"(%d)", l_errbuf, l_errno);
}else
return; // do nothing
#elif defined DAP_OS_WINDOWS
uint64_t l_value;
int l_ret;
switch (l_ret = read(a_esocket->fd, &l_value, 8)) {
case -1:
log_it(L_CRITICAL, "Can't read from event socket pipe, error: %d", errno);
break;
case 0:
return;
default:
a_esocket->callbacks.event_callback(a_esocket, l_value);
break;
}
#else
#error "No Queue fetch mechanism implemented on your platform"
#error "No event fetch mechanism implemented on your platform"
#endif
}else
log_it(L_ERROR, "Queue socket %d accepted data but callback is NULL ", a_esocket->socket);
log_it(L_ERROR, "Event socket %d accepted data but callback is NULL ", a_esocket->socket);
}
......@@ -642,6 +673,13 @@ int dap_events_socket_event_signal( dap_events_socket_t * a_es, uint64_t a_value
return l_errno;
else
return 1;
#elif defined DAP_OS_WINDOWS
byte_t l_bytes[sizeof(void*)] = { 0 };
if(write(a_es->fd2, l_bytes, sizeof(l_bytes)) == -1) {
return errno;
} else {
return 0;
}
#else
#error "Not implemented dap_events_socket_event_signal() for this platform"
#endif
......
......@@ -216,7 +216,7 @@ static void * s_proc_thread_function(void * a_arg)
#ifdef DAP_EVENTS_CAPS_EPOLL
//log_it(L_DEBUG, "Epoll_wait call");
int l_selected_sockets = epoll_wait(l_thread->epoll_ctl, l_epoll_events, DAP_EVENTS_SOCKET_MAX, -1);
size_t l_sockets_max = l_selected_sockets;
size_t l_sockets_max = (size_t)l_selected_sockets;
#elif defined (DAP_EVENTS_CAPS_POLL)
int l_selected_sockets = poll(l_poll,l_poll_count,-1);
size_t l_sockets_max = l_poll_count;
......@@ -227,10 +227,14 @@ static void * s_proc_thread_function(void * a_arg)
if(l_selected_sockets == -1) {
if( errno == EINTR)
continue;
#if defined DAP_OS_UNIX
int l_errno = errno;
char l_errbuf[128];
strerror_r(l_errno, l_errbuf, sizeof (l_errbuf));
log_it(L_ERROR, "Proc thread #%d got errno:\"%s\" (%d)", l_thread->cpu_id , l_errbuf, l_errno);
#elif DAP_OS_WINDOWS
log_it(L_ERROR, "Error occured on thread #%d, errno: %d", l_thread->cpu_id , errno);
#endif
break;
}
for(size_t n = 0; n < l_sockets_max; n++) {
......@@ -269,12 +273,16 @@ static void * s_proc_thread_function(void * a_arg)
time_t l_cur_time = time( NULL);
l_cur->last_time_active = l_cur_time;
if (l_flag_error){
#if defined DAP_OS_UNIX
int l_errno = errno;
char l_errbuf[128];
strerror_r(l_errno, l_errbuf,sizeof (l_errbuf));
log_it(L_ERROR,"Some error on proc thread #%u with %d socket: %s(%d)",l_thread->cpu_id, l_cur->socket, l_errbuf, l_errno);
#elif defined DAP_OS_WINDOWS
log_it(L_ERROR,"Some error occured on thread #%u with socket %d, errno: %d",l_thread->cpu_id, l_cur->socket, errno);
#endif
if(l_cur->callbacks.error_callback)
l_cur->callbacks.error_callback(l_cur,errno);
l_cur->callbacks.error_callback(l_cur, errno);
}
if (l_flag_read ){
switch (l_cur->type) {
......@@ -285,7 +293,9 @@ static void * s_proc_thread_function(void * a_arg)
dap_events_socket_event_proc_input_unsafe (l_cur);
break;
default:{ log_it(L_ERROR, "Unprocessed descriptor type accepted in proc thread loop"); }
default:
log_it(L_ERROR, "Unprocessed descriptor type accepted in proc thread loop");
break;
}
}
if(l_cur->flags & DAP_ESOCK_SIGNAL_CLOSE){
......
......@@ -20,17 +20,17 @@
You should have received a copy of the GNU General Public License
along with any DAP SDK based project. If not, see <http://www.gnu.org/licenses/>.
*/
#ifdef DAP_OS_UNIX
#include <stdint.h>
#include <stdbool.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/time.h>
#include <sys/timerfd.h>
#include <inttypes.h>
#ifdef DAP_OS_WINDOWS
#include <winsock2.h>
#endif
#include "dap_common.h"
#include "dap_events.h"
#include "dap_worker.h"
......@@ -51,15 +51,28 @@ int dap_timerfd_init()
return 0;
}
#ifdef DAP_OS_WINDOWS
void __stdcall TimerAPCb(void* arg, DWORD low, DWORD high) { // Timer high value.
UNREFERENCED_PARAMETER(low)
UNREFERENCED_PARAMETER(high)
dap_timerfd_t *l_timerfd = (dap_timerfd_t *)arg;
byte_t l_bytes[sizeof(void*)] = { 0 };
if (write(l_timerfd->pipe_in, l_bytes, sizeof(l_bytes)) == -1) {
log_it(L_CRITICAL, "Error occured on writing into pipe from APC, errno: %d", errno);
}
}
#endif
/**
* @brief dap_timerfd_start
* @param a_timeout_ms
* @param a_callback
* @return new allocated dap_timerfd_t structure or NULL if error
*/
dap_timerfd_t* dap_timerfd_start(uint64_t a_timeout_ms, dap_timerfd_callback_t a_callback, void *a_callback_arg, bool a_repeated)
dap_timerfd_t* dap_timerfd_start(uint64_t a_timeout_ms, dap_timerfd_callback_t a_callback, void *a_callback_arg)
{
return dap_timerfd_start_on_worker(dap_events_worker_get_auto(), a_timeout_ms, a_callback, a_callback_arg, a_repeated);
return dap_timerfd_start_on_worker(dap_events_worker_get_auto(), a_timeout_ms, a_callback, a_callback_arg);
}
/**
......@@ -70,13 +83,16 @@ dap_timerfd_t* dap_timerfd_start(uint64_t a_timeout_ms, dap_timerfd_callback_t a
* @param a_callback_arg
* @return
*/
dap_timerfd_t* dap_timerfd_start_on_worker(dap_worker_t * a_worker, uint64_t a_timeout_ms, dap_timerfd_callback_t a_callback, void *a_callback_arg, bool a_repeated)
dap_timerfd_t* dap_timerfd_start_on_worker(dap_worker_t * a_worker, uint64_t a_timeout_ms, dap_timerfd_callback_t a_callback, void *a_callback_arg)
{
dap_timerfd_t *l_timerfd = DAP_NEW(dap_timerfd_t);
#if defined DAP_OS_UNIX
struct itimerspec l_ts;
int l_tfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
if(l_tfd == -1) {
log_it(L_WARNING, "dap_timerfd_start() failed: timerfd_create() errno=%d\n", errno);
DAP_DELETE(l_timerfd);
return NULL;
}
// repeat never
......@@ -88,11 +104,34 @@ dap_timerfd_t* dap_timerfd_start_on_worker(dap_worker_t * a_worker, uint64_t a_t
if(timerfd_settime(l_tfd, 0, &l_ts, NULL) < 0) {
log_it(L_WARNING, "dap_timerfd_start() failed: timerfd_settime() errno=%d\n", errno);
close(l_tfd);
DAP_DELETE(l_timerfd);
return NULL;
}
// create dap_timerfd_t structure
dap_timerfd_t *l_timerfd = DAP_NEW(dap_timerfd_t);
#elif defined DAP_OS_WINDOWS
HANDLE l_th = CreateWaitableTimer(NULL, true, NULL);
if (!l_th) {
log_it(L_CRITICAL, "Waitable timer not created, error %d", GetLastError());
DAP_DELETE(l_timerfd);
return NULL;
}
int l_pipe[2];
if (pipe(l_pipe) < 0) {
log_it(L_ERROR, "Can't create pipe, error: %d", errno);
DAP_DELETE(l_timerfd);
return NULL;
}
unsigned long l_mode = 0;
int l_tfd = l_pipe[0];
LARGE_INTEGER l_due_time;
l_due_time.QuadPart = (long long)a_timeout_ms * _MSEC;
if (!SetWaitableTimer(l_th, &l_due_time, 0, TimerAPCb, l_timerfd, false)) {
log_it(L_CRITICAL, "Waitable timer not set, error %d", GetLastError());
CloseHandle(l_th);
DAP_DELETE(l_timerfd);
return NULL;
}
#endif
// create events_socket for timer file descriptor
dap_events_socket_callbacks_t l_s_callbacks;
......@@ -105,14 +144,16 @@ dap_timerfd_t* dap_timerfd_start_on_worker(dap_worker_t * a_worker, uint64_t a_t
l_events_socket->_inheritor = l_timerfd;
// fill out dap_timerfd_t structure
l_timerfd->timeout_ms = a_timeout_ms;
l_timerfd->tfd = l_tfd;
l_timerfd->events_socket = l_events_socket;
l_timerfd->callback = a_callback;
l_timerfd->callback_arg = a_callback_arg;
l_timerfd->repeated = a_repeated;
l_timerfd->timeout_ms = a_timeout_ms;
l_timerfd->tfd = l_tfd;
l_timerfd->events_socket = l_events_socket;
l_timerfd->callback = a_callback;
l_timerfd->callback_arg = a_callback_arg;
#ifdef DAP_OS_WINDOWS
l_timerfd->th = l_th;
l_timerfd->pipe_in = l_pipe[1];
#endif
dap_worker_add_events_socket(l_events_socket, a_worker);
return l_timerfd;
}
......@@ -122,13 +163,10 @@ dap_timerfd_t* dap_timerfd_start_on_worker(dap_worker_t * a_worker, uint64_t a_t
*/
static void s_es_callback_timer(struct dap_events_socket *a_event_sock)
{
uint64_t l_ptiu64;
dap_timerfd_t *l_timerfd = a_event_sock->_inheritor;
// run user's callback
if(l_timerfd->callback)
l_timerfd->callback(l_timerfd->callback_arg);
if (l_timerfd->repeated) {
//printf("\nread() returned %d, %d\n", l_ptiu64, l_read_ret);
if (l_timerfd->callback && l_timerfd->callback(l_timerfd->callback_arg)) {
#if defined DAP_OS_UNIX
struct itimerspec l_ts;
// repeat never
l_ts.it_interval.tv_sec = 0;
......@@ -139,8 +177,21 @@ static void s_es_callback_timer(struct dap_events_socket *a_event_sock)
if(timerfd_settime(l_timerfd->tfd, 0, &l_ts, NULL) < 0) {
log_it(L_WARNING, "callback_timerfd_read() failed: timerfd_settime() errno=%d\n", errno);
}
#elif defined DAP_OS_WINDOWS
LARGE_INTEGER l_due_time;
l_due_time.QuadPart = (long long)l_timerfd->timeout_ms * _MSEC;
if (!SetWaitableTimer(l_timerfd->th, &l_due_time, 0, TimerAPCb, l_timerfd, false)) {
log_it(L_CRITICAL, "Waitable timer not reset, error %d", GetLastError());
CloseHandle(l_timerfd->th);
}
#endif
dap_events_socket_set_readable_unsafe(a_event_sock, true);
} else {
#if defined DAP_OS_UNIX
close(l_timerfd->tfd);
#elif defined DAP_OS_WINDOWS
CloseHandle(l_timerfd->th);
#endif
dap_events_socket_remove_and_delete_unsafe(l_timerfd->events_socket, false);
}
}
......@@ -154,6 +205,3 @@ void dap_timerfd_delete(dap_timerfd_t *l_timerfd)
{
dap_events_socket_remove_and_delete_mt(l_timerfd->events_socket->worker, l_timerfd->events_socket);
}
#else
#error "No dap_timerfd realization for your platform"
#endif
......@@ -40,10 +40,10 @@
#define LOG_TAG "dap_worker"
// temporary too big timout for no closing sockets opened to not keep alive peers
static time_t s_connection_timeout = 20000; // 60; // seconds
static time_t s_connection_timeout = 60; // seconds
static void s_socket_all_check_activity( void * a_arg);
static bool s_socket_all_check_activity( void * a_arg);
static void s_queue_add_es_callback( dap_events_socket_t * a_es, void * a_arg);
static void s_queue_delete_es_callback( dap_events_socket_t * a_es, void * a_arg);
static void s_queue_es_reassign_callback( dap_events_socket_t * a_es, void * a_arg);
......@@ -112,7 +112,7 @@ void *dap_worker_thread(void *arg)
l_worker->queue_callback= dap_events_socket_create_type_queue_ptr_unsafe( l_worker, s_queue_callback_callback);
l_worker->event_exit = dap_events_socket_create_type_event_unsafe(l_worker, s_event_exit_callback );
l_worker->timer_check_activity = dap_timerfd_start_on_worker( l_worker, s_connection_timeout * 1000 / 2,
s_socket_all_check_activity, l_worker, true );
(dap_timerfd_callback_t)s_socket_all_check_activity, l_worker );
pthread_setspecific(l_worker->events->pth_key_worker, l_worker);
pthread_cond_broadcast(&l_worker->started_cond);
......@@ -641,7 +641,7 @@ static void s_queue_es_io_callback( dap_events_socket_t * a_es, void * a_arg)
* @brief s_socket_all_check_activity
* @param a_arg
*/
static void s_socket_all_check_activity( void * a_arg)
static bool s_socket_all_check_activity( void * a_arg)
{
dap_worker_t *l_worker = (dap_worker_t*) a_arg;
assert(l_worker);
......@@ -663,6 +663,7 @@ static void s_socket_all_check_activity( void * a_arg)
}
}
}
return true;
}
/**
......
......@@ -27,14 +27,18 @@
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#if defined DAP_OS_UNIX
#include <sys/time.h>
#include <sys/timerfd.h>
#elif defined DAP_OS_WINDOWS
#define _MSEC -10000
#endif
#include <inttypes.h>
#include "dap_common.h"
#include "dap_events_socket.h"
typedef void (*dap_timerfd_callback_t)(void* ); // Callback for timer
typedef bool (*dap_timerfd_callback_t)(void* ); // Callback for timer. If return true, it will be called after next timeout
typedef struct dap_timerfd {
uint64_t timeout_ms;
......@@ -42,11 +46,14 @@ typedef struct dap_timerfd {
dap_events_socket_t *events_socket;
dap_timerfd_callback_t callback;
void *callback_arg;
bool repeated;
#ifdef DAP_OS_WINDOWS
HANDLE th;
int pipe_in;
#endif
} dap_timerfd_t;
int dap_timerfd_init();
dap_timerfd_t* dap_timerfd_start(uint64_t a_timeout_ms, dap_timerfd_callback_t a_callback, void *callback_arg, bool a_repeated);
dap_timerfd_t* dap_timerfd_start_on_worker(dap_worker_t * a_worker, uint64_t a_timeout_ms, dap_timerfd_callback_t a_callback, void *a_callback_arg, bool a_repeated);
dap_timerfd_t* dap_timerfd_start(uint64_t a_timeout_ms, dap_timerfd_callback_t a_callback, void *callback_arg);
dap_timerfd_t* dap_timerfd_start_on_worker(dap_worker_t * a_worker, uint64_t a_timeout_ms, dap_timerfd_callback_t a_callback, void *a_callback_arg);
void dap_timerfd_delete(dap_timerfd_t *l_timerfd);
......@@ -162,8 +162,8 @@ bool dap_stream_ch_check_unsafe(dap_stream_worker_t * a_worker,dap_stream_ch_t *
*/
size_t dap_stream_ch_pkt_write_unsafe(dap_stream_ch_t * a_ch, uint8_t a_type, const void * a_data, size_t a_data_size)
{
if (!a_data_size || !a_ch || !a_data) {
log_it(L_WARNING, "NULL ptr or zero data size to write out in channel");
if (!a_ch) {
log_it(L_WARNING, "Channel is NULL ptr");
return 0;
}
//log_it(L_DEBUG,"Output: Has %u bytes of %c type for %c channel id",data_size, (char)type, (char) ch->proc->id );
......@@ -200,7 +200,7 @@ size_t dap_stream_ch_pkt_write_unsafe(dap_stream_ch_t * a_ch, uint8_t a_type, c
size_t l_ret=dap_stream_pkt_write_unsafe(a_ch->stream,l_buf_selected,a_data_size+sizeof(l_hdr));
a_ch->stat.bytes_write+=a_data_size;
a_ch->ready_to_write=true;
dap_stream_ch_set_ready_to_write_unsafe(a_ch, true);
if(l_buf_allocated)
DAP_DELETE(l_buf_allocated);
......
......@@ -75,13 +75,11 @@ static void s_http_client_delete(dap_http_client_t * a_esocket, void * a_arg);
static dap_stream_t *s_stream_keepalive_list = NULL;
static pthread_mutex_t s_mutex_keepalive_list;
static void s_keepalive_cb( void );
static bool s_keepalive_cb( void );
static bool s_dump_packet_headers = false;
bool dap_stream_get_dump_packet_headers(){ return s_dump_packet_headers; }
static struct timespec keepalive_loop_sleep = { 0, STREAM_KEEPALIVE_TIMEOUT * 1000 * 1000 };
static bool s_detect_loose_packet(dap_stream_t * a_stream);
dap_enc_key_type_t s_stream_get_preferred_encryption_type = DAP_ENC_KEY_TYPE_IAES;
......@@ -119,7 +117,7 @@ int dap_stream_init(dap_config_t * a_config)
s_dap_stream_load_preferred_encryption_type(a_config);
s_dump_packet_headers = dap_config_get_item_bool_default(g_config,"general","debug_dump_stream_headers",false);
pthread_mutex_init( &s_mutex_keepalive_list, NULL );
dap_timerfd_start(STREAM_KEEPALIVE_TIMEOUT * 1000, (dap_timerfd_callback_t)s_keepalive_cb, NULL, true);
dap_timerfd_start(STREAM_KEEPALIVE_TIMEOUT * 1000, (dap_timerfd_callback_t)s_keepalive_cb, NULL);
log_it(L_NOTICE,"Init streaming module");
return 0;
......@@ -786,7 +784,7 @@ static bool s_detect_loose_packet(dap_stream_t * a_stream)
}
static void s_keepalive_cb( void )
static bool s_keepalive_cb( void )
{
dap_stream_t *l_stream, *tmp;
pthread_mutex_lock( &s_mutex_keepalive_list );
......@@ -797,5 +795,6 @@ static void s_keepalive_cb( void )
dap_events_socket_write_mt(l_stream->stream_worker->worker, l_stream->esocket, &l_pkt, sizeof(l_pkt));
}
pthread_mutex_unlock( &s_mutex_keepalive_list );
return true;
}