Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • cellframe/cellframe-sdk
  • MIKA83/cellframe-sdk
2 results
Show changes
Commits on Source (2)
......@@ -690,6 +690,27 @@ static bool s_gdb_in_pkt_proc_callback(dap_proc_thread_t *a_thread, void *a_arg)
}
}
/**
* @brief dap_stream_ch_chain_create_sync_request_gdb
* @param a_ch_chain
* @param a_net
*/
void dap_stream_ch_chain_create_sync_request_gdb(dap_stream_ch_chain_t * a_ch_chain, dap_chain_net_t * a_net)
{
a_ch_chain->is_on_request = true;
memset(&a_ch_chain->request_hdr,0,sizeof (a_ch_chain->request_hdr));
a_ch_chain->request_hdr.net_id = a_net->pub.id;
struct sync_request *l_sync_request = DAP_NEW_Z(struct sync_request);
l_sync_request->ch = a_ch_chain->ch;
l_sync_request->worker = a_ch_chain->ch->stream_worker->worker;
memcpy(&l_sync_request->request, &a_ch_chain->request, sizeof (a_ch_chain->request));
memcpy(&l_sync_request->request_hdr, &a_ch_chain->request_hdr, sizeof (a_ch_chain->request_hdr));
dap_proc_queue_add_callback_inter(a_ch_chain->ch->stream_worker->worker->proc_queue_input, s_sync_out_gdb_proc_callback,
l_sync_request);
}
/**
* @brief s_stream_ch_packet_in
* @param a_ch
......@@ -1036,12 +1057,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
l_ch_chain->request_hdr.net_id.uint64 , l_ch_chain->request_hdr.chain_id.uint64,
l_ch_chain->request_hdr.cell_id.uint64, l_ch_chain->request.id_start, l_ch_chain->request.id_end );
struct sync_request *l_sync_request = DAP_NEW_Z(struct sync_request);
l_sync_request->ch = a_ch;
l_sync_request->worker = a_ch->stream_worker->worker;
memcpy(&l_sync_request->request, &l_ch_chain->request, sizeof (l_ch_chain->request));
memcpy(&l_sync_request->request_hdr, &l_ch_chain->request_hdr, sizeof (l_ch_chain->request_hdr));
dap_proc_queue_add_callback_inter(a_ch->stream_worker->worker->proc_queue_input, s_sync_out_gdb_proc_callback, l_sync_request);
dap_stream_ch_chain_create_sync_request_gdb(l_ch_chain,dap_chain_net_by_id(l_ch_chain->request_hdr.net_id) );
}
}else{
log_it(L_WARNING, "DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB: Wrong chain packet size %zd when expected %zd", l_chain_pkt_data_size, sizeof(l_ch_chain->request));
......
......@@ -81,7 +81,8 @@ typedef struct dap_stream_ch_chain {
bool request_updates_complete;
atomic_bool is_on_request; // Protects request section
bool is_on_request; // Protects request section
bool is_on_reverse_request;
dap_stream_ch_chain_callback_packet_t callback_notify_packet_out;
dap_stream_ch_chain_callback_packet_t callback_notify_packet_in;
......@@ -96,3 +97,4 @@ void dap_stream_ch_chain_deinit(void);
inline static uint8_t dap_stream_ch_chain_get_id(void) { return (uint8_t) 'C'; }
void dap_stream_ch_chain_go_idle ( dap_stream_ch_chain_t * a_ch_chain);
void dap_stream_ch_chain_create_sync_request_gdb(dap_stream_ch_chain_t * a_ch_chain, dap_chain_net_t * a_net);
......@@ -80,13 +80,21 @@ int dap_db_driver_init(const char *a_driver_name, const char *a_filename_db)
int l_ret = -1;
if(s_used_driver)
dap_db_driver_deinit();
// Fill callbacks with zeros
memset(&s_drv_callback, 0, sizeof(dap_db_driver_callbacks_t));
// Setup driver name
s_used_driver = dap_strdup(a_driver_name);
// Compose path
char l_db_path_ext[strlen(a_driver_name) + strlen(a_filename_db) + 6];
dap_snprintf(l_db_path_ext, sizeof(l_db_path_ext), "%s/gdb-%s", a_filename_db, a_driver_name);
memset(&s_drv_callback, 0, sizeof(dap_db_driver_callbacks_t));
// Check for engine
if(!dap_strcmp(s_used_driver, "ldb"))
l_ret = -1;
else if(!dap_strcmp(s_used_driver, "sqlite"))
else if(!dap_strcmp(s_used_driver, "sqlite") || !dap_strcmp(s_used_driver, "sqlite3") )
l_ret = dap_db_driver_sqlite_init(l_db_path_ext, &s_drv_callback);
else if(!dap_strcmp(s_used_driver, "cdb"))
l_ret = dap_db_driver_cdb_init(l_db_path_ext, &s_drv_callback);
......
......@@ -83,14 +83,15 @@ int dap_db_driver_sqlite_init(const char *a_filename_db, dap_db_driver_callbacks
if(sqlite3_threadsafe() && !sqlite3_config(SQLITE_CONFIG_SERIALIZED))
l_ret = sqlite3_initialize();
if(l_ret != SQLITE_OK) {
log_it(L_ERROR, "Can't init sqlite err=%d", l_ret);
return l_ret;
log_it(L_ERROR, "Can't init sqlite err=%d (%s)", l_ret, sqlite3_errstr(l_ret));
return -2;
}
char *l_error_message = NULL;
s_db = dap_db_driver_sqlite_open(a_filename_db, SQLITE_OPEN_READWRITE, &l_error_message);
if(!s_db) {
log_it(L_ERROR, "Can't init sqlite err=%d", l_error_message);
log_it(L_ERROR, "Can't init sqlite err: \"%s\"", l_error_message);
dap_db_driver_sqlite_free(l_error_message);
l_ret = -3;
}
else {
if(!dap_db_driver_sqlite_set_pragma(s_db, "synchronous", "NORMAL")) // 0 | OFF | 1 | NORMAL | 2 | FULL
......@@ -154,12 +155,15 @@ sqlite3* dap_db_driver_sqlite_open(const char *a_filename_utf8, int a_flags, cha
int l_rc = sqlite3_open_v2(a_filename_utf8, &l_db, a_flags | SQLITE_OPEN_FULLMUTEX, NULL);
// if unable to open the database file
if(l_rc == SQLITE_CANTOPEN) {
sqlite3_close(l_db);
log_it(L_WARNING,"No database on path %s, creating one from scratch", a_filename_utf8);
if(l_db)
sqlite3_close(l_db);
// try to create database
l_rc = sqlite3_open_v2(a_filename_utf8, &l_db, a_flags | SQLITE_OPEN_FULLMUTEX | SQLITE_OPEN_CREATE, NULL);
}
if(l_rc != SQLITE_OK)
{
if(l_rc != SQLITE_OK) {
log_it(L_CRITICAL,"Can't open database on path %s (code %d: \"%s\" )", a_filename_utf8, l_rc, sqlite3_errstr(l_rc));
if(a_error_message)
*a_error_message = sqlite3_mprintf("Can't open database: %s\n", sqlite3_errmsg(l_db));
sqlite3_close(l_db);
......@@ -220,7 +224,7 @@ int dap_db_driver_sqlite_flush()
char *l_error_message;
s_db = dap_db_driver_sqlite_open(s_filename_db, SQLITE_OPEN_READWRITE, &l_error_message);
if(!s_db) {
log_it(L_ERROR, "Can't init sqlite err=%d", l_error_message);
log_it(L_ERROR, "Can't init sqlite err: \"%s\"", l_error_message);
dap_db_driver_sqlite_free(l_error_message);
return -3;
}
......
......@@ -219,13 +219,19 @@ static const dap_chain_node_client_callbacks_t s_node_link_callbacks={
.error=s_node_link_callback_error
};
// State machine switchs here
static bool s_net_states_proc(dap_proc_thread_t *a_thread, void *a_arg);
// Prepare link success/error endpoints
static void s_net_state_link_prepare_success(dap_worker_t * a_worker,dap_chain_node_info_t * a_node_info, void * a_arg);
static void s_net_state_link_prepare_error(dap_worker_t * a_worker,dap_chain_node_info_t * a_node_info, void * a_arg, int a_errno);
static void s_net_proc_kill( dap_chain_net_t * a_net );
int s_net_load(const char * a_net_name, uint16_t a_acl_idx);
// Notify callback for GlobalDB changes
static void s_gbd_history_callback_notify (void * a_arg,const char a_op_code, const char * a_prefix, const char * a_group,
const char * a_key, const void * a_value,
const size_t a_value_len);
......@@ -700,9 +706,7 @@ static bool s_net_states_proc(dap_proc_thread_t *a_thread, void *a_arg)
case NODE_ROLE_CELL_MASTER: {
if (l_net_pvt->seed_aliases_count) {
// Add other root nodes as synchronization links
pthread_rwlock_unlock(&l_net_pvt->rwlock);
s_fill_links_from_root_aliases(l_net);
pthread_rwlock_wrlock(&l_net_pvt->rwlock);
l_net_pvt->state = NET_STATE_LINKS_CONNECTING;
l_repeat_after_exit = true;
break;
......@@ -721,10 +725,19 @@ static bool s_net_states_proc(dap_proc_thread_t *a_thread, void *a_arg)
if (l_net_pvt->seed_aliases_count) {
i = rand() % l_net_pvt->seed_aliases_count;
dap_chain_node_addr_t *l_remote_addr = dap_chain_node_alias_find(l_net, l_net_pvt->seed_aliases[i]);
dap_chain_node_info_t *l_remote_node_info = dap_chain_node_info_read(l_net, l_remote_addr);
l_addr.s_addr = l_remote_node_info ? l_remote_node_info->hdr.ext_addr_v4.s_addr : 0;
DAP_DELETE(l_remote_node_info);
l_port = DNS_LISTEN_PORT;
if (l_remote_addr){
dap_chain_node_info_t *l_remote_node_info = dap_chain_node_info_read(l_net, l_remote_addr);
if(l_remote_node_info){
l_addr.s_addr = l_remote_node_info ? l_remote_node_info->hdr.ext_addr_v4.s_addr : 0;
DAP_DELETE(l_remote_node_info);
l_port = DNS_LISTEN_PORT;
}else{
log_it(L_WARNING,"Can't find node info for node addr "NODE_ADDR_FP_STR,
NODE_ADDR_FP_ARGS(l_remote_addr));
}
}else{
log_it(L_WARNING,"Can't find alias info for seed alias %s",l_net_pvt->seed_aliases[i]);
}
} else if (l_net_pvt->bootstrap_nodes_count) {
i = rand() % l_net_pvt->bootstrap_nodes_count;
l_addr = l_net_pvt->bootstrap_nodes_addrs[i];
......@@ -758,13 +771,12 @@ static bool s_net_states_proc(dap_proc_thread_t *a_thread, void *a_arg)
s_net_state_link_prepare_error,l_dns_request);
}
}
l_link_id++;
}
if (l_sync_fill_root_nodes){
pthread_rwlock_unlock(&l_net_pvt->rwlock);
log_it(L_ATT,"Not found bootstrap addresses, fill seed nodelist from root aliases");
s_fill_links_from_root_aliases(l_net);
pthread_rwlock_wrlock(&l_net_pvt->rwlock);
}
l_link_id++;
} break;
}
} break;
......
......@@ -305,47 +305,12 @@ static void s_ch_chain_callback_notify_packet_in(dap_stream_ch_chain_t* a_ch_cha
dap_chain_net_t * l_net = l_node_client->net;
assert(l_net);
if(s_stream_ch_chain_debug_more)
log_it(L_INFO,"In: Link %s."NODE_ADDR_FP_STR" synced GDB", l_net->pub.name, NODE_ADDR_FP_ARGS_S(l_node_client->remote_node_addr ));
log_it(L_INFO,"In: Link %s."NODE_ADDR_FP_STR" synced GDB, request reverse sync", l_net->pub.name, NODE_ADDR_FP_ARGS_S(l_node_client->remote_node_addr ));
// We over with GLOBAL_DB and switch on syncing chains
l_node_client->state = NODE_CLIENT_STATE_SYNC_CHAINS_UPDATES;
dap_chain_net_state_t l_net_state = dap_chain_net_get_state(l_node_client->net);
l_node_client->state = NODE_CLIENT_STATE_SYNC_GDB_RVRS ;
a_ch_chain->is_on_reverse_request = true;
// Begin from the first chain
l_node_client->cur_chain = l_node_client->net->pub.chains;
dap_chain_cell_id_t l_cell_id={0};
dap_chain_id_t l_chain_id={0};
if(! l_node_client->cur_chain){
log_it(L_CRITICAL,"In: Can't sync chains for %s because there is no chains in it",l_net->pub.name);
dap_stream_ch_chain_pkt_write_error_unsafe(a_ch_chain->ch,l_net->pub.id.uint64,
l_chain_id.uint64,l_cell_id.uint64,"ERROR_CHAIN_NO_CHAINS");
l_node_client->state = NODE_CLIENT_STATE_SYNC_CHAINS_UPDATES ;
}else{ // If present - select the first one cell in chain
l_chain_id=l_node_client->cur_chain->id;
dap_chain_cell_t * l_cell = l_node_client->cur_chain->cells;
if (l_cell){
l_cell_id=l_cell->id;
}
uint64_t l_net_id = l_net->pub.id.uint64;
dap_stream_ch_chain_pkt_t * l_chain_pkt;
size_t l_chain_pkt_size = sizeof (l_chain_pkt->hdr);
l_chain_pkt = DAP_NEW_Z_SIZE(dap_stream_ch_chain_pkt_t, l_chain_pkt_size );
l_chain_pkt->hdr.version = 1;
l_chain_pkt->hdr.net_id.uint64 = l_net_id;
l_chain_pkt->hdr.cell_id.uint64 = l_cell_id.uint64;
l_chain_pkt->hdr.chain_id.uint64 = l_chain_id.uint64;
dap_stream_ch_pkt_write_unsafe(a_ch_chain->ch,DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_START ,
l_chain_pkt,l_chain_pkt_size);
DAP_DELETE(l_chain_pkt);
log_it(L_INFO,
"In: Send UPDATE_CHAINS_START: net_id=0x%016x chain_id=0x%016x cell_id=0x%016x ",
l_net_id,l_chain_id.uint64,l_cell_id.uint64
);
}
dap_stream_ch_chain_create_sync_request_gdb(a_ch_chain, l_node_client->net);
}break;
case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS: {
......@@ -416,11 +381,17 @@ static void s_ch_chain_callback_notify_packet_in(dap_stream_ch_chain_t* a_ch_cha
#else
SetEvent( l_node_client->wait_cond );
#endif
dap_stream_ch_chain_pkt_write_unsafe(a_ch_chain->ch,DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS_RVRS,
l_net->pub.id.uint64 ,
l_chain_id.uint64,l_cell_id.uint64,NULL,0);
a_ch_chain->state = CHAIN_STATE_SYNC_CHAINS ;
if (l_node_client->net->pub.chains){
dap_chain_t * l_chain = l_node_client->net->pub.chains;
a_ch_chain->request_atom_iter = l_chain->callback_atom_iter_create(l_chain);
size_t l_first_size = 0;
l_chain->callback_atom_iter_get_first(a_ch_chain->request_atom_iter, &l_first_size);
}
dap_stream_ch_set_ready_to_write_unsafe(a_ch_chain->ch, true);
}
}
......@@ -453,20 +424,67 @@ static void s_ch_chain_callback_notify_packet_out(dap_stream_ch_chain_t* a_ch_ch
(void) a_pkt_data_size;
(void) a_ch_chain;
dap_chain_node_client_t * l_node_client = (dap_chain_node_client_t *) a_arg;
switch (a_pkt_type) {
case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_ALL:
case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB:
case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB:{
if (a_ch_chain->is_on_reverse_request){
a_ch_chain->is_on_reverse_request = false;
dap_chain_net_t * l_net = l_node_client->net;
// We over with GLOBAL_DB and switch on syncing chains
l_node_client->state = NODE_CLIENT_STATE_SYNC_CHAINS_UPDATES;
// Begin from the first chain
l_node_client->cur_chain = l_node_client->net->pub.chains;
dap_chain_cell_id_t l_cell_id={0};
dap_chain_id_t l_chain_id={0};
if(! l_node_client->cur_chain){
log_it(L_CRITICAL,"In: Can't sync chains for %s because there is no chains in it",l_net->pub.name);
dap_stream_ch_chain_pkt_write_error_unsafe(a_ch_chain->ch,l_net->pub.id.uint64,
l_chain_id.uint64,l_cell_id.uint64,"ERROR_CHAIN_NO_CHAINS");
l_node_client->state = NODE_CLIENT_STATE_SYNC_CHAINS_UPDATES ;
}else{ // If present - select the first one cell in chain
l_chain_id=l_node_client->cur_chain->id;
dap_chain_cell_t * l_cell = l_node_client->cur_chain->cells;
if (l_cell){
l_cell_id=l_cell->id;
}
uint64_t l_net_id = l_net->pub.id.uint64;
dap_stream_ch_chain_pkt_t * l_chain_pkt;
size_t l_chain_pkt_size = sizeof (l_chain_pkt->hdr);
l_chain_pkt = DAP_NEW_Z_SIZE(dap_stream_ch_chain_pkt_t, l_chain_pkt_size );
l_chain_pkt->hdr.version = 1;
l_chain_pkt->hdr.net_id.uint64 = l_net_id;
l_chain_pkt->hdr.cell_id.uint64 = l_cell_id.uint64;
l_chain_pkt->hdr.chain_id.uint64 = l_chain_id.uint64;
dap_stream_ch_pkt_write_unsafe(a_ch_chain->ch,DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_START ,
l_chain_pkt,l_chain_pkt_size);
DAP_DELETE(l_chain_pkt);
log_it(L_INFO,
"In: Send UPDATE_CHAINS_START: net_id=0x%016x chain_id=0x%016x cell_id=0x%016x ",
l_net_id,l_chain_id.uint64,l_cell_id.uint64
);
}
}
}break;
case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_ALL:
case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS: {
l_node_client->state = NODE_CLIENT_STATE_SYNCED;
#ifndef _WIN32
pthread_cond_broadcast(&l_node_client->wait_cond);
#else
SetEvent( l_node_client->wait_cond );
#endif
}
break;
default: {
}
if(s_stream_ch_chain_debug_more)
log_it(L_INFO,"Out: chains all sent to uplink "NODE_ADDR_FP_STR,NODE_ADDR_FP_ARGS_S(l_node_client->remote_node_addr));
l_node_client->state = NODE_CLIENT_STATE_SYNCED ;
#ifndef _WIN32
pthread_cond_broadcast(&l_node_client->wait_cond);
#else
SetEvent( l_node_client->wait_cond );
#endif
a_ch_chain->is_on_reverse_request = false;
}break;
default: {
}
}
}
......
......@@ -39,7 +39,9 @@ typedef enum dap_chain_node_client_state {
NODE_CLIENT_STATE_PONG = 4,
NODE_CLIENT_STATE_CONNECTING = 5,
NODE_CLIENT_STATE_ESTABLISHED = 100,
NODE_CLIENT_STATE_SYNC_GDB = 101,
NODE_CLIENT_STATE_SYNC_GDB_UPDATES = 101,
NODE_CLIENT_STATE_SYNC_GDB = 102,
NODE_CLIENT_STATE_SYNC_GDB_RVRS = 103,
NODE_CLIENT_STATE_SYNC_CHAINS_UPDATES = 110,
NODE_CLIENT_STATE_SYNC_CHAINS = 111,
NODE_CLIENT_STATE_SYNC_CHAINS_RVRS = 112,
......