diff --git a/libdap b/libdap index 842029d3892f8794ff7f1a85af6d024ce62e1796..999a4bc46231c2398a8d010dee06628965b3c478 160000 --- a/libdap +++ b/libdap @@ -1 +1 @@ -Subproject commit 842029d3892f8794ff7f1a85af6d024ce62e1796 +Subproject commit 999a4bc46231c2398a8d010dee06628965b3c478 diff --git a/libdap-server b/libdap-server index 2d675d9e6d94ae8961f97af0ee7e16c2816a24af..1b3a871d6ec93801f14081de51157bbb703593f1 160000 --- a/libdap-server +++ b/libdap-server @@ -1 +1 @@ -Subproject commit 2d675d9e6d94ae8961f97af0ee7e16c2816a24af +Subproject commit 1b3a871d6ec93801f14081de51157bbb703593f1 diff --git a/libdap-server-core b/libdap-server-core index a655fc5695e7da0b58c7cb5188d287dcf83f6939..7c6065bc699760e6e66ee4e80861a3562b2366c1 160000 --- a/libdap-server-core +++ b/libdap-server-core @@ -1 +1 @@ -Subproject commit a655fc5695e7da0b58c7cb5188d287dcf83f6939 +Subproject commit 7c6065bc699760e6e66ee4e80861a3562b2366c1 diff --git a/libdap-server-udp b/libdap-server-udp index 2564d01f7663a9b3fd0c30d9f2aefe0580b327cd..19a6376646f497d97bfe1ea3fade1a907c32f76a 160000 --- a/libdap-server-udp +++ b/libdap-server-udp @@ -1 +1 @@ -Subproject commit 2564d01f7663a9b3fd0c30d9f2aefe0580b327cd +Subproject commit 19a6376646f497d97bfe1ea3fade1a907c32f76a diff --git a/libdap-stream-ch b/libdap-stream-ch index e12cd24dfca8b778cf63d1c957d04f7d2471f75c..dfbdaa1df0498069e60ffaf42de963375bffa816 160000 --- a/libdap-stream-ch +++ b/libdap-stream-ch @@ -1 +1 @@ -Subproject commit e12cd24dfca8b778cf63d1c957d04f7d2471f75c +Subproject commit dfbdaa1df0498069e60ffaf42de963375bffa816 diff --git a/session/dap_stream_session.c b/session/dap_stream_session.c index 36824d1dc28c5879f4e30e1f52ce0414e95b298f..b0ca4b1805871b27cc00b4ef09f4b201278c31c5 100644 --- a/session/dap_stream_session.c +++ b/session/dap_stream_session.c @@ -91,7 +91,13 @@ dap_stream_session_t * dap_stream_session_id(unsigned int id) int dap_stream_session_close(unsigned int id) { - return stream_session_close2(dap_stream_session_id(id)); + log_it(L_INFO,"Close session id=%d", id); + dap_stream_session_t *l_s = dap_stream_session_id(id); + if(!l_s) { + log_it(L_WARNING, "Session id=%d not found", id); + return -1; + } + return stream_session_close2(l_s); } int stream_session_close2(dap_stream_session_t * s) diff --git a/session/dap_stream_session.h b/session/dap_stream_session.h index d5233af957a5b4f1c50c93fcc171fd1fc1aa10f3..7d7c53482df984248bce8927bc6dce7ff4302d54 100644 --- a/session/dap_stream_session.h +++ b/session/dap_stream_session.h @@ -46,6 +46,8 @@ struct dap_stream_session { uint8_t enc_type; + char active_channels[16];// channels for open + stream_session_connection_type_t conn_type; stream_session_type_t type; UT_hash_handle hh; diff --git a/stream/dap_stream.c b/stream/dap_stream.c index 335375464899d1a9a08b09f5bdf06b1c198bd430..2ca3131592fa3103e3bc7eecd8d8bb25cb66f36b 100644 --- a/stream/dap_stream.c +++ b/stream/dap_stream.c @@ -55,7 +55,7 @@ void stream_dap_delete(dap_client_remote_t* sh, void * arg); void stream_dap_new(dap_client_remote_t* sh,void * arg); // Internal functions -dap_stream_t * stream_new(dap_http_client_t * sh); // Create new stream +dap_stream_t * stream_new(dap_http_client_t * a_sh); // Create new stream void stream_delete(dap_http_client_t * sh, void * arg); struct ev_loop *keepalive_loop; @@ -154,7 +154,8 @@ void stream_headers_read(dap_http_client_t * cl_ht, void * arg) log_it(L_DEBUG,"Prepare data stream"); if(cl_ht->in_query_string[0]){ log_it(L_INFO,"Query string [%s]",cl_ht->in_query_string); - if(sscanf(cl_ht->in_query_string,"fj913htmdgaq-d9hf=%u",&id)==1){ +// if(sscanf(cl_ht->in_query_string,"fj913htmdgaq-d9hf=%u",&id)==1){ + if(sscanf(cl_ht->in_query_string,"session_id=%u",&id)==1){ dap_stream_session_t * ss=NULL; ss=dap_stream_session_id(id); if(ss==NULL){ @@ -166,6 +167,12 @@ void stream_headers_read(dap_http_client_t * cl_ht, void * arg) if(dap_stream_session_open(ss)==0){ // Create new stream dap_stream_t * sid = stream_new(cl_ht); sid->session=ss; + size_t count_channels = strlen(ss->active_channels); + for(size_t i = 0; i < count_channels; i++) { + dap_stream_ch_new(sid, ss->active_channels[i]); + //sid->channel[i]->ready_to_write = true; + } + ss->create_empty = false; if(ss->create_empty){ log_it(L_INFO, "Opened stream session with only technical channels"); @@ -176,12 +183,12 @@ void stream_headers_read(dap_http_client_t * cl_ht, void * arg) cl_ht->state_read=DAP_HTTP_CLIENT_STATE_DATA; cl_ht->out_content_ready=true; dap_stream_ch_new(sid,SERVICE_CHANNEL_ID); - dap_stream_ch_new(sid,'t'); + //dap_stream_ch_new(sid,'t'); stream_states_update(sid); dap_client_remote_ready_to_read(cl_ht->client,true); }else{ dap_stream_ch_new(sid,SERVICE_CHANNEL_ID); - dap_stream_ch_new(sid,'g'); + //dap_stream_ch_new(sid,'g'); cl_ht->reply_status_code=200; strcpy(cl_ht->reply_reason_phrase,"OK"); @@ -261,12 +268,12 @@ void check_session(unsigned int id, dap_client_remote_t* cl){ * @brief stream_new Create new stream instance for HTTP client * @return New stream_t instance */ -dap_stream_t * stream_new(dap_http_client_t * sh) +dap_stream_t * stream_new(dap_http_client_t * a_sh) { dap_stream_t * ret=(dap_stream_t*) calloc(1,sizeof(dap_stream_t)); - ret->conn = sh->client; - ret->conn_http=sh; + ret->conn = a_sh->client; + ret->conn_http=a_sh; ret->buf_defrag_size = 0; ret->seq_id = 0; ret->client_last_seq_id_packet = (size_t)-1; @@ -396,10 +403,12 @@ size_t dap_stream_data_proc_read (dap_stream_t *a_stream) { bool found_sig=false; dap_stream_pkt_t * pkt=NULL; - uint8_t * proc_data= a_stream->conn->buf_in; + char *buf_in = (a_stream->conn) ? (char*)a_stream->conn->buf_in : (char*)a_stream->events_socket->buf_in; + size_t buf_in_size = (a_stream->conn) ? a_stream->conn->buf_in_size : a_stream->events_socket->buf_in_size; + uint8_t * proc_data = buf_in;//a_stream->conn->buf_in; bool proc_data_defrag=false; // We are or not in defrag buffer size_t read_bytes_to=0; - size_t bytes_left_to_read=a_stream->conn->buf_in_size; + size_t bytes_left_to_read = buf_in_size;//a_stream->conn->buf_in_size; // Process prebuffered packets or glue defragmented data with the current input if(pkt=a_stream->pkt_buf_in){ // Packet signature detected if(a_stream->pkt_buf_in_data_size < sizeof(stream_pkt_hdr_t)) @@ -444,7 +453,7 @@ size_t dap_stream_data_proc_read (dap_stream_t *a_stream) stream_proc_pkt_in(a_stream); } } - proc_data=(a_stream->conn->buf_in + a_stream->conn->buf_in_size - bytes_left_to_read); + proc_data=(buf_in + buf_in_size - bytes_left_to_read);//proc_data=(a_stream->conn->buf_in + a_stream->conn->buf_in_size - bytes_left_to_read); }else if( a_stream->buf_defrag_size>0){ // If smth is present in defrag buffer - we glue everything together in it if( bytes_left_to_read > 0){ // If there is smth to process in input buffer @@ -518,10 +527,10 @@ size_t dap_stream_data_proc_read (dap_stream_t *a_stream) break; } } - if(!found_sig){ - //log_it(DEBUG,"Input: Not found signature in the incomming data ( client->buf_in_size = %u *ret = %u )", - // sh->client->buf_in_size, *ret); - } + /*if(!found_sig){ + log_it(L_DEBUG,"Input: Not found signature in the incomming data ( client->buf_in_size = %u *ret = %u )", + sh->client->buf_in_size, *ret); + }*/ if(bytes_left_to_read>0){ if(proc_data_defrag){ memmove(a_stream->buf_defrag, proc_data, bytes_left_to_read); @@ -535,12 +544,10 @@ size_t dap_stream_data_proc_read (dap_stream_t *a_stream) }else if(proc_data_defrag){ a_stream->buf_defrag_size=0; } - return a_stream->conn->buf_in_size; + return buf_in_size;//a_stream->conn->buf_in_size; } - - /** * @brief stream_dap_data_write Write callback for UDP client * @param sh DAP client instance @@ -627,11 +634,13 @@ static bool _detect_loose_packet(dap_stream_t * sid) */ void stream_proc_pkt_in(dap_stream_t * sid) { - if(sid->pkt_buf_in->hdr.type == DATA_PACKET) + if(sid->pkt_buf_in->hdr.type == STREAM_PKT_TYPE_DATA_PACKET) { dap_stream_ch_pkt_t * ch_pkt = (dap_stream_ch_pkt_t *) sid->buf_pkt_in; - dap_stream_pkt_read(sid,sid->pkt_buf_in, ch_pkt, STREAM_BUF_SIZE_MAX); + if(dap_stream_pkt_read(sid,sid->pkt_buf_in, ch_pkt, STREAM_BUF_SIZE_MAX)==0){ + log_it(L_WARNING, "Input: can't decode packet size=%d",sid->pkt_buf_in_data_size); + } _detect_loose_packet(sid); @@ -648,12 +657,12 @@ void stream_proc_pkt_in(dap_stream_t * sid) if(ch->proc) if(ch->proc->packet_in_callback) ch->proc->packet_in_callback(ch,ch_pkt); - if(ch->proc->id == SERVICE_CHANNEL_ID && ch_pkt->hdr.type == KEEPALIVE_PACKET) + if(ch->proc->id == SERVICE_CHANNEL_ID && ch_pkt->hdr.type == STREAM_CH_PKT_TYPE_KEEPALIVE) dap_stream_send_keepalive(sid); }else{ log_it(L_WARNING, "Input: unprocessed channel packet id '%c'",(char) ch_pkt->hdr.id ); } - } else if(sid->pkt_buf_in->hdr.type == SERVICE_PACKET) { + } else if(sid->pkt_buf_in->hdr.type == STREAM_PKT_TYPE_SERVICE_PACKET) { stream_srv_pkt_t * srv_pkt = (stream_srv_pkt_t *)malloc(sizeof(stream_srv_pkt_t)); memcpy(srv_pkt,sid->pkt_buf_in->data,sizeof(stream_srv_pkt_t)); uint32_t session_id = srv_pkt->session_id; diff --git a/stream/dap_stream_ctl.c b/stream/dap_stream_ctl.c index f3c62b975dc0d3a9899112cff4addc2d6fabfce0..732418814434af5108f5c9b8a190fd4a76b3f05a 100644 --- a/stream/dap_stream_ctl.c +++ b/stream/dap_stream_ctl.c @@ -104,11 +104,18 @@ void stream_ctl_proc(struct dap_http_simple *cl_st, void * arg) enc_http_delegate_t *dg = enc_http_request_decode(cl_st); if(dg){ - if (strcmp(dg->url_path,"socket_forward")==0){ + size_t l_channels_str_size = sizeof(ss->active_channels); + char l_channels_str[l_channels_str_size]; + if(dg->url_path && strlen(dg->url_path) < 30 && + sscanf(dg->url_path, "stream_ctl,channels=%s", l_channels_str) == 1) { + l_new_session = true; + } + /*if (strcmp(dg->url_path,"socket_forward")==0){ l_new_session = true; }else if (strcmp(dg->url_path,"stream_ctl")==0) { l_new_session = true; - }else{ + }*/ + else{ log_it(L_ERROR,"ctl command unknown: %s",dg->url_path); enc_http_delegate_delete(dg); *return_code = Http_Status_MethodNotAllowed; @@ -117,6 +124,7 @@ void stream_ctl_proc(struct dap_http_simple *cl_st, void * arg) if(l_new_session){ ss = dap_stream_session_pure_new(); + strncpy(ss->active_channels, l_channels_str, l_channels_str_size); char *key_str = calloc(1, KEX_KEY_STR_SIZE); dap_random_string_fill(key_str, KEX_KEY_STR_SIZE); ss->key = dap_enc_key_new_generate(s_socket_forward_key.type, key_str, KEX_KEY_STR_SIZE, diff --git a/stream/dap_stream_pkt.c b/stream/dap_stream_pkt.c index 12d21869c8710560dde353231efd56839bf00e22..a0ac038a27d809ee1467abbed9ba05454c88c5a4 100644 --- a/stream/dap_stream_pkt.c +++ b/stream/dap_stream_pkt.c @@ -123,10 +123,14 @@ size_t dap_stream_pkt_write(struct dap_stream * sid, const void * data, uint32_t ret+=dap_udp_client_write(sid->conn,&pkt_hdr,sizeof(pkt_hdr)); ret+=dap_udp_client_write(sid->conn,sid->buf,pkt_hdr.size); } - else{ + else if(sid->conn){ ret+=dap_client_remote_write(sid->conn,&pkt_hdr,sizeof(pkt_hdr)); ret+=dap_client_remote_write(sid->conn,sid->buf,pkt_hdr.size); } + else if(sid->events_socket) { + ret += dap_events_socket_write(sid->events_socket, &pkt_hdr, sizeof(pkt_hdr)); + ret += dap_events_socket_write(sid->events_socket, sid->buf, pkt_hdr.size); + } return ret; } @@ -135,9 +139,10 @@ extern void dap_stream_send_keepalive(struct dap_stream * sid) { for(int i=0;i<sid->channel_count;i++) if(sid->channel[i]->proc){ - if(sid->channel[i]->proc->id == SERVICE_CHANNEL_ID) - stream_ch_send_keepalive(sid->channel[i]); - dap_stream_ch_set_ready_to_write(sid->channel[i],true); + if(sid->channel[i]->proc->id == SERVICE_CHANNEL_ID){ + dap_stream_ch_send_keepalive(sid->channel[i]); + dap_stream_ch_set_ready_to_write(sid->channel[i],true); + } } } diff --git a/stream/dap_stream_pkt.h b/stream/dap_stream_pkt.h index d935bba9fb57f72d715dc759bb5f2218fd1cf5ff..ca072ffe36ccbe77e1d263d80999ac0bea2d8ee4 100644 --- a/stream/dap_stream_pkt.h +++ b/stream/dap_stream_pkt.h @@ -25,9 +25,9 @@ #define STREAM_PKT_SIZE_MAX 500000 struct dap_stream; -#define DATA_PACKET 0x00 -#define SERVICE_PACKET 0xff -#define KEEPALIVE_PACKET 0x11 +#define STREAM_PKT_TYPE_DATA_PACKET 0x00 +#define STREAM_PKT_TYPE_SERVICE_PACKET 0xff +//#define STREAM_PKT_TYPE_KEEPALIVE 0x11 typedef struct stream_pkt_hdr{ uint8_t sig[8]; // Signature to find out beginning of the frame diff --git a/test/libdap-test b/test/libdap-test index d2257789e0c796a5a3b637e14dcbaf8a8c7880cc..b76175acc517f085c319c8e66c62bd143f96bf94 160000 --- a/test/libdap-test +++ b/test/libdap-test @@ -1 +1 @@ -Subproject commit d2257789e0c796a5a3b637e14dcbaf8a8c7880cc +Subproject commit b76175acc517f085c319c8e66c62bd143f96bf94