From 45671a76e83afcf164860dd80c2b00443aea50c3 Mon Sep 17 00:00:00 2001 From: Roman Khlopkov <roman.khlopkov@demlabs.net> Date: Fri, 30 Oct 2020 18:26:38 +0300 Subject: [PATCH] [+] Grace period for services --- .../dap_stream_ch_chain_net_srv.c | 412 ++++++++++-------- modules/net/srv/dap_chain_net_srv.c | 2 +- modules/net/srv/include/dap_chain_net_srv.h | 2 +- .../srv/include/dap_chain_net_srv_common.h | 9 + modules/service/vpn/dap_chain_net_srv_vpn.c | 2 + 5 files changed, 234 insertions(+), 193 deletions(-) diff --git a/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.c b/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.c index cd4c595802..f2212bdbfb 100644 --- a/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.c +++ b/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.c @@ -128,6 +128,212 @@ void s_stream_ch_delete(dap_stream_ch_t* a_ch , void* a_arg) dap_chain_net_srv_call_closed_all( a_ch); } + +void s_grace_period_control(dap_chain_net_srv_grace_t *a_grace) +{ + if (a_grace->timer) + dap_timerfd_delete(a_grace->timer); + + if (!dap_stream_ch_check_unsafe(a_grace->worker, a_grace->ch)) + goto free_exit; + dap_chain_net_srv_stream_session_t *l_srv_session = a_ch && a_ch->stream && a_ch->stream->session ? + (dap_chain_net_srv_stream_session_t *)a_ch->stream->session->_inheritor : NULL; + if (!l_srv_session) + goto free_exit; + dap_stream_ch_chain_net_srv_pkt_error_t l_err; + memset(&l_err,0,sizeof (l_err)); + + dap_stream_ch_chain_net_srv_pkt_request_t *l_request = a_grace->request; + dap_chain_net_srv_t * l_srv = dap_chain_net_srv_get( l_request->hdr.srv_uid ); + dap_chain_net_t * l_net = dap_chain_net_by_id( l_request->hdr.net_id ); + + l_err.net_id.uint64 = l_request->hdr.net_id.uint64; + l_err.srv_uid.uint64 = l_request->hdr.srv_uid.uint64; + + if ( ! l_net ) // Network not found + l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_NETWORK_NOT_FOUND; + + if ( ! l_srv ) // Service not found + l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_SERVICE_NOT_FOUND; + + if ( l_err.code ){ + dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); + if (l_srv && l_srv->callback_response_error) + l_srv->callback_response_error(l_srv,0,NULL,&l_err,sizeof (l_err) ); + goto free_exit; + } + + dap_ledger_t * l_ledger =l_net->pub.ledger; + dap_chain_datum_tx_t * l_tx = NULL; + dap_chain_tx_out_cond_t * l_tx_out_cond = NULL; + bool l_grace_start = false; + unsigned int l_grace_period = 60; + if (l_srv->pricelist ){ // Is present pricelist, not free service + + if ( !l_ledger ){ // No ledger + log_it( L_WARNING, "No Ledger"); + l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_NETWORK_NO_LEDGER ; + dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); + if (l_srv->callback_response_error) + l_srv->callback_response_error(l_srv,0,NULL,&l_err,sizeof (l_err) ); + goto free_exit; + } + + l_tx = dap_chain_ledger_tx_find_by_hash( l_ledger,& l_request->hdr.tx_cond ); + if ( ! l_tx ){ // No tx cond transaction + if (a_grace->usage) { + log_it( L_WARNING, "No tx cond transaction"); + l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_NOT_FOUND ; + dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); + if (l_srv->callback_response_error) + l_srv->callback_response_error(l_srv,0,NULL,&l_err,sizeof (l_err) ); + goto free_exit; + } else + l_grace_start = true; + } + if (!l_grace_start) { + int l_tx_out_cond_size =0; + l_tx_out_cond = (dap_chain_tx_out_cond_t *) + dap_chain_datum_tx_item_get(l_tx, NULL, TX_ITEM_TYPE_OUT_COND, &l_tx_out_cond_size ); + + if ( ! l_tx_out_cond ) { // No conditioned output + log_it( L_WARNING, "No conditioned output"); + l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_NO_COND_OUT ; + dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); + if (l_srv->callback_response_error) + l_srv->callback_response_error(l_srv,0,NULL,&l_err,sizeof (l_err) ); + goto free_exit; + } + + // Check cond output if it equesl or not to request + if ( l_tx_out_cond->subtype.srv_pay.srv_uid.uint64 != l_request->hdr.srv_uid.uint64 ){ + log_it( L_WARNING, "Wrong service uid in request, tx expect to close its output with 0x%016lX", + l_tx_out_cond->subtype.srv_pay.srv_uid ); + l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_WRONG_SRV_UID ; + dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); + if (l_srv->callback_response_error) + l_srv->callback_response_error(l_srv,0,NULL,&l_err,sizeof (l_err) ); + goto free_exit; + } + } + } + dap_chain_net_srv_usage_t *l_usage = NULL; + if (!a_grace->usage) { + l_usage = dap_chain_net_srv_usage_add(l_srv_session, l_net, l_srv); + if ( !l_usage ){ // Usage can't add + log_it( L_WARNING, "Usage can't add"); + l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_USAGE_CANT_ADD; + dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); + if (l_srv->callback_response_error) + l_srv->callback_response_error(l_srv,0,NULL,&l_err,sizeof (l_err) ); + goto free_exit; + } + + l_err.usage_id = l_usage->id; + + // Create one client + l_usage->client = DAP_NEW_Z( dap_chain_net_srv_client_t); + l_usage->client->stream_worker = a_ch->stream_worker; + l_usage->client->ch = a_ch; + l_usage->client->session_id = a_ch->stream->session->id; + l_usage->client->ts_created = time(NULL); + l_usage->tx_cond = l_tx; + memcpy(&l_usage->tx_cond_hash, &l_request->hdr.tx_cond,sizeof (l_usage->tx_cond_hash)); + l_usage->ts_created = time(NULL); + } else { + l_usage = a_grace->usage; + l_usage->tx_cond = l_tx; + } + dap_chain_net_srv_price_t * l_price = NULL; + dap_chain_datum_tx_receipt_t * l_receipt = NULL; + const char * l_ticker = NULL; + if (l_srv->pricelist && !l_grace_start) { + l_ticker = dap_chain_ledger_tx_get_token_ticker_by_hash(l_ledger, &l_request->hdr.tx_cond ); + dap_stpcpy(l_usage->token_ticker, l_ticker); + + dap_chain_net_srv_price_t *l_price_tmp; + DL_FOREACH(l_srv->pricelist, l_price_tmp) { + if (l_price_tmp->net->pub.id.uint64 == l_request->hdr.net_id.uint64 + && dap_strcmp(l_price_tmp->token, l_ticker) == 0 + && l_price_tmp->units_uid.enm == l_tx_out_cond->subtype.srv_pay.unit.enm + )//&& (l_price_tmp->value_datoshi/l_price_tmp->units) < l_tx_out_cond->subtype.srv_pay.header.unit_price_max_datoshi) + { + l_price = l_price_tmp; + break; + } + } + if ( !l_price ) { + log_it( L_WARNING, "Request can't be processed because no acceptable price in pricelist for token %s in network %s", + l_ticker, l_net->pub.name ); + l_err.code =DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_NOT_ACCEPT_TOKEN; + dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); + if (l_srv->callback_response_error) + l_srv->callback_response_error(l_srv,l_usage->id,l_usage->client,&l_err,sizeof (l_err) ); + goto free_exit; + } + } + int ret; + if ( (ret= l_srv->callback_requested(l_srv,l_usage->id, l_usage->client, l_request, l_ch_pkt->hdr.size ) )!= 0 ){ + log_it( L_WARNING, "Request canceled by service callback, return code %d", ret); + l_err.code = (uint32_t) ret ; + dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); + if (l_srv->callback_response_error) + l_srv->callback_response_error(l_srv,l_usage->id, NULL,&l_err,sizeof (l_err) ); + goto free_exit; + } + + if ( l_srv->pricelist && !l_grace_start) { + if ( l_price ){ + l_usage->price = l_price; + // TODO extend callback to pass ext and ext size from service callbacks + l_receipt = dap_chain_net_srv_issue_receipt( l_usage->service, l_usage, l_usage->price,NULL,0 ); + dap_stream_ch_pkt_write_unsafe( a_ch , DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_SIGN_REQUEST , + l_receipt, l_receipt->size); + + }else{ + l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_PRICE_NOT_FOUND ; + dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); + if (l_srv->callback_response_error) + l_srv->callback_response_error( l_srv, l_usage->id, NULL, &l_err, sizeof( l_err ) ); + goto free_exit; + } + // If we a here we passed all the checks, wow, now if we're not for free we request the signature. + } else{ + if (l_grace_start) + log_it( L_INFO, "Start service grace period %d seconds", l_grace_period); + else + log_it( L_INFO, "Service provide for free"); + l_usage->is_free = true; + size_t l_success_size = sizeof (dap_stream_ch_chain_net_srv_pkt_success_hdr_t ); + dap_stream_ch_chain_net_srv_pkt_success_t *l_success = DAP_NEW_Z_SIZE(dap_stream_ch_chain_net_srv_pkt_success_t, + l_success_size); + l_success->hdr.usage_id = l_usage->id; + l_success->hdr.net_id.uint64 = l_usage->net->pub.id.uint64; + l_success->hdr.srv_uid.uint64 = l_usage->service->uid.uint64; + + if (dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_SUCCESS, + l_success, l_success_size)) { + dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); + } + + if ( l_usage->service->callback_response_success ) + l_usage->service->callback_response_success ( l_usage->service, l_usage->id, l_usage->client, NULL, 0 ); + DAP_DELETE(l_success); + } + if (l_grace_start) { + a_grace->usage = l_usage; + a_grace->timer = dap_timerfd_start_on_worker(a_grace->stream_worker->worker, l_grace_period * 1000, s_grace_period_control, a_grace, false); + return; + } +free_exit: + if (a_grace->usage) + a_grace->usage->is_active = false; + else if (l_usage) + dap_chain_net_srv_usage_delete(l_srv_session, l_usage); + DAP_DELETE(a_grace->request); + DAP_DELETE(a_grace); +} + /** * @brief s_stream_ch_packet_in * @param ch @@ -216,179 +422,13 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch , void* a_arg) log_it( L_WARNING, "Wrong request size, less than minimum"); break; } + dap_chain_net_srv_grace_t *l_grace = DAP_NEW_Z(dap_chain_net_srv_grace_t); // Parse the request - dap_stream_ch_chain_net_srv_pkt_request_t * l_request =(dap_stream_ch_chain_net_srv_pkt_request_t *) l_ch_pkt->data; - //size_t l_request_size = l_ch_pkt->hdr.size; - dap_chain_net_srv_t * l_srv = dap_chain_net_srv_get( l_request->hdr.srv_uid ); - dap_chain_net_t * l_net = dap_chain_net_by_id( l_request->hdr.net_id ); - - l_err.net_id.uint64 = l_request->hdr.net_id.uint64; - l_err.srv_uid.uint64 = l_request->hdr.srv_uid.uint64; - - if ( ! l_net ) // Network not found - l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_NETWORK_NOT_FOUND; - - if ( ! l_srv ) // Service not found - l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_SERVICE_NOT_FOUND; - - if ( l_err.code ){ - dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); - if (l_srv && l_srv->callback_response_error) - l_srv->callback_response_error(l_srv,0,NULL,&l_err,sizeof (l_err) ); - break; - } - - dap_ledger_t * l_ledger =l_net->pub.ledger; - dap_chain_datum_tx_t * l_tx = NULL; - dap_chain_tx_out_cond_t * l_tx_out_cond = NULL; - if (l_srv->pricelist ){ // Is present pricelist, not free service - - if ( !l_ledger ){ // No ledger - log_it( L_WARNING, "No Ledger"); - l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_NETWORK_NO_LEDGER ; - dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); - if (l_srv->callback_response_error) - l_srv->callback_response_error(l_srv,0,NULL,&l_err,sizeof (l_err) ); - break; - } - - l_tx = dap_chain_ledger_tx_find_by_hash( l_ledger,& l_request->hdr.tx_cond ); - if ( ! l_tx ){ // No tx cond transaction - log_it( L_WARNING, "No tx cond transaction"); - /// TODO Add tx cond treshold and ability to provide service before the transaction comes from CDB - /// - l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_NOT_FOUND ; - dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); - if (l_srv->callback_response_error) - l_srv->callback_response_error(l_srv,0,NULL,&l_err,sizeof (l_err) ); - break; - } - int l_tx_out_cond_size =0; - l_tx_out_cond = (dap_chain_tx_out_cond_t *) - dap_chain_datum_tx_item_get(l_tx, NULL, TX_ITEM_TYPE_OUT_COND, &l_tx_out_cond_size ); - - if ( ! l_tx_out_cond ) { // No conditioned output - log_it( L_WARNING, "No conditioned output"); - - l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_NO_COND_OUT ; - dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); - if (l_srv->callback_response_error) - l_srv->callback_response_error(l_srv,0,NULL,&l_err,sizeof (l_err) ); - break; - } - - // Check cond output if it equesl or not to request - if ( l_tx_out_cond->subtype.srv_pay.srv_uid.uint64 != l_request->hdr.srv_uid.uint64 ){ - log_it( L_WARNING, "Wrong service uid in request, tx expect to close its output with 0x%016lX", - l_tx_out_cond->subtype.srv_pay.srv_uid ); - l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_WRONG_SRV_UID ; - dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); - if (l_srv->callback_response_error) - l_srv->callback_response_error(l_srv,0,NULL,&l_err,sizeof (l_err) ); - break; - } - } - dap_chain_net_srv_usage_t * l_usage = dap_chain_net_srv_usage_add( l_srv_session, - l_net,l_srv ); - if ( !l_usage ){ // Usage can't add - log_it( L_WARNING, "Usage can't add"); - l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_USAGE_CANT_ADD; - dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); - if (l_srv->callback_response_error) - l_srv->callback_response_error(l_srv,0,NULL,&l_err,sizeof (l_err) ); - break; - } - - l_err.usage_id = l_usage->id; - - // Create one client - l_usage->client = DAP_NEW_Z( dap_chain_net_srv_client_t); - l_usage->client->stream_worker = a_ch->stream_worker; - l_usage->client->ch = a_ch; - l_usage->client->session_id = a_ch->stream->session->id; - l_usage->client->ts_created = time(NULL); - l_usage->tx_cond = l_tx; - memcpy(&l_usage->tx_cond_hash, &l_request->hdr.tx_cond,sizeof (l_usage->tx_cond_hash)); - l_usage->ts_created = time(NULL); - - dap_chain_net_srv_price_t * l_price = NULL; - dap_chain_datum_tx_receipt_t * l_receipt = NULL; - const char * l_ticker = NULL; - if (l_srv->pricelist ){ - l_ticker = dap_chain_ledger_tx_get_token_ticker_by_hash(l_ledger, &l_request->hdr.tx_cond ); - dap_stpcpy(l_usage->token_ticker, l_ticker); - - dap_chain_net_srv_price_t *l_price_tmp; - DL_FOREACH(l_srv->pricelist, l_price_tmp) { - if (l_price_tmp->net->pub.id.uint64 == l_request->hdr.net_id.uint64 - && dap_strcmp(l_price_tmp->token, l_ticker) == 0 - && l_price_tmp->units_uid.enm == l_tx_out_cond->subtype.srv_pay.unit.enm - )//&& (l_price_tmp->value_datoshi/l_price_tmp->units) < l_tx_out_cond->subtype.srv_pay.header.unit_price_max_datoshi) - { - l_price = l_price_tmp; - break; - } - } - if ( !l_price ) { - log_it( L_WARNING, "Request can't be processed because no acceptable price in pricelist for token %s in network %s", - l_ticker, l_net->pub.name ); - dap_chain_net_srv_usage_delete(l_srv_session, l_usage); - l_err.code =DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_NOT_ACCEPT_TOKEN; - dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); - if (l_srv->callback_response_error) - l_srv->callback_response_error(l_srv,l_usage->id,l_usage->client,&l_err,sizeof (l_err) ); - break; - } - } - int ret; - if ( (ret= l_srv->callback_requested(l_srv,l_usage->id, l_usage->client, l_request, l_ch_pkt->hdr.size ) )!= 0 ){ - log_it( L_WARNING, "Request canceled by service callback, return code %d", ret); - dap_chain_net_srv_usage_delete(l_srv_session, l_usage); - l_err.code = (uint32_t) ret ; - dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); - if (l_srv->callback_response_error) - l_srv->callback_response_error(l_srv,l_usage->id, NULL,&l_err,sizeof (l_err) ); - break; - } - - if ( l_srv->pricelist ){ - if ( l_price ){ - l_usage->price = l_price; - // TODO extend callback to pass ext and ext size from service callbacks - l_receipt = dap_chain_net_srv_issue_receipt( l_usage->service, l_usage, l_usage->price,NULL,0 ); - dap_stream_ch_pkt_write_unsafe( a_ch , DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_SIGN_REQUEST , - l_receipt, l_receipt->size); - - }else{ - l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_PRICE_NOT_FOUND ; - dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); - if (l_srv->callback_response_error) - l_srv->callback_response_error( l_srv, l_usage->id, NULL, &l_err, sizeof( l_err ) ); - } - // If we a here we passed all the checks, wow, now if we're not for free we request the signature. - } else{ - log_it( L_INFO, "Service provide for free"); - l_usage->is_free = true; - size_t l_success_size = sizeof (dap_stream_ch_chain_net_srv_pkt_success_hdr_t ); - dap_stream_ch_chain_net_srv_pkt_success_t *l_success = DAP_NEW_Z_SIZE(dap_stream_ch_chain_net_srv_pkt_success_t, - l_success_size); - l_success->hdr.usage_id = l_usage->id; - l_success->hdr.net_id.uint64 = l_usage->net->pub.id.uint64; - l_success->hdr.srv_uid.uint64 = l_usage->service->uid.uint64; - - if (dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_SUCCESS, - l_success, l_success_size)) { - dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); - } - - if ( l_usage->service->callback_receipt_first_success ) - l_usage->service->callback_receipt_first_success ( l_usage->service, l_usage->id, l_usage->client, NULL, 0 ); - DAP_DELETE(l_success); - - } - // l_receipt used in l_usage->receipt - //if(l_receipt) - // DAP_DELETE(l_receipt); + l_grace->request = DAP_NEW_Z_SIZE(dap_stream_ch_chain_net_srv_pkt_request_t, l_ch_pkt->hdr.size); + memcpy(l_grace->request, l_ch_pkt->data, l_ch_pkt->hdr.size); + l_grace->ch = a_ch; + l_grace->stream_worker = a_ch->stream_worker; + s_grace_period_control(l_grace); } break; // only for client case DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_SIGN_REQUEST:{ @@ -401,20 +441,18 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch , void* a_arg) l_receipt->receipt_info.units, l_receipt->receipt_info.value_datoshi, l_receipt->exts_n_signs, l_receipt->exts_size); - - //l_srv_session->usages ///l_usage->service->uid.uint64; //dap_chain_net_srv_usage_t * l_usage = dap_chain_net_srv_usage_find( l_srv_session, l_pkt->hdr.usage_id ); dap_chain_net_srv_t * l_srv = dap_chain_net_srv_get(l_ch_chain_net_srv->srv_uid); - if(l_srv && l_srv->callback_client_sign_request) { - // Sign receipt - l_srv->callback_client_sign_request(l_srv, 0, NULL, &l_receipt_new, l_receipt_size); - if(dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_SIGN_RESPONSE, - l_receipt_new, l_receipt_new->size)) { - dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); + if(l_srv && l_srv->callback_client_sign_request) { + // Sign receipt + l_srv->callback_client_sign_request(l_srv, 0, NULL, &l_receipt_new, l_receipt_size); + if(dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_SIGN_RESPONSE, + l_receipt_new, l_receipt_new->size)) { + dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); + } } - } DAP_DELETE(l_receipt_new); // TODO sign smth } break; @@ -534,14 +572,6 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch , void* a_arg) if ( l_tx_in_hash){ char * l_tx_in_hash_str = dap_chain_hash_fast_to_str_new(l_tx_in_hash); log_it(L_NOTICE, "Formed tx %s for input with active receipt", l_tx_in_hash_str); - - /* We could put transaction directly to chains - if ( dap_chain_net_get_role( l_usage->net ).enums == NODE_ROLE_MASTER || - dap_chain_net_get_role( l_usage->net ).enums == NODE_ROLE_CELL_MASTER || - dap_chain_net_get_role( l_usage->net ).enums == NODE_ROLE_ROOT || - dap_chain_net_get_role( l_usage->net ).enums == NODE_ROLE_ROOT_MASTER ){ - dap_chain_net_proc_mempool( l_usage->net); - }*/ DAP_DELETE(l_tx_in_hash_str); }else log_it(L_ERROR, "Can't create input tx cond transaction!"); @@ -560,8 +590,8 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch , void* a_arg) l_success, l_success_size); DAP_DELETE(l_success); - if ( l_is_first_sign && l_usage->service->callback_receipt_first_success){ - if( l_usage->service->callback_receipt_first_success(l_usage->service,l_usage->id, l_usage->client, + if ( l_is_first_sign && l_usage->service->callback_response_success){ + if( l_usage->service->callback_response_success(l_usage->service,l_usage->id, l_usage->client, l_receipt, l_receipt_size ) !=0 ){ log_it(L_NOTICE, "No success by service callback, inactivating service usage"); l_usage->is_active = false; diff --git a/modules/net/srv/dap_chain_net_srv.c b/modules/net/srv/dap_chain_net_srv.c index 99fa9dcea3..0b307a578f 100644 --- a/modules/net/srv/dap_chain_net_srv.c +++ b/modules/net/srv/dap_chain_net_srv.c @@ -551,7 +551,7 @@ dap_chain_net_srv_t* dap_chain_net_srv_add(dap_chain_net_srv_uid_t a_uid,dap_cha l_srv = DAP_NEW_Z(dap_chain_net_srv_t); l_srv->uid.uint64 = a_uid.uint64; l_srv->callback_requested = a_callback_request; - l_srv->callback_receipt_first_success = a_callback_response_success; + l_srv->callback_response_success = a_callback_response_success; l_srv->callback_response_error = a_callback_response_error; l_srv->callback_receipt_next_success = a_callback_receipt_next_success; l_sdata = DAP_NEW_Z(service_list_t); diff --git a/modules/net/srv/include/dap_chain_net_srv.h b/modules/net/srv/include/dap_chain_net_srv.h index c7de7e8f0c..873367548f 100755 --- a/modules/net/srv/include/dap_chain_net_srv.h +++ b/modules/net/srv/include/dap_chain_net_srv.h @@ -47,7 +47,7 @@ typedef struct dap_chain_net_srv dap_chain_net_srv_callback_data_t callback_requested; // Receipt first sign successfull - dap_chain_net_srv_callback_data_t callback_receipt_first_success; + dap_chain_net_srv_callback_data_t callback_response_success; // Response error dap_chain_net_srv_callback_data_t callback_response_error; diff --git a/modules/net/srv/include/dap_chain_net_srv_common.h b/modules/net/srv/include/dap_chain_net_srv_common.h index d4141c5746..e92995494c 100755 --- a/modules/net/srv/include/dap_chain_net_srv_common.h +++ b/modules/net/srv/include/dap_chain_net_srv_common.h @@ -33,6 +33,8 @@ #include "dap_chain_ledger.h" #include "dap_chain_net.h" #include "dap_chain_wallet.h" +#include "dap_timerfd.h" +#include "dap_chain_net_srv_stream_session.h" @@ -89,6 +91,13 @@ typedef struct dap_chain_net_srv_price struct dap_chain_net_srv_price * prev; } dap_chain_net_srv_price_t; +typedef struct dap_chain_net_srv_grace { + dap_stream_worker_t *stream_worker; + dap_stream_ch_t *ch; + dap_chain_net_srv_usage_t *usage; + dap_timerfd_t *timer; + dap_stream_ch_chain_net_srv_pkt_request_t *request; +} dap_chain_net_srv_grace_t; // Ch pkt types #define DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_REQUEST 0x01 diff --git a/modules/service/vpn/dap_chain_net_srv_vpn.c b/modules/service/vpn/dap_chain_net_srv_vpn.c index d27b45c921..fd2c49a45f 100644 --- a/modules/service/vpn/dap_chain_net_srv_vpn.c +++ b/modules/service/vpn/dap_chain_net_srv_vpn.c @@ -1279,6 +1279,8 @@ void s_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) log_it(L_INFO, "Usage inactivation: switch off packet input channel"); dap_stream_ch_set_ready_to_write_unsafe(a_ch,false); dap_stream_ch_set_ready_to_read_unsafe(a_ch,false); + if (l_usage->client) + dap_stream_ch_pkt_write_unsafe( l_usage->client->ch , DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_NOTIFY_STOPPED , NULL, 0 ); return; } -- GitLab