diff --git a/stream/dap_stream.c b/stream/dap_stream.c index 5ff1ca8f3b4af11cf2a84c967e42f7440f78df4e..a1380480aa42b6e9d1436e499ca2a2cb541342e5 100644 --- a/stream/dap_stream.c +++ b/stream/dap_stream.c @@ -268,6 +268,7 @@ dap_stream_t * stream_new(dap_http_client_t * sh) ret->conn = sh->client; ret->conn_http=sh; ret->buf_defrag_size = 0; + ret->last_seq_id_packet = (size_t)-1; ret->conn->_internal=ret; @@ -387,137 +388,116 @@ size_t dap_stream_data_proc_read (dap_stream_t *a_stream) bool proc_data_defrag=false; // We are or not in defrag buffer size_t read_bytes_to=0; size_t bytes_left_to_read=a_stream->conn->buf_in_size; + + //log_it(L_DEBUG, "---> Stream data read %u bytes", bytes_left_to_read); + // Process prebuffered packets or glue defragmented data with the current input - if(pkt=a_stream->pkt_buf_in){ // Packet signature detected - if(a_stream->pkt_buf_in_data_size < sizeof(stream_pkt_hdr_t)) - { - //At first read header - dap_stream_pkt_t* check_pkt = dap_stream_pkt_detect( proc_data , sizeof(stream_pkt_hdr_t) - a_stream->pkt_buf_in_data_size); - if(check_pkt){ - // Got duplication of packet header several times - //log_it(L_DEBUG, "Drop incorrect header part"); - a_stream->pkt_buf_in = NULL; - a_stream->pkt_buf_in_data_size=0; - return 0; - } - if(sizeof(stream_pkt_hdr_t) - a_stream->pkt_buf_in_data_size > bytes_left_to_read) - read_bytes_to = bytes_left_to_read; - else - read_bytes_to = sizeof(stream_pkt_hdr_t) - a_stream->pkt_buf_in_data_size; - memcpy((uint8_t*)a_stream->pkt_buf_in+a_stream->pkt_buf_in_data_size,proc_data,read_bytes_to); - bytes_left_to_read-=read_bytes_to; - a_stream->pkt_buf_in_data_size += read_bytes_to; - proc_data += read_bytes_to; - read_bytes_to = 0; - } - if ((pkt->hdr.size + sizeof(stream_pkt_hdr_t) -a_stream->pkt_buf_in_data_size) < bytes_left_to_read ) { // Looks the all packet is present in buffer - read_bytes_to=(a_stream->pkt_buf_in->hdr.size + sizeof(stream_pkt_hdr_t) -a_stream->pkt_buf_in_data_size); - }else{ + if(pkt=a_stream->pkt_buf_in){ // Complete prebuffered packet + if ((pkt->hdr.size-a_stream->pkt_buf_in_data_size) < bytes_left_to_read ) { // Looks the all packet is present in buffer + read_bytes_to=(a_stream->pkt_buf_in->hdr.size-a_stream->pkt_buf_in_data_size); + }else read_bytes_to=bytes_left_to_read; - } - memcpy((uint8_t*)a_stream->pkt_buf_in+a_stream->pkt_buf_in_data_size,proc_data,read_bytes_to); + + memcpy(a_stream->pkt_buf_in->data+a_stream->pkt_buf_in_data_size,proc_data,read_bytes_to); a_stream->pkt_buf_in_data_size+=read_bytes_to; bytes_left_to_read-=read_bytes_to; - //log_it(L_DEBUG, "Prefilled packet buffer on %u bytes", read_bytes_to); + //log_it(DEBUG, "Prefilled packet buffer on %u bytes", read_bytes_to); read_bytes_to=0; - if(a_stream->pkt_buf_in_data_size>=(a_stream->pkt_buf_in->hdr.size + sizeof(stream_pkt_hdr_t)) ){ // If we have all the packet in packet buffer - if(a_stream->pkt_buf_in_data_size > a_stream->pkt_buf_in->hdr.size + sizeof(stream_pkt_hdr_t)){ // If we have little more data then we need for packet buffer - //log_it(L_WARNING,"Prefilled packet buffer has %u bytes more than we need, they're lost",a_stream->pkt_buf_in_data_size-a_stream->pkt_buf_in->hdr.size); - a_stream->pkt_buf_in_data_size = 0; - a_stream->pkt_buf_in = NULL; - } - else{ - stream_proc_pkt_in(a_stream); + if(a_stream->pkt_buf_in_data_size>=(a_stream->pkt_buf_in->hdr.size) ){ // If we have all the packet in packet buffer + if(a_stream->pkt_buf_in_data_size>a_stream->pkt_buf_in->hdr.size){ // If we have little more data then we need for packet buffer + log_it(L_WARNING,"Prefilled packet buffer has %u bytes more than we need, they're lost",a_stream->pkt_buf_in_data_size-a_stream->pkt_buf_in->hdr.size); } + stream_proc_pkt_in(a_stream); } proc_data=(a_stream->conn->buf_in + a_stream->conn->buf_in_size - bytes_left_to_read); - }else if( a_stream->buf_defrag_size>0){ // If smth is present in defrag buffer - we glue everything together in it if( bytes_left_to_read > 0){ // If there is smth to process in input buffer read_bytes_to=bytes_left_to_read; - if( (read_bytes_to + a_stream->buf_defrag_size) > sizeof(a_stream->buf_defrag) ){ - //log_it(L_WARNING,"Defrag buffer is overfilled, drop that" ); + if( (read_bytes_to+ a_stream->buf_defrag_size) > sizeof(a_stream->buf_defrag) ){ + log_it(L_WARNING,"Defrag buffer is overfilled, drop that" ); if(read_bytes_to>sizeof(a_stream->buf_defrag)) read_bytes_to=sizeof(a_stream->buf_defrag); a_stream->buf_defrag_size=0; } - //log_it(L_DEBUG,"Glue together defrag %u bytes and current %u bytes", a_stream->buf_defrag_size, read_bytes_to); + // log_it(DEBUG,"Glue together defrag %u bytes and current %u bytes", sid->buf_defrag_size, read_bytes_to); memcpy(a_stream->buf_defrag+a_stream->buf_defrag_size,proc_data,read_bytes_to ); bytes_left_to_read=a_stream->buf_defrag_size+read_bytes_to; // Then we have to read em all read_bytes_to=0; }else{ bytes_left_to_read=a_stream->buf_defrag_size; - //log_it(L_DEBUG,"Nothing to glue with defrag buffer, going to process just that (%u bytes)", bytes_left_to_read); + // log_it(DEBUG,"Nothing to glue with defrag buffer, going to process just that (%u bytes)", bytes_left_to_read); } - //log_it(L_WARNING,"Switch to defrag buffer"); proc_data=a_stream->buf_defrag; proc_data_defrag=true; }//else // log_it(DEBUG,"No prefill or defrag buffer, process directly buf_in"); + // Now lets see how many packets we have in buffer now while(pkt=dap_stream_pkt_detect( proc_data , bytes_left_to_read)){ if(pkt->hdr.size > STREAM_PKT_SIZE_MAX ){ - //log_it(L_ERROR, "stream_pkt_detect() Too big packet size %u", - // pkt->hdr.size); + log_it(L_ERROR, "stream_pkt_detect() Too big packet size %u", + pkt->hdr.size); bytes_left_to_read=0; break; } size_t pkt_offset=( ((uint8_t*)pkt)- proc_data ); bytes_left_to_read -= pkt_offset ; found_sig=true; - dap_stream_pkt_t* temp_pkt = dap_stream_pkt_detect( (uint8_t*)pkt + 1 ,pkt->hdr.size+sizeof(stream_pkt_hdr_t) ); if(bytes_left_to_read <(pkt->hdr.size+sizeof(stream_pkt_hdr_t) )){ // Is all the packet in da buf? read_bytes_to=bytes_left_to_read; }else{ read_bytes_to=pkt->hdr.size+sizeof(stream_pkt_hdr_t); } - //log_it(L_DEBUG, "Detected packet signature pkt->hdr.size=%u read_bytes_to=%u bytes_left_to_read=%u pkt_offset=%u" - // ,pkt->hdr.size, read_bytes_to, bytes_left_to_read,pkt_offset); - if(read_bytes_to > HEADER_WITH_SIZE_FIELD){ // If we have size field, we can allocate memory + //log_it(L_DEBUG, "Detected packet signature pkt->hdr.size=%u pkt->hdr.seq_id=%u read_bytes_to=%u bytes_left_to_read=%u pkt_offset=%u" + // ,pkt->hdr.size,pkt->hdr.seq_id , read_bytes_to, bytes_left_to_read,pkt_offset); + + if(read_bytes_to>0){ // We have smth to read, right? a_stream->pkt_buf_in_size_expected =( pkt->hdr.size+sizeof(stream_pkt_hdr_t)); size_t pkt_buf_in_size_expected=a_stream->pkt_buf_in_size_expected; + a_stream->pkt_buf_in=(dap_stream_pkt_t *) malloc(pkt_buf_in_size_expected); if(read_bytes_to>(pkt->hdr.size+sizeof(stream_pkt_hdr_t) )){ - //log_it(L_WARNING,"For some strange reasons we have read_bytes_to=%u is bigger than expected pkt length(%u bytes). Dropped %u bytes", - // pkt->hdr.size+sizeof(stream_pkt_hdr_t),read_bytes_to- pkt->hdr.size+sizeof(stream_pkt_hdr_t)); + log_it(L_WARNING,"For some strange reasons we have read_bytes_to=%u is bigger than expected pkt length(%u bytes). Dropped %u bytes", + pkt->hdr.size+sizeof(stream_pkt_hdr_t),read_bytes_to- pkt->hdr.size+sizeof(stream_pkt_hdr_t)); read_bytes_to=(pkt->hdr.size+sizeof(stream_pkt_hdr_t)); } if(read_bytes_to>bytes_left_to_read){ - //log_it(L_WARNING,"For some strange reasons we have read_bytes_to=%u is bigger that's left in input buffer (%u bytes). Dropped %u bytes", - // read_bytes_to,bytes_left_to_read); + log_it(L_WARNING,"For some strange reasons we have read_bytes_to=%u is bigger that's left in input buffer (%u bytes). Dropped %u bytes", + read_bytes_to,bytes_left_to_read); read_bytes_to=bytes_left_to_read; } memcpy(a_stream->pkt_buf_in,pkt,read_bytes_to); - proc_data+=(read_bytes_to + pkt_offset); + proc_data+=read_bytes_to; bytes_left_to_read-=read_bytes_to; - a_stream->pkt_buf_in_data_size=(read_bytes_to); - if(a_stream->pkt_buf_in_data_size==(pkt->hdr.size + sizeof(stream_pkt_hdr_t))){ + a_stream->pkt_buf_in_data_size=(read_bytes_to-sizeof(pkt->hdr)); + if(a_stream->pkt_buf_in_data_size==(pkt->hdr.size)){ // log_it(INFO,"All the packet is present in da buffer (hdr.size=%u read_bytes_to=%u buf_in_size=%u)" // ,sid->pkt_buf_in->hdr.size,read_bytes_to,sid->conn->buf_in_size); stream_proc_pkt_in(a_stream); - }else if(a_stream->pkt_buf_in_data_size>pkt->hdr.size + sizeof(stream_pkt_hdr_t)){ - //log_it(L_WARNING,"Input: packet buffer has %u bytes more than we need, they're lost",a_stream->pkt_buf_in_data_size-pkt->hdr.size); + }else if(a_stream->pkt_buf_in_data_size>pkt->hdr.size){ + log_it(L_WARNING,"Input: packet buffer has %u bytes more than we need, they're lost",a_stream->pkt_buf_in_data_size-pkt->hdr.size); }else{ - //log_it(L_DEBUG,"Input: Not all stream packet in input (hdr.size=%u read_bytes_to=%u)",a_stream->pkt_buf_in->hdr.size,read_bytes_to); + // log_it(DEBUG,"Input: Not all stream packet in input (hdr.size=%u read_bytes_to=%u)",sid->pkt_buf_in->hdr.size,read_bytes_to); } }else{ - break; + //log_it(DEBUG,"Input: read_bytes_to is zero inside the process loop, going to breaking out that"); } } if(!found_sig){ - //log_it(DEBUG,"Input: Not found signature in the incomming data ( client->buf_in_size = %u *ret = %u )", - // sh->client->buf_in_size, *ret); +// log_it(L_DEBUG,"Input: Not found signature in the incomming data ( client->buf_in_size = %u)", +// a_stream->conn->buf_in_size); } + if(bytes_left_to_read>0){ - if(proc_data_defrag){ + if(proc_data_defrag){ memmove(a_stream->buf_defrag, proc_data, bytes_left_to_read); a_stream->buf_defrag_size=bytes_left_to_read; - //log_it(L_INFO,"Fragment of %u bytes shifted in the begining the defrag buffer",bytes_left_to_read); + // log_it(INFO,"Fragment of %u bytes shifted in the begining the defrag buffer",bytes_left_to_read); }else{ memcpy(a_stream->buf_defrag, proc_data, bytes_left_to_read); a_stream->buf_defrag_size=bytes_left_to_read; - //log_it(L_INFO,"Fragment of %u bytes stored in defrag buffer",bytes_left_to_read); + // log_it(INFO,"Fragment of %u bytes stored in defrag buffer",bytes_left_to_read); } }else if(proc_data_defrag){ a_stream->buf_defrag_size=0; @@ -584,6 +564,30 @@ void stream_dap_new(dap_client_remote_t* sh, void * arg){ } +static bool _detect_loose_packet(dap_stream_t * sid) +{ + dap_stream_ch_pkt_t * ch_pkt = (dap_stream_ch_pkt_t *) sid->buf_pkt_in; + + int count_loosed_packets = ch_pkt->hdr.seq_id - (sid->last_seq_id_packet + 1); + if(count_loosed_packets > 0) + { + int count_loosed = sid->last_seq_id_packet + 1; + + log_it(L_WARNING, "Detected loosed %d packets. " + "Last read seq_id packet: %d Current: %d", count_loosed_packets, + sid->last_seq_id_packet, ch_pkt->hdr.seq_id); + } else if(count_loosed_packets < 0) { + log_it(L_WARNING, "Something wrong. count_loosed packets %d can't less than zero. " + "Last read seq_id packet: %d Current: %d", count_loosed_packets, + sid->last_seq_id_packet, ch_pkt->hdr.seq_id); + } +// log_it(L_DEBUG, "Packet seq id: %d", ch_pkt->hdr.seq_id); +// log_it(L_DEBUG, "Last seq id: %d", sid->last_seq_id_packet); + sid->last_seq_id_packet = ch_pkt->hdr.seq_id; + return false; +} + + /** * @brief stream_proc_pkt_in * @param sid @@ -596,6 +600,8 @@ void stream_proc_pkt_in(dap_stream_t * sid) dap_stream_pkt_read(sid,sid->pkt_buf_in, ch_pkt, STREAM_BUF_SIZE_MAX); + _detect_loose_packet(sid); + dap_stream_ch_t * ch = NULL; size_t i; for(i=0;i<sid->channel_count;i++) diff --git a/stream/dap_stream.h b/stream/dap_stream.h index ef073ea9c4aac5049fd08ba97f6f8ffb7dc5c540..2933cea9718e72f0b1186ae4d2366f8c40e7caa4 100644 --- a/stream/dap_stream.h +++ b/stream/dap_stream.h @@ -84,6 +84,7 @@ typedef struct dap_stream { size_t frame_sent; // Frame counter size_t stream_size; + size_t last_seq_id_packet; } dap_stream_t;