diff --git a/dap-sdk b/dap-sdk index 08250a34c8c40afb400b5e734dc7097e38a234d3..34ea33894637309e6853b4f69c463523d82651b2 160000 --- a/dap-sdk +++ b/dap-sdk @@ -1 +1 @@ -Subproject commit 08250a34c8c40afb400b5e734dc7097e38a234d3 +Subproject commit 34ea33894637309e6853b4f69c463523d82651b2 diff --git a/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.c b/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.c index 746588d65b96a1726c39a8f700d3e95383de5805..98781deb70854364a6ac369feeec92fb2d29909b 100644 --- a/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.c +++ b/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.c @@ -141,7 +141,7 @@ static bool s_grace_period_control(dap_chain_net_srv_grace_t *a_grace) dap_chain_net_srv_usage_t *l_usage = NULL; dap_stream_ch_t *l_ch = dap_stream_ch_find_by_uuid_unsafe(a_grace->stream_worker, a_grace->ch_uuid); - if (l_ch== NULL ) + if (!l_ch) goto free_exit; dap_chain_net_srv_stream_session_t *l_srv_session = l_ch && l_ch->stream && l_ch->stream->session ? @@ -209,7 +209,7 @@ static bool s_grace_period_control(dap_chain_net_srv_grace_t *a_grace) } if (!a_grace->usage) { l_usage = dap_chain_net_srv_usage_add(l_srv_session, l_net, l_srv); - if ( !l_usage ){ // Usage can't add + if ( !l_usage ){ // Usage can't add log_it( L_WARNING, "Can't add usage"); l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_CANT_ADD_USAGE; goto free_exit; @@ -272,10 +272,20 @@ static bool s_grace_period_control(dap_chain_net_srv_grace_t *a_grace) memcpy(l_price, l_srv->pricelist, sizeof(*l_price)); l_price->value_datoshi = uint256_0; } - l_usage->price = l_price; - l_usage->receipt = dap_chain_net_srv_issue_receipt(l_usage->service, l_usage->price, NULL, 0); - dap_stream_ch_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_SIGN_REQUEST, + l_usage->price = l_price; + if (l_usage->receipt_next){ + DAP_DEL_Z(l_usage->receipt_next); + l_usage->receipt_next = dap_chain_net_srv_issue_receipt(l_usage->service, l_usage->price, NULL, 0); + }else{ + dap_chain_net_srv_price_t l_b_price = *l_usage->price; + if (l_grace_start || a_grace->usage){ + l_b_price.units *= 2; + MULT_256_256(l_b_price.value_datoshi, GET_256_FROM_64((uint64_t)2), &l_b_price.value_datoshi); + } + l_usage->receipt = dap_chain_net_srv_issue_receipt(l_usage->service, &l_b_price, NULL, 0); + dap_stream_ch_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_SIGN_REQUEST, l_usage->receipt, l_usage->receipt->size); + } }else{ l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_PRICE_NOT_FOUND ; goto free_exit; @@ -304,6 +314,7 @@ static bool s_grace_period_control(dap_chain_net_srv_grace_t *a_grace) } else { DAP_DELETE(a_grace->request); DAP_DELETE(a_grace); + l_usage->is_grace = false; return false; } free_exit: @@ -328,7 +339,7 @@ free_exit: l_item->ht_head = &l_srv->ban_list; HASH_ADD(hh, l_srv->ban_list, client_pkey_hash, sizeof(dap_chain_hash_fast_t), l_item); pthread_mutex_unlock(&l_srv->banlist_mutex); - dap_timerfd_start(l_srv->grace_period * 10000, (dap_timerfd_callback_t)s_unban_client, l_item); + dap_timerfd_start(l_srv->grace_period * 1000, (dap_timerfd_callback_t)s_unban_client, l_item); } } } @@ -393,8 +404,16 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch , void* a_arg) /* No need for bare copying, resend it back modified */ if (l_request->data_size_recv) { l_request->data_size = l_request->data_size_recv; + if (!l_request->data_size_send){ + l_request = (pkt_test_t*)DAP_DUP_SIZE(l_request, sizeof(pkt_test_t)); + l_request = DAP_REALLOC(l_request, sizeof(pkt_test_t) + l_request->data_size); + } + randombytes(l_request->data, l_request->data_size); - dap_hash_fast(l_request->data, l_request->data_size, &l_request->data_hash); + dap_hash_fast_t l_data_hash; + dap_hash_fast(l_request->data, l_request->data_size, &l_data_hash); + l_request->data_hash = l_data_hash; + } l_request->err_code = 0; @@ -404,9 +423,13 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch , void* a_arg) dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_CHECK_RESPONSE, l_request, l_request->data_size + sizeof(pkt_test_t)); + if(l_request != (pkt_test_t*)l_ch_pkt->data){ + DAP_DELETE(l_request); + } + } break; /* DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_CHECK_REQUEST */ - case DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_REQUEST: { + case DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_REQUEST: { //Service request if (l_ch_pkt->hdr.data_size < sizeof(dap_stream_ch_chain_net_srv_pkt_request_hdr_t) ){ log_it( L_WARNING, "Wrong request size, less than minimum"); break; @@ -422,7 +445,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch , void* a_arg) s_grace_period_control(l_grace); } break; /* DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_REQUEST */ - case DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_SIGN_RESPONSE: { + case DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_SIGN_RESPONSE: { // Check receipt sign and make tx if success if (l_ch_pkt->hdr.data_size < sizeof(dap_chain_receipt_info_t)) { log_it(L_ERROR, "Wrong sign response size, %u when expected at least %zu with smth", l_ch_pkt->hdr.data_size, sizeof(dap_chain_receipt_info_t)); @@ -430,27 +453,21 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch , void* a_arg) } dap_chain_datum_tx_receipt_t * l_receipt = (dap_chain_datum_tx_receipt_t *) l_ch_pkt->data; size_t l_receipt_size = l_ch_pkt->hdr.data_size; - dap_chain_net_srv_usage_t * l_usage= NULL, *l_tmp= NULL; + dap_chain_net_srv_usage_t * l_usage = l_srv_session->usage_active; bool l_is_found = false; - pthread_mutex_lock(& l_srv_session->parent->mutex ); // TODO rework it with packet usage_id - HASH_ITER(hh, l_srv_session->usages, l_usage, l_tmp){ - if ( l_usage->receipt_next ){ // If we have receipt next - if ( memcmp(&l_usage->receipt_next->receipt_info, &l_receipt->receipt_info,sizeof (l_receipt->receipt_info) )==0 ){ - l_is_found = true; - break; - } - }else if (l_usage->receipt ){ // If we sign first receipt - if ( memcmp(&l_usage->receipt->receipt_info, &l_receipt->receipt_info,sizeof (l_receipt->receipt_info) )==0 ){ - l_is_found = true; - break; - } + if ( l_usage->receipt_next ){ // If we have receipt next + if ( memcmp(&l_usage->receipt_next->receipt_info, &l_receipt->receipt_info,sizeof (l_receipt->receipt_info) )==0 ){ + l_is_found = true; + } + }else if (l_usage->receipt ){ // If we sign first receipt + if ( memcmp(&l_usage->receipt->receipt_info, &l_receipt->receipt_info,sizeof (l_receipt->receipt_info) )==0 ){ + l_is_found = true; } } - pthread_mutex_unlock(& l_srv_session->parent->mutex ); if ( !l_is_found || ! l_usage ){ log_it(L_WARNING, "Can't find receipt in usages thats equal to response receipt"); - l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_RECEIPT_CANT_FIND ; + l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_RECEIPT_CANT_FIND; dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); if (l_usage && l_usage->service && l_usage->service->callbacks.response_error) l_usage->service->callbacks.response_error(l_usage->service,l_usage->id, l_usage->client,&l_err,sizeof (l_err) ); @@ -481,6 +498,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch , void* a_arg) l_usage->service->callbacks.response_error( l_usage->service, l_usage->id, l_usage->client,&l_err,sizeof (l_err) ); break; } + } // get a second signature - from the client (first sign in server, second sign in client) dap_sign_t * l_receipt_sign = dap_chain_datum_tx_receipt_sign_get( l_receipt, l_receipt_size, 1); @@ -493,21 +511,20 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch , void* a_arg) break; } // Check receipt signature pkey hash - dap_sign_get_pkey_hash(l_receipt_sign, &l_usage->client_pkey_hash); dap_chain_net_srv_banlist_item_t *l_item = NULL; dap_chain_net_srv_t * l_srv = dap_chain_net_srv_get(l_receipt->receipt_info.srv_uid); + dap_sign_get_pkey_hash(l_receipt_sign, &l_usage->client_pkey_hash); if (l_usage->is_grace) { pthread_mutex_lock(&l_srv->banlist_mutex); HASH_FIND(hh, l_srv->ban_list, &l_usage->client_pkey_hash, sizeof(dap_chain_hash_fast_t), l_item); pthread_mutex_unlock(&l_srv->banlist_mutex); if (l_item) { // client banned - // Update actual receipt - l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_RECEIPT_BANNED_PKEY_HASH ; - dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof(l_err)); - if (l_usage->service->callbacks.response_error) - l_usage->service->callbacks.response_error(l_usage->service,l_usage->id, l_usage->client, &l_err, sizeof(l_err)); - break; - } + l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_RECEIPT_BANNED_PKEY_HASH ; + dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof(l_err)); + if (l_usage->service->callbacks.response_error) + l_usage->service->callbacks.response_error(l_usage->service,l_usage->id, l_usage->client, &l_err, sizeof(l_err)); + break; + } } else { if (memcmp(l_usage->client_pkey_hash.raw, l_tx_out_cond->subtype.srv_pay.pkey_hash.raw, sizeof(l_usage->client_pkey_hash)) != 0) { l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_RECEIPT_WRONG_PKEY_HASH ; @@ -540,17 +557,69 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch , void* a_arg) size_t l_success_size; if (!l_usage->is_grace) { // Form input transaction + char *l_hash_str = dap_hash_fast_to_str_new(&l_usage->tx_cond_hash); + log_it(L_NOTICE, "Trying create input tx cond from tx %s with active receipt", l_hash_str); + DAP_DEL_Z(l_hash_str); dap_chain_addr_t *l_wallet_addr = dap_chain_wallet_get_addr(l_usage->price->wallet, l_usage->net->pub.id); + int ret_status = 0; char *l_tx_in_hash_str = dap_chain_mempool_tx_create_cond_input(l_usage->net, &l_usage->tx_cond_hash, l_wallet_addr, dap_chain_wallet_get_key(l_usage->price->wallet, 0), - l_receipt, "hex"); - if (l_tx_in_hash_str) { + l_receipt, "hex", &ret_status); + if (!ret_status) { dap_chain_hash_fast_from_str(l_tx_in_hash_str, &l_usage->tx_cond_hash); log_it(L_NOTICE, "Formed tx %s for input with active receipt", l_tx_in_hash_str); DAP_DELETE(l_tx_in_hash_str); - }else - log_it(L_ERROR, "Can't create input tx cond transaction!"); - l_success_size = sizeof(dap_stream_ch_chain_net_srv_pkt_success_hdr_t) + sizeof(dap_chain_hash_fast_t); + }else{ + // TODO add ret status handling/ if tx not found start grace again + dap_chain_net_srv_grace_t *l_grace = NULL; + switch(ret_status){ + case DAP_CHAIN_MEMPOOl_RET_STATUS_CANT_FIND_FINAL_TX_HASH: + // TX not found in ledger and we not in grace, start grace + log_it(L_ERROR, "Can't find tx cond. Start grace!"); + l_grace = DAP_NEW_Z(dap_chain_net_srv_grace_t); + // Parse the request +// l_grace->request = DAP_NEW_Z_SIZE(dap_stream_ch_chain_net_srv_pkt_request_t, sizeof(dap_stream_ch_chain_net_srv_pkt_request_t)); +// l_grace->request->hdr.net_id = l_usage->net->pub.id; +// memcpy(l_grace->request->hdr.token, l_usage->token_ticker, strlen(l_usage->token_ticker)); +// l_grace->request->hdr.srv_uid = l_usage->service->uid; +// l_grace->request->hdr.tx_cond = l_usage->tx_cond_hash; +// l_ch_chain_net_srv->srv_uid.uint64 = l_grace->request->hdr.srv_uid.uint64; +// l_grace->request_size = l_ch_pkt->hdr.data_size; +// l_grace->ch_uuid = a_ch->uuid; +// l_grace->stream_worker = a_ch->stream_worker; +// s_grace_period_control(l_grace); + break; + case DAP_CHAIN_MEMPOOl_RET_STATUS_NOT_ENOUGH: + // TODO send new tx cond request + memset(&l_usage->tx_cond_hash, 0, sizeof(l_usage->tx_cond_hash)); + DAP_DEL_Z(l_usage->receipt_next); + log_it(L_ERROR, "Tx cond have not enough funds"); + l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_NOT_ENOUGH; + dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err)); +// if (l_usage->service->callbacks.response_error) +// l_usage->service->callbacks.response_error(l_usage->service,l_usage->id, l_usage->client,&l_err,sizeof (l_err)); + break; + case DAP_CHAIN_MEMPOOL_RET_STATUS_BAD_ARGUMENTS: + case DAP_CHAIN_MEMPOOl_RET_STATUS_WRONG_ADDR: + case DAP_CHAIN_MEMPOOl_RET_STATUS_NOT_NATIVE_TOKEN: + case DAP_CHAIN_MEMPOOl_RET_STATUS_NO_COND_OUT: + case DAP_CHAIN_MEMPOOl_RET_STATUS_CANT_ADD_TX_OUT: + case DAP_CHAIN_MEMPOOl_RET_STATUS_CANT_ADD_SIGN: + default: + log_it(L_ERROR, "Can't create input tx cond transaction!"); + memset(&l_usage->tx_cond_hash, 0, sizeof(l_usage->tx_cond_hash)); + DAP_DEL_Z(l_usage->receipt_next); + l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_SERVICE_NOT_FOUND; + dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err)); + if (l_usage->service->callbacks.response_error) + l_usage->service->callbacks.response_error(l_usage->service,l_usage->id, l_usage->client,&l_err,sizeof (l_err)); + break; + } + + + break; + } + l_success_size = sizeof(dap_stream_ch_chain_net_srv_pkt_success_hdr_t) + DAP_CHAIN_HASH_FAST_STR_SIZE;//sizeof(dap_chain_hash_fast_t); } else { l_success_size = sizeof(dap_stream_ch_chain_net_srv_pkt_success_hdr_t); } @@ -562,15 +631,19 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch , void* a_arg) l_success->hdr.net_id.uint64 = l_usage->net->pub.id.uint64; l_success->hdr.srv_uid.uint64 = l_usage->service->uid.uint64; - if (l_usage->is_grace) - log_it(L_NOTICE, "Receipt is OK, but transaction can't be found. Start the grace period for %d seconds", + if (l_usage->is_grace){ + char *l_hash_str = dap_hash_fast_to_str_new(&l_usage->tx_cond_hash); + log_it(L_NOTICE, "Receipt is OK, but transaction %s can't be found. Start the grace period for %d seconds", l_hash_str, l_srv->grace_period); - else { - memcpy(l_success->custom_data, &l_usage->tx_cond_hash, sizeof(dap_chain_hash_fast_t)); + DAP_DEL_Z(l_hash_str); + }else { + char *l_hash_str = dap_hash_fast_to_str_new(&l_usage->tx_cond_hash); + memcpy(l_success->custom_data, l_hash_str, DAP_CHAIN_HASH_FAST_STR_SIZE); + DAP_DEL_Z(l_hash_str); log_it(L_NOTICE, "Receipt with remote client sign is acceptible for. Now start the service's usage"); } - dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_SUCCESS , + dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_SUCCESS, l_success, l_success_size); if ( l_is_first_sign && l_usage->service->callbacks.response_success){ @@ -597,7 +670,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch , void* a_arg) pkt_t * l_pkt =(pkt_t *) l_ch_pkt->data; size_t l_pkt_size = l_ch_pkt->hdr.data_size - sizeof(pkt_t); dap_chain_net_srv_t * l_srv = dap_chain_net_srv_get( l_pkt->hdr.srv_uid); - dap_chain_net_srv_usage_t * l_usage = dap_chain_net_srv_usage_find_unsafe( l_srv_session, l_pkt->hdr.usage_id ); + dap_chain_net_srv_usage_t * l_usage = l_srv_session->usage_active;//dap_chain_net_srv_usage_find_unsafe( l_srv_session, l_pkt->hdr.usage_id ); // If service not found if ( l_srv == NULL){ l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_SERVICE_NOT_FOUND ; @@ -630,7 +703,23 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch , void* a_arg) sizeof ( dap_stream_ch_chain_net_srv_pkt_error_t) ); } } break; - + case DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_NEW_TX_COND_RESPONSE:{ + dap_chain_net_srv_usage_t * l_usage = NULL; + l_usage = l_srv_session->usage_active; + dap_stream_ch_chain_net_srv_pkt_request_t* l_responce = (dap_stream_ch_chain_net_srv_pkt_request_t*)l_ch_pkt->data; + l_usage->tx_cond_hash = l_responce->hdr.tx_cond; + char *l_tx_in_hash_str = dap_chain_hash_fast_to_str_new(&l_usage->tx_cond_hash); + log_it(L_NOTICE, "Received new tx cond %s", l_tx_in_hash_str); + DAP_DELETE(l_tx_in_hash_str); + size_t l_success_size = sizeof (dap_stream_ch_chain_net_srv_pkt_success_hdr_t ); + dap_stream_ch_chain_net_srv_pkt_success_t *l_success = DAP_NEW_Z_SIZE(dap_stream_ch_chain_net_srv_pkt_success_t, + l_success_size); + l_success->hdr.usage_id = l_usage->id; + l_success->hdr.net_id.uint64 = l_usage->net->pub.id.uint64; + l_success->hdr.srv_uid.uint64 = l_usage->service->uid.uint64; + dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_SUCCESS, l_success, l_success_size); + DAP_DELETE(l_success); + }break; default: log_it( L_WARNING, "Unknown packet type 0x%02X", l_ch_pkt->hdr.type); } if(l_ch_chain_net_srv->notify_callback) diff --git a/modules/mempool/dap_chain_mempool.c b/modules/mempool/dap_chain_mempool.c index ba72fe5a03c508d4defb77a2251f85b84ba621fa..38f7c4021d17321fae1a61050f324de733ee4893 100644 --- a/modules/mempool/dap_chain_mempool.c +++ b/modules/mempool/dap_chain_mempool.c @@ -597,24 +597,34 @@ static void s_tx_create_massive_gdb_save_callback(dap_global_db_context_t *a_glo * * return 0 Ok, -2 not enough funds to transfer, -1 other Error */ -char *dap_chain_mempool_tx_create_cond_input(dap_chain_net_t *a_net, dap_chain_hash_fast_t *a_tx_prev_hash, +char* dap_chain_mempool_tx_create_cond_input(dap_chain_net_t *a_net, dap_chain_hash_fast_t *a_tx_prev_hash, const dap_chain_addr_t *a_addr_to, dap_enc_key_t *a_key_tx_sign, - dap_chain_datum_tx_receipt_t *a_receipt, const char *a_hash_out_type) + dap_chain_datum_tx_receipt_t *a_receipt, const char *a_hash_out_type, int *a_ret_status) { dap_ledger_t * l_ledger = a_net ? dap_chain_ledger_by_net_name( a_net->pub.name ) : NULL; - if ( ! a_net || ! l_ledger || ! a_addr_to ) + if ( ! a_net || ! l_ledger || ! a_addr_to ){ + if (a_ret_status) + *a_ret_status = DAP_CHAIN_MEMPOOL_RET_STATUS_BAD_ARGUMENTS; return NULL; + } + if ( ! dap_chain_addr_check_sum (a_addr_to) ){ log_it(L_ERROR, "Wrong address_to checksum"); + if (a_ret_status) + *a_ret_status = DAP_CHAIN_MEMPOOl_RET_STATUS_WRONG_ADDR; return NULL; } dap_chain_hash_fast_t *l_tx_final_hash = dap_chain_ledger_get_final_chain_tx_hash(l_ledger, DAP_CHAIN_TX_OUT_COND_SUBTYPE_SRV_PAY, a_tx_prev_hash); if (!l_tx_final_hash) { log_it(L_WARNING, "Requested conditional transaction is already used out"); + if (a_ret_status) + *a_ret_status = DAP_CHAIN_MEMPOOl_RET_STATUS_CANT_FIND_FINAL_TX_HASH; return NULL; } if (dap_strcmp(a_net->pub.native_ticker, dap_chain_ledger_tx_get_token_ticker_by_hash(l_ledger, l_tx_final_hash))) { log_it(L_WARNING, "Pay for service should be only in native token ticker"); + if (a_ret_status) + *a_ret_status = DAP_CHAIN_MEMPOOl_RET_STATUS_NOT_NATIVE_TOKEN; return NULL; } dap_chain_datum_tx_t *l_tx_cond = dap_chain_ledger_tx_find_by_hash(l_ledger, l_tx_final_hash); @@ -622,6 +632,8 @@ char *dap_chain_mempool_tx_create_cond_input(dap_chain_net_t *a_net, dap_chain_h dap_chain_tx_out_cond_t *l_out_cond = dap_chain_datum_tx_out_cond_get(l_tx_cond, DAP_CHAIN_TX_OUT_COND_SUBTYPE_SRV_PAY, &l_out_cond_idx); if (!l_out_cond) { log_it(L_WARNING, "Requested conditioned transaction have no conditioned output"); + if (a_ret_status) + *a_ret_status = DAP_CHAIN_MEMPOOl_RET_STATUS_NO_COND_OUT; return NULL; } @@ -635,6 +647,8 @@ char *dap_chain_mempool_tx_create_cond_input(dap_chain_net_t *a_net, dap_chain_h SUM_256_256(l_value_send, l_fee, &l_value_send); if (compare256(l_out_cond->header.value, l_value_send) < 0) { log_it(L_WARNING, "Requested conditioned transaction have no enough funds"); + if (a_ret_status) + *a_ret_status = DAP_CHAIN_MEMPOOl_RET_STATUS_NOT_ENOUGH; return NULL; } @@ -644,20 +658,26 @@ char *dap_chain_mempool_tx_create_cond_input(dap_chain_net_t *a_net, dap_chain_h // add 'in_cond' items if (dap_chain_datum_tx_add_in_cond_item(&l_tx, l_tx_final_hash, l_out_cond_idx, 0)) { dap_chain_datum_tx_delete(l_tx); - log_it( L_ERROR, "Cant add tx cond input"); + log_it( L_ERROR, "Can`t add tx cond input"); + if (a_ret_status) + *a_ret_status = DAP_CHAIN_MEMPOOl_RET_STATUS_CANT_ADD_TX_OUT; return NULL; } // add 'out' item if (dap_chain_datum_tx_add_out_item(&l_tx, a_addr_to, a_receipt->receipt_info.value_datoshi) != 1) { dap_chain_datum_tx_delete(l_tx); - log_it( L_ERROR, "Cant add tx output"); + log_it( L_ERROR, "Can`t add tx output"); + if (a_ret_status) + *a_ret_status = DAP_CHAIN_MEMPOOl_RET_STATUS_CANT_ADD_TX_OUT; return NULL; } // add network fee if (l_net_fee_used) { if (dap_chain_datum_tx_add_out_item(&l_tx, &l_addr_fee, l_net_fee) != 1) { dap_chain_datum_tx_delete(l_tx); - log_it( L_ERROR, "Cant add tx output"); + log_it( L_ERROR, "Can`t add tx output"); + if (a_ret_status) + *a_ret_status = DAP_CHAIN_MEMPOOl_RET_STATUS_CANT_ADD_TX_OUT; return NULL; } } @@ -665,7 +685,9 @@ char *dap_chain_mempool_tx_create_cond_input(dap_chain_net_t *a_net, dap_chain_h if (!IS_ZERO_256(l_fee)) { if (dap_chain_datum_tx_add_fee_item(&l_tx, l_fee) != 1) { dap_chain_datum_tx_delete(l_tx); - log_it( L_ERROR, "Cant add tx output"); + log_it( L_ERROR, "Can`t add tx output"); + if (a_ret_status) + *a_ret_status = DAP_CHAIN_MEMPOOl_RET_STATUS_CANT_ADD_TX_OUT; return NULL; } } @@ -680,6 +702,8 @@ char *dap_chain_mempool_tx_create_cond_input(dap_chain_net_t *a_net, dap_chain_h if(dap_chain_datum_tx_add_sign_item(&l_tx, a_key_tx_sign) != 1) { dap_chain_datum_tx_delete(l_tx); log_it( L_ERROR, "Can't add sign output"); + if (a_ret_status) + *a_ret_status = DAP_CHAIN_MEMPOOl_RET_STATUS_CANT_ADD_SIGN; return NULL; } size_t l_tx_size = dap_chain_datum_tx_get_size( l_tx ); @@ -688,6 +712,8 @@ char *dap_chain_mempool_tx_create_cond_input(dap_chain_net_t *a_net, dap_chain_h dap_chain_t *l_chain = dap_chain_net_get_default_chain_by_chain_type(a_net, CHAIN_TYPE_TX); char *l_ret = dap_chain_mempool_datum_add(l_datum, l_chain, a_hash_out_type); DAP_DELETE(l_datum); + if (a_ret_status) + *a_ret_status = DAP_CHAIN_MEMPOOl_RET_STATUS_SUCCESS; return l_ret; } diff --git a/modules/mempool/include/dap_chain_mempool.h b/modules/mempool/include/dap_chain_mempool.h index 3cf9bc10c56bc92809a4c65a8bd37f8a75877a99..f14bdce4ebae2ee3249b48147e618ab0b787a0e9 100644 --- a/modules/mempool/include/dap_chain_mempool.h +++ b/modules/mempool/include/dap_chain_mempool.h @@ -21,6 +21,19 @@ #define DAP_DATUM_MEMPOOL_VERSION "01" +// dap_chain_mempool_tx_create_cond_input ret status +#define DAP_CHAIN_MEMPOOl_RET_STATUS_SUCCESS 0 +#define DAP_CHAIN_MEMPOOL_RET_STATUS_BAD_ARGUMENTS -100 +#define DAP_CHAIN_MEMPOOl_RET_STATUS_WRONG_ADDR -101 +#define DAP_CHAIN_MEMPOOl_RET_STATUS_CANT_FIND_FINAL_TX_HASH -102 +#define DAP_CHAIN_MEMPOOl_RET_STATUS_NOT_NATIVE_TOKEN -103 +#define DAP_CHAIN_MEMPOOl_RET_STATUS_NO_COND_OUT -104 +#define DAP_CHAIN_MEMPOOl_RET_STATUS_NOT_ENOUGH -105 +#define DAP_CHAIN_MEMPOOl_RET_STATUS_CANT_ADD_TX_OUT -106 +#define DAP_CHAIN_MEMPOOl_RET_STATUS_CANT_ADD_SIGN -107 + + + // action enum { DAP_DATUM_MEMPOOL_NONE = 0, DAP_DATUM_MEMPOOL_ADD, DAP_DATUM_MEMPOOL_CHECK, DAP_DATUM_MEMPOOL_DEL @@ -55,7 +68,7 @@ char *dap_chain_mempool_tx_create(dap_chain_t *a_chain, dap_enc_key_t *a_key_fro uint256_t a_value, uint256_t a_value_fee, const char *a_hash_out_type); // Make transfer transaction & insert to cache -char *dap_chain_mempool_tx_create_cond(dap_chain_net_t * a_net, +char* dap_chain_mempool_tx_create_cond(dap_chain_net_t * a_net, dap_enc_key_t *a_key_from, dap_pkey_t *a_key_cond, const char a_token_ticker[DAP_CHAIN_TICKER_SIZE_MAX], uint256_t a_value, uint256_t a_value_per_unit_max, dap_chain_net_srv_price_unit_uid_t a_unit, @@ -63,7 +76,7 @@ char *dap_chain_mempool_tx_create_cond(dap_chain_net_t * a_net, size_t a_cond_size, const char *a_hash_out_type); char *dap_chain_mempool_tx_create_cond_input(dap_chain_net_t *a_net, dap_chain_hash_fast_t *a_tx_prev_hash, - const dap_chain_addr_t *a_addr_to, dap_enc_key_t *a_key_tx_sign, dap_chain_datum_tx_receipt_t *a_receipt, const char *a_hash_out_type); + const dap_chain_addr_t *a_addr_to, dap_enc_key_t *a_key_tx_sign, dap_chain_datum_tx_receipt_t *a_receipt, const char *a_hash_out_type, int *a_ret_status); int dap_chain_mempool_tx_create_massive(dap_chain_t * a_chain, dap_enc_key_t *a_key_from, const dap_chain_addr_t* a_addr_from, const dap_chain_addr_t* a_addr_to, diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index e4c2947d928275bb02111cbc02dfcc1ae4a30993..09c79aefb05d874e93f455f4f20cbec14058fc7a 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -2105,7 +2105,7 @@ static int s_cli_net(int argc, char **argv, char **a_str_reply) } l_ret = dap_global_db_del_sync(l_gdb_group_str, l_hash_string); DAP_DELETE(l_gdb_group_str); - if (!l_ret) { + if (l_ret) { dap_cli_server_cmd_set_reply_text(a_str_reply, "Cant't find certificate public key hash in database"); return -10; } diff --git a/modules/net/dap_chain_node_client.c b/modules/net/dap_chain_node_client.c index 58f6053e905db8dd6fdbad1fb3207e70f7e9101d..980b03424d163ed395c7c592cc6a52ae20265303 100644 --- a/modules/net/dap_chain_node_client.c +++ b/modules/net/dap_chain_node_client.c @@ -314,7 +314,7 @@ static void s_ch_chain_callback_notify_packet_in2(dap_stream_ch_chain_net_t* a_c pthread_mutex_lock(&l_node_client->wait_mutex); l_node_client->state = NODE_CLIENT_STATE_NODE_ADDR_LEASED; dap_cond_signal(l_node_client->wait_cond); - pthread_mutex_lock(&l_node_client->wait_mutex); + pthread_mutex_unlock(&l_node_client->wait_mutex); break; } // get remote node address @@ -326,7 +326,7 @@ static void s_ch_chain_callback_notify_packet_in2(dap_stream_ch_chain_net_t* a_c pthread_mutex_lock(&l_node_client->wait_mutex); l_node_client->state = NODE_CLIENT_STATE_GET_NODE_ADDR; dap_cond_signal(l_node_client->wait_cond); - pthread_mutex_lock(&l_node_client->wait_mutex); + pthread_mutex_unlock(&l_node_client->wait_mutex); break; } case DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_NODE_VALIDATOR_READY: { @@ -642,7 +642,7 @@ static void s_ch_chain_callback_notify_packet_R(dap_stream_ch_chain_net_srv_t* a pthread_mutex_lock(&l_node_client->wait_mutex); l_node_client->state = NODE_CLIENT_STATE_CHECKED; dap_cond_signal(l_node_client->wait_cond); - pthread_mutex_lock(&l_node_client->wait_mutex); + pthread_mutex_unlock(&l_node_client->wait_mutex); break; } case DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_SIGN_RESPONSE: @@ -749,6 +749,7 @@ bool dap_chain_node_client_connect(dap_chain_node_client_t *a_node_client, const log_it(L_WARNING, "Undefined address with node client connect to"); return false; } + int ret_code = 0; dap_client_set_uplink_unsafe(a_node_client->client, l_host_addr, a_node_client->info->hdr.ext_port); a_node_client->state = NODE_CLIENT_STATE_CONNECTING; // Handshake & connect diff --git a/modules/net/srv/dap_chain_net_srv.c b/modules/net/srv/dap_chain_net_srv.c index 2ca571221aade09484266e3a313dfac124fb3fa7..efc48b8cc86a5b1ad21cdb2f29d8f751f37ea574 100644 --- a/modules/net/srv/dap_chain_net_srv.c +++ b/modules/net/srv/dap_chain_net_srv.c @@ -616,7 +616,7 @@ static int s_cli_net_srv( int argc, char **argv, char **a_str_reply) dap_string_append_printf(l_string_ret, "Command 'net_srv' requires subcommand 'order'"); ret = -3; } - dap_cli_server_cmd_set_reply_text(a_str_reply, l_string_ret->str); + dap_cli_server_cmd_set_reply_text(a_str_reply, "%s", l_string_ret->str); dap_string_free(l_string_ret, true); } @@ -675,51 +675,72 @@ static bool s_pay_verificator_callback(dap_ledger_t * a_ledger, dap_chain_tx_out dap_chain_datum_tx_t *a_tx_in, bool a_owner) { UNUSED(a_ledger); - if (!a_owner) - return false; + if (a_owner) + return true; dap_chain_datum_tx_receipt_t *l_receipt = (dap_chain_datum_tx_receipt_t *) dap_chain_datum_tx_item_get(a_tx_in, NULL, TX_ITEM_TYPE_RECEIPT, NULL); - if (!l_receipt) + if (!l_receipt){ + log_it(L_ERROR, "Can't find receipt."); return false; + } // Check provider sign dap_sign_t *l_sign = dap_chain_datum_tx_receipt_sign_get(l_receipt, l_receipt->size, 0); - if (!l_sign) + if (!l_sign){ + log_it(L_ERROR, "Can't get provider sign from receipt."); return false; + } - if (dap_sign_verify_all(l_sign, l_sign->header.sign_pkey_size + l_sign->header.sign_size, &l_receipt->receipt_info, sizeof(l_receipt->receipt_info))){ + if (dap_sign_verify_all(l_sign, dap_sign_get_size(l_sign), &l_receipt->receipt_info, sizeof(l_receipt->receipt_info))){ + log_it(L_ERROR, "Provider sign in receipt not passed verification."); return false; } - // Checking that the signature matches the provider's signature + // Checking the signature matches the provider's signature dap_hash_fast_t l_tx_sign_pkey_hash = {}; dap_hash_fast_t l_provider_pkey_hash = {}; - if (!dap_sign_get_pkey_hash(l_sign, &l_provider_pkey_hash)) + if (!dap_sign_get_pkey_hash(l_sign, &l_provider_pkey_hash)){ + log_it(L_ERROR, "Can't get pkey hash from provider sign."); return false; + } int l_item_size = 0; uint8_t* l_sig = dap_chain_datum_tx_item_get(a_tx_in, 0, TX_ITEM_TYPE_SIG, &l_item_size); if(!l_sig){ + log_it(L_ERROR, "Can't get item with provider signature from tx"); return false; } l_sign = dap_chain_datum_tx_item_sign_get_sig((dap_chain_tx_sig_t *)l_sig); - if (!l_sign || !dap_sign_get_pkey_hash(l_sign, &l_tx_sign_pkey_hash)) + if (!l_sign){ + log_it(L_ERROR, "Provider sign from tx sig_item"); + return false; + } + + if(!dap_sign_get_pkey_hash(l_sign, &l_tx_sign_pkey_hash)){ + log_it(L_ERROR, "Can't get pkey hash from tx provider signature"); return false; + } if(!dap_hash_fast_compare(&l_tx_sign_pkey_hash, &l_provider_pkey_hash)){ + log_it(L_ERROR, "Provider signature in receipt and tx is different."); return false; } // Check client sign l_sign = dap_chain_datum_tx_receipt_sign_get(l_receipt, l_receipt->size, 1); - if (!l_sign) + if (!l_sign){ + log_it(L_ERROR, "Can't get client signature from receipt."); return false; + } dap_hash_fast_t l_pkey_hash = {}; - if (!dap_sign_get_pkey_hash(l_sign, &l_pkey_hash)) + if (!dap_sign_get_pkey_hash(l_sign, &l_pkey_hash)){ + log_it(L_ERROR, "Can't get pkey hash from receipt client signature"); return false; + } if(!dap_hash_fast_compare(&l_pkey_hash, &a_cond->subtype.srv_pay.pkey_hash)){ + log_it(L_ERROR, "Client signature in receipt is invalid!"); return false; } diff --git a/modules/net/srv/dap_chain_net_srv_stream_session.c b/modules/net/srv/dap_chain_net_srv_stream_session.c index e72e03bad1e86179aaa410e13d226e9e0ab36430..34128a0d38f0abfb4c01c3c734d90449b470ae0d 100644 --- a/modules/net/srv/dap_chain_net_srv_stream_session.c +++ b/modules/net/srv/dap_chain_net_srv_stream_session.c @@ -63,9 +63,7 @@ dap_chain_net_srv_usage_t* dap_chain_net_srv_usage_add (dap_chain_net_srv_stream l_ret->net = a_net; l_ret->service = a_srv; pthread_rwlock_init(&l_ret->rwlock,NULL); - pthread_mutex_lock(&a_srv_session->parent->mutex); - HASH_ADD_INT( a_srv_session->usages, id,l_ret ); - pthread_mutex_unlock(&a_srv_session->parent->mutex); + a_srv_session->usage_active = l_ret; log_it( L_NOTICE, "Added service %s:0x%016"DAP_UINT64_FORMAT_X" , usage id: %d", l_ret->net->pub.name, a_srv->uid.uint64, l_ret->id); return l_ret; }else{ @@ -94,9 +92,6 @@ void dap_chain_net_srv_usage_delete (dap_chain_net_srv_stream_session_t * a_srv_ } - pthread_mutex_lock(&a_srv_session->parent->mutex); - HASH_DEL(a_srv_session->usages, a_usage); - pthread_mutex_unlock(&a_srv_session->parent->mutex); DAP_DELETE( a_usage ); } @@ -110,6 +105,7 @@ dap_chain_net_srv_usage_t* dap_chain_net_srv_usage_find_unsafe (dap_chain_net_sr uint32_t a_usage_id) { dap_chain_net_srv_usage_t * l_ret = NULL; - HASH_FIND_INT(a_srv_session->usages, &a_usage_id, l_ret); + if (a_srv_session->usage_active->id == a_usage_id) + l_ret = a_srv_session->usage_active; return l_ret; } diff --git a/modules/net/srv/include/dap_chain_net_srv.h b/modules/net/srv/include/dap_chain_net_srv.h index f46a7f43e46536fa246cf5da94f73ec93c953566..53db3a75b0f8dbb20b38f303711514b75882822d 100755 --- a/modules/net/srv/include/dap_chain_net_srv.h +++ b/modules/net/srv/include/dap_chain_net_srv.h @@ -87,11 +87,13 @@ typedef struct dap_chain_net_srv_price #define DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_NOTIFY_STOPPED 0x20 #define DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_DATA 0x30 #define DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_DATA 0x31 +#define DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_NEW_TX_COND_REQUEST 0x40 +#define DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_NEW_TX_COND_RESPONSE 0x41 #define DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_SUCCESS 0xf0 #define DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR 0xff // for connection testing -#define DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_CHECK_REQUEST 0x40 -#define DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_CHECK_RESPONSE 0x41 +#define DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_CHECK_REQUEST 0x50 +#define DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_CHECK_RESPONSE 0x51 #define DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_UNDEFINED 0x00000000 diff --git a/modules/net/srv/include/dap_chain_net_srv_stream_session.h b/modules/net/srv/include/dap_chain_net_srv_stream_session.h index 527927cc16e91a65156153917c54e4fdb9ecd1db..6f7a7b0dda5c930ca9557af4378dab2a708c8756 100644 --- a/modules/net/srv/include/dap_chain_net_srv_stream_session.h +++ b/modules/net/srv/include/dap_chain_net_srv_stream_session.h @@ -55,7 +55,7 @@ typedef struct dap_chain_net_srv_usage{ bool is_active; bool is_free; bool is_grace; - UT_hash_handle hh; // +// UT_hash_handle hh; // } dap_chain_net_srv_usage_t; typedef struct dap_net_stats{ @@ -73,10 +73,12 @@ typedef struct dap_net_stats{ typedef struct dap_chain_net_srv_stream_session { time_t ts_activated; dap_stream_session_t * parent; - dap_chain_net_srv_usage_t * usages; dap_chain_net_srv_usage_t * usage_active; - uintmax_t limits_bytes; // Bytes left - time_t limits_ts; // Timestamp until its activte + intmax_t limits_bytes; // Bytes provided for using the service left + time_t limits_ts; //Time provided for using the service + + time_t last_update_ts; //Time provided for using the service + dap_chain_net_srv_price_unit_uid_t limits_units_type; // Some common stats diff --git a/modules/service/vpn/dap_chain_net_srv_vpn.c b/modules/service/vpn/dap_chain_net_srv_vpn.c index 174e3cfa29ba9aac2763089ff85455e278a7690a..29497559df64dc9d7935f8be11c45e4fced6175e 100644 --- a/modules/service/vpn/dap_chain_net_srv_vpn.c +++ b/modules/service/vpn/dap_chain_net_srv_vpn.c @@ -235,7 +235,7 @@ static bool s_tun_client_send_data_unsafe(dap_chain_net_srv_ch_vpn_t * l_ch_vpn, static bool s_tun_client_send_data_unsafe(dap_chain_net_srv_ch_vpn_t * l_ch_vpn, ch_vpn_pkt_t * l_pkt_out) { dap_chain_net_srv_stream_session_t *l_srv_session = DAP_CHAIN_NET_SRV_STREAM_SESSION(l_ch_vpn->ch->stream->session); - dap_chain_net_srv_usage_t *l_usage = dap_chain_net_srv_usage_find_unsafe(l_srv_session, l_ch_vpn->usage_id); + dap_chain_net_srv_usage_t *l_usage = l_srv_session->usage_active;// dap_chain_net_srv_usage_find_unsafe(l_srv_session, l_ch_vpn->usage_id); size_t l_data_to_send = (l_pkt_out->header.op_data.data_size + sizeof(l_pkt_out->header)); debug_if(s_debug_more, L_DEBUG, "Sent stream pkt size %zu on worker #%u", l_data_to_send, l_ch_vpn->ch->stream_worker->worker->id); size_t l_data_sent = dap_stream_ch_pkt_write_unsafe(l_ch_vpn->ch, DAP_STREAM_CH_PKT_TYPE_NET_SRV_VPN_DATA, l_pkt_out, l_data_to_send); @@ -262,7 +262,7 @@ static bool s_tun_client_send_data_unsafe(dap_chain_net_srv_ch_vpn_t * l_ch_vpn, static bool s_tun_client_send_data_inter(dap_events_socket_t * a_es_input, dap_chain_net_srv_ch_vpn_t * a_ch_vpn, ch_vpn_pkt_t * a_pkt_out) { dap_chain_net_srv_stream_session_t * l_srv_session = DAP_CHAIN_NET_SRV_STREAM_SESSION (a_ch_vpn->ch->stream->session ); - dap_chain_net_srv_usage_t * l_usage = dap_chain_net_srv_usage_find_unsafe(l_srv_session, a_ch_vpn->usage_id); + dap_chain_net_srv_usage_t * l_usage = l_srv_session->usage_active;// dap_chain_net_srv_usage_find_unsafe(l_srv_session, a_ch_vpn->usage_id); size_t l_data_to_send = (a_pkt_out->header.op_data.data_size + sizeof(a_pkt_out->header)); size_t l_data_sent = dap_stream_ch_pkt_write_inter(a_es_input, a_ch_vpn->ch->uuid, DAP_STREAM_CH_PKT_TYPE_NET_SRV_VPN_DATA, a_pkt_out, l_data_to_send); @@ -363,7 +363,7 @@ static void s_tun_recv_msg_callback(dap_events_socket_t * a_esocket_queue, void switch (l_msg->type) { case TUN_SOCKET_MSG_ESOCKET_REASSIGNED: { assert(l_msg->esocket_reassigment.worker_id < s_tun_sockets_count); - dap_chain_net_srv_vpn_tun_socket_t* l_tun_sock = s_tun_sockets[a_esocket_queue->worker->id]; + dap_chain_net_srv_vpn_tun_socket_t* l_tun_sock = s_tun_sockets[a_esocket_queue->context->worker->id]; assert(l_tun_sock); dap_chain_net_srv_ch_vpn_info_t* l_info = NULL; HASH_FIND(hh, l_tun_sock->clients, &l_msg->esocket_reassigment.addr, sizeof(l_msg->esocket_reassigment.addr), l_info); @@ -371,11 +371,11 @@ static void s_tun_recv_msg_callback(dap_events_socket_t * a_esocket_queue, void l_info->worker = dap_events_worker_get(l_msg->esocket_reassigment.worker_id); l_info->queue_msg = s_tun_sockets_queue_msg[l_msg->esocket_reassigment.worker_id]; l_info->is_reassigned_once = true; - l_info->is_on_this_worker = (a_esocket_queue->worker->id == l_msg->esocket_reassigment.worker_id); + l_info->is_on_this_worker = (a_esocket_queue->context->worker->id == l_msg->esocket_reassigment.worker_id); if (s_debug_more) { char l_addrbuf[INET_ADDRSTRLEN]; inet_ntop(AF_INET, &l_msg->esocket_reassigment.addr, l_addrbuf, sizeof(l_addrbuf)); - log_it(L_INFO, "Tun:%u message: addr %s reassign on worker #%u", a_esocket_queue->worker->id, + log_it(L_INFO, "Tun:%u message: addr %s reassign on worker #%u", a_esocket_queue->context->worker->id, l_addrbuf, l_msg->esocket_reassigment.worker_id); } } @@ -383,7 +383,7 @@ static void s_tun_recv_msg_callback(dap_events_socket_t * a_esocket_queue, void char l_addrbuf[INET_ADDRSTRLEN]; inet_ntop(AF_INET, &l_msg->esocket_reassigment.addr, l_addrbuf, sizeof(l_addrbuf)); log_it(L_INFO, "Reassigment message for address %s on worker %u comes but no such address was found on tun socket %u", - l_addrbuf, l_msg->esocket_reassigment.worker_id, a_esocket_queue->worker->id); + l_addrbuf, l_msg->esocket_reassigment.worker_id, a_esocket_queue->context->worker->id); } } break; /* l_msg->type == TUN_SOCKET_MSG_ESOCKET_REASSIGNED */ @@ -413,7 +413,7 @@ static void s_tun_recv_msg_callback(dap_events_socket_t * a_esocket_queue, void if (dap_log_level_get() <= L_INFO) { char l_addrbuf[INET_ADDRSTRLEN]; inet_ntop(AF_INET, &l_msg->ip_assigment.addr, l_addrbuf, sizeof(l_addrbuf)); - log_it(L_DEBUG, "Tun:%u message: addr %s assigned for worker #%u on tun #u", a_esocket_queue->worker->id, + log_it(L_DEBUG, "Tun:%u message: addr %s assigned for worker #%u on tun #u", a_esocket_queue->context->worker->id, l_addrbuf, l_msg->ip_assigment.worker_id); } } @@ -442,18 +442,18 @@ static void s_tun_recv_msg_callback(dap_events_socket_t * a_esocket_queue, void }break; /* l_msg->type == TUN_SOCKET_MSG_IP_UNASSIGNED */ case TUN_SOCKET_MSG_CH_VPN_SEND: { - if (dap_context_find(a_esocket_queue->worker->context, l_msg->esocket_uuid) == l_msg->esocket) { + if (dap_context_find(a_esocket_queue->context->worker->context, l_msg->esocket_uuid) == l_msg->esocket) { s_tun_client_send_data_unsafe(l_msg->ch_vpn, l_msg->ch_vpn_send.pkt); if (s_debug_more) { char l_addrbuf[INET_ADDRSTRLEN]; inet_ntop(AF_INET, &l_msg->ip_assigment.addr, l_addrbuf, sizeof(l_addrbuf)); log_it(L_DEBUG, "Tun:%u message: send %u bytes for ch vpn protocol", - a_esocket_queue->worker->id, l_msg->ch_vpn_send.pkt->header.op_data.data_size); + a_esocket_queue->context->worker->id, l_msg->ch_vpn_send.pkt->header.op_data.data_size); } } else { log_it(L_ERROR, "MSG: No esocket %p on worker #%u, lost %d data", - l_msg->esocket, a_esocket_queue->worker->id, l_msg->ch_vpn_send.pkt->header.op_data.data_size); + l_msg->esocket, a_esocket_queue->context->worker->id, l_msg->ch_vpn_send.pkt->header.op_data.data_size); } DAP_DELETE(l_msg->ch_vpn_send.pkt); } break; /* l_msg->type == TUN_SOCKET_MSG_CH_VPN_SEND */ @@ -515,8 +515,9 @@ static void s_tun_send_msg_ip_unassigned(uint32_t a_worker_own_id, uint32_t a_wo l_msg->ip_unassigment.addr = a_addr; l_msg->ip_unassigment.worker_id = a_ch_vpn->ch->stream_worker->worker->id; l_msg->esocket = a_ch_vpn->ch->stream->esocket; - l_msg->esocket_uuid = a_ch_vpn->ch->stream->esocket->uuid; - l_msg->is_reassigned_once = a_ch_vpn->ch->stream->esocket->was_reassigned; + l_msg->esocket_uuid = a_ch_vpn->ch->stream->esocket ? a_ch_vpn->ch->stream->esocket->uuid : 0; + l_msg->is_reassigned_once = a_ch_vpn->ch->stream->esocket ? a_ch_vpn->ch->stream->esocket->was_reassigned : false; + if( a_worker_own_id != a_worker_id){ if ( dap_events_socket_queue_ptr_send(s_tun_sockets_queue_msg[a_worker_id], l_msg) != 0 ) { @@ -749,6 +750,8 @@ static int s_vpn_tun_create(dap_config_t * g_config) } s_tun_deattach_queue(l_tun_fd); s_raw_server->tun_device_name = strdup(s_raw_server->ifr.ifr_name); + s_raw_server->tun_fd = l_tun_fd; + #elif !defined (DAP_OS_DARWIN) #error "Undefined tun interface attach for your platform" #endif @@ -901,12 +904,12 @@ static int s_callback_response_success(dap_chain_net_srv_t * a_srv, uint32_t a_u int l_ret = 0; const dap_chain_datum_tx_receipt_t * l_receipt = (const dap_chain_datum_tx_receipt_t *) a_request; size_t l_receipt_size = a_request_size; - + log_it( L_INFO, "s_callback_response_success is called"); // dap_stream_ch_chain_net_srv_pkt_request_t * l_request = (dap_stream_ch_chain_net_srv_pkt_request_t *) a_request; // dap_chain_net_srv_stream_session_t * l_srv_session = (dap_chain_net_srv_stream_session_t *) a_srv_client->ch->stream->session->_inheritor; dap_chain_net_srv_stream_session_t * l_srv_session = (dap_chain_net_srv_stream_session_t *) a_srv_client->ch->stream->session->_inheritor; - dap_chain_net_srv_usage_t * l_usage_active= dap_chain_net_srv_usage_find_unsafe(l_srv_session,a_usage_id); + dap_chain_net_srv_usage_t * l_usage_active = l_srv_session->usage_active;// dap_chain_net_srv_usage_find_unsafe(l_srv_session,a_usage_id); dap_chain_net_srv_ch_vpn_t * l_srv_ch_vpn =(dap_chain_net_srv_ch_vpn_t*) a_srv_client->ch->stream->channel[DAP_CHAIN_NET_SRV_VPN_ID] ? a_srv_client->ch->stream->channel[DAP_CHAIN_NET_SRV_VPN_ID]->internal : NULL; @@ -940,6 +943,45 @@ static int s_callback_response_success(dap_chain_net_srv_t * a_srv, uint32_t a_u log_it(L_WARNING, "VPN channel is not open, will be no data transmission"); l_ret = -2; } + + // set start limits + if(!l_usage_active->is_free){ + switch( l_usage_active->receipt->receipt_info.units_type.enm){ + case SERV_UNIT_DAY:{ + l_srv_session->last_update_ts = time(NULL); + if (l_usage_active->is_grace || l_srv_session->limits_ts == 0) + l_srv_session->limits_ts = (time_t)l_usage_active->receipt->receipt_info.units*24*3600; + log_it(L_INFO,"%"DAP_UINT64_FORMAT_U" seconds more for VPN usage", l_usage_active->receipt->receipt_info.units); + } break; + case SERV_UNIT_SEC:{ + l_srv_session->last_update_ts = time(NULL); + if (l_usage_active->is_grace || l_srv_session->limits_ts == 0) + l_srv_session->limits_ts = (time_t)l_usage_active->receipt->receipt_info.units; + log_it(L_INFO,"%"DAP_UINT64_FORMAT_U" seconds more for VPN usage", l_usage_active->receipt->receipt_info.units); + } break; + case SERV_UNIT_B:{ + if (l_usage_active->is_grace || l_srv_session->limits_bytes == 0) + l_srv_session->limits_bytes = (uintmax_t) l_usage_active->receipt->receipt_info.units; + log_it(L_INFO,"%"DAP_UINT64_FORMAT_U" bytes more for VPN usage", l_usage_active->receipt->receipt_info.units); + } break; + case SERV_UNIT_KB:{ + if (l_usage_active->is_grace || l_srv_session->limits_bytes == 0) + l_srv_session->limits_bytes = 1000ull * ( (uintmax_t) l_usage_active->receipt->receipt_info.units); + log_it(L_INFO,"%"DAP_UINT64_FORMAT_U" bytes more for VPN usage", l_usage_active->receipt->receipt_info.units); + } break; + case SERV_UNIT_MB:{ + if (l_usage_active->is_grace || l_srv_session->limits_bytes == 0) + l_srv_session->limits_bytes = 1000000ull * ( (uintmax_t) l_usage_active->receipt->receipt_info.units); + log_it(L_INFO,"%"DAP_UINT64_FORMAT_U" bytes more for VPN usage", l_usage_active->receipt->receipt_info.units); + } break; + default: { + log_it(L_WARNING, "VPN doesnt accept serv unit type 0x%08X", l_usage_active->receipt->receipt_info.units_type.uint32 ); + dap_stream_ch_set_ready_to_write_unsafe(l_usage_active->client->ch,false); + dap_stream_ch_set_ready_to_read_unsafe(l_usage_active->client->ch,false); + dap_stream_ch_pkt_write_unsafe(l_usage_active->client->ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_NOTIFY_STOPPED , NULL, 0 ); + } + } + } pthread_rwlock_unlock(&s_clients_rwlock); return l_ret; } @@ -961,6 +1003,8 @@ static int s_callback_receipt_next_success(dap_chain_net_srv_t * a_srv, uint32_t const dap_chain_datum_tx_receipt_t * l_receipt_next = (const dap_chain_datum_tx_receipt_t *) a_receipt_next; size_t l_receipt_next_size = a_receipt_next_size; + + log_it(L_INFO, "Next receipt successfuly accepted"); // usage is present, we've accepted packets dap_stream_ch_set_ready_to_read_unsafe( l_srv_ch_vpn->ch , true ); @@ -1123,89 +1167,131 @@ static void s_update_limits(dap_stream_ch_t * a_ch , { bool l_issue_new_receipt = false; // Check if there are time limits - if( a_srv_session->limits_ts ){ - if( a_srv_session->limits_ts < time(NULL) ){ // Limits out - a_srv_session->limits_ts = 0; + + if (a_usage->is_free) + return; + + if (a_usage->receipt->receipt_info.units_type.enm == SERV_UNIT_DAY || + a_usage->receipt->receipt_info.units_type.enm == SERV_UNIT_SEC){ + time_t l_current_limit_ts = 0; + if ( a_usage->receipt){ + switch( a_usage->receipt->receipt_info.units_type.enm){ + case SERV_UNIT_DAY:{ + l_current_limit_ts = (time_t)a_usage->receipt->receipt_info.units*24*3600; + } break; + case SERV_UNIT_SEC:{ + l_current_limit_ts = (time_t)a_usage->receipt->receipt_info.units; + } + } + } + + a_srv_session->limits_ts -= time(NULL) - a_srv_session->last_update_ts; + + if(a_srv_session->limits_ts < l_current_limit_ts/2 && !a_usage->receipt_next && !a_usage->is_grace){ + l_issue_new_receipt = true; + } + a_srv_session->last_update_ts = time(NULL); + + + if( a_srv_session->limits_ts <= 0 && !a_usage->is_grace){ log_it(L_INFO, "Limits by timestamp are over. Switch to the next receipt"); DAP_DELETE(a_usage->receipt); a_usage->receipt = a_usage->receipt_next; a_usage->receipt_next = NULL; - l_issue_new_receipt = true; if ( a_usage->receipt){ // If there is next receipt add the time and request the next receipt a_srv_session->limits_units_type.uint32 = a_usage->receipt->receipt_info.units_type.uint32; switch( a_usage->receipt->receipt_info.units_type.enm){ - case SERV_UNIT_DAY:{ - a_srv_session->limits_ts = time(NULL) + (time_t) a_usage->receipt->receipt_info.units*24*3600; - log_it(L_INFO,"%"DAP_UINT64_FORMAT_U" days more for VPN usage", a_usage->receipt->receipt_info.units); - } break; - case SERV_UNIT_SEC:{ - a_srv_session->limits_ts = time(NULL) + (time_t) a_usage->receipt->receipt_info.units; - log_it(L_INFO,"%"DAP_UINT64_FORMAT_U" seconds more for VPN usage", a_usage->receipt->receipt_info.units); - } break; - default: { - log_it(L_WARNING, "VPN doesnt accept serv unit type 0x%08X for limits_ts", a_usage->receipt->receipt_info.units_type.uint32 ); - dap_stream_ch_set_ready_to_write_unsafe(a_ch,false); - dap_stream_ch_set_ready_to_read_unsafe(a_ch,false); - dap_stream_ch_pkt_write_unsafe( a_usage->client->ch , DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_NOTIFY_STOPPED , NULL, 0 ); - } + case SERV_UNIT_DAY:{ + a_srv_session->limits_ts += (time_t)a_usage->receipt->receipt_info.units*24*3600; + log_it(L_INFO,"%"DAP_UINT64_FORMAT_U" days more for VPN usage", a_usage->receipt->receipt_info.units); + } break; + case SERV_UNIT_SEC:{ + a_srv_session->limits_ts += (time_t)a_usage->receipt->receipt_info.units; + log_it(L_INFO,"%"DAP_UINT64_FORMAT_U" seconds more for VPN usage", a_srv_session->limits_ts); + } break; + default: { + log_it(L_WARNING, "VPN doesnt accept serv unit type 0x%08X for limits_ts", a_usage->receipt->receipt_info.units_type.uint32 ); + dap_stream_ch_set_ready_to_write_unsafe(a_ch,false); + dap_stream_ch_set_ready_to_read_unsafe(a_ch,false); + dap_stream_ch_pkt_write_unsafe( a_usage->client->ch , DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_NOTIFY_STOPPED , NULL, 0 ); } - - //l_ch_vpn->limits_ts = time(NULL) + l_usage->receipt->receipt - }else { + } + }else if (!a_usage->is_grace){ log_it( L_NOTICE, "No activate receipt in usage, switch off write callback for channel"); + dap_stream_ch_chain_net_srv_pkt_error_t l_err = { }; + l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_RECEIPT_CANT_FIND ; dap_stream_ch_set_ready_to_write_unsafe(a_ch,false); dap_stream_ch_set_ready_to_read_unsafe(a_ch,false); - dap_stream_ch_pkt_write_unsafe( a_usage->client->ch , DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_NOTIFY_STOPPED , NULL, 0 ); + dap_stream_ch_pkt_write_unsafe( a_usage->client->ch , DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_NOTIFY_STOPPED , &l_err, sizeof(l_err)); } } - }else if ( a_srv_session->limits_bytes ){ - if ( a_srv_session->limits_bytes >(uintmax_t) a_bytes ){ - // Update limits - a_srv_session->limits_bytes -= (uintmax_t) a_bytes; - }else{ // traffic out + }else if ( a_usage->receipt->receipt_info.units_type.enm == SERV_UNIT_B || + a_usage->receipt->receipt_info.units_type.enm == SERV_UNIT_KB || + a_usage->receipt->receipt_info.units_type.enm == SERV_UNIT_MB ){ + intmax_t current_limit_bytes = 0; + if ( a_usage->receipt){// if we have active receipt and a_srv_session->last_update_ts == 0 then we counts units by traffic + switch( a_usage->receipt->receipt_info.units_type.enm){ + case SERV_UNIT_B:{ + current_limit_bytes = (uintmax_t) a_usage->receipt->receipt_info.units; + } break; + case SERV_UNIT_KB:{ + current_limit_bytes = 1000ull * ( (uintmax_t) a_usage->receipt->receipt_info.units); + } break; + case SERV_UNIT_MB:{ + current_limit_bytes = 1000000ull * ( (uintmax_t) a_usage->receipt->receipt_info.units); + } break; + } + } + + + a_srv_session->limits_bytes -= (intmax_t) a_bytes; + + if (a_srv_session->limits_bytes && a_srv_session->limits_bytes < current_limit_bytes/2 && ! a_usage->receipt_next){ + l_issue_new_receipt = true; + } + + if (a_srv_session->limits_bytes <= 0 && !a_usage->is_grace){ log_it(L_INFO, "Limits by traffic is over. Switch to the next receipt"); DAP_DELETE(a_usage->receipt); a_usage->receipt = a_usage->receipt_next; a_usage->receipt_next = NULL; - l_issue_new_receipt = true; if ( a_usage->receipt){ // If there is next receipt add the time and request the next receipt a_srv_session->limits_units_type.uint32 = a_usage->receipt->receipt_info.units_type.uint32; switch( a_usage->receipt->receipt_info.units_type.enm){ - case SERV_UNIT_B:{ - a_srv_session->limits_bytes += (uintmax_t) a_usage->receipt->receipt_info.units; - log_it(L_INFO,"%"DAP_UINT64_FORMAT_U" bytes more for VPN usage", a_usage->receipt->receipt_info.units); - } break; - case SERV_UNIT_KB:{ - a_srv_session->limits_bytes += 1000ull * ( (uintmax_t) a_usage->receipt->receipt_info.units); - log_it(L_INFO,"%"DAP_UINT64_FORMAT_U" bytes more for VPN usage", a_usage->receipt->receipt_info.units); - } break; - case SERV_UNIT_MB:{ - a_srv_session->limits_bytes += 1000000ull * ( (uintmax_t) a_usage->receipt->receipt_info.units); - log_it(L_INFO,"%"DAP_UINT64_FORMAT_U" bytes more for VPN usage", a_usage->receipt->receipt_info.units); - } break; - default: { - log_it(L_WARNING, "VPN doesnt accept serv unit type 0x%08X for limits_bytes", a_usage->receipt->receipt_info.units_type.uint32 ); - dap_stream_ch_set_ready_to_write_unsafe(a_ch,false); - dap_stream_ch_set_ready_to_read_unsafe(a_ch,false); - dap_stream_ch_pkt_write_unsafe( a_usage->client->ch , DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_NOTIFY_STOPPED , NULL, 0 ); - } + case SERV_UNIT_B:{ + a_srv_session->limits_bytes += (uintmax_t) a_usage->receipt->receipt_info.units; + log_it(L_INFO,"%"DAP_UINT64_FORMAT_U" bytes more for VPN usage", a_usage->receipt->receipt_info.units); + } break; + case SERV_UNIT_KB:{ + a_srv_session->limits_bytes += 1000ull * ( (uintmax_t) a_usage->receipt->receipt_info.units); + log_it(L_INFO,"%"DAP_UINT64_FORMAT_U" bytes more for VPN usage", a_usage->receipt->receipt_info.units); + } break; + case SERV_UNIT_MB:{ + a_srv_session->limits_bytes += 1000000ull * ( (uintmax_t) a_usage->receipt->receipt_info.units); + log_it(L_INFO,"%"DAP_UINT64_FORMAT_U" bytes more for VPN usage", a_usage->receipt->receipt_info.units); + } break; + default: { + log_it(L_WARNING, "VPN doesnt accept serv unit type 0x%08X for limits_bytes", a_usage->receipt->receipt_info.units_type.uint32 ); + dap_stream_ch_set_ready_to_write_unsafe(a_ch,false); + dap_stream_ch_set_ready_to_read_unsafe(a_ch,false); + dap_stream_ch_pkt_write_unsafe( a_usage->client->ch , DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_NOTIFY_STOPPED , NULL, 0 ); } - - - }else { + } + }else if (!a_usage->is_grace){ log_it( L_NOTICE, "No activate receipt in usage, switch off write callback for channel"); + dap_stream_ch_chain_net_srv_pkt_error_t l_err = { }; + l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_RECEIPT_CANT_FIND ; dap_stream_ch_set_ready_to_write_unsafe(a_ch,false); dap_stream_ch_set_ready_to_read_unsafe(a_ch,false); - dap_stream_ch_pkt_write_unsafe( a_usage->client->ch , DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_NOTIFY_STOPPED , NULL, 0 ); - + dap_stream_ch_pkt_write_unsafe( a_usage->client->ch , DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_NOTIFY_STOPPED , &l_err, sizeof(l_err)); } - } - } + } // If issue new receipt - if ( l_issue_new_receipt ) { + if ( l_issue_new_receipt && !dap_hash_fast_is_blank(&a_usage->tx_cond_hash)) { if ( a_usage->receipt){ + log_it( L_NOTICE, "Send next receipt to sign"); a_usage->receipt_next = dap_chain_net_srv_issue_receipt(a_usage->service, a_usage->price, NULL, 0); dap_stream_ch_pkt_write_unsafe(a_usage->client->ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_SIGN_REQUEST, a_usage->receipt_next, a_usage->receipt_next->size); @@ -1429,7 +1515,7 @@ void s_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) dap_stream_ch_pkt_t * l_pkt = (dap_stream_ch_pkt_t *) a_arg; dap_chain_net_srv_stream_session_t * l_srv_session = DAP_CHAIN_NET_SRV_STREAM_SESSION (a_ch->stream->session ); dap_chain_net_srv_ch_vpn_t *l_ch_vpn = CH_VPN(a_ch); - dap_chain_net_srv_usage_t * l_usage = dap_chain_net_srv_usage_find_unsafe(l_srv_session, l_ch_vpn->usage_id); + dap_chain_net_srv_usage_t * l_usage = l_srv_session->usage_active;// dap_chain_net_srv_usage_find_unsafe(l_srv_session, l_ch_vpn->usage_id); if ( ! l_usage){ log_it(L_NOTICE, "No active usage in list, possible disconnected. Send nothing on this channel"); @@ -1545,7 +1631,7 @@ static void s_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) dap_chain_net_srv_stream_session_t * l_srv_session = DAP_CHAIN_NET_SRV_STREAM_SESSION( a_ch->stream->session ); dap_chain_net_srv_ch_vpn_t *l_ch_vpn = CH_VPN(a_ch); - dap_chain_net_srv_usage_t * l_usage = dap_chain_net_srv_usage_find_unsafe(l_srv_session, l_ch_vpn->usage_id); + dap_chain_net_srv_usage_t * l_usage = l_srv_session->usage_active;// dap_chain_net_srv_usage_find_unsafe(l_srv_session, l_ch_vpn->usage_id); if ( ! l_usage){ log_it(L_NOTICE, "No active usage in list, possible disconnected. Send nothing on this channel"); dap_stream_ch_set_ready_to_write_unsafe(a_ch,false); @@ -1707,7 +1793,7 @@ static void s_es_tun_read(dap_events_socket_t * a_es, void * arg) if ( !l_vpn_info->is_on_this_worker && !l_vpn_info->is_reassigned_once && s_raw_server->auto_cpu_reassignment) { log_it(L_NOTICE, "Reassigning from worker %u to %u", l_vpn_info->worker->id, a_es->context->worker->id); l_vpn_info->is_reassigned_once = true; - s_tun_send_msg_esocket_reassigned_all_inter(a_es->worker->id, l_vpn_info->ch_vpn, l_vpn_info->esocket, l_vpn_info->esocket_uuid, + s_tun_send_msg_esocket_reassigned_all_inter(a_es->context->worker->id, l_vpn_info->ch_vpn, l_vpn_info->esocket, l_vpn_info->esocket_uuid, l_vpn_info->addr_ipv4); dap_events_socket_reassign_between_workers_mt(l_vpn_info->worker, l_vpn_info->esocket, a_es->context->worker); } diff --git a/modules/type/dag/dap_chain_cs_dag.c b/modules/type/dag/dap_chain_cs_dag.c index 8fc47e70fbec840fa8ac381cfa693c0fafc1d7be..886a66a0225927d8fb27390317503cbd92da1877 100644 --- a/modules/type/dag/dap_chain_cs_dag.c +++ b/modules/type/dag/dap_chain_cs_dag.c @@ -881,7 +881,8 @@ void s_dag_events_lasts_delete_linked_with_event(dap_chain_cs_dag_t * a_dag, dap for (size_t i = 0; i< a_event->header.hash_count; i++) { dap_chain_hash_fast_t * l_hash = ((dap_chain_hash_fast_t *) a_event->hashes_n_datum_n_signs) + i; dap_chain_cs_dag_event_item_t * l_event_item = NULL; - HASH_FIND(hh, PVT(a_dag)->events_lasts_unlinked ,l_hash ,sizeof (*l_hash), l_event_item); + dap_chain_cs_dag_pvt_t * l_dag_pvt = PVT(a_dag); + HASH_FIND(hh, l_dag_pvt->events_lasts_unlinked ,l_hash ,sizeof (*l_hash), l_event_item); if ( l_event_item ){ HASH_DEL(PVT(a_dag)->events_lasts_unlinked,l_event_item); DAP_DEL_Z(l_event_item); @@ -1616,7 +1617,7 @@ static int s_cli_dag(int argc, char ** argv, char **a_str_reply) if (s_callback_add_datums(l_chain, &l_datum, 1)) { char *l_datums_datum_hash_str; dap_get_data_hash_str_static(l_datum->data, l_datum->header.data_size, l_datums_datum_hash_str); - if (dap_global_db_del_sync(l_datum_hash_str, l_gdb_group_mempool)) { + if (!dap_global_db_del_sync(l_datum_hash_str, l_gdb_group_mempool)) { dap_cli_server_cmd_set_reply_text(a_str_reply, "Converted datum %s from mempool to event in the new forming round ", l_datum_hash_str);