diff --git a/stream/CMakeLists.txt b/stream/CMakeLists.txt index 65062f4f8b44f368b81114e4917345d813328add..ca68230aeae2cace417a838310efffa2a472bf82 100644 --- a/stream/CMakeLists.txt +++ b/stream/CMakeLists.txt @@ -5,6 +5,7 @@ set(STREAM_SRCS stream.c stream_ch.c stream_ch_pkt.c stream_ch_proc.c stream_ include_directories("${INCLUDE_DIRECTORIES} ${dap_core_INCLUDE_DIRS}") include_directories("${INCLUDE_DIRECTORIES} ${dap_core_server_INCLUDE_DIRS}") +include_directories("${INCLUDE_DIRECTORIES} ${dap_udp_server_INCLUDE_DIRS}") include_directories("${INCLUDE_DIRECTORIES} ${dap_crypto_INCLUDE_DIRS}") include_directories("${INCLUDE_DIRECTORIES} ${dap_client_INCLUDE_DIRS}") include_directories("${INCLUDE_DIRECTORIES} ${dap_session_INCLUDE_DIRS}") @@ -15,6 +16,7 @@ include_directories("${INCLUDE_DIRECTORIES} ${MONGO_INCLUDE_DIRS}") add_definitions ("${dap_core_DEFINITIONS}") add_definitions ("${dap_core_server_DEFINITIONS}") +add_definitions ("${dap_udp_server_DEFINITIONS}") add_definitions ("${dap_crypto_DEFINITIONS}") add_definitions ("${dap_client_DEFINITIONS}") add_definitions ("${dap_http_server_DEFINITIONS}") diff --git a/stream/stream.c b/stream/stream.c index aabc3801fd31e53345d358e45be41b1c4ef3bca7..87cb18a1fd71e8f4b0148aa6ba62355ae475ccac 100644 --- a/stream/stream.c +++ b/stream/stream.c @@ -33,6 +33,7 @@ #include "dap_http.h" #include "dap_http_client.h" #include "dap_http_header.h" +#include "dap_udp_server.h" #define LOG_TAG "stream" @@ -44,10 +45,26 @@ void stream_headers_write(dap_http_client_t * sh, void * arg); // Output headers void stream_data_write(dap_http_client_t * sh, void * arg); // Write the data void stream_data_read(dap_http_client_t * sh, void * arg); // Read the data +void stream_dap_data_read(dap_client_remote_t* sh, void * arg); +void stream_dap_data_write(dap_client_remote_t* sh, void * arg); +void stream_dap_delete(dap_client_remote_t* sh, void * arg); +void stream_dap_new(dap_client_remote_t* sh,void * arg); + // Internal functions stream_t * stream_new(dap_http_client_t * sh); // Create new stream void stream_delete(dap_http_client_t * sh, void * arg); +struct ev_loop *keepalive_loop; +pthread_t keepalive_thread; + +// Start keepalive stream +void* stream_loop(void * arg) +{ + keepalive_loop = ev_loop_new(0); + ev_loop(keepalive_loop, 0); + return NULL; +} + /** * @brief stream_init Init stream module * @return 0 if ok others if not @@ -58,6 +75,7 @@ int stream_init() log_it(L_CRITICAL, "Can't init channel types submodule"); return -1; } + pthread_create(&keepalive_thread, NULL, stream_loop, NULL); log_it(L_NOTICE,"Init streaming module"); return 0; @@ -72,28 +90,53 @@ void stream_deinit() } /** - * @brief stream_add_proc Add URL processor callback for streaming + * @brief stream_add_proc_http Add URL processor callback for streaming * @param sh HTTP server instance * @param url URL */ -void stream_add_proc(struct dap_http * sh, const char * url) +void stream_add_proc_http(struct dap_http * sh, const char * url) +{ + dap_http_add_proc(sh,url,NULL,NULL,stream_delete,stream_headers_read,stream_headers_write,stream_data_read,stream_data_write,NULL); +} + +/** + * @brief stream_add_proc_udp Add processor callback for streaming + * @param sh UDP server instance + */ +void stream_add_proc_udp(dap_udp_server_t * sh) { - dap_http_add_proc(sh,url,NULL,NULL,stream_delete,stream_headers_read,stream_headers_write,stream_data_read,stream_data_write,NULL); - //dap_http_add_proc(sh,url,NULL,NULL,NULL,stream_headers_read,stream_headers_write,NULL,stream_data_write,NULL); + dap_server_t* server = sh->dap_server; + server->client_read_callback = stream_dap_data_read; + server->client_write_callback = stream_dap_data_write; + server->client_delete_callback = stream_dap_delete; + server->client_new_callback = stream_dap_new; } +/** + * @brief stream_states_update + * @param sid stream instance + */ void stream_states_update(struct stream *sid) { - sid->conn_http->state_write=DAP_HTTP_CLIENT_STATE_START; + if(sid->conn_http) + sid->conn_http->state_write=DAP_HTTP_CLIENT_STATE_START; size_t i; bool ready_to_write=false; for(i=0;i<sid->channel_count; i++) ready_to_write|=sid->channel[i]->ready_to_write; - dap_client_ready_to_write(sid->conn,ready_to_write); - - sid->conn_http->out_content_ready=true; + if(sid->conn_udp) + dap_udp_client_ready_to_write(sid->conn_udp->client,ready_to_write); + else + dap_client_ready_to_write(sid->conn,ready_to_write); + if(sid->conn_http) + sid->conn_http->out_content_ready=true; } +/** + * @brief stream_header_read Read headers callback for HTTP + * @param cl_ht HTTP client structure + * @param arg Not used + */ void stream_headers_read(dap_http_client_t * cl_ht, void * arg) { (void) arg; @@ -126,12 +169,12 @@ void stream_headers_read(dap_http_client_t * cl_ht, void * arg) //cl_ht->client->ready_to_write=true; cl_ht->state_read=DAP_HTTP_CLIENT_STATE_DATA; cl_ht->out_content_ready=true; - stream_ch_new(sid,'s'); + stream_ch_new(sid,SERVICE_CHANNEL_ID); stream_ch_new(sid,'t'); stream_states_update(sid); dap_client_ready_to_read(cl_ht->client,true); }else{ - stream_ch_new(sid,'s'); + stream_ch_new(sid,SERVICE_CHANNEL_ID); stream_ch_new(sid,'g'); cl_ht->reply_status_code=200; @@ -154,6 +197,59 @@ void stream_headers_read(dap_http_client_t * cl_ht, void * arg) } } +/** + * @brief stream_new_udp Create new stream instance for UDP client + * @param sh DAP client structure + */ +stream_t * stream_new_udp(dap_client_remote_t * sh) +{ + stream_t * ret=(stream_t*) calloc(1,sizeof(stream_t)); + + ret->conn = sh; + ret->conn_udp=sh->_inheritor; + + sh->_internal=ret; + + log_it(L_NOTICE,"New stream instance"); + return ret; +} + +/** + * @brief check_session CHeck session status, open if need + * @param id session id + * @param cl DAP client structure + */ +void check_session(unsigned int id, dap_client_remote_t* cl){ + stream_session_t * ss=NULL; + ss=stream_session_id(id); + if(ss==NULL){ + log_it(L_ERROR,"No session id %u was found",id); + }else{ + log_it(L_INFO,"Session id %u was found with media_id = %d",id,ss->media_id); + if(stream_session_open(ss)==0){ // Create new stream + stream_t * sid; + if(STREAM(cl) == NULL) + sid = stream_new_udp(cl); + else + sid = STREAM(cl); + sid->session=ss; + if(ss->create_empty) + log_it(L_INFO, "Session created empty"); + log_it(L_INFO, "Opened stream session technical and data channels"); + stream_ch_new(sid,SERVICE_CHANNEL_ID); + stream_ch_new(sid,DATA_CHANNEL_ID); + stream_states_update(sid); + if(STREAM(cl)->conn_udp) + dap_udp_client_ready_to_read(cl,true); + else + dap_client_ready_to_read(cl,true); + start_keepalive(sid); + }else{ + log_it(L_ERROR,"Can't open session id %u",id); + } + } +} + /** * @brief stream_new Create new stream instance for HTTP client * @return New stream_t instance @@ -165,14 +261,13 @@ stream_t * stream_new(dap_http_client_t * sh) ret->conn = sh->client; ret->conn_http=sh; + ret->conn->_internal=ret; - sh->_internal=ret; log_it(L_NOTICE,"New stream instance"); return ret; } - /** * @brief stream_headers_write Prepare headers for output. Creates stream structure * @param sh HTTP client instance @@ -193,11 +288,36 @@ void stream_headers_write(dap_http_client_t * sh, void *arg) sh->state_read=DAP_HTTP_CLIENT_STATE_DATA; dap_client_ready_to_read(sh->client,true); - } } +// Function for keepalive loop +static void keepalive_cb (EV_P_ ev_timer *w, int revents) +{ + struct stream *sid = w->data; + if(sid->keepalive_passed < STREAM_KEEPALIVE_PASSES) + { + stream_send_keepalive(sid); + sid->keepalive_passed+=1; + } + else{ + log_it(L_INFO, "Client disconnected"); + ev_timer_stop (keepalive_loop, &sid->keepalive_watcher); + void * arg; + stream_dap_delete(sid->conn,arg); + } +} +/** + * @brief start_keepalive Start keepalive signals exchange for stream + * @param sid Stream instance + */ +void start_keepalive(struct stream *sid){ + keepalive_loop = EV_DEFAULT; + sid->keepalive_watcher.data = sid; + ev_timer_init (&sid->keepalive_watcher, keepalive_cb, STREAM_KEEPALIVE_TIMEOUT, STREAM_KEEPALIVE_TIMEOUT); + ev_timer_start (keepalive_loop, &sid->keepalive_watcher); +} /** * @brief stream_data_write HTTP data write callback @@ -209,104 +329,45 @@ void stream_data_write(dap_http_client_t * sh, void * arg) (void) arg; if(sh->reply_status_code==200){ - size_t i; - bool ready_to_write=false; - // log_it(L_DEBUG,"Process channels data output (%u channels)",STREAM(sh)->channel_count); - - for(i=0;i<STREAM(sh)->channel_count; i++){ - stream_ch_t * ch = STREAM(sh)->channel[i]; - if(ch->ready_to_write){ - ch->proc->packet_out_callback(ch,NULL); - ready_to_write|=ch->ready_to_write; - } - } - //log_it(L_DEBUG,"stream_data_out (ready_to_write=%s)", ready_to_write?"true":"false"); - - dap_client_ready_to_write(sh->client,ready_to_write); - //log_it(L_ERROR,"No stream_data_write_callback is defined"); + stream_dap_data_write(sh->client,arg); }else{ log_it(L_WARNING, "Wrong request, reply status code is %u",sh->reply_status_code); } } - -/** - * @brief stream_proc_pkt_in - * @param sid - */ -void stream_proc_pkt_in(stream_t * sid) -{ - // log_it(L_DEBUG,"Input: read last bytes for current packet (hdr.size=%u)",sid->pkt_buf_in-); - stream_ch_pkt_t * ch_pkt= (stream_ch_pkt_t*) calloc(1,sid->pkt_buf_in->hdr.size+sizeof(stream_ch_pkt_hdr_t)+16 ); - stream_pkt_read(sid,sid->pkt_buf_in, ch_pkt); - -// log_it (DEBUG, "Recieved channel packet with %lu of payload bytes (type '%c' id '%c')", -// ch_pkt->hdr.size,(char) ch_pkt->hdr.type, (char) ch_pkt->hdr.id); - stream_ch_t * ch = NULL; - size_t i; - for(i=0;i<sid->channel_count;i++) - if(sid->channel[i]->proc){ - if(sid->channel[i]->proc->id == ch_pkt->hdr.id ){ - ch=sid->channel[i]; - } - } - if(ch){ - ch->stat.bytes_read+=ch_pkt->hdr.size; - if(ch->proc) - if(ch->proc->packet_in_callback) - ch->proc->packet_in_callback(ch,ch_pkt); - - }else{ - log_it(L_WARNING, "Input: unprocessed channel packet id '%c'",(char) ch_pkt->hdr.id ); - } - free(sid->pkt_buf_in); - sid->pkt_buf_in=NULL; - sid->pkt_buf_in_data_size=0; - free(ch_pkt); - -} - /** - * @brief stream_data_read HTTP data read callback. Read packet and passes that to the channel's callback - * @param sh HTTP client instance - * @param arg Processed number of bytes + * @brief stream_dap_data_read Read callback for UDP client + * @param sh DAP client instance + * @param arg Not used */ - - - - -void stream_data_read(dap_http_client_t * sh, void * arg) -{ - - // log_it(L_DEBUG, "Stream data read %u bytes", sh->client->buf_in_size); - // log_it(L_DEBUG, "Stream data %s", sh->client->buf_in); +void stream_dap_data_read(dap_client_remote_t* sh, void * arg){ stream_t * sid =STREAM(sh); int * ret = (int *) arg; + int bytes_ready = 0; if(sid->pkt_buf_in){ size_t read_bytes_to=( ((sid->pkt_buf_in->hdr.size-sid->pkt_buf_in_data_size) > sid->conn->buf_in_size ) ? sid->conn->buf_in_size :(sid->pkt_buf_in->hdr.size-sid->pkt_buf_in_data_size)); - memcpy(sid->pkt_buf_in->data+sid->pkt_buf_in_data_size,sh->client->buf_in,read_bytes_to); + memcpy(sid->pkt_buf_in->data+sid->pkt_buf_in_data_size,sh->buf_in,read_bytes_to); sid->pkt_buf_in_data_size+=read_bytes_to; if(sid->pkt_buf_in_data_size>=(sid->pkt_buf_in->hdr.size) ){ stream_proc_pkt_in(sid); } - *ret+=read_bytes_to; + bytes_ready+=read_bytes_to; }else{ stream_pkt_t * pkt; - while(pkt=stream_pkt_detect( sh->client->buf_in + *ret, (sh->client->buf_in_size - ((size_t) *ret) ))){ + while(pkt=stream_pkt_detect( sh->buf_in + bytes_ready, (sh->buf_in_size - ((size_t) bytes_ready) ))){ size_t read_bytes_to=( (pkt->hdr.size+sizeof(stream_pkt_hdr_t)) > sid->conn->buf_in_size ?sid->conn->buf_in_size :(pkt->hdr.size+sizeof(stream_pkt_hdr_t) ) ); if(read_bytes_to){ sid->pkt_buf_in=(stream_pkt_t *) calloc(1,pkt->hdr.size+sizeof(stream_pkt_hdr_t)); memcpy(sid->pkt_buf_in,pkt,read_bytes_to); - *ret = (*ret)+ read_bytes_to; + bytes_ready = (bytes_ready)+ read_bytes_to; sid->pkt_buf_in_data_size=read_bytes_to-sizeof(stream_pkt_hdr_t); + if(read_bytes_to>=(pkt->hdr.size)){ - //log_it(L_INFO,"Input: read full packet (hdr.size=%u read_bytes_to=%u buf_in_size=%u)" - // ,sid->pkt_buf_in->hdr.size,read_bytes_to,sid->conn->buf_in_size); stream_proc_pkt_in(sid); }else{ log_it(L_DEBUG,"Input: Not all stream packet in input (hdr.size=%u read_bytes_to=%u)",sid->pkt_buf_in->hdr.size,read_bytes_to); @@ -315,22 +376,45 @@ void stream_data_read(dap_http_client_t * sh, void * arg) }else break; } - //log_it(L_WARNING,"Input: Not found signature in the incomming data"); - *ret += sh->client->buf_in_size; + bytes_ready += sh->buf_in_size; } - -// log_it(L_DEBUG,"Stream read data from HTTP client: %u",sh->client->buf_in_size); -// if(sh->client->buf_in_size ) + if(ret) + *ret = bytes_ready; } +/** + * @brief stream_dap_data_write Write callback for UDP client + * @param sh DAP client instance + * @param arg Not used + */ +void stream_dap_data_write(dap_client_remote_t* sh, void * arg){ + size_t i; + bool ready_to_write=false; + // log_it(L_DEBUG,"Process channels data output (%u channels)",STREAM(sh)->channel_count); + for(i=0;i<STREAM(sh)->channel_count; i++){ + stream_ch_t * ch = STREAM(sh)->channel[i]; + if(ch->ready_to_write){ + if(ch->proc->packet_out_callback) + ch->proc->packet_out_callback(ch,NULL); + ready_to_write|=ch->ready_to_write; + } + } + //log_it(L_DEBUG,"stream_data_out (ready_to_write=%s)", ready_to_write?"true":"false"); + + /* if(STREAM(sh)->conn_udp) + dap_udp_client_ready_to_write(STREAM(sh)->conn,ready_to_write); + else + dap_client_ready_to_write(sh,ready_to_write);*/ + //log_it(L_ERROR,"No stream_data_write_callback is defined"); +} /** - * @brief stream_delete Delete stream and free its resources - * @param sid Stream id + * @brief stream_dap_delete Delete callback for UDP client + * @param sh DAP client instance + * @param arg Not used */ -void stream_delete(dap_http_client_t * sh, void * arg) -{ +void stream_dap_delete(dap_client_remote_t* sh, void * arg){ stream_t * sid = STREAM(sh); if(sid == NULL) return; @@ -338,6 +422,84 @@ void stream_delete(dap_http_client_t * sh, void * arg) size_t i; for(i=0;i<sid->channel_count; i++) stream_ch_delete(sid->channel[i]); + if(sid->session) + stream_session_close(sid->session->id); //free(sid); log_it(L_NOTICE,"[core] Stream connection is finished"); } + +/** + * @brief stream_dap_new New connection callback for UDP client + * @param sh DAP client instance + * @param arg Not used + */ +void stream_dap_new(dap_client_remote_t* sh, void * arg){ + stream_t * sid = stream_new_udp(sh); +} + + +/** + * @brief stream_proc_pkt_in + * @param sid + */ +void stream_proc_pkt_in(stream_t * sid) +{ + if(sid->pkt_buf_in->hdr.type == DATA_PACKET) + { + stream_ch_pkt_t * ch_pkt= (stream_ch_pkt_t*) calloc(1,sid->pkt_buf_in->hdr.size+sizeof(stream_ch_pkt_hdr_t)+16 ); + stream_pkt_read(sid,sid->pkt_buf_in, ch_pkt); + stream_ch_t * ch = NULL; + size_t i; + for(i=0;i<sid->channel_count;i++) + if(sid->channel[i]->proc){ + if(sid->channel[i]->proc->id == ch_pkt->hdr.id ){ + ch=sid->channel[i]; + } + } + if(ch){ + ch->stat.bytes_read+=ch_pkt->hdr.size; + if(ch->proc) + if(ch->proc->packet_in_callback) + ch->proc->packet_in_callback(ch,ch_pkt); + if(ch->proc->id == SERVICE_CHANNEL_ID && ch_pkt->hdr.type == KEEPALIVE_PACKET) + stream_send_keepalive(sid); + }else{ + log_it(L_WARNING, "Input: unprocessed channel packet id '%c'",(char) ch_pkt->hdr.id ); + } + free(ch_pkt); + } + else if(sid->pkt_buf_in->hdr.type == SERVICE_PACKET) + { + stream_srv_pkt_t * srv_pkt = (stream_srv_pkt_t *)malloc(sizeof(stream_srv_pkt_t)); + memcpy(srv_pkt,sid->pkt_buf_in->data,sizeof(stream_srv_pkt_t)); + uint32_t session_id = srv_pkt->session_id; + check_session(session_id,sid->conn); + free(srv_pkt); + } + sid->keepalive_passed = 0; + ev_timer_again (keepalive_loop, &sid->keepalive_watcher); + free(sid->pkt_buf_in); + sid->pkt_buf_in=NULL; + sid->pkt_buf_in_data_size=0; +} + +/** + * @brief stream_data_read HTTP data read callback. Read packet and passes that to the channel's callback + * @param sh HTTP client instance + * @param arg Processed number of bytes + */ +void stream_data_read(dap_http_client_t * sh, void * arg) +{ + stream_dap_data_read(sh->client,arg); +} + + + +/** + * @brief stream_delete Delete stream and free its resources + * @param sid Stream id + */ +void stream_delete(dap_http_client_t * sh, void * arg) +{ + stream_dap_delete(sh->client,arg); +} diff --git a/stream/stream.h b/stream/stream.h index 653dd6f330906a642d40bfaa284b398c3ff5ffe7..250318c88fed8119aa41fa8714ac844bdaf05842 100644 --- a/stream/stream.h +++ b/stream/stream.h @@ -26,20 +26,26 @@ #include <stdint.h> #include <pthread.h> #include <stdbool.h> +#include <ev.h> #include "stream_session.h" #include "stream_ch.h" +#include "dap_udp_server.h" #define CHUNK_SIZE_MAX 3*1024 struct dap_client_remote; +struct dap_udp_server_t; + struct dap_http_client; struct dap_http; struct stream; struct stream_pkt; #define STREAM_BUF_SIZE_MAX 10240 +#define STREAM_KEEPALIVE_TIMEOUT 3 // How often send keeplive messages (seconds) +#define STREAM_KEEPALIVE_PASSES 3 // How many messagges without answers need for disconnect client and close session typedef void (*stream_callback)(struct stream*,void*); @@ -51,8 +57,13 @@ typedef struct stream { struct dap_http_client * conn_http; // HTTP-specific + struct dap_udp_client * conn_udp; // UDP-client + bool is_live; + ev_timer keepalive_watcher; // Watcher for keepalive loop + uint8_t keepalive_passed; // Number of sended keepalive messages + struct stream_pkt * in_pkt; struct stream_pkt *pkt_buf_in; size_t pkt_buf_in_data_size; @@ -71,8 +82,11 @@ typedef struct stream { #define STREAM(a) ((stream_t *) (a)->_internal ) extern int stream_init(); + extern void stream_deinit(); -extern void stream_add_proc(struct dap_http * sh, const char * url); +extern void stream_add_proc_http(struct dap_http * sh, const char * url); + +extern void stream_add_proc_udp(dap_udp_server_t * sh); #endif diff --git a/stream/stream_ch.c b/stream/stream_ch.c index 0e378ba8e2191a10054c6f5bc7f1e8fb46b3e0c4..22bed6073deeb98ff3cf43ba898d0b5dfb109244 100644 --- a/stream/stream_ch.c +++ b/stream/stream_ch.c @@ -105,9 +105,12 @@ void stream_ch_set_ready_to_write(stream_ch_t * ch,bool is_ready) if(ch->ready_to_write!=is_ready){ //log_it(L_DEBUG,"Change channel '%c' to %s", (char) ch->proc->id, is_ready?"true":"false"); ch->ready_to_write=is_ready; - if(is_ready) + if(is_ready && ch->stream->conn_http) ch->stream->conn_http->state_write=DAP_HTTP_CLIENT_STATE_DATA; - dap_client_ready_to_write(ch->stream->conn,is_ready); + if(ch->stream->conn_udp) + dap_udp_client_ready_to_write(ch->stream->conn,is_ready); + else + dap_client_ready_to_write(ch->stream->conn,is_ready); } pthread_mutex_unlock(&ch->mutex); } diff --git a/stream/stream_ch.h b/stream/stream_ch.h index 2c00b1bb328eeac0458fbe729476869aeddd236f..91c50adfddfb7b09bfef221036dfd1727598b8ba 100644 --- a/stream/stream_ch.h +++ b/stream/stream_ch.h @@ -29,6 +29,9 @@ struct stream_pkt; struct stream_ch_proc; struct stream_ch; +#define SERVICE_CHANNEL_ID 's' +#define DATA_CHANNEL_ID 'd' + typedef void (*stream_ch_callback_t) (struct stream_ch*,void*); typedef struct stream_ch{ diff --git a/stream/stream_ch_pkt.c b/stream/stream_ch_pkt.c index 8e661db0ef9cb777fc0b87a95706d1b1912bda01..9a77d2bddc7890c5b60ca614cd510aec6df81d2e 100644 --- a/stream/stream_ch_pkt.c +++ b/stream/stream_ch_pkt.c @@ -113,3 +113,27 @@ size_t stream_ch_pkt_write_f(struct stream_ch * ch, uint8_t type, const char * s size_t ret=stream_ch_pkt_write(ch,type,buf,strlen(buf)); return ret; } + +/** + * @brief stream_ch_send_keepalive + * @param ch + * @return + */ +size_t stream_ch_send_keepalive(struct stream_ch * ch){ + pthread_mutex_lock( &ch->mutex); + + stream_ch_pkt_hdr_t hdr; + + memset(&hdr,0,sizeof(hdr)); + hdr.id = ch->proc->id; + hdr.size=0; + hdr.type=KEEPALIVE_PACKET; + hdr.enc_type = ch->proc->enc_type; + hdr.seq_id=0; + + memcpy(ch->buf,&hdr,sizeof(hdr) ); + + size_t ret=stream_pkt_write(ch->stream,ch->buf,sizeof(hdr)); + pthread_mutex_unlock( &ch->mutex); + return ret; +} diff --git a/stream/stream_ch_pkt.h b/stream/stream_ch_pkt.h index e2be229c2e0cb4f4048d97928202c03a03e44603..4e8fcb0b8290a181d9ee581627de10249f3dff4d 100644 --- a/stream/stream_ch_pkt.h +++ b/stream/stream_ch_pkt.h @@ -22,6 +22,8 @@ #ifndef _STREAM_CH_PKT_H_ #define _STREAM_CH_PKT_H_ +#define KEEPALIVE_PACKET 0x11 + #include <stdint.h> #include <stddef.h> struct stream_ch; @@ -47,4 +49,6 @@ extern size_t stream_ch_pkt_write_f(struct stream_ch * ch, uint8_t type, const c extern size_t stream_ch_pkt_write(struct stream_ch * ch, uint8_t type, const void * data, uint32_t data_size); extern size_t stream_ch_pkt_write_seq_id(struct stream_ch * ch, uint8_t type, uint64_t seq_id, const void * data, uint32_t data_size); +extern size_t stream_ch_send_keepalive(struct stream_ch * ch); + #endif diff --git a/stream/stream_pkt.c b/stream/stream_pkt.c index 24292f24a966879f5b822172e902cee1148ac5e1..373f7ff5b051f3d1df8cb6e535a5d7cbc16fee87 100644 --- a/stream/stream_pkt.c +++ b/stream/stream_pkt.c @@ -67,6 +67,13 @@ stream_pkt_t * stream_pkt_detect(void * data, uint32_t data_size) return ret; } +size_t encode_dummy(const void * buf, const size_t buf_size, void * buf_out){ + if(memcpy(buf_out,buf,buf_size) != NULL) + return buf_size; + else + return 0; +} + /** * @brief stream_pkt_read * @param sid @@ -75,7 +82,8 @@ stream_pkt_t * stream_pkt_detect(void * data, uint32_t data_size) */ size_t stream_pkt_read(struct stream * sid,struct stream_pkt * pkt, void * buf_out) { - size_t ds = enc_decode(sid->session->key,pkt->data,pkt->hdr.size,buf_out,DAP_ENC_DATA_TYPE_RAW); + //size_t ds = dap_enc_decode(sid->session->key,pkt->data,pkt->hdr.size,buf_out,DAP_ENC_DATA_TYPE_RAW); + size_t ds = encode_dummy(pkt->data,pkt->hdr.size,buf_out); // log_it(L_DEBUG,"Stream decoded %lu bytes ( last bytes 0x%02x 0x%02x 0x%02x 0x%02x ) ", ds, // *((uint8_t *)buf_out+ds-4),*((uint8_t *)buf_out+ds-3),*((uint8_t *)buf_out+ds-2),*((uint8_t *)buf_out+ds-1) // ); @@ -86,6 +94,8 @@ size_t stream_pkt_read(struct stream * sid,struct stream_pkt * pkt, void * buf_o return ds; } + + /** * @brief stream_ch_pkt_write * @param ch @@ -107,11 +117,29 @@ size_t stream_pkt_write(struct stream * sid, const void * data, uint32_t data_si memset(&pkt_hdr,0,sizeof(pkt_hdr)); memcpy(pkt_hdr.sig,dap_sig,sizeof(pkt_hdr.sig)); - pkt_hdr.size = enc_code(sid->session->key,data,data_size,sid->buf,DAP_ENC_DATA_TYPE_RAW); + //pkt_hdr.size = dap_enc_code(sid->session->key,data,data_size,sid->buf,DAP_ENC_DATA_TYPE_RAW); + pkt_hdr.size = encode_dummy(data,data_size,sid->buf); - ret+=dap_client_write(sid->conn,&pkt_hdr,sizeof(pkt_hdr)); - ret+=dap_client_write(sid->conn,sid->buf,pkt_hdr.size); + if(sid->conn_udp){ + ret+=dap_udp_client_write(sid->conn,&pkt_hdr,sizeof(pkt_hdr)); + ret+=dap_udp_client_write(sid->conn,sid->buf,pkt_hdr.size); + } + else{ + ret+=dap_client_write(sid->conn,&pkt_hdr,sizeof(pkt_hdr)); + ret+=dap_client_write(sid->conn,sid->buf,pkt_hdr.size); + } return ret; } +extern void stream_send_keepalive(struct stream * sid) +{ + for(int i=0;i<sid->channel_count;i++) + if(sid->channel[i]->proc){ + if(sid->channel[i]->proc->id == SERVICE_CHANNEL_ID) + stream_ch_send_keepalive(sid->channel[i]); + stream_ch_set_ready_to_write(sid->channel[i],true); + } +} + + diff --git a/stream/stream_pkt.h b/stream/stream_pkt.h index 77cfc63096569d924d5bfc0d6bdbe20fae2d44d7..f3bf3117ec903032bfdf9bb96828c35adc7d9165 100644 --- a/stream/stream_pkt.h +++ b/stream/stream_pkt.h @@ -26,12 +26,17 @@ #define STREAM_PKT_SIZE_MAX 100000 struct stream; +#define DATA_PACKET 0x00 +#define SERVICE_PACKET 0xff +#define KEEPALIVE_PACKET 0x11 + typedef struct stream_pkt_hdr{ uint8_t sig[8]; // Signature to find out beginning of the frame uint32_t size; - uint8_t TTL; - char s_addr[32]; // Source address ( vasya@domain.net ) - char d_addr[32]; // Destination address ( general#domain.net ) + uint64_t timestamp; + uint8_t type; // Packet type + uint64_t s_addr; // Source address ( vasya@domain.net ) + uint64_t d_addr; // Destination address ( general#domain.net ) } __attribute__((packed)) stream_pkt_hdr_t; typedef struct stream_pkt{ @@ -39,6 +44,13 @@ typedef struct stream_pkt{ uint8_t data[]; } __attribute__((packed)) stream_pkt_t; +typedef struct stream_srv_pkt{ + uint32_t session_id; + uint8_t enc_type; + uint32_t coockie; +} __attribute__((packed)) stream_srv_pkt_t; + + extern const uint8_t dap_sig[8]; extern stream_pkt_t * stream_pkt_detect(void * data, uint32_t data_size); @@ -47,4 +59,6 @@ extern size_t stream_pkt_read(struct stream * sid,struct stream_pkt * pkt, void extern size_t stream_pkt_write(struct stream * sid, const void * data, uint32_t data_size); +extern void stream_send_keepalive(struct stream * sid); + #endif