From d6f8a05f1060ea12841745663aa9a04409341348 Mon Sep 17 00:00:00 2001 From: Aleksandr Lysikov <lysikov@inbox.ru> Date: Sat, 23 Mar 2019 23:35:46 +0500 Subject: [PATCH] added new packet types added support of the packets ping & pong --- dap_stream_ch_chain_net.c | 75 +++++++++++++++++++++++++++-------- dap_stream_ch_chain_net.h | 15 +++++++ dap_stream_ch_chain_net_pkt.c | 51 ++++++++++++++++++++++++ dap_stream_ch_chain_net_pkt.h | 14 ++++++- 4 files changed, 137 insertions(+), 18 deletions(-) diff --git a/dap_stream_ch_chain_net.c b/dap_stream_ch_chain_net.c index db14f8c..945d57b 100644 --- a/dap_stream_ch_chain_net.c +++ b/dap_stream_ch_chain_net.c @@ -3,7 +3,7 @@ * Dmitriy A. Gearasimov <gerasimov.dmitriy@demlabs.net> * DeM Labs Inc. https://demlabs.net * Kelvin Project https://github.com/kelvinblockchain - * Copyright (c) 2017-2018 + * Copyright (c) 2017-2019 * All rights reserved. This file is part of DAP (Deus Applications Prototypes) the open source project @@ -22,37 +22,38 @@ along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. */ -#include <pthread.h> #include <errno.h> #include <string.h> #include "dap_common.h" +#include "dap_http_client.h" #include "dap_stream.h" -#include "dap_stream_ch.h" -#include "dap_stream_ch_chain_net.h" +#include "dap_stream_ch_pkt.h" #include "dap_stream_ch_proc.h" +#include "dap_stream_ch_chain_net_pkt.h" +#include "dap_stream_ch_chain_net.h" #define LOG_TAG "dap_stream_ch_chain_net" -typedef struct dap_stream_ch_chain_net { - pthread_mutex_t mutex; -} dap_stream_ch_chain_net_t; - -#define DAP_STREAM_CH_CHAIN_NET(a) ((dap_stream_ch_chain_net_t *) ((a)->internal) ) - static void s_stream_ch_new(dap_stream_ch_t* ch , void* arg); static void s_stream_ch_delete(dap_stream_ch_t* ch , void* arg); static void s_stream_ch_packet_in(dap_stream_ch_t* ch , void* arg); static void s_stream_ch_packet_out(dap_stream_ch_t* ch , void* arg); +uint8_t dap_stream_ch_chain_net_get_id() +{ + return 'N'; +} + /** * @brief dap_stream_ch_chain_net_init * @return */ int dap_stream_ch_chain_net_init() { - log_it(L_NOTICE,"Chain network channel initialized"); - dap_stream_ch_proc_add('N',s_stream_ch_new,s_stream_ch_delete,s_stream_ch_packet_in,s_stream_ch_packet_out); + log_it(L_NOTICE, "Chain network channel initialized"); + dap_stream_ch_proc_add(dap_stream_ch_chain_net_get_id(), s_stream_ch_new, s_stream_ch_delete, + s_stream_ch_packet_in, s_stream_ch_packet_out); return 0; } @@ -88,14 +89,55 @@ void s_stream_ch_delete(dap_stream_ch_t* ch , void* arg) } +static int debug_count = 0; + /** * @brief s_stream_ch_packet_in * @param ch * @param arg */ -void s_stream_ch_packet_in(dap_stream_ch_t* ch , void* arg) +void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) { - + dap_stream_ch_chain_net_t * l_ch_chain = DAP_STREAM_CH_CHAIN_NET(a_ch); + if(l_ch_chain) { + dap_stream_ch_pkt_t *l_ch_pkt = (dap_stream_ch_pkt_t *) a_arg; + dap_stream_ch_chain_net_pkt_t *l_chain_pkt = (dap_stream_ch_chain_net_pkt_t *) l_ch_pkt->data; + if(l_chain_pkt) { + size_t data_size = l_ch_pkt->hdr.size - sizeof(dap_stream_ch_chain_net_pkt_hdr_t); + printf("*packet TYPE=%d data_size=%d in=%s\n", l_chain_pkt->hdr.type, data_size, + (data_size > 0) ? (char*) (l_chain_pkt->data) : "-"); + + switch (l_chain_pkt->hdr.type) { + case STREAM_CH_CHAIN_NET_PKT_TYPE_DBG: { + dap_stream_ch_chain_net_pkt_write(a_ch, STREAM_CH_CHAIN_NET_PKT_TYPE_PING, NULL, 0); + dap_stream_ch_set_ready_to_write(a_ch, true); + } + break; + case STREAM_CH_CHAIN_NET_PKT_TYPE_PING: { + log_it(L_INFO,"Get STREAM_CH_CHAIN_NET_PKT_TYPE_PING"); + int l_res = dap_stream_ch_chain_net_pkt_write(a_ch, STREAM_CH_CHAIN_NET_PKT_TYPE_PONG, NULL, 0); + dap_stream_ch_set_ready_to_write(a_ch, true); + } + break; + case STREAM_CH_CHAIN_NET_PKT_TYPE_PONG: { + log_it(L_INFO,"Get STREAM_CH_CHAIN_NET_PKT_TYPE_PONG"); + dap_stream_ch_set_ready_to_write(a_ch, false); + } + break; + case STREAM_CH_CHAIN_NET_PKT_TYPE_GLOVAL_DB: { + } + break; + case STREAM_CH_CHAIN_NET_PKT_TYPE_GLOBAL_DB_REQUEST_SYNC: { + // должен Ñообщать поÑледнюю ревизию global_db у ноды-клиента, + // чем инициировать у ноды-Ñервера процеÑÑ Ð¾Ñ‚Ð¿Ñ€Ð°Ð²ÐºÐ¸ Ñ‚Ñ€Ð°Ð½Ð·Ð¸Ñ†Ð¸Ñ + } + break; + } + if(l_ch_chain->notify_callback) + l_ch_chain->notify_callback(l_chain_pkt, l_ch_chain->notify_callback_arg); + } + } + debug_count++; } /** @@ -103,7 +145,8 @@ void s_stream_ch_packet_in(dap_stream_ch_t* ch , void* arg) * @param ch * @param arg */ -void s_stream_ch_packet_out(dap_stream_ch_t* ch , void* arg) +void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) { - + printf("*packet out count=%d\n", debug_count); + dap_stream_ch_set_ready_to_write(a_ch, false); } diff --git a/dap_stream_ch_chain_net.h b/dap_stream_ch_chain_net.h index e3b5144..cc87bd3 100644 --- a/dap_stream_ch_chain_net.h +++ b/dap_stream_ch_chain_net.h @@ -24,5 +24,20 @@ #pragma once +#include <pthread.h> +#include <stdint.h> +#include "dap_stream_ch_chain_net_pkt.h" + +typedef void (*dap_stream_ch_chain_net_callback_t)(dap_stream_ch_chain_net_pkt_t*, void*); + +typedef struct dap_stream_ch_chain_net { + pthread_mutex_t mutex; + dap_stream_ch_chain_net_callback_t notify_callback; + void *notify_callback_arg; +} dap_stream_ch_chain_net_t; + +#define DAP_STREAM_CH_CHAIN_NET(a) ((dap_stream_ch_chain_net_t *) ((a)->internal) ) + +uint8_t dap_stream_ch_chain_net_get_id(); int dap_stream_ch_chain_net_init(); void dap_stream_ch_chain_net_deinit(); diff --git a/dap_stream_ch_chain_net_pkt.c b/dap_stream_ch_chain_net_pkt.c index e7cc184..ea44459 100644 --- a/dap_stream_ch_chain_net_pkt.c +++ b/dap_stream_ch_chain_net_pkt.c @@ -1 +1,52 @@ +#include <stdarg.h> +#include <stdio.h> +#include <string.h> + +#include <dap_common.h> +#include <dap_stream.h> +#include <dap_stream_pkt.h> +#include <dap_stream_ch_pkt.h> +#include "dap_stream_ch_chain_pkt.h" #include "dap_stream_ch_chain_net.h" +#include "dap_stream_ch_chain_net_pkt.h" + +#define LOG_TAG "dap_stream_ch_chain_net_pkt" + +/** + * @brief dap_stream_ch_net_pkt_write + * @param sid + * @param data + * @param data_size + * @return + */ +size_t dap_stream_ch_chain_net_pkt_write(dap_stream_ch_t *a_ch, uint8_t a_type, + const void * a_data, uint32_t a_data_size) +{ + dap_stream_ch_chain_net_pkt_t l_hdr; + memset(&l_hdr, 0, sizeof(l_hdr)); + l_hdr.hdr.type = a_type; + size_t l_buf_size = sizeof(l_hdr) + a_data_size; + char *l_buf = DAP_NEW_SIZE(char, l_buf_size); + memcpy(l_buf, &l_hdr, sizeof(l_hdr)); + memcpy(l_buf + sizeof(l_hdr), a_data, a_data_size); + int l_ret = dap_stream_ch_pkt_write(a_ch, STREAM_CH_CHAIN_PKT_TYPE_REQUEST, l_buf, l_buf_size); + DAP_DELETE(l_buf); + return l_ret; +} + +/** + * @brief dap_stream_ch_chain_net_pkt_write_f + * @param sid + * @param str + * @return + */ +size_t dap_stream_ch_chain_net_pkt_write_f(dap_stream_ch_t *a_ch, uint8_t a_type, const char *a_str, ...) +{ + char l_buf[4096]; + va_list ap; + va_start(ap, a_str); + vsnprintf(l_buf, sizeof(l_buf), a_str, ap); + va_end(ap); + size_t ret = dap_stream_ch_chain_net_pkt_write(a_ch, a_type, l_buf, strlen(l_buf)); + return ret; +} diff --git a/dap_stream_ch_chain_net_pkt.h b/dap_stream_ch_chain_net_pkt.h index a2bcec1..78b4246 100644 --- a/dap_stream_ch_chain_net_pkt.h +++ b/dap_stream_ch_chain_net_pkt.h @@ -29,8 +29,15 @@ #include "dap_chain_net.h" #include "dap_chain_node.h" -#define STREAM_CH_CHAIN_NET_PKT_TYPE_PING 0x0000 -#define STREAM_CH_CHAIN_NET_PKT_TYPE_PONG 0x0001 + +#define STREAM_CH_CHAIN_NET_PKT_TYPE_REQUEST 0x00 +#define STREAM_CH_CHAIN_NET_PKT_TYPE_PING 0x01 +#define STREAM_CH_CHAIN_NET_PKT_TYPE_PONG 0x02 +#define STREAM_CH_CHAIN_NET_PKT_TYPE_BLOCK 0x11 +#define STREAM_CH_CHAIN_NET_PKT_TYPE_DATUM 0x12 +#define STREAM_CH_CHAIN_NET_PKT_TYPE_GLOVAL_DB 0x13 +#define STREAM_CH_CHAIN_NET_PKT_TYPE_GLOBAL_DB_REQUEST_SYNC 0x14 +#define STREAM_CH_CHAIN_NET_PKT_TYPE_DBG 0x99 typedef struct stream_ch_chain_net_pkt_hdr{ dap_chain_net_id_t net_id; @@ -46,3 +53,6 @@ typedef struct dap_stream_ch_chain_net_pkt{ uint8_t data[]; } __attribute__((packed)) dap_stream_ch_chain_net_pkt_t; +size_t dap_stream_ch_chain_net_pkt_write(dap_stream_ch_t *a_ch, uint8_t a_type, const void * a_data, uint32_t a_data_size); +size_t dap_stream_ch_chain_net_pkt_write_f(dap_stream_ch_t *a_ch, uint8_t a_type, const char *a_str, ...); + -- GitLab