From 46cd0b727efdf546fceeb1585d151e88283423c0 Mon Sep 17 00:00:00 2001 From: Roman Khlopkov <roman.khlopkov@demlabs.net> Date: Thu, 5 Nov 2020 19:39:19 +0300 Subject: [PATCH] [*] Grace period completed --- dap-sdk/net/client/dap_client_pvt.c | 2 +- dap-sdk/net/stream/ch/dap_stream_ch_pkt.c | 2 +- dap-sdk/net/stream/stream/dap_stream.c | 2 +- .../dap_stream_ch_chain_net_srv.c | 53 +++++++++---------- modules/net/srv/include/dap_chain_net_srv.h | 1 + .../srv/include/dap_chain_net_srv_common.h | 26 ++++----- .../dap_chain_net_srv_stream_session.h | 1 - modules/service/vpn/dap_chain_net_srv_vpn.c | 2 +- 8 files changed, 39 insertions(+), 50 deletions(-) diff --git a/dap-sdk/net/client/dap_client_pvt.c b/dap-sdk/net/client/dap_client_pvt.c index b415e0f492..b423cc66a0 100644 --- a/dap-sdk/net/client/dap_client_pvt.c +++ b/dap-sdk/net/client/dap_client_pvt.c @@ -213,7 +213,7 @@ static bool s_stage_status_after(dap_client_pvt_t * a_client_pvt) } a_client_pvt->stage_status = STAGE_STATUS_DONE; s_stage_status_after(a_client_pvt); - return; + return false; } switch (l_stage) { case STAGE_ENC_INIT: { diff --git a/dap-sdk/net/stream/ch/dap_stream_ch_pkt.c b/dap-sdk/net/stream/ch/dap_stream_ch_pkt.c index ab46209234..159082f39c 100644 --- a/dap-sdk/net/stream/ch/dap_stream_ch_pkt.c +++ b/dap-sdk/net/stream/ch/dap_stream_ch_pkt.c @@ -200,7 +200,7 @@ size_t dap_stream_ch_pkt_write_unsafe(dap_stream_ch_t * a_ch, uint8_t a_type, c size_t l_ret=dap_stream_pkt_write_unsafe(a_ch->stream,l_buf_selected,a_data_size+sizeof(l_hdr)); a_ch->stat.bytes_write+=a_data_size; - a_ch->ready_to_write=true; + dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); if(l_buf_allocated) DAP_DELETE(l_buf_allocated); diff --git a/dap-sdk/net/stream/stream/dap_stream.c b/dap-sdk/net/stream/stream/dap_stream.c index f22eb0c58c..bd55d25be3 100644 --- a/dap-sdk/net/stream/stream/dap_stream.c +++ b/dap-sdk/net/stream/stream/dap_stream.c @@ -75,7 +75,7 @@ static void s_http_client_delete(dap_http_client_t * a_esocket, void * a_arg); static dap_stream_t *s_stream_keepalive_list = NULL; static pthread_mutex_t s_mutex_keepalive_list; -static void s_keepalive_cb( void ); +static bool s_keepalive_cb( void ); static bool s_dump_packet_headers = false; diff --git a/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.c b/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.c index 5191029761..330f3ddcbd 100644 --- a/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.c +++ b/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.c @@ -142,11 +142,13 @@ static bool s_grace_period_control(dap_chain_net_srv_grace_t *a_grace) dap_stream_ch_chain_net_srv_pkt_error_t l_err; memset(&l_err,0,sizeof (l_err)); dap_chain_net_srv_t * l_srv = NULL; + dap_stream_ch_t *l_ch = a_grace->ch; - if (!dap_stream_ch_check_unsafe(a_grace->worker, a_grace->ch)) + if (!dap_stream_ch_check_unsafe(a_grace->stream_worker, l_ch)) goto free_exit; - dap_chain_net_srv_stream_session_t *l_srv_session = a_ch && a_ch->stream && a_ch->stream->session ? - (dap_chain_net_srv_stream_session_t *)a_ch->stream->session->_inheritor : NULL; + + dap_chain_net_srv_stream_session_t *l_srv_session = l_ch && l_ch->stream && l_ch->stream->session ? + (dap_chain_net_srv_stream_session_t *)l_ch->stream->session->_inheritor : NULL; if (!l_srv_session) goto free_exit; @@ -221,9 +223,9 @@ static bool s_grace_period_control(dap_chain_net_srv_grace_t *a_grace) // Create one client l_usage->client = DAP_NEW_Z( dap_chain_net_srv_client_t); - l_usage->client->stream_worker = a_ch->stream_worker; - l_usage->client->ch = a_ch; - l_usage->client->session_id = a_ch->stream->session->id; + l_usage->client->stream_worker = l_ch->stream_worker; + l_usage->client->ch = l_ch; + l_usage->client->session_id = l_ch->stream->session->id; l_usage->client->ts_created = time(NULL); l_usage->tx_cond = l_tx; memcpy(&l_usage->tx_cond_hash, &l_request->hdr.tx_cond,sizeof (l_usage->tx_cond_hash)); @@ -258,7 +260,7 @@ static bool s_grace_period_control(dap_chain_net_srv_grace_t *a_grace) } } int ret; - if ( (ret= l_srv->callback_requested(l_srv,l_usage->id, l_usage->client, l_request, l_ch_pkt->hdr.size ) )!= 0 ){ + if ((ret = l_srv->callback_requested(l_srv, l_usage->id, l_usage->client, l_request, a_grace->request_size)) != 0) { log_it( L_WARNING, "Request canceled by service callback, return code %d", ret); l_err.code = (uint32_t) ret ; goto free_exit; @@ -269,8 +271,7 @@ static bool s_grace_period_control(dap_chain_net_srv_grace_t *a_grace) l_usage->price = l_price; // TODO extend callback to pass ext and ext size from service callbacks l_receipt = dap_chain_net_srv_issue_receipt( l_usage->service, l_usage, l_usage->price,NULL,0 ); - dap_stream_ch_pkt_write_unsafe( a_ch , DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_SIGN_REQUEST , - l_receipt, l_receipt->size); + dap_stream_ch_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_SIGN_REQUEST, l_receipt, l_receipt->size); }else{ l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_PRICE_NOT_FOUND ; goto free_exit; @@ -285,12 +286,7 @@ static bool s_grace_period_control(dap_chain_net_srv_grace_t *a_grace) l_success->hdr.usage_id = l_usage->id; l_success->hdr.net_id.uint64 = l_usage->net->pub.id.uint64; l_success->hdr.srv_uid.uint64 = l_usage->service->uid.uint64; - - if (dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_SUCCESS, - l_success, l_success_size)) { - dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); - } - + dap_stream_ch_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_SUCCESS, l_success, l_success_size); if ( l_usage->service->callback_response_success ) l_usage->service->callback_response_success ( l_usage->service, l_usage->id, l_usage->client, NULL, 0 ); DAP_DELETE(l_success); @@ -298,7 +294,8 @@ static bool s_grace_period_control(dap_chain_net_srv_grace_t *a_grace) if (l_grace_start) { l_usage->is_grace = true; a_grace->usage = l_usage; - dap_timerfd_start_on_worker(a_grace->stream_worker->worker, l_srv->grace_period * 1000, s_grace_period_control, a_grace); + dap_timerfd_start_on_worker(a_grace->stream_worker->worker, l_srv->grace_period * 1000, + (dap_timerfd_callback_t)s_grace_period_control, a_grace); return false; } else { DAP_DELETE(a_grace->request); @@ -307,7 +304,7 @@ static bool s_grace_period_control(dap_chain_net_srv_grace_t *a_grace) } free_exit: if (l_err.code) { - dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); + dap_stream_ch_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err)); if (l_srv && l_srv->callback_response_error) l_srv->callback_response_error(l_srv, 0, NULL, &l_err, sizeof(l_err)); } @@ -320,7 +317,7 @@ free_exit: pthread_mutex_lock(&l_srv_session->parent->mutex); HASH_ADD(hh, l_srv_session->ban_list, client_pkey_hash, sizeof(dap_chain_hash_fast_t), l_item); pthread_mutex_unlock(&l_srv_session->parent->mutex); - dap_timerfd_start(l_srv->grace_period * 10000, s_unban_client, l_item); + dap_timerfd_start(l_srv->grace_period * 10000, (dap_timerfd_callback_t)s_unban_client, l_item); } else if (l_usage) dap_chain_net_srv_usage_delete(l_srv_session, l_usage); @@ -386,9 +383,8 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch , void* a_arg) l_request_out->send_time2.tv_usec = l_tval.tv_usec; // send response - if(dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_CHECK_RESPONSE, l_request_out, l_request_out->data_size + sizeof(dap_stream_ch_chain_net_srv_pkt_test_t))) { - dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); - } + dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_CHECK_RESPONSE, l_request_out, + l_request_out->data_size + sizeof(dap_stream_ch_chain_net_srv_pkt_test_t)); DAP_DELETE(l_request_out); } break; @@ -419,6 +415,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch , void* a_arg) // Parse the request l_grace->request = DAP_NEW_Z_SIZE(dap_stream_ch_chain_net_srv_pkt_request_t, l_ch_pkt->hdr.size); memcpy(l_grace->request, l_ch_pkt->data, l_ch_pkt->hdr.size); + l_grace->request_size = l_ch_pkt->hdr.size; l_grace->ch = a_ch; l_grace->stream_worker = a_ch->stream_worker; s_grace_period_control(l_grace); @@ -442,10 +439,8 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch , void* a_arg) if(l_srv && l_srv->callback_client_sign_request) { // Sign receipt l_srv->callback_client_sign_request(l_srv, 0, NULL, &l_receipt_new, l_receipt_size); - if(dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_SIGN_RESPONSE, - l_receipt_new, l_receipt_new->size)) { - dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); - } + dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_SIGN_RESPONSE, + l_receipt_new, l_receipt_new->size); } DAP_DELETE(l_receipt_new); // TODO sign smth @@ -490,6 +485,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch , void* a_arg) l_err.net_id.uint64 = l_usage->net->pub.id.uint64; l_err.srv_uid.uint64 = l_usage->service->uid.uint64; + dap_chain_tx_out_cond_t *l_tx_out_cond; if (!l_usage->is_grace) { if (! l_usage->tx_cond ){ log_it(L_WARNING, "No tx out in usage"); @@ -501,9 +497,8 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch , void* a_arg) break; } int l_tx_out_cond_size =0; - dap_chain_tx_out_cond_t *l_tx_out_cond = (dap_chain_tx_out_cond_t *) - dap_chain_datum_tx_item_get(l_usage->tx_cond, NULL, TX_ITEM_TYPE_OUT_COND, &l_tx_out_cond_size ); - + l_tx_out_cond = (dap_chain_tx_out_cond_t *)dap_chain_datum_tx_item_get(l_usage->tx_cond, NULL, + TX_ITEM_TYPE_OUT_COND, &l_tx_out_cond_size ); if ( ! l_tx_out_cond ){ // No conditioned output l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_NO_COND_OUT ; dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); @@ -538,7 +533,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch , void* a_arg) break; } } else { - if( memcmp ( l_pkey_hash.raw, l_tx_out_cond->subtype.srv_pay.pkey_hash.raw , sizeof(l_pkey_hash) ) != 0 ){ + if (memcmp(l_usage->client_pkey_hash.raw, l_tx_out_cond->subtype.srv_pay.pkey_hash.raw, sizeof(l_usage->client_pkey_hash)) != 0) { l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_RECEIPT_WRONG_PKEY_HASH ; dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); if (l_usage->service->callback_response_error) diff --git a/modules/net/srv/include/dap_chain_net_srv.h b/modules/net/srv/include/dap_chain_net_srv.h index 873367548f..a38b08f77b 100755 --- a/modules/net/srv/include/dap_chain_net_srv.h +++ b/modules/net/srv/include/dap_chain_net_srv.h @@ -41,6 +41,7 @@ typedef struct dap_chain_net_srv dap_chain_net_srv_uid_t uid; // Unique ID for service. dap_chain_net_srv_abstract_t srv_common; dap_chain_net_srv_price_t *pricelist; + uint32_t grace_period; dap_chain_callback_trafic_t callback_trafic; // Request for usage diff --git a/modules/net/srv/include/dap_chain_net_srv_common.h b/modules/net/srv/include/dap_chain_net_srv_common.h index e50fbe2984..d9bceb4ca0 100755 --- a/modules/net/srv/include/dap_chain_net_srv_common.h +++ b/modules/net/srv/include/dap_chain_net_srv_common.h @@ -33,13 +33,7 @@ #include "dap_chain_ledger.h" #include "dap_chain_net.h" #include "dap_chain_wallet.h" -#include "dap_timerfd.h" -#include "dap_chain_net_srv_stream_session.h" - - - -//Units of service - +//#include "dap_chain_net_srv_stream_session.h" //Service direction @@ -50,8 +44,6 @@ typedef enum dap_chain_net_srv_order_direction{ } dap_chain_net_srv_order_direction_t; - - typedef struct dap_chain_net_srv_abstract { uint8_t class; //Class of service (once or permanent) @@ -91,13 +83,6 @@ typedef struct dap_chain_net_srv_price struct dap_chain_net_srv_price * prev; } dap_chain_net_srv_price_t; -typedef struct dap_chain_net_srv_grace { - dap_stream_worker_t *stream_worker; - dap_stream_ch_t *ch; - dap_chain_net_srv_usage_t *usage; - dap_stream_ch_chain_net_srv_pkt_request_t *request; -} dap_chain_net_srv_grace_t; - // Ch pkt types #define DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_REQUEST 0x01 #define DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_SIGN_REQUEST 0x10 @@ -195,6 +180,15 @@ typedef struct dap_stream_ch_chain_net_srv_pkt_test{ uint8_t data[]; } DAP_ALIGN_PACKED dap_stream_ch_chain_net_srv_pkt_test_t; +typedef struct dap_chain_net_srv_usage dap_chain_net_srv_usage_t; + +typedef struct dap_chain_net_srv_grace { + dap_stream_worker_t *stream_worker; + dap_stream_ch_t *ch; + dap_chain_net_srv_usage_t *usage; + dap_stream_ch_chain_net_srv_pkt_request_t *request; + size_t request_size; +} dap_chain_net_srv_grace_t; DAP_STATIC_INLINE const char * dap_chain_net_srv_price_unit_uid_to_str( dap_chain_net_srv_price_unit_uid_t a_uid ) { diff --git a/modules/net/srv/include/dap_chain_net_srv_stream_session.h b/modules/net/srv/include/dap_chain_net_srv_stream_session.h index 57d64d5f1b..bc9e593c4c 100644 --- a/modules/net/srv/include/dap_chain_net_srv_stream_session.h +++ b/modules/net/srv/include/dap_chain_net_srv_stream_session.h @@ -59,7 +59,6 @@ typedef struct dap_chain_net_srv_usage{ bool is_active; bool is_free; bool is_grace; - uint32_t grace_period; UT_hash_handle hh; // } dap_chain_net_srv_usage_t; diff --git a/modules/service/vpn/dap_chain_net_srv_vpn.c b/modules/service/vpn/dap_chain_net_srv_vpn.c index f1a5a3e945..3abf8077b8 100644 --- a/modules/service/vpn/dap_chain_net_srv_vpn.c +++ b/modules/service/vpn/dap_chain_net_srv_vpn.c @@ -671,7 +671,7 @@ int s_vpn_service_create(dap_config_t * g_config){ l_srv_vpn->parent = l_srv; // Read if we need to dump all pkt operations - s_debug_more= dap_config_get_item_int_default(g_config,"srv_vpn", "debug_more",false); + s_debug_more= dap_config_get_item_bool_default(g_config,"srv_vpn", "debug_more",false); l_srv->grace_period = dap_config_get_item_uint32_default(g_config, "srv_vpn", "grace_period", 60); //! IMPORTANT ! This fetch is single-action and cannot be further reused, since it modifies the stored config data -- GitLab