diff --git a/dap-sdk/core/include/dap_common.h b/dap-sdk/core/include/dap_common.h index 4e5939c65d22b15d83b9f892e0b248e47b2a74fb..1b8b665af8f52b5959292e0816ecae2c994f5f01 100755 --- a/dap-sdk/core/include/dap_common.h +++ b/dap-sdk/core/include/dap_common.h @@ -122,8 +122,7 @@ typedef uint8_t byte_t; #define DAP_NEW_Z_SIZE(a, b) DAP_CAST_REINT(a, rpcalloc(1,b)) #define DAP_REALLOC(a, b) rprealloc(a,b) #define DAP_DELETE(a) rpfree(a) - #define DAP_DUP(a) ( __typeof(a) ret = memcpy(ret,a,sizeof(*a)) ) - #define DAP_DUP_SIZE(a,b) ( __typeof(a) ret = memcpy(ret,a,b) ) + #define DAP_DUP(a) memcpy(rpmalloc(sizeof(*a)), a, sizeof(*a)) #else #define DAP_MALLOC(a) malloc(a) #define DAP_FREE(a) free(a) @@ -139,8 +138,7 @@ typedef uint8_t byte_t; #define DAP_NEW_Z_SIZE(a, b) DAP_CAST_REINT(a, calloc(1,b)) #define DAP_REALLOC(a, b) realloc(a,b) #define DAP_DELETE(a) free(a) - #define DAP_DUP(a) ( __typeof(a) ret = memcpy(ret,a,sizeof(*a)) ) - #define DAP_DUP_SIZE(a,b) ( __typeof(a) memcpy(ret,a,b) ) + #define DAP_DUP(a) memcpy(malloc(sizeof(*a)), a, sizeof(*a)) #endif #define DAP_DEL_Z(a) if(a) { DAP_DELETE(a); a=NULL;} diff --git a/dap-sdk/core/include/dap_tsd.h b/dap-sdk/core/include/dap_tsd.h index bb4594b51a5809fb99e138ea36d666256fe18956..c96385913f9e5f62ccadd15ae07845b17a682ccd 100644 --- a/dap-sdk/core/include/dap_tsd.h +++ b/dap-sdk/core/include/dap_tsd.h @@ -23,11 +23,12 @@ This file is part of DAP SDK the open source project #pragma once #include "dap_common.h" #include "dap_strfuncs.h" -typedef struct dap_tsd{ + +typedef struct dap_tsd { uint16_t type; uint32_t size; byte_t data[]; -} dap_tsd_t; +} DAP_ALIGN_PACKED dap_tsd_t; dap_tsd_t * dap_tsd_create(uint16_t a_type, const void * a_data, size_t a_data_size); dap_tsd_t* dap_tsd_find(byte_t * a_data, size_t a_data_size,uint16_t a_type); diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c index 89eab836b1cafdffe95d990c09ce602d5fb4b5d8..5822e919b8e908cd68f1ad39d24bae0ca51e114b 100644 --- a/modules/channel/chain/dap_stream_ch_chain.c +++ b/modules/channel/chain/dap_stream_ch_chain.c @@ -85,6 +85,7 @@ struct sync_request struct{ dap_db_log_list_t *db_log; // db log dap_list_t *db_iter; + char *sync_group; } gdb; struct{ dap_chain_atom_iter_t *request_atom_iter; @@ -572,19 +573,51 @@ static void s_gdb_in_pkt_error_worker_callback(dap_worker_t *a_worker, void *a_a struct sync_request *l_sync_request = (struct sync_request *) a_arg; dap_stream_ch_t *l_ch = dap_stream_ch_find_by_uuid_unsafe(DAP_STREAM_WORKER(a_worker), l_sync_request->ch_uuid); - if( l_ch == NULL ){ + if( l_ch == NULL ) { log_it(L_INFO,"Client disconnected before we sent the reply"); - DAP_DELETE(l_sync_request); - return; + } else { + dap_stream_ch_chain_pkt_write_error_unsafe(l_ch, l_sync_request->request_hdr.net_id.uint64, + l_sync_request->request_hdr.chain_id.uint64, + l_sync_request->request_hdr.cell_id.uint64, + "ERROR_GLOBAL_DB_INTERNAL_NOT_SAVED"); } - - dap_stream_ch_chain_pkt_write_error_unsafe(l_ch, l_sync_request->request_hdr.net_id.uint64, - l_sync_request->request_hdr.chain_id.uint64, - l_sync_request->request_hdr.cell_id.uint64, - "ERROR_GLOBAL_DB_INTERNAL_NOT_SAVED"); DAP_DELETE(l_sync_request); } +static void s_gdb_sync_tsd_worker_callback(dap_worker_t *a_worker, void *a_arg) +{ + struct sync_request *l_sync_request = (struct sync_request *) a_arg; + + dap_stream_ch_t *l_ch = dap_stream_ch_find_by_uuid_unsafe(DAP_STREAM_WORKER(a_worker), l_sync_request->ch_uuid); + if( l_ch == NULL ) { + log_it(L_INFO,"Client disconnected before we sent the reply"); + } else { + size_t l_gr_len = strlen(l_sync_request->gdb.sync_group) + 1; + size_t l_data_size = 2 * sizeof(uint64_t) + l_gr_len; + dap_tsd_t *l_tsd_rec = DAP_NEW_SIZE(dap_tsd_t, l_data_size + sizeof(dap_tsd_t)); + l_tsd_rec->type = DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_TSD_LAST_ID; + l_tsd_rec->size = l_data_size; + uint64_t l_node_addr = dap_chain_net_get_cur_addr_int(dap_chain_net_by_id(l_sync_request->request_hdr.net_id)); + void *l_data_ptr = l_tsd_rec->data; + memcpy(l_data_ptr, &l_node_addr, sizeof(uint64_t)); + l_data_ptr += sizeof(uint64_t); + memcpy(l_data_ptr, &l_sync_request->request.id_end, sizeof(uint64_t)); + l_data_ptr += sizeof(uint64_t); + memcpy(l_data_ptr, l_sync_request->gdb.sync_group, l_gr_len); + log_it(L_INFO, "Allocated %d bytes, copied %d bytes, sent %d bytes", + l_data_size + sizeof(dap_tsd_t), + (long int)((byte_t *)l_data_ptr - l_tsd_rec->data) + l_gr_len, + l_tsd_rec->size + sizeof(dap_tsd_t)); + dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_TSD, + l_sync_request->request_hdr.net_id.uint64, + l_sync_request->request_hdr.chain_id.uint64, + l_sync_request->request_hdr.cell_id.uint64, + l_tsd_rec, l_tsd_rec->size + sizeof(dap_tsd_t)); + DAP_DELETE(l_tsd_rec); + } + DAP_DELETE(l_sync_request->gdb.sync_group); + DAP_DELETE(l_sync_request); +} /** * @brief s_gdb_in_pkt_callback @@ -610,9 +643,28 @@ static bool s_gdb_in_pkt_proc_callback(dap_proc_thread_t *a_thread, void *a_arg) log_it(L_WARNING, "In: GLOBAL_DB parse: packet in list with NULL data(pkt_data_size:%zd)", l_pkt_item->pkt_data_size); } + uint64_t l_last_id = 0; + char *l_last_group = NULL; + char l_last_type = '\0'; + bool l_group_changed = false; + for (size_t i = 0; i < l_data_obj_count; i++) { // obj to add dap_store_obj_t *l_obj = l_store_obj + i; + l_group_changed = l_last_group && (strcmp(l_last_group, l_obj->group) || l_last_type != l_obj->type); + // Send remote side notification about received obj + if (l_sync_request->request.node_addr.uint64 && + (l_group_changed || i == l_data_obj_count - 1)) { + struct sync_request *l_sync_req_tsd = DAP_DUP(l_sync_request); + l_sync_req_tsd->request.id_end = l_last_id; + l_sync_req_tsd->gdb.sync_group = l_obj->type == 'a' ? dap_strdup(l_obj->group) : + dap_strdup_printf("%s.del", l_obj->group); + dap_proc_thread_worker_exec_callback(a_thread, l_sync_request->worker->id, + s_gdb_sync_tsd_worker_callback, l_sync_req_tsd); + } + l_last_id = l_obj->id; + l_last_group = l_obj->group; + l_last_type = l_obj->type; //check whether to apply the received data into the database bool l_apply = false; // timestamp for exist obj @@ -653,8 +705,7 @@ static bool s_gdb_in_pkt_proc_callback(dap_proc_thread_t *a_thread, void *a_arg) } // save data to global_db if(!dap_chain_global_db_obj_save(l_obj, 1)) { - struct sync_request *l_sync_req_err = DAP_NEW_Z(struct sync_request); - memcpy(l_sync_req_err, l_sync_request, sizeof(struct sync_request)); + struct sync_request *l_sync_req_err = DAP_DUP(l_sync_request); dap_proc_thread_worker_exec_callback(a_thread, l_sync_request->worker->id, s_gdb_in_pkt_error_worker_callback, l_sync_req_err); } else { @@ -662,8 +713,9 @@ static bool s_gdb_in_pkt_proc_callback(dap_proc_thread_t *a_thread, void *a_arg) log_it(L_DEBUG, "Added new GLOBAL_DB synchronization record"); } } - if(l_store_obj) + if(l_store_obj) { dap_store_obj_free(l_store_obj, l_data_obj_count); + } } else { log_it(L_WARNING, "In proc thread got GDB stream ch packet with zero data"); } @@ -782,8 +834,21 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) } break; // Response with metadata organized in TSD - case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_TSD:{ - if (s_debug_more) + case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_TSD: { + if (l_chain_pkt_data_size) { + dap_tsd_t *l_tsd_rec = (dap_tsd_t *)l_chain_pkt->data; + if (l_tsd_rec->type != DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_TSD_LAST_ID || + l_tsd_rec->size < 2 * sizeof(uint64_t) + 2) { + break; + } + void *l_data_ptr = l_tsd_rec->data; + uint64_t l_node_addr = *(uint64_t *)l_data_ptr; + l_data_ptr += sizeof(uint64_t); + uint64_t l_last_id = *(uint64_t *)l_data_ptr; + l_data_ptr += sizeof(uint64_t); + char *l_group = (char *)l_data_ptr; + dap_db_set_last_id_remote(l_node_addr, l_last_id, l_group); + } else if (s_debug_more) log_it(L_DEBUG, "Global DB TSD packet detected"); } break; @@ -1333,11 +1398,6 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) }*/ l_skip_count++; } else { - if (l_ch_chain->request.node_addr.uint64) { - dap_db_set_last_id_remote(l_ch_chain->request.node_addr.uint64, - dap_store_packet_get_id(l_obj->pkt), - dap_store_packet_get_group(l_obj->pkt)); - } l_hash_item = DAP_NEW(dap_stream_ch_chain_hash_item_t); memcpy(&l_hash_item->hash, &l_obj->hash, sizeof(dap_chain_hash_fast_t)); l_hash_item->size = l_obj->pkt->data_size; diff --git a/modules/channel/chain/include/dap_stream_ch_chain_pkt.h b/modules/channel/chain/include/dap_stream_ch_chain_pkt.h index cee15365cba2944c2b0521e33949ee616f3e060a..0cf3ab302157a88703036efbe9e2f88b75fa59e3 100644 --- a/modules/channel/chain/include/dap_stream_ch_chain_pkt.h +++ b/modules/channel/chain/include/dap_stream_ch_chain_pkt.h @@ -75,6 +75,7 @@ #define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_TSD_COUNT 0x0002 // Items count #define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_TSD_HASH_LAST 0x0003 // Hash of last(s) item #define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_TSD_HASH_FIRST 0x0004 // Hash of first(s) item +#define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_TSD_LAST_ID 0x0100 // Last ID of GDB synced group typedef enum dap_stream_ch_chain_state{ CHAIN_STATE_IDLE=0, diff --git a/modules/global-db/dap_chain_global_db_hist.c b/modules/global-db/dap_chain_global_db_hist.c index 3839c1cd0c5d99bd5693994422d74bfcaee0ecba..a362e52f6bd972ba74140658f73347d44b113b9a 100644 --- a/modules/global-db/dap_chain_global_db_hist.c +++ b/modules/global-db/dap_chain_global_db_hist.c @@ -10,8 +10,10 @@ #include "dap_chain_datum_tx_items.h" #include "dap_chain_global_db_remote.h" #include "dap_chain_global_db_hist.h" - #include "uthash.h" + +#define GDB_SYNC_ALWAYS_FROM_ZERO + // for dap_db_history() typedef struct dap_tx_data{ dap_chain_hash_fast_t tx_hash; diff --git a/modules/global-db/dap_chain_global_db_remote.c b/modules/global-db/dap_chain_global_db_remote.c index 63be1fe953fa04c37e72e77aa7179ad3c236d2df..8954518599b1ecd341d8a9f1822c254f3db3e887 100644 --- a/modules/global-db/dap_chain_global_db_remote.c +++ b/modules/global-db/dap_chain_global_db_remote.c @@ -183,27 +183,6 @@ dap_store_obj_pkt_t *dap_store_packet_multiple(dap_store_obj_pkt_t *a_old_pkt, d return a_old_pkt; } -char *dap_store_packet_get_group(dap_store_obj_pkt_t *a_pkt) -{ - uint16_t l_gr_len; - memcpy(&l_gr_len, a_pkt->data + sizeof(uint32_t), sizeof(uint16_t)); - char *l_ret_str = DAP_NEW_SIZE(char, l_gr_len + 1); - size_t l_gr_offset = sizeof(uint32_t) + sizeof(uint16_t); - memcpy(l_ret_str, a_pkt->data + l_gr_offset, l_gr_len); - l_ret_str[l_gr_len] = '\0'; - return l_ret_str; -} - -uint64_t dap_store_packet_get_id(dap_store_obj_pkt_t *a_pkt) -{ - uint16_t l_gr_len; - memcpy(&l_gr_len, a_pkt->data + sizeof(uint32_t), sizeof(uint16_t)); - size_t l_id_offset = sizeof(uint32_t) + sizeof(uint16_t) + l_gr_len; - uint64_t l_ret_id; - memcpy(&l_ret_id, a_pkt->data + l_id_offset, sizeof(uint64_t)); - return l_ret_id; -} - void dap_store_packet_change_id(dap_store_obj_pkt_t *a_pkt, uint64_t a_id) { uint16_t l_gr_len;