Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • cellframe/cellframe-sdk
  • MIKA83/cellframe-sdk
2 results
Show changes
Commits on Source (6)
Showing
with 69 additions and 796 deletions
......@@ -2,7 +2,7 @@ project(cellframe-sdk C)
cmake_minimum_required(VERSION 2.8)
set(CMAKE_C_STANDARD 11)
set(CELLFRAME_SDK_NATIVE_VERSION "2.5-17")
set(CELLFRAME_SDK_NATIVE_VERSION "2.5-19")
add_definitions ("-DCELLFRAME_SDK_VERSION=\"${CELLFRAME_SDK_NATIVE_VERSION}\"")
set(DAPSDK_MODULES "")
......@@ -55,7 +55,7 @@ endif()
# Networking
if (CELLFRAME_MODULES MATCHES "network")
message("[+] Module 'network'")
set(CELLFRAME_LIBS ${CELLFRAME_LIBS} dap_server_core dap_enc_server dap_http_server dap_udp_server dap_session
set(CELLFRAME_LIBS ${CELLFRAME_LIBS} dap_server_core dap_enc_server dap_http_server dap_session
dap_stream dap_stream_ch dap_client dap_stream_ch_chain dap_stream_ch_chain_net dap_chain_net dap_chain_mempool magic)
endif()
......
set(DAP_SDK_NATIVE_VERSION "2.0-11")
set(DAP_SDK_NATIVE_VERSION "2.0-13")
# Core
if (DAPSDK_MODULES MATCHES "core")
# Core
......@@ -22,5 +22,5 @@ endif()
# Networking server
if (DAPSDK_MODULES MATCHES "network-server")
add_subdirectory(net/server)
add_subdirectory(net/server-udp)
# add_subdirectory(net/server-udp)
endif()
......@@ -12,7 +12,7 @@ endif()
add_library(${PROJECT_NAME} STATIC ${DAP_CLIENT_HEADERS} ${DAP_CLIENT_SOURCES})
target_link_libraries(${PROJECT_NAME} dap_core dap_crypto dap_server_core dap_udp_server dap_http_server dap_enc_server dap_stream dap_session dap_stream_ch json-c)
target_link_libraries(${PROJECT_NAME} dap_core dap_crypto dap_server_core dap_http_server dap_enc_server dap_stream dap_session dap_stream_ch json-c)
if(UNIX AND NOT ANDROID)
target_link_libraries(${PROJECT_NAME} rt)
......
......@@ -30,7 +30,8 @@
// for Unix-like systems
#include <sys/types.h>
#include <sys/socket.h>
//#include <bits/socket_type.h>
#include <netdb.h>
#include <arpa/inet.h>
#endif
#include <unistd.h>
#include <errno.h>
......
......@@ -45,6 +45,7 @@
#else
#include <sys/types.h> /* See NOTES */
#include <sys/socket.h>
#include <arpa/inet.h>
#endif
#include <pthread.h>
......
......@@ -652,12 +652,10 @@ dap_events_socket_t * dap_events_socket_wrap2( dap_server_t *a_server, struct da
ret->socket = a_sock;
ret->events = a_events;
ret->server = a_server;
ret->is_dont_reset_write_flag = true;
memcpy(&ret->callbacks,a_callbacks, sizeof ( ret->callbacks) );
ret->flags = DAP_SOCK_READY_TO_READ;
ret->is_pingable = true;
ret->last_time_active = ret->last_ping_request = time( NULL );
pthread_rwlock_wrlock( &a_events->sockets_rwlock );
......@@ -693,7 +691,7 @@ dap_events_socket_t *dap_events_socket_find_unsafe( int sock, struct dap_events
*/
void dap_events_socket_set_readable_unsafe( dap_events_socket_t *sc, bool is_ready )
{
if( is_ready == (bool)(sc->flags & DAP_SOCK_READY_TO_READ) )
if( is_ready == (bool)(sc->flags & DAP_SOCK_READY_TO_READ))
return;
sc->ev.events = sc->ev_base_flags;
......@@ -730,7 +728,7 @@ void dap_events_socket_set_readable_unsafe( dap_events_socket_t *sc, bool is_rea
*/
void dap_events_socket_set_writable_unsafe( dap_events_socket_t *sc, bool a_is_ready )
{
if ( a_is_ready == (bool)(sc->flags & DAP_SOCK_READY_TO_WRITE) ) {
if ( a_is_ready == (bool)(sc->flags & DAP_SOCK_READY_TO_WRITE)) {
return;
}
......@@ -750,7 +748,7 @@ void dap_events_socket_set_writable_unsafe( dap_events_socket_t *sc, bool a_is_r
sc->ev.events = events;
if (sc->worker)
if (sc->worker && sc->server->type != DAP_SERVER_UDP)
if ( epoll_ctl(sc->worker->epoll_fd, EPOLL_CTL_MOD, sc->socket, &sc->ev) ){
int l_errno = errno;
char l_errbuf[128];
......
......@@ -64,7 +64,6 @@ static dap_events_socket_t * s_es_server_create(dap_events_t * a_events, int a_s
static void s_es_server_accept(dap_events_socket_t *a_es, int a_remote_socket, struct sockaddr* a_remote_addr);
static void s_es_server_error(dap_events_socket_t *a_es, int a_arg);
static void s_es_server_new(dap_events_socket_t *a_es, void * a_arg);
static void s_server_delete(dap_server_t * a_server);
/**
* @brief dap_server_init
* @return
......@@ -86,15 +85,24 @@ void dap_server_deinit()
* @brief dap_server_delete
* @param a_server
*/
void s_server_delete(dap_server_t * a_server)
void dap_server_delete(dap_server_t *a_server)
{
while (a_server->es_listeners) {
dap_events_socket_t *l_es = (dap_events_socket_t *)a_server->es_listeners->data;
dap_events_socket_remove_and_delete_mt(l_es->worker, l_es);
dap_list_t *l_tmp = a_server->es_listeners;
a_server->es_listeners = l_tmp->next;
DAP_DELETE(l_tmp);
}
if(a_server->delete_callback)
a_server->delete_callback(a_server,NULL);
if( a_server->address )
DAP_DELETE(a_server->address );
if( a_server->_inheritor )
DAP_DELETE( a_server->_inheritor );
DAP_DELETE(a_server);
if( a_server->address )
DAP_DELETE(a_server->address );
if( a_server->_inheritor )
DAP_DELETE( a_server->_inheritor );
pthread_mutex_destroy(&a_server->started_mutex);
pthread_cond_destroy(&a_server->started_cond);
DAP_DELETE(a_server);
}
/**
......@@ -105,7 +113,7 @@ void s_server_delete(dap_server_t * a_server)
* @param a_type
* @return
*/
dap_server_t* dap_server_new(dap_events_t *a_events, const char * a_addr, uint16_t a_port, dap_server_type_t a_type)
dap_server_t* dap_server_new(dap_events_t *a_events, const char * a_addr, uint16_t a_port, dap_server_type_t a_type, dap_events_socket_callbacks_t *a_callbacks)
{
assert(a_events);
dap_server_t *l_server = DAP_NEW_Z(dap_server_t);
......@@ -117,6 +125,8 @@ dap_server_t* dap_server_new(dap_events_t *a_events, const char * a_addr, uint16
if(l_server->type == DAP_SERVER_TCP)
l_server->socket_listener = socket(AF_INET, SOCK_STREAM, 0);
else if (l_server->type == DAP_SERVER_UDP)
l_server->socket_listener = socket(AF_INET, SOCK_DGRAM, 0);
if (l_server->socket_listener < 0) {
int l_errno = errno;
......@@ -162,13 +172,20 @@ dap_server_t* dap_server_new(dap_events_t *a_events, const char * a_addr, uint16
l_callbacks.accept_callback = s_es_server_accept;
l_callbacks.error_callback = s_es_server_error;
if (a_callbacks) {
l_callbacks.read_callback = a_callbacks->read_callback;
l_callbacks.write_callback = a_callbacks->write_callback;
l_callbacks.error_callback = a_callbacks->error_callback;
}
for(size_t l_worker_id = 0; l_worker_id < dap_events_worker_get_count() ; l_worker_id++){
dap_worker_t *l_w = dap_events_worker_get(l_worker_id);
assert(l_w);
dap_events_socket_t * l_es = dap_events_socket_wrap2( l_server, a_events, l_server->socket_listener, &l_callbacks);
l_server->es_listeners = dap_list_append(l_server->es_listeners, l_es);
if ( l_es){
l_es->type = DESCRIPTOR_TYPE_SOCKET_LISTENING;
if (l_es) {
l_es->type = l_server->type == DAP_SERVER_TCP ? DESCRIPTOR_TYPE_SOCKET_LISTENING : DESCRIPTOR_TYPE_SOCKET;
#ifdef DAP_EVENTS_CAPS_EPOLL
// Prepare for multi thread listening
l_es->ev_base_flags = EPOLLET| EPOLLIN | EPOLLEXCLUSIVE;
......@@ -259,7 +276,6 @@ static dap_events_socket_t * s_es_server_create(dap_events_t * a_events, int a_s
//fcntl(a_sock, F_SETFL, O_NONBLOCK);
ret = dap_events_socket_wrap_no_add(a_events, a_sock, a_callbacks);
ret->is_dont_reset_write_flag = true;
ret->type = DESCRIPTOR_TYPE_SOCKET;
ret->server = a_server;
......
......@@ -27,6 +27,9 @@
#define _GNU_SOURCE /* See feature_test_macros(7) */
#endif
#include <fcntl.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include "dap_common.h"
#include "dap_math_ops.h"
......@@ -190,8 +193,15 @@ void *dap_worker_thread(void *arg)
break;
case DESCRIPTOR_TYPE_SOCKET:
l_must_read_smth = true;
l_bytes_read = recv(l_cur->fd, (char *) (l_cur->buf_in + l_cur->buf_in_size),
sizeof(l_cur->buf_in) - l_cur->buf_in_size, 0);
if (l_cur->server->type == DAP_SERVER_TCP) {
l_bytes_read = recv(l_cur->fd, (char *) (l_cur->buf_in + l_cur->buf_in_size),
sizeof(l_cur->buf_in) - l_cur->buf_in_size, 0);
} else if (l_cur->server->type == DAP_SERVER_UDP) {
socklen_t l_size = sizeof(l_cur->remote_addr);
l_bytes_read = recvfrom(l_cur->fd, (char *) (l_cur->buf_in + l_cur->buf_in_size),
sizeof(l_cur->buf_in) - l_cur->buf_in_size, 0,
(struct sockaddr *)&l_cur->remote_addr, &l_size);
}
l_errno = errno;
break;
case DESCRIPTOR_TYPE_SOCKET_LISTENING:
......@@ -208,6 +218,7 @@ void *dap_worker_thread(void *arg)
char l_errbuf[128];
strerror_r(l_errno, l_errbuf, sizeof (l_errbuf));
log_it(L_WARNING,"accept() on socket %d error:\"%s\"(%d)",l_cur->socket, l_errbuf,l_errno);
break;
}
}
......@@ -244,7 +255,8 @@ void *dap_worker_thread(void *arg)
continue;
}
}else{
log_it(L_WARNING, "We have incomming %u data but no read callback on socket %d, removing from read set", l_cur->socket);
log_it(L_WARNING, "We have incomming %u data but no read callback on socket %d, removing from read set",
l_bytes_read, l_cur->socket);
dap_events_socket_set_readable_unsafe(l_cur,false);
}
}
......@@ -299,8 +311,14 @@ void *dap_worker_thread(void *arg)
int l_errno;
switch (l_cur->type){
case DESCRIPTOR_TYPE_SOCKET:
l_bytes_sent = send(l_cur->socket, l_cur->buf_out,
l_cur->buf_out_size, MSG_DONTWAIT | MSG_NOSIGNAL);
if (l_cur->server->type == DAP_SERVER_TCP) {
l_bytes_sent = send(l_cur->socket, (const char *)l_cur->buf_out,
l_cur->buf_out_size, MSG_DONTWAIT | MSG_NOSIGNAL);
} else if (l_cur->server->type == DAP_SERVER_UDP) {
l_bytes_sent = sendto(l_cur->socket, (const char *)l_cur->buf_out,
l_cur->buf_out_size, MSG_DONTWAIT | MSG_NOSIGNAL,
(struct sockaddr *)&l_cur->remote_addr, sizeof(l_cur->remote_addr));
}
l_errno = errno;
break;
case DESCRIPTOR_TYPE_PIPE:
......
......@@ -144,11 +144,6 @@ typedef struct dap_events_socket {
uint32_t buf_out_zero_count;
// Flags
bool is_pingable;
bool is_read_direct; // If set - don't call read() in worker, let operate with handler to callback
bool is_dont_reset_write_flag; // If set - don't reset write flag ever data is over
// Input section
union{
uint8_t buf_in[DAP_EVENTS_SOCKET_BUF+1]; // Internal buffer for input data
......@@ -165,7 +160,7 @@ typedef struct dap_events_socket {
// Stored string representation
char hostaddr[1024]; // Address
char service[128];
struct sockaddr remote_addr;
struct sockaddr_in remote_addr; // For UDP datagrams
// Links to related objects
dap_events_t *events;
......
......@@ -48,10 +48,11 @@
#include "uthash.h"
#include "utlist.h"
#include "dap_list.h"
#include "dap_cpu_monitor.h"
#include "dap_events_socket.h"
typedef enum dap_server_type {DAP_SERVER_TCP} dap_server_type_t;
typedef enum dap_server_type {DAP_SERVER_TCP, DAP_SERVER_UDP} dap_server_type_t;
......@@ -66,7 +67,7 @@ typedef struct dap_server {
char *address; // Listen address
int32_t socket_listener; // Socket for listener
dap_events_socket_t * es_listener;
dap_list_t *es_listeners;
struct sockaddr_in listener_addr; // Kernel structure for listener's binded address
......@@ -85,4 +86,5 @@ typedef struct dap_server {
int dap_server_init( ); // Init server module
void dap_server_deinit( void ); // Deinit server module
dap_server_t* dap_server_new(dap_events_t *a_events, const char * a_addr, uint16_t a_port, dap_server_type_t a_type);
dap_server_t* dap_server_new(dap_events_t *a_events, const char * a_addr, uint16_t a_port, dap_server_type_t a_type, dap_events_socket_callbacks_t *a_callbacks);
void dap_server_delete(dap_server_t *a_server);
cmake_minimum_required(VERSION 3.1)
project (dap_udp_server C)
file(GLOB DAP_UDP_SERVER_SRCS *.c)
file(GLOB DAP_UDP_SERVER_HEADERS include/*.h)
if(WIN32)
include_directories(../../../3rdparty/wepoll/)
include_directories(../../../3rdparty/uthash/src/)
#include_directories(../3rdparty/curl/include/)
endif()
add_library(${PROJECT_NAME} STATIC ${DAP_UDP_SERVER_SRCS} ${DAP_UDP_SERVER_HEADERS})
target_link_libraries(${PROJECT_NAME} dap_core dap_server_core)
target_include_directories(${PROJECT_NAME} INTERFACE .)
target_include_directories(${PROJECT_NAME} PUBLIC include)
/*
Copyright (c) 2017-2019 (c) Project "DeM Labs Inc" https://demlabs.net
All rights reserved.
This file is part of DAP (Deus Applications Prototypes) the open source project
DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
DAP is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with any DAP based project. If not, see <http://www.gnu.org/licenses/>.
*/
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <stdarg.h>
#include <stdint.h>
#ifndef _WIN32
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/select.h>
#include <sys/queue.h>
#include <errno.h>
#include <netdb.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/epoll.h>
#else
#include <winsock2.h>
#include <windows.h>
#include <mswsock.h>
#include <ws2tcpip.h>
#include <io.h>
#include <pthread.h>
#endif
#include "uthash.h"
#include "utlist.h"
#include "dap_common.h"
#include "dap_udp_client.h"
#include "dap_udp_server.h"
#define LOG_TAG "udp_client"
/**
* @brief get_key Make key for hash table from host and port
* @return 64 bit Key
*/
#define get_key( host, key ) (((uint64_t)host << 32) + (uint64_t)port)
extern bool sb_payload_ready;
/**
* @brief udp_client_create Create new client and add it to hashmap
* @param sh Server instance
* @param host Client host address
* @param w_client Clients event loop watcher
* @param port Client port
* @return Pointer to the new list's node
*/
dap_events_socket_t *dap_udp_client_create( dap_server_t *dap_srv, EPOLL_HANDLE efd, unsigned long host, unsigned short port )
{
dap_udp_server_t *udp_server = DAP_UDP_SERVER( dap_srv );
log_it( L_DEBUG, "Client structure create with host = %x, port = %d", host, port );
dap_udp_client_t *inh = DAP_NEW_Z( dap_udp_client_t );
inh->host_key = get_key( host, port );
dap_events_socket_t *ret = dap_events_socket_wrap_no_add( dap_srv->es_listener->events, dap_srv->socket_listener, &dap_srv->client_callbacks);
inh->esocket = ret;
ret->server = dap_srv;
ret->flags = DAP_SOCK_READY_TO_READ;
// ret->signal_close = false;
// ret->_ready_to_read = true;
// ret->_ready_to_write = false;
ret->_inheritor = inh;
pthread_mutex_init( &inh->mutex_on_client, NULL );
pthread_mutex_lock( &udp_server->mutex_on_list );
HASH_ADD_INT( udp_server->hclients, host_key, inh );
pthread_mutex_unlock( &udp_server->mutex_on_list );
if( dap_srv->client_callbacks.new_callback )
dap_srv->client_callbacks.new_callback( ret, NULL ); // Init internal structure
return ret;
}
/**
* @brief udp_client_get_address Get host address and port of client
* @param client Pointer to client structure
* @param host Variable for host address
* @param host Variable for port
*/
void dap_udp_client_get_address( dap_events_socket_t *client, unsigned int* host, unsigned short* port )
{
dap_udp_client_t* udp_client = DAP_UDP_CLIENT( client );
*host = udp_client->host_key >> 32;
*port = udp_client->host_key;
}
/**
* @brief udp_client_find Find client structure by host address and port
* @param sh Server instance
* @param host Source host address
* @param port Source port
* @return Pointer to client or NULL if not found
*/
dap_events_socket_t *dap_udp_client_find( dap_server_t *dap_srv, unsigned long host, unsigned short port )
{
dap_udp_client_t *inh = NULL;
dap_udp_server_t *udp_server = DAP_UDP_SERVER( dap_srv );
uint64_t token = get_key( host, port );
pthread_mutex_lock( &udp_server->mutex_on_list );
HASH_FIND_INT( udp_server->hclients, &token, inh );
pthread_mutex_unlock( &udp_server->mutex_on_list );
if( inh == NULL )
return NULL;
else
return inh->esocket;
}
/**
* @brief add_waiting_client Add Client to write queue
* @param client Client instance
*/
void add_waiting_client( dap_events_socket_t *dap_rclient )
{
dap_udp_client_t* udp_cl, *tmp;
dap_server_t *dap_srv = dap_rclient->server;
dap_udp_server_t *udp_server = DAP_UDP_SERVER( dap_srv );
dap_udp_client_t *udp_client = DAP_UDP_CLIENT( dap_rclient );
pthread_mutex_lock( &udp_server->mutex_on_list );
LL_FOREACH_SAFE( udp_server->waiting_clients, udp_cl, tmp ) {
if( udp_cl == udp_client ) {
pthread_mutex_unlock( &udp_server->mutex_on_list );
return;
}
}
LL_APPEND( udp_server->waiting_clients, udp_client );
pthread_mutex_unlock( &udp_server->mutex_on_list );
}
size_t dap_udp_client_write_unsafe( dap_events_socket_t *dap_rclient, const void *data, size_t data_size )
{
size_t size = dap_events_socket_write_unsafe( dap_rclient, data, data_size );
add_waiting_client( dap_rclient );
return size;
}
size_t dap_udp_client_write_f( dap_events_socket_t *dap_rclient, const char * a_format, ... )
{
size_t size = 0;
va_list va;
va_start( va, a_format );
size = dap_events_socket_write_f_unsafe( dap_rclient, a_format, va );
va_end( va );
add_waiting_client( dap_rclient );
return size;
}
/*
Copyright (c) 2017-2019 (c) Project "DeM Labs Inc" https://github.com/demlabsinc
All rights reserved.
This file is part of DAP (Deus Applications Prototypes) the open source project
DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
DAP is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with any DAP based project. If not, see <http://www.gnu.org/licenses/>.
*/
#include <string.h>
#include <time.h>
#include <stdio.h>
#include <stdlib.h>
#include <stddef.h>
#include <signal.h>
#include <stdint.h>
#ifndef _WIN32
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/select.h>
#include <sys/queue.h>
#include <errno.h>
#include <netdb.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/epoll.h>
#else
#include <winsock2.h>
#include <windows.h>
#include <mswsock.h>
#include <ws2tcpip.h>
#include <io.h>
#include <pthread.h>
#endif
#include "uthash.h"
#include "utlist.h"
#include "dap_common.h"
#include "dap_udp_server.h"
#define LOG_TAG "dap_udp_server"
#define BUFSIZE 1024
char buf[ BUFSIZE ]; /* message buf */
bool sb_payload_ready;
//struct ev_io w_read;
//struct ev_io w_write;
EPOLL_HANDLE efd_read = (EPOLL_HANDLE)-1;
//static void write_cb( EPOLL_HANDLE efd, int revents );
int check_close( dap_events_socket_t *client );
/**
*/
static void error( char *msg ) {
perror( msg );
exit( 1 );
}
/**
* @brief dap_udp_server_new Initialize server structure
* @return Server pointer
*/
dap_server_t *dap_udp_server_new( )
{
dap_udp_server_t *udp_server = (dap_udp_server_t *)calloc( 1, sizeof(dap_udp_server_t) );
udp_server->waiting_clients = NULL;
dap_server_t *sh = (dap_server_t *) calloc( 1, sizeof(dap_server_t) );
sh->_inheritor = udp_server;
udp_server->dap_server = sh;
return sh;
}
/**
* @brief dap_udp_server_delete Safe delete server structure
* @param sh Server instance
*/
void dap_udp_server_delete( dap_server_t *sh )
{
if ( !sh ) return;
// dap_client_remote_t *client, *tmp;
// dap_udp_server_t *udps = (dap_udp_server_t *)sh->_inheritor;
// if ( !udps ) return;
if( sh->address )
free( sh->address );
// HASH_ITER( hh, udps->hclients, client, tmp )
// dap_client_remote_remove( client );
if ( sh->delete_callback )
sh->delete_callback( sh, NULL );
if ( sh->_inheritor )
free( sh->_inheritor );
free( sh );
}
/**
* @brief dap_udp_server_listen Create and bind server structure
* @param port Binding port
* @return Server instance
*/
dap_server_t *dap_udp_server_listen( uint16_t port ) {
dap_server_t *sh = dap_udp_server_new( );
sh->socket_listener = socket( AF_INET, SOCK_DGRAM, 0 );
if ( sh->socket_listener < 0 ) {
log_it ( L_ERROR, "Socket error %s", strerror(errno) );
dap_udp_server_delete( sh );
return NULL;
}
int optval = 1;
if ( setsockopt( sh->socket_listener, SOL_SOCKET, SO_REUSEADDR, (const void *)&optval , sizeof(int)) < 0 )
log_it( L_WARNING, "Can't set up REUSEADDR flag to the socket" );
memset( (char *)&(sh->listener_addr), 0, sizeof(sh->listener_addr) );
sh->listener_addr.sin_family = AF_INET;
sh->listener_addr.sin_addr.s_addr = htonl( INADDR_ANY );
sh->listener_addr.sin_port = htons( port );
if ( bind(sh->socket_listener, (struct sockaddr *) &(sh->listener_addr), sizeof(sh->listener_addr)) < 0) {
log_it( L_ERROR, "Bind error: %s", strerror(errno) );
dap_udp_server_delete( sh );
return NULL;
}
log_it(L_INFO, "UDP server listening port 0.0.0.0:%d", port);
pthread_mutex_init( &DAP_UDP_SERVER(sh)->mutex_on_list, NULL );
pthread_mutex_init( &DAP_UDP_SERVER(sh)->mutex_on_hash, NULL );
return sh;
}
/**
* @brief write_cb
*/
static void write_cb( EPOLL_HANDLE efd, int revents, dap_server_t *sh )
{
UNUSED(revents);
dap_udp_client_t *udp_client, *tmp;
// dap_server_t *sh = watcher->data;
dap_udp_server_t *udp = DAP_UDP_SERVER( sh );
pthread_mutex_lock( &udp->mutex_on_list );
LL_FOREACH_SAFE( udp->waiting_clients, udp_client, tmp ) {
//log_it(L_INFO,"write_cb");
//pthread_mutex_lock(&udp_client->mutex_on_client);
dap_events_socket_t *client = udp_client->esocket;
if( client != NULL && !check_close(client) && (client->flags & DAP_SOCK_READY_TO_WRITE) ) {
if ( client->buf_out_size > 0 ) {
struct sockaddr_in addr;
addr.sin_family = AF_INET;
dap_udp_client_get_address( client, (unsigned int *)&addr.sin_addr.s_addr, &addr.sin_port );
//log_it(L_INFO,"write_cb_client host = %x, port = %d, socket = %x", addr.sin_addr.s_addr, addr.sin_port, sh->socket_listener);
for( size_t total_sent = 0; total_sent < client->buf_out_size; ) {
int bytes_sent = sendto( sh->socket_listener, client->buf_out + total_sent,
client->buf_out_size - total_sent, 0, (struct sockaddr*) &addr, sizeof(addr) );
if ( bytes_sent < 0 ) {
log_it(L_ERROR,"Some error occured in send() function");
break;
}
total_sent += bytes_sent;
}
client->buf_out_size = 0;
memset( client->buf_out, 0, sizeof(client->buf_out) );
client->flags &= ~DAP_SOCK_READY_TO_WRITE;
sb_payload_ready = false;
}
LL_DELETE( udp->waiting_clients, udp_client );
if ( sh->client_callbacks.write_callback )
sh->client_callbacks.write_callback( client, NULL );
}
else if( client == NULL ) {
LL_DELETE( udp->waiting_clients, udp_client );
}
//pthread_mutex_unlock(&udp_client->mutex_on_client);
} // for client
pthread_mutex_unlock(&udp->mutex_on_list);
}
/**
* @brief check_close Check if client need to close
* @param client Client structure
* @return 1 if client deleted, 0 if client is no need to delete
*/
int check_close( dap_events_socket_t *client )
{
dap_udp_client_t *client_check, *tmp;
if( !(client->flags & DAP_SOCK_SIGNAL_CLOSE) )
return 0;
dap_udp_client_t *udp_client = DAP_UDP_CLIENT( client );
dap_server_t *sh = client->server;
dap_udp_server_t *udp_server = DAP_UDP_SERVER( sh );
LL_FOREACH_SAFE( udp_server->waiting_clients, client_check, tmp ) {
if ( client_check->host_key == udp_client->host_key )
LL_DELETE( udp_server->waiting_clients, client_check );
}
dap_events_socket_remove_and_delete_mt(client->worker, client );
return 1;
}
/**
* @brief read_cb
*/
static void read_cb( EPOLL_HANDLE efd, int revents, dap_server_t *sh )
{
UNUSED(revents);
// if ( !(revents & EV_READ) ) return;
struct sockaddr_in clientaddr;
socklen_t clientlen = sizeof(clientaddr);
// dap_server_t *sh = watcher->data;
memset( buf, 0, BUFSIZE );
int32_t bytes = (int32_t) recvfrom( sh->socket_listener, buf, BUFSIZE, 0,(struct sockaddr *) &clientaddr, &clientlen );
dap_events_socket_t *client = dap_udp_client_find( sh, clientaddr.sin_addr.s_addr, clientaddr.sin_port );
if( client != NULL && check_close(client) != 0 )
return;
if ( bytes > 0 ) {
char *hostaddrp = inet_ntoa( clientaddr.sin_addr );
if ( hostaddrp == NULL ) {
dap_udp_server_delete( sh );
error("ERROR on inet_ntoa\n");
}
if ( client == NULL ) {
client = dap_udp_client_create( sh, efd, clientaddr.sin_addr.s_addr, clientaddr.sin_port );
if(client == NULL) {
dap_udp_server_delete( sh );
error("ERROR create client structure\n");
}
}
dap_udp_client_t* udp_client = client->_inheritor;
pthread_mutex_lock( &udp_client->mutex_on_client );
size_t bytes_processed = 0;
size_t bytes_recieved = bytes;
while ( bytes_recieved > 0 ) {
size_t bytes_to_transfer = 0;
if ( bytes_recieved > UDP_CLIENT_BUF - client->buf_in_size )
bytes_to_transfer = UDP_CLIENT_BUF - client->buf_in_size;
else
bytes_to_transfer = bytes_recieved;
memcpy( client->buf_in + client->buf_in_size,buf + bytes_processed, bytes_to_transfer );
client->buf_in_size += bytes_to_transfer;
if ( sh->client_callbacks.read_callback )
sh->client_callbacks.read_callback( client, NULL );
bytes_processed += bytes_to_transfer;
bytes_recieved -= bytes_to_transfer;
}
client->buf_in_size = 0;
memset( client->buf_in, 0, sizeof(client->buf_out) );
pthread_mutex_unlock( &udp_client->mutex_on_client );
}
else if ( bytes < 0 ) {
log_it( L_ERROR, "Bytes read Error %s", strerror(errno) );
if( client != NULL )
client->flags |= DAP_SOCK_SIGNAL_CLOSE;
}
else if (bytes == 0) {
if ( client != NULL )
client->flags |= DAP_SOCK_SIGNAL_CLOSE;
}
}
/**
* @brief dap_udp_server_loop Start server event loop
* @param sh Server instance
*/
void dap_udp_server_loop( dap_server_t *d_server )
{
efd_read = epoll_create1( 0 );
if ( (intptr_t)efd_read == -1 ) {
log_it( L_ERROR, "epoll_create1 failed" );
goto udp_error;
}
sb_payload_ready = false;
struct epoll_event pev = {0, {0}};
struct epoll_event events[ 16 ] = {{0, {0}}};
pev.events = EPOLLIN | EPOLLERR;
pev.data.fd = d_server->socket_listener;
if ( epoll_ctl( efd_read, EPOLL_CTL_ADD, d_server->socket_listener, &pev) != 0 ) {
log_it( L_ERROR, "epoll_ctl failed 000" );
goto udp_error;
}
while( 1 ) {
int32_t n = epoll_wait( efd_read, &events[0], 16, -1 );
if ( !n ) continue;
if ( n < 0 ) {
if ( errno == EINTR )
continue;
log_it( L_ERROR, "Server epoll error" );
break;
}
for( int32_t i = 0; i < n; ++ i ) {
if ( events[i].events & EPOLLIN ) {
read_cb( efd_read, events[i].events, d_server );
}
if ( events[i].events & EPOLLOUT) {
// Do nothing. It always true until socket eturn EAGAIN
}
if (sb_payload_ready) {
write_cb( efd_read, events[i].events, d_server );
}
if( events[i].events & EPOLLERR ) {
log_it( L_ERROR, "Server socket error event" );
goto udp_error;
}
}
}
udp_error:
#ifndef _WIN32
if ( efd_read != -1 )
close( efd_read );
#else
if ( efd_read != INVALID_HANDLE_VALUE )
epoll_close( efd_read );
#endif
return;
}
/*
Copyright (c) 2017-2018 (c) Project "DeM Labs Inc" https://github.com/demlabsinc
All rights reserved.
This file is part of DAP (Deus Applications Prototypes) the open source project
DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
DAP is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with any DAP based project. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <stdint.h>
#include <stddef.h>
#include <stdbool.h>
#ifndef WIN32
#include <sys/queue.h>
#endif
#include "dap_events_socket.h"
#include "dap_server.h"
#include "uthash.h"
typedef struct dap_udp_server dap_udp_server_t;
struct dap_udp_client;
#define UDP_CLIENT_BUF 65535
typedef struct dap_udp_client {
dap_events_socket_t *esocket;
uint64_t host_key; //key contains host address in first 4 bytes and port in last 4 bytes
UT_hash_handle hh;
struct dap_udp_client *next, *prev; //pointers for writing queue
pthread_mutex_t mutex_on_client;
void *_inheritor; // Internal data to specific client type, usualy states for state machine
} dap_udp_client_t; // Node of bidirectional list of clients
#define DAP_UDP_CLIENT(a) ((dap_udp_client_t *) (a)->_inheritor)
dap_events_socket_t *dap_udp_client_create( dap_server_t *sh, EPOLL_HANDLE efd, unsigned long host, unsigned short port ); // Create new client and add it to the list
dap_events_socket_t *dap_udp_client_find( dap_server_t *sh, unsigned long host, unsigned short port ); // Find client by host and port
void dap_udp_client_ready_to_read( dap_events_socket_t *sc, bool is_ready );
void dap_udp_client_ready_to_write( dap_events_socket_t *sc, bool is_ready );
size_t dap_udp_client_write_unsafe( dap_events_socket_t *sc, const void * data, size_t data_size );
size_t dap_udp_client_write_f( dap_events_socket_t *a_client, const char * a_format, ... );
void add_waiting_client( dap_events_socket_t *client ); // Add client to writing queue
void dap_udp_client_get_address( dap_events_socket_t *client, unsigned int *host, unsigned short *port );
/*
Copyright (c) 2017-2019 (c) Project "DeM Labs Inc" https://github.com/demlabsinc
All rights reserved.
This file is part of DAP (Deus Applications Prototypes) the open source project
DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
DAP is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with any DAP based project. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#ifndef WIN32
#include <stdint.h>
#include <sys/socket.h>
#include <netdb.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/select.h>
#include <sys/queue.h>
#define EPOLL_HANDLE int
#endif
#include "dap_udp_client.h"
#include "dap_server.h"
struct dap_udp_server;
typedef struct dap_udp_thread {
pthread_t tid;
} dap_udp_thread_t;
typedef void (*dap_udp_server_callback_t) (struct dap_udp_server *,void *arg); // Callback for specific server's operations
typedef struct dap_udp_server {
dap_udp_client_t *hclients;
dap_udp_client_t *waiting_clients; // List clients for writing data
pthread_mutex_t mutex_on_list;
pthread_mutex_t mutex_on_hash;
void *_inheritor;
dap_server_t *dap_server;
} dap_udp_server_t;
#define DAP_UDP_SERVER(a) ((dap_udp_server_t *) (a)->_inheritor)
void dap_udp_server_delete( dap_server_t *sh );
void dap_udp_server_loop( dap_server_t *udp_server ); // Start server event loop
dap_server_t *dap_udp_server_listen( uint16_t port ); // Create and bind serv
cmake_minimum_required(VERSION 3.0)
project(udp-server-test)
set(CMAKE_C_STANDARD 11)
if ( NOT ( TARGET dap_test ) )
add_subdirectory(libdap-test)
endif()
file(GLOB SRC *.h *.c)
add_executable(${PROJECT_NAME} ${SRC})
target_link_libraries(${PROJECT_NAME} dap_test)
add_test(
NAME udp-server-test
COMMAND udp-server-test
)
#include <stdio.h>
int main()
{
return 0;
}
......@@ -376,7 +376,6 @@ static void s_http_client_data_write( dap_http_client_t * a_http_client, void *a
l_http_simple->reply_sent += dap_events_socket_write_unsafe( a_http_client->esocket,
l_http_simple->reply_byte + l_http_simple->reply_sent,
a_http_client->out_content_length - l_http_simple->reply_sent );
dap_events_socket_set_writable_unsafe(a_http_client->esocket, true);
if ( l_http_simple->reply_sent >= a_http_client->out_content_length ) {
log_it(L_INFO, "All the reply (%u) is sent out", a_http_client->out_content_length );
......
......@@ -11,7 +11,7 @@ endif()
add_library(${PROJECT_NAME} STATIC ${DAP_STREAM_CH_SRCS} ${DAP_STREAM_CH_HDRS})
target_link_libraries(dap_stream_ch dap_core dap_crypto dap_udp_server dap_stream )
target_link_libraries(dap_stream_ch dap_core dap_crypto dap_stream )
target_include_directories(dap_stream_ch INTERFACE .)
target_include_directories(${PROJECT_NAME} PUBLIC include)
......@@ -12,7 +12,7 @@ endif()
add_library(${PROJECT_NAME} STATIC ${STREAM_SRCS} ${STREAM_HDRS})
target_link_libraries(dap_stream dap_core dap_server_core dap_udp_server dap_crypto
target_link_libraries(dap_stream dap_core dap_server_core dap_crypto
dap_http_server dap_enc_server dap_session dap_stream_ch)
target_include_directories(dap_stream INTERFACE .)
......