From 782eda8972474ea993e6b53b431cda97877c9b0b Mon Sep 17 00:00:00 2001 From: "roman.khlopkov" <roman.khlopkov@demlabs.net> Date: Mon, 12 Feb 2024 12:26:53 +0300 Subject: [PATCH] [*] Validators clustering refinement --- modules/chain/dap_chain.c | 4 +- modules/chain/dap_chain_ch.c | 218 +++++++++--------- modules/chain/dap_chain_ch_pkt.c | 44 ++-- modules/chain/include/dap_chain_ch.h | 38 +-- modules/chain/include/dap_chain_ch_pkt.h | 44 ++-- .../dap_stream_ch_chain_net_srv.c | 2 +- .../chain-net/dap_stream_ch_chain_net.c | 8 +- .../include/dap_stream_ch_chain_net.h | 4 +- .../consensus/esbocs/dap_chain_cs_esbocs.c | 76 ++++-- .../esbocs/include/dap_chain_cs_esbocs.h | 2 + modules/net/dap_chain_net.c | 16 +- modules/net/dap_chain_node_cli_cmd.c | 8 +- modules/net/dap_chain_node_client.c | 34 +-- modules/net/include/dap_chain_net.h | 1 + .../dap_chain_net_srv_stake_pos_delegate.c | 11 +- .../dap_chain_net_srv_stake_pos_delegate.h | 2 +- 16 files changed, 280 insertions(+), 232 deletions(-) diff --git a/modules/chain/dap_chain.c b/modules/chain/dap_chain.c index f22458c787..d6091478ba 100644 --- a/modules/chain/dap_chain.c +++ b/modules/chain/dap_chain.c @@ -713,8 +713,8 @@ ssize_t dap_chain_atom_save(dap_chain_cell_t *a_chain_cell, const uint8_t *a_ato if (a_new_atom_hash) { // Atom is new and need to be distributed for the net dap_cluster_t *l_net_cluster = dap_cluster_find(l_chain->net_id.uint64); if (l_net_cluster) { - size_t l_pkt_size = a_atom_size + sizeof(dap_stream_ch_chain_pkt_t); - dap_stream_ch_chain_pkt_t *l_pkt = DAP_NEW_Z_SIZE(dap_stream_ch_chain_pkt_t, l_pkt_size); + size_t l_pkt_size = a_atom_size + sizeof(dap_chain_ch_pkt_t); + dap_chain_ch_pkt_t *l_pkt = DAP_NEW_Z_SIZE(dap_chain_ch_pkt_t, l_pkt_size); if (l_pkt) { l_pkt->hdr.version = 2; l_pkt->hdr.data_size = a_atom_size; diff --git a/modules/chain/dap_chain_ch.c b/modules/chain/dap_chain_ch.c index 0b59fcd393..2bff87638e 100644 --- a/modules/chain/dap_chain_ch.c +++ b/modules/chain/dap_chain_ch.c @@ -68,18 +68,18 @@ #include "dap_chain_ch_pkt.h" #include "dap_stream_ch_gossip.h" -#define LOG_TAG "dap_stream_ch_chain" +#define LOG_TAG "dap_chain_ch" struct sync_request { dap_worker_t * worker; dap_stream_ch_uuid_t ch_uuid; - dap_stream_ch_chain_sync_request_t request; - dap_stream_ch_chain_pkt_hdr_t request_hdr; + dap_chain_ch_sync_request_t request; + dap_chain_ch_pkt_hdr_t request_hdr; dap_chain_pkt_item_t pkt; - dap_stream_ch_chain_hash_item_t *remote_atoms; // Remote atoms - dap_stream_ch_chain_hash_item_t *remote_gdbs; // Remote gdbs + dap_chain_ch_hash_item_t *remote_atoms; // Remote atoms + dap_chain_ch_hash_item_t *remote_gdbs; // Remote gdbs uint64_t stats_request_elemets_processed; int last_err; @@ -95,8 +95,8 @@ struct sync_request }; }; -static void s_ch_chain_go_idle(dap_stream_ch_chain_t *a_ch_chain); -static inline bool s_ch_chain_get_idle(dap_stream_ch_chain_t *a_ch_chain) { return a_ch_chain->state == CHAIN_STATE_IDLE; } +static void s_ch_chain_go_idle(dap_chain_ch_t *a_ch_chain); +static inline bool s_ch_chain_get_idle(dap_chain_ch_t *a_ch_chain) { return a_ch_chain->state == CHAIN_STATE_IDLE; } static void s_stream_ch_new(dap_stream_ch_t* a_ch, void* a_arg); static void s_stream_ch_delete(dap_stream_ch_t* a_ch, void* a_arg); @@ -119,7 +119,7 @@ static bool s_gdb_in_pkt_proc_set_raw_callback(dap_global_db_instance_t *a_dbi, const size_t a_values_total, const size_t a_values_count, dap_store_obj_t *a_values, void *a_arg); static void s_gdb_in_pkt_error_worker_callback(dap_worker_t *a_thread, void *a_arg); -static void s_free_log_list_gdb ( dap_stream_ch_chain_t * a_ch_chain); +static void s_free_log_list_gdb ( dap_chain_ch_t * a_ch_chain); static void s_stream_ch_chain_pkt_write(dap_stream_ch_t *a_ch, uint8_t a_type, uint64_t a_net_id, uint64_t a_chain_id, uint64_t a_cell_id, @@ -134,16 +134,16 @@ static uint_fast16_t s_skip_in_reactor_count=50; // Number of hashes packed to s enum {MEMSTAT$K_STM_CH_CHAIN, MEMSTAT$K_NR}; static dap_memstat_rec_t s_memstat [MEMSTAT$K_NR] = { - {.fac_len = sizeof(LOG_TAG) - 1, .fac_name = {LOG_TAG}, .alloc_sz = sizeof(dap_stream_ch_chain_t)}, + {.fac_len = sizeof(LOG_TAG) - 1, .fac_name = {LOG_TAG}, .alloc_sz = sizeof(dap_chain_ch_t)}, }; #endif /** - * @brief dap_stream_ch_chain_init + * @brief dap_chain_ch_init * @return */ -int dap_stream_ch_chain_init() +int dap_chain_ch_init() { log_it(L_NOTICE, "Chains and global db exchange channel initialized"); dap_stream_ch_proc_add(DAP_STREAM_CH_CHAIN_ID, s_stream_ch_new, s_stream_ch_delete, s_stream_ch_packet_in, @@ -159,9 +159,9 @@ int dap_stream_ch_chain_init() } /** - * @brief dap_stream_ch_chain_deinit + * @brief dap_chain_ch_deinit */ -void dap_stream_ch_chain_deinit() +void dap_chain_ch_deinit() { } @@ -174,11 +174,11 @@ void dap_stream_ch_chain_deinit() void s_stream_ch_new(dap_stream_ch_t* a_ch, void* a_arg) { UNUSED(a_arg); - if (!(a_ch->internal = DAP_NEW_Z(dap_stream_ch_chain_t))) { + if (!(a_ch->internal = DAP_NEW_Z(dap_chain_ch_t))) { log_it(L_CRITICAL, "Memory allocation error"); return; }; - dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch); + dap_chain_ch_t *l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch); l_ch_chain->_inheritor = a_ch; a_ch->stream->esocket->callbacks.write_finished_callback = s_stream_ch_io_complete; @@ -196,7 +196,7 @@ void s_stream_ch_new(dap_stream_ch_t* a_ch, void* a_arg) static void s_stream_ch_delete(dap_stream_ch_t *a_ch, void *a_arg) { UNUSED(a_arg); - dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch); + dap_chain_ch_t *l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch); if (l_ch_chain->callback_notify_packet_out) l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_DELETE, NULL, 0, l_ch_chain->callback_notify_arg); @@ -209,7 +209,7 @@ static void s_stream_ch_delete(dap_stream_ch_t *a_ch, void *a_arg) #endif } -void dap_stream_ch_chain_reset_unsafe(dap_stream_ch_chain_t *a_ch_chain) +void dap_chain_ch_reset_unsafe(dap_chain_ch_t *a_ch_chain) { if (!a_ch_chain) return; @@ -253,7 +253,7 @@ static void s_sync_out_chains_first_worker_callback(dap_worker_t *a_worker, void return; } - dap_stream_ch_chain_t * l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); + dap_chain_ch_t * l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); if (!l_ch_chain) { log_it(L_CRITICAL, "Channel without chain, dump it"); s_sync_request_delete(l_sync_request); @@ -271,7 +271,7 @@ static void s_sync_out_chains_first_worker_callback(dap_worker_t *a_worker, void if (s_debug_more ) log_it(L_INFO,"Out: DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_CHAIN"); - dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_CHAIN, + dap_chain_ch_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_CHAIN, l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64, &g_node_addr, sizeof(dap_chain_node_addr_t)); DAP_DELETE(l_sync_request); @@ -293,7 +293,7 @@ static void s_sync_out_chains_last_worker_callback(dap_worker_t *a_worker, void return; } - dap_stream_ch_chain_t * l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); + dap_chain_ch_t * l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); if (!l_ch_chain) { log_it(L_CRITICAL, "Channel without chain, dump it"); s_sync_request_delete(l_sync_request); @@ -301,10 +301,10 @@ static void s_sync_out_chains_last_worker_callback(dap_worker_t *a_worker, void } l_ch_chain->request_atom_iter = l_sync_request->chain.request_atom_iter; // last packet - dap_stream_ch_chain_sync_request_t l_request = {}; + dap_chain_ch_sync_request_t l_request = {}; if (s_debug_more ) log_it(L_INFO,"Out: DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS"); - dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS, + dap_chain_ch_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS, l_sync_request->request_hdr.net_id.uint64, l_sync_request->request_hdr.chain_id.uint64, l_sync_request->request_hdr.cell_id.uint64, &l_request, sizeof(l_request)); s_ch_chain_go_idle(l_ch_chain); @@ -359,7 +359,7 @@ static void s_sync_out_gdb_first_worker_callback(dap_worker_t *a_worker, void *a return; } - dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN( l_ch ); + dap_chain_ch_t *l_ch_chain = DAP_STREAM_CH_CHAIN( l_ch ); if (!l_ch_chain) { log_it(L_CRITICAL, "Channel without chain, dump it"); @@ -377,7 +377,7 @@ static void s_sync_out_gdb_first_worker_callback(dap_worker_t *a_worker, void *a l_ch_chain->state = CHAIN_STATE_SYNC_GLOBAL_DB; if (s_debug_more ) log_it(L_INFO,"Out: DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB"); - dap_stream_ch_chain_pkt_write_unsafe(DAP_STREAM_CH(l_ch_chain), DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB, + dap_chain_ch_pkt_write_unsafe(DAP_STREAM_CH(l_ch_chain), DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB, l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64, &g_node_addr, sizeof(dap_chain_node_addr_t)); if(l_ch_chain->callback_notify_packet_out) @@ -404,7 +404,7 @@ static void s_sync_out_gdb_last_worker_callback(dap_worker_t *a_worker, void *a_ return; } - dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN( l_ch ); + dap_chain_ch_t *l_ch_chain = DAP_STREAM_CH_CHAIN( l_ch ); if (!l_ch_chain) { log_it(L_CRITICAL, "Channel without chain, dump it"); s_sync_request_delete(l_sync_request); @@ -414,7 +414,7 @@ static void s_sync_out_gdb_last_worker_callback(dap_worker_t *a_worker, void *a_ if (s_debug_more ) log_it(L_INFO,"Out: DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB"); - dap_stream_ch_chain_pkt_write_unsafe(DAP_STREAM_CH(l_ch_chain), DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, + dap_chain_ch_pkt_write_unsafe(DAP_STREAM_CH(l_ch_chain), DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64, NULL, 0); s_ch_chain_go_idle(l_ch_chain); @@ -441,7 +441,7 @@ static bool s_sync_out_gdb_proc_callback(void *a_arg) s_sync_request_delete(l_sync_request); return true; } - dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); + dap_chain_ch_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); if (!l_ch_chain) { log_it(L_CRITICAL, "Channel without chain, dump it"); s_sync_request_delete(l_sync_request); @@ -478,7 +478,7 @@ static void s_sync_update_gdb_start_worker_callback(dap_worker_t *a_worker, void s_sync_request_delete(l_sync_request); return; } - dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_START, + dap_chain_ch_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_START, l_sync_request->request_hdr.net_id.uint64, l_sync_request->request_hdr.chain_id.uint64, l_sync_request->request_hdr.cell_id.uint64, NULL, 0); if (s_debug_more) @@ -510,7 +510,7 @@ static bool s_sync_update_gdb_proc_callback(void *a_arg) return true; } - dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); + dap_chain_ch_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); int l_flags = 0; if (dap_chain_net_get_extra_gdb_group(l_net, l_sync_request->request.node_addr)) l_flags |= F_DB_LOG_ADD_EXTRA_GROUPS; @@ -540,7 +540,7 @@ static bool s_gdb_in_pkt_proc_callback(void *a_arg) */ static bool s_sync_in_chains_callback(void *a_arg) { - dap_stream_ch_chain_pkt_t *l_chain_pkt = a_arg; + dap_chain_ch_pkt_t *l_chain_pkt = a_arg; if (!l_chain_pkt || !l_chain_pkt->hdr.data_size) { log_it(L_CRITICAL, "Proc thread received corrupted chain packet!"); return false; @@ -591,13 +591,13 @@ static bool s_sync_in_chains_callback(void *a_arg) static void s_gossip_payload_callback(void *a_payload, size_t a_payload_size, dap_stream_node_addr_t UNUSED_ARG a_sender_addr) { assert(a_payload && a_payload_size); - dap_stream_ch_chain_pkt_t *l_chain_pkt = a_payload; - if (a_payload_size <= sizeof(dap_stream_ch_chain_pkt_t) || - a_payload_size != sizeof(dap_stream_ch_chain_pkt_t) + l_chain_pkt->hdr.data_size) { + dap_chain_ch_pkt_t *l_chain_pkt = a_payload; + if (a_payload_size <= sizeof(dap_chain_ch_pkt_t) || + a_payload_size != sizeof(dap_chain_ch_pkt_t) + l_chain_pkt->hdr.data_size) { log_it(L_WARNING, "Incorrect chain GOSSIP packet size"); return; } - dap_stream_ch_chain_pkt_t *l_chain_pkt_copy = DAP_DUP_SIZE(a_payload, a_payload_size); + dap_chain_ch_pkt_t *l_chain_pkt_copy = DAP_DUP_SIZE(a_payload, a_payload_size); if (!l_chain_pkt_copy) { log_it(L_CRITICAL, "Not enough memory"); return; @@ -618,7 +618,7 @@ static void s_gdb_in_pkt_error_worker_callback(dap_worker_t *a_worker, void *a_a if( l_ch == NULL ) { log_it(L_INFO,"Client disconnected before we sent the reply"); } else { - dap_stream_ch_chain_pkt_write_error_unsafe(l_ch, l_sync_request->request_hdr.net_id.uint64, + dap_chain_ch_pkt_write_error_unsafe(l_ch, l_sync_request->request_hdr.net_id.uint64, l_sync_request->request_hdr.chain_id.uint64, l_sync_request->request_hdr.cell_id.uint64, "%s : err %d", "ERROR_GLOBAL_DB_INTERNAL_NOT_SAVED", l_sync_request->last_err); @@ -627,13 +627,13 @@ static void s_gdb_in_pkt_error_worker_callback(dap_worker_t *a_worker, void *a_a } /** - * @brief dap_stream_ch_chain_create_sync_request_gdb + * @brief dap_chain_ch_create_sync_request_gdb * @param a_ch_chain * @param a_net */ -struct sync_request *dap_stream_ch_chain_create_sync_request(dap_stream_ch_chain_pkt_t *a_chain_pkt, dap_stream_ch_t* a_ch) +struct sync_request *dap_chain_ch_create_sync_request(dap_chain_ch_pkt_t *a_chain_pkt, dap_stream_ch_t* a_ch) { - dap_stream_ch_chain_t * l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch); + dap_chain_ch_t * l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch); if (!l_ch_chain) { log_it(L_CRITICAL, "Channel without chain, dump it"); return NULL; @@ -655,13 +655,13 @@ struct sync_request *dap_stream_ch_chain_create_sync_request(dap_stream_ch_chain static void s_stream_ch_write_error_unsafe(dap_stream_ch_t *a_ch, uint64_t a_net_id, uint64_t a_chain_id, uint64_t a_cell_id, const char * a_err_string) { - dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch); + dap_chain_ch_t *l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch); if (!l_ch_chain) { log_it(L_CRITICAL, "Channel without chain, dump it"); return; } s_ch_chain_go_idle(l_ch_chain); - dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, a_net_id, a_chain_id, a_cell_id, "%s", a_err_string); + dap_chain_ch_pkt_write_error_unsafe(a_ch, a_net_id, a_chain_id, a_cell_id, "%s", a_err_string); } static bool s_chain_timer_callback(void *a_arg) @@ -672,7 +672,7 @@ static bool s_chain_timer_callback(void *a_arg) DAP_DELETE(a_arg); return false; } - dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); + dap_chain_ch_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); if (!l_ch_chain) { log_it(L_CRITICAL, "Channel without chain, dump it"); DAP_DELETE(a_arg); @@ -697,7 +697,7 @@ static bool s_chain_timer_callback(void *a_arg) // Sending dumb packet with nothing to inform remote thats we're just skiping atoms of GDB's, nothing freezed if (l_ch_chain->state == CHAIN_STATE_SYNC_CHAINS && l_ch_chain->sent_breaks >= 3 * DAP_SYNC_TICKS_PER_SECOND) { debug_if(s_debug_more, L_INFO, "Send one chain TSD packet"); - dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_TSD, + dap_chain_ch_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_TSD, l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64, NULL, 0); l_ch_chain->sent_breaks = 0; @@ -706,14 +706,14 @@ static bool s_chain_timer_callback(void *a_arg) return true; } -static void s_chain_timer_reset(dap_stream_ch_chain_t *a_ch_chain) +static void s_chain_timer_reset(dap_chain_ch_t *a_ch_chain) { a_ch_chain->timer_shots = 0; if (!a_ch_chain->activity_timer) - dap_stream_ch_chain_timer_start(a_ch_chain); + dap_chain_ch_timer_start(a_ch_chain); } -void dap_stream_ch_chain_timer_start(dap_stream_ch_chain_t *a_ch_chain) +void dap_chain_ch_timer_start(dap_chain_ch_t *a_ch_chain) { dap_stream_ch_uuid_t *l_uuid = DAP_DUP(&DAP_STREAM_CH(a_ch_chain)->uuid); a_ch_chain->activity_timer = dap_timerfd_start_on_worker(DAP_STREAM_CH(a_ch_chain)->stream_worker->worker, @@ -729,13 +729,13 @@ void dap_stream_ch_chain_timer_start(dap_stream_ch_chain_t *a_ch_chain) */ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) { - dap_stream_ch_chain_t * l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch); + dap_chain_ch_t * l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch); if (!l_ch_chain || l_ch_chain->_inheritor != a_ch) { log_it(L_ERROR, "No chain in channel, returning"); return; } dap_stream_ch_pkt_t * l_ch_pkt = (dap_stream_ch_pkt_t *) a_arg; - dap_stream_ch_chain_pkt_t * l_chain_pkt = (dap_stream_ch_chain_pkt_t *) l_ch_pkt->data; + dap_chain_ch_pkt_t * l_chain_pkt = (dap_chain_ch_pkt_t *) l_ch_pkt->data; if (!l_chain_pkt) { log_it(L_ERROR, "No chain packet in channel packet, returning"); return; @@ -759,17 +759,17 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_REQ:{ if(l_ch_chain->state != CHAIN_STATE_IDLE){ log_it(L_WARNING, "Can't process UPDATE_GLOBAL_DB_REQ request because its already busy with syncronization"); - dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + dap_chain_ch_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, "ERROR_SYNC_REQUEST_ALREADY_IN_PROCESS"); break; } log_it(L_INFO, "In: UPDATE_GLOBAL_DB_REQ pkt: net 0x%016"DAP_UINT64_FORMAT_x" chain 0x%016"DAP_UINT64_FORMAT_x" cell 0x%016"DAP_UINT64_FORMAT_x, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64); - if (l_chain_pkt_data_size == (size_t)sizeof(dap_stream_ch_chain_sync_request_t)) - l_ch_chain->request = *(dap_stream_ch_chain_sync_request_t*)l_chain_pkt->data; + if (l_chain_pkt_data_size == (size_t)sizeof(dap_chain_ch_sync_request_t)) + l_ch_chain->request = *(dap_chain_ch_sync_request_t*)l_chain_pkt->data; l_ch_chain->request.id_start = 0; - struct sync_request *l_sync_request = dap_stream_ch_chain_create_sync_request(l_chain_pkt, a_ch); + struct sync_request *l_sync_request = dap_chain_ch_create_sync_request(l_chain_pkt, a_ch); l_ch_chain->stats_request_gdb_processed = 0; l_ch_chain->request_hdr = l_chain_pkt->hdr; dap_proc_thread_callback_add(a_ch->stream_worker->worker->proc_queue_input, s_sync_update_gdb_proc_callback, l_sync_request); @@ -788,7 +788,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64); if (l_ch_chain->state != CHAIN_STATE_IDLE){ log_it(L_WARNING, "Can't process UPDATE_GLOBAL_DB_START request because its already busy with syncronization"); - dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + dap_chain_ch_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, "ERROR_SYNC_REQUEST_ALREADY_IN_PROCESS"); break; @@ -801,23 +801,23 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) if(s_debug_more) log_it(L_INFO, "In: UPDATE_GLOBAL_DB pkt data_size=%zu", l_chain_pkt_data_size); if (l_ch_chain->state != CHAIN_STATE_UPDATE_GLOBAL_DB_REMOTE || - memcmp(&l_ch_chain->request_hdr, &l_chain_pkt->hdr, sizeof(dap_stream_ch_chain_pkt_t))) { + memcmp(&l_ch_chain->request_hdr, &l_chain_pkt->hdr, sizeof(dap_chain_ch_pkt_t))) { log_it(L_WARNING, "Can't process UPDATE_GLOBAL_DB request because its already busy with syncronization"); s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, "ERROR_SYNC_REQUEST_ALREADY_IN_PROCESS"); break; } - for ( dap_stream_ch_chain_update_element_t * l_element =(dap_stream_ch_chain_update_element_t *) l_chain_pkt->data; + for ( dap_chain_ch_update_element_t * l_element =(dap_chain_ch_update_element_t *) l_chain_pkt->data; (size_t) (((byte_t*)l_element) - l_chain_pkt->data ) < l_chain_pkt_data_size; l_element++){ - dap_stream_ch_chain_hash_item_t * l_hash_item = NULL; + dap_chain_ch_hash_item_t * l_hash_item = NULL; unsigned l_hash_item_hashv; HASH_VALUE(&l_element->hash, sizeof(l_element->hash), l_hash_item_hashv); HASH_FIND_BYHASHVALUE(hh, l_ch_chain->remote_gdbs, &l_element->hash, sizeof(l_element->hash), l_hash_item_hashv, l_hash_item); if (!l_hash_item) { - l_hash_item = DAP_NEW_Z(dap_stream_ch_chain_hash_item_t); + l_hash_item = DAP_NEW_Z(dap_chain_ch_hash_item_t); if (!l_hash_item) { log_it(L_CRITICAL, "Memory allocation error"); return; @@ -836,9 +836,9 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) } break; // End of response with starting of DB sync case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_END: { - if(l_chain_pkt_data_size == sizeof(dap_stream_ch_chain_sync_request_t)) { + if(l_chain_pkt_data_size == sizeof(dap_chain_ch_sync_request_t)) { if (l_ch_chain->state != CHAIN_STATE_UPDATE_GLOBAL_DB_REMOTE || - memcmp(&l_ch_chain->request_hdr, &l_chain_pkt->hdr, sizeof(dap_stream_ch_chain_pkt_t))) { + memcmp(&l_ch_chain->request_hdr, &l_chain_pkt->hdr, sizeof(dap_chain_ch_pkt_t))) { log_it(L_WARNING, "Can't process UPDATE_GLOBAL_DB_END request because its already busy with syncronization"); s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, @@ -847,9 +847,9 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) } debug_if(s_debug_more, L_INFO, "In: UPDATE_GLOBAL_DB_END pkt with total count %d hashes", HASH_COUNT(l_ch_chain->remote_gdbs)); - if (l_chain_pkt_data_size == sizeof(dap_stream_ch_chain_sync_request_t)) - l_ch_chain->request = *(dap_stream_ch_chain_sync_request_t*)l_chain_pkt->data; - struct sync_request *l_sync_request = dap_stream_ch_chain_create_sync_request(l_chain_pkt, a_ch); + if (l_chain_pkt_data_size == sizeof(dap_chain_ch_sync_request_t)) + l_ch_chain->request = *(dap_chain_ch_sync_request_t*)l_chain_pkt->data; + struct sync_request *l_sync_request = dap_chain_ch_create_sync_request(l_chain_pkt, a_ch); dap_proc_thread_callback_add(a_ch->stream_worker->worker->proc_queue_input, s_sync_out_gdb_proc_callback, l_sync_request); } else { log_it(L_WARNING, "DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_END: Wrong chain packet size %zd when expected %zd", l_chain_pkt_data_size, sizeof(l_ch_chain->request)); @@ -879,7 +879,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) log_it(L_INFO, "In: GLOBAL_DB data_size=%zu", l_chain_pkt_data_size); // get transaction and save it to global_db if(l_chain_pkt_data_size > 0) { - struct sync_request *l_sync_request = dap_stream_ch_chain_create_sync_request(l_chain_pkt, a_ch); + struct sync_request *l_sync_request = dap_chain_ch_create_sync_request(l_chain_pkt, a_ch); dap_chain_pkt_item_t *l_pkt_item = &l_sync_request->pkt; l_pkt_item->pkt_data = DAP_DUP_SIZE(l_chain_pkt->data, l_chain_pkt_data_size); l_pkt_item->pkt_data_size = l_chain_pkt_data_size; @@ -896,9 +896,9 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) log_it(L_INFO, "In: SYNCED_GLOBAL_DB: net 0x%016"DAP_UINT64_FORMAT_x" chain 0x%016"DAP_UINT64_FORMAT_x" cell 0x%016"DAP_UINT64_FORMAT_x, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64); if (!l_ch_chain->callback_notify_packet_in) { // we haven't node client waitng, so reply to other side - dap_stream_ch_chain_sync_request_t l_sync_gdb = {}; + dap_chain_ch_sync_request_t l_sync_gdb = {}; l_sync_gdb.node_addr.uint64 = g_node_addr.uint64; - dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_REQ, l_chain_pkt->hdr.net_id.uint64, + dap_chain_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_REQ, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, &l_sync_gdb, sizeof(l_sync_gdb)); } } break; @@ -908,7 +908,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_REQ:{ if (l_ch_chain->state != CHAIN_STATE_IDLE) { log_it(L_WARNING, "Can't process UPDATE_CHAINS_REQ request because its already busy with syncronization"); - dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + dap_chain_ch_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, "ERROR_SYNC_REQUEST_ALREADY_IN_PROCESS"); break; @@ -925,7 +925,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) l_ch_chain->request_atom_iter = l_chain->callback_atom_iter_create(l_chain, l_chain_pkt->hdr.cell_id, 1); l_chain->callback_atom_iter_get_first(l_ch_chain->request_atom_iter, NULL); l_ch_chain->request_hdr = l_chain_pkt->hdr; - dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_START, + dap_chain_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_START, l_chain_pkt->hdr.net_id.uint64,l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, NULL, 0); } @@ -940,7 +940,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_START:{ if (l_ch_chain->state != CHAIN_STATE_IDLE) { log_it(L_WARNING, "Can't process UPDATE_CHAINS_START request because its already busy with syncronization"); - dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + dap_chain_ch_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, "ERROR_SYNC_REQUEST_ALREADY_IN_PROCESS"); break; @@ -983,16 +983,16 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) a_ch->stream->esocket->flags |= DAP_SOCK_SIGNAL_CLOSE; break; } - for ( dap_stream_ch_chain_update_element_t * l_element =(dap_stream_ch_chain_update_element_t *) l_chain_pkt->data; + for ( dap_chain_ch_update_element_t * l_element =(dap_chain_ch_update_element_t *) l_chain_pkt->data; (size_t) (((byte_t*)l_element) - l_chain_pkt->data ) < l_chain_pkt_data_size; l_element++){ - dap_stream_ch_chain_hash_item_t * l_hash_item = NULL; + dap_chain_ch_hash_item_t * l_hash_item = NULL; unsigned l_hash_item_hashv; HASH_VALUE(&l_element->hash, sizeof(dap_hash_fast_t), l_hash_item_hashv); HASH_FIND_BYHASHVALUE(hh, l_ch_chain->remote_atoms, &l_element->hash, sizeof(dap_hash_fast_t), l_hash_item_hashv, l_hash_item); if( ! l_hash_item ){ - l_hash_item = DAP_NEW_Z(dap_stream_ch_chain_hash_item_t); + l_hash_item = DAP_NEW_Z(dap_chain_ch_hash_item_t); if (!l_hash_item) { log_it(L_CRITICAL, "Memory allocation error"); return; @@ -1016,9 +1016,9 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) } break; case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_END: { - if(l_chain_pkt_data_size == sizeof(dap_stream_ch_chain_sync_request_t)) { + if(l_chain_pkt_data_size == sizeof(dap_chain_ch_sync_request_t)) { if (l_ch_chain->state != CHAIN_STATE_UPDATE_CHAINS_REMOTE || - memcmp(&l_ch_chain->request_hdr, &l_chain_pkt->hdr, sizeof(dap_stream_ch_chain_pkt_t))) { + memcmp(&l_ch_chain->request_hdr, &l_chain_pkt->hdr, sizeof(dap_chain_ch_pkt_t))) { log_it(L_WARNING, "Can't process UPDATE_CHAINS_END request because its already busy with syncronization"); s_stream_ch_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, @@ -1038,7 +1038,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) } debug_if(s_debug_more, L_INFO, "In: UPDATE_CHAINS_END pkt with total count %d hashes", HASH_COUNT(l_ch_chain->remote_atoms)); - struct sync_request *l_sync_request = dap_stream_ch_chain_create_sync_request(l_chain_pkt, a_ch); + struct sync_request *l_sync_request = dap_chain_ch_create_sync_request(l_chain_pkt, a_ch); l_ch_chain->stats_request_atoms_processed = 0; l_ch_chain->request_hdr = l_chain_pkt->hdr; dap_proc_thread_callback_add(a_ch->stream_worker->worker->proc_queue_input, s_sync_out_chains_proc_callback, l_sync_request); @@ -1068,9 +1068,9 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) } break; case DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN: { - dap_stream_ch_chain_pkt_t *l_chain_pkt = (dap_stream_ch_chain_pkt_t *)l_ch_pkt->data; - if (l_chain_pkt_data_size <= sizeof(dap_stream_ch_chain_pkt_t) || - l_chain_pkt_data_size != sizeof(dap_stream_ch_chain_pkt_t) + l_chain_pkt->hdr.data_size) { + dap_chain_ch_pkt_t *l_chain_pkt = (dap_chain_ch_pkt_t *)l_ch_pkt->data; + if (l_chain_pkt_data_size <= sizeof(dap_chain_ch_pkt_t) || + l_chain_pkt_data_size != sizeof(dap_chain_ch_pkt_t) + l_chain_pkt->hdr.data_size) { log_it(L_WARNING, "Incorrect chain packet size"); break; } @@ -1085,7 +1085,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) NODE_ADDR_FP_ARGS_S(a_ch->stream->node), l_cluster->mnemonim); break; } - dap_stream_ch_chain_pkt_t *l_chain_pkt_copy = DAP_DUP_SIZE(l_chain_pkt->data, l_chain_pkt_data_size); + dap_chain_ch_pkt_t *l_chain_pkt_copy = DAP_DUP_SIZE(l_chain_pkt->data, l_chain_pkt_data_size); if (!l_chain_pkt_copy) { log_it(L_CRITICAL, "Not enough memory"); break; @@ -1131,8 +1131,8 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) if (s_debug_more) { log_it(L_INFO, "Out: UPDATE_CHAINS_REQ pkt"); } - dap_stream_ch_chain_sync_request_t l_request= {}; - dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_REQ, l_chain_pkt->hdr.net_id.uint64, + dap_chain_ch_sync_request_t l_request= {}; + dap_chain_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_REQ, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, &l_request, sizeof(l_request)); } } break; @@ -1163,7 +1163,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) * @brief s_ch_chain_go_idle_and_free_log_list * @param a_ch_chain */ -static void s_free_log_list_gdb ( dap_stream_ch_chain_t * a_ch_chain) +static void s_free_log_list_gdb ( dap_chain_ch_t * a_ch_chain) { } @@ -1172,7 +1172,7 @@ static void s_free_log_list_gdb ( dap_stream_ch_chain_t * a_ch_chain) * @brief s_ch_chain_go_idle * @param a_ch_chain */ -static void s_ch_chain_go_idle(dap_stream_ch_chain_t *a_ch_chain) +static void s_ch_chain_go_idle(dap_chain_ch_t *a_ch_chain) { if (a_ch_chain->state == CHAIN_STATE_IDLE) { return; @@ -1191,7 +1191,7 @@ static void s_ch_chain_go_idle(dap_stream_ch_chain_t *a_ch_chain) a_ch_chain->request_atom_iter = NULL; } - dap_stream_ch_chain_hash_item_t *l_hash_item = NULL, *l_tmp = NULL; + dap_chain_ch_hash_item_t *l_hash_item = NULL, *l_tmp = NULL; HASH_ITER(hh, a_ch_chain->remote_atoms, l_hash_item, l_tmp) { // Clang bug at this, l_hash_item should change at every loop cycle @@ -1205,7 +1205,7 @@ static void s_ch_chain_go_idle(dap_stream_ch_chain_t *a_ch_chain) struct chain_io_complete { dap_stream_ch_uuid_t ch_uuid; - dap_stream_ch_chain_state_t state; + dap_chain_ch_state_t state; uint8_t type; uint64_t net_id; uint64_t chain_id; @@ -1241,7 +1241,7 @@ static void s_stream_ch_io_complete(dap_events_socket_t *a_es, void *a_arg, int struct chain_io_complete *l_arg = (struct chain_io_complete *)a_arg; if (DAP_STREAM_CH_CHAIN(l_ch)->state == CHAIN_STATE_WAITING) DAP_STREAM_CH_CHAIN(l_ch)->state = l_arg->state; - dap_stream_ch_chain_pkt_write_unsafe(l_ch, l_arg->type, l_arg->net_id, l_arg->chain_id, + dap_chain_ch_pkt_write_unsafe(l_ch, l_arg->type, l_arg->net_id, l_arg->chain_id, l_arg->cell_id, l_arg->data, l_arg->data_size); a_es->callbacks.arg = NULL; DAP_DELETE(a_arg); @@ -1255,7 +1255,7 @@ static void s_stream_ch_chain_pkt_write(dap_stream_ch_t *a_ch, uint8_t a_type, u const void * a_data, size_t a_data_size) { size_t l_free_buf_size = dap_events_socket_get_free_buf_size(a_ch->stream->esocket) - - sizeof(dap_stream_ch_chain_pkt_t) - sizeof(dap_stream_ch_pkt_t) - + sizeof(dap_chain_ch_pkt_t) - sizeof(dap_stream_ch_pkt_t) - sizeof(dap_stream_pkt_t) - DAP_STREAM_PKT_ENCRYPTION_OVERHEAD; if (l_free_buf_size < a_data_size) { struct chain_io_complete *l_arg = DAP_NEW_Z_SIZE(struct chain_io_complete, sizeof(struct chain_io_complete) + a_data_size); @@ -1271,7 +1271,7 @@ static void s_stream_ch_chain_pkt_write(dap_stream_ch_t *a_ch, uint8_t a_type, u a_ch->stream->esocket->callbacks.arg = l_arg; } else - dap_stream_ch_chain_pkt_write_unsafe(a_ch, a_type, a_net_id, a_chain_id, a_cell_id, a_data, a_data_size); + dap_chain_ch_pkt_write_unsafe(a_ch, a_type, a_net_id, a_chain_id, a_cell_id, a_data, a_data_size); } /** @@ -1281,7 +1281,7 @@ static void s_stream_ch_chain_pkt_write(dap_stream_ch_t *a_ch, uint8_t a_type, u */ void s_stream_ch_packet_out(dap_stream_ch_t *a_ch, void *a_arg) { - dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch); + dap_chain_ch_t *l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch); if (!l_ch_chain) { log_it(L_CRITICAL, "Channel without chain, dump it"); s_ch_chain_go_idle(l_ch_chain); @@ -1296,7 +1296,7 @@ void s_stream_ch_packet_out(dap_stream_ch_t *a_ch, void *a_arg) // s_update_pack_size; 0; //dap_db_log_list_obj_t **l_objs = dap_db_log_list_get_multiple(l_ch_chain->request_db_log, DAP_STREAM_PKT_SIZE_MAX, &q); - dap_stream_ch_chain_update_element_t *l_data = DAP_NEW_Z_SIZE(dap_stream_ch_chain_update_element_t, q * sizeof(dap_stream_ch_chain_update_element_t)); + dap_chain_ch_update_element_t *l_data = DAP_NEW_Z_SIZE(dap_chain_ch_update_element_t, q * sizeof(dap_chain_ch_update_element_t)); for (i = 0; i < q; ++i) { l_data[i].hash = l_objs[i]->hash; l_data[i].size = l_objs[i]->pkt->data_size; @@ -1308,7 +1308,7 @@ void s_stream_ch_packet_out(dap_stream_ch_t *a_ch, void *a_arg) s_stream_ch_chain_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB, l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64, - l_data, i * sizeof(dap_stream_ch_chain_update_element_t)); + l_data, i * sizeof(dap_chain_ch_update_element_t)); l_ch_chain->stats_request_gdb_processed += i; DAP_DELETE(l_data); DAP_DELETE(l_objs); @@ -1321,11 +1321,11 @@ void s_stream_ch_packet_out(dap_stream_ch_t *a_ch, void *a_arg) l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64, - &l_ch_chain->request, sizeof(dap_stream_ch_chain_sync_request_t)); + &l_ch_chain->request, sizeof(dap_chain_ch_sync_request_t)); debug_if(s_debug_more, L_INFO, "Out: DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_END"); l_go_idle = true; } - dap_stream_ch_chain_update_element_t l_data[s_update_pack_size]; + dap_chain_ch_update_element_t l_data[s_update_pack_size]; uint_fast16_t i; dap_db_log_list_obj_t *l_obj = NULL; for (i = 0; i < s_update_pack_size; i++) { @@ -1342,7 +1342,7 @@ void s_stream_ch_packet_out(dap_stream_ch_t *a_ch, void *a_arg) s_stream_ch_chain_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB, l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64, - l_data, i * sizeof(dap_stream_ch_chain_update_element_t)); + l_data, i * sizeof(dap_chain_ch_update_element_t)); l_ch_chain->stats_request_gdb_processed += i; if (s_debug_more) log_it(L_INFO, "Out: DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB"); @@ -1354,7 +1354,7 @@ void s_stream_ch_packet_out(dap_stream_ch_t *a_ch, void *a_arg) l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64, - &l_ch_chain->request, sizeof(dap_stream_ch_chain_sync_request_t)); + &l_ch_chain->request, sizeof(dap_chain_ch_sync_request_t)); if (s_debug_more ) log_it(L_INFO, "Out: DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_END"); l_go_idle = true; @@ -1369,14 +1369,14 @@ void s_stream_ch_packet_out(dap_stream_ch_t *a_ch, void *a_arg) size_t l_pkt_size = 0, i, q = 0; dap_db_log_list_obj_t **l_objs = dap_db_log_list_get_multiple(l_ch_chain->request_db_log, DAP_STREAM_PKT_SIZE_MAX, &q); for (i = 0; i < q; ++i) { - dap_stream_ch_chain_hash_item_t *l_hash_item = NULL; + dap_chain_ch_hash_item_t *l_hash_item = NULL; unsigned l_hash_item_hashv = 0; HASH_VALUE(&l_objs[i]->hash, sizeof(dap_chain_hash_fast_t), l_hash_item_hashv); HASH_FIND_BYHASHVALUE(hh, l_ch_chain->remote_gdbs, &l_objs[i]->hash, sizeof(dap_hash_fast_t), l_hash_item_hashv, l_hash_item); if (!l_hash_item) { - l_hash_item = DAP_NEW_Z(dap_stream_ch_chain_hash_item_t); - *l_hash_item = (dap_stream_ch_chain_hash_item_t) { + l_hash_item = DAP_NEW_Z(dap_chain_ch_hash_item_t); + *l_hash_item = (dap_chain_ch_hash_item_t) { .hash = l_objs[i]->hash, .size = l_objs[i]->pkt->data_size }; HASH_ADD_BYHASHVALUE(hh, l_ch_chain->remote_gdbs, hash, sizeof(dap_chain_hash_fast_t), @@ -1404,7 +1404,7 @@ void s_stream_ch_packet_out(dap_stream_ch_t *a_ch, void *a_arg) } else if (!l_objs) { l_was_sent_smth = true; // last message - dap_stream_ch_chain_sync_request_t l_request = { }; + dap_chain_ch_sync_request_t l_request = { }; s_stream_ch_chain_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64, &l_request, sizeof(l_request)); @@ -1425,7 +1425,7 @@ void s_stream_ch_packet_out(dap_stream_ch_t *a_ch, void *a_arg) l_skip_count = s_skip_in_reactor_count; break; } - dap_stream_ch_chain_hash_item_t *l_hash_item = NULL; + dap_chain_ch_hash_item_t *l_hash_item = NULL; unsigned l_hash_item_hashv = 0; HASH_VALUE(&l_obj->hash, sizeof(dap_chain_hash_fast_t), l_hash_item_hashv); HASH_FIND_BYHASHVALUE(hh, l_ch_chain->remote_gdbs, &l_obj->hash, sizeof(dap_hash_fast_t), @@ -1439,7 +1439,7 @@ void s_stream_ch_packet_out(dap_stream_ch_t *a_ch, void *a_arg) }*/ l_skip_count++; } else { - l_hash_item = DAP_NEW_Z(dap_stream_ch_chain_hash_item_t); + l_hash_item = DAP_NEW_Z(dap_chain_ch_hash_item_t); if (!l_hash_item) { log_it(L_CRITICAL, "Memory allocation error"); return; @@ -1473,7 +1473,7 @@ void s_stream_ch_packet_out(dap_stream_ch_t *a_ch, void *a_arg) log_it( L_INFO,"Syncronized database: items syncronyzed %"DAP_UINT64_FORMAT_U" from %zu", l_ch_chain->stats_request_gdb_processed, dap_db_log_list_get_count(l_ch_chain->request_db_log)); // last message - dap_stream_ch_chain_sync_request_t l_request = {}; + dap_chain_ch_sync_request_t l_request = {}; s_stream_ch_chain_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64, &l_request, sizeof(l_request)); @@ -1487,20 +1487,20 @@ void s_stream_ch_packet_out(dap_stream_ch_t *a_ch, void *a_arg) // Update list of atoms to remote case CHAIN_STATE_UPDATE_CHAINS:{ - dap_stream_ch_chain_update_element_t *l_data = DAP_NEW_Z_SIZE(dap_stream_ch_chain_update_element_t, - sizeof(dap_stream_ch_chain_update_element_t) * s_update_pack_size); + dap_chain_ch_update_element_t *l_data = DAP_NEW_Z_SIZE(dap_chain_ch_update_element_t, + sizeof(dap_chain_ch_update_element_t) * s_update_pack_size); size_t l_data_size=0; for(uint_fast16_t n=0; n<s_update_pack_size && (l_ch_chain->request_atom_iter && l_ch_chain->request_atom_iter->cur);n++){ l_data[n].hash = *l_ch_chain->request_atom_iter->cur_hash; // Shift offset counter - l_data_size += sizeof(dap_stream_ch_chain_update_element_t); + l_data_size += sizeof(dap_chain_ch_update_element_t); // Then get next atom l_ch_chain->request_atom_iter->chain->callback_atom_iter_get_next(l_ch_chain->request_atom_iter, NULL); } if (l_data_size){ l_was_sent_smth = true; if(s_debug_more) - log_it(L_DEBUG,"Out: UPDATE_CHAINS with %zu hashes sent", l_data_size / sizeof(dap_stream_ch_chain_update_element_t)); + log_it(L_DEBUG,"Out: UPDATE_CHAINS with %zu hashes sent", l_data_size / sizeof(dap_chain_ch_update_element_t)); s_stream_ch_chain_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS, l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, @@ -1511,12 +1511,12 @@ void s_stream_ch_packet_out(dap_stream_ch_t *a_ch, void *a_arg) l_was_sent_smth = true; if(s_debug_more) log_it(L_INFO,"Out: UPDATE_CHAINS_END sent "); - dap_stream_ch_chain_sync_request_t l_request = {}; + dap_chain_ch_sync_request_t l_request = {}; s_stream_ch_chain_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_END, l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64, - &l_request, sizeof(dap_stream_ch_chain_sync_request_t)); + &l_request, sizeof(dap_chain_ch_sync_request_t)); l_go_idle = true; dap_stream_ch_set_ready_to_write_unsafe(a_ch, false); } @@ -1531,7 +1531,7 @@ void s_stream_ch_packet_out(dap_stream_ch_t *a_ch, void *a_arg) l_ch_chain->request_atom_iter && l_ch_chain->request_atom_iter->cur; k++){ // Check if present and skip if present - dap_stream_ch_chain_hash_item_t *l_hash_item = NULL; + dap_chain_ch_hash_item_t *l_hash_item = NULL; unsigned l_hash_item_hashv = 0; HASH_VALUE(l_ch_chain->request_atom_iter->cur_hash, sizeof(dap_chain_hash_fast_t), l_hash_item_hashv); HASH_FIND_BYHASHVALUE(hh, l_ch_chain->remote_atoms, l_ch_chain->request_atom_iter->cur_hash, @@ -1544,7 +1544,7 @@ void s_stream_ch_packet_out(dap_stream_ch_t *a_ch, void *a_arg) l_request_atom_hash_str); }*/ }else{ - l_hash_item = DAP_NEW_Z(dap_stream_ch_chain_hash_item_t); + l_hash_item = DAP_NEW_Z(dap_chain_ch_hash_item_t); if (!l_hash_item) { log_it(L_CRITICAL, "Memory allocation error"); return; @@ -1572,7 +1572,7 @@ void s_stream_ch_packet_out(dap_stream_ch_t *a_ch, void *a_arg) break; } if(!l_ch_chain->request_atom_iter || !l_ch_chain->request_atom_iter->cur) { // All chains synced - dap_stream_ch_chain_sync_request_t l_request = {}; + dap_chain_ch_sync_request_t l_request = {}; // last message l_was_sent_smth = true; s_stream_ch_chain_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS, diff --git a/modules/chain/dap_chain_ch_pkt.c b/modules/chain/dap_chain_ch_pkt.c index 743fb83a2e..216b31e3bc 100644 --- a/modules/chain/dap_chain_ch_pkt.c +++ b/modules/chain/dap_chain_ch_pkt.c @@ -21,7 +21,7 @@ #include "dap_chain_ch_pkt.h" #include "dap_chain.h" -#define LOG_TAG "dap_stream_ch_chain_pkt" +#define LOG_TAG "dap_chain_ch_pkt" /** * @brief dap_stream_ch_net_pkt_write @@ -30,13 +30,13 @@ * @param data_size * @return */ -size_t dap_stream_ch_chain_pkt_write_unsafe(dap_stream_ch_t *a_ch, uint8_t a_type, uint64_t a_net_id, +size_t dap_chain_ch_pkt_write_unsafe(dap_stream_ch_t *a_ch, uint8_t a_type, uint64_t a_net_id, uint64_t a_chain_id, uint64_t a_cell_id, const void * a_data, size_t a_data_size) { - size_t l_chain_pkt_size = sizeof(dap_stream_ch_chain_pkt_hdr_t) + a_data_size; - dap_stream_ch_chain_pkt_t *l_chain_pkt = DAP_NEW_Z_SIZE(dap_stream_ch_chain_pkt_t, l_chain_pkt_size); - *l_chain_pkt = (dap_stream_ch_chain_pkt_t){ + size_t l_chain_pkt_size = sizeof(dap_chain_ch_pkt_hdr_t) + a_data_size; + dap_chain_ch_pkt_t *l_chain_pkt = DAP_NEW_Z_SIZE(dap_chain_ch_pkt_t, l_chain_pkt_size); + *l_chain_pkt = (dap_chain_ch_pkt_t){ .hdr = { .version = DAP_STREAM_CH_CHAIN_PKT_VERSION, .net_id.uint64 = a_net_id, .cell_id.uint64 = a_cell_id, .chain_id.uint64 = a_chain_id } }; @@ -50,7 +50,7 @@ size_t dap_stream_ch_chain_pkt_write_unsafe(dap_stream_ch_t *a_ch, uint8_t a_typ /** - * @brief dap_stream_ch_chain_pkt_write_mt + * @brief dap_chain_ch_pkt_write_mt * @param a_worker * @param a_ch_uuid * @param a_type @@ -61,15 +61,15 @@ size_t dap_stream_ch_chain_pkt_write_unsafe(dap_stream_ch_t *a_ch, uint8_t a_typ * @param a_data_size * @return */ -size_t dap_stream_ch_chain_pkt_write_mt(dap_stream_worker_t *a_worker, dap_stream_ch_uuid_t a_ch_uuid, uint8_t a_type,uint64_t a_net_id, +size_t dap_chain_ch_pkt_write_mt(dap_stream_worker_t *a_worker, dap_stream_ch_uuid_t a_ch_uuid, uint8_t a_type,uint64_t a_net_id, uint64_t a_chain_id, uint64_t a_cell_id, const void * a_data, size_t a_data_size) { - size_t l_chain_pkt_size = sizeof(dap_stream_ch_chain_pkt_hdr_t) + a_data_size; - dap_stream_ch_chain_pkt_t *l_chain_pkt = l_chain_pkt_size > 0x3FFF - ? DAP_NEW_Z_SIZE(dap_stream_ch_chain_pkt_t, l_chain_pkt_size) - : DAP_NEW_STACK_SIZE(dap_stream_ch_chain_pkt_t, l_chain_pkt_size); - *l_chain_pkt = (dap_stream_ch_chain_pkt_t){ + size_t l_chain_pkt_size = sizeof(dap_chain_ch_pkt_hdr_t) + a_data_size; + dap_chain_ch_pkt_t *l_chain_pkt = l_chain_pkt_size > 0x3FFF + ? DAP_NEW_Z_SIZE(dap_chain_ch_pkt_t, l_chain_pkt_size) + : DAP_NEW_STACK_SIZE(dap_chain_ch_pkt_t, l_chain_pkt_size); + *l_chain_pkt = (dap_chain_ch_pkt_t){ .hdr = { .version = DAP_STREAM_CH_CHAIN_PKT_VERSION, .net_id.uint64 = a_net_id, .cell_id.uint64 = a_cell_id, .chain_id.uint64 = a_chain_id } }; @@ -82,15 +82,15 @@ size_t dap_stream_ch_chain_pkt_write_mt(dap_stream_worker_t *a_worker, dap_strea return l_ret; } -size_t dap_stream_ch_chain_pkt_write_multi_mt(dap_stream_ch_cachet_t *a_links, size_t a_count, uint8_t a_type,uint64_t a_net_id, +size_t dap_chain_ch_pkt_write_multi_mt(dap_stream_ch_cachet_t *a_links, size_t a_count, uint8_t a_type,uint64_t a_net_id, uint64_t a_chain_id, uint64_t a_cell_id, const void * a_data, size_t a_data_size) { - size_t l_chain_pkt_size = sizeof(dap_stream_ch_chain_pkt_hdr_t) + a_data_size; - dap_stream_ch_chain_pkt_t *l_chain_pkt = l_chain_pkt_size > 0x3FFF - ? DAP_NEW_Z_SIZE(dap_stream_ch_chain_pkt_t, l_chain_pkt_size) - : DAP_NEW_STACK_SIZE(dap_stream_ch_chain_pkt_t, l_chain_pkt_size); - *l_chain_pkt = (dap_stream_ch_chain_pkt_t){ + size_t l_chain_pkt_size = sizeof(dap_chain_ch_pkt_hdr_t) + a_data_size; + dap_chain_ch_pkt_t *l_chain_pkt = l_chain_pkt_size > 0x3FFF + ? DAP_NEW_Z_SIZE(dap_chain_ch_pkt_t, l_chain_pkt_size) + : DAP_NEW_STACK_SIZE(dap_chain_ch_pkt_t, l_chain_pkt_size); + *l_chain_pkt = (dap_chain_ch_pkt_t){ .hdr = { .version = DAP_STREAM_CH_CHAIN_PKT_VERSION, .net_id.uint64 = a_net_id, .cell_id.uint64 = a_cell_id, .chain_id.uint64 = a_chain_id } }; @@ -124,14 +124,14 @@ size_t dap_stream_ch_chain_pkt_write_multi_mt(dap_stream_ch_cachet_t *a_links, s * @param a_data_size * @return */ -size_t dap_stream_ch_chain_pkt_write_inter(dap_events_socket_t * a_es_input, dap_stream_ch_uuid_t a_ch_uuid, +size_t dap_chain_ch_pkt_write_inter(dap_events_socket_t * a_es_input, dap_stream_ch_uuid_t a_ch_uuid, uint8_t a_type,uint64_t a_net_id, uint64_t a_chain_id, uint64_t a_cell_id, const void * a_data, size_t a_data_size) { - size_t l_chain_pkt_size = sizeof(dap_stream_ch_chain_pkt_hdr_t) + a_data_size; - dap_stream_ch_chain_pkt_t *l_chain_pkt = DAP_NEW_Z_SIZE(dap_stream_ch_chain_pkt_t, l_chain_pkt_size ); - *l_chain_pkt = (dap_stream_ch_chain_pkt_t){ + size_t l_chain_pkt_size = sizeof(dap_chain_ch_pkt_hdr_t) + a_data_size; + dap_chain_ch_pkt_t *l_chain_pkt = DAP_NEW_Z_SIZE(dap_chain_ch_pkt_t, l_chain_pkt_size ); + *l_chain_pkt = (dap_chain_ch_pkt_t){ .hdr = { .version = DAP_STREAM_CH_CHAIN_PKT_VERSION, .net_id.uint64 = a_net_id, .cell_id.uint64 = a_cell_id, .chain_id.uint64 = a_chain_id } }; diff --git a/modules/chain/include/dap_chain_ch.h b/modules/chain/include/dap_chain_ch.h index 0685aaf412..225689bb72 100644 --- a/modules/chain/include/dap_chain_ch.h +++ b/modules/chain/include/dap_chain_ch.h @@ -36,9 +36,9 @@ #define DAP_CHAIN_NODE_SYNC_TIMEOUT 60 // sec #define DAP_SYNC_TICKS_PER_SECOND 10 -typedef struct dap_stream_ch_chain dap_stream_ch_chain_t; -typedef void (*dap_stream_ch_chain_callback_packet_t)(dap_stream_ch_chain_t*, uint8_t a_pkt_type, - dap_stream_ch_chain_pkt_t *a_pkt, size_t a_pkt_data_size, +typedef struct dap_chain_ch dap_chain_ch_t; +typedef void (*dap_chain_ch_callback_packet_t)(dap_chain_ch_t*, uint8_t a_pkt_type, + dap_chain_ch_pkt_t *a_pkt, size_t a_pkt_data_size, void * a_arg); typedef struct dap_chain_atom_item{ dap_chain_hash_fast_t atom_hash; @@ -52,46 +52,46 @@ typedef struct dap_chain_pkt_item { byte_t *pkt_data; } dap_chain_pkt_item_t; -typedef struct dap_stream_ch_chain_hash_item{ +typedef struct dap_chain_ch_hash_item{ dap_hash_fast_t hash; uint32_t size; UT_hash_handle hh; -} dap_stream_ch_chain_hash_item_t; +} dap_chain_ch_hash_item_t; -typedef struct dap_stream_ch_chain { +typedef struct dap_chain_ch { void *_inheritor; - dap_stream_ch_chain_state_t state; + dap_chain_ch_state_t state; uint64_t stats_request_atoms_processed; uint64_t stats_request_gdb_processed; - dap_stream_ch_chain_hash_item_t * remote_atoms; // Remote atoms - dap_stream_ch_chain_hash_item_t * remote_gdbs; // Remote gdbs + dap_chain_ch_hash_item_t * remote_atoms; // Remote atoms + dap_chain_ch_hash_item_t * remote_gdbs; // Remote gdbs // request section dap_chain_atom_iter_t *request_atom_iter; //dap_db_log_list_t *request_db_log; // list of global db records - dap_stream_ch_chain_sync_request_t request; - dap_stream_ch_chain_pkt_hdr_t request_hdr; + dap_chain_ch_sync_request_t request; + dap_chain_ch_pkt_hdr_t request_hdr; dap_list_t *request_db_iter; int timer_shots; dap_timerfd_t *activity_timer; int sent_breaks; - dap_stream_ch_chain_callback_packet_t callback_notify_packet_out; - dap_stream_ch_chain_callback_packet_t callback_notify_packet_in; + dap_chain_ch_callback_packet_t callback_notify_packet_out; + dap_chain_ch_callback_packet_t callback_notify_packet_in; void *callback_notify_arg; -} dap_stream_ch_chain_t; +} dap_chain_ch_t; -#define DAP_STREAM_CH_CHAIN(a) ((dap_stream_ch_chain_t *) ((a)->internal) ) +#define DAP_STREAM_CH_CHAIN(a) ((dap_chain_ch_t *) ((a)->internal) ) #define DAP_STREAM_CH(a) ((dap_stream_ch_t *)((a)->_inheritor)) #define DAP_CHAIN_PKT_EXPECT_SIZE 7168 #define DAP_STREAM_CH_CHAIN_ID 'C' -int dap_stream_ch_chain_init(void); -void dap_stream_ch_chain_deinit(void); +int dap_chain_ch_init(void); +void dap_chain_ch_deinit(void); -void dap_stream_ch_chain_timer_start(dap_stream_ch_chain_t *a_ch_chain); -void dap_stream_ch_chain_reset_unsafe(dap_stream_ch_chain_t *a_ch_chain); +void dap_chain_ch_timer_start(dap_chain_ch_t *a_ch_chain); +void dap_chain_ch_reset_unsafe(dap_chain_ch_t *a_ch_chain); diff --git a/modules/chain/include/dap_chain_ch_pkt.h b/modules/chain/include/dap_chain_ch_pkt.h index 2bbea66a36..d0f33de8ce 100644 --- a/modules/chain/include/dap_chain_ch_pkt.h +++ b/modules/chain/include/dap_chain_ch_pkt.h @@ -68,7 +68,7 @@ #define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_TSD_HASH_FIRST 0x0004 // Hash of first(s) item #define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_TSD_LAST_ID 0x0100 // Last ID of GDB synced group -typedef enum dap_stream_ch_chain_state{ +typedef enum dap_chain_ch_state{ CHAIN_STATE_IDLE=0, CHAIN_STATE_WAITING, CHAIN_STATE_UPDATE_GLOBAL_DB_REMOTE, // Downloadn GDB hashtable from remote @@ -78,38 +78,38 @@ typedef enum dap_stream_ch_chain_state{ CHAIN_STATE_UPDATE_CHAINS, // Update chains hashtable to remote CHAIN_STATE_SYNC_CHAINS, CHAIN_STATE_SYNC_ALL -} dap_stream_ch_chain_state_t; +} dap_chain_ch_state_t; -typedef struct dap_stream_ch_chain_update_element{ +typedef struct dap_chain_ch_update_element{ dap_hash_fast_t hash; uint32_t size; -} DAP_ALIGN_PACKED dap_stream_ch_chain_update_element_t; +} DAP_ALIGN_PACKED dap_chain_ch_update_element_t; -typedef struct dap_stream_ch_chain_sync_request{ +typedef struct dap_chain_ch_sync_request{ dap_chain_node_addr_t node_addr; // Requesting node's address dap_chain_hash_fast_t hash_from; dap_chain_hash_fast_t hash_to; // unused uint64_t id_start; uint64_t id_end; // unused -} DAP_ALIGN_PACKED dap_stream_ch_chain_sync_request_t; +} DAP_ALIGN_PACKED dap_chain_ch_sync_request_t; -typedef struct dap_stream_ch_chain_pkt_hdr { +typedef struct dap_chain_ch_pkt_hdr { uint8_t version; uint8_t padding[3]; uint32_t data_size; dap_chain_net_id_t net_id; dap_chain_id_t chain_id; dap_chain_cell_id_t cell_id; -} DAP_ALIGN_PACKED dap_stream_ch_chain_pkt_hdr_t; +} DAP_ALIGN_PACKED dap_chain_ch_pkt_hdr_t; -typedef struct dap_stream_ch_chain_pkt { - dap_stream_ch_chain_pkt_hdr_t hdr; +typedef struct dap_chain_ch_pkt { + dap_chain_ch_pkt_hdr_t hdr; uint8_t data[]; -} DAP_ALIGN_PACKED dap_stream_ch_chain_pkt_t; +} DAP_ALIGN_PACKED dap_chain_ch_pkt_t; -static const char* c_dap_stream_ch_chain_pkt_type_str[]={ +static const char* c_dap_chain_ch_pkt_type_str[]={ [DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN] = "DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN", [DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB] = "DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB", [DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS] = "DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS", @@ -118,24 +118,24 @@ static const char* c_dap_stream_ch_chain_pkt_type_str[]={ }; -size_t dap_stream_ch_chain_pkt_write_unsafe(dap_stream_ch_t *a_ch, uint8_t a_type, uint64_t a_net_id, +size_t dap_chain_ch_pkt_write_unsafe(dap_stream_ch_t *a_ch, uint8_t a_type, uint64_t a_net_id, uint64_t a_chain_id, uint64_t a_cell_id, const void * a_data, size_t a_data_size); -size_t dap_stream_ch_chain_pkt_write_mt(dap_stream_worker_t *a_worker, dap_stream_ch_uuid_t a_ch_uuid, uint8_t a_type, uint64_t a_net_id, +size_t dap_chain_ch_pkt_write_mt(dap_stream_worker_t *a_worker, dap_stream_ch_uuid_t a_ch_uuid, uint8_t a_type, uint64_t a_net_id, uint64_t a_chain_id, uint64_t a_cell_id, const void * a_data, size_t a_data_size); -size_t dap_stream_ch_chain_pkt_write_multi_mt(dap_stream_ch_cachet_t *a_links, size_t a_count, uint8_t a_type, uint64_t a_net_id, +size_t dap_chain_ch_pkt_write_multi_mt(dap_stream_ch_cachet_t *a_links, size_t a_count, uint8_t a_type, uint64_t a_net_id, uint64_t a_chain_id, uint64_t a_cell_id, const void * a_data, size_t a_data_size); -size_t dap_stream_ch_chain_pkt_write_inter(dap_events_socket_t * a_es_input, dap_stream_ch_uuid_t a_ch_uuid, uint8_t a_type,uint64_t a_net_id, +size_t dap_chain_ch_pkt_write_inter(dap_events_socket_t * a_es_input, dap_stream_ch_uuid_t a_ch_uuid, uint8_t a_type,uint64_t a_net_id, uint64_t a_chain_id, uint64_t a_cell_id, const void * a_data, size_t a_data_size); /** - * @brief dap_stream_ch_chain_pkt_write_error_unsafe + * @brief dap_chain_ch_pkt_write_error_unsafe * @param a_ch * @param a_net_id * @param a_chain_id @@ -143,7 +143,7 @@ size_t dap_stream_ch_chain_pkt_write_inter(dap_events_socket_t * a_es_input, dap * @param a_err_string_format * @return */ -inline static DAP_PRINTF_ATTR(5, 6) size_t dap_stream_ch_chain_pkt_write_error_unsafe(dap_stream_ch_t *a_ch, uint64_t a_net_id, +inline static DAP_PRINTF_ATTR(5, 6) size_t dap_chain_ch_pkt_write_error_unsafe(dap_stream_ch_t *a_ch, uint64_t a_net_id, uint64_t a_chain_id, uint64_t a_cell_id, const char *a_err_string_format, ...) { va_list l_va; @@ -157,14 +157,14 @@ inline static DAP_PRINTF_ATTR(5, 6) size_t dap_stream_ch_chain_pkt_write_error_u va_start(l_va, a_err_string_format); vsnprintf(l_str, l_size,a_err_string_format, l_va); va_end(l_va); - return dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_ERROR, + return dap_chain_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_ERROR, a_net_id, a_chain_id, a_cell_id, l_str, l_size); } return 0; } /** - * @brief dap_stream_ch_chain_pkt_write_error_inter + * @brief dap_chain_ch_pkt_write_error_inter * @param a_es_input * @param a_ch * @param a_net_id @@ -173,7 +173,7 @@ inline static DAP_PRINTF_ATTR(5, 6) size_t dap_stream_ch_chain_pkt_write_error_u * @param a_err_string_format * @return */ -static inline size_t dap_stream_ch_chain_pkt_write_error_inter(dap_events_socket_t *a_es_input, dap_stream_ch_uuid_t a_ch_uuid, +static inline size_t dap_chain_ch_pkt_write_error_inter(dap_events_socket_t *a_es_input, dap_stream_ch_uuid_t a_ch_uuid, uint64_t a_net_id, uint64_t a_chain_id, uint64_t a_cell_id, const char *a_err_string_format, ...) { va_list l_va; @@ -187,7 +187,7 @@ static inline size_t dap_stream_ch_chain_pkt_write_error_inter(dap_events_socket va_start(l_va, a_err_string_format); vsnprintf(l_str, l_size, a_err_string_format, l_va); va_end(l_va); - return dap_stream_ch_chain_pkt_write_inter(a_es_input, a_ch_uuid, DAP_STREAM_CH_CHAIN_PKT_TYPE_ERROR, + return dap_chain_ch_pkt_write_inter(a_es_input, a_ch_uuid, DAP_STREAM_CH_CHAIN_PKT_TYPE_ERROR, a_net_id, a_chain_id, a_cell_id, l_str, l_size); } return 0; diff --git a/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.c b/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.c index 14749e119f..6066fa11d2 100644 --- a/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.c +++ b/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.c @@ -140,7 +140,7 @@ int dap_stream_ch_chain_net_srv_init(void) } /** - * @brief dap_stream_ch_chain_deinit + * @brief dap_chain_ch_deinit */ void dap_stream_ch_chain_net_srv_deinit(void) { diff --git a/modules/channel/chain-net/dap_stream_ch_chain_net.c b/modules/channel/chain-net/dap_stream_ch_chain_net.c index 129ff0c22f..066697d82f 100644 --- a/modules/channel/chain-net/dap_stream_ch_chain_net.c +++ b/modules/channel/chain-net/dap_stream_ch_chain_net.c @@ -76,7 +76,7 @@ int dap_stream_ch_chain_net_init() } /** - * @brief dap_stream_ch_chain_deinit + * @brief dap_chain_ch_deinit */ void dap_stream_ch_chain_net_deinit() { @@ -203,7 +203,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t *a_ch, void* a_arg) dap_sign_t *l_sign = NULL; size_t sign_s = 0; size_t l_orders_num = 0; - dap_stream_ch_chain_validator_test_t *send = NULL; + dap_chain_ch_validator_test_t *send = NULL; dap_chain_net_srv_price_unit_uid_t l_price_unit = { { 0 } }; dap_chain_net_srv_uid_t l_uid = { .uint64 = DAP_CHAIN_NET_SRV_STAKE_POS_DELEGATE_ID }; uint256_t l_price_min = {}; @@ -229,7 +229,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t *a_ch, void* a_arg) else flags = flags & ~F_CERT;//Specified certificate not found - send = DAP_NEW_Z_SIZE(dap_stream_ch_chain_validator_test_t, sizeof(dap_stream_ch_chain_validator_test_t) + sign_s); + send = DAP_NEW_Z_SIZE(dap_chain_ch_validator_test_t, sizeof(dap_chain_ch_validator_test_t) + sign_s); #ifdef DAP_VERSION strncpy((char *)send->header.version, (char *)DAP_VERSION, sizeof(send->header.version)); #endif @@ -264,7 +264,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t *a_ch, void* a_arg) if(sign_s) memcpy(send->sign,l_sign,sign_s); dap_stream_ch_chain_net_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_NODE_VALIDATOR_READY , - l_ch_chain_net_pkt->hdr.net_id, send, sizeof(dap_stream_ch_chain_validator_test_t) + sign_s); + l_ch_chain_net_pkt->hdr.net_id, send, sizeof(dap_chain_ch_validator_test_t) + sign_s); dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); if(l_sign) DAP_DELETE(l_sign); diff --git a/modules/channel/chain-net/include/dap_stream_ch_chain_net.h b/modules/channel/chain-net/include/dap_stream_ch_chain_net.h index d9518d6183..dba4f2f403 100644 --- a/modules/channel/chain-net/include/dap_stream_ch_chain_net.h +++ b/modules/channel/chain-net/include/dap_stream_ch_chain_net.h @@ -40,7 +40,7 @@ typedef struct dap_stream_ch_chain_net { void *notify_callback_arg; } dap_stream_ch_chain_net_t; -typedef struct dap_stream_ch_chain_validator_test{ +typedef struct dap_chain_ch_validator_test{ struct{ /// node Version uint8_t version[32]; @@ -52,7 +52,7 @@ typedef struct dap_stream_ch_chain_validator_test{ //uint8_t data[10]; }DAP_ALIGN_PACKED header; byte_t sign[]; -} DAP_ALIGN_PACKED dap_stream_ch_chain_validator_test_t; +} DAP_ALIGN_PACKED dap_chain_ch_validator_test_t; #define A_PROC 0x01//autoproc set #define F_ORDR 0x02//order exist diff --git a/modules/consensus/esbocs/dap_chain_cs_esbocs.c b/modules/consensus/esbocs/dap_chain_cs_esbocs.c index 009d51244a..ce52243862 100644 --- a/modules/consensus/esbocs/dap_chain_cs_esbocs.c +++ b/modules/consensus/esbocs/dap_chain_cs_esbocs.c @@ -252,9 +252,6 @@ static int s_callback_new(dap_chain_t *a_chain, dap_config_t *a_chain_cfg) l_ret = -4; goto lb_err; } - char *l_signer_addr = dap_chain_hash_fast_to_str_new(&l_signing_addr.data.hash_fast); - log_it(L_MSG, "add validator addr "NODE_ADDR_FP_STR", signing addr %s", NODE_ADDR_FP_ARGS_S(l_signer_node_addr), l_signer_addr); - DAP_DELETE(l_signer_addr); dap_chain_esbocs_validator_t *l_validator = DAP_NEW_Z(dap_chain_esbocs_validator_t); if (!l_validator) { @@ -266,6 +263,9 @@ static int s_callback_new(dap_chain_t *a_chain, dap_config_t *a_chain_cfg) l_validator->node_addr = l_signer_node_addr; l_validator->weight = uint256_1; l_esbocs_pvt->poa_validators = dap_list_append(l_esbocs_pvt->poa_validators, l_validator); + char *l_signer_addr = dap_chain_hash_fast_to_str_new(&l_signing_addr.data.hash_fast); + log_it(L_MSG, "add validator addr "NODE_ADDR_FP_STR", signing addr %s", NODE_ADDR_FP_ARGS_S(l_signer_node_addr), l_signer_addr); + DAP_DELETE(l_signer_addr); if (!l_esbocs_pvt->poa_mode) { // auth certs in PoA mode will be first PoS validators keys dap_hash_fast_t l_stake_tx_hash = {}; @@ -421,27 +421,44 @@ static int s_callback_created(dap_chain_t *a_chain, dap_config_t *a_chain_net_cf l_esbocs_pvt->collecting_addr = dap_chain_addr_from_str(dap_config_get_item_str(a_chain_net_cfg, "esbocs", "fee_addr")); l_esbocs_pvt->collecting_level = dap_chain_coins_to_balance(dap_config_get_item_str_default(a_chain_net_cfg, "esbocs", "set_collect_fee", "10.0")); + dap_list_t *l_validators = dap_chain_net_srv_stake_get_validators(a_chain->net_id, false); + for (dap_list_t *it = l_validators; it; it = it->next) { + dap_stream_node_addr_t *l_addr = &((dap_chain_net_srv_stake_item_t *)it->data)->node_addr; + dap_chain_net_add_validator_to_clusters(a_chain, l_addr); + } + dap_chain_esbocs_session_t *l_session = NULL; + DAP_NEW_Z_RET_VAL(l_session, dap_chain_esbocs_session_t, -8, NULL); + l_session->chain = a_chain; + l_session->esbocs = l_esbocs; + l_esbocs->session = l_session; + DL_APPEND(s_session_items, l_session); + log_it(L_INFO, "Init ESBOCS session for net:%s, chain:%s", a_chain->net_name, a_chain->name); + const char *l_sign_cert_str = NULL; if( (l_sign_cert_str = dap_config_get_item_str(a_chain_net_cfg, "esbocs", "blocks-sign-cert")) ) { dap_cert_t *l_sign_cert = dap_cert_find_by_name(l_sign_cert_str); if (l_sign_cert == NULL) { log_it(L_ERROR, "Can't load sign certificate, name \"%s\" is wrong", l_sign_cert_str); + dap_list_free_full(l_validators, NULL); return -1; } else if (l_sign_cert->enc_key->priv_key_data) { l_esbocs_pvt->blocks_sign_key = l_sign_cert->enc_key; log_it(L_INFO, "Loaded \"%s\" certificate for net %s to sign ESBOCS blocks", l_sign_cert_str, a_chain->net_name); } else { log_it(L_ERROR, "Certificate \"%s\" has no private key", l_sign_cert_str); + dap_list_free_full(l_validators, NULL); return -2; } } else { log_it(L_NOTICE, "No sign certificate provided for net %s, can't sign any blocks. This node can't be a consensus validator", a_chain->net_name); + dap_list_free_full(l_validators, NULL); return -3; } dap_chain_net_t *l_net = dap_chain_net_by_id(a_chain->net_id); dap_chain_node_role_t l_role = dap_chain_net_get_role(l_net); if (l_role.enums > NODE_ROLE_MASTER) { log_it(L_NOTICE, "Node role is lower than master role, so this node can't be a consensus validator"); + dap_list_free_full(l_validators, NULL); return -5; } dap_chain_addr_t l_my_signing_addr; @@ -449,21 +466,17 @@ static int s_callback_created(dap_chain_t *a_chain, dap_config_t *a_chain_net_cf if (!l_esbocs_pvt->poa_mode) { if (!dap_chain_net_srv_stake_key_delegated(&l_my_signing_addr)) { log_it(L_WARNING, "Signing key is not delegated by stake service. Switch off validator mode"); + dap_list_free_full(l_validators, NULL); return -6; } } else { if (!s_validator_check(&l_my_signing_addr, l_esbocs_pvt->poa_validators)) { log_it(L_WARNING, "Signing key is not present in PoA certs list. Switch off validator mode"); + dap_list_free_full(l_validators, NULL); return -7; } } - dap_chain_esbocs_session_t *l_session = NULL; - DAP_NEW_Z_RET_VAL(l_session, dap_chain_esbocs_session_t, -8, NULL); - - l_session->chain = a_chain; - l_session->esbocs = l_esbocs; - l_esbocs->session = l_session; l_session->my_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net); l_session->my_signing_addr = l_my_signing_addr; char *l_sync_group = s_get_penalty_group(l_net->pub.id); @@ -474,12 +487,12 @@ static int s_callback_created(dap_chain_t *a_chain, dap_config_t *a_chain_net_cf DAP_DELETE(l_sync_group); dap_global_db_cluster_add_notify_callback(l_session->db_cluster, s_db_change_notifier, l_session); - dap_list_t *l_validators = dap_chain_net_srv_stake_get_validators(a_chain->net_id, false); for (dap_list_t *it = l_validators; it; it = it->next) { dap_stream_node_addr_t *l_addr = &((dap_chain_net_srv_stake_item_t *)it->data)->node_addr; dap_global_db_cluster_member_add(l_session->db_cluster, l_addr, DAP_GDB_MEMBER_ROLE_ROOT); - dap_chain_net_add_validator_to_clusters(a_chain, l_addr); } + dap_list_free_full(l_validators, NULL); + //Find order minimum fee l_esbocs_pvt->block_sign_pkey = dap_pkey_from_enc_key(l_esbocs_pvt->blocks_sign_key); char *l_gdb_group_str = dap_chain_net_srv_order_get_gdb_group(l_net); @@ -509,7 +522,6 @@ static int s_callback_created(dap_chain_t *a_chain, dap_config_t *a_chain_net_cf if (IS_ZERO_256(l_esbocs_pvt->minimum_fee)) { log_it(L_ERROR, "No valid order found was signed by this validator deledgated key. Switch off validator mode."); - DAP_DEL_Z(l_esbocs->session); return -4; } pthread_mutexattr_t l_mutex_attr; @@ -521,8 +533,6 @@ static int s_callback_created(dap_chain_t *a_chain, dap_config_t *a_chain_net_cf dap_chain_add_callback_notify(a_chain, s_new_atom_notifier, l_session); s_session_round_new(l_session); - log_it(L_INFO, "Init session for net:%s, chain:%s", a_chain->net_name, a_chain->name); - DL_APPEND(s_session_items, l_session); l_session->cs_timer = dap_timerfd_start(1000, s_session_timer, l_session); debug_if(l_esbocs_pvt->debug, L_MSG, "Consensus main timer is started"); @@ -533,11 +543,9 @@ static int s_callback_created(dap_chain_t *a_chain, dap_config_t *a_chain_net_cf bool dap_chain_esbocs_started(dap_chain_net_id_t a_net_id) { dap_chain_esbocs_session_t *l_session; - DL_FOREACH(s_session_items, l_session) { - if (l_session->chain->net_id.uint64 == a_net_id.uint64 && - l_session->esbocs && l_session->esbocs->_pvt) - return true; - } + DL_FOREACH(s_session_items, l_session) + if (l_session->chain->net_id.uint64 == a_net_id.uint64) + return DAP_CHAIN_PVT(l_session->chain)->cs_started; return false; } @@ -588,6 +596,36 @@ void dap_chain_esbocs_start_timer(dap_chain_net_id_t a_net_id) } } +bool dap_chain_esbocs_add_validator_to_clusters(dap_chain_net_id_t a_net_id, dap_stream_node_addr_t *a_validator_addr) +{ + dap_return_val_if_fail(a_validator_addr, -1); + dap_chain_esbocs_session_t *l_session; + bool l_ret = false; + DL_FOREACH(s_session_items, l_session) + if (l_session->chain->net_id.uint64 == a_net_id.uint64) { + l_ret = dap_chain_net_add_validator_to_clusters(l_session->chain, a_validator_addr); + if (l_session->db_cluster) + l_ret &= (bool)dap_global_db_cluster_member_add(l_session->db_cluster, a_validator_addr, DAP_GDB_MEMBER_ROLE_ROOT); + return l_ret; + } + return NULL; +} + +bool dap_chain_esbocs_remove_validator_from_clusters(dap_chain_net_id_t a_net_id, dap_stream_node_addr_t *a_validator_addr) +{ + dap_return_val_if_fail(a_validator_addr, -1); + dap_chain_esbocs_session_t *l_session; + bool l_ret = false; + DL_FOREACH(s_session_items, l_session) + if (l_session->chain->net_id.uint64 == a_net_id.uint64) { + l_ret = dap_chain_net_remove_validator_from_clusters(l_session->chain, a_validator_addr); + if (l_session->db_cluster) + l_ret &= dap_global_db_cluster_member_delete(l_session->db_cluster, a_validator_addr); + return l_ret; + } + return NULL; +} + static uint256_t s_callback_get_minimum_fee(dap_chain_t *a_chain) { dap_chain_cs_blocks_t *l_blocks = DAP_CHAIN_CS_BLOCKS(a_chain); diff --git a/modules/consensus/esbocs/include/dap_chain_cs_esbocs.h b/modules/consensus/esbocs/include/dap_chain_cs_esbocs.h index 914c430414..210a79a3a5 100644 --- a/modules/consensus/esbocs/include/dap_chain_cs_esbocs.h +++ b/modules/consensus/esbocs/include/dap_chain_cs_esbocs.h @@ -215,3 +215,5 @@ void dap_chain_esbocs_start_timer(dap_chain_net_id_t a_net_id); dap_pkey_t *dap_chain_esbocs_get_sign_pkey(dap_chain_net_id_t a_net_id); uint256_t dap_chain_esbocs_get_fee(dap_chain_net_id_t a_net_id); bool dap_chain_esbocs_get_autocollect_status(dap_chain_net_id_t a_net_id); +bool dap_chain_esbocs_add_validator_to_clusters(dap_chain_net_id_t a_net_id, dap_stream_node_addr_t *a_validator_addr); +bool dap_chain_esbocs_remove_validator_from_clusters(dap_chain_net_id_t a_net_id, dap_stream_node_addr_t *a_validator_addr); diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index 5934ebda6f..c25241d263 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -268,7 +268,7 @@ static bool s_new_balancer_link_request(dap_chain_net_t *a_net, int a_link_repla int dap_chain_net_init() { dap_ledger_init(); - dap_stream_ch_chain_init(); + dap_chain_ch_init(); dap_stream_ch_chain_net_init(); dap_chain_node_client_init(); dap_chain_net_voting_init(); @@ -3008,10 +3008,16 @@ int dap_chain_net_add_poa_certs_to_cluster(dap_chain_net_t *a_net, dap_global_db bool dap_chain_net_add_validator_to_clusters(dap_chain_t *a_chain, dap_stream_node_addr_t *a_addr) { - bool l_ret = dap_global_db_cluster_member_add( - dap_chain_net_get_mempool_cluster(a_chain), a_addr, DAP_GDB_MEMBER_ROLE_ROOT); - return !l_ret ? false : dap_global_db_cluster_member_add( - PVT(dap_chain_net_by_id(a_chain->net_id))->orders_cluster, a_addr, DAP_GDB_MEMBER_ROLE_USER); + bool l_ret = dap_global_db_cluster_member_add(dap_chain_net_get_mempool_cluster(a_chain), a_addr, DAP_GDB_MEMBER_ROLE_ROOT); + l_ret &= (bool)dap_global_db_cluster_member_add(PVT(dap_chain_net_by_id(a_chain->net_id))->orders_cluster, a_addr, DAP_GDB_MEMBER_ROLE_USER); + return l_ret; +} + +bool dap_chain_net_remove_validator_from_clusters(dap_chain_t *a_chain, dap_stream_node_addr_t *a_addr) +{ + bool l_ret = !dap_global_db_cluster_member_delete(dap_chain_net_get_mempool_cluster(a_chain), a_addr); + l_ret &= !dap_global_db_cluster_member_delete(PVT(dap_chain_net_by_id(a_chain->net_id))->orders_cluster, a_addr); + return l_ret; } /** diff --git a/modules/net/dap_chain_node_cli_cmd.c b/modules/net/dap_chain_node_cli_cmd.c index 7a898eaf48..9c6e87cd79 100644 --- a/modules/net/dap_chain_node_cli_cmd.c +++ b/modules/net/dap_chain_node_cli_cmd.c @@ -1318,7 +1318,7 @@ int com_node(int a_argc, char ** a_argv, void **a_str_reply) } log_it(L_NOTICE, "Stream connection established"); - dap_stream_ch_chain_sync_request_t l_sync_request = {}; + dap_chain_ch_sync_request_t l_sync_request = {}; dap_stream_ch_t *l_ch_chain = dap_client_get_stream_ch_unsafe(l_node_client->client, DAP_STREAM_CH_CHAIN_ID); // fill begin id l_sync_request.id_start = 1; @@ -1327,7 +1327,7 @@ int com_node(int a_argc, char ** a_argv, void **a_str_reply) log_it(L_INFO, "Requested GLOBAL_DB syncronizatoin, %"DAP_UINT64_FORMAT_U":%"DAP_UINT64_FORMAT_U" period", l_sync_request.id_start, l_sync_request.id_end); - if(0 == dap_stream_ch_chain_pkt_write_unsafe(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB, + if(0 == dap_chain_ch_pkt_write_unsafe(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB, l_net->pub.id.uint64, 0, 0, &l_sync_request, sizeof(l_sync_request))) { dap_cli_server_cmd_set_reply_text(a_str_reply, "Error: Can't send sync chains request"); @@ -1361,8 +1361,8 @@ int com_node(int a_argc, char ** a_argv, void **a_str_reply) // reset state NODE_CLIENT_STATE_SYNCED dap_chain_node_client_reset(l_node_client); // send request - dap_stream_ch_chain_sync_request_t l_sync_request = {}; - if(0 == dap_stream_ch_chain_pkt_write_unsafe(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS, + dap_chain_ch_sync_request_t l_sync_request = {}; + if(0 == dap_chain_ch_pkt_write_unsafe(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS, l_net->pub.id.uint64, l_chain->id.uint64, l_remote_node_info->hdr.cell_id.uint64, &l_sync_request, sizeof(l_sync_request))) { dap_cli_server_cmd_set_reply_text(a_str_reply, "Error: Can't send sync chains request"); diff --git a/modules/net/dap_chain_node_client.c b/modules/net/dap_chain_node_client.c index 4d82fde476..096d75700f 100644 --- a/modules/net/dap_chain_node_client.c +++ b/modules/net/dap_chain_node_client.c @@ -76,11 +76,11 @@ static void s_stage_connected_callback(dap_client_t *a_client, void *a_arg); static bool s_timer_update_states_callback(void *a_arg); static int s_node_client_set_notify_callbacks(dap_client_t *a_client, uint8_t a_ch_id); -static void s_ch_chain_callback_notify_packet_out(dap_stream_ch_chain_t*, uint8_t a_pkt_type, - dap_stream_ch_chain_pkt_t *a_pkt, size_t a_pkt_data_size, +static void s_ch_chain_callback_notify_packet_out(dap_chain_ch_t*, uint8_t a_pkt_type, + dap_chain_ch_pkt_t *a_pkt, size_t a_pkt_data_size, void * a_arg); -static void s_ch_chain_callback_notify_packet_in(dap_stream_ch_chain_t* a_ch_chain, uint8_t a_pkt_type, - dap_stream_ch_chain_pkt_t *a_pkt, size_t a_pkt_data_size, +static void s_ch_chain_callback_notify_packet_in(dap_chain_ch_t* a_ch_chain, uint8_t a_pkt_type, + dap_chain_ch_pkt_t *a_pkt, size_t a_pkt_data_size, void * a_arg); bool s_stream_ch_chain_debug_more = false; @@ -182,7 +182,7 @@ dap_chain_node_sync_status_t dap_chain_node_client_start_sync(dap_chain_node_cli // check if esocket still in worker dap_stream_ch_t *l_ch = dap_stream_ch_find_by_uuid_unsafe(a_node_client->stream_worker, a_node_client->ch_chain_uuid); if (l_ch) { - dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); + dap_chain_ch_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); assert(l_ch_chain); dap_chain_net_t * l_net = a_node_client->net; assert(l_net); @@ -192,14 +192,14 @@ dap_chain_node_sync_status_t dap_chain_node_client_start_sync(dap_chain_node_cli bool l_trylocked = dap_chain_net_sync_trylock(l_net, a_node_client); if (l_trylocked) { log_it(L_INFO, "Start synchronization process with "NODE_ADDR_FP_STR, NODE_ADDR_FP_ARGS_S(a_node_client->remote_node_addr)); - dap_stream_ch_chain_sync_request_t l_sync_chain = {}; + dap_chain_ch_sync_request_t l_sync_chain = {}; l_sync_chain.node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net); - dap_stream_ch_chain_pkt_write_unsafe(a_node_client->ch_chain, + dap_chain_ch_pkt_write_unsafe(a_node_client->ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_REQ, l_net->pub.id.uint64, l_net->pub.chains->id.uint64, 0, &l_sync_chain, sizeof(l_sync_chain)); if (!l_ch_chain->activity_timer) - dap_stream_ch_chain_timer_start(l_ch_chain); + dap_chain_ch_timer_start(l_ch_chain); return NODE_SYNC_STATUS_STARTED; } else return NODE_SYNC_STATUS_WAITING; @@ -327,15 +327,15 @@ static void s_ch_chain_callback_notify_packet_in2(dap_stream_ch_chain_net_t* a_c /** - * @brief s_ch_chain_callback_notify_packet_in - for dap_stream_ch_chain + * @brief s_ch_chain_callback_notify_packet_in - for dap_chain_ch * @param a_ch_chain * @param a_pkt_type * @param a_pkt * @param a_pkt_data_size * @param a_arg */ -static void s_ch_chain_callback_notify_packet_in(dap_stream_ch_chain_t* a_ch_chain, uint8_t a_pkt_type, - dap_stream_ch_chain_pkt_t *a_pkt, size_t a_pkt_data_size, +static void s_ch_chain_callback_notify_packet_in(dap_chain_ch_t* a_ch_chain, uint8_t a_pkt_type, + dap_chain_ch_pkt_t *a_pkt, size_t a_pkt_data_size, void * a_arg) { UNUSED(a_pkt_data_size); @@ -351,7 +351,7 @@ static void s_ch_chain_callback_notify_packet_in(dap_stream_ch_chain_t* a_ch_cha l_node_client->last_error); l_node_client->state = NODE_CLIENT_STATE_ERROR; if (!strcmp(l_node_client->last_error, "ERROR_SYNC_REQUEST_ALREADY_IN_PROCESS")) { - dap_stream_ch_chain_reset_unsafe(a_ch_chain); + dap_chain_ch_reset_unsafe(a_ch_chain); l_finished = true; } break; @@ -427,7 +427,7 @@ static void s_ch_chain_callback_notify_packet_in(dap_stream_ch_chain_t* a_ch_cha log_it(L_INFO,"In: Link %s."NODE_ADDR_FP_STR" started to sync %s chain",l_net->pub.name, NODE_ADDR_FP_ARGS_S(l_node_addr), l_node_client->cur_chain->name ); } - dap_stream_ch_chain_pkt_write_unsafe(l_node_client->ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_REQ, + dap_chain_ch_pkt_write_unsafe(l_node_client->ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_REQ, l_net->pub.id.uint64 , l_chain_id.uint64,l_cell_id.uint64,NULL,0); } else { // If no - over with sync process @@ -465,8 +465,8 @@ static void s_ch_chain_callback_notify_packet_in(dap_stream_ch_chain_t* a_ch_cha * @param a_pkt_data_size * @param a_arg */ -static void s_ch_chain_callback_notify_packet_out(dap_stream_ch_chain_t* a_ch_chain, uint8_t a_pkt_type, - dap_stream_ch_chain_pkt_t *a_pkt, size_t a_pkt_data_size, +static void s_ch_chain_callback_notify_packet_out(dap_chain_ch_t* a_ch_chain, uint8_t a_pkt_type, + dap_chain_ch_pkt_t *a_pkt, size_t a_pkt_data_size, void * a_arg) { (void) a_pkt; @@ -772,7 +772,7 @@ void dap_chain_node_client_close_unsafe(dap_chain_node_client_t *a_node_client) if (a_node_client->stream_worker) { dap_stream_ch_t *l_ch = dap_stream_ch_find_by_uuid_unsafe(a_node_client->stream_worker, a_node_client->ch_chain_uuid); if (l_ch) { - dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); + dap_chain_ch_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); l_ch_chain->callback_notify_packet_in = NULL; l_ch_chain->callback_notify_packet_out = NULL; } @@ -932,7 +932,7 @@ static int s_node_client_set_notify_callbacks(dap_client_t *a_client, uint8_t a_ switch (a_ch_id) { // 'C' case DAP_STREAM_CH_CHAIN_ID: { - dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); + dap_chain_ch_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); l_ch_chain->callback_notify_packet_out = s_ch_chain_callback_notify_packet_out; l_ch_chain->callback_notify_packet_in = s_ch_chain_callback_notify_packet_in; l_ch_chain->callback_notify_arg = l_node_client; diff --git a/modules/net/include/dap_chain_net.h b/modules/net/include/dap_chain_net.h index 71abb1e26f..3472aa51fb 100644 --- a/modules/net/include/dap_chain_net.h +++ b/modules/net/include/dap_chain_net.h @@ -160,6 +160,7 @@ dap_chain_node_info_t *dap_chain_net_balancer_link_from_cfg(dap_chain_net_t *a_n int dap_chain_net_add_poa_certs_to_cluster(dap_chain_net_t *a_net, dap_global_db_cluster_t *a_cluster); bool dap_chain_net_add_validator_to_clusters(dap_chain_t *a_chain, dap_stream_node_addr_t *a_addr); +bool dap_chain_net_remove_validator_from_clusters(dap_chain_t *a_chain, dap_stream_node_addr_t *a_addr); dap_global_db_cluster_t *dap_chain_net_get_mempool_cluster(dap_chain_t *a_chain); int dap_chain_net_add_reward(dap_chain_net_t *a_net, uint256_t a_reward, uint64_t a_block_num); diff --git a/modules/service/stake/dap_chain_net_srv_stake_pos_delegate.c b/modules/service/stake/dap_chain_net_srv_stake_pos_delegate.c index 1e2534e5e4..f7bfbe5afb 100644 --- a/modules/service/stake/dap_chain_net_srv_stake_pos_delegate.c +++ b/modules/service/stake/dap_chain_net_srv_stake_pos_delegate.c @@ -33,7 +33,7 @@ #include "dap_chain_net_tx.h" #include "dap_chain_net_srv.h" #include "dap_chain_net_srv_stake_pos_delegate.h" - +#include "dap_chain_cs_esbocs.h" #include "rand/dap_rand.h" #include "dap_chain_node_client.h" #include "dap_stream_ch_chain_net_pkt.h" @@ -305,6 +305,7 @@ void dap_chain_net_srv_stake_key_delegate(dap_chain_net_t *a_net, dap_chain_addr } } } + char l_key_hash_str[DAP_CHAIN_HASH_FAST_STR_SIZE]; dap_chain_hash_fast_to_str(&a_signing_addr->data.hash_fast, l_key_hash_str, DAP_CHAIN_HASH_FAST_STR_SIZE); @@ -2127,7 +2128,7 @@ static int s_callback_compare_tx_list(dap_list_t *a_datum1, dap_list_t *a_datum2 ? 0 : l_datum1->header.ts_created > l_datum2->header.ts_created ? 1 : -1; } -int dap_chain_net_srv_stake_check_validator(dap_chain_net_t * a_net, dap_hash_fast_t *a_tx_hash, dap_stream_ch_chain_validator_test_t * out_data, +int dap_chain_net_srv_stake_check_validator(dap_chain_net_t * a_net, dap_hash_fast_t *a_tx_hash, dap_chain_ch_validator_test_t * out_data, int a_time_connect, int a_time_respone) { char *l_key = NULL; @@ -2201,12 +2202,12 @@ int dap_chain_net_srv_stake_check_validator(dap_chain_net_t * a_net, dap_hash_fa rc = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_VALID_READY, a_time_respone); if (!rc) { - dap_stream_ch_chain_validator_test_t *validators_data = (dap_stream_ch_chain_validator_test_t*)l_node_client->callbacks_arg; + dap_chain_ch_validator_test_t *validators_data = (dap_chain_ch_validator_test_t*)l_node_client->callbacks_arg; dap_sign_t *l_sign = NULL; bool l_sign_correct = false; if(validators_data->header.sign_size){ - l_sign = (dap_sign_t*)(l_node_client->callbacks_arg + sizeof(dap_stream_ch_chain_validator_test_t)); + l_sign = (dap_sign_t*)(l_node_client->callbacks_arg + sizeof(dap_chain_ch_validator_test_t)); dap_hash_fast_t l_sign_pkey_hash; dap_sign_get_pkey_hash(l_sign, &l_sign_pkey_hash); l_sign_correct = dap_hash_fast_compare(&l_tx_out_cond->subtype.srv_stake_pos_delegate.signing_addr.data.hash_fast, &l_sign_pkey_hash); @@ -2295,7 +2296,7 @@ static int s_cli_srv_stake(int a_argc, char **a_argv, void **a_str_reply) const char * str_tx_hash = NULL; dap_chain_net_t * l_net = NULL; dap_hash_fast_t l_tx = {}; - dap_stream_ch_chain_validator_test_t l_out = {0}; + dap_chain_ch_validator_test_t l_out = {0}; dap_cli_server_cmd_find_option_val(a_argv, l_arg_index, a_argc, "-net", &l_netst); dap_cli_server_cmd_find_option_val(a_argv, l_arg_index, a_argc, "-tx", &str_tx_hash); diff --git a/modules/service/stake/include/dap_chain_net_srv_stake_pos_delegate.h b/modules/service/stake/include/dap_chain_net_srv_stake_pos_delegate.h index b9ed7aaed0..37fce710e3 100644 --- a/modules/service/stake/include/dap_chain_net_srv_stake_pos_delegate.h +++ b/modules/service/stake/include/dap_chain_net_srv_stake_pos_delegate.h @@ -86,7 +86,7 @@ json_object *dap_chain_net_srv_stake_get_fee_validators_json(dap_chain_net_t *a_ int dap_chain_net_srv_stake_load_cache(dap_chain_net_t *a_net); void dap_chain_net_srv_stake_purge(dap_chain_net_t *a_net); -int dap_chain_net_srv_stake_check_validator(dap_chain_net_t * a_net, dap_hash_fast_t *a_tx_hash, dap_stream_ch_chain_validator_test_t * out_data, +int dap_chain_net_srv_stake_check_validator(dap_chain_net_t * a_net, dap_hash_fast_t *a_tx_hash, dap_chain_ch_validator_test_t * out_data, int a_time_connect, int a_time_respone); dap_chain_datum_decree_t *dap_chain_net_srv_stake_decree_approve(dap_chain_net_t *a_net, -- GitLab