diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c index 099d4d843b382f8811735be833976500be8a7c8e..fc5ca5cbab862a7e7ee79393465c6821beea022a 100644 --- a/modules/channel/chain/dap_stream_ch_chain.c +++ b/modules/channel/chain/dap_stream_ch_chain.c @@ -505,11 +505,11 @@ static bool s_sync_in_chains_callback(dap_proc_thread_t *a_thread, void *a_arg) uint64_t l_atom_copy_size = l_pkt_item->pkt_data_size; dap_hash_fast(l_atom_copy, l_atom_copy_size, &l_atom_hash); dap_chain_atom_verify_res_t l_atom_add_res = l_chain->callback_atom_add(l_chain, l_atom_copy, l_atom_copy_size); + char l_atom_hash_str[DAP_CHAIN_HASH_FAST_STR_SIZE] = {[0]='\0'}; + dap_chain_hash_fast_to_str(&l_atom_hash,l_atom_hash_str,sizeof (l_atom_hash_str)-1 ); switch (l_atom_add_res) { case ATOM_PASS: if (s_debug_more){ - char l_atom_hash_str[72]={[0]='\0'}; - dap_chain_hash_fast_to_str(&l_atom_hash,l_atom_hash_str,sizeof (l_atom_hash_str)-1 ); log_it(L_WARNING, "Atom with hash %s for %s:%s not accepted (code ATOM_PASS, already present)", l_atom_hash_str, l_chain->net_name, l_chain->name); } dap_db_set_last_hash_remote(l_sync_request->request.node_addr.uint64, l_chain, &l_atom_hash); @@ -517,21 +517,15 @@ static bool s_sync_in_chains_callback(dap_proc_thread_t *a_thread, void *a_arg) break; case ATOM_MOVE_TO_THRESHOLD: if (s_debug_more) { - char l_atom_hash_str[72] = {'\0'}; - dap_chain_hash_fast_to_str(&l_atom_hash, l_atom_hash_str, sizeof(l_atom_hash_str) - 1); log_it(L_INFO, "Thresholded atom with hash %s for %s:%s", l_atom_hash_str, l_chain->net_name, l_chain->name); } break; case ATOM_ACCEPT: if (s_debug_more) { - char l_atom_hash_str[72]={'\0'}; - dap_chain_hash_fast_to_str(&l_atom_hash,l_atom_hash_str,sizeof (l_atom_hash_str)-1 ); log_it(L_INFO,"Accepted atom with hash %s for %s:%s", l_atom_hash_str, l_chain->net_name, l_chain->name); } int l_res = dap_chain_atom_save(l_chain, l_atom_copy, l_atom_copy_size, l_sync_request->request_hdr.cell_id); if(l_res < 0) { - char l_atom_hash_str[72]={'\0'}; - dap_chain_hash_fast_to_str(&l_atom_hash,l_atom_hash_str,sizeof (l_atom_hash_str)-1 ); log_it(L_ERROR, "Can't save atom %s to the file", l_atom_hash_str); } else { dap_db_set_last_hash_remote(l_sync_request->request.node_addr.uint64, l_chain, &l_atom_hash); @@ -552,10 +546,10 @@ static bool s_sync_in_chains_callback(dap_proc_thread_t *a_thread, void *a_arg) if (l_atom_treshold) { dap_chain_cell_id_t l_cell_id = (l_cur_chain == l_chain) ? l_sync_request->request_hdr.cell_id : l_cur_chain->cells->id; - int l_res = dap_chain_atom_save(l_cur_chain, l_atom_copy, l_atom_copy_size, l_cell_id); + int l_res = dap_chain_atom_save(l_cur_chain, l_atom_treshold, l_atom_treshold_size, l_cell_id); log_it(L_INFO, "Added atom from treshold"); if (l_res < 0) { - char l_atom_hash_str[72] = {'\0'}; + dap_hash_fast(l_atom_treshold, l_atom_treshold_size, &l_atom_hash); dap_chain_hash_fast_to_str(&l_atom_hash,l_atom_hash_str, sizeof(l_atom_hash_str) - 1); log_it(L_ERROR, "Can't save atom %s from treshold to file", l_atom_hash_str); } else if (l_cur_chain == l_chain) { @@ -755,7 +749,7 @@ static bool s_gdb_in_pkt_proc_callback(dap_proc_thread_t *a_thread, void *a_arg) dap_store_obj_t *l_read_obj = dap_chain_global_db_driver_read(l_obj->group, l_obj->key, NULL); if (l_read_obj) { l_timestamp_cur = l_read_obj->timestamp; - dap_store_obj_free(l_read_obj, 1); + dap_store_obj_free_one(l_read_obj); } } time_t l_timestamp_del = global_db_gr_del_get_timestamp(l_obj->group, l_obj->key); diff --git a/modules/global-db/dap_chain_global_db.c b/modules/global-db/dap_chain_global_db.c index 5768111d6d00758ee70881b6719d3417a28514cd..abfd46e7ef8f1e2b09ae88250f13dc8150eb5526 100644 --- a/modules/global-db/dap_chain_global_db.c +++ b/modules/global-db/dap_chain_global_db.c @@ -337,7 +337,7 @@ uint8_t * dap_chain_global_db_gr_get(const char *a_key, size_t *a_data_len_out, ? DAP_DUP_SIZE(l_store_data->value, l_store_data->value_len) : NULL; l_data_len_out = l_store_data->value_len; - dap_store_obj_free(l_store_data, 1); + dap_store_obj_free_one(l_store_data); *a_data_len_out = l_data_len_out; return l_ret_value; } diff --git a/modules/global-db/dap_chain_global_db_driver.c b/modules/global-db/dap_chain_global_db_driver.c index f89b7bea023844532b005c51e1de89dec5d98e0b..e62052accc1edfa1656d068d2080456b5b150f26 100644 --- a/modules/global-db/dap_chain_global_db_driver.c +++ b/modules/global-db/dap_chain_global_db_driver.c @@ -199,7 +199,8 @@ dap_store_obj_t *l_store_obj, *l_store_obj_dst, *l_store_obj_src; * @param a_store_count a number of objects * @return (none) */ -void dap_store_obj_free(dap_store_obj_t *a_store_obj, size_t a_store_count){ +void dap_store_obj_free(dap_store_obj_t *a_store_obj, size_t a_store_count) +{ if(!a_store_obj) return; diff --git a/modules/global-db/dap_chain_global_db_remote.c b/modules/global-db/dap_chain_global_db_remote.c index fd597586f0efe924837c5f8a5570e5d92e8373c6..c103e99b2ec6d0efb9a41f13c26d622f0dcd3ad3 100644 --- a/modules/global-db/dap_chain_global_db_remote.c +++ b/modules/global-db/dap_chain_global_db_remote.c @@ -26,7 +26,7 @@ uint64_t dap_db_log_get_group_last_id(const char *a_group_name) dap_store_obj_t *l_last_obj = dap_chain_global_db_get_last(a_group_name); if(l_last_obj) { result = l_last_obj->id; - dap_store_obj_free(l_last_obj, 1); + dap_store_obj_free_one(l_last_obj); } return result; } diff --git a/modules/global-db/include/dap_chain_global_db_driver.h b/modules/global-db/include/dap_chain_global_db_driver.h index 133ea7b637c054c81b2d13002cc0404ce86acc93..7b8ba74647bdfe6708fa15209cdef56bf43dfa85 100644 --- a/modules/global-db/include/dap_chain_global_db_driver.h +++ b/modules/global-db/include/dap_chain_global_db_driver.h @@ -86,6 +86,7 @@ void dap_db_driver_deinit(void); dap_store_obj_t* dap_store_obj_copy(dap_store_obj_t *a_store_obj, size_t a_store_count); void dap_store_obj_free(dap_store_obj_t *a_store_obj, size_t a_store_count); +DAP_STATIC_INLINE void dap_store_obj_free_one(dap_store_obj_t *a_store_obj) { return dap_store_obj_free(a_store_obj, 1); } int dap_db_driver_flush(void); char* dap_chain_global_db_driver_hash(const uint8_t *data, size_t data_size); diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index f5e2baa2ba1233945abe5037277c0d19108650d8..6e429a0c9deb44fd51230f97c67c99c605539506 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -452,8 +452,11 @@ static bool s_net_send_records(dap_proc_thread_t *a_thread, void *a_arg) l_obj->type = l_arg->type; if (l_obj->type == DAP_DB$K_OPTYPE_DEL) { DAP_DELETE(l_obj->group); - l_obj->group = dap_strdup(l_arg->group); - } + l_obj->group = l_arg->group; + } else + DAP_DELETE(l_arg->group); + DAP_DELETE(l_arg->key); + DAP_DELETE(l_arg); pthread_rwlock_wrlock(&PVT(l_net)->rwlock); if (PVT(l_net)->state != NET_STATE_SYNC_GDB) { dap_list_t *it = NULL; @@ -463,7 +466,7 @@ static bool s_net_send_records(dap_proc_thread_t *a_thread, void *a_arg) dap_chain_id_t l_chain_id = l_chain ? l_chain->id : (dap_chain_id_t) {}; dap_chain_cell_id_t l_cell_id = l_chain ? l_chain->cells->id : (dap_chain_cell_id_t){}; dap_store_obj_pkt_t *l_data_out = dap_store_packet_single(l_obj_cur); - dap_store_obj_free(l_obj_cur, 1); + dap_store_obj_free_one(l_obj_cur); struct downlink *l_link, *l_tmp; HASH_ITER(hh, PVT(l_net)->downlinks, l_link, l_tmp) { dap_stream_ch_t *l_ch = dap_stream_ch_find_by_uuid_unsafe(l_link->worker, l_link->uuid); @@ -477,12 +480,9 @@ static bool s_net_send_records(dap_proc_thread_t *a_thread, void *a_arg) sizeof(dap_store_obj_pkt_t) + l_data_out->data_size); } DAP_DELETE(l_data_out); - if (it) { - dap_list_t *l_tmp = it->next; - dap_list_delete_link(PVT(l_net)->records_queue, it); - it = l_tmp; - } else - it = PVT(l_net)->records_queue; + if (it) + PVT(l_net)->records_queue = dap_list_delete_link(PVT(l_net)->records_queue, it); + it = PVT(l_net)->records_queue; } while (it); } else PVT(l_net)->records_queue = dap_list_append(PVT(l_net)->records_queue, l_obj); @@ -490,9 +490,10 @@ static bool s_net_send_records(dap_proc_thread_t *a_thread, void *a_arg) return true; } +static void s_record_obj_free(void *a_obj) { return dap_store_obj_free_one((dap_store_obj_t *)a_obj); } + /** - * @brief if current network in ONLINE state send to all connected node - * executes, when you add data to gdb chain (class=gdb in chain config) + * @brief executes, when you add data to gdb and sends it to current network connected nodes * @param a_arg arguments. Can be network object (dap_chain_net_t) * @param a_op_code object type (f.e. l_net->type from dap_store_obj) * @param a_group group, for example "chain-gdb.home21-network.chain-F" @@ -505,9 +506,19 @@ void dap_chain_net_sync_gdb_broadcast(void *a_arg, const char a_op_code, const c { UNUSED(a_value); UNUSED(a_value_len); + if (!a_arg) + return; dap_chain_net_t *l_net = (dap_chain_net_t *)a_arg; - if (!HASH_COUNT(PVT(l_net)->downlinks)) + if (!HASH_COUNT(PVT(l_net)->downlinks)) { + if (PVT(l_net)->records_queue) { + pthread_rwlock_wrlock(&PVT(l_net)->rwlock); + dap_list_free_full(PVT(l_net)->records_queue, s_record_obj_free); + PVT(l_net)->records_queue = NULL; + pthread_rwlock_unlock(&PVT(l_net)->rwlock); + } return; + } + // Use it instead of new type definition to pack params in one callback arg dap_store_obj_t *l_obj = DAP_NEW(dap_store_obj_t); l_obj->type = a_op_code; l_obj->key = dap_strdup(a_key); @@ -516,6 +527,78 @@ void dap_chain_net_sync_gdb_broadcast(void *a_arg, const char a_op_code, const c dap_proc_queue_add_callback(dap_events_worker_get_auto(), s_net_send_records, l_obj); } +static void s_atom_obj_free(void *a_atom_obj) +{ + dap_store_obj_t *l_obj = (dap_store_obj_t *)a_atom_obj; + DAP_DELETE(l_obj->value); + DAP_DELETE(l_obj); +} + +static bool s_net_send_atoms(dap_proc_thread_t *a_thread, void *a_arg) +{ + UNUSED(a_thread); + dap_store_obj_t *l_arg = (dap_store_obj_t *)a_arg; + dap_chain_net_t *l_net = (dap_chain_net_t *)l_arg->cb_arg; + pthread_rwlock_rdlock(&PVT(l_net)->rwlock); + if (PVT(l_net)->state != NET_STATE_SYNC_CHAINS) { + dap_list_t *it = NULL; + do { + dap_store_obj_t *l_obj_cur = it ? (dap_store_obj_t *)it->data : l_arg; + dap_chain_t *l_chain = (dap_chain_t *)l_obj_cur->group; + uint64_t l_cell_id = l_obj_cur->timestamp; + struct downlink *l_link, *l_tmp; + HASH_ITER(hh, PVT(l_net)->downlinks, l_link, l_tmp) { + dap_stream_ch_t *l_ch = dap_stream_ch_find_by_uuid_unsafe(l_link->worker, l_link->uuid); + if (!l_ch) { + HASH_DEL(PVT(l_net)->downlinks, l_link); + DAP_DELETE(l_link); + continue; + } + dap_stream_ch_chain_pkt_write_mt(l_link->worker, l_link->uuid, DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN, + l_net->pub.id.uint64, l_chain->id.uint64, l_cell_id, + l_obj_cur->value, l_obj_cur->value_len); + } + s_atom_obj_free(l_obj_cur); + if (it) + PVT(l_net)->atoms_queue = dap_list_delete_link(PVT(l_net)->atoms_queue, it); + it = PVT(l_net)->atoms_queue; + } while (it); + } else + PVT(l_net)->atoms_queue = dap_list_append(PVT(l_net)->records_queue, l_arg); + pthread_rwlock_unlock(&PVT(l_net)->rwlock); + return true; +} + +/** + * @brief s_chain_callback_notify + * @param a_arg + * @param a_chain + * @param a_id + */ +static void s_chain_callback_notify(void *a_arg, dap_chain_t *a_chain, dap_chain_cell_id_t a_id, void* a_atom, size_t a_atom_size) +{ + if (!a_arg) + return; + dap_chain_net_t *l_net = (dap_chain_net_t *)a_arg; + if (!HASH_COUNT(PVT(l_net)->downlinks)) { + if (PVT(l_net)->atoms_queue) { + pthread_rwlock_wrlock(&PVT(l_net)->rwlock); + dap_list_free_full(PVT(l_net)->atoms_queue, s_atom_obj_free); + PVT(l_net)->atoms_queue = NULL; + pthread_rwlock_unlock(&PVT(l_net)->rwlock); + } + return; + } + // Use it instead of new type definition to pack params in one callback arg + dap_store_obj_t *l_obj = DAP_NEW(dap_store_obj_t); + l_obj->timestamp = a_id.uint64; + l_obj->value = DAP_DUP_SIZE(a_atom, a_atom_size); + l_obj->value_len = a_atom_size; + l_obj->group = (char *)a_chain; + l_obj->cb_arg = a_arg; + dap_proc_queue_add_callback(dap_events_worker_get_auto(), s_net_send_atoms, l_obj); +} + /** * @brief added like callback in dap_chain_global_db_add_sync_group * @@ -561,37 +644,6 @@ static void s_gbd_history_callback_notify(void *a_arg, const char a_op_code, con } } -/** - * @brief s_chain_callback_notify - * @param a_arg - * @param a_chain - * @param a_id - */ -static void s_chain_callback_notify(void * a_arg, dap_chain_t *a_chain, dap_chain_cell_id_t a_id, void* a_atom, size_t a_atom_size) -{ - if (!a_arg) - return; - dap_chain_net_t *l_net = (dap_chain_net_t *)a_arg; - if (PVT(l_net)->state >= NET_STATE_LINKS_ESTABLISHED && PVT(l_net)->state != NET_STATE_SYNC_CHAINS) { - pthread_rwlock_rdlock(&PVT(l_net)->rwlock); - struct downlink *l_link, *l_tmp; - HASH_ITER(hh, PVT(l_net)->downlinks, l_link, l_tmp) { - dap_stream_ch_t *l_ch = dap_stream_ch_find_by_uuid_unsafe(l_link->worker, l_link->uuid); - if (!l_ch) { - HASH_DEL(PVT(l_net)->downlinks, l_link); - DAP_DELETE(l_link); - continue; - } - dap_stream_ch_chain_pkt_write_mt(l_link->worker, l_link->uuid, DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN, - l_net->pub.id.uint64, a_chain->id.uint64, a_id.uint64, a_atom, a_atom_size); - } - pthread_rwlock_unlock(&PVT(l_net)->rwlock); - }else{ - if (s_debug_more) - log_it(L_WARNING,"Node current state is %s. Real-time syncing is possible when you in NET_STATE_LINKS_ESTABLISHED (and above) state", s_net_state_to_str(PVT(l_net)->state)); - } -} - static dap_chain_node_info_t *s_get_dns_link_from_cfg(dap_chain_net_t *a_net) { dap_chain_net_pvt_t *l_net_pvt = PVT(a_net);