Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • cellframe/cellframe-sdk
  • MIKA83/cellframe-sdk
2 results
Show changes
Commits on Source (5)
Showing
with 591 additions and 302 deletions
......@@ -79,6 +79,10 @@ typedef struct uint512_t {
#define ones_64 ((uint64_t)0xffffffffffffffff)
#ifdef __cplusplus
extern "C" {
#endif
static inline uint128_t GET_128_FROM_64(uint64_t n) {
#ifdef DAP_GLOBAL_IS_INT128
return (uint128_t) n;
......@@ -981,3 +985,7 @@ static inline uint256_t MULT_256_FLOAT(uint256_t a_val, long double a_mult)
DIV_256(l_ret, GET_256_FROM_64(CONV_256_FLOAT), &l_ret);
return l_ret;
}
#ifdef __cplusplus
}
#endif
......@@ -216,15 +216,16 @@ dap_cert_t * dap_cert_generate_mem_with_seed(const char * a_cert_name, dap_enc_k
if ( l_enc_key ){
dap_cert_t * l_cert = dap_cert_new(a_cert_name);
l_cert->enc_key = l_enc_key;
//log_it(L_DEBUG,"Certificate generated");
//dap_cert_item_t * l_cert_item = DAP_NEW_Z(dap_cert_item_t);
//snprintf(l_cert_item->name,sizeof(l_cert_item->name),"%s",a_cert_name);
//HASH_ADD_STR(s_certs,name,l_cert_item);
//log_it(L_DEBUG,"Certificate name %s recorded", a_cert_name);
if (a_seed && a_seed_size) {
dap_chain_hash_fast_t l_seed_hash;
dap_hash_fast(a_seed, a_seed_size, &l_seed_hash);
char *l_hash_str = dap_chain_hash_fast_to_str_new(&l_seed_hash);
log_it(L_DEBUG, "Certificate generated with seed hash %s", l_hash_str);
DAP_FREE(l_hash_str);
}
return l_cert;
} else {
log_it(L_ERROR,"Can't generate key in memory!");
//dap_cert_delete(l_cert);
return NULL;
}
}
......
......@@ -126,7 +126,7 @@ uint8_t* dap_enc_dilithium_write_signature(dilithium_signature_t* a_sign, size_t
return NULL ;
}
size_t l_shift_mem = 0;
size_t l_buflen = dap_enc_dilithium_calc_signagture_size(a_sign);
uint64_t l_buflen = dap_enc_dilithium_calc_signagture_size(a_sign);
uint8_t *l_buf = DAP_NEW_SIZE(uint8_t, l_buflen);
if(! l_buf)
......
......@@ -1255,6 +1255,8 @@ static void s_stream_es_callback_delete(dap_events_socket_t *a_es, void *arg)
}
l_client_pvt->stream = NULL;
l_client_pvt->stream_es = NULL;
//dap_client_delete_mt(l_client_pvt->client); // TODO find a way to delete the client if it no closed yet
}
/**
......
......@@ -377,13 +377,6 @@ dap_chain_t * dap_chain_load_from_cfg(dap_ledger_t* a_ledger, const char * a_cha
DAP_CHAIN_PVT ( l_chain)->file_storage_dir = strdup (
dap_config_get_item_str( l_cfg , "files","storage_dir" ) ) ;
if (dap_chain_load_all(l_chain) == 0) {
if (l_chain->callback_atom_add_from_treshold) {
while (l_chain->callback_atom_add_from_treshold(l_chain, NULL)) {
log_it(L_DEBUG, "Added atom from treshold");
}
}
/* Temporary garbage cleaner */
dap_chain_save_all( l_chain ); // Save only the valid chain, throw all garbage out!
log_it (L_NOTICE, "Loaded chain files");
} else {
dap_chain_save_all( l_chain );
......@@ -391,10 +384,11 @@ dap_chain_t * dap_chain_load_from_cfg(dap_ledger_t* a_ledger, const char * a_cha
}
} else{
log_it (L_INFO, "Not set file storage path, will not stored in files");
//dap_chain_delete(l_chain);
//l_chain = NULL;
}
if (!l_chain->cells) {
dap_chain_cell_id_t l_cell_id = {.uint64 = 0};
dap_chain_cell_create_fill(l_chain, l_cell_id);
}
}else{
log_it (L_ERROR, "Can't init consensus \"%s\"",dap_config_get_item_str_default( l_cfg , "chain","consensus","NULL"));
dap_chain_delete(l_chain);
......@@ -597,3 +591,17 @@ bool dap_chain_get_atom_last_hash(dap_chain_t *a_chain, dap_hash_fast_t *a_atom_
a_chain->callback_atom_iter_delete(l_atom_iter);
return l_ret;
}
ssize_t dap_chain_atom_save(dap_chain_t *a_chain, const uint8_t *a_atom, size_t a_atom_size, dap_chain_cell_id_t a_cell_id)
{
dap_chain_cell_t *l_cell = dap_chain_cell_find_by_id(a_chain, a_cell_id);
if (!l_cell) {
log_it(L_INFO, "Creating cell 0x%016"DAP_UINT64_FORMAT_X, a_cell_id.uint64);
l_cell = dap_chain_cell_create_fill(a_chain, a_cell_id);
if (!l_cell) {
log_it(L_ERROR, "Can't create cell with id 0x%"DAP_UINT64_FORMAT_x" to save event...", a_cell_id.uint64);
return -7;
}
}
return dap_chain_cell_file_append(l_cell, a_atom, a_atom_size);
}
......@@ -113,15 +113,9 @@ dap_chain_cell_t * dap_chain_cell_create_fill(dap_chain_t * a_chain, dap_chain_c
*/
dap_chain_cell_t * dap_chain_cell_create_fill2(dap_chain_t * a_chain, const char *a_filename)
{
dap_chain_cell_t * l_cell = DAP_NEW_Z(dap_chain_cell_t);
l_cell->chain = a_chain;
dap_sscanf(a_filename, "%"DAP_UINT64_FORMAT_x".dchaincell", &l_cell->id.uint64);
l_cell->file_storage_path = dap_strdup_printf(a_filename);
pthread_rwlock_init(&l_cell->storage_rwlock, NULL);
pthread_rwlock_wrlock(&a_chain->cell_rwlock);
HASH_ADD(hh, a_chain->cells, id, sizeof(dap_chain_cell_id_t), l_cell);
pthread_rwlock_unlock(&a_chain->cell_rwlock);
return l_cell;
dap_chain_cell_id_t l_cell_id;
dap_sscanf(a_filename, "%"DAP_UINT64_FORMAT_x".dchaincell", &l_cell_id.uint64);
return dap_chain_cell_create_fill(a_chain, l_cell_id);
}
/**
......
......@@ -189,3 +189,4 @@ void dap_chain_delete(dap_chain_t * a_chain);
void dap_chain_add_callback_notify(dap_chain_t * a_chain, dap_chain_callback_notify_t a_callback, void * a_arg);
dap_chain_atom_ptr_t dap_chain_get_atom_by_hash(dap_chain_t * a_chain, dap_chain_hash_fast_t * a_atom_hash, size_t * a_atom_size);
bool dap_chain_get_atom_last_hash(dap_chain_t *a_chain, dap_hash_fast_t *a_atom_hash, dap_chain_cell_id_t a_cel_id);
ssize_t dap_chain_atom_save(dap_chain_t *a_chain, const uint8_t *a_atom, size_t a_atom_size, dap_chain_cell_id_t a_cell_id);
......@@ -452,6 +452,11 @@ static bool s_sync_update_gdb_proc_callback(dap_proc_thread_t *a_thread, void *a
log_it(L_DEBUG, "Prepare request to gdb sync from %s", l_sync_request->request.id_start ? "last sync" : "zero");
dap_chain_net_t *l_net = dap_chain_net_by_id(l_sync_request->request_hdr.net_id);
dap_stream_ch_t *l_ch = dap_stream_ch_find_by_uuid_unsafe(DAP_STREAM_WORKER(l_sync_request->worker), l_sync_request->ch_uuid);
if (!l_ch) {
log_it(L_INFO, "Client disconnected before we sent the reply");
DAP_DELETE(l_sync_request);
return true;
}
dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch);
int l_flags = 0;
if (dap_chain_net_get_add_gdb_group(l_net, l_sync_request->request.node_addr))
......@@ -501,79 +506,68 @@ static bool s_sync_in_chains_callback(dap_proc_thread_t *a_thread, void *a_arg)
dap_chain_atom_ptr_t l_atom_copy = (dap_chain_atom_ptr_t)l_pkt_item->pkt_data;
uint64_t l_atom_copy_size = l_pkt_item->pkt_data_size;
dap_hash_fast(l_atom_copy, l_atom_copy_size, &l_atom_hash);
size_t l_atom_size = 0;
if (dap_chain_get_atom_by_hash(l_chain, &l_atom_hash, &l_atom_size)) {
if (s_debug_more){
char l_atom_hash_str[72] = {'\0'};
dap_chain_hash_fast_to_str(&l_atom_hash,l_atom_hash_str,sizeof (l_atom_hash_str)-1 );
log_it(L_WARNING, "Atom hash %s is already present", l_atom_hash_str);
}
dap_db_set_last_hash_remote(l_sync_request->request.node_addr.uint64, l_chain, &l_atom_hash);
DAP_DELETE(l_atom_copy);
DAP_DELETE(l_sync_request);
return true;
}
dap_chain_atom_verify_res_t l_atom_add_res = l_chain->callback_atom_add(l_chain, l_atom_copy, l_atom_copy_size);
switch (l_atom_add_res) {
case ATOM_PASS:
if (s_debug_more){
char l_atom_hash_str[72]={[0]='\0'};
dap_chain_hash_fast_to_str(&l_atom_hash,l_atom_hash_str,sizeof (l_atom_hash_str)-1 );
log_it(L_WARNING,"Atom with hash %s for %s:%s not accepted (code ATOM_PASS, already present)", l_atom_hash_str, l_chain->net_name, l_chain->name);
log_it(L_WARNING, "Atom with hash %s for %s:%s not accepted (code ATOM_PASS, already present)", l_atom_hash_str, l_chain->net_name, l_chain->name);
}
dap_db_set_last_hash_remote(l_sync_request->request.node_addr.uint64, l_chain, &l_atom_hash);
DAP_DELETE(l_atom_copy);
break;
case ATOM_MOVE_TO_THRESHOLD:
if (s_debug_more) {
char l_atom_hash_str[72] = {'\0'};
dap_chain_hash_fast_to_str(&l_atom_hash, l_atom_hash_str, sizeof(l_atom_hash_str) - 1);
log_it(L_INFO, "Thresholded atom with hash %s for %s:%s", l_atom_hash_str, l_chain->net_name, l_chain->name);
}
break;
case ATOM_ACCEPT:
if (s_debug_more) {
char l_atom_hash_str[72]={'\0'};
dap_chain_hash_fast_to_str(&l_atom_hash,l_atom_hash_str,sizeof (l_atom_hash_str)-1 );
log_it(L_INFO, "%s atom with hash %s for %s:%s", l_atom_add_res == ATOM_ACCEPT ? "Accepted" : "Thresholded",
l_atom_hash_str, l_chain->net_name, l_chain->name);
}
dap_chain_cell_t *l_cell = dap_chain_cell_find_by_id(l_chain, l_sync_request->request_hdr.cell_id);
if (!l_cell) {
log_it(L_INFO, "Creating cell 0x%016"DAP_UINT64_FORMAT_X, l_sync_request->request_hdr.cell_id.uint64);
l_cell = dap_chain_cell_create_fill(l_chain, l_sync_request->request_hdr.cell_id);
}
if (!l_cell) {
log_it(L_ERROR, "Can't create cell with id 0x%"DAP_UINT64_FORMAT_x" to save event...", l_sync_request->request_hdr.cell_id.uint64);
break;
log_it(L_INFO,"Accepted atom with hash %s for %s:%s", l_atom_hash_str, l_chain->net_name, l_chain->name);
}
if (l_atom_add_res == ATOM_ACCEPT) {
int l_res = dap_chain_cell_file_append(l_cell, l_atom_copy, l_atom_copy_size);
if(l_res < 0) {
char l_atom_hash_str[72]={'\0'};
dap_chain_hash_fast_to_str(&l_atom_hash,l_atom_hash_str,sizeof (l_atom_hash_str)-1 );
log_it(L_ERROR, "Can't save event %s to the file '%s'", l_atom_hash_str,
l_cell ? l_cell->file_storage_path : "[null]");
} else {
dap_db_set_last_hash_remote(l_sync_request->request.node_addr.uint64, l_chain, &l_atom_hash);
}
int l_res = dap_chain_atom_save(l_chain, l_atom_copy, l_atom_copy_size, l_sync_request->request_hdr.cell_id);
if(l_res < 0) {
char l_atom_hash_str[72]={'\0'};
dap_chain_hash_fast_to_str(&l_atom_hash,l_atom_hash_str,sizeof (l_atom_hash_str)-1 );
log_it(L_ERROR, "Can't save atom %s to the file", l_atom_hash_str);
} else {
dap_db_set_last_hash_remote(l_sync_request->request.node_addr.uint64, l_chain, &l_atom_hash);
}
if (l_chain->callback_atom_add_from_treshold) {
dap_chain_atom_ptr_t l_atom_treshold;
do {
size_t l_atom_treshold_size;
if (s_debug_more)
log_it(L_DEBUG, "Try to add atom from treshold");
l_atom_treshold = l_chain->callback_atom_add_from_treshold(l_chain, &l_atom_treshold_size);
if(l_atom_treshold) {
int l_res = dap_chain_cell_file_append(l_cell, l_atom_treshold, l_atom_treshold_size);
log_it(L_INFO, "Added atom from treshold");
if(l_res < 0) {
char l_atom_hash_str[72]={'\0'};
dap_chain_hash_fast_to_str(&l_atom_hash,l_atom_hash_str,sizeof (l_atom_hash_str)-1 );
log_it(L_ERROR, "Can't save event %s from treshold to file '%s'", l_atom_hash_str,
l_cell ? l_cell->file_storage_path : "[null]");
} else {
dap_db_set_last_hash_remote(l_sync_request->request.node_addr.uint64, l_chain, &l_atom_hash);
}
dap_chain_net_t *l_net = dap_chain_net_by_id(l_chain->net_id);
dap_chain_t *l_cur_chain;
bool l_processed;
do {
l_processed = false;
DL_FOREACH(l_net->pub.chains, l_cur_chain) {
if (l_cur_chain->callback_atom_add_from_treshold) {
dap_chain_atom_ptr_t l_atom_treshold;
do {
size_t l_atom_treshold_size;
if (s_debug_more)
log_it(L_DEBUG, "Try to add atom from treshold");
l_atom_treshold = l_cur_chain->callback_atom_add_from_treshold(l_cur_chain, &l_atom_treshold_size);
if (l_atom_treshold) {
dap_chain_cell_id_t l_cell_id = (l_cur_chain == l_chain) ? l_sync_request->request_hdr.cell_id
: l_cur_chain->cells->id;
int l_res = dap_chain_atom_save(l_cur_chain, l_atom_copy, l_atom_copy_size, l_cell_id);
log_it(L_INFO, "Added atom from treshold");
if (l_res < 0) {
char l_atom_hash_str[72] = {'\0'};
dap_chain_hash_fast_to_str(&l_atom_hash,l_atom_hash_str, sizeof(l_atom_hash_str) - 1);
log_it(L_ERROR, "Can't save atom %s from treshold to file", l_atom_hash_str);
} else if (l_cur_chain == l_chain) {
dap_db_set_last_hash_remote(l_sync_request->request.node_addr.uint64, l_chain, &l_atom_hash);
}
}
} while(l_atom_treshold);
}
} while(l_atom_treshold);
}
//dap_chain_cell_close(l_cell);
}
} while (l_processed);
break;
case ATOM_REJECT: {
if (s_debug_more) {
......@@ -817,6 +811,9 @@ static bool s_chain_timer_callback(void *a_arg)
if (l_ch_chain->state != CHAIN_STATE_IDLE) {
dap_stream_ch_chain_go_idle(l_ch_chain);
}
if (l_ch_chain->callback_notify_packet_out)
l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_TIMEOUT, NULL, 0,
l_ch_chain->callback_notify_arg);
if (l_ch_chain->request_db_log) s_free_log_list_gdb(l_ch_chain);
DAP_DELETE(a_arg);
l_ch_chain->activity_timer = NULL;
......
......@@ -68,6 +68,7 @@
#define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB 0x36
#define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_END 0x46
#define DAP_STREAM_CH_CHAIN_PKT_TYPE_TIMEOUT 0xfe
#define DAP_STREAM_CH_CHAIN_PKT_TYPE_ERROR 0xff
// TSD sections
......
......@@ -61,6 +61,9 @@ typedef struct dap_chain_cs_dag_poa_pvt
uint16_t auth_certs_count;
uint16_t auth_certs_count_verify; // Number of signatures, needed for event verification
uint32_t confirmations_timeout; // wait signs over min value (auth_certs_count_verify)
bool auto_confirmation;
bool auto_round_complete;
uint32_t wait_sync_before_complete;
uint8_t padding[4];
dap_chain_callback_new_cfg_t prev_callback_created; // global network config init
} dap_chain_cs_dag_poa_pvt_t;
......@@ -81,6 +84,14 @@ static int s_callback_event_verify(dap_chain_cs_dag_t * a_dag, dap_chain_cs_dag_
static dap_chain_cs_dag_event_t * s_callback_event_create(dap_chain_cs_dag_t * a_dag, dap_chain_datum_t * a_datum,
dap_chain_hash_fast_t * a_hashes, size_t a_hashes_count, size_t* a_event_size);
static bool s_callback_round_event_to_chain(dap_chain_cs_dag_poa_callback_timer_arg_t * a_callback_arg);
static int s_callback_event_round_sync(dap_chain_cs_dag_t * a_dag, const char a_op_code, const char *a_group,
const char *a_key, const void *a_value, const size_t a_value_size);
static bool s_round_event_ready_minimum_check(dap_chain_cs_dag_t * a_dag, dap_chain_cs_dag_event_t * a_event,
size_t a_event_size, char * a_event_hash_hex_str,
dap_chain_cs_dag_event_round_cfg_t * a_event_round_cfg);
static void s_round_event_cs_done(dap_chain_cs_dag_t * a_dag, dap_chain_cs_dag_event_t * a_event,
char * a_event_hash_hex_str, dap_chain_cs_dag_event_round_cfg_t * a_event_round_cfg);
static void s_round_event_clean_dup(dap_chain_cs_dag_t * a_dag, const char *a_event_hash_hex_str);
// CLI commands
static int s_cli_dag_poa(int argc, char ** argv, char **str_reply);
......@@ -209,73 +220,35 @@ static int s_cli_dag_poa(int argc, char ** argv, char **a_str_reply)
char * l_event_new_hash_base58_str = dap_enc_base58_encode_hash_to_str(&l_event_new_hash);
//char * l_event_new_hash_base58_str = dap_enc_base58_from_hex_str_to_str(l_event_new_hash_hex_str);
bool l_event_is_ready = false;
if ( l_event_new->header.signs_count >= l_event_round_cfg.confirmations_minimum ) {
log_it(L_NOTICE,"Event %s minimum confirmations completed", l_event_new_hash_hex_str);
int l_ret_event_verify;
l_dag->callback_cs_set_event_round_cfg(l_dag, &l_event_round_cfg);
if ( ( l_ret_event_verify = l_dag->callback_cs_verify(l_dag, l_event_new, l_event_size_new)) == 0 ) {
log_it(L_NOTICE,"Event %s verification passed", l_event_new_hash_hex_str);
if (l_event_round_cfg.ts_confirmations_minimum_completed == (uint64_t)0) {
l_event_round_cfg.ts_confirmations_minimum_completed = (uint64_t)time(NULL);
}
l_event_is_ready = true;
} else {
log_it(L_NOTICE,"Error! Event %s is not passing consensus verification, ret code %d\n",
l_event_new_hash_hex_str, l_ret_event_verify);
}
}
bool l_event_is_ready = s_round_event_ready_minimum_check(l_dag, l_event_new, l_event_size_new,
l_event_new_hash_hex_str, &l_event_round_cfg);
if (dap_chain_cs_dag_event_gdb_set(l_event_new_hash_hex_str, l_event_new,
l_event_size_new, l_gdb_group_events, &l_event_round_cfg) ){
if ( dap_chain_global_db_gr_del(dap_strdup(l_event_hash_hex_str), l_gdb_group_events) ) { // Delete old event
if(!dap_strcmp(l_hash_out_type, "hex")) {
dap_chain_node_cli_set_reply_text(a_str_reply,
"Added new sign with cert \"%s\", event %s placed back in round.new\n",
l_poa_pvt->events_sign_cert->name, l_event_new_hash_hex_str);
}
else {
dap_chain_node_cli_set_reply_text(a_str_reply,
"Added new sign with cert \"%s\", event %s placed back in round.new\n",
l_poa_pvt->events_sign_cert->name, l_event_new_hash_base58_str);
}
ret = 0;
// dap_chain_net_sync_gdb(l_chain_net); // Propagate changes in pool
if (l_event_is_ready) { // minimum signs & verify passed
dap_chain_cs_dag_poa_callback_timer_arg_t * l_callback_arg = DAP_NEW_Z(dap_chain_cs_dag_poa_callback_timer_arg_t);
l_callback_arg->dag = l_dag;
l_callback_arg->l_event_hash_hex_str = dap_strdup(l_event_new_hash_hex_str);
memcpy(&l_callback_arg->event_round_cfg, &l_event_round_cfg, sizeof(dap_chain_cs_dag_event_round_cfg_t));
uint32_t l_timeout = l_event_round_cfg.confirmations_timeout;
if ( l_event_new->header.signs_count >= l_poa_pvt->auth_certs_count) {
s_callback_round_event_to_chain(l_callback_arg); // placement in chain now if max signs
}
else if ( l_timeout > ((uint64_t)time(NULL) - l_event_round_cfg.ts_confirmations_minimum_completed) ) {
l_timeout = l_timeout - ((uint64_t)time(NULL) - l_event_round_cfg.ts_confirmations_minimum_completed);
// placement in chain by timer
if (dap_timerfd_start(l_timeout*1000,
(dap_timerfd_callback_t)s_callback_round_event_to_chain,
l_callback_arg) == NULL) {
log_it(L_ERROR,"Can't run timer for Event %s", l_event_new_hash_hex_str);
} else {
log_it(L_NOTICE,"Run timer %dsec. for Event %s", l_timeout, l_event_new_hash_hex_str);
}
} else { // placement in chain now if timer out
s_callback_round_event_to_chain(l_callback_arg);
}
}
}else {
if ( !dap_chain_global_db_gr_del(dap_strdup(l_event_hash_hex_str), l_gdb_group_events) ) { // Delete old event
ret = 1;
dap_chain_node_cli_set_reply_text(a_str_reply, "Added new sign with cert \"%s\", event %s placed back in round.new\n"
"WARNING! Old event %s with same datum is still in round.new, produced DUP!\n",
l_poa_pvt->events_sign_cert->name ,l_event_new_hash_hex_str, l_event_hash_str);
}
}else {
if(!dap_strcmp(l_hash_out_type, "hex")) {
dap_chain_node_cli_set_reply_text(a_str_reply,
"Added new sign with cert \"%s\", event %s placed back in round.new\n",
l_poa_pvt->events_sign_cert->name, l_event_new_hash_hex_str);
}
else {
dap_chain_node_cli_set_reply_text(a_str_reply,
"Added new sign with cert \"%s\", event %s placed back in round.new\n",
l_poa_pvt->events_sign_cert->name, l_event_new_hash_base58_str);
}
ret = 0;
// dap_chain_net_sync_gdb(l_chain_net); // Propagate changes in pool
if (l_event_is_ready && l_poa_pvt->auto_round_complete) { // cs done (minimum signs & verify passed)
s_round_event_cs_done(l_dag, l_event_new, l_event_new_hash_hex_str, &l_event_round_cfg);
}
}else {
if(!dap_strcmp(l_hash_out_type, "hex")) {
dap_chain_node_cli_set_reply_text(a_str_reply,
"GDB Error: Can't place event %s with new sign back in round.new\n",
......@@ -289,6 +262,7 @@ static int s_cli_dag_poa(int argc, char ** argv, char **a_str_reply)
ret=-31;
}
DAP_DELETE(l_event_new);
DAP_DELETE(l_event_new_hash_hex_str);
DAP_DELETE(l_event_new_hash_base58_str);
} else {
......@@ -297,11 +271,11 @@ static int s_cli_dag_poa(int argc, char ** argv, char **a_str_reply)
l_event_hash_str);
ret=-1;
}
DAP_DELETE(l_event_new);
//DAP_DELETE(l_event_new);
}
// DAP_DELETE( l_gdb_group_events );
// DAP_DELETE(l_event_round_cfg);
// DAP_DELETE(l_event);
DAP_DELETE(l_event);
} else {
dap_chain_node_cli_set_reply_text(a_str_reply, "Command dag_poa requires subcommand 'sign'");
}
......@@ -360,6 +334,141 @@ static int s_callback_new(dap_chain_t * a_chain, dap_config_t * a_chain_cfg)
return 0;
}
typedef struct event_clean_dup_items {
uint16_t signs_count;
uint64_t ts_update;
char * hash_str;
UT_hash_handle hh;
} event_clean_dup_items_t;
static event_clean_dup_items_t *s_event_clean_dup_items = NULL;
static void s_round_event_clean_dup(dap_chain_cs_dag_t * a_dag, const char *a_event_hash_hex_str) {
char * l_gdb_group_events = a_dag->gdb_group_events_round_new;
size_t l_event_size = 0;
dap_chain_cs_dag_event_t * l_event;
dap_chain_cs_dag_event_round_cfg_t l_event_round_cfg;
if ( (l_event = dap_chain_cs_dag_event_gdb_get( a_event_hash_hex_str, &l_event_size,
l_gdb_group_events, &l_event_round_cfg )) == NULL ) {
return;
}
//char * l_event_first_hash_str = dap_chain_hash_fast_to_str_new(&l_event_round_cfg.first_event_hash);
size_t l_events_round_size = 0;
// dap_global_db_obj_t * l_events_round = dap_chain_global_db_gr_load(a_dag->gdb_group_events_round_new, &l_events_round_size );
dap_store_obj_t *l_events_round = dap_chain_global_db_driver_read(a_dag->gdb_group_events_round_new, NULL, &l_events_round_size);
uint16_t l_max_signs_count = 0;
//char * l_max_signs_hash;
for (size_t l_index = 0; l_index<l_events_round_size; l_index++) {
// dap_chain_cs_dag_event_t * l_event = (dap_chain_cs_dag_event_t *)
// ((dap_chain_cs_dag_event_round_item_t *)l_events_round[l_index].value)->event;
// size_t l_event_size = ((dap_chain_cs_dag_event_round_item_t *)l_events_round[l_index].value)->event_size;
dap_chain_cs_dag_event_round_item_t *l_event_round_item = (dap_chain_cs_dag_event_round_item_t *)l_events_round[l_index].value;
dap_chain_cs_dag_event_t * l_event = (dap_chain_cs_dag_event_t *)l_event_round_item->event;
if ( memcmp(&l_event_round_cfg.first_event_hash,
&l_event_round_item->cfg.first_event_hash, sizeof(dap_chain_hash_fast_t)) == 0 ) {
event_clean_dup_items_t * l_item = DAP_NEW_Z(event_clean_dup_items_t);
l_item->signs_count = l_event->header.signs_count;
// l_item->ts_update = l_events_round[l_index].timestamp;
l_item->ts_update = l_event_round_item->cfg.ts_update;
l_item->hash_str = l_events_round[l_index].key;
HASH_ADD_STR(s_event_clean_dup_items, hash_str, l_item);
if ( l_event->header.signs_count > l_max_signs_count ) {
l_max_signs_count = l_event->header.signs_count;
}
}
}
uint64_t l_max_ts_update = 0;
char * l_max_ts_update_hash;
event_clean_dup_items_t *l_clean_item=NULL, *l_clean_tmp=NULL;
HASH_ITER(hh, s_event_clean_dup_items, l_clean_item, l_clean_tmp) {
if ( l_clean_item->signs_count < l_max_signs_count ) {
// delete dup by min signatures
dap_chain_global_db_gr_del(dap_strdup(l_clean_item->hash_str), l_gdb_group_events);
HASH_DEL(s_event_clean_dup_items, l_clean_item);
DAP_DELETE(l_clean_item);
} else if ( l_clean_item->ts_update > l_max_ts_update ) {
l_max_ts_update = l_clean_item->ts_update;
l_max_ts_update_hash = l_clean_item->hash_str;
}
}
HASH_ITER(hh, s_event_clean_dup_items, l_clean_item, l_clean_tmp) {
if ( dap_strcmp(l_max_ts_update_hash, l_clean_item->hash_str) != 0 ) {
// delete dup by older
dap_chain_global_db_gr_del(dap_strdup(l_clean_item->hash_str), l_gdb_group_events);
}
HASH_DEL(s_event_clean_dup_items, l_clean_item);
DAP_DELETE(l_clean_item);
}
//HASH_CLEAR(hh, s_event_clean_dup_items);
dap_store_obj_free(l_events_round, l_events_round_size);
}
static bool s_round_event_ready_minimum_check(dap_chain_cs_dag_t * a_dag, dap_chain_cs_dag_event_t * a_event,
size_t a_event_size, char * a_event_hash_hex_str,
dap_chain_cs_dag_event_round_cfg_t * a_event_round_cfg) {
if ( a_event->header.signs_count < a_event_round_cfg->confirmations_minimum ) {
return false;
}
a_dag->callback_cs_set_event_round_cfg(a_dag, a_event_round_cfg);
int l_ret_event_verify = a_dag->callback_cs_verify(a_dag, a_event, a_event_size);
if ( l_ret_event_verify == 0 ) {
if (a_event_round_cfg->ts_confirmations_minimum_completed == (uint64_t)0) {
a_event_round_cfg->ts_confirmations_minimum_completed = (uint64_t)time(NULL);
}
return true;
}
log_it(L_ERROR,"Round auto-complete error! Event %s is not passing consensus verification, ret code %d\n",
a_event_hash_hex_str, l_ret_event_verify );
return false;
}
static void s_round_event_cs_done(dap_chain_cs_dag_t * a_dag, dap_chain_cs_dag_event_t * a_event,
char * a_event_hash_hex_str, dap_chain_cs_dag_event_round_cfg_t * a_event_round_cfg) {
dap_chain_cs_dag_poa_t * l_poa = DAP_CHAIN_CS_DAG_POA( a_dag );
dap_chain_cs_dag_poa_callback_timer_arg_t * l_callback_arg = DAP_NEW_Z(dap_chain_cs_dag_poa_callback_timer_arg_t);
l_callback_arg->dag = a_dag;
l_callback_arg->l_event_hash_hex_str = dap_strdup(a_event_hash_hex_str);
memcpy(&l_callback_arg->event_round_cfg, a_event_round_cfg, sizeof(dap_chain_cs_dag_event_round_cfg_t));
uint32_t l_timeout = a_event_round_cfg->confirmations_timeout;
if (a_event_round_cfg->ts_confirmations_minimum_completed == (uint64_t)0) {
a_event_round_cfg->ts_confirmations_minimum_completed = (uint64_t)time(NULL);
}
if ( a_event->header.signs_count >= PVT(l_poa)->auth_certs_count) {
// placement in chain now if max signs
if (dap_timerfd_start(PVT(l_poa)->wait_sync_before_complete*1000,
(dap_timerfd_callback_t)s_callback_round_event_to_chain,
l_callback_arg) == NULL) {
log_it(L_ERROR,"Can't run timer for Event %s", a_event_hash_hex_str);
} else {
log_it(L_NOTICE,"Run timer %dsec. for Event %s", PVT(l_poa)->wait_sync_before_complete, a_event_hash_hex_str);
}
}
else if ( l_timeout > ((uint64_t)time(NULL) - a_event_round_cfg->ts_confirmations_minimum_completed) ) {
l_timeout = l_timeout - ((uint64_t)time(NULL) - a_event_round_cfg->ts_confirmations_minimum_completed);
// placement in chain by timer
l_timeout += PVT(l_poa)->wait_sync_before_complete;
if (dap_timerfd_start(l_timeout*1000,
(dap_timerfd_callback_t)s_callback_round_event_to_chain,
l_callback_arg) == NULL) {
log_it(L_ERROR,"Can't run timer for Event %s", a_event_hash_hex_str);
} else {
log_it(L_NOTICE,"Run timer %dsec. for Event %s", l_timeout, a_event_hash_hex_str);
}
} else { // placement in chain now if timer out
if (dap_timerfd_start(PVT(l_poa)->wait_sync_before_complete*1000,
(dap_timerfd_callback_t)s_callback_round_event_to_chain,
l_callback_arg) == NULL) {
log_it(L_ERROR,"Can't run timer for Event %s", a_event_hash_hex_str);
} else {
log_it(L_NOTICE,"Run timer %dsec. for Event %s", PVT(l_poa)->wait_sync_before_complete, a_event_hash_hex_str);
}
}
}
static void s_callback_get_round_cfg(dap_chain_cs_dag_t * a_dag, dap_chain_cs_dag_event_round_cfg_t * a_event_round_cfg) {
dap_chain_cs_dag_poa_t * l_poa = DAP_CHAIN_CS_DAG_POA(a_dag);
dap_chain_cs_dag_poa_pvt_t * l_poa_pvt = PVT (l_poa);
......@@ -368,7 +477,6 @@ static void s_callback_get_round_cfg(dap_chain_cs_dag_t * a_dag, dap_chain_cs_da
a_event_round_cfg->ts_confirmations_minimum_completed = 0;
}
static bool s_callback_round_event_to_chain(dap_chain_cs_dag_poa_callback_timer_arg_t * a_callback_arg) {
dap_chain_cs_dag_t * l_dag = a_callback_arg->dag;
dap_chain_net_t *l_net = dap_chain_net_by_id(l_dag->chain->net_id);
......@@ -489,6 +597,57 @@ static dap_chain_cs_dag_event_t * s_callback_event_create(dap_chain_cs_dag_t * a
return NULL;
}
static int s_callback_event_round_sync(dap_chain_cs_dag_t * a_dag, const char a_op_code, const char *a_group,
const char *a_key, const void *a_value, const size_t a_value_size)
{
dap_chain_net_t *l_net = dap_chain_net_by_id( a_dag->chain->net_id);
if ( a_value == NULL || a_op_code != 'a' ) {
return 0;
}
dap_chain_cs_dag_poa_t * l_poa = DAP_CHAIN_CS_DAG_POA(a_dag);
if ( !PVT(l_poa)->auto_confirmation ) {
s_round_event_clean_dup(a_dag, a_key); // Delete dup for manual mode
return 0;
}
dap_chain_cs_dag_event_round_item_t *l_event_round_item = (dap_chain_cs_dag_event_round_item_t *)a_value;
size_t l_event_size = l_event_round_item->event_size;
dap_chain_cs_dag_event_t * l_event = (dap_chain_cs_dag_event_t *)l_event_round_item->event;
// dap_chain_cs_dag_event_t * l_event = (dap_chain_cs_dag_event_t *)DAP_DUP_SIZE(l_event_round_item->event, l_event_size);
size_t l_event_size_new = 0;
dap_chain_cs_dag_event_t *l_event_new = dap_chain_cs_dag_event_copy_with_sign_add(l_event, l_event_size,
&l_event_size_new, l_net,
PVT(l_poa)->events_sign_cert->enc_key);
if ( l_event_new ) {
char * l_gdb_group_events = a_dag->gdb_group_events_round_new;
dap_chain_hash_fast_t l_event_new_hash;
dap_chain_cs_dag_event_calc_hash(l_event_new, l_event_size_new, &l_event_new_hash);
char * l_event_new_hash_hex_str = dap_chain_hash_fast_to_str_new(&l_event_new_hash);
bool l_event_is_ready = s_round_event_ready_minimum_check(a_dag, l_event_new, l_event_size_new,
l_event_new_hash_hex_str, &l_event_round_item->cfg);
if (dap_chain_cs_dag_event_gdb_set(l_event_new_hash_hex_str, l_event_new,
l_event_size_new, l_gdb_group_events, &l_event_round_item->cfg) ){
dap_chain_global_db_gr_del(dap_strdup(a_key), l_gdb_group_events); // Delete old event
if (l_event_is_ready && PVT(l_poa)->auto_round_complete) { // cs done (minimum signs & verify passed)
s_round_event_cs_done(a_dag, l_event_new, l_event_new_hash_hex_str, &l_event_round_item->cfg);
}
}
s_round_event_clean_dup(a_dag, l_event_new_hash_hex_str); // Delete dup
DAP_DELETE(l_event_new);
} else {
// if (PVT(l_poa)->auto_round_complete) {
// if (s_round_event_ready_minimum_check(a_dag, l_event, l_event_size, &l_event_round_item->cfg)){
// s_round_event_cs_done(a_dag, l_event, a_key, &l_event_round_item->cfg);
// }
// }
s_round_event_clean_dup(a_dag, a_key); // Delete dup
}
return 0;
}
/**
* @brief
......@@ -510,7 +669,6 @@ static int s_callback_event_verify(dap_chain_cs_dag_t * a_dag, dap_chain_cs_dag_
uint16_t l_certs_count_verify = a_dag->use_event_round_cfg ? a_dag->event_round_cfg.confirmations_minimum
: l_poa_pvt->auth_certs_count_verify;
a_dag->use_event_round_cfg = false;
// if ( a_dag_event->header.signs_count >= l_poa_pvt->auth_certs_count_verify ){
if ( a_dag_event->header.signs_count >= l_certs_count_verify ){
size_t l_verified = 0;
......
......@@ -689,6 +689,7 @@ bool dap_chain_global_db_gr_save(dap_global_db_obj_t* a_objs, size_t a_objs_coun
for(size_t q = 0; q < a_objs_count; ++q) {
dap_store_obj_t *store_data_cur = l_store_data + q;
dap_global_db_obj_t *a_obj_cur = a_objs + q;
store_data_cur->type = 'a';
store_data_cur->key = a_obj_cur->key;
store_data_cur->group = (char*)a_group;
store_data_cur->value = a_obj_cur->value;
......
......@@ -151,8 +151,9 @@ typedef struct dap_chain_net_pvt{
//Active synchronizing link
dap_chain_node_client_t *active_link;
dap_list_t *links_queue; // Links waiting for sync
dap_list_t *net_links; // Links list
dap_list_t *net_links; // Links list
size_t links_connected_count;
atomic_uint links_dns_requests;
......@@ -356,9 +357,11 @@ inline static const char * s_net_state_to_str(dap_chain_net_state_t l_state)
*/
int dap_chain_net_state_go_to(dap_chain_net_t * a_net, dap_chain_net_state_t a_new_state)
{
if (PVT(a_net)->state_target == a_new_state){
log_it(L_WARNING,"Already going to state %s",s_net_state_to_str(a_new_state));
if (PVT(a_net)->state != NET_STATE_OFFLINE){
PVT(a_net)->state = PVT(a_net)->state_target = NET_STATE_OFFLINE;
s_net_states_proc(NULL, a_net);
}
PVT(a_net)->state_target = a_new_state;
pthread_mutex_lock( &PVT(a_net)->state_mutex_cond); // Preventing call of state_go_to before wait cond will be armed
......@@ -725,6 +728,7 @@ static void s_node_link_callback_disconnected(dap_chain_node_client_t *a_node_cl
log_it(L_ERROR, "Links count is zero in disconnected callback, looks smbd decreased it twice or forget to increase on connect/reconnect");
}
if (l_net_pvt->state_target != NET_STATE_OFFLINE) {
a_node_client->keep_connection = true;
for (dap_list_t *it = l_net_pvt->net_links; it; it = it->next) {
if (((struct net_link *)it->data)->link == NULL) { // We have a free prepared link
s_node_link_remove(l_net_pvt, a_node_client);
......@@ -735,9 +739,7 @@ static void s_node_link_callback_disconnected(dap_chain_node_client_t *a_node_cl
}
}
dap_chain_node_info_t *l_link_node_info = s_get_dns_link_from_cfg(l_net);
if (!l_link_node_info) { // Try to keep this connection
a_node_client->keep_connection = true;
} else {
if (l_link_node_info) {
struct link_dns_request *l_dns_request = DAP_NEW_Z(struct link_dns_request);
l_dns_request->net = l_net;
if (dap_chain_node_info_dns_request(l_link_node_info->hdr.ext_addr_v4,
......@@ -750,7 +752,6 @@ static void s_node_link_callback_disconnected(dap_chain_node_client_t *a_node_cl
log_it(L_ERROR, "Can't process node info dns request");
DAP_DELETE(l_link_node_info);
DAP_DELETE(l_dns_request);
a_node_client->keep_connection = true;
} else {
s_node_link_remove(l_net_pvt, a_node_client);
a_node_client->keep_connection = false;
......@@ -823,7 +824,14 @@ static void s_node_link_callback_delete(dap_chain_node_client_t * a_node_client,
"}\n", a_node_client->net->pub.id.uint64, a_node_client->info->hdr.cell_id.uint64,
NODE_ADDR_FP_ARGS_S(a_node_client->info->hdr.address));
return;
} else if (a_node_client->is_connected) {
a_node_client->is_connected = false;
if (l_net_pvt->links_connected_count)
l_net_pvt->links_connected_count--;
else
log_it(L_ERROR, "Links count is zero in delete callback");
}
dap_chain_net_sync_unlock(l_net, a_node_client);
pthread_rwlock_wrlock(&l_net_pvt->rwlock);
for ( dap_list_t * it = l_net_pvt->net_links; it; it=it->next ){
if (((struct net_link *)it->data)->link == a_node_client) {
......@@ -1145,11 +1153,10 @@ static bool s_net_states_proc(dap_proc_thread_t *a_thread, void *a_arg)
case NET_STATE_LINKS_CONNECTING: {
log_it(L_INFO, "%s.state: NET_STATE_LINKS_CONNECTING",l_net->pub.name);
int l_used_links = 0;
size_t l_used_links = 0;
for (dap_list_t *l_tmp = l_net_pvt->net_links; l_tmp; l_tmp = dap_list_next(l_tmp)) {
dap_chain_node_info_t *l_link_info = ((struct net_link *)l_tmp->data)->link_info;
dap_chain_node_client_t *l_client = dap_chain_net_client_create_n_connect(l_net, l_link_info);
l_client->keep_connection = true;
((struct net_link *)l_tmp->data)->link = l_client;
if (++l_used_links == s_required_links_count)
break;
......@@ -1181,6 +1188,11 @@ static bool s_net_states_proc(dap_proc_thread_t *a_thread, void *a_arg)
return ! l_repeat_after_exit;
}
int s_net_list_compare_uuids(const void *a_uuid1, const void *a_uuid2)
{
return memcmp(a_uuid1, a_uuid2, sizeof(dap_events_socket_uuid_t));
}
bool dap_chain_net_sync_trylock(dap_chain_net_t *a_net, dap_chain_node_client_t *a_client)
{
dap_chain_net_pvt_t *l_net_pvt = PVT(a_net);
......@@ -1202,18 +1214,35 @@ bool dap_chain_net_sync_trylock(dap_chain_net_t *a_net, dap_chain_node_client_t
l_net_pvt->active_link = a_client;
}
pthread_rwlock_unlock(&l_net_pvt->rwlock);
if (l_found && !dap_list_find_custom(l_net_pvt->links_queue, &a_client->uuid, s_net_list_compare_uuids)) {
dap_events_socket_uuid_t *l_uuid = DAP_DUP(&a_client->uuid);
l_net_pvt->links_queue = dap_list_append(l_net_pvt->links_queue, l_uuid);
}
return !l_found;
}
void dap_chain_net_sync_unlock(dap_chain_net_t *a_net, dap_chain_node_client_t *a_client)
bool dap_chain_net_sync_unlock(dap_chain_net_t *a_net, dap_chain_node_client_t *a_client)
{
if (!a_net)
return;
return false;
dap_chain_net_pvt_t *l_net_pvt = PVT(a_net);
pthread_rwlock_rdlock(&l_net_pvt->rwlock);
if (!a_client || l_net_pvt->active_link == a_client)
l_net_pvt->active_link = NULL;
pthread_rwlock_unlock(&l_net_pvt->rwlock);
pthread_rwlock_unlock(&l_net_pvt->rwlock);
while (l_net_pvt->active_link == NULL && l_net_pvt->links_queue) {
dap_events_socket_uuid_t *l_uuid = l_net_pvt->links_queue->data;
dap_chain_node_sync_status_t l_status = dap_chain_node_client_start_sync(l_uuid);
if (l_status != NODE_SYNC_STATUS_WAITING) {
DAP_DELETE(l_uuid);
dap_list_t *l_to_remove = l_net_pvt->links_queue;
l_net_pvt->links_queue = l_net_pvt->links_queue->next;
DAP_DELETE(l_to_remove);
} else {
break;
}
}
return l_net_pvt->active_link;
}
/**
* @brief dap_chain_net_client_create_n_connect
......@@ -1223,7 +1252,14 @@ void dap_chain_net_sync_unlock(dap_chain_net_t *a_net, dap_chain_node_client_t *
*/
struct dap_chain_node_client * dap_chain_net_client_create_n_connect( dap_chain_net_t * a_net,struct dap_chain_node_info* a_link_info)
{
return dap_chain_node_client_create_n_connect(a_net, a_link_info,"CN",(dap_chain_node_client_callbacks_t *)&s_node_link_callbacks,a_net);
dap_chain_node_client_t *l_ret = dap_chain_node_client_create_n_connect(a_net,
a_link_info,
"CN",
(dap_chain_node_client_callbacks_t *)&s_node_link_callbacks,
a_net);
if (l_ret)
l_ret->keep_connection = true;
return l_ret;
}
/**
......@@ -1583,7 +1619,10 @@ static int s_cli_net(int argc, char **argv, char **a_str_reply)
else if(strcmp(l_go_str, "sync") == 0) {
dap_chain_node_cli_set_reply_text(a_str_reply, "Network \"%s\" resynchronizing",
l_net->pub.name);
dap_chain_net_state_go_to(l_net, NET_STATE_SYNC_CHAINS);
if (PVT(l_net)->state_target == NET_STATE_ONLINE)
dap_chain_net_state_go_to(l_net, NET_STATE_ONLINE);
else
dap_chain_net_state_go_to(l_net, NET_STATE_SYNC_CHAINS);
}
} else if ( l_get_str){
......@@ -1783,14 +1822,19 @@ static int s_cli_net(int argc, char **argv, char **a_str_reply)
dap_chain_gdb_ledger_load((char *)dap_chain_gdb_get_group(l_chain), l_chain);
} else {
dap_chain_load_all(l_chain);
}
}
bool l_processed;
do {
l_processed = false;
DL_FOREACH(l_net->pub.chains, l_chain) {
if (l_chain->callback_atom_add_from_treshold) {
while (l_chain->callback_atom_add_from_treshold(l_chain, NULL)) {
log_it(L_DEBUG, "Added atom from treshold");
}
}
}
}
} while (l_processed);
} else {
dap_chain_node_cli_set_reply_text(a_str_reply,
"Command requires one of subcomand: sync, link, go, get, stats, ca, ledger");
......@@ -2251,6 +2295,14 @@ int s_net_load(const char * a_net_name, uint16_t a_acl_idx)
l_list = dap_list_next(l_list);
}
dap_list_free(l_prior_list);
dap_chain_t *l_chain;
DL_FOREACH(l_net->pub.chains, l_chain) {
if (l_chain->callback_atom_add_from_treshold) {
while (l_chain->callback_atom_add_from_treshold(l_chain, NULL)) {
log_it(L_DEBUG, "Added atom from treshold");
}
}
}
const char* l_default_chain_name = dap_config_get_item_str(l_cfg , "general" , "default_chain");
if(l_default_chain_name)
......
......@@ -197,21 +197,13 @@ static void s_node_client_connected_synchro_start_callback(dap_worker_t *a_worke
DAP_DELETE(a_arg);
}
/**
* @brief s_timer_update_states_callback
* @param a_arg
* @return
*/
static bool s_timer_update_states_callback(void *a_arg)
dap_chain_node_sync_status_t dap_chain_node_client_start_sync(dap_events_socket_uuid_t *a_uuid)
{
dap_chain_node_client_handle_t *l_client_found = NULL;
dap_events_socket_uuid_t *l_uuid = (dap_events_socket_uuid_t *)a_arg;
assert(l_uuid);
HASH_FIND(hh, s_clients, l_uuid, sizeof(*l_uuid), l_client_found);
HASH_FIND(hh, s_clients, a_uuid, sizeof(*a_uuid), l_client_found);
if(!l_client_found){
log_it(L_DEBUG,"Chain node client %p was deleted before timer fired, nothing to do", l_uuid);
DAP_DELETE(l_uuid);
return false;
log_it(L_DEBUG,"Chain node client %p was deleted before timer fired, nothing to do", a_uuid);
return NODE_SYNC_STATUS_MISSING;
}
dap_chain_node_client_t *l_me = l_client_found->client;
......@@ -225,6 +217,7 @@ static bool s_timer_update_states_callback(void *a_arg)
dap_client_t * l_client = dap_client_from_esocket(l_es);
if (l_client ) {
dap_chain_node_client_t * l_node_client = (dap_chain_node_client_t*) l_client->_inheritor;
assert(l_node_client == l_me);
if (l_node_client && l_node_client->ch_chain && l_node_client->stream_worker && l_node_client->ch_chain_uuid) {
dap_stream_ch_t *l_ch = dap_stream_ch_find_by_uuid_unsafe(l_node_client->stream_worker, l_node_client->ch_chain_uuid);
if (l_ch) {
......@@ -233,35 +226,70 @@ static bool s_timer_update_states_callback(void *a_arg)
dap_chain_net_t * l_net = l_node_client->net;
assert(l_net);
// If we do nothing - init sync process
if (l_ch_chain->state == CHAIN_STATE_IDLE && dap_chain_net_sync_trylock(l_net, l_me)) {
//if (l_ch_chain->state == CHAIN_STATE_IDLE) {
log_it(L_INFO, "Start synchronization process with "NODE_ADDR_FP_STR, NODE_ADDR_FP_ARGS_S(l_node_client->remote_node_addr));
dap_stream_ch_chain_sync_request_t l_sync_gdb = {};
l_sync_gdb.node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net);
dap_stream_ch_chain_pkt_write_unsafe(l_node_client->ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_REQ,
l_net->pub.id.uint64, 0, 0,
&l_sync_gdb, sizeof(l_sync_gdb));
}
return true;
if (l_ch_chain->state == CHAIN_STATE_IDLE) {
if (dap_chain_net_sync_trylock(l_net, l_node_client)) {
log_it(L_INFO, "Start synchronization process with "NODE_ADDR_FP_STR, NODE_ADDR_FP_ARGS_S(l_node_client->remote_node_addr));
dap_stream_ch_chain_sync_request_t l_sync_gdb = {};
l_sync_gdb.node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net);
dap_stream_ch_chain_pkt_write_unsafe(l_node_client->ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_REQ,
l_net->pub.id.uint64, 0, 0,
&l_sync_gdb, sizeof(l_sync_gdb));
return NODE_SYNC_STATUS_STARTED;
} else
return NODE_SYNC_STATUS_WAITING;
} else
return NODE_SYNC_STATUS_IN_PROGRESS;
}
}
}
}
return NODE_SYNC_STATUS_FAILED;
}
// if we not returned yet
l_me->state = NODE_CLIENT_STATE_DISCONNECTED;
if (l_me->keep_connection) {
if (dap_client_pvt_find(l_me->client->pvt_uuid)) {
log_it(L_INFO, "Reconnecting node client with peer "NODE_ADDR_FP_STR, NODE_ADDR_FP_ARGS_S(l_me->remote_node_addr));
l_me->state = NODE_CLIENT_STATE_CONNECTING ;
dap_client_go_stage(l_me->client, STAGE_STREAM_STREAMING, s_stage_connected_callback);
} else
dap_chain_node_client_close(l_me);
/**
* @brief s_timer_update_states_callback
* @param a_arg
* @return
*/
static bool s_timer_update_states_callback(void *a_arg)
{
dap_events_socket_uuid_t *l_uuid = (dap_events_socket_uuid_t *)a_arg;
assert(l_uuid);
dap_chain_node_sync_status_t l_status = dap_chain_node_client_start_sync(l_uuid);
if (l_status == NODE_SYNC_STATUS_MISSING) {
DAP_DELETE(l_uuid);
return false;
}
DAP_DELETE(l_uuid);
return false;
if (l_status == NODE_SYNC_STATUS_FAILED) {
dap_chain_node_client_handle_t *l_client_found = NULL;
HASH_FIND(hh, s_clients, l_uuid, sizeof(*l_uuid), l_client_found);
if(l_client_found) {
dap_chain_node_client_t *l_me = l_client_found->client;
l_me->state = NODE_CLIENT_STATE_DISCONNECTED;
if (l_me->keep_connection) {
if (dap_client_pvt_find(l_me->client->pvt_uuid)) {
if (l_me->callbacks.disconnected) {
l_me->callbacks.disconnected(l_me, l_me->callbacks_arg);
}
if (l_me->keep_connection) {
log_it(L_INFO, "Reconnecting node client with peer "NODE_ADDR_FP_STR, NODE_ADDR_FP_ARGS_S(l_me->remote_node_addr));
l_me->state = NODE_CLIENT_STATE_CONNECTING ;
dap_client_go_stage(l_me->client, STAGE_STREAM_STREAMING, s_stage_connected_callback);
} else {
dap_chain_node_client_close(l_me);
}
} else
dap_chain_node_client_close(l_me);
}
}
DAP_DELETE(l_uuid);
return false;
}
return true;
}
/**
* @brief a_stage_end_callback
* @param a_client
......@@ -467,13 +495,12 @@ static void s_ch_chain_callback_notify_packet_in(dap_stream_ch_chain_t* a_ch_cha
}else{ // If no - over with sync process
dap_chain_node_addr_t * l_node_addr = dap_chain_net_get_cur_addr(l_net);
log_it(L_INFO, "In: State node %s."NODE_ADDR_FP_STR" is SYNCED",l_net->pub.name, NODE_ADDR_FP_ARGS(l_node_addr) );
dap_chain_net_sync_unlock(l_net, l_node_client);
bool l_have_waiting = dap_chain_net_sync_unlock(l_net, l_node_client);
l_node_client->state = NODE_CLIENT_STATE_SYNCED;
if (dap_chain_net_get_target_state(l_net) == NET_STATE_ONLINE) {
dap_timerfd_reset(l_node_client->sync_timer);
dap_chain_net_set_state(l_net, NET_STATE_ONLINE);
}
else
} else if (!l_have_waiting)
dap_chain_net_state_go_to(l_net, NET_STATE_OFFLINE);
#ifndef _WIN32
pthread_cond_broadcast(&l_node_client->wait_cond);
......@@ -505,21 +532,27 @@ static void s_ch_chain_callback_notify_packet_out(dap_stream_ch_chain_t* a_ch_ch
(void) a_ch_chain;
dap_chain_node_client_t * l_node_client = (dap_chain_node_client_t *) a_arg;
assert(a_arg);
dap_stream_ch_t * l_ch = NULL;
//if((l_ch = dap_stream_ch_find_by_uuid_unsafe(l_node_client->stream_worker, l_node_client->ch_chain_uuid)) != NULL){
switch (a_pkt_type) {
case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB: {
if(s_stream_ch_chain_debug_more)
log_it(L_INFO,"Out: global database sent to uplink "NODE_ADDR_FP_STR, NODE_ADDR_FP_ARGS_S(l_node_client->remote_node_addr));
} break;
case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS: {
if(s_stream_ch_chain_debug_more)
log_it(L_INFO,"Out: chain %"DAP_UINT64_FORMAT_x" sent to uplink "NODE_ADDR_FP_STR,l_node_client->cur_chain ? l_node_client->cur_chain->id.uint64 : 0, NODE_ADDR_FP_ARGS_S(l_node_client->remote_node_addr));
}break;
default: {
}
switch (a_pkt_type) {
case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB: {
if(s_stream_ch_chain_debug_more)
log_it(L_INFO,"Out: global database sent to uplink "NODE_ADDR_FP_STR, NODE_ADDR_FP_ARGS_S(l_node_client->remote_node_addr));
} break;
case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS: {
if(s_stream_ch_chain_debug_more)
log_it(L_INFO,"Out: chain %"DAP_UINT64_FORMAT_x" sent to uplink "NODE_ADDR_FP_STR,l_node_client->cur_chain ? l_node_client->cur_chain->id.uint64 : 0, NODE_ADDR_FP_ARGS_S(l_node_client->remote_node_addr));
} break;
case DAP_STREAM_CH_CHAIN_PKT_TYPE_TIMEOUT: {
dap_chain_net_t *l_net = l_node_client->net;
assert(l_net);
dap_chain_node_addr_t *l_node_addr = dap_chain_net_get_cur_addr(l_net);
log_it(L_DEBUG, "In: State node %s."NODE_ADDR_FP_STR" is timeout for sync", l_net->pub.name, NODE_ADDR_FP_ARGS(l_node_addr));
l_node_client->state = NODE_CLIENT_STATE_ERROR;
dap_chain_net_sync_unlock(l_net, l_node_client);
dap_timerfd_reset(l_node_client->sync_timer);
} break;
default: {
}
//}
}
}
/**
......@@ -564,7 +597,7 @@ static int save_stat_to_database(dap_stream_ch_chain_net_srv_pkt_test_t *a_reque
l_key = strtoll(l_obj->key, NULL, 16);
}
char *l_key_str = dap_strdup_printf("%06x", ++l_key);
if(!dap_chain_global_db_gr_set(dap_strdup(l_key_str), (uint8_t *) json_str, strlen(json_str) + 1, l_group)) {
if(!dap_chain_global_db_gr_set(dap_strdup(l_key_str), dap_strdup(json_str), strlen(json_str) + 1, l_group)) {
l_ret = -1;
}
DAP_DELETE(l_key_str);
......
......@@ -119,7 +119,7 @@ void dap_chain_net_set_flag_sync_from_zero(dap_chain_net_t * a_net, bool a_flag_
bool dap_chain_net_get_flag_sync_from_zero( dap_chain_net_t * a_net);
bool dap_chain_net_sync_trylock(dap_chain_net_t *a_net, dap_chain_node_client_t *a_client);
void dap_chain_net_sync_unlock(dap_chain_net_t *a_net, dap_chain_node_client_t *a_client);
bool dap_chain_net_sync_unlock(dap_chain_net_t *a_net, dap_chain_node_client_t *a_client);
dap_chain_net_t * dap_chain_net_by_name( const char * a_name);
dap_chain_net_t * dap_chain_net_by_id( dap_chain_net_id_t a_id);
......
......@@ -49,6 +49,14 @@ typedef enum dap_chain_node_client_state {
NODE_CLIENT_STATE_CHECKED = 130,
} dap_chain_node_client_state_t;
typedef enum dap_chain_node_sync_status {
NODE_SYNC_STATUS_STARTED = 0,
NODE_SYNC_STATUS_WAITING = 1,
NODE_SYNC_STATUS_IN_PROGRESS = 2,
NODE_SYNC_STATUS_FAILED = -1,
NODE_SYNC_STATUS_MISSING = -2
} dap_chain_node_sync_status_t;
typedef struct dap_chain_node_client dap_chain_node_client_t;
typedef void (*dap_chain_node_client_callback_t)(dap_chain_node_client_t *, void*);
......@@ -169,6 +177,8 @@ int dap_chain_node_client_set_callbacks(dap_client_t *a_client, uint8_t a_ch_id)
int dap_chain_node_client_send_nodelist_req(dap_chain_node_client_t *a_client);
dap_chain_node_sync_status_t dap_chain_node_client_start_sync(dap_events_socket_uuid_t *l_uuid);
static inline const char * dap_chain_node_client_state_to_str( dap_chain_node_client_state_t a_state)
{
switch (a_state) {
......
......@@ -26,6 +26,7 @@
#include <pthread.h>
#include "errno.h"
#include "uthash.h"
#include "utlist.h"
#ifdef _WIN32
#include <winsock2.h>
......@@ -162,15 +163,17 @@ void dap_chain_cs_dag_deinit(void)
}
static void s_history_callback_notify(void * a_arg, const char a_op_code, const char * a_group,
static void s_history_callback_round_notify(void * a_arg, const char a_op_code, const char * a_group,
const char * a_key, const void * a_value, const size_t a_value_size)
{
if (a_arg){
dap_chain_cs_dag_t * l_dag = (dap_chain_cs_dag_t *) a_arg;
dap_chain_net_t *l_net = dap_chain_net_by_id( l_dag->chain->net_id);
log_it(L_DEBUG,"%s.%s: op_code='%c' group=\"%s\" key=\"%s\" value_size=%zu",l_net->pub.name,
l_dag->chain->name, a_op_code, a_group, a_key, a_value_size);
// dap_chain_node_mempool_autoproc_notify((void *)l_net, a_op_code, a_group, a_key, a_value, a_value_size);
log_it(L_DEBUG,"%s.%s: op_code='%c' group=\"%s\" key=\"%s\" value_size=%zu",
l_net->pub.name, l_dag->chain->name, a_op_code, a_group, a_key, a_value_size);
if (l_dag->callback_cs_event_round_sync) {
l_dag->callback_cs_event_round_sync(l_dag, a_op_code, a_group, a_key, a_value, a_value_size);
}
dap_chain_net_sync_gdb_broadcast((void *)l_net, a_op_code, a_group, a_key, a_value, a_value_size);
}
}
......@@ -250,19 +253,22 @@ int dap_chain_cs_dag_new(dap_chain_t * a_chain, dap_config_t * a_chain_cfg)
// l_dag->gdb_group_events_round_new = dap_strdup( dap_config_get_item_str_default(a_chain_cfg,"dag","gdb_group_events_round_new",
// "events.round.new"));
char * l_round_new_str = dap_strdup( dap_config_get_item_str_default(a_chain_cfg,"dag","gdb_group_events_round_new", "events.round.new"));
char * l_round_new_str = dap_strdup( dap_config_get_item_str_default(a_chain_cfg,"dag","gdb_group_events_round_new", "new"));
dap_chain_net_t *l_net = dap_chain_net_by_id(a_chain->net_id);
//if(!l_dag->is_celled){
l_dag->gdb_group_events_round_new = dap_strdup_printf( "round-gdb.%s.chain-%016llX.%s",l_net->pub.name,
a_chain->id.uint64, l_round_new_str);
// }else {
// l_dag->gdb_group_events_round_new = dap_strdup_printf( "round-gdb.%s.chain-%016llX.cell-%016llX.%s",l_net->pub.name,
// a_chain->id.uint64, l_net->pub.cell_id.uint64, l_round_new_str);
// }
if(!l_dag->is_celled){
//char * gdb_group = dap_strdup_printf( "%016llx-%016llx-round", l_net->pub.id.uint64, a_chain->id.uint64);
char * gdb_group = dap_strdup_printf( "%s-%s-round", l_net->pub.name, a_chain->name);
l_dag->gdb_group_events_round_new = dap_strdup_printf( "%s.%s", gdb_group, l_round_new_str);
dap_chain_global_db_add_sync_group(gdb_group, s_history_callback_round_notify, l_dag);
}else {
//char * gdb_group = dap_strdup_printf( "%016llx-%016llx-%016llx-round", l_net->pub.id.uint64, a_chain->id.uint64, l_net->pub.cell_id.uint64);
char * gdb_group = dap_strdup_printf( "%s-%s-%016llx-round", l_net->pub.name, a_chain->name, a_chain->cells->id.uint64);
l_dag->gdb_group_events_round_new = dap_strdup_printf( "%s.%s", gdb_group, l_round_new_str);
dap_chain_global_db_add_sync_group(gdb_group, s_history_callback_round_notify, l_dag);
}
DAP_DELETE(l_round_new_str);
dap_chain_global_db_add_sync_group("round-gdb", s_history_callback_notify, l_dag);
if ( l_dag->is_single_line ) {
log_it (L_NOTICE, "DAG chain initialized (single line)");
} else {
......@@ -451,7 +457,7 @@ static dap_chain_atom_verify_res_t s_chain_callback_atom_add(dap_chain_t * a_cha
switch (ret) {
case ATOM_MOVE_TO_THRESHOLD:
pthread_rwlock_wrlock(l_events_rwlock);
HASH_ADD(hh, PVT(l_dag)->events_treshold, hash,sizeof (l_event_item->hash), l_event_item);
HASH_ADD(hh, PVT(l_dag)->events_treshold, hash, sizeof(l_event_item->hash), l_event_item);
pthread_rwlock_unlock(l_events_rwlock);
if(s_debug_more)
log_it(L_DEBUG, "... added to threshold");
......@@ -461,7 +467,7 @@ static dap_chain_atom_verify_res_t s_chain_callback_atom_add(dap_chain_t * a_cha
switch (l_consensus_check) {
case 0:
pthread_rwlock_wrlock(l_events_rwlock);
HASH_ADD(hh, PVT(l_dag)->events,hash,sizeof (l_event_item->hash), l_event_item);
HASH_ADD(hh, PVT(l_dag)->events,hash, sizeof(l_event_item->hash), l_event_item);
s_dag_events_lasts_process_new_last_event(l_dag, l_event_item);
pthread_rwlock_unlock(l_events_rwlock);
if(s_debug_more)
......@@ -632,30 +638,33 @@ static size_t s_chain_callback_datums_pool_proc(dap_chain_t * a_chain, dap_chain
if (l_dag->is_add_directly) {
l_cell = a_chain->cells;
if (s_chain_callback_atom_add(a_chain, l_event, l_event_size) == ATOM_ACCEPT) {
// add events to file
if (dap_chain_cell_file_append(l_cell, l_event, l_event_size ) < 0) {
log_it(L_ERROR, "Can't add new event to the file '%s'", l_cell->file_storage_path);
continue;
if (dap_chain_atom_save(a_chain, (uint8_t *)l_event, l_event_size, a_chain->cells->id) < 0) {
log_it(L_ERROR, "Can't add new event to the file");
}
// add all atoms from treshold
{
dap_chain_atom_ptr_t l_atom_treshold;
do {
size_t l_atom_treshold_size;
// add in ledger
l_atom_treshold = s_chain_callback_atom_add_from_treshold(a_chain, &l_atom_treshold_size);
// add into file
if(l_atom_treshold) {
int l_res = dap_chain_cell_file_append(l_cell, l_atom_treshold, l_atom_treshold_size);
if(l_res < 0) {
log_it(L_ERROR, "Can't save event %p from treshold to the file '%s'",
l_atom_treshold, l_cell ? l_cell->file_storage_path : "[null]");
}
// add all atoms from treshold
dap_chain_net_t *l_net = dap_chain_net_by_id(a_chain->net_id);
dap_chain_t *l_cur_chain;
bool l_processed;
do {
l_processed = false;
DL_FOREACH(l_net->pub.chains, l_cur_chain) {
if (l_cur_chain->callback_atom_add_from_treshold) {
dap_chain_atom_ptr_t l_atom_treshold;
do {
size_t l_atom_treshold_size;
// add in ledger
l_atom_treshold = l_cur_chain->callback_atom_add_from_treshold(l_cur_chain, &l_atom_treshold_size);
// add into file
if (l_atom_treshold) {
int l_res = dap_chain_atom_save(l_cur_chain, l_atom_treshold, l_atom_treshold_size, l_cur_chain->cells->id);
if (l_res < 0) {
log_it(L_ERROR, "Can't save event %p from treshold", l_atom_treshold);
}
}
} while (l_atom_treshold);
}
}
while(l_atom_treshold);
}
} while (l_processed);
l_datum_processed++;
}
else {
......@@ -713,9 +722,6 @@ static size_t s_chain_callback_datums_pool_proc(dap_chain_t * a_chain, dap_chain
}
}
DAP_DELETE(l_hashes);
if (l_cell) {
dap_chain_cell_close(l_cell);
}
dap_chain_global_db_objs_delete(l_events_round_new, l_events_round_new_size);
return l_datum_processed;
}
......@@ -936,10 +942,10 @@ int dap_chain_cs_dag_event_verify_hashes_with_treshold(dap_chain_cs_dag_t * a_da
if (ret == DAP_THRESHOLD_CONFLICTING)
return ret;
return l_is_events_all_hashes ?
l_is_events_main_hashes ?
(l_is_events_main_hashes ?
DAP_THRESHOLD_OK :
DAP_THRESHOLD_NO_HASHES :
DAP_THRESHOLD_NO_HASHES_IN_MAIN;
DAP_THRESHOLD_NO_HASHES_IN_MAIN) :
DAP_THRESHOLD_NO_HASHES;
}
/**
......@@ -950,40 +956,35 @@ int dap_chain_cs_dag_event_verify_hashes_with_treshold(dap_chain_cs_dag_t * a_da
dap_chain_cs_dag_event_item_t* dap_chain_cs_dag_proc_treshold(dap_chain_cs_dag_t * a_dag, dap_ledger_t * a_ledger)
{
bool res = false;
// TODO Process finish treshold. For now - easiest from possible
dap_chain_cs_dag_event_item_t * l_event_item = NULL, * l_event_item_tmp = NULL;
pthread_rwlock_wrlock(&PVT(a_dag)->events_rwlock);
// !!!
int l_count = HASH_COUNT(PVT(a_dag)->events_treshold);
log_it(L_DEBUG, "*** %d events in threshold", l_count);
HASH_ITER(hh,PVT(a_dag)->events_treshold,l_event_item, l_event_item_tmp){
dap_dag_threshold_verification_res_t ret = dap_chain_cs_dag_event_verify_hashes_with_treshold (a_dag, l_event_item->event);
if ( ret == DAP_THRESHOLD_OK || ret == DAP_THRESHOLD_CONFLICTING ){ // All its hashes are in main table, move thats one too into it
HASH_DEL(PVT(a_dag)->events_treshold,l_event_item);
if(ret == DAP_THRESHOLD_OK){
if(s_debug_more) {
char * l_event_hash_str = dap_chain_hash_fast_to_str_new(&l_event_item->hash);
log_it(L_DEBUG, "Processing event (threshold): %s...", l_event_hash_str);
DAP_DELETE(l_event_hash_str);
}
int l_add_res = s_dap_chain_add_atom_to_events_table(a_dag, a_ledger, l_event_item);
if (!l_add_res) {
HASH_ADD(hh, PVT(a_dag)->events,hash,sizeof (l_event_item->hash), l_event_item);
s_dag_events_lasts_process_new_last_event(a_dag, l_event_item);
if(s_debug_more)
log_it(L_INFO, "... moved from treshold to main chains");
res = true;
break;
}else{
if(s_debug_more)
log_it(L_WARNING, "... error adding");
DAP_DELETE(l_event_item);
}
//res = true;
}else if(ret == DAP_THRESHOLD_CONFLICTING)
HASH_ADD(hh, PVT(a_dag)->events_treshold_conflicted, hash,sizeof (l_event_item->hash), l_event_item);
HASH_ITER(hh, PVT(a_dag)->events_treshold, l_event_item, l_event_item_tmp) {
dap_dag_threshold_verification_res_t ret = dap_chain_cs_dag_event_verify_hashes_with_treshold(a_dag, l_event_item->event);
if (ret == DAP_THRESHOLD_OK) {
if (s_debug_more) {
char * l_event_hash_str = dap_chain_hash_fast_to_str_new(&l_event_item->hash);
log_it(L_DEBUG, "Processing event (threshold): %s...", l_event_hash_str);
DAP_DELETE(l_event_hash_str);
}
int l_add_res = s_dap_chain_add_atom_to_events_table(a_dag, a_ledger, l_event_item);
if (!l_add_res) {
HASH_DEL(PVT(a_dag)->events_treshold, l_event_item);
HASH_ADD(hh, PVT(a_dag)->events, hash, sizeof(l_event_item->hash), l_event_item);
s_dag_events_lasts_process_new_last_event(a_dag, l_event_item);
if(s_debug_more)
log_it(L_INFO, "... moved from treshold to main chains");
res = true;
break;
} else {
if(s_debug_more)
log_it(L_WARNING, "... error adding");
}
//res = true;
} else if (ret == DAP_THRESHOLD_CONFLICTING) {
HASH_DEL(PVT(a_dag)->events_treshold, l_event_item);
HASH_ADD(hh, PVT(a_dag)->events_treshold_conflicted, hash, sizeof (l_event_item->hash), l_event_item);
}
}
pthread_rwlock_unlock(&PVT(a_dag)->events_rwlock);
......@@ -1674,10 +1675,22 @@ static int s_cli_dag(int argc, char ** argv, char **a_str_reply)
dap_string_append_printf(l_str_tmp,"\nEvent %s:\n", l_event_hash_str);
// Round cfg
if ( strcmp(l_from_events_str,"round.new") == 0 )
if ( strcmp(l_from_events_str,"round.new") == 0 ){
dap_string_append_printf(l_str_tmp,
"\t\tRound cfg:\n\t\t\t\tconfirmations_minimum:%d\n\t\t\t\tconfirmations_timeout:%d\n",
l_event_round_cfg.confirmations_minimum, l_event_round_cfg.confirmations_timeout);
"\t\tRound cfg:\n\t\t\t\tconfirmations_minimum: %d\n\t\t\t\tconfirmations_timeout: %d\n",
l_event_round_cfg.confirmations_minimum,
l_event_round_cfg.confirmations_timeout);
char * l_hash_str = dap_chain_hash_fast_to_str_new(&l_event_round_cfg.first_event_hash);
dap_string_append_printf(l_str_tmp, "\t\t\t\tfirst_event_hash: %s\n", l_hash_str);
DAP_DELETE(l_hash_str);
dap_string_append_printf(l_str_tmp,
"\t\t\t\tts_update: %s",
dap_ctime_r((time_t *)&l_event_round_cfg.ts_update, buf) );
if (l_event_round_cfg.ts_confirmations_minimum_completed != 0)
dap_string_append_printf(l_str_tmp,
"\t\t\t\tts_confirmations_minimum_completed: %s",
dap_ctime_r((time_t *)&l_event_round_cfg.ts_confirmations_minimum_completed, buf) );
}
// Header
dap_string_append_printf(l_str_tmp,"\t\tHeader:\n");
......
......@@ -22,14 +22,13 @@
along with any DAP based project. If not, see <http://www.gnu.org/licenses/>.
*/
#include "dap_chain_cs_dag_event.h"
#include "dap_common.h"
#include "dap_enc_key.h"
#include "dap_hash.h"
#include "dap_sign.h"
#include "dap_chain_datum.h"
#include "dap_chain_cs_dag.h"
#include "dap_chain_cs_dag_event.h"
#define LOG_TAG "dap_chain_cs_dag_event"
......@@ -112,7 +111,7 @@ dap_chain_cs_dag_event_t * dap_chain_cs_dag_event_copy(dap_chain_cs_dag_event_t
*/
dap_chain_cs_dag_event_t * dap_chain_cs_dag_event_copy_with_sign_add( dap_chain_cs_dag_event_t * a_event, size_t a_event_size,
size_t * a_event_size_new,
dap_chain_net_t * l_net, dap_enc_key_t * l_key)
dap_chain_net_t * a_net, dap_enc_key_t * a_key)
{
size_t l_hashes_size = a_event->header.hash_count*sizeof(dap_chain_hash_fast_t);
dap_chain_datum_t * l_datum = (dap_chain_datum_t*)(a_event->hashes_n_datum_n_signs + l_hashes_size);
......@@ -121,12 +120,12 @@ dap_chain_cs_dag_event_t * dap_chain_cs_dag_event_copy_with_sign_add( dap_chain_
// size_t l_event_size_excl_sign = dap_chain_cs_dag_event_calc_size_excl_signs(a_event,a_event_size);
size_t l_event_size = a_event_size;
size_t l_event_signs_size = l_event_size - l_event_size_excl_sign;
dap_sign_t * l_sign = dap_sign_create(l_key,a_event,l_event_size_excl_sign,0);
dap_sign_t * l_sign = dap_sign_create(a_key,a_event,l_event_size_excl_sign,0);
size_t l_sign_size = dap_sign_get_size(l_sign);
dap_chain_addr_t l_addr = {0};
dap_chain_hash_fast_t l_pkey_hash;
dap_sign_get_pkey_hash(l_sign, &l_pkey_hash);
dap_chain_addr_fill(&l_addr, l_sign->header.type, &l_pkey_hash, l_net->pub.id);
dap_chain_addr_fill(&l_addr, l_sign->header.type, &l_pkey_hash, a_net->pub.id);
char * l_addr_str = dap_chain_addr_to_str(&l_addr);
size_t l_offset = l_hashes_size+l_datum_size;
......@@ -137,20 +136,24 @@ dap_chain_cs_dag_event_t * dap_chain_cs_dag_event_copy_with_sign_add( dap_chain_
dap_chain_addr_t l_item_addr = {0};
dap_chain_hash_fast_t l_item_pkey_hash;
dap_sign_get_pkey_hash(l_item_sign, &l_item_pkey_hash);
dap_chain_addr_fill(&l_item_addr, l_item_sign->header.type, &l_item_pkey_hash, l_net->pub.id);
dap_chain_addr_fill(&l_item_addr, l_item_sign->header.type, &l_item_pkey_hash, a_net->pub.id);
// checking re-sign from one address
if (memcmp(&l_addr, &l_item_addr, sizeof(l_item_addr)) == 0) {
log_it(L_WARNING, "Re-sign from addr: %s", l_addr_str);
log_it(L_DEBUG, "Sign from this addr exists: %s", l_addr_str);
DAP_DELETE(l_sign);
DAP_DELETE(l_addr_str);
return NULL;
}
l_offset += l_sign_size;
}
DAP_DELETE(l_addr_str);
dap_chain_cs_dag_event_t * l_event_new = DAP_REALLOC(a_event, l_event_size+l_sign_size);
// dap_chain_cs_dag_event_t * l_event_new = DAP_REALLOC(a_event, l_event_size+l_sign_size);
dap_chain_cs_dag_event_t * l_event_new = DAP_NEW_Z_SIZE(dap_chain_cs_dag_event_t, l_event_size+l_sign_size);
memcpy(l_event_new, a_event, l_event_size);
memcpy(l_event_new->hashes_n_datum_n_signs+l_offset, l_sign, l_sign_size);
*a_event_size_new = l_event_size+l_sign_size;
l_event_new->header.signs_count++;
DAP_DELETE(l_sign);
DAP_DELETE(l_addr_str);
return l_event_new;
}
......@@ -183,11 +186,12 @@ dap_sign_t * dap_chain_cs_dag_event_get_sign( dap_chain_cs_dag_event_t * a_event
return NULL;
}
bool dap_chain_cs_dag_event_gdb_set(char *a_event_hash_str, dap_chain_cs_dag_event_t * a_event, uint32_t a_event_size,
bool dap_chain_cs_dag_event_gdb_set(char *a_event_hash_str, dap_chain_cs_dag_event_t * a_event, size_t a_event_size,
const char *a_group, dap_chain_cs_dag_event_round_cfg_t * a_event_round_cfg) {
dap_chain_cs_dag_event_round_item_t * l_event_round_item = DAP_NEW_SIZE(dap_chain_cs_dag_event_round_item_t,
sizeof(dap_chain_cs_dag_event_round_item_t)+a_event_size );
l_event_round_item->event_size = a_event_size;
a_event_round_cfg->ts_update = (uint64_t)time(NULL);
// l_event_round_item->event = DAP_DUP_SIZE(a_event, a_event_size);
memcpy(&l_event_round_item->cfg, a_event_round_cfg, sizeof(dap_chain_cs_dag_event_round_cfg_t));
memcpy(l_event_round_item->event, a_event, a_event_size);
......
......@@ -39,6 +39,9 @@ typedef dap_chain_cs_dag_event_t * (*dap_chain_cs_dag_callback_event_create_t)(d
typedef void (*dap_chain_cs_dag_callback_get_round_cfg_t)(dap_chain_cs_dag_t *, dap_chain_cs_dag_event_round_cfg_t *);
typedef void (*dap_chain_cs_dag_callback_set_event_round_cfg_t)(dap_chain_cs_dag_t *, dap_chain_cs_dag_event_round_cfg_t *);
typedef int (*dap_chain_cs_dag_callback_event_round_sync_t)(dap_chain_cs_dag_t * a_dag, const char a_op_code, const char *a_group,
const char *a_key, const void *a_value, const size_t a_value_size);
typedef struct dap_chain_cs_dag_hal_item {
dap_chain_hash_fast_t hash;
UT_hash_handle hh;
......@@ -65,6 +68,7 @@ typedef struct dap_chain_cs_dag
dap_chain_cs_dag_callback_event_t callback_cs_verify;
dap_chain_cs_dag_callback_get_round_cfg_t callback_cs_get_round_cfg;
dap_chain_cs_dag_callback_set_event_round_cfg_t callback_cs_set_event_round_cfg;
dap_chain_cs_dag_callback_event_round_sync_t callback_cs_event_round_sync;
void * _pvt;
void * _inheritor;
......
......@@ -23,10 +23,10 @@
*/
#pragma once
#include "dap_chain_net.h"
#include "dap_enc_key.h"
#include "dap_chain_common.h"
#include "dap_chain_datum.h"
#include "dap_chain_net.h"
#include "dap_sign.h"
#include "dap_hash.h"
......@@ -51,6 +51,8 @@ typedef struct dap_chain_cs_dag_event_round_cfg {
uint16_t confirmations_minimum; // param auth_certs_count_verify in PoA
uint32_t confirmations_timeout; // wait confirmations over minimum value (confirmations_minimum)
uint64_t ts_confirmations_minimum_completed;
uint64_t ts_update;
dap_chain_hash_fast_t first_event_hash; // first event hash in round
} DAP_ALIGN_PACKED dap_chain_cs_dag_event_round_cfg_t;
typedef struct dap_chain_cs_dag_event_round_item {
......@@ -80,7 +82,7 @@ dap_chain_cs_dag_event_t * dap_chain_cs_dag_event_copy(dap_chain_cs_dag_event_t
// Important: returns new deep copy of event
dap_chain_cs_dag_event_t * dap_chain_cs_dag_event_copy_with_sign_add( dap_chain_cs_dag_event_t * a_event, size_t a_event_size,
size_t * a_event_size_new,
dap_chain_net_t * l_net, dap_enc_key_t * l_key);
dap_chain_net_t * a_net, dap_enc_key_t * a_key);
dap_sign_t * dap_chain_cs_dag_event_get_sign( dap_chain_cs_dag_event_t * a_event, size_t a_event_size, uint16_t a_sign_number);
/**
......@@ -138,11 +140,11 @@ static inline void dap_chain_cs_dag_event_calc_hash(dap_chain_cs_dag_event_t * a
dap_hash_fast(a_event, a_event_size, a_event_hash);
}
static inline uint32_t dap_chain_cs_dag_event_round_item_get_size(dap_chain_cs_dag_event_round_item_t * a_event_round_item){
static inline size_t dap_chain_cs_dag_event_round_item_get_size(dap_chain_cs_dag_event_round_item_t * a_event_round_item){
return sizeof(dap_chain_cs_dag_event_round_item_t)+a_event_round_item->event_size;
}
bool dap_chain_cs_dag_event_gdb_set(char *a_event_hash_str, dap_chain_cs_dag_event_t * a_event, uint32_t a_event_size,
bool dap_chain_cs_dag_event_gdb_set(char *a_event_hash_str, dap_chain_cs_dag_event_t * a_event, size_t a_event_size,
const char *a_group, dap_chain_cs_dag_event_round_cfg_t * a_event_round_cfg);
dap_chain_cs_dag_event_t* dap_chain_cs_dag_event_gdb_get(const char *a_event_hash_str, size_t *a_event_size,
......