From df3c430a13ce9b0452594c105ef4e87bc0d18641 Mon Sep 17 00:00:00 2001
From: Dmitriy Gerasimov <naeper@demlabs.net>
Date: Fri, 16 Jul 2021 14:15:10 +0700
Subject: [PATCH] [!] Massive bugfixes for event socket queues [*] Small fix
 for sqlite backend

---
 dap-sdk/net/core/dap_events_socket.c          | 76 +++++++++++++++++--
 dap-sdk/net/core/dap_worker.c                 | 16 ++--
 dap-sdk/net/core/include/dap_events_socket.h  |  1 +
 dap-sdk/net/core/include/dap_worker.h         |  2 +
 .../server/notify_server/src/dap_notify_srv.c |  3 +-
 dap-sdk/net/stream/stream/dap_stream_pkt.c    |  3 +
 modules/channel/chain/dap_stream_ch_chain.c   |  8 +-
 modules/global-db/dap_chain_global_db.c       |  2 +
 .../dap_chain_global_db_driver_sqlite.c       | 21 ++++-
 9 files changed, 112 insertions(+), 20 deletions(-)

diff --git a/dap-sdk/net/core/dap_events_socket.c b/dap-sdk/net/core/dap_events_socket.c
index 1899739ca8..fbdf968b1f 100644
--- a/dap-sdk/net/core/dap_events_socket.c
+++ b/dap-sdk/net/core/dap_events_socket.c
@@ -254,6 +254,7 @@ void dap_events_socket_reassign_between_workers_mt(dap_worker_t * a_worker_old,
 {
     dap_worker_msg_reassign_t * l_msg = DAP_NEW_Z(dap_worker_msg_reassign_t);
     l_msg->esocket = a_es;
+    l_msg->esocket_uuid = a_es->uuid;
     l_msg->worker_new = a_worker_new;
     dap_events_socket_queue_ptr_send(a_worker_old->queue_es_reassign, l_msg);
 }
@@ -1395,9 +1396,14 @@ int dap_events_socket_event_signal( dap_events_socket_t * a_es, uint64_t a_value
  */
 void dap_events_socket_queue_on_remove_and_delete(dap_events_socket_t* a_es)
 {
-    int l_ret= dap_events_socket_queue_ptr_send( a_es->worker->queue_es_delete, a_es );
+    dap_events_socket_handler_t * l_es_handler= DAP_NEW_Z(dap_events_socket_handler_t);
+    l_es_handler->esocket = a_es;
+    l_es_handler->uuid = a_es->uuid;
+
+    int l_ret= dap_events_socket_queue_ptr_send( a_es->worker->queue_es_delete, l_es_handler );
     if( l_ret != 0 ){
         log_it(L_ERROR, "Queue send returned %d", l_ret);
+        DAP_DELETE(l_es_handler);
     }
 }
 
@@ -1589,8 +1595,15 @@ void dap_events_socket_set_readable_unsafe( dap_events_socket_t *a_esocket, bool
                 char l_errbuf[128];
                 l_errbuf[0]=0;
                 strerror_r(l_errno, l_errbuf, sizeof (l_errbuf));
-                log_it(L_ERROR," for set_read op %d: \"%s\" (%d)",
-                    l_kqueue_fd, l_errbuf, l_errno);
+                if (l_errno == EBADF){
+                    log_it(L_ATT,"Socket %d (%p ) disconnected, rise CLOSE flag to remove from queue, lost %"DAP_UINT64_FORMAT_u":%" DAP_UINT64_FORMAT_u
+                           " bytes",a_esocket->socket,a_esocket,a_esocket->buf_in_size,a_esocket->buf_out_size);
+                    a_esocket->flags |= DAP_SOCK_SIGNAL_CLOSE;
+                    a_esocket->buf_in_size = a_esocket->buf_out_size = 0; // Reset everything from buffer, we close it now all
+                }else{
+                    log_it(L_ERROR,"Can't update client socket %d state on kqueue fd for set_read op %d: \"%s\" (%d)",
+                                    a_esocket->socket, l_kqueue_fd, l_errbuf, l_errno);
+                }
             }
         }
     }else
