From b174c15b2f516178fb5b11e2d053a78ea9af0046 Mon Sep 17 00:00:00 2001
From: "Dmitriy A. Gerasimov" <dmitriy.gerasimov@demlabs.net>
Date: Tue, 8 Sep 2020 15:14:26 +0700
Subject: [PATCH] [+] More checks [+] New debug_more param for srv_vpn [+] More
 warnings about data loose [+] Stats collection for sent, receieved and lost

---
 dap-sdk/net/core/dap_events_socket.c          |   9 +-
 dap-sdk/net/core/dap_worker.c                 |   8 +-
 dap-sdk/net/stream/ch/dap_stream_ch_pkt.c     |  20 ++
 .../net/stream/ch/include/dap_stream_ch_pkt.h |   2 +
 dap-sdk/net/stream/stream/dap_stream_pkt.c    |   2 +-
 .../dap_chain_net_srv_stream_session.h        |  17 +-
 modules/service/vpn/dap_chain_net_srv_vpn.c   | 236 ++++++++++++++----
 .../vpn/include/dap_chain_net_srv_vpn.h       |   2 +-
 8 files changed, 244 insertions(+), 52 deletions(-)

diff --git a/dap-sdk/net/core/dap_events_socket.c b/dap-sdk/net/core/dap_events_socket.c
index 22b9fd3c40..8dcb64f5f3 100644
--- a/dap-sdk/net/core/dap_events_socket.c
+++ b/dap-sdk/net/core/dap_events_socket.c
@@ -148,7 +148,6 @@ void dap_events_socket_reassign_between_workers_mt(dap_worker_t * a_worker_old,
     l_msg->esocket = a_es;
     l_msg->worker_new = a_worker_new;
     dap_events_socket_queue_ptr_send(a_worker_old->queue_es_reassign, l_msg);
-
 }
 
 /**
@@ -254,7 +253,7 @@ dap_events_socket_t * s_create_type_queue_ptr(dap_worker_t * a_w, dap_events_soc
     int l_pipe[2];
     int l_errno;
     char l_errbuf[128];
-    if( pipe2(l_pipe,O_DIRECT) < 0 ){
+    if( pipe2(l_pipe,O_DIRECT | O_NONBLOCK ) < 0 ){
         l_errno = errno;
         strerror_r(l_errno, l_errbuf, sizeof (l_errbuf));
         switch (l_errno) {
@@ -483,8 +482,12 @@ int dap_events_socket_queue_ptr_send( dap_events_socket_t * a_es, void* a_arg)
     int l_errno = errno;
     if (ret == sizeof(a_arg) )
         return  0;
-    else
+    else{
+        char l_errbuf[128];
+        strerror_r(l_errno, l_errbuf, sizeof (l_errbuf));
+        log_it(L_ERROR, "Can't send ptr to queue:\"%s\" code %d", l_errbuf, l_errno);
         return l_errno;
+    }
 #elif defined (DAP_EVENTS_CAPS_QUEUE_POSIX)
     struct timespec l_timeout;
     clock_gettime(CLOCK_REALTIME, &l_timeout);
diff --git a/dap-sdk/net/core/dap_worker.c b/dap-sdk/net/core/dap_worker.c
index 76821c2113..c5cd7f2c92 100644
--- a/dap-sdk/net/core/dap_worker.c
+++ b/dap-sdk/net/core/dap_worker.c
@@ -267,8 +267,8 @@ void *dap_worker_thread(void *arg)
                 }
                 if(l_cur->flags & DAP_SOCK_READY_TO_WRITE) {
 
-                    static const uint32_t buf_out_zero_count_max = 5;
-                    l_cur->buf_out[l_cur->buf_out_size] = 0;
+                    static const uint32_t buf_out_zero_count_max = 2;
+                    //l_cur->buf_out[l_cur->buf_out_size] = 0;
 
                     if(!l_cur->buf_out_size) {
 
@@ -276,8 +276,8 @@ void *dap_worker_thread(void *arg)
                         l_cur->buf_out_zero_count++;
 
                         if(l_cur->buf_out_zero_count > buf_out_zero_count_max) { // How many time buf_out on write event could be empty
-                            //log_it(L_WARNING, "Output: nothing to send %u times, remove socket from the write set",
-                            //        buf_out_zero_count_max);
+                            log_it(L_WARNING, "Output: nothing to send %u times, remove socket from the write set",
+                                    buf_out_zero_count_max);
                             dap_events_socket_set_writable_unsafe(l_cur, false);
                         }
                     }
diff --git a/dap-sdk/net/stream/ch/dap_stream_ch_pkt.c b/dap-sdk/net/stream/ch/dap_stream_ch_pkt.c
index 6287ddcd23..0fce460367 100644
--- a/dap-sdk/net/stream/ch/dap_stream_ch_pkt.c
+++ b/dap-sdk/net/stream/ch/dap_stream_ch_pkt.c
@@ -132,6 +132,26 @@ size_t dap_stream_ch_pkt_write_mt(dap_stream_worker_t * a_worker , dap_stream_ch
     return a_data_size;
 }
 
+/**
+ * @brief dap_stream_ch_check_unsafe
+ * @param a_worker
+ * @param a_ch
+ * @return
+ */
+bool dap_stream_ch_check_unsafe(dap_stream_worker_t * a_worker,dap_stream_ch_t * a_ch)
+{
+    if (a_ch){
+        if ( a_worker->channels){
+            dap_stream_ch_t * l_ch = NULL;
+            HASH_FIND(hh_worker,a_worker->channels ,&a_ch, sizeof(a_ch), l_ch );
+            return l_ch == a_ch;
+        }else
+            return false;
+    }else
+        return false;
+}
+
+
 
 /**
  * @brief stream_ch_pkt_write
diff --git a/dap-sdk/net/stream/ch/include/dap_stream_ch_pkt.h b/dap-sdk/net/stream/ch/include/dap_stream_ch_pkt.h
index f2c986ca2f..47dfac6ebf 100644
--- a/dap-sdk/net/stream/ch/include/dap_stream_ch_pkt.h
+++ b/dap-sdk/net/stream/ch/include/dap_stream_ch_pkt.h
@@ -54,5 +54,7 @@ void dap_stream_ch_pkt_deinit();
 size_t dap_stream_ch_pkt_write_f_unsafe(struct dap_stream_ch * a_ch, uint8_t a_type, const char * a_str,...);
 size_t dap_stream_ch_pkt_write_unsafe(struct dap_stream_ch * a_ch,  uint8_t a_type, const void * a_data, size_t a_data_size);
 
+bool dap_stream_ch_check_unsafe(dap_stream_worker_t * a_worker,dap_stream_ch_t * a_ch);
+
 size_t dap_stream_ch_pkt_write_f_mt(dap_stream_worker_t * a_worker , dap_stream_ch_t *a_ch, uint8_t a_type, const char * a_str,...);
 size_t dap_stream_ch_pkt_write_mt(dap_stream_worker_t * a_worker , dap_stream_ch_t *a_ch,  uint8_t a_type, const void * a_data, size_t a_data_size);
diff --git a/dap-sdk/net/stream/stream/dap_stream_pkt.c b/dap-sdk/net/stream/stream/dap_stream_pkt.c
index 4a83abb5c9..9642362c3c 100644
--- a/dap-sdk/net/stream/stream/dap_stream_pkt.c
+++ b/dap-sdk/net/stream/stream/dap_stream_pkt.c
@@ -148,7 +148,7 @@ size_t dap_stream_pkt_write_unsafe(dap_stream_t * a_stream, const void * a_data,
     memset(&pkt_hdr,0,sizeof(pkt_hdr));
     memcpy(pkt_hdr.sig,c_dap_stream_sig,sizeof(pkt_hdr.sig));
 
-    pkt_hdr.size =(uint32_t) a_stream->session->key->enc_na(a_stream->session->key, a_data,a_data_size,l_buf_selected, l_buf_size_required);
+    pkt_hdr.size =(uint32_t) dap_enc_code( a_stream->session->key, a_data,a_data_size,l_buf_selected, l_buf_size_required, DAP_ENC_DATA_TYPE_RAW);
 
     ret+=dap_events_socket_write_unsafe(a_stream->esocket,&pkt_hdr,sizeof(pkt_hdr));
     ret+=dap_events_socket_write_unsafe(a_stream->esocket,l_buf_selected,pkt_hdr.size);
diff --git a/modules/net/srv/include/dap_chain_net_srv_stream_session.h b/modules/net/srv/include/dap_chain_net_srv_stream_session.h
index 8827cc6aba..a47a16c9bc 100644
--- a/modules/net/srv/include/dap_chain_net_srv_stream_session.h
+++ b/modules/net/srv/include/dap_chain_net_srv_stream_session.h
@@ -61,15 +61,30 @@ typedef struct dap_chain_net_srv_usage{
 
 typedef void (*dap_response_success_callback_t) (dap_stream_ch_chain_net_srv_pkt_success_t*, void*);
 
+typedef struct dap_net_stats{
+        uintmax_t bytes_sent;
+        uintmax_t bytes_recv;
+        uintmax_t bytes_sent_lost;
+        uintmax_t bytes_recv_lost;
+
+        uintmax_t packets_sent;
+        uintmax_t packets_recv;
+        uintmax_t packets_sent_lost;
+        intmax_t packets_recv_lost;
+} dap_net_stats_t;
+
 typedef struct dap_chain_net_srv_stream_session {
     dap_stream_session_t * parent;
     dap_chain_net_srv_usage_t * usages;
     dap_chain_net_srv_usage_t * usage_active;
 
-    uint128_t limits_bytes; // Bytes left
+    uintmax_t limits_bytes; // Bytes left
     time_t limits_ts; // Timestamp until its activte
     dap_chain_net_srv_price_unit_uid_t limits_units_type;
 
+    // Some common stats
+    volatile dap_net_stats_t stats;
+
     time_t ts_activated;
     dap_sign_t* user_sign; // User's signature for auth if reconnect
 
diff --git a/modules/service/vpn/dap_chain_net_srv_vpn.c b/modules/service/vpn/dap_chain_net_srv_vpn.c
index 3dddf22356..a959d2162b 100644
--- a/modules/service/vpn/dap_chain_net_srv_vpn.c
+++ b/modules/service/vpn/dap_chain_net_srv_vpn.c
@@ -130,6 +130,7 @@ struct tun_socket_msg{
 ch_sf_tun_socket_t ** s_tun_sockets = NULL;
 dap_events_socket_t ** s_tun_sockets_queue_msg = NULL;
 uint32_t s_tun_sockets_count = 0;
+bool s_debug_more = false;
 
 static usage_client_t * s_clients;
 static dap_chain_net_srv_ch_vpn_t * s_ch_vpn_addrs ;
@@ -182,21 +183,34 @@ static int s_tun_deattach_queue(int fd);
 static int s_tun_attach_queue(int fd);
 
 
-static void s_tun_client_send_data(dap_chain_net_srv_ch_vpn_info_t * a_ch_vpn_info, const void * a_data, size_t a_data_size);
+static bool s_tun_client_send_data(dap_chain_net_srv_ch_vpn_info_t * a_ch_vpn_info, const void * a_data, size_t a_data_size);
+static size_t s_stream_session_esocket_send(dap_chain_net_srv_stream_session_t * l_srv_session, dap_events_socket_t * l_es, const void * a_data, size_t a_data_size );
+static bool s_tun_client_send_data_unsafe(dap_chain_net_srv_ch_vpn_t * l_ch_vpn, ch_vpn_pkt_t * l_pkt_out);
 
 
-static void s_tun_client_send_data_unsafe(dap_chain_net_srv_ch_vpn_t * l_ch_vpn, ch_vpn_pkt_t * l_pkt_out)
+static bool s_tun_client_send_data_unsafe(dap_chain_net_srv_ch_vpn_t * l_ch_vpn, ch_vpn_pkt_t * l_pkt_out)
 {
     dap_chain_net_srv_stream_session_t * l_srv_session = DAP_CHAIN_NET_SRV_STREAM_SESSION (l_ch_vpn->ch->stream->session );
     dap_chain_net_srv_usage_t * l_usage = dap_chain_net_srv_usage_find_unsafe(l_srv_session,  l_ch_vpn->usage_id);
 
-    dap_stream_ch_pkt_write_unsafe(l_ch_vpn->ch, DAP_STREAM_CH_PKT_TYPE_NET_SRV_VPN_DATA, l_pkt_out,
-            l_pkt_out->header.op_data.data_size + sizeof(l_pkt_out->header));
-    s_update_limits(l_ch_vpn->ch,l_srv_session,l_usage, l_pkt_out->header.op_data.data_size );
+    size_t l_data_to_send = (l_pkt_out->header.op_data.data_size + sizeof(l_pkt_out->header));
+    size_t l_data_sent = dap_stream_ch_pkt_write_unsafe(l_ch_vpn->ch, DAP_STREAM_CH_PKT_TYPE_NET_SRV_VPN_DATA, l_pkt_out, l_data_to_send);
+    s_update_limits(l_ch_vpn->ch,l_srv_session,l_usage, l_data_sent );
+    if ( l_data_sent != l_data_to_send){
+        log_it(L_WARNING, "Wasn't sent all the data in tunnel: probably buffer overflow");
+        l_srv_session->stats.bytes_recv_lost += l_data_to_send - l_data_sent;
+        l_srv_session->stats.packets_recv_lost++;
+        return false;
+    }else{
+        l_srv_session->stats.bytes_recv += l_data_sent;
+        l_srv_session->stats.packets_recv++;
+        return true;
+    }
 }
 
-static void s_tun_client_send_data(dap_chain_net_srv_ch_vpn_info_t * l_ch_vpn_info, const void * a_data, size_t a_data_size)
+static bool s_tun_client_send_data(dap_chain_net_srv_ch_vpn_info_t * l_ch_vpn_info, const void * a_data, size_t a_data_size)
 {
+    assert(a_data_size > sizeof (struct iphdr));
     ch_vpn_pkt_t *l_pkt_out = DAP_NEW_Z_SIZE(ch_vpn_pkt_t, sizeof(l_pkt_out->header) + a_data_size);
     l_pkt_out->header.op_code = VPN_PACKET_OP_CODE_VPN_RECV;
     l_pkt_out->header.sock_id = s_raw_server->tun_fd;
@@ -204,8 +218,21 @@ static void s_tun_client_send_data(dap_chain_net_srv_ch_vpn_info_t * l_ch_vpn_in
     l_pkt_out->header.op_data.data_size = a_data_size;
     memcpy(l_pkt_out->data, a_data, a_data_size);
 
+    struct in_addr l_in_daddr;
+    l_in_daddr.s_addr = ((struct iphdr* ) l_pkt_out->data)->daddr;
+
     if(l_ch_vpn_info->is_on_this_worker){
-        s_tun_client_send_data_unsafe(l_ch_vpn_info->ch_vpn,l_pkt_out);
+        if( dap_events_socket_check_unsafe(l_ch_vpn_info->worker, l_ch_vpn_info->esocket ) ){
+            if(s_debug_more){
+                char l_str_daddr[INET_ADDRSTRLEN];
+                inet_ntop(AF_INET,&l_in_daddr,l_str_daddr,sizeof (l_in_daddr));
+                log_it(L_INFO, "Sent packet for desitnation %s in own context",l_str_daddr);
+            }
+
+            s_tun_client_send_data_unsafe(l_ch_vpn_info->ch_vpn,l_pkt_out);
+        }else{
+            log_it(L_WARNING, "Was no esocket %p on worker #%u, lost %zd data",l_ch_vpn_info->esocket, l_ch_vpn_info->worker->id,a_data_size );
+        }
         DAP_DELETE(l_pkt_out);
     }else{
         struct tun_socket_msg * l_msg= DAP_NEW_Z(struct tun_socket_msg);
@@ -213,8 +240,20 @@ static void s_tun_client_send_data(dap_chain_net_srv_ch_vpn_info_t * l_ch_vpn_in
         l_msg->ch_vpn = l_ch_vpn_info->ch_vpn;
         l_msg->esocket = l_ch_vpn_info->esocket;
         l_msg->ch_vpn_send.pkt = l_pkt_out;
-        dap_events_socket_queue_ptr_send(l_ch_vpn_info->queue_msg, l_msg);
+        if (dap_events_socket_queue_ptr_send(l_ch_vpn_info->queue_msg, l_msg) != 0 ){
+            log_it(L_WARNING, "Lost %zd data send in tunnel send operation in alien context: queue is overfilled?",a_data_size );
+            DAP_DELETE(l_msg);
+            DAP_DELETE(l_pkt_out);
+            return false;
+        }
+        if(s_debug_more){
+            char l_str_daddr[INET_ADDRSTRLEN];
+            inet_ntop(AF_INET,&l_in_daddr,l_str_daddr,sizeof (l_in_daddr));
+            log_it(L_INFO, "Sent packet for desitnation %s between contexts",l_str_daddr);
+        }
+
     }
+    return true;
 }
 
 /**
@@ -296,7 +335,8 @@ static void s_tun_recv_msg_callback(dap_events_socket_t * a_esocket_queue, void
 
         }break;
         case TUN_SOCKET_MSG_CH_VPN_SEND:{
-            s_tun_client_send_data_unsafe(l_msg->ch_vpn,l_msg->ch_vpn_send.pkt);
+            if(dap_events_socket_check_unsafe(a_esocket_queue->worker, l_msg->esocket ) )
+                s_tun_client_send_data_unsafe(l_msg->ch_vpn,l_msg->ch_vpn_send.pkt);
             DAP_DELETE(l_msg->ch_vpn_send.pkt);
         }break;
         default:log_it(L_ERROR,"Wrong tun socket message type %d", l_msg->type);
@@ -353,7 +393,7 @@ static void s_tun_send_msg_ip_unassigned(uint32_t a_worker_id, dap_chain_net_srv
     l_msg->esocket = a_ch_vpn->ch->stream->esocket;
     l_msg->is_reassigned_once = a_ch_vpn->ch->stream->esocket->was_reassigned;
 
-    if (dap_events_socket_queue_ptr_send(s_tun_sockets_queue_msg[a_worker_id], l_msg) != 0){
+    if ( dap_events_socket_queue_ptr_send(s_tun_sockets_queue_msg[a_worker_id], l_msg) != 0 ) {
         log_it(L_WARNING, "Cant send new  ip unassign message to the tun msg queue #%u", a_worker_id);
     }
 }
@@ -627,6 +667,10 @@ int s_vpn_service_create(dap_config_t * g_config){
 
     uint16_t l_pricelist_count = 0;
 
+    // Read if we need to dump all pkt operations
+    s_debug_more= dap_config_get_item_bool_default(g_config,"srv_vpn", "debug_more",false);
+
+
     //! IMPORTANT ! This fetch is single-action and cannot be further reused, since it modifies the stored config data
     //! it also must NOT be freed within this module !
     char **l_pricelist = dap_config_get_array_str(g_config, "srv_vpn", "pricelist", &l_pricelist_count); // must not be freed!
@@ -718,11 +762,10 @@ int dap_chain_net_srv_vpn_init(dap_config_t * g_config) {
         return -1;
     }
     log_it(L_INFO,"TUN driver configured successfuly");
-
+    s_vpn_service_create(g_config);
     dap_stream_ch_proc_add(DAP_STREAM_CH_ID_NET_SRV_VPN, s_ch_vpn_new, s_ch_vpn_delete, s_ch_packet_in,
             s_ch_packet_out);
 
-    s_vpn_service_create(g_config);
     return 0;
 }
 
@@ -1056,6 +1099,7 @@ static void send_pong_pkt(dap_stream_ch_t* a_ch)
 void s_ch_packet_in_vpn_address_request(dap_stream_ch_t* a_ch, dap_chain_net_srv_usage_t * a_usage){
     dap_chain_net_srv_ch_vpn_t *l_ch_vpn = CH_VPN(a_ch);
     dap_chain_net_srv_vpn_t * l_srv_vpn =(dap_chain_net_srv_vpn_t *) a_usage->service->_inhertor;
+    dap_chain_net_srv_stream_session_t * l_srv_session= DAP_CHAIN_NET_SRV_STREAM_SESSION(l_ch_vpn->ch->stream->session);
 
     if ( l_ch_vpn->addr_ipv4.s_addr ){
         log_it(L_WARNING,"We already have ip address leased to us");
@@ -1064,9 +1108,19 @@ void s_ch_packet_in_vpn_address_request(dap_stream_ch_t* a_ch, dap_chain_net_srv
         pkt_out->header.op_problem.code = VPN_PROBLEM_CODE_ALREADY_ASSIGNED_ADDR;
         pkt_out->header.sock_id = s_raw_server->tun_fd;
         pkt_out->header.usage_id = a_usage->id;
-        dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_PKT_TYPE_NET_SRV_VPN_DATA, pkt_out,
-                pkt_out->header.op_data.data_size + sizeof(pkt_out->header));
-        dap_stream_ch_set_ready_to_write_unsafe(a_ch, true);
+
+        size_t l_data_to_write = pkt_out->header.op_data.data_size + sizeof(pkt_out->header);
+        size_t l_data_wrote = dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_PKT_TYPE_NET_SRV_VPN_DATA, pkt_out,
+                l_data_to_write);
+        if (l_data_wrote < l_data_to_write){
+            log_it(L_WARNING, "Buffer overfilled: can't send packet with VPN_PROBLEM_CODE_ALREADY_ASSIGNED_ADDR: sent only %zd from %zd",
+                    l_data_wrote,l_data_to_write );
+            l_srv_session->stats.bytes_sent_lost += l_data_to_write - l_data_wrote;
+            l_srv_session->stats.packets_sent_lost++;
+        }else{
+            l_srv_session->stats.packets_sent++;
+            l_srv_session->stats.bytes_sent+= l_data_wrote;
+        }
         return;
     }
     dap_chain_net_srv_vpn_item_ipv4_t * l_item_ipv4 = l_srv_vpn->ipv4_unleased;
@@ -1089,15 +1143,27 @@ void s_ch_packet_in_vpn_address_request(dap_stream_ch_t* a_ch, dap_chain_net_srv
         memcpy(l_pkt_out->data + sizeof(l_ch_vpn->addr_ipv4), &s_raw_server->ipv4_gw ,
                 sizeof(s_raw_server->ipv4_gw));
 
-        dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_PKT_TYPE_NET_SRV_VPN_DATA , l_pkt_out,
-                l_pkt_out->header.op_data.data_size + sizeof(l_pkt_out->header));
-        log_it(L_NOTICE, "VPN client address %s leased", inet_ntoa(l_ch_vpn->addr_ipv4));
-        log_it(L_INFO, "\tnet gateway %s", inet_ntoa(s_raw_server->ipv4_network_addr));
-        log_it(L_INFO, "\tnet mask %s", inet_ntoa(s_raw_server->ipv4_network_mask));
-        log_it(L_INFO, "\tgw %s", inet_ntoa(s_raw_server->ipv4_gw));
-        log_it(L_INFO, "\tlast_addr %s", inet_ntoa(s_raw_server->ipv4_lease_last));
-        l_srv_vpn->ipv4_unleased = l_item_ipv4->next;
-        DAP_DELETE(l_item_ipv4);
+        size_t l_data_to_write =  l_pkt_out->header.op_data.data_size + sizeof(l_pkt_out->header);
+        size_t l_data_wrote = dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_PKT_TYPE_NET_SRV_VPN_DATA , l_pkt_out,
+                l_data_to_write);
+        if (l_data_wrote < l_data_to_write){
+            log_it(L_WARNING, "Buffer overfilled: can't send packet with VPN_PACKET_OP_CODE_VPN_ADDR_REPLY: sent only %zd from %zd",
+                    l_data_wrote,l_data_to_write );
+            dap_chain_net_srv_stream_session_t * l_srv_session= DAP_CHAIN_NET_SRV_STREAM_SESSION(l_ch_vpn->ch->stream->session);
+            assert(l_srv_session);
+            l_srv_session->stats.bytes_sent_lost += l_data_to_write - l_data_wrote;
+            l_srv_session->stats.packets_sent_lost++;
+        }else{
+            log_it(L_NOTICE, "VPN client address %s leased", inet_ntoa(l_ch_vpn->addr_ipv4));
+            log_it(L_INFO, "\tnet gateway %s", inet_ntoa(s_raw_server->ipv4_network_addr));
+            log_it(L_INFO, "\tnet mask %s", inet_ntoa(s_raw_server->ipv4_network_mask));
+            log_it(L_INFO, "\tgw %s", inet_ntoa(s_raw_server->ipv4_gw));
+            log_it(L_INFO, "\tlast_addr %s", inet_ntoa(s_raw_server->ipv4_lease_last));
+            l_srv_vpn->ipv4_unleased = l_item_ipv4->next;
+            DAP_DELETE(l_item_ipv4);
+            l_srv_session->stats.packets_sent++;
+            l_srv_session->stats.bytes_sent+= l_data_wrote;
+        }
     }else{
         struct in_addr n_addr = { 0 }, n_addr_max;
         n_addr.s_addr = ntohl(s_raw_server->ipv4_lease_last.s_addr);
@@ -1141,11 +1207,21 @@ void s_ch_packet_in_vpn_address_request(dap_stream_ch_t* a_ch, dap_chain_net_srv
             memcpy(pkt_out->data + sizeof(l_ch_vpn->addr_ipv4), &s_raw_server->ipv4_gw,
                     sizeof(s_raw_server->ipv4_gw));
 
-            if(dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_PKT_TYPE_NET_SRV_VPN_DATA, pkt_out,
-                                       pkt_out->header.op_data.data_size + sizeof(pkt_out->header))) {
-                dap_stream_ch_set_ready_to_write_unsafe(a_ch, true);
+            size_t l_data_to_write = pkt_out->header.op_data.data_size + sizeof(pkt_out->header);
+            size_t l_data_wrote = dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_PKT_TYPE_NET_SRV_VPN_DATA, pkt_out,
+                                       l_data_to_write);
+            if (l_data_wrote < l_data_to_write){
+                log_it(L_WARNING, "Buffer overfilled: can't send packet with VPN_PACKET_OP_CODE_VPN_ADDR_REPLY: sent only %zd from %zd",
+                        l_data_wrote,l_data_to_write );
+                dap_chain_net_srv_stream_session_t * l_srv_session= DAP_CHAIN_NET_SRV_STREAM_SESSION(l_ch_vpn->ch->stream->session);
+                assert(l_srv_session);
+                l_srv_session->stats.bytes_sent_lost += l_data_to_write - l_data_wrote;
+                l_srv_session->stats.packets_sent_lost++;
+            }else{
+                l_srv_session->stats.packets_sent++;
+                l_srv_session->stats.bytes_sent+= l_data_wrote;
+                s_tun_send_msg_ip_assigned_all(l_ch_vpn, l_ch_vpn->addr_ipv4);
             }
-            s_tun_send_msg_ip_assigned_all(l_ch_vpn, l_ch_vpn->addr_ipv4);
         } else { // All the network is filled with clients, can't lease a new address
             log_it(L_WARNING, "All the network is filled with clients, can't lease a new address");
             ch_vpn_pkt_t *pkt_out = (ch_vpn_pkt_t*) calloc(1, sizeof(pkt_out->header));
@@ -1153,9 +1229,20 @@ void s_ch_packet_in_vpn_address_request(dap_stream_ch_t* a_ch, dap_chain_net_srv
             pkt_out->header.op_code = VPN_PACKET_OP_CODE_PROBLEM;
             pkt_out->header.usage_id = a_usage->id;
             pkt_out->header.op_problem.code = VPN_PROBLEM_CODE_NO_FREE_ADDR;
-            dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_PKT_TYPE_NET_SRV_VPN_DATA, pkt_out,
+            size_t l_data_to_write = pkt_out->header.op_data.data_size + sizeof(pkt_out->header);
+            size_t l_data_wrote = dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_PKT_TYPE_NET_SRV_VPN_DATA, pkt_out,
                     pkt_out->header.op_data.data_size + sizeof(pkt_out->header));
-            dap_stream_ch_set_ready_to_write_unsafe(a_ch, true);
+            if (l_data_wrote < l_data_to_write){
+                log_it(L_WARNING, "Buffer overfilled: can't send packet with VPN_PACKET_OP_CODE_PROBLEM: sent only %zd from %zd",
+                        l_data_wrote,l_data_to_write );
+                dap_chain_net_srv_stream_session_t * l_srv_session= DAP_CHAIN_NET_SRV_STREAM_SESSION(l_ch_vpn->ch->stream->session);
+                assert(l_srv_session);
+                l_srv_session->stats.bytes_sent_lost += l_data_to_write - l_data_wrote;
+                l_srv_session->stats.packets_sent_lost++;
+            }else{
+                l_srv_session->stats.packets_sent++;
+                l_srv_session->stats.bytes_sent+= l_data_wrote;
+            }
         }
     }
 }
@@ -1192,16 +1279,21 @@ void s_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
     ch_vpn_pkt_t * l_vpn_pkt = (ch_vpn_pkt_t *) l_pkt->data;
     size_t l_vpn_pkt_size = l_pkt->hdr.size - sizeof (l_vpn_pkt->header);
 
+    if (s_debug_more)
+        log_it(L_INFO, "Got srv_vpn packet with op_code=0x%02x", l_vpn_pkt->header.op_code);
 
-    //log_it(L_DEBUG, "Got SF packet with id %d op_code 0x%02x", remote_sock_id, sf_pkt->header.op_code);
     if(l_vpn_pkt->header.op_code >= 0xb0) { // Raw packets
         switch (l_vpn_pkt->header.op_code) {
             case VPN_PACKET_OP_CODE_PING:
                 a_ch->stream->esocket->last_ping_request = time(NULL);
+                l_srv_session->stats.bytes_recv += l_vpn_pkt_size;
+                l_srv_session->stats.packets_recv++;
                 send_pong_pkt(a_ch);
             break;
             case VPN_PACKET_OP_CODE_PONG:
                 a_ch->stream->esocket->last_ping_request = time(NULL);
+                l_srv_session->stats.bytes_recv += l_vpn_pkt_size;
+                l_srv_session->stats.packets_recv++;
             break;
             // for client
             case VPN_PACKET_OP_CODE_VPN_ADDR_REPLY: { // Assigned address for peer
@@ -1209,11 +1301,15 @@ void s_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
                     log_it(L_ERROR, "Can't create tun");
                 }else
                     s_tun_send_msg_ip_assigned_all(CH_VPN(a_ch), CH_VPN(a_ch)->addr_ipv4);
+                l_srv_session->stats.bytes_recv += l_vpn_pkt_size;
+                l_srv_session->stats.packets_recv++;
             } break;
             // for server
             case VPN_PACKET_OP_CODE_VPN_ADDR_REQUEST: { // Client request after L3 connection the new IP address
                 log_it(L_INFO, "Received address request  ");
                 s_ch_packet_in_vpn_address_request(a_ch, l_usage);
+                l_srv_session->stats.bytes_recv += l_vpn_pkt_size;
+                l_srv_session->stats.packets_recv++;
             } break;
             // for client only
             case VPN_PACKET_OP_CODE_VPN_RECV:{
@@ -1221,9 +1317,7 @@ void s_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
                 // Find tun socket for current worker
                 ch_sf_tun_socket_t * l_tun = s_tun_sockets[a_ch->stream_worker->worker->id];
                 assert(l_tun);
-                // Unsafely send it
-                dap_events_socket_write_unsafe( l_tun->es, l_vpn_pkt->data, l_vpn_pkt->header.op_data.data_size);
-                //dap_events_socket_set_writable_unsafe(l_tun->es, true);
+                s_stream_session_esocket_send(l_srv_session, l_tun->es, l_vpn_pkt->data, l_vpn_pkt->header.op_data.data_size);
             } break;
 
             // for servier only
@@ -1231,7 +1325,7 @@ void s_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
                 ch_sf_tun_socket_t * l_tun = s_tun_sockets[a_ch->stream_worker->worker->id];
                 assert(l_tun);
                 // Unsafely send it
-                size_t l_ret = dap_events_socket_write_unsafe( l_tun->es, l_vpn_pkt->data, l_vpn_pkt->header.op_data.data_size);
+                size_t l_ret = s_stream_session_esocket_send(l_srv_session, l_tun->es, l_vpn_pkt->data, l_vpn_pkt->header.op_data.data_size);
                 //dap_events_socket_set_writable_unsafe(l_tun->es, true);
                 if( l_ret)
                     s_update_limits (a_ch, l_srv_session, l_usage,l_ret );
@@ -1242,6 +1336,58 @@ void s_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
     }
 }
 
+/**
+ * @brief s_stream_session_esocket_send
+ * @param l_srv_session
+ * @param l_es
+ * @param a_data
+ * @param a_data_size
+ */
+static size_t s_stream_session_esocket_send(dap_chain_net_srv_stream_session_t * l_srv_session, dap_events_socket_t * l_es, const void * a_data, size_t a_data_size )
+{
+    // Lets first try to send it directly with write() call
+    ssize_t l_direct_wrote;
+    size_t l_ret = 0;
+    if (l_es->type == DESCRIPTOR_TYPE_FILE )
+        l_direct_wrote =  write(l_es->fd, a_data, a_data_size);
+    else
+        l_direct_wrote =  send(l_es->fd, a_data, a_data_size, MSG_DONTWAIT | MSG_NOSIGNAL);
+    int l_errno = errno;
+
+    size_t l_data_left_to_send=0;
+    if (l_direct_wrote > 0){
+        l_ret += l_direct_wrote;
+        if((size_t) l_direct_wrote < a_data_size){ // If we sent not all - lets put tail in buffer
+           l_data_left_to_send = a_data_size-l_direct_wrote;
+        }else{
+            l_srv_session->stats.packets_sent++;
+            l_srv_session->stats.bytes_sent+= l_direct_wrote;
+        }
+    }else{
+        l_data_left_to_send = a_data_size;
+        l_direct_wrote=0;
+        if(l_errno != EAGAIN && l_errno != EWOULDBLOCK){
+            char l_errbuf[128];
+            strerror_r(l_errno, l_errbuf, sizeof (l_errbuf));
+            log_it(L_WARNING,"Error with data sent: \"%s\" code %d",l_errbuf, l_errno);
+        }
+    }
+
+    if(l_data_left_to_send){
+        if ( dap_events_socket_write_unsafe( l_es, a_data +l_direct_wrote,l_data_left_to_send
+                                             ) < l_data_left_to_send ){
+            log_it(L_WARNING,"Loosing data, probably buffers are overfilling, lost %zd bytes", l_data_left_to_send);
+            l_srv_session->stats.bytes_sent_lost += l_data_left_to_send;
+            l_srv_session->stats.packets_sent_lost++;
+        }else{
+            l_ret += l_data_left_to_send;
+            l_srv_session->stats.packets_sent++;
+            l_srv_session->stats.bytes_sent+= l_direct_wrote;
+        }
+    }
+    return l_ret;
+}
+
 /**
  * @brief stream_sf_packet_out Packet Out Ch callback
  * @param ch
@@ -1277,6 +1423,10 @@ static void s_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg)
             dap_stream_ch_pkt_write_unsafe( l_usage->client->ch , DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_NOTIFY_STOPPED , NULL, 0 );
         return;
     }
+    // Check for empty buffer out here to prevent warnings in worker
+    if ( ! a_ch->stream->esocket->buf_out_size )
+        dap_events_socket_set_writable_unsafe(a_ch->stream->esocket,false);
+
 }
 
 void m_es_tun_delete(dap_events_socket_t * a_es, void * arg)
@@ -1295,27 +1445,29 @@ void m_es_tun_read(dap_events_socket_t * a_es, void * arg)
 
     if(l_buf_in_size) {
         struct iphdr *iph = (struct iphdr*) a_es->buf_in;
-        struct in_addr in_daddr;
-        in_daddr.s_addr = iph->daddr;
+        struct in_addr l_in_daddr;
+        l_in_daddr.s_addr = iph->daddr;
 
         //
         dap_chain_net_srv_ch_vpn_info_t * l_vpn_info = NULL;
         // Try to find in worker's clients, without locks
         if ( l_tun_socket->clients){
-            HASH_FIND_INT( l_tun_socket->clients,&in_daddr.s_addr,l_vpn_info );
+            HASH_FIND_INT( l_tun_socket->clients,&l_in_daddr.s_addr,l_vpn_info );
         }
         // We found in local table, sending data (if possible)
         if (l_vpn_info){
             if ( !l_vpn_info->is_on_this_worker && !l_vpn_info->is_reassigned_once ){
                 log_it(L_NOTICE, "Reassigning from worker %u to %u", l_vpn_info->worker->id, a_es->worker->id);
-                dap_events_socket_reassign_between_workers_mt( l_vpn_info->worker,l_vpn_info->esocket,a_es->worker);
                 l_vpn_info->is_reassigned_once = true;
                 s_tun_send_msg_esocket_reasigned_all_mt(l_vpn_info->ch_vpn, l_vpn_info->esocket, l_vpn_info->addr_ipv4,a_es->worker->id);
+                dap_events_socket_reassign_between_workers_mt( l_vpn_info->worker,l_vpn_info->esocket,a_es->worker);
             }
             s_tun_client_send_data(l_vpn_info, a_es->buf_in, l_buf_in_size);
-        }//else{
-        //    log_it(L_DEBUG, "Can't find route for desitnation %s",str_daddr);
-        //}
+        }else if(s_debug_more){
+            char l_str_daddr[INET_ADDRSTRLEN];
+            inet_ntop(AF_INET,&l_in_daddr,l_str_daddr,sizeof (l_in_daddr));
+            log_it(L_WARNING, "Can't find route for desitnation %s",l_str_daddr);
+        }
     }
     a_es->buf_in_size=0; // NULL it out because read it all
 }
diff --git a/modules/service/vpn/include/dap_chain_net_srv_vpn.h b/modules/service/vpn/include/dap_chain_net_srv_vpn.h
index 3c175fa2c3..a073dbad18 100644
--- a/modules/service/vpn/include/dap_chain_net_srv_vpn.h
+++ b/modules/service/vpn/include/dap_chain_net_srv_vpn.h
@@ -90,7 +90,7 @@ typedef struct ch_vpn_pkt {
             } raw; // Raw access to OP bytes
         };
     } DAP_ALIGN_PACKED header;
-    uint8_t data[]; // Binary data nested by packet
+    byte_t data[]; // Binary data nested by packet
 }DAP_ALIGN_PACKED ch_vpn_pkt_t;
 
 typedef struct ch_sf_tun_socket ch_sf_tun_socket_t;
-- 
GitLab