Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • cellframe/cellframe-sdk
  • MIKA83/cellframe-sdk
2 results
Show changes
Commits on Source (5)
......@@ -2,7 +2,7 @@ project(cellframe-sdk C)
cmake_minimum_required(VERSION 2.8)
set(CMAKE_C_STANDARD 11)
set(CELLFRAME_SDK_NATIVE_VERSION "2.8-7")
set(CELLFRAME_SDK_NATIVE_VERSION "2.8-8")
add_definitions ("-DCELLFRAME_SDK_VERSION=\"${CELLFRAME_SDK_NATIVE_VERSION}\"")
set(DAPSDK_MODULES "")
......
......@@ -27,7 +27,7 @@
///========================================================================
/* Names for the four varieties of Dilithium */
typedef enum { MODE_0, MODE_1, MODE_2, MODE_3 } __attribute__((aligned(8))) dilithium_kind_t;
typedef enum { MODE_0, MODE_1, MODE_2, MODE_3 } __attribute__((aligned(4))) dilithium_kind_t;
typedef struct {
dilithium_kind_t kind; /* the kind of Dilithium (i.e. *this* choice of parameters) */
......
......@@ -127,7 +127,7 @@ static bool s_timer_timeout_check(void * a_arg)
if(dap_events_socket_check_unsafe(l_worker, l_es) ){
if (!dap_uint128_check_equal(l_es->uuid,l_es_handler->uuid)){
// Timer esocket wrong argument, ignore this timeout...
DAP_DELETE(l_es_handler);
DAP_DEL_Z(l_es_handler)
return false;
}
dap_client_http_pvt_t * l_http_pvt = PVT(l_es);
......@@ -141,10 +141,8 @@ static bool s_timer_timeout_check(void * a_arg)
log_it(L_INFO, "Close %s sock %u type %d by timeout",
l_es->remote_addr_str ? l_es->remote_addr_str : "", l_es->socket, l_es->type);
dap_events_socket_remove_and_delete_unsafe(l_es, true);
} else {
log_it(L_INFO, "Socket %d type %d already disposed", l_es->socket, l_es->type);
}
DAP_DELETE(l_es_handler);
DAP_DEL_Z(l_es_handler)
return false;
}
......@@ -501,7 +499,10 @@ void* dap_client_http_request_custom(dap_worker_t * a_worker,const char *a_uplin
log_it(L_DEBUG, "Connecting to %s:%u", a_uplink_addr, a_uplink_port);
l_http_pvt->worker = a_worker?a_worker: dap_events_worker_get_auto();
dap_worker_add_events_socket(l_ev_socket,l_http_pvt->worker);
dap_timerfd_start_on_worker(l_http_pvt->worker,s_client_timeout_ms, s_timer_timeout_check,l_ev_socket);
dap_events_socket_handler_t * l_ev_socket_handler = DAP_NEW_Z(dap_events_socket_handler_t);
l_ev_socket_handler->esocket = l_ev_socket;
l_ev_socket_handler->uuid = l_ev_socket->uuid;
dap_timerfd_start_on_worker(l_http_pvt->worker,s_client_timeout_ms, s_timer_timeout_check,l_ev_socket_handler);
return l_http_pvt;
} else {
log_it(L_ERROR, "Socket %d connecting error: %d", l_ev_socket->socket, l_err2);
......
......@@ -158,7 +158,7 @@ dap_events_socket_t *dap_events_socket_wrap_no_add( dap_events_t *a_events,
l_ret->buf_out = a_callbacks->timer_callback ? NULL : DAP_NEW_Z_SIZE(byte_t, l_ret->buf_out_size_max + 1);
l_ret->buf_in_size = l_ret->buf_out_size = 0;
#if defined(DAP_EVENTS_CAPS_EPOLL)
ret->ev_base_flags = EPOLLERR | EPOLLRDHUP | EPOLLHUP;
l_ret->ev_base_flags = EPOLLERR | EPOLLRDHUP | EPOLLHUP;
#elif defined(DAP_EVENTS_CAPS_POLL)
l_ret->poll_base_flags = POLLERR | POLLRDHUP | POLLHUP;
#endif
......
......@@ -62,6 +62,10 @@
#include "dap_cert_file.h"
#include "dap_timerfd.h"
#include "dap_stream_worker.h"
#include "dap_worker.h"
#include "dap_proc_queue.h"
#include "dap_proc_thread.h"
#include "dap_enc_http.h"
#include "dap_chain_common.h"
......@@ -136,8 +140,13 @@ typedef struct dap_chain_net_pvt{
dap_chain_node_addr_t * node_addr;
dap_chain_node_info_t * node_info; // Current node's info
// Established links
dap_list_t *links; // Links list
size_t links_count;
// Prepared links
dap_list_t *links_info; // Links info list
atomic_uint links_dns_requests;
bool load_mode;
......@@ -202,6 +211,7 @@ static void s_node_link_callback_stage(dap_chain_node_client_t * a_node_client,d
static void s_node_link_callback_error(dap_chain_node_client_t * a_node_client, int a_error, void * a_arg);
static bool s_net_states_proc(dap_proc_thread_t *a_thread, void *a_arg);
static bool s_node_link_states_proc(dap_proc_thread_t *a_thread, void *a_arg);
static void s_net_state_link_prepare_success(dap_worker_t * a_worker,dap_chain_node_info_t * a_node_info, void * a_arg);
static void s_net_state_link_prepare_error(dap_worker_t * a_worker,dap_chain_node_info_t * a_node_info, void * a_arg, int a_errno);
......@@ -413,6 +423,23 @@ static void s_fill_links_from_root_aliases(dap_chain_net_t * a_net)
static void s_node_link_callback_connected(dap_chain_node_client_t * a_node_client, void * a_arg)
{
dap_chain_net_t * l_net = (dap_chain_net_t *) a_arg;
dap_chain_net_pvt_t * l_net_pvt = PVT(l_net);
dap_chain_node_info_t * l_link_info = a_node_client->info;
a_node_client->state = NODE_CLIENT_STATE_ESTABLISHED;
log_it(L_DEBUG, "Established connection with "NODE_ADDR_FP_STR, NODE_ADDR_FP_ARGS_S(l_link_info->hdr.address));
pthread_rwlock_wrlock(&l_net_pvt->rwlock);
l_net_pvt->links = dap_list_append(l_net_pvt->links, a_node_client);
l_net_pvt->links_count++;
size_t l_links_count = l_net_pvt->links_count;
if(l_net_pvt->state == NET_STATE_LINKS_CONNECTING )
l_net_pvt->state = NET_STATE_LINKS_ESTABLISHED;
pthread_rwlock_unlock(&l_net_pvt->rwlock);
dap_proc_queue_add_callback_inter(dap_client_get_stream_worker(a_node_client->client)->worker->proc_queue_input,
s_node_link_states_proc, a_node_client);
}
/**
......@@ -447,6 +474,9 @@ static void s_node_link_callback_stage(dap_chain_node_client_t * a_node_client,d
static void s_node_link_callback_error(dap_chain_node_client_t * a_node_client, int a_error, void * a_arg)
{
dap_chain_net_t * l_net = (dap_chain_net_t *) a_arg;
log_it(L_DEBUG, "Can't establish link with "NODE_ADDR_FP_STR, NODE_ADDR_FP_ARGS_S(l_link_info->hdr.address));
dap_chain_node_client_close(l_node_client);
l_node_client = NULL;
}
......@@ -474,7 +504,6 @@ static void s_net_state_link_prepare_success(dap_worker_t * a_worker,dap_chain_n
dap_proc_queue_add_callback_inter( a_worker->proc_queue_input,s_net_states_proc,l_net );
}
pthread_rwlock_unlock(&l_net_pvt->rwlock);
l_dns_request->tries++;
s_fill_links_from_root_aliases(l_net);
......@@ -482,6 +511,7 @@ static void s_net_state_link_prepare_success(dap_worker_t * a_worker,dap_chain_n
if (l_net_pvt->links_dns_requests == 0){ // It was the last one
}
pthread_rwlock_unlock(&l_net_pvt->rwlock);
}
/**
......@@ -497,9 +527,209 @@ static void s_net_state_link_prepare_error(dap_worker_t * a_worker,dap_chain_nod
dap_chain_net_t * l_net = l_dns_request->net;
dap_chain_net_pvt_t * l_net_pvt = PVT(l_net);
pthread_rwlock_unlock(&l_net_pvt->rwlock);
l_net_pvt->links_dns_requests--;
}
/**
* @brief s_node_link_states_proc
* @param a_thread
* @param a_arg
* @return
*/
static bool s_node_link_states_proc(dap_proc_thread_t *a_thread, void *a_arg)
{
bool l_repeate_after_exit = false;
dap_chain_node_client_t * l_node_client = (dap_chain_node_client_t*) a_arg;
assert(l_node_client);
dap_chain_net_pvt_t * l_net_pvt = PVT(l_node_client->net);
switch (l_node_client->state) {
case NODE_CLIENT_STATE_ESTABLISHED:
if(l_node_client->sync_chains){
l_node_client->state = NODE_CLIENT_STATE_SYNC_CHAINS;
l_repeate_after_exit = true;
}
if(l_node_client->sync_gdb){
l_node_client->state = NODE_CLIENT_STATE_SYNC_GDB;
l_repeate_after_exit = true;
}
break;
case NODE_CLIENT_STATE_SYNC_GDB:{
dap_stream_worker_t *l_worker = dap_client_get_stream_worker(l_node_client->client);
dap_stream_ch_t *l_ch_chain = dap_client_get_stream_ch_unsafe(l_node_client->client, dap_stream_ch_chain_get_id());
if ( !l_ch_chain) { // Channel or stream or client itself closed
l_tmp = dap_list_next(l_tmp);
dap_chain_node_client_close(l_node_client);
l_net_pvt->links = dap_list_remove(l_net_pvt->links, l_node_client);
continue;
}
dap_stream_ch_chain_sync_request_t l_sync_gdb = {};
// Get last timestamp in log if wasn't SYNC_FROM_ZERO flag
if (! (l_net_pvt->flags & F_DAP_CHAIN_NET_SYNC_FROM_ZERO) )
l_sync_gdb.id_start = (uint64_t) dap_db_get_last_id_remote(l_node_client->remote_node_addr.uint64);
l_sync_gdb.node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net);
log_it(L_DEBUG, "Prepared request to gdb sync from %llu to %llu", l_sync_gdb.id_start, l_sync_gdb.id_end?l_sync_gdb.id_end:-1 );
// find dap_chain_id_t
dap_chain_t *l_chain = dap_chain_net_get_chain_by_name(l_net, "gdb");
dap_chain_id_t l_chain_id = l_chain ? l_chain->id : (dap_chain_id_t ) {0};
dap_chain_node_client_reset(l_node_client);
size_t l_res = dap_stream_ch_chain_pkt_write_mt(l_worker, l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB, l_net->pub.id,
l_chain_id, l_net->pub.cell_id, &l_sync_gdb, sizeof(l_sync_gdb));
if (l_res == 0) {
log_it(L_WARNING, "Can't send GDB sync request");
continue;
}
// wait for finishing of request
int timeout_ms = 300000; // 5 min = 300 sec = 300 000 ms
// TODO add progress info to console
l_res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms);
switch (l_res) {
case -1:
log_it(L_WARNING, "Timeout with link sync gdb");
break;
case 0:
log_it(L_INFO, "Node sync gdb completed");
break;
default:
log_it(L_INFO, "Node sync gdb error %d",l_res);
}
dap_chain_node_client_reset(l_node_client);
l_res = dap_stream_ch_chain_pkt_write_mt(l_worker, l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB_RVRS, l_net->pub.id,
l_chain_id, l_net->pub.cell_id, &l_sync_gdb, sizeof(l_sync_gdb));
l_res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms);
switch (l_res) {
case -1:
log_it(L_WARNING, "Timeout with reverse link gdb sync");
break;
case 0:
log_it(L_INFO, "Node reverse gdb sync completed");
break;
default:
log_it(L_INFO, "Node reverse gdb sync error %d",l_res);
}
// -----
if (!l_net_pvt->links) {
l_net_pvt->state = NET_STATE_LINKS_PREPARE;
} else if (l_net_pvt->state_target >= NET_STATE_SYNC_CHAINS) {
l_net_pvt->state = NET_STATE_SYNC_CHAINS;
} else { // Synchronization done, go offline
log_it(L_INFO, "Synchronization done, go offline");
l_net_pvt->flags ^= F_DAP_CHAIN_NET_GO_SYNC;
l_net_pvt->flags ^= F_DAP_CHAIN_NET_SYNC_FROM_ZERO;
l_net_pvt->state = l_net_pvt->state_target = NET_STATE_OFFLINE;
}
} break;
case NODE_CLIENT_STATE_SYNC_CHAINS:{
dap_chain_node_client_t *l_node_client = (dap_chain_node_client_t *)l_tmp->data;
dap_stream_ch_t *l_ch_chain = dap_client_get_stream_ch_unsafe(l_node_client->client, dap_stream_ch_chain_get_id());
if (!l_ch_chain) { // Channel or stream or client itself closed
l_tmp = dap_list_next(l_tmp);
dap_chain_node_client_close(l_node_client);
l_net_pvt->links = dap_list_remove(l_net_pvt->links, l_node_client);
continue;
}
dap_stream_worker_t *l_worker = dap_client_get_stream_worker(l_node_client->client);
dap_chain_t * l_chain = NULL;
int l_res = 0;
DL_FOREACH (l_net->pub.chains, l_chain) {
dap_chain_node_client_reset(l_node_client);
dap_stream_ch_chain_sync_request_t l_request = {0};
// TODO: Uncomment next block when finish with partial updates
/*
if (! (l_pvt_net->flags & F_DAP_CHAIN_NET_SYNC_FROM_ZERO) )
dap_chain_get_atom_last_hash(l_chain,&l_request.hash_from);
*/
if ( !dap_hash_fast_is_blank(&l_request.hash_from) ){
if(dap_log_level_get() <= L_DEBUG){
char l_hash_str[128]={[0]='\0'};
dap_chain_hash_fast_to_str(&l_request.hash_from,l_hash_str,sizeof (l_hash_str)-1);
log_it(L_DEBUG,"Send sync chain request to"NODE_ADDR_FP_STR" for %s:%s from %s to infinity",
NODE_ADDR_FP_ARGS_S(l_node_client->remote_node_addr) ,l_net->pub.name, l_chain->name, l_hash_str);
}
}else
log_it(L_DEBUG,"Send sync chain request for all the chains for addr "NODE_ADDR_FP_STR,
NODE_ADDR_FP_ARGS_S(l_node_client->remote_node_addr));
dap_stream_ch_chain_pkt_write_mt(l_worker, l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS, l_net->pub.id,
l_chain->id, l_net->pub.cell_id, &l_request, sizeof(l_request));
// wait for finishing of request
int timeout_ms = 300000; // 5 min = 300 sec = 300 000 ms
// TODO add progress info to console
l_res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms);
switch (l_res) {
case -1:
//log_it(L_WARNING, "Timeout with sync of chain '%s' ", l_chain->name);
break;
case 0:
l_need_flush = true;
log_it(L_INFO, "Sync of chain '%s' completed ", l_chain->name);
break;
default:
log_it(L_ERROR, "Sync of chain '%s' error %d", l_chain->name,l_res);
}
dap_chain_node_client_reset(l_node_client);
l_request.node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net);
dap_stream_ch_chain_pkt_write_mt(l_worker, l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS_RVRS, l_net->pub.id,
l_chain->id, l_net->pub.cell_id, &l_request, sizeof(l_request));
l_res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms);
switch (l_res) {
case -1:
//log_it(L_WARNING, "Timeout with reverse sync of chain '%s' ", l_chain->name);
break;
case 0:
l_need_flush = true;
log_it(L_INFO, "Reverse sync of chain '%s' completed ", l_chain->name);
// set time of last sync
{
struct timespec l_to;
clock_gettime(CLOCK_MONOTONIC, &l_to);
l_net_pvt->last_sync = l_to.tv_sec;
}
break;
default:
log_it(L_ERROR, "Reverse sync of chain '%s' error %d", l_chain->name,l_res);
}
}
///-------------------
if (l_need_flush) {
// flush global_db
dap_chain_global_db_flush();
}
if (!l_net_pvt->links ) {
log_it( L_INFO,"Return back to state LINKS_PREPARE ");
l_net_pvt->state = NET_STATE_LINKS_PREPARE;
} else {
if (l_net_pvt->state_target == NET_STATE_ONLINE) {
l_net_pvt->flags ^= F_DAP_CHAIN_NET_GO_SYNC;
l_net_pvt->flags ^= F_DAP_CHAIN_NET_SYNC_FROM_ZERO;
l_net_pvt->state = NET_STATE_ONLINE;
log_it(L_INFO, "Synchronization done, status online");
} else { // Synchronization done, go offline
l_net_pvt->state = l_net_pvt->state_target = NET_STATE_OFFLINE;
log_it(L_INFO, "Synchronization done, go offline");
}
}
}break;
case NODE_CLIENT_STATE_SYNCED:
break;
default:{
log_it(L_WARNING,"Non-processing node client state %d", l_node_client->state);
}
}
return l_repeate_after_exit;
}
/**
* @brief s_net_states_proc
* @param l_net
......@@ -563,6 +793,7 @@ static bool s_net_states_proc(dap_proc_thread_t *a_thread, void *a_arg)
if (l_net_pvt->seed_aliases_count) {
// Add other root nodes as synchronization links
s_fill_links_from_root_aliases(l_net);
l_repeat_after_exit = true;
break;
}
}
......@@ -620,8 +851,9 @@ static bool s_net_states_proc(dap_proc_thread_t *a_thread, void *a_arg)
}
l_tries++;
}
if (l_sync_fill_root_nodes)
if (l_sync_fill_root_nodes){
s_fill_links_from_root_aliases(l_net);
}
} break;
}
if (l_net_pvt->state_target != NET_STATE_OFFLINE) {
......@@ -636,215 +868,20 @@ static bool s_net_states_proc(dap_proc_thread_t *a_thread, void *a_arg)
} else {
l_net_pvt->state = NET_STATE_OFFLINE;
}
l_repeat_after_exit = true;
} break;
case NET_STATE_LINKS_CONNECTING: {
log_it(L_DEBUG, "%s.state: NET_STATE_LINKS_CONNECTING",l_net->pub.name);
for (dap_list_t *l_tmp = l_net_pvt->links_info; l_tmp; l_tmp = dap_list_next(l_tmp)) {
dap_chain_node_info_t *l_link_info = (dap_chain_node_info_t *)l_tmp->data;
dap_chain_node_client_t *l_node_client = dap_chain_node_client_create_n_connect(l_link_info,"CN",s_node_link_callback_connected,
dap_chain_node_client_t *l_node_client = dap_chain_node_client_create_n_connect(l_net, l_link_info,"CN",s_node_link_callback_connected,
s_node_link_callback_disconnected,s_node_link_callback_stage,
s_node_link_callback_error,NULL);
if (l_node_client) {
// wait connected
int res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_CONNECTED, 20000 );
if (res == 0 ) {
log_it(L_DEBUG, "Established connection with "NODE_ADDR_FP_STR, NODE_ADDR_FP_ARGS_S(l_link_info->hdr.address));
l_net_pvt->links = dap_list_append(l_net_pvt->links, l_node_client);
} else {
log_it(L_DEBUG, "Can't establish link with "NODE_ADDR_FP_STR, NODE_ADDR_FP_ARGS_S(l_link_info->hdr.address));
dap_chain_node_client_close(l_node_client);
l_node_client = NULL;
}
}
if (dap_list_length(l_net_pvt->links) >= s_required_links_count) {
break;
}
}
if (l_net_pvt->links) { // We have at least one working link
l_net_pvt->state = NET_STATE_SYNC_GDB;
} else { // Try to find another links
struct timespec l_sleep = {3, 0};
nanosleep(&l_sleep, NULL);
l_net_pvt->state = NET_STATE_OFFLINE;
}
} break;
case NET_STATE_SYNC_GDB:{
for (dap_list_t *l_tmp = l_net_pvt->links; l_tmp; ) {
dap_chain_node_client_t *l_node_client = (dap_chain_node_client_t *)l_tmp->data;
dap_stream_worker_t *l_worker = dap_client_get_stream_worker(l_node_client->client);
dap_stream_ch_t *l_ch_chain = dap_client_get_stream_ch_unsafe(l_node_client->client, dap_stream_ch_chain_get_id());
if ( !l_ch_chain) { // Channel or stream or client itself closed
l_tmp = dap_list_next(l_tmp);
dap_chain_node_client_close(l_node_client);
l_net_pvt->links = dap_list_remove(l_net_pvt->links, l_node_client);
continue;
}
dap_stream_ch_chain_sync_request_t l_sync_gdb = {};
// Get last timestamp in log if wasn't SYNC_FROM_ZERO flag
if (! (l_net_pvt->flags & F_DAP_CHAIN_NET_SYNC_FROM_ZERO) )
l_sync_gdb.id_start = (uint64_t) dap_db_get_last_id_remote(l_node_client->remote_node_addr.uint64);
l_sync_gdb.node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net);
log_it(L_DEBUG, "Prepared request to gdb sync from %llu to %llu", l_sync_gdb.id_start, l_sync_gdb.id_end?l_sync_gdb.id_end:-1 );
// find dap_chain_id_t
dap_chain_t *l_chain = dap_chain_net_get_chain_by_name(l_net, "gdb");
dap_chain_id_t l_chain_id = l_chain ? l_chain->id : (dap_chain_id_t ) {0};
dap_chain_node_client_reset(l_node_client);
size_t l_res = dap_stream_ch_chain_pkt_write_mt(l_worker, l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB, l_net->pub.id,
l_chain_id, l_net->pub.cell_id, &l_sync_gdb, sizeof(l_sync_gdb));
if (l_res == 0) {
log_it(L_WARNING, "Can't send GDB sync request");
continue;
}
// wait for finishing of request
int timeout_ms = 300000; // 5 min = 300 sec = 300 000 ms
// TODO add progress info to console
l_res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms);
switch (l_res) {
case -1:
log_it(L_WARNING, "Timeout with link sync gdb");
break;
case 0:
log_it(L_INFO, "Node sync gdb completed");
break;
default:
log_it(L_INFO, "Node sync gdb error %d",l_res);
}
dap_chain_node_client_reset(l_node_client);
l_res = dap_stream_ch_chain_pkt_write_mt(l_worker, l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB_RVRS, l_net->pub.id,
l_chain_id, l_net->pub.cell_id, &l_sync_gdb, sizeof(l_sync_gdb));
l_res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms);
switch (l_res) {
case -1:
log_it(L_WARNING, "Timeout with reverse link gdb sync");
break;
case 0:
log_it(L_INFO, "Node reverse gdb sync completed");
break;
default:
log_it(L_INFO, "Node reverse gdb sync error %d",l_res);
}
l_tmp = dap_list_next(l_tmp);
}
if (!l_net_pvt->links) {
l_net_pvt->state = NET_STATE_LINKS_PREPARE;
} else if (l_net_pvt->state_target >= NET_STATE_SYNC_CHAINS) {
l_net_pvt->state = NET_STATE_SYNC_CHAINS;
} else { // Synchronization done, go offline
log_it(L_INFO, "Synchronization done, go offline");
l_net_pvt->flags ^= F_DAP_CHAIN_NET_GO_SYNC;
l_net_pvt->flags ^= F_DAP_CHAIN_NET_SYNC_FROM_ZERO;
l_net_pvt->state = l_net_pvt->state_target = NET_STATE_OFFLINE;
}
}
break;
case NET_STATE_SYNC_CHAINS: {
bool l_need_flush = false;
for (dap_list_t *l_tmp = l_net_pvt->links; l_tmp; l_tmp = dap_list_next(l_tmp)) {
dap_chain_node_client_t *l_node_client = (dap_chain_node_client_t *)l_tmp->data;
dap_stream_ch_t *l_ch_chain = dap_client_get_stream_ch_unsafe(l_node_client->client, dap_stream_ch_chain_get_id());
if (!l_ch_chain) { // Channel or stream or client itself closed
l_tmp = dap_list_next(l_tmp);
dap_chain_node_client_close(l_node_client);
l_net_pvt->links = dap_list_remove(l_net_pvt->links, l_node_client);
continue;
}
dap_stream_worker_t *l_worker = dap_client_get_stream_worker(l_node_client->client);
dap_chain_t * l_chain = NULL;
int l_res = 0;
DL_FOREACH (l_net->pub.chains, l_chain) {
dap_chain_node_client_reset(l_node_client);
dap_stream_ch_chain_sync_request_t l_request = {0};
// TODO: Uncomment next block when finish with partial updates
/*
if (! (l_pvt_net->flags & F_DAP_CHAIN_NET_SYNC_FROM_ZERO) )
dap_chain_get_atom_last_hash(l_chain,&l_request.hash_from);
*/
if ( !dap_hash_fast_is_blank(&l_request.hash_from) ){
if(dap_log_level_get() <= L_DEBUG){
char l_hash_str[128]={[0]='\0'};
dap_chain_hash_fast_to_str(&l_request.hash_from,l_hash_str,sizeof (l_hash_str)-1);
log_it(L_DEBUG,"Send sync chain request to"NODE_ADDR_FP_STR" for %s:%s from %s to infinity",
NODE_ADDR_FP_ARGS_S(l_node_client->remote_node_addr) ,l_net->pub.name, l_chain->name, l_hash_str);
}
}else
log_it(L_DEBUG,"Send sync chain request for all the chains for addr "NODE_ADDR_FP_STR,
NODE_ADDR_FP_ARGS_S(l_node_client->remote_node_addr));
dap_stream_ch_chain_pkt_write_mt(l_worker, l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS, l_net->pub.id,
l_chain->id, l_net->pub.cell_id, &l_request, sizeof(l_request));
// wait for finishing of request
int timeout_ms = 300000; // 5 min = 300 sec = 300 000 ms
// TODO add progress info to console
l_res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms);
switch (l_res) {
case -1:
//log_it(L_WARNING, "Timeout with sync of chain '%s' ", l_chain->name);
break;
case 0:
l_need_flush = true;
log_it(L_INFO, "Sync of chain '%s' completed ", l_chain->name);
break;
default:
log_it(L_ERROR, "Sync of chain '%s' error %d", l_chain->name,l_res);
}
dap_chain_node_client_reset(l_node_client);
l_request.node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net);
dap_stream_ch_chain_pkt_write_mt(l_worker, l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS_RVRS, l_net->pub.id,
l_chain->id, l_net->pub.cell_id, &l_request, sizeof(l_request));
l_res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms);
switch (l_res) {
case -1:
//log_it(L_WARNING, "Timeout with reverse sync of chain '%s' ", l_chain->name);
break;
case 0:
l_need_flush = true;
log_it(L_INFO, "Reverse sync of chain '%s' completed ", l_chain->name);
// set time of last sync
{
struct timespec l_to;
clock_gettime(CLOCK_MONOTONIC, &l_to);
l_net_pvt->last_sync = l_to.tv_sec;
}
break;
default:
log_it(L_ERROR, "Reverse sync of chain '%s' error %d", l_chain->name,l_res);
}
}
l_tmp = dap_list_next(l_tmp);
}
if (l_need_flush) {
// flush global_db
dap_chain_global_db_flush();
}
if (!l_net_pvt->links ) {
log_it( L_INFO,"Return back to state LINKS_PREPARE ");
l_net_pvt->state = NET_STATE_LINKS_PREPARE;
} else {
if (l_net_pvt->state_target == NET_STATE_ONLINE) {
l_net_pvt->flags ^= F_DAP_CHAIN_NET_GO_SYNC;
l_net_pvt->flags ^= F_DAP_CHAIN_NET_SYNC_FROM_ZERO;
l_net_pvt->state = NET_STATE_ONLINE;
log_it(L_INFO, "Synchronization done, status online");
} else { // Synchronization done, go offline
l_net_pvt->state = l_net_pvt->state_target = NET_STATE_OFFLINE;
log_it(L_INFO, "Synchronization done, go offline");
}
}
}
break;
case NET_STATE_ONLINE: {
if (l_net_pvt->flags & F_DAP_CHAIN_NET_GO_SYNC)
{
......
......@@ -1002,7 +1002,7 @@ int com_node(int a_argc, char ** a_argv, void *arg_func, char **a_str_reply)
}
// wait connected
int timeout_ms = 7000; // 7 sec = 7000 ms
res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_CONNECTED, timeout_ms);
res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_ESTABLISHED, timeout_ms);
// select new node addr
if(l_is_auto && res){
if(l_remote_node_addr && l_nodes_count>1){
......@@ -1228,7 +1228,7 @@ int com_node(int a_argc, char ** a_argv, void *arg_func, char **a_str_reply)
return -7;
}
// wait handshake
int res = dap_chain_node_client_wait(client, NODE_CLIENT_STATE_CONNECTED, timeout_ms);
int res = dap_chain_node_client_wait(client, NODE_CLIENT_STATE_ESTABLISHED, timeout_ms);
if(res != 1) {
dap_chain_node_cli_set_reply_text(a_str_reply, "no response from node");
// clean client struct
......
......@@ -182,7 +182,7 @@ static void s_stage_connected_callback(dap_client_t *a_client, void *a_arg)
l_node_client->callback_connected(l_node_client, a_arg);
l_node_client->keep_connection = true;
log_it(L_DEBUG, "Wakeup all who waits");
l_node_client->state = NODE_CLIENT_STATE_CONNECTED;
l_node_client->state = NODE_CLIENT_STATE_ESTABLISHED;
#ifndef _WIN32
pthread_cond_broadcast(&l_node_client->wait_cond);
#else
......@@ -412,6 +412,7 @@ dap_chain_node_client_t* dap_chain_client_connect(dap_chain_node_info_t *a_node_
/**
* @brief dap_chain_node_client_go_stage
* @param a_net
* @param a_node_info
* @param a_active_channels
* @param a_callback_connected
......@@ -421,7 +422,7 @@ dap_chain_node_client_t* dap_chain_client_connect(dap_chain_node_info_t *a_node_
* @param a_callback_arg
* @return
*/
dap_chain_node_client_t* dap_chain_node_client_create_n_connect(dap_chain_node_info_t *a_node_info,
dap_chain_node_client_t* dap_chain_node_client_create_n_connect(dap_chain_net_t * a_net, dap_chain_node_info_t *a_node_info,
const char *a_active_channels, dap_chain_node_client_callback_t a_callback_connected, dap_chain_node_client_callback_t a_callback_disconnected,
dap_chain_node_client_callback_stage_t a_callback_stage,
dap_chain_node_client_callback_error_t a_callback_error, void * a_callback_arg )
......@@ -437,6 +438,8 @@ dap_chain_node_client_t* dap_chain_node_client_create_n_connect(dap_chain_node_i
l_node_client->callback_discconnected = a_callback_disconnected;
l_node_client->callback_error = a_callback_error;
l_node_client->callback_stage = a_callback_stage;
l_node_client->info = a_node_info;
l_node_client->net = a_net;
#ifndef _WIN32
pthread_condattr_t attr;
......@@ -479,7 +482,7 @@ dap_chain_node_client_t* dap_chain_node_client_create_n_connect(dap_chain_node_i
// dap_client_stage_t a_stage_target = STAGE_ENC_INIT;
// dap_client_stage_t l_stage_target = STAGE_STREAM_STREAMING;
l_node_client->state = NODE_CLIENT_STATE_CONNECT;
l_node_client->state = NODE_CLIENT_STATE_CONNECTING ;
// ref pvt client
//dap_client_pvt_ref(DAP_CLIENT_PVT(l_node_client->client));
// Handshake & connect
......@@ -500,8 +503,8 @@ dap_chain_node_client_t* dap_chain_node_client_connect(dap_chain_node_info_t *a_
void dap_chain_node_client_reset(dap_chain_node_client_t *a_client)
{
if (a_client->state > NODE_CLIENT_STATE_CONNECTED) {
a_client->state = NODE_CLIENT_STATE_CONNECTED;
if (a_client->state > NODE_CLIENT_STATE_ESTABLISHED) {
a_client->state = NODE_CLIENT_STATE_ESTABLISHED;
}
}
......@@ -530,7 +533,7 @@ void dap_chain_node_client_close(dap_chain_node_client_t *a_client)
int dap_chain_node_client_send_ch_pkt(dap_chain_node_client_t *a_client, uint8_t a_ch_id, uint8_t a_type,
const void *a_pkt_data, size_t a_pkt_data_size)
{
if(!a_client || a_client->state < NODE_CLIENT_STATE_CONNECTED)
if(!a_client || a_client->state < NODE_CLIENT_STATE_ESTABLISHED)
return -1;
dap_stream_worker_t *l_stream_worker = dap_client_get_stream_worker(a_client->client);
......@@ -566,7 +569,7 @@ int dap_chain_node_client_wait(dap_chain_node_client_t *a_client, int a_waited_s
return 0;
}
if (a_client->state < NODE_CLIENT_STATE_CONNECTED && a_waited_state > NODE_CLIENT_STATE_CONNECTED) {
if (a_client->state < NODE_CLIENT_STATE_ESTABLISHED && a_waited_state > NODE_CLIENT_STATE_ESTABLISHED) {
log_it(L_WARNING, "Waited state can't be achieved");
pthread_mutex_unlock(&a_client->wait_mutex);
return -2;
......@@ -676,7 +679,7 @@ static void nodelist_response_error_callback(dap_client_t *a_client, int a_err)
*/
int dap_chain_node_client_send_nodelist_req(dap_chain_node_client_t *a_client)
{
if(!a_client || !a_client->client || a_client->state < NODE_CLIENT_STATE_CONNECTED)
if(!a_client || !a_client->client || a_client->state < NODE_CLIENT_STATE_ESTABLISHED)
return -1;
//dap_client_pvt_t * l_client_pvt = DAP_CLIENT_PVT(a_client->client);
......
......@@ -37,8 +37,8 @@ typedef enum dap_chain_node_client_state {
NODE_CLIENT_STATE_NODE_ADDR_LEASED = 2,
NODE_CLIENT_STATE_PING = 3,
NODE_CLIENT_STATE_PONG = 4,
NODE_CLIENT_STATE_CONNECT = 5,
NODE_CLIENT_STATE_CONNECTED = 100,
NODE_CLIENT_STATE_CONNECTING = 5,
NODE_CLIENT_STATE_ESTABLISHED = 100,
//NODE_CLIENT_STATE_SEND,
//NODE_CLIENT_STATE_SENDED,
NODE_CLIENT_STATE_SYNC_GDB = 101,
......@@ -56,9 +56,16 @@ typedef void (*dap_chain_node_client_callback_error_t)(dap_chain_node_client_t *
// state for a client connection
typedef struct dap_chain_node_client {
dap_chain_node_client_state_t state;
bool sync_gdb;
bool sync_chains;
dap_chain_cell_id_t cell_id;
dap_client_t *client;
dap_chain_node_info_t * info;
dap_events_t *events;
dap_chain_net_t * net;
char last_error[128];
#ifndef _WIN32
......@@ -91,7 +98,7 @@ int dap_chain_node_client_init(void);
void dap_chain_node_client_deinit(void);
dap_chain_node_client_t* dap_chain_node_client_create_n_connect(dap_chain_node_info_t *a_node_info, const char *a_active_channels,
dap_chain_node_client_t* dap_chain_node_client_create_n_connect(dap_chain_net_t * a_net, dap_chain_node_info_t *a_node_info, const char *a_active_channels,
dap_chain_node_client_callback_t a_callback_connected,
dap_chain_node_client_callback_t a_callback_disconnected,
dap_chain_node_client_callback_stage_t a_callback_stage,
......@@ -145,8 +152,8 @@ static inline const char * dap_chain_node_client_state_to_str( dap_chain_node_cl
case NODE_CLIENT_STATE_NODE_ADDR_LEASED: return "NODE_ADDR_LEASED";
case NODE_CLIENT_STATE_PING: return "PING";
case NODE_CLIENT_STATE_PONG: return "PONG";
case NODE_CLIENT_STATE_CONNECT: return "CONNECT";
case NODE_CLIENT_STATE_CONNECTED: return "CONNECTED";
case NODE_CLIENT_STATE_CONNECTING: return "CONNECT";
case NODE_CLIENT_STATE_ESTABLISHED: return "CONNECTED";
case NODE_CLIENT_STATE_SYNC_GDB: return "SYNC_GDB";
case NODE_CLIENT_STATE_SYNC_CHAINS: return "SYNC_CHAINS";
case NODE_CLIENT_STATE_SYNCED: return "SYNCED";
......
......@@ -448,7 +448,7 @@ int dap_chain_net_vpn_client_check(dap_chain_net_t *a_net, const char *a_ipv4_st
}
// wait connected
int l_timeout_ms = l_timeout_conn_ms; //5 sec = 5000 ms
int l_res = dap_chain_node_client_wait(s_vpn_client, NODE_CLIENT_STATE_CONNECTED, l_timeout_ms);
int l_res = dap_chain_node_client_wait(s_vpn_client, NODE_CLIENT_STATE_ESTABLISHED, l_timeout_ms);
if(l_res) {
log_it(L_ERROR, "No response from VPN server=%s:%d", a_ipv4_str, a_port);
// clean client struct
......@@ -543,7 +543,7 @@ int dap_chain_net_vpn_client_start(dap_chain_net_t *a_net, const char *a_ipv4_st
}
// wait connected
int timeout_ms = 5000; //5 sec = 5000 ms
int res = dap_chain_node_client_wait(s_vpn_client, NODE_CLIENT_STATE_CONNECTED, timeout_ms);
int res = dap_chain_node_client_wait(s_vpn_client, NODE_CLIENT_STATE_ESTABLISHED, timeout_ms);
if(res) {
log_it(L_ERROR, "No response from VPN server=%s:%d", a_ipv4_str, a_port);
// clean client struct
......