From dc525154e57a3dffa6ae43508eaa545e0ba1cf80 Mon Sep 17 00:00:00 2001
From: "Dmitriy A. Gerasimov" <dmitriy.gerasimov@demlabs.net>
Date: Thu, 27 Aug 2020 11:58:52 +0700
Subject: [PATCH] [+] esocket reassigment between workers: queue, msg and
 function [*] Some renames and small fixes, as usual [*] Reworked http_simple
 to work with proc_thread. Example of how to throw esocket's inheritor to the
 proc context and then back it to worker

---
 dap-sdk/net/core/dap_events_socket.c          |  24 ++
 dap-sdk/net/core/dap_worker.c                 |  43 +++-
 dap-sdk/net/core/include/dap_events_socket.h  |   4 +
 dap-sdk/net/core/include/dap_worker.h         |  14 +-
 dap-sdk/net/server/enc_server/dap_enc_http.c  |  24 +-
 .../net/server/http_server/dap_http_simple.c  | 241 +++++-------------
 .../http_server/http_client/dap_http_client.c | 130 +++++-----
 .../http_server/include/dap_http_simple.h     |  45 ++--
 dap-sdk/net/stream/stream/dap_stream.c        |   8 +-
 dap-sdk/net/stream/stream/dap_stream_ctl.c    |   2 +-
 modules/mempool/dap_chain_mempool.c           |   6 +-
 11 files changed, 245 insertions(+), 296 deletions(-)

diff --git a/dap-sdk/net/core/dap_events_socket.c b/dap-sdk/net/core/dap_events_socket.c
index 66f80723a3..d9f243e987 100644
--- a/dap-sdk/net/core/dap_events_socket.c
+++ b/dap-sdk/net/core/dap_events_socket.c
@@ -110,6 +110,11 @@ void dap_events_socket_assign_on_worker_mt(dap_events_socket_t * a_es, struct da
     dap_worker_add_events_socket(a_es,a_worker);
 }
 
+void dap_events_socket_reassign_between_workers_unsafe(dap_worker_t * a_worker_old, dap_events_socket_t * a_es, dap_worker_t * a_worker_new)
+{
+    dap_events_socket_queue_ptr_send(a_worker_old->queue_es_reassign, a_worker_new );
+}
+
 /**
  * @brief dap_events_socket_assign_on_worker_unsafe
  * @param a_es
@@ -614,6 +619,25 @@ void dap_events_socket_remove_from_worker_unsafe( dap_events_socket_t *a_es, dap
         HASH_DELETE(hh_worker,a_worker->esockets, a_es);
 }
 
+/**
+ * @brief dap_events_socket_check_unsafe
+ * @param a_worker
+ * @param a_es
+ * @return
+ */
+bool dap_events_socket_check_unsafe(dap_worker_t * a_worker,dap_events_socket_t * a_es)
+{
+    if (a_es){
+        if ( a_worker->esockets){
+            dap_events_socket_t * l_es = NULL;
+            HASH_FIND(hh_worker,a_worker->esockets,&a_es, sizeof(a_es), l_es );
+            return l_es == a_es;
+        }else
+            return false;
+    }else
+        return false;
+}
+
 /**
  * @brief dap_events_socket_remove_and_delete
  * @param a_es
diff --git a/dap-sdk/net/core/dap_worker.c b/dap-sdk/net/core/dap_worker.c
index f26cb7dc6b..c8f94ebf86 100644
--- a/dap-sdk/net/core/dap_worker.c
+++ b/dap-sdk/net/core/dap_worker.c
@@ -42,7 +42,7 @@ static time_t s_connection_timeout = 20000;
 static void s_socket_all_check_activity( void * a_arg);
 static void s_queue_new_es_callback( dap_events_socket_t * a_es, void * a_arg);
 static void s_queue_delete_es_callback( dap_events_socket_t * a_es, void * a_arg);
-static void s_queue_reassign_es_callback( dap_events_socket_t * a_es, void * a_arg);
+static void s_queue_es_reassign_callback( dap_events_socket_t * a_es, void * a_arg);
 static void s_queue_callback_callback( dap_events_socket_t * a_es, void * a_arg);
 static void s_queue_es_io_callback( dap_events_socket_t * a_es, void * a_arg);
 
@@ -83,6 +83,7 @@ void *dap_worker_thread(void *arg)
     l_worker->queue_es_new = dap_events_socket_create_type_queue_ptr_unsafe( l_worker, s_queue_new_es_callback);
     l_worker->queue_es_delete = dap_events_socket_create_type_queue_ptr_unsafe( l_worker, s_queue_delete_es_callback);
     l_worker->queue_es_io = dap_events_socket_create_type_queue_ptr_unsafe( l_worker, s_queue_es_io_callback);
+    l_worker->queue_es_reassign = dap_events_socket_create_type_queue_ptr_unsafe( l_worker, s_queue_es_reassign_callback );
     l_worker->queue_callback= dap_events_socket_create_type_queue_ptr_unsafe( l_worker, s_queue_callback_callback);
     l_worker->timer_check_activity = dap_timerfd_start_on_worker( l_worker,s_connection_timeout / 2,s_socket_all_check_activity,l_worker);
 
@@ -396,7 +397,11 @@ static void s_queue_new_es_callback( dap_events_socket_t * a_es, void * a_arg)
  */
 static void s_queue_delete_es_callback( dap_events_socket_t * a_es, void * a_arg)
 {
-    ((dap_events_socket_t*)a_arg)->kill_signal = true; // Send signal to socket to kill
+    dap_events_socket_t * l_esocket = (dap_events_socket_t*) a_arg;
+    if (dap_events_socket_check_unsafe(a_es->worker,l_esocket)){
+        ((dap_events_socket_t*)a_arg)->kill_signal = true; // Send signal to socket to kill
+    }else
+        log_it(L_INFO, "While we were sending the delete() message, esocket %p has been disconnected", l_esocket);
 }
 
 /**
@@ -404,14 +409,21 @@ static void s_queue_delete_es_callback( dap_events_socket_t * a_es, void * a_arg
  * @param a_es
  * @param a_arg
  */
