From 2af9114add35ca5dffa7ae860f674de77bb35602 Mon Sep 17 00:00:00 2001 From: Roman Khlopkov <roman.khlopkov@demlabs.net> Date: Mon, 27 Dec 2021 11:29:03 +0000 Subject: [PATCH] bugfix-4969 --- CMakeLists.txt | 3 +- dap-sdk/net/client/dap_client.c | 7 +- dap-sdk/net/client/dap_client_pvt.c | 4 +- dap-sdk/net/core/dap_timerfd.c | 84 ++++++++++------- dap-sdk/net/core/dap_worker.c | 9 +- dap-sdk/net/core/include/dap_timerfd.h | 4 +- dap-sdk/net/stream/stream/dap_stream.c | 31 +++---- modules/chain/dap_chain_cell.c | 2 +- modules/chain/dap_chain_ledger.c | 9 +- modules/channel/chain/dap_stream_ch_chain.c | 93 +++++++++++++++---- .../chain/include/dap_stream_ch_chain.h | 6 +- modules/common/dap_chain_common.c | 12 +-- .../global-db/dap_chain_global_db_driver.c | 3 +- .../dap_chain_global_db_driver_cdb.c | 17 ++-- modules/global-db/dap_chain_global_db_hist.c | 77 +++++++-------- .../global-db/dap_chain_global_db_remote.c | 28 +++--- .../include/dap_chain_global_db_driver.h | 4 +- modules/net/dap_chain_net.c | 42 ++++++--- modules/net/dap_chain_node_client.c | 60 +++++++----- modules/net/include/dap_chain_net.h | 2 +- modules/net/include/dap_chain_node_client.h | 2 +- modules/type/dag/dap_chain_cs_dag.c | 13 +-- 22 files changed, 302 insertions(+), 210 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 4d2044726a..c45ab919d9 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,8 +2,7 @@ project(cellframe-sdk C) cmake_minimum_required(VERSION 3.10) set(CMAKE_C_STANDARD 11) -set(CELLFRAME_SDK_NATIVE_VERSION "3.0-12") - +set(CELLFRAME_SDK_NATIVE_VERSION "3.0-13") add_definitions ("-DCELLFRAME_SDK_VERSION=\"${CELLFRAME_SDK_NATIVE_VERSION}\"") set(BUILD_CRYPTO_TESTS ON) diff --git a/dap-sdk/net/client/dap_client.c b/dap-sdk/net/client/dap_client.c index 89d098685e..903a5f3621 100644 --- a/dap-sdk/net/client/dap_client.c +++ b/dap-sdk/net/client/dap_client.c @@ -232,7 +232,7 @@ static void s_go_stage_on_client_worker_unsafe(dap_worker_t * a_worker,void * a_ dap_client_stage_t l_stage_target = ((struct go_stage_arg*) a_arg)->stage_target; dap_client_callback_t l_stage_end_callback= ((struct go_stage_arg*) a_arg)->stage_end_callback; dap_client_pvt_t * l_client_pvt = ((struct go_stage_arg*) a_arg)->client_pvt; - dap_client_t * l_client = ((struct go_stage_arg*) a_arg)->client_pvt->client; + dap_client_t * l_client = l_client_pvt->client; bool l_flag_delete_after = ((struct go_stage_arg *) a_arg)->flag_delete_after ;// Delete after stage achievement DAP_DELETE(a_arg); @@ -321,7 +321,10 @@ void dap_client_go_stage(dap_client_t * a_client, dap_client_stage_t a_stage_tar } dap_client_pvt_t * l_client_pvt = dap_client_pvt_find(a_client->pvt_uuid); - assert(l_client_pvt); + if (NULL == l_client_pvt) { + log_it(L_ERROR, "dap_client_go_stage, client_pvt == NULL"); + return; + } struct go_stage_arg *l_stage_arg = DAP_NEW_Z(struct go_stage_arg); if (! l_stage_arg) return; l_stage_arg->stage_end_callback = a_stage_end_callback; diff --git a/dap-sdk/net/client/dap_client_pvt.c b/dap-sdk/net/client/dap_client_pvt.c index ad7cddd9b2..a943cdd132 100644 --- a/dap-sdk/net/client/dap_client_pvt.c +++ b/dap-sdk/net/client/dap_client_pvt.c @@ -938,7 +938,7 @@ static void s_request_response(void * a_response, size_t a_response_size, void * static void s_enc_init_response(dap_client_t * a_client, void * a_response, size_t a_response_size) { dap_client_pvt_t * l_client_pvt = dap_client_pvt_find(a_client->pvt_uuid); - if (!l_client_pvt) return; + if (!l_client_pvt || l_client_pvt->is_to_delete) return; if (!l_client_pvt->session_key_open){ log_it(L_ERROR, "m_enc_init_response: session is NULL!"); @@ -1257,6 +1257,8 @@ static void s_stream_es_callback_delete(dap_events_socket_t *a_es, void *arg) } l_client_pvt->stream = NULL; l_client_pvt->stream_es = NULL; + l_client_pvt->stage_status = STAGE_STATUS_ERROR; + l_client_pvt->stage = l_client_pvt->stage_target = STAGE_BEGIN; } /** diff --git a/dap-sdk/net/core/dap_timerfd.c b/dap-sdk/net/core/dap_timerfd.c index 044dca1e62..6a34ba8ada 100644 --- a/dap-sdk/net/core/dap_timerfd.c +++ b/dap-sdk/net/core/dap_timerfd.c @@ -108,6 +108,7 @@ dap_timerfd_t* dap_timerfd_start_on_worker(dap_worker_t * a_worker, uint64_t a_t dap_timerfd_t* l_timerfd = dap_timerfd_create( a_timeout_ms, a_callback, a_callback_arg); if(l_timerfd){ dap_worker_add_events_socket(l_timerfd->events_socket, a_worker); + l_timerfd->worker = a_worker; return l_timerfd; }else{ log_it(L_CRITICAL,"Can't create timer"); @@ -254,56 +255,75 @@ dap_timerfd_t* dap_timerfd_create(uint64_t a_timeout_ms, dap_timerfd_callback_t return l_timerfd; } -/** - * @brief s_es_callback_timer - * @param a_event_sock - */ -static void s_es_callback_timer(struct dap_events_socket *a_event_sock) +static void s_timerfd_reset(dap_timerfd_t *a_timerfd, dap_events_socket_t *a_event_sock) { - dap_timerfd_t *l_timerfd = a_event_sock->_inheritor; - // run user's callback - if(l_timerfd->callback && l_timerfd->callback(l_timerfd->callback_arg)) { - //printf("\nread() returned %d, %d\n", l_ptiu64, l_read_ret); #if defined DAP_OS_LINUX - struct itimerspec l_ts; - // repeat never - l_ts.it_interval.tv_sec = 0; - l_ts.it_interval.tv_nsec = 0; - // timeout for timer - l_ts.it_value.tv_sec = l_timerfd->timeout_ms / 1000; - l_ts.it_value.tv_nsec = (l_timerfd->timeout_ms % 1000) * 1000000; - if(timerfd_settime(l_timerfd->tfd, 0, &l_ts, NULL) < 0) { - log_it(L_WARNING, "callback_timerfd_read() failed: timerfd_settime() errno=%d\n", errno); - } + struct itimerspec l_ts; + // repeat never + l_ts.it_interval.tv_sec = 0; + l_ts.it_interval.tv_nsec = 0; + // timeout for timer + l_ts.it_value.tv_sec = a_timerfd->timeout_ms / 1000; + l_ts.it_value.tv_nsec = (a_timerfd->timeout_ms % 1000) * 1000000; + if(timerfd_settime(a_timerfd->tfd, 0, &l_ts, NULL) < 0) { + log_it(L_WARNING, "Reset timerfd failed: timerfd_settime() errno=%d\n", errno); + } #elif defined (DAP_OS_BSD) - dap_worker_add_events_socket_unsafe(a_event_sock,a_event_sock->worker); - //struct kevent * l_event = &a_event_sock->kqueue_event; - //EV_SET(l_event, 0, a_event_sock->kqueue_base_filter, a_event_sock->kqueue_base_flags,a_event_sock->kqueue_base_fflags,a_event_sock->kqueue_data,a_event_sock); - //kevent(a_event_sock->worker->kqueue_fd,l_event,1,NULL,0,NULL); + dap_worker_add_events_socket_unsafe(a_event_sock,a_event_sock->worker); +//struct kevent * l_event = &a_event_sock->kqueue_event; +//EV_SET(l_event, 0, a_event_sock->kqueue_base_filter, a_event_sock->kqueue_base_flags,a_event_sock->kqueue_base_fflags,a_event_sock->kqueue_data,a_event_sock); +//kevent(a_event_sock->worker->kqueue_fd,l_event,1,NULL,0,NULL); #elif defined (DAP_OS_WINDOWS) - /*LARGE_INTEGER l_due_time; - l_due_time.QuadPart = (long long)l_timerfd->timeout_ms * _MSEC; - if (!SetWaitableTimer(l_timerfd->th, &l_due_time, 0, TimerAPCb, l_timerfd, false)) { - log_it(L_CRITICAL, "Waitable timer not reset, error %d", GetLastError()); - CloseHandle(l_timerfd->th); - }*/ // Wtf is this entire thing for?... + /*LARGE_INTEGER l_due_time; + l_due_time.QuadPart = (long long)a_timerfd->timeout_ms * _MSEC; + if (!SetWaitableTimer(a_timerfd->th, &l_due_time, 0, TimerAPCb, a_timerfd, false)) { + log_it(L_CRITICAL, "Waitable timer not reset, error %d", GetLastError()); + CloseHandle(a_timerfd->th); + }*/ // Wtf is this entire thing for?... #else -#error "No timer callback realization for your platform" +#error "No timer reset realization for your platform" #endif #ifndef DAP_OS_BSD - dap_events_socket_set_readable_unsafe(a_event_sock, true); + dap_events_socket_set_readable_unsafe(a_event_sock, true); #endif +} +/** + * @brief s_es_callback_timer + * @param a_event_sock + */ +static void s_es_callback_timer(struct dap_events_socket *a_event_sock) +{ + dap_timerfd_t *l_timerfd = a_event_sock->_inheritor; + // run user's callback + if(l_timerfd->callback && l_timerfd->callback(l_timerfd->callback_arg)) { + s_timerfd_reset(l_timerfd, a_event_sock); } else { l_timerfd->events_socket->flags |= DAP_SOCK_SIGNAL_CLOSE; } } +/** + * @brief dap_timerfd_reset + * @param a_tfd + */ +void dap_timerfd_reset(dap_timerfd_t *a_timerfd) +{ + if (!a_timerfd) + return; + dap_events_socket_t *l_sock = NULL; + if (a_timerfd->worker) + l_sock = dap_worker_esocket_find_uuid(a_timerfd->worker, a_timerfd->esocket_uuid); + else if (a_timerfd->proc_thread) + l_sock = a_timerfd->events_socket; + if (l_sock) + s_timerfd_reset(a_timerfd, l_sock); +} + /** * @brief dap_timerfd_stop * @param a_tfd - * @param a_callback */ void dap_timerfd_delete(dap_timerfd_t *a_timerfd) { diff --git a/dap-sdk/net/core/dap_worker.c b/dap-sdk/net/core/dap_worker.c index 68c7bcc6e9..83d7c90e29 100644 --- a/dap-sdk/net/core/dap_worker.c +++ b/dap-sdk/net/core/dap_worker.c @@ -543,7 +543,7 @@ void *dap_worker_thread(void *arg) } #endif } - else if ( (! l_flag_rdhup || !l_flag_error ) && (!(l_cur->flags& DAP_SOCK_CONNECTING )) ) { + else if (!l_flag_rdhup && !l_flag_error && !(l_cur->flags & DAP_SOCK_CONNECTING)) { log_it(L_DEBUG, "EPOLLIN triggered but nothing to read"); //dap_events_socket_set_readable_unsafe(l_cur,false); } @@ -564,7 +564,7 @@ void *dap_worker_thread(void *arg) break; default:{} } - if(s_debug_reactor) + //if(s_debug_reactor) log_it(L_INFO,"RDHUP event on esocket %p (%"DAP_FORMAT_SOCKET") type %d", l_cur, l_cur->socket, l_cur->type ); } @@ -621,12 +621,13 @@ void *dap_worker_thread(void *arg) // Socket is ready to write and not going to close if( ( l_flag_write&&(l_cur->flags & DAP_SOCK_READY_TO_WRITE) ) || ( (l_cur->flags & DAP_SOCK_READY_TO_WRITE) && !(l_cur->flags & DAP_SOCK_SIGNAL_CLOSE) ) ) { - if(s_debug_reactor) - log_it(L_DEBUG, "Main loop output: %zu bytes to send", l_cur->buf_out_size); if(l_cur->callbacks.write_callback) l_cur->callbacks.write_callback(l_cur, NULL); // Call callback to process write event + if(s_debug_reactor) + log_it(L_DEBUG, "Main loop output: %zu bytes to send", l_cur->buf_out_size); + if ( l_cur->worker ){ // esocket wasn't unassigned in callback, we need some other ops with it if(l_cur->flags & DAP_SOCK_READY_TO_WRITE) { diff --git a/dap-sdk/net/core/include/dap_timerfd.h b/dap-sdk/net/core/include/dap_timerfd.h index 6247112a51..9815ab819c 100644 --- a/dap-sdk/net/core/include/dap_timerfd.h +++ b/dap-sdk/net/core/include/dap_timerfd.h @@ -53,6 +53,8 @@ typedef struct dap_timerfd { #elif defined(DAP_OS_LINUX) int tfd; //timer file descriptor #endif + dap_worker_t *worker; + dap_proc_thread_t *proc_thread; dap_events_socket_t *events_socket; dap_events_socket_uuid_t esocket_uuid; dap_timerfd_callback_t callback; @@ -69,4 +71,4 @@ dap_timerfd_t* dap_timerfd_start(uint64_t a_timeout_ms, dap_timerfd_callback_t a dap_timerfd_t* dap_timerfd_start_on_worker(dap_worker_t * a_worker, uint64_t a_timeout_ms, dap_timerfd_callback_t a_callback, void *a_callback_arg); dap_timerfd_t* dap_timerfd_start_on_proc_thread(dap_proc_thread_t * a_proc_thread, uint64_t a_timeout_ms, dap_timerfd_callback_t a_callback, void *a_callback_arg); void dap_timerfd_delete(dap_timerfd_t *l_timerfd); - +void dap_timerfd_reset(dap_timerfd_t *a_timerfd); diff --git a/dap-sdk/net/stream/stream/dap_stream.c b/dap-sdk/net/stream/stream/dap_stream.c index 689c203b0a..a36776d860 100644 --- a/dap-sdk/net/stream/stream/dap_stream.c +++ b/dap-sdk/net/stream/stream/dap_stream.c @@ -646,9 +646,9 @@ size_t dap_stream_data_proc_read (dap_stream_t *a_stream) read_bytes_to=0; if(a_stream->pkt_buf_in_data_size>=(a_stream->pkt_buf_in->hdr.size + sizeof(dap_stream_pkt_hdr_t)) ){ // If we have all the packet in packet buffer if(a_stream->pkt_buf_in_data_size > a_stream->pkt_buf_in->hdr.size + sizeof(dap_stream_pkt_hdr_t)){ // If we have little more data then we need for packet buffer - //log_it(L_WARNING,"Prefilled packet buffer has %u bytes more than we need, they're lost",a_stream->pkt_buf_in_data_size-a_stream->pkt_buf_in->hdr.size); + log_it(L_WARNING,"Prefilled packet buffer has %zu bytes more than we need, it's lost",a_stream->pkt_buf_in_data_size-a_stream->pkt_buf_in->hdr.size); + DAP_DEL_Z(a_stream->pkt_buf_in); a_stream->pkt_buf_in_data_size = 0; - a_stream->pkt_buf_in = NULL; } else{ s_stream_proc_pkt_in(a_stream); @@ -692,24 +692,24 @@ size_t dap_stream_data_proc_read (dap_stream_t *a_stream) found_sig=true; //dap_stream_pkt_t *temp_pkt = dap_stream_pkt_detect( (uint8_t*)pkt + 1 ,pkt->hdr.size+sizeof(stream_pkt_hdr_t) ); + size_t l_pkt_size = pkt->hdr.size + sizeof(dap_stream_pkt_hdr_t); if(bytes_left_to_read >= sizeof (dap_stream_pkt_t)){ - if(bytes_left_to_read <(pkt->hdr.size+sizeof(dap_stream_pkt_t) )){ // Is all the packet in da buf? + if (bytes_left_to_read < l_pkt_size) { // Is all the packet in da buf? read_bytes_to=bytes_left_to_read; }else{ - read_bytes_to=pkt->hdr.size+sizeof(dap_stream_pkt_t); + read_bytes_to = l_pkt_size; } } //log_it(L_DEBUG, "Detected packet signature pkt->hdr.size=%u read_bytes_to=%u bytes_left_to_read=%u pkt_offset=%u" // ,pkt->hdr.size, read_bytes_to, bytes_left_to_read,pkt_offset); if(read_bytes_to > HEADER_WITH_SIZE_FIELD){ // If we have size field, we can allocate memory - a_stream->pkt_buf_in_size_expected =( pkt->hdr.size+sizeof(dap_stream_pkt_hdr_t)); - size_t pkt_buf_in_size_expected=a_stream->pkt_buf_in_size_expected; - a_stream->pkt_buf_in=(dap_stream_pkt_t *) malloc(pkt_buf_in_size_expected); - if(read_bytes_to>(pkt->hdr.size+sizeof(dap_stream_pkt_hdr_t) )){ + a_stream->pkt_buf_in_size_expected = l_pkt_size; + a_stream->pkt_buf_in = DAP_NEW_SIZE(struct dap_stream_pkt, l_pkt_size); + if (read_bytes_to > l_pkt_size) { //log_it(L_WARNING,"For some strange reasons we have read_bytes_to=%u is bigger than expected pkt length(%u bytes). Dropped %u bytes", // pkt->hdr.size+sizeof(stream_pkt_hdr_t),read_bytes_to- pkt->hdr.size+sizeof(stream_pkt_hdr_t)); - read_bytes_to=(pkt->hdr.size+sizeof(dap_stream_pkt_hdr_t)); + read_bytes_to = l_pkt_size; } if(read_bytes_to>bytes_left_to_read){ //log_it(L_WARNING,"For some strange reasons we have read_bytes_to=%u is bigger that's left in input buffer (%u bytes). Dropped %u bytes", @@ -720,11 +720,11 @@ size_t dap_stream_data_proc_read (dap_stream_t *a_stream) proc_data+=(read_bytes_to + pkt_offset); bytes_left_to_read-=read_bytes_to; a_stream->pkt_buf_in_data_size=(read_bytes_to); - if(a_stream->pkt_buf_in_data_size==(pkt->hdr.size + sizeof(dap_stream_pkt_hdr_t))){ + if(a_stream->pkt_buf_in_data_size==l_pkt_size){ // log_it(INFO,"All the packet is present in da buffer (hdr.size=%u read_bytes_to=%u buf_in_size=%u)" // ,sid->pkt_buf_in->hdr.size,read_bytes_to,sid->conn->buf_in_size); s_stream_proc_pkt_in(a_stream); - }else if(a_stream->pkt_buf_in_data_size>pkt->hdr.size + sizeof(dap_stream_pkt_hdr_t)){ + }else if(a_stream->pkt_buf_in_data_size>l_pkt_size){ //log_it(L_WARNING,"Input: packet buffer has %u bytes more than we need, they're lost",a_stream->pkt_buf_in_data_size-pkt->hdr.size); }else{ //log_it(L_DEBUG,"Input: Not all stream packet in input (hdr.size=%u read_bytes_to=%u)",a_stream->pkt_buf_in->hdr.size,read_bytes_to); @@ -820,13 +820,8 @@ static void s_stream_proc_pkt_in(dap_stream_t * a_stream) memcpy(l_ret_pkt.sig, c_dap_stream_sig, sizeof(l_ret_pkt.sig)); dap_events_socket_write_unsafe(a_stream->esocket, &l_ret_pkt, sizeof(l_ret_pkt)); // Reset client keepalive timer - if (a_stream->keepalive_timer && a_stream->keepalive_timer->events_socket->worker) { - void *l_arg = a_stream->keepalive_timer->callback_arg; - dap_timerfd_delete(a_stream->keepalive_timer); - a_stream->keepalive_timer = dap_timerfd_start_on_worker(a_stream->stream_worker->worker, - STREAM_KEEPALIVE_TIMEOUT * 1000, - (dap_timerfd_callback_t)s_callback_keepalive, - l_arg); + if (a_stream->keepalive_timer) { + dap_timerfd_reset(a_stream->keepalive_timer); } } break; case STREAM_PKT_TYPE_ALIVE: diff --git a/modules/chain/dap_chain_cell.c b/modules/chain/dap_chain_cell.c index 833697d368..a5ab6b4800 100644 --- a/modules/chain/dap_chain_cell.c +++ b/modules/chain/dap_chain_cell.c @@ -213,7 +213,7 @@ int dap_chain_cell_load(dap_chain_t * a_chain, const char * a_cell_file_path) unsigned long l_read = fread(l_element, 1, l_el_size, l_f); if(l_read == l_el_size) { dap_chain_atom_verify_res_t l_res = a_chain->callback_atom_add(a_chain, l_element, l_el_size); // !!! blocking GDB call !!! - if (l_res == ATOM_PASS && l_res == ATOM_REJECT) { + if (l_res == ATOM_PASS || l_res == ATOM_REJECT) { DAP_DELETE(l_element); } ++q; diff --git a/modules/chain/dap_chain_ledger.c b/modules/chain/dap_chain_ledger.c index 59ce1b6b5b..462c07ef93 100644 --- a/modules/chain/dap_chain_ledger.c +++ b/modules/chain/dap_chain_ledger.c @@ -1220,7 +1220,7 @@ int dap_chain_ledger_token_emission_add_check(dap_ledger_t *a_ledger, byte_t *a_ */ int dap_chain_ledger_token_emission_add(dap_ledger_t *a_ledger, byte_t *a_token_emission, size_t a_token_emission_size) { - int ret = 0; + int l_ret = 0; dap_ledger_private_t *l_ledger_priv = PVT(a_ledger); const char * c_token_ticker = ((dap_chain_datum_token_emission_t *)a_token_emission)->hdr.ticker; @@ -1259,6 +1259,7 @@ int dap_chain_ledger_token_emission_add(dap_ledger_t *a_ledger, byte_t *a_token_ } else { HASH_ADD(hh, l_ledger_priv->treshold_emissions, datum_token_emission_hash, sizeof(l_token_emission_hash), l_token_emission_item); + l_ret = DAP_CHAIN_CS_VERIFY_CODE_TX_NO_TOKEN; } pthread_rwlock_unlock( l_token_item ? &l_token_item->token_emissions_rwlock : &l_ledger_priv->treshold_emissions_rwlock); @@ -1282,7 +1283,7 @@ int dap_chain_ledger_token_emission_add(dap_ledger_t *a_ledger, byte_t *a_token_ if(s_debug_more) log_it(L_WARNING,"Treshold for emissions is overfulled (%zu max), dropping down new data, added nothing", s_treshold_emissions_max); - ret = -2; + l_ret = -2; } } else { if (l_token_item) { @@ -1291,10 +1292,10 @@ int dap_chain_ledger_token_emission_add(dap_ledger_t *a_ledger, byte_t *a_token_ ((dap_chain_datum_token_emission_t *)a_token_emission)->hdr.value, c_token_ticker, l_hash_str); } - ret = -1; + l_ret = -1; } DAP_DELETE(l_hash_str); - return ret; + return l_ret; } int dap_chain_ledger_token_emission_load(dap_ledger_t *a_ledger, byte_t *a_token_emission, size_t a_token_emission_size) diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c index 7cefb1817f..e794995c26 100644 --- a/modules/channel/chain/dap_stream_ch_chain.c +++ b/modules/channel/chain/dap_stream_ch_chain.c @@ -127,7 +127,7 @@ int dap_stream_ch_chain_init() s_stream_ch_packet_out); s_debug_more = dap_config_get_item_bool_default(g_config,"stream_ch_chain","debug_more",false); s_update_pack_size = dap_config_get_item_int16_default(g_config,"stream_ch_chain","update_pack_size",100); - s_list_ban_groups = dap_config_get_array_str(g_config, "general", "ban_list_sync_groups", &s_size_ban_groups); + s_list_ban_groups = dap_config_get_array_str(g_config, "stream_ch_chain", "ban_list_sync_groups", &s_size_ban_groups); return 0; } @@ -150,7 +150,7 @@ void s_stream_ch_new(dap_stream_ch_t* a_ch, void* a_arg) UNUSED(a_arg); a_ch->internal = DAP_NEW_Z(dap_stream_ch_chain_t); dap_stream_ch_chain_t * l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch); - l_ch_chain->ch = a_ch; + l_ch_chain->_inheritor = a_ch; } /** @@ -338,7 +338,7 @@ static void s_sync_out_gdb_first_worker_callback(dap_worker_t *a_worker, void *a l_node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net); if (s_debug_more ) log_it(L_INFO,"Out: DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB"); - dap_stream_ch_chain_pkt_write_unsafe(l_ch_chain->ch , DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB, + dap_stream_ch_chain_pkt_write_unsafe(DAP_STREAM_CH(l_ch_chain), DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB, l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64, &l_node_addr, sizeof(dap_chain_node_addr_t)); if(l_ch_chain->callback_notify_packet_out) @@ -370,7 +370,7 @@ static void s_sync_out_gdb_last_worker_callback(dap_worker_t *a_worker, void *a_ if (s_debug_more ) log_it(L_INFO,"Out: DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB"); - dap_stream_ch_chain_pkt_write_unsafe(l_ch_chain->ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, + dap_stream_ch_chain_pkt_write_unsafe(DAP_STREAM_CH(l_ch_chain), DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64, NULL, 0); l_ch_chain->state = CHAIN_STATE_IDLE; @@ -719,7 +719,8 @@ static bool s_gdb_in_pkt_proc_callback(dap_proc_thread_t *a_thread, void *a_arg) } } // save data to global_db - if(!dap_chain_global_db_obj_save(dap_store_obj_copy(l_obj, 1), 1)) { + dap_store_obj_t *l_obj_copy = dap_store_obj_copy(l_obj, 1); + if(!dap_chain_global_db_obj_save(l_obj_copy, 1)) { struct sync_request *l_sync_req_err = DAP_DUP(l_sync_request); dap_proc_thread_worker_exec_callback(a_thread, l_sync_request->worker->id, s_gdb_in_pkt_error_worker_callback, l_sync_req_err); @@ -727,6 +728,8 @@ static bool s_gdb_in_pkt_proc_callback(dap_proc_thread_t *a_thread, void *a_arg) if (s_debug_more) log_it(L_DEBUG, "Added new GLOBAL_DB synchronization record"); } + DAP_DELETE(l_obj_copy->group); + DAP_DELETE(l_obj_copy); } if(l_store_obj) { dap_store_obj_free(l_store_obj, l_data_obj_count); @@ -767,6 +770,53 @@ static void s_stream_ch_write_error_unsafe(dap_stream_ch_t *a_ch, uint64_t a_net dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, a_net_id, a_chain_id, a_cell_id, a_err_string); } +static bool s_chain_timer_callback(void *a_arg) +{ + dap_worker_t *l_worker = dap_events_get_current_worker(dap_events_get_default()); + dap_stream_ch_t *l_ch = dap_stream_ch_find_by_uuid_unsafe(DAP_STREAM_WORKER(l_worker), *(dap_stream_ch_uuid_t *)a_arg); + if (!l_ch) { + DAP_DELETE(a_arg); + return false; + } + dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); + if (!l_ch_chain->was_active) { + if (l_ch_chain->state != CHAIN_STATE_IDLE) { + dap_stream_ch_chain_go_idle(l_ch_chain); + } + DAP_DELETE(a_arg); + l_ch_chain->activity_timer = NULL; + return false; + } + l_ch_chain->was_active = false; + // Sending dumb packet with nothing to inform remote thats we're just skiping atoms of GDB's, nothing freezed + if (l_ch_chain->state == CHAIN_STATE_SYNC_CHAINS) + dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_TSD, + l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, + l_ch_chain->request_hdr.cell_id.uint64, NULL, 0); + if (l_ch_chain->state == CHAIN_STATE_SYNC_GLOBAL_DB || l_ch_chain->state == CHAIN_STATE_UPDATE_GLOBAL_DB) { + if (s_debug_more) + log_it(L_INFO, "Send one global_db TSD packet (rest=%zu/%zu items)", + dap_db_log_list_get_count_rest(l_ch_chain->request_db_log), + dap_db_log_list_get_count(l_ch_chain->request_db_log)); + dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_TSD, + l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, + l_ch_chain->request_hdr.cell_id.uint64, NULL, 0); + } + return true; +} + +static void s_chain_timer_reset(dap_stream_ch_chain_t *a_ch_chain) +{ + if (a_ch_chain->state == CHAIN_STATE_IDLE) + return; + if (!a_ch_chain->activity_timer) { + dap_stream_ch_uuid_t *l_uuid = DAP_DUP(&DAP_STREAM_CH(a_ch_chain)->uuid); + a_ch_chain->activity_timer = dap_timerfd_start_on_worker(DAP_STREAM_CH(a_ch_chain)->stream_worker->worker, + 3000, s_chain_timer_callback, (void *)l_uuid); + } else + a_ch_chain->was_active = true; +} + /** * @brief s_stream_ch_packet_in * @param a_ch @@ -790,6 +840,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) sizeof(l_chain_pkt->hdr)); } + s_chain_timer_reset(l_ch_chain); size_t l_chain_pkt_data_size = l_ch_pkt->hdr.size-sizeof (l_chain_pkt->hdr) ; uint16_t l_acl_idx = dap_chain_net_acl_idx_by_id(l_chain_pkt->hdr.net_id ); @@ -1315,7 +1366,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) l_error_str[l_chain_pkt_data_size-1]='\0'; // To be sure that nobody sends us garbage // without trailing zero log_it(L_WARNING,"In from remote addr %s chain id 0x%016"DAP_UINT64_FORMAT_x" got error on his side: '%s'", - l_ch_chain->ch->stream->esocket->remote_addr_str ? l_ch_chain->ch->stream->esocket->remote_addr_str: "<no addr>", + DAP_STREAM_CH(l_ch_chain)->stream->esocket->remote_addr_str ? DAP_STREAM_CH(l_ch_chain)->stream->esocket->remote_addr_str: "<no addr>", l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt_data_size > 1 ? l_error_str:"<empty>"); } break; @@ -1383,26 +1434,32 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) return; dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch); + s_chain_timer_reset(l_ch_chain); + switch (l_ch_chain->state) { // Update list of global DB records to remote case CHAIN_STATE_UPDATE_GLOBAL_DB: { dap_stream_ch_chain_update_element_t l_data[s_update_pack_size]; uint_fast16_t i; + dap_db_log_list_obj_t *l_obj = NULL; for (i = 0; i < s_update_pack_size; i++) { - dap_db_log_list_obj_t *l_obj = dap_db_log_list_get(l_ch_chain->request_db_log); - if (!l_obj) + l_obj = dap_db_log_list_get(l_ch_chain->request_db_log); + if (!l_obj || DAP_POINTER_TO_INT(l_obj) == 1) break; memcpy(&l_data[i].hash, &l_obj->hash, sizeof(dap_chain_hash_fast_t)); l_data[i].size = l_obj->pkt->data_size; } if (i) { - dap_stream_ch_chain_pkt_write_unsafe(l_ch_chain->ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB, + dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB, l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64, l_data, i * sizeof(dap_stream_ch_chain_update_element_t)); l_ch_chain->stats_request_gdb_processed += i; if (s_debug_more) log_it(L_INFO, "Out: DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB"); + } else if (l_obj) { + // We need to return into the write callback + a_ch->stream->esocket->buf_out_zero_count = 0; } else { l_ch_chain->request.node_addr.uint64 = dap_chain_net_get_cur_addr_int(dap_chain_net_by_id( l_ch_chain->request_hdr.net_id)); @@ -1425,8 +1482,10 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) size_t l_pkt_size = 0; for (uint_fast16_t l_skip_count = 0; l_skip_count < s_skip_in_reactor_count; ) { l_obj = dap_db_log_list_get(l_ch_chain->request_db_log); - if (!l_obj) + if (!l_obj || DAP_POINTER_TO_INT(l_obj) == 1) { + l_skip_count = s_skip_in_reactor_count; break; + } dap_stream_ch_chain_hash_item_t *l_hash_item = NULL; unsigned l_hash_item_hashv = 0; HASH_VALUE(&l_obj->hash, sizeof(dap_chain_hash_fast_t), l_hash_item_hashv); @@ -1455,7 +1514,7 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) } if (l_pkt_size) { // If request was from defined node_addr we update its state - if( s_debug_more) + if (s_debug_more) log_it(L_INFO, "Send one global_db packet len=%zu (rest=%zu/%zu items)", l_pkt_size, dap_db_log_list_get_count_rest(l_ch_chain->request_db_log), dap_db_log_list_get_count(l_ch_chain->request_db_log)); @@ -1464,10 +1523,8 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) l_ch_chain->request_hdr.cell_id.uint64, l_pkt, l_pkt_size); DAP_DELETE(l_pkt); } else if (l_obj) { - // Sending dumb packet with nothing to inform remote thats we're just skiping GDBs, nothing freezed - dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_TSD, - l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, - l_ch_chain->request_hdr.cell_id.uint64, NULL, 0); + // We need to return into the write callback + a_ch->stream->esocket->buf_out_zero_count = 0; } else { log_it( L_INFO,"Syncronized database: items syncronyzed %"DAP_UINT64_FORMAT_U" from %zu", l_ch_chain->stats_request_gdb_processed, dap_db_log_list_get_count(l_ch_chain->request_db_log)); @@ -1578,10 +1635,8 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) 0, l_ch_chain->callback_notify_arg); } if (! l_was_sent_smth ){ - // Sending dumb packet with nothing to inform remote thats we're just skiping atoms, nothing freezed - dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_TSD, - l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, - l_ch_chain->request_hdr.cell_id.uint64, NULL, 0); + // We need to return into the write callback + a_ch->stream->esocket->buf_out_zero_count = 0; } } break; default: break; diff --git a/modules/channel/chain/include/dap_stream_ch_chain.h b/modules/channel/chain/include/dap_stream_ch_chain.h index 5fd46f5c68..fa63d53c26 100644 --- a/modules/channel/chain/include/dap_stream_ch_chain.h +++ b/modules/channel/chain/include/dap_stream_ch_chain.h @@ -58,7 +58,7 @@ typedef struct dap_stream_ch_chain_hash_item{ typedef struct dap_stream_ch_chain { - dap_stream_ch_t * ch; + void *_inheritor; dap_stream_ch_chain_state_t state; dap_chain_node_client_t * node_client; // Node client associated with stream @@ -75,12 +75,16 @@ typedef struct dap_stream_ch_chain { dap_stream_ch_chain_pkt_hdr_t request_hdr; dap_list_t *request_db_iter; + bool was_active; + dap_timerfd_t *activity_timer; + dap_stream_ch_chain_callback_packet_t callback_notify_packet_out; dap_stream_ch_chain_callback_packet_t callback_notify_packet_in; void *callback_notify_arg; } dap_stream_ch_chain_t; #define DAP_STREAM_CH_CHAIN(a) ((dap_stream_ch_chain_t *) ((a)->internal) ) +#define DAP_STREAM_CH(a) ((dap_stream_ch_t *)((a)->_inheritor)) #define DAP_CHAIN_PKT_EXPECT_SIZE 7168 int dap_stream_ch_chain_init(void); diff --git a/modules/common/dap_chain_common.c b/modules/common/dap_chain_common.c index d10e19b974..00714b23ce 100644 --- a/modules/common/dap_chain_common.c +++ b/modules/common/dap_chain_common.c @@ -354,7 +354,7 @@ char *dap_chain_balance_to_coins(uint128_t a_balance) return l_buf; } -const union { uint64_t u64[2]; uint32_t u32[4]; } c_pow10[DATOSHI_POW + 1] = { +const union { uint64_t u64[2]; uint32_t u32[4]; } DAP_ALIGN_PACKED c_pow10[DATOSHI_POW + 1] = { { .u64 = {0, 1ULL} }, // 0 { .u64 = {0, 10ULL} }, // 1 { .u64 = {0, 100ULL} }, // 2 @@ -421,29 +421,29 @@ uint128_t dap_chain_balance_scan(char *a_balance) log_it(L_WARNING, "Input number is too big"); return l_nul; } - l_tmp = (l_tmp << 64) + c_pow10[i].u64[1] * l_digit; + l_tmp = (l_tmp << 64) + (uint128_t)c_pow10[i].u64[1] * l_digit; l_ret = dap_uint128_add(l_ret, l_tmp); if (l_ret == l_nul) return l_nul; #else uint128_t l_tmp; l_tmp.hi = 0; - l_tmp.lo = c_pow10[i].u32[2] * l_digit; + l_tmp.lo = (uint64_t)c_pow10[i].u32[2] * l_digit; l_ret = dap_uint128_add(l_ret, l_tmp); if (l_ret.hi == 0 && l_ret.lo == 0) return l_nul; - uint64_t l_mul = c_pow10[i].u32[3] * l_digit; + uint64_t l_mul = (uint64_t)c_pow10[i].u32[3] * l_digit; l_tmp.lo = l_mul << 32; l_tmp.hi = l_mul >> 32; l_ret = dap_uint128_add(l_ret, l_tmp); if (l_ret.hi == 0 && l_ret.lo == 0) return l_nul; l_tmp.lo = 0; - l_tmp.hi = c_pow10[i].u32[0] * l_digit; + l_tmp.hi = (uint64_t)c_pow10[i].u32[0] * l_digit; l_ret = dap_uint128_add(l_ret, l_tmp); if (l_ret.hi == 0 && l_ret.lo == 0) return l_nul; - l_mul = c_pow10[i].u32[1] * l_digit; + l_mul = (uint64_t)c_pow10[i].u32[1] * l_digit; if (l_mul >> 32) { log_it(L_WARNING, "Input number is too big"); return l_nul; diff --git a/modules/global-db/dap_chain_global_db_driver.c b/modules/global-db/dap_chain_global_db_driver.c index fe0a48daa5..3e3176a1af 100644 --- a/modules/global-db/dap_chain_global_db_driver.c +++ b/modules/global-db/dap_chain_global_db_driver.c @@ -198,8 +198,7 @@ dap_store_obj_t* dap_store_obj_copy(dap_store_obj_t *a_store_obj, size_t a_store memcpy(l_store_obj_dst, l_store_obj_src, sizeof(dap_store_obj_t)); l_store_obj_dst->group = dap_strdup(l_store_obj_src->group); l_store_obj_dst->key = dap_strdup(l_store_obj_src->key); - l_store_obj_dst->value = DAP_NEW_SIZE(uint8_t, l_store_obj_dst->value_len); - memcpy(l_store_obj_dst->value, l_store_obj_src->value, l_store_obj_dst->value_len); + l_store_obj_dst->value = DAP_DUP_SIZE(l_store_obj_src->value, l_store_obj_src->value_len); } return l_store_obj; } diff --git a/modules/global-db/dap_chain_global_db_driver_cdb.c b/modules/global-db/dap_chain_global_db_driver_cdb.c index 699f07942d..f4e006fcd2 100644 --- a/modules/global-db/dap_chain_global_db_driver_cdb.c +++ b/modules/global-db/dap_chain_global_db_driver_cdb.c @@ -83,12 +83,12 @@ static void cdb_serialize_val_to_dap_store_obj(pdap_store_obj_t a_obj, const cha a_obj->key = dap_strdup(key); a_obj->id = dap_hex_to_uint(val, sizeof(uint64_t)); offset += sizeof(uint64_t); - a_obj->value_len = dap_hex_to_uint(val + offset, sizeof(unsigned long)); - offset += sizeof(unsigned long); + a_obj->value_len = dap_hex_to_uint(val + offset, sizeof(uint64_t)); + offset += sizeof(uint64_t); a_obj->value = DAP_NEW_SIZE(uint8_t, a_obj->value_len); memcpy(a_obj->value, val + offset, a_obj->value_len); offset += a_obj->value_len; - a_obj->timestamp = (time_t)dap_hex_to_uint(val + offset, sizeof(time_t)); + a_obj->timestamp = dap_hex_to_uint(val + offset, sizeof(uint64_t)); } /** A callback function designed for finding a last item */ @@ -595,19 +595,18 @@ int dap_db_driver_cdb_apply_store_obj(pdap_store_obj_t a_store_obj) { cdb_record l_rec; l_rec.key = a_store_obj->key; //dap_strdup(a_store_obj->key); int offset = 0; - char *l_val = DAP_NEW_Z_SIZE(char, sizeof(uint64_t) + sizeof(unsigned long) + a_store_obj->value_len + sizeof(time_t)); + char *l_val = DAP_NEW_Z_SIZE(char, sizeof(uint64_t) + sizeof(uint64_t) + a_store_obj->value_len + sizeof(uint64_t)); dap_uint_to_hex(l_val, ++l_cdb_i->id, sizeof(uint64_t)); offset += sizeof(uint64_t); - dap_uint_to_hex(l_val + offset, a_store_obj->value_len, sizeof(unsigned long)); - offset += sizeof(unsigned long); + dap_uint_to_hex(l_val + offset, a_store_obj->value_len, sizeof(uint64_t)); + offset += sizeof(uint64_t); if(a_store_obj->value && a_store_obj->value_len){ memcpy(l_val + offset, a_store_obj->value, a_store_obj->value_len); DAP_DELETE(a_store_obj->value); } offset += a_store_obj->value_len; - unsigned long l_time = (unsigned long)a_store_obj->timestamp; - dap_uint_to_hex(l_val + offset, l_time, sizeof(time_t)); - offset += sizeof(time_t); + dap_uint_to_hex(l_val + offset, a_store_obj->timestamp, sizeof(uint64_t)); + offset += sizeof(uint64_t); l_rec.val = l_val; if (cdb_set2(l_cdb_i->cdb, l_rec.key, (int)strlen(l_rec.key), l_rec.val, offset, CDB_INSERTCACHE | CDB_OVERWRITE, 0) != CDB_SUCCESS) { log_it(L_ERROR, "Couldn't add record with key [%s] to CDB: \"%s\"", l_rec.key, cdb_errmsg(cdb_errno(l_cdb_i->cdb))); diff --git a/modules/global-db/dap_chain_global_db_hist.c b/modules/global-db/dap_chain_global_db_hist.c index 21da7d5321..53d540f1ba 100644 --- a/modules/global-db/dap_chain_global_db_hist.c +++ b/modules/global-db/dap_chain_global_db_hist.c @@ -28,9 +28,6 @@ typedef struct dap_tx_data{ #define LOG_TAG "dap_chain_global_db_hist" -static uint16_t s_size_ban_list = 0; -static char **s_ban_list = NULL; - /** * @brief Packs members of a_rec structure into a single string. * @@ -276,30 +273,32 @@ dap_db_log_list_t* dap_db_log_list_start(dap_chain_node_addr_t a_addr, int a_fla } dap_list_free(l_groups_masks); - static int l_try_read_ban_list = 0; + static uint16_t s_size_ban_list = 0; + static char **s_ban_list = NULL; + static bool l_try_read_ban_list = false; - if (!l_try_read_ban_list) { - s_ban_list = dap_config_get_array_str(g_config, "general", "ban_list_sync_groups", &s_size_ban_list); - l_try_read_ban_list = 1; - } + if (!l_try_read_ban_list) { + s_ban_list = dap_config_get_array_str(g_config, "stream_ch_chain", "ban_list_sync_groups", &s_size_ban_list); + l_try_read_ban_list = true; + } - /* delete groups from ban list */ - if (s_size_ban_list > 0) { - for (dap_list_t *l_groups = l_dap_db_log_list->groups; l_groups; ) { - int found = 0; - for (int i = 0; i < s_size_ban_list; i++) { - if (dap_fnmatch(s_ban_list[i], l_groups->data, FNM_NOESCAPE)) { - dap_list_t *l_tmp = l_groups->next; - dap_list_delete_link(l_dap_db_log_list->groups, l_groups); - l_groups = l_tmp; - found = 1; - break; - } + /* delete groups from ban list */ + if (s_size_ban_list > 0) { + for (dap_list_t *l_groups = l_dap_db_log_list->groups; l_groups; ) { + bool l_found = false; + for (int i = 0; i < s_size_ban_list; i++) { + if (!dap_fnmatch(s_ban_list[i], l_groups->data, FNM_NOESCAPE)) { + dap_list_t *l_tmp = l_groups->next; + l_dap_db_log_list->groups = dap_list_delete_link(l_dap_db_log_list->groups, l_groups); + l_groups = l_tmp; + l_found = true; + break; } - if (found) continue; - l_groups = dap_list_next(l_groups); } + if (l_found) continue; + l_groups = dap_list_next(l_groups); } + } for (dap_list_t *l_groups = l_dap_db_log_list->groups; l_groups; l_groups = dap_list_next(l_groups)) { dap_db_log_list_group_t *l_replace = DAP_NEW_Z(dap_db_log_list_group_t); @@ -365,33 +364,19 @@ size_t dap_db_log_list_get_count_rest(dap_db_log_list_t *a_db_log_list) */ dap_db_log_list_obj_t *dap_db_log_list_get(dap_db_log_list_t *a_db_log_list) { - if(!a_db_log_list) + if (!a_db_log_list) return NULL; - dap_list_t *l_list; - bool l_is_process; - int l_count = 0; - while(1) { - pthread_mutex_lock(&a_db_log_list->list_mutex); - l_is_process = a_db_log_list->is_process; - // check next item - l_list = a_db_log_list->list_read; - if (l_list){ - a_db_log_list->list_read = dap_list_next(a_db_log_list->list_read); - a_db_log_list->items_rest--; - } - pthread_mutex_unlock(&a_db_log_list->list_mutex); - // wait reading next item, no more 1 sec (50 ms * 100 times) - if(!l_list && l_is_process) { - dap_usleep(DAP_USEC_PER_SEC / 200); - l_count++; - if(l_count > 100) - break; - } - else - break; + pthread_mutex_lock(&a_db_log_list->list_mutex); + int l_is_process = a_db_log_list->is_process; + // check next item + dap_list_t *l_list = a_db_log_list->list_read; + if (l_list){ + a_db_log_list->list_read = dap_list_next(a_db_log_list->list_read); + a_db_log_list->items_rest--; } + pthread_mutex_unlock(&a_db_log_list->list_mutex); //log_it(L_DEBUG, "get item n=%d", a_db_log_list->items_number - a_db_log_list->items_rest); - return l_list ? (dap_db_log_list_obj_t *)l_list->data : NULL; + return l_list ? (dap_db_log_list_obj_t *)l_list->data : DAP_INT_TO_POINTER(l_is_process); } /** diff --git a/modules/global-db/dap_chain_global_db_remote.c b/modules/global-db/dap_chain_global_db_remote.c index e1c5d27c13..53bec9f4ef 100644 --- a/modules/global-db/dap_chain_global_db_remote.c +++ b/modules/global-db/dap_chain_global_db_remote.c @@ -255,20 +255,20 @@ dap_store_obj_pkt_t *dap_store_packet_single(pdap_store_obj_t a_store_obj) uint32_t l_type = a_store_obj->type; memcpy(l_pkt->data, &l_type, sizeof(uint32_t)); uint64_t l_offset = sizeof(uint32_t); - uint16_t group_size = (uint16_t) dap_strlen(a_store_obj->group); - memcpy(l_pkt->data + l_offset, &group_size, sizeof(uint16_t)); + uint16_t l_group_size = (uint16_t) dap_strlen(a_store_obj->group); + memcpy(l_pkt->data + l_offset, &l_group_size, sizeof(uint16_t)); l_offset += sizeof(uint16_t); - memcpy(l_pkt->data + l_offset, a_store_obj->group, group_size); - l_offset += group_size; + memcpy(l_pkt->data + l_offset, a_store_obj->group, l_group_size); + l_offset += l_group_size; memcpy(l_pkt->data + l_offset, &a_store_obj->id, sizeof(uint64_t)); l_offset += sizeof(uint64_t); - memcpy(l_pkt->data + l_offset, &a_store_obj->timestamp, sizeof(time_t)); - l_offset += sizeof(time_t); - uint16_t key_size = (uint16_t) dap_strlen(a_store_obj->key); - memcpy(l_pkt->data + l_offset, &key_size, sizeof(uint16_t)); + memcpy(l_pkt->data + l_offset, &a_store_obj->timestamp, sizeof(uint64_t)); + l_offset += sizeof(uint64_t); + uint16_t l_key_size = (uint16_t) dap_strlen(a_store_obj->key); + memcpy(l_pkt->data + l_offset, &l_key_size, sizeof(uint16_t)); l_offset += sizeof(uint16_t); - memcpy(l_pkt->data + l_offset, a_store_obj->key, key_size); - l_offset += key_size; + memcpy(l_pkt->data + l_offset, a_store_obj->key, l_key_size); + l_offset += l_key_size; memcpy(l_pkt->data + l_offset, &a_store_obj->value_len, sizeof(uint64_t)); l_offset += sizeof(uint64_t); memcpy(l_pkt->data + l_offset, a_store_obj->value, a_store_obj->value_len); @@ -314,9 +314,9 @@ dap_store_obj_t *dap_store_unpacket_multiple(const dap_store_obj_pkt_t *pkt, siz memcpy(&obj->id, pkt->data + offset, sizeof(uint64_t)); offset += sizeof(uint64_t); - if (offset+sizeof (time_t)> pkt->data_size) {log_it(L_ERROR, "Broken GDB element: can't read 'timestamp' field"); break;} // Check for buffer boundries - memcpy(&obj->timestamp, pkt->data + offset, sizeof(time_t)); - offset += sizeof(time_t); + if (offset+sizeof (uint64_t)> pkt->data_size) {log_it(L_ERROR, "Broken GDB element: can't read 'timestamp' field"); break;} // Check for buffer boundries + memcpy(&obj->timestamp, pkt->data + offset, sizeof(uint64_t)); + offset += sizeof(uint64_t); if (offset+sizeof (uint16_t)> pkt->data_size) {log_it(L_ERROR, "Broken GDB element: can't read 'key_length' field"); break;} // Check for buffer boundries memcpy(&str_length, pkt->data + offset, sizeof(uint16_t)); @@ -337,7 +337,7 @@ dap_store_obj_t *dap_store_unpacket_multiple(const dap_store_obj_pkt_t *pkt, siz memcpy(obj->value, pkt->data + offset, obj->value_len); offset += obj->value_len; } - //assert(pkt->data_size == offset); + assert(pkt->data_size == offset); if(store_obj_count) *store_obj_count = count; return store_obj; diff --git a/modules/global-db/include/dap_chain_global_db_driver.h b/modules/global-db/include/dap_chain_global_db_driver.h index ecf75715ec..af3174367a 100644 --- a/modules/global-db/include/dap_chain_global_db_driver.h +++ b/modules/global-db/include/dap_chain_global_db_driver.h @@ -31,13 +31,13 @@ typedef struct dap_store_obj { uint64_t id; - time_t timestamp; + uint64_t timestamp; uint8_t type; char *group; char *key; const char *c_key; uint8_t *value; - size_t value_len; + uint64_t value_len; }DAP_ALIGN_PACKED dap_store_obj_t, *pdap_store_obj_t; typedef struct dap_store_obj_pkt { diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index 28ddbebe8c..c8bae49ea6 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -224,7 +224,8 @@ static const dap_chain_node_client_callbacks_t s_node_link_callbacks={ .connected=s_node_link_callback_connected, .disconnected=s_node_link_callback_disconnected, .stage=s_node_link_callback_stage, - .error=s_node_link_callback_error + .error=s_node_link_callback_error, + .delete=s_node_link_callback_delete }; @@ -628,29 +629,34 @@ static void s_node_link_callback_error(dap_chain_node_client_t * a_node_client, */ static void s_node_link_callback_delete(dap_chain_node_client_t * a_node_client, void * a_arg) { - log_it(L_DEBUG,"Remove node client from list"); dap_chain_net_t * l_net = (dap_chain_net_t *) a_arg; dap_chain_net_pvt_t * l_net_pvt = PVT(l_net); + if (!a_node_client->keep_connection) { + dap_notify_server_send_f_mt("{" + "class:\"NetLinkDelete\"," + "net_id:0x%016" DAP_UINT64_FORMAT_X "," + "cell_id:0x%016"DAP_UINT64_FORMAT_X"," + "address:\""NODE_ADDR_FP_STR"\"" + "}\n", a_node_client->net->pub.id.uint64, a_node_client->info->hdr.cell_id.uint64, + NODE_ADDR_FP_ARGS_S(a_node_client->info->hdr.address)); + return; + } pthread_rwlock_wrlock(&l_net_pvt->rwlock); for ( dap_list_t * it = l_net_pvt->links; it; it=it->next ){ - dap_chain_node_client_t * l_client =(dap_chain_node_client_t *) it->data; - // Cut out current iterator if it equals with deleting handler - if (l_client == a_node_client){ - if (it->prev) - it->prev->next = it->next; - if (it->next) - it->next->prev = it->prev; + if (it->data == a_node_client) { + log_it(L_DEBUG,"Replace node client with new one"); + it->data = dap_chain_net_client_create_n_connect(l_net, a_node_client->info); } } pthread_rwlock_unlock(&l_net_pvt->rwlock); dap_notify_server_send_f_mt("{" - "class:\"NetLinkDelete\"," + "class:\"NetLinkRestart\"," "net_id:0x%016" DAP_UINT64_FORMAT_X "," "cell_id:0x%016"DAP_UINT64_FORMAT_X"," "address:\""NODE_ADDR_FP_STR"\"" - "}\n", a_node_client->net->pub.id.uint64, a_node_client->info->hdr.cell_id.uint64, + "}\n", a_node_client->net->pub.id.uint64, a_node_client->info->hdr.cell_id.uint64, NODE_ADDR_FP_ARGS_S(a_node_client->info->hdr.address)); - dap_chain_node_client_close(a_node_client); + // Then a_alient wiil be destroyed in a right way } /** @@ -833,6 +839,7 @@ static bool s_net_states_proc(dap_proc_thread_t *a_thread, void *a_arg) dap_list_t *l_tmp = l_net_pvt->links; while (l_tmp) { dap_list_t *l_next =l_tmp->next; + ((dap_chain_node_client_t *)l_tmp->data)->keep_connection = false; dap_chain_node_client_close(l_tmp->data); DAP_DELETE(l_tmp); l_tmp = l_next; @@ -848,6 +855,7 @@ static bool s_net_states_proc(dap_proc_thread_t *a_thread, void *a_arg) break; } // disable SYNC_GDB + l_net_pvt->active_link = NULL; l_net_pvt->flags &= ~F_DAP_CHAIN_NET_GO_SYNC; l_net_pvt->last_sync = 0; } break; @@ -1021,7 +1029,8 @@ bool dap_chain_net_sync_trylock(dap_chain_net_t *a_net, dap_chain_node_client_t if (l_links->data == l_net_pvt->active_link) { dap_chain_node_client_t *l_client = (dap_chain_node_client_t *)l_links->data; if (l_client->state >= NODE_CLIENT_STATE_ESTABLISHED && - l_client->state < NODE_CLIENT_STATE_SYNCED) { + l_client->state < NODE_CLIENT_STATE_SYNCED && + a_client != l_client) { l_found = true; break; } @@ -1035,11 +1044,14 @@ bool dap_chain_net_sync_trylock(dap_chain_net_t *a_net, dap_chain_node_client_t return !l_found; } -void dap_chain_net_sync_unlock(dap_chain_net_t *a_net) +void dap_chain_net_sync_unlock(dap_chain_net_t *a_net, dap_chain_node_client_t *a_client) { + if (!a_net) + return; dap_chain_net_pvt_t *l_net_pvt = PVT(a_net); pthread_rwlock_rdlock(&l_net_pvt->rwlock); - l_net_pvt->active_link = NULL; + if (!a_client || l_net_pvt->active_link == a_client) + l_net_pvt->active_link = NULL; pthread_rwlock_unlock(&l_net_pvt->rwlock); } /** diff --git a/modules/net/dap_chain_node_client.c b/modules/net/dap_chain_node_client.c index 81716e854c..beab9660ca 100644 --- a/modules/net/dap_chain_node_client.c +++ b/modules/net/dap_chain_node_client.c @@ -164,17 +164,18 @@ static void s_stage_status_error_callback(dap_client_t *a_client, void *a_arg) pthread_mutex_unlock(&l_node_client->wait_mutex); l_node_client->esocket_uuid = 0; + dap_chain_net_sync_unlock(l_node_client->net, l_node_client); if (l_node_client->callbacks.disconnected) { l_node_client->callbacks.disconnected(l_node_client, l_node_client->callbacks_arg); } if (l_node_client->keep_connection) { dap_events_socket_uuid_t *l_uuid = DAP_DUP(&l_node_client->uuid); - dap_timerfd_start_on_worker(l_node_client->stream_worker - ? l_node_client->stream_worker->worker - : dap_events_worker_get_auto(), - s_timer_update_states * 1000, - s_timer_update_states_callback, - l_uuid); + l_node_client->sync_timer = dap_timerfd_start_on_worker(l_node_client->stream_worker + ? l_node_client->stream_worker->worker + : dap_events_worker_get_auto(), + s_timer_update_states * 1000, + s_timer_update_states_callback, + l_uuid); } return; } @@ -224,7 +225,7 @@ static bool s_timer_update_states_callback(void *a_arg) dap_client_t * l_client = dap_client_from_esocket(l_es); if (l_client ) { dap_chain_node_client_t * l_node_client = (dap_chain_node_client_t*) l_client->_inheritor; - if (l_node_client && l_node_client->ch_chain) { + if (l_node_client && l_node_client->ch_chain && l_node_client->stream_worker && l_node_client->ch_chain_uuid) { dap_stream_ch_t *l_ch = dap_stream_ch_find_by_uuid_unsafe(l_node_client->stream_worker, l_node_client->ch_chain_uuid); if (l_ch) { dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); @@ -249,9 +250,12 @@ static bool s_timer_update_states_callback(void *a_arg) // if we not returned yet l_me->state = NODE_CLIENT_STATE_DISCONNECTED; if (l_me->keep_connection) { - log_it(L_INFO, "Reconnecting node client with peer "NODE_ADDR_FP_STR, NODE_ADDR_FP_ARGS_S(l_me->remote_node_addr)); - l_me->state = NODE_CLIENT_STATE_CONNECTING ; - dap_client_go_stage(l_me->client, STAGE_STREAM_STREAMING, s_stage_connected_callback); + if (dap_client_pvt_find(l_me->client->pvt_uuid)) { + log_it(L_INFO, "Reconnecting node client with peer "NODE_ADDR_FP_STR, NODE_ADDR_FP_ARGS_S(l_me->remote_node_addr)); + l_me->state = NODE_CLIENT_STATE_CONNECTING ; + dap_client_go_stage(l_me->client, STAGE_STREAM_STREAMING, s_stage_connected_callback); + } else + dap_chain_node_client_close(l_me); } DAP_DELETE(l_uuid); return false; @@ -294,7 +298,10 @@ static void s_stage_connected_callback(dap_client_t *a_client, void *a_arg) dap_events_socket_uuid_t *l_uuid = DAP_DUP(&l_node_client->uuid); dap_worker_exec_callback_on(l_stream->esocket->worker, s_node_client_connected_synchro_start_callback, l_uuid); dap_events_socket_uuid_t *l_uuid_timer = DAP_DUP(&l_node_client->uuid); - dap_timerfd_start_on_worker(l_stream->esocket->worker, s_timer_update_states * 1000, s_timer_update_states_callback, l_uuid_timer); + l_node_client->sync_timer = dap_timerfd_start_on_worker(l_stream->esocket->worker, + s_timer_update_states * 1000, + s_timer_update_states_callback, + l_uuid_timer); } } #ifndef _WIN32 @@ -363,6 +370,8 @@ static void s_ch_chain_callback_notify_packet_in(dap_stream_ch_chain_t* a_ch_cha dap_stream_ch_chain_pkt_t *a_pkt, size_t a_pkt_data_size, void * a_arg) { + UNUSED(a_ch_chain); + UNUSED(a_pkt_data_size); dap_chain_node_client_t * l_node_client = (dap_chain_node_client_t *) a_arg; switch (a_pkt_type) { @@ -451,18 +460,19 @@ static void s_ch_chain_callback_notify_packet_in(dap_stream_ch_chain_t* a_ch_cha log_it(L_INFO,"In: Link %s."NODE_ADDR_FP_STR" started to sync %s chain",l_net->pub.name, NODE_ADDR_FP_ARGS(l_node_addr), l_node_client->cur_chain->name ); } - dap_stream_ch_chain_pkt_write_unsafe(a_ch_chain->ch,DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_REQ, + dap_stream_ch_chain_pkt_write_unsafe(l_node_client->ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_REQ, l_net->pub.id.uint64 , l_chain_id.uint64,l_cell_id.uint64,NULL,0); }else{ // If no - over with sync process dap_chain_node_addr_t * l_node_addr = dap_chain_net_get_cur_addr(l_net); log_it(L_INFO, "In: State node %s."NODE_ADDR_FP_STR" is SYNCED",l_net->pub.name, NODE_ADDR_FP_ARGS(l_node_addr) ); - dap_chain_net_sync_unlock(l_net); + dap_chain_net_sync_unlock(l_net, l_node_client); l_node_client->state = NODE_CLIENT_STATE_SYNCED; if (dap_chain_net_get_target_state(l_net) == NET_STATE_ONLINE) dap_chain_net_set_state(l_net, NET_STATE_ONLINE); else dap_chain_net_state_go_to(l_net, NET_STATE_OFFLINE); + dap_timerfd_reset(l_node_client->sync_timer); #ifndef _WIN32 pthread_cond_broadcast(&l_node_client->wait_cond); #else @@ -741,19 +751,23 @@ void dap_chain_node_client_close(dap_chain_node_client_t *a_client) if (l_client_found) { HASH_DEL(s_clients,l_client_found); DAP_DELETE(l_client_found); + if (a_client->callbacks.delete) + a_client->callbacks.delete(a_client, a_client->net); char l_node_addr_str[INET_ADDRSTRLEN] = {}; inet_ntop(AF_INET, &a_client->info->hdr.ext_addr_v4, l_node_addr_str, INET_ADDRSTRLEN); log_it(L_INFO, "Closing node client to uplink %s:%d", l_node_addr_str, a_client->info->hdr.ext_port); - dap_stream_ch_t *l_ch = dap_stream_ch_find_by_uuid_unsafe(a_client->stream_worker, a_client->ch_chain_uuid); - if (l_ch) { - dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); - l_ch_chain->callback_notify_packet_in = NULL; - l_ch_chain->callback_notify_packet_out = NULL; - } - l_ch = dap_stream_ch_find_by_uuid_unsafe(a_client->stream_worker, a_client->ch_chain_net_uuid); - if (l_ch) { - dap_stream_ch_chain_net_t *l_ch_chain_net = DAP_STREAM_CH_CHAIN_NET(l_ch); - l_ch_chain_net->notify_callback = NULL; + if (a_client->stream_worker) { + dap_stream_ch_t *l_ch = dap_stream_ch_find_by_uuid_unsafe(a_client->stream_worker, a_client->ch_chain_uuid); + if (l_ch) { + dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); + l_ch_chain->callback_notify_packet_in = NULL; + l_ch_chain->callback_notify_packet_out = NULL; + } + l_ch = dap_stream_ch_find_by_uuid_unsafe(a_client->stream_worker, a_client->ch_chain_net_uuid); + if (l_ch) { + dap_stream_ch_chain_net_t *l_ch_chain_net = DAP_STREAM_CH_CHAIN_NET(l_ch); + l_ch_chain_net->notify_callback = NULL; + } } // clean client dap_client_pvt_t *l_client_pvt = dap_client_pvt_find(a_client->client->pvt_uuid); diff --git a/modules/net/include/dap_chain_net.h b/modules/net/include/dap_chain_net.h index 58ea4b07db..aef71319e3 100644 --- a/modules/net/include/dap_chain_net.h +++ b/modules/net/include/dap_chain_net.h @@ -120,7 +120,7 @@ void dap_chain_net_set_flag_sync_from_zero(dap_chain_net_t * a_net, bool a_flag_ bool dap_chain_net_get_flag_sync_from_zero( dap_chain_net_t * a_net); bool dap_chain_net_sync_trylock(dap_chain_net_t *a_net, dap_chain_node_client_t *a_client); -void dap_chain_net_sync_unlock(dap_chain_net_t *a_net); +void dap_chain_net_sync_unlock(dap_chain_net_t *a_net, dap_chain_node_client_t *a_client); dap_chain_net_t * dap_chain_net_by_name( const char * a_name); dap_chain_net_t * dap_chain_net_by_id( dap_chain_net_id_t a_id); diff --git a/modules/net/include/dap_chain_node_client.h b/modules/net/include/dap_chain_node_client.h index 21902259e9..6f11f89e50 100644 --- a/modules/net/include/dap_chain_node_client.h +++ b/modules/net/include/dap_chain_node_client.h @@ -113,8 +113,8 @@ typedef struct dap_chain_node_client { struct in6_addr remote_ipv6; bool keep_connection; - bool is_connected; + dap_timerfd_t *sync_timer; // callbacks dap_chain_node_client_callbacks_t callbacks; void * callbacks_arg; diff --git a/modules/type/dag/dap_chain_cs_dag.c b/modules/type/dag/dap_chain_cs_dag.c index 29cfaf91f7..ad3aa84b2b 100644 --- a/modules/type/dag/dap_chain_cs_dag.c +++ b/modules/type/dag/dap_chain_cs_dag.c @@ -432,6 +432,7 @@ static dap_chain_atom_verify_res_t s_chain_callback_atom_add(dap_chain_t * a_cha break; case DAP_CHAIN_CS_VERIFY_CODE_TX_NO_PREVIOUS: case DAP_CHAIN_CS_VERIFY_CODE_TX_NO_EMISSION: + case DAP_CHAIN_CS_VERIFY_CODE_TX_NO_TOKEN: pthread_rwlock_wrlock(l_events_rwlock); HASH_ADD(hh, PVT(l_dag)->events_treshold, hash, sizeof(l_event_item->hash), l_event_item); pthread_rwlock_unlock(l_events_rwlock); @@ -915,16 +916,16 @@ dap_chain_cs_dag_event_item_t* dap_chain_cs_dag_proc_treshold(dap_chain_cs_dag_t dap_dag_threshold_verification_res_t ret = dap_chain_cs_dag_event_verify_hashes_with_treshold (a_dag, l_event_item->event); if ( ret == DAP_THRESHOLD_OK || ret == DAP_THRESHOLD_CONFLICTING ){ // All its hashes are in main table, move thats one too into it HASH_DEL(PVT(a_dag)->events_treshold,l_event_item); - if(ret == DAP_THRESHOLD_OK){ if(s_debug_more) { char * l_event_hash_str = dap_chain_hash_fast_to_str_new(&l_event_item->hash); log_it(L_DEBUG, "Processing event (threshold): %s...", l_event_hash_str); DAP_DELETE(l_event_hash_str); - } int l_add_res = s_dap_chain_add_atom_to_events_table(a_dag, a_ledger, l_event_item); - HASH_ADD(hh, PVT(a_dag)->events,hash,sizeof (l_event_item->hash), l_event_item); - s_dag_events_lasts_process_new_last_event(a_dag, l_event_item); - if(! l_add_res){ + } + int l_add_res = s_dap_chain_add_atom_to_events_table(a_dag, a_ledger, l_event_item); + if (!l_add_res) { + HASH_ADD(hh, PVT(a_dag)->events,hash,sizeof (l_event_item->hash), l_event_item); + s_dag_events_lasts_process_new_last_event(a_dag, l_event_item); if(s_debug_more) log_it(L_INFO, "... moved from treshold to main chains"); res = true; @@ -932,7 +933,7 @@ dap_chain_cs_dag_event_item_t* dap_chain_cs_dag_proc_treshold(dap_chain_cs_dag_t }else{ if(s_debug_more) log_it(L_WARNING, "... error adding"); - //todo: delete event + DAP_DELETE(l_event_item); } //res = true; }else if(ret == DAP_THRESHOLD_CONFLICTING) -- GitLab