Skip to content
Snippets Groups Projects
Commit 3c363021 authored by Dmitriy A. Gerasimov's avatar Dmitriy A. Gerasimov
Browse files

[-] Channel files gone to its own repository

parent 26dfafe9
No related branches found
No related tags found
No related merge requests found
......@@ -3,15 +3,12 @@ project (dap_stream)
set(STREAM_SRCS
stream.c
stream_ch.c
stream_ch_pkt.c
stream_ch_proc.c
stream_ctl.c
stream_pkt.c )
add_library(${PROJECT_NAME} STATIC ${STREAM_SRCS})
target_link_libraries(dap_stream dap_core dap_udp_server
dap_crypto dap_http_server dap_enc_server dap_session)
dap_crypto dap_http_server dap_enc_server dap_session dap_stream_ch)
target_include_directories(dap_stream INTERFACE .)
/*
Copyright (c) 2017-2018 (c) Project "DeM Labs Inc" https://github.com/demlabsinc
All rights reserved.
This file is part of DAP (Deus Applications Prototypes) the open source project
DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
DAP is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with any DAP based project. If not, see <http://www.gnu.org/licenses/>.
*/
#include "dap_common.h"
#include "dap_server_client.h"
#include "dap_http_client.h"
#include "stream.h"
#include "stream_ch.h"
#include "stream_ch_proc.h"
#include "stream_ch_pkt.h"
#define LOG_TAG "stream_ch"
/**
* @brief stream_ch_init Init stream channel module
* @return Zero if ok others if no
*/
int stream_ch_init()
{
if(stream_ch_proc_init() != 0 ){
log_it(L_CRITICAL,"Can't init stream channel proc submodule");
return -1;
}
if(stream_ch_pkt_init() != 0 ){
log_it(L_CRITICAL,"Can't init stream channel packet submodule");
return -1;
}
log_it(L_NOTICE,"Module stream channel initialized");
return 0;
}
/**
* @brief stream_ch_deinit Destroy stream channel submodule
*/
void stream_ch_deinit()
{
}
/**
* @brief stream_ch_new Creates new stream channel instance
* @param direction Direction of channel (input to the server, output to the client)
* @return
*/
stream_ch_t* stream_ch_new(struct stream* stream,uint8_t id)
{
stream_ch_proc_t * proc=stream_ch_proc_find(id);
if(proc){
stream_ch_t * ret= DAP_NEW_Z(stream_ch_t);
ret->stream=stream;
ret->proc=proc;
ret->ready_to_read=true;
ret->stream->channel[ret->stream->channel_count]=ret;
ret->stream->channel_count++;
pthread_mutex_init(&(ret->mutex),NULL);
if(ret->proc->new_callback)
ret->proc->new_callback(ret,NULL);
return ret;
}else{
log_it(L_WARNING, "Unknown stream processor with id %uc",id);
return NULL;
}
}
/**
* @brief stream_ch_delete Delete channel instance
* @param ch Channel delete
*/
void stream_ch_delete(stream_ch_t*ch)
{
if(ch->proc)
if(ch->proc->delete_callback)
ch->proc->delete_callback(ch,NULL);
pthread_mutex_destroy(&(ch->mutex));
if(ch->internal){
free(ch->internal);
}
//free(ch);
}
void stream_ch_set_ready_to_write(stream_ch_t * ch,bool is_ready)
{
pthread_mutex_lock(&ch->mutex);
if(ch->ready_to_write!=is_ready){
//log_it(L_DEBUG,"Change channel '%c' to %s", (char) ch->proc->id, is_ready?"true":"false");
ch->ready_to_write=is_ready;
if(is_ready && ch->stream->conn_http)
ch->stream->conn_http->state_write=DAP_HTTP_CLIENT_STATE_DATA;
if(ch->stream->conn_udp)
dap_udp_client_ready_to_write(ch->stream->conn,is_ready);
else
dap_client_ready_to_write(ch->stream->conn,is_ready);
}
pthread_mutex_unlock(&ch->mutex);
}
/*
Copyright (c) 2017-2018 (c) Project "DeM Labs Inc" https://github.com/demlabsinc
All rights reserved.
This file is part of DAP (Deus Applications Prototypes) the open source project
DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
DAP is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with any DAP based project. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _STREAM_CH_H
#define _STREAM_CH_H
#include <stdbool.h>
#include <pthread.h>
#include <stdint.h>
struct stream;
struct stream_pkt;
struct stream_ch_proc;
struct stream_ch;
#define SERVICE_CHANNEL_ID 's'
#define DATA_CHANNEL_ID 'd'
typedef void (*stream_ch_callback_t) (struct stream_ch*,void*);
typedef struct stream_ch{
pthread_mutex_t mutex;
bool ready_to_write;
bool ready_to_read;
struct stream * stream;
struct{
uint64_t bytes_write;
uint64_t bytes_read;
} stat;
uint8_t buf[500000];
struct stream_ch_proc * proc;
void * internal; // Internal structure, GStreamer for example
} stream_ch_t;
extern int stream_ch_init();
extern void stream_ch_deinit();
extern stream_ch_t* stream_ch_new(struct stream*stream,uint8_t id);
extern void stream_ch_set_ready_to_write(stream_ch_t * ch,bool is_ready);
extern void stream_ch_delete(stream_ch_t*ch);
#endif
/*
Copyright (c) 2017-2018 (c) Project "DeM Labs Inc" https://github.com/demlabsinc
All rights reserved.
This file is part of DAP (Deus Applications Prototypes) the open source project
DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
DAP is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with any DAP based project. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdio.h>
#include <stdarg.h>
#include "dap_common.h"
#include "dap_enc.h"
#include "dap_enc_key.h"
#include "dap_server_client.h"
#include "stream.h"
#include "stream_ch.h"
#include "stream_ch_pkt.h"
#include "stream_ch_proc.h"
#include "stream_pkt.h"
#define LOG_TAG "stream_ch_pkt"
/**
* @brief stream_ch_pkt_init
* @return Zero if ok
*/
int stream_ch_pkt_init()
{
return 0;
}
void stream_ch_pkt_deinit()
{
}
/**
* @brief stream_ch_pkt_write
* @param sid
* @param data
* @param data_size
* @return
*/
size_t stream_ch_pkt_write_seq_id(struct stream_ch * ch, uint8_t type, uint64_t seq_id, const void * data, uint32_t data_size)
{
pthread_mutex_lock( &ch->mutex);
//log_it(L_DEBUG,"Output: Has %u bytes of %c type for %c channel id",data_size, (char)type, (char) ch->proc->id );
stream_ch_pkt_hdr_t hdr;
memset(&hdr,0,sizeof(hdr));
hdr.id = ch->proc->id;
hdr.size=data_size;
hdr.type=type;
hdr.enc_type = ch->proc->enc_type;
hdr.seq_id=seq_id;
if(data_size+sizeof(hdr)> sizeof(ch->buf) ){
log_it(L_ERROR,"Too big data size %lu, bigger than encryption buffer size %lu",data_size,sizeof(ch->buf));
data_size=sizeof(ch->buf)-sizeof(hdr);
}
memcpy(ch->buf,&hdr,sizeof(hdr) );
memcpy(ch->buf+sizeof(hdr),data,data_size );
size_t ret=stream_pkt_write(ch->stream,ch->buf,data_size+sizeof(hdr));
ch->stat.bytes_write+=data_size;
pthread_mutex_unlock( &ch->mutex);
return ret;
}
/**
* @brief stream_ch_pkt_write
* @param sid
* @param data
* @param data_size
* @return
*/
size_t stream_ch_pkt_write(struct stream_ch * ch, uint8_t type, const void * data, uint32_t data_size)
{
return stream_ch_pkt_write_seq_id(ch,type,0,data,data_size);
}
/**
* @brief stream_ch_pkt_write_str
* @param sid
* @param str
* @return
*/
size_t stream_ch_pkt_write_f(struct stream_ch * ch, uint8_t type, const char * str,...)
{
char buf[4096];
va_list ap;
va_start(ap,str);
vsnprintf(buf,sizeof(buf),str,ap);
va_end(ap);
size_t ret=stream_ch_pkt_write(ch,type,buf,strlen(buf));
return ret;
}
/**
* @brief stream_ch_send_keepalive
* @param ch
* @return
*/
size_t stream_ch_send_keepalive(struct stream_ch * ch){
pthread_mutex_lock( &ch->mutex);
stream_ch_pkt_hdr_t hdr;
memset(&hdr,0,sizeof(hdr));
hdr.id = ch->proc->id;
hdr.size=0;
hdr.type=KEEPALIVE_PACKET;
hdr.enc_type = ch->proc->enc_type;
hdr.seq_id=0;
memcpy(ch->buf,&hdr,sizeof(hdr) );
size_t ret=stream_pkt_write(ch->stream,ch->buf,sizeof(hdr));
pthread_mutex_unlock( &ch->mutex);
return ret;
}
/*
Copyright (c) 2017-2018 (c) Project "DeM Labs Inc" https://github.com/demlabsinc
All rights reserved.
This file is part of DAP (Deus Applications Prototypes) the open source project
DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
DAP is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with any DAP based project. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _STREAM_CH_PKT_H_
#define _STREAM_CH_PKT_H_
#define KEEPALIVE_PACKET 0x11
#include <stdint.h>
#include <stddef.h>
struct stream_ch;
typedef struct stream_ch_pkt_hdr{
uint8_t id; // Channel id
uint8_t enc_type; // Zero if not encrypted
uint8_t type; // general, command, info, signal and etc
uint8_t padding;
uint64_t seq_id; // Sequence id or position id
// uint64_t seq
uint32_t size;
} __attribute__((packed)) stream_ch_pkt_hdr_t;
typedef struct stream_ch_pkt{
stream_ch_pkt_hdr_t hdr;
uint8_t data[];
} __attribute__((packed)) stream_ch_pkt_t;
extern int stream_ch_pkt_init();
extern void stream_ch_pkt_deinit();
extern size_t stream_ch_pkt_write_f(struct stream_ch * ch, uint8_t type, const char * str,...);
extern size_t stream_ch_pkt_write(struct stream_ch * ch, uint8_t type, const void * data, uint32_t data_size);
extern size_t stream_ch_pkt_write_seq_id(struct stream_ch * ch, uint8_t type, uint64_t seq_id, const void * data, uint32_t data_size);
extern size_t stream_ch_send_keepalive(struct stream_ch * ch);
#endif
/*
Copyright (c) 2017-2018 (c) Project "DeM Labs Inc" https://github.com/demlabsinc
All rights reserved.
This file is part of DAP (Deus Applications Prototypes) the open source project
DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
DAP is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with any DAP based project. If not, see <http://www.gnu.org/licenses/>.
*/
#include "dap_common.h"
#include "stream_ch_proc.h"
#define LOG_TAG "stream_ch_type"
stream_ch_proc_t proc[256]={0};
/**
* @brief stream_ch_type_init Initialize stream channels type module
* @return 0 if ok others if no
*/
int stream_ch_proc_init()
{
log_it(L_NOTICE, "Module stream channel types initialized");
return 0;
}
void stream_ch_proc_deinit()
{
}
/**
* @brief stream_ch_proc_add
* @param id
* @param delete_callback
* @param packet_in_callback
* @param packet_out_callback
*/
void stream_ch_proc_add(uint8_t id,stream_ch_callback_t new_callback,stream_ch_callback_t delete_callback,
stream_ch_callback_t packet_in_callback,
stream_ch_callback_t packet_out_callback
)
{
proc[id].id=id;
proc[id].new_callback=new_callback;
proc[id].delete_callback=delete_callback;
proc[id].packet_in_callback=packet_in_callback;
proc[id].packet_out_callback=packet_out_callback;
}
/**
* @brief stream_ch_proc_find
* @param id
* @return
*/
stream_ch_proc_t* stream_ch_proc_find(uint8_t id)
{
return proc+id;
}
/*
Copyright (c) 2017-2018 (c) Project "DeM Labs Inc" https://github.com/demlabsinc
All rights reserved.
This file is part of DAP (Deus Applications Prototypes) the open source project
DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
DAP is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with any DAP based project. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _STREAM_CH_TYPE_H_
#define _STREAM_CH_TYPE_H_
#include <stdint.h>
#include "stream_ch.h"
typedef struct stream_ch_proc{
uint8_t id; // Channel type id
uint8_t enc_type; // Encryption type
stream_ch_callback_t new_callback;
stream_ch_callback_t delete_callback;
stream_ch_callback_t packet_in_callback;
stream_ch_callback_t packet_out_callback;
void * internal;
} stream_ch_proc_t;
extern int stream_ch_proc_init();
extern void stream_ch_proc_deinit();
extern void stream_ch_proc_add(uint8_t id,
stream_ch_callback_t new_callback, stream_ch_callback_t delete_callback,
stream_ch_callback_t packet_in_callback,
stream_ch_callback_t packet_out_callback
);
extern stream_ch_proc_t* stream_ch_proc_find(uint8_t id);
#endif
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment