Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • cellframe/cellframe-sdk
  • MIKA83/cellframe-sdk
2 results
Show changes
Commits on Source (2)
Showing
with 390 additions and 262 deletions
Subproject commit f5409cdd40208193ec7d0e577318bfeb30f994e5
Subproject commit 5381607bce56e21fe61c878b343468cdb010a855
......@@ -2,9 +2,13 @@ cmake_minimum_required(VERSION 3.10)
project (dap_chain)
file(GLOB DAP_CHAIN_SRCS *.c)
file(GLOB DAP_CHAIN_HEADERS include/*.h)
if (WIN32)
file (GLOB MMAN_SRCS ${CMAKE_CURRENT_SOURCE_DIR}/../../dap-sdk/3rdparty/mman/*)
list(APPEND DAP_CHAIN_SRCS ${MMAN_SRCS})
endif()
add_library(${PROJECT_NAME} STATIC ${DAP_CHAIN_SRCS} ${DAP_CHAIN_HEADERS})
if(BUILD_CELLFRAME_SDK_TESTS)
......@@ -17,7 +21,9 @@ endif()
target_link_libraries(${PROJECT_NAME} dap_chain_common dap_global_db dap_notify_srv dap_stream ${GLIB_LDFLAGS})
target_include_directories(${PROJECT_NAME} INTERFACE . include/ ${GLIB_INCLUDE_DIRS})
target_include_directories(${PROJECT_NAME} PUBLIC include)
target_include_directories(${PROJECT_NAME} PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/../../../dap-sdk/3rdparty/uthash/src)
if (WIN32)
target_include_directories(${PROJECT_NAME} INTERFACE ${CMAKE_CURRENT_SOURCE_DIR}/../../dap-sdk/3rdparty/mman/)
endif()
if (INSTALL_SDK)
set_target_properties(${PROJECT_NAME} PROPERTIES PUBLIC_HEADER "${DAP_CHAIN_HEADERS}")
......
......@@ -110,6 +110,11 @@ dap_chain_t *dap_chain_create(const char *a_chain_net_name, const char *a_chain_
.net_id = a_chain_net_id,
.name = dap_strdup(a_chain_name),
.net_name = dap_strdup(a_chain_net_name),
#ifdef DAP_OS_WINDOWS // TODO
.is_mapped = false,
#else
.is_mapped = dap_config_get_item_bool_default(g_config, "ledger", "mapped", true),
#endif
.cell_rwlock = PTHREAD_RWLOCK_INITIALIZER,
.atom_notifiers = NULL
};
......@@ -392,7 +397,10 @@ dap_chain_t *dap_chain_load_from_cfg(const char *a_chain_net_name, dap_chain_net
if ( dap_config_get_item_str_default(l_cfg, "files","storage_dir", NULL) )
{
DAP_CHAIN_PVT(l_chain)->file_storage_dir = (char*)dap_config_get_item_path( l_cfg , "files","storage_dir" );
DAP_CHAIN_PVT(l_chain)->file_storage_dir = (char*)dap_config_get_item_path( l_cfg, "files", "storage_dir" );
if (!dap_dir_test(DAP_CHAIN_PVT(l_chain)->file_storage_dir)) {
dap_mkdir_with_parents(DAP_CHAIN_PVT(l_chain)->file_storage_dir);
}
} else
log_it (L_INFO, "Not set file storage path, will not stored in files");
......@@ -592,9 +600,6 @@ int dap_chain_load_all(dap_chain_t *a_chain)
char *l_storage_dir = DAP_CHAIN_PVT(a_chain)->file_storage_dir;
if (!l_storage_dir)
return 0;
if (!dap_dir_test(l_storage_dir)) {
dap_mkdir_with_parents(l_storage_dir);
}
DIR *l_dir = opendir(l_storage_dir);
if (!l_dir) {
log_it(L_ERROR, "Cannot open directory %s", DAP_CHAIN_PVT(a_chain)->file_storage_dir);
......@@ -724,7 +729,8 @@ static bool s_notify_atom_on_thread(void *a_arg)
struct chain_thread_notifier *l_arg = a_arg;
assert(l_arg->atom && l_arg->callback);
l_arg->callback(l_arg->callback_arg, l_arg->chain, l_arg->cell_id, l_arg->atom, l_arg->atom_size);
DAP_DELETE(l_arg->atom);
if ( !l_arg->chain->is_mapped )
DAP_DELETE(l_arg->atom);
DAP_DELETE(l_arg);
return false;
}
......@@ -747,41 +753,7 @@ ssize_t dap_chain_atom_save(dap_chain_cell_t *a_chain_cell, const uint8_t *a_ato
}
}
}
ssize_t l_res = dap_chain_cell_file_append(a_chain_cell, a_atom, a_atom_size);
if (l_chain->atom_notifiers) {
dap_list_t *l_iter;
DL_FOREACH(l_chain->atom_notifiers, l_iter) {
dap_chain_atom_notifier_t *l_notifier = (dap_chain_atom_notifier_t*)l_iter->data;
struct chain_thread_notifier *l_arg = DAP_NEW_Z(struct chain_thread_notifier);
if (!l_arg) {
log_it(L_CRITICAL, "%s", g_error_memory_alloc);
continue;
}
*l_arg = (struct chain_thread_notifier) { .callback = l_notifier->callback, .callback_arg = l_notifier->arg,
.chain = l_chain, .cell_id = a_chain_cell->id, .atom_size = a_atom_size };
l_arg->atom = DAP_DUP_SIZE(a_atom, a_atom_size);
if (!l_arg->atom) {
DAP_DELETE(l_arg);
log_it(L_CRITICAL, "%s", g_error_memory_alloc);
continue;
}
dap_proc_thread_callback_add_pri(NULL, s_notify_atom_on_thread, l_arg, DAP_QUEUE_MSG_PRIORITY_LOW);
}
}
if (l_chain->callback_atom_add_from_treshold) {
dap_chain_atom_ptr_t l_atom_treshold;
do {
size_t l_atom_treshold_size;
l_atom_treshold = l_chain->callback_atom_add_from_treshold(l_chain, &l_atom_treshold_size);
if (l_atom_treshold) {
if (dap_chain_cell_file_append(a_chain_cell, l_atom_treshold, l_atom_treshold_size) > 0)
log_it(L_INFO, "Added atom from treshold");
else
log_it(L_ERROR, "Can't add atom from treshold");
}
} while(l_atom_treshold);
}
return l_res;
return dap_chain_cell_file_append(a_chain_cell, a_atom, a_atom_size);
}
/**
......@@ -824,6 +796,36 @@ const char* dap_chain_get_path(dap_chain_t *a_chain)
return DAP_CHAIN_PVT(a_chain)->file_storage_dir;
}
void dap_chain_atom_notify(dap_chain_cell_t *a_chain_cell, const uint8_t *a_atom, size_t a_atom_size) {
if ( !a_chain_cell->chain->atom_notifiers )
return;
dap_list_t *l_iter;
DL_FOREACH(a_chain_cell->chain->atom_notifiers, l_iter) {
dap_chain_atom_notifier_t *l_notifier = (dap_chain_atom_notifier_t*)l_iter->data;
struct chain_thread_notifier *l_arg = DAP_NEW_Z(struct chain_thread_notifier);
if (!l_arg) {
log_it(L_CRITICAL, "%s", g_error_memory_alloc);
continue;
}
*l_arg = (struct chain_thread_notifier) {
.callback = l_notifier->callback, .callback_arg = l_notifier->arg,
.chain = a_chain_cell->chain, .cell_id = a_chain_cell->id,
.atom = a_chain_cell->chain->is_mapped ? a_atom : DAP_DUP_SIZE(a_atom, a_atom_size),
.atom_size = a_atom_size };
dap_proc_thread_callback_add_pri(NULL, s_notify_atom_on_thread, l_arg, DAP_QUEUE_MSG_PRIORITY_LOW);
}
}
void dap_chain_atom_add_from_threshold(dap_chain_t *a_chain) {
if ( !a_chain->callback_atom_add_from_treshold )
return;
dap_chain_atom_ptr_t l_atom_treshold = NULL;
do {
size_t l_atom_treshold_size;
l_atom_treshold = a_chain->callback_atom_add_from_treshold(a_chain, &l_atom_treshold_size);
} while(l_atom_treshold);
}
const char *dap_chain_type_to_str(const dap_chain_type_t a_default_chain_type) {
switch (a_default_chain_type)
{
......
......@@ -30,16 +30,18 @@
#include "dap_config.h"
#include "dap_strfuncs.h"
#include "dap_file_utils.h"
#ifdef DAP_OS_WINDOWS
#include "mman.h"
#else
#include <sys/mman.h>
#endif
#define LOG_TAG "dap_chain_cell"
#define DAP_CHAIN_CELL_FILE_VERSION 1
#define DAP_CHAIN_CELL_FILE_SIGNATURE 0xfa340bef153eba48
#define DAP_CHAIN_CELL_FILE_TYPE_RAW 0
#define DAP_CHAIN_CELL_FILE_TYPE_COMPRESSED 1
#define DAP_MAPPED_VOLUME_LIMIT (1 << 28) // 256 MB for now, may be should be configurable?
/**
* @struct dap_chain_cell_file_header
*/
......@@ -53,6 +55,7 @@ typedef struct dap_chain_cell_file_header
dap_chain_cell_id_t cell_id;
} DAP_ALIGN_PACKED dap_chain_cell_file_header_t;
static bool s_debug_more = false;
/**
* @brief dap_chain_cell_init
......@@ -61,6 +64,7 @@ typedef struct dap_chain_cell_file_header
*/
int dap_chain_cell_init(void)
{
s_debug_more = dap_config_get_item_bool_default(g_config, "chain", "debug_more", false);
//s_cells_path = dap_config_get_item_str(g_config,"resources","cells_storage");
return 0;
}
......@@ -99,17 +103,57 @@ dap_chain_cell_t * dap_chain_cell_create_fill(dap_chain_t * a_chain, dap_chain_c
pthread_rwlock_unlock(&a_chain->cell_rwlock);
return l_cell;
}
char file_storage_path[MAX_PATH];
snprintf(file_storage_path, MAX_PATH, "%s/%0"DAP_UINT64_FORMAT_x".dchaincell",
DAP_CHAIN_PVT(a_chain)->file_storage_dir, a_cell_id.uint64);
char *l_map = NULL;
size_t l_size = 0;
FILE *l_file = fopen(file_storage_path, "r+b");
if ( l_file ) {
if ( a_chain->is_mapped ) {
fseek(l_file, 0, SEEK_END);
l_size = ftell(l_file);
fseek(l_file, 0, SEEK_SET);
if ( MAP_FAILED == (l_map = mmap(NULL, l_size ? l_size : dap_page_roundup(DAP_MAPPED_VOLUME_LIMIT), PROT_READ|PROT_WRITE, MAP_PRIVATE, fileno(l_file), 0)) ) {
log_it(L_ERROR, "Chain cell \"%s\" 0x%016"DAP_UINT64_FORMAT_X" cannot be mapped, errno %d", file_storage_path, a_cell_id.uint64, errno);
fclose(l_file);
pthread_rwlock_unlock(&a_chain->cell_rwlock);
return NULL;
}
}
} else if (errno == ENOENT) {
if ( !(l_file = fopen(file_storage_path, "w+b")) ) {
log_it(L_ERROR, "Chain cell \"%s\" 0x%016"DAP_UINT64_FORMAT_X" cannot be opened, error %d",
file_storage_path, a_cell_id.uint64, errno);
pthread_rwlock_unlock(&a_chain->cell_rwlock);
return NULL;
}
if ( MAP_FAILED == (l_map = mmap(NULL, l_size = dap_page_roundup(DAP_MAPPED_VOLUME_LIMIT), PROT_READ|PROT_WRITE, MAP_PRIVATE, fileno(l_file), 0)) ) {
log_it(L_ERROR, "Chain cell \"%s\" 0x%016"DAP_UINT64_FORMAT_X" cannot be mapped, error %d",
file_storage_path, a_cell_id.uint64, errno);
fclose(l_file);
pthread_rwlock_unlock(&a_chain->cell_rwlock);
return NULL;
}
}
l_cell = DAP_NEW_Z(dap_chain_cell_t);
if ( !l_cell ) {
pthread_rwlock_unlock(&a_chain->cell_rwlock);
log_it(L_CRITICAL, "%s", g_error_memory_alloc);
return NULL;
*l_cell = (dap_chain_cell_t) {
.id = a_cell_id.uint64,
.map = l_map,
.map_pos = l_map,
.map_end = l_map ? l_map + l_size : NULL,
.file_storage = l_file,
.chain = a_chain,
.storage_rwlock = PTHREAD_RWLOCK_INITIALIZER
};
memcpy(l_cell->file_storage_path, file_storage_path, sizeof(file_storage_path));
if (l_map) {
l_cell->map_range_bounds = dap_list_append(l_cell->map_range_bounds, l_map);
l_cell->map_range_bounds = dap_list_append(l_cell->map_range_bounds, l_cell->map_end);
}
l_cell->chain = a_chain;
l_cell->id.uint64 = a_cell_id.uint64;
snprintf(l_cell->file_storage_path, MAX_PATH, "%s/%0"DAP_UINT64_FORMAT_x".dchaincell",
DAP_CHAIN_PVT(a_chain)->file_storage_dir, l_cell->id.uint64);
pthread_rwlock_init(&l_cell->storage_rwlock, NULL);
debug_if (s_debug_more && a_chain->is_mapped, L_DEBUG, "Mapped volume size is %lu", (size_t)(l_cell->map_end - l_cell->map));
HASH_ADD(hh, a_chain->cells, id, sizeof(dap_chain_cell_id_t), l_cell);
pthread_rwlock_unlock(&a_chain->cell_rwlock);
return l_cell;
......@@ -143,6 +187,16 @@ void dap_chain_cell_close(dap_chain_cell_t *a_cell)
fclose(a_cell->file_storage);
a_cell->file_storage = NULL;
}
if (a_cell->chain->is_mapped) {
for (dap_list_t *l_iter = a_cell->map_range_bounds; l_iter; l_iter = l_iter->next) {
if (l_iter->next) {
debug_if(s_debug_more, "Unmap volume %p (%lu bytes)", l_iter->data, (size_t)(l_iter->next->data - l_iter->data));
munmap(l_iter->data, (size_t)(l_iter->next->data - l_iter->data));
l_iter = l_iter->next;
}
}
dap_list_free(a_cell->map_range_bounds);
}
}
/**
......@@ -195,62 +249,79 @@ void dap_chain_cell_delete_all(dap_chain_t *a_chain) {
*/
int dap_chain_cell_load(dap_chain_t *a_chain, dap_chain_cell_t *a_cell)
{
int l_ret = 0;
FILE *l_cell_file = fopen(a_cell->file_storage_path, "rb");
if (!l_cell_file) {
log_it(L_WARNING,"Can't read chain \"%s\"", a_cell->file_storage_path);
if (!a_cell)
return -1;
fseek(a_cell->file_storage, 0, SEEK_END);
size_t l_size = ftell(a_cell->file_storage);
fseek(a_cell->file_storage, 0, SEEK_SET);
if ( l_size < sizeof(dap_chain_cell_file_header_t) || (a_chain->is_mapped && !a_cell->map_pos) ) {
log_it(L_INFO, "Chain cell \"%s\" is yet empty", a_cell->file_storage_path);
return -1;
}
dap_chain_cell_file_header_t l_hdr = { 0 };
if (fread(&l_hdr, 1, sizeof(l_hdr), l_cell_file) != sizeof (l_hdr)) {
log_it(L_ERROR,"Can't read chain header \"%s\"", a_cell->file_storage_path);
fclose(l_cell_file);
return -2;
int l_ret = 0;
dap_chain_cell_file_header_t *l_hdr = NULL;
if (a_chain->is_mapped) {
l_hdr = (dap_chain_cell_file_header_t*)a_cell->map;
} else {
l_hdr = DAP_NEW(dap_chain_cell_file_header_t);
if ( fread(l_hdr, 1, sizeof(*l_hdr), a_cell->file_storage) != sizeof(*l_hdr) ) {
log_it(L_ERROR,"Can't read chain header \"%s\"", a_cell->file_storage_path);
fclose(a_cell->file_storage);
DAP_DELETE(l_hdr);
return -2;
}
}
if (l_hdr.signature != DAP_CHAIN_CELL_FILE_SIGNATURE) {
if (l_hdr->signature != DAP_CHAIN_CELL_FILE_SIGNATURE) {
log_it(L_ERROR, "Wrong signature in chain \"%s\", possible file corrupt", a_cell->file_storage_path);
fclose(l_cell_file);
fclose(a_cell->file_storage);
if (!a_chain->is_mapped) DAP_DELETE(l_hdr);
return -3;
}
if (l_hdr.version < DAP_CHAIN_CELL_FILE_VERSION ){
if (l_hdr->version < DAP_CHAIN_CELL_FILE_VERSION ){
log_it(L_ERROR, "Too low chain version, backup files");
fclose(l_cell_file);
return -3;
fclose(a_cell->file_storage);
if (!a_chain->is_mapped) DAP_DELETE(l_hdr);
return -4;
}
unsigned long q = 0;
size_t l_read = 0;
uint64_t l_el_size = 0;
while ((l_read = fread(&l_el_size, 1, sizeof(l_el_size), l_cell_file)) && !feof(l_cell_file)) {
if (l_read != sizeof(l_el_size) || l_el_size == 0) {
log_it(L_ERROR, "Corrupted element size %zu, chain %s is damaged", l_el_size, a_cell->file_storage_path);
l_ret = -4;
break;
}
dap_chain_atom_ptr_t l_element = DAP_NEW_SIZE(dap_chain_atom_ptr_t, l_el_size);
if (!l_element) {
log_it(L_CRITICAL, "%s", g_error_memory_alloc);
l_ret = -5;
break;
uint64_t q = 0, l_full_size = 0;
if (a_chain->is_mapped) {
a_cell->map_pos = a_cell->map + sizeof(dap_chain_cell_file_header_t);
for (uint64_t l_el_size = 0; a_cell->map_pos < a_cell->map_end && ( l_el_size = *(uint64_t*)a_cell->map_pos ); ++q, a_cell->map_pos += l_el_size) {
a_chain->callback_atom_add(a_chain, (dap_chain_atom_ptr_t)(a_cell->map_pos += sizeof(uint64_t)), l_el_size);
}
l_read = fread((void *)l_element, 1, l_el_size, l_cell_file);
if (l_read != l_el_size) {
log_it(L_ERROR, "Read only %lu of %zu bytes, stop cell loading", l_read, l_el_size);
DAP_DELETE(l_element);
l_ret = -6;
break;
fseek(a_cell->file_storage, a_cell->map_pos - a_cell->map, SEEK_SET);
} else {
DAP_DELETE(l_hdr);
size_t l_read = 0;
uint64_t l_el_size = 0;
while ((l_read = fread(&l_el_size, 1, sizeof(l_el_size), a_cell->file_storage)) && !feof(a_cell->file_storage)) {
if (l_read != sizeof(l_el_size) || l_el_size == 0) {
log_it(L_ERROR, "Corrupted element size %zu, chain %s is damaged", l_el_size, a_cell->file_storage_path);
l_ret = -4;
break;
}
dap_chain_atom_ptr_t l_element = DAP_NEW_SIZE(dap_chain_atom_ptr_t, l_el_size);
if (!l_element) {
log_it(L_CRITICAL, "Memory allocation error");
l_ret = -5;
break;
}
l_full_size += sizeof(uint64_t) + ( l_read = fread((void*)l_element, 1, l_el_size, a_cell->file_storage) );
if (l_read != l_el_size) {
log_it(L_ERROR, "Read only %lu of %zu bytes, stop cell loading", l_read, l_el_size);
DAP_DELETE(l_element);
l_ret = -6;
break;
}
if ( a_chain->callback_atom_add(a_chain, l_element, l_el_size) != ATOM_ACCEPT )
DAP_DELETE(l_element);
++q;
}
a_chain->callback_atom_add(a_chain, l_element, l_el_size); // ??? blocking GDB call
DAP_DELETE(l_element);
++q;
fseek(a_cell->file_storage, l_full_size, SEEK_SET);
}
if (l_ret < 0) {
log_it(L_INFO, "Couldn't load all atoms, %lu only", q);
} else {
log_it(L_INFO, "Loaded all %lu atoms in cell %s", q, a_cell->file_storage_path);
}
fclose(l_cell_file);
log_it(L_INFO, "Loaded %lu atoms in cell %s", q, a_cell->file_storage_path);
return l_ret;
}
static int s_file_write_header(dap_chain_cell_t *a_cell)
......@@ -274,16 +345,18 @@ static int s_file_write_header(dap_chain_cell_t *a_cell)
.chain_id = { .uint64 = a_cell->id.uint64 },
.chain_net_id = a_cell->chain->net_id
};
if(fwrite(&l_hdr, sizeof(l_hdr), 1, a_cell->file_storage) == 1) {
log_it(L_NOTICE, "Initialized file storage for cell 0x%016"DAP_UINT64_FORMAT_X" ( %s )",
a_cell->id.uint64, a_cell->file_storage_path);
fflush(a_cell->file_storage);
if (a_cell->chain->is_mapped)
a_cell->map_pos = a_cell->map + sizeof(l_hdr);
return 0;
} else {
log_it(L_ERROR, "Can't init file storage for cell 0x%016"DAP_UINT64_FORMAT_X" ( %s )",
a_cell->id.uint64, a_cell->file_storage_path);
return -1;
}
log_it(L_ERROR, "Can't init file storage for cell 0x%016"DAP_UINT64_FORMAT_X" ( %s )",
a_cell->id.uint64, a_cell->file_storage_path);
return -1;
}
static int s_file_atom_add(dap_chain_cell_t *a_cell, dap_chain_atom_ptr_t a_atom, uint64_t a_atom_size)
......@@ -292,6 +365,34 @@ static int s_file_atom_add(dap_chain_cell_t *a_cell, dap_chain_atom_ptr_t a_atom
log_it(L_CRITICAL, "Invalid arguments");
return -1;
}
if (a_cell->chain->is_mapped) {
size_t l_pos = ftell(a_cell->file_storage);
debug_if (s_debug_more, "Before filling volume for atom size %lu, stream pos of %s is %lu, map pos is %lu, space left in map %lu",
a_atom_size, a_cell->file_storage_path, l_pos, (size_t)(a_cell->map_pos - a_cell->map), (size_t)(a_cell->map_end - a_cell->map_pos));
if ( a_atom_size > (size_t)(a_cell->map_end - a_cell->map_pos) ) {
size_t l_map_size = dap_page_roundup(DAP_MAPPED_VOLUME_LIMIT),
l_volume_start = dap_page_rounddown(l_pos),
l_offset = l_pos - l_volume_start;
debug_if (s_debug_more, "Need to enlarge map of %s, current stream pos is %lu, map pos is %lu, offset of new map is %lu",
a_cell->file_storage_path, ftell(a_cell->file_storage), (size_t)(a_cell->map_end - a_cell->map_pos), l_offset);
if ( MAP_FAILED == (a_cell->map = mmap(NULL, l_map_size, PROT_READ|PROT_WRITE,
MAP_PRIVATE, fileno(a_cell->file_storage), l_volume_start)) )
{
log_it(L_ERROR, "Chain cell \"%s\" 0x%016"DAP_UINT64_FORMAT_X" cannot be mapped, errno %d",
a_cell->file_storage_path, a_cell->id.uint64, errno);
fclose(a_cell->file_storage);
return -2;
}
a_cell->map_pos = a_cell->map + l_offset;
a_cell->map_range_bounds = dap_list_append(a_cell->map_range_bounds, a_cell->map);
a_cell->map_range_bounds = dap_list_append(a_cell->map_range_bounds, a_cell->map_end = a_cell->map + l_map_size);
}
}
debug_if (s_debug_more && a_cell->chain->is_mapped, "Before writing an atom of size %lu, stream pos of %s is %lu and pos is %lu, space left in map %lu",
a_atom_size, a_cell->file_storage_path, ftell(a_cell->file_storage),
(size_t)(a_cell->map_pos - a_cell->map), (size_t)(a_cell->map_end - a_cell->map_pos));
if (fwrite(&a_atom_size, sizeof(a_atom_size), 1, a_cell->file_storage) != 1) {
log_it (L_ERROR, "Can't write atom data size from cell 0x%016"DAP_UINT64_FORMAT_X" in \"%s\"",
a_cell->id.uint64,
......@@ -300,10 +401,13 @@ static int s_file_atom_add(dap_chain_cell_t *a_cell, dap_chain_atom_ptr_t a_atom
}
if (fwrite(a_atom, a_atom_size, 1, a_cell->file_storage) != 1) {
log_it (L_ERROR, "Can't write data from cell 0x%016"DAP_UINT64_FORMAT_X" to the file \"%s\"",
a_cell->id.uint64,
a_cell->file_storage_path);
a_cell->id.uint64,
a_cell->file_storage_path);
return -3;
}
debug_if (s_debug_more && a_cell->chain->is_mapped, "After writing an atom of size %lu, stream pos of %s is %lu and map shift is %lu",
a_atom_size, a_cell->file_storage_path, ftell(a_cell->file_storage),
(size_t)(a_cell->map_pos - a_cell->map));
return 0;
}
......@@ -332,21 +436,19 @@ ssize_t dap_chain_cell_file_append(dap_chain_cell_t *a_cell, const void *a_atom,
bool l_err = false;
pthread_rwlock_wrlock(&a_cell->storage_rwlock);
if (!a_atom || !a_atom_size) {
a_cell->file_storage = a_cell->file_storage
? freopen(a_cell->file_storage_path, "w+b", a_cell->file_storage)
: fopen(a_cell->file_storage_path, "w+b");
if (!a_cell->file_storage) {
log_it(L_ERROR, "Chain cell \"%s\" 0x%016"DAP_UINT64_FORMAT_X" cannot be opened",
a_cell->file_storage_path,
a_cell->id.uint64);
pthread_rwlock_unlock(&a_cell->storage_rwlock);
return -3;
a_cell->file_storage = freopen(a_cell->file_storage_path, "w+b", a_cell->file_storage);
debug_if (s_debug_more, "Rewinding file %s", a_cell->file_storage_path);
if (a_cell->chain->is_mapped && a_cell->map_range_bounds) {
a_cell->map = a_cell->map_pos = a_cell->map_range_bounds->data;
a_cell->map_end = a_cell->map_range_bounds->next->data;
}
if (s_file_write_header(a_cell)) {
log_it(L_ERROR, "Chain cell \"%s\" 0x%016"DAP_UINT64_FORMAT_X": can't fill header", a_cell->file_storage_path, a_cell->id.uint64);
if ( s_file_write_header(a_cell) ) {
log_it(L_ERROR, "Chain cell \"%s\" 0x%016"DAP_UINT64_FORMAT_X": can't fill header",
a_cell->file_storage_path, a_cell->id.uint64);
pthread_rwlock_unlock(&a_cell->storage_rwlock);
return -4;
return -2;
}
l_total_res += sizeof(dap_chain_cell_file_header_t);
dap_chain_atom_iter_t *l_atom_iter = a_cell->chain->callback_atom_iter_create(a_cell->chain, a_cell->id, NULL);
dap_chain_atom_ptr_t l_atom;
uint64_t l_atom_size = 0;
......@@ -363,25 +465,18 @@ ssize_t dap_chain_cell_file_append(dap_chain_cell_t *a_cell, const void *a_atom,
}
}
a_cell->chain->callback_atom_iter_delete(l_atom_iter);
a_cell->file_storage = freopen(a_cell->file_storage_path, "a+b", a_cell->file_storage);
debug_if (s_debug_more && a_cell->chain->is_mapped, "After rewriting file %s, stream pos is %lu and map pos is %lu",
a_cell->file_storage_path, ftell(a_cell->file_storage),
(size_t)(a_cell->map_pos - a_cell->map));
} else {
if (!a_cell->file_storage)
a_cell->file_storage = fopen(a_cell->file_storage_path, "a+b");
if (!a_cell->file_storage) {
log_it(L_ERROR, "Chain cell \"%s\" 0x%016"DAP_UINT64_FORMAT_X" cannot be opened",
a_cell->file_storage_path,
a_cell->id.uint64);
debug_if (s_debug_more && a_cell->chain->is_mapped, "Before appending an atom of size %lu, stream pos of %s is %lu, map pos is %lu",
a_atom_size, a_cell->file_storage_path, ftell(a_cell->file_storage),
(size_t)(a_cell->map_pos - a_cell->map));
if ( !ftell(a_cell->file_storage) && s_file_write_header(a_cell) ) {
log_it(L_ERROR, "Chain cell \"%s\" 0x%016"DAP_UINT64_FORMAT_X": can't fill header",
a_cell->file_storage_path, a_cell->id.uint64);
pthread_rwlock_unlock(&a_cell->storage_rwlock);
return -3;
} else {
fseek(a_cell->file_storage, 0L, SEEK_END);
if (!ftell(a_cell->file_storage)) { // It's not garunteed that header has been yet added or not, regardless the descriptor validity
if (s_file_write_header(a_cell)) {
log_it(L_ERROR, "Chain cell \"%s\" 0x%016"DAP_UINT64_FORMAT_X": can't fill header", a_cell->file_storage_path, a_cell->id.uint64);
pthread_rwlock_unlock(&a_cell->storage_rwlock);
return -4;
}
}
}
if (s_file_atom_add(a_cell, a_atom, a_atom_size)) {
log_it(L_ERROR, "Chain cell \"%s\" 0x%016"DAP_UINT64_FORMAT_X": can't save atom!",
......@@ -389,6 +484,9 @@ ssize_t dap_chain_cell_file_append(dap_chain_cell_t *a_cell, const void *a_atom,
pthread_rwlock_unlock(&a_cell->storage_rwlock);
return -4;
}
debug_if (s_debug_more && a_cell->chain->is_mapped, "After appending an atom of size %lu, stream pos of %s is %lu, map pos is %lu",
a_atom_size, a_cell->file_storage_path, ftell(a_cell->file_storage),
(size_t)(a_cell->map_end - a_cell->map_pos));
++l_count;
l_total_res = a_atom_size + sizeof(uint64_t);
}
......
......@@ -636,10 +636,7 @@ static bool s_sync_in_chains_callback(void *a_arg)
break;
case ATOM_ACCEPT:
debug_if(s_debug_more, L_INFO, "Accepted atom with hash %s for %s:%s", l_atom_hash_str, l_chain->net_name, l_chain->name);
if (dap_chain_atom_save(l_chain->cells, l_atom, l_atom_size, NULL) < 0)
log_it(L_ERROR, "Can't save atom %s to the file", l_atom_hash_str);
else
l_ack_send = true;
l_ack_send = true;
break;
case ATOM_REJECT: {
debug_if(s_debug_more, L_WARNING, "Atom with hash %s for %s:%s rejected", l_atom_hash_str, l_chain->net_name, l_chain->name);
......@@ -656,10 +653,7 @@ static bool s_sync_in_chains_callback(void *a_arg)
dap_stream_ch_pkt_send_by_addr(&l_args->addr, DAP_CHAIN_CH_ID, DAP_CHAIN_CH_PKT_TYPE_CHAIN_ACK, l_pkt, dap_chain_ch_pkt_get_size(l_pkt));
DAP_DELETE(l_pkt);
debug_if(s_debug_more, L_DEBUG, "Out: CHAIN_ACK %s for net %s to destination " NODE_ADDR_FP_STR " with num %" DAP_UINT64_FORMAT_U,
l_chain ? l_chain->name : "(null)",
l_chain ? l_chain->net_name : "(null)",
NODE_ADDR_FP_ARGS_S(l_args->addr),
l_ack_num);
l_chain->name, l_chain->net_name, NODE_ADDR_FP_ARGS_S(l_args->addr), l_ack_num);
}
DAP_DELETE(l_args);
return false;
......
......@@ -138,7 +138,7 @@ typedef struct dap_chain {
char *name;
char *net_name;
bool is_datum_pool_proc;
bool is_mapped;
// Nested cells (hashtab by cell_id)
dap_chain_cell_t *cells;
dap_chain_cell_id_t active_cell_id;
......@@ -252,6 +252,8 @@ dap_chain_t *dap_chain_load_from_cfg(const char *a_chain_net_name, dap_chain_net
void dap_chain_delete(dap_chain_t * a_chain);
void dap_chain_add_callback_notify(dap_chain_t * a_chain, dap_chain_callback_notify_t a_callback, void * a_arg);
void dap_chain_atom_notify(dap_chain_cell_t *a_chain_cell, const uint8_t *a_atom, size_t a_atom_size);
void dap_chain_atom_add_from_threshold(dap_chain_t *a_chain);
dap_chain_atom_ptr_t dap_chain_get_atom_by_hash(dap_chain_t * a_chain, dap_chain_hash_fast_t * a_atom_hash, size_t * a_atom_size);
bool dap_chain_get_atom_last_hash_num(dap_chain_t *a_chain, dap_chain_cell_id_t a_cell_id, dap_hash_fast_t *a_atom_hash, uint64_t *a_atom_num);
DAP_STATIC_INLINE bool dap_chain_get_atom_last_hash(dap_chain_t *a_chain, dap_chain_cell_id_t a_cell_id, dap_hash_fast_t *a_atom_hash)
......
......@@ -34,10 +34,11 @@ typedef struct dap_chain_cell {
dap_chain_t * chain;
char file_storage_path[MAX_PATH];
FILE * file_storage; /// @param file_cache @brief Cache for raw blocks
uint8_t file_storage_type; /// @param file_storage_type @brief Is file_storage is raw, compressed or smth else
char *map, *map_pos, *map_end;
FILE *file_storage;
uint8_t file_storage_type;
dap_list_t *map_range_bounds;
pthread_rwlock_t storage_rwlock;
UT_hash_handle hh;
} dap_chain_cell_t;
......
......@@ -614,19 +614,15 @@ static bool s_callback_round_event_to_chain_callback_get_round_item(dap_global_d
if (l_chosen_item) {
size_t l_event_size = l_chosen_item->event_size;
dap_chain_cs_dag_event_t *l_new_atom = (dap_chain_cs_dag_event_t *)l_chosen_item->event_n_signs;
dap_hash_fast_t l_event_hash;
dap_hash_fast(l_new_atom, l_event_size, &l_event_hash);
char *l_event_hash_hex_str = DAP_NEW_STACK_SIZE(char, DAP_CHAIN_HASH_FAST_STR_SIZE);
dap_chain_hash_fast_to_str(&l_event_hash, l_event_hash_hex_str, DAP_CHAIN_HASH_FAST_STR_SIZE);
char *l_event_hash_hex_str;
dap_get_data_hash_str_static(l_new_atom, l_event_size, l_event_hash_hex_str);
dap_chain_datum_t *l_datum = dap_chain_cs_dag_event_get_datum(l_new_atom, l_event_size);
l_dag->round_completed = dap_max(l_new_atom->header.round_id, l_dag->round_current);
int l_verify_datum = dap_chain_net_verify_datum_for_add(l_dag->chain, l_datum, &l_chosen_item->round_info.datum_hash);
if (!l_verify_datum) {
dap_chain_atom_verify_res_t l_res = l_dag->chain->callback_atom_add(l_dag->chain, l_new_atom, l_event_size);
if (l_res == ATOM_ACCEPT) {
dap_chain_atom_save(l_dag->chain->cells, (dap_chain_atom_ptr_t)l_new_atom, l_event_size, &l_event_hash);
if (l_res == ATOM_ACCEPT)
s_poa_round_clean(l_dag->chain);
}
log_it(L_INFO, "Event %s from round %"DAP_UINT64_FORMAT_U" %s",
l_event_hash_hex_str, l_round_id, dap_chain_atom_verify_res_str[l_res]);
} else {
......
......@@ -378,13 +378,6 @@ void dap_chain_esbocs_add_block_collect(dap_chain_block_cache_t *a_block_cache,
if (a_type == DAP_CHAIN_BLOCK_COLLECT_BOTH || a_type == DAP_CHAIN_BLOCK_COLLECT_FEES) {
dap_sign_t *l_sign = dap_chain_block_sign_get(a_block_cache->block, a_block_cache->block_size, 0);
if (dap_pkey_match_sign(a_block_collect_params->block_sign_pkey, l_sign)) {
dap_chain_esbocs_block_collect_t *l_block_collect_params = DAP_NEW_Z(dap_chain_esbocs_block_collect_t);
l_block_collect_params->collecting_level = a_block_collect_params->collecting_level;
l_block_collect_params->minimum_fee = a_block_collect_params->minimum_fee;
l_block_collect_params->chain = a_block_collect_params->chain;
l_block_collect_params->blocks_sign_key = a_block_collect_params->blocks_sign_key;
l_block_collect_params->block_sign_pkey = a_block_collect_params->block_sign_pkey;
l_block_collect_params->collecting_addr = a_block_collect_params->collecting_addr;
dap_chain_net_t *l_net = dap_chain_net_by_id(l_chain->net_id);
assert(l_net);
uint256_t l_value_fee = uint256_0;
......@@ -393,7 +386,7 @@ void dap_chain_esbocs_add_block_collect(dap_chain_block_cache_t *a_block_cache,
if (!IS_ZERO_256(l_value_fee)) {
char *l_fee_group = dap_chain_cs_blocks_get_fee_group(l_chain->net_name);
dap_global_db_set(l_fee_group, a_block_cache->block_hash_str, &l_value_fee, sizeof(l_value_fee),
false, s_check_db_collect_callback, l_block_collect_params);
false, s_check_db_collect_callback, DAP_DUP(a_block_collect_params));
DAP_DELETE(l_fee_group);
}
dap_list_free_full(l_list_used_out, NULL);
......@@ -403,23 +396,16 @@ void dap_chain_esbocs_add_block_collect(dap_chain_block_cache_t *a_block_cache,
return;
if (dap_chain_block_sign_match_pkey(a_block_cache->block, a_block_cache->block_size,
a_block_collect_params->block_sign_pkey)) {
dap_chain_esbocs_block_collect_t *l_block_collect_params = DAP_NEW_Z(dap_chain_esbocs_block_collect_t);
l_block_collect_params->collecting_level = a_block_collect_params->collecting_level;
l_block_collect_params->minimum_fee = a_block_collect_params->minimum_fee;
l_block_collect_params->chain = a_block_collect_params->chain;
l_block_collect_params->blocks_sign_key = a_block_collect_params->blocks_sign_key;
l_block_collect_params->block_sign_pkey = a_block_collect_params->block_sign_pkey;
l_block_collect_params->collecting_addr = a_block_collect_params->collecting_addr;
dap_chain_net_t *l_net = dap_chain_net_by_id(l_chain->net_id);
assert(l_net);
if (!dap_ledger_is_used_reward(l_net->pub.ledger, &a_block_cache->block_hash,
&l_block_collect_params->collecting_addr->data.hash_fast)) {
&a_block_collect_params->collecting_addr->data.hash_fast)) {
uint256_t l_value_reward = l_chain->callback_calc_reward(l_chain, &a_block_cache->block_hash,
l_block_collect_params->block_sign_pkey);
a_block_collect_params->block_sign_pkey);
if (!IS_ZERO_256(l_value_reward)) {
char *l_reward_group = dap_chain_cs_blocks_get_reward_group(l_chain->net_name);
dap_global_db_set(l_reward_group, a_block_cache->block_hash_str, &l_value_reward, sizeof(l_value_reward),
false, s_check_db_collect_callback, l_block_collect_params);
false, s_check_db_collect_callback, DAP_DUP(a_block_collect_params));
DAP_DELETE(l_reward_group);
}
}
......@@ -427,7 +413,7 @@ void dap_chain_esbocs_add_block_collect(dap_chain_block_cache_t *a_block_cache,
}
static void s_new_atom_notifier(void *a_arg, dap_chain_t *a_chain, dap_chain_cell_id_t a_id,
void *a_atom, size_t a_atom_size)
void *a_atom, dap_chain_hash_fast_t *a_atom_hash, size_t a_atom_size)
{
dap_chain_esbocs_session_t *l_session = a_arg;
assert(l_session->chain == a_chain);
......@@ -448,7 +434,7 @@ static void s_new_atom_notifier(void *a_arg, dap_chain_t *a_chain, dap_chain_cel
.collecting_addr = PVT(l_session->esbocs)->collecting_addr,
.cell_id = a_id
};
dap_chain_block_cache_t *l_block_cache = dap_chain_block_cache_get_by_hash(DAP_CHAIN_CS_BLOCKS(a_chain), &l_last_block_hash);
dap_chain_block_cache_t *l_block_cache = dap_chain_block_cache_get_by_hash(DAP_CHAIN_CS_BLOCKS(a_chain), a_atom_hash);
dap_chain_esbocs_add_block_collect(l_block_cache, &l_block_collect_params, DAP_CHAIN_BLOCK_COLLECT_BOTH);
}
......@@ -1700,14 +1686,8 @@ static bool s_session_candidate_to_chain(dap_chain_esbocs_session_t *a_session,
const char *l_candidate_hash_str = dap_chain_hash_fast_to_str_static(a_candidate_hash);
switch (l_res) {
case ATOM_ACCEPT:
// block save to chain
if (dap_chain_atom_save(a_session->chain->cells, (uint8_t *)a_candidate, a_candidate_size, a_candidate_hash) < 0)
log_it(L_ERROR, "Can't save atom %s to the file", l_candidate_hash_str);
else
{
log_it(L_INFO, "block %s added in chain successfully", l_candidate_hash_str);
res = true;
}
log_it(L_INFO, "block %s added in chain successfully", l_candidate_hash_str);
res = true;
break;
case ATOM_MOVE_TO_THRESHOLD:
log_it(L_INFO, "Thresholded atom with hash %s", l_candidate_hash_str);
......
......@@ -322,10 +322,7 @@ typedef struct dap_ledger_private {
bool load_end;
// Ledger flags
bool check_ds;
bool check_cells_ds;
bool check_token_emission;
bool cached;
bool check_ds, check_cells_ds, check_token_emission, cached, mapped;
dap_chain_cell_id_t local_cell_id;
......@@ -584,6 +581,7 @@ static dap_ledger_t * dap_ledger_handle_new(void)
(dap_timer_callback_t)s_threshold_txs_free, l_ledger);
l_ledger_pvt->threshold_emissions_free_timer = dap_interval_timer_create(s_threshold_free_timer_tick,
(dap_timer_callback_t) s_threshold_emission_free, l_ledger);
l_ledger_pvt->mapped = dap_config_get_item_bool_default(g_config, "ledger", "mapped", true);
return l_ledger;
}
......@@ -1317,12 +1315,12 @@ int dap_ledger_token_add(dap_ledger_t *a_ledger, dap_chain_datum_token_t *a_toke
DAP_DELETE(l_token_item->datum_token);
} else {
log_it(L_ERROR, "Token with ticker '%s' update check failed", l_token->ticker);
DAP_DEL_Z(l_token);
DAP_DELETE(l_token);
return -2;
}
} else if (l_token->type == DAP_CHAIN_DATUM_TOKEN_TYPE_UPDATE) {
log_it(L_WARNING, "Token with ticker '%s' does not yet exist, declare it first", l_token->ticker);
DAP_DEL_Z(l_token);
DAP_DELETE(l_token);
return -6;
}
......@@ -1331,12 +1329,12 @@ int dap_ledger_token_add(dap_ledger_t *a_ledger, dap_chain_datum_token_t *a_toke
dap_sign_t **l_signs = dap_chain_datum_token_signs_parse(l_token, a_token_size, &l_auth_signs_total, &l_auth_signs_valid);
if (!l_signs || !l_auth_signs_total) {
log_it(L_ERROR, "No auth signs in token '%s' datum!", l_token->ticker);
DAP_DEL_Z(l_token);
DAP_DELETE(l_token);
return -7;
}
l_token_item = DAP_NEW_Z(dap_ledger_token_item_t);
if ( !l_token_item ) {
DAP_DEL_Z(l_token);
DAP_DELETE(l_token);
log_it(L_CRITICAL, "%s", g_error_memory_alloc);
return -8;
}
......@@ -1355,15 +1353,13 @@ int dap_ledger_token_add(dap_ledger_t *a_ledger, dap_chain_datum_token_t *a_toke
.description_token_size = 0
};
if ( !l_token_item->auth_pkeys ) {
if (l_token)
DAP_DELETE(l_token);
DAP_DELETE(l_token);
DAP_DELETE(l_token_item);
log_it(L_CRITICAL, "%s", g_error_memory_alloc);
return -6;
};
if ( !l_token_item->auth_pkeys ) {
if (l_token)
DAP_DELETE(l_token);
DAP_DELETE(l_token);
DAP_DEL_Z(l_token_item->auth_pkeys);
DAP_DELETE(l_token_item);
log_it(L_CRITICAL, "%s", g_error_memory_alloc);
......@@ -2409,6 +2405,7 @@ static void s_threshold_txs_proc( dap_ledger_t *a_ledger)
if (l_res != DAP_CHAIN_CS_VERIFY_CODE_TX_NO_EMISSION &&
l_res != DAP_CHAIN_CS_VERIFY_CODE_TX_NO_PREVIOUS) {
HASH_DEL(l_ledger_pvt->threshold_txs, l_tx_item);
if ( !l_ledger_pvt->mapped )
DAP_DELETE(l_tx_item->tx);
DAP_DELETE(l_tx_item);
l_success = true;
......@@ -2433,7 +2430,8 @@ static void s_threshold_txs_free(dap_ledger_t *a_ledger){
HASH_DEL(l_pvt->threshold_txs, l_current);
char l_tx_hash_str[DAP_CHAIN_HASH_FAST_STR_SIZE];
dap_chain_hash_fast_to_str(&l_current->tx_hash_fast, l_tx_hash_str, sizeof(l_tx_hash_str));
DAP_DELETE(l_current->tx);
if ( !l_pvt->mapped )
DAP_DELETE(l_current->tx);
DAP_DELETE(l_current);
log_it(L_NOTICE, "Removed transaction %s form threshold ledger", l_tx_hash_str);
}
......@@ -4832,7 +4830,7 @@ int dap_ledger_tx_add(dap_ledger_t *a_ledger, dap_chain_datum_tx_t *a_tx, dap_ha
return -1;
}
l_item_tmp->tx_hash_fast = *a_tx_hash;
l_item_tmp->tx = DAP_DUP_SIZE(a_tx, dap_chain_datum_tx_get_size(a_tx));
l_item_tmp->tx = l_ledger_pvt->mapped ? a_tx : DAP_DUP_SIZE(a_tx, dap_chain_datum_tx_get_size(a_tx));
if ( !l_item_tmp->tx ) {
DAP_DELETE(l_item_tmp);
log_it(L_CRITICAL, "%s", g_error_memory_alloc);
......@@ -5100,7 +5098,7 @@ int dap_ledger_tx_add(dap_ledger_t *a_ledger, dap_chain_datum_tx_t *a_tx, dap_ha
}
l_tx_item->tx_hash_fast = *a_tx_hash;
size_t l_tx_size = dap_chain_datum_tx_get_size(a_tx);
l_tx_item->tx = DAP_DUP_SIZE(a_tx, l_tx_size);
l_tx_item->tx = l_ledger_pvt->mapped ? a_tx : DAP_DUP_SIZE(a_tx, l_tx_size);
l_tx_item->cache_data.ts_created = dap_time_now(); // Time of transasction added to ledger
int l_outs_count = 0;
dap_list_t *l_list_tmp = dap_chain_datum_tx_items_get(a_tx, TX_ITEM_TYPE_OUT_ALL, &l_outs_count);
......@@ -5306,7 +5304,8 @@ void dap_ledger_purge(dap_ledger_t *a_ledger, bool a_preserve_db)
/* Delete threshold transactions */
HASH_ITER(hh, l_ledger_pvt->threshold_txs, l_item_current, l_item_tmp) {
HASH_DEL(l_ledger_pvt->threshold_txs, l_item_current);
DAP_DELETE(l_item_current->tx);
if (!l_ledger_pvt->mapped)
DAP_DELETE(l_item_current->tx);
DAP_DEL_Z(l_item_current);
}
......
......@@ -889,7 +889,6 @@ void dap_chain_net_purge(dap_chain_net_t *l_net)
{
dap_ledger_purge(l_net->pub.ledger, false);
dap_chain_net_srv_stake_purge(l_net);
dap_chain_net_decree_purge(l_net);
dap_chain_t *l_chain = NULL;
DL_FOREACH(l_net->pub.chains, l_chain) {
if (l_chain->callback_purge)
......@@ -898,6 +897,7 @@ void dap_chain_net_purge(dap_chain_net_t *l_net)
dap_chain_esbocs_set_min_validators_count(l_chain, 0);
l_net->pub.fee_value = uint256_0;
l_net->pub.fee_addr = c_dap_chain_addr_blank;
dap_chain_net_decree_purge(l_net);
dap_chain_load_all(l_chain);
}
DL_FOREACH(l_net->pub.chains, l_chain) {
......
......@@ -50,10 +50,9 @@ static int s_anchor_verify(dap_chain_net_t *a_net, dap_chain_datum_anchor_t *a_a
return -121;
}
int ret_val = 0;
dap_chain_datum_anchor_t *l_anchor = a_anchor;
size_t l_signs_size = l_anchor->header.signs_size;
size_t l_signs_size = a_anchor->header.signs_size;
//multiple signs reading from datum
dap_sign_t *l_signs_block = (dap_sign_t *)((byte_t*)l_anchor->data_n_sign + l_anchor->header.data_size);
dap_sign_t *l_signs_block = (dap_sign_t *)((byte_t*)a_anchor->data_n_sign + a_anchor->header.data_size);
if (!l_signs_size || !l_signs_block) {
log_it(L_WARNING, "Anchor data sign not found");
......@@ -73,26 +72,26 @@ static int s_anchor_verify(dap_chain_net_t *a_net, dap_chain_datum_anchor_t *a_a
return -106;
}
bool l_sign_authorized = false;
size_t l_signs_size_original = l_anchor->header.signs_size;
l_anchor->header.signs_size = 0;
size_t l_signs_size_original = a_anchor->header.signs_size;
a_anchor->header.signs_size = 0;
for (size_t i = 0; i < l_num_of_unique_signs; i++) {
for (dap_list_t *it = a_net->pub.decree->pkeys; it; it = it->next) {
if (dap_pkey_compare_with_sign(it->data, l_unique_signs[i])) {
// TODO make signs verification in s_concate_all_signs_in_array to correctly header.signs_size calculation
size_t l_verify_data_size = l_anchor->header.data_size + sizeof(dap_chain_datum_anchor_t);
if (dap_sign_verify_all(l_unique_signs[i], l_signs_size_original, l_anchor, l_verify_data_size))
size_t l_verify_data_size = a_anchor->header.data_size + sizeof(dap_chain_datum_anchor_t);
if (dap_sign_verify_all(l_unique_signs[i], l_signs_size_original, a_anchor, l_verify_data_size))
continue;
l_sign_authorized = true;
break;
}
}
l_anchor->header.signs_size += dap_sign_get_size(l_unique_signs[i]);
a_anchor->header.signs_size += dap_sign_get_size(l_unique_signs[i]);
if (l_sign_authorized)
break;
}
DAP_DELETE(l_signs_arr);
DAP_DELETE(l_unique_signs);
l_anchor->header.signs_size = l_signs_size_original;
a_anchor->header.signs_size = l_signs_size_original;
if (!l_sign_authorized) {
log_it(L_WARNING, "Anchor signs verify failed");
......@@ -101,7 +100,7 @@ static int s_anchor_verify(dap_chain_net_t *a_net, dap_chain_datum_anchor_t *a_a
dap_hash_fast_t l_decree_hash = {};
dap_chain_datum_decree_t *l_decree = NULL;
if ((ret_val = dap_chain_datum_anchor_get_hash_from_data(l_anchor, &l_decree_hash)) != 0) {
if ((ret_val = dap_chain_datum_anchor_get_hash_from_data(a_anchor, &l_decree_hash)) != 0) {
log_it(L_WARNING, "Can't get hash from anchor data");
return -106;
}
......
......@@ -105,7 +105,8 @@ int dap_chain_net_decree_deinit(dap_chain_net_t *a_net)
struct decree_hh *l_decree_hh, *l_tmp;
HASH_ITER(hh, s_decree_hh, l_decree_hh, l_tmp) {
HASH_DEL(s_decree_hh, l_decree_hh);
DAP_DELETE(l_decree_hh->decree);
if ( !dap_chain_find_by_id(l_decree_hh->decree->header.common_decree_params.net_id, l_decree_hh->decree->header.common_decree_params.chain_id)->is_mapped )
DAP_DELETE(l_decree_hh->decree);
DAP_DELETE(l_decree_hh);
}
return 0;
......@@ -139,12 +140,10 @@ static int s_decree_verify(dap_chain_net_t *a_net, dap_chain_datum_decree_t *a_d
return -123;
}
dap_chain_datum_decree_t *l_decree = a_decree;
// Get pkeys sign from decree datum
size_t l_signs_size = 0;
//multiple signs reading from datum
dap_sign_t *l_signs_block = dap_chain_datum_decree_get_signs(l_decree, &l_signs_size);
dap_sign_t *l_signs_block = dap_chain_datum_decree_get_signs(a_decree, &l_signs_size);
if (!l_signs_size || !l_signs_block)
{
log_it(L_WARNING, "Decree data sign not found");
......@@ -168,14 +167,14 @@ static int s_decree_verify(dap_chain_net_t *a_net, dap_chain_datum_decree_t *a_d
// Verify all keys and its signatures
uint16_t l_signs_size_for_current_sign = 0, l_signs_verify_counter = 0;
l_decree->header.signs_size = 0;
size_t l_verify_data_size = l_decree->header.data_size + sizeof(dap_chain_datum_decree_t);
a_decree->header.signs_size = 0;
size_t l_verify_data_size = a_decree->header.data_size + sizeof(dap_chain_datum_decree_t);
for (size_t i = 0; i < l_num_of_unique_signs; i++) {
size_t l_sign_max_size = dap_sign_get_size(l_unique_signs[i]);
if (s_verify_pkey(l_unique_signs[i], a_net)) {
// 3. verify sign
if(!dap_sign_verify_all(l_unique_signs[i], l_sign_max_size, l_decree, l_verify_data_size))
if(!dap_sign_verify_all(l_unique_signs[i], l_sign_max_size, a_decree, l_verify_data_size))
l_signs_verify_counter++;
} else {
dap_hash_fast_t l_sign_pkey_hash = {0};
......@@ -188,10 +187,10 @@ static int s_decree_verify(dap_chain_net_t *a_net, dap_chain_datum_decree_t *a_d
}
// Each sign change the sign_size field by adding its size after signing. So we need to change this field in header for each sign.
l_signs_size_for_current_sign += l_sign_max_size;
l_decree->header.signs_size = l_signs_size_for_current_sign;
a_decree->header.signs_size = l_signs_size_for_current_sign;
}
l_decree->header.signs_size = l_signs_size;
a_decree->header.signs_size = l_signs_size;
// DAP_DELETE(l_signs_arr);
DAP_DELETE(l_unique_signs);
......@@ -274,9 +273,8 @@ int dap_chain_net_decree_apply(dap_hash_fast_t *a_decree_hash, dap_chain_datum_d
log_it(L_WARNING, "Decree with hash %s is already present", dap_hash_fast_to_str_static(a_decree_hash));
return -123;
}
l_decree_hh->decree = DAP_DUP_SIZE(a_decree, dap_chain_datum_decree_get_size(a_decree));
if (a_decree->header.common_decree_params.chain_id.uint64 != a_chain->id.uint64 &&
!l_decree_hh->wait_for_apply)
l_decree_hh->decree = a_chain->is_mapped ? a_decree : DAP_DUP_SIZE(a_decree, dap_chain_datum_decree_get_size(a_decree));
if (a_decree->header.common_decree_params.chain_id.uint64 != a_chain->id.uint64 && !l_decree_hh->wait_for_apply)
// Apply it with corresponding anchor
return ret_val;
}
......
......@@ -54,7 +54,7 @@ void dap_chain_block_cache_deinit()
*/
dap_chain_block_cache_t *dap_chain_block_cache_new(dap_hash_fast_t *a_block_hash, dap_chain_block_t *a_block,
size_t a_block_size, uint64_t a_block_number)
size_t a_block_size, uint64_t a_block_number, bool a_copy_block)
{
if (! a_block)
return NULL;
......@@ -64,7 +64,7 @@ dap_chain_block_cache_t *dap_chain_block_cache_new(dap_hash_fast_t *a_block_hash
log_it(L_CRITICAL, "%s", g_error_memory_alloc);
return NULL;
}
l_block_cache->block = DAP_DUP_SIZE(a_block, a_block_size);
l_block_cache->block = a_copy_block ? DAP_DUP_SIZE(a_block, a_block_size) : a_block;
if (!l_block_cache->block) {
log_it(L_CRITICAL, "%s", g_error_memory_alloc);
return NULL;
......@@ -75,10 +75,11 @@ dap_chain_block_cache_t *dap_chain_block_cache_new(dap_hash_fast_t *a_block_hash
l_block_cache->sign_count = dap_chain_block_get_signs_count(a_block, a_block_size);
if (dap_chain_block_cache_update(l_block_cache, a_block_hash)) {
log_it(L_WARNING, "Block cache can't be created, possible cause corrupted block inside");
if (a_copy_block)
DAP_DELETE(l_block_cache->block);
DAP_DELETE(l_block_cache);
return NULL;
}
//log_it(L_DEBUG,"Block cache created");
return l_block_cache;
}
......
......@@ -49,7 +49,7 @@ dap_chain_block_chunks_t * dap_chain_block_chunks_create(dap_chain_cs_blocks_t *
for(size_t n=0; n< l_objs_count; n++){
dap_hash_fast_t l_block_hash;
dap_chain_hash_fast_from_str(l_objs[n].key, &l_block_hash);
dap_chain_block_cache_t *l_block_cache = dap_chain_block_cache_new(&l_block_hash, (dap_chain_block_t *)l_objs[n].value, l_objs[n].value_len, n + 1);
dap_chain_block_cache_t *l_block_cache = dap_chain_block_cache_new(&l_block_hash, (dap_chain_block_t *)l_objs[n].value, l_objs[n].value_len, n + 1, true);
dap_chain_block_chunks_add(l_ret, l_block_cache );
}
dap_global_db_objs_delete(l_objs,l_objs_count);
......
......@@ -1522,15 +1522,35 @@ static dap_chain_atom_verify_res_t s_callback_atom_add(dap_chain_t * a_chain, da
}
dap_chain_atom_verify_res_t ret = s_callback_atom_verify(a_chain, a_atom, a_atom_size);
switch (ret) {
case ATOM_ACCEPT:
l_block_cache = dap_chain_block_cache_new(&l_block_hash, l_block, l_block_size, PVT(l_blocks)->blocks_count + 1);
if (!l_block_cache)
break;
case ATOM_ACCEPT: {
dap_chain_cell_t *l_cell = dap_chain_cell_find_by_id(a_chain, l_block->hdr.cell_id);
if ( !dap_chain_net_get_load_mode( dap_chain_net_by_id(a_chain->net_id)) ) {
if ( (ret = dap_chain_atom_save(l_cell, a_atom, a_atom_size, &l_block_hash)) < 0 ) {
log_it(L_ERROR, "Can't save atom to file, code %d", ret);
pthread_rwlock_unlock(&PVT(l_blocks)->rwlock);
return ATOM_REJECT;
} else if (a_chain->is_mapped) {
l_block = (dap_chain_block_t*)( l_cell->map_pos += sizeof(uint64_t) ); // Switching to mapped area
l_cell->map_pos += a_atom_size;
}
ret = ATOM_PASS;
}
if ( !(l_block_cache = dap_chain_block_cache_new(&l_block_hash, l_block, l_block_size,
PVT(l_blocks)->blocks_count + 1, !a_chain->is_mapped)) )
{
log_it(L_DEBUG, "... corrupted block");
pthread_rwlock_unlock(&PVT(l_blocks)->rwlock);
return ATOM_REJECT;
}
debug_if(s_debug_more, L_DEBUG, "... new block %s", l_block_cache->block_hash_str);
HASH_ADD(hh, PVT(l_blocks)->blocks, block_hash, sizeof(l_block_cache->block_hash), l_block_cache);
++PVT(l_blocks)->blocks_count;
debug_if(s_debug_more, L_DEBUG, "Verified atom %p: ACCEPTED", a_atom);
s_add_atom_datums(l_blocks, l_block_cache);
dap_chain_atom_notify(l_cell, l_block, a_atom_size);
dap_chain_atom_add_from_threshold(a_chain);
break;
}
case ATOM_MOVE_TO_THRESHOLD:
// TODO: reimplement and enable threshold for blocks
/* {
......@@ -1547,8 +1567,6 @@ static dap_chain_atom_verify_res_t s_callback_atom_add(dap_chain_t * a_chain, da
break;
}
pthread_rwlock_unlock(&PVT(l_blocks)->rwlock);
if (ret == ATOM_ACCEPT)
s_add_atom_datums(l_blocks, l_block_cache);
return ret;
}
......@@ -1779,7 +1797,7 @@ static dap_chain_atom_ptr_t s_callback_atom_iter_get(dap_chain_atom_iter_t *a_at
a_atom_iter->cur_item = l_blocks_pvt->blocks;
break;
case DAP_CHAIN_ITER_OP_LAST:
HASH_LAST(l_blocks_pvt->blocks, a_atom_iter->cur_item);
a_atom_iter->cur_item = HASH_LAST(l_blocks_pvt->blocks);
break;
case DAP_CHAIN_ITER_OP_NEXT:
if (a_atom_iter->cur_item)
......
......@@ -73,7 +73,7 @@ void dap_chain_block_cache_deinit();
dap_chain_block_cache_t *dap_chain_block_cache_new(dap_hash_fast_t *a_block_hash, dap_chain_block_t *a_block,
size_t a_block_size, uint64_t a_block_number);
size_t a_block_size, uint64_t a_block_number, bool a_copy_block);
dap_chain_block_cache_t *dap_chain_block_cache_dup(dap_chain_block_cache_t *a_block);
int dap_chain_block_cache_update(dap_chain_block_cache_t *a_block_cache, dap_hash_fast_t *a_block_hash);
void dap_chain_block_cache_delete(dap_chain_block_cache_t *a_block_cache);
......
......@@ -65,6 +65,7 @@ typedef struct dap_chain_cs_dag_event_item {
size_t event_size;
uint64_t event_number;
int ret_code;
char *mapped_region;
UT_hash_handle hh, hh_select, hh_datums;
} dap_chain_cs_dag_event_item_t;
......@@ -304,7 +305,8 @@ static void s_dap_chain_cs_dag_threshold_free(dap_chain_cs_dag_t *a_dag) {
l_el->hash = l_current->hash;
HASH_ADD(hh, l_pvt->removed_events_from_treshold, hash, sizeof(dap_chain_hash_fast_t), l_el);
char *l_hash_dag = dap_hash_fast_to_str_new(&l_current->hash);
DAP_DELETE(l_current->event);
if (!a_dag->chain->is_mapped && !l_current->mapped_region)
DAP_DELETE(l_current->event);
HASH_DEL(l_pvt->events_treshold, l_current);
DAP_DELETE(l_current);
log_it(L_NOTICE, "Removed DAG event with %s hash from trashold.", l_hash_dag);
......@@ -315,7 +317,8 @@ static void s_dap_chain_cs_dag_threshold_free(dap_chain_cs_dag_t *a_dag) {
HASH_ITER(hh, l_pvt->events_treshold_conflicted, l_current, l_tmp) {
if (l_current->ts_added < l_time_cut_off) {
char *l_hash_dag = dap_hash_fast_to_str_new(&l_current->hash);
DAP_DELETE(l_current->event);
if (!a_dag->chain->is_mapped && !l_current->mapped_region)
DAP_DELETE(l_current->event);
HASH_DEL(l_pvt->events_treshold_conflicted, l_current);
DAP_DELETE(l_current);
log_it(L_NOTICE, "Removed DAG event with %s hash from trashold.", l_hash_dag);
......@@ -334,18 +337,26 @@ static void s_dap_chain_cs_dag_purge(dap_chain_t *a_chain)
// Clang bug at this, l_event_current should change at every loop cycle
HASH_ITER(hh, l_dag_pvt->events, l_event_current, l_event_tmp) {
HASH_DEL(l_dag_pvt->events, l_event_current);
if (!a_chain->is_mapped && !l_event_current->mapped_region)
DAP_DELETE(l_event_current->event);
DAP_DELETE(l_event_current);
}
HASH_ITER(hh, l_dag_pvt->events_lasts_unlinked, l_event_current, l_event_tmp) {
HASH_DEL(l_dag_pvt->events_lasts_unlinked, l_event_current);
if (!a_chain->is_mapped && !l_event_current->mapped_region)
DAP_DELETE(l_event_current->event);
DAP_DELETE(l_event_current);
}
HASH_ITER(hh, l_dag_pvt->events_treshold, l_event_current, l_event_tmp) {
HASH_DEL(l_dag_pvt->events_treshold, l_event_current);
if (!a_chain->is_mapped && !l_event_current->mapped_region)
DAP_DELETE(l_event_current->event);
DAP_DELETE(l_event_current);
}
HASH_ITER(hh, l_dag_pvt->events_treshold_conflicted, l_event_current, l_event_tmp) {
HASH_DEL(l_dag_pvt->events_treshold_conflicted, l_event_current);
if (!a_chain->is_mapped && !l_event_current->mapped_region)
DAP_DELETE(l_event_current->event);
DAP_DELETE(l_event_current);
}
pthread_mutex_unlock(&l_dag_pvt->events_mutex);
......@@ -442,7 +453,7 @@ static dap_chain_atom_verify_res_t s_chain_callback_atom_add(dap_chain_t * a_cha
dap_chain_cs_dag_event_calc_hash(l_event, a_atom_size, &l_event_hash);
if (s_debug_more) {
char l_event_hash_str[DAP_CHAIN_HASH_FAST_STR_SIZE] = { '\0' };
dap_chain_hash_fast_to_str(&l_event_item->hash, l_event_hash_str, sizeof(l_event_hash_str));
dap_chain_hash_fast_to_str(&l_event_hash, l_event_hash_str, sizeof(l_event_hash_str));
log_it(L_DEBUG, "Processing event: %s ... (size %zd)", l_event_hash_str,a_atom_size);
}
pthread_mutex_lock(&PVT(l_dag)->events_mutex);
......@@ -455,7 +466,7 @@ static dap_chain_atom_verify_res_t s_chain_callback_atom_add(dap_chain_t * a_cha
case ATOM_ACCEPT:
ret = s_chain_callback_atom_verify(a_chain, a_atom, a_atom_size);
if (ret == ATOM_MOVE_TO_THRESHOLD) {
if (!s_threshold_enabled && !dap_chain_net_get_load_mode(dap_chain_net_by_id(a_chain->net_id)))
if (!s_threshold_enabled /*&& !dap_chain_net_get_load_mode(dap_chain_net_by_id(a_chain->net_id))*/)
ret = ATOM_REJECT;
}
debug_if(s_debug_more, L_DEBUG, "Verified atom %p: %s", a_atom, dap_chain_atom_verify_res_str[ret]);
......@@ -468,28 +479,26 @@ static dap_chain_atom_verify_res_t s_chain_callback_atom_add(dap_chain_t * a_cha
break;
}
l_event_item = DAP_NEW_Z(dap_chain_cs_dag_event_item_t);
if (!l_event_item) {
if ( !(l_event_item = DAP_NEW_Z(dap_chain_cs_dag_event_item_t)) ) {
log_it(L_CRITICAL, "%s", g_error_memory_alloc);
pthread_mutex_unlock(&PVT(l_dag)->events_mutex);
return ATOM_REJECT;
}
l_event_item->event = DAP_DUP_SIZE(a_atom, a_atom_size);
if (!l_event_item->event) {
log_it(L_CRITICAL, "%s", g_error_memory_alloc);
pthread_mutex_unlock(&PVT(l_dag)->events_mutex);
return ATOM_REJECT;
}
l_event_item->event_size = a_atom_size;
l_event_item->ts_added = dap_time_now();
l_event_item->hash = l_event_hash;
*l_event_item = (dap_chain_cs_dag_event_item_t) {
.hash = l_event_hash,
.ts_added = dap_time_now(),
.event = a_chain->is_mapped ? l_event : DAP_DUP_SIZE(l_event, a_atom_size),
.event_size = a_atom_size
};
switch (ret) {
case ATOM_MOVE_TO_THRESHOLD: {
dap_chain_cs_dag_blocked_t *el = NULL;
HASH_FIND(hh, PVT(l_dag)->removed_events_from_treshold, &l_event_item->hash, sizeof(dap_chain_hash_fast_t), el);
HASH_FIND(hh, PVT(l_dag)->removed_events_from_treshold, &l_event_hash, sizeof(dap_chain_hash_fast_t), el);
if (!el) {
HASH_ADD(hh, PVT(l_dag)->events_treshold, hash, sizeof(l_event_item->hash), l_event_item);
if ( a_chain->is_mapped && dap_chain_net_get_load_mode(dap_chain_net_by_id(a_chain->net_id)) )
l_event_item->mapped_region = (char*)l_event;
HASH_ADD(hh, PVT(l_dag)->events_treshold, hash, sizeof(l_event_hash), l_event_item);
debug_if(s_debug_more, L_DEBUG, "... added to threshold");
} else {
ret = ATOM_REJECT;
......@@ -498,6 +507,17 @@ static dap_chain_atom_verify_res_t s_chain_callback_atom_add(dap_chain_t * a_cha
break;
}
case ATOM_ACCEPT: {
dap_chain_cell_t *l_cell = dap_chain_cell_find_by_id(a_chain, l_event->header.cell_id);
if ( !dap_chain_net_get_load_mode( dap_chain_net_by_id(a_chain->net_id)) ) {
if ( dap_chain_atom_save(l_cell, a_atom, a_atom_size, &l_event_hash) < 0 ) {
log_it(L_ERROR, "Can't save atom to file");
ret = ATOM_REJECT;
break;
} else if (a_chain->is_mapped) {
l_event_item->event = (dap_chain_cs_dag_event_t*)( l_cell->map_pos += sizeof(uint64_t) );
l_cell->map_pos += a_atom_size;
}
}
int l_consensus_check = s_dap_chain_add_atom_to_events_table(l_dag, l_event_item);
switch (l_consensus_check) {
case 0:
......@@ -517,8 +537,7 @@ static dap_chain_atom_verify_res_t s_chain_callback_atom_add(dap_chain_t * a_cha
debug_if(s_debug_more, L_WARNING, "... added with ledger code %d", l_consensus_check);
break;
}
dap_chain_cs_dag_event_item_t *l_tail;
HASH_LAST(PVT(l_dag)->events, l_tail);
dap_chain_cs_dag_event_item_t *l_tail = HASH_LAST(PVT(l_dag)->events);
if (l_tail && l_tail->event->header.ts_created > l_event->header.ts_created) {
DAP_CHAIN_PVT(a_chain)->need_reorder = true;
HASH_ADD_INORDER(hh, PVT(l_dag)->events, hash, sizeof(l_event_item->hash), l_event_item, s_sort_event_item);
......@@ -528,13 +547,16 @@ static dap_chain_atom_verify_res_t s_chain_callback_atom_add(dap_chain_t * a_cha
} else
HASH_ADD(hh, PVT(l_dag)->events, hash, sizeof(l_event_item->hash), l_event_item);
s_dag_events_lasts_process_new_last_event(l_dag, l_event_item);
dap_chain_atom_notify(l_cell, l_event_item->event, l_event_item->event_size);
dap_chain_atom_add_from_threshold(a_chain);
} break;
default:
break;
}
pthread_mutex_unlock(&PVT(l_dag)->events_mutex);
if (ret == ATOM_REJECT) { // Neither added, nor freed
DAP_DELETE(l_event_item->event);
if (!a_chain->is_mapped)
DAP_DELETE(l_event_item->event);
DAP_DELETE(l_event_item);
}
return ret;
......@@ -673,12 +695,12 @@ static bool s_chain_callback_datums_pool_proc(dap_chain_t *a_chain, dap_chain_da
bool l_res = false;
if (l_dag->is_add_directly) {
dap_chain_atom_verify_res_t l_verify_res = s_chain_callback_atom_add(a_chain, l_event, l_event_size);
if (l_verify_res == ATOM_ACCEPT)
l_res = dap_chain_atom_save(a_chain->cells, (uint8_t *)l_event, l_event_size, &l_event_hash) > 0;
else
log_it(L_ERROR, "Can't add new event to the file, atom verification result %d", l_verify_res);
DAP_DELETE(l_event);
return l_res;
if (l_verify_res != ATOM_ACCEPT) {
log_it(L_ERROR, "Can't add new event to the file, atom verification result %d", l_verify_res);
return false;
} else
return true;
}
dap_global_db_set_sync(l_dag->gdb_group_events_round_new, DAG_ROUND_CURRENT_KEY,
......@@ -941,17 +963,30 @@ dap_chain_cs_dag_event_item_t* s_dag_proc_treshold(dap_chain_cs_dag_t * a_dag)
if (ret == DAP_THRESHOLD_OK) {
debug_if(s_debug_more, L_DEBUG, "Processing event (threshold): %s...",
dap_chain_hash_fast_to_str_static(&l_event_item->hash));
dap_chain_cell_t *l_cell = dap_chain_cell_find_by_id(a_dag->chain, l_event_item->event->header.cell_id);
if ( !l_event_item->mapped_region ) {
if ( dap_chain_atom_save(l_cell, (const byte_t*)l_event_item->event, l_event_item->event_size, &l_event_item->hash) < 0 ) {
log_it(L_CRITICAL, "Can't move atom from threshold to file");
res = false;
break;
} else if (a_dag->chain->is_mapped) {
l_event_item->event = (dap_chain_cs_dag_event_t*)( l_cell->map_pos += sizeof(uint64_t) );
l_cell->map_pos += l_event_item->event_size;
}
}
int l_add_res = s_dap_chain_add_atom_to_events_table(a_dag, l_event_item);
HASH_DEL(PVT(a_dag)->events_treshold, l_event_item);
if (!l_add_res) {
HASH_ADD(hh, PVT(a_dag)->events, hash, sizeof(l_event_item->hash), l_event_item);
s_dag_events_lasts_process_new_last_event(a_dag, l_event_item);
debug_if(s_debug_more, L_INFO, "... moved from treshold to main chains");
debug_if(s_debug_more, L_INFO, "... moved from threshold to chain");
dap_chain_atom_notify(l_cell, l_event_item->event, l_event_item->event_size);
res = true;
} else {
// TODO clear other threshold items linked with this one
debug_if(s_debug_more, L_WARNING, "... rejected with ledger code %d", l_add_res);
DAP_DELETE(l_event_item->event);
if (!l_event_item->mapped_region)
DAP_DELETE(l_event_item->event);
DAP_DELETE(l_event_item);
}
break;
......@@ -1101,7 +1136,7 @@ static dap_chain_atom_ptr_t s_chain_callback_atom_iter_get(dap_chain_atom_iter_t
a_atom_iter->cur_item = l_dag_pvt->events;
break;
case DAP_CHAIN_ITER_OP_LAST:
HASH_LAST(l_dag_pvt->events, a_atom_iter->cur_item);
a_atom_iter->cur_item = HASH_LAST(l_dag_pvt->events);
break;
case DAP_CHAIN_ITER_OP_NEXT:
if (a_atom_iter->cur_item)
......@@ -2027,8 +2062,7 @@ static dap_list_t *s_callback_get_atoms(dap_chain_t *a_chain, size_t a_count, si
size_t l_counter = 0;
size_t l_end = l_offset + a_count;
dap_chain_cs_dag_event_item_t *l_ptr;
HASH_LAST(l_dag_pvt->events, l_ptr);
dap_chain_cs_dag_event_item_t *l_ptr = HASH_LAST(l_dag_pvt->events);
for (dap_chain_cs_dag_event_item_t *ptr = l_ptr; ptr != NULL && l_counter < l_end; ptr = ptr->hh.prev){
if (l_counter >= l_offset){
dap_chain_cs_dag_event_t *l_event = ptr->event;
......