diff --git a/modules/chain/dap_chain.c b/modules/chain/dap_chain.c index 33c758c2a4fa0a0b8e6925db3cd452f94690e68d..d2877bcc0a4b7966fd9628327f95a6b44962eeb3 100644 --- a/modules/chain/dap_chain.c +++ b/modules/chain/dap_chain.c @@ -336,10 +336,15 @@ dap_chain_t * dap_chain_load_from_cfg(dap_ledger_t* a_ledger, const char * a_cha if ( dap_config_get_item_str_default(l_cfg , "files","storage_dir",NULL ) ) { DAP_CHAIN_PVT ( l_chain)->file_storage_dir = strdup ( dap_config_get_item_str( l_cfg , "files","storage_dir" ) ) ; - if ( dap_chain_load_all( l_chain ) != 0 ){ + if (dap_chain_load_all(l_chain) == 0) { + if (l_chain->callback_atom_add_from_treshold) { + while (l_chain->callback_atom_add_from_treshold(l_chain, NULL)) { + log_it(L_DEBUG, "Added atom from treshold"); + } + } dap_chain_save_all( l_chain ); log_it (L_NOTICE, "Loaded chain files"); - }else { + } else { dap_chain_save_all( l_chain ); log_it (L_NOTICE, "Initialized chain files"); } diff --git a/modules/chain/dap_chain_cell.c b/modules/chain/dap_chain_cell.c index 20a31982a566d931fc801ba63cee952d8f1eedec..7102fb648ee35eed1eb99dfd59b2e29e6f40d7f2 100644 --- a/modules/chain/dap_chain_cell.c +++ b/modules/chain/dap_chain_cell.c @@ -22,7 +22,7 @@ along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. */ #include "uthash.h" - +#include <unistd.h> #include "dap_common.h" #include "dap_config.h" #include "dap_strfuncs.h" @@ -170,15 +170,15 @@ int dap_chain_cell_load(dap_chain_t * a_chain, const char * a_cell_file_path) } size_t l_el_size = 0; unsigned long q = 0; - volatile dap_chain_cell_t *l_dummy; - for (fread(&l_el_size, 1, sizeof(l_el_size), l_f); !feof(l_f); l_el_size = 0, fread(&l_el_size, 1, sizeof(l_el_size), l_f)) + volatile int l_dummy; + for (l_dummy = fread(&l_el_size, 1, sizeof(l_el_size), l_f); !feof(l_f); l_el_size = 0, l_dummy = fread(&l_el_size, 1, sizeof(l_el_size), l_f)) { if (!l_el_size) { log_it(L_ERROR, "Zero element size, chain %s is corrupted", l_file_path); ret = -4; break; } - dap_chain_atom_ptr_t l_element = DAP_NEW_Z_SIZE(dap_chain_atom_ptr_t, l_el_size); + dap_chain_atom_ptr_t l_element = DAP_NEW_SIZE(dap_chain_atom_ptr_t, l_el_size); if (!l_element) { log_it(L_ERROR, "Out of memory"); ret = -5; @@ -188,7 +188,6 @@ int dap_chain_cell_load(dap_chain_t * a_chain, const char * a_cell_file_path) if(l_read == l_el_size) { a_chain->callback_atom_add(a_chain, l_element, l_el_size); // !!! blocking GDB call !!! ++q; - DAP_DELETE(l_element); } else { log_it(L_ERROR, "Read only %zd of %zd bytes, stop cell loading", l_read, l_el_size); ret = -6; @@ -200,7 +199,7 @@ int dap_chain_cell_load(dap_chain_t * a_chain, const char * a_cell_file_path) log_it(L_INFO, "Couldn't load all atoms, %d only", q); } else { log_it(L_INFO, "Loaded all %d atoms in cell %s", q, a_cell_file_path); - l_dummy = dap_chain_cell_create_fill2(a_chain, a_cell_file_path); + dap_chain_cell_create_fill2(a_chain, a_cell_file_path); } fclose(l_f); return ret; @@ -263,7 +262,10 @@ int dap_chain_cell_file_append( dap_chain_cell_t * a_cell, const void* a_atom, s // if no atom provided in arguments, we flush all the atoms in given chain size_t l_atom_size = a_atom_size ? a_atom_size : 0; int l_total_wrote_bytes = 0; - dap_chain_atom_iter_t *l_atom_iter = a_atom ? a_cell->chain->callback_atom_iter_create(a_cell->chain) : NULL; + dap_chain_atom_iter_t *l_atom_iter = a_atom ? NULL : a_cell->chain->callback_atom_iter_create(a_cell->chain); + if (!a_atom) { + fseek(a_cell->file_storage, sizeof(dap_chain_cell_file_header_t), SEEK_SET); + } for (dap_chain_atom_ptr_t l_atom = a_atom ? (dap_chain_atom_ptr_t)a_atom : a_cell->chain->callback_atom_iter_get_first(l_atom_iter, &l_atom_size); l_atom; l_atom = a_atom ? NULL : a_cell->chain->callback_atom_iter_get_next(l_atom_iter, &l_atom_size)) @@ -293,6 +295,10 @@ int dap_chain_cell_file_append( dap_chain_cell_t * a_cell, const void* a_atom, s (void *)l_atom, l_atom_size); } + if (l_total_wrote_bytes > 0) { + fflush(a_cell->file_storage); + ftruncate(fileno(a_cell->file_storage), l_total_wrote_bytes + sizeof(dap_chain_cell_file_header_t)); + } if (l_atom_iter) { a_cell->chain->callback_atom_iter_delete(l_atom_iter); } diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c index 05a1e9c167fb08fab9f85acc468e653f8484a287..bb60eb3945cfff89f9f527b13e5c1f32ed78eaef 100644 --- a/modules/channel/chain/dap_stream_ch_chain.c +++ b/modules/channel/chain/dap_stream_ch_chain.c @@ -633,22 +633,22 @@ static bool s_gdb_in_pkt_proc_callback(dap_proc_thread_t *a_thread, void *a_arg) log_it(L_WARNING, "In: GLOBAL_DB parse: packet in list with NULL data(pkt_data_size:%zd)", l_pkt_item->pkt_data_size); } - uint64_t l_last_id = 0; - char *l_last_group = NULL; - char l_last_type = '\0'; + uint64_t l_last_id = l_store_obj->id; + char *l_last_group = l_store_obj->group; + char l_last_type = l_store_obj->type; bool l_group_changed = false; for (size_t i = 0; i < l_data_obj_count; i++) { // obj to add dap_store_obj_t *l_obj = l_store_obj + i; - l_group_changed = l_last_group && (strcmp(l_last_group, l_obj->group) || l_last_type != l_obj->type); + l_group_changed = strcmp(l_last_group, l_obj->group) || l_last_type != l_obj->type; // Send remote side notification about received obj if (l_sync_request->request.node_addr.uint64 && (l_group_changed || i == l_data_obj_count - 1)) { struct sync_request *l_sync_req_tsd = DAP_DUP(l_sync_request); l_sync_req_tsd->request.id_end = l_last_id; - l_sync_req_tsd->gdb.sync_group = l_obj->type == 'a' ? dap_strdup(l_obj->group) : - dap_strdup_printf("%s.del", l_obj->group); + l_sync_req_tsd->gdb.sync_group = l_obj->type == 'a' ? dap_strdup(l_last_group) : + dap_strdup_printf("%s.del", l_last_group); dap_proc_thread_worker_exec_callback(a_thread, l_sync_request->worker->id, s_gdb_sync_tsd_worker_callback, l_sync_req_tsd); } @@ -838,6 +838,12 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) l_data_ptr += sizeof(uint64_t); char *l_group = (char *)l_data_ptr; dap_db_set_last_id_remote(l_node_addr, l_last_id, l_group); + if (s_debug_more) { + dap_chain_node_addr_t l_addr; + l_addr.uint64 = l_node_addr; + log_it(L_INFO, "Set last_id %"DAP_UINT64_FORMAT_U" for group %s for node "NODE_ADDR_FP_STR, + l_last_id, l_group, NODE_ADDR_FP_ARGS_S(l_addr)); + } } else if (s_debug_more) log_it(L_DEBUG, "Global DB TSD packet detected"); } break; @@ -873,12 +879,16 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) (size_t) (((byte_t*)l_element) - l_chain_pkt->data ) < l_chain_pkt_data_size; l_element++){ dap_stream_ch_chain_hash_item_t * l_hash_item = NULL; - HASH_FIND(hh,l_ch_chain->remote_gdbs, &l_element->hash, sizeof (l_element->hash), l_hash_item ); - if( ! l_hash_item ){ + unsigned l_hash_item_hashv; + HASH_VALUE(&l_element->hash, sizeof(l_element->hash), l_hash_item_hashv); + HASH_FIND_BYHASHVALUE(hh, l_ch_chain->remote_gdbs, &l_element->hash, sizeof(l_element->hash), + l_hash_item_hashv, l_hash_item); + if (!l_hash_item) { l_hash_item = DAP_NEW(dap_stream_ch_chain_hash_item_t); memcpy(&l_hash_item->hash, &l_element->hash, sizeof (l_element->hash)); l_hash_item->size = l_element->size; - HASH_ADD(hh, l_ch_chain->remote_gdbs, hash, sizeof (l_hash_item->hash), l_hash_item); + HASH_ADD_BYHASHVALUE(hh, l_ch_chain->remote_gdbs, hash, sizeof(l_hash_item->hash), + l_hash_item_hashv, l_hash_item); /*if (s_debug_more){ char l_hash_str[72]={ [0]='\0'}; dap_chain_hash_fast_to_str(&l_hash_item->hash,l_hash_str,sizeof (l_hash_str)); @@ -1077,12 +1087,16 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) (size_t) (((byte_t*)l_element) - l_chain_pkt->data ) < l_chain_pkt_data_size; l_element++){ dap_stream_ch_chain_hash_item_t * l_hash_item = NULL; - HASH_FIND(hh,l_ch_chain->remote_atoms , &l_element->hash, sizeof (l_element->hash), l_hash_item ); + unsigned l_hash_item_hashv; + HASH_VALUE(&l_element->hash, sizeof(l_element->hash), l_hash_item_hashv); + HASH_FIND_BYHASHVALUE(hh, l_ch_chain->remote_atoms, &l_element->hash, sizeof(l_element->hash), + l_hash_item_hashv, l_hash_item); if( ! l_hash_item ){ l_hash_item = DAP_NEW(dap_stream_ch_chain_hash_item_t); memcpy(&l_hash_item->hash, &l_element->hash, sizeof (l_element->hash)); l_hash_item->size = l_element->size; - HASH_ADD(hh, l_ch_chain->remote_atoms, hash, sizeof (l_hash_item->hash), l_hash_item); + HASH_ADD_BYHASHVALUE(hh, l_ch_chain->remote_atoms, hash, sizeof(l_hash_item->hash), + l_hash_item_hashv, l_hash_item); l_count_added++; /* if (s_debug_more){ @@ -1135,7 +1149,6 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) log_it(L_INFO, "In: SYNC_CHAINS pkt"); } struct sync_request *l_sync_request = dap_stream_ch_chain_create_sync_request(l_chain_pkt, a_ch); - l_ch_chain->state = CHAIN_STATE_SYNC_CHAINS; l_ch_chain->stats_request_atoms_processed = 0; if (l_ch_pkt->hdr.type == DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS) { char *l_hash_from_str = dap_chain_hash_fast_to_str_new(&l_ch_chain->request.hash_from); @@ -1314,6 +1327,7 @@ void dap_stream_ch_chain_go_idle ( dap_stream_ch_chain_t * a_ch_chain) HASH_DEL(a_ch_chain->remote_atoms, l_hash_item); DAP_DELETE(l_hash_item); } + a_ch_chain->remote_atoms = a_ch_chain->remote_gdbs = NULL; } /** @@ -1374,7 +1388,10 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) if (!l_obj) break; dap_stream_ch_chain_hash_item_t *l_hash_item = NULL; - HASH_FIND(hh, l_ch_chain->remote_gdbs, &l_obj->hash, sizeof(dap_hash_fast_t), l_hash_item); + unsigned l_hash_item_hashv = 0; + HASH_VALUE(&l_obj->hash, sizeof(dap_chain_hash_fast_t), l_hash_item_hashv); + HASH_FIND_BYHASHVALUE(hh, l_ch_chain->remote_gdbs, &l_obj->hash, sizeof(dap_hash_fast_t), + l_hash_item_hashv, l_hash_item); if (l_hash_item) { // If found - skip it /*if (s_debug_more) { char l_request_atom_hash_str[DAP_CHAIN_HASH_FAST_STR_SIZE]; @@ -1387,7 +1404,8 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) l_hash_item = DAP_NEW(dap_stream_ch_chain_hash_item_t); memcpy(&l_hash_item->hash, &l_obj->hash, sizeof(dap_chain_hash_fast_t)); l_hash_item->size = l_obj->pkt->data_size; - HASH_ADD(hh, l_ch_chain->remote_gdbs, hash, sizeof(dap_chain_hash_fast_t), l_hash_item); + HASH_ADD_BYHASHVALUE(hh, l_ch_chain->remote_gdbs, hash, sizeof(dap_chain_hash_fast_t), + l_hash_item_hashv, l_hash_item); l_pkt = dap_store_packet_multiple(l_pkt, l_obj->pkt); l_ch_chain->stats_request_gdb_processed++; l_pkt_size = sizeof(dap_store_obj_pkt_t) + l_pkt->data_size; @@ -1470,8 +1488,11 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) l_ch_chain->request_atom_iter && l_ch_chain->request_atom_iter->cur; k++){ // Check if present and skip if present - dap_stream_ch_chain_hash_item_t * l_hash_item = NULL; - HASH_FIND(hh,l_ch_chain->remote_atoms, l_ch_chain->request_atom_iter->cur_hash , sizeof (l_hash_item->hash), l_hash_item ); + dap_stream_ch_chain_hash_item_t *l_hash_item = NULL; + unsigned l_hash_item_hashv = 0; + HASH_VALUE(l_ch_chain->request_atom_iter->cur_hash, sizeof(dap_chain_hash_fast_t), l_hash_item_hashv); + HASH_FIND_BYHASHVALUE(hh, l_ch_chain->remote_atoms, l_ch_chain->request_atom_iter->cur_hash, + sizeof(dap_chain_hash_fast_t), l_hash_item_hashv, l_hash_item); if( l_hash_item ){ // If found - skip it if(s_debug_more){ char l_request_atom_hash_str[81]={[0]='\0'}; @@ -1480,9 +1501,8 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) l_request_atom_hash_str); } }else{ - l_hash_item = DAP_NEW_Z(dap_stream_ch_chain_hash_item_t); - dap_hash_fast(l_ch_chain->request_atom_iter->cur, l_ch_chain->request_atom_iter->cur_size, - &l_hash_item->hash); + l_hash_item = DAP_NEW(dap_stream_ch_chain_hash_item_t); + memcpy(&l_hash_item->hash, l_ch_chain->request_atom_iter->cur_hash, sizeof(dap_chain_hash_fast_t)); if(s_debug_more){ char *l_atom_hash_str= dap_chain_hash_fast_to_str_new(&l_hash_item->hash); log_it(L_INFO, "Out CHAIN pkt: atom hash %s (size %zd) ", l_atom_hash_str, l_ch_chain->request_atom_iter->cur_size); @@ -1495,21 +1515,9 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) l_ch_chain->stats_request_atoms_processed++; l_hash_item->size = l_ch_chain->request_atom_iter->cur_size; - - unsigned l_hash_item_hashv =0; - dap_stream_ch_chain_hash_item_t *l_hash_item_check = NULL; - - HASH_VALUE(&l_hash_item->hash ,sizeof (l_hash_item->hash), - l_hash_item_hashv); - HASH_FIND_BYHASHVALUE(hh, l_ch_chain->remote_atoms,&l_hash_item->hash ,sizeof (l_hash_item->hash), - l_hash_item_hashv, l_hash_item_check); - if (l_hash_item_check ==NULL ){ - // Because we sent this atom to remote - we record it to not to send it twice - HASH_ADD_BYHASHVALUE(hh, l_ch_chain->remote_atoms, hash, sizeof (l_hash_item->hash),l_hash_item_hashv, - l_hash_item); - }else - DAP_DELETE(l_hash_item); - + // Because we sent this atom to remote - we record it to not to send it twice + HASH_ADD_BYHASHVALUE(hh, l_ch_chain->remote_atoms, hash, sizeof(l_hash_item->hash), l_hash_item_hashv, + l_hash_item); } // Then get next atom and populate new last l_ch_chain->request_atom_iter->chain->callback_atom_iter_get_next(l_ch_chain->request_atom_iter, NULL); diff --git a/modules/global-db/dap_chain_global_db_hist.c b/modules/global-db/dap_chain_global_db_hist.c index a362e52f6bd972ba74140658f73347d44b113b9a..66a46cca55fa61f821a68b7150af9448263f9635 100644 --- a/modules/global-db/dap_chain_global_db_hist.c +++ b/modules/global-db/dap_chain_global_db_hist.c @@ -12,7 +12,7 @@ #include "dap_chain_global_db_hist.h" #include "uthash.h" -#define GDB_SYNC_ALWAYS_FROM_ZERO +//#define GDB_SYNC_ALWAYS_FROM_ZERO // for dap_db_history() typedef struct dap_tx_data{ diff --git a/modules/type/dag/dap_chain_cs_dag.c b/modules/type/dag/dap_chain_cs_dag.c index b0872587527a162f6a978f55070f89969df67c60..d26336f27840ef42ec10071eb3c8982134b93820 100644 --- a/modules/type/dag/dap_chain_cs_dag.c +++ b/modules/type/dag/dap_chain_cs_dag.c @@ -376,9 +376,9 @@ static dap_chain_atom_verify_res_t s_chain_callback_atom_add(dap_chain_t * a_cha l_event_item->event_size = a_atom_size; l_event_item->ts_added = time(NULL); - dap_hash_fast(l_event, a_atom_size,&l_event_item->hash ); dap_chain_hash_fast_t l_event_hash; - dap_chain_cs_dag_event_calc_hash(l_event, a_atom_size,&l_event_hash); + dap_chain_cs_dag_event_calc_hash(l_event, a_atom_size, &l_event_hash); + memcpy(&l_event_item->hash, &l_event_hash, sizeof(dap_chain_hash_fast_t)); char * l_event_hash_str; if(s_debug_more) { @@ -439,7 +439,8 @@ static dap_chain_atom_verify_res_t s_chain_callback_atom_add(dap_chain_t * a_cha ret = ATOM_MOVE_TO_THRESHOLD; break; default: - log_it(L_WARNING, "Atom %s (size %zd) error adding (code %d)", l_event_hash_str,a_atom_size, l_consensus_check); + l_event_hash_str = dap_chain_hash_fast_to_str_new(&l_event_item->hash); + log_it(L_WARNING, "Atom %s (size %zd) error adding (code %d)", l_event_hash_str, a_atom_size, l_consensus_check); ret = ATOM_REJECT; break; } @@ -730,7 +731,9 @@ static dap_chain_atom_verify_res_t s_chain_callback_atom_verify(dap_chain_t * a_ DAP_DELETE(l_event_hash_str); DAP_DELETE(l_genesis_event_hash_str); return ATOM_REJECT; - }else{ + } else { + if (s_debug_more) + log_it(L_INFO, "Accepting static genesis event"); return ATOM_ACCEPT; } } @@ -739,9 +742,9 @@ static dap_chain_atom_verify_res_t s_chain_callback_atom_verify(dap_chain_t * a_ //chain coherence if (! PVT(l_dag)->events ){ res = ATOM_MOVE_TO_THRESHOLD; - log_it(L_DEBUG, "*** event %p goes to threshold", l_event); + //log_it(L_DEBUG, "*** event %p goes to threshold", l_event); } else { - log_it(L_DEBUG, "*** event %p hash count %d",l_event, l_event->header.hash_count); + //log_it(L_DEBUG, "*** event %p hash count %d",l_event, l_event->header.hash_count); for (size_t i = 0; i< l_event->header.hash_count; i++) { dap_chain_hash_fast_t * l_hash = ((dap_chain_hash_fast_t *) l_event->hashes_n_datum_n_signs) + i; dap_chain_cs_dag_event_item_t * l_event_search = NULL;