Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • cellframe/libdap-server-core
1 result
Show changes
Commits on Source (35)
[submodule "libdap"]
path = libdap
url = https://github.com/kelvinblockchain/libdap
url = https://gitlab.demlabs.net/cellframe/libdap
branch = master
[submodule "libdap-crypto"]
path = libdap-crypto
url = https://github.com/cellframe/libdap-crypto
url = https://gitlab.demlabs.net/cellframe/libdap-crypto
branch = master
[submodule "test/libdap-test"]
path = test/libdap-test
url = https://github.com/kelvinblockchain/libdap-test
url = https://gitlab.demlabs.net/cellframe/libdap-test
branch = master
......@@ -17,12 +17,36 @@ if(NOT (${SUBMODULES_NO_BUILD} MATCHES ON))
endif()
file(GLOB DAP_SERVER_CORE_SOURCES src/*.c)
file(GLOB DAP_SERVER_CORE_HEADERS include/*.h)
if(WIN32)
file(GLOB DAP_SERVER_CORE_SOURCES src/*.c ../sources/wepoll/*.c)
file(GLOB DAP_SERVER_CORE_HEADERS include/*.h ../sources/wepoll/*.h)
else()
file(GLOB DAP_SERVER_CORE_SOURCES src/*.c)
file(GLOB DAP_SERVER_CORE_HEADERS include/*.h)
endif()
if(WIN32)
include_directories(../libdap/src/win32/)
include_directories(../3rdparty/uthash/src/)
include_directories(../3rdparty/libjson-c/)
include_directories(../3rdparty/curl/include/)
endif()
add_library(${PROJECT_NAME} STATIC ${DAP_SERVER_CORE_HEADERS} ${DAP_SERVER_CORE_SOURCES})
target_link_libraries(${PROJECT_NAME} dap_core dap_crypto pthread memcached ev)
if(WIN32)
target_link_libraries(${PROJECT_NAME} dap_core dap_crypto)
endif()
if(UNIX)
target_link_libraries(${PROJECT_NAME} dap_core dap_crypto ev)
if(NOT ANDROID)
target_link_libraries(${PROJECT_NAME} pthread)
endif()
endif()
target_include_directories(${PROJECT_NAME} PUBLIC include)
target_include_directories(${PROJECT_NAME} PRIVATE src)
......
/*
* Authors:
* Dmitriy A. Gearasimov <gerasimov.dmitriy@demlabs.net>
* DeM Labs Inc. https://demlabs.net
* Kelvin Project https://github.com/kelvinblockchain
* Copyright (c) 2017-2018
* All rights reserved.
This file is part of DAP (Deus Applications Prototypes) the open source project
DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
DAP is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with any DAP based project. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <stdint.h>
#include <stddef.h>
#include <stdbool.h>
#include "uthash.h"
#ifndef WIN32
#include <sys/epoll.h>
#include <sys/timerfd.h>
#endif
#ifndef EPOLL_HANDLE
#ifndef WIN32
#define EPOLL_HANDLE int
#else
#define EPOLL_HANDLE HANDLE
#endif
#endif
typedef char str_ip[16];
typedef struct dap_server dap_server_t;
typedef struct dap_server_thread_s dap_server_thread_t;
struct dap_client_remote;
typedef void (*dap_server_client_callback_t) (struct dap_client_remote *,void * arg); // Callback for specific client operations
#define DAP_CLIENT_REMOTE_BUF 500000
#define CLIENT_ID_SIZE 12
typedef char dap_server_client_id[CLIENT_ID_SIZE];
typedef struct traffic_stats {
size_t buf_size_total;
size_t buf_size_total_old; // for calculate speed
double speed_mbs; // MegaBits per second
} traffic_stats_t;
typedef struct dap_client_remote {
int socket;
dap_server_client_id id;
uint32_t flags;
bool no_close;
bool kill_signal;
uint16_t port;
str_ip s_ip;
uint32_t buf_out_zero_count;
char buf_in[DAP_CLIENT_REMOTE_BUF]; // Internal buffer for input data
size_t buf_in_size; // size of data that is in the input buffer
traffic_stats_t upload_stat;
traffic_stats_t download_stat;
char buf_out[DAP_CLIENT_REMOTE_BUF]; // Internal buffer for output data
size_t buf_out_offset;
char hostaddr[1024]; // Address
char service[128];
size_t buf_out_size; // size of data that is in the output buffer
struct epoll_event pevent;
EPOLL_HANDLE efd; // Epoll fd
int tn; // working thread index
time_t time_connection;
time_t last_time_active;
struct dap_server *server;
dap_server_thread_t *thread;
UT_hash_handle hh;
struct dap_client_remote *next, *prev;
struct dap_client_remote *knext, *kprev;
void *_internal;
void *_inheritor; // Internal data to specific client type, usualy states for state machine
} dap_client_remote_t; // Node of bidirectional list of clients
int dap_client_remote_init( void ); // Init clients module
void dap_client_remote_deinit( void ); // Deinit clients module
dap_client_remote_t *dap_client_remote_create( dap_server_t *sh, int s, dap_server_thread_t *dsth );
dap_client_remote_t *dap_client_remote_find( int sock, dap_server_thread_t *t );
bool dap_client_remote_is_ready_to_read( dap_client_remote_t * sc );
bool dap_client_remote_is_ready_to_write( dap_client_remote_t * sc );
void dap_client_remote_ready_to_read( dap_client_remote_t * sc,bool is_ready );
void dap_client_remote_ready_to_write( dap_client_remote_t * sc,bool is_ready );
size_t dap_client_remote_write( dap_client_remote_t *sc, const void * data, size_t data_size );
size_t dap_client_remote_write_f( dap_client_remote_t *a_client, const char * a_format,... );
size_t dap_client_remote_read( dap_client_remote_t *sc, void * data, size_t data_size );
//void dap_client_remote_remove( dap_client_remote_t *sc, struct dap_server * sh ); // Removes the client from the list
void dap_client_remote_remove( dap_client_remote_t *sc );
void dap_client_remote_shrink_buf_in( dap_client_remote_t * cl, size_t shrink_size );
/*
* Authors:
* Dmitriy A. Gearasimov <gerasimov.dmitriy@demlabs.net>
* DeM Labs Inc. https://demlabs.net
* Kelvin Project https://github.com/kelvinblockchain
* Copyright (c) 2017-2018
* All rights reserved.
This file is part of DAP (Deus Applications Prototypes) the open source project
DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
DAP is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with any DAP based project. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <stdint.h>
#include <stddef.h>
#include <stdbool.h>
#include "uthash.h"
#ifndef WIN32
#include <sys/epoll.h>
#include <sys/timerfd.h>
#endif
#ifndef EPOLL_HANDLE
#ifndef WIN32
#define EPOLL_HANDLE int
#else
#define EPOLL_HANDLE HANDLE
#include "wepoll.h"
#endif
#endif
typedef char str_ip[16];
typedef struct dap_server dap_server_t;
typedef struct dap_server_thread_s dap_server_thread_t;
struct dap_client_remote;
typedef void (*dap_server_client_callback_t) (struct dap_client_remote *,void * arg); // Callback for specific client operations
#define DAP_CLIENT_REMOTE_BUF 500000
#define CLIENT_ID_SIZE 12
typedef char dap_server_client_id[CLIENT_ID_SIZE];
typedef struct traffic_stats {
size_t buf_size_total;
size_t buf_size_total_old; // for calculate speed
double speed_mbs; // MegaBits per second
} traffic_stats_t;
typedef struct dap_client_remote {
int socket;
dap_server_client_id id;
uint32_t flags;
bool no_close;
bool kill_signal;
uint16_t port;
str_ip s_ip;
uint32_t buf_out_zero_count;
char buf_in[DAP_CLIENT_REMOTE_BUF]; // Internal buffer for input data
size_t buf_in_size; // size of data that is in the input buffer
traffic_stats_t upload_stat;
traffic_stats_t download_stat;
char buf_out[DAP_CLIENT_REMOTE_BUF]; // Internal buffer for output data
size_t buf_out_offset;
char hostaddr[1024]; // Address
char service[128];
size_t buf_out_size; // size of data that is in the output buffer
struct epoll_event pevent;
EPOLL_HANDLE efd; // Epoll fd
int tn; // working thread index
time_t time_connection;
time_t last_time_active;
struct dap_server *server;
dap_server_thread_t *thread;
UT_hash_handle hh;
struct dap_client_remote *next, *prev;
struct dap_client_remote *knext, *kprev;
void *_internal;
void *_inheritor; // Internal data to specific client type, usualy states for state machine
} dap_client_remote_t; // Node of bidirectional list of clients
int dap_client_remote_init( void ); // Init clients module
void dap_client_remote_deinit( void ); // Deinit clients module
dap_client_remote_t *dap_client_remote_create( dap_server_t *sh, int s, dap_server_thread_t *dsth );
dap_client_remote_t *dap_client_remote_find( int sock, dap_server_thread_t *t );
bool dap_client_remote_is_ready_to_read( dap_client_remote_t * sc );
bool dap_client_remote_is_ready_to_write( dap_client_remote_t * sc );
void dap_client_remote_ready_to_read( dap_client_remote_t * sc,bool is_ready );
void dap_client_remote_ready_to_write( dap_client_remote_t * sc,bool is_ready );
size_t dap_client_remote_write( dap_client_remote_t *sc, const void * data, size_t data_size );
size_t dap_client_remote_write_f( dap_client_remote_t *a_client, const char * a_format,... );
size_t dap_client_remote_read( dap_client_remote_t *sc, void * data, size_t data_size );
//void dap_client_remote_remove( dap_client_remote_t *sc, struct dap_server * sh ); // Removes the client from the list
void dap_client_remote_remove( dap_client_remote_t *sc );
void dap_client_remote_shrink_buf_in( dap_client_remote_t * cl, size_t shrink_size );
/*
* Authors:
* Dmitriy A. Gearasimov <gerasimov.dmitriy@demlabs.net>
* DeM Labs Inc. https://demlabs.net
* Kelvin Project https://github.com/kelvinblockchain
* Copyright (c) 2017-2019
* All rights reserved.
This file is part of DAP (Deus Applications Prototypes) the open source project
DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
DAP is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with any DAP based project. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#ifndef WIN32
#include <netinet/in.h>
#include <stdint.h>
#include <pthread.h>
#define EPOLL_HANDLE int
#else
#define EPOLL_HANDLE HANDLE
#endif
#include "uthash.h"
#include "dap_events_socket.h"
#include "dap_server.h"
struct dap_events;
typedef void (*dap_events_callback_t) (struct dap_events *, void *arg); // Callback for specific server's operations
typedef struct dap_thread {
pthread_t tid;
} dap_thread_t;
struct dap_worker;
typedef struct dap_events {
dap_events_socket_t *sockets; // Hashmap of event sockets
dap_events_socket_t *dlsockets; // Dlist of event sockets
dap_events_socket_t *to_kill_sockets; // Dlist of event sockets
pthread_rwlock_t sockets_rwlock;
void *_inheritor; // Pointer to the internal data, HTTP for example
dap_thread_t proc_thread;
pthread_rwlock_t servers_rwlock;
} dap_events_t;
typedef struct dap_worker
{
uint32_t event_sockets_count;
uint32_t event_to_kill_count;
EPOLL_HANDLE epoll_fd;
uint32_t number_thread;
pthread_mutex_t locker_on_count;
dap_events_t *events;
} dap_worker_t;
int32_t dap_events_init( uint32_t a_threads_count, size_t conn_t ); // Init server module
void dap_events_deinit( ); // Deinit server module
void dap_events_thread_wake_up( dap_thread_t *th );
dap_events_t* dap_events_new( );
void dap_events_delete( dap_events_t * sh );
//void dap_events_socket_remove_and_delete( dap_events_socket_t* a_es );
void dap_events_socket_remove_and_delete(dap_events_socket_t* a_es, bool preserve_inheritor );
void dap_events_kill_socket( dap_events_socket_t *a_es );
int32_t dap_events_start( dap_events_t *sh );
int32_t dap_events_wait( dap_events_t *sh );
uint32_t dap_worker_get_index_min( );
dap_worker_t *dap_worker_get_min( );
void dap_worker_add_events_socket( dap_events_socket_t * a_events_socket );
void dap_worker_print_all( );
/*
* Authors:
* Dmitriy A. Gearasimov <gerasimov.dmitriy@demlabs.net>
* DeM Labs Inc. https://demlabs.net
* Kelvin Project https://github.com/kelvinblockchain
* Copyright (c) 2017-2019
* All rights reserved.
This file is part of DAP (Deus Applications Prototypes) the open source project
DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
DAP is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with any DAP based project. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#ifndef WIN32
#include <netinet/in.h>
#include <stdint.h>
#include <pthread.h>
#define EPOLL_HANDLE int
#else
#define EPOLL_HANDLE HANDLE
#endif
#include "uthash.h"
#include "dap_events_socket.h"
#include "dap_server.h"
struct dap_events;
typedef void (*dap_events_callback_t) (struct dap_events *, void *arg); // Callback for specific server's operations
typedef struct dap_thread {
pthread_t tid;
} dap_thread_t;
struct dap_worker;
typedef struct dap_events {
dap_events_socket_t *sockets; // Hashmap of event sockets
dap_events_socket_t *dlsockets; // Dlist of event sockets
dap_events_socket_t *to_kill_sockets; // Dlist of event sockets
pthread_rwlock_t sockets_rwlock;
void *_inheritor; // Pointer to the internal data, HTTP for example
dap_thread_t proc_thread;
pthread_rwlock_t servers_rwlock;
} dap_events_t;
typedef struct dap_worker
{
uint32_t event_sockets_count;
//uint32_t event_to_kill_count;
EPOLL_HANDLE epoll_fd;
uint32_t number_thread;
pthread_mutex_t locker_on_count;
dap_events_t *events;
} dap_worker_t;
int32_t dap_events_init( uint32_t a_threads_count, size_t conn_t ); // Init server module
void dap_events_deinit( ); // Deinit server module
void dap_events_thread_wake_up( dap_thread_t *th );
dap_events_t* dap_events_new( );
void dap_events_delete( dap_events_t * sh );
int32_t dap_events_start( dap_events_t *sh );
int32_t dap_events_wait( dap_events_t *sh );
uint32_t dap_worker_get_index_min( );
dap_worker_t *dap_worker_get_min( );
void dap_worker_add_events_socket( dap_events_socket_t * a_events_socket );
void dap_worker_print_all( );
/*
* Authors:
* Dmitriy A. Gearasimov <gerasimov.dmitriy@demlabs.net>
* DeM Labs Inc. https://demlabs.net
* Kelvin Project https://github.com/kelvinblockchain
* Copyright (c) 2017-2019
* All rights reserved.
This file is part of DAP (Deus Applications Prototypes) the open source project
DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
DAP is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with any DAP based project. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <stdint.h>
#include <stddef.h>
#include <stdbool.h>
#include "uthash.h"
#ifndef _WIN32
#include <sys/epoll.h>
#endif
struct dap_events;
struct dap_events_socket;
struct dap_worker;
typedef struct dap_server dap_server_t;
typedef void (*dap_events_socket_callback_t) (struct dap_events_socket *,void * arg); // Callback for specific client operations
typedef struct dap_events_socket_callbacks {
dap_events_socket_callback_t new_callback; // Create new client callback
dap_events_socket_callback_t delete_callback; // Delete client callback
dap_events_socket_callback_t read_callback; // Read function
dap_events_socket_callback_t write_callback; // Write function
dap_events_socket_callback_t error_callback; // Error processing function
} dap_events_socket_callbacks_t;
#define DAP_EVENTS_SOCKET_BUF 100000
#if 0
typedef struct dap_events_socket {
int socket;
bool signal_close;
bool _ready_to_write;
bool _ready_to_read;
uint32_t buf_out_zero_count;
union{
uint8_t buf_in[DAP_EVENTS_SOCKET_BUF+1]; // Internal buffer for input data
char buf_in_str[DAP_EVENTS_SOCKET_BUF+1];
};
size_t buf_in_size; // size of data that is in the input buffer
uint8_t buf_out[DAP_EVENTS_SOCKET_BUF+1]; // Internal buffer for output data
char hostaddr[1024]; // Address
char service[128];
size_t buf_out_size; // size of data that is in the output buffer
struct dap_events *events;
struct dap_worker *dap_worker;
dap_events_socket_callbacks_t *callbacks;
time_t time_connection;
time_t last_ping_request;
bool is_pingable;
UT_hash_handle hh;
void * _inheritor; // Inheritor data to specific client type, usualy states for state machine
} dap_events_socket_t; // Node of bidirectional list of clients
#endif
typedef struct dap_events_socket {
int32_t socket;
uint32_t flags;
// bool signal_close;
bool no_close;
bool kill_signal;
// bool _ready_to_write;
// bool _ready_to_read;
uint32_t buf_out_zero_count;
union{
uint8_t buf_in[DAP_EVENTS_SOCKET_BUF+1]; // Internal buffer for input data
char buf_in_str[DAP_EVENTS_SOCKET_BUF+1];
};
size_t buf_in_size; // size of data that is in the input buffer
uint8_t buf_out[DAP_EVENTS_SOCKET_BUF+1]; // Internal buffer for output data
char hostaddr[1024]; // Address
char service[128];
size_t buf_out_size; // size of data that is in the output buffer
struct dap_events *events;
struct dap_worker *dap_worker;
struct epoll_event ev;
dap_events_socket_callbacks_t *callbacks;
time_t time_connection;
time_t last_time_active;
time_t last_ping_request;
bool is_pingable;
UT_hash_handle hh;
struct dap_events_socket *next, *prev;
struct dap_events_socket *knext, *kprev;
void *_inheritor; // Inheritor data to specific client type, usualy states for state machine
} dap_events_socket_t; // Node of bidirectional list of clients
int dap_events_socket_init(); // Init clients module
void dap_events_socket_deinit(); // Deinit clients module
void dap_events_socket_create_after(dap_events_socket_t * a_es);
dap_events_socket_t * dap_events_socket_wrap_no_add(struct dap_events * a_events,
int s, dap_events_socket_callbacks_t * a_callbacks); // Create new client and add it to the list
dap_events_socket_t * dap_events_socket_find(int sock, struct dap_events * sh); // Find client by socket
bool dap_events_socket_is_ready_to_read(dap_events_socket_t * sc);
bool dap_events_socket_is_ready_to_write(dap_events_socket_t * sc);
void dap_events_socket_set_readable(dap_events_socket_t * sc,bool is_ready);
void dap_events_socket_set_writable(dap_events_socket_t * sc,bool is_ready);
size_t dap_events_socket_write(dap_events_socket_t *sc, const void * data, size_t data_size);
size_t dap_events_socket_write_f(dap_events_socket_t *sc, const char * format,...);
size_t dap_events_socket_read(dap_events_socket_t *sc, void * data, size_t data_size);
void dap_events_socket_delete(dap_events_socket_t *sc,bool preserve_inheritor); // Removes the client from the list
void dap_events_socket_shrink_buf_in(dap_events_socket_t * cl, size_t shrink_size);
/*
* Authors:
* Dmitriy A. Gearasimov <gerasimov.dmitriy@demlabs.net>
* DeM Labs Inc. https://demlabs.net
* Kelvin Project https://github.com/kelvinblockchain
* Copyright (c) 2017-2019
* All rights reserved.
This file is part of DAP (Deus Applications Prototypes) the open source project
DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
DAP is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with any DAP based project. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <stdint.h>
#include <stddef.h>
#include <stdbool.h>
#include "uthash.h"
#ifndef _WIN32
#include <sys/epoll.h>
#else
#include "wepoll.h"
#endif
struct dap_events;
struct dap_events_socket;
struct dap_worker;
typedef struct dap_server dap_server_t;
typedef void (*dap_events_socket_callback_t) (struct dap_events_socket *,void * arg); // Callback for specific client operations
typedef struct dap_events_socket_callbacks {
dap_events_socket_callback_t new_callback; // Create new client callback
dap_events_socket_callback_t delete_callback; // Delete client callback
dap_events_socket_callback_t read_callback; // Read function
dap_events_socket_callback_t write_callback; // Write function
dap_events_socket_callback_t error_callback; // Error processing function
} dap_events_socket_callbacks_t;
#define DAP_EVENTS_SOCKET_BUF 100000
#if 0
typedef struct dap_events_socket {
int socket;
bool signal_close;
bool _ready_to_write;
bool _ready_to_read;
uint32_t buf_out_zero_count;
union{
uint8_t buf_in[DAP_EVENTS_SOCKET_BUF+1]; // Internal buffer for input data
char buf_in_str[DAP_EVENTS_SOCKET_BUF+1];
};
size_t buf_in_size; // size of data that is in the input buffer
uint8_t buf_out[DAP_EVENTS_SOCKET_BUF+1]; // Internal buffer for output data
char hostaddr[1024]; // Address
char service[128];
size_t buf_out_size; // size of data that is in the output buffer
struct dap_events *events;
struct dap_worker *dap_worker;
dap_events_socket_callbacks_t *callbacks;
time_t time_connection;
time_t last_ping_request;
bool is_pingable;
UT_hash_handle hh;
void * _inheritor; // Inheritor data to specific client type, usualy states for state machine
} dap_events_socket_t; // Node of bidirectional list of clients
#endif
typedef struct dap_events_socket {
int32_t socket;
uint32_t flags;
// bool signal_close;
bool no_close;
bool kill_signal;
// bool _ready_to_write;
// bool _ready_to_read;
uint32_t buf_out_zero_count;
union{
uint8_t buf_in[DAP_EVENTS_SOCKET_BUF+1]; // Internal buffer for input data
char buf_in_str[DAP_EVENTS_SOCKET_BUF+1];
};
size_t buf_in_size; // size of data that is in the input buffer
uint8_t buf_out[DAP_EVENTS_SOCKET_BUF+1]; // Internal buffer for output data
char hostaddr[1024]; // Address
char service[128];
size_t buf_out_size; // size of data that is in the output buffer
struct dap_events *events;
struct dap_worker *dap_worker;
struct epoll_event ev;
dap_events_socket_callbacks_t *callbacks;
time_t time_connection;
time_t last_time_active;
time_t last_ping_request;
bool is_pingable;
UT_hash_handle hh;
struct dap_events_socket *next, *prev;
struct dap_events_socket *knext, *kprev;
void *_inheritor; // Inheritor data to specific client type, usualy states for state machine
} dap_events_socket_t; // Node of bidirectional list of clients
int dap_events_socket_init(); // Init clients module
void dap_events_socket_deinit(); // Deinit clients module
void dap_events_socket_create_after(dap_events_socket_t * a_es);
dap_events_socket_t * dap_events_socket_wrap_no_add(struct dap_events * a_events,
int s, dap_events_socket_callbacks_t * a_callbacks); // Create new client and add it to the list
dap_events_socket_t * dap_events_socket_find(int sock, struct dap_events * sh); // Find client by socket
bool dap_events_socket_is_ready_to_read(dap_events_socket_t * sc);
bool dap_events_socket_is_ready_to_write(dap_events_socket_t * sc);
void dap_events_socket_set_readable(dap_events_socket_t * sc,bool is_ready);
void dap_events_socket_set_writable(dap_events_socket_t * sc,bool is_ready);
size_t dap_events_socket_write(dap_events_socket_t *sc, const void * data, size_t data_size);
size_t dap_events_socket_write_f(dap_events_socket_t *sc, const char * format,...);
size_t dap_events_socket_read(dap_events_socket_t *sc, void * data, size_t data_size);
void dap_events_socket_remove( dap_events_socket_t *a_es);
void dap_events_socket_delete(dap_events_socket_t *sc,bool preserve_inheritor); // Removes the client from the list
void dap_events_socket_remove_and_delete(dap_events_socket_t* a_es, bool preserve_inheritor );
int dap_events_socket_kill_socket( dap_events_socket_t *a_es );
void dap_events_socket_shrink_buf_in(dap_events_socket_t * cl, size_t shrink_size);
#pragma once
#include <stdint.h>
#include <stdbool.h>
#include "dap_common.h"
/**
* @brief dap_memcached_init
* @param server_host
* @param port
* @return
*/
int dap_memcached_init(const char *server_host, uint16_t port);
/**
* @brief is_dap_memcache_enable
* @return
*/
bool dap_memcache_is_enable(void);
/**
* @brief dap_memcached_deinit
*/
void dap_memcached_deinit(void);
/**
* @brief dap_memcache_put
* @param key
* @param value
* @param value_size
* @param expiration if 0 value is never expire
* @return
*/
bool dap_memcache_put(const char* key, void *value, size_t value_size, time_t expiration);
/**
* @brief dap_memcache_get
* @param key
* @return true if key found
*/
bool dap_memcache_get(const char* key, size_t * value_size, void ** result);
/**
* @brief dap_memcache_delete
* @param key
* @return
*/
bool dap_memcache_delete(const char* key);
/*
Copyright (c) 2017-2018 (c) Project "DeM Labs Inc" https://github.com/demlabsinc
All rights reserved.
This file is part of DAP (Deus Applications Prototypes) the open source project
DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
DAP is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with any DAP based project. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#ifndef _WIN32
#include <netinet/in.h>
#include <stdint.h>
#include <pthread.h>
#include <sys/epoll.h>
#include <sys/timerfd.h>
#define EPOLL_HANDLE int
#else
#define EPOLL_HANDLE HANDLE
#endif
#include "uthash.h"
#include "utlist.h"
#include "dap_cpu_monitor.h"
#include "dap_client_remote.h"
typedef enum dap_server_type {DAP_SERVER_TCP} dap_server_type_t;
#define BIT( x ) ( 1 << x )
#define DAP_SOCK_READY_TO_READ BIT( 0 )
#define DAP_SOCK_READY_TO_WRITE BIT( 1 )
#define DAP_SOCK_SIGNAL_CLOSE BIT( 2 )
#define DAP_SOCK_ACTIVE BIT( 3 )
typedef struct dap_server_thread_s {
EPOLL_HANDLE epoll_fd;
uint32_t thread_num;
uint32_t connections_count;
uint32_t to_kill_count;
struct epoll_event *epoll_events;
dap_client_remote_t *dap_remote_clients;
dap_client_remote_t *hclients; // Hashmap of clients
dap_client_remote_t *dap_clients_to_kill;
pthread_mutex_t mutex_dlist_add_remove;
pthread_mutex_t mutex_on_hash;
} dap_server_thread_t;
struct dap_server;
typedef void (*dap_server_callback_t)( struct dap_server *,void * arg ); // Callback for specific server's operations
typedef struct dap_server {
dap_server_type_t type; // Server's type
uint16_t port; // Listen port
char *address; // Listen address
int32_t socket_listener; // Socket for listener
EPOLL_HANDLE epoll_fd; // Epoll fd
struct sockaddr_in listener_addr; // Kernel structure for listener's binded address
void *_inheritor; // Pointer to the internal data, HTTP for example
dap_cpu_stats_t cpu_stats;
dap_server_callback_t server_delete_callback;
dap_server_client_callback_t client_new_callback; // Create new client callback
dap_server_client_callback_t client_delete_callback; // Delete client callback
dap_server_client_callback_t client_read_callback; // Read function
dap_server_client_callback_t client_write_callback; // Write function
dap_server_client_callback_t client_error_callback; // Error processing function
} dap_server_t;
int32_t dap_server_init( uint32_t count_threads ); // Init server module
void dap_server_deinit( void ); // Deinit server module
dap_server_t *dap_server_listen( const char *addr, uint16_t port, dap_server_type_t type );
int32_t dap_server_loop( dap_server_t *d_server );
#define DL_LIST_REMOVE_NODE( head, obj, _prev_, _next_, total ) \
\
if ( obj->_next_ ) { \
\
if ( obj->_prev_ ) \
obj->_next_->_prev_ = obj->_prev_; \
else { \
\
obj->_next_->_prev_ = NULL; \
head = obj->_next_; \
} \
} \
\
if ( obj->_prev_ ) { \
\
if ( obj->_next_ ) \
obj->_prev_->_next_ = obj->_next_; \
else { \
\
obj->_prev_->_next_ = NULL; \
} \
} \
-- total;
#define DL_LIST_ADD_NODE_HEAD( head, obj, _prev_, _next_, total )\
\
if ( !total ) { \
\
obj->_prev_ = NULL; \
obj->_next_ = NULL; \
\
head = obj; \
} \
else { \
\
head->_prev_ = obj; \
\
obj->_prev_ = NULL; \
obj->_next_ = head; \
\
head = obj; \
} \
++ total;
/*
Copyright (c) 2017-2018 (c) Project "DeM Labs Inc" https://github.com/demlabsinc
All rights reserved.
This file is part of DAP (Deus Applications Prototypes) the open source project
DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
DAP is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with any DAP based project. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#ifndef _WIN32
#include <netinet/in.h>
#include <stdint.h>
#include <sys/epoll.h>
#include <sys/timerfd.h>
#define EPOLL_HANDLE int
#else
#define EPOLL_HANDLE HANDLE
#define MSG_DONTWAIT 0
#define MSG_NOSIGNAL 0
#include "wepoll.h"
#endif
#include <pthread.h>
#include "uthash.h"
#include "utlist.h"
#include "dap_cpu_monitor.h"
#include "dap_client_remote.h"
typedef enum dap_server_type {DAP_SERVER_TCP} dap_server_type_t;
#define BIT( x ) ( 1 << x )
#define DAP_SOCK_READY_TO_READ BIT( 0 )
#define DAP_SOCK_READY_TO_WRITE BIT( 1 )
#define DAP_SOCK_SIGNAL_CLOSE BIT( 2 )
#define DAP_SOCK_ACTIVE BIT( 3 )
typedef struct dap_server_thread_s {
EPOLL_HANDLE epoll_fd;
uint32_t thread_num;
uint32_t connections_count;
uint32_t to_kill_count;
struct epoll_event *epoll_events;
dap_client_remote_t *dap_remote_clients;
dap_client_remote_t *hclients; // Hashmap of clients
dap_client_remote_t *dap_clients_to_kill;
pthread_mutex_t mutex_dlist_add_remove;
pthread_mutex_t mutex_on_hash;
} dap_server_thread_t;
struct dap_server;
typedef void (*dap_server_callback_t)( struct dap_server *,void * arg ); // Callback for specific server's operations
typedef struct dap_server {
dap_server_type_t type; // Server's type
uint16_t port; // Listen port
char *address; // Listen address
int32_t socket_listener; // Socket for listener
EPOLL_HANDLE epoll_fd; // Epoll fd
struct sockaddr_in listener_addr; // Kernel structure for listener's binded address
void *_inheritor; // Pointer to the internal data, HTTP for example
dap_cpu_stats_t cpu_stats;
dap_server_callback_t server_delete_callback;
dap_server_client_callback_t client_new_callback; // Create new client callback
dap_server_client_callback_t client_delete_callback; // Delete client callback
dap_server_client_callback_t client_read_callback; // Read function
dap_server_client_callback_t client_write_callback; // Write function
dap_server_client_callback_t client_error_callback; // Error processing function
} dap_server_t;
int32_t dap_server_init( uint32_t count_threads ); // Init server module
void dap_server_deinit( void ); // Deinit server module
dap_server_t *dap_server_listen( const char *addr, uint16_t port, dap_server_type_t type );
int32_t dap_server_loop( dap_server_t *d_server );
#define DL_LIST_REMOVE_NODE( head, obj, _prev_, _next_, total ) \
\
if ( obj->_next_ ) { \
\
if ( obj->_prev_ ) \
obj->_next_->_prev_ = obj->_prev_; \
else { \
\
obj->_next_->_prev_ = NULL; \
head = obj->_next_; \
} \
} \
\
if ( obj->_prev_ ) { \
\
if ( obj->_next_ ) \
obj->_prev_->_next_ = obj->_next_; \
else { \
\
obj->_prev_->_next_ = NULL; \
} \
} \
-- total;
#define DL_LIST_ADD_NODE_HEAD( head, obj, _prev_, _next_, total )\
\
if ( !total ) { \
\
obj->_prev_ = NULL; \
obj->_next_ = NULL; \
\
head = obj; \
} \
else { \
\
head->_prev_ = obj; \
\
obj->_prev_ = NULL; \
obj->_next_ = head; \
\
head = obj; \
} \
++ total;
/*
* Authors:
* Anatoliy Jurotich <anatoliy.kurotich@demlabs.net>
* DeM Labs Inc. https://demlabs.net
* Kelvin Project https://github.com/kelvinblockchain
* Copyright (c) 2017-2018
* All rights reserved.
This file is part of DAP (Deus Applications Prototypes) the open source project
DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
DAP is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with any DAP based project. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "dap_client_remote.h"
#include "dap_server.h"
typedef void (*dap_traffic_callback_t) (dap_server_t *);
/**
* @brief dap_traffic_track_init
* @param clients
* @param timeout callback
*/
void dap_traffic_track_init( dap_server_t *server, time_t timeout );
/**
* @brief dap_traffic_track_deinit
*/
void dap_traffic_track_deinit( void );
/**
* @brief dap_traffic_add_callback
*/
void dap_traffic_callback_set( dap_traffic_callback_t );
void dap_traffic_callback_stop( void );
/*
* Authors:
* Anatoliy Jurotich <anatoliy.kurotich@demlabs.net>
* DeM Labs Inc. https://demlabs.net
* Kelvin Project https://github.com/kelvinblockchain
* Copyright (c) 2017-2018
* All rights reserved.
This file is part of DAP (Deus Applications Prototypes) the open source project
DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
DAP is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with any DAP based project. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "dap_client_remote.h"
#include "dap_server.h"
typedef void (*dap_traffic_callback_t) (dap_server_t *);
/**
* @brief dap_traffic_track_init
* @param clients
* @param timeout callback
*/
void dap_traffic_track_init( dap_server_t *server, time_t timeout );
/**
* @brief dap_traffic_track_deinit
*/
void dap_traffic_track_deinit( void );
/**
* @brief dap_traffic_add_callback
*/
void dap_traffic_callback_set( dap_traffic_callback_t );
void dap_traffic_callback_stop( void );
Subproject commit 4ab41cdcaa8087323652cd5fef702876ccc25dab
Subproject commit 6c5dd5a5b0de5573b03ed144c651352467a56101
Subproject commit ff63d762657f9687173db825705b8bf4b958abee
Subproject commit 70bfa60b850cdb0a39cbe212439de9e9daf750d1
......@@ -36,8 +36,6 @@
#include <mswsock.h>
#include <ws2tcpip.h>
#include <io.h>
#include "wrappers.h"
#include <wepoll.h>
#include <pthread.h>
#endif
......@@ -136,14 +134,22 @@ void dap_client_remote_remove( dap_client_remote_t *sc )
HASH_DEL( t->hclients, sc );
pthread_mutex_unlock( &t->mutex_on_hash );
if( sc->server->client_delete_callback )
if( sc->server->client_delete_callback ) {
sc->server->client_delete_callback( sc, NULL ); // Init internal structure
}
if( sc->_inheritor )
if( sc->_inheritor ) {
free( sc->_inheritor );
}
if( sc->socket )
if( sc->socket ) {
log_it( L_INFO, "dap_client_remote_remove close( %d );", sc->socket );
#ifdef _WIN32
closesocket( sc->socket );
#else
close( sc->socket );
#endif
}
free( sc );
}
......@@ -259,7 +265,7 @@ size_t dap_client_remote_write_f( dap_client_remote_t *a_client, const char * a_
va_list ap;
va_start( ap, a_format );
int ret = vsnprintf( a_client->buf_out + a_client->buf_out_size, max_data_size, a_format, ap );
int ret = dap_vsnprintf( a_client->buf_out + a_client->buf_out_size, max_data_size, a_format, ap );
va_end( ap );
......
......@@ -54,26 +54,19 @@
#endif
#else
#undef _WIN32_WINNT
#define _WIN32_WINNT 0x0600
#include <winsock2.h>
#include <windows.h>
#include <mswsock.h>
#include <ws2tcpip.h>
#include <io.h>
#include "wrappers.h"
#include <wepoll.h>
#include <pthread.h>
//#define NO_TIMER
#endif
#include <utlist.h>
#include <sched.h>
#include "dap_common.h"
#include "dap_server.h"
#include "dap_events.h"
#define DAP_MAX_EPOLL_EVENTS 8192
......@@ -87,7 +80,7 @@
//dap_events_socket_info_t **s_dap_events_sockets;
static uint32_t s_threads_count = 1;
static size_t s_connection_timeout = 600;
static size_t s_connection_timeout = 6000;
static struct epoll_event *g_epoll_events = NULL;
dap_worker_t *s_workers = NULL;
......@@ -201,7 +194,7 @@ void dap_events_delete( dap_events_t *a_events )
if ( a_events ) {
HASH_ITER( hh, a_events->sockets,cur, tmp ) {
dap_events_socket_delete( cur, false );
dap_events_socket_delete( cur, true );
}
if ( a_events->_inheritor )
......@@ -214,33 +207,6 @@ void dap_events_delete( dap_events_t *a_events )
}
}
void dap_events_kill_socket( dap_events_socket_t *a_es )
{
if ( !a_es ) {
log_it( L_ERROR, "dap_events_kill_socket( NULL )" );
return;
}
uint32_t tn = a_es->dap_worker->number_thread;
dap_worker_t *w = a_es->dap_worker;
dap_events_t *d_ev = w->events;
pthread_mutex_lock( &w->locker_on_count );
if ( a_es->kill_signal ) {
pthread_mutex_unlock( &w->locker_on_count );
return;
}
log_it( L_DEBUG, "KILL %u socket! [ thread %u ]", a_es->socket, tn );
a_es->kill_signal = true;
DL_LIST_ADD_NODE_HEAD( d_ev->to_kill_sockets, a_es, kprev, knext, w->event_to_kill_count );
pthread_mutex_unlock( &w->locker_on_count );
}
/**
* @brief s_socket_info_all_check_activity
* @param n_thread
......@@ -264,7 +230,7 @@ static void s_socket_all_check_activity( dap_worker_t *dap_worker, dap_events_t
dap_worker->event_sockets_count --;
DL_DELETE( d_ev->dlsockets, a_es );
dap_events_socket_delete( a_es, false );
dap_events_socket_delete( a_es, true );
}
}
pthread_mutex_unlock( &dap_worker->locker_on_count );
......@@ -276,26 +242,32 @@ static void s_socket_all_check_activity( dap_worker_t *dap_worker, dap_events_t
* @param arg
* @return
*/
static void *thread_worker_function( void *arg )
static void *thread_worker_function(void *arg)
{
dap_events_socket_t *cur, *tmp;
dap_worker_t *w = (dap_worker_t *)arg;
time_t next_time_timeout_check = time( NULL ) + s_connection_timeout / 2;
uint32_t tn = w->number_thread;
dap_events_socket_t *cur;
dap_worker_t *w = (dap_worker_t *) arg;
time_t next_time_timeout_check = time( NULL) + s_connection_timeout / 2;
uint32_t tn = w->number_thread;
#ifndef _WIN32
#ifndef NO_POSIX_SHED
cpu_set_t mask;
CPU_ZERO( &mask );
CPU_SET( tn, &mask );
if ( pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &mask) != 0 )
{
log_it(L_CRITICAL, "Error pthread_setaffinity_np() You really have %d or more core in CPU?", *(int*)arg);
abort();
}
#endif
#else
#ifndef _WIN32
#ifndef NO_POSIX_SHED
cpu_set_t mask;
CPU_ZERO(&mask);
CPU_SET(tn, &mask);
int err;
#ifndef __ANDROID__
err = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &mask);
#else
err = sched_setaffinity(pthread_self(), sizeof(cpu_set_t), &mask);
#endif
if(err)
{
log_it(L_CRITICAL, "Error pthread_setaffinity_np() You really have %d or more core in CPU?", *(int* )arg);
abort();
}
#endif
#else
if ( !SetThreadAffinityMask( GetCurrentThread(), (DWORD_PTR)(1 << tn) ) ) {
log_it( L_CRITICAL, "Error pthread_setaffinity_np() You really have %d or more core in CPU?", tn );
......@@ -303,172 +275,185 @@ static void *thread_worker_function( void *arg )
}
#endif
log_it(L_INFO, "Worker %d started, epoll fd %d", w->number_thread, w->epoll_fd);
log_it(L_INFO, "Worker %d started, epoll fd %d", w->number_thread, w->epoll_fd);
struct epoll_event *events = &g_epoll_events[ DAP_MAX_EPOLL_EVENTS * tn ];
struct epoll_event *events = &g_epoll_events[ DAP_MAX_EPOLL_EVENTS * tn];
// memset( &ev, 0, sizeof(ev) );
// memset( &events, 0, sizeof(events) );
size_t total_sent; int bytes_sent;
size_t total_sent;
int bytes_sent;
while( 1 ) {
while(1) {
int selected_sockets = epoll_wait( w->epoll_fd, events, DAP_MAX_EPOLL_EVENTS, 1000 );
if ( selected_sockets == -1 ) {
if ( errno == EINTR )
continue;
break;
}
int selected_sockets = epoll_wait(w->epoll_fd, events, DAP_MAX_EPOLL_EVENTS, 1000);
time_t cur_time = time( NULL );
for( int32_t n = 0; n < selected_sockets; n ++ ) {
cur = (dap_events_socket_t *)events[n].data.ptr;
if ( !cur ) {
log_it( L_ERROR,"dap_events_socket NULL" );
continue;
}
if ( events[n].events & EPOLLERR ) {
log_it( L_ERROR,"Socket error: %s",strerror(errno) );
cur->flags |= DAP_SOCK_SIGNAL_CLOSE;
cur->callbacks->error_callback( cur, NULL ); // Call callback to process error event
}
cur->last_time_active = cur_time;
if( events[n].events & EPOLLIN ) {
//log_it(DEBUG,"Comes connection in active read set");
if( cur->buf_in_size == sizeof(cur->buf_in) ) {
log_it( L_WARNING, "Buffer is full when there is smth to read. Its dropped!" );
cur->buf_in_size = 0;
}
int32_t bytes_read = recv( cur->socket, (char *)(cur->buf_in + cur->buf_in_size),
sizeof(cur->buf_in) - cur->buf_in_size, 0 );
if( bytes_read > 0 ) {
cur->buf_in_size += bytes_read;
//log_it(DEBUG, "Received %d bytes", bytes_read);
cur->callbacks->read_callback( cur, NULL ); // Call callback to process read event. At the end of callback buf_in_size should be zero if everything was read well
}
else if( bytes_read < 0 ) {
log_it( L_ERROR,"Some error occured in recv() function: %s",strerror(errno) );
dap_events_socket_set_readable( cur, false );
cur->flags |= DAP_SOCK_SIGNAL_CLOSE;
}
else if ( bytes_read == 0 ) {
log_it( L_INFO, "Client socket disconnected" );
dap_events_socket_set_readable( cur, false );
cur->flags |= DAP_SOCK_SIGNAL_CLOSE;
if(selected_sockets == -1) {
if( errno == EINTR)
continue;
break;
}
}
// Socket is ready to write
if( ( (events[n].events & EPOLLOUT) || (cur->flags & DAP_SOCK_READY_TO_WRITE) ) && !(cur->flags & DAP_SOCK_SIGNAL_CLOSE) ) {
///log_it(DEBUG, "Main loop output: %u bytes to send",sa_cur->buf_out_size);
cur->callbacks->write_callback( cur, NULL ); // Call callback to process write event
if( cur->flags & DAP_SOCK_READY_TO_WRITE ) {
time_t cur_time = time( NULL);
for(int32_t n = 0; n < selected_sockets; n++) {
static const uint32_t buf_out_zero_count_max = 20;
cur->buf_out[cur->buf_out_size] = 0;
cur = (dap_events_socket_t *) events[n].data.ptr;
if( !cur->buf_out_size ) {
if(!cur) {
log_it(L_WARNING, "Output: nothing to send. Why we are in write socket set?");
cur->buf_out_zero_count ++;
if( cur->buf_out_zero_count > buf_out_zero_count_max ) { // How many time buf_out on write event could be empty
log_it( L_ERROR, "Output: nothing to send %u times, remove socket from the write set", buf_out_zero_count_max );
dap_events_socket_set_writable( cur, false );
log_it(L_ERROR, "dap_events_socket NULL");
continue;
}
log_it(L_DEBUG, "Worker=%d fd=%d socket=%d event=0x%x(%d)", w->number_thread, w->epoll_fd,cur->socket, events[n].events,events[n].events);
//connection already closed (EPOLLHUP - shutdown has been made in both directions)
if(events[n].events & EPOLLHUP) { // && events[n].events & EPOLLERR) {
log_it(L_DEBUG, "Socket shutdown (EPOLLHUP): %s", strerror(errno));
//if(!(events[n].events & EPOLLIN))
//cur->no_close = false;
cur->flags |= DAP_SOCK_SIGNAL_CLOSE;
if(!(events[n].events & EPOLLERR))
cur->callbacks->error_callback(cur, NULL); // Call callback to process error event
}
}
else
cur->buf_out_zero_count = 0;
}
for ( total_sent = 0; total_sent < cur->buf_out_size; ) { // If after callback there is smth to send - we do it
bytes_sent = send( cur->socket, (char *)(cur->buf_out + total_sent),
cur->buf_out_size - total_sent, MSG_DONTWAIT | MSG_NOSIGNAL );
if ( bytes_sent < 0 ) {
log_it(L_ERROR,"Some error occured in send() function");
cur->flags |= DAP_SOCK_SIGNAL_CLOSE;
break;
}
total_sent += bytes_sent;
//log_it(L_DEBUG, "Output: %u from %u bytes are sent ", total_sent,sa_cur->buf_out_size);
}
//log_it(L_DEBUG,"Output: sent %u bytes",total_sent);
cur->buf_out_size = 0;
}
if ( (cur->flags & DAP_SOCK_SIGNAL_CLOSE) && !cur->no_close ) {
pthread_mutex_lock( &w->locker_on_count );
if ( cur->kill_signal ) {
pthread_mutex_unlock( &w->locker_on_count );
continue;
}
// pthread_mutex_unlock( &dsth->mutex_dlist_add_remove );
// dap_server_kill_socket( dap_cur );
// continue;
if(events[n].events & EPOLLERR) {
log_it(L_ERROR, "Socket error: %s", strerror(errno));
cur->flags |= DAP_SOCK_SIGNAL_CLOSE;
cur->callbacks->error_callback(cur, NULL); // Call callback to process error event
}
log_it( L_INFO, "Got signal to close %s, sock %u [thread %u]", cur->hostaddr, cur->socket, tn );
cur->last_time_active = cur_time;
if(events[n].events & EPOLLIN) {
//log_it(DEBUG,"Comes connection in active read set");
if(cur->buf_in_size == sizeof(cur->buf_in)) {
log_it(L_WARNING, "Buffer is full when there is smth to read. Its dropped!");
cur->buf_in_size = 0;
}
int32_t bytes_read = recv(cur->socket, (char *) (cur->buf_in + cur->buf_in_size),
sizeof(cur->buf_in) - cur->buf_in_size, 0);
if(bytes_read > 0) {
cur->buf_in_size += bytes_read;
//log_it(DEBUG, "Received %d bytes", bytes_read);
cur->callbacks->read_callback(cur, NULL); // Call callback to process read event. At the end of callback buf_in_size should be zero if everything was read well
}
else if(bytes_read < 0) {
log_it(L_ERROR, "Some error occured in recv() function: %s", strerror(errno));
dap_events_socket_set_readable(cur, false);
cur->flags |= DAP_SOCK_SIGNAL_CLOSE;
}
else if(bytes_read == 0) {
log_it(L_INFO, "Client socket disconnected");
dap_events_socket_set_readable(cur, false);
cur->flags |= DAP_SOCK_SIGNAL_CLOSE;
}
}
dap_events_socket_remove_and_delete( cur, false );
pthread_mutex_unlock( &w->locker_on_count );
}
} // for
// Socket is ready to write
if(((events[n].events & EPOLLOUT) || (cur->flags & DAP_SOCK_READY_TO_WRITE))
&& !(cur->flags & DAP_SOCK_SIGNAL_CLOSE)) {
///log_it(DEBUG, "Main loop output: %u bytes to send",sa_cur->buf_out_size);
cur->callbacks->write_callback(cur, NULL); // Call callback to process write event
if(cur->flags & DAP_SOCK_READY_TO_WRITE) {
static const uint32_t buf_out_zero_count_max = 20;
cur->buf_out[cur->buf_out_size] = 0;
if(!cur->buf_out_size) {
log_it(L_WARNING, "Output: nothing to send. Why we are in write socket set?");
cur->buf_out_zero_count++;
if(cur->buf_out_zero_count > buf_out_zero_count_max) { // How many time buf_out on write event could be empty
log_it(L_ERROR, "Output: nothing to send %u times, remove socket from the write set",
buf_out_zero_count_max);
dap_events_socket_set_writable(cur, false);
}
}
else
cur->buf_out_zero_count = 0;
}
for(total_sent = 0; total_sent < cur->buf_out_size;) { // If after callback there is smth to send - we do it
bytes_sent = send(cur->socket, (char *) (cur->buf_out + total_sent),
cur->buf_out_size - total_sent, MSG_DONTWAIT | MSG_NOSIGNAL);
if(bytes_sent < 0) {
log_it(L_ERROR, "Some error occured in send() function");
cur->flags |= DAP_SOCK_SIGNAL_CLOSE;
break;
}
total_sent += bytes_sent;
//log_it(L_DEBUG, "Output: %u from %u bytes are sent ", total_sent,sa_cur->buf_out_size);
}
//log_it(L_DEBUG,"Output: sent %u bytes",total_sent);
cur->buf_out_size = 0;
}
#ifndef NO_TIMER
if ( cur_time >= next_time_timeout_check ) {
pthread_mutex_lock(&w->locker_on_count);
s_socket_all_check_activity( w, w->events, cur_time );
next_time_timeout_check = cur_time + s_connection_timeout / 2;
}
#endif
if((cur->flags & DAP_SOCK_SIGNAL_CLOSE) && !cur->no_close) {
// protect against double deletion
cur->kill_signal = true;
//dap_events_socket_remove_and_delete(cur, true);
log_it(L_INFO, "Got signal to close %s, sock %u [thread %u]", cur->hostaddr, cur->socket, tn);
}
pthread_mutex_lock( &w->locker_on_count );
if ( !w->event_to_kill_count ) {
if(cur->kill_signal) {
log_it(L_INFO, "Kill %u socket (processed).... [ thread %u ]", cur->socket, tn);
dap_events_socket_remove(cur);
pthread_mutex_unlock(&w->locker_on_count);
dap_events_socket_delete( cur, true);
}
else
pthread_mutex_unlock(&w->locker_on_count);
pthread_mutex_unlock( &w->locker_on_count );
continue;
}
/*
if(!w->event_to_kill_count) {
cur = w->events->to_kill_sockets;
pthread_mutex_unlock(&w->locker_on_count);
continue;
do {
do {
// if ( cur->no_close ) {
// cur = cur->knext;
// continue;
// }
tmp = cur_del->knext;
// delete only current events_socket because others may be active in the other workers
//if(cur_del == cur)
if(cur->kill_signal) {
log_it(L_INFO, "Kill %u socket (processed).... [ thread %u ]", cur_del->socket, tn);
DL_LIST_REMOVE_NODE(w->events->to_kill_sockets, cur, kprev, knext, w->event_to_kill_count);
dap_events_socket_remove_and_delete(cur_del, true);
}
cur_del = tmp;
} while(cur_del);
log_it(L_INFO, "[ Thread %u ] coneections: %u, to kill: %u", tn, w->event_sockets_count,
w->event_to_kill_count);
pthread_mutex_unlock(&w->locker_on_count);
*/
} // for
#ifndef NO_TIMER
if(cur_time >= next_time_timeout_check) {
s_socket_all_check_activity(w, w->events, cur_time);
next_time_timeout_check = cur_time + s_connection_timeout / 2;
}
#endif
log_it( L_INFO, "Kill %u socket .... [ thread %u ]", cur->socket, tn );
tmp = cur->knext;
DL_LIST_REMOVE_NODE( w->events->to_kill_sockets, cur, kprev, knext, w->event_to_kill_count );
dap_events_socket_remove_and_delete( cur, false );
cur = tmp;
} while ( cur );
log_it( L_INFO, "[ Thread %u ] coneections: %u, to kill: %u", tn, w->event_sockets_count, w->event_to_kill_count );
pthread_mutex_unlock( &w->locker_on_count );
} // while
} // while
return NULL;
return NULL;
}
/**
......@@ -527,7 +512,7 @@ int dap_events_start( dap_events_t *a_events )
return -1;
}
s_workers[i].event_to_kill_count = 0;
//s_workers[i].event_to_kill_count = 0;
s_workers[i].event_sockets_count = 0;
s_workers[i].number_thread = i;
s_workers[i].events = a_events;
......@@ -556,8 +541,6 @@ int dap_events_wait( dap_events_t *sh )
return 0;
}
/**
* @brief dap_worker_add_events_socket
* @param a_worker
......@@ -572,23 +555,6 @@ void dap_worker_add_events_socket( dap_events_socket_t *a_es)
a_es->events = a_es->dap_worker->events;
}
/**
* @brief dap_events_socket_delete
* @param a_es
*/
void dap_events_socket_remove_and_delete( dap_events_socket_t *a_es, bool preserve_inheritor )
{
if ( epoll_ctl( a_es->dap_worker->epoll_fd, EPOLL_CTL_DEL, a_es->socket, &a_es->ev) == -1 )
log_it( L_ERROR,"Can't remove event socket's handler from the epoll_fd" );
else
log_it( L_DEBUG,"Removed epoll's event from dap_worker #%u", a_es->dap_worker->number_thread );
DL_DELETE( a_es->events->dlsockets, a_es );
a_es->dap_worker->event_sockets_count --;
dap_events_socket_delete( a_es, preserve_inheritor );
}
/**
* @brief dap_events__thread_wake_up
* @param th
......
......@@ -33,15 +33,12 @@
#include <unistd.h>
#include <fcntl.h>
#else
#undef _WIN32_WINNT
#define _WIN32_WINNT 0x0600
#include <winsock2.h>
#include <windows.h>
#include <mswsock.h>
#include <ws2tcpip.h>
#include <io.h>
#include "wrappers.h"
#include <wepoll.h>
#include "wepoll.h"
#include <pthread.h>
#endif
......@@ -176,9 +173,11 @@ dap_events_socket_t * dap_events_socket_wrap2( dap_server_t *a_server, struct da
dap_events_socket_t *dap_events_socket_find( int sock, struct dap_events *a_events )
{
dap_events_socket_t *ret = NULL;
if(!a_events)
return NULL;
pthread_rwlock_rdlock( &a_events->sockets_rwlock );
HASH_FIND_INT( a_events->sockets, &sock, ret );
if(a_events->sockets)
HASH_FIND_INT( a_events->sockets, &sock, ret );
pthread_rwlock_unlock( &a_events->sockets_rwlock );
return ret;
......@@ -249,6 +248,37 @@ void dap_events_socket_set_writable( dap_events_socket_t *sc, bool is_ready )
}
/**
* @brief dap_events_socket_kill_socket
* @param sc Connection instance
*/
int dap_events_socket_kill_socket( dap_events_socket_t *a_es )
{
if ( !a_es ) {
log_it( L_ERROR, "dap_events_socket_kill_socket( NULL )" );
return -1;
}
uint32_t tn = a_es->dap_worker->number_thread;
dap_worker_t *w = a_es->dap_worker;
//dap_events_t *d_ev = w->events;
pthread_mutex_lock( &w->locker_on_count );
if ( a_es->kill_signal ) {
pthread_mutex_unlock( &w->locker_on_count );
return 0;
}
log_it( L_DEBUG, "KILL %u socket! (in queue) [ thread %u ]", a_es->socket, tn );
a_es->kill_signal = true;
//DL_LIST_ADD_NODE_HEAD( d_ev->to_kill_sockets, a_es, kprev, knext, w->event_to_kill_count );
pthread_mutex_unlock( &w->locker_on_count );
return 0;
}
/**
* @brief dap_events_socket_remove Removes the client from the list
* @param sc Connection instance
......@@ -259,8 +289,13 @@ void dap_events_socket_delete( dap_events_socket_t *a_es, bool preserve_inherito
log_it( L_DEBUG, "es is going to be removed from the lists and free the memory (0x%016X)", a_es );
if(!dap_events_socket_find(a_es->socket, a_es->events)){
log_it( L_ERROR, "dap_events_socket 0x%x already deleted", a_es);
return ;
}
pthread_rwlock_wrlock( &a_es->events->sockets_rwlock );
HASH_DEL( a_es->events->sockets, a_es );
if(a_es->events->sockets)
HASH_DEL( a_es->events->sockets, a_es );
pthread_rwlock_unlock( &a_es->events->sockets_rwlock );
log_it( L_DEBUG, "dap_events_socket wrapped around %d socket is removed", a_es->socket );
......@@ -269,14 +304,47 @@ void dap_events_socket_delete( dap_events_socket_t *a_es, bool preserve_inherito
a_es->callbacks->delete_callback( a_es, NULL ); // Init internal structure
if ( a_es->_inheritor && !preserve_inheritor )
free( a_es->_inheritor );
DAP_DELETE( a_es->_inheritor );
if ( a_es->socket )
if ( a_es->socket ) {
#ifdef _WIN32
closesocket( a_es->socket );
#else
close( a_es->socket );
#endif
}
free( a_es );
}
/**
* @brief dap_events_socket_delete
* @param a_es
*/
void dap_events_socket_remove( dap_events_socket_t *a_es)
{
if ( epoll_ctl( a_es->dap_worker->epoll_fd, EPOLL_CTL_DEL, a_es->socket, &a_es->ev) == -1 )
log_it( L_ERROR,"Can't remove event socket's handler from the epoll_fd" );
else
log_it( L_DEBUG,"Removed epoll's event from dap_worker #%u", a_es->dap_worker->number_thread );
DL_DELETE( a_es->events->dlsockets, a_es );
a_es->dap_worker->event_sockets_count --;
}
void dap_events_socket_remove_and_delete( dap_events_socket_t *a_es, bool preserve_inheritor )
{
if ( epoll_ctl( a_es->dap_worker->epoll_fd, EPOLL_CTL_DEL, a_es->socket, &a_es->ev) == -1 )
log_it( L_ERROR,"Can't remove event socket's handler from the epoll_fd" );
else
log_it( L_DEBUG,"Removed epoll's event from dap_worker #%u", a_es->dap_worker->number_thread );
DL_DELETE( a_es->events->dlsockets, a_es );
a_es->dap_worker->event_sockets_count --;
dap_events_socket_delete( a_es, preserve_inheritor );
}
/**
* @brief dap_events_socket_write Write data to the client
* @param sc Conn instance
......@@ -286,7 +354,7 @@ void dap_events_socket_delete( dap_events_socket_t *a_es, bool preserve_inherito
*/
size_t dap_events_socket_write(dap_events_socket_t *sc, const void * data, size_t data_size)
{
log_it(L_DEBUG,"dap_events_socket_write %u sock data %X size %u", sc->socket, data, data_size );
//log_it(L_DEBUG,"dap_events_socket_write %u sock data %X size %u", sc->socket, data, data_size );
data_size = ((sc->buf_out_size+data_size)<(sizeof(sc->buf_out)))?data_size:(sizeof(sc->buf_out)-sc->buf_out_size );
memcpy(sc->buf_out+sc->buf_out_size,data,data_size);
......@@ -307,7 +375,7 @@ size_t dap_events_socket_write_f(dap_events_socket_t *sc, const char * format,..
size_t max_data_size = sizeof(sc->buf_out)-sc->buf_out_size;
va_list ap;
va_start(ap,format);
int ret=vsnprintf((char*) sc->buf_out+sc->buf_out_size,max_data_size,format,ap);
int ret=dap_vsnprintf((char*) sc->buf_out+sc->buf_out_size,max_data_size,format,ap);
va_end(ap);
if(ret>0){
sc->buf_out_size+=ret;
......@@ -327,7 +395,7 @@ size_t dap_events_socket_write_f(dap_events_socket_t *sc, const char * format,..
*/
size_t dap_events_socket_read(dap_events_socket_t *sc, void *data, size_t data_size)
{
log_it(L_DEBUG,"dap_events_socket_read %u sock data %X size %u", sc->socket, data, data_size );
// log_it(L_DEBUG,"dap_events_socket_read %u sock data %X size %u", sc->socket, data, data_size );
if(data_size<sc->buf_in_size){
memcpy(data,sc->buf_in,data_size);
......
#include "dap_memcached.h"
#include <libmemcached/memcached.h>
#define LOG_TAG "dap_memcached"
static memcached_st *_memc;
static bool _is_module_enable = false;
int dap_memcached_init(const char *server_host, uint16_t port)
{
memcached_return rc;
memcached_server_st *servers = NULL;
char *test_key = "test_key";
char *test_value = "test_value";
_memc = memcached_create(NULL);
servers= memcached_server_list_append(servers, server_host, port, &rc);
rc= memcached_server_push(_memc, servers);
if (rc != MEMCACHED_SUCCESS) {
log_it(L_ERROR, "Couldn't add server: %s", memcached_strerror(_memc, rc));
return -1;
}
if(dap_memcache_put(test_key, test_value, strlen(test_value), 0) != true) {
return -2;
}
_is_module_enable = true;
return 0;
}
bool dap_memcache_is_enable()
{
return _is_module_enable;
}
bool dap_memcache_put(const char* key, void *value, size_t value_size, time_t expiration)
{
memcached_return rc;
rc = memcached_set(_memc, key, strlen(key), value, value_size, expiration, (uint32_t)0);
if (rc != MEMCACHED_SUCCESS) {
log_it(L_ERROR, "%s", memcached_strerror(_memc, rc));
return false;
}
return true;
}
bool dap_memcache_get(const char* key, size_t * value_size, void ** result)
{
memcached_return rc;
*result = memcached_get(_memc, key, strlen(key), value_size, NULL, &rc);
return rc == MEMCACHED_SUCCESS;
}
bool dap_memcache_delete(const char* key)
{
return memcached_delete(_memc, key, strlen(key), 0) == MEMCACHED_SUCCESS;
}
/**
* @brief dap_memcached_deinit
*/
void dap_memcached_deinit()
{
_is_module_enable = false;
}
......@@ -46,15 +46,11 @@
#include <sys/epoll.h>
#include <sys/timerfd.h>
#else
#undef _WIN32_WINNT
#define _WIN32_WINNT 0x0600
#include <winsock2.h>
#include <windows.h>
#include <mswsock.h>
#include <ws2tcpip.h>
#include <io.h>
#include "wrappers.h"
#include <wepoll.h>
#include <pthread.h>
#endif
......@@ -76,7 +72,7 @@
#define DAP_MAX_THREAD_EVENTS 8192
#define DAP_MAX_THREADS 16
#define SOCKET_TIMEOUT_TIME 30
#define SOCKET_TIMEOUT_TIME 300
#define SOCKETS_TIMEOUT_CHECK_PERIOD 15
static uint32_t _count_threads = 0;
......@@ -105,7 +101,7 @@ static uint32_t get_epoll_max_user_watches( void )
FILE *fp = fopen( maxepollpath, "r" );
if ( !fp ) {
printf("can't open %s\n", maxepollpath );
// printf("can't open %s\n", maxepollpath );
return v;
}
......@@ -344,6 +340,7 @@ dap_client_remote_t *dap_server_add_socket( int32_t fd, int32_t forced_thread_n
pthread_mutex_lock( &dsth->mutex_dlist_add_remove );
DL_APPEND( dsth->dap_remote_clients, dcr );
dsth->connections_count ++;
if ( epoll_ctl( dsth->epoll_fd, EPOLL_CTL_ADD, fd, &dcr->pevent) != 0 ) {
......@@ -381,7 +378,7 @@ void dap_server_remove_socket( dap_client_remote_t *dcr )
// pthread_mutex_unlock( &dsth->mutex_dlist_add_remove );
log_it( L_DEBUG, "dcr = %X", dcr );
// log_it( L_DEBUG, "dcr = %X", dcr );
}
static void s_socket_all_check_activity( uint32_t tn, time_t cur_time )
......@@ -462,7 +459,7 @@ static void read_write_cb( dap_client_remote_t *dap_cur, int32_t revents )
_current_run_server->client_write_callback( dap_cur, NULL ); // Call callback to process write event
if( dap_cur->buf_out_size == 0 ) {
log_it(L_DEBUG, "dap_cur->buf_out_size = 0, set ev_read watcher " );
//log_it(L_DEBUG, "dap_cur->buf_out_size = 0, set ev_read watcher " );
dap_cur->pevent.events = EPOLLIN | EPOLLERR;
if( epoll_ctl(dap_cur->efd, EPOLL_CTL_MOD, dap_cur->socket, &dap_cur->pevent) != 0 ) {
......@@ -596,7 +593,13 @@ void *thread_loop( void *arg )
CPU_ZERO( &mask );
CPU_SET( tn, &mask );
if ( pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &mask) != 0 ) {
int err;
#ifndef ANDROID
err = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &mask);
#else
err = sched_setaffinity(pthread_self(), sizeof(cpu_set_t), &mask);
#endif
if (err) {
log_it( L_CRITICAL, "Error pthread_setaffinity_np() You really have %d or more core in CPU?", tn );
abort();
}
......@@ -638,12 +641,13 @@ void *thread_loop( void *arg )
}
dap_cur->last_time_active = cur_time;
if( events[i].events & EPOLLERR ) {
log_it( L_ERROR,"Socket error: %u, remove it" , dap_cur->socket );
dap_cur->flags |= DAP_SOCK_SIGNAL_CLOSE;
}
#ifdef _WIN32
set_nonblock_socket(dap_cur->socket); // pconst: for winsock2 has no appropriate MSG attributes
#endif
if ( !(dap_cur->flags & DAP_SOCK_SIGNAL_CLOSE) || dap_cur->no_close )
read_write_cb( dap_cur, events[i].events );
......@@ -665,7 +669,6 @@ void *thread_loop( void *arg )
dap_server_remove_socket( dap_cur );
dap_client_remote_remove( dap_cur );
log_it( L_INFO, "[ Thread %u ] coneections: %u, to kill: %u", tn, dsth->connections_count, dsth->to_kill_count );
pthread_mutex_unlock( &dsth->mutex_dlist_add_remove );
}
......
......@@ -32,15 +32,11 @@
#include <pthread.h>
#include <ev.h>
#else
#undef _WIN32_WINNT
#define _WIN32_WINNT 0x0600
#include <winsock2.h>
#include <windows.h>
#include <mswsock.h>
#include <ws2tcpip.h>
#include <io.h>
#include "wrappers.h"
#include <wepoll.h>
#include <pthread.h>
#endif
......