From 680d0fa51f0a88797e07a03877a17c1ebfaff03f Mon Sep 17 00:00:00 2001 From: Dmitriy Gerasimov <naeper@demlabs.net> Date: Wed, 15 May 2019 22:18:51 +0700 Subject: [PATCH] [*] Lot of reworks and fixes --- dap_chain_net.c | 102 +++++++++++------- dap_chain_net.h | 7 +- dap_chain_node.h | 9 -- dap_chain_node_cli_cmd.c | 105 +++++++++---------- dap_chain_node_client.c | 219 ++++++++++++++++----------------------- dap_chain_node_client.h | 30 +++--- 6 files changed, 226 insertions(+), 246 deletions(-) diff --git a/dap_chain_net.c b/dap_chain_net.c index 34c56e010d..54f6c5ecbe 100644 --- a/dap_chain_net.c +++ b/dap_chain_net.c @@ -33,12 +33,14 @@ #include "dap_string.h" #include "dap_strfuncs.h" #include "dap_config.h" +#include "dap_hash.h" #include "dap_chain_utxo.h" #include "dap_chain_net.h" #include "dap_chain_node_client.h" #include "dap_chain_node_cli.h" #include "dap_chain_node_cli_cmd.h" +#include "dap_stream_ch_chain_net_pkt.h" #include "dap_stream_ch_chain_net.h" #include "dap_stream_ch_chain.h" #include "dap_stream_ch_chain_pkt.h" @@ -75,6 +77,7 @@ typedef struct dap_chain_net_pvt{ uint8_t padding2[6]; dap_chain_net_state_t state; + dap_chain_net_state_t state_prev; dap_chain_net_state_t state_target; } dap_chain_net_pvt_t; @@ -98,7 +101,7 @@ static const char * c_net_states[]={ [NET_STATE_LINKS_ESTABLISHED]= "NET_STATE_LINKS_ESTABLISHED", [NET_STATE_SYNC_GDB]= "NET_STATE_SYNC_GDB", [NET_STATE_SYNC_CHAINS]= "NET_STATE_SYNC_CHAINS", - [NET_STATE_STAND_BY]= "NET_STATE_STAND_BY" + [NET_STATE_ONLINE]= "NET_STATE_STAND_BY" }; static dap_chain_net_t * s_net_new(const char * a_id, const char * a_name , const char * a_node_role); @@ -152,15 +155,30 @@ lb_proc_state: case NET_STATE_OFFLINE:{ log_it(L_NOTICE,"%s.state: NET_STATE_OFFLINE",l_net->pub.name); if ( PVT(l_net)->state_target != NET_STATE_OFFLINE ){ - PVT(l_net)->state_target = NET_STATE_LINKS_PING; + PVT(l_net)->state = NET_STATE_LINKS_PING; goto lb_proc_state; } } break; case NET_STATE_LINKS_PING:{ log_it(L_NOTICE,"%s.state: NET_STATE_LINKS_PING",l_net->pub.name); - PVT(l_net)->state_target = NET_STATE_LINKS_CONNECTING; - goto lb_proc_state; + if ( PVT(l_net)->state_target != NET_STATE_LINKS_PING ){ + PVT(l_net)->state = NET_STATE_LINKS_CONNECTING; + goto lb_proc_state; + }else { + PVT(l_net)->state = NET_STATE_OFFLINE; + goto lb_proc_state; + } }break; + case NET_STATE_LINKS_PONG:{ + log_it(L_NOTICE,"%s.state: NET_STATE_LINKS_PONG",l_net->pub.name); + if ( ( PVT( l_net )->state_target != NET_STATE_LINKS_PONG ) && + ( PVT( l_net )->state_target != NET_STATE_OFFLINE ) ) { + PVT(l_net)->state = NET_STATE_LINKS_CONNECTING; + }else { // target was to have a pong + PVT(l_net)->state = NET_STATE_OFFLINE; + goto lb_proc_state; + } + } case NET_STATE_LINKS_CONNECTING:{ log_it(L_NOTICE,"%s.state: NET_STATE_LINKS_CONNECTING",l_net->pub.name); if ( PVT(l_net)->node_info ) { @@ -197,7 +215,7 @@ lb_proc_state: case NET_STATE_LINKS_ESTABLISHED:{ log_it(L_NOTICE,"%s.state: NET_STATE_LINKS_ESTABLISHED",l_net->pub.name); switch (PVT(l_net)->state_target) { - case NET_STATE_STAND_BY: + case NET_STATE_ONLINE: case NET_STATE_SYNC_GDB: PVT(l_net)->state = NET_STATE_SYNC_GDB ; goto lb_proc_state; case NET_STATE_SYNC_CHAINS: PVT(l_net)->state = NET_STATE_SYNC_CHAINS ; goto lb_proc_state; default:{} @@ -207,18 +225,14 @@ lb_proc_state: // send request dap_chain_node_client_t * l_node_client = NULL, *l_node_client_tmp = NULL; HASH_ITER(hh,PVT(l_net)->links,l_node_client,l_node_client_tmp){ - size_t l_data_size_out = 0; + dap_stream_ch_chain_sync_request_t l_sync_gdb = {{0}}; // Get last timestamp in log - time_t l_timestamp_start = dap_db_log_get_last_timestamp(); - size_t l_data_send_len = 0; - uint8_t *l_data_send = dap_stream_ch_chain_net_make_packet(PVT(l_net)->node_info->hdr.address.uint64 , - l_node_client->remote_node_addr.uint64, - l_timestamp_start, NULL, 0, &l_data_send_len); - - uint8_t l_ch_id = dap_stream_ch_chain_net_get_id(); // Channel id for global_db sync - int res = dap_chain_node_client_send_chain_net_request(l_node_client, l_ch_id, - STREAM_CH_CHAIN_NET_PKT_TYPE_GLOBAL_DB_REQUEST_SYNC, l_data_send, l_data_send_len); //, NULL); - DAP_DELETE(l_data_send); + l_sync_gdb.ts_start = (uint64_t) dap_db_log_get_last_timestamp(); + l_sync_gdb.ts_end = (uint64_t) time(NULL); + uint8_t l_ch_id = dap_stream_ch_chain_get_id(); // Channel id for global_db sync + int res = dap_chain_node_client_send_ch_pkt(l_node_client, l_ch_id, + DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB , &l_sync_gdb, + sizeof (l_sync_gdb) ); if(res != 1) { log_it(L_WARNING,"Can't send GDB sync request"); HASH_DEL(PVT(l_net)->links,l_node_client); @@ -229,7 +243,7 @@ lb_proc_state: // wait for finishing of request int timeout_ms = 120000; // 2 min = 120 sec = 120 000 ms // TODO add progress info to console - res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_END, timeout_ms); + res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms); switch (res) { case 0: log_it(L_WARNING,"Timeout with link sync"); @@ -241,37 +255,55 @@ lb_proc_state: log_it(L_INFO, "Node sync error %d",res); } } - if ( PVT(l_net)->state_target == NET_STATE_STAND_BY ){ + if ( PVT(l_net)->state_target == NET_STATE_ONLINE ){ PVT(l_net)->state = NET_STATE_SYNC_CHAINS; }else { - PVT(l_net)->state = NET_STATE_STAND_BY; + PVT(l_net)->state = NET_STATE_ONLINE; } goto lb_proc_state; }break; case NET_STATE_SYNC_CHAINS:{ dap_chain_node_client_t * l_node_client = NULL, *l_node_client_tmp = NULL; + uint8_t l_ch_id = dap_stream_ch_chain_get_id(); // Channel id for global_db sync HASH_ITER(hh,PVT(l_net)->links,l_node_client,l_node_client_tmp){ - uint8_t l_ch_id = dap_stream_ch_chain_get_id(); // Channel id for global_db sync - dap_chain_t * l_chain = NULL; + dap_chain_t * l_chain = NULL; DL_FOREACH(l_net->pub.chains, l_chain ){ - dap_stream_t * l_stream = dap_client_get_stream( l_node_client->client ); - if ( l_stream ){ - dap_stream_ch_t * l_ch = l_stream->channel [l_ch_id]; - if ( l_ch ) { - dap_stream_ch_chain_pkt_t * l_chain_pkt; - size_t l_chain_pkt_size = sizeof (l_chain_pkt->hdr) + sizeof (dap_stream_ch_chain_request_t ); - dap_stream_ch_chain_request_t * l_request = (dap_stream_ch_chain_request_t *) l_chain_pkt->data; - - dap_stream_ch_pkt_write(l_ch,'b',l_chain_pkt,l_chain_pkt_size); + size_t l_lasts_size = 0; + dap_chain_atom_ptr_t * l_lasts; + dap_chain_atom_iter_t * l_atom_iter = l_chain->callback_atom_iter_create(l_chain); + l_lasts = l_chain->callback_atom_iter_get_lasts(l_atom_iter,&l_lasts_size); + if ( l_lasts ) { + dap_stream_ch_chain_sync_request_t l_request = {{0}}; + dap_hash_fast(l_lasts[0],l_chain->callback_atom_get_size(l_lasts[0]),&l_request.hash_from ); + dap_chain_node_client_send_ch_pkt(l_node_client,l_ch_id, + DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS, + &l_request,sizeof (l_request) ); + // wait for finishing of request + int timeout_ms = 120000; // 2 min = 120 sec = 120 000 ms + // TODO add progress info to console + int l_res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms); + switch (l_res) { + case 0: + log_it(L_WARNING,"Timeout with link sync"); + break; + case 1: + log_it(L_INFO, "Node sync completed"); + break; + default: + log_it(L_INFO, "Node sync error %d",l_res); } + + DAP_DELETE( l_lasts ); } + DAP_DELETE( l_atom_iter ); } + } - PVT(l_net)->state = NET_STATE_STAND_BY; + PVT(l_net)->state = NET_STATE_ONLINE; goto lb_proc_state; }break; - case NET_STATE_STAND_BY:{ + case NET_STATE_ONLINE:{ } break; } @@ -612,7 +644,7 @@ int dap_chain_net_load(const char * a_net_name) if (l_chain ) l_chain->is_datum_pool_proc = true; - PVT(l_net)->state_target = NET_STATE_STAND_BY; + PVT(l_net)->state_target = NET_STATE_ONLINE; log_it(L_INFO,"Root node role established"); } break; case NODE_ROLE_CELL_MASTER: @@ -621,12 +653,12 @@ int dap_chain_net_load(const char * a_net_name) dap_chain_id_t l_chain_id = { .raw = {0x0,0x0,0x0,0x0,0x0,0x0,0x0,0x01} }; dap_chain_t * l_chain = dap_chain_find_by_id(l_net->pub.id, l_chain_id ); l_chain->is_datum_pool_proc = true; - PVT(l_net)->state_target = NET_STATE_STAND_BY; + PVT(l_net)->state_target = NET_STATE_ONLINE; log_it(L_INFO,"Master node role established"); } break; case NODE_ROLE_FULL:{ log_it(L_INFO,"Full node role established"); - PVT(l_net)->state_target = NET_STATE_STAND_BY; + PVT(l_net)->state_target = NET_STATE_ONLINE; } break; case NODE_ROLE_LIGHT: default: diff --git a/dap_chain_net.h b/dap_chain_net.h index f0b183e741..f1ff037a3f 100644 --- a/dap_chain_net.h +++ b/dap_chain_net.h @@ -37,11 +37,12 @@ typedef enum dap_chain_net_state{ NET_STATE_OFFLINE = 0, NET_STATE_LINKS_PING, + NET_STATE_LINKS_PONG, NET_STATE_LINKS_CONNECTING, NET_STATE_LINKS_ESTABLISHED, NET_STATE_SYNC_GDB, NET_STATE_SYNC_CHAINS, - NET_STATE_STAND_BY, + NET_STATE_ONLINE, } dap_chain_net_state_t; @@ -64,12 +65,12 @@ int dap_chain_net_load(const char * a_net_name); int dap_chain_net_state_go_to(dap_chain_net_t * a_net, dap_chain_net_state_t a_new_state); -inline static int dap_chain_net_start(dap_chain_net_t * a_net){ return dap_chain_net_state_go_to(a_net,NET_STATE_STAND_BY); } +inline static int dap_chain_net_start(dap_chain_net_t * a_net){ return dap_chain_net_state_go_to(a_net,NET_STATE_ONLINE); } inline static int dap_chain_net_stop(dap_chain_net_t * a_net) { return dap_chain_net_state_go_to(a_net,NET_STATE_OFFLINE); } inline static int dap_chain_net_links_establish(dap_chain_net_t * a_net) { return dap_chain_net_state_go_to(a_net,NET_STATE_LINKS_ESTABLISHED); } inline static int dap_chain_net_sync_chains(dap_chain_net_t * a_net) { return dap_chain_net_state_go_to(a_net,NET_STATE_SYNC_CHAINS); } inline static int dap_chain_net_sync_gdb(dap_chain_net_t * a_net) { return dap_chain_net_state_go_to(a_net,NET_STATE_SYNC_GDB); } -inline static int dap_chain_net_sync_all(dap_chain_net_t * a_net) { return dap_chain_net_state_go_to(a_net,NET_STATE_STAND_BY); } +inline static int dap_chain_net_sync_all(dap_chain_net_t * a_net) { return dap_chain_net_state_go_to(a_net,NET_STATE_ONLINE); } void dap_chain_net_delete( dap_chain_net_t * a_net); void dap_chain_net_proc_datapool (dap_chain_net_t * a_net); diff --git a/dap_chain_node.h b/dap_chain_node.h index be639a86f1..8d3be42b35 100644 --- a/dap_chain_node.h +++ b/dap_chain_node.h @@ -32,15 +32,6 @@ #include "dap_chain_common.h" #include "dap_chain_global_db.h" -/** - * @struct Node address - * - */ -typedef union dap_chain_node_addr{ - uint64_t uint64; - uint8_t raw[sizeof(uint64_t)]; // Access to selected octects -} DAP_ALIGN_PACKED dap_chain_node_addr_t; - /** * Node Declaration request * diff --git a/dap_chain_node_cli_cmd.c b/dap_chain_node_cli_cmd.c index 4c913f55c3..e155b56e74 100644 --- a/dap_chain_node_cli_cmd.c +++ b/dap_chain_node_cli_cmd.c @@ -37,6 +37,10 @@ #include <dirent.h> #include "iputils/iputils.h" + +#include "uthash.h" +#include "utlist.h" + #include "dap_string.h" #include "dap_hash.h" #include "dap_chain_common.h" @@ -60,6 +64,9 @@ #include "dap_chain_global_db_remote.h" #include "dap_stream_ch_chain_net.h" +#include "dap_stream_ch_chain.h" +#include "dap_stream_ch_chain_pkt.h" +#include "dap_stream_ch_chain_net_pkt.h" #define LOG_TAG "chain_node_cli_cmd" @@ -599,7 +606,7 @@ static int com_global_db_cur_node_set(dap_chain_node_info_t *a_node_info, const * str_reply[out] for reply * return 0 Ok, -1 error */ -static int com_global_db_cur_node_set_from_remote(dap_chain_node_info_t *a_node_info, const char *a_alias_str, char **a_str_reply) +static int com_node_request_addr(dap_chain_node_info_t *a_node_info, const char *a_alias_str, char **a_str_reply) { if(!a_node_info->hdr.address.uint64 && !a_alias_str) { dap_chain_node_cli_set_reply_text(a_str_reply, "addr not found"); @@ -645,8 +652,8 @@ static int com_global_db_cur_node_set_from_remote(dap_chain_node_info_t *a_node_ } // send request - res = dap_chain_node_client_send_chain_net_request(client, dap_stream_ch_chain_net_get_id(), - STREAM_CH_CHAIN_NET_PKT_TYPE_SET_NODE_ADDR, (char*) &l_node_info->hdr.address.uint64, sizeof(uint64_t)); //, NULL); + res = dap_chain_node_client_send_ch_pkt(client, dap_stream_ch_chain_net_get_id(), + DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_NODE_ADDR_LEASE_REQUEST,NULL,0); if(res != 1) { dap_chain_node_cli_set_reply_text(a_str_reply, "no request sent"); // clean client struct @@ -666,12 +673,7 @@ static int com_global_db_cur_node_set_from_remote(dap_chain_node_info_t *a_node_ dap_chain_node_cli_set_reply_text(a_str_reply, "timeout"); return -1; case 1: { - uint64_t addr = 0; - if(client->recv_data_len == sizeof(uint64_t)) - memcpy(&addr, client->recv_data, sizeof(uint64_t)); - if(client->recv_data_len > 0) - DAP_DELETE(client->recv_data); - client->recv_data = NULL; + uint64_t addr = dap_db_get_cur_node_addr(); dap_chain_node_cli_set_reply_text(a_str_reply, "new address for remote node has been set 0x%x", addr); } @@ -801,7 +803,7 @@ int com_global_db(int a_argc, const char ** a_argv, char **a_str_reply) return com_global_db_cur_node_set(l_node_info, alias_str, a_str_reply); case CMD_CUR_NODE_SET_FROM_REMOTE: // handler of command 'global_db node remote_set' - return com_global_db_cur_node_set_from_remote(l_node_info, alias_str, a_str_reply); + return com_node_request_addr(l_node_info, alias_str, a_str_reply); default: dap_chain_node_cli_set_reply_text(a_str_reply, "command %s not recognized", a_argv[1]); @@ -910,69 +912,58 @@ int com_node(int argc, const char ** argv, char **str_reply) return -1; } - dap_chain_node_info_t *node_info = dap_chain_node_info_read_and_reply(&address, str_reply); - if(!node_info) { + dap_chain_node_info_t *l_remote_node_info = dap_chain_node_info_read_and_reply(&address, str_reply); + if(!l_remote_node_info) { return -1; } // start connect - dap_chain_node_client_t *client = dap_chain_node_client_connect(node_info); - if(!client) { + dap_chain_node_client_t *l_node_client = dap_chain_node_client_connect(l_remote_node_info); + if(!l_node_client) { dap_chain_node_cli_set_reply_text(str_reply, "can't connect"); - DAP_DELETE(node_info); + DAP_DELETE(l_remote_node_info); return -1; } // wait connected int timeout_ms = 15000; //15 sec = 15000 ms - int res = dap_chain_node_client_wait(client, NODE_CLIENT_STATE_CONNECTED, timeout_ms); + int res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_CONNECTED, timeout_ms); if(res != 1) { dap_chain_node_cli_set_reply_text(str_reply, "no response from node"); // clean client struct - dap_chain_node_client_close(client); - DAP_DELETE(node_info); + dap_chain_node_client_close(l_node_client); + DAP_DELETE(l_remote_node_info); return -1; } - // send request - size_t l_data_size_out = 0; - // Get last timestamp in log - time_t l_timestamp_start = dap_db_log_get_last_timestamp(); - size_t l_data_send_len = 0; - uint8_t *l_data_send = dap_stream_ch_chain_net_make_packet(l_cur_node_addr.uint64, node_info->hdr.address.uint64, - l_timestamp_start, NULL, 0, &l_data_send_len); - - uint8_t l_ch_id = dap_stream_ch_chain_net_get_id(); // Channel id for global_db sync - res = dap_chain_node_client_send_chain_net_request(client, l_ch_id, - STREAM_CH_CHAIN_NET_PKT_TYPE_GLOBAL_DB_REQUEST_SYNC, l_data_send, l_data_send_len); //, NULL); - DAP_DELETE(l_data_send); - if(res != 1) { - dap_chain_node_cli_set_reply_text(str_reply, "no request sent"); - // clean client struct - dap_chain_node_client_close(client); - DAP_DELETE(node_info); - return -1; - } + dap_chain_t *l_chain = NULL; + + DL_FOREACH(l_net->pub.chains, l_chain) { + // send request + // Get last timestamp in log + dap_stream_ch_chain_sync_request_t l_sync_request = {{0}}; + l_sync_request.ts_start = (uint64_t) dap_db_log_get_last_timestamp(); + dap_stream_ch_t * l_ch_chain = dap_client_get_stream_ch(l_node_client->client, dap_stream_ch_chain_get_id() ); + if( 0 == dap_stream_ch_chain_pkt_write(l_ch_chain,DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_ALL, + l_net->pub.id, l_chain->id ,l_remote_node_info->hdr.cell_id,&l_sync_request, + sizeof (l_sync_request))) { + dap_chain_node_cli_set_reply_text(str_reply, "no request sent"); + // clean client struct + dap_chain_node_client_close(l_node_client); + DAP_DELETE(l_remote_node_info); + return -1; + } - // wait for finishing of request - timeout_ms = 120000; // 2 min = 120 sec = 120 000 ms - // TODO add progress info to console - res = dap_chain_node_client_wait(client, NODE_CLIENT_STATE_END, timeout_ms); - DAP_DELETE(node_info); - dap_client_disconnect(client->client); - dap_chain_node_client_close(client); - switch (res) { - case 0: - dap_chain_node_cli_set_reply_text(str_reply, "timeout"); - return -1; - case 1: - dap_chain_node_cli_set_reply_text(str_reply, "nodes sync completed"); - return 0; - default: - dap_chain_node_cli_set_reply_text(str_reply, "error"); - return -1; + // wait for finishing of request + timeout_ms = 120000; // 2 min = 120 sec = 120 000 ms + // TODO add progress info to console + res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms); } + DAP_DELETE(l_remote_node_info); + dap_client_disconnect(l_node_client->client); + dap_chain_node_client_close(l_node_client); + dap_chain_node_cli_set_reply_text(str_reply, "Node sync completed"); + return 0; - } - break; + } break; // make handshake case CMD_HANDSHAKE: { // get address from alias if addr not defined @@ -2327,7 +2318,7 @@ int com_print_log(int argc, const char ** argv, char **str_reply) } // get logs from list - char *l_str_ret = log_get_item(l_ts_after, l_limit); + char *l_str_ret = dap_log_get_item(l_ts_after, l_limit); if(!l_str_ret) { dap_chain_node_cli_set_reply_text(str_reply, "no logs"); return -1; diff --git a/dap_chain_node_client.c b/dap_chain_node_client.c index 28b09a0a53..9a29a093ef 100644 --- a/dap_chain_node_client.c +++ b/dap_chain_node_client.c @@ -34,6 +34,8 @@ #include "dap_http_client_simple.h" #include "dap_client_pvt.h" #include "dap_stream_ch_pkt.h" +#include "dap_stream_ch_chain.h" +#include "dap_stream_ch_chain_pkt.h" #include "dap_stream_ch_chain_net.h" #include "dap_stream_ch_chain_net_pkt.h" #include "dap_stream_pkt.h" @@ -46,7 +48,15 @@ #define SYSTEM_CONFIGS_DIR SYSTEM_PREFIX"/etc" static int listen_port_tcp = 8079; +static void s_stage_end_callback(dap_client_t *a_client, void *a_arg); +static void s_ch_chain_callback_notify_packet(dap_stream_ch_chain_t*, uint8_t a_pkt_type, + dap_stream_ch_chain_pkt_t *a_pkt, size_t a_pkt_data_size, + void * a_arg); +/** + * @brief dap_chain_node_client_init + * @return + */ int dap_chain_node_client_init(void) { dap_config_t *g_config; @@ -64,38 +74,99 @@ int dap_chain_node_client_init(void) return 0; } +/** + * @brief dap_chain_node_client_deinit + */ void dap_chain_node_client_deinit() { dap_http_client_simple_deinit(); dap_client_deinit(); } -// callback for dap_client_new() in chain_node_client_connect() +/** + * @brief stage_status_callback + * @param a_client + * @param a_arg + */ static void stage_status_callback(dap_client_t *a_client, void *a_arg) { + (void) a_client; + (void) a_arg; + //printf("* stage_status_callback client=%x data=%x\n", a_client, a_arg); } -// callback for dap_client_new() in chain_node_client_connect() -static void stage_status_error_callback(dap_client_t *a_client, void *a_arg) + +/** + * @brief s_stage_status_error_callback + * @param a_client + * @param a_arg + */ +static void s_stage_status_error_callback(dap_client_t *a_client, void *a_arg) { + (void) a_arg; + if ( DAP_CHAIN_NODE_CLIENT(a_client)->keep_connection && + ( ( dap_client_get_stage(a_client) != STAGE_STREAM_STREAMING )|| + ( dap_client_get_stage_status(a_client) == STAGE_STATUS_ERROR ) ) ){ + log_it(L_NOTICE,"Some errors happends, current state is %s but we need to return back to STAGE_STREAM_STREAMING", + dap_client_get_stage_str(a_client) ) ; + dap_client_go_stage( a_client , STAGE_STREAM_STREAMING, s_stage_end_callback ); + } //printf("* tage_status_error_callback client=%x data=%x\n", a_client, a_arg); } -// callback for the end of connection in dap_chain_node_client_connect()->dap_client_go_stage() -static void a_stage_end_callback(dap_client_t *a_client, void *a_arg) +/** + * @brief a_stage_end_callback + * @param a_client + * @param a_arg + */ +static void s_stage_end_callback(dap_client_t *a_client, void *a_arg) { dap_chain_node_client_t *l_node_client = a_client->_inheritor; assert(l_node_client); if(l_node_client) { pthread_mutex_lock(&l_node_client->wait_mutex); l_node_client->state = NODE_CLIENT_STATE_CONNECTED; + + dap_stream_ch_t * l_ch = dap_client_get_stream_ch( a_client , dap_stream_ch_chain_get_id() ); + if (l_ch){ + dap_stream_ch_chain_t * l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); + l_ch_chain->notify_callback = s_ch_chain_callback_notify_packet; + l_ch_chain->notify_callback_arg = l_node_client; + } + pthread_mutex_unlock(&l_node_client->wait_mutex); if ( l_node_client->callback_connected ) l_node_client->callback_connected(l_node_client,a_arg); + l_node_client->keep_connection = true; pthread_cond_signal(&l_node_client->wait_cond); - pthread_mutex_unlock(&l_node_client->wait_mutex); } } +/** + * @brief s_ch_chain_callback_notify_packet + * @param a_pkt_type + * @param a_pkt + * @param a_pkt_data_size + * @param a_arg + */ +static void s_ch_chain_callback_notify_packet(dap_stream_ch_chain_t* a_ch_chain, uint8_t a_pkt_type, + dap_stream_ch_chain_pkt_t *a_pkt, size_t a_pkt_data_size, + void * a_arg) +{ + dap_chain_node_client_t * l_node_client = (dap_chain_node_client_t *) a_arg; + switch (a_pkt_type) { + case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_ALL: + case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB: + case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS:{ + pthread_mutex_lock(&l_node_client->wait_mutex); + l_node_client->state = NODE_CLIENT_STATE_SYNCED; + pthread_mutex_unlock(&l_node_client->wait_mutex); + pthread_cond_signal(&l_node_client->wait_cond); + } + default:{} + } +} + + /** * Create connection to server * @@ -113,8 +184,9 @@ dap_chain_node_client_t* dap_chain_node_client_connect(dap_chain_node_info_t *no pthread_cond_init(&l_node_client->wait_cond, &attr); pthread_mutex_init(&l_node_client->wait_mutex, NULL); l_node_client->events = NULL; //dap_events_new(); - l_node_client->client = dap_client_new(l_node_client->events, stage_status_callback, stage_status_error_callback); + l_node_client->client = dap_client_new(l_node_client->events, stage_status_callback, s_stage_status_error_callback); l_node_client->client->_inheritor = l_node_client; + dap_client_set_active_channels(l_node_client->client,"NC"); int hostlen = 128; char host[hostlen]; @@ -139,7 +211,7 @@ dap_chain_node_client_t* dap_chain_node_client_connect(dap_chain_node_info_t *no l_node_client->state = NODE_CLIENT_STATE_CONNECT; // Handshake & connect - dap_client_go_stage(l_node_client->client, a_stage_target, a_stage_end_callback); + dap_client_go_stage(l_node_client->client, a_stage_target, s_stage_end_callback); return l_node_client; } @@ -158,136 +230,29 @@ void dap_chain_node_client_close(dap_chain_node_client_t *a_client) } } -/* - // callback for dap_client_request_enc() in client_mempool_send_datum() - static void s_response_proc(dap_client_t *a_client, void *str, size_t str_len) - { - printf("* s_response_proc a_client=%x str=%s str_len=%d\n", a_client, str, str_len); - dap_chain_node_client_t *l_client = a_client->_inheritor; - assert(l_client); - if(l_client) { - if(str_len > 0) { - // l_client->read_data_t.data = DAP_NEW_Z_SIZE(uint8_t, str_len + 1); - // if(l_client->read_data_t.data) { - // memcpy(l_client->read_data_t.data, str, str_len); - // l_client->read_data_t.data_len = str_len; - } - } - pthread_mutex_lock(&l_client->wait_mutex); - l_client->state = NODE_CLIENT_STATE_SENDED; - pthread_cond_signal(&l_client->wait_cond); - pthread_mutex_unlock(&l_client->wait_mutex); - } - */ - -/*// callback for dap_client_request_enc() in client_mempool_send_datum() - static void s_response_error(dap_client_t *a_client, int val) - { - printf("* s_response_error a_client=%x val=%d\n", a_client, val); - client_mempool_t *mempool = a_client->_inheritor; - assert(mempool); - if(mempool) { - pthread_mutex_lock(&mempool->wait_mutex); - mempool->state = CLIENT_MEMPOOL_ERROR; - pthread_cond_signal(&mempool->wait_cond); - pthread_mutex_unlock(&mempool->wait_mutex); - } - } - - // set new state and delete previous read data - static void dap_chain_node_client_reset(dap_chain_node_client_t *a_client, int new_state) - { - if(!a_client) - return; - pthread_mutex_lock(&a_client->wait_mutex); - //a_client->read_data_t.data_len = 0; - //DAP_DELETE(a_client->read_data_t.data); - //a_client->read_data_t.data = NULL; - a_client->state = new_state; - pthread_mutex_unlock(&a_client->wait_mutex); - }*/ - -static void dap_chain_node_client_callback(dap_stream_ch_chain_net_pkt_t *a_ch_chain_net, size_t a_data_size, - void *a_arg) -{ - dap_chain_node_client_t *client = (dap_chain_node_client_t*) a_arg; - assert(client); - // end of session - if(!a_ch_chain_net) - { - pthread_mutex_lock(&client->wait_mutex); - client->state = NODE_CLIENT_STATE_END; - pthread_cond_signal(&client->wait_cond); - pthread_mutex_unlock(&client->wait_mutex); - return; - } - - int l_state; - //printf("*callback type=%d\n", a_ch_chain_net->hdr.type); - - switch (a_ch_chain_net->hdr.type) { - case STREAM_CH_CHAIN_NET_PKT_TYPE_PING: - l_state = NODE_CLIENT_STATE_PING; - break; - case STREAM_CH_CHAIN_NET_PKT_TYPE_PONG: - l_state = NODE_CLIENT_STATE_PONG; - break; - case STREAM_CH_CHAIN_NET_PKT_TYPE_GET_NODE_ADDR: - l_state = NODE_CLIENT_STATE_GET_NODE_ADDR; - client->recv_data_len = a_data_size; - if(client->recv_data_len > 0) { - client->recv_data = DAP_NEW_SIZE(uint8_t, a_data_size); - memcpy(client->recv_data, a_ch_chain_net->data, a_data_size); - } - - break; - case STREAM_CH_CHAIN_NET_PKT_TYPE_SET_NODE_ADDR: - l_state = NODE_CLIENT_STATE_SET_NODE_ADDR; - break; -// case STREAM_CH_CHAIN_NET_PKT_TYPE_GLOVAL_DB: -// l_state = NODE_CLIENT_STATE_CONNECTED; -// break; - - default: - l_state = NODE_CLIENT_STATE_ERROR; - - } - if(client) - { - pthread_mutex_lock(&client->wait_mutex); - client->state = l_state; - pthread_cond_signal(&client->wait_cond); - pthread_mutex_unlock(&client->wait_mutex); - } -} /** * Send stream request to server */ -int dap_chain_node_client_send_chain_net_request(dap_chain_node_client_t *a_client, uint8_t a_ch_id, uint8_t a_type, - char *a_buf, size_t a_buf_size) +int dap_chain_node_client_send_ch_pkt(dap_chain_node_client_t *a_client, uint8_t a_ch_id, uint8_t a_type, + const void *a_pkt_data, size_t a_pkt_data_size) { if(!a_client || a_client->state < NODE_CLIENT_STATE_CONNECTED) return -1; - dap_stream_t *l_stream = dap_client_get_stream(a_client->client); + +// dap_stream_t *l_stream = dap_client_get_stream(a_client->client); dap_stream_ch_t * l_ch = dap_client_get_stream_ch(a_client->client, a_ch_id); - if(l_ch) - { - dap_stream_ch_chain_net_t * l_ch_chain = DAP_STREAM_CH_CHAIN_NET(l_ch); - l_ch_chain->notify_callback = dap_chain_node_client_callback; - l_ch_chain->notify_callback_arg = a_client; - int l_res = dap_stream_ch_chain_net_pkt_write(l_ch, a_type, a_buf, a_buf_size); - if(l_res <= 0) - return -1; - bool is_ready = true; - dap_events_socket_set_writable(l_ch->stream->events_socket, is_ready); - //dap_stream_ch_ready_to_write(ch, true); - } - else + if(l_ch){ +// dap_stream_ch_chain_net_t * l_ch_chain = DAP_STREAM_CH_CHAIN_NET(l_ch); + + dap_stream_ch_pkt_write(l_ch, a_type, a_pkt_data, a_pkt_data_size); + dap_stream_ch_set_ready_to_write(l_ch, true); + return 0; + }else return -1; - return 1; } + /** * wait for the complete of request * diff --git a/dap_chain_node_client.h b/dap_chain_node_client.h index 46158cf733..b371460835 100644 --- a/dap_chain_node_client.h +++ b/dap_chain_node_client.h @@ -31,19 +31,18 @@ // connection states typedef enum dap_chain_node_client_state{ NODE_CLIENT_STATE_ERROR = -1, - NODE_CLIENT_STATE_INIT, - NODE_CLIENT_STATE_GET_NODE_ADDR, - NODE_CLIENT_STATE_SET_NODE_ADDR, - NODE_CLIENT_STATE_PING, - NODE_CLIENT_STATE_PONG, - NODE_CLIENT_STATE_CONNECT, - NODE_CLIENT_STATE_CONNECTED, + NODE_CLIENT_STATE_INIT=0, + NODE_CLIENT_STATE_GET_NODE_ADDR=1, + NODE_CLIENT_STATE_SET_NODE_ADDR=2, + NODE_CLIENT_STATE_PING=3, + NODE_CLIENT_STATE_PONG=4, + NODE_CLIENT_STATE_CONNECT=5, + NODE_CLIENT_STATE_CONNECTED=100, //NODE_CLIENT_STATE_SEND, //NODE_CLIENT_STATE_SENDED, - NODE_CLIENT_STATE_SYNC_GDB, - NODE_CLIENT_STATE_SYNC_CHAINS, - NODE_CLIENT_STATE_STANDBY, - NODE_CLIENT_STATE_END + NODE_CLIENT_STATE_SYNC_GDB=101, + NODE_CLIENT_STATE_SYNC_CHAINS=102, + NODE_CLIENT_STATE_SYNCED=103 } dap_chain_node_client_state_t; typedef struct dap_chain_node_client dap_chain_node_client_t; @@ -60,15 +59,16 @@ typedef struct dap_chain_node_client { dap_chain_node_client_callback_t callback_connected; pthread_cond_t wait_cond; pthread_mutex_t wait_mutex; - uint8_t *recv_data; - size_t recv_data_len; // For hash indexing UT_hash_handle hh; dap_chain_node_addr_t remote_node_addr; struct in_addr remote_ipv4; struct in6_addr remote_ipv6; + + bool keep_connection; } dap_chain_node_client_t; +#define DAP_CHAIN_NODE_CLIENT(a) ( (dap_chain_node_client_t *) (a)->_inheritor ) int dap_chain_node_client_init(void); @@ -92,8 +92,8 @@ void dap_chain_node_client_close(dap_chain_node_client_t *client); /** * Send stream request to server */ -int dap_chain_node_client_send_chain_net_request(dap_chain_node_client_t *a_client, uint8_t a_ch_id, uint8_t a_type, - char *a_buf, size_t a_buf_size); +int dap_chain_node_client_send_ch_pkt(dap_chain_node_client_t *a_client, uint8_t a_ch_id, uint8_t a_type, + const void *a_buf, size_t a_buf_size); /** * wait for the complete of request -- GitLab