diff --git a/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.c b/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.c index 8666556da7f0251cc56eef0d950f4ab776b3dfa4..e817a9feb7b487a8f4ee42863593bf7fcc3e040f 100644 --- a/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.c +++ b/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.c @@ -106,9 +106,9 @@ static bool s_grace_period_finish(dap_chain_net_srv_grace_usage_t *a_grace); static void s_set_usage_data_to_gdb(const dap_chain_net_srv_usage_t *a_usage); static uint256_t s_calc_datoshi(const dap_chain_net_srv_usage_t *a_usage, uint256_t *a_prev); +static int s_check_tx_params(dap_chain_net_srv_usage_t *a_usage); /** Service states handlers **/ -static void s_service_state_go_to_remain_service(dap_chain_net_srv_usage_t *a_usage); static void s_service_state_go_to_grace(dap_chain_net_srv_usage_t *a_usage); static void s_service_state_go_to_normal(dap_chain_net_srv_usage_t *a_usage); static void s_service_state_go_to_error(dap_chain_net_srv_usage_t *a_usage); @@ -116,11 +116,10 @@ static void s_service_state_go_to_error(dap_chain_net_srv_usage_t *a_usage); /** Service substates handlers **/ static void s_service_substate_go_to_normal(dap_chain_net_srv_usage_t *a_usage); -static void s_service_substate_request_first_receipt(dap_chain_net_srv_usage_t *a_usage); -static void s_service_substate_request_next_receipt(dap_chain_net_srv_usage_t *a_usage); static void s_service_substate_pay_service(dap_chain_net_srv_usage_t *a_usage); static void s_service_substate_go_to_waiting_prev_tx(dap_chain_net_srv_usage_t *a_usage); static void s_service_substate_go_to_waiting_new_tx(dap_chain_net_srv_usage_t *a_usage); +static void s_service_substate_go_to_error(dap_chain_net_srv_usage_t *a_usage); /********************************/ @@ -291,9 +290,16 @@ static bool s_receipt_timeout_handler(dap_chain_net_srv_usage_t *a_usage) log_it(L_WARNING, "Receipt signing by client max attempt is reached!"); a_usage->receipt_sign_req_cnt = 0; - // TODO go to error if first receipt without sign and normal if next receipt sign error - a_usage->is_waiting_first_receipt_sign = false; - a_usage->is_waiting_next_receipt_sign = false; + // TODO go to error state if first receipt without sign and normal state and error substate if next receipt sign error + if (a_usage->service_substate == DAP_CHAIN_NET_SRV_USAGE_SERVICE_SUBSTATE_WAITING_FIRST_RECEIPT_SIGN){ + // TODO go to error state and switch off + a_usage->last_err_code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_RECEIPT_NO_SIGN; + s_service_state_go_to_error(a_usage); + } else if (a_usage->service_substate == DAP_CHAIN_NET_SRV_USAGE_SERVICE_SUBSTATE_WAITING_NEXT_RECEIPT_SIGN){ + // TODO go to error substate + a_usage->last_err_code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_RECEIPT_NO_SIGN; + s_service_substate_go_to_error(a_usage); + } return false; } @@ -533,12 +539,63 @@ static bool s_service_start(dap_stream_ch_t *a_ch , dap_stream_ch_chain_net_srv_ l_remain_service = l_usage->service->callbacks.get_remain_service(l_usage->service, l_usage->id, l_usage->client); if (l_remain_service && ((l_remain_service->limits_ts && l_usage->price->units_uid.enm == SERV_UNIT_SEC) || (l_remain_service->limits_bytes && l_usage->price->units_uid.enm == SERV_UNIT_B))){ - s_go_to_service_with_remain(l_usage); + dap_chain_net_srv_stream_session_t * l_srv_session = (dap_chain_net_srv_stream_session_t *) l_usage->client->ch->stream->session->_inheritor; + switch(l_usage->price->units_uid.enm){ + case SERV_UNIT_SEC: + l_srv_session->limits_ts = l_remain_service->limits_ts; + break; + case SERV_UNIT_B: + l_srv_session->limits_bytes = l_remain_service->limits_bytes; + break; + } + log_it(L_INFO, "User %s has %ld %s remain service. Start service without paying.", dap_chain_hash_fast_to_str_static(&l_usage->client_pkey_hash), + l_remain_service->limits_ts ? l_remain_service->limits_ts : l_remain_service->limits_bytes, + dap_chain_srv_unit_enum_to_str(l_usage->price->units_uid.enm)); + DAP_DELETE(l_user_key); + size_t l_success_size = sizeof (dap_stream_ch_chain_net_srv_pkt_success_hdr_t ); + dap_stream_ch_chain_net_srv_pkt_success_t *l_success = DAP_NEW_Z_SIZE(dap_stream_ch_chain_net_srv_pkt_success_t, + l_success_size); + if(!l_success) { + log_it(L_ERROR, "Memory allocation error in %s, line %d", __PRETTY_FUNCTION__, __LINE__); + l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_ALLOC_MEMORY_ERROR; + if(a_ch) + dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err)); + if (l_usage->service && l_usage->service->callbacks.response_error) + l_usage->service->callbacks.response_error(l_usage->service, 0, NULL, &l_err, sizeof(l_err)); + + // TODO go to state error + } else { + l_success->hdr.usage_id = l_usage->id; + l_success->hdr.net_id.uint64 = l_usage->net->pub.id.uint64; + l_success->hdr.srv_uid.uint64 = l_usage->service->uid.uint64; + dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_SUCCESS, l_success, l_success_size); + + // create and fill first receipt + l_usage->receipt = dap_chain_datum_tx_receipt_create( + l_usage->service->uid, l_price->units_uid, l_price->units, l_price->value_datoshi, NULL, 0); + if (l_usage->service->callbacks.response_success) + l_usage->service->callbacks.response_success(l_usage->service, l_usage->id, l_usage->client, l_usage->receipt, l_usage->receipt->size); + DAP_DELETE(l_success); + } + + l_usage->service_state = DAP_CHAIN_NET_SRV_USAGE_SERVICE_STATE_NORMAL; + l_usage->service_substate = DAP_CHAIN_NET_SRV_USAGE_SERVICE_SUBSTATE_NORMAL; } else { // Check tx dap_chain_datum_tx_t *l_tx = dap_ledger_tx_find_by_hash(l_net->pub.ledger, &l_usage->tx_cond_hash); if (!l_tx){ // Start grace + dap_chain_net_srv_banlist_item_t *l_item = NULL; + pthread_mutex_lock(&l_srv->banlist_mutex); + HASH_FIND(hh, l_srv->ban_list, &l_usage->client_pkey_hash, sizeof(dap_chain_hash_fast_t), l_item); + pthread_mutex_unlock(&l_srv->banlist_mutex); + if (l_item) { // client banned + log_it(L_DEBUG, "Client %s is banned!", dap_chain_hash_fast_to_str_static(&l_usage->client_pkey_hash)); + l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_RECEIPT_BANNED_PKEY_HASH; + // s_grace_error(a_grace, l_err); + // TODO go to error state + return false; + } s_service_state_go_to_grace(l_usage); } else { // Start normal mode @@ -1012,12 +1069,12 @@ static bool s_grace_period_finish(dap_chain_net_srv_grace_usage_t *a_grace_item) log_it(L_INFO, "Can't find new tx cond in ledger!"); l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_NOT_FOUND; } else { - log_it(L_INFO, "Can't find tx in ledger! Remove receipt and wait for en of service."); + log_it(L_INFO, "Can't find tx in ledger! Remove receipt and wait for end of service."); l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_NOT_FOUND; } DAP_DELETE(l_grace); DAP_DELETE(a_grace_item); - // TODO go to error and wait till service end + // TODO go to error substate and wait till service end } break; } @@ -1422,6 +1479,7 @@ static bool s_stream_ch_packet_in(dap_stream_ch_t *a_ch, void *a_arg) if (! l_usage ){ log_it(L_WARNING, "No usage in srv."); l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_SERVICE_NOT_FOUND; + // TODO go to error substate dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); if (l_usage && l_usage->service && l_usage->service->callbacks.response_error) l_usage->service->callbacks.response_error(l_usage->service,l_usage->id, l_usage->client,&l_err,sizeof (l_err) ); @@ -1443,6 +1501,7 @@ static bool s_stream_ch_packet_in(dap_stream_ch_t *a_ch, void *a_arg) if (dap_chain_net_get_state(l_usage->net) == NET_STATE_OFFLINE) { log_it(L_ERROR, "Can't pay service because net %s is offline.", l_usage->net->pub.name); l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_NETWORK_IS_OFFLINE; + // TODO go to error substate return false; } @@ -1473,6 +1532,7 @@ static bool s_stream_ch_packet_in(dap_stream_ch_t *a_ch, void *a_arg) if ( !l_is_found || ! l_usage ){ log_it(L_WARNING, "Can't find receipt in usages thats equal to response receipt"); l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_RECEIPT_CANT_FIND; + // TODO go to error substate dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); if (l_usage && l_usage->service && l_usage->service->callbacks.response_error) l_usage->service->callbacks.response_error(l_usage->service,l_usage->id, l_usage->client,&l_err,sizeof (l_err) ); @@ -1486,6 +1546,7 @@ static bool s_stream_ch_packet_in(dap_stream_ch_t *a_ch, void *a_arg) if ( ! l_usage->tx_cond ){ log_it(L_WARNING, "No tx cond in usage"); l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_NOT_FOUND ; + // TODO go to error substate dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); if (l_usage->service->callbacks.response_error) l_usage->service->callbacks.response_error( l_usage->service, l_usage->id, l_usage->client, @@ -1495,6 +1556,7 @@ static bool s_stream_ch_packet_in(dap_stream_ch_t *a_ch, void *a_arg) l_tx_out_cond = dap_chain_datum_tx_out_cond_get(l_usage->tx_cond, DAP_CHAIN_TX_OUT_COND_SUBTYPE_SRV_PAY, NULL ); if ( ! l_tx_out_cond ){ // No condition output l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_NO_COND_OUT ; + // TODO go to error substate dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); if (l_usage->service->callbacks.response_error) l_usage->service->callbacks.response_error( l_usage->service, l_usage->id, l_usage->client,&l_err,sizeof (l_err) ); @@ -1504,6 +1566,7 @@ static bool s_stream_ch_packet_in(dap_stream_ch_t *a_ch, void *a_arg) dap_sign_t * l_receipt_sign = dap_chain_datum_tx_receipt_sign_get( l_receipt, l_receipt_size, 1); if ( ! l_receipt_sign ){ l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_RECEIPT_NO_SIGN ; + // TODO go to error substate dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); if (l_usage->service->callbacks.response_error) l_usage->service->callbacks.response_error( l_usage->service, l_usage->id, l_usage->client, @@ -1514,6 +1577,7 @@ static bool s_stream_ch_packet_in(dap_stream_ch_t *a_ch, void *a_arg) dap_chain_net_srv_t * l_srv = dap_chain_net_srv_get(l_receipt->receipt_info.srv_uid); if (memcmp(l_usage->client_pkey_hash.raw, l_tx_out_cond->subtype.srv_pay.pkey_hash.raw, sizeof(l_usage->client_pkey_hash)) != 0) { l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_RECEIPT_WRONG_PKEY_HASH ; + // TODO go to error substate dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); if (l_usage->service->callbacks.response_error) l_usage->service->callbacks.response_error(l_usage->service,l_usage->id, l_usage->client,&l_err,sizeof (l_err) ); @@ -1527,6 +1591,7 @@ static bool s_stream_ch_packet_in(dap_stream_ch_t *a_ch, void *a_arg) if (!l_usage->receipt) { log_it(L_CRITICAL, "%s", c_error_memory_alloc); break; + // TODO go to error substate } } else if (l_usage->receipt_next && l_usage->service_substate == DAP_CHAIN_NET_SRV_USAGE_SERVICE_SUBSTATE_WAITING_NEXT_RECEIPT_SIGN) { DAP_DELETE(l_usage->receipt_next); @@ -1534,11 +1599,10 @@ static bool s_stream_ch_packet_in(dap_stream_ch_t *a_ch, void *a_arg) if (!l_usage->receipt_next) { log_it(L_CRITICAL, "%s", c_error_memory_alloc); break; + // TODO go to error substate } } - - // TODO try to pay receipt s_service_substate_pay_service(l_usage); size_t l_success_size; l_success_size = sizeof(dap_stream_ch_chain_net_srv_pkt_success_hdr_t) + DAP_CHAIN_HASH_FAST_STR_SIZE; @@ -1580,13 +1644,13 @@ static bool s_stream_ch_packet_in(dap_stream_ch_t *a_ch, void *a_arg) if( l_usage->service->callbacks.response_success(l_usage->service,l_usage->id, l_usage->client, l_receipt, l_receipt_size ) !=0 ){ log_it(L_NOTICE, "No success by service success callback, inactivating service usage"); - //TODO go to error + // TODO go to error substate } } else if (l_usage->service->callbacks.receipt_next_success) { if (l_usage->service->callbacks.receipt_next_success(l_usage->service, l_usage->id, l_usage->client, l_receipt, l_receipt_size ) != 0 ){ log_it(L_NOTICE, "No success by service receipt_next callback, inactivating service usage"); - //TODO go to error + // TODO go to error substate } } // TODO go to normal substate @@ -1657,9 +1721,6 @@ static bool s_stream_ch_packet_in(dap_stream_ch_t *a_ch, void *a_arg) case DAP_CHAIN_NET_SRV_USAGE_SERVICE_STATE_GRACE:{ // TODO go to error and switch off } break; - case DAP_CHAIN_NET_SRV_USAGE_SERVICE_STATE_REMAIN_LIMITS:{ - // TODO go to error and wait till service end - } break; case DAP_CHAIN_NET_SRV_USAGE_SERVICE_STATE_NORMAL:{ if (l_usage->service_substate == DAP_CHAIN_NET_SRV_USAGE_SERVICE_SUBSTATE_WAITING_NEW_TX_FROM_CLIENT){ dap_stream_ch_chain_net_srv_pkt_error_t l_err = { }; @@ -1680,33 +1741,8 @@ static bool s_stream_ch_packet_in(dap_stream_ch_t *a_ch, void *a_arg) break; } - dap_chain_datum_tx_t *l_tx = dap_ledger_tx_find_by_hash(l_usage->net->pub.ledger, &l_responce->hdr.tx_cond); - if (l_tx){ - // Replace - if (l_curr_grace_item){ - log_it(L_INFO, "Found tx in ledger by net tx responce handler. Finish waiting new tx grace period."); - // Stop timer - dap_timerfd_delete_mt(l_curr_grace_item->grace->timer->worker, l_curr_grace_item->grace->timer->esocket_uuid); - // finish grace - l_usage->tx_cond_hash = l_responce->hdr.tx_cond; - // s_grace_period_finish(l_curr_grace_item); - //TODO try to pay service - s_service_substate_pay_service(l_usage); - } - }else{ - if (l_curr_grace_item){ - log_it(L_INFO, "Can't find tx in ledger. Waiting..."); - l_usage->service_substate == DAP_CHAIN_NET_SRV_USAGE_SERVICE_SUBSTATE_WAITING_NEW_TX_IN_LEDGER; - l_curr_grace_item->grace->usage->tx_cond_hash = l_responce->hdr.tx_cond; - pthread_mutex_lock(&l_srv->grace_mutex); - HASH_DEL(l_srv->grace_hash_tab, l_curr_grace_item); - l_curr_grace_item->tx_cond_hash = l_responce->hdr.tx_cond; - HASH_ADD(hh, l_srv->grace_hash_tab, tx_cond_hash, sizeof(dap_hash_fast_t), l_curr_grace_item); - pthread_mutex_unlock(&l_srv->grace_mutex); - } - } - - + s_service_substate_pay_service(l_usage); + size_t l_success_size = sizeof (dap_stream_ch_chain_net_srv_pkt_success_hdr_t ); dap_stream_ch_chain_net_srv_pkt_success_t *l_success = DAP_NEW_Z_SIZE(dap_stream_ch_chain_net_srv_pkt_success_t, l_success_size); @@ -1782,72 +1818,6 @@ static int s_pay_cervice(dap_chain_net_srv_usage_t *a_usage, dap_chain_datum_tx_ } } - -static void s_service_state_go_to_remain_service(dap_chain_net_srv_usage_t *a_usage) -{ - if (!a_usage) - return; - - a_usage->service_state = DAP_CHAIN_NET_SRV_USAGE_SERVICE_STATE_REMAIN_LIMITS; - a_usage->service_substate = DAP_CHAIN_NET_SRV_USAGE_SERVICE_SUBSTATE_IDLE; - - // Start with remain limit - // Accept connection, set limits and start service - dap_chain_net_srv_stream_session_t * l_srv_session = (dap_chain_net_srv_stream_session_t *) l_usage->client->ch->stream->session->_inheritor; - switch(l_tx_out_cond->subtype.srv_pay.unit.enm){ - case SERV_UNIT_SEC: - l_srv_session->limits_ts = l_remain_service->limits_ts; - break; - case SERV_UNIT_B: - l_srv_session->limits_bytes = l_remain_service->limits_bytes; - break; - } - char *l_user_key = dap_chain_hash_fast_to_str_new(&a_grace->usage->client_pkey_hash); - log_it(L_INFO, "User %s has %ld %s remain service. Start service without paying.", l_user_key, - l_remain_service->limits_ts ? l_remain_service->limits_ts : l_remain_service->limits_bytes, - dap_chain_srv_unit_enum_to_str(l_tx_out_cond->subtype.srv_pay.unit.enm)); - DAP_DELETE(l_user_key); - size_t l_success_size = sizeof (dap_stream_ch_chain_net_srv_pkt_success_hdr_t ); - dap_stream_ch_chain_net_srv_pkt_success_t *l_success = DAP_NEW_Z_SIZE(dap_stream_ch_chain_net_srv_pkt_success_t, - l_success_size); - if(!l_success) { - log_it(L_ERROR, "Memory allocation error in %s, line %d", __PRETTY_FUNCTION__, __LINE__); - l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_ALLOC_MEMORY_ERROR; - if(a_ch) - dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err)); - if (a_grace->usage->service && a_grace->usage->service->callbacks.response_error) - a_grace->usage->service->callbacks.response_error(a_grace->usage->service, 0, NULL, &l_err, sizeof(l_err)); - - // TODO go to state error - } else { - l_success->hdr.usage_id = a_grace->usage->id; - l_success->hdr.net_id.uint64 = a_grace->usage->net->pub.id.uint64; - l_success->hdr.srv_uid.uint64 = a_grace->usage->service->uid.uint64; - dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_SUCCESS, l_success, l_success_size); - - // Restore last receipt from DB - - // create and fill first receipt - // - // a_grace->usage->receipt = dap_chain_datum_tx_receipt_create( - // a_grace->usage->service->uid, l_price->units_uid, l_price->units, l_price->value_datoshi, NULL, 0); - // if (a_grace->usage->service->callbacks.response_success) - // a_grace->usage->service->callbacks.response_success(a_grace->usage->service, a_grace->usage->id, - // a_grace->usage->client, a_grace->usage->receipt, - // sizeof(dap_chain_datum_tx_receipt_t) + a_grace->usage->receipt->size + a_grace->usage->receipt->exts_size); - // DAP_DELETE(l_success); - } - - - // TODO go to substate normal - - DAP_DELETE(a_grace->request); - DAP_DELETE(a_grace); - DAP_DELETE(l_remain_service); - return false; - -} - static void s_service_state_go_to_grace(dap_chain_net_srv_usage_t *a_usage) { if (!a_usage) @@ -1906,50 +1876,20 @@ static void s_service_state_go_to_normal(dap_chain_net_srv_usage_t *a_usage) a_usage->service_substate = DAP_CHAIN_NET_SRV_USAGE_SERVICE_SUBSTATE_IDLE; dap_stream_ch_t *l_ch = a_usage->client->ch; - dap_chain_net_srv_t *l_srv = dap_chain_net_srv_get( a_usage->service->uid); - dap_stream_ch_chain_net_srv_pkt_error_t l_err; - dap_chain_net_t *l_net = a_usage->net; - dap_chain_datum_tx_t *l_tx = a_usage->tx_cond; - dap_chain_tx_out_cond_t *l_tx_out_cond = dap_chain_datum_tx_out_cond_get(l_tx, DAP_CHAIN_TX_OUT_COND_SUBTYPE_SRV_PAY, NULL ); - - if ( ! l_tx_out_cond ) { // No conditioned output - log_it( L_WARNING, "No conditioned output"); - l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_NO_COND_OUT ; - //TODO go to error state - return false; - } - - // Check cond output if it equesl or not to request - if (!dap_chain_net_srv_uid_compare(l_tx_out_cond->header.srv_uid, a_usage->service->uid)) { - log_it( L_WARNING, "Wrong service uid in request, tx expect to close its output with 0x%016"DAP_UINT64_FORMAT_X, - l_tx_out_cond->header.srv_uid.uint64 ); - l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_WRONG_SRV_UID ; - //TODO go to error state - return false; - } + int l_tx_check_status = s_check_tx_params(a_usage); + if (l_tx_check_status != 0){ + //TODO go to error substate - const char *l_ticker = dap_ledger_tx_get_token_ticker_by_hash(a_usage->net->pub.ledger, &a_usage->tx_cond_hash); - if (!l_ticker) { - char l_hash_str[DAP_CHAIN_HASH_FAST_STR_SIZE] = { '\0' }; - dap_hash_fast_to_str(&a_usage->tx_cond_hash, l_hash_str, sizeof(l_hash_str)); - log_it( L_ERROR, "Token ticker not found for tx cond hash %s", l_hash_str); - l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_TICKER_ERROR; - //TODO go to error state - return false; + return; } - dap_stpcpy(a_usage->token_ticker, l_ticker); - - char *l_order_hash_str = dap_chain_hash_fast_to_str_new(&a_usage->static_order_hash); - log_it(L_MSG, "Using price from order %s.", l_order_hash_str); - DAP_DELETE(l_order_hash_str); int ret; if ((ret = a_usage->service->callbacks.requested(a_usage->service, a_usage->id, a_usage->client, NULL, 0)) != 0) { log_it( L_WARNING, "Request canceled by service callback, return code %d", ret); - l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_SERVICE_REQUEST_INTERNAL_ERROR; + a_usage->last_err_code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_SERVICE_REQUEST_INTERNAL_ERROR; //TODO go to error state - return false; + return; } if (a_usage->service_state == DAP_CHAIN_NET_SRV_USAGE_SERVICE_STATE_IDLE || @@ -1961,15 +1901,6 @@ static void s_service_state_go_to_normal(dap_chain_net_srv_usage_t *a_usage) a_usage->service_state = DAP_CHAIN_NET_SRV_USAGE_SERVICE_STATE_NORMAL; a_usage->service_substate = DAP_CHAIN_NET_SRV_USAGE_SERVICE_SUBSTATE_WAITING_FIRST_RECEIPT_SIGN; - //start timeout timer - a_usage->receipt_timeout_timer_start_callback(a_usage); - } else if (a_usage->service_state == DAP_CHAIN_NET_SRV_USAGE_SERVICE_STATE_REMAIN_LIMITS){ - a_usage->receipt_next = dap_chain_net_srv_issue_receipt(a_usage->service, a_usage->price, NULL, 0); - dap_stream_ch_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_SIGN_REQUEST, - a_usage->receipt, a_usage->receipt->size); - a_usage->service_state = DAP_CHAIN_NET_SRV_USAGE_SERVICE_STATE_NORMAL; - a_usage->service_substate = DAP_CHAIN_NET_SRV_USAGE_SERVICE_SUBSTATE_WAITING_NEXT_RECEIPT_SIGN; - //start timeout timer a_usage->receipt_timeout_timer_start_callback(a_usage); } else { @@ -1982,6 +1913,52 @@ static void s_service_state_go_to_normal(dap_chain_net_srv_usage_t *a_usage) static void s_service_state_go_to_error(dap_chain_net_srv_usage_t *a_usage) { a_usage->is_active = 0; + + dap_stream_ch_t * l_ch = dap_stream_ch_find_by_uuid_unsafe(a_grace->stream_worker, a_grace->ch_uuid); + dap_chain_net_srv_stream_session_t *l_srv_session = l_ch && l_ch->stream && l_ch->stream->session ? + (dap_chain_net_srv_stream_session_t *)l_ch->stream->session->_inheritor : NULL; + + if (!l_srv_session){ + DAP_DEL_Z(a_grace); + return; + } + + if (a_err.code) { + if(l_ch) + dap_stream_ch_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &a_err, sizeof (a_err)); + if (a_grace->usage->service && a_grace->usage->service->callbacks.response_error) + a_grace->usage->service->callbacks.response_error(a_grace->usage->service, 0, NULL, &a_err, sizeof(a_err)); + } + + if (a_grace->usage) { // add client pkey hash to banlist + a_grace->usage->is_active = 0; + if (a_grace->usage->service) { + dap_chain_net_srv_banlist_item_t *l_item = NULL; + pthread_mutex_lock(&a_grace->usage->service->banlist_mutex); + HASH_FIND(hh, a_grace->usage->service->ban_list, &a_grace->usage->client_pkey_hash, sizeof(dap_chain_hash_fast_t), l_item); + if (l_item) + pthread_mutex_unlock(&a_grace->usage->service->banlist_mutex); + else { + l_item = DAP_NEW_Z(dap_chain_net_srv_banlist_item_t); + if (!l_item) { + log_it(L_CRITICAL, "%s", c_error_memory_alloc); + pthread_mutex_unlock(&a_grace->usage->service->banlist_mutex); + DAP_DEL_Z(a_grace->request); + DAP_DEL_Z(a_grace); + return; + } + log_it(L_DEBUG, "Add client to banlist"); + l_item->client_pkey_hash = a_grace->usage->client_pkey_hash; + l_item->ht_mutex = &a_grace->usage->service->banlist_mutex; + l_item->ht_head = &a_grace->usage->service->ban_list; + HASH_ADD(hh, a_grace->usage->service->ban_list, client_pkey_hash, sizeof(dap_chain_hash_fast_t), l_item); + pthread_mutex_unlock(&a_grace->usage->service->banlist_mutex); + dap_timerfd_start(a_grace->usage->service->grace_period * 1000, (dap_timerfd_callback_t)s_unban_client, l_item); + } + } + + } else if (l_srv_session->usage_active) + dap_chain_net_srv_usage_delete(l_srv_session); } @@ -1990,6 +1967,75 @@ static void s_service_substate_go_to_normal(dap_chain_net_srv_usage_t *a_usage) a_usage->service_substate = DAP_CHAIN_NET_SRV_USAGE_SERVICE_SUBSTATE_NORMAL; } + +static int s_check_tx_params(dap_chain_net_srv_usage_t *a_usage) +{ + dap_chain_tx_out_cond_t * l_tx_out_cond = NULL; + dap_chain_net_srv_price_t * l_price = NULL; + dap_chain_datum_tx_t *l_tx = a_usage->tx_cond; + + l_tx_out_cond = dap_chain_datum_tx_out_cond_get(l_tx, DAP_CHAIN_TX_OUT_COND_SUBTYPE_SRV_PAY, NULL ); + + if ( ! l_tx_out_cond ) { // No conditioned output + log_it( L_WARNING, "No conditioned output"); + return DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_NO_COND_OUT ; + } + + // Check cond output if it equesl or not to request + if (!dap_chain_net_srv_uid_compare(l_tx_out_cond->header.srv_uid, a_usage->service->uid)) { + log_it( L_WARNING, "Wrong service uid in request, tx expect to close its output with 0x%016"DAP_UINT64_FORMAT_X, + l_tx_out_cond->header.srv_uid.uint64 ); + return DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_WRONG_SRV_UID ; + } + + const char *l_ticker = dap_ledger_tx_get_token_ticker_by_hash(a_usage->net->pub.ledger, &a_usage->tx_cond_hash); + if (!l_ticker) { + log_it( L_ERROR, "Token ticker not found for tx cond hash %s", dap_chain_hash_fast_to_str_static(&a_usage->tx_cond_hash)); + return DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_TICKER_ERROR; + } + dap_stpcpy(a_usage->token_ticker, l_ticker); + + log_it(L_MSG, "Using price from order %s.", dap_chain_hash_fast_to_str_static(&a_usage->static_order_hash)); + if ((l_price = a_usage->price)){ + if (l_price->net->pub.id.uint64 != a_usage->net->pub.id.uint64){ + log_it( L_WARNING, "Pricelist is not for net %s.", a_usage->net->pub.name); + return DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_NOT_ACCEPT_TOKEN; + } + + if (dap_strcmp(l_price->token, l_ticker) != 0){ + log_it( L_WARNING, "Token ticker in the pricelist and tx do not match"); + return DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_NOT_ACCEPT_TOKEN; + } + + if (l_price->units_uid.enm != l_tx_out_cond->subtype.srv_pay.unit.enm){ + log_it( L_WARNING, "Unit ID in the pricelist and tx do not match"); + return DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_UNIT_TYPE_ERROR; + } + + uint256_t l_unit_price = {}; + if (l_price->units != 0){ + DIV_256(l_price->value_datoshi, GET_256_FROM_64(l_price->units), &l_unit_price); + } else { + log_it( L_WARNING, "Units in pricelist is zero. "); + return DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_RECEIPT_UNITS_ERROR; + } + + if(IS_ZERO_256(l_tx_out_cond->subtype.srv_pay.unit_price_max_datoshi) || + compare256(l_unit_price, l_tx_out_cond->subtype.srv_pay.unit_price_max_datoshi) <= 0){ + } else { + log_it( L_WARNING, "Unit price in pricelist is greater than max allowable."); + return DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_RECEIPT_UNITS_ERROR; + } + } else { + log_it( L_WARNING, "Request can't be processed because no acceptable price in pricelist for token %s in network %s", + l_ticker, a_usage->net->pub.name ); + return DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_PRICE_ERROR; + } + + return 0; +} + + static void s_service_substate_pay_service(dap_chain_net_srv_usage_t *a_usage) { dap_stream_ch_chain_net_srv_pkt_error_t l_err; @@ -2000,53 +2046,22 @@ static void s_service_substate_pay_service(dap_chain_net_srv_usage_t *a_usage) } break; - case DAP_CHAIN_NET_SRV_USAGE_SERVICE_STATE_REMAIN_LIMITS:{ + case DAP_CHAIN_NET_SRV_USAGE_SERVICE_STATE_NORMAL:{ - int l_ret = s_pay_cervice(a_usage, a_usage->receipt_next); - switch (l_ret){ - case PAY_SERVICE_STATUS_SUCCESS: - // Store last receipt if any problems with transactions - dap_global_db_set(SRV_RECEIPTS_GDB_GROUP, dap_chain_hash_fast_to_str_static(&a_usage->client_pkey_hash), a_usage->receipt_next, , false, NULL, NULL); - break; - case PAY_SERVICE_STATUS_TX_CANT_FIND: - // TX not found in ledger and we not in grace, start grace - log_it(L_ERROR, "Can't find tx cond. Start grace!"); - if (a_usage->service_substate == DAP_CHAIN_NET_SRV_USAGE_SERVICE_SUBSTATE_WAITING_FIRST_RECEIPT_SIGN){ - // TODO go to error - } else { - // TODO go to prev tx waiting substate - s_service_substate_go_to_waiting_prev_tx(a_usage); - } - break; - case PAY_SERVICE_STATUS_NOT_ENOUGH: - log_it(L_ERROR, "Tx cond have not enough funds"); - s_service_substate_go_to_waiting_new_tx(l_usage); - break; - case PAY_SERVICE_STATUS_MEMALLOC_ERROR: - log_it(L_CRITICAL, "%s", c_error_memory_alloc); - l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_ALLOC_MEMORY_ERROR; - dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err)); - if (l_usage->service->callbacks.response_error) - l_usage->service->callbacks.response_error(l_usage->service,l_usage->id, l_usage->client,&l_err,sizeof (l_err)); - return true; - case PAY_SERVICE_STATUS_TX_ERROR: - default: { - log_it(L_ERROR, "Can't create input tx cond transaction!"); - memset(&l_usage->tx_cond_hash, 0, sizeof(l_usage->tx_cond_hash)); - DAP_DEL_Z(l_usage->receipt_next); - l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_CREATION_ERROR; - dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err)); - if (l_usage->service->callbacks.response_error) - l_usage->service->callbacks.response_error(l_usage->service,l_usage->id, l_usage->client,&l_err,sizeof (l_err)); - // TODO go error substate - return true; + dap_chain_datum_tx_receipt_t *l_receipt = NULL; + + if (a_usage->service_substate == DAP_CHAIN_NET_SRV_USAGE_SERVICE_SUBSTATE_WAITING_FIRST_RECEIPT_SIGN || + a_usage->service_state == DAP_CHAIN_NET_SRV_USAGE_SERVICE_SUBSTATE_WAITING_NEW_TX_IN_LEDGER || + a_usage->service_state == DAP_CHAIN_NET_SRV_USAGE_SERVICE_SUBSTATE_WAITING_NEW_TX_FROM_CLIENT){ + + int ret = s_check_tx_params(a_usage); + if (ret != 0){ + a_usage->last_err_code = ret; + // TODO go to error substate + return; } } - } break; - case DAP_CHAIN_NET_SRV_USAGE_SERVICE_STATE_NORMAL:{ - - dap_chain_datum_tx_receipt_t *l_receipt = NULL; if (a_usage->service_substate = DAP_CHAIN_NET_SRV_USAGE_SERVICE_SUBSTATE_WAITING_FIRST_RECEIPT_SIGN) l_receipt = a_usage->receipt; else @@ -2073,7 +2088,8 @@ static void s_service_substate_pay_service(dap_chain_net_srv_usage_t *a_usage) break; case PAY_SERVICE_STATUS_NOT_ENOUGH: log_it(L_ERROR, "Tx cond have not enough funds"); - if (a_usage->service_substate != DAP_CHAIN_NET_SRV_USAGE_SERVICE_SUBSTATE_WAITING_NEW_TX_IN_LEDGER){ + if (a_usage->service_substate != DAP_CHAIN_NET_SRV_USAGE_SERVICE_SUBSTATE_WAITING_NEW_TX_IN_LEDGER && + a_usage->service_substate != DAP_CHAIN_NET_SRV_USAGE_SERVICE_SUBSTATE_WAITING_NEW_TX_FROM_CLIENT){ s_service_substate_go_to_waiting_new_tx(a_usage); return; }else { @@ -2146,7 +2162,7 @@ static void s_service_substate_go_to_waiting_new_tx(dap_chain_net_srv_usage_t *a if (!l_grace) { log_it(L_CRITICAL, "%s", c_error_memory_alloc); //TODO go to error substate - return true; + return; } // Parse the request l_grace->ch_uuid = a_usage->client->ch->uuid; @@ -2158,7 +2174,7 @@ static void s_service_substate_go_to_waiting_new_tx(dap_chain_net_srv_usage_t *a if (!l_item) { log_it(L_CRITICAL, "%s", c_error_memory_alloc); //TODO go to error substate - return false; + return; } l_item->grace = l_grace; l_item->tx_cond_hash = a_usage->tx_cond_hash; @@ -2180,3 +2196,26 @@ static void s_service_substate_go_to_waiting_new_tx(dap_chain_net_srv_usage_t *a if (a_usage->service->callbacks.response_error) a_usage->service->callbacks.response_error(a_usage->service,a_usage->id, a_usage->client, &l_err,sizeof (l_err)); } + +static void s_service_substate_go_to_error(dap_chain_net_srv_usage_t *a_usage) +{ + if (!a_usage){ + //TODO go to error state + return; + } + + switch (a_usage->service_state){ + case DAP_CHAIN_NET_SRV_USAGE_SERVICE_STATE_GRACE:{ + // TODO go to error STATE + } break; + case DAP_CHAIN_NET_SRV_USAGE_SERVICE_STATE_NORMAL:{ + // wait until end of service in this substate + a_usage->service_substate=DAP_CHAIN_NET_SRV_USAGE_SERVICE_SUBSTATE_ERROR; + dap_stream_ch_pkt_write_unsafe(a_usage->client->ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &a_usage->last_err_code, sizeof (a_usage->last_err_code)); + if (a_usage->service->callbacks.response_error) + a_usage->service->callbacks.response_error(a_usage->service,a_usage->id, a_usage->client, &a_usage->last_err_code,sizeof (a_usage->last_err_code)); + } break; + default: + + } +} \ No newline at end of file diff --git a/modules/net/srv/include/dap_chain_net_srv_stream_session.h b/modules/net/srv/include/dap_chain_net_srv_stream_session.h index 3b2766c83cf45d63d23c937a19d06bc93dabcf8d..6ca422e15eb273044ee894179e0a0f38ab0c728b 100644 --- a/modules/net/srv/include/dap_chain_net_srv_stream_session.h +++ b/modules/net/srv/include/dap_chain_net_srv_stream_session.h @@ -41,12 +41,10 @@ typedef struct dap_chain_net_srv_price dap_chain_net_srv_price_t; typedef enum dap_chain_net_srv_usage_service_state{ DAP_CHAIN_NET_SRV_USAGE_SERVICE_STATE_IDLE = 0, - DAP_CHAIN_NET_SRV_USAGE_SERVICE_STATE_REMAIN_LIMITS, DAP_CHAIN_NET_SRV_USAGE_SERVICE_STATE_GRACE, DAP_CHAIN_NET_SRV_USAGE_SERVICE_STATE_NORMAL, DAP_CHAIN_NET_SRV_USAGE_SERVICE_STATE_ERROR, DAP_CHAIN_NET_SRV_USAGE_SERVICE_STATE_FREE, - } dap_chain_net_srv_usage_service_state_t; typedef enum dap_chain_net_srv_usage_service_substate{ @@ -84,9 +82,11 @@ typedef struct dap_chain_net_srv_usage{ char token_ticker[DAP_CHAIN_TICKER_SIZE_MAX]; bool is_active; + bool is_limits_changed; dap_chain_net_srv_usage_service_state_t service_state; dap_chain_net_srv_usage_service_substate_t service_substate; + uint32_t last_err_code; } dap_chain_net_srv_usage_t; typedef struct dap_net_stats{ diff --git a/modules/service/vpn/dap_chain_net_srv_vpn.c b/modules/service/vpn/dap_chain_net_srv_vpn.c index 126b1aaef8bf95f33f2f901073a3ca832f67d1ba..4b25bcbd14ddbf847412c9d480e6c26ff0d002b9 100644 --- a/modules/service/vpn/dap_chain_net_srv_vpn.c +++ b/modules/service/vpn/dap_chain_net_srv_vpn.c @@ -1364,10 +1364,10 @@ static void s_update_limits(dap_stream_ch_t * a_ch , bool l_issue_new_receipt = false; // Check if there are time limits - if (a_usage->is_free || (!a_usage->receipt && !a_usage->is_grace) || !a_usage->is_active) + if (!a_usage->service_state == DAP_CHAIN_NET_SRV_USAGE_SERVICE_STATE_FREE || !a_usage->is_active) return; - if (a_usage->is_grace && !a_usage->receipt){ + if (a_usage->service_state == DAP_CHAIN_NET_SRV_USAGE_SERVICE_STATE_GRACE){ a_srv_session->limits_bytes -= (intmax_t) a_bytes; a_srv_session->limits_ts -= time(NULL) - a_srv_session->last_update_ts; a_srv_session->last_update_ts = time(NULL); @@ -1897,7 +1897,7 @@ static bool s_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) dap_stream_ch_set_ready_to_read_unsafe(a_ch,false); return false; } - if ( (!l_usage->is_free) && (! l_usage->receipt && !l_usage->is_grace) ){ + if ( (!l_usage->service_state != DAP_CHAIN_NET_SRV_USAGE_SERVICE_STATE_FREE) && (! l_usage->receipt && !!l_usage->service_state != DAP_CHAIN_NET_SRV_USAGE_SERVICE_STATE_GRACE) ){ log_it(L_WARNING, "No active receipt, switching off"); l_usage->is_active = 0; if (l_usage->client)