diff --git a/session/CMakeLists.txt b/session/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..6e15be24ab1b79d14230cc813b10d12fe4a6dde7 --- /dev/null +++ b/session/CMakeLists.txt @@ -0,0 +1,22 @@ +cmake_minimum_required(VERSION 2.8) +project (dapsession) + +set(SESSION_SRCS stream_session.c) +include_directories("${INCLUDE_DIRECTORIES} ${dapcore_INCLUDE_DIRS}") +include_directories("${INCLUDE_DIRECTORIES} ${dapcrypt_INCLUDE_DIRS}") +include_directories("${INCLUDE_DIRECTORIES} ${dapauth_INCLUDE_DIRS}") +#include_directories("${INCLUDE_DIRECTORIES} ${dapstream_INCLUDE_DIRS}") +include_directories("${INCLUDE_DIRECTORIES} ${dapsession_INCLUDE_DIRS}") +include_directories("${INCLUDE_DIRECTORIES} ${daphttp_INCLUDE_DIRS}") +add_definitions ("${dapcore_DEFINITIONS}") +add_definitions ("${dapcrypt_DEFINITIONS}") +add_definitions ("${dapauth_DEFINITIONS}") +add_definitions ("${daphttp_DEFINITIONS}") +#add_definitions ("${dapstream_DEFINITIONS}") +add_definitions ("${dapsession_DEFINITIONS}") +add_library(${PROJECT_NAME} STATIC ${SESSION_SRCS}) + + +set(${PROJECT_NAME}_DEFINITIONS CACHE INTERNAL "${PROJECT_NAME}: Definitions" FORCE) + +set(${PROJECT_NAME}_INCLUDE_DIRS ${PROJECT_SOURCE_DIR} CACHE INTERNAL "${PROJECT_NAME}: Include Directories" FORCE) diff --git a/session/stream_session.c b/session/stream_session.c new file mode 100644 index 0000000000000000000000000000000000000000..fb3dbf2d69d8f672c92e0c64c565c580453deebf --- /dev/null +++ b/session/stream_session.c @@ -0,0 +1,113 @@ +/* + Copyright (c) 2017-2018 (c) Project "DeM Labs Inc" https://github.com/demlabsinc + All rights reserved. + + This file is part of DAP (Deus Applications Prototypes) the open source project + + DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + DAP is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include <stdlib.h> +#include "common.h" +#include "stream_session.h" + +#define LOG_TAG "stream_session" + +stream_session_t * sessions=NULL; + +int stream_session_close2(stream_session_t * s); +static void * session_check(void * data); + +void stream_session_init() +{ + log_it(INFO,"[session] Init module"); + srand ( time(NULL) ); +} + +void stream_session_deinit() +{ + stream_session_t *current, *tmp; + log_it(INFO,"[session] Destroy everything"); + + HASH_ITER(hh, sessions, current, tmp) { + HASH_DEL(sessions,current); + stream_session_close2(current); + } +} + +static void * session_check(void * data) +{ + return NULL; +} + + +stream_session_t * stream_session_pure_new() +{ + stream_session_t * ret=NULL; + unsigned int session_id=0,session_id_new=0; + do{ + session_id_new=session_id=rand()+rand()*0x100+rand()*0x10000+rand()*0x01000000; + HASH_FIND_INT(sessions,&session_id_new,ret); + }while(ret); + log_it(INFO,"[session] Creating new with id %u",session_id); + ret=(stream_session_t*) calloc(1,sizeof(stream_session_t)); + pthread_mutex_init(&ret->mutex, NULL); + ret->id=session_id; + ret->time_created=time(NULL); + ret->create_empty=true; + ret->enc_type = 0x01; // Default encryption type + log_it(DEBUG,"[session] timestamp %u",(unsigned int) ret->time_created); + HASH_ADD_INT(sessions,id,ret); + return ret; +} + +stream_session_t * stream_session_new(unsigned int media_id, bool open_preview) +{ + stream_session_t * ret=stream_session_pure_new(); + ret->media_id=media_id; + ret->open_preview=open_preview; + ret->create_empty=false; + return ret; +} + +stream_session_t * stream_session_id(unsigned int id) +{ + stream_session_t * ret; + HASH_FIND_INT(sessions,&id,ret); + return ret; +} + + +int stream_session_close(unsigned int id) +{ + return stream_session_close2(stream_session_id(id)); +} + +int stream_session_close2(stream_session_t * s) +{ + log_it(INFO,"[session] Close"); + HASH_DEL(sessions,s); + free(s); + return 0; +} + +int stream_session_open(stream_session_t * ss) +{ + int ret; + pthread_mutex_lock(&ss->mutex); + ret=ss->opened; + if(ss->opened==0) ss->opened=1; + pthread_mutex_unlock(&ss->mutex); + return ret; +} diff --git a/session/stream_session.h b/session/stream_session.h new file mode 100644 index 0000000000000000000000000000000000000000..64519d3de735557783e2be3582daa1a5428c0e62 --- /dev/null +++ b/session/stream_session.h @@ -0,0 +1,68 @@ +/* + Copyright (c) 2017-2018 (c) Project "DeM Labs Inc" https://github.com/demlabsinc + All rights reserved. + + This file is part of DAP (Deus Applications Prototypes) the open source project + + DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + DAP is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef _STREAM_SESSION_H +#define _STREAM_SESSION_H +#include <pthread.h> +#include <time.h> +#include <stdbool.h> +#include <stdint.h> +#include <netinet/in.h> +#include "uthash.h" + +#include "enc_key.h" + +typedef enum stream_session_type {STREAM_SESSION_TYPE_MEDIA=0,STREAM_SESSION_TYPE_VPN} stream_session_type_t; +typedef enum stream_session_connection_type {STEAM_SESSION_HTTP = 0, STREAM_SESSION_UDP, STREAM_SESSION_END_TYPE} stream_session_connection_type_t; + +struct stream_session { + + bool create_empty; + unsigned int id; + unsigned int media_id; + + enc_key_t * key; + + bool open_preview; + pthread_mutex_t mutex; + int opened; + time_t time_created; + + uint8_t enc_type; + + stream_session_connection_type_t conn_type; + stream_session_type_t type; + UT_hash_handle hh; + + struct in_addr tun_client_addr; +}; +typedef struct stream_session stream_session_t; + +extern void stream_session_init(); +extern void stream_session_deinit(); + +extern stream_session_t * stream_session_pure_new(); +extern stream_session_t * stream_session_new(unsigned int media_id, bool open_preview); +extern stream_session_t * stream_session_id(unsigned int id); +extern int stream_session_open(stream_session_t * ss); /*Lock for opening for single client , return 0 if ok*/ +extern int stream_session_close(unsigned int id); + + +#endif diff --git a/stream/CMakeLists.txt b/stream/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..c29a4b5a5d623055ed4e99b078a7a72a38ec312f --- /dev/null +++ b/stream/CMakeLists.txt @@ -0,0 +1,25 @@ +cmake_minimum_required(VERSION 2.8) +project (dapstream) + +set(STREAM_SRCS stream.c stream_ch.c stream_ch_pkt.c stream_ch_proc.c stream_ctl.c stream_pkt.c ) + +include_directories("${INCLUDE_DIRECTORIES} ${dapcore_INCLUDE_DIRS}") +include_directories("${INCLUDE_DIRECTORIES} ${dapcrypt_INCLUDE_DIRS}") +include_directories("${INCLUDE_DIRECTORIES} ${dapauth_INCLUDE_DIRS}") +include_directories("${INCLUDE_DIRECTORIES} ${dapsession_INCLUDE_DIRS}") +include_directories("${INCLUDE_DIRECTORIES} ${dapdb_INCLUDE_DIRS}") +include_directories("${INCLUDE_DIRECTORIES} ${daphttp_INCLUDE_DIRS}") +include_directories("${INCLUDE_DIRECTORIES} ${dapdb_http_INCLUDE_DIRS}") +include_directories("${INCLUDE_DIRECTORIES} ${MONGO_INCLUDE_DIRS}") + +add_definitions ("${dapcore_DEFINITIONS}") +add_definitions ("${dapcrypt_DEFINITIONS}") +add_definitions ("${dapauth_DEFINITIONS}") +add_definitions ("${dapdb_DEFINITIONS}") +add_definitions ("${dapdb_http_DEFINITIONS}") +add_definitions ("${daphttp_DEFINITIONS}") +add_definitions ("${dapsession_DEFINITIONS}") + +add_library(${PROJECT_NAME} STATIC ${STREAM_SRCS}) +set(${PROJECT_NAME}_DEFINITIONS CACHE INTERNAL "${PROJECT_NAME}: Definitions" FORCE) +set(${PROJECT_NAME}_INCLUDE_DIRS ${PROJECT_SOURCE_DIR} CACHE INTERNAL "${PROJECT_NAME}: Include Directories" FORCE) diff --git a/stream/stream.c b/stream/stream.c new file mode 100644 index 0000000000000000000000000000000000000000..140b74e218b0c5fd4b33ba1ebb48c865551d0440 --- /dev/null +++ b/stream/stream.c @@ -0,0 +1,359 @@ +/* + Copyright (c) 2017-2018 (c) Project "DeM Labs Inc" https://github.com/demlabsinc + All rights reserved. + + This file is part of DAP (Deus Applications Prototypes) the open source project + + DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + DAP is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include <stdlib.h> +#include <stdio.h> +#include "config.h" +#include "common.h" + +#include "db_core.h" +#include "db_auth.h" +#include "stream.h" +#include "stream_pkt.h" +#include "stream_ch.h" +#include "stream_ch_proc.h" +#include "stream_ch_pkt.h" +#include "stream_session.h" + +#include "dap_client.h" +#include "dap_http.h" +#include "dap_http_client.h" +#include "dap_http_header.h" + + +#define LOG_TAG "stream" + +// Callbacks for HTTP client +void stream_headers_read(dap_http_client_t * sh, void * arg); // Prepare stream when all headers are read + +void stream_headers_write(dap_http_client_t * sh, void * arg); // Output headers +void stream_data_write(dap_http_client_t * sh, void * arg); // Write the data +void stream_data_read(dap_http_client_t * sh, void * arg); // Read the data + +// Internal functions +stream_t * stream_new(dap_http_client_t * sh); // Create new stream +void stream_delete(dap_http_client_t * sh, void * arg); + +/** + * @brief stream_init Init stream module + * @return 0 if ok others if not + */ +int stream_init() +{ + if( stream_ch_init() != 0 ){ + log_it(CRITICAL, "Can't init channel types submodule"); + return -1; + } + + log_it(NOTICE,"Init streaming module"); + return 0; +} + +/** + * @brief stream_media_deinit Deinint Stream module + */ +void stream_deinit() +{ + stream_ch_deinit(); +} + +/** + * @brief stream_add_proc Add URL processor callback for streaming + * @param sh HTTP server instance + * @param url URL + */ +void stream_add_proc(struct dap_http * sh, const char * url) +{ + dap_http_add_proc(sh,url,NULL,NULL,stream_delete,stream_headers_read,stream_headers_write,stream_data_read,stream_data_write,NULL); + //dap_http_add_proc(sh,url,NULL,NULL,NULL,stream_headers_read,stream_headers_write,NULL,stream_data_write,NULL); +} + +void stream_states_update(struct stream *sid) +{ + sid->conn_http->state_write=DAP_HTTP_CLIENT_STATE_START; + size_t i; + bool ready_to_write=false; + for(i=0;i<sid->channel_count; i++) + ready_to_write|=sid->channel[i]->ready_to_write; + dap_client_ready_to_write(sid->conn,ready_to_write); + + sid->conn_http->out_content_ready=true; +} + +void stream_headers_read(dap_http_client_t * cl_ht, void * arg) +{ + (void) arg; + + // char * raw=0; + // int raw_size; + unsigned int id=0; + + db_auth_info_t * ai = db_auth_info_by_cookie(cl_ht->in_cookie); + + if (ai == NULL) + ai = db_search_cookie_in_db(cl_ht->in_cookie); + + if(ai){ + log_it(DEBUG,"Prepare data stream"); + if(cl_ht->in_query_string[0]){ + log_it(INFO,"Query string [%s]",cl_ht->in_query_string); + if(sscanf(cl_ht->in_query_string,"fj913htmdgaq-d9hf=%u",&id)==1){ + stream_session_t * ss=NULL; + ss=stream_session_id(id); + if(ss==NULL){ + log_it(ERROR,"No session id %u was found",id); + cl_ht->reply_status_code=404; + strcpy(cl_ht->reply_reason_phrase,"Not found"); + }else{ + log_it(INFO,"Session id %u was found with media_id = %d",id,ss->media_id); + if(stream_session_open(ss)==0){ // Create new stream + stream_t * sid = stream_new(cl_ht); + sid->session=ss; + if(ss->create_empty){ + log_it(INFO, "Opened stream session with only technical channels"); + + cl_ht->reply_status_code=200; + strcpy(cl_ht->reply_reason_phrase,"OK"); + //cl_ht->state_write=DAP_HTTP_CLIENT_STATE_START; + //cl_ht->client->ready_to_write=true; + cl_ht->state_read=DAP_HTTP_CLIENT_STATE_DATA; + cl_ht->out_content_ready=true; + stream_ch_new(sid,'s'); + stream_ch_new(sid,'t'); + stream_states_update(sid); + dap_client_ready_to_read(cl_ht->client,true); + }else{ + stream_ch_new(sid,'s'); + stream_ch_new(sid,'g'); + + cl_ht->reply_status_code=200; + strcpy(cl_ht->reply_reason_phrase,"OK"); + cl_ht->state_read=DAP_HTTP_CLIENT_STATE_DATA; + dap_client_ready_to_read(cl_ht->client,true); + + stream_states_update(sid); + + } + }else{ + log_it(ERROR,"Can't open session id %u",id); + cl_ht->reply_status_code=404; + strcpy(cl_ht->reply_reason_phrase,"Not found"); + } + } + } + }else{ + log_it(ERROR,"No query string"); + } + }else{ + log_it(WARNING,"Not authorized connection"); + cl_ht->reply_status_code=505; + strcpy(cl_ht->reply_reason_phrase,"Not authorized"); + cl_ht->state_write=DAP_HTTP_CLIENT_STATE_START; + dap_client_ready_to_write(cl_ht->client,true); + } +} + +/** + * @brief stream_new Create new stream instance for HTTP client + * @return New stream_t instance + */ +stream_t * stream_new(dap_http_client_t * sh) +{ + stream_t * ret=(stream_t*) calloc(1,sizeof(stream_t)); + + ret->conn = sh->client; + ret->conn_http=sh; + + + sh->internal=ret; + + log_it(NOTICE,"New stream instance"); + return ret; +} + + +/** + * @brief stream_headers_write Prepare headers for output. Creates stream structure + * @param sh HTTP client instance + * @param arg Not used + */ +void stream_headers_write(dap_http_client_t * sh, void *arg) +{ + (void) arg; + if(sh->reply_status_code==200){ + stream_t *sid=STREAM(sh); + + dap_http_out_header_add(sh,"Content-Type","application/octet-stream"); + dap_http_out_header_add(sh,"Connnection","keep-alive"); + dap_http_out_header_add(sh,"Cache-Control","no-cache"); + + if(sid->stream_size>0) + dap_http_out_header_add_f(sh,"Content-Length","%u", (unsigned int) sid->stream_size ); + + sh->state_read=DAP_HTTP_CLIENT_STATE_DATA; + dap_client_ready_to_read(sh->client,true); + + } +} + + + +/** + * @brief stream_data_write HTTP data write callback + * @param sh HTTP client instance + * @param arg Not used + */ +void stream_data_write(dap_http_client_t * sh, void * arg) +{ + (void) arg; + + if(sh->reply_status_code==200){ + size_t i; + bool ready_to_write=false; + // log_it(DEBUG,"Process channels data output (%u channels)",STREAM(sh)->channel_count); + + for(i=0;i<STREAM(sh)->channel_count; i++){ + stream_ch_t * ch = STREAM(sh)->channel[i]; + if(ch->ready_to_write){ + ch->proc->packet_out_callback(ch,NULL); + ready_to_write|=ch->ready_to_write; + } + } + //log_it(DEBUG,"stream_data_out (ready_to_write=%s)", ready_to_write?"true":"false"); + + dap_client_ready_to_write(sh->client,ready_to_write); + //log_it(ERROR,"No stream_data_write_callback is defined"); + }else{ + log_it(WARNING, "Wrong request, reply status code is %u",sh->reply_status_code); + } +} + + +/** + * @brief stream_proc_pkt_in + * @param sid + */ +void stream_proc_pkt_in(stream_t * sid) +{ + // log_it(DEBUG,"Input: read last bytes for current packet (hdr.size=%u)",sid->pkt_buf_in-); + stream_ch_pkt_t * ch_pkt= (stream_ch_pkt_t*) calloc(1,sid->pkt_buf_in->hdr.size+sizeof(stream_ch_pkt_hdr_t)+16 ); + stream_pkt_read(sid,sid->pkt_buf_in, ch_pkt); + +// log_it (DEBUG, "Recieved channel packet with %lu of payload bytes (type '%c' id '%c')", +// ch_pkt->hdr.size,(char) ch_pkt->hdr.type, (char) ch_pkt->hdr.id); + stream_ch_t * ch = NULL; + size_t i; + for(i=0;i<sid->channel_count;i++) + if(sid->channel[i]->proc){ + if(sid->channel[i]->proc->id == ch_pkt->hdr.id ){ + ch=sid->channel[i]; + } + } + if(ch){ + ch->stat.bytes_read+=ch_pkt->hdr.size; + if(ch->proc) + if(ch->proc->packet_in_callback) + ch->proc->packet_in_callback(ch,ch_pkt); + + }else{ + log_it(WARNING, "Input: unprocessed channel packet id '%c'",(char) ch_pkt->hdr.id ); + } + free(sid->pkt_buf_in); + sid->pkt_buf_in=NULL; + sid->pkt_buf_in_data_size=0; + free(ch_pkt); + +} + +/** + * @brief stream_data_read HTTP data read callback. Read packet and passes that to the channel's callback + * @param sh HTTP client instance + * @param arg Processed number of bytes + */ + + + + +void stream_data_read(dap_http_client_t * sh, void * arg) +{ + + // log_it(DEBUG, "Stream data read %u bytes", sh->client->buf_in_size); + // log_it(DEBUG, "Stream data %s", sh->client->buf_in); + stream_t * sid =STREAM(sh); + int * ret = (int *) arg; + + if(sid->pkt_buf_in){ + size_t read_bytes_to=( ((sid->pkt_buf_in->hdr.size-sid->pkt_buf_in_data_size) > sid->conn->buf_in_size ) + ? sid->conn->buf_in_size + :(sid->pkt_buf_in->hdr.size-sid->pkt_buf_in_data_size)); + memcpy(sid->pkt_buf_in->data+sid->pkt_buf_in_data_size,sh->client->buf_in,read_bytes_to); + sid->pkt_buf_in_data_size+=read_bytes_to; + if(sid->pkt_buf_in_data_size>=(sid->pkt_buf_in->hdr.size) ){ + stream_proc_pkt_in(sid); + } + *ret+=read_bytes_to; + }else{ + stream_pkt_t * pkt; + while(pkt=stream_pkt_detect( sh->client->buf_in + *ret, (sh->client->buf_in_size - ((size_t) *ret) ))){ + size_t read_bytes_to=( (pkt->hdr.size+sizeof(stream_pkt_hdr_t)) > sid->conn->buf_in_size + ?sid->conn->buf_in_size + :(pkt->hdr.size+sizeof(stream_pkt_hdr_t) ) ); + if(read_bytes_to){ + sid->pkt_buf_in=(stream_pkt_t *) calloc(1,pkt->hdr.size+sizeof(stream_pkt_hdr_t)); + memcpy(sid->pkt_buf_in,pkt,read_bytes_to); + *ret = (*ret)+ read_bytes_to; + sid->pkt_buf_in_data_size=read_bytes_to-sizeof(stream_pkt_hdr_t); + if(read_bytes_to>=(pkt->hdr.size)){ + //log_it(INFO,"Input: read full packet (hdr.size=%u read_bytes_to=%u buf_in_size=%u)" + // ,sid->pkt_buf_in->hdr.size,read_bytes_to,sid->conn->buf_in_size); + stream_proc_pkt_in(sid); + }else{ + log_it(DEBUG,"Input: Not all stream packet in input (hdr.size=%u read_bytes_to=%u)",sid->pkt_buf_in->hdr.size,read_bytes_to); + } + return; + }else + break; + } + //log_it(WARNING,"Input: Not found signature in the incomming data"); + *ret += sh->client->buf_in_size; + } + +// log_it(DEBUG,"Stream read data from HTTP client: %u",sh->client->buf_in_size); +// if(sh->client->buf_in_size ) +} + + + +/** + * @brief stream_delete Delete stream and free its resources + * @param sid Stream id + */ +void stream_delete(dap_http_client_t * sh, void * arg) +{ + stream_t * sid = STREAM(sh); + if(sid == NULL) + return; + (void) arg; + size_t i; + for(i=0;i<sid->channel_count; i++) + stream_ch_delete(sid->channel[i]); + //free(sid); + log_it(NOTICE,"[core] Stream connection is finished"); +} diff --git a/stream/stream.h b/stream/stream.h new file mode 100644 index 0000000000000000000000000000000000000000..311807f0ae7cb07c92e0a6a6bb0ba0cddee980a3 --- /dev/null +++ b/stream/stream.h @@ -0,0 +1,78 @@ +/* + Copyright (c) 2017-2018 (c) Project "DeM Labs Inc" https://github.com/demlabsinc + All rights reserved. + + This file is part of DAP (Deus Applications Prototypes) the open source project + + DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + DAP is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef _STREAM_H +#define _STREAM_H +//#include <gst/gst.h> +#include <stdio.h> +#include <stdbool.h> +#include <stdint.h> +#include <pthread.h> +#include <stdbool.h> + +#include "stream_session.h" +#include "stream_ch.h" + + +#define CHUNK_SIZE_MAX 3*1024 + +struct dap_client; + +struct dap_http_client; +struct dap_http; +struct stream; +struct stream_pkt; +#define STREAM_BUF_SIZE_MAX 10240 + +typedef void (*stream_callback)(struct stream*,void*); + +typedef struct stream { + + int id; + stream_session_t * session; + struct dap_client * conn; // Connection + + struct dap_http_client * conn_http; // HTTP-specific + + bool is_live; + + struct stream_pkt * in_pkt; + struct stream_pkt *pkt_buf_in; + size_t pkt_buf_in_data_size; + + uint8_t buf[500000]; + + stream_ch_t * channel[255]; // TODO reduce channels to 16 to economy memory + size_t channel_count; + + size_t frame_sent; // Frame counter + + size_t stream_size; + +} stream_t; + +#define STREAM(a) ((stream_t *) (a)->internal ) + +extern int stream_init(); +extern void stream_deinit(); + +extern void stream_add_proc(struct dap_http * sh, const char * url); + +#endif diff --git a/stream/stream_ch.c b/stream/stream_ch.c new file mode 100644 index 0000000000000000000000000000000000000000..a2da40a50b48cca254b39f2bb92fbdff6d0027b1 --- /dev/null +++ b/stream/stream_ch.c @@ -0,0 +1,113 @@ +/* + Copyright (c) 2017-2018 (c) Project "DeM Labs Inc" https://github.com/demlabsinc + All rights reserved. + + This file is part of DAP (Deus Applications Prototypes) the open source project + + DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + DAP is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "common.h" + +#include "dap_client.h" +#include "dap_http_client.h" + +#include "stream.h" +#include "stream_ch.h" +#include "stream_ch_proc.h" +#include "stream_ch_pkt.h" + +#define LOG_TAG "stream_ch" + +/** + * @brief stream_ch_init Init stream channel module + * @return Zero if ok others if no + */ +int stream_ch_init() +{ + if(stream_ch_proc_init() != 0 ){ + log_it(CRITICAL,"Can't init stream channel proc submodule"); + return -1; + } + if(stream_ch_pkt_init() != 0 ){ + log_it(CRITICAL,"Can't init stream channel packet submodule"); + return -1; + } + log_it(NOTICE,"Module stream channel initialized"); + return 0; +} + +/** + * @brief stream_ch_deinit Destroy stream channel submodule + */ +void stream_ch_deinit() +{ +} + +/** + * @brief stream_ch_new Creates new stream channel instance + * @param direction Direction of channel (input to the server, output to the client) + * @return + */ +stream_ch_t* stream_ch_new(struct stream* stream,uint8_t id) +{ + stream_ch_proc_t * proc=stream_ch_proc_find(id); + if(proc){ + stream_ch_t * ret= CALLOC(stream_ch_t); + ret->stream=stream; + ret->proc=proc; + ret->ready_to_read=true; + ret->stream->channel[ret->stream->channel_count]=ret; + ret->stream->channel_count++; + pthread_mutex_init(&(ret->mutex),NULL); + if(ret->proc->new_callback) + ret->proc->new_callback(ret,NULL); + return ret; + }else{ + log_it(WARNING, "Unknown stream processor with id %uc",id); + return NULL; + } +} + +/** + * @brief stream_ch_delete Delete channel instance + * @param ch Channel delete + */ +void stream_ch_delete(stream_ch_t*ch) +{ + if(ch->proc) + if(ch->proc->delete_callback) + ch->proc->delete_callback(ch,NULL); + + pthread_mutex_destroy(&(ch->mutex)); + + if(ch->internal){ + free(ch->internal); + } + //free(ch); +} + + +void stream_ch_set_ready_to_write(stream_ch_t * ch,bool is_ready) +{ + pthread_mutex_lock(&ch->mutex); + if(ch->ready_to_write!=is_ready){ + //log_it(DEBUG,"Change channel '%c' to %s", (char) ch->proc->id, is_ready?"true":"false"); + ch->ready_to_write=is_ready; + if(is_ready) + ch->stream->conn_http->state_write=DAP_HTTP_CLIENT_STATE_DATA; + dap_client_ready_to_write(ch->stream->conn,is_ready); + } + pthread_mutex_unlock(&ch->mutex); +} diff --git a/stream/stream_ch.h b/stream/stream_ch.h new file mode 100644 index 0000000000000000000000000000000000000000..2c00b1bb328eeac0458fbe729476869aeddd236f --- /dev/null +++ b/stream/stream_ch.h @@ -0,0 +1,60 @@ +/* + Copyright (c) 2017-2018 (c) Project "DeM Labs Inc" https://github.com/demlabsinc + All rights reserved. + + This file is part of DAP (Deus Applications Prototypes) the open source project + + DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + DAP is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef _STREAM_CH_H +#define _STREAM_CH_H +#include <stdbool.h> +#include <pthread.h> +#include <stdint.h> + +struct stream; +struct stream_pkt; +struct stream_ch_proc; +struct stream_ch; + +typedef void (*stream_ch_callback_t) (struct stream_ch*,void*); + +typedef struct stream_ch{ + pthread_mutex_t mutex; + bool ready_to_write; + bool ready_to_read; + struct stream * stream; + + struct{ + uint64_t bytes_write; + uint64_t bytes_read; + } stat; + + uint8_t buf[500000]; + + struct stream_ch_proc * proc; + void * internal; // Internal structure, GStreamer for example +} stream_ch_t; + +extern int stream_ch_init(); +extern void stream_ch_deinit(); + +extern stream_ch_t* stream_ch_new(struct stream*stream,uint8_t id); + +extern void stream_ch_set_ready_to_write(stream_ch_t * ch,bool is_ready); + +extern void stream_ch_delete(stream_ch_t*ch); + +#endif diff --git a/stream/stream_ch_pkt.c b/stream/stream_ch_pkt.c new file mode 100644 index 0000000000000000000000000000000000000000..4f6cd00f965a13717ab1081fc80092b478c6467c --- /dev/null +++ b/stream/stream_ch_pkt.c @@ -0,0 +1,115 @@ +/* + Copyright (c) 2017-2018 (c) Project "DeM Labs Inc" https://github.com/demlabsinc + All rights reserved. + + This file is part of DAP (Deus Applications Prototypes) the open source project + + DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + DAP is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include <stdio.h> +#include <stdarg.h> + +#include "common.h" +#include "enc.h" +#include "enc_key.h" + +#include "dap_client.h" +#include "stream.h" +#include "stream_ch.h" +#include "stream_ch_pkt.h" +#include "stream_ch_proc.h" +#include "stream_pkt.h" +#define LOG_TAG "stream_ch_pkt" + +/** + * @brief stream_ch_pkt_init + * @return Zero if ok + */ +int stream_ch_pkt_init() +{ + + return 0; +} + +void stream_ch_pkt_deinit() +{ + +} + + +/** + * @brief stream_ch_pkt_write + * @param sid + * @param data + * @param data_size + * @return + */ +size_t stream_ch_pkt_write_seq_id(struct stream_ch * ch, uint8_t type, uint64_t seq_id, const void * data, uint32_t data_size) +{ + pthread_mutex_lock( &ch->mutex); + + //log_it(DEBUG,"Output: Has %u bytes of %c type for %c channel id",data_size, (char)type, (char) ch->proc->id ); + + stream_ch_pkt_hdr_t hdr; + + memset(&hdr,0,sizeof(hdr)); + hdr.id = ch->proc->id; + hdr.size=data_size; + hdr.type=type; + hdr.enc_type = ch->proc->enc_type; + hdr.seq_id=seq_id; + + if(data_size+sizeof(hdr)> sizeof(ch->buf) ){ + log_it(ERROR,"Too big data size %lu, bigger than encryption buffer size %lu",data_size,sizeof(ch->buf)); + data_size=sizeof(ch->buf)-sizeof(hdr); + } + memcpy(ch->buf,&hdr,sizeof(hdr) ); + memcpy(ch->buf+sizeof(hdr),data,data_size ); + + size_t ret=stream_pkt_write(ch->stream,ch->buf,data_size+sizeof(hdr)); + ch->stat.bytes_write+=data_size; + pthread_mutex_unlock( &ch->mutex); + return ret; + +} + +/** + * @brief stream_ch_pkt_write + * @param sid + * @param data + * @param data_size + * @return + */ +size_t stream_ch_pkt_write(struct stream_ch * ch, uint8_t type, const void * data, uint32_t data_size) +{ + return stream_ch_pkt_write_seq_id(ch,type,0,data,data_size); +} + +/** + * @brief stream_ch_pkt_write_str + * @param sid + * @param str + * @return + */ +size_t stream_ch_pkt_write_f(struct stream_ch * ch, uint8_t type, const char * str,...) +{ + char buf[4096]; + va_list ap; + va_start(ap,str); + vsnprintf(buf,sizeof(buf),str,ap); + va_end(ap); + size_t ret=stream_ch_pkt_write(ch,type,buf,strlen(buf)); + return ret; +} diff --git a/stream/stream_ch_pkt.h b/stream/stream_ch_pkt.h new file mode 100644 index 0000000000000000000000000000000000000000..e2be229c2e0cb4f4048d97928202c03a03e44603 --- /dev/null +++ b/stream/stream_ch_pkt.h @@ -0,0 +1,50 @@ +/* + Copyright (c) 2017-2018 (c) Project "DeM Labs Inc" https://github.com/demlabsinc + All rights reserved. + + This file is part of DAP (Deus Applications Prototypes) the open source project + + DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + DAP is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. +*/ + + +#ifndef _STREAM_CH_PKT_H_ +#define _STREAM_CH_PKT_H_ + +#include <stdint.h> +#include <stddef.h> +struct stream_ch; +typedef struct stream_ch_pkt_hdr{ + uint8_t id; // Channel id + uint8_t enc_type; // Zero if not encrypted + uint8_t type; // general, command, info, signal and etc + uint8_t padding; + uint64_t seq_id; // Sequence id or position id +// uint64_t seq + uint32_t size; +} __attribute__((packed)) stream_ch_pkt_hdr_t; + +typedef struct stream_ch_pkt{ + stream_ch_pkt_hdr_t hdr; + uint8_t data[]; +} __attribute__((packed)) stream_ch_pkt_t; + +extern int stream_ch_pkt_init(); +extern void stream_ch_pkt_deinit(); + +extern size_t stream_ch_pkt_write_f(struct stream_ch * ch, uint8_t type, const char * str,...); +extern size_t stream_ch_pkt_write(struct stream_ch * ch, uint8_t type, const void * data, uint32_t data_size); +extern size_t stream_ch_pkt_write_seq_id(struct stream_ch * ch, uint8_t type, uint64_t seq_id, const void * data, uint32_t data_size); + +#endif diff --git a/stream/stream_ch_proc.c b/stream/stream_ch_proc.c new file mode 100644 index 0000000000000000000000000000000000000000..b7f443681649f3794f3d3c1bcc3a8942123138aa --- /dev/null +++ b/stream/stream_ch_proc.c @@ -0,0 +1,72 @@ +/* + Copyright (c) 2017-2018 (c) Project "DeM Labs Inc" https://github.com/demlabsinc + All rights reserved. + + This file is part of DAP (Deus Applications Prototypes) the open source project + + DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + DAP is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "common.h" +#include "stream_ch_proc.h" + + +#define LOG_TAG "stream_ch_type" + +stream_ch_proc_t proc[256]={0}; + +/** + * @brief stream_ch_type_init Initialize stream channels type module + * @return 0 if ok others if no + */ +int stream_ch_proc_init() +{ + log_it(NOTICE, "Module stream channel types initialized"); + return 0; +} + +void stream_ch_proc_deinit() +{ + +} + + +/** + * @brief stream_ch_proc_add + * @param id + * @param delete_callback + * @param packet_in_callback + * @param packet_out_callback + */ +void stream_ch_proc_add(uint8_t id,stream_ch_callback_t new_callback,stream_ch_callback_t delete_callback, + stream_ch_callback_t packet_in_callback, + stream_ch_callback_t packet_out_callback + ) +{ + proc[id].id=id; + proc[id].new_callback=new_callback; + proc[id].delete_callback=delete_callback; + proc[id].packet_in_callback=packet_in_callback; + proc[id].packet_out_callback=packet_out_callback; +} + +/** + * @brief stream_ch_proc_find + * @param id + * @return + */ +stream_ch_proc_t* stream_ch_proc_find(uint8_t id) +{ + return proc+id; +} diff --git a/stream/stream_ch_proc.h b/stream/stream_ch_proc.h new file mode 100644 index 0000000000000000000000000000000000000000..c154097098992dd62d66884d42dd31ccff5f2d8f --- /dev/null +++ b/stream/stream_ch_proc.h @@ -0,0 +1,50 @@ +/* + Copyright (c) 2017-2018 (c) Project "DeM Labs Inc" https://github.com/demlabsinc + All rights reserved. + + This file is part of DAP (Deus Applications Prototypes) the open source project + + DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + DAP is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef _STREAM_CH_TYPE_H_ +#define _STREAM_CH_TYPE_H_ + +#include <stdint.h> +#include "stream_ch.h" + + +typedef struct stream_ch_proc{ + uint8_t id; // Channel type id + uint8_t enc_type; // Encryption type + + stream_ch_callback_t new_callback; + stream_ch_callback_t delete_callback; + + stream_ch_callback_t packet_in_callback; + stream_ch_callback_t packet_out_callback; + void * internal; +} stream_ch_proc_t; + +extern int stream_ch_proc_init(); +extern void stream_ch_proc_deinit(); + +extern void stream_ch_proc_add(uint8_t id, + stream_ch_callback_t new_callback, stream_ch_callback_t delete_callback, + stream_ch_callback_t packet_in_callback, + stream_ch_callback_t packet_out_callback + ); +extern stream_ch_proc_t* stream_ch_proc_find(uint8_t id); + +#endif diff --git a/stream/stream_ctl.c b/stream/stream_ctl.c new file mode 100644 index 0000000000000000000000000000000000000000..bf075d0b38468bd2b6d0219a1c7068b8df8c7a3d --- /dev/null +++ b/stream/stream_ctl.c @@ -0,0 +1,172 @@ +/* + Copyright (c) 2017-2018 (c) Project "DeM Labs Inc" https://github.com/demlabsinc + All rights reserved. + + This file is part of DAP (Deus Applications Prototypes) the open source project + + DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + DAP is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include <stdbool.h> +#include <string.h> +#include "common.h" + +#include "stream.h" + +#include "enc_http.h" + +#include "dap_http.h" +#include "dap_http_client.h" +#include "dap_client.h" +#include "dap_http_simple.h" + +#include "stream_session.h" +#include "stream_ctl.h" + +#define LOG_TAG "stream_ctl" + +const char* connection_type_str[] = +{ + [STEAM_SESSION_HTTP] = "http", + [STREAM_SESSION_UDP] = "udp" +}; + +#define DAPMP_VERSION 13 +bool stream_check_proto_version(unsigned int ver); +void stream_ctl_proc(struct dap_http_simple *cl_st, void * arg); + +/** + * @brief stream_ctl_init Initialize stream control module + * @return Zero if ok others if not + */ +int stream_ctl_init() +{ + log_it(NOTICE,"Initialized stream control module"); + return 0; +} + +/** + * @brief stream_ctl_deinit Deinit stream control module + */ +void stream_ctl_deinit() +{ + +} + +/** + * @brief stream_ctl_add_proc Add stream control url processor + * @param sh HTTP server instance + * @param url URL string + */ +void stream_ctl_add_proc(struct dap_http * sh, const char * url) +{ + dap_http_simple_proc_add(sh,url,4096,stream_ctl_proc); +} + + +/** + * @brief stream_ctl_headers_read Process CTL request + * @param cl_st HTTP server instance + * @param arg Not used + */ +void stream_ctl_proc(struct dap_http_simple *cl_st, void * arg) +{ + bool * isOk = (bool *) arg; + + unsigned int db_id=0; + // unsigned int proto_version; + stream_session_t * ss=NULL; + // unsigned int action_cmd=0; + bool openPreview; + bool socket_forward=false; + + enc_http_delegate_t *dg = enc_http_request_decode(cl_st); + + if(dg){ + if(strcmp(dg->url_path,"open")==0) + openPreview=false; + else if (strcmp(dg->url_path,"open_preview")==0) + openPreview=true; + else if (strcmp(dg->url_path,"socket_forward")==0){ + socket_forward=true; + }else{ + log_it(ERROR,"ctl command unknown: %s",dg->url_path); + enc_http_delegate_delete(dg); + *isOk=false; + return; + } + if(socket_forward){ + log_it(INFO,"[ctl] Play request for db_id=%d",db_id); + ss=stream_session_pure_new(); + + char key_str[255]; + for(int i = 0; i < sizeof(key_str); i++) + key_str[i] = 65 + rand() % 25; + + ss->key=enc_key_create(key_str,ENC_KEY_TYPE_AES); + enc_http_reply_f(dg,"%u %s",ss->id,key_str); + dg->isOk=true; + // log_it(DEBUG,"Stream AES key string %s",key_str); + + }else if(sscanf( dg->in_query ,"db_id=%u",&db_id)==1){ + log_it(INFO,"[ctl] Play request for db_id=%d",db_id); + ss=stream_session_new(db_id,openPreview); + + char key_str[255]; + for(int i = 0; i < sizeof(key_str); i++) + key_str[i] = 65 + rand() % 25; + + ss->key=enc_key_create(key_str,ENC_KEY_TYPE_AES); + enc_http_reply_f(dg,"%u %s",ss->id,key_str); + dg->isOk=true; + log_it(DEBUG,"Stream AES key string %s",key_str); + }else{ + log_it(ERROR,"Wrong request: \"%s\"",dg->in_query); + dg->isOk=false; + } + *isOk=dg->isOk; + + unsigned int conn_t = 0; + char *ct_str = strstr(dg->in_query, "connection_type"); + if (ct_str) + { + sscanf(ct_str, "connection_type=%u", &conn_t); + if (conn_t < 0 || conn_t >= STREAM_SESSION_END_TYPE) + { + log_it(WARNING,"Error connection type : %i",conn_t); + conn_t = STEAM_SESSION_HTTP; + } + + if (ss) + { + ss->conn_type = conn_t; + } + + } + + log_it(INFO,"setup connection_type: %s", connection_type_str[conn_t]); + + enc_http_reply_encode(cl_st,dg); + enc_http_delegate_delete(dg); + }else{ + log_it(ERROR,"No encryption layer was initialized well"); + *isOk=false; + } +} + + +bool stream_check_proto_version(unsigned int ver) +{ + return ver<=DAPMP_VERSION; +} diff --git a/stream/stream_ctl.h b/stream/stream_ctl.h new file mode 100644 index 0000000000000000000000000000000000000000..8f4a15cee01b4a781f5ca8a5e482f47bb916ec3c --- /dev/null +++ b/stream/stream_ctl.h @@ -0,0 +1,32 @@ +/* + Copyright (c) 2017-2018 (c) Project "DeM Labs Inc" https://github.com/demlabsinc + All rights reserved. + + This file is part of DAP (Deus Applications Prototypes) the open source project + + DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + DAP is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef _STREAM_CTL_H +#define _STREAM_CTL_H + +struct dap_http; + +extern int stream_ctl_init(); +extern void stream_ctl_deinit(); + + +extern void stream_ctl_add_proc(struct dap_http * sh, const char * url); + +#endif diff --git a/stream/stream_pkt.c b/stream/stream_pkt.c new file mode 100644 index 0000000000000000000000000000000000000000..01746e0136e7461ee1c2b0b260ac51cc37ecb7be --- /dev/null +++ b/stream/stream_pkt.c @@ -0,0 +1,117 @@ +/* + Copyright (c) 2017-2018 (c) Project "DeM Labs Inc" https://github.com/demlabsinc + All rights reserved. + + This file is part of DAP (Deus Applications Prototypes) the open source project + + DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + DAP is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include <stdint.h> +#include <string.h> +#include "common.h" +#include "config.h" + + +#include "dap_client.h" +#include "dap_http_client.h" + +#include "enc.h" +#include "enc_key.h" + +#include "stream.h" +#include "stream_pkt.h" +#include "stream_ch.h" +#include "stream_ch_pkt.h" +#include "stream_ch_proc.h" + + +#define LOG_TAG "stream_pkt" + + + +const size_t dap_hdr_size=8+2+1+1+4; +const uint8_t dap_sig[8]={0xa0,0x95,0x96,0xa9,0x9e,0x5c,0xfb,0xfa}; + + +stream_pkt_t * stream_pkt_detect(void * data, uint32_t data_size) +{ + void * sig_start=data; + stream_pkt_t * ret=NULL; + uint32_t length_left=data_size; + while(sig_start=memchr(sig_start, dap_sig[0],length_left) ){ + length_left= data_size-( sig_start-data); + if(length_left < sizeof(dap_sig) ) + break; + if(memcmp(sig_start,dap_sig,sizeof(dap_sig))==0){ + ret=sig_start; + if(ret->hdr.size > STREAM_PKT_SIZE_MAX ){ + log_it(ERROR, "Too big packet size %u",ret->hdr.size); + ret=NULL; + } + break; + }else + sig_start+=1; + } + return ret; +} + +/** + * @brief stream_pkt_read + * @param sid + * @param pkt + * @param buf_out + */ +size_t stream_pkt_read(struct stream * sid,struct stream_pkt * pkt, void * buf_out) +{ + size_t ds = enc_decode(sid->session->key,pkt->data,pkt->hdr.size,buf_out,ENC_DATA_TYPE_RAW); +// log_it(DEBUG,"Stream decoded %lu bytes ( last bytes 0x%02x 0x%02x 0x%02x 0x%02x ) ", ds, +// *((uint8_t *)buf_out+ds-4),*((uint8_t *)buf_out+ds-3),*((uint8_t *)buf_out+ds-2),*((uint8_t *)buf_out+ds-1) +// ); +// size_t mv=35; +// log_it(DEBUG,"(Decoded bytes with mv %lu bytes 0x%02x 0x%02x 0x%02x 0x%02x ) ", mv, +// *((uint8_t *)buf_out+mv-4),*((uint8_t *)buf_out+mv-3),*((uint8_t *)buf_out+mv-2),*((uint8_t *)buf_out+mv-1) +// ); + return ds; +} + +/** + * @brief stream_ch_pkt_write + * @param ch + * @param data + * @param data_size + * @return + */ + +size_t stream_pkt_write(struct stream * sid, const void * data, uint32_t data_size) +{ + size_t ret=0; + stream_pkt_hdr_t pkt_hdr; + + if(data_size> sizeof(sid->buf) ){ + log_it(ERROR,"Too big data size %lu, bigger than encryption buffer size %lu",data_size,sizeof(sid->buf)); + data_size=sizeof(sid->buf); + } + + memset(&pkt_hdr,0,sizeof(pkt_hdr)); + memcpy(pkt_hdr.sig,dap_sig,sizeof(pkt_hdr.sig)); + + pkt_hdr.size = enc_code(sid->session->key,data,data_size,sid->buf,ENC_DATA_TYPE_RAW); + + ret+=dap_client_write(sid->conn,&pkt_hdr,sizeof(pkt_hdr)); + ret+=dap_client_write(sid->conn,sid->buf,pkt_hdr.size); + return ret; +} + + diff --git a/stream/stream_pkt.h b/stream/stream_pkt.h new file mode 100644 index 0000000000000000000000000000000000000000..77cfc63096569d924d5bfc0d6bdbe20fae2d44d7 --- /dev/null +++ b/stream/stream_pkt.h @@ -0,0 +1,50 @@ +/* + Copyright (c) 2017-2018 (c) Project "DeM Labs Inc" https://github.com/demlabsinc + All rights reserved. + + This file is part of DAP (Deus Applications Prototypes) the open source project + + DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + DAP is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef _STREAM_PKT_H_ +#define _STREAM_PKT_H_ +//#include <gst/gst.h> +#include <stdint.h> + +#define STREAM_PKT_SIZE_MAX 100000 +struct stream; + +typedef struct stream_pkt_hdr{ + uint8_t sig[8]; // Signature to find out beginning of the frame + uint32_t size; + uint8_t TTL; + char s_addr[32]; // Source address ( vasya@domain.net ) + char d_addr[32]; // Destination address ( general#domain.net ) +} __attribute__((packed)) stream_pkt_hdr_t; + +typedef struct stream_pkt{ + stream_pkt_hdr_t hdr; + uint8_t data[]; +} __attribute__((packed)) stream_pkt_t; + +extern const uint8_t dap_sig[8]; + +extern stream_pkt_t * stream_pkt_detect(void * data, uint32_t data_size); + +extern size_t stream_pkt_read(struct stream * sid,struct stream_pkt * pkt, void * buf_out); + +extern size_t stream_pkt_write(struct stream * sid, const void * data, uint32_t data_size); + +#endif