/*
 * Authors:
 * Dmitriy A. Gearasimov <gerasimov.dmitriy@demlabs.net>
 * Alexander Lysikov <alexander.lysikov@demlabs.net>
 * DeM Labs Inc.   https://demlabs.net
 * Kelvin Project https://github.com/kelvinblockchain
 * Copyright  (c) 2017-2018
 * All rights reserved.

 This file is part of DAP (Demlabs Application Protocol) the open source project

 DAP (Demlabs Application Protocol) is free software: you can redistribute it and/or modify
 it under the terms of the GNU General Public License as published by
 the Free Software Foundation, either version 3 of the License, or
 (at your option) any later version.

 DAP is distributed in the hope that it will be useful,
 but WITHOUT ANY WARRANTY; without even the implied warranty of
 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 GNU General Public License for more details.

 You should have received a copy of the GNU General Public License
 along with any DAP based project.  If not, see <http://www.gnu.org/licenses/>.
 */

#include <stdlib.h>
#include <stdio.h>
#include <stdlib.h>
#include <stddef.h>
#include <stdint.h>

#ifdef WIN32
#include <winsock2.h>
#include <windows.h>
#include <mswsock.h>
#include <ws2tcpip.h>
#include <io.h>
#include <pthread.h>
#endif

#include "dap_common.h"
#include "dap_strfuncs.h"
#include "dap_list.h"
#include "dap_config.h"
#include "dap_hash.h"
#include "dap_time.h"
#include "utlist.h"

#include "dap_worker.h"
#include "dap_events.h"
#include "dap_proc_thread.h"
#include "dap_client_pvt.h"

#include "dap_chain.h"
#include "dap_chain_datum.h"
#include "dap_chain_cs.h"
#include "dap_chain_cell.h"

#include "dap_global_db.h"

#include "dap_stream.h"
#include "dap_stream_pkt.h"
#include "dap_stream_worker.h"
#include "dap_stream_ch_pkt.h"
#include "dap_stream_ch.h"
#include "dap_stream_ch_proc.h"
#include "dap_chain_ch.h"
#include "dap_chain_ch_pkt.h"
#include "dap_stream_ch_gossip.h"

#define LOG_TAG "dap_chain_ch"

struct sync_request
{
    dap_worker_t * worker;
    dap_stream_ch_uuid_t ch_uuid;
    dap_chain_ch_sync_request_old_t request;
    dap_chain_ch_pkt_hdr_t request_hdr;
    dap_chain_pkt_item_t pkt;

    dap_chain_ch_hash_item_t *remote_atoms; // Remote atoms
    dap_chain_ch_hash_item_t *remote_gdbs; // Remote gdbs

    uint64_t stats_request_elemets_processed;
    int last_err;
    union{
        struct{
            //dap_db_log_list_t *db_log; //  db log
            dap_list_t *db_iter;
            char *sync_group;
        } gdb;
        struct{
            dap_chain_atom_iter_t *request_atom_iter;
        } chain;
    };
};

enum sync_context_state {
    SYNC_STATE_IDLE,
    SYNC_STATE_READY,
    SYNC_STATE_BUSY,
    SYNC_STATE_OVER
};

struct sync_context {
    atomic_uint_fast64_t allowed_num;
    atomic_uint_fast16_t state;
    dap_chain_atom_iter_t *iter;
    dap_stream_node_addr_t addr;
    dap_chain_net_id_t net_id;
    dap_chain_id_t chain_id;
    dap_chain_cell_id_t cell_id;
    uint64_t num_last;
    dap_time_t last_activity;
};

static void s_ch_chain_go_idle(dap_chain_ch_t *a_ch_chain);
static inline bool s_ch_chain_get_idle(dap_chain_ch_t *a_ch_chain) { return a_ch_chain->state == DAP_CHAIN_CH_STATE_IDLE; }

static void s_stream_ch_new(dap_stream_ch_t* a_ch, void* a_arg);
static void s_stream_ch_delete(dap_stream_ch_t* a_ch, void* a_arg);
static bool s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg);
static bool s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg);
static void s_stream_ch_io_complete(dap_events_socket_t *a_es, void *a_arg);

static bool s_sync_out_chains_proc_callback(void *a_arg);
static void s_sync_out_chains_last_worker_callback(dap_worker_t *a_worker, void *a_arg);
static void s_sync_out_chains_first_worker_callback(dap_worker_t *a_worker, void *a_arg);

static bool s_sync_out_gdb_proc_callback(void *a_arg);

static bool s_sync_in_chains_callback(void *a_arg);

static bool s_gdb_in_pkt_proc_callback(void *a_arg);
static bool s_gdb_in_pkt_proc_set_raw_callback(dap_global_db_instance_t *a_dbi,
                                               int a_rc, const char *a_group,
                                               const size_t a_values_total, const size_t a_values_count,
                                               dap_store_obj_t *a_values, void *a_arg);
static void s_gdb_in_pkt_error_worker_callback(dap_worker_t *a_thread, void *a_arg);

static void s_stream_ch_chain_pkt_write(dap_stream_ch_t *a_ch, uint8_t a_type, uint64_t a_net_id,
                                        uint64_t a_chain_id, uint64_t a_cell_id,
                                        const void * a_data, size_t a_data_size);
static void s_gossip_payload_callback(void *a_payload, size_t a_payload_size, dap_stream_node_addr_t a_sender_addr);
static bool s_chain_iter_callback(void *a_arg);
static bool s_chain_iter_delete_callback(void *a_arg);
static bool s_sync_timer_callback(void *a_arg);

static bool s_debug_more = false;
static uint32_t s_sync_timeout = 30;
static uint32_t s_sync_packets_per_thread_call = 10;
static uint32_t s_sync_ack_window_size = 100; // atoms

static uint_fast16_t s_update_pack_size=100; // Number of hashes packed into the one packet
static uint_fast16_t s_skip_in_reactor_count=50; // Number of hashes packed to skip in one reactor loop callback out packet

#ifdef  DAP_SYS_DEBUG

enum    {MEMSTAT$K_STM_CH_CHAIN, MEMSTAT$K_NR};
static  dap_memstat_rec_t   s_memstat [MEMSTAT$K_NR] = {
    {.fac_len = sizeof(LOG_TAG) - 1, .fac_name = {LOG_TAG}, .alloc_sz = sizeof(dap_chain_ch_t)},
};

#endif

static const char *s_error_type_to_string(dap_chain_ch_error_type_t a_error)
{
    switch (a_error) {
    case DAP_CHAIN_CH_ERROR_SYNC_REQUEST_ALREADY_IN_PROCESS:
        return "SYNC_REQUEST_ALREADY_IN_PROCESS";
    case DAP_CHAIN_CH_ERROR_INCORRECT_SYNC_SEQUENCE:
        return "INCORRECT_SYNC_SEQUENCE";
    case DAP_CHAIN_CH_ERROR_CHAIN_PKT_DATA_SIZE:
        return "IVALID_PACKET_SIZE";
    case DAP_CHAIN_CH_ERROR_NET_INVALID_ID:
        return "INVALID_NET_ID";
    case DAP_CHAIN_CH_ERROR_CHAIN_NOT_FOUND:
        return "CHAIN_NOT_FOUND";
    case DAP_CHAIN_CH_ERROR_ATOM_NOT_FOUND:
        return "ATOM_NOT_FOUND";
    case DAP_CHAIN_CH_ERROR_UNKNOWN_CHAIN_PKT_TYPE:
        return "UNKNOWN_CHAIN_PACKET_TYPE";
    case DAP_CHAIN_CH_ERROR_GLOBAL_DB_INTERNAL_NOT_SAVED:
        return "GLOBAL_DB_INTERNAL_SAVING_ERROR";
    case DAP_CHAIN_CH_ERROR_NET_IS_OFFLINE:
        return "NET_IS_OFFLINE";
    case DAP_CHAIN_CH_ERROR_OUT_OF_MEMORY:
        return "OUT_OF_MEMORY";
    case DAP_CHAIN_CH_ERROR_INTERNAL:
        return "INTERNAL_ERROR";
    default:
        return "UNKNOWN_ERROR";
    }
}

/**
 * @brief dap_chain_ch_init
 * @return
 */
int dap_chain_ch_init()
{
    log_it(L_NOTICE, "Chains exchange channel initialized");
    dap_stream_ch_proc_add(DAP_CHAIN_CH_ID, s_stream_ch_new, s_stream_ch_delete, s_stream_ch_packet_in,
            s_stream_ch_packet_out);
    s_sync_timeout = dap_config_get_item_uint32_default(g_config, "chain", "sync_timeout", s_sync_timeout);
    s_sync_ack_window_size = dap_config_get_item_uint32_default(g_config, "chain", "sync_ack_window_size", s_sync_ack_window_size);
    s_sync_packets_per_thread_call = dap_config_get_item_int16_default(g_config, "chain", "pack_size", s_sync_packets_per_thread_call);
    s_debug_more = dap_config_get_item_bool_default(g_config, "chain", "debug_more", false);
#ifdef  DAP_SYS_DEBUG
    for (int i = 0; i < MEMSTAT$K_NR; i++)
        dap_memstat_reg(&s_memstat[i]);
#endif
    return dap_stream_ch_gossip_callback_add(DAP_CHAIN_CH_ID, s_gossip_payload_callback);
}

/**
 * @brief dap_chain_ch_deinit
 */
void dap_chain_ch_deinit()
{

}

/**
 * @brief s_stream_ch_new
 * @param a_ch
 * @param arg
 */
void s_stream_ch_new(dap_stream_ch_t* a_ch, void* a_arg)
{
    UNUSED(a_arg);
    if (!(a_ch->internal = DAP_NEW_Z(dap_chain_ch_t))) {
        log_it(L_CRITICAL, "Memory allocation error");
        return;
    };
    dap_chain_ch_t *l_ch_chain = DAP_CHAIN_CH(a_ch);
    l_ch_chain->_inheritor = a_ch;
    a_ch->stream->esocket->callbacks.write_finished_callback = s_stream_ch_io_complete;

#ifdef  DAP_SYS_DEBUG
    atomic_fetch_add(&s_memstat[MEMSTAT$K_STM_CH_CHAIN].alloc_nr, 1);
#endif
    debug_if(s_debug_more, L_DEBUG, "[stm_ch_chain:%p] --- created chain:%p", a_ch, l_ch_chain);
}

/**
 * @brief s_stream_ch_delete
 * @param ch
 * @param arg
 */
static void s_stream_ch_delete(dap_stream_ch_t *a_ch, void *a_arg)
{
    UNUSED(a_arg);
    dap_chain_ch_t *l_ch_chain = DAP_CHAIN_CH(a_ch);
    s_ch_chain_go_idle(l_ch_chain);
    debug_if(s_debug_more, L_DEBUG, "[stm_ch_chain:%p] --- deleted chain:%p", a_ch, l_ch_chain);
    DAP_DEL_Z(a_ch->internal);

#ifdef  DAP_SYS_DEBUG
    atomic_fetch_add(&s_memstat[MEMSTAT$K_STM_CH_CHAIN].free_nr, 1);
#endif
}

/**
 * @brief s_stream_ch_chain_delete
 * @param a_ch_chain
 */
static void s_sync_request_delete(struct sync_request * a_sync_request)
{
    if (!a_sync_request) {
        //already NULL'ed
        return;
    }
    if (a_sync_request->pkt.pkt_data) {
        DAP_DEL_Z(a_sync_request->pkt.pkt_data);
    }

    if (a_sync_request->gdb.db_iter) {
        a_sync_request->gdb.db_iter = dap_list_first( a_sync_request->gdb.db_iter);
        dap_list_free_full( a_sync_request->gdb.db_iter, NULL);
        a_sync_request->gdb.db_iter = NULL;
    }
    DAP_DEL_Z(a_sync_request);
}

/**
 * @brief s_sync_out_chains_worker_callback
 * @param a_worker
 * @param a_arg
 */
static void s_sync_out_chains_first_worker_callback(dap_worker_t *a_worker, void *a_arg)
{
    struct sync_request * l_sync_request = (struct sync_request *) a_arg;
    dap_stream_ch_t *l_ch = dap_stream_ch_find_by_uuid_unsafe( DAP_STREAM_WORKER(a_worker) , l_sync_request->ch_uuid);
    if( l_ch == NULL ){
        log_it(L_INFO,"Client disconnected before we sent the reply");
        s_sync_request_delete(l_sync_request);
        return;
    }

    dap_chain_ch_t * l_ch_chain = DAP_CHAIN_CH(l_ch);
    if (!l_ch_chain) {
        log_it(L_CRITICAL, "Channel without chain, dump it");
        s_sync_request_delete(l_sync_request);
        return;
    }
    if (l_ch_chain->state != DAP_CHAIN_CH_STATE_UPDATE_CHAINS_REMOTE) {
        log_it(L_INFO, "Timeout fired before we sent the reply");
        s_sync_request_delete(l_sync_request);
        return;
    }

    l_ch_chain->state = DAP_CHAIN_CH_STATE_SYNC_CHAINS;
    l_ch_chain->request_atom_iter = l_sync_request->chain.request_atom_iter;

    if (s_debug_more )
        log_it(L_INFO,"Out: DAP_CHAIN_CH_PKT_TYPE_FIRST_CHAIN");

    dap_chain_ch_pkt_write_unsafe(l_ch, DAP_CHAIN_CH_PKT_TYPE_FIRST_CHAIN,
            l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64,
            l_ch_chain->request_hdr.cell_id.uint64, &g_node_addr, sizeof(dap_chain_node_addr_t));
    DAP_DELETE(l_sync_request);

}

/**
 * @brief s_sync_out_chains_last_worker_callback
 * @param a_worker
 * @param a_arg
 */
static void s_sync_out_chains_last_worker_callback(dap_worker_t *a_worker, void *a_arg)
{
    struct sync_request * l_sync_request = (struct sync_request *) a_arg;
    dap_stream_ch_t *l_ch = dap_stream_ch_find_by_uuid_unsafe(DAP_STREAM_WORKER(a_worker), l_sync_request->ch_uuid);
    if( l_ch == NULL ){
        log_it(L_INFO,"Client disconnected before we sent the reply");
        s_sync_request_delete(l_sync_request);
        return;
    }

    dap_chain_ch_t * l_ch_chain = DAP_CHAIN_CH(l_ch);
    if (!l_ch_chain) {
        log_it(L_CRITICAL, "Channel without chain, dump it");
        s_sync_request_delete(l_sync_request);
        return;
    }
    l_ch_chain->request_atom_iter = l_sync_request->chain.request_atom_iter;
    // last packet
    dap_chain_ch_sync_request_old_t l_request = {};
    if (s_debug_more )
        log_it(L_INFO,"Out: DAP_CHAIN_CH_PKT_TYPE_SYNCED_CHAINS");
    dap_chain_ch_pkt_write_unsafe(l_ch, DAP_CHAIN_CH_PKT_TYPE_SYNCED_CHAINS,
            l_sync_request->request_hdr.net_id.uint64, l_sync_request->request_hdr.chain_id.uint64,
            l_sync_request->request_hdr.cell_id.uint64, &l_request, sizeof(l_request));
    s_ch_chain_go_idle(l_ch_chain);
    DAP_DELETE(l_sync_request);
}
/**
 * @brief s_sync_chains_callback
 * @param a_thread
 * @param a_arg
 * @return
 */
static bool s_sync_out_chains_proc_callback(void *a_arg)
{
    struct sync_request * l_sync_request = (struct sync_request *) a_arg;

    dap_chain_t * l_chain = dap_chain_find_by_id(l_sync_request->request_hdr.net_id, l_sync_request->request_hdr.chain_id);
    assert(l_chain);
    l_sync_request->chain.request_atom_iter = l_chain->callback_atom_iter_create(l_chain, l_sync_request->request_hdr.cell_id, NULL);
    size_t l_first_size = 0;
    dap_chain_atom_ptr_t l_atom = l_chain->callback_atom_iter_get(l_sync_request->chain.request_atom_iter, DAP_CHAIN_ITER_OP_FIRST, &l_first_size);
    if (l_atom && l_first_size) {
        // first packet
        dap_chain_hash_fast_t l_hash_from = l_sync_request->request.hash_from;
        if (!dap_hash_fast_is_blank(&l_hash_from)) {
            (void ) l_chain->callback_atom_find_by_hash(l_sync_request->chain.request_atom_iter,
                                                          &l_hash_from, &l_first_size);
        }
         dap_worker_exec_callback_on(dap_events_worker_get(l_sync_request->worker->id), s_sync_out_chains_first_worker_callback, l_sync_request );
    } else {
         dap_worker_exec_callback_on(dap_events_worker_get(l_sync_request->worker->id),s_sync_out_chains_last_worker_callback, l_sync_request );
    }
    return false;
}


/**
 * @brief s_sync_out_gdb_first_gdb_worker_callback
 * @param a_worker
 * @param a_arg
 */
static void s_sync_out_gdb_first_worker_callback(dap_worker_t *a_worker, void *a_arg)
{
    struct sync_request *l_sync_request = (struct sync_request *) a_arg;

    dap_stream_ch_t *l_ch = dap_stream_ch_find_by_uuid_unsafe(DAP_STREAM_WORKER(l_sync_request->worker), l_sync_request->ch_uuid);
    if( l_ch == NULL ){
        log_it(L_INFO,"Client disconnected before we sent the reply");
        s_sync_request_delete(l_sync_request);
        return;
    }

    dap_chain_ch_t *l_ch_chain = DAP_CHAIN_CH( l_ch );

    if (!l_ch_chain) {
        log_it(L_CRITICAL, "Channel without chain, dump it");
        s_sync_request_delete(l_sync_request);
        return;
    }

    if (l_ch_chain->state != DAP_CHAIN_CH_STATE_UPDATE_GLOBAL_DB_REMOTE) {
        log_it(L_INFO, "Timeout fired before we sent the reply");
        s_sync_request_delete(l_sync_request);
        return;
    }

    // Add it to outgoing list
    l_ch_chain->state = DAP_CHAIN_CH_STATE_SYNC_GLOBAL_DB;
    if (s_debug_more )
        log_it(L_INFO,"Out: DAP_CHAIN_CH_PKT_TYPE_FIRST_GLOBAL_DB");
    dap_chain_ch_pkt_write_unsafe(DAP_STREAM_CH(l_ch_chain), DAP_CHAIN_CH_PKT_TYPE_FIRST_GLOBAL_DB,
            l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64,
            l_ch_chain->request_hdr.cell_id.uint64, &g_node_addr, sizeof(dap_chain_node_addr_t));

    if( a_worker){ // We send NULL to prevent delete
        s_sync_request_delete(l_sync_request);
    }
}

/**
 * @brief s_sync_out_gdb_synced_data_worker_callback
 * @param a_worker
 * @param a_arg
 */
static void s_sync_out_gdb_last_worker_callback(dap_worker_t *a_worker, void *a_arg)
{
    struct sync_request * l_sync_request = (struct sync_request *) a_arg;
    dap_stream_ch_t *l_ch = dap_stream_ch_find_by_uuid_unsafe(DAP_STREAM_WORKER(a_worker), l_sync_request->ch_uuid);
    if( l_ch == NULL ){
        log_it(L_INFO,"Client disconnected before we sent the reply");
        s_sync_request_delete(l_sync_request);
        return;
    }

    dap_chain_ch_t *l_ch_chain = DAP_CHAIN_CH( l_ch );
    if (!l_ch_chain) {
        log_it(L_CRITICAL, "Channel without chain, dump it");
        s_sync_request_delete(l_sync_request);
        return;
    }
    s_sync_out_gdb_first_worker_callback(NULL,a_arg); // NULL to say callback not to delete request

    if (s_debug_more )
        log_it(L_INFO,"Out: DAP_CHAIN_CH_PKT_TYPE_SYNCED_GLOBAL_DB");
    dap_chain_ch_pkt_write_unsafe(DAP_STREAM_CH(l_ch_chain), DAP_CHAIN_CH_PKT_TYPE_SYNCED_GLOBAL_DB,
                                         l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64,
                                         l_ch_chain->request_hdr.cell_id.uint64, NULL, 0);
    s_ch_chain_go_idle(l_ch_chain);
    s_sync_request_delete(l_sync_request);
}

/**
 * @brief s_sync_out_gdb_callback
 * @param a_thread
 * @param a_arg
 * @return
 */
static bool s_sync_out_gdb_proc_callback(void *a_arg)
{
    /*
    struct sync_request *l_sync_request = (struct sync_request *)a_arg;
    dap_chain_net_t *l_net = dap_chain_net_by_id(l_sync_request->request_hdr.net_id);
    dap_stream_ch_t *l_ch = dap_stream_ch_find_by_uuid_unsafe(DAP_STREAM_WORKER(l_sync_request->worker), l_sync_request->ch_uuid);
    if (l_ch == NULL) {
        log_it(L_INFO, "Client disconnected before we sent the reply");
        s_sync_request_delete(l_sync_request);
        return true;
    }
    dap_chain_ch_t *l_ch_chain = DAP_CHAIN_CH(l_ch);
    if (!l_ch_chain) {
        log_it(L_CRITICAL, "Channel without chain, dump it");
        s_sync_request_delete(l_sync_request);
        return true;
    }
    int l_flags = 0;
    if (dap_chain_net_get_extra_gdb_group(l_net, l_sync_request->request.node_addr))
        l_flags |= F_DB_LOG_ADD_EXTRA_GROUPS;
    if (!l_sync_request->request.id_start)
        l_flags |= F_DB_LOG_SYNC_FROM_ZERO;
    if (l_ch_chain->request_db_log != NULL)
        dap_db_log_list_delete(l_ch_chain->request_db_log);
    l_ch_chain->request_db_log = dap_db_log_list_start(l_net->pub.name, l_sync_request->request.node_addr.uint64, l_flags);

    if (l_ch_chain->request_db_log) {
        if (s_debug_more)
            log_it(L_DEBUG, "Sync out gdb proc, requested %"DAP_UINT64_FORMAT_U" records from address "NODE_ADDR_FP_STR,
                             l_ch_chain->request_db_log->items_number, NODE_ADDR_FP_ARGS_S(l_sync_request->request.node_addr));
        l_sync_request->gdb.db_log = l_ch_chain->request_db_log;
         dap_worker_exec_callback_on(dap_events_worker_get(l_sync_request->worker->id), s_sync_out_gdb_first_worker_callback, l_sync_request );
    } else {
         dap_worker_exec_callback_on(dap_events_worker_get(l_sync_request->worker->id), s_sync_out_gdb_last_worker_callback, l_sync_request );
    } */
    return false;
}

static void s_sync_update_gdb_start_worker_callback(dap_worker_t *a_worker, void *a_arg)
{
    struct sync_request *l_sync_request = (struct sync_request *) a_arg;

    dap_stream_ch_t *l_ch = dap_stream_ch_find_by_uuid_unsafe(DAP_STREAM_WORKER(a_worker), l_sync_request->ch_uuid);
    if( l_ch == NULL ){
        log_it(L_INFO,"Client disconnected before we sent the reply");
        s_sync_request_delete(l_sync_request);
        return;
    }
    dap_chain_ch_pkt_write_unsafe(l_ch, DAP_CHAIN_CH_PKT_TYPE_UPDATE_GLOBAL_DB_START,
                                         l_sync_request->request_hdr.net_id.uint64, l_sync_request->request_hdr.chain_id.uint64,
                                         l_sync_request->request_hdr.cell_id.uint64, NULL, 0);
    if (s_debug_more)
        log_it(L_INFO, "Out: DAP_CHAIN_CH_PKT_TYPE_UPDATE_GLOBAL_DB_START for net_id 0x%016"DAP_UINT64_FORMAT_x" "
                       "chain_id 0x%016"DAP_UINT64_FORMAT_x" cell_id 0x%016"DAP_UINT64_FORMAT_x"",
                       l_sync_request->request_hdr.net_id.uint64, l_sync_request->request_hdr.chain_id.uint64, l_sync_request->request_hdr.cell_id.uint64);
    DAP_DELETE(l_sync_request);
}

static bool s_sync_update_gdb_proc_callback(void *a_arg)
{
    /*
    struct sync_request *l_sync_request = (struct sync_request *)a_arg;
    log_it(L_DEBUG, "Prepare request to gdb sync from %s", l_sync_request->request.id_start ? "last sync" : "zero");
    dap_chain_net_t *l_net = dap_chain_net_by_id(l_sync_request->request_hdr.net_id);
    if (!l_net) {
        log_it(L_ERROR, "Network ID 0x%016"DAP_UINT64_FORMAT_x" not found", l_sync_request->request_hdr.net_id.uint64);
        DAP_DELETE(l_sync_request);
        return true;
    }
    dap_stream_ch_t *l_ch = dap_stream_ch_find_by_uuid_unsafe(DAP_STREAM_WORKER(l_sync_request->worker), l_sync_request->ch_uuid);
    if (!l_ch) {
        log_it(L_INFO, "Client disconnected before we sent the reply");
        DAP_DELETE(l_sync_request);
        return true;
    } else if (!DAP_CHAIN_CH(l_ch)) {
        log_it(L_CRITICAL, "Channel without chain, dump it");
        DAP_DELETE(l_sync_request);
        return true;
    }

    dap_chain_ch_t *l_ch_chain = DAP_CHAIN_CH(l_ch);
    int l_flags = 0;
    if (dap_chain_net_get_extra_gdb_group(l_net, l_sync_request->request.node_addr))
        l_flags |= F_DB_LOG_ADD_EXTRA_GROUPS;
    if (!l_sync_request->request.id_start)
        l_flags |= F_DB_LOG_SYNC_FROM_ZERO;
    if (l_ch_chain->request_db_log != NULL)
        dap_db_log_list_delete(l_ch_chain->request_db_log);
    l_ch_chain->request_db_log = dap_db_log_list_start(l_net->pub.name, l_sync_request->request.node_addr.uint64, l_flags);
    l_ch_chain->state = DAP_CHAIN_CH_STATE_UPDATE_GLOBAL_DB;
    l_sync_request->gdb.db_log = l_ch_chain->request_db_log;
    l_sync_request->request.node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net);
     dap_worker_exec_callback_on(dap_events_worker_get(l_sync_request->worker->id), s_sync_update_gdb_start_worker_callback, l_sync_request);
     */
    return false;
}

static bool s_gdb_in_pkt_proc_callback(void *a_arg)
{
    return false;
}

struct atom_processing_args {
    dap_stream_node_addr_t addr;
    bool ack_req;
    byte_t data[];
};

/**
 * @brief s_sync_in_chains_callback
 * @param a_thread dap_proc_thread_t
 * @param a_arg void
 * @return
 */
static bool s_sync_in_chains_callback(void *a_arg)
{
    assert(a_arg);
    struct atom_processing_args *l_args = a_arg;
    dap_chain_ch_pkt_t *l_chain_pkt = (dap_chain_ch_pkt_t *)l_args->data;
    if (!l_chain_pkt->hdr.data_size) {
        log_it(L_CRITICAL, "Proc thread received corrupted chain packet!");
        return false;
    }
    dap_chain_atom_ptr_t l_atom = (dap_chain_atom_ptr_t)l_chain_pkt->data;
    uint64_t l_atom_size = l_chain_pkt->hdr.data_size;
    dap_chain_t *l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id);
    if (!l_chain) {
        debug_if(s_debug_more, L_WARNING, "No chain found for DAP_CHAIN_CH_PKT_TYPE_CHAIN");
        DAP_DELETE(l_args);
        return false;
    }
    char *l_atom_hash_str = NULL;
    if (s_debug_more)
        dap_get_data_hash_str_static(l_atom, l_atom_size, l_atom_hash_str);
    dap_chain_atom_verify_res_t l_atom_add_res = l_chain->callback_atom_add(l_chain, l_atom, l_atom_size);
    bool l_ack_send = false;
    switch (l_atom_add_res) {
    case ATOM_PASS:
        debug_if(s_debug_more, L_WARNING, "Atom with hash %s for %s:%s not accepted (code ATOM_PASS, already present)",
                                                l_atom_hash_str, l_chain->net_name, l_chain->name);
        l_ack_send = true;
        break;
    case ATOM_MOVE_TO_THRESHOLD:
        debug_if(s_debug_more, L_INFO, "Thresholded atom with hash %s for %s:%s", l_atom_hash_str, l_chain->net_name, l_chain->name);
        break;
    case ATOM_ACCEPT:
        debug_if(s_debug_more, L_INFO,"Accepted atom with hash %s for %s:%s", l_atom_hash_str, l_chain->net_name, l_chain->name);
        if (dap_chain_atom_save(l_chain->cells, l_atom, l_atom_size, NULL) < 0)
            log_it(L_ERROR, "Can't save atom %s to the file", l_atom_hash_str);
        else
            l_ack_send = true;
        break;
    case ATOM_REJECT: {
        debug_if(s_debug_more, L_WARNING, "Atom with hash %s for %s:%s rejected", l_atom_hash_str, l_chain->net_name, l_chain->name);
        break;
    }
    default:
        log_it(L_CRITICAL, "Wtf is this ret code? %d", l_atom_add_res);
        break;
    }
    if (l_ack_send && l_args->ack_req) {
        uint64_t l_ack_num = (l_chain_pkt->hdr.num_hi << 16) | l_chain_pkt->hdr.num_lo;
        dap_chain_ch_pkt_t *l_pkt = dap_chain_ch_pkt_new(l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64,
                                                         &l_ack_num, sizeof(uint64_t));
        dap_stream_ch_pkt_send_by_addr(&l_args->addr, DAP_CHAIN_CH_ID, DAP_CHAIN_CH_PKT_TYPE_CHAIN_ACK, l_pkt, dap_chain_ch_pkt_get_size(l_pkt));
        DAP_DELETE(l_pkt);
        debug_if(s_debug_more, L_DEBUG, "Out: CHAIN_ACK %s for net %s to destination " NODE_ADDR_FP_STR " with num %" DAP_UINT64_FORMAT_U,
                                l_chain ? l_chain->name : "(null)",
                                            l_chain ? l_chain->net_name : "(null)",
                                                            NODE_ADDR_FP_ARGS_S(l_args->addr),
                                l_ack_num);
    }
    DAP_DELETE(l_args);
    return false;
}

static void s_gossip_payload_callback(void *a_payload, size_t a_payload_size, dap_stream_node_addr_t a_sender_addr)
{
    assert(a_payload && a_payload_size);
    dap_chain_ch_pkt_t *l_chain_pkt = a_payload;
    if (a_payload_size <= sizeof(dap_chain_ch_pkt_t) ||
            a_payload_size != sizeof(dap_chain_ch_pkt_t) + l_chain_pkt->hdr.data_size) {
        log_it(L_WARNING, "Incorrect chain GOSSIP packet size");
        return;
    }
    struct atom_processing_args *l_args = DAP_NEW_SIZE(struct atom_processing_args, a_payload_size + sizeof(struct atom_processing_args));
    if (!l_args) {
        log_it(L_CRITICAL, g_error_memory_alloc);
        return;
    }
    l_args->addr = a_sender_addr;
    l_args->ack_req = false;
    memcpy(l_args->data, a_payload, a_payload_size);
    dap_proc_thread_callback_add(NULL, s_sync_in_chains_callback, l_args);
}

/**
 * @brief s_gdb_in_pkt_error_worker_callback
 * @param a_thread
 * @param a_arg
 */
static void s_gdb_in_pkt_error_worker_callback(dap_worker_t *a_worker, void *a_arg)
{
    struct sync_request *l_sync_request = (struct sync_request *) a_arg;

    dap_stream_ch_t *l_ch = dap_stream_ch_find_by_uuid_unsafe(DAP_STREAM_WORKER(a_worker), l_sync_request->ch_uuid);
    if (l_ch == NULL)
        log_it(L_INFO,"Client disconnected before we sent the reply");
    else
        dap_stream_ch_write_error_unsafe(l_ch, l_sync_request->request_hdr.net_id.uint64,
                                           l_sync_request->request_hdr.chain_id.uint64,
                                           l_sync_request->request_hdr.cell_id.uint64,
                                           DAP_CHAIN_CH_ERROR_GLOBAL_DB_INTERNAL_NOT_SAVED);
    DAP_DELETE(l_sync_request);
}

/**
 * @brief dap_chain_ch_create_sync_request_gdb
 * @param a_ch_chain
 * @param a_net
 */
struct sync_request *dap_chain_ch_create_sync_request(dap_chain_ch_pkt_t *a_chain_pkt, dap_stream_ch_t* a_ch)
{
    dap_chain_ch_t * l_ch_chain = DAP_CHAIN_CH(a_ch);
    if (!l_ch_chain) {
        log_it(L_CRITICAL, "Channel without chain, dump it");
        return NULL;
    }
    struct sync_request *l_sync_request = DAP_NEW_Z(struct sync_request);
    if (!l_sync_request) {
        log_it(L_CRITICAL, "Memory allocation error");
        return NULL;
    }
    *l_sync_request = (struct sync_request) {
            .worker         = a_ch->stream_worker->worker,
            .ch_uuid        = a_ch->uuid,
            .request        = l_ch_chain->request,
            .request_hdr    = a_chain_pkt->hdr,
            .remote_atoms   = l_ch_chain->remote_atoms,
            .remote_gdbs    = l_ch_chain->remote_gdbs };
    return l_sync_request;
}

void dap_stream_ch_write_error_unsafe(dap_stream_ch_t *a_ch, uint64_t a_net_id, uint64_t a_chain_id, uint64_t a_cell_id, dap_chain_ch_error_type_t a_error)
{
    dap_chain_ch_t *l_ch_chain = DAP_CHAIN_CH(a_ch);
    if (!l_ch_chain) {
        log_it(L_CRITICAL, "Channel without chain, dump it");
        return;
    }
    s_ch_chain_go_idle(l_ch_chain);
    dap_chain_ch_pkt_write_error_unsafe(a_ch, a_net_id, a_chain_id, a_cell_id, "%s", s_error_type_to_string(a_error));
}

static bool s_chain_timer_callback(void *a_arg)
{
    dap_worker_t *l_worker = dap_worker_get_current();
    dap_stream_ch_t *l_ch = dap_stream_ch_find_by_uuid_unsafe(DAP_STREAM_WORKER(l_worker), *(dap_stream_ch_uuid_t*)a_arg);
    if (!l_ch) {
        DAP_DELETE(a_arg);
        return false;
    }
    dap_chain_ch_t *l_ch_chain = DAP_CHAIN_CH(l_ch);
    if (!l_ch_chain) {
        log_it(L_CRITICAL, "Channel without chain, dump it");
        DAP_DELETE(a_arg);
        return false;
    }
    if (l_ch_chain->timer_shots++ >= DAP_SYNC_TICKS_PER_SECOND * s_sync_timeout) {
        if (!s_ch_chain_get_idle(l_ch_chain))
            s_ch_chain_go_idle(l_ch_chain);
        DAP_DELETE(a_arg);
        l_ch_chain->activity_timer = NULL;
        return false;
    }
    if (l_ch_chain->state != DAP_CHAIN_CH_STATE_WAITING && l_ch_chain->sent_breaks) {
        s_stream_ch_packet_out(l_ch, a_arg);
        if (l_ch_chain->activity_timer == NULL)
            return false;
    }
    // Sending dumb packet with nothing to inform remote thats we're just skiping atoms of GDB's, nothing freezed
    if (l_ch_chain->state == DAP_CHAIN_CH_STATE_SYNC_CHAINS && l_ch_chain->sent_breaks >= 3 * DAP_SYNC_TICKS_PER_SECOND) {
        debug_if(s_debug_more, L_INFO, "Send one chain TSD packet");
        dap_chain_ch_pkt_write_unsafe(l_ch, DAP_CHAIN_CH_PKT_TYPE_UPDATE_CHAINS_TSD,
                                             l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64,
                                             l_ch_chain->request_hdr.cell_id.uint64, NULL, 0);
        l_ch_chain->sent_breaks = 0;
        l_ch_chain->timer_shots = 0;
    }
    return true;
}

static void s_chain_timer_reset(dap_chain_ch_t *a_ch_chain)
{
    a_ch_chain->timer_shots = 0;
    if (!a_ch_chain->activity_timer)
        dap_chain_ch_timer_start(a_ch_chain);
}

void dap_chain_ch_timer_start(dap_chain_ch_t *a_ch_chain)
{
    dap_stream_ch_uuid_t *l_uuid = DAP_DUP(&DAP_STREAM_CH(a_ch_chain)->uuid);
    a_ch_chain->activity_timer = dap_timerfd_start_on_worker(DAP_STREAM_CH(a_ch_chain)->stream_worker->worker,
                                                             1000 / DAP_SYNC_TICKS_PER_SECOND,
                                                             s_chain_timer_callback, (void *)l_uuid);
    a_ch_chain->sent_breaks = 0;
}

/**
 * @brief s_stream_ch_packet_in
 * @param a_ch
 * @param a_arg
 */
static bool s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
{
    dap_chain_ch_t *l_ch_chain = DAP_CHAIN_CH(a_ch);
    if (!l_ch_chain || l_ch_chain->_inheritor != a_ch) {
        log_it(L_ERROR, "No chain in channel, returning");
        return false;
    }
    dap_stream_ch_pkt_t * l_ch_pkt = (dap_stream_ch_pkt_t *) a_arg;
    if (l_ch_pkt->hdr.data_size < sizeof(dap_chain_ch_pkt_t)) {
        log_it(L_ERROR, "Corrupted packet: too small size %u, smaller then header size %zu",
                                            l_ch_pkt->hdr.data_size, sizeof(dap_chain_ch_pkt_t));
        return false;
    }

    dap_chain_ch_pkt_t *l_chain_pkt = (dap_chain_ch_pkt_t *)l_ch_pkt->data;
    size_t l_chain_pkt_data_size = l_ch_pkt->hdr.data_size - sizeof(l_chain_pkt->hdr);

    if (l_chain_pkt->hdr.version > DAP_CHAIN_CH_PKT_VERSION) {
        debug_if(s_debug_more, L_ATT, "Unsupported protocol version %d, current version %d",
                 l_chain_pkt->hdr.version, DAP_CHAIN_CH_PKT_VERSION);
        return false;
    }
    if (l_chain_pkt->hdr.version >= 2 &&
                l_chain_pkt_data_size != l_chain_pkt->hdr.data_size) {
        log_it(L_WARNING, "Incorrect chain packet size %zu, expected %u",
                            l_chain_pkt_data_size, l_chain_pkt->hdr.data_size);
        return false;
    }

    s_chain_timer_reset(l_ch_chain);

    switch (l_ch_pkt->hdr.type) {

    /* *** New synchronization protocol *** */

    case DAP_CHAIN_CH_PKT_TYPE_ERROR:{
        char * l_error_str = (char*)l_chain_pkt->data;
        if(l_chain_pkt_data_size>1)
            l_error_str[l_chain_pkt_data_size-1]='\0'; // To be sure that nobody sends us garbage
                                                       // without trailing zero
        log_it(L_WARNING, "In: from remote addr %s chain id 0x%016"DAP_UINT64_FORMAT_x" got error on his side: '%s'",
               DAP_STREAM_CH(l_ch_chain)->stream->esocket->remote_addr_str,
               l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt_data_size ? l_error_str : "<empty>");
    } break;

    case DAP_CHAIN_CH_PKT_TYPE_CHAIN: {
        dap_cluster_t *l_cluster = dap_cluster_find(dap_guuid_compose(l_chain_pkt->hdr.net_id.uint64, 0));
        if (!l_cluster) {
            log_it(L_WARNING, "Can't find cluster with ID 0x%" DAP_UINT64_FORMAT_X, l_chain_pkt->hdr.net_id.uint64);
            dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64,
                    l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64,
                    DAP_CHAIN_CH_ERROR_CHAIN_NOT_FOUND);
            return false;
        }
        dap_cluster_member_t *l_check = dap_cluster_member_find_unsafe(l_cluster, &a_ch->stream->node);
        if (!l_check) {
            log_it(L_WARNING, "Node with addr "NODE_ADDR_FP_STR" isn't a member of cluster %s",
                                        NODE_ADDR_FP_ARGS_S(a_ch->stream->node), l_cluster->mnemonim);
            return false;
        }
        struct atom_processing_args *l_args = DAP_NEW_SIZE(struct atom_processing_args, l_ch_pkt->hdr.data_size + sizeof(struct atom_processing_args));
        if (!l_args) {
            log_it(L_CRITICAL, g_error_memory_alloc);
            break;
        }
        l_args->addr = a_ch->stream->node;
        l_args->ack_req = true;
        if (l_chain_pkt->hdr.version < 2)
            l_chain_pkt->hdr.data_size = l_chain_pkt_data_size;
        memcpy(l_args->data, l_chain_pkt, l_ch_pkt->hdr.data_size);
        if (s_debug_more) {
            char *l_atom_hash_str;
            dap_get_data_hash_str_static(l_chain_pkt->data, l_chain_pkt_data_size, l_atom_hash_str);
            log_it(L_INFO, "In: CHAIN pkt: atom hash %s (size %zd)", l_atom_hash_str, l_chain_pkt_data_size);
        }
        dap_proc_thread_callback_add(a_ch->stream_worker->worker->proc_queue_input, s_sync_in_chains_callback, l_args);
    } break;

    case DAP_CHAIN_CH_PKT_TYPE_CHAIN_REQ: {
        if (l_chain_pkt_data_size != sizeof(dap_chain_ch_sync_request_t)) {
            log_it(L_WARNING, "DAP_CHAIN_CH_PKT_TYPE_CHAIN_REQ: Wrong chain packet size %zd when expected %zd",
                                                                            l_chain_pkt_data_size, sizeof(dap_chain_ch_sync_request_t));
            dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64,
                    l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64,
                    DAP_CHAIN_CH_ERROR_CHAIN_PKT_DATA_SIZE);
            return false;
        }
        dap_chain_ch_sync_request_t *l_request = (dap_chain_ch_sync_request_t *)l_chain_pkt->data;
        if (s_debug_more)
            log_it(L_INFO, "In: CHAIN_REQ pkt: net 0x%016" DAP_UINT64_FORMAT_x " chain 0x%016" DAP_UINT64_FORMAT_x
                            " cell 0x%016" DAP_UINT64_FORMAT_x ", hash from %s, num from %" DAP_UINT64_FORMAT_U,
                            l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64,
                            dap_hash_fast_to_str_static(&l_request->hash_from), l_request->num_from);
        if (l_ch_chain->sync_context) {
            log_it(L_WARNING, "Can't process CHAIN_REQ request cause already busy with syncronization");
            dap_chain_ch_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64,
                    l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64,
                    DAP_CHAIN_CH_ERROR_SYNC_REQUEST_ALREADY_IN_PROCESS);
            return false;
        }
        dap_chain_t *l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id);
        if (!l_chain) {
            log_it(L_WARNING, "Not found chain id 0x%016" DAP_UINT64_FORMAT_x " with net id 0x%016" DAP_UINT64_FORMAT_x,
                                                        l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.net_id.uint64);
            dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64,
                    l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64,
                    DAP_CHAIN_CH_ERROR_CHAIN_NOT_FOUND);
            return false;
        }
        if (!dap_link_manager_get_net_condition(l_chain_pkt->hdr.net_id.uint64)) {
            log_it(L_WARNING, "Net id 0x%016" DAP_UINT64_FORMAT_x " is offline", l_chain_pkt->hdr.net_id.uint64);
            dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64,
                    l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64,
                    DAP_CHAIN_CH_ERROR_NET_IS_OFFLINE);
            return false;
        }
        bool l_sync_from_begin = dap_hash_fast_is_blank(&l_request->hash_from);
        dap_chain_atom_iter_t *l_iter = l_chain->callback_atom_iter_create(l_chain, l_chain_pkt->hdr.cell_id, l_sync_from_begin
                                                                           ? NULL : &l_request->hash_from);
        if (!l_iter) {
            dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64,
                    l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64,
                    DAP_CHAIN_CH_ERROR_OUT_OF_MEMORY);
            break;
        }
        if (l_sync_from_begin)
            l_chain->callback_atom_iter_get(l_iter, DAP_CHAIN_ITER_OP_FIRST, NULL);
        bool l_missed_hash = false;
        uint64_t l_last_num = l_chain->callback_count_atom(l_chain);
        if (l_iter->cur) {
            if (l_sync_from_begin ||
                    (l_request->num_from == l_iter->cur_num &&
                    l_last_num > l_iter->cur_num)) {
                dap_chain_ch_summary_t l_sum = { .num_cur = l_iter->cur_num, .num_last = l_last_num };
                dap_chain_ch_pkt_write_unsafe(a_ch, DAP_CHAIN_CH_PKT_TYPE_CHAIN_SUMMARY,
                                                l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64,
                                                l_chain_pkt->hdr.cell_id.uint64, &l_sum, sizeof(l_sum));
                debug_if(s_debug_more, L_DEBUG, "Out: CHAIN_SUMMARY %s for net %s to destination " NODE_ADDR_FP_STR,
                                        l_chain ? l_chain->name : "(null)",
                                                    l_chain ? l_chain->net_name : "(null)",
                                                                    NODE_ADDR_FP_ARGS_S(a_ch->stream->node));
                struct sync_context *l_context = DAP_NEW_Z(struct sync_context);
                l_context->addr = a_ch->stream->node;
                l_context->iter = l_iter;
                l_context->net_id = l_chain_pkt->hdr.net_id;
                l_context->chain_id = l_chain_pkt->hdr.chain_id;
                l_context->cell_id = l_chain_pkt->hdr.cell_id;
                l_context->num_last = l_sum.num_last;
                l_context->last_activity = dap_time_now();
                atomic_store_explicit(&l_context->state, SYNC_STATE_READY, memory_order_relaxed);
                atomic_store(&l_context->allowed_num, l_sum.num_cur + s_sync_ack_window_size);
                dap_proc_thread_callback_add(a_ch->stream_worker->worker->proc_queue_input, s_chain_iter_callback, l_context);
                l_ch_chain->sync_context = l_context;
                l_ch_chain->sync_timer = dap_timerfd_start_on_worker(a_ch->stream_worker->worker, 1000, s_sync_timer_callback, l_ch_chain);
                break;
            }
            if (l_request->num_from < l_iter->cur_num || l_last_num > l_iter->cur_num)
                l_missed_hash = true;
        } else if (!l_sync_from_begin && l_last_num >= l_request->num_from) {
            l_missed_hash = true;
            debug_if(s_debug_more, L_WARNING, "Requested atom with hash %s not found", dap_hash_fast_to_str_static(&l_request->hash_from));
        }
        if (l_missed_hash) {
            l_chain->callback_atom_iter_get(l_iter, DAP_CHAIN_ITER_OP_LAST, NULL);
            dap_chain_ch_miss_info_t l_miss_info = { .missed_hash = l_request->hash_from,
                                                     .last_hash = *l_iter->cur_hash,
                                                     .last_num = l_iter->cur_num };
            dap_chain_ch_pkt_write_unsafe(a_ch, DAP_CHAIN_CH_PKT_TYPE_CHAIN_MISS,
                                          l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64,
                                          l_chain_pkt->hdr.cell_id.uint64, &l_miss_info, sizeof(l_miss_info));
            if (s_debug_more) {
                char l_last_hash_str[DAP_HASH_FAST_STR_SIZE];
                dap_hash_fast_to_str(&l_miss_info.last_hash, l_last_hash_str, DAP_HASH_FAST_STR_SIZE);
                log_it(L_INFO, "Out: CHAIN_MISS %s for net %s to source " NODE_ADDR_FP_STR
                                             " with hash missed %s, hash last %s and num last %" DAP_UINT64_FORMAT_U,
                        l_chain ? l_chain->name : "(null)",
                                    l_chain ? l_chain->net_name : "(null)",
                                                    NODE_ADDR_FP_ARGS_S(a_ch->stream->node),
                        dap_hash_fast_to_str_static(&l_miss_info.missed_hash),
                        l_last_hash_str,
                        l_miss_info.last_num);
            }
        } else {
            dap_chain_ch_pkt_write_unsafe(a_ch, DAP_CHAIN_CH_PKT_TYPE_SYNCED_CHAIN,
                                          l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64,
                                          l_chain_pkt->hdr.cell_id.uint64, NULL, 0);
            debug_if(s_debug_more, L_DEBUG, "Out: SYNCED_CHAIN %s for net %s to destination " NODE_ADDR_FP_STR,
                                    l_chain ? l_chain->name : "(null)",
                                                l_chain ? l_chain->net_name : "(null)",
                                                                NODE_ADDR_FP_ARGS_S(a_ch->stream->node));
        }
        l_chain->callback_atom_iter_delete(l_iter);
    } break;

    case DAP_CHAIN_CH_PKT_TYPE_CHAIN_SUMMARY: {
        if (l_chain_pkt_data_size != sizeof(dap_chain_ch_summary_t)) {
            log_it(L_WARNING, "DAP_CHAIN_CH_PKT_TYPE_CHAIN_SUMMARY: Wrong chain packet size %zd when expected %zd",
                                                                            l_chain_pkt_data_size, sizeof(dap_chain_ch_summary_t));
            dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64,
                    l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64,
                    DAP_CHAIN_CH_ERROR_CHAIN_PKT_DATA_SIZE);
            return false;
        }
        dap_chain_t *l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id);
        dap_chain_ch_summary_t *l_sum = (dap_chain_ch_summary_t *)l_chain_pkt->data;
        debug_if(s_debug_more, L_DEBUG, "In: CHAIN_SUMMARY of %s for net %s from source " NODE_ADDR_FP_STR
                                            " with %" DAP_UINT64_FORMAT_U " atoms to sync from %" DAP_UINT64_FORMAT_U " to %" DAP_UINT64_FORMAT_U,
                                l_chain ? l_chain->name : "(null)",
                                            l_chain ? l_chain->net_name : "(null)",
                                                            NODE_ADDR_FP_ARGS_S(a_ch->stream->node),
                                l_sum->num_last - l_sum->num_cur, l_sum->num_cur, l_sum->num_last);
    } break;

    case DAP_CHAIN_CH_PKT_TYPE_CHAIN_ACK: {
        if (l_chain_pkt_data_size != sizeof(uint64_t)) {
            log_it(L_WARNING, "DAP_CHAIN_CH_PKT_TYPE_CHAIN_ACK: Wrong chain packet size %zd when expected %zd",
                                                                            l_chain_pkt_data_size, sizeof(uint64_t));
            dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64,
                    l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64,
                    DAP_CHAIN_CH_ERROR_CHAIN_PKT_DATA_SIZE);
            return false;
        }
        uint64_t l_ack_num = *(uint64_t *)l_chain_pkt->data;
        dap_chain_t *l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id);
        debug_if(s_debug_more, L_DEBUG, "In: CHAIN_ACK %s for net %s from source " NODE_ADDR_FP_STR " with num %" DAP_UINT64_FORMAT_U,
                                l_chain ? l_chain->name : "(null)",
                                            l_chain ? l_chain->net_name : "(null)",
                                                            NODE_ADDR_FP_ARGS_S(a_ch->stream->node),
                                l_ack_num);
        struct sync_context *l_context = l_ch_chain->sync_context;
        if (!l_context) {
            log_it(L_WARNING, "CHAIN_ACK: No active sync context");
            dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64,
                    l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64,
                    DAP_CHAIN_CH_ERROR_INCORRECT_SYNC_SEQUENCE);
            break;
        }
        if (l_context->num_last == l_ack_num) {
            dap_chain_ch_pkt_write_unsafe(a_ch, DAP_CHAIN_CH_PKT_TYPE_SYNCED_CHAIN,
                                                 l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64,
                                                 l_chain_pkt->hdr.cell_id.uint64, NULL, 0);
            s_ch_chain_go_idle(l_ch_chain);
            break;
        }
        l_context->last_activity = dap_time_now();
        if (atomic_load_explicit(&l_context->state, memory_order_relaxed) == SYNC_STATE_OVER)
            break;
        atomic_store_explicit(&l_context->allowed_num,
                              dap_min(l_ack_num + s_sync_ack_window_size, l_context->num_last),
                              memory_order_release);
        if (atomic_exchange(&l_context->state, SYNC_STATE_READY) == SYNC_STATE_IDLE)
            dap_proc_thread_callback_add(a_ch->stream_worker->worker->proc_queue_input, s_chain_iter_callback, l_context);
    } break;

    case DAP_CHAIN_CH_PKT_TYPE_SYNCED_CHAIN: {
        dap_chain_t *l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id);
        log_it(L_INFO, "In: SYNCED_CHAIN %s for net %s from source " NODE_ADDR_FP_STR,
                    l_chain ? l_chain->name : "(null)",
                                l_chain ? l_chain->net_name : "(null)",
                                                NODE_ADDR_FP_ARGS_S(a_ch->stream->node));
    } break;

    case DAP_CHAIN_CH_PKT_TYPE_CHAIN_MISS: {
        if (l_chain_pkt_data_size != sizeof(dap_chain_ch_miss_info_t)) {
            log_it(L_WARNING, "DAP_CHAIN_CH_PKT_TYPE_CHAIN_MISS: Wrong chain packet size %zd when expected %zd",
                                                                            l_chain_pkt_data_size, sizeof(dap_chain_ch_miss_info_t));
            dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64,
                    l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64,
                    DAP_CHAIN_CH_ERROR_CHAIN_PKT_DATA_SIZE);
            return false;
        }
        dap_chain_t *l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id);
        dap_chain_ch_miss_info_t *l_miss_info = (dap_chain_ch_miss_info_t *)l_chain_pkt->data;
        if (s_debug_more) {
            char l_last_hash_str[DAP_HASH_FAST_STR_SIZE];
            dap_hash_fast_to_str(&l_miss_info->last_hash, l_last_hash_str, DAP_HASH_FAST_STR_SIZE);
            log_it(L_INFO, "In: CHAIN_MISS %s for net %s from source " NODE_ADDR_FP_STR
                                         " with hash missed %s, hash last %s and num last %" DAP_UINT64_FORMAT_U,
                    l_chain ? l_chain->name : "(null)",
                                l_chain ? l_chain->net_name : "(null)",
                                                NODE_ADDR_FP_ARGS_S(a_ch->stream->node),
                    dap_hash_fast_to_str_static(&l_miss_info->missed_hash),
                    l_last_hash_str,
                    l_miss_info->last_num);
        }
        // Will be processed upper in net packet notifier callback
    } break;

    default:
        dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64,
                                            l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64,
                                            DAP_CHAIN_CH_ERROR_UNKNOWN_CHAIN_PKT_TYPE);
        return false;

//    }
//}

    /* *** Legacy *** */

        /// --- GDB update ---
        // Request for gdbs list update
        case DAP_CHAIN_CH_PKT_TYPE_UPDATE_GLOBAL_DB_REQ:{
            if(l_ch_chain->state != DAP_CHAIN_CH_STATE_IDLE){
                log_it(L_WARNING, "Can't process UPDATE_GLOBAL_DB_REQ request because its already busy with syncronization");
                dap_chain_ch_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64,
                        l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64,
                        DAP_CHAIN_CH_ERROR_SYNC_REQUEST_ALREADY_IN_PROCESS);
                break;
            }
            log_it(L_INFO, "In:  UPDATE_GLOBAL_DB_REQ pkt: net 0x%016"DAP_UINT64_FORMAT_x" chain 0x%016"DAP_UINT64_FORMAT_x" cell 0x%016"DAP_UINT64_FORMAT_x,
                            l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64);
            if (l_chain_pkt_data_size == (size_t)sizeof(dap_chain_ch_sync_request_old_t))
                l_ch_chain->request = *(dap_chain_ch_sync_request_old_t*)l_chain_pkt->data;
            struct sync_request *l_sync_request = dap_chain_ch_create_sync_request(l_chain_pkt, a_ch);
            l_ch_chain->stats_request_gdb_processed = 0;
            l_ch_chain->request_hdr = l_chain_pkt->hdr;
            dap_proc_thread_callback_add(a_ch->stream_worker->worker->proc_queue_input, s_sync_update_gdb_proc_callback, l_sync_request);
        } break;

        // Response with metadata organized in TSD
        case DAP_CHAIN_CH_PKT_TYPE_UPDATE_GLOBAL_DB_TSD: {
            if (s_debug_more)
                log_it(L_DEBUG, "Global DB TSD packet detected");
        } break;

        // If requested - begin to recieve record's hashes
        case DAP_CHAIN_CH_PKT_TYPE_UPDATE_GLOBAL_DB_START:{
            if (s_debug_more)
                log_it(L_INFO, "In:  UPDATE_GLOBAL_DB_START pkt net 0x%016"DAP_UINT64_FORMAT_x" chain 0x%016"DAP_UINT64_FORMAT_x" cell 0x%016"DAP_UINT64_FORMAT_x,
                                l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64);
            if (l_ch_chain->state != DAP_CHAIN_CH_STATE_IDLE){
                log_it(L_WARNING, "Can't process UPDATE_GLOBAL_DB_START request because its already busy with syncronization");
                dap_chain_ch_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64,
                        l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64,
                        DAP_CHAIN_CH_ERROR_SYNC_REQUEST_ALREADY_IN_PROCESS);
                break;
            }
            l_ch_chain->request_hdr = l_chain_pkt->hdr;
            l_ch_chain->state = DAP_CHAIN_CH_STATE_UPDATE_GLOBAL_DB_REMOTE;
        } break;
        // Response with gdb element hashes and sizes
        case DAP_CHAIN_CH_PKT_TYPE_UPDATE_GLOBAL_DB:{
            if(s_debug_more)
                log_it(L_INFO, "In: UPDATE_GLOBAL_DB pkt data_size=%zu", l_chain_pkt_data_size);
            if (l_ch_chain->state != DAP_CHAIN_CH_STATE_UPDATE_GLOBAL_DB_REMOTE ||
                    memcmp(&l_ch_chain->request_hdr.net_id, &l_chain_pkt->hdr.net_id,
                           sizeof(dap_chain_net_id_t) + sizeof(dap_chain_id_t) + sizeof(dap_chain_cell_id_t))) {
                log_it(L_WARNING, "Can't process UPDATE_GLOBAL_DB request because its already busy with syncronization");
                dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64,
                        l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64,
                        DAP_CHAIN_CH_ERROR_SYNC_REQUEST_ALREADY_IN_PROCESS);
                break;
            }
            for ( dap_chain_ch_update_element_t * l_element =(dap_chain_ch_update_element_t *) l_chain_pkt->data;
                   (size_t) (((byte_t*)l_element) - l_chain_pkt->data ) < l_chain_pkt_data_size;
                  l_element++){
                dap_chain_ch_hash_item_t * l_hash_item = NULL;
                unsigned l_hash_item_hashv;
                HASH_VALUE(&l_element->hash, sizeof(l_element->hash), l_hash_item_hashv);
                HASH_FIND_BYHASHVALUE(hh, l_ch_chain->remote_gdbs, &l_element->hash, sizeof(l_element->hash),
                                      l_hash_item_hashv, l_hash_item);
                if (!l_hash_item) {
                    l_hash_item = DAP_NEW_Z(dap_chain_ch_hash_item_t);
                    if (!l_hash_item) {
                        log_it(L_CRITICAL, "Memory allocation error");
                        break;
                    }
                    l_hash_item->hash = l_element->hash;
                    l_hash_item->size = l_element->size;
                    HASH_ADD_BYHASHVALUE(hh, l_ch_chain->remote_gdbs, hash, sizeof(l_hash_item->hash),
                                         l_hash_item_hashv, l_hash_item);
                    /*if (s_debug_more){
                        char l_hash_str[72]={ [0]='\0'};
                        dap_chain_hash_fast_to_str(&l_hash_item->hash,l_hash_str,sizeof (l_hash_str));
                        log_it(L_DEBUG,"In: Updated remote hash gdb list with %s ", l_hash_str);
                    }*/
                }
            }
        } break;
        // End of response with starting of DB sync
        case DAP_CHAIN_CH_PKT_TYPE_UPDATE_GLOBAL_DB_END: {
            if(l_chain_pkt_data_size == sizeof(dap_chain_ch_sync_request_old_t)) {
                if (l_ch_chain->state != DAP_CHAIN_CH_STATE_UPDATE_GLOBAL_DB_REMOTE ||
                        memcmp(&l_ch_chain->request_hdr.net_id, &l_chain_pkt->hdr.net_id,
                               sizeof(dap_chain_net_id_t) + sizeof(dap_chain_id_t) + sizeof(dap_chain_cell_id_t))) {
                    log_it(L_WARNING, "Can't process UPDATE_GLOBAL_DB_END request because its already busy with syncronization");
                    dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64,
                            l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64,
                            DAP_CHAIN_CH_ERROR_SYNC_REQUEST_ALREADY_IN_PROCESS);
                    break;
                }
                debug_if(s_debug_more, L_INFO, "In: UPDATE_GLOBAL_DB_END pkt with total count %d hashes",
                                        HASH_COUNT(l_ch_chain->remote_gdbs));
                if (l_chain_pkt_data_size == sizeof(dap_chain_ch_sync_request_old_t))
                    l_ch_chain->request = *(dap_chain_ch_sync_request_old_t*)l_chain_pkt->data;
                struct sync_request *l_sync_request = dap_chain_ch_create_sync_request(l_chain_pkt, a_ch);
                dap_proc_thread_callback_add(a_ch->stream_worker->worker->proc_queue_input, s_sync_out_gdb_proc_callback, l_sync_request);
            } else {
                log_it(L_WARNING, "DAP_CHAIN_CH_PKT_TYPE_UPDATE_GLOBAL_DB_END: Wrong chain packet size %zd when expected %zd", l_chain_pkt_data_size, sizeof(l_ch_chain->request));
                dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64,
                        l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64,
                        DAP_CHAIN_CH_ERROR_CHAIN_PKT_DATA_SIZE);
            }
        } break;
        // first packet of data with source node address
        case DAP_CHAIN_CH_PKT_TYPE_FIRST_GLOBAL_DB: {
            if(l_chain_pkt_data_size == (size_t)sizeof(dap_chain_node_addr_t)){
               l_ch_chain->request.node_addr = *(dap_chain_node_addr_t*)l_chain_pkt->data;
               l_ch_chain->stats_request_gdb_processed = 0;
               log_it(L_INFO, "In: FIRST_GLOBAL_DB data_size=%zu net 0x%016"DAP_UINT64_FORMAT_x" chain 0x%016"DAP_UINT64_FORMAT_x" cell 0x%016"DAP_UINT64_FORMAT_x
                              " from address "NODE_ADDR_FP_STR, l_chain_pkt_data_size,   l_chain_pkt->hdr.net_id.uint64 ,
                              l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, NODE_ADDR_FP_ARGS_S(l_ch_chain->request.node_addr) );
            }else {
               log_it(L_WARNING,"Incorrect data size %zu in packet DAP_CHAIN_CH_PKT_TYPE_FIRST_GLOBAL_DB", l_chain_pkt_data_size);
               dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64,
                       l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64,
                       DAP_CHAIN_CH_ERROR_CHAIN_PKT_DATA_SIZE);
            }
        } break;

        case DAP_CHAIN_CH_PKT_TYPE_GLOBAL_DB: {
            if(s_debug_more)
                log_it(L_INFO, "In: GLOBAL_DB data_size=%zu", l_chain_pkt_data_size);
            // get transaction and save it to global_db
            if(l_chain_pkt_data_size > 0) {
                struct sync_request *l_sync_request = dap_chain_ch_create_sync_request(l_chain_pkt, a_ch);
                dap_chain_pkt_item_t *l_pkt_item = &l_sync_request->pkt;
                l_pkt_item->pkt_data = DAP_DUP_SIZE(l_chain_pkt->data, l_chain_pkt_data_size);
                l_pkt_item->pkt_data_size = l_chain_pkt_data_size;
                dap_proc_thread_callback_add(a_ch->stream_worker->worker->proc_queue_input, s_gdb_in_pkt_proc_callback, l_sync_request);
            } else {
                log_it(L_WARNING, "Packet with GLOBAL_DB atom has zero body size");
                dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64,
                        l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64,
                        DAP_CHAIN_CH_ERROR_CHAIN_PKT_DATA_SIZE);
            }
        }  break;

        case DAP_CHAIN_CH_PKT_TYPE_SYNCED_GLOBAL_DB: {
                log_it(L_INFO, "In:  SYNCED_GLOBAL_DB: net 0x%016"DAP_UINT64_FORMAT_x" chain 0x%016"DAP_UINT64_FORMAT_x" cell 0x%016"DAP_UINT64_FORMAT_x,
                                l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64);
                // we haven't node client waitng, so reply to other side
                dap_chain_ch_sync_request_old_t l_sync_gdb = {};
                l_sync_gdb.node_addr.uint64 = g_node_addr.uint64;
                dap_chain_ch_pkt_write_unsafe(a_ch, DAP_CHAIN_CH_PKT_TYPE_UPDATE_GLOBAL_DB_REQ, l_chain_pkt->hdr.net_id.uint64,
                                              l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, &l_sync_gdb, sizeof(l_sync_gdb));
        } break;

        /// --- Chains update ---
        // Request for atoms list update
        case DAP_CHAIN_CH_PKT_TYPE_UPDATE_CHAINS_REQ:{
            if (l_ch_chain->state != DAP_CHAIN_CH_STATE_IDLE) {
                log_it(L_WARNING, "Can't process UPDATE_CHAINS_REQ request because its already busy with syncronization");
                dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64,
                        l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64,
                        DAP_CHAIN_CH_ERROR_SYNC_REQUEST_ALREADY_IN_PROCESS);
                break;
            }
            if(s_debug_more)
                log_it(L_INFO, "In: UPDATE_CHAINS_REQ pkt: net 0x%016"DAP_UINT64_FORMAT_x" chain 0x%016"DAP_UINT64_FORMAT_x" cell 0x%016"DAP_UINT64_FORMAT_x,
                                l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64);
            dap_chain_t * l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id);
            if (l_chain) {
                l_ch_chain->state = DAP_CHAIN_CH_STATE_UPDATE_CHAINS;
                if(s_debug_more)
                    log_it(L_INFO, "Out: UPDATE_CHAINS_START pkt: net %s chain %s cell 0x%016"DAP_UINT64_FORMAT_X, l_chain->name,
                                        l_chain->net_name, l_chain_pkt->hdr.cell_id.uint64);
                l_ch_chain->request_atom_iter = l_chain->callback_atom_iter_create(l_chain, l_chain_pkt->hdr.cell_id, NULL);
                l_chain->callback_atom_iter_get(l_ch_chain->request_atom_iter, DAP_CHAIN_ITER_OP_FIRST, NULL);
                l_ch_chain->request_hdr = l_chain_pkt->hdr;
                dap_chain_ch_pkt_write_unsafe(a_ch, DAP_CHAIN_CH_PKT_TYPE_UPDATE_CHAINS_START,
                                                     l_chain_pkt->hdr.net_id.uint64,l_chain_pkt->hdr.chain_id.uint64,
                                                     l_chain_pkt->hdr.cell_id.uint64, NULL, 0);
            }
        } break;
        // Response with metadata organized in TSD
        case DAP_CHAIN_CH_PKT_TYPE_UPDATE_CHAINS_TSD :{
            if (s_debug_more)
                log_it(L_DEBUG, "Chain TSD packet detected");
        } break;

        // If requested - begin to send atom hashes
        case DAP_CHAIN_CH_PKT_TYPE_UPDATE_CHAINS_START:{
            if (l_ch_chain->state != DAP_CHAIN_CH_STATE_IDLE) {
                log_it(L_WARNING, "Can't process UPDATE_CHAINS_START request because its already busy with syncronization");
                dap_chain_ch_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64,
                        l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64,
                        DAP_CHAIN_CH_ERROR_SYNC_REQUEST_ALREADY_IN_PROCESS);
                break;
            }
            dap_chain_t * l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id);
            if (!l_chain) {
                log_it(L_ERROR, "Invalid UPDATE_CHAINS_START request from %s with net id 0x%016"DAP_UINT64_FORMAT_x
                                " chain id 0x%016"DAP_UINT64_FORMAT_x" cell_id 0x%016"DAP_UINT64_FORMAT_x" in packet",
                                a_ch->stream->esocket->remote_addr_str, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64,
                                l_chain_pkt->hdr.cell_id.uint64);
                dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64,
                                                    l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64,
                                                    DAP_CHAIN_CH_ERROR_NET_INVALID_ID);
                // Who are you? I don't know you! go away!
                a_ch->stream->esocket->flags |= DAP_SOCK_SIGNAL_CLOSE;
                break;
            }
            l_ch_chain->state = DAP_CHAIN_CH_STATE_UPDATE_CHAINS_REMOTE;
            l_ch_chain->request_hdr = l_chain_pkt->hdr;
            debug_if(s_debug_more, L_INFO, "In: UPDATE_CHAINS_START pkt");
        } break;

        // Response with atom hashes and sizes
        case DAP_CHAIN_CH_PKT_TYPE_UPDATE_CHAINS: {
            unsigned int l_count_added=0;
            unsigned int l_count_total=0;

            dap_chain_t * l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id);
            if (! l_chain){
                log_it(L_ERROR, "Invalid UPDATE_CHAINS packet from %s with net id 0x%016"DAP_UINT64_FORMAT_x
                                " chain id 0x%016"DAP_UINT64_FORMAT_x" cell_id 0x%016"DAP_UINT64_FORMAT_x" in packet",
                                a_ch->stream->esocket->remote_addr_str,
                                l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64,
                                l_chain_pkt->hdr.cell_id.uint64);
                dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64,
                                                    l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64,
                                                    DAP_CHAIN_CH_ERROR_NET_INVALID_ID);
                // Who are you? I don't know you! go away!
                a_ch->stream->esocket->flags |= DAP_SOCK_SIGNAL_CLOSE;
                break;
            }
            for ( dap_chain_ch_update_element_t * l_element =(dap_chain_ch_update_element_t *) l_chain_pkt->data;
                   (size_t) (((byte_t*)l_element) - l_chain_pkt->data ) < l_chain_pkt_data_size;
                  l_element++){
                dap_chain_ch_hash_item_t * l_hash_item = NULL;
                unsigned l_hash_item_hashv;
                HASH_VALUE(&l_element->hash, sizeof(dap_hash_fast_t), l_hash_item_hashv);
                HASH_FIND_BYHASHVALUE(hh, l_ch_chain->remote_atoms, &l_element->hash, sizeof(dap_hash_fast_t),
                                      l_hash_item_hashv, l_hash_item);
                if( ! l_hash_item ){
                    l_hash_item = DAP_NEW_Z(dap_chain_ch_hash_item_t);
                    if (!l_hash_item) {
                        log_it(L_CRITICAL, "Memory allocation error");
                        break;
                    }
                    l_hash_item->hash = l_element->hash;
                    l_hash_item->size = l_element->size;
                    HASH_ADD_BYHASHVALUE(hh, l_ch_chain->remote_atoms, hash, sizeof(dap_hash_fast_t),
                                         l_hash_item_hashv, l_hash_item);
                    l_count_added++;
                    /*
                    if (s_debug_more){
                        char l_hash_str[72]={ [0]='\0'};
                        dap_chain_hash_fast_to_str(&l_hash_item->hash,l_hash_str,sizeof (l_hash_str));
                        log_it(L_DEBUG,"In: Updated remote atom hash list with %s ", l_hash_str);
                    }*/
                }
                l_count_total++;
            }
            if (s_debug_more)
                log_it(L_INFO,"In: Added %u from %u remote atom hash  in list",l_count_added,l_count_total);
        } break;

        case DAP_CHAIN_CH_PKT_TYPE_UPDATE_CHAINS_END: {
            if(l_chain_pkt_data_size == sizeof(dap_chain_ch_sync_request_old_t)) {
                if (l_ch_chain->state != DAP_CHAIN_CH_STATE_UPDATE_CHAINS_REMOTE ||
                        memcmp(&l_ch_chain->request_hdr.net_id, &l_chain_pkt->hdr.net_id,
                               sizeof(dap_chain_net_id_t) + sizeof(dap_chain_id_t) + sizeof(dap_chain_cell_id_t))) {
                    log_it(L_WARNING, "Can't process UPDATE_CHAINS_END request because its already busy with syncronization");
                    dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64,
                            l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64,
                            DAP_CHAIN_CH_ERROR_SYNC_REQUEST_ALREADY_IN_PROCESS);
                    break;
                }
                dap_chain_t * l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id);
                if (!l_chain) {
                    log_it(L_ERROR, "Invalid UPDATE_CHAINS packet from %s with net id 0x%016"DAP_UINT64_FORMAT_x
                                    " chain id 0x%016"DAP_UINT64_FORMAT_x" cell_id 0x%016"DAP_UINT64_FORMAT_x" in packet",
                                    a_ch->stream->esocket->remote_addr_str, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64,
                                    l_chain_pkt->hdr.cell_id.uint64);
                    dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64,
                                                        l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64,
                                                        DAP_CHAIN_CH_ERROR_NET_INVALID_ID);
                    break;
                }
                debug_if(s_debug_more, L_INFO, "In: UPDATE_CHAINS_END pkt with total count %d hashes",
                               HASH_COUNT(l_ch_chain->remote_atoms));
                struct sync_request *l_sync_request = dap_chain_ch_create_sync_request(l_chain_pkt, a_ch);
                l_ch_chain->stats_request_atoms_processed = 0;
                l_ch_chain->request_hdr = l_chain_pkt->hdr;
                dap_proc_thread_callback_add(a_ch->stream_worker->worker->proc_queue_input, s_sync_out_chains_proc_callback, l_sync_request);
            } else {
                log_it(L_WARNING, "DAP_CHAIN_CH_PKT_TYPE_UPDATE_CHAINS_END: Wrong chain packet size %zd when expected %zd",
                       l_chain_pkt_data_size, sizeof(l_ch_chain->request));
                dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64,
                        l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64,
                        DAP_CHAIN_CH_ERROR_CHAIN_PKT_DATA_SIZE);
            }
        } break;
        // first packet of data with source node address
        case DAP_CHAIN_CH_PKT_TYPE_FIRST_CHAIN: {
            if(l_chain_pkt_data_size == (size_t)sizeof(dap_chain_node_addr_t)){
                l_ch_chain->request_hdr = l_chain_pkt->hdr;
                l_ch_chain->request.node_addr = *(dap_chain_node_addr_t*)l_chain_pkt->data;
                log_it(L_INFO, "From "NODE_ADDR_FP_STR": FIRST_CHAIN data_size=%zu net 0x%016"DAP_UINT64_FORMAT_x" chain 0x%016"DAP_UINT64_FORMAT_x" cell 0x%016"DAP_UINT64_FORMAT_x,
                               NODE_ADDR_FP_ARGS_S(l_ch_chain->request.node_addr),
                               l_chain_pkt_data_size, l_ch_chain->request_hdr.net_id.uint64 ,
                               l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64);
            }else{
                log_it(L_WARNING,"Incorrect data size %zd in packet DAP_CHAIN_CH_PKT_TYPE_FIRST_CHAIN", l_chain_pkt_data_size);
                dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64,
                        l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64,
                        DAP_CHAIN_CH_ERROR_CHAIN_PKT_DATA_SIZE);
            }
        } break;

        case DAP_CHAIN_CH_PKT_TYPE_SYNCED_CHAINS: {
            if (dap_log_level_get() <= L_INFO) {
                dap_chain_hash_fast_t l_hash_from = l_ch_chain->request.hash_from;
                char l_hash_from_str[DAP_CHAIN_HASH_FAST_STR_SIZE] = { '\0' }, l_hash_to_str[DAP_CHAIN_HASH_FAST_STR_SIZE] = { '\0' };
                dap_chain_hash_fast_to_str(&l_hash_from, l_hash_from_str, DAP_CHAIN_HASH_FAST_STR_SIZE);
                dap_chain_hash_fast_to_str(&c_dap_chain_addr_blank.data.hash_fast, l_hash_to_str, DAP_CHAIN_HASH_FAST_STR_SIZE);
                log_it(L_INFO, "In:  SYNCED_CHAINS: between %s and %s",l_hash_from_str[0] ? l_hash_from_str : "(null)",
                       l_hash_to_str[0] ? l_hash_to_str: "(null)");

            }
            s_ch_chain_get_idle(l_ch_chain);
            if (l_ch_chain->activity_timer) {
                dap_timerfd_delete_unsafe(l_ch_chain->activity_timer);
                l_ch_chain->activity_timer = NULL;
            }
            // we haven't node client waitng, so reply to other side
            dap_chain_t *l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id);
            if (!l_chain) {
                log_it(L_ERROR, "Invalid SYNCED_CHAINS packet from %s with net id 0x%016"DAP_UINT64_FORMAT_x
                                " chain id 0x%016"DAP_UINT64_FORMAT_x" cell_id 0x%016"DAP_UINT64_FORMAT_x" in packet",
                                a_ch->stream->esocket->remote_addr_str, l_chain_pkt->hdr.net_id.uint64,
                                l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64);
                dap_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64,
                                                    l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64,
                                                    DAP_CHAIN_CH_ERROR_NET_INVALID_ID);
                break;
            }
            if (s_debug_more) {
                log_it(L_INFO, "Out: UPDATE_CHAINS_REQ pkt");
            }
            dap_chain_ch_sync_request_old_t l_request= {};
            dap_chain_ch_pkt_write_unsafe(a_ch, DAP_CHAIN_CH_PKT_TYPE_UPDATE_CHAINS_REQ, l_chain_pkt->hdr.net_id.uint64,
                                          l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, &l_request, sizeof(l_request));
        } break;
    }
    return true;
}

static bool s_sync_timer_callback(void *a_arg)
{
    dap_chain_ch_t *l_ch_chain = a_arg;
    struct sync_context *l_context = l_ch_chain->sync_context;
    if (l_context->last_activity + s_sync_timeout <= dap_time_now()) {
        log_it(L_ERROR, "Sync timeout for node " NODE_ADDR_FP_STR " with net 0x%016" DAP_UINT64_FORMAT_x
                            " chain 0x%016" DAP_UINT64_FORMAT_x " cell 0x%016" DAP_UINT64_FORMAT_x,
                                        NODE_ADDR_FP_ARGS_S(l_context->addr), l_context->net_id.uint64,
                                        l_context->chain_id.uint64, l_context->cell_id.uint64);
        dap_stream_ch_write_error_unsafe(DAP_STREAM_CH(l_ch_chain), l_context->net_id.uint64,
                                         l_context->chain_id.uint64, l_context->cell_id.uint64,
                                         DAP_CHAIN_CH_ERROR_SYNC_TIMEOUT);
        l_ch_chain->sync_timer = NULL;
        s_ch_chain_go_idle(l_ch_chain);
        return false;
    }
    return true;
}

static bool s_chain_iter_callback(void *a_arg)
{
    assert(a_arg);
    struct sync_context *l_context = a_arg;
    dap_chain_atom_iter_t *l_iter = l_context->iter;
    assert(l_iter);
    dap_chain_t *l_chain = l_iter->chain;
    if (atomic_exchange(&l_context->state, SYNC_STATE_BUSY) == SYNC_STATE_OVER) {
        atomic_store(&l_context->state, SYNC_STATE_OVER);
        return false;
    }
    size_t l_atom_size = l_iter->cur_size;
    dap_chain_atom_ptr_t l_atom = l_iter->cur;
    uint32_t l_cycles_count = 0;
    while (l_atom && l_atom_size) {
        if (l_iter->cur_num > atomic_load_explicit(&l_context->allowed_num, memory_order_acquire))
            break;
        dap_chain_ch_pkt_t *l_pkt = dap_chain_ch_pkt_new(l_context->net_id.uint64, l_context->chain_id.uint64, l_context->cell_id.uint64,
                                                         l_atom, l_atom_size);
        // For master format binary complience
        l_pkt->hdr.num_lo = l_iter->cur_num & 0xFFFF;
        l_pkt->hdr.num_hi = (l_iter->cur_num >> 16) & 0xFF;
        dap_stream_ch_pkt_send_by_addr(&l_context->addr, DAP_CHAIN_CH_ID, DAP_CHAIN_CH_PKT_TYPE_CHAIN, l_pkt, dap_chain_ch_pkt_get_size(l_pkt));
        DAP_DELETE(l_pkt);
        debug_if(s_debug_more, L_DEBUG, "Out: CHAIN %s for net %s to destination " NODE_ADDR_FP_STR " with num %" DAP_UINT64_FORMAT_U
                                            " hash %s and size %zu",
                                l_chain ? l_chain->name : "(null)",
                                            l_chain ? l_chain->net_name : "(null)",
                                                            NODE_ADDR_FP_ARGS_S(l_context->addr),
                                l_iter->cur_num, dap_hash_fast_to_str_static(l_iter->cur_hash), l_iter->cur_size);
        l_atom = l_chain->callback_atom_iter_get(l_iter, DAP_CHAIN_ITER_OP_NEXT, &l_atom_size);
        if (!l_atom || !l_atom_size || l_iter->cur_num > l_context->num_last)
            break;
        if (atomic_exchange(&l_context->state, SYNC_STATE_BUSY) == SYNC_STATE_OVER) {
            atomic_store(&l_context->state, SYNC_STATE_OVER);
            return false;
        }
        if (++l_cycles_count >= s_sync_packets_per_thread_call)
            return true;
    }
    uint16_t l_state = l_atom && l_atom_size && l_iter->cur_num <= l_context->num_last
                ? SYNC_STATE_IDLE : SYNC_STATE_OVER;
    uint16_t l_prev_state = atomic_exchange(&l_context->state, l_state);
    if (l_prev_state == SYNC_STATE_OVER && l_state != SYNC_STATE_OVER)
        atomic_store(&l_context->state, SYNC_STATE_OVER);
    if (l_prev_state == SYNC_STATE_READY)   // Allowed num was changed since last state updating
        return true;
    return false;
}

