From ebe689067a68d8a13a798f033e39037185cd1b49 Mon Sep 17 00:00:00 2001
From: "Dmitriy A. Gerasimov" <dmitriy.gerasimov@demlabs.net>
Date: Tue, 15 Dec 2020 16:28:05 +0700
Subject: [PATCH] [!] FlowControl is switched off by default because
 reassigment is broken

---
 CMakeLists.txt                              |  2 +-
 dap-sdk/net/core/dap_events_socket.c        | 23 +++++++++++++----
 modules/service/vpn/dap_chain_net_srv_vpn.c | 28 ++++++++++++---------
 3 files changed, 35 insertions(+), 18 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 81616169ba..ac00319ed8 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -2,7 +2,7 @@ project(cellframe-sdk C)
 cmake_minimum_required(VERSION 2.8)
 
 set(CMAKE_C_STANDARD 11)
-set(CELLFRAME_SDK_NATIVE_VERSION "2.6-81")
+set(CELLFRAME_SDK_NATIVE_VERSION "2.6-82")
 add_definitions ("-DCELLFRAME_SDK_VERSION=\"${CELLFRAME_SDK_NATIVE_VERSION}\"")
 set(DAPSDK_MODULES "")
 
diff --git a/dap-sdk/net/core/dap_events_socket.c b/dap-sdk/net/core/dap_events_socket.c
index 338c505608..9ea9132341 100644
--- a/dap-sdk/net/core/dap_events_socket.c
+++ b/dap-sdk/net/core/dap_events_socket.c
@@ -194,18 +194,31 @@ void dap_events_socket_assign_on_worker_inter(dap_events_socket_t * a_es_input,
 
 }
 
-
+/**
+ * @brief dap_events_socket_reassign_between_workers_unsafe
+ * @param a_es
+ * @param a_worker_new
+ */
 void dap_events_socket_reassign_between_workers_unsafe(dap_events_socket_t * a_es, dap_worker_t * a_worker_new)
 {
-    log_it(L_DEBUG, "reassign between workers");
-    dap_events_socket_remove_from_worker_unsafe( a_es, a_es->worker );
+    dap_worker_t * l_worker = a_es->worker;
+    dap_events_socket_t * l_queue_input= l_worker->queue_es_new_input[a_worker_new->id];
+    log_it(L_DEBUG, "Reassign between %u->%u workers: %p (%d)  ", l_worker->id, a_worker_new->id, a_es, a_es->fd );
+
+    dap_events_socket_remove_from_worker_unsafe( a_es, l_worker );
     a_es->was_reassigned = true;
     if (a_es->callbacks.worker_unassign_callback)
-        a_es->callbacks.worker_unassign_callback(a_es, a_es->worker);
+        a_es->callbacks.worker_unassign_callback(a_es, l_worker);
 
-    dap_worker_add_events_socket_unsafe(a_es, a_worker_new);
+    dap_worker_add_events_socket_inter( l_queue_input,  a_es);
 }
 
+/**
+ * @brief dap_events_socket_reassign_between_workers_mt
+ * @param a_worker_old
+ * @param a_es
+ * @param a_worker_new
+ */
 void dap_events_socket_reassign_between_workers_mt(dap_worker_t * a_worker_old, dap_events_socket_t * a_es, dap_worker_t * a_worker_new)
 {
     dap_worker_msg_reassign_t * l_msg = DAP_NEW_Z(dap_worker_msg_reassign_t);
diff --git a/modules/service/vpn/dap_chain_net_srv_vpn.c b/modules/service/vpn/dap_chain_net_srv_vpn.c
index f58b4f7264..5b8646345e 100644
--- a/modules/service/vpn/dap_chain_net_srv_vpn.c
+++ b/modules/service/vpn/dap_chain_net_srv_vpn.c
@@ -102,13 +102,13 @@ typedef struct vpn_local_network {
 } vpn_local_network_t;
 
 // Message for QUEUE_PTR operations
-struct tun_socket_msg{
+typedef struct tun_socket_msg{
     enum{
+        TUN_SOCKET_MSG_NONE,
         TUN_SOCKET_MSG_IP_ASSIGNED,
         TUN_SOCKET_MSG_IP_UNASSIGNED,
         TUN_SOCKET_MSG_CH_VPN_SEND,
         TUN_SOCKET_MSG_ESOCKET_REASSIGNED,
-        TUN_SOCKET_MSG_NONE
     } type;
     dap_chain_net_srv_ch_vpn_t * ch_vpn;
     dap_events_socket_t * esocket;
@@ -131,7 +131,7 @@ struct tun_socket_msg{
             ch_vpn_pkt_t * pkt;
         } ch_vpn_send;
     };
-};
+} tun_socket_msg_t;
 
 
 dap_chain_net_srv_vpn_tun_socket_t ** s_tun_sockets = NULL;
@@ -290,10 +290,10 @@ static void s_tun_recv_msg_callback(dap_events_socket_t * a_esocket_queue, void
                 l_info->queue_msg = s_tun_sockets_queue_msg[l_msg->esocket_reassigment.worker_id];
                 l_info->is_reassigned_once = true;
                 l_info->is_on_this_worker =(a_esocket_queue->worker->id == l_msg->esocket_reassigment.worker_id);
-                if(s_debug_more){
+                if(dap_log_level_get() <= L_INFO){
                     char l_addrbuf[INET_ADDRSTRLEN]= { [0]='\0'};
                     inet_ntop(AF_INET,&l_msg->esocket_reassigment.addr, l_addrbuf, sizeof (l_addrbuf));
-                    log_it(L_DEBUG, "Tun:%u message: addr %s reassign on worker #%u",a_esocket_queue->worker->id,
+                    log_it(L_INFO, "Tun:%u message: addr %s reassign on worker #%u",a_esocket_queue->worker->id,
                            l_addrbuf, l_msg->esocket_reassigment.worker_id);
                 }
             }else{
@@ -454,14 +454,15 @@ static void s_tun_send_msg_esocket_reasigned_inter(dap_chain_net_srv_vpn_tun_soc
     struct tun_socket_msg * l_msg = DAP_NEW_Z(struct tun_socket_msg);
     l_msg->type = TUN_SOCKET_MSG_ESOCKET_REASSIGNED ;
     l_msg->ch_vpn = a_ch_vpn;
-    l_msg->ip_unassigment.addr = a_addr;
-    l_msg->ip_unassigment.worker_id = a_esocket_worker_id;
+    l_msg->esocket_reassigment.addr = a_addr;
+    l_msg->esocket_reassigment.worker_id = a_esocket_worker_id;
     l_msg->esocket = a_esocket;
     l_msg->is_reassigned_once = true;
 
     if (dap_events_socket_queue_ptr_send_to_input(a_tun_socket->queue_tun_msg_input[a_esocket_worker_id] , l_msg) != 0){
         log_it(L_WARNING, "Cant send esocket reassigment message to the tun msg queue #%u", a_tun_socket->worker_id );
-    }
+    }else
+        log_it(L_DEBUG,"Sent reassign message to tun:%u", a_esocket_worker_id);
 }
 
 /**
@@ -702,8 +703,8 @@ static int s_vpn_tun_create(dap_config_t * g_config)
     // Fill inter tun qyueue
     // Create for all previous created sockets the input queue
     for (size_t n=0; n< s_tun_sockets_count; n++){
-        dap_worker_t * l_worker = dap_events_worker_get(n);
         dap_chain_net_srv_vpn_tun_socket_t * l_tun_socket = s_tun_sockets[n];
+        dap_worker_t * l_worker = dap_events_worker_get(n);
         for (size_t k=0; k< s_tun_sockets_count; k++){
             dap_events_socket_t * l_queue_msg_input = dap_events_socket_queue_ptr_create_input( s_tun_sockets_queue_msg[n] );
             l_tun_socket->queue_tun_msg_input[k] = l_queue_msg_input;
@@ -850,6 +851,7 @@ int dap_chain_net_srv_vpn_init(dap_config_t * g_config) {
         log_it(L_CRITICAL, "Error initializing TUN device driver!");
         return -1;
     }
+
     log_it(L_INFO,"TUN driver configured successfuly");
     s_vpn_service_create(g_config);
     dap_stream_ch_proc_add(DAP_STREAM_CH_ID_NET_SRV_VPN, s_ch_vpn_new, s_ch_vpn_delete, s_ch_packet_in,
@@ -1561,8 +1563,10 @@ static void s_es_tun_read(dap_events_socket_t * a_es, void * arg)
     if (s_debug_more){
         char l_str_daddr[INET_ADDRSTRLEN]={[0]='\0'};
         char l_str_saddr[INET_ADDRSTRLEN]={[0]='\0'};
-        inet_ntop(AF_INET,&iph->daddr,l_str_daddr,sizeof (iph->daddr));
-        inet_ntop(AF_INET,&iph->saddr,l_str_saddr,sizeof (iph->saddr));
+        struct in_addr l_daddr={ .s_addr = iph->daddr};
+        struct in_addr l_saddr={ .s_addr = iph->saddr};
+        inet_ntop(AF_INET,&l_daddr,l_str_daddr,sizeof (iph->daddr));
+        inet_ntop(AF_INET,&l_saddr,l_str_saddr,sizeof (iph->saddr));
         log_it(L_DEBUG,"m_es_tun_read() received ip packet %s->%s tot_len: %u ",
                l_str_saddr, l_str_saddr, iph->tot_len);
     }
@@ -1579,7 +1583,7 @@ static void s_es_tun_read(dap_events_socket_t * a_es, void * arg)
         }
         // We found in local table, sending data (if possible)
         if (l_vpn_info){
-            if ( !l_vpn_info->is_on_this_worker && !l_vpn_info->is_reassigned_once ){
+            if ( !l_vpn_info->is_on_this_worker && !l_vpn_info->is_reassigned_once && s_raw_server->auto_cpu_reassignment ){
                 log_it(L_NOTICE, "Reassigning from worker %u to %u", l_vpn_info->worker->id, a_es->worker->id);
                 l_vpn_info->is_reassigned_once = true;
                 s_tun_send_msg_esocket_reasigned_all_inter(l_vpn_info->ch_vpn, l_vpn_info->esocket, l_vpn_info->addr_ipv4,a_es->worker->id);
-- 
GitLab