From fc86bf9efa62e6912ce8a2d28f9e2e2f9de2be97 Mon Sep 17 00:00:00 2001 From: cellframe <roman.khlopkov@demlabs.net> Date: Sat, 16 Apr 2022 19:19:41 +0300 Subject: [PATCH] [*] A new part of sync fixes --- modules/channel/chain/dap_stream_ch_chain.c | 24 +++++++++------ .../global-db/dap_chain_global_db_remote.c | 30 ++++++++++++------- modules/net/dap_chain_net.c | 8 +++-- 3 files changed, 41 insertions(+), 21 deletions(-) diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c index fc5ca5cbab..b443243046 100644 --- a/modules/channel/chain/dap_stream_ch_chain.c +++ b/modules/channel/chain/dap_stream_ch_chain.c @@ -678,9 +678,7 @@ static bool s_gdb_in_pkt_proc_callback(dap_proc_thread_t *a_thread, void *a_arg) if(l_pkt_item->pkt_data_size != l_obj_pkt_size) { log_it(L_WARNING, "In: s_gdb_in_pkt_proc_callback: received size=%zu is not equal to obj_pkt_size=%zu", l_pkt_item->pkt_data_size, l_obj_pkt_size); - if(l_pkt_item->pkt_data) { - DAP_DELETE(l_pkt_item->pkt_data); - } + DAP_DEL_Z(l_pkt_item->pkt_data); DAP_DELETE(l_sync_request); return true; } @@ -688,6 +686,12 @@ static bool s_gdb_in_pkt_proc_callback(dap_proc_thread_t *a_thread, void *a_arg) size_t l_data_obj_count = 0; // deserialize data & Parse data from dap_db_log_pack() dap_store_obj_t *l_store_obj = dap_store_unpacket_multiple(l_obj_pkt, &l_data_obj_count); + if (!l_store_obj) { + log_it(L_ERROR, "Invalid synchronization packet format"); + DAP_DEL_Z(l_pkt_item->pkt_data); + DAP_DELETE(l_sync_request); + return true; + } if (s_debug_more){ if (l_data_obj_count) log_it(L_INFO, "In: GLOBAL_DB parse: pkt_data_size=%"DAP_UINT64_FORMAT_U", l_data_obj_count = %zu",l_pkt_item->pkt_data_size, l_data_obj_count ); @@ -768,12 +772,14 @@ static bool s_gdb_in_pkt_proc_callback(dap_proc_thread_t *a_thread, void *a_arg) l_store_obj[i].key, l_ts_str, l_store_obj[i].value_len); } if (!l_apply) { - if (l_obj->timestamp <= (uint64_t)l_timestamp_cur) - log_it(L_WARNING, "New data not applied, because newly object exists"); - if (l_obj->timestamp <= (uint64_t)l_timestamp_del) - log_it(L_WARNING, "New data not applied, because newly object is deleted"); - if ((l_obj->type == DAP_DB$K_OPTYPE_DEL && l_obj->timestamp <= l_limit_time)) - log_it(L_WARNING, "New data not applied, because object is too old"); + if (s_debug_more) { + if (l_obj->timestamp <= (uint64_t)l_timestamp_cur) + log_it(L_WARNING, "New data not applied, because newly object exists"); + if (l_obj->timestamp <= (uint64_t)l_timestamp_del) + log_it(L_WARNING, "New data not applied, because newly object is deleted"); + if ((l_obj->type == DAP_DB$K_OPTYPE_DEL && l_obj->timestamp <= l_limit_time)) + log_it(L_WARNING, "New data not applied, because object is too old"); + } continue; } diff --git a/modules/global-db/dap_chain_global_db_remote.c b/modules/global-db/dap_chain_global_db_remote.c index c103e99b2e..724c97b9dd 100644 --- a/modules/global-db/dap_chain_global_db_remote.c +++ b/modules/global-db/dap_chain_global_db_remote.c @@ -603,11 +603,11 @@ unsigned char *pdata; */ dap_store_obj_t *dap_store_unpacket_multiple(const dap_store_obj_pkt_t *a_pkt, size_t *a_store_obj_count) { - if(!a_pkt || a_pkt->data_size < 1) + if(!a_pkt || a_pkt->data_size < sizeof(dap_store_obj_pkt_t)) return NULL; uint64_t l_offset = 0; uint32_t l_count = a_pkt->obj_count, l_cur_count; - uint64_t l_size = l_count <= UINT32_MAX ? l_count * sizeof(struct dap_store_obj) : 0; + uint64_t l_size = l_count <= UINT16_MAX ? l_count * sizeof(struct dap_store_obj) : 0; dap_store_obj_t *l_store_obj = DAP_NEW_Z_SIZE(dap_store_obj_t, l_size); if (!l_store_obj || !l_size) { log_it(L_ERROR, "Invalid size: can't allocate %"DAP_UINT64_FORMAT_U" bytes", l_size); @@ -628,40 +628,50 @@ dap_store_obj_t *dap_store_unpacket_multiple(const dap_store_obj_pkt_t *a_pkt, s memcpy(&l_str_length, a_pkt->data + l_offset, sizeof(uint16_t)); l_offset += sizeof(uint16_t); - if (l_offset+l_str_length> a_pkt->data_size) {log_it(L_ERROR, "Broken GDB element: can't read 'group' field"); break;} // Check for buffer boundries + if (l_offset + l_str_length > a_pkt->data_size || !l_str_length) {log_it(L_ERROR, "Broken GDB element: can't read 'group' field"); break;} // Check for buffer boundries l_obj->group = DAP_NEW_SIZE(char, l_str_length + 1); memcpy(l_obj->group, a_pkt->data + l_offset, l_str_length); l_obj->group[l_str_length] = '\0'; l_offset += l_str_length; - if (l_offset+sizeof (uint64_t)> a_pkt->data_size) {log_it(L_ERROR, "Broken GDB element: can't read 'id' field"); break;} // Check for buffer boundries + if (l_offset+sizeof (uint64_t)> a_pkt->data_size) {log_it(L_ERROR, "Broken GDB element: can't read 'id' field"); + DAP_DELETE(l_obj->group); break;} // Check for buffer boundries memcpy(&l_obj->id, a_pkt->data + l_offset, sizeof(uint64_t)); l_offset += sizeof(uint64_t); - if (l_offset+sizeof (uint64_t)> a_pkt->data_size) {log_it(L_ERROR, "Broken GDB element: can't read 'timestamp' field"); break;} // Check for buffer boundries + if (l_offset+sizeof (uint64_t)> a_pkt->data_size) {log_it(L_ERROR, "Broken GDB element: can't read 'timestamp' field"); + DAP_DELETE(l_obj->group); break;} // Check for buffer boundries memcpy(&l_obj->timestamp, a_pkt->data + l_offset, sizeof(uint64_t)); l_offset += sizeof(uint64_t); - if (l_offset+sizeof (uint16_t)> a_pkt->data_size) {log_it(L_ERROR, "Broken GDB element: can't read 'key_length' field"); break;} // Check for buffer boundries + if (l_offset+sizeof (uint16_t)> a_pkt->data_size) {log_it(L_ERROR, "Broken GDB element: can't read 'key_length' field"); + DAP_DELETE(l_obj->group); break;} // Check for buffer boundries memcpy(&l_str_length, a_pkt->data + l_offset, sizeof(uint16_t)); l_offset += sizeof(uint16_t); - if (l_offset+ l_str_length > a_pkt->data_size) {log_it(L_ERROR, "Broken GDB element: can't read 'key' field"); break;} // Check for buffer boundries + if (l_offset + l_str_length > a_pkt->data_size || !l_str_length) {log_it(L_ERROR, "Broken GDB element: can't read 'key' field"); + DAP_DELETE(l_obj->group); break;} // Check for buffer boundries l_obj->key = DAP_NEW_SIZE(char, l_str_length + 1); memcpy((char *)l_obj->key, a_pkt->data + l_offset, l_str_length); ((char *)l_obj->key)[l_str_length] = '\0'; l_offset += l_str_length; - if (l_offset+sizeof (uint64_t)> a_pkt->data_size) {log_it(L_ERROR, "Broken GDB element: can't read 'value_length' field"); break;} // Check for buffer boundries + if (l_offset+sizeof (uint64_t)> a_pkt->data_size) {log_it(L_ERROR, "Broken GDB element: can't read 'value_length' field"); + DAP_DELETE(l_obj->group); DAP_DELETE(l_obj->key); break;} // Check for buffer boundries memcpy(&l_obj->value_len, a_pkt->data + l_offset, sizeof(uint64_t)); l_offset += sizeof(uint64_t); - if (l_offset+l_obj->value_len> a_pkt->data_size) {log_it(L_ERROR, "Broken GDB element: can't read 'value' field"); break;} // Check for buffer boundries + if (l_offset + l_obj->value_len > a_pkt->data_size) {log_it(L_ERROR, "Broken GDB element: can't read 'value' field"); + DAP_DELETE(l_obj->group); DAP_DELETE(l_obj->key);break;} // Check for buffer boundries l_obj->value = DAP_NEW_SIZE(uint8_t, l_obj->value_len); memcpy(l_obj->value, a_pkt->data + l_offset, l_obj->value_len); l_offset += l_obj->value_len; } - assert(a_pkt->data_size == l_offset); + if (a_pkt->data_size != l_offset) { + if (l_cur_count) + dap_store_obj_free(l_store_obj, l_cur_count); + return NULL; + } // Return the number of completely filled dap_store_obj_t structures // because l_cur_count may be less than l_count due to too little memory if(a_store_obj_count) diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index 6e429a0c9d..8b6afd61b4 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -465,6 +465,8 @@ static bool s_net_send_records(dap_proc_thread_t *a_thread, void *a_arg) dap_chain_t *l_chain = dap_chain_get_chain_from_group_name(l_net->pub.id, l_obj->group); dap_chain_id_t l_chain_id = l_chain ? l_chain->id : (dap_chain_id_t) {}; dap_chain_cell_id_t l_cell_id = l_chain ? l_chain->cells->id : (dap_chain_cell_id_t){}; + if (!l_obj_cur->group) + break; dap_store_obj_pkt_t *l_data_out = dap_store_packet_single(l_obj_cur); dap_store_obj_free_one(l_obj_cur); struct downlink *l_link, *l_tmp; @@ -485,7 +487,8 @@ static bool s_net_send_records(dap_proc_thread_t *a_thread, void *a_arg) it = PVT(l_net)->records_queue; } while (it); } else - PVT(l_net)->records_queue = dap_list_append(PVT(l_net)->records_queue, l_obj); + //PVT(l_net)->records_queue = dap_list_append(PVT(l_net)->records_queue, l_obj); + dap_store_obj_free_one(l_obj); pthread_rwlock_unlock(&PVT(l_net)->rwlock); return true; } @@ -564,7 +567,8 @@ static bool s_net_send_atoms(dap_proc_thread_t *a_thread, void *a_arg) it = PVT(l_net)->atoms_queue; } while (it); } else - PVT(l_net)->atoms_queue = dap_list_append(PVT(l_net)->records_queue, l_arg); + //PVT(l_net)->atoms_queue = dap_list_append(PVT(l_net)->records_queue, l_arg); + s_atom_obj_free(a_arg); pthread_rwlock_unlock(&PVT(l_net)->rwlock); return true; } -- GitLab