From 8ee9dcebee3620101207161b105f1648479d3277 Mon Sep 17 00:00:00 2001 From: "daniil.frolov" <daniil.frolov@demlabs.net> Date: Mon, 14 Aug 2023 18:43:41 +0700 Subject: [PATCH] ... --- .../dap_stream_ch_chain_net_srv.c | 133 +++++++++++++++++- modules/net/srv/include/dap_chain_net_srv.h | 13 +- modules/service/vpn/dap_chain_net_srv_vpn.c | 88 +++++++++++- 3 files changed, 225 insertions(+), 9 deletions(-) diff --git a/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.c b/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.c index aab1161c33..df747dbef4 100644 --- a/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.c +++ b/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.c @@ -42,6 +42,7 @@ along with any CellFrame SDK based project. If not, see <http://www.gnu.org/lic #include "dap_stream_ch_chain_net_srv.h" #define LOG_TAG "dap_stream_ch_chain_net_srv" +#define SRV_PAY_GDB_GROUP "local.srv_pay" typedef struct usages_in_grace{ dap_hash_fast_t tx_cond_hash; @@ -49,6 +50,8 @@ typedef struct usages_in_grace{ UT_hash_handle hh; } usages_in_grace_t; + + static void s_stream_ch_new(dap_stream_ch_t* ch , void* arg); static void s_stream_ch_delete(dap_stream_ch_t* ch , void* arg); static void s_stream_ch_packet_in(dap_stream_ch_t* ch , void* arg); @@ -178,9 +181,15 @@ void s_stream_ch_delete(dap_stream_ch_t* a_ch , void* a_arg) (void) a_ch; (void) a_arg; log_it(L_DEBUG, "Stream ch chain net srv delete"); - dap_chain_net_srv_call_closed_all( a_ch); + + dap_chain_net_srv_stream_session_t * l_srv_session = (dap_chain_net_srv_stream_session_t *) a_ch->stream->session->_inheritor; + dap_chain_net_srv_t * l_srv = dap_chain_net_srv_get(l_srv_session->usage_active->service->uid); + + l_srv->callbacks.save_remain_service(l_srv, l_srv_session->usage_active->id, l_srv_session->usage_active->client); + + dap_chain_net_srv_call_closed_all(a_ch); if (a_ch->stream->session && a_ch->stream->session->_inheritor) - dap_chain_net_srv_stream_session_delete( a_ch->stream->session ); + dap_chain_net_srv_stream_session_delete(a_ch->stream->session ); DAP_DEL_Z(a_ch->internal); } @@ -418,7 +427,7 @@ static void s_grace_period_start(dap_chain_net_srv_grace_t *a_grace) (dap_timerfd_callback_t)s_grace_period_finish, l_item)->esocket_uuid; - } else { // Start srvice in normal pay mode + } else { // Start service in normal pay mode a_grace->usage->tx_cond = l_tx; l_tx_out_cond = dap_chain_datum_tx_out_cond_get(l_tx, DAP_CHAIN_TX_OUT_COND_SUBTYPE_SRV_PAY, NULL ); @@ -487,6 +496,63 @@ static void s_grace_period_start(dap_chain_net_srv_grace_t *a_grace) return; } + dap_stream_ch_chain_net_srv_remain_service_store_t* l_remain_service = NULL; + l_remain_service = a_grace->usage->service->callbacks.get_remain_service(a_grace->usage->service, a_grace->usage->id, a_grace->usage->client); + if (l_remain_service && !a_grace->usage->is_active && + l_remain_service->remain_units && + l_remain_service->remain_units_type.enm == l_tx_out_cond->subtype.srv_pay.unit.enm){ + // Accept connection, set limits and start service + char *l_unit_type_str = NULL; + switch(l_remain_service->remain_units_type.enm){ + case SERV_UNIT_SEC: + l_unit_type_str = dap_strdup_printf( "SEC"); + break; + case SERV_UNIT_DAY: + l_unit_type_str = dap_strdup_printf( "DAY"); + break; + case SERV_UNIT_MB: + l_unit_type_str = dap_strdup_printf( "MB"); + break; + case SERV_UNIT_KB: + l_unit_type_str = dap_strdup_printf( "KB"); + break; + case SERV_UNIT_B: + l_unit_type_str = dap_strdup_printf( "B"); + break; + } + log_it(L_INFO, "User has %d %s remain service. Start service without paying.", l_remain_service->remain_units, l_unit_type_str); + DAP_DELETE(l_unit_type_str); + + size_t l_success_size = sizeof (dap_stream_ch_chain_net_srv_pkt_success_hdr_t ); + dap_stream_ch_chain_net_srv_pkt_success_t *l_success = DAP_NEW_Z_SIZE(dap_stream_ch_chain_net_srv_pkt_success_t, + l_success_size); + if(!l_success) { + log_it(L_ERROR, "Memory allocation error in %s, line %d", __PRETTY_FUNCTION__, __LINE__); + l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_ALLOC_MEMORY_ERROR; + if(l_ch) + dap_stream_ch_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err)); + if (a_grace->usage->service && a_grace->usage->service->callbacks.response_error) + a_grace->usage->service->callbacks.response_error(a_grace->usage->service, 0, NULL, &l_err, sizeof(l_err)); + } else { + l_success->hdr.usage_id = a_grace->usage->id; + l_success->hdr.net_id.uint64 = a_grace->usage->net->pub.id.uint64; + l_success->hdr.srv_uid.uint64 = a_grace->usage->service->uid.uint64; + dap_stream_ch_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_SUCCESS, l_success, l_success_size); + + // create and fil first receipt + a_grace->usage->receipt = dap_chain_datum_tx_receipt_create( + a_grace->usage->service->uid, l_remain_service->remain_units_type, l_remain_service->remain_units, uint256_0, NULL, NULL); + + if (a_grace->usage->service->callbacks.response_success) + a_grace->usage->service->callbacks.response_success(a_grace->usage->service, a_grace->usage->id, a_grace->usage->client, NULL, 0); + DAP_DELETE(l_success); + } + DAP_DELETE(a_grace->request); + DAP_DELETE(a_grace); + DAP_DELETE(l_remain_service); + return; + } + if (a_grace->usage->receipt_next){ DAP_DEL_Z(a_grace->usage->receipt_next); a_grace->usage->receipt_next = dap_chain_net_srv_issue_receipt(a_grace->usage->service, a_grace->usage->price, NULL, 0); @@ -497,7 +563,6 @@ static void s_grace_period_start(dap_chain_net_srv_grace_t *a_grace) } DAP_DELETE(a_grace->request); DAP_DELETE(a_grace); - } } @@ -617,6 +682,66 @@ static bool s_grace_period_finish(usages_in_grace_t *a_grace_item) RET_WITH_DEL_A_GRACE; } + // get remain units from DB + dap_stream_ch_chain_net_srv_remain_service_store_t* l_remain_service = NULL; + l_remain_service = l_grace->usage->service->callbacks.get_remain_service(l_grace->usage->service, l_grace->usage->id, l_grace->usage->client); + if (l_remain_service && !l_grace->usage->is_active && + l_remain_service->remain_units && + l_remain_service->remain_units_type.enm == l_tx_out_cond->subtype.srv_pay.unit.enm){ + // Accept connection, set limits and start service + char *l_unit_type_str = NULL; + switch(l_remain_service->remain_units_type.enm){ + case SERV_UNIT_SEC: + l_unit_type_str = dap_strdup_printf( "SEC"); + break; + case SERV_UNIT_DAY: + l_unit_type_str = dap_strdup_printf( "DAY"); + break; + case SERV_UNIT_MB: + l_unit_type_str = dap_strdup_printf( "MB"); + break; + case SERV_UNIT_KB: + l_unit_type_str = dap_strdup_printf( "KB"); + break; + case SERV_UNIT_B: + l_unit_type_str = dap_strdup_printf( "B"); + break; + } + log_it(L_INFO, "User has %d %s remain service. Start service without paying.", l_remain_service->remain_units, l_unit_type_str); + DAP_DELETE(l_unit_type_str); + + size_t l_success_size = sizeof (dap_stream_ch_chain_net_srv_pkt_success_hdr_t ); + dap_stream_ch_chain_net_srv_pkt_success_t *l_success = DAP_NEW_Z_SIZE(dap_stream_ch_chain_net_srv_pkt_success_t, + l_success_size); + if(!l_success) { + log_it(L_ERROR, "Memory allocation error in %s, line %d", __PRETTY_FUNCTION__, __LINE__); + l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_ALLOC_MEMORY_ERROR; + if(l_ch) + dap_stream_ch_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err)); + if (l_grace->usage->service && l_grace->usage->service->callbacks.response_error) + l_grace->usage->service->callbacks.response_error(l_grace->usage->service, 0, NULL, &l_err, sizeof(l_err)); + } else { + l_success->hdr.usage_id = l_grace->usage->id; + l_success->hdr.net_id.uint64 = l_grace->usage->net->pub.id.uint64; + l_success->hdr.srv_uid.uint64 = l_grace->usage->service->uid.uint64; + dap_stream_ch_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_SUCCESS, l_success, l_success_size); + + // create and fil first receipt + l_grace->usage->receipt = dap_chain_datum_tx_receipt_create( + l_grace->usage->service->uid, l_remain_service->remain_units_type, l_remain_service->remain_units, uint256_0, NULL, NULL); + + if (l_grace->usage->service->callbacks.response_success) + l_grace->usage->service->callbacks.response_success(l_grace->usage->service, l_grace->usage->id, l_grace->usage->client, NULL, 0); + DAP_DELETE(l_success); + } + DAP_DELETE(l_grace->request); + DAP_DELETE(l_grace); + DAP_DELETE(l_remain_service); + HASH_DEL(s_grace_table, a_grace_item); + DAP_DELETE(a_grace_item); + return false; + } + // make receipt or tx char *l_receipt_hash_str; dap_chain_datum_tx_receipt_t *l_receipt = NULL; diff --git a/modules/net/srv/include/dap_chain_net_srv.h b/modules/net/srv/include/dap_chain_net_srv.h index bde2977e89..6b8b721b0f 100755 --- a/modules/net/srv/include/dap_chain_net_srv.h +++ b/modules/net/srv/include/dap_chain_net_srv.h @@ -204,10 +204,16 @@ typedef struct dap_chain_net_srv_client_remote struct dap_chain_net_srv_client_remote *next; } dap_chain_net_srv_client_remote_t; +typedef struct { + uint64_t remain_units; + dap_chain_net_srv_price_unit_uid_t remain_units_type; +} dap_stream_ch_chain_net_srv_remain_service_store_t; + typedef int (*dap_chain_net_srv_callback_data_t)(dap_chain_net_srv_t *, uint32_t, dap_chain_net_srv_client_remote_t *, const void *, size_t); typedef void* (*dap_chain_net_srv_callback_custom_data_t)(dap_chain_net_srv_t *, dap_chain_net_srv_usage_t *, const void *, size_t, size_t *); typedef void (*dap_chain_net_srv_callback_ch_t)(dap_chain_net_srv_t *, dap_stream_ch_t *); - +typedef dap_stream_ch_chain_net_srv_remain_service_store_t* (*dap_chain_net_srv_callback_get_remain_srvice_t)(dap_chain_net_srv_t *, uint32_t, dap_chain_net_srv_client_remote_t*); +typedef int (*dap_chain_net_srv_callback_save_remain_srvice_t)(dap_chain_net_srv_t *, uint32_t, dap_chain_net_srv_client_remote_t*); // Process service decree typedef void (*dap_chain_net_srv_callback_decree_t)(dap_chain_net_srv_t* a_srv, dap_chain_net_t* a_net, dap_chain_t* a_chain, dap_chain_datum_decree_t* a_decree, size_t a_decree_size); @@ -231,7 +237,10 @@ typedef struct dap_chain_net_srv_callbacks { dap_chain_net_srv_callback_data_t receipt_next_success; // Custom data processing dap_chain_net_srv_callback_custom_data_t custom_data; - + // Remain service getting drom DB + dap_chain_net_srv_callback_get_remain_srvice_t get_remain_service; + // Remain service saving to DB + dap_chain_net_srv_callback_save_remain_srvice_t save_remain_service; // Decree processing dap_chain_net_srv_callback_decree_t decree; diff --git a/modules/service/vpn/dap_chain_net_srv_vpn.c b/modules/service/vpn/dap_chain_net_srv_vpn.c index bf8c5871a6..27f10ea385 100644 --- a/modules/service/vpn/dap_chain_net_srv_vpn.c +++ b/modules/service/vpn/dap_chain_net_srv_vpn.c @@ -188,8 +188,9 @@ static int s_callback_response_error(dap_chain_net_srv_t * a_srv, uint32_t a_usa static int s_callback_receipt_next_success(dap_chain_net_srv_t * a_srv, uint32_t a_usage_id, dap_chain_net_srv_client_remote_t * a_srv_client, const void * a_receipt_next, size_t a_receipt_next_size); - - +static dap_stream_ch_chain_net_srv_remain_service_store_t* s_callback_get_remain_service(dap_chain_net_srv_t * a_srv, uint32_t usage_id, + dap_chain_net_srv_client_remote_t * a_srv_client); +static int s_callback_save_remain_service(dap_chain_net_srv_t * a_srv, uint32_t usage_id, dap_chain_net_srv_client_remote_t * a_srv_client); // Stream callbacks static void s_ch_vpn_new(dap_stream_ch_t* ch, void* arg); static void s_ch_vpn_delete(dap_stream_ch_t* ch, void* arg); @@ -858,6 +859,9 @@ static int s_vpn_service_create(dap_config_t * g_config) l_srv_callbacks.response_success = s_callback_response_success; l_srv_callbacks.response_error = s_callback_response_error; l_srv_callbacks.receipt_next_success = s_callback_receipt_next_success; + l_srv_callbacks.get_remain_service = s_callback_get_remain_service; + l_srv_callbacks.save_remain_service = s_callback_save_remain_service; + dap_chain_net_srv_t* l_srv = dap_chain_net_srv_add(l_uid, "srv_vpn", &l_srv_callbacks); @@ -1069,7 +1073,86 @@ static int s_callback_response_error(dap_chain_net_srv_t * a_srv, uint32_t a_usa return 0; } +static dap_stream_ch_chain_net_srv_remain_service_store_t* s_callback_get_remain_service(dap_chain_net_srv_t * a_srv, uint32_t a_usage_id, + dap_chain_net_srv_client_remote_t * a_srv_client) +{ + UNUSED(a_srv); + dap_chain_net_srv_stream_session_t * l_srv_session = a_srv_client && a_srv_client->ch && a_srv_client->ch->stream && a_srv_client->ch->stream->session ? + (dap_chain_net_srv_stream_session_t *) a_srv_client->ch->stream->session->_inheritor : NULL; + + if (!l_srv_session){ + log_it(L_DEBUG, "Can't find srv session"); + return NULL; + } + dap_chain_net_srv_usage_t* l_usage = dap_chain_net_srv_usage_find_unsafe(l_srv_session, a_usage_id); + if (!l_usage){ + log_it(L_DEBUG, "Can't find usage."); + return NULL; + } + dap_chain_net_t *l_net = l_usage->net; + + // get remain units from DB + char *l_remain_limits_gdb_group = dap_strdup_printf( "local.srv_pay.%s.vpn_srv.remain_limits", l_net->pub.name); + char *l_user_key = dap_chain_hash_fast_to_str_new(&l_usage->client_pkey_hash); + log_it(L_DEBUG, "Checkout user %s in group %s", l_user_key, l_remain_limits_gdb_group); + dap_stream_ch_chain_net_srv_remain_service_store_t* l_remain_service = NULL; + size_t l_remain_service_size = 0; + l_remain_service = (dap_stream_ch_chain_net_srv_remain_service_store_t*) dap_global_db_get_sync(l_remain_limits_gdb_group, l_user_key, &l_remain_service_size, NULL, NULL); + DAP_DELETE(l_remain_limits_gdb_group); + DAP_DELETE(l_user_key); + return l_remain_service; +} + +static int s_callback_save_remain_service(dap_chain_net_srv_t * a_srv, uint32_t a_usage_id, + dap_chain_net_srv_client_remote_t * a_srv_client) +{ + UNUSED(a_srv); + dap_chain_net_srv_stream_session_t * l_srv_session = a_srv_client && a_srv_client->ch && a_srv_client->ch->stream && a_srv_client->ch->stream->session ? + (dap_chain_net_srv_stream_session_t *) a_srv_client->ch->stream->session->_inheritor : NULL; + + if (!l_srv_session){ + log_it(L_DEBUG, "Can't find srv session"); + return -100; + } + dap_chain_net_srv_usage_t* l_usage = dap_chain_net_srv_usage_find_unsafe(l_srv_session, a_usage_id); + if (!l_usage){ + log_it(L_DEBUG, "Can't find usage."); + return -101; + } + + dap_chain_net_t *l_net = l_usage->net; + + // get remain units from DB + char *l_remain_limits_gdb_group = dap_strdup_printf( "local.srv_pay.%s.vpn_srv.remain_limits", l_net->pub.name); + char *l_user_key = dap_chain_hash_fast_to_str_new(&l_usage->client_pkey_hash); + log_it(L_DEBUG, "Checkout user %s in group %s", l_user_key, l_remain_limits_gdb_group); + + dap_stream_ch_chain_net_srv_remain_service_store_t l_remain_service = {}; + + l_remain_service.remain_units_type.enm = l_srv_session->limits_units_type.enm; + switch(l_remain_service.remain_units_type.enm){ + case SERV_UNIT_SEC: + case SERV_UNIT_DAY: + l_remain_service.remain_units = l_srv_session->limits_ts; + break; + case SERV_UNIT_MB: + case SERV_UNIT_KB: + case SERV_UNIT_B: + l_remain_service.remain_units = l_srv_session->limits_bytes; + break; + } + + if(dap_global_db_set_sync(l_remain_limits_gdb_group, l_user_key, &l_remain_service, sizeof(l_remain_service), false)) + { + DAP_DELETE(l_remain_limits_gdb_group); + DAP_DELETE(l_user_key); + return -102; + } + DAP_DELETE(l_remain_limits_gdb_group); + DAP_DELETE(l_user_key); + return 0; +} static void s_ch_vpn_esocket_assigned(dap_events_socket_t *a_es, dap_worker_t *a_worker) { @@ -1157,7 +1240,6 @@ static void s_ch_vpn_delete(dap_stream_ch_t* a_ch, void* arg) dap_chain_net_srv_ch_vpn_t * l_ch_vpn = CH_VPN(a_ch); dap_chain_net_srv_vpn_t * l_srv_vpn =(dap_chain_net_srv_vpn_t *) l_ch_vpn->net_srv->_internal; - // So complicated to update usage client to be sure that nothing breaks it usage_client_t * l_usage_client = NULL; -- GitLab