Skip to content
Snippets Groups Projects
Commit 28fb0ed1 authored by Ivan Deniskin's avatar Ivan Deniskin
Browse files

dap_udp_client inherited from dap_client_remote, dap_udp_server inherited from dap_server

parent 8e5bcab5
No related branches found
No related tags found
1 merge request!24Support 3689
...@@ -6,9 +6,13 @@ set(DAP_UDP_SERVER_SRCS dap_udp_server.c dap_udp_client.h dap_udp_client.c ) ...@@ -6,9 +6,13 @@ set(DAP_UDP_SERVER_SRCS dap_udp_server.c dap_udp_client.h dap_udp_client.c )
include_directories("${INCLUDE_DIRECTORIES} ${dap_core_INCLUDE_DIRS}") include_directories("${INCLUDE_DIRECTORIES} ${dap_core_INCLUDE_DIRS}")
include_directories("${INCLUDE_DIRECTORIES} ${dap_crypto_INCLUDE_DIRS}") include_directories("${INCLUDE_DIRECTORIES} ${dap_crypto_INCLUDE_DIRS}")
include_directories("${INCLUDE_DIRECTORIES} ${dap_client_INCLUDE_DIRS}")
add_definitions ("${dap_core_DEFINITIONS}") add_definitions ("${dap_core_DEFINITIONS}")
add_definitions ("${dap_crypto_DEFINITIONS}") add_definitions ("${dap_crypto_DEFINITIONS}")
add_definitions ("${dap_client_DEFINITIONS}")
add_library(${PROJECT_NAME} STATIC ${DAP_UDP_SERVER_SRCS}) add_library(${PROJECT_NAME} STATIC ${DAP_UDP_SERVER_SRCS})
set(${PROJECT_NAME}_DEFINITIONS CACHE INTERNAL "${PROJECT_NAME}: Definitions" FORCE) set(${PROJECT_NAME}_DEFINITIONS CACHE INTERNAL "${PROJECT_NAME}: Definitions" FORCE)
......
...@@ -35,25 +35,6 @@ ...@@ -35,25 +35,6 @@
#define LOG_TAG "udp_client" #define LOG_TAG "udp_client"
/**
* @brief udp_client_init Init clients module
* @return Zero if ok others if no
*/
int dap_udp_client_init()
{
log_it(L_NOTICE,"Initialized socket client module");
return 0;
}
/**
* @brief udp_client_deinit Deinit clients module
*/
void dap_udp_client_deinit()
{
}
/** /**
* @brief get_key Make key for hash table from host and port * @brief get_key Make key for hash table from host and port
* @return 64 bit Key * @return 64 bit Key
...@@ -73,24 +54,31 @@ uint64_t get_key(unsigned long host,unsigned short port){ ...@@ -73,24 +54,31 @@ uint64_t get_key(unsigned long host,unsigned short port){
* @param port Client port * @param port Client port
* @return Pointer to the new list's node * @return Pointer to the new list's node
*/ */
dap_udp_client_t * dap_udp_client_create(dap_udp_server_t * sh, ev_io* w_client, unsigned long host, unsigned short port) dap_client_remote_t * dap_udp_client_create(dap_server_t * sh, ev_io* w_client, unsigned long host, unsigned short port)
{ {
pthread_mutex_lock(&sh->mutex_on_hash); dap_udp_server_t* udp_server = DAP_UDP_SERVER(sh);
log_it(L_DEBUG,"Client structure create"); log_it(L_DEBUG,"Client structure create");
dap_udp_client_t * ret=DAP_NEW_Z(dap_udp_client_t); dap_udp_client_t * inh=DAP_NEW_Z(dap_udp_client_t);
inh->host_key = get_key(host,port);
dap_client_remote_t * ret=DAP_NEW_Z(dap_client_remote_t);
inh->client = ret;
ret->server = sh; ret->server = sh;
ret->watcher_client = w_client; ret->watcher_client = w_client;
ret->signal_close = false; ret->signal_close = false;
ret->host_key = get_key(host,port);
ret->_ready_to_read=true; ret->_ready_to_read=true;
ret->_ready_to_write=false; ret->_ready_to_write=false;
ret->_inheritor = inh;
HASH_ADD_INT( sh->clients, host_key, ret); pthread_mutex_init(&inh->mutex_on_client, NULL);
pthread_mutex_lock(&udp_server->mutex_on_list);
HASH_ADD_INT( udp_server->clients, host_key, inh);
pthread_mutex_unlock(&udp_server->mutex_on_list);
if(sh->client_new_callback) if(sh->client_new_callback)
sh->client_new_callback(ret,NULL); // Init internal structure sh->client_new_callback(ret,NULL); // Init internal structure
pthread_mutex_unlock(&sh->mutex_on_hash);
return ret; return ret;
} }
...@@ -100,9 +88,10 @@ dap_udp_client_t * dap_udp_client_create(dap_udp_server_t * sh, ev_io* w_client, ...@@ -100,9 +88,10 @@ dap_udp_client_t * dap_udp_client_create(dap_udp_server_t * sh, ev_io* w_client,
* @param host Variable for host address * @param host Variable for host address
* @param host Variable for port * @param host Variable for port
*/ */
void dap_udp_client_get_address(dap_udp_client_t *client, unsigned long* host,unsigned short* port){ void dap_udp_client_get_address(dap_client_remote_t *client, unsigned long* host,unsigned short* port){
*host = client->host_key >> 32; dap_udp_client_t* udp_client = DAP_UDP_CLIENT(client);
*port = client->host_key - (*host<<32); *host = udp_client->host_key >> 32;
*port = udp_client->host_key - (*host<<32);
} }
/** /**
...@@ -112,38 +101,20 @@ void dap_udp_client_get_address(dap_udp_client_t *client, unsigned long* host,un ...@@ -112,38 +101,20 @@ void dap_udp_client_get_address(dap_udp_client_t *client, unsigned long* host,un
* @param port Source port * @param port Source port
* @return Pointer to client or NULL if not found * @return Pointer to client or NULL if not found
*/ */
dap_udp_client_t * dap_udp_client_find(dap_udp_server_t * sh, unsigned long host,unsigned short port) dap_client_remote_t * dap_udp_client_find(dap_server_t * sh, unsigned long host,unsigned short port)
{ {
pthread_mutex_lock(&sh->mutex_on_hash); dap_udp_server_t* udp_server = DAP_UDP_SERVER(sh);
pthread_mutex_lock(&udp_server->mutex_on_list);
dap_udp_client_t* inh = NULL;
dap_udp_client_t * ret = NULL;
uint64_t token = get_key(host,port); uint64_t token = get_key(host,port);
HASH_FIND_INT(sh->clients,&token,ret); HASH_FIND_INT(udp_server->clients,&token,inh);
pthread_mutex_unlock(&sh->mutex_on_hash); pthread_mutex_unlock(&udp_server->mutex_on_list);
return ret; if(inh == NULL)
} return NULL;
else
/** return inh->client;
* @brief udp_client_read Read data from input buffer
* @param sc Client instance
* @param data Pointer to memory where to store the data
* @param data_size Size of data to read
* @return Actual bytes number that were read
*/
size_t dap_udp_client_read(dap_udp_client_t *sc, void * data, size_t data_size)
{
if (data_size < sc->buf_in_size) {
memcpy(data, sc->buf_in, data_size);
memmove(data, sc->buf_in + data_size, sc->buf_in_size - data_size);
} else {
if (data_size > sc->buf_in_size) {
data_size = sc->buf_in_size;
}
memcpy(data, sc->buf_in, data_size);
}
sc->buf_in_size -= data_size;
return data_size;
} }
/** /**
...@@ -151,7 +122,7 @@ size_t dap_udp_client_read(dap_udp_client_t *sc, void * data, size_t data_size) ...@@ -151,7 +122,7 @@ size_t dap_udp_client_read(dap_udp_client_t *sc, void * data, size_t data_size)
* @param sc Client structure * @param sc Client structure
* @param is_ready Flag value * @param is_ready Flag value
*/ */
void dap_udp_client_ready_to_read(dap_udp_client_t * sc,bool is_ready) void dap_udp_client_ready_to_read(dap_client_remote_t * sc,bool is_ready)
{ {
if(is_ready != sc->_ready_to_read) { if(is_ready != sc->_ready_to_read) {
...@@ -175,10 +146,10 @@ void dap_udp_client_ready_to_read(dap_udp_client_t * sc,bool is_ready) ...@@ -175,10 +146,10 @@ void dap_udp_client_ready_to_read(dap_udp_client_t * sc,bool is_ready)
* @param sc Client structure * @param sc Client structure
* @param is_ready Flag value * @param is_ready Flag value
*/ */
void dap_udp_client_ready_to_write(dap_udp_client_t * sc,bool is_ready) void dap_udp_client_ready_to_write(dap_client_remote_t * sc,bool is_ready)
{ {
if(is_ready) // if(is_ready)
add_waiting_client(sc); // Add client to writing queue // add_waiting_client(sc); // Add client to writing queue
if(is_ready != sc->_ready_to_write) { if(is_ready != sc->_ready_to_write) {
uint32_t events = 0; uint32_t events = 0;
sc->_ready_to_write=is_ready; sc->_ready_to_write=is_ready;
...@@ -196,91 +167,40 @@ void dap_udp_client_ready_to_write(dap_udp_client_t * sc,bool is_ready) ...@@ -196,91 +167,40 @@ void dap_udp_client_ready_to_write(dap_udp_client_t * sc,bool is_ready)
} }
/** /**
* @brief udp_client_remove Removes the client from the hashmap * @brief add_waiting_client Add Client to write queue
* @param sc Client instance * @param client Client instance
* @param sh Server instance
*/ */
void dap_udp_client_remove(dap_udp_client_t *sc, dap_udp_server_t * sh) void add_waiting_client(dap_client_remote_t* client){
{ dap_server_t* sh = client->server;
pthread_mutex_lock(&sh->mutex_on_hash); dap_udp_server_t* udp_server = DAP_UDP_SERVER(sh);
dap_udp_client_t* udp_client = DAP_UDP_CLIENT(client);
log_it(L_DEBUG, "Client structure remove");
HASH_DEL(sc->server->clients,sc); pthread_mutex_lock(&udp_server->mutex_on_list);
dap_udp_client_t* udp_cl, *tmp;
LL_FOREACH_SAFE(udp_server->waiting_clients,udp_cl,tmp)
if(udp_cl == udp_client)
{
pthread_mutex_unlock(&udp_server->mutex_on_list);
return;
}
LL_APPEND(udp_server->waiting_clients, udp_client);
pthread_mutex_unlock(&udp_server->mutex_on_list);
if(sc->server->client_delete_callback)
sc->server->client_delete_callback(sc,NULL); // Init internal structure
if(sc->_inheritor)
free(sc->_inheritor);
free(sc);
pthread_mutex_unlock(&sh->mutex_on_hash);
} }
/** size_t dap_udp_client_write(dap_client_remote_t *sc, const void * data, size_t data_size){
* @brief udp_client_write Write data to the client size_t size = dap_client_write(sc,data,data_size);
* @param sc Client instance add_waiting_client(sc);
* @param data Pointer to data return size;
* @param data_size Size of data to write
* @return Number of bytes that were placed into the buffer
*/
size_t dap_udp_client_write(dap_udp_client_t *sc, const void * data, size_t data_size)
{
data_size = ((sc->buf_out_size+data_size)<(sizeof(sc->buf_out)))?data_size:(sizeof(sc->buf_out)-sc->buf_out_size );
memcpy(sc->buf_out+sc->buf_out_size,data,data_size);
sc->buf_out_size+=data_size;
return data_size;
} }
/** size_t dap_udp_client_write_f(dap_client_remote_t *a_client, const char * a_format,...){
* @brief udp_client_write_f Write formatted text to the client size_t size = 0;
* @param a_client Client instance
* @param a_format Format
* @return Number of bytes that were placed into the buffer
*/
size_t dap_udp_client_write_f(dap_udp_client_t *a_client, const char * a_format,...)
{
size_t max_data_size = sizeof(a_client->buf_out)-a_client->buf_out_size;
va_list ap; va_list ap;
va_start(ap,a_format); va_start(ap,a_format);
int ret=vsnprintf(a_client->buf_out+a_client->buf_out_size,max_data_size,a_format,ap); size =dap_client_write_f(a_client,a_format,ap);
va_end(ap); va_end(ap);
if(ret>0){ add_waiting_client(a_client);
a_client->buf_out_size+=ret; return size;
return ret;
}else{
log_it(L_ERROR,"Can't write out formatted data '%s'",a_format);
return 0;
}
}
/**
* @brief add_waiting_client Add Client to write queue
* @param client Client instance
*/
void add_waiting_client(dap_udp_client_t* client){
dap_udp_server_t* serv = client->server;
LL_APPEND(serv->waiting_clients, client);
}
/**
* @brief shrink_client_buf_in Shrink input buffer (shift it left)
* @param cl Client instance
* @param shrink_size Size on wich we shrink the buffer with shifting it left
*/
void dap_udp_client_shrink_buf_in(dap_udp_client_t * cl, size_t shrink_size)
{
if((shrink_size==0)||(cl->buf_in_size==0) ){
return;
}else if(cl->buf_in_size>shrink_size){
size_t buf_size=cl->buf_in_size-shrink_size;
void * buf = malloc(buf_size);
memcpy(buf,cl->buf_in+ shrink_size,buf_size );
memcpy(cl->buf_in,buf,buf_size);
cl->buf_in_size=buf_size;
free(buf);
}else {
cl->buf_in_size=0;
}
} }
...@@ -27,60 +27,39 @@ ...@@ -27,60 +27,39 @@
#include <stdbool.h> #include <stdbool.h>
#include <sys/queue.h> #include <sys/queue.h>
#include "uthash.h" #include "uthash.h"
#include "dap_client_remote.h"
#include <ev.h> #include <ev.h>
typedef struct dap_udp_server dap_udp_server_t; typedef struct dap_udp_server dap_udp_server_t;
struct dap_udp_client; struct dap_udp_client;
typedef void (*dap_udp_client_callback_t) (struct udp_client *,void * arg); // Callback for specific client operations
#define UDP_CLIENT_BUF 100000 #define UDP_CLIENT_BUF 100000
typedef struct dap_udp_client{ typedef struct dap_udp_client{
bool signal_close; dap_client_remote_t* client;
bool _ready_to_write;
bool _ready_to_read;
uint32_t buf_out_zero_count;
char buf_in[UDP_CLIENT_BUF+1]; // Internal buffer for input data
size_t buf_in_size; // size of data that is in the input buffer
char buf_out[UDP_CLIENT_BUF+1]; // Internal buffer for output data
size_t buf_out_size; // size of data that is in the output buffer
uint64_t host_key; //key contains host address in first 4 bytes and port in last 4 bytes uint64_t host_key; //key contains host address in first 4 bytes and port in last 4 bytes
ev_io* watcher_client;
struct dap_udp_server * server;
UT_hash_handle hh; UT_hash_handle hh;
struct dap_udp_client *next, *prev; //pointers for writing queue struct dap_udp_client *next, *prev; //pointers for writing queue
pthread_mutex_t mutex_on_client;
void * _inheritor; // Internal data to specific client type, usualy states for state machine void * _inheritor; // Internal data to specific client type, usualy states for state machine
} dap_udp_client_t; // Node of bidirectional list of clients } dap_udp_client_t; // Node of bidirectional list of clients
#define DAP_UDP_CLIENT(a) ((dap_udp_client_t *) (a)->_inheritor)
int dap_udp_client_init(); // Init clients module dap_client_remote_t * dap_udp_client_create(dap_server_t * sh, ev_io* w_client, unsigned long host, unsigned short port); // Create new client and add it to the list
void dap_udp_client_deinit(); // Deinit clients module dap_client_remote_t * dap_udp_client_find(dap_server_t * sh, unsigned long host, unsigned short port); // Find client by host and port
dap_udp_client_t * dap_udp_client_create(dap_udp_server_t * sh, ev_io* w_client, unsigned long host, unsigned short port); // Create new client and add it to the list
dap_udp_client_t * dap_udp_client_find(dap_udp_server_t * sh, unsigned long host, unsigned short port); // Find client by host and port
void dap_udp_client_ready_to_read(dap_udp_client_t * sc,bool is_ready);
void dap_udp_client_ready_to_write(dap_udp_client_t * sc,bool is_ready);
size_t dap_udp_client_write(dap_udp_client_t *sc, const void * data, size_t data_size); void dap_udp_client_ready_to_read(dap_client_remote_t * sc,bool is_ready);
size_t dap_udp_client_write_f(dap_udp_client_t *a_client, const char * a_format,...); void dap_udp_client_ready_to_write(dap_client_remote_t * sc,bool is_ready);
size_t dap_udp_client_read(dap_udp_client_t *sc, void * data, size_t data_size);
void add_waiting_client(dap_udp_client_t* client); // Add client to writing queue size_t dap_udp_client_write(dap_client_remote_t *sc, const void * data, size_t data_size);
size_t dap_udp_client_write_f(dap_client_remote_t *a_client, const char * a_format,...);
void dap_udp_client_remove(dap_udp_client_t *sc, dap_udp_server_t * sh); // Removes the client from the hash-table void add_waiting_client(dap_client_remote_t* client); // Add client to writing queue
void dap_udp_client_shrink_buf_in(dap_udp_client_t * cl, size_t shrink_size);
#endif #endif
\ No newline at end of file
/*
Copyright (c) 2017-2018 (c) Project "DeM Labs Inc" https://github.com/demlabsinc
All rights reserved.
This file is part of DAP (Deus Applications Prototypes) the open source project
DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
DAP is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with any DAP based project. If not, see <http://www.gnu.org/licenses/>.
*/
#include "dap_udp_server.h" #include "dap_udp_server.h"
#include <stdio.h> #include <stdio.h>
#include "dap_common.h" #include "dap_common.h"
...@@ -38,24 +18,6 @@ struct ev_io w_write; ...@@ -38,24 +18,6 @@ struct ev_io w_write;
static void write_cb(struct ev_loop* loop, struct ev_io* watcher, int revents); static void write_cb(struct ev_loop* loop, struct ev_io* watcher, int revents);
/**
* @brief dap_udp_server_init Init server module
* @return Zero if ok others if no
*/
int dap_udp_server_init()
{
signal(SIGPIPE, SIG_IGN);
log_it(L_NOTICE,"Initialized socket server module");
return 0;
}
/**
* @brief dap_udp_server_deinit Deinit server module
*/
void dap_udp_server_deinit()
{
}
/** /**
*/ */
void error(char *msg) { void error(char *msg) {
...@@ -67,11 +29,14 @@ void error(char *msg) { ...@@ -67,11 +29,14 @@ void error(char *msg) {
* @brief dap_udp_server_new Initialize server structure * @brief dap_udp_server_new Initialize server structure
* @return Server pointer * @return Server pointer
*/ */
dap_udp_server_t * dap_udp_server_new() dap_server_t * dap_udp_server_new()
{ {
dap_udp_server_t* server = (dap_udp_server_t*)calloc(1,sizeof(dap_udp_server_t)); dap_udp_server_t* udp_server = (dap_udp_server_t*)calloc(1,sizeof(dap_udp_server_t));
server->waiting_clients = NULL; udp_server->waiting_clients = NULL;
return server; dap_server_t* sh = (dap_server_t*) calloc(1,sizeof(dap_server_t));
sh->_inheritor = udp_server;
udp_server->dap_server = sh;
return sh;
} }
/** /**
...@@ -79,7 +44,7 @@ dap_udp_server_t * dap_udp_server_new() ...@@ -79,7 +44,7 @@ dap_udp_server_t * dap_udp_server_new()
*/ */
void* dap_udp_client_loop(void * arg) void* dap_udp_client_loop(void * arg)
{ {
dap_udp_server_t* sh = (dap_udp_server_t*)arg; dap_server_t* sh = (dap_server_t*)arg;
log_it(L_NOTICE, "Start client listener thread"); log_it(L_NOTICE, "Start client listener thread");
struct ev_loop * ev_client_loop = ev_loop_new(0); struct ev_loop * ev_client_loop = ev_loop_new(0);
w_write.data = sh; w_write.data = sh;
...@@ -93,14 +58,14 @@ void* dap_udp_client_loop(void * arg) ...@@ -93,14 +58,14 @@ void* dap_udp_client_loop(void * arg)
* @brief dap_udp_server_delete Safe delete server structure * @brief dap_udp_server_delete Safe delete server structure
* @param sh Server instance * @param sh Server instance
*/ */
void dap_udp_server_delete(dap_udp_server_t * sh) void dap_udp_server_delete(dap_server_t * sh)
{ {
if(sh->address) if(sh->address)
free(sh->address); free(sh->address);
dap_udp_client_t * client, * tmp; dap_client_remote_t * client, * tmp;
HASH_ITER(hh,sh->clients,client,tmp) HASH_ITER(hh,sh->clients,client,tmp)
dap_udp_client_remove(client, sh); dap_client_remove(client, sh);
if(sh->server_delete_callback) if(sh->server_delete_callback)
sh->server_delete_callback(sh,NULL); sh->server_delete_callback(sh,NULL);
...@@ -114,8 +79,8 @@ void dap_udp_server_delete(dap_udp_server_t * sh) ...@@ -114,8 +79,8 @@ void dap_udp_server_delete(dap_udp_server_t * sh)
* @param port Binding port * @param port Binding port
* @return Server instance * @return Server instance
*/ */
dap_udp_server_t * dap_udp_server_listen(uint16_t port){ dap_server_t * dap_udp_server_listen(uint16_t port){
dap_udp_server_t* sh = dap_udp_server_new(); dap_server_t* sh = dap_udp_server_new();
sh->socket_listener = socket (AF_INET, SOCK_DGRAM, 0); sh->socket_listener = socket (AF_INET, SOCK_DGRAM, 0);
...@@ -139,6 +104,7 @@ dap_udp_server_t * dap_udp_server_listen(uint16_t port){ ...@@ -139,6 +104,7 @@ dap_udp_server_t * dap_udp_server_listen(uint16_t port){
dap_udp_server_delete(sh); dap_udp_server_delete(sh);
return NULL; return NULL;
} }
pthread_mutex_init(&DAP_UDP_SERVER(sh)->mutex_on_list, NULL);
return sh; return sh;
} }
...@@ -148,16 +114,22 @@ dap_udp_server_t * dap_udp_server_listen(uint16_t port){ ...@@ -148,16 +114,22 @@ dap_udp_server_t * dap_udp_server_listen(uint16_t port){
static void write_cb(struct ev_loop* loop, struct ev_io* watcher, int revents) static void write_cb(struct ev_loop* loop, struct ev_io* watcher, int revents)
{ {
if( ( revents & EV_WRITE ) ) { if( ( revents & EV_WRITE ) ) {
dap_udp_server_t* sh = watcher->data; dap_server_t* sh = watcher->data;
dap_udp_client_t * client, * tmp; dap_udp_server_t* udp = DAP_UDP_SERVER(sh);
LL_FOREACH_SAFE(sh->waiting_clients,client,tmp) dap_udp_client_t * udp_client, * tmp;
{ pthread_mutex_lock(&udp->mutex_on_list);
LL_FOREACH_SAFE(udp->waiting_clients,udp_client,tmp)
{
//log_it(L_INFO,"write_cb");
//pthread_mutex_lock(&udp_client->mutex_on_client);
dap_client_remote_t* client = udp_client->client;
if(client != NULL && check_close(client) == 0 && client->_ready_to_write) if(client != NULL && check_close(client) == 0 && client->_ready_to_write)
{ {
if(sh->client_write_callback) if(sh->client_write_callback)
sh->client_write_callback(client, NULL); sh->client_write_callback(client, NULL);
if(client->buf_out_size > 0) if(client->buf_out_size > 0)
{ {
//log_it(L_INFO,"write_cb_client");
for(size_t total_sent = 0; total_sent < client->buf_out_size;) { for(size_t total_sent = 0; total_sent < client->buf_out_size;) {
struct sockaddr_in addr; struct sockaddr_in addr;
addr.sin_family = AF_INET; addr.sin_family = AF_INET;
...@@ -170,12 +142,16 @@ static void write_cb(struct ev_loop* loop, struct ev_io* watcher, int revents) ...@@ -170,12 +142,16 @@ static void write_cb(struct ev_loop* loop, struct ev_io* watcher, int revents)
total_sent += bytes_sent; total_sent += bytes_sent;
} }
client->buf_out_size = 0; client->buf_out_size = 0;
bzero(client->buf_out, DAP_CLIENT_REMOTE_BUF + 1);
} }
LL_DELETE(sh->waiting_clients,client); LL_DELETE(udp->waiting_clients,udp_client);
} }
else if(client == NULL) else if(client == NULL)
LL_DELETE(sh->waiting_clients,client); LL_DELETE(udp->waiting_clients,udp_client);
//pthread_mutex_unlock(&udp_client->mutex_on_client);
} }
pthread_mutex_unlock(&udp->mutex_on_list);
} }
} }
...@@ -184,15 +160,17 @@ static void write_cb(struct ev_loop* loop, struct ev_io* watcher, int revents) ...@@ -184,15 +160,17 @@ static void write_cb(struct ev_loop* loop, struct ev_io* watcher, int revents)
* @param client Client structure * @param client Client structure
* @return 1 if client deleted, 0 if client is no need to delete * @return 1 if client deleted, 0 if client is no need to delete
*/ */
int check_close(dap_udp_client_t* client){ int check_close(dap_client_remote_t* client){
if(client->signal_close) if(client->signal_close)
{ {
dap_udp_server_t* sh = client->server; dap_udp_client_t* udp_client = DAP_UDP_CLIENT(client);
dap_server_t* sh = client->server;
dap_udp_server_t* udp_server = DAP_UDP_SERVER(sh);
dap_udp_client_t * client_check, * tmp; dap_udp_client_t * client_check, * tmp;
LL_FOREACH_SAFE(sh->waiting_clients,client_check,tmp) LL_FOREACH_SAFE(udp_server->waiting_clients,client_check,tmp)
if(client_check->host_key == client->host_key) if(client_check->host_key == udp_client->host_key)
LL_DELETE(sh->waiting_clients,client_check); LL_DELETE(udp_server->waiting_clients,client_check);
dap_udp_client_remove(client, sh); dap_client_remove(client, sh);
return 1; return 1;
} }
return 0; return 0;
...@@ -203,14 +181,15 @@ int check_close(dap_udp_client_t* client){ ...@@ -203,14 +181,15 @@ int check_close(dap_udp_client_t* client){
*/ */
static void read_cb(struct ev_loop* loop, struct ev_io* watcher, int revents) static void read_cb(struct ev_loop* loop, struct ev_io* watcher, int revents)
{ {
//log_it(L_INFO,"read_cb");
if ( revents & EV_READ ) if ( revents & EV_READ )
{ {
struct sockaddr_in clientaddr; struct sockaddr_in clientaddr;
int clientlen = sizeof(clientaddr); int clientlen = sizeof(clientaddr);
dap_udp_server_t* sh = watcher->data; dap_server_t* sh = watcher->data;
bzero(buf, BUFSIZE); bzero(buf, BUFSIZE);
socklen_t bytes = recvfrom(sh->socket_listener, buf, BUFSIZE, 0,(struct sockaddr *) &clientaddr, &clientlen); socklen_t bytes = recvfrom(sh->socket_listener, buf, BUFSIZE, 0,(struct sockaddr *) &clientaddr, &clientlen);
dap_udp_client_t *client = dap_udp_client_find(sh,clientaddr.sin_addr.s_addr,clientaddr.sin_port); dap_client_remote_t *client = dap_udp_client_find(sh,clientaddr.sin_addr.s_addr,clientaddr.sin_port);
if(client != NULL && check_close(client) != 0) if(client != NULL && check_close(client) != 0)
return; return;
if(bytes > 0){ if(bytes > 0){
...@@ -229,8 +208,10 @@ static void read_cb(struct ev_loop* loop, struct ev_io* watcher, int revents) ...@@ -229,8 +208,10 @@ static void read_cb(struct ev_loop* loop, struct ev_io* watcher, int revents)
error("ERROR create client structure\n"); error("ERROR create client structure\n");
} }
} }
dap_udp_client_t* udp_client = client->_inheritor;
pthread_mutex_lock(&udp_client->mutex_on_client);
size_t bytes_processed = 0; size_t bytes_processed = 0;
size_t bytes_recieved = strlen(buf); size_t bytes_recieved = bytes;
while(bytes_recieved > 0){ while(bytes_recieved > 0){
size_t bytes_to_transfer = 0; size_t bytes_to_transfer = 0;
if(bytes_recieved > UDP_CLIENT_BUF - client->buf_in_size) if(bytes_recieved > UDP_CLIENT_BUF - client->buf_in_size)
...@@ -239,11 +220,17 @@ static void read_cb(struct ev_loop* loop, struct ev_io* watcher, int revents) ...@@ -239,11 +220,17 @@ static void read_cb(struct ev_loop* loop, struct ev_io* watcher, int revents)
bytes_to_transfer = bytes_recieved; bytes_to_transfer = bytes_recieved;
memcpy(client->buf_in + client->buf_in_size,buf+bytes_processed,bytes_to_transfer); memcpy(client->buf_in + client->buf_in_size,buf+bytes_processed,bytes_to_transfer);
client->buf_in_size += bytes_to_transfer; client->buf_in_size += bytes_to_transfer;
if(sh->client_read_callback) if(sh->client_read_callback)
sh->client_read_callback(client,NULL); sh->client_read_callback(client,NULL);
bytes_processed += bytes_to_transfer; bytes_processed += bytes_to_transfer;
bytes_recieved -= bytes_to_transfer; bytes_recieved -= bytes_to_transfer;
} }
client->buf_in_size = 0;
bzero(client->buf_in, DAP_CLIENT_REMOTE_BUF + 1);
pthread_mutex_unlock(&udp_client->mutex_on_client);
} }
else if(bytes < 0) else if(bytes < 0)
{ {
...@@ -264,7 +251,7 @@ static void read_cb(struct ev_loop* loop, struct ev_io* watcher, int revents) ...@@ -264,7 +251,7 @@ static void read_cb(struct ev_loop* loop, struct ev_io* watcher, int revents)
* @brief dap_udp_server_loop Start server event loop * @brief dap_udp_server_loop Start server event loop
* @param sh Server instance * @param sh Server instance
*/ */
void dap_udp_server_loop(dap_udp_server_t * sh){ void dap_udp_server_loop(dap_server_t * sh){
sh->proc_thread.tid = pthread_self(); sh->proc_thread.tid = pthread_self();
pthread_t thread; pthread_t thread;
......
...@@ -31,6 +31,8 @@ ...@@ -31,6 +31,8 @@
#include <sys/select.h> #include <sys/select.h>
#include <sys/queue.h> #include <sys/queue.h>
#include "dap_udp_client.h" #include "dap_udp_client.h"
#include "dap_server.h"
#include "dap_client_remote.h"
struct dap_udp_server; struct dap_udp_server;
...@@ -41,41 +43,20 @@ typedef struct dap_udp_thread{ ...@@ -41,41 +43,20 @@ typedef struct dap_udp_thread{
typedef void (*dap_udp_server_callback_t) (struct dap_udp_server *,void * arg); // Callback for specific server's operations typedef void (*dap_udp_server_callback_t) (struct dap_udp_server *,void * arg); // Callback for specific server's operations
typedef struct dap_udp_server{ typedef struct dap_udp_server{
uint16_t port; // Listen port dap_udp_client_t * clients;
char * address; // Listen address
dap_udp_client_t * clients; // Hashmap of clients
dap_udp_client_t * waiting_clients; // List clients for writing data dap_udp_client_t * waiting_clients; // List clients for writing data
pthread_mutex_t mutex_on_list;
int socket_listener; // Socket for listener void* _inheritor;
int epoll_fd; // Epoll fd dap_server_t* dap_server;
struct sockaddr_in listener_addr; // Kernel structure for listener's binded address
void * _inheritor; // Pointer to the internal data, HTTP for example
dap_udp_thread_t proc_thread;
pthread_mutex_t mutex_on_hash;
dap_udp_server_callback_t server_delete_callback;
dap_udp_client_callback_t client_new_callback; // Create new client callback
dap_udp_client_callback_t client_delete_callback; // Delete client callback
dap_udp_client_callback_t client_read_callback; // Read function
dap_udp_client_callback_t client_write_callback; // Write function
dap_udp_client_callback_t client_error_callback; // Error processing function
} dap_udp_server_t; } dap_udp_server_t;
extern int dap_udp_server_init(); // Init server module #define DAP_UDP_SERVER(a) ((dap_udp_server_t *) (a)->_inheritor)
extern void dap_udp_server_deinit(); // Deinit server module
extern void dap_udp_server_delete(dap_udp_server_t * sh); extern void dap_udp_server_delete(dap_server_t * sh);
extern void dap_udp_server_loop(dap_udp_server_t* udp_server); // Start server event loop extern void dap_udp_server_loop(dap_server_t* udp_server); // Start server event loop
extern dap_udp_server_t* dap_udp_server_listen(uint16_t port); // Create and bind server extern dap_server_t* dap_udp_server_listen(uint16_t port); // Create and bind server
#endif #endif
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment