diff --git a/dap-sdk b/dap-sdk index e7aba2f7157652c411183e9b2fcf37991724028b..985e0aeea0cf1a3fea4e19d43baf72a7029f58f6 160000 --- a/dap-sdk +++ b/dap-sdk @@ -1 +1 @@ -Subproject commit e7aba2f7157652c411183e9b2fcf37991724028b +Subproject commit 985e0aeea0cf1a3fea4e19d43baf72a7029f58f6 diff --git a/modules/chain/dap_chain.c b/modules/chain/dap_chain.c index 6bfea9e5fa17c1a824136d9f5d2dd5a534b0a63b..3ce92f9fed3887be3fa0f05976779b3383e9a2c4 100644 --- a/modules/chain/dap_chain.c +++ b/modules/chain/dap_chain.c @@ -680,14 +680,19 @@ void dap_chain_add_callback_notify(dap_chain_t * a_chain, dap_chain_callback_not * @param a_atom_hash * @return */ -bool dap_chain_get_atom_last_hash(dap_chain_t *a_chain, dap_hash_fast_t *a_atom_hash, dap_chain_cell_id_t a_cell_id) +bool dap_chain_get_atom_last_hash_num(dap_chain_t *a_chain, dap_chain_cell_id_t a_cell_id, dap_hash_fast_t *a_atom_hash, uint64_t *a_atom_num) { - dap_chain_atom_iter_t *l_iter = a_chain->callback_atom_iter_create(a_chain, a_cell_id, false); + dap_return_val_if_fail(a_atom_hash || a_atom_num, false); + dap_chain_atom_iter_t *l_iter = a_chain->callback_atom_iter_create(a_chain, a_cell_id, NULL); + if (!l_iter) + return false; a_chain->callback_atom_iter_get(l_iter, DAP_CHAIN_ITER_OP_LAST, NULL); - *a_atom_hash = l_iter->cur_hash ? *l_iter->cur_hash : (dap_hash_fast_t){0}; - bool l_ret = l_iter->cur_hash; + if (a_atom_hash) + *a_atom_hash = l_iter->cur_hash ? *l_iter->cur_hash : (dap_hash_fast_t){0}; + if (a_atom_num) + *a_atom_num = l_iter->cur_num; a_chain->callback_atom_iter_delete(l_iter); - return l_ret; + return true; } struct chain_thread_notifier { diff --git a/modules/chain/dap_chain_ch.c b/modules/chain/dap_chain_ch.c index 6991ce70c44b4165cbaefe12bc0b438d86dbab9a..7d1cb75fb9b2895cebcc35912696860b1a31c8fa 100644 --- a/modules/chain/dap_chain_ch.c +++ b/modules/chain/dap_chain_ch.c @@ -74,7 +74,7 @@ struct sync_request { dap_worker_t * worker; dap_stream_ch_uuid_t ch_uuid; - dap_chain_ch_sync_request_t request; + dap_chain_ch_sync_request_old_t request; dap_chain_ch_pkt_hdr_t request_hdr; dap_chain_pkt_item_t pkt; @@ -119,10 +119,9 @@ static inline bool s_ch_chain_get_idle(dap_chain_ch_t *a_ch_chain) { return a_ch static void s_stream_ch_new(dap_stream_ch_t* a_ch, void* a_arg); static void s_stream_ch_delete(dap_stream_ch_t* a_ch, void* a_arg); -static void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg); +static bool s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg); static bool s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg); static void s_stream_ch_io_complete(dap_events_socket_t *a_es, void *a_arg); -static void s_stream_ch_write_error_unsafe(dap_stream_ch_t *a_ch, uint64_t a_net_id, uint64_t a_chain_id, uint64_t a_cell_id, dap_chain_ch_error_type_t a_error); static bool s_sync_out_chains_proc_callback(void *a_arg); static void s_sync_out_chains_last_worker_callback(dap_worker_t *a_worker, void *a_arg); @@ -183,6 +182,12 @@ static const char *s_error_type_to_string(dap_chain_ch_error_type_t a_error) return "UNKNOWN_CHAIN_PACKET_TYPE"; case DAP_CHAIN_CH_ERROR_GLOBAL_DB_INTERNAL_NOT_SAVED: return "GLOBAL_DB_INTERNAL_SAVING_ERROR"; + case DAP_CHAIN_CH_ERROR_NET_IS_OFFLINE: + return "NET_IS_OFFLINE"; + case DAP_CHAIN_CH_ERROR_OUT_OF_MEMORY: + return "OUT_OF_MEMORY"; + case DAP_CHAIN_CH_ERROR_INTERNAL: + return "INTERNAL_ERROR"; default: return "UNKNOWN_ERROR"; } @@ -247,9 +252,6 @@ static void s_stream_ch_delete(dap_stream_ch_t *a_ch, void *a_arg) { UNUSED(a_arg); dap_chain_ch_t *l_ch_chain = DAP_CHAIN_CH(a_ch); - if (l_ch_chain->callback_notify_packet_out) - l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_CHAIN_CH_PKT_TYPE_DELETE, NULL, 0, - l_ch_chain->callback_notify_arg); s_ch_chain_go_idle(l_ch_chain); debug_if(s_debug_more, L_DEBUG, "[stm_ch_chain:%p] --- deleted chain:%p", a_ch, l_ch_chain); DAP_DEL_Z(a_ch->internal); @@ -259,13 +261,6 @@ static void s_stream_ch_delete(dap_stream_ch_t *a_ch, void *a_arg) #endif } -void dap_chain_ch_reset_unsafe(dap_chain_ch_t *a_ch_chain) -{ - if (!a_ch_chain) - return; - s_ch_chain_go_idle(a_ch_chain); -} - /** * @brief s_stream_ch_chain_delete * @param a_ch_chain @@ -351,16 +346,13 @@ static void s_sync_out_chains_last_worker_callback(dap_worker_t *a_worker, void } l_ch_chain->request_atom_iter = l_sync_request->chain.request_atom_iter; // last packet - dap_chain_ch_sync_request_t l_request = {}; + dap_chain_ch_sync_request_old_t l_request = {}; if (s_debug_more ) log_it(L_INFO,"Out: DAP_CHAIN_CH_PKT_TYPE_SYNCED_CHAINS"); dap_chain_ch_pkt_write_unsafe(l_ch, DAP_CHAIN_CH_PKT_TYPE_SYNCED_CHAINS, l_sync_request->request_hdr.net_id.uint64, l_sync_request->request_hdr.chain_id.uint64, l_sync_request->request_hdr.cell_id.uint64, &l_request, sizeof(l_request)); s_ch_chain_go_idle(l_ch_chain); - if (l_ch_chain->callback_notify_packet_out) - l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_CHAIN_CH_PKT_TYPE_SYNCED_CHAINS, - NULL, 0, l_ch_chain->callback_notify_arg); DAP_DELETE(l_sync_request); } /** @@ -430,9 +422,6 @@ static void s_sync_out_gdb_first_worker_callback(dap_worker_t *a_worker, void *a dap_chain_ch_pkt_write_unsafe(DAP_STREAM_CH(l_ch_chain), DAP_CHAIN_CH_PKT_TYPE_FIRST_GLOBAL_DB, l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64, &g_node_addr, sizeof(dap_chain_node_addr_t)); - if(l_ch_chain->callback_notify_packet_out) - l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_CHAIN_CH_PKT_TYPE_FIRST_GLOBAL_DB, - NULL, 0, l_ch_chain->callback_notify_arg); if( a_worker){ // We send NULL to prevent delete s_sync_request_delete(l_sync_request); @@ -468,9 +457,6 @@ static void s_sync_out_gdb_last_worker_callback(dap_worker_t *a_worker, void *a_ l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64, NULL, 0); s_ch_chain_go_idle(l_ch_chain); - if(l_ch_chain->callback_notify_packet_out) - l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_CHAIN_CH_PKT_TYPE_SYNCED_GLOBAL_DB, - NULL, 0, l_ch_chain->callback_notify_arg); s_sync_request_delete(l_sync_request); } @@ -615,10 +601,12 @@ static bool s_sync_in_chains_callback(void *a_arg) if (s_debug_more) dap_get_data_hash_str_static(l_atom, l_atom_size, l_atom_hash_str); dap_chain_atom_verify_res_t l_atom_add_res = l_chain->callback_atom_add(l_chain, l_atom, l_atom_size); + bool l_ack_send = false; switch (l_atom_add_res) { case ATOM_PASS: debug_if(s_debug_more, L_WARNING, "Atom with hash %s for %s:%s not accepted (code ATOM_PASS, already present)", l_atom_hash_str, l_chain->net_name, l_chain->name); + l_ack_send = true; break; case ATOM_MOVE_TO_THRESHOLD: debug_if(s_debug_more, L_INFO, "Thresholded atom with hash %s for %s:%s", l_atom_hash_str, l_chain->net_name, l_chain->name); @@ -627,18 +615,8 @@ static bool s_sync_in_chains_callback(void *a_arg) debug_if(s_debug_more, L_INFO,"Accepted atom with hash %s for %s:%s", l_atom_hash_str, l_chain->net_name, l_chain->name); if (dap_chain_atom_save(l_chain->cells, l_atom, l_atom_size, NULL) < 0) log_it(L_ERROR, "Can't save atom %s to the file", l_atom_hash_str); - if (l_args->ack_req) { - uint64_t l_ack_num = (l_chain_pkt->hdr.num_hi << 16) | l_chain_pkt->hdr.num_lo; - dap_chain_ch_pkt_t *l_pkt = dap_chain_ch_pkt_new(l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, - &l_ack_num, sizeof(uint64_t)); - dap_stream_ch_pkt_send_by_addr(&l_args->addr, DAP_CHAIN_CH_ID, DAP_CHAIN_CH_PKT_TYPE_CHAIN_ACK, l_pkt, dap_chain_ch_pkt_get_size(l_pkt)); - DAP_DELETE(l_pkt); - debug_if(s_debug_more, L_DEBUG, "Out: CHAIN_ACK %s for net %s to destination " NODE_ADDR_FP_STR " with num %" DAP_UINT64_FORMAT_U, - l_chain ? l_chain->name : "(null)", - l_chain ? l_chain->net_name : "(null)", - NODE_ADDR_FP_ARGS_S(l_args->addr), - l_ack_num); - } + else + l_ack_send = true; break; case ATOM_REJECT: { debug_if(s_debug_more, L_WARNING, "Atom with hash %s for %s:%s rejected", l_atom_hash_str, l_chain->net_name, l_chain->name); @@ -648,6 +626,18 @@ static bool s_sync_in_chains_callback(void *a_arg) log_it(L_CRITICAL, "Wtf is this ret code? %d", l_atom_add_res); break; } + if (l_ack_send && l_args->ack_req) { + uint64_t l_ack_num = (l_chain_pkt->hdr.num_hi << 16) | l_chain_pkt->hdr.num_lo; + dap_chain_ch_pkt_t *l_pkt = dap_chain_ch_pkt_new(l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, + &l_ack_num, sizeof(uint64_t)); + dap_stream_ch_pkt_send_by_addr(&l_args->addr, DAP_CHAIN_CH_ID, DAP_CHAIN_CH_PKT_TYPE_CHAIN_ACK, l_pkt, dap_chain_ch_pkt_get_size(l_pkt)); + DAP_DELETE(l_pkt); + debug_if(s_debug_more, L_DEBUG, "Out: CHAIN_ACK %s for net %s to destination " NODE_ADDR_FP_STR " with num %" DAP_UINT64_FORMAT_U, + l_chain ? l_chain->name : "(null)", + l_chain ? l_chain->net_name : "(null)", + NODE_ADDR_FP_ARGS_S(l_args->addr), + l_ack_num); + } DAP_DELETE(l_args); return false; } @@ -685,7 +675,7 @@ static void s_gdb_in_pkt_error_worker_callback(dap_worker_t *a_worker, void *a_a if (l_ch == NULL) log_it(L_INFO,"Client disconnected before we sent the reply"); else - s_stream_ch_write_error_unsafe(l_ch, l_sync_request->request_hdr.net_id.uint64, + dap_stream_ch_write_error_unsafe(l_ch, l_sync_request->request_hdr.net_id.uint64, l_sync_request->request_hdr.chain_id.uint64, l_sync_request->request_hdr.cell_id.uint64, DAP_CHAIN_CH_ERROR_GLOBAL_DB_INTERNAL_NOT_SAVED); @@ -719,7 +709,7 @@ struct sync_request *dap_chain_ch_create_sync_request(dap_chain_ch_pkt_t *a_chai return l_sync_request; } -static void s_stream_ch_write_error_unsafe(dap_stream_ch_t *a_ch, uint64_t a_net_id, uint64_t a_chain_id, uint64_t a_cell_id, dap_chain_ch_error_type_t a_error) +void dap_stream_ch_write_error_unsafe(dap_stream_ch_t *a_ch, uint64_t a_net_id, uint64_t a_chain_id, uint64_t a_cell_id, dap_chain_ch_error_type_t a_error) { dap_chain_ch_t *l_ch_chain = DAP_CHAIN_CH(a_ch); if (!l_ch_chain) { @@ -745,12 +735,8 @@ static bool s_chain_timer_callback(void *a_arg) return false; } if (l_ch_chain->timer_shots++ >= DAP_SYNC_TICKS_PER_SECOND * s_sync_timeout) { - if (!s_ch_chain_get_idle(l_ch_chain)) { + if (!s_ch_chain_get_idle(l_ch_chain)) s_ch_chain_go_idle(l_ch_chain); - if (l_ch_chain->callback_notify_packet_out) - l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_CHAIN_CH_PKT_TYPE_TIMEOUT, NULL, 0, - l_ch_chain->callback_notify_arg); - } DAP_DELETE(a_arg); l_ch_chain->activity_timer = NULL; return false; @@ -793,18 +779,18 @@ void dap_chain_ch_timer_start(dap_chain_ch_t *a_ch_chain) * @param a_ch * @param a_arg */ -void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) +static bool s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) { dap_chain_ch_t *l_ch_chain = DAP_CHAIN_CH(a_ch); if (!l_ch_chain || l_ch_chain->_inheritor != a_ch) { log_it(L_ERROR, "No chain in channel, returning"); - return; + return false; } dap_stream_ch_pkt_t * l_ch_pkt = (dap_stream_ch_pkt_t *) a_arg; if (l_ch_pkt->hdr.data_size < sizeof(dap_chain_ch_pkt_t)) { log_it(L_ERROR, "Corrupted packet: too small size %u, smaller then header size %zu", l_ch_pkt->hdr.data_size, sizeof(dap_chain_ch_pkt_t)); - return; + return false; } dap_chain_ch_pkt_t *l_chain_pkt = (dap_chain_ch_pkt_t *)l_ch_pkt->data; @@ -813,13 +799,13 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) if (l_chain_pkt->hdr.version > DAP_CHAIN_CH_PKT_VERSION) { debug_if(s_debug_more, L_ATT, "Unsupported protocol version %d, current version %d", l_chain_pkt->hdr.version, DAP_CHAIN_CH_PKT_VERSION); - return; + return false; } if (l_chain_pkt->hdr.version >= 2 && l_chain_pkt_data_size != l_chain_pkt->hdr.data_size) { log_it(L_WARNING, "Incorrect chain packet size %zu, expected %u", l_chain_pkt_data_size, l_chain_pkt->hdr.data_size); - return; + return false; } s_chain_timer_reset(l_ch_chain); @@ -842,21 +828,21 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) dap_cluster_t *l_cluster = dap_cluster_find(dap_guuid_compose(l_chain_pkt->hdr.net_id.uint64, 0)); if (!l_cluster) { log_it(L_WARNING, "Can't find cluster with ID 0x%" DAP_UINT64_FORMAT_X, l_chain_pkt->hdr.net_id.uint64); - s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, DAP_CHAIN_CH_ERROR_CHAIN_NOT_FOUND); - break; + return false; } dap_cluster_member_t *l_check = dap_cluster_member_find_unsafe(l_cluster, &a_ch->stream->node); if (!l_check) { log_it(L_WARNING, "Node with addr "NODE_ADDR_FP_STR" isn't a member of cluster %s", NODE_ADDR_FP_ARGS_S(a_ch->stream->node), l_cluster->mnemonim); - break; + return false; } struct atom_processing_args *l_args = DAP_NEW_SIZE(struct atom_processing_args, l_ch_pkt->hdr.data_size + sizeof(struct atom_processing_args)); if (!l_args) { log_it(L_CRITICAL, g_error_memory_alloc); - return; + break; } l_args->addr = a_ch->stream->node; l_args->ack_req = true; @@ -872,49 +858,61 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) } break; case DAP_CHAIN_CH_PKT_TYPE_CHAIN_REQ: { - if (l_chain_pkt_data_size != sizeof(dap_hash_fast_t)) { + if (l_chain_pkt_data_size != sizeof(dap_chain_ch_sync_request_t)) { log_it(L_WARNING, "DAP_CHAIN_CH_PKT_TYPE_CHAIN_REQ: Wrong chain packet size %zd when expected %zd", - l_chain_pkt_data_size, sizeof(dap_hash_fast_t)); - s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + l_chain_pkt_data_size, sizeof(dap_chain_ch_sync_request_t)); + dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, DAP_CHAIN_CH_ERROR_CHAIN_PKT_DATA_SIZE); - break; + return false; } + dap_chain_ch_sync_request_t *l_request = (dap_chain_ch_sync_request_t *)l_chain_pkt->data; if (s_debug_more) - log_it(L_INFO, "In: CHAIN_REQ pkt: net 0x%016" DAP_UINT64_FORMAT_x " chain 0x%016" DAP_UINT64_FORMAT_x " cell 0x%016" DAP_UINT64_FORMAT_x, - l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64); + log_it(L_INFO, "In: CHAIN_REQ pkt: net 0x%016" DAP_UINT64_FORMAT_x " chain 0x%016" DAP_UINT64_FORMAT_x + " cell 0x%016" DAP_UINT64_FORMAT_x ", hash from %s, num from %" DAP_UINT64_FORMAT_U, + l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, + dap_hash_fast_to_str_static(&l_request->hash_from), l_request->num_from); if (l_ch_chain->sync_context) { log_it(L_WARNING, "Can't process CHAIN_REQ request cause already busy with syncronization"); dap_chain_ch_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, DAP_CHAIN_CH_ERROR_SYNC_REQUEST_ALREADY_IN_PROCESS); - break; + return false; } dap_chain_t *l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id); if (!l_chain) { log_it(L_WARNING, "Not found chain id 0x%016" DAP_UINT64_FORMAT_x " with net id 0x%016" DAP_UINT64_FORMAT_x, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.net_id.uint64); - s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, DAP_CHAIN_CH_ERROR_CHAIN_NOT_FOUND); - break; + return false; } if (!dap_link_manager_get_net_condition(l_chain_pkt->hdr.net_id.uint64)) { log_it(L_WARNING, "Net id 0x%016" DAP_UINT64_FORMAT_x " is offline", l_chain_pkt->hdr.net_id.uint64); - s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, DAP_CHAIN_CH_ERROR_NET_IS_OFFLINE); + return false; + } + bool l_sync_from_begin = dap_hash_fast_is_blank(&l_request->hash_from); + dap_chain_atom_iter_t *l_iter = l_chain->callback_atom_iter_create(l_chain, l_chain_pkt->hdr.cell_id, l_sync_from_begin + ? NULL : &l_request->hash_from); + if (!l_iter) { + dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, + DAP_CHAIN_CH_ERROR_OUT_OF_MEMORY); break; } - dap_hash_fast_t *l_requested_hash = (dap_hash_fast_t *)l_chain_pkt->data; - if (dap_hash_fast_is_blank(l_requested_hash)) - l_requested_hash = NULL; - dap_chain_atom_iter_t *l_iter = l_chain->callback_atom_iter_create(l_chain, l_chain_pkt->hdr.cell_id, l_requested_hash); - if (!l_requested_hash) + if (l_sync_from_begin) l_chain->callback_atom_iter_get(l_iter, DAP_CHAIN_ITER_OP_FIRST, NULL); + bool l_missed_hash = false; if (l_iter->cur) { - dap_chain_ch_summary_t l_sum = { .num_cur = l_requested_hash ? l_iter->cur_num : 0, .num_last = l_chain->callback_count_atom(l_chain) }; - if (l_sum.num_last - l_sum.num_cur) { + uint64_t l_last_num = l_chain->callback_count_atom(l_chain); + if (l_sync_from_begin || + (l_request->num_from == l_iter->cur_num && + l_last_num > l_iter->cur_num)) { + dap_chain_ch_summary_t l_sum = { .num_cur = l_iter->cur_num, .num_last = l_last_num }; dap_chain_ch_pkt_write_unsafe(a_ch, DAP_CHAIN_CH_PKT_TYPE_CHAIN_SUMMARY, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, &l_sum, sizeof(l_sum)); @@ -937,15 +935,41 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) l_ch_chain->sync_timer = dap_timerfd_start_on_worker(a_ch->stream_worker->worker, 1000, s_sync_timer_callback, l_ch_chain); break; } - } else - debug_if(s_debug_more, L_DEBUG, "Requested atom with hash %s not found", dap_hash_fast_to_str_static(l_requested_hash)); - dap_chain_ch_pkt_write_unsafe(a_ch, DAP_CHAIN_CH_PKT_TYPE_SYNCED_CHAIN, - l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, - l_chain_pkt->hdr.cell_id.uint64, NULL, 0); - debug_if(s_debug_more, L_DEBUG, "Out: SYNCED_CHAIN %s for net %s to destination " NODE_ADDR_FP_STR, - l_chain ? l_chain->name : "(null)", - l_chain ? l_chain->net_name : "(null)", - NODE_ADDR_FP_ARGS_S(a_ch->stream->node)); + if (l_request->num_from < l_iter->cur_num || l_last_num > l_iter->cur_num) + l_missed_hash = true; + } else if (!l_sync_from_begin) { + l_missed_hash = true; + debug_if(s_debug_more, L_DEBUG, "Requested atom with hash %s not found", dap_hash_fast_to_str_static(&l_request->hash_from)); + } + if (l_missed_hash) { + l_chain->callback_atom_iter_get(l_iter, DAP_CHAIN_ITER_OP_LAST, NULL); + dap_chain_ch_miss_info_t l_miss_info = { .missed_hash = l_request->hash_from, + .last_hash = *l_iter->cur_hash, + .last_num = l_iter->cur_num }; + dap_chain_ch_pkt_write_unsafe(a_ch, DAP_CHAIN_CH_PKT_TYPE_CHAIN_MISS, + l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, + l_chain_pkt->hdr.cell_id.uint64, &l_miss_info, sizeof(l_miss_info)); + if (s_debug_more) { + char l_last_hash_str[DAP_HASH_FAST_STR_SIZE]; + dap_hash_fast_to_str(&l_miss_info.last_hash, l_last_hash_str, DAP_HASH_FAST_STR_SIZE); + log_it(L_INFO, "Out: CHAIN_MISS %s for net %s to source " NODE_ADDR_FP_STR + " with hash missed %s, hash last %s and num last %" DAP_UINT64_FORMAT_U, + l_chain ? l_chain->name : "(null)", + l_chain ? l_chain->net_name : "(null)", + NODE_ADDR_FP_ARGS_S(a_ch->stream->node), + dap_hash_fast_to_str_static(&l_miss_info.missed_hash), + l_last_hash_str, + l_miss_info.last_num); + } + } else { + dap_chain_ch_pkt_write_unsafe(a_ch, DAP_CHAIN_CH_PKT_TYPE_SYNCED_CHAIN, + l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, + l_chain_pkt->hdr.cell_id.uint64, NULL, 0); + debug_if(s_debug_more, L_DEBUG, "Out: SYNCED_CHAIN %s for net %s to destination " NODE_ADDR_FP_STR, + l_chain ? l_chain->name : "(null)", + l_chain ? l_chain->net_name : "(null)", + NODE_ADDR_FP_ARGS_S(a_ch->stream->node)); + } l_chain->callback_atom_iter_delete(l_iter); } break; @@ -953,10 +977,10 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) if (l_chain_pkt_data_size != sizeof(dap_chain_ch_summary_t)) { log_it(L_WARNING, "DAP_CHAIN_CH_PKT_TYPE_CHAIN_SUMMARY: Wrong chain packet size %zd when expected %zd", l_chain_pkt_data_size, sizeof(dap_chain_ch_summary_t)); - s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, DAP_CHAIN_CH_ERROR_CHAIN_PKT_DATA_SIZE); - break; + return false; } dap_chain_t *l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id); dap_chain_ch_summary_t *l_sum = (dap_chain_ch_summary_t *)l_chain_pkt->data; @@ -972,10 +996,10 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) if (l_chain_pkt_data_size != sizeof(uint64_t)) { log_it(L_WARNING, "DAP_CHAIN_CH_PKT_TYPE_CHAIN_ACK: Wrong chain packet size %zd when expected %zd", l_chain_pkt_data_size, sizeof(uint64_t)); - s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, DAP_CHAIN_CH_ERROR_CHAIN_PKT_DATA_SIZE); - break; + return false; } uint64_t l_ack_num = *(uint64_t *)l_chain_pkt->data; dap_chain_t *l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id); @@ -987,7 +1011,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) struct sync_context *l_context = l_ch_chain->sync_context; if (!l_context) { log_it(L_WARNING, "CHAIN_ACK: No active sync context"); - s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, DAP_CHAIN_CH_ERROR_INCORRECT_SYNC_SEQUENCE); break; @@ -1017,10 +1041,37 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) NODE_ADDR_FP_ARGS_S(a_ch->stream->node)); } break; + case DAP_CHAIN_CH_PKT_TYPE_CHAIN_MISS: { + if (l_chain_pkt_data_size != sizeof(dap_chain_ch_miss_info_t)) { + log_it(L_WARNING, "DAP_CHAIN_CH_PKT_TYPE_CHAIN_MISS: Wrong chain packet size %zd when expected %zd", + l_chain_pkt_data_size, sizeof(dap_chain_ch_miss_info_t)); + dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, + DAP_CHAIN_CH_ERROR_CHAIN_PKT_DATA_SIZE); + return false; + } + dap_chain_t *l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id); + dap_chain_ch_miss_info_t *l_miss_info = (dap_chain_ch_miss_info_t *)l_chain_pkt->data; + if (s_debug_more) { + char l_last_hash_str[DAP_HASH_FAST_STR_SIZE]; + dap_hash_fast_to_str(&l_miss_info->last_hash, l_last_hash_str, DAP_HASH_FAST_STR_SIZE); + log_it(L_INFO, "In: CHAIN_MISS %s for net %s from source " NODE_ADDR_FP_STR + " with hash missed %s, hash last %s and num last %" DAP_UINT64_FORMAT_U, + l_chain ? l_chain->name : "(null)", + l_chain ? l_chain->net_name : "(null)", + NODE_ADDR_FP_ARGS_S(a_ch->stream->node), + dap_hash_fast_to_str_static(&l_miss_info->missed_hash), + l_last_hash_str, + l_miss_info->last_num); + } + // Will be processed upper in net packet notifier callback + } break; + default: - s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, DAP_CHAIN_CH_ERROR_UNKNOWN_CHAIN_PKT_TYPE); + return false; // } //} @@ -1039,8 +1090,8 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) } log_it(L_INFO, "In: UPDATE_GLOBAL_DB_REQ pkt: net 0x%016"DAP_UINT64_FORMAT_x" chain 0x%016"DAP_UINT64_FORMAT_x" cell 0x%016"DAP_UINT64_FORMAT_x, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64); - if (l_chain_pkt_data_size == (size_t)sizeof(dap_chain_ch_sync_request_t)) - l_ch_chain->request = *(dap_chain_ch_sync_request_t*)l_chain_pkt->data; + if (l_chain_pkt_data_size == (size_t)sizeof(dap_chain_ch_sync_request_old_t)) + l_ch_chain->request = *(dap_chain_ch_sync_request_old_t*)l_chain_pkt->data; struct sync_request *l_sync_request = dap_chain_ch_create_sync_request(l_chain_pkt, a_ch); l_ch_chain->stats_request_gdb_processed = 0; l_ch_chain->request_hdr = l_chain_pkt->hdr; @@ -1076,7 +1127,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) memcmp(&l_ch_chain->request_hdr.net_id, &l_chain_pkt->hdr.net_id, sizeof(dap_chain_net_id_t) + sizeof(dap_chain_id_t) + sizeof(dap_chain_cell_id_t))) { log_it(L_WARNING, "Can't process UPDATE_GLOBAL_DB request because its already busy with syncronization"); - s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, DAP_CHAIN_CH_ERROR_SYNC_REQUEST_ALREADY_IN_PROCESS); break; @@ -1093,7 +1144,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) l_hash_item = DAP_NEW_Z(dap_chain_ch_hash_item_t); if (!l_hash_item) { log_it(L_CRITICAL, "Memory allocation error"); - return; + break; } l_hash_item->hash = l_element->hash; l_hash_item->size = l_element->size; @@ -1109,25 +1160,25 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) } break; // End of response with starting of DB sync case DAP_CHAIN_CH_PKT_TYPE_UPDATE_GLOBAL_DB_END: { - if(l_chain_pkt_data_size == sizeof(dap_chain_ch_sync_request_t)) { + if(l_chain_pkt_data_size == sizeof(dap_chain_ch_sync_request_old_t)) { if (l_ch_chain->state != DAP_CHAIN_CH_STATE_UPDATE_GLOBAL_DB_REMOTE || memcmp(&l_ch_chain->request_hdr.net_id, &l_chain_pkt->hdr.net_id, sizeof(dap_chain_net_id_t) + sizeof(dap_chain_id_t) + sizeof(dap_chain_cell_id_t))) { log_it(L_WARNING, "Can't process UPDATE_GLOBAL_DB_END request because its already busy with syncronization"); - s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, DAP_CHAIN_CH_ERROR_SYNC_REQUEST_ALREADY_IN_PROCESS); break; } debug_if(s_debug_more, L_INFO, "In: UPDATE_GLOBAL_DB_END pkt with total count %d hashes", HASH_COUNT(l_ch_chain->remote_gdbs)); - if (l_chain_pkt_data_size == sizeof(dap_chain_ch_sync_request_t)) - l_ch_chain->request = *(dap_chain_ch_sync_request_t*)l_chain_pkt->data; + if (l_chain_pkt_data_size == sizeof(dap_chain_ch_sync_request_old_t)) + l_ch_chain->request = *(dap_chain_ch_sync_request_old_t*)l_chain_pkt->data; struct sync_request *l_sync_request = dap_chain_ch_create_sync_request(l_chain_pkt, a_ch); dap_proc_thread_callback_add(a_ch->stream_worker->worker->proc_queue_input, s_sync_out_gdb_proc_callback, l_sync_request); } else { log_it(L_WARNING, "DAP_CHAIN_CH_PKT_TYPE_UPDATE_GLOBAL_DB_END: Wrong chain packet size %zd when expected %zd", l_chain_pkt_data_size, sizeof(l_ch_chain->request)); - s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, DAP_CHAIN_CH_ERROR_CHAIN_PKT_DATA_SIZE); } @@ -1142,7 +1193,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, NODE_ADDR_FP_ARGS_S(l_ch_chain->request.node_addr) ); }else { log_it(L_WARNING,"Incorrect data size %zu in packet DAP_CHAIN_CH_PKT_TYPE_FIRST_GLOBAL_DB", l_chain_pkt_data_size); - s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, DAP_CHAIN_CH_ERROR_CHAIN_PKT_DATA_SIZE); } @@ -1160,7 +1211,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) dap_proc_thread_callback_add(a_ch->stream_worker->worker->proc_queue_input, s_gdb_in_pkt_proc_callback, l_sync_request); } else { log_it(L_WARNING, "Packet with GLOBAL_DB atom has zero body size"); - s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, DAP_CHAIN_CH_ERROR_CHAIN_PKT_DATA_SIZE); } @@ -1169,12 +1220,11 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) case DAP_CHAIN_CH_PKT_TYPE_SYNCED_GLOBAL_DB: { log_it(L_INFO, "In: SYNCED_GLOBAL_DB: net 0x%016"DAP_UINT64_FORMAT_x" chain 0x%016"DAP_UINT64_FORMAT_x" cell 0x%016"DAP_UINT64_FORMAT_x, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64); - if (!l_ch_chain->callback_notify_packet_in) { // we haven't node client waitng, so reply to other side - dap_chain_ch_sync_request_t l_sync_gdb = {}; - l_sync_gdb.node_addr.uint64 = g_node_addr.uint64; - dap_chain_ch_pkt_write_unsafe(a_ch, DAP_CHAIN_CH_PKT_TYPE_UPDATE_GLOBAL_DB_REQ, l_chain_pkt->hdr.net_id.uint64, - l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, &l_sync_gdb, sizeof(l_sync_gdb)); - } + // we haven't node client waitng, so reply to other side + dap_chain_ch_sync_request_old_t l_sync_gdb = {}; + l_sync_gdb.node_addr.uint64 = g_node_addr.uint64; + dap_chain_ch_pkt_write_unsafe(a_ch, DAP_CHAIN_CH_PKT_TYPE_UPDATE_GLOBAL_DB_REQ, l_chain_pkt->hdr.net_id.uint64, + l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, &l_sync_gdb, sizeof(l_sync_gdb)); } break; /// --- Chains update --- @@ -1182,7 +1232,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) case DAP_CHAIN_CH_PKT_TYPE_UPDATE_CHAINS_REQ:{ if (l_ch_chain->state != DAP_CHAIN_CH_STATE_IDLE) { log_it(L_WARNING, "Can't process UPDATE_CHAINS_REQ request because its already busy with syncronization"); - s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, DAP_CHAIN_CH_ERROR_SYNC_REQUEST_ALREADY_IN_PROCESS); break; @@ -1225,7 +1275,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) " chain id 0x%016"DAP_UINT64_FORMAT_x" cell_id 0x%016"DAP_UINT64_FORMAT_x" in packet", a_ch->stream->esocket->remote_addr_str, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64); - s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, DAP_CHAIN_CH_ERROR_NET_INVALID_ID); // Who are you? I don't know you! go away! @@ -1249,7 +1299,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) a_ch->stream->esocket->remote_addr_str, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64); - s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, DAP_CHAIN_CH_ERROR_NET_INVALID_ID); // Who are you? I don't know you! go away! @@ -1268,7 +1318,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) l_hash_item = DAP_NEW_Z(dap_chain_ch_hash_item_t); if (!l_hash_item) { log_it(L_CRITICAL, "Memory allocation error"); - return; + break; } l_hash_item->hash = l_element->hash; l_hash_item->size = l_element->size; @@ -1289,12 +1339,12 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) } break; case DAP_CHAIN_CH_PKT_TYPE_UPDATE_CHAINS_END: { - if(l_chain_pkt_data_size == sizeof(dap_chain_ch_sync_request_t)) { + if(l_chain_pkt_data_size == sizeof(dap_chain_ch_sync_request_old_t)) { if (l_ch_chain->state != DAP_CHAIN_CH_STATE_UPDATE_CHAINS_REMOTE || memcmp(&l_ch_chain->request_hdr.net_id, &l_chain_pkt->hdr.net_id, sizeof(dap_chain_net_id_t) + sizeof(dap_chain_id_t) + sizeof(dap_chain_cell_id_t))) { log_it(L_WARNING, "Can't process UPDATE_CHAINS_END request because its already busy with syncronization"); - s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, DAP_CHAIN_CH_ERROR_SYNC_REQUEST_ALREADY_IN_PROCESS); break; @@ -1305,7 +1355,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) " chain id 0x%016"DAP_UINT64_FORMAT_x" cell_id 0x%016"DAP_UINT64_FORMAT_x" in packet", a_ch->stream->esocket->remote_addr_str, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64); - s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, DAP_CHAIN_CH_ERROR_NET_INVALID_ID); break; @@ -1319,7 +1369,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) } else { log_it(L_WARNING, "DAP_CHAIN_CH_PKT_TYPE_UPDATE_CHAINS_END: Wrong chain packet size %zd when expected %zd", l_chain_pkt_data_size, sizeof(l_ch_chain->request)); - s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, DAP_CHAIN_CH_ERROR_CHAIN_PKT_DATA_SIZE); } @@ -1335,7 +1385,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64); }else{ log_it(L_WARNING,"Incorrect data size %zd in packet DAP_CHAIN_CH_PKT_TYPE_FIRST_CHAIN", l_chain_pkt_data_size); - s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, DAP_CHAIN_CH_ERROR_CHAIN_PKT_DATA_SIZE); } @@ -1356,30 +1406,27 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) dap_timerfd_delete_unsafe(l_ch_chain->activity_timer); l_ch_chain->activity_timer = NULL; } - if (!l_ch_chain->callback_notify_packet_in) { // we haven't node client waitng, so reply to other side - dap_chain_t *l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id); - if (!l_chain) { - log_it(L_ERROR, "Invalid SYNCED_CHAINS packet from %s with net id 0x%016"DAP_UINT64_FORMAT_x - " chain id 0x%016"DAP_UINT64_FORMAT_x" cell_id 0x%016"DAP_UINT64_FORMAT_x" in packet", - a_ch->stream->esocket->remote_addr_str, l_chain_pkt->hdr.net_id.uint64, - l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64); - s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, - l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, - DAP_CHAIN_CH_ERROR_NET_INVALID_ID); - break; - } - if (s_debug_more) { - log_it(L_INFO, "Out: UPDATE_CHAINS_REQ pkt"); - } - dap_chain_ch_sync_request_t l_request= {}; - dap_chain_ch_pkt_write_unsafe(a_ch, DAP_CHAIN_CH_PKT_TYPE_UPDATE_CHAINS_REQ, l_chain_pkt->hdr.net_id.uint64, - l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, &l_request, sizeof(l_request)); + // we haven't node client waitng, so reply to other side + dap_chain_t *l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id); + if (!l_chain) { + log_it(L_ERROR, "Invalid SYNCED_CHAINS packet from %s with net id 0x%016"DAP_UINT64_FORMAT_x + " chain id 0x%016"DAP_UINT64_FORMAT_x" cell_id 0x%016"DAP_UINT64_FORMAT_x" in packet", + a_ch->stream->esocket->remote_addr_str, l_chain_pkt->hdr.net_id.uint64, + l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64); + dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, + DAP_CHAIN_CH_ERROR_NET_INVALID_ID); + break; + } + if (s_debug_more) { + log_it(L_INFO, "Out: UPDATE_CHAINS_REQ pkt"); } + dap_chain_ch_sync_request_old_t l_request= {}; + dap_chain_ch_pkt_write_unsafe(a_ch, DAP_CHAIN_CH_PKT_TYPE_UPDATE_CHAINS_REQ, l_chain_pkt->hdr.net_id.uint64, + l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, &l_request, sizeof(l_request)); } break; } - if(l_ch_chain->callback_notify_packet_in) - l_ch_chain->callback_notify_packet_in(l_ch_chain, l_ch_pkt->hdr.type, l_chain_pkt, - l_chain_pkt_data_size, l_ch_chain->callback_notify_arg); + return true; } static bool s_sync_timer_callback(void *a_arg) @@ -1387,6 +1434,13 @@ static bool s_sync_timer_callback(void *a_arg) dap_chain_ch_t *l_ch_chain = a_arg; struct sync_context *l_context = l_ch_chain->sync_context; if (l_context->last_activity + s_sync_timeout <= dap_time_now()) { + log_it(L_ERROR, "Sync timeout for node " NODE_ADDR_FP_STR " with net 0x%016" DAP_UINT64_FORMAT_x + " chain 0x%016" DAP_UINT64_FORMAT_x " cell 0x%016" DAP_UINT64_FORMAT_x, + NODE_ADDR_FP_ARGS_S(l_context->addr), l_context->net_id.uint64, + l_context->chain_id.uint64, l_context->cell_id.uint64); + dap_stream_ch_write_error_unsafe(DAP_STREAM_CH(l_ch_chain), l_context->net_id.uint64, + l_context->chain_id.uint64, l_context->cell_id.uint64, + DAP_CHAIN_CH_ERROR_SYNC_TIMEOUT); l_ch_chain->sync_timer = NULL; s_ch_chain_go_idle(l_ch_chain); return false; @@ -1616,7 +1670,7 @@ static bool s_stream_ch_packet_out(dap_stream_ch_t *a_ch, void *a_arg) l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64, - &l_ch_chain->request, sizeof(dap_chain_ch_sync_request_t)); + &l_ch_chain->request, sizeof(dap_chain_ch_sync_request_old_t)); debug_if(s_debug_more, L_INFO, "Out: DAP_CHAIN_CH_PKT_TYPE_UPDATE_GLOBAL_DB_END"); l_go_idle = true; } @@ -1649,7 +1703,7 @@ static bool s_stream_ch_packet_out(dap_stream_ch_t *a_ch, void *a_arg) l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64, - &l_ch_chain->request, sizeof(dap_chain_ch_sync_request_t)); + &l_ch_chain->request, sizeof(dap_chain_ch_sync_request_old_t)); if (s_debug_more ) log_it(L_INFO, "Out: DAP_CHAIN_CH_PKT_TYPE_UPDATE_GLOBAL_DB_END"); l_go_idle = true; @@ -1699,7 +1753,7 @@ static bool s_stream_ch_packet_out(dap_stream_ch_t *a_ch, void *a_arg) } else if (!l_objs) { l_was_sent_smth = true; // last message - dap_chain_ch_sync_request_t l_request = { }; + dap_chain_ch_sync_request_old_t l_request = { }; s_stream_ch_chain_pkt_write(a_ch, DAP_CHAIN_CH_PKT_TYPE_SYNCED_GLOBAL_DB, l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64, &l_request, sizeof(l_request)); @@ -1768,7 +1822,7 @@ static bool s_stream_ch_packet_out(dap_stream_ch_t *a_ch, void *a_arg) log_it( L_INFO,"Syncronized database: items syncronyzed %"DAP_UINT64_FORMAT_U" from %zu", l_ch_chain->stats_request_gdb_processed, dap_db_log_list_get_count(l_ch_chain->request_db_log)); // last message - dap_chain_ch_sync_request_t l_request = {}; + dap_chain_ch_sync_request_old_t l_request = {}; s_stream_ch_chain_pkt_write(a_ch, DAP_CHAIN_CH_PKT_TYPE_SYNCED_GLOBAL_DB, l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64, &l_request, sizeof(l_request)); @@ -1806,12 +1860,12 @@ static bool s_stream_ch_packet_out(dap_stream_ch_t *a_ch, void *a_arg) l_was_sent_smth = true; if(s_debug_more) log_it(L_INFO,"Out: UPDATE_CHAINS_END sent "); - dap_chain_ch_sync_request_t l_request = {}; + dap_chain_ch_sync_request_old_t l_request = {}; s_stream_ch_chain_pkt_write(a_ch, DAP_CHAIN_CH_PKT_TYPE_UPDATE_CHAINS_END, l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64, - &l_request, sizeof(dap_chain_ch_sync_request_t)); + &l_request, sizeof(dap_chain_ch_sync_request_old_t)); l_go_idle = true; dap_stream_ch_set_ready_to_write_unsafe(a_ch, false); } @@ -1867,7 +1921,7 @@ static bool s_stream_ch_packet_out(dap_stream_ch_t *a_ch, void *a_arg) break; } if(!l_ch_chain->request_atom_iter || !l_ch_chain->request_atom_iter->cur) { // All chains synced - dap_chain_ch_sync_request_t l_request = {}; + dap_chain_ch_sync_request_old_t l_request = {}; // last message l_was_sent_smth = true; s_stream_ch_chain_pkt_write(a_ch, DAP_CHAIN_CH_PKT_TYPE_SYNCED_CHAINS, @@ -1875,9 +1929,6 @@ static bool s_stream_ch_packet_out(dap_stream_ch_t *a_ch, void *a_arg) l_ch_chain->request_hdr.cell_id.uint64, &l_request, sizeof(l_request)); log_it( L_INFO,"Synced: %"DAP_UINT64_FORMAT_U" atoms processed", l_ch_chain->stats_request_atoms_processed); l_go_idle = true; - if (l_ch_chain->callback_notify_packet_out) - l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_CHAIN_CH_PKT_TYPE_SYNCED_CHAINS, NULL, - 0, l_ch_chain->callback_notify_arg); } } break; diff --git a/modules/chain/include/dap_chain.h b/modules/chain/include/dap_chain.h index 59237d96057b64a84641848127359a4bd9ca3712..a0b865f378f72e2441265157b2eaa427e0e052f6 100644 --- a/modules/chain/include/dap_chain.h +++ b/modules/chain/include/dap_chain.h @@ -95,6 +95,7 @@ typedef size_t (*dap_chain_callback_atom_get_hdr_size_t)(void); typedef dap_chain_atom_iter_t * (*dap_chain_callback_atom_iter_create_t)(dap_chain_t *a_chain, dap_chain_cell_id_t a_cell_id, dap_hash_fast_t *a_hash_from); typedef dap_chain_atom_ptr_t (*dap_chain_callback_atom_iter_get_t)(dap_chain_atom_iter_t *a_iter, dap_chain_iter_op_t a_operation, size_t *a_atom_size); typedef dap_chain_atom_ptr_t (*dap_chain_callback_atom_iter_find_by_hash_t)(dap_chain_atom_iter_t *a_iter, dap_hash_fast_t *a_atom_hash, size_t *a_atom_size); +typedef dap_chain_atom_ptr_t (*dap_chain_callback_atom_iter_get_by_num_t)(dap_chain_atom_iter_t *a_iter, uint64_t a_atom_num); typedef void (*dap_chain_callback_atom_iter_delete_t)(dap_chain_atom_iter_t *); typedef dap_chain_datum_iter_t * (*dap_chain_datum_callback_iter_create_t)(dap_chain_t *); @@ -178,6 +179,7 @@ typedef struct dap_chain { dap_chain_callback_atom_get_timestamp_t callback_atom_get_timestamp; dap_chain_callback_atom_iter_find_by_hash_t callback_atom_find_by_hash; + dap_chain_callback_atom_iter_get_by_num_t callback_atom_get_by_num; dap_chain_callback_datum_find_by_hash_t callback_datum_find_by_hash; dap_chain_callback_block_find_by_hash_t callback_block_find_by_tx_hash; @@ -258,7 +260,11 @@ dap_chain_t *dap_chain_load_from_cfg(const char *a_chain_net_name, dap_chain_net void dap_chain_delete(dap_chain_t * a_chain); void dap_chain_add_callback_notify(dap_chain_t * a_chain, dap_chain_callback_notify_t a_callback, void * a_arg); dap_chain_atom_ptr_t dap_chain_get_atom_by_hash(dap_chain_t * a_chain, dap_chain_hash_fast_t * a_atom_hash, size_t * a_atom_size); -bool dap_chain_get_atom_last_hash(dap_chain_t *a_chain, dap_hash_fast_t *a_atom_hash, dap_chain_cell_id_t a_cell_id); +bool dap_chain_get_atom_last_hash_num(dap_chain_t *a_chain, dap_chain_cell_id_t a_cell_id, dap_hash_fast_t *a_atom_hash, uint64_t *a_atom_num); +DAP_STATIC_INLINE bool dap_chain_get_atom_last_hash(dap_chain_t *a_chain, dap_chain_cell_id_t a_cell_id, dap_hash_fast_t *a_atom_hash) +{ + return dap_chain_get_atom_last_hash_num(a_chain, a_cell_id, a_atom_hash, NULL); +} ssize_t dap_chain_atom_save(dap_chain_cell_t *a_chain_cell, const uint8_t *a_atom, size_t a_atom_size, dap_hash_fast_t *a_new_atom_hash); int dap_cert_chain_file_save(dap_chain_datum_t *datum, char *net_name); const char* dap_chain_get_path(dap_chain_t *a_chain); diff --git a/modules/chain/include/dap_chain_ch.h b/modules/chain/include/dap_chain_ch.h index 13554e358f8f4f068057fad9cce83fe85eefd9a1..9d51f6319edd336118b3ef1e375a0cf6171dbc1c 100644 --- a/modules/chain/include/dap_chain_ch.h +++ b/modules/chain/include/dap_chain_ch.h @@ -50,13 +50,16 @@ typedef enum dap_chain_ch_state { typedef enum dap_chain_ch_error_type { DAP_CHAIN_CH_ERROR_SYNC_REQUEST_ALREADY_IN_PROCESS, DAP_CHAIN_CH_ERROR_INCORRECT_SYNC_SEQUENCE, + DAP_CHAIN_CH_ERROR_SYNC_TIMEOUT, DAP_CHAIN_CH_ERROR_CHAIN_PKT_DATA_SIZE, DAP_CHAIN_CH_ERROR_NET_INVALID_ID, DAP_CHAIN_CH_ERROR_CHAIN_NOT_FOUND, DAP_CHAIN_CH_ERROR_ATOM_NOT_FOUND, DAP_CHAIN_CH_ERROR_UNKNOWN_CHAIN_PKT_TYPE, DAP_CHAIN_CH_ERROR_GLOBAL_DB_INTERNAL_NOT_SAVED, - DAP_CHAIN_CH_ERROR_NET_IS_OFFLINE + DAP_CHAIN_CH_ERROR_NET_IS_OFFLINE, + DAP_CHAIN_CH_ERROR_OUT_OF_MEMORY, + DAP_CHAIN_CH_ERROR_INTERNAL } dap_chain_ch_error_type_t; typedef struct dap_chain_ch dap_chain_ch_t; @@ -68,7 +71,7 @@ typedef struct dap_chain_pkt_item { byte_t *pkt_data; } dap_chain_pkt_item_t; -typedef struct dap_chain_ch_hash_item{ +typedef struct dap_chain_ch_hash_item { dap_hash_fast_t hash; uint32_t size; UT_hash_handle hh; @@ -92,17 +95,13 @@ typedef struct dap_chain_ch { // request section dap_chain_atom_iter_t *request_atom_iter; //dap_db_log_list_t *request_db_log; // list of global db records - dap_chain_ch_sync_request_t request; + dap_chain_ch_sync_request_old_t request; dap_chain_ch_pkt_hdr_t request_hdr; dap_list_t *request_db_iter; uint32_t timer_shots; dap_timerfd_t *activity_timer; int sent_breaks; - - dap_chain_ch_callback_packet_t callback_notify_packet_out; - dap_chain_ch_callback_packet_t callback_notify_packet_in; - void *callback_notify_arg; } dap_chain_ch_t; #define DAP_CHAIN_CH(a) ((dap_chain_ch_t *) ((a)->internal) ) @@ -114,4 +113,5 @@ int dap_chain_ch_init(void); void dap_chain_ch_deinit(void); void dap_chain_ch_timer_start(dap_chain_ch_t *a_ch_chain); -void dap_chain_ch_reset_unsafe(dap_chain_ch_t *a_ch_chain); + +void dap_stream_ch_write_error_unsafe(dap_stream_ch_t *a_ch, uint64_t a_net_id, uint64_t a_chain_id, uint64_t a_cell_id, dap_chain_ch_error_type_t a_error); diff --git a/modules/chain/include/dap_chain_ch_pkt.h b/modules/chain/include/dap_chain_ch_pkt.h index 68046ededd862aa238032d327b81a5e2cedd47fb..0c721f1408379a5972fdf7138514607b9574cacc 100644 --- a/modules/chain/include/dap_chain_ch_pkt.h +++ b/modules/chain/include/dap_chain_ch_pkt.h @@ -60,6 +60,7 @@ // Stable #define DAP_CHAIN_CH_PKT_TYPE_CHAIN_REQ 0x80 +#define DAP_CHAIN_CH_PKT_TYPE_CHAIN_MISS 0x69 #define DAP_CHAIN_CH_PKT_TYPE_CHAIN 0x01 #define DAP_CHAIN_CH_PKT_TYPE_CHAIN_SUMMARY 0x81 #define DAP_CHAIN_CH_PKT_TYPE_CHAIN_ACK 0x82 @@ -75,15 +76,33 @@ #define DAP_CHAIN_CH_PKT_TYPE_UPDATE_TSD_HASH_FIRST 0x0004 // Hash of first(s) item #define DAP_CHAIN_CH_PKT_TYPE_UPDATE_TSD_LAST_ID 0x0100 // Last ID of GDB synced group +// *** Legacy *** // + typedef struct dap_chain_ch_update_element{ dap_hash_fast_t hash; uint32_t size; } DAP_ALIGN_PACKED dap_chain_ch_update_element_t; -typedef struct dap_chain_ch_sync_request{ +typedef struct dap_chain_ch_sync_request_old { dap_chain_node_addr_t node_addr; // Requesting node's address dap_chain_hash_fast_t hash_from; byte_t unused[96]; +} DAP_ALIGN_PACKED dap_chain_ch_sync_request_old_t; + +static const char* c_dap_chain_ch_pkt_type_str[]={ + [DAP_CHAIN_CH_PKT_TYPE_CHAIN] = "DAP_CHAIN_CH_PKT_TYPE_CHAIN", + [DAP_CHAIN_CH_PKT_TYPE_GLOBAL_DB] = "DAP_CHAIN_CH_PKT_TYPE_GLOBAL_DB", + [DAP_CHAIN_CH_PKT_TYPE_SYNCED_CHAINS] = "DAP_CHAIN_CH_PKT_TYPE_SYNCED_CHAINS", + [DAP_CHAIN_CH_PKT_TYPE_SYNCED_GLOBAL_DB] = "DAP_CHAIN_CH_PKT_TYPE_SYNCED_GLOBAL_DB", + [DAP_CHAIN_CH_PKT_TYPE_ERROR] = "DAP_CHAIN_CH_PKT_TYPE_ERROR" + +}; + +// *** Active *** // + +typedef struct dap_chain_ch_sync_request { + dap_chain_hash_fast_t hash_from; + uint64_t num_from; } DAP_ALIGN_PACKED dap_chain_ch_sync_request_t; typedef struct dap_chain_ch_summary { @@ -92,6 +111,12 @@ typedef struct dap_chain_ch_summary { byte_t reserved[128]; } DAP_ALIGN_PACKED dap_chain_ch_summary_t; +typedef struct dap_chain_ch_miss_info { + dap_hash_fast_t missed_hash; + dap_hash_fast_t last_hash; + uint64_t last_num; +} DAP_ALIGN_PACKED dap_chain_ch_miss_info_t; + typedef struct dap_chain_ch_pkt_hdr { uint8_t version; uint8_t num_hi; @@ -107,15 +132,6 @@ typedef struct dap_chain_ch_pkt { uint8_t data[]; } DAP_ALIGN_PACKED dap_chain_ch_pkt_t; -static const char* c_dap_chain_ch_pkt_type_str[]={ - [DAP_CHAIN_CH_PKT_TYPE_CHAIN] = "DAP_CHAIN_CH_PKT_TYPE_CHAIN", - [DAP_CHAIN_CH_PKT_TYPE_GLOBAL_DB] = "DAP_CHAIN_CH_PKT_TYPE_GLOBAL_DB", - [DAP_CHAIN_CH_PKT_TYPE_SYNCED_CHAINS] = "DAP_CHAIN_CH_PKT_TYPE_SYNCED_CHAINS", - [DAP_CHAIN_CH_PKT_TYPE_SYNCED_GLOBAL_DB] = "DAP_CHAIN_CH_PKT_TYPE_SYNCED_GLOBAL_DB", - [DAP_CHAIN_CH_PKT_TYPE_ERROR] = "DAP_CHAIN_CH_PKT_TYPE_ERROR" - -}; - DAP_STATIC_INLINE size_t dap_chain_ch_pkt_get_size(dap_chain_ch_pkt_t *a_pkt) { return sizeof(dap_chain_ch_pkt_hdr_t) + a_pkt->hdr.data_size; } dap_chain_ch_pkt_t *dap_chain_ch_pkt_new(uint64_t a_net_id, uint64_t a_chain_id, uint64_t a_cell_id, diff --git a/modules/channel/chain-net/dap_stream_ch_chain_net.c b/modules/channel/chain-net/dap_stream_ch_chain_net.c index 2ec116819d111324e8b84e310fdc29ce0aae1385..23d3aaac5a2808a68dec9aac455be73aba851857 100644 --- a/modules/channel/chain-net/dap_stream_ch_chain_net.c +++ b/modules/channel/chain-net/dap_stream_ch_chain_net.c @@ -59,7 +59,7 @@ static void s_stream_ch_new(dap_stream_ch_t* ch, void* arg); static void s_stream_ch_delete(dap_stream_ch_t* ch, void* arg); -static void s_stream_ch_packet_in(dap_stream_ch_t* ch, void* arg); +static bool s_stream_ch_packet_in(dap_stream_ch_t* ch, void* arg); /** * @brief dap_stream_ch_chain_net_init @@ -110,7 +110,7 @@ void s_stream_ch_delete(dap_stream_ch_t* a_ch, void* a_arg) * @param ch * @param arg */ -void s_stream_ch_packet_in(dap_stream_ch_t *a_ch, void* a_arg) +static bool s_stream_ch_packet_in(dap_stream_ch_t *a_ch, void* a_arg) { dap_stream_ch_chain_net_t * l_ch_chain_net = DAP_STREAM_CH_CHAIN_NET(a_ch); if(l_ch_chain_net) { @@ -119,18 +119,18 @@ void s_stream_ch_packet_in(dap_stream_ch_t *a_ch, void* a_arg) char *l_data_hash_str; dap_get_data_hash_str_static(l_ch_pkt->data, l_ch_pkt->hdr.data_size, l_data_hash_str); log_it(L_ATT, "Receive test data packet with hash %s", l_data_hash_str); - return; + return false; } if (l_ch_pkt->hdr.data_size < sizeof(dap_stream_ch_chain_net_pkt_t)) { log_it(L_WARNING, "Too small stream channel N packet size %u (header size %zu)", l_ch_pkt->hdr.data_size, sizeof(dap_stream_ch_chain_net_pkt_t)); - return; + return false; } dap_stream_ch_chain_net_pkt_t *l_ch_chain_net_pkt = (dap_stream_ch_chain_net_pkt_t *)l_ch_pkt->data; if (l_ch_chain_net_pkt->hdr.data_size + sizeof(dap_stream_ch_chain_net_pkt_t) > l_ch_pkt->hdr.data_size) { log_it(L_WARNING, "Too small stream channel N packet size %u (expected at least %zu)", l_ch_pkt->hdr.data_size, l_ch_chain_net_pkt->hdr.data_size + sizeof(dap_stream_ch_chain_net_pkt_t)); - return; + return false; } dap_chain_net_t *l_net = dap_chain_net_by_id(l_ch_chain_net_pkt->hdr.net_id); if (!l_net) { @@ -138,7 +138,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t *a_ch, void* a_arg) char l_err_str[] = "ERROR_NET_INVALID_ID"; dap_stream_ch_chain_net_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_ERROR , l_ch_chain_net_pkt->hdr.net_id, l_err_str, sizeof(l_err_str)); - return; + return false; } if (l_ch_pkt->hdr.type == DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_ERROR) { char *l_err_str = (char *)l_ch_chain_net_pkt->data; @@ -147,7 +147,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t *a_ch, void* a_arg) assert(!dap_stream_node_addr_is_blank(&a_ch->stream->node)); dap_link_manager_accounting_link_in_net(l_net->pub.id.uint64, &a_ch->stream->node, false); } - return; + return false; } uint16_t l_acl_idx = dap_chain_net_get_acl_idx(l_net); uint8_t l_acl = a_ch->stream->session->acl ? a_ch->stream->session->acl[l_acl_idx] : 1; @@ -157,13 +157,13 @@ void s_stream_ch_packet_in(dap_stream_ch_t *a_ch, void* a_arg) char l_err_str[] = "ERROR_NET_NOT_AUTHORIZED"; dap_stream_ch_chain_net_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_ERROR , l_ch_chain_net_pkt->hdr.net_id, l_err_str, sizeof(l_err_str)); - return; + return false; } if (dap_chain_net_get_state(l_net) == NET_STATE_OFFLINE) { char l_err_str[] = "ERROR_NET_IS_OFFLINE"; dap_stream_ch_chain_net_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_ERROR, l_ch_chain_net_pkt->hdr.net_id, l_err_str, sizeof(l_err_str)); - return; + return false; } switch (l_ch_pkt->hdr.type) { @@ -174,7 +174,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t *a_ch, void* a_arg) char l_err_str[] = "ERROR_STREAM_NOT_AUTHORIZED"; dap_stream_ch_chain_net_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_ERROR , l_ch_chain_net_pkt->hdr.net_id, l_err_str, sizeof(l_err_str)); - return; + break; } assert(!dap_stream_node_addr_is_blank(&a_ch->stream->node)); dap_link_manager_accounting_link_in_net(l_net->pub.id.uint64, &a_ch->stream->node, true); @@ -293,7 +293,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t *a_ch, void* a_arg) default: log_it(L_ERROR, "Unknown paket type %hhu", l_ch_pkt->hdr.type); - break; + return false; } if(l_ch_chain_net->notify_callback) @@ -301,4 +301,5 @@ void s_stream_ch_packet_in(dap_stream_ch_t *a_ch, void* a_arg) l_ch_chain_net_pkt->hdr.data_size, l_ch_chain_net->notify_callback_arg); } + return true; } diff --git a/modules/consensus/esbocs/dap_chain_cs_esbocs.c b/modules/consensus/esbocs/dap_chain_cs_esbocs.c index e449a19996cfb2e3cd1d0b760c5194de7de859cb..02cd42e366e87c8c60962a4b5cc1ad88b1a9f7d9 100644 --- a/modules/consensus/esbocs/dap_chain_cs_esbocs.c +++ b/modules/consensus/esbocs/dap_chain_cs_esbocs.c @@ -51,7 +51,7 @@ enum s_esbocs_session_state { static dap_list_t *s_validator_check(dap_chain_addr_t *a_addr, dap_list_t *a_validators); static void s_session_state_change(dap_chain_esbocs_session_t *a_session, enum s_esbocs_session_state a_new_state, dap_time_t a_time); -static void s_stream_ch_packet_in(dap_stream_ch_t *a_ch, void *a_arg); +static bool s_stream_ch_packet_in(dap_stream_ch_t *a_ch, void *a_arg); static void s_session_packet_in(dap_chain_esbocs_session_t *a_session, dap_chain_node_addr_t *a_sender_node_addr, uint8_t *a_data, size_t a_data_size); static void s_session_round_clear(dap_chain_esbocs_session_t *a_session); static void s_session_round_new(dap_chain_esbocs_session_t *a_session); @@ -356,7 +356,7 @@ static void s_new_atom_notifier(void *a_arg, dap_chain_t *a_chain, dap_chain_cel assert(l_session->chain == a_chain); pthread_mutex_lock(&l_session->mutex); dap_chain_hash_fast_t l_last_block_hash; - dap_chain_get_atom_last_hash(l_session->chain, &l_last_block_hash, a_id); + dap_chain_get_atom_last_hash(l_session->chain, a_id, &l_last_block_hash); if (!dap_hash_fast_compare(&l_last_block_hash, &l_session->cur_round.last_block_hash)) s_session_round_new(l_session); pthread_mutex_unlock(&l_session->mutex); @@ -828,7 +828,7 @@ static void s_session_send_startsync(dap_chain_esbocs_session_t *a_session) if (a_session->cur_round.sync_sent) return; // Sync message already was sent dap_chain_hash_fast_t l_last_block_hash; - dap_chain_get_atom_last_hash(a_session->chain, &l_last_block_hash, c_dap_chain_cell_id_null); + dap_chain_get_atom_last_hash(a_session->chain, c_dap_chain_cell_id_null, &l_last_block_hash); a_session->ts_round_sync_start = dap_time_now(); if (!dap_hash_fast_compare(&l_last_block_hash, &a_session->cur_round.last_block_hash)) return; // My last block hash has changed, skip sync message @@ -929,7 +929,7 @@ static void s_session_round_new(dap_chain_esbocs_session_t *a_session) a_session->ts_stage_entry = 0; dap_hash_fast_t l_last_block_hash; - dap_chain_get_atom_last_hash(a_session->chain, &l_last_block_hash, c_dap_chain_cell_id_null); + dap_chain_get_atom_last_hash(a_session->chain, c_dap_chain_cell_id_null, &l_last_block_hash); if (!dap_hash_fast_compare(&l_last_block_hash, &a_session->cur_round.last_block_hash) || (!dap_hash_fast_is_blank(&l_last_block_hash) && dap_hash_fast_is_blank(&a_session->cur_round.last_block_hash))) { @@ -1862,32 +1862,32 @@ static bool s_process_incoming_message(void *a_arg) return false; } -static void s_stream_ch_packet_in(dap_stream_ch_t *a_ch, void *a_arg) +static bool s_stream_ch_packet_in(dap_stream_ch_t *a_ch, void *a_arg) { dap_stream_ch_pkt_t *l_ch_pkt = (dap_stream_ch_pkt_t *)a_arg; if (!l_ch_pkt) - return; + return false; dap_chain_esbocs_message_t *l_message = (dap_chain_esbocs_message_t *)l_ch_pkt->data; size_t l_message_size = l_ch_pkt->hdr.data_size; if (l_message_size < sizeof(dap_chain_esbocs_message_t) || l_message_size > DAP_CHAIN_CS_BLOCKS_MAX_BLOCK_SIZE + PKT_SIGN_N_HDR_OVERHEAD || l_message_size != sizeof(*l_message) + l_message->hdr.sign_size + l_message->hdr.message_size) { log_it(L_WARNING, "Invalid message size %zu, drop this packet", l_message_size); - return; + return false; } dap_chain_net_t *l_net = dap_chain_net_by_id(l_message->hdr.net_id); if (!l_net) { log_it(L_WARNING, "Can't find net with ID 0x%" DAP_UINT64_FORMAT_x, l_message->hdr.net_id.uint64); - return; + return false; } if (dap_chain_net_get_state(l_net) == NET_STATE_OFFLINE) { log_it(L_MSG, "Reject packet because net %s is offline", l_net->pub.name); a_ch->stream->esocket->flags |= DAP_SOCK_SIGNAL_CLOSE; - return; + return false; } if (l_message->hdr.recv_addr.uint64 != g_node_addr.uint64) { log_it(L_WARNING, "Wrong packet destination address" NODE_ADDR_FP_STR, NODE_ADDR_FP_ARGS_S(l_message->hdr.recv_addr)); - return; + return false; } dap_chain_esbocs_session_t *l_session; DL_FOREACH(s_session_items, l_session) @@ -1896,14 +1896,14 @@ static void s_stream_ch_packet_in(dap_stream_ch_t *a_ch, void *a_arg) if (!l_session) { log_it(L_WARNING, "Session for net %s not found", l_net->pub.name); a_ch->stream->esocket->flags |= DAP_SOCK_SIGNAL_CLOSE; - return; + return false; } if (l_message->hdr.version != DAP_CHAIN_ESBOCS_PROTOCOL_VERSION) { debug_if(PVT(l_session->esbocs)->debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U " Message is rejected - different protocol version %hu (need %u)", l_session->chain->net_name, l_session->chain->name, l_session->cur_round.id, l_message->hdr.version, DAP_CHAIN_ESBOCS_PROTOCOL_VERSION); - return; + return false; } debug_if(PVT(l_session->esbocs)->debug, L_MSG, "net:%s, chain:%s, round:%"DAP_UINT64_FORMAT_U", attempt:%hhu." " Receive pkt type:0x%x from addr:"NODE_ADDR_FP_STR", my_addr:"NODE_ADDR_FP_STR"", @@ -1913,13 +1913,14 @@ static void s_stream_ch_packet_in(dap_stream_ch_t *a_ch, void *a_arg) struct esbocs_msg_args *l_args = DAP_NEW_SIZE(struct esbocs_msg_args, sizeof(struct esbocs_msg_args) + l_message_size); if (!l_args) { log_it(L_CRITICAL, g_error_memory_alloc); - return; + return false; } l_args->addr_from = a_ch->stream->node; l_args->session = l_session; l_args->message_size = l_message_size; memcpy(l_args->message, l_message, l_message_size); dap_proc_thread_callback_add(NULL, s_process_incoming_message, l_args); + return true; } /** diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index b362c7cf2e5f75981b3f8ca1fde89e7db680b252..5d256ba44edf9caebfaa6f39c7d8f349bcf9b3b9 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -165,6 +165,8 @@ struct chain_sync_context { dap_stream_node_addr_t current_link; dap_chain_t *cur_chain; dap_chain_cell_t *cur_cell; + dap_hash_fast_t requested_atom_hash; + uint64_t requested_atom_num; }; /** @@ -2482,6 +2484,8 @@ int s_net_load(dap_chain_net_t *a_net) return 0; } +static const uint64_t s_fork_sync_step = 20; // TODO get it from config + static void s_ch_in_pkt_callback(dap_stream_ch_t *a_ch, uint8_t a_type, const void *a_data, size_t a_data_size, void *a_arg) { debug_if(s_debug_more, L_DEBUG, "Got IN sync packet type %hhu size %zu from addr " NODE_ADDR_FP_STR, @@ -2491,10 +2495,76 @@ static void s_ch_in_pkt_callback(dap_stream_ch_t *a_ch, uint8_t a_type, const vo switch (a_type) { case DAP_CHAIN_CH_PKT_TYPE_ERROR: l_net_pvt->sync_context.state = SYNC_STATE_ERROR; - break; + return; + case DAP_CHAIN_CH_PKT_TYPE_SYNCED_CHAIN: l_net_pvt->sync_context.state = SYNC_STATE_SYNCED; - break; + return; + + case DAP_CHAIN_CH_PKT_TYPE_CHAIN_MISS: { + dap_chain_ch_miss_info_t *l_miss_info = (dap_chain_ch_miss_info_t *)(((dap_chain_ch_pkt_t *)(a_data))->data); + if (!dap_hash_fast_compare(&l_miss_info->missed_hash, &l_net_pvt->sync_context.requested_atom_hash)) { + char l_missed_hash_str[DAP_HASH_FAST_STR_SIZE]; + dap_hash_fast_to_str(&l_miss_info->missed_hash, l_missed_hash_str, DAP_HASH_FAST_STR_SIZE); + log_it(L_WARNING, "Get irrelevant chain sync MISSED packet with missed hash %s, but requested hash is %s", + l_missed_hash_str, + dap_hash_fast_to_str_static(&l_net_pvt->sync_context.requested_atom_hash)); + dap_stream_ch_write_error_unsafe(a_ch, l_net->pub.id.uint64, + l_net_pvt->sync_context.cur_chain->id.uint64, + l_net_pvt->sync_context.cur_cell + ? l_net_pvt->sync_context.cur_cell->id.uint64 + : 0, + DAP_CHAIN_CH_ERROR_INCORRECT_SYNC_SEQUENCE); + return; + } + dap_chain_atom_iter_t *l_iter = l_net_pvt->sync_context.cur_chain->callback_atom_iter_create( + l_net_pvt->sync_context.cur_chain, + l_net_pvt->sync_context.cur_cell + ? l_net_pvt->sync_context.cur_cell->id + : c_dap_chain_cell_id_null, + NULL); + if (!l_iter) { + log_it(L_CRITICAL, g_error_memory_alloc); + dap_stream_ch_write_error_unsafe(a_ch, l_net->pub.id.uint64, + l_net_pvt->sync_context.cur_chain->id.uint64, + l_net_pvt->sync_context.cur_cell + ? l_net_pvt->sync_context.cur_cell->id.uint64 + : 0, + DAP_CHAIN_CH_ERROR_OUT_OF_MEMORY); + return; + } + dap_chain_atom_ptr_t l_atom = l_net_pvt->sync_context.cur_chain->callback_atom_find_by_hash(l_iter, &l_miss_info->last_hash, NULL); + if (l_atom && l_iter->cur_num == l_miss_info->last_num) { // We already have this subchain in our chain + l_net_pvt->sync_context.state = SYNC_STATE_SYNCED; + return; + } + dap_chain_ch_sync_request_t l_request = {}; + l_request.num_from = l_net_pvt->sync_context.requested_atom_num > s_fork_sync_step + ? l_net_pvt->sync_context.requested_atom_num - s_fork_sync_step + : 0; + if (l_request.num_from) { + l_atom = l_net_pvt->sync_context.cur_chain->callback_atom_get_by_num(l_iter, l_request.num_from); + assert(l_atom); + l_request.hash_from = *l_iter->cur_hash; + } + l_net_pvt->sync_context.cur_chain->callback_atom_iter_delete(l_iter); + debug_if(s_debug_more, L_INFO, "Send sync request to node " NODE_ADDR_FP_STR + " for net %s and chain %s, hash from %s, num from %" DAP_UINT64_FORMAT_U, + NODE_ADDR_FP_ARGS_S(l_net_pvt->sync_context.current_link), + l_net->pub.name, l_net_pvt->sync_context.cur_chain->name, + dap_hash_fast_to_str_static(&l_request.hash_from), l_request.num_from); + dap_chain_ch_pkt_write_unsafe(a_ch, + DAP_CHAIN_CH_PKT_TYPE_CHAIN_REQ, + l_net->pub.id.uint64, + l_net_pvt->sync_context.cur_chain->id.uint64, + l_net_pvt->sync_context.cur_cell + ? l_net_pvt->sync_context.cur_cell->id.uint64 + : 0, + &l_request, + sizeof(l_request)); + l_net_pvt->sync_context.requested_atom_hash = l_request.hash_from; + l_net_pvt->sync_context.requested_atom_num = l_request.num_from; + } default: break; } @@ -2563,19 +2633,37 @@ static void s_sync_timer_callback(void *a_arg) // TODO make correct working with cells assert(l_net_pvt->sync_context.cur_chain); l_net_pvt->sync_context.cur_cell = l_net_pvt->sync_context.cur_chain->cells; - log_it(L_INFO, "Start synchronization process with " NODE_ADDR_FP_STR " for net %s and chain %s", - NODE_ADDR_FP_ARGS_S(l_net_pvt->sync_context.current_link), - l_net->pub.name, l_net_pvt->sync_context.cur_chain->name); l_net_pvt->sync_context.state = l_net_pvt->sync_context.last_state = SYNC_STATE_WAITING; - dap_hash_fast_t l_last_hash; - dap_chain_get_atom_last_hash(l_net_pvt->sync_context.cur_chain, &l_last_hash, l_net_pvt->sync_context.cur_cell - ? l_net_pvt->sync_context.cur_cell->id : c_dap_chain_cell_id_null); + dap_chain_ch_sync_request_t l_request = {}; + uint64_t l_last_num = 0; + if (!dap_chain_get_atom_last_hash_num(l_net_pvt->sync_context.cur_chain, + l_net_pvt->sync_context.cur_cell + ? l_net_pvt->sync_context.cur_cell->id + : c_dap_chain_cell_id_null, + &l_request.hash_from, + &l_last_num)) { + log_it(L_ERROR, "Can't get last atom hash and number for chain %s with net %s", l_net_pvt->sync_context.cur_chain->name, + l_net->pub.name); + return; + } + l_request.num_from = l_last_num; dap_chain_ch_pkt_t *l_chain_pkt = dap_chain_ch_pkt_new(l_net->pub.id.uint64, l_net_pvt->sync_context.cur_chain->id.uint64, l_net_pvt->sync_context.cur_cell ? l_net_pvt->sync_context.cur_cell->id.uint64 : 0, - &l_last_hash, sizeof(l_last_hash)); + &l_request, sizeof(l_request)); + if (!l_chain_pkt) { + log_it(L_CRITICAL, g_error_memory_alloc); + return; + } + log_it(L_INFO, "Start synchronization process with " NODE_ADDR_FP_STR + " for net %s and chain %s, last hash %s, last num %" DAP_UINT64_FORMAT_U, + NODE_ADDR_FP_ARGS_S(l_net_pvt->sync_context.current_link), + l_net->pub.name, l_net_pvt->sync_context.cur_chain->name, + dap_hash_fast_to_str_static(&l_request.hash_from), l_last_num); dap_stream_ch_pkt_send_by_addr(&l_net_pvt->sync_context.current_link, DAP_CHAIN_CH_ID, DAP_CHAIN_CH_PKT_TYPE_CHAIN_REQ, l_chain_pkt, dap_chain_ch_pkt_get_size(l_chain_pkt)); + l_net_pvt->sync_context.requested_atom_hash = l_request.hash_from; + l_net_pvt->sync_context.requested_atom_num = l_request.num_from; DAP_DELETE(l_chain_pkt); } if (l_net_pvt->sync_context.last_state != SYNC_STATE_IDLE && diff --git a/modules/net/dap_chain_node_cli_cmd.c b/modules/net/dap_chain_node_cli_cmd.c index 3c136b9ef2fe3c5f7d26a168edbb4fe289f618d2..c0faca96be5c64b81e846e1f4cba411dabdb30e5 100644 --- a/modules/net/dap_chain_node_cli_cmd.c +++ b/modules/net/dap_chain_node_cli_cmd.c @@ -1180,7 +1180,7 @@ int com_node(int a_argc, char ** a_argv, void **a_str_reply) } log_it(L_NOTICE, "Stream connection established"); - dap_chain_ch_sync_request_t l_sync_request = {}; + dap_chain_ch_sync_request_old_t l_sync_request = {}; dap_stream_ch_t *l_ch_chain = dap_client_get_stream_ch_unsafe(l_node_client->client, DAP_CHAIN_CH_ID); // fill begin id l_sync_request.id_start = 1; @@ -1223,7 +1223,7 @@ int com_node(int a_argc, char ** a_argv, void **a_str_reply) // reset state NODE_CLIENT_STATE_SYNCED dap_chain_node_client_reset(l_node_client); // send request - dap_chain_ch_sync_request_t l_sync_request = {}; + dap_chain_ch_sync_request_old_t l_sync_request = {}; if(0 == dap_chain_ch_pkt_write_unsafe(l_ch_chain, DAP_CHAIN_CH_PKT_TYPE_SYNC_CHAINS, l_net->pub.id.uint64, l_chain->id.uint64, l_remote_node_info->hdr.cell_id.uint64, &l_sync_request, sizeof(l_sync_request))) { diff --git a/modules/service/vpn/dap_chain_net_srv_vpn.c b/modules/service/vpn/dap_chain_net_srv_vpn.c index f207fb10adf0b4256ecc0f81e1e69f9f0557d1c6..22abf697ed0316fd2c8f728b2505822f1ee9b31f 100644 --- a/modules/service/vpn/dap_chain_net_srv_vpn.c +++ b/modules/service/vpn/dap_chain_net_srv_vpn.c @@ -200,7 +200,7 @@ static char* s_ch_vpn_get_my_pkey_str(dap_chain_net_srv_usage_t* a_usage); // Stream callbacks static void s_ch_vpn_new(dap_stream_ch_t* ch, void* arg); static void s_ch_vpn_delete(dap_stream_ch_t* ch, void* arg); -static void s_ch_packet_in(dap_stream_ch_t* ch, void* a_arg); +static bool s_ch_packet_in(dap_stream_ch_t* ch, void* a_arg); static bool s_ch_packet_out(dap_stream_ch_t* ch, void* arg); static void s_ch_vpn_esocket_assigned(dap_events_socket_t* a_es, dap_worker_t * l_worker); @@ -1687,7 +1687,7 @@ static void s_ch_packet_in_vpn_address_request(dap_stream_ch_t* a_ch, dap_chain_ * @param ch * @param arg */ -void s_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) +static bool s_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) { dap_stream_ch_pkt_t * l_pkt = (dap_stream_ch_pkt_t *) a_arg; dap_chain_net_srv_stream_session_t * l_srv_session = DAP_CHAIN_NET_SRV_STREAM_SESSION (a_ch->stream->session ); @@ -1698,7 +1698,7 @@ void s_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) log_it(L_NOTICE, "No active usage in list, possible disconnected. Send nothing on this channel"); dap_stream_ch_set_ready_to_write_unsafe(a_ch,false); dap_stream_ch_set_ready_to_read_unsafe(a_ch,false); - return; + return false; } if ( ! l_usage->is_active ){ @@ -1707,7 +1707,7 @@ void s_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) dap_stream_ch_pkt_write_unsafe( l_usage->client->ch , DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_NOTIFY_STOPPED , NULL, 0 ); dap_stream_ch_set_ready_to_write_unsafe(a_ch,false); dap_stream_ch_set_ready_to_read_unsafe(a_ch,false); - return; + return false; } // check role if (dap_chain_net_get_role(l_usage->net).enums > NODE_ROLE_MASTER) { @@ -1720,7 +1720,7 @@ void s_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) dap_stream_ch_pkt_write_unsafe( l_usage->client->ch , DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_NOTIFY_STOPPED , NULL, 0 ); dap_stream_ch_set_ready_to_write_unsafe(a_ch,false); dap_stream_ch_set_ready_to_read_unsafe(a_ch,false); - return; + return false; } // TODO move address leasing to this structure @@ -1808,6 +1808,7 @@ void s_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) log_it(L_WARNING, "Can't process SF type 0x%02x", l_vpn_pkt->header.op_code); } } + return true; } /** diff --git a/modules/type/blocks/dap_chain_cs_blocks.c b/modules/type/blocks/dap_chain_cs_blocks.c index 82b8d5aa2c9b7b22ea74d5cf8734093524d62372..98ed880f12811494fe83edeaded80a2daa8bfdf0 100644 --- a/modules/type/blocks/dap_chain_cs_blocks.c +++ b/modules/type/blocks/dap_chain_cs_blocks.c @@ -102,6 +102,7 @@ static size_t s_callback_atom_get_static_hdr_size(void); static dap_chain_atom_iter_t *s_callback_atom_iter_create(dap_chain_t *a_chain, dap_chain_cell_id_t a_cell_id, dap_hash_fast_t *a_hash_from); static dap_chain_atom_ptr_t s_callback_atom_iter_find_by_hash(dap_chain_atom_iter_t * a_atom_iter , dap_chain_hash_fast_t * a_atom_hash, size_t * a_atom_size); +static dap_chain_atom_ptr_t s_callback_atom_iter_get_by_num(dap_chain_atom_iter_t *a_atom_iter, uint64_t a_atom_num); static dap_chain_datum_t *s_callback_datum_find_by_hash(dap_chain_t *a_chain, dap_chain_hash_fast_t *a_datum_hash, dap_chain_hash_fast_t *a_block_hash, int *a_ret_code); @@ -253,6 +254,7 @@ static int s_chain_cs_blocks_new(dap_chain_t * a_chain, dap_config_t * a_chain_c a_chain->callback_atom_get_timestamp = s_chain_callback_atom_get_timestamp; a_chain->callback_atom_find_by_hash = s_callback_atom_iter_find_by_hash; + a_chain->callback_atom_get_by_num = s_callback_atom_iter_get_by_num; a_chain->callback_datum_find_by_hash = s_callback_datum_find_by_hash; a_chain->callback_block_find_by_tx_hash = s_callback_block_find_by_tx_hash; @@ -1468,6 +1470,28 @@ static dap_chain_atom_ptr_t s_callback_atom_iter_find_by_hash(dap_chain_atom_ite return a_atom_iter->cur; } +static dap_chain_atom_ptr_t s_callback_atom_iter_get_by_num(dap_chain_atom_iter_t *a_atom_iter, uint64_t a_atom_num) +{ + assert(a_atom_iter); + dap_chain_cs_blocks_t *l_blocks = DAP_CHAIN_CS_BLOCKS(a_atom_iter->chain); + dap_chain_block_cache_t *l_block_cache = NULL; + pthread_rwlock_rdlock(&PVT(l_blocks)->rwlock); + for (l_block_cache = PVT(l_blocks)->blocks; l_block_cache; l_block_cache = l_block_cache->hh.next) + if (l_block_cache->block_number == a_atom_num) + break; + a_atom_iter->cur_item = l_block_cache; + if (l_block_cache) { + a_atom_iter->cur = l_block_cache->block; + a_atom_iter->cur_size = l_block_cache->block_size; + a_atom_iter->cur_hash = &l_block_cache->block_hash; + a_atom_iter->cur_num = l_block_cache->block_number; + } else + *a_atom_iter = (dap_chain_atom_iter_t) { .chain = a_atom_iter->chain, + .cell_id = a_atom_iter->cell_id }; + pthread_rwlock_unlock(&PVT(l_blocks)->rwlock); + return a_atom_iter->cur; +} + /** * @brief s_callback_atom_iter_find_by_tx_hash * @param a_chain diff --git a/modules/type/dag/dap_chain_cs_dag.c b/modules/type/dag/dap_chain_cs_dag.c index 8470fe80416d660e0b2118a6e117fe98ace92cc4..5a9cc40d795ac213e7be2a8a5c9c719349e74d5e 100644 --- a/modules/type/dag/dap_chain_cs_dag.c +++ b/modules/type/dag/dap_chain_cs_dag.c @@ -104,6 +104,7 @@ static dap_chain_atom_iter_t* s_chain_callback_atom_iter_create(dap_chain_t * a_ static dap_chain_atom_ptr_t s_chain_callback_atom_iter_find_by_hash(dap_chain_atom_iter_t * a_atom_iter , dap_chain_hash_fast_t * a_atom_hash, size_t * a_atom_size); +static dap_chain_atom_ptr_t s_chain_callback_atom_iter_get_by_num(dap_chain_atom_iter_t *a_atom_iter, uint64_t a_atom_num); static dap_chain_datum_t *s_chain_callback_atom_find_by_datum_hash(dap_chain_t *a_chain, dap_chain_hash_fast_t *a_datum_hash, dap_chain_hash_fast_t *a_event_hash, int *a_ret_code); static dap_chain_datum_t** s_chain_callback_atom_get_datum(dap_chain_atom_ptr_t a_event, size_t a_atom_size, size_t *a_datums_count); @@ -218,6 +219,7 @@ static int s_chain_cs_dag_new(dap_chain_t * a_chain, dap_config_t * a_chain_cfg) a_chain->callback_atom_iter_delete = s_chain_callback_atom_iter_delete; a_chain->callback_atom_iter_get = s_chain_callback_atom_iter_get; // Linear pass through a_chain->callback_atom_find_by_hash = s_chain_callback_atom_iter_find_by_hash; // Get element by hash + a_chain->callback_atom_get_by_num = s_chain_callback_atom_iter_get_by_num; a_chain->callback_atom_iter_get_links = s_chain_callback_atom_iter_get_links; a_chain->callback_atom_get_datums = s_chain_callback_atom_get_datum; @@ -515,16 +517,15 @@ static dap_chain_atom_verify_res_t s_chain_callback_atom_add(dap_chain_t * a_cha debug_if(s_debug_more, L_WARNING, "... added with ledger code %d", l_consensus_check); break; } - dap_chain_cs_dag_event_item_t *l_tail = PVT(l_dag)->events ? PVT(l_dag)->events->hh.tbl->tail->prev : NULL; - if (!l_tail) - l_tail = PVT(l_dag)->events; - else - l_tail = l_tail->hh.next; - if (l_tail && l_tail->event->header.ts_created > l_event->header.ts_created) + dap_chain_cs_dag_event_item_t *l_tail; + HASH_LAST(PVT(l_dag)->events, l_tail); + if (l_tail && l_tail->event->header.ts_created > l_event->header.ts_created) { DAP_CHAIN_PVT(a_chain)->need_reorder = true; - if (DAP_CHAIN_PVT(a_chain)->need_reorder) HASH_ADD_INORDER(hh, PVT(l_dag)->events, hash, sizeof(l_event_item->hash), l_event_item, s_sort_event_item); - else + dap_chain_cs_dag_event_item_t *it = PVT(l_dag)->events; + for (uint64_t i = 0; it; it = it->hh.next) // renumber chain events + it->event_number = ++i; + } else HASH_ADD(hh, PVT(l_dag)->events, hash, sizeof(l_event_item->hash), l_event_item); s_dag_events_lasts_process_new_last_event(l_dag, l_event_item); } break; @@ -1143,17 +1144,40 @@ static dap_chain_atom_ptr_t s_chain_callback_atom_iter_find_by_hash(dap_chain_at dap_chain_cs_dag_event_item_t *l_event_item = NULL; pthread_mutex_lock(&PVT(l_dag)->events_mutex); HASH_FIND(hh, PVT(l_dag)->events, a_atom_hash, sizeof(*a_atom_hash), l_event_item); - pthread_mutex_unlock(&PVT(l_dag)->events_mutex); - if (!l_event_item) - return NULL; - a_atom_iter->cur_item = l_event_item; - a_atom_iter->cur = l_event_item->event; - a_atom_iter->cur_size= l_event_item->event_size; - a_atom_iter->cur_hash = &l_event_item->hash; - a_atom_iter->cur_num = l_event_item->event_number; + if (l_event_item) { + a_atom_iter->cur_item = l_event_item; + a_atom_iter->cur = l_event_item->event; + a_atom_iter->cur_size = l_event_item->event_size; + a_atom_iter->cur_hash = &l_event_item->hash; + a_atom_iter->cur_num = l_event_item->event_number; + } else + *a_atom_iter = (dap_chain_atom_iter_t) { .chain = a_atom_iter->chain, + .cell_id = a_atom_iter->cell_id }; if (a_atom_size) - *a_atom_size = l_event_item->event_size; - return l_event_item->event; + *a_atom_size = a_atom_iter->cur_size; + pthread_mutex_unlock(&PVT(l_dag)->events_mutex); + return a_atom_iter->cur; +} + +static dap_chain_atom_ptr_t s_chain_callback_atom_iter_get_by_num(dap_chain_atom_iter_t *a_atom_iter, uint64_t a_atom_num) +{ + dap_chain_cs_dag_t *l_dag = DAP_CHAIN_CS_DAG(a_atom_iter->chain); + dap_chain_cs_dag_event_item_t *l_event_item = NULL; + pthread_mutex_lock(&PVT(l_dag)->events_mutex); + for (l_event_item = PVT(l_dag)->events; l_event_item; l_event_item = l_event_item->hh.next) + if (l_event_item->event_number == a_atom_num) + break; + if (l_event_item) { + a_atom_iter->cur_item = l_event_item; + a_atom_iter->cur = l_event_item->event; + a_atom_iter->cur_size = l_event_item->event_size; + a_atom_iter->cur_hash = &l_event_item->hash; + a_atom_iter->cur_num = l_event_item->event_number; + } else + *a_atom_iter = (dap_chain_atom_iter_t) { .chain = a_atom_iter->chain, + .cell_id = a_atom_iter->cell_id }; + pthread_mutex_unlock(&PVT(l_dag)->events_mutex); + return a_atom_iter->cur; } /**