diff --git a/dap-sdk b/dap-sdk index f5409cdd40208193ec7d0e577318bfeb30f994e5..5381607bce56e21fe61c878b343468cdb010a855 160000 --- a/dap-sdk +++ b/dap-sdk @@ -1 +1 @@ -Subproject commit f5409cdd40208193ec7d0e577318bfeb30f994e5 +Subproject commit 5381607bce56e21fe61c878b343468cdb010a855 diff --git a/modules/chain/CMakeLists.txt b/modules/chain/CMakeLists.txt index f635a344e37e81cb627ab9e6493f0bfc4b39abab..5e2c10d1c370af894f2ea878d37c9f4c5980bbb6 100644 --- a/modules/chain/CMakeLists.txt +++ b/modules/chain/CMakeLists.txt @@ -2,9 +2,13 @@ cmake_minimum_required(VERSION 3.10) project (dap_chain) file(GLOB DAP_CHAIN_SRCS *.c) - file(GLOB DAP_CHAIN_HEADERS include/*.h) +if (WIN32) + file (GLOB MMAN_SRCS ${CMAKE_CURRENT_SOURCE_DIR}/../../dap-sdk/3rdparty/mman/*) + list(APPEND DAP_CHAIN_SRCS ${MMAN_SRCS}) +endif() + add_library(${PROJECT_NAME} STATIC ${DAP_CHAIN_SRCS} ${DAP_CHAIN_HEADERS}) if(BUILD_CELLFRAME_SDK_TESTS) @@ -17,7 +21,9 @@ endif() target_link_libraries(${PROJECT_NAME} dap_chain_common dap_global_db dap_notify_srv dap_stream ${GLIB_LDFLAGS}) target_include_directories(${PROJECT_NAME} INTERFACE . include/ ${GLIB_INCLUDE_DIRS}) target_include_directories(${PROJECT_NAME} PUBLIC include) -target_include_directories(${PROJECT_NAME} PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/../../../dap-sdk/3rdparty/uthash/src) +if (WIN32) + target_include_directories(${PROJECT_NAME} INTERFACE ${CMAKE_CURRENT_SOURCE_DIR}/../../dap-sdk/3rdparty/mman/) +endif() if (INSTALL_SDK) set_target_properties(${PROJECT_NAME} PROPERTIES PUBLIC_HEADER "${DAP_CHAIN_HEADERS}") diff --git a/modules/chain/dap_chain.c b/modules/chain/dap_chain.c index 4bbdd262c612efbac628e68bc2ed08e8484aae39..04a3a2aa456e353a54551a8a048c24a6d0381358 100644 --- a/modules/chain/dap_chain.c +++ b/modules/chain/dap_chain.c @@ -110,6 +110,11 @@ dap_chain_t *dap_chain_create(const char *a_chain_net_name, const char *a_chain_ .net_id = a_chain_net_id, .name = dap_strdup(a_chain_name), .net_name = dap_strdup(a_chain_net_name), +#ifdef DAP_OS_WINDOWS // TODO + .is_mapped = false, +#else + .is_mapped = dap_config_get_item_bool_default(g_config, "ledger", "mapped", true), +#endif .cell_rwlock = PTHREAD_RWLOCK_INITIALIZER, .atom_notifiers = NULL }; @@ -392,7 +397,10 @@ dap_chain_t *dap_chain_load_from_cfg(const char *a_chain_net_name, dap_chain_net if ( dap_config_get_item_str_default(l_cfg, "files","storage_dir", NULL) ) { - DAP_CHAIN_PVT(l_chain)->file_storage_dir = (char*)dap_config_get_item_path( l_cfg , "files","storage_dir" ); + DAP_CHAIN_PVT(l_chain)->file_storage_dir = (char*)dap_config_get_item_path( l_cfg, "files", "storage_dir" ); + if (!dap_dir_test(DAP_CHAIN_PVT(l_chain)->file_storage_dir)) { + dap_mkdir_with_parents(DAP_CHAIN_PVT(l_chain)->file_storage_dir); + } } else log_it (L_INFO, "Not set file storage path, will not stored in files"); @@ -592,9 +600,6 @@ int dap_chain_load_all(dap_chain_t *a_chain) char *l_storage_dir = DAP_CHAIN_PVT(a_chain)->file_storage_dir; if (!l_storage_dir) return 0; - if (!dap_dir_test(l_storage_dir)) { - dap_mkdir_with_parents(l_storage_dir); - } DIR *l_dir = opendir(l_storage_dir); if (!l_dir) { log_it(L_ERROR, "Cannot open directory %s", DAP_CHAIN_PVT(a_chain)->file_storage_dir); @@ -724,7 +729,8 @@ static bool s_notify_atom_on_thread(void *a_arg) struct chain_thread_notifier *l_arg = a_arg; assert(l_arg->atom && l_arg->callback); l_arg->callback(l_arg->callback_arg, l_arg->chain, l_arg->cell_id, l_arg->atom, l_arg->atom_size); - DAP_DELETE(l_arg->atom); + if ( !l_arg->chain->is_mapped ) + DAP_DELETE(l_arg->atom); DAP_DELETE(l_arg); return false; } @@ -747,41 +753,7 @@ ssize_t dap_chain_atom_save(dap_chain_cell_t *a_chain_cell, const uint8_t *a_ato } } } - ssize_t l_res = dap_chain_cell_file_append(a_chain_cell, a_atom, a_atom_size); - if (l_chain->atom_notifiers) { - dap_list_t *l_iter; - DL_FOREACH(l_chain->atom_notifiers, l_iter) { - dap_chain_atom_notifier_t *l_notifier = (dap_chain_atom_notifier_t*)l_iter->data; - struct chain_thread_notifier *l_arg = DAP_NEW_Z(struct chain_thread_notifier); - if (!l_arg) { - log_it(L_CRITICAL, "%s", g_error_memory_alloc); - continue; - } - *l_arg = (struct chain_thread_notifier) { .callback = l_notifier->callback, .callback_arg = l_notifier->arg, - .chain = l_chain, .cell_id = a_chain_cell->id, .atom_size = a_atom_size }; - l_arg->atom = DAP_DUP_SIZE(a_atom, a_atom_size); - if (!l_arg->atom) { - DAP_DELETE(l_arg); - log_it(L_CRITICAL, "%s", g_error_memory_alloc); - continue; - } - dap_proc_thread_callback_add_pri(NULL, s_notify_atom_on_thread, l_arg, DAP_QUEUE_MSG_PRIORITY_LOW); - } - } - if (l_chain->callback_atom_add_from_treshold) { - dap_chain_atom_ptr_t l_atom_treshold; - do { - size_t l_atom_treshold_size; - l_atom_treshold = l_chain->callback_atom_add_from_treshold(l_chain, &l_atom_treshold_size); - if (l_atom_treshold) { - if (dap_chain_cell_file_append(a_chain_cell, l_atom_treshold, l_atom_treshold_size) > 0) - log_it(L_INFO, "Added atom from treshold"); - else - log_it(L_ERROR, "Can't add atom from treshold"); - } - } while(l_atom_treshold); - } - return l_res; + return dap_chain_cell_file_append(a_chain_cell, a_atom, a_atom_size); } /** @@ -824,6 +796,36 @@ const char* dap_chain_get_path(dap_chain_t *a_chain) return DAP_CHAIN_PVT(a_chain)->file_storage_dir; } +void dap_chain_atom_notify(dap_chain_cell_t *a_chain_cell, const uint8_t *a_atom, size_t a_atom_size) { + if ( !a_chain_cell->chain->atom_notifiers ) + return; + dap_list_t *l_iter; + DL_FOREACH(a_chain_cell->chain->atom_notifiers, l_iter) { + dap_chain_atom_notifier_t *l_notifier = (dap_chain_atom_notifier_t*)l_iter->data; + struct chain_thread_notifier *l_arg = DAP_NEW_Z(struct chain_thread_notifier); + if (!l_arg) { + log_it(L_CRITICAL, "%s", g_error_memory_alloc); + continue; + } + *l_arg = (struct chain_thread_notifier) { + .callback = l_notifier->callback, .callback_arg = l_notifier->arg, + .chain = a_chain_cell->chain, .cell_id = a_chain_cell->id, + .atom = a_chain_cell->chain->is_mapped ? a_atom : DAP_DUP_SIZE(a_atom, a_atom_size), + .atom_size = a_atom_size }; + dap_proc_thread_callback_add_pri(NULL, s_notify_atom_on_thread, l_arg, DAP_QUEUE_MSG_PRIORITY_LOW); + } +} + +void dap_chain_atom_add_from_threshold(dap_chain_t *a_chain) { + if ( !a_chain->callback_atom_add_from_treshold ) + return; + dap_chain_atom_ptr_t l_atom_treshold = NULL; + do { + size_t l_atom_treshold_size; + l_atom_treshold = a_chain->callback_atom_add_from_treshold(a_chain, &l_atom_treshold_size); + } while(l_atom_treshold); +} + const char *dap_chain_type_to_str(const dap_chain_type_t a_default_chain_type) { switch (a_default_chain_type) { diff --git a/modules/chain/dap_chain_cell.c b/modules/chain/dap_chain_cell.c index 4d758f091650571176ae07a67a35e89d72f8a331..d0a9d498e30dd81f6c70594e43c5f258785187d1 100644 --- a/modules/chain/dap_chain_cell.c +++ b/modules/chain/dap_chain_cell.c @@ -30,16 +30,18 @@ #include "dap_config.h" #include "dap_strfuncs.h" #include "dap_file_utils.h" - +#ifdef DAP_OS_WINDOWS +#include "mman.h" +#else +#include <sys/mman.h> +#endif #define LOG_TAG "dap_chain_cell" - - #define DAP_CHAIN_CELL_FILE_VERSION 1 #define DAP_CHAIN_CELL_FILE_SIGNATURE 0xfa340bef153eba48 #define DAP_CHAIN_CELL_FILE_TYPE_RAW 0 #define DAP_CHAIN_CELL_FILE_TYPE_COMPRESSED 1 - +#define DAP_MAPPED_VOLUME_LIMIT (1 << 28) // 256 MB for now, may be should be configurable? /** * @struct dap_chain_cell_file_header */ @@ -53,6 +55,7 @@ typedef struct dap_chain_cell_file_header dap_chain_cell_id_t cell_id; } DAP_ALIGN_PACKED dap_chain_cell_file_header_t; +static bool s_debug_more = false; /** * @brief dap_chain_cell_init @@ -61,6 +64,7 @@ typedef struct dap_chain_cell_file_header */ int dap_chain_cell_init(void) { + s_debug_more = dap_config_get_item_bool_default(g_config, "chain", "debug_more", false); //s_cells_path = dap_config_get_item_str(g_config,"resources","cells_storage"); return 0; } @@ -99,17 +103,57 @@ dap_chain_cell_t * dap_chain_cell_create_fill(dap_chain_t * a_chain, dap_chain_c pthread_rwlock_unlock(&a_chain->cell_rwlock); return l_cell; } + char file_storage_path[MAX_PATH]; + snprintf(file_storage_path, MAX_PATH, "%s/%0"DAP_UINT64_FORMAT_x".dchaincell", + DAP_CHAIN_PVT(a_chain)->file_storage_dir, a_cell_id.uint64); + + char *l_map = NULL; + size_t l_size = 0; + FILE *l_file = fopen(file_storage_path, "r+b"); + if ( l_file ) { + if ( a_chain->is_mapped ) { + fseek(l_file, 0, SEEK_END); + l_size = ftell(l_file); + fseek(l_file, 0, SEEK_SET); + if ( MAP_FAILED == (l_map = mmap(NULL, l_size ? l_size : dap_page_roundup(DAP_MAPPED_VOLUME_LIMIT), PROT_READ|PROT_WRITE, MAP_PRIVATE, fileno(l_file), 0)) ) { + log_it(L_ERROR, "Chain cell \"%s\" 0x%016"DAP_UINT64_FORMAT_X" cannot be mapped, errno %d", file_storage_path, a_cell_id.uint64, errno); + fclose(l_file); + pthread_rwlock_unlock(&a_chain->cell_rwlock); + return NULL; + } + } + } else if (errno == ENOENT) { + if ( !(l_file = fopen(file_storage_path, "w+b")) ) { + log_it(L_ERROR, "Chain cell \"%s\" 0x%016"DAP_UINT64_FORMAT_X" cannot be opened, error %d", + file_storage_path, a_cell_id.uint64, errno); + pthread_rwlock_unlock(&a_chain->cell_rwlock); + return NULL; + } + if ( MAP_FAILED == (l_map = mmap(NULL, l_size = dap_page_roundup(DAP_MAPPED_VOLUME_LIMIT), PROT_READ|PROT_WRITE, MAP_PRIVATE, fileno(l_file), 0)) ) { + log_it(L_ERROR, "Chain cell \"%s\" 0x%016"DAP_UINT64_FORMAT_X" cannot be mapped, error %d", + file_storage_path, a_cell_id.uint64, errno); + fclose(l_file); + pthread_rwlock_unlock(&a_chain->cell_rwlock); + return NULL; + } + } + l_cell = DAP_NEW_Z(dap_chain_cell_t); - if ( !l_cell ) { - pthread_rwlock_unlock(&a_chain->cell_rwlock); - log_it(L_CRITICAL, "%s", g_error_memory_alloc); - return NULL; + *l_cell = (dap_chain_cell_t) { + .id = a_cell_id.uint64, + .map = l_map, + .map_pos = l_map, + .map_end = l_map ? l_map + l_size : NULL, + .file_storage = l_file, + .chain = a_chain, + .storage_rwlock = PTHREAD_RWLOCK_INITIALIZER + }; + memcpy(l_cell->file_storage_path, file_storage_path, sizeof(file_storage_path)); + if (l_map) { + l_cell->map_range_bounds = dap_list_append(l_cell->map_range_bounds, l_map); + l_cell->map_range_bounds = dap_list_append(l_cell->map_range_bounds, l_cell->map_end); } - l_cell->chain = a_chain; - l_cell->id.uint64 = a_cell_id.uint64; - snprintf(l_cell->file_storage_path, MAX_PATH, "%s/%0"DAP_UINT64_FORMAT_x".dchaincell", - DAP_CHAIN_PVT(a_chain)->file_storage_dir, l_cell->id.uint64); - pthread_rwlock_init(&l_cell->storage_rwlock, NULL); + debug_if (s_debug_more && a_chain->is_mapped, L_DEBUG, "Mapped volume size is %lu", (size_t)(l_cell->map_end - l_cell->map)); HASH_ADD(hh, a_chain->cells, id, sizeof(dap_chain_cell_id_t), l_cell); pthread_rwlock_unlock(&a_chain->cell_rwlock); return l_cell; @@ -143,6 +187,16 @@ void dap_chain_cell_close(dap_chain_cell_t *a_cell) fclose(a_cell->file_storage); a_cell->file_storage = NULL; } + if (a_cell->chain->is_mapped) { + for (dap_list_t *l_iter = a_cell->map_range_bounds; l_iter; l_iter = l_iter->next) { + if (l_iter->next) { + debug_if(s_debug_more, "Unmap volume %p (%lu bytes)", l_iter->data, (size_t)(l_iter->next->data - l_iter->data)); + munmap(l_iter->data, (size_t)(l_iter->next->data - l_iter->data)); + l_iter = l_iter->next; + } + } + dap_list_free(a_cell->map_range_bounds); + } } /** @@ -195,62 +249,79 @@ void dap_chain_cell_delete_all(dap_chain_t *a_chain) { */ int dap_chain_cell_load(dap_chain_t *a_chain, dap_chain_cell_t *a_cell) { - int l_ret = 0; - FILE *l_cell_file = fopen(a_cell->file_storage_path, "rb"); - if (!l_cell_file) { - log_it(L_WARNING,"Can't read chain \"%s\"", a_cell->file_storage_path); + if (!a_cell) + return -1; + fseek(a_cell->file_storage, 0, SEEK_END); + size_t l_size = ftell(a_cell->file_storage); + fseek(a_cell->file_storage, 0, SEEK_SET); + if ( l_size < sizeof(dap_chain_cell_file_header_t) || (a_chain->is_mapped && !a_cell->map_pos) ) { + log_it(L_INFO, "Chain cell \"%s\" is yet empty", a_cell->file_storage_path); return -1; } - dap_chain_cell_file_header_t l_hdr = { 0 }; - if (fread(&l_hdr, 1, sizeof(l_hdr), l_cell_file) != sizeof (l_hdr)) { - log_it(L_ERROR,"Can't read chain header \"%s\"", a_cell->file_storage_path); - fclose(l_cell_file); - return -2; + int l_ret = 0; + dap_chain_cell_file_header_t *l_hdr = NULL; + if (a_chain->is_mapped) { + l_hdr = (dap_chain_cell_file_header_t*)a_cell->map; + } else { + l_hdr = DAP_NEW(dap_chain_cell_file_header_t); + if ( fread(l_hdr, 1, sizeof(*l_hdr), a_cell->file_storage) != sizeof(*l_hdr) ) { + log_it(L_ERROR,"Can't read chain header \"%s\"", a_cell->file_storage_path); + fclose(a_cell->file_storage); + DAP_DELETE(l_hdr); + return -2; + } } - if (l_hdr.signature != DAP_CHAIN_CELL_FILE_SIGNATURE) { + if (l_hdr->signature != DAP_CHAIN_CELL_FILE_SIGNATURE) { log_it(L_ERROR, "Wrong signature in chain \"%s\", possible file corrupt", a_cell->file_storage_path); - fclose(l_cell_file); + fclose(a_cell->file_storage); + if (!a_chain->is_mapped) DAP_DELETE(l_hdr); return -3; } - if (l_hdr.version < DAP_CHAIN_CELL_FILE_VERSION ){ + if (l_hdr->version < DAP_CHAIN_CELL_FILE_VERSION ){ log_it(L_ERROR, "Too low chain version, backup files"); - fclose(l_cell_file); - return -3; + fclose(a_cell->file_storage); + if (!a_chain->is_mapped) DAP_DELETE(l_hdr); + return -4; } - unsigned long q = 0; - size_t l_read = 0; - uint64_t l_el_size = 0; - while ((l_read = fread(&l_el_size, 1, sizeof(l_el_size), l_cell_file)) && !feof(l_cell_file)) { - if (l_read != sizeof(l_el_size) || l_el_size == 0) { - log_it(L_ERROR, "Corrupted element size %zu, chain %s is damaged", l_el_size, a_cell->file_storage_path); - l_ret = -4; - break; - } - dap_chain_atom_ptr_t l_element = DAP_NEW_SIZE(dap_chain_atom_ptr_t, l_el_size); - if (!l_element) { - log_it(L_CRITICAL, "%s", g_error_memory_alloc); - l_ret = -5; - break; + + uint64_t q = 0, l_full_size = 0; + if (a_chain->is_mapped) { + a_cell->map_pos = a_cell->map + sizeof(dap_chain_cell_file_header_t); + for (uint64_t l_el_size = 0; a_cell->map_pos < a_cell->map_end && ( l_el_size = *(uint64_t*)a_cell->map_pos ); ++q, a_cell->map_pos += l_el_size) { + a_chain->callback_atom_add(a_chain, (dap_chain_atom_ptr_t)(a_cell->map_pos += sizeof(uint64_t)), l_el_size); } - l_read = fread((void *)l_element, 1, l_el_size, l_cell_file); - if (l_read != l_el_size) { - log_it(L_ERROR, "Read only %lu of %zu bytes, stop cell loading", l_read, l_el_size); - DAP_DELETE(l_element); - l_ret = -6; - break; + fseek(a_cell->file_storage, a_cell->map_pos - a_cell->map, SEEK_SET); + } else { + DAP_DELETE(l_hdr); + size_t l_read = 0; + uint64_t l_el_size = 0; + while ((l_read = fread(&l_el_size, 1, sizeof(l_el_size), a_cell->file_storage)) && !feof(a_cell->file_storage)) { + if (l_read != sizeof(l_el_size) || l_el_size == 0) { + log_it(L_ERROR, "Corrupted element size %zu, chain %s is damaged", l_el_size, a_cell->file_storage_path); + l_ret = -4; + break; + } + dap_chain_atom_ptr_t l_element = DAP_NEW_SIZE(dap_chain_atom_ptr_t, l_el_size); + if (!l_element) { + log_it(L_CRITICAL, "Memory allocation error"); + l_ret = -5; + break; + } + l_full_size += sizeof(uint64_t) + ( l_read = fread((void*)l_element, 1, l_el_size, a_cell->file_storage) ); + if (l_read != l_el_size) { + log_it(L_ERROR, "Read only %lu of %zu bytes, stop cell loading", l_read, l_el_size); + DAP_DELETE(l_element); + l_ret = -6; + break; + } + if ( a_chain->callback_atom_add(a_chain, l_element, l_el_size) != ATOM_ACCEPT ) + DAP_DELETE(l_element); + ++q; } - a_chain->callback_atom_add(a_chain, l_element, l_el_size); // ??? blocking GDB call - DAP_DELETE(l_element); - ++q; + fseek(a_cell->file_storage, l_full_size, SEEK_SET); } - if (l_ret < 0) { - log_it(L_INFO, "Couldn't load all atoms, %lu only", q); - } else { - log_it(L_INFO, "Loaded all %lu atoms in cell %s", q, a_cell->file_storage_path); - } - fclose(l_cell_file); + log_it(L_INFO, "Loaded %lu atoms in cell %s", q, a_cell->file_storage_path); return l_ret; - } static int s_file_write_header(dap_chain_cell_t *a_cell) @@ -274,16 +345,18 @@ static int s_file_write_header(dap_chain_cell_t *a_cell) .chain_id = { .uint64 = a_cell->id.uint64 }, .chain_net_id = a_cell->chain->net_id }; + if(fwrite(&l_hdr, sizeof(l_hdr), 1, a_cell->file_storage) == 1) { log_it(L_NOTICE, "Initialized file storage for cell 0x%016"DAP_UINT64_FORMAT_X" ( %s )", a_cell->id.uint64, a_cell->file_storage_path); fflush(a_cell->file_storage); + if (a_cell->chain->is_mapped) + a_cell->map_pos = a_cell->map + sizeof(l_hdr); return 0; - } else { - log_it(L_ERROR, "Can't init file storage for cell 0x%016"DAP_UINT64_FORMAT_X" ( %s )", - a_cell->id.uint64, a_cell->file_storage_path); - return -1; } + log_it(L_ERROR, "Can't init file storage for cell 0x%016"DAP_UINT64_FORMAT_X" ( %s )", + a_cell->id.uint64, a_cell->file_storage_path); + return -1; } static int s_file_atom_add(dap_chain_cell_t *a_cell, dap_chain_atom_ptr_t a_atom, uint64_t a_atom_size) @@ -292,6 +365,34 @@ static int s_file_atom_add(dap_chain_cell_t *a_cell, dap_chain_atom_ptr_t a_atom log_it(L_CRITICAL, "Invalid arguments"); return -1; } + if (a_cell->chain->is_mapped) { + size_t l_pos = ftell(a_cell->file_storage); + debug_if (s_debug_more, "Before filling volume for atom size %lu, stream pos of %s is %lu, map pos is %lu, space left in map %lu", + a_atom_size, a_cell->file_storage_path, l_pos, (size_t)(a_cell->map_pos - a_cell->map), (size_t)(a_cell->map_end - a_cell->map_pos)); + if ( a_atom_size > (size_t)(a_cell->map_end - a_cell->map_pos) ) { + size_t l_map_size = dap_page_roundup(DAP_MAPPED_VOLUME_LIMIT), + l_volume_start = dap_page_rounddown(l_pos), + l_offset = l_pos - l_volume_start; + debug_if (s_debug_more, "Need to enlarge map of %s, current stream pos is %lu, map pos is %lu, offset of new map is %lu", + a_cell->file_storage_path, ftell(a_cell->file_storage), (size_t)(a_cell->map_end - a_cell->map_pos), l_offset); + if ( MAP_FAILED == (a_cell->map = mmap(NULL, l_map_size, PROT_READ|PROT_WRITE, + MAP_PRIVATE, fileno(a_cell->file_storage), l_volume_start)) ) + { + log_it(L_ERROR, "Chain cell \"%s\" 0x%016"DAP_UINT64_FORMAT_X" cannot be mapped, errno %d", + a_cell->file_storage_path, a_cell->id.uint64, errno); + fclose(a_cell->file_storage); + return -2; + } + a_cell->map_pos = a_cell->map + l_offset; + a_cell->map_range_bounds = dap_list_append(a_cell->map_range_bounds, a_cell->map); + a_cell->map_range_bounds = dap_list_append(a_cell->map_range_bounds, a_cell->map_end = a_cell->map + l_map_size); + } + } + + debug_if (s_debug_more && a_cell->chain->is_mapped, "Before writing an atom of size %lu, stream pos of %s is %lu and pos is %lu, space left in map %lu", + a_atom_size, a_cell->file_storage_path, ftell(a_cell->file_storage), + (size_t)(a_cell->map_pos - a_cell->map), (size_t)(a_cell->map_end - a_cell->map_pos)); + if (fwrite(&a_atom_size, sizeof(a_atom_size), 1, a_cell->file_storage) != 1) { log_it (L_ERROR, "Can't write atom data size from cell 0x%016"DAP_UINT64_FORMAT_X" in \"%s\"", a_cell->id.uint64, @@ -300,10 +401,13 @@ static int s_file_atom_add(dap_chain_cell_t *a_cell, dap_chain_atom_ptr_t a_atom } if (fwrite(a_atom, a_atom_size, 1, a_cell->file_storage) != 1) { log_it (L_ERROR, "Can't write data from cell 0x%016"DAP_UINT64_FORMAT_X" to the file \"%s\"", - a_cell->id.uint64, - a_cell->file_storage_path); + a_cell->id.uint64, + a_cell->file_storage_path); return -3; } + debug_if (s_debug_more && a_cell->chain->is_mapped, "After writing an atom of size %lu, stream pos of %s is %lu and map shift is %lu", + a_atom_size, a_cell->file_storage_path, ftell(a_cell->file_storage), + (size_t)(a_cell->map_pos - a_cell->map)); return 0; } @@ -332,21 +436,19 @@ ssize_t dap_chain_cell_file_append(dap_chain_cell_t *a_cell, const void *a_atom, bool l_err = false; pthread_rwlock_wrlock(&a_cell->storage_rwlock); if (!a_atom || !a_atom_size) { - a_cell->file_storage = a_cell->file_storage - ? freopen(a_cell->file_storage_path, "w+b", a_cell->file_storage) - : fopen(a_cell->file_storage_path, "w+b"); - if (!a_cell->file_storage) { - log_it(L_ERROR, "Chain cell \"%s\" 0x%016"DAP_UINT64_FORMAT_X" cannot be opened", - a_cell->file_storage_path, - a_cell->id.uint64); - pthread_rwlock_unlock(&a_cell->storage_rwlock); - return -3; + a_cell->file_storage = freopen(a_cell->file_storage_path, "w+b", a_cell->file_storage); + debug_if (s_debug_more, "Rewinding file %s", a_cell->file_storage_path); + if (a_cell->chain->is_mapped && a_cell->map_range_bounds) { + a_cell->map = a_cell->map_pos = a_cell->map_range_bounds->data; + a_cell->map_end = a_cell->map_range_bounds->next->data; } - if (s_file_write_header(a_cell)) { - log_it(L_ERROR, "Chain cell \"%s\" 0x%016"DAP_UINT64_FORMAT_X": can't fill header", a_cell->file_storage_path, a_cell->id.uint64); + if ( s_file_write_header(a_cell) ) { + log_it(L_ERROR, "Chain cell \"%s\" 0x%016"DAP_UINT64_FORMAT_X": can't fill header", + a_cell->file_storage_path, a_cell->id.uint64); pthread_rwlock_unlock(&a_cell->storage_rwlock); - return -4; + return -2; } + l_total_res += sizeof(dap_chain_cell_file_header_t); dap_chain_atom_iter_t *l_atom_iter = a_cell->chain->callback_atom_iter_create(a_cell->chain, a_cell->id, NULL); dap_chain_atom_ptr_t l_atom; uint64_t l_atom_size = 0; @@ -363,25 +465,18 @@ ssize_t dap_chain_cell_file_append(dap_chain_cell_t *a_cell, const void *a_atom, } } a_cell->chain->callback_atom_iter_delete(l_atom_iter); - a_cell->file_storage = freopen(a_cell->file_storage_path, "a+b", a_cell->file_storage); + debug_if (s_debug_more && a_cell->chain->is_mapped, "After rewriting file %s, stream pos is %lu and map pos is %lu", + a_cell->file_storage_path, ftell(a_cell->file_storage), + (size_t)(a_cell->map_pos - a_cell->map)); } else { - if (!a_cell->file_storage) - a_cell->file_storage = fopen(a_cell->file_storage_path, "a+b"); - if (!a_cell->file_storage) { - log_it(L_ERROR, "Chain cell \"%s\" 0x%016"DAP_UINT64_FORMAT_X" cannot be opened", - a_cell->file_storage_path, - a_cell->id.uint64); + debug_if (s_debug_more && a_cell->chain->is_mapped, "Before appending an atom of size %lu, stream pos of %s is %lu, map pos is %lu", + a_atom_size, a_cell->file_storage_path, ftell(a_cell->file_storage), + (size_t)(a_cell->map_pos - a_cell->map)); + if ( !ftell(a_cell->file_storage) && s_file_write_header(a_cell) ) { + log_it(L_ERROR, "Chain cell \"%s\" 0x%016"DAP_UINT64_FORMAT_X": can't fill header", + a_cell->file_storage_path, a_cell->id.uint64); pthread_rwlock_unlock(&a_cell->storage_rwlock); return -3; - } else { - fseek(a_cell->file_storage, 0L, SEEK_END); - if (!ftell(a_cell->file_storage)) { // It's not garunteed that header has been yet added or not, regardless the descriptor validity - if (s_file_write_header(a_cell)) { - log_it(L_ERROR, "Chain cell \"%s\" 0x%016"DAP_UINT64_FORMAT_X": can't fill header", a_cell->file_storage_path, a_cell->id.uint64); - pthread_rwlock_unlock(&a_cell->storage_rwlock); - return -4; - } - } } if (s_file_atom_add(a_cell, a_atom, a_atom_size)) { log_it(L_ERROR, "Chain cell \"%s\" 0x%016"DAP_UINT64_FORMAT_X": can't save atom!", @@ -389,6 +484,9 @@ ssize_t dap_chain_cell_file_append(dap_chain_cell_t *a_cell, const void *a_atom, pthread_rwlock_unlock(&a_cell->storage_rwlock); return -4; } + debug_if (s_debug_more && a_cell->chain->is_mapped, "After appending an atom of size %lu, stream pos of %s is %lu, map pos is %lu", + a_atom_size, a_cell->file_storage_path, ftell(a_cell->file_storage), + (size_t)(a_cell->map_end - a_cell->map_pos)); ++l_count; l_total_res = a_atom_size + sizeof(uint64_t); } diff --git a/modules/chain/dap_chain_ch.c b/modules/chain/dap_chain_ch.c index f457f31cf3c0108797b9ba077ab663785c9b26f3..6b74b128727f49fdf73106d47a271a42120e7549 100644 --- a/modules/chain/dap_chain_ch.c +++ b/modules/chain/dap_chain_ch.c @@ -636,10 +636,7 @@ static bool s_sync_in_chains_callback(void *a_arg) break; case ATOM_ACCEPT: debug_if(s_debug_more, L_INFO, "Accepted atom with hash %s for %s:%s", l_atom_hash_str, l_chain->net_name, l_chain->name); - if (dap_chain_atom_save(l_chain->cells, l_atom, l_atom_size, NULL) < 0) - log_it(L_ERROR, "Can't save atom %s to the file", l_atom_hash_str); - else - l_ack_send = true; + l_ack_send = true; break; case ATOM_REJECT: { debug_if(s_debug_more, L_WARNING, "Atom with hash %s for %s:%s rejected", l_atom_hash_str, l_chain->net_name, l_chain->name); @@ -656,10 +653,7 @@ static bool s_sync_in_chains_callback(void *a_arg) dap_stream_ch_pkt_send_by_addr(&l_args->addr, DAP_CHAIN_CH_ID, DAP_CHAIN_CH_PKT_TYPE_CHAIN_ACK, l_pkt, dap_chain_ch_pkt_get_size(l_pkt)); DAP_DELETE(l_pkt); debug_if(s_debug_more, L_DEBUG, "Out: CHAIN_ACK %s for net %s to destination " NODE_ADDR_FP_STR " with num %" DAP_UINT64_FORMAT_U, - l_chain ? l_chain->name : "(null)", - l_chain ? l_chain->net_name : "(null)", - NODE_ADDR_FP_ARGS_S(l_args->addr), - l_ack_num); + l_chain->name, l_chain->net_name, NODE_ADDR_FP_ARGS_S(l_args->addr), l_ack_num); } DAP_DELETE(l_args); return false; diff --git a/modules/chain/include/dap_chain.h b/modules/chain/include/dap_chain.h index 4b83efb886aafcd3ab98f56bcfa07a68e139d474..76def40e457ffacd4b9b23b1452cde56beccb63c 100644 --- a/modules/chain/include/dap_chain.h +++ b/modules/chain/include/dap_chain.h @@ -138,7 +138,7 @@ typedef struct dap_chain { char *name; char *net_name; bool is_datum_pool_proc; - + bool is_mapped; // Nested cells (hashtab by cell_id) dap_chain_cell_t *cells; dap_chain_cell_id_t active_cell_id; @@ -252,6 +252,8 @@ dap_chain_t *dap_chain_load_from_cfg(const char *a_chain_net_name, dap_chain_net void dap_chain_delete(dap_chain_t * a_chain); void dap_chain_add_callback_notify(dap_chain_t * a_chain, dap_chain_callback_notify_t a_callback, void * a_arg); +void dap_chain_atom_notify(dap_chain_cell_t *a_chain_cell, const uint8_t *a_atom, size_t a_atom_size); +void dap_chain_atom_add_from_threshold(dap_chain_t *a_chain); dap_chain_atom_ptr_t dap_chain_get_atom_by_hash(dap_chain_t * a_chain, dap_chain_hash_fast_t * a_atom_hash, size_t * a_atom_size); bool dap_chain_get_atom_last_hash_num(dap_chain_t *a_chain, dap_chain_cell_id_t a_cell_id, dap_hash_fast_t *a_atom_hash, uint64_t *a_atom_num); DAP_STATIC_INLINE bool dap_chain_get_atom_last_hash(dap_chain_t *a_chain, dap_chain_cell_id_t a_cell_id, dap_hash_fast_t *a_atom_hash) diff --git a/modules/chain/include/dap_chain_cell.h b/modules/chain/include/dap_chain_cell.h index bf0fc7d0577da6a613db86455d4f991ce0dd461b..2c52cce17bc28812cf327176fa6ae56647144629 100644 --- a/modules/chain/include/dap_chain_cell.h +++ b/modules/chain/include/dap_chain_cell.h @@ -34,10 +34,11 @@ typedef struct dap_chain_cell { dap_chain_t * chain; char file_storage_path[MAX_PATH]; - FILE * file_storage; /// @param file_cache @brief Cache for raw blocks - uint8_t file_storage_type; /// @param file_storage_type @brief Is file_storage is raw, compressed or smth else + char *map, *map_pos, *map_end; + FILE *file_storage; + uint8_t file_storage_type; + dap_list_t *map_range_bounds; pthread_rwlock_t storage_rwlock; - UT_hash_handle hh; } dap_chain_cell_t; diff --git a/modules/consensus/dag-poa/dap_chain_cs_dag_poa.c b/modules/consensus/dag-poa/dap_chain_cs_dag_poa.c index b724f4f9df21e5212b3232daaceb5c986fa5402c..00edeaf8229419998365351c91a0f804e3eb8324 100644 --- a/modules/consensus/dag-poa/dap_chain_cs_dag_poa.c +++ b/modules/consensus/dag-poa/dap_chain_cs_dag_poa.c @@ -614,19 +614,15 @@ static bool s_callback_round_event_to_chain_callback_get_round_item(dap_global_d if (l_chosen_item) { size_t l_event_size = l_chosen_item->event_size; dap_chain_cs_dag_event_t *l_new_atom = (dap_chain_cs_dag_event_t *)l_chosen_item->event_n_signs; - dap_hash_fast_t l_event_hash; - dap_hash_fast(l_new_atom, l_event_size, &l_event_hash); - char *l_event_hash_hex_str = DAP_NEW_STACK_SIZE(char, DAP_CHAIN_HASH_FAST_STR_SIZE); - dap_chain_hash_fast_to_str(&l_event_hash, l_event_hash_hex_str, DAP_CHAIN_HASH_FAST_STR_SIZE); + char *l_event_hash_hex_str; + dap_get_data_hash_str_static(l_new_atom, l_event_size, l_event_hash_hex_str); dap_chain_datum_t *l_datum = dap_chain_cs_dag_event_get_datum(l_new_atom, l_event_size); l_dag->round_completed = dap_max(l_new_atom->header.round_id, l_dag->round_current); int l_verify_datum = dap_chain_net_verify_datum_for_add(l_dag->chain, l_datum, &l_chosen_item->round_info.datum_hash); if (!l_verify_datum) { dap_chain_atom_verify_res_t l_res = l_dag->chain->callback_atom_add(l_dag->chain, l_new_atom, l_event_size); - if (l_res == ATOM_ACCEPT) { - dap_chain_atom_save(l_dag->chain->cells, (dap_chain_atom_ptr_t)l_new_atom, l_event_size, &l_event_hash); + if (l_res == ATOM_ACCEPT) s_poa_round_clean(l_dag->chain); - } log_it(L_INFO, "Event %s from round %"DAP_UINT64_FORMAT_U" %s", l_event_hash_hex_str, l_round_id, dap_chain_atom_verify_res_str[l_res]); } else { diff --git a/modules/consensus/esbocs/dap_chain_cs_esbocs.c b/modules/consensus/esbocs/dap_chain_cs_esbocs.c index 5f494a41a13d09d0a08fe2a684d0e3c63954d07b..b4cbb481ec3e1285ce1c0758cb412c3541999519 100644 --- a/modules/consensus/esbocs/dap_chain_cs_esbocs.c +++ b/modules/consensus/esbocs/dap_chain_cs_esbocs.c @@ -378,13 +378,6 @@ void dap_chain_esbocs_add_block_collect(dap_chain_block_cache_t *a_block_cache, if (a_type == DAP_CHAIN_BLOCK_COLLECT_BOTH || a_type == DAP_CHAIN_BLOCK_COLLECT_FEES) { dap_sign_t *l_sign = dap_chain_block_sign_get(a_block_cache->block, a_block_cache->block_size, 0); if (dap_pkey_match_sign(a_block_collect_params->block_sign_pkey, l_sign)) { - dap_chain_esbocs_block_collect_t *l_block_collect_params = DAP_NEW_Z(dap_chain_esbocs_block_collect_t); - l_block_collect_params->collecting_level = a_block_collect_params->collecting_level; - l_block_collect_params->minimum_fee = a_block_collect_params->minimum_fee; - l_block_collect_params->chain = a_block_collect_params->chain; - l_block_collect_params->blocks_sign_key = a_block_collect_params->blocks_sign_key; - l_block_collect_params->block_sign_pkey = a_block_collect_params->block_sign_pkey; - l_block_collect_params->collecting_addr = a_block_collect_params->collecting_addr; dap_chain_net_t *l_net = dap_chain_net_by_id(l_chain->net_id); assert(l_net); uint256_t l_value_fee = uint256_0; @@ -393,7 +386,7 @@ void dap_chain_esbocs_add_block_collect(dap_chain_block_cache_t *a_block_cache, if (!IS_ZERO_256(l_value_fee)) { char *l_fee_group = dap_chain_cs_blocks_get_fee_group(l_chain->net_name); dap_global_db_set(l_fee_group, a_block_cache->block_hash_str, &l_value_fee, sizeof(l_value_fee), - false, s_check_db_collect_callback, l_block_collect_params); + false, s_check_db_collect_callback, DAP_DUP(a_block_collect_params)); DAP_DELETE(l_fee_group); } dap_list_free_full(l_list_used_out, NULL); @@ -403,23 +396,16 @@ void dap_chain_esbocs_add_block_collect(dap_chain_block_cache_t *a_block_cache, return; if (dap_chain_block_sign_match_pkey(a_block_cache->block, a_block_cache->block_size, a_block_collect_params->block_sign_pkey)) { - dap_chain_esbocs_block_collect_t *l_block_collect_params = DAP_NEW_Z(dap_chain_esbocs_block_collect_t); - l_block_collect_params->collecting_level = a_block_collect_params->collecting_level; - l_block_collect_params->minimum_fee = a_block_collect_params->minimum_fee; - l_block_collect_params->chain = a_block_collect_params->chain; - l_block_collect_params->blocks_sign_key = a_block_collect_params->blocks_sign_key; - l_block_collect_params->block_sign_pkey = a_block_collect_params->block_sign_pkey; - l_block_collect_params->collecting_addr = a_block_collect_params->collecting_addr; dap_chain_net_t *l_net = dap_chain_net_by_id(l_chain->net_id); assert(l_net); if (!dap_ledger_is_used_reward(l_net->pub.ledger, &a_block_cache->block_hash, - &l_block_collect_params->collecting_addr->data.hash_fast)) { + &a_block_collect_params->collecting_addr->data.hash_fast)) { uint256_t l_value_reward = l_chain->callback_calc_reward(l_chain, &a_block_cache->block_hash, - l_block_collect_params->block_sign_pkey); + a_block_collect_params->block_sign_pkey); if (!IS_ZERO_256(l_value_reward)) { char *l_reward_group = dap_chain_cs_blocks_get_reward_group(l_chain->net_name); dap_global_db_set(l_reward_group, a_block_cache->block_hash_str, &l_value_reward, sizeof(l_value_reward), - false, s_check_db_collect_callback, l_block_collect_params); + false, s_check_db_collect_callback, DAP_DUP(a_block_collect_params)); DAP_DELETE(l_reward_group); } } @@ -427,7 +413,7 @@ void dap_chain_esbocs_add_block_collect(dap_chain_block_cache_t *a_block_cache, } static void s_new_atom_notifier(void *a_arg, dap_chain_t *a_chain, dap_chain_cell_id_t a_id, - void *a_atom, size_t a_atom_size) + void *a_atom, dap_chain_hash_fast_t *a_atom_hash, size_t a_atom_size) { dap_chain_esbocs_session_t *l_session = a_arg; assert(l_session->chain == a_chain); @@ -448,7 +434,7 @@ static void s_new_atom_notifier(void *a_arg, dap_chain_t *a_chain, dap_chain_cel .collecting_addr = PVT(l_session->esbocs)->collecting_addr, .cell_id = a_id }; - dap_chain_block_cache_t *l_block_cache = dap_chain_block_cache_get_by_hash(DAP_CHAIN_CS_BLOCKS(a_chain), &l_last_block_hash); + dap_chain_block_cache_t *l_block_cache = dap_chain_block_cache_get_by_hash(DAP_CHAIN_CS_BLOCKS(a_chain), a_atom_hash); dap_chain_esbocs_add_block_collect(l_block_cache, &l_block_collect_params, DAP_CHAIN_BLOCK_COLLECT_BOTH); } @@ -1700,14 +1686,8 @@ static bool s_session_candidate_to_chain(dap_chain_esbocs_session_t *a_session, const char *l_candidate_hash_str = dap_chain_hash_fast_to_str_static(a_candidate_hash); switch (l_res) { case ATOM_ACCEPT: - // block save to chain - if (dap_chain_atom_save(a_session->chain->cells, (uint8_t *)a_candidate, a_candidate_size, a_candidate_hash) < 0) - log_it(L_ERROR, "Can't save atom %s to the file", l_candidate_hash_str); - else - { - log_it(L_INFO, "block %s added in chain successfully", l_candidate_hash_str); - res = true; - } + log_it(L_INFO, "block %s added in chain successfully", l_candidate_hash_str); + res = true; break; case ATOM_MOVE_TO_THRESHOLD: log_it(L_INFO, "Thresholded atom with hash %s", l_candidate_hash_str); diff --git a/modules/net/dap_chain_ledger.c b/modules/net/dap_chain_ledger.c index 986123b2b0ac2af270736b4fdced14bf873e4b24..e0068e9e6ffa304996f2db05713322d9fd8eb46f 100644 --- a/modules/net/dap_chain_ledger.c +++ b/modules/net/dap_chain_ledger.c @@ -322,10 +322,7 @@ typedef struct dap_ledger_private { bool load_end; // Ledger flags - bool check_ds; - bool check_cells_ds; - bool check_token_emission; - bool cached; + bool check_ds, check_cells_ds, check_token_emission, cached, mapped; dap_chain_cell_id_t local_cell_id; @@ -584,6 +581,7 @@ static dap_ledger_t * dap_ledger_handle_new(void) (dap_timer_callback_t)s_threshold_txs_free, l_ledger); l_ledger_pvt->threshold_emissions_free_timer = dap_interval_timer_create(s_threshold_free_timer_tick, (dap_timer_callback_t) s_threshold_emission_free, l_ledger); + l_ledger_pvt->mapped = dap_config_get_item_bool_default(g_config, "ledger", "mapped", true); return l_ledger; } @@ -1317,12 +1315,12 @@ int dap_ledger_token_add(dap_ledger_t *a_ledger, dap_chain_datum_token_t *a_toke DAP_DELETE(l_token_item->datum_token); } else { log_it(L_ERROR, "Token with ticker '%s' update check failed", l_token->ticker); - DAP_DEL_Z(l_token); + DAP_DELETE(l_token); return -2; } } else if (l_token->type == DAP_CHAIN_DATUM_TOKEN_TYPE_UPDATE) { log_it(L_WARNING, "Token with ticker '%s' does not yet exist, declare it first", l_token->ticker); - DAP_DEL_Z(l_token); + DAP_DELETE(l_token); return -6; } @@ -1331,12 +1329,12 @@ int dap_ledger_token_add(dap_ledger_t *a_ledger, dap_chain_datum_token_t *a_toke dap_sign_t **l_signs = dap_chain_datum_token_signs_parse(l_token, a_token_size, &l_auth_signs_total, &l_auth_signs_valid); if (!l_signs || !l_auth_signs_total) { log_it(L_ERROR, "No auth signs in token '%s' datum!", l_token->ticker); - DAP_DEL_Z(l_token); + DAP_DELETE(l_token); return -7; } l_token_item = DAP_NEW_Z(dap_ledger_token_item_t); if ( !l_token_item ) { - DAP_DEL_Z(l_token); + DAP_DELETE(l_token); log_it(L_CRITICAL, "%s", g_error_memory_alloc); return -8; } @@ -1355,15 +1353,13 @@ int dap_ledger_token_add(dap_ledger_t *a_ledger, dap_chain_datum_token_t *a_toke .description_token_size = 0 }; if ( !l_token_item->auth_pkeys ) { - if (l_token) - DAP_DELETE(l_token); + DAP_DELETE(l_token); DAP_DELETE(l_token_item); log_it(L_CRITICAL, "%s", g_error_memory_alloc); return -6; }; if ( !l_token_item->auth_pkeys ) { - if (l_token) - DAP_DELETE(l_token); + DAP_DELETE(l_token); DAP_DEL_Z(l_token_item->auth_pkeys); DAP_DELETE(l_token_item); log_it(L_CRITICAL, "%s", g_error_memory_alloc); @@ -2409,6 +2405,7 @@ static void s_threshold_txs_proc( dap_ledger_t *a_ledger) if (l_res != DAP_CHAIN_CS_VERIFY_CODE_TX_NO_EMISSION && l_res != DAP_CHAIN_CS_VERIFY_CODE_TX_NO_PREVIOUS) { HASH_DEL(l_ledger_pvt->threshold_txs, l_tx_item); + if ( !l_ledger_pvt->mapped ) DAP_DELETE(l_tx_item->tx); DAP_DELETE(l_tx_item); l_success = true; @@ -2433,7 +2430,8 @@ static void s_threshold_txs_free(dap_ledger_t *a_ledger){ HASH_DEL(l_pvt->threshold_txs, l_current); char l_tx_hash_str[DAP_CHAIN_HASH_FAST_STR_SIZE]; dap_chain_hash_fast_to_str(&l_current->tx_hash_fast, l_tx_hash_str, sizeof(l_tx_hash_str)); - DAP_DELETE(l_current->tx); + if ( !l_pvt->mapped ) + DAP_DELETE(l_current->tx); DAP_DELETE(l_current); log_it(L_NOTICE, "Removed transaction %s form threshold ledger", l_tx_hash_str); } @@ -4832,7 +4830,7 @@ int dap_ledger_tx_add(dap_ledger_t *a_ledger, dap_chain_datum_tx_t *a_tx, dap_ha return -1; } l_item_tmp->tx_hash_fast = *a_tx_hash; - l_item_tmp->tx = DAP_DUP_SIZE(a_tx, dap_chain_datum_tx_get_size(a_tx)); + l_item_tmp->tx = l_ledger_pvt->mapped ? a_tx : DAP_DUP_SIZE(a_tx, dap_chain_datum_tx_get_size(a_tx)); if ( !l_item_tmp->tx ) { DAP_DELETE(l_item_tmp); log_it(L_CRITICAL, "%s", g_error_memory_alloc); @@ -5100,7 +5098,7 @@ int dap_ledger_tx_add(dap_ledger_t *a_ledger, dap_chain_datum_tx_t *a_tx, dap_ha } l_tx_item->tx_hash_fast = *a_tx_hash; size_t l_tx_size = dap_chain_datum_tx_get_size(a_tx); - l_tx_item->tx = DAP_DUP_SIZE(a_tx, l_tx_size); + l_tx_item->tx = l_ledger_pvt->mapped ? a_tx : DAP_DUP_SIZE(a_tx, l_tx_size); l_tx_item->cache_data.ts_created = dap_time_now(); // Time of transasction added to ledger int l_outs_count = 0; dap_list_t *l_list_tmp = dap_chain_datum_tx_items_get(a_tx, TX_ITEM_TYPE_OUT_ALL, &l_outs_count); @@ -5306,7 +5304,8 @@ void dap_ledger_purge(dap_ledger_t *a_ledger, bool a_preserve_db) /* Delete threshold transactions */ HASH_ITER(hh, l_ledger_pvt->threshold_txs, l_item_current, l_item_tmp) { HASH_DEL(l_ledger_pvt->threshold_txs, l_item_current); - DAP_DELETE(l_item_current->tx); + if (!l_ledger_pvt->mapped) + DAP_DELETE(l_item_current->tx); DAP_DEL_Z(l_item_current); } diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index 6918af0098e1be5ceef3407ec5d7e6df2a1da2f5..271f2c2b45221b6f42393d6f33adc335e5684fe9 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -889,7 +889,6 @@ void dap_chain_net_purge(dap_chain_net_t *l_net) { dap_ledger_purge(l_net->pub.ledger, false); dap_chain_net_srv_stake_purge(l_net); - dap_chain_net_decree_purge(l_net); dap_chain_t *l_chain = NULL; DL_FOREACH(l_net->pub.chains, l_chain) { if (l_chain->callback_purge) @@ -898,6 +897,7 @@ void dap_chain_net_purge(dap_chain_net_t *l_net) dap_chain_esbocs_set_min_validators_count(l_chain, 0); l_net->pub.fee_value = uint256_0; l_net->pub.fee_addr = c_dap_chain_addr_blank; + dap_chain_net_decree_purge(l_net); dap_chain_load_all(l_chain); } DL_FOREACH(l_net->pub.chains, l_chain) { diff --git a/modules/net/dap_chain_net_anchor.c b/modules/net/dap_chain_net_anchor.c index 38997a921cd60dafb89020754ed9354704774da0..50c9024ad925a43e1fdac60f9ea1265080ffb7d4 100644 --- a/modules/net/dap_chain_net_anchor.c +++ b/modules/net/dap_chain_net_anchor.c @@ -50,10 +50,9 @@ static int s_anchor_verify(dap_chain_net_t *a_net, dap_chain_datum_anchor_t *a_a return -121; } int ret_val = 0; - dap_chain_datum_anchor_t *l_anchor = a_anchor; - size_t l_signs_size = l_anchor->header.signs_size; + size_t l_signs_size = a_anchor->header.signs_size; //multiple signs reading from datum - dap_sign_t *l_signs_block = (dap_sign_t *)((byte_t*)l_anchor->data_n_sign + l_anchor->header.data_size); + dap_sign_t *l_signs_block = (dap_sign_t *)((byte_t*)a_anchor->data_n_sign + a_anchor->header.data_size); if (!l_signs_size || !l_signs_block) { log_it(L_WARNING, "Anchor data sign not found"); @@ -73,26 +72,26 @@ static int s_anchor_verify(dap_chain_net_t *a_net, dap_chain_datum_anchor_t *a_a return -106; } bool l_sign_authorized = false; - size_t l_signs_size_original = l_anchor->header.signs_size; - l_anchor->header.signs_size = 0; + size_t l_signs_size_original = a_anchor->header.signs_size; + a_anchor->header.signs_size = 0; for (size_t i = 0; i < l_num_of_unique_signs; i++) { for (dap_list_t *it = a_net->pub.decree->pkeys; it; it = it->next) { if (dap_pkey_compare_with_sign(it->data, l_unique_signs[i])) { // TODO make signs verification in s_concate_all_signs_in_array to correctly header.signs_size calculation - size_t l_verify_data_size = l_anchor->header.data_size + sizeof(dap_chain_datum_anchor_t); - if (dap_sign_verify_all(l_unique_signs[i], l_signs_size_original, l_anchor, l_verify_data_size)) + size_t l_verify_data_size = a_anchor->header.data_size + sizeof(dap_chain_datum_anchor_t); + if (dap_sign_verify_all(l_unique_signs[i], l_signs_size_original, a_anchor, l_verify_data_size)) continue; l_sign_authorized = true; break; } } - l_anchor->header.signs_size += dap_sign_get_size(l_unique_signs[i]); + a_anchor->header.signs_size += dap_sign_get_size(l_unique_signs[i]); if (l_sign_authorized) break; } DAP_DELETE(l_signs_arr); DAP_DELETE(l_unique_signs); - l_anchor->header.signs_size = l_signs_size_original; + a_anchor->header.signs_size = l_signs_size_original; if (!l_sign_authorized) { log_it(L_WARNING, "Anchor signs verify failed"); @@ -101,7 +100,7 @@ static int s_anchor_verify(dap_chain_net_t *a_net, dap_chain_datum_anchor_t *a_a dap_hash_fast_t l_decree_hash = {}; dap_chain_datum_decree_t *l_decree = NULL; - if ((ret_val = dap_chain_datum_anchor_get_hash_from_data(l_anchor, &l_decree_hash)) != 0) { + if ((ret_val = dap_chain_datum_anchor_get_hash_from_data(a_anchor, &l_decree_hash)) != 0) { log_it(L_WARNING, "Can't get hash from anchor data"); return -106; } diff --git a/modules/net/dap_chain_net_decree.c b/modules/net/dap_chain_net_decree.c index 80a0a27b6c5325decbc69ce562118c96737b23fe..2e717d59e0afadd6c4f7b4edfd04c14962ff87b7 100644 --- a/modules/net/dap_chain_net_decree.c +++ b/modules/net/dap_chain_net_decree.c @@ -105,7 +105,8 @@ int dap_chain_net_decree_deinit(dap_chain_net_t *a_net) struct decree_hh *l_decree_hh, *l_tmp; HASH_ITER(hh, s_decree_hh, l_decree_hh, l_tmp) { HASH_DEL(s_decree_hh, l_decree_hh); - DAP_DELETE(l_decree_hh->decree); + if ( !dap_chain_find_by_id(l_decree_hh->decree->header.common_decree_params.net_id, l_decree_hh->decree->header.common_decree_params.chain_id)->is_mapped ) + DAP_DELETE(l_decree_hh->decree); DAP_DELETE(l_decree_hh); } return 0; @@ -139,12 +140,10 @@ static int s_decree_verify(dap_chain_net_t *a_net, dap_chain_datum_decree_t *a_d return -123; } - dap_chain_datum_decree_t *l_decree = a_decree; // Get pkeys sign from decree datum - size_t l_signs_size = 0; //multiple signs reading from datum - dap_sign_t *l_signs_block = dap_chain_datum_decree_get_signs(l_decree, &l_signs_size); + dap_sign_t *l_signs_block = dap_chain_datum_decree_get_signs(a_decree, &l_signs_size); if (!l_signs_size || !l_signs_block) { log_it(L_WARNING, "Decree data sign not found"); @@ -168,14 +167,14 @@ static int s_decree_verify(dap_chain_net_t *a_net, dap_chain_datum_decree_t *a_d // Verify all keys and its signatures uint16_t l_signs_size_for_current_sign = 0, l_signs_verify_counter = 0; - l_decree->header.signs_size = 0; - size_t l_verify_data_size = l_decree->header.data_size + sizeof(dap_chain_datum_decree_t); + a_decree->header.signs_size = 0; + size_t l_verify_data_size = a_decree->header.data_size + sizeof(dap_chain_datum_decree_t); for (size_t i = 0; i < l_num_of_unique_signs; i++) { size_t l_sign_max_size = dap_sign_get_size(l_unique_signs[i]); if (s_verify_pkey(l_unique_signs[i], a_net)) { // 3. verify sign - if(!dap_sign_verify_all(l_unique_signs[i], l_sign_max_size, l_decree, l_verify_data_size)) + if(!dap_sign_verify_all(l_unique_signs[i], l_sign_max_size, a_decree, l_verify_data_size)) l_signs_verify_counter++; } else { dap_hash_fast_t l_sign_pkey_hash = {0}; @@ -188,10 +187,10 @@ static int s_decree_verify(dap_chain_net_t *a_net, dap_chain_datum_decree_t *a_d } // Each sign change the sign_size field by adding its size after signing. So we need to change this field in header for each sign. l_signs_size_for_current_sign += l_sign_max_size; - l_decree->header.signs_size = l_signs_size_for_current_sign; + a_decree->header.signs_size = l_signs_size_for_current_sign; } - l_decree->header.signs_size = l_signs_size; + a_decree->header.signs_size = l_signs_size; // DAP_DELETE(l_signs_arr); DAP_DELETE(l_unique_signs); @@ -274,9 +273,8 @@ int dap_chain_net_decree_apply(dap_hash_fast_t *a_decree_hash, dap_chain_datum_d log_it(L_WARNING, "Decree with hash %s is already present", dap_hash_fast_to_str_static(a_decree_hash)); return -123; } - l_decree_hh->decree = DAP_DUP_SIZE(a_decree, dap_chain_datum_decree_get_size(a_decree)); - if (a_decree->header.common_decree_params.chain_id.uint64 != a_chain->id.uint64 && - !l_decree_hh->wait_for_apply) + l_decree_hh->decree = a_chain->is_mapped ? a_decree : DAP_DUP_SIZE(a_decree, dap_chain_datum_decree_get_size(a_decree)); + if (a_decree->header.common_decree_params.chain_id.uint64 != a_chain->id.uint64 && !l_decree_hh->wait_for_apply) // Apply it with corresponding anchor return ret_val; } diff --git a/modules/type/blocks/dap_chain_block_cache.c b/modules/type/blocks/dap_chain_block_cache.c index 0a9926d4d25ca8858c1bf2214e632a2b8f4cddbb..741c37355d209aa7b5678c675f88010d9e24dbb6 100644 --- a/modules/type/blocks/dap_chain_block_cache.c +++ b/modules/type/blocks/dap_chain_block_cache.c @@ -54,7 +54,7 @@ void dap_chain_block_cache_deinit() */ dap_chain_block_cache_t *dap_chain_block_cache_new(dap_hash_fast_t *a_block_hash, dap_chain_block_t *a_block, - size_t a_block_size, uint64_t a_block_number) + size_t a_block_size, uint64_t a_block_number, bool a_copy_block) { if (! a_block) return NULL; @@ -64,7 +64,7 @@ dap_chain_block_cache_t *dap_chain_block_cache_new(dap_hash_fast_t *a_block_hash log_it(L_CRITICAL, "%s", g_error_memory_alloc); return NULL; } - l_block_cache->block = DAP_DUP_SIZE(a_block, a_block_size); + l_block_cache->block = a_copy_block ? DAP_DUP_SIZE(a_block, a_block_size) : a_block; if (!l_block_cache->block) { log_it(L_CRITICAL, "%s", g_error_memory_alloc); return NULL; @@ -75,10 +75,11 @@ dap_chain_block_cache_t *dap_chain_block_cache_new(dap_hash_fast_t *a_block_hash l_block_cache->sign_count = dap_chain_block_get_signs_count(a_block, a_block_size); if (dap_chain_block_cache_update(l_block_cache, a_block_hash)) { log_it(L_WARNING, "Block cache can't be created, possible cause corrupted block inside"); + if (a_copy_block) + DAP_DELETE(l_block_cache->block); DAP_DELETE(l_block_cache); return NULL; } - //log_it(L_DEBUG,"Block cache created"); return l_block_cache; } diff --git a/modules/type/blocks/dap_chain_block_chunk.c b/modules/type/blocks/dap_chain_block_chunk.c index f7d814bd523741b178a41b401a628abe8dd1ba7c..c29d1ec425ed3e0c7debed32245bbdb8ec15b7ed 100644 --- a/modules/type/blocks/dap_chain_block_chunk.c +++ b/modules/type/blocks/dap_chain_block_chunk.c @@ -49,7 +49,7 @@ dap_chain_block_chunks_t * dap_chain_block_chunks_create(dap_chain_cs_blocks_t * for(size_t n=0; n< l_objs_count; n++){ dap_hash_fast_t l_block_hash; dap_chain_hash_fast_from_str(l_objs[n].key, &l_block_hash); - dap_chain_block_cache_t *l_block_cache = dap_chain_block_cache_new(&l_block_hash, (dap_chain_block_t *)l_objs[n].value, l_objs[n].value_len, n + 1); + dap_chain_block_cache_t *l_block_cache = dap_chain_block_cache_new(&l_block_hash, (dap_chain_block_t *)l_objs[n].value, l_objs[n].value_len, n + 1, true); dap_chain_block_chunks_add(l_ret, l_block_cache ); } dap_global_db_objs_delete(l_objs,l_objs_count); diff --git a/modules/type/blocks/dap_chain_cs_blocks.c b/modules/type/blocks/dap_chain_cs_blocks.c index cf0eb23c085cf2d47c348d6a618f71869fc08de3..ed77c8a7fb55bc4645f48f40b3625af3a2e32952 100644 --- a/modules/type/blocks/dap_chain_cs_blocks.c +++ b/modules/type/blocks/dap_chain_cs_blocks.c @@ -1522,15 +1522,35 @@ static dap_chain_atom_verify_res_t s_callback_atom_add(dap_chain_t * a_chain, da } dap_chain_atom_verify_res_t ret = s_callback_atom_verify(a_chain, a_atom, a_atom_size); switch (ret) { - case ATOM_ACCEPT: - l_block_cache = dap_chain_block_cache_new(&l_block_hash, l_block, l_block_size, PVT(l_blocks)->blocks_count + 1); - if (!l_block_cache) - break; + case ATOM_ACCEPT: { + dap_chain_cell_t *l_cell = dap_chain_cell_find_by_id(a_chain, l_block->hdr.cell_id); + if ( !dap_chain_net_get_load_mode( dap_chain_net_by_id(a_chain->net_id)) ) { + if ( (ret = dap_chain_atom_save(l_cell, a_atom, a_atom_size, &l_block_hash)) < 0 ) { + log_it(L_ERROR, "Can't save atom to file, code %d", ret); + pthread_rwlock_unlock(&PVT(l_blocks)->rwlock); + return ATOM_REJECT; + } else if (a_chain->is_mapped) { + l_block = (dap_chain_block_t*)( l_cell->map_pos += sizeof(uint64_t) ); // Switching to mapped area + l_cell->map_pos += a_atom_size; + } + ret = ATOM_PASS; + } + if ( !(l_block_cache = dap_chain_block_cache_new(&l_block_hash, l_block, l_block_size, + PVT(l_blocks)->blocks_count + 1, !a_chain->is_mapped)) ) + { + log_it(L_DEBUG, "... corrupted block"); + pthread_rwlock_unlock(&PVT(l_blocks)->rwlock); + return ATOM_REJECT; + } debug_if(s_debug_more, L_DEBUG, "... new block %s", l_block_cache->block_hash_str); HASH_ADD(hh, PVT(l_blocks)->blocks, block_hash, sizeof(l_block_cache->block_hash), l_block_cache); ++PVT(l_blocks)->blocks_count; debug_if(s_debug_more, L_DEBUG, "Verified atom %p: ACCEPTED", a_atom); + s_add_atom_datums(l_blocks, l_block_cache); + dap_chain_atom_notify(l_cell, l_block, a_atom_size); + dap_chain_atom_add_from_threshold(a_chain); break; + } case ATOM_MOVE_TO_THRESHOLD: // TODO: reimplement and enable threshold for blocks /* { @@ -1547,8 +1567,6 @@ static dap_chain_atom_verify_res_t s_callback_atom_add(dap_chain_t * a_chain, da break; } pthread_rwlock_unlock(&PVT(l_blocks)->rwlock); - if (ret == ATOM_ACCEPT) - s_add_atom_datums(l_blocks, l_block_cache); return ret; } @@ -1779,7 +1797,7 @@ static dap_chain_atom_ptr_t s_callback_atom_iter_get(dap_chain_atom_iter_t *a_at a_atom_iter->cur_item = l_blocks_pvt->blocks; break; case DAP_CHAIN_ITER_OP_LAST: - HASH_LAST(l_blocks_pvt->blocks, a_atom_iter->cur_item); + a_atom_iter->cur_item = HASH_LAST(l_blocks_pvt->blocks); break; case DAP_CHAIN_ITER_OP_NEXT: if (a_atom_iter->cur_item) diff --git a/modules/type/blocks/include/dap_chain_block_cache.h b/modules/type/blocks/include/dap_chain_block_cache.h index d3231266816568044a75027e74a55a584f091b95..d69e2e652778d4a44dbc6576d31106ba6bdbd6ba 100644 --- a/modules/type/blocks/include/dap_chain_block_cache.h +++ b/modules/type/blocks/include/dap_chain_block_cache.h @@ -73,7 +73,7 @@ void dap_chain_block_cache_deinit(); dap_chain_block_cache_t *dap_chain_block_cache_new(dap_hash_fast_t *a_block_hash, dap_chain_block_t *a_block, - size_t a_block_size, uint64_t a_block_number); + size_t a_block_size, uint64_t a_block_number, bool a_copy_block); dap_chain_block_cache_t *dap_chain_block_cache_dup(dap_chain_block_cache_t *a_block); int dap_chain_block_cache_update(dap_chain_block_cache_t *a_block_cache, dap_hash_fast_t *a_block_hash); void dap_chain_block_cache_delete(dap_chain_block_cache_t *a_block_cache); diff --git a/modules/type/dag/dap_chain_cs_dag.c b/modules/type/dag/dap_chain_cs_dag.c index 013091149027c0e9f8f9505c2a6f5b1452ed3e59..07756772ce34fb553b68095f6da7f4dfad9d9827 100644 --- a/modules/type/dag/dap_chain_cs_dag.c +++ b/modules/type/dag/dap_chain_cs_dag.c @@ -65,6 +65,7 @@ typedef struct dap_chain_cs_dag_event_item { size_t event_size; uint64_t event_number; int ret_code; + char *mapped_region; UT_hash_handle hh, hh_select, hh_datums; } dap_chain_cs_dag_event_item_t; @@ -304,7 +305,8 @@ static void s_dap_chain_cs_dag_threshold_free(dap_chain_cs_dag_t *a_dag) { l_el->hash = l_current->hash; HASH_ADD(hh, l_pvt->removed_events_from_treshold, hash, sizeof(dap_chain_hash_fast_t), l_el); char *l_hash_dag = dap_hash_fast_to_str_new(&l_current->hash); - DAP_DELETE(l_current->event); + if (!a_dag->chain->is_mapped && !l_current->mapped_region) + DAP_DELETE(l_current->event); HASH_DEL(l_pvt->events_treshold, l_current); DAP_DELETE(l_current); log_it(L_NOTICE, "Removed DAG event with %s hash from trashold.", l_hash_dag); @@ -315,7 +317,8 @@ static void s_dap_chain_cs_dag_threshold_free(dap_chain_cs_dag_t *a_dag) { HASH_ITER(hh, l_pvt->events_treshold_conflicted, l_current, l_tmp) { if (l_current->ts_added < l_time_cut_off) { char *l_hash_dag = dap_hash_fast_to_str_new(&l_current->hash); - DAP_DELETE(l_current->event); + if (!a_dag->chain->is_mapped && !l_current->mapped_region) + DAP_DELETE(l_current->event); HASH_DEL(l_pvt->events_treshold_conflicted, l_current); DAP_DELETE(l_current); log_it(L_NOTICE, "Removed DAG event with %s hash from trashold.", l_hash_dag); @@ -334,18 +337,26 @@ static void s_dap_chain_cs_dag_purge(dap_chain_t *a_chain) // Clang bug at this, l_event_current should change at every loop cycle HASH_ITER(hh, l_dag_pvt->events, l_event_current, l_event_tmp) { HASH_DEL(l_dag_pvt->events, l_event_current); + if (!a_chain->is_mapped && !l_event_current->mapped_region) + DAP_DELETE(l_event_current->event); DAP_DELETE(l_event_current); } HASH_ITER(hh, l_dag_pvt->events_lasts_unlinked, l_event_current, l_event_tmp) { HASH_DEL(l_dag_pvt->events_lasts_unlinked, l_event_current); + if (!a_chain->is_mapped && !l_event_current->mapped_region) + DAP_DELETE(l_event_current->event); DAP_DELETE(l_event_current); } HASH_ITER(hh, l_dag_pvt->events_treshold, l_event_current, l_event_tmp) { HASH_DEL(l_dag_pvt->events_treshold, l_event_current); + if (!a_chain->is_mapped && !l_event_current->mapped_region) + DAP_DELETE(l_event_current->event); DAP_DELETE(l_event_current); } HASH_ITER(hh, l_dag_pvt->events_treshold_conflicted, l_event_current, l_event_tmp) { HASH_DEL(l_dag_pvt->events_treshold_conflicted, l_event_current); + if (!a_chain->is_mapped && !l_event_current->mapped_region) + DAP_DELETE(l_event_current->event); DAP_DELETE(l_event_current); } pthread_mutex_unlock(&l_dag_pvt->events_mutex); @@ -442,7 +453,7 @@ static dap_chain_atom_verify_res_t s_chain_callback_atom_add(dap_chain_t * a_cha dap_chain_cs_dag_event_calc_hash(l_event, a_atom_size, &l_event_hash); if (s_debug_more) { char l_event_hash_str[DAP_CHAIN_HASH_FAST_STR_SIZE] = { '\0' }; - dap_chain_hash_fast_to_str(&l_event_item->hash, l_event_hash_str, sizeof(l_event_hash_str)); + dap_chain_hash_fast_to_str(&l_event_hash, l_event_hash_str, sizeof(l_event_hash_str)); log_it(L_DEBUG, "Processing event: %s ... (size %zd)", l_event_hash_str,a_atom_size); } pthread_mutex_lock(&PVT(l_dag)->events_mutex); @@ -455,7 +466,7 @@ static dap_chain_atom_verify_res_t s_chain_callback_atom_add(dap_chain_t * a_cha case ATOM_ACCEPT: ret = s_chain_callback_atom_verify(a_chain, a_atom, a_atom_size); if (ret == ATOM_MOVE_TO_THRESHOLD) { - if (!s_threshold_enabled && !dap_chain_net_get_load_mode(dap_chain_net_by_id(a_chain->net_id))) + if (!s_threshold_enabled /*&& !dap_chain_net_get_load_mode(dap_chain_net_by_id(a_chain->net_id))*/) ret = ATOM_REJECT; } debug_if(s_debug_more, L_DEBUG, "Verified atom %p: %s", a_atom, dap_chain_atom_verify_res_str[ret]); @@ -468,28 +479,26 @@ static dap_chain_atom_verify_res_t s_chain_callback_atom_add(dap_chain_t * a_cha break; } - l_event_item = DAP_NEW_Z(dap_chain_cs_dag_event_item_t); - if (!l_event_item) { + if ( !(l_event_item = DAP_NEW_Z(dap_chain_cs_dag_event_item_t)) ) { log_it(L_CRITICAL, "%s", g_error_memory_alloc); pthread_mutex_unlock(&PVT(l_dag)->events_mutex); return ATOM_REJECT; } - l_event_item->event = DAP_DUP_SIZE(a_atom, a_atom_size); - if (!l_event_item->event) { - log_it(L_CRITICAL, "%s", g_error_memory_alloc); - pthread_mutex_unlock(&PVT(l_dag)->events_mutex); - return ATOM_REJECT; - } - l_event_item->event_size = a_atom_size; - l_event_item->ts_added = dap_time_now(); - l_event_item->hash = l_event_hash; + *l_event_item = (dap_chain_cs_dag_event_item_t) { + .hash = l_event_hash, + .ts_added = dap_time_now(), + .event = a_chain->is_mapped ? l_event : DAP_DUP_SIZE(l_event, a_atom_size), + .event_size = a_atom_size + }; switch (ret) { case ATOM_MOVE_TO_THRESHOLD: { dap_chain_cs_dag_blocked_t *el = NULL; - HASH_FIND(hh, PVT(l_dag)->removed_events_from_treshold, &l_event_item->hash, sizeof(dap_chain_hash_fast_t), el); + HASH_FIND(hh, PVT(l_dag)->removed_events_from_treshold, &l_event_hash, sizeof(dap_chain_hash_fast_t), el); if (!el) { - HASH_ADD(hh, PVT(l_dag)->events_treshold, hash, sizeof(l_event_item->hash), l_event_item); + if ( a_chain->is_mapped && dap_chain_net_get_load_mode(dap_chain_net_by_id(a_chain->net_id)) ) + l_event_item->mapped_region = (char*)l_event; + HASH_ADD(hh, PVT(l_dag)->events_treshold, hash, sizeof(l_event_hash), l_event_item); debug_if(s_debug_more, L_DEBUG, "... added to threshold"); } else { ret = ATOM_REJECT; @@ -498,6 +507,17 @@ static dap_chain_atom_verify_res_t s_chain_callback_atom_add(dap_chain_t * a_cha break; } case ATOM_ACCEPT: { + dap_chain_cell_t *l_cell = dap_chain_cell_find_by_id(a_chain, l_event->header.cell_id); + if ( !dap_chain_net_get_load_mode( dap_chain_net_by_id(a_chain->net_id)) ) { + if ( dap_chain_atom_save(l_cell, a_atom, a_atom_size, &l_event_hash) < 0 ) { + log_it(L_ERROR, "Can't save atom to file"); + ret = ATOM_REJECT; + break; + } else if (a_chain->is_mapped) { + l_event_item->event = (dap_chain_cs_dag_event_t*)( l_cell->map_pos += sizeof(uint64_t) ); + l_cell->map_pos += a_atom_size; + } + } int l_consensus_check = s_dap_chain_add_atom_to_events_table(l_dag, l_event_item); switch (l_consensus_check) { case 0: @@ -517,8 +537,7 @@ static dap_chain_atom_verify_res_t s_chain_callback_atom_add(dap_chain_t * a_cha debug_if(s_debug_more, L_WARNING, "... added with ledger code %d", l_consensus_check); break; } - dap_chain_cs_dag_event_item_t *l_tail; - HASH_LAST(PVT(l_dag)->events, l_tail); + dap_chain_cs_dag_event_item_t *l_tail = HASH_LAST(PVT(l_dag)->events); if (l_tail && l_tail->event->header.ts_created > l_event->header.ts_created) { DAP_CHAIN_PVT(a_chain)->need_reorder = true; HASH_ADD_INORDER(hh, PVT(l_dag)->events, hash, sizeof(l_event_item->hash), l_event_item, s_sort_event_item); @@ -528,13 +547,16 @@ static dap_chain_atom_verify_res_t s_chain_callback_atom_add(dap_chain_t * a_cha } else HASH_ADD(hh, PVT(l_dag)->events, hash, sizeof(l_event_item->hash), l_event_item); s_dag_events_lasts_process_new_last_event(l_dag, l_event_item); + dap_chain_atom_notify(l_cell, l_event_item->event, l_event_item->event_size); + dap_chain_atom_add_from_threshold(a_chain); } break; default: break; } pthread_mutex_unlock(&PVT(l_dag)->events_mutex); if (ret == ATOM_REJECT) { // Neither added, nor freed - DAP_DELETE(l_event_item->event); + if (!a_chain->is_mapped) + DAP_DELETE(l_event_item->event); DAP_DELETE(l_event_item); } return ret; @@ -673,12 +695,12 @@ static bool s_chain_callback_datums_pool_proc(dap_chain_t *a_chain, dap_chain_da bool l_res = false; if (l_dag->is_add_directly) { dap_chain_atom_verify_res_t l_verify_res = s_chain_callback_atom_add(a_chain, l_event, l_event_size); - if (l_verify_res == ATOM_ACCEPT) - l_res = dap_chain_atom_save(a_chain->cells, (uint8_t *)l_event, l_event_size, &l_event_hash) > 0; - else - log_it(L_ERROR, "Can't add new event to the file, atom verification result %d", l_verify_res); DAP_DELETE(l_event); - return l_res; + if (l_verify_res != ATOM_ACCEPT) { + log_it(L_ERROR, "Can't add new event to the file, atom verification result %d", l_verify_res); + return false; + } else + return true; } dap_global_db_set_sync(l_dag->gdb_group_events_round_new, DAG_ROUND_CURRENT_KEY, @@ -941,17 +963,30 @@ dap_chain_cs_dag_event_item_t* s_dag_proc_treshold(dap_chain_cs_dag_t * a_dag) if (ret == DAP_THRESHOLD_OK) { debug_if(s_debug_more, L_DEBUG, "Processing event (threshold): %s...", dap_chain_hash_fast_to_str_static(&l_event_item->hash)); + dap_chain_cell_t *l_cell = dap_chain_cell_find_by_id(a_dag->chain, l_event_item->event->header.cell_id); + if ( !l_event_item->mapped_region ) { + if ( dap_chain_atom_save(l_cell, (const byte_t*)l_event_item->event, l_event_item->event_size, &l_event_item->hash) < 0 ) { + log_it(L_CRITICAL, "Can't move atom from threshold to file"); + res = false; + break; + } else if (a_dag->chain->is_mapped) { + l_event_item->event = (dap_chain_cs_dag_event_t*)( l_cell->map_pos += sizeof(uint64_t) ); + l_cell->map_pos += l_event_item->event_size; + } + } int l_add_res = s_dap_chain_add_atom_to_events_table(a_dag, l_event_item); HASH_DEL(PVT(a_dag)->events_treshold, l_event_item); if (!l_add_res) { HASH_ADD(hh, PVT(a_dag)->events, hash, sizeof(l_event_item->hash), l_event_item); s_dag_events_lasts_process_new_last_event(a_dag, l_event_item); - debug_if(s_debug_more, L_INFO, "... moved from treshold to main chains"); + debug_if(s_debug_more, L_INFO, "... moved from threshold to chain"); + dap_chain_atom_notify(l_cell, l_event_item->event, l_event_item->event_size); res = true; } else { // TODO clear other threshold items linked with this one debug_if(s_debug_more, L_WARNING, "... rejected with ledger code %d", l_add_res); - DAP_DELETE(l_event_item->event); + if (!l_event_item->mapped_region) + DAP_DELETE(l_event_item->event); DAP_DELETE(l_event_item); } break; @@ -1101,7 +1136,7 @@ static dap_chain_atom_ptr_t s_chain_callback_atom_iter_get(dap_chain_atom_iter_t a_atom_iter->cur_item = l_dag_pvt->events; break; case DAP_CHAIN_ITER_OP_LAST: - HASH_LAST(l_dag_pvt->events, a_atom_iter->cur_item); + a_atom_iter->cur_item = HASH_LAST(l_dag_pvt->events); break; case DAP_CHAIN_ITER_OP_NEXT: if (a_atom_iter->cur_item) @@ -2027,8 +2062,7 @@ static dap_list_t *s_callback_get_atoms(dap_chain_t *a_chain, size_t a_count, si size_t l_counter = 0; size_t l_end = l_offset + a_count; - dap_chain_cs_dag_event_item_t *l_ptr; - HASH_LAST(l_dag_pvt->events, l_ptr); + dap_chain_cs_dag_event_item_t *l_ptr = HASH_LAST(l_dag_pvt->events); for (dap_chain_cs_dag_event_item_t *ptr = l_ptr; ptr != NULL && l_counter < l_end; ptr = ptr->hh.prev){ if (l_counter >= l_offset){ dap_chain_cs_dag_event_t *l_event = ptr->event;