From fb38bac762618168f1c28fc8e192e49d2b5f7493 Mon Sep 17 00:00:00 2001 From: anta999 <antarcticstation@gmail.com> Date: Tue, 4 Jun 2019 16:47:07 +0400 Subject: [PATCH] test1 --- session/dap_stream_session.c | 45 +++++++++++++++++++++++++++++++----- stream/dap_stream.c | 39 +++++++++++++++++++++++-------- stream/dap_stream.h | 0 stream/dap_stream_ctl.c | 0 stream/dap_stream_ctl.h | 0 stream/dap_stream_pkt.c | 8 ++++++- stream/dap_stream_pkt.h | 0 7 files changed, 75 insertions(+), 17 deletions(-) mode change 100755 => 100644 session/dap_stream_session.c mode change 100755 => 100644 stream/dap_stream.c mode change 100755 => 100644 stream/dap_stream.h mode change 100755 => 100644 stream/dap_stream_ctl.c mode change 100755 => 100644 stream/dap_stream_ctl.h mode change 100755 => 100644 stream/dap_stream_pkt.c mode change 100755 => 100644 stream/dap_stream_pkt.h diff --git a/session/dap_stream_session.c b/session/dap_stream_session.c old mode 100755 new mode 100644 index b0ca4b1..ffb7a6d --- a/session/dap_stream_session.c +++ b/session/dap_stream_session.c @@ -46,6 +46,23 @@ void dap_stream_session_deinit() } } +void dap_stream_session_list() +{ + dap_stream_session_t *current, *tmp; + + log_it(L_INFO,"=== sessions list ======"); + + HASH_ITER( hh, sessions, current, tmp ) { + log_it(L_INFO,"ID %u session %X", current->id, current); + +// HASH_DEL(sessions,current); +// stream_session_close2(current); + } + + log_it(L_INFO,"=== sessions list ======"); +} + + static void * session_check(void * data) { return NULL; @@ -55,6 +72,8 @@ static void * session_check(void * data) dap_stream_session_t * dap_stream_session_pure_new() { dap_stream_session_t * ret=NULL; + + printf("pire new\n" ); unsigned int session_id=0,session_id_new=0; do{ session_id_new=session_id=rand()+rand()*0x100+rand()*0x10000+rand()*0x01000000; @@ -69,6 +88,8 @@ dap_stream_session_t * dap_stream_session_pure_new() ret->enc_type = 0x01; // Default encryption type log_it(L_DEBUG,"Timestamp %u",(unsigned int) ret->time_created); HASH_ADD_INT(sessions,id,ret); + + printf("puew new ok\n" ); return ret; } @@ -78,25 +99,35 @@ dap_stream_session_t * dap_stream_session_new(unsigned int media_id, bool open_p ret->media_id=media_id; ret->open_preview=open_preview; ret->create_empty=false; + return ret; } -dap_stream_session_t * dap_stream_session_id(unsigned int id) +dap_stream_session_t *dap_stream_session_id( unsigned int id ) { - dap_stream_session_t * ret; - HASH_FIND_INT(sessions,&id,ret); + dap_stream_session_t *ret; + HASH_FIND_INT( sessions, &id, ret ); + return ret; } int dap_stream_session_close(unsigned int id) { - log_it(L_INFO,"Close session id=%d", id); - dap_stream_session_t *l_s = dap_stream_session_id(id); + log_it(L_INFO,"Close session id %u", id); +// sleep( 3 ); + + dap_stream_session_list(); + + dap_stream_session_t *l_s = dap_stream_session_id( id ); + if(!l_s) { - log_it(L_WARNING, "Session id=%d not found", id); + log_it(L_WARNING, "Session id %u not found", id); return -1; } + + printf("close ses ok\n" ); + return stream_session_close2(l_s); } @@ -110,10 +141,12 @@ int stream_session_close2(dap_stream_session_t * s) int dap_stream_session_open(dap_stream_session_t * ss) { + printf("close open\n" ); int ret; pthread_mutex_lock(&ss->mutex); ret=ss->opened; if(ss->opened==0) ss->opened=1; pthread_mutex_unlock(&ss->mutex); + printf("close open ok\n" ); return ret; } diff --git a/stream/dap_stream.c b/stream/dap_stream.c old mode 100755 new mode 100644 index 8a6f32e..b6b9709 --- a/stream/dap_stream.c +++ b/stream/dap_stream.c @@ -232,7 +232,7 @@ dap_stream_t * stream_new_udp(dap_client_remote_t * sh) sh->_internal=ret; - log_it(L_NOTICE,"New stream instance"); + log_it(L_NOTICE,"New stream instance udp"); return ret; } @@ -243,6 +243,7 @@ dap_stream_t * stream_new_udp(dap_client_remote_t * sh) */ void check_session(unsigned int id, dap_client_remote_t* cl){ dap_stream_session_t * ss=NULL; + ss=dap_stream_session_id(id); if(ss==NULL){ log_it(L_ERROR,"No session id %u was found",id); @@ -288,20 +289,28 @@ dap_stream_t * stream_new(dap_http_client_t * a_sh) ret->conn->_internal=ret; - log_it(L_NOTICE,"New stream instance"); return ret; } -void dap_stream_delete(dap_stream_t * a_stream) +void dap_stream_delete( dap_stream_t *a_stream ) { - if(a_stream == NULL) + log_it(L_ERROR,"dap_stream_delete( )"); + + if(a_stream == NULL) { + log_it(L_ERROR,"stream delete NULL instance"); return; + } size_t i; - for(i = 0; i < a_stream->channel_count; i++) + + for(i = 0; i < a_stream->channel_count; i++) { dap_stream_ch_delete(a_stream->channel[i]); - if(a_stream->session) + } + + if ( a_stream->session ) { dap_stream_session_close(a_stream->session->id); + } + free(a_stream); } @@ -330,6 +339,7 @@ dap_stream_t* dap_stream_new_es(dap_events_socket_t * a_es) void s_headers_write(dap_http_client_t * sh, void *arg) { (void) arg; + if(sh->reply_status_code==200){ dap_stream_t *sid=DAP_STREAM(sh->client); @@ -348,6 +358,7 @@ void s_headers_write(dap_http_client_t * sh, void *arg) // Function for keepalive loop static void keepalive_cb (EV_P_ ev_timer *w, int revents) { + struct dap_stream *sid = w->data; if(sid->keepalive_passed < STREAM_KEEPALIVE_PASSES) { @@ -360,6 +371,7 @@ static void keepalive_cb (EV_P_ ev_timer *w, int revents) void * arg; stream_dap_delete(sid->conn,arg); } + } /** @@ -399,9 +411,10 @@ void s_data_read(dap_client_remote_t* a_client, void * arg) dap_stream_t * l_stream =DAP_STREAM(a_client); int * ret = (int *) arg; - if (s_dump_packet_headers ) + if (s_dump_packet_headers ) { log_it(L_DEBUG,"dap_stream_data_read: client->buf_in_size=%u" , - a_client->_ready_to_write?"true":"false", a_client->buf_in_size ); + (a_client->flags & DAP_SOCK_READY_TO_WRITE)?"true":"false", a_client->buf_in_size ); + } *ret = dap_stream_data_proc_read( l_stream); } @@ -415,6 +428,7 @@ size_t dap_stream_data_proc_read (dap_stream_t *a_stream) { bool found_sig=false; dap_stream_pkt_t * pkt=NULL; + char *buf_in = (a_stream->conn) ? (char*)a_stream->conn->buf_in : (char*)a_stream->events_socket->buf_in; size_t buf_in_size = (a_stream->conn) ? a_stream->conn->buf_in_size : a_stream->events_socket->buf_in_size; uint8_t * proc_data = buf_in;//a_stream->conn->buf_in; @@ -556,6 +570,7 @@ size_t dap_stream_data_proc_read (dap_stream_t *a_stream) }else if(proc_data_defrag){ a_stream->buf_defrag_size=0; } + return buf_in_size;//a_stream->conn->buf_in_size; } @@ -569,7 +584,7 @@ void stream_dap_data_write(dap_client_remote_t* a_client , void * arg){ size_t i; (void) arg; bool ready_to_write=false; - // log_it(L_DEBUG,"Process channels data output (%u channels)",STREAM(sh)->channel_count); + log_it(L_DEBUG,"Process channels data output (%u channels)", DAP_STREAM(a_client )->channel_count ); for(i=0;i<DAP_STREAM(a_client )->channel_count; i++){ dap_stream_ch_t * ch = DAP_STREAM(a_client )->channel[i]; @@ -579,15 +594,18 @@ void stream_dap_data_write(dap_client_remote_t* a_client , void * arg){ ready_to_write|=ch->ready_to_write; } } - if (s_dump_packet_headers ) + if (s_dump_packet_headers ) { log_it(L_DEBUG,"dap_stream_data_write: ready_to_write=%s client->buf_out_size=%u" , ready_to_write?"true":"false", a_client->buf_out_size ); + } /* if(STREAM(sh)->conn_udp) dap_udp_client_ready_to_write(STREAM(sh)->conn,ready_to_write); else dap_client_ready_to_write(sh,ready_to_write);*/ //log_it(L_ERROR,"No stream_data_write_callback is defined"); + + log_it(L_ERROR,"stream_dap_data_write ok"); } /** @@ -639,6 +657,7 @@ static bool _detect_loose_packet(dap_stream_t * sid) // log_it(L_DEBUG, "Packet seq id: %d", ch_pkt->hdr.seq_id); // log_it(L_DEBUG, "Last seq id: %d", sid->last_seq_id_packet); sid->client_last_seq_id_packet = ch_pkt->hdr.seq_id; + return false; } diff --git a/stream/dap_stream.h b/stream/dap_stream.h old mode 100755 new mode 100644 diff --git a/stream/dap_stream_ctl.c b/stream/dap_stream_ctl.c old mode 100755 new mode 100644 diff --git a/stream/dap_stream_ctl.h b/stream/dap_stream_ctl.h old mode 100755 new mode 100644 diff --git a/stream/dap_stream_pkt.c b/stream/dap_stream_pkt.c old mode 100755 new mode 100644 index 48e0828..cce7c68 --- a/stream/dap_stream_pkt.c +++ b/stream/dap_stream_pkt.c @@ -49,7 +49,9 @@ dap_stream_pkt_t * dap_stream_pkt_detect(void * a_data, size_t data_size) { uint8_t * sig_start=(uint8_t*) a_data; dap_stream_pkt_t * ret=NULL; + size_t length_left=data_size; + while( (sig_start=memchr(sig_start, c_dap_stream_sig[0],length_left)) != NULL ){ length_left= data_size- (size_t) ( sig_start- (uint8_t *) a_data); if(length_left < sizeof(c_dap_stream_sig) ) @@ -62,8 +64,9 @@ dap_stream_pkt_t * dap_stream_pkt_detect(void * a_data, size_t data_size) } break; }else - sig_start+=1; + sig_start+=1; } + return ret; } @@ -139,6 +142,7 @@ size_t dap_stream_pkt_write(dap_stream_t * a_stream, const void * a_data, size_t ret += dap_events_socket_write(a_stream->events_socket, &pkt_hdr, sizeof(pkt_hdr)); ret += dap_events_socket_write(a_stream->events_socket, a_stream->buf, pkt_hdr.size); } + return ret; } @@ -148,6 +152,7 @@ size_t dap_stream_pkt_write(dap_stream_t * a_stream, const void * a_data, size_t */ void dap_stream_send_keepalive(dap_stream_t * a_stream) { + for(size_t i=0;i<a_stream->channel_count;i++) if(a_stream->channel[i]->proc){ if(a_stream->channel[i]->proc->id == SERVICE_CHANNEL_ID){ @@ -155,6 +160,7 @@ void dap_stream_send_keepalive(dap_stream_t * a_stream) dap_stream_ch_set_ready_to_write(a_stream->channel[i],true); } } + } diff --git a/stream/dap_stream_pkt.h b/stream/dap_stream_pkt.h old mode 100755 new mode 100644 -- GitLab