From d8a9327aede45c49e1be2f920d6ceec36a35faff Mon Sep 17 00:00:00 2001 From: Roman Khlopkov <roman.khlopkov@demlabs.net> Date: Wed, 11 Nov 2020 12:37:22 +0300 Subject: [PATCH] [*] Grace period debugged --- dap-sdk/net/core/dap_timerfd.c | 1 - dap-sdk/net/stream/ch/dap_stream_ch_pkt.c | 4 +- .../dap_stream_ch_chain_net_srv.c | 55 +++++++++++++------ modules/net/srv/dap_chain_net_srv.c | 12 ++-- modules/net/srv/include/dap_chain_net_srv.h | 11 ++++ .../dap_chain_net_srv_stream_session.h | 10 +--- modules/service/vpn/dap_chain_net_srv_vpn.c | 20 +++---- 7 files changed, 69 insertions(+), 44 deletions(-) diff --git a/dap-sdk/net/core/dap_timerfd.c b/dap-sdk/net/core/dap_timerfd.c index 8e9bea676d..d688bb4336 100644 --- a/dap-sdk/net/core/dap_timerfd.c +++ b/dap-sdk/net/core/dap_timerfd.c @@ -139,7 +139,6 @@ static void s_es_callback_timer(struct dap_events_socket *a_event_sock) } else { close(l_timerfd->tfd); dap_events_socket_remove_and_delete_unsafe(l_timerfd->events_socket, false); - DAP_DELETE(l_timerfd); } } diff --git a/dap-sdk/net/stream/ch/dap_stream_ch_pkt.c b/dap-sdk/net/stream/ch/dap_stream_ch_pkt.c index 159082f39c..b13e46355d 100644 --- a/dap-sdk/net/stream/ch/dap_stream_ch_pkt.c +++ b/dap-sdk/net/stream/ch/dap_stream_ch_pkt.c @@ -162,8 +162,8 @@ bool dap_stream_ch_check_unsafe(dap_stream_worker_t * a_worker,dap_stream_ch_t * */ size_t dap_stream_ch_pkt_write_unsafe(dap_stream_ch_t * a_ch, uint8_t a_type, const void * a_data, size_t a_data_size) { - if (!a_data_size || !a_ch || !a_data) { - log_it(L_WARNING, "NULL ptr or zero data size to write out in channel"); + if (!a_ch) { + log_it(L_WARNING, "Channel is NULL ptr"); return 0; } //log_it(L_DEBUG,"Output: Has %u bytes of %c type for %c channel id",data_size, (char)type, (char) ch->proc->id ); diff --git a/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.c b/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.c index 330f3ddcbd..67e970a839 100644 --- a/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.c +++ b/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.c @@ -131,7 +131,7 @@ void s_stream_ch_delete(dap_stream_ch_t* a_ch , void* a_arg) static bool s_unban_client(dap_chain_net_srv_banlist_item_t *a_item) { pthread_mutex_lock(a_item->ht_mutex); - HASH_DEL(*a_item->ht_head, a_item); + HASH_DEL(*(a_item->ht_head), a_item); pthread_mutex_unlock(a_item->ht_mutex); DAP_DELETE(a_item); return false; @@ -139,8 +139,9 @@ static bool s_unban_client(dap_chain_net_srv_banlist_item_t *a_item) static bool s_grace_period_control(dap_chain_net_srv_grace_t *a_grace) { + assert(a_grace); dap_stream_ch_chain_net_srv_pkt_error_t l_err; - memset(&l_err,0,sizeof (l_err)); + memset(&l_err, 0, sizeof(l_err)); dap_chain_net_srv_t * l_srv = NULL; dap_stream_ch_t *l_ch = a_grace->ch; @@ -183,7 +184,7 @@ static bool s_grace_period_control(dap_chain_net_srv_grace_t *a_grace) l_tx = dap_chain_ledger_tx_find_by_hash( l_ledger,& l_request->hdr.tx_cond ); if ( ! l_tx ){ // No tx cond transaction - if (a_grace->usage) { + if (a_grace->usage) { // marker for reentry to function log_it( L_WARNING, "No tx cond transaction"); l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_NOT_FOUND ; goto free_exit; @@ -267,7 +268,17 @@ static bool s_grace_period_control(dap_chain_net_srv_grace_t *a_grace) } if ( l_srv->pricelist) { - if ( l_price ){ + if (l_price || l_grace_start) { + if (l_price) { + if (a_grace->usage) { + DAP_DELETE(l_usage->price); + } + } else { + l_price = DAP_NEW_Z(dap_chain_net_srv_price_t); + memcpy(l_price, l_srv->pricelist, sizeof(*l_price)); + l_price->value_datoshi = 0; + l_price->value_coins = 0; + } l_usage->price = l_price; // TODO extend callback to pass ext and ext size from service callbacks l_receipt = dap_chain_net_srv_issue_receipt( l_usage->service, l_usage, l_usage->price,NULL,0 ); @@ -310,14 +321,22 @@ free_exit: } if (a_grace->usage) { // add client pkey hash to banlist a_grace->usage->is_active = false; - dap_chain_net_srv_banlist_item_t *l_item = DAP_NEW_Z(dap_chain_net_srv_banlist_item_t); - memcpy(&l_item->client_pkey_hash, &a_grace->usage->client_pkey_hash, sizeof(dap_chain_hash_fast_t)); - l_item->ht_mutex = &l_srv_session->parent->mutex; - l_item->ht_head = &l_srv_session->ban_list; - pthread_mutex_lock(&l_srv_session->parent->mutex); - HASH_ADD(hh, l_srv_session->ban_list, client_pkey_hash, sizeof(dap_chain_hash_fast_t), l_item); - pthread_mutex_unlock(&l_srv_session->parent->mutex); - dap_timerfd_start(l_srv->grace_period * 10000, (dap_timerfd_callback_t)s_unban_client, l_item); + if (l_srv) { + dap_chain_net_srv_banlist_item_t *l_item = NULL; + pthread_mutex_lock(&l_srv->banlist_mutex); + HASH_FIND(hh, l_srv->ban_list, &a_grace->usage->client_pkey_hash, sizeof(dap_chain_hash_fast_t), l_item); + if (l_item) + pthread_mutex_unlock(&l_srv->banlist_mutex); + else { + l_item = DAP_NEW_Z(dap_chain_net_srv_banlist_item_t); + memcpy(&l_item->client_pkey_hash, &a_grace->usage->client_pkey_hash, sizeof(dap_chain_hash_fast_t)); + l_item->ht_mutex = &l_srv->banlist_mutex; + l_item->ht_head = &l_srv->ban_list; + HASH_ADD(hh, l_srv->ban_list, client_pkey_hash, sizeof(dap_chain_hash_fast_t), l_item); + pthread_mutex_unlock(&l_srv->banlist_mutex); + dap_timerfd_start(l_srv->grace_period * 10000, (dap_timerfd_callback_t)s_unban_client, l_item); + } + } } else if (l_usage) dap_chain_net_srv_usage_delete(l_srv_session, l_usage); @@ -520,10 +539,11 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch , void* a_arg) // Check receipt signature pkey hash dap_sign_get_pkey_hash(l_receipt_sign, &l_usage->client_pkey_hash); dap_chain_net_srv_banlist_item_t *l_item = NULL; + dap_chain_net_srv_t * l_srv = dap_chain_net_srv_get(l_receipt->receipt_info.srv_uid); if (l_usage->is_grace) { - pthread_mutex_lock(&l_srv_session->parent->mutex); - HASH_FIND(hh, l_srv_session->ban_list, &l_usage->client_pkey_hash, sizeof(dap_chain_hash_fast_t), l_item); - pthread_mutex_unlock(&l_srv_session->parent->mutex); + pthread_mutex_lock(&l_srv->banlist_mutex); + HASH_FIND(hh, l_srv->ban_list, &l_usage->client_pkey_hash, sizeof(dap_chain_hash_fast_t), l_item); + pthread_mutex_unlock(&l_srv->banlist_mutex); if (l_item) { // client banned // Update actual receipt l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_RECEIPT_BANNED_PKEY_HASH ; @@ -593,11 +613,10 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch , void* a_arg) DAP_DELETE(l_tx_in_hash); } - if (l_usage->is_grace) { - dap_chain_net_srv_t * l_srv = dap_chain_net_srv_get(l_ch_chain_net_srv->srv_uid); + if (l_usage->is_grace) log_it(L_NOTICE, "Receipt is OK, but transaction can't be found. Start the grace period for %d seconds", l_srv->grace_period); - } else + else log_it(L_NOTICE, "Receipt with remote client sign is acceptible for. Now start the service's usage"); dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_SUCCESS , diff --git a/modules/net/srv/dap_chain_net_srv.c b/modules/net/srv/dap_chain_net_srv.c index 0b307a578f..9c6ca9d747 100644 --- a/modules/net/srv/dap_chain_net_srv.c +++ b/modules/net/srv/dap_chain_net_srv.c @@ -554,10 +554,10 @@ dap_chain_net_srv_t* dap_chain_net_srv_add(dap_chain_net_srv_uid_t a_uid,dap_cha l_srv->callback_response_success = a_callback_response_success; l_srv->callback_response_error = a_callback_response_error; l_srv->callback_receipt_next_success = a_callback_receipt_next_success; + pthread_mutex_init(&l_srv->banlist_mutex, NULL); l_sdata = DAP_NEW_Z(service_list_t); memcpy(&l_sdata->uid, &l_uid, sizeof(l_uid)); - l_sdata->srv = l_srv;//DAP_NEW(dap_chain_net_srv_t); - //memcpy(l_sdata->srv, l_srv, sizeof(dap_chain_net_srv_t)); + l_sdata->srv = l_srv; HASH_ADD(hh, s_srv_list, uid, sizeof(l_srv->uid), l_sdata); }else{ log_it(L_ERROR, "Already present service with 0x%016llX ", a_uid.uint64); @@ -617,8 +617,10 @@ void dap_chain_net_srv_del(dap_chain_net_srv_t * a_srv) pthread_mutex_lock(&s_srv_list_mutex); HASH_FIND(hh, s_srv_list, a_srv, sizeof(dap_chain_net_srv_uid_t), l_sdata); if(l_sdata) { - DAP_DELETE(l_sdata); HASH_DEL(s_srv_list, l_sdata); + pthread_mutex_destroy(&a_srv->banlist_mutex); + DAP_DELETE(a_srv); + DAP_DELETE(l_sdata); } pthread_mutex_unlock(&s_srv_list_mutex); } @@ -679,8 +681,10 @@ void dap_chain_net_srv_del_all(void) pthread_mutex_lock(&s_srv_list_mutex); HASH_ITER(hh, s_srv_list , l_sdata, l_sdata_tmp) { - DAP_DELETE(l_sdata); HASH_DEL(s_srv_list, l_sdata); + pthread_mutex_destroy(&l_sdata->srv->banlist_mutex); + DAP_DELETE(l_sdata->srv); + DAP_DELETE(l_sdata); } pthread_mutex_unlock(&s_srv_list_mutex); } diff --git a/modules/net/srv/include/dap_chain_net_srv.h b/modules/net/srv/include/dap_chain_net_srv.h index a38b08f77b..cb018bdcfb 100755 --- a/modules/net/srv/include/dap_chain_net_srv.h +++ b/modules/net/srv/include/dap_chain_net_srv.h @@ -36,12 +36,23 @@ typedef int (*dap_chain_net_srv_callback_data_t)(dap_chain_net_srv_t *, uint32_t typedef int (*dap_chain_net_srv_callback_sign_request_t)(dap_chain_net_srv_t *, uint32_t, dap_chain_net_srv_client_t *, dap_chain_datum_tx_receipt_t **, size_t ); typedef void (*dap_chain_net_srv_callback_ch_t)(dap_chain_net_srv_t *, dap_stream_ch_t *); +typedef struct dap_chain_net_srv_banlist_item { + dap_chain_hash_fast_t client_pkey_hash; + pthread_mutex_t *ht_mutex; + struct dap_chain_net_srv_banlist_item **ht_head; + UT_hash_handle hh; +} dap_chain_net_srv_banlist_item_t; + typedef struct dap_chain_net_srv { dap_chain_net_srv_uid_t uid; // Unique ID for service. dap_chain_net_srv_abstract_t srv_common; dap_chain_net_srv_price_t *pricelist; + uint32_t grace_period; + pthread_mutex_t banlist_mutex; + dap_chain_net_srv_banlist_item_t *ban_list; + dap_chain_callback_trafic_t callback_trafic; // Request for usage diff --git a/modules/net/srv/include/dap_chain_net_srv_stream_session.h b/modules/net/srv/include/dap_chain_net_srv_stream_session.h index bc9e593c4c..3a7377526f 100644 --- a/modules/net/srv/include/dap_chain_net_srv_stream_session.h +++ b/modules/net/srv/include/dap_chain_net_srv_stream_session.h @@ -74,18 +74,11 @@ typedef struct dap_net_stats{ intmax_t packets_recv_lost; } dap_net_stats_t; -typedef struct dap_chain_net_srv_banlist_item { - dap_chain_hash_fast_t client_pkey_hash; - pthread_mutex_t *ht_mutex; - struct dap_chain_net_srv_banlist_item **ht_head; - UT_hash_handle hh; -} dap_chain_net_srv_banlist_item_t; - typedef struct dap_chain_net_srv_stream_session { + time_t ts_activated; dap_stream_session_t * parent; dap_chain_net_srv_usage_t * usages; dap_chain_net_srv_usage_t * usage_active; - dap_chain_net_srv_banlist_item_t *ban_list; uintmax_t limits_bytes; // Bytes left time_t limits_ts; // Timestamp until its activte dap_chain_net_srv_price_unit_uid_t limits_units_type; @@ -93,7 +86,6 @@ typedef struct dap_chain_net_srv_stream_session { // Some common stats volatile dap_net_stats_t stats; - time_t ts_activated; dap_sign_t* user_sign; // User's signature for auth if reconnect } dap_chain_net_srv_stream_session_t; diff --git a/modules/service/vpn/dap_chain_net_srv_vpn.c b/modules/service/vpn/dap_chain_net_srv_vpn.c index 3abf8077b8..93c8f92d30 100644 --- a/modules/service/vpn/dap_chain_net_srv_vpn.c +++ b/modules/service/vpn/dap_chain_net_srv_vpn.c @@ -1268,18 +1268,18 @@ void s_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) dap_chain_net_srv_usage_t * l_usage = dap_chain_net_srv_usage_find_unsafe(l_srv_session, l_ch_vpn->usage_id); if ( ! l_usage){ - log_it(L_NOTICE, "No active usage in list, possible disconnected. Send nothin on this channel"); + log_it(L_NOTICE, "No active usage in list, possible disconnected. Send nothing on this channel"); dap_stream_ch_set_ready_to_write_unsafe(a_ch,false); dap_stream_ch_set_ready_to_read_unsafe(a_ch,false); return; } if ( ! l_usage->is_active ){ - log_it(L_INFO, "Usage inactivation: switch off packet input channel"); - dap_stream_ch_set_ready_to_write_unsafe(a_ch,false); - dap_stream_ch_set_ready_to_read_unsafe(a_ch,false); + log_it(L_INFO, "Usage inactivation: switch off packet input & output channels"); if (l_usage->client) dap_stream_ch_pkt_write_unsafe( l_usage->client->ch , DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_NOTIFY_STOPPED , NULL, 0 ); + dap_stream_ch_set_ready_to_write_unsafe(a_ch,false); + dap_stream_ch_set_ready_to_read_unsafe(a_ch,false); return; } @@ -1414,26 +1414,26 @@ static void s_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) dap_chain_net_srv_usage_t * l_usage = dap_chain_net_srv_usage_find_unsafe(l_srv_session, l_ch_vpn->usage_id); if ( ! l_usage){ - log_it(L_NOTICE, "No active usage in list, possible disconnected. Send nothin on this channel"); + log_it(L_NOTICE, "No active usage in list, possible disconnected. Send nothing on this channel"); dap_stream_ch_set_ready_to_write_unsafe(a_ch,false); dap_stream_ch_set_ready_to_read_unsafe(a_ch,false); return; } if ( ! l_usage->is_active ){ - log_it(L_INFO, "Usage inactivation: switch off packet output channel"); - dap_stream_ch_set_ready_to_write_unsafe(a_ch,false); - dap_stream_ch_set_ready_to_read_unsafe(a_ch,false); + log_it(L_INFO, "Usage inactivation: switch off packet input & output channels"); if (l_usage->client) dap_stream_ch_pkt_write_unsafe( l_usage->client->ch , DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_NOTIFY_STOPPED , NULL, 0 ); + dap_stream_ch_set_ready_to_write_unsafe(a_ch,false); + dap_stream_ch_set_ready_to_read_unsafe(a_ch,false); return; } if ( (! l_usage->is_free) && (! l_usage->receipt) ){ log_it(L_WARNING, "No active receipt, switching off"); - dap_stream_ch_set_ready_to_write_unsafe(a_ch,false); - dap_stream_ch_set_ready_to_read_unsafe(a_ch,false); if (l_usage->client) dap_stream_ch_pkt_write_unsafe( l_usage->client->ch , DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_NOTIFY_STOPPED , NULL, 0 ); + dap_stream_ch_set_ready_to_write_unsafe(a_ch,false); + dap_stream_ch_set_ready_to_read_unsafe(a_ch,false); return; } // Check for empty buffer out here to prevent warnings in worker -- GitLab