diff --git a/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.c b/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.c index a036d7a3a6b4750b2db25bc30f32cfe5ceaecbc3..742271b5612c855b4d84d4163ace24004f0ea7b0 100644 --- a/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.c +++ b/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.c @@ -30,15 +30,15 @@ along with any CellFrame SDK based project. If not, see <http://www.gnu.org/lic #include "dap_hash.h" #include "rand/dap_rand.h" -#include "dap_chain.h" -#include "dap_chain_datum_tx.h" -#include "dap_chain_datum_tx_in.h" -#include "dap_chain_datum_tx_in_cond.h" -#include "dap_chain_datum_tx_out.h" -#include "dap_chain_datum_tx_out_cond.h" -#include "dap_chain_datum_tx_receipt.h" -#include "dap_chain_mempool.h" -#include "dap_common.h" +//#include "dap_chain.h" +//#include "dap_chain_datum_tx.h" +//#include "dap_chain_datum_tx_in.h" +//#include "dap_chain_datum_tx_in_cond.h" +//#include "dap_chain_datum_tx_out.h" +//#include "dap_chain_datum_tx_out_cond.h" +//#include "dap_chain_datum_tx_receipt.h" +//#include "dap_chain_mempool.h" +//#include "dap_common.h" #include "dap_chain_net_srv.h" #include "dap_chain_net_srv_stream_session.h" @@ -54,6 +54,11 @@ along with any CellFrame SDK based project. If not, see <http://www.gnu.org/lic #define LOG_TAG "dap_stream_ch_chain_net_srv" +typedef struct usages_in_grace{ + dap_hash_fast_t tx_cond_hash; + dap_chain_net_srv_grace_t *grace; + UT_hash_handle hh; +} usages_in_grace_t; uint8_t dap_stream_ch_chain_net_srv_get_id() { @@ -65,6 +70,62 @@ static void s_stream_ch_delete(dap_stream_ch_t* ch , void* arg); static void s_stream_ch_packet_in(dap_stream_ch_t* ch , void* arg); static void s_stream_ch_packet_out(dap_stream_ch_t* ch , void* arg); +static bool s_unban_client(dap_chain_net_srv_banlist_item_t *a_item); + +static void s_service_start(dap_stream_ch_t* a_ch , dap_stream_ch_chain_net_srv_pkt_request_t * a_request, size_t a_request_size); +static void s_grace_period_start(dap_chain_net_srv_grace_t *a_grace); +static bool s_grace_period_finish(usages_in_grace_t *a_grace); + +static inline void s_grace_error(dap_chain_net_srv_grace_t *a_grace, dap_stream_ch_chain_net_srv_pkt_error_t a_err){ + dap_stream_ch_t * l_ch = dap_stream_ch_find_by_uuid_unsafe(a_grace->stream_worker, a_grace->ch_uuid); + dap_chain_net_srv_stream_session_t *l_srv_session = l_ch && l_ch->stream && l_ch->stream->session ? + (dap_chain_net_srv_stream_session_t *)l_ch->stream->session->_inheritor : NULL; + + a_grace->usage->is_grace = false; + if (a_grace->usage->receipt_next){ // If not first grace-period + log_it( L_WARNING, "Next receipt is rejected. Waiting until current limits is over."); + DAP_DEL_Z(a_grace->usage->receipt_next); + memset(&a_grace->usage->tx_cond_hash, 0, sizeof(a_grace->usage->tx_cond_hash)); + DAP_DELETE(a_grace->request); + DAP_DELETE(a_grace); + return; + } + + if (a_err.code) { + if(l_ch) + dap_stream_ch_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &a_err, sizeof (a_err)); + if (a_grace->usage->service && a_grace->usage->service->callbacks.response_error) + a_grace->usage->service->callbacks.response_error(a_grace->usage->service, 0, NULL, &a_err, sizeof(a_err)); + } + + if (a_grace->usage) { // add client pkey hash to banlist + a_grace->usage->is_active = false; + if (a_grace->usage->service) { + dap_chain_net_srv_banlist_item_t *l_item = NULL; + pthread_mutex_lock(&a_grace->usage->service->banlist_mutex); + HASH_FIND(hh, a_grace->usage->service->ban_list, &a_grace->usage->client_pkey_hash, sizeof(dap_chain_hash_fast_t), l_item); + if (l_item) + pthread_mutex_unlock(&a_grace->usage->service->banlist_mutex); + else { + l_item = DAP_NEW_Z(dap_chain_net_srv_banlist_item_t); + l_item->client_pkey_hash = a_grace->usage->client_pkey_hash; + l_item->ht_mutex = &a_grace->usage->service->banlist_mutex; + l_item->ht_head = &a_grace->usage->service->ban_list; + HASH_ADD(hh, a_grace->usage->service->ban_list, client_pkey_hash, sizeof(dap_chain_hash_fast_t), l_item); + pthread_mutex_unlock(&a_grace->usage->service->banlist_mutex); + dap_timerfd_start(a_grace->usage->service->grace_period * 1000, (dap_timerfd_callback_t)s_unban_client, l_item); + } + } + + } else if (l_srv_session->usage_active) + dap_chain_net_srv_usage_delete(l_srv_session); + DAP_DELETE(a_grace->request); + DAP_DELETE(a_grace); +} + +// TODO: move this to net_srv +static usages_in_grace_t * s_grace_table = NULL; +static pthread_mutex_t s_ht_grace_table_mutex; /** * @brief dap_stream_ch_chain_net_init * @return @@ -73,6 +134,7 @@ int dap_stream_ch_chain_net_srv_init(void) { log_it(L_NOTICE,"Chain network services channel initialized"); dap_stream_ch_proc_add(dap_stream_ch_chain_net_srv_get_id(),s_stream_ch_new,s_stream_ch_delete,s_stream_ch_packet_in,s_stream_ch_packet_out); + pthread_mutex_init(&s_ht_grace_table_mutex, NULL); return 0; } @@ -105,7 +167,6 @@ void s_stream_ch_new(dap_stream_ch_t* a_ch , void* arg) log_it(L_ERROR, "Session inheritor is already present!"); dap_chain_net_srv_call_opened_all( a_ch); - } @@ -120,6 +181,8 @@ void s_stream_ch_delete(dap_stream_ch_t* a_ch , void* a_arg) (void) a_arg; log_it(L_DEBUG, "Stream ch chain net srv delete"); dap_chain_net_srv_call_closed_all( a_ch); + if (a_ch->stream->session && a_ch->stream->session->_inheritor) + dap_chain_net_srv_stream_session_delete( a_ch->stream->session ); DAP_DEL_Z(a_ch->internal); } @@ -132,29 +195,41 @@ static bool s_unban_client(dap_chain_net_srv_banlist_item_t *a_item) return false; } -static bool s_grace_period_control(dap_chain_net_srv_grace_t *a_grace) +void dap_stream_ch_chain_net_srv_tx_cond_added_cb(void *a_arg, dap_ledger_t *a_ledger, dap_chain_datum_tx_t *a_tx) { - assert(a_grace); + UNUSED(a_ledger); + UNUSED(a_arg); + // TODO: 1. Get net_srv by srv_uid from tx_cond + // 2. Get usages in grace HT from service + usages_in_grace_t *l_item = NULL; + dap_hash_fast_t tx_cond_hash = {}; + dap_hash_fast((void*)a_tx, dap_chain_datum_tx_get_size(a_tx), &tx_cond_hash); + pthread_mutex_lock(&s_ht_grace_table_mutex); + HASH_FIND(hh, s_grace_table, &tx_cond_hash, sizeof(dap_hash_fast_t), l_item); + if (l_item){ + log_it(L_INFO, "Found tx in ledger by notify. Finish grace."); + // Stop timer + dap_timerfd_delete_mt(l_item->grace->stream_worker->worker, l_item->grace->timer_es_uuid); + // finish grace + s_grace_period_finish(l_item); + } + pthread_mutex_unlock(&s_ht_grace_table_mutex); +} + +static void s_service_start(dap_stream_ch_t* a_ch , dap_stream_ch_chain_net_srv_pkt_request_t * a_request, size_t a_request_size) +{ + assert(a_ch); dap_stream_ch_chain_net_srv_pkt_error_t l_err; memset(&l_err, 0, sizeof(l_err)); dap_chain_net_srv_t * l_srv = NULL; - dap_chain_net_srv_usage_t *l_usage = NULL; - dap_stream_ch_t *l_ch = dap_stream_ch_find_by_uuid_unsafe(a_grace->stream_worker, a_grace->ch_uuid); - if (!l_ch) - goto free_exit; + dap_chain_net_srv_stream_session_t *l_srv_session = a_ch->stream && a_ch->stream->session ? + (dap_chain_net_srv_stream_session_t *)a_ch->stream->session->_inheritor : NULL; + l_srv = dap_chain_net_srv_get( a_request->hdr.srv_uid ); + dap_chain_net_t * l_net = dap_chain_net_by_id( a_request->hdr.net_id ); - dap_chain_net_srv_stream_session_t *l_srv_session = l_ch && l_ch->stream && l_ch->stream->session ? - (dap_chain_net_srv_stream_session_t *)l_ch->stream->session->_inheritor : NULL; - if (!l_srv_session) - goto free_exit; - - dap_stream_ch_chain_net_srv_pkt_request_t *l_request = a_grace->request; - l_srv = dap_chain_net_srv_get( l_request->hdr.srv_uid ); - dap_chain_net_t * l_net = dap_chain_net_by_id( l_request->hdr.net_id ); - - l_err.net_id.uint64 = l_request->hdr.net_id.uint64; - l_err.srv_uid.uint64 = l_request->hdr.srv_uid.uint64; + l_err.net_id.uint64 = a_request->hdr.net_id.uint64; + l_err.srv_uid.uint64 = a_request->hdr.srv_uid.uint64; if ( ! l_net ) // Network not found l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_NETWORK_NOT_FOUND; @@ -162,83 +237,149 @@ static bool s_grace_period_control(dap_chain_net_srv_grace_t *a_grace) if ( ! l_srv ) // Service not found l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_SERVICE_NOT_FOUND; - if ( l_err.code ){ - goto free_exit; + if ( l_err.code || !l_srv_session){ + if(a_ch) + dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err)); + if (l_srv && l_srv->callbacks.response_error) + l_srv->callbacks.response_error(l_srv, 0, NULL, &l_err, sizeof(l_err)); + return; + } + + dap_chain_net_srv_usage_t *l_usage = NULL; + l_usage = dap_chain_net_srv_usage_add(l_srv_session, l_net, l_srv); + if ( !l_usage ){ // Usage can't add + log_it( L_WARNING, "Can't add usage"); + l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_CANT_ADD_USAGE; + if(a_ch) + dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err)); + if (l_srv && l_srv->callbacks.response_error) + l_srv->callbacks.response_error(l_srv, 0, NULL, &l_err, sizeof(l_err)); + return; + } + + l_err.usage_id = l_usage->id; + // Create one client + l_usage->client = DAP_NEW_Z( dap_chain_net_srv_client_remote_t); + l_usage->client->stream_worker = a_ch->stream_worker; + l_usage->client->ch = a_ch; + l_usage->client->session_id = a_ch->stream->session->id; + l_usage->client->ts_created = time(NULL); + l_usage->tx_cond_hash = a_request->hdr.tx_cond; + l_usage->ts_created = time(NULL); + l_usage->net = l_net; + l_usage->service = l_srv; + + + dap_chain_net_srv_grace_t *l_grace = DAP_NEW_Z(dap_chain_net_srv_grace_t); + l_grace->request = DAP_NEW_Z_SIZE(dap_stream_ch_chain_net_srv_pkt_request_t, a_request_size); + memcpy(l_grace->request, a_request, a_request_size); + l_grace->request_size = a_request_size; + l_grace->ch_uuid = a_ch->uuid; + l_grace->stream_worker = a_ch->stream_worker; + l_grace->usage = l_usage; + + if (l_srv->pricelist){ + // not free service + s_grace_period_start(l_grace); + } else { + // Start service for free + log_it( L_INFO, "Service provide for free"); + l_grace->usage->is_free = true; + size_t l_success_size = sizeof (dap_stream_ch_chain_net_srv_pkt_success_hdr_t ); + dap_stream_ch_chain_net_srv_pkt_success_t *l_success = DAP_NEW_Z_SIZE(dap_stream_ch_chain_net_srv_pkt_success_t, + l_success_size); + l_success->hdr.usage_id = l_usage->id; + l_success->hdr.net_id.uint64 = l_usage->net->pub.id.uint64; + l_success->hdr.srv_uid.uint64 = l_usage->service->uid.uint64; + dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_SUCCESS, l_success, l_success_size); + if (l_usage->service->callbacks.response_success) + l_usage->service->callbacks.response_success(l_usage->service, l_usage->id, l_usage->client, NULL, 0); + DAP_DELETE(l_success); + } + return; +} + +static void s_grace_period_start(dap_chain_net_srv_grace_t *a_grace) +{ + assert(a_grace); + dap_stream_ch_chain_net_srv_pkt_error_t l_err; + memset(&l_err, 0, sizeof(l_err)); + dap_stream_ch_t *l_ch = dap_stream_ch_find_by_uuid_unsafe(a_grace->stream_worker, a_grace->ch_uuid); + + if (!l_ch){ + s_grace_error(a_grace, l_err); + return; } - dap_ledger_t * l_ledger =l_net->pub.ledger; + dap_chain_net_t * l_net = a_grace->usage->net; + + l_err.net_id.uint64 = l_net->pub.id.uint64; + l_err.srv_uid.uint64 = a_grace->usage->service->uid.uint64; + + dap_ledger_t * l_ledger = l_net->pub.ledger; dap_chain_datum_tx_t * l_tx = NULL; dap_chain_tx_out_cond_t * l_tx_out_cond = NULL; - bool l_grace_start = false; - if (l_srv->pricelist ){ // Is present pricelist, not free service + dap_chain_net_srv_price_t * l_price = NULL; + if ( !l_ledger ){ // No ledger + log_it( L_WARNING, "No Ledger"); + l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_NETWORK_NO_LEDGER ; + s_grace_error(a_grace, l_err); + return; + } - if ( !l_ledger ){ // No ledger - log_it( L_WARNING, "No Ledger"); - l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_NETWORK_NO_LEDGER ; - goto free_exit; - } + l_tx = a_grace->usage->is_waiting_new_tx_cond ? NULL : dap_chain_ledger_tx_find_by_hash(l_ledger, &a_grace->usage->tx_cond_hash); + if ( ! l_tx ){ // No tx cond transaction, start grace-period + a_grace->usage->is_grace = true; - l_tx = dap_chain_ledger_tx_find_by_hash( l_ledger,& l_request->hdr.tx_cond ); - if ( ! l_tx ){ // No tx cond transaction - if (a_grace->usage) { // marker for reentry to function - log_it( L_WARNING, "No tx cond transaction"); - l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_NOT_FOUND ; - goto free_exit; - } else - l_grace_start = true; - } - if (!l_grace_start) { - int l_tx_out_cond_size =0; - l_tx_out_cond = (dap_chain_tx_out_cond_t *) - dap_chain_datum_tx_item_get(l_tx, NULL, TX_ITEM_TYPE_OUT_COND, &l_tx_out_cond_size ); + l_price = DAP_NEW_Z(dap_chain_net_srv_price_t); + memcpy(l_price, a_grace->usage->service->pricelist, sizeof(*l_price)); - if ( ! l_tx_out_cond ) { // No conditioned output - log_it( L_WARNING, "No conditioned output"); - l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_NO_COND_OUT ; - goto free_exit; - } + a_grace->usage->price = l_price; - // Check cond output if it equesl or not to request - if (!dap_chain_net_srv_uid_compare(l_tx_out_cond->header.srv_uid, l_request->hdr.srv_uid)) { - log_it( L_WARNING, "Wrong service uid in request, tx expect to close its output with 0x%016"DAP_UINT64_FORMAT_X, - l_tx_out_cond->header.srv_uid.uint64 ); - l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_WRONG_SRV_UID ; - goto free_exit; - } + if (!a_grace->usage->receipt){ + a_grace->usage->receipt = dap_chain_net_srv_issue_receipt(a_grace->usage->service, a_grace->usage->price, NULL, 0); + dap_stream_ch_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_SIGN_REQUEST, + a_grace->usage->receipt, a_grace->usage->receipt->size); } - } - if (!a_grace->usage) { - l_usage = dap_chain_net_srv_usage_add(l_srv_session, l_net, l_srv); - if ( !l_usage ){ // Usage can't add - log_it( L_WARNING, "Can't add usage"); - l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_CANT_ADD_USAGE; - goto free_exit; + usages_in_grace_t *l_item = DAP_NEW_Z_SIZE(usages_in_grace_t, sizeof(usages_in_grace_t)); + l_item->grace = a_grace; + l_item->tx_cond_hash = a_grace->usage->tx_cond_hash; + + pthread_mutex_lock(&s_ht_grace_table_mutex); + HASH_ADD(hh, s_grace_table, tx_cond_hash, sizeof(dap_hash_fast_t), l_item); + pthread_mutex_unlock(&s_ht_grace_table_mutex); + a_grace->timer_es_uuid = dap_timerfd_start_on_worker(a_grace->stream_worker->worker, a_grace->usage->service->grace_period * 1000, + (dap_timerfd_callback_t)s_grace_period_finish, l_item)->esocket_uuid; + + + } else { // Start srvice in normal pay mode + a_grace->usage->tx_cond = l_tx; + + l_tx_out_cond = dap_chain_datum_tx_out_cond_get(l_tx, DAP_CHAIN_TX_OUT_COND_SUBTYPE_SRV_PAY, NULL ); + + if ( ! l_tx_out_cond ) { // No conditioned output + log_it( L_WARNING, "No conditioned output"); + l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_NO_COND_OUT ; + s_grace_error(a_grace, l_err); + return; } - l_err.usage_id = l_usage->id; + // Check cond output if it equesl or not to request + if (!dap_chain_net_srv_uid_compare(l_tx_out_cond->header.srv_uid, a_grace->usage->service->uid)) { + log_it( L_WARNING, "Wrong service uid in request, tx expect to close its output with 0x%016"DAP_UINT64_FORMAT_X, + l_tx_out_cond->header.srv_uid.uint64 ); + l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_WRONG_SRV_UID ; + s_grace_error(a_grace, l_err); + return; + } - // Create one client - l_usage->client = DAP_NEW_Z( dap_chain_net_srv_client_remote_t); - l_usage->client->stream_worker = l_ch->stream_worker; - l_usage->client->ch = l_ch; - l_usage->client->session_id = l_ch->stream->session->id; - l_usage->client->ts_created = time(NULL); - l_usage->tx_cond = l_tx; - l_usage->tx_cond_hash = l_request->hdr.tx_cond; - l_usage->ts_created = time(NULL); - } else { - l_usage = a_grace->usage; - l_usage->tx_cond = l_tx; - } - dap_chain_net_srv_price_t * l_price = NULL; - const char * l_ticker = NULL; - if (l_srv->pricelist && !l_grace_start) { - l_ticker = dap_chain_ledger_tx_get_token_ticker_by_hash(l_ledger, &l_request->hdr.tx_cond ); - dap_stpcpy(l_usage->token_ticker, l_ticker); + const char * l_ticker = NULL; + l_ticker = dap_chain_ledger_tx_get_token_ticker_by_hash(l_ledger, &a_grace->usage->tx_cond_hash); + dap_stpcpy(a_grace->usage->token_ticker, l_ticker); dap_chain_net_srv_price_t *l_price_tmp; - DL_FOREACH(l_srv->pricelist, l_price_tmp) { - if (l_price_tmp->net->pub.id.uint64 == l_request->hdr.net_id.uint64 + DL_FOREACH(a_grace->usage->service->pricelist, l_price_tmp) { + if (l_price_tmp && l_price_tmp->net->pub.id.uint64 == a_grace->usage->net->pub.id.uint64 && dap_strcmp(l_price_tmp->token, l_ticker) == 0 && l_price_tmp->units_uid.enm == l_tx_out_cond->subtype.srv_pay.unit.enm )//&& (l_price_tmp->value_datoshi/l_price_tmp->units) < l_tx_out_cond->subtype.srv_pay.header.unit_price_max_datoshi) @@ -251,102 +392,232 @@ static bool s_grace_period_control(dap_chain_net_srv_grace_t *a_grace) log_it( L_WARNING, "Request can't be processed because no acceptable price in pricelist for token %s in network %s", l_ticker, l_net->pub.name ); l_err.code =DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_NOT_ACCEPT_TOKEN; - goto free_exit; + s_grace_error(a_grace, l_err); + return; + } + a_grace->usage->price = l_price; + int ret; + if ((ret = a_grace->usage->service->callbacks.requested(a_grace->usage->service, a_grace->usage->id, a_grace->usage->client, a_grace->request, a_grace->request_size)) != 0) { + log_it( L_WARNING, "Request canceled by service callback, return code %d", ret); + l_err.code = (uint32_t) ret ; + s_grace_error(a_grace, l_err); + return; } - } - int ret; - if ((ret = l_srv->callbacks.requested(l_srv, l_usage->id, l_usage->client, l_request, a_grace->request_size)) != 0) { - log_it( L_WARNING, "Request canceled by service callback, return code %d", ret); - l_err.code = (uint32_t) ret ; - goto free_exit; - } - if ( l_srv->pricelist) { - if (l_price || l_grace_start) { - if (l_price) { - if (a_grace->usage) { - DAP_DELETE(l_usage->price); - } - } else { - l_price = DAP_NEW_Z(dap_chain_net_srv_price_t); - memcpy(l_price, l_srv->pricelist, sizeof(*l_price)); - l_price->value_datoshi = uint256_0; - } - l_usage->price = l_price; - if (l_usage->receipt_next){ - DAP_DEL_Z(l_usage->receipt_next); - l_usage->receipt_next = dap_chain_net_srv_issue_receipt(l_usage->service, l_usage->price, NULL, 0); - }else{ - dap_chain_net_srv_price_t l_b_price = *l_usage->price; - if (l_grace_start || a_grace->usage){ - l_b_price.units *= 2; - MULT_256_256(l_b_price.value_datoshi, GET_256_FROM_64((uint64_t)2), &l_b_price.value_datoshi); - } - l_usage->receipt = dap_chain_net_srv_issue_receipt(l_usage->service, &l_b_price, NULL, 0); - dap_stream_ch_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_SIGN_REQUEST, - l_usage->receipt, l_usage->receipt->size); - } + if (a_grace->usage->receipt_next){ + DAP_DEL_Z(a_grace->usage->receipt_next); + a_grace->usage->receipt_next = dap_chain_net_srv_issue_receipt(a_grace->usage->service, a_grace->usage->price, NULL, 0); }else{ - l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_PRICE_NOT_FOUND ; - goto free_exit; + a_grace->usage->receipt = dap_chain_net_srv_issue_receipt(a_grace->usage->service, a_grace->usage->price, NULL, 0); + dap_stream_ch_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_SIGN_REQUEST, + a_grace->usage->receipt, a_grace->usage->receipt->size); } - // If we a here we passed all the checks, wow, now if we're not for free we request the signature. - } else{ - log_it( L_INFO, "Service provide for free"); - l_usage->is_free = true; - size_t l_success_size = sizeof (dap_stream_ch_chain_net_srv_pkt_success_hdr_t ); - dap_stream_ch_chain_net_srv_pkt_success_t *l_success = DAP_NEW_Z_SIZE(dap_stream_ch_chain_net_srv_pkt_success_t, - l_success_size); - l_success->hdr.usage_id = l_usage->id; - l_success->hdr.net_id.uint64 = l_usage->net->pub.id.uint64; - l_success->hdr.srv_uid.uint64 = l_usage->service->uid.uint64; - dap_stream_ch_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_SUCCESS, l_success, l_success_size); - if (l_usage->service->callbacks.response_success) - l_usage->service->callbacks.response_success(l_usage->service, l_usage->id, l_usage->client, NULL, 0); - DAP_DELETE(l_success); - } - if (l_grace_start) { - l_usage->is_grace = true; - a_grace->usage = l_usage; - dap_timerfd_start_on_worker(a_grace->stream_worker->worker, l_srv->grace_period * 1000, - (dap_timerfd_callback_t)s_grace_period_control, a_grace); - return false; - } else { + DAP_DELETE(a_grace->request); DAP_DELETE(a_grace); - l_usage->is_grace = false; + + } +} + +static bool s_grace_period_finish(usages_in_grace_t *a_grace_item) +{ + assert(a_grace_item); + dap_stream_ch_chain_net_srv_pkt_error_t l_err; + memset(&l_err, 0, sizeof(l_err)); + dap_chain_net_srv_grace_t *l_grace = a_grace_item->grace; + + dap_stream_ch_t *l_ch = dap_stream_ch_find_by_uuid_unsafe(l_grace->stream_worker, l_grace->ch_uuid); + + if (l_grace->usage->price && !l_grace->usage->receipt_next){ // if first grace delete price and set actual + DAP_DEL_Z(l_grace->usage->price); + } + + if (!l_ch){ + s_grace_error(l_grace, l_err); + HASH_DEL(s_grace_table, a_grace_item); + DAP_DEL_Z(a_grace_item); return false; } -free_exit: - if (l_err.code) { - if(l_ch) - dap_stream_ch_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err)); - if (l_srv && l_srv->callbacks.response_error) - l_srv->callbacks.response_error(l_srv, 0, NULL, &l_err, sizeof(l_err)); + + if (l_grace->usage->is_waiting_new_tx_cond){ + log_it(L_INFO, "No new tx cond!"); + s_grace_error(l_grace, l_err); + l_grace->usage->is_waiting_new_tx_cond = false; + HASH_DEL(s_grace_table, a_grace_item); + DAP_DEL_Z(a_grace_item); + return false; } - if (a_grace->usage) { // add client pkey hash to banlist - a_grace->usage->is_active = false; - if (l_srv) { - dap_chain_net_srv_banlist_item_t *l_item = NULL; - pthread_mutex_lock(&l_srv->banlist_mutex); - HASH_FIND(hh, l_srv->ban_list, &a_grace->usage->client_pkey_hash, sizeof(dap_chain_hash_fast_t), l_item); - if (l_item) - pthread_mutex_unlock(&l_srv->banlist_mutex); - else { - l_item = DAP_NEW_Z(dap_chain_net_srv_banlist_item_t); - l_item->client_pkey_hash = a_grace->usage->client_pkey_hash; - l_item->ht_mutex = &l_srv->banlist_mutex; - l_item->ht_head = &l_srv->ban_list; - HASH_ADD(hh, l_srv->ban_list, client_pkey_hash, sizeof(dap_chain_hash_fast_t), l_item); - pthread_mutex_unlock(&l_srv->banlist_mutex); - dap_timerfd_start(l_srv->grace_period * 1000, (dap_timerfd_callback_t)s_unban_client, l_item); + + dap_chain_net_t * l_net = l_grace->usage->net; + + l_err.net_id.uint64 = l_net->pub.id.uint64; + l_err.srv_uid.uint64 = l_grace->usage->service->uid.uint64; + + dap_ledger_t * l_ledger = l_net->pub.ledger; + dap_chain_datum_tx_t * l_tx = NULL; + dap_chain_tx_out_cond_t * l_tx_out_cond = NULL; + + if ( !l_ledger ){ // No ledger + log_it( L_WARNING, "No Ledger"); + l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_NETWORK_NO_LEDGER ; + s_grace_error(l_grace, l_err); + HASH_DEL(s_grace_table, a_grace_item); + DAP_DEL_Z(a_grace_item); + return false; + } + log_it(L_INFO, "Grace period is over! Check tx in ledger."); + l_tx = dap_chain_ledger_tx_find_by_hash(l_ledger, &l_grace->usage->tx_cond_hash); + if ( ! l_tx ){ // No tx cond transaction, start grace-period + log_it( L_WARNING, "No tx cond transaction"); + l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_NOT_FOUND ; + s_grace_error(l_grace, l_err); + HASH_DEL(s_grace_table, a_grace_item); + DAP_DEL_Z(a_grace_item); + return false; + } else { // Start srvice in normal pay mode + log_it(L_INFO, "Tx is found in ledger."); + l_grace->usage->tx_cond = l_tx; + + l_tx_out_cond = dap_chain_datum_tx_out_cond_get(l_tx, DAP_CHAIN_TX_OUT_COND_SUBTYPE_SRV_PAY, NULL ); + + if ( ! l_tx_out_cond ) { // No conditioned output + log_it( L_WARNING, "No conditioned output"); + l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_NO_COND_OUT ; + s_grace_error(l_grace, l_err); + HASH_DEL(s_grace_table, a_grace_item); + DAP_DEL_Z(a_grace_item); + return false; + } + + // Check cond output if it equesl or not to request + if (!dap_chain_net_srv_uid_compare(l_tx_out_cond->header.srv_uid, l_grace->usage->service->uid)) { + log_it( L_WARNING, "Wrong service uid in request, tx expect to close its output with 0x%016"DAP_UINT64_FORMAT_X, + l_tx_out_cond->header.srv_uid.uint64 ); + l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_WRONG_SRV_UID ; + s_grace_error(l_grace, l_err); + HASH_DEL(s_grace_table, a_grace_item); + DAP_DEL_Z(a_grace_item); + return false; + } + + dap_chain_net_srv_price_t * l_price = NULL; + const char * l_ticker = NULL; + l_ticker = dap_chain_ledger_tx_get_token_ticker_by_hash(l_ledger, &l_grace->usage->tx_cond_hash); + dap_stpcpy(l_grace->usage->token_ticker, l_ticker); + + dap_chain_net_srv_price_t *l_price_tmp; + DL_FOREACH(l_grace->usage->service->pricelist, l_price_tmp) { + if (l_price_tmp && l_price_tmp->net->pub.id.uint64 == l_grace->usage->net->pub.id.uint64 + && dap_strcmp(l_price_tmp->token, l_ticker) == 0 + && l_price_tmp->units_uid.enm == l_tx_out_cond->subtype.srv_pay.unit.enm + )//&& (l_price_tmp->value_datoshi/l_price_tmp->units) < l_tx_out_cond->subtype.srv_pay.header.unit_price_max_datoshi) + { + l_price = l_price_tmp; + break; + } + } + if ( !l_price ) { + log_it( L_WARNING, "Request can't be processed because no acceptable price in pricelist for token %s in network %s", + l_ticker, l_net->pub.name ); + l_err.code =DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_NOT_ACCEPT_TOKEN; + s_grace_error(l_grace, l_err); + HASH_DEL(s_grace_table, a_grace_item); + DAP_DEL_Z(a_grace_item); + return false; + } + + l_grace->usage->price = l_price; + + int ret; + if ((ret = l_grace->usage->service->callbacks.requested(l_grace->usage->service, l_grace->usage->id, l_grace->usage->client, l_grace->request, l_grace->request_size)) != 0) { + log_it( L_WARNING, "Request canceled by service callback, return code %d", ret); + l_err.code = (uint32_t) ret ; + s_grace_error(l_grace, l_err); + HASH_DEL(s_grace_table, a_grace_item); + DAP_DEL_Z(a_grace_item); + return false; + } + + // make receipt or tx + char *l_receipt_hash_str; + dap_chain_datum_tx_receipt_t *l_receipt = NULL; + if (l_grace->usage->receipt_next){ + l_receipt = l_grace->usage->receipt_next; + } else if (l_grace->usage->receipt){ + l_receipt = l_grace->usage->receipt; + } else { + // Send error??? + } + size_t l_receipt_size = l_receipt->size; + + // get a second signature - from the client (first sign in server, second sign in client) + dap_sign_t * l_receipt_sign = dap_chain_datum_tx_receipt_sign_get( l_receipt, l_receipt_size, 1); + if ( ! l_receipt_sign ){ + log_it(L_WARNING, "Tx already in chain, but receipt is not signed by client. Finish grace and wait receipt sign responce."); + s_grace_error(l_grace, l_err); + HASH_DEL(s_grace_table, a_grace_item); + DAP_DEL_Z(a_grace_item); + return false; + } + dap_get_data_hash_str_static(l_receipt, l_receipt_size, l_receipt_hash_str); + dap_global_db_set("local.receipts", l_receipt_hash_str, l_receipt, l_receipt_size, false, NULL, NULL); + // Form input transaction + char *l_hash_str = dap_hash_fast_to_str_new(&l_grace->usage->tx_cond_hash); + log_it(L_NOTICE, "Trying create input tx cond from tx %s with active receipt", l_hash_str); + DAP_DEL_Z(l_hash_str); + dap_chain_addr_t *l_wallet_addr = dap_chain_wallet_get_addr(l_grace->usage->price->wallet, l_grace->usage->net->pub.id); + int ret_status = 0; + char *l_tx_in_hash_str = dap_chain_mempool_tx_create_cond_input(l_grace->usage->net, &l_grace->usage->tx_cond_hash, l_wallet_addr, + dap_chain_wallet_get_key(l_grace->usage->price->wallet, 0), + l_receipt, "hex", &ret_status); + DAP_DEL_Z(l_wallet_addr); + if (!ret_status) { + dap_chain_hash_fast_from_str(l_tx_in_hash_str, &l_grace->usage->tx_cond_hash); + log_it(L_NOTICE, "Formed tx %s for input with active receipt", l_tx_in_hash_str); + DAP_DELETE(l_tx_in_hash_str); + + }else{ + if(ret_status == DAP_CHAIN_MEMPOOl_RET_STATUS_NOT_ENOUGH){ +// memset(&l_grace->usage->tx_cond_hash, 0, sizeof(l_grace->usage->tx_cond_hash)); +// DAP_DEL_Z(l_grace->usage->receipt_next); + log_it(L_ERROR, "Tx cond have not enough funds"); + dap_chain_net_srv_grace_t* l_grace_new = DAP_NEW_Z(dap_chain_net_srv_grace_t); + // Parse the request + l_grace_new->request = DAP_NEW_Z_SIZE(dap_stream_ch_chain_net_srv_pkt_request_t, sizeof(dap_stream_ch_chain_net_srv_pkt_request_t)); + l_grace_new->request->hdr.net_id = a_grace_item->grace->usage->net->pub.id; + memcpy(l_grace_new->request->hdr.token, a_grace_item->grace->usage->token_ticker, strlen(a_grace_item->grace->usage->token_ticker)); + l_grace_new->request->hdr.srv_uid = a_grace_item->grace->usage->service->uid; + l_grace_new->request->hdr.tx_cond = a_grace_item->grace->usage->tx_cond_hash; + l_grace_new->request_size = sizeof(dap_stream_ch_chain_net_srv_pkt_request_t); + l_grace_new->ch_uuid = a_grace_item->grace->usage->client->ch->uuid; + l_grace_new->stream_worker = a_grace_item->grace->usage->client->ch->stream_worker; + l_grace_new->usage = a_grace_item->grace->usage; + l_grace_new->usage->is_waiting_new_tx_cond = true; + s_grace_period_start(l_grace_new); + + l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_NOT_ENOUGH; + dap_stream_ch_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err)); + + }else{ + log_it(L_ERROR, "Can't create input tx cond transaction!"); + memset(&l_grace->usage->tx_cond_hash, 0, sizeof(l_grace->usage->tx_cond_hash)); + if (l_grace->usage->receipt_next){ + DAP_DEL_Z(l_grace->usage->receipt_next); + } else if (l_grace->usage->receipt){ + DAP_DEL_Z(l_grace->usage->receipt); + } + l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_SERVICE_NOT_FOUND; + dap_stream_ch_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err)); + if (l_grace->usage->service->callbacks.response_error) + l_grace->usage->service->callbacks.response_error(l_grace->usage->service,l_grace->usage->id, l_grace->usage->client,&l_err,sizeof (l_err)); } } } - else if (l_usage) - dap_chain_net_srv_usage_delete(l_srv_session, l_usage); - DAP_DELETE(a_grace->request); - DAP_DELETE(a_grace); + l_grace->usage->is_grace = false; + DAP_DELETE(a_grace_item->grace->request); + DAP_DEL_Z(a_grace_item->grace); + HASH_DEL(s_grace_table, a_grace_item); + DAP_DEL_Z(a_grace_item); return false; } @@ -434,26 +705,27 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch , void* a_arg) log_it( L_WARNING, "Wrong request size, less than minimum"); break; } - dap_chain_net_srv_grace_t *l_grace = DAP_NEW_Z(dap_chain_net_srv_grace_t); - // Parse the request - l_grace->request = DAP_NEW_Z_SIZE(dap_stream_ch_chain_net_srv_pkt_request_t, l_ch_pkt->hdr.data_size); - memcpy(l_grace->request, l_ch_pkt->data, l_ch_pkt->hdr.data_size); - l_ch_chain_net_srv->srv_uid.uint64 = l_grace->request->hdr.srv_uid.uint64; - l_grace->request_size = l_ch_pkt->hdr.data_size; - l_grace->ch_uuid = a_ch->uuid; - l_grace->stream_worker = a_ch->stream_worker; - s_grace_period_control(l_grace); + dap_stream_ch_chain_net_srv_pkt_request_t *l_request = DAP_NEW_Z_SIZE(dap_stream_ch_chain_net_srv_pkt_request_t, l_ch_pkt->hdr.data_size); + memcpy(l_request, l_ch_pkt->data, l_ch_pkt->hdr.data_size); + l_ch_chain_net_srv->srv_uid.uint64 = l_request->hdr.srv_uid.uint64; + s_service_start(a_ch, l_request, l_ch_pkt->hdr.data_size); } break; /* DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_REQUEST */ case DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_SIGN_RESPONSE: { // Check receipt sign and make tx if success - if (l_ch_pkt->hdr.data_size < sizeof(dap_chain_receipt_info_t)) { + dap_chain_net_srv_usage_t * l_usage = l_srv_session->usage_active; + if (l_ch_pkt->hdr.data_size < sizeof(dap_chain_receipt_info_t)) { log_it(L_ERROR, "Wrong sign response size, %u when expected at least %zu with smth", l_ch_pkt->hdr.data_size, sizeof(dap_chain_receipt_info_t)); + if ( l_usage->receipt_next ){ // If we have receipt next + DAP_DEL_Z(l_usage->receipt_next); + }else if (l_usage->receipt ){ // If we sign first receipt + DAP_DEL_Z(l_usage->receipt); + } break; } dap_chain_datum_tx_receipt_t * l_receipt = (dap_chain_datum_tx_receipt_t *) l_ch_pkt->data; size_t l_receipt_size = l_ch_pkt->hdr.data_size; - dap_chain_net_srv_usage_t * l_usage = l_srv_session->usage_active; + bool l_is_found = false; if ( l_usage->receipt_next ){ // If we have receipt next if ( memcmp(&l_usage->receipt_next->receipt_info, &l_receipt->receipt_info,sizeof (l_receipt->receipt_info) )==0 ){ @@ -488,9 +760,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch , void* a_arg) &l_err, sizeof (l_err) ); break; } - int l_tx_out_cond_size =0; - l_tx_out_cond = (dap_chain_tx_out_cond_t *)dap_chain_datum_tx_item_get(l_usage->tx_cond, NULL, - TX_ITEM_TYPE_OUT_COND, &l_tx_out_cond_size ); + l_tx_out_cond = dap_chain_datum_tx_out_cond_get(l_usage->tx_cond, DAP_CHAIN_TX_OUT_COND_SUBTYPE_SRV_PAY, NULL ); if ( ! l_tx_out_cond ){ // No conditioned output l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_NO_COND_OUT ; dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); @@ -519,6 +789,19 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch , void* a_arg) HASH_FIND(hh, l_srv->ban_list, &l_usage->client_pkey_hash, sizeof(dap_chain_hash_fast_t), l_item); pthread_mutex_unlock(&l_srv->banlist_mutex); if (l_item) { // client banned + log_it(L_INFO, "Client pkey is banned!"); + usages_in_grace_t *l_grace_item = NULL; + pthread_mutex_lock(&s_ht_grace_table_mutex); + HASH_FIND(hh, s_grace_table, &l_usage->tx_cond_hash, sizeof(dap_hash_fast_t), l_grace_item); + if (l_grace_item){ + // Stop timer + dap_timerfd_delete_mt(l_grace_item->grace->stream_worker->worker, l_grace_item->grace->timer_es_uuid); + // finish grace + HASH_DEL(s_grace_table, l_grace_item); + DAP_DEL_Z(l_grace_item); + + } + pthread_mutex_unlock(&s_ht_grace_table_mutex); l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_RECEIPT_BANNED_PKEY_HASH ; dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof(l_err)); if (l_usage->service->callbacks.response_error) @@ -579,26 +862,41 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch , void* a_arg) l_grace = DAP_NEW_Z(dap_chain_net_srv_grace_t); UNUSED(l_grace); // Parse the request -// l_grace->request = DAP_NEW_Z_SIZE(dap_stream_ch_chain_net_srv_pkt_request_t, sizeof(dap_stream_ch_chain_net_srv_pkt_request_t)); -// l_grace->request->hdr.net_id = l_usage->net->pub.id; -// memcpy(l_grace->request->hdr.token, l_usage->token_ticker, strlen(l_usage->token_ticker)); -// l_grace->request->hdr.srv_uid = l_usage->service->uid; -// l_grace->request->hdr.tx_cond = l_usage->tx_cond_hash; -// l_ch_chain_net_srv->srv_uid.uint64 = l_grace->request->hdr.srv_uid.uint64; -// l_grace->request_size = l_ch_pkt->hdr.data_size; -// l_grace->ch_uuid = a_ch->uuid; -// l_grace->stream_worker = a_ch->stream_worker; -// s_grace_period_control(l_grace); + l_grace->request = DAP_NEW_Z_SIZE(dap_stream_ch_chain_net_srv_pkt_request_t, sizeof(dap_stream_ch_chain_net_srv_pkt_request_t)); + l_grace->request->hdr.net_id = l_usage->net->pub.id; + memcpy(l_grace->request->hdr.token, l_usage->token_ticker, strlen(l_usage->token_ticker)); + l_grace->request->hdr.srv_uid = l_usage->service->uid; + l_grace->request->hdr.tx_cond = l_usage->tx_cond_hash; + l_ch_chain_net_srv->srv_uid.uint64 = l_grace->request->hdr.srv_uid.uint64; + l_grace->request_size = l_ch_pkt->hdr.data_size; + l_grace->ch_uuid = a_ch->uuid; + l_grace->stream_worker = a_ch->stream_worker; + l_grace->usage = l_usage; + s_grace_period_start(l_grace); + DAP_DELETE(l_tx_in_hash_str); break; case DAP_CHAIN_MEMPOOl_RET_STATUS_NOT_ENOUGH: // TODO send new tx cond request - memset(&l_usage->tx_cond_hash, 0, sizeof(l_usage->tx_cond_hash)); - DAP_DEL_Z(l_usage->receipt_next); log_it(L_ERROR, "Tx cond have not enough funds"); + l_usage->is_waiting_new_tx_cond = true; + l_grace = DAP_NEW_Z(dap_chain_net_srv_grace_t); + // Parse the request + l_grace->request = DAP_NEW_Z_SIZE(dap_stream_ch_chain_net_srv_pkt_request_t, sizeof(dap_stream_ch_chain_net_srv_pkt_request_t)); + l_grace->request->hdr.net_id = l_usage->net->pub.id; + memcpy(l_grace->request->hdr.token, l_usage->token_ticker, strlen(l_usage->token_ticker)); + l_grace->request->hdr.srv_uid = l_usage->service->uid; + l_grace->request->hdr.tx_cond = l_usage->tx_cond_hash; + l_ch_chain_net_srv->srv_uid.uint64 = l_grace->request->hdr.srv_uid.uint64; + l_grace->request_size = l_ch_pkt->hdr.data_size; + l_grace->ch_uuid = a_ch->uuid; + l_grace->stream_worker = a_ch->stream_worker; + l_grace->usage = l_usage; + s_grace_period_start(l_grace); l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_NOT_ENOUGH; dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err)); -// if (l_usage->service->callbacks.response_error) -// l_usage->service->callbacks.response_error(l_usage->service,l_usage->id, l_usage->client,&l_err,sizeof (l_err)); + if (l_usage->service->callbacks.response_error) + l_usage->service->callbacks.response_error(l_usage->service,l_usage->id, l_usage->client,&l_err,sizeof (l_err)); + DAP_DELETE(l_tx_in_hash_str); break; case DAP_CHAIN_MEMPOOL_RET_STATUS_BAD_ARGUMENTS: case DAP_CHAIN_MEMPOOl_RET_STATUS_WRONG_ADDR: @@ -614,16 +912,17 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch , void* a_arg) dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err)); if (l_usage->service->callbacks.response_error) l_usage->service->callbacks.response_error(l_usage->service,l_usage->id, l_usage->client,&l_err,sizeof (l_err)); + DAP_DELETE(l_tx_in_hash_str); break; } - - - break; + if (!l_usage->is_grace) + break; } l_success_size = sizeof(dap_stream_ch_chain_net_srv_pkt_success_hdr_t) + DAP_CHAIN_HASH_FAST_STR_SIZE;//sizeof(dap_chain_hash_fast_t); } else { l_success_size = sizeof(dap_stream_ch_chain_net_srv_pkt_success_hdr_t); } + dap_stream_ch_chain_net_srv_pkt_success_t *l_success = DAP_NEW_STACK_SIZE(dap_stream_ch_chain_net_srv_pkt_success_t, l_success_size); memset(&l_success->hdr, 0, sizeof(l_success->hdr)); @@ -634,8 +933,9 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch , void* a_arg) if (l_usage->is_grace){ char *l_hash_str = dap_hash_fast_to_str_new(&l_usage->tx_cond_hash); - log_it(L_NOTICE, "Receipt is OK, but transaction %s can't be found. Start the grace period for %d seconds", l_hash_str, - l_srv->grace_period); + log_it(L_NOTICE, "Receipt is OK, but tx transaction %s %s. Start the grace period for %d seconds", l_hash_str, + l_usage->is_waiting_new_tx_cond ? "have no enough funds. New tx cond requested": "can't be found", + l_srv->grace_period); DAP_DEL_Z(l_hash_str); }else { char *l_hash_str = dap_hash_fast_to_str_new(&l_usage->tx_cond_hash); @@ -708,10 +1008,51 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch , void* a_arg) dap_chain_net_srv_usage_t * l_usage = NULL; l_usage = l_srv_session->usage_active; dap_stream_ch_chain_net_srv_pkt_request_t* l_responce = (dap_stream_ch_chain_net_srv_pkt_request_t*)l_ch_pkt->data; - l_usage->tx_cond_hash = l_responce->hdr.tx_cond; - char *l_tx_in_hash_str = dap_chain_hash_fast_to_str_new(&l_usage->tx_cond_hash); + + char *l_tx_in_hash_str = dap_chain_hash_fast_to_str_new(&l_responce->hdr.tx_cond); log_it(L_NOTICE, "Received new tx cond %s", l_tx_in_hash_str); DAP_DELETE(l_tx_in_hash_str); + l_usage->is_waiting_new_tx_cond = false; + + usages_in_grace_t *l_curr_grace_item = NULL; + pthread_mutex_lock(&s_ht_grace_table_mutex); + HASH_FIND(hh, s_grace_table, &l_usage->tx_cond_hash, sizeof(dap_hash_fast_t), l_curr_grace_item); + pthread_mutex_unlock(&s_ht_grace_table_mutex); + +// if (dap_hash_fast_compare(&l_responce->hdr.tx_cond, &l_usage->tx_cond_hash) || !l_usage->is_grace){ //check new tx not equals to old tx or tx waiting grace period is over +// // if equals delete receipt and tx and stop grace if running +// if (l_curr_grace_item){ +// HASH_DEL(s_grace_table, l_curr_grace_item); +// dap_timerfd_delete_mt(l_curr_grace_item->grace->stream_worker->worker, l_curr_grace_item->grace->timer_es_uuid); +// } +// break; +// } + + dap_chain_datum_tx_t *l_tx = dap_chain_ledger_tx_find_by_hash(l_usage->net->pub.ledger, &l_responce->hdr.tx_cond); + if (l_tx){ + // Replace + if (l_curr_grace_item){ + log_it(L_INFO, "Found tx in ledger by net tx responce handler. Finish waiting new tx grace period."); + // Stop timer + dap_timerfd_delete_mt(l_curr_grace_item->grace->stream_worker->worker, l_curr_grace_item->grace->timer_es_uuid); + // finish grace + l_usage->tx_cond_hash = l_responce->hdr.tx_cond; + l_curr_grace_item->grace->request->hdr.tx_cond = l_responce->hdr.tx_cond; + s_grace_period_finish(l_curr_grace_item); + } + }else{ + if (l_curr_grace_item){ + l_curr_grace_item->grace->usage->tx_cond_hash = l_responce->hdr.tx_cond; + l_curr_grace_item->grace->request->hdr.tx_cond = l_responce->hdr.tx_cond; + pthread_mutex_lock(&s_ht_grace_table_mutex); + HASH_DEL(s_grace_table, l_curr_grace_item); + l_curr_grace_item->tx_cond_hash = l_responce->hdr.tx_cond; + HASH_ADD(hh, s_grace_table, tx_cond_hash, sizeof(dap_hash_fast_t), l_curr_grace_item); + pthread_mutex_unlock(&s_ht_grace_table_mutex); + } + } + + size_t l_success_size = sizeof (dap_stream_ch_chain_net_srv_pkt_success_hdr_t ); dap_stream_ch_chain_net_srv_pkt_success_t *l_success = DAP_NEW_Z_SIZE(dap_stream_ch_chain_net_srv_pkt_success_t, l_success_size); diff --git a/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.h b/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.h index 53490cc0c59594dc5215bceb4549146935108c6c..649a749c21f00447d3b8d18b9f27801043aa0b1c 100644 --- a/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.h +++ b/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.h @@ -29,6 +29,16 @@ #include "dap_stream_ch_pkt.h" #include "dap_chain_common.h" +#include "dap_chain.h" +#include "dap_chain_datum_tx.h" +#include "dap_chain_datum_tx_in.h" +#include "dap_chain_datum_tx_in_cond.h" +#include "dap_chain_datum_tx_out.h" +#include "dap_chain_datum_tx_out_cond.h" +#include "dap_chain_datum_tx_receipt.h" +#include "dap_chain_mempool.h" +#include "dap_common.h" + typedef struct dap_stream_ch_chain_net_srv dap_stream_ch_chain_net_srv_t; typedef void (*dap_stream_ch_chain_net_srv_callback_packet_t)(dap_stream_ch_chain_net_srv_t *, uint8_t, @@ -46,3 +56,5 @@ typedef struct dap_stream_ch_chain_net_srv { uint8_t dap_stream_ch_chain_net_srv_get_id(); int dap_stream_ch_chain_net_srv_init(void); + +void dap_stream_ch_chain_net_srv_tx_cond_added_cb(void *a_arg, dap_ledger_t *a_ledger, dap_chain_datum_tx_t *a_tx); diff --git a/modules/net/srv/dap_chain_net_srv.c b/modules/net/srv/dap_chain_net_srv.c index 72982d1f9ae47f60ca53d6e33a69e4424bb1924f..7640a9c071ccac787478d48617a887da6e0a6ea7 100644 --- a/modules/net/srv/dap_chain_net_srv.c +++ b/modules/net/srv/dap_chain_net_srv.c @@ -863,6 +863,8 @@ dap_chain_net_srv_t* dap_chain_net_srv_add(dap_chain_net_srv_uid_t a_uid, l_sdata->srv = l_srv; dap_chain_net_srv_parse_pricelist(l_srv, a_config_section); HASH_ADD(hh, s_srv_list, uid, sizeof(l_srv->uid), l_sdata); + if (l_srv->pricelist) + dap_chain_ledger_tx_add_notify(l_srv->pricelist->net->pub.ledger, dap_stream_ch_chain_net_srv_tx_cond_added_cb, NULL); }else{ log_it(L_ERROR, "Already present service with 0x%016"DAP_UINT64_FORMAT_X, a_uid.uint64); } diff --git a/modules/net/srv/dap_chain_net_srv_stream_session.c b/modules/net/srv/dap_chain_net_srv_stream_session.c index 34128a0d38f0abfb4c01c3c734d90449b470ae0d..6df72f5346370ae52134c607fb9f8dd2ce756a84 100644 --- a/modules/net/srv/dap_chain_net_srv_stream_session.c +++ b/modules/net/srv/dap_chain_net_srv_stream_session.c @@ -47,6 +47,22 @@ dap_chain_net_srv_stream_session_t * dap_chain_net_srv_stream_session_create( da return l_session_srv; } +/** + * @brief dap_chain_net_srv_stream_session_create + * @param a_session + * @return + */ +void dap_chain_net_srv_stream_session_delete( dap_stream_session_t * a_session) +{ + if (!a_session){ + log_it (L_ERROR, "Session is NULL!"); + return; + } + dap_chain_net_srv_stream_session_t * l_session_srv = a_session->_inheritor; + dap_chain_net_srv_usage_delete(l_session_srv); +} + + /** * @brief dap_chain_net_srv_usage_add * @param a_srv_session @@ -78,21 +94,20 @@ dap_chain_net_srv_usage_t* dap_chain_net_srv_usage_add (dap_chain_net_srv_stream * @param a_usage * @return */ -void dap_chain_net_srv_usage_delete (dap_chain_net_srv_stream_session_t * a_srv_session, - dap_chain_net_srv_usage_t* a_usage) +void dap_chain_net_srv_usage_delete (dap_chain_net_srv_stream_session_t * a_srv_session) { - if ( a_usage->receipt ) - DAP_DELETE( a_usage->receipt ); - if ( a_usage->client ){ - for (dap_chain_net_srv_client_remote_t * l_srv_client = a_usage->client, * tmp = NULL; l_srv_client; ){ + if ( a_srv_session->usage_active->receipt ) + DAP_DELETE( a_srv_session->usage_active->receipt ); + if ( a_srv_session->usage_active->receipt_next ) + DAP_DELETE( a_srv_session->usage_active->receipt_next ); + if ( a_srv_session->usage_active->client ){ + for (dap_chain_net_srv_client_remote_t * l_srv_client = a_srv_session->usage_active->client, * tmp = NULL; l_srv_client; ){ tmp = l_srv_client; l_srv_client = l_srv_client->next; DAP_DELETE( tmp); } - - } - DAP_DELETE( a_usage ); + DAP_DEL_Z(a_srv_session->usage_active); } /** diff --git a/modules/net/srv/include/dap_chain_net_srv.h b/modules/net/srv/include/dap_chain_net_srv.h index 53db3a75b0f8dbb20b38f303711514b75882822d..08db33f999802ef1582c25ee15c9d3e5ad283568 100755 --- a/modules/net/srv/include/dap_chain_net_srv.h +++ b/modules/net/srv/include/dap_chain_net_srv.h @@ -186,6 +186,7 @@ typedef struct dap_chain_net_srv_grace { dap_stream_worker_t *stream_worker; dap_stream_ch_uuid_t ch_uuid; dap_chain_net_srv_usage_t *usage; + dap_events_socket_uuid_t timer_es_uuid; dap_stream_ch_chain_net_srv_pkt_request_t *request; size_t request_size; } dap_chain_net_srv_grace_t; diff --git a/modules/net/srv/include/dap_chain_net_srv_stream_session.h b/modules/net/srv/include/dap_chain_net_srv_stream_session.h index 6f7a7b0dda5c930ca9557af4378dab2a708c8756..af8bb938713a95f508ce0d14c06441e5276c0d37 100644 --- a/modules/net/srv/include/dap_chain_net_srv_stream_session.h +++ b/modules/net/srv/include/dap_chain_net_srv_stream_session.h @@ -55,6 +55,7 @@ typedef struct dap_chain_net_srv_usage{ bool is_active; bool is_free; bool is_grace; + bool is_waiting_new_tx_cond; // UT_hash_handle hh; // } dap_chain_net_srv_usage_t; @@ -91,9 +92,9 @@ typedef struct dap_chain_net_srv_stream_session { #define DAP_CHAIN_NET_SRV_STREAM_SESSION(a) ((dap_chain_net_srv_stream_session_t *) (a)->_inheritor ) dap_chain_net_srv_stream_session_t * dap_chain_net_srv_stream_session_create( dap_stream_session_t * a_session); +void dap_chain_net_srv_stream_session_delete( dap_stream_session_t * a_session); dap_chain_net_srv_usage_t* dap_chain_net_srv_usage_add (dap_chain_net_srv_stream_session_t * a_srv_session, dap_chain_net_t * a_net, dap_chain_net_srv_t * a_srv); -void dap_chain_net_srv_usage_delete (dap_chain_net_srv_stream_session_t * a_srv_session, - dap_chain_net_srv_usage_t* a_usage); +void dap_chain_net_srv_usage_delete (dap_chain_net_srv_stream_session_t * a_srv_session); dap_chain_net_srv_usage_t* dap_chain_net_srv_usage_find_unsafe (dap_chain_net_srv_stream_session_t * a_srv_session, uint32_t a_usage_id); diff --git a/modules/service/vpn/dap_chain_net_srv_vpn.c b/modules/service/vpn/dap_chain_net_srv_vpn.c index 05683727900ab216fb5522df9842d7ff5f38d032..b2ff5ae724d0851897898eaf962219a03cdd4379 100644 --- a/modules/service/vpn/dap_chain_net_srv_vpn.c +++ b/modules/service/vpn/dap_chain_net_srv_vpn.c @@ -1076,7 +1076,7 @@ void s_ch_vpn_new(dap_stream_ch_t* a_ch, void* a_arg) dap_chain_net_srv_stream_session_t * l_srv_session = (dap_chain_net_srv_stream_session_t *) a_ch->stream->session->_inheritor; - l_srv_vpn->usage_id = l_srv_session->usage_active? l_srv_session->usage_active->id : 0; + l_srv_vpn->usage_id = l_srv_session->usage_active ? l_srv_session->usage_active->id : 0; if( l_srv_vpn->usage_id) { // So complicated to update usage client to be sure that nothing breaks it @@ -1088,7 +1088,6 @@ void s_ch_vpn_new(dap_stream_ch_t* a_ch, void* a_arg) } pthread_rwlock_unlock(&s_clients_rwlock); } - } @@ -1146,6 +1145,7 @@ static void s_ch_vpn_delete(dap_stream_ch_t* a_ch, void* arg) l_ch_vpn->ch = NULL; l_ch_vpn->net_srv = NULL; l_ch_vpn->is_allowed =false; + DAP_DEL_Z(a_ch->internal); } /** @@ -1162,21 +1162,20 @@ static void s_update_limits(dap_stream_ch_t * a_ch , bool l_issue_new_receipt = false; // Check if there are time limits - if (a_usage->is_free) + if (a_usage->is_free || !a_usage->receipt || !a_usage->is_active) return; if (a_usage->receipt->receipt_info.units_type.enm == SERV_UNIT_DAY || a_usage->receipt->receipt_info.units_type.enm == SERV_UNIT_SEC){ time_t l_current_limit_ts = 0; - if ( a_usage->receipt){ - switch( a_usage->receipt->receipt_info.units_type.enm){ + + switch( a_usage->receipt->receipt_info.units_type.enm){ case SERV_UNIT_DAY:{ l_current_limit_ts = (time_t)a_usage->receipt->receipt_info.units*24*3600; } break; case SERV_UNIT_SEC:{ l_current_limit_ts = (time_t)a_usage->receipt->receipt_info.units; } - } } a_srv_session->limits_ts -= time(NULL) - a_srv_session->last_update_ts;