Skip to content
Snippets Groups Projects
Commit 913b11e5 authored by Dmitriy Gerasimov's avatar Dmitriy Gerasimov
Browse files

[*] Init commit

parent 13dc9dc4
No related branches found
No related tags found
1 merge request!24Support 3689
cmake_minimum_required(VERSION 2.8)
project (dapsession)
set(SESSION_SRCS stream_session.c)
include_directories("${INCLUDE_DIRECTORIES} ${dapcore_INCLUDE_DIRS}")
include_directories("${INCLUDE_DIRECTORIES} ${dapcrypt_INCLUDE_DIRS}")
include_directories("${INCLUDE_DIRECTORIES} ${dapauth_INCLUDE_DIRS}")
#include_directories("${INCLUDE_DIRECTORIES} ${dapstream_INCLUDE_DIRS}")
include_directories("${INCLUDE_DIRECTORIES} ${dapsession_INCLUDE_DIRS}")
include_directories("${INCLUDE_DIRECTORIES} ${daphttp_INCLUDE_DIRS}")
add_definitions ("${dapcore_DEFINITIONS}")
add_definitions ("${dapcrypt_DEFINITIONS}")
add_definitions ("${dapauth_DEFINITIONS}")
add_definitions ("${daphttp_DEFINITIONS}")
#add_definitions ("${dapstream_DEFINITIONS}")
add_definitions ("${dapsession_DEFINITIONS}")
add_library(${PROJECT_NAME} STATIC ${SESSION_SRCS})
set(${PROJECT_NAME}_DEFINITIONS CACHE INTERNAL "${PROJECT_NAME}: Definitions" FORCE)
set(${PROJECT_NAME}_INCLUDE_DIRS ${PROJECT_SOURCE_DIR} CACHE INTERNAL "${PROJECT_NAME}: Include Directories" FORCE)
/*
Copyright (c) 2017-2018 (c) Project "DeM Labs Inc" https://github.com/demlabsinc
All rights reserved.
This file is part of DAP (Deus Applications Prototypes) the open source project
DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
DAP is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with any DAP based project. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdlib.h>
#include "common.h"
#include "stream_session.h"
#define LOG_TAG "stream_session"
stream_session_t * sessions=NULL;
int stream_session_close2(stream_session_t * s);
static void * session_check(void * data);
void stream_session_init()
{
log_it(INFO,"[session] Init module");
srand ( time(NULL) );
}
void stream_session_deinit()
{
stream_session_t *current, *tmp;
log_it(INFO,"[session] Destroy everything");
HASH_ITER(hh, sessions, current, tmp) {
HASH_DEL(sessions,current);
stream_session_close2(current);
}
}
static void * session_check(void * data)
{
return NULL;
}
stream_session_t * stream_session_pure_new()
{
stream_session_t * ret=NULL;
unsigned int session_id=0,session_id_new=0;
do{
session_id_new=session_id=rand()+rand()*0x100+rand()*0x10000+rand()*0x01000000;
HASH_FIND_INT(sessions,&session_id_new,ret);
}while(ret);
log_it(INFO,"[session] Creating new with id %u",session_id);
ret=(stream_session_t*) calloc(1,sizeof(stream_session_t));
pthread_mutex_init(&ret->mutex, NULL);
ret->id=session_id;
ret->time_created=time(NULL);
ret->create_empty=true;
ret->enc_type = 0x01; // Default encryption type
log_it(DEBUG,"[session] timestamp %u",(unsigned int) ret->time_created);
HASH_ADD_INT(sessions,id,ret);
return ret;
}
stream_session_t * stream_session_new(unsigned int media_id, bool open_preview)
{
stream_session_t * ret=stream_session_pure_new();
ret->media_id=media_id;
ret->open_preview=open_preview;
ret->create_empty=false;
return ret;
}
stream_session_t * stream_session_id(unsigned int id)
{
stream_session_t * ret;
HASH_FIND_INT(sessions,&id,ret);
return ret;
}
int stream_session_close(unsigned int id)
{
return stream_session_close2(stream_session_id(id));
}
int stream_session_close2(stream_session_t * s)
{
log_it(INFO,"[session] Close");
HASH_DEL(sessions,s);
free(s);
return 0;
}
int stream_session_open(stream_session_t * ss)
{
int ret;
pthread_mutex_lock(&ss->mutex);
ret=ss->opened;
if(ss->opened==0) ss->opened=1;
pthread_mutex_unlock(&ss->mutex);
return ret;
}
/*
Copyright (c) 2017-2018 (c) Project "DeM Labs Inc" https://github.com/demlabsinc
All rights reserved.
This file is part of DAP (Deus Applications Prototypes) the open source project
DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
DAP is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with any DAP based project. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _STREAM_SESSION_H
#define _STREAM_SESSION_H
#include <pthread.h>
#include <time.h>
#include <stdbool.h>
#include <stdint.h>
#include <netinet/in.h>
#include "uthash.h"
#include "enc_key.h"
typedef enum stream_session_type {STREAM_SESSION_TYPE_MEDIA=0,STREAM_SESSION_TYPE_VPN} stream_session_type_t;
typedef enum stream_session_connection_type {STEAM_SESSION_HTTP = 0, STREAM_SESSION_UDP, STREAM_SESSION_END_TYPE} stream_session_connection_type_t;
struct stream_session {
bool create_empty;
unsigned int id;
unsigned int media_id;
enc_key_t * key;
bool open_preview;
pthread_mutex_t mutex;
int opened;
time_t time_created;
uint8_t enc_type;
stream_session_connection_type_t conn_type;
stream_session_type_t type;
UT_hash_handle hh;
struct in_addr tun_client_addr;
};
typedef struct stream_session stream_session_t;
extern void stream_session_init();
extern void stream_session_deinit();
extern stream_session_t * stream_session_pure_new();
extern stream_session_t * stream_session_new(unsigned int media_id, bool open_preview);
extern stream_session_t * stream_session_id(unsigned int id);
extern int stream_session_open(stream_session_t * ss); /*Lock for opening for single client , return 0 if ok*/
extern int stream_session_close(unsigned int id);
#endif
cmake_minimum_required(VERSION 2.8)
project (dapstream)
set(STREAM_SRCS stream.c stream_ch.c stream_ch_pkt.c stream_ch_proc.c stream_ctl.c stream_pkt.c )
include_directories("${INCLUDE_DIRECTORIES} ${dapcore_INCLUDE_DIRS}")
include_directories("${INCLUDE_DIRECTORIES} ${dapcrypt_INCLUDE_DIRS}")
include_directories("${INCLUDE_DIRECTORIES} ${dapauth_INCLUDE_DIRS}")
include_directories("${INCLUDE_DIRECTORIES} ${dapsession_INCLUDE_DIRS}")
include_directories("${INCLUDE_DIRECTORIES} ${dapdb_INCLUDE_DIRS}")
include_directories("${INCLUDE_DIRECTORIES} ${daphttp_INCLUDE_DIRS}")
include_directories("${INCLUDE_DIRECTORIES} ${dapdb_http_INCLUDE_DIRS}")
include_directories("${INCLUDE_DIRECTORIES} ${MONGO_INCLUDE_DIRS}")
add_definitions ("${dapcore_DEFINITIONS}")
add_definitions ("${dapcrypt_DEFINITIONS}")
add_definitions ("${dapauth_DEFINITIONS}")
add_definitions ("${dapdb_DEFINITIONS}")
add_definitions ("${dapdb_http_DEFINITIONS}")
add_definitions ("${daphttp_DEFINITIONS}")
add_definitions ("${dapsession_DEFINITIONS}")
add_library(${PROJECT_NAME} STATIC ${STREAM_SRCS})
set(${PROJECT_NAME}_DEFINITIONS CACHE INTERNAL "${PROJECT_NAME}: Definitions" FORCE)
set(${PROJECT_NAME}_INCLUDE_DIRS ${PROJECT_SOURCE_DIR} CACHE INTERNAL "${PROJECT_NAME}: Include Directories" FORCE)
/*
Copyright (c) 2017-2018 (c) Project "DeM Labs Inc" https://github.com/demlabsinc
All rights reserved.
This file is part of DAP (Deus Applications Prototypes) the open source project
DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
DAP is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with any DAP based project. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdlib.h>
#include <stdio.h>
#include "config.h"
#include "common.h"
#include "db_core.h"
#include "db_auth.h"
#include "stream.h"
#include "stream_pkt.h"
#include "stream_ch.h"
#include "stream_ch_proc.h"
#include "stream_ch_pkt.h"
#include "stream_session.h"
#include "dap_client.h"
#include "dap_http.h"
#include "dap_http_client.h"
#include "dap_http_header.h"
#define LOG_TAG "stream"
// Callbacks for HTTP client
void stream_headers_read(dap_http_client_t * sh, void * arg); // Prepare stream when all headers are read
void stream_headers_write(dap_http_client_t * sh, void * arg); // Output headers
void stream_data_write(dap_http_client_t * sh, void * arg); // Write the data
void stream_data_read(dap_http_client_t * sh, void * arg); // Read the data
// Internal functions
stream_t * stream_new(dap_http_client_t * sh); // Create new stream
void stream_delete(dap_http_client_t * sh, void * arg);
/**
* @brief stream_init Init stream module
* @return 0 if ok others if not
*/
int stream_init()
{
if( stream_ch_init() != 0 ){
log_it(CRITICAL, "Can't init channel types submodule");
return -1;
}
log_it(NOTICE,"Init streaming module");
return 0;
}
/**
* @brief stream_media_deinit Deinint Stream module
*/
void stream_deinit()
{
stream_ch_deinit();
}
/**
* @brief stream_add_proc Add URL processor callback for streaming
* @param sh HTTP server instance
* @param url URL
*/
void stream_add_proc(struct dap_http * sh, const char * url)
{
dap_http_add_proc(sh,url,NULL,NULL,stream_delete,stream_headers_read,stream_headers_write,stream_data_read,stream_data_write,NULL);
//dap_http_add_proc(sh,url,NULL,NULL,NULL,stream_headers_read,stream_headers_write,NULL,stream_data_write,NULL);
}
void stream_states_update(struct stream *sid)
{
sid->conn_http->state_write=DAP_HTTP_CLIENT_STATE_START;
size_t i;
bool ready_to_write=false;
for(i=0;i<sid->channel_count; i++)
ready_to_write|=sid->channel[i]->ready_to_write;
dap_client_ready_to_write(sid->conn,ready_to_write);
sid->conn_http->out_content_ready=true;
}
void stream_headers_read(dap_http_client_t * cl_ht, void * arg)
{
(void) arg;
// char * raw=0;
// int raw_size;
unsigned int id=0;
db_auth_info_t * ai = db_auth_info_by_cookie(cl_ht->in_cookie);
if (ai == NULL)
ai = db_search_cookie_in_db(cl_ht->in_cookie);
if(ai){
log_it(DEBUG,"Prepare data stream");
if(cl_ht->in_query_string[0]){
log_it(INFO,"Query string [%s]",cl_ht->in_query_string);
if(sscanf(cl_ht->in_query_string,"fj913htmdgaq-d9hf=%u",&id)==1){
stream_session_t * ss=NULL;
ss=stream_session_id(id);
if(ss==NULL){
log_it(ERROR,"No session id %u was found",id);
cl_ht->reply_status_code=404;
strcpy(cl_ht->reply_reason_phrase,"Not found");
}else{
log_it(INFO,"Session id %u was found with media_id = %d",id,ss->media_id);
if(stream_session_open(ss)==0){ // Create new stream
stream_t * sid = stream_new(cl_ht);
sid->session=ss;
if(ss->create_empty){
log_it(INFO, "Opened stream session with only technical channels");
cl_ht->reply_status_code=200;
strcpy(cl_ht->reply_reason_phrase,"OK");
//cl_ht->state_write=DAP_HTTP_CLIENT_STATE_START;
//cl_ht->client->ready_to_write=true;
cl_ht->state_read=DAP_HTTP_CLIENT_STATE_DATA;
cl_ht->out_content_ready=true;
stream_ch_new(sid,'s');
stream_ch_new(sid,'t');
stream_states_update(sid);
dap_client_ready_to_read(cl_ht->client,true);
}else{
stream_ch_new(sid,'s');
stream_ch_new(sid,'g');
cl_ht->reply_status_code=200;
strcpy(cl_ht->reply_reason_phrase,"OK");
cl_ht->state_read=DAP_HTTP_CLIENT_STATE_DATA;
dap_client_ready_to_read(cl_ht->client,true);
stream_states_update(sid);
}
}else{
log_it(ERROR,"Can't open session id %u",id);
cl_ht->reply_status_code=404;
strcpy(cl_ht->reply_reason_phrase,"Not found");
}
}
}
}else{
log_it(ERROR,"No query string");
}
}else{
log_it(WARNING,"Not authorized connection");
cl_ht->reply_status_code=505;
strcpy(cl_ht->reply_reason_phrase,"Not authorized");
cl_ht->state_write=DAP_HTTP_CLIENT_STATE_START;
dap_client_ready_to_write(cl_ht->client,true);
}
}
/**
* @brief stream_new Create new stream instance for HTTP client
* @return New stream_t instance
*/
stream_t * stream_new(dap_http_client_t * sh)
{
stream_t * ret=(stream_t*) calloc(1,sizeof(stream_t));
ret->conn = sh->client;
ret->conn_http=sh;
sh->internal=ret;
log_it(NOTICE,"New stream instance");
return ret;
}
/**
* @brief stream_headers_write Prepare headers for output. Creates stream structure
* @param sh HTTP client instance
* @param arg Not used
*/
void stream_headers_write(dap_http_client_t * sh, void *arg)
{
(void) arg;
if(sh->reply_status_code==200){
stream_t *sid=STREAM(sh);
dap_http_out_header_add(sh,"Content-Type","application/octet-stream");
dap_http_out_header_add(sh,"Connnection","keep-alive");
dap_http_out_header_add(sh,"Cache-Control","no-cache");
if(sid->stream_size>0)
dap_http_out_header_add_f(sh,"Content-Length","%u", (unsigned int) sid->stream_size );
sh->state_read=DAP_HTTP_CLIENT_STATE_DATA;
dap_client_ready_to_read(sh->client,true);
}
}
/**
* @brief stream_data_write HTTP data write callback
* @param sh HTTP client instance
* @param arg Not used
*/
void stream_data_write(dap_http_client_t * sh, void * arg)
{
(void) arg;
if(sh->reply_status_code==200){
size_t i;
bool ready_to_write=false;
// log_it(DEBUG,"Process channels data output (%u channels)",STREAM(sh)->channel_count);
for(i=0;i<STREAM(sh)->channel_count; i++){
stream_ch_t * ch = STREAM(sh)->channel[i];
if(ch->ready_to_write){
ch->proc->packet_out_callback(ch,NULL);
ready_to_write|=ch->ready_to_write;
}
}
//log_it(DEBUG,"stream_data_out (ready_to_write=%s)", ready_to_write?"true":"false");
dap_client_ready_to_write(sh->client,ready_to_write);
//log_it(ERROR,"No stream_data_write_callback is defined");
}else{
log_it(WARNING, "Wrong request, reply status code is %u",sh->reply_status_code);
}
}
/**
* @brief stream_proc_pkt_in
* @param sid
*/
void stream_proc_pkt_in(stream_t * sid)
{
// log_it(DEBUG,"Input: read last bytes for current packet (hdr.size=%u)",sid->pkt_buf_in-);
stream_ch_pkt_t * ch_pkt= (stream_ch_pkt_t*) calloc(1,sid->pkt_buf_in->hdr.size+sizeof(stream_ch_pkt_hdr_t)+16 );
stream_pkt_read(sid,sid->pkt_buf_in, ch_pkt);
// log_it (DEBUG, "Recieved channel packet with %lu of payload bytes (type '%c' id '%c')",
// ch_pkt->hdr.size,(char) ch_pkt->hdr.type, (char) ch_pkt->hdr.id);
stream_ch_t * ch = NULL;
size_t i;
for(i=0;i<sid->channel_count;i++)
if(sid->channel[i]->proc){
if(sid->channel[i]->proc->id == ch_pkt->hdr.id ){
ch=sid->channel[i];
}
}
if(ch){
ch->stat.bytes_read+=ch_pkt->hdr.size;
if(ch->proc)
if(ch->proc->packet_in_callback)
ch->proc->packet_in_callback(ch,ch_pkt);
}else{
log_it(WARNING, "Input: unprocessed channel packet id '%c'",(char) ch_pkt->hdr.id );
}
free(sid->pkt_buf_in);
sid->pkt_buf_in=NULL;
sid->pkt_buf_in_data_size=0;
free(ch_pkt);
}
/**
* @brief stream_data_read HTTP data read callback. Read packet and passes that to the channel's callback
* @param sh HTTP client instance
* @param arg Processed number of bytes
*/
void stream_data_read(dap_http_client_t * sh, void * arg)
{
// log_it(DEBUG, "Stream data read %u bytes", sh->client->buf_in_size);
// log_it(DEBUG, "Stream data %s", sh->client->buf_in);
stream_t * sid =STREAM(sh);
int * ret = (int *) arg;
if(sid->pkt_buf_in){
size_t read_bytes_to=( ((sid->pkt_buf_in->hdr.size-sid->pkt_buf_in_data_size) > sid->conn->buf_in_size )
? sid->conn->buf_in_size
:(sid->pkt_buf_in->hdr.size-sid->pkt_buf_in_data_size));
memcpy(sid->pkt_buf_in->data+sid->pkt_buf_in_data_size,sh->client->buf_in,read_bytes_to);
sid->pkt_buf_in_data_size+=read_bytes_to;
if(sid->pkt_buf_in_data_size>=(sid->pkt_buf_in->hdr.size) ){
stream_proc_pkt_in(sid);
}
*ret+=read_bytes_to;
}else{
stream_pkt_t * pkt;
while(pkt=stream_pkt_detect( sh->client->buf_in + *ret, (sh->client->buf_in_size - ((size_t) *ret) ))){
size_t read_bytes_to=( (pkt->hdr.size+sizeof(stream_pkt_hdr_t)) > sid->conn->buf_in_size
?sid->conn->buf_in_size
:(pkt->hdr.size+sizeof(stream_pkt_hdr_t) ) );
if(read_bytes_to){
sid->pkt_buf_in=(stream_pkt_t *) calloc(1,pkt->hdr.size+sizeof(stream_pkt_hdr_t));
memcpy(sid->pkt_buf_in,pkt,read_bytes_to);
*ret = (*ret)+ read_bytes_to;
sid->pkt_buf_in_data_size=read_bytes_to-sizeof(stream_pkt_hdr_t);
if(read_bytes_to>=(pkt->hdr.size)){
//log_it(INFO,"Input: read full packet (hdr.size=%u read_bytes_to=%u buf_in_size=%u)"
// ,sid->pkt_buf_in->hdr.size,read_bytes_to,sid->conn->buf_in_size);
stream_proc_pkt_in(sid);
}else{
log_it(DEBUG,"Input: Not all stream packet in input (hdr.size=%u read_bytes_to=%u)",sid->pkt_buf_in->hdr.size,read_bytes_to);
}
return;
}else
break;
}
//log_it(WARNING,"Input: Not found signature in the incomming data");
*ret += sh->client->buf_in_size;
}
// log_it(DEBUG,"Stream read data from HTTP client: %u",sh->client->buf_in_size);
// if(sh->client->buf_in_size )
}
/**
* @brief stream_delete Delete stream and free its resources
* @param sid Stream id
*/
void stream_delete(dap_http_client_t * sh, void * arg)
{
stream_t * sid = STREAM(sh);
if(sid == NULL)
return;
(void) arg;
size_t i;
for(i=0;i<sid->channel_count; i++)
stream_ch_delete(sid->channel[i]);
//free(sid);
log_it(NOTICE,"[core] Stream connection is finished");
}
/*
Copyright (c) 2017-2018 (c) Project "DeM Labs Inc" https://github.com/demlabsinc
All rights reserved.
This file is part of DAP (Deus Applications Prototypes) the open source project
DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
DAP is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with any DAP based project. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _STREAM_H
#define _STREAM_H
//#include <gst/gst.h>
#include <stdio.h>
#include <stdbool.h>
#include <stdint.h>
#include <pthread.h>
#include <stdbool.h>
#include "stream_session.h"
#include "stream_ch.h"
#define CHUNK_SIZE_MAX 3*1024
struct dap_client;
struct dap_http_client;
struct dap_http;
struct stream;
struct stream_pkt;
#define STREAM_BUF_SIZE_MAX 10240
typedef void (*stream_callback)(struct stream*,void*);
typedef struct stream {
int id;
stream_session_t * session;
struct dap_client * conn; // Connection
struct dap_http_client * conn_http; // HTTP-specific
bool is_live;
struct stream_pkt * in_pkt;
struct stream_pkt *pkt_buf_in;
size_t pkt_buf_in_data_size;
uint8_t buf[500000];
stream_ch_t * channel[255]; // TODO reduce channels to 16 to economy memory
size_t channel_count;
size_t frame_sent; // Frame counter
size_t stream_size;
} stream_t;
#define STREAM(a) ((stream_t *) (a)->internal )
extern int stream_init();
extern void stream_deinit();
extern void stream_add_proc(struct dap_http * sh, const char * url);
#endif
/*
Copyright (c) 2017-2018 (c) Project "DeM Labs Inc" https://github.com/demlabsinc
All rights reserved.
This file is part of DAP (Deus Applications Prototypes) the open source project
DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
DAP is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with any DAP based project. If not, see <http://www.gnu.org/licenses/>.
*/
#include "common.h"
#include "dap_client.h"
#include "dap_http_client.h"
#include "stream.h"
#include "stream_ch.h"
#include "stream_ch_proc.h"
#include "stream_ch_pkt.h"
#define LOG_TAG "stream_ch"
/**
* @brief stream_ch_init Init stream channel module
* @return Zero if ok others if no
*/
int stream_ch_init()
{
if(stream_ch_proc_init() != 0 ){
log_it(CRITICAL,"Can't init stream channel proc submodule");
return -1;
}
if(stream_ch_pkt_init() != 0 ){
log_it(CRITICAL,"Can't init stream channel packet submodule");
return -1;
}
log_it(NOTICE,"Module stream channel initialized");
return 0;
}
/**
* @brief stream_ch_deinit Destroy stream channel submodule
*/
void stream_ch_deinit()
{
}
/**
* @brief stream_ch_new Creates new stream channel instance
* @param direction Direction of channel (input to the server, output to the client)
* @return
*/
stream_ch_t* stream_ch_new(struct stream* stream,uint8_t id)
{
stream_ch_proc_t * proc=stream_ch_proc_find(id);
if(proc){
stream_ch_t * ret= CALLOC(stream_ch_t);
ret->stream=stream;
ret->proc=proc;
ret->ready_to_read=true;
ret->stream->channel[ret->stream->channel_count]=ret;
ret->stream->channel_count++;
pthread_mutex_init(&(ret->mutex),NULL);
if(ret->proc->new_callback)
ret->proc->new_callback(ret,NULL);
return ret;
}else{
log_it(WARNING, "Unknown stream processor with id %uc",id);
return NULL;
}
}
/**
* @brief stream_ch_delete Delete channel instance
* @param ch Channel delete
*/
void stream_ch_delete(stream_ch_t*ch)
{
if(ch->proc)
if(ch->proc->delete_callback)
ch->proc->delete_callback(ch,NULL);
pthread_mutex_destroy(&(ch->mutex));
if(ch->internal){
free(ch->internal);
}
//free(ch);
}
void stream_ch_set_ready_to_write(stream_ch_t * ch,bool is_ready)
{
pthread_mutex_lock(&ch->mutex);
if(ch->ready_to_write!=is_ready){
//log_it(DEBUG,"Change channel '%c' to %s", (char) ch->proc->id, is_ready?"true":"false");
ch->ready_to_write=is_ready;
if(is_ready)
ch->stream->conn_http->state_write=DAP_HTTP_CLIENT_STATE_DATA;
dap_client_ready_to_write(ch->stream->conn,is_ready);
}
pthread_mutex_unlock(&ch->mutex);
}
/*
Copyright (c) 2017-2018 (c) Project "DeM Labs Inc" https://github.com/demlabsinc
All rights reserved.
This file is part of DAP (Deus Applications Prototypes) the open source project
DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
DAP is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with any DAP based project. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _STREAM_CH_H
#define _STREAM_CH_H
#include <stdbool.h>
#include <pthread.h>
#include <stdint.h>
struct stream;
struct stream_pkt;
struct stream_ch_proc;
struct stream_ch;
typedef void (*stream_ch_callback_t) (struct stream_ch*,void*);
typedef struct stream_ch{
pthread_mutex_t mutex;
bool ready_to_write;
bool ready_to_read;
struct stream * stream;
struct{
uint64_t bytes_write;
uint64_t bytes_read;
} stat;
uint8_t buf[500000];
struct stream_ch_proc * proc;
void * internal; // Internal structure, GStreamer for example
} stream_ch_t;
extern int stream_ch_init();
extern void stream_ch_deinit();
extern stream_ch_t* stream_ch_new(struct stream*stream,uint8_t id);
extern void stream_ch_set_ready_to_write(stream_ch_t * ch,bool is_ready);
extern void stream_ch_delete(stream_ch_t*ch);
#endif
/*
Copyright (c) 2017-2018 (c) Project "DeM Labs Inc" https://github.com/demlabsinc
All rights reserved.
This file is part of DAP (Deus Applications Prototypes) the open source project
DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
DAP is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with any DAP based project. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdio.h>
#include <stdarg.h>
#include "common.h"
#include "enc.h"
#include "enc_key.h"
#include "dap_client.h"
#include "stream.h"
#include "stream_ch.h"
#include "stream_ch_pkt.h"
#include "stream_ch_proc.h"
#include "stream_pkt.h"
#define LOG_TAG "stream_ch_pkt"
/**
* @brief stream_ch_pkt_init
* @return Zero if ok
*/
int stream_ch_pkt_init()
{
return 0;
}
void stream_ch_pkt_deinit()
{
}
/**
* @brief stream_ch_pkt_write
* @param sid
* @param data
* @param data_size
* @return
*/
size_t stream_ch_pkt_write_seq_id(struct stream_ch * ch, uint8_t type, uint64_t seq_id, const void * data, uint32_t data_size)
{
pthread_mutex_lock( &ch->mutex);
//log_it(DEBUG,"Output: Has %u bytes of %c type for %c channel id",data_size, (char)type, (char) ch->proc->id );
stream_ch_pkt_hdr_t hdr;
memset(&hdr,0,sizeof(hdr));
hdr.id = ch->proc->id;
hdr.size=data_size;
hdr.type=type;
hdr.enc_type = ch->proc->enc_type;
hdr.seq_id=seq_id;
if(data_size+sizeof(hdr)> sizeof(ch->buf) ){
log_it(ERROR,"Too big data size %lu, bigger than encryption buffer size %lu",data_size,sizeof(ch->buf));
data_size=sizeof(ch->buf)-sizeof(hdr);
}
memcpy(ch->buf,&hdr,sizeof(hdr) );
memcpy(ch->buf+sizeof(hdr),data,data_size );
size_t ret=stream_pkt_write(ch->stream,ch->buf,data_size+sizeof(hdr));
ch->stat.bytes_write+=data_size;
pthread_mutex_unlock( &ch->mutex);
return ret;
}
/**
* @brief stream_ch_pkt_write
* @param sid
* @param data
* @param data_size
* @return
*/
size_t stream_ch_pkt_write(struct stream_ch * ch, uint8_t type, const void * data, uint32_t data_size)
{
return stream_ch_pkt_write_seq_id(ch,type,0,data,data_size);
}
/**
* @brief stream_ch_pkt_write_str
* @param sid
* @param str
* @return
*/
size_t stream_ch_pkt_write_f(struct stream_ch * ch, uint8_t type, const char * str,...)
{
char buf[4096];
va_list ap;
va_start(ap,str);
vsnprintf(buf,sizeof(buf),str,ap);
va_end(ap);
size_t ret=stream_ch_pkt_write(ch,type,buf,strlen(buf));
return ret;
}
/*
Copyright (c) 2017-2018 (c) Project "DeM Labs Inc" https://github.com/demlabsinc
All rights reserved.
This file is part of DAP (Deus Applications Prototypes) the open source project
DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
DAP is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with any DAP based project. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _STREAM_CH_PKT_H_
#define _STREAM_CH_PKT_H_
#include <stdint.h>
#include <stddef.h>
struct stream_ch;
typedef struct stream_ch_pkt_hdr{
uint8_t id; // Channel id
uint8_t enc_type; // Zero if not encrypted
uint8_t type; // general, command, info, signal and etc
uint8_t padding;
uint64_t seq_id; // Sequence id or position id
// uint64_t seq
uint32_t size;
} __attribute__((packed)) stream_ch_pkt_hdr_t;
typedef struct stream_ch_pkt{
stream_ch_pkt_hdr_t hdr;
uint8_t data[];
} __attribute__((packed)) stream_ch_pkt_t;
extern int stream_ch_pkt_init();
extern void stream_ch_pkt_deinit();
extern size_t stream_ch_pkt_write_f(struct stream_ch * ch, uint8_t type, const char * str,...);
extern size_t stream_ch_pkt_write(struct stream_ch * ch, uint8_t type, const void * data, uint32_t data_size);
extern size_t stream_ch_pkt_write_seq_id(struct stream_ch * ch, uint8_t type, uint64_t seq_id, const void * data, uint32_t data_size);
#endif
/*
Copyright (c) 2017-2018 (c) Project "DeM Labs Inc" https://github.com/demlabsinc
All rights reserved.
This file is part of DAP (Deus Applications Prototypes) the open source project
DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
DAP is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with any DAP based project. If not, see <http://www.gnu.org/licenses/>.
*/
#include "common.h"
#include "stream_ch_proc.h"
#define LOG_TAG "stream_ch_type"
stream_ch_proc_t proc[256]={0};
/**
* @brief stream_ch_type_init Initialize stream channels type module
* @return 0 if ok others if no
*/
int stream_ch_proc_init()
{
log_it(NOTICE, "Module stream channel types initialized");
return 0;
}
void stream_ch_proc_deinit()
{
}
/**
* @brief stream_ch_proc_add
* @param id
* @param delete_callback
* @param packet_in_callback
* @param packet_out_callback
*/
void stream_ch_proc_add(uint8_t id,stream_ch_callback_t new_callback,stream_ch_callback_t delete_callback,
stream_ch_callback_t packet_in_callback,
stream_ch_callback_t packet_out_callback
)
{
proc[id].id=id;
proc[id].new_callback=new_callback;
proc[id].delete_callback=delete_callback;
proc[id].packet_in_callback=packet_in_callback;
proc[id].packet_out_callback=packet_out_callback;
}
/**
* @brief stream_ch_proc_find
* @param id
* @return
*/
stream_ch_proc_t* stream_ch_proc_find(uint8_t id)
{
return proc+id;
}
/*
Copyright (c) 2017-2018 (c) Project "DeM Labs Inc" https://github.com/demlabsinc
All rights reserved.
This file is part of DAP (Deus Applications Prototypes) the open source project
DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
DAP is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with any DAP based project. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _STREAM_CH_TYPE_H_
#define _STREAM_CH_TYPE_H_
#include <stdint.h>
#include "stream_ch.h"
typedef struct stream_ch_proc{
uint8_t id; // Channel type id
uint8_t enc_type; // Encryption type
stream_ch_callback_t new_callback;
stream_ch_callback_t delete_callback;
stream_ch_callback_t packet_in_callback;
stream_ch_callback_t packet_out_callback;
void * internal;
} stream_ch_proc_t;
extern int stream_ch_proc_init();
extern void stream_ch_proc_deinit();
extern void stream_ch_proc_add(uint8_t id,
stream_ch_callback_t new_callback, stream_ch_callback_t delete_callback,
stream_ch_callback_t packet_in_callback,
stream_ch_callback_t packet_out_callback
);
extern stream_ch_proc_t* stream_ch_proc_find(uint8_t id);
#endif
/*
Copyright (c) 2017-2018 (c) Project "DeM Labs Inc" https://github.com/demlabsinc
All rights reserved.
This file is part of DAP (Deus Applications Prototypes) the open source project
DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
DAP is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with any DAP based project. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdbool.h>
#include <string.h>
#include "common.h"
#include "stream.h"
#include "enc_http.h"
#include "dap_http.h"
#include "dap_http_client.h"
#include "dap_client.h"
#include "dap_http_simple.h"
#include "stream_session.h"
#include "stream_ctl.h"
#define LOG_TAG "stream_ctl"
const char* connection_type_str[] =
{
[STEAM_SESSION_HTTP] = "http",
[STREAM_SESSION_UDP] = "udp"
};
#define DAPMP_VERSION 13
bool stream_check_proto_version(unsigned int ver);
void stream_ctl_proc(struct dap_http_simple *cl_st, void * arg);
/**
* @brief stream_ctl_init Initialize stream control module
* @return Zero if ok others if not
*/
int stream_ctl_init()
{
log_it(NOTICE,"Initialized stream control module");
return 0;
}
/**
* @brief stream_ctl_deinit Deinit stream control module
*/
void stream_ctl_deinit()
{
}
/**
* @brief stream_ctl_add_proc Add stream control url processor
* @param sh HTTP server instance
* @param url URL string
*/
void stream_ctl_add_proc(struct dap_http * sh, const char * url)
{
dap_http_simple_proc_add(sh,url,4096,stream_ctl_proc);
}
/**
* @brief stream_ctl_headers_read Process CTL request
* @param cl_st HTTP server instance
* @param arg Not used
*/
void stream_ctl_proc(struct dap_http_simple *cl_st, void * arg)
{
bool * isOk = (bool *) arg;
unsigned int db_id=0;
// unsigned int proto_version;
stream_session_t * ss=NULL;
// unsigned int action_cmd=0;
bool openPreview;
bool socket_forward=false;
enc_http_delegate_t *dg = enc_http_request_decode(cl_st);
if(dg){
if(strcmp(dg->url_path,"open")==0)
openPreview=false;
else if (strcmp(dg->url_path,"open_preview")==0)
openPreview=true;
else if (strcmp(dg->url_path,"socket_forward")==0){
socket_forward=true;
}else{
log_it(ERROR,"ctl command unknown: %s",dg->url_path);
enc_http_delegate_delete(dg);
*isOk=false;
return;
}
if(socket_forward){
log_it(INFO,"[ctl] Play request for db_id=%d",db_id);
ss=stream_session_pure_new();
char key_str[255];
for(int i = 0; i < sizeof(key_str); i++)
key_str[i] = 65 + rand() % 25;
ss->key=enc_key_create(key_str,ENC_KEY_TYPE_AES);
enc_http_reply_f(dg,"%u %s",ss->id,key_str);
dg->isOk=true;
// log_it(DEBUG,"Stream AES key string %s",key_str);
}else if(sscanf( dg->in_query ,"db_id=%u",&db_id)==1){
log_it(INFO,"[ctl] Play request for db_id=%d",db_id);
ss=stream_session_new(db_id,openPreview);
char key_str[255];
for(int i = 0; i < sizeof(key_str); i++)
key_str[i] = 65 + rand() % 25;
ss->key=enc_key_create(key_str,ENC_KEY_TYPE_AES);
enc_http_reply_f(dg,"%u %s",ss->id,key_str);
dg->isOk=true;
log_it(DEBUG,"Stream AES key string %s",key_str);
}else{
log_it(ERROR,"Wrong request: \"%s\"",dg->in_query);
dg->isOk=false;
}
*isOk=dg->isOk;
unsigned int conn_t = 0;
char *ct_str = strstr(dg->in_query, "connection_type");
if (ct_str)
{
sscanf(ct_str, "connection_type=%u", &conn_t);
if (conn_t < 0 || conn_t >= STREAM_SESSION_END_TYPE)
{
log_it(WARNING,"Error connection type : %i",conn_t);
conn_t = STEAM_SESSION_HTTP;
}
if (ss)
{
ss->conn_type = conn_t;
}
}
log_it(INFO,"setup connection_type: %s", connection_type_str[conn_t]);
enc_http_reply_encode(cl_st,dg);
enc_http_delegate_delete(dg);
}else{
log_it(ERROR,"No encryption layer was initialized well");
*isOk=false;
}
}
bool stream_check_proto_version(unsigned int ver)
{
return ver<=DAPMP_VERSION;
}
/*
Copyright (c) 2017-2018 (c) Project "DeM Labs Inc" https://github.com/demlabsinc
All rights reserved.
This file is part of DAP (Deus Applications Prototypes) the open source project
DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
DAP is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with any DAP based project. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _STREAM_CTL_H
#define _STREAM_CTL_H
struct dap_http;
extern int stream_ctl_init();
extern void stream_ctl_deinit();
extern void stream_ctl_add_proc(struct dap_http * sh, const char * url);
#endif
/*
Copyright (c) 2017-2018 (c) Project "DeM Labs Inc" https://github.com/demlabsinc
All rights reserved.
This file is part of DAP (Deus Applications Prototypes) the open source project
DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
DAP is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with any DAP based project. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdint.h>
#include <string.h>
#include "common.h"
#include "config.h"
#include "dap_client.h"
#include "dap_http_client.h"
#include "enc.h"
#include "enc_key.h"
#include "stream.h"
#include "stream_pkt.h"
#include "stream_ch.h"
#include "stream_ch_pkt.h"
#include "stream_ch_proc.h"
#define LOG_TAG "stream_pkt"
const size_t dap_hdr_size=8+2+1+1+4;
const uint8_t dap_sig[8]={0xa0,0x95,0x96,0xa9,0x9e,0x5c,0xfb,0xfa};
stream_pkt_t * stream_pkt_detect(void * data, uint32_t data_size)
{
void * sig_start=data;
stream_pkt_t * ret=NULL;
uint32_t length_left=data_size;
while(sig_start=memchr(sig_start, dap_sig[0],length_left) ){
length_left= data_size-( sig_start-data);
if(length_left < sizeof(dap_sig) )
break;
if(memcmp(sig_start,dap_sig,sizeof(dap_sig))==0){
ret=sig_start;
if(ret->hdr.size > STREAM_PKT_SIZE_MAX ){
log_it(ERROR, "Too big packet size %u",ret->hdr.size);
ret=NULL;
}
break;
}else
sig_start+=1;
}
return ret;
}
/**
* @brief stream_pkt_read
* @param sid
* @param pkt
* @param buf_out
*/
size_t stream_pkt_read(struct stream * sid,struct stream_pkt * pkt, void * buf_out)
{
size_t ds = enc_decode(sid->session->key,pkt->data,pkt->hdr.size,buf_out,ENC_DATA_TYPE_RAW);
// log_it(DEBUG,"Stream decoded %lu bytes ( last bytes 0x%02x 0x%02x 0x%02x 0x%02x ) ", ds,
// *((uint8_t *)buf_out+ds-4),*((uint8_t *)buf_out+ds-3),*((uint8_t *)buf_out+ds-2),*((uint8_t *)buf_out+ds-1)
// );
// size_t mv=35;
// log_it(DEBUG,"(Decoded bytes with mv %lu bytes 0x%02x 0x%02x 0x%02x 0x%02x ) ", mv,
// *((uint8_t *)buf_out+mv-4),*((uint8_t *)buf_out+mv-3),*((uint8_t *)buf_out+mv-2),*((uint8_t *)buf_out+mv-1)
// );
return ds;
}
/**
* @brief stream_ch_pkt_write
* @param ch
* @param data
* @param data_size
* @return
*/
size_t stream_pkt_write(struct stream * sid, const void * data, uint32_t data_size)
{
size_t ret=0;
stream_pkt_hdr_t pkt_hdr;
if(data_size> sizeof(sid->buf) ){
log_it(ERROR,"Too big data size %lu, bigger than encryption buffer size %lu",data_size,sizeof(sid->buf));
data_size=sizeof(sid->buf);
}
memset(&pkt_hdr,0,sizeof(pkt_hdr));
memcpy(pkt_hdr.sig,dap_sig,sizeof(pkt_hdr.sig));
pkt_hdr.size = enc_code(sid->session->key,data,data_size,sid->buf,ENC_DATA_TYPE_RAW);
ret+=dap_client_write(sid->conn,&pkt_hdr,sizeof(pkt_hdr));
ret+=dap_client_write(sid->conn,sid->buf,pkt_hdr.size);
return ret;
}
/*
Copyright (c) 2017-2018 (c) Project "DeM Labs Inc" https://github.com/demlabsinc
All rights reserved.
This file is part of DAP (Deus Applications Prototypes) the open source project
DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
DAP is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with any DAP based project. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _STREAM_PKT_H_
#define _STREAM_PKT_H_
//#include <gst/gst.h>
#include <stdint.h>
#define STREAM_PKT_SIZE_MAX 100000
struct stream;
typedef struct stream_pkt_hdr{
uint8_t sig[8]; // Signature to find out beginning of the frame
uint32_t size;
uint8_t TTL;
char s_addr[32]; // Source address ( vasya@domain.net )
char d_addr[32]; // Destination address ( general#domain.net )
} __attribute__((packed)) stream_pkt_hdr_t;
typedef struct stream_pkt{
stream_pkt_hdr_t hdr;
uint8_t data[];
} __attribute__((packed)) stream_pkt_t;
extern const uint8_t dap_sig[8];
extern stream_pkt_t * stream_pkt_detect(void * data, uint32_t data_size);
extern size_t stream_pkt_read(struct stream * sid,struct stream_pkt * pkt, void * buf_out);
extern size_t stream_pkt_write(struct stream * sid, const void * data, uint32_t data_size);
#endif
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment