diff --git a/modules/type/dag/dap_chain_cs_dag.c b/modules/type/dag/dap_chain_cs_dag.c index ec046f6a602dccff81b96547f607339943f03fd8..cb1ef077b3937f150dbdc3594f9a4141ee0368cc 100644 --- a/modules/type/dag/dap_chain_cs_dag.c +++ b/modules/type/dag/dap_chain_cs_dag.c @@ -244,6 +244,40 @@ void dap_chain_cs_dag_delete(dap_chain_t * a_chain) DAP_DELETE(l_dag->_pvt); } +int dap_chain_add_to_ledger(dap_chain_cs_dag_t * a_dag, dap_ledger_t * a_ledger, dap_chain_cs_dag_event_item_t * a_event_item){ + + dap_chain_datum_t *l_datum = (dap_chain_datum_t*) dap_chain_cs_dag_event_get_datum(a_event_item->event); + switch (l_datum->header.type_id) { + case DAP_CHAIN_DATUM_TOKEN_DECL: { + dap_chain_datum_token_t *l_token = (dap_chain_datum_token_t*) l_datum->data; + dap_chain_ledger_token_add(a_ledger, l_token, l_datum->header.data_size); + } + break; + case DAP_CHAIN_DATUM_TOKEN_EMISSION: { + dap_chain_datum_token_emission_t *l_token_emission = (dap_chain_datum_token_emission_t*) l_datum->data; + dap_chain_ledger_token_emission_add(a_ledger, l_token_emission, l_datum->header.data_size); + } + break; + case DAP_CHAIN_DATUM_TX: { + dap_chain_datum_tx_t *l_tx = (dap_chain_datum_tx_t*) l_datum->data; + dap_chain_cs_dag_event_item_t * l_tx_event= DAP_NEW_Z(dap_chain_cs_dag_event_item_t); + l_tx_event->ts_added = a_event_item->ts_added; + l_tx_event->event = a_event_item->event; + memcpy(&l_tx_event->hash, &a_event_item->hash, sizeof (l_tx_event->hash) ); + + HASH_ADD(hh,PVT(a_dag)->tx_events,hash,sizeof (l_tx_event->hash),l_tx_event); + + // don't save bad transactions to base + if(dap_chain_ledger_tx_add(a_ledger, l_tx) != 1) { + return -1; + } + } + break; + default: + return -1; + } +} + /** * @brief s_chain_callback_atom_add Accept new event in dag * @param a_chain DAG object @@ -252,20 +286,18 @@ void dap_chain_cs_dag_delete(dap_chain_t * a_chain) */ static int s_chain_callback_atom_add(dap_chain_t * a_chain, dap_chain_atom_ptr_t a_atom) { + bool l_add_to_threshold = false; int ret = s_chain_callback_atom_verify (a_chain, a_atom); if ( ret < 0 ){ log_it(L_WARNING,"Wrong event, can't accept, verification returned %d",ret); return -1; + }else if( ret > 0){ + l_add_to_threshold = true; } + dap_chain_cs_dag_t * l_dag = DAP_CHAIN_CS_DAG(a_chain); dap_chain_cs_dag_event_t * l_event = (dap_chain_cs_dag_event_t *) a_atom; - // verification was already in s_chain_callback_atom_verify() - int ret_cs = l_dag->callback_cs_verify(l_dag,l_event); - if ( ret_cs != 0 ){ - log_it(L_WARNING,"Consensus can't accept the event, verification returned %d",ret_cs); - return -2; - } dap_chain_cs_dag_event_item_t * l_event_item = DAP_NEW_Z(dap_chain_cs_dag_event_item_t); l_event_item->event = l_event; l_event_item->ts_added = time(NULL); @@ -273,7 +305,7 @@ static int s_chain_callback_atom_add(dap_chain_t * a_chain, dap_chain_atom_ptr_t // Put in main table or in the treshhold if not all the rest linked event are present dap_chain_cs_dag_event_item_t * l_event_search = NULL; - dap_chain_cs_dag_event_item_t * l_events =( (ret==0 && ret_cs == 0)? PVT(l_dag)->events : PVT(l_dag)->events_treshold ); + dap_chain_cs_dag_event_item_t * l_events =( l_add_to_threshold )? PVT(l_dag)->events_treshold : PVT(l_dag)->events ; pthread_rwlock_t * l_events_rwlock = &PVT(l_dag)->events_rwlock ; pthread_rwlock_wrlock( l_events_rwlock ); HASH_FIND(hh, l_events,&l_event_item->hash,sizeof (l_event_search->hash), l_event_search); @@ -287,10 +319,11 @@ static int s_chain_callback_atom_add(dap_chain_t * a_chain, dap_chain_atom_ptr_t } HASH_ADD(hh, l_events,hash,sizeof (l_event_item->hash), l_event_item); // save l_events to dag_pvt - if(ret==0 && ret_cs == 0) - PVT(l_dag)->events = l_events; - else + if(l_add_to_threshold) PVT(l_dag)->events_treshold = l_events; + else + PVT(l_dag)->events = l_events; + //HASH_ADD(hh, PVT(l_dag)->events_treshold, hash, sizeof(l_event_item->hash), l_event_item); pthread_rwlock_unlock( l_events_rwlock ); if ( l_events == PVT(l_dag)->events){ @@ -320,47 +353,23 @@ static int s_chain_callback_atom_add(dap_chain_t * a_chain, dap_chain_atom_ptr_t pthread_rwlock_unlock(&PVT(l_dag)->events_rwlock); } - // add datum from event to ledger - dap_chain_datum_t *l_datum = (dap_chain_datum_t*) dap_chain_cs_dag_event_get_datum(l_event); - switch (l_datum->header.type_id) { - case DAP_CHAIN_DATUM_TOKEN_DECL: { - dap_chain_datum_token_t *l_token = (dap_chain_datum_token_t*) l_datum->data; - dap_chain_ledger_token_add(a_chain->ledger, l_token, l_datum->header.data_size); - } - break; - case DAP_CHAIN_DATUM_TOKEN_EMISSION: { - dap_chain_datum_token_emission_t *l_token_emission = (dap_chain_datum_token_emission_t*) l_datum->data; - dap_chain_ledger_token_emission_add(a_chain->ledger, l_token_emission, l_datum->header.data_size); - } - break; - case DAP_CHAIN_DATUM_TX: { - dap_chain_datum_tx_t *l_tx = (dap_chain_datum_tx_t*) l_datum->data; - dap_chain_cs_dag_event_item_t * l_tx_event= DAP_NEW_Z(dap_chain_cs_dag_event_item_t); - l_tx_event->ts_added = l_event_item->ts_added; - l_tx_event->event = l_event; - memcpy(&l_tx_event->hash, &l_event_item->hash, sizeof (l_tx_event->hash) ); - pthread_rwlock_wrlock(&PVT(l_dag)->events_rwlock); - HASH_ADD(hh,PVT(l_dag)->tx_events,hash,sizeof (l_tx_event->hash),l_tx_event); - pthread_rwlock_unlock(&PVT(l_dag)->events_rwlock); - - //if ( !l_gdb_priv->is_load_mode ) // If its not load module but mempool proc - // l_tx->header.ts_created = time(NULL); - //if(dap_chain_datum_tx_get_size(l_tx) == l_datum->header.data_size){ + if(!l_add_to_threshold){ + int ret_cs = l_dag->callback_cs_verify(l_dag,l_event); + if ( ret_cs != 0 ){ + log_it(L_WARNING,"Consensus can't accept the event, verification returned %d",ret_cs); + return -2; + } + pthread_rwlock_wrlock(&PVT(l_dag)->events_rwlock); + int res_ledger = dap_chain_add_to_ledger(l_dag, a_chain->ledger, l_event_item); + pthread_rwlock_unlock(&PVT(l_dag)->events_rwlock); - // don't save bad transactions to base - if(dap_chain_ledger_tx_add(a_chain->ledger, l_tx) != 1) { - return -1; - } - //}else - // return -2; - } - break; - default: - return -1; + if(res_ledger < 0) + return res_ledger; } + // Now check the treshold if some events now are ready to move to the main table pthread_rwlock_wrlock(&PVT(l_dag)->events_rwlock); - while(dap_chain_cs_dag_proc_treshold(l_dag)); + while(dap_chain_cs_dag_proc_treshold(l_dag, a_chain->ledger)); pthread_rwlock_unlock(&PVT(l_dag)->events_rwlock); return 0; @@ -697,7 +706,7 @@ int dap_chain_cs_dag_event_verify_hashes_with_treshold(dap_chain_cs_dag_t * a_da * @param a_dag * @returns true if some atoms were moved from threshold to events */ -bool dap_chain_cs_dag_proc_treshold(dap_chain_cs_dag_t * a_dag) +bool dap_chain_cs_dag_proc_treshold(dap_chain_cs_dag_t * a_dag, dap_ledger_t * a_ledger) { bool res = false; // TODO Process finish treshold. For now - easiest from possible @@ -710,6 +719,7 @@ bool dap_chain_cs_dag_proc_treshold(dap_chain_cs_dag_t * a_dag) if(ret == DAP_THRESHOLD_OK){ HASH_ADD(hh, PVT(a_dag)->events, hash,sizeof (l_event_item->hash), l_event_item); + int l_ledger_ret = dap_chain_add_to_ledger(a_dag, a_ledger, l_event_item); res = true; }else if(ret == DAP_THRESHOLD_CONFLICTING) HASH_ADD(hh, PVT(a_dag)->events_treshold_conflicted, hash,sizeof (l_event_item->hash), l_event_item); diff --git a/modules/type/dag/include/dap_chain_cs_dag.h b/modules/type/dag/include/dap_chain_cs_dag.h index 75a34d230beefb0ecc881e28a8b79b98ccc795c3..74a974b5a6d1a568c0cb1915da15f52cc3798144 100644 --- a/modules/type/dag/include/dap_chain_cs_dag.h +++ b/modules/type/dag/include/dap_chain_cs_dag.h @@ -65,7 +65,7 @@ void dap_chain_cs_dag_deinit(void); int dap_chain_cs_dag_new(dap_chain_t * a_chain, dap_config_t * a_chain_cfg); void dap_chain_cs_dag_delete(dap_chain_t * a_chain); -bool dap_chain_cs_dag_proc_treshold(dap_chain_cs_dag_t * a_dag); +bool dap_chain_cs_dag_proc_treshold(dap_chain_cs_dag_t * a_dag, dap_ledger_t * a_ledger); void dap_chain_cs_dag_proc_event_round_new(dap_chain_cs_dag_t *a_dag); dap_chain_cs_dag_event_t* dap_chain_cs_dag_find_event_by_hash(dap_chain_cs_dag_t * a_dag,