From 4995f114553ef83f278540e46358ea7b3443a02c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Al=D0=B5x=D0=B0nder=20Lysik=D0=BEv?= <alexander.lysikov@demlabs.net> Date: Fri, 25 Sep 2020 23:18:19 +0500 Subject: [PATCH] fixed save events from treshold to file --- modules/chain/include/dap_chain.h | 2 + modules/channel/chain/dap_stream_ch_chain.c | 43 ++++++++++--- modules/consensus/none/dap_chain_cs_none.c | 1 - modules/type/dag/dap_chain_cs_dag.c | 62 +++++++++++++++++-- modules/type/dag/include/dap_chain_cs_dag.h | 2 +- .../type/dag/include/dap_chain_cs_dag_event.h | 2 +- 6 files changed, 95 insertions(+), 17 deletions(-) diff --git a/modules/chain/include/dap_chain.h b/modules/chain/include/dap_chain.h index 86f4782d85..fc03b2f72d 100644 --- a/modules/chain/include/dap_chain.h +++ b/modules/chain/include/dap_chain.h @@ -64,6 +64,7 @@ typedef int (*dap_chain_callback_new_cfg_t)(dap_chain_t*, dap_config_t *); typedef void (*dap_chain_callback_ptr_t)(dap_chain_t *, void * ); typedef dap_chain_atom_verify_res_t (*dap_chain_callback_atom_t)(dap_chain_t *, dap_chain_atom_ptr_t, size_t ); +typedef dap_chain_atom_ptr_t (*dap_chain_callback_atom_form_treshold_t)(dap_chain_t *, size_t *); typedef dap_chain_atom_verify_res_t (*dap_chain_callback_atom_verify_t)(dap_chain_t *, dap_chain_atom_ptr_t , size_t); typedef size_t (*dap_chain_callback_atom_get_hdr_size_t)(void); @@ -119,6 +120,7 @@ typedef struct dap_chain{ dap_chain_callback_t callback_delete; dap_chain_callback_atom_t callback_atom_add; + dap_chain_callback_atom_form_treshold_t callback_atom_add_from_treshold; dap_chain_callback_atom_verify_t callback_atom_verify; dap_chain_datum_callback_datum_pool_proc_add_t callback_datums_pool_proc; diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c index c38416f2ae..27e9d17d4c 100644 --- a/modules/channel/chain/dap_stream_ch_chain.c +++ b/modules/channel/chain/dap_stream_ch_chain.c @@ -232,16 +232,41 @@ bool s_chain_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) if(l_atom_add_res == ATOM_ACCEPT && dap_chain_has_file_store(l_chain)) { // append to file dap_chain_cell_t *l_cell = dap_chain_cell_create_fill(l_chain, l_ch_chain->request_cell_id); - // add one atom only - int l_res = dap_chain_cell_file_append(l_cell, l_atom_copy, l_atom_copy_size); - // rewrite all file - //l_res = dap_chain_cell_file_update(l_cell); - if(!l_cell || l_res < 0) { - log_it(L_ERROR, "Can't save event 0x%x to the file '%s'", l_atom_hash, - l_cell ? l_cell->file_storage_path : "[null]"); + if(l_cell){ + // add one atom only + int l_res = dap_chain_cell_file_append(l_cell, l_atom_copy, l_atom_copy_size); + // rewrite all file + //l_res = dap_chain_cell_file_update(l_cell); + if(l_res < 0) { + log_it(L_ERROR, "Can't save event 0x%x to the file '%s'", l_atom_hash, + l_cell ? l_cell->file_storage_path : "[null]"); + } + // add all atoms from treshold + if(l_res && l_chain->callback_atom_add_from_treshold){ + dap_chain_atom_ptr_t l_atom_treshold; + do{ + size_t l_atom_treshold_size; + // add into ledger + l_atom_treshold = l_chain->callback_atom_add_from_treshold(l_chain, &l_atom_treshold_size); + // add into file + if(l_atom_treshold) { + int l_res = dap_chain_cell_file_append(l_cell, l_atom_treshold, l_atom_treshold_size); + if(l_res < 0) { + log_it(L_ERROR, "Can't save event 0x%x from treshold to the file '%s'", + l_atom_treshold, l_cell ? l_cell->file_storage_path : "[null]"); + } + } + } + while(l_atom_treshold); + } + + // delete cell and close file + dap_chain_cell_delete(l_cell); + } + else{ + log_it(L_ERROR, "Can't get cell for cell_id 0x%x for save event to file", l_ch_chain->request_cell_id); + } - // delete cell and close file - dap_chain_cell_delete(l_cell); } if(l_atom_add_res == ATOM_PASS) DAP_DELETE(l_atom_copy); diff --git a/modules/consensus/none/dap_chain_cs_none.c b/modules/consensus/none/dap_chain_cs_none.c index f68ce988c1..82f54c1fcb 100644 --- a/modules/consensus/none/dap_chain_cs_none.c +++ b/modules/consensus/none/dap_chain_cs_none.c @@ -273,7 +273,6 @@ static int dap_chain_gdb_ledger_load(dap_chain_gdb_t *a_gdb, dap_chain_net_t *a_ // make list of datums for(size_t i = 0; i < l_data_size; i++) { s_chain_callback_atom_add(a_gdb->chain,data[i].value, data[i].value_len); - } dap_chain_global_db_objs_delete(data, l_data_size); return 0; diff --git a/modules/type/dag/dap_chain_cs_dag.c b/modules/type/dag/dap_chain_cs_dag.c index 6385aacf84..cabeb2ae2b 100644 --- a/modules/type/dag/dap_chain_cs_dag.c +++ b/modules/type/dag/dap_chain_cs_dag.c @@ -78,8 +78,11 @@ typedef struct dap_chain_cs_dag_pvt { #define PVT(a) ((dap_chain_cs_dag_pvt_t *) a->_pvt ) +dap_chain_cs_dag_event_item_t* dap_chain_cs_dag_proc_treshold(dap_chain_cs_dag_t * a_dag, dap_ledger_t * a_ledger); + // Atomic element organization callbacks static dap_chain_atom_verify_res_t s_chain_callback_atom_add(dap_chain_t * a_chain, dap_chain_atom_ptr_t , size_t); // Accept new event in dag +static dap_chain_atom_ptr_t s_chain_callback_atom_add_from_treshold(dap_chain_t * a_chain, size_t *a_event_size_out); // Accept new event in dag from treshold static dap_chain_atom_verify_res_t s_chain_callback_atom_verify(dap_chain_t * a_chain, dap_chain_atom_ptr_t , size_t); // Verify new event in dag static size_t s_chain_callback_atom_get_static_hdr_size(void); // Get dag event header size @@ -171,6 +174,7 @@ int dap_chain_cs_dag_new(dap_chain_t * a_chain, dap_config_t * a_chain_cfg) // Atom element callbacks a_chain->callback_atom_add = s_chain_callback_atom_add ; // Accept new element in chain + a_chain->callback_atom_add_from_treshold = s_chain_callback_atom_add_from_treshold; // Accept new elements in chain from treshold a_chain->callback_atom_verify = s_chain_callback_atom_verify ; // Verify new element in chain a_chain->callback_atom_get_hdr_static_size = s_chain_callback_atom_get_static_hdr_size; // Get dag event hdr size @@ -369,8 +373,8 @@ static dap_chain_atom_verify_res_t s_chain_callback_atom_add(dap_chain_t * a_cha ret = ATOM_REJECT; } } - - while(dap_chain_cs_dag_proc_treshold(l_dag, a_chain->ledger)); + // will added in callback s_chain_callback_atom_add_from_treshold() + //while(dap_chain_cs_dag_proc_treshold(l_dag, a_chain->ledger)); pthread_rwlock_unlock( l_events_rwlock ); if(ret == ATOM_PASS){ @@ -382,6 +386,29 @@ static dap_chain_atom_verify_res_t s_chain_callback_atom_add(dap_chain_t * a_cha return ret; } + +/** + * @brief s_chain_callback_atom_add_from_treshold Accept new event in dag + * @param a_chain DAG object + * @return true if added one item, otherwise false + */ +static dap_chain_atom_ptr_t s_chain_callback_atom_add_from_treshold(dap_chain_t * a_chain, size_t *a_event_size_out) +{ + dap_chain_cs_dag_t * l_dag = DAP_CHAIN_CS_DAG(a_chain); + pthread_rwlock_t * l_events_rwlock = &PVT(l_dag)->events_rwlock; + + pthread_rwlock_wrlock(l_events_rwlock); + dap_chain_cs_dag_event_item_t *l_item = dap_chain_cs_dag_proc_treshold(l_dag, a_chain->ledger); + pthread_rwlock_unlock(l_events_rwlock); +// dap_chain_cs_dag_event_calc_size() + if(l_item) { + if(a_event_size_out) + *a_event_size_out = l_item->event_size; + return l_item->event; + } + return NULL; +} + /** * @brief s_chain_callback_datums_add * @param a_chain @@ -500,6 +527,24 @@ static size_t s_chain_callback_datums_pool_proc(dap_chain_t * a_chain, dap_chain log_it(L_ERROR, "Can't add new event to the file '%s'", l_cell->file_storage_path); continue; } + // add all atoms from treshold + { + dap_chain_atom_ptr_t l_atom_treshold; + do { + size_t l_atom_treshold_size; + // add in ledger + l_atom_treshold = s_chain_callback_atom_add_from_treshold(a_chain, &l_atom_treshold_size); + // add into file + if(l_atom_treshold) { + int l_res = dap_chain_cell_file_append(l_cell, l_atom_treshold, l_atom_treshold_size); + if(l_res < 0) { + log_it(L_ERROR, "Can't save event 0x%x from treshold to the file '%s'", + l_atom_treshold, l_cell ? l_cell->file_storage_path : "[null]"); + } + } + } + while(l_atom_treshold); + } l_datum_processed++; } else { @@ -737,7 +782,7 @@ int dap_chain_cs_dag_event_verify_hashes_with_treshold(dap_chain_cs_dag_t * a_da * @param a_dag * @returns true if some atoms were moved from threshold to events */ -bool dap_chain_cs_dag_proc_treshold(dap_chain_cs_dag_t * a_dag, dap_ledger_t * a_ledger) +dap_chain_cs_dag_event_item_t* dap_chain_cs_dag_proc_treshold(dap_chain_cs_dag_t * a_dag, dap_ledger_t * a_ledger) { bool res = false; // TODO Process finish treshold. For now - easiest from possible @@ -755,18 +800,25 @@ bool dap_chain_cs_dag_proc_treshold(dap_chain_cs_dag_t * a_dag, dap_ledger_t * a int l_add_res = s_dap_chain_add_atom_to_events_table(a_dag, a_ledger, l_event_item); if(! l_add_res){ log_it(L_DEBUG, "... added", l_event_hash_str); + DAP_DELETE(l_event_hash_str); + res = true; + break; }else{ log_it(L_DEBUG, "... error adding", l_event_hash_str); //todo: delete event } DAP_DELETE(l_event_hash_str); - res = true; + //res = true; }else if(ret == DAP_THRESHOLD_CONFLICTING) HASH_ADD(hh, PVT(a_dag)->events_treshold_conflicted, hash,sizeof (l_event_item->hash), l_event_item); } } - return res; + //return res; + if(res){ + return l_event_item; + } + return NULL; } /** diff --git a/modules/type/dag/include/dap_chain_cs_dag.h b/modules/type/dag/include/dap_chain_cs_dag.h index 6befe38ae3..fab0c32016 100644 --- a/modules/type/dag/include/dap_chain_cs_dag.h +++ b/modules/type/dag/include/dap_chain_cs_dag.h @@ -65,7 +65,7 @@ void dap_chain_cs_dag_deinit(void); int dap_chain_cs_dag_new(dap_chain_t * a_chain, dap_config_t * a_chain_cfg); void dap_chain_cs_dag_delete(dap_chain_t * a_chain); -bool dap_chain_cs_dag_proc_treshold(dap_chain_cs_dag_t * a_dag, dap_ledger_t * a_ledger); +//dap_chain_cs_dag_event_item_t* dap_chain_cs_dag_proc_treshold(dap_chain_cs_dag_t * a_dag, dap_ledger_t * a_ledger); void dap_chain_cs_dag_proc_event_round_new(dap_chain_cs_dag_t *a_dag); dap_chain_cs_dag_event_t* dap_chain_cs_dag_find_event_by_hash(dap_chain_cs_dag_t * a_dag, diff --git a/modules/type/dag/include/dap_chain_cs_dag_event.h b/modules/type/dag/include/dap_chain_cs_dag_event.h index 5f3661be72..02203ef787 100644 --- a/modules/type/dag/include/dap_chain_cs_dag_event.h +++ b/modules/type/dag/include/dap_chain_cs_dag_event.h @@ -74,7 +74,6 @@ dap_sign_t * dap_chain_cs_dag_event_get_sign( dap_chain_cs_dag_event_t * a_event * @param a_event * @return */ - /** static inline size_t dap_chain_cs_dag_event_calc_size(dap_chain_cs_dag_event_t * a_event) { @@ -96,6 +95,7 @@ static inline size_t dap_chain_cs_dag_event_calc_size(dap_chain_cs_dag_event_t * return sizeof( a_event->header ) + l_hashes_size +l_signs_offset +l_datum_size; } **/ + /** * @brief dap_chain_cs_dag_event_calc_size_excl_signs * @param a_event -- GitLab