From 95eca74a9ed0ece8c908433bc8231ace7dd5efe7 Mon Sep 17 00:00:00 2001
From: "Dmitriy A. Gerasimov" <dmitriy.gerasimov@demlabs.net>
Date: Tue, 6 Oct 2020 02:13:55 +0700
Subject: [PATCH] [+] Common network functions module [*] Some fixes [*]
 Implemented non-blocking sockets for dap_client_http

---
 dap-sdk/net/client/dap_client_http.c          | 248 +++++++++---------
 dap-sdk/net/client/dap_client_pvt.c           | 194 +++++++-------
 dap-sdk/net/core/dap_events_socket.c          |   3 +-
 dap-sdk/net/core/dap_net.c                    |  50 ++++
 dap-sdk/net/core/dap_worker.c                 |   8 +-
 dap-sdk/net/core/include/dap_net.h            |  41 +++
 .../server/json_rpc/src/dap_json_rpc_errors.c |   5 +-
 7 files changed, 317 insertions(+), 232 deletions(-)
 create mode 100644 dap-sdk/net/core/dap_net.c
 create mode 100644 dap-sdk/net/core/include/dap_net.h

diff --git a/dap-sdk/net/client/dap_client_http.c b/dap-sdk/net/client/dap_client_http.c
index 7eb3b44e1b..74fde346b4 100644
--- a/dap-sdk/net/client/dap_client_http.c
+++ b/dap-sdk/net/client/dap_client_http.c
@@ -19,26 +19,14 @@
  along with any DAP based project.  If not, see <http://www.gnu.org/licenses/>.
  */
 
-#ifdef WIN32
-// for Windows
-#include <winsock2.h>
-#include <windows.h>
-#include <mswsock.h>
-#include <ws2tcpip.h>
-#include <io.h>
-#else
-// for Unix-like systems
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <netdb.h>
-#include <arpa/inet.h>
-#endif
+
 #include <unistd.h>
 #include <errno.h>
 
 #include "dap_common.h"
 #include "dap_strfuncs.h"
 #include "dap_string.h"
+#include "dap_net.h"
 #include "dap_events_socket.h"
 #include "dap_stream_ch_proc.h"
 #include "dap_server.h"
@@ -56,7 +44,7 @@ typedef struct dap_http_client_internal {
     dap_client_http_callback_error_t error_callback;
 
     void *obj; // dap_client_pvt_t *client_pvt;
-    uint8_t *request;
+    byte_t *request;
     size_t request_size;
     size_t request_sent_size;
 
@@ -70,9 +58,29 @@ typedef struct dap_http_client_internal {
     size_t response_size;
     size_t response_size_max;
 
-} dap_client_http_internal_t;
+    // Request args
+    const char *uplink_addr;
+    uint16_t uplink_port;
+    const char *method;
+    const char *request_content_type;
+    const char * path;
+    char *cookie;
+    char **request_custom_headers; // Custom headers
+    size_t request_custom_headers_count;
+
+    // Request vars
+    dap_worker_t * worker;
 
-#define DAP_CLIENT_HTTP(a) (a ? (dap_client_http_internal_t *) (a)->_inheritor : NULL)
+} dap_client_http_pvt_t;
+
+#define PVT(a) (a ? (dap_client_http_pvt_t *) (a)->_inheritor : NULL)
+
+static void s_http_connected(dap_events_socket_t * a_esocket); // Connected callback
+static void s_http_new(dap_events_socket_t * a_es, void * arg); // New callback (assigned on worker first time)
+static void s_http_delete(dap_events_socket_t * a_es, void * arg);
+static void s_http_read(dap_events_socket_t * a_es, void * arg);
+static void s_http_write(dap_events_socket_t * a_es, void * arg);
+static void s_http_error(dap_events_socket_t * a_es, int a_arg);
 
 /**
  * @brief s_http_new
@@ -83,7 +91,7 @@ static void s_http_new(dap_events_socket_t * a_es, void * arg)
 {
     UNUSED(arg);
     log_it(L_DEBUG, "HTTP client connected");
-    dap_client_http_internal_t * l_client_http_internal = DAP_CLIENT_HTTP(a_es);
+    dap_client_http_pvt_t * l_client_http_internal = PVT(a_es);
     if(!l_client_http_internal) {
         log_it(L_ERROR, "s_http_new: l_client_http_internal is NULL!");
         return;
@@ -95,6 +103,8 @@ static void s_http_new(dap_events_socket_t * a_es, void * arg)
     l_client_http_internal->response = (uint8_t*) DAP_NEW_Z_SIZE(uint8_t, DAP_CLIENT_HTTP_RESPONSE_SIZE_MAX);
 }
 
+
+
 /**
  * @brief s_http_stream_write
  * @param a_es
@@ -123,7 +133,7 @@ static void s_http_write(dap_events_socket_t * a_es, void * arg)
 static void s_http_read(dap_events_socket_t * a_es, void * arg)
 {
 //    log_it(L_DEBUG, "s_http_read ");
-    dap_client_http_internal_t * l_client_http_internal = DAP_CLIENT_HTTP(a_es);
+    dap_client_http_pvt_t * l_client_http_internal = PVT(a_es);
     if(!l_client_http_internal) {
         log_it(L_ERROR, "s_http_read: l_client_http_internal is NULL!");
         return;
@@ -218,25 +228,33 @@ static void s_http_read(dap_events_socket_t * a_es, void * arg)
 /**
  * @brief s_http_stream_error
  * @param a_es
- * @param arg
+ * @param a_errno
  */
