Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • cellframe/cellframe-sdk
  • MIKA83/cellframe-sdk
2 results
Show changes
Commits on Source (1)
......@@ -2,7 +2,7 @@ project(cellframe-sdk C)
cmake_minimum_required(VERSION 2.8)
set(CMAKE_C_STANDARD 11)
set(CELLFRAME_SDK_NATIVE_VERSION "2.4-0")
set(CELLFRAME_SDK_NATIVE_VERSION "2.4-1")
add_definitions ("-DCELLFRAME_SDK_VERSION=\"${CELLFRAME_SDK_NATIVE_VERSION}\"")
set(DAPSDK_MODULES "")
......
......@@ -133,12 +133,11 @@ dap_events_socket_t * dap_events_socket_create_type_event(dap_worker_t * a_w, da
return NULL;
}else
log_it(L_DEBUG, "Created one-way unnamed pipe %d->%d", l_pipe[0], l_pipe[1]);
l_es->fd = l_pipe[1];
l_es->fd2 = l_pipe[0];
l_es->fd = l_pipe[0];
l_es->fd2 = l_pipe[1];
#endif
#if defined(DAP_EVENTS_CAPS_EPOLL)
struct epoll_event l_ev={0};
int l_event_fd = l_es->fd;
//log_it( L_INFO, "Create event descriptor with queue %d (%p) and add it on epoll fd %d", l_event_fd, l_es, a_w->epoll_fd);
l_es->ev.events = EPOLLIN | EPOLLET | EPOLLRDHUP | EPOLLHUP;
......@@ -219,6 +218,7 @@ dap_events_socket_t * dap_events_socket_wrap2( dap_server_t *a_server, struct da
ret->socket = a_sock;
ret->events = a_events;
ret->server = a_server;
memcpy(&ret->callbacks,a_callbacks, sizeof ( ret->callbacks) );
ret->flags = DAP_SOCK_READY_TO_READ;
......@@ -229,9 +229,6 @@ dap_events_socket_t * dap_events_socket_wrap2( dap_server_t *a_server, struct da
HASH_ADD_INT( a_events->sockets, socket, ret );
pthread_rwlock_unlock( &a_events->sockets_rwlock );
if( a_callbacks->new_callback )
a_callbacks->new_callback( ret, NULL ); // Init internal structure
return ret;
}
......
/*
* Authors:
* Dmitriy A. Gearasimov <gerasimov.dmitriy@demlabs.net>
* DeM Labs Ltd. https://demlabs.net
* Copyright (c) 2017
* All rights reserved.
This file is part of DAP SDK the open source project
DAP SDK is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
DAP SDK is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with any DAP SDK based project. If not, see <http://www.gnu.org/licenses/>.
*/
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/epoll.h>
#include <netdb.h>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
#include <time.h>
#include <stdio.h>
#include <stdio.h>
#include <stdlib.h>
#include <stddef.h>
#include <errno.h>
#include <signal.h>
#include <sys/timerfd.h>
#include <utlist.h>
#if ! defined(_GNU_SOURCE)
#define _GNU_SOURCE
#endif
#if ! defined (__USE_GNU)
#define __USE_GNU
#endif
#include <sched.h>
#include "dap_common.h"
#include "dap_config.h"
#include "dap_server.h"
#include "dap_worker.h"
#include "dap_events.h"
#define LOG_TAG "dap_server"
static void s_es_server_read(dap_events_socket_t *a_events, void * a_arg);
static void s_es_server_error(dap_events_socket_t *a_events, void * a_arg);
static void s_server_delete(dap_server_t * a_server);
/**
* @brief dap_server_init
* @return
*/
int dap_server_init()
{
log_it(L_NOTICE,"Server module init");
return 0;
}
/**
* @brief dap_server_deinit
*/
void dap_server_deinit()
{
}
/**
* @brief dap_server_delete
* @param a_server
*/
void s_server_delete(dap_server_t * a_server)
{
if(a_server->delete_callback)
a_server->delete_callback(a_server,NULL);
if( a_server->address )
DAP_DELETE(a_server->address );
if( a_server->_inheritor )
DAP_DELETE( a_server->_inheritor );
DAP_DELETE(a_server);
}
/**
* @brief dap_server_new
* @param a_events
* @param a_addr
* @param a_port
* @param a_type
* @return
*/
dap_server_t* dap_server_new(dap_events_t *a_events, const char * a_addr, uint16_t a_port, dap_server_type_t a_type)
{
assert(a_events);
dap_server_t *l_server = DAP_NEW_Z(dap_server_t);
l_server->socket_listener=-1; // To diff it from 0 fd
l_server->address = a_addr? strdup( a_addr) : strdup("0.0.0.0"); // If NULL we listen everything
l_server->port = a_port;
l_server->type = a_type;
if(l_server->type == DAP_SERVER_TCP)
l_server->socket_listener = socket(AF_INET, SOCK_STREAM, 0);
if (l_server->socket_listener < 0) {
int l_errno = errno;
log_it (L_ERROR,"Socket error %s (%d)",strerror(l_errno), l_errno);
DAP_DELETE(l_server);
return NULL;
}
log_it(L_NOTICE,"Listen socket created...");
int reuse=1;
if (setsockopt(l_server->socket_listener, SOL_SOCKET, SO_REUSEADDR, (const char*)&reuse, sizeof(reuse)) < 0)
log_it(L_WARNING, "Can't set up REUSEADDR flag to the socket");
#ifdef SO_REUSEPORT
if (setsockopt(l_server->socket_listener, SOL_SOCKET, SO_REUSEPORT, (const char*)&reuse, sizeof(reuse)) < 0)
log_it(L_WARNING, "Can't set up REUSEPORT flag to the socket");
#endif
//create socket
l_server->listener_addr.sin_family = AF_INET;
l_server->listener_addr.sin_port = htons(l_server->port);
inet_pton(AF_INET, l_server->address, &(l_server->listener_addr.sin_addr));
if(bind (l_server->socket_listener, (struct sockaddr *) &(l_server->listener_addr), sizeof(l_server->listener_addr)) < 0){
log_it(L_ERROR,"Bind error: %s",strerror(errno));
return NULL;
}else{
log_it(L_INFO,"Binded %s:%u",l_server->address,l_server->port);
listen(l_server->socket_listener, SOMAXCONN);
}
fcntl( l_server->socket_listener, F_SETFL, O_NONBLOCK);
dap_events_socket_callbacks_t l_callbacks = {{ 0 }};
l_callbacks.read_callback = s_es_server_read;
l_callbacks.error_callback = s_es_server_error;
for(size_t l_worker_id = 0; l_worker_id < dap_events_worker_get_count() ; l_worker_id++){
dap_events_socket_t * l_es = dap_events_socket_wrap_no_add( a_events, l_server->socket_listener, &l_callbacks);
dap_worker_t *l_w = dap_events_worker_get(l_worker_id);
assert(l_w);
if ( l_es){
log_it(L_DEBUG, "Wrapped server socket %p on worker %u", l_es, l_worker_id);
l_es->_inheritor = l_server;
l_es->server = l_server;
l_es->type = DESCRIPTOR_TYPE_SOCKET_LISTENING;
#ifdef DAP_EVENTS_CAPS_EPOLL
// Prepare for multi thread listening
l_es->ev_base_flags = EPOLLET| EPOLLIN | EPOLLEXCLUSIVE;
#endif
dap_worker_add_events_socket( l_es, l_w );
} else{
log_it(L_WARNING, "Can't wrap event socket for %s:%u server", a_addr, a_port);
return NULL;
}
}
return l_server;
}
/**
* @brief s_es_server_error
* @param a_es
* @param a_arg
*/
static void s_es_server_error(dap_events_socket_t *a_es, void * a_arg)
{
(void) a_arg;
(void) a_es;
char l_buf[128];
strerror_r(errno, l_buf, sizeof (l_buf));
log_it(L_WARNING, "Listening socket error: %s, ", l_buf);
}
/**
* @brief s_es_server_read
* @param a_es
* @param a_arg
*/
static void s_es_server_read(dap_events_socket_t *a_es,void * a_arg)
{
(void) a_arg;
a_es->buf_in_size = 0; // It should be 1 so we reset it to 0
//log_it(L_DEBUG, "Server socket %d is active",i);
dap_server_t * l_server = (dap_server_t*) a_es->_inheritor;
if( l_server ){
dap_events_socket_t * l_es_new = NULL;
log_it(L_DEBUG, "Listening socket (binded on %s:%u) got new incomming connection",l_server->address,l_server->port);
struct sockaddr client_addr = {0};
socklen_t client_addr_size = sizeof(struct sockaddr);
int l_es_new_socket;
while ( (l_es_new_socket = accept(a_es->socket ,&client_addr,&client_addr_size)) > 0){
log_it(L_DEBUG, "Accepted new connection (sock %d from %d)", l_es_new_socket, a_es->socket);
l_es_new = dap_server_events_socket_new(a_es->events,l_es_new_socket,&l_server->client_callbacks,l_server);
getnameinfo(&client_addr,client_addr_size, l_es_new->hostaddr
, sizeof(l_es_new->hostaddr),l_es_new->service,sizeof(l_es_new->service),
NI_NUMERICHOST | NI_NUMERICSERV);
log_it(L_INFO,"Connection accepted from %s (%s)", l_es_new->hostaddr, l_es_new->service );
dap_worker_add_events_socket_auto(l_es_new);
}
if ( l_es_new_socket == -1 && errno == EAGAIN){
// Everything is good, we'll receive ACCEPT on next poll
return;
}else{
log_it(L_WARNING,"accept() returned %d",l_es_new_socket);
}
}else
log_it(L_ERROR, "No sap_server object related with socket %d in the select loop",a_es->socket);
}
/**
* @brief dap_server_events_socket_new
* @param a_events
* @param a_sock
* @param a_callbacks
* @param a_server
* @return
*/
dap_events_socket_t * dap_server_events_socket_new(dap_events_t * a_events, int a_sock,
dap_events_socket_callbacks_t * a_callbacks, dap_server_t * a_server)
{
dap_events_socket_t * ret = NULL;
if (a_sock > 0) {
// set it nonblock
//fcntl(a_sock, F_SETFL, O_NONBLOCK);
ret = dap_events_socket_wrap_no_add(a_events, a_sock, a_callbacks);
ret->type = DESCRIPTOR_TYPE_SOCKET;
ret->server = a_server;
} else {
log_it(L_CRITICAL,"Accept error: %s",strerror(errno));
}
return ret;
}
/*
* Authors:
* Dmitriy A. Gearasimov <gerasimov.dmitriy@demlabs.net>
* DeM Labs Ltd. https://demlabs.net
* Copyright (c) 2017
* All rights reserved.
This file is part of DAP SDK the open source project
DAP SDK is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
DAP SDK is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with any DAP SDK based project. If not, see <http://www.gnu.org/licenses/>.
*/
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/epoll.h>
#include <netdb.h>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
#include <time.h>
#include <stdio.h>
#include <stdio.h>
#include <stdlib.h>
#include <stddef.h>
#include <errno.h>
#include <signal.h>
#include <sys/timerfd.h>
#include <utlist.h>
#if ! defined(_GNU_SOURCE)
#define _GNU_SOURCE
#endif
#if ! defined (__USE_GNU)
#define __USE_GNU
#endif
#include <sched.h>
#include "dap_common.h"
#include "dap_config.h"
#include "dap_server.h"
#include "dap_worker.h"
#include "dap_events.h"
#define LOG_TAG "dap_server"
static void s_es_server_accept(dap_events_socket_t *a_es, int a_remote_socket, struct sockaddr* a_remote_addr);
static void s_es_server_error(dap_events_socket_t *a_es, void * a_arg);
static void s_es_server_new(dap_events_socket_t *a_es, void * a_arg);
static void s_server_delete(dap_server_t * a_server);
/**
* @brief dap_server_init
* @return
*/
int dap_server_init()
{
log_it(L_NOTICE,"Server module init");
return 0;
}
/**
* @brief dap_server_deinit
*/
void dap_server_deinit()
{
}
/**
* @brief dap_server_delete
* @param a_server
*/
void s_server_delete(dap_server_t * a_server)
{
if(a_server->delete_callback)
a_server->delete_callback(a_server,NULL);
if( a_server->address )
DAP_DELETE(a_server->address );
if( a_server->_inheritor )
DAP_DELETE( a_server->_inheritor );
DAP_DELETE(a_server);
}
/**
* @brief dap_server_new
* @param a_events
* @param a_addr
* @param a_port
* @param a_type
* @return
*/
dap_server_t* dap_server_new(dap_events_t *a_events, const char * a_addr, uint16_t a_port, dap_server_type_t a_type)
{
assert(a_events);
dap_server_t *l_server = DAP_NEW_Z(dap_server_t);
l_server->socket_listener=-1; // To diff it from 0 fd
l_server->address = a_addr? strdup( a_addr) : strdup("0.0.0.0"); // If NULL we listen everything
l_server->port = a_port;
l_server->type = a_type;
if(l_server->type == DAP_SERVER_TCP)
l_server->socket_listener = socket(AF_INET, SOCK_STREAM, 0);
if (l_server->socket_listener < 0) {
int l_errno = errno;
log_it (L_ERROR,"Socket error %s (%d)",strerror(l_errno), l_errno);
DAP_DELETE(l_server);
return NULL;
}
log_it(L_NOTICE,"Listen socket %d created...", l_server->socket_listener);
int reuse=1;
if (setsockopt(l_server->socket_listener, SOL_SOCKET, SO_REUSEADDR, (const char*)&reuse, sizeof(reuse)) < 0)
log_it(L_WARNING, "Can't set up REUSEADDR flag to the socket");
#ifdef SO_REUSEPORT
if (setsockopt(l_server->socket_listener, SOL_SOCKET, SO_REUSEPORT, (const char*)&reuse, sizeof(reuse)) < 0)
log_it(L_WARNING, "Can't set up REUSEPORT flag to the socket");
#endif
//create socket
l_server->listener_addr.sin_family = AF_INET;
l_server->listener_addr.sin_port = htons(l_server->port);
inet_pton(AF_INET, l_server->address, &(l_server->listener_addr.sin_addr));
if(bind (l_server->socket_listener, (struct sockaddr *) &(l_server->listener_addr), sizeof(l_server->listener_addr)) < 0){
log_it(L_ERROR,"Bind error: %s",strerror(errno));
close(l_server->socket_listener);
DAP_DELETE(l_server);
return NULL;
}else{
log_it(L_INFO,"Binded %s:%u",l_server->address,l_server->port);
listen(l_server->socket_listener, SOMAXCONN);
}
fcntl( l_server->socket_listener, F_SETFL, O_NONBLOCK);
pthread_mutex_init(&l_server->started_mutex,NULL);
pthread_cond_init(&l_server->started_cond,NULL);
dap_events_socket_callbacks_t l_callbacks;
memset(&l_callbacks,0,sizeof (l_callbacks));
l_callbacks.new_callback = s_es_server_new;
l_callbacks.accept_callback = s_es_server_accept;
l_callbacks.error_callback = s_es_server_error;
for(size_t l_worker_id = 0; l_worker_id < dap_events_worker_get_count() ; l_worker_id++){
dap_worker_t *l_w = dap_events_worker_get(l_worker_id);
assert(l_w);
dap_events_socket_t * l_es = dap_events_socket_wrap2( l_server, a_events, l_server->socket_listener, &l_callbacks);
if ( l_es){
l_es->type = DESCRIPTOR_TYPE_SOCKET_LISTENING;
#ifdef DAP_EVENTS_CAPS_EPOLL
// Prepare for multi thread listening
l_es->ev_base_flags = EPOLLET| EPOLLIN | EPOLLEXCLUSIVE;
#endif
l_es->_inheritor = l_server;
pthread_mutex_lock(&l_server->started_mutex);
dap_worker_add_events_socket( l_es, l_w );
pthread_cond_wait(&l_server->started_cond, &l_server->started_mutex);
pthread_mutex_unlock(&l_server->started_mutex);
} else{
log_it(L_WARNING, "Can't wrap event socket for %s:%u server", a_addr, a_port);
return NULL;
}
}
return l_server;
}
/**
* @brief s_es_server_new
* @param a_es
* @param a_arg
*/
static void s_es_server_new(dap_events_socket_t *a_es, void * a_arg)
{
log_it(L_DEBUG, "Created server socket %p on worker %u", a_es, a_es->worker->id);
dap_server_t *l_server = (dap_server_t*) a_es->_inheritor;
pthread_cond_broadcast( &l_server->started_cond);
}
/**
* @brief s_es_server_error
* @param a_es
* @param a_arg
*/
static void s_es_server_error(dap_events_socket_t *a_es, void * a_arg)
{
(void) a_arg;
(void) a_es;
char l_buf[128];
strerror_r(errno, l_buf, sizeof (l_buf));
log_it(L_WARNING, "Listening socket error: %s, ", l_buf);
}
/**
* @brief s_es_server_accept
* @param a_events
* @param a_remote_socket
* @param a_remote_addr
*/
static void s_es_server_accept(dap_events_socket_t *a_es, int a_remote_socket, struct sockaddr *a_remote_addr)
{
socklen_t a_remote_addr_size = sizeof(*a_remote_addr);
a_es->buf_in_size = 0; // It should be 1 so we reset it to 0
//log_it(L_DEBUG, "Server socket %d is active",i);
dap_server_t * l_server = (dap_server_t*) a_es->_inheritor;
assert(l_server);
dap_events_socket_t * l_es_new = NULL;
log_it(L_DEBUG, "Listening socket (binded on %s:%u) got new incomming connection",l_server->address,l_server->port);
log_it(L_DEBUG, "Accepted new connection (sock %d from %d)", a_remote_socket, a_es->socket);
l_es_new = dap_server_events_socket_new(a_es->events,a_remote_socket,&l_server->client_callbacks,l_server);
getnameinfo(a_remote_addr,a_remote_addr_size, l_es_new->hostaddr
, sizeof(l_es_new->hostaddr),l_es_new->service,sizeof(l_es_new->service),
NI_NUMERICHOST | NI_NUMERICSERV);
log_it(L_INFO,"Connection accepted from %s (%s)", l_es_new->hostaddr, l_es_new->service );
dap_worker_add_events_socket_auto(l_es_new);
}
/**
* @brief dap_server_events_socket_new
* @param a_events
* @param a_sock
* @param a_callbacks
* @param a_server
* @return
*/
dap_events_socket_t * dap_server_events_socket_new(dap_events_t * a_events, int a_sock,
dap_events_socket_callbacks_t * a_callbacks, dap_server_t * a_server)
{
dap_events_socket_t * ret = NULL;
if (a_sock > 0) {
// set it nonblock
//fcntl(a_sock, F_SETFL, O_NONBLOCK);
ret = dap_events_socket_wrap_no_add(a_events, a_sock, a_callbacks);
ret->type = DESCRIPTOR_TYPE_SOCKET;
ret->server = a_server;
} else {
log_it(L_CRITICAL,"Accept error: %s",strerror(errno));
}
return ret;
}
......@@ -138,7 +138,8 @@ void *dap_worker_thread(void *arg)
}
l_cur->last_time_active = l_cur_time;
//log_it(L_DEBUG, "Worker=%d fd=%d socket=%d event=0x%x(%d)", w->number_thread, w->epoll_fd,cur->socket, events[n].events,events[n].events);
log_it(L_DEBUG, "Worker=%d fd=%d socket=%d event=0x%x(%d)", l_worker->id,
l_worker->epoll_fd,l_cur->socket, l_epoll_events[n].events,l_epoll_events[n].events);
int l_sock_err = 0, l_sock_err_size = sizeof(l_sock_err);
//connection already closed (EPOLLHUP - shutdown has been made in both directions)
if(l_epoll_events[n].events & EPOLLHUP) { // && events[n].events & EPOLLERR) {
......@@ -195,6 +196,24 @@ void *dap_worker_thread(void *arg)
break;
case DESCRIPTOR_TYPE_SOCKET_LISTENING:
// Accept connection
if ( l_cur->callbacks.accept_callback){
struct sockaddr l_remote_addr;
socklen_t l_remote_addr_size= sizeof (l_remote_addr);
int l_remote_socket= accept(l_cur->socket ,&l_remote_addr,&l_remote_addr_size);
int l_errno = errno;
if ( l_remote_socket == -1 ){
if( l_errno == EAGAIN || l_errno == EWOULDBLOCK){// Everything is good, we'll receive ACCEPT on next poll
continue;
}else{
char l_errbuf[128];
strerror_r(l_errno, l_errbuf, sizeof (l_errbuf));
log_it(L_WARNING,"accept() on socket %d error:\"%s\"(%d)",l_cur->socket, l_errbuf,l_errno);
}
}
l_cur->callbacks.accept_callback(l_cur,l_remote_socket,&l_remote_addr);
}else
log_it(L_ERROR,"No accept_callback on listening socket");
break;
case DESCRIPTOR_TYPE_TIMER:{
uint64_t val;
......@@ -224,7 +243,12 @@ void *dap_worker_thread(void *arg)
if(l_bytes_read > 0) {
l_cur->buf_in_size += l_bytes_read;
//log_it(DEBUG, "Received %d bytes", bytes_read);
l_cur->callbacks.read_callback(l_cur, NULL); // Call callback to process read event. At the end of callback buf_in_size should be zero if everything was read well
if(l_cur->callbacks.read_callback)
l_cur->callbacks.read_callback(l_cur, NULL); // Call callback to process read event. At the end of callback buf_in_size should be zero if everything was read well
else{
log_it(L_WARNING, "We have incomming %u data but no read callback on socket %d, removing from read set", l_cur->socket);
dap_events_socket_set_readable_unsafe(l_cur,false);
}
}
else if(l_bytes_read < 0) {
if (l_errno != EAGAIN && l_errno != EWOULDBLOCK){ // Socket is blocked
......@@ -287,19 +311,17 @@ void *dap_worker_thread(void *arg)
break;
}
}else{
l_bytes_sent += l_bytes_sent;
//log_it(L_DEBUG, "Output: %u from %u bytes are sent ", total_sent,sa_cur->buf_out_size);
//log_it(L_DEBUG, "Output: %u from %u bytes are sent ", l_bytes_sent,l_cur->buf_out_size);
//}
//log_it(L_DEBUG,"Output: sent %u bytes",total_sent);
if (l_bytes_sent) {
pthread_mutex_lock(&l_cur->mutex);
l_cur->buf_out_size -= l_bytes_sent;
//log_it(L_DEBUG,"Output: left %u bytes in buffer",l_cur->buf_out_size);
if (l_cur->buf_out_size) {
memmove(l_cur->buf_out, &l_cur->buf_out[l_bytes_sent], l_cur->buf_out_size);
} else {
l_cur->flags &= ~DAP_SOCK_READY_TO_WRITE;
}
pthread_mutex_unlock(&l_cur->mutex);
}
}
}
......@@ -358,14 +380,14 @@ static void s_new_es_callback( dap_events_socket_t * a_es, void * a_arg)
pthread_rwlock_unlock(&w->events->sockets_rwlock);
struct epoll_event l_ev={0};
l_ev.events = l_es_new->flags ;
l_es_new->ev.events = l_es_new->ev_base_flags ;
if(l_es_new->flags & DAP_SOCK_READY_TO_READ )
l_ev.events |= EPOLLIN;
l_es_new->ev.events |= EPOLLIN;
if(l_es_new->flags & DAP_SOCK_READY_TO_WRITE )
l_ev.events |= EPOLLOUT;
l_ev.data.ptr = l_es_new;
l_es_new->ev.events |= EPOLLOUT;
l_es_new->ev.data.ptr = l_es_new;
if ( epoll_ctl(w->epoll_fd, EPOLL_CTL_ADD, l_es_new->socket, &l_ev) == 1 )
if ( epoll_ctl(w->epoll_fd, EPOLL_CTL_ADD, l_es_new->socket, &l_es_new->ev) == 1 )
log_it(L_CRITICAL,"Can't add event socket's handler to epoll_fd");
else{
log_it(L_DEBUG, "Added socket %d on worker %u", l_es_new->socket, w->id);
......
......@@ -59,11 +59,12 @@ typedef struct dap_worker dap_worker_t;
typedef struct dap_server dap_server_t;
typedef void (*dap_events_socket_callback_t) (dap_events_socket_t *,void * ); // Callback for specific client operations
typedef void (*dap_events_socket_callback_timer_t) (dap_events_socket_t * ); // Callback for specific client operations
typedef void (*dap_events_socket_callback_accept_t) (dap_events_socket_t * , int, struct sockaddr* ); // Callback for accept of new connection
typedef void (*dap_events_socket_worker_callback_t) (dap_events_socket_t *,dap_worker_t * ); // Callback for specific client operations
typedef struct dap_events_socket_callbacks {
union{
dap_events_socket_callback_t accept_callback; // Accept callback for listening socket
dap_events_socket_callback_accept_t accept_callback; // Accept callback for listening socket
dap_events_socket_callback_timer_t timer_callback; // Timer callback for listening socket
dap_events_socket_callback_t event_callback; // Timer callback for listening socket
dap_events_socket_callback_t action_callback; // Callback for action with socket
......@@ -160,6 +161,8 @@ dap_events_socket_t * dap_events_socket_create_type_event(dap_worker_t * a_w, da
void dap_events_socket_send_event( dap_events_socket_t * a_es, void* a_arg);
dap_events_socket_t * dap_events_socket_wrap_no_add(struct dap_events * a_events,
int s, dap_events_socket_callbacks_t * a_callbacks); // Create new client and add it to the list
dap_events_socket_t * dap_events_socket_wrap2( dap_server_t *a_server, struct dap_events *a_events,
int a_sock, dap_events_socket_callbacks_t *a_callbacks );
dap_events_socket_t * dap_events_socket_find(int sock, struct dap_events * sh); // Find client by socket
......
......@@ -75,6 +75,9 @@ typedef struct dap_server {
dap_server_callback_t delete_callback;
dap_events_socket_callbacks_t client_callbacks; // Callbacks for the new clients
pthread_cond_t started_cond; // Condition for initialized socket
pthread_mutex_t started_mutex; // Mutex for shared operation between mirrored sockets
} dap_server_t;
int dap_server_init( ); // Init server module
......
/*
* Authors:
* Dmitriy A. Gearasimov <gerasimov.dmitriy@demlabs.net>
* DeM Labs Ltd. https://demlabs.net
* Copyright (c) 2017
* All rights reserved.
This file is part of DAP SDK the open source project
DAP SDK is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
DAP SDK is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with any DAP SDK based project. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdio.h>
#include <unistd.h>
#include <dirent.h>
#include <errno.h>
#ifndef _WIN32
#include <sys/types.h>
#include <sys/stat.h>
#else
#include <winsock2.h>
#include <windows.h>
#include <mswsock.h>
#include <ws2tcpip.h>
#include <io.h>
#endif
#include <pthread.h>
#include <magic.h>
#include "dap_common.h"
#include "dap_events_socket.h"
#include "dap_http.h"
#include "dap_http_client.h"
#include "dap_http_folder.h"
#include "http_status_code.h"
typedef struct dap_http_url_proc_folder {
char local_path[4096];
magic_t mime_detector;
} dap_http_url_proc_folder_t;
#define URL_PROC_FOLDER(a) ((dap_http_url_proc_folder_t*) (a)->_inhertior )
typedef struct dap_http_file{
FILE * fd;
size_t position;
char local_path[4096+2048+1];
dap_http_client_t *client;
} dap_http_file_t;
#define DAP_HTTP_FILE(a) ((dap_http_file_t*) (a)->_inheritor )
void dap_http_folder_headers_read( dap_http_client_t *cl_ht, void *arg );
void dap_http_folder_headers_write( dap_http_client_t *cl_ht, void *arg );
void dap_http_folder_data_read( dap_http_client_t *cl_ht, void *arg );
void dap_http_folder_data_write( dap_http_client_t *cl_ht, void *arg );
#define LOG_TAG "dap_http_folder"
int dap_http_folder_init( )
{
return 0;
}
void dap_http_folder_deinit( )
{
}
/**
* @brief dap_http_folder_add Add folder for reading to the HTTP server
* @param sh Server instance
* @param url_path Beginning part of the URL
* @param local_path Local path that will be read for
*/
int dap_http_folder_add( dap_http_t *sh, const char *url_path, const char *local_path )
{
if ( !local_path ) {
log_it( L_ERROR, "Directory Path parameter is empty!" );
return -11;
}
log_it( L_DEBUG, "Checking url path %s", local_path );
#ifndef _WIN32
DIR *dirptr = opendir( local_path );
if ( dirptr == NULL ) {
log_it( L_ERROR, "Directory Not Found!" );
return -11;
}
else {
closedir( dirptr );
}
#else // WIN32
DWORD attr = GetFileAttributesA( local_path );
if ( attr == INVALID_FILE_ATTRIBUTES || !(attr & FILE_ATTRIBUTE_DIRECTORY) ) {
log_it( L_ERROR, "Directory Not Found!" );
return -11;
}
#endif
log_it( L_NOTICE, "File service for %s => %s ", url_path, local_path );
dap_http_url_proc_folder_t *up_folder = (dap_http_url_proc_folder_t *)calloc( 1, sizeof(dap_http_url_proc_folder_t) );
strncpy( up_folder->local_path, local_path, sizeof(up_folder->local_path)-1 );
up_folder->mime_detector = magic_open( MAGIC_SYMLINK | MAGIC_MIME | MAGIC_PRESERVE_ATIME );
if ( up_folder->mime_detector == NULL) {
log_it( L_CRITICAL,"Can't init MIME detection library" );
free( up_folder );
return -1;
}
#ifndef _WIN32
if( 0 != magic_load( up_folder->mime_detector, NULL) ) {
#else
if( 0 != magic_load( up_folder->mime_detector, "data.mag" ) ) {
#endif
log_it( L_CRITICAL, "Can't load MIME magic detection database" );
magic_close( up_folder->mime_detector );
free( up_folder );
return -2;
}
dap_http_add_proc( sh,
url_path,
up_folder,
NULL,
NULL,
dap_http_folder_headers_read,
dap_http_folder_headers_write,
dap_http_folder_data_read,
dap_http_folder_data_write,
NULL );
return 0;
}
/**
* @brief dap_http_folder_headers_read Signal thats HTTP client is now going to output the data
* @param cl_ht HTTP client instance
* @param arg Not used
*/
void dap_http_folder_headers_read(dap_http_client_t * cl_ht, void * arg)
{
(void) arg;
cl_ht->state_write=DAP_HTTP_CLIENT_STATE_START;
cl_ht->state_read=cl_ht->keep_alive?DAP_HTTP_CLIENT_STATE_START:DAP_HTTP_CLIENT_STATE_NONE;
dap_events_socket_set_writable_unsafe(cl_ht->esocket,true);
dap_events_socket_set_readable_unsafe(cl_ht->esocket, cl_ht->keep_alive);
}
#ifdef _WIN32
time_t FileTimeToUnixTime( FILETIME ft )
{
ULARGE_INTEGER ull;
ull.LowPart = ft.dwLowDateTime;
ull.HighPart = ft.dwHighDateTime;
return ull.QuadPart / 10000000ULL - 11644473600ULL;
}
#endif
/**
* @brief dap_http_folder_headers Prepare response HTTP headers for file folder request
* @param cl_ht HTTP client instane
* @param arg Not used
*/
void dap_http_folder_headers_write( dap_http_client_t *cl_ht, void * arg)
{
(void) arg;
// Get specific data for folder URL processor
dap_http_url_proc_folder_t * up_folder=(dap_http_url_proc_folder_t*) cl_ht->proc->_inheritor;
// Init specific file response data for HTTP client instance
cl_ht->_inheritor=DAP_NEW_Z(dap_http_file_t);
dap_http_file_t* cl_ht_file=DAP_HTTP_FILE(cl_ht);
cl_ht_file->client=cl_ht;
// Produce local path for file to open
dap_snprintf(cl_ht_file->local_path,sizeof(cl_ht_file->local_path),"%s/%s", up_folder->local_path, cl_ht->url_path );
log_it(L_DEBUG, "Check %s file", cl_ht_file->local_path);
#ifndef _WIN32
struct stat file_stat;
if ( stat(cl_ht_file->local_path, &file_stat) != 0 )
goto err;
cl_ht->out_last_modified = file_stat.st_mtime;
cl_ht->out_content_length = file_stat.st_size;
#else
FILETIME CreationTime;
FILETIME LastAccessTime;
FILETIME LastWriteTime;
HANDLE fileh = CreateFileA( cl_ht_file->local_path,
GENERIC_READ,
FILE_SHARE_READ,
NULL,
OPEN_EXISTING,
FILE_ATTRIBUTE_ARCHIVE,
NULL
);
if ( fileh == INVALID_HANDLE_VALUE )
goto err;
GetFileTime( fileh,
&CreationTime,
&LastAccessTime,
&LastWriteTime );
cl_ht->out_last_modified = FileTimeToUnixTime( LastWriteTime );
cl_ht->out_content_length = GetFileSize( fileh, NULL );
CloseHandle( fileh );
#endif
cl_ht_file->fd = fopen( cl_ht_file->local_path, "rb" );
if ( cl_ht_file->fd == NULL ) {
log_it(L_ERROR, "Can't open %s: %s",cl_ht_file->local_path,strerror(errno));
cl_ht->reply_status_code = Http_Status_NotFound;
strncpy( cl_ht->reply_reason_phrase, "Not Found", sizeof(cl_ht->reply_reason_phrase)-1 );
}
else {
cl_ht->reply_status_code = Http_Status_OK;
strncpy( cl_ht->reply_reason_phrase,"OK",sizeof(cl_ht->reply_reason_phrase)-1 );
const char *mime_type = magic_file( up_folder->mime_detector, cl_ht_file->local_path );
if( mime_type ) {
strncpy(cl_ht->out_content_type,mime_type,sizeof(cl_ht->out_content_type)-1);
log_it( L_DEBUG, "MIME type detected: '%s'", mime_type );
}
else {
cl_ht->reply_status_code=Http_Status_NotFound;
cl_ht->esocket->flags |= DAP_SOCK_SIGNAL_CLOSE;
log_it(L_WARNING,"Can't detect MIME type of %s file: %s",cl_ht_file->local_path,magic_error(up_folder->mime_detector));
}
}
return;
err:
log_it( L_WARNING, "Can't get file info: %s", strerror(errno) );
cl_ht->reply_status_code = 404;
strncpy( cl_ht->reply_reason_phrase, "Not Found", sizeof(cl_ht->reply_reason_phrase)-1 );
return;
}
/**
* @brief dap_http_folder_read HTTP client callback for reading function for the folder processing
* @param cl_ht HTTP client instance
* @param arg Pointer to int with return bytes number
*/
void dap_http_folder_data_read(dap_http_client_t * cl_ht, void * arg)
{
int * bytes_return = (int*) arg; // Return number of read bytes
//Do nothing
*bytes_return=cl_ht->esocket->buf_in_size;
}
/**
* @brief dap_http_folder_write HTTP client callback for writting function for the folder processing
* @param cl_ht HTTP client instance
* @param arg
*/
void dap_http_folder_data_write(dap_http_client_t * cl_ht, void * arg)
{
(void) arg;
dap_http_file_t * cl_ht_file= DAP_HTTP_FILE(cl_ht);
cl_ht->esocket->buf_out_size=fread(cl_ht->esocket->buf_out,1,sizeof(cl_ht->esocket->buf_out),cl_ht_file->fd);
cl_ht_file->position+=cl_ht->esocket->buf_out_size;
if(feof(cl_ht_file->fd)!=0){
log_it(L_INFO, "All the file %s is sent out",cl_ht_file->local_path);
//strncat(cl_ht->client->buf_out+cl_ht->client->buf_out_size,"\r\n",sizeof(cl_ht->client->buf_out));
fclose(cl_ht_file->fd);
dap_events_socket_set_writable_unsafe(cl_ht->esocket,false);
if ( !cl_ht->keep_alive )
cl_ht->esocket->flags |= DAP_SOCK_SIGNAL_CLOSE;
cl_ht->state_write=DAP_HTTP_CLIENT_STATE_NONE;
}
}
/*
* Authors:
* Dmitriy A. Gearasimov <gerasimov.dmitriy@demlabs.net>
* DeM Labs Ltd. https://demlabs.net
* Copyright (c) 2017
* All rights reserved.
This file is part of DAP SDK the open source project
DAP SDK is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
DAP SDK is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with any DAP SDK based project. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdio.h>
#include <unistd.h>
#include <dirent.h>
#include <errno.h>
#ifndef _WIN32
#include <sys/types.h>
#include <sys/stat.h>
#else
#include <winsock2.h>
#include <windows.h>
#include <mswsock.h>
#include <ws2tcpip.h>
#include <io.h>
#endif
#include <pthread.h>
#include <magic.h>
#include "dap_common.h"
#include "dap_events_socket.h"
#include "dap_http.h"
#include "dap_http_client.h"
#include "dap_http_folder.h"
#include "http_status_code.h"
typedef struct dap_http_url_proc_folder {
char local_path[4096];
magic_t mime_detector;
} dap_http_url_proc_folder_t;
#define URL_PROC_FOLDER(a) ((dap_http_url_proc_folder_t*) (a)->_inhertior )
typedef struct dap_http_file{
FILE * fd;
size_t position;
char local_path[4096+2048+1];
dap_http_client_t *client;
} dap_http_file_t;
#define DAP_HTTP_FILE(a) ((dap_http_file_t*) (a)->_inheritor )
void dap_http_folder_headers_read( dap_http_client_t *cl_ht, void *arg );
void dap_http_folder_headers_write( dap_http_client_t *cl_ht, void *arg );
void dap_http_folder_data_read( dap_http_client_t *cl_ht, void *arg );
void dap_http_folder_data_write( dap_http_client_t *cl_ht, void *arg );
#define LOG_TAG "dap_http_folder"
int dap_http_folder_init( )
{
return 0;
}
void dap_http_folder_deinit( )
{
}
/**
* @brief dap_http_folder_add Add folder for reading to the HTTP server
* @param sh Server instance
* @param url_path Beginning part of the URL
* @param local_path Local path that will be read for
*/
int dap_http_folder_add( dap_http_t *sh, const char *url_path, const char *local_path )
{
if ( !local_path ) {
log_it( L_ERROR, "Directory Path parameter is empty!" );
return -11;
}
log_it( L_DEBUG, "Checking url path %s", local_path );
#ifndef _WIN32
DIR *dirptr = opendir( local_path );
if ( dirptr == NULL ) {
log_it( L_ERROR, "Directory Not Found!" );
return -11;
}
else {
closedir( dirptr );
}
#else // WIN32
DWORD attr = GetFileAttributesA( local_path );
if ( attr == INVALID_FILE_ATTRIBUTES || !(attr & FILE_ATTRIBUTE_DIRECTORY) ) {
log_it( L_ERROR, "Directory Not Found!" );
return -11;
}
#endif
log_it( L_NOTICE, "File service for %s => %s ", url_path, local_path );
dap_http_url_proc_folder_t *up_folder = (dap_http_url_proc_folder_t *)calloc( 1, sizeof(dap_http_url_proc_folder_t) );
strncpy( up_folder->local_path, local_path, sizeof(up_folder->local_path)-1 );
up_folder->mime_detector = magic_open( MAGIC_SYMLINK | MAGIC_MIME | MAGIC_PRESERVE_ATIME );
if ( up_folder->mime_detector == NULL) {
log_it( L_CRITICAL,"Can't init MIME detection library" );
free( up_folder );
return -1;
}
#ifndef _WIN32
if( 0 != magic_load( up_folder->mime_detector, NULL) ) {
#else
if( 0 != magic_load( up_folder->mime_detector, "data.mag" ) ) {
#endif
log_it( L_CRITICAL, "Can't load MIME magic detection database" );
magic_close( up_folder->mime_detector );
free( up_folder );
return -2;
}
dap_http_add_proc( sh,
url_path,
up_folder,
NULL,
NULL,
dap_http_folder_headers_read,
dap_http_folder_headers_write,
dap_http_folder_data_read,
dap_http_folder_data_write,
NULL );
return 0;
}
/**
* @brief dap_http_folder_headers_read Signal thats HTTP client is now going to output the data
* @param cl_ht HTTP client instance
* @param arg Not used
*/
void dap_http_folder_headers_read(dap_http_client_t * cl_ht, void * arg)
{
(void) arg;
cl_ht->state_write=DAP_HTTP_CLIENT_STATE_START;
cl_ht->state_read=cl_ht->keep_alive?DAP_HTTP_CLIENT_STATE_START:DAP_HTTP_CLIENT_STATE_NONE;
dap_events_socket_set_writable_unsafe(cl_ht->esocket,true);
dap_events_socket_set_readable_unsafe(cl_ht->esocket, cl_ht->keep_alive);
}
#ifdef _WIN32
time_t FileTimeToUnixTime( FILETIME ft )
{
ULARGE_INTEGER ull;
ull.LowPart = ft.dwLowDateTime;
ull.HighPart = ft.dwHighDateTime;
return ull.QuadPart / 10000000ULL - 11644473600ULL;
}
#endif
/**
* @brief dap_http_folder_headers Prepare response HTTP headers for file folder request
* @param cl_ht HTTP client instane
* @param arg Not used
*/
void dap_http_folder_headers_write( dap_http_client_t *cl_ht, void * arg)
{
(void) arg;
// Get specific data for folder URL processor
dap_http_url_proc_folder_t * up_folder=(dap_http_url_proc_folder_t*) cl_ht->proc->_inheritor;
// Init specific file response data for HTTP client instance
cl_ht->_inheritor=DAP_NEW_Z(dap_http_file_t);
dap_http_file_t* cl_ht_file=DAP_HTTP_FILE(cl_ht);
cl_ht_file->client=cl_ht;
// Produce local path for file to open
dap_snprintf(cl_ht_file->local_path,sizeof(cl_ht_file->local_path),"%s/%s", up_folder->local_path, cl_ht->url_path );
log_it(L_DEBUG, "Check %s file", cl_ht_file->local_path);
#ifndef _WIN32
struct stat file_stat;
if ( stat(cl_ht_file->local_path, &file_stat) != 0 )
goto err;
cl_ht->out_last_modified = file_stat.st_mtime;
cl_ht->out_content_length = file_stat.st_size;
#else
FILETIME CreationTime;
FILETIME LastAccessTime;
FILETIME LastWriteTime;
HANDLE fileh = CreateFileA( cl_ht_file->local_path,
GENERIC_READ,
FILE_SHARE_READ,
NULL,
OPEN_EXISTING,
FILE_ATTRIBUTE_ARCHIVE,
NULL
);
if ( fileh == INVALID_HANDLE_VALUE )
goto err;
GetFileTime( fileh,
&CreationTime,
&LastAccessTime,
&LastWriteTime );
cl_ht->out_last_modified = FileTimeToUnixTime( LastWriteTime );
cl_ht->out_content_length = GetFileSize( fileh, NULL );
CloseHandle( fileh );
#endif
cl_ht_file->fd = fopen( cl_ht_file->local_path, "rb" );
if ( cl_ht_file->fd == NULL ) {
log_it(L_ERROR, "Can't open %s: %s",cl_ht_file->local_path,strerror(errno));
cl_ht->reply_status_code = Http_Status_NotFound;
strncpy( cl_ht->reply_reason_phrase, "Not Found", sizeof(cl_ht->reply_reason_phrase)-1 );
}
else {
cl_ht->reply_status_code = Http_Status_OK;
strncpy( cl_ht->reply_reason_phrase,"OK",sizeof(cl_ht->reply_reason_phrase)-1 );
const char *mime_type = magic_file( up_folder->mime_detector, cl_ht_file->local_path );
if( mime_type ) {
strncpy(cl_ht->out_content_type,mime_type,sizeof(cl_ht->out_content_type)-1);
log_it( L_DEBUG, "MIME type detected: '%s'", mime_type );
}
else {
cl_ht->reply_status_code=Http_Status_NotFound;
cl_ht->esocket->flags |= DAP_SOCK_SIGNAL_CLOSE;
log_it(L_WARNING,"Can't detect MIME type of %s file: %s",cl_ht_file->local_path,magic_error(up_folder->mime_detector));
}
}
return;
err:
log_it( L_WARNING, "Can't get file info: %s", strerror(errno) );
cl_ht->reply_status_code = 404;
strncpy( cl_ht->reply_reason_phrase, "Not Found", sizeof(cl_ht->reply_reason_phrase)-1 );
return;
}
/**
* @brief dap_http_folder_read HTTP client callback for reading function for the folder processing
* @param cl_ht HTTP client instance
* @param arg Pointer to int with return bytes number
*/
void dap_http_folder_data_read(dap_http_client_t * cl_ht, void * arg)
{
int * bytes_return = (int*) arg; // Return number of read bytes
//Do nothing
*bytes_return=cl_ht->esocket->buf_in_size;
}
/**
* @brief dap_http_folder_write HTTP client callback for writting function for the folder processing
* @param cl_ht HTTP client instance
* @param arg
*/
void dap_http_folder_data_write(dap_http_client_t * cl_ht, void * arg)
{
(void) arg;
dap_http_file_t * cl_ht_file= DAP_HTTP_FILE(cl_ht);
cl_ht->esocket->buf_out_size=fread(cl_ht->esocket->buf_out,1,sizeof(cl_ht->esocket->buf_out),cl_ht_file->fd);
cl_ht_file->position+=cl_ht->esocket->buf_out_size;
if(feof(cl_ht_file->fd)!=0){
log_it(L_INFO, "All the file %s is sent out",cl_ht_file->local_path);
//strncat(cl_ht->client->buf_out+cl_ht->client->buf_out_size,"\r\n",sizeof(cl_ht->client->buf_out));
fclose(cl_ht_file->fd);
dap_events_socket_set_writable_unsafe(cl_ht->esocket,false);
if ( !cl_ht->keep_alive )
cl_ht->esocket->flags |= DAP_SOCK_SIGNAL_CLOSE;
cl_ht->state_write=DAP_HTTP_CLIENT_STATE_NONE;
}
}