Skip to content
Snippets Groups Projects
Commit e148311f authored by Aleksandr Lysikov's avatar Aleksandr Lysikov
Browse files

added request globad_db data and save received data to global_db

parent 4dc682e8
No related branches found
No related tags found
No related merge requests found
......@@ -49,32 +49,59 @@ typedef struct session_data {
uint16_t type; // data type
time_t timestamp_start;
time_t timestamp_cur;
dap_list_t *list_tr; // list of transactions
dap_chain_node_addr_t node_remote;
dap_chain_node_addr_t node_cur;
UT_hash_handle hh;
} session_data_t;
struct my_struct {
int id; /* key */
char name[10];
UT_hash_handle hh; /* makes this structure hashable */
};
typedef struct message_data {
time_t timestamp_start;
uint64_t addr_from; // node addr
uint64_t addr_to; // node addr
//int16_t cmd;
} message_data_t;
// list of active sessions
static session_data_t *s_chain_net_data = NULL;
// for separate access to session_data_t
static pthread_mutex_t s_hash_mutex = PTHREAD_MUTEX_INITIALIZER;
static void session_data_update(unsigned int a_id, time_t a_timestamp_start)
uint8_t* dap_stream_ch_chain_net_make_packet(uint64_t a_node_addr_from, uint64_t a_node_addr_to,
char *a_timestamp_start_str, size_t *a_data_len_out)
{
message_data_t *l_data = DAP_NEW_Z(message_data_t);
l_data->timestamp_start = a_timestamp_start_str ? (time_t) strtoll(a_timestamp_start_str, NULL, 10) : 0;
l_data->addr_from = a_node_addr_from;
l_data->addr_to = a_node_addr_to;
*a_data_len_out = sizeof(message_data_t);
return (uint8_t*) l_data;
}
static void session_data_update(unsigned int a_id, dap_list_t *a_list, message_data_t *a_data, time_t a_timestamp_cur)
{
session_data_t *l_sdata;
pthread_mutex_lock(&s_hash_mutex);
HASH_FIND_INT(s_chain_net_data, &a_id, l_sdata);
if(l_sdata == NULL) {
l_sdata = DAP_NEW_Z(session_data_t);
l_sdata->id = a_id;
HASH_ADD_INT(s_chain_net_data, id, l_sdata);
}
l_sdata->list_tr = a_list;
if(a_data) {
l_sdata->timestamp_start = a_data->timestamp_start;
l_sdata->node_remote.uint64 = a_data->addr_from;
l_sdata->node_cur.uint64 = a_data->addr_to;
}
l_sdata->id = a_id;
l_sdata->timestamp_start = a_timestamp_start;
HASH_ADD_INT(s_chain_net_data, id, l_sdata);
if(a_timestamp_cur != (time_t) -1) {
l_sdata->timestamp_cur = a_timestamp_cur;
}
pthread_mutex_unlock(&s_hash_mutex);
}
......@@ -179,8 +206,8 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
if(l_chain_pkt) {
size_t data_size = l_ch_pkt->hdr.size - sizeof(dap_stream_ch_chain_net_pkt_hdr_t);
printf("*packet TYPE=%d data_size=%d in=%s\n", l_chain_pkt->hdr.type, data_size,
(data_size > 0) ? (char*) (l_chain_pkt->data) : "-");
printf("*packet TYPE=%d data_size=%d\n", l_chain_pkt->hdr.type, data_size);
//(data_size > 0) ? (char*) (l_chain_pkt->data) : "-");
switch (l_chain_pkt->hdr.type) {
case STREAM_CH_CHAIN_NET_PKT_TYPE_DBG: {
......@@ -202,15 +229,51 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
}
break;
case STREAM_CH_CHAIN_NET_PKT_TYPE_GLOVAL_DB: {
log_it(L_INFO, "Get STREAM_CH_CHAIN_NET_PKT_TYPE_GLOVAL_DB");
// get transaction and save it to global_db
if(data_size > 0) {
int l_data_obj_count = 0;
// deserialize data
void *l_data_obj = dap_db_log_unpack(l_chain_pkt->data, data_size, &l_data_obj_count); // Parse data from dap_db_log_pack()
// save data to global_db
if(!dap_chain_global_db_obj_save(l_data_obj, l_data_obj_count))
log_it(L_ERROR, "Don't saved to global_db objs=0x%x count=%d",l_data_obj, l_data_obj_count);
}
dap_stream_ch_set_ready_to_write(a_ch, false);
/*// go to data transfer mode
else if(!data_size) {
dap_stream_ch_set_ready_to_write(a_ch, false);
// Get log diff
//dap_list_t *l_list = dap_db_log_get_list(a_data->timestamp_start);
//session_data_update(a_ch->stream->session->id, l_list, a_data, 0);
}*/
}
break;
// receive the latest global_db revision of the remote node -> send log diff
// receive the latest global_db revision of the remote node -> go to send mode
case STREAM_CH_CHAIN_NET_PKT_TYPE_GLOBAL_DB_REQUEST_SYNC: {
log_it(L_INFO, "Get STREAM_CH_CHAIN_NET_PKT_TYPE_GLOBAL_DB_REQUEST_SYNC data=%s", l_chain_pkt->data);
time_t l_timestamp_start = (time_t) strtoll(l_chain_pkt->data, NULL, 10);
session_data_update(a_ch->stream->session->id, l_timestamp_start);
// int l_res = dap_stream_ch_chain_net_pkt_write(a_ch, STREAM_CH_CHAIN_NET_PKT_TYPE_GLOVAL_DB, l_chain_pkt->data, 1);
dap_stream_ch_set_ready_to_write(a_ch, true);
if(data_size == sizeof(message_data_t)) {
message_data_t *a_data = (message_data_t*) l_chain_pkt->data;
char *l_timestamp_cur_str = dap_db_log_get_last_timestamp();
// Get log diff
dap_list_t *l_list = dap_db_log_get_list(a_data->timestamp_start);
session_data_update(a_ch->stream->session->id, l_list, a_data, -1);
dap_stream_ch_set_ready_to_write(a_ch, true);
log_it(L_INFO,
"Get STREAM_CH_CHAIN_NET_PKT_TYPE_GLOBAL_DB_REQUEST_SYNC session_id=%u from 0x%llx to 0x%llx",
a_ch->stream->session->id, a_data->addr_from, a_data->addr_to);
}
else {
log_it(L_ERROR, "Get STREAM_CH_CHAIN_NET_PKT_TYPE_GLOBAL_DB_REQUEST_SYNC session_id=%u bad request",
a_ch->stream->session->id);
dap_stream_ch_set_ready_to_write(a_ch, false);
}
}
break;
}
......@@ -233,7 +296,7 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg)
session_data_t *l_data = session_data_find(a_ch->stream->session->id);
printf("*packet out session_id=%u\n", a_ch->stream->session->id);
if(!l_data){
if(!l_data) {
log_it(L_WARNING, "if packet_out() l_data=NULL");
dap_stream_ch_set_ready_to_write(a_ch, false);
return;
......@@ -241,13 +304,54 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg)
// Get log diff
size_t l_data_size_out = 0;
dap_list_t *l_list = dap_db_log_get_list(l_data->timestamp_start);
int l_res = dap_stream_ch_chain_net_pkt_write(a_ch, STREAM_CH_CHAIN_NET_PKT_TYPE_GLOVAL_DB, l_str, sizeof(l_str));
dap_list_t *l_list = l_data->list_tr;
int len = dap_list_length(l_list);
printf("*len=%d\n", len);
if(l_list) {
int l_item_size_out = 0;
uint8_t *l_item = NULL;
while(l_list && !l_item) {
l_item = dap_db_log_pack((dap_global_db_obj_t *) l_list->data, &l_item_size_out);
if(!l_item) {
// remove current item from list
dap_chain_global_db_obj_delete((dap_global_db_obj_t *) l_list->data);
l_list = dap_list_delete_link(l_list, l_list);
}
}
dap_stream_ch_chain_net_pkt_write(a_ch, STREAM_CH_CHAIN_NET_PKT_TYPE_GLOVAL_DB, l_item, l_item_size_out);
// remove current item from list
dap_chain_global_db_obj_delete((dap_global_db_obj_t *) l_list->data);
l_list = dap_list_delete_link(l_list, l_list);
session_data_update(a_ch->stream->session->id, l_list, NULL, -1);
}
// last message
if(!l_list) {
// send request
size_t l_data_size_out = 0;
// Get last timestamp in log
char *l_timestamp_start_str = dap_db_log_get_last_timestamp();
size_t l_data_send_len = 0;
uint8_t *l_data_send = dap_stream_ch_chain_net_make_packet(l_data->node_cur.uint64, l_data->node_remote.uint64,
l_timestamp_start_str, &l_data_send_len);
DAP_DELETE(l_timestamp_start_str);
dap_stream_ch_chain_net_pkt_write(a_ch, STREAM_CH_CHAIN_NET_PKT_TYPE_GLOBAL_DB_REQUEST_SYNC, l_data_send,
l_data_send_len);
DAP_DELETE(l_data_send);
l_data = NULL;
}
int l_res = 0; //dap_stream_ch_chain_net_pkt_write(a_c h, STREAM_CH_CHAIN_NET_PKT_TYPE_GLOVAL_DB, l_str, sizeof(l_str));
// session_data_update(a_ch->stream->session->id, l_timestamp_start);
// end of session
if(!l_data) {
if(!l_list)
dap_stream_ch_set_ready_to_write(a_ch, false);
}
else
dap_stream_ch_set_ready_to_write(a_ch, true);
pthread_mutex_unlock(&l_ch_chain_net->mutex);
}
......@@ -39,5 +39,6 @@ typedef struct dap_stream_ch_chain_net {
#define DAP_STREAM_CH_CHAIN_NET(a) ((dap_stream_ch_chain_net_t *) ((a)->internal) )
uint8_t dap_stream_ch_chain_net_get_id();
uint8_t* dap_stream_ch_chain_net_make_packet(uint64_t a_node_addr_from, uint64_t a_node_addr_to, char *a_timestamp_start_str, size_t *a_data_len_out);
int dap_stream_ch_chain_net_init();
void dap_stream_ch_chain_net_deinit();
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment