From 6cdead680098d837805436265bd38682c6a4c177 Mon Sep 17 00:00:00 2001 From: Constantin P <papizh.konstantin@demlabs.net> Date: Fri, 13 Oct 2023 08:47:23 +0000 Subject: [PATCH] Develop+master 351 --- dap-sdk | 2 +- modules/channel/chain/dap_stream_ch_chain.c | 101 +++++- .../channel/chain/dap_stream_ch_chain_pkt.c | 36 ++- .../chain/include/dap_stream_ch_chain_pkt.h | 4 + .../consensus/dag-poa/dap_chain_cs_dag_poa.c | 11 +- modules/net/dap_chain_net.c | 299 ++++++++---------- modules/net/dap_chain_net_node_list.c | 2 +- modules/net/include/dap_chain_net.h | 28 +- 8 files changed, 270 insertions(+), 213 deletions(-) diff --git a/dap-sdk b/dap-sdk index 0d87419956..169b47e624 160000 --- a/dap-sdk +++ b/dap-sdk @@ -1 +1 @@ -Subproject commit 0d874199566279240c67dfccaf29cc1640a82c92 +Subproject commit 169b47e624b08e3a5907c06d673ae41bb2338803 diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c index af9125b0b9..f41d4818e4 100644 --- a/modules/channel/chain/dap_stream_ch_chain.c +++ b/modules/channel/chain/dap_stream_ch_chain.c @@ -447,7 +447,7 @@ static bool s_sync_out_gdb_proc_callback(dap_proc_thread_t *a_thread, void *a_ar if (l_ch_chain->request_db_log) { if (s_debug_more) - log_it(L_DEBUG, "Sync out gdb proc, requested %"DAP_UINT64_FORMAT_U" transactions from address "NODE_ADDR_FP_STR, + log_it(L_DEBUG, "Sync out gdb proc, requested %"DAP_UINT64_FORMAT_U" records from address "NODE_ADDR_FP_STR, l_ch_chain->request_db_log->items_number, NODE_ADDR_FP_ARGS_S(l_sync_request->request.node_addr)); l_sync_request->gdb.db_log = l_ch_chain->request_db_log; dap_proc_thread_worker_exec_callback_inter(a_thread, l_sync_request->worker->id, s_sync_out_gdb_first_worker_callback, l_sync_request ); @@ -834,7 +834,7 @@ static bool s_chain_timer_callback(void *a_arg) dap_worker_t *l_worker = dap_worker_get_current(); dap_stream_ch_t *l_ch = dap_stream_ch_find_by_uuid_unsafe(DAP_STREAM_WORKER(l_worker), *(dap_stream_ch_uuid_t*)a_arg); if (!l_ch) { - //dap_chain_net_del_downlink((dap_stream_ch_uuid_t*)a_arg); + dap_chain_net_del_downlink((dap_stream_ch_uuid_t*)a_arg); DAP_DELETE(a_arg); return false; } @@ -866,8 +866,8 @@ static bool s_chain_timer_callback(void *a_arg) } if (l_ch_chain->state == CHAIN_STATE_SYNC_GLOBAL_DB && l_ch_chain->sent_breaks >= 3 * DAP_SYNC_TICKS_PER_SECOND) { debug_if(s_debug_more, L_INFO, "Send one global_db TSD packet (rest=%zu/%zu items)", - dap_db_log_list_get_count_rest(l_ch_chain->request_db_log), - dap_db_log_list_get_count(l_ch_chain->request_db_log)); + l_ch_chain->request_db_log->items_rest, + l_ch_chain->request_db_log->items_number); dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_TSD, l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64, NULL, 0); @@ -1632,7 +1632,41 @@ void s_stream_ch_packet_out(dap_stream_ch_t *a_ch, void *a_arg) bool l_go_idle = false, l_was_sent_smth = false; switch (l_ch_chain->state) { // Update list of global DB records to remote - case CHAIN_STATE_UPDATE_GLOBAL_DB: { + case CHAIN_STATE_UPDATE_GLOBAL_DB: { + size_t i, q = + // s_update_pack_size; + 0; + dap_db_log_list_obj_t **l_objs = dap_db_log_list_get_multiple(l_ch_chain->request_db_log, /*DAP_STREAM_PKT_SIZE_MAX*/ 0, &q); + dap_stream_ch_chain_update_element_t *l_data = DAP_NEW_Z_COUNT(dap_stream_ch_chain_update_element_t, q); + for (i = 0; i < q; ++i) { + l_data[i].hash = l_objs[i]->hash; + l_data[i].size = l_objs[i]->pkt->data_size; + DAP_DELETE(l_objs[i]->pkt); + DAP_DELETE(l_objs[i]); + } + if (i) { + l_was_sent_smth = true; + s_stream_ch_chain_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB, + l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, + l_ch_chain->request_hdr.cell_id.uint64, + l_data, i * sizeof(dap_stream_ch_chain_update_element_t)); + l_ch_chain->stats_request_gdb_processed += i; + DAP_DELETE(l_data); + DAP_DELETE(l_objs); + debug_if(s_debug_more, L_INFO, "Out: DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB, %zu records", i); + } else if (!l_objs) { + l_was_sent_smth = true; + l_ch_chain->request.node_addr.uint64 = dap_chain_net_get_cur_addr_int(dap_chain_net_by_id( + l_ch_chain->request_hdr.net_id)); + s_stream_ch_chain_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_END, + l_ch_chain->request_hdr.net_id.uint64, + l_ch_chain->request_hdr.chain_id.uint64, + l_ch_chain->request_hdr.cell_id.uint64, + &l_ch_chain->request, sizeof(dap_stream_ch_chain_sync_request_t)); + debug_if(s_debug_more, L_INFO, "Out: DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_END"); + l_go_idle = true; + } +#if 0 dap_stream_ch_chain_update_element_t l_data[s_update_pack_size]; uint_fast16_t i; dap_db_log_list_obj_t *l_obj = NULL; @@ -1667,10 +1701,62 @@ void s_stream_ch_packet_out(dap_stream_ch_t *a_ch, void *a_arg) log_it(L_INFO, "Out: DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_END"); l_go_idle = true; } +#endif } break; // Synchronize GDB - case CHAIN_STATE_SYNC_GLOBAL_DB: { + case CHAIN_STATE_SYNC_GLOBAL_DB: { + dap_global_db_pkt_t *l_pkt = NULL; + size_t l_pkt_size = 0, i, q = 0; + dap_db_log_list_obj_t **l_objs = dap_db_log_list_get_multiple(l_ch_chain->request_db_log, DAP_STREAM_PKT_SIZE_MAX, &q); + for (i = 0; i < q; ++i) { + dap_stream_ch_chain_hash_item_t *l_hash_item = NULL; + unsigned l_hash_item_hashv = 0; + HASH_VALUE(&l_objs[i]->hash, sizeof(dap_chain_hash_fast_t), l_hash_item_hashv); + HASH_FIND_BYHASHVALUE(hh, l_ch_chain->remote_gdbs, &l_objs[i]->hash, + sizeof(dap_hash_fast_t), l_hash_item_hashv, l_hash_item); + if (!l_hash_item) { + l_hash_item = DAP_NEW_Z(dap_stream_ch_chain_hash_item_t); + *l_hash_item = (dap_stream_ch_chain_hash_item_t) { + .hash = l_objs[i]->hash, .size = l_objs[i]->pkt->data_size + }; + HASH_ADD_BYHASHVALUE(hh, l_ch_chain->remote_gdbs, hash, sizeof(dap_chain_hash_fast_t), + l_hash_item_hashv, l_hash_item); + l_pkt = dap_global_db_pkt_pack(l_pkt, l_objs[i]->pkt); + l_ch_chain->stats_request_gdb_processed++; + l_pkt_size = sizeof(dap_global_db_pkt_t) + l_pkt->data_size; + } + + DAP_DELETE(l_objs[i]->pkt); + DAP_DELETE(l_objs[i]); + } + + if (l_pkt_size) { + l_was_sent_smth = true; + // If request was from defined node_addr we update its state + s_stream_ch_chain_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB, + l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, + l_ch_chain->request_hdr.cell_id.uint64, l_pkt, l_pkt_size); + debug_if(s_debug_more, L_INFO, "Send one global_db packet, size %zu, rest %zu/%zu items", l_pkt_size, + l_ch_chain->request_db_log->items_rest, + l_ch_chain->request_db_log->items_number); + DAP_DELETE(l_pkt); + DAP_DELETE(l_objs); + } else if (!l_objs) { + l_was_sent_smth = true; + // last message + dap_stream_ch_chain_sync_request_t l_request = { }; + s_stream_ch_chain_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, + l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, + l_ch_chain->request_hdr.cell_id.uint64, &l_request, sizeof(l_request)); + l_go_idle = true; + if (l_ch_chain->callback_notify_packet_out) + l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, + NULL, 0, l_ch_chain->callback_notify_arg); + log_it(L_INFO,"Syncronized database: items syncronyzed %"DAP_UINT64_FORMAT_U" of %zu", + l_ch_chain->stats_request_gdb_processed, l_ch_chain->request_db_log->items_number); + } +#if 0 // Get global DB record dap_global_db_pkt_t *l_pkt = NULL; dap_db_log_list_obj_t *l_obj = NULL; @@ -1738,7 +1824,8 @@ void s_stream_ch_packet_out(dap_stream_ch_t *a_ch, void *a_arg) l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, NULL, 0, l_ch_chain->callback_notify_arg); } - } break; +#endif + } break; // Update list of atoms to remote case CHAIN_STATE_UPDATE_CHAINS:{ diff --git a/modules/channel/chain/dap_stream_ch_chain_pkt.c b/modules/channel/chain/dap_stream_ch_chain_pkt.c index 2aeced269a..e4b48b5a6b 100644 --- a/modules/channel/chain/dap_stream_ch_chain_pkt.c +++ b/modules/channel/chain/dap_stream_ch_chain_pkt.c @@ -85,7 +85,9 @@ size_t dap_stream_ch_chain_pkt_write_mt(dap_stream_worker_t *a_worker, dap_strea const void * a_data, size_t a_data_size) { size_t l_chain_pkt_size = sizeof(dap_stream_ch_chain_pkt_hdr_t) + a_data_size; - dap_stream_ch_chain_pkt_t *l_chain_pkt = DAP_NEW_Z_SIZE(dap_stream_ch_chain_pkt_t, l_chain_pkt_size ); + dap_stream_ch_chain_pkt_t *l_chain_pkt = l_chain_pkt_size > 0x3FFF + ? DAP_NEW_Z_SIZE(dap_stream_ch_chain_pkt_t, l_chain_pkt_size) + : DAP_NEW_STACK_SIZE(dap_stream_ch_chain_pkt_t, l_chain_pkt_size); *l_chain_pkt = (dap_stream_ch_chain_pkt_t){ .hdr = { .version = DAP_STREAM_CH_CHAIN_PKT_VERSION, .net_id.uint64 = a_net_id, .cell_id.uint64 = a_cell_id, .chain_id.uint64 = a_chain_id } }; @@ -94,7 +96,37 @@ size_t dap_stream_ch_chain_pkt_write_mt(dap_stream_worker_t *a_worker, dap_strea memcpy(l_chain_pkt->data, a_data, a_data_size); size_t l_ret = dap_stream_ch_pkt_write_mt(a_worker, a_ch_uuid, a_type, l_chain_pkt, l_chain_pkt_size); - DAP_DELETE(l_chain_pkt); + if (l_chain_pkt_size > 0x3FFF) + DAP_DELETE(l_chain_pkt); + return l_ret; +} + +size_t dap_stream_ch_chain_pkt_write_multi_mt(dap_stream_ch_cachet_t *a_links, size_t a_count, uint8_t a_type,uint64_t a_net_id, + uint64_t a_chain_id, uint64_t a_cell_id, + const void * a_data, size_t a_data_size) +{ + size_t l_chain_pkt_size = sizeof(dap_stream_ch_chain_pkt_hdr_t) + a_data_size; + dap_stream_ch_chain_pkt_t *l_chain_pkt = l_chain_pkt_size > 0x3FFF + ? DAP_NEW_Z_SIZE(dap_stream_ch_chain_pkt_t, l_chain_pkt_size) + : DAP_NEW_STACK_SIZE(dap_stream_ch_chain_pkt_t, l_chain_pkt_size); + *l_chain_pkt = (dap_stream_ch_chain_pkt_t){ + .hdr = { .version = DAP_STREAM_CH_CHAIN_PKT_VERSION, .net_id.uint64 = a_net_id, .cell_id.uint64 = a_cell_id, .chain_id.uint64 = a_chain_id } + }; + + if (a_data_size && a_data) + memcpy(l_chain_pkt->data, a_data, a_data_size); + size_t l_ret = 0, l_tmp = 0, i; + for (i = 0; i < a_count; ++i) { + l_tmp = dap_stream_ch_pkt_write_mt(a_links[i].stream_worker, a_links[i].uuid, a_type, l_chain_pkt, l_chain_pkt_size); + if (!l_tmp) { + l_ret = 0; + break; + } else { + l_ret += l_tmp; + } + } + if (l_chain_pkt_size > 0x3FFF) + DAP_DELETE(l_chain_pkt); return l_ret; } diff --git a/modules/channel/chain/include/dap_stream_ch_chain_pkt.h b/modules/channel/chain/include/dap_stream_ch_chain_pkt.h index a54a3b2eed..69af3d189f 100644 --- a/modules/channel/chain/include/dap_stream_ch_chain_pkt.h +++ b/modules/channel/chain/include/dap_stream_ch_chain_pkt.h @@ -149,6 +149,10 @@ size_t dap_stream_ch_chain_pkt_write_mt(dap_stream_worker_t *a_worker, dap_strea uint64_t a_chain_id, uint64_t a_cell_id, const void * a_data, size_t a_data_size); +size_t dap_stream_ch_chain_pkt_write_multi_mt(dap_stream_ch_cachet_t *a_links, size_t a_count, uint8_t a_type, uint64_t a_net_id, + uint64_t a_chain_id, uint64_t a_cell_id, + const void * a_data, size_t a_data_size); + size_t dap_stream_ch_chain_pkt_write_inter(dap_events_socket_t * a_es_input, dap_stream_ch_uuid_t a_ch_uuid, uint8_t a_type,uint64_t a_net_id, uint64_t a_chain_id, uint64_t a_cell_id, const void * a_data, size_t a_data_size); diff --git a/modules/consensus/dag-poa/dap_chain_cs_dag_poa.c b/modules/consensus/dag-poa/dap_chain_cs_dag_poa.c index 846181534b..c2fcd8abb9 100644 --- a/modules/consensus/dag-poa/dap_chain_cs_dag_poa.c +++ b/modules/consensus/dag-poa/dap_chain_cs_dag_poa.c @@ -528,19 +528,19 @@ static dap_chain_cs_dag_event_round_item_t *s_round_event_choose_dup(dap_list_t enum dap_chain_poa_round_filter_stage l_stage = DAP_CHAIN_POA_ROUND_FILTER_STAGE_START; while (l_stage++ < DAP_CHAIN_POA_ROUND_FILTER_STAGE_MAX) { dap_list_t *it, *tmp; - for (it = l_dups, tmp = l_dups ? l_dups->next : NULL; it; it = tmp, tmp = tmp ? tmp->next : NULL) { + DL_FOREACH_SAFE(l_dups, it, tmp) { l_round_item = (dap_chain_cs_dag_event_round_item_t *)it->data; l_event = (dap_chain_cs_dag_event_t *)l_round_item->event_n_signs; switch (l_stage) { case DAP_CHAIN_POA_ROUND_FILTER_STAGE_SIGNS: if (l_event->header.signs_count != a_max_signs_counts) - l_dups = dap_list_remove_link(l_dups, it); + l_dups = dap_list_delete_link(l_dups, it); else if (l_round_item->round_info.ts_update < l_min_ts_update) l_min_ts_update = l_round_item->round_info.ts_update; break; case DAP_CHAIN_POA_ROUND_FILTER_STAGE_TS: if (l_round_item->round_info.ts_update != l_min_ts_update) - l_dups = dap_list_remove_link(l_dups, it); + l_dups = dap_list_delete_link(l_dups, it); else { s_event_get_unique_mem_region(l_round_item, l_event_mem_region); if (memcmp(l_winner_mem_region, l_event_mem_region, DAP_CHAIN_POA_ROUND_FILTER_MEM_SIZE)) @@ -550,8 +550,9 @@ static dap_chain_cs_dag_event_round_item_t *s_round_event_choose_dup(dap_list_t case DAP_CHAIN_POA_ROUND_FILTER_STAGE_MEM: s_event_get_unique_mem_region(l_round_item, l_event_mem_region); if (memcmp(l_winner_mem_region, l_event_mem_region, DAP_CHAIN_POA_ROUND_FILTER_MEM_SIZE)) - l_dups = dap_list_remove_link(l_dups, it); - default: break; + l_dups = dap_list_delete_link(l_dups, it); + default: + break; } } unsigned int l_dups_count = dap_list_length(l_dups); diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index 9e84e2ca26..6b0f8ff84f 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -149,6 +149,7 @@ struct net_link { }; struct downlink { + dap_chain_net_t *net; dap_stream_worker_t *worker; dap_stream_ch_uuid_t ch_uuid; dap_events_socket_uuid_t esocket_uuid; @@ -204,16 +205,13 @@ typedef struct dap_chain_net_pvt{ uint16_t *seed_nodes_ports; uint64_t *seed_nodes_addrs; - dap_chain_net_state_t state; - dap_chain_net_state_t state_target; + _Atomic(dap_chain_net_state_t) state, state_target; uint16_t acl_idx; // Main loop timer dap_interval_timer_t main_timer; - - pthread_mutex_t uplinks_mutex; - pthread_rwlock_t downlinks_lock; + pthread_mutex_t uplinks_mutex, downlinks_mutex; pthread_rwlock_t states_lock; dap_list_t *gdb_notifiers; @@ -222,30 +220,32 @@ typedef struct dap_chain_net_pvt{ } dap_chain_net_pvt_t; typedef struct dap_chain_net_item{ - char name [DAP_CHAIN_NET_NAME_MAX]; + char name[DAP_CHAIN_NET_NAME_MAX]; dap_chain_net_id_t net_id; - dap_chain_net_t * chain_net; - UT_hash_handle hh; - UT_hash_handle hh2; + dap_chain_net_t *chain_net; + UT_hash_handle hh, hh2; } dap_chain_net_item_t; #define PVT(a) ( (dap_chain_net_pvt_t *) (void*) a->pvt ) #define PVT_S(a) ( (dap_chain_net_pvt_t *) (void*) a.pvt ) -static pthread_rwlock_t s_net_items_rwlock = PTHREAD_RWLOCK_INITIALIZER; static dap_chain_net_item_t *s_net_items = NULL, *s_net_ids = NULL; -static const char * c_net_states[]={ - [NET_STATE_OFFLINE] = "NET_STATE_OFFLINE", - [NET_STATE_LINKS_PREPARE ] = "NET_STATE_LINKS_PREPARE", - [NET_STATE_LINKS_CONNECTING] = "NET_STATE_LINKS_CONNECTING", - [NET_STATE_LINKS_ESTABLISHED]= "NET_STATE_LINKS_ESTABLISHED", - [NET_STATE_SYNC_GDB]= "NET_STATE_SYNC_GDB", - [NET_STATE_SYNC_CHAINS]= "NET_STATE_SYNC_CHAINS", - [NET_STATE_ADDR_REQUEST]= "NET_STATE_ADDR_REQUEST", - [NET_STATE_ONLINE]= "NET_STATE_ONLINE" +static const char *c_net_states[] = { + [NET_STATE_OFFLINE] = "NET_STATE_OFFLINE", + [NET_STATE_LINKS_PREPARE ] = "NET_STATE_LINKS_PREPARE", + [NET_STATE_LINKS_CONNECTING] = "NET_STATE_LINKS_CONNECTING", + [NET_STATE_LINKS_ESTABLISHED] = "NET_STATE_LINKS_ESTABLISHED", + [NET_STATE_SYNC_GDB] = "NET_STATE_SYNC_GDB", + [NET_STATE_SYNC_CHAINS] = "NET_STATE_SYNC_CHAINS", + [NET_STATE_ADDR_REQUEST] = "NET_STATE_ADDR_REQUEST", + [NET_STATE_ONLINE] = "NET_STATE_ONLINE" }; +static inline const char * dap_chain_net_state_to_str(dap_chain_net_state_t a_state) { + return a_state < NET_STATE_OFFLINE || a_state > NET_STATE_ONLINE ? "NET_STATE_INVALID" : c_net_states[a_state]; +} + // Node link callbacks static void s_node_link_callback_connected(dap_chain_node_client_t * a_node_client, void * a_arg); static void s_node_link_callback_disconnected(dap_chain_node_client_t * a_node_client, void * a_arg); @@ -427,16 +427,12 @@ int dap_chain_net_state_go_to(dap_chain_net_t * a_net, dap_chain_net_state_t a_n log_it(L_ERROR, "Can't change state of loading network '%s'", a_net->pub.name); return -1; } - pthread_rwlock_wrlock(&PVT(a_net)->states_lock); if (PVT(a_net)->state != NET_STATE_OFFLINE){ PVT(a_net)->state = PVT(a_net)->state_target = NET_STATE_OFFLINE; - pthread_rwlock_unlock(&PVT(a_net)->states_lock); s_net_states_proc(NULL, a_net); - pthread_rwlock_wrlock(&PVT(a_net)->states_lock); } PVT(a_net)->state_target = a_new_state; //PVT(a_net)->flags |= F_DAP_CHAIN_NET_SYNC_FROM_ZERO; // TODO set this flag according to -mode argument from command line - pthread_rwlock_unlock(&PVT(a_net)->states_lock); if (a_new_state == NET_STATE_OFFLINE) return 0; return dap_proc_queue_add_callback(dap_events_worker_get_auto(), s_net_states_proc, a_net); @@ -444,9 +440,7 @@ int dap_chain_net_state_go_to(dap_chain_net_t * a_net, dap_chain_net_state_t a_n dap_chain_net_state_t dap_chain_net_get_target_state(dap_chain_net_t *a_net) { - pthread_rwlock_rdlock(&PVT(a_net)->states_lock); dap_chain_net_state_t l_ret = PVT(a_net)->state_target; - pthread_rwlock_unlock(&PVT(a_net)->states_lock); return l_ret; } @@ -467,57 +461,62 @@ void dap_chain_net_add_gdb_notify_callback(dap_chain_net_t *a_net, dap_store_obj PVT(a_net)->gdb_notifiers = dap_list_append(PVT(a_net)->gdb_notifiers, l_notifier); } -int dap_chain_net_add_downlink(dap_chain_net_t *a_net, dap_stream_worker_t *a_worker, +void dap_chain_net_add_downlink_cb(UNUSED_ARG dap_worker_t *a_worker, void *a_arg) { + struct downlink *l_downlink = (struct downlink*)a_arg; + if (!l_downlink->net) { + DAP_DELETE(l_downlink); + return; + } + dap_chain_net_pvt_t *l_net_pvt = PVT(l_downlink->net); + unsigned a_hash_value; + HASH_VALUE(&l_downlink->ch_uuid, sizeof(l_downlink->ch_uuid), a_hash_value); + struct downlink *l_sought_downlink = NULL; + pthread_mutex_lock(&l_net_pvt->downlinks_mutex); + HASH_FIND_BYHASHVALUE(hh, l_net_pvt->downlinks, &l_downlink->ch_uuid, sizeof(l_downlink->ch_uuid), a_hash_value, l_sought_downlink); + if (l_sought_downlink) { + pthread_mutex_unlock(&l_net_pvt->downlinks_mutex); + DAP_DELETE(l_downlink); + return; + } + HASH_ADD_BYHASHVALUE(hh, l_net_pvt->downlinks, ch_uuid, sizeof(l_downlink->ch_uuid), a_hash_value, l_downlink); + pthread_mutex_unlock(&l_net_pvt->downlinks_mutex); +} + +void dap_chain_net_add_downlink(dap_chain_net_t *a_net, dap_stream_worker_t *a_worker, dap_stream_ch_uuid_t a_ch_uuid, dap_events_socket_uuid_t a_esocket_uuid, char *a_addr, int a_port) { - if (!a_net || !a_worker) - return -1; - dap_chain_net_pvt_t *l_net_pvt = PVT(a_net); - unsigned a_hash_value; - HASH_VALUE(&a_ch_uuid, sizeof(a_ch_uuid), a_hash_value); - struct downlink *l_downlink = NULL; - pthread_rwlock_wrlock(&l_net_pvt->downlinks_lock); - HASH_FIND_BYHASHVALUE(hh, l_net_pvt->downlinks, &a_ch_uuid, sizeof(a_ch_uuid), a_hash_value, l_downlink); - if (l_downlink) { - pthread_rwlock_unlock(&l_net_pvt->downlinks_lock); - return -2; - } - l_downlink = DAP_NEW_Z(struct downlink); + struct downlink *l_downlink = DAP_NEW_Z(struct downlink); if (!l_downlink) { log_it(L_CRITICAL, "Memory allocation error"); - pthread_rwlock_unlock(&l_net_pvt->downlinks_lock); - return -1; + return; } *l_downlink = (struct downlink) { - .worker = a_worker, - .ch_uuid = a_ch_uuid, - .esocket_uuid = a_esocket_uuid, - .port = a_port + .net = a_net, + .worker = a_worker, + .ch_uuid = a_ch_uuid, + .esocket_uuid = a_esocket_uuid, + .port = a_port }; strncpy(l_downlink->addr, a_addr, INET_ADDRSTRLEN - 1); - HASH_ADD_BYHASHVALUE(hh, l_net_pvt->downlinks, ch_uuid, sizeof(a_ch_uuid), a_hash_value, l_downlink); - pthread_rwlock_unlock(&l_net_pvt->downlinks_lock); - return 0; + dap_worker_exec_callback_on(a_worker->worker, dap_chain_net_add_downlink_cb, l_downlink); } void dap_chain_net_del_downlink(dap_stream_ch_uuid_t *a_ch_uuid) { unsigned l_hash_value; - HASH_VALUE(*a_ch_uuid, sizeof(*a_ch_uuid), l_hash_value); - pthread_rwlock_rdlock(&s_net_items_rwlock); + HASH_VALUE(a_ch_uuid, sizeof(*a_ch_uuid), l_hash_value); struct downlink *l_downlink = NULL; for (dap_chain_net_item_t *l_net_item = s_net_items; l_net_item && !l_downlink; l_net_item = l_net_item->hh.next) { dap_chain_net_pvt_t *l_net_pvt = PVT(l_net_item->chain_net); - pthread_rwlock_wrlock(&l_net_pvt->downlinks_lock); + pthread_mutex_lock(&l_net_pvt->downlinks_mutex); HASH_FIND_BYHASHVALUE(hh, l_net_pvt->downlinks, a_ch_uuid, sizeof(*a_ch_uuid), l_hash_value, l_downlink); if (l_downlink) { HASH_DEL(l_net_pvt->downlinks, l_downlink); log_it(L_MSG, "Remove downlink %s : %d from net ht", l_downlink->addr, l_downlink->port); DAP_DELETE(l_downlink); } - pthread_rwlock_unlock(&l_net_pvt->downlinks_lock); + pthread_mutex_unlock(&l_net_pvt->downlinks_mutex); } - pthread_rwlock_unlock(&s_net_items_rwlock); } /** @@ -538,25 +537,31 @@ void dap_chain_net_sync_gdb_broadcast(dap_global_db_context_t *a_context, dap_st if (l_time_diff > DAP_BROADCAST_LIFETIME * 60) return; - dap_chain_net_t *l_net = (dap_chain_net_t *)a_arg; + dap_chain_net_t *l_net = (dap_chain_net_t*)a_arg; dap_global_db_pkt_t *l_data_out = dap_global_db_pkt_serialize(a_obj); struct downlink *l_link, *l_tmp; - pthread_rwlock_wrlock(&PVT(l_net)->downlinks_lock); + dap_stream_ch_cachet_t *l_active_downs = NULL; + pthread_mutex_lock(&PVT(l_net)->downlinks_mutex); + size_t l_new_count = 0, l_count = HASH_COUNT(PVT(l_net)->downlinks); + l_active_downs = DAP_NEW_Z_COUNT(dap_stream_ch_cachet_t, l_count); HASH_ITER(hh, PVT(l_net)->downlinks, l_link, l_tmp) { - bool l_ch_alive = dap_stream_ch_check_uuid_mt(l_link->worker, l_link->ch_uuid); - if (!l_ch_alive) { - HASH_DEL(PVT(l_net)->downlinks, l_link); - DAP_DELETE(l_link); - continue; + if (dap_stream_ch_check_uuid_mt(l_link->worker, l_link->ch_uuid)) { + l_active_downs[l_new_count++] = (dap_stream_ch_cachet_t){ .stream_worker = l_link->worker, .uuid = l_link->ch_uuid }; } - if (!dap_stream_ch_chain_pkt_write_mt(l_link->worker, //_inter(a_context->queue_worker_ch_io_input[l_link->worker->worker->id], - l_link->ch_uuid, - DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB, l_net->pub.id.uint64, - 0, 0, l_data_out, - sizeof(dap_global_db_pkt_t) + l_data_out->data_size)) - debug_if(g_debug_reactor, L_ERROR, "Can't send pkt to worker (%d) for writing", l_link->worker->worker->id); } - pthread_rwlock_unlock(&PVT(l_net)->downlinks_lock); + pthread_mutex_unlock(&PVT(l_net)->downlinks_mutex); + if (l_new_count < l_count) { + l_active_downs = DAP_REALLOC_COUNT(l_active_downs, l_new_count); + } + if (!dap_stream_ch_chain_pkt_write_multi_mt(l_active_downs, //_inter(a_context->queue_worker_ch_io_input[l_link->worker->worker->id], + l_new_count, + DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB, l_net->pub.id.uint64, + 0, 0, l_data_out, + sizeof(dap_global_db_pkt_t) + l_data_out->data_size)) + debug_if(g_debug_reactor, L_ERROR, "Can't broadcast pkt"); + + DAP_DELETE(l_active_downs); + DAP_DELETE(l_data_out); } struct net_broadcast_atoms_args { @@ -574,20 +579,24 @@ static bool s_net_send_atoms(dap_proc_thread_t *a_thread, void *a_arg) struct net_broadcast_atoms_args *l_args = a_arg; dap_chain_net_t *l_net = l_args->net; struct downlink *l_link, *l_tmp; - pthread_rwlock_wrlock(&PVT(l_net)->downlinks_lock); + dap_stream_ch_cachet_t *l_active_downs = NULL; + pthread_mutex_lock(&PVT(l_net)->downlinks_mutex); + size_t l_new_count = 0, l_count = HASH_COUNT(PVT(l_net)->downlinks); + l_active_downs = DAP_NEW_Z_COUNT(dap_stream_ch_cachet_t, l_count); HASH_ITER(hh, PVT(l_net)->downlinks, l_link, l_tmp) { - bool l_ch_alive = dap_stream_ch_check_uuid_mt(l_link->worker, l_link->ch_uuid); - if (!l_ch_alive) { - HASH_DEL(PVT(l_net)->downlinks, l_link); - DAP_DELETE(l_link); - continue; + if (dap_stream_ch_check_uuid_mt(l_link->worker, l_link->ch_uuid)) { + l_active_downs[l_new_count++] = (dap_stream_ch_cachet_t){ .stream_worker = l_link->worker, .uuid = l_link->ch_uuid }; } - if(!dap_stream_ch_chain_pkt_write_mt(l_link->worker, l_link->ch_uuid, DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN, - l_net->pub.id.uint64, l_args->chain_id, l_args->cell_id, - l_args->atom, l_args->atom_size)) - debug_if(g_debug_reactor, L_ERROR, "Can't send atom to worker (%d) for writing", l_link->worker->worker->id); } - pthread_rwlock_unlock(&PVT(l_net)->downlinks_lock); + pthread_mutex_unlock(&PVT(l_net)->downlinks_mutex); + if (l_new_count < l_count) { + l_active_downs = DAP_REALLOC_COUNT(l_active_downs, l_new_count); + } + if(!dap_stream_ch_chain_pkt_write_multi_mt(l_active_downs, l_new_count, DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN, + l_net->pub.id.uint64, l_args->chain_id, l_args->cell_id, + l_args->atom, l_args->atom_size)) + debug_if(g_debug_reactor, L_ERROR, "Can't broadcast atom"); + DAP_DELETE(l_active_downs); DAP_DELETE(l_args->atom); DAP_DELETE(l_args); return true; @@ -605,13 +614,7 @@ static void s_chain_callback_notify(void *a_arg, dap_chain_t *a_chain, dap_chain log_it(L_ERROR, "Argument is NULL for s_chain_callback_notify"); return; } - dap_chain_net_t *l_net = (dap_chain_net_t *)a_arg; - int l_downcount = 0; - pthread_rwlock_rdlock(&PVT(l_net)->downlinks_lock); - l_downcount = HASH_COUNT(PVT(l_net)->downlinks); - pthread_rwlock_unlock(&PVT(l_net)->downlinks_lock); - if (!l_downcount) - return; + dap_chain_net_t *l_net = (dap_chain_net_t*)a_arg; // Check object lifetime for broadcasting decision dap_time_t l_time_diff = dap_time_now() - a_chain->callback_atom_get_timestamp(a_atom); if (l_time_diff > DAP_BROADCAST_LIFETIME * 60) @@ -736,20 +739,24 @@ static struct net_link *s_net_link_find(dap_chain_net_t *a_net, dap_chain_node_i static int s_net_link_add(dap_chain_net_t *a_net, dap_chain_node_info_t *a_link_node_info) { - dap_chain_net_pvt_t *l_pvt_net = PVT(a_net); - if (HASH_COUNT(PVT(a_net)->net_links) >= PVT(a_net)->max_links_count) - return +1; if (!a_link_node_info) return -1; + dap_chain_net_pvt_t *l_pvt_net = PVT(a_net); + pthread_mutex_lock(&l_pvt_net->uplinks_mutex); + if (HASH_COUNT(l_pvt_net->net_links) >= PVT(a_net)->max_links_count) { + pthread_mutex_unlock(&l_pvt_net->uplinks_mutex); + return 1; + } uint64_t l_own_addr = dap_chain_net_get_cur_addr_int(a_net); - if (a_link_node_info->hdr.address.uint64 == l_own_addr) + if (a_link_node_info->hdr.address.uint64 == l_own_addr) { + pthread_mutex_unlock(&l_pvt_net->uplinks_mutex); return -2; + } uint64_t l_addr = a_link_node_info->hdr.ext_addr_v4.s_addr; struct net_link *l_new_link; - pthread_mutex_lock(&PVT(a_net)->uplinks_mutex); - HASH_FIND(hh, PVT(a_net)->net_links, &l_addr, sizeof(l_addr), l_new_link); + HASH_FIND(hh, l_pvt_net->net_links, &l_addr, sizeof(l_addr), l_new_link); if (l_new_link) { - pthread_mutex_unlock(&PVT(a_net)->uplinks_mutex); + pthread_mutex_unlock(&l_pvt_net->uplinks_mutex); return -3; } l_new_link = DAP_NEW_Z(struct net_link); @@ -892,7 +899,7 @@ static void s_node_link_callback_connected(dap_chain_node_client_t * a_node_clie if ( s_debug_more ) log_it(L_NOTICE, "Established connection with %s."NODE_ADDR_FP_STR,l_net->pub.name, NODE_ADDR_FP_ARGS_S(a_node_client->remote_node_addr)); - pthread_mutex_lock(&l_net_pvt->uplinks_mutex); + pthread_rwlock_wrlock(&l_net_pvt->states_lock); a_node_client->is_connected = true; struct json_object *l_json = s_net_states_json_collect(l_net); char l_err_str[128] = { }; @@ -906,8 +913,7 @@ static void s_node_link_callback_connected(dap_chain_node_client_t * a_node_clie l_net_pvt->state = NET_STATE_LINKS_ESTABLISHED; dap_proc_queue_add_callback_inter(a_node_client->stream_worker->worker->proc_queue_input,s_net_states_proc,l_net ); } - pthread_mutex_unlock(&l_net_pvt->uplinks_mutex); - + pthread_rwlock_unlock(&l_net_pvt->states_lock); } /** @@ -938,17 +944,17 @@ static void s_node_link_callback_disconnected(dap_chain_node_client_t *a_node_cl dap_chain_node_client_close_mt(a_node_client); // Remove it on next context iteration struct net_link *l_free_link = s_get_free_link(l_net); if (l_free_link) { - s_net_link_start(l_net, l_free_link, l_net_pvt->reconnect_delay); pthread_mutex_unlock(&l_net_pvt->uplinks_mutex); + s_net_link_start(l_net, l_free_link, l_net_pvt->reconnect_delay); return; } + size_t l_current_links_prepared = HASH_COUNT(l_net_pvt->net_links); + pthread_mutex_unlock(&l_net_pvt->uplinks_mutex); if (!l_net_pvt->only_static_links) { - size_t l_current_links_prepared = HASH_COUNT(l_net_pvt->net_links); for (size_t i = l_current_links_prepared; i < l_net_pvt->max_links_count ; i++) { s_new_balancer_link_request(l_net, 0); } } - pthread_mutex_unlock(&l_net_pvt->uplinks_mutex); } } @@ -1042,11 +1048,9 @@ static void s_net_links_complete_and_start(dap_chain_net_t *a_net, dap_worker_t } if (HASH_COUNT(l_net_pvt->net_links) < l_net_pvt->max_links_count) s_fill_links_from_root_aliases(a_net); // Comlete the sentence - pthread_rwlock_wrlock(&l_net_pvt->states_lock); if (l_net_pvt->state_target != NET_STATE_OFFLINE){ l_net_pvt->state = NET_STATE_LINKS_CONNECTING; } - pthread_rwlock_unlock(&l_net_pvt->states_lock); dap_proc_queue_add_callback_inter(a_worker->proc_queue_input, s_net_states_proc, a_net); } } @@ -1107,9 +1111,7 @@ static void s_net_balancer_link_prepare_success(dap_worker_t * a_worker, dap_cha if (l_balancer_request->link_replace_tries && s_net_get_active_links_count(l_net) < PVT(l_net)->required_links_count) { // Auto-start new link - pthread_rwlock_rdlock(&PVT(l_net)->states_lock); dap_chain_net_state_t l_net_state = PVT(l_net)->state_target; - pthread_rwlock_unlock(&PVT(l_net)->states_lock); if (l_net_state != NET_STATE_OFFLINE) { struct net_link *l_free_link = s_get_free_link(l_net); if (l_free_link) @@ -1194,12 +1196,9 @@ static bool s_new_balancer_link_request(dap_chain_net_t *a_net, int a_link_repla dap_chain_net_pvt_t *l_net_pvt = a_net ? PVT(a_net) : NULL; if (!l_net_pvt) return false; - pthread_rwlock_rdlock(&l_net_pvt->states_lock); if (l_net_pvt->state_target == NET_STATE_OFFLINE) { - pthread_rwlock_unlock(&l_net_pvt->states_lock); return false; } - pthread_rwlock_unlock(&l_net_pvt->states_lock); if (a_link_replace_tries >= 3) { // network problems, make static links s_fill_links_from_root_aliases(a_net); @@ -1374,15 +1373,14 @@ static bool s_net_states_proc(dap_proc_thread_t *a_thread, void *a_arg) l_net_pvt->state = NET_STATE_OFFLINE; } - pthread_rwlock_wrlock(&l_net_pvt->states_lock); - - switch (l_net_pvt->state) { + switch ((dap_chain_net_state_t)l_net_pvt->state) { // State OFFLINE where we don't do anything case NET_STATE_OFFLINE: { log_it(L_NOTICE,"%s.state: NET_STATE_OFFLINE", l_net->pub.name); // delete all links struct net_link *l_link, *l_link_tmp; struct downlink *l_downlink, *l_dltmp; + pthread_mutex_lock(&l_net_pvt->uplinks_mutex); HASH_ITER(hh, l_net_pvt->net_links, l_link, l_link_tmp) { if (l_link->delay_timer) dap_timerfd_delete_mt(l_link->delay_timer->worker, l_link->delay_timer->esocket_uuid); @@ -1395,13 +1393,14 @@ static bool s_net_states_proc(dap_proc_thread_t *a_thread, void *a_arg) DAP_DEL_Z(l_link->link_info); DAP_DELETE(l_link); } - pthread_rwlock_wrlock(&PVT(l_net)->downlinks_lock); + pthread_mutex_unlock(&l_net_pvt->uplinks_mutex); + pthread_mutex_lock(&l_net_pvt->downlinks_mutex); HASH_ITER(hh, l_net_pvt->downlinks, l_downlink, l_dltmp) { HASH_DEL(l_net_pvt->downlinks, l_downlink); dap_events_socket_delete_mt(l_downlink->worker->worker, l_downlink->esocket_uuid); DAP_DELETE(l_downlink); } - pthread_rwlock_unlock(&PVT(l_net)->downlinks_lock); + pthread_mutex_unlock(&l_net_pvt->downlinks_mutex); l_net_pvt->balancer_link_requests = 0; l_net_pvt->active_link = NULL; dap_list_free(l_net_pvt->links_queue); @@ -1423,8 +1422,7 @@ static bool s_net_states_proc(dap_proc_thread_t *a_thread, void *a_arg) break; dap_chain_node_info_t *l_link_node_info = DAP_NEW_Z(dap_chain_node_info_t); if (!l_link_node_info) { - log_it(L_CRITICAL, "Memory allocation error"); - pthread_rwlock_unlock(&l_net_pvt->states_lock); + log_it(L_CRITICAL, "Memory allocation error"); return false; } l_link_node_info->hdr.address.uint64 = l_net_pvt->gdb_sync_nodes_addrs[i].uint64; @@ -1454,9 +1452,7 @@ static bool s_net_states_proc(dap_proc_thread_t *a_thread, void *a_arg) } // Get DNS request result from root nodes as synchronization links if (!l_net_pvt->only_static_links) { - pthread_rwlock_unlock(&l_net_pvt->states_lock); s_prepare_links_from_balancer(l_net); - pthread_rwlock_wrlock(&l_net_pvt->states_lock); } else { log_it(L_ATT, "Not use bootstrap addresses, fill seed nodelist from root aliases"); // Add other root nodes as synchronization links @@ -1499,15 +1495,13 @@ static bool s_net_states_proc(dap_proc_thread_t *a_thread, void *a_arg) default: log_it (L_DEBUG, "Unprocessed state"); } s_net_states_notify(l_net); - pthread_rwlock_unlock(&l_net_pvt->states_lock); - - return ! l_repeat_after_exit; + return !l_repeat_after_exit; } bool dap_chain_net_sync_trylock(dap_chain_net_t *a_net, dap_chain_node_client_t *a_client) { dap_chain_net_pvt_t *l_net_pvt = PVT(a_net); - int a_err = pthread_mutex_lock(&l_net_pvt->uplinks_mutex); + pthread_mutex_lock(&l_net_pvt->uplinks_mutex); bool l_found = false; if (l_net_pvt->active_link) { struct net_link *l_link, *l_link_tmp; @@ -1527,7 +1521,6 @@ bool dap_chain_net_sync_trylock(dap_chain_net_t *a_net, dap_chain_node_client_t } if (l_found && !dap_list_find(l_net_pvt->links_queue, a_client, NULL)) l_net_pvt->links_queue = dap_list_append(l_net_pvt->links_queue, a_client); - //if (a_err != EDEADLK) pthread_mutex_unlock(&l_net_pvt->uplinks_mutex); return !l_found; } @@ -1553,9 +1546,7 @@ bool dap_chain_net_sync_unlock(dap_chain_net_t *a_net, dap_chain_node_client_t * l_ret = l_net_pvt->active_link; pthread_mutex_unlock(&l_net_pvt->uplinks_mutex); if (!l_ret && l_net_pvt->state_target == NET_STATE_ONLINE && l_net_pvt->last_sync) { - pthread_rwlock_wrlock(&l_net_pvt->states_lock); l_net_pvt->state = NET_STATE_ONLINE; - pthread_rwlock_unlock(&l_net_pvt->states_lock); } return l_ret; } @@ -1595,8 +1586,8 @@ static dap_chain_net_t *s_net_new(const char *a_id, const char *a_name, pthread_mutexattr_settype(&l_mutex_attr, PTHREAD_MUTEX_RECURSIVE); pthread_mutex_init(&PVT(l_ret)->uplinks_mutex, &l_mutex_attr); pthread_mutex_init(&l_ret->pub.balancer_mutex, &l_mutex_attr); + pthread_mutex_init(&PVT(l_ret)->downlinks_mutex, &l_mutex_attr); pthread_mutexattr_destroy(&l_mutex_attr); - pthread_rwlock_init(&PVT(l_ret)->downlinks_lock, NULL); pthread_rwlock_init(&PVT(l_ret)->states_lock, NULL); if (dap_chain_net_id_parse(a_id, &l_ret->pub.id) != 0) { DAP_DELETE(l_ret); @@ -1633,8 +1624,8 @@ static dap_chain_net_t *s_net_new(const char *a_id, const char *a_name, void dap_chain_net_delete( dap_chain_net_t * a_net ) { pthread_mutex_destroy(&PVT(a_net)->uplinks_mutex); + pthread_mutex_destroy(&PVT(a_net)->downlinks_mutex); pthread_mutex_destroy(&a_net->pub.balancer_mutex); - pthread_rwlock_destroy(&PVT(a_net)->downlinks_lock); pthread_rwlock_destroy(&PVT(a_net)->states_lock); if(PVT(a_net)->seed_aliases) { DAP_DELETE(PVT(a_net)->seed_aliases); @@ -1654,14 +1645,13 @@ void dap_chain_net_load_all() if(!HASH_COUNT(s_net_items)){ log_it(L_ERROR, "Can't find any nets"); return; - } pthread_rwlock_rdlock(&s_net_items_rwlock); + } dap_chain_net_item_t *l_net_items_current = NULL, *l_net_items_tmp = NULL; HASH_ITER(hh, s_net_items, l_net_items_current, l_net_items_tmp) { if( (l_ret = s_net_load(l_net_items_current->chain_net)) ) { log_it(L_ERROR, "Loading chains of net %s finished with (%d) error code.", l_net_items_current->name, l_ret); } } - pthread_rwlock_unlock(&s_net_items_rwlock); } dap_string_t* dap_cli_list_net() @@ -1671,7 +1661,6 @@ dap_string_t* dap_cli_list_net() dap_chain_net_t * l_net = NULL; int l_net_i = 0; dap_string_append(l_string_ret,"Available networks and chains:\n"); - pthread_rwlock_rdlock(&s_net_items_rwlock); HASH_ITER(hh, s_net_items, l_net_item, l_net_item_tmp){ l_net = l_net_item->chain_net; dap_string_append_printf(l_string_ret, "\t%s:\n", l_net_item->name); @@ -1683,7 +1672,6 @@ dap_string_t* dap_cli_list_net() l_chain = l_chain->next; } } - pthread_rwlock_unlock(&s_net_items_rwlock); return l_string_ret; } @@ -1872,7 +1860,6 @@ static int s_cli_net(int argc, char **argv, char **a_str_reply) dap_chain_net_item_t * l_net_item, *l_net_item_tmp; int l_net_i = 0; dap_string_append(l_string_ret,"Networks:\n"); - pthread_rwlock_rdlock(&s_net_items_rwlock); HASH_ITER(hh, s_net_items, l_net_item, l_net_item_tmp){ l_net = l_net_item->chain_net; dap_string_append_printf(l_string_ret, "\t%s:\n", l_net_item->name); @@ -1891,7 +1878,6 @@ static int s_cli_net(int argc, char **argv, char **a_str_reply) l_chain = l_chain->next; } } - pthread_rwlock_unlock(&s_net_items_rwlock); } }else{ @@ -1899,12 +1885,10 @@ static int s_cli_net(int argc, char **argv, char **a_str_reply) // show list of nets dap_chain_net_item_t * l_net_item, *l_net_item_tmp; int l_net_i = 0; - pthread_rwlock_rdlock(&s_net_items_rwlock); HASH_ITER(hh, s_net_items, l_net_item, l_net_item_tmp){ dap_string_append_printf(l_string_ret, "\t%s\n", l_net_item->name); l_net_i++; } - pthread_rwlock_unlock(&s_net_items_rwlock); dap_string_append(l_string_ret, "\n"); } @@ -2382,25 +2366,21 @@ int s_net_init(const char * a_net_name, uint16_t a_acl_idx) return -1; } // check nets with same IDs and names - pthread_rwlock_rdlock(&s_net_items_rwlock); dap_chain_net_item_t *l_net_items_current = NULL, *l_net_items_tmp = NULL; HASH_ITER(hh, s_net_items, l_net_items_current, l_net_items_tmp) { if (l_net_items_current->net_id.uint64 == l_net->pub.id.uint64) { log_it(L_ERROR,"Can't create net %s, net %s has the same ID %"DAP_UINT64_FORMAT_U"", l_net->pub.name, l_net_items_current->name, l_net->pub.id.uint64); log_it(L_ERROR, "Please, fix your configs and restart node"); dap_chain_net_delete(l_net); - pthread_rwlock_unlock(&s_net_items_rwlock); return -1; } if (!strcmp(l_net_items_current->name, l_net->pub.name)) { log_it(L_ERROR,"Can't create l_net ID %"DAP_UINT64_FORMAT_U", net ID %"DAP_UINT64_FORMAT_U" has the same name %s", l_net->pub.id.uint64, l_net_items_current->net_id.uint64, l_net->pub.name); log_it(L_ERROR, "Please, fix your configs and restart node"); dap_chain_net_delete(l_net); - pthread_rwlock_unlock(&s_net_items_rwlock); return -1; } } - pthread_rwlock_unlock(&s_net_items_rwlock); dap_chain_net_pvt_t * l_net_pvt = PVT(l_net); l_net_pvt->load_mode = true; l_net_pvt->acl_idx = a_acl_idx; @@ -2497,10 +2477,8 @@ int s_net_init(const char * a_net_name, uint16_t a_acl_idx) ,dap_config_get_item_str(l_cfg , "general" , "name" )); l_net_item->chain_net = l_net; l_net_item->net_id.uint64 = l_net->pub.id.uint64; - pthread_rwlock_wrlock(&s_net_items_rwlock); HASH_ADD_STR(s_net_items,name,l_net_item); HASH_ADD(hh2, s_net_ids, net_id, sizeof(l_net_item->net_id), l_net_item); - pthread_rwlock_unlock(&s_net_items_rwlock); // Check if seed nodes are present in local db alias char **l_seed_aliases = dap_config_get_array_str(l_cfg, "general", "seed_nodes_aliases", @@ -3023,7 +3001,6 @@ int s_net_load(dap_chain_net_t *a_net) */ void dap_chain_net_deinit() { - pthread_rwlock_wrlock(&s_net_items_rwlock); dap_chain_net_item_t *l_current_item, *l_tmp; HASH_ITER(hh, s_net_items, l_current_item, l_tmp) { HASH_DEL(s_net_items, l_current_item); @@ -3033,9 +3010,6 @@ void dap_chain_net_deinit() DAP_DEL_Z(l_current_item); } dap_chain_node_net_ban_list_deinit(); - pthread_rwlock_unlock(&s_net_items_rwlock); - pthread_rwlock_destroy(&s_net_items_rwlock); - } /** @@ -3046,13 +3020,11 @@ dap_chain_net_t **dap_chain_net_list(uint16_t *a_size) { if (!a_size) return NULL; - pthread_rwlock_rdlock(&s_net_items_rwlock); *a_size = HASH_COUNT(s_net_items); if(*a_size){ dap_chain_net_t **l_net_list = DAP_NEW_SIZE(dap_chain_net_t *, (*a_size) * sizeof(dap_chain_net_t *)); if (!l_net_list) { log_it(L_CRITICAL, "Memory allocation error"); - pthread_rwlock_unlock(&s_net_items_rwlock); return NULL; } dap_chain_net_item_t *l_current_item = NULL, *l_tmp = NULL; @@ -3062,10 +3034,8 @@ dap_chain_net_t **dap_chain_net_list(uint16_t *a_size) if(i >= *a_size) break; } - pthread_rwlock_unlock(&s_net_items_rwlock); return l_net_list; } else { - pthread_rwlock_unlock(&s_net_items_rwlock); return NULL; } } @@ -3075,13 +3045,11 @@ dap_chain_net_t **dap_chain_net_list(uint16_t *a_size) * @param a_name * @return */ -dap_chain_net_t * dap_chain_net_by_name( const char * a_name) +dap_chain_net_t *dap_chain_net_by_name(const char *a_name) { - dap_chain_net_item_t * l_net_item = NULL; - if(a_name) { - pthread_rwlock_rdlock(&s_net_items_rwlock); - HASH_FIND_STR(s_net_items,a_name,l_net_item ); - pthread_rwlock_unlock(&s_net_items_rwlock); + dap_chain_net_item_t *l_net_item = NULL; + if (a_name) { + HASH_FIND_STR(s_net_items,a_name,l_net_item); } return l_net_item ? l_net_item->chain_net : NULL; } @@ -3102,12 +3070,10 @@ dap_ledger_t * dap_chain_ledger_by_net_name( const char * a_net_name) * @param a_id * @return */ -dap_chain_net_t * dap_chain_net_by_id( dap_chain_net_id_t a_id) +dap_chain_net_t *dap_chain_net_by_id(dap_chain_net_id_t a_id) { - dap_chain_net_item_t * l_net_item = NULL; - pthread_rwlock_rdlock(&s_net_items_rwlock); + dap_chain_net_item_t *l_net_item = NULL; HASH_FIND(hh2, s_net_ids, &a_id, sizeof(a_id), l_net_item); - pthread_rwlock_unlock(&s_net_items_rwlock); return l_net_item ? l_net_item->chain_net : NULL; } @@ -3237,13 +3203,9 @@ char * dap_chain_net_get_gdb_group_mempool_by_chain_type(dap_chain_net_t *a_net, * @param l_net * @return */ -dap_chain_net_state_t dap_chain_net_get_state ( dap_chain_net_t * l_net) +dap_chain_net_state_t dap_chain_net_get_state (dap_chain_net_t * l_net) { - assert(l_net); - pthread_rwlock_rdlock(&PVT(l_net)->states_lock); - dap_chain_net_state_t l_ret = PVT(l_net)->state; - pthread_rwlock_unlock(&PVT(l_net)->states_lock); - return l_ret; + return PVT(l_net)->state; } /** @@ -3255,13 +3217,10 @@ void dap_chain_net_set_state(dap_chain_net_t *l_net, dap_chain_net_state_t a_sta { assert(l_net); log_it(L_DEBUG,"%s set state %s", l_net->pub.name, dap_chain_net_state_to_str(a_state)); - pthread_rwlock_wrlock(&PVT(l_net)->states_lock); - if( a_state == PVT(l_net)->state){ - pthread_rwlock_unlock(&PVT(l_net)->states_lock); + if(a_state == PVT(l_net)->state){ return; } PVT(l_net)->state = a_state; - pthread_rwlock_unlock(&PVT(l_net)->states_lock); dap_proc_queue_add_callback(dap_events_worker_get_auto(), s_net_states_proc,l_net); } @@ -3720,7 +3679,6 @@ void dap_chain_net_announce_addrs() { log_it(L_ERROR, "Can't find any nets"); return; } - pthread_rwlock_rdlock(&s_net_items_rwlock); dap_chain_net_item_t *l_net_item = NULL, *l_tmp = NULL; HASH_ITER(hh, s_net_items, l_net_item, l_tmp) { dap_chain_net_pvt_t *l_net_pvt = PVT(l_net_item->chain_net); @@ -3735,7 +3693,6 @@ void dap_chain_net_announce_addrs() { NODE_ADDR_FP_ARGS(l_net_pvt->node_addr), l_node_addr_str, l_net_pvt->node_info->hdr.ext_port, l_net_item->name); } } - pthread_rwlock_unlock(&s_net_items_rwlock); } char *dap_chain_net_links_dump(dap_chain_net_t *a_net) { @@ -3756,14 +3713,14 @@ char *dap_chain_net_links_dump(dap_chain_net_t *a_net) { dap_string_t *l_str_downlinks = dap_string_new("---------------------------\n" "| ↑\\↓ |\t#\t|\t\tIP\t\t|\tPort\t|\n"); pthread_mutex_unlock(&l_net_pvt->uplinks_mutex); - pthread_rwlock_rdlock(&l_net_pvt->downlinks_lock); + pthread_mutex_lock(&l_net_pvt->downlinks_mutex); struct downlink *l_downlink = NULL, *l_downtmp = NULL; HASH_ITER(hh, l_net_pvt->downlinks, l_downlink, l_downtmp) { dap_string_append_printf(l_str_downlinks, "| ↓ |\t%zu\t|\t%s\t\t|\t%u\t|\n", ++l_down_count, l_downlink->addr, l_downlink->port); } - pthread_rwlock_unlock(&l_net_pvt->downlinks_lock); + pthread_mutex_unlock(&l_net_pvt->downlinks_mutex); char *l_res_str = dap_strdup_printf("Count links: %zu\n\nUplinks: %zu\n%s\n\nDownlinks: %zu\n%s\n", l_up_count + l_down_count, l_up_count, l_str_uplinks->str, l_down_count, l_str_downlinks->str); @@ -3777,7 +3734,7 @@ char *dap_chain_net_links_dump(dap_chain_net_t *a_net) { * @param - * @return 0 if no errors */ -int s_net_init_node_addr_cert() +int s_net_init_node_addr_cert() { dap_config_t *l_cfg = NULL; dap_string_t *l_cfg_path = dap_string_new("cellframe-node"); @@ -3824,4 +3781,4 @@ int s_net_init_node_addr_cert() } } return 0; -} \ No newline at end of file +} diff --git a/modules/net/dap_chain_net_node_list.c b/modules/net/dap_chain_net_node_list.c index ff95622939..2043d5b3f2 100644 --- a/modules/net/dap_chain_net_node_list.c +++ b/modules/net/dap_chain_net_node_list.c @@ -189,7 +189,7 @@ static int dap_chain_net_node_list_wait(struct node_link_request *a_node_list_re return 0; } struct timespec l_cond_timeout; - clock_gettime(CLOCK_MONOTONIC, &l_cond_timeout); + clock_gettime(CLOCK_REALTIME, &l_cond_timeout); l_cond_timeout.tv_sec += a_timeout_ms/1000; int l_ret_wait = pthread_cond_timedwait(&a_node_list_request->wait_cond, &a_node_list_request->wait_mutex, &l_cond_timeout); if(!l_ret_wait) { diff --git a/modules/net/include/dap_chain_net.h b/modules/net/include/dap_chain_net.h index 22f3ba32a0..ff92297f3d 100644 --- a/modules/net/include/dap_chain_net.h +++ b/modules/net/include/dap_chain_net.h @@ -51,7 +51,7 @@ along with any CellFrame SDK based project. If not, see <http://www.gnu.org/lic struct dap_chain_node_info; typedef struct dap_chain_node_client dap_chain_node_client_t; -typedef enum dap_chain_net_state{ +typedef enum dap_chain_net_state{ NET_STATE_OFFLINE = 0, NET_STATE_LINKS_PREPARE, NET_STATE_LINKS_CONNECTING, @@ -62,18 +62,6 @@ typedef enum dap_chain_net_state{ NET_STATE_ONLINE } dap_chain_net_state_t; -static const char * g_net_state_str[]={ - [NET_STATE_OFFLINE] = "NET_STATE_OFFLINE", - [NET_STATE_LINKS_PREPARE]="NET_STATE_LINKS_PREPARE", - [NET_STATE_LINKS_CONNECTING]="NET_STATE_LINKS_CONNECTING", - [NET_STATE_LINKS_ESTABLISHED]="NET_STATE_LINKS_ESTABLISHED", - [NET_STATE_ADDR_REQUEST]="NET_STATE_ADDR_REQUEST", // Waiting for address assign - [NET_STATE_SYNC_GDB]="NET_STATE_SYNC_GDB", - [NET_STATE_SYNC_CHAINS]="NET_STATE_SYNC_CHAINS", - [NET_STATE_ONLINE]="NET_STATE_ONLINE" -}; - - typedef struct dap_chain_net{ struct { dap_chain_net_id_t id; @@ -133,18 +121,6 @@ inline static int dap_chain_net_sync_gdb(dap_chain_net_t * a_net) { return dap_c inline static int dap_chain_net_sync_chains(dap_chain_net_t * a_net) { return dap_chain_net_state_go_to(a_net,NET_STATE_SYNC_CHAINS); } inline static int dap_chain_net_sync_all(dap_chain_net_t * a_net) { return dap_chain_net_state_go_to(a_net,NET_STATE_SYNC_CHAINS); } -/** - * @brief dap_chain_net_state_to_str - * @param a_state - * @return - */ -static inline const char * dap_chain_net_state_to_str(dap_chain_net_state_t a_state){ - if(a_state< NET_STATE_OFFLINE || a_state > NET_STATE_ONLINE) - return "<Undefined net state>"; - else - return g_net_state_str[a_state]; -} - void dap_chain_net_delete( dap_chain_net_t * a_net); void dap_chain_net_proc_mempool(dap_chain_net_t *a_net); void dap_chain_net_set_flag_sync_from_zero(dap_chain_net_t * a_net, bool a_flag_sync_from_zero); @@ -205,7 +181,7 @@ bool dap_chain_net_get_extra_gdb_group(dap_chain_net_t *a_net, dap_chain_node_ad int dap_chain_net_verify_datum_for_add(dap_chain_t *a_chain, dap_chain_datum_t *a_datum, dap_hash_fast_t *a_datum_hash); char *dap_chain_net_verify_datum_err_code_to_str(dap_chain_datum_t *a_datum, int a_code); -int dap_chain_net_add_downlink(dap_chain_net_t *a_net, dap_stream_worker_t *a_worker, dap_stream_ch_uuid_t a_ch_uuid, dap_events_socket_uuid_t a_esocket_uuid, char *a_addr, int a_port); +void dap_chain_net_add_downlink(dap_chain_net_t *a_net, dap_stream_worker_t *a_worker, dap_stream_ch_uuid_t a_ch_uuid, dap_events_socket_uuid_t a_esocket_uuid, char *a_addr, int a_port); void dap_chain_net_del_downlink(dap_stream_ch_uuid_t *a_ch_uuid); void dap_chain_net_add_gdb_notify_callback(dap_chain_net_t *a_net, dap_store_obj_callback_notify_t a_callback, void *a_cb_arg); void dap_chain_net_sync_gdb_broadcast(dap_global_db_context_t *a_context, dap_store_obj_t *a_obj, void *a_arg); -- GitLab