static bool s_chain_iter_delete_callback(void *a_arg)
{
    struct sync_context *l_context = a_arg;
    assert(l_context->iter);
    l_context->iter->chain->callback_atom_iter_delete(l_context->iter);
    DAP_DELETE(l_context);
    return false;
}

/**
 * @brief s_ch_chain_go_idle
 * @param a_ch_chain
 */
static void s_ch_chain_go_idle(dap_chain_ch_t *a_ch_chain)
{
    // New protocol
    if (a_ch_chain->sync_context) {
        atomic_store(&((struct sync_context *)a_ch_chain->sync_context)->state, SYNC_STATE_OVER);
        dap_proc_thread_callback_add(DAP_STREAM_CH(a_ch_chain)->stream_worker->worker->proc_queue_input,
                                     s_chain_iter_delete_callback, a_ch_chain->sync_context);
        a_ch_chain->sync_context = NULL;
    }
    if (a_ch_chain->sync_timer) {
        dap_timerfd_delete_unsafe(a_ch_chain->sync_timer);
        a_ch_chain->sync_timer = NULL;
    }
//}
    // Legacy
    if (a_ch_chain->state == DAP_CHAIN_CH_STATE_IDLE) {
        return;
    }
    a_ch_chain->state = DAP_CHAIN_CH_STATE_IDLE;

    if(s_debug_more)
        log_it(L_INFO, "Go in DAP_CHAIN_CH_STATE_IDLE");

    // Cleanup after request
    memset(&a_ch_chain->request, 0, sizeof(a_ch_chain->request));
    memset(&a_ch_chain->request_hdr, 0, sizeof(a_ch_chain->request_hdr));
    if (a_ch_chain->request_atom_iter && a_ch_chain->request_atom_iter->chain &&
            a_ch_chain->request_atom_iter->chain->callback_atom_iter_delete) {
        a_ch_chain->request_atom_iter->chain->callback_atom_iter_delete(a_ch_chain->request_atom_iter);
        a_ch_chain->request_atom_iter = NULL;
    }

    dap_chain_ch_hash_item_t *l_hash_item = NULL, *l_tmp = NULL;

    HASH_ITER(hh, a_ch_chain->remote_atoms, l_hash_item, l_tmp) {
        // Clang bug at this, l_hash_item should change at every loop cycle
        HASH_DEL(a_ch_chain->remote_atoms, l_hash_item);
        DAP_DELETE(l_hash_item);
    }
    a_ch_chain->remote_atoms = NULL;
    a_ch_chain->sent_breaks = 0;
}

struct chain_io_complete {
    dap_stream_ch_uuid_t ch_uuid;
    dap_chain_ch_state_t state;
    uint8_t type;
    uint64_t net_id;
    uint64_t chain_id;
    uint64_t cell_id;
    size_t data_size;
    byte_t data[];
};

static void s_stream_ch_io_complete(dap_events_socket_t *a_es, void *a_arg)
{
    dap_stream_t *l_stream = NULL;
    if (!a_es->server) {
        dap_client_t *l_client = DAP_ESOCKET_CLIENT(a_es);
        assert(l_client);
        dap_client_pvt_t *l_client_pvt = DAP_CLIENT_PVT(l_client);
        l_stream = l_client_pvt->stream;
    } else {
        dap_http_client_t *l_http_client = DAP_HTTP_CLIENT(a_es);
        if (l_http_client)
            l_stream = DAP_STREAM(l_http_client);
    }
    if (!l_stream)
        return;
    dap_stream_ch_t *l_ch = NULL;
    for (size_t i = 0; i < l_stream->channel_count; i++)
        if (l_stream->channel[i]->proc->id == DAP_CHAIN_CH_ID)
            l_ch = l_stream->channel[i];
    if (!l_ch || !DAP_CHAIN_CH(l_ch))
        return;
    if (a_arg) {
        struct chain_io_complete *l_arg = (struct chain_io_complete *)a_arg;
        if (DAP_CHAIN_CH(l_ch)->state == DAP_CHAIN_CH_STATE_WAITING)
            DAP_CHAIN_CH(l_ch)->state = l_arg->state;
        dap_chain_ch_pkt_write_unsafe(l_ch, l_arg->type, l_arg->net_id, l_arg->chain_id,
                                             l_arg->cell_id, l_arg->data, l_arg->data_size);
        a_es->callbacks.arg = NULL;
        DAP_DELETE(a_arg);
        return;
    }
    s_stream_ch_packet_out(l_ch, NULL);
}

static void s_stream_ch_chain_pkt_write(dap_stream_ch_t *a_ch, uint8_t a_type, uint64_t a_net_id,
                                        uint64_t a_chain_id, uint64_t a_cell_id,
                                        const void * a_data, size_t a_data_size)
{
    size_t l_free_buf_size = dap_events_socket_get_free_buf_size(a_ch->stream->esocket) -
                                sizeof(dap_chain_ch_pkt_t) - sizeof(dap_stream_ch_pkt_t) -
                                sizeof(dap_stream_pkt_t) - DAP_STREAM_PKT_ENCRYPTION_OVERHEAD;
    if (l_free_buf_size < a_data_size) {
        struct chain_io_complete *l_arg = DAP_NEW_Z_SIZE(struct chain_io_complete, sizeof(struct chain_io_complete) + a_data_size);
        l_arg->ch_uuid = a_ch->uuid;
        l_arg->state = DAP_CHAIN_CH(a_ch)->state;
        DAP_CHAIN_CH(a_ch)->state = DAP_CHAIN_CH_STATE_WAITING;
        l_arg->type = a_type;
        l_arg->net_id = a_net_id;
        l_arg->chain_id = a_chain_id;
        l_arg->cell_id = a_cell_id;
        l_arg->data_size = a_data_size;
        memcpy(l_arg->data, a_data, a_data_size);
        a_ch->stream->esocket->callbacks.arg = l_arg;
    }
    else
       dap_chain_ch_pkt_write_unsafe(a_ch, a_type, a_net_id, a_chain_id, a_cell_id, a_data, a_data_size);
}

/**
 * @brief s_stream_ch_packet_out
 * @param ch
 * @param arg
 */
static bool s_stream_ch_packet_out(dap_stream_ch_t *a_ch, void *a_arg)
{
    dap_chain_ch_t *l_ch_chain = DAP_CHAIN_CH(a_ch);
    if (!l_ch_chain) {
        log_it(L_CRITICAL, "Channel without chain, dump it");
        s_ch_chain_go_idle(l_ch_chain);
        return false;
    }
    bool l_go_idle = false, l_was_sent_smth = false;
    switch (l_ch_chain->state) {
        // Update list of global DB records to remote
    case DAP_CHAIN_CH_STATE_UPDATE_GLOBAL_DB: {
#if 0
        size_t i, q =
                // s_update_pack_size;
                0;
        //dap_db_log_list_obj_t **l_objs = dap_db_log_list_get_multiple(l_ch_chain->request_db_log, DAP_STREAM_PKT_SIZE_MAX, &q);
        dap_chain_ch_update_element_t *l_data = DAP_NEW_Z_SIZE(dap_chain_ch_update_element_t, q * sizeof(dap_chain_ch_update_element_t));
        for (i = 0; i < q; ++i) {
            l_data[i].hash = l_objs[i]->hash;
            l_data[i].size = l_objs[i]->pkt->data_size;
            DAP_DELETE(l_objs[i]->pkt);
            DAP_DELETE(l_objs[i]);
        }
        if (i) {
            l_was_sent_smth = true;
            s_stream_ch_chain_pkt_write(a_ch, DAP_CHAIN_CH_PKT_TYPE_UPDATE_GLOBAL_DB,
                                        l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64,
                                        l_ch_chain->request_hdr.cell_id.uint64,
                                        l_data, i * sizeof(dap_chain_ch_update_element_t));
            l_ch_chain->stats_request_gdb_processed += i;
            DAP_DELETE(l_data);
            DAP_DELETE(l_objs);
            debug_if(s_debug_more, L_INFO, "Out: DAP_CHAIN_CH_PKT_TYPE_UPDATE_GLOBAL_DB, %zu records", i);
        } else if (!l_objs) {
            l_was_sent_smth = true;
            l_ch_chain->request.node_addr.uint64 = dap_chain_net_get_cur_addr_int(dap_chain_net_by_id(
                                                                                      l_ch_chain->request_hdr.net_id));
            s_stream_ch_chain_pkt_write(a_ch, DAP_CHAIN_CH_PKT_TYPE_UPDATE_GLOBAL_DB_END,
                                                 l_ch_chain->request_hdr.net_id.uint64,
                                                 l_ch_chain->request_hdr.chain_id.uint64,
                                                 l_ch_chain->request_hdr.cell_id.uint64,
                                                 &l_ch_chain->request, sizeof(dap_chain_ch_sync_request_old_t));
            debug_if(s_debug_more, L_INFO, "Out: DAP_CHAIN_CH_PKT_TYPE_UPDATE_GLOBAL_DB_END");
            l_go_idle = true;
        }
            dap_chain_ch_update_element_t l_data[s_update_pack_size];
            uint_fast16_t i;
            dap_db_log_list_obj_t *l_obj = NULL;
            for (i = 0; i < s_update_pack_size; i++) {
                l_obj = dap_db_log_list_get(l_ch_chain->request_db_log);
                if (!l_obj || DAP_POINTER_TO_SIZE(l_obj) == 1)
                    break;
                l_data[i].hash = l_obj->hash;
                l_data[i].size = l_obj->pkt->data_size;
                DAP_DELETE(l_obj->pkt);
                DAP_DELETE(l_obj);
            }
            if (i) {
                l_was_sent_smth = true;
                s_stream_ch_chain_pkt_write(a_ch, DAP_CHAIN_CH_PKT_TYPE_UPDATE_GLOBAL_DB,
                                            l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64,
                                            l_ch_chain->request_hdr.cell_id.uint64,
                                            l_data, i * sizeof(dap_chain_ch_update_element_t));
                l_ch_chain->stats_request_gdb_processed += i;
                if (s_debug_more)
                    log_it(L_INFO, "Out: DAP_CHAIN_CH_PKT_TYPE_UPDATE_GLOBAL_DB");
            } else if (!l_obj) {
                l_was_sent_smth = true;
                l_ch_chain->request.node_addr.uint64 = dap_chain_net_get_cur_addr_int(dap_chain_net_by_id(
                                                                                          l_ch_chain->request_hdr.net_id));
                s_stream_ch_chain_pkt_write(a_ch, DAP_CHAIN_CH_PKT_TYPE_UPDATE_GLOBAL_DB_END,
                                                     l_ch_chain->request_hdr.net_id.uint64,
                                                     l_ch_chain->request_hdr.chain_id.uint64,
                                                     l_ch_chain->request_hdr.cell_id.uint64,
                                                     &l_ch_chain->request, sizeof(dap_chain_ch_sync_request_old_t));
                if (s_debug_more )
                    log_it(L_INFO, "Out: DAP_CHAIN_CH_PKT_TYPE_UPDATE_GLOBAL_DB_END");
                l_go_idle = true;
            }
#endif
        } break;

        // Synchronize GDB
    case DAP_CHAIN_CH_STATE_SYNC_GLOBAL_DB: {
#if 0
        dap_global_db_pkt_t *l_pkt = NULL;
        size_t l_pkt_size = 0, i, q = 0;
        dap_db_log_list_obj_t **l_objs = dap_db_log_list_get_multiple(l_ch_chain->request_db_log, DAP_STREAM_PKT_SIZE_MAX, &q);
        for (i = 0; i < q; ++i) {
            dap_chain_ch_hash_item_t *l_hash_item = NULL;
            unsigned l_hash_item_hashv = 0;
            HASH_VALUE(&l_objs[i]->hash, sizeof(dap_chain_hash_fast_t), l_hash_item_hashv);
            HASH_FIND_BYHASHVALUE(hh, l_ch_chain->remote_gdbs, &l_objs[i]->hash,
                                  sizeof(dap_hash_fast_t), l_hash_item_hashv, l_hash_item);
            if (!l_hash_item) {
                l_hash_item = DAP_NEW_Z(dap_chain_ch_hash_item_t);
                *l_hash_item = (dap_chain_ch_hash_item_t) {
                        .hash   = l_objs[i]->hash, .size   = l_objs[i]->pkt->data_size
                };
                HASH_ADD_BYHASHVALUE(hh, l_ch_chain->remote_gdbs, hash, sizeof(dap_chain_hash_fast_t),
                                     l_hash_item_hashv, l_hash_item);
                l_pkt = dap_global_db_pkt_pack(l_pkt, l_objs[i]->pkt);
                l_ch_chain->stats_request_gdb_processed++;
                l_pkt_size = sizeof(dap_global_db_pkt_t) + l_pkt->data_size;
            }

            DAP_DELETE(l_objs[i]->pkt);
            DAP_DELETE(l_objs[i]);
        }

        if (l_pkt_size) {
            l_was_sent_smth = true;
            // If request was from defined node_addr we update its state
            s_stream_ch_chain_pkt_write(a_ch, DAP_CHAIN_CH_PKT_TYPE_GLOBAL_DB,
                                        l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64,
                                        l_ch_chain->request_hdr.cell_id.uint64, l_pkt, l_pkt_size);
            debug_if(s_debug_more, L_INFO, "Send one global_db packet, size %zu, rest %zu/%zu items", l_pkt_size,
                     l_ch_chain->request_db_log->items_rest,
                     l_ch_chain->request_db_log->items_number);
            DAP_DELETE(l_pkt);
            DAP_DELETE(l_objs);
        } else if (!l_objs) {
            l_was_sent_smth = true;
            // last message
            dap_chain_ch_sync_request_old_t l_request = { };
            s_stream_ch_chain_pkt_write(a_ch, DAP_CHAIN_CH_PKT_TYPE_SYNCED_GLOBAL_DB,
                                        l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64,
                                        l_ch_chain->request_hdr.cell_id.uint64, &l_request, sizeof(l_request));
            l_go_idle = true;
            if (l_ch_chain->callback_notify_packet_out)
                l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_CHAIN_CH_PKT_TYPE_SYNCED_GLOBAL_DB,
                                                       NULL, 0, l_ch_chain->callback_notify_arg);
            log_it(L_INFO,"Syncronized database: items syncronyzed %"DAP_UINT64_FORMAT_U" of %zu",
                    l_ch_chain->stats_request_gdb_processed, l_ch_chain->request_db_log->items_number);
        }
            // Get global DB record
            dap_global_db_pkt_t *l_pkt = NULL;
            dap_db_log_list_obj_t *l_obj = NULL;
            size_t l_pkt_size = 0;
            for (uint_fast16_t l_skip_count = 0; l_skip_count < s_skip_in_reactor_count; ) {
                l_obj = dap_db_log_list_get(l_ch_chain->request_db_log);
                if (!l_obj || DAP_POINTER_TO_SIZE(l_obj) == 1) {
                    l_skip_count = s_skip_in_reactor_count;
                    break;
                }
                dap_chain_ch_hash_item_t *l_hash_item = NULL;
                unsigned l_hash_item_hashv = 0;
                HASH_VALUE(&l_obj->hash, sizeof(dap_chain_hash_fast_t), l_hash_item_hashv);
                HASH_FIND_BYHASHVALUE(hh, l_ch_chain->remote_gdbs, &l_obj->hash, sizeof(dap_hash_fast_t),
                                      l_hash_item_hashv, l_hash_item);
                if (l_hash_item) { // If found - skip it
                    /*if (s_debug_more) {
                        char l_request_atom_hash_str[DAP_CHAIN_HASH_FAST_STR_SIZE];
                        dap_chain_hash_fast_to_str(&l_obj->hash, l_request_atom_hash_str, DAP_CHAIN_HASH_FAST_STR_SIZE);
                        log_it(L_DEBUG, "Out CHAIN: skip GDB hash %s because its already present in remote GDB hash table",
                                        l_request_atom_hash_str);
                    }*/
                    l_skip_count++;
                } else {
                    l_hash_item = DAP_NEW_Z(dap_chain_ch_hash_item_t);
                    if (!l_hash_item) {
                        log_it(L_CRITICAL, "Memory allocation error");
                        return;
                    }
                    l_hash_item->hash = l_obj->hash;
                    l_hash_item->size = l_obj->pkt->data_size;
                    HASH_ADD_BYHASHVALUE(hh, l_ch_chain->remote_gdbs, hash, sizeof(dap_chain_hash_fast_t),
                                         l_hash_item_hashv, l_hash_item);
                    l_pkt = dap_global_db_pkt_pack(l_pkt, l_obj->pkt);
                    l_ch_chain->stats_request_gdb_processed++;
                    l_pkt_size = sizeof(dap_global_db_pkt_t) + l_pkt->data_size;
                }
                DAP_DELETE(l_obj->pkt);
                DAP_DELETE(l_obj);
                if (l_pkt_size >= DAP_CHAIN_PKT_EXPECT_SIZE)
                    break;
            }
            if (l_pkt_size) {
                l_was_sent_smth = true;
                // If request was from defined node_addr we update its state
                if (s_debug_more)
                    log_it(L_INFO, "Send one global_db packet len=%zu (rest=%zu/%zu items)", l_pkt_size,
                                    dap_db_log_list_get_count_rest(l_ch_chain->request_db_log),
                                    dap_db_log_list_get_count(l_ch_chain->request_db_log));
                s_stream_ch_chain_pkt_write(a_ch, DAP_CHAIN_CH_PKT_TYPE_GLOBAL_DB,
                                                     l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64,
                                                     l_ch_chain->request_hdr.cell_id.uint64, l_pkt, l_pkt_size);
                DAP_DELETE(l_pkt);
            } else if (!l_obj) {
                l_was_sent_smth = true;
                log_it( L_INFO,"Syncronized database: items syncronyzed %"DAP_UINT64_FORMAT_U" from %zu",
                        l_ch_chain->stats_request_gdb_processed, dap_db_log_list_get_count(l_ch_chain->request_db_log));
                // last message
                dap_chain_ch_sync_request_old_t l_request = {};
                s_stream_ch_chain_pkt_write(a_ch, DAP_CHAIN_CH_PKT_TYPE_SYNCED_GLOBAL_DB,
                                                     l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64,
                                                     l_ch_chain->request_hdr.cell_id.uint64, &l_request, sizeof(l_request));
                l_go_idle = true;
                if (l_ch_chain->callback_notify_packet_out)
                    l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_CHAIN_CH_PKT_TYPE_SYNCED_GLOBAL_DB,
                                                           NULL, 0, l_ch_chain->callback_notify_arg);
            }
#endif
    } break;

        // Update list of atoms to remote
        case DAP_CHAIN_CH_STATE_UPDATE_CHAINS:{
            dap_chain_ch_update_element_t *l_data = DAP_NEW_Z_SIZE(dap_chain_ch_update_element_t,
                                                                          sizeof(dap_chain_ch_update_element_t) * s_update_pack_size);
            size_t l_data_size=0;
            for(uint_fast16_t n=0; n<s_update_pack_size && (l_ch_chain->request_atom_iter && l_ch_chain->request_atom_iter->cur);n++){
                l_data[n].hash = *l_ch_chain->request_atom_iter->cur_hash;
                // Shift offset counter
                l_data_size += sizeof(dap_chain_ch_update_element_t);
                // Then get next atom
                l_ch_chain->request_atom_iter->chain->callback_atom_iter_get(l_ch_chain->request_atom_iter, DAP_CHAIN_ITER_OP_NEXT, NULL);
            }
            if (l_data_size){
                l_was_sent_smth = true;
                if(s_debug_more)
                    log_it(L_DEBUG,"Out: UPDATE_CHAINS with %zu hashes sent", l_data_size / sizeof(dap_chain_ch_update_element_t));
                s_stream_ch_chain_pkt_write(a_ch, DAP_CHAIN_CH_PKT_TYPE_UPDATE_CHAINS,
                                                     l_ch_chain->request_hdr.net_id.uint64,
                                                     l_ch_chain->request_hdr.chain_id.uint64,
                                                     l_ch_chain->request_hdr.cell_id.uint64,
                                                     l_data,l_data_size);
            }
            if(!l_data_size  ||  !l_ch_chain->request_atom_iter){ // We over with all the hashes here
                l_was_sent_smth = true;
                if(s_debug_more)
                    log_it(L_INFO,"Out: UPDATE_CHAINS_END sent ");
                dap_chain_ch_sync_request_old_t l_request = {};
                s_stream_ch_chain_pkt_write(a_ch, DAP_CHAIN_CH_PKT_TYPE_UPDATE_CHAINS_END,
                                                     l_ch_chain->request_hdr.net_id.uint64,
                                                     l_ch_chain->request_hdr.chain_id.uint64,
                                                     l_ch_chain->request_hdr.cell_id.uint64,
                                                     &l_request, sizeof(dap_chain_ch_sync_request_old_t));
                l_go_idle = true;
                dap_stream_ch_set_ready_to_write_unsafe(a_ch, false);
            }
            DAP_DELETE(l_data);
        }break;

        // Synchronize chains
        case DAP_CHAIN_CH_STATE_SYNC_CHAINS: {
            // Process one chain from l_ch_chain->request_atom_iter
            // Pack loop to skip quicker
            for(uint_fast16_t k=0; k<s_skip_in_reactor_count     &&
                                   l_ch_chain->request_atom_iter &&
                                   l_ch_chain->request_atom_iter->cur; k++){
                // Check if present and skip if present
                dap_chain_ch_hash_item_t *l_hash_item = NULL;
                unsigned l_hash_item_hashv = 0;
                HASH_VALUE(l_ch_chain->request_atom_iter->cur_hash, sizeof(dap_chain_hash_fast_t), l_hash_item_hashv);
                HASH_FIND_BYHASHVALUE(hh, l_ch_chain->remote_atoms, l_ch_chain->request_atom_iter->cur_hash,
                                      sizeof(dap_chain_hash_fast_t), l_hash_item_hashv, l_hash_item);
                if( l_hash_item ){ // If found - skip it
                    /*if(s_debug_more){
                        char l_request_atom_hash_str[81]={[0]='\0'};
                        dap_chain_hash_fast_to_str(l_ch_chain->request_atom_iter->cur_hash,l_request_atom_hash_str,sizeof (l_request_atom_hash_str));
                        log_it(L_DEBUG, "Out CHAIN: skip atom hash %s because its already present in remote atom hash table",
                                        l_request_atom_hash_str);
                    }*/
                }else{
                    l_hash_item = DAP_NEW_Z(dap_chain_ch_hash_item_t);
                    if (!l_hash_item) {
                        log_it(L_CRITICAL, "Memory allocation error");
                        return false;
                    }
                    l_hash_item->hash = *l_ch_chain->request_atom_iter->cur_hash;
                    if(s_debug_more){
                        char l_atom_hash_str[DAP_CHAIN_HASH_FAST_STR_SIZE];
                        dap_chain_hash_fast_to_str(&l_hash_item->hash, l_atom_hash_str, sizeof(l_atom_hash_str));
                        log_it(L_INFO, "Out CHAIN pkt: atom hash %s (size %zd) ", l_atom_hash_str, l_ch_chain->request_atom_iter->cur_size);
                    }
                    s_stream_ch_chain_pkt_write(a_ch, DAP_CHAIN_CH_PKT_TYPE_CHAIN, l_ch_chain->request_hdr.net_id.uint64,
                                                         l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64,
                                                         l_ch_chain->request_atom_iter->cur, l_ch_chain->request_atom_iter->cur_size);
                    l_was_sent_smth = true;
                    l_ch_chain->stats_request_atoms_processed++;

                    l_hash_item->size = l_ch_chain->request_atom_iter->cur_size;
                    // Because we sent this atom to remote - we record it to not to send it twice
                    HASH_ADD_BYHASHVALUE(hh, l_ch_chain->remote_atoms, hash, sizeof(dap_hash_fast_t), l_hash_item_hashv,
                                         l_hash_item);
                }
                // Then get next atom and populate new last
                l_ch_chain->request_atom_iter->chain->callback_atom_iter_get(l_ch_chain->request_atom_iter, DAP_CHAIN_ITER_OP_NEXT, NULL);
                if (l_was_sent_smth)
                    break;
            }
            if(!l_ch_chain->request_atom_iter || !l_ch_chain->request_atom_iter->cur)  { // All chains synced
                dap_chain_ch_sync_request_old_t l_request = {};
                // last message
                l_was_sent_smth = true;
                s_stream_ch_chain_pkt_write(a_ch, DAP_CHAIN_CH_PKT_TYPE_SYNCED_CHAINS,
                                                     l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64,
                                                     l_ch_chain->request_hdr.cell_id.uint64, &l_request, sizeof(l_request));
                log_it( L_INFO,"Synced: %"DAP_UINT64_FORMAT_U" atoms processed", l_ch_chain->stats_request_atoms_processed);
                l_go_idle = true;
            }
        } break;

        default:
            return false;
    }
    if (l_was_sent_smth) {
        s_chain_timer_reset(l_ch_chain);
        l_ch_chain->sent_breaks = 0;
    } else
        l_ch_chain->sent_breaks++;
    if (l_go_idle) {
        s_ch_chain_go_idle(l_ch_chain);
        if (l_ch_chain->activity_timer) {
            if (!a_arg)
                dap_timerfd_delete_unsafe(l_ch_chain->activity_timer);
            l_ch_chain->activity_timer = NULL;
        }
        return false;
    }
    return true;
}