From 45d225f3694af334bae3d5b74b871caa4811d183 Mon Sep 17 00:00:00 2001 From: Roman Khlopkov <roman.khlopkov@demlabs.net> Date: Fri, 4 Sep 2020 17:22:20 +0300 Subject: [PATCH] [+] Packet split for GDB sync --- modules/channel/chain/dap_stream_ch_chain.c | 16 +-- .../channel/chain/dap_stream_ch_chain_pkt.c | 8 +- .../global-db/dap_chain_global_db_driver.c | 97 ++++++++++--------- modules/global-db/dap_chain_global_db_hist.c | 44 ++------- .../global-db/include/dap_chain_global_db.h | 2 +- .../include/dap_chain_global_db_driver.h | 9 +- modules/net/dap_chain_net.c | 6 +- 7 files changed, 87 insertions(+), 95 deletions(-) diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c index 8f94aa9bef..aea46e12ca 100644 --- a/modules/channel/chain/dap_stream_ch_chain.c +++ b/modules/channel/chain/dap_stream_ch_chain.c @@ -628,12 +628,10 @@ bool s_out_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) bool l_is_stop = true; while(l_obj) { size_t l_item_size_out = 0; - uint8_t *l_item = dap_db_log_pack(l_obj, &l_item_size_out); + dap_list_t *l_item = dap_db_log_pack(l_obj, &l_item_size_out); // Item not found, maybe it has deleted? Then go to the next item if(!l_item || !l_item_size_out) { - //log_it(L_WARNING, "Log pack returned NULL??? data=0x%x (nothing to send) (rest=%d records)", l_obj, l_items_rest); l_item_size_out = 0; - // go to next item l_obj = dap_db_log_list_get(l_db_list); } else { @@ -641,12 +639,16 @@ bool s_out_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) size_t l_items_rest = dap_db_log_list_get_count_rest(l_db_list); log_it(L_INFO, "Send one global_db record data=0x%x len=%d (rest=%d/%d items)", l_item, l_item_size_out, l_items_rest, l_items_total);*/ - dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB, - l_ch_chain->request_net_id, l_ch_chain->request_chain_id, - l_ch_chain->request_cell_id, l_item, l_item_size_out); + for (dap_list_t *l_iter = l_item; l_iter; l_iter = dap_list_next(l_iter)) { + dap_store_obj_pkt_t *l_pkt = (dap_store_obj_pkt_t *)l_iter->data; + uint32_t l_pkt_size = sizeof(dap_store_obj_pkt_t) + l_pkt->data_size; + dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB, + l_ch_chain->request_net_id, l_ch_chain->request_chain_id, + l_ch_chain->request_cell_id, l_pkt, l_pkt_size); + } l_packet_out = true; l_ch_chain->stats_request_gdb_processed++; - DAP_DELETE(l_item); + dap_list_free_full(l_item, free); // sent the record, another will be sent l_is_stop = false; break; diff --git a/modules/channel/chain/dap_stream_ch_chain_pkt.c b/modules/channel/chain/dap_stream_ch_chain_pkt.c index aff9bb6776..19cc566016 100644 --- a/modules/channel/chain/dap_stream_ch_chain_pkt.c +++ b/modules/channel/chain/dap_stream_ch_chain_pkt.c @@ -49,8 +49,8 @@ dap_stream_ch_chain_state_t dap_stream_ch_chain_pkt_type_to_dap_stream_ch_chain_ * @return */ size_t dap_stream_ch_chain_pkt_write_unsafe(dap_stream_ch_t *a_ch, uint8_t a_type,dap_chain_net_id_t a_net_id, - dap_chain_id_t a_chain_id, dap_chain_cell_id_t a_cell_id, - const void * a_data, size_t a_data_size) + dap_chain_id_t a_chain_id, dap_chain_cell_id_t a_cell_id, + const void * a_data, size_t a_data_size) { dap_stream_ch_chain_pkt_t * l_chain_pkt; size_t l_chain_pkt_size = sizeof (l_chain_pkt->hdr) + a_data_size; @@ -69,8 +69,8 @@ size_t dap_stream_ch_chain_pkt_write_unsafe(dap_stream_ch_t *a_ch, uint8_t a_typ } size_t dap_stream_ch_chain_pkt_write_mt(dap_stream_worker_t *a_worker, dap_stream_ch_t *a_ch, uint8_t a_type,dap_chain_net_id_t a_net_id, - dap_chain_id_t a_chain_id, dap_chain_cell_id_t a_cell_id, - const void * a_data, size_t a_data_size) + dap_chain_id_t a_chain_id, dap_chain_cell_id_t a_cell_id, + const void * a_data, size_t a_data_size) { dap_stream_ch_chain_pkt_t * l_chain_pkt; size_t l_chain_pkt_size = sizeof (l_chain_pkt->hdr) + a_data_size; diff --git a/modules/global-db/dap_chain_global_db_driver.c b/modules/global-db/dap_chain_global_db_driver.c index b1e8058310..fd07a1bda4 100644 --- a/modules/global-db/dap_chain_global_db_driver.c +++ b/modules/global-db/dap_chain_global_db_driver.c @@ -196,53 +196,64 @@ static size_t dap_db_get_size_pdap_store_obj_t(pdap_store_obj_t store_obj) * @param a_size_out[out] size of output structure * @return NULL in case of an error */ -dap_store_obj_pkt_t *dap_store_packet_multiple(pdap_store_obj_t a_store_obj, time_t a_timestamp, +dap_list_t *dap_store_packet_multiple(pdap_store_obj_t a_store_obj, time_t a_timestamp, size_t a_store_obj_count) { - if(!a_store_obj || a_store_obj_count < 1) + if (!a_store_obj || a_store_obj_count < 1) return NULL; - size_t l_data_size_out = sizeof(uint32_t); // size of output data + // calculate output structure size - for(size_t l_q = 0; l_q < a_store_obj_count; ++l_q) + dap_list_t *l_ret = NULL; + dap_store_obj_pkt_t *l_pkt; + uint32_t l_obj_count = 0, l_data_size_out = 0; + for (size_t l_q = 0; l_q < a_store_obj_count; ++l_q) { l_data_size_out += dap_db_get_size_pdap_store_obj_t(&a_store_obj[l_q]); - - dap_store_obj_pkt_t *l_pkt = DAP_NEW_Z_SIZE(dap_store_obj_pkt_t, sizeof(dap_store_obj_pkt_t) + l_data_size_out); - l_pkt->data_size = l_data_size_out; - l_pkt->timestamp = a_timestamp; - uint64_t l_offset = 0; - uint32_t l_count = (uint32_t) a_store_obj_count; - memcpy(l_pkt->data + l_offset, &l_count, sizeof(uint32_t)); - l_offset += sizeof(uint32_t); - for(size_t l_q = 0; l_q < a_store_obj_count; ++l_q) { - dap_store_obj_t obj = a_store_obj[l_q]; - //uint16_t section_size = (uint16_t) dap_strlen(obj.section); - uint16_t group_size = (uint16_t) dap_strlen(obj.group); - uint16_t key_size = (uint16_t) dap_strlen(obj.key); - memcpy(l_pkt->data + l_offset, &obj.type, sizeof(int)); - l_offset += sizeof(int); - //memcpy(l_pkt->data + l_offset, §ion_size, sizeof(uint16_t)); - //l_offset += sizeof(uint16_t); - //memcpy(l_pkt->data + l_offset, obj.section, section_size); - //l_offset += section_size; - memcpy(l_pkt->data + l_offset, &group_size, sizeof(uint16_t)); - l_offset += sizeof(uint16_t); - memcpy(l_pkt->data + l_offset, obj.group, group_size); - l_offset += group_size; - memcpy(l_pkt->data + l_offset, &obj.id, sizeof(uint64_t)); - l_offset += sizeof(uint64_t); - memcpy(l_pkt->data + l_offset, &obj.timestamp, sizeof(time_t)); - l_offset += sizeof(time_t); - memcpy(l_pkt->data + l_offset, &key_size, sizeof(uint16_t)); - l_offset += sizeof(uint16_t); - memcpy(l_pkt->data + l_offset, obj.key, key_size); - l_offset += key_size; - memcpy(l_pkt->data + l_offset, &obj.value_len, sizeof(size_t)); - l_offset += sizeof(size_t); - memcpy(l_pkt->data + l_offset, obj.value, obj.value_len); - l_offset += obj.value_len; + if (l_data_size_out > DAP_CHAIN_PKT_EXPECT_SIZE || (l_q == a_store_obj_count - 1 && l_data_size_out)) { + l_pkt = DAP_NEW_Z_SIZE(dap_store_obj_pkt_t, sizeof(dap_store_obj_pkt_t) + l_data_size_out); + l_pkt->data_size = l_data_size_out; + l_pkt->timestamp = a_timestamp; + l_pkt->obj_count = l_q + 1 - l_obj_count; + l_ret = dap_list_append(l_ret, l_pkt); + l_data_size_out = 0; + l_obj_count = l_q + 1; + } } - assert(l_data_size_out == l_offset); - return l_pkt; + l_obj_count = 0; + for (dap_list_t *l_iter = l_ret; l_iter; l_iter = dap_list_next(l_iter)) { + l_pkt = (dap_store_obj_pkt_t *)l_iter->data; + uint64_t l_offset = 0; + for(size_t l_q = 0; l_q < l_pkt->obj_count; ++l_q) { + dap_store_obj_t obj = a_store_obj[l_obj_count + l_q]; + //uint16_t section_size = (uint16_t) dap_strlen(obj.section); + uint16_t group_size = (uint16_t) dap_strlen(obj.group); + uint16_t key_size = (uint16_t) dap_strlen(obj.key); + memcpy(l_pkt->data + l_offset, &obj.type, sizeof(int)); + l_offset += sizeof(int); + //memcpy(l_pkt->data + l_offset, §ion_size, sizeof(uint16_t)); + //l_offset += sizeof(uint16_t); + //memcpy(l_pkt->data + l_offset, obj.section, section_size); + //l_offset += section_size; + memcpy(l_pkt->data + l_offset, &group_size, sizeof(uint16_t)); + l_offset += sizeof(uint16_t); + memcpy(l_pkt->data + l_offset, obj.group, group_size); + l_offset += group_size; + memcpy(l_pkt->data + l_offset, &obj.id, sizeof(uint64_t)); + l_offset += sizeof(uint64_t); + memcpy(l_pkt->data + l_offset, &obj.timestamp, sizeof(time_t)); + l_offset += sizeof(time_t); + memcpy(l_pkt->data + l_offset, &key_size, sizeof(uint16_t)); + l_offset += sizeof(uint16_t); + memcpy(l_pkt->data + l_offset, obj.key, key_size); + l_offset += key_size; + memcpy(l_pkt->data + l_offset, &obj.value_len, sizeof(size_t)); + l_offset += sizeof(size_t); + memcpy(l_pkt->data + l_offset, obj.value, obj.value_len); + l_offset += obj.value_len; + } + l_obj_count += l_pkt->obj_count; + assert(l_pkt->data_size == l_offset); + } + return l_ret; } /** * deserialization @@ -255,9 +266,7 @@ dap_store_obj_t *dap_store_unpacket_multiple(const dap_store_obj_pkt_t *pkt, siz if(!pkt || pkt->data_size < 1) return NULL; uint64_t offset = 0; - uint32_t count; - memcpy(&count, pkt->data, sizeof(uint32_t)); - offset += sizeof(uint32_t); + uint32_t count = pkt->obj_count; dap_store_obj_t *store_obj = DAP_NEW_Z_SIZE(dap_store_obj_t, count * sizeof(struct dap_store_obj)); for(size_t q = 0; q < count; ++q) { dap_store_obj_t *obj = store_obj + q; diff --git a/modules/global-db/dap_chain_global_db_hist.c b/modules/global-db/dap_chain_global_db_hist.c index c2683b1560..4ac96d1472 100644 --- a/modules/global-db/dap_chain_global_db_hist.c +++ b/modules/global-db/dap_chain_global_db_hist.c @@ -77,7 +77,7 @@ static char* dap_db_new_history_timestamp() * * return dap_store_obj_pkt_t* */ -uint8_t* dap_db_log_pack(dap_global_db_obj_t *a_obj, size_t *a_data_size_out) +dap_list_t* dap_db_log_pack(dap_global_db_obj_t *a_obj, size_t *a_data_size_out) { if(!a_obj) return NULL; @@ -122,15 +122,18 @@ uint8_t* dap_db_log_pack(dap_global_db_obj_t *a_obj, size_t *a_data_size_out) i++; } // serialize data - dap_store_obj_pkt_t *l_data_out = dap_store_packet_multiple(l_store_obj, l_timestamp, l_count); + dap_list_t *l_data_out = dap_store_packet_multiple(l_store_obj, l_timestamp, l_count); dap_store_obj_free(l_store_obj, l_count); dap_strfreev(l_keys); if(l_data_out && a_data_size_out) { - *a_data_size_out = sizeof(dap_store_obj_pkt_t) + l_data_out->data_size; + *a_data_size_out = 0; + for (dap_list_t *l_iter = l_data_out; l_iter; l_iter = dap_list_next(l_iter)) { + *a_data_size_out += sizeof(dap_store_obj_pkt_t) + ((dap_store_obj_pkt_t *)l_data_out)->data_size; + } } - return (uint8_t*) l_data_out; + return l_data_out; } @@ -1326,7 +1329,6 @@ dap_db_log_list_t* dap_db_log_list_start(uint64_t first_id, dap_list_t *a_add_gr dap_db_log_list_t *l_dap_db_log_list = DAP_NEW_Z(dap_db_log_list_t); size_t l_add_groups_num = 0;// number of group - //size_t l_add_groups_items_num = 0;// number items in all groups dap_list_t *l_add_groups_mask = a_add_groups_mask; // calc l_add_groups_num while(l_add_groups_mask) { @@ -1337,7 +1339,6 @@ dap_db_log_list_t* dap_db_log_list_start(uint64_t first_id, dap_list_t *a_add_gr l_add_groups_mask = dap_list_next(l_add_groups_mask); } - //size_t l_add_groups_num = dap_list_length(a_add_groups_mask); size_t l_data_size_out_main = dap_chain_global_db_driver_count(GROUP_LOCAL_HISTORY, first_id); size_t *l_data_size_out_add_items = DAP_NEW_Z_SIZE(size_t, sizeof(size_t) * l_add_groups_num); uint64_t *l_group_last_id = DAP_NEW_Z_SIZE(uint64_t, sizeof(uint64_t) * l_add_groups_num); @@ -1366,9 +1367,6 @@ dap_db_log_list_t* dap_db_log_list_start(uint64_t first_id, dap_list_t *a_add_gr } if(!(l_data_size_out_main + l_data_size_out_add_items_count)) return NULL; - // debug -// if(l_data_size_out>11) -// l_data_size_out = 11; l_dap_db_log_list->item_start = first_id; l_dap_db_log_list->item_last = first_id + l_data_size_out_main; l_dap_db_log_list->items_number_main = l_data_size_out_main; @@ -1381,31 +1379,9 @@ dap_db_log_list_t* dap_db_log_list_start(uint64_t first_id, dap_list_t *a_add_gr l_dap_db_log_list->group_names = l_group_names; l_dap_db_log_list->group_cur = -1; l_dap_db_log_list->add_groups = a_add_groups_mask; - // there are too few items, read items right now - if(0) {//l_data_size_out <= 10) { - dap_list_t *l_list = NULL; - // read first items - size_t l_objs_count = 0; - dap_store_obj_t *l_objs = dap_chain_global_db_cond_load(GROUP_LOCAL_HISTORY, first_id, &l_objs_count); - for(size_t i = 0; i < l_objs_count; i++) { - dap_store_obj_t *l_obj_cur = l_objs + i; - dap_global_db_obj_t *l_item = DAP_NEW(dap_global_db_obj_t); - l_item->id = l_obj_cur->id; - l_item->key = dap_strdup(l_obj_cur->key); - l_item->value = (uint8_t*) dap_strdup((char*) l_obj_cur->value); - l_list = dap_list_append(l_list, l_item); - } - l_dap_db_log_list->list_write = l_list; - l_dap_db_log_list->list_read = l_list; - //log_it(L_DEBUG, "loaded items n=%d", l_data_size_out); - dap_store_obj_free(l_objs, l_objs_count); - } - // start thread for items loading - else { - l_dap_db_log_list->is_process = true; - pthread_mutex_init(&l_dap_db_log_list->list_mutex, NULL); - pthread_create(&l_dap_db_log_list->thread, NULL, s_list_thread_proc, l_dap_db_log_list); - } + l_dap_db_log_list->is_process = true; + pthread_mutex_init(&l_dap_db_log_list->list_mutex, NULL); + pthread_create(&l_dap_db_log_list->thread, NULL, s_list_thread_proc, l_dap_db_log_list); return l_dap_db_log_list; } diff --git a/modules/global-db/include/dap_chain_global_db.h b/modules/global-db/include/dap_chain_global_db.h index 4ac3c5a9a3..76ae062716 100644 --- a/modules/global-db/include/dap_chain_global_db.h +++ b/modules/global-db/include/dap_chain_global_db.h @@ -119,7 +119,7 @@ char* dap_chain_global_db_hash(const uint8_t *data, size_t data_size); char* dap_chain_global_db_hash_fast(const uint8_t *data, size_t data_size); // Get data according the history log -uint8_t* dap_db_log_pack(dap_global_db_obj_t *a_obj, size_t *a_data_size_out); +dap_list_t* dap_db_log_pack(dap_global_db_obj_t *a_obj, size_t *a_data_size_out); // Get data according the history log //char* dap_db_history_tx(dap_chain_hash_fast_t * a_tx_hash, const char *a_group_mempool); diff --git a/modules/global-db/include/dap_chain_global_db_driver.h b/modules/global-db/include/dap_chain_global_db_driver.h index c8d7ef9c52..4bccdc9e09 100644 --- a/modules/global-db/include/dap_chain_global_db_driver.h +++ b/modules/global-db/include/dap_chain_global_db_driver.h @@ -29,6 +29,8 @@ #include "dap_common.h" #include "dap_list.h" +#define DAP_CHAIN_PKT_EXPECT_SIZE 7168 + typedef struct dap_store_obj { uint64_t id; time_t timestamp; @@ -42,8 +44,9 @@ typedef struct dap_store_obj { }DAP_ALIGN_PACKED dap_store_obj_t, *pdap_store_obj_t; typedef struct dap_store_obj_pkt { - time_t timestamp; - size_t data_size; + uint64_t timestamp; + uint64_t data_size; + uint32_t obj_count; uint8_t data[]; }__attribute__((packed)) dap_store_obj_pkt_t; @@ -90,7 +93,7 @@ bool dap_chain_global_db_driver_is(const char *a_group, const char *a_key); size_t dap_chain_global_db_driver_count(const char *a_group, uint64_t id); dap_list_t* dap_chain_global_db_driver_get_groups_by_mask(const char *a_group_mask); -dap_store_obj_pkt_t *dap_store_packet_multiple(pdap_store_obj_t a_store_obj, +dap_list_t *dap_store_packet_multiple(pdap_store_obj_t a_store_obj, time_t a_timestamp, size_t a_store_obj_count); dap_store_obj_t *dap_store_unpacket_multiple(const dap_store_obj_pkt_t *a_pkt, size_t *a_store_obj_count); diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index 8d2e991ab4..3dba362684 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -268,7 +268,9 @@ void dap_chain_net_sync_gdb_broadcast(void *a_arg, const char a_op_code, const c l_obj->type = (uint8_t)a_op_code; DAP_DELETE(l_obj->group); l_obj->group = dap_strdup(a_group); - dap_store_obj_pkt_t *l_data_out = dap_store_packet_multiple(l_obj, l_obj->timestamp, 1); + dap_list_t *l_list_out = dap_store_packet_multiple(l_obj, l_obj->timestamp, 1); + // Expect only one element in list + dap_store_obj_pkt_t *l_data_out = (dap_store_obj_pkt_t *)l_list_out->data; dap_store_obj_free(l_obj, 1); dap_chain_t *l_chain = dap_chain_net_get_chain_by_name(l_net, "gdb"); dap_chain_id_t l_chain_id = l_chain ? l_chain->id : (dap_chain_id_t) {}; @@ -278,7 +280,7 @@ void dap_chain_net_sync_gdb_broadcast(void *a_arg, const char a_op_code, const c dap_stream_ch_chain_pkt_write_unsafe(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB, l_net->pub.id, l_chain_id, l_net->pub.cell_id, l_data_out, sizeof(dap_store_obj_pkt_t) + l_data_out->data_size); } - DAP_DELETE(l_data_out); + dap_list_free_full(l_list_out, free); } } -- GitLab