-static void s_queue_reassign_es_callback( dap_events_socket_t * a_es, void * a_arg)
+static void s_queue_es_reassign_callback( dap_events_socket_t * a_es, void * a_arg)
 {
-    dap_events_socket_t * l_es_reassign = ((dap_events_socket_t* ) a_arg);
-    dap_events_socket_remove_from_worker_unsafe( l_es_reassign, a_es->worker );
-    if (l_es_reassign->callbacks.worker_unassign_callback)
-        l_es_reassign->callbacks.worker_unassign_callback(l_es_reassign, a_es->worker);
+    dap_worker_msg_reassign_t * l_msg = (dap_worker_msg_reassign_t*) a_arg;
+    if (dap_events_socket_check_unsafe(a_es->worker,l_msg->esocket)){
+        dap_events_socket_t * l_es_reassign = l_msg->esocket;
+        dap_events_socket_remove_from_worker_unsafe( l_es_reassign, a_es->worker );
 
-    dap_events_socket_assign_on_worker_mt( l_es_reassign, l_es_reassign->worker );
+        if (l_es_reassign->callbacks.worker_unassign_callback)
+            l_es_reassign->callbacks.worker_unassign_callback(l_es_reassign, a_es->worker);
+
+        dap_events_socket_assign_on_worker_mt( l_es_reassign, l_msg->worker_new );
+    }else{
+        log_it(L_INFO, "While we were sending the reassign message, esocket %p has been disconnected", l_msg->esocket);
+    }
+    DAP_DELETE(l_msg);
 }
 
 /**
@@ -424,7 +436,8 @@ static void s_queue_callback_callback( dap_events_socket_t * a_es, void * a_arg)
     dap_worker_msg_callback_t * l_msg = (dap_worker_msg_callback_t *) a_arg;
     assert(l_msg);
     assert(l_msg->callback);
-    l_msg->callback(a_es->worker);
+    l_msg->callback(a_es->worker, l_msg->arg);
+    DAP_DELETE(l_msg);
 }
 
 /**
@@ -495,6 +508,18 @@ void dap_worker_add_events_socket(dap_events_socket_t * a_events_socket, dap_wor
     dap_events_socket_queue_ptr_send( a_worker->queue_es_new, a_events_socket );
 }
 
+/**
+ * @brief dap_worker_exec_callback_on
+ */
+void dap_worker_exec_callback_on(dap_worker_t * a_worker, dap_worker_callback_t a_callback, void * a_arg)
+{
+    dap_worker_msg_callback_t * l_msg = DAP_NEW_Z(dap_worker_msg_callback_t);
+    l_msg->callback = a_callback;
+    l_msg->arg = a_arg;
+    dap_events_socket_queue_ptr_send( a_worker->queue_callback,l_msg );
+}
+
+
 /**
  * @brief dap_worker_add_events_socket
  * @param a_worker
diff --git a/dap-sdk/net/core/include/dap_events_socket.h b/dap-sdk/net/core/include/dap_events_socket.h
index 03edccf0e1..83f6627ab7 100644
--- a/dap-sdk/net/core/include/dap_events_socket.h
+++ b/dap-sdk/net/core/include/dap_events_socket.h
@@ -205,11 +205,14 @@ dap_events_socket_t * dap_events_socket_wrap2( dap_server_t *a_server, struct da
 void dap_events_socket_assign_on_worker_unsafe(dap_events_socket_t * a_es, struct dap_worker * a_worker);
 void dap_events_socket_assign_on_worker_mt(dap_events_socket_t * a_es, struct dap_worker * a_worker);
 
+void dap_events_socket_reassign_between_workers_mt(dap_worker_t * a_worker_old, dap_events_socket_t * a_es, dap_worker_t * a_worker_new);
+
 dap_events_socket_t * dap_events_socket_find_unsafe(int sock, struct dap_events * sh); // Find client by socket
 
 size_t dap_events_socket_pop_from_buf_in(dap_events_socket_t *sc, void * data, size_t data_size);
 
 // Non-MT functions
+bool dap_events_socket_check_unsafe(dap_worker_t * a_worker,dap_events_socket_t * a_es);
 void dap_events_socket_set_readable_unsafe(dap_events_socket_t * sc,bool is_ready);
 void dap_events_socket_set_writable_unsafe(dap_events_socket_t * sc,bool is_ready);
 
@@ -227,3 +230,4 @@ void dap_events_socket_remove_and_delete_unsafe( dap_events_socket_t *a_es, bool
 void dap_events_socket_remove_from_worker_unsafe( dap_events_socket_t *a_es, dap_worker_t * a_worker);
 void dap_events_socket_shrink_buf_in(dap_events_socket_t * cl, size_t shrink_size);
 
+
diff --git a/dap-sdk/net/core/include/dap_worker.h b/dap-sdk/net/core/include/dap_worker.h
index df770f2fd5..cb08c3dbeb 100644
--- a/dap-sdk/net/core/include/dap_worker.h
+++ b/dap-sdk/net/core/include/dap_worker.h
@@ -39,6 +39,7 @@ typedef struct dap_worker
     // worker control queues
     dap_events_socket_t * queue_es_new; // Events socket for new socket
     dap_events_socket_t * queue_es_delete; // Events socke
+    dap_events_socket_t * queue_es_reassign; // Reassign between workers
     dap_events_socket_t * queue_es_io; // Events socket for new socket
 
     dap_events_socket_t * queue_callback; // Queue for pure callback on worker
@@ -51,6 +52,13 @@ typedef struct dap_worker
     void * _inheritor;
 } dap_worker_t;
 
+// Message for reassigment
+typedef struct dap_worker_msg_reassign{
+    dap_events_socket_t * esocket;
+    dap_worker_t * worker_new;
+} dap_worker_msg_reassign_t;
+
+// Message for input/output queue
 typedef struct dap_worker_msg_io{
     dap_events_socket_t * esocket;
     size_t data_size;
@@ -59,8 +67,11 @@ typedef struct dap_worker_msg_io{
     uint32_t flags_unset;
 } dap_worker_msg_io_t;
 
+// Message for callback execution
+typedef void (*dap_worker_callback_t)(dap_worker_t *,void *);
 typedef struct dap_worker_msg_callback{
-    void (*callback) (dap_worker_t *); // Callback for specific client operations
+    dap_worker_callback_t callback; // Callback for specific client operations
+    void * arg;
 } dap_worker_msg_callback_t;
 
 int dap_worker_init( size_t a_conn_timeout );
@@ -68,6 +79,7 @@ void dap_worker_deinit();
 
 void dap_worker_add_events_socket(dap_events_socket_t * a_events_socket, dap_worker_t * a_worker);
 dap_worker_t *dap_worker_add_events_socket_auto( dap_events_socket_t * a_events_socket );
+void dap_worker_exec_callback_on(dap_worker_t * a_worker, dap_worker_callback_t a_callback, void * a_arg);
 
 // Thread function
 void *dap_worker_thread(void *arg);
diff --git a/dap-sdk/net/server/enc_server/dap_enc_http.c b/dap-sdk/net/server/enc_server/dap_enc_http.c
index c04ddf05f1..eabc22b6f6 100644
--- a/dap-sdk/net/server/enc_server/dap_enc_http.c
+++ b/dap-sdk/net/server/enc_server/dap_enc_http.c
@@ -98,7 +98,7 @@ void enc_http_proc(struct dap_http_simple *cl_st, void * arg)
     log_it(L_DEBUG,"Proc enc http request");
     http_status_code_t * return_code = (http_status_code_t*)arg;
 
-    if(strcmp(cl_st->http->url_path,"gd4y5yh78w42aaagh") == 0 ) {
+    if(strcmp(cl_st->http_client->url_path,"gd4y5yh78w42aaagh") == 0 ) {
 
         uint8_t alice_msg[cl_st->request_size];
         size_t decode_len = dap_enc_base64_decode(cl_st->request, cl_st->request_size, alice_msg, DAP_ENC_DATA_TYPE_B64);
@@ -148,7 +148,7 @@ void enc_http_proc(struct dap_http_simple *cl_st, void * arg)
         *return_code = Http_Status_OK;
 
     } else{
-        log_it(L_ERROR,"Wrong path '%s' in the request to enc_http module",cl_st->http->url_path);
+        log_it(L_ERROR,"Wrong path '%s' in the request to enc_http module",cl_st->http_client->url_path);
         *return_code = Http_Status_NotFound;
     }
 }
@@ -171,16 +171,16 @@ void enc_http_add_proc(struct dap_http * sh, const char * url)
 enc_http_delegate_t *enc_http_request_decode(struct dap_http_simple *a_http_simple)
 {
 
-    dap_enc_key_t * l_key= dap_enc_ks_find_http(a_http_simple->http);
+    dap_enc_key_t * l_key= dap_enc_ks_find_http(a_http_simple->http_client);
     if(l_key){
         enc_http_delegate_t * dg = DAP_NEW_Z(enc_http_delegate_t);
         dg->key=l_key;
-        dg->http=a_http_simple->http;
+        dg->http=a_http_simple->http_client;
        // dg->isOk=true;
 
-        strncpy(dg->action,a_http_simple->http->action,sizeof(dg->action)-1);
-        if(a_http_simple->http->in_cookie[0])
-            dg->cookie=strdup(a_http_simple->http->in_cookie);
+        strncpy(dg->action,a_http_simple->http_client->action,sizeof(dg->action)-1);
+        if(a_http_simple->http_client->in_cookie[0])
+            dg->cookie=strdup(a_http_simple->http_client->in_cookie);
 
         if(a_http_simple->request_size){
             size_t l_dg_request_size_max = a_http_simple->request_size;
@@ -199,20 +199,20 @@ enc_http_delegate_t *enc_http_request_decode(struct dap_http_simple *a_http_simp
         else
             l_enc_type = DAP_ENC_DATA_TYPE_B64;
 
-        size_t l_url_path_size_max = strlen(a_http_simple->http->url_path);
+        size_t l_url_path_size_max = strlen(a_http_simple->http_client->url_path);
         if(l_url_path_size_max){
             dg->url_path= DAP_NEW_SIZE(char,l_url_path_size_max+1);
-            dg->url_path_size=dap_enc_decode(l_key, a_http_simple->http->url_path,l_url_path_size_max,dg->url_path, l_url_path_size_max, l_enc_type);
+            dg->url_path_size=dap_enc_decode(l_key, a_http_simple->http_client->url_path,l_url_path_size_max,dg->url_path, l_url_path_size_max, l_enc_type);
             dg->url_path[dg->url_path_size] = 0;
             log_it(L_DEBUG,"URL path after decode '%s'",dg->url_path );
             // log_it(L_DEBUG,"URL path before decode: '%s' after decode '%s'",cl_st->http->url_path,dg->url_path );
         }
 
-        size_t l_in_query_size=strlen(a_http_simple->http->in_query_string);
+        size_t l_in_query_size=strlen(a_http_simple->http_client->in_query_string);
 
         if(l_in_query_size){
             dg->in_query= DAP_NEW_SIZE(char, l_in_query_size+1);
-            dg->in_query_size=dap_enc_decode(l_key, a_http_simple->http->in_query_string,l_in_query_size,dg->in_query,l_in_query_size,  l_enc_type);
+            dg->in_query_size=dap_enc_decode(l_key, a_http_simple->http_client->in_query_string,l_in_query_size,dg->in_query,l_in_query_size,  l_enc_type);
             dg->in_query[dg->in_query_size] = 0;
             log_it(L_DEBUG,"Query string after decode '%s'",dg->in_query);
         }
@@ -233,7 +233,7 @@ enc_http_delegate_t *enc_http_request_decode(struct dap_http_simple *a_http_simp
  */
 void enc_http_reply_encode(struct dap_http_simple *a_http_simple,enc_http_delegate_t * a_http_delegate)
 {
-    dap_enc_key_t * key = dap_enc_ks_find_http(a_http_simple->http);
+    dap_enc_key_t * key = dap_enc_ks_find_http(a_http_simple->http_client);
     if( key == NULL ) {
         log_it(L_ERROR, "Can't find http key.");
         return;
diff --git a/dap-sdk/net/server/http_server/dap_http_simple.c b/dap-sdk/net/server/http_server/dap_http_simple.c
index 2f96ea9d69..abb2c8acc9 100644
--- a/dap-sdk/net/server/http_server/dap_http_simple.c
+++ b/dap-sdk/net/server/http_server/dap_http_simple.c
@@ -45,6 +45,9 @@ See more details here <http://www.gnu.org/licenses/>.
 
 #include "dap_common.h"
 #include "dap_config.h"
+#include "dap_worker.h"
+#include "dap_events.h"
+#include "dap_proc_thread.h"
 #include "dap_http.h"
 #include "dap_http_client.h"
 #include "dap_http_simple.h"
@@ -59,16 +62,9 @@ See more details here <http://www.gnu.org/licenses/>.
 
 #define LOG_TAG "dap_http_simple"
 
-static void s_headers_read( dap_http_client_t *cl_ht, void *arg );
-static void s_http_client_data_write( dap_http_client_t *a_http_client, void *a_arg );
-static void s_es_read( dap_http_client_t * cl_ht, void *arg );
-void *dap_http_simple_proc( dap_http_simple_t * cl_sh );
-
-static void *loop_http_simple_proc( void *arg );
-
-static void async_control_proc( void );
-static void queue_http_request_put( dap_http_simple_t *cl_sh );
-
+static void s_http_client_headers_read( dap_http_client_t *cl_ht, void *arg );
+static void s_http_client_data_read( dap_http_client_t * cl_ht, void *arg );
+static bool s_proc_queue_callback(dap_proc_thread_t * a_thread, void *a_arg );
 
 typedef struct dap_http_simple_url_proc {
 
@@ -99,91 +95,16 @@ static bool is_unknown_user_agents_pass = false;
 
 #define DAP_HTTP_SIMPLE_URL_PROC(a) ((dap_http_simple_url_proc_t*) (a)->_inheritor)
 
-///static struct ev_loop* http_simple_loop;
-///static ev_async async_watcher_http_simple;
-
-static pthread_mutex_t mutex_on_queue_http_response = PTHREAD_MUTEX_INITIALIZER;
-static pthread_t http_simple_loop_thread;
-static bool bSimpleLoopThreadQuitSignal = false;
-
-static dap_http_simple_t **s_requests = NULL;
-static dap_http_simple_t **s_requestsproc = NULL;
-
-static uint32_t s_requests_count = 0;
-static uint32_t s_requestsproc_count = 0;
-
 static void _free_user_agents_list( void );
 
 int dap_http_simple_module_init( )
 {
-  s_requests = DAP_NEW_Z_SIZE(dap_http_simple_t*, sizeof(dap_http_simple_t *) * DAP_HTTP_SIMPLE_REQUEST_MAX * 2 );
-  if ( !s_requests ) {
-
-    log_it( L_ERROR, "Out of memory" );
-    return -1;
-  }
-
-  s_requestsproc = s_requests + DAP_HTTP_SIMPLE_REQUEST_MAX;
-  s_requests_count = 0;
-
-  bSimpleLoopThreadQuitSignal = false;
-  pthread_create( &http_simple_loop_thread, NULL, loop_http_simple_proc, NULL );
-
-  return 0;
+    return 0;
 }
 
 void dap_http_simple_module_deinit( void )
 {
-  bSimpleLoopThreadQuitSignal = true;
-
-  pthread_mutex_destroy( &mutex_on_queue_http_response );
-  pthread_join( http_simple_loop_thread, NULL );
-
-  _free_user_agents_list( );
-
-  if ( s_requests ) {
-    free( s_requests );
-    s_requests = NULL;
-  } 
-}
-
-//#define SIMPLE_LOOP_SLEEP   25 // ms
-#define SIMPLE_LOOP_SLEEP   50 // ms
-
-static struct timespec simple_loop_sleep = { 0, SIMPLE_LOOP_SLEEP * 1000 * 1000 };
-
-static void *loop_http_simple_proc( void *arg )
-{
-  log_it( L_NOTICE, "Start loop http simple thread" );
-
-  do {
-
-    pthread_mutex_lock( &mutex_on_queue_http_response );
-    if ( s_requests_count ) {
-
-      s_requestsproc_count = s_requests_count;
-      s_requests_count = 0;
-      memcpy( s_requestsproc, s_requests, sizeof(void *) * s_requestsproc_count );
-      pthread_mutex_unlock( &mutex_on_queue_http_response );
-
-      for ( uint32_t i = 0; i < s_requestsproc_count; ++ i ) {
-        dap_http_simple_proc( s_requestsproc[i] );
-        s_requestsproc[i]->http->esocket->no_close = false;
-//        free( s_requestsproc[i] ); // ???
-      }
-    }
-    else {
-      pthread_mutex_unlock( &mutex_on_queue_http_response );
-      #ifndef _WIN32
-        nanosleep( &simple_loop_sleep, NULL );
-      #else
-        Sleep( SIMPLE_LOOP_SLEEP );
-      #endif
-    }
-
-  } while( !bSimpleLoopThreadQuitSignal );
-
-  return NULL;
+    _free_user_agents_list( );
 }
 
 /**
@@ -204,8 +125,8 @@ void dap_http_simple_proc_add( dap_http_t *a_http, const char *a_url_path, size_
                      l_url_proc, // Internal structure
                      NULL, // Contrustor
                      NULL, //  Destructor
-                     s_headers_read, NULL, // Headers read, write
-                     s_es_read, s_http_client_data_write, // Data read, write
+                     s_http_client_headers_read, NULL, // Headers read, write
+                     s_http_client_data_read, NULL, // Data read, write
                      NULL); // errror
 }
 
@@ -253,6 +174,7 @@ END:
   return result;
 }
 
+
 bool dap_http_simple_set_supported_user_agents( const char *user_agents, ... )
 {
   va_list argptr;
@@ -308,6 +230,7 @@ inline static void _set_only_write_http_client_state(dap_http_client_t* http_cli
 //  Sleep(300);
 
   dap_events_socket_set_readable_unsafe(http_client->esocket,false);
+  dap_events_socket_set_writable_unsafe(http_client->esocket,true);
 //  http_client->state_write=DAP_HTTP_CLIENT_STATE_NONE;
 
   http_client->state_write=DAP_HTTP_CLIENT_STATE_START;
@@ -326,12 +249,12 @@ static void _copy_reply_and_mime_to_response( dap_http_simple_t *cl_sh )
     return;
   }
 
-  cl_sh->http->out_content_length = cl_sh->reply_size;
-  strcpy( cl_sh->http->out_content_type, cl_sh->reply_mime );
+  cl_sh->http_client->out_content_length = cl_sh->reply_size;
+  strcpy( cl_sh->http_client->out_content_type, cl_sh->reply_mime );
   return;
 }
 
-inline static void _write_response_bad_request( dap_http_simple_t * cl_sh,
+inline static void _write_response_bad_request( dap_http_simple_t * a_http_simple,
                                                const char* error_msg )
 {
 //  log_it(L_DEBUG,"_write_response_bad_request");
@@ -341,75 +264,84 @@ inline static void _write_response_bad_request( dap_http_simple_t * cl_sh,
   json_object_object_add( jobj, "error", json_object_new_string(error_msg) );
 
   log_it( L_DEBUG, "error message %s",  json_object_to_json_string(jobj) );
-  cl_sh->http->reply_status_code = Http_Status_BadRequest;
+  a_http_simple->http_client->reply_status_code = Http_Status_BadRequest;
 
   const char* json_str = json_object_to_json_string( jobj );
-  dap_http_simple_reply(cl_sh, (void*) json_str,
+  dap_http_simple_reply(a_http_simple, (void*) json_str,
                           (size_t) strlen(json_str) );
 
-  strcpy( cl_sh->reply_mime, "application/json" );
+  strcpy( a_http_simple->reply_mime, "application/json" );
 
-  _copy_reply_and_mime_to_response( cl_sh );
+  _copy_reply_and_mime_to_response( a_http_simple );
 
   json_object_put( jobj ); // free obj
-  _set_only_write_http_client_state( cl_sh->http );
+  dap_events_socket_set_writable_mt(a_http_simple->worker, a_http_simple->esocket ,true);
 }
 
 /**
  * @brief dap_http_simple_proc Execute procession callback and switch to write state
  * @param cl_sh HTTP simple client instance
  */
-void* dap_http_simple_proc( dap_http_simple_t *cl_sh )
+bool s_proc_queue_callback(dap_proc_thread_t * a_thread, void * a_arg )
 {
+    (void) a_thread;
+     dap_http_simple_t *l_http_simple = (dap_http_simple_t*) a_arg;
 //  log_it(L_DEBUG, "dap http simple proc");
 //  Sleep(300);
 
     http_status_code_t return_code = (http_status_code_t)0;
 
     if(_is_supported_user_agents_list_setted() == true) {
-        dap_http_header_t *header = dap_http_header_find(cl_sh->http->in_headers, "User-Agent");
+        dap_http_header_t *header = dap_http_header_find(l_http_simple->http_client->in_headers, "User-Agent");
         if(header == NULL && is_unknown_user_agents_pass == false) {
-            const char* error_msg = "Not found User-Agent HTTP header";
-            _write_response_bad_request(cl_sh, error_msg);
-            return NULL;
+            const char error_msg[] = "Not found User-Agent HTTP header";
+            _write_response_bad_request(l_http_simple, error_msg);
+            _set_only_write_http_client_state( l_http_simple->http_client);
+            dap_events_socket_assign_on_worker_mt(l_http_simple->esocket, l_http_simple->worker);
+            return true;
         }
 
         if(_is_user_agent_supported(header->value) == false) {
             log_it(L_DEBUG, "Not supported user agent in request: %s", header->value);
             const char* error_msg = "User-Agent version not supported. Update your software";
-            _write_response_bad_request(cl_sh, error_msg);
-            return NULL;
+            _write_response_bad_request(l_http_simple, error_msg);
+            _set_only_write_http_client_state( l_http_simple->http_client);
+            dap_events_socket_assign_on_worker_mt(l_http_simple->esocket, l_http_simple->worker);
+            return true;
         }
     }
 
-    DAP_HTTP_SIMPLE_URL_PROC(cl_sh->http->proc)->proc_callback(cl_sh,&return_code);
+    DAP_HTTP_SIMPLE_URL_PROC(l_http_simple->http_client->proc)->proc_callback(l_http_simple,&return_code);
 
     if(return_code) {
         log_it(L_DEBUG, "Request was processed well return_code=%d", return_code);
-        cl_sh->http->reply_status_code = (uint16_t)return_code;
-        _copy_reply_and_mime_to_response(cl_sh);
+        l_http_simple->http_client->reply_status_code = (uint16_t)return_code;
+        _copy_reply_and_mime_to_response(l_http_simple);
     } else {
         log_it(L_ERROR, "Request was processed with ERROR");
-        cl_sh->http->reply_status_code = Http_Status_InternalServerError;
+        l_http_simple->http_client->reply_status_code = Http_Status_InternalServerError;
     }
 
-    _set_only_write_http_client_state(cl_sh->http);
-    return NULL;
+    _set_only_write_http_client_state( l_http_simple->http_client);
+    dap_events_socket_assign_on_worker_mt(l_http_simple->esocket, l_http_simple->worker);
+
+    return true;
 }
 
 
-static void s_headers_read( dap_http_client_t *a_http_client, void *a_arg )
+static void s_http_client_headers_read( dap_http_client_t *a_http_client, void *a_arg )
 {
     (void) a_arg;
 
     a_http_client->_inheritor = DAP_NEW_Z( dap_http_simple_t );
-
+    dap_http_simple_t * l_http_simple = DAP_HTTP_SIMPLE(a_http_client);
     //  log_it(L_DEBUG,"dap_http_simple_headers_read");
     //  Sleep(300);
 
-    DAP_HTTP_SIMPLE(a_http_client)->http = a_http_client;
-    DAP_HTTP_SIMPLE(a_http_client)->reply_size_max = DAP_HTTP_SIMPLE_URL_PROC( a_http_client->proc )->reply_size_max;
-    DAP_HTTP_SIMPLE(a_http_client)->reply_byte = DAP_NEW_Z_SIZE(uint8_t, DAP_HTTP_SIMPLE(a_http_client)->reply_size_max );
+    l_http_simple->esocket = a_http_client->esocket;
+    l_http_simple->worker = a_http_client->esocket->worker;
+    l_http_simple->reply_size_max = DAP_HTTP_SIMPLE_URL_PROC( a_http_client->proc )->reply_size_max;
+    l_http_simple->reply_byte = DAP_NEW_Z_SIZE(uint8_t, DAP_HTTP_SIMPLE(a_http_client)->reply_size_max );
 
     if( a_http_client->in_content_length ) {
         // dbg if( a_http_client->in_content_length < 3){
@@ -425,11 +357,12 @@ static void s_headers_read( dap_http_client_t *a_http_client, void *a_arg )
             log_it(L_ERROR, "Not defined content-length %u in request", a_http_client->in_content_length);
     } else {
         log_it( L_DEBUG, "No data section, execution proc callback" );
-        queue_http_request_put( DAP_HTTP_SIMPLE(a_http_client) );
+        dap_events_socket_remove_from_worker_unsafe(a_http_client->esocket,a_http_client->esocket->worker);
+        dap_proc_queue_add_callback( a_http_client->esocket->worker->proc_queue, s_proc_queue_callback, l_http_simple);
     }
 }
 
-void s_es_read( dap_http_client_t *a_http_client, void * a_arg )
+void s_http_client_data_read( dap_http_client_t *a_http_client, void * a_arg )
 {
     int *ret = (int *)a_arg;
 
@@ -454,64 +387,32 @@ void s_es_read( dap_http_client_t *a_http_client, void * a_arg )
             l_http_simple->request_size += bytes_to_read;
         }
     }
-
+    *ret = (int) a_http_client->esocket->buf_in_size;
     if( l_http_simple->request_size >= a_http_client->in_content_length ) {
 
         // bool isOK=true;
-        log_it( L_DEBUG,"Data collected" );
-        queue_http_request_put( l_http_simple );
+        log_it( L_INFO,"Data for http_simple_request collected" );
+        dap_events_socket_set_readable_unsafe(a_http_client->esocket,false);
+        dap_events_socket_set_writable_unsafe(a_http_client->esocket,false);
+        a_http_client->esocket->_inheritor = NULL; // Unbound http_simple from http_client when over reading,
+                                          // now it would be proc thread context
+        dap_proc_queue_add_callback( a_http_client->esocket->worker->proc_queue, s_proc_queue_callback, l_http_simple);
     }
-
-    *ret = (int) a_http_client->esocket->buf_in_size;
 }
 
 
-/**
- * @brief dap_http_simple_data_write
- * @param a_http_client
- * @param a_arg
- */
-static void s_http_client_data_write( dap_http_client_t *a_http_client, void *a_arg )
-{
-  (void) a_arg;
-  dap_http_simple_t *cl_st = DAP_HTTP_SIMPLE( a_http_client );
-
-//  log_it(L_DEBUG,"dap_http_simple_data_write");
-//  Sleep(300);
-
-  if ( !cl_st->reply ) {
-
-    a_http_client->esocket->flags |= DAP_SOCK_SIGNAL_CLOSE;
-    log_it( L_WARNING, "No reply to write, close connection" );
-
-    return;
-  }
-
-  cl_st->reply_sent += dap_events_socket_write_unsafe( a_http_client->esocket,
-                                              cl_st->reply_byte + cl_st->reply_sent,
-                                              a_http_client->out_content_length - cl_st->reply_sent );
-
-  if ( cl_st->reply_sent >= a_http_client->out_content_length ) {
-    log_it(L_INFO, "All the reply (%u) is sent out", a_http_client->out_content_length );
-    //cl_ht->client->signal_close=cl_ht->keep_alive;
-    a_http_client->esocket->flags |= DAP_SOCK_SIGNAL_CLOSE;
-    //dap_client_ready_to_write(cl_ht->client,false);
-  }
-
-  free( cl_st->reply );
-}
-
 /**
  * @brief dap_http_simple_reply Add data to the reply buffer
  * @param shs HTTP simple client instance
  * @param data
  * @param data_size
  */
-size_t dap_http_simple_reply( dap_http_simple_t *a_http_simple, void *a_data, size_t a_data_size )
+size_t dap_http_simple_reply(dap_http_simple_t *a_http_simple, void *a_data, size_t a_data_size )
 {
     size_t l_data_copy_size = (a_data_size > (a_http_simple->reply_size_max - a_http_simple->reply_size) ) ? (a_http_simple->reply_size_max - a_http_simple->reply_size) : a_data_size;
 
-    memcpy( a_http_simple->reply_byte + a_http_simple->reply_size, a_data, l_data_copy_size );
+    memcpy(a_http_simple->reply_byte+a_http_simple->reply_size, a_data,l_data_copy_size );
+
     a_http_simple->reply_size += l_data_copy_size;
 
     return l_data_copy_size;
@@ -522,7 +423,7 @@ size_t dap_http_simple_reply( dap_http_simple_t *a_http_simple, void *a_data, si
  * @param shs
  * @param data
  */
-size_t dap_http_simple_reply_f( dap_http_simple_t * shs, const char * data, ... )
+size_t dap_http_simple_reply_f(dap_http_simple_t * shs, const char * data, ... )
 {
   char buf[4096];
   va_list va;
@@ -538,26 +439,6 @@ size_t dap_http_simple_reply_f( dap_http_simple_t * shs, const char * data, ...
     return 0;
 }
 
-inline void queue_http_request_put( dap_http_simple_t *cl_sh )
-{
-//  dap_http_simple_proc( cl_sh );
-
-  pthread_mutex_lock( &mutex_on_queue_http_response );
-
-  if ( s_requests_count >= DAP_HTTP_SIMPLE_REQUEST_MAX ) {
-
-    log_it( L_NOTICE, "Requests Buffer is FULL( %u ) ignore request" );
-    pthread_mutex_unlock( &mutex_on_queue_http_response );
-    return;
-  }
-
-  log_it( L_WARNING, "queue_http_request_put >>> %u", s_requests_count );
-
-  s_requests[ s_requests_count ++ ] = cl_sh;
-  cl_sh->http->esocket->no_close = true;
-
-  pthread_mutex_unlock( &mutex_on_queue_http_response );
-}
 
 /* Key Expired deprecated code */
 
diff --git a/dap-sdk/net/server/http_server/http_client/dap_http_client.c b/dap-sdk/net/server/http_server/http_client/dap_http_client.c
index 545c2918bd..ac0b171d04 100644
--- a/dap-sdk/net/server/http_server/http_client/dap_http_client.c
+++ b/dap-sdk/net/server/http_server/http_client/dap_http_client.c
@@ -93,23 +93,25 @@ void dap_http_client_new( dap_events_socket_t *cl, void *arg )
  */
 void dap_http_client_delete( dap_events_socket_t * cl, void *arg )
 {
-  dap_http_client_t *cl_ht = DAP_HTTP_CLIENT( cl );
-
-  while( cl_ht->in_headers )
-    dap_http_header_remove( &cl_ht->in_headers, cl_ht->in_headers );
+    dap_http_client_t *cl_ht = DAP_HTTP_CLIENT( cl );
+    if (cl_ht == NULL){ // Client is in proc callback in another thread so we don't delete it
+        return;
+    }
+    while( cl_ht->in_headers )
+        dap_http_header_remove( &cl_ht->in_headers, cl_ht->in_headers );
 
-  while( cl_ht->out_headers )
-    dap_http_header_remove( &cl_ht->out_headers, cl_ht->out_headers );
+    while( cl_ht->out_headers )
+        dap_http_header_remove( &cl_ht->out_headers, cl_ht->out_headers );
 
-  if( cl_ht->proc ) {
-    if( cl_ht->proc->delete_callback ) {
-      cl_ht->proc->delete_callback( cl_ht, NULL );
+    if( cl_ht->proc ) {
+        if( cl_ht->proc->delete_callback ) {
+          cl_ht->proc->delete_callback( cl_ht, NULL );
+        }
     }
-  }
 
-  if( cl_ht->_inheritor ) {
-    free( cl_ht->_inheritor );
-  }
+    if( cl_ht->_inheritor ) {
+        free( cl_ht->_inheritor );
+    }
 
   (void) arg;
 }
@@ -321,50 +323,50 @@ static inline void s_report_error_and_restart( dap_events_socket_t *cl, dap_http
  * @param cl HTTP Client instance
  * @param arg Additional argument (usualy not used)
  */
-void dap_http_client_read( dap_events_socket_t *cl, void *arg )
+void dap_http_client_read( dap_events_socket_t *a_esocket, void *arg )
 {
     char buf_line[4096] = {'\0'};
-    dap_http_client_t *cl_ht = DAP_HTTP_CLIENT( cl );
+    dap_http_client_t *l_http_client = DAP_HTTP_CLIENT( a_esocket );
 
 //  log_it( L_DEBUG, "dap_http_client_read..." );
   //log_it( L_DEBUG, "HTTP client in state read %d taked bytes in input %lu", cl_ht->state_read, cl->buf_in_size );
 
     do {
-        switch( cl_ht->state_read ) {
+        switch( l_http_client->state_read ) {
             case DAP_HTTP_CLIENT_STATE_START: { // Beginning of the session. We try to detect
               char  *peol;
               uint32_t eol;
 
-              if (!(peol = (char*)memchr(cl->buf_in, 10, cl->buf_in_size))) { /// search LF
-                  peol = (char*)memchr(cl->buf_in, 13, cl->buf_in_size);
+              if (!(peol = (char*)memchr(a_esocket->buf_in, 10, a_esocket->buf_in_size))) { /// search LF
+                  peol = (char*)memchr(a_esocket->buf_in, 13, a_esocket->buf_in_size);
               }
 
               if (peol) {
-                  eol = peol - cl->buf_in_str;
+                  eol = peol - a_esocket->buf_in_str;
                   if (eol <= 0) {
-                      eol = cl->buf_in_size - 2;
+                      eol = a_esocket->buf_in_size - 2;
                   }
               } else {
                   log_it( L_WARNING, "Single-line, possibly trash, input detected");
-                  eol = cl->buf_in_size - 2;
+                  eol = a_esocket->buf_in_size - 2;
               }
 
               if ( eol + 3 >= sizeof(buf_line) ) {
                 log_it( L_WARNING,"Too big line in request, more than %llu symbols - thats very strange", sizeof(buf_line) - 3 );
-                s_report_error_and_restart( cl, cl_ht );
+                s_report_error_and_restart( a_esocket, l_http_client );
                 break;
               }
 
-              memcpy( buf_line, cl->buf_in, eol + 1 ); // copy with LF
+              memcpy( buf_line, a_esocket->buf_in, eol + 1 ); // copy with LF
 
-              dap_events_socket_shrink_buf_in( cl, eol + 1 );
+              dap_events_socket_shrink_buf_in( a_esocket, eol + 1 );
               buf_line[ eol + 2 ] = 0; // null terminate
 
               // parse http_request_line
-              if ( !dap_http_request_line_parse(cl_ht, buf_line, eol + 1) ) {
+              if ( !dap_http_request_line_parse(l_http_client, buf_line, eol + 1) ) {
 
                 log_it( L_WARNING, "Input: Wrong request line '%s'", buf_line );
-                s_report_error_and_restart( cl, cl_ht );
+                s_report_error_and_restart( a_esocket, l_http_client );
                 break;
               }
 
@@ -373,54 +375,54 @@ void dap_http_client_read( dap_events_socket_t *cl, void *arg )
 
               char *query_string;
 
-              if( (query_string = strchr(cl_ht->url_path, '?')) != NULL ) {
+              if( (query_string = strchr(l_http_client->url_path, '?')) != NULL ) {
 
                 size_t len_after = strlen( query_string + 1 );
 
                 if ( len_after ) {
-                  if( len_after > (sizeof(cl_ht->in_query_string) - 1) )
-                    len_after = sizeof(cl_ht->in_query_string) - 1;
+                  if( len_after > (sizeof(l_http_client->in_query_string) - 1) )
+                    len_after = sizeof(l_http_client->in_query_string) - 1;
 
                   if ( strstr(query_string, "HTTP/1.1") )
-                    strncpy( cl_ht->in_query_string, query_string + 1, len_after - 11 );
+                    strncpy( l_http_client->in_query_string, query_string + 1, len_after - 11 );
                   else
-                    strncpy( cl_ht->in_query_string,query_string + 1, len_after );
+                    strncpy( l_http_client->in_query_string,query_string + 1, len_after );
 
-                  if ( cl_ht->in_query_string[strlen(cl_ht->in_query_string) - 1] == ' ' )
-                    cl_ht->in_query_string[strlen(cl_ht->in_query_string) - 1] = 0;
+                  if ( l_http_client->in_query_string[strlen(l_http_client->in_query_string) - 1] == ' ' )
+                    l_http_client->in_query_string[strlen(l_http_client->in_query_string) - 1] = 0;
 
                   query_string[0] = 0;
                 }
               }
 
-              log_it( L_WARNING, "Input: %s request for %s document (query string '%s')", cl_ht->action, cl_ht->url_path, cl_ht->in_query_string[0] ? cl_ht->in_query_string : ""  );
+              log_it( L_WARNING, "Input: %s request for %s document (query string '%s')", l_http_client->action, l_http_client->url_path, l_http_client->in_query_string[0] ? l_http_client->in_query_string : ""  );
 
               dap_http_url_proc_t *url_proc;
-              int32_t tpos = z_dirname( cl_ht->url_path, 0 );
-              log_it( L_WARNING, "cl_ht->url_path(dir) = %s", cl_ht->url_path );
+              int32_t tpos = z_dirname( l_http_client->url_path, 0 );
+              log_it( L_WARNING, "cl_ht->url_path(dir) = %s", l_http_client->url_path );
 
-              HASH_FIND_STR( cl_ht->http->url_proc, cl_ht->url_path, url_proc );  // Find URL processor
+              HASH_FIND_STR( l_http_client->http->url_proc, l_http_client->url_path, url_proc );  // Find URL processor
 
-              cl_ht->proc = url_proc;
+              l_http_client->proc = url_proc;
 
               if ( tpos )
-                cl_ht->url_path[ tpos ] = '/';
+                l_http_client->url_path[ tpos ] = '/';
 
-              char *ptr = z_basename( cl_ht->url_path, 0 );
+              char *ptr = z_basename( l_http_client->url_path, 0 );
               log_it( L_WARNING, "basename = %s", ptr );
 
         //      log_it( L_WARNING, "cl_ht->client->socket = %u efd %u", cl_ht->client->socket, cl_ht->client->efd );
 
-              memmove( cl_ht->url_path, ptr, strlen(ptr) + 1 );
+              memmove( l_http_client->url_path, ptr, strlen(ptr) + 1 );
 
-              log_it( L_WARNING, "cl_ht->url_path = %s", cl_ht->url_path );
+              log_it( L_WARNING, "cl_ht->url_path = %s", l_http_client->url_path );
 
               if ( url_proc ) {
-                cl_ht->state_read = DAP_HTTP_CLIENT_STATE_HEADERS;
+                l_http_client->state_read = DAP_HTTP_CLIENT_STATE_HEADERS;
               }
               else {
-                log_it( L_WARNING, "Input: unprocessed URL request %s is rejected", cl_ht->url_path );
-                s_report_error_and_restart( cl, cl_ht );
+                log_it( L_WARNING, "Input: unprocessed URL request %s is rejected", l_http_client->url_path );
+                s_report_error_and_restart( a_esocket, l_http_client );
                 break;
               }
 
@@ -433,13 +435,13 @@ void dap_http_client_read( dap_events_socket_t *cl, void *arg )
               char  *peol;
               uint32_t eol;
 
-              if ( !(peol = (char *)memchr(cl->buf_in, 10, cl->buf_in_size)) ) { /// search LF
+              if ( !(peol = (char *)memchr(a_esocket->buf_in, 10, a_esocket->buf_in_size)) ) { /// search LF
                 log_it( L_WARNING, "DAP_HTTP_CLIENT_STATE_HEADERS: no LF" );
-                s_report_error_and_restart( cl, cl_ht );
+                s_report_error_and_restart( a_esocket, l_http_client );
                 break;
               }
 
-              eol = peol - cl->buf_in_str;
+              eol = peol - a_esocket->buf_in_str;
 
         //      int eol = detect_end_of_line( cl->buf_in, cl->buf_in_size );
 
@@ -449,10 +451,10 @@ void dap_http_client_read( dap_events_socket_t *cl, void *arg )
         //      }
 
               int parse_ret;
-              memcpy( buf_line, cl->buf_in, eol + 1 );
+              memcpy( buf_line, a_esocket->buf_in, eol + 1 );
               buf_line[eol-1] = 0;
 
-              parse_ret = dap_http_header_parse( cl_ht, buf_line );
+              parse_ret = dap_http_header_parse( l_http_client, buf_line );
 
         //      log_it( L_WARNING, "cl_ht->client->socket = %u efd %u", cl_ht->client->socket, cl_ht->client->efd );
 
@@ -464,27 +466,27 @@ void dap_http_client_read( dap_events_socket_t *cl, void *arg )
 
                 log_it( L_INFO, "Input: HTTP headers are over" );
 
-                if ( cl_ht->proc->access_callback ) {
+                if ( l_http_client->proc->access_callback ) {
 
         //          log_it( L_WARNING, "access_callback" );
 
                   bool isOk = true;
-                  cl_ht->proc->access_callback( cl_ht, &isOk );
+                  l_http_client->proc->access_callback( l_http_client, &isOk );
                   if ( !isOk ) {
                     log_it( L_NOTICE, "Access restricted" );
-                    s_report_error_and_restart( cl, cl_ht );
+                    s_report_error_and_restart( a_esocket, l_http_client );
                   }
                 }
 
-                if ( cl_ht->proc->headers_read_callback ) {
+                if ( l_http_client->proc->headers_read_callback ) {
                   log_it( L_WARNING, "headers_read_callback" );
-                  cl_ht->proc->headers_read_callback( cl_ht, NULL );
+                  l_http_client->proc->headers_read_callback( l_http_client, NULL );
                 }
 
                 // If no headers callback we go to the DATA processing
-                if( cl_ht->in_content_length ) {
+                if( l_http_client->in_content_length ) {
                     log_it( L_WARNING, "headers -> DAP_HTTP_CLIENT_STATE_DATA" );
-                    cl_ht->state_read = DAP_HTTP_CLIENT_STATE_DATA;
+                    l_http_client->state_read = DAP_HTTP_CLIENT_STATE_DATA;
                 }
                 else {
                                   //log_it
@@ -494,7 +496,7 @@ void dap_http_client_read( dap_events_socket_t *cl, void *arg )
                 }
               } // parse_ret == 1
 
-              dap_events_socket_shrink_buf_in( cl, eol + 1 );
+              dap_events_socket_shrink_buf_in( a_esocket, eol + 1 );
             }
             break;
 
@@ -503,29 +505,29 @@ void dap_http_client_read( dap_events_socket_t *cl, void *arg )
                    //   log_it(L_WARNINGNG, "DBG_#002 [%s] [%s]",             cl_ht->in_query_string, cl_ht->url_path);
 
               size_t read_bytes = 0;
-              if ( cl_ht->proc->data_read_callback ) {
+              if ( l_http_client->proc->data_read_callback ) {
         //            log_it( L_WARNING, "cl_ht->proc->data_read_callback()" );
 
                 //while(cl_ht->client->buf_in_size){
-                cl_ht->proc->data_read_callback( cl_ht, &read_bytes );
-                dap_events_socket_shrink_buf_in( cl, read_bytes );
+                l_http_client->proc->data_read_callback( l_http_client, &read_bytes );
+                dap_events_socket_shrink_buf_in( a_esocket, read_bytes );
                         //}
               }
               else {
                 log_it( L_WARNING, "data_read callback is NULL in DAP_HTTP_CLIENT_STATE_DATA" );
-                cl->buf_in_size = 0;
+                a_esocket->buf_in_size = 0;
               }
             }
             break;
 
             case DAP_HTTP_CLIENT_STATE_NONE: {
-              cl->buf_in_size = 0;
+              a_esocket->buf_in_size = 0;
             }
             break;
 
     } // switch
 
-  } while ( cl->buf_in_size > 0 );
+  } while ( a_esocket->buf_in_size > 0 );
 
 //  log_it( L_DEBUG, "dap_http_client_read...exit" );
 //  Sleep(100);
diff --git a/dap-sdk/net/server/http_server/include/dap_http_simple.h b/dap-sdk/net/server/http_server/include/dap_http_simple.h
index 0dda2401f4..bd2ccc1a51 100644
--- a/dap-sdk/net/server/http_server/include/dap_http_simple.h
+++ b/dap-sdk/net/server/http_server/include/dap_http_simple.h
@@ -35,28 +35,29 @@ struct dap_http_simple;
 typedef void ( *dap_http_simple_callback_t )( struct dap_http_simple *, void * );
 
 typedef struct dap_http_simple {
-
-  dap_http_client_t *http;
-
-  union {
-    void *request;
-    char *request_str;
-    uint8_t * request_byte;
-  };
-
-  union {
-    void *reply;
-    uint8_t *reply_byte;
-    char *reply_str;
-  };
-
-  size_t request_size;
-  size_t request_size_max;
-  size_t reply_size;
-  size_t reply_size_max;
-  size_t reply_sent;
-
-  char reply_mime[256];
+    dap_events_socket_t * esocket;
+    dap_worker_t * worker;
+    dap_http_client_t * http_client;
+    union {
+        void *request;
+        char *request_str;
+        uint8_t * request_byte;
+    };
+
+    union {
+        void *reply;
+        uint8_t *reply_byte;
+        char *reply_str;
+    };
+    size_t content_length;
+
+    size_t request_size;
+    size_t request_size_max;
+    size_t reply_size;
+    size_t reply_size_max;
+    size_t reply_sent;
+
+    char reply_mime[256];
 
    // dap_http_simple_callback_t reply_proc_post_callback;
 } dap_http_simple_t;
diff --git a/dap-sdk/net/stream/stream/dap_stream.c b/dap-sdk/net/stream/stream/dap_stream.c
index d98586360a..2374eedcaa 100644
--- a/dap-sdk/net/stream/stream/dap_stream.c
+++ b/dap-sdk/net/stream/stream/dap_stream.c
@@ -63,7 +63,7 @@ void s_http_client_headers_write(dap_http_client_t * sh, void * arg); // Output
 void s_http_client_data_write(dap_http_client_t * sh, void * arg); // Write the data
 void stream_data_read(dap_http_client_t * sh, void * arg); // Read the data
 
-void s_es_read(dap_events_socket_t* sh, void * arg);
+void s_http_client_data_read(dap_events_socket_t* sh, void * arg);
 void stream_dap_data_write(dap_events_socket_t* sh, void * arg);
 void s_es_callback_delete(dap_events_socket_t* sh, void * arg);
 void stream_dap_udp_new(dap_events_socket_t* sh,void * arg);
@@ -165,7 +165,7 @@ void dap_stream_add_proc_http(struct dap_http * sh, const char * url)
 void dap_stream_add_proc_udp(dap_udp_server_t * sh)
 {
     dap_server_t* server =  sh->dap_server;
-    server->client_callbacks.read_callback = s_es_read;
+    server->client_callbacks.read_callback = s_http_client_data_read;
     server->client_callbacks.write_callback = stream_dap_data_write;
     server->client_callbacks.delete_callback = s_es_callback_delete;
     server->client_callbacks.new_callback = stream_dap_udp_new;
@@ -498,7 +498,7 @@ void s_http_client_data_write(dap_http_client_t * sh, void * arg)
  * @param sh
  * @param arg
  */
-void s_es_read(dap_events_socket_t* a_client, void * arg)
+void s_http_client_data_read(dap_events_socket_t* a_client, void * arg)
 {
     dap_stream_t * l_stream =DAP_STREAM(a_client);
     int * ret = (int *) arg;
@@ -816,7 +816,7 @@ void stream_proc_pkt_in(dap_stream_t * a_stream)
  */
 void stream_data_read(dap_http_client_t * sh, void * arg)
 {
-    s_es_read(sh->esocket,arg);
+    s_http_client_data_read(sh->esocket,arg);
 }
 
 
diff --git a/dap-sdk/net/stream/stream/dap_stream_ctl.c b/dap-sdk/net/stream/stream/dap_stream_ctl.c
index 868a4a594e..ef5d72345a 100644
--- a/dap-sdk/net/stream/stream/dap_stream_ctl.c
+++ b/dap-sdk/net/stream/stream/dap_stream_ctl.c
@@ -146,7 +146,7 @@ void s_proc(struct dap_http_simple *a_http_simple, void * a_arg)
             dap_random_string_fill(key_str, KEX_KEY_STR_SIZE);
             ss->key = dap_enc_key_new_generate( s_socket_forward_key.type, key_str, KEX_KEY_STR_SIZE,
                                                NULL, 0, s_socket_forward_key.size);
-            dap_http_header_t *l_hdr_key_id = dap_http_header_find(a_http_simple->http->in_headers, "KeyID");
+            dap_http_header_t *l_hdr_key_id = dap_http_header_find(a_http_simple->http_client->in_headers, "KeyID");
             if (l_hdr_key_id) {
                 dap_enc_ks_key_t *l_ks_key = dap_enc_ks_find(l_hdr_key_id->value);
                 if (!l_ks_key) {
diff --git a/modules/mempool/dap_chain_mempool.c b/modules/mempool/dap_chain_mempool.c
index 9b4c4ebe29..c5150e1cfd 100644
--- a/modules/mempool/dap_chain_mempool.c
+++ b/modules/mempool/dap_chain_mempool.c
@@ -887,15 +887,15 @@ void chain_mempool_proc(struct dap_http_simple *cl_st, void * arg)
 {
     http_status_code_t * return_code = (http_status_code_t*) arg;
     // save key while it alive, i.e. still exist
-    dap_enc_key_t *key = dap_enc_ks_find_http(cl_st->http);
+    dap_enc_key_t *key = dap_enc_ks_find_http(cl_st->http_client);
     //dap_enc_key_serealize_t *key_ser = dap_enc_key_serealize(key_tmp);
     //dap_enc_key_t *key = dap_enc_key_deserealize(key_ser, sizeof(dap_enc_key_serealize_t));
 
     // read header
     dap_http_header_t *hdr_session_close_id =
-            (cl_st->http) ? dap_http_header_find(cl_st->http->in_headers, "SessionCloseAfterRequest") : NULL;
+            (cl_st->http_client) ? dap_http_header_find(cl_st->http_client->in_headers, "SessionCloseAfterRequest") : NULL;
     dap_http_header_t *hdr_key_id =
-            (hdr_session_close_id && cl_st->http) ? dap_http_header_find(cl_st->http->in_headers, "KeyID") : NULL;
+            (hdr_session_close_id && cl_st->http_client) ? dap_http_header_find(cl_st->http_client->in_headers, "KeyID") : NULL;
 
     enc_http_delegate_t *dg = enc_http_request_decode(cl_st);
     if(dg) {
-- 
GitLab