@@ -1650,8 +1663,15 @@ void dap_events_socket_set_writable_unsafe( dap_events_socket_t *a_esocket, bool
                 char l_errbuf[128];
                 l_errbuf[0]=0;
                 strerror_r(l_errno, l_errbuf, sizeof (l_errbuf));
-                log_it(L_ERROR,"Can't update client socket state on kqueue fd for set_write op %d: \"%s\" (%d)",
-                    l_kqueue_fd, l_errbuf, l_errno);
+                if (l_errno == EBADF){
+                    log_it(L_ATT,"Socket %d (%p ) disconnected, rise CLOSE flag to remove from queue, lost %"DAP_UINT64_FORMAT_u":%" DAP_UINT64_FORMAT_u
+                           " bytes",a_esocket->socket,a_esocket,a_esocket->buf_in_size,a_esocket->buf_out_size);
+                    a_esocket->flags |= DAP_SOCK_SIGNAL_CLOSE;
+                    a_esocket->buf_in_size = a_esocket->buf_out_size = 0; // Reset everything from buffer, we close it now all
+                }else{
+                    log_it(L_ERROR,"Can't update client socket %d state on kqueue fd for set_write op %d: \"%s\" (%d)",
+                                    a_esocket->socket, l_kqueue_fd, l_errbuf, l_errno);
+                }
             }
         }
     }else
@@ -1818,6 +1838,30 @@ void dap_events_socket_remove_from_worker_unsafe( dap_events_socket_t *a_es, dap
     a_es->worker = NULL;
 }
 
