diff --git a/dap_udp_client.c b/dap_udp_client.c index 899898f8bb43f91b14be14b86b50552d9aa1168f..ac0a3a8851534f38248b78ee8dd960f39ad0b1a3 100755 --- a/dap_udp_client.c +++ b/dap_udp_client.c @@ -18,18 +18,43 @@ along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. */ -#include "dap_udp_client.h" -#include <sys/epoll.h> #include <string.h> #include <stdlib.h> #include <stdio.h> #include <stdarg.h> -#include <unistd.h> -#include <string.h> #include <stdint.h> -#include <ev.h> + +#ifndef _WIN32 +#include <arpa/inet.h> +#include <netinet/in.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <sys/select.h> +#include <sys/queue.h> +#include <errno.h> +#include <netdb.h> +#include <unistd.h> +#include <fcntl.h> +#include <sys/epoll.h> +#else +#undef _WIN32_WINNT +#define _WIN32_WINNT 0x0600 +#include <winsock2.h> +#include <windows.h> +#include <mswsock.h> +#include <ws2tcpip.h> +#include <io.h> +#include "wrappers.h" +#include <wepoll.h> +#include <pthread.h> +#endif + +#include "uthash.h" #include "utlist.h" + #include "dap_common.h" +#include "dap_udp_client.h" #include "dap_udp_server.h" #define LOG_TAG "udp_client" @@ -38,12 +63,7 @@ * @brief get_key Make key for hash table from host and port * @return 64 bit Key */ -uint64_t get_key(unsigned long host,unsigned short port){ - uint64_t key = host; - key = key << 32; - key += port; - return key; -} +#define get_key( host, key ) (((uint64_t)host << 32) + (uint64_t)port) /** * @brief udp_client_create Create new client and add it to hashmap @@ -53,32 +73,40 @@ uint64_t get_key(unsigned long host,unsigned short port){ * @param port Client port * @return Pointer to the new list's node */ -dap_client_remote_t * dap_udp_client_create(dap_server_t * sh, ev_io* w_client, unsigned long host, unsigned short port) +dap_client_remote_t *dap_udp_client_create( dap_server_t *dap_srv, EPOLL_HANDLE efd, unsigned long host, unsigned short port ) { - dap_udp_server_t* udp_server = DAP_UDP_SERVER(sh); - log_it(L_DEBUG,"Client structure create"); - - dap_udp_client_t * inh=DAP_NEW_Z(dap_udp_client_t); - inh->host_key = get_key(host,port); - - dap_client_remote_t * ret=DAP_NEW_Z(dap_client_remote_t); - inh->client = ret; - ret->server = sh; - ret->watcher_client = w_client; - ret->signal_close = false; - ret->_ready_to_read=true; - ret->_ready_to_write=false; - ret->_inheritor = inh; - pthread_mutex_init(&inh->mutex_on_client, NULL); - - pthread_mutex_lock(&udp_server->mutex_on_list); - HASH_ADD_INT( udp_server->clients, host_key, inh); - pthread_mutex_unlock(&udp_server->mutex_on_list); - if(sh->client_new_callback) - sh->client_new_callback(ret,NULL); // Init internal structure - - - return ret; + dap_udp_server_t *udp_server = DAP_UDP_SERVER( dap_srv ); + log_it( L_DEBUG, "Client structure create" ); + + dap_udp_client_t *inh = DAP_NEW_Z( dap_udp_client_t ); + inh->host_key = get_key( host, port ); + + dap_client_remote_t *ret = DAP_NEW_Z( dap_client_remote_t ); + inh->client = ret; + + ret->pevent.events = EPOLLIN | EPOLLERR; + ret->pevent.data.fd = dap_srv->socket_listener; + + ret->server = dap_srv; + ret->efd = efd; + + ret->flags = DAP_SOCK_READY_TO_READ; +// ret->signal_close = false; +// ret->_ready_to_read = true; +// ret->_ready_to_write = false; + + ret->_inheritor = inh; + + pthread_mutex_init( &inh->mutex_on_client, NULL ); + + pthread_mutex_lock( &udp_server->mutex_on_list ); + HASH_ADD_INT( udp_server->clients, host_key, inh ); + pthread_mutex_unlock( &udp_server->mutex_on_list ); + + if( dap_srv->client_new_callback ) + dap_srv->client_new_callback( ret, NULL ); // Init internal structure + + return ret; } /** @@ -87,10 +115,11 @@ dap_client_remote_t * dap_udp_client_create(dap_server_t * sh, ev_io* w_client, * @param host Variable for host address * @param host Variable for port */ -void dap_udp_client_get_address(dap_client_remote_t *client, unsigned int* host,unsigned short* port){ - dap_udp_client_t* udp_client = DAP_UDP_CLIENT(client); - *host = udp_client->host_key >> 32; - *port = (udp_client->host_key <<32) - *host; +void dap_udp_client_get_address( dap_client_remote_t *client, unsigned int* host, unsigned short* port ) +{ + dap_udp_client_t* udp_client = DAP_UDP_CLIENT( client ); + *host = udp_client->host_key >> 32; + *port = (udp_client->host_key <<32) - *host; } /** @@ -100,106 +129,123 @@ void dap_udp_client_get_address(dap_client_remote_t *client, unsigned int* host, * @param port Source port * @return Pointer to client or NULL if not found */ -dap_client_remote_t * dap_udp_client_find(dap_server_t * sh, unsigned long host,unsigned short port) +dap_client_remote_t *dap_udp_client_find( dap_server_t * dap_srv, unsigned long host, unsigned short port ) { - dap_udp_server_t* udp_server = DAP_UDP_SERVER(sh); - pthread_mutex_lock(&udp_server->mutex_on_list); - dap_udp_client_t* inh = NULL; - - uint64_t token = get_key(host,port); - HASH_FIND_INT(udp_server->clients,&token,inh); - - pthread_mutex_unlock(&udp_server->mutex_on_list); - if(inh == NULL) - return NULL; - else - return inh->client; + dap_udp_client_t *inh = NULL; + + dap_udp_server_t *udp_server = DAP_UDP_SERVER( dap_srv ); + uint64_t token = get_key( host, port ); + + pthread_mutex_lock( &udp_server->mutex_on_list ); + HASH_FIND_INT( udp_server->clients, &token, inh ); + pthread_mutex_unlock( &udp_server->mutex_on_list ); + + if( inh == NULL ) + return NULL; + else + return inh->client; } /** * @brief udp_client_ready_to_read Set ready_to_read flag - * @param sc Client structure + * @param dap_rclient Client structure * @param is_ready Flag value */ -void dap_udp_client_ready_to_read(dap_client_remote_t * sc,bool is_ready) +void dap_udp_client_ready_to_read( dap_client_remote_t *sc, bool is_ready ) { - if(is_ready != sc->_ready_to_read) { + if( is_ready == (bool)(sc->flags & DAP_SOCK_READY_TO_READ) ) + return; - uint32_t events = 0; - sc->_ready_to_read=is_ready; + if ( is_ready ) + sc->flags |= DAP_SOCK_READY_TO_READ; + else + sc->flags ^= DAP_SOCK_READY_TO_READ; - if(sc->_ready_to_read) - { - events |= EV_READ; - } + int events = EPOLLERR; - if(sc->_ready_to_write) - events |= EV_WRITE; + if( sc->flags & DAP_SOCK_READY_TO_READ ) + events |= EPOLLIN; - ev_io_set(sc->watcher_client, sc->server->socket_listener, events ); - } + if( sc->flags & DAP_SOCK_READY_TO_WRITE ) + events |= EPOLLOUT; + + sc->pevent.events = events; + + if( epoll_ctl(sc->efd, EPOLL_CTL_MOD, sc->server->socket_listener, &sc->pevent) != 0 ) { + log_it( L_ERROR, "epoll_ctl failed 002" ); + } } /** * @brief udp_client_ready_to_write Set ready_to_write flag - * @param sc Client structure + * @param dap_rclient Client structure * @param is_ready Flag value */ -void dap_udp_client_ready_to_write(dap_client_remote_t * sc,bool is_ready) +void dap_udp_client_ready_to_write( dap_client_remote_t *sc, bool is_ready ) { - // if(is_ready) - // add_waiting_client(sc); // Add client to writing queue - if(is_ready != sc->_ready_to_write) { - uint32_t events = 0; - sc->_ready_to_write=is_ready; - - if(sc->_ready_to_read) - events |= EV_READ; - - if(sc->_ready_to_write) - { - events |= EV_WRITE; - } - int descriptor = sc->watcher_client->fd; - ev_io_set(sc->watcher_client, descriptor, events ); - } + if ( is_ready == (bool)(sc->flags & DAP_SOCK_READY_TO_WRITE) ) + return; + + if ( is_ready ) + sc->flags |= DAP_SOCK_READY_TO_WRITE; + else + sc->flags ^= DAP_SOCK_READY_TO_WRITE; + + int events = EPOLLERR; + + if( sc->flags & DAP_SOCK_READY_TO_READ ) + events |= EPOLLIN; + + if( sc->flags & DAP_SOCK_READY_TO_WRITE ) + events |= EPOLLOUT; + + sc->pevent.events = events; + + if ( epoll_ctl(sc->efd, EPOLL_CTL_MOD, sc->pevent.data.fd, &sc->pevent) != 0 ) { + log_it( L_ERROR, "epoll_ctl failed 003" ); + } } /** * @brief add_waiting_client Add Client to write queue * @param client Client instance */ -void add_waiting_client(dap_client_remote_t* client){ - dap_server_t* sh = client->server; - dap_udp_server_t* udp_server = DAP_UDP_SERVER(sh); - dap_udp_client_t* udp_client = DAP_UDP_CLIENT(client); - - pthread_mutex_lock(&udp_server->mutex_on_list); +void add_waiting_client( dap_client_remote_t *dap_rclient ) +{ dap_udp_client_t* udp_cl, *tmp; - LL_FOREACH_SAFE(udp_server->waiting_clients,udp_cl,tmp) - if(udp_cl == udp_client) - { - pthread_mutex_unlock(&udp_server->mutex_on_list); + + dap_server_t *dap_srv = dap_rclient->server; + dap_udp_server_t *udp_server = DAP_UDP_SERVER( dap_srv ); + dap_udp_client_t *udp_client = DAP_UDP_CLIENT( dap_rclient ); + + pthread_mutex_lock( &udp_server->mutex_on_list ); + LL_FOREACH_SAFE( udp_server->waiting_clients, udp_cl, tmp ) { + if( udp_cl == udp_client ) { + pthread_mutex_unlock( &udp_server->mutex_on_list ); return; } - LL_APPEND(udp_server->waiting_clients, udp_client); - pthread_mutex_unlock(&udp_server->mutex_on_list); - + } + LL_APPEND( udp_server->waiting_clients, udp_client ); + pthread_mutex_unlock( &udp_server->mutex_on_list ); } -size_t dap_udp_client_write(dap_client_remote_t *sc, const void * data, size_t data_size){ - size_t size = dap_client_remote_write(sc,data,data_size); - add_waiting_client(sc); +size_t dap_udp_client_write( dap_client_remote_t *dap_rclient, const void *data, size_t data_size ) +{ + size_t size = dap_client_remote_write( dap_rclient, data, data_size ); + add_waiting_client( dap_rclient ); return size; } -size_t dap_udp_client_write_f(dap_client_remote_t *a_client, const char * a_format,...){ +size_t dap_udp_client_write_f( dap_client_remote_t *dap_rclient, const char * a_format, ... ) +{ size_t size = 0; - va_list ap; - va_start(ap,a_format); - size =dap_client_remote_write_f(a_client,a_format,ap); - va_end(ap); - add_waiting_client(a_client); + va_list va; + + va_start( va, a_format ); + size = dap_client_remote_write_f( dap_rclient, a_format, va ); + va_end( va ); + + add_waiting_client( dap_rclient ); return size; } diff --git a/dap_udp_client.h b/dap_udp_client.h index a59113e10fe9d5dbae3ee91ed00892f20120d8fd..7fc66f14585e359c482b97c201a7fb6bb5beb1ad 100755 --- a/dap_udp_client.h +++ b/dap_udp_client.h @@ -18,49 +18,49 @@ along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. */ #pragma once -#ifndef _UDP_CLIENT_H -#define _UDP_CLIENT_H -#include <sys/queue.h> #include <stdint.h> #include <stddef.h> #include <stdbool.h> + +#ifndef WIN32 #include <sys/queue.h> +#endif + #include "uthash.h" -#include "dap_client_remote.h" -#include <ev.h> +#include "dap_client_remote.h" typedef struct dap_udp_server dap_udp_server_t; struct dap_udp_client; -#define UDP_CLIENT_BUF 100000 +#define UDP_CLIENT_BUF 65535 -typedef struct dap_udp_client{ - dap_client_remote_t* client; +typedef struct dap_udp_client { + + dap_client_remote_t *client; uint64_t host_key; //key contains host address in first 4 bytes and port in last 4 bytes UT_hash_handle hh; + struct dap_udp_client *next, *prev; //pointers for writing queue pthread_mutex_t mutex_on_client; - void * _inheritor; // Internal data to specific client type, usualy states for state machine + void *_inheritor; // Internal data to specific client type, usualy states for state machine + } dap_udp_client_t; // Node of bidirectional list of clients #define DAP_UDP_CLIENT(a) ((dap_udp_client_t *) (a)->_inheritor) +dap_client_remote_t *dap_udp_client_create( dap_server_t *sh, EPOLL_HANDLE efd, unsigned long host, unsigned short port ); // Create new client and add it to the list +dap_client_remote_t *dap_udp_client_find( dap_server_t *sh, unsigned long host, unsigned short port ); // Find client by host and port -dap_client_remote_t * dap_udp_client_create(dap_server_t * sh, ev_io* w_client, unsigned long host, unsigned short port); // Create new client and add it to the list -dap_client_remote_t * dap_udp_client_find(dap_server_t * sh, unsigned long host, unsigned short port); // Find client by host and port +void dap_udp_client_ready_to_read( dap_client_remote_t *sc, bool is_ready ); +void dap_udp_client_ready_to_write( dap_client_remote_t *sc, bool is_ready ); -void dap_udp_client_ready_to_read(dap_client_remote_t * sc,bool is_ready); -void dap_udp_client_ready_to_write(dap_client_remote_t * sc,bool is_ready); +size_t dap_udp_client_write( dap_client_remote_t *sc, const void * data, size_t data_size ); +size_t dap_udp_client_write_f( dap_client_remote_t *a_client, const char * a_format, ... ); -size_t dap_udp_client_write(dap_client_remote_t *sc, const void * data, size_t data_size); -size_t dap_udp_client_write_f(dap_client_remote_t *a_client, const char * a_format,...); +void add_waiting_client( dap_client_remote_t *client ); // Add client to writing queue -void add_waiting_client(dap_client_remote_t* client); // Add client to writing queue - -void dap_udp_client_get_address(dap_client_remote_t *client, unsigned int* host,unsigned short* port); - -#endif +void dap_udp_client_get_address( dap_client_remote_t *client, unsigned int *host, unsigned short *port ); diff --git a/dap_udp_server.c b/dap_udp_server.c index 6009ebb9a2325272e0f9c7df29178950e3c01172..ba9adb6e1f62754f09f24f5c6431b0c85c6daac2 100755 --- a/dap_udp_server.c +++ b/dap_udp_server.c @@ -1,78 +1,127 @@ -#include "dap_udp_server.h" +/* + Copyright (c) 2017-2019 (c) Project "DeM Labs Inc" https://github.com/demlabsinc + All rights reserved. + + This file is part of DAP (Deus Applications Prototypes) the open source project + + DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + DAP is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include <string.h> +#include <time.h> #include <stdio.h> -#include "dap_common.h" +#include <stdlib.h> +#include <stddef.h> +#include <signal.h> +#include <stdint.h> + +#ifndef _WIN32 +#include <arpa/inet.h> +#include <netinet/in.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <sys/select.h> +#include <sys/queue.h> #include <errno.h> #include <netdb.h> #include <unistd.h> #include <fcntl.h> -#include <ev.h> +#include <sys/epoll.h> +#else +#undef _WIN32_WINNT +#define _WIN32_WINNT 0x0600 +#include <winsock2.h> +#include <windows.h> +#include <mswsock.h> +#include <ws2tcpip.h> +#include <io.h> +#include "wrappers.h" +#include <wepoll.h> +#include <pthread.h> +#endif + +#include "uthash.h" #include "utlist.h" +#include "dap_common.h" +#include "dap_udp_server.h" + #define LOG_TAG "dap_udp_server" #define BUFSIZE 1024 -char buf[BUFSIZE]; /* message buf */ -struct ev_io w_read; -struct ev_io w_write; +char buf[ BUFSIZE ]; /* message buf */ -static void write_cb(struct ev_loop* _loop, struct ev_io* watcher, int revents); -int check_close(dap_client_remote_t* client); +//struct ev_io w_read; +//struct ev_io w_write; + +EPOLL_HANDLE efd_read = (EPOLL_HANDLE)-1; +EPOLL_HANDLE efd_write = (EPOLL_HANDLE)-1; + +//static void write_cb( EPOLL_HANDLE efd, int revents ); + +int check_close( dap_client_remote_t *client ); /** */ -void error(char *msg) { - perror(msg); - exit(1); +void error( char *msg ) { + + perror( msg ); + exit( 1 ); } /** * @brief dap_udp_server_new Initialize server structure * @return Server pointer */ -dap_server_t * dap_udp_server_new() +dap_server_t *dap_udp_server_new( ) { - dap_udp_server_t* udp_server = (dap_udp_server_t*)calloc(1,sizeof(dap_udp_server_t)); - udp_server->waiting_clients = NULL; - dap_server_t* sh = (dap_server_t*) calloc(1,sizeof(dap_server_t)); - sh->_inheritor = udp_server; - udp_server->dap_server = sh; - return sh; -} + dap_udp_server_t *udp_server = (dap_udp_server_t *)calloc( 1, sizeof(dap_udp_server_t) ); + udp_server->waiting_clients = NULL; -/** - * @brief dap_udp_client_loop Create client listening event loop - */ -void* dap_udp_client_loop(void * arg) -{ - dap_server_t* sh = (dap_server_t*)arg; - log_it(L_NOTICE, "Start client listener thread"); - struct ev_loop * ev_client_loop = ev_loop_new(0); - w_write.data = sh; - ev_io_init(&w_write, write_cb, sh->socket_listener, EV_WRITE); - ev_io_start(ev_client_loop, &w_write); - ev_loop(ev_client_loop, 0); - return NULL; + dap_server_t *sh = (dap_server_t *) calloc( 1, sizeof(dap_server_t) ); + sh->_inheritor = udp_server; + + udp_server->dap_server = sh; + + return sh; } /** * @brief dap_udp_server_delete Safe delete server structure * @param sh Server instance */ -void dap_udp_server_delete(dap_server_t * sh) +void dap_udp_server_delete( dap_server_t *sh ) { - if(sh->address) - free(sh->address); - - dap_client_remote_t * client, * tmp; - HASH_ITER(hh,sh->clients,client,tmp) - dap_client_remote_remove(client, sh); - - if(sh->server_delete_callback) - sh->server_delete_callback(sh,NULL); - if(sh->_inheritor) - free(sh->_inheritor); - free(sh); + dap_client_remote_t *client, *tmp; + + if ( !sh ) return; + + if( sh->address ) + free( sh->address ); + + HASH_ITER( hh, sh->clients, client, tmp ) + dap_client_remote_remove( client, sh ); + + if ( sh->server_delete_callback ) + sh->server_delete_callback( sh, NULL ); + + if ( sh->_inheritor ) + free( sh->_inheritor ); + + free( sh ); } /** @@ -80,80 +129,93 @@ void dap_udp_server_delete(dap_server_t * sh) * @param port Binding port * @return Server instance */ -dap_server_t * dap_udp_server_listen(uint16_t port){ - dap_server_t* sh = dap_udp_server_new(); +dap_server_t *dap_udp_server_listen( uint16_t port ) { - sh->socket_listener = socket (AF_INET, SOCK_DGRAM, 0); + dap_server_t *sh = dap_udp_server_new( ); - if (sh->socket_listener < 0){ - log_it (L_ERROR,"Socket error %s",strerror(errno)); - dap_udp_server_delete(sh); - return NULL; - } + sh->socket_listener = socket( AF_INET, SOCK_DGRAM, 0 ); - int optval = 1; - if(setsockopt(sh->socket_listener, SOL_SOCKET, SO_REUSEADDR,(const void *)&optval , sizeof(int)) < 0) - log_it(L_WARNING, "Can't set up REUSEADDR flag to the socket"); + if ( sh->socket_listener < 0 ) { + log_it ( L_ERROR, "Socket error %s", strerror(errno) ); + dap_udp_server_delete( sh ); + return NULL; + } - bzero((char *) &(sh->listener_addr), sizeof(sh->listener_addr)); - sh->listener_addr.sin_family = AF_INET; - sh->listener_addr.sin_addr.s_addr = htonl(INADDR_ANY); - sh->listener_addr.sin_port = htons(port); + int optval = 1; + if ( setsockopt( sh->socket_listener, SOL_SOCKET, SO_REUSEADDR, (const void *)&optval , sizeof(int)) < 0 ) + log_it( L_WARNING, "Can't set up REUSEADDR flag to the socket" ); - if(bind (sh->socket_listener, (struct sockaddr *) &(sh->listener_addr), sizeof(sh->listener_addr)) < 0) { - log_it(L_ERROR,"Bind error: %s",strerror(errno)); - dap_udp_server_delete(sh); - return NULL; - } - pthread_mutex_init(&DAP_UDP_SERVER(sh)->mutex_on_list, NULL); - return sh; + memset( (char *)&(sh->listener_addr), 0, sizeof(sh->listener_addr) ); + + sh->listener_addr.sin_family = AF_INET; + sh->listener_addr.sin_addr.s_addr = htonl( INADDR_ANY ); + sh->listener_addr.sin_port = htons( port ); + + if ( bind(sh->socket_listener, (struct sockaddr *) &(sh->listener_addr), sizeof(sh->listener_addr)) < 0) { + log_it( L_ERROR, "Bind error: %s", strerror(errno) ); + dap_udp_server_delete( sh ); + return NULL; + } + + pthread_mutex_init( &DAP_UDP_SERVER(sh)->mutex_on_list, NULL ); + + return sh; } /** * @brief write_cb */ -static void write_cb(struct ev_loop* loop, struct ev_io* watcher, int revents) +static void write_cb( EPOLL_HANDLE efd, int revents, dap_server_t *sh ) { - if( ( revents & EV_WRITE ) ) { - dap_server_t* sh = watcher->data; - dap_udp_server_t* udp = DAP_UDP_SERVER(sh); - dap_udp_client_t * udp_client, * tmp; - pthread_mutex_lock(&udp->mutex_on_list); - LL_FOREACH_SAFE(udp->waiting_clients,udp_client,tmp) - { - //log_it(L_INFO,"write_cb"); - //pthread_mutex_lock(&udp_client->mutex_on_client); - dap_client_remote_t* client = udp_client->client; - if(client != NULL && check_close(client) == 0 && client->_ready_to_write) - { - if(sh->client_write_callback) - sh->client_write_callback(client, NULL); - if(client->buf_out_size > 0) - { - //log_it(L_INFO,"write_cb_client"); - for(size_t total_sent = 0; total_sent < client->buf_out_size;) { - struct sockaddr_in addr; - addr.sin_family = AF_INET; - dap_udp_client_get_address(client,&addr.sin_addr.s_addr,&addr.sin_port); - int bytes_sent = sendto(sh->socket_listener, client->buf_out + total_sent, client->buf_out_size - total_sent, 0, (struct sockaddr*) &addr, sizeof(addr)); - if(bytes_sent < 0) { - log_it(L_ERROR,"Some error occured in send() function"); - break; - } - total_sent += bytes_sent; - } - client->buf_out_size = 0; - bzero(client->buf_out, DAP_CLIENT_REMOTE_BUF + 1); - - } - LL_DELETE(udp->waiting_clients,udp_client); - } - else if(client == NULL) - LL_DELETE(udp->waiting_clients,udp_client); - //pthread_mutex_unlock(&udp_client->mutex_on_client); + dap_udp_client_t *udp_client, *tmp; + +// dap_server_t *sh = watcher->data; + dap_udp_server_t *udp = DAP_UDP_SERVER( sh ); + + pthread_mutex_lock( &udp->mutex_on_list ); + + LL_FOREACH_SAFE( udp->waiting_clients, udp_client, tmp ) { + + //log_it(L_INFO,"write_cb"); + //pthread_mutex_lock(&udp_client->mutex_on_client); + + dap_client_remote_t *client = udp_client->client; + + if( client != NULL && !check_close(client) && (client->flags & DAP_SOCK_READY_TO_WRITE) ) { + + if ( sh->client_write_callback ) + sh->client_write_callback( client, NULL ); + + if ( client->buf_out_size > 0 ) { + + //log_it(L_INFO,"write_cb_client"); + + for( size_t total_sent = 0; total_sent < client->buf_out_size; ) { + struct sockaddr_in addr; + addr.sin_family = AF_INET; + dap_udp_client_get_address( client, (unsigned int *)&addr.sin_addr.s_addr, &addr.sin_port ); + + int bytes_sent = sendto( sh->socket_listener, client->buf_out + total_sent, + client->buf_out_size - total_sent, 0, (struct sockaddr*) &addr, sizeof(addr) ); + + if ( bytes_sent < 0 ) { + log_it(L_ERROR,"Some error occured in send() function"); + break; + } + total_sent += bytes_sent; } - pthread_mutex_unlock(&udp->mutex_on_list); + client->buf_out_size = 0; + memset( client->buf_out, 0, DAP_CLIENT_REMOTE_BUF + 1 ); + } + LL_DELETE( udp->waiting_clients, udp_client ); } + else if( client == NULL ) { + LL_DELETE( udp->waiting_clients, udp_client ); + } + //pthread_mutex_unlock(&udp_client->mutex_on_client); + + } // for client + pthread_mutex_unlock(&udp->mutex_on_list); } /** @@ -161,106 +223,215 @@ static void write_cb(struct ev_loop* loop, struct ev_io* watcher, int revents) * @param client Client structure * @return 1 if client deleted, 0 if client is no need to delete */ -int check_close(dap_client_remote_t* client){ - if(client->signal_close) - { - dap_udp_client_t* udp_client = DAP_UDP_CLIENT(client); - dap_server_t* sh = client->server; - dap_udp_server_t* udp_server = DAP_UDP_SERVER(sh); - dap_udp_client_t * client_check, * tmp; - LL_FOREACH_SAFE(udp_server->waiting_clients,client_check,tmp) - if(client_check->host_key == udp_client->host_key) - LL_DELETE(udp_server->waiting_clients,client_check); - dap_client_remote_remove(client, sh); - return 1; +int check_close( dap_client_remote_t *client ) +{ + dap_udp_client_t *client_check, *tmp; + + if( !(client->flags & DAP_SOCK_SIGNAL_CLOSE) ) return 0; + + dap_udp_client_t *udp_client = DAP_UDP_CLIENT( client ); + dap_server_t *sh = client->server; + dap_udp_server_t *udp_server = DAP_UDP_SERVER( sh ); + + LL_FOREACH_SAFE( udp_server->waiting_clients, client_check, tmp ) { + + if ( client_check->host_key == udp_client->host_key ) + LL_DELETE( udp_server->waiting_clients, client_check ); } - return 0; + + dap_client_remote_remove( client, sh ); + + return 1; } /** * @brief read_cb */ -static void read_cb(struct ev_loop* loop, struct ev_io* watcher, int revents) +static void read_cb( EPOLL_HANDLE efd, int revents, dap_server_t *sh ) { - //log_it(L_INFO,"read_cb"); - if ( revents & EV_READ ) - { - struct sockaddr_in clientaddr; - int clientlen = sizeof(clientaddr); - dap_server_t* sh = watcher->data; - bzero(buf, BUFSIZE); - socklen_t bytes = recvfrom(sh->socket_listener, buf, BUFSIZE, 0,(struct sockaddr *) &clientaddr, &clientlen); - dap_client_remote_t *client = dap_udp_client_find(sh,clientaddr.sin_addr.s_addr,clientaddr.sin_port); - if(client != NULL && check_close(client) != 0) +// if ( !(revents & EV_READ) ) return; + + struct sockaddr_in clientaddr; + int clientlen = sizeof(clientaddr); +// dap_server_t *sh = watcher->data; + + memset( buf, 0, BUFSIZE ); + + int32_t bytes = recvfrom( sh->socket_listener, buf, BUFSIZE, 0,(struct sockaddr *) &clientaddr, &clientlen ); + + dap_client_remote_t *client = dap_udp_client_find( sh, clientaddr.sin_addr.s_addr, clientaddr.sin_port ); + + if( client != NULL && check_close(client) != 0 ) return; - if(bytes > 0){ - char * hostaddrp = inet_ntoa(clientaddr.sin_addr); - if(hostaddrp == NULL) - { - dap_udp_server_delete(sh); - error("ERROR on inet_ntoa\n"); - } - if(client == NULL) - { - client = dap_udp_client_create(sh,&w_write,clientaddr.sin_addr.s_addr,clientaddr.sin_port); - if(client == NULL) - { - dap_udp_server_delete(sh); - error("ERROR create client structure\n"); - } - } - dap_udp_client_t* udp_client = client->_inheritor; - pthread_mutex_lock(&udp_client->mutex_on_client); - size_t bytes_processed = 0; - size_t bytes_recieved = bytes; - while(bytes_recieved > 0){ - size_t bytes_to_transfer = 0; - if(bytes_recieved > UDP_CLIENT_BUF - client->buf_in_size) - bytes_to_transfer = UDP_CLIENT_BUF - client->buf_in_size; - else - bytes_to_transfer = bytes_recieved; - memcpy(client->buf_in + client->buf_in_size,buf+bytes_processed,bytes_to_transfer); - client->buf_in_size += bytes_to_transfer; - - if(sh->client_read_callback) - sh->client_read_callback(client,NULL); - - bytes_processed += bytes_to_transfer; - bytes_recieved -= bytes_to_transfer; - } - client->buf_in_size = 0; - bzero(client->buf_in, DAP_CLIENT_REMOTE_BUF + 1); - pthread_mutex_unlock(&udp_client->mutex_on_client); + if ( bytes > 0 ) { + + char *hostaddrp = inet_ntoa( clientaddr.sin_addr ); + + if ( hostaddrp == NULL ) { + dap_udp_server_delete( sh ); + error("ERROR on inet_ntoa\n"); } - else if(bytes < 0) - { - log_it(L_ERROR,"Bytes read Error %s",strerror(errno)); - if(client != NULL) - client->signal_close = true; + if ( client == NULL ) { + client = dap_udp_client_create( sh, efd, clientaddr.sin_addr.s_addr, clientaddr.sin_port ); + if(client == NULL) { + dap_udp_server_delete( sh ); + error("ERROR create client structure\n"); + } } - else if (bytes == 0) - { - if(client != NULL) - client->signal_close = true; + + dap_udp_client_t* udp_client = client->_inheritor; + + pthread_mutex_lock( &udp_client->mutex_on_client ); + + size_t bytes_processed = 0; + size_t bytes_recieved = bytes; + + while ( bytes_recieved > 0 ) { + + size_t bytes_to_transfer = 0; + + if ( bytes_recieved > UDP_CLIENT_BUF - client->buf_in_size ) + bytes_to_transfer = UDP_CLIENT_BUF - client->buf_in_size; + else + bytes_to_transfer = bytes_recieved; + + memcpy( client->buf_in + client->buf_in_size,buf + bytes_processed, bytes_to_transfer ); + client->buf_in_size += bytes_to_transfer; + + if ( sh->client_read_callback ) + sh->client_read_callback( client, NULL ); + + bytes_processed += bytes_to_transfer; + bytes_recieved -= bytes_to_transfer; } + + client->buf_in_size = 0; + memset( client->buf_in, 0, DAP_CLIENT_REMOTE_BUF + 1 ); + + pthread_mutex_unlock( &udp_client->mutex_on_client ); + + } + else if ( bytes < 0 ) { + + log_it( L_ERROR, "Bytes read Error %s", strerror(errno) ); + if( client != NULL ) + client->flags |= DAP_SOCK_SIGNAL_CLOSE; + } + else if (bytes == 0) { + if ( client != NULL ) + client->flags |= DAP_SOCK_SIGNAL_CLOSE; } } +/** + * @brief dap_udp_client_loop Create client listening event loop + */ +void *dap_udp_client_loop( void *arg ) +{ + dap_server_t *d_server = (dap_server_t *)arg; + struct epoll_event pev; + struct epoll_event events[ 16 ]; + + pev.events = EPOLLOUT | EPOLLERR; + pev.data.fd = d_server->socket_listener; + + if ( epoll_ctl( efd_write, EPOLL_CTL_ADD, d_server->socket_listener, &pev) != 0 ) { + log_it( L_ERROR, "epoll_ctl failed 001" ); + return NULL; + } + + log_it( L_NOTICE, "Start client write thread" ); + + do { + + int32_t n = epoll_wait( efd_write, events, 16, -1 ); + + if ( n == -1 ) + break; + + for ( int32_t i = 0; i < n; ++ i ) { + + if ( events[i].events & EPOLLOUT ) { + write_cb( efd_write, events[i].events, d_server ); + } + else if( events[i].events & EPOLLERR ) { + log_it( L_ERROR, "Server socket error event" ); + break; + } + } + + } while( 1 ); + + return NULL; +} + + /** * @brief dap_udp_server_loop Start server event loop * @param sh Server instance */ -void dap_udp_server_loop(dap_server_t * sh) +void dap_udp_server_loop( dap_server_t *d_server ) { -// sh->proc_thread.tid = pthread_self(); - pthread_t thread; - pthread_create(&thread, NULL, dap_udp_client_loop, sh); - struct ev_loop * ev_main_loop = ev_default_loop(0); - w_read.data = sh; - ev_io_init(&w_read, read_cb, sh->socket_listener, EV_READ); - ev_io_start(ev_main_loop, &w_read); - ev_run(ev_main_loop, 0); -} + EPOLL_HANDLE efd_read = epoll_create1( 0 ); + EPOLL_HANDLE efd_write = epoll_create1( 0 ); + pthread_t thread; + + if ( (intptr_t)efd_read == -1 || (intptr_t)efd_write == -1 ) { + log_it( L_ERROR, "epoll_create1 failed" ); + goto error; + } + + pthread_create( &thread, NULL, dap_udp_client_loop, d_server ); + + struct epoll_event pev; + struct epoll_event events[ 16 ]; + + pev.events = EPOLLIN | EPOLLERR; + pev.data.fd = d_server->socket_listener; + + if ( epoll_ctl( efd_read, EPOLL_CTL_ADD, d_server->socket_listener, &pev) != 0 ) { + log_it( L_ERROR, "epoll_ctl failed 000" ); + goto error; + } + + while( 1 ) { + + int32_t n = epoll_wait( efd_read, &events[0], 16, -1 ); + + if ( !n ) continue; + if ( n < 0 ) { + log_it( L_ERROR, "Server epoll error" ); + break; + } + + for( int32_t i = 0; i < n; ++ i ) { + + if ( events[i].events & EPOLLIN ) { + read_cb( efd_read, events[i].events, d_server ); + } + else if( events[i].events & EPOLLERR ) { + log_it( L_ERROR, "Server socket error event" ); + goto error; + } + } + } + +error: + + #ifndef _WIN32 + if ( efd_read != -1 ) + close( efd_read ); + if ( efd_write != -1 ) + close( efd_write ); + #else + if ( efd_read != INVALID_HANDLE_VALUE ) + epoll_close( efd_read ); + if ( efd_write != INVALID_HANDLE_VALUE ) + epoll_close( efd_write ); + #endif + + return; +} diff --git a/dap_udp_server.h b/dap_udp_server.h index 9aa180b12e53b4510d8d74de268c8db7e842a9c9..dcb94c77eaee4fef438bcbac42feefed2b0c6bbd 100755 --- a/dap_udp_server.h +++ b/dap_udp_server.h @@ -17,9 +17,10 @@ You should have received a copy of the GNU Lesser General Public License along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. */ + #pragma once -#ifndef _UDP_SERVER_H_ -#define _UDP_SERVER_H_ + +#ifndef WIN32 #include <stdint.h> #include <sys/socket.h> @@ -30,34 +31,36 @@ #include <sys/stat.h> #include <sys/select.h> #include <sys/queue.h> +#define EPOLL_HANDLE int +#else +#define EPOLL_HANDLE HANDLE +#endif + #include "dap_udp_client.h" #include "dap_server.h" #include "dap_client_remote.h" struct dap_udp_server; -typedef struct dap_udp_thread{ +typedef struct dap_udp_thread { pthread_t tid; } dap_udp_thread_t; -typedef void (*dap_udp_server_callback_t) (struct dap_udp_server *,void * arg); // Callback for specific server's operations +typedef void (*dap_udp_server_callback_t) (struct dap_udp_server *,void *arg); // Callback for specific server's operations + +typedef struct dap_udp_server { -typedef struct dap_udp_server{ - dap_udp_client_t * clients; - dap_udp_client_t * waiting_clients; // List clients for writing data + dap_udp_client_t *clients; + dap_udp_client_t *waiting_clients; // List clients for writing data pthread_mutex_t mutex_on_list; - void* _inheritor; - dap_server_t* dap_server; + void *_inheritor; + dap_server_t *dap_server; + } dap_udp_server_t; #define DAP_UDP_SERVER(a) ((dap_udp_server_t *) (a)->_inheritor) -extern void dap_udp_server_delete(dap_server_t * sh); - -extern void dap_udp_server_loop(dap_server_t* udp_server); // Start server event loop - -extern dap_server_t* dap_udp_server_listen(uint16_t port); // Create and bind server - -#endif - +extern void dap_udp_server_delete( dap_server_t *sh ); +extern void dap_udp_server_loop( dap_server_t *udp_server ); // Start server event loop +extern dap_server_t *dap_udp_server_listen( uint16_t port ); // Create and bind server