-
Roman Khlopkov authored80a29cfa
dap_chain_ch.c 89.31 KiB
/*
* Authors:
* Dmitriy A. Gearasimov <gerasimov.dmitriy@demlabs.net>
* Alexander Lysikov <alexander.lysikov@demlabs.net>
* DeM Labs Inc. https://demlabs.net
* Kelvin Project https://github.com/kelvinblockchain
* Copyright (c) 2017-2018
* All rights reserved.
This file is part of DAP (Demlabs Application Protocol) the open source project
DAP (Demlabs Application Protocol) is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
DAP is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with any DAP based project. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdlib.h>
#include <stdio.h>
#include <stdlib.h>
#include <stddef.h>
#include <stdint.h>
#ifdef WIN32
#include <winsock2.h>
#include <windows.h>
#include <mswsock.h>
#include <ws2tcpip.h>
#include <io.h>
#include <pthread.h>
#endif
#include "dap_common.h"
#include "dap_strfuncs.h"
#include "dap_list.h"
#include "dap_config.h"
#include "dap_hash.h"
#include "dap_time.h"
#include "utlist.h"
#include "dap_worker.h"
#include "dap_events.h"
#include "dap_proc_thread.h"
#include "dap_client_pvt.h"
#include "dap_chain.h"
#include "dap_chain_datum.h"
#include "dap_chain_cs.h"
#include "dap_chain_cell.h"
#include "dap_global_db_legacy.h"
#include "dap_global_db_pkt.h"
#include "dap_global_db_ch.h"
#include "dap_stream.h"
#include "dap_stream_pkt.h"
#include "dap_stream_worker.h"
#include "dap_stream_ch_pkt.h"
#include "dap_stream_ch.h"
#include "dap_stream_ch_proc.h"
#include "dap_chain_ch.h"
#include "dap_chain_ch_pkt.h"
#include "dap_stream_ch_gossip.h"
#define LOG_TAG "dap_chain_ch"
#define DAP_CHAIN_PKT_EXPECT_SIZE DAP_STREAM_PKT_FRAGMENT_SIZE
enum sync_context_state {
SYNC_STATE_IDLE,
SYNC_STATE_READY,
SYNC_STATE_BUSY,
SYNC_STATE_OVER
};
struct sync_context {
atomic_uint_fast64_t allowed_num;
atomic_uint_fast16_t state;
dap_chain_atom_iter_t *iter;
dap_stream_node_addr_t addr;
dap_chain_net_id_t net_id;
dap_chain_id_t chain_id;
dap_chain_cell_id_t cell_id;
uint64_t num_last;
dap_time_t last_activity;
};
typedef struct dap_chain_ch_hash_item {
dap_hash_fast_t hash;
uint32_t size;
UT_hash_handle hh;
} dap_chain_ch_hash_item_t;
struct legacy_sync_context {
dap_stream_worker_t *worker;
dap_stream_ch_uuid_t ch_uuid;
dap_stream_node_addr_t remote_addr;
dap_chain_ch_pkt_hdr_t request_hdr;
_Atomic(dap_chain_ch_state_t) state;
dap_chain_ch_error_type_t last_error;
bool is_type_of_gdb;
union {
struct {
dap_chain_ch_hash_item_t *remote_atoms; // Remote atoms
dap_chain_atom_iter_t *atom_iter; // Atom iterator
uint64_t stats_request_atoms_processed; // Atoms statictic
};
struct {
dap_chain_ch_hash_item_t *remote_gdbs; // Remote gdbs
dap_global_db_legacy_list_t *db_list; // DB iterator
uint64_t stats_request_gdbs_processed; // DB statictic
};
};
dap_time_t last_activity;
dap_chain_ch_state_t prev_state;
size_t enqueued_data_size;
};
typedef struct dap_chain_ch {
void *_inheritor;
dap_timerfd_t *sync_timer;
struct sync_context *sync_context;
// Legacy section //
dap_timerfd_t *activity_timer;
uint32_t timer_shots;
int sent_breaks;
struct legacy_sync_context *legacy_sync_context;
} dap_chain_ch_t;
#define DAP_CHAIN_CH(a) ((dap_chain_ch_t *) ((a)->internal) )
#define DAP_STREAM_CH(a) ((dap_stream_ch_t *)((a)->_inheritor))
static void s_ch_chain_go_idle(dap_chain_ch_t *a_ch_chain);
static void s_stream_ch_new(dap_stream_ch_t *a_ch, void *a_arg);
static void s_stream_ch_delete(dap_stream_ch_t *a_ch, void *a_arg);
static bool s_stream_ch_packet_in(dap_stream_ch_t * a_ch, void *a_arg);
static void s_stream_ch_io_complete(dap_events_socket_t *a_es, void *a_arg);
static bool s_sync_in_chains_callback(void *a_arg);
static bool s_sync_out_chains_proc_callback(void *a_arg);
static bool s_gdb_in_pkt_proc_callback(void *a_arg);
static bool s_sync_out_gdb_proc_callback(void *a_arg);
static void s_stream_ch_chain_pkt_write(dap_stream_ch_t *a_ch, uint8_t a_type, uint64_t a_net_id,
uint64_t a_chain_id, uint64_t a_cell_id,
const void * a_data, size_t a_data_size);
static void s_gossip_payload_callback(void *a_payload, size_t a_payload_size, dap_stream_node_addr_t a_sender_addr);
static bool s_chain_iter_callback(void *a_arg);
static bool s_chain_iter_delete_callback(void *a_arg);
static bool s_sync_timer_callback(void *a_arg);
static bool s_debug_more = false, s_debug_legacy = false;
static uint32_t s_sync_timeout = 30;
static uint32_t s_sync_packets_per_thread_call = 10;
static uint32_t s_sync_ack_window_size = 100; // atoms
// Legacy
static uint_fast16_t s_update_pack_size = 100; // Number of hashes packed into the one packet
#ifdef DAP_SYS_DEBUG
enum {MEMSTAT$K_STM_CH_CHAIN, MEMSTAT$K_NR};
static dap_memstat_rec_t s_memstat [MEMSTAT$K_NR] = {
{.fac_len = sizeof(LOG_TAG) - 1, .fac_name = {LOG_TAG}, .alloc_sz = sizeof(dap_chain_ch_t)},
};
#endif
static const char *s_error_type_to_string(dap_chain_ch_error_type_t a_error)
{
switch (a_error) {
case DAP_CHAIN_CH_ERROR_SYNC_REQUEST_ALREADY_IN_PROCESS:
return "SYNC_REQUEST_ALREADY_IN_PROCESS";
case DAP_CHAIN_CH_ERROR_INCORRECT_SYNC_SEQUENCE:
return "INCORRECT_SYNC_SEQUENCE";
case DAP_CHAIN_CH_ERROR_SYNC_TIMEOUT:
return "SYNCHRONIZATION TIMEOUT";
case DAP_CHAIN_CH_ERROR_CHAIN_PKT_DATA_SIZE:
return "INVALID_PACKET_SIZE";
case DAP_CHAIN_CH_ERROR_LEGACY_PKT_DATA_SIZE:
return "INVALID_LEGACY_PACKET_SIZE";
case DAP_CHAIN_CH_ERROR_NET_INVALID_ID:
return "INVALID_NET_ID";
case DAP_CHAIN_CH_ERROR_CHAIN_NOT_FOUND:
return "CHAIN_NOT_FOUND";
case DAP_CHAIN_CH_ERROR_ATOM_NOT_FOUND:
return "ATOM_NOT_FOUND";
case DAP_CHAIN_CH_ERROR_UNKNOWN_CHAIN_PKT_TYPE:
return "UNKNOWN_CHAIN_PACKET_TYPE";
case DAP_CHAIN_CH_ERROR_GLOBAL_DB_INTERNAL_NOT_SAVED:
return "GLOBAL_DB_INTERNAL_SAVING_ERROR";
case DAP_CHAIN_CH_ERROR_NET_IS_OFFLINE:
return "NET_IS_OFFLINE";
case DAP_CHAIN_CH_ERROR_OUT_OF_MEMORY:
return "OUT_OF_MEMORY";
case DAP_CHAIN_CH_ERROR_INTERNAL:
return "INTERNAL_ERROR";
default:
return "UNKNOWN_ERROR";
}
}
/**
* @brief dap_chain_ch_init
* @return
*/
int dap_chain_ch_init()
{
log_it(L_NOTICE, "Chains exchange channel initialized");
dap_stream_ch_proc_add(DAP_CHAIN_CH_ID, s_stream_ch_new, s_stream_ch_delete, s_stream_ch_packet_in, NULL);
s_sync_timeout = dap_config_get_item_uint32_default(g_config, "chain", "sync_timeout", s_sync_timeout);
s_sync_ack_window_size = dap_config_get_item_uint32_default(g_config, "chain", "sync_ack_window_size", s_sync_ack_window_size);
s_sync_packets_per_thread_call = dap_config_get_item_int16_default(g_config, "chain", "pack_size", s_sync_packets_per_thread_call);
s_debug_more = dap_config_get_item_bool_default(g_config, "chain", "debug_more", false);
s_debug_legacy = dap_config_get_item_bool_default(g_config, "chain", "debug_legacy", false);
#ifdef DAP_SYS_DEBUG
for (int i = 0; i < MEMSTAT$K_NR; i++)
dap_memstat_reg(&s_memstat[i]);
#endif
return dap_stream_ch_gossip_callback_add(DAP_CHAIN_CH_ID, s_gossip_payload_callback);
}
/**
* @brief dap_chain_ch_deinit
*/
void dap_chain_ch_deinit()
{
}
/**
* @brief s_stream_ch_new
* @param a_ch
* @param arg
*/
void s_stream_ch_new(dap_stream_ch_t* a_ch, void* a_arg)
{
UNUSED(a_arg);
if (!(a_ch->internal = DAP_NEW_Z(dap_chain_ch_t))) {
log_it(L_CRITICAL, "%s", g_error_memory_alloc);
return;
};
dap_chain_ch_t *l_ch_chain = DAP_CHAIN_CH(a_ch);
l_ch_chain->_inheritor = a_ch;
#ifdef DAP_SYS_DEBUG
atomic_fetch_add(&s_memstat[MEMSTAT$K_STM_CH_CHAIN].alloc_nr, 1);
#endif
debug_if(s_debug_more, L_DEBUG, "[stm_ch_chain:%p] --- created chain:%p", a_ch, l_ch_chain);
}
/**
* @brief s_stream_ch_delete
* @param ch
* @param arg
*/
static void s_stream_ch_delete(dap_stream_ch_t *a_ch, void *a_arg)
{
UNUSED(a_arg);
dap_chain_ch_t *l_ch_chain = DAP_CHAIN_CH(a_ch);
s_ch_chain_go_idle(l_ch_chain);
debug_if(s_debug_more, L_DEBUG, "[stm_ch_chain:%p] --- deleted chain:%p", a_ch, l_ch_chain);
DAP_DEL_Z(a_ch->internal);
#ifdef DAP_SYS_DEBUG
atomic_fetch_add(&s_memstat[MEMSTAT$K_STM_CH_CHAIN].free_nr, 1);
#endif
}
// *** Legacy support code *** //
/**
* @brief dap_chain_ch_create_sync_request_gdb
* @param a_ch_chain
* @param a_net
*/
struct legacy_sync_context *s_legacy_sync_context_create(dap_chain_ch_pkt_t *a_chain_pkt, dap_stream_ch_t* a_ch)
{
dap_chain_ch_t * l_ch_chain = DAP_CHAIN_CH(a_ch);
dap_return_val_if_fail(l_ch_chain, NULL);
struct legacy_sync_context *l_context;
DAP_NEW_Z_RET_VAL(l_context, struct legacy_sync_context, NULL, NULL);
*l_context = (struct legacy_sync_context) {
.worker = a_ch->stream_worker,
.ch_uuid = a_ch->uuid,
.remote_addr = *(dap_stream_node_addr_t *)a_chain_pkt->data,
.request_hdr = a_chain_pkt->hdr,
.state = DAP_CHAIN_CH_STATE_IDLE,
.last_activity = dap_time_now()
};
dap_stream_ch_uuid_t *l_uuid = DAP_DUP(&a_ch->uuid);
if (!l_uuid) {
log_it(L_CRITICAL, g_error_memory_alloc);
DAP_DELETE(l_context);
return NULL;
}
l_ch_chain->sync_timer = dap_timerfd_start_on_worker(a_ch->stream_worker->worker, 1000, s_sync_timer_callback, l_uuid);
a_ch->stream->esocket->callbacks.write_finished_callback = s_stream_ch_io_complete;
a_ch->stream->esocket->callbacks.arg = l_context;
return l_context;
}
/**
* @brief s_stream_ch_chain_delete
* @param a_ch_chain
*/
static void s_legacy_sync_context_delete(void *a_arg)
{
struct legacy_sync_context *l_context = a_arg;
dap_return_if_fail(l_context);
dap_chain_ch_hash_item_t *l_hash_item, *l_tmp;
if (l_context->is_type_of_gdb) {
HASH_ITER(hh, l_context->remote_gdbs, l_hash_item, l_tmp) {
// Clang bug at this, l_hash_item should change at every loop cycle
HASH_DEL(l_context->remote_gdbs, l_hash_item);
DAP_DELETE(l_hash_item);
}
l_context->remote_atoms = NULL;
if (l_context->db_list)
dap_global_db_legacy_list_delete(l_context->db_list);
} else {
HASH_ITER(hh, l_context->remote_atoms, l_hash_item, l_tmp) {
// Clang bug at this, l_hash_item should change at every loop cycle
HASH_DEL(l_context->remote_atoms, l_hash_item);
DAP_DELETE(l_hash_item);
}
l_context->remote_gdbs = NULL;
if (l_context->atom_iter)
l_context->atom_iter->chain->callback_atom_iter_delete(l_context->atom_iter);
}
dap_stream_ch_t *l_ch = dap_stream_ch_find_by_uuid_unsafe(l_context->worker, l_context->ch_uuid);
if (l_ch) {
DAP_CHAIN_CH(l_ch)->legacy_sync_context = NULL;
l_ch->stream->esocket->callbacks.write_finished_callback = NULL;
l_ch->stream->esocket->callbacks.arg = NULL;
}
DAP_DELETE(l_context);
}
static bool s_sync_out_gdb_proc_callback(void *a_arg)
{
struct legacy_sync_context *l_context = a_arg;
dap_chain_ch_state_t l_cur_state = l_context->state;
if (l_cur_state == DAP_CHAIN_CH_STATE_WAITING)
return false;
if (l_cur_state != DAP_CHAIN_CH_STATE_UPDATE_GLOBAL_DB && l_cur_state != DAP_CHAIN_CH_STATE_SYNC_GLOBAL_DB)
// Illegal context
goto context_delete;
dap_list_t *l_list_out = dap_global_db_legacy_list_get_multiple(l_context->db_list, s_update_pack_size);
if (!l_list_out) {
dap_chain_ch_sync_request_old_t l_payload = { .node_addr = g_node_addr };
uint8_t l_type = l_cur_state == DAP_CHAIN_CH_STATE_UPDATE_GLOBAL_DB ? DAP_CHAIN_CH_PKT_TYPE_UPDATE_GLOBAL_DB_END
: DAP_CHAIN_CH_PKT_TYPE_SYNCED_GLOBAL_DB;
debug_if(s_debug_legacy, L_INFO, "Out: %s", dap_chain_ch_pkt_type_to_str(l_type));
dap_chain_ch_pkt_write_mt(l_context->worker, l_context->ch_uuid, l_type,
l_context->request_hdr.net_id, l_context->request_hdr.chain_id, l_context->request_hdr.cell_id,
&l_payload, sizeof(l_payload));
if (l_cur_state == DAP_CHAIN_CH_STATE_SYNC_GLOBAL_DB) {
if (atomic_compare_exchange_strong(&l_context->state, &l_cur_state, DAP_CHAIN_CH_STATE_IDLE) ||
l_cur_state == DAP_CHAIN_CH_STATE_ERROR)
goto context_delete;
return false;
}
log_it(L_INFO, "Synchronized database: items synchronized %" DAP_UINT64_FORMAT_U " from %zu",
l_context->stats_request_gdbs_processed, l_context->db_list->items_number);
dap_global_db_legacy_list_rewind(l_context->db_list);
if (atomic_compare_exchange_strong(&l_context->state, &l_cur_state, DAP_CHAIN_CH_STATE_SYNC_GLOBAL_DB_REMOTE) ||
l_cur_state == DAP_CHAIN_CH_STATE_WAITING)
return false;
goto context_delete;
}
void *l_data = NULL;
size_t l_data_size = 0;
uint8_t l_type = DAP_CHAIN_CH_PKT_TYPE_GLOBAL_DB;
if (l_cur_state == DAP_CHAIN_CH_STATE_UPDATE_GLOBAL_DB) {
l_type = DAP_CHAIN_CH_PKT_TYPE_UPDATE_GLOBAL_DB;
l_data_size = dap_list_length(l_list_out) * sizeof(dap_chain_ch_update_element_t);
l_data = DAP_NEW_Z_SIZE(dap_chain_ch_update_element_t, l_data_size);
if (!l_data) {
log_it(L_CRITICAL, g_error_memory_alloc);
l_context->state = DAP_CHAIN_CH_STATE_ERROR;
goto context_delete;
}
}
bool l_go_wait = false;
size_t i = 0;
for (dap_list_t *it = l_list_out; it; it = it->next, i++) {
dap_global_db_pkt_old_t *l_pkt = it->data;
if (l_context->db_list->items_rest)
--l_context->db_list->items_rest;
dap_hash_t l_pkt_hash;
dap_hash_fast(l_pkt->data, l_pkt->data_size, &l_pkt_hash);
if (l_cur_state == DAP_CHAIN_CH_STATE_UPDATE_GLOBAL_DB) {
dap_chain_ch_update_element_t *l_hashes = l_data;
l_hashes[i].hash = l_pkt_hash;
l_hashes[i].size = l_pkt->data_size;
} else { // l_cur_state == DAP_CHAIN_CH_STATE_SYNC_GLOBAL_DB
dap_chain_ch_hash_item_t *l_hash_item = NULL;
HASH_FIND(hh, l_context->remote_gdbs, &l_pkt_hash, sizeof(dap_hash_fast_t), l_hash_item);
if (!l_hash_item) {
dap_global_db_pkt_old_t *l_pkt_pack = l_data;
size_t l_cur_size = l_pkt_pack ? l_pkt_pack->data_size : 0;
if (l_cur_size + sizeof(dap_global_db_pkt_old_t) + l_pkt->data_size >= DAP_CHAIN_PKT_EXPECT_SIZE) {
l_context->enqueued_data_size += l_data_size;
if (!l_go_wait && l_context->enqueued_data_size > DAP_EVENTS_SOCKET_BUF_SIZE / 2) {
atomic_compare_exchange_strong(&l_context->state, &l_cur_state, DAP_CHAIN_CH_STATE_WAITING);
l_context->prev_state = l_cur_state;
l_go_wait = true;
}
dap_chain_ch_pkt_write_mt(l_context->worker, l_context->ch_uuid, l_type,
l_context->request_hdr.net_id, l_context->request_hdr.chain_id, l_context->request_hdr.cell_id,
l_data, l_data_size);
debug_if(s_debug_legacy, L_INFO, "Send one global_db packet len=%zu (rest=%zu/%zu items)", l_data_size,
l_context->db_list->items_rest, l_context->db_list->items_number);
l_context->last_activity = dap_time_now();
DAP_DEL_Z(l_pkt_pack);
l_cur_size = 0;
}
l_pkt_pack = dap_global_db_pkt_pack_old(l_pkt_pack, l_pkt);
if (!l_pkt_pack || l_cur_size == l_pkt_pack->data_size) {
l_context->state = DAP_CHAIN_CH_STATE_ERROR;
goto context_delete;
}
l_context->stats_request_gdbs_processed++;
l_data = l_pkt_pack;
l_data_size = sizeof(dap_global_db_pkt_old_t) + l_pkt_pack->data_size;
} /* else // Over-extended debug
debug_if(s_debug_legacy, L_DEBUG, "Skip GDB hash %s because its already present in remote GDB hash table",
dap_hash_fast_to_str_static(&l_pkt_hash));
*/
}
}
dap_list_free_full(l_list_out, NULL);
if (l_data && l_data_size) {
dap_chain_ch_pkt_write_mt(l_context->worker, l_context->ch_uuid, l_type,
l_context->request_hdr.net_id, l_context->request_hdr.chain_id, l_context->request_hdr.cell_id,
l_data, l_data_size);
if (l_cur_state == DAP_CHAIN_CH_STATE_UPDATE_GLOBAL_DB)
debug_if(s_debug_legacy, L_INFO, "Out: %s, %zu records", dap_chain_ch_pkt_type_to_str(l_type), i);
else
debug_if(s_debug_legacy, L_INFO, "Send one global_db packet len=%zu (rest=%zu/%zu items)", l_data_size,
l_context->db_list->items_rest, l_context->db_list->items_number);
l_context->last_activity = dap_time_now();
DAP_DELETE(l_data);
} else if (l_context->last_activity + 3 < dap_time_now()) {
l_context->last_activity = dap_time_now();
debug_if(s_debug_more, L_INFO, "Send one GlobalDB no freeze packet");
dap_chain_ch_pkt_write_mt(l_context->worker, l_context->ch_uuid, DAP_CHAIN_CH_PKT_TYPE_GLOBAL_DB_NO_FREEZE,
l_context->request_hdr.net_id, l_context->request_hdr.chain_id,
l_context->request_hdr.cell_id, NULL, 0);
}
return true;
context_delete:
dap_worker_exec_callback_on(l_context->worker->worker, s_legacy_sync_context_delete, l_context);
return false;
}
struct record_processing_args {
dap_stream_worker_t *worker;
dap_stream_ch_uuid_t uuid;
dap_chain_ch_pkt_hdr_t hdr;
dap_global_db_pkt_old_t *pkt;
};
static bool s_gdb_in_pkt_proc_callback(void *a_arg)
{
struct record_processing_args *l_args = a_arg;
size_t l_objs_count = 0;
dap_store_obj_t *l_objs = dap_global_db_pkt_deserialize_old(l_args->pkt, &l_objs_count);
DAP_DELETE(l_args->pkt);
if (!l_objs || !l_objs_count) {
log_it(L_WARNING, "Deserialization of legacy global DB packet failed");
DAP_DELETE(l_args);
return false;
}
bool l_success = false;
dap_stream_node_addr_t l_blank_addr = { .uint64 = 0 };
for (uint32_t i = 0; i < l_objs_count; i++)
if (!(l_success = dap_global_db_ch_check_store_obj(l_objs + i, &l_blank_addr)))
break;
if (l_success && dap_global_db_set_raw_sync(l_objs, l_objs_count)) {
const char *l_err_str = s_error_type_to_string(DAP_CHAIN_CH_ERROR_GLOBAL_DB_INTERNAL_NOT_SAVED);
dap_chain_ch_pkt_t *l_chain_pkt = dap_chain_ch_pkt_new(l_args->hdr.net_id, l_args->hdr.chain_id, l_args->hdr.cell_id, l_err_str, strlen(l_err_str));
dap_stream_ch_pkt_write_mt(l_args->worker, l_args->uuid, DAP_CHAIN_CH_PKT_TYPE_ERROR, l_chain_pkt, dap_chain_ch_pkt_get_size(l_chain_pkt));
DAP_DELETE(l_chain_pkt);
}
dap_store_obj_free(l_objs, l_objs_count);
DAP_DELETE(l_args);
return false;
}
static bool s_sync_out_chains_proc_callback(void *a_arg)
{
struct legacy_sync_context *l_context = a_arg;
dap_chain_ch_state_t l_cur_state = l_context->state;
if (l_cur_state == DAP_CHAIN_CH_STATE_WAITING)
return false;
if (l_cur_state != DAP_CHAIN_CH_STATE_UPDATE_CHAINS && l_cur_state != DAP_CHAIN_CH_STATE_SYNC_CHAINS)
// Illegal context
goto context_delete;
dap_chain_ch_update_element_t *l_hashes = NULL;
if (l_cur_state == DAP_CHAIN_CH_STATE_UPDATE_CHAINS) {
l_hashes = DAP_NEW_Z_SIZE(dap_chain_ch_update_element_t, s_update_pack_size * sizeof(dap_chain_ch_update_element_t));
if (!l_hashes) {
log_it(L_CRITICAL, g_error_memory_alloc);
l_context->state = DAP_CHAIN_CH_STATE_ERROR;
goto context_delete;
}
}
size_t l_data_size = 0;
bool l_chain_end = false, l_go_wait = false;
for (uint_fast16_t i = 0; i < s_update_pack_size; i++) {
if (!l_context->atom_iter->cur || !l_context->atom_iter->cur_size) {
l_chain_end = true;
break;
}
if (l_cur_state == DAP_CHAIN_CH_STATE_UPDATE_CHAINS) {
l_hashes[i].hash = *l_context->atom_iter->cur_hash;
l_hashes[i].size = l_context->atom_iter->cur_size;
l_data_size += sizeof(dap_chain_ch_update_element_t);
} else { // l_cur_state == DAP_CHAIN_CH_STATE_SYNC_CHAINS
dap_chain_ch_hash_item_t *l_hash_item = NULL;
HASH_FIND(hh, l_context->remote_atoms, l_context->atom_iter->cur_hash, sizeof(dap_hash_fast_t), l_hash_item);
if (!l_hash_item) {
l_context->enqueued_data_size += l_context->atom_iter->cur_size;
if (l_context->enqueued_data_size > DAP_EVENTS_SOCKET_BUF_SIZE / 2) {
atomic_compare_exchange_strong(&l_context->state, &l_cur_state, DAP_CHAIN_CH_STATE_WAITING);
l_context->prev_state = l_cur_state;
l_go_wait = true;
}
dap_chain_ch_pkt_write_mt(l_context->worker, l_context->ch_uuid, DAP_CHAIN_CH_PKT_TYPE_CHAIN_OLD,
l_context->request_hdr.net_id, l_context->request_hdr.chain_id, l_context->request_hdr.cell_id,
l_context->atom_iter->cur, l_context->atom_iter->cur_size);
debug_if(s_debug_legacy, L_INFO, "Out CHAIN pkt: atom hash %s (size %zd)", dap_hash_fast_to_str_static(l_context->atom_iter->cur_hash),
l_context->atom_iter->cur_size);
l_context->last_activity = dap_time_now();
l_context->stats_request_atoms_processed++;
} /* else // Over-extended debug
debug_if(s_debug_legacy, L_DEBUG, "Skip atom hash %s because its already present in remote atoms hash table",
dap_hash_fast_to_str_static(&l_context->atom_iter->cur_hash));
*/
}
l_context->atom_iter->chain->callback_atom_iter_get(l_context->atom_iter, DAP_CHAIN_ITER_OP_NEXT, NULL);
if (l_go_wait)
break;
}
if (l_hashes) {
dap_chain_ch_pkt_write_mt(l_context->worker, l_context->ch_uuid, DAP_CHAIN_CH_PKT_TYPE_UPDATE_CHAINS,
l_context->request_hdr.net_id, l_context->request_hdr.chain_id, l_context->request_hdr.cell_id,
l_hashes, l_data_size);
debug_if(s_debug_legacy, L_INFO, "Out: DAP_CHAIN_CH_PKT_TYPE_UPDATE_CHAINS, %zu records", l_data_size / sizeof(dap_chain_ch_update_element_t));
DAP_DELETE(l_hashes);
} else if (l_context->last_activity + 3 < dap_time_now()) {
l_context->last_activity = dap_time_now();
debug_if(s_debug_more, L_INFO, "Send one chain no freeze packet");
dap_chain_ch_pkt_write_mt(l_context->worker, l_context->ch_uuid, DAP_CHAIN_CH_PKT_TYPE_CHAINS_NO_FREEZE,
l_context->request_hdr.net_id, l_context->request_hdr.chain_id,
l_context->request_hdr.cell_id, NULL, 0);
}
if (l_chain_end) {
dap_chain_ch_sync_request_old_t l_payload = { .node_addr = g_node_addr };
uint8_t l_type = l_cur_state == DAP_CHAIN_CH_STATE_UPDATE_CHAINS ? DAP_CHAIN_CH_PKT_TYPE_UPDATE_CHAINS_END
: DAP_CHAIN_CH_PKT_TYPE_SYNCED_CHAINS;
debug_if(s_debug_legacy, L_INFO, "Out: %s", dap_chain_ch_pkt_type_to_str(l_type));
dap_chain_ch_pkt_write_mt(l_context->worker, l_context->ch_uuid, l_type,
l_context->request_hdr.net_id, l_context->request_hdr.chain_id, l_context->request_hdr.cell_id,
&l_payload, sizeof(l_payload));
debug_if(l_cur_state == DAP_CHAIN_CH_STATE_UPDATE_CHAINS, L_INFO,
"Synchronized chain: items synchronized %" DAP_UINT64_FORMAT_U, l_context->stats_request_atoms_processed);
if (l_cur_state == DAP_CHAIN_CH_STATE_SYNC_CHAINS) {
if (atomic_compare_exchange_strong(&l_context->state, &l_cur_state, DAP_CHAIN_CH_STATE_IDLE) ||
l_cur_state == DAP_CHAIN_CH_STATE_ERROR)
goto context_delete;
return false;
}
l_context->atom_iter->chain->callback_atom_iter_get(l_context->atom_iter, DAP_CHAIN_ITER_OP_FIRST, NULL);
if (atomic_compare_exchange_strong(&l_context->state, &l_cur_state, DAP_CHAIN_CH_STATE_SYNC_CHAINS_REMOTE) ||
l_cur_state == DAP_CHAIN_CH_STATE_WAITING)
return false;
goto context_delete;
}
return true;
context_delete:
dap_worker_exec_callback_on(l_context->worker->worker, s_legacy_sync_context_delete, l_context);
return false;
}
// *** End of legacy support code *** //
struct atom_processing_args {
dap_stream_node_addr_t addr;
bool ack_req;
byte_t data[];
};
/**
* @brief s_sync_in_chains_callback
* @param a_thread dap_proc_thread_t
* @param a_arg void
* @return
*/
static bool s_sync_in_chains_callback(void *a_arg)
{
assert(a_arg);
struct atom_processing_args *l_args = a_arg;
dap_chain_ch_pkt_t *l_chain_pkt = (dap_chain_ch_pkt_t *)l_args->data;
if (!l_chain_pkt->hdr.data_size) {
log_it(L_CRITICAL, "Proc thread received corrupted chain packet!");
return false;
}
dap_chain_atom_ptr_t l_atom = (dap_chain_atom_ptr_t)l_chain_pkt->data;
uint64_t l_atom_size = l_chain_pkt->hdr.data_size;
dap_chain_t *l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id);
if (!l_chain) {
debug_if(s_debug_more, L_WARNING, "No chain found for DAP_CHAIN_CH_PKT_TYPE_CHAIN");
DAP_DELETE(l_args);
return false;
}
char *l_atom_hash_str = NULL;
if (s_debug_more)
dap_get_data_hash_str_static(l_atom, l_atom_size, l_atom_hash_str);
dap_chain_atom_verify_res_t l_atom_add_res = l_chain->callback_atom_add(l_chain, l_atom, l_atom_size);
bool l_ack_send = false;
switch (l_atom_add_res) {
case ATOM_PASS:
debug_if(s_debug_more, L_WARNING, "Atom with hash %s for %s:%s not accepted (code ATOM_PASS, already present)",
l_atom_hash_str, l_chain->net_name, l_chain->name);
l_ack_send = true;
break;
case ATOM_MOVE_TO_THRESHOLD:
debug_if(s_debug_more, L_INFO, "Thresholded atom with hash %s for %s:%s", l_atom_hash_str, l_chain->net_name, l_chain->name);
break;
case ATOM_ACCEPT:
debug_if(s_debug_more, L_INFO, "Accepted atom with hash %s for %s:%s", l_atom_hash_str, l_chain->net_name, l_chain->name);
if (dap_chain_atom_save(l_chain->cells, l_atom, l_atom_size, NULL) < 0)
log_it(L_ERROR, "Can't save atom %s to the file", l_atom_hash_str);
else
l_ack_send = true;
break;
case ATOM_REJECT: {
debug_if(s_debug_more, L_WARNING, "Atom with hash %s for %s:%s rejected", l_atom_hash_str, l_chain->net_name, l_chain->name);
break;
}
default:
log_it(L_CRITICAL, "Wtf is this ret code? %d", l_atom_add_res);
break;
}
if (l_ack_send && l_args->ack_req) {
uint64_t l_ack_num = (l_chain_pkt->hdr.num_hi << 16) | l_chain_pkt->hdr.num_lo;
dap_chain_ch_pkt_t *l_pkt = dap_chain_ch_pkt_new(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
&l_ack_num, sizeof(uint64_t));
dap_stream_ch_pkt_send_by_addr(&l_args->addr, DAP_CHAIN_CH_ID, DAP_CHAIN_CH_PKT_TYPE_CHAIN_ACK, l_pkt, dap_chain_ch_pkt_get_size(l_pkt));
DAP_DELETE(l_pkt);
debug_if(s_debug_more, L_DEBUG, "Out: CHAIN_ACK %s for net %s to destination " NODE_ADDR_FP_STR " with num %" DAP_UINT64_FORMAT_U,
l_chain ? l_chain->name : "(null)",
l_chain ? l_chain->net_name : "(null)",
NODE_ADDR_FP_ARGS_S(l_args->addr),
l_ack_num);
}
DAP_DELETE(l_args);
return false;
}
static void s_gossip_payload_callback(void *a_payload, size_t a_payload_size, dap_stream_node_addr_t a_sender_addr)
{
assert(a_payload && a_payload_size);
dap_chain_ch_pkt_t *l_chain_pkt = a_payload;
if (a_payload_size <= sizeof(dap_chain_ch_pkt_t) ||
a_payload_size != sizeof(dap_chain_ch_pkt_t) + l_chain_pkt->hdr.data_size) {
log_it(L_WARNING, "Incorrect chain GOSSIP packet size");
return;
}
struct atom_processing_args *l_args = DAP_NEW_SIZE(struct atom_processing_args, a_payload_size + sizeof(struct atom_processing_args));
if (!l_args) {
log_it(L_CRITICAL, g_error_memory_alloc);
return;
}
l_args->addr = a_sender_addr;
l_args->ack_req = false;
memcpy(l_args->data, a_payload, a_payload_size);
dap_proc_thread_callback_add(NULL, s_sync_in_chains_callback, l_args);
}
void dap_stream_ch_write_error_unsafe(dap_stream_ch_t *a_ch, dap_chain_net_id_t a_net_id, dap_chain_id_t a_chain_id, dap_chain_cell_id_t a_cell_id, dap_chain_ch_error_type_t a_error)
{
dap_chain_ch_t *l_ch_chain = DAP_CHAIN_CH(a_ch);
dap_return_if_fail(l_ch_chain);
const char *l_err_str = s_error_type_to_string(a_error);
dap_chain_ch_pkt_write_unsafe(a_ch, DAP_CHAIN_CH_PKT_TYPE_ERROR, a_net_id, a_chain_id, a_cell_id, l_err_str, strlen(l_err_str) + 1);
s_ch_chain_go_idle(l_ch_chain);
}
/**
* @brief s_stream_ch_packet_in
* @param a_ch
* @param a_arg
*/
static bool s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
{
dap_chain_ch_t *l_ch_chain = DAP_CHAIN_CH(a_ch);
if (!l_ch_chain || l_ch_chain->_inheritor != a_ch) {
log_it(L_ERROR, "No chain in channel, returning");
return false;
}
dap_stream_ch_pkt_t * l_ch_pkt = (dap_stream_ch_pkt_t *) a_arg;
if (l_ch_pkt->hdr.data_size < sizeof(dap_chain_ch_pkt_t)) {
log_it(L_ERROR, "Corrupted packet: too small size %u, smaller then header size %zu",
l_ch_pkt->hdr.data_size, sizeof(dap_chain_ch_pkt_t));
return false;
}
dap_chain_ch_pkt_t *l_chain_pkt = (dap_chain_ch_pkt_t *)l_ch_pkt->data;
size_t l_chain_pkt_data_size = l_ch_pkt->hdr.data_size - sizeof(l_chain_pkt->hdr);
if (!l_chain_pkt->hdr.version || l_chain_pkt->hdr.version > DAP_CHAIN_CH_PKT_VERSION_CURRENT) {
debug_if(s_debug_more, L_ATT, "Unsupported protocol version %d, current version %d",
l_chain_pkt->hdr.version, DAP_CHAIN_CH_PKT_VERSION_CURRENT);
return false;
}
if (l_chain_pkt->hdr.version >= 2 &&
l_chain_pkt_data_size != l_chain_pkt->hdr.data_size) {
log_it(L_WARNING, "Incorrect chain packet size %zu, expected %u",
l_chain_pkt_data_size, l_chain_pkt->hdr.data_size);
return false;
}
switch (l_ch_pkt->hdr.type) {
/* *** New synchronization protocol *** */
case DAP_CHAIN_CH_PKT_TYPE_ERROR: {
if (!l_chain_pkt_data_size || l_chain_pkt->data[l_chain_pkt_data_size - 1] != 0) {
log_it(L_WARNING, "Incorrect format with data size %zu in packet %s", l_chain_pkt_data_size,
dap_chain_ch_pkt_type_to_str(l_ch_pkt->hdr.type));
return false;
}
log_it(L_WARNING, "In: from remote addr %s chain id 0x%016" DAP_UINT64_FORMAT_x " got error on his side: '%s'",
DAP_STREAM_CH(l_ch_chain)->stream->esocket->remote_addr_str,
l_chain_pkt->hdr.chain_id.uint64, (char *)l_chain_pkt->data);
s_ch_chain_go_idle(l_ch_chain);
} break;
case DAP_CHAIN_CH_PKT_TYPE_CHAIN: {
if (!l_chain_pkt_data_size) {
log_it(L_WARNING, "Incorrect data size %zu in packet %s", l_chain_pkt_data_size,
dap_chain_ch_pkt_type_to_str(l_ch_pkt->hdr.type));
dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
DAP_CHAIN_CH_ERROR_CHAIN_PKT_DATA_SIZE);
return false;
}
dap_cluster_t *l_cluster = dap_cluster_find(dap_guuid_compose(l_chain_pkt->hdr.net_id.uint64, 0));
if (!l_cluster) {
log_it(L_WARNING, "Can't find cluster with ID 0x%" DAP_UINT64_FORMAT_X, l_chain_pkt->hdr.net_id.uint64);
dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
DAP_CHAIN_CH_ERROR_CHAIN_PKT_DATA_SIZE);
return false;
}
dap_cluster_member_t *l_check = dap_cluster_member_find_unsafe(l_cluster, &a_ch->stream->node);
if (!l_check) {
log_it(L_WARNING, "Node with addr "NODE_ADDR_FP_STR" isn't a member of cluster %s",
NODE_ADDR_FP_ARGS_S(a_ch->stream->node), l_cluster->mnemonim);
return false;
}
struct atom_processing_args *l_args = DAP_NEW_SIZE(struct atom_processing_args, l_ch_pkt->hdr.data_size + sizeof(struct atom_processing_args));
if (!l_args) {
log_it(L_CRITICAL, g_error_memory_alloc);
break;
}
l_args->addr = a_ch->stream->node;
l_args->ack_req = true;
memcpy(l_args->data, l_chain_pkt, l_ch_pkt->hdr.data_size);
if (s_debug_more) {
char *l_atom_hash_str;
dap_get_data_hash_str_static(l_chain_pkt->data, l_chain_pkt_data_size, l_atom_hash_str);
log_it(L_INFO, "In: CHAIN pkt: atom hash %s (size %zd)", l_atom_hash_str, l_chain_pkt_data_size);
}
dap_proc_thread_callback_add(a_ch->stream_worker->worker->proc_queue_input, s_sync_in_chains_callback, l_args);
} break;
case DAP_CHAIN_CH_PKT_TYPE_CHAIN_REQ: {
if (l_chain_pkt_data_size != sizeof(dap_chain_ch_sync_request_t)) {
log_it(L_WARNING, "DAP_CHAIN_CH_PKT_TYPE_CHAIN_REQ: Wrong chain packet size %zd when expected %zd",
l_chain_pkt_data_size, sizeof(dap_chain_ch_sync_request_t));
dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
DAP_CHAIN_CH_ERROR_CHAIN_PKT_DATA_SIZE);
return false;
}
dap_chain_ch_pkt_set_version(DAP_CHAIN_CH_PKT_VERSION_CURRENT);
dap_chain_ch_sync_request_t *l_request = (dap_chain_ch_sync_request_t *)l_chain_pkt->data;
if (s_debug_more)
log_it(L_INFO, "In: CHAIN_REQ pkt: net 0x%016" DAP_UINT64_FORMAT_x " chain 0x%016" DAP_UINT64_FORMAT_x
" cell 0x%016" DAP_UINT64_FORMAT_x ", hash from %s, num from %" DAP_UINT64_FORMAT_U,
l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64,
dap_hash_fast_to_str_static(&l_request->hash_from), l_request->num_from);
if (l_ch_chain->sync_context || l_ch_chain->legacy_sync_context) {
log_it(L_WARNING, "Can't process CHAIN_REQ request cause already busy with synchronization");
dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
DAP_CHAIN_CH_ERROR_SYNC_REQUEST_ALREADY_IN_PROCESS);
break;
}
dap_chain_t *l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id);
if (!l_chain) {
log_it(L_WARNING, "Not found chain id 0x%016" DAP_UINT64_FORMAT_x " with net id 0x%016" DAP_UINT64_FORMAT_x,
l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.net_id.uint64);
dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
DAP_CHAIN_CH_ERROR_CHAIN_NOT_FOUND);
break;
}
if (!dap_link_manager_get_net_condition(l_chain_pkt->hdr.net_id.uint64)) {
log_it(L_WARNING, "Net id 0x%016" DAP_UINT64_FORMAT_x " is offline", l_chain_pkt->hdr.net_id.uint64);
dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
DAP_CHAIN_CH_ERROR_NET_IS_OFFLINE);
break;
}
bool l_sync_from_begin = dap_hash_fast_is_blank(&l_request->hash_from);
dap_chain_atom_iter_t *l_iter = l_chain->callback_atom_iter_create(l_chain, l_chain_pkt->hdr.cell_id, l_sync_from_begin
? NULL : &l_request->hash_from);
if (!l_iter) {
dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
DAP_CHAIN_CH_ERROR_OUT_OF_MEMORY);
break;
}
if (l_sync_from_begin)
l_chain->callback_atom_iter_get(l_iter, DAP_CHAIN_ITER_OP_FIRST, NULL);
bool l_missed_hash = false;
uint64_t l_last_num = l_chain->callback_count_atom(l_chain);
if (l_iter->cur) {
if (l_sync_from_begin ||
(l_request->num_from == l_iter->cur_num &&
l_last_num > l_iter->cur_num)) {
dap_chain_ch_summary_t l_sum = { .num_cur = l_iter->cur_num, .num_last = l_last_num };
dap_chain_ch_pkt_write_unsafe(a_ch, DAP_CHAIN_CH_PKT_TYPE_CHAIN_SUMMARY,
l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id,
l_chain_pkt->hdr.cell_id, &l_sum, sizeof(l_sum));
debug_if(s_debug_more, L_DEBUG, "Out: CHAIN_SUMMARY %s for net %s to destination " NODE_ADDR_FP_STR,
l_chain->name, l_chain->net_name, NODE_ADDR_FP_ARGS_S(a_ch->stream->node));
struct sync_context *l_context = DAP_NEW_Z(struct sync_context);
l_context->addr = a_ch->stream->node;
l_context->iter = l_iter;
l_context->net_id = l_chain_pkt->hdr.net_id;
l_context->chain_id = l_chain_pkt->hdr.chain_id;
l_context->cell_id = l_chain_pkt->hdr.cell_id;
l_context->num_last = l_sum.num_last;
l_context->last_activity = dap_time_now();
atomic_store_explicit(&l_context->state, SYNC_STATE_READY, memory_order_relaxed);
atomic_store(&l_context->allowed_num, l_sum.num_cur + s_sync_ack_window_size);
dap_stream_ch_uuid_t *l_uuid = DAP_DUP(&a_ch->uuid);
if (!l_uuid) {
log_it(L_CRITICAL, g_error_memory_alloc);
DAP_DELETE(l_context);
break;
}
l_ch_chain->sync_context = l_context;
dap_proc_thread_callback_add(a_ch->stream_worker->worker->proc_queue_input, s_chain_iter_callback, l_context);
l_ch_chain->sync_timer = dap_timerfd_start_on_worker(a_ch->stream_worker->worker, 1000, s_sync_timer_callback, l_uuid);
break;
}
if (l_request->num_from < l_iter->cur_num || l_last_num > l_iter->cur_num)
l_missed_hash = true;
} else if (!l_sync_from_begin && l_last_num >= l_request->num_from) {
l_missed_hash = true;
debug_if(s_debug_more, L_WARNING, "Requested atom with hash %s not found", dap_hash_fast_to_str_static(&l_request->hash_from));
}
if (l_missed_hash) {
l_chain->callback_atom_iter_get(l_iter, DAP_CHAIN_ITER_OP_LAST, NULL);
dap_chain_ch_miss_info_t l_miss_info = { .missed_hash = l_request->hash_from,
.last_hash = *l_iter->cur_hash,
.last_num = l_iter->cur_num };
dap_chain_ch_pkt_write_unsafe(a_ch, DAP_CHAIN_CH_PKT_TYPE_CHAIN_MISS,
l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id,
l_chain_pkt->hdr.cell_id, &l_miss_info, sizeof(l_miss_info));
if (s_debug_more) {
char l_last_hash_str[DAP_HASH_FAST_STR_SIZE];
dap_hash_fast_to_str(&l_miss_info.last_hash, l_last_hash_str, DAP_HASH_FAST_STR_SIZE);
log_it(L_INFO, "Out: CHAIN_MISS %s for net %s to source " NODE_ADDR_FP_STR
" with hash missed %s, hash last %s and num last %" DAP_UINT64_FORMAT_U,
l_chain ? l_chain->name : "(null)",
l_chain ? l_chain->net_name : "(null)",
NODE_ADDR_FP_ARGS_S(a_ch->stream->node),
dap_hash_fast_to_str_static(&l_miss_info.missed_hash),
l_last_hash_str,
l_miss_info.last_num);
}
} else {
dap_chain_ch_pkt_write_unsafe(a_ch, DAP_CHAIN_CH_PKT_TYPE_SYNCED_CHAIN,
l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id,
l_chain_pkt->hdr.cell_id, NULL, 0);
debug_if(s_debug_more, L_DEBUG, "Out: SYNCED_CHAIN %s for net %s to destination " NODE_ADDR_FP_STR,
l_chain ? l_chain->name : "(null)",
l_chain ? l_chain->net_name : "(null)",
NODE_ADDR_FP_ARGS_S(a_ch->stream->node));
}
l_chain->callback_atom_iter_delete(l_iter);
} break;
case DAP_CHAIN_CH_PKT_TYPE_CHAIN_SUMMARY: {
if (l_chain_pkt_data_size != sizeof(dap_chain_ch_summary_t)) {
log_it(L_WARNING, "DAP_CHAIN_CH_PKT_TYPE_CHAIN_SUMMARY: Wrong chain packet size %zd when expected %zd",
l_chain_pkt_data_size, sizeof(dap_chain_ch_summary_t));
dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
DAP_CHAIN_CH_ERROR_CHAIN_PKT_DATA_SIZE);
return false;
}
dap_chain_t *l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id);
dap_chain_ch_summary_t *l_sum = (dap_chain_ch_summary_t *)l_chain_pkt->data;
debug_if(s_debug_more, L_DEBUG, "In: CHAIN_SUMMARY of %s for net %s from source " NODE_ADDR_FP_STR
" with %" DAP_UINT64_FORMAT_U " atoms to sync from %" DAP_UINT64_FORMAT_U " to %" DAP_UINT64_FORMAT_U,
l_chain ? l_chain->name : "(null)",
l_chain ? l_chain->net_name : "(null)",
NODE_ADDR_FP_ARGS_S(a_ch->stream->node),
l_sum->num_last - l_sum->num_cur, l_sum->num_cur, l_sum->num_last);
} break;
case DAP_CHAIN_CH_PKT_TYPE_CHAIN_ACK: {
if (l_chain_pkt_data_size != sizeof(uint64_t)) {
log_it(L_WARNING, "DAP_CHAIN_CH_PKT_TYPE_CHAIN_ACK: Wrong chain packet size %zd when expected %zd",
l_chain_pkt_data_size, sizeof(uint64_t));
dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
DAP_CHAIN_CH_ERROR_CHAIN_PKT_DATA_SIZE);
return false;
}
uint64_t l_ack_num = *(uint64_t *)l_chain_pkt->data;
dap_chain_t *l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id);
debug_if(s_debug_more, L_DEBUG, "In: CHAIN_ACK %s for net %s from source " NODE_ADDR_FP_STR " with num %" DAP_UINT64_FORMAT_U,
l_chain ? l_chain->name : "(null)",
l_chain ? l_chain->net_name : "(null)",
NODE_ADDR_FP_ARGS_S(a_ch->stream->node),
l_ack_num);
struct sync_context *l_context = l_ch_chain->sync_context;
if (!l_context) {
log_it(L_WARNING, "CHAIN_ACK: No active sync context");
dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
DAP_CHAIN_CH_ERROR_INCORRECT_SYNC_SEQUENCE);
break;
}
if (l_context->num_last == l_ack_num) {
dap_chain_ch_pkt_write_unsafe(a_ch, DAP_CHAIN_CH_PKT_TYPE_SYNCED_CHAIN,
l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id,
l_chain_pkt->hdr.cell_id, NULL, 0);
s_ch_chain_go_idle(l_ch_chain);
break;
}
l_context->last_activity = dap_time_now();
if (atomic_load_explicit(&l_context->state, memory_order_relaxed) == SYNC_STATE_OVER)
break;
atomic_store_explicit(&l_context->allowed_num,
dap_min(l_ack_num + s_sync_ack_window_size, l_context->num_last),
memory_order_release);
if (atomic_exchange(&l_context->state, SYNC_STATE_READY) == SYNC_STATE_IDLE)
dap_proc_thread_callback_add(a_ch->stream_worker->worker->proc_queue_input, s_chain_iter_callback, l_context);
} break;
case DAP_CHAIN_CH_PKT_TYPE_SYNCED_CHAIN: {
dap_chain_t *l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id);
log_it(L_INFO, "In: SYNCED_CHAIN %s for net %s from source " NODE_ADDR_FP_STR,
l_chain ? l_chain->name : "(null)",
l_chain ? l_chain->net_name : "(null)",
NODE_ADDR_FP_ARGS_S(a_ch->stream->node));
} break;
case DAP_CHAIN_CH_PKT_TYPE_CHAIN_MISS: {
if (l_chain_pkt_data_size != sizeof(dap_chain_ch_miss_info_t)) {
log_it(L_WARNING, "DAP_CHAIN_CH_PKT_TYPE_CHAIN_MISS: Wrong chain packet size %zd when expected %zd",
l_chain_pkt_data_size, sizeof(dap_chain_ch_miss_info_t));
dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
DAP_CHAIN_CH_ERROR_CHAIN_PKT_DATA_SIZE);
return false;
}
dap_chain_t *l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id);
dap_chain_ch_miss_info_t *l_miss_info = (dap_chain_ch_miss_info_t *)l_chain_pkt->data;
if (s_debug_more) {
char l_last_hash_str[DAP_HASH_FAST_STR_SIZE];
dap_hash_fast_to_str(&l_miss_info->last_hash, l_last_hash_str, DAP_HASH_FAST_STR_SIZE);
log_it(L_INFO, "In: CHAIN_MISS %s for net %s from source " NODE_ADDR_FP_STR
" with hash missed %s, hash last %s and num last %" DAP_UINT64_FORMAT_U,
l_chain ? l_chain->name : "(null)",
l_chain ? l_chain->net_name : "(null)",
NODE_ADDR_FP_ARGS_S(a_ch->stream->node),
dap_hash_fast_to_str_static(&l_miss_info->missed_hash),
l_last_hash_str,
l_miss_info->last_num);
}
// Will be processed upper in net packet notifier callback
} break;
default:
dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
DAP_CHAIN_CH_ERROR_UNKNOWN_CHAIN_PKT_TYPE);
return false;
// }
//}
/* *** Legacy *** */
/// --- GDB update ---
// Request for gdbs list update
case DAP_CHAIN_CH_PKT_TYPE_UPDATE_GLOBAL_DB_REQ: {
if (l_chain_pkt_data_size != sizeof(dap_chain_ch_sync_request_old_t)) {
log_it(L_WARNING, "Incorrect data size %zu in packet %s", l_chain_pkt_data_size,
dap_chain_ch_pkt_type_to_str(l_ch_pkt->hdr.type));
dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
DAP_CHAIN_CH_ERROR_LEGACY_PKT_DATA_SIZE);
return false;
}
dap_cluster_t *l_net_cluster = dap_cluster_find(dap_guuid_compose(l_chain_pkt->hdr.net_id.uint64, 0));
if (!l_net_cluster || !l_net_cluster->mnemonim) {
log_it(L_WARNING, "Net id 0x%016" DAP_UINT64_FORMAT_x " not found", l_chain_pkt->hdr.net_id.uint64);
dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
DAP_CHAIN_CH_ERROR_NET_INVALID_ID);
break;
}
dap_chain_ch_pkt_set_version(1);
if (!dap_link_manager_get_net_condition(l_chain_pkt->hdr.net_id.uint64)) {
log_it(L_WARNING, "Net id 0x%016" DAP_UINT64_FORMAT_x " is offline", l_chain_pkt->hdr.net_id.uint64);
dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
DAP_CHAIN_CH_ERROR_NET_IS_OFFLINE);
break;
}
if (l_ch_chain->sync_context || l_ch_chain->legacy_sync_context) {
log_it(L_WARNING, "Can't process UPDATE_GLOBAL_DB request because its already busy with syncronization");
dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
DAP_CHAIN_CH_ERROR_SYNC_REQUEST_ALREADY_IN_PROCESS);
break;
}
dap_global_db_legacy_list_t *l_db_list = dap_global_db_legacy_list_start(l_net_cluster->mnemonim);
if (!l_db_list) {
log_it(L_ERROR, "Can't create legacy DB list");
dap_global_db_legacy_list_delete(l_db_list);
dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
DAP_CHAIN_CH_ERROR_OUT_OF_MEMORY);
break;
}
struct legacy_sync_context *l_context = s_legacy_sync_context_create(l_chain_pkt, a_ch);
if (!l_context) {
log_it(L_ERROR, "Can't create sychronization context");
dap_global_db_legacy_list_delete(l_db_list);
dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
DAP_CHAIN_CH_ERROR_OUT_OF_MEMORY);
break;
}
l_context->is_type_of_gdb = true;
l_context->db_list = l_db_list;
l_context->remote_addr = *(dap_stream_node_addr_t *)l_chain_pkt->data;
l_context->request_hdr = l_chain_pkt->hdr;
l_ch_chain->legacy_sync_context = l_context;
l_context->state = DAP_CHAIN_CH_STATE_UPDATE_GLOBAL_DB;
debug_if(s_debug_legacy, L_DEBUG, "Sync out gdb proc, requested %" DAP_UINT64_FORMAT_U " records from address " NODE_ADDR_FP_STR " (unverified)",
l_db_list->items_number, NODE_ADDR_FP_ARGS_S(l_context->remote_addr));
log_it(L_INFO, "In: UPDATE_GLOBAL_DB_REQ pkt: net 0x%016"DAP_UINT64_FORMAT_x" chain 0x%016"DAP_UINT64_FORMAT_x" cell 0x%016"DAP_UINT64_FORMAT_x,
l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64);
debug_if(s_debug_legacy, L_INFO, "Out: DAP_CHAIN_CH_PKT_TYPE_UPDATE_GLOBAL_DB_START");
dap_chain_ch_pkt_write_unsafe(DAP_STREAM_CH(l_ch_chain), DAP_CHAIN_CH_PKT_TYPE_UPDATE_GLOBAL_DB_START,
l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id,
l_chain_pkt->hdr.cell_id, &g_node_addr, sizeof(dap_chain_node_addr_t));
dap_proc_thread_callback_add(a_ch->stream_worker->worker->proc_queue_input, s_sync_out_gdb_proc_callback, l_context);
} break;
// If requested - begin to recieve record's hashes
case DAP_CHAIN_CH_PKT_TYPE_UPDATE_GLOBAL_DB_START: {
struct legacy_sync_context *l_context = l_ch_chain->legacy_sync_context;
if (!l_context || l_context->state != DAP_CHAIN_CH_STATE_UPDATE_GLOBAL_DB_REMOTE) {
log_it(L_WARNING, "Can't process UPDATE_GLOBAL_DB_START packet cause synchronization sequence violation");
dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
DAP_CHAIN_CH_ERROR_INCORRECT_SYNC_SEQUENCE);
break;
}
debug_if(s_debug_legacy, L_INFO, "In: UPDATE_GLOBAL_DB_START pkt net 0x%016"DAP_UINT64_FORMAT_x" chain 0x%016"DAP_UINT64_FORMAT_x" cell 0x%016"DAP_UINT64_FORMAT_x,
l_context->request_hdr.net_id.uint64, l_context->request_hdr.chain_id.uint64, l_context->request_hdr.cell_id.uint64);
} break;
// Response with gdb element hashes and sizes
case DAP_CHAIN_CH_PKT_TYPE_UPDATE_GLOBAL_DB: {
if (l_chain_pkt_data_size > sizeof(dap_chain_ch_update_element_t) * s_update_pack_size) {
log_it(L_WARNING, "Incorrect data size %zu in packet %s", l_chain_pkt_data_size,
dap_chain_ch_pkt_type_to_str(l_ch_pkt->hdr.type));
dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
DAP_CHAIN_CH_ERROR_LEGACY_PKT_DATA_SIZE);
return false;
}
debug_if(s_debug_legacy, L_INFO, "In: UPDATE_GLOBAL_DB pkt data_size=%zu", l_chain_pkt_data_size);
struct legacy_sync_context *l_context = l_ch_chain->legacy_sync_context;
if (!l_context || l_context->state != DAP_CHAIN_CH_STATE_UPDATE_GLOBAL_DB_REMOTE) {
log_it(L_WARNING, "Can't process UPDATE_GLOBAL_DB packet cause synchronization sequence violation");
dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
DAP_CHAIN_CH_ERROR_INCORRECT_SYNC_SEQUENCE);
break;
}
l_context->last_activity = dap_time_now();
for (dap_chain_ch_update_element_t *l_element = (dap_chain_ch_update_element_t *)l_chain_pkt->data;
(size_t)((byte_t *)(l_element + 1) - l_chain_pkt->data) <= l_chain_pkt_data_size;
l_element++) {
dap_chain_ch_hash_item_t * l_hash_item = NULL;
unsigned l_hash_item_hashv;
HASH_VALUE(&l_element->hash, sizeof(l_element->hash), l_hash_item_hashv);
HASH_FIND_BYHASHVALUE(hh, l_context->remote_gdbs, &l_element->hash, sizeof(l_element->hash),
l_hash_item_hashv, l_hash_item);
if (!l_hash_item) {
l_hash_item = DAP_NEW_Z(dap_chain_ch_hash_item_t);
if (!l_hash_item) {
log_it(L_CRITICAL, "%s", g_error_memory_alloc);
dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
DAP_CHAIN_CH_ERROR_OUT_OF_MEMORY);
break;
}
l_hash_item->hash = l_element->hash;
l_hash_item->size = l_element->size;
HASH_ADD_BYHASHVALUE(hh, l_context->remote_gdbs, hash, sizeof(l_hash_item->hash),
l_hash_item_hashv, l_hash_item);
//debug_if(s_debug_legacy, L_DEBUG, "In: Updated remote hash GDB list with %s", dap_chain_hash_fast_to_str_static(&l_hash_item->hash));
}
}
} break;
// End of response with GlobalDB hashes
case DAP_CHAIN_CH_PKT_TYPE_UPDATE_GLOBAL_DB_END: {
if (l_chain_pkt_data_size != sizeof(dap_chain_ch_sync_request_old_t)) {
log_it(L_WARNING, "Incorrect data size %zu in packet %s", l_chain_pkt_data_size,
dap_chain_ch_pkt_type_to_str(l_ch_pkt->hdr.type));
dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
DAP_CHAIN_CH_ERROR_LEGACY_PKT_DATA_SIZE);
return false;
}
struct legacy_sync_context *l_context = l_ch_chain->legacy_sync_context;
if (!l_context || l_context->state != DAP_CHAIN_CH_STATE_UPDATE_GLOBAL_DB_REMOTE) {
log_it(L_WARNING, "Can't process UPDATE_GLOBAL_DB_END packet cause synchronization sequence violation");
dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
DAP_CHAIN_CH_ERROR_INCORRECT_SYNC_SEQUENCE);
break;
}
debug_if(s_debug_legacy, L_INFO, "In: UPDATE_GLOBAL_DB_END pkt with total count %d hashes", HASH_COUNT(l_context->remote_gdbs));
l_context->state = DAP_CHAIN_CH_STATE_SYNC_GLOBAL_DB;
debug_if(s_debug_legacy, L_INFO, "Out: DAP_CHAIN_CH_PKT_TYPE_FIRST_GLOBAL_DB");
dap_chain_ch_pkt_write_unsafe(DAP_STREAM_CH(l_ch_chain), DAP_CHAIN_CH_PKT_TYPE_FIRST_GLOBAL_DB,
l_context->request_hdr.net_id, l_context->request_hdr.chain_id,
l_context->request_hdr.cell_id, &g_node_addr, sizeof(dap_chain_node_addr_t));
dap_proc_thread_callback_add(a_ch->stream_worker->worker->proc_queue_input, s_sync_out_gdb_proc_callback, l_context);
} break;
// first packet of data with source node address
case DAP_CHAIN_CH_PKT_TYPE_FIRST_GLOBAL_DB: {
if (l_chain_pkt_data_size != sizeof(dap_chain_node_addr_t)) {
log_it(L_WARNING, "Incorrect data size %zu in packet %s", l_chain_pkt_data_size,
dap_chain_ch_pkt_type_to_str(l_ch_pkt->hdr.type));
dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
DAP_CHAIN_CH_ERROR_LEGACY_PKT_DATA_SIZE);
return false;
}
struct legacy_sync_context *l_context = l_ch_chain->legacy_sync_context;
if (!l_context || l_context->state != DAP_CHAIN_CH_STATE_SYNC_GLOBAL_DB_REMOTE) {
log_it(L_WARNING, "Can't process FIRST_GLOBAL_DB packet cause synchronization sequence violation");
dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
DAP_CHAIN_CH_ERROR_INCORRECT_SYNC_SEQUENCE);
break;
}
debug_if(s_debug_legacy, L_INFO, "In: FIRST_GLOBAL_DB data_size=%zu net 0x%016"DAP_UINT64_FORMAT_x" chain 0x%016"DAP_UINT64_FORMAT_x" cell 0x%016"DAP_UINT64_FORMAT_x
" from address "NODE_ADDR_FP_STR "(unverified)", l_chain_pkt_data_size, l_context->request_hdr.net_id.uint64,
l_context->request_hdr.chain_id.uint64, l_context->request_hdr.cell_id.uint64, NODE_ADDR_FP_ARGS_S(l_context->remote_addr));
} break;
// Dummy packet for freeze detection
case DAP_CHAIN_CH_PKT_TYPE_GLOBAL_DB_NO_FREEZE: {
struct legacy_sync_context *l_context = l_ch_chain->legacy_sync_context;
if (!l_context || l_context->state != DAP_CHAIN_CH_STATE_SYNC_GLOBAL_DB_REMOTE) {
log_it(L_WARNING, "Can't process GLOBAL_DB_NO_FREEZE packet cause synchronization sequence violation");
dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
DAP_CHAIN_CH_ERROR_INCORRECT_SYNC_SEQUENCE);
break;
}
debug_if(s_debug_legacy, L_DEBUG, "Global DB no freeze packet detected");
l_context->last_activity = dap_time_now();
} break;
case DAP_CHAIN_CH_PKT_TYPE_GLOBAL_DB: {
dap_global_db_pkt_old_t *l_pkt = (dap_global_db_pkt_old_t *)l_chain_pkt->data;
if (l_chain_pkt_data_size < sizeof(dap_global_db_pkt_old_t) ||
l_chain_pkt_data_size != sizeof(*l_pkt) + l_pkt->data_size) {
log_it(L_WARNING, "Incorrect data size %zu in packet %s", l_chain_pkt_data_size,
dap_chain_ch_pkt_type_to_str(l_ch_pkt->hdr.type));
dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
DAP_CHAIN_CH_ERROR_LEGACY_PKT_DATA_SIZE);
return false;
}
struct legacy_sync_context *l_context = l_ch_chain->legacy_sync_context;
if (l_context && l_context->state != DAP_CHAIN_CH_STATE_SYNC_GLOBAL_DB_REMOTE) {
log_it(L_WARNING, "Can't process GLOBAL_DB packet cause synchronization sequence violation");
dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
DAP_CHAIN_CH_ERROR_INCORRECT_SYNC_SEQUENCE);
break;
}
if (l_context)
l_context->last_activity = dap_time_now();
debug_if(s_debug_legacy, L_INFO, "In: GLOBAL_DB_OLD data_size=%zu", l_chain_pkt_data_size);
// get records and save it to global_db
struct record_processing_args *l_args;
DAP_NEW_Z_RET_VAL(l_args, struct record_processing_args, true, NULL);
l_args->pkt = DAP_DUP_SIZE(l_pkt, l_chain_pkt_data_size);
if (!l_args->pkt) {
log_it(L_CRITICAL, g_error_memory_alloc);
dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
DAP_CHAIN_CH_ERROR_OUT_OF_MEMORY);
break;
}
l_args->worker = a_ch->stream_worker;
l_args->uuid = a_ch->uuid;
l_args->hdr = l_chain_pkt->hdr;
dap_proc_thread_callback_add(a_ch->stream_worker->worker->proc_queue_input, s_gdb_in_pkt_proc_callback, l_args);
} break;
case DAP_CHAIN_CH_PKT_TYPE_SYNCED_GLOBAL_DB: {
struct legacy_sync_context *l_context = l_ch_chain->legacy_sync_context;
if (!l_context || l_context->state != DAP_CHAIN_CH_STATE_SYNC_GLOBAL_DB_REMOTE) {
log_it(L_WARNING, "Can't process SYNCED_GLOBAL_DB packet cause synchronization sequence violation");
dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
DAP_CHAIN_CH_ERROR_INCORRECT_SYNC_SEQUENCE);
break;
}
debug_if(s_debug_legacy, L_INFO, "In: SYNCED_GLOBAL_DB: net 0x%016"DAP_UINT64_FORMAT_x" chain 0x%016"DAP_UINT64_FORMAT_x" cell 0x%016"DAP_UINT64_FORMAT_x,
l_context->request_hdr.net_id.uint64, l_context->request_hdr.chain_id.uint64, l_context->request_hdr.cell_id.uint64);
// we haven't node client waitng, so reply to other side
l_context->state = DAP_CHAIN_CH_STATE_UPDATE_GLOBAL_DB_REMOTE;
dap_chain_ch_sync_request_old_t l_request = { .node_addr = g_node_addr };
debug_if(s_debug_legacy, L_INFO, "Out: UPDATE_GLOBAL_DB_REQ pkt");
dap_chain_ch_pkt_write_unsafe(a_ch, DAP_CHAIN_CH_PKT_TYPE_UPDATE_GLOBAL_DB_REQ, l_context->request_hdr.net_id,
l_context->request_hdr.chain_id, l_context->request_hdr.cell_id, &l_request, sizeof(l_request));
} break;
/// --- Chains update ---
// Request for atoms list update
case DAP_CHAIN_CH_PKT_TYPE_UPDATE_CHAINS_REQ: {
if (l_chain_pkt_data_size) { // Expected packet with no data
log_it(L_WARNING, "Incorrect data size %zu in packet %s", l_chain_pkt_data_size,
dap_chain_ch_pkt_type_to_str(l_ch_pkt->hdr.type));
dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
DAP_CHAIN_CH_ERROR_LEGACY_PKT_DATA_SIZE);
return false;
}
dap_chain_ch_pkt_set_version(1);
if (!dap_link_manager_get_net_condition(l_chain_pkt->hdr.net_id.uint64)) {
log_it(L_WARNING, "Net id 0x%016" DAP_UINT64_FORMAT_x " is offline", l_chain_pkt->hdr.net_id.uint64);
dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
DAP_CHAIN_CH_ERROR_NET_IS_OFFLINE);
break;
}
if (l_ch_chain->sync_context || l_ch_chain->legacy_sync_context) {
log_it(L_WARNING, "Can't process UPDATE_CHAINS request because its already busy with syncronization");
dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
DAP_CHAIN_CH_ERROR_SYNC_REQUEST_ALREADY_IN_PROCESS);
break;
}
dap_chain_t * l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id);
if (!l_chain) {
log_it(L_WARNING, "Requested chain not found");
dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
DAP_CHAIN_CH_ERROR_CHAIN_NOT_FOUND);
break;
}
dap_chain_atom_iter_t *l_atom_iter = l_chain->callback_atom_iter_create(l_chain, l_chain_pkt->hdr.cell_id, NULL);
if (!l_atom_iter) {
log_it(L_ERROR, "Can't create legacy atom iterator");
dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
DAP_CHAIN_CH_ERROR_OUT_OF_MEMORY);
break;
}
struct legacy_sync_context *l_context = s_legacy_sync_context_create(l_chain_pkt, a_ch);
if (!l_context) {
log_it(L_ERROR, "Can't create sychronization context");
l_chain->callback_atom_iter_delete(l_atom_iter);
dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
DAP_CHAIN_CH_ERROR_OUT_OF_MEMORY);
break;
}
l_chain->callback_atom_iter_get(l_atom_iter, DAP_CHAIN_ITER_OP_FIRST, NULL);
l_context->atom_iter = l_atom_iter;
l_context->remote_addr = *(dap_stream_node_addr_t *)l_chain_pkt->data;
l_context->request_hdr = l_chain_pkt->hdr;
l_ch_chain->legacy_sync_context = l_context;
l_context->state = DAP_CHAIN_CH_STATE_UPDATE_CHAINS;
debug_if(s_debug_legacy, L_DEBUG, "Sync out chains proc, requested chain %s for net %s from address " NODE_ADDR_FP_STR " (unverified)",
l_chain->name, l_chain->net_name, NODE_ADDR_FP_ARGS_S(l_context->remote_addr));
debug_if(s_debug_legacy, L_INFO, "In: UPDATE_CHAINS_REQ pkt: net 0x%016"DAP_UINT64_FORMAT_x" chain 0x%016"DAP_UINT64_FORMAT_x" cell 0x%016"DAP_UINT64_FORMAT_x,
l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64);
debug_if(s_debug_legacy, L_INFO, "Out: UPDATE_CHAINS_START pkt: net %s chain %s cell 0x%016"DAP_UINT64_FORMAT_X, l_chain->name,
l_chain->net_name, l_chain_pkt->hdr.cell_id.uint64);
dap_chain_ch_pkt_write_unsafe(a_ch, DAP_CHAIN_CH_PKT_TYPE_UPDATE_CHAINS_START,
l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id,
l_chain_pkt->hdr.cell_id, NULL, 0);
dap_proc_thread_callback_add(a_ch->stream_worker->worker->proc_queue_input, s_sync_out_chains_proc_callback, l_context);
} break;
// If requested - begin to send atom hashes
case DAP_CHAIN_CH_PKT_TYPE_UPDATE_CHAINS_START: {
struct legacy_sync_context *l_context = l_ch_chain->legacy_sync_context;
if (!l_context || l_context->state != DAP_CHAIN_CH_STATE_UPDATE_CHAINS_REMOTE) {
log_it(L_WARNING, "Can't process UPDATE_CHAINS_START packet cause synchronization sequence violation");
dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
DAP_CHAIN_CH_ERROR_INCORRECT_SYNC_SEQUENCE);
break;
}
debug_if(s_debug_legacy, L_INFO, "In: UPDATE_CHAINS_START pkt net 0x%016"DAP_UINT64_FORMAT_x" chain 0x%016"DAP_UINT64_FORMAT_x" cell 0x%016"DAP_UINT64_FORMAT_x,
l_context->request_hdr.net_id.uint64, l_context->request_hdr.chain_id.uint64, l_context->request_hdr.cell_id.uint64);
} break;
// Response with atom hashes and sizes
case DAP_CHAIN_CH_PKT_TYPE_UPDATE_CHAINS: {
if (l_chain_pkt_data_size > sizeof(dap_chain_ch_update_element_t) * s_update_pack_size) {
log_it(L_WARNING, "Incorrect data size %zu in packet %s", l_chain_pkt_data_size,
dap_chain_ch_pkt_type_to_str(l_ch_pkt->hdr.type));
dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
DAP_CHAIN_CH_ERROR_LEGACY_PKT_DATA_SIZE);
return false;
}
debug_if(s_debug_legacy, L_INFO, "In: UPDATE_CHAINS pkt data_size=%zu", l_chain_pkt_data_size);
struct legacy_sync_context *l_context = l_ch_chain->legacy_sync_context;
if (!l_context || l_context->state != DAP_CHAIN_CH_STATE_UPDATE_CHAINS_REMOTE) {
log_it(L_WARNING, "Can't process UPDATE_CHAINS packet cause synchronization sequence violation");
dap_stream_ch_write_error_unsafe(a_ch, l_context->request_hdr.net_id,
l_context->request_hdr.chain_id, l_context->request_hdr.cell_id,
DAP_CHAIN_CH_ERROR_INCORRECT_SYNC_SEQUENCE);
break;
}
l_context->last_activity = dap_time_now();
unsigned int l_count_added = 0;
unsigned int l_count_total = 0;
for (dap_chain_ch_update_element_t *l_element = (dap_chain_ch_update_element_t *)l_chain_pkt->data;
(size_t)((byte_t *)(l_element + 1) - l_chain_pkt->data) <= l_chain_pkt_data_size;
l_element++) {
dap_chain_ch_hash_item_t *l_hash_item = NULL;
unsigned l_hash_item_hashv;
HASH_VALUE(&l_element->hash, sizeof(l_element->hash), l_hash_item_hashv);
HASH_FIND_BYHASHVALUE(hh, l_context->remote_atoms, &l_element->hash, sizeof(l_element->hash),
l_hash_item_hashv, l_hash_item);
if (!l_hash_item) {
l_hash_item = DAP_NEW_Z(dap_chain_ch_hash_item_t);
if (!l_hash_item) {
log_it(L_CRITICAL, "%s", g_error_memory_alloc);
dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
DAP_CHAIN_CH_ERROR_OUT_OF_MEMORY);
break;
}
l_hash_item->hash = l_element->hash;
l_hash_item->size = l_element->size;
HASH_ADD_BYHASHVALUE(hh, l_context->remote_atoms, hash, sizeof(l_hash_item->hash),
l_hash_item_hashv, l_hash_item);
l_count_added++;
//debug_if(s_debug_legacy, L_DEBUG, "In: Updated remote hash GDB list with %s", dap_chain_hash_fast_to_str_static(&l_hash_item->hash));
}
l_count_total++;
}
debug_if(s_debug_legacy, L_INFO, "In: Added %u from %u remote atom hash in list", l_count_added, l_count_total);
} break;
// End of response with chain hashes
case DAP_CHAIN_CH_PKT_TYPE_UPDATE_CHAINS_END: {
if (l_chain_pkt_data_size != sizeof(dap_chain_ch_sync_request_old_t)) {
log_it(L_WARNING, "Incorrect data size %zu in packet %s", l_chain_pkt_data_size,
dap_chain_ch_pkt_type_to_str(l_ch_pkt->hdr.type));
dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
DAP_CHAIN_CH_ERROR_LEGACY_PKT_DATA_SIZE);
return false;
}
struct legacy_sync_context *l_context = l_ch_chain->legacy_sync_context;
if (!l_context || l_context->state != DAP_CHAIN_CH_STATE_UPDATE_CHAINS_REMOTE) {
log_it(L_WARNING, "Can't process UPDATE_CHAINS_END packet cause synchronization sequence violation");
dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
DAP_CHAIN_CH_ERROR_INCORRECT_SYNC_SEQUENCE);
break;
}
debug_if(s_debug_legacy, L_INFO, "In: UPDATE_CHAINS_END pkt with total count %d hashes", HASH_COUNT(l_context->remote_atoms));
l_context->state = DAP_CHAIN_CH_STATE_SYNC_CHAINS;
debug_if(s_debug_legacy, L_INFO, "Out: DAP_CHAIN_CH_PKT_TYPE_FIRST_CHAIN");
dap_chain_ch_pkt_write_unsafe(DAP_STREAM_CH(l_ch_chain), DAP_CHAIN_CH_PKT_TYPE_FIRST_CHAIN,
l_context->request_hdr.net_id, l_context->request_hdr.chain_id,
l_context->request_hdr.cell_id, &g_node_addr, sizeof(dap_chain_node_addr_t));
dap_proc_thread_callback_add(a_ch->stream_worker->worker->proc_queue_input, s_sync_out_chains_proc_callback, l_context);
} break;
// first packet of data with source node address (legacy, unverified)
case DAP_CHAIN_CH_PKT_TYPE_FIRST_CHAIN: {
if (l_chain_pkt_data_size != sizeof(dap_chain_node_addr_t)) {
log_it(L_WARNING, "Incorrect data size %zu in packet %s", l_chain_pkt_data_size,
dap_chain_ch_pkt_type_to_str(l_ch_pkt->hdr.type));
dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
DAP_CHAIN_CH_ERROR_LEGACY_PKT_DATA_SIZE);
return false;
}
struct legacy_sync_context *l_context = l_ch_chain->legacy_sync_context;
if (!l_context || l_context->state != DAP_CHAIN_CH_STATE_SYNC_CHAINS_REMOTE) {
log_it(L_WARNING, "Can't process FIRST_CHAIN packet cause synchronization sequence violation");
dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
DAP_CHAIN_CH_ERROR_INCORRECT_SYNC_SEQUENCE);
break;
}
debug_if(s_debug_legacy, L_INFO, "In: FIRST_CHAIN data_size=%zu net 0x%016"DAP_UINT64_FORMAT_x" chain 0x%016"DAP_UINT64_FORMAT_x" cell 0x%016"DAP_UINT64_FORMAT_x
" from address "NODE_ADDR_FP_STR "(unverified)", l_chain_pkt_data_size, l_context->request_hdr.net_id.uint64,
l_context->request_hdr.chain_id.uint64, l_context->request_hdr.cell_id.uint64, NODE_ADDR_FP_ARGS_S(l_context->remote_addr));
} break;
// Dummy packet for freeze detection
case DAP_CHAIN_CH_PKT_TYPE_CHAINS_NO_FREEZE: {
struct legacy_sync_context *l_context = l_ch_chain->legacy_sync_context;
if (!l_context || l_context->state != DAP_CHAIN_CH_STATE_SYNC_CHAINS_REMOTE) {
log_it(L_WARNING, "Can't process CHAINS_NO_FREEZE packet cause synchronization sequence violation");
dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
DAP_CHAIN_CH_ERROR_INCORRECT_SYNC_SEQUENCE);
break;
}
debug_if(s_debug_legacy, L_DEBUG, "Chains no freeze packet detected");
l_context->last_activity = dap_time_now();
} break;
case DAP_CHAIN_CH_PKT_TYPE_CHAIN_OLD: {
if (!l_chain_pkt_data_size) {
log_it(L_WARNING, "Incorrect data size %zu in packet %s", l_chain_pkt_data_size,
dap_chain_ch_pkt_type_to_str(l_ch_pkt->hdr.type));
dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
DAP_CHAIN_CH_ERROR_LEGACY_PKT_DATA_SIZE);
return false;
}
struct legacy_sync_context *l_context = l_ch_chain->legacy_sync_context;
if (!l_context || l_context->state != DAP_CHAIN_CH_STATE_SYNC_CHAINS_REMOTE) {
log_it(L_WARNING, "Can't process FIRST_CHAIN packet cause synchronization sequence violation");
dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
DAP_CHAIN_CH_ERROR_INCORRECT_SYNC_SEQUENCE);
break;
}
debug_if(s_debug_legacy, L_INFO, "In: CHAIN_OLD data_size=%zu", l_chain_pkt_data_size);
struct atom_processing_args *l_args = DAP_NEW_Z_SIZE(struct atom_processing_args, l_ch_pkt->hdr.data_size + sizeof(struct atom_processing_args));
if (!l_args) {
log_it(L_CRITICAL, g_error_memory_alloc);
dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
DAP_CHAIN_CH_ERROR_OUT_OF_MEMORY);
break;
}
l_chain_pkt->hdr.data_size = l_chain_pkt_data_size;
memcpy(l_args->data, l_chain_pkt, l_ch_pkt->hdr.data_size);
if (s_debug_more) {
char *l_atom_hash_str;
dap_get_data_hash_str_static(l_chain_pkt->data, l_chain_pkt_data_size, l_atom_hash_str);
log_it(L_INFO, "In: CHAIN_OLD pkt: atom hash %s (size %zd)", l_atom_hash_str, l_chain_pkt_data_size);
}
dap_proc_thread_callback_add(a_ch->stream_worker->worker->proc_queue_input, s_sync_in_chains_callback, l_args);
} break;
case DAP_CHAIN_CH_PKT_TYPE_SYNCED_CHAINS: {
struct legacy_sync_context *l_context = l_ch_chain->legacy_sync_context;
if (!l_context || l_context->state != DAP_CHAIN_CH_STATE_SYNC_CHAINS_REMOTE) {
log_it(L_WARNING, "Can't process SYNCED_CHAINS packet cause synchronization sequence violation");
dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
DAP_CHAIN_CH_ERROR_INCORRECT_SYNC_SEQUENCE);
break;
}
debug_if(s_debug_legacy, L_INFO, "In: SYNCED_CHAINS: net 0x%016"DAP_UINT64_FORMAT_x" chain 0x%016"DAP_UINT64_FORMAT_x" cell 0x%016"DAP_UINT64_FORMAT_x,
l_context->request_hdr.net_id.uint64, l_context->request_hdr.chain_id.uint64, l_context->request_hdr.cell_id.uint64);
// we haven't node client waitng, so reply to other side
l_context->state = DAP_CHAIN_CH_STATE_UPDATE_CHAINS_REMOTE;
debug_if(s_debug_legacy, L_INFO, "Out: UPDATE_CHAINS_REQ pkt");
dap_chain_ch_sync_request_old_t l_request = { .node_addr = g_node_addr };
dap_chain_ch_pkt_write_unsafe(a_ch, DAP_CHAIN_CH_PKT_TYPE_UPDATE_CHAINS_REQ, l_context->request_hdr.net_id,
l_context->request_hdr.chain_id, l_context->request_hdr.cell_id, &l_request, sizeof(l_request));
} break;
}
return true;
}
static bool s_sync_timer_callback(void *a_arg)
{
dap_worker_t *l_worker = dap_worker_get_current();
dap_stream_ch_t *l_ch = dap_stream_ch_find_by_uuid_unsafe(DAP_STREAM_WORKER(l_worker), *(dap_stream_ch_uuid_t *)a_arg);
if (!l_ch) {
DAP_DELETE(a_arg);
return false;
}
dap_chain_ch_t *l_ch_chain = DAP_CHAIN_CH(l_ch);
if (!l_ch_chain) {
log_it(L_ERROR, "Channel without chain, dump it");
DAP_DELETE(a_arg);
return false;
}
bool l_timer_break = false;
const char *l_err_str = s_error_type_to_string(DAP_CHAIN_CH_ERROR_SYNC_TIMEOUT);
if (l_ch_chain->sync_context) {
struct sync_context *l_context = l_ch_chain->sync_context;
if (l_context->last_activity + s_sync_timeout <= dap_time_now()) {
log_it(L_ERROR, "Sync timeout for node " NODE_ADDR_FP_STR " with net 0x%016" DAP_UINT64_FORMAT_x
" chain 0x%016" DAP_UINT64_FORMAT_x " cell 0x%016" DAP_UINT64_FORMAT_x,
NODE_ADDR_FP_ARGS_S(l_context->addr), l_context->net_id.uint64,
l_context->chain_id.uint64, l_context->cell_id.uint64);
dap_chain_ch_pkt_write_unsafe(l_ch, DAP_CHAIN_CH_PKT_TYPE_ERROR, l_context->net_id,
l_context->chain_id, l_context->cell_id, l_err_str, strlen(l_err_str) + 1);
l_timer_break = true;
}
} else if (l_ch_chain->legacy_sync_context) {
struct legacy_sync_context *l_context = l_ch_chain->legacy_sync_context;
if (l_context->last_activity + s_sync_timeout <= dap_time_now()) {
log_it(L_ERROR, "Sync timeout for node " NODE_ADDR_FP_STR " (unverified) with net 0x%016" DAP_UINT64_FORMAT_x
" chain 0x%016" DAP_UINT64_FORMAT_x " cell 0x%016" DAP_UINT64_FORMAT_x,
NODE_ADDR_FP_ARGS_S(l_context->remote_addr), l_context->request_hdr.net_id.uint64,
l_context->request_hdr.chain_id.uint64, l_context->request_hdr.cell_id.uint64);
dap_chain_ch_pkt_write_unsafe(l_ch, DAP_CHAIN_CH_PKT_TYPE_ERROR, l_context->request_hdr.net_id,
l_context->request_hdr.chain_id, l_context->request_hdr.cell_id, l_err_str, strlen(l_err_str) + 1);
l_timer_break = true;
}
} else
l_timer_break = true;
if (l_timer_break) {
l_ch_chain->sync_timer = NULL; // Preserve timer removing from s_ch_chain_go_idle()
s_ch_chain_go_idle(l_ch_chain);
DAP_DELETE(a_arg);
return false;
}
return true;
}
static bool s_chain_iter_callback(void *a_arg)
{
assert(a_arg);
struct sync_context *l_context = a_arg;
dap_chain_atom_iter_t *l_iter = l_context->iter;
assert(l_iter);
dap_chain_t *l_chain = l_iter->chain;
if (atomic_exchange(&l_context->state, SYNC_STATE_BUSY) == SYNC_STATE_OVER) {
atomic_store(&l_context->state, SYNC_STATE_OVER);
return false;
}
size_t l_atom_size = l_iter->cur_size;
dap_chain_atom_ptr_t l_atom = l_iter->cur;
uint32_t l_cycles_count = 0;
while (l_atom && l_atom_size) {
if (l_iter->cur_num > atomic_load_explicit(&l_context->allowed_num, memory_order_acquire))
break;
dap_chain_ch_pkt_t *l_pkt = dap_chain_ch_pkt_new(l_context->net_id, l_context->chain_id, l_context->cell_id,
l_atom, l_atom_size);
// For master format binary complience
l_pkt->hdr.num_lo = l_iter->cur_num & 0xFFFF;
l_pkt->hdr.num_hi = (l_iter->cur_num >> 16) & 0xFF;
dap_stream_ch_pkt_send_by_addr(&l_context->addr, DAP_CHAIN_CH_ID, DAP_CHAIN_CH_PKT_TYPE_CHAIN, l_pkt, dap_chain_ch_pkt_get_size(l_pkt));
DAP_DELETE(l_pkt);
debug_if(s_debug_more, L_DEBUG, "Out: CHAIN %s for net %s to destination " NODE_ADDR_FP_STR " with num %" DAP_UINT64_FORMAT_U
" hash %s and size %zu",
l_chain ? l_chain->name : "(null)",
l_chain ? l_chain->net_name : "(null)",
NODE_ADDR_FP_ARGS_S(l_context->addr),
l_iter->cur_num, dap_hash_fast_to_str_static(l_iter->cur_hash), l_iter->cur_size);
l_atom = l_chain->callback_atom_iter_get(l_iter, DAP_CHAIN_ITER_OP_NEXT, &l_atom_size);
if (!l_atom || !l_atom_size || l_iter->cur_num > l_context->num_last)
break;
if (atomic_exchange(&l_context->state, SYNC_STATE_BUSY) == SYNC_STATE_OVER) {
atomic_store(&l_context->state, SYNC_STATE_OVER);
return false;
}
if (++l_cycles_count >= s_sync_packets_per_thread_call)
return true;
}
uint16_t l_state = l_atom && l_atom_size && l_iter->cur_num <= l_context->num_last
? SYNC_STATE_IDLE : SYNC_STATE_OVER;
uint16_t l_prev_state = atomic_exchange(&l_context->state, l_state);
if (l_prev_state == SYNC_STATE_OVER && l_state != SYNC_STATE_OVER)
atomic_store(&l_context->state, SYNC_STATE_OVER);
if (l_prev_state == SYNC_STATE_READY) // Allowed num was changed since last state updating
return true;
return false;
}
static bool s_chain_iter_delete_callback(void *a_arg)
{
struct sync_context *l_context = a_arg;
assert(l_context->iter);
l_context->iter->chain->callback_atom_iter_delete(l_context->iter);
DAP_DELETE(l_context);
return false;
}
/**
* @brief s_ch_chain_go_idle
* @param a_ch_chain
*/
static void s_ch_chain_go_idle(dap_chain_ch_t *a_ch_chain)
{
debug_if(s_debug_more, L_INFO, "Going to chain's stream channel STATE_IDLE");
// New protocol
if (a_ch_chain->sync_context) {
atomic_store(&((struct sync_context *)a_ch_chain->sync_context)->state, SYNC_STATE_OVER);
dap_proc_thread_callback_add(DAP_STREAM_CH(a_ch_chain)->stream_worker->worker->proc_queue_input,
s_chain_iter_delete_callback, a_ch_chain->sync_context);
a_ch_chain->sync_context = NULL;
}
if (a_ch_chain->sync_timer) {
dap_timerfd_delete_unsafe(a_ch_chain->sync_timer);
a_ch_chain->sync_timer = NULL;
}
//}
// Legacy
if (a_ch_chain->legacy_sync_context) {
dap_chain_ch_state_t l_current_state = atomic_exchange(
&((struct legacy_sync_context *)a_ch_chain->legacy_sync_context)->state, DAP_CHAIN_CH_STATE_IDLE);
if (l_current_state != DAP_CHAIN_CH_STATE_UPDATE_CHAINS &&
l_current_state != DAP_CHAIN_CH_STATE_SYNC_CHAINS &&
l_current_state != DAP_CHAIN_CH_STATE_UPDATE_GLOBAL_DB &&
l_current_state != DAP_CHAIN_CH_STATE_SYNC_GLOBAL_DB)
// Context will not be removed from proc thread
s_legacy_sync_context_delete(a_ch_chain->legacy_sync_context);
a_ch_chain->legacy_sync_context = NULL;
}
}
static void s_stream_ch_io_complete(dap_events_socket_t *a_es, void *a_arg)
{
dap_return_if_fail(a_arg);
dap_stream_t *l_stream = dap_stream_get_from_es(a_es);
assert(l_stream);
dap_stream_ch_t *l_ch = dap_stream_ch_by_id_unsafe(l_stream, DAP_CHAIN_CH_ID);
assert(l_ch);
struct legacy_sync_context *l_context = DAP_CHAIN_CH(l_ch)->legacy_sync_context;
if (!l_context || l_context->state != DAP_CHAIN_CH_STATE_WAITING)
return;
if (l_context->prev_state == DAP_CHAIN_CH_STATE_UPDATE_CHAINS ||
l_context->prev_state == DAP_CHAIN_CH_STATE_SYNC_CHAINS) {
l_context->state = l_context->prev_state;
l_context->enqueued_data_size = 0;
dap_proc_thread_callback_add(l_ch->stream_worker->worker->proc_queue_input, s_sync_out_chains_proc_callback, l_context);
} else if (l_context->prev_state == DAP_CHAIN_CH_STATE_UPDATE_GLOBAL_DB ||
l_context->prev_state == DAP_CHAIN_CH_STATE_SYNC_GLOBAL_DB) {
l_context->state = l_context->prev_state;
l_context->enqueued_data_size = 0;
dap_proc_thread_callback_add(l_ch->stream_worker->worker->proc_queue_input, s_sync_out_gdb_proc_callback, l_context);
} else
log_it(L_ERROR, "Unexpected legacy sync context state %d", l_context->state);
}