diff --git a/modules/chain/dap_chain_ch.c b/modules/chain/dap_chain_ch.c index 2771e6b3d14dbbdd6468b099400886b5b68d0370..9a5d927246fb85760111e3cd71a0b05782605d63 100644 --- a/modules/chain/dap_chain_ch.c +++ b/modules/chain/dap_chain_ch.c @@ -105,6 +105,7 @@ typedef struct dap_chain_ch { void *_inheritor; dap_timerfd_t *sync_timer; struct sync_context *sync_context; + int idle_ack_counter; // Legacy section // dap_timerfd_t *activity_timer; @@ -140,7 +141,7 @@ static bool s_sync_timer_callback(void *a_arg); static bool s_debug_more = false, s_debug_legacy = false; static uint32_t s_sync_timeout = 30; static uint32_t s_sync_packets_per_thread_call = 10; -static uint32_t s_sync_ack_window_size = 512; // atoms +static uint32_t s_sync_ack_window_size = 16; // atoms // Legacy static const uint_fast16_t s_update_pack_size = 100; // Number of hashes packed into the one packet @@ -157,7 +158,7 @@ static dap_memstat_rec_t s_memstat [MEMSTAT$K_NR] = { const char* const s_error_type_to_string[] = { [DAP_CHAIN_CH_ERROR_SYNC_REQUEST_ALREADY_IN_PROCESS]= "SYNC_REQUEST_ALREADY_IN_PROCESS", [DAP_CHAIN_CH_ERROR_INCORRECT_SYNC_SEQUENCE] = "INCORRECT_SYNC_SEQUENCE", - [DAP_CHAIN_CH_ERROR_SYNC_TIMEOUT] = "SYNCHRONIZATION TIMEOUT", + [DAP_CHAIN_CH_ERROR_SYNC_TIMEOUT] = "SYNCHRONIZATION_TIMEOUT", [DAP_CHAIN_CH_ERROR_CHAIN_PKT_DATA_SIZE] = "INVALID_PACKET_SIZE", [DAP_CHAIN_CH_ERROR_LEGACY_PKT_DATA_SIZE] = "INVALID_LEGACY_PACKET_SIZE", [DAP_CHAIN_CH_ERROR_NET_INVALID_ID] = "INVALID_NET_ID", @@ -631,7 +632,7 @@ static bool s_sync_in_chains_callback(void *a_arg) log_it(L_CRITICAL, "Wtf is this ret code? %d", l_atom_add_res); break; } - if (l_ack_send && l_args->ack_req) { + if ( l_ack_send && l_args->ack_req ) { uint64_t l_ack_num = ((uint32_t)l_chain_pkt->hdr.num_hi << 16) | l_chain_pkt->hdr.num_lo; dap_chain_ch_pkt_t *l_pkt = dap_chain_ch_pkt_new(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, &l_ack_num, sizeof(uint64_t), DAP_CHAIN_CH_PKT_VERSION_CURRENT); @@ -653,7 +654,7 @@ static void s_gossip_payload_callback(void *a_payload, size_t a_payload_size, da log_it(L_WARNING, "Incorrect chain GOSSIP packet size"); return; } - struct atom_processing_args *l_args = DAP_NEW_SIZE(struct atom_processing_args, a_payload_size + sizeof(struct atom_processing_args)); + struct atom_processing_args *l_args = DAP_NEW_Z_SIZE(struct atom_processing_args, a_payload_size + sizeof(struct atom_processing_args)); if (!l_args) { log_it(L_CRITICAL, "%s", c_error_memory_alloc); return; @@ -747,7 +748,7 @@ static bool s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) NODE_ADDR_FP_ARGS_S(a_ch->stream->node), l_cluster->mnemonim); return false; } - struct atom_processing_args *l_args = DAP_NEW_SIZE(struct atom_processing_args, l_ch_pkt->hdr.data_size + sizeof(struct atom_processing_args)); + struct atom_processing_args *l_args = DAP_NEW_Z_SIZE(struct atom_processing_args, l_ch_pkt->hdr.data_size + sizeof(struct atom_processing_args)); if (!l_args) { log_it(L_CRITICAL, "%s", c_error_memory_alloc); break; @@ -755,9 +756,10 @@ static bool s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) l_args->addr = a_ch->stream->node; l_args->ack_req = true; memcpy(l_args->data, l_chain_pkt, l_ch_pkt->hdr.data_size); - debug_if(s_debug_more, L_INFO, "In: CHAIN pkt: atom hash %s, size %zd, net id %" DAP_UINT64_FORMAT_U ", chain id %" DAP_UINT64_FORMAT_U, + debug_if(s_debug_more, L_INFO, "In: CHAIN pkt: atom hash %s, size %zd, net id %" DAP_UINT64_FORMAT_U ", chain id %" DAP_UINT64_FORMAT_U ", atom id %" DAP_UINT64_FORMAT_U, dap_get_data_hash_str(l_chain_pkt->data, l_chain_pkt_data_size).s, l_chain_pkt_data_size, - l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64); + l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, + (uint64_t)(((uint32_t)l_chain_pkt->hdr.num_hi << 16) | l_chain_pkt->hdr.num_lo)); dap_proc_thread_callback_add(a_ch->stream_worker->worker->proc_queue_input, s_sync_in_chains_callback, l_args); } break; @@ -821,8 +823,8 @@ static bool s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, &l_sum, sizeof(l_sum), DAP_CHAIN_CH_PKT_VERSION_CURRENT); - debug_if(s_debug_more, L_DEBUG, "Out: CHAIN_SUMMARY %s for net %s to destination " NODE_ADDR_FP_STR, - l_chain->name, l_chain->net_name, NODE_ADDR_FP_ARGS_S(a_ch->stream->node)); + debug_if(s_debug_more, L_DEBUG, "Out: CHAIN_SUMMARY %s for net %s to destination " NODE_ADDR_FP_STR " value %"DAP_UINT64_FORMAT_U, + l_chain->name, l_chain->net_name, NODE_ADDR_FP_ARGS_S(a_ch->stream->node), l_last_num); struct sync_context *l_context = DAP_NEW_Z(struct sync_context); l_context->addr = a_ch->stream->node; l_context->iter = l_iter; @@ -830,7 +832,6 @@ static bool s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) l_context->chain_id = l_chain_pkt->hdr.chain_id; l_context->cell_id = l_chain_pkt->hdr.cell_id; l_context->num_last = l_sum.num_last; - l_context->last_activity = dap_time_now(); atomic_store_explicit(&l_context->state, SYNC_STATE_READY, memory_order_relaxed); atomic_store(&l_context->allowed_num, l_sum.num_cur + s_sync_ack_window_size); dap_stream_ch_uuid_t *l_uuid = DAP_DUP(&a_ch->uuid); @@ -840,6 +841,7 @@ static bool s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) break; } l_ch_chain->sync_context = l_context; + l_ch_chain->idle_ack_counter = s_sync_ack_window_size; dap_proc_thread_callback_add(a_ch->stream_worker->worker->proc_queue_input, s_chain_iter_callback, l_context); l_ch_chain->sync_timer = dap_timerfd_start_on_worker(a_ch->stream_worker->worker, 1000, s_sync_timer_callback, l_uuid); break; @@ -923,10 +925,15 @@ static bool s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) l_ack_num); struct sync_context *l_context = l_ch_chain->sync_context; if (!l_context) { - log_it(L_WARNING, "CHAIN_ACK: No active sync context"); - dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - DAP_CHAIN_CH_ERROR_INCORRECT_SYNC_SEQUENCE); + if (l_ch_chain->idle_ack_counter > 0) { + debug_if(s_debug_more, L_DEBUG, "End of window wave"); + l_ch_chain->idle_ack_counter--; + } else { + log_it(L_WARNING, "CHAIN_ACK: No active sync context"); + dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, + l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, + DAP_CHAIN_CH_ERROR_INCORRECT_SYNC_SEQUENCE); + } break; } if (l_context->num_last == l_ack_num) { @@ -1575,6 +1582,7 @@ static bool s_chain_iter_callback(void *a_arg) size_t l_atom_size = l_iter->cur_size; dap_chain_atom_ptr_t l_atom = l_iter->cur; uint32_t l_cycles_count = 0; + l_context->last_activity = dap_time_now(); while (l_atom && l_atom_size) { if (l_iter->cur_num > atomic_load_explicit(&l_context->allowed_num, memory_order_acquire)) break;