From ac7340c779cdd21d78ba23b42c035722cc2341c3 Mon Sep 17 00:00:00 2001 From: "alexey.stratulat" <alexey.stratulat@demlabs.net> Date: Tue, 23 Aug 2022 10:02:06 +0000 Subject: [PATCH] Features 6559 modules channel --- .../dap_stream_ch_chain_net_srv.c | 571 +++++++++--------- .../chain-net/dap_stream_ch_chain_net.c | 3 +- .../chain-voting/dap_stream_ch_chain_voting.c | 181 +++--- .../chain/include/dap_stream_ch_chain.h | 1 + modules/net/srv/include/dap_chain_net_srv.h | 2 +- 5 files changed, 359 insertions(+), 399 deletions(-) diff --git a/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.c b/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.c index c106e86c13..14344eb326 100644 --- a/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.c +++ b/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.c @@ -342,316 +342,301 @@ free_exit: */ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch , void* a_arg) { - dap_stream_ch_chain_net_srv_t * l_ch_chain_net_srv = DAP_STREAM_CH_CHAIN_NET_SRV(a_ch); - dap_stream_ch_pkt_t *l_ch_pkt = (dap_stream_ch_pkt_t *) a_arg; // chain packet - dap_chain_net_srv_stream_session_t * l_srv_session = a_ch && a_ch->stream && a_ch->stream->session ? - a_ch->stream->session->_inheritor : NULL; - if ( ! l_srv_session ){ + dap_stream_ch_pkt_t *l_ch_pkt = (dap_stream_ch_pkt_t *)a_arg; + if (!l_ch_pkt) + return; + dap_chain_net_srv_stream_session_t *l_srv_session = NULL; + if (a_ch) { + l_srv_session = a_ch->stream && a_ch->stream->session ? a_ch->stream->session->_inheritor : NULL; + } + if (!l_srv_session) { log_it( L_ERROR, "Not defined service session, switching off packet input process"); dap_stream_ch_set_ready_to_read_unsafe(a_ch, false); return; } - dap_stream_ch_chain_net_srv_pkt_error_t l_err; - memset(&l_err,0,sizeof (l_err)); - if (l_ch_pkt ) { - if (l_ch_chain_net_srv->notify_callback) { - l_ch_chain_net_srv->notify_callback(l_ch_chain_net_srv, l_ch_pkt->hdr.type, l_ch_pkt, l_ch_chain_net_srv->notify_callback_arg); - return; // It's a client behind this + dap_stream_ch_chain_net_srv_t * l_ch_chain_net_srv = DAP_STREAM_CH_CHAIN_NET_SRV(a_ch); + if (l_ch_chain_net_srv->notify_callback) { + l_ch_chain_net_srv->notify_callback(l_ch_chain_net_srv, l_ch_pkt->hdr.type, l_ch_pkt, l_ch_chain_net_srv->notify_callback_arg); + return; // It's a client behind this + } + dap_stream_ch_chain_net_srv_pkt_error_t l_err = { }; + switch (l_ch_pkt->hdr.type) { + case DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_CHECK_REQUEST: { + typedef dap_stream_ch_chain_net_srv_pkt_test_t pkt_test_t; + pkt_test_t *l_request = (pkt_test_t*)l_ch_pkt->data; + size_t l_request_size = l_request->data_size + sizeof(pkt_test_t); + if (l_ch_pkt->hdr.size != l_request_size) { + log_it(L_WARNING, "Wrong request size %u, must be %zu [pkt seq %lu]", l_ch_pkt->hdr.size, l_request_size, l_ch_pkt->hdr.seq_id); + l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_WRONG_SIZE; + dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof(l_err)); + break; } - switch (l_ch_pkt->hdr.type) { - // for send test data - case DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_CHECK_REQUEST:{ - dap_stream_ch_chain_net_srv_pkt_test_t *l_request = (dap_stream_ch_chain_net_srv_pkt_test_t*) l_ch_pkt->data; - size_t l_request_size = l_request->data_size + sizeof(dap_stream_ch_chain_net_srv_pkt_test_t); - if (l_ch_pkt->hdr.size != l_request_size) { - log_it(L_WARNING, "Wrong request size %u, must be %zu [pkt seq %"DAP_UINT64_FORMAT_U"]", l_ch_pkt->hdr.size, l_request_size, l_ch_pkt->hdr.seq_id); - l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_WRONG_SIZE; - dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof(l_err)); - break; - } - struct timeval l_recvtime2; - gettimeofday(&l_recvtime2, NULL); - memcpy(&l_request->recv_time2,&l_recvtime2,sizeof (l_recvtime2)); - //printf("\n%lu.%06lu \n", (unsigned long) l_request->recv_time2.tv_sec, (unsigned long) l_request->recv_time2.tv_usec); - dap_chain_hash_fast_t l_data_hash; - dap_hash_fast(l_request->data, l_request->data_size, &l_data_hash); - if (l_request->data_size > 0 && !dap_hash_fast_compare(&l_data_hash, &l_request->data_hash)) { - log_it(L_WARNING, "Wrong hash [pkt seq %"DAP_UINT64_FORMAT_U"]", l_ch_pkt->hdr.seq_id); - l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_WRONG_HASH; - dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof(l_err)); - break; - } - // The size of the received data is too large (> 4.294.967.295 bytes) - if(l_request->data_size_recv > UINT_MAX) { - log_it(L_WARNING, "Too large payload %zu [pkt seq %"DAP_UINT64_FORMAT_U"]", l_request->data_size_recv, l_ch_pkt->hdr.seq_id); - l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_BIG_SIZE; - dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, - sizeof(l_err)); - break; - } - // create data to send back - size_t l_recv_out_size = sizeof(dap_stream_ch_chain_net_srv_pkt_test_t) + l_request->data_size_recv; - dap_stream_ch_chain_net_srv_pkt_test_t *l_request_out = DAP_NEW_Z_SIZE(dap_stream_ch_chain_net_srv_pkt_test_t, l_recv_out_size); - // copy info from recv message - memcpy(l_request_out, l_request, sizeof(dap_stream_ch_chain_net_srv_pkt_test_t)); - l_request_out->data_size = l_request->data_size_recv; - if (l_request_out->data_size) { - randombytes(l_request_out->data, l_request_out->data_size); - dap_hash_fast(l_request_out->data, l_request_out->data_size, &l_request_out->data_hash); - } - l_request_out->err_code = 0; - strncpy((char *)l_request_out->ip_send, a_ch->stream->esocket->hostaddr, sizeof(l_request_out->ip_send) - 1); - // Thats to prevent unaligned pointer - struct timeval l_tval; - gettimeofday(&l_tval, NULL); - l_request_out->send_time2.tv_sec = l_tval.tv_sec; - l_request_out->send_time2.tv_usec = l_tval.tv_usec; - // send response - dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_CHECK_RESPONSE, l_request_out, l_recv_out_size); - DAP_DELETE(l_request_out); - } break; - - case DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_REQUEST:{ - if (l_ch_pkt->hdr.size < sizeof(dap_stream_ch_chain_net_srv_pkt_request_hdr_t) ){ - log_it( L_WARNING, "Wrong request size, less than minimum"); - break; - } - dap_chain_net_srv_grace_t *l_grace = DAP_NEW_Z(dap_chain_net_srv_grace_t); - // Parse the request - l_grace->request = DAP_NEW_Z_SIZE(dap_stream_ch_chain_net_srv_pkt_request_t, l_ch_pkt->hdr.size); - memcpy(l_grace->request, l_ch_pkt->data, l_ch_pkt->hdr.size); - l_ch_chain_net_srv->srv_uid.uint64 = l_grace->request->hdr.srv_uid.uint64; - l_grace->request_size = l_ch_pkt->hdr.size; - l_grace->ch_uuid = a_ch->uuid; - l_grace->stream_worker = a_ch->stream_worker; - s_grace_period_control(l_grace); - } break; - - case DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_SIGN_RESPONSE:{ - if (l_ch_pkt->hdr.size < sizeof(dap_chain_receipt_info_t)) { - log_it(L_ERROR, "Wrong sign response size, %u when expected at least %zu with smth", l_ch_pkt->hdr.size, - sizeof(dap_chain_receipt_info_t)); - break; - } - dap_chain_datum_tx_receipt_t * l_receipt = (dap_chain_datum_tx_receipt_t *) l_ch_pkt->data; - size_t l_receipt_size = l_ch_pkt->hdr.size; - dap_chain_net_srv_usage_t * l_usage= NULL, *l_tmp= NULL; - bool l_is_found = false; - pthread_mutex_lock(& l_srv_session->parent->mutex ); // TODO rework it with packet usage_id - HASH_ITER(hh, l_srv_session->usages, l_usage, l_tmp){ - if ( l_usage->receipt_next ){ // If we have receipt next - if ( memcmp(&l_usage->receipt_next->receipt_info, &l_receipt->receipt_info,sizeof (l_receipt->receipt_info) )==0 ){ - l_is_found = true; - break; - } - }else if (l_usage->receipt ){ // If we sign first receipt - if ( memcmp(&l_usage->receipt->receipt_info, &l_receipt->receipt_info,sizeof (l_receipt->receipt_info) )==0 ){ - l_is_found = true; - break; - } - } - } - pthread_mutex_unlock(& l_srv_session->parent->mutex ); - - if ( !l_is_found || ! l_usage ){ - log_it(L_WARNING, "Can't find receipt in usages thats equal to response receipt"); - l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_RECEIPT_CANT_FIND ; - dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); - if (l_usage && l_usage->service && l_usage->service->callbacks.response_error) - l_usage->service->callbacks.response_error(l_usage->service,l_usage->id, l_usage->client,&l_err,sizeof (l_err) ); + dap_chain_hash_fast_t l_data_hash; + dap_hash_fast(l_request->data, l_request->data_size, &l_data_hash); + if (l_request->data_size > 0 && !dap_hash_fast_compare(&l_data_hash, &l_request->data_hash)) { + log_it(L_WARNING, "Wrong hash [pkt seq %lu]", l_ch_pkt->hdr.seq_id); + l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_WRONG_HASH; + dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof(l_err)); + break; + } + if(l_request->data_size_recv > UINT_MAX) { + log_it(L_WARNING, "Too large payload %zu [pkt seq %lu]", l_request->data_size_recv, l_ch_pkt->hdr.seq_id); + l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_BIG_SIZE; + dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof(l_err)); + break; + } + /* No need for bare copying, resend it back modified */ + if (l_request->data_size_recv) { + l_request->data_size = l_request->data_size_recv; + randombytes(l_request->data, l_request->data_size); + dap_hash_fast(l_request->data, l_request->data_size, &l_request->data_hash); + } + l_request->err_code = 0; + strncpy(l_request->ip_send, a_ch->stream->esocket->hostaddr, INET_ADDRSTRLEN); + struct timespec l_recvtime2; + clck_gettime(CLOCK_REALTIME, &l_recvtime2); + l_request->recv_time2 = l_recvtime2; + + dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_CHECK_RESPONSE, l_request, + l_request->data_size + sizeof(pkt_test_t)); + } break; /* DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_CHECK_REQUEST */ + + case DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_REQUEST: { + if (l_ch_pkt->hdr.size < sizeof(dap_stream_ch_chain_net_srv_pkt_request_hdr_t) ){ + log_it( L_WARNING, "Wrong request size, less than minimum"); + break; + } + dap_chain_net_srv_grace_t *l_grace = DAP_NEW_Z(dap_chain_net_srv_grace_t); + // Parse the request + l_grace->request = DAP_NEW_Z_SIZE(dap_stream_ch_chain_net_srv_pkt_request_t, l_ch_pkt->hdr.size); + memcpy(l_grace->request, l_ch_pkt->data, l_ch_pkt->hdr.size); + l_ch_chain_net_srv->srv_uid.uint64 = l_grace->request->hdr.srv_uid.uint64; + l_grace->request_size = l_ch_pkt->hdr.size; + l_grace->ch_uuid = a_ch->uuid; + l_grace->stream_worker = a_ch->stream_worker; + s_grace_period_control(l_grace); + } break; /* DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_REQUEST */ + + case DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_SIGN_RESPONSE: { + if (l_ch_pkt->hdr.size < sizeof(dap_chain_receipt_info_t)) { + log_it(L_ERROR, "Wrong sign response size, %u when expected at least %zu with smth", l_ch_pkt->hdr.size, + sizeof(dap_chain_receipt_info_t)); + break; + } + dap_chain_datum_tx_receipt_t * l_receipt = (dap_chain_datum_tx_receipt_t *) l_ch_pkt->data; + size_t l_receipt_size = l_ch_pkt->hdr.size; + dap_chain_net_srv_usage_t * l_usage= NULL, *l_tmp= NULL; + bool l_is_found = false; + pthread_mutex_lock(& l_srv_session->parent->mutex ); // TODO rework it with packet usage_id + HASH_ITER(hh, l_srv_session->usages, l_usage, l_tmp){ + if ( l_usage->receipt_next ){ // If we have receipt next + if ( memcmp(&l_usage->receipt_next->receipt_info, &l_receipt->receipt_info,sizeof (l_receipt->receipt_info) )==0 ){ + l_is_found = true; break; } - l_err.usage_id = l_usage->id; - l_err.net_id.uint64 = l_usage->net->pub.id.uint64; - l_err.srv_uid.uint64 = l_usage->service->uid.uint64; - - dap_chain_tx_out_cond_t *l_tx_out_cond = NULL; - if (!l_usage->is_grace) { - if (! l_usage->tx_cond ){ - log_it(L_WARNING, "No tx out in usage"); - l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_NOT_FOUND ; - dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); - if (l_usage->service->callbacks.response_error) - l_usage->service->callbacks.response_error( l_usage->service, l_usage->id, l_usage->client, - &l_err, sizeof (l_err) ); - break; - } - int l_tx_out_cond_size =0; - l_tx_out_cond = (dap_chain_tx_out_cond_t *)dap_chain_datum_tx_item_get(l_usage->tx_cond, NULL, - TX_ITEM_TYPE_OUT_COND, &l_tx_out_cond_size ); - if ( ! l_tx_out_cond ){ // No conditioned output - l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_NO_COND_OUT ; - dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); - if (l_usage->service->callbacks.response_error) - l_usage->service->callbacks.response_error( l_usage->service, l_usage->id, l_usage->client,&l_err,sizeof (l_err) ); - break; - } - } - // get a second signature - from the client (first sign in server, second sign in client) - dap_sign_t * l_receipt_sign = dap_chain_datum_tx_receipt_sign_get( l_receipt, l_receipt_size, 1); - if ( ! l_receipt_sign ){ - l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_RECEIPT_CANT_FIND ; - dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); - if (l_usage->service->callbacks.response_error) - l_usage->service->callbacks.response_error( l_usage->service, l_usage->id, l_usage->client, - &l_err, sizeof (l_err) ); + }else if (l_usage->receipt ){ // If we sign first receipt + if ( memcmp(&l_usage->receipt->receipt_info, &l_receipt->receipt_info,sizeof (l_receipt->receipt_info) )==0 ){ + l_is_found = true; break; } - // Check receipt signature pkey hash - dap_sign_get_pkey_hash(l_receipt_sign, &l_usage->client_pkey_hash); - dap_chain_net_srv_banlist_item_t *l_item = NULL; - dap_chain_net_srv_t * l_srv = dap_chain_net_srv_get(l_receipt->receipt_info.srv_uid); - if (l_usage->is_grace) { - pthread_mutex_lock(&l_srv->banlist_mutex); - HASH_FIND(hh, l_srv->ban_list, &l_usage->client_pkey_hash, sizeof(dap_chain_hash_fast_t), l_item); - pthread_mutex_unlock(&l_srv->banlist_mutex); - if (l_item) { // client banned - // Update actual receipt - l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_RECEIPT_BANNED_PKEY_HASH ; - dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof(l_err)); - if (l_usage->service->callbacks.response_error) - l_usage->service->callbacks.response_error(l_usage->service,l_usage->id, l_usage->client, &l_err, sizeof(l_err)); - break; - } - } else { - if (memcmp(l_usage->client_pkey_hash.raw, l_tx_out_cond->subtype.srv_pay.pkey_hash.raw, sizeof(l_usage->client_pkey_hash)) != 0) { - l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_RECEIPT_WRONG_PKEY_HASH ; - dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); - if (l_usage->service->callbacks.response_error) - l_usage->service->callbacks.response_error(l_usage->service,l_usage->id, l_usage->client,&l_err,sizeof (l_err) ); - break; - } - } + } + } + pthread_mutex_unlock(& l_srv_session->parent->mutex ); + + if ( !l_is_found || ! l_usage ){ + log_it(L_WARNING, "Can't find receipt in usages thats equal to response receipt"); + l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_RECEIPT_CANT_FIND ; + dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); + if (l_usage && l_usage->service && l_usage->service->callbacks.response_error) + l_usage->service->callbacks.response_error(l_usage->service,l_usage->id, l_usage->client,&l_err,sizeof (l_err) ); + break; + } + l_err.usage_id = l_usage->id; + l_err.net_id.uint64 = l_usage->net->pub.id.uint64; + l_err.srv_uid.uint64 = l_usage->service->uid.uint64; - // Update actual receipt - bool l_is_first_sign = false; - if (! l_usage->receipt_next && l_usage->receipt){ - DAP_DELETE(l_usage->receipt); - l_usage->receipt = DAP_NEW_SIZE(dap_chain_datum_tx_receipt_t,l_receipt_size); - l_is_first_sign = true; - l_usage->is_active = true; - memcpy( l_usage->receipt, l_receipt, l_receipt_size); - } else if (l_usage->receipt_next ){ - DAP_DELETE(l_usage->receipt_next); - l_usage->receipt_next = DAP_NEW_SIZE(dap_chain_datum_tx_receipt_t,l_receipt_size); - l_usage->is_active = true; - memcpy( l_usage->receipt_next, l_receipt, l_receipt_size); - } + dap_chain_tx_out_cond_t *l_tx_out_cond = NULL; + if (!l_usage->is_grace) { + if (! l_usage->tx_cond ){ + log_it(L_WARNING, "No tx out in usage"); + l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_NOT_FOUND ; + dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); + if (l_usage->service->callbacks.response_error) + l_usage->service->callbacks.response_error( l_usage->service, l_usage->id, l_usage->client, + &l_err, sizeof (l_err) ); + break; + } + int l_tx_out_cond_size =0; + l_tx_out_cond = (dap_chain_tx_out_cond_t *)dap_chain_datum_tx_item_get(l_usage->tx_cond, NULL, + TX_ITEM_TYPE_OUT_COND, &l_tx_out_cond_size ); + if ( ! l_tx_out_cond ){ // No conditioned output + l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_TX_COND_NO_COND_OUT ; + dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); + if (l_usage->service->callbacks.response_error) + l_usage->service->callbacks.response_error( l_usage->service, l_usage->id, l_usage->client,&l_err,sizeof (l_err) ); + break; + } + } + // get a second signature - from the client (first sign in server, second sign in client) + dap_sign_t * l_receipt_sign = dap_chain_datum_tx_receipt_sign_get( l_receipt, l_receipt_size, 1); + if ( ! l_receipt_sign ){ + l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_RECEIPT_CANT_FIND ; + dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); + if (l_usage->service->callbacks.response_error) + l_usage->service->callbacks.response_error( l_usage->service, l_usage->id, l_usage->client, + &l_err, sizeof (l_err) ); + break; + } + // Check receipt signature pkey hash + dap_sign_get_pkey_hash(l_receipt_sign, &l_usage->client_pkey_hash); + dap_chain_net_srv_banlist_item_t *l_item = NULL; + dap_chain_net_srv_t * l_srv = dap_chain_net_srv_get(l_receipt->receipt_info.srv_uid); + if (l_usage->is_grace) { + pthread_mutex_lock(&l_srv->banlist_mutex); + HASH_FIND(hh, l_srv->ban_list, &l_usage->client_pkey_hash, sizeof(dap_chain_hash_fast_t), l_item); + pthread_mutex_unlock(&l_srv->banlist_mutex); + if (l_item) { // client banned + // Update actual receipt + l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_RECEIPT_BANNED_PKEY_HASH ; + dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof(l_err)); + if (l_usage->service->callbacks.response_error) + l_usage->service->callbacks.response_error(l_usage->service,l_usage->id, l_usage->client, &l_err, sizeof(l_err)); + break; + } + } else { + if (memcmp(l_usage->client_pkey_hash.raw, l_tx_out_cond->subtype.srv_pay.pkey_hash.raw, sizeof(l_usage->client_pkey_hash)) != 0) { + l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_RECEIPT_WRONG_PKEY_HASH ; + dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); + if (l_usage->service->callbacks.response_error) + l_usage->service->callbacks.response_error(l_usage->service,l_usage->id, l_usage->client,&l_err,sizeof (l_err) ); + break; + } + } - // Store receipt if any problems with transactions - dap_chain_hash_fast_t l_receipt_hash={0}; - dap_hash_fast(l_receipt,l_receipt_size,&l_receipt_hash); - - char *l_receipt_hash_str = dap_chain_hash_fast_to_str_new(&l_receipt_hash); - dap_global_db_set("local.receipts", l_receipt_hash_str, l_receipt, l_receipt_size, false, NULL, NULL); - DAP_DELETE(l_receipt_hash_str); - - size_t l_success_size; - dap_chain_hash_fast_t *l_tx_in_hash = NULL; - if (!l_usage->is_grace) { - // Form input transaction - dap_chain_addr_t *l_wallet_addr = dap_chain_wallet_get_addr(l_usage->price->wallet, l_usage->net->pub.id); - l_tx_in_hash = dap_chain_mempool_tx_create_cond_input(l_usage->net, &l_usage->tx_cond_hash, l_wallet_addr, - dap_chain_wallet_get_key(l_usage->price->wallet, 0), - l_receipt); - if (l_tx_in_hash) { - memcpy(&l_usage->tx_cond_hash, l_tx_in_hash, sizeof(dap_chain_hash_fast_t)); - char *l_tx_in_hash_str = dap_chain_hash_fast_to_str_new(l_tx_in_hash); - log_it(L_NOTICE, "Formed tx %s for input with active receipt", l_tx_in_hash_str); - DAP_DELETE(l_tx_in_hash_str); - }else - log_it(L_ERROR, "Can't create input tx cond transaction!"); - l_success_size = sizeof(dap_stream_ch_chain_net_srv_pkt_success_hdr_t) + sizeof(dap_chain_hash_fast_t); - } else { - l_success_size = sizeof(dap_stream_ch_chain_net_srv_pkt_success_hdr_t); - } - dap_stream_ch_chain_net_srv_pkt_success_t *l_success = DAP_NEW_Z_SIZE(dap_stream_ch_chain_net_srv_pkt_success_t, - l_success_size); - l_success->hdr.usage_id = l_usage->id; - l_success->hdr.net_id.uint64 = l_usage->net->pub.id.uint64; - l_success->hdr.srv_uid.uint64 = l_usage->service->uid.uint64; - if (l_tx_in_hash) { - memcpy(l_success->custom_data, l_tx_in_hash, sizeof(dap_chain_hash_fast_t)); - DAP_DELETE(l_tx_in_hash); - } + // Update actual receipt + bool l_is_first_sign = false; + if (! l_usage->receipt_next && l_usage->receipt){ + DAP_DELETE(l_usage->receipt); + l_usage->receipt = DAP_NEW_SIZE(dap_chain_datum_tx_receipt_t,l_receipt_size); + l_is_first_sign = true; + l_usage->is_active = true; + memcpy( l_usage->receipt, l_receipt, l_receipt_size); + } else if (l_usage->receipt_next ){ + DAP_DELETE(l_usage->receipt_next); + l_usage->receipt_next = DAP_NEW_SIZE(dap_chain_datum_tx_receipt_t,l_receipt_size); + l_usage->is_active = true; + memcpy( l_usage->receipt_next, l_receipt, l_receipt_size); + } - if (l_usage->is_grace) - log_it(L_NOTICE, "Receipt is OK, but transaction can't be found. Start the grace period for %d seconds", - l_srv->grace_period); - else - log_it(L_NOTICE, "Receipt with remote client sign is acceptible for. Now start the service's usage"); - - dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_SUCCESS , - l_success, l_success_size); - DAP_DELETE(l_success); - - if ( l_is_first_sign && l_usage->service->callbacks.response_success){ - if( l_usage->service->callbacks.response_success(l_usage->service,l_usage->id, l_usage->client, - l_receipt, l_receipt_size ) !=0 ){ - log_it(L_NOTICE, "No success by service success callback, inactivating service usage"); - l_usage->is_active = false; - } - } else if (l_usage->service->callbacks.receipt_next_success) { - if (l_usage->service->callbacks.receipt_next_success(l_usage->service, l_usage->id, l_usage->client, - l_receipt, l_receipt_size ) != 0 ){ - log_it(L_NOTICE, "No success by service receipt_next callback, inactivating service usage"); - l_usage->is_active = false; - } - } - } break; + // Store receipt if any problems with transactions + dap_chain_hash_fast_t l_receipt_hash={0}; + dap_hash_fast(l_receipt,l_receipt_size,&l_receipt_hash); + + char *l_receipt_hash_str = dap_chain_hash_fast_to_str_new(&l_receipt_hash); + dap_chain_global_db_gr_set(l_receipt_hash_str, l_receipt, l_receipt_size, "local.receipts"); + DAP_DELETE(l_receipt_hash_str); + + size_t l_success_size; + dap_chain_hash_fast_t *l_tx_in_hash = NULL; + if (!l_usage->is_grace) { + // Form input transaction + dap_chain_addr_t *l_wallet_addr = dap_chain_wallet_get_addr(l_usage->price->wallet, l_usage->net->pub.id); + l_tx_in_hash = dap_chain_mempool_tx_create_cond_input(l_usage->net, &l_usage->tx_cond_hash, l_wallet_addr, + dap_chain_wallet_get_key(l_usage->price->wallet, 0), + l_receipt); + if (l_tx_in_hash) { + memcpy(&l_usage->tx_cond_hash, l_tx_in_hash, sizeof(dap_chain_hash_fast_t)); + char *l_tx_in_hash_str = dap_chain_hash_fast_to_str_new(l_tx_in_hash); + log_it(L_NOTICE, "Formed tx %s for input with active receipt", l_tx_in_hash_str); + DAP_DELETE(l_tx_in_hash_str); + }else + log_it(L_ERROR, "Can't create input tx cond transaction!"); + l_success_size = sizeof(dap_stream_ch_chain_net_srv_pkt_success_hdr_t) + sizeof(dap_chain_hash_fast_t); + } else { + l_success_size = sizeof(dap_stream_ch_chain_net_srv_pkt_success_hdr_t); + } + dap_stream_ch_chain_net_srv_pkt_success_t *l_success = DAP_NEW_STACK_SIZE(dap_stream_ch_chain_net_srv_pkt_success_t, + l_success_size); + l_success->hdr.usage_id = l_usage->id; + l_success->hdr.net_id.uint64 = l_usage->net->pub.id.uint64; + l_success->hdr.srv_uid.uint64 = l_usage->service->uid.uint64; + if (l_tx_in_hash) { + memcpy(l_success->custom_data, l_tx_in_hash, sizeof(dap_chain_hash_fast_t)); + DAP_DELETE(l_tx_in_hash); + } - case DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_DATA:{ - if (l_ch_pkt->hdr.size < sizeof(dap_stream_ch_chain_net_srv_pkt_data_hdr_t) ){ - log_it( L_WARNING, "Wrong request size, less than minimum"); - break; - } - // Parse the packet - dap_stream_ch_chain_net_srv_pkt_data_t * l_pkt =(dap_stream_ch_chain_net_srv_pkt_data_t *) l_ch_pkt->data; - size_t l_pkt_size = l_ch_pkt->hdr.size - sizeof (dap_stream_ch_chain_net_srv_pkt_data_t); - dap_chain_net_srv_t * l_srv = dap_chain_net_srv_get( l_pkt->hdr.srv_uid); - dap_chain_net_srv_usage_t * l_usage = dap_chain_net_srv_usage_find_unsafe( l_srv_session, l_pkt->hdr.usage_id ); - // If service not found - if ( l_srv == NULL){ - l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_SERVICE_NOT_FOUND ; - l_err.srv_uid = l_pkt->hdr.srv_uid; - dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); - break; - } - if (!l_srv->callbacks.custom_data) - break; - size_t l_out_data_size = 0; - void *l_out_data = l_srv->callbacks.custom_data(l_srv, l_usage, l_pkt->data, l_pkt_size, &l_out_data_size); - if (l_out_data && l_out_data_size) { - dap_stream_ch_chain_net_srv_pkt_data_t *l_data = DAP_NEW_Z_SIZE(dap_stream_ch_chain_net_srv_pkt_data_t, - sizeof(dap_stream_ch_chain_net_srv_pkt_data_t) + l_out_data_size); - l_data->hdr.version = 1; - l_data->hdr.srv_uid = l_srv->uid; - l_data->hdr.usage_id = l_pkt->hdr.usage_id; - l_data->hdr.data_size = l_out_data_size; - memcpy(l_data->data, l_out_data, l_out_data_size); - dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_DATA, l_data, sizeof(dap_stream_ch_chain_net_srv_pkt_data_t)+l_out_data_size); - DAP_DELETE(l_data); - } - } break; - - case DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR:{ - if ( l_ch_pkt->hdr.size == sizeof (dap_stream_ch_chain_net_srv_pkt_error_t) ){ - dap_stream_ch_chain_net_srv_pkt_error_t * l_err = (dap_stream_ch_chain_net_srv_pkt_error_t *) l_ch_pkt->data; - log_it( L_NOTICE, "Remote responsed with error code 0x%08X", l_err->code ); - // TODO code for service client mode - }else{ - log_it(L_ERROR, "Wrong error response size, %u when expected %zu", l_ch_pkt->hdr.size, - sizeof ( dap_stream_ch_chain_net_srv_pkt_error_t) ); - } - } break; + if (l_usage->is_grace) + log_it(L_NOTICE, "Receipt is OK, but transaction can't be found. Start the grace period for %d seconds", + l_srv->grace_period); + else + log_it(L_NOTICE, "Receipt with remote client sign is acceptible for. Now start the service's usage"); - default: log_it( L_WARNING, "Unknown packet type 0x%02X", l_ch_pkt->hdr.type); + dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_SUCCESS , + l_success, l_success_size); + + if ( l_is_first_sign && l_usage->service->callbacks.response_success){ + if( l_usage->service->callbacks.response_success(l_usage->service,l_usage->id, l_usage->client, + l_receipt, l_receipt_size ) !=0 ){ + log_it(L_NOTICE, "No success by service success callback, inactivating service usage"); + l_usage->is_active = false; + } + } else if (l_usage->service->callbacks.receipt_next_success) { + if (l_usage->service->callbacks.receipt_next_success(l_usage->service, l_usage->id, l_usage->client, + l_receipt, l_receipt_size ) != 0 ){ + log_it(L_NOTICE, "No success by service receipt_next callback, inactivating service usage"); + l_usage->is_active = false; + } } - if(l_ch_chain_net_srv->notify_callback) - l_ch_chain_net_srv->notify_callback(l_ch_chain_net_srv, l_ch_pkt->hdr.type, l_ch_pkt, l_ch_chain_net_srv->notify_callback_arg); - } + } break; + case DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_DATA: { + if (l_ch_pkt->hdr.size < sizeof(dap_stream_ch_chain_net_srv_pkt_data_hdr_t) ){ + log_it( L_WARNING, "Wrong request size, less than minimum"); + break; + } + typedef dap_stream_ch_chain_net_srv_pkt_data_t pkt_t; + pkt_t * l_pkt =(pkt_t *) l_ch_pkt->data; + size_t l_pkt_size = l_ch_pkt->hdr.size - sizeof(pkt_t); + dap_chain_net_srv_t * l_srv = dap_chain_net_srv_get( l_pkt->hdr.srv_uid); + dap_chain_net_srv_usage_t * l_usage = dap_chain_net_srv_usage_find_unsafe( l_srv_session, l_pkt->hdr.usage_id ); + // If service not found + if ( l_srv == NULL){ + l_err.code = DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR_CODE_SERVICE_NOT_FOUND ; + l_err.srv_uid = l_pkt->hdr.srv_uid; + dap_stream_ch_pkt_write_unsafe( a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR, &l_err, sizeof (l_err) ); + break; + } + if (!l_srv->callbacks.custom_data) + break; + size_t l_out_data_size = 0; + void *l_out_data = l_srv->callbacks.custom_data(l_srv, l_usage, l_pkt->data, l_pkt_size, &l_out_data_size); + if (l_out_data && l_out_data_size) { + pkt_t *l_data = DAP_NEW_STACK_SIZE(pkt_t, sizeof(pkt_t) + l_out_data_size); + l_data->hdr.version = 1; + l_data->hdr.srv_uid = l_srv->uid; + l_data->hdr.usage_id = l_pkt->hdr.usage_id; + l_data->hdr.data_size = l_out_data_size; + memcpy(l_data->data, l_out_data, l_out_data_size); + dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_DATA, l_data, sizeof(pkt_t) + l_out_data_size); + } + } break; + + case DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_RESPONSE_ERROR:{ + if ( l_ch_pkt->hdr.size == sizeof (dap_stream_ch_chain_net_srv_pkt_error_t) ){ + dap_stream_ch_chain_net_srv_pkt_error_t * l_err = (dap_stream_ch_chain_net_srv_pkt_error_t *) l_ch_pkt->data; + log_it( L_NOTICE, "Remote responsed with error code 0x%08X", l_err->code ); + // TODO code for service client mode + }else{ + log_it(L_ERROR, "Wrong error response size, %u when expected %zu", l_ch_pkt->hdr.size, + sizeof ( dap_stream_ch_chain_net_srv_pkt_error_t) ); + } + } break; + + default: log_it( L_WARNING, "Unknown packet type 0x%02X", l_ch_pkt->hdr.type); + } + if(l_ch_chain_net_srv->notify_callback) + l_ch_chain_net_srv->notify_callback(l_ch_chain_net_srv, l_ch_pkt->hdr.type, l_ch_pkt, l_ch_chain_net_srv->notify_callback_arg); } /** diff --git a/modules/channel/chain-net/dap_stream_ch_chain_net.c b/modules/channel/chain-net/dap_stream_ch_chain_net.c index 745a4157eb..820ea683a4 100644 --- a/modules/channel/chain-net/dap_stream_ch_chain_net.c +++ b/modules/channel/chain-net/dap_stream_ch_chain_net.c @@ -206,7 +206,8 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) strcpy(l_err_str, "ERROR_NET_INVALID_ID"); l_error = true; } - if (!l_error && a_ch->stream->session->acl && !a_ch->stream->session->acl[l_acl_idx]) { + uint8_t l_acl = a_ch->stream->session->acl ? a_ch->stream->session->acl[l_acl_idx] : 1; + if (!l_error && !l_acl) { log_it(L_WARNING, "Unauthorized request attempt to network %s", dap_chain_net_by_id(l_ch_chain_net_pkt->hdr.net_id)->pub.name); strcpy(l_err_str, "ERROR_NET_NOT_AUTHORIZED"); diff --git a/modules/channel/chain-voting/dap_stream_ch_chain_voting.c b/modules/channel/chain-voting/dap_stream_ch_chain_voting.c index ec3c350d99..a615aa1ead 100644 --- a/modules/channel/chain-voting/dap_stream_ch_chain_voting.c +++ b/modules/channel/chain-voting/dap_stream_ch_chain_voting.c @@ -53,11 +53,10 @@ static voting_pkt_items_t *s_pkt_items = NULL; static voting_node_client_list_t *s_node_client_list = NULL; static pthread_rwlock_t s_node_client_list_rwlock = PTHREAD_RWLOCK_INITIALIZER; -static void s_callback_send_all_loopback(dap_chain_node_addr_t *a_remote_node_addr); +static void s_callback_send_all_loopback(uint64_t a_node_addr); static void s_callback_pkt_items_send_all(dap_client_t *a_client, void *a_arg); -static void s_callback_channel_pkt_free(uint64_t a_node_addr_uint64); -static void s_callback_channel_pkt_buf_limit(uint64_t a_node_addr_uint64); +static void s_callback_send_all_unsafe(dap_client_t *a_client, void *a_arg); static void s_stream_ch_new(dap_stream_ch_t* a_ch, void* a_arg); static void s_stream_ch_delete(dap_stream_ch_t* a_ch, void* a_arg); @@ -108,9 +107,9 @@ void dap_stream_ch_chain_voting_in_callback_add(void* a_arg, voting_ch_callback_ } void dap_stream_ch_chain_voting_message_write(dap_chain_net_t * a_net, dap_list_t *a_sendto_nodes, - dap_chain_hash_fast_t * a_data_hash, - const void * a_data, size_t a_data_size){ - pthread_rwlock_rdlock(&s_pkt_items->rwlock_out); + dap_chain_hash_fast_t * a_data_hash, + const void * a_data, size_t a_data_size) +{ dap_stream_ch_chain_voting_pkt_t * l_voting_pkt; size_t l_voting_pkt_size = sizeof(l_voting_pkt->hdr) + a_data_size; l_voting_pkt = DAP_NEW_SIZE(dap_stream_ch_chain_voting_pkt_t, l_voting_pkt_size ); @@ -125,73 +124,18 @@ void dap_stream_ch_chain_voting_message_write(dap_chain_net_t * a_net, dap_list_ voting_pkt_addr_t * l_pkt_addr = DAP_NEW_Z(voting_pkt_addr_t); l_pkt_addr->node_addr.uint64 = 0; l_pkt_addr->voting_pkt = l_voting_pkt; - s_pkt_items->pkts_out = dap_list_append(s_pkt_items->pkts_out, l_pkt_addr); + pthread_rwlock_wrlock(&s_pkt_items->rwlock_out); + s_pkt_items->pkts_out = dap_list_append(s_pkt_items->pkts_out, l_pkt_addr); pthread_rwlock_unlock(&s_pkt_items->rwlock_out); dap_stream_ch_chain_voting_pkt_broadcast(a_net, a_sendto_nodes); } -static void s_callback_channel_pkt_free(uint64_t a_node_addr_uint64) +static void s_callback_send_all_unsafe_on_worker(dap_worker_t *a_worker, void *a_arg) { - pthread_rwlock_rdlock(&s_pkt_items->rwlock_out); - if ( dap_list_length(s_pkt_items->pkts_out) == 0 ){ - pthread_rwlock_unlock(&s_pkt_items->rwlock_out); - return; - } - - dap_list_t *l_list = dap_list_first(s_pkt_items->pkts_out); - while( l_list ) { - // dap_list_t *l_next_list = l_list->next; - voting_pkt_addr_t *l_pkt_addr = l_list->data; - if ( l_pkt_addr->node_addr.uint64 == a_node_addr_uint64) { - DAP_DELETE(l_pkt_addr->voting_pkt); - DAP_DELETE(l_pkt_addr); - dap_list_t *l_tmp = l_list->next; - pthread_rwlock_unlock(&s_pkt_items->rwlock_out); - pthread_rwlock_wrlock(&s_pkt_items->rwlock_out); - s_pkt_items->pkts_out = dap_list_delete_link(s_pkt_items->pkts_out, l_list); - pthread_rwlock_unlock(&s_pkt_items->rwlock_out); - pthread_rwlock_rdlock(&s_pkt_items->rwlock_out); - // dap_list_t *l_list = dap_list_first(s_pkt_items->pkts_out); - l_list = l_tmp; - } else { - l_list = l_list->next; - } - } - pthread_rwlock_unlock(&s_pkt_items->rwlock_out); -} - - -// remove overflow in outbuf -static void s_callback_channel_pkt_buf_limit(uint64_t a_node_addr_uint64) { - if ( dap_list_length(s_pkt_items->pkts_out) == 0 ) - return; - unsigned int l_limit = 10; // max messages per one addr - dap_list_t *l_list = dap_list_first(s_pkt_items->pkts_out); - unsigned int l_count = 0; - while (l_list) { - voting_pkt_addr_t *l_pkt_addr = (voting_pkt_addr_t *)l_list->data; - if ( l_pkt_addr->node_addr.uint64 == a_node_addr_uint64) - l_count++; - l_list = l_list->next; - } - if ( l_count > l_limit) { - unsigned int l_over = l_count-l_limit; - dap_list_t *l_list = dap_list_first(s_pkt_items->pkts_out); - while (l_list && l_over) { - voting_pkt_addr_t *l_pkt_addr = (voting_pkt_addr_t *)l_list->data; - if ( l_pkt_addr->node_addr.uint64 == a_node_addr_uint64 ) { - DAP_DELETE(l_pkt_addr->voting_pkt); - DAP_DELETE(l_pkt_addr); - s_pkt_items->pkts_out = dap_list_remove_link(s_pkt_items->pkts_out, l_list); - l_over--; - } - l_list = l_list->next; - } - } - - + UNUSED(a_worker); + s_callback_send_all_unsafe((dap_client_t *)a_arg, NULL); } @@ -370,25 +314,65 @@ void dap_stream_ch_chain_voting_pkt_broadcast(dap_chain_net_t *a_net, dap_list_t DAP_DELETE(l_args); } } -static void s_callback_send_all_loopback(dap_chain_node_addr_t *a_remote_node_addr) { +static void s_callback_send_all_loopback(uint64_t a_node_addr) { pthread_rwlock_rdlock(&s_pkt_items->rwlock_out); dap_list_t* l_pkts_list = dap_list_first(s_pkt_items->pkts_out); while(l_pkts_list) { dap_list_t *l_pkts_list_next = l_pkts_list->next; voting_pkt_addr_t *l_pkt_addr = (voting_pkt_addr_t *)l_pkts_list->data; - dap_stream_ch_chain_voting_pkt_t * l_voting_pkt = l_pkt_addr->voting_pkt; - size_t l_voting_pkt_size = sizeof(l_voting_pkt->hdr) + l_voting_pkt->hdr.data_size; - if ( l_pkt_addr->node_addr.uint64 == a_remote_node_addr->uint64 ) { - dap_stream_ch_chain_voting_pkt_t * l_pkt_lb = DAP_NEW_SIZE(dap_stream_ch_chain_voting_pkt_t, l_voting_pkt_size); - memcpy(l_pkt_lb, l_voting_pkt, l_voting_pkt_size); - pthread_rwlock_rdlock(&s_pkt_items->rwlock_in); - s_pkt_items->pkts_in = dap_list_append(s_pkt_items->pkts_in, l_pkt_lb); - pthread_rwlock_unlock(&s_pkt_items->rwlock_in); - } - l_pkts_list = l_pkts_list_next; - } - pthread_rwlock_unlock(&s_pkt_items->rwlock_out); - s_callback_channel_pkt_free(a_remote_node_addr->uint64); + if (l_pkt_addr->node_addr.uint64 == 0) { + if (a_node_addr) { + l_pkt_addr->voting_pkt->hdr.sender_node_addr.uint64 = + l_pkt_addr->voting_pkt->hdr.recipient_node_addr.uint64 = + a_node_addr; + pthread_rwlock_wrlock(&s_pkt_items->rwlock_in); + s_pkt_items->pkts_in = dap_list_append(s_pkt_items->pkts_in, l_pkt_addr->voting_pkt); + pthread_rwlock_unlock(&s_pkt_items->rwlock_in); + } else + DAP_DELETE(l_pkt_addr->voting_pkt); + DAP_DELETE(l_pkt_addr); + s_pkt_items->pkts_out = dap_list_delete_link(s_pkt_items->pkts_out, l_pkts_list); + } + l_pkts_list = l_pkts_list_next; + } + pthread_rwlock_unlock(&s_pkt_items->rwlock_out); +} + +/** + * @brief s_callback_send_all_unsafe + * @param a_client + * @param a_arg + */ +static void s_callback_send_all_unsafe(dap_client_t *a_client, void *a_arg) +{ + UNUSED(a_arg); + pthread_rwlock_wrlock(&s_pkt_items->rwlock_out); + dap_chain_node_client_t *l_node_client = DAP_CHAIN_NODE_CLIENT(a_client); + if (l_node_client) { + dap_stream_ch_t * l_ch = dap_client_get_stream_ch_unsafe(a_client, dap_stream_ch_chain_voting_get_id() ); + if (l_ch) { + dap_list_t* l_pkts_list = s_pkt_items->pkts_out; + while(l_pkts_list) { + dap_list_t *l_pkts_list_next = l_pkts_list->next; + voting_pkt_addr_t *l_pkt_addr = (voting_pkt_addr_t *)l_pkts_list->data; + dap_stream_ch_chain_voting_pkt_t * l_voting_pkt = l_pkt_addr->voting_pkt; + size_t l_voting_pkt_size = sizeof(l_voting_pkt->hdr) + l_voting_pkt->hdr.data_size; + if ( l_pkt_addr->node_addr.uint64 == l_node_client->remote_node_addr.uint64 ) { + if (l_ch) { + dap_stream_ch_pkt_write_unsafe(l_ch, + l_voting_pkt->hdr.pkt_type, l_voting_pkt, l_voting_pkt_size); + log_it(L_DEBUG, "Sent pkt size %zu to addr "NODE_ADDR_FP_STR, l_voting_pkt_size, + NODE_ADDR_FP_ARGS_S(l_node_client->remote_node_addr)); + } + DAP_DELETE(l_voting_pkt); + DAP_DELETE(l_pkt_addr); + s_pkt_items->pkts_out = dap_list_delete_link(s_pkt_items->pkts_out, l_pkts_list); + } + l_pkts_list = l_pkts_list_next; + } + } + } + pthread_rwlock_unlock(&s_pkt_items->rwlock_out); } /** @@ -454,40 +438,29 @@ static void s_stream_ch_delete(dap_stream_ch_t* a_ch, void* a_arg) { static bool s_packet_in_callback_handler(void *a_arg) { UNUSED(a_arg); - pthread_rwlock_rdlock(&s_pkt_items->rwlock_in); - if (dap_list_length(s_pkt_items->pkts_in)) { + pthread_rwlock_wrlock(&s_pkt_items->rwlock_in); + if (s_pkt_items->pkts_in) { dap_list_t* l_list_pkts = dap_list_copy(s_pkt_items->pkts_in); dap_list_free(s_pkt_items->pkts_in); s_pkt_items->pkts_in = NULL; - - dap_list_t* l_list_temp = dap_list_first(l_list_pkts); - while(l_list_temp) { - dap_list_t *l_list_next = l_list_temp->next; - dap_stream_ch_chain_voting_pkt_t * l_voting_pkt = (dap_stream_ch_chain_voting_pkt_t *)l_list_temp->data; + pthread_rwlock_unlock(&s_pkt_items->rwlock_in); + while(l_list_pkts) { + dap_list_t *l_list_next = l_list_pkts->next; + dap_stream_ch_chain_voting_pkt_t * l_voting_pkt = (dap_stream_ch_chain_voting_pkt_t *)l_list_pkts->data; for (size_t i=0; i<s_pkt_in_callback_count; i++) { voting_pkt_in_callback_t * l_callback = s_pkt_in_callback+i; if (l_callback->packet_in_callback) { - dap_chain_node_addr_t *l_sender_node_addr = DAP_NEW(dap_chain_node_addr_t); - memcpy(l_sender_node_addr, &l_voting_pkt->hdr.sender_node_addr, sizeof(dap_chain_node_addr_t)); - - dap_chain_hash_fast_t *l_data_hash = DAP_NEW(dap_chain_hash_fast_t); - memcpy(l_data_hash, &l_voting_pkt->hdr.data_hash, sizeof(dap_chain_hash_fast_t)); - - uint8_t * l_data = DAP_NEW_SIZE(uint8_t, l_voting_pkt->hdr.data_size); - memcpy(l_data, &l_voting_pkt->data, l_voting_pkt->hdr.data_size); - l_callback->packet_in_callback(l_callback->arg, l_sender_node_addr, - l_data_hash, l_data, l_voting_pkt->hdr.data_size); - DAP_DELETE(l_sender_node_addr); - DAP_DELETE(l_data_hash); - DAP_DELETE(l_data); + l_callback->packet_in_callback(l_callback->arg, &l_voting_pkt->hdr.sender_node_addr, + &l_voting_pkt->hdr.data_hash, l_voting_pkt->data, l_voting_pkt->hdr.data_size); } - } - l_list_temp = l_list_next; + } DAP_DELETE(l_voting_pkt); + l_list_pkts = l_list_next; } dap_list_free(l_list_pkts); - } - pthread_rwlock_unlock(&s_pkt_items->rwlock_in); + } else { + pthread_rwlock_unlock(&s_pkt_items->rwlock_in); + } return true; } diff --git a/modules/channel/chain/include/dap_stream_ch_chain.h b/modules/channel/chain/include/dap_stream_ch_chain.h index 9aad8bb539..519381ef60 100644 --- a/modules/channel/chain/include/dap_stream_ch_chain.h +++ b/modules/channel/chain/include/dap_stream_ch_chain.h @@ -95,3 +95,4 @@ inline static uint8_t dap_stream_ch_chain_get_id(void) { return (uint8_t) 'C'; } dap_chain_t * dap_chain_get_chain_from_group_name(dap_chain_net_id_t a_net_id, const char *a_group_name); void dap_stream_ch_chain_create_sync_request_gdb(dap_stream_ch_chain_t * a_ch_chain, dap_chain_net_t * a_net); void dap_stream_ch_chain_timer_start(dap_stream_ch_chain_t *a_ch_chain); +void dap_stream_ch_chain_reset(dap_stream_ch_chain_t *a_ch_chain); diff --git a/modules/net/srv/include/dap_chain_net_srv.h b/modules/net/srv/include/dap_chain_net_srv.h index 5f9686769e..2273807c23 100755 --- a/modules/net/srv/include/dap_chain_net_srv.h +++ b/modules/net/srv/include/dap_chain_net_srv.h @@ -170,7 +170,7 @@ typedef struct dap_stream_ch_chain_net_srv_pkt_test { dap_chain_net_srv_uid_t srv_uid; int32_t time_connect_ms; struct timeval recv_time1; - struct timeval recv_time2; + struct timespec recv_time2; struct timeval send_time1; struct timeval send_time2; byte_t ip_send[16]; -- GitLab