diff --git a/dap-sdk/net/client/dap_client_pvt.c b/dap-sdk/net/client/dap_client_pvt.c index 456294610c955843b4ca9a8dc1dd03d0c0b2eaf5..b423cc66a0394cc1de75b6ff2232563ec338b120 100644 --- a/dap-sdk/net/client/dap_client_pvt.c +++ b/dap-sdk/net/client/dap_client_pvt.c @@ -77,7 +77,7 @@ #define DAP_ENC_KS_KEY_ID_SIZE 33 #endif -static void s_stage_status_after(dap_client_pvt_t * a_client_internal); +static bool s_stage_status_after(dap_client_pvt_t * a_client_internal); // ENC stage callbacks static void s_enc_init_response(dap_client_t *, void *, size_t); @@ -188,7 +188,7 @@ static void s_stream_connected(dap_client_pvt_t * a_client_pvt) * @brief s_client_internal_stage_status_proc * @param a_client */ -static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) +static bool s_stage_status_after(dap_client_pvt_t * a_client_pvt) { dap_worker_t * l_worker= a_client_pvt->worker; assert(l_worker); @@ -213,7 +213,7 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) } a_client_pvt->stage_status = STAGE_STATUS_DONE; s_stage_status_after(a_client_pvt); - return; + return false; } switch (l_stage) { case STAGE_ENC_INIT: { @@ -474,10 +474,9 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) // Trying the step again a_client_pvt->stage_status = STAGE_STATUS_IN_PROGRESS; log_it(L_INFO, "Connection attempt %d in 0.3 seconds", a_client_pvt->stage_errors); - // small delay before next request - dap_timerfd_start_on_worker( l_worker, 300, (dap_timerfd_callback_t)s_stage_status_after, - a_client_pvt, false ); + dap_timerfd_start_on_worker(l_worker, 300, (dap_timerfd_callback_t)s_stage_status_after, + a_client_pvt); } else{ log_it(L_INFO, "Too many connection attempts. Tries are over."); @@ -514,7 +513,7 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) if(a_client_pvt->stage_status_callback) a_client_pvt->stage_status_callback(a_client_pvt->client, NULL); - + return false; } /** diff --git a/dap-sdk/net/core/dap_timerfd.c b/dap-sdk/net/core/dap_timerfd.c index b65f82e41e8a85463b9248d6f99234f2a8f3a887..95d8146c391654eb92ec3edbdd1ee20be511483a 100644 --- a/dap-sdk/net/core/dap_timerfd.c +++ b/dap-sdk/net/core/dap_timerfd.c @@ -59,9 +59,9 @@ int dap_timerfd_init() * @param a_callback * @return new allocated dap_timerfd_t structure or NULL if error */ -dap_timerfd_t* dap_timerfd_start(uint64_t a_timeout_ms, dap_timerfd_callback_t a_callback, void *a_callback_arg, bool a_repeated) +dap_timerfd_t* dap_timerfd_start(uint64_t a_timeout_ms, dap_timerfd_callback_t a_callback, void *a_callback_arg) { - return dap_timerfd_start_on_worker(dap_events_worker_get_auto(), a_timeout_ms, a_callback, a_callback_arg, a_repeated); + return dap_timerfd_start_on_worker(dap_events_worker_get_auto(), a_timeout_ms, a_callback, a_callback_arg); } #ifdef DAP_OS_WINDOWS @@ -84,7 +84,7 @@ void __stdcall TimerAPCb(void* arg, DWORD low, DWORD high) { // Timer high valu * @param a_callback_arg * @return */ -dap_timerfd_t* dap_timerfd_start_on_worker(dap_worker_t * a_worker, uint64_t a_timeout_ms, dap_timerfd_callback_t a_callback, void *a_callback_arg, bool a_repeated) +dap_timerfd_t* dap_timerfd_start_on_worker(dap_worker_t * a_worker, uint64_t a_timeout_ms, dap_timerfd_callback_t a_callback, void *a_callback_arg) { dap_timerfd_t *l_timerfd = DAP_NEW(dap_timerfd_t); @@ -149,7 +149,6 @@ dap_timerfd_t* dap_timerfd_start_on_worker(dap_worker_t * a_worker, uint64_t a_t l_timerfd->events_socket = l_events_socket; l_timerfd->callback = a_callback; l_timerfd->callback_arg = a_callback_arg; - l_timerfd->repeated = a_repeated; #ifdef DAP_OS_WINDOWS l_timerfd->th = l_th; l_timerfd->pipe_in = l_pipe[1]; @@ -168,9 +167,9 @@ dap_timerfd_t* dap_timerfd_start_on_worker(dap_worker_t * a_worker, uint64_t a_t static void s_es_callback_timer(struct dap_events_socket *a_event_sock) { dap_timerfd_t *l_timerfd = a_event_sock->_inheritor; - if(l_timerfd->callback) - l_timerfd->callback(l_timerfd->callback_arg); - if (l_timerfd->repeated) { + // run user's callback + if(l_timerfd->callback && l_timerfd->callback(l_timerfd->callback_arg)) { + //printf("\nread() returned %d, %d\n", l_ptiu64, l_read_ret); #if defined DAP_OS_UNIX struct itimerspec l_ts; // repeat never @@ -192,6 +191,7 @@ static void s_es_callback_timer(struct dap_events_socket *a_event_sock) #endif dap_events_socket_set_readable_unsafe(a_event_sock, true); } else { + close(l_timerfd->tfd); #if defined DAP_OS_WINDOWS CloseHandle(l_timerfd->th); #endif diff --git a/dap-sdk/net/core/dap_worker.c b/dap-sdk/net/core/dap_worker.c index 901e5960d1fd2b903ad373f04353f1ad879d852a..8c23b73657e12beff1d931ceaa073b88a44084a5 100644 --- a/dap-sdk/net/core/dap_worker.c +++ b/dap-sdk/net/core/dap_worker.c @@ -43,7 +43,7 @@ static time_t s_connection_timeout = 20000; // 60; // seconds -static void s_socket_all_check_activity( void * a_arg); +static bool s_socket_all_check_activity( void * a_arg); static void s_queue_add_es_callback( dap_events_socket_t * a_es, void * a_arg); static void s_queue_delete_es_callback( dap_events_socket_t * a_es, void * a_arg); static void s_queue_es_reassign_callback( dap_events_socket_t * a_es, void * a_arg); @@ -114,7 +114,7 @@ void *dap_worker_thread(void *arg) l_worker->event_exit = dap_events_socket_create_type_event_unsafe(l_worker, s_event_exit_callback ); l_worker->timer_check_activity = dap_timerfd_start_on_worker( l_worker, s_connection_timeout * 1000 / 2, - s_socket_all_check_activity, l_worker, true ); + s_socket_all_check_activity, l_worker); pthread_setspecific(l_worker->events->pth_key_worker, l_worker); pthread_cond_broadcast(&l_worker->started_cond); @@ -663,7 +663,7 @@ static void s_queue_es_io_callback( dap_events_socket_t * a_es, void * a_arg) * @brief s_socket_all_check_activity * @param a_arg */ -static void s_socket_all_check_activity( void * a_arg) +static bool s_socket_all_check_activity( void * a_arg) { dap_worker_t *l_worker = (dap_worker_t*) a_arg; assert(l_worker); @@ -685,6 +685,7 @@ static void s_socket_all_check_activity( void * a_arg) } } } + return true; } /** diff --git a/dap-sdk/net/core/include/dap_timerfd.h b/dap-sdk/net/core/include/dap_timerfd.h index 097f8cf68626073317f0e9f11f9f173fc1f14e05..b578d02ccf884e6bdb5abf27fc4888bd2a53522e 100644 --- a/dap-sdk/net/core/include/dap_timerfd.h +++ b/dap-sdk/net/core/include/dap_timerfd.h @@ -38,7 +38,7 @@ #include "dap_common.h" #include "dap_events_socket.h" -typedef void (*dap_timerfd_callback_t)(void* ); // Callback for timer +typedef bool (*dap_timerfd_callback_t)(void* ); // Callback for timer. If return true, it will be called after next timeout typedef struct dap_timerfd { uint64_t timeout_ms; @@ -46,7 +46,6 @@ typedef struct dap_timerfd { dap_events_socket_t *events_socket; dap_timerfd_callback_t callback; void *callback_arg; - bool repeated; #ifdef DAP_OS_WINDOWS HANDLE th; int pipe_in; @@ -54,7 +53,7 @@ typedef struct dap_timerfd { } dap_timerfd_t; int dap_timerfd_init(); -dap_timerfd_t* dap_timerfd_start(uint64_t a_timeout_ms, dap_timerfd_callback_t a_callback, void *callback_arg, bool a_repeated); -dap_timerfd_t* dap_timerfd_start_on_worker(dap_worker_t * a_worker, uint64_t a_timeout_ms, dap_timerfd_callback_t a_callback, void *a_callback_arg, bool a_repeated); +dap_timerfd_t* dap_timerfd_start(uint64_t a_timeout_ms, dap_timerfd_callback_t a_callback, void *callback_arg); +dap_timerfd_t* dap_timerfd_start_on_worker(dap_worker_t * a_worker, uint64_t a_timeout_ms, dap_timerfd_callback_t a_callback, void *a_callback_arg); void dap_timerfd_delete(dap_timerfd_t *l_timerfd); diff --git a/dap-sdk/net/stream/ch/dap_stream_ch_pkt.c b/dap-sdk/net/stream/ch/dap_stream_ch_pkt.c index ab462092341907aff6aa4fda8094bcdbc5c6bdb0..b13e46355dfe1e14a1952126f9a76d0619a17f9e 100644 --- a/dap-sdk/net/stream/ch/dap_stream_ch_pkt.c +++ b/dap-sdk/net/stream/ch/dap_stream_ch_pkt.c @@ -162,8 +162,8 @@ bool dap_stream_ch_check_unsafe(dap_stream_worker_t * a_worker,dap_stream_ch_t * */ size_t dap_stream_ch_pkt_write_unsafe(dap_stream_ch_t * a_ch, uint8_t a_type, const void * a_data, size_t a_data_size) { - if (!a_data_size || !a_ch || !a_data) { - log_it(L_WARNING, "NULL ptr or zero data size to write out in channel"); + if (!a_ch) { + log_it(L_WARNING, "Channel is NULL ptr"); return 0; } //log_it(L_DEBUG,"Output: Has %u bytes of %c type for %c channel id",data_size, (char)type, (char) ch->proc->id ); @@ -200,7 +200,7 @@ size_t dap_stream_ch_pkt_write_unsafe(dap_stream_ch_t * a_ch, uint8_t a_type, c size_t l_ret=dap_stream_pkt_write_unsafe(a_ch->stream,l_buf_selected,a_data_size+sizeof(l_hdr)); a_ch->stat.bytes_write+=a_data_size; - a_ch->ready_to_write=true; + dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); if(l_buf_allocated) DAP_DELETE(l_buf_allocated); diff --git a/dap-sdk/net/stream/stream/dap_stream.c b/dap-sdk/net/stream/stream/dap_stream.c index f9e254a58693aa2b2fdc98ab9fad725366dadb8f..4a0c29119b293aeeb6806e0a580ff3a5c5a74d08 100644 --- a/dap-sdk/net/stream/stream/dap_stream.c +++ b/dap-sdk/net/stream/stream/dap_stream.c @@ -75,13 +75,12 @@ static void s_http_client_delete(dap_http_client_t * a_esocket, void * a_arg); static dap_stream_t *s_stream_keepalive_list = NULL; static pthread_mutex_t s_mutex_keepalive_list; -static void s_keepalive_cb( void ); +static bool s_keepalive_cb( void ); static bool s_dump_packet_headers = false; bool dap_stream_get_dump_packet_headers(){ return s_dump_packet_headers; } -static struct timespec keepalive_loop_sleep = { 0, STREAM_KEEPALIVE_TIMEOUT * 1000 * 1000 }; static bool s_detect_loose_packet(dap_stream_t * a_stream); dap_enc_key_type_t s_stream_get_preferred_encryption_type = DAP_ENC_KEY_TYPE_IAES; @@ -119,7 +118,7 @@ int dap_stream_init(dap_config_t * a_config) s_dap_stream_load_preferred_encryption_type(a_config); s_dump_packet_headers = dap_config_get_item_bool_default(g_config,"general","debug_dump_stream_headers",false); pthread_mutex_init( &s_mutex_keepalive_list, NULL ); - dap_timerfd_start(STREAM_KEEPALIVE_TIMEOUT * 1000, (dap_timerfd_callback_t)s_keepalive_cb, NULL, true); + dap_timerfd_start(STREAM_KEEPALIVE_TIMEOUT * 1000, (dap_timerfd_callback_t)s_keepalive_cb, NULL); log_it(L_NOTICE,"Init streaming module"); return 0; @@ -786,7 +785,7 @@ static bool s_detect_loose_packet(dap_stream_t * a_stream) } -static void s_keepalive_cb( void ) +static bool s_keepalive_cb( void ) { dap_stream_t *l_stream, *tmp; pthread_mutex_lock( &s_mutex_keepalive_list ); @@ -797,5 +796,6 @@ static void s_keepalive_cb( void ) dap_events_socket_write_mt(l_stream->stream_worker->worker, l_stream->esocket, &l_pkt, sizeof(l_pkt)); } pthread_mutex_unlock( &s_mutex_keepalive_list ); + return true; } diff --git a/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.c b/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.c index cd4c5958021f28578725720bfb35ff21e2bba44f..67e970a83906c5ddb040d52f36008792b57f5da3 100644 --- a/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.c +++ b/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.c @@ -128,6 +128,223 @@ void s_stream_ch_delete(dap_stream_ch_t* a_ch , void* a_arg) dap_chain_net_srv_call_closed_all( a_ch); } +static bool s_unban_client(dap_chain_net_srv_banlist_item_t *a_item) +{ + pthread_mutex_lock(a_item->ht_mutex); + HASH_DEL(*(a_item->ht_head), a_item); + pthread_mutex_unlock(a_item->ht_mutex); + DAP_DELETE(a_item); + return false; +} + +static bool s_grace_period_control(dap_chain_net_srv_grace_t *a_grace) +{ + assert(a_grace); + dap_stream_ch_chain_net_srv_pkt_error_t l_err; + memset(&l_err, 0, sizeof(l_err)); + dap_chain_net_srv_t * l_srv = NULL; + dap_stream_ch_t *l_ch = a_grace->ch; + + if (!dap_stream_ch_check_unsafe(a_grace->stream_worker, l_ch)) + goto free_exit; + + dap_chain_net_srv_stream_session_t *l_srv_session = l_ch && l_ch->stream && l_ch->stream->session ? + (dap_chain_net_srv_stream_session_t *)l_ch->stream->session->_inheritor : NULL; + if (!l_srv_session) + goto free_exit; + + dap_stream_ch_chain_net_srv_pkt_request_t *l_request = a_grace->request; + l_srv = dap_chain_net_srv_get( l_request->hdr.srv_uid ); + dap_chain_net_t * l_net = dap_chain_net_by_id( l_request->hdr.net_id ); + + l_err.net_id.uint64 = l_request->hdr.net_id.uint64; + l_err.srv_uid.uint64 = l_request->hdr.srv_uid.uint64; + + if ( ! l_net ) // Network not found + l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_NETWORK_NOT_FOUND; + + if ( ! l_srv ) // Service not found + l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_SERVICE_NOT_FOUND; + + if ( l_err.code ){ + goto free_exit; + } + + dap_ledger_t * l_ledger =l_net->pub.ledger; + dap_chain_datum_tx_t * l_tx = NULL; + dap_chain_tx_out_cond_t * l_tx_out_cond = NULL; + bool l_grace_start = false; + if (l_srv->pricelist ){ // Is present pricelist, not free service + + if ( !l_ledger ){ // No ledger + log_it( L_WARNING, "No Ledger"); + l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_NETWORK_NO_LEDGER ; + goto free_exit; + } + + l_tx = dap_chain_ledger_tx_find_by_hash( l_ledger,& l_request->hdr.tx_cond ); + if ( ! l_tx ){ // No tx cond transaction + if (a_grace->usage) { // marker for reentry to function + log_it( L_WARNING, "No tx cond transaction"); + l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_NOT_FOUND ; + goto free_exit; + } else + l_grace_start = true; + } + if (!l_grace_start) { + int l_tx_out_cond_size =0; + l_tx_out_cond = (dap_chain_tx_out_cond_t *) + dap_chain_datum_tx_item_get(l_tx, NULL, TX_ITEM_TYPE_OUT_COND, &l_tx_out_cond_size ); + + if ( ! l_tx_out_cond ) { // No conditioned output + log_it( L_WARNING, "No conditioned output"); + l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_NO_COND_OUT ; + goto free_exit; + } + + // Check cond output if it equesl or not to request + if ( l_tx_out_cond->subtype.srv_pay.srv_uid.uint64 != l_request->hdr.srv_uid.uint64 ){ + log_it( L_WARNING, "Wrong service uid in request, tx expect to close its output with 0x%016lX", + l_tx_out_cond->subtype.srv_pay.srv_uid ); + l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_WRONG_SRV_UID ; + goto free_exit; + } + } + } + dap_chain_net_srv_usage_t *l_usage = NULL; + if (!a_grace->usage) { + l_usage = dap_chain_net_srv_usage_add(l_srv_session, l_net, l_srv); + if ( !l_usage ){ // Usage can't add + log_it( L_WARNING, "Usage can't add"); + l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_USAGE_CANT_ADD; + goto free_exit; + } + + l_err.usage_id = l_usage->id; + + // Create one client + l_usage->client = DAP_NEW_Z( dap_chain_net_srv_client_t); + l_usage->client->stream_worker = l_ch->stream_worker; + l_usage->client->ch = l_ch; + l_usage->client->session_id = l_ch->stream->session->id; + l_usage->client->ts_created = time(NULL); + l_usage->tx_cond = l_tx; + memcpy(&l_usage->tx_cond_hash, &l_request->hdr.tx_cond,sizeof (l_usage->tx_cond_hash)); + l_usage->ts_created = time(NULL); + } else { + l_usage = a_grace->usage; + l_usage->tx_cond = l_tx; + } + dap_chain_net_srv_price_t * l_price = NULL; + dap_chain_datum_tx_receipt_t * l_receipt = NULL; + const char * l_ticker = NULL; + if (l_srv->pricelist && !l_grace_start) { + l_ticker = dap_chain_ledger_tx_get_token_ticker_by_hash(l_ledger, &l_request->hdr.tx_cond ); + dap_stpcpy(l_usage->token_ticker, l_ticker); + + dap_chain_net_srv_price_t *l_price_tmp; + DL_FOREACH(l_srv->pricelist, l_price_tmp) { + if (l_price_tmp->net->pub.id.uint64 == l_request->hdr.net_id.uint64 + && dap_strcmp(l_price_tmp->token, l_ticker) == 0 + && l_price_tmp->units_uid.enm == l_tx_out_cond->subtype.srv_pay.unit.enm + )//&& (l_price_tmp->value_datoshi/l_price_tmp->units) < l_tx_out_cond->subtype.srv_pay.header.unit_price_max_datoshi) + { + l_price = l_price_tmp; + break; + } + } + if ( !l_price ) { + log_it( L_WARNING, "Request can't be processed because no acceptable price in pricelist for token %s in network %s", + l_ticker, l_net->pub.name ); + l_err.code =DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_NOT_ACCEPT_TOKEN; + goto free_exit; + } + } + int ret; + if ((ret = l_srv->callback_requested(l_srv, l_usage->id, l_usage->client, l_request, a_grace->request_size)) != 0) { + log_it( L_WARNING, "Request canceled by service callback, return code %d", ret); + l_err.code = (uint32_t) ret ; + goto free_exit; + } + + if ( l_srv->pricelist) { + if (l_price || l_grace_start) { + if (l_price) { + if (a_grace->usage) { + DAP_DELETE(l_usage->price); + } + } else { + l_price = DAP_NEW_Z(dap_chain_net_srv_price_t); + memcpy(l_price, l_srv->pricelist, sizeof(*l_price)); + l_price->value_datoshi = 0; + l_price->value_coins = 0; + } + l_usage->price = l_price; + // TODO extend callback to pass ext and ext size from service callbacks + l_receipt = dap_chain_net_srv_issue_receipt( l_usage->service, l_usage, l_usage->price,NULL,0 ); + dap_stream_ch_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_SIGN_REQUEST, l_receipt, l_receipt->size); + }else{ + l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_PRICE_NOT_FOUND ; + goto free_exit; + } + // If we a here we passed all the checks, wow, now if we're not for free we request the signature. + } else{ + log_it( L_INFO, "Service provide for free"); + l_usage->is_free = true; + size_t l_success_size = sizeof (dap_stream_ch_chain_net_srv_pkt_success_hdr_t ); + dap_stream_ch_chain_net_srv_pkt_success_t *l_success = DAP_NEW_Z_SIZE(dap_stream_ch_chain_net_srv_pkt_success_t, + l_success_size); + l_success->hdr.usage_id = l_usage->id; + l_success->hdr.net_id.uint64 = l_usage->net->pub.id.uint64; + l_success->hdr.srv_uid.uint64 = l_usage->service->uid.uint64; + dap_stream_ch_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_SUCCESS, l_success, l_success_size); + if ( l_usage->service->callback_response_success ) + l_usage->service->callback_response_success ( l_usage->service, l_usage->id, l_usage->client, NULL, 0 ); + DAP_DELETE(l_success); + } + if (l_grace_start) { + l_usage->is_grace = true; + a_grace->usage = l_usage; + dap_timerfd_start_on_worker(a_grace->stream_worker->worker, l_srv->grace_period * 1000, + (dap_timerfd_callback_t)s_grace_period_control, a_grace); + return false; + } else { + DAP_DELETE(a_grace->request); + DAP_DELETE(a_grace); + return false; + } +free_exit: + if (l_err.code) { + dap_stream_ch_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err)); + if (l_srv && l_srv->callback_response_error) + l_srv->callback_response_error(l_srv, 0, NULL, &l_err, sizeof(l_err)); + } + if (a_grace->usage) { // add client pkey hash to banlist + a_grace->usage->is_active = false; + if (l_srv) { + dap_chain_net_srv_banlist_item_t *l_item = NULL; + pthread_mutex_lock(&l_srv->banlist_mutex); + HASH_FIND(hh, l_srv->ban_list, &a_grace->usage->client_pkey_hash, sizeof(dap_chain_hash_fast_t), l_item); + if (l_item) + pthread_mutex_unlock(&l_srv->banlist_mutex); + else { + l_item = DAP_NEW_Z(dap_chain_net_srv_banlist_item_t); + memcpy(&l_item->client_pkey_hash, &a_grace->usage->client_pkey_hash, sizeof(dap_chain_hash_fast_t)); + l_item->ht_mutex = &l_srv->banlist_mutex; + l_item->ht_head = &l_srv->ban_list; + HASH_ADD(hh, l_srv->ban_list, client_pkey_hash, sizeof(dap_chain_hash_fast_t), l_item); + pthread_mutex_unlock(&l_srv->banlist_mutex); + dap_timerfd_start(l_srv->grace_period * 10000, (dap_timerfd_callback_t)s_unban_client, l_item); + } + } + } + else if (l_usage) + dap_chain_net_srv_usage_delete(l_srv_session, l_usage); + DAP_DELETE(a_grace->request); + DAP_DELETE(a_grace); + return false; +} + /** * @brief s_stream_ch_packet_in * @param ch @@ -185,13 +402,11 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch , void* a_arg) l_request_out->send_time2.tv_usec = l_tval.tv_usec; // send response - if(dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_CHECK_RESPONSE, l_request_out, l_request_out->data_size + sizeof(dap_stream_ch_chain_net_srv_pkt_test_t))) { - dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); - } + dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_CHECK_RESPONSE, l_request_out, + l_request_out->data_size + sizeof(dap_stream_ch_chain_net_srv_pkt_test_t)); DAP_DELETE(l_request_out); + } break; - } - break; // for receive test data. case DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_CHECK_RESPONSE: { dap_stream_ch_chain_net_srv_pkt_test_t *l_request = (dap_stream_ch_chain_net_srv_pkt_test_t *) l_ch_pkt->data; @@ -207,8 +422,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch , void* a_arg) l_request->err_code += 4; } dap_stream_ch_set_ready_to_write_unsafe(a_ch, false); - } - break; + } break; // only for server case DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_REQUEST:{ @@ -216,180 +430,16 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch , void* a_arg) log_it( L_WARNING, "Wrong request size, less than minimum"); break; } + dap_chain_net_srv_grace_t *l_grace = DAP_NEW_Z(dap_chain_net_srv_grace_t); // Parse the request - dap_stream_ch_chain_net_srv_pkt_request_t * l_request =(dap_stream_ch_chain_net_srv_pkt_request_t *) l_ch_pkt->data; - //size_t l_request_size = l_ch_pkt->hdr.size; - dap_chain_net_srv_t * l_srv = dap_chain_net_srv_get( l_request->hdr.srv_uid ); - dap_chain_net_t * l_net = dap_chain_net_by_id( l_request->hdr.net_id ); - - l_err.net_id.uint64 = l_request->hdr.net_id.uint64; - l_err.srv_uid.uint64 = l_request->hdr.srv_uid.uint64; - - if ( ! l_net ) // Network not found - l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_NETWORK_NOT_FOUND; - - if ( ! l_srv ) // Service not found - l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_SERVICE_NOT_FOUND; - - if ( l_err.code ){ - dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); - if (l_srv && l_srv->callback_response_error) - l_srv->callback_response_error(l_srv,0,NULL,&l_err,sizeof (l_err) ); - break; - } - - dap_ledger_t * l_ledger =l_net->pub.ledger; - dap_chain_datum_tx_t * l_tx = NULL; - dap_chain_tx_out_cond_t * l_tx_out_cond = NULL; - if (l_srv->pricelist ){ // Is present pricelist, not free service - - if ( !l_ledger ){ // No ledger - log_it( L_WARNING, "No Ledger"); - l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_NETWORK_NO_LEDGER ; - dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); - if (l_srv->callback_response_error) - l_srv->callback_response_error(l_srv,0,NULL,&l_err,sizeof (l_err) ); - break; - } - - l_tx = dap_chain_ledger_tx_find_by_hash( l_ledger,& l_request->hdr.tx_cond ); - if ( ! l_tx ){ // No tx cond transaction - log_it( L_WARNING, "No tx cond transaction"); - /// TODO Add tx cond treshold and ability to provide service before the transaction comes from CDB - /// - l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_NOT_FOUND ; - dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); - if (l_srv->callback_response_error) - l_srv->callback_response_error(l_srv,0,NULL,&l_err,sizeof (l_err) ); - break; - } - int l_tx_out_cond_size =0; - l_tx_out_cond = (dap_chain_tx_out_cond_t *) - dap_chain_datum_tx_item_get(l_tx, NULL, TX_ITEM_TYPE_OUT_COND, &l_tx_out_cond_size ); - - if ( ! l_tx_out_cond ) { // No conditioned output - log_it( L_WARNING, "No conditioned output"); - - l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_NO_COND_OUT ; - dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); - if (l_srv->callback_response_error) - l_srv->callback_response_error(l_srv,0,NULL,&l_err,sizeof (l_err) ); - break; - } - - // Check cond output if it equesl or not to request - if ( l_tx_out_cond->subtype.srv_pay.srv_uid.uint64 != l_request->hdr.srv_uid.uint64 ){ - log_it( L_WARNING, "Wrong service uid in request, tx expect to close its output with 0x%016lX", - l_tx_out_cond->subtype.srv_pay.srv_uid ); - l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_WRONG_SRV_UID ; - dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); - if (l_srv->callback_response_error) - l_srv->callback_response_error(l_srv,0,NULL,&l_err,sizeof (l_err) ); - break; - } - } - dap_chain_net_srv_usage_t * l_usage = dap_chain_net_srv_usage_add( l_srv_session, - l_net,l_srv ); - if ( !l_usage ){ // Usage can't add - log_it( L_WARNING, "Usage can't add"); - l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_USAGE_CANT_ADD; - dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); - if (l_srv->callback_response_error) - l_srv->callback_response_error(l_srv,0,NULL,&l_err,sizeof (l_err) ); - break; - } - - l_err.usage_id = l_usage->id; - - // Create one client - l_usage->client = DAP_NEW_Z( dap_chain_net_srv_client_t); - l_usage->client->stream_worker = a_ch->stream_worker; - l_usage->client->ch = a_ch; - l_usage->client->session_id = a_ch->stream->session->id; - l_usage->client->ts_created = time(NULL); - l_usage->tx_cond = l_tx; - memcpy(&l_usage->tx_cond_hash, &l_request->hdr.tx_cond,sizeof (l_usage->tx_cond_hash)); - l_usage->ts_created = time(NULL); - - dap_chain_net_srv_price_t * l_price = NULL; - dap_chain_datum_tx_receipt_t * l_receipt = NULL; - const char * l_ticker = NULL; - if (l_srv->pricelist ){ - l_ticker = dap_chain_ledger_tx_get_token_ticker_by_hash(l_ledger, &l_request->hdr.tx_cond ); - dap_stpcpy(l_usage->token_ticker, l_ticker); - - dap_chain_net_srv_price_t *l_price_tmp; - DL_FOREACH(l_srv->pricelist, l_price_tmp) { - if (l_price_tmp->net->pub.id.uint64 == l_request->hdr.net_id.uint64 - && dap_strcmp(l_price_tmp->token, l_ticker) == 0 - && l_price_tmp->units_uid.enm == l_tx_out_cond->subtype.srv_pay.unit.enm - )//&& (l_price_tmp->value_datoshi/l_price_tmp->units) < l_tx_out_cond->subtype.srv_pay.header.unit_price_max_datoshi) - { - l_price = l_price_tmp; - break; - } - } - if ( !l_price ) { - log_it( L_WARNING, "Request can't be processed because no acceptable price in pricelist for token %s in network %s", - l_ticker, l_net->pub.name ); - dap_chain_net_srv_usage_delete(l_srv_session, l_usage); - l_err.code =DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_NOT_ACCEPT_TOKEN; - dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); - if (l_srv->callback_response_error) - l_srv->callback_response_error(l_srv,l_usage->id,l_usage->client,&l_err,sizeof (l_err) ); - break; - } - } - int ret; - if ( (ret= l_srv->callback_requested(l_srv,l_usage->id, l_usage->client, l_request, l_ch_pkt->hdr.size ) )!= 0 ){ - log_it( L_WARNING, "Request canceled by service callback, return code %d", ret); - dap_chain_net_srv_usage_delete(l_srv_session, l_usage); - l_err.code = (uint32_t) ret ; - dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); - if (l_srv->callback_response_error) - l_srv->callback_response_error(l_srv,l_usage->id, NULL,&l_err,sizeof (l_err) ); - break; - } - - if ( l_srv->pricelist ){ - if ( l_price ){ - l_usage->price = l_price; - // TODO extend callback to pass ext and ext size from service callbacks - l_receipt = dap_chain_net_srv_issue_receipt( l_usage->service, l_usage, l_usage->price,NULL,0 ); - dap_stream_ch_pkt_write_unsafe( a_ch , DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_SIGN_REQUEST , - l_receipt, l_receipt->size); - - }else{ - l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_PRICE_NOT_FOUND ; - dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); - if (l_srv->callback_response_error) - l_srv->callback_response_error( l_srv, l_usage->id, NULL, &l_err, sizeof( l_err ) ); - } - // If we a here we passed all the checks, wow, now if we're not for free we request the signature. - } else{ - log_it( L_INFO, "Service provide for free"); - l_usage->is_free = true; - size_t l_success_size = sizeof (dap_stream_ch_chain_net_srv_pkt_success_hdr_t ); - dap_stream_ch_chain_net_srv_pkt_success_t *l_success = DAP_NEW_Z_SIZE(dap_stream_ch_chain_net_srv_pkt_success_t, - l_success_size); - l_success->hdr.usage_id = l_usage->id; - l_success->hdr.net_id.uint64 = l_usage->net->pub.id.uint64; - l_success->hdr.srv_uid.uint64 = l_usage->service->uid.uint64; - - if (dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_SUCCESS, - l_success, l_success_size)) { - dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); - } - - if ( l_usage->service->callback_receipt_first_success ) - l_usage->service->callback_receipt_first_success ( l_usage->service, l_usage->id, l_usage->client, NULL, 0 ); - DAP_DELETE(l_success); - - } - // l_receipt used in l_usage->receipt - //if(l_receipt) - // DAP_DELETE(l_receipt); + l_grace->request = DAP_NEW_Z_SIZE(dap_stream_ch_chain_net_srv_pkt_request_t, l_ch_pkt->hdr.size); + memcpy(l_grace->request, l_ch_pkt->data, l_ch_pkt->hdr.size); + l_grace->request_size = l_ch_pkt->hdr.size; + l_grace->ch = a_ch; + l_grace->stream_worker = a_ch->stream_worker; + s_grace_period_control(l_grace); } break; + // only for client case DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_SIGN_REQUEST:{ log_it( L_NOTICE, "Requested smth to sign"); @@ -401,58 +451,61 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch , void* a_arg) l_receipt->receipt_info.units, l_receipt->receipt_info.value_datoshi, l_receipt->exts_n_signs, l_receipt->exts_size); - - //l_srv_session->usages ///l_usage->service->uid.uint64; //dap_chain_net_srv_usage_t * l_usage = dap_chain_net_srv_usage_find( l_srv_session, l_pkt->hdr.usage_id ); dap_chain_net_srv_t * l_srv = dap_chain_net_srv_get(l_ch_chain_net_srv->srv_uid); - if(l_srv && l_srv->callback_client_sign_request) { - // Sign receipt - l_srv->callback_client_sign_request(l_srv, 0, NULL, &l_receipt_new, l_receipt_size); - if(dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_SIGN_RESPONSE, - l_receipt_new, l_receipt_new->size)) { - dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); + if(l_srv && l_srv->callback_client_sign_request) { + // Sign receipt + l_srv->callback_client_sign_request(l_srv, 0, NULL, &l_receipt_new, l_receipt_size); + dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_SIGN_RESPONSE, + l_receipt_new, l_receipt_new->size); } - } DAP_DELETE(l_receipt_new); // TODO sign smth } break; + // only for server case DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_SIGN_RESPONSE:{ - if ( l_ch_pkt->hdr.size > sizeof(dap_chain_receipt_info_t)+1 ){ - dap_chain_datum_tx_receipt_t * l_receipt = (dap_chain_datum_tx_receipt_t *) l_ch_pkt->data; - size_t l_receipt_size = l_ch_pkt->hdr.size; - dap_chain_net_srv_usage_t * l_usage= NULL, *l_tmp= NULL; - bool l_is_found = false; - pthread_mutex_lock(& l_srv_session->parent->mutex ); - HASH_ITER(hh, l_srv_session->usages, l_usage, l_tmp){ - if ( l_usage->receipt_next ){ // If we have receipt next - if ( memcmp(&l_usage->receipt_next->receipt_info, &l_receipt->receipt_info,sizeof (l_receipt->receipt_info) )==0 ){ - l_is_found = true; - break; - } - }else if (l_usage->receipt ){ // If we sign first receipt - if ( memcmp(&l_usage->receipt->receipt_info, &l_receipt->receipt_info,sizeof (l_receipt->receipt_info) )==0 ){ - l_is_found = true; - break; - } + if (l_ch_pkt->hdr.size <= sizeof(dap_chain_receipt_info_t) + 1) { + log_it(L_ERROR, "Wrong sign response size, %zd when expected at least %zd with smth", l_ch_pkt->hdr.size, + sizeof(dap_chain_receipt_info_t)+1 ); + break; + } + dap_chain_datum_tx_receipt_t * l_receipt = (dap_chain_datum_tx_receipt_t *) l_ch_pkt->data; + size_t l_receipt_size = l_ch_pkt->hdr.size; + dap_chain_net_srv_usage_t * l_usage= NULL, *l_tmp= NULL; + bool l_is_found = false; + pthread_mutex_lock(& l_srv_session->parent->mutex ); + HASH_ITER(hh, l_srv_session->usages, l_usage, l_tmp){ + if ( l_usage->receipt_next ){ // If we have receipt next + if ( memcmp(&l_usage->receipt_next->receipt_info, &l_receipt->receipt_info,sizeof (l_receipt->receipt_info) )==0 ){ + l_is_found = true; + break; + } + }else if (l_usage->receipt ){ // If we sign first receipt + if ( memcmp(&l_usage->receipt->receipt_info, &l_receipt->receipt_info,sizeof (l_receipt->receipt_info) )==0 ){ + l_is_found = true; + break; } } - pthread_mutex_unlock(& l_srv_session->parent->mutex ); + } + pthread_mutex_unlock(& l_srv_session->parent->mutex ); - if ( !l_is_found || ! l_usage ){ - log_it(L_WARNING, "Can't find receipt in usages thats equal to response receipt"); - l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_RECEIPT_CANT_FIND ; - dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); - if (l_usage && l_usage->service && l_usage->service->callback_response_error) - l_usage->service->callback_response_error(l_usage->service,l_usage->id, l_usage->client,&l_err,sizeof (l_err) ); - break; - } - l_err.usage_id = l_usage->id; - l_err.net_id.uint64 = l_usage->net->pub.id.uint64; - l_err.srv_uid.uint64 = l_usage->service->uid.uint64; + if ( !l_is_found || ! l_usage ){ + log_it(L_WARNING, "Can't find receipt in usages thats equal to response receipt"); + l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_RECEIPT_CANT_FIND ; + dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); + if (l_usage && l_usage->service && l_usage->service->callback_response_error) + l_usage->service->callback_response_error(l_usage->service,l_usage->id, l_usage->client,&l_err,sizeof (l_err) ); + break; + } + l_err.usage_id = l_usage->id; + l_err.net_id.uint64 = l_usage->net->pub.id.uint64; + l_err.srv_uid.uint64 = l_usage->service->uid.uint64; + dap_chain_tx_out_cond_t *l_tx_out_cond; + if (!l_usage->is_grace) { if (! l_usage->tx_cond ){ log_it(L_WARNING, "No tx out in usage"); l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_NOT_FOUND ; @@ -463,9 +516,8 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch , void* a_arg) break; } int l_tx_out_cond_size =0; - dap_chain_tx_out_cond_t *l_tx_out_cond = (dap_chain_tx_out_cond_t *) - dap_chain_datum_tx_item_get(l_usage->tx_cond, NULL, TX_ITEM_TYPE_OUT_COND, &l_tx_out_cond_size ); - + l_tx_out_cond = (dap_chain_tx_out_cond_t *)dap_chain_datum_tx_item_get(l_usage->tx_cond, NULL, + TX_ITEM_TYPE_OUT_COND, &l_tx_out_cond_size ); if ( ! l_tx_out_cond ){ // No conditioned output l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_NO_COND_OUT ; dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); @@ -473,118 +525,125 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch , void* a_arg) l_usage->service->callback_response_error( l_usage->service, l_usage->id, l_usage->client,&l_err,sizeof (l_err) ); break; } - // get a second signature - from the client (first sign in server, second sign in client) - dap_sign_t * l_receipt_sign = dap_chain_datum_tx_receipt_sign_get( l_receipt, l_receipt_size, 1); - if ( ! l_receipt_sign ){ - l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_RECEIPT_CANT_FIND ; - dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); + } + // get a second signature - from the client (first sign in server, second sign in client) + dap_sign_t * l_receipt_sign = dap_chain_datum_tx_receipt_sign_get( l_receipt, l_receipt_size, 1); + if ( ! l_receipt_sign ){ + l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_RECEIPT_CANT_FIND ; + dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); + if (l_usage->service->callback_response_error) + l_usage->service->callback_response_error( l_usage->service, l_usage->id, l_usage->client, + &l_err, sizeof (l_err) ); + break; + } + // Check receipt signature pkey hash + dap_sign_get_pkey_hash(l_receipt_sign, &l_usage->client_pkey_hash); + dap_chain_net_srv_banlist_item_t *l_item = NULL; + dap_chain_net_srv_t * l_srv = dap_chain_net_srv_get(l_receipt->receipt_info.srv_uid); + if (l_usage->is_grace) { + pthread_mutex_lock(&l_srv->banlist_mutex); + HASH_FIND(hh, l_srv->ban_list, &l_usage->client_pkey_hash, sizeof(dap_chain_hash_fast_t), l_item); + pthread_mutex_unlock(&l_srv->banlist_mutex); + if (l_item) { // client banned + // Update actual receipt + l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_RECEIPT_BANNED_PKEY_HASH ; + dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof(l_err)); if (l_usage->service->callback_response_error) - l_usage->service->callback_response_error( l_usage->service, l_usage->id, l_usage->client, - &l_err, sizeof (l_err) ); + l_usage->service->callback_response_error(l_usage->service,l_usage->id, l_usage->client, &l_err, sizeof(l_err)); break; } - - // Check receipt signature pkey hash - dap_chain_hash_fast_t l_pkey_hash={0}; - dap_sign_get_pkey_hash( l_receipt_sign, &l_pkey_hash); - - - if( memcmp ( l_pkey_hash.raw, l_tx_out_cond->subtype.srv_pay.pkey_hash.raw , sizeof(l_pkey_hash) ) != 0 ){ + } else { + if (memcmp(l_usage->client_pkey_hash.raw, l_tx_out_cond->subtype.srv_pay.pkey_hash.raw, sizeof(l_usage->client_pkey_hash)) != 0) { l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_RECEIPT_WRONG_PKEY_HASH ; dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); if (l_usage->service->callback_response_error) l_usage->service->callback_response_error(l_usage->service,l_usage->id, l_usage->client,&l_err,sizeof (l_err) ); break; } + } - bool l_is_first_sign = false; - if (! l_usage->receipt_next && l_usage->receipt){ - DAP_DELETE(l_usage->receipt); - l_usage->receipt = DAP_NEW_SIZE(dap_chain_datum_tx_receipt_t,l_receipt_size); - l_usage->receipt_size = l_receipt_size; - l_is_first_sign = true; - l_usage->is_active = true; - memcpy( l_usage->receipt, l_receipt, l_receipt_size); - } else if (l_usage->receipt_next ){ - DAP_DELETE(l_usage->receipt_next); - l_usage->receipt_next = DAP_NEW_SIZE(dap_chain_datum_tx_receipt_t,l_receipt_size); - l_usage->receipt_next_size = l_receipt_size; - l_usage->is_active = true; - memcpy( l_usage->receipt_next, l_receipt, l_receipt_size); - } - - // Update actual receipt - log_it(L_NOTICE, "Receipt with remote client sign is acceptible for. Now start the service's usage"); - + // Update actual receipt + bool l_is_first_sign = false; + if (! l_usage->receipt_next && l_usage->receipt){ + DAP_DELETE(l_usage->receipt); + l_usage->receipt = DAP_NEW_SIZE(dap_chain_datum_tx_receipt_t,l_receipt_size); + l_usage->receipt_size = l_receipt_size; + l_is_first_sign = true; + l_usage->is_active = true; + memcpy( l_usage->receipt, l_receipt, l_receipt_size); + } else if (l_usage->receipt_next ){ + DAP_DELETE(l_usage->receipt_next); + l_usage->receipt_next = DAP_NEW_SIZE(dap_chain_datum_tx_receipt_t,l_receipt_size); + l_usage->receipt_next_size = l_receipt_size; + l_usage->is_active = true; + memcpy( l_usage->receipt_next, l_receipt, l_receipt_size); + } - // Store receipt if any problems with transactions - dap_chain_hash_fast_t l_receipt_hash={0}; - dap_hash_fast(l_receipt,l_receipt_size,&l_receipt_hash); - char * l_receipt_hash_str = dap_chain_hash_fast_to_str_new(&l_receipt_hash); - dap_chain_global_db_gr_set( l_receipt_hash_str,l_receipt,l_receipt_size,"local.receipts"); - l_receipt_hash_str = NULL; // To prevent usage of this pointer when it will be free by GDB processor + // Store receipt if any problems with transactions + dap_chain_hash_fast_t l_receipt_hash={0}; + dap_hash_fast(l_receipt,l_receipt_size,&l_receipt_hash); + char * l_receipt_hash_str = dap_chain_hash_fast_to_str_new(&l_receipt_hash); + dap_chain_global_db_gr_set( l_receipt_hash_str,l_receipt,l_receipt_size,"local.receipts"); + l_receipt_hash_str = NULL; // To prevent usage of this pointer when it will be free by GDB processor + size_t l_success_size; + dap_chain_hash_fast_t *l_tx_in_hash = NULL; + if (!l_usage->is_grace) { // Form input transaction dap_chain_addr_t *l_wallet_addr = dap_chain_wallet_get_addr(l_usage->wallet, l_usage->net->pub.id); - - - dap_chain_hash_fast_t * l_tx_in_hash = dap_chain_mempool_tx_create_cond_input( - l_usage->net,&l_usage->tx_cond_hash, - l_wallet_addr,dap_chain_wallet_get_key( l_usage->wallet,0), l_receipt, l_receipt_size); - + l_tx_in_hash = dap_chain_mempool_tx_create_cond_input(l_usage->net, &l_usage->tx_cond_hash, l_wallet_addr, + dap_chain_wallet_get_key(l_usage->wallet, 0), + l_receipt, l_receipt_size); if ( l_tx_in_hash){ char * l_tx_in_hash_str = dap_chain_hash_fast_to_str_new(l_tx_in_hash); log_it(L_NOTICE, "Formed tx %s for input with active receipt", l_tx_in_hash_str); - - /* We could put transaction directly to chains - if ( dap_chain_net_get_role( l_usage->net ).enums == NODE_ROLE_MASTER || - dap_chain_net_get_role( l_usage->net ).enums == NODE_ROLE_CELL_MASTER || - dap_chain_net_get_role( l_usage->net ).enums == NODE_ROLE_ROOT || - dap_chain_net_get_role( l_usage->net ).enums == NODE_ROLE_ROOT_MASTER ){ - dap_chain_net_proc_mempool( l_usage->net); - }*/ DAP_DELETE(l_tx_in_hash_str); }else log_it(L_ERROR, "Can't create input tx cond transaction!"); + l_success_size = sizeof(dap_stream_ch_chain_net_srv_pkt_success_hdr_t) + sizeof(dap_chain_hash_fast_t); + } else { + l_success_size = sizeof(dap_stream_ch_chain_net_srv_pkt_success_hdr_t); + } + dap_stream_ch_chain_net_srv_pkt_success_t *l_success = DAP_NEW_Z_SIZE(dap_stream_ch_chain_net_srv_pkt_success_t, + l_success_size); + l_success->hdr.usage_id = l_usage->id; + l_success->hdr.net_id.uint64 = l_usage->net->pub.id.uint64; + l_success->hdr.srv_uid.uint64 = l_usage->service->uid.uint64; + if (l_tx_in_hash) { + memcpy(l_success->custom_data, l_tx_in_hash, sizeof(dap_chain_hash_fast_t)); + DAP_DELETE(l_tx_in_hash); + } + + if (l_usage->is_grace) + log_it(L_NOTICE, "Receipt is OK, but transaction can't be found. Start the grace period for %d seconds", + l_srv->grace_period); + else + log_it(L_NOTICE, "Receipt with remote client sign is acceptible for. Now start the service's usage"); - size_t l_success_size = sizeof(dap_stream_ch_chain_net_srv_pkt_success_hdr_t) + sizeof(dap_chain_hash_fast_t); - dap_stream_ch_chain_net_srv_pkt_success_t *l_success = DAP_NEW_Z_SIZE(dap_stream_ch_chain_net_srv_pkt_success_t, - l_success_size); - l_success->hdr.usage_id = l_usage->id; - l_success->hdr.net_id.uint64 = l_usage->net->pub.id.uint64; - l_success->hdr.srv_uid.uint64 = l_usage->service->uid.uint64; - if (l_tx_in_hash) { - memcpy(l_success->custom_data, l_tx_in_hash, sizeof(dap_chain_hash_fast_t)); - DAP_DELETE(l_tx_in_hash); + dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_SUCCESS , + l_success, l_success_size); + DAP_DELETE(l_success); + + if ( l_is_first_sign && l_usage->service->callback_response_success){ + if( l_usage->service->callback_response_success(l_usage->service,l_usage->id, l_usage->client, + l_receipt, l_receipt_size ) !=0 ){ + log_it(L_NOTICE, "No success by service callback, inactivating service usage"); + l_usage->is_active = false; } - dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_SUCCESS , - l_success, l_success_size); - DAP_DELETE(l_success); - - if ( l_is_first_sign && l_usage->service->callback_receipt_first_success){ - if( l_usage->service->callback_receipt_first_success(l_usage->service,l_usage->id, l_usage->client, - l_receipt, l_receipt_size ) !=0 ){ - log_it(L_NOTICE, "No success by service callback, inactivating service usage"); - l_usage->is_active = false; - } - // issue receipt next - l_usage->receipt_next = dap_chain_net_srv_issue_receipt( l_usage->service, l_usage, l_usage->price ,NULL,0); - l_usage->receipt_next_size = l_usage->receipt_next->size; - dap_stream_ch_pkt_write_unsafe( a_ch , DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_SIGN_REQUEST , - l_usage->receipt_next, l_usage->receipt_next->size); - - }else if ( l_usage->service->callback_receipt_next_success){ - if (l_usage->service->callback_receipt_next_success(l_usage->service,l_usage->id, l_usage->client, - l_receipt, l_receipt_size ) != 0 ){ - log_it(L_NOTICE, "No success by service callback, inactivating service usage"); - l_usage->is_active = false; - } + // issue receipt next + l_usage->receipt_next = dap_chain_net_srv_issue_receipt( l_usage->service, l_usage, l_usage->price ,NULL,0); + l_usage->receipt_next_size = l_usage->receipt_next->size; + dap_stream_ch_pkt_write_unsafe( a_ch , DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_SIGN_REQUEST , + l_usage->receipt_next, l_usage->receipt_next->size); + + }else if ( l_usage->service->callback_receipt_next_success){ + if (l_usage->service->callback_receipt_next_success(l_usage->service,l_usage->id, l_usage->client, + l_receipt, l_receipt_size ) != 0 ){ + log_it(L_NOTICE, "No success by service callback, inactivating service usage"); + l_usage->is_active = false; } - - }else{ - log_it(L_ERROR, "Wrong sign response size, %zd when expected at least %zd with smth", l_ch_pkt->hdr.size, - sizeof(dap_chain_receipt_info_t)+1 ); } } break; + case DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_SUCCESS:{ log_it( L_NOTICE, "Responsed with success"); // TODO code for service client mode @@ -602,6 +661,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch , void* a_arg) //l_success->hdr.net_id, l_success->hdr.srv_uid, l_success->hdr.usage_id } } break; + case DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_DATA:{ if (l_ch_pkt->hdr.size < sizeof(dap_stream_ch_chain_net_srv_pkt_data_hdr_t) ){ log_it( L_WARNING, "Wrong request size, less than minimum"); @@ -633,6 +693,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch , void* a_arg) } break; + case DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR:{ if ( l_ch_pkt->hdr.size == sizeof (dap_stream_ch_chain_net_srv_pkt_error_t) ){ dap_stream_ch_chain_net_srv_pkt_error_t * l_err = (dap_stream_ch_chain_net_srv_pkt_error_t *) l_ch_pkt->data; @@ -643,6 +704,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch , void* a_arg) sizeof ( dap_stream_ch_chain_net_srv_pkt_error_t) ); } } break; + default: log_it( L_WARNING, "Unknown packet type 0x%02X", l_ch_pkt->hdr.type); } if(l_ch_chain_net_srv->notify_callback) diff --git a/modules/net/srv/dap_chain_net_srv.c b/modules/net/srv/dap_chain_net_srv.c index b761a70a9ccb217bdfadb199168dbb69ce01c6da..3291d75684d76f28804feb03ee2e3415365f78bd 100644 --- a/modules/net/srv/dap_chain_net_srv.c +++ b/modules/net/srv/dap_chain_net_srv.c @@ -635,13 +635,13 @@ dap_chain_net_srv_t* dap_chain_net_srv_add(dap_chain_net_srv_uid_t a_uid,dap_cha l_srv = DAP_NEW_Z(dap_chain_net_srv_t); l_srv->uid.uint64 = a_uid.uint64; l_srv->callback_requested = a_callback_request; - l_srv->callback_receipt_first_success = a_callback_response_success; + l_srv->callback_response_success = a_callback_response_success; l_srv->callback_response_error = a_callback_response_error; l_srv->callback_receipt_next_success = a_callback_receipt_next_success; + pthread_mutex_init(&l_srv->banlist_mutex, NULL); l_sdata = DAP_NEW_Z(service_list_t); memcpy(&l_sdata->uid, &l_uid, sizeof(l_uid)); - l_sdata->srv = l_srv;//DAP_NEW(dap_chain_net_srv_t); - //memcpy(l_sdata->srv, l_srv, sizeof(dap_chain_net_srv_t)); + l_sdata->srv = l_srv; HASH_ADD(hh, s_srv_list, uid, sizeof(l_srv->uid), l_sdata); }else{ log_it(L_ERROR, "Already present service with 0x%016llX ", a_uid.uint64); @@ -701,8 +701,10 @@ void dap_chain_net_srv_del(dap_chain_net_srv_t * a_srv) pthread_mutex_lock(&s_srv_list_mutex); HASH_FIND(hh, s_srv_list, a_srv, sizeof(dap_chain_net_srv_uid_t), l_sdata); if(l_sdata) { - DAP_DELETE(l_sdata); HASH_DEL(s_srv_list, l_sdata); + pthread_mutex_destroy(&a_srv->banlist_mutex); + DAP_DELETE(a_srv); + DAP_DELETE(l_sdata); } pthread_mutex_unlock(&s_srv_list_mutex); } @@ -763,8 +765,10 @@ void dap_chain_net_srv_del_all(void) pthread_mutex_lock(&s_srv_list_mutex); HASH_ITER(hh, s_srv_list , l_sdata, l_sdata_tmp) { - DAP_DELETE(l_sdata); HASH_DEL(s_srv_list, l_sdata); + pthread_mutex_destroy(&l_sdata->srv->banlist_mutex); + DAP_DELETE(l_sdata->srv); + DAP_DELETE(l_sdata); } pthread_mutex_unlock(&s_srv_list_mutex); } diff --git a/modules/net/srv/include/dap_chain_net_srv.h b/modules/net/srv/include/dap_chain_net_srv.h index c7de7e8f0c988d173597e837c08dffc02c2ad979..cb018bdcfb09528a8347529df2cbb9781bf35ead 100755 --- a/modules/net/srv/include/dap_chain_net_srv.h +++ b/modules/net/srv/include/dap_chain_net_srv.h @@ -36,18 +36,30 @@ typedef int (*dap_chain_net_srv_callback_data_t)(dap_chain_net_srv_t *, uint32_t typedef int (*dap_chain_net_srv_callback_sign_request_t)(dap_chain_net_srv_t *, uint32_t, dap_chain_net_srv_client_t *, dap_chain_datum_tx_receipt_t **, size_t ); typedef void (*dap_chain_net_srv_callback_ch_t)(dap_chain_net_srv_t *, dap_stream_ch_t *); +typedef struct dap_chain_net_srv_banlist_item { + dap_chain_hash_fast_t client_pkey_hash; + pthread_mutex_t *ht_mutex; + struct dap_chain_net_srv_banlist_item **ht_head; + UT_hash_handle hh; +} dap_chain_net_srv_banlist_item_t; + typedef struct dap_chain_net_srv { dap_chain_net_srv_uid_t uid; // Unique ID for service. dap_chain_net_srv_abstract_t srv_common; dap_chain_net_srv_price_t *pricelist; + + uint32_t grace_period; + pthread_mutex_t banlist_mutex; + dap_chain_net_srv_banlist_item_t *ban_list; + dap_chain_callback_trafic_t callback_trafic; // Request for usage dap_chain_net_srv_callback_data_t callback_requested; // Receipt first sign successfull - dap_chain_net_srv_callback_data_t callback_receipt_first_success; + dap_chain_net_srv_callback_data_t callback_response_success; // Response error dap_chain_net_srv_callback_data_t callback_response_error; diff --git a/modules/net/srv/include/dap_chain_net_srv_common.h b/modules/net/srv/include/dap_chain_net_srv_common.h index d4141c574611305fc4eab2795fd847ef890c03f1..d9bceb4ca095799827328d0f4df35c629583f045 100755 --- a/modules/net/srv/include/dap_chain_net_srv_common.h +++ b/modules/net/srv/include/dap_chain_net_srv_common.h @@ -33,11 +33,7 @@ #include "dap_chain_ledger.h" #include "dap_chain_net.h" #include "dap_chain_wallet.h" - - - -//Units of service - +//#include "dap_chain_net_srv_stream_session.h" //Service direction @@ -48,8 +44,6 @@ typedef enum dap_chain_net_srv_order_direction{ } dap_chain_net_srv_order_direction_t; - - typedef struct dap_chain_net_srv_abstract { uint8_t class; //Class of service (once or permanent) @@ -89,7 +83,6 @@ typedef struct dap_chain_net_srv_price struct dap_chain_net_srv_price * prev; } dap_chain_net_srv_price_t; - // Ch pkt types #define DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_REQUEST 0x01 #define DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_SIGN_REQUEST 0x10 @@ -117,6 +110,7 @@ typedef struct dap_chain_net_srv_price #define DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_RECEIPT_CANT_FIND 0x00000500 #define DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_RECEIPT_NO_SIGN 0x00000501 #define DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_RECEIPT_WRONG_PKEY_HASH 0x00000502 +#define DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_RECEIPT_BANNED_PKEY_HASH 0x00000503 #define DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_PRICE_NOT_FOUND 0x00000600 #define DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_UNKNOWN 0xffffffff @@ -186,6 +180,15 @@ typedef struct dap_stream_ch_chain_net_srv_pkt_test{ uint8_t data[]; } DAP_ALIGN_PACKED dap_stream_ch_chain_net_srv_pkt_test_t; +typedef struct dap_chain_net_srv_usage dap_chain_net_srv_usage_t; + +typedef struct dap_chain_net_srv_grace { + dap_stream_worker_t *stream_worker; + dap_stream_ch_t *ch; + dap_chain_net_srv_usage_t *usage; + dap_stream_ch_chain_net_srv_pkt_request_t *request; + size_t request_size; +} dap_chain_net_srv_grace_t; DAP_STATIC_INLINE const char * dap_chain_net_srv_price_unit_uid_to_str( dap_chain_net_srv_price_unit_uid_t a_uid ) { diff --git a/modules/net/srv/include/dap_chain_net_srv_stream_session.h b/modules/net/srv/include/dap_chain_net_srv_stream_session.h index a47a16c9bc76a751b3c50319430cc69926b5dded..3a7377526f7f737190fc6617f8533975301a0d40 100644 --- a/modules/net/srv/include/dap_chain_net_srv_stream_session.h +++ b/modules/net/srv/include/dap_chain_net_srv_stream_session.h @@ -37,6 +37,7 @@ along with any CellFrame SDK based project. If not, see <http://www.gnu.org/lic #include "dap_chain_wallet.h" typedef struct dap_chain_net_srv dap_chain_net_srv_t; + typedef struct dap_chain_net_srv_usage{ uint32_t id; // Usage id pthread_rwlock_t rwlock; @@ -53,14 +54,14 @@ typedef struct dap_chain_net_srv_usage{ dap_chain_net_srv_client_t * client; dap_chain_datum_tx_t * tx_cond; dap_chain_hash_fast_t tx_cond_hash; + dap_chain_hash_fast_t client_pkey_hash; char token_ticker[DAP_CHAIN_TICKER_SIZE_MAX]; bool is_active; bool is_free; + bool is_grace; UT_hash_handle hh; // } dap_chain_net_srv_usage_t; -typedef void (*dap_response_success_callback_t) (dap_stream_ch_chain_net_srv_pkt_success_t*, void*); - typedef struct dap_net_stats{ uintmax_t bytes_sent; uintmax_t bytes_recv; @@ -74,10 +75,10 @@ typedef struct dap_net_stats{ } dap_net_stats_t; typedef struct dap_chain_net_srv_stream_session { + time_t ts_activated; dap_stream_session_t * parent; dap_chain_net_srv_usage_t * usages; dap_chain_net_srv_usage_t * usage_active; - uintmax_t limits_bytes; // Bytes left time_t limits_ts; // Timestamp until its activte dap_chain_net_srv_price_unit_uid_t limits_units_type; @@ -85,12 +86,8 @@ typedef struct dap_chain_net_srv_stream_session { // Some common stats volatile dap_net_stats_t stats; - time_t ts_activated; dap_sign_t* user_sign; // User's signature for auth if reconnect - dap_response_success_callback_t response_success_callback; - void *response_success_callback_data; - } dap_chain_net_srv_stream_session_t; #define DAP_CHAIN_NET_SRV_STREAM_SESSION(a) ((dap_chain_net_srv_stream_session_t *) (a)->_inheritor ) diff --git a/modules/service/vpn/dap_chain_net_srv_vpn.c b/modules/service/vpn/dap_chain_net_srv_vpn.c index d27b45c92169b0dea28c34687d30e90fbec4d8f0..93c8f92d3010ffb53b11f1d7d82bec02df254b37 100644 --- a/modules/service/vpn/dap_chain_net_srv_vpn.c +++ b/modules/service/vpn/dap_chain_net_srv_vpn.c @@ -670,14 +670,13 @@ int s_vpn_service_create(dap_config_t * g_config){ l_srv->_inhertor = l_srv_vpn; l_srv_vpn->parent = l_srv; - uint16_t l_pricelist_count = 0; - // Read if we need to dump all pkt operations s_debug_more= dap_config_get_item_bool_default(g_config,"srv_vpn", "debug_more",false); - + l_srv->grace_period = dap_config_get_item_uint32_default(g_config, "srv_vpn", "grace_period", 60); //! IMPORTANT ! This fetch is single-action and cannot be further reused, since it modifies the stored config data //! it also must NOT be freed within this module ! + uint16_t l_pricelist_count = 0; char **l_pricelist = dap_config_get_array_str(g_config, "srv_vpn", "pricelist", &l_pricelist_count); // must not be freed! for (uint16_t i = 0; i < l_pricelist_count; i++) { dap_chain_net_srv_price_t *l_price = DAP_NEW_Z(dap_chain_net_srv_price_t); @@ -1269,14 +1268,16 @@ void s_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) dap_chain_net_srv_usage_t * l_usage = dap_chain_net_srv_usage_find_unsafe(l_srv_session, l_ch_vpn->usage_id); if ( ! l_usage){ - log_it(L_NOTICE, "No active usage in list, possible disconnected. Send nothin on this channel"); + log_it(L_NOTICE, "No active usage in list, possible disconnected. Send nothing on this channel"); dap_stream_ch_set_ready_to_write_unsafe(a_ch,false); dap_stream_ch_set_ready_to_read_unsafe(a_ch,false); return; } if ( ! l_usage->is_active ){ - log_it(L_INFO, "Usage inactivation: switch off packet input channel"); + log_it(L_INFO, "Usage inactivation: switch off packet input & output channels"); + if (l_usage->client) + dap_stream_ch_pkt_write_unsafe( l_usage->client->ch , DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_NOTIFY_STOPPED , NULL, 0 ); dap_stream_ch_set_ready_to_write_unsafe(a_ch,false); dap_stream_ch_set_ready_to_read_unsafe(a_ch,false); return; @@ -1413,26 +1414,26 @@ static void s_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) dap_chain_net_srv_usage_t * l_usage = dap_chain_net_srv_usage_find_unsafe(l_srv_session, l_ch_vpn->usage_id); if ( ! l_usage){ - log_it(L_NOTICE, "No active usage in list, possible disconnected. Send nothin on this channel"); + log_it(L_NOTICE, "No active usage in list, possible disconnected. Send nothing on this channel"); dap_stream_ch_set_ready_to_write_unsafe(a_ch,false); dap_stream_ch_set_ready_to_read_unsafe(a_ch,false); return; } if ( ! l_usage->is_active ){ - log_it(L_INFO, "Usage inactivation: switch off packet output channel"); - dap_stream_ch_set_ready_to_write_unsafe(a_ch,false); - dap_stream_ch_set_ready_to_read_unsafe(a_ch,false); + log_it(L_INFO, "Usage inactivation: switch off packet input & output channels"); if (l_usage->client) dap_stream_ch_pkt_write_unsafe( l_usage->client->ch , DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_NOTIFY_STOPPED , NULL, 0 ); + dap_stream_ch_set_ready_to_write_unsafe(a_ch,false); + dap_stream_ch_set_ready_to_read_unsafe(a_ch,false); return; } if ( (! l_usage->is_free) && (! l_usage->receipt) ){ log_it(L_WARNING, "No active receipt, switching off"); - dap_stream_ch_set_ready_to_write_unsafe(a_ch,false); - dap_stream_ch_set_ready_to_read_unsafe(a_ch,false); if (l_usage->client) dap_stream_ch_pkt_write_unsafe( l_usage->client->ch , DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_NOTIFY_STOPPED , NULL, 0 ); + dap_stream_ch_set_ready_to_write_unsafe(a_ch,false); + dap_stream_ch_set_ready_to_read_unsafe(a_ch,false); return; } // Check for empty buffer out here to prevent warnings in worker