From bf04e32db1ea2d2bc293633e6df6c6ad35f07e73 Mon Sep 17 00:00:00 2001 From: Aleksandr Lysikov <lysikov@inbox.ru> Date: Tue, 9 Apr 2019 23:05:20 +0500 Subject: [PATCH] added commands STREAM_CH_CHAIN_NET_PKT_TYPE_SET_NODE_ADDR, STREAM_CH_CHAIN_NET_PKT_TYPE_GET_NODE_ADDR --- dap_stream_ch_chain_net.c | 214 +++++++++++++++++++++++++++------- dap_stream_ch_chain_net.h | 6 +- dap_stream_ch_chain_net_pkt.h | 2 + 3 files changed, 175 insertions(+), 47 deletions(-) diff --git a/dap_stream_ch_chain_net.c b/dap_stream_ch_chain_net.c index c258892..ed11b5b 100644 --- a/dap_stream_ch_chain_net.c +++ b/dap_stream_ch_chain_net.c @@ -27,9 +27,11 @@ #include <pthread.h> #include "dap_common.h" +#include "dap_strfuncs.h" #include "uthash.h" #include "dap_http_client.h" #include "dap_chain_global_db.h" +#include "dap_chain_global_db_remote.h" #include "dap_stream.h" #include "dap_stream_ch_pkt.h" #include "dap_stream_ch_proc.h" @@ -45,7 +47,8 @@ static void s_stream_ch_packet_out(dap_stream_ch_t* ch, void* arg); typedef struct session_data { unsigned int id; - int sock; + //int sock; + int message_id; uint16_t type; // data type time_t timestamp_start; time_t timestamp_cur; @@ -60,8 +63,6 @@ typedef struct message_data { time_t timestamp_start; uint64_t addr_from; // node addr uint64_t addr_to; // node addr -//int16_t cmd; - } message_data_t; // list of active sessions @@ -69,18 +70,37 @@ static session_data_t *s_chain_net_data = NULL; // for separate access to session_data_t static pthread_mutex_t s_hash_mutex = PTHREAD_MUTEX_INITIALIZER; +// create packet to send uint8_t* dap_stream_ch_chain_net_make_packet(uint64_t a_node_addr_from, uint64_t a_node_addr_to, - char *a_timestamp_start_str, size_t *a_data_len_out) + time_t a_timestamp_start, uint8_t *a_sdata, size_t a_sdata_len, size_t *a_data_len_out) { - message_data_t *l_data = DAP_NEW_Z(message_data_t); - l_data->timestamp_start = a_timestamp_start_str ? (time_t) strtoll(a_timestamp_start_str, NULL, 10) : 0; + //message_data_t *l_data = DAP_NEW_Z(message_data_t); + message_data_t *l_data = DAP_NEW_Z_SIZE(message_data_t, sizeof(message_data_t) + a_sdata_len); + l_data->timestamp_start = a_timestamp_start; l_data->addr_from = a_node_addr_from; l_data->addr_to = a_node_addr_to; - *a_data_len_out = sizeof(message_data_t); + // copy add data + memcpy(l_data + 1, a_sdata, a_sdata_len); + *a_data_len_out = sizeof(message_data_t) + a_sdata_len; return (uint8_t*) l_data; } -static void session_data_update(unsigned int a_id, dap_list_t *a_list, message_data_t *a_data, time_t a_timestamp_cur) +// parse received packet +static const message_data_t *dap_stream_ch_chain_net_parse_packet(uint8_t* a_data, size_t a_data_len, + const uint8_t **a_add_data_out, size_t *a_add_data_len_out) +{ + if(a_data_len < sizeof(message_data_t)) + return NULL; + message_data_t *l_data = (message_data_t*) a_data; + if(a_add_data_out) + *a_add_data_out = (uint8_t*) (l_data + 1); + if(a_add_data_len_out) + *a_add_data_len_out = a_data_len - sizeof(message_data_t); + return l_data; +} + +static void session_data_update(unsigned int a_id, int a_messsage_id, dap_list_t *a_list, message_data_t *a_data, + time_t a_timestamp_cur) { session_data_t *l_sdata; pthread_mutex_lock(&s_hash_mutex); @@ -90,6 +110,8 @@ static void session_data_update(unsigned int a_id, dap_list_t *a_list, message_d l_sdata->id = a_id; HASH_ADD_INT(s_chain_net_data, id, l_sdata); } + if(a_messsage_id != -1) + l_sdata->message_id = a_messsage_id; l_sdata->list_tr = a_list; @@ -161,7 +183,7 @@ int dap_stream_ch_chain_net_init() */ void dap_stream_ch_chain_net_deinit() { - printf("* del all sessions\n"); + //printf("* del all sessions\n"); session_data_del_all(); } @@ -174,6 +196,7 @@ void s_stream_ch_new(dap_stream_ch_t* a_ch, void* a_arg) { a_ch->internal = DAP_NEW_Z(dap_stream_ch_chain_net_t); dap_stream_ch_chain_net_t * l_ch_chain_net = DAP_STREAM_CH_CHAIN_NET(a_ch); + l_ch_chain_net->ch = a_ch; pthread_mutex_init(&l_ch_chain_net->mutex, NULL); } @@ -184,7 +207,7 @@ void s_stream_ch_new(dap_stream_ch_t* a_ch, void* a_arg) */ void s_stream_ch_delete(dap_stream_ch_t* a_ch, void* a_arg) { - printf("* del session=%d\n", a_ch->stream->session->id); + //printf("* del session=%d\n", a_ch->stream->session->id); dap_stream_ch_chain_net_t * l_ch_chain_net = DAP_STREAM_CH_CHAIN_NET(a_ch); pthread_mutex_lock(&l_ch_chain_net->mutex); session_data_del(a_ch->stream->session->id); @@ -205,8 +228,8 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) dap_stream_ch_chain_net_pkt_t *l_chain_pkt = (dap_stream_ch_chain_net_pkt_t *) l_ch_pkt->data; if(l_chain_pkt) { - size_t data_size = l_ch_pkt->hdr.size - sizeof(dap_stream_ch_chain_net_pkt_hdr_t); - printf("*packet TYPE=%d data_size=%d\n", l_chain_pkt->hdr.type, data_size); + size_t l_ch_pkt_data_size = l_ch_pkt->hdr.size - sizeof(dap_stream_ch_chain_net_pkt_hdr_t); + //printf("*packet TYPE=%d data_size=%d\n", l_chain_pkt->hdr.type, l_ch_pkt_data_size); //(data_size > 0) ? (char*) (l_chain_pkt->data) : "-"); switch (l_chain_pkt->hdr.type) { @@ -226,22 +249,73 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) case STREAM_CH_CHAIN_NET_PKT_TYPE_PONG: { log_it(L_INFO, "Get STREAM_CH_CHAIN_NET_PKT_TYPE_PONG"); dap_stream_ch_set_ready_to_write(a_ch, false); + } + break; + // get node address + case STREAM_CH_CHAIN_NET_PKT_TYPE_GET_NODE_ADDR: { + log_it(L_INFO, "Get STREAM_CH_CHAIN_NET_PKT_TYPE_GET_NODE_ADDR"); + dap_stream_ch_set_ready_to_write(a_ch, false); + } + break; + // set new node address + case STREAM_CH_CHAIN_NET_PKT_TYPE_SET_NODE_ADDR: { + log_it(L_INFO, "Get STREAM_CH_CHAIN_NET_PKT_TYPE_SET_NODE_ADDR"); + { + uint64_t l_addr = 0; + // set cur node addr + if(l_ch_pkt_data_size == sizeof(uint64_t)) { + memcpy(&l_addr, l_chain_pkt->data, sizeof(uint64_t)); + dap_db_set_cur_node_addr(l_addr); + } + } + session_data_update(a_ch->stream->session->id, STREAM_CH_CHAIN_NET_PKT_TYPE_SET_NODE_ADDR, + NULL, NULL, -1); + dap_stream_ch_set_ready_to_write(a_ch, true); } break; case STREAM_CH_CHAIN_NET_PKT_TYPE_GLOVAL_DB: { - log_it(L_INFO, "Get STREAM_CH_CHAIN_NET_PKT_TYPE_GLOVAL_DB"); + log_it(L_INFO, "Get STREAM_CH_CHAIN_NET_PKT_TYPE_GLOVAL_DB data_size=%d", l_ch_pkt_data_size); // get transaction and save it to global_db - if(data_size > 0) { - int l_data_obj_count = 0; - // deserialize data - void *l_data_obj = dap_db_log_unpack(l_chain_pkt->data, data_size, &l_data_obj_count); // Parse data from dap_db_log_pack() - // save data to global_db - if(!dap_chain_global_db_obj_save(l_data_obj, l_data_obj_count)) - log_it(L_ERROR, "Don't saved to global_db objs=0x%x count=%d",l_data_obj, l_data_obj_count); - + if(l_ch_pkt_data_size > 0) { + + // parse received packet + size_t l_data_size = 0; + const uint8_t *l_data = NULL; + const message_data_t *l_mdata = dap_stream_ch_chain_net_parse_packet(l_chain_pkt->data, + l_ch_pkt_data_size, + &l_data, &l_data_size); + + /*dap_chain_node_addr_t l_node_cur; + dap_chain_node_addr_t l_node_remote; + uint8_t *l_recv_data = l_chain_pkt->data; //DAP_NEW_SIZE(uint8_t, l_item_size_out + 2 * sizeof(dap_chain_node_addr_t)); + memcpy(&l_node_remote, l_recv_data, sizeof(dap_chain_node_addr_t)); + memcpy(&l_node_cur, l_recv_data + sizeof(dap_chain_node_addr_t), sizeof(dap_chain_node_addr_t)); + uint8_t *l_mdata = l_recv_data + 2 * sizeof(dap_chain_node_addr_t); + l_ch_pkt_data_size -= 2 * sizeof(dap_chain_node_addr_t);*/ + + if(l_data && l_data_size > 0) { + //session_data_t *l_data = session_data_find(a_ch->stream->session->id); + int l_data_obj_count = 0; + + // deserialize data + void *l_data_obj = dap_db_log_unpack((uint8_t*) l_data, l_data_size, &l_data_obj_count); // Parse data from dap_db_log_pack() + // save data to global_db + if(!dap_chain_global_db_obj_save(l_data_obj, l_data_obj_count)) { + log_it(L_ERROR, "Don't saved to global_db objs=0x%x count=%d", l_data_obj, + l_data_obj_count); + } + else { + // Get remote timestamp + time_t l_timestamp_remote = dap_db_log_unpack_get_timestamp((uint8_t*) l_data, l_data_size); + // set new timestamp (saved data) for remote node + dap_db_log_set_last_timestamp_remote(l_mdata->addr_from, l_timestamp_remote); + //printf("***ts=%llu\n", l_timestamp_remote); + } + } + dap_stream_ch_set_ready_to_write(a_ch, false); } - dap_stream_ch_set_ready_to_write(a_ch, false); + /*// go to data transfer mode else if(!data_size) { @@ -256,18 +330,34 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) // receive the latest global_db revision of the remote node -> go to send mode case STREAM_CH_CHAIN_NET_PKT_TYPE_GLOBAL_DB_REQUEST_SYNC: { - if(data_size == sizeof(message_data_t)) { - message_data_t *a_data = (message_data_t*) l_chain_pkt->data; - - char *l_timestamp_cur_str = dap_db_log_get_last_timestamp(); - // Get log diff - dap_list_t *l_list = dap_db_log_get_list(a_data->timestamp_start); - - session_data_update(a_ch->stream->session->id, l_list, a_data, -1); + if(l_ch_pkt_data_size == sizeof(message_data_t)) { + + // parse received packet + const message_data_t *l_data = dap_stream_ch_chain_net_parse_packet(l_chain_pkt->data, + l_ch_pkt_data_size, + NULL, NULL); + if(l_data) { + //message_data_t *l_data = (message_data_t*) l_chain_pkt->data; + + //time_t l_timestamp_remote_get = l_data->timestamp_start; + //time_t l_timestamp_remote_cur = dap_db_log_get_last_timestamp_remote(l_data->addr_from); + + // last timestamp for remote node + time_t l_timestamp_remote_saved = l_data->timestamp_start; //min(l_timestamp_remote_get, l_timestamp_remote_cur); + + // Get log diff + dap_list_t *l_list = dap_db_log_get_list(l_timestamp_remote_saved); + session_data_update(a_ch->stream->session->id, + STREAM_CH_CHAIN_NET_PKT_TYPE_GLOBAL_DB_REQUEST_SYNC, + l_list, (message_data_t*) l_data, -1); + + log_it(L_INFO, + "Get STREAM_CH_CHAIN_NET_PKT_TYPE_GLOBAL_DB_REQUEST_SYNC session_id=%u from 0x%llx to 0x%llx count=%d", + a_ch->stream->session->id, l_data->addr_from, l_data->addr_to, dap_list_length(l_list)); + } + // go to send data from list [in s_stream_ch_packet_out()] + // no data to send -> send one empty message STREAM_CH_CHAIN_NET_PKT_TYPE_GLOBAL_DB_REQUEST_SYNC dap_stream_ch_set_ready_to_write(a_ch, true); - log_it(L_INFO, - "Get STREAM_CH_CHAIN_NET_PKT_TYPE_GLOBAL_DB_REQUEST_SYNC session_id=%u from 0x%llx to 0x%llx", - a_ch->stream->session->id, a_data->addr_from, a_data->addr_to); } else { log_it(L_ERROR, "Get STREAM_CH_CHAIN_NET_PKT_TYPE_GLOBAL_DB_REQUEST_SYNC session_id=%u bad request", @@ -277,8 +367,20 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) } break; } - if(l_ch_chain_net->notify_callback) - l_ch_chain_net->notify_callback(l_chain_pkt, l_ch_chain_net->notify_callback_arg); + if(l_ch_chain_net->notify_callback) { + if(l_chain_pkt->hdr.type == STREAM_CH_CHAIN_NET_PKT_TYPE_GLOBAL_DB_REQUEST_SYNC) { + session_data_t *l_data = session_data_find(a_ch->stream->session->id); + // end of session + if(!l_data->list_tr) + l_ch_chain_net->notify_callback(NULL, l_ch_pkt_data_size, l_ch_chain_net->notify_callback_arg); + else + l_ch_chain_net->notify_callback(l_chain_pkt, l_ch_pkt_data_size, + l_ch_chain_net->notify_callback_arg); + } + else + l_ch_chain_net->notify_callback(l_chain_pkt, l_ch_pkt_data_size, + l_ch_chain_net->notify_callback_arg); + } } pthread_mutex_unlock(&l_ch_chain_net->mutex); } @@ -295,18 +397,32 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) pthread_mutex_lock(&l_ch_chain_net->mutex); session_data_t *l_data = session_data_find(a_ch->stream->session->id); - printf("*packet out session_id=%u\n", a_ch->stream->session->id); + //printf("*packet out session_id=%u\n", a_ch->stream->session->id); if(!l_data) { log_it(L_WARNING, "if packet_out() l_data=NULL"); dap_stream_ch_set_ready_to_write(a_ch, false); return; } + if(l_data->message_id == STREAM_CH_CHAIN_NET_PKT_TYPE_SET_NODE_ADDR) { + // get cur node addr + uint64_t l_addr = dap_db_get_cur_node_addr(); + size_t l_send_data_len = sizeof(uint64_t); + // send cur node addr + dap_stream_ch_chain_net_pkt_write(a_ch, STREAM_CH_CHAIN_NET_PKT_TYPE_GET_NODE_ADDR, &l_addr, l_send_data_len); + + pthread_mutex_unlock(&l_ch_chain_net->mutex); + dap_stream_ch_set_ready_to_write(a_ch, false); + return; + } + + dap_chain_node_addr_t node_cur; + // Get log diff size_t l_data_size_out = 0; dap_list_t *l_list = l_data->list_tr; int len = dap_list_length(l_list); - printf("*len=%d\n", len); + //printf("*len=%d\n", len); if(l_list) { int l_item_size_out = 0; uint8_t *l_item = NULL; @@ -318,25 +434,34 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) l_list = dap_list_delete_link(l_list, l_list); } } - dap_stream_ch_chain_net_pkt_write(a_ch, STREAM_CH_CHAIN_NET_PKT_TYPE_GLOVAL_DB, l_item, l_item_size_out); + + size_t l_send_data_len = 0; + uint8_t *l_send_data = dap_stream_ch_chain_net_make_packet(l_data->node_cur.uint64, l_data->node_remote.uint64, + 0, l_item, l_item_size_out, &l_send_data_len); + + dap_stream_ch_chain_net_pkt_write(a_ch, STREAM_CH_CHAIN_NET_PKT_TYPE_GLOVAL_DB, l_send_data, l_send_data_len); + DAP_DELETE(l_send_data); // remove current item from list dap_chain_global_db_obj_delete((dap_global_db_obj_t *) l_list->data); l_list = dap_list_delete_link(l_list, l_list); - session_data_update(a_ch->stream->session->id, l_list, NULL, -1); + session_data_update(a_ch->stream->session->id, -1, l_list, NULL, -1); } // last message if(!l_list) { // send request size_t l_data_size_out = 0; - // Get last timestamp in log - char *l_timestamp_start_str = dap_db_log_get_last_timestamp(); + // Get current last timestamp in log + //time_t l_timestamp_remote_saved = dap_db_log_get_last_timestamp(); + + // get remote last timestamp (saved data) for remote node + time_t l_timestamp_remote_saved = dap_db_log_get_last_timestamp_remote(l_data->node_remote.uint64); + size_t l_data_send_len = 0; uint8_t *l_data_send = dap_stream_ch_chain_net_make_packet(l_data->node_cur.uint64, l_data->node_remote.uint64, - l_timestamp_start_str, &l_data_send_len); - DAP_DELETE(l_timestamp_start_str); + l_timestamp_remote_saved, NULL, 0, &l_data_send_len); dap_stream_ch_chain_net_pkt_write(a_ch, STREAM_CH_CHAIN_NET_PKT_TYPE_GLOBAL_DB_REQUEST_SYNC, l_data_send, l_data_send_len); @@ -344,8 +469,7 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) l_data = NULL; } - int l_res = 0; //dap_stream_ch_chain_net_pkt_write(a_c h, STREAM_CH_CHAIN_NET_PKT_TYPE_GLOVAL_DB, l_str, sizeof(l_str)); - // session_data_update(a_ch->stream->session->id, l_timestamp_start); + int l_res = 0; // end of session if(!l_list) diff --git a/dap_stream_ch_chain_net.h b/dap_stream_ch_chain_net.h index d5c1b7f..9279034 100644 --- a/dap_stream_ch_chain_net.h +++ b/dap_stream_ch_chain_net.h @@ -28,17 +28,19 @@ #include <stdint.h> #include "dap_stream_ch_chain_net_pkt.h" -typedef void (*dap_stream_ch_chain_net_callback_t)(dap_stream_ch_chain_net_pkt_t*, void*); +typedef void (*dap_stream_ch_chain_net_callback_t)(dap_stream_ch_chain_net_pkt_t *a_pkt, size_t a_pkt_data_size, void *arg); typedef struct dap_stream_ch_chain_net { pthread_mutex_t mutex; dap_stream_ch_chain_net_callback_t notify_callback; + dap_stream_ch_t *ch; void *notify_callback_arg; } dap_stream_ch_chain_net_t; #define DAP_STREAM_CH_CHAIN_NET(a) ((dap_stream_ch_chain_net_t *) ((a)->internal) ) uint8_t dap_stream_ch_chain_net_get_id(); -uint8_t* dap_stream_ch_chain_net_make_packet(uint64_t a_node_addr_from, uint64_t a_node_addr_to, char *a_timestamp_start_str, size_t *a_data_len_out); +uint8_t* dap_stream_ch_chain_net_make_packet(uint64_t a_node_addr_from, uint64_t a_node_addr_to, + time_t a_timestamp_start, uint8_t *a_sdata, size_t a_sdata_len, size_t *a_data_len_out); int dap_stream_ch_chain_net_init(); void dap_stream_ch_chain_net_deinit(); diff --git a/dap_stream_ch_chain_net_pkt.h b/dap_stream_ch_chain_net_pkt.h index 78b4246..b05ebd4 100644 --- a/dap_stream_ch_chain_net_pkt.h +++ b/dap_stream_ch_chain_net_pkt.h @@ -37,6 +37,8 @@ #define STREAM_CH_CHAIN_NET_PKT_TYPE_DATUM 0x12 #define STREAM_CH_CHAIN_NET_PKT_TYPE_GLOVAL_DB 0x13 #define STREAM_CH_CHAIN_NET_PKT_TYPE_GLOBAL_DB_REQUEST_SYNC 0x14 +#define STREAM_CH_CHAIN_NET_PKT_TYPE_GET_NODE_ADDR 0x15 +#define STREAM_CH_CHAIN_NET_PKT_TYPE_SET_NODE_ADDR 0x16 #define STREAM_CH_CHAIN_NET_PKT_TYPE_DBG 0x99 typedef struct stream_ch_chain_net_pkt_hdr{ -- GitLab