diff --git a/modules/chain/include/dap_chain.h b/modules/chain/include/dap_chain.h index 87567372f62b7b1ca5cc5181e7f12d54701fce7d..566c15beafb8554d12742ddb1b8b9ca24c8b2fa2 100644 --- a/modules/chain/include/dap_chain.h +++ b/modules/chain/include/dap_chain.h @@ -78,7 +78,10 @@ typedef size_t (*dap_chain_callback_atom_get_hdr_size_t)(void); typedef dap_chain_atom_iter_t* (*dap_chain_callback_atom_iter_create_t)(dap_chain_t *, dap_chain_cell_id_t, bool); typedef dap_chain_atom_iter_t* (*dap_chain_callback_atom_iter_create_from_t)(dap_chain_t * ,dap_chain_atom_ptr_t, size_t); typedef dap_chain_atom_ptr_t (*dap_chain_callback_atom_iter_get_first_t)(dap_chain_atom_iter_t * , size_t*); + typedef dap_chain_datum_t** (*dap_chain_callback_atom_get_datum_t)(dap_chain_atom_ptr_t, size_t, size_t * ); +typedef dap_time_t (*dap_chain_callback_atom_get_timestamp_t)(dap_chain_atom_ptr_t); + typedef dap_chain_atom_ptr_t (*dap_chain_callback_atom_iter_find_by_hash_t)(dap_chain_atom_iter_t * ,dap_chain_hash_fast_t *,size_t*); typedef dap_chain_datum_tx_t* (*dap_chain_callback_tx_find_by_hash_t)(dap_chain_t * ,dap_chain_hash_fast_t *); @@ -149,10 +152,13 @@ typedef struct dap_chain { dap_chain_callback_atom_iter_create_t callback_atom_iter_create; dap_chain_callback_atom_iter_create_from_t callback_atom_iter_create_from; dap_chain_callback_atom_iter_get_first_t callback_atom_iter_get_first; + dap_chain_callback_atom_get_datum_t callback_atom_get_datums; + dap_chain_callback_atom_get_timestamp_t callback_atom_get_timestamp; dap_chain_callback_atom_iter_find_by_hash_t callback_atom_find_by_hash; dap_chain_callback_tx_find_by_hash_t callback_tx_find_by_hash; + dap_chain_callback_atom_iter_get_next_t callback_atom_iter_get_next; dap_chain_callback_atom_iter_get_atoms_t callback_atom_iter_get_links; dap_chain_callback_atom_iter_get_atoms_t callback_atom_iter_get_lasts; diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c index 0acaf7b5f318d8ed688fac9c3538088728fb0561..4b0c88105a31f92771681995a6a886d14599e364 100644 --- a/modules/channel/chain/dap_stream_ch_chain.c +++ b/modules/channel/chain/dap_stream_ch_chain.c @@ -737,7 +737,7 @@ static void s_gdb_in_pkt_proc_callback_apply(dap_global_db_context_t *a_global_d } if (s_debug_more){ char l_ts_str[50]; - dap_time_to_str_rfc822(l_ts_str, sizeof(l_ts_str), dap_gdb_time_to_sec(l_obj->timestamp)); + dap_time_to_str_rfc822(l_ts_str, sizeof(l_ts_str), dap_nanotime_to_sec(l_obj->timestamp)); log_it(L_DEBUG, "Unpacked log history: type='%c' (0x%02hhX) group=\"%s\" key=\"%s\"" " timestamp=\"%s\" value_len=%" DAP_UINT64_FORMAT_U, (char )l_obj->type, (char)l_obj->type, l_obj->group, diff --git a/modules/consensus/none/dap_chain_cs_none.c b/modules/consensus/none/dap_chain_cs_none.c index 38a53b26a53cfc4f30507e0a5415a2ef5f7c8222..a13f1b1f827c936dde274812bbc1d39cfe392aba 100644 --- a/modules/consensus/none/dap_chain_cs_none.c +++ b/modules/consensus/none/dap_chain_cs_none.c @@ -91,7 +91,7 @@ static dap_chain_atom_ptr_t *s_chain_callback_atom_iter_get_links(dap_chain_atom static dap_chain_atom_ptr_t *s_chain_callback_atom_iter_get_lasts(dap_chain_atom_iter_t * a_atom_iter, size_t * a_lasts_size_ptr, size_t ** a_lasts_sizes_ptr); // Get list of linked events static dap_chain_datum_t **s_chain_callback_atom_get_datum(dap_chain_atom_ptr_t a_atom, size_t a_atom_size, size_t *a_datums_count); - +static dap_time_t s_chain_callback_atom_get_timestamp(dap_chain_atom_ptr_t a_atom) { return ((dap_chain_datum_t *)a_atom)->header.ts_create; } static size_t s_chain_callback_datums_pool_proc(dap_chain_t * a_chain, dap_chain_datum_t ** a_datums, size_t a_datums_size); @@ -214,6 +214,7 @@ int dap_chain_gdb_new(dap_chain_t * a_chain, dap_config_t * a_chain_cfg) a_chain->callback_atom_iter_get_links = s_chain_callback_atom_iter_get_links; // Get the next element from chain from the current one a_chain->callback_atom_iter_get_lasts = s_chain_callback_atom_iter_get_lasts; a_chain->callback_atom_get_datums = s_chain_callback_atom_get_datum; + a_chain->callback_atom_get_timestamp = s_chain_callback_atom_get_timestamp; return 0; } diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index 8c905b25104817dd41f2e260fb7d0b7ff207c4c8..39ebba29c096f6abfe74b766cba294068946acec 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -173,8 +173,6 @@ typedef struct dap_chain_net_pvt{ uint16_t reconnect_delay; // sec struct downlink *downlinks; // HT of links who sent SYNC REQ, it used for sync broadcasting - dap_list_t *records_queue; - dap_list_t *atoms_queue; bool load_mode; char ** seed_aliases; @@ -204,8 +202,6 @@ typedef struct dap_chain_net_pvt{ pthread_rwlock_t downlinks_lock; pthread_rwlock_t balancer_lock; pthread_rwlock_t states_lock; - pthread_rwlock_t gdbs_lock; - pthread_rwlock_t atoms_lock; dap_list_t *gdb_notifiers; } dap_chain_net_pvt_t; @@ -418,77 +414,68 @@ void dap_chain_net_sync_gdb_broadcast(dap_global_db_context_t *a_context, dap_st { if (!a_arg || !a_obj || !a_obj->group || !a_obj->key) return; + // Check object lifetime for broadcasting decision + dap_nanotime_t l_time_diff = a_obj->timestamp ? a_obj->timestamp - dap_nanotime_now() : (dap_nanotime_t)-1; + if (dap_nanotime_to_sec(l_time_diff) > DAP_BROADCAST_LIFETIME * 60) + return; + dap_chain_net_t *l_net = (dap_chain_net_t *)a_arg; - if (PVT(l_net)->state != NET_STATE_SYNC_GDB) { - dap_chain_t *l_chain = NULL; - if (a_obj->type == DAP_DB$K_OPTYPE_ADD) - l_chain = dap_chain_get_chain_from_group_name(l_net->pub.id, a_obj->group); - dap_chain_id_t l_chain_id = l_chain ? l_chain->id : (dap_chain_id_t) {}; - dap_chain_cell_id_t l_cell_id = l_chain ? l_chain->cells->id : (dap_chain_cell_id_t){}; - dap_global_db_pkt_t *l_data_out = dap_store_packet_single(a_obj); - struct downlink *l_link, *l_tmp; - pthread_rwlock_rdlock(&PVT(l_net)->downlinks_lock); - HASH_ITER(hh, PVT(l_net)->downlinks, l_link, l_tmp) { - bool l_ch_alive = dap_stream_ch_check_uuid_mt(l_link->worker, l_link->ch_uuid); - if (!l_ch_alive) { - HASH_DEL(PVT(l_net)->downlinks, l_link); - DAP_DELETE(l_link); - continue; - } - if (!dap_stream_ch_chain_pkt_write_inter(a_context->queue_worker_ch_io_input[l_link->worker->worker->id], - l_link->ch_uuid, - DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB, l_net->pub.id.uint64, - l_chain_id.uint64, l_cell_id.uint64, l_data_out, - sizeof(dap_global_db_pkt_t) + l_data_out->data_size)) - debug_if(g_debug_reactor, L_ERROR, "Can't send pkt to worker (%d) for writing", l_link->worker->worker->id); + dap_chain_t *l_chain = NULL; + if (a_obj->type == DAP_DB$K_OPTYPE_ADD) + l_chain = dap_chain_get_chain_from_group_name(l_net->pub.id, a_obj->group); + dap_chain_id_t l_chain_id = l_chain ? l_chain->id : (dap_chain_id_t) {}; + dap_chain_cell_id_t l_cell_id = l_chain ? l_chain->cells->id : (dap_chain_cell_id_t){}; + dap_global_db_pkt_t *l_data_out = dap_store_packet_single(a_obj); + struct downlink *l_link, *l_tmp; + pthread_rwlock_rdlock(&PVT(l_net)->downlinks_lock); + HASH_ITER(hh, PVT(l_net)->downlinks, l_link, l_tmp) { + bool l_ch_alive = dap_stream_ch_check_uuid_mt(l_link->worker, l_link->ch_uuid); + if (!l_ch_alive) { + HASH_DEL(PVT(l_net)->downlinks, l_link); + DAP_DELETE(l_link); + continue; } - pthread_rwlock_unlock(&PVT(l_net)->downlinks_lock); + if (!dap_stream_ch_chain_pkt_write_inter(a_context->queue_worker_ch_io_input[l_link->worker->worker->id], + l_link->ch_uuid, + DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB, l_net->pub.id.uint64, + l_chain_id.uint64, l_cell_id.uint64, l_data_out, + sizeof(dap_global_db_pkt_t) + l_data_out->data_size)) + debug_if(g_debug_reactor, L_ERROR, "Can't send pkt to worker (%d) for writing", l_link->worker->worker->id); } + pthread_rwlock_unlock(&PVT(l_net)->downlinks_lock); } -static void s_atom_obj_free(void *a_atom_obj) -{ - dap_store_obj_t *l_obj = (dap_store_obj_t *)a_atom_obj; - DAP_DELETE(l_obj->value); - DAP_DELETE(l_obj); -} +struct net_broadcast_atoms_args { + dap_chain_atom_ptr_t atom; + size_t atom_size; + dap_chain_net_t *net; + uint64_t chain_id; + uint64_t cell_id; +}; static bool s_net_send_atoms(dap_proc_thread_t *a_thread, void *a_arg) { UNUSED(a_thread); - dap_store_obj_t *l_arg = (dap_store_obj_t *)a_arg; - dap_chain_net_t *l_net = (dap_chain_net_t *)l_arg->callback_proc_thread_arg; - pthread_rwlock_wrlock(&PVT(l_net)->atoms_lock); - if (PVT(l_net)->state != NET_STATE_SYNC_CHAINS) { - dap_list_t *it = NULL; - do { - dap_store_obj_t *l_obj_cur = it ? (dap_store_obj_t *)it->data : l_arg; - dap_chain_t *l_chain = (dap_chain_t *)l_obj_cur->group; - uint64_t l_cell_id = l_obj_cur->timestamp; - struct downlink *l_link, *l_tmp; - pthread_rwlock_wrlock(&PVT(l_net)->downlinks_lock); - HASH_ITER(hh, PVT(l_net)->downlinks, l_link, l_tmp) { - bool l_ch_alive = dap_stream_ch_check_uuid_mt(l_link->worker, l_link->ch_uuid); - if (!l_ch_alive) { - HASH_DEL(PVT(l_net)->downlinks, l_link); - DAP_DELETE(l_link); - continue; - } - if(!dap_stream_ch_chain_pkt_write_mt(l_link->worker, l_link->ch_uuid, DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN, - l_net->pub.id.uint64, l_chain->id.uint64, l_cell_id, - l_obj_cur->value, l_obj_cur->value_len)) - debug_if(g_debug_reactor, L_ERROR, "Can't send atom to worker (%d) for writing", l_link->worker->worker->id); - } - pthread_rwlock_unlock(&PVT(l_net)->downlinks_lock); - s_atom_obj_free(l_obj_cur); - if (it) - PVT(l_net)->atoms_queue = dap_list_delete_link(PVT(l_net)->atoms_queue, it); - it = PVT(l_net)->atoms_queue; - } while (it); - } else - //PVT(l_net)->atoms_queue = dap_list_append(PVT(l_net)->atoms_queue, l_arg); - s_atom_obj_free(a_arg); - pthread_rwlock_unlock(&PVT(l_net)->atoms_lock); + + struct net_broadcast_atoms_args *l_args = a_arg; + dap_chain_net_t *l_net = l_args->net; + struct downlink *l_link, *l_tmp; + pthread_rwlock_wrlock(&PVT(l_net)->downlinks_lock); + HASH_ITER(hh, PVT(l_net)->downlinks, l_link, l_tmp) { + bool l_ch_alive = dap_stream_ch_check_uuid_mt(l_link->worker, l_link->ch_uuid); + if (!l_ch_alive) { + HASH_DEL(PVT(l_net)->downlinks, l_link); + DAP_DELETE(l_link); + continue; + } + if(!dap_stream_ch_chain_pkt_write_mt(l_link->worker, l_link->ch_uuid, DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN, + l_net->pub.id.uint64, l_args->chain_id, l_args->cell_id, + l_args->atom, l_args->atom_size)) + debug_if(g_debug_reactor, L_ERROR, "Can't send atom to worker (%d) for writing", l_link->worker->worker->id); + } + pthread_rwlock_unlock(&PVT(l_net)->downlinks_lock); + DAP_DELETE(l_args->atom); + DAP_DELETE(l_args); return true; } @@ -503,23 +490,21 @@ static void s_chain_callback_notify(void *a_arg, dap_chain_t *a_chain, dap_chain if (!a_arg) return; dap_chain_net_t *l_net = (dap_chain_net_t *)a_arg; - if (!HASH_COUNT(PVT(l_net)->downlinks)) { - if (PVT(l_net)->atoms_queue) { - pthread_rwlock_wrlock(&PVT(l_net)->atoms_lock); - dap_list_free_full(PVT(l_net)->atoms_queue, s_atom_obj_free); - PVT(l_net)->atoms_queue = NULL; - pthread_rwlock_unlock(&PVT(l_net)->atoms_lock); - } + if (!HASH_COUNT(PVT(l_net)->downlinks)) return; - } - // Use it instead of new type definition to pack params in one callback arg - dap_store_obj_t *l_obj = DAP_NEW(dap_store_obj_t); - l_obj->timestamp = a_id.uint64; - l_obj->value = DAP_DUP_SIZE(a_atom, a_atom_size); - l_obj->value_len = a_atom_size; - l_obj->group = (char *)a_chain; - l_obj->callback_proc_thread_arg = a_arg; - dap_proc_queue_add_callback(dap_events_worker_get_auto(), s_net_send_atoms, l_obj); + // Check object lifetime for broadcasting decision + dap_time_t l_timestamp = a_chain->callback_atom_get_timestamp(a_atom); + dap_time_t l_time_diff = l_timestamp ? l_timestamp - dap_time_now() : (dap_time_t)-1; + if (l_time_diff > DAP_BROADCAST_LIFETIME * 60) + return; + + struct net_broadcast_atoms_args *l_args = DAP_NEW(struct net_broadcast_atoms_args); + l_args->net = l_net; + l_args->atom = DAP_DUP_SIZE(a_atom, a_atom_size); + l_args->atom_size = a_atom_size; + l_args->chain_id = a_chain->id.uint64; + l_args->cell_id = a_id.uint64; + dap_proc_queue_add_callback(dap_events_worker_get_auto(), s_net_send_atoms, l_args); } /** @@ -1189,8 +1174,6 @@ static bool s_net_states_proc(dap_proc_thread_t *a_thread, void *a_arg) l_net_pvt->balancer_link_requests = 0; l_net_pvt->active_link = NULL; dap_list_free_full(l_net_pvt->links_queue, NULL); - dap_list_free_full(l_net_pvt->atoms_queue, NULL); - dap_list_free_full(l_net_pvt->records_queue, NULL); if ( l_net_pvt->state_target != NET_STATE_OFFLINE ){ l_net_pvt->state = NET_STATE_LINKS_PREPARE; l_repeat_after_exit = true; @@ -1372,8 +1355,6 @@ static dap_chain_net_t *s_net_new(const char * a_id, const char * a_name , pthread_rwlock_init(&PVT(ret)->downlinks_lock, NULL); pthread_rwlock_init(&PVT(ret)->balancer_lock, NULL); pthread_rwlock_init(&PVT(ret)->states_lock, NULL); - pthread_rwlock_init(&PVT(ret)->gdbs_lock, NULL); - pthread_rwlock_init(&PVT(ret)->atoms_lock, NULL); if (dap_sscanf(a_id, "0x%016"DAP_UINT64_FORMAT_X, &ret->pub.id.uint64) != 1) { log_it (L_ERROR, "Wrong id format (\"%s\"). Must be like \"0x0123456789ABCDE\"" , a_id ); DAP_DELETE(ret); @@ -1413,8 +1394,6 @@ void dap_chain_net_delete( dap_chain_net_t * a_net ) pthread_rwlock_destroy(&PVT(a_net)->downlinks_lock); pthread_rwlock_destroy(&PVT(a_net)->balancer_lock); pthread_rwlock_destroy(&PVT(a_net)->states_lock); - pthread_rwlock_destroy(&PVT(a_net)->gdbs_lock); - pthread_rwlock_destroy(&PVT(a_net)->atoms_lock); if(PVT(a_net)->seed_aliases) { DAP_DELETE(PVT(a_net)->seed_aliases); PVT(a_net)->seed_aliases = NULL; diff --git a/modules/net/include/dap_chain_net.h b/modules/net/include/dap_chain_net.h index dd739d590761ede5d9a26e8ccdd7f61815b88f33..ac11bd752bcc80d38f1a78b952873a6dc5e1fdc7 100644 --- a/modules/net/include/dap_chain_net.h +++ b/modules/net/include/dap_chain_net.h @@ -41,8 +41,8 @@ along with any CellFrame SDK based project. If not, see <http://www.gnu.org/lic #include "dap_chain_datum_tx.h" #include "uthash.h" - #define DAP_CHAIN_NET_NAME_MAX 32 +#define DAP_BROADCAST_LIFETIME 15 // minutes struct dap_chain_node_info; typedef struct dap_chain_node_client dap_chain_node_client_t; diff --git a/modules/type/blocks/dap_chain_cs_blocks.c b/modules/type/blocks/dap_chain_cs_blocks.c index 3952e6674d2bac0535af0f6589f1d5dc44b53f5e..488c92796dc803ac4f336cd4154bc4ea6054c80c 100644 --- a/modules/type/blocks/dap_chain_cs_blocks.c +++ b/modules/type/blocks/dap_chain_cs_blocks.c @@ -116,6 +116,7 @@ static dap_chain_datum_tx_t* s_callback_atom_iter_find_by_tx_hash(dap_chain_t * dap_chain_hash_fast_t * a_tx_hash); static dap_chain_datum_t** s_callback_atom_get_datums(dap_chain_atom_ptr_t a_atom, size_t a_atom_size, size_t * a_datums_count); +static dap_time_t s_chain_callback_atom_get_timestamp(dap_chain_atom_ptr_t a_atom) { return ((dap_chain_block_t *)a_atom)->hdr.ts_created; } // Get blocks static dap_chain_atom_ptr_t s_callback_atom_iter_get_first( dap_chain_atom_iter_t * a_atom_iter, size_t *a_atom_size ); // Get the fisrt block static dap_chain_atom_ptr_t s_callback_atom_iter_get_next( dap_chain_atom_iter_t * a_atom_iter,size_t *a_atom_size ); // Get the next block @@ -208,15 +209,15 @@ int dap_chain_cs_blocks_new(dap_chain_t * a_chain, dap_config_t * a_chain_config a_chain->callback_atom_iter_create = s_callback_atom_iter_create; a_chain->callback_atom_iter_create_from = s_callback_atom_iter_create_from; a_chain->callback_atom_iter_delete = s_callback_atom_iter_delete; - // Linear pass through a_chain->callback_atom_iter_get_first = s_callback_atom_iter_get_first; // Get the fisrt element from chain a_chain->callback_atom_iter_get_next = s_callback_atom_iter_get_next; // Get the next element from chain from the current one - a_chain->callback_atom_get_datums = s_callback_atom_get_datums; - a_chain->callback_atom_iter_get_links = s_callback_atom_iter_get_links; // Get the next element from chain from the current one a_chain->callback_atom_iter_get_lasts = s_callback_atom_iter_get_lasts; + a_chain->callback_atom_get_datums = s_callback_atom_get_datums; + a_chain->callback_atom_get_timestamp = s_chain_callback_atom_get_timestamp; + a_chain->callback_atom_find_by_hash = s_callback_atom_iter_find_by_hash; a_chain->callback_tx_find_by_hash = s_callback_atom_iter_find_by_tx_hash; @@ -640,7 +641,7 @@ static void s_callback_cs_blocks_purge(dap_chain_t *a_chain) } pthread_rwlock_unlock(&PVT(l_blocks)->rwlock); dap_chain_block_chunks_delete(PVT(l_blocks)->chunks); - PVT(l_blocks)->chunks = DAP_NEW_Z(dap_chain_block_chunks_t); + PVT(l_blocks)->chunks = dap_chain_block_chunks_create(l_blocks); } /** diff --git a/modules/type/dag/dap_chain_cs_dag.c b/modules/type/dag/dap_chain_cs_dag.c index 3ee05d0ec76f58f4bc16aaa0e84fabb10b599610..b26732edd004f27f63aa597dd41b114c422886fb 100644 --- a/modules/type/dag/dap_chain_cs_dag.c +++ b/modules/type/dag/dap_chain_cs_dag.c @@ -98,6 +98,7 @@ static dap_chain_atom_ptr_t s_chain_callback_atom_iter_find_by_hash(dap_chain_at static dap_chain_datum_tx_t* s_chain_callback_atom_find_by_tx_hash(dap_chain_t *a_chain, dap_chain_hash_fast_t *a_tx_hash); static dap_chain_datum_t** s_chain_callback_atom_get_datum(dap_chain_atom_ptr_t a_event, size_t a_atom_size, size_t *a_datums_count); +static dap_time_t s_chain_callback_atom_get_timestamp(dap_chain_atom_ptr_t a_atom) { return ((dap_chain_cs_dag_event_t *)a_atom)->header.ts_created; } // Get event(s) from dag static dap_chain_atom_ptr_t s_chain_callback_atom_iter_get_first( dap_chain_atom_iter_t * a_atom_iter, size_t *a_atom_size ); // Get the fisrt event from dag static dap_chain_atom_ptr_t s_chain_callback_atom_iter_get_next( dap_chain_atom_iter_t * a_atom_iter,size_t *a_atom_size ); // Get the next event from dag @@ -226,15 +227,15 @@ int dap_chain_cs_dag_new(dap_chain_t * a_chain, dap_config_t * a_chain_cfg) a_chain->callback_atom_iter_create = s_chain_callback_atom_iter_create; a_chain->callback_atom_iter_create_from = s_chain_callback_atom_iter_create_from; a_chain->callback_atom_iter_delete = s_chain_callback_atom_iter_delete; - // Linear pass through a_chain->callback_atom_iter_get_first = s_chain_callback_atom_iter_get_first; // Get the fisrt element from chain a_chain->callback_atom_iter_get_next = s_chain_callback_atom_iter_get_next; // Get the next element from chain from the current one - a_chain->callback_atom_get_datums = s_chain_callback_atom_get_datum; - a_chain->callback_atom_iter_get_links = s_chain_callback_atom_iter_get_links; // Get the next element from chain from the current one a_chain->callback_atom_iter_get_lasts = s_chain_callback_atom_iter_get_lasts; + a_chain->callback_atom_get_datums = s_chain_callback_atom_get_datum; + a_chain->callback_atom_get_timestamp = s_chain_callback_atom_get_timestamp; + a_chain->callback_atom_find_by_hash = s_chain_callback_atom_iter_find_by_hash; a_chain->callback_tx_find_by_hash = s_chain_callback_atom_find_by_tx_hash;