From 00d195d6ae3875a884e875c6eca0ae0f1bbc196d Mon Sep 17 00:00:00 2001
From: Ivan Deniskin <ivanmordwin@yandex.ru>
Date: Fri, 27 Apr 2018 14:14:36 +0300
Subject: [PATCH] keepalive in channel level

---
 stream/stream.c        | 46 +++++++++++++++---------------------------
 stream/stream_ch.h     |  3 +++
 stream/stream_ch_pkt.c | 24 ++++++++++++++++++++++
 stream/stream_ch_pkt.h |  4 ++++
 stream/stream_pkt.c    | 16 ++++++---------
 5 files changed, 53 insertions(+), 40 deletions(-)

diff --git a/stream/stream.c b/stream/stream.c
index a68a3659f5..87cb18a1fd 100644
--- a/stream/stream.c
+++ b/stream/stream.c
@@ -169,12 +169,12 @@ void stream_headers_read(dap_http_client_t * cl_ht, void * arg)
                         //cl_ht->client->ready_to_write=true;
                         cl_ht->state_read=DAP_HTTP_CLIENT_STATE_DATA;
                         cl_ht->out_content_ready=true;
-                        stream_ch_new(sid,'s');
+                        stream_ch_new(sid,SERVICE_CHANNEL_ID);
                         stream_ch_new(sid,'t');
                         stream_states_update(sid);
                         dap_client_ready_to_read(cl_ht->client,true);
                     }else{
-                        stream_ch_new(sid,'s');
+                        stream_ch_new(sid,SERVICE_CHANNEL_ID);
                         stream_ch_new(sid,'g');
 
                         cl_ht->reply_status_code=200;
@@ -233,26 +233,16 @@ void check_session(unsigned int id, dap_client_remote_t* cl){
             else
                 sid = STREAM(cl);
             sid->session=ss;
-            if(ss->create_empty){
-                log_it(L_INFO, "Opened stream session with only technical channels");
-                stream_ch_new(sid,'s');
-                stream_ch_new(sid,'t');
-                stream_states_update(sid);
-                if(STREAM(cl)->conn_udp)
-                    dap_udp_client_ready_to_read(cl,true);
-                else
-                    dap_client_ready_to_read(cl,true);
-            }else{
-                stream_ch_new(sid,'s');
-                stream_ch_new(sid,'g');
-
-                if(STREAM(cl)->conn_udp)
-                    dap_udp_client_ready_to_read(cl->_inheritor,true);
-                else
-                    dap_client_ready_to_read(cl,true);
-                stream_states_update(sid);
-
-            }
+            if(ss->create_empty)
+                log_it(L_INFO, "Session created empty");       
+            log_it(L_INFO, "Opened stream session technical and data channels");
+            stream_ch_new(sid,SERVICE_CHANNEL_ID);
+            stream_ch_new(sid,DATA_CHANNEL_ID);
+            stream_states_update(sid);
+            if(STREAM(cl)->conn_udp)
+                dap_udp_client_ready_to_read(cl,true);
+            else
+                dap_client_ready_to_read(cl,true);
             start_keepalive(sid);
         }else{
             log_it(L_ERROR,"Can't open session id %u",id);
@@ -471,7 +461,8 @@ void stream_proc_pkt_in(stream_t * sid)
             if(ch->proc)
                 if(ch->proc->packet_in_callback)
                     ch->proc->packet_in_callback(ch,ch_pkt);
-
+            if(ch->proc->id == SERVICE_CHANNEL_ID && ch_pkt->hdr.type == KEEPALIVE_PACKET)
+                stream_send_keepalive(sid);
         }else{
             log_it(L_WARNING, "Input: unprocessed channel packet id '%c'",(char) ch_pkt->hdr.id );
         }
@@ -485,13 +476,8 @@ void stream_proc_pkt_in(stream_t * sid)
         check_session(session_id,sid->conn);
         free(srv_pkt);
     }
-    else if(sid->pkt_buf_in->hdr.type == KEEPALIVE_PACKET)
-    {
-        //Send keepalive answer and restart timer
-        stream_send_keepalive(sid);
-        sid->keepalive_passed = 0;
-        ev_timer_again (keepalive_loop, &sid->keepalive_watcher);
-    }
+    sid->keepalive_passed = 0;
+    ev_timer_again (keepalive_loop, &sid->keepalive_watcher);
     free(sid->pkt_buf_in);
     sid->pkt_buf_in=NULL;
     sid->pkt_buf_in_data_size=0;
diff --git a/stream/stream_ch.h b/stream/stream_ch.h
index 2c00b1bb32..91c50adfdd 100644
--- a/stream/stream_ch.h
+++ b/stream/stream_ch.h
@@ -29,6 +29,9 @@ struct stream_pkt;
 struct stream_ch_proc;
 struct stream_ch;
 
