From 16603ca73c1bbab6475f7680376c6ffc9e1da29c Mon Sep 17 00:00:00 2001
From: "aleksei.voronin" <aleksei.voronin@demlabs.net>
Date: Fri, 21 Aug 2020 00:14:19 +0300
Subject: [PATCH] [*] started replacing old single tun-fd logic with
 multithread tun from sapserver

---
 dap-sdk/net/core/dap_events.c                 |  21 ++-
 dap-sdk/net/core/dap_events_socket.c          |  13 +-
 dap-sdk/net/core/include/dap_events.h         |   9 +-
 modules/service/vpn/dap_chain_net_srv_vpn.c   | 129 +++++++++++++++++-
 .../vpn/include/dap_chain_net_srv_vpn.h       |  74 ++++++++++
 5 files changed, 235 insertions(+), 11 deletions(-)

diff --git a/dap-sdk/net/core/dap_events.c b/dap-sdk/net/core/dap_events.c
index 0228f5b783..6d28b36169 100644
--- a/dap-sdk/net/core/dap_events.c
+++ b/dap-sdk/net/core/dap_events.c
@@ -91,7 +91,7 @@ dap_thread_t *s_threads = NULL;
 
 #define LOG_TAG "dap_events"
 
-uint32_t s_get_cpu_count( )
+uint32_t dap_get_cpu_count( )
 {
 #ifdef _WIN32
   SYSTEM_INFO si;
@@ -123,7 +123,7 @@ uint32_t s_get_cpu_count( )
  */
 int32_t dap_events_init( uint32_t a_threads_count, size_t conn_timeout )
 {
-  s_threads_count = a_threads_count ? a_threads_count : s_get_cpu_count( );
+  s_threads_count = a_threads_count ? a_threads_count : dap_get_cpu_count( );
 
   if ( conn_timeout )
     s_connection_timeout = conn_timeout;
@@ -521,6 +521,11 @@ uint32_t dap_worker_get_index_min( )
   return min;
 }
 
+dap_worker_t * dap_worker_get_index(uint8_t a_index)
+{
+    return a_index < s_threads_count ? &s_workers[a_index] : NULL;
+}
+
 /**
  * @brief dap_worker_print_all
  */
@@ -584,12 +589,22 @@ int dap_events_wait( dap_events_t *sh )
   return 0;
 }
 
+/**
+ * @brief sap_worker_add_events_socket
+ * @param a_events_socket
+ * @param a_worker
+ */
+void dap_worker_add_events_socket(dap_events_socket_t * a_events_socket, dap_worker_t * a_worker)
+{
+    eventfd_write( a_worker->eventsfd_new, (eventfd_t) a_events_socket );
+}
+
 /**
  * @brief dap_worker_add_events_socket
  * @param a_worker
  * @param a_events_socket
  */
