Skip to content
Snippets Groups Projects

Feature 2336

Merged alexander.lysikov requested to merge feature-2336 into master
4 files
+ 227
68
Compare changes
  • Side-by-side
  • Inline
Files
4
+ 192
51
@@ -46,6 +46,7 @@
#include "dap_chain_cs_dag.h"
#include "dap_chain_global_db.h"
#include "dap_chain_node_cli.h"
#include "dap_chain_cell.h"
#include "dap_chain_net.h"
#define LOG_TAG "dap_chain_cs_dag"
@@ -85,6 +86,7 @@ static dap_chain_atom_iter_t* s_chain_callback_atom_iter_create_from(dap_chain_t
static dap_chain_atom_ptr_t s_chain_callback_atom_iter_find_by_hash(dap_chain_atom_iter_t * a_atom_iter ,
dap_chain_hash_fast_t * a_atom_hash);
static dap_chain_datum_t* s_chain_callback_atom_get_datum(dap_chain_atom_ptr_t a_event);
// Get event(s) from dag
static dap_chain_atom_ptr_t s_chain_callback_atom_iter_get_first( dap_chain_atom_iter_t * a_atom_iter ); // Get the fisrt event from dag
static dap_chain_atom_ptr_t s_chain_callback_atom_iter_get_next( dap_chain_atom_iter_t * a_atom_iter ); // Get the next event from dag
@@ -172,6 +174,7 @@ int dap_chain_cs_dag_new(dap_chain_t * a_chain, dap_config_t * a_chain_cfg)
// Linear pass through
a_chain->callback_atom_iter_get_first = s_chain_callback_atom_iter_get_first; // Get the fisrt element from chain
a_chain->callback_atom_iter_get_next = s_chain_callback_atom_iter_get_next; // Get the next element from chain from the current one
a_chain->callback_atom_get_datum = s_chain_callback_atom_get_datum;
a_chain->callback_atom_iter_get_links = s_chain_callback_atom_iter_get_links; // Get the next element from chain from the current one
a_chain->callback_atom_iter_get_lasts = s_chain_callback_atom_iter_get_lasts;
@@ -193,6 +196,7 @@ int dap_chain_cs_dag_new(dap_chain_t * a_chain, dap_config_t * a_chain_cfg)
l_dag->is_single_line = dap_config_get_item_bool_default(a_chain_cfg,"dag","is_single_line",false);
l_dag->is_celled = dap_config_get_item_bool_default(a_chain_cfg,"dag","is_celled",false);
l_dag->is_add_directy = dap_config_get_item_bool_default(a_chain_cfg,"dag","is_add_directly",false);
l_dag->datum_add_hashes_count = dap_config_get_item_uint16_default(a_chain_cfg,"dag","datum_add_hashes_count",1);
l_dag->gdb_group_events_round_new = strdup( dap_config_get_item_str_default(a_chain_cfg,"dag","gdb_group_events_round_new",
"events.round.new"));
@@ -239,6 +243,7 @@ static int s_chain_callback_atom_add(dap_chain_t * a_chain, dap_chain_atom_ptr_t
dap_chain_cs_dag_t * l_dag = DAP_CHAIN_CS_DAG(a_chain);
dap_chain_cs_dag_event_t * l_event = (dap_chain_cs_dag_event_t *) a_atom;
// verification was already in s_chain_callback_atom_verify()
ret = l_dag->callback_cs_verify(l_dag,l_event);
if ( ret != 0 ){
log_it(L_WARNING,"Consensus can't accept the event, verification returned %d",ret);
@@ -264,11 +269,18 @@ static int s_chain_callback_atom_add(dap_chain_t * a_chain, dap_chain_atom_ptr_t
return -3;
}
HASH_ADD(hh, l_events,hash,sizeof (l_event_item->hash), l_event_item);
// save l_events to dag_pvt
if(ret==0)
PVT(l_dag)->events = l_events;
else
PVT(l_dag)->events_treshold = l_events;
//HASH_ADD(hh, PVT(l_dag)->events_treshold, hash, sizeof(l_event_item->hash), l_event_item);
if ( l_events == PVT(l_dag)->events){
dap_chain_cs_dag_event_item_t * l_event_last = NULL;
// Check the events and update the lasts
for ( dap_chain_hash_fast_t * l_link_hash = (dap_chain_hash_fast_t *) l_event->hashes_n_datum_n_signs ;
l_link_hash != ( dap_chain_hash_fast_t *) (
l_link_hash < ( dap_chain_hash_fast_t *) (
l_event->hashes_n_datum_n_signs + l_event->header.hash_count*sizeof (*l_link_hash) );
l_link_hash += sizeof (dap_chain_hash_fast_t ) ) {
l_event_last = NULL;
@@ -285,6 +297,40 @@ static int s_chain_callback_atom_add(dap_chain_t * a_chain, dap_chain_atom_ptr_t
l_event_last->event = l_event;
HASH_ADD(hh,PVT(l_dag)->events_lasts_unlinked,hash,sizeof (l_event_last->hash),l_event_last);
}
// add datum from event to ledger
dap_chain_datum_t *l_datum = (dap_chain_datum_t*) dap_chain_cs_dag_event_get_datum(l_event);
switch (l_datum->header.type_id) {
case DAP_CHAIN_DATUM_TOKEN_DECL: {
dap_chain_datum_token_t *l_token = (dap_chain_datum_token_t*) l_datum->data;
dap_chain_ledger_token_add(a_chain->ledger, l_token, l_datum->header.data_size);
}
break;
case DAP_CHAIN_DATUM_TOKEN_EMISSION: {
dap_chain_datum_token_emission_t *l_token_emission = (dap_chain_datum_token_emission_t*) l_datum->data;
dap_chain_ledger_token_emission_add(a_chain->ledger, l_token_emission, l_datum->header.data_size);
}
break;
case DAP_CHAIN_DATUM_TX: {
dap_chain_datum_tx_t *l_tx = (dap_chain_datum_tx_t*) l_datum->data;
//if ( !l_gdb_priv->is_load_mode ) // If its not load module but mempool proc
// l_tx->header.ts_created = time(NULL);
//if(dap_chain_datum_tx_get_size(l_tx) == l_datum->header.data_size){
// don't save bad transactions to base
if(dap_chain_ledger_tx_add(a_chain->ledger, l_tx) != 1) {
pthread_rwlock_unlock(l_events_rwlock);
return -1;
}
//}else
// return -2;
}
break;
default:
pthread_rwlock_unlock(l_events_rwlock);
return -1;
}
pthread_rwlock_unlock( l_events_rwlock );
// Now check the treshold if some events now are ready to move to the main table
dap_chain_cs_dag_proc_treshold(l_dag);
@@ -305,23 +351,24 @@ static size_t s_chain_callback_datums_pool_proc(dap_chain_t * a_chain, dap_chain
// Load current events new round pool
dap_global_db_obj_t * l_events_round_new = dap_chain_global_db_gr_load(l_dag->gdb_group_events_round_new, &l_events_round_new_size );
// Prepare hashes
size_t l_hashes_int_size = ( l_events_round_new_size + a_datums_count )> l_dag->datum_add_hashes_count ?
l_dag->datum_add_hashes_count :
l_events_round_new_size+a_datums_count;
size_t l_hashes_int_size = min(l_events_round_new_size + a_datums_count, l_dag->datum_add_hashes_count);
// ( l_events_round_new_size + a_datums_count ) > l_dag->datum_add_hashes_count ?
// l_dag->datum_add_hashes_count :
// l_events_round_new_size+a_datums_count;
if (l_dag->is_single_line ) // If single line - no any link inside
l_hashes_int_size = 0;
size_t l_hashes_ext_size = 1; // Change in cfg
size_t l_hashes_size = l_hashes_int_size+l_hashes_ext_size;
dap_chain_hash_fast_t * l_hashes = DAP_NEW_Z_SIZE(dap_chain_hash_fast_t,
sizeof(dap_chain_hash_fast_t) *
(l_hashes_int_size+l_hashes_ext_size) );
sizeof(dap_chain_hash_fast_t) * l_hashes_size);
size_t l_hashes_linked = 0;
for (size_t d = 0; d <a_datums_count ; d++){
dap_chain_datum_t * l_datum = a_datums[d];
if ( l_hashes_int_size ){
if ( l_hashes_int_size && l_events_round_new_size){
// Linking randomly with current new round set
size_t l_rnd_steps;
// Linking events inside round
@@ -342,56 +389,80 @@ static size_t s_chain_callback_datums_pool_proc(dap_chain_t * a_chain, dap_chain
}
if ( ! l_is_already_in_event ){
memcpy(&l_hashes[l_hashes_linked],&l_hash,sizeof (l_hash) );
l_hashes_linked++;
if(l_hashes_linked < l_hashes_size) {
memcpy(&l_hashes[l_hashes_linked], &l_hash, sizeof(l_hash));
l_hashes_linked++;
}
}
l_rnd_steps++;
if (l_rnd_steps > 100) // Too many attempts
break;
} while (l_hashes_linked <(l_hashes_int_size) );
} while (l_hashes_linked <(l_events_round_new_size) );
// Check if we have enought hash links
if (l_hashes_linked<l_hashes_int_size ){
if (l_hashes_linked<l_events_round_new_size ){
log_it(L_ERROR,"Can't link new events randomly for 100 attempts");
break;
}
}
// Now link with ext events
dap_chain_cs_dag_event_item_t *l_event_ext_item = NULL;
if ( PVT(l_dag)->events_lasts_unlinked ){ // Take then the first one if any events_lasts are present
l_event_ext_item = PVT(l_dag)->events_lasts_unlinked;
memcpy(&l_hashes[l_hashes_linked],& l_event_ext_item->hash,sizeof (l_event_ext_item->hash) );
l_hashes_linked++;
}
dap_chain_cs_dag_event_item_t *l_event_ext_item = NULL;
// is_single_line - no any link inside
if(!l_dag->is_single_line)
if( PVT(l_dag)->events_lasts_unlinked) { // Take then the first one if any events_lasts are present
l_event_ext_item = PVT(l_dag)->events_lasts_unlinked;
memcpy(&l_hashes[l_hashes_linked], &l_event_ext_item->hash, sizeof(l_event_ext_item->hash));
l_hashes_linked++;
}
if (l_hashes_linked || s_seed_mode ) {
dap_chain_cs_dag_event_t * l_event = l_dag->callback_cs_event_create(l_dag,l_datum,l_hashes,l_hashes_linked);
dap_chain_cs_dag_event_t * l_event = NULL;
if(l_dag->callback_cs_event_create)
l_event = l_dag->callback_cs_event_create(l_dag,l_datum,l_hashes,l_hashes_linked);
if ( l_event){ // Event is created
dap_chain_hash_fast_t l_event_hash;
dap_chain_cs_dag_event_calc_hash(l_event,&l_event_hash);
char * l_event_hash_str = dap_chain_hash_fast_to_str_new(&l_event_hash);
if( dap_chain_global_db_gr_set( l_event_hash_str, (uint8_t *) l_event, dap_chain_cs_dag_event_calc_size(l_event),
l_dag->gdb_group_events_round_new ) ){
log_it(L_INFO,"Event %s placed in the new forming round",l_event_hash_str);
// Clear old ext link and place itself as event_lasts
dap_chain_cs_dag_event_item_t * l_event_unlinked_item = DAP_NEW_Z(dap_chain_cs_dag_event_item_t);
memcpy ( &l_event_unlinked_item->hash, &l_event_ext_item->hash, sizeof (l_event_ext_item->hash) );
l_event_unlinked_item->event = l_event;
l_event_unlinked_item->ts_added = (time_t) l_event->header.ts_created;
pthread_rwlock_wrlock(&PVT(l_dag)->events_rwlock);
HASH_ADD(hh, PVT(l_dag)->events_lasts_unlinked,hash,sizeof(l_event_unlinked_item->hash),l_event_unlinked_item );
if (l_event_ext_item){
HASH_DEL(PVT(l_dag)->events_lasts_unlinked, l_event_ext_item);
DAP_DELETE(l_event_ext_item);
// add directly to file
if(l_dag->is_add_directy) {
if(!s_chain_callback_atom_add(a_chain, l_event)) {
l_datum_processed++;
}
else {
log_it(L_ERROR, "Can't add new event");
continue;
}
pthread_rwlock_unlock(&PVT(l_dag)->events_rwlock);
}
// add to new round into global_db
else {
dap_chain_hash_fast_t l_event_hash;
dap_chain_cs_dag_event_calc_hash(l_event, &l_event_hash);
char * l_event_hash_str = dap_chain_hash_fast_to_str_new(&l_event_hash);
if(dap_chain_global_db_gr_set(l_event_hash_str, (uint8_t *) l_event,
dap_chain_cs_dag_event_calc_size(l_event),
l_dag->gdb_group_events_round_new)) {
log_it(L_INFO, "Event %s placed in the new forming round", l_event_hash_str);
// Clear old ext link and place itself as event_lasts
l_datum_processed++;
}else {
log_it(L_ERROR,"Can't add new event to the new events round");
break;
dap_chain_cs_dag_event_item_t * l_event_unlinked_item = DAP_NEW_Z(
dap_chain_cs_dag_event_item_t);
if(l_event_ext_item)
memcpy(&l_event_unlinked_item->hash, &l_event_ext_item->hash,
sizeof(l_event_ext_item->hash));
l_event_unlinked_item->event = l_event;
l_event_unlinked_item->ts_added = (time_t) l_event->header.ts_created;
pthread_rwlock_wrlock(&PVT(l_dag)->events_rwlock);
HASH_ADD(hh, PVT(l_dag)->events_lasts_unlinked, hash, sizeof(l_event_unlinked_item->hash),
l_event_unlinked_item);
if(l_event_ext_item) {
HASH_DEL(PVT(l_dag)->events_lasts_unlinked, l_event_ext_item);
DAP_DELETE(l_event_ext_item);
}
pthread_rwlock_unlock(&PVT(l_dag)->events_rwlock);
l_datum_processed++;
}else {
log_it(L_ERROR,"Can't add new event to the new events round");
break;
}
}
}else {
log_it(L_ERROR,"Can't create new event!");
@@ -399,6 +470,24 @@ static size_t s_chain_callback_datums_pool_proc(dap_chain_t * a_chain, dap_chain
}
}
}
// add events to file
if(l_dag->is_add_directy && l_datum_processed>0) {
dap_chain_cell_t *l_cell = dap_chain_cell_create();
int l_res = -1;
if(l_cell) {
dap_chain_net_t *l_net = dap_chain_net_by_id(a_chain->net_id);
l_cell->chain = a_chain;
l_cell->id.uint64 = l_net ? l_net->pub.cell_id.uint64 : 0;
l_cell->file_storage_path = dap_strdup_printf("%0llx.dchaincell", l_cell->id.uint64);
l_res = dap_chain_cell_file_update(l_cell);
}
if(!l_cell || l_res) {
log_it(L_ERROR, "Can't add new %d events to the file '%s'", l_datum_processed,
l_cell ? l_cell->file_storage_path : "");
l_datum_processed = 0;
}
dap_chain_cell_delete(l_cell);
}
dap_chain_global_db_objs_delete(l_events_round_new, l_events_round_new_size);
return l_datum_processed;
}
@@ -439,7 +528,7 @@ static int s_chain_callback_atom_verify(dap_chain_t * a_chain, dap_chain_atom_pt
dap_chain_cs_dag_event_item_t * l_event_search = NULL;
HASH_FIND(hh, PVT(l_dag)->events ,l_hash ,sizeof (*l_hash), l_event_search);
if ( l_event_search == NULL ){
log_it(L_DEBUG, "Hash %s wasn't in hashtable of previously parsed");
log_it(L_DEBUG, "Hash %s wasn't in hashtable of previously parsed", l_hash);
return 1;
}
@@ -586,6 +675,18 @@ static dap_chain_atom_iter_t* s_chain_callback_atom_iter_create(dap_chain_t * a_
return l_atom_iter;
}
/**
* @brief s_chain_callback_atom_get_datum Get the datum from event
* @param a_atom_iter
* @return
*/
static dap_chain_datum_t* s_chain_callback_atom_get_datum(dap_chain_atom_ptr_t a_event)
{
if(a_event)
return dap_chain_cs_dag_event_get_datum((dap_chain_cs_dag_event_t*) a_event);
return NULL;
}
/**
* @brief s_chain_callback_atom_iter_get_first Get the first dag event
* @param a_atom_iter
@@ -593,9 +694,14 @@ static dap_chain_atom_iter_t* s_chain_callback_atom_iter_create(dap_chain_t * a_
*/
static dap_chain_atom_ptr_t s_chain_callback_atom_iter_get_first(dap_chain_atom_iter_t * a_atom_iter )
{
a_atom_iter->cur = a_atom_iter->cur ?
(dap_chain_cs_dag_event_t*) PVT (DAP_CHAIN_CS_DAG( a_atom_iter->chain) )->events->event : NULL;
a_atom_iter->cur_item = PVT (DAP_CHAIN_CS_DAG( a_atom_iter->chain) )->events;
dap_chain_cs_dag_t * l_dag = DAP_CHAIN_CS_DAG(a_atom_iter->chain);
dap_chain_cs_dag_pvt_t *l_dag_pvt = l_dag ? PVT(l_dag) : NULL;
a_atom_iter->cur_item = l_dag_pvt->events;
a_atom_iter->cur = (dap_chain_cs_dag_event_t*) (l_dag_pvt->events ? l_dag_pvt->events->event : NULL);
// a_atom_iter->cur = a_atom_iter->cur ?
// (dap_chain_cs_dag_event_t*) PVT (DAP_CHAIN_CS_DAG( a_atom_iter->chain) )->events->event : NULL;
// a_atom_iter->cur_item = PVT (DAP_CHAIN_CS_DAG( a_atom_iter->chain) )->events;
return a_atom_iter->cur;
}
@@ -696,7 +802,8 @@ static dap_chain_atom_ptr_t s_chain_callback_atom_iter_get_next( dap_chain_atom_
dap_chain_cs_dag_event_item_t * l_event_item = (dap_chain_cs_dag_event_item_t*) a_atom_iter->cur_item;
a_atom_iter->cur_item = l_event_item->hh.next;
l_event_item = (dap_chain_cs_dag_event_item_t*) a_atom_iter->cur_item;
a_atom_iter->cur = l_event_item->event ;
// if l_event_item=NULL then items are over
a_atom_iter->cur = l_event_item ? l_event_item->event : NULL;
}
return a_atom_iter->cur;
}
@@ -750,7 +857,7 @@ static int s_cli_dag(int argc, char ** argv, char **a_str_reply)
dap_chain_node_cli_find_option_val(argv, arg_index, argc, "-net", &l_net_name);
dap_chain_node_cli_find_option_val(argv, arg_index, argc, "-chain", &l_chain_name);
dap_chain_node_cli_find_option_val(argv, arg_index, argc, "event", &l_event_cmd_str);
dap_chain_node_cli_find_option_val(argv, arg_index, argc, "round", &l_event_cmd_str);
dap_chain_node_cli_find_option_val(argv, arg_index, argc, "round", &l_round_cmd_str);
if ( l_net_name == NULL){
dap_chain_node_cli_set_reply_text(a_str_reply, "Need -net <net name> param!");
@@ -781,15 +888,19 @@ static int s_cli_dag(int argc, char ** argv, char **a_str_reply)
const char * l_cmd_mode_str = NULL;
dap_chain_node_cli_find_option_val(argv, arg_index, argc, "-mode", &l_cmd_mode_str);
bool l_verify_only = false;
if ( strcmp(l_cmd_mode_str,"verify only") == 0 ){
if ( dap_strcmp(l_cmd_mode_str,"verify only") == 0 ){
l_verify_only = true;
}
log_it(L_NOTICE,"Round complete command accepted, forming new events");
dap_string_t *l_str_ret_tmp= dap_string_new("Completing round:\n");
size_t l_objs_size=0;
dap_global_db_obj_t * l_objs = dap_chain_global_db_gr_load(l_dag->gdb_group_events_round_new,&l_objs_size);
dap_string_t *l_str_ret_tmp= l_objs_size>0 ? dap_string_new("Completing round:\n") : dap_string_new("Completing round: no data");
// list for verifed and added events
dap_list_t *l_list_to_del = NULL;
// Check if its ready or not
for (size_t i = 0; i< l_objs_size; i++ ){
dap_chain_cs_dag_event_t * l_event = (dap_chain_cs_dag_event_t*) l_objs[i].value;
@@ -805,12 +916,42 @@ static int s_cli_dag(int argc, char ** argv, char **a_str_reply)
dap_string_append_printf( l_str_ret_tmp, "Event %s verification passed\n", l_objs[i].key);
// If not verify only mode we add
if ( ! l_verify_only ){
dap_chain_atom_ptr_t l_new_atom = NULL; // produce deep copy of event;
memcpy(l_new_atom,l_event,l_event_size);
s_chain_callback_atom_add(l_chain,l_new_atom); // Add new atom in chain
dap_chain_atom_ptr_t l_new_atom = (dap_chain_atom_ptr_t)dap_chain_cs_dag_event_copy(l_event); // produce deep copy of event;
memcpy(l_new_atom, l_event, l_event_size);
if(s_chain_callback_atom_add(l_chain, l_new_atom) < 0) { // Add new atom in chain
DAP_DELETE(l_new_atom);
dap_string_append_printf(l_str_ret_tmp, "Event %s not added in chain\n", l_objs[i].key);
}
else {
// add event to delete
l_list_to_del = dap_list_prepend(l_list_to_del, l_objs[i].key);
dap_string_append_printf(l_str_ret_tmp, "Event %s added in chain successfully\n",
l_objs[i].key);
}
}
}
}
// write events to file and delete events from db
if(l_list_to_del) {
dap_chain_cell_t *l_cell = dap_chain_cell_create();
if(l_cell) {
l_cell->chain = l_chain;
l_cell->id.uint64 = l_net ? l_net->pub.cell_id.uint64 : 0;
l_cell->file_storage_path = dap_strdup_printf("%0llx.dchaincell", l_cell->id.uint64);
if(!dap_chain_cell_file_update(l_cell)) {
// delete events from db
dap_list_t *l_list_tmp = l_list_to_del;
while(l_list_tmp) {
const char *l_key = (const char*) l_list_tmp->data;
dap_chain_global_db_gr_del(l_key, l_dag->gdb_group_events_round_new);
l_list_tmp = dap_list_next(l_list_tmp);
}
}
}
dap_chain_cell_delete(l_cell);
dap_list_free(l_list_to_del);
}
// Cleaning up
dap_chain_global_db_objs_delete(l_objs, l_objs_size);
dap_chain_node_cli_set_reply_text(a_str_reply,l_str_ret_tmp->str);
Loading