From 262ab5370d26332633d829607e8d002a4a0948d7 Mon Sep 17 00:00:00 2001 From: cellframe <roman.khlopkov@demlabs.net> Date: Mon, 14 Nov 2022 13:04:13 +0300 Subject: [PATCH] [*] Potring changes from features-7149 --- dap-sdk/net/client/dap_client_http.c | 75 +- dap-sdk/net/client/dap_client_pvt.c | 12 +- dap-sdk/net/client/include/dap_client.h | 1 + dap-sdk/net/client/include/dap_client_http.h | 11 +- modules/net/dap_chain_net.c | 756 +++++++++--------- modules/net/dap_chain_node.c | 50 -- modules/net/dap_chain_node_client.c | 7 +- modules/net/dap_chain_node_dns_client.c | 85 +- modules/net/include/dap_chain_net.h | 1 - modules/net/include/dap_chain_node.h | 27 - modules/net/include/dap_chain_node_client.h | 2 +- .../net/include/dap_chain_node_dns_client.h | 6 +- 12 files changed, 451 insertions(+), 582 deletions(-) diff --git a/dap-sdk/net/client/dap_client_http.c b/dap-sdk/net/client/dap_client_http.c index 5e217177db..7ec0a4ee90 100644 --- a/dap-sdk/net/client/dap_client_http.c +++ b/dap-sdk/net/client/dap_client_http.c @@ -49,9 +49,8 @@ typedef struct dap_http_client_internal { dap_client_http_callback_data_t response_callback; dap_client_http_callback_error_t error_callback; - dap_client_http_callback_error_ext_t error_ext_callback; + void *callbacks_arg; - void *obj; // dap_client_pvt_t *client_pvt; byte_t *request; size_t request_size; size_t request_sent_size; @@ -195,7 +194,7 @@ static bool s_timer_timeout_after_connected_check(void * a_arg) log_it(L_WARNING, "Timeout for reading after connect for request http://%s:%u/%s, possible uplink is on heavy load or DPI between you", l_http_pvt->uplink_addr, l_http_pvt->uplink_port, l_http_pvt->path); if(l_http_pvt->error_callback) { - l_http_pvt->error_callback(ETIMEDOUT, l_http_pvt->obj); + l_http_pvt->error_callback(ETIMEDOUT, l_http_pvt->callbacks_arg); l_http_pvt->were_callbacks_called = true; } l_http_pvt->is_closed_by_timeout = true; @@ -231,7 +230,7 @@ static bool s_timer_timeout_check(void * a_arg) log_it(L_WARNING,"Connecting timeout for request http://%s:%u/%s, possible network problems or host is down", l_http_pvt->uplink_addr, l_http_pvt->uplink_port, l_http_pvt->path); if(l_http_pvt->error_callback) { - l_http_pvt->error_callback(ETIMEDOUT, l_http_pvt->obj); + l_http_pvt->error_callback(ETIMEDOUT, l_http_pvt->callbacks_arg); l_http_pvt->were_callbacks_called = true; } l_http_pvt->is_closed_by_timeout = true; @@ -310,7 +309,7 @@ static void s_http_read(dap_events_socket_t * a_es, void * arg) l_http_pvt->response_callback( l_http_pvt->response + l_http_pvt->header_length, l_http_pvt->content_length, //l_client_internal->response_size - l_client_internal->header_size, - l_http_pvt->obj); + l_http_pvt->callbacks_arg); l_http_pvt->response_size -= l_http_pvt->header_length; l_http_pvt->response_size -= l_http_pvt->content_length; l_http_pvt->header_length = 0; @@ -356,7 +355,7 @@ static void s_http_error(dap_events_socket_t * a_es, int a_errno) return; } if(l_client_http_internal->error_callback) - l_client_http_internal->error_callback(a_errno, l_client_http_internal->obj); + l_client_http_internal->error_callback(a_errno, l_client_http_internal->callbacks_arg); l_client_http_internal->were_callbacks_called = true; @@ -382,26 +381,26 @@ static void s_es_delete(dap_events_socket_t * a_es, void * a_arg) if (l_client_http_internal->content_length){ log_it(L_WARNING, "Remote server disconnected before he sends all data: %zd data in buffer when expected %zd", l_client_http_internal->response_size, l_client_http_internal->content_length); - l_client_http_internal->error_callback(-666, l_client_http_internal->obj); // -666 means remote server disconnected before he sends all + l_client_http_internal->error_callback(-666, l_client_http_internal->callbacks_arg); // -666 means remote server disconnected before he sends all }else if (l_response_size){ log_it(L_INFO, "Remote server replied without no content length but we have the response %zd bytes size", l_response_size); - //l_client_http_internal->error_callback(-10 , l_client_http_internal->obj); + //l_client_http_internal->error_callback(-10 , l_client_http_internal->callbacks_arg); if(l_client_http_internal->response_callback) l_client_http_internal->response_callback( l_client_http_internal->response + l_client_http_internal->header_length, l_response_size, - l_client_http_internal->obj); + l_client_http_internal->callbacks_arg); l_client_http_internal->were_callbacks_called = true; }else if (l_client_http_internal->response_size){ log_it(L_INFO, "Remote server disconnected with reply. Body is empty, only headers are in"); - l_client_http_internal->error_callback(-667 , l_client_http_internal->obj); // -667 means remote server replied only with headers + l_client_http_internal->error_callback(-667 , l_client_http_internal->callbacks_arg); // -667 means remote server replied only with headers l_client_http_internal->were_callbacks_called = true; }else{ log_it(L_WARNING, "Remote server disconnected without reply"); - l_client_http_internal->error_callback(-668, l_client_http_internal->obj); // -668 means remote server disconnected before he sends anythinh + l_client_http_internal->error_callback(-668, l_client_http_internal->callbacks_arg); // -668 means remote server disconnected before he sends anythinh l_client_http_internal->were_callbacks_called = true; } } @@ -458,14 +457,14 @@ static void s_client_http_delete(dap_client_http_pvt_t * a_http_pvt) * @param a_cookie * @param a_response_callback * @param a_error_callback - * @param a_obj - * @param a_custom - * @param a_custom_count + * @param a_callbacks_arg + * @param a_custom_headers + * @param a_over_ssl */ -void* dap_client_http_request_custom(dap_worker_t * a_worker, const char *a_uplink_addr, uint16_t a_uplink_port, const char *a_method, +int dap_client_http_request_custom(dap_worker_t * a_worker, const char *a_uplink_addr, uint16_t a_uplink_port, const char *a_method, const char *a_request_content_type, const char * a_path, const void *a_request, size_t a_request_size, char *a_cookie, dap_client_http_callback_data_t a_response_callback, dap_client_http_callback_error_t a_error_callback, - void *a_obj, char *a_custom, bool a_over_ssl) + void *a_callbacks_arg, char *a_custom_headers, bool a_over_ssl) { //log_it(L_DEBUG, "HTTP request on url '%s:%d'", a_uplink_addr, a_uplink_port); @@ -489,9 +488,9 @@ void* dap_client_http_request_custom(dap_worker_t * a_worker, const char *a_upli if (l_socket == -1) { log_it(L_ERROR, "Error %d with socket create", errno); if(a_error_callback) - a_error_callback(errno,a_obj); + a_error_callback(errno, a_callbacks_arg); #endif - return NULL; + return -1; } // Get socket flags #if defined DAP_OS_WINDOWS @@ -503,17 +502,17 @@ void* dap_client_http_request_custom(dap_worker_t * a_worker, const char *a_upli if (l_socket_flags == -1){ log_it(L_ERROR, "Error %d can't get socket flags", errno); if(a_error_callback) - a_error_callback(errno,a_obj); + a_error_callback(errno, a_callbacks_arg); - return NULL; + return -2; } // Make it non-block if (fcntl( l_socket, F_SETFL,l_socket_flags| O_NONBLOCK) == -1){ log_it(L_ERROR, "Error %d can't get socket flags", errno); if(a_error_callback) - a_error_callback(errno,a_obj); + a_error_callback(errno, a_callbacks_arg); - return NULL; + return -3; } #endif // set socket param @@ -530,21 +529,21 @@ void* dap_client_http_request_custom(dap_worker_t * a_worker, const char *a_upli l_http_pvt->error_callback = a_error_callback; l_http_pvt->response_callback = a_response_callback; //l_client_http_internal->socket = l_socket; - l_http_pvt->obj = a_obj; + l_http_pvt->callbacks_arg = a_callbacks_arg; l_http_pvt->method = dap_strdup(a_method); l_http_pvt->path = dap_strdup(a_path); l_http_pvt->request_content_type = dap_strdup(a_request_content_type); l_http_pvt->request = DAP_NEW_Z_SIZE(byte_t, a_request_size+1); if (! l_http_pvt->request) - return NULL; + return -4; l_http_pvt->request_size = a_request_size; memcpy(l_http_pvt->request, a_request, a_request_size); l_http_pvt->uplink_addr = dap_strdup(a_uplink_addr); l_http_pvt->uplink_port = a_uplink_port; l_http_pvt->cookie = a_cookie; - l_http_pvt->request_custom_headers = dap_strdup(a_custom); + l_http_pvt->request_custom_headers = dap_strdup(a_custom_headers); l_http_pvt->response_size_max = DAP_CLIENT_HTTP_RESPONSE_SIZE_MAX; l_http_pvt->response = (uint8_t*) DAP_NEW_Z_SIZE(uint8_t, DAP_CLIENT_HTTP_RESPONSE_SIZE_MAX); @@ -562,9 +561,9 @@ void* dap_client_http_request_custom(dap_worker_t * a_worker, const char *a_upli l_ev_socket->_inheritor = NULL; dap_events_socket_delete_unsafe( l_ev_socket, true); if(a_error_callback) - a_error_callback(errno,a_obj); + a_error_callback(errno, a_callbacks_arg); - return NULL; + return -5; } } l_ev_socket->remote_addr_str = dap_strdup(a_uplink_addr); @@ -590,7 +589,7 @@ void* dap_client_http_request_custom(dap_worker_t * a_worker, const char *a_upli s_http_ssl_connected(l_ev_socket); #endif } - return l_http_pvt; + return 0; } #ifdef DAP_OS_WINDOWS else if(l_err == SOCKET_ERROR) { @@ -629,7 +628,7 @@ void* dap_client_http_request_custom(dap_worker_t * a_worker, const char *a_upli l_http_pvt->worker->id, *l_ev_uuid_ptr); DAP_DEL_Z(l_ev_uuid_ptr); } - return l_http_pvt; + return 0; } else{ char l_errbuf[128]; @@ -640,12 +639,12 @@ void* dap_client_http_request_custom(dap_worker_t * a_worker, const char *a_upli l_ev_socket->_inheritor = NULL; dap_events_socket_delete_unsafe( l_ev_socket, true); if(a_error_callback) - a_error_callback(errno,a_obj); + a_error_callback(errno, a_callbacks_arg); - return NULL; + return -6; } #endif - return NULL; + return -7; } #ifndef DAP_NET_CLIENT_NO_SSL @@ -762,15 +761,15 @@ static void s_http_connected(dap_events_socket_t * a_esocket) * @param a_cookie * @param a_response_callback * @param a_error_callback - * @param a_obj - * @param a_custom + * @param a_callbacks_arg + * @param a_custom_headers */ -void* dap_client_http_request(dap_worker_t * a_worker,const char *a_uplink_addr, uint16_t a_uplink_port, const char * a_method, +int dap_client_http_request(dap_worker_t * a_worker,const char *a_uplink_addr, uint16_t a_uplink_port, const char * a_method, const char* a_request_content_type, const char * a_path, const void *a_request, size_t a_request_size, char * a_cookie, dap_client_http_callback_data_t a_response_callback, - dap_client_http_callback_error_t a_error_callback, void *a_obj, void * a_custom) + dap_client_http_callback_error_t a_error_callback, void *a_callbacks_arg, char *a_custom_headers) { return dap_client_http_request_custom(a_worker, a_uplink_addr, a_uplink_port, a_method, a_request_content_type, a_path, - a_request, a_request_size, a_cookie, a_response_callback, a_error_callback, a_obj, - (char*)a_custom, false); + a_request, a_request_size, a_cookie, a_response_callback, a_error_callback, a_callbacks_arg, + a_custom_headers, false); } diff --git a/dap-sdk/net/client/dap_client_pvt.c b/dap-sdk/net/client/dap_client_pvt.c index d943dfae12..f6b61e2222 100644 --- a/dap-sdk/net/client/dap_client_pvt.c +++ b/dap-sdk/net/client/dap_client_pvt.c @@ -728,15 +728,11 @@ int dap_client_pvt_request(dap_client_pvt_t * a_client_internal, const char * a_ a_client_internal->request_response_callback = a_response_proc; a_client_internal->request_error_callback = a_response_error; a_client_internal->is_encrypted = false; - a_client_internal->refs_count++;; + a_client_internal->refs_count++; - void *l_ret = dap_client_http_request(a_client_internal->worker, a_client_internal->uplink_addr,a_client_internal->uplink_port, + return dap_client_http_request(a_client_internal->worker, a_client_internal->uplink_addr,a_client_internal->uplink_port, a_request ? "POST" : "GET", "text/text", a_path, a_request, a_request_size, NULL, s_request_response, s_request_error, a_client_internal, NULL); - - if(l_ret) - return 0; - return -1; } /** @@ -762,8 +758,7 @@ void dap_client_pvt_request_enc(dap_client_pvt_t * a_client_internal, const char a_query ? a_query : "NULL"); size_t l_sub_url_size = a_sub_url ? strlen(a_sub_url) : 0; size_t l_query_size = a_query ? strlen(a_query) : 0; - size_t l_url_size; - +// size_t l_url_size; // char l_url[1024] = { 0 }; // snprintf(l_url, 1024, "http://%s:%u", a_client_internal->uplink_addr, a_client_internal->uplink_port); // l_url_size = strlen(l_url); @@ -785,7 +780,6 @@ void dap_client_pvt_request_enc(dap_client_pvt_t * a_client_internal, const char a_client_internal->request_response_callback = a_response_proc; a_client_internal->request_error_callback = a_response_error; a_client_internal->is_encrypted = true; - size_t i; dap_enc_data_type_t l_enc_type; if(a_client_internal->uplink_protocol_version >= 21) diff --git a/dap-sdk/net/client/include/dap_client.h b/dap-sdk/net/client/include/dap_client.h index 923492769a..422864f192 100644 --- a/dap-sdk/net/client/include/dap_client.h +++ b/dap-sdk/net/client/include/dap_client.h @@ -90,6 +90,7 @@ typedef void (*dap_client_callback_data_size_t) (dap_client_t *, void *, size_t) #define DAP_UPLINK_PATH_STREAM "stream" #define DAP_UPLINK_PATH_LICENSE "license" //#define DAP_UPLINK_PATH_NODE_LIST "nodelist" +#define DAP_UPLINK_PATH_BALANCER "balancer" #ifdef __cplusplus extern "C" { diff --git a/dap-sdk/net/client/include/dap_client_http.h b/dap-sdk/net/client/include/dap_client_http.h index 182bd3a3a2..f416ffbd73 100644 --- a/dap-sdk/net/client/include/dap_client_http.h +++ b/dap-sdk/net/client/include/dap_client_http.h @@ -35,15 +35,14 @@ typedef void (*dap_client_http_callback_data_t)(void *, size_t, void *); // Call int dap_client_http_init(); void dap_client_http_deinit(); -void* dap_client_http_request_custom(dap_worker_t * a_worker, const char *a_uplink_addr, uint16_t a_uplink_port, const char *a_method, +int dap_client_http_request_custom(dap_worker_t * a_worker, const char *a_uplink_addr, uint16_t a_uplink_port, const char *a_method, const char *a_request_content_type, const char * a_path, const void *a_request, size_t a_request_size, char *a_cookie, - dap_client_http_callback_data_t a_response_callback, dap_client_http_callback_error_t a_error_callback, - void *a_obj, char *a_custom, bool a_over_ssl); - -void* dap_client_http_request(dap_worker_t * a_worker,const char *a_uplink_addr, uint16_t a_uplink_port, const char * a_method, + dap_client_http_callback_data_t a_response_callback, dap_client_http_callback_error_t a_error_callback, + void *a_callbacks_arg, char *a_custom_headers, bool a_over_ssl); +int dap_client_http_request(dap_worker_t * a_worker,const char *a_uplink_addr, uint16_t a_uplink_port, const char * a_method, const char* a_request_content_type, const char * a_path, const void *a_request, size_t a_request_size, char * a_cookie, dap_client_http_callback_data_t a_response_callback, - dap_client_http_callback_error_t a_error_callback, void *a_obj, void * a_custom); + dap_client_http_callback_error_t a_error_callback, void *a_callbacks_arg, char *a_custom_headers); uint64_t dap_client_http_get_connect_timeout_ms(); void dap_client_http_set_connect_timeout_ms(uint64_t a_timeout_ms); diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index 78f7ad6066..a2dbb52c92 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -97,7 +97,7 @@ #include "dap_notify_srv.h" #include "dap_chain_ledger.h" #include "dap_chain_cs_none.h" - +#include "dap_client_http.h" #include "dap_global_db.h" #include "dap_chain_global_db_remote.h" @@ -120,25 +120,23 @@ #define LOG_TAG "chain_net" #define F_DAP_CHAIN_NET_SYNC_FROM_ZERO ( 1 << 8 ) -#define F_DAP_CHAIN_NET_SHUTDOWN ( 1 << 9 ) -#define F_DAP_CHAIN_NET_GO_SYNC ( 1 << 10 ) -// maximum number of connections -static size_t s_max_links_count = 5;// by default 5 -// number of required connections -static size_t s_required_links_count = 3;// by default 3 static bool s_debug_more = false; -struct link_dns_request { - //uint32_t link_id; // not used - dap_chain_net_t * net; - uint_fast16_t tries; +struct balancer_link_request { + dap_chain_node_info_t *link_info; + dap_chain_net_t *net; + dap_worker_t *worker; + bool from_http; + bool link_replace; }; struct net_link { + uint64_t uplink_ip; dap_chain_node_info_t *link_info; dap_chain_node_client_t *link; dap_events_socket_uuid_t client_uuid; + UT_hash_handle hh; }; struct downlink { @@ -161,20 +159,21 @@ typedef struct dap_chain_net_pvt{ dap_chain_node_addr_t * node_addr; dap_chain_node_info_t * node_info; // Current node's info + atomic_uint balancer_link_requests; + bool balancer_http; //Active synchronizing link dap_chain_node_client_t *active_link; dap_list_t *links_queue; // Links waiting for sync - dap_list_t *net_links; // Links list - size_t links_connected_count; + struct net_link *net_links; // Links HT bool only_static_links; + uint16_t required_links_count; + uint16_t max_links_count; - struct downlink *downlinks; // List of links who sent SYNC REQ, it used for sync broadcasting + struct downlink *downlinks; // HT of links who sent SYNC REQ, it used for sync broadcasting dap_list_t *records_queue; dap_list_t *atoms_queue; - atomic_uint links_dns_requests; - bool load_mode; char ** seed_aliases; @@ -253,23 +252,12 @@ static const dap_chain_node_client_callbacks_t s_node_link_callbacks={ .delete=s_node_link_callback_delete }; - // State machine switchs here static bool s_net_states_proc(dap_proc_thread_t *a_thread, void *a_arg); -// Notify about net states -struct json_object *net_states_json_collect(dap_chain_net_t * l_net); -static void s_net_states_notify(dap_chain_net_t * l_net); - -// Prepare link success/error endpoints -static void s_net_state_link_prepare_success(dap_worker_t * a_worker,dap_chain_node_info_t * a_node_info, void * a_arg); -static void s_net_state_link_prepare_error(dap_worker_t * a_worker,dap_chain_node_info_t * a_node_info, void * a_arg, int a_errno); - - -// Replace link success/error callbacks -static void s_net_state_link_replace_success(dap_worker_t *a_worker,dap_chain_node_info_t *a_node_info, void *a_arg); -static void s_net_state_link_replace_error(dap_worker_t *a_worker,dap_chain_node_info_t *a_node_info, void *a_arg, int a_errno); +struct json_object *s_net_states_json_collect(dap_chain_net_t * l_net); +static void s_net_states_notify(dap_chain_net_t * l_net); //static void s_net_proc_kill( dap_chain_net_t * a_net ); int s_net_load(const char * a_net_name, uint16_t a_acl_idx); @@ -279,14 +267,13 @@ static void s_gbd_history_callback_notify (void * a_arg, const char a_op_code, c const char * a_key, const void * a_value, const size_t a_value_len); static void s_chain_callback_notify(void * a_arg, dap_chain_t *a_chain, dap_chain_cell_id_t a_id, void *a_atom, size_t a_atom_size); - static int s_cli_net(int argc, char ** argv, char **str_reply); +static uint8_t *s_net_set_acl(dap_chain_hash_fast_t *a_pkey_hash); +static bool s_balancer_start_dns_request(dap_chain_net_t *a_net, dap_chain_node_info_t *a_link_node_info, bool a_link_replace); +static bool s_balancer_start_http_request(dap_chain_net_t *a_net, dap_chain_node_info_t *a_link_node_info, bool a_link_replace); static bool s_seed_mode = false; -static uint8_t *s_net_set_acl(dap_chain_hash_fast_t *a_pkey_hash); -static bool s_start_dns_request(dap_chain_net_t *a_net, dap_chain_node_info_t *a_link_node_info); - /** * @brief * init network settings from cellrame-node.cfg file @@ -322,11 +309,6 @@ int dap_chain_net_init() "net -net <chain net name> ledger reload\n" "\tPurge the cache of chain net ledger and recalculate it from chain file\n"); s_seed_mode = dap_config_get_item_bool_default(g_config,"general","seed_mode",false); - - // maximum number of connections to other nodes - s_max_links_count = dap_config_get_item_int32_default(g_config, "general", "max_links", s_max_links_count); - // required number of connections to other nodes - s_required_links_count = dap_config_get_item_int32_default(g_config, "general", "require_links", s_required_links_count); s_debug_more = dap_config_get_item_bool_default(g_config,"chain_net","debug_more",false); dap_enc_http_set_acl_callback(s_net_set_acl); @@ -373,8 +355,6 @@ int dap_chain_net_state_go_to(dap_chain_net_t * a_net, dap_chain_net_state_t a_n pthread_rwlock_wrlock(&PVT(a_net)->states_lock); } PVT(a_net)->state_target = a_new_state; - // set flag for sync - PVT(a_net)->flags |= F_DAP_CHAIN_NET_GO_SYNC; //PVT(a_net)->flags |= F_DAP_CHAIN_NET_SYNC_FROM_ZERO; // TODO set this flag according to -mode argument from command line pthread_rwlock_unlock(&PVT(a_net)->states_lock); @@ -703,21 +683,7 @@ static void s_gbd_history_callback_notify(void *a_arg, const char a_op_code, con } } -/** - * @brief Get the possible number of links - */ -static size_t s_get_dns_max_links_count_from_cfg(dap_chain_net_t *a_net) -{ - dap_chain_net_pvt_t *l_net_pvt = a_net ? PVT(a_net) : NULL; - if(!l_net_pvt) - return 0; - return (size_t)l_net_pvt->seed_aliases_count + l_net_pvt->bootstrap_nodes_count; -} - -/** - * @brief Get one random link - */ -static dap_chain_node_info_t *s_get_dns_link_from_cfg(dap_chain_net_t *a_net) +static dap_chain_node_info_t *s_get_balancer_link_from_cfg(dap_chain_net_t *a_net) { dap_chain_net_pvt_t *l_net_pvt = a_net ? PVT(a_net) : NULL; if(!l_net_pvt) return NULL; @@ -768,20 +734,75 @@ static dap_chain_node_info_t *s_get_dns_link_from_cfg(dap_chain_net_t *a_net) * @param a_net Network * @param a_link_node_info Node info */ -static bool dap_chain_net_link_is_present(dap_chain_net_t *a_net, dap_chain_node_info_t *a_link_node_info) +static struct net_link *s_net_link_find(dap_chain_net_t *a_net, dap_chain_node_info_t *a_link_node_info) { - dap_chain_net_pvt_t *l_net_pvt = a_net ? PVT(a_net) : NULL; - if(!l_net_pvt) - return false; - dap_list_t *l_net_links = l_net_pvt->net_links; - while(l_net_links) { - struct net_link *l_net_link = (struct net_link*) l_net_links->data; - dap_chain_node_info_t *l_link_node_info = l_net_link->link_info; - if(dap_chain_node_info_addr_match(l_link_node_info, a_link_node_info)) - return true; - l_net_links = dap_list_next(l_net_links); + uint64_t l_addr = a_link_node_info->hdr.ext_addr_v4.s_addr; + struct net_link *l_present; + pthread_rwlock_rdlock(&PVT(a_net)->uplinks_lock); + HASH_FIND(hh, PVT(a_net)->net_links, &l_addr, sizeof(l_addr), l_present); + pthread_rwlock_unlock(&PVT(a_net)->uplinks_lock); + return l_present; +} + +static int s_net_link_add(dap_chain_net_t *a_net, dap_chain_node_info_t *a_link_node_info) +{ + dap_chain_net_pvt_t *l_pvt_net = PVT(a_net); + if (HASH_COUNT(PVT(a_net)->net_links) >= PVT(a_net)->max_links_count) + return +1; + if (!a_link_node_info) + return -1; + uint64_t l_own_addr = dap_chain_net_get_cur_addr_int(a_net); + if (a_link_node_info->hdr.address.uint64 == l_own_addr) + return -2; + uint64_t l_addr = a_link_node_info->hdr.ext_addr_v4.s_addr; + struct net_link *l_new_link; + pthread_rwlock_wrlock(&PVT(a_net)->uplinks_lock); + HASH_FIND(hh, PVT(a_net)->net_links, &l_addr, sizeof(l_addr), l_new_link); + if (l_new_link) { + pthread_rwlock_unlock(&PVT(a_net)->uplinks_lock); + return -3; + } + l_new_link = DAP_NEW_Z(struct net_link); + l_new_link->link_info = DAP_DUP(a_link_node_info); + l_new_link->uplink_ip = a_link_node_info->hdr.ext_addr_v4.s_addr; + HASH_ADD(hh, l_pvt_net->net_links, uplink_ip, sizeof(l_new_link->uplink_ip), l_new_link); + pthread_rwlock_unlock(&l_pvt_net->uplinks_lock); + return 0; +} + +static void s_net_link_remove(dap_chain_net_pvt_t *a_net_pvt, dap_events_socket_uuid_t a_client_uuid, bool a_rebase) +{ + struct net_link *l_link, *l_link_tmp, *l_link_found = NULL; + HASH_ITER(hh, a_net_pvt->net_links, l_link, l_link_tmp) { + if (l_link->client_uuid == a_client_uuid) { + l_link_found = l_link; + break; + } + } + if (!l_link_found) { + log_it(L_WARNING, "Can't find link UUID 0x%"DAP_UINT64_FORMAT_x" to remove it from links HT", a_client_uuid); + return; + } + HASH_DEL(a_net_pvt->net_links, l_link_found); + if (a_rebase) { + l_link_found->link = NULL; + l_link_found->client_uuid = 0; + // Add it to the list end + HASH_ADD(hh, a_net_pvt->net_links, uplink_ip, sizeof(l_link_found->uplink_ip), l_link_found); + } else { + DAP_DELETE(l_link_found->link_info); + DAP_DELETE(l_link_found); } - return false; +} + +static size_t s_net_get_active_links_count(dap_chain_net_t * a_net) +{ + int l_ret = 0; + struct net_link *l_link, *l_link_tmp; + HASH_ITER(hh, PVT(a_net)->net_links, l_link, l_link_tmp) + if (l_link->client_uuid) + l_ret++; + return l_ret; } /** @@ -790,38 +811,24 @@ static bool dap_chain_net_link_is_present(dap_chain_net_t *a_net, dap_chain_node */ static void s_fill_links_from_root_aliases(dap_chain_net_t * a_net) { - dap_chain_net_pvt_t *l_pvt_net = PVT(a_net); - uint64_t l_own_addr = dap_chain_net_get_cur_addr_int(a_net); - for (size_t i = 0; i < MIN(s_max_links_count, l_pvt_net->seed_aliases_count); i++) { - pthread_rwlock_rdlock(&l_pvt_net->uplinks_lock); - if (dap_list_length(l_pvt_net->net_links) >= s_max_links_count) { - pthread_rwlock_unlock(&l_pvt_net->uplinks_lock); - break; - } else - pthread_rwlock_unlock(&l_pvt_net->uplinks_lock); - - dap_chain_node_addr_t *l_link_addr = dap_chain_node_alias_find(a_net, l_pvt_net->seed_aliases[i]); - if (!l_link_addr) - continue; - - if (l_link_addr->uint64 == l_own_addr) { - DAP_DELETE(l_link_addr); - continue; // Do not link with self - } - dap_chain_node_info_t *l_link_node_info = dap_chain_node_info_read(a_net, l_link_addr); - if(l_link_node_info && !dap_chain_net_link_is_present(a_net, l_link_node_info)) { - struct net_link *l_new_link = DAP_NEW_Z(struct net_link); - l_new_link->link_info = l_link_node_info; - pthread_rwlock_wrlock(&l_pvt_net->uplinks_lock); - l_pvt_net->net_links = dap_list_append(l_pvt_net->net_links, l_new_link); - pthread_rwlock_unlock(&l_pvt_net->uplinks_lock); - } else { - log_it(L_WARNING, "Not found link %s."NODE_ADDR_FP_STR" in the node list or link is already in use", a_net->pub.name, - NODE_ADDR_FP_ARGS(l_link_addr)); - DAP_DELETE(l_link_node_info); - } - DAP_DELETE(l_link_addr); - } + int ret = 0; + dap_chain_net_pvt_t *l_pvt_net = PVT(a_net); + for (size_t i = 0; i < l_pvt_net->seed_aliases_count; i++) { + dap_chain_node_addr_t *l_link_addr = dap_chain_node_alias_find(a_net, l_pvt_net->seed_aliases[i]); + if (!l_link_addr) + continue; + dap_chain_node_info_t *l_link_node_info = dap_chain_node_info_read(a_net, l_link_addr); + if (!l_link_node_info) + log_it(L_WARNING, "Not found link %s."NODE_ADDR_FP_STR" in the node list", + a_net->pub.name, NODE_ADDR_FP_ARGS(l_link_addr)); + else { + ret = s_net_link_add(a_net, l_link_node_info); + DAP_DELETE(l_link_node_info); + } + DAP_DELETE(l_link_addr); + if (ret > 0) // Maximum links count reached + break; + } } /** @@ -846,9 +853,8 @@ static void s_node_link_callback_connected(dap_chain_node_client_t * a_node_clie log_it(L_NOTICE, "Established connection with %s."NODE_ADDR_FP_STR,l_net->pub.name, NODE_ADDR_FP_ARGS_S(a_node_client->remote_node_addr)); pthread_rwlock_wrlock(&l_net_pvt->uplinks_lock); - l_net_pvt->links_connected_count++; a_node_client->is_connected = true; - struct json_object *l_json = net_states_json_collect(l_net); + struct json_object *l_json = s_net_states_json_collect(l_net); char l_err_str[128] = { }; dap_snprintf(l_err_str, sizeof(l_err_str) , "Established connection with link " NODE_ADDR_FP_STR @@ -864,23 +870,6 @@ static void s_node_link_callback_connected(dap_chain_node_client_t * a_node_clie } -static void s_node_link_remove(dap_chain_net_pvt_t *a_net_pvt, dap_chain_node_client_t *a_node_client, bool a_rebase) -{ - for (dap_list_t *it = a_net_pvt->net_links; it; it = it->next) { - if (((struct net_link *)it->data)->link == a_node_client) { - if (a_rebase) { - ((struct net_link *)it->data)->link = NULL; - a_net_pvt->net_links = dap_list_append(a_net_pvt->net_links, it->data); - } else { - DAP_DELETE(((struct net_link *)it->data)->link_info); - DAP_DELETE(it->data); - } - a_net_pvt->net_links = dap_list_delete_link(a_net_pvt->net_links, it); - break; - } - } -} - /** * @brief s_node_link_callback_disconnected * @param a_node_client @@ -891,58 +880,40 @@ static void s_node_link_callback_disconnected(dap_chain_node_client_t *a_node_cl { dap_chain_net_t *l_net = (dap_chain_net_t *)a_arg; dap_chain_net_pvt_t *l_net_pvt = PVT(l_net); - pthread_rwlock_wrlock(&l_net_pvt->uplinks_lock); if (a_node_client->is_connected) { a_node_client->is_connected = false; log_it(L_INFO, "%s."NODE_ADDR_FP_STR" disconnected.%s",l_net->pub.name, NODE_ADDR_FP_ARGS_S(a_node_client->info->hdr.address), l_net_pvt->state_target == NET_STATE_OFFLINE ? "" : " Replace it..."); - if (l_net_pvt->links_connected_count) - l_net_pvt->links_connected_count--; - else - log_it(L_ERROR, "Links count is zero in disconnected callback, looks smbd decreased it twice or forget to increase on connect/reconnect"); } if (l_net_pvt->state_target != NET_STATE_OFFLINE) { - for (dap_list_t *it = l_net_pvt->net_links; it; it = it->next) { - if (((struct net_link *)it->data)->link == NULL) { // We have a free prepared link - s_node_link_remove(l_net_pvt, a_node_client, l_net_pvt->only_static_links); - dap_chain_node_client_t *l_client_new = dap_chain_net_client_create_n_connect(l_net, - ((struct net_link *)it->data)->link_info); - ((struct net_link *)it->data)->link = l_client_new; - ((struct net_link *)it->data)->client_uuid = l_client_new->uuid; - ((struct net_link *)it->data)->link = dap_chain_net_client_create_n_connect(l_net, - ((struct net_link *)it->data)->link_info); + pthread_rwlock_wrlock(&l_net_pvt->uplinks_lock); + s_net_link_remove(l_net_pvt, a_node_client->uuid, l_net_pvt->only_static_links); + a_node_client->keep_connection = false; + struct net_link *l_link, *l_link_tmp; + HASH_ITER(hh, l_net_pvt->net_links, l_link, l_link_tmp) { + if (l_link->link == NULL) { // We have a free prepared link + dap_chain_node_client_t *l_client_new = dap_chain_net_client_create_n_connect( + l_net, l_link->link_info); + l_link->link = l_client_new; + l_link->client_uuid = l_client_new->uuid; pthread_rwlock_unlock(&l_net_pvt->uplinks_lock); return; } - } - if (l_net_pvt->only_static_links) { - a_node_client->keep_connection = true; - pthread_rwlock_unlock(&l_net_pvt->uplinks_lock); - return; - } - dap_chain_node_info_t *l_link_node_info = NULL; - int l_n = 0; - while(l_n < 100) { - l_n++; - l_link_node_info = s_get_dns_link_from_cfg(l_net); - // If this connect not exists - if(l_link_node_info && !dap_chain_net_link_is_present(l_net, l_link_node_info)) { - break; - } - } - - if (l_link_node_info) { - if(!s_start_dns_request(l_net, l_link_node_info)) { - log_it(L_ERROR, "Can't process node info dns request"); - DAP_DELETE(l_link_node_info); - } else { - s_node_link_remove(l_net_pvt, a_node_client, false); - a_node_client->keep_connection = false; + } + if (!l_net_pvt->only_static_links) { + size_t l_current_links_prepared = HASH_COUNT(l_net_pvt->net_links); + for (size_t i = l_current_links_prepared; i < l_net_pvt->max_links_count ; i++) { + dap_chain_node_info_t *l_link_node_info = s_get_balancer_link_from_cfg(l_net); + if (l_link_node_info) { + if (!s_balancer_start_dns_request(l_net, l_link_node_info, true)) + log_it(L_ERROR, "Can't process node info dns request"); + DAP_DELETE(l_link_node_info); + } } } + pthread_rwlock_unlock(&l_net_pvt->uplinks_lock); } - pthread_rwlock_unlock(&l_net_pvt->uplinks_lock); } /** @@ -957,7 +928,7 @@ static void s_node_link_callback_stage(dap_chain_node_client_t * a_node_client,d if( s_debug_more) log_it(L_INFO,"%s."NODE_ADDR_FP_STR" stage %s",l_net->pub.name,NODE_ADDR_FP_ARGS_S(a_node_client->remote_node_addr), dap_client_stage_str(a_stage)); - struct json_object *l_json = net_states_json_collect(l_net); + struct json_object *l_json = s_net_states_json_collect(l_net); json_object_object_add(l_json, "errorMessage", json_object_new_string(" ")); dap_notify_server_send_mt(json_object_get_string(l_json)); json_object_put(l_json); @@ -975,7 +946,7 @@ static void s_node_link_callback_error(dap_chain_node_client_t * a_node_client, log_it(L_WARNING, "Can't establish link with %s."NODE_ADDR_FP_STR, l_net? l_net->pub.name : "(unknown)" , NODE_ADDR_FP_ARGS_S(a_node_client->remote_node_addr)); if (l_net){ - struct json_object *l_json = net_states_json_collect(l_net); + struct json_object *l_json = s_net_states_json_collect(l_net); char l_node_addr_str[INET_ADDRSTRLEN] = {}; inet_ntop(AF_INET, &a_node_client->info->hdr.ext_addr_v4, l_node_addr_str, INET_ADDRSTRLEN); char l_err_str[128] = { }; @@ -998,43 +969,56 @@ static void s_node_link_callback_delete(dap_chain_node_client_t * a_node_client, dap_chain_net_t * l_net = (dap_chain_net_t *) a_arg; dap_chain_net_pvt_t * l_net_pvt = PVT(l_net); if (!a_node_client->keep_connection) { - struct json_object *l_json = net_states_json_collect(l_net); + struct json_object *l_json = s_net_states_json_collect(l_net); json_object_object_add(l_json, "errorMessage", json_object_new_string("Link deleted")); dap_notify_server_send_mt(json_object_get_string(l_json)); json_object_put(l_json); return; - } else if (a_node_client->is_connected) { + } else if (a_node_client->is_connected) a_node_client->is_connected = false; - if (l_net_pvt->links_connected_count) - l_net_pvt->links_connected_count--; - else - log_it(L_ERROR, "Links count is zero in delete callback"); - } dap_chain_net_sync_unlock(l_net, a_node_client); pthread_rwlock_wrlock(&l_net_pvt->uplinks_lock); - for ( dap_list_t * it = l_net_pvt->net_links; it; it=it->next ){ - if (((struct net_link *)it->data)->link == a_node_client) { + struct net_link *l_link, *l_link_tmp; + HASH_ITER(hh, l_net_pvt->net_links, l_link, l_link_tmp) { + if (l_link->link == a_node_client) { log_it(L_DEBUG,"Replace node client with new one"); dap_chain_node_client_t *l_client = dap_chain_net_client_create_n_connect(l_net, a_node_client->info); - ((struct net_link *)it->data)->link = l_client; - ((struct net_link *)it->data)->client_uuid = l_client->uuid; + l_link->link = l_client; + l_link->client_uuid = l_client->uuid; } } pthread_rwlock_unlock(&l_net_pvt->uplinks_lock); - struct json_object *l_json = net_states_json_collect(l_net); + struct json_object *l_json = s_net_states_json_collect(l_net); json_object_object_add(l_json, "errorMessage", json_object_new_string("Link restart")); dap_notify_server_send_mt(json_object_get_string(l_json)); json_object_put(l_json); // Then a_node_client will be destroyed in a right way } +static void s_net_links_complete_and_start(dap_chain_net_t *a_net, dap_worker_t *a_worker) +{ + dap_chain_net_pvt_t * l_net_pvt = PVT(a_net); + pthread_rwlock_rdlock(&l_net_pvt->balancer_lock); + if (--l_net_pvt->balancer_link_requests == 0){ // It was the last one + if (HASH_COUNT(l_net_pvt->net_links) < l_net_pvt->max_links_count) + s_fill_links_from_root_aliases(a_net); // Comlete the sentence + pthread_rwlock_wrlock(&l_net_pvt->states_lock); + if (l_net_pvt->state_target != NET_STATE_OFFLINE){ + l_net_pvt->state = NET_STATE_LINKS_CONNECTING; + } + pthread_rwlock_unlock(&l_net_pvt->states_lock); + dap_proc_queue_add_callback_inter(a_worker->proc_queue_input, s_net_states_proc, a_net); + } + pthread_rwlock_unlock(&l_net_pvt->balancer_lock); +} + /** * @brief s_net_state_link_prepare_success * @param a_worker * @param a_node_info * @param a_arg */ -static void s_net_state_link_prepare_success(dap_worker_t * a_worker,dap_chain_node_info_t * a_node_info, void * a_arg) +static void s_net_balancer_link_prepare_success(dap_worker_t * a_worker, dap_chain_node_info_t * a_node_info, void * a_arg) { if(s_debug_more){ char l_node_addr_str[INET_ADDRSTRLEN]={}; @@ -1043,40 +1027,47 @@ static void s_net_state_link_prepare_success(dap_worker_t * a_worker,dap_chain_n l_node_addr_str ); } - struct link_dns_request * l_dns_request = (struct link_dns_request *) a_arg; - dap_chain_net_t * l_net = l_dns_request->net; - dap_chain_net_pvt_t * l_net_pvt = PVT(l_net); - uint64_t l_own_addr = dap_chain_net_get_cur_addr_int(l_net); - if (a_node_info->hdr.address.uint64 != l_own_addr) { - struct net_link *l_new_link = DAP_NEW_Z(struct net_link); - l_new_link->link_info = a_node_info; - pthread_rwlock_wrlock(&l_net_pvt->uplinks_lock); - l_net_pvt->net_links = dap_list_append(l_net_pvt->net_links, l_new_link); - pthread_rwlock_unlock(&l_net_pvt->uplinks_lock); - l_dns_request->tries = 0; - } - pthread_rwlock_rdlock(&l_net_pvt->balancer_lock); - - l_dns_request->tries++; - l_net_pvt->links_dns_requests--; - if (l_net_pvt->links_dns_requests == 0){ // It was the last one - pthread_rwlock_wrlock(&l_net_pvt->states_lock); - if (l_net_pvt->state != NET_STATE_LINKS_CONNECTING){ - l_net_pvt->state = NET_STATE_LINKS_CONNECTING; + struct balancer_link_request *l_balancer_request = (struct balancer_link_request *) a_arg; + dap_chain_net_t * l_net = l_balancer_request->net; + int l_res = s_net_link_add(l_net, a_node_info); + if (l_res < 0) { // Can't add this link + debug_if(s_debug_more, L_DEBUG, "Can't add link "NODE_ADDR_FP_STR, NODE_ADDR_FP_ARGS_S(a_node_info->hdr.address)); + if (l_balancer_request->link_replace) { + // Just try a new one + dap_chain_node_info_t *l_link_node_info = s_get_balancer_link_from_cfg(l_net); + if (l_link_node_info) { + if (!s_balancer_start_dns_request(l_net, l_link_node_info, true)) + log_it(L_ERROR, "Can't process node info dns request"); + DAP_DELETE(l_link_node_info); + } } - pthread_rwlock_unlock(&l_net_pvt->states_lock); - dap_proc_queue_add_callback_inter( a_worker->proc_queue_input,s_net_states_proc,l_net ); - } - pthread_rwlock_unlock(&l_net_pvt->balancer_lock); - struct json_object *l_json = net_states_json_collect(l_net); - char l_err_str[128] = { }; - dap_snprintf(l_err_str, sizeof(l_err_str) - , "Link " NODE_ADDR_FP_STR " prepared" - , NODE_ADDR_FP_ARGS_S(a_node_info->hdr.address)); - json_object_object_add(l_json, "errorMessage", json_object_new_string(l_err_str)); - dap_notify_server_send_mt(json_object_get_string(l_json)); - json_object_put(l_json); - DAP_DELETE(l_dns_request); + } else if (l_res == 0) { + struct json_object *l_json = s_net_states_json_collect(l_net); + char l_err_str[128] = { }; + dap_snprintf(l_err_str, sizeof(l_err_str) + , "Link " NODE_ADDR_FP_STR " prepared" + , NODE_ADDR_FP_ARGS_S(a_node_info->hdr.address)); + json_object_object_add(l_json, "errorMessage", json_object_new_string(l_err_str)); + dap_notify_server_send_mt(json_object_get_string(l_json)); + json_object_put(l_json); + debug_if(s_debug_more, L_DEBUG, "Link "NODE_ADDR_FP_STR" successfully added", + NODE_ADDR_FP_ARGS_S(a_node_info->hdr.address)); + if (l_balancer_request->link_replace && + s_net_get_active_links_count(l_net) < PVT(l_net)->required_links_count) { + // Auto-start new link + debug_if(s_debug_more, L_DEBUG, "Link "NODE_ADDR_FP_STR" started", + NODE_ADDR_FP_ARGS_S(a_node_info->hdr.address)); + dap_chain_node_client_t *l_client = dap_chain_net_client_create_n_connect(l_net, a_node_info); + struct net_link *l_new_link = s_net_link_find(l_net, a_node_info); + l_new_link->link = l_client; + l_new_link->client_uuid = l_client->uuid; + } + } else + debug_if(s_debug_more, L_DEBUG, "Maximum prepared links reached"); + if (!l_balancer_request->link_replace) + s_net_links_complete_and_start(l_net, a_worker); + DAP_DELETE(l_balancer_request->link_info); + DAP_DELETE(l_balancer_request); } /** @@ -1086,77 +1077,53 @@ static void s_net_state_link_prepare_success(dap_worker_t * a_worker,dap_chain_n * @param a_arg * @param a_errno */ -static void s_net_state_link_prepare_error(dap_worker_t * a_worker,dap_chain_node_info_t * a_node_info, void * a_arg, int a_errno) +static void s_net_balancer_link_prepare_error(dap_worker_t * a_worker, void * a_arg, int a_errno) { - struct link_dns_request * l_dns_request = (struct link_dns_request *) a_arg; - dap_chain_net_t * l_net = l_dns_request->net; - dap_chain_net_pvt_t * l_net_pvt = PVT(l_net); + struct balancer_link_request *l_balancer_request = (struct balancer_link_request *)a_arg; + dap_chain_net_t * l_net = l_balancer_request->net; + dap_chain_node_info_t *l_node_info = l_balancer_request->link_info; char l_node_addr_str[INET_ADDRSTRLEN]={}; - inet_ntop(AF_INET,&a_node_info->hdr.ext_addr_v4,l_node_addr_str, INET_ADDRSTRLEN); - log_it(L_WARNING,"Link " NODE_ADDR_FP_STR " (%s) prepare error with code %d", NODE_ADDR_FP_ARGS_S(a_node_info->hdr.address), - l_node_addr_str,a_errno ); - struct json_object *l_json = net_states_json_collect(l_net); + inet_ntop(AF_INET, &l_node_info->hdr.ext_addr_v4, l_node_addr_str, INET_ADDRSTRLEN); + log_it(L_WARNING, "Link from balancer "NODE_ADDR_FP_STR" (%s) prepare error with code %d", + NODE_ADDR_FP_ARGS_S(l_node_info->hdr.address), l_node_addr_str,a_errno); + struct json_object *l_json = s_net_states_json_collect(l_net); char l_err_str[128] = { }; dap_snprintf(l_err_str, sizeof(l_err_str) - , "Link " NODE_ADDR_FP_STR " [%s] can't be prepared, errno %d" - , NODE_ADDR_FP_ARGS_S(a_node_info->hdr.address), l_node_addr_str, a_errno); + , "Link from balancer " NODE_ADDR_FP_STR " [%s] can't be prepared, errno %d" + , NODE_ADDR_FP_ARGS_S(l_node_info->hdr.address), l_node_addr_str, a_errno); json_object_object_add(l_json, "errorMessage", json_object_new_string(l_err_str)); dap_notify_server_send_mt(json_object_get_string(l_json)); json_object_put(l_json); - pthread_rwlock_wrlock(&l_net_pvt->balancer_lock); - if(l_net_pvt->links_dns_requests) - l_net_pvt->links_dns_requests--; + if (!l_balancer_request->link_replace) + s_net_links_complete_and_start(l_net, a_worker); + DAP_DELETE(l_node_info); + DAP_DELETE(l_balancer_request); +} - log_it(L_DEBUG, "Still %u link dns requests in process",l_net_pvt->links_dns_requests); - bool l_fill_from_root = false; - if(!l_net_pvt->links_dns_requests ){ - pthread_rwlock_wrlock(&l_net_pvt->states_lock); - if( l_net_pvt->state_target != NET_STATE_OFFLINE){ - log_it(L_WARNING,"Can't prepare links via DNS requests. Prefilling links with root addresses"); - l_net_pvt->state = NET_STATE_LINKS_CONNECTING; - l_fill_from_root = true; +void s_net_http_link_prepare_success(void *a_response, size_t a_response_size, void *a_arg) +{ + struct balancer_link_request *l_balancer_request = (struct balancer_link_request *)a_arg; + if (a_response_size != sizeof(dap_chain_node_info_t)) { + log_it(L_ERROR, "Invalid balancer response size %zu (expect %zu)", a_response_size, sizeof(dap_chain_node_info_t)); + dap_chain_net_t * l_net = l_balancer_request->net; + dap_chain_net_pvt_t * l_net_pvt = PVT(l_net); + if (!l_balancer_request->link_replace) { + if (l_net_pvt->balancer_link_requests) + l_net_pvt->balancer_link_requests--; + } else { + dap_chain_node_info_t *l_link_node_info = s_get_balancer_link_from_cfg(l_net); + s_balancer_start_http_request(l_net, l_link_node_info, true); } - pthread_rwlock_unlock(&l_net_pvt->states_lock); + return; } - pthread_rwlock_unlock(&l_net_pvt->balancer_lock); - DAP_DELETE(l_dns_request); - s_fill_links_from_root_aliases(l_net); - dap_proc_queue_add_callback_inter( a_worker->proc_queue_input,s_net_states_proc,l_net ); + s_net_balancer_link_prepare_success(l_balancer_request->worker, (dap_chain_node_info_t *)&a_response, a_arg); } -/** - * @brief Get list of the unique links for the selected net - * @param a_net - * @return list of dap_chain_node_info_t or NULL - */ -static dap_chain_node_info_list_t* s_get_links(dap_chain_net_t *a_net) +void s_net_http_link_prepare_error(int a_error_code, void *a_arg) { - dap_chain_net_pvt_t *l_net_pvt = a_net ? PVT(a_net) : NULL; - if(!l_net_pvt) - return false; - - dap_chain_node_info_list_t *l_node_list = NULL; - // Choose between the allowed number of links and the number of real links - size_t l_max_links_count = MIN(s_max_links_count, s_get_dns_max_links_count_from_cfg(a_net)); - size_t l_cur_links_count = 0; - size_t l_n = 0;// Protect from eternal loop - while(l_cur_links_count < l_max_links_count) { - if(l_n > 1000) // It's a problem with link prepare - break; - l_n++; - dap_chain_node_info_t *l_link_node_info = s_get_dns_link_from_cfg(a_net); - if(!l_link_node_info) - continue; - // Protect against using the same node - if(dap_chain_node_info_list_is_added(l_node_list, l_link_node_info)) { - DAP_DEL_Z(l_link_node_info); - continue; - } - l_node_list = dap_chain_node_info_list_add(l_node_list, l_link_node_info); - l_cur_links_count++; - } - return l_node_list; + struct balancer_link_request *l_balancer_request = (struct balancer_link_request *)a_arg; + s_net_balancer_link_prepare_error(l_balancer_request->worker, a_arg, a_error_code); } /** @@ -1165,41 +1132,92 @@ static dap_chain_node_info_list_t* s_get_links(dap_chain_net_t *a_net) * @param a_link_node_info node parameters * @return list of dap_chain_node_info_t */ -static bool s_start_dns_request(dap_chain_net_t *a_net, dap_chain_node_info_t *a_link_node_info) +static bool s_balancer_start_dns_request(dap_chain_net_t *a_net, dap_chain_node_info_t *a_link_node_info, bool a_link_replace) { + char l_node_addr_str[INET_ADDRSTRLEN] = { }; + inet_ntop(AF_INET, &a_link_node_info->hdr.ext_addr_v4, l_node_addr_str, INET_ADDRSTRLEN); + log_it(L_DEBUG, "Start balancer DNS request to %s", l_node_addr_str); dap_chain_net_pvt_t *l_net_pvt = a_net ? PVT(a_net) : NULL; if(!l_net_pvt) return false; - l_net_pvt->links_dns_requests++; - struct link_dns_request *l_dns_request = DAP_NEW_Z(struct link_dns_request); - l_dns_request->net = a_net; - //l_dns_request->link_id = a_link_id; - if(dap_chain_node_info_dns_request(a_link_node_info->hdr.ext_addr_v4, + struct balancer_link_request *l_balancer_request = DAP_NEW_Z(struct balancer_link_request); + l_balancer_request->net = a_net; + l_balancer_request->link_info = DAP_DUP(a_link_node_info); + l_balancer_request->link_replace = a_link_replace; + if (dap_chain_node_info_dns_request(a_link_node_info->hdr.ext_addr_v4, a_link_node_info->hdr.ext_port, a_net->pub.name, - a_link_node_info, // use it twice - s_net_state_link_prepare_success, - s_net_state_link_prepare_error, - l_dns_request)) { - log_it(L_ERROR, "Can't process node info dns request"); - //l_node_list = dap_chain_node_info_list_del(l_node_list, a_link_node_info); - DAP_DEL_Z(l_dns_request); + s_net_balancer_link_prepare_success, + s_net_balancer_link_prepare_error, + l_balancer_request)) { + log_it(L_ERROR, "Can't process balancer link DNS request"); + DAP_DELETE(l_balancer_request); return false; } + l_net_pvt->balancer_link_requests++; return true; } +static bool s_balancer_start_http_request(dap_chain_net_t *a_net, dap_chain_node_info_t *a_link_node_info, bool a_link_replace) +{ + char l_node_addr_str[INET_ADDRSTRLEN] = { }; + inet_ntop(AF_INET, &a_link_node_info->hdr.ext_addr_v4, l_node_addr_str, INET_ADDRSTRLEN); + log_it(L_DEBUG, "Start balancer HTTP request to %s", l_node_addr_str); + dap_chain_net_pvt_t *l_net_pvt = a_net ? PVT(a_net) : NULL; + if (!l_net_pvt) + return false; + struct balancer_link_request *l_balancer_request = DAP_NEW_Z(struct balancer_link_request); + l_balancer_request->from_http = true; + l_balancer_request->net = a_net; + l_balancer_request->link_info = DAP_DUP(a_link_node_info); + l_balancer_request->worker = dap_events_worker_get_auto(); + l_balancer_request->link_replace = a_link_replace; + const char l_request[] = "/f0intlt4eyl03htogu?version=1,method=random"; + int l_ret = dap_client_http_request(l_balancer_request->worker, l_node_addr_str, a_link_node_info->hdr.ext_port, + "GET", "text/text", DAP_UPLINK_PATH_BALANCER, + l_request, sizeof(l_request), NULL, + s_net_http_link_prepare_success, s_net_http_link_prepare_error, + l_balancer_request, NULL); + if (l_ret) { + l_net_pvt->balancer_link_requests++; + return true; + } + log_it(L_ERROR, "Can't process balancer link HTTP request"); + DAP_DELETE(l_balancer_request); + return false; +} + +static void s_prepare_links_from_balancer(dap_chain_net_t *a_net, bool a_over_http) +{ + // Get list of the unique links for l_net + size_t l_max_links_count = PVT(a_net)->max_links_count * 2; // Not all will be success + for (size_t l_cur_links_count = 0, n = 0; l_cur_links_count < l_max_links_count; n++) { + if (n > 1000) // It's a problem with link prepare + break; + dap_chain_node_info_t *l_link_node_info = s_get_balancer_link_from_cfg(a_net); + if (!l_link_node_info) + continue; + // Start connect to link hubs + if (a_over_http) + s_balancer_start_http_request(a_net, l_link_node_info, false); + else + s_balancer_start_dns_request(a_net, l_link_node_info, false); + DAP_DELETE(l_link_node_info); + l_cur_links_count++; + } +} -struct json_object *net_states_json_collect(dap_chain_net_t * l_net) { +struct json_object *s_net_states_json_collect(dap_chain_net_t *a_net) +{ struct json_object *l_json = json_object_new_object(); json_object_object_add(l_json, "class" , json_object_new_string("NetStates")); - json_object_object_add(l_json, "name" , json_object_new_string((const char*)l_net->pub.name)); - json_object_object_add(l_json, "networkState" , json_object_new_string(dap_chain_net_state_to_str(PVT(l_net)->state))); - json_object_object_add(l_json, "targetState" , json_object_new_string(dap_chain_net_state_to_str(PVT(l_net)->state_target))); - json_object_object_add(l_json, "linksCount" , json_object_new_int(dap_list_length(PVT(l_net)->net_links))); - json_object_object_add(l_json, "activeLinksCount" , json_object_new_int(PVT(l_net)->links_connected_count)); + json_object_object_add(l_json, "name" , json_object_new_string((const char*)a_net->pub.name)); + json_object_object_add(l_json, "networkState" , json_object_new_string(dap_chain_net_state_to_str(PVT(a_net)->state))); + json_object_object_add(l_json, "targetState" , json_object_new_string(dap_chain_net_state_to_str(PVT(a_net)->state_target))); + json_object_object_add(l_json, "linksCount" , json_object_new_int(HASH_COUNT(PVT(a_net)->net_links))); + json_object_object_add(l_json, "activeLinksCount" , json_object_new_int(s_net_get_active_links_count(a_net))); char l_node_addr_str[24] = {'\0'}; - dap_snprintf(l_node_addr_str, sizeof(l_node_addr_str), NODE_ADDR_FP_STR, NODE_ADDR_FP_ARGS(PVT(l_net)->node_addr)); + dap_snprintf(l_node_addr_str, sizeof(l_node_addr_str), NODE_ADDR_FP_STR, NODE_ADDR_FP_ARGS(PVT(a_net)->node_addr)); json_object_object_add(l_json, "nodeAddress" , json_object_new_string(l_node_addr_str)); return l_json; } @@ -1208,8 +1226,9 @@ struct json_object *net_states_json_collect(dap_chain_net_t * l_net) { * @brief s_net_states_notify * @param l_net */ -static void s_net_states_notify(dap_chain_net_t * l_net) { - struct json_object *l_json = net_states_json_collect(l_net); +static void s_net_states_notify(dap_chain_net_t *a_net) +{ + struct json_object *l_json = s_net_states_json_collect(a_net); json_object_object_add(l_json, "errorMessage", json_object_new_string(" ")); // regular notify has no error dap_notify_server_send_mt(json_object_get_string(l_json)); json_object_put(l_json); @@ -1219,7 +1238,8 @@ static void s_net_states_notify(dap_chain_net_t * l_net) { * @brief s_net_states_proc * @param l_net */ -static bool s_net_states_proc(dap_proc_thread_t *a_thread, void *a_arg) { +static bool s_net_states_proc(dap_proc_thread_t *a_thread, void *a_arg) +{ UNUSED(a_thread); bool l_repeat_after_exit = false; // If true - repeat on next iteration of proc thread loop dap_chain_net_t *l_net = (dap_chain_net_t *) a_arg; @@ -1235,37 +1255,34 @@ static bool s_net_states_proc(dap_proc_thread_t *a_thread, void *a_arg) { switch (l_net_pvt->state) { // State OFFLINE where we don't do anything case NET_STATE_OFFLINE: { - l_net_pvt->links_connected_count = 0; - // delete all links - dap_list_t *l_tmp = l_net_pvt->net_links; - while (l_tmp) { - dap_list_t *l_next = l_tmp->next; - dap_chain_node_client_close(((struct net_link *)l_tmp->data)->client_uuid); - DAP_DEL_Z(((struct net_link *)l_tmp->data)->link_info); - l_tmp = l_next; + // delete all links + struct net_link *l_link, *l_link_tmp; + HASH_ITER(hh, l_net_pvt->net_links, l_link, l_link_tmp) { + HASH_DEL(l_net_pvt->net_links, l_link); + dap_chain_node_client_close(l_link->client_uuid); + DAP_DEL_Z(l_link->link_info); } struct downlink *l_downlink, *l_dltmp; HASH_ITER(hh, l_net_pvt->downlinks, l_downlink, l_dltmp) { HASH_DEL(l_net_pvt->downlinks, l_downlink); dap_events_socket_delete_mt(l_downlink->worker->worker, l_downlink->esocket_uuid); } - dap_list_free_full(l_net_pvt->net_links, NULL); - l_net_pvt->net_links = NULL; + l_net_pvt->balancer_link_requests = 0; + l_net_pvt->active_link = NULL; + dap_list_free_full(l_net_pvt->links_queue, NULL); + dap_list_free_full(l_net_pvt->atoms_queue, NULL); + dap_list_free_full(l_net_pvt->records_queue, NULL); if ( l_net_pvt->state_target != NET_STATE_OFFLINE ){ l_net_pvt->state = NET_STATE_LINKS_PREPARE; l_repeat_after_exit = true; - break; } - // disable SYNC_GDB - l_net_pvt->active_link = NULL; - l_net_pvt->flags &= ~F_DAP_CHAIN_NET_GO_SYNC; - l_net_pvt->last_sync = 0; } break; // Prepare links case NET_STATE_LINKS_PREPARE: { log_it(L_NOTICE,"%s.state: NET_STATE_LINKS_PREPARE", l_net->pub.name); s_net_states_notify(l_net); + // Extra links from cfg for (int i = 0; i < l_net_pvt->gdb_sync_nodes_links_count; i++) { if (i >= l_net_pvt->gdb_sync_nodes_addrs_count) break; @@ -1273,92 +1290,51 @@ static bool s_net_states_proc(dap_proc_thread_t *a_thread, void *a_arg) { l_link_node_info->hdr.address.uint64 = l_net_pvt->gdb_sync_nodes_addrs[i].uint64; l_link_node_info->hdr.ext_addr_v4.s_addr = l_net_pvt->gdb_sync_nodes_links_ips[i]; l_link_node_info->hdr.ext_port = l_net_pvt->gdb_sync_nodes_links_ports[i]; - if(!dap_chain_net_link_is_present(l_net, l_link_node_info)){ - struct net_link *l_new_link = DAP_NEW_Z(struct net_link); - l_new_link->link_info = l_link_node_info; - l_net_pvt->net_links = dap_list_append(l_net_pvt->net_links, l_new_link); - } - else{ - DAP_DELETE(l_link_node_info); - } - + s_net_link_add(l_net, l_link_node_info); + DAP_DELETE(l_link_node_info); } - uint64_t l_own_addr = dap_chain_net_get_cur_addr_int(l_net); + // Links from node info structure (currently empty) if (l_net_pvt->node_info) { for (size_t i = 0; i < l_net_pvt->node_info->hdr.links_number; i++) { dap_chain_node_info_t *l_link_node_info = dap_chain_node_info_read(l_net, &l_net_pvt->node_info->links[i]); - if (!l_link_node_info || l_link_node_info->hdr.address.uint64 == l_own_addr) { - continue; // Do not link with self - } - if(!dap_chain_net_link_is_present(l_net, l_link_node_info)) { - struct net_link *l_new_link = DAP_NEW_Z(struct net_link); - l_new_link->link_info = l_link_node_info; - l_net_pvt->net_links = dap_list_append(l_net_pvt->net_links, l_new_link); - if(dap_list_length(l_net_pvt->net_links) >= s_max_links_count) { - - break; - } - } - else { - DAP_DELETE(l_link_node_info); - } + s_net_link_add(l_net, l_link_node_info); + DAP_DEL_Z(l_link_node_info); } } else { log_it(L_WARNING,"No nodeinfo in global_db to prepare links for connecting, try to add links from root servers"); } - if (l_net_pvt->only_static_links) { - if (l_net_pvt->seed_aliases_count) { - // Add other root nodes as synchronization links - s_fill_links_from_root_aliases(l_net); - l_net_pvt->state = NET_STATE_LINKS_CONNECTING; - l_repeat_after_exit = true; - break; - } - } else { - if (!l_net_pvt->seed_aliases_count && ! l_net_pvt->bootstrap_nodes_count){ - log_it(L_ERROR, "No root servers present in configuration file. Can't establish DNS requests"); - if (l_net_pvt->net_links) { // We have other links - l_net_pvt->state = NET_STATE_LINKS_CONNECTING; - l_repeat_after_exit = true; - } - break; - } - // Get DNS request result from root nodes as synchronization links - bool l_sync_fill_root_nodes = false; - if (!l_sync_fill_root_nodes) { - // Get list of the unique links for l_net - dap_chain_node_info_list_t *l_node_list = s_get_links(l_net); - // Start connect to links from list - dap_chain_node_info_list_t *l_node_list_cur = l_node_list; - while(l_node_list_cur) { - dap_chain_node_info_t *l_link_node_info = (dap_chain_node_info_t*)l_node_list_cur->data; - char l_node_addr_str[INET_ADDRSTRLEN] = { }; - inet_ntop(AF_INET, &l_link_node_info->hdr.ext_addr_v4, l_node_addr_str, INET_ADDRSTRLEN); - log_it(L_DEBUG, "Start DNS request to %s", l_node_addr_str); - if(!s_start_dns_request(l_net, l_link_node_info)) - { - DAP_DEL_Z(l_link_node_info); - } - l_node_list_cur = dap_list_next(l_node_list_cur); - } - dap_chain_node_info_list_free(l_node_list); - } else { - log_it(L_ATT, "Not use bootstrap addresses, fill seed nodelist from root aliases"); - s_fill_links_from_root_aliases(l_net); - } + if (!l_net_pvt->seed_aliases_count && ! l_net_pvt->bootstrap_nodes_count){ + log_it(L_ERROR, "No root servers present in configuration file. Can't establish DNS requests"); + if (l_net_pvt->net_links) { // We have other links + l_net_pvt->state = NET_STATE_LINKS_CONNECTING; + l_repeat_after_exit = true; + } + break; + } + // Get DNS request result from root nodes as synchronization links + if (!l_net_pvt->only_static_links) + s_prepare_links_from_balancer(l_net, false); + else { + log_it(L_ATT, "Not use bootstrap addresses, fill seed nodelist from root aliases"); + // Add other root nodes as synchronization links + s_fill_links_from_root_aliases(l_net); + l_net_pvt->state = NET_STATE_LINKS_CONNECTING; + l_repeat_after_exit = true; + break; } } break; case NET_STATE_LINKS_CONNECTING: { log_it(L_INFO, "%s.state: NET_STATE_LINKS_CONNECTING",l_net->pub.name); size_t l_used_links = 0; - for (dap_list_t *l_tmp = l_net_pvt->net_links; l_tmp; l_tmp = dap_list_next(l_tmp)) { - dap_chain_node_info_t *l_link_info = ((struct net_link *)l_tmp->data)->link_info; + struct net_link *l_link, *l_link_tmp; + HASH_ITER(hh, l_net_pvt->net_links, l_link, l_link_tmp) { + dap_chain_node_info_t *l_link_info = l_link->link_info; dap_chain_node_client_t *l_client = dap_chain_net_client_create_n_connect(l_net, l_link_info); - ((struct net_link *)l_tmp->data)->link = l_client; - ((struct net_link *)l_tmp->data)->client_uuid = l_client->uuid; - if (++l_used_links == s_required_links_count) + l_link->link = l_client; + l_link->client_uuid = l_client->uuid; + if (++l_used_links == l_net_pvt->required_links_count) break; } } break; @@ -1396,18 +1372,12 @@ int s_net_list_compare_uuids(const void *a_uuid1, const void *a_uuid2) bool dap_chain_net_sync_trylock(dap_chain_net_t *a_net, dap_chain_node_client_t *a_client) { dap_chain_net_pvt_t *l_net_pvt = PVT(a_net); - pthread_rwlock_wrlock(&l_net_pvt->uplinks_lock); - bool l_not_found = dap_chain_net_sync_trylock_nolock(a_net, a_client); - pthread_rwlock_unlock(&l_net_pvt->uplinks_lock); - return l_not_found; -} - -bool dap_chain_net_sync_trylock_nolock(dap_chain_net_t *a_net, dap_chain_node_client_t *a_client) { - dap_chain_net_pvt_t *l_net_pvt = PVT(a_net); + int a_err = pthread_rwlock_wrlock(&l_net_pvt->uplinks_lock); bool l_found = false; if (l_net_pvt->active_link) { - for (dap_list_t *l_links = l_net_pvt->net_links; l_links; l_links = dap_list_next(l_links)) { - dap_chain_node_client_t *l_client = ((struct net_link *)l_links->data)->link; + struct net_link *l_link, *l_link_tmp; + HASH_ITER(hh, l_net_pvt->net_links, l_link, l_link_tmp) { + dap_chain_node_client_t *l_client = l_link->link; if (l_client == l_net_pvt->active_link && l_client->state >= NODE_CLIENT_STATE_ESTABLISHED && l_client->state < NODE_CLIENT_STATE_SYNCED && @@ -1424,6 +1394,8 @@ bool dap_chain_net_sync_trylock_nolock(dap_chain_net_t *a_net, dap_chain_node_cl dap_events_socket_uuid_t *l_uuid = DAP_DUP(&a_client->uuid); l_net_pvt->links_queue = dap_list_append(l_net_pvt->links_queue, l_uuid); } + if (a_err != EDEADLK) + pthread_rwlock_unlock(&l_net_pvt->uplinks_lock); return !l_found; } @@ -1438,7 +1410,7 @@ bool dap_chain_net_sync_unlock(dap_chain_net_t *a_net, dap_chain_node_client_t * l_net_pvt->active_link = NULL; while (l_net_pvt->active_link == NULL && l_net_pvt->links_queue) { dap_events_socket_uuid_t *l_uuid = l_net_pvt->links_queue->data; - dap_chain_node_sync_status_t l_status = dap_chain_node_client_start_sync(l_uuid, false); + dap_chain_node_sync_status_t l_status = dap_chain_node_client_start_sync(l_uuid); if (l_status != NODE_SYNC_STATUS_WAITING) { DAP_DELETE(l_net_pvt->links_queue->data); dap_list_t *l_to_remove = l_net_pvt->links_queue; @@ -1644,8 +1616,8 @@ void s_set_reply_text_node_status(char **a_str_reply, dap_chain_net_t * a_net){ char* l_sync_current_link_text_block = NULL; if (PVT(a_net)->state != NET_STATE_OFFLINE) l_sync_current_link_text_block = dap_strdup_printf(", active links %u from %u", - PVT(a_net)->links_connected_count, - dap_list_length(PVT(a_net)->net_links)); + s_net_get_active_links_count(a_net), + HASH_COUNT(PVT(a_net)->net_links)); dap_cli_server_cmd_set_reply_text(a_str_reply, "Network \"%s\" has state %s (target state %s)%s%s", a_net->pub.name, c_net_states[PVT(a_net)->state], @@ -1975,12 +1947,13 @@ static int s_cli_net(int argc, char **argv, char **a_str_reply) if ( strcmp(l_links_str,"list") == 0 ) { size_t i =0; dap_chain_net_pvt_t * l_net_pvt = PVT(l_net); - pthread_rwlock_rdlock(&l_net_pvt->uplinks_lock); - size_t l_links_count = dap_list_length(l_net_pvt->net_links); + pthread_rwlock_rdlock(&l_net_pvt->uplinks_lock); + size_t l_links_count = HASH_COUNT(l_net_pvt->net_links); dap_string_t *l_reply = dap_string_new(""); dap_string_append_printf(l_reply,"Links %zu:\n", l_links_count); - for (dap_list_t * l_item = l_net_pvt->net_links; l_item; l_item = l_item->next ) { - dap_chain_node_client_t *l_node_client = ((struct net_link *)l_item->data)->link; + struct net_link *l_link, *l_link_tmp; + HASH_ITER(hh, l_net_pvt->net_links, l_link, l_link_tmp) { + dap_chain_node_client_t *l_node_client = l_link->link; if(l_node_client){ dap_chain_node_info_t * l_info = l_node_client->info; char l_ext_addr_v4[INET_ADDRSTRLEN]={}; @@ -2225,7 +2198,7 @@ void s_main_timer_callback(void *a_arg) dap_chain_net_pvt_t *l_net_pvt = PVT(l_net); if (l_net_pvt->state_target == NET_STATE_ONLINE && l_net_pvt->state >= NET_STATE_LINKS_ESTABLISHED && - !l_net_pvt->links_connected_count) // restart network + !s_net_get_active_links_count(l_net)) // restart network dap_chain_net_start(l_net); } @@ -2380,6 +2353,11 @@ int s_net_load(const char * a_net_name, uint16_t a_acl_idx) uint16_t l_bootstrap_nodes_len = 0; char **l_bootstrap_nodes = dap_config_get_array_str(l_cfg, "general", "bootstrap_hostnames", &l_bootstrap_nodes_len); + // maximum number of prepared connections to other nodes + l_net_pvt->max_links_count = dap_config_get_item_int16_default(l_cfg, "general", "max_links", 5); + // required number of active connections to other nodes + l_net_pvt->required_links_count = dap_config_get_item_int16_default(l_cfg, "general", "require_links", 3); + const char * l_node_addr_type = dap_config_get_item_str_default(l_cfg , "general" ,"node_addr_type","auto"); const char * l_node_addr_str = NULL; @@ -2737,7 +2715,7 @@ int s_net_load(const char * a_net_name, uint16_t a_acl_idx) for ( size_t i = 0; i< l_proc_chains_count ; i++){ dap_chain_id_t l_chain_id = {{0}}; if (dap_sscanf( l_proc_chains[i], "0x%16"DAP_UINT64_FORMAT_X, &l_chain_id.uint64) ==1 || - dap_scanf("0x%16"DAP_UINT64_FORMAT_x, &l_chain_id.uint64) == 1) { + dap_sscanf(l_proc_chains[i], "0x%16"DAP_UINT64_FORMAT_x, &l_chain_id.uint64) == 1) { dap_chain_t * l_chain = dap_chain_find_by_id(l_net->pub.id, l_chain_id ); if ( l_chain ){ l_chain->is_datum_pool_proc = true; diff --git a/modules/net/dap_chain_node.c b/modules/net/dap_chain_node.c index 6aa7e06e55..d830a5feb9 100644 --- a/modules/net/dap_chain_node.c +++ b/modules/net/dap_chain_node.c @@ -288,53 +288,3 @@ bool dap_chain_node_mempool_autoproc_init() void dap_chain_node_mempool_autoproc_deinit() { } - - -/** - * @brief Find a_node_info in the a_node_list - */ -bool dap_chain_node_info_list_is_added(dap_chain_node_info_list_t *a_node_list, dap_chain_node_info_t *a_node_info) -{ - while(a_node_list) { - dap_chain_node_info_t *l_node_info = a_node_list->data; - if(dap_chain_node_info_addr_match(l_node_info, a_node_info)) { - return true; - } - a_node_list = dap_list_next(a_node_list); - } - return false; -} - -/** - * @brief Add a_node_info to the a_node_list - */ -dap_chain_node_info_list_t* dap_chain_node_info_list_add(dap_chain_node_info_list_t *a_node_list, dap_chain_node_info_t *a_node_info) -{ - return dap_list_prepend(a_node_list, a_node_info); -} - -/** - * @brief Remove a_node_info from the a_node_list - */ -dap_chain_node_info_list_t* dap_chain_node_info_list_del(dap_chain_node_info_list_t *a_node_list, dap_chain_node_info_t *a_node_info) -{ - dap_chain_node_info_list_t *l_node_link = dap_list_find(a_node_list, a_node_info); - return dap_list_remove_link(a_node_list, l_node_link); -} - -//static void s_chain_node_info_callback_destroyed(void* data) -//{ -// dap_chain_node_info_t *l_link_node_info = (dap_chain_node_info_t*)data; -// DAP_DELETE(l_link_node_info); -//} - -/** - * @brief Free a_node_list - */ - -void dap_chain_node_info_list_free(dap_chain_node_info_list_t *a_node_list) -{ - dap_list_free(a_node_list); - //dap_list_free_full(a_node_list, s_chain_node_info_callback_destroyed); -} - diff --git a/modules/net/dap_chain_node_client.c b/modules/net/dap_chain_node_client.c index 74acc571aa..5e574edbf9 100644 --- a/modules/net/dap_chain_node_client.c +++ b/modules/net/dap_chain_node_client.c @@ -221,7 +221,7 @@ static void s_node_client_connected_synchro_start_callback(dap_worker_t *a_worke * @param a_wrlock * @return */ -dap_chain_node_sync_status_t dap_chain_node_client_start_sync(dap_events_socket_uuid_t *a_uuid, bool a_wrlock) +dap_chain_node_sync_status_t dap_chain_node_client_start_sync(dap_events_socket_uuid_t *a_uuid) { dap_chain_node_client_handle_t *l_client_found = NULL; HASH_FIND(hh, s_clients, a_uuid, sizeof(*a_uuid), l_client_found); @@ -253,8 +253,7 @@ dap_chain_node_sync_status_t dap_chain_node_client_start_sync(dap_events_socket_ // If we do nothing - init sync process if (l_ch_chain->state == CHAIN_STATE_IDLE) { - bool l_trylocked = a_wrlock ? dap_chain_net_sync_trylock(l_net, l_node_client) - : dap_chain_net_sync_trylock_nolock(l_net, l_node_client); + bool l_trylocked = dap_chain_net_sync_trylock(l_net, l_node_client); if (l_trylocked) { log_it(L_INFO, "Start synchronization process with "NODE_ADDR_FP_STR, NODE_ADDR_FP_ARGS_S(l_node_client->remote_node_addr)); dap_stream_ch_chain_sync_request_t l_sync_gdb = {}; @@ -285,7 +284,7 @@ static bool s_timer_update_states_callback(void *a_arg) { dap_events_socket_uuid_t *l_uuid = (dap_events_socket_uuid_t *)a_arg; assert(l_uuid); - dap_chain_node_sync_status_t l_status = dap_chain_node_client_start_sync(l_uuid, true); + dap_chain_node_sync_status_t l_status = dap_chain_node_client_start_sync(l_uuid); if (l_status == NODE_SYNC_STATUS_MISSING) { DAP_DELETE(l_uuid); return false; diff --git a/modules/net/dap_chain_node_dns_client.c b/modules/net/dap_chain_node_dns_client.c index 12fd553cd7..a6863afee7 100644 --- a/modules/net/dap_chain_node_dns_client.c +++ b/modules/net/dap_chain_node_dns_client.c @@ -37,9 +37,8 @@ struct dns_client struct in_addr addr; uint16_t port; char *name; - dap_dns_buf_t * dns_request; - byte_t * buf; - size_t buf_size; + dap_dns_buf_t dns_request; + byte_t buf[1024]; dap_dns_client_node_info_request_success_callback_t callback_success; dap_dns_client_node_info_request_error_callback_t callback_error; @@ -68,7 +67,7 @@ static void s_dns_client_esocket_read_callback(dap_events_socket_t * a_esocket, size_t l_addr_point = DNS_HEADER_SIZE + strlen(l_dns_client->name) + 2 + 2 * sizeof(uint16_t) + DNS_ANSWER_SIZE - sizeof(uint32_t); if (l_recieved < l_addr_point + sizeof(uint32_t)) { log_it(L_WARNING, "DNS answer incomplete"); - l_dns_client->callback_error(a_esocket->context->worker, l_dns_client->result,l_dns_client->callbacks_arg,EIO ); + l_dns_client->callback_error(a_esocket->worker, l_dns_client->callbacks_arg, EIO); l_dns_client->is_callbacks_called = true; a_esocket->flags |= DAP_SOCK_SIGNAL_CLOSE; a_esocket->buf_in_size = a_esocket->buf_out_size = 0; @@ -78,27 +77,23 @@ static void s_dns_client_esocket_read_callback(dap_events_socket_t * a_esocket, int l_answers_count = ntohs(*(uint16_t *)l_cur); if (l_answers_count != 1) { log_it(L_WARNING, "Incorrect DNS answer format"); - l_dns_client->callback_error(a_esocket->context->worker, l_dns_client->result,l_dns_client->callbacks_arg,EINVAL); + l_dns_client->callback_error(a_esocket->context->worker, l_dns_client->callbacks_arg, EINVAL); l_dns_client->is_callbacks_called = true; a_esocket->flags |= DAP_SOCK_SIGNAL_CLOSE; a_esocket->buf_in_size = a_esocket->buf_out_size = 0; return; } l_cur = l_buf + l_addr_point; - if ( l_dns_client->result) { - l_dns_client->result->hdr.ext_addr_v4.s_addr = ntohl(*(uint32_t *)l_cur); - } + + dap_chain_node_info_t l_result = {}; + l_result.hdr.ext_addr_v4.s_addr = ntohl(*(uint32_t *)l_cur); l_cur = l_buf + 5 * sizeof(uint16_t); int l_additions_count = ntohs(*(uint16_t *)l_cur); if (l_additions_count == 1) { l_cur = l_buf + l_addr_point + DNS_ANSWER_SIZE; - if (l_dns_client->result) { - l_dns_client->result->hdr.ext_port = ntohs(*(uint16_t *)l_cur); - } + l_result.hdr.ext_port = ntohs(*(uint16_t *)l_cur); l_cur += sizeof(uint16_t); - if (l_dns_client->result) { - l_dns_client->result->hdr.address.uint64 = be64toh(*(uint64_t *)l_cur); - } + l_result.hdr.address.uint64 = be64toh(*(uint64_t *)l_cur); } l_dns_client->callback_success(a_esocket->context->worker,l_dns_client->result,l_dns_client->callbacks_arg); @@ -116,7 +111,7 @@ static void s_dns_client_esocket_error_callback(dap_events_socket_t * a_esocket, { struct dns_client * l_dns_client = (struct dns_client*) a_esocket->_inheritor; log_it(L_ERROR,"DNS client esocket error %d", a_error); - l_dns_client->callback_error(a_esocket->context->worker, l_dns_client->result,l_dns_client->callbacks_arg,a_error); + l_dns_client->callback_error(a_esocket->context->worker, l_dns_client->callbacks_arg, a_error); l_dns_client->is_callbacks_called = true; } @@ -141,7 +136,7 @@ static bool s_dns_client_esocket_timeout_callback(void * a_arg) struct dns_client * l_dns_client = (struct dns_client*) l_es->_inheritor; log_it(L_WARNING,"DNS request timeout, bad network?"); if(! l_dns_client->is_callbacks_called ){ - l_dns_client->callback_error(l_es->context->worker,l_dns_client->result,l_dns_client->callbacks_arg,ETIMEDOUT); + l_dns_client->callback_error(l_es->context->worker, l_dns_client->callbacks_arg, ETIMEDOUT); l_dns_client->is_callbacks_called = true; } dap_events_socket_remove_and_delete_unsafe( l_es, false); @@ -160,10 +155,9 @@ static void s_dns_client_esocket_delete_callback(dap_events_socket_t * a_esocket (void) a_arg; struct dns_client * l_dns_client = (struct dns_client*) a_esocket->_inheritor; if(! l_dns_client->is_callbacks_called ) - l_dns_client->callback_error(a_esocket->context->worker,l_dns_client->result,l_dns_client->callbacks_arg,EBUSY); + l_dns_client->callback_error(a_esocket->context->worker, l_dns_client->callbacks_arg, EBUSY); if(l_dns_client->name) DAP_DELETE(l_dns_client->name); - DAP_DEL_Z(l_dns_client->buf); } /** @@ -174,7 +168,7 @@ static void s_dns_client_esocket_delete_callback(dap_events_socket_t * a_esocket static void s_dns_client_esocket_worker_assign_callback(dap_events_socket_t * a_esocket, dap_worker_t * a_worker) { struct dns_client * l_dns_client = (struct dns_client*) a_esocket->_inheritor; - dap_events_socket_write_unsafe(a_esocket,l_dns_client->dns_request->data, l_dns_client->dns_request->size ); + dap_events_socket_write_unsafe(a_esocket,l_dns_client->dns_request.data, l_dns_client->dns_request.size ); dap_events_socket_uuid_t * l_es_uuid_ptr = DAP_NEW_Z(dap_events_socket_uuid_t); *l_es_uuid_ptr = a_esocket->uuid; @@ -193,7 +187,7 @@ static void s_dns_client_esocket_worker_assign_callback(dap_events_socket_t * a_ * @param a_callback_error * @param a_callbacks_arg */ -int dap_chain_node_info_dns_request(struct in_addr a_addr, uint16_t a_port, char *a_name, dap_chain_node_info_t *a_result, +int dap_chain_node_info_dns_request(struct in_addr a_addr, uint16_t a_port, char *a_name, dap_dns_client_node_info_request_success_callback_t a_callback_success, dap_dns_client_node_info_request_error_callback_t a_callback_error,void * a_callbacks_arg) { @@ -207,47 +201,30 @@ int dap_chain_node_info_dns_request(struct in_addr a_addr, uint16_t a_port, char l_dns_client->callback_success = a_callback_success; l_dns_client->callbacks_arg = a_callbacks_arg; l_dns_client->addr = a_addr; + dap_dns_buf_init(&l_dns_client->dns_request, (char *)l_dns_client->buf); + dap_dns_buf_put_uint16(&l_dns_client->dns_request, rand() % 0xFFFF); // ID - l_dns_client->buf_size = 1024; - l_dns_client->buf = DAP_NEW_Z_SIZE(byte_t,l_dns_client->buf_size); - if (!l_dns_client->buf){ - DAP_DELETE(l_dns_client); - return -2; - } - l_dns_client->dns_request = DAP_NEW_Z(dap_dns_buf_t); - if( ! l_dns_client->dns_request){ - DAP_DELETE(l_dns_client->buf); - DAP_DELETE(l_dns_client); - return -3; - } - l_dns_client->dns_request->data = (char *)l_dns_client->buf; - l_dns_client->result = a_result; - dap_dns_buf_put_uint16(l_dns_client->dns_request, rand() % 0xFFFF); // ID dap_dns_message_flags_t l_flags = {}; - dap_dns_buf_put_uint16(l_dns_client->dns_request, l_flags.val); - dap_dns_buf_put_uint16(l_dns_client->dns_request, 1); // we have only 1 question - dap_dns_buf_put_uint16(l_dns_client->dns_request, 0); - dap_dns_buf_put_uint16(l_dns_client->dns_request, 0); - dap_dns_buf_put_uint16(l_dns_client->dns_request, 0); - size_t l_ptr = 0; + dap_dns_buf_put_uint16(&l_dns_client->dns_request, l_flags.val); + dap_dns_buf_put_uint16(&l_dns_client->dns_request, 1); // we have only 1 question + dap_dns_buf_put_uint16(&l_dns_client->dns_request, 0); + dap_dns_buf_put_uint16(&l_dns_client->dns_request, 0); + dap_dns_buf_put_uint16(&l_dns_client->dns_request, 0); - uint8_t *l_cur = l_dns_client->buf + l_dns_client->dns_request->size; - for (size_t i = 0; i <= strlen(a_name); i++) - { - if (a_name[i] == '.' || a_name[i] == 0) - { + size_t l_ptr = 0; + uint8_t *l_cur = l_dns_client->buf + l_dns_client->dns_request.size; + size_t l_name_len = strlen(a_name); + for (size_t i = 0; i <= l_name_len; i++) + if (a_name[i] == '.' || a_name[i] == 0) { *l_cur++ = i - l_ptr; - for( ; l_ptr < i; l_ptr++) - { - *l_cur++ = a_name[l_ptr]; - } + while (l_ptr < i) + *l_cur++ = a_name[l_ptr++]; l_ptr++; } - } *l_cur++='\0'; - l_dns_client->dns_request->size = l_cur - l_dns_client->buf; - dap_dns_buf_put_uint16(l_dns_client->dns_request, DNS_RECORD_TYPE_A); - dap_dns_buf_put_uint16(l_dns_client->dns_request, DNS_CLASS_TYPE_IN); + l_dns_client->dns_request.size = l_cur - l_dns_client->buf; + dap_dns_buf_put_uint16(&l_dns_client->dns_request, DNS_RECORD_TYPE_A); + dap_dns_buf_put_uint16(&l_dns_client->dns_request, DNS_CLASS_TYPE_IN); dap_events_socket_callbacks_t l_esocket_callbacks={}; diff --git a/modules/net/include/dap_chain_net.h b/modules/net/include/dap_chain_net.h index a1fd95757f..ce2a975d44 100644 --- a/modules/net/include/dap_chain_net.h +++ b/modules/net/include/dap_chain_net.h @@ -134,7 +134,6 @@ bool dap_chain_net_get_flag_sync_from_zero( dap_chain_net_t * a_net); bool dap_chain_net_sync_trylock(dap_chain_net_t *a_net, dap_chain_node_client_t *a_client); bool dap_chain_net_sync_unlock(dap_chain_net_t *a_net, dap_chain_node_client_t *a_client); -bool dap_chain_net_sync_trylock_nolock(dap_chain_net_t* a_net, dap_chain_node_client_t* a_client); dap_chain_net_t * dap_chain_net_by_name( const char * a_name); dap_chain_net_t * dap_chain_net_by_id( dap_chain_net_id_t a_id); diff --git a/modules/net/include/dap_chain_node.h b/modules/net/include/dap_chain_node.h index 99327817e4..aa9f8f9068 100644 --- a/modules/net/include/dap_chain_node.h +++ b/modules/net/include/dap_chain_node.h @@ -101,14 +101,6 @@ typedef struct dap_chain_node_publ{ dap_chain_node_info_t node_info; } DAP_ALIGN_PACKED dap_chain_node_publ_t; - -typedef dap_list_t dap_chain_node_info_list_t; - -#define DAP_CHAIN_NODE_MEMPOOL_INTERVAL 1000 // milliseconds - - - - /** * Calculate size of struct dap_chain_node_info_t */ @@ -169,22 +161,3 @@ bool dap_chain_node_mempool_process(dap_chain_t *a_chain, dap_chain_datum_t *a_d void dap_chain_node_mempool_process_all(dap_chain_t *a_chain); bool dap_chain_node_mempool_autoproc_init(); void dap_chain_node_mempool_autoproc_deinit(); - -/** - * @brief Find a_node_info in the a_node_list - */ -bool dap_chain_node_info_list_is_added(dap_chain_node_info_list_t *a_node_list, dap_chain_node_info_t *a_node_info); - -/** - * @brief Add a_node_info to the a_node_list - */ -dap_chain_node_info_list_t* dap_chain_node_info_list_add(dap_chain_node_info_list_t *a_node_list, dap_chain_node_info_t *a_node_info); -/** - * @brief Remove a_node_info from the a_node_list - */ -dap_chain_node_info_list_t* dap_chain_node_info_list_del(dap_chain_node_info_list_t *a_node_list, dap_chain_node_info_t *a_node_info); - -/** - * @brief Free a_node_list - */ -void dap_chain_node_info_list_free(dap_chain_node_info_list_t *a_node_list); diff --git a/modules/net/include/dap_chain_node_client.h b/modules/net/include/dap_chain_node_client.h index 6dc818ca33..b8d06075cf 100644 --- a/modules/net/include/dap_chain_node_client.h +++ b/modules/net/include/dap_chain_node_client.h @@ -188,7 +188,7 @@ int dap_chain_node_client_set_callbacks(dap_client_t *a_client, uint8_t a_ch_id) int dap_chain_node_client_send_nodelist_req(dap_chain_node_client_t *a_client); -dap_chain_node_sync_status_t dap_chain_node_client_start_sync(dap_events_socket_uuid_t *l_uuid, bool a_wrlock); +dap_chain_node_sync_status_t dap_chain_node_client_start_sync(dap_events_socket_uuid_t *l_uuid); static inline const char * dap_chain_node_client_state_to_str( dap_chain_node_client_state_t a_state) { diff --git a/modules/net/include/dap_chain_node_dns_client.h b/modules/net/include/dap_chain_node_dns_client.h index e64bcab355..ca5f719b00 100644 --- a/modules/net/include/dap_chain_node_dns_client.h +++ b/modules/net/include/dap_chain_node_dns_client.h @@ -37,10 +37,10 @@ typedef struct _dap_dns_buf_t { } dap_dns_buf_t; // node info request callbacks -typedef void (*dap_dns_client_node_info_request_success_callback_t) (dap_worker_t * a_worker, dap_chain_node_info_t * , void *); -typedef void (*dap_dns_client_node_info_request_error_callback_t) (dap_worker_t * a_worker, dap_chain_node_info_t * , void *, int); +typedef void (*dap_dns_client_node_info_request_success_callback_t) (dap_worker_t *a_worker, dap_chain_node_info_t *a_result, void *a_arg); +typedef void (*dap_dns_client_node_info_request_error_callback_t) (dap_worker_t *a_worker, void *a_arg, int a_errno); -int dap_chain_node_info_dns_request(struct in_addr a_addr, uint16_t a_port, char *a_name, dap_chain_node_info_t *a_result, +int dap_chain_node_info_dns_request(struct in_addr a_addr, uint16_t a_port, char *a_name, dap_dns_client_node_info_request_success_callback_t a_callback_success, dap_dns_client_node_info_request_error_callback_t a_callback_error,void * a_callback_arg); -- GitLab