From 55bf47b12981036c624cda2c437fd6c456e5fb56 Mon Sep 17 00:00:00 2001 From: cellframe <roman.khlopkov@demlabs.net> Date: Tue, 8 Nov 2022 15:08:46 +0300 Subject: [PATCH] [*] Porting deadlock fix with dap_chain_net from master --- dap-sdk/net/stream/ch/include/dap_stream_ch.h | 2 +- modules/net/dap_chain_net.c | 221 +++++++++--------- 2 files changed, 111 insertions(+), 112 deletions(-) diff --git a/dap-sdk/net/stream/ch/include/dap_stream_ch.h b/dap-sdk/net/stream/ch/include/dap_stream_ch.h index 837076ab02..8531934abf 100644 --- a/dap-sdk/net/stream/ch/include/dap_stream_ch.h +++ b/dap-sdk/net/stream/ch/include/dap_stream_ch.h @@ -67,7 +67,7 @@ void dap_stream_ch_delete(dap_stream_ch_t *a_ch); dap_stream_ch_t *dap_stream_ch_find_by_uuid_unsafe(dap_stream_worker_t *a_worker, dap_stream_ch_uuid_t a_uuid); // MT-safe functions -DAP_STATIC_INLINE bool dap_stream_ch_check_uuid(dap_stream_worker_t *a_worker, dap_stream_ch_uuid_t a_ch_uuid) +DAP_STATIC_INLINE bool dap_stream_ch_check_uuid_mt(dap_stream_worker_t *a_worker, dap_stream_ch_uuid_t a_ch_uuid) { return dap_stream_ch_find_by_uuid_unsafe(a_worker, a_ch_uuid); } diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index eb378f0249..78f7ad6066 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -154,14 +154,6 @@ struct downlink { */ typedef struct dap_chain_net_pvt{ pthread_t proc_tid; -#ifndef _WIN32 - pthread_cond_t state_proc_cond; -#else - HANDLE state_proc_cond; -#endif - - - pthread_mutex_t state_mutex_cond; dap_chain_node_role_t node_role; uint32_t flags; time_t last_sync; @@ -207,8 +199,12 @@ typedef struct dap_chain_net_pvt{ // Main loop timer dap_interval_timer_t main_timer; - // General rwlock for structure - pthread_rwlock_t rwlock; + pthread_rwlock_t uplinks_lock; + pthread_rwlock_t downlinks_lock; + pthread_rwlock_t balancer_lock; + pthread_rwlock_t states_lock; + pthread_rwlock_t gdbs_lock; + pthread_rwlock_t atoms_lock; dap_list_t *gdb_notifiers; } dap_chain_net_pvt_t; @@ -223,8 +219,8 @@ typedef struct dap_chain_net_item{ #define PVT(a) ( (dap_chain_net_pvt_t *) (void*) a->pvt ) #define PVT_S(a) ( (dap_chain_net_pvt_t *) (void*) a.pvt ) -pthread_rwlock_t g_net_items_rwlock = PTHREAD_RWLOCK_INITIALIZER, - g_net_ids_rwlock = PTHREAD_RWLOCK_INITIALIZER; +static pthread_rwlock_t s_net_items_rwlock = PTHREAD_RWLOCK_INITIALIZER, + s_net_ids_rwlock = PTHREAD_RWLOCK_INITIALIZER; static dap_chain_net_item_t *s_net_items = NULL, *s_net_items_ids = NULL; @@ -369,30 +365,29 @@ char *dap_chain_net_get_gdb_group_acl(dap_chain_net_t *a_net) */ int dap_chain_net_state_go_to(dap_chain_net_t * a_net, dap_chain_net_state_t a_new_state) { + pthread_rwlock_wrlock(&PVT(a_net)->states_lock); if (PVT(a_net)->state != NET_STATE_OFFLINE){ PVT(a_net)->state = PVT(a_net)->state_target = NET_STATE_OFFLINE; + pthread_rwlock_unlock(&PVT(a_net)->states_lock); s_net_states_proc(NULL, a_net); + pthread_rwlock_wrlock(&PVT(a_net)->states_lock); } - PVT(a_net)->state_target = a_new_state; - - pthread_mutex_lock( &PVT(a_net)->state_mutex_cond); // Preventing call of state_go_to before wait cond will be armed // set flag for sync PVT(a_net)->flags |= F_DAP_CHAIN_NET_GO_SYNC; //PVT(a_net)->flags |= F_DAP_CHAIN_NET_SYNC_FROM_ZERO; // TODO set this flag according to -mode argument from command line -#ifndef _WIN32 - pthread_cond_signal( &PVT(a_net)->state_proc_cond ); -#else - SetEvent( PVT(a_net)->state_proc_cond ); -#endif - pthread_mutex_unlock( &PVT(a_net)->state_mutex_cond); + pthread_rwlock_unlock(&PVT(a_net)->states_lock); + dap_proc_queue_add_callback(dap_events_worker_get_auto(), s_net_states_proc, a_net); return 0; } dap_chain_net_state_t dap_chain_net_get_target_state(dap_chain_net_t *a_net) { - return PVT(a_net)->state_target; + pthread_rwlock_rdlock(&PVT(a_net)->states_lock); + dap_chain_net_state_t l_ret = PVT(a_net)->state_target; + pthread_rwlock_unlock(&PVT(a_net)->states_lock); + return l_ret; } /** @@ -416,10 +411,10 @@ int dap_chain_net_add_downlink(dap_chain_net_t *a_net, dap_stream_worker_t *a_wo unsigned a_hash_value; HASH_VALUE(&a_ch_uuid, sizeof(a_ch_uuid), a_hash_value); struct downlink *l_downlink = NULL; - pthread_rwlock_rdlock(&l_net_pvt->rwlock); + pthread_rwlock_rdlock(&l_net_pvt->downlinks_lock); HASH_FIND_BYHASHVALUE(hh, l_net_pvt->downlinks, &a_ch_uuid, sizeof(a_ch_uuid), a_hash_value, l_downlink); if (l_downlink) { - pthread_rwlock_unlock(&l_net_pvt->rwlock); + pthread_rwlock_unlock(&l_net_pvt->downlinks_lock); return -2; } l_downlink = DAP_NEW_Z(struct downlink); @@ -427,7 +422,7 @@ int dap_chain_net_add_downlink(dap_chain_net_t *a_net, dap_stream_worker_t *a_wo l_downlink->ch_uuid = a_ch_uuid; l_downlink->esocket_uuid = a_esocket_uuid; HASH_ADD_BYHASHVALUE(hh, l_net_pvt->downlinks, ch_uuid, sizeof(a_ch_uuid), a_hash_value, l_downlink); - pthread_rwlock_unlock(&l_net_pvt->rwlock); + pthread_rwlock_unlock(&l_net_pvt->downlinks_lock); return 0; } @@ -473,7 +468,7 @@ static bool s_net_send_records_callback_get_raw (dap_global_db_context_t * a_glo DAP_DELETE(l_arg_obj->key); DAP_DELETE(l_arg_obj); - pthread_rwlock_wrlock(&PVT(l_net)->rwlock); + pthread_rwlock_wrlock(&PVT(l_net)->gdbs_lock); if (PVT(l_net)->state != NET_STATE_OFFLINE) { dap_list_t *it = NULL; do { @@ -486,8 +481,10 @@ static bool s_net_send_records_callback_get_raw (dap_global_db_context_t * a_glo dap_global_db_pkt_t *l_data_out = dap_store_packet_single(l_obj_cur); dap_store_obj_free_one(l_obj_cur); struct downlink *l_link, *l_tmp; + pthread_rwlock_rdlock(&PVT(l_net)->downlinks_lock); HASH_ITER(hh, PVT(l_net)->downlinks, l_link, l_tmp) { - if (! dap_stream_ch_check_uuid(l_link->worker, l_link->ch_uuid)) { + bool l_ch_alive = dap_stream_ch_check_uuid_mt(l_link->worker, l_link->ch_uuid); + if (!l_ch_alive) { HASH_DEL(PVT(l_net)->downlinks, l_link); DAP_DELETE(l_link); continue; @@ -499,6 +496,7 @@ static bool s_net_send_records_callback_get_raw (dap_global_db_context_t * a_glo sizeof(dap_global_db_pkt_t) + l_data_out->data_size)) debug_if(g_debug_reactor, L_ERROR, "Can't send pkt to worker (%d) for writing", l_link->worker->worker->id); } + pthread_rwlock_unlock(&PVT(l_net)->downlinks_lock); DAP_DELETE(l_data_out); if (it) PVT(l_net)->records_queue = dap_list_delete_link(PVT(l_net)->records_queue, it); @@ -507,7 +505,7 @@ static bool s_net_send_records_callback_get_raw (dap_global_db_context_t * a_glo } else //PVT(l_net)->records_queue = dap_list_append(PVT(l_net)->records_queue, l_obj); dap_store_obj_free_one(a_store_obj); - pthread_rwlock_unlock(&PVT(l_net)->rwlock); + pthread_rwlock_unlock(&PVT(l_net)->gdbs_lock); return false; // We've freed obj by our own before } @@ -566,10 +564,10 @@ void dap_chain_net_sync_gdb_broadcast(void *a_arg, const char a_op_code, const c dap_chain_net_t *l_net = (dap_chain_net_t *)a_arg; if (!HASH_COUNT(PVT(l_net)->downlinks)) { if (PVT(l_net)->records_queue) { - pthread_rwlock_wrlock(&PVT(l_net)->rwlock); + pthread_rwlock_wrlock(&PVT(l_net)->gdbs_lock); dap_list_free_full(PVT(l_net)->records_queue, s_record_obj_free); PVT(l_net)->records_queue = NULL; - pthread_rwlock_unlock(&PVT(l_net)->rwlock); + pthread_rwlock_unlock(&PVT(l_net)->gdbs_lock); } return; } @@ -596,7 +594,7 @@ static bool s_net_send_atoms(dap_proc_thread_t *a_thread, void *a_arg) UNUSED(a_thread); dap_store_obj_t *l_arg = (dap_store_obj_t *)a_arg; dap_chain_net_t *l_net = (dap_chain_net_t *)l_arg->callback_proc_thread_arg; - pthread_rwlock_wrlock(&PVT(l_net)->rwlock); + pthread_rwlock_wrlock(&PVT(l_net)->atoms_lock); if (PVT(l_net)->state != NET_STATE_SYNC_CHAINS) { dap_list_t *it = NULL; do { @@ -604,9 +602,10 @@ static bool s_net_send_atoms(dap_proc_thread_t *a_thread, void *a_arg) dap_chain_t *l_chain = (dap_chain_t *)l_obj_cur->group; uint64_t l_cell_id = l_obj_cur->timestamp; struct downlink *l_link, *l_tmp; + pthread_rwlock_wrlock(&PVT(l_net)->downlinks_lock); HASH_ITER(hh, PVT(l_net)->downlinks, l_link, l_tmp) { - dap_stream_ch_t *l_ch = dap_stream_ch_find_by_uuid_unsafe(l_link->worker, l_link->ch_uuid); - if (!l_ch) { + bool l_ch_alive = dap_stream_ch_check_uuid_mt(l_link->worker, l_link->ch_uuid); + if (!l_ch_alive) { HASH_DEL(PVT(l_net)->downlinks, l_link); DAP_DELETE(l_link); continue; @@ -616,6 +615,7 @@ static bool s_net_send_atoms(dap_proc_thread_t *a_thread, void *a_arg) l_obj_cur->value, l_obj_cur->value_len)) debug_if(g_debug_reactor, L_ERROR, "Can't send atom to worker (%d) for writing", l_link->worker->worker->id); } + pthread_rwlock_unlock(&PVT(l_net)->downlinks_lock); s_atom_obj_free(l_obj_cur); if (it) PVT(l_net)->atoms_queue = dap_list_delete_link(PVT(l_net)->atoms_queue, it); @@ -624,7 +624,7 @@ static bool s_net_send_atoms(dap_proc_thread_t *a_thread, void *a_arg) } else //PVT(l_net)->atoms_queue = dap_list_append(PVT(l_net)->atoms_queue, l_arg); s_atom_obj_free(a_arg); - pthread_rwlock_unlock(&PVT(l_net)->rwlock); + pthread_rwlock_unlock(&PVT(l_net)->atoms_lock); return true; } @@ -641,10 +641,10 @@ static void s_chain_callback_notify(void *a_arg, dap_chain_t *a_chain, dap_chain dap_chain_net_t *l_net = (dap_chain_net_t *)a_arg; if (!HASH_COUNT(PVT(l_net)->downlinks)) { if (PVT(l_net)->atoms_queue) { - pthread_rwlock_wrlock(&PVT(l_net)->rwlock); + pthread_rwlock_wrlock(&PVT(l_net)->atoms_lock); dap_list_free_full(PVT(l_net)->atoms_queue, s_atom_obj_free); PVT(l_net)->atoms_queue = NULL; - pthread_rwlock_unlock(&PVT(l_net)->rwlock); + pthread_rwlock_unlock(&PVT(l_net)->atoms_lock); } return; } @@ -793,12 +793,12 @@ static void s_fill_links_from_root_aliases(dap_chain_net_t * a_net) dap_chain_net_pvt_t *l_pvt_net = PVT(a_net); uint64_t l_own_addr = dap_chain_net_get_cur_addr_int(a_net); for (size_t i = 0; i < MIN(s_max_links_count, l_pvt_net->seed_aliases_count); i++) { - pthread_rwlock_rdlock(&l_pvt_net->rwlock); + pthread_rwlock_rdlock(&l_pvt_net->uplinks_lock); if (dap_list_length(l_pvt_net->net_links) >= s_max_links_count) { - pthread_rwlock_unlock(&l_pvt_net->rwlock); + pthread_rwlock_unlock(&l_pvt_net->uplinks_lock); break; } else - pthread_rwlock_unlock(&l_pvt_net->rwlock); + pthread_rwlock_unlock(&l_pvt_net->uplinks_lock); dap_chain_node_addr_t *l_link_addr = dap_chain_node_alias_find(a_net, l_pvt_net->seed_aliases[i]); if (!l_link_addr) @@ -812,9 +812,9 @@ static void s_fill_links_from_root_aliases(dap_chain_net_t * a_net) if(l_link_node_info && !dap_chain_net_link_is_present(a_net, l_link_node_info)) { struct net_link *l_new_link = DAP_NEW_Z(struct net_link); l_new_link->link_info = l_link_node_info; - pthread_rwlock_wrlock(&l_pvt_net->rwlock); + pthread_rwlock_wrlock(&l_pvt_net->uplinks_lock); l_pvt_net->net_links = dap_list_append(l_pvt_net->net_links, l_new_link); - pthread_rwlock_unlock(&l_pvt_net->rwlock); + pthread_rwlock_unlock(&l_pvt_net->uplinks_lock); } else { log_it(L_WARNING, "Not found link %s."NODE_ADDR_FP_STR" in the node list or link is already in use", a_net->pub.name, NODE_ADDR_FP_ARGS(l_link_addr)); @@ -845,7 +845,7 @@ static void s_node_link_callback_connected(dap_chain_node_client_t * a_node_clie if ( s_debug_more ) log_it(L_NOTICE, "Established connection with %s."NODE_ADDR_FP_STR,l_net->pub.name, NODE_ADDR_FP_ARGS_S(a_node_client->remote_node_addr)); - pthread_rwlock_wrlock(&l_net_pvt->rwlock); + pthread_rwlock_wrlock(&l_net_pvt->uplinks_lock); l_net_pvt->links_connected_count++; a_node_client->is_connected = true; struct json_object *l_json = net_states_json_collect(l_net); @@ -860,7 +860,7 @@ static void s_node_link_callback_connected(dap_chain_node_client_t * a_node_clie l_net_pvt->state = NET_STATE_LINKS_ESTABLISHED; dap_proc_queue_add_callback_inter(a_node_client->stream_worker->worker->proc_queue_input,s_net_states_proc,l_net ); } - pthread_rwlock_unlock(&l_net_pvt->rwlock); + pthread_rwlock_unlock(&l_net_pvt->uplinks_lock); } @@ -891,7 +891,7 @@ static void s_node_link_callback_disconnected(dap_chain_node_client_t *a_node_cl { dap_chain_net_t *l_net = (dap_chain_net_t *)a_arg; dap_chain_net_pvt_t *l_net_pvt = PVT(l_net); - pthread_rwlock_wrlock(&l_net_pvt->rwlock); + pthread_rwlock_wrlock(&l_net_pvt->uplinks_lock); if (a_node_client->is_connected) { a_node_client->is_connected = false; log_it(L_INFO, "%s."NODE_ADDR_FP_STR" disconnected.%s",l_net->pub.name, @@ -912,13 +912,13 @@ static void s_node_link_callback_disconnected(dap_chain_node_client_t *a_node_cl ((struct net_link *)it->data)->client_uuid = l_client_new->uuid; ((struct net_link *)it->data)->link = dap_chain_net_client_create_n_connect(l_net, ((struct net_link *)it->data)->link_info); - pthread_rwlock_unlock(&l_net_pvt->rwlock); + pthread_rwlock_unlock(&l_net_pvt->uplinks_lock); return; } } if (l_net_pvt->only_static_links) { a_node_client->keep_connection = true; - pthread_rwlock_unlock(&l_net_pvt->rwlock); + pthread_rwlock_unlock(&l_net_pvt->uplinks_lock); return; } dap_chain_node_info_t *l_link_node_info = NULL; @@ -942,7 +942,7 @@ static void s_node_link_callback_disconnected(dap_chain_node_client_t *a_node_cl } } } - pthread_rwlock_unlock(&l_net_pvt->rwlock); + pthread_rwlock_unlock(&l_net_pvt->uplinks_lock); } /** @@ -1011,7 +1011,7 @@ static void s_node_link_callback_delete(dap_chain_node_client_t * a_node_client, log_it(L_ERROR, "Links count is zero in delete callback"); } dap_chain_net_sync_unlock(l_net, a_node_client); - pthread_rwlock_wrlock(&l_net_pvt->rwlock); + pthread_rwlock_wrlock(&l_net_pvt->uplinks_lock); for ( dap_list_t * it = l_net_pvt->net_links; it; it=it->next ){ if (((struct net_link *)it->data)->link == a_node_client) { log_it(L_DEBUG,"Replace node client with new one"); @@ -1020,12 +1020,12 @@ static void s_node_link_callback_delete(dap_chain_node_client_t * a_node_client, ((struct net_link *)it->data)->client_uuid = l_client->uuid; } } - pthread_rwlock_unlock(&l_net_pvt->rwlock); + pthread_rwlock_unlock(&l_net_pvt->uplinks_lock); struct json_object *l_json = net_states_json_collect(l_net); json_object_object_add(l_json, "errorMessage", json_object_new_string("Link restart")); dap_notify_server_send_mt(json_object_get_string(l_json)); json_object_put(l_json); - // Then a_alient wiil be destroyed in a right way + // Then a_node_client will be destroyed in a right way } /** @@ -1050,22 +1050,24 @@ static void s_net_state_link_prepare_success(dap_worker_t * a_worker,dap_chain_n if (a_node_info->hdr.address.uint64 != l_own_addr) { struct net_link *l_new_link = DAP_NEW_Z(struct net_link); l_new_link->link_info = a_node_info; - pthread_rwlock_wrlock(&l_net_pvt->rwlock); + pthread_rwlock_wrlock(&l_net_pvt->uplinks_lock); l_net_pvt->net_links = dap_list_append(l_net_pvt->net_links, l_new_link); - pthread_rwlock_unlock(&l_net_pvt->rwlock); + pthread_rwlock_unlock(&l_net_pvt->uplinks_lock); l_dns_request->tries = 0; } - pthread_rwlock_rdlock(&l_net_pvt->rwlock); + pthread_rwlock_rdlock(&l_net_pvt->balancer_lock); l_dns_request->tries++; l_net_pvt->links_dns_requests--; if (l_net_pvt->links_dns_requests == 0){ // It was the last one + pthread_rwlock_wrlock(&l_net_pvt->states_lock); if (l_net_pvt->state != NET_STATE_LINKS_CONNECTING){ l_net_pvt->state = NET_STATE_LINKS_CONNECTING; } + pthread_rwlock_unlock(&l_net_pvt->states_lock); dap_proc_queue_add_callback_inter( a_worker->proc_queue_input,s_net_states_proc,l_net ); } - pthread_rwlock_unlock(&l_net_pvt->rwlock); + pthread_rwlock_unlock(&l_net_pvt->balancer_lock); struct json_object *l_json = net_states_json_collect(l_net); char l_err_str[128] = { }; dap_snprintf(l_err_str, sizeof(l_err_str) @@ -1101,25 +1103,26 @@ static void s_net_state_link_prepare_error(dap_worker_t * a_worker,dap_chain_nod json_object_object_add(l_json, "errorMessage", json_object_new_string(l_err_str)); dap_notify_server_send_mt(json_object_get_string(l_json)); json_object_put(l_json); - pthread_rwlock_wrlock(&l_net_pvt->rwlock); + pthread_rwlock_wrlock(&l_net_pvt->balancer_lock); if(l_net_pvt->links_dns_requests) l_net_pvt->links_dns_requests--; log_it(L_DEBUG, "Still %u link dns requests in process",l_net_pvt->links_dns_requests); + bool l_fill_from_root = false; if(!l_net_pvt->links_dns_requests ){ + pthread_rwlock_wrlock(&l_net_pvt->states_lock); if( l_net_pvt->state_target != NET_STATE_OFFLINE){ log_it(L_WARNING,"Can't prepare links via DNS requests. Prefilling links with root addresses"); l_net_pvt->state = NET_STATE_LINKS_CONNECTING; - pthread_rwlock_unlock(&l_net_pvt->rwlock); - s_fill_links_from_root_aliases(l_net); - dap_proc_queue_add_callback_inter( a_worker->proc_queue_input,s_net_states_proc,l_net ); - DAP_DELETE(l_dns_request); - return; + l_fill_from_root = true; } + pthread_rwlock_unlock(&l_net_pvt->states_lock); } - pthread_rwlock_unlock(&l_net_pvt->rwlock); + pthread_rwlock_unlock(&l_net_pvt->balancer_lock); DAP_DELETE(l_dns_request); + s_fill_links_from_root_aliases(l_net); + dap_proc_queue_add_callback_inter( a_worker->proc_queue_input,s_net_states_proc,l_net ); } /** @@ -1227,7 +1230,7 @@ static bool s_net_states_proc(dap_proc_thread_t *a_thread, void *a_arg) { l_net_pvt->state = NET_STATE_OFFLINE; } - pthread_rwlock_wrlock(&l_net_pvt->rwlock); + pthread_rwlock_wrlock(&l_net_pvt->states_lock); switch (l_net_pvt->state) { // State OFFLINE where we don't do anything @@ -1306,9 +1309,7 @@ static bool s_net_states_proc(dap_proc_thread_t *a_thread, void *a_arg) { if (l_net_pvt->only_static_links) { if (l_net_pvt->seed_aliases_count) { // Add other root nodes as synchronization links - pthread_rwlock_unlock(&l_net_pvt->rwlock); s_fill_links_from_root_aliases(l_net); - pthread_rwlock_wrlock(&l_net_pvt->rwlock); l_net_pvt->state = NET_STATE_LINKS_CONNECTING; l_repeat_after_exit = true; break; @@ -1344,9 +1345,7 @@ static bool s_net_states_proc(dap_proc_thread_t *a_thread, void *a_arg) { } else { log_it(L_ATT, "Not use bootstrap addresses, fill seed nodelist from root aliases"); - pthread_rwlock_unlock(&l_net_pvt->rwlock); s_fill_links_from_root_aliases(l_net); - pthread_rwlock_wrlock(&l_net_pvt->rwlock); } } } break; @@ -1384,7 +1383,7 @@ static bool s_net_states_proc(dap_proc_thread_t *a_thread, void *a_arg) { default: log_it (L_DEBUG, "Unprocessed state"); } s_net_states_notify(l_net); - pthread_rwlock_unlock(&l_net_pvt->rwlock); + pthread_rwlock_unlock(&l_net_pvt->states_lock); return ! l_repeat_after_exit; } @@ -1397,9 +1396,9 @@ int s_net_list_compare_uuids(const void *a_uuid1, const void *a_uuid2) bool dap_chain_net_sync_trylock(dap_chain_net_t *a_net, dap_chain_node_client_t *a_client) { dap_chain_net_pvt_t *l_net_pvt = PVT(a_net); - pthread_rwlock_wrlock(&l_net_pvt->rwlock); + pthread_rwlock_wrlock(&l_net_pvt->uplinks_lock); bool l_not_found = dap_chain_net_sync_trylock_nolock(a_net, a_client); - pthread_rwlock_unlock(&l_net_pvt->rwlock); + pthread_rwlock_unlock(&l_net_pvt->uplinks_lock); return l_not_found; } @@ -1433,7 +1432,7 @@ bool dap_chain_net_sync_unlock(dap_chain_net_t *a_net, dap_chain_node_client_t * if (!a_net) return false; dap_chain_net_pvt_t *l_net_pvt = PVT(a_net); - pthread_rwlock_wrlock(&l_net_pvt->rwlock); + pthread_rwlock_wrlock(&l_net_pvt->uplinks_lock); bool l_ret = false; if (!a_client || l_net_pvt->active_link == a_client) l_net_pvt->active_link = NULL; @@ -1450,7 +1449,7 @@ bool dap_chain_net_sync_unlock(dap_chain_net_t *a_net, dap_chain_node_client_t * } } l_ret = l_net_pvt->active_link; - pthread_rwlock_unlock(&l_net_pvt->rwlock); + pthread_rwlock_unlock(&l_net_pvt->uplinks_lock); return l_ret; } /** @@ -1509,18 +1508,12 @@ static dap_chain_net_t *s_net_new(const char * a_id, const char * a_name , return NULL; dap_chain_net_t *ret = DAP_NEW_Z_SIZE( dap_chain_net_t, sizeof(ret->pub) + sizeof(dap_chain_net_pvt_t) ); ret->pub.name = strdup( a_name ); - -#ifndef _WIN32 - pthread_condattr_t l_attr; - pthread_condattr_init( &l_attr ); -#ifndef DAP_OS_DARWIN - pthread_condattr_setclock( &l_attr, CLOCK_MONOTONIC ); -#endif - pthread_cond_init( &PVT(ret)->state_proc_cond, &l_attr ); -#else - PVT(ret)->state_proc_cond = CreateEventA( NULL, FALSE, FALSE, NULL ); -#endif - pthread_mutex_init(&(PVT(ret)->state_mutex_cond), NULL); + pthread_rwlock_init(&PVT(ret)->uplinks_lock, NULL); + pthread_rwlock_init(&PVT(ret)->downlinks_lock, NULL); + pthread_rwlock_init(&PVT(ret)->balancer_lock, NULL); + pthread_rwlock_init(&PVT(ret)->states_lock, NULL); + pthread_rwlock_init(&PVT(ret)->gdbs_lock, NULL); + pthread_rwlock_init(&PVT(ret)->atoms_lock, NULL); if (dap_sscanf(a_id, "0x%016"DAP_UINT64_FORMAT_X, &ret->pub.id.uint64) != 1) { log_it (L_ERROR, "Wrong id format (\"%s\"). Must be like \"0x0123456789ABCDE\"" , a_id ); DAP_DELETE(ret); @@ -1556,11 +1549,17 @@ static dap_chain_net_t *s_net_new(const char * a_id, const char * a_name , */ void dap_chain_net_delete( dap_chain_net_t * a_net ) { + pthread_rwlock_destroy(&PVT(a_net)->uplinks_lock); + pthread_rwlock_destroy(&PVT(a_net)->downlinks_lock); + pthread_rwlock_destroy(&PVT(a_net)->balancer_lock); + pthread_rwlock_destroy(&PVT(a_net)->states_lock); + pthread_rwlock_destroy(&PVT(a_net)->gdbs_lock); + pthread_rwlock_destroy(&PVT(a_net)->atoms_lock); if(PVT(a_net)->seed_aliases) { DAP_DELETE(PVT(a_net)->seed_aliases); PVT(a_net)->seed_aliases = NULL; } - DAP_DELETE( PVT(a_net) ); + DAP_DELETE(a_net); } @@ -1617,7 +1616,7 @@ dap_string_t* dap_cli_list_net() dap_chain_net_t * l_net = NULL; int l_net_i = 0; dap_string_append(l_string_ret,"Available networks and chains:\n"); - pthread_rwlock_rdlock(&g_net_items_rwlock); + pthread_rwlock_rdlock(&s_net_items_rwlock); HASH_ITER(hh, s_net_items, l_net_item, l_net_item_tmp){ l_net = l_net_item->chain_net; dap_string_append_printf(l_string_ret, "\t%s:\n", l_net_item->name); @@ -1629,7 +1628,7 @@ dap_string_t* dap_cli_list_net() l_chain = l_chain->next; } } - pthread_rwlock_unlock(&g_net_items_rwlock); + pthread_rwlock_unlock(&s_net_items_rwlock); return l_string_ret; } @@ -1814,7 +1813,7 @@ static int s_cli_net(int argc, char **argv, char **a_str_reply) dap_chain_net_item_t * l_net_item, *l_net_item_tmp; int l_net_i = 0; dap_string_append(l_string_ret,"Networks:\n"); - pthread_rwlock_rdlock(&g_net_items_rwlock); + pthread_rwlock_rdlock(&s_net_items_rwlock); HASH_ITER(hh, s_net_items, l_net_item, l_net_item_tmp){ l_net = l_net_item->chain_net; dap_string_append_printf(l_string_ret, "\t%s:\n", l_net_item->name); @@ -1833,7 +1832,7 @@ static int s_cli_net(int argc, char **argv, char **a_str_reply) l_chain = l_chain->next; } } - pthread_rwlock_unlock(&g_net_items_rwlock); + pthread_rwlock_unlock(&s_net_items_rwlock); } }else{ @@ -1841,12 +1840,12 @@ static int s_cli_net(int argc, char **argv, char **a_str_reply) // show list of nets dap_chain_net_item_t * l_net_item, *l_net_item_tmp; int l_net_i = 0; - pthread_rwlock_rdlock(&g_net_items_rwlock); + pthread_rwlock_rdlock(&s_net_items_rwlock); HASH_ITER(hh, s_net_items, l_net_item, l_net_item_tmp){ dap_string_append_printf(l_string_ret, "\t%s\n", l_net_item->name); l_net_i++; } - pthread_rwlock_unlock(&g_net_items_rwlock); + pthread_rwlock_unlock(&s_net_items_rwlock); dap_string_append(l_string_ret, "\n"); } @@ -1976,7 +1975,7 @@ static int s_cli_net(int argc, char **argv, char **a_str_reply) if ( strcmp(l_links_str,"list") == 0 ) { size_t i =0; dap_chain_net_pvt_t * l_net_pvt = PVT(l_net); - pthread_rwlock_rdlock(&l_net_pvt->rwlock ); + pthread_rwlock_rdlock(&l_net_pvt->uplinks_lock); size_t l_links_count = dap_list_length(l_net_pvt->net_links); dap_string_t *l_reply = dap_string_new(""); dap_string_append_printf(l_reply,"Links %zu:\n", l_links_count); @@ -2003,7 +2002,7 @@ static int s_cli_net(int argc, char **argv, char **a_str_reply) } i++; } - pthread_rwlock_unlock(&l_net_pvt->rwlock ); + pthread_rwlock_unlock(&l_net_pvt->uplinks_lock); dap_cli_server_cmd_set_reply_text(a_str_reply,"%s",l_reply->str); dap_string_free(l_reply,true); @@ -2325,14 +2324,14 @@ int s_net_load(const char * a_net_name, uint16_t a_acl_idx) ,dap_config_get_item_str(l_cfg , "general" , "name" )); l_net_item->chain_net = l_net; l_net_item->net_id.uint64 = l_net->pub.id.uint64; - pthread_rwlock_wrlock(&g_net_items_rwlock); + pthread_rwlock_wrlock(&s_net_items_rwlock); HASH_ADD_STR(s_net_items,name,l_net_item); - pthread_rwlock_unlock(&g_net_items_rwlock); + pthread_rwlock_unlock(&s_net_items_rwlock); memcpy( l_net_item2,l_net_item,sizeof (*l_net_item)); - pthread_rwlock_wrlock(&g_net_ids_rwlock); + pthread_rwlock_wrlock(&s_net_ids_rwlock); HASH_ADD(hh,s_net_items_ids,net_id,sizeof ( l_net_item2->net_id),l_net_item2); - pthread_rwlock_unlock(&g_net_ids_rwlock); + pthread_rwlock_unlock(&s_net_ids_rwlock); // LEDGER model uint16_t l_ledger_flags = 0; @@ -2787,7 +2786,7 @@ int s_net_load(const char * a_net_name, uint16_t a_acl_idx) */ void dap_chain_net_deinit() { - pthread_rwlock_rdlock(&g_net_items_rwlock); + pthread_rwlock_rdlock(&s_net_items_rwlock); dap_chain_net_item_t *l_current_item, *l_tmp; HASH_ITER(hh, s_net_items, l_current_item, l_tmp) { dap_chain_net_t *l_net = l_current_item->chain_net; @@ -2798,14 +2797,14 @@ void dap_chain_net_deinit() HASH_DEL(s_net_items, l_current_item); DAP_DEL_Z(l_current_item); } - pthread_rwlock_unlock(&g_net_items_rwlock); - pthread_rwlock_destroy(&g_net_items_rwlock); + pthread_rwlock_unlock(&s_net_items_rwlock); + pthread_rwlock_destroy(&s_net_items_rwlock); } dap_chain_net_t **dap_chain_net_list(uint16_t *a_size) { - pthread_rwlock_rdlock(&g_net_items_rwlock); + pthread_rwlock_rdlock(&s_net_items_rwlock); *a_size = HASH_COUNT(s_net_items); if(*a_size){ dap_chain_net_t **l_net_list = DAP_NEW_SIZE(dap_chain_net_t *, (*a_size) * sizeof(dap_chain_net_t *)); @@ -2817,9 +2816,9 @@ dap_chain_net_t **dap_chain_net_list(uint16_t *a_size) break; } return l_net_list; - pthread_rwlock_unlock(&g_net_items_rwlock); + pthread_rwlock_unlock(&s_net_items_rwlock); } else { - pthread_rwlock_unlock(&g_net_items_rwlock); + pthread_rwlock_unlock(&s_net_items_rwlock); return NULL; } } @@ -2833,9 +2832,9 @@ dap_chain_net_t * dap_chain_net_by_name( const char * a_name) { dap_chain_net_item_t * l_net_item = NULL; if(a_name) { - pthread_rwlock_rdlock(&g_net_items_rwlock); + pthread_rwlock_rdlock(&s_net_items_rwlock); HASH_FIND_STR(s_net_items,a_name,l_net_item ); - pthread_rwlock_unlock(&g_net_items_rwlock); + pthread_rwlock_unlock(&s_net_items_rwlock); } return l_net_item ? l_net_item->chain_net : NULL; } @@ -2859,9 +2858,9 @@ dap_ledger_t * dap_chain_ledger_by_net_name( const char * a_net_name) dap_chain_net_t * dap_chain_net_by_id( dap_chain_net_id_t a_id) { dap_chain_net_item_t * l_net_item = NULL; - pthread_rwlock_rdlock(&g_net_ids_rwlock); + pthread_rwlock_rdlock(&s_net_ids_rwlock); HASH_FIND(hh,s_net_items_ids,&a_id,sizeof (a_id), l_net_item ); - pthread_rwlock_unlock(&g_net_ids_rwlock); + pthread_rwlock_unlock(&s_net_ids_rwlock); return l_net_item ? l_net_item->chain_net : NULL; } @@ -2978,9 +2977,9 @@ char * dap_chain_net_get_gdb_group_mempool_by_chain_type(dap_chain_net_t *a_net, dap_chain_net_state_t dap_chain_net_get_state ( dap_chain_net_t * l_net) { assert(l_net); - pthread_rwlock_rdlock(&PVT(l_net)->rwlock); + pthread_rwlock_rdlock(&PVT(l_net)->states_lock); dap_chain_net_state_t l_ret = PVT(l_net)->state; - pthread_rwlock_unlock(&PVT(l_net)->rwlock); + pthread_rwlock_unlock(&PVT(l_net)->states_lock); return l_ret; } @@ -2993,13 +2992,13 @@ void dap_chain_net_set_state ( dap_chain_net_t * l_net, dap_chain_net_state_t a_ { assert(l_net); log_it(L_DEBUG,"%s set state %s", l_net->pub.name, dap_chain_net_state_to_str(a_state) ); - pthread_rwlock_wrlock(&PVT(l_net)->rwlock); + pthread_rwlock_wrlock(&PVT(l_net)->states_lock); if( a_state == PVT(l_net)->state){ - pthread_rwlock_unlock(&PVT(l_net)->rwlock); + pthread_rwlock_unlock(&PVT(l_net)->states_lock); return; } PVT(l_net)->state = a_state; - pthread_rwlock_unlock(&PVT(l_net)->rwlock); + pthread_rwlock_unlock(&PVT(l_net)->states_lock); dap_proc_queue_add_callback(dap_events_worker_get_auto(), s_net_states_proc,l_net ); } -- GitLab