Skip to content
Snippets Groups Projects
Commit d4ffd61a authored by pavel.uhanov's avatar pavel.uhanov
Browse files

[*] add rpc role

parent b6e00ee2
No related branches found
No related tags found
1 merge request!2120Draft: feature-15728
Pipeline #52271 failed with stage
in 11 minutes and 36 seconds
...@@ -65,9 +65,7 @@ int dap_chain_node_cli_init(dap_config_t * g_config) ...@@ -65,9 +65,7 @@ int dap_chain_node_cli_init(dap_config_t * g_config)
s_debug_cli = dap_config_get_item_bool_default(g_config, "cli-server", "debug-cli", false); s_debug_cli = dap_config_get_item_bool_default(g_config, "cli-server", "debug-cli", false);
if ( dap_cli_server_init(s_debug_cli, "cli-server") ) if ( dap_cli_server_init(s_debug_cli, "cli-server") )
return log_it(L_ERROR, "Can't init CLI server!"), -1; return log_it(L_ERROR, "Can't init CLI server!"), -1;
if (dap_config_get_item_bool_default(g_config, "cli-server", "rpc", false)) { dap_chain_node_rpc_init(g_config);
dap_chain_node_rpc_init(g_config);
}
dap_cli_server_cmd_add("global_db", com_global_db, "Work with global database", dap_cli_server_cmd_add("global_db", com_global_db, "Work with global database",
"global_db flush\n" "global_db flush\n"
......
...@@ -19,39 +19,35 @@ ...@@ -19,39 +19,35 @@
along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. along with any DAP based project. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <stdlib.h> #include "dap_chain_node_rpc.h"
#include <stdio.h>
#include <time.h>
#include <stdlib.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>
#include <sys/sysinfo.h>
#include <sys/vfs.h>
#include "dap_chain_net.h"
#include "dap_global_db.h" #include "dap_global_db.h"
#include "dap_global_db_cluster.h"
#include "dap_stream.h" #include "dap_stream.h"
#define LOG_TAG "dap_chain_node_rpc" #define LOG_TAG "dap_chain_node_rpc"
#define DAP_CHAIN_NODE_RPC_STATES_INFO_CURRENT_VERSION 1 #define DAP_CHAIN_NODE_RPC_STATES_INFO_CURRENT_VERSION 1
typedef struct dap_chain_node_rpc_states_info
{ typedef enum {
uint32_t version; RPC_ROLE_INVALID = 0,
dap_chain_node_addr_t address; RPC_ROLE_BALANCER,
uint32_t location; RPC_ROLE_SERVER,
uint32_t links_count; RPC_ROLE_ROOT
uint32_t cli_thread_count; } rpc_role_t;
struct sysinfo sysinfo;
} DAP_ALIGN_PACKED dap_chain_node_rpc_states_info_t;
static const uint64_t s_timer_update_states_info = 10 /*sec*/ * 1000; static const uint64_t s_timer_update_states_info = 10 /*sec*/ * 1000;
static const char s_rpc_states_group[] = "rpc.states"; static const char s_rpc_server_states_group[] = "rpc.states";
static const char s_rpc_list_group[] = "rpc.list"; static const char s_rpc_node_list_group[] = "rpc.list";
static dap_global_db_cluster_t *s_rpc_states_cluster = NULL; static dap_global_db_cluster_t *s_rpc_server_states_cluster = NULL;
static dap_global_db_cluster_t *s_rpc_list_cluster = NULL; static dap_global_db_cluster_t *s_rpc_node_list_cluster = NULL;
DAP_STATIC_INLINE s_get_role_from_str(const char *a_str)
{
if (!a_str) return RPC_ROLE_INVALID;
if (!strcmp(a_str, "balancer")) return RPC_ROLE_BALANCER;
if (!strcmp(a_str, "server")) return RPC_ROLE_SERVER;
if (!strcmp(a_str, "root")) return RPC_ROLE_SERVER;
return RPC_ROLE_INVALID;
}
/** /**
* @brief get states info about current * @brief get states info about current
...@@ -66,32 +62,49 @@ static void s_update_node_rpc_states_info(UNUSED_ARG void *a_arg) ...@@ -66,32 +62,49 @@ static void s_update_node_rpc_states_info(UNUSED_ARG void *a_arg)
sysinfo(&l_info->sysinfo); sysinfo(&l_info->sysinfo);
const char *l_node_addr_str = dap_stream_node_addr_to_str_static(l_info->address); const char *l_node_addr_str = dap_stream_node_addr_to_str_static(l_info->address);
dap_global_db_set_sync(s_rpc_states_group, l_node_addr_str, l_info, sizeof(dap_chain_node_rpc_states_info_t), false); dap_global_db_set_sync(s_rpc_server_states_group, l_node_addr_str, l_info, sizeof(dap_chain_node_rpc_states_info_t), false);
DAP_DELETE(l_info); DAP_DELETE(l_info);
} }
static int s_rpc_node_cmp(dap_list_t *a_list1, dap_list_t *a_list2)
{
return 0;
}
void dap_chain_node_rpc_init(dap_config_t *a_cfg) void dap_chain_node_rpc_init(dap_config_t *a_cfg)
{ {
if (!(s_rpc_states_cluster = dap_global_db_cluster_add( rpc_role_t l_role = s_get_role_from_str(dap_config_get_item_str(a_cfg, "rpc", "role"));
dap_global_db_instance_get_default(), DAP_STREAM_CLUSTER_GLOBAL,
*(dap_guuid_t *)&uint128_0, s_rpc_states_group, if (l_role == RPC_ROLE_SERVER || l_role == RPC_ROLE_BALANCER) {
0, if (!(s_rpc_server_states_cluster = dap_global_db_cluster_add(
true, DAP_GDB_MEMBER_ROLE_USER, DAP_CLUSTER_TYPE_EMBEDDED))) dap_global_db_instance_get_default(), DAP_STREAM_CLUSTER_GLOBAL,
return; *(dap_guuid_t *)&uint128_0, s_rpc_server_states_group,
if (!(s_rpc_list_cluster = dap_global_db_cluster_add( 0,
dap_global_db_instance_get_default(), DAP_STREAM_CLUSTER_GLOBAL, true, DAP_GDB_MEMBER_ROLE_USER, DAP_CLUSTER_TYPE_EMBEDDED)))
*(dap_guuid_t *)&uint128_0, s_rpc_list_group, {
0, log_it(L_ERROR, "Can't create rpc server states cluster");
true, DAP_GDB_MEMBER_ROLE_GUEST, DAP_CLUSTER_TYPE_EMBEDDED))) return;
return; }
dap_stream_node_addr_t *l_authorized_nodes = NULL; if (l_role == RPC_ROLE_SERVER && dap_proc_thread_timer_add(NULL, s_update_node_rpc_states_info, NULL, s_timer_update_states_info))
uint16_t l_authorized_nodes_count = 0; log_it(L_ERROR, "Can't activate timer on node states update");
dap_config_stream_addrs_parse(a_cfg, "cli-server", "authorized_nodes_addrs_rpc", &l_authorized_nodes, &l_authorized_nodes_count); }
for (uint16_t i = 0; i < l_authorized_nodes_count; ++i) if (l_role == RPC_ROLE_ROOT || l_role == RPC_ROLE_BALANCER) {
dap_global_db_cluster_member_add(s_rpc_list_cluster, l_authorized_nodes + i, DAP_GDB_MEMBER_ROLE_ROOT); if (!(s_rpc_node_list_cluster = dap_global_db_cluster_add(
DAP_DELETE(l_authorized_nodes); dap_global_db_instance_get_default(), DAP_STREAM_CLUSTER_GLOBAL,
if (dap_proc_thread_timer_add(NULL, s_update_node_rpc_states_info, NULL, s_timer_update_states_info)) *(dap_guuid_t *)&uint128_0, s_rpc_node_list_group,
log_it(L_ERROR, "Can't activate timer on node states update"); 0,
true, DAP_GDB_MEMBER_ROLE_GUEST, DAP_CLUSTER_TYPE_EMBEDDED)))
{
log_it(L_ERROR, "Can't create rpc node list cluster");
return;
}
dap_stream_node_addr_t *l_authorized_nodes = NULL;
uint16_t l_authorized_nodes_count = 0;
dap_config_stream_addrs_parse(a_cfg, "rpc", "authorized_nodes_addrs", &l_authorized_nodes, &l_authorized_nodes_count);
for (uint16_t i = 0; i < l_authorized_nodes_count; ++i)
dap_global_db_cluster_member_add(s_rpc_node_list_cluster, l_authorized_nodes + i, DAP_GDB_MEMBER_ROLE_ROOT);
DAP_DELETE(l_authorized_nodes);
}
} }
/** /**
...@@ -104,7 +117,7 @@ dap_string_t *dap_chain_node_rpc_states_info_read(dap_stream_node_addr_t a_addr) ...@@ -104,7 +117,7 @@ dap_string_t *dap_chain_node_rpc_states_info_read(dap_stream_node_addr_t a_addr)
size_t l_data_size = 0; size_t l_data_size = 0;
dap_string_t *l_ret = dap_string_new(""); dap_string_t *l_ret = dap_string_new("");
const char *l_node_addr_str = dap_stream_node_addr_to_str_static(a_addr.uint64 ? a_addr : g_node_addr); const char *l_node_addr_str = dap_stream_node_addr_to_str_static(a_addr.uint64 ? a_addr : g_node_addr);
dap_chain_node_rpc_states_info_t *l_node_info = (dap_chain_node_rpc_states_info_t *)dap_global_db_get_sync(s_rpc_states_group, l_node_addr_str, &l_data_size, NULL, &l_timestamp); dap_chain_node_rpc_states_info_t *l_node_info = (dap_chain_node_rpc_states_info_t *)dap_global_db_get_sync(s_rpc_server_states_group, l_node_addr_str, &l_data_size, NULL, &l_timestamp);
if (!l_node_info) { if (!l_node_info) {
log_it(L_ERROR, "Can't find state of rpc node %s", l_node_addr_str); log_it(L_ERROR, "Can't find state of rpc node %s", l_node_addr_str);
dap_string_append_printf(l_ret, "Can't find state of %s rpc node", l_node_addr_str); dap_string_append_printf(l_ret, "Can't find state of %s rpc node", l_node_addr_str);
...@@ -123,7 +136,7 @@ dap_string_t *dap_chain_node_rpc_states_info_read(dap_stream_node_addr_t a_addr) ...@@ -123,7 +136,7 @@ dap_string_t *dap_chain_node_rpc_states_info_read(dap_stream_node_addr_t a_addr)
bool dap_chain_node_rpc_is_my_node_authorized() bool dap_chain_node_rpc_is_my_node_authorized()
{ {
return dap_cluster_member_find_role(s_rpc_list_cluster->role_cluster, &g_node_addr) == DAP_GDB_MEMBER_ROLE_ROOT; return dap_cluster_member_find_role(s_rpc_node_list_cluster->role_cluster, &g_node_addr) == DAP_GDB_MEMBER_ROLE_ROOT;
} }
/** /**
...@@ -135,7 +148,7 @@ int dap_chain_node_rpc_info_save(dap_chain_node_info_t *a_node_info) ...@@ -135,7 +148,7 @@ int dap_chain_node_rpc_info_save(dap_chain_node_info_t *a_node_info)
{ {
return !a_node_info || !a_node_info->address.uint64 return !a_node_info || !a_node_info->address.uint64
? log_it(L_ERROR,"Can't save node rpc info, %s", a_node_info ? "null arg" : "zero address"), -1 ? log_it(L_ERROR,"Can't save node rpc info, %s", a_node_info ? "null arg" : "zero address"), -1
: dap_global_db_set_sync( s_rpc_list_group, : dap_global_db_set_sync( s_rpc_node_list_group,
dap_stream_node_addr_to_str_static(a_node_info->address), dap_stream_node_addr_to_str_static(a_node_info->address),
a_node_info, a_node_info,
dap_chain_node_info_get_size(a_node_info), false ); dap_chain_node_info_get_size(a_node_info), false );
...@@ -149,7 +162,7 @@ dap_string_t *dap_chain_node_rpc_list() ...@@ -149,7 +162,7 @@ dap_string_t *dap_chain_node_rpc_list()
{ {
dap_string_t *l_ret = dap_string_new("RPC node list:\n"); dap_string_t *l_ret = dap_string_new("RPC node list:\n");
size_t l_nodes_count = 0; size_t l_nodes_count = 0;
dap_global_db_obj_t *l_objs = dap_global_db_get_all_sync(s_rpc_list_group, &l_nodes_count); dap_global_db_obj_t *l_objs = dap_global_db_get_all_sync(s_rpc_node_list_group, &l_nodes_count);
if(!l_nodes_count || !l_objs) { if(!l_nodes_count || !l_objs) {
dap_string_append_printf(l_ret, "No records\n"); dap_string_append_printf(l_ret, "No records\n");
...@@ -176,4 +189,29 @@ dap_string_t *dap_chain_node_rpc_list() ...@@ -176,4 +189,29 @@ dap_string_t *dap_chain_node_rpc_list()
} }
dap_global_db_objs_delete(l_objs, l_nodes_count); dap_global_db_objs_delete(l_objs, l_nodes_count);
return l_ret; return l_ret;
}
/**
* @brief get states rpc info about current
* @param a_arg - pointer to callback arg
*/
dap_list_t *dap_chain_node_rpc_get_sorted_list(size_t *a_count)
{
size_t l_count = 0;
dap_list_t *l_ret = NULL;
dap_global_db_obj_t *l_nodes_obj = dap_global_db_get_all_sync(s_rpc_node_list_group, &l_count);
for (size_t i = 0; i < l_count; ++i) {
size_t l_data_size = 0;
dap_chain_node_info_t *l_node_info_curr = dap_global_db_get_sync(s_rpc_server_states_group, (l_nodes_obj + i)->key, &l_data_size, NULL, NULL);
if (!l_node_info_curr) {
log_it(L_ERROR, "Can't find info about rpc node %s", (l_nodes_obj + i)->key);
continue;
}
if (l_data_size != sizeof(l_node_info_curr)) {
log_it(L_ERROR, "Error data size in rpc node state, get %zu expected %zu", l_data_size, sizeof(l_node_info_curr));
continue;
}
l_ret = dap_list_insert_sorted(l_ret, (void *)l_node_info_curr, s_rpc_node_cmp);
}
return l_ret;
} }
\ No newline at end of file
...@@ -18,6 +18,18 @@ ...@@ -18,6 +18,18 @@
You should have received a copy of the GNU General Public License You should have received a copy of the GNU General Public License
along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. along with any DAP based project. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <sys/sysinfo.h>
#include "dap_chain_node.h"
#include "dap_config.h"
typedef struct dap_chain_node_rpc_states_info {
uint32_t version;
dap_chain_node_addr_t address;
uint32_t location;
uint32_t links_count;
uint32_t cli_thread_count;
struct sysinfo sysinfo;
} DAP_ALIGN_PACKED dap_chain_node_rpc_states_info_t;
void dap_chain_node_rpc_init(dap_config_t *a_cfg); void dap_chain_node_rpc_init(dap_config_t *a_cfg);
bool dap_chain_node_rpc_is_my_node_authorized(); bool dap_chain_node_rpc_is_my_node_authorized();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment