diff --git a/modules/chain/dap_chain_cell.c b/modules/chain/dap_chain_cell.c index 5cb49c37e9bbd3574d51d925fe0b9ca93659e4cf..ff2f658b81d7f257bc06f8a56cd528f1516fd688 100644 --- a/modules/chain/dap_chain_cell.c +++ b/modules/chain/dap_chain_cell.c @@ -317,7 +317,7 @@ int dap_chain_cell_file_append(dap_chain_cell_t *a_cell, const void *a_atom, siz a_cell->id.uint64); pthread_rwlock_unlock(&a_cell->storage_rwlock); return -3; - } else if (s_file_write_header(a_cell)) {// fill the header + } else if (!s_file_write_header(a_cell)) {// fill the header pthread_rwlock_unlock(&a_cell->storage_rwlock); return -4; } diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c index 4b0c88105a31f92771681995a6a886d14599e364..b51d0b7f7f1766ffd18d9b122ba7b1e31e196d1e 100644 --- a/modules/channel/chain/dap_stream_ch_chain.c +++ b/modules/channel/chain/dap_stream_ch_chain.c @@ -278,10 +278,13 @@ static void s_sync_out_chains_first_worker_callback(dap_worker_t *a_worker, void } dap_stream_ch_chain_t * l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); - l_ch_chain->state = CHAIN_STATE_SYNC_CHAINS; - l_ch_chain->request_atom_iter = l_sync_request->chain.request_atom_iter; - l_ch_chain->remote_atoms = l_sync_request->remote_atoms; /// TODO check if they were present here before + if (l_ch_chain->state != CHAIN_STATE_UPDATE_CHAINS_REMOTE) { + log_it(L_INFO, "Timeout fired before we sent the reply"); + s_sync_request_delete(l_sync_request); + return; + } + l_ch_chain->state = CHAIN_STATE_SYNC_CHAINS; dap_chain_node_addr_t l_node_addr = {}; dap_chain_net_t *l_net = dap_chain_net_by_id(l_sync_request->request_hdr.net_id); l_node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net); @@ -376,10 +379,14 @@ static void s_sync_out_gdb_first_worker_callback(dap_worker_t *a_worker, void *a } dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN( l_ch ); - dap_chain_net_t *l_net = dap_chain_net_by_id(l_ch_chain->request_hdr.net_id); + if (l_ch_chain->state != CHAIN_STATE_UPDATE_GLOBAL_DB_REMOTE) { + log_it(L_INFO, "Timeout fired before we sent the reply"); + s_sync_request_delete(l_sync_request); + return; + } + dap_chain_net_t *l_net = dap_chain_net_by_id(l_ch_chain->request_hdr.net_id); // Add it to outgoing list - if (l_ch_chain->request_db_log == NULL) l_ch_chain->request_db_log = l_sync_request->gdb.db_log; l_ch_chain->state = CHAIN_STATE_SYNC_GLOBAL_DB; dap_chain_node_addr_t l_node_addr = { 0 }; l_node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net); @@ -945,7 +952,7 @@ static bool s_chain_timer_callback(void *a_arg) return false; } dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); - if (l_ch_chain->timer_shots++ >= 500) { // 500 * 20 = 10 sec + if (l_ch_chain->timer_shots++ >= DAP_SYNC_TICKS_PER_SECOND * DAP_CHAIN_NODE_SYNC_TIMEOUT) { if (!s_ch_chain_get_idle(l_ch_chain)) { s_ch_chain_go_idle(l_ch_chain); if (l_ch_chain->callback_notify_packet_out) @@ -961,13 +968,13 @@ static bool s_chain_timer_callback(void *a_arg) if (l_ch_chain->state != CHAIN_STATE_WAITING && l_ch_chain->sent_breaks) s_stream_ch_packet_out(l_ch, NULL); // Sending dumb packet with nothing to inform remote thats we're just skiping atoms of GDB's, nothing freezed - if (l_ch_chain->state == CHAIN_STATE_SYNC_CHAINS && l_ch_chain->sent_breaks >= 150) { // 150 * 20 = 3 sec + if (l_ch_chain->state == CHAIN_STATE_SYNC_CHAINS && l_ch_chain->sent_breaks >= 3 * DAP_SYNC_TICKS_PER_SECOND) { dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_TSD, l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64, NULL, 0); l_ch_chain->sent_breaks = 0; } - if (l_ch_chain->state == CHAIN_STATE_SYNC_GLOBAL_DB && l_ch_chain->sent_breaks >= 150) { // 150 * 20 = 3 sec + if (l_ch_chain->state == CHAIN_STATE_SYNC_GLOBAL_DB && l_ch_chain->sent_breaks >= 3 * DAP_SYNC_TICKS_PER_SECOND) { if (s_debug_more) log_it(L_INFO, "Send one global_db TSD packet (rest=%zu/%zu items)", dap_db_log_list_get_count_rest(l_ch_chain->request_db_log), @@ -991,7 +998,9 @@ void dap_stream_ch_chain_timer_start(dap_stream_ch_chain_t *a_ch_chain) { dap_stream_ch_uuid_t *l_uuid = DAP_DUP(&DAP_STREAM_CH(a_ch_chain)->uuid); a_ch_chain->activity_timer = dap_timerfd_start_on_worker(DAP_STREAM_CH(a_ch_chain)->stream_worker->worker, - 20, s_chain_timer_callback, (void *)l_uuid); + 1000 / DAP_SYNC_TICKS_PER_SECOND, + s_chain_timer_callback, (void *)l_uuid); + a_ch_chain->sent_breaks = 0; } /** diff --git a/modules/channel/chain/include/dap_stream_ch_chain.h b/modules/channel/chain/include/dap_stream_ch_chain.h index f6e681a2741e9ee0c7d573ce04bbfdcece058b08..df8e1ea843ce25df34d7417a0ebc79b438a20a6a 100644 --- a/modules/channel/chain/include/dap_stream_ch_chain.h +++ b/modules/channel/chain/include/dap_stream_ch_chain.h @@ -34,6 +34,9 @@ #include "uthash.h" #include "dap_global_db_remote.h" +#define DAP_CHAIN_NODE_SYNC_TIMEOUT 30 // sec +#define DAP_SYNC_TICKS_PER_SECOND 50 + typedef struct dap_stream_ch_chain dap_stream_ch_chain_t; typedef void (*dap_stream_ch_chain_callback_packet_t)(dap_stream_ch_chain_t*, uint8_t a_pkt_type, dap_stream_ch_chain_pkt_t *a_pkt, size_t a_pkt_data_size, diff --git a/modules/type/blocks/dap_chain_block_cache.c b/modules/type/blocks/dap_chain_block_cache.c index b81d8748b9dec9809edb0e9f679f9d23db06ffcd..f2c0569b326d9340e41ae7cc7ef4ec2bccfd0ebc 100644 --- a/modules/type/blocks/dap_chain_block_cache.c +++ b/modules/type/blocks/dap_chain_block_cache.c @@ -53,7 +53,9 @@ void dap_chain_block_cache_deinit() * @param a_block_size * @return */ -dap_chain_block_cache_t * dap_chain_block_cache_new(dap_chain_cs_blocks_t *a_blocks, dap_chain_block_t * a_block, size_t a_block_size) + +dap_chain_block_cache_t *dap_chain_block_cache_new(dap_chain_cs_blocks_t *a_blocks, dap_hash_fast_t *a_block_hash, + dap_chain_block_t *a_block, size_t a_block_size) { if (! a_block) return NULL; @@ -64,7 +66,7 @@ dap_chain_block_cache_t * dap_chain_block_cache_new(dap_chain_cs_blocks_t *a_blo l_block_cache->_inheritor = a_blocks; l_block_cache->ts_created = a_block->hdr.ts_created; l_block_cache->sign_count = dap_chain_block_get_signs_count(a_block, a_block_size); - if (dap_chain_block_cache_update(l_block_cache)) { + if (dap_chain_block_cache_update(l_block_cache, a_block_hash)) { log_it(L_WARNING, "Block cache can't be created, possible cause corrupted block inside"); DAP_DELETE(l_block_cache); return NULL; @@ -90,11 +92,15 @@ dap_chain_block_cache_t * dap_chain_block_cache_dup(dap_chain_block_cache_t * a_ * @brief dap_chain_block_cache_update * @param a_block_cache */ -int dap_chain_block_cache_update(dap_chain_block_cache_t * a_block_cache) + +int dap_chain_block_cache_update(dap_chain_block_cache_t *a_block_cache, dap_hash_fast_t *a_block_hash) { assert(a_block_cache); assert(a_block_cache->block); - dap_hash_fast(a_block_cache->block, a_block_cache->block_size, &a_block_cache->block_hash); + if (a_block_hash) + a_block_cache->block_hash = *a_block_hash; + else + dap_hash_fast(a_block_cache->block, a_block_cache->block_size, &a_block_cache->block_hash); a_block_cache->block_hash_str = dap_hash_fast_to_str_new(&a_block_cache->block_hash); DAP_DEL_Z(a_block_cache->meta); a_block_cache->meta = dap_chain_block_get_meta(a_block_cache->block, a_block_cache->block_size, &a_block_cache->meta_count); diff --git a/modules/type/blocks/dap_chain_block_chunk.c b/modules/type/blocks/dap_chain_block_chunk.c index def3a69ec39f7386564160a7e35ec55bf0e4de02..b79ce22ac96814a1000354dd0ef926f2cb9cbb69 100644 --- a/modules/type/blocks/dap_chain_block_chunk.c +++ b/modules/type/blocks/dap_chain_block_chunk.c @@ -44,7 +44,10 @@ dap_chain_block_chunks_t * dap_chain_block_chunks_create(dap_chain_cs_blocks_t * size_t l_objs_count =0; dap_global_db_obj_t * l_objs= dap_global_db_get_all_sync(l_ret->gdb_group, &l_objs_count); for(size_t n=0; n< l_objs_count; n++){ - dap_chain_block_cache_t *l_block_cache = dap_chain_block_cache_new(a_blocks, (dap_chain_block_t*)l_objs[n].value, l_objs[n].value_len); + dap_hash_fast_t l_block_hash; + dap_chain_hash_fast_from_str(l_objs[n].key, &l_block_hash); + dap_chain_block_cache_t *l_block_cache = dap_chain_block_cache_new(a_blocks, &l_block_hash, + (dap_chain_block_t*)l_objs[n].value, l_objs[n].value_len); dap_chain_block_chunks_add(l_ret, l_block_cache ); } dap_global_db_objs_delete(l_objs,l_objs_count); @@ -95,8 +98,8 @@ void dap_chain_block_chunks_add(dap_chain_block_chunks_t * a_chunks,dap_chain_bl return; } // Save to GDB - dap_global_db_set(a_chunks->gdb_group, a_block_cache->block_hash_str, a_block_cache->block, a_block_cache->block_size, - true, NULL, NULL ); + //dap_global_db_set(a_chunks->gdb_group, a_block_cache->block_hash_str, a_block_cache->block, a_block_cache->block_size, + // true, NULL, NULL ); // And here we select chunk for the new block cache bool l_is_chunk_found = false; diff --git a/modules/type/blocks/dap_chain_cs_blocks.c b/modules/type/blocks/dap_chain_cs_blocks.c index 488c92796dc803ac4f336cd4154bc4ea6054c80c..87ec3a674e40d754b49f0c70403ee6217f4284bf 100644 --- a/modules/type/blocks/dap_chain_cs_blocks.c +++ b/modules/type/blocks/dap_chain_cs_blocks.c @@ -705,33 +705,23 @@ static int s_add_atom_to_ledger(dap_chain_cs_blocks_t * a_blocks, dap_ledger_t * */ static int s_add_atom_to_blocks(dap_chain_cs_blocks_t * a_blocks, dap_ledger_t * a_ledger, dap_chain_block_cache_t * a_block_cache ) { - pthread_rwlock_rdlock( &PVT(a_blocks)->rwlock ); - int res = a_blocks->callback_block_verify ? - a_blocks->callback_block_verify(a_blocks,a_block_cache->block, a_block_cache->block_size) - : 0; - if (res == 0 || memcmp( &a_block_cache->block_hash, &PVT(a_blocks)->genesis_block_hash, sizeof(a_block_cache->block_hash) ) == 0) { - debug_if(s_debug_more, L_DEBUG, "Block %s checked, add it to ledger", a_block_cache->block_hash_str); - pthread_rwlock_unlock( &PVT(a_blocks)->rwlock ); - res = s_add_atom_to_ledger(a_blocks, a_ledger, a_block_cache); - debug_if(s_debug_more, L_DEBUG, "Block %s checked, %s", a_block_cache->block_hash_str, - res == (int)a_block_cache->datum_count ? - "all correct" : "but ledger declined"); - //All correct, no matter for result - pthread_rwlock_wrlock( &PVT(a_blocks)->rwlock ); - HASH_ADD(hh, PVT(a_blocks)->blocks,block_hash,sizeof (a_block_cache->block_hash), a_block_cache); - PVT(a_blocks)->blocks_count++; - if (! (PVT(a_blocks)->block_cache_first ) ) - PVT(a_blocks)->block_cache_first = a_block_cache; - if (PVT(a_blocks)->block_cache_last) - PVT(a_blocks)->block_cache_last->next = a_block_cache; - a_block_cache->prev = PVT(a_blocks)->block_cache_last; - PVT(a_blocks)->block_cache_last = a_block_cache; - res = 1; - } else { - log_it(L_WARNING,"Block %s check failed: code %d", a_block_cache->block_hash_str, res ); - } + int l_res = 0; + l_res = s_add_atom_to_ledger(a_blocks, a_ledger, a_block_cache); + debug_if(s_debug_more, L_DEBUG, "Block %s checked, %s", a_block_cache->block_hash_str, + l_res == (int)a_block_cache->datum_count ? + "all correct" : "but ledger declined"); + //All correct, no matter for result + pthread_rwlock_wrlock( &PVT(a_blocks)->rwlock ); + HASH_ADD(hh, PVT(a_blocks)->blocks,block_hash,sizeof (a_block_cache->block_hash), a_block_cache); + PVT(a_blocks)->blocks_count++; + if (! (PVT(a_blocks)->block_cache_first ) ) + PVT(a_blocks)->block_cache_first = a_block_cache; + if (PVT(a_blocks)->block_cache_last) + PVT(a_blocks)->block_cache_last->next = a_block_cache; + a_block_cache->prev = PVT(a_blocks)->block_cache_last; + PVT(a_blocks)->block_cache_last = a_block_cache; pthread_rwlock_unlock( &PVT(a_blocks)->rwlock ); - return res; + return 1; } @@ -776,7 +766,11 @@ static void s_bft_consensus_setup(dap_chain_cs_blocks_t * a_blocks) } // Pass through all the chunk and add it to main chain for(l_block_cache= l_chunk->block_cache_top ;l_block_cache; l_block_cache=l_block_cache->prev){ - int l_check_res = s_add_atom_to_blocks(a_blocks, a_blocks->chain->ledger, l_block_cache); + int l_check_res = 0; + if (a_blocks->callback_block_verify) + l_check_res = a_blocks->callback_block_verify(a_blocks, l_block_cache->block, l_block_cache->block_size); + if (!l_check_res) + l_check_res = s_add_atom_to_blocks(a_blocks, a_blocks->chain->ledger, l_block_cache); if ( l_check_res != 0 ){ log_it(L_WARNING,"Can't move block %s from chunk to main chain - data inside wasn't verified: code %d", l_block_cache->block_hash_str, l_check_res); @@ -806,37 +800,33 @@ static void s_bft_consensus_setup(dap_chain_cs_blocks_t * a_blocks) */ static dap_chain_atom_verify_res_t s_callback_atom_add(dap_chain_t * a_chain, dap_chain_atom_ptr_t a_atom , size_t a_atom_size) { - dap_chain_atom_verify_res_t ret = ATOM_ACCEPT; dap_chain_cs_blocks_t * l_blocks = DAP_CHAIN_CS_BLOCKS(a_chain); dap_chain_block_t * l_block = (dap_chain_block_t *) a_atom; + size_t l_block_size = a_atom_size; pthread_rwlock_wrlock(&PVT(l_blocks)->datums_lock); dap_chain_hash_fast_t l_block_hash; - size_t l_block_size = a_atom_size; - dap_hash_fast(a_atom,a_atom_size, & l_block_hash); + dap_hash_fast(l_block, l_block_size, &l_block_hash); dap_chain_block_cache_t * l_block_cache = dap_chain_block_cs_cache_get_by_hash(l_blocks, &l_block_hash); if (l_block_cache ){ debug_if(s_debug_more, L_DEBUG, "... already present in blocks %s", l_block_cache->block_hash_str); pthread_rwlock_unlock(&PVT(l_blocks)->datums_lock); return ATOM_PASS; } else { - l_block_cache = dap_chain_block_cache_new(l_blocks, l_block, l_block_size); + l_block_cache = dap_chain_block_cache_new(l_blocks, &l_block_hash, l_block, l_block_size); if (!l_block_cache) { log_it(L_DEBUG, "... corrupted block"); pthread_rwlock_unlock(&PVT(l_blocks)->datums_lock); return ATOM_REJECT; } debug_if(s_debug_more, L_DEBUG, "... new block %s", l_block_cache->block_hash_str); - ret = ATOM_ACCEPT; } pthread_rwlock_unlock(&PVT(l_blocks)->datums_lock); // verify hashes and consensus - if(ret == ATOM_ACCEPT){ - ret = s_callback_atom_verify (a_chain, a_atom, a_atom_size); - debug_if(s_debug_more, L_DEBUG, "Verified atom %p: %s", a_atom, ret == ATOM_ACCEPT ? "accepted" : - (ret == ATOM_REJECT ? "rejected" : "thresholded")); - } + dap_chain_atom_verify_res_t ret = s_callback_atom_verify (a_chain, a_atom, a_atom_size); + debug_if(s_debug_more, L_DEBUG, "Verified atom %p: %s", a_atom, ret == ATOM_ACCEPT ? "accepted" : + (ret == ATOM_REJECT ? "rejected" : "thresholded")); if( ret == ATOM_ACCEPT){ int l_consensus_check = s_add_atom_to_blocks(l_blocks, a_chain->ledger, l_block_cache); @@ -853,10 +843,15 @@ static dap_chain_atom_verify_res_t s_callback_atom_add(dap_chain_t * a_chain, da } // !TODO make chunks add to blocks }else if(ret == ATOM_MOVE_TO_THRESHOLD){ + if (dap_chain_block_cs_cache_get_by_hash(l_blocks, &l_block_hash)) { + // if it was concurrent atom processed before + dap_chain_block_cache_delete(l_block_cache); + return ATOM_PASS; + } dap_chain_block_chunks_add( PVT(l_blocks)->chunks,l_block_cache); //dap_chain_block_chunks_sort(PVT(l_blocks)->chunks); }else if (ret == ATOM_REJECT ){ - DAP_DELETE(l_block_cache); + dap_chain_block_cache_delete(l_block_cache); } //s_bft_consensus_setup(l_blocks); @@ -877,7 +872,6 @@ static dap_chain_atom_verify_res_t s_callback_atom_verify(dap_chain_t * a_chain, dap_chain_cs_blocks_pvt_t * l_blocks_pvt = PVT(l_blocks); assert(l_blocks_pvt); dap_chain_block_t * l_block = (dap_chain_block_t *) a_atom; - dap_chain_atom_verify_res_t res = ATOM_ACCEPT; if(sizeof (l_block->hdr) >= a_atom_size){ log_it(L_WARNING,"Size of block is %zd that is equal or less then block's header size %zd",a_atom_size,sizeof (l_block->hdr)); @@ -904,25 +898,23 @@ static dap_chain_atom_verify_res_t s_callback_atom_verify(dap_chain_t * a_chain, // 2nd level consensus if(l_blocks->callback_block_verify) if (l_blocks->callback_block_verify(l_blocks, l_block, a_atom_size)) - res = ATOM_REJECT; - - if(res == ATOM_ACCEPT){ - // genesis or seed mode - if (l_is_genesis) { - if (!l_blocks_pvt->blocks) { - if (s_seed_mode) - log_it(L_NOTICE, "Accepting new genesis block"); - else - log_it(L_NOTICE, "Accepting static genesis block"); - } else { - log_it(L_WARNING,"Cant accept genesis block: already present data in blockchain"); - res = ATOM_REJECT; - } - } else if (!PVT(l_blocks)->block_cache_last || - !dap_hash_fast_compare(&PVT(l_blocks)->block_cache_last->block_hash, &l_block_prev_hash)) - res = ATOM_MOVE_TO_THRESHOLD; - } - return res; + return ATOM_REJECT; + + // genesis or seed mode + if (l_is_genesis) { + if (!l_blocks_pvt->blocks) { + if (s_seed_mode) + log_it(L_NOTICE, "Accepting new genesis block"); + else + log_it(L_NOTICE, "Accepting static genesis block"); + } else { + log_it(L_WARNING,"Cant accept genesis block: already present data in blockchain"); + return ATOM_REJECT; + } + } else if (!PVT(l_blocks)->block_cache_last || + !dap_hash_fast_compare(&PVT(l_blocks)->block_cache_last->block_hash, &l_block_prev_hash)) + return ATOM_MOVE_TO_THRESHOLD; + return ATOM_ACCEPT; } /** diff --git a/modules/type/blocks/include/dap_chain_block_cache.h b/modules/type/blocks/include/dap_chain_block_cache.h index 01f0a8d3b616edc569d6b42c0cf186c8bdb95c0b..5ffde4d9ebcdec23f16454df4d4906ac0b85ce19 100644 --- a/modules/type/blocks/include/dap_chain_block_cache.h +++ b/modules/type/blocks/include/dap_chain_block_cache.h @@ -86,9 +86,11 @@ typedef struct dap_chain_block_cache{ int dap_chain_block_cache_init(); void dap_chain_block_cache_deinit(); -dap_chain_block_cache_t * dap_chain_block_cache_new(dap_chain_cs_blocks_t *a_blocks, dap_chain_block_t * a_block, size_t a_block_size); -dap_chain_block_cache_t * dap_chain_block_cache_dup(dap_chain_block_cache_t * a_block); -int dap_chain_block_cache_update(dap_chain_block_cache_t * a_block_cache); -void dap_chain_block_cache_delete(dap_chain_block_cache_t * a_block_cache); -dap_chain_datum_tx_t* dap_chain_block_cache_get_tx_by_hash (dap_chain_block_cache_t * a_block_cache, dap_chain_hash_fast_t * a_tx_hash); + +dap_chain_block_cache_t *dap_chain_block_cache_new(dap_chain_cs_blocks_t *a_blocks, dap_hash_fast_t *a_block_hash, + dap_chain_block_t *a_block, size_t a_block_size); +dap_chain_block_cache_t *dap_chain_block_cache_dup(dap_chain_block_cache_t *a_block); +int dap_chain_block_cache_update(dap_chain_block_cache_t *a_block_cache, dap_hash_fast_t *a_block_hash); +void dap_chain_block_cache_delete(dap_chain_block_cache_t *a_block_cache); +dap_chain_datum_tx_t *dap_chain_block_cache_get_tx_by_hash(dap_chain_block_cache_t *a_block_cache, dap_chain_hash_fast_t *a_tx_hash);