-void dap_worker_add_events_socket( dap_events_socket_t *a_es)
+void dap_worker_add_events_socket_auto( dap_events_socket_t *a_es)
 {
 //  struct epoll_event ev = {0};
   dap_worker_t *l_worker = dap_worker_get_min( );
diff --git a/dap-sdk/net/core/dap_events_socket.c b/dap-sdk/net/core/dap_events_socket.c
index c73c4d0039..d637a157e9 100644
--- a/dap-sdk/net/core/dap_events_socket.c
+++ b/dap-sdk/net/core/dap_events_socket.c
@@ -96,6 +96,17 @@ dap_events_socket_t *dap_events_socket_wrap_no_add( dap_events_t *a_events,
   return ret;
 }
 
+/**
+ * @brief dap_events_socket_assign_on_worker
+ * @param a_es
+ * @param a_worker
+ */
+void dap_events_socket_assign_on_worker(dap_events_socket_t * a_es, struct dap_worker * a_worker)
+{
+    a_es->last_ping_request = time(NULL);
+    dap_worker_add_events_socket(a_es,a_worker);
+}
+
 /**
  * @brief dap_events_socket_create_after
  * @param a_es
@@ -107,7 +118,7 @@ void dap_events_socket_create_after( dap_events_socket_t *a_es )
 
   a_es->last_time_active = a_es->last_ping_request = time( NULL );
 
-  dap_worker_add_events_socket( a_es );
+  dap_worker_add_events_socket_auto( a_es );
 
   pthread_mutex_lock( &a_es->dap_worker->locker_on_count );
 
diff --git a/dap-sdk/net/core/include/dap_events.h b/dap-sdk/net/core/include/dap_events.h
index e7b7777b10..c63b78bb53 100644
--- a/dap-sdk/net/core/include/dap_events.h
+++ b/dap-sdk/net/core/include/dap_events.h
@@ -29,6 +29,7 @@
 #include <stdint.h>
 #include <pthread.h>
 #include <stdatomic.h>
+#include <sys/eventfd.h>
 #define EPOLL_HANDLE  int
 #else
 #define EPOLL_HANDLE  HANDLE
@@ -64,8 +65,8 @@ typedef struct dap_events {
 typedef struct dap_worker
 {
   atomic_uint event_sockets_count;
-  //uint32_t event_to_kill_count;
 
+  int eventsfd_new; // Events fd for new socket
   EPOLL_HANDLE epoll_fd;
   uint32_t number_thread;
   pthread_mutex_t locker_on_count;
@@ -87,6 +88,10 @@ int32_t dap_events_wait( dap_events_t *sh );
 uint32_t dap_worker_get_index_min( );
 dap_worker_t *dap_worker_get_min( );
 
-void dap_worker_add_events_socket( dap_events_socket_t * a_events_socket );
+uint32_t dap_get_cpu_count( );
+dap_worker_t * dap_worker_get_index(uint8_t a_index);
+
+void dap_events_socket_assign_on_worker(dap_events_socket_t * a_es, struct dap_worker * a_worker);
+void dap_worker_add_events_socket_auto( dap_events_socket_t * a_events_socket );
 void dap_worker_print_all( );
 
diff --git a/modules/service/vpn/dap_chain_net_srv_vpn.c b/modules/service/vpn/dap_chain_net_srv_vpn.c
index 5f5e56c22b..b061a19f24 100644
--- a/modules/service/vpn/dap_chain_net_srv_vpn.c
+++ b/modules/service/vpn/dap_chain_net_srv_vpn.c
@@ -74,8 +74,6 @@
 
 #define LOG_TAG "dap_chain_net_srv_vpn"
 
-#define DAP_TUN_IN_WORKER
-
 #define SF_MAX_EVENTS 256
 
 typedef struct usage_client {
@@ -325,12 +323,133 @@ int dap_chain_net_srv_client_vpn_init(dap_config_t * g_config) {
     return 0;
 }
 
+
+ch_sf_tun_server_t * m_tun_server = NULL;
+
+int s_tun_deattach_queue(int fd)
+{
+    struct ifreq ifr;
+    memset(&ifr, 0, sizeof(ifr));
+    ifr.ifr_flags = IFF_DETACH_QUEUE;
+    return ioctl(fd, TUNSETQUEUE, (void *)&ifr);
+}
+
+
+/**
+ * @brief s_tun_es_create
+ * @param a_worker
+ * @return
+ */
+dap_events_socket_t * s_tun_es_create(dap_worker_t * a_worker, int a_tun_fd)
+{
+    assert(a_worker);
+    static dap_events_socket_callbacks_t l_s_callbacks = { 0 };
+//    l_s_callbacks.new_callback = s_es_tun_new;
+//    l_s_callbacks.read_callback = s_es_tun_read;
+//    l_s_callbacks.error_callback = s_es_tun_error;
+//    l_s_callbacks.delete_callback = s_es_tun_delete;
+
+    s_tun_deattach_queue(a_tun_fd);
+
+    dap_events_socket_t * l_es = dap_events_socket_wrap_no_add(a_worker->events ,
+                                          a_tun_fd, &l_s_callbacks);
+    l_es->type = DESCRIPTOR_TYPE_FILE;
+    dap_events_socket_assign_on_worker(l_es, a_worker);
+
+    return l_es;
+}
+
+int ch_sf_tun_create(dap_config_t * g_config)
+{
+    const char *c_addr = dap_config_get_item_str(g_config, "srv_vpn", "network_address");
+    const char *c_mask = dap_config_get_item_str(g_config, "srv_vpn", "network_mask");
+    if(!c_addr || !c_mask){
+        log_it(L_ERROR, "%s: error while reading network parameters from config (network_address and network_mask)", __PRETTY_FUNCTION__);
+        DAP_DELETE((void*)c_addr);
+        DAP_DELETE((void*)c_mask);
+        return -1;
+    }
+
+    inet_aton(c_addr, &m_tun_server->int_network );
+    inet_aton(c_mask, &m_tun_server->int_network_mask );
+    m_tun_server->int_network_addr.s_addr= (m_tun_server->int_network.s_addr | 0x01000000); // grow up some shit here!
+    m_tun_server->client_addr_last.s_addr = m_tun_server->int_network_addr.s_addr;
+    m_tun_server->announces_num=0;
+
+    memset(&m_tun_server->ifr, 0, sizeof(m_tun_server->ifr));
+    m_tun_server->ifr.ifr_flags = IFF_TUN | IFF_MULTI_QUEUE| IFF_NO_PI;
+
+    uint32_t l_cpu_count = dap_get_cpu_count(); // maybe replace with getting s_threads_count directly
+    log_it(L_NOTICE,"%s: trying to initialize multiqueue for %u workers", __PRETTY_FUNCTION__, l_cpu_count);
+
+    int err = -1;
+    for( uint8_t i =0; i< l_cpu_count; i++){
+        dap_worker_t * l_worker = dap_worker_get_index(i);
+        assert( l_worker );
+        int l_tun_fd;
+        if( (l_tun_fd = open("/dev/net/tun", O_RDWR| O_NONBLOCK)) < 0 ) {
+            log_it(L_ERROR,"Opening /dev/net/tun error: '%s'", strerror(errno));
+            err = -100;
+            break;
+        }
+        log_it(L_DEBUG,"Opening /dev/net/tun:%u", i);
+        if( (err = ioctl(l_tun_fd, TUNSETIFF, (void *)& m_tun_server->ifr)) < 0 ) {
+            log_it(L_CRITICAL, "ioctl(TUNSETIFF) error: '%s' ",strerror(errno));
+            close(l_tun_fd);
+            break;
+        }
+        s_tun_deattach_queue(l_tun_fd);
+        s_tun_es_create(l_worker, l_tun_fd);
+    }
+
+    if (! err ){
+        char buf[256];
+        log_it(L_NOTICE,"Bringed up %s virtual network interface (%s/%s)", m_tun_server->ifr.ifr_name,inet_ntoa(m_tun_server->int_network_addr),c_mask);
+        snprintf(buf,sizeof(buf),"ip link set %s up",m_tun_server->ifr.ifr_name);
+        system(buf);
+        snprintf(buf,sizeof(buf),"ip addr add %s/%s dev %s ",inet_ntoa(m_tun_server->int_network_addr),c_mask, m_tun_server->ifr.ifr_name );
+        system(buf);
+    }
+
+    return err;
+}
+
+/**
+* @brief ch_sf_tun_init
+* @return
+*/
+int ch_sf_tun_init()
+{
+    m_tun_server=DAP_NEW_Z(ch_sf_tun_server_t);
+    m_tun_server->peers_max = CH_SF_PEER_MAX;
+    m_tun_server->peers = DAP_NEW_Z_SIZE(ch_sf_peer_info_t, m_tun_server->peers_max);
+    pthread_rwlock_init(&m_tun_server->rwlock, NULL);
+
+    pthread_mutex_init(&m_tun_server->external_route_operations,NULL);
+    pthread_mutex_init(&m_tun_server->pkt_out_mutex,NULL);
+
+    return 0;
+}
+
+int dap_chain_net_srv_vpn_init(dap_config_t * g_config) {
+    ch_sf_tun_init();
+
+    log_it(L_DEBUG,"Initializing TUN driver...");
+    ch_sf_tun_create(g_config);
+    log_it(L_INFO,"TUN driver configured successfuly");
+
+    //TODO: initialize dap_chain_net_srv_vpn_t* here,
+    //take it from previous version of dap_chain_net_srv_vpn_init() (below)
+    return 0;
+}
+
 /**
  * @brief dap_stream_ch_vpn_init Init actions for VPN stream channel
  * @param vpn_addr Zero if only client mode. Address if the node shares its local VPN
  * @param vpn_mask Zero if only client mode. Mask if the node shares its local VPN
  * @return 0 if everything is okay, lesser then zero if errors
  */
+/*
 int dap_chain_net_srv_vpn_init(dap_config_t * g_config) {
     const char *c_addr = dap_config_get_item_str(g_config, "srv_vpn", "network_address");
     const char *c_mask = dap_config_get_item_str(g_config, "srv_vpn", "network_mask");
@@ -370,9 +489,8 @@ int dap_chain_net_srv_vpn_init(dap_config_t * g_config) {
 
         uint16_t l_pricelist_count = 0;
 
-        /* ! IMPORTANT ! This fetch is single-action and cannot be further reused, since it modifies the stored config data
-         * ! it also must NOT be freed within this module !
-         */
+        //! IMPORTANT ! This fetch is single-action and cannot be further reused, since it modifies the stored config data
+        //! it also must NOT be freed within this module !
         char **l_pricelist = dap_config_get_array_str(g_config, "srv_vpn", "pricelist", &l_pricelist_count); // must not be freed!
         for (uint16_t i = 0; i < l_pricelist_count; i++) {
             dap_chain_net_srv_price_t *l_price = DAP_NEW_Z(dap_chain_net_srv_price_t);
@@ -450,6 +568,7 @@ int dap_chain_net_srv_vpn_init(dap_config_t * g_config) {
     }
     return -1;
 }
+*/
 
 /**
  * @brief ch_sf_deinit
diff --git a/modules/service/vpn/include/dap_chain_net_srv_vpn.h b/modules/service/vpn/include/dap_chain_net_srv_vpn.h
index b7896676ff..f01ed33771 100644
--- a/modules/service/vpn/include/dap_chain_net_srv_vpn.h
+++ b/modules/service/vpn/include/dap_chain_net_srv_vpn.h
@@ -26,10 +26,13 @@
 #pragma once
 #ifdef DAP_OS_UNIX
 #include <netinet/in.h>
+#include <linux/if.h>
+#include <linux/if_tun.h>
 #endif
 
 #include "dap_config.h"
 #include "dap_chain_net_srv.h"
+#include "dap_events.h"
 
 
 #define DAP_STREAM_CH_PKT_TYPE_NET_SRV_VPN_CLIENT    0x01
@@ -153,6 +156,77 @@ typedef struct dap_chain_net_srv_vpn
     dap_chain_net_srv_t * parent;
 } dap_chain_net_srv_vpn_t;
 
+
+#define CH_SF_PEER_MAX 20
+
+
+typedef struct ch_sf_peer_info {
+    in_addr_t 	addr;
+    in_addr_t 	netmask;
+    in_addr_t 	netaddr;
+    in_addr_t 	gw;
+    dap_stream_ch_t * ch;
+    uint64_t 	bytes_sent;
+    uint64_t 	bytes_recieved;
+    uint16_t	metric;					// route metric
+    uint16_t	ttl;					// route ttl, =0 means undead
+    bool		from_uplink;			// this route is received from uplink
+    bool		in_use;
+    bool		active;
+} ch_sf_peer_info_t;
+
+typedef  struct ch_sf_tun_socket ch_sf_tun_socket_t;
+
+typedef struct ch_sf_tun_client{
+    in_addr_t addr;
+    dap_stream_ch_t * ch;  /* TODO: check if it's same as sap_stream_ch_t */
+    ch_sf_tun_socket_t * tun_socket;
+
+    uint64_t bytes_sent;
+    uint64_t bytes_recieved;
+
+     UT_hash_handle hh;
+} ch_sf_tun_client_t;
+
+typedef struct ch_sf_tun_socket {
+    uint8_t worker_id;
+    dap_worker_t * worker; /* TODO: check if it's same as sap_worker_t (it's not) */
+    dap_events_socket_t * es;
+
+    ch_sf_tun_client_t * clients; // Remote clients identified by destination address
+
+    UT_hash_handle hh;
+}ch_sf_tun_socket_t;
+#define CH_SF_TUN_SOCKET(a) ((ch_sf_tun_socket_t*) a->_inheritor )
+
+struct ch_sf_tun_server{
+    struct in_addr client_addr_last;
+    struct in_addr int_network_mask;
+    struct in_addr int_network_addr;
+    struct in_addr announces_mask[10];
+    struct in_addr announces_addr[10];
+    struct in_addr int_network;
+    int 	tun_ctl_fd;
+    uint	announces_num;
+    struct ifreq ifr;
+    ch_sf_tun_client_t * clients; // Remote clients identified by destination address
+    ch_sf_peer_info_t *peers;
+    size_t peers_count;
+    size_t peers_max;
+
+    ch_vpn_pkt_t * pkt_out[400]; /* TODO: it differs from ch_sf_pkt with padding */
+    size_t pkt_out_size;
+    size_t pkt_out_rindex;
+    size_t pkt_out_windex;
+    pthread_mutex_t pkt_out_mutex;
+
+    pthread_mutex_t external_route_operations;
+
+    pthread_rwlock_t rwlock;
+};
+typedef struct ch_sf_tun_server ch_sf_tun_server_t;
+extern ch_sf_tun_server_t * m_tun_server;
+
 #define CH_VPN(a) ((dap_chain_net_srv_ch_vpn_t *) ((a)->internal) )
 
 bool is_dap_tun_in_worker(void);
-- 
GitLab