From 2922800eddeb217e1081093d7dc68f2f28433926 Mon Sep 17 00:00:00 2001 From: "Dmitriy A. Gerasimov" <dmitriy.gerasimov@demlabs.net> Date: Mon, 1 Feb 2021 00:45:41 +0700 Subject: [PATCH] [*] Reverse sync requests for GDB and Chains. --- modules/channel/chain/dap_stream_ch_chain.c | 28 ++++-- .../chain/include/dap_stream_ch_chain.h | 4 +- modules/net/dap_chain_node_client.c | 97 +++++++++++-------- 3 files changed, 79 insertions(+), 50 deletions(-) diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c index 64d94930da..2209157536 100644 --- a/modules/channel/chain/dap_stream_ch_chain.c +++ b/modules/channel/chain/dap_stream_ch_chain.c @@ -690,6 +690,27 @@ static bool s_gdb_in_pkt_proc_callback(dap_proc_thread_t *a_thread, void *a_arg) } } +/** + * @brief dap_stream_ch_chain_create_sync_request_gdb + * @param a_ch_chain + * @param a_net + */ +void dap_stream_ch_chain_create_sync_request_gdb(dap_stream_ch_chain_t * a_ch_chain, dap_chain_net_t * a_net) +{ + a_ch_chain->is_on_request = true; + memset(&a_ch_chain->request_hdr,0,sizeof (a_ch_chain->request_hdr)); + a_ch_chain->request_hdr.net_id = a_net->pub.id; + + struct sync_request *l_sync_request = DAP_NEW_Z(struct sync_request); + l_sync_request->ch = a_ch_chain->ch; + l_sync_request->worker = a_ch_chain->ch->stream_worker->worker; + memcpy(&l_sync_request->request, &a_ch_chain->request, sizeof (a_ch_chain->request)); + memcpy(&l_sync_request->request_hdr, &a_ch_chain->request_hdr, sizeof (a_ch_chain->request_hdr)); + dap_proc_queue_add_callback_inter(a_ch_chain->ch->stream_worker->worker->proc_queue_input, s_sync_out_gdb_proc_callback, + l_sync_request); +} + + /** * @brief s_stream_ch_packet_in * @param a_ch @@ -1036,12 +1057,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) l_ch_chain->request_hdr.net_id.uint64 , l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64, l_ch_chain->request.id_start, l_ch_chain->request.id_end ); - struct sync_request *l_sync_request = DAP_NEW_Z(struct sync_request); - l_sync_request->ch = a_ch; - l_sync_request->worker = a_ch->stream_worker->worker; - memcpy(&l_sync_request->request, &l_ch_chain->request, sizeof (l_ch_chain->request)); - memcpy(&l_sync_request->request_hdr, &l_ch_chain->request_hdr, sizeof (l_ch_chain->request_hdr)); - dap_proc_queue_add_callback_inter(a_ch->stream_worker->worker->proc_queue_input, s_sync_out_gdb_proc_callback, l_sync_request); + dap_stream_ch_chain_create_sync_request_gdb(l_ch_chain,dap_chain_net_by_id(l_ch_chain->request_hdr.net_id) ); } }else{ log_it(L_WARNING, "DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB: Wrong chain packet size %zd when expected %zd", l_chain_pkt_data_size, sizeof(l_ch_chain->request)); diff --git a/modules/channel/chain/include/dap_stream_ch_chain.h b/modules/channel/chain/include/dap_stream_ch_chain.h index 805ab0ace2..06293450d5 100644 --- a/modules/channel/chain/include/dap_stream_ch_chain.h +++ b/modules/channel/chain/include/dap_stream_ch_chain.h @@ -81,7 +81,8 @@ typedef struct dap_stream_ch_chain { bool request_updates_complete; - atomic_bool is_on_request; // Protects request section + bool is_on_request; // Protects request section + bool is_on_reverse_request; dap_stream_ch_chain_callback_packet_t callback_notify_packet_out; dap_stream_ch_chain_callback_packet_t callback_notify_packet_in; @@ -96,3 +97,4 @@ void dap_stream_ch_chain_deinit(void); inline static uint8_t dap_stream_ch_chain_get_id(void) { return (uint8_t) 'C'; } void dap_stream_ch_chain_go_idle ( dap_stream_ch_chain_t * a_ch_chain); +void dap_stream_ch_chain_create_sync_request_gdb(dap_stream_ch_chain_t * a_ch_chain, dap_chain_net_t * a_net); diff --git a/modules/net/dap_chain_node_client.c b/modules/net/dap_chain_node_client.c index d722c9a140..6f92accc53 100644 --- a/modules/net/dap_chain_node_client.c +++ b/modules/net/dap_chain_node_client.c @@ -305,47 +305,12 @@ static void s_ch_chain_callback_notify_packet_in(dap_stream_ch_chain_t* a_ch_cha dap_chain_net_t * l_net = l_node_client->net; assert(l_net); if(s_stream_ch_chain_debug_more) - log_it(L_INFO,"In: Link %s."NODE_ADDR_FP_STR" synced GDB", l_net->pub.name, NODE_ADDR_FP_ARGS_S(l_node_client->remote_node_addr )); + log_it(L_INFO,"In: Link %s."NODE_ADDR_FP_STR" synced GDB, request reverse sync", l_net->pub.name, NODE_ADDR_FP_ARGS_S(l_node_client->remote_node_addr )); - // We over with GLOBAL_DB and switch on syncing chains - l_node_client->state = NODE_CLIENT_STATE_SYNC_CHAINS_UPDATES; - dap_chain_net_state_t l_net_state = dap_chain_net_get_state(l_node_client->net); + l_node_client->state = NODE_CLIENT_STATE_SYNC_GDB_RVRS ; + a_ch_chain->is_on_reverse_request = true; - // Begin from the first chain - l_node_client->cur_chain = l_node_client->net->pub.chains; - dap_chain_cell_id_t l_cell_id={0}; - dap_chain_id_t l_chain_id={0}; - - - if(! l_node_client->cur_chain){ - log_it(L_CRITICAL,"In: Can't sync chains for %s because there is no chains in it",l_net->pub.name); - dap_stream_ch_chain_pkt_write_error_unsafe(a_ch_chain->ch,l_net->pub.id.uint64, - l_chain_id.uint64,l_cell_id.uint64,"ERROR_CHAIN_NO_CHAINS"); - l_node_client->state = NODE_CLIENT_STATE_SYNC_CHAINS_UPDATES ; - - }else{ // If present - select the first one cell in chain - l_chain_id=l_node_client->cur_chain->id; - dap_chain_cell_t * l_cell = l_node_client->cur_chain->cells; - if (l_cell){ - l_cell_id=l_cell->id; - } - uint64_t l_net_id = l_net->pub.id.uint64; - - dap_stream_ch_chain_pkt_t * l_chain_pkt; - size_t l_chain_pkt_size = sizeof (l_chain_pkt->hdr); - l_chain_pkt = DAP_NEW_Z_SIZE(dap_stream_ch_chain_pkt_t, l_chain_pkt_size ); - l_chain_pkt->hdr.version = 1; - l_chain_pkt->hdr.net_id.uint64 = l_net_id; - l_chain_pkt->hdr.cell_id.uint64 = l_cell_id.uint64; - l_chain_pkt->hdr.chain_id.uint64 = l_chain_id.uint64; - dap_stream_ch_pkt_write_unsafe(a_ch_chain->ch,DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_START , - l_chain_pkt,l_chain_pkt_size); - DAP_DELETE(l_chain_pkt); - log_it(L_INFO, - "In: Send UPDATE_CHAINS_START: net_id=0x%016x chain_id=0x%016x cell_id=0x%016x ", - l_net_id,l_chain_id.uint64,l_cell_id.uint64 - ); - } + dap_stream_ch_chain_create_sync_request_gdb(a_ch_chain, l_node_client->net); }break; case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS: { @@ -459,7 +424,53 @@ static void s_ch_chain_callback_notify_packet_out(dap_stream_ch_chain_t* a_ch_ch (void) a_pkt_data_size; (void) a_ch_chain; dap_chain_node_client_t * l_node_client = (dap_chain_node_client_t *) a_arg; + switch (a_pkt_type) { + + case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB:{ + if (a_ch_chain->is_on_reverse_request){ + a_ch_chain->is_on_reverse_request = false; + dap_chain_net_t * l_net = l_node_client->net; + // We over with GLOBAL_DB and switch on syncing chains + l_node_client->state = NODE_CLIENT_STATE_SYNC_CHAINS_UPDATES; + + // Begin from the first chain + l_node_client->cur_chain = l_node_client->net->pub.chains; + dap_chain_cell_id_t l_cell_id={0}; + dap_chain_id_t l_chain_id={0}; + + + if(! l_node_client->cur_chain){ + log_it(L_CRITICAL,"In: Can't sync chains for %s because there is no chains in it",l_net->pub.name); + dap_stream_ch_chain_pkt_write_error_unsafe(a_ch_chain->ch,l_net->pub.id.uint64, + l_chain_id.uint64,l_cell_id.uint64,"ERROR_CHAIN_NO_CHAINS"); + l_node_client->state = NODE_CLIENT_STATE_SYNC_CHAINS_UPDATES ; + + }else{ // If present - select the first one cell in chain + l_chain_id=l_node_client->cur_chain->id; + dap_chain_cell_t * l_cell = l_node_client->cur_chain->cells; + if (l_cell){ + l_cell_id=l_cell->id; + } + uint64_t l_net_id = l_net->pub.id.uint64; + + dap_stream_ch_chain_pkt_t * l_chain_pkt; + size_t l_chain_pkt_size = sizeof (l_chain_pkt->hdr); + l_chain_pkt = DAP_NEW_Z_SIZE(dap_stream_ch_chain_pkt_t, l_chain_pkt_size ); + l_chain_pkt->hdr.version = 1; + l_chain_pkt->hdr.net_id.uint64 = l_net_id; + l_chain_pkt->hdr.cell_id.uint64 = l_cell_id.uint64; + l_chain_pkt->hdr.chain_id.uint64 = l_chain_id.uint64; + dap_stream_ch_pkt_write_unsafe(a_ch_chain->ch,DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_START , + l_chain_pkt,l_chain_pkt_size); + DAP_DELETE(l_chain_pkt); + log_it(L_INFO, + "In: Send UPDATE_CHAINS_START: net_id=0x%016x chain_id=0x%016x cell_id=0x%016x ", + l_net_id,l_chain_id.uint64,l_cell_id.uint64 + ); + } + } + }break; case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_ALL: case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS: { if(s_stream_ch_chain_debug_more) @@ -470,10 +481,10 @@ static void s_ch_chain_callback_notify_packet_out(dap_stream_ch_chain_t* a_ch_ch #else SetEvent( l_node_client->wait_cond ); #endif - } - break; - default: { - } + a_ch_chain->is_on_reverse_request = false; + }break; + default: { + } } } -- GitLab