diff --git a/modules/chain/include/dap_chain.h b/modules/chain/include/dap_chain.h index 96694458347514411f164876956119eb02eeb6a0..ed25e99e7a4e96db0417ecefa426eb8c4e18ae85 100644 --- a/modules/chain/include/dap_chain.h +++ b/modules/chain/include/dap_chain.h @@ -49,6 +49,12 @@ typedef struct dap_chain_atom_iter{ void * _inheritor; } dap_chain_atom_iter_t; +typedef enum dap_chain_atom_verify_res{ + ATOM_ACCEPT, + ATOM_PASS, + ATOM_REJECT, + ATOM_MOVE_TO_THRESHOLD +} dap_chain_atom_verify_res_t; typedef dap_chain_t* (*dap_chain_callback_new_t)(void); @@ -56,7 +62,8 @@ typedef void (*dap_chain_callback_t)(dap_chain_t *); typedef int (*dap_chain_callback_new_cfg_t)(dap_chain_t*, dap_config_t *); typedef void (*dap_chain_callback_ptr_t)(dap_chain_t *, void * ); -typedef int (*dap_chain_callback_atom_t)(dap_chain_t *, dap_chain_atom_ptr_t ); +typedef dap_chain_atom_verify_res_t (*dap_chain_callback_atom_t)(dap_chain_t *, dap_chain_atom_ptr_t ); +typedef dap_chain_atom_verify_res_t (*dap_chain_callback_atom_verify_t)(dap_chain_t *, dap_chain_atom_ptr_t ); typedef int (*dap_chain_callback_atom_size_t)(dap_chain_t *, dap_chain_atom_ptr_t ,size_t); typedef size_t (*dap_chain_callback_atom_get_hdr_size_t)(void); typedef size_t (*dap_chain_callback_atom_hdr_get_size_t)(dap_chain_atom_ptr_t ); @@ -112,7 +119,7 @@ typedef struct dap_chain{ dap_chain_callback_t callback_delete; dap_chain_callback_atom_t callback_atom_add; - dap_chain_callback_atom_t callback_atom_verify; + dap_chain_callback_atom_verify_t callback_atom_verify; dap_chain_datum_callback_datum_pool_proc_add_t callback_datums_pool_proc; dap_chain_datum_callback_datum_pool_proc_add_with_group_t callback_datums_pool_proc_with_group; diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c index 7d18a4d996e72920f61074bcba5f4eb585816ffb..92bd61bb91ee01e28937779ac9abf2ddb295668f 100644 --- a/modules/channel/chain/dap_stream_ch_chain.c +++ b/modules/channel/chain/dap_stream_ch_chain.c @@ -355,9 +355,8 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) if(l_chain_pkt_data_size > 0) { dap_chain_atom_ptr_t l_atom_copy = DAP_CALLOC(1, l_chain_pkt_data_size); memcpy(l_atom_copy, l_chain_pkt->data, l_chain_pkt_data_size); - if(l_chain->callback_atom_add(l_chain, l_atom_copy) == 0 && - dap_chain_has_file_store(l_chain)) { - /* TODO: if atom was ignored (i.e. it's bad or already exists) memory is leaked */ + dap_chain_atom_verify_res_t l_atom_add_res = l_chain->callback_atom_add(l_chain, l_atom_copy); + if(l_atom_add_res == ATOM_ACCEPT && dap_chain_has_file_store(l_chain)) { // append to file dap_chain_cell_id_t l_cell_id; l_cell_id.uint64 = l_chain_pkt->hdr.cell_id.uint64; @@ -373,6 +372,8 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) // delete cell and close file dap_chain_cell_delete(l_cell); } + if(l_atom_add_res == ATOM_PASS) + DAP_DELETE(l_atom_copy); } else { log_it(L_WARNING, "Empty chain packet"); dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, diff --git a/modules/consensus/dag-pos/dap_chain_cs_dag_pos.c b/modules/consensus/dag-pos/dap_chain_cs_dag_pos.c index a9bc51329619874adb7c770098624a7646ed7a93..44be5c56f1fe14ee13efcf88f766a14fbe4d1a48 100644 --- a/modules/consensus/dag-pos/dap_chain_cs_dag_pos.c +++ b/modules/consensus/dag-pos/dap_chain_cs_dag_pos.c @@ -273,7 +273,7 @@ static int s_callback_event_verify(dap_chain_cs_dag_t * a_dag, dap_chain_cs_dag_ } if (! l_is_enough_balance ){ char *l_addr_str = dap_chain_addr_to_str(&l_addr); - log_it(L_WARNING, "Verify of event is false, because bal=0 for addr=%s", l_addr_str); + log_it(L_WARNING, "Verify of event is false, because bal is not enough for addr=%s", l_addr_str); DAP_DELETE(l_addr_str); return 0; //-1; } diff --git a/modules/consensus/none/dap_chain_cs_none.c b/modules/consensus/none/dap_chain_cs_none.c index 5626597d1e2fb19c3a85424fb4179f448dd6ada8..59af3853fdb8f111e211d8e975e235949373d166 100644 --- a/modules/consensus/none/dap_chain_cs_none.c +++ b/modules/consensus/none/dap_chain_cs_none.c @@ -68,8 +68,8 @@ typedef struct dap_chain_gdb_private static int dap_chain_gdb_ledger_load(dap_chain_gdb_t *a_gdb, dap_chain_net_t *a_net); // Atomic element organization callbacks -static int s_chain_callback_atom_add(dap_chain_t * a_chain, dap_chain_atom_ptr_t); // Accept new event in gdb -static int s_chain_callback_atom_verify(dap_chain_t * a_chain, dap_chain_atom_ptr_t); // Verify new event in gdb +static dap_chain_atom_verify_res_t s_chain_callback_atom_add(dap_chain_t * a_chain, dap_chain_atom_ptr_t); // Accept new event in gdb +static dap_chain_atom_verify_res_t s_chain_callback_atom_verify(dap_chain_t * a_chain, dap_chain_atom_ptr_t); // Verify new event in gdb static size_t s_chain_callback_atom_hdr_get_size(dap_chain_atom_ptr_t); // Get gdb event size static size_t s_chain_callback_atom_get_static_hdr_size(void); // Get gdb event header size @@ -322,7 +322,7 @@ static size_t s_chain_callback_datums_pool_proc_with_group(dap_chain_t * a_chain * @param a_datums * @param a_datums_size */ -static int s_chain_callback_atom_add(dap_chain_t * a_chain, dap_chain_atom_ptr_t a_atom) +static dap_chain_atom_verify_res_t s_chain_callback_atom_add(dap_chain_t * a_chain, dap_chain_atom_ptr_t a_atom) { dap_chain_gdb_t * l_gdb = DAP_CHAIN_GDB(a_chain); dap_chain_gdb_private_t *l_gdb_priv = PVT(l_gdb); @@ -344,11 +344,11 @@ static int s_chain_callback_atom_add(dap_chain_t * a_chain, dap_chain_atom_ptr_t // don't save bad transactions to base if(dap_chain_ledger_tx_add(a_chain->ledger, l_tx) != 1) - return -1; + return ATOM_REJECT; //}else // return -2; }break; - default: return -1; + default: return ATOM_REJECT; } dap_chain_gdb_datum_hash_item_t * l_hash_item = DAP_NEW_Z(dap_chain_gdb_datum_hash_item_t); @@ -361,7 +361,7 @@ static int s_chain_callback_atom_add(dap_chain_t * a_chain, dap_chain_atom_ptr_t log_it(L_DEBUG,"Load mode, doesnt save item %s:%s", l_hash_item->key, l_gdb_priv->group_datums); DL_APPEND(l_gdb_priv->hash_items, l_hash_item); - return 0; + return ATOM_ACCEPT; } /** @@ -370,11 +370,11 @@ static int s_chain_callback_atom_add(dap_chain_t * a_chain, dap_chain_atom_ptr_t * @param a_atom * @return */ -static int s_chain_callback_atom_verify(dap_chain_t * a_chain, dap_chain_atom_ptr_t a_atom) +static dap_chain_atom_verify_res_t s_chain_callback_atom_verify(dap_chain_t * a_chain, dap_chain_atom_ptr_t a_atom) { (void) a_chain; (void) a_atom; - return 0; + return ATOM_ACCEPT; } /** diff --git a/modules/type/dag/dap_chain_cs_dag.c b/modules/type/dag/dap_chain_cs_dag.c index 3d76887d71005e5f28bd56e7187fe25d45d2855c..48afa1e8855c846f51d948e5d11639dd6acde486 100644 --- a/modules/type/dag/dap_chain_cs_dag.c +++ b/modules/type/dag/dap_chain_cs_dag.c @@ -77,8 +77,8 @@ typedef struct dap_chain_cs_dag_pvt { #define PVT(a) ((dap_chain_cs_dag_pvt_t *) a->_pvt ) // Atomic element organization callbacks -static int s_chain_callback_atom_add(dap_chain_t * a_chain, dap_chain_atom_ptr_t ); // Accept new event in dag -static int s_chain_callback_atom_verify(dap_chain_t * a_chain, dap_chain_atom_ptr_t ); // Verify new event in dag +static dap_chain_atom_verify_res_t s_chain_callback_atom_add(dap_chain_t * a_chain, dap_chain_atom_ptr_t ); // Accept new event in dag +static dap_chain_atom_verify_res_t s_chain_callback_atom_verify(dap_chain_t * a_chain, dap_chain_atom_ptr_t ); // Verify new event in dag static size_t s_chain_callback_atom_hdr_get_size(dap_chain_atom_ptr_t ); // Get dag event size static size_t s_chain_callback_atom_get_static_hdr_size(void); // Get dag event header size @@ -281,14 +281,29 @@ static int s_dap_chain_add_atom_to_ledger(dap_chain_cs_dag_t * a_dag, dap_ledger } static int s_dap_chain_add_atom_to_events_table(dap_chain_cs_dag_t * a_dag, dap_ledger_t * a_ledger, dap_chain_cs_dag_event_item_t * a_event_item ){ - HASH_ADD(hh, PVT(a_dag)->events,hash,sizeof (a_event_item->hash), a_event_item); - s_dag_events_lasts_process_new_last_event(a_dag, a_event_item); - int res = a_dag->callback_cs_verify(a_dag,a_event_item->event); - if(res == 0) + if(res == 0){ res = s_dap_chain_add_atom_to_ledger(a_dag, a_ledger, a_event_item); + HASH_ADD(hh, PVT(a_dag)->events,hash,sizeof (a_event_item->hash), a_event_item); + s_dag_events_lasts_process_new_last_event(a_dag, a_event_item); + } + + return res; +} + +static bool s_dap_chain_check_if_event_is_present(dap_chain_cs_dag_event_item_t * a_hash_table, const dap_chain_hash_fast_t * hash){ + bool res = false; + dap_chain_cs_dag_event_item_t * l_event_search = NULL; + + if(!a_hash_table) + return false; + + HASH_FIND(hh, a_hash_table, hash, sizeof(*hash), l_event_search); + if ( l_event_search ) + res = true; + return res; } @@ -298,51 +313,62 @@ static int s_dap_chain_add_atom_to_events_table(dap_chain_cs_dag_t * a_dag, dap_ * @param a_atom * @return 0 if verified and added well, otherwise if not */ -static int s_chain_callback_atom_add(dap_chain_t * a_chain, dap_chain_atom_ptr_t a_atom) +static dap_chain_atom_verify_res_t s_chain_callback_atom_add(dap_chain_t * a_chain, dap_chain_atom_ptr_t a_atom) { - bool l_add_to_threshold = false; - int ret = s_chain_callback_atom_verify (a_chain, a_atom); - if ( ret < 0 ){ - log_it(L_WARNING,"Wrong event, can't accept, verification returned %d",ret); - return -1; - }else if( ret > 0){ - l_add_to_threshold = true; - } - + dap_chain_atom_verify_res_t ret = ATOM_ACCEPT; dap_chain_cs_dag_t * l_dag = DAP_CHAIN_CS_DAG(a_chain); dap_chain_cs_dag_event_t * l_event = (dap_chain_cs_dag_event_t *) a_atom; dap_chain_cs_dag_event_item_t * l_event_item = DAP_NEW_Z(dap_chain_cs_dag_event_item_t); + pthread_rwlock_t * l_events_rwlock = &PVT(l_dag)->events_rwlock ; l_event_item->event = l_event; l_event_item->ts_added = time(NULL); + dap_hash_fast(l_event, dap_chain_cs_dag_event_calc_size(l_event),&l_event_item->hash ); + dap_chain_hash_fast_t l_event_hash; + dap_chain_cs_dag_event_calc_hash(l_event,&l_event_hash); + + char * l_event_hash_str = dap_chain_hash_fast_to_str_new(&l_event_item->hash); + log_it(L_DEBUG, "Processing event: %s...", l_event_hash_str); - // Put in main table or in the treshhold if not all the rest linked event are present - dap_chain_cs_dag_event_item_t * l_event_search = NULL; - dap_chain_cs_dag_event_item_t * l_events =( l_add_to_threshold )? PVT(l_dag)->events_treshold : PVT(l_dag)->events ; - pthread_rwlock_t * l_events_rwlock = &PVT(l_dag)->events_rwlock ; pthread_rwlock_wrlock( l_events_rwlock ); - HASH_FIND(hh, l_events,&l_event_item->hash,sizeof (l_event_search->hash), l_event_search); - if ( l_event_search ) { - pthread_rwlock_unlock( l_events_rwlock ); - char * l_hash_str = dap_chain_hash_fast_to_str_new(&l_event_item->hash); - log_it(L_ERROR, "Dag event %s is already present in dag",l_hash_str); - DAP_DELETE(l_event_item); - DAP_DELETE(l_hash_str); - return -3; + + // check if we already have this event + if(s_dap_chain_check_if_event_is_present(PVT(l_dag)->events, &l_event_item->hash)){ + ret = ATOM_PASS; + log_it(L_DEBUG, "... already present in events"); + }else if(s_dap_chain_check_if_event_is_present(PVT(l_dag)->events_treshold, &l_event_item->hash)){ + ret = ATOM_PASS; + log_it(L_DEBUG, "... already present in threshold"); } - int res = 0; - if(l_add_to_threshold){ + // verify hashes and consensus + if(ret == ATOM_ACCEPT) + ret = s_chain_callback_atom_verify (a_chain, a_atom); + + if( ret == ATOM_MOVE_TO_THRESHOLD){ HASH_ADD(hh, PVT(l_dag)->events_treshold,hash,sizeof (l_event_item->hash), l_event_item); - }else{ - res = s_dap_chain_add_atom_to_events_table(l_dag, a_chain->ledger, l_event_item); + log_it(L_DEBUG, "... added to threshold"); + }else if( ret == ATOM_ACCEPT){ + int l_consensus_check = s_dap_chain_add_atom_to_events_table(l_dag, a_chain->ledger, l_event_item); + if(!l_consensus_check){ + log_it(L_DEBUG, "... added"); + }else{ + log_it(L_DEBUG, "... error adding"); + ret = ATOM_REJECT; + } } while(dap_chain_cs_dag_proc_treshold(l_dag, a_chain->ledger)); pthread_rwlock_unlock( l_events_rwlock ); - return res; + if(ret == ATOM_PASS){ + DAP_DELETE(l_event_item); + } + + DAP_DELETE(l_event_hash_str); + + return ret; } /** @@ -542,14 +568,16 @@ dap_chain_cs_dag_event_t* dap_chain_cs_dag_find_event_by_hash(dap_chain_cs_dag_t * @param a_atom * @return */ -static int s_chain_callback_atom_verify(dap_chain_t * a_chain, dap_chain_atom_ptr_t a_atom) +static dap_chain_atom_verify_res_t s_chain_callback_atom_verify(dap_chain_t * a_chain, dap_chain_atom_ptr_t a_atom) { dap_chain_cs_dag_t * l_dag = DAP_CHAIN_CS_DAG(a_chain); dap_chain_cs_dag_event_t * l_event = (dap_chain_cs_dag_event_t *) a_atom; + dap_chain_atom_verify_res_t res = ATOM_ACCEPT; + + // genesis or seed mode if (l_event->header.hash_count == 0){ if(s_seed_mode && !PVT(l_dag)->events) - //starting a new network and this is a genesis event - return 0; + return ATOM_ACCEPT; if (l_dag->is_static_genesis_event ){ dap_chain_hash_fast_t l_event_hash; @@ -561,36 +589,37 @@ static int s_chain_callback_atom_verify(dap_chain_t * a_chain, dap_chain_atom_pt log_it(L_WARNING, "Wrong genesis block %s (staticly predefined %s)",l_event_hash_str, l_genesis_event_hash_str); DAP_DELETE(l_event_hash_str); DAP_DELETE(l_genesis_event_hash_str); - return -22; + return ATOM_REJECT; + }else{ + return ATOM_ACCEPT; } - return 0; } } - int ret = l_dag->callback_cs_verify ( l_dag, l_event ); - if (ret == 0 ){ - if ( PVT(l_dag)->events ){ - for (size_t i = 0; i< l_event->header.hash_count; i++) { - dap_chain_hash_fast_t * l_hash = ((dap_chain_hash_fast_t *) l_event->hashes_n_datum_n_signs) + i; - dap_chain_cs_dag_event_item_t * l_event_search = NULL; - HASH_FIND(hh, PVT(l_dag)->events ,l_hash ,sizeof (*l_hash), l_event_search); - if ( l_event_search == NULL ){ - char * l_hash_str = dap_chain_hash_fast_to_str_new(l_hash); - log_it(L_DEBUG, "Hash %s wasn't in hashtable of previously parsed", l_hash_str); - DAP_DELETE(l_hash_str); - return 1; - } + //chain coherence + if (! PVT(l_dag)->events ){ + res = ATOM_MOVE_TO_THRESHOLD; + }else{ + for (size_t i = 0; i< l_event->header.hash_count; i++) { + dap_chain_hash_fast_t * l_hash = ((dap_chain_hash_fast_t *) l_event->hashes_n_datum_n_signs) + i; + dap_chain_cs_dag_event_item_t * l_event_search = NULL; + HASH_FIND(hh, PVT(l_dag)->events ,l_hash ,sizeof (*l_hash), l_event_search); + if ( l_event_search == NULL ){ + char * l_hash_str = dap_chain_hash_fast_to_str_new(l_hash); + log_it(L_INFO, "Hash %s wasn't in hashtable of previously parsed", l_hash_str); + DAP_DELETE(l_hash_str); + res = ATOM_MOVE_TO_THRESHOLD; + break; } - return 0; - }else{ - //event looks fine but we have no hash table yet and can't verify it's hashes - //so it goes into threshold - return 1; } - - }else { - return ret; } + + //consensus + if(res == ATOM_ACCEPT) + if(l_dag->callback_cs_verify ( l_dag, l_event )) + res = ATOM_REJECT; + + return res; } /** @@ -698,7 +727,17 @@ bool dap_chain_cs_dag_proc_treshold(dap_chain_cs_dag_t * a_dag, dap_ledger_t * a HASH_DEL(PVT(a_dag)->events_treshold,l_event_item); if(ret == DAP_THRESHOLD_OK){ - s_dap_chain_add_atom_to_events_table(a_dag, a_ledger, l_event_item); + char * l_event_hash_str = dap_chain_hash_fast_to_str_new(&l_event_item->hash); + log_it(L_DEBUG, "Processing event (threshold): %s...", l_event_hash_str); + + int l_add_res = s_dap_chain_add_atom_to_events_table(a_dag, a_ledger, l_event_item); + if(! l_add_res){ + log_it(L_DEBUG, "... added", l_event_hash_str); + }else{ + log_it(L_DEBUG, "... error adding", l_event_hash_str); + //todo: delete event + } + DAP_DELETE(l_event_hash_str); res = true; }else if(ret == DAP_THRESHOLD_CONFLICTING) HASH_ADD(hh, PVT(a_dag)->events_treshold_conflicted, hash,sizeof (l_event_item->hash), l_event_item);