/* * Authors: * Dmitriy Gerasimov <naeper@demlabs.net> * Cellframe https://cellframe.net * DeM Labs Inc. https://demlabs.net * Copyright (c) 2017-2019 * All rights reserved. This file is part of CellFrame SDK the open source project CellFrame SDK is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. CellFrame SDK is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with any CellFrame SDK based project. If not, see <http://www.gnu.org/licenses/>. */ #include <sys/time.h> #include <time.h> #include "dap_global_db.h" #include "dap_time.h" #include "dap_timerfd.h" #include "dap_hash.h" #include "rand/dap_rand.h" #include "dap_chain_net_srv.h" #include "dap_chain_net_srv_stream_session.h" #include "dap_stream.h" #include "dap_stream_ch.h" #include "dap_stream_ch_pkt.h" #include "dap_stream_ch_chain_net_srv.h" #include "dap_stream_ch_chain_net_srv_pkt.h" #include "dap_stream_ch_proc.h" #include "dap_stream_ch_chain_net_srv.h" #define LOG_TAG "dap_stream_ch_chain_net_srv" typedef struct usages_in_grace{ dap_hash_fast_t tx_cond_hash; dap_chain_net_srv_grace_t *grace; UT_hash_handle hh; } usages_in_grace_t; uint8_t dap_stream_ch_chain_net_srv_get_id() { return 'R'; } static void s_stream_ch_new(dap_stream_ch_t* ch , void* arg); static void s_stream_ch_delete(dap_stream_ch_t* ch , void* arg); static void s_stream_ch_packet_in(dap_stream_ch_t* ch , void* arg); static void s_stream_ch_packet_out(dap_stream_ch_t* ch , void* arg); static bool s_unban_client(dap_chain_net_srv_banlist_item_t *a_item); static void s_service_start(dap_stream_ch_t* a_ch , dap_stream_ch_chain_net_srv_pkt_request_t * a_request, size_t a_request_size); static void s_grace_period_start(dap_chain_net_srv_grace_t *a_grace); static bool s_grace_period_finish(usages_in_grace_t *a_grace); static inline void s_grace_error(dap_chain_net_srv_grace_t *a_grace, dap_stream_ch_chain_net_srv_pkt_error_t a_err){ dap_stream_ch_t * l_ch = dap_stream_ch_find_by_uuid_unsafe(a_grace->stream_worker, a_grace->ch_uuid); dap_chain_net_srv_stream_session_t *l_srv_session = l_ch && l_ch->stream && l_ch->stream->session ? (dap_chain_net_srv_stream_session_t *)l_ch->stream->session->_inheritor : NULL; if (!l_srv_session){ DAP_DELETE(a_grace->request); DAP_DELETE(a_grace); return; } a_grace->usage->is_grace = false; if (a_grace->usage->receipt_next){ // If not first grace-period log_it( L_WARNING, "Next receipt is rejected. Waiting until current limits is over."); DAP_DEL_Z(a_grace->usage->receipt_next); memset(&a_grace->usage->tx_cond_hash, 0, sizeof(a_grace->usage->tx_cond_hash)); DAP_DELETE(a_grace->request); DAP_DELETE(a_grace); return; } if (a_err.code) { if(l_ch) dap_stream_ch_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &a_err, sizeof (a_err)); if (a_grace->usage->service && a_grace->usage->service->callbacks.response_error) a_grace->usage->service->callbacks.response_error(a_grace->usage->service, 0, NULL, &a_err, sizeof(a_err)); } if (a_grace->usage) { // add client pkey hash to banlist a_grace->usage->is_active = false; if (a_grace->usage->service) { dap_chain_net_srv_banlist_item_t *l_item = NULL; pthread_mutex_lock(&a_grace->usage->service->banlist_mutex); HASH_FIND(hh, a_grace->usage->service->ban_list, &a_grace->usage->client_pkey_hash, sizeof(dap_chain_hash_fast_t), l_item); if (l_item) pthread_mutex_unlock(&a_grace->usage->service->banlist_mutex); else { l_item = DAP_NEW_Z(dap_chain_net_srv_banlist_item_t); if (!l_item) { log_it(L_ERROR, "Memory allocation error in s_grace_error"); DAP_DELETE(a_grace->request); DAP_DELETE(a_grace); return; } l_item->client_pkey_hash = a_grace->usage->client_pkey_hash; l_item->ht_mutex = &a_grace->usage->service->banlist_mutex; l_item->ht_head = &a_grace->usage->service->ban_list; HASH_ADD(hh, a_grace->usage->service->ban_list, client_pkey_hash, sizeof(dap_chain_hash_fast_t), l_item); pthread_mutex_unlock(&a_grace->usage->service->banlist_mutex); dap_timerfd_start(a_grace->usage->service->grace_period * 1000, (dap_timerfd_callback_t)s_unban_client, l_item); } } } else if (l_srv_session->usage_active) dap_chain_net_srv_usage_delete(l_srv_session); DAP_DELETE(a_grace->request); DAP_DELETE(a_grace); } // TODO: move this to net_srv static usages_in_grace_t * s_grace_table = NULL; static pthread_mutex_t s_ht_grace_table_mutex; /** * @brief dap_stream_ch_chain_net_init * @return */ int dap_stream_ch_chain_net_srv_init(void) { log_it(L_NOTICE,"Chain network services channel initialized"); dap_stream_ch_proc_add(dap_stream_ch_chain_net_srv_get_id(),s_stream_ch_new,s_stream_ch_delete,s_stream_ch_packet_in,s_stream_ch_packet_out); pthread_mutex_init(&s_ht_grace_table_mutex, NULL); return 0; } /** * @brief dap_stream_ch_chain_deinit */ void dap_stream_ch_chain_net_srv_deinit(void) { } /** * @brief s_stream_ch_new * @param a_ch * @param arg */ void s_stream_ch_new(dap_stream_ch_t* a_ch , void* arg) { (void ) arg; a_ch->internal=DAP_NEW_Z(dap_stream_ch_chain_net_srv_t); dap_stream_ch_chain_net_srv_t * l_ch_chain_net_srv = DAP_STREAM_CH_CHAIN_NET_SRV(a_ch); l_ch_chain_net_srv->ch_uuid = a_ch->uuid; l_ch_chain_net_srv->ch = a_ch; if (a_ch->stream->session && !a_ch->stream->session->_inheritor) dap_chain_net_srv_stream_session_create( a_ch->stream->session ); else if ( a_ch->stream->session == NULL) log_it( L_ERROR, "No session at all!"); else log_it(L_ERROR, "Session inheritor is already present!"); dap_chain_net_srv_call_opened_all( a_ch); } /** * @brief s_stream_ch_delete * @param ch * @param arg */ void s_stream_ch_delete(dap_stream_ch_t* a_ch , void* a_arg) { (void) a_ch; (void) a_arg; log_it(L_DEBUG, "Stream ch chain net srv delete"); dap_chain_net_srv_call_closed_all( a_ch); if (a_ch->stream->session && a_ch->stream->session->_inheritor) dap_chain_net_srv_stream_session_delete( a_ch->stream->session ); DAP_DEL_Z(a_ch->internal); } static bool s_unban_client(dap_chain_net_srv_banlist_item_t *a_item) { pthread_mutex_lock(a_item->ht_mutex); HASH_DEL(*(a_item->ht_head), a_item); pthread_mutex_unlock(a_item->ht_mutex); DAP_DELETE(a_item); return false; } void dap_stream_ch_chain_net_srv_tx_cond_added_cb(void *a_arg, dap_ledger_t *a_ledger, dap_chain_datum_tx_t *a_tx) { UNUSED(a_ledger); UNUSED(a_arg); // TODO: 1. Get net_srv by srv_uid from tx_cond // 2. Get usages in grace HT from service usages_in_grace_t *l_item = NULL; dap_hash_fast_t tx_cond_hash = {}; dap_hash_fast((void*)a_tx, dap_chain_datum_tx_get_size(a_tx), &tx_cond_hash); pthread_mutex_lock(&s_ht_grace_table_mutex); HASH_FIND(hh, s_grace_table, &tx_cond_hash, sizeof(dap_hash_fast_t), l_item); if (l_item){ log_it(L_INFO, "Found tx in ledger by notify. Finish grace."); // Stop timer dap_timerfd_delete_mt(l_item->grace->stream_worker->worker, l_item->grace->timer_es_uuid); // finish grace s_grace_period_finish(l_item); } pthread_mutex_unlock(&s_ht_grace_table_mutex); } static void s_service_start(dap_stream_ch_t* a_ch , dap_stream_ch_chain_net_srv_pkt_request_t * a_request, size_t a_request_size) { assert(a_ch); dap_stream_ch_chain_net_srv_pkt_error_t l_err; memset(&l_err, 0, sizeof(l_err)); dap_chain_net_srv_t * l_srv = NULL; dap_chain_net_srv_stream_session_t *l_srv_session = a_ch->stream && a_ch->stream->session ? (dap_chain_net_srv_stream_session_t *)a_ch->stream->session->_inheritor : NULL; l_srv = dap_chain_net_srv_get( a_request->hdr.srv_uid ); dap_chain_net_t * l_net = dap_chain_net_by_id( a_request->hdr.net_id ); l_err.net_id.uint64 = a_request->hdr.net_id.uint64; l_err.srv_uid.uint64 = a_request->hdr.srv_uid.uint64; if ( ! l_net ) // Network not found l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_NETWORK_NOT_FOUND; if ( ! l_srv ) // Service not found l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_SERVICE_NOT_FOUND; if ( l_err.code || !l_srv_session){ if(a_ch) dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err)); if (l_srv && l_srv->callbacks.response_error) l_srv->callbacks.response_error(l_srv, 0, NULL, &l_err, sizeof(l_err)); return; } dap_chain_net_srv_usage_t *l_usage = NULL; l_usage = dap_chain_net_srv_usage_add(l_srv_session, l_net, l_srv); if ( !l_usage ){ // Usage can't add log_it( L_WARNING, "Can't add usage"); l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_CANT_ADD_USAGE; if(a_ch) dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err)); if (l_srv && l_srv->callbacks.response_error) l_srv->callbacks.response_error(l_srv, 0, NULL, &l_err, sizeof(l_err)); return; } l_err.usage_id = l_usage->id; // Create one client l_usage->client = DAP_NEW_Z( dap_chain_net_srv_client_remote_t); if (!l_usage->client) { log_it(L_ERROR, "Memory allocation errror in s_service_start"); l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_ALLOC_MEMORY_ERROR; if(a_ch) dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err)); if (l_srv && l_srv->callbacks.response_error) l_srv->callbacks.response_error(l_srv, 0, NULL, &l_err, sizeof(l_err)); DAP_DEL_Z(l_usage); return; } l_usage->client->stream_worker = a_ch->stream_worker; l_usage->client->ch = a_ch; l_usage->client->session_id = a_ch->stream->session->id; l_usage->client->ts_created = time(NULL); l_usage->tx_cond_hash = a_request->hdr.tx_cond; l_usage->ts_created = time(NULL); l_usage->net = l_net; l_usage->service = l_srv; dap_chain_net_srv_grace_t *l_grace = DAP_NEW_Z(dap_chain_net_srv_grace_t); if (!l_grace) { log_it(L_ERROR, "Memory allocation errror in s_service_start"); l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_ALLOC_MEMORY_ERROR; if(a_ch) dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err)); if (l_srv && l_srv->callbacks.response_error) l_srv->callbacks.response_error(l_srv, 0, NULL, &l_err, sizeof(l_err)); DAP_DEL_Z(l_usage->client); DAP_DEL_Z(l_usage); return; } l_grace->request = DAP_NEW_Z_SIZE(dap_stream_ch_chain_net_srv_pkt_request_t, a_request_size); if (!l_grace->request) { log_it(L_ERROR, "Memory allocation errror in s_service_start"); l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_ALLOC_MEMORY_ERROR; if(a_ch) dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err)); if (l_srv && l_srv->callbacks.response_error) l_srv->callbacks.response_error(l_srv, 0, NULL, &l_err, sizeof(l_err)); DAP_DEL_Z(l_usage->client); DAP_DEL_Z(l_usage); DAP_DEL_Z(l_grace); return; } memcpy(l_grace->request, a_request, a_request_size); l_grace->request_size = a_request_size; l_grace->ch_uuid = a_ch->uuid; l_grace->stream_worker = a_ch->stream_worker; l_grace->usage = l_usage; if (l_srv->pricelist){ // not free service s_grace_period_start(l_grace); } else { // Start service for free log_it( L_INFO, "Service provide for free"); l_grace->usage->is_free = true; size_t l_success_size = sizeof (dap_stream_ch_chain_net_srv_pkt_success_hdr_t ); dap_stream_ch_chain_net_srv_pkt_success_t *l_success = DAP_NEW_Z_SIZE(dap_stream_ch_chain_net_srv_pkt_success_t, l_success_size); if(!l_success) { log_it(L_ERROR, "Memory allocation error in s_service_start"); l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_ALLOC_MEMORY_ERROR; if(a_ch) dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err)); if (l_srv && l_srv->callbacks.response_error) l_srv->callbacks.response_error(l_srv, 0, NULL, &l_err, sizeof(l_err)); DAP_DEL_Z(l_usage->client); DAP_DEL_Z(l_usage); DAP_DEL_Z(l_grace->request); DAP_DEL_Z(l_grace); return; } l_success->hdr.usage_id = l_usage->id; l_success->hdr.net_id.uint64 = l_usage->net->pub.id.uint64; l_success->hdr.srv_uid.uint64 = l_usage->service->uid.uint64; dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_SUCCESS, l_success, l_success_size); if (l_usage->service->callbacks.response_success) l_usage->service->callbacks.response_success(l_usage->service, l_usage->id, l_usage->client, NULL, 0); DAP_DELETE(l_success); } return; } static void s_grace_period_start(dap_chain_net_srv_grace_t *a_grace) { assert(a_grace); dap_stream_ch_chain_net_srv_pkt_error_t l_err; memset(&l_err, 0, sizeof(l_err)); dap_stream_ch_t *l_ch = dap_stream_ch_find_by_uuid_unsafe(a_grace->stream_worker, a_grace->ch_uuid); if (!l_ch){ s_grace_error(a_grace, l_err); return; } dap_chain_net_t * l_net = a_grace->usage->net; l_err.net_id.uint64 = l_net->pub.id.uint64; l_err.srv_uid.uint64 = a_grace->usage->service->uid.uint64; dap_ledger_t * l_ledger = l_net->pub.ledger; dap_chain_datum_tx_t * l_tx = NULL; dap_chain_tx_out_cond_t * l_tx_out_cond = NULL; dap_chain_net_srv_price_t * l_price = NULL; if ( !l_ledger ){ // No ledger log_it( L_WARNING, "No Ledger"); l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_NETWORK_NO_LEDGER ; s_grace_error(a_grace, l_err); return; } l_tx = a_grace->usage->is_waiting_new_tx_cond ? NULL : dap_chain_ledger_tx_find_by_hash(l_ledger, &a_grace->usage->tx_cond_hash); if ( ! l_tx ){ // No tx cond transaction, start grace-period a_grace->usage->is_grace = true; l_price = DAP_NEW_Z(dap_chain_net_srv_price_t); if (!l_price) { log_it(L_ERROR, "Memory allocation error in s_grace_period_start"); s_grace_error(a_grace, l_err); return; } memcpy(l_price, a_grace->usage->service->pricelist, sizeof(*l_price)); a_grace->usage->price = l_price; if (!a_grace->usage->receipt){ a_grace->usage->receipt = dap_chain_net_srv_issue_receipt(a_grace->usage->service, a_grace->usage->price, NULL, 0); dap_stream_ch_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_SIGN_REQUEST, a_grace->usage->receipt, a_grace->usage->receipt->size); } usages_in_grace_t *l_item = DAP_NEW_Z_SIZE(usages_in_grace_t, sizeof(usages_in_grace_t)); if (!l_item) { log_it(L_ERROR, "Memory allocation error in s_grace_period_start"); DAP_DEL_Z(l_price); s_grace_error(a_grace, l_err); return; } l_item->grace = a_grace; l_item->tx_cond_hash = a_grace->usage->tx_cond_hash; pthread_mutex_lock(&s_ht_grace_table_mutex); HASH_ADD(hh, s_grace_table, tx_cond_hash, sizeof(dap_hash_fast_t), l_item); pthread_mutex_unlock(&s_ht_grace_table_mutex); a_grace->timer_es_uuid = dap_timerfd_start_on_worker(a_grace->stream_worker->worker, a_grace->usage->service->grace_period * 1000, (dap_timerfd_callback_t)s_grace_period_finish, l_item)->esocket_uuid; } else { // Start srvice in normal pay mode a_grace->usage->tx_cond = l_tx; l_tx_out_cond = dap_chain_datum_tx_out_cond_get(l_tx, DAP_CHAIN_TX_OUT_COND_SUBTYPE_SRV_PAY, NULL ); if ( ! l_tx_out_cond ) { // No conditioned output log_it( L_WARNING, "No conditioned output"); l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_NO_COND_OUT ; s_grace_error(a_grace, l_err); return; } // Check cond output if it equesl or not to request if (!dap_chain_net_srv_uid_compare(l_tx_out_cond->header.srv_uid, a_grace->usage->service->uid)) { log_it( L_WARNING, "Wrong service uid in request, tx expect to close its output with 0x%016"DAP_UINT64_FORMAT_X, l_tx_out_cond->header.srv_uid.uint64 ); l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_WRONG_SRV_UID ; s_grace_error(a_grace, l_err); return; } const char * l_ticker = NULL; l_ticker = dap_chain_ledger_tx_get_token_ticker_by_hash(l_ledger, &a_grace->usage->tx_cond_hash); dap_stpcpy(a_grace->usage->token_ticker, l_ticker); dap_chain_net_srv_price_t *l_price_tmp; DL_FOREACH(a_grace->usage->service->pricelist, l_price_tmp) { if (l_price_tmp && l_price_tmp->net->pub.id.uint64 == a_grace->usage->net->pub.id.uint64 && dap_strcmp(l_price_tmp->token, l_ticker) == 0 && l_price_tmp->units_uid.enm == l_tx_out_cond->subtype.srv_pay.unit.enm) { uint256_t l_unit_price = {}; if (l_price_tmp->units != 0){ DIV_256(l_price_tmp->value_datoshi, GET_256_FROM_64(l_price_tmp->units), &l_unit_price); } else { break; } if(!compare256(uint256_0, l_tx_out_cond->subtype.srv_pay.unit_price_max_datoshi) || compare256(l_unit_price, l_tx_out_cond->subtype.srv_pay.unit_price_max_datoshi) <= 0){ l_price = l_price_tmp; break; } } } if ( !l_price ) { log_it( L_WARNING, "Request can't be processed because no acceptable price in pricelist for token %s in network %s", l_ticker, l_net->pub.name ); l_err.code =DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_NOT_ACCEPT_TOKEN; s_grace_error(a_grace, l_err); return; } a_grace->usage->price = l_price; int ret; if ((ret = a_grace->usage->service->callbacks.requested(a_grace->usage->service, a_grace->usage->id, a_grace->usage->client, a_grace->request, a_grace->request_size)) != 0) { log_it( L_WARNING, "Request canceled by service callback, return code %d", ret); l_err.code = (uint32_t) ret ; s_grace_error(a_grace, l_err); return; } if (a_grace->usage->receipt_next){ DAP_DEL_Z(a_grace->usage->receipt_next); a_grace->usage->receipt_next = dap_chain_net_srv_issue_receipt(a_grace->usage->service, a_grace->usage->price, NULL, 0); }else{ a_grace->usage->receipt = dap_chain_net_srv_issue_receipt(a_grace->usage->service, a_grace->usage->price, NULL, 0); dap_stream_ch_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_SIGN_REQUEST, a_grace->usage->receipt, a_grace->usage->receipt->size); } DAP_DELETE(a_grace->request); DAP_DELETE(a_grace); } } static bool s_grace_period_finish(usages_in_grace_t *a_grace_item) { assert(a_grace_item); dap_stream_ch_chain_net_srv_pkt_error_t l_err; memset(&l_err, 0, sizeof(l_err)); dap_chain_net_srv_grace_t *l_grace = a_grace_item->grace; dap_stream_ch_t *l_ch = dap_stream_ch_find_by_uuid_unsafe(l_grace->stream_worker, l_grace->ch_uuid); if (!l_ch){ s_grace_error(l_grace, l_err); HASH_DEL(s_grace_table, a_grace_item); DAP_DEL_Z(a_grace_item); return false; } if (l_grace->usage->price && !l_grace->usage->receipt_next){ // if first grace delete price and set actual DAP_DEL_Z(l_grace->usage->price); } if (l_grace->usage->is_waiting_new_tx_cond){ log_it(L_INFO, "No new tx cond!"); s_grace_error(l_grace, l_err); l_grace->usage->is_waiting_new_tx_cond = false; HASH_DEL(s_grace_table, a_grace_item); DAP_DEL_Z(a_grace_item); return false; } dap_chain_net_t * l_net = l_grace->usage->net; l_err.net_id.uint64 = l_net->pub.id.uint64; l_err.srv_uid.uint64 = l_grace->usage->service->uid.uint64; dap_ledger_t * l_ledger = l_net->pub.ledger; dap_chain_datum_tx_t * l_tx = NULL; dap_chain_tx_out_cond_t * l_tx_out_cond = NULL; if ( !l_ledger ){ // No ledger log_it( L_WARNING, "No Ledger"); l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_NETWORK_NO_LEDGER ; s_grace_error(l_grace, l_err); HASH_DEL(s_grace_table, a_grace_item); DAP_DEL_Z(a_grace_item); return false; } log_it(L_INFO, "Grace period is over! Check tx in ledger."); l_tx = dap_chain_ledger_tx_find_by_hash(l_ledger, &l_grace->usage->tx_cond_hash); if ( ! l_tx ){ // No tx cond transaction, start grace-period log_it( L_WARNING, "No tx cond transaction"); l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_NOT_FOUND ; s_grace_error(l_grace, l_err); HASH_DEL(s_grace_table, a_grace_item); DAP_DEL_Z(a_grace_item); return false; } else { // Start srvice in normal pay mode log_it(L_INFO, "Tx is found in ledger."); l_grace->usage->tx_cond = l_tx; l_tx_out_cond = dap_chain_datum_tx_out_cond_get(l_tx, DAP_CHAIN_TX_OUT_COND_SUBTYPE_SRV_PAY, NULL ); if ( ! l_tx_out_cond ) { // No conditioned output log_it( L_WARNING, "No conditioned output"); l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_NO_COND_OUT ; s_grace_error(l_grace, l_err); HASH_DEL(s_grace_table, a_grace_item); DAP_DEL_Z(a_grace_item); return false; } // Check cond output if it equesl or not to request if (!dap_chain_net_srv_uid_compare(l_tx_out_cond->header.srv_uid, l_grace->usage->service->uid)) { log_it( L_WARNING, "Wrong service uid in request, tx expect to close its output with 0x%016"DAP_UINT64_FORMAT_X, l_tx_out_cond->header.srv_uid.uint64 ); l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_WRONG_SRV_UID ; s_grace_error(l_grace, l_err); HASH_DEL(s_grace_table, a_grace_item); DAP_DEL_Z(a_grace_item); return false; } dap_chain_net_srv_price_t * l_price = NULL; const char * l_ticker = NULL; l_ticker = dap_chain_ledger_tx_get_token_ticker_by_hash(l_ledger, &l_grace->usage->tx_cond_hash); dap_stpcpy(l_grace->usage->token_ticker, l_ticker); dap_chain_net_srv_price_t *l_price_tmp; DL_FOREACH(l_grace->usage->service->pricelist, l_price_tmp) { if (l_price_tmp && l_price_tmp->net->pub.id.uint64 == l_grace->usage->net->pub.id.uint64 && dap_strcmp(l_price_tmp->token, l_ticker) == 0 && l_price_tmp->units_uid.enm == l_tx_out_cond->subtype.srv_pay.unit.enm) { uint256_t l_unit_price = {}; if (l_price_tmp->units != 0){ DIV_256(l_price_tmp->value_datoshi, GET_256_FROM_64(l_price_tmp->units), &l_unit_price); } else { break; } if(!compare256(uint256_0, l_tx_out_cond->subtype.srv_pay.unit_price_max_datoshi) || compare256(l_unit_price, l_tx_out_cond->subtype.srv_pay.unit_price_max_datoshi) <= 0){ l_price = l_price_tmp; break; } } } if ( !l_price ) { log_it( L_WARNING, "Request can't be processed because no acceptable price in pricelist for token %s in network %s", l_ticker, l_net->pub.name ); l_err.code =DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_NOT_ACCEPT_TOKEN; s_grace_error(l_grace, l_err); HASH_DEL(s_grace_table, a_grace_item); DAP_DEL_Z(a_grace_item); return false; } l_grace->usage->price = l_price; int ret; if ((ret = l_grace->usage->service->callbacks.requested(l_grace->usage->service, l_grace->usage->id, l_grace->usage->client, l_grace->request, l_grace->request_size)) != 0) { log_it( L_WARNING, "Request canceled by service callback, return code %d", ret); l_err.code = (uint32_t) ret ; s_grace_error(l_grace, l_err); HASH_DEL(s_grace_table, a_grace_item); DAP_DEL_Z(a_grace_item); return false; } // make receipt or tx char *l_receipt_hash_str; dap_chain_datum_tx_receipt_t *l_receipt = NULL; if (l_grace->usage->receipt_next){ l_receipt = l_grace->usage->receipt_next; } else if (l_grace->usage->receipt){ l_receipt = l_grace->usage->receipt; } else { // Send error??? } size_t l_receipt_size = l_receipt->size; // get a second signature - from the client (first sign in server, second sign in client) dap_sign_t * l_receipt_sign = dap_chain_datum_tx_receipt_sign_get( l_receipt, l_receipt_size, 1); if ( ! l_receipt_sign ){ log_it(L_WARNING, "Tx already in chain, but receipt is not signed by client. Finish grace and wait receipt sign responce."); s_grace_error(l_grace, l_err); HASH_DEL(s_grace_table, a_grace_item); DAP_DEL_Z(a_grace_item); return false; } dap_get_data_hash_str_static(l_receipt, l_receipt_size, l_receipt_hash_str); dap_global_db_set("local.receipts", l_receipt_hash_str, l_receipt, l_receipt_size, false, NULL, NULL); // Form input transaction char *l_hash_str = dap_hash_fast_to_str_new(&l_grace->usage->tx_cond_hash); log_it(L_NOTICE, "Trying create input tx cond from tx %s with active receipt", l_hash_str); DAP_DEL_Z(l_hash_str); dap_chain_addr_t *l_wallet_addr = dap_chain_wallet_get_addr(l_grace->usage->price->wallet, l_grace->usage->net->pub.id); int ret_status = 0; char *l_tx_in_hash_str = dap_chain_mempool_tx_create_cond_input(l_grace->usage->net, &l_grace->usage->tx_cond_hash, l_wallet_addr, dap_chain_wallet_get_key(l_grace->usage->price->wallet, 0), l_receipt, "hex", &ret_status); DAP_DEL_Z(l_wallet_addr); if (!ret_status) { dap_chain_hash_fast_from_str(l_tx_in_hash_str, &l_grace->usage->tx_cond_hash); log_it(L_NOTICE, "Formed tx %s for input with active receipt", l_tx_in_hash_str); DAP_DELETE(l_tx_in_hash_str); }else{ if(ret_status == DAP_CHAIN_MEMPOOl_RET_STATUS_NOT_ENOUGH){ // memset(&l_grace->usage->tx_cond_hash, 0, sizeof(l_grace->usage->tx_cond_hash)); // DAP_DEL_Z(l_grace->usage->receipt_next); log_it(L_ERROR, "Tx cond have not enough funds"); dap_chain_net_srv_grace_t* l_grace_new = DAP_NEW_Z(dap_chain_net_srv_grace_t); if (!l_grace_new) { log_it(L_ERROR, "Memory allocation error in s_grace_period_finish"); DAP_DELETE(a_grace_item->grace->request); DAP_DEL_Z(a_grace_item->grace); HASH_DEL(s_grace_table, a_grace_item); DAP_DEL_Z(a_grace_item); return false; } // Parse the request l_grace_new->request = DAP_NEW_Z_SIZE(dap_stream_ch_chain_net_srv_pkt_request_t, sizeof(dap_stream_ch_chain_net_srv_pkt_request_t)); if (!l_grace_new->request) { log_it(L_ERROR, "Memory allocation error in s_grace_period_finish"); DAP_DELETE(a_grace_item->grace->request); DAP_DEL_Z(a_grace_item->grace); HASH_DEL(s_grace_table, a_grace_item); DAP_DEL_Z(a_grace_item); return false; } l_grace_new->request->hdr.net_id = a_grace_item->grace->usage->net->pub.id; memcpy(l_grace_new->request->hdr.token, a_grace_item->grace->usage->token_ticker, strlen(a_grace_item->grace->usage->token_ticker)); l_grace_new->request->hdr.srv_uid = a_grace_item->grace->usage->service->uid; l_grace_new->request->hdr.tx_cond = a_grace_item->grace->usage->tx_cond_hash; l_grace_new->request_size = sizeof(dap_stream_ch_chain_net_srv_pkt_request_t); l_grace_new->ch_uuid = a_grace_item->grace->usage->client->ch->uuid; l_grace_new->stream_worker = a_grace_item->grace->usage->client->ch->stream_worker; l_grace_new->usage = a_grace_item->grace->usage; l_grace_new->usage->is_waiting_new_tx_cond = true; s_grace_period_start(l_grace_new); l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_NOT_ENOUGH; dap_stream_ch_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err)); }else{ log_it(L_ERROR, "Can't create input tx cond transaction!"); memset(&l_grace->usage->tx_cond_hash, 0, sizeof(l_grace->usage->tx_cond_hash)); if (l_grace->usage->receipt_next){ DAP_DEL_Z(l_grace->usage->receipt_next); } else if (l_grace->usage->receipt){ DAP_DEL_Z(l_grace->usage->receipt); } l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_SERVICE_NOT_FOUND; dap_stream_ch_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err)); if (l_grace->usage->service->callbacks.response_error) l_grace->usage->service->callbacks.response_error(l_grace->usage->service,l_grace->usage->id, l_grace->usage->client,&l_err,sizeof (l_err)); } } } l_grace->usage->is_grace = false; DAP_DELETE(a_grace_item->grace->request); DAP_DEL_Z(a_grace_item->grace); HASH_DEL(s_grace_table, a_grace_item); DAP_DEL_Z(a_grace_item); return false; } /** * @brief s_stream_ch_packet_in * @param ch * @param arg */ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch , void* a_arg) { dap_stream_ch_pkt_t *l_ch_pkt = (dap_stream_ch_pkt_t *)a_arg; if (!l_ch_pkt) return; dap_chain_net_srv_stream_session_t *l_srv_session = NULL; if (a_ch) { l_srv_session = a_ch->stream && a_ch->stream->session ? a_ch->stream->session->_inheritor : NULL; } if (!l_srv_session) { log_it( L_ERROR, "Not defined service session, switching off packet input process"); dap_stream_ch_set_ready_to_read_unsafe(a_ch, false); return; } dap_stream_ch_chain_net_srv_t * l_ch_chain_net_srv = DAP_STREAM_CH_CHAIN_NET_SRV(a_ch); if (l_ch_chain_net_srv->notify_callback) { l_ch_chain_net_srv->notify_callback(l_ch_chain_net_srv, l_ch_pkt->hdr.type, l_ch_pkt, l_ch_chain_net_srv->notify_callback_arg); return; // It's a client behind this } dap_stream_ch_chain_net_srv_pkt_error_t l_err = { }; switch (l_ch_pkt->hdr.type) { case DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_CHECK_REQUEST: { typedef dap_stream_ch_chain_net_srv_pkt_test_t pkt_test_t; pkt_test_t *l_request = (pkt_test_t*)l_ch_pkt->data; size_t l_request_size = l_request->data_size + sizeof(pkt_test_t); if (l_ch_pkt->hdr.data_size != l_request_size) { log_it(L_WARNING, "Wrong request size %u, must be %zu [pkt seq %"DAP_UINT64_FORMAT_U"]", l_ch_pkt->hdr.data_size, l_request_size, l_ch_pkt->hdr.seq_id); l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_WRONG_SIZE; dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof(l_err)); break; } dap_chain_hash_fast_t l_data_hash; dap_hash_fast(l_request->data, l_request->data_size, &l_data_hash); if (l_request->data_size > 0 && !dap_hash_fast_compare(&l_data_hash, &l_request->data_hash)) { log_it(L_WARNING, "Wrong hash [pkt seq %"DAP_UINT64_FORMAT_U"]", l_ch_pkt->hdr.seq_id); l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_WRONG_HASH; dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof(l_err)); break; } if(l_request->data_size_recv > UINT_MAX) { log_it(L_WARNING, "Too large payload %zu [pkt seq %"DAP_UINT64_FORMAT_U"]", l_request->data_size_recv, l_ch_pkt->hdr.seq_id); l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_BIG_SIZE; dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof(l_err)); break; } /* No need for bare copying, resend it back modified */ if (l_request->data_size_recv) { l_request->data_size = l_request->data_size_recv; if (!l_request->data_size_send){ l_request = (pkt_test_t*)DAP_DUP_SIZE(l_request, sizeof(pkt_test_t)); l_request = DAP_REALLOC(l_request, sizeof(pkt_test_t) + l_request->data_size); } randombytes(l_request->data, l_request->data_size); dap_hash_fast_t l_data_hash; dap_hash_fast(l_request->data, l_request->data_size, &l_data_hash); l_request->data_hash = l_data_hash; } l_request->err_code = 0; strncpy(l_request->ip_send, a_ch->stream->esocket->hostaddr, INET_ADDRSTRLEN - 1); l_request->ip_send[INET_ADDRSTRLEN - 1] = '\0'; // Compiler warning escape l_request->recv_time2 = dap_nanotime_now(); dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_CHECK_RESPONSE, l_request, l_request->data_size + sizeof(pkt_test_t)); if(l_request != (pkt_test_t*)l_ch_pkt->data){ DAP_DELETE(l_request); } } break; /* DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_CHECK_REQUEST */ case DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_REQUEST: { //Service request if (l_ch_pkt->hdr.data_size < sizeof(dap_stream_ch_chain_net_srv_pkt_request_hdr_t) ){ log_it( L_WARNING, "Wrong request size, less than minimum"); break; } dap_stream_ch_chain_net_srv_pkt_request_t *l_request = DAP_NEW_Z_SIZE(dap_stream_ch_chain_net_srv_pkt_request_t, l_ch_pkt->hdr.data_size); if (!l_request) { log_it(L_ERROR, "Memory allocation error in s_stream_ch_packet_in"); return; } memcpy(l_request, l_ch_pkt->data, l_ch_pkt->hdr.data_size); l_ch_chain_net_srv->srv_uid.uint64 = l_request->hdr.srv_uid.uint64; s_service_start(a_ch, l_request, l_ch_pkt->hdr.data_size); } break; /* DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_REQUEST */ case DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_SIGN_RESPONSE: { // Check receipt sign and make tx if success dap_chain_net_srv_usage_t * l_usage = l_srv_session->usage_active; if (l_ch_pkt->hdr.data_size < sizeof(dap_chain_receipt_info_t)) { log_it(L_ERROR, "Wrong sign response size, %u when expected at least %zu with smth", l_ch_pkt->hdr.data_size, sizeof(dap_chain_receipt_info_t)); if ( l_usage->receipt_next ){ // If we have receipt next DAP_DEL_Z(l_usage->receipt_next); }else if (l_usage->receipt ){ // If we sign first receipt DAP_DEL_Z(l_usage->receipt); } break; } dap_chain_datum_tx_receipt_t * l_receipt = (dap_chain_datum_tx_receipt_t *) l_ch_pkt->data; size_t l_receipt_size = l_ch_pkt->hdr.data_size; bool l_is_found = false; if ( l_usage->receipt_next ){ // If we have receipt next if ( memcmp(&l_usage->receipt_next->receipt_info, &l_receipt->receipt_info,sizeof (l_receipt->receipt_info) )==0 ){ l_is_found = true; } }else if (l_usage->receipt ){ // If we sign first receipt if ( memcmp(&l_usage->receipt->receipt_info, &l_receipt->receipt_info,sizeof (l_receipt->receipt_info) )==0 ){ l_is_found = true; } } if ( !l_is_found || ! l_usage ){ log_it(L_WARNING, "Can't find receipt in usages thats equal to response receipt"); l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_RECEIPT_CANT_FIND; dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); if (l_usage && l_usage->service && l_usage->service->callbacks.response_error) l_usage->service->callbacks.response_error(l_usage->service,l_usage->id, l_usage->client,&l_err,sizeof (l_err) ); break; } l_err.usage_id = l_usage->id; l_err.net_id.uint64 = l_usage->net->pub.id.uint64; l_err.srv_uid.uint64 = l_usage->service->uid.uint64; dap_chain_tx_out_cond_t *l_tx_out_cond = NULL; if (!l_usage->is_grace) { if (! l_usage->tx_cond ){ log_it(L_WARNING, "No tx out in usage"); l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_NOT_FOUND ; dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); if (l_usage->service->callbacks.response_error) l_usage->service->callbacks.response_error( l_usage->service, l_usage->id, l_usage->client, &l_err, sizeof (l_err) ); break; } l_tx_out_cond = dap_chain_datum_tx_out_cond_get(l_usage->tx_cond, DAP_CHAIN_TX_OUT_COND_SUBTYPE_SRV_PAY, NULL ); if ( ! l_tx_out_cond ){ // No conditioned output l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_NO_COND_OUT ; dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); if (l_usage->service->callbacks.response_error) l_usage->service->callbacks.response_error( l_usage->service, l_usage->id, l_usage->client,&l_err,sizeof (l_err) ); break; } } // get a second signature - from the client (first sign in server, second sign in client) dap_sign_t * l_receipt_sign = dap_chain_datum_tx_receipt_sign_get( l_receipt, l_receipt_size, 1); if ( ! l_receipt_sign ){ l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_RECEIPT_CANT_FIND ; dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); if (l_usage->service->callbacks.response_error) l_usage->service->callbacks.response_error( l_usage->service, l_usage->id, l_usage->client, &l_err, sizeof (l_err) ); break; } // Check receipt signature pkey hash dap_chain_net_srv_banlist_item_t *l_item = NULL; dap_chain_net_srv_t * l_srv = dap_chain_net_srv_get(l_receipt->receipt_info.srv_uid); dap_sign_get_pkey_hash(l_receipt_sign, &l_usage->client_pkey_hash); if (l_usage->is_grace) { pthread_mutex_lock(&l_srv->banlist_mutex); HASH_FIND(hh, l_srv->ban_list, &l_usage->client_pkey_hash, sizeof(dap_chain_hash_fast_t), l_item); pthread_mutex_unlock(&l_srv->banlist_mutex); if (l_item) { // client banned log_it(L_INFO, "Client pkey is banned!"); usages_in_grace_t *l_grace_item = NULL; pthread_mutex_lock(&s_ht_grace_table_mutex); HASH_FIND(hh, s_grace_table, &l_usage->tx_cond_hash, sizeof(dap_hash_fast_t), l_grace_item); if (l_grace_item){ // Stop timer dap_timerfd_delete_mt(l_grace_item->grace->stream_worker->worker, l_grace_item->grace->timer_es_uuid); // finish grace HASH_DEL(s_grace_table, l_grace_item); DAP_DEL_Z(l_grace_item); } pthread_mutex_unlock(&s_ht_grace_table_mutex); l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_RECEIPT_BANNED_PKEY_HASH ; dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof(l_err)); if (l_usage->service->callbacks.response_error) l_usage->service->callbacks.response_error(l_usage->service,l_usage->id, l_usage->client, &l_err, sizeof(l_err)); break; } } else { if (memcmp(l_usage->client_pkey_hash.raw, l_tx_out_cond->subtype.srv_pay.pkey_hash.raw, sizeof(l_usage->client_pkey_hash)) != 0) { l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_RECEIPT_WRONG_PKEY_HASH ; dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); if (l_usage->service->callbacks.response_error) l_usage->service->callbacks.response_error(l_usage->service,l_usage->id, l_usage->client,&l_err,sizeof (l_err) ); break; } } // Update actual receipt bool l_is_first_sign = false; if (! l_usage->receipt_next && l_usage->receipt){ DAP_DEL_Z(l_usage->receipt); l_usage->receipt = DAP_NEW_SIZE(dap_chain_datum_tx_receipt_t,l_receipt_size); if (!l_usage->receipt_next) { log_it(L_ERROR, "Memory allocation error in s_stream_ch_packet_in"); return; } l_is_first_sign = true; l_usage->is_active = true; memcpy( l_usage->receipt, l_receipt, l_receipt_size); } else if (l_usage->receipt_next ){ DAP_DEL_Z(l_usage->receipt_next); l_usage->receipt_next = DAP_NEW_SIZE(dap_chain_datum_tx_receipt_t,l_receipt_size); if (!l_usage->receipt_next) { log_it(L_ERROR, "Memory allocation error in s_stream_ch_packet_in"); return; } l_usage->is_active = true; memcpy( l_usage->receipt_next, l_receipt, l_receipt_size); } // Store receipt if any problems with transactions char *l_receipt_hash_str; dap_get_data_hash_str_static(l_receipt, l_receipt_size, l_receipt_hash_str); dap_global_db_set("local.receipts", l_receipt_hash_str, l_receipt, l_receipt_size, false, NULL, NULL); size_t l_success_size; if (!l_usage->is_grace) { // Form input transaction char *l_hash_str = dap_hash_fast_to_str_new(&l_usage->tx_cond_hash); log_it(L_NOTICE, "Trying create input tx cond from tx %s with active receipt", l_hash_str); DAP_DEL_Z(l_hash_str); dap_chain_addr_t *l_wallet_addr = dap_chain_wallet_get_addr(l_usage->price->wallet, l_usage->net->pub.id); int ret_status = 0; char *l_tx_in_hash_str = dap_chain_mempool_tx_create_cond_input(l_usage->net, &l_usage->tx_cond_hash, l_wallet_addr, dap_chain_wallet_get_key(l_usage->price->wallet, 0), l_receipt, "hex", &ret_status); if (!ret_status) { dap_chain_hash_fast_from_str(l_tx_in_hash_str, &l_usage->tx_cond_hash); log_it(L_NOTICE, "Formed tx %s for input with active receipt", l_tx_in_hash_str); DAP_DELETE(l_tx_in_hash_str); }else{ // TODO add ret status handling/ if tx not found start grace again dap_chain_net_srv_grace_t *l_grace = NULL; switch(ret_status){ case DAP_CHAIN_MEMPOOl_RET_STATUS_CANT_FIND_FINAL_TX_HASH: // TX not found in ledger and we not in grace, start grace log_it(L_ERROR, "Can't find tx cond. Start grace!"); l_grace = DAP_NEW_Z(dap_chain_net_srv_grace_t); if (!l_grace) { log_it(L_ERROR, "Memory allocation error in s_stream_ch_packet_in"); DAP_DELETE(l_tx_in_hash_str); return; } UNUSED(l_grace); // Parse the request l_grace->request = DAP_NEW_Z_SIZE(dap_stream_ch_chain_net_srv_pkt_request_t, sizeof(dap_stream_ch_chain_net_srv_pkt_request_t)); if (!l_grace->request) { log_it(L_ERROR, "Memory allocation error in s_stream_ch_packet_in"); DAP_DEL_Z(l_grace) DAP_DELETE(l_tx_in_hash_str); return; } l_grace->request->hdr.net_id = l_usage->net->pub.id; memcpy(l_grace->request->hdr.token, l_usage->token_ticker, strlen(l_usage->token_ticker)); l_grace->request->hdr.srv_uid = l_usage->service->uid; l_grace->request->hdr.tx_cond = l_usage->tx_cond_hash; l_ch_chain_net_srv->srv_uid.uint64 = l_grace->request->hdr.srv_uid.uint64; l_grace->request_size = l_ch_pkt->hdr.data_size; l_grace->ch_uuid = a_ch->uuid; l_grace->stream_worker = a_ch->stream_worker; l_grace->usage = l_usage; s_grace_period_start(l_grace); DAP_DELETE(l_tx_in_hash_str); break; case DAP_CHAIN_MEMPOOl_RET_STATUS_NOT_ENOUGH: // TODO send new tx cond request log_it(L_ERROR, "Tx cond have not enough funds"); l_usage->is_waiting_new_tx_cond = true; l_grace = DAP_NEW_Z(dap_chain_net_srv_grace_t); if (!l_grace) { log_it(L_ERROR, "Memory allocation error in s_stream_ch_packet_in"); DAP_DELETE(l_tx_in_hash_str); return; } // Parse the request l_grace->request = DAP_NEW_Z_SIZE(dap_stream_ch_chain_net_srv_pkt_request_t, sizeof(dap_stream_ch_chain_net_srv_pkt_request_t)); if (!l_grace->request) { log_it(L_ERROR, "Memory allocation error in s_stream_ch_packet_in"); DAP_DEL_Z(l_grace) DAP_DELETE(l_tx_in_hash_str); return; } l_grace->request->hdr.net_id = l_usage->net->pub.id; memcpy(l_grace->request->hdr.token, l_usage->token_ticker, strlen(l_usage->token_ticker)); l_grace->request->hdr.srv_uid = l_usage->service->uid; l_grace->request->hdr.tx_cond = l_usage->tx_cond_hash; l_ch_chain_net_srv->srv_uid.uint64 = l_grace->request->hdr.srv_uid.uint64; l_grace->request_size = l_ch_pkt->hdr.data_size; l_grace->ch_uuid = a_ch->uuid; l_grace->stream_worker = a_ch->stream_worker; l_grace->usage = l_usage; s_grace_period_start(l_grace); l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_NOT_ENOUGH; dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err)); if (l_usage->service->callbacks.response_error) l_usage->service->callbacks.response_error(l_usage->service,l_usage->id, l_usage->client,&l_err,sizeof (l_err)); DAP_DELETE(l_tx_in_hash_str); break; case DAP_CHAIN_MEMPOOL_RET_STATUS_BAD_ARGUMENTS: case DAP_CHAIN_MEMPOOl_RET_STATUS_WRONG_ADDR: case DAP_CHAIN_MEMPOOl_RET_STATUS_NOT_NATIVE_TOKEN: case DAP_CHAIN_MEMPOOl_RET_STATUS_NO_COND_OUT: case DAP_CHAIN_MEMPOOl_RET_STATUS_CANT_ADD_TX_OUT: case DAP_CHAIN_MEMPOOl_RET_STATUS_CANT_ADD_SIGN: default: log_it(L_ERROR, "Can't create input tx cond transaction!"); memset(&l_usage->tx_cond_hash, 0, sizeof(l_usage->tx_cond_hash)); DAP_DEL_Z(l_usage->receipt_next); l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_SERVICE_NOT_FOUND; dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err)); if (l_usage->service->callbacks.response_error) l_usage->service->callbacks.response_error(l_usage->service,l_usage->id, l_usage->client,&l_err,sizeof (l_err)); DAP_DELETE(l_tx_in_hash_str); break; } if (!l_usage->is_grace) break; } l_success_size = sizeof(dap_stream_ch_chain_net_srv_pkt_success_hdr_t) + DAP_CHAIN_HASH_FAST_STR_SIZE;//sizeof(dap_chain_hash_fast_t); } else { l_success_size = sizeof(dap_stream_ch_chain_net_srv_pkt_success_hdr_t); } dap_stream_ch_chain_net_srv_pkt_success_t *l_success = DAP_NEW_STACK_SIZE(dap_stream_ch_chain_net_srv_pkt_success_t, l_success_size); memset(&l_success->hdr, 0, sizeof(l_success->hdr)); l_success->hdr.usage_id = l_usage->id; l_success->hdr.net_id.uint64 = l_usage->net->pub.id.uint64; l_success->hdr.srv_uid.uint64 = l_usage->service->uid.uint64; if (l_usage->is_grace){ char *l_hash_str = dap_hash_fast_to_str_new(&l_usage->tx_cond_hash); log_it(L_NOTICE, "Receipt is OK, but tx transaction %s %s. Start the grace period for %d seconds", l_hash_str, l_usage->is_waiting_new_tx_cond ? "have no enough funds. New tx cond requested": "can't be found", l_srv->grace_period); DAP_DEL_Z(l_hash_str); }else { char *l_hash_str = dap_hash_fast_to_str_new(&l_usage->tx_cond_hash); memcpy(l_success->custom_data, l_hash_str, DAP_CHAIN_HASH_FAST_STR_SIZE); DAP_DEL_Z(l_hash_str); log_it(L_NOTICE, "Receipt with remote client sign is acceptible for. Now start the service's usage"); } dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_SUCCESS, l_success, l_success_size); if ( l_is_first_sign && l_usage->service->callbacks.response_success){ if( l_usage->service->callbacks.response_success(l_usage->service,l_usage->id, l_usage->client, l_receipt, l_receipt_size ) !=0 ){ log_it(L_NOTICE, "No success by service success callback, inactivating service usage"); l_usage->is_active = false; } } else if (l_usage->service->callbacks.receipt_next_success) { if (l_usage->service->callbacks.receipt_next_success(l_usage->service, l_usage->id, l_usage->client, l_receipt, l_receipt_size ) != 0 ){ log_it(L_NOTICE, "No success by service receipt_next callback, inactivating service usage"); l_usage->is_active = false; } } } break; case DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_DATA: { if (l_ch_pkt->hdr.data_size < sizeof(dap_stream_ch_chain_net_srv_pkt_data_hdr_t) ){ log_it( L_WARNING, "Wrong request size, less than minimum"); break; } typedef dap_stream_ch_chain_net_srv_pkt_data_t pkt_t; pkt_t * l_pkt =(pkt_t *) l_ch_pkt->data; size_t l_pkt_size = l_ch_pkt->hdr.data_size - sizeof(pkt_t); dap_chain_net_srv_t * l_srv = dap_chain_net_srv_get( l_pkt->hdr.srv_uid); dap_chain_net_srv_usage_t * l_usage = l_srv_session->usage_active;//dap_chain_net_srv_usage_find_unsafe( l_srv_session, l_pkt->hdr.usage_id ); // If service not found if ( l_srv == NULL){ l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_SERVICE_NOT_FOUND ; l_err.srv_uid = l_pkt->hdr.srv_uid; dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); break; } if (!l_srv->callbacks.custom_data) break; size_t l_out_data_size = 0; void *l_out_data = l_srv->callbacks.custom_data(l_srv, l_usage, l_pkt->data, l_pkt_size, &l_out_data_size); if (l_out_data && l_out_data_size) { pkt_t *l_data = DAP_NEW_STACK_SIZE(pkt_t, sizeof(pkt_t) + l_out_data_size); l_data->hdr.version = 1; l_data->hdr.srv_uid = l_srv->uid; l_data->hdr.usage_id = l_pkt->hdr.usage_id; l_data->hdr.data_size = l_out_data_size; memcpy(l_data->data, l_out_data, l_out_data_size); dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_DATA, l_data, sizeof(pkt_t) + l_out_data_size); } } break; case DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR:{ if ( l_ch_pkt->hdr.data_size == sizeof (dap_stream_ch_chain_net_srv_pkt_error_t) ){ dap_stream_ch_chain_net_srv_pkt_error_t * l_err = (dap_stream_ch_chain_net_srv_pkt_error_t *) l_ch_pkt->data; log_it( L_NOTICE, "Remote responsed with error code 0x%08X", l_err->code ); // TODO code for service client mode }else{ log_it(L_ERROR, "Wrong error response size, %u when expected %zu", l_ch_pkt->hdr.data_size, sizeof ( dap_stream_ch_chain_net_srv_pkt_error_t) ); } } break; case DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_NEW_TX_COND_RESPONSE:{ dap_chain_net_srv_usage_t * l_usage = NULL; l_usage = l_srv_session->usage_active; dap_stream_ch_chain_net_srv_pkt_request_t* l_responce = (dap_stream_ch_chain_net_srv_pkt_request_t*)l_ch_pkt->data; char *l_tx_in_hash_str = dap_chain_hash_fast_to_str_new(&l_responce->hdr.tx_cond); log_it(L_NOTICE, "Received new tx cond %s", l_tx_in_hash_str); DAP_DELETE(l_tx_in_hash_str); if(!l_usage->is_waiting_new_tx_cond || !l_usage->is_grace) break; l_usage->is_waiting_new_tx_cond = false; dap_stream_ch_chain_net_srv_pkt_error_t l_err = { }; usages_in_grace_t *l_curr_grace_item = NULL; pthread_mutex_lock(&s_ht_grace_table_mutex); HASH_FIND(hh, s_grace_table, &l_usage->tx_cond_hash, sizeof(dap_hash_fast_t), l_curr_grace_item); pthread_mutex_unlock(&s_ht_grace_table_mutex); if (dap_hash_fast_is_blank(&l_responce->hdr.tx_cond)){ //if new tx cond creation failed tx_cond in responce will be blank if (l_curr_grace_item){ HASH_DEL(s_grace_table, l_curr_grace_item); dap_timerfd_delete_mt(l_curr_grace_item->grace->stream_worker->worker, l_curr_grace_item->grace->timer_es_uuid); s_grace_error(l_curr_grace_item->grace, l_err); DAP_DEL_Z(l_curr_grace_item); } break; } dap_chain_datum_tx_t *l_tx = dap_chain_ledger_tx_find_by_hash(l_usage->net->pub.ledger, &l_responce->hdr.tx_cond); if (l_tx){ // Replace if (l_curr_grace_item){ log_it(L_INFO, "Found tx in ledger by net tx responce handler. Finish waiting new tx grace period."); // Stop timer dap_timerfd_delete_mt(l_curr_grace_item->grace->stream_worker->worker, l_curr_grace_item->grace->timer_es_uuid); // finish grace l_usage->tx_cond_hash = l_responce->hdr.tx_cond; l_curr_grace_item->grace->request->hdr.tx_cond = l_responce->hdr.tx_cond; s_grace_period_finish(l_curr_grace_item); } }else{ if (l_curr_grace_item){ l_curr_grace_item->grace->usage->tx_cond_hash = l_responce->hdr.tx_cond; l_curr_grace_item->grace->request->hdr.tx_cond = l_responce->hdr.tx_cond; pthread_mutex_lock(&s_ht_grace_table_mutex); HASH_DEL(s_grace_table, l_curr_grace_item); l_curr_grace_item->tx_cond_hash = l_responce->hdr.tx_cond; HASH_ADD(hh, s_grace_table, tx_cond_hash, sizeof(dap_hash_fast_t), l_curr_grace_item); pthread_mutex_unlock(&s_ht_grace_table_mutex); } } size_t l_success_size = sizeof (dap_stream_ch_chain_net_srv_pkt_success_hdr_t ); dap_stream_ch_chain_net_srv_pkt_success_t *l_success = DAP_NEW_Z_SIZE(dap_stream_ch_chain_net_srv_pkt_success_t, l_success_size); if(!l_success) { log_it(L_ERROR, "Memory allocation error in s_stream_ch_packet_in"); return; } l_success->hdr.usage_id = l_usage->id; l_success->hdr.net_id.uint64 = l_usage->net->pub.id.uint64; l_success->hdr.srv_uid.uint64 = l_usage->service->uid.uint64; dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_SUCCESS, l_success, l_success_size); DAP_DELETE(l_success); }break; default: log_it( L_WARNING, "Unknown packet type 0x%02X", l_ch_pkt->hdr.type); } if(l_ch_chain_net_srv->notify_callback) l_ch_chain_net_srv->notify_callback(l_ch_chain_net_srv, l_ch_pkt->hdr.type, l_ch_pkt, l_ch_chain_net_srv->notify_callback_arg); } /** * @brief s_stream_ch_packet_out * @param a_ch * @param a_arg */ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch , void* a_arg) { (void) a_arg; dap_stream_ch_set_ready_to_write_unsafe(a_ch, false); // Callback should note that after write action it should restore write flag if it has more data to send on next iteration dap_chain_net_srv_call_write_all( a_ch); }