diff --git a/CMakeLists.txt b/CMakeLists.txt index 685992e884dc94285a448b2fa9a2f633cb469005..9519c9174da45deeb6ad36514de2e1ed8d176761 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,7 +2,7 @@ project(cellframe-sdk C) cmake_minimum_required(VERSION 2.8) set(CMAKE_C_STANDARD 11) -set(CELLFRAME_SDK_NATIVE_VERSION "2.6-1") +set(CELLFRAME_SDK_NATIVE_VERSION "2.6-2") add_definitions ("-DCELLFRAME_SDK_VERSION=\"${CELLFRAME_SDK_NATIVE_VERSION}\"") set(DAPSDK_MODULES "") @@ -55,7 +55,7 @@ endif() # Networking if (CELLFRAME_MODULES MATCHES "network") message("[+] Module 'network'") - set(CELLFRAME_LIBS ${CELLFRAME_LIBS} dap_server_core dap_json_rpc dap_enc_server dap_http_server dap_udp_server dap_session + set(CELLFRAME_LIBS ${CELLFRAME_LIBS} dap_server_core dap_json_rpc dap_enc_server dap_http_server dap_session dap_stream dap_stream_ch dap_client dap_stream_ch_chain dap_stream_ch_chain_net dap_chain_net dap_chain_mempool magic) endif() diff --git a/dap-sdk/CMakeLists.txt b/dap-sdk/CMakeLists.txt index cc44d581e1355d8536edad52b489ecc24a46f550..02650ba37bb9581fa846483f60fd39a1c91ea432 100644 --- a/dap-sdk/CMakeLists.txt +++ b/dap-sdk/CMakeLists.txt @@ -1,4 +1,4 @@ -set(DAP_SDK_NATIVE_VERSION "2.0-11") +set(DAP_SDK_NATIVE_VERSION "2.0-13") # Core if (DAPSDK_MODULES MATCHES "core") # Core @@ -22,5 +22,5 @@ endif() # Networking server if (DAPSDK_MODULES MATCHES "network-server") add_subdirectory(net/server) - add_subdirectory(net/server-udp) +# add_subdirectory(net/server-udp) endif() diff --git a/dap-sdk/net/client/CMakeLists.txt b/dap-sdk/net/client/CMakeLists.txt index 00f855bceb52d93e73d327260e24cf0b43ee6ac6..74875478768e8aa126cd0e7c6be2ac207b0206c2 100644 --- a/dap-sdk/net/client/CMakeLists.txt +++ b/dap-sdk/net/client/CMakeLists.txt @@ -12,7 +12,7 @@ endif() add_library(${PROJECT_NAME} STATIC ${DAP_CLIENT_HEADERS} ${DAP_CLIENT_SOURCES}) -target_link_libraries(${PROJECT_NAME} dap_core dap_crypto dap_server_core dap_udp_server dap_http_server dap_enc_server dap_stream dap_session dap_stream_ch json-c) +target_link_libraries(${PROJECT_NAME} dap_core dap_crypto dap_server_core dap_http_server dap_enc_server dap_stream dap_session dap_stream_ch json-c) if(UNIX AND NOT ANDROID) target_link_libraries(${PROJECT_NAME} rt) diff --git a/dap-sdk/net/client/dap_client_http.c b/dap-sdk/net/client/dap_client_http.c index 79ca6f4484c0ac269259a660ef45e9956eb04f3d..7eb3b44e1b3c9e250f7bc124432d8e8434cdf9b8 100644 --- a/dap-sdk/net/client/dap_client_http.c +++ b/dap-sdk/net/client/dap_client_http.c @@ -30,7 +30,8 @@ // for Unix-like systems #include <sys/types.h> #include <sys/socket.h> -//#include <bits/socket_type.h> +#include <netdb.h> +#include <arpa/inet.h> #endif #include <unistd.h> #include <errno.h> diff --git a/dap-sdk/net/client/dap_client_pvt.c b/dap-sdk/net/client/dap_client_pvt.c index 9a97bc1e8fba20e44a7fe936f492e43459c0f360..f2be7f840fc40c359df7b1d2ef0461bcf758c7ee 100644 --- a/dap-sdk/net/client/dap_client_pvt.c +++ b/dap-sdk/net/client/dap_client_pvt.c @@ -45,6 +45,7 @@ #else #include <sys/types.h> /* See NOTES */ #include <sys/socket.h> +#include <arpa/inet.h> #endif #include <pthread.h> diff --git a/dap-sdk/net/core/dap_events.c b/dap-sdk/net/core/dap_events.c index fd786c671c1b1831f49cb734c6239ffa6f35185e..f29f7f3d8fde6dc2c51966100a673972b5739ccb 100644 --- a/dap-sdk/net/core/dap_events.c +++ b/dap-sdk/net/core/dap_events.c @@ -39,7 +39,7 @@ #include <sys/stat.h> #endif -#include <netdb.h> +//#include <netdb.h> #include <unistd.h> #include <fcntl.h> #include <errno.h> diff --git a/dap-sdk/net/core/dap_events_socket.c b/dap-sdk/net/core/dap_events_socket.c index 8f4f0175040fbdcce85eb79b713e99fa860fc82c..c7acd09a14d39e45e357fc860321afdd39a69368 100644 --- a/dap-sdk/net/core/dap_events_socket.c +++ b/dap-sdk/net/core/dap_events_socket.c @@ -652,12 +652,10 @@ dap_events_socket_t * dap_events_socket_wrap2( dap_server_t *a_server, struct da ret->socket = a_sock; ret->events = a_events; ret->server = a_server; - ret->is_dont_reset_write_flag = true; memcpy(&ret->callbacks,a_callbacks, sizeof ( ret->callbacks) ); ret->flags = DAP_SOCK_READY_TO_READ; - ret->is_pingable = true; ret->last_time_active = ret->last_ping_request = time( NULL ); pthread_rwlock_wrlock( &a_events->sockets_rwlock ); @@ -693,7 +691,7 @@ dap_events_socket_t *dap_events_socket_find_unsafe( int sock, struct dap_events */ void dap_events_socket_set_readable_unsafe( dap_events_socket_t *sc, bool is_ready ) { - if( is_ready == (bool)(sc->flags & DAP_SOCK_READY_TO_READ) ) + if( is_ready == (bool)(sc->flags & DAP_SOCK_READY_TO_READ)) return; sc->ev.events = sc->ev_base_flags; @@ -730,7 +728,7 @@ void dap_events_socket_set_readable_unsafe( dap_events_socket_t *sc, bool is_rea */ void dap_events_socket_set_writable_unsafe( dap_events_socket_t *sc, bool a_is_ready ) { - if ( a_is_ready == (bool)(sc->flags & DAP_SOCK_READY_TO_WRITE) ) { + if ( a_is_ready == (bool)(sc->flags & DAP_SOCK_READY_TO_WRITE)) { return; } @@ -750,7 +748,7 @@ void dap_events_socket_set_writable_unsafe( dap_events_socket_t *sc, bool a_is_r sc->ev.events = events; - if (sc->worker) + if (sc->worker && sc->server->type != DAP_SERVER_UDP) if ( epoll_ctl(sc->worker->epoll_fd, EPOLL_CTL_MOD, sc->socket, &sc->ev) ){ int l_errno = errno; char l_errbuf[128]; diff --git a/dap-sdk/net/core/dap_server.c b/dap-sdk/net/core/dap_server.c index 6f49c8298997a1de133f432a7c9449a25fd95500..e57d203d23c2fb8ff807f5210f25cb4beef2bb68 100644 --- a/dap-sdk/net/core/dap_server.c +++ b/dap-sdk/net/core/dap_server.c @@ -64,7 +64,6 @@ static dap_events_socket_t * s_es_server_create(dap_events_t * a_events, int a_s static void s_es_server_accept(dap_events_socket_t *a_es, int a_remote_socket, struct sockaddr* a_remote_addr); static void s_es_server_error(dap_events_socket_t *a_es, int a_arg); static void s_es_server_new(dap_events_socket_t *a_es, void * a_arg); -static void s_server_delete(dap_server_t * a_server); /** * @brief dap_server_init * @return @@ -86,15 +85,24 @@ void dap_server_deinit() * @brief dap_server_delete * @param a_server */ -void s_server_delete(dap_server_t * a_server) +void dap_server_delete(dap_server_t *a_server) { + while (a_server->es_listeners) { + dap_events_socket_t *l_es = (dap_events_socket_t *)a_server->es_listeners->data; + dap_events_socket_remove_and_delete_mt(l_es->worker, l_es); + dap_list_t *l_tmp = a_server->es_listeners; + a_server->es_listeners = l_tmp->next; + DAP_DELETE(l_tmp); + } if(a_server->delete_callback) a_server->delete_callback(a_server,NULL); - if( a_server->address ) - DAP_DELETE(a_server->address ); - if( a_server->_inheritor ) - DAP_DELETE( a_server->_inheritor ); - DAP_DELETE(a_server); + if( a_server->address ) + DAP_DELETE(a_server->address ); + if( a_server->_inheritor ) + DAP_DELETE( a_server->_inheritor ); + pthread_mutex_destroy(&a_server->started_mutex); + pthread_cond_destroy(&a_server->started_cond); + DAP_DELETE(a_server); } /** @@ -105,7 +113,7 @@ void s_server_delete(dap_server_t * a_server) * @param a_type * @return */ -dap_server_t* dap_server_new(dap_events_t *a_events, const char * a_addr, uint16_t a_port, dap_server_type_t a_type) +dap_server_t* dap_server_new(dap_events_t *a_events, const char * a_addr, uint16_t a_port, dap_server_type_t a_type, dap_events_socket_callbacks_t *a_callbacks) { assert(a_events); dap_server_t *l_server = DAP_NEW_Z(dap_server_t); @@ -117,6 +125,8 @@ dap_server_t* dap_server_new(dap_events_t *a_events, const char * a_addr, uint16 if(l_server->type == DAP_SERVER_TCP) l_server->socket_listener = socket(AF_INET, SOCK_STREAM, 0); + else if (l_server->type == DAP_SERVER_UDP) + l_server->socket_listener = socket(AF_INET, SOCK_DGRAM, 0); if (l_server->socket_listener < 0) { int l_errno = errno; @@ -162,13 +172,20 @@ dap_server_t* dap_server_new(dap_events_t *a_events, const char * a_addr, uint16 l_callbacks.accept_callback = s_es_server_accept; l_callbacks.error_callback = s_es_server_error; + if (a_callbacks) { + l_callbacks.read_callback = a_callbacks->read_callback; + l_callbacks.write_callback = a_callbacks->write_callback; + l_callbacks.error_callback = a_callbacks->error_callback; + } + for(size_t l_worker_id = 0; l_worker_id < dap_events_worker_get_count() ; l_worker_id++){ dap_worker_t *l_w = dap_events_worker_get(l_worker_id); assert(l_w); dap_events_socket_t * l_es = dap_events_socket_wrap2( l_server, a_events, l_server->socket_listener, &l_callbacks); + l_server->es_listeners = dap_list_append(l_server->es_listeners, l_es); - if ( l_es){ - l_es->type = DESCRIPTOR_TYPE_SOCKET_LISTENING; + if (l_es) { + l_es->type = l_server->type == DAP_SERVER_TCP ? DESCRIPTOR_TYPE_SOCKET_LISTENING : DESCRIPTOR_TYPE_SOCKET; #ifdef DAP_EVENTS_CAPS_EPOLL // Prepare for multi thread listening l_es->ev_base_flags = EPOLLET| EPOLLIN | EPOLLEXCLUSIVE; @@ -259,7 +276,6 @@ static dap_events_socket_t * s_es_server_create(dap_events_t * a_events, int a_s //fcntl(a_sock, F_SETFL, O_NONBLOCK); ret = dap_events_socket_wrap_no_add(a_events, a_sock, a_callbacks); - ret->is_dont_reset_write_flag = true; ret->type = DESCRIPTOR_TYPE_SOCKET; ret->server = a_server; diff --git a/dap-sdk/net/core/dap_worker.c b/dap-sdk/net/core/dap_worker.c index 6a2081799ab911a6521789ccc5478ea2be0922ec..969ce8d232ba5291ccaaa8cb1c237dd7a53478db 100644 --- a/dap-sdk/net/core/dap_worker.c +++ b/dap-sdk/net/core/dap_worker.c @@ -27,6 +27,9 @@ #define _GNU_SOURCE /* See feature_test_macros(7) */ #endif #include <fcntl.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <arpa/inet.h> #include "dap_common.h" #include "dap_math_ops.h" @@ -190,8 +193,15 @@ void *dap_worker_thread(void *arg) break; case DESCRIPTOR_TYPE_SOCKET: l_must_read_smth = true; - l_bytes_read = recv(l_cur->fd, (char *) (l_cur->buf_in + l_cur->buf_in_size), - sizeof(l_cur->buf_in) - l_cur->buf_in_size, 0); + if (l_cur->server->type == DAP_SERVER_TCP) { + l_bytes_read = recv(l_cur->fd, (char *) (l_cur->buf_in + l_cur->buf_in_size), + sizeof(l_cur->buf_in) - l_cur->buf_in_size, 0); + } else if (l_cur->server->type == DAP_SERVER_UDP) { + socklen_t l_size = sizeof(l_cur->remote_addr); + l_bytes_read = recvfrom(l_cur->fd, (char *) (l_cur->buf_in + l_cur->buf_in_size), + sizeof(l_cur->buf_in) - l_cur->buf_in_size, 0, + (struct sockaddr *)&l_cur->remote_addr, &l_size); + } l_errno = errno; break; case DESCRIPTOR_TYPE_SOCKET_LISTENING: @@ -208,6 +218,7 @@ void *dap_worker_thread(void *arg) char l_errbuf[128]; strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); log_it(L_WARNING,"accept() on socket %d error:\"%s\"(%d)",l_cur->socket, l_errbuf,l_errno); + break; } } @@ -244,7 +255,8 @@ void *dap_worker_thread(void *arg) continue; } }else{ - log_it(L_WARNING, "We have incomming %u data but no read callback on socket %d, removing from read set", l_cur->socket); + log_it(L_WARNING, "We have incomming %u data but no read callback on socket %d, removing from read set", + l_bytes_read, l_cur->socket); dap_events_socket_set_readable_unsafe(l_cur,false); } } @@ -299,8 +311,14 @@ void *dap_worker_thread(void *arg) int l_errno; switch (l_cur->type){ case DESCRIPTOR_TYPE_SOCKET: - l_bytes_sent = send(l_cur->socket, l_cur->buf_out, - l_cur->buf_out_size, MSG_DONTWAIT | MSG_NOSIGNAL); + if (l_cur->server->type == DAP_SERVER_TCP) { + l_bytes_sent = send(l_cur->socket, (const char *)l_cur->buf_out, + l_cur->buf_out_size, MSG_DONTWAIT | MSG_NOSIGNAL); + } else if (l_cur->server->type == DAP_SERVER_UDP) { + l_bytes_sent = sendto(l_cur->socket, (const char *)l_cur->buf_out, + l_cur->buf_out_size, MSG_DONTWAIT | MSG_NOSIGNAL, + (struct sockaddr *)&l_cur->remote_addr, sizeof(l_cur->remote_addr)); + } l_errno = errno; break; case DESCRIPTOR_TYPE_PIPE: diff --git a/dap-sdk/net/core/include/dap_events_socket.h b/dap-sdk/net/core/include/dap_events_socket.h index 68020f1a339071b7269d987bb897aee2b45f6d7f..32737eba9d1f7b311e2c69da5c86b7b86389880d 100644 --- a/dap-sdk/net/core/include/dap_events_socket.h +++ b/dap-sdk/net/core/include/dap_events_socket.h @@ -54,12 +54,12 @@ #define DAP_EVENTS_CAPS_PIPE_POSIX #endif -#if defined(DAP_EVENTS_CAPS_EPOLL) -#include <sys/epoll.h> -#define EPOLL_HANDLE int -#elif defined (DAP_EVENTS_CAPS_WEPOLL) +#if defined(DAP_EVENTS_CAPS_WEPOLL) #include "wepoll.h" #define EPOLL_HANDLE HANDLE +#elif defined (DAP_EVENTS_CAPS_EPOLL) +#include <sys/epoll.h> +#define EPOLL_HANDLE int #endif #define BIT( x ) ( 1 << x ) @@ -144,11 +144,6 @@ typedef struct dap_events_socket { uint32_t buf_out_zero_count; - // Flags - bool is_pingable; - bool is_read_direct; // If set - don't call read() in worker, let operate with handler to callback - bool is_dont_reset_write_flag; // If set - don't reset write flag ever data is over - // Input section union{ uint8_t buf_in[DAP_EVENTS_SOCKET_BUF+1]; // Internal buffer for input data @@ -165,7 +160,7 @@ typedef struct dap_events_socket { // Stored string representation char hostaddr[1024]; // Address char service[128]; - struct sockaddr remote_addr; + struct sockaddr_in remote_addr; // For UDP datagrams // Links to related objects dap_events_t *events; diff --git a/dap-sdk/net/core/include/dap_server.h b/dap-sdk/net/core/include/dap_server.h index 44bf62135e5756e0d1c05fb1f62937e4d0b1ccc8..33e7c5ca43e5aedffbe971e3a8549d66ed0babf7 100644 --- a/dap-sdk/net/core/include/dap_server.h +++ b/dap-sdk/net/core/include/dap_server.h @@ -48,10 +48,11 @@ #include "uthash.h" #include "utlist.h" +#include "dap_list.h" #include "dap_cpu_monitor.h" #include "dap_events_socket.h" -typedef enum dap_server_type {DAP_SERVER_TCP} dap_server_type_t; +typedef enum dap_server_type {DAP_SERVER_TCP, DAP_SERVER_UDP} dap_server_type_t; @@ -66,7 +67,7 @@ typedef struct dap_server { char *address; // Listen address int32_t socket_listener; // Socket for listener - dap_events_socket_t * es_listener; + dap_list_t *es_listeners; struct sockaddr_in listener_addr; // Kernel structure for listener's binded address @@ -85,4 +86,5 @@ typedef struct dap_server { int dap_server_init( ); // Init server module void dap_server_deinit( void ); // Deinit server module -dap_server_t* dap_server_new(dap_events_t *a_events, const char * a_addr, uint16_t a_port, dap_server_type_t a_type); +dap_server_t* dap_server_new(dap_events_t *a_events, const char * a_addr, uint16_t a_port, dap_server_type_t a_type, dap_events_socket_callbacks_t *a_callbacks); +void dap_server_delete(dap_server_t *a_server); diff --git a/dap-sdk/net/server-udp/CMakeLists.txt b/dap-sdk/net/server-udp/CMakeLists.txt deleted file mode 100644 index f99bd6023df9336939a93a248e76bc40926efda6..0000000000000000000000000000000000000000 --- a/dap-sdk/net/server-udp/CMakeLists.txt +++ /dev/null @@ -1,19 +0,0 @@ -cmake_minimum_required(VERSION 3.1) -project (dap_udp_server C) - - -file(GLOB DAP_UDP_SERVER_SRCS *.c) -file(GLOB DAP_UDP_SERVER_HEADERS include/*.h) - -if(WIN32) - include_directories(../../../3rdparty/wepoll/) - include_directories(../../../3rdparty/uthash/src/) - #include_directories(../3rdparty/curl/include/) -endif() - -add_library(${PROJECT_NAME} STATIC ${DAP_UDP_SERVER_SRCS} ${DAP_UDP_SERVER_HEADERS}) - -target_link_libraries(${PROJECT_NAME} dap_core dap_server_core) - -target_include_directories(${PROJECT_NAME} INTERFACE .) -target_include_directories(${PROJECT_NAME} PUBLIC include) diff --git a/dap-sdk/net/server-udp/dap_udp_client.c b/dap-sdk/net/server-udp/dap_udp_client.c deleted file mode 100644 index fd4e86e809fa192a12a492537eeeb2f7cd7e51d8..0000000000000000000000000000000000000000 --- a/dap-sdk/net/server-udp/dap_udp_client.c +++ /dev/null @@ -1,185 +0,0 @@ -/* - Copyright (c) 2017-2019 (c) Project "DeM Labs Inc" https://demlabs.net - All rights reserved. - - This file is part of DAP (Deus Applications Prototypes) the open source project - - DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify - it under the terms of the GNU Lesser General Public License as published by - the Free Software Foundation, either version 3 of the License, or - (at your option) any later version. - - DAP is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. -*/ - -#include <string.h> -#include <stdlib.h> -#include <stdio.h> -#include <stdarg.h> -#include <stdint.h> - -#ifndef _WIN32 -#include <arpa/inet.h> -#include <netinet/in.h> -#include <sys/socket.h> -#include <sys/types.h> -#include <sys/stat.h> -#include <sys/select.h> -#include <sys/queue.h> -#include <errno.h> -#include <netdb.h> -#include <unistd.h> -#include <fcntl.h> -#include <sys/epoll.h> -#else -#include <winsock2.h> -#include <windows.h> -#include <mswsock.h> -#include <ws2tcpip.h> -#include <io.h> -#include <pthread.h> -#endif - -#include "uthash.h" -#include "utlist.h" - -#include "dap_common.h" -#include "dap_udp_client.h" -#include "dap_udp_server.h" - -#define LOG_TAG "udp_client" - -/** - * @brief get_key Make key for hash table from host and port - * @return 64 bit Key - */ -#define get_key( host, key ) (((uint64_t)host << 32) + (uint64_t)port) -extern bool sb_payload_ready; - -/** - * @brief udp_client_create Create new client and add it to hashmap - * @param sh Server instance - * @param host Client host address - * @param w_client Clients event loop watcher - * @param port Client port - * @return Pointer to the new list's node - */ -dap_events_socket_t *dap_udp_client_create( dap_server_t *dap_srv, EPOLL_HANDLE efd, unsigned long host, unsigned short port ) -{ - dap_udp_server_t *udp_server = DAP_UDP_SERVER( dap_srv ); - log_it( L_DEBUG, "Client structure create with host = %x, port = %d", host, port ); - - dap_udp_client_t *inh = DAP_NEW_Z( dap_udp_client_t ); - inh->host_key = get_key( host, port ); - - dap_events_socket_t *ret = dap_events_socket_wrap_no_add( dap_srv->es_listener->events, dap_srv->socket_listener, &dap_srv->client_callbacks); - inh->esocket = ret; - - ret->server = dap_srv; - - ret->flags = DAP_SOCK_READY_TO_READ; - -// ret->signal_close = false; -// ret->_ready_to_read = true; -// ret->_ready_to_write = false; - - ret->_inheritor = inh; - - pthread_mutex_init( &inh->mutex_on_client, NULL ); - - pthread_mutex_lock( &udp_server->mutex_on_list ); - HASH_ADD_INT( udp_server->hclients, host_key, inh ); - pthread_mutex_unlock( &udp_server->mutex_on_list ); - - if( dap_srv->client_callbacks.new_callback ) - dap_srv->client_callbacks.new_callback( ret, NULL ); // Init internal structure - - return ret; -} - -/** - * @brief udp_client_get_address Get host address and port of client - * @param client Pointer to client structure - * @param host Variable for host address - * @param host Variable for port - */ -void dap_udp_client_get_address( dap_events_socket_t *client, unsigned int* host, unsigned short* port ) -{ - dap_udp_client_t* udp_client = DAP_UDP_CLIENT( client ); - *host = udp_client->host_key >> 32; - *port = udp_client->host_key; -} - -/** - * @brief udp_client_find Find client structure by host address and port - * @param sh Server instance - * @param host Source host address - * @param port Source port - * @return Pointer to client or NULL if not found - */ -dap_events_socket_t *dap_udp_client_find( dap_server_t *dap_srv, unsigned long host, unsigned short port ) -{ - dap_udp_client_t *inh = NULL; - dap_udp_server_t *udp_server = DAP_UDP_SERVER( dap_srv ); - - uint64_t token = get_key( host, port ); - - pthread_mutex_lock( &udp_server->mutex_on_list ); - HASH_FIND_INT( udp_server->hclients, &token, inh ); - pthread_mutex_unlock( &udp_server->mutex_on_list ); - - if( inh == NULL ) - return NULL; - else - return inh->esocket; -} - -/** - * @brief add_waiting_client Add Client to write queue - * @param client Client instance - */ -void add_waiting_client( dap_events_socket_t *dap_rclient ) -{ - dap_udp_client_t* udp_cl, *tmp; - - dap_server_t *dap_srv = dap_rclient->server; - dap_udp_server_t *udp_server = DAP_UDP_SERVER( dap_srv ); - dap_udp_client_t *udp_client = DAP_UDP_CLIENT( dap_rclient ); - - pthread_mutex_lock( &udp_server->mutex_on_list ); - LL_FOREACH_SAFE( udp_server->waiting_clients, udp_cl, tmp ) { - if( udp_cl == udp_client ) { - pthread_mutex_unlock( &udp_server->mutex_on_list ); - return; - } - } - LL_APPEND( udp_server->waiting_clients, udp_client ); - pthread_mutex_unlock( &udp_server->mutex_on_list ); -} - -size_t dap_udp_client_write_unsafe( dap_events_socket_t *dap_rclient, const void *data, size_t data_size ) -{ - size_t size = dap_events_socket_write_unsafe( dap_rclient, data, data_size ); - add_waiting_client( dap_rclient ); - return size; -} - -size_t dap_udp_client_write_f( dap_events_socket_t *dap_rclient, const char * a_format, ... ) -{ - size_t size = 0; - va_list va; - - va_start( va, a_format ); - size = dap_events_socket_write_f_unsafe( dap_rclient, a_format, va ); - va_end( va ); - - add_waiting_client( dap_rclient ); - return size; -} - diff --git a/dap-sdk/net/server-udp/dap_udp_server.c b/dap-sdk/net/server-udp/dap_udp_server.c deleted file mode 100644 index 26a0fc39bb2bd9378a6b91011415cfe163468a62..0000000000000000000000000000000000000000 --- a/dap-sdk/net/server-udp/dap_udp_server.c +++ /dev/null @@ -1,399 +0,0 @@ -/* - Copyright (c) 2017-2019 (c) Project "DeM Labs Inc" https://github.com/demlabsinc - All rights reserved. - - This file is part of DAP (Deus Applications Prototypes) the open source project - - DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify - it under the terms of the GNU Lesser General Public License as published by - the Free Software Foundation, either version 3 of the License, or - (at your option) any later version. - - DAP is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. -*/ - -#include <string.h> -#include <time.h> -#include <stdio.h> -#include <stdlib.h> -#include <stddef.h> -#include <signal.h> -#include <stdint.h> - -#ifndef _WIN32 -#include <arpa/inet.h> -#include <netinet/in.h> -#include <sys/socket.h> -#include <sys/types.h> -#include <sys/stat.h> -#include <sys/select.h> -#include <sys/queue.h> -#include <errno.h> -#include <netdb.h> -#include <unistd.h> -#include <fcntl.h> -#include <sys/epoll.h> -#else -#include <winsock2.h> -#include <windows.h> -#include <mswsock.h> -#include <ws2tcpip.h> -#include <io.h> -#include <pthread.h> -#endif - -#include "uthash.h" -#include "utlist.h" - -#include "dap_common.h" -#include "dap_udp_server.h" - -#define LOG_TAG "dap_udp_server" - -#define BUFSIZE 1024 - -char buf[ BUFSIZE ]; /* message buf */ -bool sb_payload_ready; -//struct ev_io w_read; -//struct ev_io w_write; - -EPOLL_HANDLE efd_read = (EPOLL_HANDLE)-1; - -//static void write_cb( EPOLL_HANDLE efd, int revents ); - -int check_close( dap_events_socket_t *client ); - -/** - */ -static void error( char *msg ) { - - perror( msg ); - exit( 1 ); -} - -/** - * @brief dap_udp_server_new Initialize server structure - * @return Server pointer - */ -dap_server_t *dap_udp_server_new( ) -{ - dap_udp_server_t *udp_server = (dap_udp_server_t *)calloc( 1, sizeof(dap_udp_server_t) ); - udp_server->waiting_clients = NULL; - - dap_server_t *sh = (dap_server_t *) calloc( 1, sizeof(dap_server_t) ); - sh->_inheritor = udp_server; - - udp_server->dap_server = sh; - - return sh; -} - -/** - * @brief dap_udp_server_delete Safe delete server structure - * @param sh Server instance - */ -void dap_udp_server_delete( dap_server_t *sh ) -{ - if ( !sh ) return; - -// dap_client_remote_t *client, *tmp; -// dap_udp_server_t *udps = (dap_udp_server_t *)sh->_inheritor; - -// if ( !udps ) return; - - if( sh->address ) - free( sh->address ); - -// HASH_ITER( hh, udps->hclients, client, tmp ) -// dap_client_remote_remove( client ); - - if ( sh->delete_callback ) - sh->delete_callback( sh, NULL ); - - if ( sh->_inheritor ) - free( sh->_inheritor ); - - free( sh ); -} - -/** - * @brief dap_udp_server_listen Create and bind server structure - * @param port Binding port - * @return Server instance - */ -dap_server_t *dap_udp_server_listen( uint16_t port ) { - - dap_server_t *sh = dap_udp_server_new( ); - - sh->socket_listener = socket( AF_INET, SOCK_DGRAM, 0 ); - - if ( sh->socket_listener < 0 ) { - log_it ( L_ERROR, "Socket error %s", strerror(errno) ); - dap_udp_server_delete( sh ); - return NULL; - } - - int optval = 1; - if ( setsockopt( sh->socket_listener, SOL_SOCKET, SO_REUSEADDR, (const void *)&optval , sizeof(int)) < 0 ) - log_it( L_WARNING, "Can't set up REUSEADDR flag to the socket" ); - - memset( (char *)&(sh->listener_addr), 0, sizeof(sh->listener_addr) ); - - sh->listener_addr.sin_family = AF_INET; - sh->listener_addr.sin_addr.s_addr = htonl( INADDR_ANY ); - sh->listener_addr.sin_port = htons( port ); - - if ( bind(sh->socket_listener, (struct sockaddr *) &(sh->listener_addr), sizeof(sh->listener_addr)) < 0) { - log_it( L_ERROR, "Bind error: %s", strerror(errno) ); - dap_udp_server_delete( sh ); - return NULL; - } - log_it(L_INFO, "UDP server listening port 0.0.0.0:%d", port); - pthread_mutex_init( &DAP_UDP_SERVER(sh)->mutex_on_list, NULL ); - pthread_mutex_init( &DAP_UDP_SERVER(sh)->mutex_on_hash, NULL ); - - return sh; -} - -/** - * @brief write_cb - */ -static void write_cb( EPOLL_HANDLE efd, int revents, dap_server_t *sh ) -{ - UNUSED(revents); - dap_udp_client_t *udp_client, *tmp; - -// dap_server_t *sh = watcher->data; - dap_udp_server_t *udp = DAP_UDP_SERVER( sh ); - - pthread_mutex_lock( &udp->mutex_on_list ); - - LL_FOREACH_SAFE( udp->waiting_clients, udp_client, tmp ) { - - //log_it(L_INFO,"write_cb"); - //pthread_mutex_lock(&udp_client->mutex_on_client); - - dap_events_socket_t *client = udp_client->esocket; - - if( client != NULL && !check_close(client) && (client->flags & DAP_SOCK_READY_TO_WRITE) ) { - if ( client->buf_out_size > 0 ) { - struct sockaddr_in addr; - addr.sin_family = AF_INET; - dap_udp_client_get_address( client, (unsigned int *)&addr.sin_addr.s_addr, &addr.sin_port ); - //log_it(L_INFO,"write_cb_client host = %x, port = %d, socket = %x", addr.sin_addr.s_addr, addr.sin_port, sh->socket_listener); - for( size_t total_sent = 0; total_sent < client->buf_out_size; ) { - - int bytes_sent = sendto( sh->socket_listener, client->buf_out + total_sent, - client->buf_out_size - total_sent, 0, (struct sockaddr*) &addr, sizeof(addr) ); - - if ( bytes_sent < 0 ) { - log_it(L_ERROR,"Some error occured in send() function"); - break; - } - total_sent += bytes_sent; - } - client->buf_out_size = 0; - memset( client->buf_out, 0, sizeof(client->buf_out) ); - client->flags &= ~DAP_SOCK_READY_TO_WRITE; - sb_payload_ready = false; - } - LL_DELETE( udp->waiting_clients, udp_client ); - if ( sh->client_callbacks.write_callback ) - sh->client_callbacks.write_callback( client, NULL ); - } - else if( client == NULL ) { - LL_DELETE( udp->waiting_clients, udp_client ); - } - //pthread_mutex_unlock(&udp_client->mutex_on_client); - - } // for client - pthread_mutex_unlock(&udp->mutex_on_list); -} - -/** - * @brief check_close Check if client need to close - * @param client Client structure - * @return 1 if client deleted, 0 if client is no need to delete - */ -int check_close( dap_events_socket_t *client ) -{ - dap_udp_client_t *client_check, *tmp; - - if( !(client->flags & DAP_SOCK_SIGNAL_CLOSE) ) - return 0; - - dap_udp_client_t *udp_client = DAP_UDP_CLIENT( client ); - dap_server_t *sh = client->server; - dap_udp_server_t *udp_server = DAP_UDP_SERVER( sh ); - - LL_FOREACH_SAFE( udp_server->waiting_clients, client_check, tmp ) { - - if ( client_check->host_key == udp_client->host_key ) - LL_DELETE( udp_server->waiting_clients, client_check ); - } - - dap_events_socket_remove_and_delete_mt(client->worker, client ); - - return 1; -} - -/** - * @brief read_cb - */ -static void read_cb( EPOLL_HANDLE efd, int revents, dap_server_t *sh ) -{ - UNUSED(revents); -// if ( !(revents & EV_READ) ) return; - - struct sockaddr_in clientaddr; - socklen_t clientlen = sizeof(clientaddr); -// dap_server_t *sh = watcher->data; - - memset( buf, 0, BUFSIZE ); - - int32_t bytes = (int32_t) recvfrom( sh->socket_listener, buf, BUFSIZE, 0,(struct sockaddr *) &clientaddr, &clientlen ); - - dap_events_socket_t *client = dap_udp_client_find( sh, clientaddr.sin_addr.s_addr, clientaddr.sin_port ); - - if( client != NULL && check_close(client) != 0 ) - return; - - if ( bytes > 0 ) { - - char *hostaddrp = inet_ntoa( clientaddr.sin_addr ); - - if ( hostaddrp == NULL ) { - dap_udp_server_delete( sh ); - error("ERROR on inet_ntoa\n"); - } - - if ( client == NULL ) { - client = dap_udp_client_create( sh, efd, clientaddr.sin_addr.s_addr, clientaddr.sin_port ); - if(client == NULL) { - dap_udp_server_delete( sh ); - error("ERROR create client structure\n"); - } - } - - dap_udp_client_t* udp_client = client->_inheritor; - - pthread_mutex_lock( &udp_client->mutex_on_client ); - - size_t bytes_processed = 0; - size_t bytes_recieved = bytes; - - while ( bytes_recieved > 0 ) { - - size_t bytes_to_transfer = 0; - - if ( bytes_recieved > UDP_CLIENT_BUF - client->buf_in_size ) - bytes_to_transfer = UDP_CLIENT_BUF - client->buf_in_size; - else - bytes_to_transfer = bytes_recieved; - - memcpy( client->buf_in + client->buf_in_size,buf + bytes_processed, bytes_to_transfer ); - client->buf_in_size += bytes_to_transfer; - - if ( sh->client_callbacks.read_callback ) - sh->client_callbacks.read_callback( client, NULL ); - - bytes_processed += bytes_to_transfer; - bytes_recieved -= bytes_to_transfer; - } - - client->buf_in_size = 0; - memset( client->buf_in, 0, sizeof(client->buf_out) ); - - pthread_mutex_unlock( &udp_client->mutex_on_client ); - - } - else if ( bytes < 0 ) { - - log_it( L_ERROR, "Bytes read Error %s", strerror(errno) ); - if( client != NULL ) - client->flags |= DAP_SOCK_SIGNAL_CLOSE; - } - else if (bytes == 0) { - if ( client != NULL ) - client->flags |= DAP_SOCK_SIGNAL_CLOSE; - } -} - -/** - * @brief dap_udp_server_loop Start server event loop - * @param sh Server instance - */ -void dap_udp_server_loop( dap_server_t *d_server ) -{ - efd_read = epoll_create1( 0 ); - - if ( (intptr_t)efd_read == -1 ) { - - log_it( L_ERROR, "epoll_create1 failed" ); - goto udp_error; - } - - sb_payload_ready = false; - - struct epoll_event pev = {0, {0}}; - struct epoll_event events[ 16 ] = {{0, {0}}}; - - pev.events = EPOLLIN | EPOLLERR; - pev.data.fd = d_server->socket_listener; - - if ( epoll_ctl( efd_read, EPOLL_CTL_ADD, d_server->socket_listener, &pev) != 0 ) { - log_it( L_ERROR, "epoll_ctl failed 000" ); - goto udp_error; - } - - while( 1 ) { - - int32_t n = epoll_wait( efd_read, &events[0], 16, -1 ); - - if ( !n ) continue; - - if ( n < 0 ) { - if ( errno == EINTR ) - continue; - log_it( L_ERROR, "Server epoll error" ); - break; - } - - for( int32_t i = 0; i < n; ++ i ) { - - if ( events[i].events & EPOLLIN ) { - read_cb( efd_read, events[i].events, d_server ); - } - if ( events[i].events & EPOLLOUT) { - // Do nothing. It always true until socket eturn EAGAIN - } - if (sb_payload_ready) { - write_cb( efd_read, events[i].events, d_server ); - } - if( events[i].events & EPOLLERR ) { - log_it( L_ERROR, "Server socket error event" ); - goto udp_error; - } - } - - } - -udp_error: - - #ifndef _WIN32 - if ( efd_read != -1 ) - close( efd_read ); - #else - if ( efd_read != INVALID_HANDLE_VALUE ) - epoll_close( efd_read ); - #endif - - return; -} - diff --git a/dap-sdk/net/server-udp/include/dap_udp_client.h b/dap-sdk/net/server-udp/include/dap_udp_client.h deleted file mode 100644 index f1b3844f5123cae65beb051171dc62c9211e7a1a..0000000000000000000000000000000000000000 --- a/dap-sdk/net/server-udp/include/dap_udp_client.h +++ /dev/null @@ -1,66 +0,0 @@ -/* - Copyright (c) 2017-2018 (c) Project "DeM Labs Inc" https://github.com/demlabsinc - All rights reserved. - - This file is part of DAP (Deus Applications Prototypes) the open source project - - DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify - it under the terms of the GNU Lesser General Public License as published by - the Free Software Foundation, either version 3 of the License, or - (at your option) any later version. - - DAP is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. -*/ -#pragma once - -#include <stdint.h> -#include <stddef.h> -#include <stdbool.h> - -#ifndef WIN32 -#include <sys/queue.h> -#endif - -#include "dap_events_socket.h" -#include "dap_server.h" -#include "uthash.h" - -typedef struct dap_udp_server dap_udp_server_t; -struct dap_udp_client; - -#define UDP_CLIENT_BUF 65535 - -typedef struct dap_udp_client { - - dap_events_socket_t *esocket; - uint64_t host_key; //key contains host address in first 4 bytes and port in last 4 bytes - - UT_hash_handle hh; - - struct dap_udp_client *next, *prev; //pointers for writing queue - pthread_mutex_t mutex_on_client; - - void *_inheritor; // Internal data to specific client type, usualy states for state machine - -} dap_udp_client_t; // Node of bidirectional list of clients - -#define DAP_UDP_CLIENT(a) ((dap_udp_client_t *) (a)->_inheritor) - -dap_events_socket_t *dap_udp_client_create( dap_server_t *sh, EPOLL_HANDLE efd, unsigned long host, unsigned short port ); // Create new client and add it to the list -dap_events_socket_t *dap_udp_client_find( dap_server_t *sh, unsigned long host, unsigned short port ); // Find client by host and port - -void dap_udp_client_ready_to_read( dap_events_socket_t *sc, bool is_ready ); -void dap_udp_client_ready_to_write( dap_events_socket_t *sc, bool is_ready ); - -size_t dap_udp_client_write_unsafe( dap_events_socket_t *sc, const void * data, size_t data_size ); -size_t dap_udp_client_write_f( dap_events_socket_t *a_client, const char * a_format, ... ); - -void add_waiting_client( dap_events_socket_t *client ); // Add client to writing queue - -void dap_udp_client_get_address( dap_events_socket_t *client, unsigned int *host, unsigned short *port ); diff --git a/dap-sdk/net/server-udp/include/dap_udp_server.h b/dap-sdk/net/server-udp/include/dap_udp_server.h deleted file mode 100644 index f1725b4d141d6c6704d00d9f3eab397913a48593..0000000000000000000000000000000000000000 --- a/dap-sdk/net/server-udp/include/dap_udp_server.h +++ /dev/null @@ -1,63 +0,0 @@ -/* - Copyright (c) 2017-2019 (c) Project "DeM Labs Inc" https://github.com/demlabsinc - All rights reserved. - - This file is part of DAP (Deus Applications Prototypes) the open source project - - DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify - it under the terms of the GNU Lesser General Public License as published by - the Free Software Foundation, either version 3 of the License, or - (at your option) any later version. - - DAP is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. -*/ - -#pragma once - -#ifndef WIN32 - -#include <stdint.h> -#include <sys/socket.h> -#include <netdb.h> -#include <arpa/inet.h> -#include <netinet/in.h> -#include <sys/types.h> -#include <sys/stat.h> -#include <sys/select.h> -#include <sys/queue.h> -#define EPOLL_HANDLE int -#endif - -#include "dap_udp_client.h" -#include "dap_server.h" - -struct dap_udp_server; - -typedef struct dap_udp_thread { - pthread_t tid; -} dap_udp_thread_t; - -typedef void (*dap_udp_server_callback_t) (struct dap_udp_server *,void *arg); // Callback for specific server's operations - -typedef struct dap_udp_server { - - dap_udp_client_t *hclients; - dap_udp_client_t *waiting_clients; // List clients for writing data - pthread_mutex_t mutex_on_list; - pthread_mutex_t mutex_on_hash; - void *_inheritor; - dap_server_t *dap_server; - -} dap_udp_server_t; - -#define DAP_UDP_SERVER(a) ((dap_udp_server_t *) (a)->_inheritor) - -void dap_udp_server_delete( dap_server_t *sh ); -void dap_udp_server_loop( dap_server_t *udp_server ); // Start server event loop -dap_server_t *dap_udp_server_listen( uint16_t port ); // Create and bind serv diff --git a/dap-sdk/net/server-udp/test/CMakeLists.txt b/dap-sdk/net/server-udp/test/CMakeLists.txt deleted file mode 100644 index 998462efc70d1e565c198cca14e9d87496a3fc84..0000000000000000000000000000000000000000 --- a/dap-sdk/net/server-udp/test/CMakeLists.txt +++ /dev/null @@ -1,20 +0,0 @@ -cmake_minimum_required(VERSION 3.0) -project(udp-server-test) - -set(CMAKE_C_STANDARD 11) - -if ( NOT ( TARGET dap_test ) ) - add_subdirectory(libdap-test) -endif() - -file(GLOB SRC *.h *.c) - -add_executable(${PROJECT_NAME} ${SRC}) - -target_link_libraries(${PROJECT_NAME} dap_test) - -add_test( - NAME udp-server-test - COMMAND udp-server-test -) - diff --git a/dap-sdk/net/server-udp/test/main.c b/dap-sdk/net/server-udp/test/main.c deleted file mode 100644 index e02c1ef950a56e4077faed96e1ee2f6c5cfe8a3f..0000000000000000000000000000000000000000 --- a/dap-sdk/net/server-udp/test/main.c +++ /dev/null @@ -1,5 +0,0 @@ -#include <stdio.h> -int main() -{ - return 0; -} diff --git a/dap-sdk/net/server/http_server/dap_http_simple.c b/dap-sdk/net/server/http_server/dap_http_simple.c index 9428a6f00d77ed74601667fa6a974debb43086f8..0bc3b51be66a6dcb2e9ab01dd1a853b0703c0f84 100644 --- a/dap-sdk/net/server/http_server/dap_http_simple.c +++ b/dap-sdk/net/server/http_server/dap_http_simple.c @@ -376,7 +376,6 @@ static void s_http_client_data_write( dap_http_client_t * a_http_client, void *a l_http_simple->reply_sent += dap_events_socket_write_unsafe( a_http_client->esocket, l_http_simple->reply_byte + l_http_simple->reply_sent, a_http_client->out_content_length - l_http_simple->reply_sent ); - dap_events_socket_set_writable_unsafe(a_http_client->esocket, true); if ( l_http_simple->reply_sent >= a_http_client->out_content_length ) { log_it(L_INFO, "All the reply (%u) is sent out", a_http_client->out_content_length ); diff --git a/dap-sdk/net/stream/ch/CMakeLists.txt b/dap-sdk/net/stream/ch/CMakeLists.txt index 657546fb9d4c785c44536926283602b31d8c0765..bf7b26a76fabbf2f908ce835ec278d7bc55bcf37 100644 --- a/dap-sdk/net/stream/ch/CMakeLists.txt +++ b/dap-sdk/net/stream/ch/CMakeLists.txt @@ -11,7 +11,7 @@ endif() add_library(${PROJECT_NAME} STATIC ${DAP_STREAM_CH_SRCS} ${DAP_STREAM_CH_HDRS}) -target_link_libraries(dap_stream_ch dap_core dap_crypto dap_udp_server dap_stream ) +target_link_libraries(dap_stream_ch dap_core dap_crypto dap_stream ) target_include_directories(dap_stream_ch INTERFACE .) target_include_directories(${PROJECT_NAME} PUBLIC include) diff --git a/dap-sdk/net/stream/stream/CMakeLists.txt b/dap-sdk/net/stream/stream/CMakeLists.txt index 1b4ed0617c2f8cbe2b0ef1838a3bdbdc25daea5e..60416f45235c8f6c6a78bf60ed431820d736c933 100755 --- a/dap-sdk/net/stream/stream/CMakeLists.txt +++ b/dap-sdk/net/stream/stream/CMakeLists.txt @@ -12,7 +12,7 @@ endif() add_library(${PROJECT_NAME} STATIC ${STREAM_SRCS} ${STREAM_HDRS}) -target_link_libraries(dap_stream dap_core dap_server_core dap_udp_server dap_crypto +target_link_libraries(dap_stream dap_core dap_server_core dap_crypto dap_http_server dap_enc_server dap_session dap_stream_ch) target_include_directories(dap_stream INTERFACE .) diff --git a/dap-sdk/net/stream/stream/dap_stream.c b/dap-sdk/net/stream/stream/dap_stream.c index e9598cdf910e84952313b2e19feb0793448a701b..9904f8dac112dc590379281f94be244dfe0f089d 100644 --- a/dap-sdk/net/stream/stream/dap_stream.c +++ b/dap-sdk/net/stream/stream/dap_stream.c @@ -50,7 +50,6 @@ #include "dap_http.h" #include "dap_http_client.h" #include "dap_http_header.h" -#include "dap_udp_server.h" #include "dap_stream_worker.h" #define LOG_TAG "dap_stream" @@ -168,13 +167,12 @@ void dap_stream_add_proc_http(struct dap_http * a_http, const char * a_url) * @brief stream_add_proc_udp Add processor callback for streaming * @param a_udp_server UDP server instance */ -void dap_stream_add_proc_udp(dap_udp_server_t * a_udp_server) +void dap_stream_add_proc_udp(dap_server_t *a_udp_server) { - dap_server_t* l_server = a_udp_server->dap_server; - l_server->client_callbacks.read_callback = s_esocket_data_read; - l_server->client_callbacks.write_callback = s_esocket_write; - l_server->client_callbacks.delete_callback = s_esocket_callback_delete; - l_server->client_callbacks.new_callback = s_udp_esocket_new; + a_udp_server->client_callbacks.read_callback = s_esocket_data_read; + a_udp_server->client_callbacks.write_callback = s_esocket_write; + a_udp_server->client_callbacks.delete_callback = s_esocket_callback_delete; + a_udp_server->client_callbacks.new_callback = s_udp_esocket_new; } /** diff --git a/dap-sdk/net/stream/stream/dap_stream_pkt.c b/dap-sdk/net/stream/stream/dap_stream_pkt.c index 9642362c3cb2f06c839b935572e88e7b291c446d..13f3b4224df48503a44c93cc03a4dd4cc53d0ecc 100644 --- a/dap-sdk/net/stream/stream/dap_stream_pkt.c +++ b/dap-sdk/net/stream/stream/dap_stream_pkt.c @@ -152,7 +152,6 @@ size_t dap_stream_pkt_write_unsafe(dap_stream_t * a_stream, const void * a_data, ret+=dap_events_socket_write_unsafe(a_stream->esocket,&pkt_hdr,sizeof(pkt_hdr)); ret+=dap_events_socket_write_unsafe(a_stream->esocket,l_buf_selected,pkt_hdr.size); - dap_events_socket_set_writable_unsafe(a_stream->esocket, true); if(l_buf_allocated) DAP_DELETE(l_buf_allocated); diff --git a/dap-sdk/net/stream/stream/include/dap_stream.h b/dap-sdk/net/stream/stream/include/dap_stream.h index 0c0325bc3064b3903506d466f48e435025e78162..71fb3215181840dfae86161f86dc25dc350d1297 100644 --- a/dap-sdk/net/stream/stream/include/dap_stream.h +++ b/dap-sdk/net/stream/stream/include/dap_stream.h @@ -32,8 +32,6 @@ #include "dap_stream_ch.h" #include "dap_events_socket.h" -#include "dap_udp_server.h" -#include "dap_udp_client.h" #define CHUNK_SIZE_MAX (3 * 1024) @@ -101,7 +99,7 @@ void dap_stream_deinit(); void dap_stream_add_proc_http(dap_http_t * sh, const char * url); -void dap_stream_add_proc_udp(dap_udp_server_t * sh); +void dap_stream_add_proc_udp(dap_server_t *a_udp_server); dap_stream_t* dap_stream_new_es_client(dap_events_socket_t * a_es); size_t dap_stream_data_proc_read(dap_stream_t * a_stream); diff --git a/modules/chain/dap_chain_cs.c b/modules/chain/dap_chain_cs.c index 905049c2e112aa3ba835ca8d4f30a8b899e5732b..491a35002a51330d1a2c4cb69dc8d37bf5d81046 100644 --- a/modules/chain/dap_chain_cs.c +++ b/modules/chain/dap_chain_cs.c @@ -123,7 +123,7 @@ int dap_chain_cs_create(dap_chain_t * a_chain, dap_config_t * a_chain_cfg) if ( l_item ) { log_it(L_NOTICE,"Consensus \"%s\" found, prepare to parse config file",l_item->name ); l_item->callback_init( a_chain, a_chain_cfg); - // TODO + DAP_CHAIN_PVT(a_chain)->cs_name = l_item->name; return 0; } else { log_it(L_ERROR,"Can't find consensus \"%s\"",dap_config_get_item_str( a_chain_cfg, "chain", "consensus")); diff --git a/modules/chain/dap_chain_ledger.c b/modules/chain/dap_chain_ledger.c index e0cf28cbf82400ac48ceb062dd45b6956c66352f..0f66f0343ac4482091d452ee3de40a29048f30c1 100644 --- a/modules/chain/dap_chain_ledger.c +++ b/modules/chain/dap_chain_ledger.c @@ -52,7 +52,6 @@ #include "dap_chain_datum_token.h" #include "dap_chain_mempool.h" #include "dap_chain_global_db.h" -#include "dap_chain_net.h" #include "dap_chain_ledger.h" #define LOG_TAG "dap_chain_ledger" @@ -69,18 +68,12 @@ static pthread_rwlock_t s_verificators_rwlock; #define MAX_OUT_ITEMS 10 typedef struct dap_chain_ledger_token_emission_item { dap_chain_hash_fast_t datum_token_emission_hash; - // while these are not needed - //dap_chain_hash_fast_t datum_tx_token_hash; - //dap_chain_tx_token_t * tx_token; - dap_chain_datum_token_emission_t *datum_token_emission; UT_hash_handle hh; } dap_chain_ledger_token_emission_item_t; typedef struct dap_chain_ledger_token_item { char ticker[DAP_CHAIN_TICKER_SIZE_MAX]; - dap_chain_hash_fast_t datum_token_hash; - uint8_t padding[6]; dap_chain_datum_token_t * datum_token; uint64_t total_supply; pthread_rwlock_t token_emissions_rwlock; @@ -88,17 +81,18 @@ typedef struct dap_chain_ledger_token_item { UT_hash_handle hh; } dap_chain_ledger_token_item_t; -// ledger cache item - one of unspendet outputs +// ledger cache item - one of unspent outputs typedef struct dap_chain_ledger_tx_item { dap_chain_hash_fast_t tx_hash_fast; dap_chain_datum_tx_t *tx; - time_t ts_created; - int n_outs; - int n_outs_used; - char token_tiker[10]; - // TODO dynamically allocates the memory in order not to limit the number of outputs in transaction - dap_chain_hash_fast_t tx_hash_spent_fast[MAX_OUT_ITEMS]; // spent outs list - uint8_t padding[6]; + struct { + time_t ts_created; + int n_outs; + int n_outs_used; + char token_tiker[10]; + // TODO dynamically allocates the memory in order not to limit the number of outputs in transaction + dap_chain_hash_fast_t tx_hash_spent_fast[MAX_OUT_ITEMS]; // spent outs list + } cache_data; UT_hash_handle hh; } dap_chain_ledger_tx_item_t; @@ -123,21 +117,24 @@ typedef struct dap_chain_ledger_tx_bound { dap_chain_ledger_tx_item_t *item_out; } dap_chain_ledger_tx_bound_t; -// Gotta use a regular null-terminated string instead, for uthash usability -/*typedef struct dap_ledger_wallet_balance_key{ - dap_chain_addr_t addr; - char ticker[DAP_CHAIN_TICKER_SIZE_MAX]; -} DAP_ALIGN_PACKED dap_ledger_wallet_balance_key_t; */ - // in-memory wallet balance typedef struct dap_ledger_wallet_balance { - //dap_ledger_wallet_balance_key_t key; char *key; char token_ticker[DAP_CHAIN_TICKER_SIZE_MAX]; uint128_t balance; UT_hash_handle hh; } dap_ledger_wallet_balance_t; +typedef struct dap_ledger_cache_item { + dap_chain_hash_fast_t *hash; + bool found; +} dap_ledger_cache_item_t; + +typedef struct dap_ledger_cache_str_item { + char *key; + bool found; +} dap_ledger_cache_str_item_t; + // dap_ledget_t private section typedef struct dap_ledger_private { // List of ledger - unspent transactions cache @@ -163,6 +160,11 @@ typedef struct dap_ledger_private { bool check_cells_ds; bool check_token_emission; dap_chain_cell_id_t local_cell_id; + /* Cache section */ + dap_ledger_cache_item_t last_tx; + dap_ledger_cache_item_t last_thres_tx; + dap_ledger_cache_item_t last_emit; + dap_ledger_cache_str_item_t last_ticker; } dap_ledger_private_t; #define PVT(a) ( (dap_ledger_private_t* ) a->_internal ) @@ -282,13 +284,19 @@ int dap_chain_ledger_token_add(dap_ledger_t * a_ledger, dap_chain_datum_token_t l_token_item = DAP_NEW_Z(dap_chain_ledger_token_item_t); dap_snprintf(l_token_item->ticker,sizeof (l_token_item->ticker),"%s",a_token->ticker); pthread_rwlock_init(&l_token_item->token_emissions_rwlock,NULL); - l_token_item->datum_token = DAP_NEW_Z_SIZE(dap_chain_datum_token_t, a_token_size); - memcpy(l_token_item->datum_token, a_token,a_token_size); - dap_hash_fast(a_token,a_token_size, &l_token_item->datum_token_hash); + HASH_ADD_STR(PVT(a_ledger)->tokens, ticker, l_token_item); + // Add it to cache + dap_chain_datum_token_t *l_token_cache = DAP_NEW_Z_SIZE(dap_chain_datum_token_t, a_token_size); + memcpy(l_token_cache, a_token, a_token_size); + char *l_gdb_group = dap_chain_ledger_get_gdb_group(a_ledger, "tokens"); + if (!dap_chain_global_db_gr_set(dap_strdup(a_token->ticker), l_token_cache, a_token_size, l_gdb_group)) { + log_it(L_WARNING, "Ledger cache mismatch"); + DAP_DELETE(l_token_cache); + } + DAP_DELETE(l_gdb_group); - HASH_ADD_STR(PVT(a_ledger)->tokens, ticker, l_token_item) ; switch(a_token->type){ case DAP_CHAIN_DATUM_TOKEN_TYPE_SIMPLE: l_token_item->total_supply = a_token->header_private.total_supply; @@ -311,6 +319,15 @@ int dap_chain_ledger_token_add(dap_ledger_t * a_ledger, dap_chain_datum_token_t return 0; } +int dap_chain_ledger_token_load(dap_ledger_t *a_ledger, dap_chain_datum_token_t *a_token, size_t a_token_size) +{ + if (PVT(a_ledger)->last_ticker.found) { + return dap_chain_ledger_token_add(a_ledger, a_token, a_token_size); + } else if (!strncmp(PVT(a_ledger)->last_ticker.key, a_token->ticker, DAP_CHAIN_TICKER_SIZE_MAX)) { + PVT(a_ledger)->last_ticker.found = true; + } + return 0; +} /** * @brief s_treshold_emissions_proc * @param a_ledger @@ -332,21 +349,120 @@ static void s_treshold_txs_proc( dap_ledger_t * a_ledger) } +void dap_chain_ledger_load_cache(dap_ledger_t *a_ledger) +{ + dap_ledger_private_t *l_ledger_pvt = PVT(a_ledger); + + char *l_gdb_group = dap_chain_ledger_get_gdb_group(a_ledger, "tokens"); + size_t l_objs_count = 0; + dap_global_db_obj_t *l_objs = dap_chain_global_db_gr_load(l_gdb_group, &l_objs_count); + for (size_t i = 0; i < l_objs_count; i++) { + dap_chain_ledger_token_item_t *l_token_item = DAP_NEW_Z(dap_chain_ledger_token_item_t); + strncpy(l_token_item->ticker, l_objs[i].key, sizeof(l_token_item->ticker)); + l_token_item->datum_token = DAP_NEW_Z_SIZE(dap_chain_datum_token_t, l_objs[i].value_len); + memcpy(l_token_item->datum_token, l_objs[i].value, l_objs[i].value_len); + pthread_rwlock_init(&l_token_item->token_emissions_rwlock, NULL); + if (l_token_item->datum_token->type == DAP_CHAIN_DATUM_TOKEN_TYPE_SIMPLE) { + l_token_item->total_supply = l_token_item->datum_token->header_private.total_supply; + } + HASH_ADD_STR(PVT(a_ledger)->tokens, ticker, l_token_item); + if (i == l_objs_count - 1) { + l_ledger_pvt->last_ticker.key = l_token_item->ticker; + } + } + dap_chain_global_db_objs_delete(l_objs, l_objs_count); + DAP_DELETE(l_gdb_group); + if (l_objs_count == 0) { + l_ledger_pvt->last_ticker.found = true; + } + + l_gdb_group = dap_chain_ledger_get_gdb_group(a_ledger, "emissions"); + l_objs_count = 0; + l_objs = dap_chain_global_db_gr_load(l_gdb_group, &l_objs_count); + for (size_t i = 0; i < l_objs_count; i++) { + dap_chain_ledger_token_emission_item_t *l_emission_item = DAP_NEW_Z(dap_chain_ledger_token_emission_item_t); + dap_chain_str_to_hash_fast(l_objs[i].key, &l_emission_item->datum_token_emission_hash); + l_emission_item->datum_token_emission = DAP_NEW_Z_SIZE(dap_chain_datum_token_emission_t, l_objs[i].value_len); + memcpy(l_emission_item->datum_token_emission, l_objs[i].value, l_objs[i].value_len); + const char * c_token_ticker = l_emission_item->datum_token_emission->hdr.ticker; + dap_chain_ledger_token_item_t *l_token_item = NULL; + HASH_FIND_STR(l_ledger_pvt->tokens, c_token_ticker, l_token_item); + if (l_token_item) { + HASH_ADD(hh, l_token_item->token_emissions, datum_token_emission_hash, + sizeof(dap_chain_hash_fast_t), l_emission_item); + } else { + HASH_ADD(hh, l_ledger_pvt->treshold_emissions, datum_token_emission_hash, + sizeof(dap_chain_hash_fast_t), l_emission_item); + } + if (i == l_objs_count - 1) { + l_ledger_pvt->last_emit.hash = &l_emission_item->datum_token_emission_hash; + } + } + dap_chain_global_db_objs_delete(l_objs, l_objs_count); + DAP_DELETE(l_gdb_group); + if (l_objs_count == 0) { + l_ledger_pvt->last_emit.found = true; + } + + l_gdb_group = dap_chain_ledger_get_gdb_group(a_ledger, "txs"); + l_objs_count = 0; + l_objs = dap_chain_global_db_gr_load(l_gdb_group, &l_objs_count); + for (size_t i = 0; i < l_objs_count; i++) { + dap_chain_ledger_tx_item_t *l_tx_item = DAP_NEW_Z(dap_chain_ledger_tx_item_t); + dap_chain_str_to_hash_fast(l_objs[i].key, &l_tx_item->tx_hash_fast); + l_tx_item->tx = DAP_NEW_Z_SIZE(dap_chain_datum_tx_t, l_objs[i].value_len - sizeof(l_tx_item->cache_data)); + memcpy(l_tx_item->tx, l_objs[i].value + sizeof(l_tx_item->cache_data), l_objs[i].value_len - sizeof(l_tx_item->cache_data)); + memcpy(&l_tx_item->cache_data, l_objs[i].value, sizeof(l_tx_item->cache_data)); + HASH_ADD(hh, l_ledger_pvt->ledger_items, tx_hash_fast, sizeof(dap_chain_hash_fast_t), l_tx_item); + if (i == l_objs_count - 1) { + PVT(a_ledger)->last_tx.hash = &l_tx_item->tx_hash_fast; + } + } + dap_chain_global_db_objs_delete(l_objs, l_objs_count); + DAP_DELETE(l_gdb_group); + if (l_objs_count == 0) { + l_ledger_pvt->last_tx.found = true; + } + + //TODO add tx threshold cache if need + l_ledger_pvt->last_thres_tx.found = true; + + l_gdb_group = dap_chain_ledger_get_gdb_group(a_ledger, "balances"); + l_objs_count = 0; + l_objs = dap_chain_global_db_gr_load(l_gdb_group, &l_objs_count); + for (size_t i = 0; i < l_objs_count; i++) { + dap_ledger_wallet_balance_t *l_balance_item = DAP_NEW_Z(dap_ledger_wallet_balance_t); + l_balance_item->key = DAP_NEW_Z_SIZE(char, strlen(l_objs[i].key) + 1); + strcpy(l_balance_item->key, l_objs[i].key); + char *l_ptr = strchr(l_balance_item->key, ' '); + if (l_ptr++) { + strcpy(l_balance_item->token_ticker, l_ptr); + } + l_balance_item->balance = *(uint128_t *)l_objs[i].value; + HASH_ADD_KEYPTR(hh, l_ledger_pvt->balance_accounts, l_balance_item->key, + strlen(l_balance_item->key), l_balance_item); + } + dap_chain_global_db_objs_delete(l_objs, l_objs_count); + DAP_DELETE(l_gdb_group); +} + /** * @brief dap_chain_ledger_create * @param a_check_flags * @return dap_ledger_t */ -dap_ledger_t* dap_chain_ledger_create(uint16_t a_check_flags) +dap_ledger_t* dap_chain_ledger_create(uint16_t a_check_flags, char *a_net_name) { dap_ledger_t *l_ledger = dap_chain_ledger_handle_new(); + l_ledger->net_name = a_net_name; dap_ledger_private_t *l_ledger_priv = PVT(l_ledger); l_ledger_priv->check_flags = a_check_flags; l_ledger_priv->check_ds = a_check_flags & DAP_CHAIN_LEDGER_CHECK_LOCAL_DS; l_ledger_priv->check_cells_ds = a_check_flags & DAP_CHAIN_LEDGER_CHECK_CELLS_DS; l_ledger_priv->check_token_emission = a_check_flags & DAP_CHAIN_LEDGER_CHECK_TOKEN_EMISSION; - // load ledger from mempool - return l_ledger; //dap_chain_ledger_load(l_ledger, "kelvin-testnet", "plasma"); + // load ledger cache from GDB + dap_chain_ledger_load_cache(l_ledger); + return l_ledger; } int dap_chain_ledger_token_emission_add_check(dap_ledger_t *a_ledger, const dap_chain_datum_token_emission_t *a_token_emission @@ -430,16 +546,23 @@ int dap_chain_ledger_token_emission_add(dap_ledger_t *a_ledger, memcpy(l_token_emission_item->datum_token_emission, a_token_emission, a_token_emission_size); memcpy(&l_token_emission_item->datum_token_emission_hash, l_token_emission_hash_ptr, sizeof(l_token_emission_hash)); - dap_chain_ledger_token_emission_item_t * l_token_emissions = l_token_item ? - l_token_item->token_emissions : l_ledger_priv->treshold_emissions; - HASH_ADD(hh, l_token_emissions , - datum_token_emission_hash, sizeof(l_token_emission_hash), - l_token_emission_item); - // save pointer to structure - if(l_token_item) - l_token_item->token_emissions = l_token_emissions; - else - l_ledger_priv->treshold_emissions = l_token_emissions; + if (l_token_item) { + HASH_ADD(hh, l_token_item->token_emissions, datum_token_emission_hash, + sizeof(l_token_emission_hash), l_token_emission_item); + } else { + HASH_ADD(hh, l_ledger_priv->treshold_emissions, datum_token_emission_hash, + sizeof(l_token_emission_hash), l_token_emission_item); + } + // Add it to cache + dap_chain_datum_token_emission_t *l_emission_cache = DAP_NEW_Z_SIZE(dap_chain_datum_token_emission_t, a_token_emission_size); + memcpy(l_emission_cache, a_token_emission, a_token_emission_size); + char *l_gdb_group = dap_chain_ledger_get_gdb_group(a_ledger, "emissions"); + if (!dap_chain_global_db_gr_set(dap_strdup(l_hash_str), l_emission_cache, a_token_emission_size, l_gdb_group)) { + log_it(L_WARNING, "Ledger cache mismatch"); + DAP_DELETE(l_emission_cache); + } + DAP_DELETE(l_gdb_group); + char * l_token_emission_address_str = dap_chain_addr_to_str( &(a_token_emission->hdr.address) ); log_it(L_NOTICE, "Added token emission datum to %s: type=%s value=%.1llf token=%s to_addr=%s ", @@ -466,6 +589,20 @@ int dap_chain_ledger_token_emission_add(dap_ledger_t *a_ledger, return ret; } +int dap_chain_ledger_token_emission_load(dap_ledger_t *a_ledger, const dap_chain_datum_token_emission_t *a_token_emission, size_t a_token_emission_size) +{ + if (PVT(a_ledger)->last_emit.found) { + return dap_chain_ledger_token_emission_add(a_ledger, a_token_emission, a_token_emission_size); + } else { + dap_chain_hash_fast_t l_token_emission_hash = {}; + dap_hash_fast(a_token_emission, a_token_emission_size, &l_token_emission_hash); + if (!memcmp(PVT(a_ledger)->last_emit.hash, &l_token_emission_hash, sizeof(dap_chain_hash_fast_t))) { + PVT(a_ledger)->last_emit.found = true; + } + } + return 0; +} + /** * @brief dap_chain_ledger_token_emission_find * @param a_token_ticker @@ -522,7 +659,7 @@ const char* dap_chain_ledger_tx_get_token_ticker_by_hash(dap_ledger_t *a_ledger, pthread_rwlock_rdlock(&l_ledger_priv->ledger_rwlock); HASH_FIND(hh, l_ledger_priv->ledger_items, a_tx_hash, sizeof ( *a_tx_hash), l_item ); pthread_rwlock_unlock(&l_ledger_priv->ledger_rwlock); - return l_item? l_item->token_tiker: NULL; + return l_item ? l_item->cache_data.token_tiker : NULL; } /** @@ -548,7 +685,7 @@ void dap_chain_ledger_addr_get_token_ticker_all(dap_ledger_t *a_ledger, dap_chai for(size_t i = 0; i < l_tickers_size; i++) { if (l_tickers[i]==NULL) break; - if(l_tickers[i] && strcmp(l_tickers[i], l_tx_item->token_tiker) == 0) { + if(l_tickers[i] && strcmp(l_tickers[i], l_tx_item->cache_data.token_tiker) == 0) { l_is_not_in_list = false; break; } @@ -558,7 +695,7 @@ void dap_chain_ledger_addr_get_token_ticker_all(dap_ledger_t *a_ledger, dap_chai l_tickers_size += (l_tickers_size / 2); l_tickers = DAP_REALLOC(l_tickers, l_tickers_size); } - l_tickers[l_tickers_pos] = dap_strdup(l_tx_item->token_tiker); + l_tickers[l_tickers_pos] = dap_strdup(l_tx_item->cache_data.token_tiker); l_tickers_pos++; } dap_chain_hash_fast_t* l_tx_hash = dap_chain_node_datum_tx_calc_hash(l_tx_item->tx); @@ -647,8 +784,8 @@ static bool dap_chain_ledger_item_is_used_out(dap_chain_ledger_tx_item_t *a_item } assert(a_idx_out < MAX_OUT_ITEMS); // if there are used 'out' items - if(a_item->n_outs_used > 0) { - if(!dap_hash_fast_is_blank(&(a_item->tx_hash_spent_fast[a_idx_out]))) + if(a_item->cache_data.n_outs_used > 0) { + if(!dap_hash_fast_is_blank(&(a_item->cache_data.tx_hash_spent_fast[a_idx_out]))) l_used_out = true; } return l_used_out; @@ -888,7 +1025,7 @@ int dap_chain_ledger_tx_cache_check(dap_ledger_t *a_ledger, dap_chain_datum_tx_t l_token = NULL; } if (!l_token || !*l_token) { - l_token = l_item_out->token_tiker; + l_token = l_item_out->cache_data.token_tiker; } if (!*l_token) { log_it(L_WARNING, "No token ticker found in previous transaction"); @@ -1057,6 +1194,19 @@ int dap_chain_ledger_tx_add_check(dap_ledger_t *a_ledger, dap_chain_datum_tx_t * return 0; } +int dap_chain_ledger_balance_cache_update(dap_ledger_t *a_ledger, dap_ledger_wallet_balance_t *a_balance) +{ + char *l_gdb_group = dap_chain_ledger_get_gdb_group(a_ledger, "balances"); + uint128_t *l_balance_value = DAP_NEW_Z(uint128_t); + *l_balance_value = a_balance->balance; + if (!dap_chain_global_db_gr_set(dap_strdup(a_balance->key), l_balance_value, sizeof(uint128_t), l_gdb_group)) { + log_it(L_WARNING, "Ledger cache mismatch"); + DAP_DELETE(l_balance_value); + return -1; + } + return 0; +} + /** * Add new transaction to the cache list * @@ -1113,8 +1263,8 @@ int dap_chain_ledger_tx_add(dap_ledger_t *a_ledger, dap_chain_datum_tx_t *a_tx) void *l_item_in = *(void **)&bound_item->in; dap_chain_tx_item_type_t l_type = *(uint8_t *)l_item_in; dap_chain_ledger_tx_item_t *l_prev_item_out = bound_item->item_out; - if (*l_prev_item_out->token_tiker) { - l_token_ticker = l_prev_item_out->token_tiker; + if (*l_prev_item_out->cache_data.token_tiker) { + l_token_ticker = l_prev_item_out->cache_data.token_tiker; } else { // Previous multichannel transaction l_token_ticker = bound_item->out.tx_prev_out_ext->token; } @@ -1140,26 +1290,28 @@ int dap_chain_ledger_tx_add(dap_ledger_t *a_ledger, dap_chain_datum_tx_t *a_tx) bound_item->out.tx_prev_out_ext->header.value; //log_it(L_DEBUG,"SPEND %lu from addr: %s", l_value, l_wallet_balance_key); wallet_balance->balance -= l_value; + // Update the cache + dap_chain_ledger_balance_cache_update(a_ledger, wallet_balance); } else { log_it(L_ERROR,"!!! Attempt to SPEND from some non-existent balance !!!: %s %s", l_addr_str, l_token_ticker); } DAP_DELETE(l_addr_str); DAP_DELETE(l_wallet_balance_key); /// Mark 'out' item in cache because it used - l_tx_prev_hash = &(l_prev_item_out->tx_hash_spent_fast[l_tx_in->header.tx_out_prev_idx]); + l_tx_prev_hash = &(l_prev_item_out->cache_data.tx_hash_spent_fast[l_tx_in->header.tx_out_prev_idx]); } else { // TX_ITEM_TYPE_IN_COND // all balance deducts performed with previous conditional transaction dap_chain_tx_in_cond_t *l_tx_in_cond = bound_item->in.tx_cur_in_cond; /// Mark 'out' item in cache because it used - l_tx_prev_hash = &(l_prev_item_out->tx_hash_spent_fast[l_tx_in_cond->header.tx_out_prev_idx]); + l_tx_prev_hash = &(l_prev_item_out->cache_data.tx_hash_spent_fast[l_tx_in_cond->header.tx_out_prev_idx]); } memcpy(l_tx_prev_hash, l_tx_hash, sizeof(dap_chain_hash_fast_t)); // add a used output - l_prev_item_out->n_outs_used++; + l_prev_item_out->cache_data.n_outs_used++; char * l_tx_prev_hash_str = dap_chain_hash_fast_to_str_new(l_tx_prev_hash); // delete previous transactions from cache because all out is used - if(l_prev_item_out->n_outs_used == l_prev_item_out->n_outs) { + if(l_prev_item_out->cache_data.n_outs_used == l_prev_item_out->cache_data.n_outs) { dap_chain_hash_fast_t l_tx_prev_hash_to_del = bound_item->tx_prev_hash_fast; // remove from memory ledger int res = dap_chain_ledger_tx_remove(a_ledger, &l_tx_prev_hash_to_del); @@ -1237,6 +1389,8 @@ int dap_chain_ledger_tx_add(dap_ledger_t *a_ledger, dap_chain_datum_tx_t *a_tx) //log_it(L_DEBUG, "Balance item is present in cache"); wallet_balance->balance += l_value; DAP_DELETE (l_wallet_balance_key); + // Update the cache + dap_chain_ledger_balance_cache_update(a_ledger, wallet_balance); } else { wallet_balance = DAP_NEW_Z(dap_ledger_wallet_balance_t); wallet_balance->key = l_wallet_balance_key; @@ -1245,6 +1399,8 @@ int dap_chain_ledger_tx_add(dap_ledger_t *a_ledger, dap_chain_datum_tx_t *a_tx) //log_it(L_DEBUG,"!!! Create new balance item: %s %s", l_addr_str, l_token_ticker); HASH_ADD_KEYPTR(hh, PVT(a_ledger)->balance_accounts, wallet_balance->key, strlen(l_wallet_balance_key), wallet_balance); + // Add it to cache + dap_chain_ledger_balance_cache_update(a_ledger, wallet_balance); } #ifdef __ANDROID__ log_it(L_INFO, "Updated balance +%.3Lf %s on addr %s", @@ -1272,15 +1428,15 @@ int dap_chain_ledger_tx_add(dap_ledger_t *a_ledger, dap_chain_datum_tx_t *a_tx) l_item_tmp = DAP_NEW_Z(dap_chain_ledger_tx_item_t); memcpy(&l_item_tmp->tx_hash_fast, l_tx_hash, sizeof(dap_chain_hash_fast_t)); l_item_tmp->tx = DAP_NEW_SIZE(dap_chain_datum_tx_t, dap_chain_datum_tx_get_size(a_tx)); - l_item_tmp->ts_created = (time_t) a_tx->header.ts_created; + l_item_tmp->cache_data.ts_created = (time_t) a_tx->header.ts_created; //calculate l_item_tmp->n_outs; // If debug mode dump the UTXO if ( dap_log_level_get() == L_DEBUG){ - l_item_tmp->n_outs = 0; - if( l_item_tmp->n_outs){ - dap_list_t *l_tist_tmp = dap_chain_datum_tx_items_get(a_tx, TX_ITEM_TYPE_OUT, &l_item_tmp->n_outs); - for (size_t i =0; i < (size_t) l_item_tmp->n_outs; i++){ + l_item_tmp->cache_data.n_outs = 0; + if( l_item_tmp->cache_data.n_outs){ + dap_list_t *l_tist_tmp = dap_chain_datum_tx_items_get(a_tx, TX_ITEM_TYPE_OUT, &l_item_tmp->cache_data.n_outs); + for (size_t i =0; i < (size_t) l_item_tmp->cache_data.n_outs; i++){ // TODO list conditional outputs dap_chain_tx_out_t * l_tx_out = l_tist_tmp->data; char * l_tx_out_addr_str = dap_chain_addr_to_str( &l_tx_out->addr ); @@ -1303,10 +1459,20 @@ int dap_chain_ledger_tx_add(dap_ledger_t *a_ledger, dap_chain_datum_tx_t *a_tx) } } if (l_token_ticker && !l_multichannel) - strncpy(l_item_tmp->token_tiker, l_token_ticker, sizeof(l_item_tmp->token_tiker) - 1); - - memcpy(l_item_tmp->tx, a_tx, dap_chain_datum_tx_get_size(a_tx)); - HASH_ADD(hh, l_ledger_priv->ledger_items, tx_hash_fast, sizeof(dap_chain_hash_fast_t), l_item_tmp); // tx_hash_fast: name of key field + strncpy(l_item_tmp->cache_data.token_tiker, l_token_ticker, sizeof(l_item_tmp->cache_data.token_tiker) - 1); + size_t l_tx_size = dap_chain_datum_tx_get_size(a_tx); + memcpy(l_item_tmp->tx, a_tx, l_tx_size); + HASH_ADD(hh, l_ledger_priv->ledger_items, tx_hash_fast, sizeof(dap_chain_hash_fast_t), l_item_tmp); // tx_hash_fast: name of key field + // Add it to cache + uint8_t *l_tx_cache = DAP_NEW_Z_SIZE(uint8_t, l_tx_size + sizeof(l_item_tmp->cache_data)); + memcpy(l_tx_cache, &l_item_tmp->cache_data, sizeof(l_item_tmp->cache_data)); + memcpy(l_tx_cache + sizeof(l_item_tmp->cache_data), a_tx, l_tx_size); + char *l_gdb_group = dap_chain_ledger_get_gdb_group(a_ledger, "txs"); + if (!dap_chain_global_db_gr_set(dap_strdup(l_tx_hash_str), l_tx_cache, l_tx_size + sizeof(l_item_tmp->cache_data), l_gdb_group)) { + log_it(L_WARNING, "Ledger cache mismatch"); + DAP_DELETE(l_tx_cache); + } + DAP_DELETE(l_gdb_group); ret = 1; } FIN: @@ -1316,6 +1482,25 @@ FIN: return ret; } +int dap_chain_ledger_tx_load(dap_ledger_t *a_ledger, dap_chain_datum_tx_t *a_tx) +{ + if (PVT(a_ledger)->last_tx.found && PVT(a_ledger)->last_thres_tx.found) { + return dap_chain_ledger_tx_add(a_ledger, a_tx); + } else { + dap_chain_hash_fast_t l_tx_hash = {}; + dap_hash_fast(a_tx, dap_chain_datum_tx_get_size(a_tx), &l_tx_hash); + if (!PVT(a_ledger)->last_tx.found && + !memcmp(PVT(a_ledger)->last_tx.hash, &l_tx_hash, sizeof(dap_chain_hash_fast_t))) { + PVT(a_ledger)->last_tx.found = true; + } + if (!PVT(a_ledger)->last_thres_tx.found && + !memcmp(PVT(a_ledger)->last_thres_tx.hash, &l_tx_hash, sizeof(dap_chain_hash_fast_t))) { + PVT(a_ledger)->last_thres_tx.found = true; + } + } + return 1; +} + /** * Delete transaction from the cache * @@ -1332,18 +1517,20 @@ int dap_chain_ledger_tx_remove(dap_ledger_t *a_ledger, dap_chain_hash_fast_t *a_ HASH_FIND(hh, l_ledger_priv->ledger_items, a_tx_hash, sizeof(dap_chain_hash_fast_t), l_item_tmp); if(l_item_tmp != NULL) { HASH_DEL(l_ledger_priv->ledger_items, l_item_tmp); + // delete transaction + DAP_DELETE(l_item_tmp->tx); + // del struct for hash + DAP_DELETE(l_item_tmp); + // Remove it from cache + char *l_gdb_group = dap_chain_ledger_get_gdb_group(a_ledger, "txs"); + dap_chain_global_db_gr_del(dap_chain_hash_fast_to_str_new(a_tx_hash), l_gdb_group); + DAP_DELETE(l_gdb_group); l_ret = 1; } else // hash not found in the cache l_ret = -2; pthread_rwlock_unlock(&l_ledger_priv->ledger_rwlock); - if(!l_ret) { - // delete transaction - DAP_DELETE(l_item_tmp->tx); - // del struct for hash - DAP_DELETE(l_item_tmp); - } return l_ret; } @@ -1353,16 +1540,82 @@ int dap_chain_ledger_tx_remove(dap_ledger_t *a_ledger, dap_chain_hash_fast_t *a_ void dap_chain_ledger_purge(dap_ledger_t *a_ledger) { dap_ledger_private_t *l_ledger_priv = PVT(a_ledger); - dap_chain_ledger_tx_item_t *l_iter_current, *l_item_tmp; + const int l_hash_str_size = DAP_CHAIN_HASH_FAST_SIZE * 2 + 2; + char l_hash_str[l_hash_str_size]; pthread_rwlock_wrlock(&l_ledger_priv->ledger_rwlock); - HASH_ITER(hh, l_ledger_priv->ledger_items , l_iter_current, l_item_tmp) - { - // delete transaction - DAP_DELETE(l_iter_current->tx); - // del struct for hash - HASH_DEL(l_ledger_priv->ledger_items, l_iter_current); + pthread_rwlock_wrlock(&l_ledger_priv->tokens_rwlock); + pthread_rwlock_wrlock(&l_ledger_priv->treshold_emissions_rwlock); + pthread_rwlock_wrlock(&l_ledger_priv->treshold_txs_rwlock); + + // delete transactions + dap_chain_ledger_tx_item_t *l_item_current, *l_item_tmp; + char *l_gdb_group = dap_chain_ledger_get_gdb_group(a_ledger, "txs"); + HASH_ITER(hh, l_ledger_priv->ledger_items , l_item_current, l_item_tmp) { + DAP_DELETE(l_item_current->tx); + HASH_DEL(l_ledger_priv->ledger_items, l_item_current); + dap_chain_hash_fast_to_str(&l_item_current->tx_hash_fast, l_hash_str, l_hash_str_size); + dap_chain_global_db_gr_del(l_hash_str, l_gdb_group); + DAP_DELETE(l_item_current); + } + DAP_DELETE(l_gdb_group); + + // delete threshold txs + l_gdb_group = dap_chain_ledger_get_gdb_group(a_ledger, "thres_txs"); + HASH_ITER(hh, l_ledger_priv->treshold_txs, l_item_current, l_item_tmp) { + HASH_DEL(l_ledger_priv->treshold_txs, l_item_current); + dap_chain_hash_fast_to_str(&l_item_current->tx_hash_fast, l_hash_str, l_hash_str_size); + dap_chain_global_db_gr_del(l_hash_str, l_gdb_group); + DAP_DELETE(l_item_current->tx); + DAP_DELETE(l_item_current); + } + DAP_DELETE(l_gdb_group); + + // delete balances + dap_ledger_wallet_balance_t *l_balance_current, *l_balance_tmp; + l_gdb_group = dap_chain_ledger_get_gdb_group(a_ledger, "balances"); + HASH_ITER(hh, l_ledger_priv->balance_accounts, l_balance_current, l_balance_tmp) { + HASH_DEL(l_ledger_priv->balance_accounts, l_balance_current); + dap_chain_global_db_gr_del(l_balance_current->key, l_gdb_group); + DAP_DELETE(l_balance_current->key); + DAP_DELETE(l_balance_current); } + DAP_DELETE(l_gdb_group); + + // delete threshold emissions + dap_chain_ledger_token_emission_item_t *l_emission_current, *l_emission_tmp; + char *l_emissions_gdb_group = dap_chain_ledger_get_gdb_group(a_ledger, "emissions"); + HASH_ITER(hh, l_ledger_priv->treshold_emissions, l_emission_current, l_emission_tmp) { + HASH_DEL(l_ledger_priv->treshold_emissions, l_emission_current); + dap_chain_hash_fast_to_str(&l_emission_current->datum_token_emission_hash, l_hash_str, l_hash_str_size); + dap_chain_global_db_gr_del(l_hash_str, l_emissions_gdb_group); + DAP_DELETE(l_emission_current->datum_token_emission); + DAP_DELETE(l_emission_current); + } + + // delete tokens & its emissions + dap_chain_ledger_token_item_t *l_token_current, *l_token_tmp; + l_gdb_group = dap_chain_ledger_get_gdb_group(a_ledger, "tokens"); + HASH_ITER(hh, l_ledger_priv->tokens, l_token_current, l_token_tmp) { + HASH_DEL(l_ledger_priv->tokens, l_token_current); + HASH_ITER(hh, l_token_current->token_emissions, l_emission_current, l_emission_tmp) { + HASH_DEL(l_token_current->token_emissions, l_emission_current); + dap_chain_hash_fast_to_str(&l_emission_current->datum_token_emission_hash, l_hash_str, l_hash_str_size); + dap_chain_global_db_gr_del(l_hash_str, l_emissions_gdb_group); + DAP_DELETE(l_emission_current->datum_token_emission); + DAP_DELETE(l_emission_current); + } + dap_chain_global_db_gr_del(l_token_current->ticker, l_gdb_group); + DAP_DELETE(l_token_current->datum_token); + pthread_rwlock_destroy(&l_token_current->token_emissions_rwlock); + DAP_DELETE(l_token_current); + } + DAP_DELETE(l_gdb_group); + DAP_DELETE(l_emissions_gdb_group); + pthread_rwlock_unlock(&l_ledger_priv->ledger_rwlock); + pthread_rwlock_unlock(&l_ledger_priv->tokens_rwlock); + pthread_rwlock_unlock(&l_ledger_priv->treshold_emissions_rwlock); + pthread_rwlock_unlock(&l_ledger_priv->treshold_txs_rwlock); } /** @@ -1370,16 +1623,7 @@ void dap_chain_ledger_purge(dap_ledger_t *a_ledger) */ _dap_int128_t dap_chain_ledger_count(dap_ledger_t *a_ledger) { - _dap_int128_t l_ret = 0; - dap_ledger_private_t *l_ledger_priv = PVT(a_ledger); - dap_chain_ledger_tx_item_t *l_iter_current, *l_item_tmp; - pthread_rwlock_wrlock(&l_ledger_priv->ledger_rwlock); - HASH_ITER(hh, l_ledger_priv->ledger_items , l_iter_current, l_item_tmp) - { - l_ret++; - } - pthread_rwlock_unlock(&l_ledger_priv->ledger_rwlock); - return l_ret; + return HASH_COUNT(PVT(a_ledger)->ledger_items); } /** @@ -1397,17 +1641,17 @@ uint64_t dap_chain_ledger_count_from_to(dap_ledger_t * a_ledger, time_t a_ts_fro pthread_rwlock_wrlock(&l_ledger_priv->ledger_rwlock); if ( a_ts_from && a_ts_to) { HASH_ITER(hh, l_ledger_priv->ledger_items , l_iter_current, l_item_tmp){ - if ( l_iter_current->ts_created >= a_ts_from && l_iter_current->ts_created <= a_ts_to ) + if ( l_iter_current->cache_data.ts_created >= a_ts_from && l_iter_current->cache_data.ts_created <= a_ts_to ) l_ret++; } } else if ( a_ts_to ){ HASH_ITER(hh, l_ledger_priv->ledger_items , l_iter_current, l_item_tmp){ - if ( l_iter_current->ts_created <= a_ts_to ) + if ( l_iter_current->cache_data.ts_created <= a_ts_to ) l_ret++; } } else if ( a_ts_from ){ HASH_ITER(hh, l_ledger_priv->ledger_items , l_iter_current, l_item_tmp){ - if ( l_iter_current->ts_created >= a_ts_from ) + if ( l_iter_current->cache_data.ts_created >= a_ts_from ) l_ret++; } }else { @@ -1512,7 +1756,7 @@ uint64_t dap_chain_ledger_calc_balance_full(dap_ledger_t *a_ledger, const dap_ch if (l_type == TX_ITEM_TYPE_OUT) { const dap_chain_tx_out_t *l_tx_out = (const dap_chain_tx_out_t*) l_list_tmp->data; // Check for token name - if (!strcmp(a_token_ticker, l_iter_current->token_tiker)) + if (!strcmp(a_token_ticker, l_iter_current->cache_data.token_tiker)) { // if transaction has the out item with requested addr if (!memcmp(a_addr, &l_tx_out->addr, sizeof(dap_chain_addr_t))) { // if 'out' item not used & transaction is valid @@ -1563,7 +1807,8 @@ static dap_chain_ledger_tx_item_t* tx_item_find_by_addr(dap_ledger_t *a_ledger, HASH_ITER(hh, l_ledger_priv->ledger_items , l_iter_current, l_item_tmp) { // If a_token is setup we check if its not our token - miss it - if (a_token && *l_iter_current->token_tiker && dap_strcmp(l_iter_current->token_tiker, a_token)) + if (a_token && *l_iter_current->cache_data.token_tiker && + dap_strcmp(l_iter_current->cache_data.token_tiker, a_token)) continue; // Now work with it dap_chain_datum_tx_t *l_tx = l_iter_current->tx; @@ -1715,7 +1960,7 @@ dap_chain_datum_tx_t* dap_chain_ledger_tx_cache_find_out_cond(dap_ledger_t *a_le l_cur_tx = l_tx_tmp; memcpy(a_tx_first_hash, l_tx_hash_tmp, sizeof(dap_chain_hash_fast_t)); if (a_token_ticker) { - strcpy(a_token_ticker, l_iter_current->token_tiker); + strcpy(a_token_ticker, l_iter_current->cache_data.token_tiker); } break; } diff --git a/modules/chain/include/dap_chain.h b/modules/chain/include/dap_chain.h index 86f4782d85f2e2720f72e56a7c5a305f2c0ee0b5..ed3f73afb2df178761798d6cee8a7485fe6d388a 100644 --- a/modules/chain/include/dap_chain.h +++ b/modules/chain/include/dap_chain.h @@ -117,6 +117,7 @@ typedef struct dap_chain{ dap_chain_callback_new_cfg_t callback_created; dap_chain_callback_t callback_delete; + dap_chain_callback_t callback_purge; dap_chain_callback_atom_t callback_atom_add; dap_chain_callback_atom_verify_t callback_atom_verify; diff --git a/modules/chain/include/dap_chain_ledger.h b/modules/chain/include/dap_chain_ledger.h index 29b7b7e9f0ecf06987174d0b66e1ca87b3370b82..3eb630475f066d7069f58607450e42edfd9e5265 100644 --- a/modules/chain/include/dap_chain_ledger.h +++ b/modules/chain/include/dap_chain_ledger.h @@ -38,6 +38,7 @@ #include "dap_chain_datum_tx_items.h" typedef struct dap_ledger { + char *net_name; void *_internal; } dap_ledger_t; @@ -52,7 +53,7 @@ typedef bool (* dap_chain_ledger_verificator_callback_t)(dap_chain_tx_out_cond_t // Check the double spending in all cells #define DAP_CHAIN_LEDGER_CHECK_CELLS_DS 0x0100 -dap_ledger_t* dap_chain_ledger_create(uint16_t a_check_flags); +dap_ledger_t* dap_chain_ledger_create(uint16_t a_check_flags, char *a_net_name); // Remove dap_ledger_t structure void dap_chain_ledger_handle_free(dap_ledger_t *a_ledger); @@ -67,19 +68,28 @@ void dap_chain_ledger_set_local_cell_id(dap_ledger_t *a_ledger, dap_chain_cell_i * @param a_tx * @return */ -static inline dap_chain_hash_fast_t* dap_chain_node_datum_tx_calc_hash(dap_chain_datum_tx_t *a_tx) +DAP_STATIC_INLINE dap_chain_hash_fast_t* dap_chain_node_datum_tx_calc_hash(dap_chain_datum_tx_t *a_tx) { dap_chain_hash_fast_t *tx_hash = DAP_NEW_Z(dap_chain_hash_fast_t); dap_hash_fast(a_tx, dap_chain_datum_tx_get_size(a_tx), tx_hash); return tx_hash; } +DAP_STATIC_INLINE char *dap_chain_ledger_get_gdb_group(dap_ledger_t *a_ledger, const char *a_suffix) +{ + if (a_ledger) { + return dap_strdup_printf("local.ledger-cache.%s.%s", a_ledger->net_name, a_suffix); + } + return NULL; +} + /** * Add new transaction to the cache * * return 1 OK, -1 error */ int dap_chain_ledger_tx_add(dap_ledger_t *a_ledger, dap_chain_datum_tx_t *a_tx); +int dap_chain_ledger_tx_load(dap_ledger_t *a_ledger, dap_chain_datum_tx_t *a_tx); int dap_chain_ledger_tx_add_check(dap_ledger_t *a_ledger, dap_chain_datum_tx_t *a_tx); @@ -97,6 +107,7 @@ int dap_chain_ledger_token_ticker_check(dap_ledger_t * a_ledger, const char *a_t */ int dap_chain_ledger_token_add(dap_ledger_t * a_ledger,dap_chain_datum_token_t *a_token, size_t a_token_size); +int dap_chain_ledger_token_load(dap_ledger_t * a_ledger,dap_chain_datum_token_t *a_token, size_t a_token_size); int dap_chain_ledger_token_decl_add_check(dap_ledger_t * a_ledger,dap_chain_datum_token_t *a_token); /** @@ -104,6 +115,8 @@ int dap_chain_ledger_token_decl_add_check(dap_ledger_t * a_ledger,dap_chain_datu */ int dap_chain_ledger_token_emission_add(dap_ledger_t *a_ledger, const dap_chain_datum_token_emission_t *a_token_emission, size_t a_token_emission_size); +int dap_chain_ledger_token_emission_load(dap_ledger_t *a_ledger, + const dap_chain_datum_token_emission_t *a_token_emission, size_t a_token_emission_size); // Check if it addable int dap_chain_ledger_token_emission_add_check(dap_ledger_t *a_ledger, diff --git a/modules/chain/include/dap_chain_pvt.h b/modules/chain/include/dap_chain_pvt.h index d18e434a8f0300db0f245c688dc17e75f3f358c9..4af990d5fcdf6b2650e6ba1a44785a67400d26dd 100644 --- a/modules/chain/include/dap_chain_pvt.h +++ b/modules/chain/include/dap_chain_pvt.h @@ -41,6 +41,8 @@ typedef struct dap_chain_pvt { dap_chain_t * chain; char * file_storage_dir; + char * cs_name; + int celled; } dap_chain_pvt_t; #define DAP_CHAIN_PVT(a) ((dap_chain_pvt_t *) a->_pvt ) diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c index 1d6eb6a2d6f28c2ec9e75f885f077a0e4285278b..320bed0abebed52e17b61c8b5dba75567be9d981 100644 --- a/modules/channel/chain/dap_stream_ch_chain.c +++ b/modules/channel/chain/dap_stream_ch_chain.c @@ -131,32 +131,10 @@ bool s_sync_chains_callback(dap_proc_thread_t *a_thread, void *a_arg) dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); dap_chain_t * l_chain = dap_chain_find_by_id(l_ch_chain->request_net_id, l_ch_chain->request_chain_id); - - dap_chain_atom_ptr_t * l_lasts = NULL; - size_t *l_lasts_sizes = NULL; - size_t l_lasts_count = 0; - dap_chain_atom_iter_t* l_iter = l_chain->callback_atom_iter_create(l_chain); - l_ch_chain->request_atom_iter = l_iter; - l_lasts = l_chain->callback_atom_iter_get_lasts(l_iter, &l_lasts_count, &l_lasts_sizes); - log_it(L_INFO, "Found %d last atoms for synchronization", l_lasts_count); - if (l_lasts && l_lasts_sizes) { - for(long int i = 0; i < l_lasts_count; i++) { - dap_chain_atom_item_t * l_item = NULL; - dap_chain_hash_fast_t l_atom_hash; - dap_hash_fast(l_lasts[i], l_lasts_sizes[i], &l_atom_hash); - pthread_mutex_lock(&l_ch_chain->mutex); - HASH_FIND(hh, l_ch_chain->request_atoms_lasts, &l_atom_hash, sizeof(l_atom_hash), - l_item); - if(l_item == NULL) { // Not found, add new lasts - l_item = DAP_NEW_Z(dap_chain_atom_item_t); - l_item->atom = l_lasts[i]; - l_item->atom_size = l_lasts_sizes[i]; - memcpy(&l_item->atom_hash, &l_atom_hash, sizeof(l_atom_hash)); - HASH_ADD(hh, l_ch_chain->request_atoms_lasts, atom_hash, sizeof(l_atom_hash), - l_item); - } - pthread_mutex_unlock(&l_ch_chain->mutex); - } + l_ch_chain->request_atom_iter = l_chain->callback_atom_iter_create(l_chain); + size_t l_first_size = 0; + dap_chain_atom_ptr_t *l_first = l_chain->callback_atom_iter_get_first(l_ch_chain->request_atom_iter, &l_first_size); + if (l_first && l_first_size) { // first packet l_ch_chain->state = CHAIN_STATE_SYNC_CHAINS; dap_chain_node_addr_t l_node_addr = { 0 }; @@ -168,15 +146,16 @@ bool s_sync_chains_callback(dap_proc_thread_t *a_thread, void *a_arg) } else { // last packet - dap_stream_ch_chain_sync_request_t l_request; - memset(&l_request,0,sizeof (l_request)); + dap_stream_ch_chain_sync_request_t l_request = {}; dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS, l_ch_chain->request_net_id, l_ch_chain->request_chain_id, l_ch_chain->request_cell_id, &l_request, sizeof(l_request)); + DAP_DEL_Z(l_ch_chain->request_atom_iter); l_ch_chain->state = CHAIN_STATE_IDLE; + if (l_ch_chain->callback_notify_packet_out) + l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS, + NULL, 0, l_ch_chain->callback_notify_arg); } - DAP_DELETE(l_lasts); - DAP_DELETE(l_iter); dap_stream_ch_set_ready_to_write_unsafe(l_ch, true); dap_events_socket_assign_on_worker_mt(l_ch->stream->esocket, l_ch->stream_worker->worker); return true; @@ -603,15 +582,7 @@ void dap_stream_ch_chain_go_idle ( dap_stream_ch_chain_t * a_ch_chain) memset(&a_ch_chain->request_cell_id, 0, sizeof(a_ch_chain->request_cell_id)); memset(&a_ch_chain->request_chain_id, 0, sizeof(a_ch_chain->request_chain_id)); memset(&a_ch_chain->request_last_ts, 0, sizeof(a_ch_chain->request_last_ts)); - - dap_chain_atom_item_t *l_atom_item = NULL, *l_atom_item_tmp = NULL; - pthread_mutex_lock(&a_ch_chain->mutex); - HASH_ITER( hh,a_ch_chain->request_atoms_lasts, l_atom_item, l_atom_item_tmp) - HASH_DEL(a_ch_chain->request_atoms_lasts, l_atom_item); - - HASH_ITER( hh, a_ch_chain->request_atoms_processed, l_atom_item, l_atom_item_tmp ) - HASH_DEL(a_ch_chain->request_atoms_processed, l_atom_item); - pthread_mutex_unlock(&a_ch_chain->mutex); + DAP_DEL_Z(a_ch_chain->request_atom_iter); } bool s_process_gdb_iter(dap_stream_ch_t *a_ch) @@ -699,79 +670,26 @@ bool s_out_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) // Synchronize chains case CHAIN_STATE_SYNC_CHAINS: { //log_it(L_DEBUG, "CHAIN_STATE_SYNC_CHAINS"); - dap_chain_t * l_chain = dap_chain_find_by_id(l_ch_chain->request_net_id, l_ch_chain->request_chain_id); - dap_chain_atom_item_t * l_atom_item = NULL, *l_atom_item_tmp = NULL;//, *l_chains_lasts_new = NULL; - if(l_ch_chain->request_atoms_lasts == NULL) { // All chains synced - dap_stream_ch_chain_sync_request_t l_request; - memset(&l_request,0,sizeof (l_request)); - uint8_t l_send_pkt_type = l_ch_chain->state == CHAIN_STATE_SYNC_CHAINS ? - DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS : - DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_ALL; + if (l_ch_chain->request_atom_iter->cur == NULL) { // All chains synced + dap_stream_ch_chain_sync_request_t l_request = {}; // last message - dap_stream_ch_chain_pkt_write_unsafe(l_ch, l_send_pkt_type, + dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS, l_ch_chain->request_net_id, l_ch_chain->request_chain_id, l_ch_chain->request_cell_id, &l_request, sizeof(l_request)); l_packet_out = true; - log_it( L_DEBUG,"Synced: %llu atoms processed", - l_ch_chain->stats_request_atoms_processed); + log_it( L_DEBUG,"Synced: %llu atoms processed", l_ch_chain->stats_request_atoms_processed); dap_stream_ch_chain_go_idle(l_ch_chain); - - if(l_ch_chain->callback_notify_packet_out) - l_ch_chain->callback_notify_packet_out(l_ch_chain, l_send_pkt_type, NULL, 0, l_ch_chain->callback_notify_arg); - }else{ // Process one chain from l_ch_chain->request_atoms_lasts - pthread_mutex_lock(&l_ch_chain->mutex); - HASH_ITER(hh,l_ch_chain->request_atoms_lasts, l_atom_item, l_atom_item_tmp) { - dap_chain_atom_item_t * l_atom_item_proc = NULL; - // Check if its processed already - HASH_FIND(hh, l_ch_chain->request_atoms_processed, &l_atom_item->atom_hash, - sizeof(l_atom_item->atom_hash), l_atom_item_proc); - - if(l_atom_item_proc == NULL) { // If not processed we first store it in special table - l_atom_item_proc = DAP_NEW_Z(dap_chain_atom_item_t); - l_atom_item_proc->atom = l_atom_item->atom; - memcpy(&l_atom_item_proc->atom_hash, &l_atom_item->atom_hash, sizeof(l_atom_item->atom_hash)); - HASH_ADD(hh, l_ch_chain->request_atoms_processed, atom_hash, sizeof(l_atom_item->atom_hash), - l_atom_item_proc); - - // Then flush it out to the remote - size_t l_atom_size = l_atom_item->atom_size; - log_it(L_INFO, "Send one chain packet len=%d", l_atom_size); - dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN, l_ch_chain->request_net_id, - l_ch_chain->request_chain_id, l_ch_chain->request_cell_id, - l_atom_item->atom, l_atom_size); - l_packet_out = true; - l_ch_chain->stats_request_atoms_processed++; - // Then parse links and populate new lasts - size_t l_links_count = 0; - dap_chain_atom_ptr_t * l_links = NULL; - size_t *l_links_sizes = NULL; - dap_chain_atom_iter_t* l_iter = l_chain->callback_atom_iter_create_from(l_chain, l_atom_item->atom, l_atom_item->atom_size); - l_links = l_chain->callback_atom_iter_get_links(l_iter, &l_links_count, &l_links_sizes); - DAP_DELETE(l_iter); - if (l_links && l_links_sizes) { - for(size_t i = 0; i < l_links_count; i++) { // Find links - dap_chain_atom_item_t * l_link_item = NULL; - dap_chain_hash_fast_t l_link_hash; - dap_hash_fast(l_links[i], l_links_sizes[i], &l_link_hash); - // Check link in processed atims - HASH_FIND(hh, l_ch_chain->request_atoms_processed, &l_link_hash, sizeof(l_link_hash), l_link_item); - if(l_link_item == NULL) { // Not found, add new lasts - l_link_item = DAP_NEW_Z(dap_chain_atom_item_t); - l_link_item->atom = l_links[i];// do not use memory cause it will be deleted - l_link_item->atom_size = l_links_sizes[i]; - memcpy(&l_link_item->atom_hash, &l_link_hash, sizeof(l_link_hash)); - HASH_ADD(hh, l_ch_chain->request_atoms_lasts, atom_hash, sizeof(l_link_hash), l_link_item); - } - } - DAP_DELETE(l_links); - } - HASH_DEL(l_ch_chain->request_atoms_lasts, l_atom_item); - break; - } else { - HASH_DEL(l_ch_chain->request_atoms_lasts, l_atom_item); - } - } - pthread_mutex_unlock(&l_ch_chain->mutex); + if (l_ch_chain->callback_notify_packet_out) + l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS, NULL, + 0, l_ch_chain->callback_notify_arg); + } else { // Process one chain from l_ch_chain->request_atom_iter + dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN, l_ch_chain->request_net_id, + l_ch_chain->request_chain_id, l_ch_chain->request_cell_id, + l_ch_chain->request_atom_iter->cur, l_ch_chain->request_atom_iter->cur_size); + l_packet_out = true; + l_ch_chain->stats_request_atoms_processed++; + // Then get next atom and populate new last + l_ch_chain->request_atom_iter->chain->callback_atom_iter_get_next(l_ch_chain->request_atom_iter, NULL); } } break; default: break; diff --git a/modules/channel/chain/include/dap_stream_ch_chain.h b/modules/channel/chain/include/dap_stream_ch_chain.h index 9e576b7656c964ae5dbe01f999b38f7dcb6fedb5..17992a7de03552522bdae7834a2ddf6703cf74a4 100644 --- a/modules/channel/chain/include/dap_stream_ch_chain.h +++ b/modules/channel/chain/include/dap_stream_ch_chain.h @@ -54,8 +54,6 @@ typedef struct dap_stream_ch_chain { dap_stream_ch_chain_state_t state; dap_chain_atom_iter_t * request_atom_iter; - dap_chain_atom_item_t * request_atoms_lasts; - dap_chain_atom_item_t * request_atoms_processed; uint8_t *pkt_data; uint64_t pkt_data_size; uint64_t stats_request_atoms_processed; diff --git a/modules/consensus/none/dap_chain_cs_none.c b/modules/consensus/none/dap_chain_cs_none.c index 0ffa2df546d2809fa1817cf77276580e7b52ce33..0819674cad21cd3e584cb335af55245f377f32cb 100644 --- a/modules/consensus/none/dap_chain_cs_none.c +++ b/modules/consensus/none/dap_chain_cs_none.c @@ -65,8 +65,6 @@ typedef struct dap_chain_gdb_private #define PVT(a) ( (a) ? (dap_chain_gdb_private_t* ) (a)->_internal : NULL) -static int dap_chain_gdb_ledger_load(dap_chain_gdb_t *a_gdb, dap_chain_net_t *a_net); - // Atomic element organization callbacks static dap_chain_atom_verify_res_t s_chain_callback_atom_add(dap_chain_t * a_chain, dap_chain_atom_ptr_t, size_t); // Accept new event in gdb static dap_chain_atom_verify_res_t s_chain_callback_atom_verify(dap_chain_t * a_chain, dap_chain_atom_ptr_t, size_t); // Verify new event in gdb @@ -138,10 +136,16 @@ static void s_history_callback_notify(void * a_arg, const char a_op_code, const dap_chain_net_t *l_net = dap_chain_net_by_id( l_gdb->chain->net_id); log_it(L_DEBUG,"%s.%s: op_code='%c' group=\"%s\" key=\"%s\" value_size=%u",l_net->pub.name, l_gdb->chain->name, a_op_code, a_group, a_key, a_value_size); + dap_chain_node_mempool_autoproc_notify((void *)l_net, a_op_code, a_prefix, a_group, a_key, a_value, a_value_size); dap_chain_net_sync_gdb_broadcast((void *)l_net, a_op_code, a_prefix, a_group, a_key, a_value, a_value_size); } } +static void s_dap_chain_gdb_callback_purge(dap_chain_t *a_chain) +{ + PVT(DAP_CHAIN_GDB(a_chain))->is_load_mode = true; +} + /** * @brief dap_chain_gdb_new * @param a_chain @@ -176,10 +180,10 @@ int dap_chain_gdb_new(dap_chain_t * a_chain, dap_config_t * a_chain_cfg) // load ledger l_gdb_priv->is_load_mode = true; - dap_chain_gdb_ledger_load(l_gdb, l_net); - l_gdb_priv->is_load_mode = false; + dap_chain_gdb_ledger_load(l_gdb_priv->group_datums, a_chain); a_chain->callback_delete = dap_chain_gdb_delete; + a_chain->callback_purge = s_dap_chain_gdb_callback_purge; // Atom element callbacks a_chain->callback_atom_add = s_chain_callback_atom_add; // Accept new element in chain @@ -258,24 +262,17 @@ const char* dap_chain_gdb_get_group(dap_chain_t * a_chain) * * return 0 if OK otherwise negative error code */ -static int dap_chain_gdb_ledger_load(dap_chain_gdb_t *a_gdb, dap_chain_net_t *a_net) +int dap_chain_gdb_ledger_load(char *a_gdb_group, dap_chain_t *a_chain) { - dap_chain_gdb_private_t *l_gdb_priv = PVT(a_gdb); - // protect from reloading - if(dap_chain_ledger_count( a_net->pub.ledger ) > 0) - return 0; - size_t l_data_size = 0; - - // Read the entire database into an array of size bytes - dap_global_db_obj_t *data = dap_chain_global_db_gr_load(l_gdb_priv->group_datums , &l_data_size); + dap_global_db_obj_t *data = dap_chain_global_db_gr_load(a_gdb_group, &l_data_size); // make list of datums for(size_t i = 0; i < l_data_size; i++) { - s_chain_callback_atom_add(a_gdb->chain,data[i].value, data[i].value_len); - + s_chain_callback_atom_add(a_chain, data[i].value, data[i].value_len); } dap_chain_global_db_objs_delete(data, l_data_size); + PVT(DAP_CHAIN_GDB(a_chain))->is_load_mode = false; return 0; } @@ -322,11 +319,11 @@ static dap_chain_atom_verify_res_t s_chain_callback_atom_add(dap_chain_t * a_cha switch (l_datum->header.type_id) { case DAP_CHAIN_DATUM_TOKEN_DECL:{ dap_chain_datum_token_t *l_token = (dap_chain_datum_token_t*) l_datum->data; - dap_chain_ledger_token_add(a_chain->ledger,l_token, l_datum->header.data_size); + dap_chain_ledger_token_load(a_chain->ledger,l_token, l_datum->header.data_size); }break; case DAP_CHAIN_DATUM_TOKEN_EMISSION: { dap_chain_datum_token_emission_t *l_token_emission = (dap_chain_datum_token_emission_t*) l_datum->data; - dap_chain_ledger_token_emission_add(a_chain->ledger, l_token_emission, l_datum->header.data_size); + dap_chain_ledger_token_emission_load(a_chain->ledger, l_token_emission, l_datum->header.data_size); }break; case DAP_CHAIN_DATUM_TX:{ dap_chain_datum_tx_t *l_tx = (dap_chain_datum_tx_t*) l_datum->data; @@ -335,7 +332,7 @@ static dap_chain_atom_verify_res_t s_chain_callback_atom_add(dap_chain_t * a_cha //if(dap_chain_datum_tx_get_size(l_tx) == l_datum->header.data_size){ // don't save bad transactions to base - if(dap_chain_ledger_tx_add(a_chain->ledger, l_tx) != 1) + if(dap_chain_ledger_tx_load(a_chain->ledger, l_tx) != 1) return ATOM_REJECT; //}else // return -2; diff --git a/modules/consensus/none/include/dap_chain_cs_none.h b/modules/consensus/none/include/dap_chain_cs_none.h index e40ae2184e63b48f74f699755053ed77c5420eb5..adf5cd6f0082c2239fd221d6c976fc6e74d60f31 100644 --- a/modules/consensus/none/include/dap_chain_cs_none.h +++ b/modules/consensus/none/include/dap_chain_cs_none.h @@ -39,4 +39,4 @@ int dap_chain_gdb_init(void); int dap_chain_gdb_new(dap_chain_t * a_chain, dap_config_t * a_chain_cfg); void dap_chain_gdb_delete(dap_chain_t * a_chain); const char* dap_chain_gdb_get_group(dap_chain_t * a_chain); - +int dap_chain_gdb_ledger_load(char *a_gdb_group, dap_chain_t *a_chain); diff --git a/modules/net/CMakeLists.txt b/modules/net/CMakeLists.txt index e5d055725660d5a26b7aa656d5037c7616204d48..104d304db2ca5a35ca2cb8e215b77affb1e62b30 100644 --- a/modules/net/CMakeLists.txt +++ b/modules/net/CMakeLists.txt @@ -40,12 +40,12 @@ add_library(${PROJECT_NAME} STATIC ${DAP_CHAIN_NET_SRCS} ${DAP_CHAIN_NET_HEADERS if(WIN32) target_link_libraries(dap_chain_net dap_core dap_crypto dap_client dap_stream_ch_chain dap_stream_ch_chain_net dap_stream_ch_chain_net_srv dap_chain dap_chain_crypto dap_chain_wallet dap_chain_net_srv - dap_chain_mempool dap_chain_global_db dap_chain_net_srv_stake) + dap_chain_mempool dap_chain_global_db dap_chain_net_srv_stake dap_chain_cs_none) endif() if(UNIX) target_link_libraries(${PROJECT_NAME} dap_core dap_crypto dap_client dap_stream_ch_chain dap_stream_ch_chain_net dap_stream_ch_chain_net_srv dap_chain - dap_chain_wallet dap_chain_net_srv dap_chain_mempool dap_chain_global_db dap_chain_net_srv_stake + dap_chain_wallet dap_chain_net_srv dap_chain_mempool dap_chain_global_db dap_chain_net_srv_stake dap_chain_cs_none resolv ) endif() diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index 586c36a59f959d3d6797e5a86edfa1384832c7de..4a213435a8b803ec3bcc030c07959c474f568a1c 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -38,6 +38,7 @@ #ifdef DAP_OS_UNIX #include <sys/types.h> #include <sys/socket.h> +#include <arpa/inet.h> #include <netdb.h> #endif @@ -66,10 +67,12 @@ #include "dap_enc_http.h" #include "dap_chain_common.h" #include "dap_chain_net.h" +#include "dap_chain_pvt.h" #include "dap_chain_node_client.h" #include "dap_chain_node_cli.h" #include "dap_chain_node_cli_cmd.h" #include "dap_chain_ledger.h" +#include "dap_chain_cs_none.h" #include "dap_chain_global_db.h" #include "dap_chain_global_db_remote.h" @@ -422,9 +425,9 @@ static int s_net_states_proc(dap_chain_net_t * l_net) dap_chain_node_info_t *l_remote_node_info = dap_chain_node_info_read(l_net, l_remote_addr); if(l_remote_node_info) { dap_chain_node_info_t *l_link_node_info = DAP_NEW_Z(dap_chain_node_info_t); - int res = 0; //dap_dns_client_get_addr(l_remote_node_info->hdr.ext_addr_v4.s_addr, l_net->pub.name, l_link_node_info); - memcpy(l_link_node_info, l_remote_node_info, sizeof(dap_chain_node_info_t)); - if (l_link_node_info->hdr.address.uint64 != l_own_addr) { + int l_res = dap_dns_client_get_addr(l_remote_node_info->hdr.ext_addr_v4.s_addr, l_net->pub.name, l_link_node_info); + //memcpy(l_link_node_info, l_remote_node_info, sizeof(dap_chain_node_info_t)); + if (!l_res && l_link_node_info->hdr.address.uint64 != l_own_addr) { l_pvt_net->links_info = dap_list_append(l_pvt_net->links_info, l_link_node_info); } DAP_DELETE(l_remote_node_info); @@ -865,7 +868,8 @@ int dap_chain_net_init() "\tPrint list of authority cetificates from GDB group\n" "net -net <chain net name> ca del -hash <cert hash> [-H hex|base58(default)]\n" "\tDelete certificate from list of authority cetificates in GDB group by it's hash\n" - ); + "net -net <chain net name> ledger reload\n" + "\tPurge the cache of chain net ledger and recalculate it from chain file\n" ); s_seed_mode = dap_config_get_item_bool_default(g_config,"general","seed_mode",false); dap_chain_global_db_add_history_group_prefix("global", GROUP_LOCAL_HISTORY); @@ -1004,12 +1008,14 @@ static int s_cli_net( int argc, char **argv, void *arg_func, char **a_str_reply) const char *l_get_str = NULL; const char *l_stats_str = NULL; const char *l_ca_str = NULL; + const char *l_ledger_str = NULL; dap_chain_node_cli_find_option_val(argv, arg_index, argc, "sync", &l_sync_str); dap_chain_node_cli_find_option_val(argv, arg_index, argc, "link", &l_links_str); dap_chain_node_cli_find_option_val(argv, arg_index, argc, "go", &l_go_str); dap_chain_node_cli_find_option_val(argv, arg_index, argc, "get", &l_get_str); dap_chain_node_cli_find_option_val(argv, arg_index, argc, "stats", &l_stats_str); dap_chain_node_cli_find_option_val(argv, arg_index, argc, "ca", &l_ca_str); + dap_chain_node_cli_find_option_val(argv, arg_index, argc, "ledger", &l_ledger_str); if ( l_stats_str ){ if ( strcmp(l_stats_str,"tx") == 0 ) { @@ -1245,6 +1251,19 @@ static int s_cli_net( int argc, char **argv, void *arg_func, char **a_str_reply) "Subcommand \"ca\" requires one of parameter: add, list, del\n"); ret = -5; } + } else if (l_ledger_str && !strcmp(l_ledger_str, "reload")) { + dap_chain_ledger_purge(l_net->pub.ledger); + dap_chain_t *l_chain; + DL_FOREACH(l_net->pub.chains, l_chain) { + if (l_chain->callback_purge) { + l_chain->callback_purge(l_chain); + } + if (!strcmp(DAP_CHAIN_PVT(l_chain)->cs_name, "none")) { + dap_chain_gdb_ledger_load((char *)dap_chain_gdb_get_group(l_chain), l_chain); + } else { + dap_chain_load_all(l_chain); + } + } } else { dap_chain_node_cli_set_reply_text(a_str_reply,"Command requires one of subcomand: sync, links\n"); ret = -1; @@ -1381,7 +1400,7 @@ int s_net_load(const char * a_net_name, uint16_t a_acl_idx) l_ledger_flags |= DAP_CHAIN_LEDGER_CHECK_LOCAL_DS; } // init LEDGER model - l_net->pub.ledger = dap_chain_ledger_create(l_ledger_flags); + l_net->pub.ledger = dap_chain_ledger_create(l_ledger_flags, l_net->pub.name); // Check if seed nodes are present in local db alias char **l_seed_aliases = dap_config_get_array_str( l_cfg , "general" ,"seed_nodes_aliases" ,&PVT(l_net)->seed_aliases_count); diff --git a/modules/net/dap_chain_node_cli.c b/modules/net/dap_chain_node_cli.c index 82f0a7db27a58ed7e6c93148f72366970273eafd..4628c7a4d1aa98078fdfd1d9f79ed1f812af44ef 100644 --- a/modules/net/dap_chain_node_cli.c +++ b/modules/net/dap_chain_node_cli.c @@ -40,6 +40,7 @@ //#include <sys/select.h> #include <netinet/in.h> #include <sys/un.h> +#include <sys/stat.h> //#define closesocket close //typedef int SOCKET; //#define SOCKET_ERROR -1 // for win32 = (-1) diff --git a/modules/net/dap_chain_node_client.c b/modules/net/dap_chain_node_client.c index ce8848a5b700b483664c4b38b216606b78721546..a95864255192283925b47abae97944b9341fa02e 100644 --- a/modules/net/dap_chain_node_client.c +++ b/modules/net/dap_chain_node_client.c @@ -38,6 +38,10 @@ #include <ws2tcpip.h> #include <io.h> #include <pthread.h> +#else +#include <sys/types.h> +#include <sys/socket.h> +#include <arpa/inet.h> #endif #include "dap_common.h" diff --git a/modules/net/dap_chain_node_ping.c b/modules/net/dap_chain_node_ping.c index 67ab55b30619a724a521050cb7a782deaebb7e17..e21b1bc78389b5698eec6d4bb9d3f47d5045c103 100644 --- a/modules/net/dap_chain_node_ping.c +++ b/modules/net/dap_chain_node_ping.c @@ -59,6 +59,9 @@ #include <wepoll.h> #else #include <signal.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <arpa/inet.h> #endif #include <pthread.h> diff --git a/modules/net/dap_dns_server.c b/modules/net/dap_dns_server.c index 8c6907d31e1d1cfe2dfce67005a5db4309669fc7..572ad8ffde30b4000fde5b0679c7b23b6c353157 100644 --- a/modules/net/dap_dns_server.c +++ b/modules/net/dap_dns_server.c @@ -24,8 +24,6 @@ #include <errno.h> #include "dap_dns_server.h" -#include "dap_udp_server.h" -#include "dap_udp_client.h" #include "dap_events_socket.h" #include "dap_common.h" #include "dap_chain_net.h" @@ -38,6 +36,9 @@ #ifndef _WIN32 #include <unistd.h> // for close +#include <sys/types.h> +#include <sys/socket.h> +#include <arpa/inet.h> #define closesocket close #define INVALID_SOCKET -1 #endif @@ -191,16 +192,16 @@ dap_dns_zone_callback_t dap_dns_zone_find(char *hostname) { * @param arg Unused * @return none */ -void dap_dns_client_read(dap_events_socket_t *client, void * arg) { - UNUSED(arg); - if (client->buf_in_size < DNS_HEADER_SIZE) { // Bad request +void dap_dns_client_read(dap_events_socket_t *a_es, void *a_arg) { + UNUSED(a_arg); + if (a_es->buf_in_size < DNS_HEADER_SIZE) { // Bad request return; } dap_dns_buf_t *dns_message = DAP_NEW(dap_dns_buf_t); dap_dns_buf_t *dns_reply = DAP_NEW(dap_dns_buf_t); - dns_message->data = DAP_NEW_SIZE(char, client->buf_in_size + 1); - dns_message->data[client->buf_in_size] = 0; - dap_events_socket_pop_from_buf_in(client, dns_message->data, client->buf_in_size); + dns_message->data = DAP_NEW_SIZE(char, a_es->buf_in_size + 1); + dns_message->data[a_es->buf_in_size] = 0; + dap_events_socket_pop_from_buf_in(a_es, dns_message->data, a_es->buf_in_size); dns_message->ptr = 0; // Parse incoming DNS message @@ -332,8 +333,7 @@ void dap_dns_client_read(dap_events_socket_t *client, void * arg) { dns_reply->data[2] = msg_flags.val >> 8; dns_reply->data[3] = msg_flags.val; // Send DNS reply - dap_events_socket_write_unsafe( client, dns_reply->data, dns_reply->ptr); - dap_events_socket_set_writable_unsafe( client, true); + dap_events_socket_write_unsafe(a_es, dns_reply->data, dns_reply->ptr); dap_string_free(dns_hostname, true); cleanup: DAP_DELETE(dns_reply->data); @@ -343,20 +343,16 @@ cleanup: return; } -void dap_dns_server_start() { - s_dns_server = DAP_NEW(dap_dns_server_t); - s_dns_server->hash_table = NULL; - s_dns_server->instance = dap_udp_server_listen(DNS_LISTEN_PORT); +void dap_dns_server_start(dap_events_t *a_ev) { + s_dns_server = DAP_NEW_Z(dap_dns_server_t); + dap_events_socket_callbacks_t l_cb = {}; + l_cb.read_callback = dap_dns_client_read; + s_dns_server->instance = dap_server_new(a_ev, NULL, DNS_LISTEN_PORT, DAP_SERVER_UDP, &l_cb); if (!s_dns_server->instance) { log_it(L_ERROR, "Can't start DNS server"); return; } - s_dns_server->instance->client_callbacks.read_callback = dap_dns_client_read; - s_dns_server->instance->client_callbacks.write_callback = NULL; - s_dns_server->instance->client_callbacks.new_callback = NULL; - s_dns_server->instance->client_callbacks.delete_callback = NULL; dap_dns_zone_register(&s_root_alias[0], dap_dns_resolve_hostname); // root resolver - pthread_create(&s_dns_server->udp_thread, NULL, (void *)dap_udp_server_loop, s_dns_server->instance); } void dap_dns_server_stop() { @@ -369,8 +365,7 @@ void dap_dns_server_stop() { DAP_DELETE(current_zone->zone); DAP_DELETE(current_zone); } - // TODO add code to stop udp_thread - dap_udp_server_delete(s_dns_server->instance); + dap_server_delete(s_dns_server->instance); DAP_DELETE(s_dns_server); } diff --git a/modules/net/include/dap_dns_server.h b/modules/net/include/dap_dns_server.h index e36022f8bf2bf4d08d774075ba41a2f1b24540e4..75cdec7feb2e2d36126a9a81c23df1914def441c 100644 --- a/modules/net/include/dap_dns_server.h +++ b/modules/net/include/dap_dns_server.h @@ -116,11 +116,10 @@ typedef struct _dap_dns_zone_hash_t { typedef struct _dap_dns_server_t { dap_server_t *instance; - pthread_t udp_thread; dap_dns_zone_hash_t *hash_table; } dap_dns_server_t; -void dap_dns_server_start(); +void dap_dns_server_start(dap_events_t *a_ev); void dap_dns_server_stop(); int dap_dns_zone_register(char *zone, dap_dns_zone_callback_t callback); int dap_dns_zone_unregister(char *zone); diff --git a/modules/net/srv/dap_chain_net_srv_order.c b/modules/net/srv/dap_chain_net_srv_order.c index 8aba700a30215b038b580b8d0ee49744bd6684e3..869461dc4f5340562c2a40ee97251d6fa451f224 100644 --- a/modules/net/srv/dap_chain_net_srv_order.c +++ b/modules/net/srv/dap_chain_net_srv_order.c @@ -524,17 +524,17 @@ static void s_srv_order_callback_notify(void *a_arg, const char a_op_code, const if (!strcmp(a_group, l_gdb_group_str)) { dap_chain_net_srv_order_t *l_order = (dap_chain_net_srv_order_t *)a_value; if (l_order->version == 1) { - dap_chain_global_db_gr_del((char *)a_key, a_group); + dap_chain_global_db_gr_del(dap_strdup(a_key), a_group); } else { dap_sign_t *l_sign = (dap_sign_t *)&l_order->ext[l_order->ext_size]; if (!dap_sign_verify(l_sign, l_order, sizeof(dap_chain_net_srv_order_t) + l_order->ext_size)) { - dap_chain_global_db_gr_del((char *)a_key, a_group); + dap_chain_global_db_gr_del(dap_strdup(a_key), a_group); DAP_DELETE(l_gdb_group_str); return; } dap_chain_hash_fast_t l_pkey_hash; if (!dap_sign_get_pkey_hash(l_sign, &l_pkey_hash)) { - dap_chain_global_db_gr_del((char *)a_key, a_group); + dap_chain_global_db_gr_del(dap_strdup(a_key), a_group); DAP_DELETE(l_gdb_group_str); return; } @@ -542,7 +542,7 @@ static void s_srv_order_callback_notify(void *a_arg, const char a_op_code, const dap_chain_addr_fill(&l_addr, l_sign->header.type, &l_pkey_hash, l_net->pub.id); uint64_t l_solvency = dap_chain_ledger_calc_balance(l_net->pub.ledger, &l_addr, l_order->price_ticker); if (l_solvency < l_order->price && !dap_chain_net_srv_stake_key_delegated(&l_addr)) { - dap_chain_global_db_gr_del((char *)a_key, a_group); + dap_chain_global_db_gr_del(dap_strdup(a_key), a_group); } } DAP_DELETE(l_gdb_group_str); diff --git a/modules/service/vpn/dap_chain_net_vpn_client.c b/modules/service/vpn/dap_chain_net_vpn_client.c index 35684d2e56e6c2fd1bc18079a7fb36d0a470e75f..3ad83911090d69415b2a787e7e79098a3d5d5e21 100644 --- a/modules/service/vpn/dap_chain_net_vpn_client.c +++ b/modules/service/vpn/dap_chain_net_vpn_client.c @@ -35,6 +35,9 @@ #include <sys/ioctl.h> #include <sys/time.h> #include <sys/epoll.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <arpa/inet.h> #include "dap_common.h" #include "dap_config.h" #include "dap_strfuncs.h" diff --git a/modules/service/xchange/dap_chain_net_srv_xchange.c b/modules/service/xchange/dap_chain_net_srv_xchange.c index b7bd5071492e047932395300fca186099c49d41c..6336a366903b8bb6b989257ab085598fd4846351 100644 --- a/modules/service/xchange/dap_chain_net_srv_xchange.c +++ b/modules/service/xchange/dap_chain_net_srv_xchange.c @@ -653,7 +653,6 @@ static int s_cli_srv_xchange_price(int a_argc, char **a_argv, int a_arg_index, c } DAP_DELETE(l_order_hash_str); DAP_DELETE(l_price->wallet_str); - DAP_DELETE(l_price->key_ptr); DAP_DELETE(l_price); if (!l_str_reply->len) { dap_string_append(l_str_reply, "Price successfully removed"); @@ -715,7 +714,7 @@ static int s_cli_srv_xchange_price(int a_argc, char **a_argv, int a_arg_index, c return -14; } HASH_DEL(s_srv_xchange->pricelist, l_price); - dap_chain_global_db_gr_del(l_price->key_ptr, GROUP_LOCAL_XCHANGE); + dap_chain_global_db_gr_del(dap_strdup(l_price->key_ptr), GROUP_LOCAL_XCHANGE); bool l_ret = s_xchage_tx_invalidate(l_price, l_wallet); // may be changed to old price later dap_chain_wallet_close(l_wallet); if (!l_ret) { diff --git a/modules/type/dag/dap_chain_cs_dag.c b/modules/type/dag/dap_chain_cs_dag.c index b0c515c239acfe530c2a96af1e4811537115ef46..121cac6f470b81fe3e93f05a95ec2e5fbf7ea3f7 100644 --- a/modules/type/dag/dap_chain_cs_dag.c +++ b/modules/type/dag/dap_chain_cs_dag.c @@ -78,6 +78,8 @@ typedef struct dap_chain_cs_dag_pvt { #define PVT(a) ((dap_chain_cs_dag_pvt_t *) a->_pvt ) +static void s_dap_chain_cs_dag_purge(dap_chain_t *a_chain); + // Atomic element organization callbacks static dap_chain_atom_verify_res_t s_chain_callback_atom_add(dap_chain_t * a_chain, dap_chain_atom_ptr_t , size_t); // Accept new event in dag static dap_chain_atom_verify_res_t s_chain_callback_atom_verify(dap_chain_t * a_chain, dap_chain_atom_ptr_t , size_t); // Verify new event in dag @@ -168,6 +170,7 @@ int dap_chain_cs_dag_new(dap_chain_t * a_chain, dap_config_t * a_chain_cfg) pthread_rwlock_init(& PVT(l_dag)->events_rwlock,NULL); a_chain->callback_delete = dap_chain_cs_dag_delete; + a_chain->callback_purge = s_dap_chain_cs_dag_purge; // Atom element callbacks a_chain->callback_atom_add = s_chain_callback_atom_add ; // Accept new element in chain @@ -227,6 +230,34 @@ int dap_chain_cs_dag_new(dap_chain_t * a_chain, dap_config_t * a_chain_cfg) return 0; } +static void s_dap_chain_cs_dag_purge(dap_chain_t *a_chain) +{ + dap_chain_cs_dag_pvt_t *l_dag_pvt = PVT(DAP_CHAIN_CS_DAG(a_chain)); + pthread_rwlock_wrlock(&l_dag_pvt->events_rwlock); + dap_chain_cs_dag_event_item_t *l_event_current, *l_event_tmp; + HASH_ITER(hh, l_dag_pvt->events, l_event_current, l_event_tmp) { + HASH_DEL(l_dag_pvt->events, l_event_current); + DAP_DELETE(l_event_current); + } + HASH_ITER(hh, l_dag_pvt->events_lasts_unlinked, l_event_current, l_event_tmp) { + HASH_DEL(l_dag_pvt->events_lasts_unlinked, l_event_current); + DAP_DELETE(l_event_current); + } + HASH_ITER(hh, l_dag_pvt->events_treshold, l_event_current, l_event_tmp) { + HASH_DEL(l_dag_pvt->events_treshold, l_event_current); + DAP_DELETE(l_event_current); + } + HASH_ITER(hh, l_dag_pvt->events_treshold_conflicted, l_event_current, l_event_tmp) { + HASH_DEL(l_dag_pvt->events_treshold_conflicted, l_event_current); + DAP_DELETE(l_event_current); + } + HASH_ITER(hh, l_dag_pvt->tx_events, l_event_current, l_event_tmp) { + HASH_DEL(l_dag_pvt->tx_events, l_event_current); + DAP_DELETE(l_event_current); + } + pthread_rwlock_unlock(&l_dag_pvt->events_rwlock); +} + /** * @brief dap_chain_cs_dag_delete * @param a_dag @@ -234,6 +265,7 @@ int dap_chain_cs_dag_new(dap_chain_t * a_chain, dap_config_t * a_chain_cfg) */ void dap_chain_cs_dag_delete(dap_chain_t * a_chain) { + s_dap_chain_cs_dag_purge(a_chain); dap_chain_cs_dag_t * l_dag = DAP_CHAIN_CS_DAG ( a_chain ); pthread_rwlock_destroy(& PVT(l_dag)->events_rwlock); @@ -251,12 +283,12 @@ static int s_dap_chain_add_atom_to_ledger(dap_chain_cs_dag_t * a_dag, dap_ledger switch (l_datum->header.type_id) { case DAP_CHAIN_DATUM_TOKEN_DECL: { dap_chain_datum_token_t *l_token = (dap_chain_datum_token_t*) l_datum->data; - return dap_chain_ledger_token_add(a_ledger, l_token, l_datum->header.data_size); + return dap_chain_ledger_token_load(a_ledger, l_token, l_datum->header.data_size); } break; case DAP_CHAIN_DATUM_TOKEN_EMISSION: { dap_chain_datum_token_emission_t *l_token_emission = (dap_chain_datum_token_emission_t*) l_datum->data; - return dap_chain_ledger_token_emission_add(a_ledger, l_token_emission, l_datum->header.data_size); + return dap_chain_ledger_token_emission_load(a_ledger, l_token_emission, l_datum->header.data_size); } break; case DAP_CHAIN_DATUM_TX: { @@ -270,7 +302,7 @@ static int s_dap_chain_add_atom_to_ledger(dap_chain_cs_dag_t * a_dag, dap_ledger HASH_ADD(hh,PVT(a_dag)->tx_events,hash,sizeof (l_tx_event->hash),l_tx_event); // don't save bad transactions to base - if(dap_chain_ledger_tx_add(a_ledger, l_tx) != 1) { + if(dap_chain_ledger_tx_load(a_ledger, l_tx) != 1) { return -1; } }