From abea351e05bcb385b846aceba262fe638b922d01 Mon Sep 17 00:00:00 2001 From: Dmitriy Gerasimov <naeper@demlabs.net> Date: Fri, 16 Jul 2021 14:42:05 +0700 Subject: [PATCH] [*] Chain sync fixes [!] Esocket queue fixes [+] SQLite paths and file creation if not present --- dap-sdk/net/core/dap_events_socket.c | 102 ++++++++++++++---- dap-sdk/net/core/dap_worker.c | 21 ++-- dap-sdk/net/core/include/dap_events_socket.h | 1 + dap-sdk/net/core/include/dap_worker.h | 2 + .../server/notify_server/src/dap_notify_srv.c | 3 +- dap-sdk/net/stream/stream/dap_stream_pkt.c | 3 + modules/channel/chain/dap_stream_ch_chain.c | 10 +- modules/global-db/dap_chain_global_db.c | 2 + .../dap_chain_global_db_driver_sqlite.c | 22 +++- 9 files changed, 131 insertions(+), 35 deletions(-) diff --git a/dap-sdk/net/core/dap_events_socket.c b/dap-sdk/net/core/dap_events_socket.c index 1899739ca8..d2a64b4dec 100644 --- a/dap-sdk/net/core/dap_events_socket.c +++ b/dap-sdk/net/core/dap_events_socket.c @@ -254,6 +254,8 @@ void dap_events_socket_reassign_between_workers_mt(dap_worker_t * a_worker_old, { dap_worker_msg_reassign_t * l_msg = DAP_NEW_Z(dap_worker_msg_reassign_t); l_msg->esocket = a_es; + if(a_es) + l_msg->esocket_uuid = a_es->uuid; l_msg->worker_new = a_worker_new; dap_events_socket_queue_ptr_send(a_worker_old->queue_es_reassign, l_msg); } @@ -1395,9 +1397,14 @@ int dap_events_socket_event_signal( dap_events_socket_t * a_es, uint64_t a_value */ void dap_events_socket_queue_on_remove_and_delete(dap_events_socket_t* a_es) { - int l_ret= dap_events_socket_queue_ptr_send( a_es->worker->queue_es_delete, a_es ); + dap_events_socket_handler_t * l_es_handler= DAP_NEW_Z(dap_events_socket_handler_t); + l_es_handler->esocket = a_es; + l_es_handler->uuid = a_es->uuid; + + int l_ret= dap_events_socket_queue_ptr_send( a_es->worker->queue_es_delete, l_es_handler ); if( l_ret != 0 ){ log_it(L_ERROR, "Queue send returned %d", l_ret); + DAP_DELETE(l_es_handler); } } @@ -1589,8 +1596,15 @@ void dap_events_socket_set_readable_unsafe( dap_events_socket_t *a_esocket, bool char l_errbuf[128]; l_errbuf[0]=0; strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); - log_it(L_ERROR," for set_read op %d: \"%s\" (%d)", - l_kqueue_fd, l_errbuf, l_errno); + if (l_errno == EBADF){ + log_it(L_ATT,"Socket %d (%p ) disconnected, rise CLOSE flag to remove from queue, lost %"DAP_UINT64_FORMAT_u":%" DAP_UINT64_FORMAT_u + " bytes",a_esocket->socket,a_esocket,a_esocket->buf_in_size,a_esocket->buf_out_size); + a_esocket->flags |= DAP_SOCK_SIGNAL_CLOSE; + a_esocket->buf_in_size = a_esocket->buf_out_size = 0; // Reset everything from buffer, we close it now all + }else{ + log_it(L_ERROR,"Can't update client socket %d state on kqueue fd for set_read op %d: \"%s\" (%d)", + a_esocket->socket, l_kqueue_fd, l_errbuf, l_errno); + } } } }else @@ -1650,8 +1664,15 @@ void dap_events_socket_set_writable_unsafe( dap_events_socket_t *a_esocket, bool char l_errbuf[128]; l_errbuf[0]=0; strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); - log_it(L_ERROR,"Can't update client socket state on kqueue fd for set_write op %d: \"%s\" (%d)", - l_kqueue_fd, l_errbuf, l_errno); + if (l_errno == EBADF){ + log_it(L_ATT,"Socket %d (%p ) disconnected, rise CLOSE flag to remove from queue, lost %"DAP_UINT64_FORMAT_u":%" DAP_UINT64_FORMAT_u + " bytes",a_esocket->socket,a_esocket,a_esocket->buf_in_size,a_esocket->buf_out_size); + a_esocket->flags |= DAP_SOCK_SIGNAL_CLOSE; + a_esocket->buf_in_size = a_esocket->buf_out_size = 0; // Reset everything from buffer, we close it now all + }else{ + log_it(L_ERROR,"Can't update client socket %d state on kqueue fd for set_write op %d: \"%s\" (%d)", + a_esocket->socket, l_kqueue_fd, l_errbuf, l_errno); + } } } }else @@ -1818,6 +1839,30 @@ void dap_events_socket_remove_from_worker_unsafe( dap_events_socket_t *a_es, dap a_es->worker = NULL; } +/** + * @brief dap_events_socket_check_uuid_unsafe + * @param a_worker + * @param a_es + * @param a_es_uuid + * @return + */ +bool dap_events_socket_check_uuid_unsafe(dap_worker_t * a_worker,dap_events_socket_t * a_es, uint128_t a_es_uuid) +{ + if (a_es){ + if ( a_worker->esockets){ + dap_events_socket_t * l_es = NULL; + bool l_ret; + pthread_rwlock_rdlock(&a_worker->esocket_rwlock); + HASH_FIND(hh_worker,a_worker->esockets,&a_es, sizeof(void*), l_es ); + l_ret = ( l_es == a_es && dap_uint128_check_equal(l_es->uuid,a_es_uuid) ); + pthread_rwlock_unlock(&a_worker->esocket_rwlock); + return l_ret; + }else + return false; + }else + return false; +} + /** * @brief dap_events_socket_check_unsafe * @param a_worker @@ -1846,8 +1891,16 @@ bool dap_events_socket_check_unsafe(dap_worker_t * a_worker,dap_events_socket_t */ void dap_events_socket_remove_and_delete_mt(dap_worker_t * a_w, dap_events_socket_t *a_es ) { - if(a_w) - dap_events_socket_queue_ptr_send( a_w->queue_es_delete, a_es ); + assert(a_w); + dap_events_socket_handler_t * l_es_handler= DAP_NEW_Z(dap_events_socket_handler_t); + l_es_handler->esocket = a_es; + if(a_es) + l_es_handler->uuid = a_es->uuid; + + if(dap_events_socket_queue_ptr_send( a_w->queue_es_delete, l_es_handler ) != 0 ){ + log_it(L_ERROR,"Can't send %d fd in queue", a_es->fd); + DAP_DELETE(l_es_handler); + } } /** @@ -1860,6 +1913,8 @@ void dap_events_socket_set_readable_mt(dap_worker_t * a_w, dap_events_socket_t * { dap_worker_msg_io_t * l_msg = DAP_NEW_Z(dap_worker_msg_io_t); if (! l_msg) return; l_msg->esocket = a_es; + if(a_es) + l_msg->esocket_uuid = a_es->uuid; if (a_is_ready) l_msg->flags_set = DAP_SOCK_READY_TO_READ; else @@ -1881,6 +1936,8 @@ void dap_events_socket_set_writable_mt(dap_worker_t * a_w, dap_events_socket_t * { dap_worker_msg_io_t * l_msg = DAP_NEW_Z(dap_worker_msg_io_t); if (!l_msg) return; l_msg->esocket = a_es; + if(a_es) + l_msg->esocket_uuid = a_es->uuid; if (a_is_ready) l_msg->flags_set = DAP_SOCK_READY_TO_WRITE; else @@ -1905,6 +1962,8 @@ size_t dap_events_socket_write_inter(dap_events_socket_t * a_es_input, dap_event { dap_worker_msg_io_t * l_msg = DAP_NEW_Z(dap_worker_msg_io_t); if( !l_msg) return 0; l_msg->esocket = a_es; + if(a_es) + l_msg->esocket_uuid = a_es->uuid; l_msg->data = DAP_NEW_SIZE(void,a_data_size); l_msg->data_size = a_data_size; l_msg->flags_set = DAP_SOCK_READY_TO_WRITE; @@ -1942,6 +2001,8 @@ size_t dap_events_socket_write_f_inter(dap_events_socket_t * a_es_input, dap_eve dap_worker_msg_io_t * l_msg = DAP_NEW_Z(dap_worker_msg_io_t); l_msg->esocket = a_es; + if(a_es) + l_msg->esocket_uuid = a_es->uuid; l_msg->data = DAP_NEW_SIZE(void,l_data_size); l_msg->data_size = l_data_size; l_msg->flags_set = DAP_SOCK_READY_TO_WRITE; @@ -1968,6 +2029,8 @@ size_t dap_events_socket_write_mt(dap_worker_t * a_w,dap_events_socket_t *a_es, { dap_worker_msg_io_t * l_msg = DAP_NEW_Z(dap_worker_msg_io_t); if (!l_msg) return 0; l_msg->esocket = a_es; + if(a_es) + l_msg->esocket_uuid = a_es->uuid; l_msg->data = DAP_NEW_SIZE(void,l_data_size); l_msg->data_size = l_data_size; l_msg->flags_set = DAP_SOCK_READY_TO_WRITE; @@ -2002,6 +2065,8 @@ size_t dap_events_socket_write_f_mt(dap_worker_t * a_w,dap_events_socket_t *a_es } dap_worker_msg_io_t * l_msg = DAP_NEW_Z(dap_worker_msg_io_t); l_msg->esocket = a_es; + if(a_es) + l_msg->esocket_uuid = a_es->uuid; l_msg->data = DAP_NEW_SIZE(void,l_data_size + 1); l_msg->flags_set = DAP_SOCK_READY_TO_WRITE; l_data_size = dap_vsprintf(l_msg->data,format,ap_copy); @@ -2025,23 +2090,22 @@ size_t dap_events_socket_write_f_mt(dap_worker_t * a_w,dap_events_socket_t *a_es /** * @brief dap_events_socket_write Write data to the client - * @param sc Conn instance - * @param data Pointer to data - * @param data_size Size of data to write + * @param a_es Esocket instance + * @param a_data Pointer to data + * @param a_data_size Size of data to write * @return Number of bytes that were placed into the buffer */ -size_t dap_events_socket_write_unsafe(dap_events_socket_t *sc, const void * data, size_t data_size) +size_t dap_events_socket_write_unsafe(dap_events_socket_t *a_es, const void * a_data, size_t a_data_size) { - if(sc->buf_out_size > sc->buf_out_size_max){ - log_it(L_DEBUG,"write buffer already overflow size=%u/max=%u", sc->buf_out_size, sc->buf_out_size_max); + if(a_es->buf_out_size > a_es->buf_out_size_max){ + log_it(L_DEBUG,"write buffer already overflow size=%u/max=%u", a_es->buf_out_size, a_es->buf_out_size_max); return 0; } - //log_it(L_DEBUG,"dap_events_socket_write %u sock data %X size %u", sc->socket, data, data_size ); - data_size = (sc->buf_out_size + data_size < sc->buf_out_size_max) ? data_size : (sc->buf_out_size_max - sc->buf_out_size); - memcpy(sc->buf_out + sc->buf_out_size, data, data_size); - sc->buf_out_size += data_size; - dap_events_socket_set_writable_unsafe(sc, true); - return data_size; + a_data_size = (a_es->buf_out_size + a_data_size < a_es->buf_out_size_max) ? a_data_size : (a_es->buf_out_size_max - a_es->buf_out_size); + memcpy(a_es->buf_out + a_es->buf_out_size, a_data, a_data_size); + a_es->buf_out_size += a_data_size; + dap_events_socket_set_writable_unsafe(a_es, true); + return a_data_size; } /** diff --git a/dap-sdk/net/core/dap_worker.c b/dap-sdk/net/core/dap_worker.c index 06fbebb723..750cec1fbf 100644 --- a/dap-sdk/net/core/dap_worker.c +++ b/dap-sdk/net/core/dap_worker.c @@ -973,11 +973,15 @@ static void s_queue_add_es_callback( dap_events_socket_t * a_es, void * a_arg) */ static void s_queue_delete_es_callback( dap_events_socket_t * a_es, void * a_arg) { - dap_events_socket_t * l_esocket = (dap_events_socket_t*) a_arg; - if (dap_events_socket_check_unsafe(a_es->worker,l_esocket)){ + dap_events_socket_handler_t * l_es_handler = (dap_events_socket_handler_t*) a_arg; + assert(l_es_handler); + dap_events_socket_t * l_esocket = (dap_events_socket_t*) l_es_handler->esocket; + if (dap_events_socket_check_uuid_unsafe (a_es->worker,l_esocket, l_es_handler->uuid)){ ((dap_events_socket_t*)a_arg)->flags |= DAP_SOCK_SIGNAL_CLOSE; // Send signal to socket to kill - }else + }else{ log_it(L_INFO, "While we were sending the delete() message, esocket %p has been disconnected", l_esocket); + DAP_DELETE(l_es_handler); + } } /** @@ -989,7 +993,7 @@ static void s_queue_es_reassign_callback( dap_events_socket_t * a_es, void * a_a { dap_worker_msg_reassign_t * l_msg = (dap_worker_msg_reassign_t*) a_arg; dap_events_socket_t * l_es_reassign = l_msg->esocket; - if (dap_events_socket_check_unsafe(a_es->worker,l_es_reassign)){ + if (dap_events_socket_check_uuid_unsafe(a_es->worker,l_es_reassign, l_msg->esocket_uuid)){ if( l_es_reassign->was_reassigned && l_es_reassign->flags & DAP_SOCK_REASSIGN_ONCE) { log_it(L_INFO, "Reassgment request with DAP_SOCK_REASSIGN_ONCE allowed only once, declined reassigment from %u to %u", l_es_reassign->worker->id, l_msg->worker_new->id); @@ -1039,16 +1043,13 @@ static void s_queue_es_io_callback( dap_events_socket_t * a_es, void * a_arg) { dap_worker_msg_io_t * l_msg = a_arg; - // Check if it was removed from the list - dap_events_socket_t *l_msg_es = NULL; - pthread_rwlock_rdlock(&a_es->worker->esocket_rwlock); - HASH_FIND(hh_worker, a_es->worker->esockets, &l_msg->esocket , sizeof (void*), l_msg_es ); - pthread_rwlock_unlock(&a_es->worker->esocket_rwlock); - if ( l_msg_es == NULL){ + if ( !dap_events_socket_check_uuid_unsafe(a_es->worker,l_msg->esocket,l_msg->esocket_uuid)){ log_it(L_INFO, "We got i/o message for esocket %p thats now not in list. Lost %u data", l_msg->esocket, l_msg->data_size); DAP_DELETE(l_msg); return; } + dap_events_socket_t *l_msg_es = l_msg->esocket; + if (l_msg->flags_set & DAP_SOCK_CONNECTING) if (! (l_msg_es->flags & DAP_SOCK_CONNECTING) ){ l_msg_es->flags |= DAP_SOCK_CONNECTING; diff --git a/dap-sdk/net/core/include/dap_events_socket.h b/dap-sdk/net/core/include/dap_events_socket.h index 0e107c6e1c..207a1067c1 100644 --- a/dap-sdk/net/core/include/dap_events_socket.h +++ b/dap-sdk/net/core/include/dap_events_socket.h @@ -340,6 +340,7 @@ size_t dap_events_socket_pop_from_buf_in(dap_events_socket_t *sc, void * data, s // Non-MT functions bool dap_events_socket_check_unsafe(dap_worker_t * a_worker,dap_events_socket_t * a_es); +bool dap_events_socket_check_uuid_unsafe(dap_worker_t * a_worker,dap_events_socket_t * a_es, uint128_t a_es_uuid); void dap_events_socket_set_readable_unsafe(dap_events_socket_t * sc,bool is_ready); void dap_events_socket_set_writable_unsafe(dap_events_socket_t * sc,bool is_ready); diff --git a/dap-sdk/net/core/include/dap_worker.h b/dap-sdk/net/core/include/dap_worker.h index 4da7c9cff0..ca912b4346 100644 --- a/dap-sdk/net/core/include/dap_worker.h +++ b/dap-sdk/net/core/include/dap_worker.h @@ -95,12 +95,14 @@ typedef struct dap_worker // Message for reassigment typedef struct dap_worker_msg_reassign{ dap_events_socket_t * esocket; + uint128_t esocket_uuid; dap_worker_t * worker_new; } dap_worker_msg_reassign_t; // Message for input/output queue typedef struct dap_worker_msg_io{ dap_events_socket_t * esocket; + uint128_t esocket_uuid; size_t data_size; void *data; uint32_t flags_set; diff --git a/dap-sdk/net/server/notify_server/src/dap_notify_srv.c b/dap-sdk/net/server/notify_server/src/dap_notify_srv.c index 7bc137cbf0..dc963b3c96 100644 --- a/dap-sdk/net/server/notify_server/src/dap_notify_srv.c +++ b/dap-sdk/net/server/notify_server/src/dap_notify_srv.c @@ -168,9 +168,10 @@ static void s_notify_server_callback_queue(dap_events_socket_t * a_es, void * a_ continue; } size_t l_str_len = a_arg? strlen((char*)a_arg): 0; - if(l_str_len) + if(l_str_len){ dap_events_socket_write_inter(a_es->worker->queue_es_io_input[l_worker_id],l_socket_handler->esocket, a_arg,l_str_len+1); + } DAP_DELETE(a_arg); } pthread_rwlock_unlock(&s_notify_server_clients_mutex); diff --git a/dap-sdk/net/stream/stream/dap_stream_pkt.c b/dap-sdk/net/stream/stream/dap_stream_pkt.c index 1394eb16a8..e1b43a9d92 100644 --- a/dap-sdk/net/stream/stream/dap_stream_pkt.c +++ b/dap-sdk/net/stream/stream/dap_stream_pkt.c @@ -170,6 +170,9 @@ size_t dap_stream_pkt_write_mt(dap_worker_t * a_w,dap_events_socket_t *a_es, dap { dap_worker_msg_io_t * l_msg = DAP_NEW_Z(dap_worker_msg_io_t); stream_pkt_hdr_t *l_pkt_hdr; + l_msg->esocket = a_es; + if(a_es) + l_msg->esocket_uuid = a_es->uuid; // TODO replace function signature with UUID in place of worker+esocket l_msg->data_size = 16-a_data_size%16+a_data_size+sizeof(*l_pkt_hdr); l_msg->data = DAP_NEW_SIZE(void,l_msg->data_size); l_pkt_hdr=(stream_pkt_hdr_t*) l_msg->data; diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c index 160e1d5d05..b65a35401a 100644 --- a/modules/channel/chain/dap_stream_ch_chain.c +++ b/modules/channel/chain/dap_stream_ch_chain.c @@ -1206,13 +1206,15 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) if(l_chain_pkt_data_size>1) l_error_str[l_chain_pkt_data_size-1]='\0'; // To be sure that nobody sends us garbage // without trailing zero - log_it(L_WARNING,"In: ERROR packet: '%s'",l_ch_chain->node_client, l_chain_pkt_data_size>1? - l_error_str:"<empty>"); + log_it(L_WARNING,"In from remote addr %s chain id 0x%016x got error on his side: '%s'", + l_ch_chain->ch->stream->esocket->remote_addr_str? + l_ch_chain->ch->stream->esocket->remote_addr_str: "<no addr>", + l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt_data_size>1? l_error_str:"<empty>"); } break; case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_ALL: { - log_it(L_INFO, "In: SYNCED_ALL net 0x%016x chain 0x%016x cell 0x%016x", l_chain_pkt->hdr.net_id.uint64 , - l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64); + log_it(L_INFO, "In from "NODE_ADDR_FP_STR": SYNCED_ALL net 0x%016x chain 0x%016x cell 0x%016x",NODE_ADDR_FP_ARGS_S(l_ch_chain->node_client->remote_node_addr), l_chain_pkt->hdr.net_id.uint64 , + l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64); } break; default: { diff --git a/modules/global-db/dap_chain_global_db.c b/modules/global-db/dap_chain_global_db.c index afcfddce19..1e5fa1979f 100644 --- a/modules/global-db/dap_chain_global_db.c +++ b/modules/global-db/dap_chain_global_db.c @@ -246,6 +246,8 @@ int dap_chain_global_db_init(dap_config_t * g_config) unlock(); if( res != 0 ) log_it(L_CRITICAL, "Hadn't initialized db driver \"%s\" on path \"%s\"", l_driver_name, l_storage_path); + else + log_it(L_NOTICE,"GlobalDB initialized"); return res; } diff --git a/modules/global-db/dap_chain_global_db_driver_sqlite.c b/modules/global-db/dap_chain_global_db_driver_sqlite.c index 186b3d9c51..8e445bbeb5 100644 --- a/modules/global-db/dap_chain_global_db_driver_sqlite.c +++ b/modules/global-db/dap_chain_global_db_driver_sqlite.c @@ -27,6 +27,7 @@ #include <stddef.h> #include <string.h> #include <pthread.h> +#include <errno.h> #ifdef DAP_OS_UNIX #include <unistd.h> @@ -34,6 +35,7 @@ #include "dap_common.h" #include "dap_hash.h" #include "dap_strfuncs.h" +#include "dap_file_utils.h" #include "dap_chain_global_db_driver_sqlite.h" #define LOG_TAG "db_sqlite" @@ -88,8 +90,26 @@ int dap_db_driver_sqlite_init(const char *a_filename_db, dap_db_driver_callbacks log_it(L_ERROR, "Can't init sqlite err=%d (%s)", l_ret, sqlite3_errstr(l_ret)); return -2; } + // Check paths and create them if nessesary + char * l_filename_dir = dap_path_get_dirname(a_filename_db); + if(!dap_dir_test(l_filename_dir)){ + log_it(L_NOTICE, "No directory %s, trying to create...",l_filename_dir); + int l_mkdir_ret = dap_mkdir_with_parents(l_filename_dir); + int l_errno = errno; + if(!dap_dir_test(l_filename_dir)){ + char l_errbuf[255]; + l_errbuf[0] = '\0'; + strerror_r(l_errno,l_errbuf,sizeof(l_errbuf)); + log_it(L_ERROR, "Can't create directory, error code %d, error string \"%s\"", l_mkdir_ret, l_errbuf); + DAP_DELETE(l_filename_dir); + return -21; + }else + log_it(L_NOTICE,"Directory created"); + } + DAP_DEL_Z(l_filename_dir); + // Open Sqlite file, create if nessesary char *l_error_message = NULL; - s_db = dap_db_driver_sqlite_open(a_filename_db, SQLITE_OPEN_READWRITE, &l_error_message); + s_db = dap_db_driver_sqlite_open(a_filename_db, SQLITE_OPEN_READWRITE|SQLITE_OPEN_CREATE, &l_error_message); if(!s_db) { log_it(L_ERROR, "Can't init sqlite err: \"%s\"", l_error_message); dap_db_driver_sqlite_free(l_error_message); -- GitLab