diff --git a/dap-sdk/net/core/dap_events_socket.c b/dap-sdk/net/core/dap_events_socket.c index d297d1aecdee9ae289438ad437fb0700300a88fc..6060e27a3f02081a19c98a942885ceeace575afb 100644 --- a/dap-sdk/net/core/dap_events_socket.c +++ b/dap-sdk/net/core/dap_events_socket.c @@ -174,7 +174,7 @@ dap_events_socket_t *dap_events_socket_find( int sock, struct dap_events *a_even dap_events_socket_t *ret = NULL; if(!a_events) return NULL; - pthread_rwlock_rdlock( &a_events->sockets_rwlock ); + pthread_rwlock_wrlock( &a_events->sockets_rwlock ); if(a_events->sockets) HASH_FIND_INT( a_events->sockets, &sock, ret ); pthread_rwlock_unlock( &a_events->sockets_rwlock ); diff --git a/dap-sdk/net/stream/stream/dap_stream.c b/dap-sdk/net/stream/stream/dap_stream.c index d6769440874667f6a4182fd522cdc95a66426d41..adc20540126577325035768c18a2f47330ce9269 100644 --- a/dap-sdk/net/stream/stream/dap_stream.c +++ b/dap-sdk/net/stream/stream/dap_stream.c @@ -776,7 +776,7 @@ void stream_proc_pkt_in(dap_stream_t * a_stream) // Find channel dap_stream_ch_t * ch = NULL; - pthread_rwlock_rdlock (&a_stream->rwlock); + pthread_rwlock_wrlock (&a_stream->rwlock); for(size_t i=0;i<a_stream->channel_count;i++){ if(a_stream->channel[i]->proc){ if(a_stream->channel[i]->proc->id == l_ch_pkt->hdr.id ){ diff --git a/modules/chain/dap_chain.c b/modules/chain/dap_chain.c index b8aba895c5b025c991d3b01d6a06d883875e6dfd..08039a78ea10dae69067211070d5acf0bb2640f4 100644 --- a/modules/chain/dap_chain.c +++ b/modules/chain/dap_chain.c @@ -189,7 +189,7 @@ dap_chain_t * dap_chain_find_by_id(dap_chain_net_id_t a_chain_net_id,dap_chain_i }; dap_chain_item_t * l_ret_item = NULL; - pthread_rwlock_rdlock(&s_chain_items_rwlock); + pthread_rwlock_wrlock(&s_chain_items_rwlock); HASH_FIND(hh,s_chain_items,&l_chain_item_id,sizeof(dap_chain_item_id_t),l_ret_item); pthread_rwlock_unlock(&s_chain_items_rwlock); if ( l_ret_item ){ diff --git a/modules/chain/dap_chain_ledger.c b/modules/chain/dap_chain_ledger.c index 5024ddc9a43c85e6813bd7422d4cc1a0f6ffc2d1..64c463ef26a9bb90905582b74c1c95f293def162 100644 --- a/modules/chain/dap_chain_ledger.c +++ b/modules/chain/dap_chain_ledger.c @@ -315,7 +315,7 @@ int dap_chain_ledger_token_emission_add_check(dap_ledger_t *a_ledger, const dap_ const char * c_token_ticker = a_token_emission->hdr.ticker; dap_chain_ledger_token_item_t * l_token_item = NULL; - pthread_rwlock_rdlock(&l_ledger_priv->tokens_rwlock); + pthread_rwlock_wrlock(&l_ledger_priv->tokens_rwlock); HASH_FIND_STR(l_ledger_priv->tokens, c_token_ticker, l_token_item); pthread_rwlock_unlock(&l_ledger_priv->tokens_rwlock); @@ -326,7 +326,7 @@ int dap_chain_ledger_token_emission_add_check(dap_ledger_t *a_ledger, const dap_ //dap_chain_hash_fast_t * l_token_emission_hash_ptr = &l_token_emission_hash; dap_hash_fast(a_token_emission, a_token_emission_size, &l_token_emission_hash); char * l_hash_str = dap_chain_hash_fast_to_str_new(&l_token_emission_hash); - pthread_rwlock_rdlock( l_token_item ? + pthread_rwlock_wrlock( l_token_item ? &l_token_item->token_emissions_rwlock : &l_ledger_priv->treshold_emissions_rwlock ); @@ -364,7 +364,7 @@ int dap_chain_ledger_token_emission_add(dap_ledger_t *a_ledger, const char * c_token_ticker = a_token_emission->hdr.ticker; dap_chain_ledger_token_item_t * l_token_item = NULL; - pthread_rwlock_rdlock(&l_ledger_priv->tokens_rwlock); + pthread_rwlock_wrlock(&l_ledger_priv->tokens_rwlock); HASH_FIND_STR(l_ledger_priv->tokens, c_token_ticker, l_token_item); pthread_rwlock_unlock(&l_ledger_priv->tokens_rwlock); @@ -436,12 +436,12 @@ dap_chain_datum_token_emission_t * dap_chain_ledger_token_emission_find(dap_ledg dap_ledger_private_t *l_ledger_priv = PVT(a_ledger); dap_chain_datum_token_emission_t * l_token_emission = NULL; dap_chain_ledger_token_item_t * l_token_item = NULL; - pthread_rwlock_rdlock(&l_ledger_priv->tokens_rwlock); + pthread_rwlock_wrlock(&l_ledger_priv->tokens_rwlock); HASH_FIND_STR(l_ledger_priv->tokens, a_token_ticker, l_token_item); if(l_token_item) { dap_chain_ledger_token_emission_item_t * l_token_emission_item = NULL; - pthread_rwlock_rdlock( &l_token_item->token_emissions_rwlock); + pthread_rwlock_wrlock( &l_token_item->token_emissions_rwlock); HASH_FIND(hh, l_token_item->token_emissions, a_token_emission_hash, sizeof(*a_token_emission_hash), l_token_emission_item); if( l_token_emission_item) @@ -567,7 +567,7 @@ static dap_chain_datum_tx_t* s_find_datum_tx_by_hash(dap_ledger_t *a_ledger, dap_ledger_private_t *l_ledger_priv = PVT(a_ledger); dap_chain_datum_tx_t *l_tx_ret = NULL; dap_chain_ledger_tx_item_t *l_tx_item; - pthread_rwlock_rdlock(&l_ledger_priv->ledger_rwlock); + pthread_rwlock_wrlock(&l_ledger_priv->ledger_rwlock); HASH_FIND(hh, l_ledger_priv->ledger_items, a_tx_hash, sizeof(dap_chain_hash_fast_t), l_tx_item); // tx_hash already in the hash? if(l_tx_item) { l_tx_ret = l_tx_item->tx; @@ -1153,7 +1153,7 @@ _dap_int128_t dap_chain_ledger_count(dap_ledger_t *a_ledger) _dap_int128_t l_ret = 0; dap_ledger_private_t *l_ledger_priv = PVT(a_ledger); dap_chain_ledger_tx_item_t *l_iter_current, *l_item_tmp; - pthread_rwlock_rdlock(&l_ledger_priv->ledger_rwlock); + pthread_rwlock_wrlock(&l_ledger_priv->ledger_rwlock); HASH_ITER(hh, l_ledger_priv->ledger_items , l_iter_current, l_item_tmp) { l_ret++; @@ -1174,7 +1174,7 @@ uint64_t dap_chain_ledger_count_from_to(dap_ledger_t * a_ledger, time_t a_ts_fro uint64_t l_ret = 0; dap_ledger_private_t *l_ledger_priv = PVT(a_ledger); dap_chain_ledger_tx_item_t *l_iter_current, *l_item_tmp; - pthread_rwlock_rdlock(&l_ledger_priv->ledger_rwlock); + pthread_rwlock_wrlock(&l_ledger_priv->ledger_rwlock); if ( a_ts_from && a_ts_to) { HASH_ITER(hh, l_ledger_priv->ledger_items , l_iter_current, l_item_tmp){ if ( l_iter_current->ts_created >= a_ts_from && l_iter_current->ts_created <= a_ts_to ) @@ -1289,7 +1289,7 @@ uint64_t dap_chain_ledger_calc_balance_full(dap_ledger_t *a_ledger, const dap_ch */ dap_ledger_private_t *l_ledger_priv = PVT(a_ledger); dap_chain_ledger_tx_item_t *l_iter_current, *l_item_tmp; - pthread_rwlock_rdlock(&l_ledger_priv->ledger_rwlock); + pthread_rwlock_wrlock(&l_ledger_priv->ledger_rwlock); HASH_ITER(hh, l_ledger_priv->ledger_items , l_iter_current, l_item_tmp) { dap_chain_datum_tx_t *l_cur_tx = l_iter_current->tx; @@ -1349,7 +1349,7 @@ static dap_chain_ledger_tx_item_t* tx_item_find_by_addr(dap_ledger_t *a_ledger, bool is_null_hash = dap_hash_fast_is_blank(a_tx_first_hash); bool is_search_enable = is_null_hash; dap_chain_ledger_tx_item_t *l_iter_current, *l_item_tmp; - pthread_rwlock_rdlock(&l_ledger_priv->ledger_rwlock); + pthread_rwlock_wrlock(&l_ledger_priv->ledger_rwlock); HASH_ITER(hh, l_ledger_priv->ledger_items , l_iter_current, l_item_tmp) { // If a_token is setup we check if its not our token - miss it @@ -1426,7 +1426,7 @@ const dap_chain_datum_tx_t* dap_chain_ledger_tx_find_by_pkey(dap_ledger_t *a_led bool is_null_hash = dap_hash_fast_is_blank(a_tx_first_hash); bool is_search_enable = is_null_hash; dap_chain_ledger_tx_item_t *l_iter_current, *l_item_tmp; - pthread_rwlock_rdlock(&l_ledger_priv->ledger_rwlock); + pthread_rwlock_wrlock(&l_ledger_priv->ledger_rwlock); HASH_ITER(hh, l_ledger_priv->ledger_items , l_iter_current, l_item_tmp) { dap_chain_datum_tx_t *l_tx_tmp = l_iter_current->tx; @@ -1472,7 +1472,7 @@ const dap_chain_datum_tx_t* dap_chain_ledger_tx_cache_find_out_cond(dap_ledger_t bool is_null_hash = dap_hash_fast_is_blank(a_tx_first_hash); bool is_search_enable = is_null_hash; dap_chain_ledger_tx_item_t *l_iter_current, *l_item_tmp; - pthread_rwlock_rdlock(&l_ledger_priv->ledger_rwlock); + pthread_rwlock_wrlock(&l_ledger_priv->ledger_rwlock); HASH_ITER(hh, l_ledger_priv->ledger_items, l_iter_current, l_item_tmp) { dap_chain_datum_tx_t *l_tx_tmp = l_iter_current->tx; diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c index 3e789732cd26aab1bb4d610c91c2898e825fd4e1..be7b4e66c1ca411353de65f628fe9829be625b3c 100644 --- a/modules/channel/chain/dap_stream_ch_chain.c +++ b/modules/channel/chain/dap_stream_ch_chain.c @@ -194,6 +194,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) dap_chain_hash_fast_t l_atom_hash; dap_hash_fast(l_lasts[i], l_chain->callback_atom_get_size(l_lasts[i]), &l_atom_hash); + pthread_mutex_lock(&l_ch_chain->mutex); HASH_FIND(hh, l_ch_chain->request_atoms_lasts, &l_atom_hash, sizeof(l_atom_hash), l_item); if(l_item == NULL) { // Not found, add new lasts @@ -203,6 +204,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) HASH_ADD(hh, l_ch_chain->request_atoms_lasts, atom_hash, sizeof(l_atom_hash), l_item); } + pthread_mutex_unlock(&l_ch_chain->mutex); //else // DAP_DELETE(l_lasts[i]); } @@ -535,12 +537,13 @@ void dap_stream_ch_chain_go_idle ( dap_stream_ch_chain_t * a_ch_chain) memset(&a_ch_chain->request_last_ts, 0, sizeof(a_ch_chain->request_last_ts)); dap_chain_atom_item_t *l_atom_item = NULL, *l_atom_item_tmp = NULL; - + pthread_mutex_lock(&a_ch_chain->mutex); HASH_ITER( hh,a_ch_chain->request_atoms_lasts, l_atom_item, l_atom_item_tmp) HASH_DEL(a_ch_chain->request_atoms_lasts, l_atom_item); HASH_ITER( hh, a_ch_chain->request_atoms_processed, l_atom_item, l_atom_item_tmp ) HASH_DEL(a_ch_chain->request_atoms_processed, l_atom_item); + pthread_mutex_unlock(&a_ch_chain->mutex); dap_stream_ch_set_ready_to_write(a_ch_chain->ch, false); } @@ -702,6 +705,7 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) if(l_ch_chain->callback_notify_packet_out) l_ch_chain->callback_notify_packet_out(l_ch_chain, l_send_pkt_type, NULL, 0, l_ch_chain->callback_notify_arg); }else{ // Process one chain from l_ch_chain->request_atoms_lasts + pthread_mutex_lock(&l_ch_chain->mutex); HASH_ITER(hh,l_ch_chain->request_atoms_lasts, l_atom_item, l_atom_item_tmp) { dap_chain_atom_item_t * l_atom_item_proc = NULL; // Check if its processed already @@ -756,6 +760,7 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) HASH_DEL(l_ch_chain->request_atoms_lasts, l_atom_item); } } + pthread_mutex_unlock(&l_ch_chain->mutex); } //assert(l_ch_chain->request_atoms_lasts == NULL); //l_ch_chain->request_atoms_lasts = l_chains_lasts_new; diff --git a/modules/net/dap_chain_node_client.c b/modules/net/dap_chain_node_client.c index 2b19601490269bec6ebfa8ff457d7d4858909e49..4b253daab7eb3e756518c63a28bfc31269f1ee08 100644 --- a/modules/net/dap_chain_node_client.c +++ b/modules/net/dap_chain_node_client.c @@ -335,10 +335,10 @@ static void s_ch_chain_callback_notify_packet_in(dap_stream_ch_chain_t* a_ch_cha l_lasts = l_chain->callback_atom_iter_get_lasts(l_iter, &l_lasts_size); if ( l_lasts){ for(size_t i = 0; i < l_lasts_size; i++) { - pthread_mutex_lock(&l_node_client->wait_mutex); dap_chain_atom_item_t * l_item = NULL; dap_chain_hash_fast_t l_atom_hash; dap_hash_fast(l_lasts[i], l_chain->callback_atom_get_size(l_lasts[i]), &l_atom_hash); + pthread_mutex_lock(&a_ch_chain->mutex); HASH_FIND(hh, a_ch_chain->request_atoms_lasts, &l_atom_hash, sizeof(l_atom_hash), l_item); if(l_item == NULL) { // Not found, add new lasts l_item = DAP_NEW_Z(dap_chain_atom_item_t); @@ -348,7 +348,7 @@ static void s_ch_chain_callback_notify_packet_in(dap_stream_ch_chain_t* a_ch_cha } //else // DAP_DELETE(l_lasts[i]); - pthread_mutex_unlock(&l_node_client->wait_mutex); + pthread_mutex_unlock(&a_ch_chain->mutex); } DAP_DELETE(l_lasts); } diff --git a/modules/type/dag/dap_chain_cs_dag.c b/modules/type/dag/dap_chain_cs_dag.c index 3038215e8572106c3c397dccb86de12a6db9aafb..eb12e8dda3955cb23ca27f342088a9439df36799 100644 --- a/modules/type/dag/dap_chain_cs_dag.c +++ b/modules/type/dag/dap_chain_cs_dag.c @@ -291,7 +291,7 @@ static int s_chain_callback_atom_add(dap_chain_t * a_chain, dap_chain_atom_ptr_t else PVT(l_dag)->events_treshold = l_events; //HASH_ADD(hh, PVT(l_dag)->events_treshold, hash, sizeof(l_event_item->hash), l_event_item); - + pthread_rwlock_unlock( l_events_rwlock ); if ( l_events == PVT(l_dag)->events){ dap_chain_cs_dag_event_item_t * l_event_last = NULL; // Check the events and update the lasts @@ -300,11 +300,13 @@ static int s_chain_callback_atom_add(dap_chain_t * a_chain, dap_chain_atom_ptr_t l_event->hashes_n_datum_n_signs + l_event->header.hash_count*sizeof (*l_link_hash) ); l_link_hash += sizeof (dap_chain_hash_fast_t ) ) { l_event_last = NULL; + pthread_rwlock_wrlock(&PVT(l_dag)->events_rwlock); HASH_FIND(hh,PVT(l_dag)->events_lasts_unlinked,&l_link_hash,sizeof(*l_link_hash), l_event_last); if ( l_event_last ){ // If present in unlinked - remove HASH_DEL(PVT(l_dag)->events_lasts_unlinked,l_event_last); - DAP_DELETE(l_event_last); + DAP_DEL_Z(l_event_last); } + pthread_rwlock_unlock(&PVT(l_dag)->events_rwlock); } // and then adds itself @@ -312,8 +314,9 @@ static int s_chain_callback_atom_add(dap_chain_t * a_chain, dap_chain_atom_ptr_t l_event_last->ts_added = l_event_item->ts_added; l_event_last->event = l_event; dap_hash_fast(l_event, dap_chain_cs_dag_event_calc_size(l_event),&l_event_last->hash ); - + pthread_rwlock_wrlock(&PVT(l_dag)->events_rwlock); HASH_ADD(hh,PVT(l_dag)->events_lasts_unlinked,hash,sizeof (l_event_last->hash),l_event_last); + pthread_rwlock_unlock(&PVT(l_dag)->events_rwlock); } // add datum from event to ledger @@ -543,7 +546,7 @@ static size_t s_chain_callback_datums_pool_proc(dap_chain_t * a_chain, dap_chain dap_chain_cs_dag_event_t* dap_chain_cs_dag_find_event_by_hash(dap_chain_cs_dag_t * a_dag, dap_chain_hash_fast_t * a_hash) { dap_chain_cs_dag_event_item_t* l_event_item = NULL; - pthread_rwlock_rdlock( &PVT(a_dag)->events_rwlock ); + pthread_rwlock_wrlock( &PVT(a_dag)->events_rwlock ); HASH_FIND(hh, PVT(a_dag)->events ,a_hash,sizeof(*a_hash), l_event_item); dap_chain_cs_dag_event_t * l_event = l_event_item->event; pthread_rwlock_unlock( &PVT(a_dag)->events_rwlock ); @@ -620,11 +623,13 @@ void s_dag_events_lasts_delete_linked_with_event(dap_chain_cs_dag_t * a_dag, dap for (size_t i = 0; i< a_event->header.hash_count; i++) { dap_chain_hash_fast_t * l_hash = ((dap_chain_hash_fast_t *) a_event->hashes_n_datum_n_signs) + i; dap_chain_cs_dag_event_item_t * l_event_item = NULL; + pthread_rwlock_wrlock(&PVT(a_dag)->events_rwlock); HASH_FIND(hh, PVT(a_dag)->events_lasts_unlinked ,l_hash ,sizeof (*l_hash), l_event_item); if ( l_event_item ){ HASH_DEL(PVT(a_dag)->events_lasts_unlinked,l_event_item); - DAP_DELETE(l_event_item); + DAP_DEL_Z(l_event_item); } + pthread_rwlock_wrlock(&PVT(a_dag)->events_rwlock); } } @@ -663,7 +668,7 @@ int dap_chain_cs_dag_event_verify_hashes_with_treshold(dap_chain_cs_dag_t * a_da void dap_chain_cs_dag_proc_treshold(dap_chain_cs_dag_t * a_dag) { // TODO Process finish treshold. For now - easiest from possible - pthread_rwlock_rdlock(&PVT(a_dag)->events_rwlock); + pthread_rwlock_wrlock(&PVT(a_dag)->events_rwlock); dap_chain_cs_dag_event_item_t * l_event_item = NULL, * l_event_item_tmp = NULL; HASH_ITER(hh,PVT(a_dag)->events_treshold,l_event_item, l_event_item_tmp){ dap_chain_cs_dag_event_t * l_event = l_event_item->event; @@ -785,10 +790,12 @@ static dap_chain_atom_ptr_t* s_chain_callback_atom_iter_get_lasts( dap_chain_ato dap_chain_cs_dag_event_item_t * l_event_item = NULL, *l_event_item_tmp = NULL; size_t i = 0; + pthread_rwlock_wrlock(&PVT(l_dag)->events_rwlock); HASH_ITER(hh,PVT(l_dag)->events_lasts_unlinked, l_event_item,l_event_item_tmp){ l_ret[i] = l_event_item->event; i++; } + pthread_rwlock_unlock(&PVT(l_dag)->events_rwlock); return l_ret; } return NULL; @@ -1155,7 +1162,9 @@ static int s_cli_dag(int argc, char ** argv, void *arg_func, char **a_str_reply) }else if ( strcmp(l_from_events_str,"events_lasts") == 0){ dap_chain_cs_dag_event_item_t * l_event_item = NULL; + pthread_rwlock_wrlock(&PVT(l_dag)->events_rwlock); HASH_FIND(hh,PVT(l_dag)->events_lasts_unlinked,&l_event_hash,sizeof(l_event_hash),l_event_item); + pthread_rwlock_unlock(&PVT(l_dag)->events_rwlock); if ( l_event_item ) l_event = l_event_item->event; else { @@ -1166,7 +1175,9 @@ static int s_cli_dag(int argc, char ** argv, void *arg_func, char **a_str_reply) } }else if ( strcmp(l_from_events_str,"events") == 0){ dap_chain_cs_dag_event_item_t * l_event_item = NULL; + pthread_rwlock_wrlock(&PVT(l_dag)->events_rwlock); HASH_FIND(hh,PVT(l_dag)->events,&l_event_hash,sizeof(l_event_hash),l_event_item); + pthread_rwlock_unlock(&PVT(l_dag)->events_rwlock); if ( l_event_item ) l_event = l_event_item->event; else {