From 5dba0b137871e13327dae8dd23187f80769c1a14 Mon Sep 17 00:00:00 2001 From: "Dmitriy A. Gerasimov" <dmitriy.gerasimov@demlabs.net> Date: Tue, 19 Jan 2021 17:09:30 +0700 Subject: [PATCH] [+] dap_notify --- CMakeLists.txt | 2 +- dap-sdk/net/core/dap_events_socket.c | 54 +++++--- dap-sdk/net/core/include/dap_events_socket.h | 4 +- dap-sdk/net/server/CMakeLists.txt | 1 + .../net/server/notify_server/CMakeLists.txt | 13 ++ .../notify_server/include/dap_notify_srv.h | 29 ++++ .../server/notify_server/src/dap_notify_srv.c | 126 ++++++++++++++++++ dap-sdk/net/stream/ch/dap_stream_ch.c | 2 + dap-sdk/net/stream/ch/include/dap_stream_ch.h | 2 + modules/net/dap_chain_net.c | 92 ++++++++++--- modules/net/include/dap_chain_node_client.h | 11 ++ 11 files changed, 293 insertions(+), 43 deletions(-) create mode 100644 dap-sdk/net/server/notify_server/CMakeLists.txt create mode 100644 dap-sdk/net/server/notify_server/include/dap_notify_srv.h create mode 100644 dap-sdk/net/server/notify_server/src/dap_notify_srv.c diff --git a/CMakeLists.txt b/CMakeLists.txt index f4d0935672..da1232731c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -60,7 +60,7 @@ endif() # Networking if (CELLFRAME_MODULES MATCHES "network") message("[+] Module 'network'") - set(CELLFRAME_LIBS ${CELLFRAME_LIBS} dap_server_core dap_json_rpc dap_enc_server dap_http_server dap_session + set(CELLFRAME_LIBS ${CELLFRAME_LIBS} dap_server_core dap_json_rpc dap_enc_server dap_notify_srv dap_http_server dap_session dap_stream dap_stream_ch dap_client dap_stream_ch_chain dap_stream_ch_chain_net dap_chain_net dap_chain_mempool magic) endif() diff --git a/dap-sdk/net/core/dap_events_socket.c b/dap-sdk/net/core/dap_events_socket.c index db6acb4d73..3f005a9d31 100644 --- a/dap-sdk/net/core/dap_events_socket.c +++ b/dap-sdk/net/core/dap_events_socket.c @@ -313,32 +313,44 @@ dap_events_socket_t * dap_events_socket_create_type_pipe_mt(dap_worker_t * a_w, */ dap_events_socket_t * dap_events_socket_create(dap_events_desc_type_t a_type, dap_events_socket_callbacks_t* a_callbacks) { - dap_events_socket_t * l_es = NULL; + int l_sock_type = SOCK_STREAM; + int l_sock_class = AF_INET; + switch(a_type){ case DESCRIPTOR_TYPE_SOCKET_CLIENT: - case DESCRIPTOR_TYPE_SOCKET_UDP :{ - #ifdef WIN32 - SOCKET l_sock; - #else - int l_sock; - #endif - l_sock = socket(AF_INET, (a_type==DESCRIPTOR_TYPE_SOCKET_CLIENT? SOCK_STREAM : SOCK_DGRAM) - | SOCK_NONBLOCK , 0); - if (l_sock == INVALID_SOCKET) { - log_it(L_ERROR, "Socket create error"); - break; - } - - dap_events_socket_t * l_es =dap_events_socket_wrap_no_add(dap_events_get_default(),l_sock,a_callbacks); - if(!l_es){ - log_it(L_CRITICAL,"Can't allocate memory for the new esocket"); - break; - } - l_es->type = DESCRIPTOR_TYPE_EVENT; - } break; + break; + case DESCRIPTOR_TYPE_SOCKET_UDP : + l_sock_type = SOCK_DGRAM; + break; + case DESCRIPTOR_TYPE_SOCKET_LOCAL_LISTENING: +#ifdef DAP_OS_UNIX + l_sock_class = AF_LOCAL; +#elif DAP_OS_WIDNOWS +#endif + break; default: log_it(L_CRITICAL,"Can't create socket type %d", a_type ); + return NULL; } + +#ifdef WIN32 + SOCKET l_sock; +#else + int l_sock; +#endif + l_sock = socket(l_sock_class, l_sock_type | SOCK_NONBLOCK , 0); + if (l_sock == INVALID_SOCKET) { + log_it(L_ERROR, "Socket create error"); + return NULL; + } + + dap_events_socket_t * l_es =dap_events_socket_wrap_no_add(dap_events_get_default(),l_sock,a_callbacks); + if(!l_es){ + log_it(L_CRITICAL,"Can't allocate memory for the new esocket"); + return NULL; + } + l_es->type = a_type ; + return l_es; } diff --git a/dap-sdk/net/core/include/dap_events_socket.h b/dap-sdk/net/core/include/dap_events_socket.h index 9feaac106f..07b45c23ef 100644 --- a/dap-sdk/net/core/include/dap_events_socket.h +++ b/dap-sdk/net/core/include/dap_events_socket.h @@ -144,7 +144,9 @@ typedef enum { DESCRIPTOR_TYPE_PIPE, DESCRIPTOR_TYPE_TIMER, DESCRIPTOR_TYPE_EVENT, - DESCRIPTOR_TYPE_FILE + DESCRIPTOR_TYPE_FILE, + DESCRIPTOR_TYPE_SOCKET_LOCAL_LISTENING, + DESCRIPTOR_TYPE_SOCKET_LOCAL_CLIENT, } dap_events_desc_type_t; typedef struct dap_events_socket { diff --git a/dap-sdk/net/server/CMakeLists.txt b/dap-sdk/net/server/CMakeLists.txt index be52ca81d5..d2023893c5 100644 --- a/dap-sdk/net/server/CMakeLists.txt +++ b/dap-sdk/net/server/CMakeLists.txt @@ -1,6 +1,7 @@ project(libdap-server C) cmake_minimum_required(VERSION 3.0) +add_subdirectory(notify_server) add_subdirectory(http_server) add_subdirectory(enc_server) add_subdirectory(json_rpc) diff --git a/dap-sdk/net/server/notify_server/CMakeLists.txt b/dap-sdk/net/server/notify_server/CMakeLists.txt new file mode 100644 index 0000000000..005582dd3f --- /dev/null +++ b/dap-sdk/net/server/notify_server/CMakeLists.txt @@ -0,0 +1,13 @@ +cmake_minimum_required(VERSION 3.0) +project(dap_notify_srv C) + +file(GLOB DAP_NOTIFY_SRV_SRCS FILES src/*.c) +file(GLOB DAP_NOTIFY_SRV_HDRS FILES include/*.h) + + +add_library(${PROJECT_NAME} STATIC ${DAP_NOTIFY_SRV_SRCS} ${DAP_NOTIFY_SRV_HDRS}) +target_include_directories(${PROJECT_NAME} INTERFACE . include/) +target_include_directories(${PROJECT_NAME} PUBLIC include) + +target_link_libraries(${PROJECT_NAME} dap_core dap_server_core json-c ) + diff --git a/dap-sdk/net/server/notify_server/include/dap_notify_srv.h b/dap-sdk/net/server/notify_server/include/dap_notify_srv.h new file mode 100644 index 0000000000..e99b4e8674 --- /dev/null +++ b/dap-sdk/net/server/notify_server/include/dap_notify_srv.h @@ -0,0 +1,29 @@ +/* + * Authors: + * Dmitriy A. Gearasimov <gerasimov.dmitriy@demlabs.net> + * DeM Labs Ltd. https://demlabs.net + * Copyright (c) 2021 + * All rights reserved. + + This file is part of DAP SDK the open source project + + DAP SDK is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + DAP SDK is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with any DAP SDK based project. If not, see <http://www.gnu.org/licenses/>. +*/ +#pragma once + +int dap_notify_server_init(const char * a_notify_socket_path); +void dap_notify_server_deinit(); +struct dap_events_socket * dap_notify_server_create_inter(); +int dap_notify_server_send_f_inter(struct dap_events_socket * a_input, const char * a_format,...); +int dap_notify_server_send_f_mt(const char * a_format,...); diff --git a/dap-sdk/net/server/notify_server/src/dap_notify_srv.c b/dap-sdk/net/server/notify_server/src/dap_notify_srv.c new file mode 100644 index 0000000000..722603fa1a --- /dev/null +++ b/dap-sdk/net/server/notify_server/src/dap_notify_srv.c @@ -0,0 +1,126 @@ +/* + * Authors: + * Dmitriy A. Gearasimov <gerasimov.dmitriy@demlabs.net> + * DeM Labs Ltd. https://demlabs.net + * Copyright (c) 2021 + * All rights reserved. + + This file is part of DAP SDK the open source project + + DAP SDK is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + DAP SDK is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with any DAP SDK based project. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifdef DAP_OS_UNIX +#include <sys/socket.h> +#include <sys/un.h> +#elif DAP_OS_WINDOWS +#endif + +#include <stdlib.h> +#include <stdarg.h> +#include <unistd.h> + +#include "dap_common.h" +#include "dap_strfuncs.h" +#include "dap_events_socket.h" +#include "dap_notify_srv.h" + +dap_events_socket_t * s_notify_server = NULL; +dap_events_socket_t * s_notify_server_queue = NULL; +static void s_notify_server_accept(dap_events_socket_t * a_es, int a_remote_socket, struct sockaddr* a_remote_addr ); +static void s_notify_server_inter_queue(dap_events_socket_t * a_es, void * a_arg); + +/** + * @brief dap_notify_server_init + * @param a_notify_socket_path + * @return + */ +int dap_notify_server_init(const char * a_notify_socket_path) +{ + dap_events_socket_callbacks_t l_callbacks={0}; + l_callbacks.accept_callback = s_notify_server_accept; + s_notify_server = dap_events_socket_create(DESCRIPTOR_TYPE_SOCKET_LOCAL_LISTENING,&l_callbacks ); + if (!s_notify_server) + return -1; + + struct sockaddr_un l_sock_un={0}; + l_sock_un.sun_family = AF_LOCAL; + strncpy(l_sock_un.sun_path, a_notify_socket_path, sizeof(l_sock_un.sun_path) - 1); + + return 0; +} + +/** + * @brief dap_notify_server_deinit + */ +void dap_notify_server_deinit() +{ + +} + +/** + * @brief dap_notify_server_create_inter + * @return + */ +struct dap_events_socket * dap_notify_server_create_inter() +{ + +} + +/** + * @brief dap_notify_server_send_fmt_inter + * @param a_input + * @param a_format + * @return + */ +int dap_notify_server_send_f_inter(struct dap_events_socket * a_input, const char * a_format,...) +{ + va_list va; + va_start(va, a_format); + size_t l_str_size=dap_vsnprintf(NULL,0,a_format,va); + char * l_str = DAP_NEW_SIZE(char,l_str_size+1); + dap_vsnprintf(l_str,l_str_size+1,a_format,va); + return dap_events_socket_queue_ptr_send_to_input(a_input,l_str); +} + +/** + * @brief dap_notify_server_send_fmt_mt + * @param a_format + * @return + */ +int dap_notify_server_send_f_mt(const char * a_format,...) +{ + +} + +/** + * @brief s_notify_server_accept + * @param a_es + * @param a_remote_socket + * @param a_remote_addr + */ +static void s_notify_server_accept(dap_events_socket_t * a_es, int a_remote_socket, struct sockaddr* a_remote_addr ) +{ + +} + +/** + * @brief s_notify_server_inter_queue + * @param a_es + * @param a_arg + */ +static void s_notify_server_inter_queue(dap_events_socket_t * a_es, void * a_arg) +{ + +} diff --git a/dap-sdk/net/stream/ch/dap_stream_ch.c b/dap-sdk/net/stream/ch/dap_stream_ch.c index 0a53372876..17a515745a 100644 --- a/dap-sdk/net/stream/ch/dap_stream_ch.c +++ b/dap-sdk/net/stream/ch/dap_stream_ch.c @@ -38,6 +38,7 @@ #include "dap_common.h" #include "dap_events_socket.h" #include "dap_http_client.h" +#include "dap_uuid.h" #include "dap_stream.h" #include "dap_stream_ch.h" #include "dap_stream_ch_proc.h" @@ -94,6 +95,7 @@ dap_stream_ch_t* dap_stream_ch_new(dap_stream_t* a_stream, uint8_t id) l_ch_new->stream = a_stream; l_ch_new->proc = proc; l_ch_new->ready_to_read = true; + l_ch_new->uuid = dap_uuid_generate_uint128(); // Init on stream worker dap_stream_worker_t * l_stream_worker = a_stream->stream_worker; diff --git a/dap-sdk/net/stream/ch/include/dap_stream_ch.h b/dap-sdk/net/stream/ch/include/dap_stream_ch.h index a4da8d255f..754dddf030 100644 --- a/dap-sdk/net/stream/ch/include/dap_stream_ch.h +++ b/dap-sdk/net/stream/ch/include/dap_stream_ch.h @@ -24,6 +24,7 @@ #include <pthread.h> #include <stdint.h> #include "uthash.h" +#include "dap_math_ops.h" typedef struct dap_stream dap_stream_t; typedef struct dap_stream_worker dap_stream_worker_t; typedef struct dap_stream_pkt dap_stream_pkt_t; @@ -40,6 +41,7 @@ typedef struct dap_stream_ch{ bool ready_to_write; bool ready_to_read; dap_stream_t * stream; + uint128_t uuid; dap_stream_worker_t * stream_worker; struct{ uint64_t bytes_write; diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index e6b664a6ca..05dccc9db8 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -112,7 +112,7 @@ static size_t s_max_links_count = 5;// by default 5 // number of required connections static size_t s_required_links_count = 3;// by default 3 static pthread_t s_net_check_pid; - +static bool s_debug_more = false; struct link_dns_request { dap_chain_net_t * net; @@ -386,7 +386,10 @@ static void s_chain_callback_notify(void * a_arg, dap_chain_t *a_chain, dap_chai } } - +/** + * @brief s_fill_links_from_root_aliases + * @param a_net + */ static void s_fill_links_from_root_aliases(dap_chain_net_t * a_net) { dap_chain_net_pvt_t *l_pvt_net = PVT(a_net); @@ -410,7 +413,7 @@ static void s_fill_links_from_root_aliases(dap_chain_net_t * a_net) l_pvt_net->links_info = dap_list_append(l_pvt_net->links_info, l_link_node_info); pthread_rwlock_unlock(&l_pvt_net->rwlock); } else { - log_it(L_WARNING, "Not found link "NODE_ADDR_FP_STR" in the node list", NODE_ADDR_FPS_ARGS(l_link_addr)); + log_it(L_WARNING, "Not found link %s."NODE_ADDR_FP_STR" in the node list", a_net->pub.name, NODE_ADDR_FPS_ARGS(l_link_addr)); } } } @@ -428,18 +431,37 @@ static void s_node_link_callback_connected(dap_chain_node_client_t * a_node_clie a_node_client->state = NODE_CLIENT_STATE_ESTABLISHED; - log_it(L_DEBUG, "Established connection with "NODE_ADDR_FP_STR, NODE_ADDR_FP_ARGS_S(l_link_info->hdr.address)); + if( !a_node_client->is_reconnecting || s_debug_more ) + log_it(L_NOTICE, "Established connection with %s."NODE_ADDR_FP_STR,l_net->pub.name, NODE_ADDR_FP_ARGS_S(l_link_info->hdr.address)); pthread_rwlock_wrlock(&l_net_pvt->rwlock); l_net_pvt->links = dap_list_append(l_net_pvt->links, a_node_client); l_net_pvt->links_count++; - size_t l_links_count = l_net_pvt->links_count; - if(l_net_pvt->state == NET_STATE_LINKS_CONNECTING ) + a_node_client->is_reconnecting = false; + + if(l_net_pvt->state == NET_STATE_LINKS_CONNECTING ){ l_net_pvt->state = NET_STATE_LINKS_ESTABLISHED; + + a_node_client->stream_worker = dap_client_get_stream_worker(a_node_client->client); + a_node_client->ch_chain = dap_client_get_stream_ch_unsafe(a_node_client->client, dap_stream_ch_chain_get_id()); + if (a_node_client->ch_chain){ + a_node_client->ch_chain_uuid = a_node_client->ch_chain->uuid; + a_node_client->ch_chain_net = dap_client_get_stream_ch_unsafe(a_node_client->client, dap_stream_ch_chain_net_get_id()); + if (a_node_client->ch_chain_net){ + a_node_client->ch_chain_net_uuid = a_node_client->ch_chain_net->uuid; + }else{ + log_it(L_WARNING,"No channel 'chain net' in stream connection"); + } + dap_proc_queue_add_callback_inter(dap_client_get_stream_worker(a_node_client->client)->worker->proc_queue_input, + s_node_link_states_proc, a_node_client); + }else{ + log_it(L_CRITICAL,"No channel 'chain' in stream connection, disconnecting link"); + l_net_pvt->state = NET_STATE_LINKS_CONNECTING; + dap_client_go_stage(a_node_client->client,STAGE_BEGIN,NULL); + } + } pthread_rwlock_unlock(&l_net_pvt->rwlock); - dap_proc_queue_add_callback_inter(dap_client_get_stream_worker(a_node_client->client)->worker->proc_queue_input, - s_node_link_states_proc, a_node_client); } /** @@ -450,7 +472,33 @@ static void s_node_link_callback_connected(dap_chain_node_client_t * a_node_clie static void s_node_link_callback_disconnected(dap_chain_node_client_t * a_node_client, void * a_arg) { dap_chain_net_t * l_net = (dap_chain_net_t *) a_arg; + dap_chain_net_pvt_t * l_net_pvt = PVT(l_net); + pthread_rwlock_wrlock(&l_net_pvt->rwlock); + if ( l_net_pvt->state_target ==NET_STATE_ONLINE ){ + if(s_debug_more) + log_it(L_NOTICE, "%s."NODE_ADDR_FP_STR" disconnected, reconnecting back...", + l_net->pub.name, + NODE_ADDR_FP_ARGS_S(a_node_client->info->hdr.address) ); + + a_node_client->is_reconnecting = true; + dap_chain_node_client_create_n_connect(l_net, a_node_client->info,"CN", + s_node_link_callback_connected, + s_node_link_callback_disconnected, + s_node_link_callback_stage, + s_node_link_callback_error,NULL); + }else if (l_net_pvt->state_target == NET_STATE_OFFLINE){ + log_it(L_INFO, "%s."NODE_ADDR_FP_STR" disconnected",l_net->pub.name,NODE_ADDR_FP_ARGS_S(a_node_client->info->hdr.address)); + }else{ + log_it(L_CRITICAL,"Link "NODE_ADDR_FP_STR" disconnected, but wrong target state %s: could be only NET_STATE_ONLINE or NET_STATE_OFFLINE " + ,NODE_ADDR_FP_ARGS_S(a_node_client->info->hdr.address) + , c_net_states[l_net_pvt->state_target] ); + } + if(l_net_pvt->links_count) + l_net_pvt->links_count--; + else + log_it(L_CRITICAL,"Links count is zero in disconnected callback, looks smbd decreased it twice or forget to increase on connect/reconnect"); + pthread_rwlock_unlock(&l_net_pvt->rwlock); } /** @@ -462,7 +510,9 @@ static void s_node_link_callback_disconnected(dap_chain_node_client_t * a_node_c static void s_node_link_callback_stage(dap_chain_node_client_t * a_node_client,dap_client_stage_t a_stage, void * a_arg) { dap_chain_net_t * l_net = (dap_chain_net_t *) a_arg; - + if( s_debug_more) + log_it(L_INFO,"%s."NODE_ADDR_FP_STR" stage %s",l_net->pub.name,NODE_ADDR_FP_ARGS_S(a_node_client->info->hdr.address), + dap_client_stage_str(a_stage)); } /** @@ -474,10 +524,8 @@ static void s_node_link_callback_stage(dap_chain_node_client_t * a_node_client,d static void s_node_link_callback_error(dap_chain_node_client_t * a_node_client, int a_error, void * a_arg) { dap_chain_net_t * l_net = (dap_chain_net_t *) a_arg; - log_it(L_DEBUG, "Can't establish link with "NODE_ADDR_FP_STR, NODE_ADDR_FP_ARGS_S(l_link_info->hdr.address)); - dap_chain_node_client_close(l_node_client); - l_node_client = NULL; - + log_it(L_WARNING, "Can't establish link with %s."NODE_ADDR_FP_STR, l_net->pub.name, NODE_ADDR_FP_ARGS_S(a_node_client->info->hdr.address)); + dap_chain_node_client_close(a_node_client); } /** @@ -499,17 +547,13 @@ static void s_net_state_link_prepare_success(dap_worker_t * a_worker,dap_chain_n l_dns_request->tries = 0; } pthread_rwlock_rdlock(&l_net_pvt->rwlock); - if (l_net_pvt->state_target == NET_STATE_OFFLINE) { // Go away from - DAP_DELETE(l_dns_request); - - dap_proc_queue_add_callback_inter( a_worker->proc_queue_input,s_net_states_proc,l_net ); - } l_dns_request->tries++; s_fill_links_from_root_aliases(l_net); l_net_pvt->links_dns_requests--; if (l_net_pvt->links_dns_requests == 0){ // It was the last one - + l_net_pvt->state = NET_STATE_LINKS_ESTABLISHED; + dap_proc_queue_add_callback_inter( a_worker->proc_queue_input,s_net_states_proc,l_net ); } pthread_rwlock_unlock(&l_net_pvt->rwlock); } @@ -528,7 +572,14 @@ static void s_net_state_link_prepare_error(dap_worker_t * a_worker,dap_chain_nod dap_chain_net_pvt_t * l_net_pvt = PVT(l_net); pthread_rwlock_unlock(&l_net_pvt->rwlock); - l_net_pvt->links_dns_requests--; + if(l_net_pvt->links_dns_requests) + l_net_pvt->links_dns_requests--; + + if(!l_net_pvt->links_dns_requests ){ + l_net_pvt->state = NET_STATE_OFFLINE; + log_it(L_WARNING,"Can't prepare links via DNS requests"); + dap_proc_queue_add_callback_inter( a_worker->proc_queue_input,s_net_states_proc,l_net ); + } } /** @@ -1039,6 +1090,7 @@ int dap_chain_net_init() s_max_links_count = dap_config_get_item_int32_default(g_config, "general", "max_links", s_max_links_count); // required number of connections to other nodes s_required_links_count = dap_config_get_item_int32_default(g_config, "general", "require_links", s_required_links_count); + s_debug_more = dap_config_get_item_bool_default(g_config,"chain_net","debug_more",false); dap_chain_net_load_all(); diff --git a/modules/net/include/dap_chain_node_client.h b/modules/net/include/dap_chain_node_client.h index 8a4d01e127..8099b42021 100644 --- a/modules/net/include/dap_chain_node_client.h +++ b/modules/net/include/dap_chain_node_client.h @@ -61,7 +61,17 @@ typedef struct dap_chain_node_client { bool sync_chains; dap_chain_cell_id_t cell_id; + dap_client_t *client; + dap_stream_worker_t * stream_worker; + + // Channel chain + dap_stream_ch_t * ch_chain; + uint128_t ch_chain_uuid; + // Channel chain net + dap_stream_ch_t * ch_chain_net; + uint128_t ch_chain_net_uuid; + dap_chain_node_info_t * info; dap_events_t *events; @@ -84,6 +94,7 @@ typedef struct dap_chain_node_client { bool keep_connection; + bool is_reconnecting; // callbacks dap_chain_node_client_callback_t callback_connected; dap_chain_node_client_callback_t callback_discconnected; -- GitLab