From 902e3baa14b11f11ac39fdbd54aa64981958a9cc Mon Sep 17 00:00:00 2001 From: "Dmitriy A. Gerasimov" <dmitriy.gerasimov@demlabs.net> Date: Thu, 22 Jul 2021 22:29:17 +0700 Subject: [PATCH] [*] Fixed stream packet detect issues on small data parts in --- CMakeLists.txt | 2 +- dap-sdk/net/stream/stream/dap_stream.c | 57 ++++++++++--------- dap-sdk/net/stream/stream/dap_stream_pkt.c | 14 +++-- .../stream/stream/include/dap_stream_pkt.h | 6 +- 4 files changed, 41 insertions(+), 38 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 18af59136b..a719384a3f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,7 +2,7 @@ project(cellframe-sdk C) cmake_minimum_required(VERSION 3.0) set(CMAKE_C_STANDARD 11) -set(CELLFRAME_SDK_NATIVE_VERSION "2.9-13") +set(CELLFRAME_SDK_NATIVE_VERSION "2.9-14") add_definitions ("-DCELLFRAME_SDK_VERSION=\"${CELLFRAME_SDK_NATIVE_VERSION}\"") set(DAPSDK_MODULES "") message("Cellframe modules: ${CELLFRAME_MODULES}") diff --git a/dap-sdk/net/stream/stream/dap_stream.c b/dap-sdk/net/stream/stream/dap_stream.c index 3d60bfd33c..24c832a66d 100644 --- a/dap-sdk/net/stream/stream/dap_stream.c +++ b/dap-sdk/net/stream/stream/dap_stream.c @@ -537,9 +537,9 @@ size_t dap_stream_data_proc_read (dap_stream_t *a_stream) if (!a_stream->esocket) return 0; - char *buf_in = (char*)a_stream->esocket->buf_in ; + char *l_buf_in = (char*)a_stream->esocket->buf_in ; size_t buf_in_size = a_stream->esocket->buf_in_size ; - uint8_t *proc_data = (uint8_t *)buf_in;//a_stream->conn->buf_in; + byte_t *proc_data = (byte_t *)l_buf_in;//a_stream->conn->buf_in; bool proc_data_defrag=false; // We are or not in defrag buffer size_t read_bytes_to=0; size_t bytes_left_to_read = buf_in_size;//a_stream->conn->buf_in_size; @@ -548,21 +548,21 @@ size_t dap_stream_data_proc_read (dap_stream_t *a_stream) pkt = a_stream->pkt_buf_in; if ( pkt ) { // Packet signature detected - if(a_stream->pkt_buf_in_data_size < sizeof(stream_pkt_hdr_t)) + if(a_stream->pkt_buf_in_data_size < sizeof(dap_stream_pkt_hdr_t)) { //At first read header - dap_stream_pkt_t* check_pkt = dap_stream_pkt_detect( proc_data , sizeof(stream_pkt_hdr_t) - a_stream->pkt_buf_in_data_size); + dap_stream_pkt_t* check_pkt = dap_stream_pkt_detect( proc_data , sizeof(dap_stream_pkt_hdr_t) - a_stream->pkt_buf_in_data_size); if(check_pkt){ // Got duplication of packet header several times - //log_it(L_DEBUG, "Drop incorrect header part"); + log_it(L_DEBUG, "Drop incorrect header part"); a_stream->pkt_buf_in = NULL; a_stream->pkt_buf_in_data_size=0; return 0; } - if(sizeof(stream_pkt_hdr_t) - a_stream->pkt_buf_in_data_size > bytes_left_to_read) + if(sizeof(dap_stream_pkt_hdr_t) - a_stream->pkt_buf_in_data_size > bytes_left_to_read) read_bytes_to = bytes_left_to_read; else - read_bytes_to = sizeof(stream_pkt_hdr_t) - a_stream->pkt_buf_in_data_size; + read_bytes_to = sizeof(dap_stream_pkt_hdr_t) - a_stream->pkt_buf_in_data_size; memcpy((uint8_t*)a_stream->pkt_buf_in+a_stream->pkt_buf_in_data_size,proc_data,read_bytes_to); bytes_left_to_read-=read_bytes_to; a_stream->pkt_buf_in_data_size += read_bytes_to; @@ -570,8 +570,8 @@ size_t dap_stream_data_proc_read (dap_stream_t *a_stream) read_bytes_to = 0; } - if ((pkt->hdr.size + sizeof(stream_pkt_hdr_t) -a_stream->pkt_buf_in_data_size) < bytes_left_to_read ) { // Looks the all packet is present in buffer - read_bytes_to=(a_stream->pkt_buf_in->hdr.size + sizeof(stream_pkt_hdr_t) -a_stream->pkt_buf_in_data_size); + if ((pkt->hdr.size + sizeof(dap_stream_pkt_hdr_t) -a_stream->pkt_buf_in_data_size) < bytes_left_to_read ) { // Looks the all packet is present in buffer + read_bytes_to=(a_stream->pkt_buf_in->hdr.size + sizeof(dap_stream_pkt_hdr_t) -a_stream->pkt_buf_in_data_size); }else{ read_bytes_to=bytes_left_to_read; } @@ -580,8 +580,8 @@ size_t dap_stream_data_proc_read (dap_stream_t *a_stream) bytes_left_to_read-=read_bytes_to; //log_it(L_DEBUG, "Prefilled packet buffer on %u bytes", read_bytes_to); read_bytes_to=0; - if(a_stream->pkt_buf_in_data_size>=(a_stream->pkt_buf_in->hdr.size + sizeof(stream_pkt_hdr_t)) ){ // If we have all the packet in packet buffer - if(a_stream->pkt_buf_in_data_size > a_stream->pkt_buf_in->hdr.size + sizeof(stream_pkt_hdr_t)){ // If we have little more data then we need for packet buffer + if(a_stream->pkt_buf_in_data_size>=(a_stream->pkt_buf_in->hdr.size + sizeof(dap_stream_pkt_hdr_t)) ){ // If we have all the packet in packet buffer + if(a_stream->pkt_buf_in_data_size > a_stream->pkt_buf_in->hdr.size + sizeof(dap_stream_pkt_hdr_t)){ // If we have little more data then we need for packet buffer //log_it(L_WARNING,"Prefilled packet buffer has %u bytes more than we need, they're lost",a_stream->pkt_buf_in_data_size-a_stream->pkt_buf_in->hdr.size); a_stream->pkt_buf_in_data_size = 0; a_stream->pkt_buf_in = NULL; @@ -590,7 +590,7 @@ size_t dap_stream_data_proc_read (dap_stream_t *a_stream) s_stream_proc_pkt_in(a_stream); } } - proc_data = (uint8_t *)(buf_in + buf_in_size - bytes_left_to_read);//proc_data=(a_stream->conn->buf_in + a_stream->conn->buf_in_size - bytes_left_to_read); + proc_data = (uint8_t *)(l_buf_in + buf_in_size - bytes_left_to_read);//proc_data=(a_stream->conn->buf_in + a_stream->conn->buf_in_size - bytes_left_to_read); }else if( a_stream->buf_defrag_size>0){ // If smth is present in defrag buffer - we glue everything together in it if( bytes_left_to_read > 0){ // If there is smth to process in input buffer @@ -616,35 +616,36 @@ size_t dap_stream_data_proc_read (dap_stream_t *a_stream) // log_it(DEBUG,"No prefill or defrag buffer, process directly buf_in"); // Now lets see how many packets we have in buffer now while ( (pkt = dap_stream_pkt_detect( proc_data , bytes_left_to_read)) ){ - - if(pkt->hdr.size > STREAM_PKT_SIZE_MAX ){ - //log_it(L_ERROR, "stream_pkt_detect() Too big packet size %u", - // pkt->hdr.size); - bytes_left_to_read=0; - break; + if(bytes_left_to_read -((byte_t*)pkt- proc_data ) >=sizeof (dap_stream_pkt_t)){ + if(pkt->hdr.size > STREAM_PKT_SIZE_MAX ){ + //log_it(L_ERROR, "stream_pkt_detect() Too big packet size %u", + // pkt->hdr.size); + bytes_left_to_read=0; + break; + } } size_t pkt_offset=( ((uint8_t*)pkt)- proc_data ); bytes_left_to_read -= pkt_offset ; found_sig=true; //dap_stream_pkt_t *temp_pkt = dap_stream_pkt_detect( (uint8_t*)pkt + 1 ,pkt->hdr.size+sizeof(stream_pkt_hdr_t) ); - - if(bytes_left_to_read <(pkt->hdr.size+sizeof(stream_pkt_hdr_t) )){ // Is all the packet in da buf? + if(bytes_left_to_read >= sizeof (dap_stream_pkt_t)) + if(bytes_left_to_read <(pkt->hdr.size+sizeof(dap_stream_pkt_t) )){ // Is all the packet in da buf? read_bytes_to=bytes_left_to_read; }else{ - read_bytes_to=pkt->hdr.size+sizeof(stream_pkt_hdr_t); + read_bytes_to=pkt->hdr.size+sizeof(dap_stream_pkt_t); } //log_it(L_DEBUG, "Detected packet signature pkt->hdr.size=%u read_bytes_to=%u bytes_left_to_read=%u pkt_offset=%u" // ,pkt->hdr.size, read_bytes_to, bytes_left_to_read,pkt_offset); if(read_bytes_to > HEADER_WITH_SIZE_FIELD){ // If we have size field, we can allocate memory - a_stream->pkt_buf_in_size_expected =( pkt->hdr.size+sizeof(stream_pkt_hdr_t)); + a_stream->pkt_buf_in_size_expected =( pkt->hdr.size+sizeof(dap_stream_pkt_hdr_t)); size_t pkt_buf_in_size_expected=a_stream->pkt_buf_in_size_expected; a_stream->pkt_buf_in=(dap_stream_pkt_t *) malloc(pkt_buf_in_size_expected); - if(read_bytes_to>(pkt->hdr.size+sizeof(stream_pkt_hdr_t) )){ + if(read_bytes_to>(pkt->hdr.size+sizeof(dap_stream_pkt_hdr_t) )){ //log_it(L_WARNING,"For some strange reasons we have read_bytes_to=%u is bigger than expected pkt length(%u bytes). Dropped %u bytes", // pkt->hdr.size+sizeof(stream_pkt_hdr_t),read_bytes_to- pkt->hdr.size+sizeof(stream_pkt_hdr_t)); - read_bytes_to=(pkt->hdr.size+sizeof(stream_pkt_hdr_t)); + read_bytes_to=(pkt->hdr.size+sizeof(dap_stream_pkt_hdr_t)); } if(read_bytes_to>bytes_left_to_read){ //log_it(L_WARNING,"For some strange reasons we have read_bytes_to=%u is bigger that's left in input buffer (%u bytes). Dropped %u bytes", @@ -655,11 +656,11 @@ size_t dap_stream_data_proc_read (dap_stream_t *a_stream) proc_data+=(read_bytes_to + pkt_offset); bytes_left_to_read-=read_bytes_to; a_stream->pkt_buf_in_data_size=(read_bytes_to); - if(a_stream->pkt_buf_in_data_size==(pkt->hdr.size + sizeof(stream_pkt_hdr_t))){ + if(a_stream->pkt_buf_in_data_size==(pkt->hdr.size + sizeof(dap_stream_pkt_hdr_t))){ // log_it(INFO,"All the packet is present in da buffer (hdr.size=%u read_bytes_to=%u buf_in_size=%u)" // ,sid->pkt_buf_in->hdr.size,read_bytes_to,sid->conn->buf_in_size); s_stream_proc_pkt_in(a_stream); - }else if(a_stream->pkt_buf_in_data_size>pkt->hdr.size + sizeof(stream_pkt_hdr_t)){ + }else if(a_stream->pkt_buf_in_data_size>pkt->hdr.size + sizeof(dap_stream_pkt_hdr_t)){ //log_it(L_WARNING,"Input: packet buffer has %u bytes more than we need, they're lost",a_stream->pkt_buf_in_data_size-pkt->hdr.size); }else{ //log_it(L_DEBUG,"Input: Not all stream packet in input (hdr.size=%u read_bytes_to=%u)",a_stream->pkt_buf_in->hdr.size,read_bytes_to); @@ -744,7 +745,7 @@ static void s_stream_proc_pkt_in(dap_stream_t * a_stream) } break; case STREAM_PKT_TYPE_KEEPALIVE: { //log_it(L_DEBUG, "Keep alive check recieved"); - stream_pkt_hdr_t l_ret_pkt = {}; + dap_stream_pkt_hdr_t l_ret_pkt = {}; l_ret_pkt.type = STREAM_PKT_TYPE_ALIVE; memcpy(l_ret_pkt.sig, c_dap_stream_sig, sizeof(l_ret_pkt.sig)); dap_events_socket_write_unsafe(a_stream->esocket, &l_ret_pkt, sizeof(l_ret_pkt)); @@ -794,7 +795,7 @@ static bool s_keepalive_cb( void ) dap_stream_t *l_stream, *tmp; dap_worker_t * l_worker = dap_events_get_current_worker(dap_events_get_default()); pthread_mutex_lock( &s_mutex_keepalive_list ); - stream_pkt_hdr_t l_pkt = {0}; + dap_stream_pkt_hdr_t l_pkt = {0}; l_pkt.type = STREAM_PKT_TYPE_KEEPALIVE; memcpy(l_pkt.sig, c_dap_stream_sig, sizeof(l_pkt.sig)); DL_FOREACH_SAFE( s_stream_keepalive_list, l_stream, tmp ) { diff --git a/dap-sdk/net/stream/stream/dap_stream_pkt.c b/dap-sdk/net/stream/stream/dap_stream_pkt.c index e1b43a9d92..2a8e048120 100644 --- a/dap-sdk/net/stream/stream/dap_stream_pkt.c +++ b/dap-sdk/net/stream/stream/dap_stream_pkt.c @@ -75,9 +75,11 @@ dap_stream_pkt_t * dap_stream_pkt_detect(void * a_data, size_t data_size) break; if(memcmp(sig_start,c_dap_stream_sig,sizeof(c_dap_stream_sig))==0){ ret= (dap_stream_pkt_t*) sig_start; - if(ret->hdr.size > STREAM_PKT_SIZE_MAX ){ - //log_it(L_ERROR, "Too big packet size %u",ret->hdr.size); - ret=NULL; + if(length_left>= sizeof (dap_stream_ch_pkt_t) ){ + if(ret->hdr.size > STREAM_PKT_SIZE_MAX ){ + log_it(L_ERROR, "Too big packet size %u",ret->hdr.size); + ret=NULL; + } } break; }else @@ -134,7 +136,7 @@ size_t dap_stream_pkt_read_unsafe( dap_stream_t * a_stream, dap_stream_pkt_t * a size_t dap_stream_pkt_write_unsafe(dap_stream_t * a_stream, const void * a_data, size_t a_data_size) { size_t ret=0; - stream_pkt_hdr_t pkt_hdr; + dap_stream_pkt_hdr_t pkt_hdr; uint8_t * l_buf_allocated = NULL; uint8_t * l_buf_selected = a_stream->buf; @@ -169,13 +171,13 @@ size_t dap_stream_pkt_write_unsafe(dap_stream_t * a_stream, const void * a_data, size_t dap_stream_pkt_write_mt(dap_worker_t * a_w,dap_events_socket_t *a_es, dap_enc_key_t *a_key, const void * a_data, size_t a_data_size) { dap_worker_msg_io_t * l_msg = DAP_NEW_Z(dap_worker_msg_io_t); - stream_pkt_hdr_t *l_pkt_hdr; + dap_stream_pkt_hdr_t *l_pkt_hdr; l_msg->esocket = a_es; if(a_es) l_msg->esocket_uuid = a_es->uuid; // TODO replace function signature with UUID in place of worker+esocket l_msg->data_size = 16-a_data_size%16+a_data_size+sizeof(*l_pkt_hdr); l_msg->data = DAP_NEW_SIZE(void,l_msg->data_size); - l_pkt_hdr=(stream_pkt_hdr_t*) l_msg->data; + l_pkt_hdr=(dap_stream_pkt_hdr_t*) l_msg->data; memset(l_pkt_hdr,0,sizeof(*l_pkt_hdr)); memcpy(l_pkt_hdr->sig,c_dap_stream_sig,sizeof(l_pkt_hdr->sig)); l_msg->data_size=sizeof (*l_pkt_hdr) +dap_enc_code(a_key, a_data,a_data_size, ((byte_t*)l_msg->data)+sizeof (*l_pkt_hdr),l_msg->data_size-sizeof (*l_pkt_hdr),DAP_ENC_DATA_TYPE_RAW); diff --git a/dap-sdk/net/stream/stream/include/dap_stream_pkt.h b/dap-sdk/net/stream/stream/include/dap_stream_pkt.h index 6f15299eeb..7e91fc28c1 100644 --- a/dap-sdk/net/stream/stream/include/dap_stream_pkt.h +++ b/dap-sdk/net/stream/stream/include/dap_stream_pkt.h @@ -31,17 +31,17 @@ typedef struct dap_stream_session dap_stream_session_t; #define STREAM_PKT_TYPE_KEEPALIVE 0x11 #define STREAM_PKT_TYPE_ALIVE 0x12 -typedef struct stream_pkt_hdr{ +typedef struct dap_stream_pkt_hdr{ uint8_t sig[8]; // Signature to find out beginning of the frame uint32_t size; uint64_t timestamp; uint8_t type; // Packet type uint64_t src_addr; // Source address ( vasya@domain.net ) uint64_t dst_addr; // Destination address ( general#domain.net ) -} __attribute__((packed)) stream_pkt_hdr_t; +} __attribute__((packed)) dap_stream_pkt_hdr_t; typedef struct dap_stream_pkt{ - stream_pkt_hdr_t hdr; + dap_stream_pkt_hdr_t hdr; uint8_t data[]; } __attribute__((packed)) dap_stream_pkt_t; -- GitLab