-static void s_http_error(dap_events_socket_t * a_es, int arg)
+static void s_http_error(dap_events_socket_t * a_es, int a_errno)
 {
-    log_it(L_INFO, "http client error");
-    dap_client_http_internal_t * l_client_http_internal = DAP_CLIENT_HTTP(a_es);
+    char l_errbuf[128];
+    l_errbuf[0] = '\0';
+    if(a_errno)
+        strerror_r(a_errno, l_errbuf, sizeof (l_errbuf));
+    else
+        strncpy(l_errbuf,"Unknown Error", sizeof (l_errbuf)-1);
+    if (a_es->flags & DAP_SOCK_CONNECTING)
+        log_it(L_WARNING, "Socket connecting error: %s (code %d)" , l_errbuf, a_errno);
+    else
+        log_it(L_WARNING, "Socket error: %s (code %d)" , l_errbuf, a_errno);
+
+    dap_client_http_pvt_t * l_client_http_internal = PVT(a_es);
     if(!l_client_http_internal) {
         log_it(L_ERROR, "s_http_write: l_client_http_internal is NULL!");
         return;
     }
     if(l_client_http_internal->error_callback)
-        l_client_http_internal->error_callback(arg, l_client_http_internal->obj);
+        l_client_http_internal->error_callback(a_errno, l_client_http_internal->obj);
 
-    // close connection
+    // close connection.
+    // TODO merge this things into the one (I expect better it would be flag )
+    a_es->flags &= DAP_SOCK_SIGNAL_CLOSE;
     a_es->kill_signal = true;
-    //dap_events_socket_remove_and_delete(a_es, true);
-    //dap_events_thread_wake_up( &a_es->events->proc_thread);
-    //dap_events_socket_delete(a_es, false);
-    //a_es->no_close = false;
 }
 
 /**
@@ -249,67 +267,19 @@ static void s_http_delete(dap_events_socket_t *a_es, void *arg)
     UNUSED(arg);
     // call from dap_events_socket_delete(ev_socket, true);
     log_it(L_DEBUG, "HTTP client disconnected");
-    dap_client_http_internal_t * l_client_http_internal = DAP_CLIENT_HTTP(a_es);
+    dap_client_http_pvt_t * l_client_http_internal = PVT(a_es);
     if(!l_client_http_internal) {
         log_it(L_ERROR, "s_http_write: l_client_http_internal is NULL!");
         return;
     }
-    //close(l_client_http_internal->socket);
-    //l_client_http_internal->socket = 0;
-
-    DAP_DELETE(l_client_http_internal->response);
-    l_client_http_internal->response = NULL;
-
-    DAP_DELETE(l_client_http_internal);
-    a_es->_inheritor = NULL;
-}
 
-/**
- * @brief resolve_host
- * @param a_host hostname
- * @param ai_family AF_INET  for ipv4 or AF_INET6 for ipv6
- * @param a_addr_out out addr (struct in_addr or struct in6_addr)
- * @param return 0 of OK, <0 Error
- */
-int resolve_host(const char *a_host, int ai_family, struct sockaddr *a_addr_out)
-{
-    struct addrinfo l_hints, *l_res;
-    void *l_cur_addr = NULL;
-
-    memset(&l_hints, 0, sizeof(l_hints));
-    l_hints.ai_family = PF_UNSPEC;
-    l_hints.ai_socktype = SOCK_STREAM;
-    l_hints.ai_flags |= AI_CANONNAME;
-
-    int errcode = getaddrinfo(a_host, NULL, &l_hints, &l_res);
-    if(errcode != 0)
-            {
-        return -2;
+    if (l_client_http_internal->response){
+        DAP_DELETE(l_client_http_internal->response);
+        l_client_http_internal->response = NULL;
     }
-    while(l_res)
-    {
-        if(ai_family == l_res->ai_family)
-            switch (l_res->ai_family)
-            {
-            case AF_INET:
-                l_cur_addr = &((struct sockaddr_in *) l_res->ai_addr)->sin_addr;
-                memcpy(a_addr_out, l_cur_addr, sizeof(struct in_addr));
-                break;
-            case AF_INET6:
-                l_cur_addr = &((struct sockaddr_in6 *) l_res->ai_addr)->sin6_addr;
-                memcpy(a_addr_out, l_cur_addr, sizeof(struct in6_addr));
-                break;
-            }
-        if(l_cur_addr) {
-            freeaddrinfo(l_res);
-            return 0;
-        }
-        l_res = l_res->ai_next;
-    }
-    freeaddrinfo(l_res);
-    return -1;
 }
 
+
 /**
  * @brief dap_client_http_request_custom
  * @param a_uplink_addr
@@ -334,6 +304,7 @@ void* dap_client_http_request_custom(const char *a_uplink_addr, uint16_t a_uplin
     //log_it(L_DEBUG, "HTTP request on url '%s:%d'", a_uplink_addr, a_uplink_port);
     static dap_events_socket_callbacks_t l_s_callbacks = {
         .new_callback = s_http_new,
+        .connected_callback = s_http_connected,
         .read_callback = s_http_read,
         .write_callback = s_http_write,
         .error_callback = s_http_error,
@@ -346,6 +317,7 @@ void* dap_client_http_request_custom(const char *a_uplink_addr, uint16_t a_uplin
         log_it(L_ERROR, "Error %d with socket create", errno);
         return NULL;
     }
+    fcntl( l_socket, F_SETFL, O_NONBLOCK); // Make it non-block
     // set socket param
     int buffsize = DAP_CLIENT_HTTP_RESPONSE_SIZE_MAX;
 #ifdef _WIN32
@@ -358,102 +330,128 @@ void* dap_client_http_request_custom(const char *a_uplink_addr, uint16_t a_uplin
     dap_events_socket_t *l_ev_socket = dap_events_socket_wrap_no_add(dap_events_get_default(), l_socket, &l_s_callbacks);
 
     // create private struct
-    dap_client_http_internal_t *l_client_http_internal = DAP_NEW_Z(dap_client_http_internal_t);
+    dap_client_http_pvt_t *l_client_http_internal = DAP_NEW_Z(dap_client_http_pvt_t);
     l_ev_socket->_inheritor = l_client_http_internal;
     l_client_http_internal->error_callback = a_error_callback;
     l_client_http_internal->response_callback = a_response_callback;
     //l_client_http_internal->socket = l_socket;
     l_client_http_internal->obj = a_obj;
+    l_client_http_internal->method = a_method;
+    l_client_http_internal->path = a_path;
+    l_client_http_internal->request_content_type = a_request_content_type;
+    l_client_http_internal->request = a_request;
+    l_client_http_internal->request_size = a_request_size;
+    l_client_http_internal->uplink_addr = a_uplink_addr;
+    l_client_http_internal->uplink_port = a_uplink_port;
+    l_client_http_internal->cookie = a_cookie;
+    l_client_http_internal->request_custom_headers = a_custom;
+    l_client_http_internal->request_custom_headers_count = a_custom_count;
 
-    struct sockaddr_in l_remote_addr;
-    memset(&l_remote_addr, 0, sizeof(l_remote_addr));
     // get struct in_addr from ip_str
-    inet_pton(AF_INET, a_uplink_addr, &(l_remote_addr.sin_addr));
+    inet_pton(AF_INET, a_uplink_addr, &(l_ev_socket->remote_addr.sin_addr));
     //Resolve addr if
-    if(!l_remote_addr.sin_addr.s_addr) {
-        if(resolve_host(a_uplink_addr, AF_INET, (struct sockaddr*) &l_remote_addr.sin_addr) < 0) {
+    if(!l_ev_socket->remote_addr.sin_addr.s_addr) {
+        if(dap_net_resolve_host(a_uplink_addr, AF_INET, (struct sockaddr*) &l_ev_socket->remote_addr.sin_addr) < 0) {
             log_it(L_ERROR, "Wrong remote address '%s:%u'", a_uplink_addr, a_uplink_port);
-            DAP_DELETE(l_ev_socket);
-            close(l_socket);
+            dap_events_socket_remove_and_delete_unsafe( l_ev_socket, true);
             return NULL;
         }
     }
+    l_client_http_internal->worker = dap_worker_add_events_socket_auto(l_ev_socket);
     // connect
-    l_remote_addr.sin_family = AF_INET;
-    l_remote_addr.sin_port = htons(a_uplink_port);
-    int l_err = 0;
-    dap_worker_t *l_worker = NULL;
-    if((l_err = connect(l_socket, (struct sockaddr *) &l_remote_addr, sizeof(struct sockaddr_in))) != -1) {
-        //s_set_sock_nonblock(a_client_pvt->stream_socket, false);
-        log_it(L_INFO, "Remote address connected (%s:%u) with sock_id %d", a_uplink_addr, a_uplink_port, l_socket);
-        // add to dap_worker
-        l_worker = dap_worker_add_events_socket_auto(l_ev_socket);
-    }
-    else {
-        log_it(L_ERROR, "Remote address can't connected (%s:%u) with sock_id %d err: %s", a_uplink_addr, a_uplink_port,
-                l_socket, strerror(errno));
-        DAP_DELETE(l_ev_socket);
-        close(l_socket);
-        return NULL ;
+    l_ev_socket->remote_addr.sin_family = AF_INET;
+    l_ev_socket->remote_addr.sin_port = htons(a_uplink_port);
+    int l_err = connect(l_socket, (struct sockaddr *) &l_ev_socket->remote_addr, sizeof(struct sockaddr_in));
+    if (l_err == 0){
+        log_it(L_DEBUG, "Connected momentaly with %s:%u!", a_uplink_addr, a_uplink_port);
+        return l_client_http_internal;
+    }else if( l_err == EINPROGRESS){
+        log_it(L_DEBUG, "Connecting to %s:%u", a_uplink_addr, a_uplink_port);
+        return l_client_http_internal;
+    }else{
+        char l_errbuf[128];
+        l_errbuf[0] = '\0';
+        strerror_r(l_err, l_errbuf, sizeof (l_errbuf));
+        log_it(L_ERROR, "Connecting error: \"%s\" (code %d)", l_errbuf, l_err);
+        return NULL;
     }
+}
 
+/**
+ * @brief s_http_connected
+ * @param a_esocket
+ */
+static void s_http_connected(dap_events_socket_t * a_esocket)
+{
+    assert(a_esocket);
+    dap_client_http_pvt_t * l_http_pvt = (dap_client_http_pvt_t*) a_esocket->_inheritor;
+    assert(l_http_pvt);
+    dap_worker_t *l_worker = l_http_pvt->worker;
+    assert(l_worker);
+
+    log_it(L_INFO, "Remote address connected (%s:%u) with sock_id %d", l_http_pvt->uplink_addr, l_http_pvt->uplink_port, a_esocket->socket);
+    // add to dap_worker
     //dap_client_pvt_t * l_client_pvt = (dap_client_pvt_t*) a_obj;
     //dap_events_new();
     dap_string_t *l_request_headers = dap_string_new(NULL);
 
-    if(a_request && (dap_strcmp(a_method, "POST") == 0 || dap_strcmp(a_method, "POST_ENC") == 0)) {
+    if(l_http_pvt->request && (dap_strcmp(l_http_pvt->method, "POST") == 0 || dap_strcmp(l_http_pvt->method, "POST_ENC") == 0)) {
         char l_buf[1024];
         //log_it(L_DEBUG, "POST request with %u bytes of decoded data", a_request_size);
 
-        if(a_request_content_type) {
-            dap_snprintf(l_buf, sizeof(l_buf), "Content-Type: %s\r\n", a_request_content_type);
+        if(l_http_pvt->request_content_type) {
+            dap_snprintf(l_buf, sizeof(l_buf), "Content-Type: %s\r\n", l_http_pvt->request_content_type);
             l_request_headers = dap_string_append(l_request_headers, l_buf);
         }
 
-        if(a_custom) {
-            for( size_t i = 0; i < a_custom_count; i++) {
-                l_request_headers = dap_string_append(l_request_headers, (char*) a_custom[i]);
+        // Add custom headers
+        if(l_http_pvt->request_custom_headers) {
+            for( size_t i = 0; i < l_http_pvt->request_custom_headers_count; i++) {
+                l_request_headers = dap_string_append(l_request_headers, (char*) l_http_pvt->request_custom_headers[i]);
                 l_request_headers = dap_string_append(l_request_headers, "\r\n");
             }
         }
 
-        if(a_cookie) {
-            dap_snprintf(l_buf, sizeof(l_buf), "Cookie: %s\r\n", a_cookie);
+        // Setup cookie header
+        if(l_http_pvt->cookie) {
+            dap_snprintf(l_buf, sizeof(l_buf), "Cookie: %s\r\n", l_http_pvt->cookie);
             l_request_headers = dap_string_append(l_request_headers, l_buf);
         }
 
-        dap_snprintf(l_buf, sizeof(l_buf), "Content-Length: %lu\r\n", a_request_size);
+        // Set request size as Content-Length header
+        dap_snprintf(l_buf, sizeof(l_buf), "Content-Length: %lu\r\n", l_http_pvt->request_size);
         l_request_headers = dap_string_append(l_request_headers, l_buf);
     }
 
     // adding string for GET request
     char *l_get_str = NULL;
-    if(!dap_strcmp(a_method, "GET")) {
+    if(!dap_strcmp(l_http_pvt->method, "GET")) {
         char l_buf[1024];
-        dap_snprintf(l_buf, sizeof(l_buf), "User-Agent: Mozilla\r\n");
-        if(a_cookie) {
-            dap_snprintf(l_buf, sizeof(l_buf), "Cookie: %s\r\n", a_cookie);
+        dap_snprintf(l_buf, sizeof(l_buf), "User-Agent: Mozilla\r\n"); // We hide our request and mask them as possible
+        if(l_http_pvt->cookie) {
+            dap_snprintf(l_buf, sizeof(l_buf), "Cookie: %s\r\n", l_http_pvt->cookie);
             l_request_headers = dap_string_append(l_request_headers, l_buf);
         }
 
-        if(a_request)
-            l_get_str = dap_strdup_printf("?%s", a_request);
+        if(l_http_pvt->request)
+            l_get_str = dap_strdup_printf("?%s", l_http_pvt->request);
     }
 
     // send header
-    dap_events_socket_write_f_mt(l_worker, l_ev_socket, "%s /%s%s HTTP/1.1\r\n"
+    dap_events_socket_write_f_mt(l_worker, a_esocket, "%s /%s%s HTTP/1.1\r\n"
             "Host: %s\r\n"
             "%s"
             "\r\n",
-            a_method, a_path, l_get_str ? l_get_str : "", a_uplink_addr, l_request_headers->str);
+            l_http_pvt->method, l_http_pvt->path, l_get_str ? l_get_str : "", l_http_pvt->uplink_addr, l_request_headers->str);
     // send data for POST request
     if(!l_get_str)
-        dap_events_socket_write_mt(l_worker, l_ev_socket, a_request, a_request_size);
+        dap_events_socket_write_mt(l_worker, a_esocket, l_http_pvt->request, l_http_pvt->request_size);
     DAP_DELETE(l_get_str);
     dap_string_free(l_request_headers, true);
-    return l_client_http_internal;
+
 }
 
+
 /**
  * @brief dap_client_http_request
  * @param a_uplink_addr
diff --git a/dap-sdk/net/client/dap_client_pvt.c b/dap-sdk/net/client/dap_client_pvt.c
index de0202c14b..630e83a902 100644
--- a/dap-sdk/net/client/dap_client_pvt.c
+++ b/dap-sdk/net/client/dap_client_pvt.c
@@ -80,19 +80,18 @@
 static void s_stage_status_after(dap_client_pvt_t * a_client_internal);
 
 // ENC stage callbacks
-void m_enc_init_response(dap_client_t *, void *, size_t);
-void m_enc_init_error(dap_client_t *, int);
+static void s_enc_init_response(dap_client_t *, void *, size_t);
+static void s_enc_init_error(dap_client_t *, int);
 
 // STREAM_CTL stage callbacks
-void m_stream_ctl_response(dap_client_t *, void *, size_t);
-void m_stream_ctl_error(dap_client_t *, int);
-void m_stage_stream_streaming(dap_client_t * a_client, void* arg);
+static void s_stream_ctl_response(dap_client_t *, void *, size_t);
+static void s_stream_ctl_error(dap_client_t *, int);
+static void s_stage_stream_streaming(dap_client_t * a_client, void* arg);
 
 // STREAM stage callbacks
-void m_stream_response(dap_client_t *, void *, size_t);
-void m_stream_error(dap_client_t *, int);
-void m_request_response(void * a_response, size_t a_response_size, void * a_obj);
-void m_request_error(int, void *);
+static void s_stream_response(dap_client_t *, void *, size_t);
+static void s_request_response(void * a_response, size_t a_response_size, void * a_obj);
+static void s_request_error(int, void *);
 
 // Stream connection callback
 static void s_stream_connected(dap_client_pvt_t * a_client_pvt);
@@ -224,31 +223,6 @@ static void s_stream_connected(dap_client_pvt_t * a_client_pvt)
     s_stage_status_after(a_client_pvt);
 }
 
-/**
- * @brief s_stream_connect_error
- * @param a_client_pvt
- * @param a_err
- */
-static void s_stream_connect_error (dap_client_pvt_t * a_client_pvt, int a_err)
-{
-    char l_errbuf[128];
-    l_errbuf[0]='\0';
-    if (a_err)
-        strerror_r(a_err,l_errbuf,sizeof (l_errbuf));
-    else
-        strncpy(l_errbuf,"Unknown Error",sizeof(l_errbuf)-1);
-    log_it(L_ERROR, "Remote address can't connected (%s:%u) with sock_id %d: \"%s\" (code %d)", a_client_pvt->uplink_addr,
-            a_client_pvt->uplink_port, l_errbuf, a_err);
-    dap_events_socket_remove_and_delete_mt(a_client_pvt->stream_worker->worker, a_client_pvt->stream_es);
-    //close(a_client_pvt->stream_socket);
-    a_client_pvt->stream_socket = 0;
-    a_client_pvt->stage_status = STAGE_STATUS_ERROR;
-    a_client_pvt->last_error = ERROR_STREAM_CONNECT ;
-
-    s_stage_status_after(a_client_pvt);
-
-}
-
 /**
  * @brief s_client_internal_stage_status_proc
  * @param a_client
@@ -289,7 +263,7 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt)
                     size_t l_data_str_enc_size = dap_enc_base64_encode(l_data, l_key_size + l_sign_size, l_data_str, DAP_ENC_DATA_TYPE_B64);
                     log_it(L_DEBUG, "ENC request size %u", l_data_str_enc_size);
                     int l_res = dap_client_pvt_request(a_client_pvt, DAP_UPLINK_PATH_ENC_INIT "/gd4y5yh78w42aaagh",
-                            l_data_str, l_data_str_enc_size, m_enc_init_response, m_enc_init_error);
+                            l_data_str, l_data_str_enc_size, s_enc_init_response, s_enc_init_error);
                     // bad request
                     if(l_res<0){
                         a_client_pvt->stage_status = STAGE_STATUS_ERROR;
@@ -319,7 +293,7 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt)
                     dap_client_pvt_request_enc(a_client_pvt,
                     DAP_UPLINK_PATH_STREAM_CTL,
                             l_suburl, "type=tcp,maxconn=4", l_request, l_request_size,
-                            m_stream_ctl_response, m_stream_ctl_error);
+                            s_stream_ctl_response, s_stream_ctl_error);
                     DAP_DELETE(l_request);
                     DAP_DELETE(l_suburl);
                 }
@@ -337,7 +311,7 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt)
                     }
         #ifdef _WIN32
                     {
-                      int buffsize = 65536;
+                      int buffsize = 65536*4;
                       int optsize = sizeof( int );
                       setsockopt(a_client_pvt->stream_socket, SOL_SOCKET, SO_SNDBUF, (char *)&buffsize, &optsize );
                       setsockopt(a_client_pvt->stream_socket, SOL_SOCKET, SO_RCVBUF, (char *)&buffsize, &optsize );
@@ -395,11 +369,28 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt)
                                  sizeof (a_client_pvt->stream_es->remote_addr_str)-1 );
 
                         if((l_err = connect(a_client_pvt->stream_socket, (struct sockaddr *) &a_client_pvt->stream_es->remote_addr,
-                                sizeof(struct sockaddr_in))) != -1) {
-                            s_stream_connected(a_client_pvt);
+                                sizeof(struct sockaddr_in))) ==0) {
+                            log_it(L_DEBUG, "Connected momentaly with %s:%u", a_client_pvt->uplink_addr, a_client_pvt->uplink_port);
                         }
                         else if (l_err != EINPROGRESS){
-                            s_stream_connect_error(a_client_pvt,l_err);
+                            char l_errbuf[128];
+                            l_errbuf[0]='\0';
+                            if (l_err)
+                                strerror_r(l_err,l_errbuf,sizeof (l_errbuf));
+                            else
+                                strncpy(l_errbuf,"Unknown Error",sizeof(l_errbuf)-1);
+                            log_it(L_ERROR, "Remote address can't connect (%s:%u) with sock_id %d: \"%s\" (code %d)", a_client_pvt->uplink_addr,
+                                    a_client_pvt->uplink_port, l_errbuf, l_err);
+                            a_client_pvt->stream_es->kill_signal = true;
+                            a_client_pvt->stream_es->flags &= DAP_SOCK_SIGNAL_CLOSE;
+                            //close(a_client_pvt->stream_socket);
+                            a_client_pvt->stream_socket = 0;
+                            a_client_pvt->stage_status = STAGE_STATUS_ERROR;
+                            a_client_pvt->last_error = ERROR_STREAM_CONNECT ;
+
+                            s_stage_status_after(a_client_pvt);
+                        }else{
+                            log_it(L_INFO,"Connecting to remote %s:%s",a_client_pvt->uplink_addr, a_client_pvt->uplink_port);
                         }
                     }
 
@@ -591,7 +582,7 @@ int dap_client_pvt_request(dap_client_pvt_t * a_client_internal, const char * a_
 //    }
     void *l_ret = dap_client_http_request(a_client_internal->uplink_addr,a_client_internal->uplink_port,
                                            a_request ? "POST" : "GET", "text/text", a_path, a_request,
-                                            a_request_size, NULL, m_request_response, m_request_error, a_client_internal, NULL);
+                                            a_request_size, NULL, s_request_response, s_request_error, a_client_internal, NULL);
 //    a_client_internal->curl = dap_http_client_simple_request(l_url, a_request ? "POST" : "GET", "text/text", a_request,
 //            a_request_size, NULL, m_request_response, m_request_error, &a_client_internal->curl_sockfd, a_client_internal, NULL);
 //    DAP_DELETE(l_url);
@@ -717,7 +708,7 @@ void dap_client_pvt_request_enc(dap_client_pvt_t * a_client_internal, const char
     }
     dap_client_http_request_custom(a_client_internal->uplink_addr, a_client_internal->uplink_port, a_request ? "POST" : "GET", "text/text",
                 l_path, l_request_enc, l_request_enc_size, NULL,
-                m_request_response, m_request_error, a_client_internal, a_custom_new, a_custom_count);
+                s_request_response, s_request_error, a_client_internal, a_custom_new, a_custom_count);
 //    dap_http_client_simple_request_custom(l_url_full, a_request ? "POST" : "GET", "text/text",
 //            l_request_enc, l_request_enc_size, NULL,
 //            m_request_response, a_client_internal->curl_sockfd ,m_request_error, a_client_internal, a_custom_new, a_custom_count);
@@ -737,11 +728,11 @@ void dap_client_pvt_request_enc(dap_client_pvt_t * a_client_internal, const char
 }
 
 /**
- * @brief m_request_error
+ * @brief s_request_error
  * @param a_err_code
  * @param a_obj
  */
-void m_request_error(int a_err_code, void * a_obj)
+static void s_request_error(int a_err_code, void * a_obj)
 {
     dap_client_pvt_t * a_client_internal = (dap_client_pvt_t *) a_obj;
     dap_client_pvt_hh_lock();
@@ -758,12 +749,12 @@ void m_request_error(int a_err_code, void * a_obj)
 }
 
 /**
- * @brief m_request_response
+ * @brief s_request_response
  * @param a_response
  * @param a_response_size
  * @param a_obj
  */
-void m_request_response(void * a_response, size_t a_response_size, void * a_obj)
+static void s_request_response(void * a_response, size_t a_response_size, void * a_obj)
 {
     dap_client_pvt_t * a_client_internal = (dap_client_pvt_t *) a_obj;
     if(!a_client_internal || !a_client_internal->client)
@@ -794,12 +785,12 @@ void m_request_response(void * a_response, size_t a_response_size, void * a_obj)
 }
 
 /**
- * @brief m_enc_init_response
+ * @brief s_enc_init_response
  * @param a_client
  * @param a_response
  * @param a_response_size
  */
-void m_enc_init_response(dap_client_t * a_client, void * a_response, size_t a_response_size)
+static void s_enc_init_response(dap_client_t * a_client, void * a_response, size_t a_response_size)
 {
     dap_client_pvt_t * l_client_pvt = a_client ? DAP_CLIENT_PVT(a_client) : NULL;
     if(!l_client_pvt) {
@@ -901,11 +892,11 @@ void m_enc_init_response(dap_client_t * a_client, void * a_response, size_t a_re
 }
 
 /**
- * @brief m_enc_init_error
+ * @brief s_enc_init_error
  * @param a_client
  * @param a_err_code
  */
-void m_enc_init_error(dap_client_t * a_client, int a_err_code)
+static void s_enc_init_error(dap_client_t * a_client, int a_err_code)
 {
     dap_client_pvt_t * l_client_pvt = DAP_CLIENT_PVT(a_client);
     if(!l_client_pvt) {
@@ -926,12 +917,12 @@ void m_enc_init_error(dap_client_t * a_client, int a_err_code)
 }
 
 /**
- * @brief m_stream_ctl_response
+ * @brief s_stream_ctl_response
  * @param a_client
  * @param a_data
  * @param a_data_size
  */
-void m_stream_ctl_response(dap_client_t * a_client, void * a_data, size_t a_data_size)
+static void s_stream_ctl_response(dap_client_t * a_client, void * a_data, size_t a_data_size)
 {
     dap_client_pvt_t * l_client_internal = DAP_CLIENT_PVT(a_client);
     if(!l_client_internal) {
@@ -1014,11 +1005,11 @@ void m_stream_ctl_response(dap_client_t * a_client, void * a_data, size_t a_data
 }
 
 /**
- * @brief m_stream_ctl_error
+ * @brief s_stream_ctl_error
  * @param a_client
  * @param a_error
  */
-void m_stream_ctl_error(dap_client_t * a_client, int a_error)
+static void s_stream_ctl_error(dap_client_t * a_client, int a_error)
 {
     log_it(L_WARNING, "STREAM_CTL error %d", a_error);
 
@@ -1038,8 +1029,14 @@ void m_stream_ctl_error(dap_client_t * a_client, int a_error)
 
 }
 
-// STREAM stage callbacks
-void m_stream_response(dap_client_t * a_client, void * a_data, size_t a_data_size)
+//
+/**
+ * @brief s_stream_response STREAM stage callbacks
+ * @param a_client
+ * @param a_data
+ * @param a_data_size
+ */
+static void s_stream_response(dap_client_t * a_client, void * a_data, size_t a_data_size)
 {
     dap_client_pvt_t * l_client_internal = DAP_CLIENT_PVT(a_client);
     if(!l_client_internal) {
@@ -1062,32 +1059,12 @@ void m_stream_response(dap_client_t * a_client, void * a_data, size_t a_data_siz
     s_stage_status_after(l_client_internal);
 }
 
-void m_stream_error(dap_client_t * a_client, int a_error)
-{
-    log_it(L_WARNING, "STREAM error %d", a_error);
-
-    dap_client_pvt_t * l_client_pvt = DAP_CLIENT_PVT(a_client);
-    if(!l_client_pvt) {
-        log_it(L_ERROR, "m_stream_error: l_client_pvt is NULL!");
-        return;
-    }
-
-    if (a_error == ETIMEDOUT) {
-        l_client_pvt->last_error = ERROR_NETWORK_CONNECTION_TIMEOUT;
-    } else {
-        l_client_pvt->last_error = ERROR_STREAM_RESPONSE_WRONG;
-    }
-    l_client_pvt->stage_status = STAGE_STATUS_ERROR;
-
-    s_stage_status_after(l_client_pvt);
-}
-
 /**
- * @brief m_stage_stream_opened
+ * @brief s_stage_stream_opened
  * @param a_client
  * @param arg
  */
-void m_stage_stream_streaming(dap_client_t * a_client, void* arg)
+static void s_stage_stream_streaming(dap_client_t * a_client, void* arg)
 {
     log_it(L_INFO, "Stream  is opened");
 }
@@ -1104,13 +1081,14 @@ static void s_stream_es_callback_connected(dap_events_socket_t * a_es)
 }
 
 /**
- * @brief m_es_stream_delete
+ * @brief s_es_stream_delete
  * @param a_es
  * @param arg
  */
 static void s_stream_es_callback_delete(dap_events_socket_t *a_es, void *arg)
 {
-    log_it(L_INFO, "================= stream delete/peer reconnect");
+    (void) arg;
+    log_it(L_INFO, "Stream delete callback");
 
     dap_client_pvt_t * l_client_pvt =(dap_client_pvt_t*) a_es->_inheritor;
     a_es->_inheritor = NULL; // To prevent delete in reactor
@@ -1132,24 +1110,16 @@ static void s_stream_es_callback_delete(dap_events_socket_t *a_es, void *arg)
     dap_stream_delete(l_client_pvt->stream);
     l_client_pvt->stream = NULL;
     l_client_pvt->stream_es = NULL;
-/*  disable reconnect from here
-    if(l_client_pvt->is_reconnect) {
-        log_it(L_DEBUG, "l_client_pvt->is_reconnect = true");
-
-        dap_client_go_stage(l_client_pvt->client, STAGE_STREAM_STREAMING, m_stage_stream_streaming);
-    }
-    else
-        log_it(L_DEBUG, "l_client_pvt->is_reconnect = false");
-*/
 }
 
 /**
- * @brief m_es_stream_read
+ * @brief s_es_stream_read
  * @param a_es
  * @param arg
  */
 static void s_stream_es_callback_read(dap_events_socket_t * a_es, void * arg)
 {
+    (void) arg;
     //dap_client_t * l_client = DAP_CLIENT(a_es);
     dap_client_pvt_t * l_client_pvt =(dap_client_pvt_t *) a_es->_inheritor;//(l_client) ? DAP_CLIENT_PVT(l_client) : NULL;
     if(!l_client_pvt) {
@@ -1158,7 +1128,7 @@ static void s_stream_es_callback_read(dap_events_socket_t * a_es, void * arg)
     }
     switch (l_client_pvt->stage) {
         case STAGE_STREAM_SESSION:
-            dap_client_go_stage(l_client_pvt->client, STAGE_STREAM_STREAMING, m_stage_stream_streaming);
+            dap_client_go_stage(l_client_pvt->client, STAGE_STREAM_STREAMING, s_stage_stream_streaming);
             break;
         case STAGE_STREAM_CONNECTED: { // Collect HTTP headers before streaming
             if(a_es->buf_in_size > 1) {
@@ -1192,12 +1162,13 @@ static void s_stream_es_callback_read(dap_events_socket_t * a_es, void * arg)
 }
 
 /**
- * @brief m_es_stream_write
+ * @brief s_es_stream_write
  * @param a_es
  * @param arg
  */
 static void s_stream_es_callback_write(dap_events_socket_t * a_es, void * arg)
 {
+    (void) arg;
     //dap_client_t * l_client = DAP_CLIENT(a_es);
     //dap_client_pvt_t * l_client_pvt = (l_client) ? DAP_CLIENT_PVT(l_client) : NULL;
     dap_client_pvt_t * l_client_pvt = a_es->_inheritor;
@@ -1229,14 +1200,37 @@ static void s_stream_es_callback_write(dap_events_socket_t * a_es, void * arg)
     }
 }
 
-static void s_stream_es_callback_error(dap_events_socket_t * a_es, int a_arg)
+/**
+ * @brief s_stream_es_callback_error
+ * @param a_es
+ * @param a_error
+ */
+static void s_stream_es_callback_error(dap_events_socket_t * a_es, int a_error)
 {
-    //dap_client_t * l_client = DAP_CLIENT(a_es);
-    //dap_client_pvt_t * l_client_pvt = (l_client) ? DAP_CLIENT_PVT(l_client) : NULL;
-    dap_client_pvt_t * l_client_pvt = a_es->_inheritor;
-    if(!l_client_pvt) {
-        log_it(L_ERROR, "m_es_stream_error: l_client_pvt is NULL!");
-        return;
+    dap_client_pvt_t * l_client_pvt = (dap_client_pvt_t *) a_es->_inheritor;
+    assert( l_client_pvt);
+
+    char l_errbuf[128];
+    l_errbuf[0]='\0';
+    if (a_error)
+        strerror_r(a_error,l_errbuf,sizeof (l_errbuf));
+    else
+        strncpy(l_errbuf,"Unknown Error",sizeof(l_errbuf)-1);
+
+    log_it(L_WARNING, "STREAM error \"%s\" (code %d)", l_errbuf, a_error);
+
+
+    // TODO merge flag and field
+    l_client_pvt->stream_es->kill_signal = true;
+    l_client_pvt->stream_es->flags &= DAP_SOCK_SIGNAL_CLOSE;
+
+
+    if (a_error == ETIMEDOUT) {
+        l_client_pvt->last_error = ERROR_NETWORK_CONNECTION_TIMEOUT;
+    } else {
+        l_client_pvt->last_error = ERROR_STREAM_RESPONSE_WRONG;
     }
-    log_it(L_INFO, "m_es_stream_error: code %d", a_arg);
+    l_client_pvt->stage_status = STAGE_STATUS_ERROR;
+
+    s_stage_status_after(l_client_pvt);
 }
diff --git a/dap-sdk/net/core/dap_events_socket.c b/dap-sdk/net/core/dap_events_socket.c
index d6929e3a09..8a6ffdd995 100644
--- a/dap-sdk/net/core/dap_events_socket.c
+++ b/dap-sdk/net/core/dap_events_socket.c
@@ -788,7 +788,8 @@ void dap_events_socket_remove_and_delete_unsafe( dap_events_socket_t *a_es, bool
         return;
 
     //log_it( L_DEBUG, "es is going to be removed from the lists and free the memory (0x%016X)", a_es );
-    dap_events_socket_remove_from_worker_unsafe(a_es, a_es->worker);
+    if ( a_es->worker)
+        dap_events_socket_remove_from_worker_unsafe(a_es, a_es->worker);
 
     if (a_es->events){ // It could be socket NOT from events
         pthread_rwlock_wrlock( &a_es->events->sockets_rwlock );
diff --git a/dap-sdk/net/core/dap_net.c b/dap-sdk/net/core/dap_net.c
new file mode 100644
index 0000000000..8d4d744c3d
--- /dev/null
+++ b/dap-sdk/net/core/dap_net.c
@@ -0,0 +1,50 @@
+#include <string.h>
+#include "dap_net.h"
+
+#define LOG_TAG "dap_net"
+
+/**
+ * @brief dap_net_resolve_host
+ * @param a_host hostname
+ * @param ai_family AF_INET  for ipv4 or AF_INET6 for ipv6
+ * @param a_addr_out out addr (struct in_addr or struct in6_addr)
+ * @param return 0 of OK, <0 Error
+ */
+int dap_net_resolve_host(const char *a_host, int ai_family, struct sockaddr *a_addr_out)
+{
+    struct addrinfo l_hints, *l_res;
+    void *l_cur_addr = NULL;
+
+    memset(&l_hints, 0, sizeof(l_hints));
+    l_hints.ai_family = PF_UNSPEC;
+    l_hints.ai_socktype = SOCK_STREAM;
+    l_hints.ai_flags |= AI_CANONNAME;
+
+    int errcode = getaddrinfo(a_host, NULL, &l_hints, &l_res);
+    if(errcode != 0)
+            {
+        return -2;
+    }
+    while(l_res)
+    {
+        if(ai_family == l_res->ai_family)
+            switch (l_res->ai_family)
+            {
+            case AF_INET:
+                l_cur_addr = &((struct sockaddr_in *) l_res->ai_addr)->sin_addr;
+                memcpy(a_addr_out, l_cur_addr, sizeof(struct in_addr));
+                break;
+            case AF_INET6:
+                l_cur_addr = &((struct sockaddr_in6 *) l_res->ai_addr)->sin6_addr;
+                memcpy(a_addr_out, l_cur_addr, sizeof(struct in6_addr));
+                break;
+            }
+        if(l_cur_addr) {
+            freeaddrinfo(l_res);
+            return 0;
+        }
+        l_res = l_res->ai_next;
+    }
+    freeaddrinfo(l_res);
+    return -1;
+}
diff --git a/dap-sdk/net/core/dap_worker.c b/dap-sdk/net/core/dap_worker.c
index fafbee2d1e..d50e05a2cc 100644
--- a/dap-sdk/net/core/dap_worker.c
+++ b/dap-sdk/net/core/dap_worker.c
@@ -338,8 +338,8 @@ void *dap_worker_thread(void *arg)
             }
 
             // Socket is ready to write and not going to close
-            if(( l_flag_write&&(l_cur->flags & DAP_SOCK_READY_TO_WRITE)  ||
-                 (l_cur->flags & DAP_SOCK_READY_TO_WRITE)) && !(l_cur->flags & DAP_SOCK_SIGNAL_CLOSE)) {
+            if(   ( l_flag_write&&(l_cur->flags & DAP_SOCK_READY_TO_WRITE) ) ||
+                 (    (l_cur->flags & DAP_SOCK_READY_TO_WRITE) && !(l_cur->flags & DAP_SOCK_SIGNAL_CLOSE) ) ) {
                 //log_it(L_DEBUG, "Main loop output: %u bytes to send", l_cur->buf_out_size);
                 if(l_cur->callbacks.write_callback)
                     l_cur->callbacks.write_callback(l_cur, NULL); // Call callback to process write event
@@ -605,13 +605,13 @@ static void s_queue_es_io_callback( dap_events_socket_t * a_es, void * a_arg)
         return;
     }
     if (l_msg->flags_set & DAP_SOCK_CONNECTING)
-        if (!  l_msg_es->flags & DAP_SOCK_CONNECTING ){
+        if (!  (l_msg_es->flags & DAP_SOCK_CONNECTING) ){
             l_msg_es->flags |= DAP_SOCK_CONNECTING;
             dap_events_socket_worker_poll_update_unsafe(l_msg_es);
         }
 
     if (l_msg->flags_set & DAP_SOCK_CONNECTING)
-        if (!  l_msg_es->flags & DAP_SOCK_CONNECTING ){
+        if (!  (l_msg_es->flags & DAP_SOCK_CONNECTING) ){
             l_msg_es->flags ^= DAP_SOCK_CONNECTING;
             dap_events_socket_worker_poll_update_unsafe(l_msg_es);
         }
diff --git a/dap-sdk/net/core/include/dap_net.h b/dap-sdk/net/core/include/dap_net.h
new file mode 100644
index 0000000000..e2bd58db2f
--- /dev/null
+++ b/dap-sdk/net/core/include/dap_net.h
@@ -0,0 +1,41 @@
+/*
+ * Authors:
+ * Dmitriy A. Gearasimov <gerasimov.dmitriy@demlabs.net>
+ * DeM Labs Ltd.   https://demlabs.net
+ * Copyright  (c) 2020
+ * All rights reserved.
+
+ This file is part of DAP SDK the open source project
+
+    DAP SDK is free software: you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation, either version 3 of the License, or
+    (at your option) any later version.
+
+    DAP SDK is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with any DAP SDK based project.  If not, see <http://www.gnu.org/licenses/>.
+*/
+#pragma once
+
+#include "dap_common.h"
+#ifdef WIN32
+// for Windows
+#include <winsock2.h>
+#include <windows.h>
+#include <mswsock.h>
+#include <ws2tcpip.h>
+#include <io.h>
+#else
+// for Unix-like systems
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <arpa/inet.h>
+#endif
+
+int dap_net_resolve_host(const char *a_host, int ai_family, struct sockaddr *a_addr_out);
diff --git a/dap-sdk/net/server/json_rpc/src/dap_json_rpc_errors.c b/dap-sdk/net/server/json_rpc/src/dap_json_rpc_errors.c
index 6d465ddcc6..bf38da1416 100644
--- a/dap-sdk/net/server/json_rpc/src/dap_json_rpc_errors.c
+++ b/dap-sdk/net/server/json_rpc/src/dap_json_rpc_errors.c
@@ -60,12 +60,13 @@ int _dap_json_rpc_error_cmp_by_code(dap_json_rpc_error_t *a_error, int a_code_er
 {
     if (a_error->code_error == a_code_error)
         return 0;
-    if (a_error->code_error < a_code_error)
+    else if (a_error->code_error < a_code_error)
         return -1;
-    if (a_error->code_error > a_code_error)
+    else
         return 1;
 }
 
+
 dap_json_rpc_error_t *dap_json_rpc_error_search_by_code(int a_code_error)
 {
     dap_json_rpc_error_t *l_element = NULL;
-- 
GitLab