Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • cellframe/cellframe-sdk
  • MIKA83/cellframe-sdk
2 results
Show changes
Commits on Source (1)
......@@ -2,7 +2,7 @@ project(cellframe-sdk C)
cmake_minimum_required(VERSION 3.0)
set(CMAKE_C_STANDARD 11)
set(CELLFRAME_SDK_NATIVE_VERSION "2.9-22")
set(CELLFRAME_SDK_NATIVE_VERSION "2.9-23")
add_definitions ("-DCELLFRAME_SDK_VERSION=\"${CELLFRAME_SDK_NATIVE_VERSION}\"")
set(DAPSDK_MODULES "")
message("Cellframe modules: ${CELLFRAME_MODULES}")
......
......@@ -64,6 +64,9 @@ static void s_http_client_headers_write(dap_http_client_t * a_http_client, void
static void s_http_client_data_write(dap_http_client_t * a_http_client, void * a_arg); // Write the data
static void s_http_client_data_read(dap_http_client_t * a_http_client, void * a_arg); // Read the data
static void s_esocket_callback_worker_assign(dap_events_socket_t * a_esocket, dap_worker_t * a_worker);
static void s_esocket_callback_worker_unassign(dap_events_socket_t * a_esocket, dap_worker_t * a_worker);
static void s_esocket_data_read(dap_events_socket_t* a_esocket, void * a_arg);
static void s_esocket_write(dap_events_socket_t* a_esocket, void * a_arg);
static void s_esocket_callback_delete(dap_events_socket_t* a_esocket, void * a_arg);
......@@ -71,13 +74,13 @@ static void s_udp_esocket_new(dap_events_socket_t* a_esocket,void * a_arg);
// Internal functions
static dap_stream_t * s_stream_new(dap_http_client_t * a_http_client); // Create new stream
static void s_http_client_new(dap_http_client_t * a_esocket, void * a_arg);
static void s_http_client_delete(dap_http_client_t * a_esocket, void * a_arg);
static dap_stream_t *s_stream_keepalive_list = NULL;
static pthread_mutex_t s_mutex_keepalive_list;
static bool s_keepalive_cb( void );
static bool s_callback_keepalive( void * a_arg);
static bool s_dump_packet_headers = false;
static bool s_debug = false;
bool dap_stream_get_dump_packet_headers(){ return s_dump_packet_headers; }
......@@ -117,8 +120,7 @@ int dap_stream_init(dap_config_t * a_config)
s_dap_stream_load_preferred_encryption_type(a_config);
s_dump_packet_headers = dap_config_get_item_bool_default(g_config,"general","debug_dump_stream_headers",false);
pthread_mutex_init( &s_mutex_keepalive_list, NULL );
dap_timerfd_start(STREAM_KEEPALIVE_TIMEOUT * 1000, (dap_timerfd_callback_t)s_keepalive_cb, NULL);
s_debug = dap_config_get_item_bool_default(g_config,"stream","debug",false);
log_it(L_NOTICE,"Init streaming module");
return 0;
......@@ -129,7 +131,6 @@ int dap_stream_init(dap_config_t * a_config)
*/
void dap_stream_deinit()
{
pthread_mutex_destroy( &s_mutex_keepalive_list );
dap_stream_ch_deinit( );
}
......@@ -142,7 +143,7 @@ void dap_stream_add_proc_http(struct dap_http * a_http, const char * a_url)
{
dap_http_add_proc(a_http,a_url
,NULL, // _internal
NULL, // New
s_http_client_new, // New
s_http_client_delete, // Delete
s_http_client_headers_read, // Headers read
s_http_client_headers_write, // Headerts write
......@@ -161,6 +162,9 @@ void dap_stream_add_proc_udp(dap_server_t *a_udp_server)
a_udp_server->client_callbacks.write_callback = s_esocket_write;
a_udp_server->client_callbacks.delete_callback = s_esocket_callback_delete;
a_udp_server->client_callbacks.new_callback = s_udp_esocket_new;
a_udp_server->client_callbacks.worker_assign_callback = s_esocket_callback_worker_assign;
a_udp_server->client_callbacks.worker_unassign_callback = s_esocket_callback_worker_unassign;
}
/**
......@@ -280,11 +284,6 @@ void dap_stream_delete(dap_stream_t *a_stream)
log_it(L_ERROR,"stream delete NULL instance");
return;
}
//if (a_stream->prev) {
pthread_mutex_lock(&s_mutex_keepalive_list);
DL_DELETE(s_stream_keepalive_list, a_stream);
pthread_mutex_unlock(&s_mutex_keepalive_list);
//}
while (a_stream->channel_count) {
dap_stream_ch_delete(a_stream->channel[a_stream->channel_count - 1]);
......@@ -300,14 +299,14 @@ void dap_stream_delete(dap_stream_t *a_stream)
/**
* @brief stream_dap_delete Delete callback for UDP client
* @param sh DAP client instance
* @param a_esocket DAP client instance
* @param arg Not used
*/
static void s_esocket_callback_delete(dap_events_socket_t* a_esocket, void * a_arg)
{
UNUSED(a_arg);
if (!a_esocket)
return;
assert (a_esocket);
dap_http_client_t *l_http_client = DAP_HTTP_CLIENT(a_esocket);
dap_stream_t *l_stream = DAP_STREAM(l_http_client);
l_http_client->_inheritor = NULL; // To prevent double free
......@@ -327,9 +326,6 @@ dap_stream_t* dap_stream_new_es_client(dap_events_socket_t * a_esocket)
ret->esocket_uuid = a_esocket->uuid;
ret->buf_defrag_size=0;
ret->is_client_to_uplink = true;
pthread_mutex_lock(&s_mutex_keepalive_list);
DL_APPEND(s_stream_keepalive_list, ret);
pthread_mutex_unlock(&s_mutex_keepalive_list);
return ret;
}
......@@ -404,17 +400,22 @@ static void s_http_client_headers_write(dap_http_client_t * a_http_client, void
(void) a_arg;
//log_it(L_DEBUG,"s_http_client_headers_write()");
if(a_http_client->reply_status_code==200){
dap_stream_t *sid=DAP_STREAM(a_http_client);
dap_stream_t *l_stream=DAP_STREAM(a_http_client);
dap_http_out_header_add(a_http_client,"Content-Type","application/octet-stream");
dap_http_out_header_add(a_http_client,"Connnection","keep-alive");
dap_http_out_header_add(a_http_client,"Connection","keep-alive");
dap_http_out_header_add(a_http_client,"Cache-Control","no-cache");
if(sid->stream_size>0)
dap_http_out_header_add_f(a_http_client,"Content-Length","%u", (unsigned int) sid->stream_size );
if(l_stream->stream_size>0)
dap_http_out_header_add_f(a_http_client,"Content-Length","%u", (unsigned int) l_stream->stream_size );
a_http_client->state_read=DAP_HTTP_CLIENT_STATE_DATA;
dap_events_socket_set_readable_unsafe(a_http_client->esocket,true);
// Connection is established, setting up keepalive timer
dap_events_socket_uuid_t * l_es_uuid= DAP_NEW_Z(dap_events_socket_uuid_t);
*l_es_uuid = a_http_client->esocket->uuid;
dap_timerfd_start(STREAM_KEEPALIVE_TIMEOUT * 1000, (dap_timerfd_callback_t)s_callback_keepalive, l_es_uuid);
}
}
......@@ -434,6 +435,42 @@ static void s_http_client_data_write(dap_http_client_t * a_http_client, void * a
}
}
/**
* @brief s_esocket_callback_worker_assign
* @param a_esocket
* @param a_worker
*/
static void s_esocket_callback_worker_assign(dap_events_socket_t * a_esocket, dap_worker_t * a_worker)
{
if(a_esocket->type == DESCRIPTOR_TYPE_SOCKET_UDP){
dap_events_socket_uuid_t * l_es_uuid= DAP_NEW_Z(dap_events_socket_uuid_t);
*l_es_uuid = a_esocket->uuid;
dap_timerfd_start(STREAM_KEEPALIVE_TIMEOUT * 1000, (dap_timerfd_callback_t)s_callback_keepalive, l_es_uuid);
}else {
dap_http_client_t *l_http_client = DAP_HTTP_CLIENT(a_esocket);
assert(l_http_client);
dap_stream_t * l_stream =DAP_STREAM(l_http_client);
assert(l_stream);
// If we were reassigned after connection was bringed up
if(l_http_client->state_read == DAP_HTTP_CLIENT_STATE_DATA && l_http_client->state_write == DAP_HTTP_CLIENT_STATE_DATA ){
dap_events_socket_uuid_t * l_es_uuid= DAP_NEW_Z(dap_events_socket_uuid_t);
*l_es_uuid = a_esocket->uuid;
dap_timerfd_start(STREAM_KEEPALIVE_TIMEOUT * 1000, (dap_timerfd_callback_t)s_callback_keepalive, l_es_uuid);
}
}
}
/**
* @brief s_esocket_callback_worker_unassign
* @param a_esocket
* @param a_worker
*/
static void s_esocket_callback_worker_unassign(dap_events_socket_t * a_esocket, dap_worker_t * a_worker)
{
// TODO switch off keepalive packets sending
}
/**
* @brief s_data_read
* @param a_client
......@@ -482,34 +519,45 @@ static void s_esocket_write(dap_events_socket_t* a_esocket , void * a_arg){
}
/**
* @brief stream_dap_new New connection callback for UDP client
* @param sh DAP client instance
* @brief s_udp_esocket_new New connection callback for UDP client
* @param a_esocket DAP client instance
* @param arg Not used
*/
static void s_udp_esocket_new(dap_events_socket_t* a_esocket, void * a_arg){
static void s_udp_esocket_new(dap_events_socket_t* a_esocket, void * a_arg)
{
stream_new_udp(a_esocket);
}
/**
* @brief stream_data_read HTTP data read callback. Read packet and passes that to the channel's callback
* @param sh HTTP client instance
* @brief s_http_client_data_read HTTP data read callback. Read packet and passes that to the channel's callback
* @param a_http_client HTTP client instance
* @param arg Processed number of bytes
*/
static void s_http_client_data_read(dap_http_client_t * sh, void * arg)
static void s_http_client_data_read(dap_http_client_t * a_http_client, void * arg)
{
s_esocket_data_read(sh->esocket,arg);
s_esocket_data_read(a_http_client->esocket,arg);
}
/**
* @brief s_http_client_new
* @param a_http_client
* @param arg
*/
static void s_http_client_new(dap_http_client_t * a_http_client, void * arg)
{
a_http_client->esocket->callbacks.worker_assign_callback = s_esocket_callback_worker_assign;
a_http_client->esocket->callbacks.worker_unassign_callback = s_esocket_callback_worker_unassign;
}
/**
* @brief stream_delete Delete stream and free its resources
* @param sid Stream id
*/
static void s_http_client_delete(dap_http_client_t * sh, void * arg)
static void s_http_client_delete(dap_http_client_t * a_http_client, void * arg)
{
s_esocket_callback_delete(sh->esocket,arg);
s_esocket_callback_delete(a_http_client->esocket,arg);
}
/**
......@@ -790,21 +838,28 @@ static bool s_detect_loose_packet(dap_stream_t * a_stream)
return false;
}
static bool s_keepalive_cb( void )
/**
* @brief s_callback_keepalive
* @param a_arg
* @return
*/
static bool s_callback_keepalive( void * a_arg)
{
dap_stream_t *l_stream, *tmp;
dap_events_socket_uuid_t * l_es_uuid = (dap_events_socket_uuid_t*) a_arg;
dap_worker_t * l_worker = dap_events_get_current_worker(dap_events_get_default());
pthread_mutex_lock( &s_mutex_keepalive_list );
dap_stream_pkt_hdr_t l_pkt = {0};
l_pkt.type = STREAM_PKT_TYPE_KEEPALIVE;
memcpy(l_pkt.sig, c_dap_stream_sig, sizeof(l_pkt.sig));
DL_FOREACH_SAFE( s_stream_keepalive_list, l_stream, tmp ) {
dap_events_socket_t * l_input = l_worker->queue_es_io_input [l_stream->stream_worker->worker->id];
dap_events_socket_write_inter( l_input, l_stream->esocket_uuid,
&l_pkt, sizeof(l_pkt));
dap_events_socket_t * l_es = dap_worker_esocket_find_uuid(l_worker, *l_es_uuid);
if( l_es){
if(s_debug)
log_it(L_DEBUG,"Keepalive for sock fd %d uuid 0x%016llu", l_es->socket, *l_es_uuid);
dap_stream_pkt_hdr_t l_pkt = {0};
l_pkt.type = STREAM_PKT_TYPE_KEEPALIVE;
memcpy(l_pkt.sig, c_dap_stream_sig, sizeof(l_pkt.sig));
dap_events_socket_write_unsafe( l_es, &l_pkt, sizeof(l_pkt));
return true;
}else{
if(s_debug)
log_it(L_INFO,"Keepalive for sock uuid %016llx removed", *l_es_uuid);
DAP_DELETE(l_es_uuid);
return false; // Socket is removed from worker
}
pthread_mutex_unlock( &s_mutex_keepalive_list );
return true;
}