diff --git a/CMakeLists.txt b/CMakeLists.txt index 08fce37d85e3ea2bb2beaa8bc14b5f617bc96753..16097f62b5258e20e82f34f257f9635b33081d92 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,7 +2,7 @@ cmake_minimum_required(VERSION 3.10) project(cellframe-sdk C) set(CMAKE_C_STANDARD 11) -set(CELLFRAME_SDK_NATIVE_VERSION "3.3-0") +set(CELLFRAME_SDK_NATIVE_VERSION "3.4-0") add_definitions ("-DCELLFRAME_SDK_VERSION=\"${CELLFRAME_SDK_NATIVE_VERSION}\"") diff --git a/dap-sdk b/dap-sdk index 40f0762e0de57244851ef19436bda14c56664628..b3ff1c9d8e1d1744c3d58d03c0fe7861115f27e6 160000 --- a/dap-sdk +++ b/dap-sdk @@ -1 +1 @@ -Subproject commit 40f0762e0de57244851ef19436bda14c56664628 +Subproject commit b3ff1c9d8e1d1744c3d58d03c0fe7861115f27e6 diff --git a/modules/channel/chain-net/dap_stream_ch_chain_net.c b/modules/channel/chain-net/dap_stream_ch_chain_net.c index f610520e9f9e422faeb5c6b98078f3f747839d5c..e760a2f5d6e54a231def678493c46c546890fb13 100644 --- a/modules/channel/chain-net/dap_stream_ch_chain_net.c +++ b/modules/channel/chain-net/dap_stream_ch_chain_net.c @@ -165,7 +165,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t *a_ch, void* a_arg) switch (l_ch_pkt->hdr.type) { case DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_ANNOUNCE: assert(!dap_stream_node_addr_is_blank(&a_ch->stream->node)); - dap_chain_net_add_cluster_link(l_net, &a_ch->stream->node); + dap_accounting_downlink_in_net(l_net->pub.id.uint64, &a_ch->stream->node); break; // received ping request - > send pong request case DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_PING: diff --git a/modules/net/dap_chain_ledger.c b/modules/net/dap_chain_ledger.c index 3ef15287c15b374a79647749a60861676c39b5fe..6d5ab8d9e6b5f41e0a4b740ddf513bd2ba71e76d 100644 --- a/modules/net/dap_chain_ledger.c +++ b/modules/net/dap_chain_ledger.c @@ -4743,6 +4743,7 @@ void dap_ledger_purge(dap_ledger_t *a_ledger, bool a_preserve_db) { dap_return_if_fail(a_ledger); dap_ledger_private_t *l_ledger_pvt = PVT(a_ledger); + pthread_rwlock_wrlock(&l_ledger_pvt->ledger_rwlock); pthread_rwlock_wrlock(&l_ledger_pvt->tokens_rwlock); pthread_rwlock_wrlock(&l_ledger_pvt->threshold_emissions_rwlock); diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index 378c735f68723998cfe3657f998e642ee52fb208..6cf246bbf22414b6704e17c05352c94ca7784c68 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -118,6 +118,8 @@ #include "dap_chain_node_net_ban_list.h" #include "dap_chain_cs_esbocs.h" #include "dap_chain_net_voting.h" +#include "dap_global_db_cluster.h" +#include "dap_link_manager.h" #include "dap_stream_cluster.h" #include <stdio.h> @@ -138,14 +140,6 @@ struct balancer_link_request { int link_replace_tries; }; -struct net_link { - uint64_t uplink_ip; - dap_chain_node_info_t *link_info; - dap_chain_node_client_t *link; - dap_timerfd_t *delay_timer; - UT_hash_handle hh; -}; - struct block_reward { uint64_t block_number; uint256_t reward; @@ -166,16 +160,7 @@ typedef struct dap_chain_net_pvt{ atomic_uint balancer_link_requests; bool balancer_http; - //Active synchronizing link - dap_chain_node_client_t *active_link; - dap_list_t *links_queue; // Links waiting for sync - - struct net_link *net_links; // Links HT bool only_static_links; - uint16_t required_links_count; - uint16_t max_links_count; - uint16_t reconnect_delay; // sec - bool load_mode; uint16_t permanent_links_count; @@ -187,12 +172,12 @@ typedef struct dap_chain_net_pvt{ uint16_t seed_nodes_count; struct sockaddr_in *seed_nodes_ipv4; struct sockaddr_in6 *seed_nodes_ipv6; // TODO + dap_stream_node_addr_t *seed_nodes_addrs; _Atomic(dap_chain_net_state_t) state, state_target; uint16_t acl_idx; // Main loop timer dap_interval_timer_t main_timer; - pthread_mutex_t uplinks_mutex; //Global DB clusters for different access groups. Notification with cluster contents changing dap_global_db_cluster_t *mempool_clusters; // List of chains mempools @@ -210,8 +195,8 @@ typedef struct dap_chain_net_item{ UT_hash_handle hh, hh2; } dap_chain_net_item_t; -#define PVT(a) ( (dap_chain_net_pvt_t *) (void*) a->pvt ) -#define PVT_S(a) ( (dap_chain_net_pvt_t *) (void*) a.pvt ) +#define PVT(a) ((dap_chain_net_pvt_t *)a->pvt) +#define PVT_S(a) ((dap_chain_net_pvt_t *)a.pvt) static dap_chain_net_item_t *s_net_items = NULL, *s_net_ids = NULL; @@ -231,18 +216,20 @@ static inline const char * dap_chain_net_state_to_str(dap_chain_net_state_t a_st } // Node link callbacks -static void s_node_link_callback_connected(dap_chain_node_client_t * a_node_client, void * a_arg); -static void s_node_link_callback_disconnected(dap_chain_node_client_t * a_node_client, void * a_arg); static void s_node_link_callback_stage(dap_chain_node_client_t * a_node_client,dap_client_stage_t a_stage, void * a_arg); -static void s_node_link_callback_error(dap_chain_node_client_t * a_node_client, int a_error, void * a_arg); -static void s_node_link_callback_delete(dap_chain_node_client_t * a_node_client, void * a_arg); - -static const dap_chain_node_client_callbacks_t s_node_link_callbacks = { - .connected = s_node_link_callback_connected, - .disconnected = s_node_link_callback_disconnected, - .stage = s_node_link_callback_stage, - .error = s_node_link_callback_error, - .delete = s_node_link_callback_delete + +static void s_link_manager_callback_connected(dap_link_t *a_link, uint64_t a_net_id); +static void s_link_manager_callback_error(dap_link_t *a_link, uint64_t a_net_id, int a_error); +static void s_link_manager_callback_disconnected(dap_link_t *a_link, uint64_t a_net_id, int a_links_count); +static int s_link_manager_fill_net_info(dap_link_t *a_link); +static void s_link_manager_link_request(uint64_t a_net_id); + +static const dap_link_manager_callbacks_t s_link_manager_callbacks = { + .connected = s_link_manager_callback_connected, + .disconnected = s_link_manager_callback_disconnected, + .error = s_link_manager_callback_error, + .fill_net_info = s_link_manager_fill_net_info, + .link_request = s_link_manager_link_request, }; // State machine switchs here @@ -260,9 +247,9 @@ static int s_net_try_online(dap_chain_net_t *a_net); static int s_cli_net(int argc, char ** argv, void **a_str_reply); static uint8_t *s_net_set_acl(dap_chain_hash_fast_t *a_pkey_hash); -static void s_prepare_links_from_balancer(dap_chain_net_t *a_net); static bool s_new_balancer_link_request(dap_chain_net_t *a_net, int a_link_replace_tries); + /** * @brief * init network settings from cellrame-node.cfg file @@ -377,7 +364,7 @@ char *dap_chain_net_get_gdb_group_acl(dap_chain_net_t *a_net) * @param a_new_state dap_chain_net_state_t new network state * @return int */ -int dap_chain_net_state_go_to(dap_chain_net_t * a_net, dap_chain_net_state_t a_new_state) +int dap_chain_net_state_go_to(dap_chain_net_t *a_net, dap_chain_net_state_t a_new_state) { if (PVT(a_net)->load_mode) { log_it(L_ERROR, "Can't change state of loading network '%s'", a_net->pub.name); @@ -389,11 +376,14 @@ int dap_chain_net_state_go_to(dap_chain_net_t * a_net, dap_chain_net_state_t a_n } PVT(a_net)->state_target = a_new_state; //PVT(a_net)->flags |= F_DAP_CHAIN_NET_SYNC_FROM_ZERO; // TODO set this flag according to -mode argument from command line - if(a_new_state == NET_STATE_ONLINE) + if(a_new_state == NET_STATE_ONLINE) { dap_chain_esbocs_start_timer(a_net->pub.id); + dap_link_manager_set_net_status(a_net->pub.id.uint64, true); + } if (a_new_state == NET_STATE_OFFLINE){ dap_chain_esbocs_stop_timer(a_net->pub.id); + dap_link_manager_set_net_status(a_net->pub.id.uint64, false); return 0; } @@ -408,263 +398,93 @@ dap_chain_net_state_t dap_chain_net_get_target_state(dap_chain_net_t *a_net) dap_chain_node_info_t *dap_chain_net_balancer_link_from_cfg(dap_chain_net_t *a_net) { +// sanity check + dap_return_val_if_pass_err(!a_net || !PVT(a_net) || !PVT(a_net)->seed_nodes_count, NULL, "No valid balancer links found"); +// memory alloc + dap_chain_node_info_t *l_link_node_info = NULL; + DAP_NEW_Z_RET_VAL(l_link_node_info, dap_chain_node_info_t, NULL, NULL); +// func work dap_chain_net_pvt_t *l_net_pvt = PVT(a_net); - struct in_addr l_addr = { }; - uint16_t i, l_port = 0; - if (l_net_pvt->seed_nodes_count) { - i = dap_random_uint16() % l_net_pvt->seed_nodes_count; - l_addr = l_net_pvt->seed_nodes_ipv4[i].sin_addr; - l_port = l_net_pvt->seed_nodes_ipv4[i].sin_port; - } else { - log_it(L_ERROR, "No valid balancer links found"); - return NULL; - } - dap_chain_node_info_t *l_link_node_info = DAP_NEW_Z(dap_chain_node_info_t); - if(! l_link_node_info){ - log_it(L_CRITICAL,"Can't allocate memory for node link info"); - return NULL; - } - l_link_node_info->hdr.ext_addr_v4 = l_addr; - l_link_node_info->hdr.ext_port = l_port; + uint16_t i = dap_random_uint16() % l_net_pvt->seed_nodes_count; + l_link_node_info->hdr.ext_addr_v4 = l_net_pvt->seed_nodes_ipv4[i].sin_addr; + l_link_node_info->hdr.ext_port = l_net_pvt->seed_nodes_ipv4[i].sin_port; return l_link_node_info; } -void dap_chain_net_add_cluster_link(dap_chain_net_t *a_net, dap_stream_node_addr_t *a_node_addr) -{ - dap_return_if_fail(a_net && a_node_addr); - dap_cluster_t *l_links_cluster = dap_cluster_by_mnemonim(a_net->pub.name); - if (l_links_cluster) - dap_cluster_member_add(l_links_cluster, a_node_addr, 0, NULL); - else - log_it(L_ERROR, "Not found links cluster for net %s", a_net->pub.name); -} - -/** - * @brief Check if the current link is already present or not - * - * @param a_net Network - * @param a_link_node_info Node info - */ -static struct net_link *s_net_link_find(dap_chain_net_t *a_net, dap_chain_node_info_t *a_link_node_info) -{ - uint64_t l_addr = a_link_node_info->hdr.ext_addr_v4.s_addr; - struct net_link *l_present; - pthread_mutex_lock(&PVT(a_net)->uplinks_mutex); - HASH_FIND(hh, PVT(a_net)->net_links, &l_addr, sizeof(l_addr), l_present); - pthread_mutex_unlock(&PVT(a_net)->uplinks_mutex); - return l_present; -} - -static int s_net_link_add(dap_chain_net_t *a_net, dap_chain_node_info_t *a_link_node_info) -{ - if (!a_link_node_info) - return -1; - dap_chain_net_pvt_t *l_pvt_net = PVT(a_net); - pthread_mutex_lock(&l_pvt_net->uplinks_mutex); - if (HASH_COUNT(l_pvt_net->net_links) >= PVT(a_net)->max_links_count) { - pthread_mutex_unlock(&l_pvt_net->uplinks_mutex); - return 1; - } - uint64_t l_own_addr = dap_chain_net_get_cur_addr_int(a_net); - if (a_link_node_info->hdr.address.uint64 == l_own_addr) { - pthread_mutex_unlock(&l_pvt_net->uplinks_mutex); - return -2; - } - uint64_t l_addr = a_link_node_info->hdr.ext_addr_v4.s_addr; - struct net_link *l_new_link; - HASH_FIND(hh, l_pvt_net->net_links, &l_addr, sizeof(l_addr), l_new_link); - if (l_new_link) { - pthread_mutex_unlock(&l_pvt_net->uplinks_mutex); - return -3; - } - l_new_link = DAP_NEW_Z(struct net_link); - if (!l_new_link) { - log_it(L_CRITICAL, "Memory allocation error"); - pthread_mutex_unlock(&PVT(a_net)->uplinks_mutex); - return -4; - } - l_new_link->link_info = DAP_DUP(a_link_node_info); - l_new_link->uplink_ip = a_link_node_info->hdr.ext_addr_v4.s_addr; - HASH_ADD(hh, l_pvt_net->net_links, uplink_ip, sizeof(l_new_link->uplink_ip), l_new_link); - pthread_mutex_unlock(&l_pvt_net->uplinks_mutex); - return 0; -} - -static void s_net_link_remove(dap_chain_net_pvt_t *a_net_pvt, dap_chain_node_client_t *a_link, bool a_rebase) -{ - struct net_link *l_link = NULL, *l_link_tmp = NULL, *l_link_found = NULL; - HASH_ITER(hh, a_net_pvt->net_links, l_link, l_link_tmp) { - if (l_link->link == a_link) { - l_link_found = l_link; - break; - } - } - if (!l_link_found) { - log_it(L_WARNING, "Can't find link %p to remove it from links HT", a_link); - return; - } - HASH_DEL(a_net_pvt->net_links, l_link_found); - if (l_link_found->delay_timer) { - dap_timerfd_delete_mt(l_link_found->delay_timer->worker, l_link_found->delay_timer->esocket_uuid); - l_link_found->delay_timer = NULL; - } - dap_chain_node_client_t *l_client = l_link_found->link; - a_net_pvt->links_queue = dap_list_remove_all(a_net_pvt->links_queue, l_client); - if (a_rebase) { - l_link_found->link = NULL; - // Add it to the list end - HASH_ADD(hh, a_net_pvt->net_links, uplink_ip, sizeof(l_link_found->uplink_ip), l_link_found); - } else { - DAP_DEL_Z(l_link_found->link_info); - DAP_DELETE(l_link_found); - } -} - -static size_t s_net_get_active_links_count(dap_chain_net_t * a_net) -{ - int l_ret = 0; - struct net_link *l_link, *l_link_tmp; - HASH_ITER(hh, PVT(a_net)->net_links, l_link, l_link_tmp) - if (l_link->link) - l_ret++; - return l_ret; -} - -static struct net_link *s_get_free_link(dap_chain_net_t *a_net) -{ - struct net_link *l_link, *l_link_tmp; - HASH_ITER(hh, PVT(a_net)->net_links, l_link, l_link_tmp) { - if (l_link->link == NULL) // We have a free prepared link - return l_link; - } - return NULL; -} - -static bool s_net_link_callback_connect_delayed(void *a_arg) -{ - struct net_link *l_link = a_arg; - dap_chain_node_client_t *l_client = l_link->link; - log_it(L_MSG, "Connecting to link "NODE_ADDR_FP_STR" [%s]", - NODE_ADDR_FP_ARGS_S(l_client->info->hdr.address), inet_ntoa(l_client->info->hdr.ext_addr_v4)); - dap_chain_node_client_connect(l_client, "CGND"); - l_link->delay_timer = NULL; - return false; -} - -static bool s_net_link_start(dap_chain_net_t *a_net, struct net_link *a_link, uint16_t a_delay) -{ - assert(a_net && a_link); - dap_chain_node_info_t *l_link_info = a_link->link_info; - dap_chain_node_client_t *l_client = dap_chain_node_client_create(a_net, l_link_info, &s_node_link_callbacks, a_net); - if (l_client) - l_client->keep_connection = true; - else - return false; - a_link->link = l_client; - if (a_delay) { - a_link->delay_timer = dap_timerfd_start(a_delay * 1000, s_net_link_callback_connect_delayed, a_link); - return true; - } - log_it(L_MSG, "Connecting to link "NODE_ADDR_FP_STR" [%s]", NODE_ADDR_FP_ARGS_S(l_link_info->hdr.address), inet_ntoa(l_link_info->hdr.ext_addr_v4)); - return dap_chain_node_client_connect(l_client, "CGND"); -} - /** * @brief s_fill_links_from_root_aliases * @param a_net */ static void s_fill_links_from_root_aliases(dap_chain_net_t *a_net) { +// sanity check + dap_return_if_pass(!a_net); +// func work dap_chain_net_pvt_t *l_net_pvt = PVT(a_net); + dap_link_t *l_link = NULL; for (size_t i = 0; i < l_net_pvt->seed_nodes_count; i++) { - dap_chain_node_info_t l_link_node_info = {}; - l_link_node_info.hdr.ext_addr_v4 = l_net_pvt->seed_nodes_ipv4[i].sin_addr; - l_link_node_info.hdr.ext_port = l_net_pvt->seed_nodes_ipv4[i].sin_port; - if (PVT(a_net)->seeds_is_poas) - l_link_node_info.hdr.address = l_net_pvt->poa_nodes_addrs[i]; - if (s_net_link_add(a_net, &l_link_node_info) > 0) // Maximum links count reached - break; + //if (PVT(a_net)->seeds_is_poas) + // l_link_node_info.hdr.address = l_net_pvt->poa_nodes_addrs[i]; + dap_link_t *l_link = dap_link_manager_link_create_or_update(&l_net_pvt->seed_nodes_addrs[i], + &l_net_pvt->seed_nodes_ipv4[i].sin_addr, &l_net_pvt->seed_nodes_ipv6[i].sin6_addr, l_net_pvt->seed_nodes_ipv4[i].sin_port); + if (!l_link) + continue; + if (dap_link_manager_link_add(a_net->pub.id.uint64, l_link)) + DAP_DELETE(l_link); } } /** - * @brief s_node_link_callback_connected + * @brief s_link_manager_callback_connected * @param a_node_client * @param a_arg */ -static void s_node_link_callback_connected(dap_chain_node_client_t * a_node_client, void * a_arg) +static void s_link_manager_callback_connected(dap_link_t *a_link, uint64_t a_net_id) { - dap_chain_net_t * l_net = (dap_chain_net_t *) a_arg; - dap_chain_net_pvt_t * l_net_pvt = PVT(l_net); - - a_node_client->stream_worker = dap_client_get_stream_worker(a_node_client->client); - if(a_node_client->stream_worker == NULL){ - log_it(L_ERROR, "Stream worker is NULL in connected() callback, do nothing"); - a_node_client->state = NODE_CLIENT_STATE_ERROR; - return; - } +// sanity check + dap_return_if_pass(!a_link || !a_net_id); +// func work + dap_chain_net_t * l_net = dap_chain_net_by_id((dap_chain_net_id_t){.uint64 = a_net_id}); + dap_chain_net_pvt_t *l_net_pvt = PVT(l_net); - a_node_client->resync_gdb = l_net_pvt->flags & F_DAP_CHAIN_NET_SYNC_FROM_ZERO; - if ( s_debug_more ) log_it(L_NOTICE, "Established connection with %s."NODE_ADDR_FP_STR,l_net->pub.name, - NODE_ADDR_FP_ARGS_S(a_node_client->remote_node_addr)); - a_node_client->is_connected = true; - dap_stream_t *l_stream = dap_client_get_stream(a_node_client->client); - assert(l_stream); - dap_chain_net_add_cluster_link(l_net, &l_stream->node); + NODE_ADDR_FP_ARGS_S(a_link->node_addr)); + struct json_object *l_json = s_net_states_json_collect(l_net); char l_err_str[128] = { }; snprintf(l_err_str, sizeof(l_err_str) , "Established connection with link " NODE_ADDR_FP_STR - , NODE_ADDR_FP_ARGS_S(a_node_client->info->hdr.address)); + , NODE_ADDR_FP_ARGS_S(a_link->node_addr)); json_object_object_add(l_json, "errorMessage", json_object_new_string(l_err_str)); dap_notify_server_send_mt(json_object_get_string(l_json)); json_object_put(l_json); if(l_net_pvt->state == NET_STATE_LINKS_CONNECTING ){ l_net_pvt->state = NET_STATE_LINKS_ESTABLISHED; - dap_proc_thread_callback_add(a_node_client->stream_worker->worker->proc_queue_input,s_net_states_proc,l_net ); } + dap_stream_ch_chain_net_pkt_hdr_t l_announce = { .version = DAP_STREAM_CH_CHAIN_NET_PKT_VERSION, + .net_id = l_net->pub.id }; + dap_client_write_unsafe(a_link->client, 'N', DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_ANNOUNCE, + &l_announce, sizeof(l_announce)); } /** - * @brief s_node_link_callback_disconnected + * @brief s_link_manager_callback_disconnected * @param a_node_client * @param a_arg */ -static void s_node_link_callback_disconnected(dap_chain_node_client_t *a_node_client, void *a_arg) +static void s_link_manager_callback_disconnected(dap_link_t *a_link, uint64_t a_net_id, int a_links_count) { - dap_chain_net_t *l_net = (dap_chain_net_t *)a_arg; +// sanity check + dap_return_if_pass(!a_link); +// func work + dap_chain_net_t *l_net = dap_chain_net_by_id((dap_chain_net_id_t){.uint64 = a_net_id}); dap_chain_net_pvt_t *l_net_pvt = PVT(l_net); - if (a_node_client->is_connected) { - a_node_client->is_connected = false; - log_it(L_INFO, "%s."NODE_ADDR_FP_STR" disconnected.%s",l_net->pub.name, - NODE_ADDR_FP_ARGS_S(a_node_client->info->hdr.address), - l_net_pvt->state_target == NET_STATE_OFFLINE ? "" : " Replace it..."); - } - if (l_net_pvt->state_target != NET_STATE_OFFLINE) { - pthread_mutex_lock(&l_net_pvt->uplinks_mutex); - s_net_link_remove(l_net_pvt, a_node_client, l_net_pvt->only_static_links); - //char *l_key = dap_chain_node_addr_to_hash_str(&a_node_client->info->hdr.address); - //dap_global_db_del_sync(l_net->pub.gdb_nodes, l_key); - //DAP_DELETE(l_key); - - a_node_client->keep_connection = false; - a_node_client->callbacks.delete = NULL; - dap_chain_node_client_close_mt(a_node_client); // Remove it on next context iteration - struct net_link *l_free_link = s_get_free_link(l_net); - if (l_free_link) { - pthread_mutex_unlock(&l_net_pvt->uplinks_mutex); - s_net_link_start(l_net, l_free_link, l_net_pvt->reconnect_delay); - return; - } - size_t l_current_links_prepared = HASH_COUNT(l_net_pvt->net_links); - pthread_mutex_unlock(&l_net_pvt->uplinks_mutex); - if (!l_net_pvt->only_static_links) { - for (size_t i = l_current_links_prepared; i < l_net_pvt->max_links_count ; i++) { - s_new_balancer_link_request(l_net, 0); - } - } + log_it(L_INFO, "%s."NODE_ADDR_FP_STR" disconnected.%s", l_net ? l_net->pub.name : "(unknown)" , + NODE_ADDR_FP_ARGS_S(a_link->node_addr), + l_net_pvt->state_target == NET_STATE_OFFLINE ? "" : " Replace it..."); + if(!a_links_count && l_net_pvt->state == NET_STATE_ONLINE ){ + l_net_pvt->state = NET_STATE_LINKS_PREPARE; } } @@ -687,77 +507,46 @@ static void s_node_link_callback_stage(dap_chain_node_client_t * a_node_client,d } /** - * @brief s_node_link_callback_error + * @brief s_link_manager_callback_error * @param a_node_client * @param a_error * @param a_arg */ -static void s_node_link_callback_error(dap_chain_node_client_t * a_node_client, int a_error, void * a_arg) +static void s_link_manager_callback_error(dap_link_t *a_link, uint64_t a_net_id, int a_error) { - dap_chain_net_t * l_net = (dap_chain_net_t *) a_arg; - log_it(L_WARNING, "Can't establish link with %s."NODE_ADDR_FP_STR, l_net? l_net->pub.name : "(unknown)" , - NODE_ADDR_FP_ARGS_S(a_node_client->remote_node_addr)); +// sanity check + dap_return_if_pass(!a_link); +// func work + dap_chain_net_t *l_net = dap_chain_net_by_id((dap_chain_net_id_t){.uint64 = a_net_id}); + dap_chain_net_pvt_t *l_net_pvt = PVT(l_net); + log_it(L_WARNING, "Can't establish link with %s."NODE_ADDR_FP_STR, l_net ? l_net->pub.name : "(unknown)" , + NODE_ADDR_FP_ARGS_S(a_link->node_addr)); if (l_net){ struct json_object *l_json = s_net_states_json_collect(l_net); - char l_node_addr_str[INET_ADDRSTRLEN] = {}; - inet_ntop(AF_INET, &a_node_client->info->hdr.ext_addr_v4, l_node_addr_str, INET_ADDRSTRLEN); char l_err_str[128] = { }; snprintf(l_err_str, sizeof(l_err_str) , "Link " NODE_ADDR_FP_STR " [%s] can't be established, errno %d" - , NODE_ADDR_FP_ARGS_S(a_node_client->info->hdr.address), l_node_addr_str, a_error); + , NODE_ADDR_FP_ARGS_S(a_link->node_addr), a_link->client->uplink_addr, a_error); json_object_object_add(l_json, "errorMessage", json_object_new_string(l_err_str)); dap_notify_server_send_mt(json_object_get_string(l_json)); json_object_put(l_json); } } -/** - * @brief s_node_link_callback_delete - * @param a_node_client - * @param a_arg - */ -static void s_node_link_callback_delete(dap_chain_node_client_t * a_node_client, void * a_arg) -{ - dap_chain_net_t * l_net = (dap_chain_net_t *) a_arg; - dap_chain_net_pvt_t * l_net_pvt = PVT(l_net); - if (!a_node_client->keep_connection) { - struct json_object *l_json = s_net_states_json_collect(l_net); - json_object_object_add(l_json, "errorMessage", json_object_new_string("Link deleted")); - dap_notify_server_send_mt(json_object_get_string(l_json)); - json_object_put(l_json); - return; - } else if (a_node_client->is_connected) - a_node_client->is_connected = false; - dap_chain_net_sync_unlock(l_net, a_node_client); - pthread_mutex_lock(&l_net_pvt->uplinks_mutex); - struct net_link *l_link, *l_link_tmp; - HASH_ITER(hh, l_net_pvt->net_links, l_link, l_link_tmp) { - if (l_link->link == a_node_client) { - log_it(L_DEBUG, "Replace node client with new one with %d sec", l_net_pvt->reconnect_delay); - s_net_link_start(l_net, l_link, l_net_pvt->reconnect_delay); - } - } - pthread_mutex_unlock(&l_net_pvt->uplinks_mutex); - struct json_object *l_json = s_net_states_json_collect(l_net); - json_object_object_add(l_json, "errorMessage", json_object_new_string("Link restart")); - dap_notify_server_send_mt(json_object_get_string(l_json)); - json_object_put(l_json); - // Then a_node_client will be destroyed in a right way -} - static void s_net_links_complete_and_start(dap_chain_net_t *a_net, dap_worker_t *a_worker) { dap_chain_net_pvt_t * l_net_pvt = PVT(a_net); if (--l_net_pvt->balancer_link_requests == 0){ // It was the last one // No links obtained from DNS - if (HASH_COUNT(l_net_pvt->net_links) == 0 && !l_net_pvt->balancer_http) { + size_t l_links_count = dap_link_manager_links_count(a_net->pub.id.uint64); + if (l_links_count == 0 && !l_net_pvt->balancer_http) { // Try to get links from HTTP balancer l_net_pvt->balancer_http = true; - s_prepare_links_from_balancer(a_net); + s_new_balancer_link_request(a_net, 0); return; } - if (HASH_COUNT(l_net_pvt->net_links) < l_net_pvt->max_links_count) - s_fill_links_from_root_aliases(a_net); // Comlete the sentence + // if (l_links_count < l_net_pvt->max_links_count) + // s_fill_links_from_root_aliases(a_net); // Comlete the sentence if (l_net_pvt->state_target != NET_STATE_OFFLINE){ l_net_pvt->state = NET_STATE_LINKS_CONNECTING; } @@ -776,7 +565,7 @@ static void s_net_balancer_link_prepare_success(dap_worker_t * a_worker, dap_cha if(s_debug_more){ char l_node_addr_str[INET_ADDRSTRLEN]={}; dap_chain_node_info_t * l_node_info = (dap_chain_node_info_t *)a_link_full_node_list->nodes_info; - for(size_t i=0;i<a_link_full_node_list->count_node;i++){ + for(size_t i = 0; i < a_link_full_node_list->count_node; ++i){ inet_ntop(AF_INET,&(l_node_info + i)->hdr.ext_addr_v4,l_node_addr_str, INET_ADDRSTRLEN); log_it(L_DEBUG,"Link " NODE_ADDR_FP_STR " (%s) prepare success", NODE_ADDR_FP_ARGS_S((l_node_info + i)->hdr.address), l_node_addr_str ); @@ -786,59 +575,35 @@ static void s_net_balancer_link_prepare_success(dap_worker_t * a_worker, dap_cha struct balancer_link_request *l_balancer_request = (struct balancer_link_request *) a_arg; dap_chain_net_t * l_net = l_balancer_request->net; dap_chain_node_info_t * l_node_info = (dap_chain_node_info_t *)a_link_full_node_list->nodes_info; - int l_res = 0; - size_t i = 0; char l_err_str[128] = { }; struct json_object *l_json; - while(!l_res){ - if(i >= a_link_full_node_list->count_node) - break; - l_res = s_net_link_add(l_net, l_node_info + i); - switch (l_res) { - case 0: - l_json = s_net_states_json_collect(l_net); - - snprintf(l_err_str, sizeof(l_err_str) - , "Link " NODE_ADDR_FP_STR " prepared" - , NODE_ADDR_FP_ARGS_S((l_node_info + i)->hdr.address)); - json_object_object_add(l_json, "errorMessage", json_object_new_string(l_err_str)); - dap_notify_server_send_mt(json_object_get_string(l_json)); - json_object_put(l_json); - debug_if(s_debug_more, L_DEBUG, "Link "NODE_ADDR_FP_STR" successfully added", - NODE_ADDR_FP_ARGS_S((l_node_info + i)->hdr.address)); - break; - case 1: - debug_if(s_debug_more, L_DEBUG, "Maximum prepared links reached"); - break; - case -1: - - break; - default: - break; - } - i++; - } - struct net_link *l_free_link = NULL; - bool need_link = false; - pthread_mutex_lock(&PVT(l_net)->uplinks_mutex); - if (l_balancer_request->link_replace_tries && - s_net_get_active_links_count(l_net) < PVT(l_net)->required_links_count) { - // Auto-start new link - dap_chain_net_state_t l_net_state = PVT(l_net)->state_target; - if (l_net_state != NET_STATE_OFFLINE) { - l_free_link = s_get_free_link(l_net); - need_link = true; + for(size_t i = 0; i < a_link_full_node_list->count_node; ++i){ + dap_link_t *l_link = dap_link_manager_link_create_or_update(&l_node_info[i].hdr.address, + &l_node_info[i].hdr.ext_addr_v4, &l_node_info[i].hdr.ext_addr_v6, l_node_info[i].hdr.ext_port); + if (!l_link) + continue; + switch (dap_link_manager_link_add(l_net->pub.id.uint64, l_link)) { + case 0: + l_json = s_net_states_json_collect(l_net); + + snprintf(l_err_str, sizeof(l_err_str) + , "Link " NODE_ADDR_FP_STR " prepared" + , NODE_ADDR_FP_ARGS_S((l_node_info + i)->hdr.address)); + json_object_object_add(l_json, "errorMessage", json_object_new_string(l_err_str)); + dap_notify_server_send_mt(json_object_get_string(l_json)); + json_object_put(l_json); + debug_if(s_debug_more, L_DEBUG, "Link "NODE_ADDR_FP_STR" successfully added", + NODE_ADDR_FP_ARGS_S((l_node_info + i)->hdr.address)); + break; + case 1: + debug_if(s_debug_more, L_DEBUG, "Maximum prepared links reached"); + break; + case -1: + break; + default: + break; } } - pthread_mutex_unlock(&PVT(l_net)->uplinks_mutex); - - // Auto-start new link - if(need_link){ - if (l_free_link) - s_net_link_start(l_net, l_free_link, PVT(l_net)->reconnect_delay); - else - s_new_balancer_link_request(l_net, l_balancer_request->link_replace_tries); - } if (!l_balancer_request->link_replace_tries) s_net_links_complete_and_start(l_net, a_worker); @@ -888,12 +653,12 @@ void s_net_http_link_prepare_success(void *a_response, size_t a_response_size, v size_t l_response_size_need = sizeof(dap_chain_net_node_balancer_t) + (sizeof(dap_chain_node_info_t) * l_link_full_node_list->count_node); log_it(L_WARNING, "Get data size - %lu need - (%lu)", a_response_size, l_response_size_need); - if (a_response_size != l_response_size_need) { - log_it(L_ERROR, "Invalid balancer response size %lu (expected %lu)", a_response_size, l_response_size_need); - s_new_balancer_link_request(l_balancer_request->net, l_balancer_request->link_replace_tries); - DAP_DELETE(l_balancer_request); - return; - } + // if (a_response_size != l_response_size_need) { + // log_it(L_ERROR, "Invalid balancer response size %lu (expected %lu)", a_response_size, l_response_size_need); + // s_new_balancer_link_request(l_balancer_request->net, l_balancer_request->link_replace_tries); + // DAP_DELETE(l_balancer_request); + // return; + // } s_net_balancer_link_prepare_success(l_balancer_request->worker, l_link_full_node_list, a_arg); } @@ -911,65 +676,61 @@ void s_net_http_link_prepare_error(int a_error_code, void *a_arg) */ static bool s_new_balancer_link_request(dap_chain_net_t *a_net, int a_link_replace_tries) { - dap_chain_net_pvt_t *l_net_pvt = a_net ? PVT(a_net) : NULL; - if (!l_net_pvt) - return false; - if (l_net_pvt->state_target == NET_STATE_OFFLINE) { - return false; - } - if (a_link_replace_tries >= 3) { - // network problems, make static links +// sanity check + dap_return_val_if_pass(!a_net || !PVT(a_net) || PVT(a_net)->state_target == NET_STATE_OFFLINE , false); +// func work + dap_chain_net_pvt_t *l_net_pvt = PVT(a_net); + if (dap_link_manager_links_count(a_net->pub.id.uint64) < l_net_pvt->seed_nodes_count) { s_fill_links_from_root_aliases(a_net); - pthread_mutex_lock(&l_net_pvt->uplinks_mutex); - struct net_link *l_free_link = s_get_free_link(a_net); - if (l_free_link) - s_net_link_start(a_net, l_free_link, l_net_pvt->reconnect_delay); - pthread_mutex_unlock(&l_net_pvt->uplinks_mutex); - return true; + // Extra links from cfg + for (int i = 0; i < l_net_pvt->permanent_links_count; i++) { + dap_chain_node_info_t *l_link_node_info = dap_chain_node_info_read(a_net, l_net_pvt->permanent_links + i); + if (!l_link_node_info) { + log_it(L_WARNING, "Can't find addr info for permanent link " NODE_ADDR_FP_STR, + NODE_ADDR_FP_ARGS(l_net_pvt->permanent_links + i)); + continue; + } + dap_link_t *l_link = dap_link_manager_link_create_or_update(&l_link_node_info[i].hdr.address, + &l_link_node_info[i].hdr.ext_addr_v4, &l_link_node_info[i].hdr.ext_addr_v6, l_link_node_info[i].hdr.ext_port); + dap_link_manager_link_add(a_net->pub.id.uint64, l_link); + DAP_DELETE(l_link_node_info); + } } - if(!a_link_replace_tries){ - - dap_chain_net_node_balancer_t *l_link_full_node_list = dap_chain_net_balancer_get_node(a_net->pub.name,l_net_pvt->max_links_count*2); - size_t node_cnt = 0,i = 0; - if(l_link_full_node_list) - { + if(l_net_pvt->only_static_links) + return true; + int a_required_links_count = dap_link_manager_needed_links_count(a_net->pub.id.uint64); + if(!a_link_replace_tries) { + dap_chain_net_node_balancer_t *l_link_full_node_list = dap_chain_net_balancer_get_node(a_net->pub.name, a_required_links_count * 2); + size_t l_node_cnt = 0; + if(l_link_full_node_list) { dap_chain_node_info_t * l_node_info = (dap_chain_node_info_t *)l_link_full_node_list->nodes_info; - node_cnt = l_link_full_node_list->count_node; - int l_net_link_add = 0; - size_t l_links_count = 0; - while(!l_net_link_add && i<node_cnt){ + l_node_cnt = l_link_full_node_list->count_node; - l_net_link_add = s_net_link_add(a_net, l_node_info + i); + for(size_t i = 0; i < l_node_cnt; ++i) { + int l_net_link_add = 0; + dap_link_t *l_link = dap_link_manager_link_create_or_update(&l_node_info[i].hdr.address, + &l_node_info[i].hdr.ext_addr_v4, &l_node_info[i].hdr.ext_addr_v6, l_node_info[i].hdr.ext_port); + if (!l_link) + continue; + l_net_link_add = dap_link_manager_link_add(a_net->pub.id.uint64, l_link); switch (l_net_link_add) { - case 0: - log_it(L_MSG, "Network LOCAL balancer issues link IP %s, [%ld blocks]", inet_ntoa((l_node_info + i)->hdr.ext_addr_v4),l_node_info->info.atoms_count); - break; - case -1: - log_it(L_MSG, "Network LOCAL balancer: IP %s is already among links", inet_ntoa((l_node_info + i)->hdr.ext_addr_v4)); - break; - case 1: - log_it(L_MSG, "Network links table is full"); - break; - default: - break; + case 0: + log_it(L_MSG, "Network LOCAL balancer issues link IP %s, [%ld blocks]", l_link->client->uplink_addr,l_node_info->info.atoms_count); + break; + case -1: + log_it(L_MSG, "Network LOCAL balancer: Node %s is already among links", l_link->client->uplink_addr); + break; + case -2: + log_it(L_MSG, "Network LOCAL balancer: Link manager not active"); + break; + case 1: + log_it(L_MSG, "Network links table is full"); + break; + default: + break; } - l_links_count = HASH_COUNT(l_net_pvt->net_links); - if(l_net_link_add && l_links_count < l_net_pvt->required_links_count && i < node_cnt)l_net_link_add = 0; - i++; - } - DAP_DELETE(l_link_full_node_list); - pthread_mutex_lock(&l_net_pvt->uplinks_mutex); - struct net_link *l_free_link = s_get_free_link(a_net); - if (l_free_link){ - s_net_link_start(a_net, l_free_link, l_net_pvt->reconnect_delay); - pthread_mutex_unlock(&l_net_pvt->uplinks_mutex); - return true; - } - else - { - pthread_mutex_unlock(&l_net_pvt->uplinks_mutex); - return false; } + DAP_DEL_MULTY(l_link_full_node_list); } } dap_chain_node_info_t *l_link_node_info = dap_chain_net_balancer_link_from_cfg(a_net); @@ -994,7 +755,7 @@ static bool s_new_balancer_link_request(dap_chain_net_t *a_net, int a_link_repla char *l_request = dap_strdup_printf("%s/%s?version=1,method=r,needlink=%d,net=%s", DAP_UPLINK_PATH_BALANCER, DAP_BALANCER_URI_HASH, - l_net_pvt->required_links_count, + a_required_links_count, a_net->pub.name); ret = dap_client_http_request(l_balancer_request->worker, l_node_addr_str, @@ -1031,20 +792,6 @@ static bool s_new_balancer_link_request(dap_chain_net_t *a_net, int a_link_repla return true; } -static void s_prepare_links_from_balancer(dap_chain_net_t *a_net) -{ - if (!a_net) { - log_it(L_ERROR, "Invalid arguments in s_prepare_links_from_balancer"); - return; - } - // Get list of the unique links for l_net - size_t l_max_links_count = PVT(a_net)->max_links_count; // Not all will be success - for (size_t l_cur_links_count = 0, n = 0; n < 100 && l_cur_links_count < l_max_links_count; ++n) { - if (s_new_balancer_link_request(a_net, 0)) - l_cur_links_count++; - } -} - struct json_object *s_net_states_json_collect(dap_chain_net_t *a_net) { struct json_object *l_json = json_object_new_object(); @@ -1052,8 +799,8 @@ struct json_object *s_net_states_json_collect(dap_chain_net_t *a_net) json_object_object_add(l_json, "name" , json_object_new_string((const char*)a_net->pub.name)); json_object_object_add(l_json, "networkState" , json_object_new_string(dap_chain_net_state_to_str(PVT(a_net)->state))); json_object_object_add(l_json, "targetState" , json_object_new_string(dap_chain_net_state_to_str(PVT(a_net)->state_target))); - json_object_object_add(l_json, "linksCount" , json_object_new_int(PVT(a_net)->net_links ? HASH_COUNT(PVT(a_net)->net_links) : 0)); - json_object_object_add(l_json, "activeLinksCount" , json_object_new_int(s_net_get_active_links_count(a_net))); + json_object_object_add(l_json, "linksCount" , json_object_new_int(0)); + json_object_object_add(l_json, "activeLinksCount" , json_object_new_int(dap_link_manager_links_count(a_net->pub.id.uint64))); char l_node_addr_str[24] = {'\0'}; int l_tmp = snprintf(l_node_addr_str, sizeof(l_node_addr_str), NODE_ADDR_FP_STR, NODE_ADDR_FP_ARGS_S(g_node_addr)); json_object_object_add(l_json, "nodeAddress" , json_object_new_string(l_tmp ? l_node_addr_str : "0000::0000::0000::0000")); @@ -1092,25 +839,7 @@ static bool s_net_states_proc(void *a_arg) case NET_STATE_OFFLINE: { log_it(L_NOTICE,"%s.state: NET_STATE_OFFLINE", l_net->pub.name); // delete all links - struct net_link *l_link, *l_link_tmp; - pthread_mutex_lock(&l_net_pvt->uplinks_mutex); - HASH_ITER(hh, l_net_pvt->net_links, l_link, l_link_tmp) { - if (l_link->delay_timer) - dap_timerfd_delete_mt(l_link->delay_timer->worker, l_link->delay_timer->esocket_uuid); - if (l_link->link) { - dap_chain_node_client_t *l_client = l_link->link; - l_client->callbacks.delete = NULL; - dap_chain_node_client_close_mt(l_client); - } - HASH_DEL(l_net_pvt->net_links, l_link); - DAP_DEL_Z(l_link->link_info); - DAP_DELETE(l_link); - } - pthread_mutex_unlock(&l_net_pvt->uplinks_mutex); l_net_pvt->balancer_link_requests = 0; - l_net_pvt->active_link = NULL; - dap_list_free(l_net_pvt->links_queue); - l_net_pvt->links_queue = NULL; if ( l_net_pvt->state_target != NET_STATE_OFFLINE ){ l_net_pvt->state = NET_STATE_LINKS_PREPARE; l_repeat_after_exit = true; @@ -1122,50 +851,11 @@ static bool s_net_states_proc(void *a_arg) case NET_STATE_LINKS_PREPARE: { log_it(L_NOTICE,"%s.state: NET_STATE_LINKS_PREPARE", l_net->pub.name); s_net_states_notify(l_net); - // Extra links from cfg - for (int i = 0; i < l_net_pvt->permanent_links_count; i++) { - dap_chain_node_info_t *l_link_node_info = dap_chain_node_info_read(l_net, l_net_pvt->permanent_links + i); - if (!l_link_node_info) { - log_it(L_WARNING, "Can't find addr info for permanent link " NODE_ADDR_FP_STR, - NODE_ADDR_FP_ARGS(l_net_pvt->permanent_links + i)); - continue; - } - s_net_link_add(l_net, l_link_node_info); - DAP_DELETE(l_link_node_info); - } - - if (!l_net_pvt->seed_nodes_count) { - if (l_net_pvt->net_links) { // We have other links - l_net_pvt->state = NET_STATE_LINKS_CONNECTING; - l_repeat_after_exit = true; - } else { - log_it(L_ERROR, "No information about seed nodes present in configuration file"); - dap_chain_net_state_go_to(l_net, NET_STATE_OFFLINE); - } - break; - } - // Get DNS request result from root nodes as synchronization links - if (!l_net_pvt->only_static_links) { - s_prepare_links_from_balancer(l_net); - } else { - log_it(L_ATT, "Not use bootstrap addresses, fill seed nodelist from root aliases"); - // Add other root nodes as synchronization links - s_fill_links_from_root_aliases(l_net); - l_net_pvt->state = NET_STATE_LINKS_CONNECTING; - l_repeat_after_exit = true; - break; - } } break; case NET_STATE_LINKS_CONNECTING: { log_it(L_INFO, "%s.state: NET_STATE_LINKS_CONNECTING",l_net->pub.name); size_t l_used_links = 0; - struct net_link *l_link, *l_link_tmp; - HASH_ITER(hh, l_net_pvt->net_links, l_link, l_link_tmp) { - s_net_link_start(l_net, l_link, 0); - if (++l_used_links == l_net_pvt->required_links_count) - break; - } } break; case NET_STATE_LINKS_ESTABLISHED: @@ -1192,59 +882,6 @@ static bool s_net_states_proc(void *a_arg) return l_repeat_after_exit; } -bool dap_chain_net_sync_trylock(dap_chain_net_t *a_net, dap_chain_node_client_t *a_client) -{ - dap_chain_net_pvt_t *l_net_pvt = PVT(a_net); - pthread_mutex_lock(&l_net_pvt->uplinks_mutex); - bool l_found = false; - if (l_net_pvt->active_link) { - struct net_link *l_link, *l_link_tmp; - HASH_ITER(hh, l_net_pvt->net_links, l_link, l_link_tmp) { - dap_chain_node_client_t *l_client = l_link->link; - if (l_client == l_net_pvt->active_link && - l_client->state >= NODE_CLIENT_STATE_ESTABLISHED && - l_client->state < NODE_CLIENT_STATE_SYNCED && - a_client != l_client) { - l_found = true; - break; - } - } - } - if (!l_found) { - l_net_pvt->active_link = a_client; - } - if (l_found && !dap_list_find(l_net_pvt->links_queue, a_client, NULL)) - l_net_pvt->links_queue = dap_list_append(l_net_pvt->links_queue, a_client); - pthread_mutex_unlock(&l_net_pvt->uplinks_mutex); - return !l_found; -} - -bool dap_chain_net_sync_unlock(dap_chain_net_t *a_net, dap_chain_node_client_t *a_client) -{ - if (!a_net) - return false; - dap_chain_net_pvt_t *l_net_pvt = PVT(a_net); - pthread_mutex_lock(&l_net_pvt->uplinks_mutex); - bool l_ret = false; - if (!a_client || l_net_pvt->active_link == a_client) - l_net_pvt->active_link = NULL; - while (l_net_pvt->active_link == NULL && l_net_pvt->links_queue) { - dap_chain_node_client_t *l_link = l_net_pvt->links_queue->data; - dap_chain_node_sync_status_t l_status = dap_chain_node_client_start_sync(l_link); - if (l_status != NODE_SYNC_STATUS_WAITING) - // Remove list head - l_net_pvt->links_queue = dap_list_delete_link(l_net_pvt->links_queue, l_net_pvt->links_queue); - else - break; - } - l_ret = l_net_pvt->active_link; - pthread_mutex_unlock(&l_net_pvt->uplinks_mutex); - if (!l_ret && l_net_pvt->state_target == NET_STATE_ONLINE && l_net_pvt->last_sync) { - l_net_pvt->state = NET_STATE_ONLINE; - } - return l_ret; -} - /** * @brief dap_chain_net_get_role * @param a_net @@ -1278,7 +915,6 @@ static dap_chain_net_t *s_net_new(const char *a_id, const char *a_name, pthread_mutexattr_t l_mutex_attr; pthread_mutexattr_init(&l_mutex_attr); pthread_mutexattr_settype(&l_mutex_attr, PTHREAD_MUTEX_RECURSIVE); - pthread_mutex_init(&PVT(l_ret)->uplinks_mutex, &l_mutex_attr); pthread_mutex_init(&l_ret->pub.balancer_mutex, &l_mutex_attr); pthread_mutexattr_destroy(&l_mutex_attr); if (dap_chain_net_id_parse(a_id, &l_ret->pub.id) != 0) { @@ -1375,8 +1011,8 @@ json_object* s_set_reply_text_node_status_json(dap_chain_net_t *a_net) { json_object_object_add(l_jobj_ret, "current_addr", l_jobj_cur_node_addr); if (PVT(a_net)->state != NET_STATE_OFFLINE) { json_object *l_jobj_links = json_object_new_object(); - json_object *l_jobj_active_links = json_object_new_uint64(s_net_get_active_links_count(a_net)); - json_object *l_jobj_total_links = json_object_new_uint64(HASH_COUNT(PVT(a_net)->net_links)); + json_object *l_jobj_active_links = json_object_new_uint64(0 /*HASH_COUNT(PVT(a_net)->net_links)*/); // need adopt to link manager + json_object *l_jobj_total_links = json_object_new_uint64(0 /*HASH_COUNT(PVT(a_net)->net_links)*/); if (!l_jobj_links || !l_jobj_active_links || !l_jobj_total_links) { json_object_put(l_jobj_ret); json_object_put(l_jobj_links); @@ -1410,8 +1046,8 @@ void s_set_reply_text_node_status(void **a_str_reply, dap_chain_net_t * a_net){ char* l_sync_current_link_text_block = NULL; if (PVT(a_net)->state != NET_STATE_OFFLINE) l_sync_current_link_text_block = dap_strdup_printf(", active links %zu from %u", - s_net_get_active_links_count(a_net), - HASH_COUNT(PVT(a_net)->net_links)); + dap_link_manager_links_count(a_net->pub.id.uint64), + 0 /*HASH_COUNT(PVT(a_net)->net_links)*/); dap_cli_server_cmd_set_reply_text(a_str_reply, "Network \"%s\" has state %s (target state %s)%s%s", a_net->pub.name, c_net_states[PVT(a_net)->state], @@ -2367,7 +2003,7 @@ void s_main_timer_callback(void *a_arg) dap_chain_net_pvt_t *l_net_pvt = PVT(l_net); if (l_net_pvt->state_target == NET_STATE_ONLINE && l_net_pvt->state >= NET_STATE_LINKS_ESTABLISHED && - !s_net_get_active_links_count(l_net)) // restart network + !dap_link_manager_links_count(l_net->pub.id.uint64)) // restart network dap_chain_net_start(l_net); dap_chain_net_balancer_prepare_list_links(l_net->pub.name,false); } @@ -2495,12 +2131,10 @@ int s_net_init(const char * a_net_name, uint16_t a_acl_idx) HASH_ADD_STR(s_net_items, name, l_net_item); HASH_ADD(hh2, s_net_ids, net_id, sizeof(l_net_item->net_id), l_net_item); - // Maximum number of prepared connections to other nodes - l_net_pvt->max_links_count = dap_config_get_item_int16_default(l_cfg, "general", "max_links", 5); - // Required number of active connections to other nodes - l_net_pvt->required_links_count = dap_config_get_item_int16_default(l_cfg, "general", "require_links", 3); - // Wait time before reconnect attempt with same link - l_net_pvt->reconnect_delay = dap_config_get_item_int16_default(l_cfg, "general", "reconnect_delay", 10); + uint16_t l_seed_nodes_addrs_len =0; + char ** l_seed_nodes_addrs = dap_config_get_array_str( l_cfg , "general" ,"seed_nodes_addrs", &l_seed_nodes_addrs_len); + uint16_t l_permamnet_nodes_addrs_len =0; + char ** l_permamnet_nodes_addrs = dap_config_get_array_str( l_cfg , "general" ,"permanent_nodes_addrs", &l_permamnet_nodes_addrs_len); char **l_poa_nodes_addrs = dap_config_get_array_str(l_cfg, "general", "seed_nodes_addrs", &l_net_pvt->poa_nodes_count); if (!l_net_pvt->poa_nodes_count) { @@ -2511,7 +2145,7 @@ int s_net_init(const char * a_net_name, uint16_t a_acl_idx) } l_net_pvt->poa_nodes_addrs = DAP_NEW_SIZE(dap_stream_node_addr_t, l_net_pvt->poa_nodes_count * sizeof(dap_stream_node_addr_t)); if (!l_net_pvt->poa_nodes_addrs) { - log_it(L_CRITICAL, g_error_memory_alloc); + log_it(L_CRITICAL, "%s", g_error_memory_alloc); dap_chain_net_delete(l_net); dap_config_close(l_cfg); return -1; @@ -2534,8 +2168,9 @@ int s_net_init(const char * a_net_name, uint16_t a_acl_idx) char **l_seed_nodes_port = dap_config_get_array_str(l_cfg, "general" ,"seed_nodes_port", &l_seed_nodes_port_len); uint16_t l_bootstrap_nodes_len = 0; char **l_bootstrap_nodes = dap_config_get_array_str(l_cfg, "general", "bootstrap_hostnames", &l_bootstrap_nodes_len); - if (l_seed_nodes_port_len) { - if ((l_seed_nodes_ipv4_len && l_seed_nodes_ipv4_len != l_seed_nodes_port_len) || + if (l_seed_nodes_addrs_len) { + if ( l_seed_nodes_addrs_len != l_seed_nodes_port_len || + (l_seed_nodes_ipv4_len && l_seed_nodes_ipv4_len != l_seed_nodes_port_len) || (l_seed_nodes_ipv6_len && l_seed_nodes_ipv6_len != l_seed_nodes_port_len) || (l_seed_nodes_hostnames_len && l_seed_nodes_hostnames_len != l_seed_nodes_port_len) || (!l_seed_nodes_ipv4_len && !l_seed_nodes_ipv6_len && !l_seed_nodes_hostnames_len) || @@ -2545,18 +2180,29 @@ int s_net_init(const char * a_net_name, uint16_t a_acl_idx) dap_config_close(l_cfg); return -6; } - l_net_pvt->seed_nodes_count = l_seed_nodes_port_len; + l_net_pvt->seed_nodes_count = l_seed_nodes_addrs_len; l_net_pvt->seeds_is_poas = true; } else { if (!l_bootstrap_nodes_len) log_it(L_WARNING, "Configuration for network %s doesn't contains any links", l_net->pub.name); l_net_pvt->seed_nodes_count = l_bootstrap_nodes_len; } + l_net_pvt->permanent_links_count = l_permamnet_nodes_addrs_len; log_it (L_DEBUG, "Read %u seed nodes params", l_net_pvt->seed_nodes_count); + l_net_pvt->seed_nodes_addrs = DAP_NEW_SIZE(dap_stream_node_addr_t, l_net_pvt->seed_nodes_count * sizeof(dap_stream_node_addr_t)); + l_net_pvt->permanent_links = DAP_NEW_SIZE(dap_stream_node_addr_t, l_net_pvt->permanent_links_count * sizeof(dap_stream_node_addr_t)); + // Load permanent nodes from cfg file + for (uint16_t i = 0; i < l_net_pvt->permanent_links_count; i++) { + if (dap_chain_node_addr_from_str(l_net_pvt->permanent_links + i, l_permamnet_nodes_addrs[i])) { + log_it(L_ERROR,"Wrong address format, must be 0123::4567::89AB::CDEF"); + l_net_pvt->permanent_links_count--; + continue; + } + } if (l_seed_nodes_ipv6_len) { l_net_pvt->seed_nodes_ipv6 = DAP_NEW_SIZE(struct sockaddr_in6, l_net_pvt->seed_nodes_count * sizeof(struct sockaddr_in6)); if (!l_net_pvt->seed_nodes_ipv6) { - log_it(L_CRITICAL, g_error_memory_alloc); + log_it(L_CRITICAL, "%s", g_error_memory_alloc); dap_chain_net_delete(l_net); dap_config_close(l_cfg); return -1; @@ -2564,7 +2210,7 @@ int s_net_init(const char * a_net_name, uint16_t a_acl_idx) } else { // Just only IPv4 can be resolved for now l_net_pvt->seed_nodes_ipv4 = DAP_NEW_SIZE(struct sockaddr_in, l_net_pvt->seed_nodes_count * sizeof(struct sockaddr_in)); if (!l_net_pvt->seed_nodes_ipv4) { - log_it(L_CRITICAL, g_error_memory_alloc); + log_it(L_CRITICAL, "%s", g_error_memory_alloc); dap_chain_net_delete(l_net); dap_config_close(l_cfg); return -1; @@ -2574,6 +2220,11 @@ int s_net_init(const char * a_net_name, uint16_t a_acl_idx) for (uint16_t i = 0; i < l_net_pvt->seed_nodes_count; i++) { char *l_node_hostname = NULL; uint16_t l_node_port = 0; + if (dap_chain_node_addr_from_str(l_net_pvt->seed_nodes_addrs + i, l_seed_nodes_addrs[i])) { + log_it(L_ERROR,"Wrong address format, must be 0123::4567::89AB::CDEF"); + l_net_pvt->seed_nodes_count--; + continue; + } if (l_seed_nodes_port_len) { l_node_port = strtoul(l_seed_nodes_port[i], NULL, 10); if (l_seed_nodes_ipv4_len) @@ -2971,8 +2622,11 @@ int s_net_load(dap_chain_net_t *a_net) uint32_t l_timeout = dap_config_get_item_uint32_default(g_config, "node_client", "timer_update_states", 600); PVT(l_net)->main_timer = dap_interval_timer_create(l_timeout * 1000, s_main_timer_callback, l_net); - dap_config_close(l_cfg); + if(dap_link_manager_add_net(l_net->pub.id.uint64, l_net_pvt->nodes_cluster->links_cluster)) { + log_it(L_WARNING, "Can't add net %s to link manager", l_net->pub.name); + } log_it(L_INFO, "Chain network \"%s\" initialized",l_net->pub.name); + dap_config_close(l_cfg); return 0; } @@ -3001,7 +2655,6 @@ static int s_net_try_online(dap_chain_net_t *a_net) if (l_target_state != l_net_pvt->state_target) { dap_chain_net_state_go_to(l_net, l_target_state); - log_it(L_INFO, "Network \"%s\" goes online",l_net->pub.name); } @@ -3322,7 +2975,7 @@ dap_chain_cell_id_t * dap_chain_net_get_cur_cell( dap_chain_net_t * l_net) /** * Get remote nodes list (list of dap_chain_node_addr_t struct) */ -dap_list_t* dap_chain_net_get_node_list(dap_chain_net_t * l_net) +dap_list_t* dap_chain_net_get_node_list(dap_chain_net_t *l_net) { dap_list_t *l_node_list = NULL; @@ -3716,3 +3369,40 @@ void dap_chain_net_announce_addrs(dap_chain_net_t *a_net) NODE_ADDR_FP_ARGS_S(g_node_addr), l_node_addr_str, l_net_pvt->node_info->hdr.ext_port, a_net->pub.name); } } + +void s_link_manager_link_request(uint64_t a_net_id) +{ +// sanity check + dap_chain_net_t *l_net = dap_chain_net_by_id((dap_chain_net_id_t){.uint64 = a_net_id}); + dap_return_if_pass(!l_net); +// func work + s_new_balancer_link_request(l_net, 0); +} + +int dap_chain_net_link_manager_init() +{ + return dap_link_manager_init(&s_link_manager_callbacks); +} + +int s_link_manager_fill_net_info(dap_link_t *a_link) +{ +// sanity check + dap_return_val_if_pass(!a_link, -1); +// func work + int l_ret = 0; + dap_chain_net_item_t *l_net_item = NULL, *l_tmp = NULL; + dap_chain_node_info_t *l_node_info = NULL; + HASH_ITER(hh, s_net_items, l_net_item, l_tmp) { + if ((l_node_info = dap_chain_node_info_read(l_net_item->chain_net,&a_link->node_addr))) + break; + } + if (!l_node_info) { + return -3; + } + if (a_link != dap_link_manager_link_create_or_update(&l_node_info->hdr.address, + &l_node_info->hdr.ext_addr_v4, &l_node_info->hdr.ext_addr_v6, l_node_info->hdr.ext_port)) { + log_it(L_WARNING, "LEAKS, links dublicate to node "NODE_ADDR_FP_STR, NODE_ADDR_FP_ARGS_S(a_link->node_addr)); + } + DAP_DELETE(l_node_info); + return l_ret; +} diff --git a/modules/net/dap_chain_node.c b/modules/net/dap_chain_node.c index 8de6289a3b50def2cf3b2b715a8ea28e6fdf4b3b..73d7bb016c891316521e5e4b4f21b0bf839384dc 100644 --- a/modules/net/dap_chain_node.c +++ b/modules/net/dap_chain_node.c @@ -136,9 +136,9 @@ int dap_chain_node_info_save(dap_chain_net_t * a_net, dap_chain_node_info_t *a_n /** * Read node from base */ -dap_chain_node_info_t* dap_chain_node_info_read( dap_chain_net_t * a_net,dap_chain_node_addr_t *l_address) +dap_chain_node_info_t* dap_chain_node_info_read( dap_chain_net_t * a_net,dap_chain_node_addr_t *a_address) { - char *l_key = dap_chain_node_addr_to_hash_str(l_address); + char *l_key = dap_chain_node_addr_to_hash_str(a_address); if (!l_key) { log_it(L_WARNING,"Can't calculate hash of addr"); return NULL; @@ -149,7 +149,7 @@ dap_chain_node_info_t* dap_chain_node_info_read( dap_chain_net_t * a_net,dap_cha l_node_info = (dap_chain_node_info_t *)dap_global_db_get_sync(a_net->pub.gdb_nodes, l_key, &node_info_size, NULL, NULL); if (!l_node_info) { - log_it(L_NOTICE, "Node with address %s not found in base", l_key); + log_it(L_NOTICE, "Node with address %s not found in base of %s network", l_key, a_net->pub.name); DAP_DELETE(l_key); return NULL; } diff --git a/modules/net/dap_chain_node_client.c b/modules/net/dap_chain_node_client.c index eac80ec86b27bd2f7ef3182264e0eaaf13ec123c..bcb29652e18770aa73e1938eb1b3a44a3b1c1c86 100644 --- a/modules/net/dap_chain_node_client.c +++ b/modules/net/dap_chain_node_client.c @@ -156,7 +156,7 @@ static void s_stage_status_error_callback(dap_client_t *a_client, void *a_arg) l_node_client->esocket_uuid = 0; - dap_chain_net_sync_unlock(l_node_client->net, l_node_client); + // dap_chain_net_sync_unlock(l_node_client->net, l_node_client); if (l_node_client->callbacks.disconnected) { l_node_client->callbacks.disconnected(l_node_client, l_node_client->callbacks_arg); } @@ -189,7 +189,8 @@ dap_chain_node_sync_status_t dap_chain_node_client_start_sync(dap_chain_node_cli // If we do nothing - init sync process if (l_ch_chain->state == CHAIN_STATE_IDLE) { - bool l_trylocked = dap_chain_net_sync_trylock(l_net, a_node_client); + // bool l_trylocked = dap_chain_net_sync_trylock(l_net, a_node_client); + bool l_trylocked = true; if (l_trylocked) { log_it(L_INFO, "Start synchronization process with "NODE_ADDR_FP_STR, NODE_ADDR_FP_ARGS_S(a_node_client->remote_node_addr)); a_node_client->cur_chain = l_net->pub.chains; @@ -277,7 +278,7 @@ static void s_stage_connected_callback(dap_client_t *a_client, void *a_arg) DL_FOREACH(dap_global_db_instance_get_default()->clusters, it) if (dap_cluster_member_find_unsafe(it->role_cluster, l_uplink_addr)) dap_cluster_member_add(it->links_cluster, l_uplink_addr, 0, NULL); - dap_chain_net_add_cluster_link(l_node_client->net, l_uplink_addr); + // dap_chain_net_add_cluster_link(l_node_client->net, l_uplink_addr); pthread_mutex_lock(&l_node_client->wait_mutex); l_node_client->state = NODE_CLIENT_STATE_ESTABLISHED; @@ -450,7 +451,8 @@ static void s_ch_chain_callback_notify_packet_in(dap_chain_ch_t* a_ch_chain, uin l_node_client->state = NODE_CLIENT_STATE_SYNCED; dap_cond_signal(l_node_client->wait_cond); pthread_mutex_unlock(&l_node_client->wait_mutex); - bool l_have_waiting = dap_chain_net_sync_unlock(l_net, l_node_client); + // bool l_have_waiting = dap_chain_net_sync_unlock(l_net, l_node_client); + bool l_have_waiting = true; if (dap_chain_net_get_target_state(l_net) == NET_STATE_ONLINE) { dap_timerfd_reset_unsafe(l_node_client->sync_timer); dap_chain_net_set_state(l_net, NET_STATE_ONLINE); @@ -500,7 +502,8 @@ static void s_ch_chain_callback_notify_packet_out(dap_chain_ch_t* a_ch_chain, ui l_node_client->state = NODE_CLIENT_STATE_ERROR; if (l_node_client->sync_timer) dap_timerfd_reset_unsafe(l_node_client->sync_timer); - bool l_have_waiting = dap_chain_net_sync_unlock(l_net, l_node_client); + // bool l_have_waiting = dap_chain_net_sync_unlock(l_net, l_node_client); + bool l_have_waiting = true; if (!l_have_waiting) { if (dap_chain_net_get_target_state(l_net) == NET_STATE_ONLINE) dap_chain_net_set_state(l_net, NET_STATE_ONLINE); diff --git a/modules/net/include/dap_chain_net.h b/modules/net/include/dap_chain_net.h index 92548139f4ca86eaac7019d9521bfebbba7f7ee6..04540ea9aeabf36c5f76d17b3e3d7484c89faa07 100644 --- a/modules/net/include/dap_chain_net.h +++ b/modules/net/include/dap_chain_net.h @@ -135,10 +135,6 @@ void dap_chain_net_proc_mempool(dap_chain_net_t *a_net); void dap_chain_net_set_flag_sync_from_zero(dap_chain_net_t * a_net, bool a_flag_sync_from_zero); bool dap_chain_net_get_flag_sync_from_zero( dap_chain_net_t * a_net); -bool dap_chain_net_sync_trylock(dap_chain_net_t *a_net, dap_chain_node_client_t *a_client); -bool dap_chain_net_sync_unlock(dap_chain_net_t *a_net, dap_chain_node_client_t *a_client); - -void dap_chain_net_add_cluster_link(dap_chain_net_t *a_net, dap_stream_node_addr_t *a_node_addr); dap_chain_net_t * dap_chain_net_by_name( const char * a_name); dap_chain_net_t * dap_chain_net_by_id( dap_chain_net_id_t a_id); uint16_t dap_chain_net_get_acl_idx(dap_chain_net_t *a_net); @@ -215,6 +211,7 @@ int dap_chain_datum_add(dap_chain_t * a_chain, dap_chain_datum_t *a_datum, size_ bool dap_chain_net_get_load_mode(dap_chain_net_t * a_net); void dap_chain_net_announce_addrs(dap_chain_net_t *a_net); char *dap_chain_net_links_dump(dap_chain_net_t*); +int dap_chain_net_link_manager_init(); enum dap_chain_net_json_rpc_error_list{ DAP_CHAIN_NET_JSON_RPC_OK,