Skip to content
Snippets Groups Projects
Commit 1ffe062f authored by dmitriy.gerasimov's avatar dmitriy.gerasimov
Browse files

Merge branch 'feature-3365-1' into 'master'

Feature 3365 1

See merge request !16
parents 59959421 4129a0a2
No related branches found
No related tags found
1 merge request!16Feature 3365 1
...@@ -145,6 +145,19 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) ...@@ -145,6 +145,19 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
}*/ }*/
} }
break; break;
case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB_GROUP: {
log_it(L_INFO, "In: SYNCED_GLOBAL_DB_GROUP pkt");
/*if(s_net_name) {
DAP_DELETE(s_net_name);
s_net_name = NULL; //"kelvin-testnet"
}*/
}
break;
case DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB_GROUP: {
log_it(L_INFO, "In: SYNCED_GLOBAL_DB_GROUP pkt");
}
break;
case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS: { case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS: {
log_it(L_INFO, "In: SYNCED_CHAINS pkt"); log_it(L_INFO, "In: SYNCED_CHAINS pkt");
} }
...@@ -209,8 +222,6 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) ...@@ -209,8 +222,6 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
dap_stream_ch_chain_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS, dap_stream_ch_chain_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS,
l_ch_chain->request_net_id, l_ch_chain->request_chain_id, l_ch_chain->request_net_id, l_ch_chain->request_chain_id,
l_ch_chain->request_cell_id, &l_request, sizeof(l_request)); l_ch_chain->request_cell_id, &l_request, sizeof(l_request));
// dap_stream_ch_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB ,&l_request,
// sizeof (l_request));
l_ch_chain->state = CHAIN_STATE_IDLE; l_ch_chain->state = CHAIN_STATE_IDLE;
} }
...@@ -250,7 +261,9 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) ...@@ -250,7 +261,9 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
if(l_request->id_start > l_ch_chain->request_last_ts) { if(l_request->id_start > l_ch_chain->request_last_ts) {
l_start_item = 0; l_start_item = 0;
} }
dap_db_log_list_t *l_db_log = dap_db_log_list_start(l_start_item + 1); dap_chain_net_t *l_net = dap_chain_net_by_id(l_chain_pkt->hdr.net_id);
dap_list_t *l_add_groups = dap_chain_net_get_add_gdb_group(l_net, l_request->node_addr);
dap_db_log_list_t *l_db_log = dap_db_log_list_start(l_start_item + 1, l_add_groups);
if(l_db_log) { if(l_db_log) {
//log_it(L_DEBUG, "Start getting items %u:%u", l_request->id_start + 1,l_db_log->items_number);//dap_list_length(l_list)); //log_it(L_DEBUG, "Start getting items %u:%u", l_request->id_start + 1,l_db_log->items_number);//dap_list_length(l_list));
// Add it to outgoing list // Add it to outgoing list
...@@ -269,8 +282,16 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) ...@@ -269,8 +282,16 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
l_ch_chain->request_cell_id, &l_node_addr, sizeof(dap_chain_node_addr_t)); l_ch_chain->request_cell_id, &l_node_addr, sizeof(dap_chain_node_addr_t));
} else { } else {
//log_it(L_DEBUG, "No items to sync from %u", l_request->id_start + 1); dap_chain_node_addr_t l_node_addr = { 0 };
dap_chain_net_t *l_net = dap_chain_net_by_id(l_ch_chain->request_net_id);
l_node_addr.uint64 = l_net ? dap_db_get_cur_node_addr(l_net->pub.name) : 0;
dap_stream_ch_chain_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB,
l_ch_chain->request_net_id, l_ch_chain->request_chain_id,
l_ch_chain->request_cell_id, &l_node_addr, sizeof(dap_chain_node_addr_t));
dap_stream_ch_chain_sync_request_t l_request = { { 0 } }; dap_stream_ch_chain_sync_request_t l_request = { { 0 } };
//log_it(L_DEBUG, "No items to sync from %u", l_request->id_start + 1);
l_request.node_addr.uint64 = l_net ? dap_db_get_cur_node_addr(l_net->pub.name) : 0;
l_request.id_start = dap_db_log_get_last_id_remote(l_ch_chain->request.node_addr.uint64); l_request.id_start = dap_db_log_get_last_id_remote(l_ch_chain->request.node_addr.uint64);
dap_stream_ch_chain_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, dap_stream_ch_chain_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB,
l_ch_chain->request_net_id, l_ch_chain->request_chain_id, l_ch_chain->request_net_id, l_ch_chain->request_chain_id,
...@@ -340,7 +361,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) ...@@ -340,7 +361,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
break; break;
// first packet of data with source node address // first packet of data with source node address
case DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB: { case DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB: {
//log_it(L_INFO, "In: FIRST_GLOBAL_DB data_size=%d", l_chain_pkt_data_size); log_it(L_INFO, "In: FIRST_GLOBAL_DB data_size=%d", l_chain_pkt_data_size);
if(l_chain_pkt_data_size == sizeof(dap_chain_node_addr_t)) if(l_chain_pkt_data_size == sizeof(dap_chain_node_addr_t))
memcpy(&l_ch_chain->request.node_addr, l_chain_pkt->data, l_chain_pkt_data_size); memcpy(&l_ch_chain->request.node_addr, l_chain_pkt->data, l_chain_pkt_data_size);
//memcpy(&l_ch_chain->request_cell_id, &l_chain_pkt->hdr.cell_id, sizeof(dap_chain_cell_id_t)); //memcpy(&l_ch_chain->request_cell_id, &l_chain_pkt->hdr.cell_id, sizeof(dap_chain_cell_id_t));
...@@ -553,7 +574,7 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) ...@@ -553,7 +574,7 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg)
dap_db_log_list_t *l_db_list = l_ch_chain->request_global_db_trs; //dap_list_last( l_ch_chain->request_global_db_trs ); dap_db_log_list_t *l_db_list = l_ch_chain->request_global_db_trs; //dap_list_last( l_ch_chain->request_global_db_trs );
dap_global_db_obj_t *l_obj = dap_db_log_list_get(l_db_list); dap_global_db_obj_t *l_obj = dap_db_log_list_get(l_db_list);
if ( l_obj) { if (1) {
//dap_list_t *l_list = l_ch_chain->request_global_db_trs; //dap_list_last( l_ch_chain->request_global_db_trs ); //dap_list_t *l_list = l_ch_chain->request_global_db_trs; //dap_list_last( l_ch_chain->request_global_db_trs );
bool l_is_stop = true; //l_list ? false : true; bool l_is_stop = true; //l_list ? false : true;
while(l_obj) { while(l_obj) {
...@@ -563,7 +584,7 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) ...@@ -563,7 +584,7 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg)
size_t l_item_size_out = 0; size_t l_item_size_out = 0;
uint8_t *l_item = dap_db_log_pack(l_obj, &l_item_size_out); uint8_t *l_item = dap_db_log_pack(l_obj, &l_item_size_out);
// Item not found, maybe it has deleted? Then go to the next item
if(!l_item || !l_item_size_out) { if(!l_item || !l_item_size_out) {
//log_it(L_WARNING, "Log pack returned NULL??? data=0x%x (nothing to send) (rest=%d records)", l_obj, //log_it(L_WARNING, "Log pack returned NULL??? data=0x%x (nothing to send) (rest=%d records)", l_obj,
// l_items_rest); // l_items_rest);
...@@ -611,9 +632,10 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) ...@@ -611,9 +632,10 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg)
if(l_is_stop){ if(l_is_stop){
//log_it(L_DEBUG, "l_obj == 0, STOP"); //log_it(L_DEBUG, "l_obj == 0, STOP");
// If we don't need to send chains after // If we don't need to send chains after
if(l_ch_chain->state != CHAIN_STATE_SYNC_ALL){ // if(l_ch_chain->state != CHAIN_STATE_SYNC_ALL){
dap_stream_ch_chain_go_idle(l_ch_chain); // dap_stream_ch_chain_go_idle(l_ch_chain);
}else if(l_ch_chain->state == CHAIN_STATE_SYNC_GLOBAL_DB) { // }else if(l_ch_chain->state == CHAIN_STATE_SYNC_GLOBAL_DB)
{
// free log list // free log list
l_ch_chain->request_global_db_trs = NULL; l_ch_chain->request_global_db_trs = NULL;
dap_db_log_list_delete(l_db_list); dap_db_log_list_delete(l_db_list);
...@@ -633,17 +655,12 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) ...@@ -633,17 +655,12 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg)
l_ch_chain->request_net_id, l_ch_chain->request_chain_id, l_ch_chain->request_net_id, l_ch_chain->request_chain_id,
l_ch_chain->request_cell_id, &l_request, sizeof(l_request)); l_ch_chain->request_cell_id, &l_request, sizeof(l_request));
// dap_stream_ch_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB ,&l_request,
// sizeof (l_request));
dap_stream_ch_chain_go_idle(l_ch_chain);
// log_it( L_DEBUG," callback_notify_packet_out" );
if(l_ch_chain->callback_notify_packet_out) if(l_ch_chain->callback_notify_packet_out)
l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB,
NULL, 0, l_ch_chain->callback_notify_arg); NULL, 0, l_ch_chain->callback_notify_arg);
if(l_ch_chain->state != CHAIN_STATE_SYNC_ALL)
dap_stream_ch_chain_go_idle(l_ch_chain);
} }
} }
} }
......
...@@ -38,6 +38,7 @@ ...@@ -38,6 +38,7 @@
#define DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB 0x11 #define DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB 0x11
#define DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_CHAIN 0x20 #define DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_CHAIN 0x20
#define DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB 0x21 #define DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB 0x21
#define DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB_GROUP 0x31
#define DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS 0x02 #define DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS 0x02
#define DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB 0x12 #define DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB 0x12
...@@ -46,6 +47,7 @@ ...@@ -46,6 +47,7 @@
#define DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS 0x03 #define DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS 0x03
#define DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB 0x13 #define DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB 0x13
#define DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB_GROUP 0x33
#define DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_ALL 0x23 #define DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_ALL 0x23
#define DAP_STREAM_CH_CHAIN_PKT_TYPE_ERROR 0xff #define DAP_STREAM_CH_CHAIN_PKT_TYPE_ERROR 0xff
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment