diff --git a/stream/CMakeLists.txt b/stream/CMakeLists.txt
index c4ceb87e94c6fb745ebb993e280fe08803f2da5c..29468770c8339598d1e46ea9b8f1dec86eff6b61 100644
--- a/stream/CMakeLists.txt
+++ b/stream/CMakeLists.txt
@@ -8,7 +8,7 @@ set(STREAM_SRCS
 
 add_library(${PROJECT_NAME} STATIC ${STREAM_SRCS})
 
-target_link_libraries(dap_stream dap_core dap_udp_server
+target_link_libraries(dap_stream dap_core dap_udp_server dap_client
     dap_crypto dap_http_server dap_enc_server dap_session dap_stream_ch)
 
 target_include_directories(dap_stream INTERFACE .)
diff --git a/stream/stream.c b/stream/stream.c
index e28062f5cf12caf14adbe695cacd72f951365799..be5117683724c9a38138a257242bbe376d217686 100644
--- a/stream/stream.c
+++ b/stream/stream.c
@@ -60,7 +60,7 @@ void stream_delete(dap_http_client_t * sh, void * arg);
 struct ev_loop *keepalive_loop;
 pthread_t keepalive_thread;
 
-void start_keepalive(struct stream *sid);
+void start_keepalive(struct dap_stream *sid);
 
 // Start keepalive stream
 void* stream_loop(void * arg)
@@ -76,7 +76,7 @@ void* stream_loop(void * arg)
  */
 int stream_init()
 {
-    if( stream_ch_init() != 0 ){
+    if( dap_stream_ch_init() != 0 ){
         log_it(L_CRITICAL, "Can't init channel types submodule");
         return -1;
     }
@@ -91,7 +91,7 @@ int stream_init()
  */
 void stream_deinit()
 {
-    stream_ch_deinit();
+    dap_stream_ch_deinit();
 }
 
 /**
@@ -121,7 +121,7 @@ void stream_add_proc_udp(dap_udp_server_t * sh)
  * @brief stream_states_update
  * @param sid stream instance
  */
-void stream_states_update(struct stream *sid)
+void stream_states_update(struct dap_stream *sid)
 {
     if(sid->conn_http)
         sid->conn_http->state_write=DAP_HTTP_CLIENT_STATE_START;
@@ -174,13 +174,13 @@ void stream_headers_read(dap_http_client_t * cl_ht, void * arg)
                         //cl_ht->client->ready_to_write=true;
                         cl_ht->state_read=DAP_HTTP_CLIENT_STATE_DATA;
                         cl_ht->out_content_ready=true;
-                        stream_ch_new(sid,SERVICE_CHANNEL_ID);
-                        stream_ch_new(sid,'t');
+                        dap_stream_ch_new(sid,SERVICE_CHANNEL_ID);
+                        dap_stream_ch_new(sid,'t');
                         stream_states_update(sid);
                         dap_client_remote_ready_to_read(cl_ht->client,true);
                     }else{
-                        stream_ch_new(sid,SERVICE_CHANNEL_ID);
-                        stream_ch_new(sid,'g');
+                        dap_stream_ch_new(sid,SERVICE_CHANNEL_ID);
+                        dap_stream_ch_new(sid,'g');
 
                         cl_ht->reply_status_code=200;
                         strcpy(cl_ht->reply_reason_phrase,"OK");
@@ -242,8 +242,8 @@ void check_session(unsigned int id, dap_client_remote_t* cl){
             if(ss->create_empty)
                 log_it(L_INFO, "Session created empty");       
             log_it(L_INFO, "Opened stream session technical and data channels");
-            stream_ch_new(sid,SERVICE_CHANNEL_ID);
-            stream_ch_new(sid,DATA_CHANNEL_ID);
+            dap_stream_ch_new(sid,SERVICE_CHANNEL_ID);
+            dap_stream_ch_new(sid,DATA_CHANNEL_ID);
             stream_states_update(sid);
             if(STREAM(cl)->conn_udp)
                 dap_udp_client_ready_to_read(cl,true);
@@ -275,6 +275,24 @@ dap_stream_t * stream_new(dap_http_client_t * sh)
     return ret;
 }
 
+
+/**
+ * @brief dap_stream_new_es
+ * @param a_es
+ * @return
+ */
+dap_stream_t* dap_stream_new_es(dap_events_socket_t * a_es)
+{
+    dap_stream_t * ret= DAP_NEW_Z(dap_stream_t);
+
+    ret->events_socket = a_es;
+    ret->buf_defrag_size=0;
+    ret->is_client_to_uplink = true;
+
+    log_it(L_NOTICE,"New stream with events socket instance for %s",a_es->hostaddr);
+    return ret;
+}
+
 /**
  * @brief stream_headers_write Prepare headers for output. Creates stream structure
  * @param sh HTTP client instance
@@ -301,7 +319,7 @@ void stream_headers_write(dap_http_client_t * sh, void *arg)
 // Function for keepalive loop
 static void keepalive_cb (EV_P_ ev_timer *w, int revents)
 {
-    struct stream *sid = w->data;
+    struct dap_stream *sid = w->data;
     if(sid->keepalive_passed < STREAM_KEEPALIVE_PASSES)
     {
         stream_send_keepalive(sid);
@@ -319,7 +337,7 @@ static void keepalive_cb (EV_P_ ev_timer *w, int revents)
  * @brief start_keepalive Start keepalive signals exchange for stream
  * @param sid Stream instance
  */
-void start_keepalive(struct stream *sid){
+void start_keepalive(struct dap_stream *sid){
     keepalive_loop = EV_DEFAULT;
     sid->keepalive_watcher.data = sid;
     ev_timer_init (&sid->keepalive_watcher, keepalive_cb, STREAM_KEEPALIVE_TIMEOUT, STREAM_KEEPALIVE_TIMEOUT);
@@ -342,14 +360,28 @@ void stream_data_write(dap_http_client_t * sh, void * arg)
     }
 }
 
-
-
+/**
+ * @brief stream_dap_data_read
+ * @param sh
+ * @param arg
+ */
 void stream_dap_data_read(dap_client_remote_t* sh, void * arg)
 {
-    dap_stream_t * a_stream =STREAM(sh);
+    dap_stream_t * l_stream =STREAM(sh);
     int * ret = (int *) arg;
+
+     *ret = dap_stream_data_proc_read( l_stream);
+}
+
+/**
+ * @brief dap_stream_data_proc_read
+ * @param a_stream
+ * @return
+ */
+size_t dap_stream_data_proc_read (dap_stream_t *a_stream)
+{
     bool found_sig=false;
-    stream_pkt_t * pkt=NULL;
+    dap_stream_pkt_t * pkt=NULL;
     uint8_t * proc_data=  a_stream->conn->buf_in;
     bool proc_data_defrag=false; // We are or not in defrag buffer
     size_t read_bytes_to=0;
@@ -359,13 +391,13 @@ void stream_dap_data_read(dap_client_remote_t* sh, void * arg)
         if(a_stream->pkt_buf_in_data_size < sizeof(stream_pkt_hdr_t))
         {
             //At first read header
-            stream_pkt_t* check_pkt = stream_pkt_detect( proc_data , sizeof(stream_pkt_hdr_t) - a_stream->pkt_buf_in_data_size);
+            dap_stream_pkt_t* check_pkt = stream_pkt_detect( proc_data , sizeof(stream_pkt_hdr_t) - a_stream->pkt_buf_in_data_size);
             if(check_pkt){
                 // Got duplication of packet header several times
                 //log_it(L_DEBUG, "Drop incorrect header part");
                 a_stream->pkt_buf_in = NULL;
                 a_stream->pkt_buf_in_data_size=0;
-                return;
+                return 0;
             }
             if(sizeof(stream_pkt_hdr_t) - a_stream->pkt_buf_in_data_size > bytes_left_to_read)
                 read_bytes_to = bytes_left_to_read;
@@ -433,7 +465,7 @@ void stream_dap_data_read(dap_client_remote_t* sh, void * arg)
         size_t pkt_offset=( ((uint8_t*)pkt)- proc_data );
         bytes_left_to_read -= pkt_offset ;
         found_sig=true;
-        stream_pkt_t* temp_pkt = stream_pkt_detect( (uint8_t*)pkt + 1 ,pkt->hdr.size+sizeof(stream_pkt_hdr_t) );
+        dap_stream_pkt_t* temp_pkt = stream_pkt_detect( (uint8_t*)pkt + 1 ,pkt->hdr.size+sizeof(stream_pkt_hdr_t) );
         if(bytes_left_to_read  <(pkt->hdr.size+sizeof(stream_pkt_hdr_t) )){ // Is all the packet in da buf?
             read_bytes_to=bytes_left_to_read;
         }else{
@@ -444,7 +476,7 @@ void stream_dap_data_read(dap_client_remote_t* sh, void * arg)
         if(read_bytes_to > HEADER_WITH_SIZE_FIELD){ // If we have size field, we can allocate memory
             a_stream->pkt_buf_in_size_expected =( pkt->hdr.size+sizeof(stream_pkt_hdr_t));
             size_t pkt_buf_in_size_expected=a_stream->pkt_buf_in_size_expected;
-            a_stream->pkt_buf_in=(stream_pkt_t *) malloc(pkt_buf_in_size_expected);
+            a_stream->pkt_buf_in=(dap_stream_pkt_t *) malloc(pkt_buf_in_size_expected);
             if(read_bytes_to>(pkt->hdr.size+sizeof(stream_pkt_hdr_t) )){
                 //log_it(L_WARNING,"For some strange reasons we have read_bytes_to=%u is bigger than expected pkt length(%u bytes). Dropped %u bytes",
                 //       pkt->hdr.size+sizeof(stream_pkt_hdr_t),read_bytes_to- pkt->hdr.size+sizeof(stream_pkt_hdr_t));
@@ -489,7 +521,7 @@ void stream_dap_data_read(dap_client_remote_t* sh, void * arg)
     }else if(proc_data_defrag){
         a_stream->buf_defrag_size=0;
     }
-    *ret = a_stream->conn->buf_in_size;
+    return a_stream->conn->buf_in_size;
 }
 
 
@@ -534,7 +566,7 @@ void stream_dap_delete(dap_client_remote_t* sh, void * arg){
     (void) arg;
     size_t i;
     for(i=0;i<sid->channel_count; i++)
-        stream_ch_delete(sid->channel[i]);
+        dap_stream_ch_delete(sid->channel[i]);
     if(sid->session)
         dap_stream_session_close(sid->session->id);
     //free(sid);
diff --git a/stream/stream.h b/stream/stream.h
index 950f9bc5ae4f6a5b22ecbe3d7320aeac91752867..f3e2c14ab7e0a09b63d281ffc79f968924db7555 100644
--- a/stream/stream.h
+++ b/stream/stream.h
@@ -33,25 +33,25 @@
 #include "dap_stream_ch.h"
 #include "dap_udp_server.h"
 #include "dap_udp_client.h"
-
+#include "dap_events_socket.h"
 
 #define CHUNK_SIZE_MAX 3*1024
 
-struct dap_client_remote;
-struct dap_udp_server_t;
+typedef struct dap_client_remote dap_client_remote_t;
+typedef struct dap_udp_server dap_udp_server_t;
 
 
-struct dap_http_client;
-struct dap_http;
-struct stream;
-struct stream_pkt;
+typedef struct dap_http_client dap_http_client_t;
+typedef struct dap_http dap_http_t;
+typedef struct dap_stream dap_stream_t;
+typedef struct dap_stream_pkt dap_stream_pkt_t;
 #define STREAM_BUF_SIZE_MAX 20480
 #define STREAM_KEEPALIVE_TIMEOUT 3   // How  often send keeplive messages (seconds)
 #define STREAM_KEEPALIVE_PASSES 3    // How many messagges without answers need for disconnect client and close session
 
-typedef void (*stream_callback)(struct stream*,void*);
+typedef void (*stream_callback)(struct dap_stream*,void*);
 
-typedef struct stream {
+typedef struct dap_stream {
 
     int id;
     dap_stream_session_t * session;
@@ -61,13 +61,16 @@ typedef struct stream {
 
     struct dap_udp_client * conn_udp; // UDP-client
 
+    dap_events_socket_t * events_socket;
+
     bool is_live;
+    bool is_client_to_uplink ;
 
     ev_timer keepalive_watcher;         // Watcher for keepalive loop
     uint8_t keepalive_passed;           // Number of sended keepalive messages
 
-    struct stream_pkt * in_pkt;
-    struct stream_pkt *pkt_buf_in;
+    struct dap_stream_pkt * in_pkt;
+    struct dap_stream_pkt *pkt_buf_in;
     size_t pkt_buf_in_data_size;
     size_t pkt_buf_in_size_expected;
 
@@ -96,4 +99,13 @@ void stream_add_proc_http(struct dap_http * sh, const char * url);
 
 void stream_add_proc_udp(dap_udp_server_t * sh);
 
+dap_stream_t* dap_stream_new_es(dap_events_socket_t * a_es);
+size_t dap_stream_data_proc_read(dap_stream_t * a_stream);
+size_t dap_stream_data_proc_write(dap_stream_t * a_stream);
+void dap_stream_delete(dap_stream_t * a_stream);
+void dap_stream_proc_pkt_in(dap_stream_t * sid);
+
+void dap_stream_es_rw_states_update(struct dap_stream *a_stream);
+
+
 #endif
diff --git a/stream/stream_ctl.c b/stream/stream_ctl.c
index 7255f9201d67c01854c1d4617449a229f5ebc2b2..ac596f0408d5b6a48433b69edacdc441032ec904 100644
--- a/stream/stream_ctl.c
+++ b/stream/stream_ctl.c
@@ -89,48 +89,33 @@ void stream_ctl_proc(struct dap_http_simple *cl_st, void * arg)
    // unsigned int proto_version;
 	dap_stream_session_t * ss=NULL;
    // unsigned int action_cmd=0;
-    bool openPreview;
-    bool socket_forward=false;
+    bool l_new_session = false;
 
     enc_http_delegate_t *dg = enc_http_request_decode(cl_st);
 
     if(dg){
-        if(strcmp(dg->url_path,"open")==0)
-            openPreview=false;
-        else if (strcmp(dg->url_path,"open_preview")==0)
-            openPreview=true;
-        else if (strcmp(dg->url_path,"socket_forward")==0){
-            socket_forward=true;
+        if (strcmp(dg->url_path,"socket_forward")==0){
+            l_new_session = true;
+        }else if (strcmp(dg->url_path,"stream_ctl")==0) {
+            l_new_session = true;
         }else{
             log_it(L_ERROR,"ctl command unknown: %s",dg->url_path);
             enc_http_delegate_delete(dg);
             *isOk=false;
             return;
         }
-        if(socket_forward){
-            log_it(L_INFO,"[ctl] Play request for db_id=%d",db_id);
+        if(l_new_session){
 
             ss = dap_stream_session_pure_new();
-
             char *key_str = calloc(1, KEX_KEY_STR_SIZE);
             dap_random_string_fill(key_str, KEX_KEY_STR_SIZE);
             ss->key = dap_enc_key_new_generate(DAP_ENC_KEY_TYPE_IAES, key_str, strlen(key_str), NULL, 0, 0);
             enc_http_reply_f(dg,"%u %s",ss->id,key_str);
             dg->isOk=true;
 
+            log_it(L_INFO," New stream session %u initialized",ss->id);
+
             free(key_str);
-        }else if(sscanf( dg->in_query ,"db_id=%u",&db_id)==1){
-//            log_it(L_INFO,"[ctl] Play request for db_id=%d",db_id);
-//            ss=dap_stream_session_new(db_id,openPreview);
-
-//            char key_str[255];
-//            for(int i = 0; i < sizeof(key_str); i++)
-//                key_str[i] = 65 + rand() % 25;
-
-//            ss->key=dap_enc_key_new_from_str(DAP_ENC_KEY_TYPE_AES,key_str);
-//            enc_http_reply_f(dg,"%u %s",ss->id,key_str);
-//            dg->isOk=true;
-//            log_it(L_DEBUG,"Stream AES key string %s",key_str);
         }else{
             log_it(L_ERROR,"Wrong request: \"%s\"",dg->in_query);
             dg->isOk=false;
diff --git a/stream/stream_pkt.c b/stream/stream_pkt.c
index 7b377d998a9639fcb9a270fad05437e639ed5771..88779e286769d30bf2651b49dbb0d4b2dfef0b15 100644
--- a/stream/stream_pkt.c
+++ b/stream/stream_pkt.c
@@ -46,10 +46,10 @@ const size_t dap_hdr_size=8+2+1+1+4;
 const uint8_t dap_sig[8]={0xa0,0x95,0x96,0xa9,0x9e,0x5c,0xfb,0xfa};
 
 
-stream_pkt_t * stream_pkt_detect(void * data, uint32_t data_size)
+dap_stream_pkt_t * stream_pkt_detect(void * data, uint32_t data_size)
 {
     void * sig_start=data;
-    stream_pkt_t * ret=NULL;
+    dap_stream_pkt_t * ret=NULL;
     uint32_t length_left=data_size;
     while(sig_start=memchr(sig_start, dap_sig[0],length_left) ){
         length_left= data_size-( sig_start-data);
@@ -81,7 +81,7 @@ size_t encode_dummy(const void * buf, const size_t buf_size, void * buf_out){
  * @param pkt
  * @param buf_out
  */
-size_t stream_pkt_read(struct stream * sid,struct stream_pkt * pkt, void * buf_out, size_t buf_out_size)
+size_t stream_pkt_read(struct dap_stream * sid,struct dap_stream_pkt * pkt, void * buf_out, size_t buf_out_size)
 {
     size_t ds = dap_enc_iaes256_cbc_decrypt_fast(sid->session->key,pkt->data,pkt->hdr.size,buf_out, buf_out_size);
 //    log_it(L_DEBUG,"Stream decoded %lu bytes ( last bytes 0x%02x 0x%02x 0x%02x 0x%02x ) ", ds,
@@ -104,7 +104,7 @@ size_t stream_pkt_read(struct stream * sid,struct stream_pkt * pkt, void * buf_o
  * @return
  */
 
-size_t stream_pkt_write(struct stream * sid, const void * data, uint32_t data_size)
+size_t stream_pkt_write(struct dap_stream * sid, const void * data, uint32_t data_size)
 {
     size_t ret=0;
     stream_pkt_hdr_t pkt_hdr;
@@ -131,13 +131,13 @@ size_t stream_pkt_write(struct stream * sid, const void * data, uint32_t data_si
 }
 
 
-extern void stream_send_keepalive(struct stream * sid)
+extern void stream_send_keepalive(struct dap_stream * sid)
 {
     for(int i=0;i<sid->channel_count;i++)
     if(sid->channel[i]->proc){
         if(sid->channel[i]->proc->id == SERVICE_CHANNEL_ID)
             stream_ch_send_keepalive(sid->channel[i]);
-            stream_ch_set_ready_to_write(sid->channel[i],true);            
+            dap_stream_ch_set_ready_to_write(sid->channel[i],true);            
     }
 }
 
diff --git a/stream/stream_pkt.h b/stream/stream_pkt.h
index 53b608c9928dd1e06123ae4076036c7384bfc4c8..38a2ce439107356e195963146cfaf371563dd1aa 100644
--- a/stream/stream_pkt.h
+++ b/stream/stream_pkt.h
@@ -25,7 +25,7 @@
 #include <stddef.h>
 
 #define STREAM_PKT_SIZE_MAX 100000
-struct stream;
+struct dap_stream;
 
 #define DATA_PACKET 0x00
 #define SERVICE_PACKET 0xff
@@ -40,10 +40,10 @@ typedef struct stream_pkt_hdr{
     uint64_t d_addr; // Destination address ( general#domain.net )
 }  __attribute__((packed)) stream_pkt_hdr_t;
 
-typedef struct stream_pkt{
+typedef struct dap_stream_pkt{
     stream_pkt_hdr_t hdr;
     uint8_t data[];
-}  __attribute__((packed)) stream_pkt_t;
+}  __attribute__((packed)) dap_stream_pkt_t;
 
 typedef struct stream_srv_pkt{
     uint32_t session_id;
@@ -54,12 +54,12 @@ typedef struct stream_srv_pkt{
 
 extern const uint8_t dap_sig[8];
 
-extern stream_pkt_t * stream_pkt_detect(void * data, uint32_t data_size);
+extern dap_stream_pkt_t * stream_pkt_detect(void * data, uint32_t data_size);
 
-extern size_t stream_pkt_read(struct stream * sid,struct stream_pkt * pkt, void * buf_out, size_t buf_out_size);
+extern size_t stream_pkt_read(struct dap_stream * sid,struct dap_stream_pkt * pkt, void * buf_out, size_t buf_out_size);
 
-extern size_t stream_pkt_write(struct stream * sid, const void * data, uint32_t data_size);
+extern size_t stream_pkt_write(struct dap_stream * sid, const void * data, uint32_t data_size);
 
-extern void stream_send_keepalive(struct stream * sid);
+extern void stream_send_keepalive(struct dap_stream * sid);
 
 #endif