From d702e3400a910b02561508a651e58a316e7282bc Mon Sep 17 00:00:00 2001
From: Roman Khlopkov <roman.khlopkov@demlabs.net>
Date: Wed, 18 Aug 2021 17:40:02 +0300
Subject: [PATCH] [*] A number of stability changes

---
 dap-sdk/net/client/dap_client.c               |  2 +-
 dap-sdk/net/client/dap_client_http.c          | 27 ++++---------------
 dap-sdk/net/client/dap_client_pvt.c           | 16 +++++++----
 dap-sdk/net/core/dap_events_socket.c          | 19 ++++++++-----
 dap-sdk/net/core/include/dap_events_socket.h  |  1 +
 dap-sdk/net/stream/stream/dap_stream.c        | 13 +++++----
 .../global-db/dap_chain_global_db_driver.c    |  6 ++---
 modules/net/dap_chain_node_client.c           |  2 +-
 .../service/vpn/dap_chain_net_vpn_client.c    |  4 +--
 9 files changed, 43 insertions(+), 47 deletions(-)

diff --git a/dap-sdk/net/client/dap_client.c b/dap-sdk/net/client/dap_client.c
index cfdcbfcb08..138c188502 100644
--- a/dap-sdk/net/client/dap_client.c
+++ b/dap-sdk/net/client/dap_client.c
@@ -439,7 +439,7 @@ const char * dap_client_error_str(dap_client_error_t a_client_error)
  */
 const char * dap_client_get_error_str(dap_client_t * a_client)
 {
-    if(a_client == NULL){
+    if(a_client == NULL || DAP_CLIENT_PVT(a_client) == NULL){
         log_it(L_ERROR,"Client is NULL for dap_client_get_error_str");
         return NULL;
     }
diff --git a/dap-sdk/net/client/dap_client_http.c b/dap-sdk/net/client/dap_client_http.c
index e7afb9f255..81722cacee 100644
--- a/dap-sdk/net/client/dap_client_http.c
+++ b/dap-sdk/net/client/dap_client_http.c
@@ -286,29 +286,12 @@ static void s_http_read(dap_events_socket_t * a_es, void * arg)
     }
     // process http header
     if(l_http_pvt->is_header_read) {
-        l_http_pvt->response[l_http_pvt->header_length - 1] = 0;
-        // search strings in header
-        char **l_strings = dap_strsplit((char*) l_http_pvt->response, "\r\n", -1);
-        if(l_strings) {
-            int i = 0;
-            while(l_strings[i]) {
-                char *l_string = l_strings[i];
-                char **l_values = dap_strsplit(l_string, ":", 2);
-                if(l_values && l_values[0] && l_values[1])
-                    if(!dap_strcmp("Content-Length", l_values[0])) {
-                        l_http_pvt->content_length = atoi(l_values[1]);
-                        l_http_pvt->is_header_read = false;
-                    }
-                dap_strfreev(l_values);
-                if(l_http_pvt->content_length)
-                    break;
-                i++;
-            }
-            dap_strfreev(l_strings);
+        const char *l_token = "Content-Length: ";
+        char *l_content_len_ptr = strstr((char*)l_http_pvt->response, l_token);
+        if (l_content_len_ptr) {
+            l_http_pvt->content_length = atoi(l_content_len_ptr + strlen(l_token));
+            l_http_pvt->is_header_read = false;
         }
-
-        // restore last symbol
-        l_http_pvt->response[l_http_pvt->header_length - 1] = '\n';
     }
 
     // process data
diff --git a/dap-sdk/net/client/dap_client_pvt.c b/dap-sdk/net/client/dap_client_pvt.c
index 14daedc7b6..e821bc6b80 100644
--- a/dap-sdk/net/client/dap_client_pvt.c
+++ b/dap-sdk/net/client/dap_client_pvt.c
@@ -584,6 +584,8 @@ static bool s_stage_status_after(dap_client_pvt_t * a_client_pvt)
         break;
 
         case STAGE_STATUS_ERROR: {
+            if (a_client_pvt->is_to_delete)
+                break;
             // limit the number of attempts
             a_client_pvt->stage_errors++;
             bool l_is_last_attempt = a_client_pvt->stage_errors > s_max_attempts ? true : false;
@@ -595,7 +597,7 @@ static bool s_stage_status_after(dap_client_pvt_t * a_client_pvt)
             //    l_is_last_attempt = false;
             //}
 
-            log_it(L_ERROR, "Error state( %s), doing callback if present", dap_client_get_error_str(a_client_pvt->client));
+            log_it(L_ERROR, "Error state( %s), doing callback if present", dap_client_error_str(a_client_pvt->last_error));
             if(a_client_pvt->stage_status_error_callback)
                 a_client_pvt->stage_status_error_callback(a_client_pvt->client, (void*) l_is_last_attempt);
 
@@ -1309,7 +1311,13 @@ static void s_stream_es_callback_write(dap_events_socket_t * a_es, void * arg)
  */
 static void s_stream_es_callback_error(dap_events_socket_t * a_es, int a_error)
 {
-    dap_client_pvt_t * l_client_pvt = (dap_client_pvt_t *) a_es->_inheritor;
+    a_es->flags |= DAP_SOCK_SIGNAL_CLOSE;
+    dap_client_pvt_t *l_client_pvt = (dap_client_pvt_t *) a_es->_inheritor;
+    if (!l_client_pvt)
+        return;
+    l_client_pvt = dap_client_pvt_find(l_client_pvt->uuid);
+    if (!l_client_pvt)
+        return;
 
     char l_errbuf[128];
     l_errbuf[0]='\0';
@@ -1318,9 +1326,7 @@ static void s_stream_es_callback_error(dap_events_socket_t * a_es, int a_error)
     else
         strncpy(l_errbuf,"Unknown Error",sizeof(l_errbuf)-1);
 
-    log_it(L_WARNING, "STREAM error \"%s\" (code %d)", l_errbuf, a_error);
-
-    l_client_pvt->stream_es->flags |= DAP_SOCK_SIGNAL_CLOSE;
+    log_it(L_WARNING, "STREAM error \"%s\" (code %d)", l_errbuf, a_error);    
 
     if (a_error == ETIMEDOUT) {
         l_client_pvt->last_error = ERROR_NETWORK_CONNECTION_TIMEOUT;
diff --git a/dap-sdk/net/core/dap_events_socket.c b/dap-sdk/net/core/dap_events_socket.c
index f6aff2f4e1..c452530f4e 100644
--- a/dap-sdk/net/core/dap_events_socket.c
+++ b/dap-sdk/net/core/dap_events_socket.c
@@ -127,6 +127,13 @@ int dap_events_socket_init( )
     char l_cmd[256];
     snprintf(l_cmd,sizeof (l_cmd),"rm /dev/mqueue/%s-queue_ptr*", dap_get_appname());
     system(l_cmd);
+    FILE *l_mq_msg_max = fopen("/proc/sys/fs/mqueue/msg_max", "w");
+    if (l_mq_msg_max) {
+        fprintf(l_mq_msg_max, "%d", DAP_QUEUE_MAX_MSGS);
+        fclose(l_mq_msg_max);
+    } else {
+        log_it(L_ERROR, "Сan't open /proc/sys/fs/mqueue/msg_max file for writing");
+    }
 #endif
     dap_timerfd_init();
     return 0;
@@ -434,9 +441,9 @@ dap_events_socket_t * dap_events_socket_queue_ptr_create_input(dap_events_socket
 {
     dap_events_socket_t * l_es = DAP_NEW_Z(dap_events_socket_t);
     l_es->type = DESCRIPTOR_TYPE_QUEUE;
-    l_es->buf_out_size_max = 8 * sizeof(void*);
+    l_es->buf_out_size_max = DAP_QUEUE_MAX_MSGS * sizeof(void*);
     l_es->buf_out       = DAP_NEW_Z_SIZE(byte_t,l_es->buf_out_size_max );
-    l_es->buf_in_size_max = 8 * sizeof(void*);
+    l_es->buf_in_size_max = DAP_QUEUE_MAX_MSGS * sizeof(void*);
     l_es->buf_in       = DAP_NEW_Z_SIZE(byte_t,l_es->buf_in_size_max );
     //l_es->buf_out_size  = 8 * sizeof(void*);
     l_es->events = a_es->events;
@@ -463,7 +470,7 @@ dap_events_socket_t * dap_events_socket_queue_ptr_create_input(dap_events_socket
     char l_mq_name[64];
     struct mq_attr l_mq_attr;
     memset(&l_mq_attr,0,sizeof (l_mq_attr));
-    l_mq_attr.mq_maxmsg = 8; // Don't think we need to hold more than 1024 messages
+    l_mq_attr.mq_maxmsg = DAP_QUEUE_MAX_MSGS; // Don't think we need to hold more than 1024 messages
     l_mq_attr.mq_msgsize = sizeof (void*); // We send only pointer on memory,
                                             // so use it with shared memory if you do access from another process
     snprintf(l_mq_name,sizeof (l_mq_name),"/%s-queue_ptr-%u",dap_get_appname(), a_es->mqd_id );
@@ -556,7 +563,8 @@ dap_events_socket_t * s_create_type_queue_ptr(dap_worker_t * a_w, dap_events_soc
         l_es->worker = a_w;
     }
     l_es->callbacks.queue_ptr_callback = a_callback; // Arm event callback
-    l_es->buf_in_size_max = 8 * sizeof(void*);
+    l_es->buf_in_size_max = DAP_QUEUE_MAX_MSGS * sizeof(void*);
+    l_es->buf_in = DAP_NEW_Z_SIZE(byte_t,l_es->buf_in_size_max);
     l_es->buf_out = NULL;
 
 #if defined(DAP_EVENTS_CAPS_EPOLL)
@@ -621,7 +629,6 @@ dap_events_socket_t * s_create_type_queue_ptr(dap_worker_t * a_w, dap_events_soc
         char l_file_buf[l_file_buf_size];
         memset(l_file_buf, 0, l_file_buf_size);
         fread(l_file_buf, l_file_buf_size, 1, l_sys_max_pipe_size_fd);
-        l_sys_max_pipe_size_fd = NULL;
         uint64_t l_sys_max_pipe_size = strtoull(l_file_buf, 0, 10);
         fcntl(l_pipe[0], F_SETPIPE_SZ, l_sys_max_pipe_size);
         fclose(l_sys_max_pipe_size_fd);
@@ -633,7 +640,7 @@ dap_events_socket_t * s_create_type_queue_ptr(dap_worker_t * a_w, dap_events_soc
     struct mq_attr l_mq_attr;
     static uint32_t l_mq_last_number=0;
     memset(&l_mq_attr,0,sizeof (l_mq_attr));
-    l_mq_attr.mq_maxmsg = 8; // Don't think we need to hold more than 1024 messages
+    l_mq_attr.mq_maxmsg = DAP_QUEUE_MAX_MSGS; // Don't think we need to hold more than 1024 messages
     l_mq_attr.mq_msgsize = sizeof (void*); // We send only pointer on memory,
                                             // so use it with shared memory if you do access from another process
     snprintf(l_mq_name,sizeof (l_mq_name),"/%s-queue_ptr-%u",dap_get_appname(),l_mq_last_number );
diff --git a/dap-sdk/net/core/include/dap_events_socket.h b/dap-sdk/net/core/include/dap_events_socket.h
index 434fcc22fc..094c50a18e 100644
--- a/dap-sdk/net/core/include/dap_events_socket.h
+++ b/dap-sdk/net/core/include/dap_events_socket.h
@@ -143,6 +143,7 @@ typedef struct dap_events_socket_callbacks {
 } dap_events_socket_callbacks_t;
 
 #define DAP_EVENTS_SOCKET_BUF 100000
+#define DAP_QUEUE_MAX_MSGS 8
 
 typedef enum {
     DESCRIPTOR_TYPE_SOCKET_CLIENT = 0,
diff --git a/dap-sdk/net/stream/stream/dap_stream.c b/dap-sdk/net/stream/stream/dap_stream.c
index bea02c2aef..3e5633a9df 100644
--- a/dap-sdk/net/stream/stream/dap_stream.c
+++ b/dap-sdk/net/stream/stream/dap_stream.c
@@ -664,14 +664,13 @@ size_t dap_stream_data_proc_read (dap_stream_t *a_stream)
      //   log_it(DEBUG,"No prefill or defrag buffer, process directly buf_in");
     // Now lets see how many packets we have in buffer now
     while ( (pkt = dap_stream_pkt_detect( proc_data , bytes_left_to_read)) ){
-        if(bytes_left_to_read -((byte_t*)pkt- proc_data  ) >=sizeof (dap_stream_pkt_t)){
-            if(pkt->hdr.size > STREAM_PKT_SIZE_MAX ){
-                //log_it(L_ERROR, "stream_pkt_detect() Too big packet size %u",
-                //       pkt->hdr.size);
-                bytes_left_to_read=0;
-                break;
-            }
+        if(pkt->hdr.size > STREAM_PKT_SIZE_MAX ){
+            //log_it(L_ERROR, "stream_pkt_detect() Too big packet size %u",
+            //       pkt->hdr.size);
+            bytes_left_to_read=0;
+            break;
         }
+
         size_t pkt_offset=( ((uint8_t*)pkt)- proc_data );
         bytes_left_to_read -= pkt_offset ;
         found_sig=true;
diff --git a/modules/global-db/dap_chain_global_db_driver.c b/modules/global-db/dap_chain_global_db_driver.c
index 645875763a..542d5758af 100644
--- a/modules/global-db/dap_chain_global_db_driver.c
+++ b/modules/global-db/dap_chain_global_db_driver.c
@@ -325,9 +325,9 @@ dap_store_obj_t *dap_store_unpacket_multiple(const dap_store_obj_pkt_t *pkt, siz
         memcpy(obj->key, pkt->data + offset, str_length);
         offset += str_length;
 
-        if (offset+sizeof (uint32_t)> pkt->data_size) {log_it(L_ERROR, "Broken GDB element: can't read 'value_length' field"); break;} // Check for buffer boundries
-        memcpy(&obj->value_len, pkt->data + offset, sizeof(uint32_t));
-        offset += sizeof(uint32_t);
+        if (offset+sizeof (size_t)> pkt->data_size) {log_it(L_ERROR, "Broken GDB element: can't read 'value_length' field"); break;} // Check for buffer boundries
+        memcpy(&obj->value_len, pkt->data + offset, sizeof(size_t));
+        offset += sizeof(size_t);
 
         if (offset+obj->value_len> pkt->data_size) {log_it(L_ERROR, "Broken GDB element: can't read 'value' field"); break;} // Check for buffer boundries
         obj->value = DAP_NEW_Z_SIZE(uint8_t, obj->value_len + 1);
diff --git a/modules/net/dap_chain_node_client.c b/modules/net/dap_chain_node_client.c
index a0df55ee08..141f126844 100644
--- a/modules/net/dap_chain_node_client.c
+++ b/modules/net/dap_chain_node_client.c
@@ -691,6 +691,7 @@ void dap_chain_node_client_close(dap_chain_node_client_t *a_client)
         inet_ntop(AF_INET, &a_client->info->hdr.ext_addr_v4, l_node_addr_str, INET_ADDRSTRLEN);
         log_it(L_INFO, "Closing node client to uplink %s:%d", l_node_addr_str, a_client->info->hdr.ext_port);
         // clean client
+        a_client->client->_inheritor = NULL;
         dap_client_delete_mt(a_client->client);
 #ifndef _WIN32
         pthread_cond_destroy(&a_client->wait_cond);
@@ -698,7 +699,6 @@ void dap_chain_node_client_close(dap_chain_node_client_t *a_client)
         CloseHandle( a_client->wait_cond );
 #endif
         pthread_mutex_destroy(&a_client->wait_mutex);
-        a_client->client->_inheritor = NULL;
         a_client->client = NULL;
         dap_chain_node_client_handle_t * l_client_found = NULL;
         HASH_FIND(hh,s_clients,&a_client->uuid,sizeof(a_client->uuid),l_client_found);
diff --git a/modules/service/vpn/dap_chain_net_vpn_client.c b/modules/service/vpn/dap_chain_net_vpn_client.c
index c41ca37e10..988387f818 100644
--- a/modules/service/vpn/dap_chain_net_vpn_client.c
+++ b/modules/service/vpn/dap_chain_net_vpn_client.c
@@ -416,7 +416,7 @@ int dap_chain_net_vpn_client_check(dap_chain_net_t *a_net, const char *a_ipv4_st
     if(a_timeout_test_ms==-1)
         a_timeout_test_ms = 10000;
     // default 5 sec = 5000 ms
-    int l_timeout_conn_ms = 5000;
+    int l_timeout_conn_ms = 25000;
 
     int l_ret = 0;
     if(!a_ipv4_str) // && !a_ipv6_str)
@@ -437,7 +437,7 @@ int dap_chain_net_vpn_client_check(dap_chain_net_t *a_net, const char *a_ipv4_st
     if(a_ipv6_str)
         inet_pton(AF_INET6, a_ipv6_str, &(s_node_info->hdr.ext_addr_v6));
 
-    s_vpn_client = dap_chain_node_client_connect_channels(a_net, s_node_info, l_active_channels);
+    s_vpn_client = dap_chain_node_client_create_n_connect(a_net, s_node_info, l_active_channels, NULL, NULL);
     if(!s_vpn_client) {
         log_it(L_ERROR, "Can't connect to VPN server=%s:%d", a_ipv4_str, a_port);
         // clean client struct
-- 
GitLab