From bd2ade6400d1579e4a963d8235be1ffb41c2aff9 Mon Sep 17 00:00:00 2001 From: "papizh.konstantin" <papizh.konstantin@demlabs.net> Date: Wed, 15 Sep 2021 11:48:55 +0000 Subject: [PATCH] Feature 4515 2 --- modules/chain/dap_chain.c | 60 ++-- modules/chain/dap_chain_cell.c | 303 ++++++++++---------- modules/chain/dap_chain_ledger.c | 7 +- modules/chain/include/dap_chain.h | 2 +- modules/chain/include/dap_chain_cell.h | 4 +- modules/channel/chain/dap_stream_ch_chain.c | 207 +++++++------ modules/net/dap_chain_net.c | 2 +- modules/net/dap_chain_node_cli_cmd.c | 9 +- modules/net/dap_chain_node_cli_cmd_tx.c | 10 +- modules/type/blocks/dap_chain_cs_blocks.c | 2 +- modules/type/dag/dap_chain_cs_dag.c | 230 ++++++++------- 11 files changed, 432 insertions(+), 404 deletions(-) diff --git a/modules/chain/dap_chain.c b/modules/chain/dap_chain.c index 54f48db078..33c758c2a4 100644 --- a/modules/chain/dap_chain.c +++ b/modules/chain/dap_chain.c @@ -146,6 +146,7 @@ dap_chain_t * dap_chain_create(dap_ledger_t* a_ledger, const char * a_chain_net_ l_ret->net_name = strdup (a_chain_net_name); l_ret->ledger = a_ledger; pthread_rwlock_init(&l_ret->atoms_rwlock,NULL); + pthread_rwlock_init(&l_ret->cell_rwlock,NULL); dap_chain_item_t * l_ret_item = DAP_NEW_Z(dap_chain_item_t); l_ret_item->chain = l_ret; @@ -194,6 +195,7 @@ void dap_chain_delete(dap_chain_t * a_chain) a_chain->autoproc_datum_types_count = 0; DAP_DELETE(a_chain->autoproc_datum_types); pthread_rwlock_destroy(&a_chain->atoms_rwlock); + pthread_rwlock_destroy(&a_chain->cell_rwlock); pthread_rwlock_unlock(&s_chain_items_rwlock); } @@ -207,7 +209,7 @@ void dap_chain_delete(dap_chain_t * a_chain) dap_chain_atom_ptr_t dap_chain_get_atom_by_hash(dap_chain_t * a_chain, dap_chain_hash_fast_t * a_atom_hash, size_t * a_atom_size) { dap_chain_atom_iter_t * l_iter = a_chain->callback_atom_iter_create(a_chain); - dap_chain_atom_ptr_t *l_ret = a_chain->callback_atom_find_by_hash(l_iter, a_atom_hash, a_atom_size); + dap_chain_atom_ptr_t l_ret = a_chain->callback_atom_find_by_hash(l_iter, a_atom_hash, a_atom_size); a_chain->callback_atom_iter_delete(l_iter); return l_ret; } @@ -427,15 +429,14 @@ bool dap_chain_has_file_store(dap_chain_t * a_chain) */ int dap_chain_save_all (dap_chain_t * l_chain) { - int ret = -1; - pthread_rwlock_rdlock(&l_chain->atoms_rwlock); + int ret = 0; + pthread_rwlock_rdlock(&l_chain->cell_rwlock); dap_chain_cell_t * l_item, *l_item_tmp = NULL; HASH_ITER(hh,l_chain->cells,l_item,l_item_tmp){ - dap_chain_cell_file_update(l_item); - if (ret <0 ) + if(dap_chain_cell_file_update(l_item) <= 0) ret++; } - pthread_rwlock_unlock(&l_chain->atoms_rwlock); + pthread_rwlock_unlock(&l_chain->cell_rwlock); return ret; } @@ -444,39 +445,29 @@ int dap_chain_save_all (dap_chain_t * l_chain) * @param l_chain * @return */ -int dap_chain_load_all (dap_chain_t * l_chain) +int dap_chain_load_all(dap_chain_t *l_chain) { - int l_ret = -2; - if(!l_chain) - return l_ret; - pthread_rwlock_wrlock(&l_chain->atoms_rwlock); - - // create directory if not exist + int l_ret = 0; + if (!l_chain) + return -2; if(!dap_dir_test(DAP_CHAIN_PVT (l_chain)->file_storage_dir)) { dap_mkdir_with_parents(DAP_CHAIN_PVT (l_chain)->file_storage_dir); } DIR * l_dir = opendir(DAP_CHAIN_PVT(l_chain)->file_storage_dir); - if( l_dir ) { - struct dirent * l_dir_entry; - l_ret = -1; - while((l_dir_entry=readdir(l_dir))!=NULL){ - const char * l_filename = l_dir_entry->d_name; - size_t l_filename_len = strlen (l_filename); - // Check if its not special dir entries . or .. - if( strcmp(l_filename,".") && strcmp(l_filename,"..") ){ - // If not check the file's suffix - const char l_suffix[]=".dchaincell"; - size_t l_suffix_len = strlen(l_suffix); - if (strncmp(l_filename+ l_filename_len-l_suffix_len,l_suffix,l_suffix_len) == 0 ){ - if ( dap_chain_cell_load(l_chain,l_filename) == 0 ) - l_ret = 0; - } - } + if (!l_dir) { + log_it(L_ERROR, "Cannot open directory %s", DAP_CHAIN_PVT (l_chain)->file_storage_dir); + return -3; + } + for (struct dirent *l_dir_entry = readdir(l_dir); l_dir_entry != NULL; l_dir_entry = readdir(l_dir)) + { + const char * l_filename = l_dir_entry->d_name; + const char l_suffix[] = ".dchaincell"; + size_t l_suffix_len = strlen(l_suffix); + if (strncmp(l_filename + strlen(l_filename) - l_suffix_len, l_suffix, l_suffix_len) == 0 ) { + l_ret += dap_chain_cell_load(l_chain,l_filename); } - closedir(l_dir); } - pthread_rwlock_unlock(&l_chain->atoms_rwlock); - + closedir(l_dir); return l_ret; } @@ -539,13 +530,11 @@ void dap_chain_add_callback_notify(dap_chain_t * a_chain, dap_chain_callback_not bool dap_chain_get_atom_last_hash(dap_chain_t * a_chain, dap_hash_fast_t * a_atom_hash) { bool l_ret = false; - pthread_rwlock_rdlock(&a_chain->atoms_rwlock); - dap_chain_atom_iter_t *l_atom_iter = a_chain->callback_atom_iter_create(a_chain); dap_chain_atom_ptr_t * l_lasts_atom; size_t l_lasts_atom_count=0; size_t* l_lasts_atom_size =NULL; - l_lasts_atom= a_chain->callback_atom_iter_get_lasts(l_atom_iter, &l_lasts_atom_count,&l_lasts_atom_size); + l_lasts_atom = a_chain->callback_atom_iter_get_lasts(l_atom_iter, &l_lasts_atom_count,&l_lasts_atom_size); if (l_lasts_atom&& l_lasts_atom_count){ assert(l_lasts_atom_size[0]); assert(l_lasts_atom[0]); @@ -560,6 +549,5 @@ bool dap_chain_get_atom_last_hash(dap_chain_t * a_chain, dap_hash_fast_t * a_ato l_ret = true; } a_chain->callback_atom_iter_delete(l_atom_iter); - pthread_rwlock_unlock(&a_chain->atoms_rwlock); return l_ret; } diff --git a/modules/chain/dap_chain_cell.c b/modules/chain/dap_chain_cell.c index 8e44c83674..4f0076f2b4 100644 --- a/modules/chain/dap_chain_cell.c +++ b/modules/chain/dap_chain_cell.c @@ -66,14 +66,15 @@ int dap_chain_cell_init(void) return 0; } -/** - * @brief dap_chain_cell_create - * @return - */ -dap_chain_cell_t * dap_chain_cell_create(void) +dap_chain_cell_t * dap_chain_cell_find_by_id(dap_chain_t * a_chain, dap_chain_cell_id_t a_cell_id) { - dap_chain_cell_t * l_cell = DAP_NEW_Z(dap_chain_cell_t); - return l_cell; + if (!a_chain->cells) + return NULL; + dap_chain_cell_t *l_cell = NULL; + pthread_rwlock_rdlock(&a_chain->cell_rwlock); + HASH_FIND(hh, a_chain->cells, &a_cell_id, sizeof(dap_chain_cell_id_t), l_cell); + pthread_rwlock_unlock(&a_chain->cell_rwlock); + return l_cell; } /** @@ -83,15 +84,38 @@ dap_chain_cell_t * dap_chain_cell_create(void) */ dap_chain_cell_t * dap_chain_cell_create_fill(dap_chain_t * a_chain, dap_chain_cell_id_t a_cell_id) { - dap_chain_cell_t * l_cell = dap_chain_cell_create(); + dap_chain_cell_t * l_cell = DAP_NEW_Z(dap_chain_cell_t); l_cell->chain = a_chain; l_cell->id.uint64 = a_cell_id.uint64; - //dap_chain_net_t *l_net = dap_chain_net_by_id(a_net_id); - //l_cell->id.uint64 = l_net ? l_net->pub.cell_id.uint64 : 0; l_cell->file_storage_path = dap_strdup_printf("%0llx.dchaincell", l_cell->id.uint64); + pthread_rwlock_wrlock(&a_chain->cell_rwlock); + HASH_ADD(hh, a_chain->cells, id, sizeof(dap_chain_cell_id_t), l_cell); + pthread_rwlock_unlock(&a_chain->cell_rwlock); + return l_cell; +} + +dap_chain_cell_t * dap_chain_cell_create_fill2(dap_chain_t * a_chain, const char *a_filename) +{ + dap_chain_cell_t * l_cell = DAP_NEW_Z(dap_chain_cell_t); + l_cell->chain = a_chain; + sscanf(a_filename, "%0llx.dchaincell", &l_cell->id.uint64); + l_cell->file_storage_path = dap_strdup_printf(a_filename); + pthread_rwlock_wrlock(&a_chain->cell_rwlock); + HASH_ADD(hh, a_chain->cells, id, sizeof(dap_chain_cell_id_t), l_cell); + pthread_rwlock_unlock(&a_chain->cell_rwlock); return l_cell; } +void dap_chain_cell_close(dap_chain_cell_t *a_cell) +{ + if(!a_cell) + return; + if(a_cell->file_storage) { + fclose(a_cell->file_storage); + a_cell->file_storage = NULL; + } +} + /** * @brief dap_chain_cell_delete * @return @@ -100,10 +124,21 @@ void dap_chain_cell_delete(dap_chain_cell_t *a_cell) { if(!a_cell) return; - if(a_cell->file_storage) - fclose(a_cell->file_storage); - DAP_DELETE(a_cell->file_storage_path); - DAP_DELETE(a_cell); + dap_chain_cell_close(a_cell); + if (a_cell->chain->cells) { + dap_chain_cell_t *l_cell = NULL; + dap_chain_cell_id_t l_cell_id = { + .uint64 = a_cell->id.uint64 + }; + pthread_rwlock_wrlock(&a_cell->chain->cell_rwlock); + HASH_FIND(hh, a_cell->chain->cells, &l_cell_id, sizeof(dap_chain_cell_id_t), l_cell); + if (l_cell) + HASH_DEL(a_cell->chain->cells, l_cell); + pthread_rwlock_unlock(&a_cell->chain->cell_rwlock); + } + a_cell->chain = NULL; + DAP_DEL_Z(a_cell->file_storage_path) + DAP_DEL_Z(a_cell); } /** @@ -114,63 +149,62 @@ void dap_chain_cell_delete(dap_chain_cell_t *a_cell) */ int dap_chain_cell_load(dap_chain_t * a_chain, const char * a_cell_file_path) { - dap_chain_cell_t * l_cell = dap_chain_cell_create(); - l_cell->file_storage_path = dap_strdup( a_cell_file_path ); - { - char l_file_path[MAX_PATH] = {'\0'}; - dap_snprintf(l_file_path, MAX_PATH, "%s/%s", DAP_CHAIN_PVT(a_chain)->file_storage_dir, - l_cell->file_storage_path); - l_cell->file_storage = fopen(l_file_path,"rb"); + int ret = 0; + char l_file_path[MAX_PATH] = {'\0'}; + dap_snprintf(l_file_path, MAX_PATH, "%s/%s", DAP_CHAIN_PVT(a_chain)->file_storage_dir, a_cell_file_path); + FILE *l_f = fopen(l_file_path, "rb"); + if (!l_f) { + log_it(L_WARNING,"Can't read chain \"%s\"", l_file_path); + return -1; } - if ( l_cell->file_storage ){ - dap_chain_cell_file_header_t l_hdr = {0}; - if ( fread( &l_hdr,1,sizeof(l_hdr),l_cell->file_storage ) == sizeof (l_hdr) ) { - if ( l_hdr.signature == DAP_CHAIN_CELL_FILE_SIGNATURE ) { - while ( feof( l_cell->file_storage) == 0 ){ - size_t l_element_size = 0; - if ( fread(&l_element_size,1,sizeof(l_element_size),l_cell->file_storage) == - sizeof(l_element_size) ){ - if ( l_element_size > 0){ - dap_chain_atom_ptr_t * l_element = DAP_NEW_Z_SIZE (dap_chain_atom_ptr_t, l_element_size ); - if ( l_element){ - size_t l_read_bytes = fread( l_element,1,l_element_size,l_cell->file_storage ); - if ( l_read_bytes == l_element_size ) { - a_chain->callback_atom_add (a_chain, l_element, l_element_size); - }else{ - log_it (L_ERROR, "Can't read %zd bytes (processed only %zd), stop cell load process", l_element_size, - l_read_bytes); - break; - } - }else{ - log_it (L_ERROR, "Can't allocate %zd bytes, stop cell load process", l_element_size); - break; - } - - } else { - log_it (L_ERROR, "Zero element size, file is corrupted"); - break; - } - } - } - dap_chain_cell_delete(l_cell); - return 0; - } else { - log_it (L_ERROR,"Wrong signature in file \"%s\", possible file corrupt",l_cell->file_storage_path); - dap_chain_cell_delete(l_cell); - return -3; - } + dap_chain_cell_file_header_t l_hdr = { 0 }; + if (fread(&l_hdr, 1, sizeof(l_hdr), l_f) != sizeof (l_hdr)) { + log_it(L_ERROR,"Can't read chain header \"%s\"", l_file_path); + fclose(l_f); + return -2; + } + if (l_hdr.signature != DAP_CHAIN_CELL_FILE_SIGNATURE) { + log_it(L_ERROR, "Wrong signature in chain \"%s\", possible file corrupt", l_file_path); + fclose(l_f); + return -3; + } + size_t l_el_size = 0; + unsigned long q = 0; + volatile dap_chain_cell_t *l_dummy; + for (fread(&l_el_size, 1, sizeof(l_el_size), l_f); !feof(l_f); l_el_size = 0, fread(&l_el_size, 1, sizeof(l_el_size), l_f)) + { + if (!l_el_size) { + log_it(L_ERROR, "Zero element size, chain %s is corrupted", l_file_path); + ret = -4; + break; + } + dap_chain_atom_ptr_t l_element = DAP_NEW_Z_SIZE(dap_chain_atom_ptr_t, l_el_size); + if (!l_element) { + log_it(L_ERROR, "Out of memory"); + ret = -5; + break; + } + unsigned long l_read = fread(l_element, 1, l_el_size, l_f); + if(l_read == l_el_size) { + a_chain->callback_atom_add(a_chain, l_element, l_el_size); // !!! blocking GDB call !!! + ++q; + DAP_DELETE(l_element); } else { - log_it (L_ERROR,"Can't read dap_chain file header \"%s\"",l_cell->file_storage_path); - dap_chain_cell_delete(l_cell); - return -2; + log_it(L_ERROR, "Read only %zd of %zd bytes, stop cell loading", l_read, l_el_size); + ret = -6; + DAP_DELETE(l_element); + break; } - }else { - log_it (L_WARNING,"Can't read dap_chain file \"%s\"",l_cell->file_storage_path); - dap_chain_cell_delete(l_cell); - return -1; } - dap_chain_cell_delete(l_cell); - return -4; + if (ret < 0) { + log_it(L_INFO, "Couldn't load all atoms, %d only", q); + } else { + log_it(L_INFO, "Loaded all %d atoms in cell %s", q, a_cell_file_path); + l_dummy = dap_chain_cell_create_fill2(a_chain, a_cell_file_path); + } + fclose(l_f); + return ret; + } /** @@ -182,24 +216,33 @@ int dap_chain_cell_load(dap_chain_t * a_chain, const char * a_cell_file_path) */ int dap_chain_cell_file_append( dap_chain_cell_t * a_cell, const void* a_atom, size_t a_atom_size) { - size_t l_total_wrote_bytes = 0; - // file need to be opened or created - if(a_cell->file_storage == NULL) { + if(!a_cell) + return -1; + if (!a_atom && !a_cell->chain) { + log_it(L_WARNING,"Chain not found for cell 0x%016X ( %s )", + a_cell->id.uint64, a_cell->file_storage_path); + return -1; + } + if(!a_cell->file_storage) { char l_file_path[MAX_PATH] = {'\0'}; dap_snprintf(l_file_path, MAX_PATH, "%s/%s", DAP_CHAIN_PVT(a_cell->chain)->file_storage_dir, a_cell->file_storage_path); - a_cell->file_storage = fopen(l_file_path, "ab"); - if(a_cell->file_storage == NULL) - a_cell->file_storage = fopen(l_file_path, "wb"); - } - if(!a_cell->file_storage) { - log_it(L_ERROR, "File \"%s\" not opened for write cell 0x%016X", - a_cell->file_storage_path, - a_cell->id.uint64); - return -3; + if(!a_cell->file_storage) + a_cell->file_storage = fopen(l_file_path, "r+b"); + if (!a_cell->file_storage) { + log_it(L_INFO, "Create chain cell"); + a_cell->file_storage = fopen(l_file_path, "w+b"); + } + if (!a_cell->file_storage) { + log_it(L_ERROR, "Chain cell \"%s\" cannot be opened 0x%016X", + a_cell->file_storage_path, + a_cell->id.uint64); + return -3; + } } - // write header if file empty or not created - if(!ftell(a_cell->file_storage)) { + fseek(a_cell->file_storage, 0L, SEEK_END); + if (ftell(a_cell->file_storage) < (long)sizeof(dap_chain_cell_file_header_t)) { // fill the header + fseek(a_cell->file_storage, 0L, SEEK_SET); dap_chain_cell_file_header_t l_hdr = { .signature = DAP_CHAIN_CELL_FILE_SIGNATURE, .version = DAP_CHAIN_CELL_FILE_VERSION, @@ -213,30 +256,47 @@ int dap_chain_cell_file_append( dap_chain_cell_t * a_cell, const void* a_atom, s } else { log_it(L_ERROR, "Can't init file storage for cell 0x%016X ( %s )", a_cell->id.uint64, a_cell->file_storage_path); - fclose(a_cell->file_storage); - a_cell->file_storage = NULL; + dap_chain_cell_close(a_cell); + return -4; } } - if ( fwrite(&a_atom_size,1,sizeof(a_atom_size),a_cell->file_storage) == sizeof(a_atom_size) ){ - l_total_wrote_bytes += sizeof (a_atom_size); - if ( fwrite(a_atom,1,a_atom_size,a_cell->file_storage) == a_atom_size ){ - l_total_wrote_bytes += a_atom_size; - // change in chain happened -> nodes synchronization required - if(a_cell->chain && a_cell->chain->callback_notify) - a_cell->chain->callback_notify(a_cell->chain->callback_notify_arg, a_cell->chain, a_cell->id, (void *)a_atom, a_atom_size); - } else { + // if no atom provided in arguments, we flush all the atoms in given chain + size_t l_atom_size = a_atom_size ? a_atom_size : 0; + int l_total_wrote_bytes = 0; + dap_chain_atom_iter_t *l_atom_iter = a_atom ? a_cell->chain->callback_atom_iter_create(a_cell->chain) : NULL; + for (dap_chain_atom_ptr_t l_atom = a_atom ? (dap_chain_atom_ptr_t)a_atom : a_cell->chain->callback_atom_iter_get_first(l_atom_iter, &l_atom_size); + l_atom; + l_atom = a_atom ? NULL : a_cell->chain->callback_atom_iter_get_next(l_atom_iter, &l_atom_size)) + { + if (fwrite(&l_atom_size, 1, sizeof(l_atom_size), a_cell->file_storage) != sizeof(l_atom_size)) { + log_it (L_ERROR,"Can't write atom data size from cell 0x%016X in \"%s\"", + a_cell->id.uint64, + a_cell->file_storage_path); + dap_chain_cell_close(a_cell); + l_total_wrote_bytes = -2; + break; + } + l_total_wrote_bytes += sizeof(l_atom_size); + if (fwrite(l_atom, 1, l_atom_size, a_cell->file_storage) != l_atom_size) { log_it (L_ERROR, "Can't write data from cell 0x%016X to the file \"%s\"", a_cell->id.uint64, a_cell->file_storage_path); - return -1; + dap_chain_cell_close(a_cell); + l_total_wrote_bytes = -3; + break; } - } else { - log_it (L_ERROR,"Can't write atom data size from cell 0x%016X in \"%s\"", - a_cell->id.uint64, - a_cell->file_storage_path); - return -2; + l_total_wrote_bytes += l_atom_size; + if(a_cell->chain && a_cell->chain->callback_notify) + a_cell->chain->callback_notify(a_cell->chain->callback_notify_arg, + a_cell->chain, + a_cell->id, + (void *)l_atom, + l_atom_size); } - return (int) l_total_wrote_bytes; + if (l_atom_iter) { + a_cell->chain->callback_atom_iter_delete(l_atom_iter); + } + return (int)l_total_wrote_bytes; } /** @@ -246,50 +306,5 @@ int dap_chain_cell_file_append( dap_chain_cell_t * a_cell, const void* a_atom, s */ int dap_chain_cell_file_update( dap_chain_cell_t * a_cell) { - if(!a_cell) - return -1; - if(!a_cell->chain){ - log_it(L_WARNING,"chain not defined for cell for cell 0x%016X ( %s )", - a_cell->id.uint64,a_cell->file_storage_path); - return -1; - } - if(a_cell->file_storage == NULL ){ // File need to be created - char l_file_path[MAX_PATH] = {'\0'}; - dap_snprintf(l_file_path, MAX_PATH, "%s/%s", DAP_CHAIN_PVT(a_cell->chain)->file_storage_dir, - a_cell->file_storage_path); - a_cell->file_storage = fopen(l_file_path, "wb"); - if(a_cell->file_storage) { - dap_chain_cell_file_header_t l_hdr = { - .signature = DAP_CHAIN_CELL_FILE_SIGNATURE, - .version = DAP_CHAIN_CELL_FILE_VERSION, - .type = DAP_CHAIN_CELL_FILE_TYPE_RAW, - .chain_id = { .uint64 = a_cell->id.uint64 }, - .chain_net_id = a_cell->chain->net_id - }; - if ( fwrite( &l_hdr,1,sizeof(l_hdr),a_cell->file_storage ) == sizeof (l_hdr) ) { - log_it(L_NOTICE,"Initialized file storage for cell 0x%016X ( %s )", - a_cell->id.uint64,a_cell->file_storage_path); - }else{ - log_it(L_ERROR,"Can't init file storage for cell 0x%016X ( %s )", - a_cell->id.uint64,a_cell->file_storage_path); - fclose(a_cell->file_storage); - a_cell->file_storage = NULL; - } - } - } - if ( a_cell->file_storage ){ - dap_chain_t * l_chain = a_cell->chain; - dap_chain_atom_iter_t *l_atom_iter = l_chain->callback_atom_iter_create (l_chain); - size_t l_atom_size = 0; - dap_chain_atom_ptr_t *l_atom = l_chain->callback_atom_iter_get_first(l_atom_iter, &l_atom_size); - while ( l_atom && l_atom_size){ - if ( dap_chain_cell_file_append (a_cell,l_atom, l_atom_size) <0 ) - break; - l_atom = l_chain->callback_atom_iter_get_next( l_atom_iter, &l_atom_size ); - } - }else { - log_it (L_ERROR,"Can't write cell 0x%016X file \"%s\"",a_cell->id.uint64, a_cell->file_storage_path); - return -1; - } - return 0; + return dap_chain_cell_file_append(a_cell, NULL, 0); } diff --git a/modules/chain/dap_chain_ledger.c b/modules/chain/dap_chain_ledger.c index 6f47412d99..7b945510f0 100644 --- a/modules/chain/dap_chain_ledger.c +++ b/modules/chain/dap_chain_ledger.c @@ -1359,6 +1359,7 @@ void dap_chain_ledger_addr_get_token_ticker_all_fast(dap_ledger_t *a_ledger, dap char **l_tickers = DAP_NEW_Z_SIZE(char*, l_count * sizeof(char*)); l_count = 0; char *l_addr = dap_chain_addr_to_str(a_addr); + pthread_rwlock_rdlock(&PVT(a_ledger)->balance_accounts_rwlock); HASH_ITER(hh, PVT(a_ledger)->balance_accounts, wallet_balance, tmp) { char **l_keys = dap_strsplit(wallet_balance->key, " ", -1); if (!dap_strcmp(l_keys[0], l_addr)) { @@ -1367,6 +1368,7 @@ void dap_chain_ledger_addr_get_token_ticker_all_fast(dap_ledger_t *a_ledger, dap } dap_strfreev(l_keys); } + pthread_rwlock_unlock(&PVT(a_ledger)->balance_accounts_rwlock); *a_tickers = l_tickers; } *a_tickers_size = l_count; @@ -2113,7 +2115,9 @@ int dap_chain_ledger_tx_add(dap_ledger_t *a_ledger, dap_chain_datum_tx_t *a_tx, &bound_item->out.tx_prev_out_ext->addr; char *l_addr_str = dap_chain_addr_to_str(l_addr); char *l_wallet_balance_key = dap_strjoin(" ", l_addr_str, l_token_ticker, (char*)NULL); + pthread_rwlock_rdlock(&PVT(a_ledger)->balance_accounts_rwlock); HASH_FIND_STR(PVT(a_ledger)->balance_accounts, l_wallet_balance_key, wallet_balance); + pthread_rwlock_unlock(&PVT(a_ledger)->balance_accounts_rwlock); if (wallet_balance) { uint64_t l_value = (l_out_type == TX_ITEM_TYPE_OUT) ? bound_item->out.tx_prev_out->header.value : @@ -2544,8 +2548,9 @@ uint128_t dap_chain_ledger_calc_balance(dap_ledger_t *a_ledger, const dap_chain_ dap_ledger_wallet_balance_t *l_balance_item = NULL;// ,* l_balance_item_tmp = NULL; char *l_addr = dap_chain_addr_to_str(a_addr); char *l_wallet_balance_key = dap_strjoin(" ", l_addr, a_token_ticker, (char*)NULL); - + pthread_rwlock_rdlock(&PVT(a_ledger)->balance_accounts_rwlock); HASH_FIND_STR(PVT(a_ledger)->balance_accounts, l_wallet_balance_key, l_balance_item); + pthread_rwlock_unlock(&PVT(a_ledger)->balance_accounts_rwlock); if (l_balance_item) { if(s_debug_more) log_it (L_INFO,"Found address in cache with balance %"DAP_UINT64_FORMAT_U"", l_balance_item->balance); diff --git a/modules/chain/include/dap_chain.h b/modules/chain/include/dap_chain.h index 154702a44d..5d6de24f66 100644 --- a/modules/chain/include/dap_chain.h +++ b/modules/chain/include/dap_chain.h @@ -119,7 +119,7 @@ typedef struct dap_chain{ // read/write atoms rwlock pthread_rwlock_t atoms_rwlock; - + pthread_rwlock_t cell_rwlock; dap_chain_callback_new_cfg_t callback_created; dap_chain_callback_t callback_delete; diff --git a/modules/chain/include/dap_chain_cell.h b/modules/chain/include/dap_chain_cell.h index 532a0876f2..5f599e40e4 100644 --- a/modules/chain/include/dap_chain_cell.h +++ b/modules/chain/include/dap_chain_cell.h @@ -72,8 +72,10 @@ typedef struct dap_chain_cell_decl{ int dap_chain_cell_init(void); -dap_chain_cell_t * dap_chain_cell_create(void); dap_chain_cell_t * dap_chain_cell_create_fill(dap_chain_t * a_chain, dap_chain_cell_id_t a_cell_id); +dap_chain_cell_t * dap_chain_cell_create_fill2(dap_chain_t * a_chain, const char *a_filename); +dap_chain_cell_t * dap_chain_cell_find_by_id(dap_chain_t * a_chain, dap_chain_cell_id_t a_cell_id); +void dap_chain_cell_close(dap_chain_cell_t *a_cell); void dap_chain_cell_delete(dap_chain_cell_t *a_cell); int dap_chain_cell_load(dap_chain_t * a_chain, const char * a_cell_file_path); int dap_chain_cell_file_update( dap_chain_cell_t * a_cell); diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c index 5822e919b8..00e39dca2b 100644 --- a/modules/channel/chain/dap_stream_ch_chain.c +++ b/modules/channel/chain/dap_stream_ch_chain.c @@ -265,7 +265,7 @@ static void s_sync_out_chains_last_worker_callback(dap_worker_t *a_worker, void l_sync_request->request_hdr.net_id.uint64, l_sync_request->request_hdr.chain_id.uint64, l_sync_request->request_hdr.cell_id.uint64, &l_request, sizeof(l_request)); if (l_ch_chain->request_atom_iter) - DAP_DEL_Z(l_ch_chain->request_atom_iter); + l_ch_chain->request_atom_iter->chain->callback_atom_iter_delete(l_ch_chain->request_atom_iter); l_ch_chain->state = CHAIN_STATE_IDLE; if (l_ch_chain->callback_notify_packet_out) @@ -285,14 +285,10 @@ static bool s_sync_out_chains_proc_callback(dap_proc_thread_t *a_thread, void *a dap_chain_t * l_chain = dap_chain_find_by_id(l_sync_request->request_hdr.net_id, l_sync_request->request_hdr.chain_id); assert(l_chain); - - pthread_rwlock_rdlock(&l_chain->atoms_rwlock); - + //pthread_rwlock_rdlock(&l_chain->atoms_rwlock); l_sync_request->chain.request_atom_iter = l_chain->callback_atom_iter_create(l_chain); size_t l_first_size = 0; - dap_chain_atom_ptr_t *l_iter = l_chain->callback_atom_iter_get_first(l_sync_request->chain.request_atom_iter, &l_first_size); - - + dap_chain_atom_ptr_t l_iter = l_chain->callback_atom_iter_get_first(l_sync_request->chain.request_atom_iter, &l_first_size); if (l_iter && l_first_size) { // first packet if (!dap_hash_fast_is_blank(&l_sync_request->request.hash_from)) { @@ -301,10 +297,10 @@ static bool s_sync_out_chains_proc_callback(dap_proc_thread_t *a_thread, void *a } - pthread_rwlock_unlock(&l_chain->atoms_rwlock); - dap_proc_thread_worker_exec_callback(a_thread, l_sync_request->worker->id,s_sync_out_chains_first_worker_callback, l_sync_request ); + //pthread_rwlock_unlock(&l_chain->atoms_rwlock); + dap_proc_thread_worker_exec_callback(a_thread, l_sync_request->worker->id, s_sync_out_chains_first_worker_callback, l_sync_request ); } else { - pthread_rwlock_unlock(&l_chain->atoms_rwlock); + //pthread_rwlock_unlock(&l_chain->atoms_rwlock); dap_proc_thread_worker_exec_callback(a_thread, l_sync_request->worker->id,s_sync_out_chains_last_worker_callback, l_sync_request ); } return true; @@ -457,109 +453,104 @@ static bool s_sync_in_chains_callback(dap_proc_thread_t *a_thread, void *a_arg) { UNUSED(a_thread); struct sync_request *l_sync_request = (struct sync_request *) a_arg; + if (!l_sync_request) { + log_it(L_CRITICAL, "Proc thread received corrupted chain packet!"); + return true; + } dap_chain_pkt_item_t *l_pkt_item = &l_sync_request->pkt; dap_chain_hash_fast_t l_atom_hash = {}; - - if (l_pkt_item->pkt_data_size) { - dap_chain_t *l_chain = dap_chain_find_by_id(l_sync_request->request_hdr.net_id, l_sync_request->request_hdr.chain_id); - if (!l_chain) { - if (s_debug_more) - log_it(L_WARNING, "No chain found for DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN"); - return true; + if (l_pkt_item->pkt_data_size == 0 || !l_pkt_item->pkt_data) { + log_it(L_CRITICAL, "In proc thread got CHAINS stream ch packet with zero data"); + DAP_DELETE(l_sync_request); + return true; + } + dap_chain_t *l_chain = dap_chain_find_by_id(l_sync_request->request_hdr.net_id, l_sync_request->request_hdr.chain_id); + if (!l_chain) { + if (s_debug_more) + log_it(L_WARNING, "No chain found for DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN"); + DAP_DELETE(l_sync_request); + DAP_DEL_Z(l_pkt_item->pkt_data); + return true; + } + dap_chain_atom_ptr_t l_atom_copy = (dap_chain_atom_ptr_t)l_pkt_item->pkt_data; + uint64_t l_atom_copy_size = l_pkt_item->pkt_data_size; + dap_hash_fast(l_atom_copy, l_atom_copy_size, &l_atom_hash); + size_t l_atom_size = 0; + if (dap_chain_get_atom_by_hash(l_chain, &l_atom_hash, &l_atom_size)) { + if (s_debug_more){ + char l_atom_hash_str[72] = {'\0'}; + dap_chain_hash_fast_to_str(&l_atom_hash,l_atom_hash_str,sizeof (l_atom_hash_str)-1 ); + log_it(L_WARNING, "Atom hash %s is already present", l_atom_hash_str); } - dap_chain_atom_ptr_t l_atom_copy = (dap_chain_atom_ptr_t)l_pkt_item->pkt_data; - uint64_t l_atom_copy_size = l_pkt_item->pkt_data_size; - if ( l_atom_copy_size && l_pkt_item && l_atom_copy ){ - pthread_rwlock_wrlock(&l_chain->atoms_rwlock); - dap_hash_fast(l_atom_copy, l_atom_copy_size, &l_atom_hash); - dap_chain_atom_iter_t *l_atom_iter = l_chain->callback_atom_iter_create(l_chain); - size_t l_atom_size =0; - if ( l_chain->callback_atom_find_by_hash(l_atom_iter, &l_atom_hash, &l_atom_size) == NULL ) { - dap_chain_atom_verify_res_t l_atom_add_res = l_chain->callback_atom_add(l_chain, l_atom_copy, l_atom_copy_size); - if ( l_atom_add_res != ATOM_REJECT && dap_chain_has_file_store(l_chain)) { - if (s_debug_more){ - char l_atom_hash_str[72]={[0]='\0'}; - dap_chain_hash_fast_to_str(&l_atom_hash,l_atom_hash_str,sizeof (l_atom_hash_str)-1 ); - log_it(L_INFO,"Accepted atom with hash %s for %s:%s", l_atom_hash_str, l_chain->net_name, l_chain->name); - } - - // append to file - dap_chain_cell_t *l_cell = dap_chain_cell_create_fill(l_chain, l_sync_request->request_hdr.cell_id); - int l_res; - if (l_cell) { - // add one atom only - l_res = dap_chain_cell_file_append(l_cell, l_atom_copy, l_atom_copy_size); - // rewrite all file - //l_res = dap_chain_cell_file_update(l_cell); - if(l_res < 0) { - log_it(L_ERROR, "Can't save event 0x%x to the file '%s'", l_atom_hash, - l_cell ? l_cell->file_storage_path : "[null]"); - } else { - dap_db_set_last_hash_remote(l_sync_request->request.node_addr.uint64, l_chain, &l_atom_hash); - } - // add all atoms from treshold - if (l_chain->callback_atom_add_from_treshold){ - dap_chain_atom_ptr_t l_atom_treshold; - do{ - size_t l_atom_treshold_size; - // add into ledger - if (s_debug_more) - log_it(L_DEBUG, "Try to add atom from treshold"); - l_atom_treshold = l_chain->callback_atom_add_from_treshold(l_chain, &l_atom_treshold_size); - // add into file - if(l_atom_treshold) { - l_res = dap_chain_cell_file_append(l_cell, l_atom_treshold, l_atom_treshold_size); - log_it(L_INFO, "Added atom from treshold"); - if(l_res < 0) { - log_it(L_ERROR, "Can't save event 0x%x from treshold to the file '%s'", - l_atom_treshold, l_cell ? l_cell->file_storage_path : "[null]"); - } - } - } - while(l_atom_treshold); - } - - // delete cell and close file - dap_chain_cell_delete(l_cell); - } - else{ - log_it(L_ERROR, "Can't get cell for cell_id 0x%x for save event to file", l_sync_request->request_hdr.cell_id); - - } - }else if(l_atom_add_res == ATOM_PASS){ - if (s_debug_more){ - char l_atom_hash_str[72]={[0]='\0'}; - dap_chain_hash_fast_to_str(&l_atom_hash,l_atom_hash_str,sizeof (l_atom_hash_str)-1 ); - log_it(L_WARNING,"Not accepted atom (code ATOM_PASS) with hash %s for %s:%s and moved into the treshold", l_atom_hash_str, l_chain->net_name, l_chain->name); - } - }else{ - if (s_debug_more){ - char l_atom_hash_str[72]={[0]='\0'}; - dap_chain_hash_fast_to_str(&l_atom_hash,l_atom_hash_str,sizeof (l_atom_hash_str)-1 ); - log_it(L_WARNING,"Not accepted atom (code %d) with hash %s for %s:%s", l_atom_add_res, l_atom_hash_str, l_chain->net_name, l_chain->name); + dap_db_set_last_hash_remote(l_sync_request->request.node_addr.uint64, l_chain, &l_atom_hash); + DAP_DELETE(l_atom_copy); + DAP_DELETE(l_sync_request); + return true; + } + dap_chain_atom_verify_res_t l_atom_add_res = l_chain->callback_atom_add(l_chain, l_atom_copy, l_atom_copy_size); + switch (l_atom_add_res) { + case ATOM_PASS: + if (s_debug_more){ + char l_atom_hash_str[72]={[0]='\0'}; + dap_chain_hash_fast_to_str(&l_atom_hash,l_atom_hash_str,sizeof (l_atom_hash_str)-1 ); + log_it(L_WARNING,"Atom with hash %s for %s:%s not accepted (code ATOM_PASS, already present)", l_atom_hash_str, l_chain->net_name, l_chain->name); + } + break; + case ATOM_MOVE_TO_THRESHOLD: + case ATOM_ACCEPT: + if (s_debug_more) { + char l_atom_hash_str[72]={'\0'}; + dap_chain_hash_fast_to_str(&l_atom_hash,l_atom_hash_str,sizeof (l_atom_hash_str)-1 ); + log_it(L_INFO,"Accepted atom with hash %s for %s:%s", l_atom_hash_str, l_chain->net_name, l_chain->name); + } + dap_chain_cell_t *l_cell = dap_chain_cell_find_by_id(l_chain, l_sync_request->request_hdr.cell_id); + if (!l_cell) { + log_it(L_INFO, "Creating cell 0x%016X", l_sync_request->request_hdr.cell_id.uint64); + l_cell = dap_chain_cell_create_fill(l_chain, l_sync_request->request_hdr.cell_id); + } + if (!l_cell) { + log_it(L_ERROR, "Can't create cell with id 0x%x to save event...", l_sync_request->request_hdr.cell_id); + break; + } + int l_res = dap_chain_cell_file_append(l_cell, l_atom_copy, l_atom_copy_size); + if(l_res < 0) { + log_it(L_ERROR, "Can't save event 0x%x to the file '%s'", l_atom_hash, + l_cell ? l_cell->file_storage_path : "[null]"); + } else { + dap_db_set_last_hash_remote(l_sync_request->request.node_addr.uint64, l_chain, &l_atom_hash); + } + if (l_chain->callback_atom_add_from_treshold) { + dap_chain_atom_ptr_t l_atom_treshold; + do { + size_t l_atom_treshold_size; + if (s_debug_more) + log_it(L_DEBUG, "Try to add atom from treshold"); + l_atom_treshold = l_chain->callback_atom_add_from_treshold(l_chain, &l_atom_treshold_size); + if(l_atom_treshold) { + l_res = dap_chain_cell_file_append(l_cell, l_atom_treshold, l_atom_treshold_size); + log_it(L_INFO, "Added atom from treshold"); + if(l_res < 0) { + log_it(L_ERROR, "Can't save event 0x%x from treshold to file '%s'", + l_atom_treshold, l_cell ? l_cell->file_storage_path : "[null]"); } } - DAP_DEL_Z(l_atom_copy); - } else { - if (s_debug_more){ - char l_atom_hash_str[72]={[0]='\0'}; - dap_chain_hash_fast_to_str(&l_atom_hash,l_atom_hash_str,sizeof (l_atom_hash_str)-1 ); - log_it(L_WARNING,"Already has atom with hash %s ", l_atom_hash_str); - } - dap_db_set_last_hash_remote(l_sync_request->request.node_addr.uint64, l_chain, &l_atom_hash); - DAP_DELETE(l_atom_copy); - } - l_chain->callback_atom_iter_delete(l_atom_iter); - pthread_rwlock_unlock(&l_chain->atoms_rwlock); - }else{ - if (!l_pkt_item) - log_it(L_WARNING, "chain packet item is NULL"); - if (l_atom_copy_size) - log_it(L_WARNING, "chain packet item data size is zero"); + } while(l_atom_treshold); } - }else - log_it(L_WARNING, "In proc thread got CHAINS stream ch packet with zero data"); - DAP_DELETE(l_sync_request); + dap_chain_cell_close(l_cell); + break; + case ATOM_REJECT: { + char l_atom_hash_str[72] = {'\0'}; + dap_chain_hash_fast_to_str(&l_atom_hash,l_atom_hash_str,sizeof (l_atom_hash_str)-1 ); + log_it(L_WARNING,"Atom with hash %s for %s:%s rejected", l_atom_hash_str, l_chain->net_name, l_chain->name); + break; + } + default: + log_it(L_CRITICAL, "Wtf is this ret code? %d", l_atom_add_res); + break; + } + + DAP_DEL_Z(l_atom_copy); + DAP_DEL_Z(l_sync_request); return true; } @@ -1314,7 +1305,7 @@ void dap_stream_ch_chain_go_idle ( dap_stream_ch_chain_t * a_ch_chain) a_ch_chain->request_atom_iter = NULL; return; } - DAP_DEL_Z(a_ch_chain->request_atom_iter); + a_ch_chain->request_atom_iter->chain->callback_atom_iter_delete(a_ch_chain->request_atom_iter); } // free log list dap_db_log_list_delete(a_ch_chain->request_db_log); diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index 1d26565b6d..36a29d7c35 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -2047,7 +2047,7 @@ int s_net_load(const char * a_net_name, uint16_t a_acl_idx) dap_chain_net_state_go_to(l_net, l_target_state); // Start the proc thread - log_it(L_NOTICE, "Сhain network \"%s\" initialized",l_net_item->name); + log_it(L_INFO, "Сhain network \"%s\" initialized",l_net_item->name); dap_config_close(l_cfg); } diff --git a/modules/net/dap_chain_node_cli_cmd.c b/modules/net/dap_chain_node_cli_cmd.c index bdb5a2e59d..36beb9ed11 100644 --- a/modules/net/dap_chain_node_cli_cmd.c +++ b/modules/net/dap_chain_node_cli_cmd.c @@ -732,17 +732,14 @@ int com_global_db(int a_argc, char ** a_argv, void *arg_func, char **a_str_reply dap_chain_node_cli_set_reply_text(a_str_reply, "invalid parameters"); return -1; } - dap_chain_cell_t *l_cell = dap_chain_cell_create(); - l_cell->chain = l_chain; - l_cell->id.uint64 = l_cell_id.uint64; - l_cell->file_storage_path = dap_strdup_printf("%0llx.dchaincell",l_cell->id.uint64); + dap_chain_cell_t *l_cell = dap_chain_cell_create_fill(l_chain, l_cell_id); int l_ret = dap_chain_cell_file_update(l_cell); - if(!l_ret) + if(l_ret > 0) dap_chain_node_cli_set_reply_text(a_str_reply, "cell added successfully"); else dap_chain_node_cli_set_reply_text(a_str_reply, "can't create file for cell 0x%016X ( %s )", l_cell->id.uint64,l_cell->file_storage_path); - dap_chain_cell_delete(l_cell); + dap_chain_cell_close(l_cell); return l_ret; //case CMD_NONE: diff --git a/modules/net/dap_chain_node_cli_cmd_tx.c b/modules/net/dap_chain_node_cli_cmd_tx.c index d3e7121738..c02c5c68cb 100644 --- a/modules/net/dap_chain_node_cli_cmd_tx.c +++ b/modules/net/dap_chain_node_cli_cmd_tx.c @@ -362,7 +362,7 @@ char* dap_db_history_tx(dap_chain_hash_fast_t* a_tx_hash, dap_chain_t * a_chain, // load transactions dap_chain_atom_iter_t *l_atom_iter = a_chain->callback_atom_iter_create(a_chain); size_t l_atom_size = 0; - dap_chain_atom_ptr_t *l_atom = a_chain->callback_atom_iter_get_first(l_atom_iter, &l_atom_size); + dap_chain_atom_ptr_t l_atom = a_chain->callback_atom_iter_get_first(l_atom_iter, &l_atom_size); while(l_atom && l_atom_size) { dap_chain_datum_t *l_datum = (dap_chain_datum_t*) l_atom; @@ -578,7 +578,7 @@ char* dap_db_history_addr(dap_chain_addr_t * a_addr, dap_chain_t * a_chain, cons // load transactions dap_chain_atom_iter_t *l_atom_iter = a_chain->callback_atom_iter_create(a_chain); size_t l_atom_size=0; - dap_chain_atom_ptr_t *l_atom = a_chain->callback_atom_iter_get_first(l_atom_iter, &l_atom_size); + dap_chain_atom_ptr_t l_atom = a_chain->callback_atom_iter_get_first(l_atom_iter, &l_atom_size); if (!l_atom) { return NULL; } @@ -962,7 +962,7 @@ static char* dap_db_history_token_list(dap_chain_t * a_chain, const char *a_toke // load transactions size_t l_atom_size = 0; dap_chain_atom_iter_t *l_atom_iter = a_chain->callback_atom_iter_create(a_chain); - dap_chain_atom_ptr_t *l_atom = a_chain->callback_atom_iter_get_first(l_atom_iter, &l_atom_size); + dap_chain_atom_ptr_t l_atom = a_chain->callback_atom_iter_get_first(l_atom_iter, &l_atom_size); size_t l_datums_count = 0; dap_chain_datum_t **l_datums = (a_chain->callback_atom_get_datums && l_atom && l_atom_size) ? a_chain->callback_atom_get_datums(l_atom, l_atom_size, &l_datums_count) : NULL; @@ -1068,7 +1068,7 @@ static char* dap_db_history_filter(dap_chain_t * a_chain, dap_ledger_t *a_ledger // load transactions size_t l_atom_size = 0; dap_chain_atom_iter_t *l_atom_iter = a_chain->callback_atom_iter_create(a_chain); - dap_chain_atom_ptr_t *l_atom = a_chain->callback_atom_iter_get_first(l_atom_iter, &l_atom_size); + dap_chain_atom_ptr_t l_atom = a_chain->callback_atom_iter_get_first(l_atom_iter, &l_atom_size); size_t l_datum_num = 0, l_token_num = 0, l_emission_num = 0, l_tx_num = 0; size_t l_datum_num_global = a_total_datums ? *a_total_datums : 0; while(l_atom && l_atom_size) { @@ -1090,7 +1090,7 @@ static char* dap_db_history_filter(dap_chain_t * a_chain, dap_ledger_t *a_ledger } /*dap_chain_atom_iter_t *l_atom_iter = a_chain->callback_atom_iter_create(a_chain); - dap_chain_atom_ptr_t *l_atom = a_chain->callback_atom_iter_get_first(l_atom_iter); + dap_chain_atom_ptr_t l_atom = a_chain->callback_atom_iter_get_first(l_atom_iter); size_t l_atom_size = a_chain->callback_atom_get_size(l_atom); size_t l_datum_num = 0, l_token_num = 0, l_emission_num = 0, l_tx_num = 0; while(l_atom && l_atom_size) { diff --git a/modules/type/blocks/dap_chain_cs_blocks.c b/modules/type/blocks/dap_chain_cs_blocks.c index 83efd52980..19a46d4e4d 100644 --- a/modules/type/blocks/dap_chain_cs_blocks.c +++ b/modules/type/blocks/dap_chain_cs_blocks.c @@ -933,7 +933,7 @@ static dap_chain_atom_ptr_t s_callback_atom_iter_find_by_hash(dap_chain_atom_ite size_t * a_atom_size) { assert(a_atom_iter); - dap_chain_atom_ptr_t * l_ret = NULL; + dap_chain_atom_ptr_t l_ret = NULL; pthread_rwlock_rdlock(& PVT(ITER_PVT(a_atom_iter)->blocks)->rwlock ); dap_chain_block_cache_t * l_block_cache = NULL; HASH_FIND(hh, PVT(ITER_PVT(a_atom_iter)->blocks)->blocks, a_atom_hash,sizeof (*a_atom_hash), l_block_cache); diff --git a/modules/type/dag/dap_chain_cs_dag.c b/modules/type/dag/dap_chain_cs_dag.c index 94b9491a3a..b087258752 100644 --- a/modules/type/dag/dap_chain_cs_dag.c +++ b/modules/type/dag/dap_chain_cs_dag.c @@ -295,6 +295,7 @@ void dap_chain_cs_dag_delete(dap_chain_t * a_chain) static int s_dap_chain_add_atom_to_ledger(dap_chain_cs_dag_t * a_dag, dap_ledger_t * a_ledger, dap_chain_cs_dag_event_item_t * a_event_item) { dap_chain_datum_t *l_datum = (dap_chain_datum_t*) dap_chain_cs_dag_event_get_datum(a_event_item->event, a_event_item->event_size); + pthread_rwlock_t * l_events_rwlock = &PVT(a_dag)->events_rwlock; switch (l_datum->header.type_id) { case DAP_CHAIN_DATUM_TOKEN_DECL: { dap_chain_datum_token_t *l_token = (dap_chain_datum_token_t*) l_datum->data; @@ -313,13 +314,14 @@ static int s_dap_chain_add_atom_to_ledger(dap_chain_cs_dag_t * a_dag, dap_ledger if( l_ret != 1 ) { return l_ret; } - dap_chain_cs_dag_event_item_t * l_tx_event= DAP_NEW_Z(dap_chain_cs_dag_event_item_t); + dap_chain_cs_dag_event_item_t * l_tx_event = DAP_NEW_Z(dap_chain_cs_dag_event_item_t); l_tx_event->ts_added = a_event_item->ts_added; l_tx_event->event = a_event_item->event; l_tx_event->event_size = a_event_item->event_size; memcpy(&l_tx_event->hash, &a_event_item->hash, sizeof (l_tx_event->hash) ); - - HASH_ADD(hh,PVT(a_dag)->tx_events,hash,sizeof (l_tx_event->hash),l_tx_event); + pthread_rwlock_wrlock(l_events_rwlock); + HASH_ADD(hh,PVT(a_dag)->tx_events, hash, sizeof (l_tx_event->hash), l_tx_event); + pthread_rwlock_unlock(l_events_rwlock); } break; default: @@ -332,37 +334,28 @@ static int s_dap_chain_add_atom_to_events_table(dap_chain_cs_dag_t * a_dag, dap_ { int res = a_dag->callback_cs_verify(a_dag,a_event_item->event, a_event_item->event_size); - char l_buf_hash[128]; + char l_buf_hash[128] = {'\0'}; dap_chain_hash_fast_to_str(&a_event_item->hash,l_buf_hash,sizeof(l_buf_hash)-1); if (res == 0 || memcmp( &a_event_item->hash, &a_dag->static_genesis_event_hash, sizeof(a_event_item->hash) ) == 0) { if(s_debug_more) log_it(L_DEBUG,"Dag event %s checked, add it to ledger", l_buf_hash); int l_ledger_res = s_dap_chain_add_atom_to_ledger(a_dag, a_ledger, a_event_item); - if ( l_ledger_res ) { + if ( l_ledger_res != 0) { if(s_debug_more) log_it(L_WARNING,"Dag event %s checked, but ledger declined: code %d", l_buf_hash, l_ledger_res); } - //All correct, no matter for result - HASH_ADD(hh, PVT(a_dag)->events,hash,sizeof (a_event_item->hash), a_event_item); - s_dag_events_lasts_process_new_last_event(a_dag, a_event_item); } else { - log_it(L_WARNING,"Dag event %s check failed: code %d", l_buf_hash, res ); + log_it(L_WARNING,"Dag event %s check failed: code %d", l_buf_hash, res ); } return res; } -static bool s_dap_chain_check_if_event_is_present(dap_chain_cs_dag_event_item_t * a_hash_table, const dap_chain_hash_fast_t * hash){ - bool res = false; - dap_chain_cs_dag_event_item_t * l_event_search = NULL; - +static bool s_dap_chain_check_if_event_is_present(dap_chain_cs_dag_event_item_t * a_hash_table, const dap_chain_hash_fast_t * hash) { if(!a_hash_table) return false; - + dap_chain_cs_dag_event_item_t * l_event_search = NULL; HASH_FIND(hh, a_hash_table, hash, sizeof(*hash), l_event_search); - if ( l_event_search ) - res = true; - - return res; + return (l_event_search != NULL); } /** @@ -374,12 +367,11 @@ static bool s_dap_chain_check_if_event_is_present(dap_chain_cs_dag_event_item_t */ static dap_chain_atom_verify_res_t s_chain_callback_atom_add(dap_chain_t * a_chain, dap_chain_atom_ptr_t a_atom, size_t a_atom_size) { - dap_chain_atom_verify_res_t ret = ATOM_ACCEPT; dap_chain_cs_dag_t * l_dag = DAP_CHAIN_CS_DAG(a_chain); dap_chain_cs_dag_event_t * l_event = (dap_chain_cs_dag_event_t *) a_atom; dap_chain_cs_dag_event_item_t * l_event_item = DAP_NEW_Z(dap_chain_cs_dag_event_item_t); - pthread_rwlock_t * l_events_rwlock = &PVT(l_dag)->events_rwlock ; + pthread_rwlock_t * l_events_rwlock = &PVT(l_dag)->events_rwlock; l_event_item->event = l_event; l_event_item->event_size = a_atom_size; l_event_item->ts_added = time(NULL); @@ -388,59 +380,75 @@ static dap_chain_atom_verify_res_t s_chain_callback_atom_add(dap_chain_t * a_cha dap_chain_hash_fast_t l_event_hash; dap_chain_cs_dag_event_calc_hash(l_event, a_atom_size,&l_event_hash); - char * l_event_hash_str = dap_chain_hash_fast_to_str_new(&l_event_item->hash); - if(s_debug_more) + char * l_event_hash_str; + if(s_debug_more) { + l_event_hash_str = dap_chain_hash_fast_to_str_new(&l_event_item->hash); log_it(L_DEBUG, "Processing event: %s... (size %zd)", l_event_hash_str,a_atom_size); + } - pthread_rwlock_wrlock( l_events_rwlock ); - + pthread_rwlock_rdlock(l_events_rwlock); // check if we already have this event - if(s_dap_chain_check_if_event_is_present(PVT(l_dag)->events, &l_event_item->hash)){ - ret = ATOM_PASS; - if(s_debug_more) - log_it(L_DEBUG, "... already present in events"); - }else if(s_dap_chain_check_if_event_is_present(PVT(l_dag)->events_treshold, &l_event_item->hash)){ - ret = ATOM_PASS; - if(s_debug_more) - log_it(L_DEBUG, "... already present in threshold"); - } + dap_chain_atom_verify_res_t ret = s_dap_chain_check_if_event_is_present(PVT(l_dag)->events, &l_event_item->hash) || + s_dap_chain_check_if_event_is_present(PVT(l_dag)->events_treshold, &l_event_item->hash) ? ATOM_PASS : ATOM_ACCEPT; + pthread_rwlock_unlock(l_events_rwlock); // verify hashes and consensus - if(ret == ATOM_ACCEPT){ - ret = s_chain_callback_atom_verify (a_chain, a_atom, a_atom_size); + switch (ret) { + case ATOM_ACCEPT: + ret = s_chain_callback_atom_verify(a_chain, a_atom, a_atom_size); if(s_debug_more) log_it(L_DEBUG, "Verified atom %p: code %d", a_atom, ret); + break; + case ATOM_PASS: + if(s_debug_more) { + log_it(L_DEBUG, "Atom already present"); + DAP_DELETE(l_event_hash_str); + } + DAP_DELETE(l_event_item); + return ret; + default: + break; } - if( ret == ATOM_MOVE_TO_THRESHOLD){ - HASH_ADD(hh, PVT(l_dag)->events_treshold,hash,sizeof (l_event_item->hash), l_event_item); + switch (ret) { + case ATOM_MOVE_TO_THRESHOLD: + pthread_rwlock_wrlock(l_events_rwlock); + HASH_ADD(hh, PVT(l_dag)->events_treshold, hash,sizeof (l_event_item->hash), l_event_item); + pthread_rwlock_unlock(l_events_rwlock); if(s_debug_more) log_it(L_DEBUG, "... added to threshold"); - }else if( ret == ATOM_ACCEPT){ + break; + case ATOM_ACCEPT: { int l_consensus_check = s_dap_chain_add_atom_to_events_table(l_dag, a_chain->ledger, l_event_item); - if(!l_consensus_check){ + //All correct, no matter for result + pthread_rwlock_wrlock(&PVT(l_dag)->events_rwlock); + HASH_ADD(hh, PVT(l_dag)->events,hash,sizeof (l_event_item->hash), l_event_item); + s_dag_events_lasts_process_new_last_event(l_dag, l_event_item); + pthread_rwlock_unlock(&PVT(l_dag)->events_rwlock); + switch (l_consensus_check) { + case 0: if(s_debug_more) log_it(L_DEBUG, "... added"); - }else if (l_consensus_check == DAP_CHAIN_CS_VERIFY_CODE_TX_NO_PREVIOUS){ + break; + case DAP_CHAIN_CS_VERIFY_CODE_TX_NO_PREVIOUS: + pthread_rwlock_wrlock(l_events_rwlock); HASH_ADD(hh, PVT(l_dag)->events_treshold, hash, sizeof(l_event_item->hash), l_event_item); + pthread_rwlock_unlock(l_events_rwlock); if(s_debug_more) log_it(L_DEBUG, "... tresholded"); ret = ATOM_MOVE_TO_THRESHOLD; - }else{ - log_it(L_WARNING, "Atom %s (size %zd) error adding (code %d)", l_event_hash_str,a_atom_size, l_consensus_check); - ret = ATOM_REJECT; + break; + default: + log_it(L_WARNING, "Atom %s (size %zd) error adding (code %d)", l_event_hash_str,a_atom_size, l_consensus_check); + ret = ATOM_REJECT; + break; } } - // will added in callback s_chain_callback_atom_add_from_treshold() - //while(dap_chain_cs_dag_proc_treshold(l_dag, a_chain->ledger)); - pthread_rwlock_unlock( l_events_rwlock ); - - if(ret == ATOM_PASS){ - DAP_DELETE(l_event_item); + break; + default: break; } - - DAP_DELETE(l_event_hash_str); - + if(s_debug_more) + DAP_DELETE(l_event_hash_str); return ret; } @@ -453,11 +461,7 @@ static dap_chain_atom_verify_res_t s_chain_callback_atom_add(dap_chain_t * a_cha static dap_chain_atom_ptr_t s_chain_callback_atom_add_from_treshold(dap_chain_t * a_chain, size_t *a_event_size_out) { dap_chain_cs_dag_t * l_dag = DAP_CHAIN_CS_DAG(a_chain); - pthread_rwlock_t * l_events_rwlock = &PVT(l_dag)->events_rwlock; - - pthread_rwlock_wrlock(l_events_rwlock); dap_chain_cs_dag_event_item_t *l_item = dap_chain_cs_dag_proc_treshold(l_dag, a_chain->ledger); - pthread_rwlock_unlock(l_events_rwlock); // dap_chain_cs_dag_event_calc_size() if(l_item) { if(a_event_size_out) @@ -494,7 +498,11 @@ static size_t s_chain_callback_datums_pool_proc(dap_chain_t * a_chain, dap_chain dap_chain_hash_fast_t * l_hashes = l_hashes_size ?DAP_NEW_Z_SIZE(dap_chain_hash_fast_t, sizeof(dap_chain_hash_fast_t) * l_hashes_size) : NULL; size_t l_hashes_linked = 0; - dap_chain_cell_t *l_cell = NULL; + dap_chain_net_t *l_net = dap_chain_net_by_id(a_chain->net_id); + dap_chain_cell_id_t l_cell_id = { + .uint64 = l_net ? l_net->pub.cell_id.uint64 : 0 + }; + dap_chain_cell_t *l_cell = dap_chain_cell_find_by_id(a_chain, l_cell_id); for (size_t d = 0; d <a_datums_count ; d++){ dap_chain_datum_t * l_datum = a_datums[d]; @@ -554,12 +562,14 @@ static size_t s_chain_callback_datums_pool_proc(dap_chain_t * a_chain, dap_chain dap_chain_cs_dag_event_item_t *l_event_ext_item = NULL; // is_single_line - only one link inside if(!l_dag->is_single_line || !l_hashes_linked){ + pthread_rwlock_rdlock(&PVT(l_dag)->events_rwlock); if( PVT(l_dag)->events_lasts_unlinked && l_hashes_linked < l_hashes_size) { // Take then the first one if any events_lasts are present l_event_ext_item = PVT(l_dag)->events_lasts_unlinked; if(l_hashes) memcpy(&l_hashes[l_hashes_linked], &l_event_ext_item->hash, sizeof(l_event_ext_item->hash)); l_hashes_linked++; } + pthread_rwlock_unlock(&PVT(l_dag)->events_rwlock); } if (l_hashes_linked || s_seed_mode ) { @@ -572,15 +582,7 @@ static size_t s_chain_callback_datums_pool_proc(dap_chain_t * a_chain, dap_chain if (s_chain_callback_atom_add(a_chain, l_event, l_event_size) == ATOM_ACCEPT) { // add events to file if (!l_cell) { - l_cell = dap_chain_cell_create(); - if (!l_cell) { - log_it(L_ERROR, "Insufficient memory"); - continue; - } - dap_chain_net_t *l_net = dap_chain_net_by_id(a_chain->net_id); - l_cell->chain = a_chain; - l_cell->id.uint64 = l_net ? l_net->pub.cell_id.uint64 : 0; - l_cell->file_storage_path = dap_strdup_printf("%0llx.dchaincell", l_cell->id.uint64); + dap_chain_cell_create_fill(a_chain, l_cell_id); } if (dap_chain_cell_file_append(l_cell, l_event, l_event_size ) < 0) { log_it(L_ERROR, "Can't add new event to the file '%s'", l_cell->file_storage_path); @@ -654,7 +656,7 @@ static size_t s_chain_callback_datums_pool_proc(dap_chain_t * a_chain, dap_chain } DAP_DELETE(l_hashes); if (l_cell) { - dap_chain_cell_delete(l_cell); + dap_chain_cell_close(l_cell); } dap_chain_global_db_objs_delete(l_events_round_new, l_events_round_new_size); return l_datum_processed; @@ -690,17 +692,19 @@ static dap_chain_atom_verify_res_t s_chain_callback_atom_verify(dap_chain_t * a_ dap_chain_cs_dag_t * l_dag = DAP_CHAIN_CS_DAG(a_chain); dap_chain_cs_dag_event_t * l_event = (dap_chain_cs_dag_event_t *) a_atom; dap_chain_atom_verify_res_t res = ATOM_ACCEPT; - + pthread_rwlock_t *l_events_rwlock = &PVT(l_dag)->events_rwlock; if(sizeof (l_event->header) >= a_atom_size){ log_it(L_WARNING,"Size of atom is %zd that is equal or less then header %zd",a_atom_size,sizeof (l_event->header)); return ATOM_REJECT; } // Hard accept list if (l_dag->hal) { - dap_chain_hash_fast_t l_event_hash; + dap_chain_hash_fast_t l_event_hash = { }; dap_chain_cs_dag_event_calc_hash(l_event,a_atom_size, &l_event_hash); dap_chain_cs_dag_hal_item_t *l_hash_found = NULL; + pthread_rwlock_rdlock(l_events_rwlock); HASH_FIND(hh, l_dag->hal, &l_event_hash, sizeof(l_event_hash), l_hash_found); + pthread_rwlock_unlock(l_events_rwlock); if (l_hash_found) { return ATOM_ACCEPT; } @@ -735,11 +739,15 @@ static dap_chain_atom_verify_res_t s_chain_callback_atom_verify(dap_chain_t * a_ //chain coherence if (! PVT(l_dag)->events ){ res = ATOM_MOVE_TO_THRESHOLD; - }else{ + log_it(L_DEBUG, "*** event %p goes to threshold", l_event); + } else { + log_it(L_DEBUG, "*** event %p hash count %d",l_event, l_event->header.hash_count); for (size_t i = 0; i< l_event->header.hash_count; i++) { dap_chain_hash_fast_t * l_hash = ((dap_chain_hash_fast_t *) l_event->hashes_n_datum_n_signs) + i; dap_chain_cs_dag_event_item_t * l_event_search = NULL; + pthread_rwlock_rdlock(l_events_rwlock); HASH_FIND(hh, PVT(l_dag)->events ,l_hash ,sizeof (*l_hash), l_event_search); + pthread_rwlock_unlock(l_events_rwlock); if ( l_event_search == NULL ){ char * l_hash_str = dap_chain_hash_fast_to_str_new(l_hash); if(s_debug_more) @@ -818,17 +826,16 @@ int dap_chain_cs_dag_event_verify_hashes_with_treshold(dap_chain_cs_dag_t * a_da //looks like an alternative genesis event return DAP_THRESHOLD_CONFLICTING; } - + dap_dag_threshold_verification_res_t ret; for (size_t i = 0; i< a_event->header.hash_count; i++) { dap_chain_hash_fast_t * l_hash = ((dap_chain_hash_fast_t *) a_event->hashes_n_datum_n_signs) + i; dap_chain_cs_dag_event_item_t * l_event_search = NULL; - HASH_FIND(hh, PVT(a_dag)->events_treshold_conflicted,l_hash ,sizeof (*l_hash), l_event_search); if ( l_event_search ){ //event is linked to event we consider conflicting - return DAP_THRESHOLD_CONFLICTING; + ret = DAP_THRESHOLD_CONFLICTING; + break; } - HASH_FIND(hh, PVT(a_dag)->events ,l_hash ,sizeof (*l_hash), l_event_search); if ( l_event_search == NULL ){ // If not found in events - search in treshhold l_is_events_main_hashes = false; @@ -839,13 +846,13 @@ int dap_chain_cs_dag_event_verify_hashes_with_treshold(dap_chain_cs_dag_t * a_da } } } - if( l_is_events_all_hashes && l_is_events_main_hashes ){ - return DAP_THRESHOLD_OK; - }else if ( ! l_is_events_all_hashes) { - return DAP_THRESHOLD_NO_HASHES; - }else { - return DAP_THRESHOLD_NO_HASHES_IN_MAIN; - } + if (ret == DAP_THRESHOLD_CONFLICTING) + return ret; + return l_is_events_all_hashes ? + l_is_events_main_hashes ? + DAP_THRESHOLD_OK : + DAP_THRESHOLD_NO_HASHES : + DAP_THRESHOLD_NO_HASHES_IN_MAIN; } /** @@ -858,9 +865,12 @@ dap_chain_cs_dag_event_item_t* dap_chain_cs_dag_proc_treshold(dap_chain_cs_dag_t bool res = false; // TODO Process finish treshold. For now - easiest from possible dap_chain_cs_dag_event_item_t * l_event_item = NULL, * l_event_item_tmp = NULL; + pthread_rwlock_wrlock(&PVT(a_dag)->events_rwlock); + // !!! + int l_count = HASH_COUNT(PVT(a_dag)->events_treshold); + log_it(L_DEBUG, "*** %d events in threshold", l_count); HASH_ITER(hh,PVT(a_dag)->events_treshold,l_event_item, l_event_item_tmp){ - dap_chain_cs_dag_event_t * l_event = l_event_item->event; - dap_dag_threshold_verification_res_t ret = dap_chain_cs_dag_event_verify_hashes_with_treshold (a_dag,l_event); + dap_dag_threshold_verification_res_t ret = dap_chain_cs_dag_event_verify_hashes_with_treshold (a_dag, l_event_item->event); if ( ret == DAP_THRESHOLD_OK || ret == DAP_THRESHOLD_CONFLICTING ){ // All its hashes are in main table, move thats one too into it HASH_DEL(PVT(a_dag)->events_treshold,l_event_item); @@ -870,6 +880,8 @@ dap_chain_cs_dag_event_item_t* dap_chain_cs_dag_proc_treshold(dap_chain_cs_dag_t log_it(L_DEBUG, "Processing event (threshold): %s...", l_event_hash_str); int l_add_res = s_dap_chain_add_atom_to_events_table(a_dag, a_ledger, l_event_item); + HASH_ADD(hh, PVT(a_dag)->events,hash,sizeof (l_event_item->hash), l_event_item); + s_dag_events_lasts_process_new_last_event(a_dag, l_event_item); if(! l_add_res){ if(s_debug_more) log_it(L_INFO, "... moved from treshold to main chains", l_event_hash_str); @@ -888,11 +900,8 @@ dap_chain_cs_dag_event_item_t* dap_chain_cs_dag_proc_treshold(dap_chain_cs_dag_t } } - //return res; - if(res){ - return l_event_item; - } - return NULL; + pthread_rwlock_unlock(&PVT(a_dag)->events_rwlock); + return res ? l_event_item : NULL; } /** @@ -941,6 +950,10 @@ static dap_chain_atom_iter_t* s_chain_callback_atom_iter_create(dap_chain_t * a_ { dap_chain_atom_iter_t * l_atom_iter = DAP_NEW_Z(dap_chain_atom_iter_t); l_atom_iter->chain = a_chain; + pthread_rwlock_rdlock(&a_chain->atoms_rwlock); +#ifdef WIN32 + log_it(L_DEBUG, "! Create caller id %d", GetThreadId(GetCurrentThread())); +#endif return l_atom_iter; } @@ -1013,7 +1026,7 @@ static dap_chain_atom_ptr_t* s_chain_callback_atom_iter_get_lasts( dap_chain_ato if ( l_lasts_size > 0 ) { if( a_lasts_size) *a_lasts_size = l_lasts_size; - l_ret = DAP_NEW_Z_SIZE(dap_chain_atom_ptr_t, sizeof(dap_chain_atom_ptr_t *) * l_lasts_size); + l_ret = DAP_NEW_Z_SIZE(dap_chain_atom_ptr_t, sizeof(dap_chain_atom_ptr_t) * l_lasts_size); dap_chain_cs_dag_event_item_t * l_event_item = NULL, *l_event_item_tmp = NULL; size_t i = 0; *a_lasts_size_array = DAP_NEW_Z_SIZE(size_t, sizeof(size_t) * l_lasts_size); @@ -1046,7 +1059,7 @@ static dap_chain_atom_ptr_t* s_chain_callback_atom_iter_get_links( dap_chain_ato dap_chain_cs_dag_event_item_t * l_event_item = (dap_chain_cs_dag_event_item_t *) a_atom_iter->cur_item; if ( l_event->header.hash_count > 0){ dap_chain_atom_ptr_t * l_ret = DAP_NEW_Z_SIZE(dap_chain_atom_ptr_t, - sizeof (dap_chain_atom_ptr_t*) * l_event->header.hash_count ); + sizeof (dap_chain_atom_ptr_t) * l_event->header.hash_count ); if( a_links_size) *a_links_size = l_event->header.hash_count; *a_links_size_array = DAP_NEW_Z_SIZE(size_t, l_event->header.hash_count*sizeof (size_t)); @@ -1055,7 +1068,9 @@ static dap_chain_atom_ptr_t* s_chain_callback_atom_iter_get_links( dap_chain_ato dap_chain_hash_fast_t * l_link_hash = (dap_chain_hash_fast_t *) (l_event->hashes_n_datum_n_signs + i*sizeof(*l_link_hash)); + pthread_rwlock_rdlock(&PVT(l_dag)->events_rwlock); HASH_FIND(hh, PVT(l_dag)->events,l_link_hash,sizeof(*l_link_hash),l_link_item); + pthread_rwlock_unlock(&PVT(l_dag)->events_rwlock); if ( l_link_item ){ l_ret[i] = l_link_item->event; (*a_links_size_array)[i] = l_link_item->event_size; @@ -1089,7 +1104,9 @@ static dap_chain_atom_ptr_t s_chain_callback_atom_iter_find_by_hash(dap_chain_at { dap_chain_cs_dag_t * l_dag = DAP_CHAIN_CS_DAG( a_atom_iter->chain ); dap_chain_cs_dag_event_item_t * l_event_item = NULL; + pthread_rwlock_rdlock(&PVT(l_dag)->events_rwlock); HASH_FIND(hh, PVT(l_dag)->events,a_atom_hash,sizeof(*a_atom_hash),l_event_item); + pthread_rwlock_unlock(&PVT(l_dag)->events_rwlock); if ( l_event_item ){ a_atom_iter->cur_item = l_event_item; a_atom_iter->cur = l_event_item->event; @@ -1108,7 +1125,9 @@ static dap_chain_datum_tx_t* s_chain_callback_atom_iter_find_by_tx_hash(dap_chai { dap_chain_cs_dag_t * l_dag = DAP_CHAIN_CS_DAG( a_chain ); dap_chain_cs_dag_event_item_t * l_event_item = NULL; + pthread_rwlock_rdlock(&PVT(l_dag)->events_rwlock); HASH_FIND(hh, PVT(l_dag)->tx_events,a_atom_hash,sizeof(*a_atom_hash),l_event_item); + pthread_rwlock_unlock(&PVT(l_dag)->events_rwlock); if ( l_event_item ){ dap_chain_datum_t * l_datum = dap_chain_cs_dag_event_get_datum(l_event_item->event, l_event_item->event_size) ; return l_datum ? l_datum->header.data_size ? (dap_chain_datum_tx_t*) l_datum->data : NULL :NULL; @@ -1146,6 +1165,10 @@ static dap_chain_atom_ptr_t s_chain_callback_atom_iter_get_next( dap_chain_atom_ */ static void s_chain_callback_atom_iter_delete(dap_chain_atom_iter_t * a_atom_iter ) { + pthread_rwlock_unlock(&a_atom_iter->chain->atoms_rwlock); +#ifdef WIN32 + log_it(L_DEBUG, "! Delete caller id %d", GetThreadId(GetCurrentThread())); +#endif DAP_DELETE(a_atom_iter); } @@ -1285,12 +1308,14 @@ static int s_cli_dag(int argc, char ** argv, void *arg_func, char **a_str_reply) } // write events to file and delete events from db if(l_list_to_del) { - dap_chain_cell_t *l_cell = dap_chain_cell_create(); + dap_chain_cell_id_t l_cell_id = { + .uint64 = l_net ? l_net->pub.cell_id.uint64 : 0 + }; + dap_chain_cell_t *l_cell = dap_chain_cell_find_by_id(l_chain, l_cell_id); + if (!l_cell) + l_cell = dap_chain_cell_create_fill(l_chain, l_cell_id); if(l_cell) { - l_cell->chain = l_chain; - l_cell->id.uint64 = l_net ? l_net->pub.cell_id.uint64 : 0; - l_cell->file_storage_path = dap_strdup_printf("%0llx.dchaincell", l_cell->id.uint64); - if(!dap_chain_cell_file_update(l_cell)) { + if(dap_chain_cell_file_update(l_cell) > 0) { // delete events from db dap_list_t *l_list_tmp = l_list_to_del; while(l_list_tmp) { @@ -1300,7 +1325,7 @@ static int s_cli_dag(int argc, char ** argv, void *arg_func, char **a_str_reply) } } } - dap_chain_cell_delete(l_cell); + dap_chain_cell_close(l_cell); dap_list_free(l_list_to_del); } @@ -1423,10 +1448,13 @@ static int s_cli_dag(int argc, char ** argv, void *arg_func, char **a_str_reply) ret = 0; }else { dap_chain_cs_dag_event_item_t * l_event_item = NULL; + pthread_rwlock_rdlock(&PVT(l_dag)->events_rwlock); HASH_FIND(hh,PVT(l_dag)->events,&l_event_hash,sizeof(l_event_hash),l_event_item); - + pthread_rwlock_unlock(&PVT(l_dag)->events_rwlock); if ( l_event_item ){ + pthread_rwlock_wrlock(&PVT(l_dag)->events_rwlock); HASH_DELETE(hh, PVT(l_dag)->events, l_event_item); + pthread_rwlock_unlock(&PVT(l_dag)->events_rwlock); if(!dap_strcmp(l_hash_out_type, "hex")) { log_it(L_WARNING, "Dropped event %s from chains! Hope you know what are you doing!", l_event_hash_hex_str); @@ -1473,7 +1501,9 @@ static int s_cli_dag(int argc, char ** argv, void *arg_func, char **a_str_reply) }else if ( strcmp(l_from_events_str,"events_lasts") == 0){ dap_chain_cs_dag_event_item_t * l_event_item = NULL; + pthread_rwlock_rdlock(&PVT(l_dag)->events_rwlock); HASH_FIND(hh,PVT(l_dag)->events_lasts_unlinked,&l_event_hash,sizeof(l_event_hash),l_event_item); + pthread_rwlock_unlock(&PVT(l_dag)->events_rwlock); if ( l_event_item ) l_event = l_event_item->event; else { @@ -1484,7 +1514,9 @@ static int s_cli_dag(int argc, char ** argv, void *arg_func, char **a_str_reply) } }else if ( strcmp(l_from_events_str,"events") == 0){ dap_chain_cs_dag_event_item_t * l_event_item = NULL; + pthread_rwlock_rdlock(&PVT(l_dag)->events_rwlock); HASH_FIND(hh,PVT(l_dag)->events,&l_event_hash,sizeof(l_event_hash),l_event_item); + pthread_rwlock_unlock(&PVT(l_dag)->events_rwlock); if ( l_event_item ) l_event = l_event_item->event; else { @@ -1607,12 +1639,11 @@ static int s_cli_dag(int argc, char ** argv, void *arg_func, char **a_str_reply) dap_string_free(l_str_tmp,false); }else if (l_from_events_str && (strcmp(l_from_events_str,"events") == 0) ){ dap_string_t * l_str_tmp = dap_string_new(NULL); + pthread_rwlock_rdlock(&PVT(l_dag)->events_rwlock); size_t l_events_count = HASH_COUNT(PVT(l_dag)->events); dap_string_append_printf(l_str_tmp,"%s.%s: Have %u events :\n", l_net->pub.name,l_chain->name,l_events_count); dap_chain_cs_dag_event_item_t * l_event_item = NULL,*l_event_item_tmp = NULL; - - pthread_rwlock_rdlock(&PVT(l_dag)->events_rwlock); HASH_ITER(hh,PVT(l_dag)->events,l_event_item, l_event_item_tmp ) { char buf[50]; char * l_event_item_hash_str = dap_chain_hash_fast_to_str_new( &l_event_item->hash); @@ -1622,7 +1653,6 @@ static int s_cli_dag(int argc, char ** argv, void *arg_func, char **a_str_reply) DAP_DELETE(l_event_item_hash_str); } pthread_rwlock_unlock(&PVT(l_dag)->events_rwlock); - dap_chain_node_cli_set_reply_text(a_str_reply, l_str_tmp->str); dap_string_free(l_str_tmp,false); -- GitLab