diff --git a/libdap-server-core b/libdap-server-core deleted file mode 160000 index 1ae468c517c5f3dc1fd1b604ba4c88040e0f7cbe..0000000000000000000000000000000000000000 --- a/libdap-server-core +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 1ae468c517c5f3dc1fd1b604ba4c88040e0f7cbe diff --git a/libdap-server-core/.gitignore b/libdap-server-core/.gitignore new file mode 100755 index 0000000000000000000000000000000000000000..ea462b27ee244839d596fea3b43f21e69e915b2b --- /dev/null +++ b/libdap-server-core/.gitignore @@ -0,0 +1,3 @@ +build +*.txt.user + diff --git a/libdap-server-core/.gitmodules b/libdap-server-core/.gitmodules new file mode 100755 index 0000000000000000000000000000000000000000..eb9891674f6f70bdec3b61c156df9c67b62ba479 --- /dev/null +++ b/libdap-server-core/.gitmodules @@ -0,0 +1,14 @@ +[submodule "libdap"] + path = libdap + url = https://gitlab.demlabs.net/cellframe/libdap + branch = master + +[submodule "libdap-crypto"] + path = libdap-crypto + url = https://gitlab.demlabs.net/cellframe/libdap-crypto + branch = master + +[submodule "test/libdap-test"] + path = test/libdap-test + url = https://gitlab.demlabs.net/cellframe/libdap-test + branch = master diff --git a/libdap-server-core/.travis.yml b/libdap-server-core/.travis.yml new file mode 100755 index 0000000000000000000000000000000000000000..c5aba3c2c639d08a8bc6f07069cad3e2b9b667a7 --- /dev/null +++ b/libdap-server-core/.travis.yml @@ -0,0 +1,24 @@ +language: c +compiler: gcc +dist: xenial +notifications: + email: false + +before_install: + - git submodule init + - git submodule update + +script: + - mkdir build + - cd build + - cmake -DBUILD_DAP_SERVER_CORE_TESTS=ON ../ + - make + - ctest --verbose + +addons: + apt: + sources: + - ubuntu-toolchain-r-test + packages: + - libev-dev + - libmemcached-dev diff --git a/libdap-server-core/CMakeLists.txt b/libdap-server-core/CMakeLists.txt new file mode 100755 index 0000000000000000000000000000000000000000..fa70477e070998bd926da9c0365694fa2cc07485 --- /dev/null +++ b/libdap-server-core/CMakeLists.txt @@ -0,0 +1,57 @@ +cmake_minimum_required(VERSION 3.0) + +project (dap_server_core C) +set(CMAKE_C_STANDARD 11) + +add_definitions ("-D_GNU_SOURCE") + +if(NOT SUBMODULES_NO_BUILD) + if ( NOT ( TARGET dap_core ) ) + add_subdirectory(libdap) + endif() + + if ( NOT ( TARGET dap_crypto ) ) + add_subdirectory(libdap-crypto) + endif() + +endif() + +if(WIN32) + file(GLOB DAP_SERVER_CORE_SOURCES src/*.c ../3rdparty/wepoll/*.c) + file(GLOB DAP_SERVER_CORE_HEADERS include/*.h ../3rdparty/wepoll/*.h) +else() + file(GLOB DAP_SERVER_CORE_SOURCES src/*.c) + file(GLOB DAP_SERVER_CORE_HEADERS include/*.h) +endif() + +if(WIN32) + include_directories(../3rdparty/uthash/src/) + #include_directories(../3rdparty/curl/include/) + include_directories(../3rdparty/wepoll/) +endif() + +add_library(${PROJECT_NAME} STATIC ${DAP_SERVER_CORE_HEADERS} ${DAP_SERVER_CORE_SOURCES}) + +if(WIN32) + target_link_libraries(${PROJECT_NAME} dap_core dap_crypto) + target_include_directories(${PROJECT_NAME} INTERFACE ../3rdparty/wepoll) +endif() + +if(UNIX) + target_link_libraries(${PROJECT_NAME} dap_core dap_crypto ev) + + if(NOT ANDROID) + target_link_libraries(${PROJECT_NAME} pthread) + endif() + +endif() + +target_include_directories(${PROJECT_NAME} PUBLIC include) +target_include_directories(${PROJECT_NAME} PRIVATE src) + + +if (${BUILD_DAP_SERVER_CORE_TESTS} MATCHES ON) + enable_testing() + add_subdirectory(test) +endif() + diff --git a/libdap-server-core/README.md b/libdap-server-core/README.md new file mode 100755 index 0000000000000000000000000000000000000000..6195103d43d2d68374b1decf37a0ef3f688c6c94 --- /dev/null +++ b/libdap-server-core/README.md @@ -0,0 +1,3 @@ +# libdap-server-core + +[](https://travis-ci.com/kelvinblockchain/libdap-server-core) diff --git a/libdap-server-core/include/dap_client_remote.h b/libdap-server-core/include/dap_client_remote.h new file mode 100755 index 0000000000000000000000000000000000000000..00a70653d7f63c2d836c17f9586b6b6f6a61eab6 --- /dev/null +++ b/libdap-server-core/include/dap_client_remote.h @@ -0,0 +1,132 @@ +/* + * Authors: + * Dmitriy A. Gearasimov <gerasimov.dmitriy@demlabs.net> + * DeM Labs Inc. https://demlabs.net + * Kelvin Project https://github.com/kelvinblockchain + * Copyright (c) 2017-2018 + * All rights reserved. + + This file is part of DAP (Deus Applications Prototypes) the open source project + + DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + DAP is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. +*/ +#pragma once + +#include <stdint.h> +#include <stddef.h> +#include <stdbool.h> +#include "uthash.h" + +#ifndef WIN32 +#include <sys/epoll.h> +#include <sys/timerfd.h> +#endif + +#ifndef EPOLL_HANDLE + #ifndef WIN32 + #define EPOLL_HANDLE int + #else + #define EPOLL_HANDLE HANDLE + #include "wepoll.h" + #endif +#endif + +typedef char str_ip[16]; + +typedef struct dap_server dap_server_t; +typedef struct dap_server_thread_s dap_server_thread_t; + +struct dap_client_remote; + +typedef void (*dap_server_client_callback_t) (struct dap_client_remote *,void * arg); // Callback for specific client operations + +#define DAP_CLIENT_REMOTE_BUF 500000 +#define CLIENT_ID_SIZE 12 + +typedef char dap_server_client_id[CLIENT_ID_SIZE]; + +typedef struct traffic_stats { + + size_t buf_size_total; + size_t buf_size_total_old; // for calculate speed + double speed_mbs; // MegaBits per second +} traffic_stats_t; + +typedef struct dap_client_remote { + + int socket; + dap_server_client_id id; + + uint32_t flags; + bool no_close; + bool kill_signal; + + uint16_t port; + str_ip s_ip; + + uint32_t buf_out_zero_count; + char buf_in[DAP_CLIENT_REMOTE_BUF]; // Internal buffer for input data + + size_t buf_in_size; // size of data that is in the input buffer + + traffic_stats_t upload_stat; + traffic_stats_t download_stat; + + char buf_out[DAP_CLIENT_REMOTE_BUF]; // Internal buffer for output data + size_t buf_out_offset; + + char hostaddr[1024]; // Address + char service[128]; + + size_t buf_out_size; // size of data that is in the output buffer + struct epoll_event pevent; + + EPOLL_HANDLE efd; // Epoll fd + int tn; // working thread index + + time_t time_connection; + time_t last_time_active; + + struct dap_server *server; + dap_server_thread_t *thread; + + UT_hash_handle hh; + struct dap_client_remote *next, *prev; + struct dap_client_remote *knext, *kprev; + + void *_internal; + void *_inheritor; // Internal data to specific client type, usualy states for state machine + +} dap_client_remote_t; // Node of bidirectional list of clients + +int dap_client_remote_init( void ); // Init clients module +void dap_client_remote_deinit( void ); // Deinit clients module + +dap_client_remote_t *dap_client_remote_create( dap_server_t *sh, int s, dap_server_thread_t *dsth ); +dap_client_remote_t *dap_client_remote_find( int sock, dap_server_thread_t *t ); + +bool dap_client_remote_is_ready_to_read( dap_client_remote_t * sc ); +bool dap_client_remote_is_ready_to_write( dap_client_remote_t * sc ); +void dap_client_remote_ready_to_read( dap_client_remote_t * sc,bool is_ready ); +void dap_client_remote_ready_to_write( dap_client_remote_t * sc,bool is_ready ); + +size_t dap_client_remote_write( dap_client_remote_t *sc, const void * data, size_t data_size ); +size_t dap_client_remote_write_f( dap_client_remote_t *a_client, const char * a_format,... ); +size_t dap_client_remote_read( dap_client_remote_t *sc, void * data, size_t data_size ); + +//void dap_client_remote_remove( dap_client_remote_t *sc, struct dap_server * sh ); // Removes the client from the list +void dap_client_remote_remove( dap_client_remote_t *sc ); + +void dap_client_remote_shrink_buf_in( dap_client_remote_t * cl, size_t shrink_size ); + diff --git a/libdap-server-core/include/dap_events.h b/libdap-server-core/include/dap_events.h new file mode 100755 index 0000000000000000000000000000000000000000..8f6f407d0bbbfb5c6566d5065ecda214d1ab0bf7 --- /dev/null +++ b/libdap-server-core/include/dap_events.h @@ -0,0 +1,90 @@ +/* + * Authors: + * Dmitriy A. Gearasimov <gerasimov.dmitriy@demlabs.net> + * DeM Labs Inc. https://demlabs.net + * Kelvin Project https://github.com/kelvinblockchain + * Copyright (c) 2017-2019 + * All rights reserved. + + This file is part of DAP (Deus Applications Prototypes) the open source project + + DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + DAP is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. +*/ +#pragma once + +#ifndef WIN32 +#include <netinet/in.h> + +#include <stdint.h> +#include <pthread.h> +#define EPOLL_HANDLE int +#else +#define EPOLL_HANDLE HANDLE +#endif + +#include "uthash.h" +#include "dap_events_socket.h" +#include "dap_server.h" + +struct dap_events; + +typedef void (*dap_events_callback_t) (struct dap_events *, void *arg); // Callback for specific server's operations + +typedef struct dap_thread { + pthread_t tid; +} dap_thread_t; + +struct dap_worker; + +typedef struct dap_events { + + dap_events_socket_t *sockets; // Hashmap of event sockets + dap_events_socket_t *dlsockets; // Dlist of event sockets + dap_events_socket_t *to_kill_sockets; // Dlist of event sockets + + pthread_rwlock_t sockets_rwlock; + void *_inheritor; // Pointer to the internal data, HTTP for example + dap_thread_t proc_thread; + pthread_rwlock_t servers_rwlock; + +} dap_events_t; + +typedef struct dap_worker +{ + uint32_t event_sockets_count; + //uint32_t event_to_kill_count; + + EPOLL_HANDLE epoll_fd; + uint32_t number_thread; + pthread_mutex_t locker_on_count; + dap_events_t *events; + +} dap_worker_t; + +int32_t dap_events_init( uint32_t a_threads_count, size_t conn_t ); // Init server module +void dap_events_deinit( ); // Deinit server module + +void dap_events_thread_wake_up( dap_thread_t *th ); +dap_events_t* dap_events_new( ); +void dap_events_delete( dap_events_t * sh ); + +int32_t dap_events_start( dap_events_t *sh ); +int32_t dap_events_wait( dap_events_t *sh ); + +uint32_t dap_worker_get_index_min( ); +dap_worker_t *dap_worker_get_min( ); + +void dap_worker_add_events_socket( dap_events_socket_t * a_events_socket ); +void dap_worker_print_all( ); + diff --git a/libdap-server-core/include/dap_events_socket.h b/libdap-server-core/include/dap_events_socket.h new file mode 100755 index 0000000000000000000000000000000000000000..deb34092c49ab0c262b468c5e69713cf66565f81 --- /dev/null +++ b/libdap-server-core/include/dap_events_socket.h @@ -0,0 +1,173 @@ +/* + * Authors: + * Dmitriy A. Gearasimov <gerasimov.dmitriy@demlabs.net> + * DeM Labs Inc. https://demlabs.net + * Kelvin Project https://github.com/kelvinblockchain + * Copyright (c) 2017-2019 + * All rights reserved. + + This file is part of DAP (Deus Applications Prototypes) the open source project + + DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + DAP is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. +*/ +#pragma once + +#include <stdint.h> +#include <stddef.h> +#include <stdbool.h> +#include "uthash.h" +#ifndef _WIN32 +#include <sys/epoll.h> +#include <pthread.h> +#else +#include "wepoll.h" +#endif + +struct dap_events; +struct dap_events_socket; +struct dap_worker; + +typedef struct dap_server dap_server_t; +typedef void (*dap_events_socket_callback_t) (struct dap_events_socket *,void * arg); // Callback for specific client operations + +typedef struct dap_events_socket_callbacks { + + dap_events_socket_callback_t new_callback; // Create new client callback + dap_events_socket_callback_t delete_callback; // Delete client callback + dap_events_socket_callback_t read_callback; // Read function + dap_events_socket_callback_t write_callback; // Write function + dap_events_socket_callback_t error_callback; // Error processing function + +} dap_events_socket_callbacks_t; + +#define DAP_EVENTS_SOCKET_BUF 100000 + +#if 0 +typedef struct dap_events_socket { + + int socket; + bool signal_close; + + bool _ready_to_write; + bool _ready_to_read; + + uint32_t buf_out_zero_count; + union{ + uint8_t buf_in[DAP_EVENTS_SOCKET_BUF+1]; // Internal buffer for input data + char buf_in_str[DAP_EVENTS_SOCKET_BUF+1]; + }; + size_t buf_in_size; // size of data that is in the input buffer + + uint8_t buf_out[DAP_EVENTS_SOCKET_BUF+1]; // Internal buffer for output data + + char hostaddr[1024]; // Address + char service[128]; + + size_t buf_out_size; // size of data that is in the output buffer + + struct dap_events *events; + + struct dap_worker *dap_worker; + dap_events_socket_callbacks_t *callbacks; + + time_t time_connection; + time_t last_ping_request; + bool is_pingable; + + UT_hash_handle hh; + + void * _inheritor; // Inheritor data to specific client type, usualy states for state machine +} dap_events_socket_t; // Node of bidirectional list of clients +#endif + +typedef enum { + DESCRIPTOR_TYPE_SOCKET = 0, + DESCRIPTOR_TYPE_FILE +} dap_events_desc_type_t; + +typedef struct dap_events_socket { + + int32_t socket; + dap_events_desc_type_t type; + + uint32_t flags; +// bool signal_close; + bool no_close; + bool kill_signal; +// bool _ready_to_write; +// bool _ready_to_read; + + uint32_t buf_out_zero_count; + union{ + uint8_t buf_in[DAP_EVENTS_SOCKET_BUF+1]; // Internal buffer for input data + char buf_in_str[DAP_EVENTS_SOCKET_BUF+1]; + }; + size_t buf_in_size; // size of data that is in the input buffer + + uint8_t buf_out[DAP_EVENTS_SOCKET_BUF+1]; // Internal buffer for output data + + char hostaddr[1024]; // Address + char service[128]; + + size_t buf_out_size; // size of data that is in the output buffer + + struct dap_events *events; + struct dap_worker *dap_worker; + struct epoll_event ev; + + dap_events_socket_callbacks_t *callbacks; + + time_t time_connection; + time_t last_time_active; + time_t last_ping_request; + bool is_pingable; + + UT_hash_handle hh; + struct dap_events_socket *next, *prev; + struct dap_events_socket *knext, *kprev; + + void *_inheritor; // Inheritor data to specific client type, usualy states for state machine + + pthread_mutex_t write_hold; +} dap_events_socket_t; // Node of bidirectional list of clients + +int dap_events_socket_init(); // Init clients module +void dap_events_socket_deinit(); // Deinit clients module + +void dap_events_socket_create_after(dap_events_socket_t * a_es); + +dap_events_socket_t * dap_events_socket_wrap_no_add(struct dap_events * a_events, + int s, dap_events_socket_callbacks_t * a_callbacks); // Create new client and add it to the list + + +dap_events_socket_t * dap_events_socket_find(int sock, struct dap_events * sh); // Find client by socket + +bool dap_events_socket_is_ready_to_read(dap_events_socket_t * sc); +bool dap_events_socket_is_ready_to_write(dap_events_socket_t * sc); +void dap_events_socket_set_readable(dap_events_socket_t * sc,bool is_ready); +void dap_events_socket_set_writable(dap_events_socket_t * sc,bool is_ready); + +size_t dap_events_socket_write(dap_events_socket_t *sc, const void * data, size_t data_size); +size_t dap_events_socket_write_f(dap_events_socket_t *sc, const char * format,...); +size_t dap_events_socket_read(dap_events_socket_t *sc, void * data, size_t data_size); + +void dap_events_socket_remove( dap_events_socket_t *a_es); +void dap_events_socket_delete(dap_events_socket_t *sc,bool preserve_inheritor); // Removes the client from the list +void dap_events_socket_remove_and_delete(dap_events_socket_t* a_es, bool preserve_inheritor ); +int dap_events_socket_kill_socket( dap_events_socket_t *a_es ); + + + +void dap_events_socket_shrink_buf_in(dap_events_socket_t * cl, size_t shrink_size); + diff --git a/libdap-server-core/include/dap_server.h b/libdap-server-core/include/dap_server.h new file mode 100755 index 0000000000000000000000000000000000000000..77f0004e0d1ee306a409e37969f55df2a37c6397 --- /dev/null +++ b/libdap-server-core/include/dap_server.h @@ -0,0 +1,149 @@ +/* + Copyright (c) 2017-2018 (c) Project "DeM Labs Inc" https://github.com/demlabsinc + All rights reserved. + + This file is part of DAP (Deus Applications Prototypes) the open source project + + DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + DAP is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. +*/ +#pragma once + +#ifndef _WIN32 +#include <netinet/in.h> +#include <stdint.h> +#include <sys/epoll.h> +#include <sys/timerfd.h> +#define EPOLL_HANDLE int +#else +#define EPOLL_HANDLE HANDLE +#define MSG_DONTWAIT 0 +#define MSG_NOSIGNAL 0 +#include "winsock.h" +#include "wepoll.h" +#endif + +#include <pthread.h> +#include "uthash.h" +#include "utlist.h" + +#include "dap_cpu_monitor.h" +#include "dap_client_remote.h" + +typedef enum dap_server_type {DAP_SERVER_TCP} dap_server_type_t; + +#define BIT( x ) ( 1 << x ) + +#define DAP_SOCK_READY_TO_READ BIT( 0 ) +#define DAP_SOCK_READY_TO_WRITE BIT( 1 ) +#define DAP_SOCK_SIGNAL_CLOSE BIT( 2 ) +#define DAP_SOCK_ACTIVE BIT( 3 ) + +typedef struct dap_server_thread_s { + + EPOLL_HANDLE epoll_fd; + + uint32_t thread_num; + uint32_t connections_count; + uint32_t to_kill_count; + + struct epoll_event *epoll_events; + dap_client_remote_t *dap_remote_clients; + dap_client_remote_t *hclients; // Hashmap of clients + dap_client_remote_t *dap_clients_to_kill; + + pthread_mutex_t mutex_dlist_add_remove; + pthread_mutex_t mutex_on_hash; + +} dap_server_thread_t; + +struct dap_server; + +typedef void (*dap_server_callback_t)( struct dap_server *,void * arg ); // Callback for specific server's operations + +typedef struct dap_server { + + dap_server_type_t type; // Server's type + uint16_t port; // Listen port + char *address; // Listen address + + int32_t socket_listener; // Socket for listener + EPOLL_HANDLE epoll_fd; // Epoll fd + + struct sockaddr_in listener_addr; // Kernel structure for listener's binded address + + void *_inheritor; // Pointer to the internal data, HTTP for example + + dap_cpu_stats_t cpu_stats; + + dap_server_callback_t server_delete_callback; + + dap_server_client_callback_t client_new_callback; // Create new client callback + dap_server_client_callback_t client_delete_callback; // Delete client callback + dap_server_client_callback_t client_read_callback; // Read function + dap_server_client_callback_t client_write_callback; // Write function + dap_server_client_callback_t client_error_callback; // Error processing function + +} dap_server_t; + +int32_t dap_server_init( uint32_t count_threads ); // Init server module +void dap_server_deinit( void ); // Deinit server module + +dap_server_t *dap_server_listen( const char *addr, uint16_t port, dap_server_type_t type ); + +int32_t dap_server_loop( dap_server_t *d_server ); +void dap_server_loop_stop( void ); + +#define DL_LIST_REMOVE_NODE( head, obj, _prev_, _next_, total ) \ + \ + if ( obj->_next_ ) { \ + \ + if ( obj->_prev_ ) \ + obj->_next_->_prev_ = obj->_prev_; \ + else { \ + \ + obj->_next_->_prev_ = NULL; \ + head = obj->_next_; \ + } \ + } \ + \ + if ( obj->_prev_ ) { \ + \ + if ( obj->_next_ ) \ + obj->_prev_->_next_ = obj->_next_; \ + else { \ + \ + obj->_prev_->_next_ = NULL; \ + } \ + } \ + -- total; + +#define DL_LIST_ADD_NODE_HEAD( head, obj, _prev_, _next_, total )\ + \ + if ( !total ) { \ + \ + obj->_prev_ = NULL; \ + obj->_next_ = NULL; \ + \ + head = obj; \ + } \ + else { \ + \ + head->_prev_ = obj; \ + \ + obj->_prev_ = NULL; \ + obj->_next_ = head; \ + \ + head = obj; \ + } \ + ++ total; diff --git a/libdap-server-core/include/dap_traffic_track.h b/libdap-server-core/include/dap_traffic_track.h new file mode 100755 index 0000000000000000000000000000000000000000..1cfae9d22689621d0c99a746f96d07a9321543a3 --- /dev/null +++ b/libdap-server-core/include/dap_traffic_track.h @@ -0,0 +1,50 @@ +/* + * Authors: + * Anatoliy Jurotich <anatoliy.kurotich@demlabs.net> + * DeM Labs Inc. https://demlabs.net + * Kelvin Project https://github.com/kelvinblockchain + * Copyright (c) 2017-2018 + * All rights reserved. + + This file is part of DAP (Deus Applications Prototypes) the open source project + + DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + DAP is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. +*/ + +#pragma once + +#include "dap_client_remote.h" +#include "dap_server.h" + +typedef void (*dap_traffic_callback_t) (dap_server_t *); + +/** + * @brief dap_traffic_track_init + * @param clients + * @param timeout callback + */ +void dap_traffic_track_init( dap_server_t *server, time_t timeout ); + +/** + * @brief dap_traffic_track_deinit + */ +void dap_traffic_track_deinit( void ); + +/** + * @brief dap_traffic_add_callback + */ +void dap_traffic_callback_set( dap_traffic_callback_t ); + +void dap_traffic_callback_stop( void ); + diff --git a/libdap-server-core/libdap b/libdap-server-core/libdap new file mode 160000 index 0000000000000000000000000000000000000000..6c5dd5a5b0de5573b03ed144c651352467a56101 --- /dev/null +++ b/libdap-server-core/libdap @@ -0,0 +1 @@ +Subproject commit 6c5dd5a5b0de5573b03ed144c651352467a56101 diff --git a/libdap-server-core/libdap-crypto b/libdap-server-core/libdap-crypto new file mode 160000 index 0000000000000000000000000000000000000000..70bfa60b850cdb0a39cbe212439de9e9daf750d1 --- /dev/null +++ b/libdap-server-core/libdap-crypto @@ -0,0 +1 @@ +Subproject commit 70bfa60b850cdb0a39cbe212439de9e9daf750d1 diff --git a/libdap-server-core/src/dap_client_remote.c b/libdap-server-core/src/dap_client_remote.c new file mode 100755 index 0000000000000000000000000000000000000000..a4ac7c9dbe737e8d128260384744dfbb928a3a92 --- /dev/null +++ b/libdap-server-core/src/dap_client_remote.c @@ -0,0 +1,351 @@ +/* + * Authors: + * Dmitriy A. Gearasimov <gerasimov.dmitriy@demlabs.net> + * DeM Labs Inc. https://demlabs.net + * Kelvin Project https://github.com/kelvinblockchain + * Copyright (c) 2017-2018 + * All rights reserved. + + This file is part of DAP (Deus Applications Prototypes) the open source project + + DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + DAP is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. +*/ +#include <stdlib.h> +#include <stdio.h> +#include <string.h> + +#ifndef _WIN32 +#include <unistd.h> +#include <arpa/inet.h> +#include <sys/epoll.h> +#include <sys/timerfd.h> +#else +#include <winsock2.h> +#include <windows.h> +#include <mswsock.h> +#include <ws2tcpip.h> +#include <io.h> +#include <pthread.h> +#endif + +#include "dap_common.h" +#include "dap_server.h" +#include "dap_client_remote.h" + +#define LOG_TAG "dap_client_remote" + +/** + * @brief dap_client_init Init clients module + * @return Zero if ok others if no + */ +int dap_client_remote_init( ) +{ + log_it( L_NOTICE, "Initialized socket client module" ); + + return 0; +} + +/** + * @brief dap_client_deinit Deinit clients module + */ +void dap_client_remote_deinit( ) +{ + + return; +} + +/** + * @brief _save_ip_and_port + * @param cl + */ +void _save_ip_and_port( dap_client_remote_t * cl ) +{ + struct sockaddr_in ip_adr_get; + socklen_t ip_adr_len; + + getpeername( cl->socket, (struct sockaddr * restrict)&ip_adr_get, &ip_adr_len ); + + cl->port = ntohs( ip_adr_get.sin_port ); + strcpy( cl->s_ip, inet_ntoa(ip_adr_get.sin_addr) ); +} + +/** + * @brief dap_client_remote_create Create new client and add it to the list + * @param sh Server instance + * @param s Client's socket + * @return Pointer to the new list's node + */ +dap_client_remote_t *dap_client_remote_create( dap_server_t *sh, int s, dap_server_thread_t *t ) +{ + dap_client_remote_t *dsc = DAP_NEW_Z( dap_client_remote_t ); + + dap_random_string_fill( dsc->id, CLIENT_ID_SIZE ); + + dsc->socket = s; + dsc->server = sh; + dsc->tn = t->thread_num; + dsc->thread = t; + dsc->efd = t->epoll_fd; + dsc->time_connection = dsc->last_time_active = time( NULL) ; + + dsc->pevent.events = EPOLLIN | EPOLLOUT | EPOLLERR; + dsc->pevent.data.ptr = dsc; + + dsc->flags = DAP_SOCK_READY_TO_READ; + dsc->buf_out_offset = 0; + + _save_ip_and_port( dsc ); + + pthread_mutex_lock( &t->mutex_on_hash ); + HASH_ADD_INT( t->hclients, socket, dsc ); + pthread_mutex_unlock( &t->mutex_on_hash ); + + if ( sh->client_new_callback ) + sh->client_new_callback( dsc, NULL ); // Init internal structure + + log_it(L_DEBUG, "Create remote client: ip: %s port %d", dsc->s_ip, dsc->port ); + + //log_it(L_DEBUG, "Create new client. ID: %s", dsc->id); + return dsc; +} + +/** + * @brief safe_client_remove Removes the client from the list + * @param sc Client instance + */ +void dap_client_remote_remove( dap_client_remote_t *sc ) +{ + log_it(L_DEBUG, "dap_client_remote_remove [THREAD %u] efd %u", sc->tn , sc->efd ); + + dap_server_thread_t *t = sc->thread; + + pthread_mutex_lock( &t->mutex_on_hash ); + HASH_DEL( t->hclients, sc ); + pthread_mutex_unlock( &t->mutex_on_hash ); + + if( sc->server->client_delete_callback ) { + sc->server->client_delete_callback( sc, NULL ); // Init internal structure + } + + if( sc->_inheritor ) { + free( sc->_inheritor ); + } + + if( sc->socket ) { + log_it( L_INFO, "dap_client_remote_remove close( %d );", sc->socket ); +#ifdef _WIN32 + closesocket( sc->socket ); +#else + close( sc->socket ); +#endif + } + + free( sc ); +} + +/** + * @brief dap_server_client_find + * @param sock + * @param sh + * @return + */ +dap_client_remote_t *dap_client_remote_find( int sock, dap_server_thread_t *t ) +{ + dap_client_remote_t *ret = NULL; + + pthread_mutex_lock( &t->mutex_on_hash ); + HASH_FIND_INT( t->hclients, &sock, ret ); + pthread_mutex_unlock( &t->mutex_on_hash ); + + return ret; +} + +/** + * @brief dap_client_remote_ready_to_read + * @param sc + * @param isReady + */ +void dap_client_remote_ready_to_read( dap_client_remote_t *sc, bool is_ready ) +{ + if( is_ready == (bool)(sc->flags & DAP_SOCK_READY_TO_READ) ) + return; + +// log_it( L_ERROR, "remote_ready_to_read() %u efd %X", sc->socket, sc->efd ); + + if ( is_ready ) + sc->flags |= DAP_SOCK_READY_TO_READ; + else + sc->flags ^= DAP_SOCK_READY_TO_READ; + + int events = EPOLLERR; + + if( sc->flags & DAP_SOCK_READY_TO_READ ) + events |= EPOLLIN; + + if( sc->flags & DAP_SOCK_READY_TO_WRITE ) + events |= EPOLLOUT; + + sc->pevent.events = events; + + if( epoll_ctl(sc->efd, EPOLL_CTL_MOD, sc->socket, &sc->pevent) != 0 ) + log_it( L_ERROR, "epoll_ctl failed 000" ); +} + +/** + * @brief dap_client_remote_ready_to_write + * @param sc + * @param isReady + */ +void dap_client_remote_ready_to_write( dap_client_remote_t *sc, bool is_ready ) +{ + if ( is_ready == (bool)(sc->flags & DAP_SOCK_READY_TO_WRITE) ) + return; + +// log_it( L_ERROR, "remote_ready_to_write() %u efd %X", sc->socket, sc->efd ); + + if ( is_ready ) + sc->flags |= DAP_SOCK_READY_TO_WRITE; + else + sc->flags ^= DAP_SOCK_READY_TO_WRITE; + + int events = EPOLLERR; + + if( sc->flags & DAP_SOCK_READY_TO_READ ) + events |= EPOLLIN; + + if( sc->flags & DAP_SOCK_READY_TO_WRITE ) + events |= EPOLLOUT; + + sc->pevent.events = events; + + if( epoll_ctl(sc->efd, EPOLL_CTL_MOD, sc->socket, &sc->pevent) != 0 ) + log_it( L_ERROR, "epoll_ctl failed 001" ); +} + +/** + * @brief dap_client_write Write data to the client + * @param sc Client instance + * @param data Pointer to data + * @param data_size Size of data to write + * @return Number of bytes that were placed into the buffer + */ +size_t dap_client_remote_write( dap_client_remote_t *sc, const void * data, size_t data_size ) +{ + if ( sc->buf_out_size + data_size > sizeof(sc->buf_out) ) { + log_it( L_WARNING, "Client buffer overflow. Packet loosed" ); + return 0; + } + + memcpy( sc->buf_out + sc->buf_out_size, data, data_size ); + sc->buf_out_size += data_size; + return data_size; +} + +/** + * @brief dap_client_write_f Write formatted text to the client + * @param a_client Client instance + * @param a_format Format + * @return Number of bytes that were placed into the buffer + */ +size_t dap_client_remote_write_f( dap_client_remote_t *a_client, const char * a_format, ... ) +{ + size_t max_data_size = sizeof( a_client->buf_out ) - a_client->buf_out_size; + + va_list ap; + va_start( ap, a_format ); + + int ret = dap_vsnprintf( a_client->buf_out + a_client->buf_out_size, max_data_size, a_format, ap ); + + va_end( ap ); + + if( ret > 0 ) { + a_client->buf_out_size += (unsigned long)ret; + return (size_t)ret; + } + else { + log_it( L_ERROR, "Can't write out formatted data '%s'", a_format ); + return 0; + } +} + +/** + * @brief dap_client_read Read data from input buffer + * @param a_client Client instasnce + * @param a_data Pointer to memory where to store the data + * @param a_data_size Size of data to read + * @return Actual bytes number that were read + */ +size_t dap_client_remote_read( dap_client_remote_t *a_client, void *a_data, size_t a_data_size ) +{ + if ( a_data_size < a_client->buf_in_size ) { + + memcpy( a_data, a_client->buf_in, a_data_size ); + memmove( a_client->buf_in, a_client->buf_in + a_data_size, a_client->buf_in_size - a_data_size ); + } + else { + if ( a_data_size > a_client->buf_in_size ) { + a_data_size = a_client->buf_in_size; + } + memcpy( a_data, a_client->buf_in, a_data_size ); + } + + a_client->buf_in_size -= a_data_size; + + return a_data_size; +} + + +/** + * @brief dap_client_remote_shrink_client_buf_in Shrink input buffer (shift it left) + * @param a_client Client instance + * @param a_shrink_size Size on wich we shrink the buffer with shifting it left + */ +void dap_client_remote_shrink_buf_in( dap_client_remote_t *a_client, size_t a_shrink_size ) +{ +#if 0 + if((a_shrink_size==0)||(a_client->buf_in_size==0) ){ + return; + }else if(a_client->buf_in_size>a_shrink_size){ + size_t buf_size=a_client->buf_in_size-a_shrink_size; + void * buf = malloc(buf_size); + memcpy(buf,a_client->buf_in+ a_shrink_size,buf_size ); + memcpy(a_client->buf_in,buf,buf_size); + a_client->buf_in_size=buf_size; + free(buf); + }else { + a_client->buf_in_size=0; + } +#endif + + if ( a_shrink_size == 0 || a_client->buf_in_size == 0 ) + return; + + if ( a_client->buf_in_size > a_shrink_size ) { + + size_t buf_size = a_client->buf_in_size - a_shrink_size; + memmove( a_client->buf_in, a_client->buf_in + a_shrink_size, buf_size ); +/** + void *buf = malloc( buf_size ); + memcpy( buf, a_client->buf_in + a_shrink_size, buf_size ); + memcpy( a_client->buf_in, buf, buf_size ); + // holy shit + a_client->buf_in_size = buf_size; + free( buf ); +**/ + a_client->buf_in_size = buf_size; + } + else { + a_client->buf_in_size = 0; + } +} diff --git a/libdap-server-core/src/dap_events.c b/libdap-server-core/src/dap_events.c new file mode 100755 index 0000000000000000000000000000000000000000..263474944d846c89305025a7bee6459a60fc26d1 --- /dev/null +++ b/libdap-server-core/src/dap_events.c @@ -0,0 +1,601 @@ +/* + * Authors: + * Dmitriy A. Gearasimov <gerasimov.dmitriy@demlabs.net> + * DeM Labs Limited https://demlabs.net + * Cellframe https://cellframe.net + * Copyright (c) 2017-2020 + * All rights reserved. + + This file is part of DAP (Deus Applications Prototypes) the open source project + + DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + DAP is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. +*/ +#define __USE_GNU + +#include <string.h> +#include <time.h> +#include <stdio.h> +#include <stdio.h> +#include <stdlib.h> +#include <stddef.h> +#include <stdint.h> + +#ifndef _WIN32 +#include <arpa/inet.h> +#include <netinet/in.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <sys/stat.h> + +#include <sys/epoll.h> + +#include <netdb.h> +#include <unistd.h> +#include <fcntl.h> +#include <errno.h> +#include <signal.h> + +#if 1 +#include <sys/timerfd.h> +#elif defined(DAP_OS_ANDROID) +#define NO_POSIX_SHED +#define NO_TIMER +#endif + +#else +#include <winsock2.h> +#include <windows.h> +#include <mswsock.h> +#include <ws2tcpip.h> +#include <io.h> +#include <pthread.h> +#endif + +#include <utlist.h> +#include <sched.h> + +#include "dap_common.h" +#include "dap_strfuncs.h" +#include "dap_server.h" +#include "dap_events.h" + +#define DAP_MAX_EPOLL_EVENTS 8192 + +//typedef struct open_connection_info_s { +// dap_events_socket_t *es; +// struct open_connection_info *prev; +// struct open_connection_info *next; +//} dap_events_socket_info_t; + +//dap_events_socket_info_t **s_dap_events_sockets; + +static uint32_t s_threads_count = 1; +static size_t s_connection_timeout = 6000; +static struct epoll_event *g_epoll_events = NULL; + +bool s_workers_init = false; +dap_worker_t *s_workers = NULL; +dap_thread_t *s_threads = NULL; + +#define LOG_TAG "dap_events" + +uint32_t s_get_cpu_count( ) +{ +#ifdef _WIN32 + SYSTEM_INFO si; + + GetSystemInfo( &si ); + return si.dwNumberOfProcessors; +#else +#ifndef NO_POSIX_SHED + cpu_set_t cs; + CPU_ZERO( &cs ); + sched_getaffinity( 0, sizeof(cs), &cs ); + + uint32_t count = 0; + for ( int i = 0; i < 32; i++ ){ + if ( CPU_ISSET(i, &cs) ) + count ++; + } + return count; +#else + return 1; +#endif +#endif +} + +/** + * @brief sa_server_init Init server module + * @arg a_threads_count number of events processor workers in parallel threads + * @return Zero if ok others if no + */ +int32_t dap_events_init( uint32_t a_threads_count, size_t conn_timeout ) +{ + s_threads_count = a_threads_count ? a_threads_count : s_get_cpu_count( ); + + if ( conn_timeout ) + s_connection_timeout = conn_timeout; + + s_workers = (dap_worker_t *) calloc( 1, sizeof(dap_worker_t) * s_threads_count ); + s_threads = (dap_thread_t *) calloc( 1, sizeof(dap_thread_t) * s_threads_count ); + if ( !s_workers || !s_threads ) + goto err; + + g_epoll_events = (struct epoll_event *)malloc( sizeof(struct epoll_event) * DAP_MAX_EPOLL_EVENTS * s_threads_count ); + if ( !g_epoll_events ) + goto err; + + if ( dap_events_socket_init() != 0 ) { + + log_it( L_CRITICAL, "Can't init client submodule dap_events_socket_init( )" ); + goto err; + } + s_workers_init = true; + + log_it( L_NOTICE, "Initialized socket server module" ); + + #ifndef _WIN32 + signal( SIGPIPE, SIG_IGN ); + #endif + return 0; + +err: + dap_events_deinit( ); + return -1; +} + +/** + * @brief sa_server_deinit Deinit server module + */ +void dap_events_deinit( ) +{ + dap_events_socket_deinit( ); + + if ( g_epoll_events ) + free( g_epoll_events ); + + if ( s_threads ) + free( s_threads ); + + if ( s_workers ) + free( s_workers ); +} + +/** + * @brief server_new Creates new empty instance of server_t + * @return New instance + */ +dap_events_t * dap_events_new( ) +{ + dap_events_t *ret = (dap_events_t *)calloc( 1, sizeof(dap_events_t) ); + + pthread_rwlock_init( &ret->sockets_rwlock, NULL ); + pthread_rwlock_init( &ret->servers_rwlock, NULL ); + + return ret; +} + +/** + * @brief server_delete Delete event processor instance + * @param sh Pointer to the server instance + */ +void dap_events_delete( dap_events_t *a_events ) +{ + dap_events_socket_t *cur, *tmp; + + if ( a_events ) { + + HASH_ITER( hh, a_events->sockets,cur, tmp ) { + dap_events_socket_delete( cur, true ); + } + + if ( a_events->_inheritor ) + free( a_events->_inheritor ); + + pthread_rwlock_destroy( &a_events->servers_rwlock ); + pthread_rwlock_destroy( &a_events->sockets_rwlock ); + + free( a_events ); + } +} + +/** + * @brief s_socket_info_all_check_activity + * @param n_thread + * @param sh + */ +static void s_socket_all_check_activity( dap_worker_t *dap_worker, dap_events_t *d_ev, time_t cur_time ) +{ + dap_events_socket_t *a_es, *tmp; + + pthread_mutex_lock( &dap_worker->locker_on_count ); + DL_FOREACH_SAFE( d_ev->dlsockets, a_es, tmp ) { + + if ( !a_es->kill_signal && cur_time >= a_es->last_time_active + s_connection_timeout && !a_es->no_close ) { + + log_it( L_INFO, "Socket %u timeout, closing...", a_es->socket ); + if (a_es->callbacks->error_callback) { + a_es->callbacks->error_callback(a_es, (void *)ETIMEDOUT); + } + + if ( epoll_ctl( dap_worker->epoll_fd, EPOLL_CTL_DEL, a_es->socket, &a_es->ev) == -1 ) + log_it( L_ERROR,"Can't remove event socket's handler from the epoll_fd" ); + else + log_it( L_DEBUG,"Removed epoll's event from dap_worker #%u", dap_worker->number_thread ); + + dap_worker->event_sockets_count --; + DL_DELETE( d_ev->dlsockets, a_es ); + dap_events_socket_delete( a_es, true ); + } + } + pthread_mutex_unlock( &dap_worker->locker_on_count ); + +} + +/** + * @brief thread_worker_function + * @param arg + * @return + */ +static void *thread_worker_function(void *arg) +{ + dap_events_socket_t *cur; + dap_worker_t *w = (dap_worker_t *) arg; + time_t next_time_timeout_check = time( NULL) + s_connection_timeout / 2; + uint32_t tn = w->number_thread; + +#ifndef _WIN32 +#ifndef NO_POSIX_SHED + cpu_set_t mask; + CPU_ZERO(&mask); + CPU_SET(tn, &mask); + + int err; +#ifndef __ANDROID__ + err = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &mask); +#else + err = sched_setaffinity(pthread_self(), sizeof(cpu_set_t), &mask); +#endif + if(err) + { + log_it(L_CRITICAL, "Error pthread_setaffinity_np() You really have %d or more core in CPU?", *(int* )arg); + abort(); + } +#endif +#else + + if ( !SetThreadAffinityMask( GetCurrentThread(), (DWORD_PTR)(1 << tn) ) ) { + log_it( L_CRITICAL, "Error pthread_setaffinity_np() You really have %d or more core in CPU?", tn ); + abort(); + } + #endif + + log_it(L_INFO, "Worker %d started, epoll fd %d", w->number_thread, w->epoll_fd); + + struct epoll_event *events = &g_epoll_events[ DAP_MAX_EPOLL_EVENTS * tn]; + +// memset( &ev, 0, sizeof(ev) ); +// memset( &events, 0, sizeof(events) ); + + size_t total_sent; + int bytes_sent; + + while(1) { + + int selected_sockets = epoll_wait(w->epoll_fd, events, DAP_MAX_EPOLL_EVENTS, 1000); + + if(selected_sockets == -1) { + if( errno == EINTR) + continue; + break; + } + + time_t cur_time = time( NULL); + for(int32_t n = 0; n < selected_sockets; n++) { + + cur = (dap_events_socket_t *) events[n].data.ptr; + + if(!cur) { + + log_it(L_ERROR, "dap_events_socket NULL"); + continue; + } + //log_it(L_DEBUG, "Worker=%d fd=%d socket=%d event=0x%x(%d)", w->number_thread, w->epoll_fd,cur->socket, events[n].events,events[n].events); + int l_sock_err, l_sock_err_size; + //connection already closed (EPOLLHUP - shutdown has been made in both directions) + if(events[n].events & EPOLLHUP) { // && events[n].events & EPOLLERR) { + getsockopt(cur->socket, SOL_SOCKET, SO_ERROR, (void *)&l_sock_err, (socklen_t *)&l_sock_err_size); + //if(!(events[n].events & EPOLLIN)) + //cur->no_close = false; + if (l_sock_err) { + cur->flags |= DAP_SOCK_SIGNAL_CLOSE; + log_it(L_DEBUG, "Socket shutdown (EPOLLHUP): %s", strerror(l_sock_err)); + if(!(events[n].events & EPOLLERR)) + cur->callbacks->error_callback(cur, NULL); // Call callback to process error event + } + } + + if(events[n].events & EPOLLERR) { + getsockopt(cur->socket, SOL_SOCKET, SO_ERROR, (void *)&l_sock_err, (socklen_t *)&l_sock_err_size); + log_it(L_ERROR, "Socket error: %s", strerror(l_sock_err)); + cur->flags |= DAP_SOCK_SIGNAL_CLOSE; + cur->callbacks->error_callback(cur, NULL); // Call callback to process error event + } + + cur->last_time_active = cur_time; + + if(events[n].events & EPOLLIN) { + + //log_it(DEBUG,"Comes connection in active read set"); + if(cur->buf_in_size == sizeof(cur->buf_in)) { + log_it(L_WARNING, "Buffer is full when there is smth to read. Its dropped!"); + cur->buf_in_size = 0; + } + + int32_t bytes_read = 0; + if(cur->type == DESCRIPTOR_TYPE_SOCKET) { + bytes_read = recv(cur->socket, (char *) (cur->buf_in + cur->buf_in_size), + sizeof(cur->buf_in) - cur->buf_in_size, 0); + }else if(cur->type = DESCRIPTOR_TYPE_FILE) { + bytes_read = read(cur->socket, (char *) (cur->buf_in + cur->buf_in_size), + sizeof(cur->buf_in) - cur->buf_in_size); + } + + if(bytes_read > 0) { + cur->buf_in_size += bytes_read; + //log_it(DEBUG, "Received %d bytes", bytes_read); + cur->callbacks->read_callback(cur, NULL); // Call callback to process read event. At the end of callback buf_in_size should be zero if everything was read well + } + else if(bytes_read < 0) { + log_it(L_ERROR, "Some error occured in recv() function: %s", strerror(errno)); + dap_events_socket_set_readable(cur, false); + cur->flags |= DAP_SOCK_SIGNAL_CLOSE; + } + else if(bytes_read == 0) { + log_it(L_INFO, "Client socket disconnected"); + dap_events_socket_set_readable(cur, false); + cur->flags |= DAP_SOCK_SIGNAL_CLOSE; + } + } + + // Socket is ready to write + if(((events[n].events & EPOLLOUT) || (cur->flags & DAP_SOCK_READY_TO_WRITE)) + && !(cur->flags & DAP_SOCK_SIGNAL_CLOSE)) { + ///log_it(DEBUG, "Main loop output: %u bytes to send",sa_cur->buf_out_size); + cur->callbacks->write_callback(cur, NULL); // Call callback to process write event + + if(cur->flags & DAP_SOCK_READY_TO_WRITE) { + + static const uint32_t buf_out_zero_count_max = 20; + cur->buf_out[cur->buf_out_size] = 0; + + if(!cur->buf_out_size) { + + //log_it(L_WARNING, "Output: nothing to send. Why we are in write socket set?"); + cur->buf_out_zero_count++; + + if(cur->buf_out_zero_count > buf_out_zero_count_max) { // How many time buf_out on write event could be empty + log_it(L_ERROR, "Output: nothing to send %u times, remove socket from the write set", + buf_out_zero_count_max); + dap_events_socket_set_writable(cur, false); + } + } + else + cur->buf_out_zero_count = 0; + } + for(total_sent = 0; total_sent < cur->buf_out_size;) { // If after callback there is smth to send - we do it + if(cur->type == DESCRIPTOR_TYPE_SOCKET) { + bytes_sent = send(cur->socket, (char *) (cur->buf_out + total_sent), + cur->buf_out_size - total_sent, MSG_DONTWAIT | MSG_NOSIGNAL); + }else if(cur->type == DESCRIPTOR_TYPE_FILE) { + bytes_sent = write(cur->socket, (char *) (cur->buf_out + total_sent), + cur->buf_out_size - total_sent); + } + + if(bytes_sent < 0) { + log_it(L_ERROR, "Some error occured in send(): %s", strerror(errno)); + cur->flags |= DAP_SOCK_SIGNAL_CLOSE; + break; + } + total_sent += bytes_sent; + //log_it(L_DEBUG, "Output: %u from %u bytes are sent ", total_sent,sa_cur->buf_out_size); + } + //log_it(L_DEBUG,"Output: sent %u bytes",total_sent); + pthread_mutex_lock(&cur->write_hold); + cur->buf_out_size -= total_sent; + if (cur->buf_out_size) { + memcpy(cur->buf_out, &cur->buf_out[total_sent], cur->buf_out_size); + } else { + cur->flags &= ~DAP_SOCK_READY_TO_WRITE; + } + pthread_mutex_unlock(&cur->write_hold); + } + + pthread_mutex_lock(&w->locker_on_count); + + if((cur->flags & DAP_SOCK_SIGNAL_CLOSE) && !cur->no_close) { + // protect against double deletion + cur->kill_signal = true; + //dap_events_socket_remove_and_delete(cur, true); + log_it(L_INFO, "Got signal to close %s, sock %u [thread %u]", cur->hostaddr, cur->socket, tn); + } + + if(cur->kill_signal) { + log_it(L_INFO, "Kill %u socket (processed).... [ thread %u ]", cur->socket, tn); + dap_events_socket_remove(cur); + pthread_mutex_unlock(&w->locker_on_count); + dap_events_socket_delete( cur, true); + } + else + pthread_mutex_unlock(&w->locker_on_count); + + /* + if(!w->event_to_kill_count) { + + pthread_mutex_unlock(&w->locker_on_count); + continue; + + do { + +// if ( cur->no_close ) { +// cur = cur->knext; +// continue; +// } + tmp = cur_del->knext; + + // delete only current events_socket because others may be active in the other workers + //if(cur_del == cur) + if(cur->kill_signal) { + log_it(L_INFO, "Kill %u socket (processed).... [ thread %u ]", cur_del->socket, tn); + DL_LIST_REMOVE_NODE(w->events->to_kill_sockets, cur, kprev, knext, w->event_to_kill_count); + dap_events_socket_remove_and_delete(cur_del, true); + } + cur_del = tmp; + + } while(cur_del); + + log_it(L_INFO, "[ Thread %u ] coneections: %u, to kill: %u", tn, w->event_sockets_count, + w->event_to_kill_count); + + pthread_mutex_unlock(&w->locker_on_count); + */ + } // for + +#ifndef NO_TIMER + if(cur_time >= next_time_timeout_check) { + s_socket_all_check_activity(w, w->events, cur_time); + next_time_timeout_check = cur_time + s_connection_timeout / 2; + } +#endif + + } // while + + return NULL; +} + +/** + * @brief dap_worker_get_min + * @return + */ +dap_worker_t *dap_worker_get_min( ) +{ + // wait for s_workers init + while(!s_workers_init) + dap_usleep(DAP_USEC_PER_SEC / 1000); + dap_worker_t *l_workers = &s_workers[dap_worker_get_index_min()]; + // wait for worker start + while(!l_workers->events) + dap_usleep(DAP_USEC_PER_SEC / 1000); + return l_workers; +} + +/** + * @brief dap_worker_get_index_min + * @return + */ +uint32_t dap_worker_get_index_min( ) +{ + uint32_t min = 0; + uint32_t i; + + for( i = 1; i < s_threads_count; i++ ) { + + if ( s_workers[min].event_sockets_count > s_workers[i].event_sockets_count ) + min = i; + } + + return min; +} + +/** + * @brief dap_worker_print_all + */ +void dap_worker_print_all( ) +{ + uint32_t i; + + for( i = 0; i < s_threads_count; i ++ ) { + + log_it( L_INFO, "Worker: %d, count open connections: %d", + s_workers[i].number_thread, s_workers[i].event_sockets_count ); + } +} + +/** + * @brief sa_server_loop Main server loop + * @param sh Server instance + * @return Zero if ok others if not + */ +int dap_events_start( dap_events_t *a_events ) +{ + for( uint32_t i = 0; i < s_threads_count; i++) { + + s_workers[i].epoll_fd = epoll_create( DAP_MAX_EPOLL_EVENTS ); + if ( (intptr_t)s_workers[i].epoll_fd == -1 ) { + log_it(L_CRITICAL, "Error create epoll fd"); + return -1; + } + + //s_workers[i].event_to_kill_count = 0; + s_workers[i].event_sockets_count = 0; + s_workers[i].number_thread = i; + s_workers[i].events = a_events; + + pthread_mutex_init( &s_workers[i].locker_on_count, NULL ); + pthread_create( &s_threads[i].tid, NULL, thread_worker_function, &s_workers[i] ); + } + + return 0; +} + +/** + * @brief dap_events_wait + * @param sh + * @return + */ +int dap_events_wait( dap_events_t *sh ) +{ + (void) sh; + + for( uint32_t i = 0; i < s_threads_count; i ++ ) { + void *ret; + pthread_join( s_threads[i].tid, &ret ); + } + + return 0; +} + +/** + * @brief dap_worker_add_events_socket + * @param a_worker + * @param a_events_socket + */ +void dap_worker_add_events_socket( dap_events_socket_t *a_es) +{ +// struct epoll_event ev = {0}; + dap_worker_t *l_worker = dap_worker_get_min( ); + + a_es->dap_worker = l_worker; + a_es->events = a_es->dap_worker->events; +} + +/** + * @brief dap_events__thread_wake_up + * @param th + */ +void dap_events_thread_wake_up( dap_thread_t *th ) +{ + (void) th; + //pthread_kill(th->tid,SIGUSR1); +} diff --git a/libdap-server-core/src/dap_events_socket.c b/libdap-server-core/src/dap_events_socket.c new file mode 100755 index 0000000000000000000000000000000000000000..d297d1aecdee9ae289438ad437fb0700300a88fc --- /dev/null +++ b/libdap-server-core/src/dap_events_socket.c @@ -0,0 +1,442 @@ +/* + * Authors: + * Dmitriy A. Gearasimov <gerasimov.dmitriy@demlabs.net> + * DeM Labs Inc. https://demlabs.net + * Kelvin Project https://github.com/kelvinblockchain + * Copyright (c) 2017-2019 + * All rights reserved. + + This file is part of DAP (Deus Applications Prototypes) the open source project + + DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + DAP is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include <stdlib.h> +#include <stdio.h> +#include <stdarg.h> +#include <string.h> +#include <assert.h> + +#ifndef _WIN32 +#include <sys/epoll.h> +#include <unistd.h> +#include <fcntl.h> +#else +#include <winsock2.h> +#include <windows.h> +#include <mswsock.h> +#include <ws2tcpip.h> +#include <io.h> +#include "wepoll.h" +#include <pthread.h> +#endif + +#include "dap_common.h" +#include "dap_events.h" + +#include "dap_events_socket.h" + +#define LOG_TAG "dap_events_socket" + +/** + * @brief dap_events_socket_init Init clients module + * @return Zero if ok others if no + */ +int dap_events_socket_init( ) +{ + log_it(L_NOTICE,"Initialized socket client module"); + return 0; +} + +/** + * @brief dap_events_socket_deinit Deinit clients module + */ +void dap_events_socket_deinit( ) +{ + +} + + +/** + * @brief dap_events_socket_wrap + * @param a_events + * @param w + * @param s + * @param a_callbacks + * @return + */ +dap_events_socket_t *dap_events_socket_wrap_no_add( dap_events_t *a_events, + int a_sock, dap_events_socket_callbacks_t *a_callbacks ) +{ +// assert(a_events); + assert(a_callbacks); + + dap_events_socket_t *ret = DAP_NEW_Z( dap_events_socket_t ); + + ret->socket = a_sock; + ret->events = a_events; + ret->callbacks = a_callbacks; + ret->flags = DAP_SOCK_READY_TO_READ; + ret->no_close = false; + pthread_mutex_init(&ret->write_hold, NULL); + + log_it( L_DEBUG,"Dap event socket wrapped around %d sock a_events = %X", a_sock, a_events ); + + return ret; +} + +/** + * @brief dap_events_socket_create_after + * @param a_es + */ +void dap_events_socket_create_after( dap_events_socket_t *a_es ) +{ + if ( a_es->callbacks->new_callback ) + a_es->callbacks->new_callback( a_es, NULL ); // Init internal structure + + a_es->last_time_active = a_es->last_ping_request = time( NULL ); + + dap_worker_add_events_socket( a_es ); + + pthread_mutex_lock( &a_es->dap_worker->locker_on_count ); + + a_es->dap_worker->event_sockets_count ++; + DL_APPEND( a_es->events->dlsockets, a_es ); + + pthread_rwlock_wrlock( &a_es->events->sockets_rwlock ); + HASH_ADD_INT( a_es->events->sockets, socket, a_es ); + pthread_rwlock_unlock( &a_es->events->sockets_rwlock ); + + a_es->ev.events = EPOLLIN | EPOLLERR | EPOLLOUT; + a_es->ev.data.ptr = a_es; + + if ( epoll_ctl( a_es->dap_worker->epoll_fd, EPOLL_CTL_ADD, a_es->socket, &a_es->ev ) == 1 ) + log_it( L_CRITICAL, "Can't add event socket's handler to epoll_fd" ); + + pthread_mutex_unlock( &a_es->dap_worker->locker_on_count ); +} + +/** + * @brief dap_events_socket_wrap + * @param a_events + * @param w + * @param s + * @param a_callbacks + * @return + */ +dap_events_socket_t * dap_events_socket_wrap2( dap_server_t *a_server, struct dap_events *a_events, + int a_sock, dap_events_socket_callbacks_t *a_callbacks ) +{ + assert( a_events ); + assert( a_callbacks ); + assert( a_server ); + + log_it( L_DEBUG,"Sap event socket wrapped around %d sock", a_sock ); + dap_events_socket_t * ret = DAP_NEW_Z( dap_events_socket_t ); + + ret->socket = a_sock; + ret->events = a_events; + ret->callbacks = a_callbacks; + + ret->flags = DAP_SOCK_READY_TO_READ; + ret->is_pingable = true; + ret->last_time_active = ret->last_ping_request = time( NULL ); + + pthread_rwlock_wrlock( &a_events->sockets_rwlock ); + HASH_ADD_INT( a_events->sockets, socket, ret ); + pthread_rwlock_unlock( &a_events->sockets_rwlock ); + + if( a_callbacks->new_callback ) + a_callbacks->new_callback( ret, NULL ); // Init internal structure + + return ret; +} + +/** + * @brief dap_events_socket_find + * @param sock + * @param sh + * @return + */ +dap_events_socket_t *dap_events_socket_find( int sock, struct dap_events *a_events ) +{ + dap_events_socket_t *ret = NULL; + if(!a_events) + return NULL; + pthread_rwlock_rdlock( &a_events->sockets_rwlock ); + if(a_events->sockets) + HASH_FIND_INT( a_events->sockets, &sock, ret ); + pthread_rwlock_unlock( &a_events->sockets_rwlock ); + + return ret; +} + +/** + * @brief dap_events_socket_ready_to_read + * @param sc + * @param isReady + */ +void dap_events_socket_set_readable( dap_events_socket_t *sc, bool is_ready ) +{ + if( is_ready == (bool)(sc->flags & DAP_SOCK_READY_TO_READ) ) + return; + + sc->ev.events = EPOLLERR; + + if ( is_ready ) + sc->flags |= DAP_SOCK_READY_TO_READ; + else + sc->flags ^= DAP_SOCK_READY_TO_READ; + + int events = EPOLLERR; + + if( sc->flags & DAP_SOCK_READY_TO_READ ) + events |= EPOLLIN; + + if( sc->flags & DAP_SOCK_READY_TO_WRITE ) + events |= EPOLLOUT; + + sc->ev.events = events; + + if ( epoll_ctl(sc->dap_worker->epoll_fd, EPOLL_CTL_MOD, sc->socket, &sc->ev) == -1 ) + log_it( L_ERROR,"Can't update read client socket state in the epoll_fd" ); + else + dap_events_thread_wake_up( &sc->events->proc_thread ); +} + +/** + * @brief dap_events_socket_ready_to_write + * @param sc + * @param isReady + */ +void dap_events_socket_set_writable( dap_events_socket_t *sc, bool is_ready ) +{ + pthread_mutex_lock(&sc->write_hold); + + if ( is_ready == (bool)(sc->flags & DAP_SOCK_READY_TO_WRITE) ) { + pthread_mutex_unlock(&sc->write_hold); + return; + } + + if ( is_ready ) + sc->flags |= DAP_SOCK_READY_TO_WRITE; + else + sc->flags ^= DAP_SOCK_READY_TO_WRITE; + + int events = EPOLLERR; + + if( sc->flags & DAP_SOCK_READY_TO_READ ) + events |= EPOLLIN; + + if( sc->flags & DAP_SOCK_READY_TO_WRITE ) + events |= EPOLLOUT; + + pthread_mutex_unlock(&sc->write_hold); + + sc->ev.events = events; + + if ( epoll_ctl(sc->dap_worker->epoll_fd, EPOLL_CTL_MOD, sc->socket, &sc->ev) == -1 ) + log_it(L_ERROR,"Can't update write client socket state in the epoll_fd"); + else + dap_events_thread_wake_up( &sc->events->proc_thread ); +} + + +/** + * @brief dap_events_socket_kill_socket + * @param sc Connection instance + */ +int dap_events_socket_kill_socket( dap_events_socket_t *a_es ) +{ + if ( !a_es ) { + log_it( L_ERROR, "dap_events_socket_kill_socket( NULL )" ); + return -1; + } + + uint32_t tn = a_es->dap_worker->number_thread; + + dap_worker_t *w = a_es->dap_worker; + //dap_events_t *d_ev = w->events; + + pthread_mutex_lock( &w->locker_on_count ); + if ( a_es->kill_signal ) { + pthread_mutex_unlock( &w->locker_on_count ); + return 0; + } + + log_it( L_DEBUG, "KILL %u socket! (in queue) [ thread %u ]", a_es->socket, tn ); + + a_es->kill_signal = true; + //DL_LIST_ADD_NODE_HEAD( d_ev->to_kill_sockets, a_es, kprev, knext, w->event_to_kill_count ); + + pthread_mutex_unlock( &w->locker_on_count ); + return 0; +} + +/** + * @brief dap_events_socket_remove Removes the client from the list + * @param sc Connection instance + */ +void dap_events_socket_delete( dap_events_socket_t *a_es, bool preserve_inheritor ) +{ + if ( !a_es ) return; + + log_it( L_DEBUG, "es is going to be removed from the lists and free the memory (0x%016X)", a_es ); + + if(!dap_events_socket_find(a_es->socket, a_es->events)){ + log_it( L_ERROR, "dap_events_socket 0x%x already deleted", a_es); + return ; + } + pthread_rwlock_wrlock( &a_es->events->sockets_rwlock ); + if(a_es->events->sockets) + HASH_DEL( a_es->events->sockets, a_es ); + pthread_rwlock_unlock( &a_es->events->sockets_rwlock ); + + log_it( L_DEBUG, "dap_events_socket wrapped around %d socket is removed", a_es->socket ); + + if( a_es->callbacks->delete_callback ) + a_es->callbacks->delete_callback( a_es, NULL ); // Init internal structure + + if ( a_es->_inheritor && !preserve_inheritor ) + DAP_DELETE( a_es->_inheritor ); + + if ( a_es->socket ) { +#ifdef _WIN32 + closesocket( a_es->socket ); +#else + close( a_es->socket ); +#endif + } + pthread_mutex_destroy(&a_es->write_hold); + free( a_es ); +} + +/** + * @brief dap_events_socket_delete + * @param a_es + */ +void dap_events_socket_remove( dap_events_socket_t *a_es) +{ + if ( epoll_ctl( a_es->dap_worker->epoll_fd, EPOLL_CTL_DEL, a_es->socket, &a_es->ev) == -1 ) + log_it( L_ERROR,"Can't remove event socket's handler from the epoll_fd" ); + else + log_it( L_DEBUG,"Removed epoll's event from dap_worker #%u", a_es->dap_worker->number_thread ); + + DL_DELETE( a_es->events->dlsockets, a_es ); + a_es->dap_worker->event_sockets_count --; +} + +void dap_events_socket_remove_and_delete( dap_events_socket_t *a_es, bool preserve_inheritor ) +{ + if ( epoll_ctl( a_es->dap_worker->epoll_fd, EPOLL_CTL_DEL, a_es->socket, &a_es->ev) == -1 ) + log_it( L_ERROR,"Can't remove event socket's handler from the epoll_fd" ); + else + log_it( L_DEBUG,"Removed epoll's event from dap_worker #%u", a_es->dap_worker->number_thread ); + + DL_DELETE( a_es->events->dlsockets, a_es ); + a_es->dap_worker->event_sockets_count --; + + dap_events_socket_delete( a_es, preserve_inheritor ); +} + +/** + * @brief dap_events_socket_write Write data to the client + * @param sc Conn instance + * @param data Pointer to data + * @param data_size Size of data to write + * @return Number of bytes that were placed into the buffer + */ +size_t dap_events_socket_write(dap_events_socket_t *sc, const void * data, size_t data_size) +{ + //log_it(L_DEBUG,"dap_events_socket_write %u sock data %X size %u", sc->socket, data, data_size ); + pthread_mutex_lock(&sc->write_hold); + data_size = ((sc->buf_out_size+data_size)<(sizeof(sc->buf_out)))?data_size:(sizeof(sc->buf_out)-sc->buf_out_size ); + memcpy(sc->buf_out+sc->buf_out_size,data,data_size); + sc->buf_out_size+=data_size; + pthread_mutex_unlock(&sc->write_hold); + return data_size; +} + +/** + * @brief dap_events_socket_write_f Write formatted text to the client + * @param sc Conn instance + * @param format Format + * @return Number of bytes that were placed into the buffer + */ +size_t dap_events_socket_write_f(dap_events_socket_t *sc, const char * format,...) +{ + log_it(L_DEBUG,"dap_events_socket_write_f %u sock", sc->socket ); + + pthread_mutex_lock(&sc->write_hold); + size_t max_data_size = sizeof(sc->buf_out)-sc->buf_out_size; + va_list ap; + va_start(ap,format); + int ret=dap_vsnprintf((char*) sc->buf_out+sc->buf_out_size,max_data_size,format,ap); + va_end(ap); + if(ret>0){ + sc->buf_out_size+=ret; + }else{ + log_it(L_ERROR,"Can't write out formatted data '%s'",format); + } + pthread_mutex_unlock(&sc->write_hold); + return (ret > 0) ? ret : 0; +} + +/** + * @brief dap_events_socket_read Read data from input buffer + * @param sc Conn instasnce + * @param data Pointer to memory where to store the data + * @param data_size Size of data to read + * @return Actual bytes number that were read + */ +size_t dap_events_socket_read(dap_events_socket_t *sc, void *data, size_t data_size) +{ +// log_it(L_DEBUG,"dap_events_socket_read %u sock data %X size %u", sc->socket, data, data_size ); + + if(data_size<sc->buf_in_size){ + memcpy(data,sc->buf_in,data_size); + memmove(data,sc->buf_in+data_size,sc->buf_in_size-data_size); + }else{ + if(data_size>sc->buf_in_size) + data_size=sc->buf_in_size; + memcpy(data,sc->buf_in,data_size); + } + sc->buf_in_size-=data_size; + return data_size; +} + + +/** + * @brief dap_events_socket_shrink_client_buf_in Shrink input buffer (shift it left) + * @param cl Client instance + * @param shrink_size Size on wich we shrink the buffer with shifting it left + */ +void dap_events_socket_shrink_buf_in(dap_events_socket_t * cl, size_t shrink_size) +{ + if((shrink_size==0)||(cl->buf_in_size==0) ){ + return; + }else if(cl->buf_in_size>shrink_size){ + size_t buf_size=cl->buf_in_size-shrink_size; + void * buf = malloc(buf_size); + memcpy(buf,cl->buf_in+ shrink_size,buf_size ); + memcpy(cl->buf_in,buf,buf_size); + cl->buf_in_size=buf_size; + if (buf) + free(buf); + }else{ + //log_it(WARNING,"Shrinking size of input buffer on amount bigger than actual buffer's size"); + cl->buf_in_size=0; + } + +} diff --git a/libdap-server-core/src/dap_server.c b/libdap-server-core/src/dap_server.c new file mode 100755 index 0000000000000000000000000000000000000000..608603446143eba8e0a8c0f49f0b231b6ef34fd7 --- /dev/null +++ b/libdap-server-core/src/dap_server.c @@ -0,0 +1,837 @@ +/* + Copyright (c) 2017-2018 (c) Project "DeM Labs Inc" https://github.com/demlabsinc + All rights reserved. + + This file is part of DAP (Deus Applications Prototypes) the open source project + + DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + DAP is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. +*/ + +#define __USE_GNU + +#include <string.h> +#include <time.h> +#include <stdio.h> +#include <stdlib.h> +#include <stddef.h> +#include <stdint.h> + +//#include <errno.h> +#include <signal.h> +#include <stdint.h> +#include <stdatomic.h> + +#ifndef _WIN32 +#include <arpa/inet.h> +#include <netinet/in.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <sys/select.h> +#include <errno.h> +#include <netdb.h> +#include <unistd.h> +#include <fcntl.h> +#include <sys/epoll.h> +#include <sys/timerfd.h> +#else +#include <winsock2.h> +#include <windows.h> +#include <mswsock.h> +#include <ws2tcpip.h> +#include <io.h> +#include <pthread.h> +#endif + +#include <sched.h> + +#if 0 +#define NI_NUMERICHOST 1 /* Don't try to look up hostname. */ +#define NI_NUMERICSERV 2 /* Don't convert port number to name. */ +#define NI_NOFQDN 4 /* Only return nodename portion. */ +#define NI_NAMEREQD 8 /* Don't return numeric addresses. */ +#define NI_DGRAM 16 /* Look up UDP service rather than TCP. */ +#endif + +#include "dap_common.h" +#include "dap_server.h" + +#define LOG_TAG "server" + +#define DAP_MAX_THREAD_EVENTS 8192 +#define DAP_MAX_THREADS 16 + +#define SOCKET_TIMEOUT_TIME 300 +#define SOCKETS_TIMEOUT_CHECK_PERIOD 15 + +static uint32_t _count_threads = 0; +static uint32_t epoll_max_events = 0; +static bool bQuitSignal = false; +static bool moduleInit = false; + +static struct epoll_event *threads_epoll_events = NULL; +static dap_server_t *_current_run_server = NULL; + +static void read_write_cb( dap_client_remote_t *dap_cur, int32_t revents ); + +dap_server_thread_t dap_server_threads[ DAP_MAX_THREADS ]; + +/* +=============================================== + get_epoll_max_user_watches( ) + + return max epoll() event watches +=============================================== +*/ +static uint32_t get_epoll_max_user_watches( void ) +{ + static const char *maxepollpath = "/proc/sys/fs/epoll/max_user_watches"; + uint32_t v = 0, len; + char str[32]; + + FILE *fp = fopen( maxepollpath, "r" ); + if ( !fp ) { +// printf("can't open %s\n", maxepollpath ); + return v; + } + + len = fread( &str[0], 1, 31, fp ); + if ( !len ) { + return v; + } + + str[ len ] = 0; + v = atoi( str ); + + return v; +} + +/* +=============================================== + dap_server_init( ) + + Init server module + return Zero if ok others if no +=============================================== +*/ +int32_t dap_server_init( uint32_t count_threads ) +{ + dap_server_thread_t *dap_thread; + moduleInit = true; + + #ifndef _WIN32 + signal( SIGPIPE, SIG_IGN ); + #endif + + if ( count_threads > DAP_MAX_THREADS ) + count_threads = DAP_MAX_THREADS; + + _count_threads = count_threads; + log_it( L_NOTICE, "dap_server_init() threads %u", count_threads ); + + epoll_max_events = get_epoll_max_user_watches( ); + if ( epoll_max_events > DAP_MAX_THREAD_EVENTS ) + epoll_max_events = DAP_MAX_THREAD_EVENTS; + + threads_epoll_events = (struct epoll_event *)malloc( sizeof(struct epoll_event) * _count_threads * epoll_max_events ); + if ( !threads_epoll_events ) + goto err; + + memset( threads_epoll_events, 0, sizeof(struct epoll_event) * _count_threads * epoll_max_events ); + + dap_thread = &dap_server_threads[0]; + memset( dap_thread, 0, sizeof(dap_server_thread_t) * DAP_MAX_THREADS ); + + for ( uint32_t i = 0; i < _count_threads; ++i, ++dap_thread ) { + #ifndef _WIN32 + dap_thread->epoll_fd = -1; + #else + dap_thread->epoll_fd = (void*)-1; + #endif + dap_thread->thread_num = i; + dap_thread->epoll_events = &threads_epoll_events[ i * epoll_max_events ]; + + pthread_mutex_init( &dap_thread->mutex_dlist_add_remove, NULL ); + pthread_mutex_init( &dap_thread->mutex_on_hash, NULL ); + } + + log_it( L_NOTICE, "Initialized socket server module" ); + + dap_client_remote_init( ); + return 0; + +err:; + + dap_server_deinit( ); + return 1; +} + +void dap_server_loop_stop( void ){ + bQuitSignal = true; + dap_server_deinit(); +} + +/* +========================================================= + dap_server_deinit( ) + + Deinit server module +========================================================= +*/ +void dap_server_deinit( void ) +{ + if (moduleInit) { + dap_client_remote_t *dap_cur, *tmp; + dap_server_thread_t *t = &dap_server_threads[0]; + + dap_client_remote_deinit( ); + + if ( threads_epoll_events ) { + free( threads_epoll_events ); + + for ( uint32_t i = 0; i < _count_threads; ++i, ++t ) { + + HASH_ITER( hh, t->hclients, dap_cur, tmp ) + dap_client_remote_remove( dap_cur ); + + pthread_mutex_destroy( &dap_server_threads[i].mutex_on_hash ); + pthread_mutex_destroy( &dap_server_threads[i].mutex_dlist_add_remove ); + } + } + moduleInit = false; + } +} + +/* +========================================================= + dap_server_new( ) + + Creates new empty instance of dap_server_t +========================================================= +*/ +dap_server_t *dap_server_new( void ) +{ + return (dap_server_t *)calloc( 1, sizeof(dap_server_t) ); +} + +/* +========================================================= + dap_server_new( ) + + Delete server instance +========================================================= +*/ +void dap_server_delete( dap_server_t *sh ) +{ + if ( !sh ) return; + + if( sh->address ) + free( sh->address ); + + if( sh->server_delete_callback ) + sh->server_delete_callback( sh, NULL ); + + if ( sh->_inheritor ) + free( sh->_inheritor ); + + free( sh ); +} + +/* +========================================================= + set_nonblock_socket( ) +========================================================= +*/ +int32_t set_nonblock_socket( int32_t fd ) +{ +#ifdef _WIN32 + unsigned long arg = 1; + return ioctlsocket( fd, FIONBIO, &arg ); +#else + int32_t flags; + + flags = fcntl( fd, F_GETFL ); + flags |= O_NONBLOCK; + + return fcntl( fd, F_SETFL, flags ); +#endif +} + + +/* +========================================================= + get_thread_min_connections( ) + + return number thread which has minimum open connections +========================================================= +*/ +static inline uint32_t get_thread_index_min_connections( ) +{ + uint32_t min = 0; + + for( uint32_t i = 1; i < _count_threads; i ++ ) { + if ( dap_server_threads[min].connections_count > dap_server_threads[i].connections_count ) { + min = i; + } + } + + return min; +} + +/* +========================================================= + print_online( ) + +========================================================= +*/ +static inline void print_online() +{ + for( uint32_t i = 0; i < _count_threads; i ++ ) { + log_it( L_INFO, "Thread number: %u, count: %u", i, dap_server_threads[i].connections_count ); + } +} + +void dap_server_kill_socket( dap_client_remote_t *dcr ) +{ + if ( !dcr ) { + log_it( L_ERROR, "dap_server_kill_socket( NULL )" ); + return; + } + + dap_server_thread_t *dsth = &dap_server_threads[ dcr->tn ]; + + pthread_mutex_lock( &dsth->mutex_dlist_add_remove ); + + if ( dcr->kill_signal ) { + pthread_mutex_unlock( &dsth->mutex_dlist_add_remove ); + return; + } + + log_it( L_DEBUG, "KILL %u socket! [ thread %u ]", dcr->socket, dcr->tn ); + + dcr->kill_signal = true; + + DL_LIST_ADD_NODE_HEAD( dsth->dap_clients_to_kill, dcr, kprev, knext, dsth->to_kill_count ); + pthread_mutex_unlock( &dsth->mutex_dlist_add_remove ); + + return; +} + +/* +========================================================= + dap_server_add_socket( ) + +========================================================= +*/ +dap_client_remote_t *dap_server_add_socket( int32_t fd, int32_t forced_thread_n ) +{ + uint32_t tn = (forced_thread_n == -1) ? get_thread_index_min_connections( ) : forced_thread_n; + dap_server_thread_t *dsth = &dap_server_threads[ tn ]; + dap_client_remote_t *dcr = dap_client_remote_create( _current_run_server, fd, dsth ); + + if ( !dcr ) { + log_it( L_ERROR, "accept %d dap_client_remote_create() == NULL", fd ); +// pthread_mutex_unlock( &dsth->mutex_dlist_add_remove ); + return dcr; + } + + log_it( L_DEBUG, "accept %d Client, thread %d", fd, tn ); + + pthread_mutex_lock( &dsth->mutex_dlist_add_remove ); + + + DL_APPEND( dsth->dap_remote_clients, dcr ); + dsth->connections_count ++; + if ( epoll_ctl( dsth->epoll_fd, EPOLL_CTL_ADD, fd, &dcr->pevent) != 0 ) { + log_it( L_ERROR, "epoll_ctl failed 005" ); + } + pthread_mutex_unlock( &dsth->mutex_dlist_add_remove ); + + return dcr; +} + +/* +========================================================= + dap_server_remove_socket( ) + +========================================================= +*/ +void dap_server_remove_socket( dap_client_remote_t *dcr ) +{ + if ( !dcr ) { + log_it( L_ERROR, "dap_server_remove_socket( NULL )" ); + return; + } + + uint32_t tn = dcr->tn; + log_it( L_DEBUG, "dap_server_remove_socket %u thread %u", dcr->socket, tn ); + + dap_server_thread_t *dsth = &dap_server_threads[ tn ]; + + if ( epoll_ctl( dcr->efd, EPOLL_CTL_DEL, dcr->socket, &dcr->pevent ) == -1 ) + log_it( L_ERROR,"Can't remove event socket's handler from the epoll_fd" ); + +// pthread_mutex_lock( &dsth->mutex_dlist_add_remove ); + DL_DELETE( dsth->dap_remote_clients, dcr ); + dsth->connections_count --; + +// pthread_mutex_unlock( &dsth->mutex_dlist_add_remove ); + +// log_it( L_DEBUG, "dcr = %X", dcr ); +} + +static void s_socket_all_check_activity( uint32_t tn, time_t cur_time ) +{ + dap_client_remote_t *dcr, *tmp; + dap_server_thread_t *dsth = &dap_server_threads[ tn ]; + +// log_it( L_INFO,"s_socket_info_all_check_activity() on thread %u", tn ); + + pthread_mutex_lock( &dsth->mutex_dlist_add_remove ); + + DL_FOREACH_SAFE( dsth->dap_remote_clients, dcr, tmp ) { + + if ( !dcr->kill_signal && cur_time >= dcr->last_time_active + SOCKET_TIMEOUT_TIME && !dcr->no_close ) { + + log_it( L_INFO, "Socket %u timeout, closing...", dcr->socket ); + + if ( epoll_ctl( dcr->efd, EPOLL_CTL_DEL, dcr->socket, &dcr->pevent ) == -1 ) + log_it( L_ERROR,"Can't remove event socket's handler from the epoll_fd" ); + + DL_DELETE( dsth->dap_remote_clients, dcr ); + dsth->connections_count --; + + dap_client_remote_remove( dcr ); + } + } + pthread_mutex_unlock( &dsth->mutex_dlist_add_remove ); +} + +/* +========================================================= + read_write_cb( ) + +========================================================= +*/ +static void read_write_cb( dap_client_remote_t *dap_cur, int32_t revents ) +{ +// log_it( L_NOTICE, "[THREAD %u] read_write_cb fd %u revents %u", dap_cur->tn, dap_cur->socket, revents ); +// sleep( 5 ); // ????????? + + if( !dap_cur ) { + + log_it( L_ERROR, "read_write_cb: dap_client_remote NULL" ); + return; + } + + if ( revents & EPOLLIN ) { + +// log_it( L_DEBUG, "[THREAD %u] socket read %d ", dap_cur->tn, dap_cur->socket ); + + int32_t bytes_read = recv( dap_cur->socket, + dap_cur->buf_in + dap_cur->buf_in_size, + sizeof(dap_cur->buf_in) - dap_cur->buf_in_size, + 0 ); + if ( bytes_read > 0 ) { +// log_it( L_DEBUG, "[THREAD %u] read %u socket client said: %s", dap_cur->tn, bytes_read, dap_cur->buf_in + dap_cur->buf_in_size ); + + dap_cur->buf_in_size += (size_t)bytes_read; + dap_cur->upload_stat.buf_size_total += (size_t)bytes_read; + +// log_it( L_DEBUG, "[THREAD %u] read %u socket read callback()", dap_cur->tn, bytes_read ); + _current_run_server->client_read_callback( dap_cur ,NULL ); + } + else if ( bytes_read < 0 ) { + log_it( L_ERROR,"Bytes read Error %s",strerror(errno) ); + if ( strcmp(strerror(errno),"Resource temporarily unavailable") != 0 ) + dap_cur->flags |= DAP_SOCK_SIGNAL_CLOSE; + } + else { // bytes_read == 0 + dap_cur->flags |= DAP_SOCK_SIGNAL_CLOSE; + log_it( L_DEBUG, "0 bytes read" ); + } + } + + if( ( (revents & EPOLLOUT) || (dap_cur->flags & DAP_SOCK_READY_TO_WRITE) ) && !(dap_cur->flags & DAP_SOCK_SIGNAL_CLOSE) ) { + +// log_it(L_DEBUG, "[THREAD %u] socket write %d ", dap_cur->tn, dap_cur->socket ); + _current_run_server->client_write_callback( dap_cur, NULL ); // Call callback to process write event + + if( dap_cur->buf_out_size == 0 ) { + //log_it(L_DEBUG, "dap_cur->buf_out_size = 0, set ev_read watcher " ); + + dap_cur->pevent.events = EPOLLIN | EPOLLERR; + if( epoll_ctl(dap_cur->efd, EPOLL_CTL_MOD, dap_cur->socket, &dap_cur->pevent) != 0 ) { + log_it( L_ERROR, "epoll_ctl failed 003" ); + } + } + else { +// log_it(L_DEBUG, "[THREAD %u] send dap_cur->buf_out_size = %u , %s", dap_cur->tn, dap_cur->buf_out_size, dap_cur->buf_out ); + + size_t total_sent = dap_cur->buf_out_offset; + + while ( total_sent < dap_cur->buf_out_size ) { + //log_it(DEBUG, "Output: %u from %u bytes are sent ", total_sent, dap_cur->buf_out_size); + ssize_t bytes_sent = send( dap_cur->socket, + dap_cur->buf_out + total_sent, + dap_cur->buf_out_size - total_sent, + MSG_DONTWAIT | MSG_NOSIGNAL ); + if( bytes_sent < 0 ) { + log_it(L_ERROR,"[THREAD %u] Error occured in send() function %s", dap_cur->tn, strerror(errno) ); + dap_cur->flags |= DAP_SOCK_SIGNAL_CLOSE; + break; + } + + total_sent += (size_t)bytes_sent; + dap_cur->download_stat.buf_size_total += (size_t)bytes_sent; + } + +// log_it( L_ERROR, "check !" ); + + if( total_sent == dap_cur->buf_out_size ) { + dap_cur->buf_out_offset = dap_cur->buf_out_size = 0; + } + else { + dap_cur->buf_out_offset = total_sent; + } + } // else + } // write + + +// log_it(L_ERROR,"OPA !") ; +// Sleep(200); + +// if ( (dap_cur->flags & DAP_SOCK_SIGNAL_CLOSE) && !dap_cur->no_close ) { +// log_it(L_ERROR,"Close signal" ); + +// dap_server_remove_socket( dap_cur ); +// dap_client_remote_remove( dap_cur, _current_run_server ); +// } + +} + + +/* +========================================================= + dap_server_listen( ) + + Create server_t instance and start to listen tcp port with selected address + +========================================================= +*/ +dap_server_t *dap_server_listen( const char *addr, uint16_t port, dap_server_type_t type ) +{ + dap_server_t* sh = dap_server_new( ); + + sh->socket_listener = -111; + + if( type == DAP_SERVER_TCP ) + sh->socket_listener = socket( AF_INET, SOCK_STREAM, 0 ); + else { + dap_server_delete( sh ); + return NULL; + } + + if ( set_nonblock_socket(sh->socket_listener) == -1 ) { + log_it( L_WARNING, "error server socket nonblock" ); + dap_server_delete( sh ); + return NULL; + } + + if ( sh->socket_listener < 0 ) { + log_it ( L_ERROR,"Socket error %s", strerror(errno) ); + dap_server_delete( sh ); + return NULL; + } + + log_it( L_NOTICE," Socket created..." ); + + int32_t reuse = 1; + + if ( reuse ) + if ( setsockopt( sh->socket_listener, SOL_SOCKET, SO_REUSEADDR, (const char*)&reuse, sizeof(reuse)) < 0 ) + log_it( L_WARNING, "Can't set up REUSEADDR flag to the socket" ); + + sh->listener_addr.sin_family = AF_INET; + sh->listener_addr.sin_port = htons( port ); + inet_pton( AF_INET, addr, &(sh->listener_addr.sin_addr) ); + + if( bind(sh->socket_listener, (struct sockaddr *)&(sh->listener_addr), sizeof(sh->listener_addr)) < 0 ) { + log_it( L_ERROR,"Bind error: %s",strerror(errno) ); + dap_server_delete( sh ); + return NULL; + } + + log_it( L_INFO,"Binded %s:%u", addr, port ); + listen( sh->socket_listener, DAP_MAX_THREAD_EVENTS * _count_threads ); + + return sh; +} + + +/* +========================================================= + thread_loop( ) + + Server listener thread loop +========================================================= +*/ +void *thread_loop( void *arg ) +{ + dap_client_remote_t *dap_cur, *tmp; + dap_server_thread_t *dsth = (dap_server_thread_t *)arg; + uint32_t tn = dsth->thread_num; + EPOLL_HANDLE efd = dsth->epoll_fd; + struct epoll_event *events = dsth->epoll_events; + time_t next_time_timeout_check = time( NULL ) + SOCKETS_TIMEOUT_CHECK_PERIOD; + + log_it(L_NOTICE, "Start loop listener socket thread %u efd %u", tn, efd ); + + #ifndef _WIN32 + cpu_set_t mask; + CPU_ZERO( &mask ); + CPU_SET( tn, &mask ); + + int err; +#ifndef ANDROID + err = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &mask); +#else + err = sched_setaffinity(pthread_self(), sizeof(cpu_set_t), &mask); +#endif + if (err) { + log_it( L_CRITICAL, "Error pthread_setaffinity_np() You really have %d or more core in CPU?", tn ); + abort(); + } + #else + + if ( !SetThreadAffinityMask( GetCurrentThread(), (DWORD_PTR)(1 << tn) ) ) { + log_it( L_CRITICAL, "Error pthread_setaffinity_np() You really have %d or more core in CPU?", tn ); + abort(); + } + + #endif + + do { + + int32_t n = epoll_wait( efd, events, DAP_MAX_THREAD_EVENTS, 1000 ); + +// log_it(L_WARNING,"[THREAD %u] epoll events %u", tn, n ); +// Sleep(300); + + if ( bQuitSignal ) + break; + + if ( n < 0 ) { + if ( errno == EINTR ) + continue; + break; + } + + time_t cur_time = time( NULL ); + + for ( int32_t i = 0; i < n; ++ i ) { + +// log_it(L_ERROR,"[THREAD %u] process epoll event %u", tn, i ); + dap_cur = (dap_client_remote_t *)events[i].data.ptr; + + if ( !dap_cur ) { + log_it( L_ERROR,"dap_client_remote_t NULL" ); + continue; + } + + dap_cur->last_time_active = cur_time; + if( events[i].events & EPOLLERR ) { + log_it( L_ERROR,"Socket error: %u, remove it" , dap_cur->socket ); + dap_cur->flags |= DAP_SOCK_SIGNAL_CLOSE; + } +#ifdef _WIN32 + set_nonblock_socket(dap_cur->socket); // pconst: for winsock2 has no appropriate MSG attributes +#endif + if ( !(dap_cur->flags & DAP_SOCK_SIGNAL_CLOSE) || dap_cur->no_close ) + read_write_cb( dap_cur, events[i].events ); + + if ( (dap_cur->flags & DAP_SOCK_SIGNAL_CLOSE) && !dap_cur->no_close ) { + + pthread_mutex_lock( &dsth->mutex_dlist_add_remove ); + + if ( dap_cur->kill_signal ) { + pthread_mutex_unlock( &dsth->mutex_dlist_add_remove ); + continue; + } + +// pthread_mutex_unlock( &dsth->mutex_dlist_add_remove ); +// dap_server_kill_socket( dap_cur ); +// continue; + + log_it( L_INFO, "Got signal to close %u socket, closing...[ %u ]", dap_cur->socket, tn ); + + dap_server_remove_socket( dap_cur ); + dap_client_remote_remove( dap_cur ); + + pthread_mutex_unlock( &dsth->mutex_dlist_add_remove ); + } + + } // for + + if ( cur_time >= next_time_timeout_check ) { + + s_socket_all_check_activity( tn, cur_time ); + next_time_timeout_check = cur_time + SOCKETS_TIMEOUT_CHECK_PERIOD; + } + + pthread_mutex_lock( &dsth->mutex_dlist_add_remove ); + if ( !dsth->to_kill_count ) { + + pthread_mutex_unlock( &dsth->mutex_dlist_add_remove ); + continue; + } + + dap_cur = dsth->dap_clients_to_kill; + + do { + + if ( dap_cur->no_close ) { + dap_cur = dap_cur->knext; + continue; + } + + log_it( L_INFO, "Kill %u socket ...............[ thread %u ]", dap_cur->socket, tn ); + + tmp = dap_cur->knext; + DL_LIST_REMOVE_NODE( dsth->dap_clients_to_kill, dap_cur, kprev, knext, dsth->to_kill_count ); + + dap_server_remove_socket( dap_cur ); + dap_client_remote_remove( dap_cur ); + dap_cur = tmp; + + } while ( dap_cur ); + + log_it( L_INFO, "[ Thread %u ] coneections: %u, to kill: %u", tn, dsth->connections_count, dsth->to_kill_count ); + pthread_mutex_unlock( &dsth->mutex_dlist_add_remove ); + + } while( !bQuitSignal ); + + return NULL; +} + +/* +========================================================= + dap_server_loop( ) + + Main server loop + + @param a_server Server instance + @return Zero if ok others if not +========================================================= +*/ +int32_t dap_server_loop( dap_server_t *d_server ) +{ + static uint32_t pickthread = 0; // just for test + pthread_t thread_listener[ DAP_MAX_THREADS ]; + + if ( !d_server ) return 1; + + for( uint32_t i = 0; i < _count_threads; ++i ) { + + EPOLL_HANDLE efd = epoll_create1( 0 ); +// log_it( L_ERROR, "EPOLL_HANDLE efd %u for thread %u created", efd, i ); + if ( (intptr_t)efd == -1 ) { + log_it( L_ERROR, "Server wakeup no events / error" ); + goto error; + } + dap_server_threads[ i ].epoll_fd = efd; + dap_server_threads[ i ].thread_num = i; + } + + for( uint32_t i = 0; i < _count_threads; ++i ) { + pthread_create( &thread_listener[i], NULL, thread_loop, &dap_server_threads[i] ); + } + + _current_run_server = d_server; + + EPOLL_HANDLE efd = epoll_create1( 0 ); + if ( (intptr_t)efd == -1 ) + goto error; + + struct epoll_event pev; + struct epoll_event events[ 16 ]; + + pev.events = EPOLLIN | EPOLLERR; + pev.data.fd = d_server->socket_listener; + + if( epoll_ctl( efd, EPOLL_CTL_ADD, d_server->socket_listener, &pev) != 0 ) { + log_it( L_ERROR, "epoll_ctl failed 004" ); + goto error; + } + + while( !bQuitSignal ) { + + int32_t n = epoll_wait( efd, &events[0], 16, -1 ); + + if ( bQuitSignal ) + break; + + if ( n <= 0 ) { + if ( errno == EINTR ) + continue; + log_it( L_ERROR, "Server wakeup no events / error" ); + break; + } + + for( int32_t i = 0; i < n; ++ i ) { + + if ( events[i].events & EPOLLIN ) { + + int client_fd = accept( events[i].data.fd, 0, 0 ); + + if ( client_fd < 0 ) { + log_it( L_ERROR, "accept_cb: error accept socket"); + continue; + } + + set_nonblock_socket( client_fd ); + dap_server_add_socket( client_fd, -1 ); + } + else if( events[i].events & EPOLLERR ) { + log_it( L_ERROR, "Server socket error event" ); + goto exit; + } + + } // for + + } // while + +exit:; + + #ifndef _WIN32 + close( efd ); + #else + epoll_close( efd ); + #endif +error:; + + bQuitSignal = true; + + for( uint32_t i = 0; i < _count_threads; ++i ) { + if ( (intptr_t)dap_server_threads[ i ].epoll_fd != -1 ) { + #ifndef _WIN32 + close( dap_server_threads[ i ].epoll_fd ); + #else + epoll_close( dap_server_threads[ i ].epoll_fd ); + #endif + } + } + + return 0; +} diff --git a/libdap-server-core/src/dap_traffic_track.c b/libdap-server-core/src/dap_traffic_track.c new file mode 100755 index 0000000000000000000000000000000000000000..ad9c6c661d8ea3dfb9b303a9fc4a4bb3e29f02d9 --- /dev/null +++ b/libdap-server-core/src/dap_traffic_track.c @@ -0,0 +1,228 @@ +/* + * Authors: + * Anatoliy Kurotich <anatoliy.kurotich@demlabs.net> + * DeM Labs Inc. https://demlabs.net + * Kelvin Project https://github.com/kelvinblockchain + * Copyright (c) 2017-2018 + * All rights reserved. + + This file is part of DAP (Deus Applications Prototypes) the open source project + + DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + DAP is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include <string.h> +#include <time.h> +#include <stdio.h> +#include <stdlib.h> +#include <stddef.h> + +#ifndef _WIN32 +#include <pthread.h> +#include <ev.h> +#else +#include <winsock2.h> +#include <windows.h> +#include <mswsock.h> +#include <ws2tcpip.h> +#include <io.h> +#include <pthread.h> +#endif + +#include "dap_traffic_track.h" +#include "dap_common.h" +#include "dap_cpu_monitor.h" + +#define LOG_TAG "dap_traffic_track" + +#define BITS_IN_BYTE 8 +#define ALLOC_STEP 100 + +static dap_traffic_callback_t _callback = NULL; +static dap_server_t *_dap_server; + +#ifndef _WIN32 +static ev_timer _timeout_watcher; +static struct ev_loop *loop; +#else +static HANDLE _timeout_watcher; +#endif +static size_t timertimeout = 1; + +static pthread_mutex_t _mutex = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t _cond = PTHREAD_COND_INITIALIZER; +static pthread_t worker_thread; +static bool _stop_worker_signal = false; + + +/** + * @brief calculate_mbits_speed + * @param count_bytes + * @details timeout we gots from _timeout_watcher.repeat + * @return mbit/second speed + */ +static double _calculate_mbits_speed( size_t count_bytes ) +{ + size_t bits_per_second = (count_bytes / timertimeout) * BITS_IN_BYTE; + // log_it(L_DEBUG, "TIMEOUT: %d, bits_per_second: %d mbits: %f", + // (size_t)_timeout_watcher.repeat, bits_per_second, bits_per_second / 1000000.0); + return (double)bits_per_second / 1000000.0; // convert to mbits +} + +void *_worker_run( void *a ) +{ + (void)a; + + pthread_mutex_lock( &_mutex ); + + while( true ) { + pthread_cond_wait( &_cond, &_mutex ); + if ( _stop_worker_signal ) { + log_it(L_INFO, "Dap traffic track worker stopped"); + _stop_worker_signal = false; + break; + } + _callback( _dap_server ); + } + + pthread_mutex_unlock( &_mutex ); + pthread_exit( NULL ); + + return NULL; +} + +void _worker_start( ) +{ + pthread_mutex_init( &_mutex, NULL ); + pthread_cond_init( &_cond, NULL ); + pthread_create( &worker_thread, NULL, _worker_run, NULL ); +} + +void _worker_stop() +{ + pthread_mutex_lock( &_mutex ); + _stop_worker_signal = true; + pthread_cond_signal( &_cond ); + pthread_mutex_unlock( &_mutex ); + + // wait for exit worker_thread + pthread_join( worker_thread, NULL ); + + pthread_mutex_destroy( &_mutex ); + pthread_cond_destroy( &_cond ); + _callback = NULL; +} + +#ifndef _WIN32 +static void _timeout_cb( ) +#else +VOID CALLBACK _timeout_cb( void *lpParameter, BOOL TimerOrWaitFired ) +#endif +{ +#if 0 + pthread_mutex_lock( &_dap_server->mutex_on_hash ); + + size_t count_users = HASH_COUNT(_dap_server->clients ); + + if ( count_users ) { +// size_t idx = 0; + dap_client_remote_t *dap_cur, *tmp; + HASH_ITER( hh, _dap_server->clients, dap_cur, tmp ) { + + dap_cur->upload_stat.speed_mbs = _calculate_mbits_speed( dap_cur->upload_stat.buf_size_total - + dap_cur->upload_stat.buf_size_total_old ); + + dap_cur->upload_stat.buf_size_total_old = dap_cur->upload_stat.buf_size_total; + + dap_cur->download_stat.speed_mbs = _calculate_mbits_speed( dap_cur->download_stat.buf_size_total - + dap_cur->download_stat.buf_size_total_old ); + + dap_cur->download_stat.buf_size_total_old = dap_cur->download_stat.buf_size_total; + +// idx ++; + } + } + + /* TODO find some better solution and place for this line */ + _dap_server->cpu_stats = dap_cpu_get_stats( ); + + pthread_mutex_unlock( &_dap_server->mutex_on_hash ); +#endif + + if ( _callback != NULL ) { + pthread_mutex_lock( &_mutex ); + pthread_cond_signal( &_cond ); + pthread_mutex_unlock( &_mutex ); + } +} + +void dap_traffic_track_init( dap_server_t * server, + time_t timeout ) +{ + dap_cpu_monitor_init( ); + + _dap_server = server; +#ifndef _WIN32 + _timeout_watcher.repeat = timeout; + + loop = EV_DEFAULT; + + ev_init( &_timeout_watcher, _timeout_cb ); + ev_timer_again( loop, &_timeout_watcher ); +#else + + timertimeout = timeout; + + CreateTimerQueueTimer( &_timeout_watcher, NULL, (WAITORTIMERCALLBACK)_timeout_cb, NULL, timertimeout, timertimeout, 0 ); + +#endif + + log_it(L_NOTICE, "Initialized traffic track module"); +} + +void dap_traffic_track_deinit() +{ + if ( _callback != NULL ) + _worker_stop(); + +#ifndef _WIN32 + ev_timer_stop( loop, &_timeout_watcher ); + ev_loop_destroy( loop ); +#else + DeleteTimerQueueTimer( NULL, _timeout_watcher, NULL ); +#endif + + log_it( L_NOTICE, "Deinitialized traffic track module" ); + dap_cpu_monitor_deinit( ); +} + +void dap_traffic_callback_stop() { + + if ( _callback == NULL ) { + log_it( L_WARNING, "worker not running" ); + return; + } + _worker_stop(); +} + +void dap_traffic_callback_set(dap_traffic_callback_t cb) +{ + if( _callback == NULL ) { + _callback = cb; + _worker_start(); + return; + } + + log_it( L_WARNING, "Callback already setted" ); +} diff --git a/libdap-server-core/test/CMakeLists.txt b/libdap-server-core/test/CMakeLists.txt new file mode 100755 index 0000000000000000000000000000000000000000..1149697b7fd05108808487b46e94be484231cdd7 --- /dev/null +++ b/libdap-server-core/test/CMakeLists.txt @@ -0,0 +1,17 @@ +project(server_core_test) + +if ( NOT ( TARGET dap_test ) ) + add_subdirectory(libdap-test) +endif() + +file(GLOB DAP_SERVER_CORE_TEST_SOURCES *.c) +file(GLOB DAP_SERVER_CORE_TEST_HEADERS *.h) + +add_executable(${PROJECT_NAME} ${DAP_SERVER_CORE_TEST_SOURCES} ${DAP_SERVER_CORE_TEST_HEADERS}) + +target_link_libraries(${PROJECT_NAME} dap_test dap_core dap_crypto dap_server_core ev) + +add_test( + NAME server_core_test + COMMAND server_core_test +) diff --git a/libdap-server-core/test/dap_traffic_track_test.c b/libdap-server-core/test/dap_traffic_track_test.c new file mode 100755 index 0000000000000000000000000000000000000000..a7f70ddcf478f7f2ab9aa6d961381a8aed5c051d --- /dev/null +++ b/libdap-server-core/test/dap_traffic_track_test.c @@ -0,0 +1,69 @@ +#include "dap_traffic_track_test.h" +#include <unistd.h> +#include <ev.h> +#include <math.h> +#if 0 + +static struct ev_loop *loop; + +static struct moc_dap_clients_remote { + dap_client_remote_t ** clients; + size_t count; +} moc_dap_clients_remote; + +static dap_server_t * _dap_server; + +// false == test failed +static bool is_callback_result_success = false; + +static void success_callback(dap_server_t* server) +{ + dap_pass_msg("Call success_callback"); + pthread_mutex_lock(&_dap_server->mutex_on_hash); + size_t cnt = HASH_COUNT(server->clients); + pthread_mutex_unlock(&_dap_server->mutex_on_hash); + dap_assert(cnt == moc_dap_clients_remote.count, "Dap server amount clients"); + is_callback_result_success = true; +} + +static void test_callback() { + time_t timeout_sucess = 1; + dap_traffic_track_init(_dap_server, timeout_sucess); + dap_traffic_callback_set(success_callback); + + loop = EV_DEFAULT; + ev_run(loop, EVRUN_ONCE); + + usleep(10000); // wait for callback + dap_assert(is_callback_result_success, "Callback_result"); + dap_traffic_callback_stop(); +} + + +void init_test_case() { + _dap_server = DAP_NEW_Z(dap_server_t); + moc_dap_clients_remote.count = 111; + moc_dap_clients_remote.clients = calloc(moc_dap_clients_remote.count, + sizeof(dap_client_remote_t *)); + for(size_t i = 0, j = 0; (i < moc_dap_clients_remote.count) && (j = i + 1); i++) { + moc_dap_clients_remote.clients[i] = + dap_client_remote_create(_dap_server, j, NULL); + } +} + +void cleanup_test_case() { + for(size_t i = 0; i < moc_dap_clients_remote.count; i++) + dap_client_remote_remove(moc_dap_clients_remote.clients[i], _dap_server); + ev_loop_destroy(loop); + DAP_DELETE(moc_dap_clients_remote.clients); + DAP_DELETE(_dap_server); +} + + +void dap_traffic_track_tests_run(void) { + dap_print_module_name("traffic_track"); + init_test_case(); + test_callback(); + cleanup_test_case(); +} +#endif diff --git a/libdap-server-core/test/dap_traffic_track_test.h b/libdap-server-core/test/dap_traffic_track_test.h new file mode 100755 index 0000000000000000000000000000000000000000..8aad876346d98cca5e82e138c92ffe5d324d1cac --- /dev/null +++ b/libdap-server-core/test/dap_traffic_track_test.h @@ -0,0 +1,7 @@ +#pragma once +#include "dap_test.h" +#include "dap_traffic_track.h" +#include "dap_common.h" +#include "dap_server.h" + +extern void dap_traffic_track_tests_run(void); diff --git a/libdap-server-core/test/libdap-test b/libdap-server-core/test/libdap-test new file mode 160000 index 0000000000000000000000000000000000000000..b76175acc517f085c319c8e66c62bd143f96bf94 --- /dev/null +++ b/libdap-server-core/test/libdap-test @@ -0,0 +1 @@ +Subproject commit b76175acc517f085c319c8e66c62bd143f96bf94 diff --git a/libdap-server-core/test/main.c b/libdap-server-core/test/main.c new file mode 100755 index 0000000000000000000000000000000000000000..611533844021795eaaac8db8156e787e6cc0816b --- /dev/null +++ b/libdap-server-core/test/main.c @@ -0,0 +1,9 @@ +#include "dap_common.h" +#include "dap_traffic_track_test.h" + +int main(int argc, const char * argv[]) { + //dap_log_level_set(L_CRITICAL); + //dap_traffic_track_tests_run(); + return 0; +} +