Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • cellframe/cellframe-sdk
  • MIKA83/cellframe-sdk
2 results
Show changes
Commits on Source (10)
......@@ -156,8 +156,10 @@ static volatile int s_log_count = 0;
static pthread_t s_log_thread = 0;
static void *s_log_thread_proc(void *arg);
#define STR_LOG_BUF_MAX 1000
typedef struct log_str_t {
char str[1000];
char str[STR_LOG_BUF_MAX];
unsigned int offset;
struct log_str_t *prev, *next;
} log_str_t;
......@@ -363,6 +365,28 @@ void _log_it(const char *a_log_tag, enum dap_log_level a_ll, const char *a_fmt,
pthread_mutex_unlock(&s_log_mutex);
}
static int s_check_and_fill_buffer_log(char **m, struct tm *a_tm_st, char *a_tmp)
{
char *s = *m;
struct tm l_tm;
if (sscanf(a_tmp, "[%d/%d/%d-%d:%d:%d]", &l_tm.tm_mon, &l_tm.tm_mday, &l_tm.tm_year, &l_tm.tm_hour, &l_tm.tm_min, &l_tm.tm_sec) == 6) {
l_tm.tm_mon--;
if (a_tm_st->tm_year >= l_tm.tm_year &&
a_tm_st->tm_mon >= l_tm.tm_mon &&
a_tm_st->tm_mday >= l_tm.tm_mday &&
a_tm_st->tm_hour >= l_tm.tm_hour &&
a_tm_st->tm_min >= l_tm.tm_min &&
a_tm_st->tm_sec >= l_tm.tm_sec) {
size_t l_len = strlen(a_tmp);
strncpy(s, a_tmp, l_len);
s += l_len;
//*s++ = '\n';
*m = s;
return 1;
}
}
return 0;
}
/**
* @brief dap_log_get_item
* @param a_start_time
......@@ -371,9 +395,47 @@ void _log_it(const char *a_log_tag, enum dap_log_level a_ll, const char *a_fmt,
*/
char *dap_log_get_item(time_t a_start_time, int a_limit)
{
#if 0
UNUSED(a_start_time);
UNUSED(a_limit);
return NULL; // TODO
#endif
log_str_t *elem, *tmp;
elem = tmp = NULL;
char *l_buf = DAP_CALLOC(STR_LOG_BUF_MAX, a_limit);
char *l_line = DAP_CALLOC(1, STR_LOG_BUF_MAX + 1);
char *s = l_buf;
//char *l_log_file = dap_strdup_printf("%s/var/log/%s.log", g_sys_dir_path, dap_get_appname());
char *l_log_file = dap_strdup_printf("%s", s_log_file_path);
FILE *fp = fopen(l_log_file, "r");
if (!fp) {
DAP_FREE(l_buf);
DAP_FREE(l_line);
return NULL;
}
struct tm *l_tm_st = localtime (&a_start_time);
pthread_mutex_lock(&s_log_mutex);
while (fgets(l_line, STR_LOG_BUF_MAX, fp)) {
if (a_limit <= 0) break;
a_limit -= s_check_and_fill_buffer_log(&s, l_tm_st, l_line);
}
DL_FOREACH_SAFE(s_log_buffer, elem, tmp) {
if (!tmp->str) continue;
if (a_limit <= 0) break;
a_limit -= s_check_and_fill_buffer_log(&s, l_tm_st, tmp->str);
}
pthread_mutex_unlock(&s_log_mutex);
fclose(fp);
DAP_FREE(l_line);
return l_buf;
}
/**
......
......@@ -34,6 +34,7 @@
#endif
#include "dap_common.h"
#include "dap_timerfd.h"
#include "dap_strfuncs.h"
#include "dap_enc_base58.h"
#include "dap_chain_pvt.h"
......@@ -46,6 +47,7 @@
#include "dap_chain_cs_dag_event.h"
#include "dap_chain_cs_dag_poa.h"
#include "dap_chain_net_srv_stake.h"
#include "dap_chain_cell.h"
#include "dap_cert.h"
......@@ -58,18 +60,28 @@ typedef struct dap_chain_cs_dag_poa_pvt
char * auth_certs_prefix;
uint16_t auth_certs_count;
uint16_t auth_certs_count_verify; // Number of signatures, needed for event verification
uint32_t confirmations_timeout; // wait signs over min value (auth_certs_count_verify)
uint8_t padding[4];
dap_chain_callback_new_cfg_t prev_callback_created; // global network config init
} dap_chain_cs_dag_poa_pvt_t;
#define PVT(a) ((dap_chain_cs_dag_poa_pvt_t *) a->_pvt )
typedef struct dap_chain_cs_dag_poa_callback_timer_arg {
dap_chain_cs_dag_t * dag;
char * l_event_hash_hex_str;
dap_chain_cs_dag_event_round_cfg_t event_round_cfg;
} dap_chain_cs_dag_poa_callback_timer_arg_t;
static void s_callback_get_round_cfg(dap_chain_cs_dag_t * a_dag, dap_chain_cs_dag_event_round_cfg_t * a_event_round_cfg);
static void s_callback_delete(dap_chain_cs_dag_t * a_dag);
static int s_callback_new(dap_chain_t * a_chain, dap_config_t * a_chain_cfg);
static int s_callback_created(dap_chain_t * a_chain, dap_config_t *a_chain_cfg);
static int s_callback_event_verify(dap_chain_cs_dag_t * a_dag, dap_chain_cs_dag_event_t * a_dag_event, size_t a_dag_event_size);
static dap_chain_cs_dag_event_t * s_callback_event_create(dap_chain_cs_dag_t * a_dag, dap_chain_datum_t * a_datum,
dap_chain_hash_fast_t * a_hashes, size_t a_hashes_count, size_t* a_event_size);
static bool s_callback_round_event_to_chain(dap_chain_cs_dag_poa_callback_timer_arg_t * a_callback_arg);
// CLI commands
static int s_cli_dag_poa(int argc, char ** argv, char **str_reply);
......@@ -177,58 +189,113 @@ static int s_cli_dag_poa(int argc, char ** argv, char **a_str_reply)
char * l_gdb_group_events = l_dag->gdb_group_events_round_new;
size_t l_event_size = 0;
dap_chain_cs_dag_event_t * l_event;
if ( (l_event = (dap_chain_cs_dag_event_t*) dap_chain_global_db_gr_get( dap_strdup(l_event_hash_hex_str),
&l_event_size, l_gdb_group_events )) == NULL ){
dap_chain_cs_dag_event_round_cfg_t l_event_round_cfg;
if ( (l_event = dap_chain_cs_dag_event_gdb_get( l_event_hash_hex_str, &l_event_size,
l_gdb_group_events, &l_event_round_cfg )) == NULL ){
dap_chain_node_cli_set_reply_text(a_str_reply,
"Can't find event in round.new - only place where could be signed the new event\n",
l_event_hash_str);
ret = -30;
}else {
dap_chain_cs_dag_event_t *l_event_new = dap_chain_cs_dag_event_copy_with_sign_add(l_event, l_event_size,l_poa_pvt->events_sign_cert->enc_key );
dap_chain_hash_fast_t l_event_new_hash;
dap_chain_cs_dag_event_calc_hash(l_event_new, l_event_size,&l_event_new_hash);
//size_t l_event_new_size = dap_chain_cs_dag_event_calc_size(l_event_new);
char * l_event_new_hash_hex_str = dap_chain_hash_fast_to_str_new(&l_event_new_hash);
char * l_event_new_hash_base58_str = dap_enc_base58_encode_hash_to_str(&l_event_new_hash);
//char * l_event_new_hash_base58_str = dap_enc_base58_from_hex_str_to_str(l_event_new_hash_hex_str);
if (dap_chain_global_db_gr_set( dap_strdup(l_event_new_hash_hex_str),(uint8_t*) l_event,l_event_size,l_gdb_group_events) ){
if ( dap_chain_global_db_gr_del(dap_strdup(l_event_hash_hex_str),l_gdb_group_events) ) { // Delete old event
size_t l_event_size_new = 0;
dap_chain_cs_dag_event_t *l_event_new = dap_chain_cs_dag_event_copy_with_sign_add(l_event, l_event_size,
&l_event_size_new,
l_chain_net, l_poa_pvt->events_sign_cert->enc_key);
if ( l_event_new ) {
dap_chain_hash_fast_t l_event_new_hash;
dap_chain_cs_dag_event_calc_hash(l_event_new, l_event_size_new, &l_event_new_hash);
//size_t l_event_new_size = dap_chain_cs_dag_event_calc_size(l_event_new);
char * l_event_new_hash_hex_str = dap_chain_hash_fast_to_str_new(&l_event_new_hash);
char * l_event_new_hash_base58_str = dap_enc_base58_encode_hash_to_str(&l_event_new_hash);
//char * l_event_new_hash_base58_str = dap_enc_base58_from_hex_str_to_str(l_event_new_hash_hex_str);
bool l_event_is_ready = false;
if ( l_event_new->header.signs_count >= l_event_round_cfg.confirmations_minimum ) {
log_it(L_NOTICE,"Event %s minimum confirmations completed", l_event_new_hash_base58_str);
int l_ret_event_verify;
l_dag->callback_cs_set_event_round_cfg(l_dag, &l_event_round_cfg);
if ( ( l_ret_event_verify = l_dag->callback_cs_verify(l_dag, l_event_new, l_event_size_new)) == 0 ) {
log_it(L_NOTICE,"Event %s verification passed", l_event_new_hash_hex_str);
if (l_event_round_cfg.ts_confirmations_minimum_completed == (uint64_t)0) {
l_event_round_cfg.ts_confirmations_minimum_completed = (uint64_t)time(NULL);
}
l_event_is_ready = true;
} else {
log_it(L_NOTICE,"Error! Event %s is not passing consensus verification, ret code %d\n",
l_event_new_hash_hex_str, l_ret_event_verify);
}
}
if (dap_chain_cs_dag_event_gdb_set(l_event_new_hash_hex_str, l_event_new,
l_event_size_new, l_gdb_group_events, &l_event_round_cfg) ){
if ( dap_chain_global_db_gr_del(dap_strdup(l_event_hash_hex_str), l_gdb_group_events) ) { // Delete old event
if(!dap_strcmp(l_hash_out_type, "hex")) {
dap_chain_node_cli_set_reply_text(a_str_reply,
"Added new sign with cert \"%s\", event %s placed back in round.new\n",
l_poa_pvt->events_sign_cert->name, l_event_new_hash_hex_str);
}
else {
dap_chain_node_cli_set_reply_text(a_str_reply,
"Added new sign with cert \"%s\", event %s placed back in round.new\n",
l_poa_pvt->events_sign_cert->name, l_event_new_hash_base58_str);
}
ret = 0;
dap_chain_net_sync_gdb(l_chain_net); // Propagate changes in pool
if (l_event_is_ready) {
dap_chain_cs_dag_poa_callback_timer_arg_t * l_callback_arg = DAP_NEW_Z(dap_chain_cs_dag_poa_callback_timer_arg_t);
l_callback_arg->dag = l_dag;
l_callback_arg->l_event_hash_hex_str = dap_strdup(l_event_new_hash_hex_str);
memcpy(&l_callback_arg->event_round_cfg, &l_event_round_cfg, sizeof(dap_chain_cs_dag_event_round_cfg_t));
uint32_t l_timeout = l_event_round_cfg.confirmations_timeout;
if ( l_timeout <= ((uint64_t)time(NULL) - l_event_round_cfg.ts_confirmations_minimum_completed) ) {
s_callback_round_event_to_chain(l_callback_arg);
}
else {
l_timeout = l_timeout - ((uint64_t)time(NULL) - l_event_round_cfg.ts_confirmations_minimum_completed);
if (dap_timerfd_start(l_timeout*1000,
(dap_timerfd_callback_t)s_callback_round_event_to_chain,
l_callback_arg) == NULL) {
log_it(L_ERROR,"Can't run timer for Event %s", l_event_new_hash_hex_str);
} else {
log_it(L_NOTICE,"Run timer %dsec. for Event %s", l_timeout, l_event_new_hash_hex_str);
}
}
}
}else {
ret = 1;
dap_chain_node_cli_set_reply_text(a_str_reply, "Added new sign with cert \"%s\", event %s placed back in round.new\n"
"WARNING! Old event %s with same datum is still in round.new, produced DUP!\n",
l_poa_pvt->events_sign_cert->name ,l_event_new_hash_hex_str, l_event_hash_str);
}
}else {
if(!dap_strcmp(l_hash_out_type, "hex")) {
dap_chain_node_cli_set_reply_text(a_str_reply,
"Added new sign with cert \"%s\", event %s placed back in round.new\n",
l_poa_pvt->events_sign_cert->name, l_event_new_hash_hex_str);
"GDB Error: Can't place event %s with new sign back in round.new\n",
l_event_new_hash_hex_str);
}
else {
dap_chain_node_cli_set_reply_text(a_str_reply,
"Added new sign with cert \"%s\", event %s placed back in round.new\n",
l_poa_pvt->events_sign_cert->name, l_event_new_hash_base58_str);
"GDB Error: Can't place event %s with new sign back in round.new\n",
l_event_new_hash_base58_str);
}
ret = 0;
dap_chain_net_sync_gdb(l_chain_net); // Propagate changes in pool
}else {
ret = 1;
dap_chain_node_cli_set_reply_text(a_str_reply, "Added new sign with cert \"%s\", event %s placed back in round.new\n"
"WARNING! Old event %s with same datum is still in round.new, produced DUP!\n",
l_poa_pvt->events_sign_cert->name ,l_event_new_hash_hex_str, l_event_hash_str);
}
}else {
if(!dap_strcmp(l_hash_out_type, "hex")) {
dap_chain_node_cli_set_reply_text(a_str_reply,
"GDB Error: Can't place event %s with new sign back in round.new\n",
l_event_new_hash_hex_str);
}
else {
dap_chain_node_cli_set_reply_text(a_str_reply,
"GDB Error: Can't place event %s with new sign back in round.new\n",
l_event_new_hash_base58_str);
}
ret=-31;
ret=-31;
}
DAP_DELETE(l_event_new_hash_hex_str);
DAP_DELETE(l_event_new_hash_base58_str);
} else {
dap_chain_node_cli_set_reply_text(a_str_reply,
"Can't sign event in round.new\n",
l_event_hash_str);
ret=-1;
}
DAP_DELETE(l_event_new_hash_hex_str);
DAP_DELETE(l_event_new_hash_base58_str);
}
DAP_DELETE( l_gdb_group_events );
// DAP_DELETE( l_gdb_group_events );
// DAP_DELETE(l_event_round_cfg);
DAP_DELETE(l_event);
} else {
dap_chain_node_cli_set_reply_text(a_str_reply, "Command dag_poa requires subcommand 'sign'");
......@@ -256,12 +323,14 @@ static int s_callback_new(dap_chain_t * a_chain, dap_config_t * a_chain_cfg)
l_dag->callback_delete = s_callback_delete;
l_dag->callback_cs_verify = s_callback_event_verify;
l_dag->callback_cs_event_create = s_callback_event_create;
l_dag->callback_cs_get_round_cfg = s_callback_get_round_cfg;
l_poa->_pvt = DAP_NEW_Z ( dap_chain_cs_dag_poa_pvt_t );
dap_chain_cs_dag_poa_pvt_t * l_poa_pvt = PVT ( l_poa );
if (dap_config_get_item_str(a_chain_cfg,"dag-poa","auth_certs_prefix") ) {
l_poa_pvt->auth_certs_count = dap_config_get_item_uint16_default(a_chain_cfg,"dag-poa","auth_certs_number",0);
l_poa_pvt->auth_certs_count_verify = dap_config_get_item_uint16_default(a_chain_cfg,"dag-poa","auth_certs_number_verify",0);
l_poa_pvt->confirmations_timeout = dap_config_get_item_uint16_default(a_chain_cfg,"dag-poa","confirmations_timeout",600);
l_poa_pvt->auth_certs_prefix = strdup ( dap_config_get_item_str(a_chain_cfg,"dag-poa","auth_certs_prefix") );
if (l_poa_pvt->auth_certs_count && l_poa_pvt->auth_certs_count_verify ) {
l_poa_pvt->auth_certs = DAP_NEW_Z_SIZE ( dap_cert_t *, l_poa_pvt->auth_certs_count * sizeof(dap_cert_t));
......@@ -286,6 +355,54 @@ static int s_callback_new(dap_chain_t * a_chain, dap_config_t * a_chain_cfg)
return 0;
}
static void s_callback_get_round_cfg(dap_chain_cs_dag_t * a_dag, dap_chain_cs_dag_event_round_cfg_t * a_event_round_cfg) {
dap_chain_cs_dag_poa_t * l_poa = DAP_CHAIN_CS_DAG_POA(a_dag);
dap_chain_cs_dag_poa_pvt_t * l_poa_pvt = PVT (l_poa);
a_event_round_cfg->confirmations_minimum = l_poa_pvt->auth_certs_count_verify;
a_event_round_cfg->confirmations_timeout = l_poa_pvt->confirmations_timeout;
a_event_round_cfg->ts_confirmations_minimum_completed = 0;
}
static bool s_callback_round_event_to_chain(dap_chain_cs_dag_poa_callback_timer_arg_t * a_callback_arg) {
dap_chain_cs_dag_t * l_dag = a_callback_arg->dag;
dap_chain_net_t *l_net = dap_chain_net_by_id(l_dag->chain->net_id);
char * l_gdb_group_events = l_dag->gdb_group_events_round_new;
dap_chain_cs_dag_event_t * l_event;
size_t l_event_size = 0;
dap_chain_cs_dag_event_round_cfg_t l_event_round_cfg;
if ( (l_event = dap_chain_cs_dag_event_gdb_get( a_callback_arg->l_event_hash_hex_str, &l_event_size,
l_gdb_group_events, &l_event_round_cfg )) == NULL ){
log_it(L_NOTICE,"Can't find event %s in round.new. The hash may have changed by reason the addition of a new signature.",
a_callback_arg->l_event_hash_hex_str);
}
else {
dap_chain_atom_ptr_t l_new_atom = (dap_chain_atom_ptr_t)dap_chain_cs_dag_event_copy(l_event, l_event_size);
memcpy(l_new_atom, l_event, l_event_size);
if(l_dag->chain->callback_atom_add(l_dag->chain, l_new_atom, l_event_size) < 0) { // Add new atom in chain
DAP_DELETE(l_new_atom);
log_it(L_NOTICE, "Event %s not added in chain", a_callback_arg->l_event_hash_hex_str);
}
else {
log_it(L_NOTICE, "Event %s added in chain successfully",
a_callback_arg->l_event_hash_hex_str);
if (dap_chain_cell_file_update(l_dag->chain->cells) > 0) {
// delete events from db
dap_chain_global_db_gr_del(dap_strdup(a_callback_arg->l_event_hash_hex_str), l_dag->gdb_group_events_round_new);
}
dap_chain_cell_close(l_dag->chain->cells);
dap_chain_net_sync_all(l_net);
}
}
DAP_DELETE(a_callback_arg->l_event_hash_hex_str);
DAP_DELETE(a_callback_arg);
return false;
}
/**
* @brief create callback load certificate for event signing for specific chain
* path to certificate iw written to chain config file in dag_poa section
......@@ -385,7 +502,12 @@ static int s_callback_event_verify(dap_chain_cs_dag_t * a_dag, dap_chain_cs_dag_
a_dag_event, l_offset_from_beginning, a_dag_event_size);
return -7; // Incorrest size
}
if ( a_dag_event->header.signs_count >= l_poa_pvt->auth_certs_count_verify ){
uint16_t l_certs_count_verify = a_dag->use_event_round_cfg ? a_dag->event_round_cfg.confirmations_minimum
: l_poa_pvt->auth_certs_count_verify;
a_dag->use_event_round_cfg = false;
// if ( a_dag_event->header.signs_count >= l_poa_pvt->auth_certs_count_verify ){
if ( a_dag_event->header.signs_count >= l_certs_count_verify ){
size_t l_verified = 0;
for ( uint16_t i = 0; i < a_dag_event->header.signs_count; i++ ) {
if (l_offset_from_beginning == a_dag_event_size)
......@@ -405,7 +527,8 @@ static int s_callback_event_verify(dap_chain_cs_dag_t * a_dag, dap_chain_cs_dag_
l_verified++;
}
}
return l_verified >= l_poa_pvt->auth_certs_count_verify ? 0 : -1;
//return l_verified >= l_poa_pvt->auth_certs_count_verify ? 0 : -1;
return l_verified >= l_certs_count_verify ? 0 : -1;
}else if (a_dag_event->header.hash_count == 0){
dap_chain_hash_fast_t l_event_hash;
dap_chain_cs_dag_event_calc_hash(a_dag_event,a_dag_event_size, &l_event_hash);
......
......@@ -77,6 +77,7 @@ static void s_pgsql_free_connection(PGconn *a_conn)
for (int i = 0; i < DAP_PGSQL_POOL_COUNT; i++) {
if (s_conn_pool[i].conn == a_conn) {
s_conn_pool[i].busy = 0;
break;
}
}
pthread_rwlock_unlock(&s_db_rwlock);
......
......@@ -41,8 +41,15 @@
#define LOG_TAG "db_sqlite"
static sqlite3 *s_db = NULL;
struct dap_sqlite_conn_pool_item {
sqlite3 *conn;
int busy;
};
//static sqlite3 *s_db = NULL;
static sqlite3 *s_trans = NULL;
static char *s_filename_db = NULL;
static struct dap_sqlite_conn_pool_item s_conn_pool[DAP_SQLITE_POOL_COUNT];
static pthread_rwlock_t s_db_rwlock = PTHREAD_RWLOCK_INITIALIZER;
// Value of one field in the table
typedef struct _SQLITE_VALUE_
......@@ -77,6 +84,41 @@ typedef struct _SQLITE_ROW_VALUE_
static int dap_db_driver_sqlite_exec(sqlite3 *l_db, const char *l_query, char **l_error_message);
static sqlite3 *s_sqlite_get_connection(void)
{
if (pthread_rwlock_wrlock(&s_db_rwlock) == EDEADLK) {
return NULL;
}
sqlite3 *l_ret = NULL;
for (int i = 0; i < DAP_SQLITE_POOL_COUNT; i++) {
if (!s_conn_pool[i].busy) {
l_ret = s_conn_pool[i].conn;
s_conn_pool[i].busy = 1;
break;
}
}
pthread_rwlock_unlock(&s_db_rwlock);
return l_ret;
}
static void s_sqlite_free_connection(sqlite3 *a_conn)
{
if (pthread_rwlock_wrlock(&s_db_rwlock) == EDEADLK) {
return;
}
for (int i = 0; i < DAP_SQLITE_POOL_COUNT; i++) {
if (s_conn_pool[i].conn == a_conn) {
s_conn_pool[i].busy = 0;
break;
}
}
pthread_rwlock_unlock(&s_db_rwlock);
}
/**
* @brief Initializes a SQLite database.
* @note no thread safe
......@@ -112,13 +154,18 @@ int dap_db_driver_sqlite_init(const char *a_filename_db, dap_db_driver_callbacks
DAP_DEL_Z(l_filename_dir);
// Open Sqlite file, create if nessesary
char *l_error_message = NULL;
s_db = dap_db_driver_sqlite_open(a_filename_db, SQLITE_OPEN_READWRITE|SQLITE_OPEN_CREATE, &l_error_message);
if(!s_db) {
log_it(L_ERROR, "Can't init sqlite err: \"%s\"", l_error_message);
dap_db_driver_sqlite_free(l_error_message);
l_ret = -3;
}
else {
for (int i = 0; i < DAP_SQLITE_POOL_COUNT; i++) {
s_conn_pool[i].conn = dap_db_driver_sqlite_open(a_filename_db, SQLITE_OPEN_READWRITE|SQLITE_OPEN_CREATE, &l_error_message);
if (!s_conn_pool[i].conn) {
log_it(L_ERROR, "Can't init sqlite err: \"%s\"", l_error_message);
dap_db_driver_sqlite_free(l_error_message);
l_ret = -3;
for(int ii = i - 1; ii >= 0; ii--) {
dap_db_driver_sqlite_close(s_conn_pool[ii].conn);
}
goto end;
}
sqlite3 *s_db = s_conn_pool[i].conn;
if(!dap_db_driver_sqlite_set_pragma(s_db, "synchronous", "NORMAL")) // 0 | OFF | 1 | NORMAL | 2 | FULL
printf("can't set new synchronous mode\n");
if(!dap_db_driver_sqlite_set_pragma(s_db, "journal_mode", "OFF")) // DELETE | TRUNCATE | PERSIST | MEMORY | WAL | OFF
......@@ -126,6 +173,9 @@ int dap_db_driver_sqlite_init(const char *a_filename_db, dap_db_driver_callbacks
if(!dap_db_driver_sqlite_set_pragma(s_db, "page_size", "1024")) // DELETE | TRUNCATE | PERSIST | MEMORY | WAL | OFF
printf("can't set page_size\n");
s_conn_pool[i].busy = 0;
}
// *PRAGMA page_size = bytes; // page size DB; it is reasonable to make it equal to the size of the disk cluster 4096
// *PRAGMA cache_size = -kibibytes; // by default it is equal to 2000 pages of database
//
......@@ -141,7 +191,7 @@ int dap_db_driver_sqlite_init(const char *a_filename_db, dap_db_driver_callbacks
a_drv_callback->deinit = dap_db_driver_sqlite_deinit;
a_drv_callback->flush = dap_db_driver_sqlite_flush;
s_filename_db = strdup(a_filename_db);
}
end:
return l_ret;
}
......@@ -153,13 +203,14 @@ int dap_db_driver_sqlite_init(const char *a_filename_db, dap_db_driver_callbacks
int dap_db_driver_sqlite_deinit(void)
{
pthread_rwlock_wrlock(&s_db_rwlock);
if(!s_db){
pthread_rwlock_unlock(&s_db_rwlock);
return -666;
}
dap_db_driver_sqlite_close(s_db);
for (int i = 0; i < DAP_SQLITE_POOL_COUNT; i++) {
if (s_conn_pool[i].conn) {
dap_db_driver_sqlite_close(s_conn_pool[i].conn);
s_conn_pool[i].busy = 0;
}
}
pthread_rwlock_unlock(&s_db_rwlock);
s_db = NULL;
//s_db = NULL;
return sqlite3_shutdown();
}
......@@ -191,14 +242,14 @@ sqlite3* dap_db_driver_sqlite_open(const char *a_filename_utf8, int a_flags, cha
{
sqlite3 *l_db = NULL;
int l_rc = sqlite3_open_v2(a_filename_utf8, &l_db, a_flags | SQLITE_OPEN_FULLMUTEX, NULL);
int l_rc = sqlite3_open_v2(a_filename_utf8, &l_db, a_flags | SQLITE_OPEN_NOMUTEX, NULL);
// if unable to open the database file
if(l_rc == SQLITE_CANTOPEN) {
log_it(L_WARNING,"No database on path %s, creating one from scratch", a_filename_utf8);
if(l_db)
sqlite3_close(l_db);
// try to create database
l_rc = sqlite3_open_v2(a_filename_utf8, &l_db, a_flags | SQLITE_OPEN_FULLMUTEX| SQLITE_OPEN_CREATE, NULL);
l_rc = sqlite3_open_v2(a_filename_utf8, &l_db, a_flags | SQLITE_OPEN_NOMUTEX| SQLITE_OPEN_CREATE, NULL);
}
if(l_rc != SQLITE_OK) {
......@@ -269,16 +320,14 @@ bool dap_db_driver_sqlite_set_pragma(sqlite3 *a_db, char *a_param, char *a_mode)
int dap_db_driver_sqlite_flush()
{
log_it(L_DEBUG, "Start flush sqlite data base.");
pthread_rwlock_wrlock(&s_db_rwlock);
sqlite3 *s_db = s_sqlite_get_connection();
if(!s_db){
pthread_rwlock_unlock(&s_db_rwlock);
return -666;
}
dap_db_driver_sqlite_close(s_db);
char *l_error_message = NULL;
s_db = dap_db_driver_sqlite_open(s_filename_db, SQLITE_OPEN_READWRITE, &l_error_message);
if(!s_db) {
pthread_rwlock_unlock(&s_db_rwlock);
log_it(L_ERROR, "Can't init sqlite err: \"%s\"", l_error_message? l_error_message: "UNKNOWN");
dap_db_driver_sqlite_free(l_error_message);
return -3;
......@@ -293,7 +342,6 @@ int dap_db_driver_sqlite_flush()
if(!dap_db_driver_sqlite_set_pragma(s_db, "page_size", "1024")) // DELETE | TRUNCATE | PERSIST | MEMORY | WAL | OFF
log_it(L_WARNING, "Can't set page_size\n");
pthread_rwlock_unlock(&s_db_rwlock);
return 0;
}
......@@ -331,6 +379,7 @@ static int dap_db_driver_sqlite_exec(sqlite3 *l_db, const char *l_query, char **
*/
static int dap_db_driver_sqlite_create_group_table(const char *a_table_name)
{
sqlite3 *s_db = s_sqlite_get_connection();
char *l_error_message = NULL;
if(!s_db || !a_table_name)
return -1;
......@@ -343,6 +392,7 @@ static int dap_db_driver_sqlite_create_group_table(const char *a_table_name)
log_it(L_ERROR, "Creatу_table : %s\n", l_error_message);
dap_db_driver_sqlite_free(l_error_message);
DAP_DELETE(l_query);
s_sqlite_free_connection(s_db);
return -1;
}
DAP_DELETE(l_query);
......@@ -353,9 +403,11 @@ static int dap_db_driver_sqlite_create_group_table(const char *a_table_name)
log_it(L_ERROR, "Create unique index : %s\n", l_error_message);
dap_db_driver_sqlite_free(l_error_message);
DAP_DELETE(l_query);
s_sqlite_free_connection(s_db);
return -1;
}
DAP_DELETE(l_query);
s_sqlite_free_connection(s_db);
return 0;
}
......@@ -512,13 +564,14 @@ int dap_db_driver_sqlite_vacuum(sqlite3 *l_db)
*/
int dap_db_driver_sqlite_start_transaction(void)
{
s_trans = s_sqlite_get_connection();
pthread_rwlock_wrlock(&s_db_rwlock);
if(!s_db){
if(!s_trans){
pthread_rwlock_unlock(&s_db_rwlock);
return -666;
}
if(SQLITE_OK == dap_db_driver_sqlite_exec(s_db, "BEGIN", NULL)){
if(SQLITE_OK == dap_db_driver_sqlite_exec(s_trans, "BEGIN", NULL)){
pthread_rwlock_unlock(&s_db_rwlock);
return 0;
}else{
......@@ -535,11 +588,11 @@ int dap_db_driver_sqlite_start_transaction(void)
int dap_db_driver_sqlite_end_transaction(void)
{
pthread_rwlock_wrlock(&s_db_rwlock);
if(!s_db){
if(!s_trans){
pthread_rwlock_unlock(&s_db_rwlock);
return -666;
}
if(SQLITE_OK == dap_db_driver_sqlite_exec(s_db, "COMMIT", NULL)){
if(SQLITE_OK == dap_db_driver_sqlite_exec(s_trans, "COMMIT", NULL)){
pthread_rwlock_unlock(&s_db_rwlock);
return 0;
}else{
......@@ -630,9 +683,8 @@ int dap_db_driver_sqlite_apply_store_obj(dap_store_obj_t *a_store_obj)
return -1;
}
// execute request
pthread_rwlock_wrlock(&s_db_rwlock);
sqlite3 *s_db = s_sqlite_get_connection();
if(!s_db){
pthread_rwlock_unlock(&s_db_rwlock);
return -666;
}
......@@ -662,13 +714,13 @@ int dap_db_driver_sqlite_apply_store_obj(dap_store_obj_t *a_store_obj)
// repeat request
l_ret = dap_db_driver_sqlite_exec(s_db, l_query, &l_error_message);
}
pthread_rwlock_unlock(&s_db_rwlock);
// missing database
if(l_ret != SQLITE_OK) {
log_it(L_ERROR, "sqlite apply error: %s", l_error_message);
dap_db_driver_sqlite_free(l_error_message);
l_ret = -1;
}
s_sqlite_free_connection(s_db);
if (a_store_obj->key)
DAP_DELETE(a_store_obj->key);
dap_db_driver_sqlite_free(l_query);
......@@ -731,19 +783,18 @@ dap_store_obj_t* dap_db_driver_sqlite_read_last_store_obj(const char *a_group)
return NULL;
char * l_table_name = dap_db_driver_sqlite_make_table_name(a_group);
char *l_str_query = sqlite3_mprintf("SELECT id,ts,key,value FROM '%s' ORDER BY id DESC LIMIT 1", l_table_name);
pthread_rwlock_wrlock(&s_db_rwlock);
sqlite3 *s_db = s_sqlite_get_connection();
if(!s_db){
pthread_rwlock_unlock(&s_db_rwlock);
return NULL;
}
int l_ret = dap_db_driver_sqlite_query(s_db, l_str_query, &l_res, &l_error_message);
pthread_rwlock_unlock(&s_db_rwlock);
sqlite3_free(l_str_query);
DAP_DEL_Z(l_table_name);
if(l_ret != SQLITE_OK) {
//log_it(L_ERROR, "read last l_ret=%d, %s\n", sqlite3_errcode(s_db), sqlite3_errmsg(s_db));
dap_db_driver_sqlite_free(l_error_message);
s_sqlite_free_connection(s_db);
return NULL;
}
......@@ -760,6 +811,8 @@ dap_store_obj_t* dap_db_driver_sqlite_read_last_store_obj(const char *a_group)
dap_db_driver_sqlite_row_free(l_row);
dap_db_driver_sqlite_query_free(l_res);
s_sqlite_free_connection(s_db);
return l_obj;
}
......@@ -785,27 +838,27 @@ dap_store_obj_t* dap_db_driver_sqlite_read_cond_store_obj(const char *a_group, u
int l_count_out = 0;
if(a_count_out)
l_count_out = (int)*a_count_out;
char *l_str_query;
char *l_str_query = NULL;
if(l_count_out)
l_str_query = sqlite3_mprintf("SELECT id,ts,key,value FROM '%s' WHERE id>='%lld' ORDER BY id ASC LIMIT %d",
l_table_name, a_id, l_count_out);
else
l_str_query = sqlite3_mprintf("SELECT id,ts,key,value FROM '%s' WHERE id>='%lld' ORDER BY id ASC",
l_table_name, a_id);
pthread_rwlock_wrlock(&s_db_rwlock);
sqlite3 *s_db = s_sqlite_get_connection();
if(!s_db){
pthread_rwlock_unlock(&s_db_rwlock);
if (l_str_query) sqlite3_free(l_str_query);
return NULL;
}
int l_ret = dap_db_driver_sqlite_query(s_db, l_str_query, &l_res, &l_error_message);
pthread_rwlock_unlock(&s_db_rwlock);
sqlite3_free(l_str_query);
DAP_DEL_Z(l_table_name);
if(l_ret != SQLITE_OK) {
//log_it(L_ERROR, "read l_ret=%d, %s\n", sqlite3_errcode(s_db), sqlite3_errmsg(s_db));
dap_db_driver_sqlite_free(l_error_message);
s_sqlite_free_connection(s_db);
return NULL;
}
......@@ -835,9 +888,11 @@ dap_store_obj_t* dap_db_driver_sqlite_read_cond_store_obj(const char *a_group, u
} while(l_row);
dap_db_driver_sqlite_query_free(l_res);
s_sqlite_free_connection(s_db);
if(a_count_out)
*a_count_out = (size_t)l_count_out;
return l_obj;
}
......@@ -851,6 +906,7 @@ dap_store_obj_t* dap_db_driver_sqlite_read_cond_store_obj(const char *a_group, u
*/
dap_store_obj_t* dap_db_driver_sqlite_read_store_obj(const char *a_group, const char *a_key, size_t *a_count_out)
{
sqlite3 *s_db = s_sqlite_get_connection();
if(!a_group || !s_db)
return NULL;
dap_store_obj_t *l_obj = NULL;
......@@ -876,14 +932,13 @@ dap_store_obj_t* dap_db_driver_sqlite_read_store_obj(const char *a_group, const
else
l_str_query = sqlite3_mprintf("SELECT id,ts,key,value FROM '%s' ORDER BY id ASC", l_table_name);
}
pthread_rwlock_wrlock(&s_db_rwlock);
int l_ret = dap_db_driver_sqlite_query(s_db, l_str_query, &l_res, NULL);
pthread_rwlock_unlock(&s_db_rwlock);
sqlite3_free(l_str_query);
DAP_DEL_Z(l_table_name);
if(l_ret != SQLITE_OK) {
//log_it(L_ERROR, "read l_ret=%d, %s\n", sqlite3_errcode(s_db), sqlite3_errmsg(s_db));
s_sqlite_free_connection(s_db);
return NULL;
}
......@@ -913,6 +968,7 @@ dap_store_obj_t* dap_db_driver_sqlite_read_store_obj(const char *a_group, const
} while(l_row);
dap_db_driver_sqlite_query_free(l_res);
s_sqlite_free_connection(s_db);
if(a_count_out)
*a_count_out = l_count_out;
......@@ -927,16 +983,16 @@ dap_store_obj_t* dap_db_driver_sqlite_read_store_obj(const char *a_group, const
*/
dap_list_t* dap_db_driver_sqlite_get_groups_by_mask(const char *a_group_mask)
{
sqlite3 *s_db = s_sqlite_get_connection();
if(!a_group_mask || !s_db)
return NULL;
sqlite3_stmt *l_res;
const char *l_str_query = "SELECT name FROM sqlite_master WHERE type ='table' AND name NOT LIKE 'sqlite_%'";
dap_list_t *l_ret_list = NULL;
pthread_rwlock_wrlock(&s_db_rwlock);
int l_ret = dap_db_driver_sqlite_query(s_db, (char *)l_str_query, &l_res, NULL);
pthread_rwlock_unlock(&s_db_rwlock);
if(l_ret != SQLITE_OK) {
//log_it(L_ERROR, "Get tables l_ret=%d, %s\n", sqlite3_errcode(s_db), sqlite3_errmsg(s_db));
s_sqlite_free_connection(s_db);
return NULL;
}
char * l_mask = dap_db_driver_sqlite_make_table_name(a_group_mask);
......@@ -948,6 +1004,9 @@ dap_list_t* dap_db_driver_sqlite_get_groups_by_mask(const char *a_group_mask)
dap_db_driver_sqlite_row_free(l_row);
}
dap_db_driver_sqlite_query_free(l_res);
s_sqlite_free_connection(s_db);
return l_ret_list;
}
......@@ -960,20 +1019,20 @@ dap_list_t* dap_db_driver_sqlite_get_groups_by_mask(const char *a_group_mask)
*/
size_t dap_db_driver_sqlite_read_count_store(const char *a_group, uint64_t a_id)
{
sqlite3 *s_db = s_sqlite_get_connection();
sqlite3_stmt *l_res;
if(!a_group || ! s_db)
return 0;
char * l_table_name = dap_db_driver_sqlite_make_table_name(a_group);
char *l_str_query = sqlite3_mprintf("SELECT COUNT(*) FROM '%s' WHERE id>='%lld'", l_table_name, a_id);
pthread_rwlock_wrlock(&s_db_rwlock);
int l_ret = dap_db_driver_sqlite_query(s_db, l_str_query, &l_res, NULL);
pthread_rwlock_unlock(&s_db_rwlock);
sqlite3_free(l_str_query);
DAP_DEL_Z(l_table_name);
if(l_ret != SQLITE_OK) {
//log_it(L_ERROR, "Count l_ret=%d, %s\n", sqlite3_errcode(s_db), sqlite3_errmsg(s_db));
s_sqlite_free_connection(s_db);
return 0;
}
size_t l_ret_val = 0;
......@@ -983,6 +1042,9 @@ size_t dap_db_driver_sqlite_read_count_store(const char *a_group, uint64_t a_id)
dap_db_driver_sqlite_row_free(l_row);
}
dap_db_driver_sqlite_query_free(l_res);
s_sqlite_free_connection(s_db);
return l_ret_val;
}
......@@ -995,20 +1057,20 @@ size_t dap_db_driver_sqlite_read_count_store(const char *a_group, uint64_t a_id)
*/
bool dap_db_driver_sqlite_is_obj(const char *a_group, const char *a_key)
{
sqlite3 *s_db = s_sqlite_get_connection();
sqlite3_stmt *l_res;
if(!a_group || ! s_db)
return false;
char * l_table_name = dap_db_driver_sqlite_make_table_name(a_group);
char *l_str_query = sqlite3_mprintf("SELECT EXISTS(SELECT * FROM '%s' WHERE key='%s')", l_table_name, a_key);
pthread_rwlock_wrlock(&s_db_rwlock);
int l_ret = dap_db_driver_sqlite_query(s_db, l_str_query, &l_res, NULL);
pthread_rwlock_unlock(&s_db_rwlock);
sqlite3_free(l_str_query);
DAP_DEL_Z(l_table_name);
if(l_ret != SQLITE_OK) {
//log_it(L_ERROR, "Exists l_ret=%d, %s\n", sqlite3_errcode(s_db), sqlite3_errmsg(s_db));
s_sqlite_free_connection(s_db);
return false;
}
bool l_ret_val = false;
......@@ -1018,5 +1080,8 @@ bool dap_db_driver_sqlite_is_obj(const char *a_group, const char *a_key)
dap_db_driver_sqlite_row_free(l_row);
}
dap_db_driver_sqlite_query_free(l_res);
s_sqlite_free_connection(s_db);
return l_ret_val;
}
......@@ -27,6 +27,8 @@
#include <sqlite3.h>
#include "dap_chain_global_db_driver.h"
#define DAP_SQLITE_POOL_COUNT 16
int dap_db_driver_sqlite_init(const char *a_filename_db, dap_db_driver_callbacks_t *a_drv_callback);
int dap_db_driver_sqlite_deinit(void);
......
......@@ -234,6 +234,7 @@ static bool s_timer_update_states_callback(void *a_arg)
assert(l_net);
// If we do nothing - init sync process
if (l_ch_chain->state == CHAIN_STATE_IDLE && dap_chain_net_sync_trylock(l_net, l_me)) {
//if (l_ch_chain->state == CHAIN_STATE_IDLE) {
log_it(L_INFO, "Start synchronization process with "NODE_ADDR_FP_STR, NODE_ADDR_FP_ARGS_S(l_node_client->remote_node_addr));
dap_stream_ch_chain_sync_request_t l_sync_gdb = {};
l_sync_gdb.node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net);
......
......@@ -122,6 +122,7 @@ static dap_chain_datum_t* s_chain_callback_datum_iter_get_next( dap_chain_datum_
static int s_cli_dag(int argc, char ** argv, char **str_reply);
void s_dag_events_lasts_process_new_last_event(dap_chain_cs_dag_t * a_dag, dap_chain_cs_dag_event_item_t * a_event_item);
static void s_dag_chain_cs_set_event_round_cfg(dap_chain_cs_dag_t * a_dag, dap_chain_cs_dag_event_round_cfg_t * a_event_round_cfg);
static bool s_seed_mode = false;
static bool s_debug_more = false;
......@@ -163,6 +164,19 @@ void dap_chain_cs_dag_deinit(void)
}
static void s_history_callback_notify(void * a_arg, const char a_op_code, const char * a_group,
const char * a_key, const void * a_value, const size_t a_value_size)
{
if (a_arg){
dap_chain_cs_dag_t * l_dag = (dap_chain_cs_dag_t *) a_arg;
dap_chain_net_t *l_net = dap_chain_net_by_id( l_dag->chain->net_id);
log_it(L_DEBUG,"%s.%s: op_code='%c' group=\"%s\" key=\"%s\" value_size=%zu",l_net->pub.name,
l_dag->chain->name, a_op_code, a_group, a_key, a_value_size);
// dap_chain_node_mempool_autoproc_notify((void *)l_net, a_op_code, a_group, a_key, a_value, a_value_size);
dap_chain_net_sync_gdb_broadcast((void *)l_net, a_op_code, a_group, a_key, a_value, a_value_size);
}
}
/**
* @brief dap_chain_cs_dag_new
* @param a_chain
......@@ -232,10 +246,26 @@ int dap_chain_cs_dag_new(dap_chain_t * a_chain, dap_config_t * a_chain_cfg)
l_dag->is_single_line = dap_config_get_item_bool_default(a_chain_cfg,"dag","is_single_line",false);
l_dag->is_celled = dap_config_get_item_bool_default(a_chain_cfg,"dag","is_celled",false);
l_dag->is_add_directy = dap_config_get_item_bool_default(a_chain_cfg,"dag","is_add_directly",false);
l_dag->is_add_directly = dap_config_get_item_bool_default(a_chain_cfg,"dag","is_add_directly",false);
l_dag->datum_add_hashes_count = dap_config_get_item_uint16_default(a_chain_cfg,"dag","datum_add_hashes_count",1);
l_dag->gdb_group_events_round_new = dap_strdup( dap_config_get_item_str_default(a_chain_cfg,"dag","gdb_group_events_round_new",
"events.round.new"));
l_dag->use_event_round_cfg = false;
l_dag->callback_cs_set_event_round_cfg = s_dag_chain_cs_set_event_round_cfg;
// l_dag->gdb_group_events_round_new = dap_strdup( dap_config_get_item_str_default(a_chain_cfg,"dag","gdb_group_events_round_new",
// "events.round.new"));
char * l_round_new_str = dap_strdup( dap_config_get_item_str_default(a_chain_cfg,"dag","gdb_group_events_round_new", "events.round.new"));
dap_chain_net_t *l_net = dap_chain_net_by_id(a_chain->net_id);
//if(!l_dag->is_celled){
l_dag->gdb_group_events_round_new = dap_strdup_printf( "round-gdb.%s.chain-%016llX.%s",l_net->pub.name,
a_chain->id.uint64, l_round_new_str);
// }else {
// l_dag->gdb_group_events_round_new = dap_strdup_printf( "round-gdb.%s.chain-%016llX.cell-%016llX.%s",l_net->pub.name,
// a_chain->id.uint64, l_net->pub.cell_id.uint64, l_round_new_str);
// }
DAP_DELETE(l_round_new_str);
dap_chain_global_db_add_sync_group("round-gdb", s_history_callback_notify, l_dag);
if ( l_dag->is_single_line ) {
log_it (L_NOTICE, "DAG chain initialized (single line)");
} else {
......@@ -245,6 +275,13 @@ int dap_chain_cs_dag_new(dap_chain_t * a_chain, dap_config_t * a_chain_cfg)
return 0;
}
static void s_dag_chain_cs_set_event_round_cfg(dap_chain_cs_dag_t * a_dag, dap_chain_cs_dag_event_round_cfg_t * a_event_round_cfg){
memcpy(&a_dag->event_round_cfg,
a_event_round_cfg,
sizeof(dap_chain_cs_dag_event_round_cfg_t));
a_dag->use_event_round_cfg = true;
}
static void s_dap_chain_cs_dag_purge(dap_chain_t *a_chain)
{
dap_chain_cs_dag_pvt_t *l_dag_pvt = PVT(DAP_CHAIN_CS_DAG(a_chain));
......@@ -544,8 +581,11 @@ static size_t s_chain_callback_datums_pool_proc(dap_chain_t * a_chain, dap_chain
do{
int l_index = rand() % (int) l_events_round_new_size;
dap_chain_hash_fast_t l_hash;
dap_chain_cs_dag_event_t * l_event = (dap_chain_cs_dag_event_t *) l_events_round_new[l_index].value;
size_t l_event_size = l_events_round_new[l_index].value_len;
// dap_chain_cs_dag_event_t * l_event = (dap_chain_cs_dag_event_t *) l_events_round_new[l_index].value;
// size_t l_event_size = l_events_round_new[l_index].value_len;
dap_chain_cs_dag_event_t * l_event = (dap_chain_cs_dag_event_t *)
((dap_chain_cs_dag_event_round_item_t *)l_events_round_new[l_index].value)->event;
size_t l_event_size = ((dap_chain_cs_dag_event_round_item_t *)l_events_round_new[l_index].value)->event_size;
dap_hash_fast(l_event, l_event_size,&l_hash);
bool l_is_already_in_event = false;
......@@ -592,7 +632,7 @@ static size_t s_chain_callback_datums_pool_proc(dap_chain_t * a_chain, dap_chain
if(l_dag->callback_cs_event_create)
l_event = l_dag->callback_cs_event_create(l_dag,l_datum,l_hashes,l_hashes_linked,&l_event_size);
if (l_event&&l_event_size) { // Event is created
if (l_dag->is_add_directy) {
if (l_dag->is_add_directly) {
l_cell = a_chain->cells;
if (s_chain_callback_atom_add(a_chain, l_event, l_event_size) == ATOM_ACCEPT) {
// add events to file
......@@ -631,9 +671,16 @@ static size_t s_chain_callback_datums_pool_proc(dap_chain_t * a_chain, dap_chain
dap_chain_hash_fast_t l_event_hash;
dap_chain_cs_dag_event_calc_hash(l_event,l_event_size, &l_event_hash);
char * l_event_hash_str = dap_chain_hash_fast_to_str_new(&l_event_hash);
if(dap_chain_global_db_gr_set(dap_strdup(l_event_hash_str), (uint8_t *) l_event,
l_event_size,
l_dag->gdb_group_events_round_new)) {
// if(dap_chain_global_db_gr_set(dap_strdup(l_event_hash_str), (uint8_t *)l_event,
// l_event_size,
// l_dag->gdb_group_events_round_new)) {
dap_chain_cs_dag_event_round_cfg_t l_event_round_cfg;
if ( l_dag->callback_cs_get_round_cfg ) {
l_dag->callback_cs_get_round_cfg(l_dag, &l_event_round_cfg);
}
if(dap_chain_cs_dag_event_gdb_set(l_event_hash_str, l_event,
l_event_size, l_dag->gdb_group_events_round_new,
&l_event_round_cfg)) {
log_it(L_INFO, "Event %s placed in the new forming round", l_event_hash_str);
DAP_DELETE(l_event_hash_str);
l_event_hash_str = NULL;
......@@ -1361,8 +1408,12 @@ static int s_cli_dag(int argc, char ** argv, char **a_str_reply)
// Check if its ready or not
for (size_t i = 0; i< l_objs_size; i++ ){
dap_chain_cs_dag_event_t * l_event = (dap_chain_cs_dag_event_t*) l_objs[i].value;
size_t l_event_size = l_objs[i].value_len;
// dap_chain_cs_dag_event_t * l_event = (dap_chain_cs_dag_event_t*) l_objs[i].value;
// size_t l_event_size = l_objs[i].value_len;
dap_chain_cs_dag_event_t * l_event = (dap_chain_cs_dag_event_t *)
((dap_chain_cs_dag_event_round_item_t *)l_objs[i].value)->event;
size_t l_event_size = ((dap_chain_cs_dag_event_round_item_t *)l_objs[i].value)->event_size;
l_dag->callback_cs_set_event_round_cfg(l_dag, &((dap_chain_cs_dag_event_round_item_t *)l_objs[i].value)->cfg);
int l_ret_event_verify;
if ( ( l_ret_event_verify = l_dag->callback_cs_verify (l_dag,l_event,l_event_size) ) !=0 ){// if consensus accept the event
dap_string_append_printf( l_str_ret_tmp,
......@@ -1566,16 +1617,20 @@ static int s_cli_dag(int argc, char ** argv, char **a_str_reply)
}break;
case SUBCMD_EVENT_DUMP:{
dap_chain_cs_dag_event_t * l_event = NULL;
dap_chain_cs_dag_event_round_cfg_t l_event_round_cfg;
size_t l_event_size = 0;
if ( l_from_events_str ){
if ( strcmp(l_from_events_str,"round.new") == 0 ){
const char * l_gdb_group_events = l_dag->gdb_group_events_round_new;
l_event = (dap_chain_cs_dag_event_t *) dap_chain_global_db_gr_get
( l_event_hash_str ,&l_event_size,l_gdb_group_events );
// l_event = (dap_chain_cs_dag_event_t *) dap_chain_global_db_gr_get
// ( l_event_hash_str ,&l_event_size,l_gdb_group_events );
l_event = dap_chain_cs_dag_event_gdb_get(l_event_hash_str, &l_event_size,
l_gdb_group_events, &l_event_round_cfg);
}else if ( strncmp(l_from_events_str,"round.",6) == 0){
}else if ( strcmp(l_from_events_str,"events_lasts") == 0){
dap_chain_cs_dag_event_item_t * l_event_item = NULL;
dap_chain_cs_dag_event_round_cfg_t l_event_round_cfg;
pthread_rwlock_rdlock(&PVT(l_dag)->events_rwlock);
HASH_FIND(hh,PVT(l_dag)->events_lasts_unlinked,&l_event_hash,sizeof(l_event_hash),l_event_item);
pthread_rwlock_unlock(&PVT(l_dag)->events_rwlock);
......@@ -1619,8 +1674,17 @@ static int s_cli_dag(int argc, char ** argv, char **a_str_reply)
dap_string_t * l_str_tmp = dap_string_new(NULL);
char buf[50];
time_t l_ts_reated = (time_t) l_event->header.ts_created;
dap_string_append_printf(l_str_tmp,"\nEvent %s:\n", l_event_hash_str);
// Round cfg
if ( strcmp(l_from_events_str,"round.new") == 0 )
dap_string_append_printf(l_str_tmp,
"\t\tRound cfg:\n\t\t\t\tconfirmations_minimum:%d\n\t\t\t\tconfirmations_timeout:%d\n",
l_event_round_cfg.confirmations_minimum, l_event_round_cfg.confirmations_timeout);
// Header
dap_string_append_printf(l_str_tmp,"Event %s:\n", l_event_hash_str);
dap_string_append_printf(l_str_tmp,"\t\tHeader:\n");
dap_string_append_printf(l_str_tmp,"\t\t\t\tversion: 0x%02X\n",l_event->header.version);
dap_string_append_printf(l_str_tmp,"\t\t\t\tcell_id: 0x%016"DAP_UINT64_FORMAT_x"\n",l_event->header.cell_id.uint64);
dap_string_append_printf(l_str_tmp,"\t\t\t\tchain_id: 0x%016"DAP_UINT64_FORMAT_X"\n",l_event->header.chain_id.uint64);
......@@ -1694,7 +1758,9 @@ static int s_cli_dag(int argc, char ** argv, char **a_str_reply)
dap_string_append_printf(l_str_tmp,"%s.%s: Found %zu records :\n",l_net->pub.name,l_chain->name,l_objs_count);
for (size_t i = 0; i< l_objs_count; i++){
dap_chain_cs_dag_event_t * l_event = (dap_chain_cs_dag_event_t *) l_objs[i].value;
// dap_chain_cs_dag_event_t * l_event = (dap_chain_cs_dag_event_t *) l_objs[i].value;
dap_chain_cs_dag_event_t * l_event = (dap_chain_cs_dag_event_t *)
((dap_chain_cs_dag_event_round_item_t *)l_objs[i].value)->event;
char buf[50];
time_t l_ts_create = (time_t) l_event->header.ts_created;
dap_string_append_printf(l_str_tmp,"\t%s: ts_create=%s",
......
......@@ -111,14 +111,44 @@ dap_chain_cs_dag_event_t * dap_chain_cs_dag_event_copy(dap_chain_cs_dag_event_t
* @return
*/
dap_chain_cs_dag_event_t * dap_chain_cs_dag_event_copy_with_sign_add( dap_chain_cs_dag_event_t * a_event, size_t a_event_size,
dap_enc_key_t * l_key)
size_t * a_event_size_new,
dap_chain_net_t * l_net, dap_enc_key_t * l_key)
{
size_t l_event_signing_size = dap_chain_cs_dag_event_calc_size_excl_signs( a_event ,a_event_size);
dap_sign_t * l_sign = dap_sign_create(l_key,a_event,l_event_signing_size,0);
size_t l_hashes_size = a_event->header.hash_count*sizeof(dap_chain_hash_fast_t);
dap_chain_datum_t * l_datum = (dap_chain_datum_t*)(a_event->hashes_n_datum_n_signs + l_hashes_size);
size_t l_datum_size = dap_chain_datum_size(l_datum);
size_t l_event_size_excl_sign = sizeof(a_event->header)+l_hashes_size+l_datum_size;
// size_t l_event_size_excl_sign = dap_chain_cs_dag_event_calc_size_excl_signs(a_event,a_event_size);
size_t l_event_size = a_event_size;
size_t l_event_signs_size = l_event_size - l_event_size_excl_sign;
dap_sign_t * l_sign = dap_sign_create(l_key,a_event,l_event_size_excl_sign,0);
size_t l_sign_size = dap_sign_get_size(l_sign);
dap_chain_cs_dag_event_t *l_event_new = DAP_NEW_Z_SIZE(dap_chain_cs_dag_event_t, a_event_size+l_sign_size);
memcpy(l_event_new, a_event,a_event_size);
memcpy(l_event_new+a_event_size,l_sign,l_sign_size);
dap_chain_addr_t l_addr = {0};
dap_chain_hash_fast_t l_pkey_hash;
dap_sign_get_pkey_hash(l_sign, &l_pkey_hash);
dap_chain_addr_fill(&l_addr, l_sign->header.type, &l_pkey_hash, l_net->pub.id);
char * l_addr_str = dap_chain_addr_to_str(&l_addr);
size_t l_offset = l_hashes_size+l_datum_size;
// checking re-sign from one address and calc signs size
while ( l_offset+sizeof(a_event->header) < l_event_size ) {
dap_sign_t * l_item_sign = (dap_sign_t *)(a_event->hashes_n_datum_n_signs +l_offset);
size_t l_sign_size = dap_sign_get_size(l_item_sign);
dap_chain_addr_t l_item_addr = {0};
dap_chain_hash_fast_t l_item_pkey_hash;
dap_sign_get_pkey_hash(l_item_sign, &l_item_pkey_hash);
dap_chain_addr_fill(&l_item_addr, l_item_sign->header.type, &l_item_pkey_hash, l_net->pub.id);
// checking re-sign from one address
if (memcmp(&l_addr, &l_item_addr, sizeof(l_item_addr)) == 0) {
log_it(L_WARNING, "Re-sign from addr: %s", l_addr_str);
return NULL;
}
l_offset += l_sign_size;
}
DAP_DELETE(l_addr_str);
dap_chain_cs_dag_event_t * l_event_new = DAP_NEW_Z_SIZE(dap_chain_cs_dag_event_t, l_event_size+l_sign_size);
memcpy(l_event_new, a_event, l_event_size);
memcpy(l_event_new->hashes_n_datum_n_signs+l_offset, l_sign, l_sign_size);
l_event_new->header.signs_count++;
return l_event_new;
}
......@@ -152,4 +182,34 @@ dap_sign_t * dap_chain_cs_dag_event_get_sign( dap_chain_cs_dag_event_t * a_event
return NULL;
}
bool dap_chain_cs_dag_event_gdb_set(char *a_event_hash_str, dap_chain_cs_dag_event_t * a_event, uint32_t a_event_size,
const char *a_group, dap_chain_cs_dag_event_round_cfg_t * a_event_round_cfg) {
dap_chain_cs_dag_event_round_item_t * l_event_round_item = DAP_NEW_SIZE(dap_chain_cs_dag_event_round_item_t,
sizeof(dap_chain_cs_dag_event_round_item_t)+a_event_size );
l_event_round_item->event_size = a_event_size;
// l_event_round_item->event = DAP_DUP_SIZE(a_event, a_event_size);
memcpy(&l_event_round_item->cfg, a_event_round_cfg, sizeof(dap_chain_cs_dag_event_round_cfg_t));
memcpy(l_event_round_item->event, a_event, a_event_size);
bool ret = dap_chain_global_db_gr_set(dap_strdup(a_event_hash_str), (uint8_t *)l_event_round_item,
dap_chain_cs_dag_event_round_item_get_size(l_event_round_item),
a_group);
DAP_DELETE(l_event_round_item);
return ret;
}
dap_chain_cs_dag_event_t* dap_chain_cs_dag_event_gdb_get(char *a_event_hash_str, uint32_t * a_event_size, const char *a_group,
dap_chain_cs_dag_event_round_cfg_t * a_event_round_cfg) {
size_t l_event_round_item_size = 0;
dap_chain_cs_dag_event_round_item_t* l_event_round_item =
(dap_chain_cs_dag_event_round_item_t*)dap_chain_global_db_gr_get(a_event_hash_str, &l_event_round_item_size, a_group );
if ( l_event_round_item == NULL )
return NULL;
size_t l_event_size = l_event_round_item->event_size;
dap_chain_cs_dag_event_t* l_event = DAP_NEW_SIZE(dap_chain_cs_dag_event_t, l_event_size);
memcpy(a_event_round_cfg, &l_event_round_item->cfg, sizeof(dap_chain_cs_dag_event_round_cfg_t));
memcpy(l_event, l_event_round_item->event, l_event_size);
DAP_DELETE(l_event_round_item);
*a_event_size = l_event_size;
return l_event;
}
......@@ -31,12 +31,14 @@ typedef struct dap_chain_cs_dag dap_chain_cs_dag_t;
typedef void (*dap_chain_cs_dag_callback_t)(dap_chain_cs_dag_t *);
typedef int (*dap_chain_cs_dag_callback_event_t)(dap_chain_cs_dag_t *, dap_chain_cs_dag_event_t *,size_t);
typedef dap_chain_cs_dag_event_t * (*dap_chain_cs_dag_callback_event_create_t)(dap_chain_cs_dag_t *,
dap_chain_datum_t *,
dap_chain_hash_fast_t *,
size_t, size_t*);
typedef int (*dap_chain_cs_dag_callback_get_round_cfg_t)(dap_chain_cs_dag_t *, dap_chain_cs_dag_event_round_cfg_t *);
typedef void (*dap_chain_cs_dag_callback_set_event_round_cfg_t)(dap_chain_cs_dag_t *, dap_chain_cs_dag_event_round_cfg_t *);
typedef struct dap_chain_cs_dag_hal_item {
dap_chain_hash_fast_t hash;
UT_hash_handle hh;
......@@ -47,17 +49,22 @@ typedef struct dap_chain_cs_dag
dap_chain_t * chain;
bool is_single_line;
bool is_celled;
bool is_add_directy;
bool is_add_directly;
bool is_static_genesis_event;
dap_chain_hash_fast_t static_genesis_event_hash;
dap_chain_cs_dag_hal_item_t *hal;
dap_chain_cs_dag_event_round_cfg_t event_round_cfg; // for verify function
bool use_event_round_cfg;
uint16_t datum_add_hashes_count;
char * gdb_group_events_round_new;
dap_chain_cs_dag_callback_t callback_delete;
dap_chain_cs_dag_callback_event_create_t callback_cs_event_create;
dap_chain_cs_dag_callback_event_t callback_cs_verify;
dap_chain_cs_dag_callback_get_round_cfg_t callback_cs_get_round_cfg;
dap_chain_cs_dag_callback_set_event_round_cfg_t callback_cs_set_event_round_cfg;
void * _pvt;
void * _inheritor;
......
......@@ -26,6 +26,7 @@
#include "dap_enc_key.h"
#include "dap_chain_common.h"
#include "dap_chain_datum.h"
#include "dap_chain_net.h"
#include "dap_sign.h"
#include "dap_hash.h"
......@@ -46,6 +47,17 @@ typedef struct dap_chain_cs_dag_event {
uint8_t hashes_n_datum_n_signs[]; // Hashes, signes and datum
} DAP_ALIGN_PACKED dap_chain_cs_dag_event_t;
typedef struct dap_chain_cs_dag_event_round_cfg {
uint16_t confirmations_minimum; // param auth_certs_count_verify in PoA
uint32_t confirmations_timeout; // wait confirmations over minimum value (confirmations_minimum)
uint64_t ts_confirmations_minimum_completed;
} DAP_ALIGN_PACKED dap_chain_cs_dag_event_round_cfg_t;
typedef struct dap_chain_cs_dag_event_round_item {
dap_chain_cs_dag_event_round_cfg_t cfg;
uint32_t event_size;
uint8_t event[]; // event // dap_chain_cs_dag_event_t
} DAP_ALIGN_PACKED dap_chain_cs_dag_event_round_item_t;
dap_chain_cs_dag_event_t * dap_chain_cs_dag_event_new(dap_chain_id_t a_chain_id, dap_chain_cell_id_t a_cell_id, dap_chain_datum_t * a_datum,
dap_enc_key_t * a_key,
......@@ -66,7 +78,9 @@ static inline dap_chain_datum_t* dap_chain_cs_dag_event_get_datum(dap_chain_cs_d
dap_chain_cs_dag_event_t * dap_chain_cs_dag_event_copy(dap_chain_cs_dag_event_t *a_event_src, size_t a_event_size);
// Important: returns new deep copy of event
dap_chain_cs_dag_event_t * dap_chain_cs_dag_event_copy_with_sign_add( dap_chain_cs_dag_event_t * a_event,size_t a_event_size, dap_enc_key_t * l_key);
dap_chain_cs_dag_event_t * dap_chain_cs_dag_event_copy_with_sign_add( dap_chain_cs_dag_event_t * a_event, size_t a_event_size,
size_t * a_event_size_new,
dap_chain_net_t * l_net, dap_enc_key_t * l_key);
dap_sign_t * dap_chain_cs_dag_event_get_sign( dap_chain_cs_dag_event_t * a_event, size_t a_event_size, uint16_t a_sign_number);
/**
......@@ -123,3 +137,14 @@ static inline void dap_chain_cs_dag_event_calc_hash(dap_chain_cs_dag_event_t * a
{
dap_hash_fast(a_event, a_event_size, a_event_hash);
}
static inline uint32_t dap_chain_cs_dag_event_round_item_get_size(dap_chain_cs_dag_event_round_item_t * a_event_round_item){
return sizeof(dap_chain_cs_dag_event_round_item_t)+a_event_round_item->event_size;
}
bool dap_chain_cs_dag_event_gdb_set(char *a_event_hash_str, dap_chain_cs_dag_event_t * a_event, uint32_t a_event_size,
const char *a_group, dap_chain_cs_dag_event_round_cfg_t * a_event_round_cfg);
dap_chain_cs_dag_event_t* dap_chain_cs_dag_event_gdb_get(char *a_event_hash_str, uint32_t * a_event_size,
const char *a_group, dap_chain_cs_dag_event_round_cfg_t * a_event_round_cfg);