+/**
+ * @brief dap_events_socket_check_uuid_unsafe
+ * @param a_worker
+ * @param a_es
+ * @param a_es_uuid
+ * @return
+ */
+bool dap_events_socket_check_uuid_unsafe(dap_worker_t * a_worker,dap_events_socket_t * a_es, uint128_t a_es_uuid)
+{
+    if (a_es){
+        if ( a_worker->esockets){
+            dap_events_socket_t * l_es = NULL;
+            bool l_ret;
+            pthread_rwlock_rdlock(&a_worker->esocket_rwlock);
+            HASH_FIND(hh_worker,a_worker->esockets,&a_es, sizeof(void*), l_es );
+            l_ret = ( l_es == a_es && dap_uint128_check_equal(l_es->uuid,a_es_uuid) );
+            pthread_rwlock_unlock(&a_worker->esocket_rwlock);
+            return l_ret;
+        }else
+            return false;
+    }else
+        return false;
+}
+
 /**
  * @brief dap_events_socket_check_unsafe
  * @param a_worker
@@ -1846,8 +1890,16 @@ bool dap_events_socket_check_unsafe(dap_worker_t * a_worker,dap_events_socket_t
  */
 void dap_events_socket_remove_and_delete_mt(dap_worker_t * a_w,  dap_events_socket_t *a_es )
 {
-    if(a_w)
-        dap_events_socket_queue_ptr_send( a_w->queue_es_delete, a_es );
+    assert(a_w);
+    dap_events_socket_handler_t * l_es_handler= DAP_NEW_Z(dap_events_socket_handler_t);
+    l_es_handler->esocket = a_es;
+    if(a_es)
+       l_es_handler->uuid = a_es->uuid;
+
+    if(dap_events_socket_queue_ptr_send( a_w->queue_es_delete, l_es_handler ) != 0 ){
+        log_it(L_ERROR,"Can't send %d fd in queue", a_es->fd);
+        DAP_DELETE(l_es_handler);
+    }
 }
 
 /**
@@ -1860,6 +1912,8 @@ void dap_events_socket_set_readable_mt(dap_worker_t * a_w, dap_events_socket_t *
 {
     dap_worker_msg_io_t * l_msg = DAP_NEW_Z(dap_worker_msg_io_t); if (! l_msg) return;
     l_msg->esocket = a_es;
+    if(a_es)
+        l_msg->esocket_uuid = a_es->uuid;
     if (a_is_ready)
         l_msg->flags_set = DAP_SOCK_READY_TO_READ;
     else
@@ -1881,6 +1935,8 @@ void dap_events_socket_set_writable_mt(dap_worker_t * a_w, dap_events_socket_t *
 {
     dap_worker_msg_io_t * l_msg = DAP_NEW_Z(dap_worker_msg_io_t); if (!l_msg) return;
     l_msg->esocket = a_es;
+    if(a_es)
+        l_msg->esocket_uuid = a_es->uuid;
     if (a_is_ready)
         l_msg->flags_set = DAP_SOCK_READY_TO_WRITE;
     else
@@ -1905,6 +1961,8 @@ size_t dap_events_socket_write_inter(dap_events_socket_t * a_es_input, dap_event
 {
     dap_worker_msg_io_t * l_msg = DAP_NEW_Z(dap_worker_msg_io_t); if( !l_msg) return 0;
     l_msg->esocket = a_es;
+    if(a_es)
+        l_msg->esocket_uuid = a_es->uuid;
     l_msg->data = DAP_NEW_SIZE(void,a_data_size);
     l_msg->data_size = a_data_size;
     l_msg->flags_set = DAP_SOCK_READY_TO_WRITE;
@@ -1968,6 +2026,8 @@ size_t dap_events_socket_write_mt(dap_worker_t * a_w,dap_events_socket_t *a_es,
 {
     dap_worker_msg_io_t * l_msg = DAP_NEW_Z(dap_worker_msg_io_t); if (!l_msg) return 0;
     l_msg->esocket = a_es;
+    if(a_es)
+        l_msg->esocket_uuid = a_es->uuid;
     l_msg->data = DAP_NEW_SIZE(void,l_data_size);
     l_msg->data_size = l_data_size;
     l_msg->flags_set = DAP_SOCK_READY_TO_WRITE;
@@ -2002,6 +2062,8 @@ size_t dap_events_socket_write_f_mt(dap_worker_t * a_w,dap_events_socket_t *a_es
     }
     dap_worker_msg_io_t * l_msg = DAP_NEW_Z(dap_worker_msg_io_t);
     l_msg->esocket = a_es;
+    if(a_es)
+        l_msg->esocket_uuid = a_es->uuid;
     l_msg->data = DAP_NEW_SIZE(void,l_data_size + 1);
     l_msg->flags_set = DAP_SOCK_READY_TO_WRITE;
     l_data_size = dap_vsprintf(l_msg->data,format,ap_copy);
diff --git a/dap-sdk/net/core/dap_worker.c b/dap-sdk/net/core/dap_worker.c
index 06fbebb723..9f6df3a750 100644
--- a/dap-sdk/net/core/dap_worker.c
+++ b/dap-sdk/net/core/dap_worker.c
@@ -973,11 +973,14 @@ static void s_queue_add_es_callback( dap_events_socket_t * a_es, void * a_arg)
  */
 static void s_queue_delete_es_callback( dap_events_socket_t * a_es, void * a_arg)
 {
-    dap_events_socket_t * l_esocket = (dap_events_socket_t*) a_arg;
-    if (dap_events_socket_check_unsafe(a_es->worker,l_esocket)){
+    dap_events_socket_handler_t * l_es_handler = (dap_events_socket_handler_t*) a_arg;
+    assert(l_es_handler);
+    dap_events_socket_t * l_esocket = (dap_events_socket_t*) l_es_handler->esocket;
+    if (dap_events_socket_check_uuid_unsafe (a_es->worker,l_esocket, l_es_handler->uuid)){
         ((dap_events_socket_t*)a_arg)->flags |= DAP_SOCK_SIGNAL_CLOSE; // Send signal to socket to kill
     }else
         log_it(L_INFO, "While we were sending the delete() message, esocket %p has been disconnected", l_esocket);
+    DAP_DELETE(l_es_handler);
 }
 
 /**
@@ -989,7 +992,7 @@ static void s_queue_es_reassign_callback( dap_events_socket_t * a_es, void * a_a
 {
     dap_worker_msg_reassign_t * l_msg = (dap_worker_msg_reassign_t*) a_arg;
     dap_events_socket_t * l_es_reassign = l_msg->esocket;
-    if (dap_events_socket_check_unsafe(a_es->worker,l_es_reassign)){
+    if (dap_events_socket_check_uuid_unsafe(a_es->worker,l_es_reassign, l_msg->esocket_uuid)){
         if( l_es_reassign->was_reassigned && l_es_reassign->flags & DAP_SOCK_REASSIGN_ONCE) {
             log_it(L_INFO, "Reassgment request with DAP_SOCK_REASSIGN_ONCE allowed only once, declined reassigment from %u to %u",
                    l_es_reassign->worker->id, l_msg->worker_new->id);
@@ -1040,15 +1043,12 @@ static void s_queue_es_io_callback( dap_events_socket_t * a_es, void * a_arg)
     dap_worker_msg_io_t * l_msg = a_arg;
 
     // Check if it was removed from the list
-    dap_events_socket_t *l_msg_es = NULL;
-    pthread_rwlock_rdlock(&a_es->worker->esocket_rwlock);
-    HASH_FIND(hh_worker, a_es->worker->esockets, &l_msg->esocket , sizeof (void*), l_msg_es );
-    pthread_rwlock_unlock(&a_es->worker->esocket_rwlock);
-    if ( l_msg_es == NULL){
+    if ( !dap_events_socket_check_uuid_unsafe(a_es->worker,l_msg->esocket,l_msg->esocket_uuid)){
         log_it(L_INFO, "We got i/o message for esocket %p thats now not in list. Lost %u data", l_msg->esocket, l_msg->data_size);
         DAP_DELETE(l_msg);
         return;
     }
+    dap_events_socket_t *l_msg_es = l_msg->esocket;
     if (l_msg->flags_set & DAP_SOCK_CONNECTING)
         if (!  (l_msg_es->flags & DAP_SOCK_CONNECTING) ){
             l_msg_es->flags |= DAP_SOCK_CONNECTING;
diff --git a/dap-sdk/net/core/include/dap_events_socket.h b/dap-sdk/net/core/include/dap_events_socket.h
index 0e107c6e1c..207a1067c1 100644
--- a/dap-sdk/net/core/include/dap_events_socket.h
+++ b/dap-sdk/net/core/include/dap_events_socket.h
@@ -340,6 +340,7 @@ size_t dap_events_socket_pop_from_buf_in(dap_events_socket_t *sc, void * data, s
 
 // Non-MT functions
 bool dap_events_socket_check_unsafe(dap_worker_t * a_worker,dap_events_socket_t * a_es);
+bool dap_events_socket_check_uuid_unsafe(dap_worker_t * a_worker,dap_events_socket_t * a_es, uint128_t a_es_uuid);
 
 void dap_events_socket_set_readable_unsafe(dap_events_socket_t * sc,bool is_ready);
 void dap_events_socket_set_writable_unsafe(dap_events_socket_t * sc,bool is_ready);
diff --git a/dap-sdk/net/core/include/dap_worker.h b/dap-sdk/net/core/include/dap_worker.h
index 4da7c9cff0..ca912b4346 100644
--- a/dap-sdk/net/core/include/dap_worker.h
+++ b/dap-sdk/net/core/include/dap_worker.h
@@ -95,12 +95,14 @@ typedef struct dap_worker
 // Message for reassigment
 typedef struct dap_worker_msg_reassign{
     dap_events_socket_t * esocket;
+    uint128_t esocket_uuid;
     dap_worker_t * worker_new;
 } dap_worker_msg_reassign_t;
 
 // Message for input/output queue
 typedef struct dap_worker_msg_io{
     dap_events_socket_t * esocket;
+    uint128_t esocket_uuid;
     size_t data_size;
     void *data;
     uint32_t flags_set;
diff --git a/dap-sdk/net/server/notify_server/src/dap_notify_srv.c b/dap-sdk/net/server/notify_server/src/dap_notify_srv.c
index 7bc137cbf0..dc963b3c96 100644
--- a/dap-sdk/net/server/notify_server/src/dap_notify_srv.c
+++ b/dap-sdk/net/server/notify_server/src/dap_notify_srv.c
@@ -168,9 +168,10 @@ static void s_notify_server_callback_queue(dap_events_socket_t * a_es, void * a_
             continue;
         }
         size_t l_str_len = a_arg? strlen((char*)a_arg): 0;
-        if(l_str_len)
+        if(l_str_len){
             dap_events_socket_write_inter(a_es->worker->queue_es_io_input[l_worker_id],l_socket_handler->esocket,
                                       a_arg,l_str_len+1);
+        }
         DAP_DELETE(a_arg);
     }
     pthread_rwlock_unlock(&s_notify_server_clients_mutex);
diff --git a/dap-sdk/net/stream/stream/dap_stream_pkt.c b/dap-sdk/net/stream/stream/dap_stream_pkt.c
index 1394eb16a8..e1b43a9d92 100644
--- a/dap-sdk/net/stream/stream/dap_stream_pkt.c
+++ b/dap-sdk/net/stream/stream/dap_stream_pkt.c
@@ -170,6 +170,9 @@ size_t dap_stream_pkt_write_mt(dap_worker_t * a_w,dap_events_socket_t *a_es, dap
 {
     dap_worker_msg_io_t * l_msg = DAP_NEW_Z(dap_worker_msg_io_t);
     stream_pkt_hdr_t *l_pkt_hdr;
+    l_msg->esocket = a_es;
+    if(a_es)
+       l_msg->esocket_uuid = a_es->uuid; // TODO replace function signature with UUID in place of worker+esocket
     l_msg->data_size = 16-a_data_size%16+a_data_size+sizeof(*l_pkt_hdr);
     l_msg->data = DAP_NEW_SIZE(void,l_msg->data_size);
     l_pkt_hdr=(stream_pkt_hdr_t*) l_msg->data;
diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c
index 160e1d5d05..dc579383b2 100644
--- a/modules/channel/chain/dap_stream_ch_chain.c
+++ b/modules/channel/chain/dap_stream_ch_chain.c
@@ -1206,12 +1206,14 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
             if(l_chain_pkt_data_size>1)
                 l_error_str[l_chain_pkt_data_size-1]='\0'; // To be sure that nobody sends us garbage
                                                            // without trailing zero
-            log_it(L_WARNING,"In: ERROR packet: '%s'",l_ch_chain->node_client, l_chain_pkt_data_size>1?
-                       l_error_str:"<empty>");
+            log_it(L_WARNING,"In from remote addr %s chain id 0x%016x got error on his side: '%s'",
+                   l_ch_chain->ch->stream->esocket->remote_addr_str?
+                                                                                    l_ch_chain->ch->stream->esocket->remote_addr_str: "<no addr>",
+                   l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt_data_size>1? l_error_str:"<empty>");
         } break;
 
         case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_ALL: {
-            log_it(L_INFO, "In:  SYNCED_ALL net 0x%016x chain 0x%016x cell 0x%016x", l_chain_pkt->hdr.net_id.uint64 ,
+            log_it(L_INFO, "In from "NODE_ADDR_FP_STR":  SYNCED_ALL net 0x%016x chain 0x%016x cell 0x%016x",NODE_ADDR_FP_ARGS_S(l_ch_chain->node_client->remote_node_addr), l_chain_pkt->hdr.net_id.uint64 ,
                    l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64);
         } break;
 
diff --git a/modules/global-db/dap_chain_global_db.c b/modules/global-db/dap_chain_global_db.c
index afcfddce19..1e5fa1979f 100644
--- a/modules/global-db/dap_chain_global_db.c
+++ b/modules/global-db/dap_chain_global_db.c
@@ -246,6 +246,8 @@ int dap_chain_global_db_init(dap_config_t * g_config)
     unlock();
     if( res != 0 )
         log_it(L_CRITICAL, "Hadn't initialized db driver \"%s\" on path \"%s\"", l_driver_name, l_storage_path);
+    else
+        log_it(L_NOTICE,"GlobalDB initialized");
     return res;
 }
 
diff --git a/modules/global-db/dap_chain_global_db_driver_sqlite.c b/modules/global-db/dap_chain_global_db_driver_sqlite.c
index 186b3d9c51..c101cbe340 100644
--- a/modules/global-db/dap_chain_global_db_driver_sqlite.c
+++ b/modules/global-db/dap_chain_global_db_driver_sqlite.c
@@ -27,12 +27,14 @@
 #include <stddef.h>
 #include <string.h>
 #include <pthread.h>
+#include <errno.h>
 
 #ifdef DAP_OS_UNIX
 #include <unistd.h>
 #endif
 #include "dap_common.h"
 #include "dap_hash.h"
+#include "dap_file_utils.h"
 #include "dap_strfuncs.h"
 #include "dap_chain_global_db_driver_sqlite.h"
 
@@ -88,8 +90,25 @@ int dap_db_driver_sqlite_init(const char *a_filename_db, dap_db_driver_callbacks
         log_it(L_ERROR, "Can't init sqlite err=%d (%s)", l_ret, sqlite3_errstr(l_ret));
         return -2;
     }
+    char * l_filename_dir = dap_path_get_dirname(a_filename_db);
+    if(!dap_dir_test(l_filename_dir)){
+        log_it(L_NOTICE, "No directory %s, trying to create...",l_filename_dir);
+        int l_mkdir_ret = dap_mkdir_with_parents(l_filename_dir);
+        int l_errno = errno;
+        if(!dap_dir_test(l_filename_dir)){
+            char l_errbuf[255];
+            l_errbuf[0] = '\0';
+            strerror_r(l_errno,l_errbuf,sizeof(l_errbuf));
+            log_it(L_ERROR, "Can't create directory, error code %d, error string \"%s\"", l_mkdir_ret, l_errbuf);
+            DAP_DELETE(l_filename_dir);
+            return -21;
+        }else
+            log_it(L_NOTICE,"Directory created");
+    }
+    DAP_DEL_Z(l_filename_dir);
+
     char *l_error_message = NULL;
-    s_db = dap_db_driver_sqlite_open(a_filename_db, SQLITE_OPEN_READWRITE, &l_error_message);
+    s_db = dap_db_driver_sqlite_open(a_filename_db, SQLITE_OPEN_READWRITE|SQLITE_OPEN_CREATE, &l_error_message);
     if(!s_db) {
         log_it(L_ERROR, "Can't init sqlite err: \"%s\"", l_error_message);
         dap_db_driver_sqlite_free(l_error_message);
-- 
GitLab