Skip to content
Snippets Groups Projects
Commit d8a9327a authored by Roman Khlopkov's avatar Roman Khlopkov 🔜
Browse files

[*] Grace period debugged

parent 46cd0b72
No related branches found
No related tags found
2 merge requests!253features-4616,!252features-4616
......@@ -139,7 +139,6 @@ static void s_es_callback_timer(struct dap_events_socket *a_event_sock)
} else {
close(l_timerfd->tfd);
dap_events_socket_remove_and_delete_unsafe(l_timerfd->events_socket, false);
DAP_DELETE(l_timerfd);
}
}
......
......@@ -162,8 +162,8 @@ bool dap_stream_ch_check_unsafe(dap_stream_worker_t * a_worker,dap_stream_ch_t *
*/
size_t dap_stream_ch_pkt_write_unsafe(dap_stream_ch_t * a_ch, uint8_t a_type, const void * a_data, size_t a_data_size)
{
if (!a_data_size || !a_ch || !a_data) {
log_it(L_WARNING, "NULL ptr or zero data size to write out in channel");
if (!a_ch) {
log_it(L_WARNING, "Channel is NULL ptr");
return 0;
}
//log_it(L_DEBUG,"Output: Has %u bytes of %c type for %c channel id",data_size, (char)type, (char) ch->proc->id );
......
......@@ -131,7 +131,7 @@ void s_stream_ch_delete(dap_stream_ch_t* a_ch , void* a_arg)
static bool s_unban_client(dap_chain_net_srv_banlist_item_t *a_item)
{
pthread_mutex_lock(a_item->ht_mutex);
HASH_DEL(*a_item->ht_head, a_item);
HASH_DEL(*(a_item->ht_head), a_item);
pthread_mutex_unlock(a_item->ht_mutex);
DAP_DELETE(a_item);
return false;
......@@ -139,8 +139,9 @@ static bool s_unban_client(dap_chain_net_srv_banlist_item_t *a_item)
static bool s_grace_period_control(dap_chain_net_srv_grace_t *a_grace)
{
assert(a_grace);
dap_stream_ch_chain_net_srv_pkt_error_t l_err;
memset(&l_err,0,sizeof (l_err));
memset(&l_err, 0, sizeof(l_err));
dap_chain_net_srv_t * l_srv = NULL;
dap_stream_ch_t *l_ch = a_grace->ch;
......@@ -183,7 +184,7 @@ static bool s_grace_period_control(dap_chain_net_srv_grace_t *a_grace)
l_tx = dap_chain_ledger_tx_find_by_hash( l_ledger,& l_request->hdr.tx_cond );
if ( ! l_tx ){ // No tx cond transaction
if (a_grace->usage) {
if (a_grace->usage) { // marker for reentry to function
log_it( L_WARNING, "No tx cond transaction");
l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_NOT_FOUND ;
goto free_exit;
......@@ -267,7 +268,17 @@ static bool s_grace_period_control(dap_chain_net_srv_grace_t *a_grace)
}
if ( l_srv->pricelist) {
if ( l_price ){
if (l_price || l_grace_start) {
if (l_price) {
if (a_grace->usage) {
DAP_DELETE(l_usage->price);
}
} else {
l_price = DAP_NEW_Z(dap_chain_net_srv_price_t);
memcpy(l_price, l_srv->pricelist, sizeof(*l_price));
l_price->value_datoshi = 0;
l_price->value_coins = 0;
}
l_usage->price = l_price;
// TODO extend callback to pass ext and ext size from service callbacks
l_receipt = dap_chain_net_srv_issue_receipt( l_usage->service, l_usage, l_usage->price,NULL,0 );
......@@ -310,14 +321,22 @@ free_exit:
}
if (a_grace->usage) { // add client pkey hash to banlist
a_grace->usage->is_active = false;
dap_chain_net_srv_banlist_item_t *l_item = DAP_NEW_Z(dap_chain_net_srv_banlist_item_t);
memcpy(&l_item->client_pkey_hash, &a_grace->usage->client_pkey_hash, sizeof(dap_chain_hash_fast_t));
l_item->ht_mutex = &l_srv_session->parent->mutex;
l_item->ht_head = &l_srv_session->ban_list;
pthread_mutex_lock(&l_srv_session->parent->mutex);
HASH_ADD(hh, l_srv_session->ban_list, client_pkey_hash, sizeof(dap_chain_hash_fast_t), l_item);
pthread_mutex_unlock(&l_srv_session->parent->mutex);
dap_timerfd_start(l_srv->grace_period * 10000, (dap_timerfd_callback_t)s_unban_client, l_item);
if (l_srv) {
dap_chain_net_srv_banlist_item_t *l_item = NULL;
pthread_mutex_lock(&l_srv->banlist_mutex);
HASH_FIND(hh, l_srv->ban_list, &a_grace->usage->client_pkey_hash, sizeof(dap_chain_hash_fast_t), l_item);
if (l_item)
pthread_mutex_unlock(&l_srv->banlist_mutex);
else {
l_item = DAP_NEW_Z(dap_chain_net_srv_banlist_item_t);
memcpy(&l_item->client_pkey_hash, &a_grace->usage->client_pkey_hash, sizeof(dap_chain_hash_fast_t));
l_item->ht_mutex = &l_srv->banlist_mutex;
l_item->ht_head = &l_srv->ban_list;
HASH_ADD(hh, l_srv->ban_list, client_pkey_hash, sizeof(dap_chain_hash_fast_t), l_item);
pthread_mutex_unlock(&l_srv->banlist_mutex);
dap_timerfd_start(l_srv->grace_period * 10000, (dap_timerfd_callback_t)s_unban_client, l_item);
}
}
}
else if (l_usage)
dap_chain_net_srv_usage_delete(l_srv_session, l_usage);
......@@ -520,10 +539,11 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch , void* a_arg)
// Check receipt signature pkey hash
dap_sign_get_pkey_hash(l_receipt_sign, &l_usage->client_pkey_hash);
dap_chain_net_srv_banlist_item_t *l_item = NULL;
dap_chain_net_srv_t * l_srv = dap_chain_net_srv_get(l_receipt->receipt_info.srv_uid);
if (l_usage->is_grace) {
pthread_mutex_lock(&l_srv_session->parent->mutex);
HASH_FIND(hh, l_srv_session->ban_list, &l_usage->client_pkey_hash, sizeof(dap_chain_hash_fast_t), l_item);
pthread_mutex_unlock(&l_srv_session->parent->mutex);
pthread_mutex_lock(&l_srv->banlist_mutex);
HASH_FIND(hh, l_srv->ban_list, &l_usage->client_pkey_hash, sizeof(dap_chain_hash_fast_t), l_item);
pthread_mutex_unlock(&l_srv->banlist_mutex);
if (l_item) { // client banned
// Update actual receipt
l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_RECEIPT_BANNED_PKEY_HASH ;
......@@ -593,11 +613,10 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch , void* a_arg)
DAP_DELETE(l_tx_in_hash);
}
if (l_usage->is_grace) {
dap_chain_net_srv_t * l_srv = dap_chain_net_srv_get(l_ch_chain_net_srv->srv_uid);
if (l_usage->is_grace)
log_it(L_NOTICE, "Receipt is OK, but transaction can't be found. Start the grace period for %d seconds",
l_srv->grace_period);
} else
else
log_it(L_NOTICE, "Receipt with remote client sign is acceptible for. Now start the service's usage");
dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_SUCCESS ,
......
......@@ -554,10 +554,10 @@ dap_chain_net_srv_t* dap_chain_net_srv_add(dap_chain_net_srv_uid_t a_uid,dap_cha
l_srv->callback_response_success = a_callback_response_success;
l_srv->callback_response_error = a_callback_response_error;
l_srv->callback_receipt_next_success = a_callback_receipt_next_success;
pthread_mutex_init(&l_srv->banlist_mutex, NULL);
l_sdata = DAP_NEW_Z(service_list_t);
memcpy(&l_sdata->uid, &l_uid, sizeof(l_uid));
l_sdata->srv = l_srv;//DAP_NEW(dap_chain_net_srv_t);
//memcpy(l_sdata->srv, l_srv, sizeof(dap_chain_net_srv_t));
l_sdata->srv = l_srv;
HASH_ADD(hh, s_srv_list, uid, sizeof(l_srv->uid), l_sdata);
}else{
log_it(L_ERROR, "Already present service with 0x%016llX ", a_uid.uint64);
......@@ -617,8 +617,10 @@ void dap_chain_net_srv_del(dap_chain_net_srv_t * a_srv)
pthread_mutex_lock(&s_srv_list_mutex);
HASH_FIND(hh, s_srv_list, a_srv, sizeof(dap_chain_net_srv_uid_t), l_sdata);
if(l_sdata) {
DAP_DELETE(l_sdata);
HASH_DEL(s_srv_list, l_sdata);
pthread_mutex_destroy(&a_srv->banlist_mutex);
DAP_DELETE(a_srv);
DAP_DELETE(l_sdata);
}
pthread_mutex_unlock(&s_srv_list_mutex);
}
......@@ -679,8 +681,10 @@ void dap_chain_net_srv_del_all(void)
pthread_mutex_lock(&s_srv_list_mutex);
HASH_ITER(hh, s_srv_list , l_sdata, l_sdata_tmp)
{
DAP_DELETE(l_sdata);
HASH_DEL(s_srv_list, l_sdata);
pthread_mutex_destroy(&l_sdata->srv->banlist_mutex);
DAP_DELETE(l_sdata->srv);
DAP_DELETE(l_sdata);
}
pthread_mutex_unlock(&s_srv_list_mutex);
}
......
......@@ -36,12 +36,23 @@ typedef int (*dap_chain_net_srv_callback_data_t)(dap_chain_net_srv_t *, uint32_t
typedef int (*dap_chain_net_srv_callback_sign_request_t)(dap_chain_net_srv_t *, uint32_t, dap_chain_net_srv_client_t *, dap_chain_datum_tx_receipt_t **, size_t );
typedef void (*dap_chain_net_srv_callback_ch_t)(dap_chain_net_srv_t *, dap_stream_ch_t *);
typedef struct dap_chain_net_srv_banlist_item {
dap_chain_hash_fast_t client_pkey_hash;
pthread_mutex_t *ht_mutex;
struct dap_chain_net_srv_banlist_item **ht_head;
UT_hash_handle hh;
} dap_chain_net_srv_banlist_item_t;
typedef struct dap_chain_net_srv
{
dap_chain_net_srv_uid_t uid; // Unique ID for service.
dap_chain_net_srv_abstract_t srv_common;
dap_chain_net_srv_price_t *pricelist;
uint32_t grace_period;
pthread_mutex_t banlist_mutex;
dap_chain_net_srv_banlist_item_t *ban_list;
dap_chain_callback_trafic_t callback_trafic;
// Request for usage
......
......@@ -74,18 +74,11 @@ typedef struct dap_net_stats{
intmax_t packets_recv_lost;
} dap_net_stats_t;
typedef struct dap_chain_net_srv_banlist_item {
dap_chain_hash_fast_t client_pkey_hash;
pthread_mutex_t *ht_mutex;
struct dap_chain_net_srv_banlist_item **ht_head;
UT_hash_handle hh;
} dap_chain_net_srv_banlist_item_t;
typedef struct dap_chain_net_srv_stream_session {
time_t ts_activated;
dap_stream_session_t * parent;
dap_chain_net_srv_usage_t * usages;
dap_chain_net_srv_usage_t * usage_active;
dap_chain_net_srv_banlist_item_t *ban_list;
uintmax_t limits_bytes; // Bytes left
time_t limits_ts; // Timestamp until its activte
dap_chain_net_srv_price_unit_uid_t limits_units_type;
......@@ -93,7 +86,6 @@ typedef struct dap_chain_net_srv_stream_session {
// Some common stats
volatile dap_net_stats_t stats;
time_t ts_activated;
dap_sign_t* user_sign; // User's signature for auth if reconnect
} dap_chain_net_srv_stream_session_t;
......
......@@ -1268,18 +1268,18 @@ void s_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
dap_chain_net_srv_usage_t * l_usage = dap_chain_net_srv_usage_find_unsafe(l_srv_session, l_ch_vpn->usage_id);
if ( ! l_usage){
log_it(L_NOTICE, "No active usage in list, possible disconnected. Send nothin on this channel");
log_it(L_NOTICE, "No active usage in list, possible disconnected. Send nothing on this channel");
dap_stream_ch_set_ready_to_write_unsafe(a_ch,false);
dap_stream_ch_set_ready_to_read_unsafe(a_ch,false);
return;
}
if ( ! l_usage->is_active ){
log_it(L_INFO, "Usage inactivation: switch off packet input channel");
dap_stream_ch_set_ready_to_write_unsafe(a_ch,false);
dap_stream_ch_set_ready_to_read_unsafe(a_ch,false);
log_it(L_INFO, "Usage inactivation: switch off packet input & output channels");
if (l_usage->client)
dap_stream_ch_pkt_write_unsafe( l_usage->client->ch , DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_NOTIFY_STOPPED , NULL, 0 );
dap_stream_ch_set_ready_to_write_unsafe(a_ch,false);
dap_stream_ch_set_ready_to_read_unsafe(a_ch,false);
return;
}
......@@ -1414,26 +1414,26 @@ static void s_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg)
dap_chain_net_srv_usage_t * l_usage = dap_chain_net_srv_usage_find_unsafe(l_srv_session, l_ch_vpn->usage_id);
if ( ! l_usage){
log_it(L_NOTICE, "No active usage in list, possible disconnected. Send nothin on this channel");
log_it(L_NOTICE, "No active usage in list, possible disconnected. Send nothing on this channel");
dap_stream_ch_set_ready_to_write_unsafe(a_ch,false);
dap_stream_ch_set_ready_to_read_unsafe(a_ch,false);
return;
}
if ( ! l_usage->is_active ){
log_it(L_INFO, "Usage inactivation: switch off packet output channel");
dap_stream_ch_set_ready_to_write_unsafe(a_ch,false);
dap_stream_ch_set_ready_to_read_unsafe(a_ch,false);
log_it(L_INFO, "Usage inactivation: switch off packet input & output channels");
if (l_usage->client)
dap_stream_ch_pkt_write_unsafe( l_usage->client->ch , DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_NOTIFY_STOPPED , NULL, 0 );
dap_stream_ch_set_ready_to_write_unsafe(a_ch,false);
dap_stream_ch_set_ready_to_read_unsafe(a_ch,false);
return;
}
if ( (! l_usage->is_free) && (! l_usage->receipt) ){
log_it(L_WARNING, "No active receipt, switching off");
dap_stream_ch_set_ready_to_write_unsafe(a_ch,false);
dap_stream_ch_set_ready_to_read_unsafe(a_ch,false);
if (l_usage->client)
dap_stream_ch_pkt_write_unsafe( l_usage->client->ch , DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_NOTIFY_STOPPED , NULL, 0 );
dap_stream_ch_set_ready_to_write_unsafe(a_ch,false);
dap_stream_ch_set_ready_to_read_unsafe(a_ch,false);
return;
}
// Check for empty buffer out here to prevent warnings in worker
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment