Skip to content
Snippets Groups Projects
dap_stream_ch_chain.c 70.68 KiB
/*
 * Authors:
 * Dmitriy A. Gearasimov <gerasimov.dmitriy@demlabs.net>
 * Alexander Lysikov <alexander.lysikov@demlabs.net>
 * DeM Labs Inc.   https://demlabs.net
 * Kelvin Project https://github.com/kelvinblockchain
 * Copyright  (c) 2017-2018
 * All rights reserved.

 This file is part of DAP (Deus Applications Prototypes) the open source project

 DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify
 it under the terms of the GNU General Public License as published by
 the Free Software Foundation, either version 3 of the License, or
 (at your option) any later version.

 DAP is distributed in the hope that it will be useful,
 but WITHOUT ANY WARRANTY; without even the implied warranty of
 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 GNU General Public License for more details.

 You should have received a copy of the GNU General Public License
 along with any DAP based project.  If not, see <http://www.gnu.org/licenses/>.
 */

#include <stdlib.h>
#include <stdio.h>
#include <time.h>
#include <stdlib.h>
#include <stddef.h>
#include <stdint.h>

#ifdef WIN32
#include <winsock2.h>
#include <windows.h>
#include <mswsock.h>
#include <ws2tcpip.h>
#include <io.h>
#include <pthread.h>
#endif

#include "dap_common.h"
#include "dap_strfuncs.h"
#include "dap_list.h"
#include "dap_config.h"
#include "dap_hash.h"
#include "utlist.h"

#include "dap_worker.h"
#include "dap_events.h"
#include "dap_proc_thread.h"

#include "dap_chain.h"
#include "dap_chain_datum.h"
#include "dap_chain_cs.h"
#include "dap_chain_cell.h"

#include "dap_chain_global_db.h"
#include "dap_chain_global_db_remote.h"

#include "dap_stream.h"
#include "dap_stream_worker.h"
#include "dap_stream_ch_pkt.h"
#include "dap_stream_ch.h"
#include "dap_stream_ch_proc.h"
#include "dap_stream_ch_chain.h"
#include "dap_stream_ch_chain_pkt.h"
#include "dap_chain_net.h"
#define LOG_TAG "dap_stream_ch_chain"

struct sync_request
{
    dap_worker_t * worker;
    dap_stream_ch_t * ch;
    dap_stream_ch_chain_sync_request_t request;
    dap_stream_ch_chain_pkt_hdr_t request_hdr;
    dap_list_t *pkt_list;

    dap_stream_ch_chain_hash_item_t * remote_atoms;

    uint64_t stats_request_elemets_processed;
    union{
        struct{
            dap_db_log_list_t *db_log; //  db log
            dap_list_t *db_iter;
        } gdb;
        struct{
            dap_chain_atom_iter_t *request_atom_iter;
        } chain;
    };
};

struct ch_chain_pkt_in
{
    dap_stream_ch_t * ch;
    dap_chain_pkt_item_t * pkt;
};

static void s_stream_ch_new(dap_stream_ch_t* a_ch, void* a_arg);
static void s_stream_ch_delete(dap_stream_ch_t* a_ch, void* a_arg);
static void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg);
static void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg);

static bool s_sync_out_chains_proc_callback(dap_proc_thread_t *a_thread, void *a_arg);

static bool s_sync_out_gdb_proc_callback(dap_proc_thread_t *a_thread, void *a_arg);
static void s_sync_out_gdb_first_gdb_worker_callback(dap_worker_t *a_worker, void *a_arg);
static void s_sync_out_gdb_synced_data_worker_callback(dap_worker_t *a_worker, void *a_arg);

static bool s_sync_in_chains_callback(dap_proc_thread_t *a_thread, void *a_arg);

static bool s_gdb_in_pkt_proc_callback(dap_proc_thread_t *a_thread, void *a_arg);
static void s_gdb_in_pkt_error_worker_callback(dap_worker_t *a_thread, void *a_arg);

static bool s_debug_more=false;
static uint_fast16_t s_update_pack_size=100; // Number of hashes packed into the one packet
static uint_fast16_t s_skip_in_reactor_count=50; // Number of hashes packed to skip in one reactor loop callback out packet
/**
 * @brief dap_stream_ch_chain_init
 * @return
 */
int dap_stream_ch_chain_init()
{
    log_it(L_NOTICE, "Chains and global db exchange channel initialized");
    dap_stream_ch_proc_add(dap_stream_ch_chain_get_id(), s_stream_ch_new, s_stream_ch_delete, s_stream_ch_packet_in,
            s_stream_ch_packet_out);
    s_debug_more = dap_config_get_item_bool_default(g_config,"stream_ch_chain","debug_more",false);
    s_update_pack_size = dap_config_get_item_int16_default(g_config,"stream_ch_chain","update_pack_size",100);

    return 0;
}

/**
 * @brief dap_stream_ch_chain_deinit
 */
void dap_stream_ch_chain_deinit()
{

}

/**
 * @brief s_stream_ch_new
 * @param a_ch
 * @param arg
 */
void s_stream_ch_new(dap_stream_ch_t* a_ch, void* a_arg)
{
    UNUSED(a_arg);
    a_ch->internal = DAP_NEW_Z(dap_stream_ch_chain_t);
    dap_stream_ch_chain_t * l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch);
    l_ch_chain->ch = a_ch;
}

/**
 * @brief s_stream_ch_chain_delete
 * @param a_ch_chain
 */
static void s_sync_request_delete(struct sync_request * a_sync_request)
{
    if (a_sync_request->pkt_list) {
        dap_list_t *l_tmp_item = a_sync_request->pkt_list;        while(l_tmp_item) {
            dap_chain_pkt_item_t *l_pkt_copy = (dap_chain_pkt_item_t *)l_tmp_item->data;
            DAP_DELETE(l_pkt_copy->pkt_data);
            DAP_DELETE(l_pkt_copy);
            dap_list_t *l_trash_item = l_tmp_item;
            l_tmp_item = dap_list_next(l_tmp_item);
            DAP_DELETE(l_trash_item);
        }
    }

    if (a_sync_request->gdb.db_iter) {
        a_sync_request->gdb.db_iter = dap_list_first( a_sync_request->gdb.db_iter);
        dap_list_free_full( a_sync_request->gdb.db_iter, free);
        a_sync_request->gdb.db_iter = NULL;
    }
    DAP_DELETE(a_sync_request);
}

/**
 * @brief s_stream_ch_delete_in_proc
 * @param a_thread
 * @param a_arg
 * @return
 */
static bool s_stream_ch_delete_in_proc(dap_proc_thread_t * a_thread, void * a_arg)
{
    (void) a_thread;
    dap_stream_ch_chain_t * l_ch_chain=(dap_stream_ch_chain_t*) a_arg;
    dap_stream_ch_chain_hash_item_t * l_item, *l_tmp;

    // Clear remote atoms
    HASH_ITER(hh, l_ch_chain->remote_atoms, l_item, l_tmp){
        HASH_DEL(l_ch_chain->remote_atoms, l_item);
        DAP_DELETE(l_item);
    }
    // Clear remote gdbs
    HASH_ITER(hh, l_ch_chain->remote_gdbs, l_item, l_tmp){
        HASH_DEL(l_ch_chain->remote_gdbs, l_item);
        DAP_DELETE(l_item);
    }
    DAP_DELETE(l_ch_chain);
    return true;
}

/**
 * @brief s_stream_ch_delete
 * @param ch
 * @param arg
 */
static void s_stream_ch_delete(dap_stream_ch_t* a_ch, void* a_arg)
{
    (void) a_arg;
    dap_proc_queue_add_callback_inter(a_ch->stream_worker->worker->proc_queue_input,s_stream_ch_delete_in_proc,a_ch->internal );
    a_ch->internal = NULL; // To prevent its cleaning in worker
}

/**
 * @brief s_sync_out_chains_worker_callback
 * @param a_worker
 * @param a_arg
 */
