From ecabf1c82b2f2ad3acd28b00833238b03a7d34ac Mon Sep 17 00:00:00 2001
From: "Dmitriy A. Gerasimov" <dmitriy.gerasimov@demlabs.net>
Date: Mon, 9 Dec 2019 23:19:34 +0700
Subject: [PATCH] [*] Update limits and check for enabled\disabled

---
 dap_chain_net_srv_vpn.c | 98 +++++++++++++++++++++++++----------------
 1 file changed, 59 insertions(+), 39 deletions(-)

diff --git a/dap_chain_net_srv_vpn.c b/dap_chain_net_srv_vpn.c
index a145de2..f184914 100755
--- a/dap_chain_net_srv_vpn.c
+++ b/dap_chain_net_srv_vpn.c
@@ -1176,46 +1176,55 @@ void * srv_ch_sf_thread(void * arg)
         for(n = 0; n < nfds; ++n) {
             int s = events[n].data.fd;
 
-            ch_vpn_socket_proxy_t * sf = NULL;
+            ch_vpn_socket_proxy_t * l_socket_proxy = NULL;
             pthread_mutex_lock(&s_sf_socks_mutex);
-            HASH_FIND(hh_sock, sf_socks_client, &s, sizeof(s), sf);
+            HASH_FIND(hh_sock, sf_socks_client, &s, sizeof(s), l_socket_proxy);
             pthread_mutex_unlock(&s_sf_socks_mutex);
-            if(sf) {
+            if( l_socket_proxy ) {
+                dap_chain_net_srv_stream_session_t * l_srv_session = DAP_CHAIN_NET_SRV_STREAM_SESSION (l_socket_proxy->ch->stream->session );
+                dap_chain_net_srv_ch_vpn_t *l_ch_vpn = CH_VPN(l_socket_proxy->ch);
+                dap_chain_net_srv_usage_t * l_usage = dap_chain_net_srv_usage_find(l_srv_session,  l_ch_vpn->usage_id);
+
                 if(events[n].events & EPOLLERR) {
                     log_it(L_NOTICE, "Socket id %d has EPOLLERR flag on", s);
-                    pthread_mutex_lock(&(sf->mutex));
-                    srv_stream_sf_disconnect(sf);
-                    pthread_mutex_unlock(&(sf->mutex));
+                    pthread_mutex_lock(&(l_socket_proxy->mutex));
+                    srv_stream_sf_disconnect(l_socket_proxy);
+                    pthread_mutex_unlock(&(l_socket_proxy->mutex));
                 } else if(events[n].events & EPOLLIN) {
                     char buf[1000000];
-                    size_t buf_size;
                     ssize_t ret;
-                    pthread_mutex_lock(&(sf->mutex));
-                    if(sf->pkt_out_size < ((sizeof(sf->pkt_out) / sizeof(sf->pkt_out[0])) - 1)) {
-                        ret = recv(sf->sock, buf, sizeof(buf), 0);
+                    pthread_mutex_lock(&(l_socket_proxy->mutex));
+                    if(l_socket_proxy->pkt_out_size < ((sizeof(l_socket_proxy->pkt_out) / sizeof(l_socket_proxy->pkt_out[0])) - 1)) {
+                        ret = recv(l_socket_proxy->sock, buf, sizeof(buf), 0);
                         //log_it(L_DEBUG,"recv() returned %d",ret);
                         if(ret > 0) {
-                            buf_size = ret;
-                            ch_vpn_pkt_t * pout;
-                            pout = sf->pkt_out[sf->pkt_out_size] = (ch_vpn_pkt_t *) calloc(1,
-                                    buf_size + sizeof(pout->header));
-                            pout->header.op_code = VPN_PACKET_OP_CODE_RECV;
-                            pout->header.sock_id = sf->id;
-                            pout->header.op_data.data_size = buf_size;
-                            memcpy(pout->data, buf, buf_size);
-                            sf->pkt_out_size++;
-                            pthread_mutex_unlock(&(sf->mutex));
-                            dap_stream_ch_set_ready_to_write(sf->ch, true);
+                            size_t buf_size = ret;
+                            s_update_limits(l_socket_proxy->ch,l_srv_session,l_usage,buf_size);
+                            if ( dap_stream_ch_get_ready_to_read(l_socket_proxy->ch) ){
+                                ch_vpn_pkt_t * pout;
+                                pout = l_socket_proxy->pkt_out[l_socket_proxy->pkt_out_size] = (ch_vpn_pkt_t *) calloc(1,
+                                        buf_size + sizeof(pout->header));
+                                pout->header.op_code = VPN_PACKET_OP_CODE_RECV;
+                                pout->header.sock_id = l_socket_proxy->id;
+                                pout->header.op_data.data_size = buf_size;
+                                memcpy(pout->data, buf, buf_size);
+                                l_socket_proxy->pkt_out_size++;
+                                pthread_mutex_unlock(&(l_socket_proxy->mutex));
+                                dap_stream_ch_set_ready_to_write(l_socket_proxy->ch, true);
+                            }else{
+                                pthread_mutex_unlock(&(l_socket_proxy->mutex));
+                            }
+
                         } else {
                             log_it(L_NOTICE,
                                     "Socket id %d returned error on recv() function - may be host has disconnected", s);
-                            pthread_mutex_unlock(&(sf->mutex));
-                            dap_stream_ch_set_ready_to_write(sf->ch, true);
-                            srv_stream_sf_disconnect(sf);
+                            pthread_mutex_unlock(&(l_socket_proxy->mutex));
+                            dap_stream_ch_set_ready_to_write(l_socket_proxy->ch, true);
+                            srv_stream_sf_disconnect(l_socket_proxy);
                         }
                     } else {
                         log_it(L_WARNING, "Can't receive data, full of stack");
-                        pthread_mutex_unlock(&(sf->mutex));
+                        pthread_mutex_unlock(&(l_socket_proxy->mutex));
                     }
                 } else {
                     log_it(L_WARNING, "Unprocessed flags 0x%08X", events[n].events);
@@ -1253,11 +1262,15 @@ void* srv_ch_sf_thread_raw(void *arg)
      return NULL;
      }
      */
-    uint8_t *tmp_buf;
+    uint8_t *l_tmp_buf;
     ssize_t tmp_buf_size;
     static int tun_MTU = 100000; /// TODO Replace with detection of MTU size
 
-    tmp_buf = (uint8_t *) calloc(1, tun_MTU);
+    l_tmp_buf = (uint8_t *) alloca(tun_MTU);
+    if ( !l_tmp_buf){ // Can't allocate on stack
+        log_it(L_ERROR,"Can't allocate read buffer on stack, trying to get one from heap");
+        l_tmp_buf = DAP_NEW_SIZE(uint8_t, tun_MTU);
+    }
     tmp_buf_size = 0;
     log_it(L_INFO, "Tun/tap thread starts with MTU = %d", tun_MTU);
 
@@ -1286,12 +1299,12 @@ void* srv_ch_sf_thread_raw(void *arg)
             }
 
             if(FD_ISSET(s_raw_server->tun_fd, &fds_read_active)) {
-                int read_ret = read(s_raw_server->tun_fd, tmp_buf, tun_MTU);
-                if(read_ret < 0) {
-                    log_it(L_CRITICAL, "Tun/tap read returned '%s' error, code (%d)", strerror(errno), read_ret);
+                int l_read_ret = read(s_raw_server->tun_fd, l_tmp_buf, tun_MTU);
+                if(l_read_ret < 0) {
+                    log_it(L_CRITICAL, "Tun/tap read returned '%s' error, code (%d)", strerror(errno), l_read_ret);
                     break;
                 } else {
-                    struct iphdr *iph = (struct iphdr*) tmp_buf;
+                    struct iphdr *iph = (struct iphdr*) l_tmp_buf;
                     struct in_addr in_daddr, in_saddr;
                     in_daddr.s_addr = iph->daddr;
                     in_saddr.s_addr = iph->saddr;
@@ -1309,14 +1322,21 @@ void* srv_ch_sf_thread_raw(void *arg)
 
                     ///
                     if(l_ch_vpn) { // Is present in hash table such destination address
-                        ch_vpn_pkt_t *pkt_out = (ch_vpn_pkt_t*) calloc(1, sizeof(pkt_out->header) + read_ret);
-                        pkt_out->header.op_code = VPN_PACKET_OP_CODE_VPN_RECV;
-                        pkt_out->header.sock_id = s_raw_server->tun_fd;
-                        pkt_out->header.op_data.data_size = read_ret;
-                        memcpy(pkt_out->data, tmp_buf, read_ret);
-                        dap_stream_ch_pkt_write(l_ch_vpn->ch, DAP_STREAM_CH_PKT_TYPE_NET_SRV_VPN_DATA, pkt_out,
-                                pkt_out->header.op_data.data_size + sizeof(pkt_out->header));
-                        dap_stream_ch_set_ready_to_write(l_ch_vpn->ch, true);
+
+                        if (dap_stream_ch_get_ready_to_read(l_ch_vpn->ch ) ){
+                            dap_chain_net_srv_stream_session_t * l_srv_session = DAP_CHAIN_NET_SRV_STREAM_SESSION (l_ch_vpn->ch->stream->session );
+                            dap_chain_net_srv_usage_t * l_usage = dap_chain_net_srv_usage_find(l_srv_session,  l_ch_vpn->usage_id);
+                            ch_vpn_pkt_t *l_pkt_out = DAP_NEW_Z_SIZE(ch_vpn_pkt_t, sizeof(l_pkt_out->header) + l_read_ret);
+                            l_pkt_out->header.op_code = VPN_PACKET_OP_CODE_VPN_RECV;
+                            l_pkt_out->header.sock_id = s_raw_server->tun_fd;
+                            l_pkt_out->header.usage_id = l_ch_vpn->usage_id;
+                            l_pkt_out->header.op_data.data_size = l_read_ret;
+                            memcpy(l_pkt_out->data, l_tmp_buf, l_read_ret);
+                            dap_stream_ch_pkt_write(l_ch_vpn->ch, DAP_STREAM_CH_PKT_TYPE_NET_SRV_VPN_DATA, l_pkt_out,
+                                    l_pkt_out->header.op_data.data_size + sizeof(l_pkt_out->header));
+                            dap_stream_ch_set_ready_to_write(l_ch_vpn->ch, true);
+                            s_update_limits(l_ch_vpn->ch,l_srv_session,l_usage, l_read_ret);
+                        }
                     }
                     pthread_rwlock_unlock(&s_clients_rwlock);
                 }
-- 
GitLab