diff --git a/dap_stream_ch_chain.c b/dap_stream_ch_chain.c index 5136fdccf012af8505b6d7f1215e57f4958f778d..3e789732cd26aab1bb4d610c91c2898e825fd4e1 100644 --- a/dap_stream_ch_chain.c +++ b/dap_stream_ch_chain.c @@ -145,6 +145,19 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) }*/ } break; + case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB_GROUP: { + log_it(L_INFO, "In: SYNCED_GLOBAL_DB_GROUP pkt"); + /*if(s_net_name) { + DAP_DELETE(s_net_name); + s_net_name = NULL; //"kelvin-testnet" + }*/ + } + break; + case DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB_GROUP: { + log_it(L_INFO, "In: SYNCED_GLOBAL_DB_GROUP pkt"); + } + break; + case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS: { log_it(L_INFO, "In: SYNCED_CHAINS pkt"); } @@ -209,8 +222,6 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) dap_stream_ch_chain_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS, l_ch_chain->request_net_id, l_ch_chain->request_chain_id, l_ch_chain->request_cell_id, &l_request, sizeof(l_request)); - // dap_stream_ch_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB ,&l_request, - // sizeof (l_request)); l_ch_chain->state = CHAIN_STATE_IDLE; } @@ -250,7 +261,9 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) if(l_request->id_start > l_ch_chain->request_last_ts) { l_start_item = 0; } - dap_db_log_list_t *l_db_log = dap_db_log_list_start(l_start_item + 1); + dap_chain_net_t *l_net = dap_chain_net_by_id(l_chain_pkt->hdr.net_id); + dap_list_t *l_add_groups = dap_chain_net_get_add_gdb_group(l_net, l_request->node_addr); + dap_db_log_list_t *l_db_log = dap_db_log_list_start(l_start_item + 1, l_add_groups); if(l_db_log) { //log_it(L_DEBUG, "Start getting items %u:%u", l_request->id_start + 1,l_db_log->items_number);//dap_list_length(l_list)); // Add it to outgoing list @@ -269,8 +282,16 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) l_ch_chain->request_cell_id, &l_node_addr, sizeof(dap_chain_node_addr_t)); } else { - //log_it(L_DEBUG, "No items to sync from %u", l_request->id_start + 1); + dap_chain_node_addr_t l_node_addr = { 0 }; + dap_chain_net_t *l_net = dap_chain_net_by_id(l_ch_chain->request_net_id); + l_node_addr.uint64 = l_net ? dap_db_get_cur_node_addr(l_net->pub.name) : 0; + dap_stream_ch_chain_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB, + l_ch_chain->request_net_id, l_ch_chain->request_chain_id, + l_ch_chain->request_cell_id, &l_node_addr, sizeof(dap_chain_node_addr_t)); + dap_stream_ch_chain_sync_request_t l_request = { { 0 } }; + //log_it(L_DEBUG, "No items to sync from %u", l_request->id_start + 1); + l_request.node_addr.uint64 = l_net ? dap_db_get_cur_node_addr(l_net->pub.name) : 0; l_request.id_start = dap_db_log_get_last_id_remote(l_ch_chain->request.node_addr.uint64); dap_stream_ch_chain_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, l_ch_chain->request_net_id, l_ch_chain->request_chain_id, @@ -340,7 +361,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) break; // first packet of data with source node address case DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB: { - //log_it(L_INFO, "In: FIRST_GLOBAL_DB data_size=%d", l_chain_pkt_data_size); + log_it(L_INFO, "In: FIRST_GLOBAL_DB data_size=%d", l_chain_pkt_data_size); if(l_chain_pkt_data_size == sizeof(dap_chain_node_addr_t)) memcpy(&l_ch_chain->request.node_addr, l_chain_pkt->data, l_chain_pkt_data_size); //memcpy(&l_ch_chain->request_cell_id, &l_chain_pkt->hdr.cell_id, sizeof(dap_chain_cell_id_t)); @@ -553,7 +574,7 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) dap_db_log_list_t *l_db_list = l_ch_chain->request_global_db_trs; //dap_list_last( l_ch_chain->request_global_db_trs ); dap_global_db_obj_t *l_obj = dap_db_log_list_get(l_db_list); - if ( l_obj) { + if (1) { //dap_list_t *l_list = l_ch_chain->request_global_db_trs; //dap_list_last( l_ch_chain->request_global_db_trs ); bool l_is_stop = true; //l_list ? false : true; while(l_obj) { @@ -563,7 +584,7 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) size_t l_item_size_out = 0; uint8_t *l_item = dap_db_log_pack(l_obj, &l_item_size_out); - + // Item not found, maybe it has deleted? Then go to the next item if(!l_item || !l_item_size_out) { //log_it(L_WARNING, "Log pack returned NULL??? data=0x%x (nothing to send) (rest=%d records)", l_obj, // l_items_rest); @@ -611,9 +632,10 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) if(l_is_stop){ //log_it(L_DEBUG, "l_obj == 0, STOP"); // If we don't need to send chains after - if(l_ch_chain->state != CHAIN_STATE_SYNC_ALL){ - dap_stream_ch_chain_go_idle(l_ch_chain); - }else if(l_ch_chain->state == CHAIN_STATE_SYNC_GLOBAL_DB) { +// if(l_ch_chain->state != CHAIN_STATE_SYNC_ALL){ +// dap_stream_ch_chain_go_idle(l_ch_chain); +// }else if(l_ch_chain->state == CHAIN_STATE_SYNC_GLOBAL_DB) + { // free log list l_ch_chain->request_global_db_trs = NULL; dap_db_log_list_delete(l_db_list); @@ -633,17 +655,12 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) l_ch_chain->request_net_id, l_ch_chain->request_chain_id, l_ch_chain->request_cell_id, &l_request, sizeof(l_request)); - // dap_stream_ch_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB ,&l_request, - // sizeof (l_request)); - - dap_stream_ch_chain_go_idle(l_ch_chain); - - // log_it( L_DEBUG," callback_notify_packet_out" ); - if(l_ch_chain->callback_notify_packet_out) l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, NULL, 0, l_ch_chain->callback_notify_arg); + if(l_ch_chain->state != CHAIN_STATE_SYNC_ALL) + dap_stream_ch_chain_go_idle(l_ch_chain); } } } diff --git a/dap_stream_ch_chain_pkt.h b/dap_stream_ch_chain_pkt.h index 15877dd876699a9c4ca11610dcd27d676070ab2b..c0746bfd0230d87bbc68af82bbd776d735322bdc 100755 --- a/dap_stream_ch_chain_pkt.h +++ b/dap_stream_ch_chain_pkt.h @@ -38,6 +38,7 @@ #define DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB 0x11 #define DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_CHAIN 0x20 #define DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB 0x21 +#define DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB_GROUP 0x31 #define DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS 0x02 #define DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB 0x12 @@ -46,6 +47,7 @@ #define DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS 0x03 #define DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB 0x13 +#define DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB_GROUP 0x33 #define DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_ALL 0x23 #define DAP_STREAM_CH_CHAIN_PKT_TYPE_ERROR 0xff