static void s_sync_out_chains_first_worker_callback(dap_worker_t *a_worker, void *a_arg)
{
    UNUSED(a_worker);
    struct sync_request * l_sync_request = (struct sync_request *) a_arg;
    dap_stream_ch_t *l_ch = l_sync_request->ch;
    if( ! dap_stream_ch_check_unsafe( DAP_STREAM_WORKER(a_worker), l_ch) ){
        log_it(L_INFO,"Client disconnected before we sent the reply");
        s_sync_request_delete(l_sync_request);
        return;
    }

    dap_stream_ch_chain_t * l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch);
    l_ch_chain->state = CHAIN_STATE_SYNC_CHAINS;
    l_ch_chain->request_atom_iter = l_sync_request->chain.request_atom_iter;
    l_ch_chain->remote_atoms = l_sync_request->remote_atoms; /// TODO check if they were present here before

    dap_chain_node_addr_t l_node_addr = {};
    dap_chain_net_t *l_net = dap_chain_net_by_id(l_sync_request->request_hdr.net_id);
    l_node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net);

    if (s_debug_more )
        log_it(L_INFO,"Out: DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_CHAIN");

    dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_CHAIN,
            l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64,
            l_ch_chain->request_hdr.cell_id.uint64, &l_node_addr, sizeof(dap_chain_node_addr_t));
    DAP_DELETE(l_sync_request);

}

/**
 * @brief s_sync_out_chains_last_worker_callback
 * @param a_worker
 * @param a_arg
 */
static void s_sync_out_chains_last_worker_callback(dap_worker_t *a_worker, void *a_arg)
{
    UNUSED(a_worker);
    struct sync_request * l_sync_request = (struct sync_request *) a_arg;
    dap_stream_ch_t *l_ch = l_sync_request->ch;
    if(  !dap_stream_ch_check_unsafe( DAP_STREAM_WORKER(a_worker), l_ch) ){
        log_it(L_INFO,"Client disconnected before we sent the reply");
        s_sync_request_delete(l_sync_request);
        return;
    }

    dap_stream_ch_chain_t * l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch);
    l_ch_chain->request_atom_iter = l_sync_request->chain.request_atom_iter;
    // last packet
    dap_stream_ch_chain_sync_request_t l_request = {0};
    if (s_debug_more )
        log_it(L_INFO,"Out: DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS");
    dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS,
            l_sync_request->request_hdr.net_id.uint64, l_sync_request->request_hdr.chain_id.uint64,
            l_sync_request->request_hdr.cell_id.uint64, &l_request, sizeof(l_request));
    if (l_ch_chain->request_atom_iter)
        DAP_DEL_Z(l_ch_chain->request_atom_iter);
    l_ch_chain->state = CHAIN_STATE_IDLE;
    if (l_ch_chain->callback_notify_packet_out)
        l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS,
                                                NULL, 0, l_ch_chain->callback_notify_arg);
    DAP_DELETE(l_sync_request);
}
/**
 * @brief s_sync_chains_callback
 * @param a_thread
 * @param a_arg
 * @return
 */
static bool s_sync_out_chains_proc_callback(dap_proc_thread_t *a_thread, void *a_arg)
{
    struct sync_request * l_sync_request = (struct sync_request *) a_arg;

    dap_chain_t * l_chain = dap_chain_find_by_id(l_sync_request->request_hdr.net_id, l_sync_request->request_hdr.chain_id);
    assert(l_chain);

    pthread_rwlock_rdlock(&l_chain->atoms_rwlock);

    l_sync_request->chain.request_atom_iter = l_chain->callback_atom_iter_create(l_chain);
    size_t l_first_size = 0;
    dap_chain_atom_ptr_t *l_iter = l_chain->callback_atom_iter_get_first(l_sync_request->chain.request_atom_iter, &l_first_size);


    if (l_iter && l_first_size) {
        // first packet
        if (!dap_hash_fast_is_blank(&l_sync_request->request.hash_from)) {
            l_iter = l_chain->callback_atom_find_by_hash(l_sync_request->chain.request_atom_iter,
                                                          &l_sync_request->request.hash_from, &l_first_size);
        }


        pthread_rwlock_unlock(&l_chain->atoms_rwlock);
        dap_proc_thread_worker_exec_callback(a_thread, l_sync_request->worker->id,s_sync_out_chains_first_worker_callback, l_sync_request );
    } else {
        pthread_rwlock_unlock(&l_chain->atoms_rwlock);
        dap_proc_thread_worker_exec_callback(a_thread, l_sync_request->worker->id,s_sync_out_chains_last_worker_callback, l_sync_request );
    }
    return true;
}

/**
 * @brief s_sync_out_gdb_first_gdb_worker_callback
 * @param a_worker
 * @param a_arg
 */
static void s_sync_out_gdb_first_gdb_worker_callback(dap_worker_t *a_worker, void *a_arg)
{
    struct sync_request *l_sync_request = (struct sync_request *) a_arg;
    dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN( l_sync_request->ch );
    dap_chain_net_t *l_net = dap_chain_net_by_id(l_ch_chain->request_hdr.net_id);

    // Add it to outgoing list
    l_ch_chain->request_global_db_trs = l_sync_request->gdb.db_log;
    l_ch_chain->request_db_iter = NULL;
    l_ch_chain->state = CHAIN_STATE_SYNC_GLOBAL_DB;
    dap_chain_node_addr_t l_node_addr = { 0 };
    l_node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net);
    if (s_debug_more )
        log_it(L_INFO,"Out: DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB");
    dap_stream_ch_chain_pkt_write_unsafe(l_ch_chain->ch , DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB,
            l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64,
            l_ch_chain->request_hdr.cell_id.uint64, &l_node_addr, sizeof(dap_chain_node_addr_t));
    if(l_ch_chain->callback_notify_packet_out)
        l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB,
                                                NULL, 0, l_ch_chain->callback_notify_arg);

    if( a_worker){ // We send NULL to prevent delete
        DAP_DELETE(l_sync_request);
        l_ch_chain->is_on_request = false;
    }
}

/**
 * @brief s_sync_out_gdb_synced_data_worker_callback
 * @param a_worker
 * @param a_arg
 */
static void s_sync_out_gdb_synced_data_worker_callback(dap_worker_t *a_worker, void *a_arg)
{
    UNUSED(a_worker);
    dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN( ((struct sync_request *) a_arg)->ch );
    s_sync_out_gdb_first_gdb_worker_callback(NULL,a_arg); // NULL to say callback not to delete request

    dap_stream_ch_chain_sync_request_t l_request = {0};
    if (s_debug_more )
        log_it(L_INFO,"Out: DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB");
    dap_stream_ch_chain_pkt_write_unsafe(l_ch_chain->ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB,
                                         l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64,
                                         l_ch_chain->request_hdr.cell_id.uint64, &l_request, sizeof(l_request));
    l_ch_chain->state = CHAIN_STATE_IDLE;
    if(l_ch_chain->callback_notify_packet_out)
        l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB,
                                                NULL, 0, l_ch_chain->callback_notify_arg);
    l_ch_chain->is_on_request = false;
    DAP_DELETE(a_arg);

}

/**
 * @brief s_sync_out_gdb_callback
 * @param a_thread
 * @param a_arg
 * @return
 */
static bool s_sync_out_gdb_proc_callback(dap_proc_thread_t *a_thread, void *a_arg)
{
    struct sync_request *l_sync_request = (struct sync_request *) a_arg;

    // Get log diff
    uint64_t l_local_last_id = dap_db_log_get_last_id();
    if (s_debug_more)
        log_it(L_DEBUG, "Sync out gdb proc, requested transactions %llu:%llu", l_sync_request->request.id_start, l_local_last_id);
    uint64_t l_start_item = l_sync_request->request.id_start;
    // If the current global_db has been truncated, but the remote node has not known this
    if(l_sync_request->request.id_start > l_local_last_id) {
        l_start_item = 0;
    }
    dap_chain_net_t *l_net = dap_chain_net_by_id(l_sync_request->request_hdr.net_id);
    dap_list_t *l_add_groups = dap_chain_net_get_add_gdb_group(l_net, l_sync_request->request.node_addr);
    dap_db_log_list_t *l_db_log = dap_db_log_list_start(l_start_item + 1, l_add_groups);

    if(l_db_log) {
        l_sync_request->gdb.db_log = l_db_log;
        dap_proc_thread_worker_exec_callback(a_thread, l_sync_request->worker->id, s_sync_out_gdb_first_gdb_worker_callback,l_sync_request );
    } else {
        dap_proc_thread_worker_exec_callback(a_thread, l_sync_request->worker->id, s_sync_out_gdb_synced_data_worker_callback,l_sync_request );
    }
    return true;
}

/**
 * @brief s_chain_in_pkt_callback
 * @param a_thread
 * @param a_arg
 * @return
 */
static bool s_sync_in_chains_callback(dap_proc_thread_t *a_thread, void *a_arg)
{
    UNUSED(a_thread);
    struct sync_request *l_sync_request = (struct sync_request *) a_arg;
    dap_chain_hash_fast_t l_atom_hash = {};

    if (l_sync_request->pkt_list) {
        dap_chain_pkt_item_t *l_pkt_item = NULL;
        dap_list_t *l_pkt_iter =l_sync_request->pkt_list;
        l_pkt_item = (dap_chain_pkt_item_t *)l_pkt_iter->data;
        l_sync_request->pkt_list = l_sync_request->pkt_list->next;
        if (l_sync_request->pkt_list ){
            l_sync_request->pkt_list->prev = NULL;
        }
        if (l_pkt_item){
            dap_chain_t *l_chain = dap_chain_find_by_id(l_pkt_item->pkt_hdr.net_id, l_pkt_item->pkt_hdr.chain_id);
            if (!l_chain) {
                if (s_debug_more)
                    log_it(L_WARNING, "No chain found for DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN");
                return true;
            }
            dap_chain_atom_ptr_t l_atom_copy = (dap_chain_atom_ptr_t)l_pkt_item->pkt_data;
            uint64_t l_atom_copy_size = l_pkt_item->pkt_data_size;
            if ( l_atom_copy_size && l_pkt_item && l_atom_copy ){
                pthread_rwlock_wrlock(&l_chain->atoms_rwlock);
                dap_hash_fast(l_atom_copy, l_atom_copy_size, &l_atom_hash);
                dap_chain_atom_iter_t *l_atom_iter = l_chain->callback_atom_iter_create(l_chain);
                size_t l_atom_size =0;
                if ( l_chain->callback_atom_find_by_hash(l_atom_iter, &l_atom_hash, &l_atom_size) == NULL ) {
                    dap_chain_atom_verify_res_t l_atom_add_res = l_chain->callback_atom_add(l_chain, l_atom_copy, l_atom_copy_size);
                    if (l_atom_add_res == ATOM_ACCEPT && dap_chain_has_file_store(l_chain)) {
                        if (s_debug_more){
                            char l_atom_hash_str[72]={[0]='\0'};
                            dap_chain_hash_fast_to_str(&l_atom_hash,l_atom_hash_str,sizeof (l_atom_hash_str)-1 );
                            log_it(L_INFO,"Accepted atom with hash %s for %s:%s", l_atom_hash_str, l_chain->net_name, l_chain->name);
                        }

                        // append to file
                        dap_chain_cell_t *l_cell = dap_chain_cell_create_fill(l_chain, l_pkt_item->pkt_hdr.cell_id);
                        int l_res;
                        if (l_cell) {
                            // add one atom only
                            l_res = dap_chain_cell_file_append(l_cell, l_atom_copy, l_atom_copy_size);
                            // rewrite all file
                            //l_res = dap_chain_cell_file_update(l_cell);
                            if(l_res < 0) {
                                log_it(L_ERROR, "Can't save event 0x%x to the file '%s'", l_atom_hash,
                                        l_cell ? l_cell->file_storage_path : "[null]");
                            } else {
                                dap_db_set_last_hash_remote(l_sync_request->request.node_addr.uint64, l_chain, &l_atom_hash);
                            }
                            // add all atoms from treshold
                            if (l_chain->callback_atom_add_from_treshold){
                                dap_chain_atom_ptr_t l_atom_treshold;
                                do{
                                    size_t l_atom_treshold_size;
                                    // add into ledger
                                    if (s_debug_more)
                                        log_it(L_DEBUG, "Try to add atom from treshold");
                                    l_atom_treshold = l_chain->callback_atom_add_from_treshold(l_chain, &l_atom_treshold_size);
                                    // add into file
                                    if(l_atom_treshold) {
                                        l_res = dap_chain_cell_file_append(l_cell, l_atom_treshold, l_atom_treshold_size);
                                        log_it(L_INFO, "Added atom from treshold");
                                        if(l_res < 0) {
                                            log_it(L_ERROR, "Can't save event 0x%x from treshold to the file '%s'",
                                                    l_atom_treshold, l_cell ? l_cell->file_storage_path : "[null]");
                                        }
                                    }
                                }
                                while(l_atom_treshold);
                            }

                            // delete cell and close file
                            dap_chain_cell_delete(l_cell);
                        }
                        else{
                            log_it(L_ERROR, "Can't get cell for cell_id 0x%x for save event to file", l_pkt_item->pkt_hdr.cell_id);

                        }
                    }else{
                        if (s_debug_more){
                            char l_atom_hash_str[72]={[0]='\0'};
                            dap_chain_hash_fast_to_str(&l_atom_hash,l_atom_hash_str,sizeof (l_atom_hash_str)-1 );
                            log_it(L_WARNING,"Not accepted atom (code %d) with hash %s for %s:%s", l_atom_add_res, l_atom_hash_str, l_chain->net_name, l_chain->name);
                        }
                    }
                    if(l_atom_add_res == ATOM_PASS)
                        DAP_DELETE(l_atom_copy);
                } else {
                    if (s_debug_more){
                        char l_atom_hash_str[72]={[0]='\0'};
                        dap_chain_hash_fast_to_str(&l_atom_hash,l_atom_hash_str,sizeof (l_atom_hash_str)-1 );
                        log_it(L_WARNING,"Already has atom with hash %s ", l_atom_hash_str);
                    }
                    dap_db_set_last_hash_remote(l_sync_request->request.node_addr.uint64, l_chain, &l_atom_hash);
                    DAP_DELETE(l_atom_copy);
                }
                l_chain->callback_atom_iter_delete(l_atom_iter);
                pthread_rwlock_unlock(&l_chain->atoms_rwlock);
            }else{
                if (!l_pkt_item)
                    log_it(L_WARNING, "chain packet item is NULL");
                if (l_atom_copy_size)
                    log_it(L_WARNING, "chain packet item data size is zero");
            }
        }else{
            log_it(L_WARNING, "pkt copy is NULL");
        }
        if (l_pkt_item)
            DAP_DELETE(l_pkt_item);

        if (l_pkt_iter)
            DAP_DELETE(l_pkt_iter);
    }else
        log_it(L_WARNING, "In proc thread got CHAINS stream ch packet with zero data");

    if(l_sync_request->pkt_list){
        return false;
    }else{
        DAP_DELETE(l_sync_request);
        return true;
    }
}

/**
 * @brief s_gdb_in_pkt_error_worker_callback
 * @param a_thread
 * @param a_arg
 */
static void s_gdb_in_pkt_error_worker_callback(dap_worker_t *a_worker, void *a_arg)
{
    UNUSED(a_worker);
    struct ch_chain_pkt_in * l_pkt_in = (struct ch_chain_pkt_in*) a_arg;

    dap_stream_ch_chain_pkt_write_error_unsafe(l_pkt_in->ch, l_pkt_in->pkt->pkt_hdr.net_id.uint64,
                                        l_pkt_in->pkt->pkt_hdr.chain_id.uint64, l_pkt_in->pkt->pkt_hdr.cell_id.uint64,
                                        "ERROR_GLOBAL_DB_INTERNAL_NOT_SAVED");
    dap_stream_ch_set_ready_to_write_unsafe(l_pkt_in->ch, true);
    DAP_DELETE(l_pkt_in);
}


/**
 * @brief s_gdb_in_pkt_callback
 * @param a_thread
 * @param a_arg
 * @return
 */
static bool s_gdb_in_pkt_proc_callback(dap_proc_thread_t *a_thread, void *a_arg)
{
    struct sync_request *l_sync_request = (struct sync_request *) a_arg;

    dap_list_t *l_pkt_iter = l_sync_request->pkt_list;
    if (l_pkt_iter) {
        l_sync_request->pkt_list =l_sync_request->pkt_list->next;
        if (l_sync_request->pkt_list )
            l_sync_request->pkt_list->prev = NULL;

        dap_chain_pkt_item_t *l_pkt_item = (dap_chain_pkt_item_t *)l_pkt_iter->data;
        size_t l_data_obj_count = 0;
        // deserialize data & Parse data from dap_db_log_pack()
        dap_store_obj_t *l_store_obj = dap_db_log_unpack(l_pkt_item->pkt_data, l_pkt_item->pkt_data_size, &l_data_obj_count);
        if (s_debug_more){
            if (l_data_obj_count)
                log_it(L_INFO, "In: GLOBAL_DB parse: pkt_data_size=%zd, l_data_obj_count = %d",l_pkt_item->pkt_data_size, l_data_obj_count );
            else if (l_pkt_item->pkt_data){
                log_it(L_WARNING, "In: GLOBAL_DB parse: pkt_data_size=%zd, error=\"No data objs after unpack\"", l_pkt_item->pkt_data_size, l_data_obj_count );
            }else
                 log_it(L_WARNING, "In: GLOBAL_DB parse: packet in list with NULL data(pkt_data_size:%zd)", l_pkt_item->pkt_data_size);
        }

        for(size_t i = 0; i < l_data_obj_count; i++) {
            // timestamp for exist obj
            time_t l_timestamp_cur = 0;
            // obj to add
            dap_store_obj_t* l_obj = l_store_obj + i;
            // read item from base;
            size_t l_count_read = 0;
            dap_store_obj_t *l_read_obj = dap_chain_global_db_driver_read(l_obj->group,
                    l_obj->key, &l_count_read);
            // get timestamp for the exist entry
            if(l_read_obj)
                l_timestamp_cur = l_read_obj->timestamp;
            // get timestamp for the deleted entry
            else
            {
                l_timestamp_cur = global_db_gr_del_get_timestamp(l_obj->group, l_obj->key);
            }

            //check whether to apply the received data into the database
            bool l_apply = true;
            if(l_obj->timestamp < l_timestamp_cur)
                l_apply = false;
            else if(l_obj->type == 'd') {
                // already deleted
                if(!l_read_obj)
                    l_apply = false;
            }
            else if(l_obj->type == 'a') {
                bool l_is_the_same_present = false;
                if(l_read_obj &&
                        l_read_obj->value_len == l_obj->value_len &&
                        !memcmp(l_read_obj->value, l_obj->value, l_obj->value_len))
                    l_is_the_same_present = true;
                // this data already present in global_db and not obsolete (out of date)
                if(l_read_obj && (l_is_the_same_present || l_read_obj->timestamp >= l_store_obj->timestamp))
                    l_apply = false;
            }
            if(l_read_obj)
                dap_store_obj_free(l_read_obj, l_count_read);

            if (s_debug_more){
                char l_ts_str[50];
                dap_time_to_str_rfc822(l_ts_str, sizeof(l_ts_str), l_store_obj[i].timestamp);
                log_it(L_DEBUG, "Unpacked log history: type='%c' (0x%02hhX) group=\"%s\" key=\"%s\""
                        " timestamp=\"%s\" value_len=%u  ",
                        (char ) l_store_obj[i].type, l_store_obj[i].type, l_store_obj[i].group,
                        l_store_obj[i].key, l_ts_str, l_store_obj[i].value_len);
            }

            if(!l_apply) {
                // If request was from defined node_addr we update its state
                if(l_sync_request->request.node_addr.uint64) {
                    dap_db_set_last_id_remote(l_sync_request->request.node_addr.uint64, l_obj->id);
                }
                continue;
            }

            // apply received transaction
            dap_chain_t *l_chain = dap_chain_find_by_id(l_pkt_item->pkt_hdr.net_id, l_pkt_item->pkt_hdr.chain_id);
            if(l_chain) {
                if(l_chain->callback_add_datums_with_group){
                    void * restrict l_store_obj_value = l_store_obj->value;
                    l_chain->callback_add_datums_with_group(l_chain,
                            (dap_chain_datum_t** restrict) l_store_obj_value, 1,
                            l_store_obj[i].group);
                }
            }
            // save data to global_db
            if(!dap_chain_global_db_obj_save(l_obj, 1)) {
                struct ch_chain_pkt_in * l_pkt_in = DAP_NEW_Z(struct ch_chain_pkt_in);
                l_pkt_in->ch = l_sync_request->ch;
                l_pkt_in->pkt = l_pkt_item;
                dap_proc_thread_worker_exec_callback(a_thread, l_sync_request->worker->id,s_gdb_in_pkt_error_worker_callback, l_sync_request);
            } else {
                // If request was from defined node_addr we update its state
                if(l_sync_request->request.node_addr.uint64) {
                    dap_db_set_last_id_remote(l_sync_request->request.node_addr.uint64, l_obj->id);
                }
                if (s_debug_more)
                    log_it(L_DEBUG, "Added new GLOBAL_DB history pack");
            }
        }
        if(l_store_obj)
            dap_store_obj_free(l_store_obj, l_data_obj_count);
        if (l_pkt_item)
            DAP_DELETE(l_pkt_item);
        if (l_pkt_iter)
            DAP_DELETE(l_pkt_iter);
        if(l_sync_request->pkt_list)
            return false;
        else{
            DAP_DELETE(l_sync_request);
            return true;
        }

    } else {
        log_it(L_WARNING, "In proc thread got GDB stream ch packet with zero data");
        return true;
    }
}

/**
 * @brief s_stream_ch_packet_in
 * @param a_ch
 * @param a_arg
 */
void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
{
    dap_stream_ch_chain_t * l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch);
    if (!l_ch_chain) {
        return;
    }
    dap_stream_ch_pkt_t * l_ch_pkt = (dap_stream_ch_pkt_t *) a_arg;
    dap_stream_ch_chain_pkt_t * l_chain_pkt = (dap_stream_ch_chain_pkt_t *) l_ch_pkt->data;
    if (!l_chain_pkt) {
        return;
    }
    if (l_ch_pkt->hdr.size< sizeof (l_chain_pkt->hdr) ){
        log_it(L_ERROR, "Corrupted packet: too small size %zd, smaller then header size %zd", l_ch_pkt->hdr.size,
               sizeof(l_chain_pkt->hdr));

    }

    size_t l_chain_pkt_data_size = l_ch_pkt->hdr.size-sizeof (l_chain_pkt->hdr) ;
    uint16_t l_acl_idx = dap_chain_net_acl_idx_by_id(l_chain_pkt->hdr.net_id );
    if (l_acl_idx == (uint16_t)-1) {
        if (l_ch_pkt->hdr.type == DAP_STREAM_CH_CHAIN_PKT_TYPE_ERROR) {
            if(l_ch_chain->callback_notify_packet_in) {
                l_ch_chain->callback_notify_packet_in(l_ch_chain, l_ch_pkt->hdr.type, l_chain_pkt,
                                                      l_chain_pkt_data_size, l_ch_chain->callback_notify_arg);
            }
        } else {

            log_it(L_ERROR, "Invalid request from %s with ext_id %016"DAP_UINT64_FORMAT_x" net id 0x%016"DAP_UINT64_FORMAT_x" chain id 0x%016"DAP_UINT64_FORMAT_x" cell_id 0x%016"DAP_UINT64_FORMAT_x" in packet", a_ch->stream->esocket->remote_addr_str?
                       a_ch->stream->esocket->remote_addr_str: "<unknown>", l_chain_pkt->hdr.ext_id,
                   l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64,
                   l_chain_pkt->hdr.cell_id.uint64);
            dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64,
                                                l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64,
                                                "ERROR_NET_INVALID_ID");
            // Who are you? I don't know you! go away!
            a_ch->stream->esocket->flags |= DAP_SOCK_SIGNAL_CLOSE;
        }
        return;
    }
    if (a_ch->stream->session->acl && !a_ch->stream->session->acl[l_acl_idx]) {
        log_it(L_WARNING, "Unauthorized request attempt from %s to network %s", a_ch->stream->esocket->remote_addr_str?
                   a_ch->stream->esocket->remote_addr_str: "<unknown>",
               dap_chain_net_by_id(l_chain_pkt->hdr.net_id)->pub.name);
        dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64,
                                            l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64,
                                            "ERROR_NET_NOT_AUTHORIZED");
        return;
    }
    switch (l_ch_pkt->hdr.type) {
        /// --- GDB update ---
        // Request for gdbs list update
        case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_REQ:{
        }break;
        // Response with metadata organized in TSD
        case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_TSD:{

        }break;
        // If requested - begin to send atom hashes
        case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_START:{
            l_ch_chain->state = CHAIN_STATE_UPDATE_GLOBAL_DB_REMOTE;
            dap_stream_ch_set_ready_to_write_unsafe(a_ch, true);
        }break;
        // Response with gdb element hashes and sizes
        case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB:{
            for ( dap_stream_ch_chain_update_element_t * l_element =(dap_stream_ch_chain_update_element_t *) l_chain_pkt->data;
                   (size_t) (((byte_t*)l_element) - l_chain_pkt->data ) < l_chain_pkt_data_size;
                  l_element++){
                dap_stream_ch_chain_hash_item_t * l_hash_item = NULL;
                HASH_FIND(hh,l_ch_chain->remote_gdbs, &l_element->hash, sizeof (l_element->hash), l_hash_item );
                if( ! l_hash_item ){
                    l_hash_item = DAP_NEW(dap_stream_ch_chain_hash_item_t);
                    memcpy(&l_hash_item->hash, &l_element->hash, sizeof (l_element->hash));
                    l_hash_item->size = l_element->size;
                    HASH_ADD(hh, l_ch_chain->remote_gdbs, hash, sizeof (l_hash_item->hash), l_hash_item);
                    if (s_debug_more){
                        char l_hash_str[72]={ [0]='\0'};
                        dap_chain_hash_fast_to_str(&l_hash_item->hash,l_hash_str,sizeof (l_hash_str));
                        log_it(L_INFO,"In: Updated remote hash gdb list with %s ", l_hash_str);
                    }
                }
            }
        }break;
        // End of response
        case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_END:{
            l_ch_chain->state = CHAIN_STATE_UPDATE_CHAINS_REMOTE ; // Switch on update chains hashes
            dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_REQ ,
                                                 l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64,
                                                 l_ch_chain->request_hdr.cell_id.uint64, NULL, 0);
        }break;

        /// --- Chains update ---
        // Request for atoms list update
        case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_REQ:{
            if(l_ch_chain->is_on_request){
                log_it(L_WARNING, "Can't process UPDATE_CHAINS_REQ request because its already busy with syncronization");
                dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64,
                        l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64,
                        "ERROR_SYNC_REQUEST_ALREADY_IN_PROCESS");
                dap_stream_ch_set_ready_to_write_unsafe(a_ch, true);
                break;
            }

            l_ch_chain->is_on_request=true;
            memcpy(&l_ch_chain->request_hdr, &l_chain_pkt->hdr, sizeof(l_chain_pkt->hdr));

            if(s_debug_more)
                log_it(L_INFO, "In:  UPDATE_CHAINS_REQ pkt: net 0x%016x chain 0x%016x cell 0x%016x", l_ch_chain->request_hdr.net_id.uint64 ,
                                    l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64);
            dap_chain_t * l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id);
            if(l_chain) {
                struct sync_request *l_sync_request = DAP_NEW_Z(struct sync_request);
                l_sync_request->ch = a_ch;
                l_sync_request->worker = a_ch->stream_worker->worker;
                l_sync_request->remote_atoms = l_ch_chain->remote_atoms;
                l_ch_chain->remote_atoms = NULL;

                memcpy(&l_sync_request->request, &l_ch_chain->request, sizeof (l_ch_chain->request));
                memcpy(&l_sync_request->request_hdr, &l_ch_chain->request_hdr, sizeof (l_ch_chain->request_hdr));
                dap_proc_queue_add_callback_inter(  a_ch->stream_worker->worker->proc_queue_input, s_sync_out_chains_proc_callback, l_sync_request );
            }
        }break;
        // Response with metadata organized in TSD
        case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_TSD :{

        }break;

        // If requested - begin to send atom hashes
        case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_END:
        case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_START:{
            if(s_debug_more)
                log_it(L_INFO,"In: Requested update chains start");
            memcpy(&l_ch_chain->request_hdr, &l_chain_pkt->hdr , sizeof (l_ch_chain->request_hdr));

            if(l_ch_pkt->hdr.type == DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_END )
                l_ch_chain->request_updates_complete = true;
            l_ch_chain->state = CHAIN_STATE_UPDATE_CHAINS_REMOTE;

            dap_chain_t * l_chain = dap_chain_find_by_id(l_ch_chain->request_hdr.net_id, l_ch_chain->request_hdr.chain_id);
            if (l_ch_chain){
                l_ch_chain->request_atom_iter = l_chain->callback_atom_iter_create(l_chain);
                l_chain->callback_atom_iter_get_first(l_ch_chain->request_atom_iter, NULL);
                dap_stream_ch_set_ready_to_write_unsafe(a_ch, true);
            }else{
                log_it(L_ERROR, "Invalid UPDATE_CHAINS_START request from %s with ext_id %016"DAP_UINT64_FORMAT_x" net id 0x%016"DAP_UINT64_FORMAT_x" chain id 0x%016"DAP_UINT64_FORMAT_x" cell_id 0x%016"DAP_UINT64_FORMAT_x" in packet", a_ch->stream->esocket->remote_addr_str?
                           a_ch->stream->esocket->remote_addr_str: "<unknown>", l_chain_pkt->hdr.ext_id,
                       l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64,
                       l_chain_pkt->hdr.cell_id.uint64);
                dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64,
                                                    l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64,
                                                    "ERROR_NET_INVALID_ID");
                // Who are you? I don't know you! go away!
                a_ch->stream->esocket->flags |= DAP_SOCK_SIGNAL_CLOSE;
            }

        }break;

        // Response with atom hashes and sizes
        case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS :{
            uint l_count_added=0;
            uint l_count_total=0;

            dap_chain_t * l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id);
            if (! l_chain){
                log_it(L_ERROR, "Invalid UPDATE_CHAINS packet from %s with ext_id %016"DAP_UINT64_FORMAT_x" net id 0x%016"DAP_UINT64_FORMAT_x" chain id 0x%016"DAP_UINT64_FORMAT_x" cell_id 0x%016"DAP_UINT64_FORMAT_x" in packet", a_ch->stream->esocket->remote_addr_str?
                           a_ch->stream->esocket->remote_addr_str: "<unknown>", l_chain_pkt->hdr.ext_id,
                       l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64,
                       l_chain_pkt->hdr.cell_id.uint64);
                dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64,
                                                    l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64,
                                                    "ERROR_NET_INVALID_ID");
                // Who are you? I don't know you! go away!
                a_ch->stream->esocket->flags |= DAP_SOCK_SIGNAL_CLOSE;
                break;
            }

            dap_chain_atom_iter_t * l_iter = l_chain->callback_atom_iter_create(l_chain);


            for ( dap_stream_ch_chain_update_element_t * l_element =(dap_stream_ch_chain_update_element_t *) l_chain_pkt->data;
                   (size_t) (((byte_t*)l_element) - l_chain_pkt->data ) < l_chain_pkt_data_size;
                  l_element++){
                dap_stream_ch_chain_hash_item_t * l_hash_item = NULL;
                HASH_FIND(hh,l_ch_chain->remote_atoms , &l_element->hash, sizeof (l_element->hash), l_hash_item );
                if( ! l_hash_item ){
                    l_hash_item = DAP_NEW(dap_stream_ch_chain_hash_item_t);
                    memcpy(&l_hash_item->hash, &l_element->hash, sizeof (l_element->hash));
                    l_hash_item->size = l_element->size;
                    HASH_ADD(hh, l_ch_chain->remote_atoms, hash, sizeof (l_hash_item->hash), l_hash_item);
                    l_count_added++;
                    /*
                    if (s_debug_more){
                        char l_hash_str[72]={ [0]='\0'};
                        dap_chain_hash_fast_to_str(&l_hash_item->hash,l_hash_str,sizeof (l_hash_str));
                        log_it(L_DEBUG,"In: Updated remote atom hash list with %s ", l_hash_str);
                    }*/
                }
                l_count_total++;
            }
            if (s_debug_more)
                log_it(L_INFO,"In: Added %u from %u remote atom hash  in list",l_count_added,l_count_total);
            l_chain->callback_atom_iter_delete(l_iter);
        }break;
        // End of response
        //case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_END:{
        //    l_ch_chain->state = CHAIN_STATE_UPDATE_CHAINS_REMOTE; // Switch on update chains hashes to remote
        //    dap_stream_ch_set_ready_to_write_unsafe(a_ch, true);
        //}break;

        // first packet of data with source node address
        case DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_CHAIN: {
            l_ch_chain->stats_request_atoms_processed =0;
            if(l_chain_pkt_data_size == sizeof(dap_chain_node_addr_t)){
                    log_it(L_INFO, "From "NODE_ADDR_FP_STR": FIRST_CHAIN data_size=%d net 0x%016x chain 0x%016x cell 0x%016x ",
                           NODE_ADDR_FP_ARGS_S(l_ch_chain->request.node_addr),
                           l_chain_pkt_data_size,      l_ch_chain->request_hdr.net_id.uint64 ,
                           l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64);
            }else{
                log_it(L_WARNING,"Incorrect data size %zd in packet DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_CHAIN", l_chain_pkt_data_size);
                dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64,
                        l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64,
                        "ERROR_CHAIN_PACKET_TYPE_FIRST_CHAIN_INCORRET_DATA_SIZE(%zd/%zd)",l_chain_pkt_data_size, sizeof(dap_chain_node_addr_t));
            }
        }
        break;

        case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_ALL: {
            log_it(L_INFO, "In:  SYNCED_ALL net 0x%016x chain 0x%016x cell 0x%016x", l_chain_pkt->hdr.net_id.uint64 ,
                   l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64);
        }
            break;
        case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB: {
                log_it(L_INFO, "In:  SYNCED_GLOBAL_DB: net 0x%016x chain 0x%016x cell 0x%016x", l_chain_pkt->hdr.net_id.uint64 ,
                       l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64);
        }
            break;
        case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB_GROUP: {
            if (s_debug_more)
                log_it(L_INFO, "In:  SYNCED_GLOBAL_DB_GROUP pkt net 0x%016x chain 0x%016x cell 0x%016x", l_chain_pkt->hdr.net_id.uint64 ,
                       l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64);
        }
            break;
        case DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB_GROUP: {
            if (s_debug_more)
                log_it(L_INFO, "In:  SYNCED_GLOBAL_DB_GROUP pkt net 0x%016x chain 0x%016x cell 0x%016x", l_chain_pkt->hdr.net_id.uint64 ,
                       l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64);
        }
            break;

        case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS: {
            l_ch_chain->request_updates_complete = false;
            if (dap_log_level_get()<= L_INFO){
                char *l_hash_from_str = dap_chain_hash_fast_to_str_new(&l_ch_chain->request.hash_from);
                char *l_hash_to_str = dap_chain_hash_fast_to_str_new(&l_ch_chain->request.hash_to);
                log_it(L_INFO, "In:  SYNCED_CHAINS: between %s and %s",l_hash_from_str?l_hash_from_str:"(null)",
                       l_hash_to_str? l_hash_to_str: "(null)");
                if(l_hash_from_str)
                    DAP_DELETE(l_hash_from_str);
                if(l_hash_to_str)
                    DAP_DELETE(l_hash_to_str);
            }
            //l_ch_chain->state = CHAIN_STATE_IDLE;

        }
            break;
        case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS: {
            // fill ids
            if(l_chain_pkt_data_size == sizeof(dap_stream_ch_chain_sync_request_t)) {
                if(l_ch_chain->is_on_request){
                    log_it(L_WARNING, "Can't process SYNC_CHAINS request because its already busy with syncronization");
                    dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64,
                            l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64,
                            "ERROR_SYNC_REQUEST_ALREADY_IN_PROCESS");
                    dap_stream_ch_set_ready_to_write_unsafe(a_ch, true);
                    break;
                }
                l_ch_chain->is_on_request=true;
                memcpy(&l_ch_chain->request, l_chain_pkt->data, l_chain_pkt_data_size);
                memcpy(&l_ch_chain->request_hdr, &l_chain_pkt->hdr, sizeof(l_chain_pkt->hdr));
                char *l_hash_from_str = dap_chain_hash_fast_to_str_new(&l_ch_chain->request.hash_from);
                char *l_hash_to_str = dap_chain_hash_fast_to_str_new(&l_ch_chain->request.hash_to);
                log_it(L_INFO, "In:  SYNC_CHAINS pkt: net 0x%016x chain 0x%016x cell 0x%016x between %s and %s", l_ch_chain->request_hdr.net_id.uint64 ,
                       l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64,
                       l_hash_from_str? l_hash_from_str: "(null)", l_hash_to_str?l_hash_to_str:"(null)");
                dap_chain_t * l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id);
                if(l_chain) {
                    if(l_ch_chain->state != CHAIN_STATE_IDLE) {
                        l_ch_chain->is_on_request=false;
                        log_it(L_INFO, "Can't process SYNC_CHAINS request between %s and %s because not in idle state",
                               l_hash_from_str? l_hash_from_str:"(null)",
                               l_hash_to_str?l_hash_to_str:"(null)");
                        dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64,
                                l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64,
                                "ERROR_STATE_NOT_IN_IDLE");
                        dap_stream_ch_set_ready_to_write_unsafe(a_ch, true);
                    } else {
                        struct sync_request *l_sync_request = DAP_NEW_Z(struct sync_request);
                        l_sync_request->ch = a_ch;
                        l_sync_request->worker = a_ch->stream_worker->worker;
                        memcpy(&l_sync_request->request, &l_ch_chain->request, sizeof (l_ch_chain->request));
                        memcpy(&l_sync_request->request_hdr, &l_ch_chain->request_hdr, sizeof (l_ch_chain->request_hdr));
                        dap_proc_queue_add_callback_inter(  a_ch->stream_worker->worker->proc_queue_input, s_sync_out_chains_proc_callback, l_sync_request );
                    }
                }
                DAP_DELETE(l_hash_from_str);
                DAP_DELETE(l_hash_to_str);
            }else{
                log_it(L_WARNING, "DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS: Wrong chain packet size %zd when expected %zd",
                       l_chain_pkt_data_size, sizeof(l_ch_chain->request));
                dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64,
                        l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64,
                        "ERROR_CHAIN_PKT_DATA_SIZE" );
            }
        }
            break;
        case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB: {
            if(l_chain_pkt_data_size == sizeof(dap_stream_ch_chain_sync_request_t)) {
                if(l_ch_chain->state != CHAIN_STATE_IDLE) {
                    log_it(L_WARNING, "Can't process SYNC_GLOBAL_DB request because not in idle state");
                    dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64,
                            l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64,
                            "ERROR_STATE_NOT_IN_IDLE");
                    dap_stream_ch_set_ready_to_write_unsafe(a_ch, true);
                    break;
                } else { // receive the latest global_db revision of the remote node -> go to send mode
                    if(l_ch_chain->is_on_request){
                        log_it(L_WARNING, "Can't process SYNC_CHAINS request because its already busy with syncronization");
                        dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64,
                                l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64,
                                "ERROR_SYNC_REQUEST_ALREADY_IN_PROCESS");
                        dap_stream_ch_set_ready_to_write_unsafe(a_ch, true);
                        break;
                    }
                    l_ch_chain->is_on_request = true;
                    dap_stream_ch_chain_sync_request_t * l_request =
                            (dap_stream_ch_chain_sync_request_t *) l_chain_pkt->data;
                    memcpy(&l_ch_chain->request, l_request, l_chain_pkt_data_size);
                    memcpy(&l_ch_chain->request_hdr, &l_chain_pkt->hdr, sizeof(l_chain_pkt->hdr));
                    log_it(L_INFO, "In:  SYNC_GLOBAL_DB pkt: net 0x%016x chain 0x%016x cell 0x%016x, range between %u and %u",
                           l_ch_chain->request_hdr.net_id.uint64 , l_ch_chain->request_hdr.chain_id.uint64,
                           l_ch_chain->request_hdr.cell_id.uint64, l_ch_chain->request.id_start, l_ch_chain->request.id_end );

                    struct sync_request *l_sync_request = DAP_NEW_Z(struct sync_request);
                    l_sync_request->ch = a_ch;
                    l_sync_request->worker = a_ch->stream_worker->worker;
                    memcpy(&l_sync_request->request, &l_ch_chain->request, sizeof (l_ch_chain->request));
                    memcpy(&l_sync_request->request_hdr, &l_ch_chain->request_hdr, sizeof (l_ch_chain->request_hdr));
                    dap_proc_queue_add_callback_inter(a_ch->stream_worker->worker->proc_queue_input, s_sync_out_gdb_proc_callback, l_sync_request);
                }
            }else{
                log_it(L_WARNING, "DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB: Wrong chain packet size %zd when expected %zd", l_chain_pkt_data_size, sizeof(l_ch_chain->request));
                dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64,
                        l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64,
                        "ERROR_CHAIN_PKT_DATA_SIZE" );
            }
        }
            break;
        case DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN: {
            if(l_chain_pkt_data_size) {
                dap_chain_t * l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id);
                if(l_chain) {
                    // Expect atom element in
                    if(l_chain_pkt_data_size > 0) {
                        dap_chain_pkt_item_t *l_pkt_item = DAP_NEW_Z(dap_chain_pkt_item_t);
                        memcpy(&l_pkt_item->pkt_hdr, &l_chain_pkt->hdr, sizeof(l_chain_pkt->hdr));
                        l_pkt_item->pkt_data = DAP_NEW_Z_SIZE(byte_t, l_chain_pkt_data_size);
                        memcpy(l_pkt_item->pkt_data, l_chain_pkt->data, l_chain_pkt_data_size);
                        l_pkt_item->pkt_data_size = l_chain_pkt_data_size;

                        struct sync_request *l_sync_request = DAP_NEW_Z(struct sync_request);
                        l_sync_request->ch = a_ch;
                        l_sync_request->worker = a_ch->stream_worker->worker;
                        memcpy(&l_sync_request->request, &l_ch_chain->request, sizeof (l_ch_chain->request));
                        memcpy(&l_sync_request->request_hdr, &l_ch_chain->request_hdr, sizeof (l_ch_chain->request_hdr));

                        l_sync_request->pkt_list = dap_list_append(l_sync_request->pkt_list, l_pkt_item);
                        if (s_debug_more){
                            dap_chain_hash_fast_t l_atom_hash={0};
                            dap_hash_fast(l_chain_pkt->data, l_chain_pkt_data_size ,&l_atom_hash);
                            char *l_atom_hash_str= dap_chain_hash_fast_to_str_new(&l_atom_hash);
                            log_it(L_INFO, "In: CHAIN pkt: atom hash %s (size %zd)", l_atom_hash_str, l_chain_pkt_data_size);
                            DAP_DELETE(l_atom_hash_str);
                        }
                        dap_proc_queue_add_callback_inter(a_ch->stream_worker->worker->proc_queue_input, s_sync_in_chains_callback, l_sync_request);
                    } else {
                        log_it(L_WARNING, "Empty chain packet");
                        dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64,
                                l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64,
                                "ERROR_CHAIN_PACKET_EMPTY");
                        dap_stream_ch_set_ready_to_write_unsafe(a_ch, true);
                    }
                }
            }
        }
            break;
            // first packet of data with source node address
        case DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB:
            if(l_chain_pkt_data_size == sizeof(dap_chain_node_addr_t)){
                memcpy(&l_ch_chain->request.node_addr, l_chain_pkt->data, l_chain_pkt_data_size);
               log_it(L_INFO, "In: FIRST_GLOBAL_DB data_size=%d net 0x%016x chain 0x%016x cell 0x%016x from address "NODE_ADDR_FP_STR,
                      l_chain_pkt_data_size,   l_chain_pkt->hdr.net_id.uint64 ,
                      l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, NODE_ADDR_FP_ARGS_S(l_ch_chain->request.node_addr) );
            }else {
               log_it(L_WARNING,"Incorrect data size %zd in packet DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB", l_chain_pkt_data_size);
               dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64,
                       l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64,
                       "ERROR_CHAIN_PACKET_TYPE_FIRST_GLOBAL_DB_INCORRET_DATA_SIZE");
            }
        break;
        case DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB: {
            if(s_debug_more)
                log_it(L_INFO, "In: GLOBAL_DB data_size=%d ", l_chain_pkt_data_size);
            // get transaction and save it to global_db
            if(l_chain_pkt_data_size > 0) {
                dap_chain_pkt_item_t *l_pkt_item = DAP_NEW_Z(dap_chain_pkt_item_t);
                memcpy(&l_pkt_item->pkt_hdr, &l_chain_pkt->hdr, sizeof(l_chain_pkt->hdr));
                l_pkt_item->pkt_data = DAP_NEW_Z_SIZE(byte_t, l_chain_pkt_data_size);
                memcpy(l_pkt_item->pkt_data, l_chain_pkt->data, l_chain_pkt_data_size);
                l_pkt_item->pkt_data_size = l_chain_pkt_data_size;

                struct sync_request *l_sync_request = DAP_NEW_Z(struct sync_request);
                l_sync_request->ch = a_ch;
                l_sync_request->worker = a_ch->stream_worker->worker;
                memcpy(&l_sync_request->request, &l_ch_chain->request, sizeof (l_ch_chain->request));
                memcpy(&l_sync_request->request_hdr, &l_ch_chain->request_hdr, sizeof (l_ch_chain->request_hdr));
                l_sync_request->pkt_list = dap_list_append(l_sync_request->pkt_list, l_pkt_item);
                dap_proc_queue_add_callback_inter(a_ch->stream_worker->worker->proc_queue_input, s_gdb_in_pkt_proc_callback, l_sync_request);
            } else {
                log_it(L_WARNING, "Packet with GLOBAL_DB atom has zero body size");
                dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64,
                        l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64,
                        "ERROR_GLOBAL_DB_PACKET_EMPTY");
                dap_stream_ch_set_ready_to_write_unsafe(a_ch, true);
            }
        }
            break;
        case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB_RVRS: {
            dap_stream_ch_chain_sync_request_t l_sync_gdb = {0};
            memcpy(&l_sync_gdb, l_chain_pkt->data, l_chain_pkt_data_size);
            l_sync_gdb.id_start = dap_db_get_last_id_remote(l_sync_gdb.node_addr.uint64);
            dap_chain_net_t *l_net = dap_chain_net_by_id(l_chain_pkt->hdr.net_id);
            l_sync_gdb.node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net);
            log_it(L_INFO, "In:  SYNC_GLOBAL_DB_RVRS pkt: net 0x%016x chain 0x%016x cell 0x%016x, request gdb sync from %u", l_chain_pkt->hdr.net_id.uint64 ,
                    l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id, l_sync_gdb.id_start );
            dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB, l_chain_pkt->hdr.net_id.uint64,
                                          l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, &l_sync_gdb, sizeof(l_sync_gdb));
        }
            break;
        case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS_RVRS: {
            if(l_chain_pkt_data_size == sizeof(dap_stream_ch_chain_sync_request_t)) {
                dap_stream_ch_chain_sync_request_t l_request={0};
                dap_chain_t *l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id);
                if( l_chain){
                    dap_chain_get_atom_last_hash(l_chain,& l_request.hash_from); // Move away from i/o reactor to callback processor
                    if( dap_log_level_get()<= L_INFO){
                        char l_hash_from_str[70]={[0]='\0'};
                        dap_chain_hash_fast_to_str(&l_request.hash_from,l_hash_from_str,sizeof (l_hash_from_str)-1);
                        log_it(L_INFO, "In:  SYNC_CHAINS_RVRS pkt: net 0x%016x chain 0x%016x cell 0x%016x request chains sync from %s",
                               l_chain_pkt->hdr.net_id.uint64 , l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64,
                               l_hash_from_str[0] ? l_hash_from_str :"(null)");
                    }
                    dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS, l_chain_pkt->hdr.net_id.uint64,
                                                  l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, &l_request, sizeof(l_request));
                }
            }else{
                log_it(L_WARNING, "DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS_RVRS: Wrong chain packet size %zd when expected %zd", l_chain_pkt_data_size, sizeof(l_ch_chain->request));
                dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64,
                        l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64,
                        "ERROR_CHAIN_PKT_DATA_SIZE" );
            }
        }
            break;
        case DAP_STREAM_CH_CHAIN_PKT_TYPE_ERROR:{
            char * l_error_str = (char*)l_chain_pkt->data;
            if(l_chain_pkt_data_size>1)
                l_error_str[l_chain_pkt_data_size-1]='\0'; // To be sure that nobody sends us garbage
                                                           // without trailing zero
            log_it(L_WARNING,"In: ERROR packet: '%s'",l_ch_chain->node_client, l_chain_pkt_data_size>1?
                       l_error_str:"<empty>");
        }break;
        default: {
            dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64,
                                                l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64,
                                                "ERROR_UNKNOWN_CHAIN_PKT_TYPE");
            }
    }
    if(l_ch_chain->callback_notify_packet_in)
        l_ch_chain->callback_notify_packet_in(l_ch_chain, l_ch_pkt->hdr.type, l_chain_pkt,
                                              l_chain_pkt_data_size, l_ch_chain->callback_notify_arg);
}


/**
 * @brief dap_stream_ch_chain_go_idle
 * @param a_ch_chain
 */
void dap_stream_ch_chain_go_idle ( dap_stream_ch_chain_t * a_ch_chain)
{
    a_ch_chain->is_on_request = false;
    a_ch_chain->state = CHAIN_STATE_IDLE;
    if(s_debug_more)
        log_it(L_INFO, "Go in CHAIN_STATE_IDLE");

    // Cleanup after request
    memset(&a_ch_chain->request, 0, sizeof(a_ch_chain->request));
    memset(&a_ch_chain->request_hdr, 0, sizeof(a_ch_chain->request_hdr));
    if(a_ch_chain->request_atom_iter)
        if(a_ch_chain->request_atom_iter->chain)
            if(a_ch_chain->request_atom_iter->chain->callback_atom_iter_delete){
                a_ch_chain->request_atom_iter->chain->callback_atom_iter_delete(a_ch_chain->request_atom_iter);
                return;
            }
    DAP_DEL_Z(a_ch_chain->request_atom_iter);
}

static void s_process_gdb_iter(dap_stream_ch_t *a_ch)
{
    dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch);
    dap_db_log_list_t *l_db_list = l_ch_chain->request_global_db_trs;
    dap_store_obj_pkt_t *l_pkt = (dap_store_obj_pkt_t *)l_ch_chain->request_db_iter->data;
    uint32_t l_pkt_size = sizeof(dap_store_obj_pkt_t) + l_pkt->data_size;
    if( s_debug_more)
        log_it(L_INFO, "Send one global_db record packet len=%d (rest=%d/%d items)", l_pkt_size,
           dap_db_log_list_get_count_rest(l_db_list), dap_db_log_list_get_count(l_db_list));
    dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB,
                                         l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64,
                                         l_ch_chain->request_hdr.cell_id.uint64, l_pkt, l_pkt_size);
    dap_list_t *l_iter = dap_list_next(l_ch_chain->request_db_iter);
    if (l_iter) {
        l_ch_chain->request_db_iter = l_iter;
    } else {
        l_ch_chain->stats_request_gdb_processed++;
        l_ch_chain->request_db_iter = dap_list_first(l_ch_chain->request_db_iter);
        dap_list_free_full(l_ch_chain->request_db_iter, free);
        l_ch_chain->request_db_iter = NULL;
    }
}

/**
 * @brief s_stream_ch_packet_out
 * @param ch
 * @param arg
 */
void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg)
{
    UNUSED(a_arg);

    dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch);

    switch (l_ch_chain->state) {
        // Synchronize GDB
        case CHAIN_STATE_SYNC_GLOBAL_DB: {
            if (l_ch_chain->request_db_iter) {
                s_process_gdb_iter(a_ch);
            } else {
                dap_global_db_obj_t *l_obj;
                do { // Get log diff
                    size_t l_item_size_out = 0;
                    l_obj = dap_db_log_list_get(l_ch_chain->request_global_db_trs);
                    l_ch_chain->request_db_iter = dap_db_log_pack(l_obj, &l_item_size_out);
                    if (l_ch_chain->request_db_iter && l_item_size_out) {
                        break;
                    }
                    // Item not found, maybe it has deleted? Then go to the next item
                } while (l_obj);
                if (l_ch_chain->request_db_iter) {
                    s_process_gdb_iter(a_ch);
                } else {
                    // free log list
                    dap_db_log_list_delete(l_ch_chain->request_global_db_trs);
                    l_ch_chain->request_global_db_trs = NULL;
                    log_it( L_INFO,"Syncronized database:  last id %llu, items syncronyzed %llu ", dap_db_log_get_last_id(),
                        l_ch_chain->stats_request_gdb_processed );
                    // last message
                    l_ch_chain->is_on_request = false;
                    dap_stream_ch_chain_sync_request_t l_request = {};
                    dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB,
                                                         l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64,
                                                         l_ch_chain->request_hdr.cell_id.uint64, &l_request, sizeof(l_request));
                    dap_stream_ch_chain_go_idle(l_ch_chain);
                    if (l_ch_chain->callback_notify_packet_out)
                        l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB,
                                                               NULL, 0, l_ch_chain->callback_notify_arg);
                }
            }
        } break;

        // Update list of atoms to remote
        case CHAIN_STATE_UPDATE_CHAINS_REMOTE:{
            dap_stream_ch_chain_update_element_t * l_data= DAP_NEW_Z_SIZE(dap_stream_ch_chain_update_element_t,sizeof (dap_stream_ch_chain_update_element_t)*s_update_pack_size);
            size_t l_data_size=0;
            for(uint_fast16_t n=0; n<s_update_pack_size && (l_ch_chain->request_atom_iter && l_ch_chain->request_atom_iter->cur);n++){
                memcpy(&l_data[n].hash, l_ch_chain->request_atom_iter->cur_hash, sizeof (l_data[n].hash));
                // Shift offset counter
                l_data_size += sizeof (dap_stream_ch_chain_update_element_t);
                // Then get next atom
                l_ch_chain->request_atom_iter->chain->callback_atom_iter_get_next(l_ch_chain->request_atom_iter, NULL);
            }
            if (l_data_size){
                if(s_debug_more)
                    log_it(L_DEBUG,"Out: UPDATE_CHAINS size %zd sent ",l_data_size);
                dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS,
                                                     l_ch_chain->request_hdr.net_id.uint64,
                                                     l_ch_chain->request_hdr.chain_id.uint64,
                                                     l_ch_chain->request_hdr.cell_id.uint64,
                                                     l_data,l_data_size);
            }
            if(!l_data_size  ||  !l_ch_chain->request_atom_iter){ // We over with all the hashes here
                if(s_debug_more)
                    log_it(L_INFO,"Out: UPDATE_CHAINS_END sent ");

                if (l_ch_chain->request_updates_complete){
                    dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_REQ ,
                                                     l_ch_chain->request_hdr.net_id.uint64,
                                                     l_ch_chain->request_hdr.chain_id.uint64,
                                                     l_ch_chain->request_hdr.cell_id.uint64,
                                                     NULL,0);
                }else
                    dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_END,
                                                     l_ch_chain->request_hdr.net_id.uint64,
                                                     l_ch_chain->request_hdr.chain_id.uint64,
                                                     l_ch_chain->request_hdr.cell_id.uint64,
                                                     NULL,0);
               dap_stream_ch_set_ready_to_write_unsafe(a_ch,false);
               l_ch_chain->state = CHAIN_STATE_SYNC_CHAINS;
            }
        }break;

        // Synchronize chains
        case CHAIN_STATE_SYNC_CHAINS: {
            bool l_was_sent_smth=false;
            // Process one chain from l_ch_chain->request_atom_iter
            // Pack loop to skip quicker
            for(uint_fast16_t k=0; k<s_skip_in_reactor_count && l_ch_chain->request_atom_iter && l_ch_chain->request_atom_iter->cur; k++){
                // Check if present and skip if present
                dap_stream_ch_chain_hash_item_t * l_hash_item = NULL;
                HASH_FIND(hh,l_ch_chain->remote_atoms, l_ch_chain->request_atom_iter->cur_hash , sizeof (l_hash_item->hash), l_hash_item );
                if( l_hash_item ){ // If found - skip it
                    if(s_debug_more){
                        char l_request_atom_hash_str[81]={[0]='\0'};
                        dap_chain_hash_fast_to_str(l_ch_chain->request_atom_iter->cur_hash,l_request_atom_hash_str,sizeof (l_request_atom_hash_str));
                        log_it(L_DEBUG, "Out CHAIN: skip atom hash %s because its already present in remote atom hash table",
                                        l_request_atom_hash_str);
                    }
                }else{
                    l_hash_item = DAP_NEW_Z(dap_stream_ch_chain_hash_item_t);
                    if(s_debug_more){
                        dap_hash_fast(l_ch_chain->request_atom_iter->cur, l_ch_chain->request_atom_iter->cur_size,&l_hash_item->hash);
                        char *l_atom_hash_str= dap_chain_hash_fast_to_str_new(&l_hash_item->hash);

                        log_it(L_INFO, "Out CHAIN pkt: atom hash %s (size %zd) ", l_atom_hash_str, l_ch_chain->request_atom_iter->cur_size);
                        DAP_DELETE(l_atom_hash_str);
                    }
                    dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN, l_ch_chain->request_hdr.net_id.uint64,
                                                         l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64,
                                                         l_ch_chain->request_atom_iter->cur, l_ch_chain->request_atom_iter->cur_size);
                    l_was_sent_smth = true;
                    l_ch_chain->stats_request_atoms_processed++;

                    l_hash_item->size = l_ch_chain->request_atom_iter->cur_size;
                    // Because we sent this atom to remote - we record it to not to send it twice
                    HASH_ADD(hh, l_ch_chain->remote_atoms, hash, sizeof (l_hash_item->hash), l_hash_item);
                    break; // If sent smth - break out from pack loop
                }
                // Then get next atom and populate new last
                l_ch_chain->request_atom_iter->chain->callback_atom_iter_get_next(l_ch_chain->request_atom_iter, NULL);
            }
            if(!l_ch_chain->request_atom_iter ||
                    ( l_ch_chain->request_atom_iter &&(! l_ch_chain->request_atom_iter->cur) ) )  { // All chains synced
                dap_stream_ch_chain_sync_request_t l_request = {0};
                // last message
                l_was_sent_smth = true;
                dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS,
                                                     l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64,
                                                     l_ch_chain->request_hdr.cell_id.uint64, &l_request, sizeof(l_request));
                log_it( L_INFO,"Synced: %llu atoms processed", l_ch_chain->stats_request_atoms_processed);
                dap_stream_ch_chain_go_idle(l_ch_chain);
                if (l_ch_chain->callback_notify_packet_out)
                    l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS, NULL,
                                                           0, l_ch_chain->callback_notify_arg);
            }
            if (! l_was_sent_smth ){
                // Sending dumb packet with nothing to inform remote thats we're just skiping atoms, nothing freezed
                dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_TSD,
                                                     l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64,
                                                     l_ch_chain->request_hdr.cell_id.uint64, NULL, 0);
            }
        } break;
        default: break;
    }
}