From a9f7f1d92b1655ba6132352ed68f251634334b1d Mon Sep 17 00:00:00 2001 From: "pavel.uhanov" <pavel.uhanov@demlabs.net> Date: Thu, 13 Mar 2025 10:04:57 +0300 Subject: [PATCH] [*] add DAP_BALANCER_RPC_URI_HASH and adopt balancer --- modules/net/dap_chain_net_balancer.c | 111 ++++++++++-- modules/net/dap_chain_node_cli.c | 5 + modules/net/dap_chain_node_rpc.c | 178 +++++++++++++++++++ modules/net/include/dap_chain_net_balancer.h | 3 - modules/net/include/dap_chain_node_rpc.h | 22 +++ 5 files changed, 300 insertions(+), 19 deletions(-) create mode 100644 modules/net/dap_chain_node_rpc.c create mode 100644 modules/net/include/dap_chain_node_rpc.h diff --git a/modules/net/dap_chain_net_balancer.c b/modules/net/dap_chain_net_balancer.c index c9235a873b..3b418c7cfc 100644 --- a/modules/net/dap_chain_net_balancer.c +++ b/modules/net/dap_chain_net_balancer.c @@ -36,7 +36,14 @@ along with any CellFrame SDK based project. If not, see <http://www.gnu.org/lic #define LOG_TAG "dap_chain_net_balancer" -#define DAP_CHAIN_NET_BALANCER_REQUEST_DELAY 20 // sec +#define DAP_BALANCER_PROTOCOL_VERSION 2 +#define DAP_BALANCER_REQUEST_DELAY 20 // sec + +typedef enum dap_balancer_uri_type { + DAP_BALANCER_NODE_URI_HASH = 0, + DAP_BALANCER_RPC_URI_HASH, + DAP_BALANCER_URI_HASH_TOTAL +} dap_balancer_uri_type_t; typedef struct dap_balancer_request_info { dap_chain_net_id_t net_id; @@ -48,6 +55,21 @@ static_assert(sizeof(dap_chain_net_links_t) + sizeof(dap_chain_node_info_old_t) static const size_t s_max_links_response_count = (DAP_BALANCER_MAX_REPLY_SIZE - sizeof(dap_chain_net_links_t)) / sizeof(dap_chain_node_info_old_t); static dap_balancer_request_info_t* s_request_info_items = NULL; +static bool s_balancer_node = false; +static bool s_balancer_rpc = false; + +const char *s_uri[] = { + "f0intlt4eyl03htogu", + "GxDsAUEiAKJtAaw55CFbRh" // md5 + base58 encode +}; + +DAP_STATIC_INLINE dap_balancer_uri_type_t s_get_uri_type(const char *a_uri) +{ + dap_balancer_uri_type_t l_ret = 0; + for ( ; l_ret < DAP_BALANCER_URI_HASH_TOTAL && strcmp(a_uri, s_uri[l_ret]); ++l_ret); + return l_ret; +} + /** * @brief forming json file with balancer info: class networkName nodeAddress hostAddress hostPort * @param a_net - responce net @@ -368,15 +390,11 @@ int dap_chain_net_balancer_handshake(dap_chain_node_info_t *a_node_info, dap_cha * @param a_http_simple - http request * @param a_arg - request arg */ -void dap_chain_net_balancer_http_issue_link(dap_http_simple_t *a_http_simple, void *a_arg) +void s_http_node_issue_link(dap_http_simple_t *a_http_simple, http_status_code_t *a_return_code) { - log_it(L_DEBUG,"Proc enc http request from %s", a_http_simple->es_hostaddr); - http_status_code_t *l_return_code = (http_status_code_t *)a_arg; - - if (strcmp(a_http_simple->http_client->url_path, DAP_BALANCER_URI_HASH)) { - log_it(L_ERROR, "Wrong path '%s' in the request to dap_chain_net_balancer module", - a_http_simple->http_client->url_path); - *l_return_code = Http_Status_BadRequest; + if (!s_balancer_node) { + log_it(L_ERROR, "Balancer validator mode is off"); + *a_return_code = Http_Status_MethodNotAllowed; return; } int l_protocol_version = 0; @@ -387,13 +405,13 @@ void dap_chain_net_balancer_http_issue_link(dap_http_simple_t *a_http_simple, vo &l_protocol_version, &l_issue_method, &l_links_need); if (l_protocol_version > DAP_BALANCER_PROTOCOL_VERSION || l_protocol_version < 1 || l_issue_method != 'r') { log_it(L_ERROR, "Unsupported protocol version/method in the request to dap_chain_net_balancer module"); - *l_return_code = Http_Status_MethodNotAllowed; + *a_return_code = Http_Status_MethodNotAllowed; return; } char *l_net_str = strstr(a_http_simple->http_client->in_query_string, l_net_token); if (!l_net_str) { log_it(L_ERROR, "Net name token not found in the request to dap_chain_net_balancer module"); - *l_return_code = Http_Status_NotFound; + *a_return_code = Http_Status_NotFound; return; } l_net_str += sizeof(l_net_token) - 1; @@ -403,7 +421,7 @@ void dap_chain_net_balancer_http_issue_link(dap_http_simple_t *a_http_simple, vo l_ignored_str = strstr(a_http_simple->http_client->in_query_string, l_ignored_token); if (!l_ignored_str) { log_it(L_ERROR, "Net ignored token not found in the request to dap_chain_net_balancer module"); - *l_return_code = Http_Status_NotFound; + *a_return_code = Http_Status_NotFound; return; } *(l_ignored_str - 1) = 0; // set 0 terminator to split string @@ -413,10 +431,10 @@ void dap_chain_net_balancer_http_issue_link(dap_http_simple_t *a_http_simple, vo dap_chain_net_links_t *l_link_full_node_list = s_balancer_issue_link(l_net_str, l_links_need, l_protocol_version, l_ignored_str); if (!l_link_full_node_list) { log_it(L_DEBUG, "Can't issue link for network %s, no acceptable links found", l_net_str); - *l_return_code = Http_Status_NoContent; + *a_return_code = Http_Status_NoContent; return; } - *l_return_code = Http_Status_OK; + *a_return_code = Http_Status_OK; size_t l_data_send_size = sizeof(dap_chain_net_links_t); if (l_protocol_version == 1) l_data_send_size += sizeof(dap_chain_node_info_old_t) * l_link_full_node_list->count_node; @@ -426,6 +444,67 @@ void dap_chain_net_balancer_http_issue_link(dap_http_simple_t *a_http_simple, vo DAP_DELETE(l_link_full_node_list); } + +/** + * @brief issue to http balancer request + * @param a_http_simple - http request + * @param a_arg - request arg + */ +void s_http_rpc_issue_link(dap_http_simple_t *a_http_simple, http_status_code_t *a_return_code) +{ + if (!s_balancer_node) { + log_it(L_ERROR, "Balancer rpc mode is off"); + *a_return_code = Http_Status_MethodNotAllowed; + return; + } + int l_protocol_version = 0; + char l_issue_method = 0; + const char l_net_token[] = "net="; + sscanf(a_http_simple->http_client->in_query_string, "version=%d,method=%c", + &l_protocol_version, &l_issue_method); + if (l_protocol_version > DAP_BALANCER_PROTOCOL_VERSION || l_protocol_version < 1 || l_issue_method != 'r') { + log_it(L_ERROR, "Unsupported protocol version/method in the request to dap_chain_net_balancer module"); + *a_return_code = Http_Status_MethodNotAllowed; + return; + } + char *l_net_str = strstr(a_http_simple->http_client->in_query_string, l_net_token); + if (!l_net_str) { + log_it(L_ERROR, "Net name token not found in the request to dap_chain_net_balancer module"); + *a_return_code = Http_Status_NotFound; + return; + } + l_net_str += sizeof(l_net_token) - 1; + + // dap_http_simple_reply(a_http_simple, l_link_full_node_list, l_data_send_size); + // DAP_DELETE(l_link_full_node_list); +} + +/** + * @brief issue to http balancer request + * @param a_http_simple - http request + * @param a_arg - request arg + */ +void dap_chain_net_balancer_http_issue_link(dap_http_simple_t *a_http_simple, void *a_arg) +{ + dap_return_if_pass(!a_http_simple || !a_arg); + log_it(L_DEBUG,"Proc enc http request from %s", a_http_simple->es_hostaddr); + http_status_code_t *l_return_code = (http_status_code_t *)a_arg; + + switch (s_get_uri_type(a_http_simple->http_client->url_path)) { + case DAP_BALANCER_NODE_URI_HASH: + s_http_node_issue_link(a_http_simple, l_return_code); + break; + case DAP_BALANCER_RPC_URI_HASH: + s_http_rpc_issue_link(a_http_simple, l_return_code); + break; + default: + log_it(L_ERROR, "Wrong path '%s' in the request to dap_chain_net_balancer module", a_http_simple->http_client->url_path); + *l_return_code = Http_Status_BadRequest; + break; + } + return; +} + /** * @brief issue to dns balancer request * @param a_net_name - net name @@ -467,7 +546,7 @@ void dap_chain_net_balancer_request(void *a_arg) l_item->net_id = l_arg->net->pub.id; HASH_ADD(hh, s_request_info_items, net_id, sizeof(l_item->net_id), l_item); } - if (l_item->request_time + DAP_CHAIN_NET_BALANCER_REQUEST_DELAY > dap_time_now()) { + if (l_item->request_time + DAP_BALANCER_REQUEST_DELAY > dap_time_now()) { log_it(L_DEBUG, "Who understands life, he is in no hurry. Dear %s, please wait few seconds", l_arg->net->pub.name); DAP_DELETE(a_arg); return; @@ -509,7 +588,7 @@ void dap_chain_net_balancer_request(void *a_arg) const char *l_net_name = l_arg->net->pub.name, *l_bal_type = dap_chain_net_balancer_type_to_str(l_arg->type); char *l_request = dap_strdup_printf("%s/%s?version=%d,method=r,needlink=%d,net=%s,ignored=%s", DAP_UPLINK_PATH_BALANCER, - DAP_BALANCER_URI_HASH, + s_uri[DAP_BALANCER_NODE_URI_HASH], DAP_BALANCER_PROTOCOL_VERSION, (int)l_required_links_count, l_arg->net->pub.name, diff --git a/modules/net/dap_chain_node_cli.c b/modules/net/dap_chain_node_cli.c index 5faac29320..c6e4e6547a 100644 --- a/modules/net/dap_chain_node_cli.c +++ b/modules/net/dap_chain_node_cli.c @@ -42,6 +42,7 @@ #include "dap_chain_node_cli_cmd_tx.h" #include "dap_cli_server.h" #include "dap_chain_node_cli.h" +#include "dap_chain_node_rpc.h" #include "dap_notify_srv.h" #define LOG_TAG "chain_node_cli" @@ -54,6 +55,7 @@ static bool s_debug_cli = false; * init commands description * return 0 if OK, -1 error * @param g_config + * @param a_server_enabled - if server and rpc enabled will be wrn inform * @return int */ int dap_chain_node_cli_init(dap_config_t * g_config) @@ -63,6 +65,9 @@ int dap_chain_node_cli_init(dap_config_t * g_config) s_debug_cli = dap_config_get_item_bool_default(g_config, "cli-server", "debug-cli", false); if ( dap_cli_server_init(s_debug_cli, "cli-server") ) return log_it(L_ERROR, "Can't init CLI server!"), -1; + if (dap_config_get_item_bool_default(g_config, "cli-server", "rpc", false)) { + dap_chain_node_rpc_init(); + } dap_cli_server_cmd_add("global_db", com_global_db, "Work with global database", "global_db flush\n" diff --git a/modules/net/dap_chain_node_rpc.c b/modules/net/dap_chain_node_rpc.c new file mode 100644 index 0000000000..b64c02c2c3 --- /dev/null +++ b/modules/net/dap_chain_node_rpc.c @@ -0,0 +1,178 @@ +/* + * Authors: + * Pavel Uhanov <pavel.uhanov@demlabs.net> + * DeM Labs Inc. https://demlabs.net + + This file is part of DAP (Distributed Applications Platform) the open source project + + DAP (Distributed Applications Platform) is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + DAP is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. + */ + +#include <stdlib.h> +#include <stdio.h> +#include <time.h> +#include <stdlib.h> +#include <stddef.h> +#include <stdint.h> +#include <string.h> + +#ifdef WIN32 +#include <winsock2.h> +#include <windows.h> +#include <mswsock.h> +#include <ws2tcpip.h> +#include <io.h> +#include <pthread.h> +#else +#include <sys/socket.h> +#include <netinet/in.h> +#endif + +#include "dap_hash.h" +#include "dap_chain_net.h" +#include "dap_global_db.h" +#include "dap_chain_node.h" +#include "dap_chain_cs_esbocs.h" +#include "dap_chain_ledger.h" + +#define LOG_TAG "dap_chain_node_rpc" +#define DAP_CHAIN_NODE_RPC_STATES_INFO_CURRENT_VERSION 1 +typedef struct dap_chain_node_rpc_states_info { + uint32_t version; + dap_chain_node_addr_t address; + int32_t net_state; + uint32_t cli_thread_count; +} DAP_ALIGN_PACKED dap_chain_node_rpc_states_info_t; + + +static const uint64_t s_timer_update_states_info = 10 /*sec*/ * 1000; +static const char s_states_group[] = ".rpc.states"; + +/** + * @brief get states info about current + * @param a_arg - pointer to callback arg + */ +static void s_update_node_rpc_states_info(UNUSED_ARG void *a_arg) +{ + for (dap_chain_net_t *l_net = dap_chain_net_iter_start(); l_net; l_net = dap_chain_net_iter_next(l_net)) { + if(dap_chain_net_get_state(l_net) != NET_STATE_OFFLINE) { + size_t + l_uplinks_count = 0, + l_downlinks_count = 0, + l_info_size = 0; + dap_chain_node_rpc_states_info_t *l_info = DAP_NEW_Z_RET_IF_FAIL(dap_chain_node_rpc_states_info_t); + l_info->version = DAP_CHAIN_NODE_RPC_STATES_INFO_CURRENT_VERSION; + l_info->address.uint64 = g_node_addr.uint64; + l_info->net_state = dap_chain_net_get_state(l_net); + + char *l_gdb_group = dap_strdup_printf("%s%s", l_net->pub.gdb_groups_prefix, s_states_group); + const char *l_node_addr_str = dap_stream_node_addr_to_str_static(l_info->address); + dap_global_db_set_sync(l_gdb_group, l_node_addr_str, l_info, l_info_size, false); + DAP_DEL_MULTY(l_info, l_gdb_group); + } + } +} + +static void s_states_info_to_str(dap_chain_net_t *a_net, const char *a_node_addr_str, dap_string_t *l_info_str) +{ +// sanity check + dap_return_if_pass(!a_net || !a_node_addr_str || !l_info_str); +// func work + // dap_nanotime_t l_timestamp = 0; + // size_t l_data_size = 0; + // char *l_gdb_group = dap_strdup_printf("%s%s", a_net->pub.gdb_groups_prefix, s_states_group); + // byte_t *l_node_info_data = dap_global_db_get_sync(l_gdb_group, a_node_addr_str, &l_data_size, NULL, &l_timestamp); + // DAP_DELETE(l_gdb_group); + // dap_chain_node_rpc_states_info_t *l_node_info = NULL; + // if (!l_node_info_data) + // return log_it(L_ERROR, "Can't find state of node %s in net %s", a_node_addr_str, a_net->pub.name); + // if ( (l_data_size - sizeof(dap_chain_node_rpc_states_info_t)) % sizeof(dap_chain_node_addr_t) ) { + // if ( (l_data_size - sizeof(dap_chain_node_net_states_info_v1_t)) % sizeof(dap_chain_node_addr_t) ) + // return DAP_DELETE(l_node_info_data), log_it(L_ERROR, "Irrelevant size of node %s info", a_node_addr_str); + // dap_chain_node_net_states_info_v1_t *l_info_old = (dap_chain_node_net_states_info_v1_t*)l_node_info_data; + // l_node_info = DAP_NEW_Z_SIZE( dap_chain_node_rpc_states_info_t, sizeof(dap_chain_node_rpc_states_info_t) + // + (l_info_old->uplinks_count + l_info_old->downlinks_count) * sizeof(dap_chain_node_addr_t) ); + // l_node_info->version_info = 1; + // memcpy( (byte_t*)l_node_info + node_info_v1_shift, l_info_old, l_data_size ); + // DAP_DELETE(l_node_info_data); + // } else + // l_node_info = (dap_chain_node_rpc_states_info_t*)l_node_info_data; + // char l_ts[80] = { '\0' }; + // dap_nanotime_to_str_rfc822(l_ts, sizeof(l_ts), l_timestamp); + // dap_string_append_printf(l_info_str, + // "Record timestamp: %s\nRecord version: %u\nNode version: %s\nNode addr: %s\nNet: %s\nRole: %s\n" + // "Events count: %"DAP_UINT64_FORMAT_U"\nAtoms count: %"DAP_UINT64_FORMAT_U"\nUplinks count: %u\nDownlinks count: %u\n", + // l_ts, l_node_info->version_info, l_node_info->version_node, a_node_addr_str, a_net->pub.name, + // dap_chain_node_role_to_str(l_node_info->role), l_node_info->info_v1.events_count, l_node_info->info_v1.atoms_count, + // l_node_info->info_v1.uplinks_count, l_node_info->info_v1.downlinks_count); + // size_t l_max_links = dap_max(l_node_info->info_v1.uplinks_count, l_node_info->info_v1.downlinks_count); + // if(l_max_links) { + // dap_string_append_printf(l_info_str, + // "-----------------------------------------------------------------\n" + // "|\tUplinks node addrs\t|\tDownlinks node addrs\t|\n" + // "-----------------------------------------------------------------\n"); + // } + // for (size_t i = 0; i < l_max_links; ++i) { + // char *l_upnlink_str = i < l_node_info->info_v1.uplinks_count + // ? dap_stream_node_addr_to_str(l_node_info->info_v1.links_addrs[i], false) + // : dap_strdup("\t\t"); + // char *l_downlink_str = i < l_node_info->info_v1.downlinks_count + // ? dap_stream_node_addr_to_str(l_node_info->info_v1.links_addrs[i + l_node_info->info_v1.uplinks_count], false) + // : dap_strdup("\t\t"); + // dap_string_append_printf(l_info_str, "|\t%s\t|\t%s\t|\n", l_upnlink_str, l_downlink_str); + // DAP_DEL_MULTY(l_upnlink_str, l_downlink_str); + // } + // dap_string_append_printf(l_info_str, "-----------------------------------------------------------------\n"); + // DAP_DELETE(l_node_info); +} + + +void dap_chain_node_rpc_init() +{ + if (dap_proc_thread_timer_add(NULL, s_update_node_rpc_states_info, NULL, s_timer_update_states_info)) + log_it(L_ERROR, "Can't activate timer on node states update"); +} + +// /** +// * @brief get states info about current +// * @param a_arg - pointer to callback arg +// */ +// dap_string_t *dap_chain_node_states_info_read(dap_chain_net_t *a_net, dap_stream_node_addr_t a_addr) +// { +// dap_string_t *l_ret = dap_string_new(""); +// const char *l_node_addr_str = dap_stream_node_addr_to_str_static(a_addr.uint64 ? a_addr : g_node_addr); +// if(!a_net) { +// for (dap_chain_net_t *l_net = dap_chain_net_iter_start(); l_net; l_net = dap_chain_net_iter_next(l_net)) { +// s_states_info_to_str(l_net, l_node_addr_str, l_ret); +// } +// } else { +// s_states_info_to_str(a_net, l_node_addr_str, l_ret); +// } +// if (!l_ret->len) { +// const char *l_prefix = !a_addr.uint64 ? "my" : a_addr.uint64 == g_node_addr.uint64 ? "my" : ""; +// if (a_net) { +// dap_string_append_printf(l_ret, "Can't find rpc state of %s node %s in net %s", l_prefix, l_node_addr_str, a_net->pub.name); +// } else { +// dap_string_append_printf(l_ret, "Can't find rpc state of %s node %s in nets ", l_prefix, l_node_addr_str); +// dap_chain_net_t *l_current_net = NULL, *l_next_net = dap_chain_net_iter_start(); +// while(l_next_net) { +// l_current_net = l_next_net; +// l_next_net = dap_chain_net_iter_next(l_next_net); +// dap_string_append_printf(l_ret, l_next_net ? "%s, " : "%s", l_current_net->pub.name); +// } +// } +// } +// return l_ret; +// } + diff --git a/modules/net/include/dap_chain_net_balancer.h b/modules/net/include/dap_chain_net_balancer.h index 3390633718..fc51c1bb13 100644 --- a/modules/net/include/dap_chain_net_balancer.h +++ b/modules/net/include/dap_chain_net_balancer.h @@ -26,11 +26,8 @@ along with any CellFrame SDK based project. If not, see <http://www.gnu.org/lic #include "dap_chain_node.h" #include "dap_http_simple.h" -#define DAP_BALANCER_URI_HASH "f0intlt4eyl03htogu" -#define DAP_BALANCER_PROTOCOL_VERSION 2 #define DAP_BALANCER_MAX_REPLY_SIZE 2048 - typedef struct dap_balancer_request_info dap_balancer_request_info_t; typedef struct dap_chain_net_links { diff --git a/modules/net/include/dap_chain_node_rpc.h b/modules/net/include/dap_chain_node_rpc.h new file mode 100644 index 0000000000..23d6e368c6 --- /dev/null +++ b/modules/net/include/dap_chain_node_rpc.h @@ -0,0 +1,22 @@ +/* + * Authors: + * Pavel Uhanov <pavel.uhanov@demlabs.net> + * DeM Labs Inc. https://demlabs.net + + This file is part of DAP (Distributed Applications Platform) the open source project + + DAP (Distributed Applications Platform) is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + DAP is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. + */ + + void dap_chain_node_rpc_init(); \ No newline at end of file -- GitLab