Skip to content
Snippets Groups Projects
Commit d6f8a05f authored by Aleksandr Lysikov's avatar Aleksandr Lysikov
Browse files

added new packet types

added support of the packets ping & pong
parent 80f0cf1e
No related branches found
No related tags found
No related merge requests found
......@@ -3,7 +3,7 @@
* Dmitriy A. Gearasimov <gerasimov.dmitriy@demlabs.net>
* DeM Labs Inc. https://demlabs.net
* Kelvin Project https://github.com/kelvinblockchain
* Copyright (c) 2017-2018
* Copyright (c) 2017-2019
* All rights reserved.
This file is part of DAP (Deus Applications Prototypes) the open source project
......@@ -22,37 +22,38 @@
along with any DAP based project. If not, see <http://www.gnu.org/licenses/>.
*/
#include <pthread.h>
#include <errno.h>
#include <string.h>
#include "dap_common.h"
#include "dap_http_client.h"
#include "dap_stream.h"
#include "dap_stream_ch.h"
#include "dap_stream_ch_chain_net.h"
#include "dap_stream_ch_pkt.h"
#include "dap_stream_ch_proc.h"
#include "dap_stream_ch_chain_net_pkt.h"
#include "dap_stream_ch_chain_net.h"
#define LOG_TAG "dap_stream_ch_chain_net"
typedef struct dap_stream_ch_chain_net {
pthread_mutex_t mutex;
} dap_stream_ch_chain_net_t;
#define DAP_STREAM_CH_CHAIN_NET(a) ((dap_stream_ch_chain_net_t *) ((a)->internal) )
static void s_stream_ch_new(dap_stream_ch_t* ch , void* arg);
static void s_stream_ch_delete(dap_stream_ch_t* ch , void* arg);
static void s_stream_ch_packet_in(dap_stream_ch_t* ch , void* arg);
static void s_stream_ch_packet_out(dap_stream_ch_t* ch , void* arg);
uint8_t dap_stream_ch_chain_net_get_id()
{
return 'N';
}
/**
* @brief dap_stream_ch_chain_net_init
* @return
*/
int dap_stream_ch_chain_net_init()
{
log_it(L_NOTICE,"Chain network channel initialized");
dap_stream_ch_proc_add('N',s_stream_ch_new,s_stream_ch_delete,s_stream_ch_packet_in,s_stream_ch_packet_out);
log_it(L_NOTICE, "Chain network channel initialized");
dap_stream_ch_proc_add(dap_stream_ch_chain_net_get_id(), s_stream_ch_new, s_stream_ch_delete,
s_stream_ch_packet_in, s_stream_ch_packet_out);
return 0;
}
......@@ -88,14 +89,55 @@ void s_stream_ch_delete(dap_stream_ch_t* ch , void* arg)
}
static int debug_count = 0;
/**
* @brief s_stream_ch_packet_in
* @param ch
* @param arg
*/
void s_stream_ch_packet_in(dap_stream_ch_t* ch , void* arg)
void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
{
dap_stream_ch_chain_net_t * l_ch_chain = DAP_STREAM_CH_CHAIN_NET(a_ch);
if(l_ch_chain) {
dap_stream_ch_pkt_t *l_ch_pkt = (dap_stream_ch_pkt_t *) a_arg;
dap_stream_ch_chain_net_pkt_t *l_chain_pkt = (dap_stream_ch_chain_net_pkt_t *) l_ch_pkt->data;
if(l_chain_pkt) {
size_t data_size = l_ch_pkt->hdr.size - sizeof(dap_stream_ch_chain_net_pkt_hdr_t);
printf("*packet TYPE=%d data_size=%d in=%s\n", l_chain_pkt->hdr.type, data_size,
(data_size > 0) ? (char*) (l_chain_pkt->data) : "-");
switch (l_chain_pkt->hdr.type) {
case STREAM_CH_CHAIN_NET_PKT_TYPE_DBG: {
dap_stream_ch_chain_net_pkt_write(a_ch, STREAM_CH_CHAIN_NET_PKT_TYPE_PING, NULL, 0);
dap_stream_ch_set_ready_to_write(a_ch, true);
}
break;
case STREAM_CH_CHAIN_NET_PKT_TYPE_PING: {
log_it(L_INFO,"Get STREAM_CH_CHAIN_NET_PKT_TYPE_PING");
int l_res = dap_stream_ch_chain_net_pkt_write(a_ch, STREAM_CH_CHAIN_NET_PKT_TYPE_PONG, NULL, 0);
dap_stream_ch_set_ready_to_write(a_ch, true);
}
break;
case STREAM_CH_CHAIN_NET_PKT_TYPE_PONG: {
log_it(L_INFO,"Get STREAM_CH_CHAIN_NET_PKT_TYPE_PONG");
dap_stream_ch_set_ready_to_write(a_ch, false);
}
break;
case STREAM_CH_CHAIN_NET_PKT_TYPE_GLOVAL_DB: {
}
break;
case STREAM_CH_CHAIN_NET_PKT_TYPE_GLOBAL_DB_REQUEST_SYNC: {
// должен сообщать последнюю ревизию global_db у ноды-клиента,
// чем инициировать у ноды-сервера процесс отправки транзиция
}
break;
}
if(l_ch_chain->notify_callback)
l_ch_chain->notify_callback(l_chain_pkt, l_ch_chain->notify_callback_arg);
}
}
debug_count++;
}
/**
......@@ -103,7 +145,8 @@ void s_stream_ch_packet_in(dap_stream_ch_t* ch , void* arg)
* @param ch
* @param arg
*/
void s_stream_ch_packet_out(dap_stream_ch_t* ch , void* arg)
void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg)
{
printf("*packet out count=%d\n", debug_count);
dap_stream_ch_set_ready_to_write(a_ch, false);
}
......@@ -24,5 +24,20 @@
#pragma once
#include <pthread.h>
#include <stdint.h>
#include "dap_stream_ch_chain_net_pkt.h"
typedef void (*dap_stream_ch_chain_net_callback_t)(dap_stream_ch_chain_net_pkt_t*, void*);
typedef struct dap_stream_ch_chain_net {
pthread_mutex_t mutex;
dap_stream_ch_chain_net_callback_t notify_callback;
void *notify_callback_arg;
} dap_stream_ch_chain_net_t;
#define DAP_STREAM_CH_CHAIN_NET(a) ((dap_stream_ch_chain_net_t *) ((a)->internal) )
uint8_t dap_stream_ch_chain_net_get_id();
int dap_stream_ch_chain_net_init();
void dap_stream_ch_chain_net_deinit();
#include <stdarg.h>
#include <stdio.h>
#include <string.h>
#include <dap_common.h>
#include <dap_stream.h>
#include <dap_stream_pkt.h>
#include <dap_stream_ch_pkt.h>
#include "dap_stream_ch_chain_pkt.h"
#include "dap_stream_ch_chain_net.h"
#include "dap_stream_ch_chain_net_pkt.h"
#define LOG_TAG "dap_stream_ch_chain_net_pkt"
/**
* @brief dap_stream_ch_net_pkt_write
* @param sid
* @param data
* @param data_size
* @return
*/
size_t dap_stream_ch_chain_net_pkt_write(dap_stream_ch_t *a_ch, uint8_t a_type,
const void * a_data, uint32_t a_data_size)
{
dap_stream_ch_chain_net_pkt_t l_hdr;
memset(&l_hdr, 0, sizeof(l_hdr));
l_hdr.hdr.type = a_type;
size_t l_buf_size = sizeof(l_hdr) + a_data_size;
char *l_buf = DAP_NEW_SIZE(char, l_buf_size);
memcpy(l_buf, &l_hdr, sizeof(l_hdr));
memcpy(l_buf + sizeof(l_hdr), a_data, a_data_size);
int l_ret = dap_stream_ch_pkt_write(a_ch, STREAM_CH_CHAIN_PKT_TYPE_REQUEST, l_buf, l_buf_size);
DAP_DELETE(l_buf);
return l_ret;
}
/**
* @brief dap_stream_ch_chain_net_pkt_write_f
* @param sid
* @param str
* @return
*/
size_t dap_stream_ch_chain_net_pkt_write_f(dap_stream_ch_t *a_ch, uint8_t a_type, const char *a_str, ...)
{
char l_buf[4096];
va_list ap;
va_start(ap, a_str);
vsnprintf(l_buf, sizeof(l_buf), a_str, ap);
va_end(ap);
size_t ret = dap_stream_ch_chain_net_pkt_write(a_ch, a_type, l_buf, strlen(l_buf));
return ret;
}
......@@ -29,8 +29,15 @@
#include "dap_chain_net.h"
#include "dap_chain_node.h"
#define STREAM_CH_CHAIN_NET_PKT_TYPE_PING 0x0000
#define STREAM_CH_CHAIN_NET_PKT_TYPE_PONG 0x0001
#define STREAM_CH_CHAIN_NET_PKT_TYPE_REQUEST 0x00
#define STREAM_CH_CHAIN_NET_PKT_TYPE_PING 0x01
#define STREAM_CH_CHAIN_NET_PKT_TYPE_PONG 0x02
#define STREAM_CH_CHAIN_NET_PKT_TYPE_BLOCK 0x11
#define STREAM_CH_CHAIN_NET_PKT_TYPE_DATUM 0x12
#define STREAM_CH_CHAIN_NET_PKT_TYPE_GLOVAL_DB 0x13
#define STREAM_CH_CHAIN_NET_PKT_TYPE_GLOBAL_DB_REQUEST_SYNC 0x14
#define STREAM_CH_CHAIN_NET_PKT_TYPE_DBG 0x99
typedef struct stream_ch_chain_net_pkt_hdr{
dap_chain_net_id_t net_id;
......@@ -46,3 +53,6 @@ typedef struct dap_stream_ch_chain_net_pkt{
uint8_t data[];
} __attribute__((packed)) dap_stream_ch_chain_net_pkt_t;
size_t dap_stream_ch_chain_net_pkt_write(dap_stream_ch_t *a_ch, uint8_t a_type, const void * a_data, uint32_t a_data_size);
size_t dap_stream_ch_chain_net_pkt_write_f(dap_stream_ch_t *a_ch, uint8_t a_type, const char *a_str, ...);
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment