diff --git a/dap-sdk/io/dap_context.c b/dap-sdk/io/dap_context.c index 73524e2e0fa8d8313ce9f677fabc6c69c7cbea6f..12ef1b90781763bc1c54de7a240651073086b499 100644 --- a/dap-sdk/io/dap_context.c +++ b/dap-sdk/io/dap_context.c @@ -1270,7 +1270,7 @@ int dap_context_add(dap_context_t * a_context, dap_events_socket_t * a_es ) if ( a_es->type == DESCRIPTOR_TYPE_QUEUE ){ goto lb_exit; } - if ( a_es->type == DESCRIPTOR_TYPE_EVENT && a_es->pipe_out){ + if ( a_es->type == DESCRIPTOR_TYPE_EVENT /*&& a_es->pipe_out*/){ goto lb_exit; } struct kevent l_event; @@ -1284,14 +1284,15 @@ int dap_context_add(dap_context_t * a_context, dap_events_socket_t * a_es ) l_errno = -1; goto lb_exit; } - if (a_es->type == DESCRIPTOR_TYPE_EVENT ){ + /*if (a_es->type == DESCRIPTOR_TYPE_EVENT ){ EV_SET(&l_event, a_es->socket, EVFILT_USER,EV_ADD| EV_CLEAR ,0,0, &a_es->kqueue_event_catched_data ); if( kevent( l_kqueue_fd,&l_event,1,NULL,0,NULL)!=0){ l_is_error = true; l_errno = errno; goto lb_exit; } - }else{ + }else*/ + { if( l_filter){ EV_SET(&l_event, a_es->socket, l_filter,l_flags| EV_ADD,l_fflags,a_es->kqueue_data,a_es); if( kevent( l_kqueue_fd,&l_event,1,NULL,0,NULL) != 0 ){ diff --git a/dap-sdk/io/dap_events_socket.c b/dap-sdk/io/dap_events_socket.c index 684e3e78ac9721b6755127450099b2a3b5b197fd..0e1732597d31a82339bf23022bedab2dea5d7627 100644 --- a/dap-sdk/io/dap_events_socket.c +++ b/dap-sdk/io/dap_events_socket.c @@ -425,8 +425,8 @@ dap_events_socket_t * dap_events_socket_queue_ptr_create_input(dap_events_socket #elif defined(DAP_EVENTS_CAPS_KQUEUE) // Here we have event identy thats we copy l_es->fd = a_es->fd; // - l_es->kqueue_base_flags = EV_CLEAR; - l_es->kqueue_base_fflags = 0; + l_es->kqueue_base_flags = EV_ONESHOT; + l_es->kqueue_base_fflags = NOTE_TRIGGER | NOTE_FFNOP; l_es->kqueue_base_filter = EVFILT_USER; l_es->kqueue_event_catched_data.esocket = l_es; @@ -886,7 +886,7 @@ int dap_events_socket_queue_ptr_send_to_input(dap_events_socket_t * a_es_input, l_es_w_data->esocket = l_es; l_es_w_data->ptr = a_arg; - EV_SET(&l_event,a_es_input->socket+arc4random() , EVFILT_USER,EV_ADD | EV_CLEAR | EV_ONESHOT, NOTE_FFCOPY | NOTE_TRIGGER ,0, l_es_w_data); + EV_SET(&l_event,a_es_input->socket+arc4random() , EVFILT_USER,EV_ADD | EV_ONESHOT, NOTE_FFNOP | NOTE_TRIGGER ,0, l_es_w_data); if(l_es->context) l_ret=kevent(l_es->context->kqueue_fd,&l_event,1,NULL,0,NULL); else @@ -1019,7 +1019,7 @@ int dap_events_socket_queue_ptr_send( dap_events_socket_t *a_es, void *a_arg) l_es_w_data->esocket = a_es; l_es_w_data->ptr = a_arg; - EV_SET(&l_event,a_es->socket+arc4random() , EVFILT_USER,EV_ADD | EV_CLEAR | EV_ONESHOT, NOTE_FFCOPY | NOTE_TRIGGER ,0, l_es_w_data); + EV_SET(&l_event,a_es->socket+arc4random() , EVFILT_USER,EV_ADD | EV_ONESHOT, NOTE_FFNOP | NOTE_TRIGGER ,0, l_es_w_data); int l_n; if(a_es->pipe_out){ // If we have pipe out - we send events directly to the pipe out kqueue fd if(a_es->pipe_out->context){ @@ -1108,7 +1108,7 @@ int dap_events_socket_event_signal( dap_events_socket_t * a_es, uint64_t a_value l_es_w_data->esocket = a_es; l_es_w_data->value = a_value; - EV_SET(&l_event,a_es->socket, EVFILT_USER,0, NOTE_TRIGGER ,(intptr_t) a_es->socket, l_es_w_data); + EV_SET(&l_event,a_es->socket, EVFILT_USER,EV_ADD | EV_ONESHOT, NOTE_TRIGGER | NOTE_FFNOP ,(intptr_t) a_es->socket, l_es_w_data); int l_n; diff --git a/modules/chain/dap_chain_ledger.c b/modules/chain/dap_chain_ledger.c index 41d04dc0f7f22209968fbfe333ca194f7a422ca4..7e4aef83679c88cfda76d749f31e2ed23b989013 100644 --- a/modules/chain/dap_chain_ledger.c +++ b/modules/chain/dap_chain_ledger.c @@ -227,6 +227,16 @@ static int s_token_tsd_parse(dap_ledger_t * a_ledger, dap_chain_ledger_token_ite static int s_ledger_permissions_check(dap_chain_ledger_token_item_t * a_token_item, uint16_t a_permission_id, const void * a_data,size_t a_data_size ); static bool s_ledger_tps_callback(void *a_arg); +static inline int s_tx_add(dap_ledger_t *a_ledger, dap_chain_datum_tx_t *a_tx, dap_hash_fast_t *a_tx_hash, bool a_from_threshold, bool a_safe_call); +static int s_tx_add_unsafe(dap_ledger_t *a_ledger, dap_chain_datum_tx_t *a_tx, dap_hash_fast_t *a_tx_hash, bool a_from_threshold); + +static int s_token_emission_add_unsafe(dap_ledger_t *a_ledger, byte_t *a_token_emission, size_t a_token_emission_size, + dap_hash_fast_t *a_emission_hash, bool a_from_threshold); +static inline int s_token_emission_add(dap_ledger_t *a_ledger, byte_t *a_token_emission, size_t a_token_emission_size, + dap_hash_fast_t *a_emission_hash, bool a_from_threshold, bool a_safe_call); + + + static size_t s_threshold_emissions_max = 1000; static size_t s_threshold_txs_max = 10000; static bool s_debug_more = false; @@ -1060,6 +1070,7 @@ dap_string_t *dap_chain_ledger_threshold_hash_info(dap_ledger_t *a_ledger, dap_c dap_string_append(l_str_ret, "Hash was found in ledger tx threshold:"); dap_string_append(l_str_ret, l_tx_hash_str); dap_string_append(l_str_ret, "\n"); + pthread_rwlock_unlock(&l_ledger_pvt->threshold_txs_rwlock); return l_str_ret; } } @@ -1075,6 +1086,7 @@ dap_string_t *dap_chain_ledger_threshold_hash_info(dap_ledger_t *a_ledger, dap_c dap_string_append(l_str_ret, "Hash was found in ledger emission threshold: "); dap_string_append(l_str_ret, l_emission_hash_str); dap_string_append(l_str_ret, "\n"); + pthread_rwlock_unlock(&l_ledger_pvt->threshold_txs_rwlock); return l_str_ret; } } @@ -1178,23 +1190,19 @@ static void s_threshold_emissions_proc(dap_ledger_t * a_ledger) do { l_success = false; dap_chain_ledger_token_emission_item_t *l_emission_item, *l_emission_tmp; - pthread_rwlock_rdlock(&PVT(a_ledger)->threshold_emissions_rwlock); + pthread_rwlock_wrlock(&PVT(a_ledger)->threshold_emissions_rwlock); HASH_ITER(hh, PVT(a_ledger)->threshold_emissions, l_emission_item, l_emission_tmp) { - pthread_rwlock_unlock(&PVT(a_ledger)->threshold_emissions_rwlock); - int l_res = dap_chain_ledger_token_emission_add(a_ledger, (byte_t *)l_emission_item->datum_token_emission, + int l_res = s_token_emission_add_unsafe(a_ledger, (byte_t *)l_emission_item->datum_token_emission, l_emission_item->datum_token_emission_size, &l_emission_item->datum_token_emission_hash, true); if (l_res != DAP_CHAIN_CS_VERIFY_CODE_TX_NO_TOKEN) { - pthread_rwlock_wrlock(&PVT(a_ledger)->threshold_emissions_rwlock); HASH_DEL(PVT(a_ledger)->threshold_emissions, l_emission_item); - pthread_rwlock_unlock(&PVT(a_ledger)->threshold_emissions_rwlock); if (l_res) DAP_DELETE(l_emission_item->datum_token_emission); DAP_DELETE(l_emission_item); l_success = true; } - pthread_rwlock_rdlock(&PVT(a_ledger)->threshold_emissions_rwlock); } pthread_rwlock_unlock(&PVT(a_ledger)->threshold_emissions_rwlock); } while (l_success); @@ -1208,14 +1216,12 @@ static void s_threshold_txs_proc( dap_ledger_t *a_ledger) { bool l_success; dap_ledger_private_t * l_ledger_pvt = PVT(a_ledger); - pthread_rwlock_rdlock(&l_ledger_pvt->threshold_txs_rwlock); + pthread_rwlock_wrlock(&l_ledger_pvt->threshold_txs_rwlock); do { l_success = false; dap_chain_ledger_tx_item_t *l_tx_item, *l_tx_tmp; HASH_ITER(hh, l_ledger_pvt->threshold_txs, l_tx_item, l_tx_tmp) { - pthread_rwlock_unlock(&l_ledger_pvt->threshold_txs_rwlock ); - int l_res = dap_chain_ledger_tx_add(a_ledger, l_tx_item->tx, &l_tx_item->tx_hash_fast, true); - pthread_rwlock_wrlock(&l_ledger_pvt->threshold_txs_rwlock); + int l_res = s_tx_add_unsafe(a_ledger, l_tx_item->tx, &l_tx_item->tx_hash_fast, true); if (l_res != DAP_CHAIN_CS_VERIFY_CODE_TX_NO_EMISSION && l_res != DAP_CHAIN_CS_VERIFY_CODE_TX_NO_PREVIOUS) { HASH_DEL(l_ledger_pvt->threshold_txs, l_tx_item); @@ -1672,14 +1678,47 @@ bool s_chain_ledger_token_tsd_check(dap_chain_ledger_token_item_t * a_token_item return true; } + /** * @brief dap_chain_ledger_token_emission_add * @param a_token_emission * @param a_token_emision_size * @return */ + int dap_chain_ledger_token_emission_add(dap_ledger_t *a_ledger, byte_t *a_token_emission, size_t a_token_emission_size, dap_hash_fast_t *a_emission_hash, bool a_from_threshold) +{ + return s_token_emission_add(a_ledger, a_token_emission, a_token_emission_size, a_emission_hash, a_from_threshold, true); +} + +/** + * @brief s_token_emission_add_unsafe + * @param a_ledger + * @param a_token_emission + * @param a_token_emission_size + * @param a_emission_hash + * @param a_from_threshold + * @return + */ +static int s_token_emission_add_unsafe(dap_ledger_t *a_ledger, byte_t *a_token_emission, size_t a_token_emission_size, + dap_hash_fast_t *a_emission_hash, bool a_from_threshold) +{ + return s_token_emission_add(a_ledger, a_token_emission, a_token_emission_size, a_emission_hash, a_from_threshold, false); +} + +/** + * @brief s_token_emission_add + * @param a_ledger + * @param a_token_emission + * @param a_token_emission_size + * @param a_emission_hash + * @param a_from_threshold + * @param a_safe_call + * @return + */ +static inline int s_token_emission_add(dap_ledger_t *a_ledger, byte_t *a_token_emission, size_t a_token_emission_size, + dap_hash_fast_t *a_emission_hash, bool a_from_threshold, bool a_safe_call) { int l_ret = dap_chain_ledger_token_emission_add_check(a_ledger, a_token_emission, a_token_emission_size); if (l_ret) @@ -1695,11 +1734,11 @@ int dap_chain_ledger_token_emission_add(dap_ledger_t *a_ledger, byte_t *a_token_ return DAP_CHAIN_CS_VERIFY_CODE_TX_NO_TOKEN; // check if such emission is already present in table - pthread_rwlock_rdlock( l_token_item ? &l_token_item->token_emissions_rwlock + if(a_safe_call) pthread_rwlock_rdlock( l_token_item ? &l_token_item->token_emissions_rwlock : &l_ledger_priv->threshold_emissions_rwlock); HASH_FIND(hh,l_token_item ? l_token_item->token_emissions : l_ledger_priv->threshold_emissions, a_emission_hash, sizeof(*a_emission_hash), l_token_emission_item); - pthread_rwlock_unlock(l_token_item ? &l_token_item->token_emissions_rwlock + if(a_safe_call) pthread_rwlock_unlock(l_token_item ? &l_token_item->token_emissions_rwlock : &l_ledger_priv->threshold_emissions_rwlock); char *l_hash_str = dap_chain_hash_fast_to_str_new(a_emission_hash); if (!l_token_emission_item) { @@ -1764,10 +1803,10 @@ int dap_chain_ledger_token_emission_add(dap_ledger_t *a_ledger, byte_t *a_token_ s_threshold_txs_proc(a_ledger); } else if (HASH_COUNT(l_ledger_priv->threshold_emissions) < s_threshold_emissions_max) { l_token_emission_item->datum_token_emission = DAP_DUP_SIZE(a_token_emission, a_token_emission_size); - pthread_rwlock_wrlock(&l_ledger_priv->threshold_emissions_rwlock); + if(a_safe_call) pthread_rwlock_wrlock(&l_ledger_priv->threshold_emissions_rwlock); HASH_ADD(hh, l_ledger_priv->threshold_emissions, datum_token_emission_hash, sizeof(*a_emission_hash), l_token_emission_item); - pthread_rwlock_unlock(&l_ledger_priv->threshold_emissions_rwlock); + if(a_safe_call) pthread_rwlock_unlock(&l_ledger_priv->threshold_emissions_rwlock); l_ret = DAP_CHAIN_CS_VERIFY_CODE_TX_NO_TOKEN; if(s_debug_more) { char * l_token_emission_address_str = dap_chain_addr_to_str(&(l_token_emission_item->datum_token_emission->hdr.address)); @@ -2768,12 +2807,43 @@ int sort_ledger_tx_item(dap_chain_ledger_tx_item_t* a, dap_chain_ledger_tx_item_ a->tx->header.ts_created < b->tx->header.ts_created ? -1 : 1; } + /** - * Add new transaction to the cache list - * - * return 1 OK, -1 error + * @brief Add new transaction to the cache list + * @param a_ledger + * @param a_tx + * @param a_tx_hash + * @param a_from_threshold + * @return return 1 OK, -1 error */ int dap_chain_ledger_tx_add(dap_ledger_t *a_ledger, dap_chain_datum_tx_t *a_tx, dap_hash_fast_t *a_tx_hash, bool a_from_threshold) +{ + return s_tx_add(a_ledger,a_tx,a_tx_hash,a_from_threshold,true); +} + +/** + * @brief Add new transaction to the cache list, without rwlocks lock + * @param a_ledger + * @param a_tx + * @param a_tx_hash + * @param a_from_threshold + * @return return 1 OK, -1 error + */ +static int s_tx_add_unsafe(dap_ledger_t *a_ledger, dap_chain_datum_tx_t *a_tx, dap_hash_fast_t *a_tx_hash, bool a_from_threshold) +{ + return s_tx_add(a_ledger,a_tx,a_tx_hash,a_from_threshold,false); +} + +/** + * @brief Add new transaction to the cache list + * @param a_ledger + * @param a_tx + * @param a_tx_hash + * @param a_from_threshold + * @param a_safe_call True if we need to lock rwlock, false if not + * @return return 1 OK, -1 error + */ +static inline int s_tx_add(dap_ledger_t *a_ledger, dap_chain_datum_tx_t *a_tx, dap_hash_fast_t *a_tx_hash, bool a_from_threshold, bool a_safe_call) { if(!a_tx){ if(s_debug_more) @@ -2797,9 +2867,9 @@ int dap_chain_ledger_tx_add(dap_ledger_t *a_ledger, dap_chain_datum_tx_t *a_tx, bool l_from_threshold = a_from_threshold; if (!l_ledger_priv->load_mode && !a_from_threshold) { HASH_VALUE(a_tx_hash, sizeof(*a_tx_hash), l_hash_value); - pthread_rwlock_rdlock(&l_ledger_priv->ledger_rwlock); + if(a_safe_call) pthread_rwlock_rdlock(&l_ledger_priv->ledger_rwlock); HASH_FIND_BYHASHVALUE(hh, l_ledger_priv->ledger_items, a_tx_hash, sizeof(dap_chain_hash_fast_t), l_hash_value, l_item_tmp); - pthread_rwlock_unlock(&l_ledger_priv->ledger_rwlock); + if(a_safe_call) pthread_rwlock_unlock(&l_ledger_priv->ledger_rwlock); } char l_tx_hash_str[70]; dap_chain_hash_fast_to_str(a_tx_hash, l_tx_hash_str, sizeof(l_tx_hash_str)); @@ -3149,10 +3219,10 @@ int dap_chain_ledger_tx_add(dap_ledger_t *a_ledger, dap_chain_datum_tx_t *a_tx, if (!l_hash_value) HASH_VALUE(a_tx_hash, sizeof(*a_tx_hash), l_hash_value); - pthread_rwlock_wrlock(&l_ledger_priv->ledger_rwlock); + if(a_safe_call) pthread_rwlock_wrlock(&l_ledger_priv->ledger_rwlock); HASH_ADD_BYHASHVALUE_INORDER(hh, l_ledger_priv->ledger_items, tx_hash_fast, sizeof(dap_chain_hash_fast_t), l_hash_value, l_tx_item, sort_ledger_tx_item); // tx_hash_fast: name of key field - pthread_rwlock_unlock(&l_ledger_priv->ledger_rwlock); + if(a_safe_call) pthread_rwlock_unlock(&l_ledger_priv->ledger_rwlock); // Count TPS clock_gettime(CLOCK_REALTIME, &l_ledger_priv->tps_end_time); l_ledger_priv->tps_count++; diff --git a/modules/net/dap_chain_node_dns_client.c b/modules/net/dap_chain_node_dns_client.c index 2163479c55bc6fdf705ebb876106e2cbd18937e2..12fd553cd74612d992e6412f3d9e83b30fc8b74d 100644 --- a/modules/net/dap_chain_node_dns_client.c +++ b/modules/net/dap_chain_node_dns_client.c @@ -257,7 +257,6 @@ int dap_chain_node_info_dns_request(struct in_addr a_addr, uint16_t a_port, char l_esocket_callbacks.error_callback = s_dns_client_esocket_error_callback; // Error processing function dap_events_socket_t * l_esocket = dap_events_socket_create(DESCRIPTOR_TYPE_SOCKET_UDP,&l_esocket_callbacks); - // l_esocket->flags |= DAP_SOCK_DROP_WRITE_IF_ZERO; l_esocket->remote_addr.sin_family = AF_INET; l_esocket->remote_addr.sin_port = htons(a_port); l_esocket->remote_addr.sin_addr = a_addr;