diff --git a/dap-sdk/net/client/dap_client_pvt.c b/dap-sdk/net/client/dap_client_pvt.c index 7556bda3826158ac80c7debe33acdfd757a55dd5..59a52c01f98d95256170fcc41b950340ddafb383 100644 --- a/dap-sdk/net/client/dap_client_pvt.c +++ b/dap-sdk/net/client/dap_client_pvt.c @@ -487,7 +487,7 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) dap_events_socket_assign_on_worker_mt(a_client_pvt->stream_es, l_worker); a_client_pvt->stream_es->_inheritor = a_client_pvt;//->client; - a_client_pvt->stream = dap_stream_new_es(a_client_pvt->stream_es); + a_client_pvt->stream = dap_stream_new_es_client(a_client_pvt->stream_es); a_client_pvt->stream->is_client_to_uplink = true; a_client_pvt->stream->session = dap_stream_session_pure_new(); // may be from in packet? diff --git a/dap-sdk/net/stream/ch/dap_stream_ch.c b/dap-sdk/net/stream/ch/dap_stream_ch.c index c507880b314aeeb1e0c6abc4147ef0885f78d884..85c0649402023aed8990fb462c9c72ff7aaa7a8c 100644 --- a/dap-sdk/net/stream/ch/dap_stream_ch.c +++ b/dap-sdk/net/stream/ch/dap_stream_ch.c @@ -106,10 +106,8 @@ dap_stream_ch_t* dap_stream_ch_new(dap_stream_t* a_stream, uint8_t id) if(l_ch_new->proc->new_callback) l_ch_new->proc->new_callback(l_ch_new,NULL); - pthread_rwlock_wrlock(&a_stream->rwlock); a_stream->channel[l_ch_new->stream->channel_count] = l_ch_new; a_stream->channel_count++; - pthread_rwlock_unlock(&a_stream->rwlock); struct dap_stream_ch_table_t *l_new_ch = DAP_NEW_Z(struct dap_stream_ch_table_t); l_new_ch->ch = l_ch_new; diff --git a/dap-sdk/net/stream/stream/dap_stream.c b/dap-sdk/net/stream/stream/dap_stream.c index f14bff6ce9bf23e54d3a88593a20365cb27b3403..360574bcad55b1a74344d607731874b9dbd66eef 100644 --- a/dap-sdk/net/stream/stream/dap_stream.c +++ b/dap-sdk/net/stream/stream/dap_stream.c @@ -26,6 +26,8 @@ #include <stdint.h> #include <unistd.h> +#include <mqueue.h> + #ifdef _WIN32 #include <winsock2.h> #include <windows.h> @@ -54,23 +56,23 @@ #define LOG_TAG "dap_stream" #define HEADER_WITH_SIZE_FIELD 12 //This count of bytes enough for allocate memory for stream packet -void stream_proc_pkt_in(dap_stream_t * sid); +static void s_stream_proc_pkt_in(dap_stream_t * a_stream); // Callbacks for HTTP client -void stream_headers_read(dap_http_client_t * sh, void * arg); // Prepare stream when all headers are read +static void s_stream_headers_read(dap_http_client_t * a_http_client, void * a_arg); // Prepare stream when all headers are read -void s_http_client_headers_write(dap_http_client_t * sh, void * arg); // Output headers -void s_http_client_data_write(dap_http_client_t * sh, void * arg); // Write the data -void stream_data_read(dap_http_client_t * sh, void * arg); // Read the data +static void s_http_client_headers_write(dap_http_client_t * a_http_client, void * a_arg); // Output headers +static void s_http_client_data_write(dap_http_client_t * a_http_client, void * a_arg); // Write the data +static void s_stream_data_read(dap_http_client_t * a_http_client, void * a_arg); // Read the data -void s_http_client_data_read(dap_events_socket_t* sh, void * arg); -void stream_dap_data_write(dap_events_socket_t* sh, void * arg); -void s_es_callback_delete(dap_events_socket_t* sh, void * arg); -void stream_dap_udp_new(dap_events_socket_t* sh,void * arg); +static void s_http_client_data_read(dap_events_socket_t* a_esocket, void * a_arg); +static void s_stream_dap_data_write(dap_events_socket_t* a_esocket, void * a_arg); +static void s_es_callback_delete(dap_events_socket_t* a_esocket, void * a_arg); +static void s_stream_dap_udp_new(dap_events_socket_t* a_esocket,void * a_arg); // Internal functions -dap_stream_t * stream_new(dap_http_client_t * a_sh); // Create new stream -void stream_delete(dap_http_client_t * sh, void * arg); +static dap_stream_t * s_stream_new(dap_http_client_t * a_http_client); // Create new stream +static void s_stream_delete(dap_http_client_t * a_esocket, void * a_arg); //struct ev_loop *keepalive_loop; pthread_t keepalive_thread; @@ -78,8 +80,7 @@ pthread_t keepalive_thread; static dap_stream_t *s_stream_keepalive_list = NULL; static pthread_mutex_t s_mutex_keepalive_list; -static void start_keepalive( dap_stream_t *sid ); -static void keepalive_cb( void ); +static void s_keepalive_cb( void ); static bool s_keep_alive_loop_quit_signal = false; static bool s_dump_packet_headers = false; @@ -88,28 +89,6 @@ bool dap_stream_get_dump_packet_headers(){ return s_dump_packet_headers; } static struct timespec keepalive_loop_sleep = { 0, STREAM_KEEPALIVE_TIMEOUT * 1000 * 1000 }; -// Start keepalive stream -static void *stream_loop( void *arg ) -{ - UNUSED(arg); -// keepalive_loop = ev_loop_new(0); -// ev_loop(keepalive_loop, 0); - do { - - #ifndef _WIN32 - //nanosleep( &keepalive_loop_sleep, NULL ); - sleep( STREAM_KEEPALIVE_TIMEOUT ); - #else - Sleep( STREAM_KEEPALIVE_TIMEOUT * 1000 ); - #endif - - keepalive_cb( ); - - } while ( !s_keep_alive_loop_quit_signal ); - - return NULL; -} - /** * @brief stream_init Init stream module * @return 0 if ok others if not @@ -153,72 +132,72 @@ void dap_stream_deinit() * @param sh HTTP server instance * @param url URL */ -void dap_stream_add_proc_http(struct dap_http * sh, const char * url) +void dap_stream_add_proc_http(struct dap_http * a_http, const char * a_url) { - dap_http_add_proc(sh,url,NULL,NULL,stream_delete,stream_headers_read,s_http_client_headers_write,stream_data_read,s_http_client_data_write,NULL); + dap_http_add_proc(a_http,a_url,NULL,NULL,s_stream_delete,s_stream_headers_read,s_http_client_headers_write,s_stream_data_read,s_http_client_data_write,NULL); } /** * @brief stream_add_proc_udp Add processor callback for streaming - * @param sh UDP server instance + * @param a_udp_server UDP server instance */ -void dap_stream_add_proc_udp(dap_udp_server_t * sh) +void dap_stream_add_proc_udp(dap_udp_server_t * a_udp_server) { - dap_server_t* server = sh->dap_server; - server->client_callbacks.read_callback = s_http_client_data_read; - server->client_callbacks.write_callback = stream_dap_data_write; - server->client_callbacks.delete_callback = s_es_callback_delete; - server->client_callbacks.new_callback = stream_dap_udp_new; + dap_server_t* l_server = a_udp_server->dap_server; + l_server->client_callbacks.read_callback = s_http_client_data_read; + l_server->client_callbacks.write_callback = s_stream_dap_data_write; + l_server->client_callbacks.delete_callback = s_es_callback_delete; + l_server->client_callbacks.new_callback = s_stream_dap_udp_new; } /** * @brief stream_states_update - * @param sid stream instance + * @param a_stream stream instance */ -void stream_states_update(struct dap_stream *sid) +void stream_states_update(struct dap_stream *a_stream) { - if(sid->conn_http) - sid->conn_http->state_write=DAP_HTTP_CLIENT_STATE_START; + if(a_stream->conn_http) + a_stream->conn_http->state_write=DAP_HTTP_CLIENT_STATE_START; size_t i; bool ready_to_write=false; - for(i=0;i<sid->channel_count; i++) - ready_to_write|=sid->channel[i]->ready_to_write; - dap_events_socket_set_writable_unsafe(sid->esocket,ready_to_write); - if(sid->conn_http) - sid->conn_http->out_content_ready=true; + for(i=0;i<a_stream->channel_count; i++) + ready_to_write|=a_stream->channel[i]->ready_to_write; + dap_events_socket_set_writable_unsafe(a_stream->esocket,ready_to_write); + if(a_stream->conn_http) + a_stream->conn_http->out_content_ready=true; } /** * @brief stream_header_read Read headers callback for HTTP - * @param cl_ht HTTP client structure - * @param arg Not used + * @param a_http_client HTTP client structure + * @param a_arg Not used */ -void stream_headers_read(dap_http_client_t * cl_ht, void * arg) +void s_stream_headers_read(dap_http_client_t * a_http_client, void * a_arg) { - (void) arg; + (void) a_arg; // char * raw=0; // int raw_size; unsigned int id=0; // log_it(L_DEBUG,"Prepare data stream"); - if(cl_ht->in_query_string[0]){ - log_it(L_INFO,"Query string [%s]",cl_ht->in_query_string); + if(a_http_client->in_query_string[0]){ + log_it(L_INFO,"Query string [%s]",a_http_client->in_query_string); // if(sscanf(cl_ht->in_query_string,"fj913htmdgaq-d9hf=%u",&id)==1){ - if(sscanf(cl_ht->in_query_string,"session_id=%u",&id) == 1 || - sscanf(cl_ht->in_query_string,"fj913htmdgaq-d9hf=%u",&id) == 1) { + if(sscanf(a_http_client->in_query_string,"session_id=%u",&id) == 1 || + sscanf(a_http_client->in_query_string,"fj913htmdgaq-d9hf=%u",&id) == 1) { dap_stream_session_t * ss=NULL; ss=dap_stream_session_id_mt(id); if(ss==NULL){ log_it(L_ERROR,"No session id %u was found",id); - cl_ht->reply_status_code=404; - strcpy(cl_ht->reply_reason_phrase,"Not found"); + a_http_client->reply_status_code=404; + strcpy(a_http_client->reply_reason_phrase,"Not found"); }else{ log_it(L_INFO,"Session id %u was found with channels = %s",id,ss->active_channels); if(dap_stream_session_open(ss)==0){ // Create new stream - dap_stream_t * sid = stream_new(cl_ht); + dap_stream_t * sid = s_stream_new(a_http_client); sid->session=ss; - dap_http_header_t *header = dap_http_header_find(cl_ht->in_headers, "Service-Key"); + dap_http_header_t *header = dap_http_header_find(a_http_client->in_headers, "Service-Key"); if (header) ss->service_key = strdup(header->value); size_t count_channels = strlen(ss->active_channels); @@ -227,16 +206,16 @@ void stream_headers_read(dap_http_client_t * cl_ht, void * arg) //sid->channel[i]->ready_to_write = true; } - cl_ht->reply_status_code=200; - strcpy(cl_ht->reply_reason_phrase,"OK"); - cl_ht->state_read=DAP_HTTP_CLIENT_STATE_DATA; - dap_events_socket_set_readable_unsafe(cl_ht->esocket,true); + a_http_client->reply_status_code=200; + strcpy(a_http_client->reply_reason_phrase,"OK"); + a_http_client->state_read=DAP_HTTP_CLIENT_STATE_DATA; + dap_events_socket_set_readable_unsafe(a_http_client->esocket,true); stream_states_update(sid); }else{ log_it(L_ERROR,"Can't open session id %u",id); - cl_ht->reply_status_code=404; - strcpy(cl_ht->reply_reason_phrase,"Not found"); + a_http_client->reply_status_code=404; + strcpy(a_http_client->reply_reason_phrase,"Not found"); } } } @@ -267,7 +246,7 @@ dap_stream_t * stream_new_udp(dap_events_socket_t * a_esocket) * @param id session id * @param cl DAP client structure */ -void check_session( unsigned int a_id, dap_events_socket_t *a_client_remote ) +void check_session( unsigned int a_id, dap_events_socket_t *a_esocket ) { dap_stream_session_t *l_session = NULL; @@ -288,10 +267,10 @@ void check_session( unsigned int a_id, dap_events_socket_t *a_client_remote ) dap_stream_t *l_stream; - if ( DAP_STREAM(a_client_remote) == NULL ) - l_stream = stream_new_udp( a_client_remote ); + if ( DAP_STREAM(a_esocket) == NULL ) + l_stream = stream_new_udp( a_esocket ); else - l_stream = DAP_STREAM( a_client_remote ); + l_stream = DAP_STREAM( a_esocket ); l_stream->session = l_session; @@ -300,30 +279,28 @@ void check_session( unsigned int a_id, dap_events_socket_t *a_client_remote ) log_it( L_INFO, "Opened stream session technical and data channels" ); - size_t count_channels = strlen(l_session->active_channels); + //size_t count_channels = strlen(l_session->active_channels); for (size_t i =0; i<sizeof (l_session->active_channels); i++ ) if ( l_session->active_channels[i]) dap_stream_ch_new( l_stream, l_session->active_channels[i] ); stream_states_update( l_stream ); - dap_events_socket_set_readable_unsafe( a_client_remote, true ); + dap_events_socket_set_readable_unsafe( a_esocket, true ); - start_keepalive( l_stream ); } /** * @brief stream_new Create new stream instance for HTTP client * @return New stream_t instance */ -dap_stream_t * stream_new(dap_http_client_t * a_sh) +dap_stream_t * s_stream_new(dap_http_client_t * a_http_client) { dap_stream_t * ret= DAP_NEW_Z(dap_stream_t); - pthread_rwlock_init( &ret->rwlock, NULL); - ret->esocket = a_sh->esocket; - ret->stream_worker = (dap_stream_worker_t*) a_sh->esocket->worker->_inheritor; - ret->conn_http=a_sh; + ret->esocket = a_http_client->esocket; + ret->stream_worker = (dap_stream_worker_t*) a_http_client->esocket->worker->_inheritor; + ret->conn_http=a_http_client; ret->buf_defrag_size = 0; ret->seq_id = 0; ret->client_last_seq_id_packet = (size_t)-1; @@ -355,13 +332,10 @@ void dap_stream_delete(dap_stream_t *a_stream) dap_stream_ch_delete(a_stream->channel[a_stream->channel_count - 1]); } - pthread_rwlock_wrlock(&a_stream->rwlock); if(a_stream->session) dap_stream_session_close_mt(a_stream->session->id); // TODO make stream close after timeout, not momentaly a_stream->session = NULL; a_stream->esocket = NULL; - pthread_rwlock_unlock(&a_stream->rwlock); - pthread_rwlock_destroy(&a_stream->rwlock); DAP_DELETE(a_stream); log_it(L_NOTICE,"Stream connection is over"); } @@ -371,13 +345,14 @@ void dap_stream_delete(dap_stream_t *a_stream) * @param sh DAP client instance * @param arg Not used */ -void s_es_callback_delete(dap_events_socket_t* sh, void * arg) +static void s_es_callback_delete(dap_events_socket_t* a_esocket, void * a_arg) { - UNUSED(arg); - if (!sh) + UNUSED(a_arg); + if (!a_esocket) return; - dap_stream_t *l_stream = DAP_STREAM(sh); + dap_stream_t *l_stream = DAP_STREAM(a_esocket); dap_stream_delete(l_stream); + a_esocket->_inheritor = NULL; // To prevent double free } @@ -386,14 +361,13 @@ void s_es_callback_delete(dap_events_socket_t* sh, void * arg) * @param a_es * @return */ -dap_stream_t* dap_stream_new_es(dap_events_socket_t * a_es) +dap_stream_t* dap_stream_new_es_client(dap_events_socket_t * a_esocket) { dap_stream_t * ret= DAP_NEW_Z(dap_stream_t); - pthread_rwlock_init( &ret->rwlock, NULL); - ret->esocket = a_es; + ret->esocket = a_esocket; ret->buf_defrag_size=0; ret->is_client_to_uplink = true; - log_it(L_NOTICE,"New stream with events socket instance for %s",a_es->hostaddr); + log_it(L_NOTICE,"New stream with events socket instance for %s",a_esocket->hostaddr); return ret; } @@ -402,22 +376,22 @@ dap_stream_t* dap_stream_new_es(dap_events_socket_t * a_es) * @param sh HTTP client instance * @param arg Not used */ -void s_http_client_headers_write(dap_http_client_t * sh, void *arg) +static void s_http_client_headers_write(dap_http_client_t * a_http_client, void *a_arg) { - (void) arg; + (void) a_arg; - if(sh->reply_status_code==200){ - dap_stream_t *sid=DAP_STREAM(sh->esocket); + if(a_http_client->reply_status_code==200){ + dap_stream_t *sid=DAP_STREAM(a_http_client->esocket); - dap_http_out_header_add(sh,"Content-Type","application/octet-stream"); - dap_http_out_header_add(sh,"Connnection","keep-alive"); - dap_http_out_header_add(sh,"Cache-Control","no-cache"); + dap_http_out_header_add(a_http_client,"Content-Type","application/octet-stream"); + dap_http_out_header_add(a_http_client,"Connnection","keep-alive"); + dap_http_out_header_add(a_http_client,"Cache-Control","no-cache"); if(sid->stream_size>0) - dap_http_out_header_add_f(sh,"Content-Length","%u", (unsigned int) sid->stream_size ); + dap_http_out_header_add_f(a_http_client,"Content-Length","%u", (unsigned int) sid->stream_size ); - sh->state_read=DAP_HTTP_CLIENT_STATE_DATA; - dap_events_socket_set_readable_unsafe(sh->esocket,true); + a_http_client->state_read=DAP_HTTP_CLIENT_STATE_DATA; + dap_events_socket_set_readable_unsafe(a_http_client->esocket,true); } } @@ -440,7 +414,7 @@ static void keepalive_cb (EV_P_ ev_timer *w, int revents) } **/ -static void keepalive_cb( void ) +static void s_keepalive_cb( void ) { dap_stream_t *l_stream, *tmp; return; @@ -461,54 +435,37 @@ static void keepalive_cb( void ) } - - -/** - * @brief start_keepalive Start keepalive signals exchange for stream - * @param sid Stream instance - */ -void start_keepalive( dap_stream_t *sid ) { - return; -// keepalive_loop = EV_DEFAULT; -// sid->keepalive_watcher.data = sid; -// ev_timer_init (&sid->keepalive_watcher, keepalive_cb, STREAM_KEEPALIVE_TIMEOUT, STREAM_KEEPALIVE_TIMEOUT); -// ev_timer_start (keepalive_loop, &sid->keepalive_watcher); - pthread_mutex_lock( &s_mutex_keepalive_list ); - DL_APPEND( s_stream_keepalive_list, sid ); - pthread_mutex_unlock( &s_mutex_keepalive_list ); -} - /** * @brief stream_data_write HTTP data write callback - * @param sh HTTP client instance - * @param arg Not used + * @param a_http_client HTTP client instance + * @param a_arg Not used */ -void s_http_client_data_write(dap_http_client_t * sh, void * arg) +static void s_http_client_data_write(dap_http_client_t * a_http_client, void * a_arg) { - (void) arg; + (void) a_arg; - if(sh->reply_status_code==200){ - stream_dap_data_write(sh->esocket,arg); + if( a_http_client->reply_status_code == 200 ){ + s_stream_dap_data_write(a_http_client->esocket, a_arg); }else{ - log_it(L_WARNING, "Wrong request, reply status code is %u",sh->reply_status_code); + log_it(L_WARNING, "Wrong request, reply status code is %u",a_http_client->reply_status_code); } } /** * @brief s_data_read - * @param sh - * @param arg + * @param a_client + * @param a_arg */ -void s_http_client_data_read(dap_events_socket_t* a_client, void * arg) +static void s_http_client_data_read(dap_events_socket_t* a_client, void * a_arg) { dap_stream_t * l_stream =DAP_STREAM(a_client); - int * ret = (int *) arg; + int * l_ret = (int *) a_arg; if (s_dump_packet_headers ) { log_it(L_DEBUG,"dap_stream_data_read: ready_to_write=%s, client->buf_in_size=%u" , (a_client->flags & DAP_SOCK_READY_TO_WRITE)?"true":"false", a_client->buf_in_size ); } - *ret = dap_stream_data_proc_read( l_stream); + *l_ret = dap_stream_data_proc_read( l_stream); } /** @@ -571,7 +528,7 @@ size_t dap_stream_data_proc_read (dap_stream_t *a_stream) a_stream->pkt_buf_in = NULL; } else{ - stream_proc_pkt_in(a_stream); + s_stream_proc_pkt_in(a_stream); } } proc_data = (uint8_t *)(buf_in + buf_in_size - bytes_left_to_read);//proc_data=(a_stream->conn->buf_in + a_stream->conn->buf_in_size - bytes_left_to_read); @@ -642,7 +599,7 @@ size_t dap_stream_data_proc_read (dap_stream_t *a_stream) if(a_stream->pkt_buf_in_data_size==(pkt->hdr.size + sizeof(stream_pkt_hdr_t))){ // log_it(INFO,"All the packet is present in da buffer (hdr.size=%u read_bytes_to=%u buf_in_size=%u)" // ,sid->pkt_buf_in->hdr.size,read_bytes_to,sid->conn->buf_in_size); - stream_proc_pkt_in(a_stream); + s_stream_proc_pkt_in(a_stream); }else if(a_stream->pkt_buf_in_data_size>pkt->hdr.size + sizeof(stream_pkt_hdr_t)){ //log_it(L_WARNING,"Input: packet buffer has %u bytes more than we need, they're lost",a_stream->pkt_buf_in_data_size-pkt->hdr.size); }else{ @@ -679,9 +636,9 @@ size_t dap_stream_data_proc_read (dap_stream_t *a_stream) * @param sh DAP client instance * @param arg Not used */ -void stream_dap_data_write(dap_events_socket_t* a_client , void * arg){ +static void s_stream_dap_data_write(dap_events_socket_t* a_client , void * a_arg){ + (void) a_arg; size_t i; - (void) arg; bool ready_to_write=false; //log_it(L_DEBUG,"Process channels data output (%u channels)", DAP_STREAM(a_client )->channel_count ); @@ -706,35 +663,35 @@ void stream_dap_data_write(dap_events_socket_t* a_client , void * arg){ * @param sh DAP client instance * @param arg Not used */ -void stream_dap_udp_new(dap_events_socket_t* sh, void * arg){ -// dap_stream_t *sid = stream_new_udp(sh); - stream_new_udp(sh); +static void s_stream_dap_udp_new(dap_events_socket_t* a_esocket, void * a_arg){ + stream_new_udp(a_esocket); } - -static bool _detect_loose_packet(dap_stream_t * a_stream) +/** + * @brief _detect_loose_packet + * @param a_stream + * @return + */ +static bool s_detect_loose_packet(dap_stream_t * a_stream) { - dap_stream_ch_pkt_t * ch_pkt = (dap_stream_ch_pkt_t *) a_stream->pkt_cache; - - pthread_rwlock_wrlock (&a_stream->rwlock); + dap_stream_ch_pkt_t * l_ch_pkt = (dap_stream_ch_pkt_t *) a_stream->pkt_cache; - int count_loosed_packets = ch_pkt->hdr.seq_id - (a_stream->client_last_seq_id_packet + 1); - if(count_loosed_packets > 0) + int l_count_loosed_packets = l_ch_pkt->hdr.seq_id - (a_stream->client_last_seq_id_packet + 1); + if(l_count_loosed_packets > 0) { log_it(L_WARNING, "Detected loosed %d packets. " - "Last read seq_id packet: %d Current: %d", count_loosed_packets, - a_stream->client_last_seq_id_packet, ch_pkt->hdr.seq_id); - } else if(count_loosed_packets < 0) { - if(a_stream->client_last_seq_id_packet != 0 && ch_pkt->hdr.seq_id != 0) { + "Last read seq_id packet: %d Current: %d", l_count_loosed_packets, + a_stream->client_last_seq_id_packet, l_ch_pkt->hdr.seq_id); + } else if(l_count_loosed_packets < 0) { + if(a_stream->client_last_seq_id_packet != 0 && l_ch_pkt->hdr.seq_id != 0) { log_it(L_WARNING, "Something wrong. count_loosed packets %d can't less than zero. " - "Last read seq_id packet: %d Current: %d", count_loosed_packets, - a_stream->client_last_seq_id_packet, ch_pkt->hdr.seq_id); + "Last read seq_id packet: %d Current: %d", l_count_loosed_packets, + a_stream->client_last_seq_id_packet, l_ch_pkt->hdr.seq_id); } // else client don't support seqid functionality } // log_it(L_DEBUG, "Packet seq id: %d", ch_pkt->hdr.seq_id); // log_it(L_DEBUG, "Last seq id: %d", sid->last_seq_id_packet); - a_stream->client_last_seq_id_packet = ch_pkt->hdr.seq_id; - pthread_rwlock_unlock (&a_stream->rwlock); + a_stream->client_last_seq_id_packet = l_ch_pkt->hdr.seq_id; return false; } @@ -744,15 +701,13 @@ static bool _detect_loose_packet(dap_stream_t * a_stream) * @brief stream_proc_pkt_in * @param sid */ -void stream_proc_pkt_in(dap_stream_t * a_stream) +static void s_stream_proc_pkt_in(dap_stream_t * a_stream) { - pthread_rwlock_wrlock( &a_stream->rwlock ); dap_stream_pkt_t * l_pkt = a_stream->pkt_buf_in; size_t l_pkt_size = a_stream->pkt_buf_in_data_size; a_stream->pkt_buf_in=NULL; a_stream->pkt_buf_in_data_size=0; a_stream->keepalive_passed = 0; - pthread_rwlock_unlock (&a_stream->rwlock); if(l_pkt->hdr.type == STREAM_PKT_TYPE_DATA_PACKET) { @@ -764,29 +719,27 @@ void stream_proc_pkt_in(dap_stream_t * a_stream) return; } - _detect_loose_packet(a_stream); + s_detect_loose_packet(a_stream); // Find channel - dap_stream_ch_t * ch = NULL; - pthread_rwlock_wrlock (&a_stream->rwlock); + dap_stream_ch_t * l_ch = NULL; for(size_t i=0;i<a_stream->channel_count;i++){ if(a_stream->channel[i]->proc){ if(a_stream->channel[i]->proc->id == l_ch_pkt->hdr.id ){ - ch=a_stream->channel[i]; + l_ch=a_stream->channel[i]; } } } - pthread_rwlock_unlock (&a_stream->rwlock); - if(ch){ - ch->stat.bytes_read+=l_ch_pkt->hdr.size; - if(ch->proc) - if(ch->proc->packet_in_callback){ + if(l_ch){ + l_ch->stat.bytes_read+=l_ch_pkt->hdr.size; + if(l_ch->proc) + if(l_ch->proc->packet_in_callback){ if ( s_dump_packet_headers ){ log_it(L_INFO,"Income channel packet: id='%c' size=%u type=0x%02Xu seq_id=0x%016X enc_type=0x%02X",(char) l_ch_pkt->hdr.id, l_ch_pkt->hdr.size, l_ch_pkt->hdr.type, l_ch_pkt->hdr.seq_id , l_ch_pkt->hdr.enc_type); } - ch->proc->packet_in_callback(ch,l_ch_pkt); + l_ch->proc->packet_in_callback(l_ch,l_ch_pkt); } } else if(l_ch_pkt->hdr.id == TECHICAL_CHANNEL_ID && l_ch_pkt->hdr.type == STREAM_CH_PKT_TYPE_KEEPALIVE){ @@ -804,9 +757,6 @@ void stream_proc_pkt_in(dap_stream_t * a_stream) log_it(L_WARNING, "Unknown header type"); } - -// ev_timer_again (keepalive_loop, &a_stream->keepalive_watcher); - start_keepalive( a_stream ); DAP_DELETE(l_pkt); } @@ -815,7 +765,7 @@ void stream_proc_pkt_in(dap_stream_t * a_stream) * @param sh HTTP client instance * @param arg Processed number of bytes */ -void stream_data_read(dap_http_client_t * sh, void * arg) +static void s_stream_data_read(dap_http_client_t * sh, void * arg) { s_http_client_data_read(sh->esocket,arg); } @@ -826,7 +776,7 @@ void stream_data_read(dap_http_client_t * sh, void * arg) * @brief stream_delete Delete stream and free its resources * @param sid Stream id */ -void stream_delete(dap_http_client_t * sh, void * arg) +static void s_stream_delete(dap_http_client_t * sh, void * arg) { s_es_callback_delete(sh->esocket,arg); } diff --git a/dap-sdk/net/stream/stream/include/dap_stream.h b/dap-sdk/net/stream/stream/include/dap_stream.h index 3130f61dfb0add3d018a2855353fd3c1a7644fd5..888be346b5ae1b3b43b0d3aea5bc9004bfc999e1 100644 --- a/dap-sdk/net/stream/stream/include/dap_stream.h +++ b/dap-sdk/net/stream/stream/include/dap_stream.h @@ -52,7 +52,6 @@ typedef void (*dap_stream_callback)( dap_stream_t *,void*); typedef struct dap_stream { int id; - pthread_rwlock_t rwlock; dap_stream_session_t * session; dap_events_socket_t * esocket; // Connection dap_stream_worker_t * stream_worker; @@ -102,7 +101,7 @@ void dap_stream_add_proc_http(dap_http_t * sh, const char * url); void dap_stream_add_proc_udp(dap_udp_server_t * sh); -dap_stream_t* dap_stream_new_es(dap_events_socket_t * a_es); +dap_stream_t* dap_stream_new_es_client(dap_events_socket_t * a_es); size_t dap_stream_data_proc_read(dap_stream_t * a_stream); size_t dap_stream_data_proc_write(dap_stream_t * a_stream); void dap_stream_delete(dap_stream_t * a_stream);