From 8ecc9ea0306a36d619a986d384e58a7f075c2f3d Mon Sep 17 00:00:00 2001 From: Roman Khlopkov <roman.khlopkov@demlabs.net> Date: Fri, 24 Sep 2021 12:48:00 +0300 Subject: [PATCH] [*] A bit of order in chains --- CMakeLists.txt | 2 +- modules/chain/dap_chain.c | 1 + modules/chain/dap_chain_cell.c | 29 +++++-- modules/chain/dap_chain_ledger.c | 9 +-- modules/chain/include/dap_chain_cell.h | 1 + modules/chain/include/dap_chain_ledger.h | 4 +- modules/channel/chain/dap_stream_ch_chain.c | 44 ++++++----- modules/net/dap_chain_node_client.c | 5 +- modules/type/dag/dap_chain_cs_dag.c | 75 ++++++++++++------- .../type/dag/include/dap_chain_cs_dag_event.h | 8 +- 10 files changed, 113 insertions(+), 65 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index bb5753289e..987180e010 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,7 +2,7 @@ project(cellframe-sdk C) cmake_minimum_required(VERSION 3.0) set(CMAKE_C_STANDARD 11) -set(CELLFRAME_SDK_NATIVE_VERSION "2.9-45") +set(CELLFRAME_SDK_NATIVE_VERSION "2.9-46") add_definitions ("-DCELLFRAME_SDK_VERSION=\"${CELLFRAME_SDK_NATIVE_VERSION}\"") set(DAPSDK_MODULES "") message("Cellframe modules: ${CELLFRAME_MODULES}") diff --git a/modules/chain/dap_chain.c b/modules/chain/dap_chain.c index a2979aa502..c0d8c61a17 100644 --- a/modules/chain/dap_chain.c +++ b/modules/chain/dap_chain.c @@ -342,6 +342,7 @@ dap_chain_t * dap_chain_load_from_cfg(dap_ledger_t* a_ledger, const char * a_cha log_it(L_DEBUG, "Added atom from treshold"); } } + /* Temporary garbage cleaner */ dap_chain_save_all( l_chain ); // Save only the valid chain, throw all garbage out! log_it (L_NOTICE, "Loaded chain files"); } else { diff --git a/modules/chain/dap_chain_cell.c b/modules/chain/dap_chain_cell.c index e4b10ca85a..74cc658e77 100644 --- a/modules/chain/dap_chain_cell.c +++ b/modules/chain/dap_chain_cell.c @@ -88,6 +88,7 @@ dap_chain_cell_t * dap_chain_cell_create_fill(dap_chain_t * a_chain, dap_chain_c l_cell->chain = a_chain; l_cell->id.uint64 = a_cell_id.uint64; l_cell->file_storage_path = dap_strdup_printf("%0"DAP_UINT64_FORMAT_x".dchaincell", l_cell->id.uint64); + pthread_rwlock_init(&l_cell->storage_rwlock, NULL); pthread_rwlock_wrlock(&a_chain->cell_rwlock); HASH_ADD(hh, a_chain->cells, id, sizeof(dap_chain_cell_id_t), l_cell); pthread_rwlock_unlock(&a_chain->cell_rwlock); @@ -100,6 +101,7 @@ dap_chain_cell_t * dap_chain_cell_create_fill2(dap_chain_t * a_chain, const char l_cell->chain = a_chain; sscanf(a_filename, "%"DAP_UINT64_FORMAT_x".dchaincell", &l_cell->id.uint64); l_cell->file_storage_path = dap_strdup_printf(a_filename); + pthread_rwlock_init(&l_cell->storage_rwlock, NULL); pthread_rwlock_wrlock(&a_chain->cell_rwlock); HASH_ADD(hh, a_chain->cells, id, sizeof(dap_chain_cell_id_t), l_cell); pthread_rwlock_unlock(&a_chain->cell_rwlock); @@ -138,6 +140,7 @@ void dap_chain_cell_delete(dap_chain_cell_t *a_cell) } a_cell->chain = NULL; DAP_DEL_Z(a_cell->file_storage_path) + pthread_rwlock_destroy(&a_cell->storage_rwlock); DAP_DEL_Z(a_cell); } @@ -186,12 +189,15 @@ int dap_chain_cell_load(dap_chain_t * a_chain, const char * a_cell_file_path) } unsigned long l_read = fread(l_element, 1, l_el_size, l_f); if(l_read == l_el_size) { - a_chain->callback_atom_add(a_chain, l_element, l_el_size); // !!! blocking GDB call !!! + dap_chain_atom_verify_res_t l_res = a_chain->callback_atom_add(a_chain, l_element, l_el_size); // !!! blocking GDB call !!! + if (l_res == ATOM_PASS && l_res == ATOM_REJECT) { + DAP_DELETE(l_element); + } ++q; } else { log_it(L_ERROR, "Read only %zd of %zd bytes, stop cell loading", l_read, l_el_size); - ret = -6; DAP_DELETE(l_element); + ret = -6; break; } } @@ -199,8 +205,9 @@ int dap_chain_cell_load(dap_chain_t * a_chain, const char * a_cell_file_path) log_it(L_INFO, "Couldn't load all atoms, %d only", q); } else { log_it(L_INFO, "Loaded all %d atoms in cell %s", q, a_cell_file_path); - dap_chain_cell_create_fill2(a_chain, a_cell_file_path); } + if (q) + dap_chain_cell_create_fill2(a_chain, a_cell_file_path); fclose(l_f); return ret; @@ -226,8 +233,7 @@ int dap_chain_cell_file_append( dap_chain_cell_t * a_cell, const void* a_atom, s char l_file_path[MAX_PATH] = {'\0'}; dap_snprintf(l_file_path, MAX_PATH, "%s/%s", DAP_CHAIN_PVT(a_cell->chain)->file_storage_dir, a_cell->file_storage_path); - if(!a_cell->file_storage) - a_cell->file_storage = fopen(l_file_path, "r+b"); + a_cell->file_storage = fopen(l_file_path, "r+b"); if (!a_cell->file_storage) { log_it(L_INFO, "Create chain cell"); a_cell->file_storage = fopen(l_file_path, "w+b"); @@ -239,6 +245,7 @@ int dap_chain_cell_file_append( dap_chain_cell_t * a_cell, const void* a_atom, s return -3; } } + pthread_rwlock_wrlock(&a_cell->storage_rwlock); fseek(a_cell->file_storage, 0L, SEEK_END); if (ftell(a_cell->file_storage) < (long)sizeof(dap_chain_cell_file_header_t)) { // fill the header fseek(a_cell->file_storage, 0L, SEEK_SET); @@ -261,14 +268,14 @@ int dap_chain_cell_file_append( dap_chain_cell_t * a_cell, const void* a_atom, s } // if no atom provided in arguments, we flush all the atoms in given chain size_t l_atom_size = a_atom_size ? a_atom_size : 0; - int l_total_wrote_bytes = 0; + size_t l_total_wrote_bytes = 0, l_count = 0; dap_chain_atom_iter_t *l_atom_iter = a_atom ? NULL : a_cell->chain->callback_atom_iter_create(a_cell->chain); if (!a_atom) { fseek(a_cell->file_storage, sizeof(dap_chain_cell_file_header_t), SEEK_SET); } for (dap_chain_atom_ptr_t l_atom = a_atom ? (dap_chain_atom_ptr_t)a_atom : a_cell->chain->callback_atom_iter_get_first(l_atom_iter, &l_atom_size); l_atom; - l_atom = a_atom ? NULL : a_cell->chain->callback_atom_iter_get_next(l_atom_iter, &l_atom_size)) + l_atom = a_atom ? NULL : a_cell->chain->callback_atom_iter_get_next(l_atom_iter, &l_atom_size), l_count++) { if (fwrite(&l_atom_size, 1, sizeof(l_atom_size), a_cell->file_storage) != sizeof(l_atom_size)) { log_it (L_ERROR,"Can't write atom data size from cell 0x%016X in \"%s\"", @@ -297,9 +304,15 @@ int dap_chain_cell_file_append( dap_chain_cell_t * a_cell, const void* a_atom, s } if (l_total_wrote_bytes > 0) { fflush(a_cell->file_storage); - if (!a_atom) + if (!a_atom) { ftruncate(fileno(a_cell->file_storage), l_total_wrote_bytes + sizeof(dap_chain_cell_file_header_t)); + log_it(L_DEBUG, "Saved %d atoms (total %"DAP_UINT64_FORMAT_U" bytes", l_count, l_total_wrote_bytes); + } + } else if (!a_atom) { + log_it(L_WARNING, "Nothing to save, event table is empty"); } + + pthread_rwlock_unlock(&a_cell->storage_rwlock); if (l_atom_iter) { a_cell->chain->callback_atom_iter_delete(l_atom_iter); } diff --git a/modules/chain/dap_chain_ledger.c b/modules/chain/dap_chain_ledger.c index d9eaf907ae..702a5a9744 100644 --- a/modules/chain/dap_chain_ledger.c +++ b/modules/chain/dap_chain_ledger.c @@ -338,8 +338,7 @@ int dap_chain_ledger_token_add(dap_ledger_t * a_ledger, dap_chain_datum_token_t pthread_rwlock_unlock(&PVT(a_ledger)->tokens_rwlock); if (l_token_item) { - if(s_debug_more) - log_it(L_WARNING,"Duplicate token declaration for ticker '%s' ", a_token->ticker); + log_it(L_WARNING,"Duplicate token declaration for ticker '%s' ", a_token->ticker); return -3; } @@ -1226,7 +1225,7 @@ int dap_chain_ledger_token_emission_add(dap_ledger_t *a_ledger, } else { if (l_token_item) { if(s_debug_more) - log_it(L_ERROR, "Can't add token emission datum of %"DAP_UINT64_FORMAT_U" %s ( %s )", + log_it(L_ERROR, "Duplicate token emission datum of %"DAP_UINT64_FORMAT_U" %s ( %s )", a_token_emission->hdr.value, c_token_ticker, l_hash_str); } ret = -1; @@ -1924,8 +1923,8 @@ int dap_chain_ledger_tx_cache_check(dap_ledger_t *a_ledger, dap_chain_datum_tx_t } l_value_cur = NULL; } else { - log_it(L_ERROR, "Emission for tx_token wasn't found"); - l_err_num = -11; + log_it(L_WARNING, "Emission for tx_token wasn't found"); + l_err_num = DAP_CHAIN_CS_VERIFY_CODE_TX_NO_EMISSION; break; } } diff --git a/modules/chain/include/dap_chain_cell.h b/modules/chain/include/dap_chain_cell.h index 5f599e40e4..2c5f9246e7 100644 --- a/modules/chain/include/dap_chain_cell.h +++ b/modules/chain/include/dap_chain_cell.h @@ -37,6 +37,7 @@ typedef struct dap_chain_cell { char * file_storage_path; FILE * file_storage; /// @param file_cache @brief Cache for raw blocks uint8_t file_storage_type; /// @param file_storage_type @brief Is file_storage is raw, compressed or smth else + pthread_rwlock_t storage_rwlock; UT_hash_handle hh; } dap_chain_cell_t; diff --git a/modules/chain/include/dap_chain_ledger.h b/modules/chain/include/dap_chain_ledger.h index ca99601676..e164a9e04c 100644 --- a/modules/chain/include/dap_chain_ledger.h +++ b/modules/chain/include/dap_chain_ledger.h @@ -54,7 +54,9 @@ typedef bool (* dap_chain_ledger_verificator_callback_t)(dap_chain_tx_out_cond_t #define DAP_CHAIN_LEDGER_CHECK_CELLS_DS 0x0100 // Error code for no previous transaction (candidate to threshold) -#define DAP_CHAIN_CS_VERIFY_CODE_TX_NO_PREVIOUS -5 +#define DAP_CHAIN_CS_VERIFY_CODE_TX_NO_PREVIOUS -111 +// Error code for no emission for a transaction (candidate to threshold) +#define DAP_CHAIN_CS_VERIFY_CODE_TX_NO_EMISSION -112 #define DAP_CHAIN_LEDGER_TOKENS_STR "tokens" #define DAP_CHAIN_LEDGER_EMISSIONS_STR "emissions" diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c index bb60eb3945..a7eb706602 100644 --- a/modules/channel/chain/dap_stream_ch_chain.c +++ b/modules/channel/chain/dap_stream_ch_chain.c @@ -498,6 +498,7 @@ static bool s_sync_in_chains_callback(dap_proc_thread_t *a_thread, void *a_arg) dap_chain_hash_fast_to_str(&l_atom_hash,l_atom_hash_str,sizeof (l_atom_hash_str)-1 ); log_it(L_WARNING,"Atom with hash %s for %s:%s not accepted (code ATOM_PASS, already present)", l_atom_hash_str, l_chain->net_name, l_chain->name); } + DAP_DELETE(l_atom_copy); break; case ATOM_MOVE_TO_THRESHOLD: case ATOM_ACCEPT: @@ -515,12 +516,14 @@ static bool s_sync_in_chains_callback(dap_proc_thread_t *a_thread, void *a_arg) log_it(L_ERROR, "Can't create cell with id 0x%x to save event...", l_sync_request->request_hdr.cell_id); break; } - int l_res = dap_chain_cell_file_append(l_cell, l_atom_copy, l_atom_copy_size); - if(l_res < 0) { - log_it(L_ERROR, "Can't save event 0x%x to the file '%s'", l_atom_hash, - l_cell ? l_cell->file_storage_path : "[null]"); - } else { - dap_db_set_last_hash_remote(l_sync_request->request.node_addr.uint64, l_chain, &l_atom_hash); + if (l_atom_add_res == ATOM_ACCEPT) { + int l_res = dap_chain_cell_file_append(l_cell, l_atom_copy, l_atom_copy_size); + if(l_res < 0) { + log_it(L_ERROR, "Can't save event 0x%x to the file '%s'", l_atom_hash, + l_cell ? l_cell->file_storage_path : "[null]"); + } else { + dap_db_set_last_hash_remote(l_sync_request->request.node_addr.uint64, l_chain, &l_atom_hash); + } } if (l_chain->callback_atom_add_from_treshold) { dap_chain_atom_ptr_t l_atom_treshold; @@ -530,7 +533,7 @@ static bool s_sync_in_chains_callback(dap_proc_thread_t *a_thread, void *a_arg) log_it(L_DEBUG, "Try to add atom from treshold"); l_atom_treshold = l_chain->callback_atom_add_from_treshold(l_chain, &l_atom_treshold_size); if(l_atom_treshold) { - l_res = dap_chain_cell_file_append(l_cell, l_atom_treshold, l_atom_treshold_size); + int l_res = dap_chain_cell_file_append(l_cell, l_atom_treshold, l_atom_treshold_size); log_it(L_INFO, "Added atom from treshold"); if(l_res < 0) { log_it(L_ERROR, "Can't save event 0x%x from treshold to file '%s'", @@ -539,20 +542,22 @@ static bool s_sync_in_chains_callback(dap_proc_thread_t *a_thread, void *a_arg) } } while(l_atom_treshold); } - dap_chain_cell_close(l_cell); + //dap_chain_cell_close(l_cell); break; case ATOM_REJECT: { - char l_atom_hash_str[72] = {'\0'}; - dap_chain_hash_fast_to_str(&l_atom_hash,l_atom_hash_str,sizeof (l_atom_hash_str)-1 ); - log_it(L_WARNING,"Atom with hash %s for %s:%s rejected", l_atom_hash_str, l_chain->net_name, l_chain->name); + if (s_debug_more) { + char l_atom_hash_str[72] = {'\0'}; + dap_chain_hash_fast_to_str(&l_atom_hash,l_atom_hash_str,sizeof (l_atom_hash_str)-1 ); + log_it(L_WARNING,"Atom with hash %s for %s:%s rejected", l_atom_hash_str, l_chain->net_name, l_chain->name); + } + DAP_DELETE(l_atom_copy); break; } default: + DAP_DELETE(l_atom_copy); log_it(L_CRITICAL, "Wtf is this ret code? %d", l_atom_add_res); break; } - - DAP_DEL_Z(l_atom_copy); DAP_DEL_Z(l_sync_request); return true; } @@ -808,8 +813,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) "ERROR_SYNC_REQUEST_ALREADY_IN_PROCESS"); break; } - if(s_debug_more) - log_it(L_INFO, "In: UPDATE_GLOBAL_DB_REQ pkt: net 0x%016x chain 0x%016x cell 0x%016x", l_chain_pkt->hdr.net_id.uint64 , + log_it(L_INFO, "In: UPDATE_GLOBAL_DB_REQ pkt: net 0x%016x chain 0x%016x cell 0x%016x", l_chain_pkt->hdr.net_id.uint64 , l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64); if (l_chain_pkt_data_size == sizeof(dap_stream_ch_chain_sync_request_t)) memcpy(&l_ch_chain->request, l_chain_pkt->data, sizeof(dap_stream_ch_chain_sync_request_t)); @@ -920,7 +924,8 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) if(s_debug_more) { if (l_ch_pkt->hdr.type == DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_END) - log_it(L_INFO, "In: UPDATE_GLOBAL_DB_END pkt"); + log_it(L_INFO, "In: UPDATE_GLOBAL_DB_END pkt with total count %d hashes", + HASH_COUNT(l_ch_chain->remote_gdbs)); else log_it(L_INFO, "In: SYNC_GLOBAL_DB pkt"); } @@ -958,7 +963,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) if(l_chain_pkt_data_size > 0) { struct sync_request *l_sync_request = dap_stream_ch_chain_create_sync_request(l_chain_pkt, a_ch); dap_chain_pkt_item_t *l_pkt_item = &l_sync_request->pkt; - l_pkt_item->pkt_data = DAP_NEW_Z_SIZE(byte_t, l_chain_pkt_data_size); + l_pkt_item->pkt_data = DAP_NEW_SIZE(byte_t, l_chain_pkt_data_size); memcpy(l_pkt_item->pkt_data, l_chain_pkt->data, l_chain_pkt_data_size); l_pkt_item->pkt_data_size = l_chain_pkt_data_size; dap_proc_queue_add_callback_inter(a_ch->stream_worker->worker->proc_queue_input, s_gdb_in_pkt_proc_callback, l_sync_request); @@ -1144,7 +1149,8 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) if(s_debug_more) { if (l_ch_pkt->hdr.type == DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_END) - log_it(L_INFO, "In: UPDATE_CHAINS_END pkt"); + log_it(L_INFO, "In: UPDATE_CHAINS_END pkt with total count %d hashes", + HASH_COUNT(l_ch_chain->remote_atoms)); else log_it(L_INFO, "In: SYNC_CHAINS pkt"); } @@ -1193,7 +1199,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) if(l_chain_pkt_data_size > 0) { struct sync_request *l_sync_request = dap_stream_ch_chain_create_sync_request(l_chain_pkt, a_ch); dap_chain_pkt_item_t *l_pkt_item = &l_sync_request->pkt; - l_pkt_item->pkt_data = DAP_NEW_Z_SIZE(byte_t, l_chain_pkt_data_size); + l_pkt_item->pkt_data = DAP_NEW_SIZE(byte_t, l_chain_pkt_data_size); memcpy(l_pkt_item->pkt_data, l_chain_pkt->data, l_chain_pkt_data_size); l_pkt_item->pkt_data_size = l_chain_pkt_data_size; if (s_debug_more){ diff --git a/modules/net/dap_chain_node_client.c b/modules/net/dap_chain_node_client.c index d7dba4dc71..9410f5bbf7 100644 --- a/modules/net/dap_chain_node_client.c +++ b/modules/net/dap_chain_node_client.c @@ -153,7 +153,8 @@ static void s_stage_status_error_callback(dap_client_t *a_client, void *a_arg) return; // check for last attempt bool l_is_last_attempt = a_arg ? true : false; - if(l_is_last_attempt){ + if (l_is_last_attempt) { + dap_chain_node_client_state_t l_prev_state = l_node_client->state; pthread_mutex_lock(&l_node_client->wait_mutex); l_node_client->state = NODE_CLIENT_STATE_DISCONNECTED; #ifndef _WIN32 @@ -164,7 +165,7 @@ static void s_stage_status_error_callback(dap_client_t *a_client, void *a_arg) pthread_mutex_unlock(&l_node_client->wait_mutex); l_node_client->esocket_uuid = 0; - if (l_node_client->callbacks.disconnected) { + if (l_prev_state >= NODE_CLIENT_STATE_ESTABLISHED && l_node_client->callbacks.disconnected) { l_node_client->callbacks.disconnected(l_node_client, l_node_client->callbacks_arg); } else if (l_node_client->keep_connection) { dap_events_socket_uuid_t *l_uuid = DAP_NEW(dap_events_socket_uuid_t); diff --git a/modules/type/dag/dap_chain_cs_dag.c b/modules/type/dag/dap_chain_cs_dag.c index 015eb3fe86..d9f87a5865 100644 --- a/modules/type/dag/dap_chain_cs_dag.c +++ b/modules/type/dag/dap_chain_cs_dag.c @@ -335,22 +335,15 @@ static int s_dap_chain_add_atom_to_ledger(dap_chain_cs_dag_t * a_dag, dap_ledger static int s_dap_chain_add_atom_to_events_table(dap_chain_cs_dag_t * a_dag, dap_ledger_t * a_ledger, dap_chain_cs_dag_event_item_t * a_event_item ) { - int res = a_dag->callback_cs_verify(a_dag,a_event_item->event, a_event_item->event_size); - - char l_buf_hash[128] = {'\0'}; - dap_chain_hash_fast_to_str(&a_event_item->hash,l_buf_hash,sizeof(l_buf_hash)-1); - if (res == 0 || memcmp( &a_event_item->hash, &a_dag->static_genesis_event_hash, sizeof(a_event_item->hash) ) == 0) { - if(s_debug_more) - log_it(L_DEBUG,"Dag event %s checked, add it to ledger", l_buf_hash); - int l_ledger_res = s_dap_chain_add_atom_to_ledger(a_dag, a_ledger, a_event_item); - if ( l_ledger_res != 0) { - if(s_debug_more) - log_it(L_WARNING,"Dag event %s checked, but ledger declined: code %d", l_buf_hash, l_ledger_res); - } - } else { - log_it(L_WARNING,"Dag event %s check failed: code %d", l_buf_hash, res ); + int l_ledger_res = s_dap_chain_add_atom_to_ledger(a_dag, a_ledger, a_event_item); + if (s_debug_more) { + char l_buf_hash[128] = {'\0'}; + dap_chain_hash_fast_to_str(&a_event_item->hash,l_buf_hash,sizeof(l_buf_hash)-1); + log_it(L_DEBUG,"Dag event %s checked, add it to ledger", l_buf_hash); + if (l_ledger_res != 0) + log_it(L_WARNING,"Dag event %s checked, but ledger declined: code %d", l_buf_hash, l_ledger_res); } - return res; + return l_ledger_res; } static bool s_dap_chain_check_if_event_is_present(dap_chain_cs_dag_event_item_t * a_hash_table, const dap_chain_hash_fast_t * hash) { @@ -423,17 +416,17 @@ static dap_chain_atom_verify_res_t s_chain_callback_atom_add(dap_chain_t * a_cha break; case ATOM_ACCEPT: { int l_consensus_check = s_dap_chain_add_atom_to_events_table(l_dag, a_chain->ledger, l_event_item); - //All correct, no matter for result - pthread_rwlock_wrlock(l_events_rwlock); - HASH_ADD(hh, PVT(l_dag)->events,hash,sizeof (l_event_item->hash), l_event_item); - s_dag_events_lasts_process_new_last_event(l_dag, l_event_item); - pthread_rwlock_unlock(l_events_rwlock); switch (l_consensus_check) { case 0: + pthread_rwlock_wrlock(l_events_rwlock); + HASH_ADD(hh, PVT(l_dag)->events,hash,sizeof (l_event_item->hash), l_event_item); + s_dag_events_lasts_process_new_last_event(l_dag, l_event_item); + pthread_rwlock_unlock(l_events_rwlock); if(s_debug_more) log_it(L_DEBUG, "... added"); break; case DAP_CHAIN_CS_VERIFY_CODE_TX_NO_PREVIOUS: + case DAP_CHAIN_CS_VERIFY_CODE_TX_NO_EMISSION: pthread_rwlock_wrlock(l_events_rwlock); HASH_ADD(hh, PVT(l_dag)->events_treshold, hash, sizeof(l_event_item->hash), l_event_item); pthread_rwlock_unlock(l_events_rwlock); @@ -443,7 +436,8 @@ static dap_chain_atom_verify_res_t s_chain_callback_atom_add(dap_chain_t * a_cha break; default: l_event_hash_str = dap_chain_hash_fast_to_str_new(&l_event_item->hash); - log_it(L_WARNING, "Atom %s (size %zd) error adding (code %d)", l_event_hash_str, a_atom_size, l_consensus_check); + if (s_debug_more) + log_it(L_WARNING, "Atom %s (size %zd) error adding (code %d)", l_event_hash_str, a_atom_size, l_consensus_check); ret = ATOM_REJECT; break; } @@ -636,6 +630,7 @@ static size_t s_chain_callback_datums_pool_proc(dap_chain_t * a_chain, dap_chain memcpy(&l_event_unlinked_item->hash, &l_event_ext_item->hash, sizeof(l_event_ext_item->hash)); l_event_unlinked_item->event = l_event; + l_event_unlinked_item->event_size = l_event_size; l_event_unlinked_item->ts_added = (time_t) l_event->header.ts_created; pthread_rwlock_wrlock(&PVT(l_dag)->events_rwlock); HASH_ADD(hh, PVT(l_dag)->events_lasts_unlinked, hash, sizeof(l_event_unlinked_item->hash), @@ -683,6 +678,22 @@ dap_chain_cs_dag_event_t* dap_chain_cs_dag_find_event_by_hash(dap_chain_cs_dag_t return l_event; } +static bool s_event_verify_size(dap_chain_cs_dag_event_t *a_event, size_t a_event_size) +{ + if (sizeof(a_event->header) >= a_event_size) { + log_it(L_WARNING, "Size of atom is %zd that is equal or less then header %zd", a_event_size, sizeof(a_event->header)); + return false; + } + size_t l_sign_offset = dap_chain_cs_dag_event_calc_size_excl_signs(a_event, a_event_size); + if (l_sign_offset >= a_event_size) + return false; + for (int i = 0; i < a_event->header.signs_count; i++) { + dap_sign_t *l_sign = (dap_sign_t *)((uint8_t *)a_event + l_sign_offset); + l_sign_offset += dap_sign_get_size(l_sign); + } + return l_sign_offset == a_event_size; +} + /** @@ -697,8 +708,19 @@ static dap_chain_atom_verify_res_t s_chain_callback_atom_verify(dap_chain_t * a_ dap_chain_cs_dag_event_t * l_event = (dap_chain_cs_dag_event_t *) a_atom; dap_chain_atom_verify_res_t res = ATOM_ACCEPT; pthread_rwlock_t *l_events_rwlock = &PVT(l_dag)->events_rwlock; - if(sizeof (l_event->header) >= a_atom_size){ - log_it(L_WARNING,"Size of atom is %zd that is equal or less then header %zd",a_atom_size,sizeof (l_event->header)); + if (l_event->header.version) { + if (s_debug_more) + log_it(L_WARNING, "Unsupported event version, possible corrupted event"); + return ATOM_REJECT; + } + if (l_event->header.chain_id.uint64 != a_chain->id.uint64) { + if (s_debug_more) + log_it(L_WARNING, "Event from another chain, possible corrupted event"); + return ATOM_REJECT; + } + if (!s_event_verify_size(l_event, a_atom_size)) { + if (s_debug_more) + log_it(L_WARNING,"Event size not equal to expected"); return ATOM_REJECT; } // Hard accept list @@ -1522,9 +1544,10 @@ static int s_cli_dag(int argc, char ** argv, void *arg_func, char **a_str_reply) pthread_rwlock_rdlock(&PVT(l_dag)->events_rwlock); HASH_FIND(hh,PVT(l_dag)->events,&l_event_hash,sizeof(l_event_hash),l_event_item); pthread_rwlock_unlock(&PVT(l_dag)->events_rwlock); - if ( l_event_item ) + if ( l_event_item ) { l_event = l_event_item->event; - else { + l_event_size = l_event_item->event_size; + } else { ret = -24; dap_chain_node_cli_set_reply_text(a_str_reply, "Can't find event %s in events table\n", l_event_hash_str); @@ -1594,7 +1617,7 @@ static int s_cli_dag(int argc, char ** argv, void *arg_func, char **a_str_reply) dap_chain_addr_fill(&l_addr, l_sign->header.type, &l_pkey_hash, l_net->pub.id); char * l_addr_str = dap_chain_addr_to_str(&l_addr); dap_string_append_printf(l_str_tmp,"\t\t\t\t\t\ttype: %s\taddr: %s" - "n", dap_sign_type_to_str( l_sign->header.type ), + "\n", dap_sign_type_to_str( l_sign->header.type ), l_addr_str ); l_offset += l_sign_size; DAP_DELETE( l_addr_str); diff --git a/modules/type/dag/include/dap_chain_cs_dag_event.h b/modules/type/dag/include/dap_chain_cs_dag_event.h index 02203ef787..8065ea0eb3 100644 --- a/modules/type/dag/include/dap_chain_cs_dag_event.h +++ b/modules/type/dag/include/dap_chain_cs_dag_event.h @@ -103,12 +103,14 @@ static inline size_t dap_chain_cs_dag_event_calc_size(dap_chain_cs_dag_event_t * */ static inline ssize_t dap_chain_cs_dag_event_calc_size_excl_signs(dap_chain_cs_dag_event_t * a_event,size_t a_event_size) { - if (a_event_size< sizeof (a_event->header)) + if (a_event_size < sizeof(a_event->header)) return -1; size_t l_hashes_size = a_event->header.hash_count*sizeof(dap_chain_hash_fast_t); + if (l_hashes_size > a_event_size) + return -1; dap_chain_datum_t * l_datum = (dap_chain_datum_t*) (a_event->hashes_n_datum_n_signs + l_hashes_size); size_t l_datum_size = dap_chain_datum_size(l_datum); - return l_hashes_size + sizeof (a_event->header)+l_datum_size; + return l_hashes_size + sizeof (a_event->header) + l_datum_size; } /** @@ -119,5 +121,5 @@ static inline ssize_t dap_chain_cs_dag_event_calc_size_excl_signs(dap_chain_cs_d */ static inline void dap_chain_cs_dag_event_calc_hash(dap_chain_cs_dag_event_t * a_event,size_t a_event_size, dap_chain_hash_fast_t * a_event_hash) { - dap_hash_fast(a_event, a_event_size , a_event_hash); + dap_hash_fast(a_event, a_event_size, a_event_hash); } -- GitLab