/* * Authors: * Dmitriy A. Gearasimov <gerasimov.dmitriy@demlabs.net> * DeM Labs Inc. https://demlabs.net * Kelvin Project https://github.com/kelvinblockchain * Copyright (c) 2017-2019 * All rights reserved. This file is part of DAP (Deus Applications Prototypes) the open source project DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. DAP is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. */ #include <errno.h> #include <string.h> #include <pthread.h> #include "dap_common.h" #include "dap_strfuncs.h" #include "uthash.h" #include "dap_http_client.h" #include "dap_chain_global_db.h" #include "dap_chain_global_db_remote.h" #include "dap_stream.h" #include "dap_stream_ch_pkt.h" #include "dap_stream_ch_proc.h" #include "dap_stream_ch_chain_net_pkt.h" #include "dap_stream_ch_chain_net.h" #define LOG_TAG "dap_stream_ch_chain_net" static void s_stream_ch_new(dap_stream_ch_t* ch, void* arg); static void s_stream_ch_delete(dap_stream_ch_t* ch, void* arg); static void s_stream_ch_packet_in(dap_stream_ch_t* ch, void* arg); static void s_stream_ch_packet_out(dap_stream_ch_t* ch, void* arg); typedef struct session_data { unsigned int id; //int sock; int message_id; uint16_t type; // data type time_t timestamp_start; time_t timestamp_cur; dap_list_t *list_tr; // list of transactions dap_chain_node_addr_t node_remote; dap_chain_node_addr_t node_cur; UT_hash_handle hh; } session_data_t; typedef struct message_data { time_t timestamp_start; uint64_t addr_from; // node addr uint64_t addr_to; // node addr } message_data_t; // list of active sessions static session_data_t *s_chain_net_data = NULL; // for separate access to session_data_t static pthread_mutex_t s_hash_mutex = PTHREAD_MUTEX_INITIALIZER; // create packet to send uint8_t* dap_stream_ch_chain_net_make_packet(uint64_t a_node_addr_from, uint64_t a_node_addr_to, time_t a_timestamp_start, uint8_t *a_sdata, size_t a_sdata_len, size_t *a_data_len_out) { //message_data_t *l_data = DAP_NEW_Z(message_data_t); message_data_t *l_data = DAP_NEW_Z_SIZE(message_data_t, sizeof(message_data_t) + a_sdata_len); l_data->timestamp_start = a_timestamp_start; l_data->addr_from = a_node_addr_from; l_data->addr_to = a_node_addr_to; // copy add data memcpy(l_data + 1, a_sdata, a_sdata_len); *a_data_len_out = sizeof(message_data_t) + a_sdata_len; return (uint8_t*) l_data; } // parse received packet static const message_data_t *dap_stream_ch_chain_net_parse_packet(uint8_t* a_data, size_t a_data_len, const uint8_t **a_add_data_out, size_t *a_add_data_len_out) { if(a_data_len < sizeof(message_data_t)) return NULL; message_data_t *l_data = (message_data_t*) a_data; if(a_add_data_out) *a_add_data_out = (uint8_t*) (l_data + 1); if(a_add_data_len_out) *a_add_data_len_out = a_data_len - sizeof(message_data_t); return l_data; } static void session_data_update(unsigned int a_id, int a_messsage_id, dap_list_t *a_list, message_data_t *a_data, time_t a_timestamp_cur) { session_data_t *l_sdata; pthread_mutex_lock(&s_hash_mutex); HASH_FIND_INT(s_chain_net_data, &a_id, l_sdata); if(l_sdata == NULL) { l_sdata = DAP_NEW_Z(session_data_t); l_sdata->id = a_id; HASH_ADD_INT(s_chain_net_data, id, l_sdata); } if(a_messsage_id != -1) l_sdata->message_id = a_messsage_id; l_sdata->list_tr = a_list; if(a_data) { l_sdata->timestamp_start = a_data->timestamp_start; l_sdata->node_remote.uint64 = a_data->addr_from; l_sdata->node_cur.uint64 = a_data->addr_to; } if(a_timestamp_cur != (time_t) -1) { l_sdata->timestamp_cur = a_timestamp_cur; } pthread_mutex_unlock(&s_hash_mutex); } static session_data_t* session_data_find(unsigned int a_id) { session_data_t *l_sdata; pthread_mutex_lock(&s_hash_mutex); HASH_FIND_INT(s_chain_net_data, &a_id, l_sdata); pthread_mutex_unlock(&s_hash_mutex); return l_sdata; } static void session_data_del(unsigned int a_id) { session_data_t *l_sdata; pthread_mutex_lock(&s_hash_mutex); HASH_FIND_INT(s_chain_net_data, &a_id, l_sdata); if(l_sdata) { DAP_DELETE(l_sdata); HASH_DEL(s_chain_net_data, l_sdata); } pthread_mutex_unlock(&s_hash_mutex); } static void session_data_del_all() { session_data_t *l_sdata, *l_sdata_tmp; pthread_mutex_lock(&s_hash_mutex); HASH_ITER(hh, s_chain_net_data , l_sdata, l_sdata_tmp) { DAP_DELETE(l_sdata); HASH_DEL(s_chain_net_data, l_sdata); } pthread_mutex_unlock(&s_hash_mutex); } uint8_t dap_stream_ch_chain_net_get_id() { return 'N'; } /** * @brief dap_stream_ch_chain_net_init * @return */ int dap_stream_ch_chain_net_init() { log_it(L_NOTICE, "Chain network channel initialized"); dap_stream_ch_proc_add(dap_stream_ch_chain_net_get_id(), s_stream_ch_new, s_stream_ch_delete, s_stream_ch_packet_in, s_stream_ch_packet_out); return 0; } /** * @brief dap_stream_ch_chain_deinit */ void dap_stream_ch_chain_net_deinit() { //printf("* del all sessions\n"); session_data_del_all(); } /** * @brief s_stream_ch_new * @param a_ch * @param arg */ void s_stream_ch_new(dap_stream_ch_t* a_ch, void* a_arg) { a_ch->internal = DAP_NEW_Z(dap_stream_ch_chain_net_t); dap_stream_ch_chain_net_t * l_ch_chain_net = DAP_STREAM_CH_CHAIN_NET(a_ch); l_ch_chain_net->ch = a_ch; pthread_mutex_init(&l_ch_chain_net->mutex, NULL); } /** * @brief s_stream_ch_delete * @param ch * @param arg */ void s_stream_ch_delete(dap_stream_ch_t* a_ch, void* a_arg) { //printf("* del session=%d\n", a_ch->stream->session->id); dap_stream_ch_chain_net_t * l_ch_chain_net = DAP_STREAM_CH_CHAIN_NET(a_ch); pthread_mutex_lock(&l_ch_chain_net->mutex); session_data_del(a_ch->stream->session->id); pthread_mutex_unlock(&l_ch_chain_net->mutex); } /** * @brief s_stream_ch_packet_in * @param ch * @param arg */ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) { dap_stream_ch_chain_net_t * l_ch_chain_net = DAP_STREAM_CH_CHAIN_NET(a_ch); if(l_ch_chain_net) { pthread_mutex_lock(&l_ch_chain_net->mutex); dap_stream_ch_pkt_t *l_ch_pkt = (dap_stream_ch_pkt_t *) a_arg; dap_stream_ch_chain_net_pkt_t *l_chain_pkt = (dap_stream_ch_chain_net_pkt_t *) l_ch_pkt->data; if(l_chain_pkt) { size_t l_ch_pkt_data_size = l_ch_pkt->hdr.size - sizeof(dap_stream_ch_chain_net_pkt_hdr_t); //printf("*packet TYPE=%d data_size=%d\n", l_chain_pkt->hdr.type, l_ch_pkt_data_size); //(data_size > 0) ? (char*) (l_chain_pkt->data) : "-"); switch (l_chain_pkt->hdr.type) { case STREAM_CH_CHAIN_NET_PKT_TYPE_DBG: { dap_stream_ch_chain_net_pkt_write(a_ch, STREAM_CH_CHAIN_NET_PKT_TYPE_PING, NULL, 0); dap_stream_ch_set_ready_to_write(a_ch, true); } break; // received ping request - > send pong request case STREAM_CH_CHAIN_NET_PKT_TYPE_PING: { log_it(L_INFO, "Get STREAM_CH_CHAIN_NET_PKT_TYPE_PING"); int l_res = dap_stream_ch_chain_net_pkt_write(a_ch, STREAM_CH_CHAIN_NET_PKT_TYPE_PONG, NULL, 0); dap_stream_ch_set_ready_to_write(a_ch, true); } break; // receive pong request -> send nothing case STREAM_CH_CHAIN_NET_PKT_TYPE_PONG: { log_it(L_INFO, "Get STREAM_CH_CHAIN_NET_PKT_TYPE_PONG"); dap_stream_ch_set_ready_to_write(a_ch, false); } break; // get node address case STREAM_CH_CHAIN_NET_PKT_TYPE_GET_NODE_ADDR: { log_it(L_INFO, "Get STREAM_CH_CHAIN_NET_PKT_TYPE_GET_NODE_ADDR"); dap_stream_ch_set_ready_to_write(a_ch, false); } break; // set new node address case STREAM_CH_CHAIN_NET_PKT_TYPE_SET_NODE_ADDR: { log_it(L_INFO, "Get STREAM_CH_CHAIN_NET_PKT_TYPE_SET_NODE_ADDR"); { uint64_t l_addr = 0; // set cur node addr if(l_ch_pkt_data_size == sizeof(uint64_t)) { memcpy(&l_addr, l_chain_pkt->data, sizeof(uint64_t)); dap_db_set_cur_node_addr(l_addr); } } session_data_update(a_ch->stream->session->id, STREAM_CH_CHAIN_NET_PKT_TYPE_SET_NODE_ADDR, NULL, NULL, -1); dap_stream_ch_set_ready_to_write(a_ch, true); } break; case STREAM_CH_CHAIN_NET_PKT_TYPE_GLOVAL_DB: { log_it(L_INFO, "Get STREAM_CH_CHAIN_NET_PKT_TYPE_GLOVAL_DB data_size=%d", l_ch_pkt_data_size); // get transaction and save it to global_db if(l_ch_pkt_data_size > 0) { // parse received packet size_t l_data_size = 0; const uint8_t *l_data = NULL; const message_data_t *l_mdata = dap_stream_ch_chain_net_parse_packet(l_chain_pkt->data, l_ch_pkt_data_size, &l_data, &l_data_size); /*dap_chain_node_addr_t l_node_cur; dap_chain_node_addr_t l_node_remote; uint8_t *l_recv_data = l_chain_pkt->data; //DAP_NEW_SIZE(uint8_t, l_item_size_out + 2 * sizeof(dap_chain_node_addr_t)); memcpy(&l_node_remote, l_recv_data, sizeof(dap_chain_node_addr_t)); memcpy(&l_node_cur, l_recv_data + sizeof(dap_chain_node_addr_t), sizeof(dap_chain_node_addr_t)); uint8_t *l_mdata = l_recv_data + 2 * sizeof(dap_chain_node_addr_t); l_ch_pkt_data_size -= 2 * sizeof(dap_chain_node_addr_t);*/ if(l_data && l_data_size > 0) { //session_data_t *l_data = session_data_find(a_ch->stream->session->id); int l_data_obj_count = 0; // deserialize data void *l_data_obj = dap_db_log_unpack((uint8_t*) l_data, l_data_size, &l_data_obj_count); // Parse data from dap_db_log_pack() // save data to global_db if(!dap_chain_global_db_obj_save(l_data_obj, l_data_obj_count)) { log_it(L_ERROR, "Don't saved to global_db objs=0x%x count=%d", l_data_obj, l_data_obj_count); } else { // Get remote timestamp time_t l_timestamp_remote = dap_db_log_unpack_get_timestamp((uint8_t*) l_data, l_data_size); // set new timestamp (saved data) for remote node dap_db_log_set_last_timestamp_remote(l_mdata->addr_from, l_timestamp_remote); //printf("***ts=%llu\n", l_timestamp_remote); } } dap_stream_ch_set_ready_to_write(a_ch, false); } /*// go to data transfer mode else if(!data_size) { dap_stream_ch_set_ready_to_write(a_ch, false); // Get log diff //dap_list_t *l_list = dap_db_log_get_list(a_data->timestamp_start); //session_data_update(a_ch->stream->session->id, l_list, a_data, 0); }*/ } break; // receive the latest global_db revision of the remote node -> go to send mode case STREAM_CH_CHAIN_NET_PKT_TYPE_GLOBAL_DB_REQUEST_SYNC: { if(l_ch_pkt_data_size == sizeof(message_data_t)) { // parse received packet const message_data_t *l_data = dap_stream_ch_chain_net_parse_packet(l_chain_pkt->data, l_ch_pkt_data_size, NULL, NULL); if(l_data) { //message_data_t *l_data = (message_data_t*) l_chain_pkt->data; //time_t l_timestamp_remote_get = l_data->timestamp_start; //time_t l_timestamp_remote_cur = dap_db_log_get_last_timestamp_remote(l_data->addr_from); // last timestamp for remote node time_t l_timestamp_remote_saved = l_data->timestamp_start; //min(l_timestamp_remote_get, l_timestamp_remote_cur); // Get log diff dap_list_t *l_list = dap_db_log_get_list(l_timestamp_remote_saved); session_data_update(a_ch->stream->session->id, STREAM_CH_CHAIN_NET_PKT_TYPE_GLOBAL_DB_REQUEST_SYNC, l_list, (message_data_t*) l_data, -1); log_it(L_INFO, "Get STREAM_CH_CHAIN_NET_PKT_TYPE_GLOBAL_DB_REQUEST_SYNC session_id=%u from 0x%llx to 0x%llx count=%d", a_ch->stream->session->id, l_data->addr_from, l_data->addr_to, dap_list_length(l_list)); } // go to send data from list [in s_stream_ch_packet_out()] // no data to send -> send one empty message STREAM_CH_CHAIN_NET_PKT_TYPE_GLOBAL_DB_REQUEST_SYNC dap_stream_ch_set_ready_to_write(a_ch, true); } else { log_it(L_ERROR, "Get STREAM_CH_CHAIN_NET_PKT_TYPE_GLOBAL_DB_REQUEST_SYNC session_id=%u bad request", a_ch->stream->session->id); dap_stream_ch_set_ready_to_write(a_ch, false); } } break; } if(l_ch_chain_net->notify_callback) { if(l_chain_pkt->hdr.type == STREAM_CH_CHAIN_NET_PKT_TYPE_GLOBAL_DB_REQUEST_SYNC) { session_data_t *l_data = session_data_find(a_ch->stream->session->id); // end of session if(!l_data->list_tr) l_ch_chain_net->notify_callback(NULL, l_ch_pkt_data_size, l_ch_chain_net->notify_callback_arg); else l_ch_chain_net->notify_callback(l_chain_pkt, l_ch_pkt_data_size, l_ch_chain_net->notify_callback_arg); } else l_ch_chain_net->notify_callback(l_chain_pkt, l_ch_pkt_data_size, l_ch_chain_net->notify_callback_arg); } } pthread_mutex_unlock(&l_ch_chain_net->mutex); } } /** * @brief s_stream_ch_packet_out * @param ch * @param arg */ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) { dap_stream_ch_chain_net_t * l_ch_chain_net = DAP_STREAM_CH_CHAIN_NET(a_ch); pthread_mutex_lock(&l_ch_chain_net->mutex); session_data_t *l_data = session_data_find(a_ch->stream->session->id); //printf("*packet out session_id=%u\n", a_ch->stream->session->id); if(!l_data) { log_it(L_WARNING, "if packet_out() l_data=NULL"); dap_stream_ch_set_ready_to_write(a_ch, false); return; } if(l_data->message_id == STREAM_CH_CHAIN_NET_PKT_TYPE_SET_NODE_ADDR) { // get cur node addr uint64_t l_addr = dap_db_get_cur_node_addr(); size_t l_send_data_len = sizeof(uint64_t); // send cur node addr dap_stream_ch_chain_net_pkt_write(a_ch, STREAM_CH_CHAIN_NET_PKT_TYPE_GET_NODE_ADDR, &l_addr, l_send_data_len); pthread_mutex_unlock(&l_ch_chain_net->mutex); dap_stream_ch_set_ready_to_write(a_ch, false); return; } dap_chain_node_addr_t node_cur; // Get log diff size_t l_data_size_out = 0; dap_list_t *l_list = l_data->list_tr; int len = dap_list_length(l_list); //printf("*len=%d\n", len); if(l_list) { int l_item_size_out = 0; uint8_t *l_item = NULL; while(l_list && !l_item) { l_item = dap_db_log_pack((dap_global_db_obj_t *) l_list->data, &l_item_size_out); if(!l_item) { // remove current item from list dap_chain_global_db_obj_delete((dap_global_db_obj_t *) l_list->data); l_list = dap_list_delete_link(l_list, l_list); } } size_t l_send_data_len = 0; uint8_t *l_send_data = dap_stream_ch_chain_net_make_packet(l_data->node_cur.uint64, l_data->node_remote.uint64, 0, l_item, l_item_size_out, &l_send_data_len); dap_stream_ch_chain_net_pkt_write(a_ch, STREAM_CH_CHAIN_NET_PKT_TYPE_GLOVAL_DB, l_send_data, l_send_data_len); DAP_DELETE(l_send_data); // remove current item from list dap_chain_global_db_obj_delete((dap_global_db_obj_t *) l_list->data); l_list = dap_list_delete_link(l_list, l_list); session_data_update(a_ch->stream->session->id, -1, l_list, NULL, -1); } // last message if(!l_list) { // send request size_t l_data_size_out = 0; // Get current last timestamp in log //time_t l_timestamp_remote_saved = dap_db_log_get_last_timestamp(); // get remote last timestamp (saved data) for remote node time_t l_timestamp_remote_saved = dap_db_log_get_last_timestamp_remote(l_data->node_remote.uint64); size_t l_data_send_len = 0; uint8_t *l_data_send = dap_stream_ch_chain_net_make_packet(l_data->node_cur.uint64, l_data->node_remote.uint64, l_timestamp_remote_saved, NULL, 0, &l_data_send_len); dap_stream_ch_chain_net_pkt_write(a_ch, STREAM_CH_CHAIN_NET_PKT_TYPE_GLOBAL_DB_REQUEST_SYNC, l_data_send, l_data_send_len); DAP_DELETE(l_data_send); l_data = NULL; } int l_res = 0; // end of session if(!l_list) dap_stream_ch_set_ready_to_write(a_ch, false); else dap_stream_ch_set_ready_to_write(a_ch, true); pthread_mutex_unlock(&l_ch_chain_net->mutex); }