diff --git a/dap_chain_cs_dag.c b/dap_chain_cs_dag.c index fa733a846396c609754cec7abc9e78a07c9238f3..734f548574b2f4a4022086b736efb9bbdd3c6e80 100755 --- a/dap_chain_cs_dag.c +++ b/dap_chain_cs_dag.c @@ -22,9 +22,12 @@ along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. */ #include <stdint.h> -#include <uthash.h> +#include <time.h> +#include <pthread.h> +#include "uthash.h" #include "dap_common.h" +#include "dap_hash.h" #include "dap_chain_datum.h" #include "dap_chain_cs.h" #include "dap_chain_cs_dag.h" @@ -33,16 +36,18 @@ typedef struct dap_chain_cs_dag_event_item { dap_chain_hash_fast_t hash; + time_t ts_added; dap_chain_cs_dag_event_t *event; UT_hash_handle hh; } dap_chain_cs_dag_event_item_t; typedef struct dap_chain_cs_dag_pvt { dap_chain_cs_dag_event_item_t * events; - dap_chain_cs_dag_event_item_t * events_trashhold; + pthread_rwlock_t events_rwlock; - dap_chain_cs_dag_event_item_t * events_round_new; - dap_chain_cs_dag_event_item_t * events_round_prev_lasts; + dap_chain_cs_dag_event_item_t * events_treshold; + pthread_rwlock_t events_treshold_rwlock; + dap_chain_cs_dag_event_item_t * events_lasts; } dap_chain_cs_dag_pvt_t; #define PVT(a) ((dap_chain_cs_dag_pvt_t *) a->_pvt ) @@ -51,7 +56,7 @@ typedef struct dap_chain_cs_dag_pvt { static int s_chain_callback_atom_add(dap_chain_t * a_chain, dap_chain_atom_t *); // Accept new event in dag static int s_chain_callback_atom_verify(dap_chain_t * a_chain, dap_chain_atom_t *); // Verify new event in dag static size_t s_chain_callback_atom_hdr_get_size(dap_chain_atom_t *); // Get dag event size -static size_t s_chain_callback_atom_get_static_hdr_size(dap_chain_t *); // Get dag event header size +static size_t s_chain_callback_atom_get_static_hdr_size(); // Get dag event header size static dap_chain_atom_iter_t* s_chain_callback_atom_iter_create(dap_chain_t * a_chain ); // Get the fisrt event from dag static dap_chain_atom_t* s_chain_callback_atom_iter_get_first( dap_chain_atom_iter_t * a_atom_iter ); // Get the fisrt event from dag @@ -93,9 +98,12 @@ void dap_chain_cs_dag_deinit(void) */ int dap_chain_cs_dag_new(dap_chain_t * a_chain, dap_config_t * a_chain_cfg) { - dap_chain_cs_dag_t * l_chain_cs_dag = DAP_NEW_Z(dap_chain_cs_dag_t); - l_chain_cs_dag->_pvt = DAP_NEW_Z(dap_chain_cs_dag_pvt_t); - l_chain_cs_dag->chain = a_chain; + dap_chain_cs_dag_t * l_dag = DAP_NEW_Z(dap_chain_cs_dag_t); + l_dag->_pvt = DAP_NEW_Z(dap_chain_cs_dag_pvt_t); + l_dag->chain = a_chain; + + pthread_rwlock_init(& PVT(l_dag)->events_rwlock,NULL); + pthread_rwlock_init(& PVT(l_dag)->events_treshold_rwlock,NULL); a_chain->callback_delete = dap_chain_cs_dag_delete; @@ -117,10 +125,10 @@ int dap_chain_cs_dag_new(dap_chain_t * a_chain, dap_config_t * a_chain_cfg) a_chain->callback_datum_iter_get_next = s_chain_callback_datum_iter_get_next; // Get the next datum from chain from the current one // Others - a_chain->_inheritor = l_chain_cs_dag; + a_chain->_inheritor = l_dag; - l_chain_cs_dag->is_single_line = dap_config_get_item_bool_default(a_chain_cfg,"dag","is_single_line",false); - if ( l_chain_cs_dag->is_single_line ) + l_dag->is_single_line = dap_config_get_item_bool_default(a_chain_cfg,"dag","is_single_line",false); + if ( l_dag->is_single_line ) log_it (L_NOTICE, "DAG chain initialized (single line)"); else log_it (L_NOTICE, "DAG chain initialized (multichain)"); @@ -136,6 +144,9 @@ int dap_chain_cs_dag_new(dap_chain_t * a_chain, dap_config_t * a_chain_cfg) void dap_chain_cs_dag_delete(dap_chain_t * a_chain) { dap_chain_cs_dag_t * l_dag = DAP_CHAIN_CS_DAG ( a_chain ); + pthread_rwlock_destroy(& PVT(l_dag)->events_rwlock); + pthread_rwlock_destroy(& PVT(l_dag)->events_treshold_rwlock); + if(l_dag->callback_delete ) l_dag->callback_delete(l_dag); if(l_dag->_inheritor) @@ -153,7 +164,7 @@ void dap_chain_cs_dag_delete(dap_chain_t * a_chain) static int s_chain_callback_atom_add(dap_chain_t * a_chain, dap_chain_atom_t * a_atom) { int ret = s_chain_callback_atom_verify (a_chain, a_atom); - if ( ret != 0 ){ + if ( ret < 0 ){ log_it(L_WARNING,"Wrong event, can't accept, verification returned %d",ret); return -1; } @@ -169,16 +180,26 @@ static int s_chain_callback_atom_add(dap_chain_t * a_chain, dap_chain_atom_t * a l_event_item->event = l_event; dap_hash_fast(l_event, dap_chain_cs_dag_event_calc_size(l_event),&l_event_item->hash ); + + // Put in main table or in the treshhold if not all the rest linked event are present dap_chain_cs_dag_event_item_t * l_event_search = NULL; - HASH_FIND(hh, PVT(l_dag)->events,&l_event_item->hash,sizeof (l_event_search->hash), l_event_search); + dap_chain_cs_dag_event_item_t * l_events =( ret==0 ? PVT(l_dag)->events : PVT(l_dag)->events_treshold ); + pthread_rwlock_t * l_events_rwlock =( ret==0 ? &PVT(l_dag)->events_rwlock : &PVT(l_dag)->events_treshold_rwlock ); + pthread_rwlock_wrlock( l_events_rwlock ); + HASH_FIND(hh, l_events,&l_event_item->hash,sizeof (l_event_search->hash), l_event_search); if ( l_event_search ) { + pthread_rwlock_unlock( l_events_rwlock ); char * l_hash_str = dap_chain_hash_fast_to_str_new(&l_event_item->hash); log_it(L_ERROR, "Dag event %s is already present in dag",l_hash_str); DAP_DELETE(l_event_item); DAP_DELETE(l_hash_str); return -3; } - HASH_ADD(hh, PVT(l_dag)->events,hash,sizeof (l_event_item->hash), l_event_item); + HASH_ADD(hh, l_events,hash,sizeof (l_event_item->hash), l_event_item); + pthread_rwlock_unlock( l_events_rwlock ); + + // Now check the treshold if some events now are ready to move to the main table + dap_chain_cs_dag_proc_treshold(l_dag); return 0; } @@ -193,9 +214,34 @@ static int s_chain_callback_atom_verify(dap_chain_t * a_chain, dap_chain_atom_t { dap_chain_cs_dag_t * l_dag = DAP_CHAIN_CS_DAG(a_chain); dap_chain_cs_dag_event_t * l_event = (dap_chain_cs_dag_event_t *) a_atom; - return l_dag->callback_cs_verify ( l_dag, l_event ); + int ret = l_dag->callback_cs_verify ( l_dag, l_event ); + if (ret == 0 ){ + for (size_t i = 0; i< l_event->header.hash_count; i++) { + dap_chain_hash_fast_t * l_hash = ((dap_chain_hash_fast_t *) l_event->hashes_n_datum_n_signs) + i; + dap_chain_cs_dag_event_item_t * l_event_search = NULL; + HASH_FIND(hh, PVT(l_dag)->events ,l_hash ,sizeof (*l_hash), l_event_search); + if ( l_event_search == NULL ){ + log_it(L_DEBUG, "Hash %s wasn't in hashtable of previously parsed"); + return 1; + } + + } + return 0; + }else { + return ret; + } +} + +/** + * @brief dap_chain_cs_dag_proc_treshold + * @param a_dag + */ +void dap_chain_cs_dag_proc_treshold(dap_chain_cs_dag_t * a_dag) +{ + } + /** * @brief s_chain_callback_atom_get_size Get size of atomic element * @param a_atom @@ -211,7 +257,7 @@ static size_t s_chain_callback_atom_hdr_get_size(dap_chain_atom_t * a_atom) * @param a_chain * @return */ -static size_t s_chain_callback_atom_get_static_hdr_size(dap_chain_t * a_chain) +static size_t s_chain_callback_atom_get_static_hdr_size() { return sizeof (dap_chain_class_dag_event_hdr_t); } diff --git a/dap_chain_cs_dag.h b/dap_chain_cs_dag.h index 994543e279ec517f571458502e4b5f2306ac40b6..af2604fddc75e25652aa365346b357dff340c20b 100755 --- a/dap_chain_cs_dag.h +++ b/dap_chain_cs_dag.h @@ -49,3 +49,5 @@ void dap_chain_cs_dag_deinit(void); int dap_chain_cs_dag_new(dap_chain_t * a_chain, dap_config_t * a_chain_cfg); void dap_chain_cs_dag_delete(dap_chain_t * a_chain); + +void dap_chain_cs_dag_proc_treshold(dap_chain_cs_dag_t * a_dag);