From e148311fd6383ecce835d4f543bca9aae1086878 Mon Sep 17 00:00:00 2001 From: Aleksandr Lysikov <lysikov@inbox.ru> Date: Tue, 2 Apr 2019 23:13:09 +0500 Subject: [PATCH] added request globad_db data and save received data to global_db --- dap_stream_ch_chain_net.c | 148 ++++++++++++++++++++++++++++++++------ dap_stream_ch_chain_net.h | 1 + 2 files changed, 127 insertions(+), 22 deletions(-) diff --git a/dap_stream_ch_chain_net.c b/dap_stream_ch_chain_net.c index bd0559a..c258892 100644 --- a/dap_stream_ch_chain_net.c +++ b/dap_stream_ch_chain_net.c @@ -49,32 +49,59 @@ typedef struct session_data { uint16_t type; // data type time_t timestamp_start; time_t timestamp_cur; + dap_list_t *list_tr; // list of transactions + dap_chain_node_addr_t node_remote; + dap_chain_node_addr_t node_cur; UT_hash_handle hh; } session_data_t; -struct my_struct { - int id; /* key */ - char name[10]; - UT_hash_handle hh; /* makes this structure hashable */ -}; +typedef struct message_data { + time_t timestamp_start; + uint64_t addr_from; // node addr + uint64_t addr_to; // node addr +//int16_t cmd; + +} message_data_t; // list of active sessions static session_data_t *s_chain_net_data = NULL; // for separate access to session_data_t static pthread_mutex_t s_hash_mutex = PTHREAD_MUTEX_INITIALIZER; -static void session_data_update(unsigned int a_id, time_t a_timestamp_start) +uint8_t* dap_stream_ch_chain_net_make_packet(uint64_t a_node_addr_from, uint64_t a_node_addr_to, + char *a_timestamp_start_str, size_t *a_data_len_out) +{ + message_data_t *l_data = DAP_NEW_Z(message_data_t); + l_data->timestamp_start = a_timestamp_start_str ? (time_t) strtoll(a_timestamp_start_str, NULL, 10) : 0; + l_data->addr_from = a_node_addr_from; + l_data->addr_to = a_node_addr_to; + *a_data_len_out = sizeof(message_data_t); + return (uint8_t*) l_data; +} + +static void session_data_update(unsigned int a_id, dap_list_t *a_list, message_data_t *a_data, time_t a_timestamp_cur) { session_data_t *l_sdata; pthread_mutex_lock(&s_hash_mutex); HASH_FIND_INT(s_chain_net_data, &a_id, l_sdata); if(l_sdata == NULL) { l_sdata = DAP_NEW_Z(session_data_t); + l_sdata->id = a_id; + HASH_ADD_INT(s_chain_net_data, id, l_sdata); + } + + l_sdata->list_tr = a_list; + + if(a_data) { + l_sdata->timestamp_start = a_data->timestamp_start; + l_sdata->node_remote.uint64 = a_data->addr_from; + l_sdata->node_cur.uint64 = a_data->addr_to; } - l_sdata->id = a_id; - l_sdata->timestamp_start = a_timestamp_start; - HASH_ADD_INT(s_chain_net_data, id, l_sdata); + if(a_timestamp_cur != (time_t) -1) { + l_sdata->timestamp_cur = a_timestamp_cur; + } + pthread_mutex_unlock(&s_hash_mutex); } @@ -179,8 +206,8 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) if(l_chain_pkt) { size_t data_size = l_ch_pkt->hdr.size - sizeof(dap_stream_ch_chain_net_pkt_hdr_t); - printf("*packet TYPE=%d data_size=%d in=%s\n", l_chain_pkt->hdr.type, data_size, - (data_size > 0) ? (char*) (l_chain_pkt->data) : "-"); + printf("*packet TYPE=%d data_size=%d\n", l_chain_pkt->hdr.type, data_size); + //(data_size > 0) ? (char*) (l_chain_pkt->data) : "-"); switch (l_chain_pkt->hdr.type) { case STREAM_CH_CHAIN_NET_PKT_TYPE_DBG: { @@ -202,15 +229,51 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) } break; case STREAM_CH_CHAIN_NET_PKT_TYPE_GLOVAL_DB: { + log_it(L_INFO, "Get STREAM_CH_CHAIN_NET_PKT_TYPE_GLOVAL_DB"); + // get transaction and save it to global_db + if(data_size > 0) { + int l_data_obj_count = 0; + // deserialize data + void *l_data_obj = dap_db_log_unpack(l_chain_pkt->data, data_size, &l_data_obj_count); // Parse data from dap_db_log_pack() + // save data to global_db + if(!dap_chain_global_db_obj_save(l_data_obj, l_data_obj_count)) + log_it(L_ERROR, "Don't saved to global_db objs=0x%x count=%d",l_data_obj, l_data_obj_count); + + + } + dap_stream_ch_set_ready_to_write(a_ch, false); + /*// go to data transfer mode + else if(!data_size) { + + dap_stream_ch_set_ready_to_write(a_ch, false); + // Get log diff + //dap_list_t *l_list = dap_db_log_get_list(a_data->timestamp_start); + //session_data_update(a_ch->stream->session->id, l_list, a_data, 0); + + }*/ } break; - // receive the latest global_db revision of the remote node -> send log diff + // receive the latest global_db revision of the remote node -> go to send mode case STREAM_CH_CHAIN_NET_PKT_TYPE_GLOBAL_DB_REQUEST_SYNC: { - log_it(L_INFO, "Get STREAM_CH_CHAIN_NET_PKT_TYPE_GLOBAL_DB_REQUEST_SYNC data=%s", l_chain_pkt->data); - time_t l_timestamp_start = (time_t) strtoll(l_chain_pkt->data, NULL, 10); - session_data_update(a_ch->stream->session->id, l_timestamp_start); -// int l_res = dap_stream_ch_chain_net_pkt_write(a_ch, STREAM_CH_CHAIN_NET_PKT_TYPE_GLOVAL_DB, l_chain_pkt->data, 1); - dap_stream_ch_set_ready_to_write(a_ch, true); + + if(data_size == sizeof(message_data_t)) { + message_data_t *a_data = (message_data_t*) l_chain_pkt->data; + + char *l_timestamp_cur_str = dap_db_log_get_last_timestamp(); + // Get log diff + dap_list_t *l_list = dap_db_log_get_list(a_data->timestamp_start); + + session_data_update(a_ch->stream->session->id, l_list, a_data, -1); + dap_stream_ch_set_ready_to_write(a_ch, true); + log_it(L_INFO, + "Get STREAM_CH_CHAIN_NET_PKT_TYPE_GLOBAL_DB_REQUEST_SYNC session_id=%u from 0x%llx to 0x%llx", + a_ch->stream->session->id, a_data->addr_from, a_data->addr_to); + } + else { + log_it(L_ERROR, "Get STREAM_CH_CHAIN_NET_PKT_TYPE_GLOBAL_DB_REQUEST_SYNC session_id=%u bad request", + a_ch->stream->session->id); + dap_stream_ch_set_ready_to_write(a_ch, false); + } } break; } @@ -233,7 +296,7 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) session_data_t *l_data = session_data_find(a_ch->stream->session->id); printf("*packet out session_id=%u\n", a_ch->stream->session->id); - if(!l_data){ + if(!l_data) { log_it(L_WARNING, "if packet_out() l_data=NULL"); dap_stream_ch_set_ready_to_write(a_ch, false); return; @@ -241,13 +304,54 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) // Get log diff size_t l_data_size_out = 0; - dap_list_t *l_list = dap_db_log_get_list(l_data->timestamp_start); - int l_res = dap_stream_ch_chain_net_pkt_write(a_ch, STREAM_CH_CHAIN_NET_PKT_TYPE_GLOVAL_DB, l_str, sizeof(l_str)); + dap_list_t *l_list = l_data->list_tr; + int len = dap_list_length(l_list); + printf("*len=%d\n", len); + if(l_list) { + int l_item_size_out = 0; + uint8_t *l_item = NULL; + while(l_list && !l_item) { + l_item = dap_db_log_pack((dap_global_db_obj_t *) l_list->data, &l_item_size_out); + if(!l_item) { + // remove current item from list + dap_chain_global_db_obj_delete((dap_global_db_obj_t *) l_list->data); + l_list = dap_list_delete_link(l_list, l_list); + } + } + dap_stream_ch_chain_net_pkt_write(a_ch, STREAM_CH_CHAIN_NET_PKT_TYPE_GLOVAL_DB, l_item, l_item_size_out); + + // remove current item from list + dap_chain_global_db_obj_delete((dap_global_db_obj_t *) l_list->data); + + l_list = dap_list_delete_link(l_list, l_list); + + session_data_update(a_ch->stream->session->id, l_list, NULL, -1); + } + // last message + if(!l_list) { + // send request + size_t l_data_size_out = 0; + // Get last timestamp in log + char *l_timestamp_start_str = dap_db_log_get_last_timestamp(); + size_t l_data_send_len = 0; + uint8_t *l_data_send = dap_stream_ch_chain_net_make_packet(l_data->node_cur.uint64, l_data->node_remote.uint64, + l_timestamp_start_str, &l_data_send_len); + DAP_DELETE(l_timestamp_start_str); + + dap_stream_ch_chain_net_pkt_write(a_ch, STREAM_CH_CHAIN_NET_PKT_TYPE_GLOBAL_DB_REQUEST_SYNC, l_data_send, + l_data_send_len); + DAP_DELETE(l_data_send); + + l_data = NULL; + } + int l_res = 0; //dap_stream_ch_chain_net_pkt_write(a_c h, STREAM_CH_CHAIN_NET_PKT_TYPE_GLOVAL_DB, l_str, sizeof(l_str)); // session_data_update(a_ch->stream->session->id, l_timestamp_start); + // end of session - if(!l_data) { + if(!l_list) dap_stream_ch_set_ready_to_write(a_ch, false); - } + else + dap_stream_ch_set_ready_to_write(a_ch, true); pthread_mutex_unlock(&l_ch_chain_net->mutex); } diff --git a/dap_stream_ch_chain_net.h b/dap_stream_ch_chain_net.h index cc87bd3..d5c1b7f 100644 --- a/dap_stream_ch_chain_net.h +++ b/dap_stream_ch_chain_net.h @@ -39,5 +39,6 @@ typedef struct dap_stream_ch_chain_net { #define DAP_STREAM_CH_CHAIN_NET(a) ((dap_stream_ch_chain_net_t *) ((a)->internal) ) uint8_t dap_stream_ch_chain_net_get_id(); +uint8_t* dap_stream_ch_chain_net_make_packet(uint64_t a_node_addr_from, uint64_t a_node_addr_to, char *a_timestamp_start_str, size_t *a_data_len_out); int dap_stream_ch_chain_net_init(); void dap_stream_ch_chain_net_deinit(); -- GitLab