diff --git a/CMakeLists.txt b/CMakeLists.txt index 003674ebc06c7fc8eefd7bdf263824857c49aa86..41419dd410d2ba5beb061ee59350ee7f72c60969 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,7 +2,7 @@ project(cellframe-sdk C) cmake_minimum_required(VERSION 3.0) set(CMAKE_C_STANDARD 11) -set(CELLFRAME_SDK_NATIVE_VERSION "2.9-41") +set(CELLFRAME_SDK_NATIVE_VERSION "2.9-42") add_definitions ("-DCELLFRAME_SDK_VERSION=\"${CELLFRAME_SDK_NATIVE_VERSION}\"") set(DAPSDK_MODULES "") message("Cellframe modules: ${CELLFRAME_MODULES}") diff --git a/dap-sdk/core/include/dap_common.h b/dap-sdk/core/include/dap_common.h index 46b5578d90139ce055ded6108faaea8cfcd70461..1b8b665af8f52b5959292e0816ecae2c994f5f01 100755 --- a/dap-sdk/core/include/dap_common.h +++ b/dap-sdk/core/include/dap_common.h @@ -122,8 +122,7 @@ typedef uint8_t byte_t; #define DAP_NEW_Z_SIZE(a, b) DAP_CAST_REINT(a, rpcalloc(1,b)) #define DAP_REALLOC(a, b) rprealloc(a,b) #define DAP_DELETE(a) rpfree(a) - #define DAP_DUP(a, b) memcpy(a, b, sizeof(*b)) - #define DAP_DUP_SIZE(a, b, s) memcpy(a, b, s) + #define DAP_DUP(a) memcpy(rpmalloc(sizeof(*a)), a, sizeof(*a)) #else #define DAP_MALLOC(a) malloc(a) #define DAP_FREE(a) free(a) @@ -139,8 +138,7 @@ typedef uint8_t byte_t; #define DAP_NEW_Z_SIZE(a, b) DAP_CAST_REINT(a, calloc(1,b)) #define DAP_REALLOC(a, b) realloc(a,b) #define DAP_DELETE(a) free(a) - #define DAP_DUP(a, b) memcpy(a, b, sizeof(*b)) - #define DAP_DUP_SIZE(a, b, s) memcpy(a, b, s) + #define DAP_DUP(a) memcpy(malloc(sizeof(*a)), a, sizeof(*a)) #endif #define DAP_DEL_Z(a) if(a) { DAP_DELETE(a); a=NULL;} @@ -192,13 +190,11 @@ DAP_STATIC_INLINE void _dap_aligned_free( void *ptr ) #if __SIZEOF_LONG__==8 #define DAP_UINT64_FORMAT_X "lX" #define DAP_UINT64_FORMAT_x "lx" -#define DAP_UINT64_FORMAT_u "lu" -#define DAP_UINT64_FORMAT_U "lU" +#define DAP_UINT64_FORMAT_U "lu" #elif __SIZEOF_LONG__==4 #define DAP_UINT64_FORMAT_X "llX" #define DAP_UINT64_FORMAT_x "llx" -#define DAP_UINT64_FORMAT_u "llu" -#define DAP_UINT64_FORMAT_U "llU" +#define DAP_UINT64_FORMAT_U "llu" #else #error "DAP_UINT64_FORMAT_* are undefined for your platform" #endif diff --git a/dap-sdk/core/include/dap_tsd.h b/dap-sdk/core/include/dap_tsd.h index bb4594b51a5809fb99e138ea36d666256fe18956..c96385913f9e5f62ccadd15ae07845b17a682ccd 100644 --- a/dap-sdk/core/include/dap_tsd.h +++ b/dap-sdk/core/include/dap_tsd.h @@ -23,11 +23,12 @@ This file is part of DAP SDK the open source project #pragma once #include "dap_common.h" #include "dap_strfuncs.h" -typedef struct dap_tsd{ + +typedef struct dap_tsd { uint16_t type; uint32_t size; byte_t data[]; -} dap_tsd_t; +} DAP_ALIGN_PACKED dap_tsd_t; dap_tsd_t * dap_tsd_create(uint16_t a_type, const void * a_data, size_t a_data_size); dap_tsd_t* dap_tsd_find(byte_t * a_data, size_t a_data_size,uint16_t a_type); diff --git a/dap-sdk/crypto/src/dap_enc_dilithium.c b/dap-sdk/crypto/src/dap_enc_dilithium.c index ede8f973cd0124e932040ac9e0c2554443ab127f..aa9821f1bca46cad28006baa628f3100087e5e30 100755 --- a/dap-sdk/crypto/src/dap_enc_dilithium.c +++ b/dap-sdk/crypto/src/dap_enc_dilithium.c @@ -178,7 +178,7 @@ dilithium_signature_t* dap_enc_dilithium_read_signature(uint8_t *a_buf, size_t a l_shift_mem += sizeof(uint64_t); if( l_sign->sig_len> (UINT64_MAX - l_shift_mem ) ){ - log_it(L_ERROR,"::read_signature() Buflen inside signature %"DAP_UINT64_FORMAT_u" is too big ", l_sign->sig_len); + log_it(L_ERROR,"::read_signature() Buflen inside signature %"DAP_UINT64_FORMAT_U" is too big ", l_sign->sig_len); DAP_DELETE(l_sign); return NULL; } @@ -196,7 +196,7 @@ dilithium_signature_t* dap_enc_dilithium_read_signature(uint8_t *a_buf, size_t a } if( (uint64_t) a_buflen < (l_shift_mem + l_sign->sig_len) ){ - log_it(L_ERROR,"::read_signature() Buflen %zd is smaller than all fields together(%"DAP_UINT64_FORMAT_u")", a_buflen, + log_it(L_ERROR,"::read_signature() Buflen %zd is smaller than all fields together(%"DAP_UINT64_FORMAT_U")", a_buflen, l_shift_mem + l_sign->sig_len ); DAP_DELETE(l_sign); return NULL; @@ -204,7 +204,7 @@ dilithium_signature_t* dap_enc_dilithium_read_signature(uint8_t *a_buf, size_t a l_sign->sig_data = DAP_NEW_SIZE(unsigned char, l_sign->sig_len); if (!l_sign->sig_data){ - log_it(L_ERROR,"::read_signature() Can't allocate sig_data %"DAP_UINT64_FORMAT_u" size", l_sign->sig_len); + log_it(L_ERROR,"::read_signature() Can't allocate sig_data %"DAP_UINT64_FORMAT_U" size", l_sign->sig_len); DAP_DELETE(l_sign); return NULL; }else{ @@ -240,13 +240,13 @@ dilithium_signature_t* dap_enc_dilithium_read_signature_old(uint8_t *a_buf, size uint64_t l_shift_mem = sizeof(uint32_t) + sizeof(uint32_t); memcpy(&l_sign->sig_len, a_buf + l_shift_mem, sizeof(unsigned long long)); if( l_sign->sig_len> (UINT64_MAX - l_shift_mem ) ){ - log_it(L_ERROR,"::read_signature_old() Buflen inside signature %"DAP_UINT64_FORMAT_u" is too big ", l_sign->sig_len); + log_it(L_ERROR,"::read_signature_old() Buflen inside signature %"DAP_UINT64_FORMAT_U" is too big ", l_sign->sig_len); DAP_DELETE(l_sign); return NULL; } if( (uint64_t) a_buflen < (l_shift_mem + l_sign->sig_len) ){ - log_it(L_ERROR,"::read_signature_old() Buflen %zd is smaller than all fields together(%" DAP_UINT64_FORMAT_u")", a_buflen, + log_it(L_ERROR,"::read_signature_old() Buflen %zd is smaller than all fields together(%" DAP_UINT64_FORMAT_U")", a_buflen, l_shift_mem + l_sign->sig_len ); DAP_DELETE(l_sign); return NULL; @@ -255,7 +255,7 @@ dilithium_signature_t* dap_enc_dilithium_read_signature_old(uint8_t *a_buf, size l_shift_mem += sizeof(unsigned long long); l_sign->sig_data = DAP_NEW_SIZE(unsigned char, l_sign->sig_len); if (!l_sign->sig_data) - log_it(L_ERROR,"::read_signature_old() Can't allocate sig_data %"DAP_UINT64_FORMAT_u" size", l_sign->sig_len); + log_it(L_ERROR,"::read_signature_old() Can't allocate sig_data %"DAP_UINT64_FORMAT_U" size", l_sign->sig_len); memcpy(l_sign->sig_data, a_buf + l_shift_mem, l_sign->sig_len); return l_sign; @@ -288,13 +288,13 @@ dilithium_signature_t* dap_enc_dilithium_read_signature_old2(uint8_t *a_buf, siz uint64_t l_shift_mem = sizeof(uint32_t) + sizeof(uint64_t); memcpy(&l_sign->sig_len, a_buf + l_shift_mem, sizeof(unsigned long long)); if( l_sign->sig_len> (UINT64_MAX - l_shift_mem ) ){ - log_it(L_ERROR,"::read_signature_old() Buflen inside signature %"DAP_UINT64_FORMAT_u" is too big ", l_sign->sig_len); + log_it(L_ERROR,"::read_signature_old() Buflen inside signature %"DAP_UINT64_FORMAT_U" is too big ", l_sign->sig_len); DAP_DELETE(l_sign); return NULL; } if( (uint64_t) a_buflen < (l_shift_mem + l_sign->sig_len) ){ - log_it(L_ERROR,"::read_signature_old() Buflen %zd is smaller than all fields together(%" DAP_UINT64_FORMAT_u")", a_buflen, + log_it(L_ERROR,"::read_signature_old() Buflen %zd is smaller than all fields together(%" DAP_UINT64_FORMAT_U")", a_buflen, l_shift_mem + l_sign->sig_len ); DAP_DELETE(l_sign); return NULL; @@ -304,7 +304,7 @@ dilithium_signature_t* dap_enc_dilithium_read_signature_old2(uint8_t *a_buf, siz l_shift_mem += sizeof(unsigned long long); l_sign->sig_data = DAP_NEW_SIZE(unsigned char, l_sign->sig_len); if (!l_sign->sig_data) - log_it(L_ERROR,"::read_signature_old() Can't allocate sig_data %"DAP_UINT64_FORMAT_u" size", l_sign->sig_len); + log_it(L_ERROR,"::read_signature_old() Can't allocate sig_data %"DAP_UINT64_FORMAT_U" size", l_sign->sig_len); memcpy(l_sign->sig_data, a_buf + l_shift_mem, l_sign->sig_len); return l_sign; } @@ -440,7 +440,7 @@ dilithium_public_key_t* dap_enc_dilithium_read_public_key(const uint8_t *a_buf, if(l_buflen != (uint64_t) a_buflen){ if (l_buflen <<32 >>32 != (uint64_t) a_buflen ){ - log_it(L_ERROR,"::read_public_key() Buflen field inside buffer is %"DAP_UINT64_FORMAT_u" when expected to be %" DAP_UINT64_FORMAT_u, + log_it(L_ERROR,"::read_public_key() Buflen field inside buffer is %"DAP_UINT64_FORMAT_U" when expected to be %" DAP_UINT64_FORMAT_U, l_buflen,(uint64_t) a_buflen); return NULL; }else diff --git a/dap-sdk/net/client/dap_client_http.c b/dap-sdk/net/client/dap_client_http.c index e9ed600d2e3eb45dd6fcdd92282bba4197b1f6c2..fc58b1836335fe1f11f3697d1eaee835d2d3c3f2 100644 --- a/dap-sdk/net/client/dap_client_http.c +++ b/dap-sdk/net/client/dap_client_http.c @@ -599,7 +599,7 @@ void* dap_client_http_request_custom(dap_worker_t * a_worker, const char *a_upli dap_events_socket_uuid_t * l_ev_uuid_ptr = DAP_NEW_Z(dap_events_socket_uuid_t); *l_ev_uuid_ptr = l_ev_socket->uuid; if (!dap_timerfd_start_on_worker(l_http_pvt->worker,s_client_timeout_ms, s_timer_timeout_check, l_ev_uuid_ptr)) { - log_it(L_ERROR,"Can't run timer on worker %u for esocket uuid %"DAP_UINT64_FORMAT_u" for timeout check during connection attempt ", + log_it(L_ERROR,"Can't run timer on worker %u for esocket uuid %"DAP_UINT64_FORMAT_U" for timeout check during connection attempt ", l_http_pvt->worker->id, *l_ev_uuid_ptr); DAP_DEL_Z(l_ev_uuid_ptr) } @@ -622,7 +622,7 @@ void* dap_client_http_request_custom(dap_worker_t * a_worker, const char *a_upli dap_events_socket_uuid_t * l_ev_uuid_ptr = DAP_NEW_Z(dap_events_socket_uuid_t); *l_ev_uuid_ptr = l_ev_socket->uuid; if(dap_timerfd_start_on_worker(l_http_pvt->worker,s_client_timeout_ms, s_timer_timeout_check,l_ev_uuid_ptr) == NULL){ - log_it(L_ERROR,"Can't run timer on worker %u for esocket uuid %"DAP_UINT64_FORMAT_u" for timeout check during connection attempt ", + log_it(L_ERROR,"Can't run timer on worker %u for esocket uuid %"DAP_UINT64_FORMAT_U" for timeout check during connection attempt ", l_http_pvt->worker->id, *l_ev_uuid_ptr); DAP_DEL_Z(l_ev_uuid_ptr); } diff --git a/dap-sdk/net/client/dap_client_pvt.c b/dap-sdk/net/client/dap_client_pvt.c index 58349cc6a5df98a64ee225eb72fa3561f504dec1..c24307df3b27664f748a76c623445500183a2b7a 100644 --- a/dap-sdk/net/client/dap_client_pvt.c +++ b/dap-sdk/net/client/dap_client_pvt.c @@ -218,7 +218,7 @@ static void s_stream_connected(dap_client_pvt_t * a_client_pvt) assert(a_client_pvt->stream_es); *l_es_uuid_ptr = a_client_pvt->stream_es->uuid; if( dap_timerfd_start_on_worker(a_client_pvt->stream_es->worker, s_client_timeout_read_after_connect_seconds * 1000, s_stream_timer_timeout_after_connected_check ,l_es_uuid_ptr) == NULL ){ - log_it(L_ERROR,"Can't run timer for stream after connect check for esocket uuid %"DAP_UINT64_FORMAT_u" "); + log_it(L_ERROR,"Can't run timer for stream after connect check for esocket uuid %"DAP_UINT64_FORMAT_U" "); DAP_DEL_Z(l_es_uuid_ptr); } } @@ -258,7 +258,7 @@ static bool s_stream_timer_timeout_check(void * a_arg) log_it(L_DEBUG,"Socket %d is connected, close check timer", l_es->socket); }else if(s_debug_more) - log_it(L_DEBUG,"Esocket %"DAP_UINT64_FORMAT_u" is finished, close check timer", *l_es_uuid_ptr); + log_it(L_DEBUG,"Esocket %"DAP_UINT64_FORMAT_U" is finished, close check timer", *l_es_uuid_ptr); DAP_DEL_Z(l_es_uuid_ptr) return false; @@ -302,7 +302,7 @@ static bool s_stream_timer_timeout_after_connected_check(void * a_arg) }else if(s_debug_more) - log_it(L_DEBUG,"Streaming socket %"DAP_UINT64_FORMAT_u" is finished, close check timer", *l_es_uuid_ptr); + log_it(L_DEBUG,"Streaming socket %"DAP_UINT64_FORMAT_U" is finished, close check timer", *l_es_uuid_ptr); DAP_DEL_Z(l_es_uuid_ptr); return false; diff --git a/dap-sdk/net/core/dap_events_socket.c b/dap-sdk/net/core/dap_events_socket.c index 80c31d96ca1b6a4c18402c7ea1b9f0010c937d50..d3c11a8f20604c1bc3c2f1123777e6c227f81fee 100644 --- a/dap-sdk/net/core/dap_events_socket.c +++ b/dap-sdk/net/core/dap_events_socket.c @@ -1576,7 +1576,7 @@ void dap_events_socket_worker_poll_update_unsafe(dap_events_socket_t * a_esocket } } if (l_is_error && l_errno == EBADF){ - log_it(L_ATT,"Poll update: socket %d (%p ) disconnected, rise CLOSE flag to remove from queue, lost %"DAP_UINT64_FORMAT_u":%" DAP_UINT64_FORMAT_u + log_it(L_ATT,"Poll update: socket %d (%p ) disconnected, rise CLOSE flag to remove from queue, lost %"DAP_UINT64_FORMAT_U":%" DAP_UINT64_FORMAT_U " bytes",a_esocket->socket,a_esocket,a_esocket->buf_in_size,a_esocket->buf_out_size); a_esocket->flags |= DAP_SOCK_SIGNAL_CLOSE; a_esocket->buf_in_size = a_esocket->buf_out_size = 0; // Reset everything from buffer, we close it now all @@ -1628,7 +1628,7 @@ void dap_events_socket_set_readable_unsafe( dap_events_socket_t *a_esocket, bool l_errbuf[0]=0; strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); if (l_errno == EBADF){ - log_it(L_ATT,"Set readable: socket %d (%p ) disconnected, rise CLOSE flag to remove from queue, lost %"DAP_UINT64_FORMAT_u":%" DAP_UINT64_FORMAT_u + log_it(L_ATT,"Set readable: socket %d (%p ) disconnected, rise CLOSE flag to remove from queue, lost %"DAP_UINT64_FORMAT_U":%" DAP_UINT64_FORMAT_U " bytes",a_esocket->socket,a_esocket,a_esocket->buf_in_size,a_esocket->buf_out_size); a_esocket->flags |= DAP_SOCK_SIGNAL_CLOSE; a_esocket->buf_in_size = a_esocket->buf_out_size = 0; // Reset everything from buffer, we close it now all @@ -1696,7 +1696,7 @@ void dap_events_socket_set_writable_unsafe( dap_events_socket_t *a_esocket, bool l_errbuf[0]=0; strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); if (l_errno == EBADF){ - log_it(L_ATT,"Set writable: socket %d (%p ) disconnected, rise CLOSE flag to remove from queue, lost %"DAP_UINT64_FORMAT_u":%" DAP_UINT64_FORMAT_u + log_it(L_ATT,"Set writable: socket %d (%p ) disconnected, rise CLOSE flag to remove from queue, lost %"DAP_UINT64_FORMAT_U":%" DAP_UINT64_FORMAT_U " bytes",a_esocket->socket,a_esocket,a_esocket->buf_in_size,a_esocket->buf_out_size); a_esocket->flags |= DAP_SOCK_SIGNAL_CLOSE; a_esocket->buf_in_size = a_esocket->buf_out_size = 0; // Reset everything from buffer, we close it now all @@ -1917,7 +1917,7 @@ void dap_events_socket_remove_and_delete_mt(dap_worker_t * a_w, dap_events_sock *l_es_uuid_ptr = a_es_uuid; if(dap_events_socket_queue_ptr_send( a_w->queue_es_delete, l_es_uuid_ptr ) != 0 ){ - log_it(L_ERROR,"Can't send %"DAP_UINT64_FORMAT_u" uuid in queue",a_es_uuid); + log_it(L_ERROR,"Can't send %"DAP_UINT64_FORMAT_U" uuid in queue",a_es_uuid); DAP_DELETE(l_es_uuid_ptr); } } diff --git a/dap-sdk/net/core/dap_worker.c b/dap-sdk/net/core/dap_worker.c index 9e08d0c27880b0d3ae6426dc21f52aa2ae86252c..a5e90d5a9268b4b354a09cf5e9fe7ed3b7ef26d7 100644 --- a/dap-sdk/net/core/dap_worker.c +++ b/dap-sdk/net/core/dap_worker.c @@ -982,7 +982,7 @@ static void s_queue_delete_es_callback( dap_events_socket_t * a_es, void * a_arg //l_es->flags |= DAP_SOCK_SIGNAL_CLOSE; // Send signal to socket to kill dap_events_socket_remove_and_delete_unsafe(l_es,false); }else - log_it(L_INFO, "While we were sending the delete() message, esocket %"DAP_UINT64_FORMAT_u" has been disconnected ", l_es_uuid_ptr); + log_it(L_INFO, "While we were sending the delete() message, esocket %"DAP_UINT64_FORMAT_U" has been disconnected ", l_es_uuid_ptr); DAP_DELETE(l_es_uuid_ptr); } diff --git a/dap-sdk/net/server/http_server/http_client/dap_http_client.c b/dap-sdk/net/server/http_server/http_client/dap_http_client.c index f3bbcc5f9330b3ba73d43dec4cd1057f22e8d3af..9ddf256b080cc3567c8bab71eca0fc5669e4035e 100644 --- a/dap-sdk/net/server/http_server/http_client/dap_http_client.c +++ b/dap-sdk/net/server/http_server/http_client/dap_http_client.c @@ -647,9 +647,9 @@ void dap_http_client_out_header_generate(dap_http_client_t *a_http_client) log_it(L_DEBUG,"output: Content-Type = '%s'",a_http_client->out_content_type); } if ( a_http_client->out_content_length ) { - dap_snprintf(buf,sizeof(buf),"%"DAP_UINT64_FORMAT_u"",a_http_client->out_content_length); + dap_snprintf(buf,sizeof(buf),"%"DAP_UINT64_FORMAT_U"",a_http_client->out_content_length); dap_http_header_add(&a_http_client->out_headers,"Content-Length",buf); - log_it(L_DEBUG,"output: Content-Length = %"DAP_UINT64_FORMAT_u"",a_http_client->out_content_length); + log_it(L_DEBUG,"output: Content-Length = %"DAP_UINT64_FORMAT_U"",a_http_client->out_content_length); } }else if (s_debug_http) diff --git a/modules/chain/dap_chain.c b/modules/chain/dap_chain.c index 812901cf829b0c6bea89b451cc77a7a256664b83..54f48db0781a2dc516e69d03a6f1b7d18369583e 100644 --- a/modules/chain/dap_chain.c +++ b/modules/chain/dap_chain.c @@ -300,7 +300,7 @@ dap_chain_t * dap_chain_load_from_cfg(dap_ledger_t* a_ledger, const char * a_cha if ( (l_chain_id_str = dap_config_get_item_str(l_cfg,"chain","id")) != NULL ){ if ( sscanf(l_chain_id_str,"0x%"DAP_UINT64_FORMAT_X,& l_chain_id_u ) !=1 ){ if ( sscanf(l_chain_id_str,"0x%"DAP_UINT64_FORMAT_x,&l_chain_id_u) !=1 ) { - if ( sscanf(l_chain_id_str,"%"DAP_UINT64_FORMAT_u,&l_chain_id_u ) !=1 ){ + if ( sscanf(l_chain_id_str,"%"DAP_UINT64_FORMAT_U,&l_chain_id_u ) !=1 ){ log_it (L_ERROR,"Can't recognize '%s' string as chain net id, hex or dec",l_chain_id_str); dap_config_close(l_cfg); return NULL; diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c index 86a353e7ec4526bdfec864dab5b2ab82b325b934..5822e919b8e908cd68f1ad39d24bae0ca51e114b 100644 --- a/modules/channel/chain/dap_stream_ch_chain.c +++ b/modules/channel/chain/dap_stream_ch_chain.c @@ -85,6 +85,7 @@ struct sync_request struct{ dap_db_log_list_t *db_log; // db log dap_list_t *db_iter; + char *sync_group; } gdb; struct{ dap_chain_atom_iter_t *request_atom_iter; @@ -396,7 +397,7 @@ static bool s_sync_out_gdb_proc_callback(dap_proc_thread_t *a_thread, void *a_ar if(l_db_log) { if (s_debug_more) - log_it(L_DEBUG, "Sync out gdb proc, requested %"DAP_UINT64_FORMAT_u" transactions from address "NODE_ADDR_FP_STR, + log_it(L_DEBUG, "Sync out gdb proc, requested %"DAP_UINT64_FORMAT_U" transactions from address "NODE_ADDR_FP_STR, l_db_log->items_number, NODE_ADDR_FP_ARGS_S(l_sync_request->request.node_addr)); l_sync_request->gdb.db_log = l_db_log; dap_proc_thread_worker_exec_callback(a_thread, l_sync_request->worker->id, s_sync_out_gdb_first_worker_callback, l_sync_request ); @@ -572,19 +573,51 @@ static void s_gdb_in_pkt_error_worker_callback(dap_worker_t *a_worker, void *a_a struct sync_request *l_sync_request = (struct sync_request *) a_arg; dap_stream_ch_t *l_ch = dap_stream_ch_find_by_uuid_unsafe(DAP_STREAM_WORKER(a_worker), l_sync_request->ch_uuid); - if( l_ch == NULL ){ + if( l_ch == NULL ) { log_it(L_INFO,"Client disconnected before we sent the reply"); - DAP_DELETE(l_sync_request); - return; + } else { + dap_stream_ch_chain_pkt_write_error_unsafe(l_ch, l_sync_request->request_hdr.net_id.uint64, + l_sync_request->request_hdr.chain_id.uint64, + l_sync_request->request_hdr.cell_id.uint64, + "ERROR_GLOBAL_DB_INTERNAL_NOT_SAVED"); } - - dap_stream_ch_chain_pkt_write_error_unsafe(l_ch, l_sync_request->request_hdr.net_id.uint64, - l_sync_request->request_hdr.chain_id.uint64, - l_sync_request->request_hdr.cell_id.uint64, - "ERROR_GLOBAL_DB_INTERNAL_NOT_SAVED"); DAP_DELETE(l_sync_request); } +static void s_gdb_sync_tsd_worker_callback(dap_worker_t *a_worker, void *a_arg) +{ + struct sync_request *l_sync_request = (struct sync_request *) a_arg; + + dap_stream_ch_t *l_ch = dap_stream_ch_find_by_uuid_unsafe(DAP_STREAM_WORKER(a_worker), l_sync_request->ch_uuid); + if( l_ch == NULL ) { + log_it(L_INFO,"Client disconnected before we sent the reply"); + } else { + size_t l_gr_len = strlen(l_sync_request->gdb.sync_group) + 1; + size_t l_data_size = 2 * sizeof(uint64_t) + l_gr_len; + dap_tsd_t *l_tsd_rec = DAP_NEW_SIZE(dap_tsd_t, l_data_size + sizeof(dap_tsd_t)); + l_tsd_rec->type = DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_TSD_LAST_ID; + l_tsd_rec->size = l_data_size; + uint64_t l_node_addr = dap_chain_net_get_cur_addr_int(dap_chain_net_by_id(l_sync_request->request_hdr.net_id)); + void *l_data_ptr = l_tsd_rec->data; + memcpy(l_data_ptr, &l_node_addr, sizeof(uint64_t)); + l_data_ptr += sizeof(uint64_t); + memcpy(l_data_ptr, &l_sync_request->request.id_end, sizeof(uint64_t)); + l_data_ptr += sizeof(uint64_t); + memcpy(l_data_ptr, l_sync_request->gdb.sync_group, l_gr_len); + log_it(L_INFO, "Allocated %d bytes, copied %d bytes, sent %d bytes", + l_data_size + sizeof(dap_tsd_t), + (long int)((byte_t *)l_data_ptr - l_tsd_rec->data) + l_gr_len, + l_tsd_rec->size + sizeof(dap_tsd_t)); + dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_TSD, + l_sync_request->request_hdr.net_id.uint64, + l_sync_request->request_hdr.chain_id.uint64, + l_sync_request->request_hdr.cell_id.uint64, + l_tsd_rec, l_tsd_rec->size + sizeof(dap_tsd_t)); + DAP_DELETE(l_tsd_rec); + } + DAP_DELETE(l_sync_request->gdb.sync_group); + DAP_DELETE(l_sync_request); +} /** * @brief s_gdb_in_pkt_callback @@ -610,9 +643,28 @@ static bool s_gdb_in_pkt_proc_callback(dap_proc_thread_t *a_thread, void *a_arg) log_it(L_WARNING, "In: GLOBAL_DB parse: packet in list with NULL data(pkt_data_size:%zd)", l_pkt_item->pkt_data_size); } + uint64_t l_last_id = 0; + char *l_last_group = NULL; + char l_last_type = '\0'; + bool l_group_changed = false; + for (size_t i = 0; i < l_data_obj_count; i++) { // obj to add dap_store_obj_t *l_obj = l_store_obj + i; + l_group_changed = l_last_group && (strcmp(l_last_group, l_obj->group) || l_last_type != l_obj->type); + // Send remote side notification about received obj + if (l_sync_request->request.node_addr.uint64 && + (l_group_changed || i == l_data_obj_count - 1)) { + struct sync_request *l_sync_req_tsd = DAP_DUP(l_sync_request); + l_sync_req_tsd->request.id_end = l_last_id; + l_sync_req_tsd->gdb.sync_group = l_obj->type == 'a' ? dap_strdup(l_obj->group) : + dap_strdup_printf("%s.del", l_obj->group); + dap_proc_thread_worker_exec_callback(a_thread, l_sync_request->worker->id, + s_gdb_sync_tsd_worker_callback, l_sync_req_tsd); + } + l_last_id = l_obj->id; + l_last_group = l_obj->group; + l_last_type = l_obj->type; //check whether to apply the received data into the database bool l_apply = false; // timestamp for exist obj @@ -653,8 +705,7 @@ static bool s_gdb_in_pkt_proc_callback(dap_proc_thread_t *a_thread, void *a_arg) } // save data to global_db if(!dap_chain_global_db_obj_save(l_obj, 1)) { - struct sync_request *l_sync_req_err = DAP_NEW_Z(struct sync_request); - memcpy(l_sync_req_err, l_sync_request, sizeof(struct sync_request)); + struct sync_request *l_sync_req_err = DAP_DUP(l_sync_request); dap_proc_thread_worker_exec_callback(a_thread, l_sync_request->worker->id, s_gdb_in_pkt_error_worker_callback, l_sync_req_err); } else { @@ -662,8 +713,9 @@ static bool s_gdb_in_pkt_proc_callback(dap_proc_thread_t *a_thread, void *a_arg) log_it(L_DEBUG, "Added new GLOBAL_DB synchronization record"); } } - if(l_store_obj) + if(l_store_obj) { dap_store_obj_free(l_store_obj, l_data_obj_count); + } } else { log_it(L_WARNING, "In proc thread got GDB stream ch packet with zero data"); } @@ -782,8 +834,21 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) } break; // Response with metadata organized in TSD - case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_TSD:{ - if (s_debug_more) + case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_TSD: { + if (l_chain_pkt_data_size) { + dap_tsd_t *l_tsd_rec = (dap_tsd_t *)l_chain_pkt->data; + if (l_tsd_rec->type != DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_TSD_LAST_ID || + l_tsd_rec->size < 2 * sizeof(uint64_t) + 2) { + break; + } + void *l_data_ptr = l_tsd_rec->data; + uint64_t l_node_addr = *(uint64_t *)l_data_ptr; + l_data_ptr += sizeof(uint64_t); + uint64_t l_last_id = *(uint64_t *)l_data_ptr; + l_data_ptr += sizeof(uint64_t); + char *l_group = (char *)l_data_ptr; + dap_db_set_last_id_remote(l_node_addr, l_last_id, l_group); + } else if (s_debug_more) log_it(L_DEBUG, "Global DB TSD packet detected"); } break; @@ -1333,11 +1398,6 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) }*/ l_skip_count++; } else { - if (l_ch_chain->request.node_addr.uint64) { - dap_db_set_last_id_remote(l_ch_chain->request.node_addr.uint64, - dap_store_packet_get_id(l_obj->pkt), - dap_store_packet_get_group(l_obj->pkt)); - } l_hash_item = DAP_NEW(dap_stream_ch_chain_hash_item_t); memcpy(&l_hash_item->hash, &l_obj->hash, sizeof(dap_chain_hash_fast_t)); l_hash_item->size = l_obj->pkt->data_size; @@ -1365,7 +1425,7 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64, NULL, 0); } else { - log_it( L_INFO,"Syncronized database: items syncronyzed %"DAP_UINT64_FORMAT_u" from %"DAP_UINT64_FORMAT_u"", + log_it( L_INFO,"Syncronized database: items syncronyzed %"DAP_UINT64_FORMAT_U" from %"DAP_UINT64_FORMAT_U"", l_ch_chain->stats_request_gdb_processed, dap_db_log_list_get_count(l_ch_chain->request_db_log)); // last message dap_stream_ch_chain_sync_request_t l_request = {}; @@ -1477,7 +1537,7 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS, l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64, &l_request, sizeof(l_request)); - log_it( L_INFO,"Synced: %"DAP_UINT64_FORMAT_u" atoms processed", l_ch_chain->stats_request_atoms_processed); + log_it( L_INFO,"Synced: %"DAP_UINT64_FORMAT_U" atoms processed", l_ch_chain->stats_request_atoms_processed); dap_stream_ch_chain_go_idle(l_ch_chain); if (l_ch_chain->callback_notify_packet_out) l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS, NULL, diff --git a/modules/channel/chain/include/dap_stream_ch_chain_pkt.h b/modules/channel/chain/include/dap_stream_ch_chain_pkt.h index cee15365cba2944c2b0521e33949ee616f3e060a..0cf3ab302157a88703036efbe9e2f88b75fa59e3 100644 --- a/modules/channel/chain/include/dap_stream_ch_chain_pkt.h +++ b/modules/channel/chain/include/dap_stream_ch_chain_pkt.h @@ -75,6 +75,7 @@ #define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_TSD_COUNT 0x0002 // Items count #define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_TSD_HASH_LAST 0x0003 // Hash of last(s) item #define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_TSD_HASH_FIRST 0x0004 // Hash of first(s) item +#define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_TSD_LAST_ID 0x0100 // Last ID of GDB synced group typedef enum dap_stream_ch_chain_state{ CHAIN_STATE_IDLE=0, diff --git a/modules/global-db/dap_chain_global_db_hist.c b/modules/global-db/dap_chain_global_db_hist.c index 2fea00e1c06b8e2360a4746a9c70179f5051fe2a..66a46cca55fa61f821a68b7150af9448263f9635 100644 --- a/modules/global-db/dap_chain_global_db_hist.c +++ b/modules/global-db/dap_chain_global_db_hist.c @@ -10,8 +10,10 @@ #include "dap_chain_datum_tx_items.h" #include "dap_chain_global_db_remote.h" #include "dap_chain_global_db_hist.h" - #include "uthash.h" + +//#define GDB_SYNC_ALWAYS_FROM_ZERO + // for dap_db_history() typedef struct dap_tx_data{ dap_chain_hash_fast_t tx_hash; @@ -201,6 +203,7 @@ static void *s_list_thread_proc(void *arg) l_dap_db_log_list->list_read = l_list; pthread_mutex_unlock(&l_dap_db_log_list->list_mutex); } + if (l_del_group_name_replace) DAP_DELETE(l_del_group_name_replace); } diff --git a/modules/global-db/dap_chain_global_db_remote.c b/modules/global-db/dap_chain_global_db_remote.c index 8075026a12d0dd7fc54ecb491d1a546d9b1b0f3d..8954518599b1ecd341d8a9f1822c254f3db3e887 100644 --- a/modules/global-db/dap_chain_global_db_remote.c +++ b/modules/global-db/dap_chain_global_db_remote.c @@ -104,7 +104,7 @@ uint64_t dap_db_get_cur_node_addr(char *a_net_name) */ bool dap_db_set_last_id_remote(uint64_t a_node_addr, uint64_t a_id, char *a_group) { - //log_it( L_DEBUG, "Node 0x%016X set last synced id %"DAP_UINT64_FORMAT_u"", a_node_addr, a_id); + //log_it( L_DEBUG, "Node 0x%016X set last synced id %"DAP_UINT64_FORMAT_U"", a_node_addr, a_id); char *l_node_addr_str = dap_strdup_printf("%ju%s", a_node_addr, a_group); bool l_ret = dap_chain_global_db_gr_set(l_node_addr_str, &a_id, sizeof(uint64_t), GROUP_LOCAL_NODE_LAST_ID); @@ -183,27 +183,6 @@ dap_store_obj_pkt_t *dap_store_packet_multiple(dap_store_obj_pkt_t *a_old_pkt, d return a_old_pkt; } -char *dap_store_packet_get_group(dap_store_obj_pkt_t *a_pkt) -{ - uint16_t l_gr_len; - memcpy(&l_gr_len, a_pkt->data + sizeof(uint32_t), sizeof(uint16_t)); - char *l_ret_str = DAP_NEW_SIZE(char, l_gr_len + 1); - size_t l_gr_offset = sizeof(uint32_t) + sizeof(uint16_t); - memcpy(l_ret_str, a_pkt->data + l_gr_offset, l_gr_len); - l_ret_str[l_gr_len] = '\0'; - return l_ret_str; -} - -uint64_t dap_store_packet_get_id(dap_store_obj_pkt_t *a_pkt) -{ - uint16_t l_gr_len; - memcpy(&l_gr_len, a_pkt->data + sizeof(uint32_t), sizeof(uint16_t)); - size_t l_id_offset = sizeof(uint32_t) + sizeof(uint16_t) + l_gr_len; - uint64_t l_ret_id; - memcpy(&l_ret_id, a_pkt->data + l_id_offset, sizeof(uint64_t)); - return l_ret_id; -} - void dap_store_packet_change_id(dap_store_obj_pkt_t *a_pkt, uint64_t a_id) { uint16_t l_gr_len; diff --git a/modules/net/dap_chain_node_cli_cmd_tx.c b/modules/net/dap_chain_node_cli_cmd_tx.c index ad8ed919257069b3b75ff44fccf3212041d60745..d3e71217389ce41f7d2db33a268e1eeab3f091a6 100644 --- a/modules/net/dap_chain_node_cli_cmd_tx.c +++ b/modules/net/dap_chain_node_cli_cmd_tx.c @@ -129,7 +129,7 @@ void _dap_chain_datum_tx_out_data(dap_chain_datum_tx_t *a_datum, break; case TX_ITEM_TYPE_OUT: dap_string_append_printf(a_str_out, "\t OUT:\n" - "\t\t Value: %s (%"DAP_UINT64_FORMAT_u")\n" + "\t\t Value: %s (%"DAP_UINT64_FORMAT_U")\n" "\t\t Address: %s\n", dap_chain_balance_to_coins(dap_chain_uint128_from( ((dap_chain_tx_out_t*)item)->header.value) @@ -177,7 +177,7 @@ void _dap_chain_datum_tx_out_data(dap_chain_datum_tx_t *a_datum, "\t\t\t units: 0x%016"DAP_UINT64_FORMAT_x"\n" "\t\t\t uid: 0x%016"DAP_UINT64_FORMAT_x"\n" "\t\t\t units type: %s \n" - "\t\t\t value: %s (%"DAP_UINT64_FORMAT_u")\n", + "\t\t\t value: %s (%"DAP_UINT64_FORMAT_U")\n", ((dap_chain_datum_tx_receipt_t*)item)->size, ((dap_chain_datum_tx_receipt_t*)item)->exts_size, ((dap_chain_datum_tx_receipt_t*)item)->receipt_info.units, @@ -245,7 +245,7 @@ void _dap_chain_datum_tx_out_data(dap_chain_datum_tx_t *a_datum, dap_string_append_printf(a_str_out, "\t OUT COND:\n" "\t Header:\n" "\t\t\t ts_expires: %s\t" - "\t\t\t value: %s (%"DAP_UINT64_FORMAT_u")\n" + "\t\t\t value: %s (%"DAP_UINT64_FORMAT_U")\n" "\t\t\t subtype: %s\n" "\t\t SubType:\n", dap_ctime_r((time_t*)((dap_chain_tx_out_cond_t*)item)->header.ts_expires, l_tmp_buf), @@ -260,7 +260,7 @@ void _dap_chain_datum_tx_out_data(dap_chain_datum_tx_t *a_datum, dap_string_append_printf(a_str_out, "\t\t\t unit: 0x%08x\n" "\t\t\t uid: 0x%016"DAP_UINT64_FORMAT_x"\n" "\t\t\t pkey: %s\n" - "\t\t\t max price: %s (%"DAP_UINT64_FORMAT_u") \n", + "\t\t\t max price: %s (%"DAP_UINT64_FORMAT_U") \n", ((dap_chain_tx_out_cond_t*)item)->subtype.srv_pay.unit.uint32, ((dap_chain_tx_out_cond_t*)item)->subtype.srv_pay.srv_uid.uint64, l_hash_str_tmp, @@ -284,7 +284,7 @@ void _dap_chain_datum_tx_out_data(dap_chain_datum_tx_t *a_datum, dap_string_append_printf(a_str_out, "\t\t\t uid: 0x%016"DAP_UINT64_FORMAT_x"\n" "\t\t\t net id: 0x%016"DAP_UINT64_FORMAT_x"\n" "\t\t\t token: %s\n" - "\t\t\t value: %s (%"DAP_UINT64_FORMAT_u")\n", + "\t\t\t value: %s (%"DAP_UINT64_FORMAT_U")\n", ((dap_chain_tx_out_cond_t*)item)->subtype.srv_xchange.srv_uid.uint64, ((dap_chain_tx_out_cond_t*)item)->subtype.srv_xchange.net_id.uint64, ((dap_chain_tx_out_cond_t*)item)->subtype.srv_xchange.token, @@ -301,7 +301,7 @@ void _dap_chain_datum_tx_out_data(dap_chain_datum_tx_t *a_datum, dap_string_append_printf(a_str_out, "\t OUT EXT:\n" "\t\t Addr: %s\n" "\t\t Token: %s\n" - "\t\t Value: %s (%"DAP_UINT64_FORMAT_u")\n", + "\t\t Value: %s (%"DAP_UINT64_FORMAT_U")\n", dap_chain_addr_to_str(&((dap_chain_tx_out_ext_t*)item)->addr), ((dap_chain_tx_out_ext_t*)item)->token, dap_chain_balance_to_coins(dap_chain_uint128_from( diff --git a/modules/net/dap_chain_node_client.c b/modules/net/dap_chain_node_client.c index 59e71bf6363bbf1319e2c8cdc7c6cdf70aa150e7..d7dba4dc714af081603d2becfd615b06c7e04161 100644 --- a/modules/net/dap_chain_node_client.c +++ b/modules/net/dap_chain_node_client.c @@ -278,11 +278,9 @@ static void s_stage_connected_callback(dap_client_t *a_client, void *a_arg) l_node_client->esocket_uuid = l_stream->esocket->uuid; l_node_client->stream_worker = l_stream->stream_worker; if (l_node_client->keep_connection) { - dap_events_socket_uuid_t *l_uuid = DAP_NEW(dap_events_socket_uuid_t); - DAP_DUP(l_uuid, &l_node_client->uuid); + dap_events_socket_uuid_t *l_uuid = DAP_DUP(&l_node_client->uuid); dap_worker_exec_callback_on(l_stream->esocket->worker, s_node_client_connected_synchro_start_callback, l_uuid); - dap_events_socket_uuid_t *l_uuid_timer = DAP_NEW(dap_events_socket_uuid_t); - DAP_DUP(l_uuid_timer, &l_node_client->uuid); + dap_events_socket_uuid_t *l_uuid_timer = DAP_DUP(&l_node_client->uuid); dap_timerfd_start_on_worker(l_stream->esocket->worker, s_timer_update_states * 1000, s_timer_update_states_callback, l_uuid_timer); } }