diff --git a/net/client/dap_client_pvt.c b/net/client/dap_client_pvt.c index 7e7b88faa0bb1cf69342d70170e20aef696268cc..864b22daa88d686ce243ae3cd9cc171a2dbdf649 100644 --- a/net/client/dap_client_pvt.c +++ b/net/client/dap_client_pvt.c @@ -1337,16 +1337,16 @@ static void s_stream_es_callback_read(dap_events_socket_t * a_es, void * arg) l_client_pvt->stage_status = STAGE_STATUS_DONE; s_stage_status_after(l_client_pvt); - dap_stream_data_proc_read(l_client_pvt->stream); - dap_events_socket_shrink_buf_in(a_es, a_es->buf_in_size); + size_t l_bytes_read = dap_stream_data_proc_read(l_client_pvt->stream); + dap_events_socket_shrink_buf_in(a_es, l_bytes_read); } } } } break; case STAGE_STREAM_STREAMING: { // if streaming - process data with stream processor - dap_stream_data_proc_read(l_client_pvt->stream); - dap_events_socket_shrink_buf_in(a_es, a_es->buf_in_size); + size_t l_bytes_read = dap_stream_data_proc_read(l_client_pvt->stream); + dap_events_socket_shrink_buf_in(a_es, l_bytes_read); } break; default: { diff --git a/net/stream/stream/dap_stream.c b/net/stream/stream/dap_stream.c index c7eb83b8981630207786d7c16174c9dd073fd545..6493ed7d2d451537d826b0484d4f412b4e49a92a 100644 --- a/net/stream/stream/dap_stream.c +++ b/net/stream/stream/dap_stream.c @@ -81,7 +81,7 @@ static dap_enc_key_type_t s_stream_get_preferred_encryption_type = DAP_ENC_KEY static int s_add_stream_info(authorized_stream_t **a_hash_table, authorized_stream_t *a_item, dap_stream_t *a_stream); -static void s_stream_proc_pkt_in(dap_stream_t * a_stream, dap_stream_pkt_t *l_pkt, size_t l_pkt_size); +static void s_stream_proc_pkt_in(dap_stream_t * a_stream, dap_stream_pkt_t *l_pkt); // Callbacks for HTTP client static void s_http_client_headers_read(dap_http_client_t * a_http_client, void * a_arg); // Prepare stream when all headers are read @@ -429,7 +429,6 @@ void dap_stream_delete_unsafe(dap_stream_t *a_stream) s_stream_delete_from_list(a_stream); DAP_DEL_Z(a_stream->buf_fragments); - DAP_DEL_Z(a_stream->pkt_buf_in); DAP_DELETE(a_stream); log_it(L_NOTICE,"Stream connection is over"); } @@ -676,96 +675,38 @@ static void s_http_client_delete(dap_http_client_t * a_http_client, void *a_arg) size_t dap_stream_data_proc_read (dap_stream_t *a_stream) { dap_return_val_if_fail(a_stream && a_stream->esocket && a_stream->esocket->buf_in, 0); - - byte_t *l_buf_in = a_stream->esocket->buf_in; - size_t l_buf_in_size = a_stream->esocket->buf_in_size; - - // Save the received data to stream memory - if (!a_stream->pkt_buf_in) { - a_stream->pkt_buf_in = DAP_DUP_SIZE(l_buf_in, l_buf_in_size); - a_stream->pkt_buf_in_data_size = l_buf_in_size; - } else { - debug_if(s_dump_packet_headers, L_DEBUG, "dap_stream_data_proc_read() Receive previously unprocessed data %zu bytes + new %zu bytes", - a_stream->pkt_buf_in_data_size, l_buf_in_size); - // The current data is added to rest of the previous package - a_stream->pkt_buf_in = DAP_REALLOC(a_stream->pkt_buf_in, a_stream->pkt_buf_in_data_size + l_buf_in_size); - memcpy((byte_t*)a_stream->pkt_buf_in + a_stream->pkt_buf_in_data_size, l_buf_in, l_buf_in_size); - // Increase the size of pkt_buf_in - a_stream->pkt_buf_in_data_size += l_buf_in_size; - } - // Switch to stream memory - l_buf_in = (byte_t*) a_stream->pkt_buf_in; - l_buf_in_size = a_stream->pkt_buf_in_data_size; - size_t l_buf_in_left = l_buf_in_size; - - dap_stream_pkt_t *l_pkt = NULL; - if(l_buf_in_left >= sizeof(dap_stream_pkt_hdr_t)) { - // Now lets see how many packets we have in buffer now - while(l_buf_in_left > 0 && (l_pkt = dap_stream_pkt_detect(l_buf_in, l_buf_in_left))) { // Packet signature detected - if(l_pkt->hdr.size > DAP_STREAM_PKT_SIZE_MAX) { - log_it(L_ERROR, "dap_stream_data_proc_read() Too big packet size %u, drop %zu bytes", l_pkt->hdr.size, l_buf_in_left); - // Skip this packet - l_buf_in_left = 0; - break; - } - - size_t l_pkt_offset = (((uint8_t*) l_pkt) - l_buf_in); - l_buf_in += l_pkt_offset; - l_buf_in_left -= l_pkt_offset; - - size_t l_pkt_size = l_pkt->hdr.size + sizeof(dap_stream_pkt_hdr_t); - - //log_it(L_DEBUG, "read packet offset=%zu size=%zu buf_in_left=%zu)",l_pkt_offset, l_pkt_size, l_buf_in_left); - - // Got the whole package - if(l_buf_in_left >= l_pkt_size) { - // Process data - s_stream_proc_pkt_in(a_stream, (dap_stream_pkt_t*) l_pkt, l_pkt_size); - // Go to the next data - l_buf_in += l_pkt_size; - l_buf_in_left -= l_pkt_size; - } else { - debug_if(s_dump_packet_headers,L_DEBUG, "Input: Not all stream packet in input (pkt_size=%zu buf_in_left=%zu)", l_pkt_size, l_buf_in_left); + byte_t *l_pos = a_stream->esocket->buf_in, *l_end = l_pos + a_stream->esocket->buf_in_size; + size_t l_shift = 0, l_processed_size = 0; + while ( l_pos < l_end && (l_pos = memchr( l_pos, c_dap_stream_sig[0], (size_t)(l_end - l_pos))) ) { + if ( (size_t)(l_end - l_pos) < sizeof(dap_stream_pkt_hdr_t) ) + break; + if ( !memcmp(l_pos, c_dap_stream_sig, sizeof(c_dap_stream_sig)) ) { + dap_stream_pkt_t *l_pkt = (dap_stream_pkt_t*)l_pos; + if (l_pkt->hdr.size > DAP_STREAM_PKT_SIZE_MAX) { + log_it(L_ERROR, "Invalid packet size %lu, dump it", l_pkt->hdr.size); + l_shift = sizeof(dap_stream_pkt_hdr_t); + } else if ( (l_shift = sizeof(dap_stream_pkt_hdr_t) + l_pkt->hdr.size) <= (size_t)(l_end - l_pos) ) { + debug_if(s_dump_packet_headers, L_DEBUG, "Processing full packet, size %lu", l_shift); + s_stream_proc_pkt_in(a_stream, l_pkt); + } else break; - } - } - } - - if(l_buf_in_left > 0) { - // Save the received data to stream memory for the next piece of data - if(!l_pkt) { - // pkt header not found, maybe l_buf_in_left is too small to detect pkt header, will do that next time - l_pkt = (dap_stream_pkt_t*) l_buf_in; - debug_if(s_dump_packet_headers, L_DEBUG, "dap_stream_data_proc_read() left unprocessed data %zu bytes, l_pkt=0", l_buf_in_left); - } - if(l_pkt) { - a_stream->pkt_buf_in_data_size = l_buf_in_left; - if(l_pkt != a_stream->pkt_buf_in){ - memmove(a_stream->pkt_buf_in, l_pkt, a_stream->pkt_buf_in_data_size); - //log_it(L_DEBUG, "dap_stream_data_proc_read() l_pkt=%zu != a_stream->pkt_buf_in=%zu", l_pkt, a_stream->pkt_buf_in); - } - - debug_if(s_dump_packet_headers,L_DEBUG, "dap_stream_data_proc_read() left unprocessed data %zu bytes", l_buf_in_left); - } - else { - log_it(L_ERROR, "dap_stream_data_proc_read() pkt header not found, drop %zu bytes", l_buf_in_left); - DAP_DEL_Z(a_stream->pkt_buf_in); - a_stream->pkt_buf_in_data_size = 0; - } - } - else { - DAP_DEL_Z(a_stream->pkt_buf_in); - a_stream->pkt_buf_in_data_size = 0; + l_pos += l_shift; + l_processed_size += l_shift; + } else + ++l_pos; } - return a_stream->esocket->buf_in_size; //a_stream->conn->buf_in_size; + debug_if( s_dump_packet_headers && l_processed_size, L_DEBUG, "Processed %lu / %lu bytes", + l_processed_size, (size_t)(l_end - a_stream->esocket->buf_in) ); + return l_processed_size; } /** * @brief stream_proc_pkt_in * @param sid */ -static void s_stream_proc_pkt_in(dap_stream_t * a_stream, dap_stream_pkt_t *a_pkt, size_t a_pkt_size) +static void s_stream_proc_pkt_in(dap_stream_t * a_stream, dap_stream_pkt_t *a_pkt) { + size_t a_pkt_size = sizeof(dap_stream_pkt_hdr_t) + a_pkt->hdr.size; bool l_is_clean_fragments = false; a_stream->is_active = true; diff --git a/net/stream/stream/dap_stream_pkt.c b/net/stream/stream/dap_stream_pkt.c index fc3f08d9958203aebb46ec4e7d372c3649c68584..9d482f34dd732d46bf0bf5d8723595b32b077bdf 100644 --- a/net/stream/stream/dap_stream_pkt.c +++ b/net/stream/stream/dap_stream_pkt.c @@ -33,37 +33,6 @@ const uint8_t c_dap_stream_sig [STREAM_PKT_SIG_SIZE] = {0xa0,0x95,0x96,0xa9,0x9e,0x5c,0xfb,0xfa}; -dap_stream_pkt_t * dap_stream_pkt_detect(void * a_data, size_t data_size) -{ - uint8_t * sig_start=(uint8_t*) a_data; - dap_stream_pkt_t * hpkt = NULL; - size_t length_left = data_size; - - while ( (sig_start = memchr(sig_start, c_dap_stream_sig[0], length_left)) ) { - length_left = data_size - (size_t)(sig_start - (uint8_t *)a_data); - if(length_left < sizeof(c_dap_stream_sig) ) - break; - - if ( !memcmp(sig_start, c_dap_stream_sig, sizeof(c_dap_stream_sig)) ) { - hpkt = (dap_stream_pkt_t *)sig_start; - if (length_left < sizeof(dap_stream_ch_pkt_hdr_t)) { - //log_it(L_ERROR, "Too small packet size %zu", length_left); // it's not an error, just random case - hpkt = NULL; - break; - } - if(hpkt->hdr.size > DAP_STREAM_PKT_SIZE_MAX ){ - log_it(L_ERROR, "Too big packet size %u (%#x), type:%d(%#x)", - hpkt->hdr.size, hpkt->hdr.size, hpkt->hdr.type, hpkt->hdr.type); - hpkt = NULL; - } - break; - } else - sig_start++; - } - - return hpkt; -} - /** * @brief stream_pkt_read * @param sid @@ -94,10 +63,8 @@ size_t dap_stream_pkt_write_unsafe(dap_stream_t *a_stream, uint8_t a_type, const dap_stream_pkt_hdr_t *l_pkt_hdr = (dap_stream_pkt_hdr_t*)s_pkt_buf; *l_pkt_hdr = (dap_stream_pkt_hdr_t) { .size = dap_enc_code( l_key, a_data, a_data_size, s_pkt_buf + sizeof(*l_pkt_hdr), l_full_size - sizeof(*l_pkt_hdr), DAP_ENC_DATA_TYPE_RAW ), - .type = a_type, - .timestamp = dap_time_now(), - .src_addr = g_node_addr.uint64, - .dst_addr = a_stream->node.uint64 }; + .timestamp = dap_time_now(), .type = a_type, + .src_addr = g_node_addr.uint64, .dst_addr = a_stream->node.uint64 }; memcpy(l_pkt_hdr->sig, c_dap_stream_sig, sizeof(l_pkt_hdr->sig)); return dap_events_socket_write_unsafe(a_stream->esocket, s_pkt_buf, l_full_size); } diff --git a/net/stream/stream/include/dap_stream.h b/net/stream/stream/include/dap_stream.h index 888639faeaac0536f40abb6c916da7ae517341d7..416e0e5f39bfdd8ba09516e038080501a1bb7cf9 100644 --- a/net/stream/stream/include/dap_stream.h +++ b/net/stream/stream/include/dap_stream.h @@ -60,11 +60,6 @@ typedef struct dap_stream { char *service_key; bool is_client_to_uplink; - struct dap_stream_pkt *in_pkt; - struct dap_stream_pkt *pkt_buf_in; - size_t pkt_buf_in_data_size; - size_t pkt_buf_in_size_expected; - uint8_t *buf_fragments, *pkt_cache; size_t buf_fragments_size_total;// Full size of all fragments size_t buf_fragments_size_filled;// Received size