From bbf883ef57063a6b9efe08dd680d0845a5660195 Mon Sep 17 00:00:00 2001 From: Roman Khlopkov <roman.khlopkov@demlabs.net> Date: Thu, 16 Dec 2021 15:23:32 +0300 Subject: [PATCH] [+] Serial sync for active links --- dap-sdk/core/include/dap_math_ops.h | 2 +- modules/net/dap_chain_net.c | 36 ++++++++++++++++++- modules/net/dap_chain_node_client.c | 3 +- modules/net/include/dap_chain_net.h | 4 ++- modules/service/vpn/dap_chain_net_srv_vpn.c | 6 ++-- .../vpn/include/dap_chain_net_srv_vpn.h | 2 +- 6 files changed, 45 insertions(+), 8 deletions(-) diff --git a/dap-sdk/core/include/dap_math_ops.h b/dap-sdk/core/include/dap_math_ops.h index 04647fd382..bd75c7acc2 100755 --- a/dap-sdk/core/include/dap_math_ops.h +++ b/dap-sdk/core/include/dap_math_ops.h @@ -5,7 +5,7 @@ #if defined(__GNUC__) || defined (__clang__) -#if __SIZEOF_INT128__ == 8 +#if __SIZEOF_INT128__ == 16 #define DAP_GLOBAL_IS_INT128 typedef __int128 _dap_int128_t; diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index a729dcf41b..8400c75cb7 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -145,6 +145,8 @@ typedef struct dap_chain_net_pvt{ dap_chain_node_addr_t * node_addr; dap_chain_node_info_t * node_info; // Current node's info + //Active synchronizing link + dap_chain_node_client_t *active_link; // Established links dap_list_t *links; // Links list size_t links_connected_count; @@ -976,7 +978,7 @@ static bool s_net_states_proc(dap_proc_thread_t *a_thread, void *a_arg) for (dap_list_t *l_tmp = l_net_pvt->links_info; l_tmp; l_tmp = dap_list_next(l_tmp)) { dap_chain_node_info_t *l_link_info = (dap_chain_node_info_t *)l_tmp->data; dap_chain_node_client_t *l_client = dap_chain_net_client_create_n_connect(l_net, l_link_info); - l_client->keep_connection = false; + l_client->keep_connection = true; l_net_pvt->links = dap_list_append(l_net_pvt->links, l_client); if (dap_list_length(l_net_pvt->links) == s_required_links_count) break; @@ -1007,6 +1009,38 @@ static bool s_net_states_proc(dap_proc_thread_t *a_thread, void *a_arg) return ! l_repeat_after_exit; } + +bool dap_chain_net_sync_trylock(dap_chain_net_t *a_net, dap_chain_node_client_t *a_client) +{ + dap_chain_net_pvt_t *l_net_pvt = PVT(a_net); + pthread_rwlock_rdlock(&l_net_pvt->rwlock); + bool l_found = false; + if (l_net_pvt->active_link) { + for (dap_list_t *l_links = l_net_pvt->links; l_links; l_links = dap_list_next(l_links)) { + if (l_links->data == l_net_pvt->active_link) { + dap_chain_node_client_t *l_client = (dap_chain_node_client_t *)l_links->data; + if (l_client->state >= NODE_CLIENT_STATE_ESTABLISHED && + l_client->state < NODE_CLIENT_STATE_SYNCED) { + l_found = true; + break; + } + } + } + } + if (!l_found) { + l_net_pvt->active_link = a_client; + } + pthread_rwlock_unlock(&l_net_pvt->rwlock); + return !l_found; +} + +void dap_chain_net_sync_unlock(dap_chain_net_t *a_net) +{ + dap_chain_net_pvt_t *l_net_pvt = PVT(a_net); + pthread_rwlock_rdlock(&l_net_pvt->rwlock); + l_net_pvt->active_link = NULL; + pthread_rwlock_unlock(&l_net_pvt->rwlock); +} /** * @brief dap_chain_net_client_create_n_connect * @param a_net diff --git a/modules/net/dap_chain_node_client.c b/modules/net/dap_chain_node_client.c index a286d8f400..9d77b359f6 100644 --- a/modules/net/dap_chain_node_client.c +++ b/modules/net/dap_chain_node_client.c @@ -232,7 +232,7 @@ static bool s_timer_update_states_callback(void *a_arg) dap_chain_net_t * l_net = l_node_client->net; assert(l_net); // If we do nothing - init sync process - if (l_ch_chain->state == CHAIN_STATE_IDLE ||l_ch_chain->state == CHAIN_STATE_SYNC_ALL ){ + if (l_ch_chain->state == CHAIN_STATE_IDLE && dap_chain_net_sync_trylock(l_net, l_me)) { log_it(L_INFO, "Start synchronization process with "NODE_ADDR_FP_STR, NODE_ADDR_FP_ARGS_S(l_node_client->remote_node_addr)); dap_stream_ch_chain_sync_request_t l_sync_gdb = {}; l_sync_gdb.node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net); @@ -456,6 +456,7 @@ static void s_ch_chain_callback_notify_packet_in(dap_stream_ch_chain_t* a_ch_cha }else{ // If no - over with sync process dap_chain_node_addr_t * l_node_addr = dap_chain_net_get_cur_addr(l_net); log_it(L_INFO, "In: State node %s."NODE_ADDR_FP_STR" is SYNCED",l_net->pub.name, NODE_ADDR_FP_ARGS(l_node_addr) ); + dap_chain_net_sync_unlock(l_net); l_node_client->state = NODE_CLIENT_STATE_SYNCED; if (dap_chain_net_get_target_state(l_net) == NET_STATE_ONLINE) dap_chain_net_set_state(l_net, NET_STATE_ONLINE); diff --git a/modules/net/include/dap_chain_net.h b/modules/net/include/dap_chain_net.h index d955cef77b..cfca21d02d 100644 --- a/modules/net/include/dap_chain_net.h +++ b/modules/net/include/dap_chain_net.h @@ -39,7 +39,7 @@ along with any CellFrame SDK based project. If not, see <http://www.gnu.org/lic #define DAP_CHAIN_NET_NAME_MAX 32 struct dap_chain_node_info; -struct dap_chain_node_client; +typedef struct dap_chain_node_client dap_chain_node_client_t; typedef enum dap_chain_net_state{ NET_STATE_OFFLINE = 0, @@ -119,6 +119,8 @@ void dap_chain_net_proc_mempool (dap_chain_net_t * a_net); void dap_chain_net_set_flag_sync_from_zero(dap_chain_net_t * a_net, bool a_flag_sync_from_zero); bool dap_chain_net_get_flag_sync_from_zero( dap_chain_net_t * a_net); +bool dap_chain_net_sync_trylock(dap_chain_net_t *a_net, dap_chain_node_client_t *a_client); +void dap_chain_net_sync_unlock(dap_chain_net_t *a_net); dap_chain_net_t * dap_chain_net_by_name( const char * a_name); dap_chain_net_t * dap_chain_net_by_id( dap_chain_net_id_t a_id); diff --git a/modules/service/vpn/dap_chain_net_srv_vpn.c b/modules/service/vpn/dap_chain_net_srv_vpn.c index d963628e1e..85226074bb 100644 --- a/modules/service/vpn/dap_chain_net_srv_vpn.c +++ b/modules/service/vpn/dap_chain_net_srv_vpn.c @@ -120,7 +120,7 @@ typedef struct tun_socket_msg{ } type; dap_chain_net_srv_ch_vpn_t * ch_vpn; dap_events_socket_t * esocket; - uint128_t esocket_uuid; + dap_events_socket_uuid_t esocket_uuid; bool is_reassigned_once; union{ struct{ // Esocket reassigment @@ -474,7 +474,7 @@ static void s_tun_send_msg_ip_unassigned_all(dap_chain_net_srv_ch_vpn_t * a_ch_v */ static void s_tun_send_msg_esocket_reasigned_inter(dap_chain_net_srv_vpn_tun_socket_t * a_tun_socket, dap_chain_net_srv_ch_vpn_t * a_ch_vpn, dap_events_socket_t * a_esocket, - uint128_t a_esocket_uuid, struct in_addr a_addr, uint32_t a_esocket_worker_id) + dap_events_socket_uuid_t a_esocket_uuid, struct in_addr a_addr, uint32_t a_esocket_worker_id) { struct tun_socket_msg * l_msg = DAP_NEW_Z(struct tun_socket_msg); l_msg->type = TUN_SOCKET_MSG_ESOCKET_REASSIGNED ; @@ -500,7 +500,7 @@ static void s_tun_send_msg_esocket_reasigned_inter(dap_chain_net_srv_vpn_tun_soc * @param a_worker_id */ static void s_tun_send_msg_esocket_reasigned_all_inter(dap_chain_net_srv_ch_vpn_t * a_ch_vpn, dap_events_socket_t * a_esocket, - uint128_t a_esocket_uuid, struct in_addr a_addr, uint32_t a_worker_id) + dap_events_socket_uuid_t a_esocket_uuid, struct in_addr a_addr, uint32_t a_worker_id) { for( uint32_t i=0; i< s_tun_sockets_count; i++) s_tun_send_msg_esocket_reasigned_inter(s_tun_sockets[i] , a_ch_vpn, a_esocket, a_esocket_uuid, a_addr, a_worker_id); diff --git a/modules/service/vpn/include/dap_chain_net_srv_vpn.h b/modules/service/vpn/include/dap_chain_net_srv_vpn.h index 10e3edf368..a576bc5ca2 100644 --- a/modules/service/vpn/include/dap_chain_net_srv_vpn.h +++ b/modules/service/vpn/include/dap_chain_net_srv_vpn.h @@ -150,7 +150,7 @@ typedef struct dap_chain_net_srv_ch_vpn_info dap_events_socket_t * queue_msg; // Message queue dap_worker_t * worker; dap_events_socket_t * esocket; - uint64_t esocket_uuid; + dap_events_socket_uuid_t esocket_uuid; UT_hash_handle hh; }dap_chain_net_srv_ch_vpn_info_t; -- GitLab