Skip to content
Snippets Groups Projects
Commit 619e7a29 authored by anta999's avatar anta999
Browse files

test1

parent 2a075869
No related branches found
No related tags found
No related merge requests found
...@@ -18,18 +18,43 @@ ...@@ -18,18 +18,43 @@
along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. along with any DAP based project. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "dap_udp_client.h"
#include <sys/epoll.h>
#include <string.h> #include <string.h>
#include <stdlib.h> #include <stdlib.h>
#include <stdio.h> #include <stdio.h>
#include <stdarg.h> #include <stdarg.h>
#include <unistd.h>
#include <string.h>
#include <stdint.h> #include <stdint.h>
#include <ev.h>
#ifndef _WIN32
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/select.h>
#include <sys/queue.h>
#include <errno.h>
#include <netdb.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/epoll.h>
#else
#undef _WIN32_WINNT
#define _WIN32_WINNT 0x0600
#include <winsock2.h>
#include <windows.h>
#include <mswsock.h>
#include <ws2tcpip.h>
#include <io.h>
#include "wrappers.h"
#include <wepoll.h>
#include <pthread.h>
#endif
#include "uthash.h"
#include "utlist.h" #include "utlist.h"
#include "dap_common.h" #include "dap_common.h"
#include "dap_udp_client.h"
#include "dap_udp_server.h" #include "dap_udp_server.h"
#define LOG_TAG "udp_client" #define LOG_TAG "udp_client"
...@@ -38,12 +63,7 @@ ...@@ -38,12 +63,7 @@
* @brief get_key Make key for hash table from host and port * @brief get_key Make key for hash table from host and port
* @return 64 bit Key * @return 64 bit Key
*/ */
uint64_t get_key(unsigned long host,unsigned short port){ #define get_key( host, key ) (((uint64_t)host << 32) + (uint64_t)port)
uint64_t key = host;
key = key << 32;
key += port;
return key;
}
/** /**
* @brief udp_client_create Create new client and add it to hashmap * @brief udp_client_create Create new client and add it to hashmap
...@@ -53,32 +73,40 @@ uint64_t get_key(unsigned long host,unsigned short port){ ...@@ -53,32 +73,40 @@ uint64_t get_key(unsigned long host,unsigned short port){
* @param port Client port * @param port Client port
* @return Pointer to the new list's node * @return Pointer to the new list's node
*/ */
dap_client_remote_t * dap_udp_client_create(dap_server_t * sh, ev_io* w_client, unsigned long host, unsigned short port) dap_client_remote_t *dap_udp_client_create( dap_server_t *dap_srv, EPOLL_HANDLE efd, unsigned long host, unsigned short port )
{ {
dap_udp_server_t* udp_server = DAP_UDP_SERVER(sh); dap_udp_server_t *udp_server = DAP_UDP_SERVER( dap_srv );
log_it(L_DEBUG,"Client structure create"); log_it( L_DEBUG, "Client structure create" );
dap_udp_client_t * inh=DAP_NEW_Z(dap_udp_client_t); dap_udp_client_t *inh = DAP_NEW_Z( dap_udp_client_t );
inh->host_key = get_key(host,port); inh->host_key = get_key( host, port );
dap_client_remote_t * ret=DAP_NEW_Z(dap_client_remote_t); dap_client_remote_t *ret = DAP_NEW_Z( dap_client_remote_t );
inh->client = ret; inh->client = ret;
ret->server = sh;
ret->watcher_client = w_client; ret->pevent.events = EPOLLIN | EPOLLERR;
ret->signal_close = false; ret->pevent.data.fd = dap_srv->socket_listener;
ret->_ready_to_read=true;
ret->_ready_to_write=false; ret->server = dap_srv;
ret->_inheritor = inh; ret->efd = efd;
pthread_mutex_init(&inh->mutex_on_client, NULL);
ret->flags = DAP_SOCK_READY_TO_READ;
pthread_mutex_lock(&udp_server->mutex_on_list); // ret->signal_close = false;
HASH_ADD_INT( udp_server->clients, host_key, inh); // ret->_ready_to_read = true;
pthread_mutex_unlock(&udp_server->mutex_on_list); // ret->_ready_to_write = false;
if(sh->client_new_callback)
sh->client_new_callback(ret,NULL); // Init internal structure ret->_inheritor = inh;
pthread_mutex_init( &inh->mutex_on_client, NULL );
return ret;
pthread_mutex_lock( &udp_server->mutex_on_list );
HASH_ADD_INT( udp_server->clients, host_key, inh );
pthread_mutex_unlock( &udp_server->mutex_on_list );
if( dap_srv->client_new_callback )
dap_srv->client_new_callback( ret, NULL ); // Init internal structure
return ret;
} }
/** /**
...@@ -87,10 +115,11 @@ dap_client_remote_t * dap_udp_client_create(dap_server_t * sh, ev_io* w_client, ...@@ -87,10 +115,11 @@ dap_client_remote_t * dap_udp_client_create(dap_server_t * sh, ev_io* w_client,
* @param host Variable for host address * @param host Variable for host address
* @param host Variable for port * @param host Variable for port
*/ */
void dap_udp_client_get_address(dap_client_remote_t *client, unsigned int* host,unsigned short* port){ void dap_udp_client_get_address( dap_client_remote_t *client, unsigned int* host, unsigned short* port )
dap_udp_client_t* udp_client = DAP_UDP_CLIENT(client); {
*host = udp_client->host_key >> 32; dap_udp_client_t* udp_client = DAP_UDP_CLIENT( client );
*port = (udp_client->host_key <<32) - *host; *host = udp_client->host_key >> 32;
*port = (udp_client->host_key <<32) - *host;
} }
/** /**
...@@ -100,106 +129,123 @@ void dap_udp_client_get_address(dap_client_remote_t *client, unsigned int* host, ...@@ -100,106 +129,123 @@ void dap_udp_client_get_address(dap_client_remote_t *client, unsigned int* host,
* @param port Source port * @param port Source port
* @return Pointer to client or NULL if not found * @return Pointer to client or NULL if not found
*/ */
dap_client_remote_t * dap_udp_client_find(dap_server_t * sh, unsigned long host,unsigned short port) dap_client_remote_t *dap_udp_client_find( dap_server_t * dap_srv, unsigned long host, unsigned short port )
{ {
dap_udp_server_t* udp_server = DAP_UDP_SERVER(sh); dap_udp_client_t *inh = NULL;
pthread_mutex_lock(&udp_server->mutex_on_list);
dap_udp_client_t* inh = NULL; dap_udp_server_t *udp_server = DAP_UDP_SERVER( dap_srv );
uint64_t token = get_key( host, port );
uint64_t token = get_key(host,port);
HASH_FIND_INT(udp_server->clients,&token,inh); pthread_mutex_lock( &udp_server->mutex_on_list );
HASH_FIND_INT( udp_server->clients, &token, inh );
pthread_mutex_unlock(&udp_server->mutex_on_list); pthread_mutex_unlock( &udp_server->mutex_on_list );
if(inh == NULL)
return NULL; if( inh == NULL )
else return NULL;
return inh->client; else
return inh->client;
} }
/** /**
* @brief udp_client_ready_to_read Set ready_to_read flag * @brief udp_client_ready_to_read Set ready_to_read flag
* @param sc Client structure * @param dap_rclient Client structure
* @param is_ready Flag value * @param is_ready Flag value
*/ */
void dap_udp_client_ready_to_read(dap_client_remote_t * sc,bool is_ready) void dap_udp_client_ready_to_read( dap_client_remote_t *sc, bool is_ready )
{ {
if(is_ready != sc->_ready_to_read) { if( is_ready == (bool)(sc->flags & DAP_SOCK_READY_TO_READ) )
return;
uint32_t events = 0; if ( is_ready )
sc->_ready_to_read=is_ready; sc->flags |= DAP_SOCK_READY_TO_READ;
else
sc->flags ^= DAP_SOCK_READY_TO_READ;
if(sc->_ready_to_read) int events = EPOLLERR;
{
events |= EV_READ;
}
if(sc->_ready_to_write) if( sc->flags & DAP_SOCK_READY_TO_READ )
events |= EV_WRITE; events |= EPOLLIN;
ev_io_set(sc->watcher_client, sc->server->socket_listener, events ); if( sc->flags & DAP_SOCK_READY_TO_WRITE )
} events |= EPOLLOUT;
sc->pevent.events = events;
if( epoll_ctl(sc->efd, EPOLL_CTL_MOD, sc->server->socket_listener, &sc->pevent) != 0 ) {
log_it( L_ERROR, "epoll_ctl failed 002" );
}
} }
/** /**
* @brief udp_client_ready_to_write Set ready_to_write flag * @brief udp_client_ready_to_write Set ready_to_write flag
* @param sc Client structure * @param dap_rclient Client structure
* @param is_ready Flag value * @param is_ready Flag value
*/ */
void dap_udp_client_ready_to_write(dap_client_remote_t * sc,bool is_ready) void dap_udp_client_ready_to_write( dap_client_remote_t *sc, bool is_ready )
{ {
// if(is_ready) if ( is_ready == (bool)(sc->flags & DAP_SOCK_READY_TO_WRITE) )
// add_waiting_client(sc); // Add client to writing queue return;
if(is_ready != sc->_ready_to_write) {
uint32_t events = 0; if ( is_ready )
sc->_ready_to_write=is_ready; sc->flags |= DAP_SOCK_READY_TO_WRITE;
else
if(sc->_ready_to_read) sc->flags ^= DAP_SOCK_READY_TO_WRITE;
events |= EV_READ;
int events = EPOLLERR;
if(sc->_ready_to_write)
{ if( sc->flags & DAP_SOCK_READY_TO_READ )
events |= EV_WRITE; events |= EPOLLIN;
}
int descriptor = sc->watcher_client->fd; if( sc->flags & DAP_SOCK_READY_TO_WRITE )
ev_io_set(sc->watcher_client, descriptor, events ); events |= EPOLLOUT;
}
sc->pevent.events = events;
if ( epoll_ctl(sc->efd, EPOLL_CTL_MOD, sc->pevent.data.fd, &sc->pevent) != 0 ) {
log_it( L_ERROR, "epoll_ctl failed 003" );
}
} }
/** /**
* @brief add_waiting_client Add Client to write queue * @brief add_waiting_client Add Client to write queue
* @param client Client instance * @param client Client instance
*/ */
void add_waiting_client(dap_client_remote_t* client){ void add_waiting_client( dap_client_remote_t *dap_rclient )
dap_server_t* sh = client->server; {
dap_udp_server_t* udp_server = DAP_UDP_SERVER(sh);
dap_udp_client_t* udp_client = DAP_UDP_CLIENT(client);
pthread_mutex_lock(&udp_server->mutex_on_list);
dap_udp_client_t* udp_cl, *tmp; dap_udp_client_t* udp_cl, *tmp;
LL_FOREACH_SAFE(udp_server->waiting_clients,udp_cl,tmp)
if(udp_cl == udp_client) dap_server_t *dap_srv = dap_rclient->server;
{ dap_udp_server_t *udp_server = DAP_UDP_SERVER( dap_srv );
pthread_mutex_unlock(&udp_server->mutex_on_list); dap_udp_client_t *udp_client = DAP_UDP_CLIENT( dap_rclient );
pthread_mutex_lock( &udp_server->mutex_on_list );
LL_FOREACH_SAFE( udp_server->waiting_clients, udp_cl, tmp ) {
if( udp_cl == udp_client ) {
pthread_mutex_unlock( &udp_server->mutex_on_list );
return; return;
} }
LL_APPEND(udp_server->waiting_clients, udp_client); }
pthread_mutex_unlock(&udp_server->mutex_on_list); LL_APPEND( udp_server->waiting_clients, udp_client );
pthread_mutex_unlock( &udp_server->mutex_on_list );
} }
size_t dap_udp_client_write(dap_client_remote_t *sc, const void * data, size_t data_size){ size_t dap_udp_client_write( dap_client_remote_t *dap_rclient, const void *data, size_t data_size )
size_t size = dap_client_remote_write(sc,data,data_size); {
add_waiting_client(sc); size_t size = dap_client_remote_write( dap_rclient, data, data_size );
add_waiting_client( dap_rclient );
return size; return size;
} }
size_t dap_udp_client_write_f(dap_client_remote_t *a_client, const char * a_format,...){ size_t dap_udp_client_write_f( dap_client_remote_t *dap_rclient, const char * a_format, ... )
{
size_t size = 0; size_t size = 0;
va_list ap; va_list va;
va_start(ap,a_format);
size =dap_client_remote_write_f(a_client,a_format,ap); va_start( va, a_format );
va_end(ap); size = dap_client_remote_write_f( dap_rclient, a_format, va );
add_waiting_client(a_client); va_end( va );
add_waiting_client( dap_rclient );
return size; return size;
} }
...@@ -18,49 +18,49 @@ ...@@ -18,49 +18,49 @@
along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. along with any DAP based project. If not, see <http://www.gnu.org/licenses/>.
*/ */
#pragma once #pragma once
#ifndef _UDP_CLIENT_H
#define _UDP_CLIENT_H
#include <sys/queue.h>
#include <stdint.h> #include <stdint.h>
#include <stddef.h> #include <stddef.h>
#include <stdbool.h> #include <stdbool.h>
#ifndef WIN32
#include <sys/queue.h> #include <sys/queue.h>
#endif
#include "uthash.h" #include "uthash.h"
#include "dap_client_remote.h"
#include <ev.h>
#include "dap_client_remote.h"
typedef struct dap_udp_server dap_udp_server_t; typedef struct dap_udp_server dap_udp_server_t;
struct dap_udp_client; struct dap_udp_client;
#define UDP_CLIENT_BUF 100000 #define UDP_CLIENT_BUF 65535
typedef struct dap_udp_client{ typedef struct dap_udp_client {
dap_client_remote_t* client;
dap_client_remote_t *client;
uint64_t host_key; //key contains host address in first 4 bytes and port in last 4 bytes uint64_t host_key; //key contains host address in first 4 bytes and port in last 4 bytes
UT_hash_handle hh; UT_hash_handle hh;
struct dap_udp_client *next, *prev; //pointers for writing queue struct dap_udp_client *next, *prev; //pointers for writing queue
pthread_mutex_t mutex_on_client; pthread_mutex_t mutex_on_client;
void * _inheritor; // Internal data to specific client type, usualy states for state machine void *_inheritor; // Internal data to specific client type, usualy states for state machine
} dap_udp_client_t; // Node of bidirectional list of clients } dap_udp_client_t; // Node of bidirectional list of clients
#define DAP_UDP_CLIENT(a) ((dap_udp_client_t *) (a)->_inheritor) #define DAP_UDP_CLIENT(a) ((dap_udp_client_t *) (a)->_inheritor)
dap_client_remote_t *dap_udp_client_create( dap_server_t *sh, EPOLL_HANDLE efd, unsigned long host, unsigned short port ); // Create new client and add it to the list
dap_client_remote_t *dap_udp_client_find( dap_server_t *sh, unsigned long host, unsigned short port ); // Find client by host and port
dap_client_remote_t * dap_udp_client_create(dap_server_t * sh, ev_io* w_client, unsigned long host, unsigned short port); // Create new client and add it to the list void dap_udp_client_ready_to_read( dap_client_remote_t *sc, bool is_ready );
dap_client_remote_t * dap_udp_client_find(dap_server_t * sh, unsigned long host, unsigned short port); // Find client by host and port void dap_udp_client_ready_to_write( dap_client_remote_t *sc, bool is_ready );
void dap_udp_client_ready_to_read(dap_client_remote_t * sc,bool is_ready); size_t dap_udp_client_write( dap_client_remote_t *sc, const void * data, size_t data_size );
void dap_udp_client_ready_to_write(dap_client_remote_t * sc,bool is_ready); size_t dap_udp_client_write_f( dap_client_remote_t *a_client, const char * a_format, ... );
size_t dap_udp_client_write(dap_client_remote_t *sc, const void * data, size_t data_size); void add_waiting_client( dap_client_remote_t *client ); // Add client to writing queue
size_t dap_udp_client_write_f(dap_client_remote_t *a_client, const char * a_format,...);
void add_waiting_client(dap_client_remote_t* client); // Add client to writing queue void dap_udp_client_get_address( dap_client_remote_t *client, unsigned int *host, unsigned short *port );
void dap_udp_client_get_address(dap_client_remote_t *client, unsigned int* host,unsigned short* port);
#endif
#include "dap_udp_server.h" /*
Copyright (c) 2017-2019 (c) Project "DeM Labs Inc" https://github.com/demlabsinc
All rights reserved.
This file is part of DAP (Deus Applications Prototypes) the open source project
DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
DAP is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with any DAP based project. If not, see <http://www.gnu.org/licenses/>.
*/
#include <string.h>
#include <time.h>
#include <stdio.h> #include <stdio.h>
#include "dap_common.h" #include <stdlib.h>
#include <stddef.h>
#include <signal.h>
#include <stdint.h>
#ifndef _WIN32
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/select.h>
#include <sys/queue.h>
#include <errno.h> #include <errno.h>
#include <netdb.h> #include <netdb.h>
#include <unistd.h> #include <unistd.h>
#include <fcntl.h> #include <fcntl.h>
#include <ev.h> #include <sys/epoll.h>
#else
#undef _WIN32_WINNT
#define _WIN32_WINNT 0x0600
#include <winsock2.h>
#include <windows.h>
#include <mswsock.h>
#include <ws2tcpip.h>
#include <io.h>
#include "wrappers.h"
#include <wepoll.h>
#include <pthread.h>
#endif
#include "uthash.h"
#include "utlist.h" #include "utlist.h"
#include "dap_common.h"
#include "dap_udp_server.h"
#define LOG_TAG "dap_udp_server" #define LOG_TAG "dap_udp_server"
#define BUFSIZE 1024 #define BUFSIZE 1024
char buf[BUFSIZE]; /* message buf */ char buf[ BUFSIZE ]; /* message buf */
struct ev_io w_read;
struct ev_io w_write;
static void write_cb(struct ev_loop* _loop, struct ev_io* watcher, int revents); //struct ev_io w_read;
int check_close(dap_client_remote_t* client); //struct ev_io w_write;
EPOLL_HANDLE efd_read = (EPOLL_HANDLE)-1;
EPOLL_HANDLE efd_write = (EPOLL_HANDLE)-1;
//static void write_cb( EPOLL_HANDLE efd, int revents );
int check_close( dap_client_remote_t *client );
/** /**
*/ */
void error(char *msg) { void error( char *msg ) {
perror(msg);
exit(1); perror( msg );
exit( 1 );
} }
/** /**
* @brief dap_udp_server_new Initialize server structure * @brief dap_udp_server_new Initialize server structure
* @return Server pointer * @return Server pointer
*/ */
dap_server_t * dap_udp_server_new() dap_server_t *dap_udp_server_new( )
{ {
dap_udp_server_t* udp_server = (dap_udp_server_t*)calloc(1,sizeof(dap_udp_server_t)); dap_udp_server_t *udp_server = (dap_udp_server_t *)calloc( 1, sizeof(dap_udp_server_t) );
udp_server->waiting_clients = NULL; udp_server->waiting_clients = NULL;
dap_server_t* sh = (dap_server_t*) calloc(1,sizeof(dap_server_t));
sh->_inheritor = udp_server;
udp_server->dap_server = sh;
return sh;
}
/** dap_server_t *sh = (dap_server_t *) calloc( 1, sizeof(dap_server_t) );
* @brief dap_udp_client_loop Create client listening event loop sh->_inheritor = udp_server;
*/
void* dap_udp_client_loop(void * arg) udp_server->dap_server = sh;
{
dap_server_t* sh = (dap_server_t*)arg; return sh;
log_it(L_NOTICE, "Start client listener thread");
struct ev_loop * ev_client_loop = ev_loop_new(0);
w_write.data = sh;
ev_io_init(&w_write, write_cb, sh->socket_listener, EV_WRITE);
ev_io_start(ev_client_loop, &w_write);
ev_loop(ev_client_loop, 0);
return NULL;
} }
/** /**
* @brief dap_udp_server_delete Safe delete server structure * @brief dap_udp_server_delete Safe delete server structure
* @param sh Server instance * @param sh Server instance
*/ */
void dap_udp_server_delete(dap_server_t * sh) void dap_udp_server_delete( dap_server_t *sh )
{ {
if(sh->address) dap_client_remote_t *client, *tmp;
free(sh->address);
if ( !sh ) return;
dap_client_remote_t * client, * tmp;
HASH_ITER(hh,sh->clients,client,tmp) if( sh->address )
dap_client_remote_remove(client, sh); free( sh->address );
if(sh->server_delete_callback) HASH_ITER( hh, sh->clients, client, tmp )
sh->server_delete_callback(sh,NULL); dap_client_remote_remove( client, sh );
if(sh->_inheritor)
free(sh->_inheritor); if ( sh->server_delete_callback )
free(sh); sh->server_delete_callback( sh, NULL );
if ( sh->_inheritor )
free( sh->_inheritor );
free( sh );
} }
/** /**
...@@ -80,80 +129,93 @@ void dap_udp_server_delete(dap_server_t * sh) ...@@ -80,80 +129,93 @@ void dap_udp_server_delete(dap_server_t * sh)
* @param port Binding port * @param port Binding port
* @return Server instance * @return Server instance
*/ */
dap_server_t * dap_udp_server_listen(uint16_t port){ dap_server_t *dap_udp_server_listen( uint16_t port ) {
dap_server_t* sh = dap_udp_server_new();
sh->socket_listener = socket (AF_INET, SOCK_DGRAM, 0); dap_server_t *sh = dap_udp_server_new( );
if (sh->socket_listener < 0){ sh->socket_listener = socket( AF_INET, SOCK_DGRAM, 0 );
log_it (L_ERROR,"Socket error %s",strerror(errno));
dap_udp_server_delete(sh);
return NULL;
}
int optval = 1; if ( sh->socket_listener < 0 ) {
if(setsockopt(sh->socket_listener, SOL_SOCKET, SO_REUSEADDR,(const void *)&optval , sizeof(int)) < 0) log_it ( L_ERROR, "Socket error %s", strerror(errno) );
log_it(L_WARNING, "Can't set up REUSEADDR flag to the socket"); dap_udp_server_delete( sh );
return NULL;
}
bzero((char *) &(sh->listener_addr), sizeof(sh->listener_addr)); int optval = 1;
sh->listener_addr.sin_family = AF_INET; if ( setsockopt( sh->socket_listener, SOL_SOCKET, SO_REUSEADDR, (const void *)&optval , sizeof(int)) < 0 )
sh->listener_addr.sin_addr.s_addr = htonl(INADDR_ANY); log_it( L_WARNING, "Can't set up REUSEADDR flag to the socket" );
sh->listener_addr.sin_port = htons(port);
if(bind (sh->socket_listener, (struct sockaddr *) &(sh->listener_addr), sizeof(sh->listener_addr)) < 0) { memset( (char *)&(sh->listener_addr), 0, sizeof(sh->listener_addr) );
log_it(L_ERROR,"Bind error: %s",strerror(errno));
dap_udp_server_delete(sh); sh->listener_addr.sin_family = AF_INET;
return NULL; sh->listener_addr.sin_addr.s_addr = htonl( INADDR_ANY );
} sh->listener_addr.sin_port = htons( port );
pthread_mutex_init(&DAP_UDP_SERVER(sh)->mutex_on_list, NULL);
return sh; if ( bind(sh->socket_listener, (struct sockaddr *) &(sh->listener_addr), sizeof(sh->listener_addr)) < 0) {
log_it( L_ERROR, "Bind error: %s", strerror(errno) );
dap_udp_server_delete( sh );
return NULL;
}
pthread_mutex_init( &DAP_UDP_SERVER(sh)->mutex_on_list, NULL );
return sh;
} }
/** /**
* @brief write_cb * @brief write_cb
*/ */
static void write_cb(struct ev_loop* loop, struct ev_io* watcher, int revents) static void write_cb( EPOLL_HANDLE efd, int revents, dap_server_t *sh )
{ {
if( ( revents & EV_WRITE ) ) { dap_udp_client_t *udp_client, *tmp;
dap_server_t* sh = watcher->data;
dap_udp_server_t* udp = DAP_UDP_SERVER(sh); // dap_server_t *sh = watcher->data;
dap_udp_client_t * udp_client, * tmp; dap_udp_server_t *udp = DAP_UDP_SERVER( sh );
pthread_mutex_lock(&udp->mutex_on_list);
LL_FOREACH_SAFE(udp->waiting_clients,udp_client,tmp) pthread_mutex_lock( &udp->mutex_on_list );
{
//log_it(L_INFO,"write_cb"); LL_FOREACH_SAFE( udp->waiting_clients, udp_client, tmp ) {
//pthread_mutex_lock(&udp_client->mutex_on_client);
dap_client_remote_t* client = udp_client->client; //log_it(L_INFO,"write_cb");
if(client != NULL && check_close(client) == 0 && client->_ready_to_write) //pthread_mutex_lock(&udp_client->mutex_on_client);
{
if(sh->client_write_callback) dap_client_remote_t *client = udp_client->client;
sh->client_write_callback(client, NULL);
if(client->buf_out_size > 0) if( client != NULL && !check_close(client) && (client->flags & DAP_SOCK_READY_TO_WRITE) ) {
{
//log_it(L_INFO,"write_cb_client"); if ( sh->client_write_callback )
for(size_t total_sent = 0; total_sent < client->buf_out_size;) { sh->client_write_callback( client, NULL );
struct sockaddr_in addr;
addr.sin_family = AF_INET; if ( client->buf_out_size > 0 ) {
dap_udp_client_get_address(client,&addr.sin_addr.s_addr,&addr.sin_port);
int bytes_sent = sendto(sh->socket_listener, client->buf_out + total_sent, client->buf_out_size - total_sent, 0, (struct sockaddr*) &addr, sizeof(addr)); //log_it(L_INFO,"write_cb_client");
if(bytes_sent < 0) {
log_it(L_ERROR,"Some error occured in send() function"); for( size_t total_sent = 0; total_sent < client->buf_out_size; ) {
break; struct sockaddr_in addr;
} addr.sin_family = AF_INET;
total_sent += bytes_sent; dap_udp_client_get_address( client, (unsigned int *)&addr.sin_addr.s_addr, &addr.sin_port );
}
client->buf_out_size = 0; int bytes_sent = sendto( sh->socket_listener, client->buf_out + total_sent,
bzero(client->buf_out, DAP_CLIENT_REMOTE_BUF + 1); client->buf_out_size - total_sent, 0, (struct sockaddr*) &addr, sizeof(addr) );
} if ( bytes_sent < 0 ) {
LL_DELETE(udp->waiting_clients,udp_client); log_it(L_ERROR,"Some error occured in send() function");
} break;
else if(client == NULL) }
LL_DELETE(udp->waiting_clients,udp_client); total_sent += bytes_sent;
//pthread_mutex_unlock(&udp_client->mutex_on_client);
} }
pthread_mutex_unlock(&udp->mutex_on_list); client->buf_out_size = 0;
memset( client->buf_out, 0, DAP_CLIENT_REMOTE_BUF + 1 );
}
LL_DELETE( udp->waiting_clients, udp_client );
} }
else if( client == NULL ) {
LL_DELETE( udp->waiting_clients, udp_client );
}
//pthread_mutex_unlock(&udp_client->mutex_on_client);
} // for client
pthread_mutex_unlock(&udp->mutex_on_list);
} }
/** /**
...@@ -161,106 +223,215 @@ static void write_cb(struct ev_loop* loop, struct ev_io* watcher, int revents) ...@@ -161,106 +223,215 @@ static void write_cb(struct ev_loop* loop, struct ev_io* watcher, int revents)
* @param client Client structure * @param client Client structure
* @return 1 if client deleted, 0 if client is no need to delete * @return 1 if client deleted, 0 if client is no need to delete
*/ */
int check_close(dap_client_remote_t* client){ int check_close( dap_client_remote_t *client )
if(client->signal_close) {
{ dap_udp_client_t *client_check, *tmp;
dap_udp_client_t* udp_client = DAP_UDP_CLIENT(client);
dap_server_t* sh = client->server; if( !(client->flags & DAP_SOCK_SIGNAL_CLOSE) ) return 0;
dap_udp_server_t* udp_server = DAP_UDP_SERVER(sh);
dap_udp_client_t * client_check, * tmp; dap_udp_client_t *udp_client = DAP_UDP_CLIENT( client );
LL_FOREACH_SAFE(udp_server->waiting_clients,client_check,tmp) dap_server_t *sh = client->server;
if(client_check->host_key == udp_client->host_key) dap_udp_server_t *udp_server = DAP_UDP_SERVER( sh );
LL_DELETE(udp_server->waiting_clients,client_check);
dap_client_remote_remove(client, sh); LL_FOREACH_SAFE( udp_server->waiting_clients, client_check, tmp ) {
return 1;
if ( client_check->host_key == udp_client->host_key )
LL_DELETE( udp_server->waiting_clients, client_check );
} }
return 0;
dap_client_remote_remove( client, sh );
return 1;
} }
/** /**
* @brief read_cb * @brief read_cb
*/ */
static void read_cb(struct ev_loop* loop, struct ev_io* watcher, int revents) static void read_cb( EPOLL_HANDLE efd, int revents, dap_server_t *sh )
{ {
//log_it(L_INFO,"read_cb"); // if ( !(revents & EV_READ) ) return;
if ( revents & EV_READ )
{ struct sockaddr_in clientaddr;
struct sockaddr_in clientaddr; int clientlen = sizeof(clientaddr);
int clientlen = sizeof(clientaddr); // dap_server_t *sh = watcher->data;
dap_server_t* sh = watcher->data;
bzero(buf, BUFSIZE); memset( buf, 0, BUFSIZE );
socklen_t bytes = recvfrom(sh->socket_listener, buf, BUFSIZE, 0,(struct sockaddr *) &clientaddr, &clientlen);
dap_client_remote_t *client = dap_udp_client_find(sh,clientaddr.sin_addr.s_addr,clientaddr.sin_port); int32_t bytes = recvfrom( sh->socket_listener, buf, BUFSIZE, 0,(struct sockaddr *) &clientaddr, &clientlen );
if(client != NULL && check_close(client) != 0)
dap_client_remote_t *client = dap_udp_client_find( sh, clientaddr.sin_addr.s_addr, clientaddr.sin_port );
if( client != NULL && check_close(client) != 0 )
return; return;
if(bytes > 0){
char * hostaddrp = inet_ntoa(clientaddr.sin_addr);
if(hostaddrp == NULL)
{
dap_udp_server_delete(sh);
error("ERROR on inet_ntoa\n");
}
if(client == NULL)
{
client = dap_udp_client_create(sh,&w_write,clientaddr.sin_addr.s_addr,clientaddr.sin_port);
if(client == NULL)
{
dap_udp_server_delete(sh);
error("ERROR create client structure\n");
}
}
dap_udp_client_t* udp_client = client->_inheritor;
pthread_mutex_lock(&udp_client->mutex_on_client);
size_t bytes_processed = 0;
size_t bytes_recieved = bytes;
while(bytes_recieved > 0){
size_t bytes_to_transfer = 0;
if(bytes_recieved > UDP_CLIENT_BUF - client->buf_in_size)
bytes_to_transfer = UDP_CLIENT_BUF - client->buf_in_size;
else
bytes_to_transfer = bytes_recieved;
memcpy(client->buf_in + client->buf_in_size,buf+bytes_processed,bytes_to_transfer);
client->buf_in_size += bytes_to_transfer;
if(sh->client_read_callback)
sh->client_read_callback(client,NULL);
bytes_processed += bytes_to_transfer;
bytes_recieved -= bytes_to_transfer;
}
client->buf_in_size = 0;
bzero(client->buf_in, DAP_CLIENT_REMOTE_BUF + 1);
pthread_mutex_unlock(&udp_client->mutex_on_client);
if ( bytes > 0 ) {
char *hostaddrp = inet_ntoa( clientaddr.sin_addr );
if ( hostaddrp == NULL ) {
dap_udp_server_delete( sh );
error("ERROR on inet_ntoa\n");
} }
else if(bytes < 0)
{
log_it(L_ERROR,"Bytes read Error %s",strerror(errno));
if(client != NULL)
client->signal_close = true;
if ( client == NULL ) {
client = dap_udp_client_create( sh, efd, clientaddr.sin_addr.s_addr, clientaddr.sin_port );
if(client == NULL) {
dap_udp_server_delete( sh );
error("ERROR create client structure\n");
}
} }
else if (bytes == 0)
{ dap_udp_client_t* udp_client = client->_inheritor;
if(client != NULL)
client->signal_close = true; pthread_mutex_lock( &udp_client->mutex_on_client );
size_t bytes_processed = 0;
size_t bytes_recieved = bytes;
while ( bytes_recieved > 0 ) {
size_t bytes_to_transfer = 0;
if ( bytes_recieved > UDP_CLIENT_BUF - client->buf_in_size )
bytes_to_transfer = UDP_CLIENT_BUF - client->buf_in_size;
else
bytes_to_transfer = bytes_recieved;
memcpy( client->buf_in + client->buf_in_size,buf + bytes_processed, bytes_to_transfer );
client->buf_in_size += bytes_to_transfer;
if ( sh->client_read_callback )
sh->client_read_callback( client, NULL );
bytes_processed += bytes_to_transfer;
bytes_recieved -= bytes_to_transfer;
} }
client->buf_in_size = 0;
memset( client->buf_in, 0, DAP_CLIENT_REMOTE_BUF + 1 );
pthread_mutex_unlock( &udp_client->mutex_on_client );
}
else if ( bytes < 0 ) {
log_it( L_ERROR, "Bytes read Error %s", strerror(errno) );
if( client != NULL )
client->flags |= DAP_SOCK_SIGNAL_CLOSE;
}
else if (bytes == 0) {
if ( client != NULL )
client->flags |= DAP_SOCK_SIGNAL_CLOSE;
} }
} }
/**
* @brief dap_udp_client_loop Create client listening event loop
*/
void *dap_udp_client_loop( void *arg )
{
dap_server_t *d_server = (dap_server_t *)arg;
struct epoll_event pev;
struct epoll_event events[ 16 ];
pev.events = EPOLLOUT | EPOLLERR;
pev.data.fd = d_server->socket_listener;
if ( epoll_ctl( efd_write, EPOLL_CTL_ADD, d_server->socket_listener, &pev) != 0 ) {
log_it( L_ERROR, "epoll_ctl failed 001" );
return NULL;
}
log_it( L_NOTICE, "Start client write thread" );
do {
int32_t n = epoll_wait( efd_write, events, 16, -1 );
if ( n == -1 )
break;
for ( int32_t i = 0; i < n; ++ i ) {
if ( events[i].events & EPOLLOUT ) {
write_cb( efd_write, events[i].events, d_server );
}
else if( events[i].events & EPOLLERR ) {
log_it( L_ERROR, "Server socket error event" );
break;
}
}
} while( 1 );
return NULL;
}
/** /**
* @brief dap_udp_server_loop Start server event loop * @brief dap_udp_server_loop Start server event loop
* @param sh Server instance * @param sh Server instance
*/ */
void dap_udp_server_loop(dap_server_t * sh) void dap_udp_server_loop( dap_server_t *d_server )
{ {
// sh->proc_thread.tid = pthread_self(); EPOLL_HANDLE efd_read = epoll_create1( 0 );
pthread_t thread; EPOLL_HANDLE efd_write = epoll_create1( 0 );
pthread_create(&thread, NULL, dap_udp_client_loop, sh); pthread_t thread;
struct ev_loop * ev_main_loop = ev_default_loop(0);
w_read.data = sh; if ( (intptr_t)efd_read == -1 || (intptr_t)efd_write == -1 ) {
ev_io_init(&w_read, read_cb, sh->socket_listener, EV_READ);
ev_io_start(ev_main_loop, &w_read);
ev_run(ev_main_loop, 0);
}
log_it( L_ERROR, "epoll_create1 failed" );
goto error;
}
pthread_create( &thread, NULL, dap_udp_client_loop, d_server );
struct epoll_event pev;
struct epoll_event events[ 16 ];
pev.events = EPOLLIN | EPOLLERR;
pev.data.fd = d_server->socket_listener;
if ( epoll_ctl( efd_read, EPOLL_CTL_ADD, d_server->socket_listener, &pev) != 0 ) {
log_it( L_ERROR, "epoll_ctl failed 000" );
goto error;
}
while( 1 ) {
int32_t n = epoll_wait( efd_read, &events[0], 16, -1 );
if ( !n ) continue;
if ( n < 0 ) {
log_it( L_ERROR, "Server epoll error" );
break;
}
for( int32_t i = 0; i < n; ++ i ) {
if ( events[i].events & EPOLLIN ) {
read_cb( efd_read, events[i].events, d_server );
}
else if( events[i].events & EPOLLERR ) {
log_it( L_ERROR, "Server socket error event" );
goto error;
}
}
}
error:
#ifndef _WIN32
if ( efd_read != -1 )
close( efd_read );
if ( efd_write != -1 )
close( efd_write );
#else
if ( efd_read != INVALID_HANDLE_VALUE )
epoll_close( efd_read );
if ( efd_write != INVALID_HANDLE_VALUE )
epoll_close( efd_write );
#endif
return;
}
...@@ -17,9 +17,10 @@ ...@@ -17,9 +17,10 @@
You should have received a copy of the GNU Lesser General Public License You should have received a copy of the GNU Lesser General Public License
along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. along with any DAP based project. If not, see <http://www.gnu.org/licenses/>.
*/ */
#pragma once #pragma once
#ifndef _UDP_SERVER_H_
#define _UDP_SERVER_H_ #ifndef WIN32
#include <stdint.h> #include <stdint.h>
#include <sys/socket.h> #include <sys/socket.h>
...@@ -30,34 +31,36 @@ ...@@ -30,34 +31,36 @@
#include <sys/stat.h> #include <sys/stat.h>
#include <sys/select.h> #include <sys/select.h>
#include <sys/queue.h> #include <sys/queue.h>
#define EPOLL_HANDLE int
#else
#define EPOLL_HANDLE HANDLE
#endif
#include "dap_udp_client.h" #include "dap_udp_client.h"
#include "dap_server.h" #include "dap_server.h"
#include "dap_client_remote.h" #include "dap_client_remote.h"
struct dap_udp_server; struct dap_udp_server;
typedef struct dap_udp_thread{ typedef struct dap_udp_thread {
pthread_t tid; pthread_t tid;
} dap_udp_thread_t; } dap_udp_thread_t;
typedef void (*dap_udp_server_callback_t) (struct dap_udp_server *,void * arg); // Callback for specific server's operations typedef void (*dap_udp_server_callback_t) (struct dap_udp_server *,void *arg); // Callback for specific server's operations
typedef struct dap_udp_server {
typedef struct dap_udp_server{ dap_udp_client_t *clients;
dap_udp_client_t * clients; dap_udp_client_t *waiting_clients; // List clients for writing data
dap_udp_client_t * waiting_clients; // List clients for writing data
pthread_mutex_t mutex_on_list; pthread_mutex_t mutex_on_list;
void* _inheritor; void *_inheritor;
dap_server_t* dap_server; dap_server_t *dap_server;
} dap_udp_server_t; } dap_udp_server_t;
#define DAP_UDP_SERVER(a) ((dap_udp_server_t *) (a)->_inheritor) #define DAP_UDP_SERVER(a) ((dap_udp_server_t *) (a)->_inheritor)
extern void dap_udp_server_delete(dap_server_t * sh); extern void dap_udp_server_delete( dap_server_t *sh );
extern void dap_udp_server_loop( dap_server_t *udp_server ); // Start server event loop
extern void dap_udp_server_loop(dap_server_t* udp_server); // Start server event loop extern dap_server_t *dap_udp_server_listen( uint16_t port ); // Create and bind server
extern dap_server_t* dap_udp_server_listen(uint16_t port); // Create and bind server
#endif
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment