From 127189f74787ef8cdc2f70e89f0e59dd3f6dbdf2 Mon Sep 17 00:00:00 2001
From: Dmitriy Gerasimov <naeper@demlabs.net>
Date: Wed, 21 Jul 2021 12:58:21 +0700
Subject: [PATCH] [*] Fixed memory leaks

---
 dap-sdk/net/client/dap_client.c               |  2 +
 dap-sdk/net/client/dap_client_pvt.c           | 46 +++++++++++--------
 dap-sdk/net/core/dap_events.c                 |  4 +-
 dap-sdk/net/core/dap_events_socket.c          | 43 ++++++++++++-----
 dap-sdk/net/core/dap_worker.c                 |  2 +-
 .../server/notify_server/src/dap_notify_srv.c |  2 +-
 dap-sdk/net/stream/ch/dap_stream_ch_pkt.c     | 22 ++++++++-
 .../net/stream/ch/include/dap_stream_ch_pkt.h |  1 +
 dap-sdk/net/stream/stream/dap_stream.c        | 21 +++++----
 .../dap_stream_ch_chain_net_srv.c             |  3 +-
 .../srv/include/dap_chain_net_srv_common.h    |  1 +
 11 files changed, 102 insertions(+), 45 deletions(-)

diff --git a/dap-sdk/net/client/dap_client.c b/dap-sdk/net/client/dap_client.c
index 1bf65abff9..cfdcbfcb08 100644
--- a/dap-sdk/net/client/dap_client.c
+++ b/dap-sdk/net/client/dap_client.c
@@ -605,6 +605,7 @@ const char * dap_client_get_stream_id(dap_client_t * a_client)
  */
 bool dap_client_get_is_always_reconnect(dap_client_t * a_client)
 {
+    assert(a_client);
     return DAP_CLIENT_PVT(a_client)->is_always_reconnect;
 }
 
@@ -615,6 +616,7 @@ bool dap_client_get_is_always_reconnect(dap_client_t * a_client)
  */
 void dap_client_set_is_always_reconnect(dap_client_t * a_client, bool a_value)
 {
+    assert(a_client);
     DAP_CLIENT_PVT(a_client)->is_always_reconnect = a_value;
 }
 
diff --git a/dap-sdk/net/client/dap_client_pvt.c b/dap-sdk/net/client/dap_client_pvt.c
index b33422490e..12f49aeb68 100644
--- a/dap-sdk/net/client/dap_client_pvt.c
+++ b/dap-sdk/net/client/dap_client_pvt.c
@@ -977,24 +977,33 @@ static void s_enc_init_response(dap_client_t * a_client, void * a_response, size
         //char l_session_id_b64[DAP_ENC_BASE64_ENCODE_SIZE(DAP_ENC_KS_KEY_ID_SIZE) + 1] = { 0 };
         //char *l_bob_message_b64 = DAP_NEW_Z_SIZE(char, a_response_size - sizeof(l_session_id_b64) + 1);
         if(json_parse_count >= 2 && json_parse_count <=3) { //if (sscanf (a_response,"%s %s",l_session_id_b64, l_bob_message_b64) == 2 ){
-            l_client_pvt->session_key_id = DAP_NEW_Z_SIZE(char, strlen(l_session_id_b64) + 1);
-            dap_enc_base64_decode(l_session_id_b64, strlen(l_session_id_b64),
-                    l_client_pvt->session_key_id, DAP_ENC_DATA_TYPE_B64);
-            log_it(L_DEBUG, "ENC: session Key ID %s", l_client_pvt->session_key_id);
-
-            char *l_bob_message = DAP_NEW_Z_SIZE(char, strlen(l_bob_message_b64) + 1);
-            size_t l_bob_message_size = dap_enc_base64_decode(l_bob_message_b64, strlen(l_bob_message_b64),
-                    l_bob_message, DAP_ENC_DATA_TYPE_B64);
-            l_client_pvt->session_key_open->gen_alice_shared_key(
-                    l_client_pvt->session_key_open, l_client_pvt->session_key_open->priv_key_data,
-                    l_bob_message_size, (unsigned char*) l_bob_message);
-
-            l_client_pvt->session_key = dap_enc_key_new_generate(l_client_pvt->session_key_type,
-                    l_client_pvt->session_key_open->priv_key_data, // shared key
-                    l_client_pvt->session_key_open->priv_key_data_size,
-                    l_client_pvt->session_key_id, strlen(l_client_pvt->session_key_id), l_client_pvt->session_key_block_size);
-
-            DAP_DELETE(l_bob_message);
+            if(!l_session_id_b64){
+                log_it(L_WARNING,"ENC: no session id in base64");
+            }
+            if(!l_bob_message_b64){
+                log_it(L_WARNING,"ENC: no bob message in base64");
+            }
+            if( l_bob_message_b64 && l_session_id_b64){
+                l_client_pvt->session_key_id = DAP_NEW_Z_SIZE(char, strlen(l_session_id_b64) + 1);
+                dap_enc_base64_decode(l_session_id_b64, strlen(l_session_id_b64),
+                        l_client_pvt->session_key_id, DAP_ENC_DATA_TYPE_B64);
+                log_it(L_DEBUG, "ENC: session Key ID %s", l_client_pvt->session_key_id);
+
+                char *l_bob_message = DAP_NEW_Z_SIZE(char, strlen(l_bob_message_b64) + 1);
+                size_t l_bob_message_size = dap_enc_base64_decode(l_bob_message_b64, strlen(l_bob_message_b64),
+                        l_bob_message, DAP_ENC_DATA_TYPE_B64);
+                l_client_pvt->session_key_open->gen_alice_shared_key(
+                        l_client_pvt->session_key_open, l_client_pvt->session_key_open->priv_key_data,
+                        l_bob_message_size, (unsigned char*) l_bob_message);
+
+                l_client_pvt->session_key = dap_enc_key_new_generate(l_client_pvt->session_key_type,
+                        l_client_pvt->session_key_open->priv_key_data, // shared key
+                        l_client_pvt->session_key_open->priv_key_data_size,
+                        l_client_pvt->session_key_id, strlen(l_client_pvt->session_key_id), l_client_pvt->session_key_block_size);
+
+                DAP_DELETE(l_bob_message);
+            }
+
             if(l_client_pvt->stage == STAGE_ENC_INIT) { // We are in proper stage
                 l_client_pvt->stage_status = STAGE_STATUS_DONE;
                 s_stage_status_after(l_client_pvt);
@@ -1126,6 +1135,7 @@ static void s_stream_ctl_response(dap_client_t * a_client, void * a_data, size_t
         }
         DAP_DELETE(l_stream_key);
     }
+    DAP_DELETE(l_response_str);
 }
 
 /**
diff --git a/dap-sdk/net/core/dap_events.c b/dap-sdk/net/core/dap_events.c
index 83cd14845f..ac6ce8d2ef 100644
--- a/dap-sdk/net/core/dap_events.c
+++ b/dap-sdk/net/core/dap_events.c
@@ -160,9 +160,9 @@ void dap_cpu_assign_thread_on(uint32_t a_cpu_id)
     int l_retcode;
 #ifdef DAP_OS_DARWIN
     thread_affinity_policy_data_t l_policy_data={.affinity_tag = a_cpu_id};
-    thread_policy_set(l_pthread_mach_port , THREAD_AFFINITY_POLICY, (thread_policy_t)&l_policy_data , 1);
+    l_retcode = thread_policy_set(l_pthread_mach_port , THREAD_AFFINITY_POLICY, (thread_policy_t)&l_policy_data , 1);
 #elif defined(DAP_OS_ANDROID)
-    err = sched_setaffinity(pthread_self(), sizeof(cpu_set_t), &mask);
+    l_retcode = sched_setaffinity(pthread_self(), sizeof(cpu_set_t), &mask);
 #else
     l_retcode = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &mask);
 #endif
diff --git a/dap-sdk/net/core/dap_events_socket.c b/dap-sdk/net/core/dap_events_socket.c
index 90ccaa46a0..c234fe38e7 100644
--- a/dap-sdk/net/core/dap_events_socket.c
+++ b/dap-sdk/net/core/dap_events_socket.c
@@ -256,7 +256,10 @@ void dap_events_socket_reassign_between_workers_mt(dap_worker_t * a_worker_old,
     l_msg->esocket = a_es;
     l_msg->esocket_uuid = a_es->uuid;
     l_msg->worker_new = a_worker_new;
-    dap_events_socket_queue_ptr_send(a_worker_old->queue_es_reassign, l_msg);
+    if( dap_events_socket_queue_ptr_send(a_worker_old->queue_es_reassign, l_msg) != 0 ){
+        log_it(L_ERROR,"Haven't sent reassign message with esocket %d", a_es?a_es->socket:-1);
+        DAP_DELETE(l_msg);
+    }
 }
 
 /**
@@ -1180,6 +1183,11 @@ int dap_events_socket_queue_ptr_send_to_input(dap_events_socket_t * a_es_input,
         assert(l_es);
 
         dap_events_socket_w_data_t * l_es_w_data = DAP_NEW_Z(dap_events_socket_w_data_t);
+        if(!l_es_w_data){
+            log_it(L_CRITICAL, "Can't allocate, out of memory");
+            return -1024;
+        }
+
         l_es_w_data->esocket = l_es;
         l_es_w_data->ptr = a_arg;
         EV_SET(&l_event,a_es_input->socket+arc4random()  , EVFILT_USER,EV_ADD | EV_CLEAR | EV_ONESHOT, NOTE_FFCOPY | NOTE_TRIGGER ,0, l_es_w_data);
@@ -1189,7 +1197,13 @@ int dap_events_socket_queue_ptr_send_to_input(dap_events_socket_t * a_es_input,
             l_ret=kevent(l_es->proc_thread->kqueue_fd,&l_event,1,NULL,0,NULL);
         else
             l_ret=-100;
-        return l_ret==0? 0 : -1;
+        if(l_ret ==0 ){
+            return 0;
+        }else{
+            log_it(L_ERROR,"Can't send message in queue, code %d", l_ret);
+            DAP_DELETE(l_es_w_data);
+            return l_ret;
+        }
     }else{
         log_it(L_ERROR,"No pipe_out pointer for queue socket, possible created wrong");
         return -2;
@@ -1215,7 +1229,7 @@ int dap_events_socket_queue_ptr_send_to_input(dap_events_socket_t * a_es_input,
  */
 int dap_events_socket_queue_ptr_send( dap_events_socket_t * a_es, void* a_arg)
 {
-    int l_ret;
+    int l_ret=-1024;
     int l_errno;
     if (s_debug_reactor)
         log_it(L_DEBUG,"Sent ptr %p to esocket queue %p (%d)", a_arg, a_es, a_es? a_es->fd : -1);
@@ -1294,6 +1308,7 @@ int dap_events_socket_queue_ptr_send( dap_events_socket_t * a_es, void* a_arg)
         else {
             log_it(L_WARNING,"Trying to send pointer in pipe out queue thats not assigned to any worker or proc thread");
             l_n = 0;
+            DAP_DELETE(l_es_w_data);
         }
     }else if(a_es->worker){
         l_n = kevent(a_es->worker->kqueue_fd,&l_event,1,NULL,0,NULL);
@@ -1304,6 +1319,7 @@ int dap_events_socket_queue_ptr_send( dap_events_socket_t * a_es, void* a_arg)
     }else {
         log_it(L_WARNING,"Trying to send pointer in queue thats not assigned to any worker or proc thread");
         l_n = 0;
+        DAP_DELETE(l_es_w_data);
     }
 
     if(l_n == 0 ){
@@ -1312,23 +1328,24 @@ int dap_events_socket_queue_ptr_send( dap_events_socket_t * a_es, void* a_arg)
         l_errno = errno;
         log_it(L_ERROR,"Sending kevent error code %d", l_n);
         l_ret = -1;
-        DAP_DELETE(l_es_w_data);
     }
     
 #else
 #error "Not implemented dap_events_socket_queue_ptr_send() for this platform"
 #endif
-    if (l_ret == sizeof(a_arg) )
-        return  0;
-    else{
+    if (l_ret == sizeof(a_arg) ){
+        return 0;
+    }else{
         // Try again
         if(l_errno == EAGAIN || l_errno == EWOULDBLOCK ){
             add_ptr_to_buf(a_es, a_arg);
             return 0;
+        }else {
+            char l_errbuf[128];
+            log_it(L_ERROR, "Can't send ptr to queue:\"%s\" code %d", strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)), l_errno);
+            DAP_DELETE(l_es_w_data);
+            return l_errno;
         }
-        char l_errbuf[128];
-        log_it(L_ERROR, "Can't send ptr to queue:\"%s\" code %d", strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)), l_errno);
-        return l_errno;
     }
 }
 
@@ -1384,6 +1401,10 @@ int dap_events_socket_event_signal( dap_events_socket_t * a_es, uint64_t a_value
     else
         l_n = -1;
 
+    if(l_n != 0){
+        log_it(L_ERROR,"Haven't sent pointer in pipe out queue, code %d", l_n);
+        DAP_DELETE(l_es_w_data);
+    }
     return l_n;
 #else
 #error "Not implemented dap_events_socket_event_signal() for this platform"
@@ -1912,7 +1933,7 @@ void dap_events_socket_remove_and_delete_mt(dap_worker_t * a_w,  dap_events_sock
        l_es_handler->uuid = a_es->uuid;
 
     if(dap_events_socket_queue_ptr_send( a_w->queue_es_delete, l_es_handler ) != 0 ){
-        log_it(L_ERROR,"Can't send %d fd in queue", a_es->fd);
+        log_it(L_ERROR,"Can't send %d fd in queue", a_es? a_es->fd : -1);
         DAP_DELETE(l_es_handler);
     }
 }
diff --git a/dap-sdk/net/core/dap_worker.c b/dap-sdk/net/core/dap_worker.c
index fb0c8c8f41..2fd8894b24 100644
--- a/dap-sdk/net/core/dap_worker.c
+++ b/dap-sdk/net/core/dap_worker.c
@@ -979,7 +979,7 @@ static void s_queue_delete_es_callback( dap_events_socket_t * a_es, void * a_arg
     if (dap_events_socket_check_uuid_unsafe (a_es->worker,l_esocket, l_es_handler->uuid)){
         l_esocket->flags |= DAP_SOCK_SIGNAL_CLOSE; // Send signal to socket to kill
     }else
-        log_it(L_INFO, "While we were sending the delete() message, esocket %p has been disconnected", l_esocket);
+        log_it(L_INFO, "While we were sending the delete() message, esocket %p has been disconnected ", l_esocket);
     DAP_DELETE(l_es_handler);
 }
 
diff --git a/dap-sdk/net/server/notify_server/src/dap_notify_srv.c b/dap-sdk/net/server/notify_server/src/dap_notify_srv.c
index dc963b3c96..c42380a56e 100644
--- a/dap-sdk/net/server/notify_server/src/dap_notify_srv.c
+++ b/dap-sdk/net/server/notify_server/src/dap_notify_srv.c
@@ -172,9 +172,9 @@ static void s_notify_server_callback_queue(dap_events_socket_t * a_es, void * a_
             dap_events_socket_write_inter(a_es->worker->queue_es_io_input[l_worker_id],l_socket_handler->esocket,
                                       a_arg,l_str_len+1);
         }
-        DAP_DELETE(a_arg);
     }
     pthread_rwlock_unlock(&s_notify_server_clients_mutex);
+    DAP_DELETE(a_arg);
 }
 
 /**
diff --git a/dap-sdk/net/stream/ch/dap_stream_ch_pkt.c b/dap-sdk/net/stream/ch/dap_stream_ch_pkt.c
index c060766e5a..11631fba3e 100644
--- a/dap-sdk/net/stream/ch/dap_stream_ch_pkt.c
+++ b/dap-sdk/net/stream/ch/dap_stream_ch_pkt.c
@@ -222,7 +222,27 @@ bool dap_stream_ch_check_unsafe(dap_stream_worker_t * a_worker,dap_stream_ch_t *
         return false;
 }
 
-
+/**
+ * @brief dap_stream_ch_check_uuid_unsafe
+ * @param a_worker
+ * @param a_ch
+ * @param a_uuid
+ * @return
+ */
+bool dap_stream_ch_check_uuid_unsafe(dap_stream_worker_t * a_worker,dap_stream_ch_t * a_ch, uint128_t a_uuid)
+{
+    if (a_ch){
+        if ( a_worker->channels){
+            dap_stream_ch_t * l_ch = NULL;
+            pthread_rwlock_rdlock(&a_worker->channels_rwlock);
+            HASH_FIND(hh_worker,a_worker->channels ,&a_ch, sizeof(a_ch), l_ch );
+            pthread_rwlock_unlock(&a_worker->channels_rwlock);
+            return l_ch == a_ch && dap_uint128_check_equal(l_ch->uuid,a_uuid);
+        }else
+            return false;
+    }else
+        return false;
+}
 
 /**
  * @brief stream_ch_pkt_write
diff --git a/dap-sdk/net/stream/ch/include/dap_stream_ch_pkt.h b/dap-sdk/net/stream/ch/include/dap_stream_ch_pkt.h
index b561c723c4..505285e904 100644
--- a/dap-sdk/net/stream/ch/include/dap_stream_ch_pkt.h
+++ b/dap-sdk/net/stream/ch/include/dap_stream_ch_pkt.h
@@ -55,6 +55,7 @@ size_t dap_stream_ch_pkt_write_f_unsafe(struct dap_stream_ch * a_ch, uint8_t a_t
 size_t dap_stream_ch_pkt_write_unsafe(struct dap_stream_ch * a_ch,  uint8_t a_type, const void * a_data, size_t a_data_size);
 
 bool dap_stream_ch_check_unsafe(dap_stream_worker_t * a_worker,dap_stream_ch_t * a_ch);
+bool dap_stream_ch_check_uuid_unsafe(dap_stream_worker_t * a_worker,dap_stream_ch_t * a_ch, uint128_t a_uuid);
 
 size_t dap_stream_ch_pkt_write_f_mt(dap_stream_worker_t * a_worker , dap_stream_ch_t *a_ch, uint8_t a_type, const char * a_str,...);
 size_t dap_stream_ch_pkt_write_mt(dap_stream_worker_t * a_worker , dap_stream_ch_t *a_ch,  uint8_t a_type, const void * a_data, size_t a_data_size);
diff --git a/dap-sdk/net/stream/stream/dap_stream.c b/dap-sdk/net/stream/stream/dap_stream.c
index 4fcff3a0f3..c418c1883f 100644
--- a/dap-sdk/net/stream/stream/dap_stream.c
+++ b/dap-sdk/net/stream/stream/dap_stream.c
@@ -37,6 +37,7 @@
 
 #include "dap_common.h"
 #include "dap_timerfd.h"
+#include "dap_events.h"
 
 #include "dap_stream.h"
 #include "dap_stream_pkt.h"
@@ -789,15 +790,15 @@ static bool s_detect_loose_packet(dap_stream_t * a_stream)
 
 static bool s_keepalive_cb( void )
 {
-  dap_stream_t  *l_stream, *tmp;
-  pthread_mutex_lock( &s_mutex_keepalive_list );
-  stream_pkt_hdr_t l_pkt = {0};
-  l_pkt.type = STREAM_PKT_TYPE_KEEPALIVE;
-  memcpy(l_pkt.sig, c_dap_stream_sig, sizeof(l_pkt.sig));
-  DL_FOREACH_SAFE( s_stream_keepalive_list, l_stream, tmp ) {
-      dap_events_socket_write_mt(l_stream->stream_worker->worker, l_stream->esocket, &l_pkt, sizeof(l_pkt));
-  }
-  pthread_mutex_unlock( &s_mutex_keepalive_list );
-  return true;
+    dap_stream_t  *l_stream, *tmp;
+    pthread_mutex_lock( &s_mutex_keepalive_list );
+    stream_pkt_hdr_t l_pkt = {0};
+    l_pkt.type = STREAM_PKT_TYPE_KEEPALIVE;
+    memcpy(l_pkt.sig, c_dap_stream_sig, sizeof(l_pkt.sig));
+    DL_FOREACH_SAFE( s_stream_keepalive_list, l_stream, tmp ) {
+      dap_events_socket_write_inter(  l_stream->stream_worker->worker, l_stream->esocket, &l_pkt, sizeof(l_pkt));
+    }
+    pthread_mutex_unlock( &s_mutex_keepalive_list );
+    return true;
 }
 
diff --git a/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.c b/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.c
index 6a45d7919a..f191e1bd15 100644
--- a/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.c
+++ b/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.c
@@ -146,7 +146,7 @@ static bool s_grace_period_control(dap_chain_net_srv_grace_t *a_grace)
     dap_chain_net_srv_t * l_srv = NULL;
     dap_stream_ch_t *l_ch = a_grace->ch;
 
-    if (!dap_stream_ch_check_unsafe(a_grace->stream_worker, l_ch))
+    if (!dap_stream_ch_check_uuid_unsafe(a_grace->stream_worker, l_ch, a_grace->ch_uuid))
         goto free_exit;
 
     dap_chain_net_srv_stream_session_t *l_srv_session = l_ch && l_ch->stream && l_ch->stream->session ?
@@ -438,6 +438,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch , void* a_arg)
                 memcpy(l_grace->request, l_ch_pkt->data, l_ch_pkt->hdr.size);
                 l_grace->request_size = l_ch_pkt->hdr.size;
                 l_grace->ch = a_ch;
+                l_grace->ch_uuid = a_ch->uuid;
                 l_grace->stream_worker = a_ch->stream_worker;
                 s_grace_period_control(l_grace);
             } break;
diff --git a/modules/net/srv/include/dap_chain_net_srv_common.h b/modules/net/srv/include/dap_chain_net_srv_common.h
index 69238b2983..eb12f9c6fd 100755
--- a/modules/net/srv/include/dap_chain_net_srv_common.h
+++ b/modules/net/srv/include/dap_chain_net_srv_common.h
@@ -186,6 +186,7 @@ typedef struct dap_chain_net_srv_usage dap_chain_net_srv_usage_t;
 typedef struct dap_chain_net_srv_grace {
     dap_stream_worker_t *stream_worker;
     dap_stream_ch_t *ch;
+    uint128_t ch_uuid;
     dap_chain_net_srv_usage_t *usage;
     dap_stream_ch_chain_net_srv_pkt_request_t *request;
     size_t request_size;
-- 
GitLab