Skip to content
Snippets Groups Projects
Unverified Commit 8fa982b5 authored by Dmitriy A. Gerasimov's avatar Dmitriy A. Gerasimov Committed by GitHub
Browse files

Merge pull request #11 from kelvinblockchain/feature-2198

Feature 2198
parents 228f84d3 87112946
No related branches found
No related tags found
No related merge requests found
Subproject commit 842029d3892f8794ff7f1a85af6d024ce62e1796
Subproject commit 999a4bc46231c2398a8d010dee06628965b3c478
Subproject commit 2d675d9e6d94ae8961f97af0ee7e16c2816a24af
Subproject commit 1b3a871d6ec93801f14081de51157bbb703593f1
Subproject commit a655fc5695e7da0b58c7cb5188d287dcf83f6939
Subproject commit 7c6065bc699760e6e66ee4e80861a3562b2366c1
Subproject commit 2564d01f7663a9b3fd0c30d9f2aefe0580b327cd
Subproject commit 19a6376646f497d97bfe1ea3fade1a907c32f76a
Subproject commit e12cd24dfca8b778cf63d1c957d04f7d2471f75c
Subproject commit dfbdaa1df0498069e60ffaf42de963375bffa816
......@@ -91,7 +91,13 @@ dap_stream_session_t * dap_stream_session_id(unsigned int id)
int dap_stream_session_close(unsigned int id)
{
return stream_session_close2(dap_stream_session_id(id));
log_it(L_INFO,"Close session id=%d", id);
dap_stream_session_t *l_s = dap_stream_session_id(id);
if(!l_s) {
log_it(L_WARNING, "Session id=%d not found", id);
return -1;
}
return stream_session_close2(l_s);
}
int stream_session_close2(dap_stream_session_t * s)
......
......@@ -46,6 +46,8 @@ struct dap_stream_session {
uint8_t enc_type;
char active_channels[16];// channels for open
stream_session_connection_type_t conn_type;
stream_session_type_t type;
UT_hash_handle hh;
......
......@@ -55,7 +55,7 @@ void stream_dap_delete(dap_client_remote_t* sh, void * arg);
void stream_dap_new(dap_client_remote_t* sh,void * arg);
// Internal functions
dap_stream_t * stream_new(dap_http_client_t * sh); // Create new stream
dap_stream_t * stream_new(dap_http_client_t * a_sh); // Create new stream
void stream_delete(dap_http_client_t * sh, void * arg);
struct ev_loop *keepalive_loop;
......@@ -154,7 +154,8 @@ void stream_headers_read(dap_http_client_t * cl_ht, void * arg)
log_it(L_DEBUG,"Prepare data stream");
if(cl_ht->in_query_string[0]){
log_it(L_INFO,"Query string [%s]",cl_ht->in_query_string);
if(sscanf(cl_ht->in_query_string,"fj913htmdgaq-d9hf=%u",&id)==1){
// if(sscanf(cl_ht->in_query_string,"fj913htmdgaq-d9hf=%u",&id)==1){
if(sscanf(cl_ht->in_query_string,"session_id=%u",&id)==1){
dap_stream_session_t * ss=NULL;
ss=dap_stream_session_id(id);
if(ss==NULL){
......@@ -166,6 +167,12 @@ void stream_headers_read(dap_http_client_t * cl_ht, void * arg)
if(dap_stream_session_open(ss)==0){ // Create new stream
dap_stream_t * sid = stream_new(cl_ht);
sid->session=ss;
size_t count_channels = strlen(ss->active_channels);
for(size_t i = 0; i < count_channels; i++) {
dap_stream_ch_new(sid, ss->active_channels[i]);
//sid->channel[i]->ready_to_write = true;
}
ss->create_empty = false;
if(ss->create_empty){
log_it(L_INFO, "Opened stream session with only technical channels");
......@@ -176,12 +183,12 @@ void stream_headers_read(dap_http_client_t * cl_ht, void * arg)
cl_ht->state_read=DAP_HTTP_CLIENT_STATE_DATA;
cl_ht->out_content_ready=true;
dap_stream_ch_new(sid,SERVICE_CHANNEL_ID);
dap_stream_ch_new(sid,'t');
//dap_stream_ch_new(sid,'t');
stream_states_update(sid);
dap_client_remote_ready_to_read(cl_ht->client,true);
}else{
dap_stream_ch_new(sid,SERVICE_CHANNEL_ID);
dap_stream_ch_new(sid,'g');
//dap_stream_ch_new(sid,'g');
cl_ht->reply_status_code=200;
strcpy(cl_ht->reply_reason_phrase,"OK");
......@@ -261,12 +268,12 @@ void check_session(unsigned int id, dap_client_remote_t* cl){
* @brief stream_new Create new stream instance for HTTP client
* @return New stream_t instance
*/
dap_stream_t * stream_new(dap_http_client_t * sh)
dap_stream_t * stream_new(dap_http_client_t * a_sh)
{
dap_stream_t * ret=(dap_stream_t*) calloc(1,sizeof(dap_stream_t));
ret->conn = sh->client;
ret->conn_http=sh;
ret->conn = a_sh->client;
ret->conn_http=a_sh;
ret->buf_defrag_size = 0;
ret->seq_id = 0;
ret->client_last_seq_id_packet = (size_t)-1;
......@@ -396,10 +403,12 @@ size_t dap_stream_data_proc_read (dap_stream_t *a_stream)
{
bool found_sig=false;
dap_stream_pkt_t * pkt=NULL;
uint8_t * proc_data= a_stream->conn->buf_in;
char *buf_in = (a_stream->conn) ? (char*)a_stream->conn->buf_in : (char*)a_stream->events_socket->buf_in;
size_t buf_in_size = (a_stream->conn) ? a_stream->conn->buf_in_size : a_stream->events_socket->buf_in_size;
uint8_t * proc_data = buf_in;//a_stream->conn->buf_in;
bool proc_data_defrag=false; // We are or not in defrag buffer
size_t read_bytes_to=0;
size_t bytes_left_to_read=a_stream->conn->buf_in_size;
size_t bytes_left_to_read = buf_in_size;//a_stream->conn->buf_in_size;
// Process prebuffered packets or glue defragmented data with the current input
if(pkt=a_stream->pkt_buf_in){ // Packet signature detected
if(a_stream->pkt_buf_in_data_size < sizeof(stream_pkt_hdr_t))
......@@ -444,7 +453,7 @@ size_t dap_stream_data_proc_read (dap_stream_t *a_stream)
stream_proc_pkt_in(a_stream);
}
}
proc_data=(a_stream->conn->buf_in + a_stream->conn->buf_in_size - bytes_left_to_read);
proc_data=(buf_in + buf_in_size - bytes_left_to_read);//proc_data=(a_stream->conn->buf_in + a_stream->conn->buf_in_size - bytes_left_to_read);
}else if( a_stream->buf_defrag_size>0){ // If smth is present in defrag buffer - we glue everything together in it
if( bytes_left_to_read > 0){ // If there is smth to process in input buffer
......@@ -518,10 +527,10 @@ size_t dap_stream_data_proc_read (dap_stream_t *a_stream)
break;
}
}
if(!found_sig){
//log_it(DEBUG,"Input: Not found signature in the incomming data ( client->buf_in_size = %u *ret = %u )",
// sh->client->buf_in_size, *ret);
}
/*if(!found_sig){
log_it(L_DEBUG,"Input: Not found signature in the incomming data ( client->buf_in_size = %u *ret = %u )",
sh->client->buf_in_size, *ret);
}*/
if(bytes_left_to_read>0){
if(proc_data_defrag){
memmove(a_stream->buf_defrag, proc_data, bytes_left_to_read);
......@@ -535,12 +544,10 @@ size_t dap_stream_data_proc_read (dap_stream_t *a_stream)
}else if(proc_data_defrag){
a_stream->buf_defrag_size=0;
}
return a_stream->conn->buf_in_size;
return buf_in_size;//a_stream->conn->buf_in_size;
}
/**
* @brief stream_dap_data_write Write callback for UDP client
* @param sh DAP client instance
......@@ -627,11 +634,13 @@ static bool _detect_loose_packet(dap_stream_t * sid)
*/
void stream_proc_pkt_in(dap_stream_t * sid)
{
if(sid->pkt_buf_in->hdr.type == DATA_PACKET)
if(sid->pkt_buf_in->hdr.type == STREAM_PKT_TYPE_DATA_PACKET)
{
dap_stream_ch_pkt_t * ch_pkt = (dap_stream_ch_pkt_t *) sid->buf_pkt_in;
dap_stream_pkt_read(sid,sid->pkt_buf_in, ch_pkt, STREAM_BUF_SIZE_MAX);
if(dap_stream_pkt_read(sid,sid->pkt_buf_in, ch_pkt, STREAM_BUF_SIZE_MAX)==0){
log_it(L_WARNING, "Input: can't decode packet size=%d",sid->pkt_buf_in_data_size);
}
_detect_loose_packet(sid);
......@@ -648,12 +657,12 @@ void stream_proc_pkt_in(dap_stream_t * sid)
if(ch->proc)
if(ch->proc->packet_in_callback)
ch->proc->packet_in_callback(ch,ch_pkt);
if(ch->proc->id == SERVICE_CHANNEL_ID && ch_pkt->hdr.type == KEEPALIVE_PACKET)
if(ch->proc->id == SERVICE_CHANNEL_ID && ch_pkt->hdr.type == STREAM_CH_PKT_TYPE_KEEPALIVE)
dap_stream_send_keepalive(sid);
}else{
log_it(L_WARNING, "Input: unprocessed channel packet id '%c'",(char) ch_pkt->hdr.id );
}
} else if(sid->pkt_buf_in->hdr.type == SERVICE_PACKET) {
} else if(sid->pkt_buf_in->hdr.type == STREAM_PKT_TYPE_SERVICE_PACKET) {
stream_srv_pkt_t * srv_pkt = (stream_srv_pkt_t *)malloc(sizeof(stream_srv_pkt_t));
memcpy(srv_pkt,sid->pkt_buf_in->data,sizeof(stream_srv_pkt_t));
uint32_t session_id = srv_pkt->session_id;
......
......@@ -104,11 +104,18 @@ void stream_ctl_proc(struct dap_http_simple *cl_st, void * arg)
enc_http_delegate_t *dg = enc_http_request_decode(cl_st);
if(dg){
if (strcmp(dg->url_path,"socket_forward")==0){
size_t l_channels_str_size = sizeof(ss->active_channels);
char l_channels_str[l_channels_str_size];
if(dg->url_path && strlen(dg->url_path) < 30 &&
sscanf(dg->url_path, "stream_ctl,channels=%s", l_channels_str) == 1) {
l_new_session = true;
}
/*if (strcmp(dg->url_path,"socket_forward")==0){
l_new_session = true;
}else if (strcmp(dg->url_path,"stream_ctl")==0) {
l_new_session = true;
}else{
}*/
else{
log_it(L_ERROR,"ctl command unknown: %s",dg->url_path);
enc_http_delegate_delete(dg);
*return_code = Http_Status_MethodNotAllowed;
......@@ -117,6 +124,7 @@ void stream_ctl_proc(struct dap_http_simple *cl_st, void * arg)
if(l_new_session){
ss = dap_stream_session_pure_new();
strncpy(ss->active_channels, l_channels_str, l_channels_str_size);
char *key_str = calloc(1, KEX_KEY_STR_SIZE);
dap_random_string_fill(key_str, KEX_KEY_STR_SIZE);
ss->key = dap_enc_key_new_generate(s_socket_forward_key.type, key_str, KEX_KEY_STR_SIZE,
......
......@@ -123,10 +123,14 @@ size_t dap_stream_pkt_write(struct dap_stream * sid, const void * data, uint32_t
ret+=dap_udp_client_write(sid->conn,&pkt_hdr,sizeof(pkt_hdr));
ret+=dap_udp_client_write(sid->conn,sid->buf,pkt_hdr.size);
}
else{
else if(sid->conn){
ret+=dap_client_remote_write(sid->conn,&pkt_hdr,sizeof(pkt_hdr));
ret+=dap_client_remote_write(sid->conn,sid->buf,pkt_hdr.size);
}
else if(sid->events_socket) {
ret += dap_events_socket_write(sid->events_socket, &pkt_hdr, sizeof(pkt_hdr));
ret += dap_events_socket_write(sid->events_socket, sid->buf, pkt_hdr.size);
}
return ret;
}
......@@ -135,9 +139,10 @@ extern void dap_stream_send_keepalive(struct dap_stream * sid)
{
for(int i=0;i<sid->channel_count;i++)
if(sid->channel[i]->proc){
if(sid->channel[i]->proc->id == SERVICE_CHANNEL_ID)
stream_ch_send_keepalive(sid->channel[i]);
dap_stream_ch_set_ready_to_write(sid->channel[i],true);
if(sid->channel[i]->proc->id == SERVICE_CHANNEL_ID){
dap_stream_ch_send_keepalive(sid->channel[i]);
dap_stream_ch_set_ready_to_write(sid->channel[i],true);
}
}
}
......
......@@ -25,9 +25,9 @@
#define STREAM_PKT_SIZE_MAX 500000
struct dap_stream;
#define DATA_PACKET 0x00
#define SERVICE_PACKET 0xff
#define KEEPALIVE_PACKET 0x11
#define STREAM_PKT_TYPE_DATA_PACKET 0x00
#define STREAM_PKT_TYPE_SERVICE_PACKET 0xff
//#define STREAM_PKT_TYPE_KEEPALIVE 0x11
typedef struct stream_pkt_hdr{
uint8_t sig[8]; // Signature to find out beginning of the frame
......
Subproject commit d2257789e0c796a5a3b637e14dcbaf8a8c7880cc
Subproject commit b76175acc517f085c319c8e66c62bd143f96bf94
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment