/* dap_stream_ch_chain_net_srv.c (36.72 KiB) -- revision d8a9327a authored by Roman Khlopkov */
/*
* Authors:
* Dmitriy Gerasimov <naeper@demlabs.net>
* Cellframe https://cellframe.net
* DeM Labs Inc. https://demlabs.net
* Copyright (c) 2017-2019
* All rights reserved.
This file is part of CellFrame SDK the open source project
CellFrame SDK is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
CellFrame SDK is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with any CellFrame SDK based project. If not, see <http://www.gnu.org/licenses/>.
*/
#include <sys/time.h>
#include "dap_common.h"
#include "dap_hash.h"
#include "rand/dap_rand.h"
#include "dap_chain.h"
#include "dap_chain_datum_tx.h"
#include "dap_chain_datum_tx_in.h"
#include "dap_chain_datum_tx_in_cond.h"
#include "dap_chain_datum_tx_out.h"
#include "dap_chain_datum_tx_out_cond.h"
#include "dap_chain_datum_tx_receipt.h"
#include "dap_chain_mempool.h"
#include "dap_chain_net_srv.h"
#include "dap_chain_net_srv_common.h"
#include "dap_chain_net_srv_stream_session.h"
#include "dap_stream.h"
#include "dap_stream_ch.h"
#include "dap_stream_ch_pkt.h"
#include "dap_stream_ch_chain_net_srv.h"
#include "dap_stream_ch_chain_net_srv_pkt.h"
#include "dap_stream_ch_proc.h"
#include "dap_stream_ch_chain_net_srv.h"
#define LOG_TAG "dap_stream_ch_chain_net_srv"
/**
 * @brief Get the identifier of the chain network services stream channel
 * @return Channel id byte, always 'R'
 */
uint8_t dap_stream_ch_chain_net_srv_get_id(void)
{
    // (void) makes this a real prototype; an empty list "()" accepts any arguments before C23
    return 'R';
}
// Channel callbacks with internal linkage. All four are handed to dap_stream_ch_proc_add()
// by dap_stream_ch_chain_net_srv_init(): new, delete, packet in, packet out.
static void s_stream_ch_new(dap_stream_ch_t* ch , void* arg);
static void s_stream_ch_delete(dap_stream_ch_t* ch , void* arg);
static void s_stream_ch_packet_in(dap_stream_ch_t* ch , void* arg);
static void s_stream_ch_packet_out(dap_stream_ch_t* ch , void* arg);
/**
 * @brief dap_stream_ch_chain_net_srv_init
 *
 * Logs a notice and registers the new/delete/packet-in/packet-out callbacks
 * of this channel under the id returned by dap_stream_ch_chain_net_srv_get_id().
 *
 * @return Always 0
 */
int dap_stream_ch_chain_net_srv_init(void)
{
    log_it(L_NOTICE,"Chain network services channel initialized");

    const uint8_t l_ch_id = dap_stream_ch_chain_net_srv_get_id();
    dap_stream_ch_proc_add(l_ch_id,
                           s_stream_ch_new,
                           s_stream_ch_delete,
                           s_stream_ch_packet_in,
                           s_stream_ch_packet_out);
    return 0;
}
/**
 * @brief dap_stream_ch_chain_net_srv_deinit
 *
 * Counterpart of dap_stream_ch_chain_net_srv_init(). Currently a no-op.
 */
void dap_stream_ch_chain_net_srv_deinit(void)
{
    /* intentionally empty */
}
/**
 * @brief Remember the service uid on the channel (client side)
 * @param a_ch Stream channel whose chain-net-srv data receives the uid
 * @param a_srv_uid Service uid to store
 */
void dap_stream_ch_chain_net_srv_set_srv_uid(dap_stream_ch_t* a_ch, dap_chain_net_srv_uid_t a_srv_uid)
{
    dap_stream_ch_chain_net_srv_t *l_ch_data = DAP_STREAM_CH_CHAIN_NET_SRV(a_ch);

    // Only the 64-bit value is copied
    l_ch_data->srv_uid.uint64 = a_srv_uid.uint64;
}
/**
 * @brief s_stream_ch_new
 *
 * Channel constructor: allocates the zero-initialized per-channel data,
 * initializes its mutex, creates the service session on the stream session when
 * that session has no inheritor yet, and finally notifies all services that the
 * channel was opened.
 *
 * @param a_ch Newly created stream channel
 * @param arg Unused
 */
void s_stream_ch_new(dap_stream_ch_t* a_ch , void* arg)
{
    (void ) arg;
    a_ch->internal = DAP_NEW_Z(dap_stream_ch_chain_net_srv_t);
    if (!a_ch->internal) {
        // Without the internal data the mutex below can't be initialized
        log_it(L_ERROR, "Can't allocate memory for chain net srv channel");
        return;
    }
    dap_stream_ch_chain_net_srv_t * l_ch_chain_net_srv = DAP_STREAM_CH_CHAIN_NET_SRV(a_ch);
    pthread_mutex_init( &l_ch_chain_net_srv->mutex,NULL);

    // The session pointer has to be tested before its _inheritor field is read
    if (a_ch->stream->session == NULL)
        log_it( L_ERROR, "No session at all!");
    else if (a_ch->stream->session->_inheritor == NULL)
        dap_chain_net_srv_stream_session_create( a_ch->stream->session );
    else
        log_it(L_ERROR, "Session inheritor is already present!");

    dap_chain_net_srv_call_opened_all( a_ch);
}
/**
 * @brief s_stream_ch_delete
 *
 * Channel destructor callback: writes a debug log line and notifies all
 * services that the channel was closed.
 *
 * @param a_ch Stream channel being deleted
 * @param a_arg Unused
 */
void s_stream_ch_delete(dap_stream_ch_t* a_ch , void* a_arg)
{
    (void) a_arg;

    log_it(L_DEBUG, "Stream ch chain net srv delete");
    dap_chain_net_srv_call_closed_all(a_ch);
}
/**
 * @brief Remove a ban list item from its hash table and free it
 *
 * Started as a timer callback by s_grace_period_control() when a client gets banned.
 *
 * @param a_item Ban list item; carries pointers to its table head and the table mutex
 * @return Always false (presumably tells the timer not to fire again -- confirm against dap_timerfd)
 */
static bool s_unban_client(dap_chain_net_srv_banlist_item_t *a_item)
{
    pthread_mutex_t *l_table_lock = a_item->ht_mutex;

    // Unlink under the table mutex, release the memory outside of it
    pthread_mutex_lock(l_table_lock);
    HASH_DEL(*(a_item->ht_head), a_item);
    pthread_mutex_unlock(l_table_lock);

    DAP_DELETE(a_item);
    return false;
}
/**
 * @brief Process a service request, with optional grace period for a missing conditional tx
 *
 * Called directly from s_stream_ch_packet_in() for a REQUEST packet (a_grace->usage is NULL)
 * and called again from a timer it starts itself (a_grace->usage is set, used as re-entry marker).
 *
 * First pass: checks network and service, for a paid service looks up the conditional
 * transaction, creates the usage and either sends a receipt to sign or, for a free service,
 * a success response. If the conditional transaction is not in the ledger yet, a zero-priced
 * receipt is issued and the function re-arms itself after l_srv->grace_period seconds.
 *
 * Re-entry: if the transaction is still missing (or any other check fails) the usage is
 * deactivated and the client pkey hash is put into the service ban list.
 *
 * Unless the timer is re-armed, a_grace and a_grace->request are freed before return.
 *
 * @param a_grace Grace period context, owned by this function
 * @return Always false
 */
static bool s_grace_period_control(dap_chain_net_srv_grace_t *a_grace)
{
    assert(a_grace);
    dap_stream_ch_chain_net_srv_pkt_error_t l_err;
    memset(&l_err, 0, sizeof(l_err));
    dap_chain_net_srv_t * l_srv = NULL;
    // These two are read under free_exit, so they must be declared and initialized before
    // the first goto: jumping over an initialized declaration leaves the object indeterminate
    dap_chain_net_srv_stream_session_t *l_srv_session = NULL;
    dap_chain_net_srv_usage_t *l_usage = NULL;
    dap_stream_ch_t *l_ch = a_grace->ch;

    // On timer re-entry the channel may be gone already
    if (!dap_stream_ch_check_unsafe(a_grace->stream_worker, l_ch))
        goto free_exit;

    l_srv_session = l_ch && l_ch->stream && l_ch->stream->session ?
                    (dap_chain_net_srv_stream_session_t *)l_ch->stream->session->_inheritor : NULL;
    if (!l_srv_session)
        goto free_exit;

    dap_stream_ch_chain_net_srv_pkt_request_t *l_request = a_grace->request;
    l_srv = dap_chain_net_srv_get( l_request->hdr.srv_uid );
    dap_chain_net_t * l_net = dap_chain_net_by_id( l_request->hdr.net_id );

    l_err.net_id.uint64 = l_request->hdr.net_id.uint64;
    l_err.srv_uid.uint64 = l_request->hdr.srv_uid.uint64;

    if ( ! l_net ) // Network not found
        l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_NETWORK_NOT_FOUND;

    if ( ! l_srv ) // Service not found
        l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_SERVICE_NOT_FOUND;

    if ( l_err.code ){
        goto free_exit;
    }

    dap_ledger_t * l_ledger =l_net->pub.ledger;
    dap_chain_datum_tx_t * l_tx = NULL;
    dap_chain_tx_out_cond_t * l_tx_out_cond = NULL;
    bool l_grace_start = false;
    if (l_srv->pricelist ){ // Is present pricelist, not free service

        if ( !l_ledger ){ // No ledger
            log_it( L_WARNING, "No Ledger");
            l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_NETWORK_NO_LEDGER ;
            goto free_exit;
        }

        l_tx = dap_chain_ledger_tx_find_by_hash( l_ledger,& l_request->hdr.tx_cond );
        if ( ! l_tx ){ // No tx cond transaction
            if (a_grace->usage) {   // marker for reentry to function
                log_it( L_WARNING, "No tx cond transaction");
                l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_NOT_FOUND ;
                goto free_exit;
            } else
                l_grace_start = true;
        }
        if (!l_grace_start) {
            int l_tx_out_cond_size =0;
            l_tx_out_cond = (dap_chain_tx_out_cond_t *)
                    dap_chain_datum_tx_item_get(l_tx, NULL, TX_ITEM_TYPE_OUT_COND, &l_tx_out_cond_size );

            if ( ! l_tx_out_cond ) { // No conditioned output
                log_it( L_WARNING, "No conditioned output");
                l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_NO_COND_OUT ;
                goto free_exit;
            }

            // Check whether the conditional output matches the service uid of the request
            if ( l_tx_out_cond->subtype.srv_pay.srv_uid.uint64 != l_request->hdr.srv_uid.uint64 ){
                // Pass the integer member with a matching conversion, not the whole union
                log_it( L_WARNING, "Wrong service uid in request, tx expect to close its output with 0x%016llX",
                        (unsigned long long)l_tx_out_cond->subtype.srv_pay.srv_uid.uint64 );
                l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_WRONG_SRV_UID ;
                goto free_exit;
            }
        }
    }
    if (!a_grace->usage) {
        // First pass: register the usage in the session
        l_usage = dap_chain_net_srv_usage_add(l_srv_session, l_net, l_srv);
        if ( !l_usage ){ // Usage can't add
            log_it( L_WARNING, "Usage can't add");
            l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_USAGE_CANT_ADD;
            goto free_exit;
        }

        l_err.usage_id = l_usage->id;

        // Create one client
        l_usage->client = DAP_NEW_Z( dap_chain_net_srv_client_t);
        l_usage->client->stream_worker = l_ch->stream_worker;
        l_usage->client->ch = l_ch;
        l_usage->client->session_id = l_ch->stream->session->id;
        l_usage->client->ts_created = time(NULL);
        l_usage->tx_cond = l_tx;
        memcpy(&l_usage->tx_cond_hash, &l_request->hdr.tx_cond,sizeof (l_usage->tx_cond_hash));
        l_usage->ts_created = time(NULL);
    } else {
        // Re-entry after the grace period: reuse the usage created on the first pass
        l_usage = a_grace->usage;
        l_usage->tx_cond = l_tx;
    }
    dap_chain_net_srv_price_t * l_price = NULL;
    dap_chain_datum_tx_receipt_t * l_receipt = NULL;
    const char * l_ticker = NULL;
    if (l_srv->pricelist && !l_grace_start) {
        l_ticker = dap_chain_ledger_tx_get_token_ticker_by_hash(l_ledger, &l_request->hdr.tx_cond );
        dap_stpcpy(l_usage->token_ticker, l_ticker);

        // Pick the first price that matches network, token ticker and unit type
        dap_chain_net_srv_price_t *l_price_tmp;
        DL_FOREACH(l_srv->pricelist, l_price_tmp) {
            if (l_price_tmp->net->pub.id.uint64                 == l_request->hdr.net_id.uint64
                && dap_strcmp(l_price_tmp->token, l_ticker)     == 0
                && l_price_tmp->units_uid.enm                   == l_tx_out_cond->subtype.srv_pay.unit.enm
                )//&& (l_price_tmp->value_datoshi/l_price_tmp->units)  < l_tx_out_cond->subtype.srv_pay.header.unit_price_max_datoshi)
            {
                l_price = l_price_tmp;
                break;
            }
        }
        if ( !l_price ) {
            log_it( L_WARNING, "Request can't be processed because no acceptable price in pricelist for token %s in network %s",
                    l_ticker, l_net->pub.name );
            l_err.code =DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_NOT_ACCEPT_TOKEN;
            goto free_exit;
        }
    }
    int ret;
    if ((ret = l_srv->callback_requested(l_srv, l_usage->id, l_usage->client, l_request, a_grace->request_size)) != 0) {
        log_it( L_WARNING, "Request canceled by service callback, return code %d", ret);
        l_err.code = (uint32_t) ret ;
        goto free_exit;
    }

    if ( l_srv->pricelist) {
        if (l_price || l_grace_start) {
            if (l_price) {
                if (a_grace->usage) {
                    // Drop the zero price allocated on the first pass
                    DAP_DELETE(l_usage->price);
                }
            } else {
                // Grace period start: copy of the first pricelist entry with zeroed value
                l_price = DAP_NEW_Z(dap_chain_net_srv_price_t);
                memcpy(l_price, l_srv->pricelist, sizeof(*l_price));
                l_price->value_datoshi = 0;
                l_price->value_coins = 0;
            }
            l_usage->price = l_price;
            // TODO extend callback to pass ext and ext size from service callbacks
            l_receipt = dap_chain_net_srv_issue_receipt( l_usage->service, l_usage, l_usage->price,NULL,0 );
            dap_stream_ch_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_SIGN_REQUEST, l_receipt, l_receipt->size);
        }else{
            l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_PRICE_NOT_FOUND ;
            goto free_exit;
        }
    // If we are here we passed all the checks, wow, now if we're not for free we request the signature.
    } else{
        log_it( L_INFO, "Service provide for free");
        l_usage->is_free = true;
        size_t l_success_size = sizeof (dap_stream_ch_chain_net_srv_pkt_success_hdr_t );
        dap_stream_ch_chain_net_srv_pkt_success_t *l_success = DAP_NEW_Z_SIZE(dap_stream_ch_chain_net_srv_pkt_success_t,
                                                                              l_success_size);
        l_success->hdr.usage_id = l_usage->id;
        l_success->hdr.net_id.uint64 = l_usage->net->pub.id.uint64;
        l_success->hdr.srv_uid.uint64 = l_usage->service->uid.uint64;
        dap_stream_ch_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_SUCCESS, l_success, l_success_size);
        if ( l_usage->service->callback_response_success )
            l_usage->service->callback_response_success ( l_usage->service, l_usage->id,  l_usage->client, NULL, 0 );
        DAP_DELETE(l_success);
    }
    if (l_grace_start) {
        // Keep a_grace alive: it is the argument of the re-armed timer
        l_usage->is_grace = true;
        a_grace->usage = l_usage;
        dap_timerfd_start_on_worker(a_grace->stream_worker->worker, l_srv->grace_period * 1000,
                                    (dap_timerfd_callback_t)s_grace_period_control, a_grace);
        return false;
    } else {
        // NOTE(review): l_usage->is_grace is not cleared here when the tx was found on
        // re-entry -- confirm whether the usage is meant to stay in grace mode
        DAP_DELETE(a_grace->request);
        DAP_DELETE(a_grace);
        return false;
    }
free_exit:
    if (l_err.code) {
        dap_stream_ch_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err));
        if (l_srv && l_srv->callback_response_error)
            l_srv->callback_response_error(l_srv, 0, NULL, &l_err, sizeof(l_err));
    }
    if (a_grace->usage) {   // add client pkey hash to banlist
        a_grace->usage->is_active = false;
        if (l_srv) {
            dap_chain_net_srv_banlist_item_t *l_item = NULL;
            pthread_mutex_lock(&l_srv->banlist_mutex);
            HASH_FIND(hh, l_srv->ban_list, &a_grace->usage->client_pkey_hash, sizeof(dap_chain_hash_fast_t), l_item);
            if (l_item)
                pthread_mutex_unlock(&l_srv->banlist_mutex);
            else {
                l_item = DAP_NEW_Z(dap_chain_net_srv_banlist_item_t);
                memcpy(&l_item->client_pkey_hash, &a_grace->usage->client_pkey_hash, sizeof(dap_chain_hash_fast_t));
                l_item->ht_mutex = &l_srv->banlist_mutex;
                l_item->ht_head = &l_srv->ban_list;
                HASH_ADD(hh, l_srv->ban_list, client_pkey_hash, sizeof(dap_chain_hash_fast_t), l_item);
                pthread_mutex_unlock(&l_srv->banlist_mutex);
                // The item is removed again by s_unban_client() when this timer fires
                dap_timerfd_start(l_srv->grace_period * 10000, (dap_timerfd_callback_t)s_unban_client, l_item);
            }
        }
    }
    else if (l_usage)
        // First pass failed after the usage was added: roll it back
        dap_chain_net_srv_usage_delete(l_srv_session, l_usage);
    DAP_DELETE(a_grace->request);
    DAP_DELETE(a_grace);
    return false;
}
/**
 * @brief s_stream_ch_packet_in
 *
 * Input packet dispatcher of the channel. Handles by packet type:
 *  - CHECK_REQUEST / CHECK_RESPONSE: test data exchange with hash verification
 *  - REQUEST (server): copies the request and passes it to s_grace_period_control()
 *  - SIGN_REQUEST (client): rebuilds the receipt, lets the service sign it, sends it back
 *  - SIGN_RESPONSE (server): matches the signed receipt with a usage, verifies the signer,
 *    stores the receipt, forms the input transaction and answers with success
 *  - RESPONSE_SUCCESS, DATA, RESPONSE_ERROR: forwarded to service callbacks or logged
 * After the switch the channel notify callback, if set, is called for every packet,
 * including the ones rejected inside the switch.
 *
 * All sizes and offsets taken from the packet are checked against the packet size
 * before the corresponding fields are read.
 *
 * @param a_ch Stream channel
 * @param a_arg Incoming packet, a dap_stream_ch_pkt_t pointer
 */
void s_stream_ch_packet_in(dap_stream_ch_t* a_ch , void* a_arg)
{
    dap_stream_ch_pkt_t *l_ch_pkt = (dap_stream_ch_pkt_t *) a_arg; // chain packet
    dap_chain_net_srv_stream_session_t * l_srv_session = a_ch && a_ch->stream && a_ch->stream->session ?
                                                            a_ch->stream->session->_inheritor : NULL;
    if ( ! l_srv_session ){
        log_it( L_ERROR, "Not defined service session, switching off packet input process");
        dap_stream_ch_set_ready_to_read_unsafe(a_ch, false);
        return;
    }
    // Taken only after a_ch has been proven non-NULL by the session check above
    dap_stream_ch_chain_net_srv_t * l_ch_chain_net_srv = DAP_STREAM_CH_CHAIN_NET_SRV(a_ch);

    dap_stream_ch_chain_net_srv_pkt_error_t l_err;
    memset(&l_err,0,sizeof (l_err));

    if (!l_ch_pkt)
        return;

    switch (l_ch_pkt->hdr.type) {
    // for send test data
    case DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_CHECK_REQUEST:{
        int l_err_code = 0;
        dap_stream_ch_chain_net_srv_pkt_test_t *l_request = (dap_stream_ch_chain_net_srv_pkt_test_t*) l_ch_pkt->data;
        // The fixed part has to be there before data_size can be read from it
        if (l_ch_pkt->hdr.size < sizeof(dap_stream_ch_chain_net_srv_pkt_test_t)) {
            log_it(L_WARNING, "Wrong request size, less or more than required");
            break;
        }
        size_t l_request_size = l_request->data_size + sizeof(dap_stream_ch_chain_net_srv_pkt_test_t);
        if(l_ch_pkt->hdr.size != l_request_size) {
            log_it(L_WARNING, "Wrong request size, less or more than required");
            break;
        }
        struct timeval l_recvtime2;
        gettimeofday(&l_recvtime2, NULL);
        memcpy(&l_request->recv_time2,&l_recvtime2,sizeof (l_recvtime2));
        //printf("\n%lu.%06lu \n", (unsigned long) l_request->recv_time2.tv_sec, (unsigned long) l_request->recv_time2.tv_usec);
        dap_chain_hash_fast_t l_data_hash;
        dap_hash_fast(l_request->data, l_request->data_size, &l_data_hash);
        if(l_request->data_size>0 && !dap_hash_fast_compare(&l_data_hash, &(l_request->data_hash))){
            l_err_code+=2;
        }

        // create data to send back
        dap_stream_ch_chain_net_srv_pkt_test_t *l_request_out = DAP_NEW_Z_SIZE(dap_stream_ch_chain_net_srv_pkt_test_t,
                sizeof(dap_stream_ch_chain_net_srv_pkt_test_t) + l_request->data_size_recv);
        if (!l_request_out) {
            // data_size_recv comes from the remote side, the allocation may legitimately fail
            log_it(L_ERROR, "Can't allocate memory for check response");
            break;
        }
        // copy info from recv message
        memcpy(l_request_out,l_request, sizeof(dap_stream_ch_chain_net_srv_pkt_test_t));
        l_request_out->data_size = l_request->data_size_recv;
        randombytes(l_request_out->data, l_request_out->data_size);
        l_request_out->err_code = l_err_code;
        dap_hash_fast(l_request_out->data, l_request_out->data_size, &l_request_out->data_hash);
        strncpy(l_request_out->ip_send,a_ch->stream->esocket->hostaddr  , sizeof(l_request_out->ip_send)-1);
        // The memcpy above filled ip_send with remote bytes, so terminate it explicitly
        l_request_out->ip_send[sizeof(l_request_out->ip_send) - 1] = '\0';

        // Thats to prevent unaligned pointer
        struct timeval l_tval;
        gettimeofday(&l_tval, NULL);
        l_request_out->send_time2.tv_sec = l_tval.tv_sec;
        l_request_out->send_time2.tv_usec = l_tval.tv_usec;

        // send response
        dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_CHECK_RESPONSE, l_request_out,
                                       l_request_out->data_size + sizeof(dap_stream_ch_chain_net_srv_pkt_test_t));
        DAP_DELETE(l_request_out);
    } break;

    // for receive test data.
    case DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_CHECK_RESPONSE: {
        dap_stream_ch_chain_net_srv_pkt_test_t *l_request = (dap_stream_ch_chain_net_srv_pkt_test_t *) l_ch_pkt->data;
        // The fixed part has to be there before data_size can be read from it
        if (l_ch_pkt->hdr.size < sizeof(dap_stream_ch_chain_net_srv_pkt_test_t)) {
            log_it(L_WARNING, "Wrong request size, less or more than required");
            break;
        }
        size_t l_request_size = l_request->data_size + sizeof(dap_stream_ch_chain_net_srv_pkt_test_t);
        if(l_ch_pkt->hdr.size != l_request_size) {
            log_it(L_WARNING, "Wrong request size, less or more than required");
            break;
        }
        gettimeofday(&l_request->recv_time1, NULL);
        dap_chain_hash_fast_t l_data_hash;
        dap_hash_fast(l_request->data, l_request->data_size, &l_data_hash);
        if(!dap_hash_fast_compare(&l_data_hash, &(l_request->data_hash))) {
            l_request->err_code += 4;
        }
        dap_stream_ch_set_ready_to_write_unsafe(a_ch, false);
    } break;

    // only for server
    case DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_REQUEST:{
        if (l_ch_pkt->hdr.size < sizeof(dap_stream_ch_chain_net_srv_pkt_request_hdr_t) ){
            log_it( L_WARNING, "Wrong request size, less than minimum");
            break;
        }
        dap_chain_net_srv_grace_t *l_grace = DAP_NEW_Z(dap_chain_net_srv_grace_t);
        if (!l_grace) {
            log_it(L_ERROR, "Can't allocate memory for service request");
            break;
        }
        // Parse the request
        l_grace->request = DAP_NEW_Z_SIZE(dap_stream_ch_chain_net_srv_pkt_request_t, l_ch_pkt->hdr.size);
        if (!l_grace->request) {
            log_it(L_ERROR, "Can't allocate memory for service request");
            DAP_DELETE(l_grace);
            break;
        }
        memcpy(l_grace->request, l_ch_pkt->data, l_ch_pkt->hdr.size);
        l_grace->request_size = l_ch_pkt->hdr.size;
        l_grace->ch = a_ch;
        l_grace->stream_worker = a_ch->stream_worker;
        // Takes ownership of l_grace and of the request copy
        s_grace_period_control(l_grace);
    } break;

    // only for client
    case DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_SIGN_REQUEST:{
        log_it( L_NOTICE, "Requested smth to sign");
        dap_chain_datum_tx_receipt_t * l_receipt = (dap_chain_datum_tx_receipt_t *) l_ch_pkt->data;
        size_t l_receipt_size = l_ch_pkt->hdr.size;
        // exts_size comes from the wire: the extensions must lie inside the packet
        if (l_receipt_size < sizeof(dap_chain_datum_tx_receipt_t) ||
                l_receipt->exts_size > l_receipt_size - sizeof(dap_chain_datum_tx_receipt_t)) {
            log_it(L_WARNING, "Wrong sign request size, less than required");
            break;
        }
        // create receipt copy, because l_receipt may be reallocated inside dap_chain_datum_tx_receipt_create()!
        dap_chain_datum_tx_receipt_t *l_receipt_new = dap_chain_datum_tx_receipt_create(l_receipt->receipt_info.srv_uid,
                                                                                        l_receipt->receipt_info.units_type,
                                                                                        l_receipt->receipt_info.units,
                                                                                        l_receipt->receipt_info.value_datoshi,
                                                                                        l_receipt->exts_n_signs, l_receipt->exts_size);
        if (!l_receipt_new) {
            log_it(L_ERROR, "Can't create receipt copy for signing");
            break;
        }
        //l_srv_session->usages
        ///l_usage->service->uid.uint64;
        //dap_chain_net_srv_usage_t * l_usage = dap_chain_net_srv_usage_find( l_srv_session, l_pkt->hdr.usage_id );
        dap_chain_net_srv_t * l_srv = dap_chain_net_srv_get(l_ch_chain_net_srv->srv_uid);
        if(l_srv && l_srv->callback_client_sign_request) {
            // Sign receipt
            l_srv->callback_client_sign_request(l_srv, 0, NULL, &l_receipt_new, l_receipt_size);
            dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_SIGN_RESPONSE,
                                           l_receipt_new, l_receipt_new->size);
        }
        DAP_DELETE(l_receipt_new);
        // TODO sign smth
    } break;

    // only for server
    case DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_SIGN_RESPONSE:{
        // The second condition keeps the receipt_info reads below inside the packet
        if (l_ch_pkt->hdr.size <= sizeof(dap_chain_receipt_info_t) + 1 ||
                l_ch_pkt->hdr.size < sizeof(dap_chain_datum_tx_receipt_t)) {
            log_it(L_ERROR, "Wrong sign response size, %zu when expected at least %zu with smth", (size_t)l_ch_pkt->hdr.size,
                   sizeof(dap_chain_receipt_info_t)+1 );
            break;
        }
        dap_chain_datum_tx_receipt_t * l_receipt = (dap_chain_datum_tx_receipt_t *) l_ch_pkt->data;
        size_t l_receipt_size = l_ch_pkt->hdr.size;
        dap_chain_net_srv_usage_t * l_usage= NULL, *l_tmp= NULL;
        bool l_is_found = false;
        // Find the usage whose pending receipt has the same receipt_info as the response
        pthread_mutex_lock(& l_srv_session->parent->mutex );
        HASH_ITER(hh, l_srv_session->usages, l_usage, l_tmp){
            if ( l_usage->receipt_next ){ // If we have receipt next
                if ( memcmp(&l_usage->receipt_next->receipt_info, &l_receipt->receipt_info,sizeof (l_receipt->receipt_info) )==0 ){
                    l_is_found = true;
                    break;
                }
            }else if (l_usage->receipt ){ // If we sign first receipt
                if ( memcmp(&l_usage->receipt->receipt_info, &l_receipt->receipt_info,sizeof (l_receipt->receipt_info) )==0 ){
                    l_is_found = true;
                    break;
                }
            }
        }
        pthread_mutex_unlock(& l_srv_session->parent->mutex );

        if ( !l_is_found || ! l_usage ){
            log_it(L_WARNING, "Can't find receipt in usages thats equal to response receipt");
            l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_RECEIPT_CANT_FIND ;
            dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) );
            if (l_usage && l_usage->service && l_usage->service->callback_response_error)
                l_usage->service->callback_response_error(l_usage->service,l_usage->id, l_usage->client,&l_err,sizeof (l_err) );
            break;
        }
        l_err.usage_id = l_usage->id;
        l_err.net_id.uint64 = l_usage->net->pub.id.uint64;
        l_err.srv_uid.uint64 = l_usage->service->uid.uint64;

        dap_chain_tx_out_cond_t *l_tx_out_cond = NULL;
        if (!l_usage->is_grace) {
            if (! l_usage->tx_cond ){
                log_it(L_WARNING, "No tx out in usage");
                l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_NOT_FOUND ;
                dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) );
                if (l_usage->service->callback_response_error)
                    l_usage->service->callback_response_error( l_usage->service, l_usage->id, l_usage->client,
                                                               &l_err, sizeof (l_err) );
                break;
            }
            int l_tx_out_cond_size =0;
            l_tx_out_cond = (dap_chain_tx_out_cond_t *)dap_chain_datum_tx_item_get(l_usage->tx_cond, NULL,
                                                                                   TX_ITEM_TYPE_OUT_COND, &l_tx_out_cond_size );
            if ( ! l_tx_out_cond ){ // No conditioned output
                l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_NO_COND_OUT ;
                dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) );
                if (l_usage->service->callback_response_error)
                    l_usage->service->callback_response_error( l_usage->service, l_usage->id, l_usage->client,&l_err,sizeof (l_err) );
                break;
            }
        }
        // get a second signature - from the client (first sign in server, second sign in client)
        dap_sign_t * l_receipt_sign = dap_chain_datum_tx_receipt_sign_get( l_receipt, l_receipt_size, 1);
        if ( ! l_receipt_sign ){
            l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_RECEIPT_CANT_FIND ;
            dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) );
            if (l_usage->service->callback_response_error)
                l_usage->service->callback_response_error( l_usage->service, l_usage->id, l_usage->client,
                                                           &l_err, sizeof (l_err) );
            break;
        }
        // Check receipt signature pkey hash
        dap_sign_get_pkey_hash(l_receipt_sign, &l_usage->client_pkey_hash);
        dap_chain_net_srv_banlist_item_t *l_item = NULL;
        dap_chain_net_srv_t * l_srv = dap_chain_net_srv_get(l_receipt->receipt_info.srv_uid);
        if (l_usage->is_grace) {
            if (!l_srv) {
                // l_srv is dereferenced only in grace mode (ban list, grace period log)
                log_it(L_WARNING, "Can't find service for receipt in sign response");
                l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_SERVICE_NOT_FOUND ;
                dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof(l_err));
                if (l_usage->service->callback_response_error)
                    l_usage->service->callback_response_error(l_usage->service,l_usage->id, l_usage->client, &l_err, sizeof(l_err));
                break;
            }
            pthread_mutex_lock(&l_srv->banlist_mutex);
            HASH_FIND(hh, l_srv->ban_list, &l_usage->client_pkey_hash, sizeof(dap_chain_hash_fast_t), l_item);
            pthread_mutex_unlock(&l_srv->banlist_mutex);
            if (l_item) {   // client banned
                l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_RECEIPT_BANNED_PKEY_HASH ;
                dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof(l_err));
                if (l_usage->service->callback_response_error)
                    l_usage->service->callback_response_error(l_usage->service,l_usage->id, l_usage->client, &l_err, sizeof(l_err));
                break;
            }
        } else {
            // The signer has to be the owner named in the conditional output
            if (memcmp(l_usage->client_pkey_hash.raw, l_tx_out_cond->subtype.srv_pay.pkey_hash.raw, sizeof(l_usage->client_pkey_hash)) != 0) {
                l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_RECEIPT_WRONG_PKEY_HASH ;
                dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) );
                if (l_usage->service->callback_response_error)
                    l_usage->service->callback_response_error(l_usage->service,l_usage->id, l_usage->client,&l_err,sizeof (l_err) );
                break;
            }
        }

        // Update actual receipt
        bool l_is_first_sign = false;
        if (! l_usage->receipt_next && l_usage->receipt){
            DAP_DELETE(l_usage->receipt);
            l_usage->receipt = DAP_NEW_SIZE(dap_chain_datum_tx_receipt_t,l_receipt_size);
            l_usage->receipt_size = l_receipt_size;
            l_is_first_sign = true;
            l_usage->is_active = true;
            memcpy( l_usage->receipt, l_receipt, l_receipt_size);
        } else if (l_usage->receipt_next ){
            DAP_DELETE(l_usage->receipt_next);
            l_usage->receipt_next = DAP_NEW_SIZE(dap_chain_datum_tx_receipt_t,l_receipt_size);
            l_usage->receipt_next_size = l_receipt_size;
            l_usage->is_active = true;
            memcpy( l_usage->receipt_next, l_receipt, l_receipt_size);
        }

        // Store receipt if any problems with transactions
        dap_chain_hash_fast_t l_receipt_hash={0};
        dap_hash_fast(l_receipt,l_receipt_size,&l_receipt_hash);
        char * l_receipt_hash_str = dap_chain_hash_fast_to_str_new(&l_receipt_hash);
        dap_chain_global_db_gr_set( l_receipt_hash_str,l_receipt,l_receipt_size,"local.receipts");
        l_receipt_hash_str = NULL; // To prevent usage of this pointer when it will be free by GDB processor
        size_t l_success_size;
        dap_chain_hash_fast_t *l_tx_in_hash  = NULL;
        if (!l_usage->is_grace) {
            // Form input transaction
            dap_chain_addr_t *l_wallet_addr = dap_chain_wallet_get_addr(l_usage->wallet, l_usage->net->pub.id);
            l_tx_in_hash = dap_chain_mempool_tx_create_cond_input(l_usage->net, &l_usage->tx_cond_hash, l_wallet_addr,
                                                                  dap_chain_wallet_get_key(l_usage->wallet, 0),
                                                                  l_receipt, l_receipt_size);
            if ( l_tx_in_hash){
                char * l_tx_in_hash_str = dap_chain_hash_fast_to_str_new(l_tx_in_hash);
                log_it(L_NOTICE, "Formed tx %s for input with active receipt", l_tx_in_hash_str);
                DAP_DELETE(l_tx_in_hash_str);
            }else
                log_it(L_ERROR, "Can't create input tx cond transaction!");
            // Room for the tx hash is reserved even when the tx could not be created
            l_success_size = sizeof(dap_stream_ch_chain_net_srv_pkt_success_hdr_t) + sizeof(dap_chain_hash_fast_t);
        } else {
            l_success_size = sizeof(dap_stream_ch_chain_net_srv_pkt_success_hdr_t);
        }
        dap_stream_ch_chain_net_srv_pkt_success_t *l_success = DAP_NEW_Z_SIZE(dap_stream_ch_chain_net_srv_pkt_success_t,
                                                                              l_success_size);
        l_success->hdr.usage_id = l_usage->id;
        l_success->hdr.net_id.uint64 = l_usage->net->pub.id.uint64;
        l_success->hdr.srv_uid.uint64 = l_usage->service->uid.uint64;
        if (l_tx_in_hash) {
            memcpy(l_success->custom_data, l_tx_in_hash, sizeof(dap_chain_hash_fast_t));
            DAP_DELETE(l_tx_in_hash);
        }

        if (l_usage->is_grace)
            log_it(L_NOTICE, "Receipt is OK, but transaction can't be found. Start the grace period for %d seconds",
                   l_srv->grace_period);
        else
            log_it(L_NOTICE, "Receipt with remote client sign is acceptible for. Now start the service's usage");

        dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_SUCCESS ,
                                        l_success, l_success_size);
        DAP_DELETE(l_success);

        if ( l_is_first_sign && l_usage->service->callback_response_success){
            if( l_usage->service->callback_response_success(l_usage->service,l_usage->id,  l_usage->client,
                                                            l_receipt, l_receipt_size ) !=0 ){
                log_it(L_NOTICE, "No success by service callback, inactivating service usage");
                l_usage->is_active = false;
            }
            // issue receipt next
            l_usage->receipt_next = dap_chain_net_srv_issue_receipt( l_usage->service, l_usage, l_usage->price ,NULL,0);
            if (l_usage->receipt_next) {
                l_usage->receipt_next_size = l_usage->receipt_next->size;
                dap_stream_ch_pkt_write_unsafe( a_ch , DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_SIGN_REQUEST ,
                                                l_usage->receipt_next, l_usage->receipt_next->size);
            } else
                log_it(L_ERROR, "Can't issue next receipt");

        }else if ( l_usage->service->callback_receipt_next_success){
            if (l_usage->service->callback_receipt_next_success(l_usage->service,l_usage->id,  l_usage->client,
                                                                l_receipt, l_receipt_size ) != 0 ){
                log_it(L_NOTICE, "No success by service callback, inactivating service usage");
                l_usage->is_active = false;
            }
        }
    } break;

    case DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_SUCCESS:{
        log_it( L_NOTICE, "Responsed with success");
        // TODO code for service client mode
        if (l_ch_pkt->hdr.size < sizeof(dap_stream_ch_chain_net_srv_pkt_success_hdr_t)) {
            log_it(L_WARNING, "Wrong success response size, less than minimum");
            break;
        }
        dap_stream_ch_chain_net_srv_pkt_success_t * l_success = (dap_stream_ch_chain_net_srv_pkt_success_t*)l_ch_pkt->data;
        size_t l_success_size = l_ch_pkt->hdr.size;
        dap_chain_net_srv_t * l_srv = dap_chain_net_srv_get(l_success->hdr.srv_uid);
        if ( l_srv && l_srv->callback_client_success){
            // Create client for client)
            // NOTE(review): l_client is not freed here; presumably the callback takes it over -- confirm
            dap_chain_net_srv_client_t *l_client = DAP_NEW_Z( dap_chain_net_srv_client_t);
            if (!l_client) {
                log_it(L_ERROR, "Can't allocate memory for service client");
                break;
            }
            l_client->ch = a_ch;
            l_client->stream_worker = a_ch->stream_worker;
            l_client->ts_created = time(NULL);
            l_client->session_id = a_ch->stream->session->id;
            l_srv->callback_client_success(l_srv, l_success->hdr.usage_id, l_client, l_success, l_success_size );
            //l_success->hdr.net_id, l_success->hdr.srv_uid, l_success->hdr.usage_id
        }
    } break;

    case DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_DATA:{
        // Checked against the same type that is subtracted below, so l_pkt_size can't wrap around
        if (l_ch_pkt->hdr.size < sizeof(dap_stream_ch_chain_net_srv_pkt_data_t) ){
            log_it( L_WARNING, "Wrong request size, less than minimum");
            break;
        }
        // Parse the packet
        dap_stream_ch_chain_net_srv_pkt_data_t * l_pkt =(dap_stream_ch_chain_net_srv_pkt_data_t *) l_ch_pkt->data;
        size_t l_pkt_size = l_ch_pkt->hdr.size - sizeof (dap_stream_ch_chain_net_srv_pkt_data_t);
        dap_chain_net_srv_t * l_srv = dap_chain_net_srv_get( l_pkt->hdr.srv_uid);
        dap_chain_net_srv_usage_t * l_usage = dap_chain_net_srv_usage_find_unsafe( l_srv_session, l_pkt->hdr.usage_id );
        // If service not found
        if ( l_srv == NULL){
            l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_SERVICE_NOT_FOUND ;
            l_err.srv_uid = l_pkt->hdr.srv_uid;
            dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) );
            break;
        }
        // Check if callback is not present
        if ( l_srv->callback_stream_ch_read == NULL ){
            l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_SERVICE_CH_NOT_FOUND ;
            l_err.srv_uid = l_pkt->hdr.srv_uid;
            dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) );
            break;
        }
        // usage_id comes from the remote side and may not match any usage of this session
        if ( l_usage == NULL ){
            log_it( L_WARNING, "Can't find usage for incoming data packet");
            break;
        }
        // Call callback if present
        l_srv->callback_stream_ch_read( l_srv,l_usage->id, l_usage->client, l_pkt->data, l_pkt_size );
    } break;

    case DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR:{
        if ( l_ch_pkt->hdr.size == sizeof (dap_stream_ch_chain_net_srv_pkt_error_t) ){
            dap_stream_ch_chain_net_srv_pkt_error_t * l_err_pkt = (dap_stream_ch_chain_net_srv_pkt_error_t *) l_ch_pkt->data;
            log_it( L_NOTICE, "Remote responsed with error code 0x%08X", l_err_pkt->code );
            // TODO code for service client mode
        }else{
            log_it(L_ERROR, "Wrong error response size, %zu when expected %zu", (size_t)l_ch_pkt->hdr.size,
                   sizeof ( dap_stream_ch_chain_net_srv_pkt_error_t) );
        }
    } break;

    default: log_it( L_WARNING, "Unknown packet type 0x%02X", l_ch_pkt->hdr.type);
    }

    if(l_ch_chain_net_srv->notify_callback)
        l_ch_chain_net_srv->notify_callback(l_ch_chain_net_srv, l_ch_pkt->hdr.type, l_ch_pkt, l_ch_chain_net_srv->notify_callback_arg);
}
/**
 * @brief s_stream_ch_packet_out
 *
 * Output callback of the channel: drops the ready-to-write flag and then lets
 * every service run its write handler.
 *
 * @param a_ch Stream channel
 * @param a_arg Unused
 */
void s_stream_ch_packet_out(dap_stream_ch_t* a_ch , void* a_arg)
{
    (void) a_arg;

    // The flag is cleared before the services run: a service callback that still has
    // data for the next iteration is expected to set it again on its own
    dap_stream_ch_set_ready_to_write_unsafe(a_ch, false);
    dap_chain_net_srv_call_write_all(a_ch);
}