From 7aa244b6b53ca8cb482d64c7793b76033dfbe415 Mon Sep 17 00:00:00 2001
From: "aleksei.voronin" <aleksei.voronin@demlabs.net>
Date: Thu, 7 May 2020 05:34:52 +0300
Subject: [PATCH] [*] Raw tun device processing in working threads instead of
 separate thread. Hidden by define. Enabling is not recommended.

---
 dap-sdk/net/core/dap_events.c               |   3 +-
 modules/service/vpn/dap_chain_net_srv_vpn.c | 105 ++++++++++++++++++++
 2 files changed, 107 insertions(+), 1 deletion(-)

diff --git a/dap-sdk/net/core/dap_events.c b/dap-sdk/net/core/dap_events.c
index 936484554a..0d8e6009e7 100644
--- a/dap-sdk/net/core/dap_events.c
+++ b/dap-sdk/net/core/dap_events.c
@@ -373,7 +373,8 @@ static void *thread_worker_function(void *arg)
             if(((events[n].events & EPOLLOUT) || (cur->flags & DAP_SOCK_READY_TO_WRITE))
                     && !(cur->flags & DAP_SOCK_SIGNAL_CLOSE)) {
                 ///log_it(DEBUG, "Main loop output: %u bytes to send",sa_cur->buf_out_size);
-                cur->callbacks->write_callback(cur, NULL); // Call callback to process write event
+                if(cur->callbacks->write_callback)
+                    cur->callbacks->write_callback(cur, NULL); // Call callback to process write event
 
                 if(cur->flags & DAP_SOCK_READY_TO_WRITE) {
 
diff --git a/modules/service/vpn/dap_chain_net_srv_vpn.c b/modules/service/vpn/dap_chain_net_srv_vpn.c
index c76c26102a..4a8f310518 100644
--- a/modules/service/vpn/dap_chain_net_srv_vpn.c
+++ b/modules/service/vpn/dap_chain_net_srv_vpn.c
@@ -67,6 +67,7 @@
 #include "dap_chain_net_srv_stream_session.h"
 #include "dap_chain_net_vpn_client.h"
 #include "dap_chain_ledger.h"
+#include "dap_events.h"
 
 #define LOG_TAG "dap_chain_net_srv_vpn"
 
@@ -93,6 +94,9 @@ typedef struct vpn_local_network {
     int tun_fd;
     struct ifreq ifr;
 
+#ifdef DAP_TUN_IN_WORKER
+    dap_events_socket_t * tun_events_socket;
+#endif
     ch_vpn_pkt_t * pkt_out[400];
     size_t pkt_out_size;
     size_t pkt_out_rindex;
@@ -151,6 +155,31 @@ static void s_update_limits(dap_stream_ch_t * a_ch ,
                            dap_chain_net_srv_stream_session_t * a_srv_session,
                            dap_chain_net_srv_usage_t * a_usage, size_t a_bytes);
 
+#ifdef DAP_TUN_IN_WORKER
+static void m_es_tun_delete(dap_events_socket_t * a_es, void * arg);
+static void m_es_tun_read(dap_events_socket_t * a_es, void * arg);
+static void m_es_tun_error(dap_events_socket_t * a_es, void * arg);
+
+//TODO: create .new_callback for event sockets
+int s_tun_event_stream_create()
+{
+  static dap_events_socket_callbacks_t l_s_callbacks = {
+          .read_callback = m_es_tun_read,
+          .write_callback = NULL,
+          .error_callback = m_es_tun_error,
+          .delete_callback = m_es_tun_delete
+  };
+
+  s_raw_server->tun_events_socket = dap_events_socket_wrap_no_add(NULL,
+                                                                  s_raw_server->tun_fd, &l_s_callbacks);
+  s_raw_server->tun_events_socket->type = DESCRIPTOR_TYPE_FILE;
+  dap_events_socket_create_after(s_raw_server->tun_events_socket);
+  s_raw_server->tun_events_socket->_inheritor = s_raw_server;
+
+  return 0;
+}
+#endif
+
 /**
  * @brief dap_stream_ch_vpn_init Init actions for VPN stream channel
  * @param vpn_addr Zero if only client mode. Address if the node shares its local VPN
@@ -168,7 +197,20 @@ int dap_chain_net_srv_vpn_init(dap_config_t * g_config) {
         pthread_mutex_init(&s_raw_server->pkt_out_mutex, NULL);
         pthread_mutex_init(&s_sf_socks_mutex, NULL);
         pthread_cond_init(&s_sf_socks_cond, NULL);
+
+#ifdef DAP_TUN_IN_WORKER
+        s_tun_create();
+
+        if(s_raw_server->tun_fd == -1){
+          log_it(L_CRITICAL,"Error creating file descriptor for /dev/net/tun device");
+          return -2;
+        }
+
+        s_tun_event_stream_create();
+#else
         pthread_create(&srv_sf_socks_raw_pid, NULL, srv_ch_sf_thread_raw, NULL);
+#endif
+
         pthread_create(&srv_sf_socks_pid, NULL, srv_ch_sf_thread, NULL);
         dap_stream_ch_proc_add(DAP_STREAM_CH_ID_NET_SRV_VPN, s_new, srv_ch_vpn_delete, s_ch_packet_in,
                 s_ch_packet_out);
@@ -427,6 +469,9 @@ static void s_tun_create(void)
 static void s_tun_destroy(void)
 {
     pthread_rwlock_wrlock(& s_raw_server_rwlock);
+#ifdef DAP_TUN_IN_WORKER
+    dap_events_socket_kill_socket(s_raw_server->tun_events_socket);
+#endif
     close(s_raw_server->tun_fd);
     s_raw_server->tun_fd = -1;
     pthread_rwlock_unlock(& s_raw_server_rwlock);
@@ -1398,4 +1443,64 @@ void* srv_ch_sf_thread_raw(void *arg)
     return NULL;
 }
 
+#ifdef DAP_TUN_IN_WORKER
+void m_es_tun_delete(dap_events_socket_t * a_es, void * arg)
+{
+  log_it(L_WARNING, __PRETTY_FUNCTION__);
+  log_it(L_NOTICE, "Raw sockets listen thread is stopped");
+  dap_events_socket_kill_socket(s_raw_server->tun_events_socket);
+  s_tun_destroy();
+}
+
+void m_es_tun_read(dap_events_socket_t * a_es, void * arg)
+{
+    const static int tun_MTU = 100000; /// TODO Replace with detection of MTU size
+    uint8_t l_tmp_buf[tun_MTU];
+
+    size_t l_read_ret;
+
+    do{
+        l_read_ret = dap_events_socket_read(s_raw_server->tun_events_socket, l_tmp_buf, sizeof(l_tmp_buf));
+
+        if(l_read_ret > 0) {
+            struct iphdr *iph = (struct iphdr*) l_tmp_buf;
+            struct in_addr in_daddr, in_saddr;
+            in_daddr.s_addr = iph->daddr;
+            in_saddr.s_addr = iph->saddr;
+            char str_daddr[42], str_saddr[42];
+            dap_snprintf(str_saddr, sizeof(str_saddr), "%s",inet_ntoa(in_saddr) );
+            dap_snprintf(str_daddr, sizeof(str_daddr), "%s",inet_ntoa(in_daddr) );
+
+            dap_chain_net_srv_ch_vpn_t * l_ch_vpn = NULL;
+            pthread_rwlock_rdlock(&s_clients_rwlock);
+            HASH_FIND(hh,s_ch_vpn_addrs, &in_daddr, sizeof (in_daddr), l_ch_vpn);
+
+            if(l_ch_vpn) { // Is present in hash table such destination address
+
+                if (dap_stream_ch_get_ready_to_read(l_ch_vpn->ch ) ){
+                    dap_chain_net_srv_stream_session_t * l_srv_session = DAP_CHAIN_NET_SRV_STREAM_SESSION (l_ch_vpn->ch->stream->session );
+                    dap_chain_net_srv_usage_t * l_usage = dap_chain_net_srv_usage_find(l_srv_session,  l_ch_vpn->usage_id);
+                    ch_vpn_pkt_t *l_pkt_out = DAP_NEW_Z_SIZE(ch_vpn_pkt_t, sizeof(l_pkt_out->header) + l_read_ret);
+                    l_pkt_out->header.op_code = VPN_PACKET_OP_CODE_VPN_RECV;
+                    l_pkt_out->header.sock_id = s_raw_server->tun_fd;
+                    l_pkt_out->header.usage_id = l_ch_vpn->usage_id;
+                    l_pkt_out->header.op_data.data_size = l_read_ret;
+                    memcpy(l_pkt_out->data, l_tmp_buf, l_read_ret);
+                    dap_stream_ch_pkt_write(l_ch_vpn->ch, DAP_STREAM_CH_PKT_TYPE_NET_SRV_VPN_DATA, l_pkt_out,
+                            l_pkt_out->header.op_data.data_size + sizeof(l_pkt_out->header));
+                    s_update_limits(l_ch_vpn->ch,l_srv_session,l_usage, l_read_ret);
+                }
+            }
+            pthread_rwlock_unlock(&s_clients_rwlock);
+        }
+    }while(l_read_ret > 0);
+
+    dap_events_socket_set_readable(a_es, true);
+}
+
+void m_es_tun_error(dap_events_socket_t * a_es, void * arg)
+{
+  log_it(L_WARNING, __PRETTY_FUNCTION__);
+}
+#endif
 
-- 
GitLab