diff --git a/dap_stream_ch.c b/dap_stream_ch.c index da0f5c055feb8e74b9c78702c4c385b5fb46ee67..03d8b83d267f9c97b7cca017b2abc0cd24f8cd82 100644 --- a/dap_stream_ch.c +++ b/dap_stream_ch.c @@ -40,7 +40,7 @@ int dap_stream_ch_init() log_it(L_CRITICAL,"Can't init stream channel proc submodule"); return -1; } - if(stream_ch_pkt_init() != 0 ){ + if(dap_stream_ch_pkt_init() != 0 ){ log_it(L_CRITICAL,"Can't init stream channel packet submodule"); return -1; } @@ -98,7 +98,6 @@ void dap_stream_ch_delete(dap_stream_ch_t*ch) //free(ch); } - void dap_stream_ch_set_ready_to_write(dap_stream_ch_t * ch,bool is_ready) { pthread_mutex_lock(&ch->mutex); @@ -109,8 +108,12 @@ void dap_stream_ch_set_ready_to_write(dap_stream_ch_t * ch,bool is_ready) ch->stream->conn_http->state_write=DAP_HTTP_CLIENT_STATE_DATA; if(ch->stream->conn_udp) dap_udp_client_ready_to_write(ch->stream->conn,is_ready); - else + // for stream server + else if(ch->stream->conn) dap_client_remote_ready_to_write(ch->stream->conn,is_ready); + // for stream client + else if(ch->stream->events_socket) + dap_events_socket_set_writable(ch->stream->events_socket, is_ready); } pthread_mutex_unlock(&ch->mutex); } diff --git a/dap_stream_ch_pkt.c b/dap_stream_ch_pkt.c index 76b1196c130d6b4e0734d723d48aa849d0cbda1e..e7cc7b693a3a23384143a870f52c63a6ab8dae0b 100644 --- a/dap_stream_ch_pkt.c +++ b/dap_stream_ch_pkt.c @@ -38,13 +38,13 @@ * @brief stream_ch_pkt_init * @return Zero if ok */ -int stream_ch_pkt_init() +int dap_stream_ch_pkt_init() { return 0; } -void stream_ch_pkt_deinit() +void dap_stream_ch_pkt_deinit() { } @@ -57,33 +57,33 @@ void stream_ch_pkt_deinit() * @param data_size * @return */ -size_t stream_ch_pkt_write(struct dap_stream_ch * ch, uint8_t type, const void * data, uint32_t data_size) +size_t dap_stream_ch_pkt_write(struct dap_stream_ch * a_ch, uint8_t a_type, const void * a_data, uint32_t a_data_size) { - pthread_mutex_lock( &ch->mutex); + pthread_mutex_lock( &a_ch->mutex); //log_it(L_DEBUG,"Output: Has %u bytes of %c type for %c channel id",data_size, (char)type, (char) ch->proc->id ); - dap_stream_ch_pkt_hdr_t hdr; + dap_stream_ch_pkt_hdr_t l_hdr; - memset(&hdr,0,sizeof(hdr)); - hdr.id = ch->proc->id; - hdr.size=data_size; - hdr.type=type; - hdr.enc_type = ch->proc->enc_type; - hdr.seq_id=ch->stream->seq_id; - ch->stream->seq_id++; + memset(&l_hdr,0,sizeof(l_hdr)); + l_hdr.id = a_ch->proc->id; + l_hdr.size=a_data_size; + l_hdr.type=a_type; + l_hdr.enc_type = a_ch->proc->enc_type; + l_hdr.seq_id=a_ch->stream->seq_id; + a_ch->stream->seq_id++; - if(data_size+sizeof(hdr)> sizeof(ch->buf) ){ - log_it(L_ERROR,"Too big data size %lu, bigger than encryption buffer size %lu",data_size,sizeof(ch->buf)); - data_size=sizeof(ch->buf)-sizeof(hdr); + if(a_data_size+sizeof(l_hdr)> sizeof(a_ch->buf) ){ + log_it(L_ERROR,"Too big data size %lu, bigger than encryption buffer size %lu", a_data_size, sizeof(a_ch->buf)); + a_data_size=sizeof(a_ch->buf)-sizeof(l_hdr); } - memcpy(ch->buf,&hdr,sizeof(hdr) ); - memcpy(ch->buf+sizeof(hdr),data,data_size ); + memcpy(a_ch->buf,&l_hdr,sizeof(l_hdr) ); + memcpy(a_ch->buf+sizeof(l_hdr),a_data,a_data_size ); - size_t ret=dap_stream_pkt_write(ch->stream,ch->buf,data_size+sizeof(hdr)); - ch->stat.bytes_write+=data_size; - pthread_mutex_unlock( &ch->mutex); - return ret; + size_t l_ret=dap_stream_pkt_write(a_ch->stream,a_ch->buf,a_data_size+sizeof(l_hdr)); + a_ch->stat.bytes_write+=a_data_size; + pthread_mutex_unlock( &a_ch->mutex); + return l_ret; } @@ -93,14 +93,14 @@ size_t stream_ch_pkt_write(struct dap_stream_ch * ch, uint8_t type, const void * @param str * @return */ -size_t stream_ch_pkt_write_f(struct dap_stream_ch * ch, uint8_t type, const char * str,...) +size_t dap_stream_ch_pkt_write_f(struct dap_stream_ch * a_ch, uint8_t a_type, const char * a_str,...) { - char buf[4096]; + char l_buf[4096]; va_list ap; - va_start(ap,str); - vsnprintf(buf,sizeof(buf),str,ap); + va_start(ap,a_str); + vsnprintf(l_buf,sizeof(l_buf),a_str,ap); va_end(ap); - size_t ret=stream_ch_pkt_write(ch,type,buf,strlen(buf)); + size_t ret=dap_stream_ch_pkt_write(a_ch,a_type,l_buf,strlen(l_buf)); return ret; } @@ -109,21 +109,22 @@ size_t stream_ch_pkt_write_f(struct dap_stream_ch * ch, uint8_t type, const char * @param ch * @return */ -size_t stream_ch_send_keepalive(struct dap_stream_ch * ch){ - pthread_mutex_lock( &ch->mutex); +size_t dap_stream_ch_send_keepalive(struct dap_stream_ch * a_ch) +{ + pthread_mutex_lock( &a_ch->mutex); - dap_stream_ch_pkt_hdr_t hdr; + dap_stream_ch_pkt_hdr_t l_hdr; - memset(&hdr,0,sizeof(hdr)); - hdr.id = ch->proc->id; - hdr.size=0; - hdr.type=KEEPALIVE_PACKET; - hdr.enc_type = ch->proc->enc_type; - hdr.seq_id=0; + memset(&l_hdr,0,sizeof(l_hdr)); + l_hdr.id = a_ch->proc->id; + l_hdr.size=0; + l_hdr.type=STREAM_CH_PKT_TYPE_KEEPALIVE; + l_hdr.enc_type = a_ch->proc->enc_type; + l_hdr.seq_id=0; - memcpy(ch->buf,&hdr,sizeof(hdr) ); + memcpy(a_ch->buf,&l_hdr,sizeof(l_hdr) ); - size_t ret=dap_stream_pkt_write(ch->stream,ch->buf,sizeof(hdr)); - pthread_mutex_unlock( &ch->mutex); - return ret; + size_t l_ret=dap_stream_pkt_write(a_ch->stream,a_ch->buf,sizeof(l_hdr)); + pthread_mutex_unlock( &a_ch->mutex); + return l_ret; } diff --git a/dap_stream_ch_pkt.h b/dap_stream_ch_pkt.h index db92bc077eb4d657ea589fa33d44c78d55a8b3ff..4b570080b63327e39da659981c559d6a4429f7cd 100644 --- a/dap_stream_ch_pkt.h +++ b/dap_stream_ch_pkt.h @@ -21,7 +21,8 @@ #pragma once -#define KEEPALIVE_PACKET 0x11 +#define STREAM_CH_PKT_TYPE_REQUEST 0x0 +#define STREAM_CH_PKT_TYPE_KEEPALIVE 0x11 #include <stdint.h> #include <stddef.h> @@ -41,10 +42,10 @@ typedef struct dap_stream_ch_pkt{ uint8_t data[]; } __attribute__((packed)) dap_stream_ch_pkt_t; -int stream_ch_pkt_init(); -void stream_ch_pkt_deinit(); +int dap_stream_ch_pkt_init(); +void dap_stream_ch_pkt_deinit(); -size_t stream_ch_pkt_write_f(struct dap_stream_ch * ch, uint8_t type, const char * str,...); -size_t stream_ch_pkt_write(struct dap_stream_ch * ch, uint8_t type, const void * data, uint32_t data_size); +size_t dap_stream_ch_pkt_write_f(struct dap_stream_ch * a_ch, uint8_t a_type, const char * a_str,...); +size_t dap_stream_ch_pkt_write(struct dap_stream_ch * a_ch, uint8_t a_type, const void * a_data, uint32_t a_data_size); -size_t stream_ch_send_keepalive(struct dap_stream_ch * ch); +size_t dap_stream_ch_send_keepalive(struct dap_stream_ch * a_ch);