From c434090b03c59d737d557abbbb9b573791d0e8a8 Mon Sep 17 00:00:00 2001 From: "Dmitriy A. Gerasimov" <dmitriy.gerasimov@demlabs.net> Date: Sun, 2 Dec 2018 12:19:05 +0700 Subject: [PATCH] [*] Renames [+] Client mode funcs --- stream/CMakeLists.txt | 2 +- stream/stream.c | 76 ++++++++++++++++++++++++++++++------------- stream/stream.h | 34 ++++++++++++------- stream/stream_ctl.c | 31 +++++------------- stream/stream_pkt.c | 12 +++---- stream/stream_pkt.h | 14 ++++---- 6 files changed, 99 insertions(+), 70 deletions(-) diff --git a/stream/CMakeLists.txt b/stream/CMakeLists.txt index c4ceb87..2946877 100644 --- a/stream/CMakeLists.txt +++ b/stream/CMakeLists.txt @@ -8,7 +8,7 @@ set(STREAM_SRCS add_library(${PROJECT_NAME} STATIC ${STREAM_SRCS}) -target_link_libraries(dap_stream dap_core dap_udp_server +target_link_libraries(dap_stream dap_core dap_udp_server dap_client dap_crypto dap_http_server dap_enc_server dap_session dap_stream_ch) target_include_directories(dap_stream INTERFACE .) diff --git a/stream/stream.c b/stream/stream.c index e28062f..be51176 100644 --- a/stream/stream.c +++ b/stream/stream.c @@ -60,7 +60,7 @@ void stream_delete(dap_http_client_t * sh, void * arg); struct ev_loop *keepalive_loop; pthread_t keepalive_thread; -void start_keepalive(struct stream *sid); +void start_keepalive(struct dap_stream *sid); // Start keepalive stream void* stream_loop(void * arg) @@ -76,7 +76,7 @@ void* stream_loop(void * arg) */ int stream_init() { - if( stream_ch_init() != 0 ){ + if( dap_stream_ch_init() != 0 ){ log_it(L_CRITICAL, "Can't init channel types submodule"); return -1; } @@ -91,7 +91,7 @@ int stream_init() */ void stream_deinit() { - stream_ch_deinit(); + dap_stream_ch_deinit(); } /** @@ -121,7 +121,7 @@ void stream_add_proc_udp(dap_udp_server_t * sh) * @brief stream_states_update * @param sid stream instance */ -void stream_states_update(struct stream *sid) +void stream_states_update(struct dap_stream *sid) { if(sid->conn_http) sid->conn_http->state_write=DAP_HTTP_CLIENT_STATE_START; @@ -174,13 +174,13 @@ void stream_headers_read(dap_http_client_t * cl_ht, void * arg) //cl_ht->client->ready_to_write=true; cl_ht->state_read=DAP_HTTP_CLIENT_STATE_DATA; cl_ht->out_content_ready=true; - stream_ch_new(sid,SERVICE_CHANNEL_ID); - stream_ch_new(sid,'t'); + dap_stream_ch_new(sid,SERVICE_CHANNEL_ID); + dap_stream_ch_new(sid,'t'); stream_states_update(sid); dap_client_remote_ready_to_read(cl_ht->client,true); }else{ - stream_ch_new(sid,SERVICE_CHANNEL_ID); - stream_ch_new(sid,'g'); + dap_stream_ch_new(sid,SERVICE_CHANNEL_ID); + dap_stream_ch_new(sid,'g'); cl_ht->reply_status_code=200; strcpy(cl_ht->reply_reason_phrase,"OK"); @@ -242,8 +242,8 @@ void check_session(unsigned int id, dap_client_remote_t* cl){ if(ss->create_empty) log_it(L_INFO, "Session created empty"); log_it(L_INFO, "Opened stream session technical and data channels"); - stream_ch_new(sid,SERVICE_CHANNEL_ID); - stream_ch_new(sid,DATA_CHANNEL_ID); + dap_stream_ch_new(sid,SERVICE_CHANNEL_ID); + dap_stream_ch_new(sid,DATA_CHANNEL_ID); stream_states_update(sid); if(STREAM(cl)->conn_udp) dap_udp_client_ready_to_read(cl,true); @@ -275,6 +275,24 @@ dap_stream_t * stream_new(dap_http_client_t * sh) return ret; } + +/** + * @brief dap_stream_new_es + * @param a_es + * @return + */ +dap_stream_t* dap_stream_new_es(dap_events_socket_t * a_es) +{ + dap_stream_t * ret= DAP_NEW_Z(dap_stream_t); + + ret->events_socket = a_es; + ret->buf_defrag_size=0; + ret->is_client_to_uplink = true; + + log_it(L_NOTICE,"New stream with events socket instance for %s",a_es->hostaddr); + return ret; +} + /** * @brief stream_headers_write Prepare headers for output. Creates stream structure * @param sh HTTP client instance @@ -301,7 +319,7 @@ void stream_headers_write(dap_http_client_t * sh, void *arg) // Function for keepalive loop static void keepalive_cb (EV_P_ ev_timer *w, int revents) { - struct stream *sid = w->data; + struct dap_stream *sid = w->data; if(sid->keepalive_passed < STREAM_KEEPALIVE_PASSES) { stream_send_keepalive(sid); @@ -319,7 +337,7 @@ static void keepalive_cb (EV_P_ ev_timer *w, int revents) * @brief start_keepalive Start keepalive signals exchange for stream * @param sid Stream instance */ -void start_keepalive(struct stream *sid){ +void start_keepalive(struct dap_stream *sid){ keepalive_loop = EV_DEFAULT; sid->keepalive_watcher.data = sid; ev_timer_init (&sid->keepalive_watcher, keepalive_cb, STREAM_KEEPALIVE_TIMEOUT, STREAM_KEEPALIVE_TIMEOUT); @@ -342,14 +360,28 @@ void stream_data_write(dap_http_client_t * sh, void * arg) } } - - +/** + * @brief stream_dap_data_read + * @param sh + * @param arg + */ void stream_dap_data_read(dap_client_remote_t* sh, void * arg) { - dap_stream_t * a_stream =STREAM(sh); + dap_stream_t * l_stream =STREAM(sh); int * ret = (int *) arg; + + *ret = dap_stream_data_proc_read( l_stream); +} + +/** + * @brief dap_stream_data_proc_read + * @param a_stream + * @return + */ +size_t dap_stream_data_proc_read (dap_stream_t *a_stream) +{ bool found_sig=false; - stream_pkt_t * pkt=NULL; + dap_stream_pkt_t * pkt=NULL; uint8_t * proc_data= a_stream->conn->buf_in; bool proc_data_defrag=false; // We are or not in defrag buffer size_t read_bytes_to=0; @@ -359,13 +391,13 @@ void stream_dap_data_read(dap_client_remote_t* sh, void * arg) if(a_stream->pkt_buf_in_data_size < sizeof(stream_pkt_hdr_t)) { //At first read header - stream_pkt_t* check_pkt = stream_pkt_detect( proc_data , sizeof(stream_pkt_hdr_t) - a_stream->pkt_buf_in_data_size); + dap_stream_pkt_t* check_pkt = stream_pkt_detect( proc_data , sizeof(stream_pkt_hdr_t) - a_stream->pkt_buf_in_data_size); if(check_pkt){ // Got duplication of packet header several times //log_it(L_DEBUG, "Drop incorrect header part"); a_stream->pkt_buf_in = NULL; a_stream->pkt_buf_in_data_size=0; - return; + return 0; } if(sizeof(stream_pkt_hdr_t) - a_stream->pkt_buf_in_data_size > bytes_left_to_read) read_bytes_to = bytes_left_to_read; @@ -433,7 +465,7 @@ void stream_dap_data_read(dap_client_remote_t* sh, void * arg) size_t pkt_offset=( ((uint8_t*)pkt)- proc_data ); bytes_left_to_read -= pkt_offset ; found_sig=true; - stream_pkt_t* temp_pkt = stream_pkt_detect( (uint8_t*)pkt + 1 ,pkt->hdr.size+sizeof(stream_pkt_hdr_t) ); + dap_stream_pkt_t* temp_pkt = stream_pkt_detect( (uint8_t*)pkt + 1 ,pkt->hdr.size+sizeof(stream_pkt_hdr_t) ); if(bytes_left_to_read <(pkt->hdr.size+sizeof(stream_pkt_hdr_t) )){ // Is all the packet in da buf? read_bytes_to=bytes_left_to_read; }else{ @@ -444,7 +476,7 @@ void stream_dap_data_read(dap_client_remote_t* sh, void * arg) if(read_bytes_to > HEADER_WITH_SIZE_FIELD){ // If we have size field, we can allocate memory a_stream->pkt_buf_in_size_expected =( pkt->hdr.size+sizeof(stream_pkt_hdr_t)); size_t pkt_buf_in_size_expected=a_stream->pkt_buf_in_size_expected; - a_stream->pkt_buf_in=(stream_pkt_t *) malloc(pkt_buf_in_size_expected); + a_stream->pkt_buf_in=(dap_stream_pkt_t *) malloc(pkt_buf_in_size_expected); if(read_bytes_to>(pkt->hdr.size+sizeof(stream_pkt_hdr_t) )){ //log_it(L_WARNING,"For some strange reasons we have read_bytes_to=%u is bigger than expected pkt length(%u bytes). Dropped %u bytes", // pkt->hdr.size+sizeof(stream_pkt_hdr_t),read_bytes_to- pkt->hdr.size+sizeof(stream_pkt_hdr_t)); @@ -489,7 +521,7 @@ void stream_dap_data_read(dap_client_remote_t* sh, void * arg) }else if(proc_data_defrag){ a_stream->buf_defrag_size=0; } - *ret = a_stream->conn->buf_in_size; + return a_stream->conn->buf_in_size; } @@ -534,7 +566,7 @@ void stream_dap_delete(dap_client_remote_t* sh, void * arg){ (void) arg; size_t i; for(i=0;i<sid->channel_count; i++) - stream_ch_delete(sid->channel[i]); + dap_stream_ch_delete(sid->channel[i]); if(sid->session) dap_stream_session_close(sid->session->id); //free(sid); diff --git a/stream/stream.h b/stream/stream.h index 950f9bc..f3e2c14 100644 --- a/stream/stream.h +++ b/stream/stream.h @@ -33,25 +33,25 @@ #include "dap_stream_ch.h" #include "dap_udp_server.h" #include "dap_udp_client.h" - +#include "dap_events_socket.h" #define CHUNK_SIZE_MAX 3*1024 -struct dap_client_remote; -struct dap_udp_server_t; +typedef struct dap_client_remote dap_client_remote_t; +typedef struct dap_udp_server dap_udp_server_t; -struct dap_http_client; -struct dap_http; -struct stream; -struct stream_pkt; +typedef struct dap_http_client dap_http_client_t; +typedef struct dap_http dap_http_t; +typedef struct dap_stream dap_stream_t; +typedef struct dap_stream_pkt dap_stream_pkt_t; #define STREAM_BUF_SIZE_MAX 20480 #define STREAM_KEEPALIVE_TIMEOUT 3 // How often send keeplive messages (seconds) #define STREAM_KEEPALIVE_PASSES 3 // How many messagges without answers need for disconnect client and close session -typedef void (*stream_callback)(struct stream*,void*); +typedef void (*stream_callback)(struct dap_stream*,void*); -typedef struct stream { +typedef struct dap_stream { int id; dap_stream_session_t * session; @@ -61,13 +61,16 @@ typedef struct stream { struct dap_udp_client * conn_udp; // UDP-client + dap_events_socket_t * events_socket; + bool is_live; + bool is_client_to_uplink ; ev_timer keepalive_watcher; // Watcher for keepalive loop uint8_t keepalive_passed; // Number of sended keepalive messages - struct stream_pkt * in_pkt; - struct stream_pkt *pkt_buf_in; + struct dap_stream_pkt * in_pkt; + struct dap_stream_pkt *pkt_buf_in; size_t pkt_buf_in_data_size; size_t pkt_buf_in_size_expected; @@ -96,4 +99,13 @@ void stream_add_proc_http(struct dap_http * sh, const char * url); void stream_add_proc_udp(dap_udp_server_t * sh); +dap_stream_t* dap_stream_new_es(dap_events_socket_t * a_es); +size_t dap_stream_data_proc_read(dap_stream_t * a_stream); +size_t dap_stream_data_proc_write(dap_stream_t * a_stream); +void dap_stream_delete(dap_stream_t * a_stream); +void dap_stream_proc_pkt_in(dap_stream_t * sid); + +void dap_stream_es_rw_states_update(struct dap_stream *a_stream); + + #endif diff --git a/stream/stream_ctl.c b/stream/stream_ctl.c index 7255f92..ac596f0 100644 --- a/stream/stream_ctl.c +++ b/stream/stream_ctl.c @@ -89,48 +89,33 @@ void stream_ctl_proc(struct dap_http_simple *cl_st, void * arg) // unsigned int proto_version; dap_stream_session_t * ss=NULL; // unsigned int action_cmd=0; - bool openPreview; - bool socket_forward=false; + bool l_new_session = false; enc_http_delegate_t *dg = enc_http_request_decode(cl_st); if(dg){ - if(strcmp(dg->url_path,"open")==0) - openPreview=false; - else if (strcmp(dg->url_path,"open_preview")==0) - openPreview=true; - else if (strcmp(dg->url_path,"socket_forward")==0){ - socket_forward=true; + if (strcmp(dg->url_path,"socket_forward")==0){ + l_new_session = true; + }else if (strcmp(dg->url_path,"stream_ctl")==0) { + l_new_session = true; }else{ log_it(L_ERROR,"ctl command unknown: %s",dg->url_path); enc_http_delegate_delete(dg); *isOk=false; return; } - if(socket_forward){ - log_it(L_INFO,"[ctl] Play request for db_id=%d",db_id); + if(l_new_session){ ss = dap_stream_session_pure_new(); - char *key_str = calloc(1, KEX_KEY_STR_SIZE); dap_random_string_fill(key_str, KEX_KEY_STR_SIZE); ss->key = dap_enc_key_new_generate(DAP_ENC_KEY_TYPE_IAES, key_str, strlen(key_str), NULL, 0, 0); enc_http_reply_f(dg,"%u %s",ss->id,key_str); dg->isOk=true; + log_it(L_INFO," New stream session %u initialized",ss->id); + free(key_str); - }else if(sscanf( dg->in_query ,"db_id=%u",&db_id)==1){ -// log_it(L_INFO,"[ctl] Play request for db_id=%d",db_id); -// ss=dap_stream_session_new(db_id,openPreview); - -// char key_str[255]; -// for(int i = 0; i < sizeof(key_str); i++) -// key_str[i] = 65 + rand() % 25; - -// ss->key=dap_enc_key_new_from_str(DAP_ENC_KEY_TYPE_AES,key_str); -// enc_http_reply_f(dg,"%u %s",ss->id,key_str); -// dg->isOk=true; -// log_it(L_DEBUG,"Stream AES key string %s",key_str); }else{ log_it(L_ERROR,"Wrong request: \"%s\"",dg->in_query); dg->isOk=false; diff --git a/stream/stream_pkt.c b/stream/stream_pkt.c index 7b377d9..88779e2 100644 --- a/stream/stream_pkt.c +++ b/stream/stream_pkt.c @@ -46,10 +46,10 @@ const size_t dap_hdr_size=8+2+1+1+4; const uint8_t dap_sig[8]={0xa0,0x95,0x96,0xa9,0x9e,0x5c,0xfb,0xfa}; -stream_pkt_t * stream_pkt_detect(void * data, uint32_t data_size) +dap_stream_pkt_t * stream_pkt_detect(void * data, uint32_t data_size) { void * sig_start=data; - stream_pkt_t * ret=NULL; + dap_stream_pkt_t * ret=NULL; uint32_t length_left=data_size; while(sig_start=memchr(sig_start, dap_sig[0],length_left) ){ length_left= data_size-( sig_start-data); @@ -81,7 +81,7 @@ size_t encode_dummy(const void * buf, const size_t buf_size, void * buf_out){ * @param pkt * @param buf_out */ -size_t stream_pkt_read(struct stream * sid,struct stream_pkt * pkt, void * buf_out, size_t buf_out_size) +size_t stream_pkt_read(struct dap_stream * sid,struct dap_stream_pkt * pkt, void * buf_out, size_t buf_out_size) { size_t ds = dap_enc_iaes256_cbc_decrypt_fast(sid->session->key,pkt->data,pkt->hdr.size,buf_out, buf_out_size); // log_it(L_DEBUG,"Stream decoded %lu bytes ( last bytes 0x%02x 0x%02x 0x%02x 0x%02x ) ", ds, @@ -104,7 +104,7 @@ size_t stream_pkt_read(struct stream * sid,struct stream_pkt * pkt, void * buf_o * @return */ -size_t stream_pkt_write(struct stream * sid, const void * data, uint32_t data_size) +size_t stream_pkt_write(struct dap_stream * sid, const void * data, uint32_t data_size) { size_t ret=0; stream_pkt_hdr_t pkt_hdr; @@ -131,13 +131,13 @@ size_t stream_pkt_write(struct stream * sid, const void * data, uint32_t data_si } -extern void stream_send_keepalive(struct stream * sid) +extern void stream_send_keepalive(struct dap_stream * sid) { for(int i=0;i<sid->channel_count;i++) if(sid->channel[i]->proc){ if(sid->channel[i]->proc->id == SERVICE_CHANNEL_ID) stream_ch_send_keepalive(sid->channel[i]); - stream_ch_set_ready_to_write(sid->channel[i],true); + dap_stream_ch_set_ready_to_write(sid->channel[i],true); } } diff --git a/stream/stream_pkt.h b/stream/stream_pkt.h index 53b608c..38a2ce4 100644 --- a/stream/stream_pkt.h +++ b/stream/stream_pkt.h @@ -25,7 +25,7 @@ #include <stddef.h> #define STREAM_PKT_SIZE_MAX 100000 -struct stream; +struct dap_stream; #define DATA_PACKET 0x00 #define SERVICE_PACKET 0xff @@ -40,10 +40,10 @@ typedef struct stream_pkt_hdr{ uint64_t d_addr; // Destination address ( general#domain.net ) } __attribute__((packed)) stream_pkt_hdr_t; -typedef struct stream_pkt{ +typedef struct dap_stream_pkt{ stream_pkt_hdr_t hdr; uint8_t data[]; -} __attribute__((packed)) stream_pkt_t; +} __attribute__((packed)) dap_stream_pkt_t; typedef struct stream_srv_pkt{ uint32_t session_id; @@ -54,12 +54,12 @@ typedef struct stream_srv_pkt{ extern const uint8_t dap_sig[8]; -extern stream_pkt_t * stream_pkt_detect(void * data, uint32_t data_size); +extern dap_stream_pkt_t * stream_pkt_detect(void * data, uint32_t data_size); -extern size_t stream_pkt_read(struct stream * sid,struct stream_pkt * pkt, void * buf_out, size_t buf_out_size); +extern size_t stream_pkt_read(struct dap_stream * sid,struct dap_stream_pkt * pkt, void * buf_out, size_t buf_out_size); -extern size_t stream_pkt_write(struct stream * sid, const void * data, uint32_t data_size); +extern size_t stream_pkt_write(struct dap_stream * sid, const void * data, uint32_t data_size); -extern void stream_send_keepalive(struct stream * sid); +extern void stream_send_keepalive(struct dap_stream * sid); #endif -- GitLab