+#define SERVICE_CHANNEL_ID 's'
+#define DATA_CHANNEL_ID 'd'
+
 typedef void (*stream_ch_callback_t) (struct stream_ch*,void*);
 
 typedef struct stream_ch{
diff --git a/stream/stream_ch_pkt.c b/stream/stream_ch_pkt.c
index 8e661db0ef..9a77d2bddc 100644
--- a/stream/stream_ch_pkt.c
+++ b/stream/stream_ch_pkt.c
@@ -113,3 +113,27 @@ size_t stream_ch_pkt_write_f(struct stream_ch * ch, uint8_t type, const char * s
     size_t ret=stream_ch_pkt_write(ch,type,buf,strlen(buf));
     return ret;
 }
+
+/**
+ * @brief stream_ch_send_keepalive
+ * @param ch
+ * @return
+ */
+size_t stream_ch_send_keepalive(struct stream_ch * ch){
+    pthread_mutex_lock( &ch->mutex);
+
+    stream_ch_pkt_hdr_t hdr;
+
+    memset(&hdr,0,sizeof(hdr));
+    hdr.id = ch->proc->id;
+    hdr.size=0;
+    hdr.type=KEEPALIVE_PACKET;
+    hdr.enc_type = ch->proc->enc_type;
+    hdr.seq_id=0;
+
+    memcpy(ch->buf,&hdr,sizeof(hdr) );
+
+    size_t ret=stream_pkt_write(ch->stream,ch->buf,sizeof(hdr));
+    pthread_mutex_unlock( &ch->mutex);
+    return ret;
+}
diff --git a/stream/stream_ch_pkt.h b/stream/stream_ch_pkt.h
index e2be229c2e..4e8fcb0b82 100644
--- a/stream/stream_ch_pkt.h
+++ b/stream/stream_ch_pkt.h
@@ -22,6 +22,8 @@
 #ifndef _STREAM_CH_PKT_H_
 #define _STREAM_CH_PKT_H_
 
+#define KEEPALIVE_PACKET 0x11
+
 #include <stdint.h>
 #include <stddef.h>
 struct stream_ch;
@@ -47,4 +49,6 @@ extern size_t stream_ch_pkt_write_f(struct stream_ch * ch, uint8_t type, const c
 extern size_t stream_ch_pkt_write(struct stream_ch * ch,  uint8_t type, const void * data, uint32_t data_size);
 extern size_t stream_ch_pkt_write_seq_id(struct stream_ch * ch,  uint8_t type, uint64_t seq_id, const void * data, uint32_t data_size);
 
+extern size_t stream_ch_send_keepalive(struct stream_ch * ch);
+
 #endif
diff --git a/stream/stream_pkt.c b/stream/stream_pkt.c
index 05d5d89729..373f7ff5b0 100644
--- a/stream/stream_pkt.c
+++ b/stream/stream_pkt.c
@@ -120,7 +120,6 @@ size_t stream_pkt_write(struct stream * sid, const void * data, uint32_t data_si
     //pkt_hdr.size = dap_enc_code(sid->session->key,data,data_size,sid->buf,DAP_ENC_DATA_TYPE_RAW);
     pkt_hdr.size = encode_dummy(data,data_size,sid->buf);
 
-
     if(sid->conn_udp){
         ret+=dap_udp_client_write(sid->conn,&pkt_hdr,sizeof(pkt_hdr));
         ret+=dap_udp_client_write(sid->conn,sid->buf,pkt_hdr.size);
@@ -135,15 +134,12 @@ size_t stream_pkt_write(struct stream * sid, const void * data, uint32_t data_si
 
 extern void stream_send_keepalive(struct stream * sid)
 {
-    stream_pkt_hdr_t pkt_hdr;
-    memset(&pkt_hdr,0,sizeof(pkt_hdr));
-    memcpy(pkt_hdr.sig,dap_sig,sizeof(pkt_hdr.sig));
-    pkt_hdr.type = KEEPALIVE_PACKET;
-    if(sid->conn_udp)
-        dap_udp_client_write(sid->conn,&pkt_hdr,sizeof(pkt_hdr));
-    else
-        dap_client_write(sid->conn,&pkt_hdr,sizeof(pkt_hdr));
-    dap_client_ready_to_write(sid->conn,true);
+    for(int i=0;i<sid->channel_count;i++)
+    if(sid->channel[i]->proc){
+        if(sid->channel[i]->proc->id == SERVICE_CHANNEL_ID)
+            stream_ch_send_keepalive(sid->channel[i]);
+            stream_ch_set_ready_to_write(sid->channel[i],true);            
+    }
 }
 
 
-- 
GitLab