diff --git a/CMakeLists.txt b/CMakeLists.txt index c3044cacfdc74695f2cb4c3ccd149d2af8c4842f..53bf8aa17758171b4f2eee577719214d6ac404ee 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,7 +2,7 @@ project(cellframe-sdk C) cmake_minimum_required(VERSION 3.0) set(CMAKE_C_STANDARD 11) -set(CELLFRAME_SDK_NATIVE_VERSION "2.9-22") +set(CELLFRAME_SDK_NATIVE_VERSION "2.9-23") add_definitions ("-DCELLFRAME_SDK_VERSION=\"${CELLFRAME_SDK_NATIVE_VERSION}\"") set(DAPSDK_MODULES "") message("Cellframe modules: ${CELLFRAME_MODULES}") diff --git a/dap-sdk/net/stream/stream/dap_stream.c b/dap-sdk/net/stream/stream/dap_stream.c index cfbfde377ae8491b88b353c599b42e976fe3fefe..bea02c2aef9d63138e8bd41459898b3ec77ad4c8 100644 --- a/dap-sdk/net/stream/stream/dap_stream.c +++ b/dap-sdk/net/stream/stream/dap_stream.c @@ -64,6 +64,9 @@ static void s_http_client_headers_write(dap_http_client_t * a_http_client, void static void s_http_client_data_write(dap_http_client_t * a_http_client, void * a_arg); // Write the data static void s_http_client_data_read(dap_http_client_t * a_http_client, void * a_arg); // Read the data +static void s_esocket_callback_worker_assign(dap_events_socket_t * a_esocket, dap_worker_t * a_worker); +static void s_esocket_callback_worker_unassign(dap_events_socket_t * a_esocket, dap_worker_t * a_worker); + static void s_esocket_data_read(dap_events_socket_t* a_esocket, void * a_arg); static void s_esocket_write(dap_events_socket_t* a_esocket, void * a_arg); static void s_esocket_callback_delete(dap_events_socket_t* a_esocket, void * a_arg); @@ -71,13 +74,13 @@ static void s_udp_esocket_new(dap_events_socket_t* a_esocket,void * a_arg); // Internal functions static dap_stream_t * s_stream_new(dap_http_client_t * a_http_client); // Create new stream +static void s_http_client_new(dap_http_client_t * a_esocket, void * a_arg); static void s_http_client_delete(dap_http_client_t * a_esocket, void * a_arg); -static dap_stream_t *s_stream_keepalive_list = NULL; -static pthread_mutex_t s_mutex_keepalive_list; -static bool s_keepalive_cb( void ); +static bool s_callback_keepalive( void * a_arg); static bool s_dump_packet_headers = false; +static bool s_debug = false; bool dap_stream_get_dump_packet_headers(){ return s_dump_packet_headers; } @@ -117,8 +120,7 @@ int dap_stream_init(dap_config_t * a_config) s_dap_stream_load_preferred_encryption_type(a_config); s_dump_packet_headers = dap_config_get_item_bool_default(g_config,"general","debug_dump_stream_headers",false); - pthread_mutex_init( &s_mutex_keepalive_list, NULL ); - dap_timerfd_start(STREAM_KEEPALIVE_TIMEOUT * 1000, (dap_timerfd_callback_t)s_keepalive_cb, NULL); + s_debug = dap_config_get_item_bool_default(g_config,"stream","debug",false); log_it(L_NOTICE,"Init streaming module"); return 0; @@ -129,7 +131,6 @@ int dap_stream_init(dap_config_t * a_config) */ void dap_stream_deinit() { - pthread_mutex_destroy( &s_mutex_keepalive_list ); dap_stream_ch_deinit( ); } @@ -142,7 +143,7 @@ void dap_stream_add_proc_http(struct dap_http * a_http, const char * a_url) { dap_http_add_proc(a_http,a_url ,NULL, // _internal - NULL, // New + s_http_client_new, // New s_http_client_delete, // Delete s_http_client_headers_read, // Headers read s_http_client_headers_write, // Headerts write @@ -161,6 +162,9 @@ void dap_stream_add_proc_udp(dap_server_t *a_udp_server) a_udp_server->client_callbacks.write_callback = s_esocket_write; a_udp_server->client_callbacks.delete_callback = s_esocket_callback_delete; a_udp_server->client_callbacks.new_callback = s_udp_esocket_new; + a_udp_server->client_callbacks.worker_assign_callback = s_esocket_callback_worker_assign; + a_udp_server->client_callbacks.worker_unassign_callback = s_esocket_callback_worker_unassign; + } /** @@ -280,11 +284,6 @@ void dap_stream_delete(dap_stream_t *a_stream) log_it(L_ERROR,"stream delete NULL instance"); return; } - //if (a_stream->prev) { - pthread_mutex_lock(&s_mutex_keepalive_list); - DL_DELETE(s_stream_keepalive_list, a_stream); - pthread_mutex_unlock(&s_mutex_keepalive_list); - //} while (a_stream->channel_count) { dap_stream_ch_delete(a_stream->channel[a_stream->channel_count - 1]); @@ -300,14 +299,14 @@ void dap_stream_delete(dap_stream_t *a_stream) /** * @brief stream_dap_delete Delete callback for UDP client - * @param sh DAP client instance + * @param a_esocket DAP client instance * @param arg Not used */ static void s_esocket_callback_delete(dap_events_socket_t* a_esocket, void * a_arg) { UNUSED(a_arg); - if (!a_esocket) - return; + assert (a_esocket); + dap_http_client_t *l_http_client = DAP_HTTP_CLIENT(a_esocket); dap_stream_t *l_stream = DAP_STREAM(l_http_client); l_http_client->_inheritor = NULL; // To prevent double free @@ -327,9 +326,6 @@ dap_stream_t* dap_stream_new_es_client(dap_events_socket_t * a_esocket) ret->esocket_uuid = a_esocket->uuid; ret->buf_defrag_size=0; ret->is_client_to_uplink = true; - pthread_mutex_lock(&s_mutex_keepalive_list); - DL_APPEND(s_stream_keepalive_list, ret); - pthread_mutex_unlock(&s_mutex_keepalive_list); return ret; } @@ -404,17 +400,22 @@ static void s_http_client_headers_write(dap_http_client_t * a_http_client, void (void) a_arg; //log_it(L_DEBUG,"s_http_client_headers_write()"); if(a_http_client->reply_status_code==200){ - dap_stream_t *sid=DAP_STREAM(a_http_client); + dap_stream_t *l_stream=DAP_STREAM(a_http_client); dap_http_out_header_add(a_http_client,"Content-Type","application/octet-stream"); - dap_http_out_header_add(a_http_client,"Connnection","keep-alive"); + dap_http_out_header_add(a_http_client,"Connection","keep-alive"); dap_http_out_header_add(a_http_client,"Cache-Control","no-cache"); - if(sid->stream_size>0) - dap_http_out_header_add_f(a_http_client,"Content-Length","%u", (unsigned int) sid->stream_size ); + if(l_stream->stream_size>0) + dap_http_out_header_add_f(a_http_client,"Content-Length","%u", (unsigned int) l_stream->stream_size ); a_http_client->state_read=DAP_HTTP_CLIENT_STATE_DATA; dap_events_socket_set_readable_unsafe(a_http_client->esocket,true); + // Connection is established, setting up keepalive timer + dap_events_socket_uuid_t * l_es_uuid= DAP_NEW_Z(dap_events_socket_uuid_t); + *l_es_uuid = a_http_client->esocket->uuid; + dap_timerfd_start(STREAM_KEEPALIVE_TIMEOUT * 1000, (dap_timerfd_callback_t)s_callback_keepalive, l_es_uuid); + } } @@ -434,6 +435,42 @@ static void s_http_client_data_write(dap_http_client_t * a_http_client, void * a } } +/** + * @brief s_esocket_callback_worker_assign + * @param a_esocket + * @param a_worker + */ +static void s_esocket_callback_worker_assign(dap_events_socket_t * a_esocket, dap_worker_t * a_worker) +{ + if(a_esocket->type == DESCRIPTOR_TYPE_SOCKET_UDP){ + dap_events_socket_uuid_t * l_es_uuid= DAP_NEW_Z(dap_events_socket_uuid_t); + *l_es_uuid = a_esocket->uuid; + dap_timerfd_start(STREAM_KEEPALIVE_TIMEOUT * 1000, (dap_timerfd_callback_t)s_callback_keepalive, l_es_uuid); + }else { + dap_http_client_t *l_http_client = DAP_HTTP_CLIENT(a_esocket); + assert(l_http_client); + dap_stream_t * l_stream =DAP_STREAM(l_http_client); + assert(l_stream); + // If we were reassigned after connection was bringed up + if(l_http_client->state_read == DAP_HTTP_CLIENT_STATE_DATA && l_http_client->state_write == DAP_HTTP_CLIENT_STATE_DATA ){ + dap_events_socket_uuid_t * l_es_uuid= DAP_NEW_Z(dap_events_socket_uuid_t); + *l_es_uuid = a_esocket->uuid; + dap_timerfd_start(STREAM_KEEPALIVE_TIMEOUT * 1000, (dap_timerfd_callback_t)s_callback_keepalive, l_es_uuid); + } + } +} + +/** + * @brief s_esocket_callback_worker_unassign + * @param a_esocket + * @param a_worker + */ +static void s_esocket_callback_worker_unassign(dap_events_socket_t * a_esocket, dap_worker_t * a_worker) +{ + // TODO switch off keepalive packets sending +} + + /** * @brief s_data_read * @param a_client @@ -482,34 +519,45 @@ static void s_esocket_write(dap_events_socket_t* a_esocket , void * a_arg){ } /** - * @brief stream_dap_new New connection callback for UDP client - * @param sh DAP client instance + * @brief s_udp_esocket_new New connection callback for UDP client + * @param a_esocket DAP client instance * @param arg Not used */ -static void s_udp_esocket_new(dap_events_socket_t* a_esocket, void * a_arg){ +static void s_udp_esocket_new(dap_events_socket_t* a_esocket, void * a_arg) +{ stream_new_udp(a_esocket); } /** - * @brief stream_data_read HTTP data read callback. Read packet and passes that to the channel's callback - * @param sh HTTP client instance + * @brief s_http_client_data_read HTTP data read callback. Read packet and passes that to the channel's callback + * @param a_http_client HTTP client instance * @param arg Processed number of bytes */ -static void s_http_client_data_read(dap_http_client_t * sh, void * arg) +static void s_http_client_data_read(dap_http_client_t * a_http_client, void * arg) { - s_esocket_data_read(sh->esocket,arg); + s_esocket_data_read(a_http_client->esocket,arg); } +/** + * @brief s_http_client_new + * @param a_http_client + * @param arg + */ +static void s_http_client_new(dap_http_client_t * a_http_client, void * arg) +{ + a_http_client->esocket->callbacks.worker_assign_callback = s_esocket_callback_worker_assign; + a_http_client->esocket->callbacks.worker_unassign_callback = s_esocket_callback_worker_unassign; +} /** * @brief stream_delete Delete stream and free its resources * @param sid Stream id */ -static void s_http_client_delete(dap_http_client_t * sh, void * arg) +static void s_http_client_delete(dap_http_client_t * a_http_client, void * arg) { - s_esocket_callback_delete(sh->esocket,arg); + s_esocket_callback_delete(a_http_client->esocket,arg); } /** @@ -790,21 +838,28 @@ static bool s_detect_loose_packet(dap_stream_t * a_stream) return false; } - -static bool s_keepalive_cb( void ) +/** + * @brief s_callback_keepalive + * @param a_arg + * @return + */ +static bool s_callback_keepalive( void * a_arg) { - dap_stream_t *l_stream, *tmp; + dap_events_socket_uuid_t * l_es_uuid = (dap_events_socket_uuid_t*) a_arg; dap_worker_t * l_worker = dap_events_get_current_worker(dap_events_get_default()); - pthread_mutex_lock( &s_mutex_keepalive_list ); - dap_stream_pkt_hdr_t l_pkt = {0}; - l_pkt.type = STREAM_PKT_TYPE_KEEPALIVE; - memcpy(l_pkt.sig, c_dap_stream_sig, sizeof(l_pkt.sig)); - DL_FOREACH_SAFE( s_stream_keepalive_list, l_stream, tmp ) { - dap_events_socket_t * l_input = l_worker->queue_es_io_input [l_stream->stream_worker->worker->id]; - dap_events_socket_write_inter( l_input, l_stream->esocket_uuid, - &l_pkt, sizeof(l_pkt)); + dap_events_socket_t * l_es = dap_worker_esocket_find_uuid(l_worker, *l_es_uuid); + if( l_es){ + if(s_debug) + log_it(L_DEBUG,"Keepalive for sock fd %d uuid 0x%016llu", l_es->socket, *l_es_uuid); + dap_stream_pkt_hdr_t l_pkt = {0}; + l_pkt.type = STREAM_PKT_TYPE_KEEPALIVE; + memcpy(l_pkt.sig, c_dap_stream_sig, sizeof(l_pkt.sig)); + dap_events_socket_write_unsafe( l_es, &l_pkt, sizeof(l_pkt)); + return true; + }else{ + if(s_debug) + log_it(L_INFO,"Keepalive for sock uuid %016llx removed", *l_es_uuid); + DAP_DELETE(l_es_uuid); + return false; // Socket is removed from worker } - pthread_mutex_unlock( &s_mutex_keepalive_list ); - return true; } -