From a40e93f2c4d710fcacbd9836228965ddc7626a3e Mon Sep 17 00:00:00 2001 From: "Dmitriy A. Gerasimov" <dmitriy.gerasimov@demlabs.net> Date: Mon, 7 Jan 2019 16:21:59 +0700 Subject: [PATCH] [*] Subnodules update [-] Moved events processor to libdap-server-core --- CMakeLists.txt | 16 +- dap_events.c | 536 -------------------------------------------- dap_events.h | 80 ------- dap_events_socket.c | 303 ------------------------- dap_events_socket.h | 106 --------- 5 files changed, 6 insertions(+), 1035 deletions(-) delete mode 100755 dap_events.c delete mode 100755 dap_events.h delete mode 100755 dap_events_socket.c delete mode 100755 dap_events_socket.h diff --git a/CMakeLists.txt b/CMakeLists.txt index a187fa9..49d6a9d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,19 +1,15 @@ cmake_minimum_required(VERSION 3.0) - -if(TARGET libdap-client) - return() # The project has already been built. -endif() - -project (libdap-client) +project (dap_client) set(CMAKE_C_STANDARD 11) -add_subdirectory(libdap) -add_subdirectory(libdap-server) -add_subdirectory(libdap-stream) -add_subdirectory(libdap-stream-ch) if(BUILD_LIB_DAP_CLIENT_TESTS) enable_testing() + add_subdirectory(libdap) + add_subdirectory(libdap-server) + add_subdirectory(libdap-stream) + add_subdirectory(libdap-stream-ch) + add_subdirectory(test) endif() diff --git a/dap_events.c b/dap_events.c deleted file mode 100755 index c233759..0000000 --- a/dap_events.c +++ /dev/null @@ -1,536 +0,0 @@ -/* - * Authors: - * Dmitriy A. Gearasimov <gerasimov.dmitriy@demlabs.net> - * DeM Labs Inc. https://demlabs.net - * Kelvin Project https://github.com/kelvinblockchain - * Copyright (c) 2017-2019 - * All rights reserved. - - This file is part of DAP (Deus Applications Prototypes) the open source project - - DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation, either version 3 of the License, or - (at your option) any later version. - - DAP is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. -*/ - -#include <arpa/inet.h> -#include <netinet/in.h> -#include <sys/socket.h> -#include <sys/types.h> -#include <sys/stat.h> - -#include <sys/epoll.h> - -#include <netdb.h> -#include <unistd.h> -#include <fcntl.h> -#include <string.h> -#include <time.h> - -#include <stdio.h> -#include <stdio.h> -#include <stdlib.h> -#include <stddef.h> -#include <errno.h> -#include <signal.h> - - - -#if 1 -#include <sys/timerfd.h> -#elif defined(DAP_OS_ANDROID) -#define NO_POSIX_SHED -#define NO_TIMER -#endif - -#include <utlist.h> - -#define _GNU_SOURCE -#define __USE_GNU -#include <sched.h> - -#include "dap_common.h" -#include "dap_events.h" - - -typedef struct open_connection_info { - dap_events_socket_t *es; - struct open_connection_info *next; -} dap_events_socket_info_t; - -dap_events_socket_info_t **s_dap_events_sockets; -static uint8_t s_threads_count = 1; -static size_t s_connection_timeout = 600; - -dap_worker_t * s_workers = NULL; -dap_thread_t * s_threads = NULL; - -#define LOG_TAG "dap_events" -#define MAX_EPOLL_EVENTS 255 - -size_t s_get_cpu_count() -{ -#ifndef NO_POSIX_SHED - cpu_set_t cs; - CPU_ZERO(&cs); - sched_getaffinity(0, sizeof(cs), &cs); - - size_t count = 0; - for (int i = 0; i < 32; i++){ - if (CPU_ISSET(i, &cs)) - count++; - } - return count; -#else - return 1; -#endif -} -/** - * @brief sa_server_init Init server module - * @arg a_threads_count number of events processor workers in parallel threads - * @return Zero if ok others if no - */ -int dap_events_init(size_t a_threads_count,size_t conn_timeout) -{ - s_threads_count = a_threads_count?a_threads_count: s_get_cpu_count(); - - if(conn_timeout)s_connection_timeout=conn_timeout; - - s_workers = (dap_worker_t *) calloc(1,sizeof(dap_worker_t)*s_threads_count ); - s_threads = (dap_thread_t *) calloc(1,sizeof(dap_thread_t)*s_threads_count ); - - if(dap_events_socket_init() != 0 ) - { - log_it(L_CRITICAL, "Can't init client submodule"); - return -1; - } - - s_dap_events_sockets = malloc(sizeof(dap_events_socket_info_t *) * s_threads_count ); - for(int i = 0; i < s_threads_count; i++) - s_dap_events_sockets[i] = NULL; // i == index == thread number - - // *open_connection_info = malloc(sizeof(open_connection_info) * my_config.threads_cnt); - log_it(L_NOTICE,"Initialized socket server module"); - signal(SIGPIPE, SIG_IGN); - return 0; -} - -/** - * @brief sa_server_deinit Deinit server module - */ -void dap_events_deinit() -{ - dap_events_socket_deinit(); -} - - - -/** - * @brief server_new Creates new empty instance of server_t - * @return New instance - */ -dap_events_t * dap_events_new() -{ - dap_events_t* ret=(dap_events_t*) calloc(1,sizeof(dap_events_t)); - pthread_rwlock_init(&ret->sockets_rwlock,NULL); - pthread_rwlock_init(&ret->servers_rwlock,NULL); - - return ret; -} - -/** - * @brief server_delete Delete event processor instance - * @param sh Pointer to the server instance - */ -void dap_events_delete(dap_events_t * a_events) -{ - dap_events_socket_t * cur, * tmp; - - if (a_events) - { - HASH_ITER(hh,a_events->sockets,cur,tmp) - dap_events_socket_delete(cur,false); - - if (a_events->_inheritor) - free(a_events->_inheritor); - pthread_rwlock_destroy(&a_events->sockets_rwlock); - pthread_rwlock_destroy(&a_events->servers_rwlock); - free(a_events); - } -} - -/** - * @brief dap_events_socket_info_remove - * @param cl - * @param n_thread - * @return - */ -static bool dap_events_socket_info_remove(dap_events_socket_t* cl, uint8_t n_thread) -{ - if( n_thread >= s_threads_count ){ - log_it(L_WARNING, "Number thread %u not exists. remove client from list error", n_thread); - return false; - } - dap_events_socket_info_t *el, *tmp; - - LL_FOREACH_SAFE(s_dap_events_sockets[n_thread], el, tmp) - { - if( el->es == cl ) - { - LL_DELETE(s_dap_events_sockets[n_thread], el); - log_it(L_DEBUG, "Removed event socket from the thread's list"); - return true; - } - } - - log_it(L_WARNING, "Try remove client from list but not find." - " Thread: %d client socket %d", n_thread, cl->socket); - return false; -} - -/** - * @brief s_socket_info_all_check_activity - * @param n_thread - * @param sh - */ -static void s_socket_info_all_check_activity(uint8_t n_thread, dap_events_t *sh) -{ -// log_it(L_INFO, "========================================================= Socket check"); -// return; /// TODO debug and make thats shit working, bitch! - dap_events_socket_info_t *ei; - LL_FOREACH(s_dap_events_sockets[n_thread], ei){ - if( ei->es->is_pingable ){ - if(( time(NULL) - ei->es->last_ping_request ) > (time_t) s_connection_timeout ){ // conn timeout - log_it(L_INFO, "Connection on socket %d close by timeout", ei->es->socket); - - dap_events_socket_t * cur = dap_events_socket_find(ei->es->socket, sh); - if ( cur != NULL ){ - dap_events_socket_remove_and_delete( cur ); - } else { - log_it(L_ERROR, "Trying close socket but not find on client hash!"); - close(ei->es->socket); - } - } else if(( time(NULL) - ei->es->last_ping_request ) > (time_t) s_connection_timeout/3 ){ - log_it(L_INFO, "Connection on socket %d last chance to remain alive", ei->es->socket); - - } - } - } -} - -/** - * @brief thread_worker_function - * @param arg - * @return - */ -static void* thread_worker_function(void *arg) -{ - dap_worker_t* w = (dap_worker_t*) arg; - dap_events_socket_t* cur; - -#ifndef NO_POSIX_SHED - cpu_set_t mask; - CPU_ZERO(&mask); - CPU_SET(*(int*)arg, &mask); - - if ( pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &mask) != 0 ) - { - log_it(L_CRITICAL, "Error pthread_setaffinity_np() You really have %d or more core in CPU?", *(int*)arg); - abort(); - } -#endif - - log_it(L_INFO, "Worker %d started, epoll fd %d", w->number_thread, w->epoll_fd); - struct epoll_event ev, events[MAX_EPOLL_EVENTS]; - memzero(&ev,sizeof(ev)); - memzero(&events,sizeof(events)); -#ifndef NO_TIMER - int timerfd; - if ((timerfd = timerfd_create(CLOCK_MONOTONIC, 0)) < 0) - { - log_it(L_CRITICAL, "Failed to create timer"); - abort(); - } -#endif - - struct itimerspec timerValue; - memzero(&timerValue, sizeof(timerValue)); - - timerValue.it_value.tv_sec = 10; - timerValue.it_value.tv_nsec = 0; - timerValue.it_interval.tv_sec = s_connection_timeout / 2; - timerValue.it_interval.tv_nsec = 0; - - -#ifndef NO_TIMER - ev.events = EPOLLIN; - ev.data.fd = timerfd; - epoll_ctl(w->epoll_fd, EPOLL_CTL_ADD, timerfd, &ev); - - if (timerfd_settime(timerfd, 0, &timerValue, NULL) < 0) { - log_it(L_CRITICAL, "Could not start timer"); - abort(); - } -#endif - - size_t total_sent; int bytes_sent; - while(1) { - int selected_sockets = epoll_wait(w->epoll_fd, events, MAX_EPOLL_EVENTS, -1); - // log_it(INFO, "Epoll pwait trigered worker %d", w->number_worker); - for(int n = 0; n < selected_sockets; n++) { -#ifndef NO_TIMER - if (events[n].data.fd == timerfd) { - static uint64_t val; - /* if we not reading data from socket, he triggered again */ - read(events[n].data.fd, &val, 8); - s_socket_info_all_check_activity(w->number_thread, w->events); - } else -#endif - if ( ( cur = dap_events_socket_find(events[n].data.fd, w->events) ) != NULL ) { - if( events[n].events & EPOLLERR ) { - log_it(L_ERROR,"Socket error: %s",strerror(errno)); - cur->signal_close=true; - cur->callbacks->error_callback(cur,NULL); // Call callback to process error event - } else { - if( events[n].events & EPOLLIN ) { - //log_it(DEBUG,"Comes connection in active read set"); - if(cur->buf_in_size == sizeof(cur->buf_in)) - { - log_it(L_WARNING, "Buffer is full when there is smth to read. Its dropped!"); - cur->buf_in_size=0; - } - - int bytes_read = recv(cur->socket, - cur->buf_in + cur->buf_in_size, - sizeof(cur->buf_in)-cur->buf_in_size, 0); - - if(bytes_read > 0) { - cur->buf_in_size += bytes_read; - //log_it(DEBUG, "Received %d bytes", bytes_read); - cur->callbacks->read_callback(cur, NULL); // Call callback to process read event. At the end of callback buf_in_size should be zero if everything was read well - - } else if(bytes_read < 0) { - log_it(L_ERROR,"Some error occured in recv() function: %s",strerror(errno)); - cur->signal_close = true; - } else if (bytes_read == 0) { - log_it(L_INFO, "Client socket disconnected"); - cur->signal_close = true; - } - } - - // Socket is ready to write - if( ( events[n].events & EPOLLOUT || cur->_ready_to_write ) - && ( !cur->signal_close ) ) { - ///log_it(DEBUG, "Main loop output: %u bytes to send",sa_cur->buf_out_size); - cur->callbacks->write_callback(cur, NULL); // Call callback to process write event - - if(cur->_ready_to_write) - { - cur->buf_out[cur->buf_out_size]='\0'; - static const uint32_t buf_out_zero_count_max = 20; - if(cur->buf_out_size == 0) - { - log_it(L_WARNING, "Output: nothing to send. Why we are in write socket set?"); - cur->buf_out_zero_count++; - if(cur->buf_out_zero_count > buf_out_zero_count_max) // How many time buf_out on write event could be empty - { - log_it(L_ERROR, "Output: nothing to send %u times, remove socket from the write set",buf_out_zero_count_max); - dap_events_socket_set_writable(cur,false); - } - } - else - cur->buf_out_zero_count=0; - } - - for(total_sent = 0; total_sent < cur->buf_out_size;) - { // If after callback there is smth to send - we do it - bytes_sent = send(cur->socket, - cur->buf_out + total_sent, - cur->buf_out_size - total_sent, - MSG_DONTWAIT | MSG_NOSIGNAL ); - if(bytes_sent < 0) - { - log_it(L_ERROR,"Some error occured in send() function"); - break; - } - total_sent+= bytes_sent; - //log_it(L_DEBUG, "Output: %u from %u bytes are sent ", total_sent,sa_cur->buf_out_size); - } - - //log_it(L_DEBUG,"Output: sent %u bytes",total_sent); - cur->buf_out_size = 0; - } - } - - if(cur->signal_close) - { - log_it(L_INFO, "Got signal to close from the client %s", cur->hostaddr); - dap_events_socket_remove_and_delete(cur); - } - } else { - log_it(L_ERROR,"Socket %d is not present in epoll set", events[n].data.fd); - ev.events = EPOLLIN | EPOLLOUT | EPOLLERR; - ev.data.fd=events[n].data.fd; - - if (epoll_ctl(w->epoll_fd, EPOLL_CTL_DEL, events[n].data.fd, &ev) == -1) - log_it(L_ERROR,"Can't remove not presented socket from the epoll_fd"); - } - } - } - return NULL; -} - -/** - * @brief dap_worker_get_min - * @return - */ -dap_worker_t * dap_worker_get_min() -{ - return &s_workers[dap_worker_get_index_min()]; -} - -/** - * @brief dap_worker_get_index_min - * @return - */ -uint8_t dap_worker_get_index_min() -{ - uint8_t min = 0; - uint8_t i; - for(i = 1; i < s_threads_count; i++) - { - if ( s_workers[min].event_sockets_count > s_workers[i].event_sockets_count ) - min = i; - } - - return min; -} - -/** - * @brief dap_worker_print_all - */ -void dap_worker_print_all() -{ - uint8_t i; - for(i = 0; i < s_threads_count; i++) - { - log_it(L_INFO, "Worker: %d, count open connections: %d", - s_workers[i].number_thread, s_workers[i].event_sockets_count); - } -} - -/** - * @brief sa_server_loop Main server loop - * @param sh Server instance - * @return Zero if ok others if not - */ -int dap_events_start(dap_events_t * a_events) -{ - int i; - for(i = 0; i < s_threads_count; i++) - { - if ( (s_workers[i].epoll_fd = epoll_create(MAX_EPOLL_EVENTS)) == -1 ) - { - log_it(L_CRITICAL, "Error create epoll fd"); - return -1; - } - s_workers[i].event_sockets_count = 0; - s_workers[i].number_thread = i; - s_workers[i].events = a_events; - pthread_mutex_init(&s_workers[i].locker_on_count, NULL); - pthread_create(&s_threads[i].tid, NULL, thread_worker_function, &s_workers[i]); - } - - return 0; -} - -/** - * @brief dap_events_wait - * @param sh - * @return - */ -int dap_events_wait(dap_events_t * sh) -{ - (void) sh; - int i; - for(i = 0; i < s_threads_count; i++){ - void * ret; - pthread_join(s_threads[i].tid,&ret); - } - return 0; -} - - - -/** - * @brief dap_worker_add_events_socket - * @param a_worker - * @param a_events_socket - */ -void dap_worker_add_events_socket(dap_events_socket_t * a_events_socket) -{ - struct epoll_event ev = {0}; - dap_worker_t *l_worker =dap_worker_get_min(); - - ev.events = EPOLLIN | EPOLLERR | EPOLLOUT; - ev.data.fd = a_events_socket->socket; - - - pthread_mutex_lock(&l_worker->locker_on_count); - l_worker->event_sockets_count++; - pthread_mutex_unlock(&l_worker->locker_on_count); - - dap_events_socket_info_t * l_es_info = DAP_NEW_Z(dap_events_socket_info_t); - l_es_info->es = a_events_socket; - a_events_socket->dap_worker = l_worker; - LL_APPEND(s_dap_events_sockets[l_worker->number_thread], l_es_info); - - if ( epoll_ctl(l_worker->epoll_fd, EPOLL_CTL_ADD, a_events_socket->socket, &ev) == 1 ) - log_it(L_CRITICAL,"Can't add event socket's handler to epoll_fd"); - -} - -/** - * @brief dap_events_socket_delete - * @param a_es - */ -void dap_events_socket_remove_and_delete(dap_events_socket_t* a_es) -{ - - struct epoll_event ev={0}; - ev.events = EPOLLIN | EPOLLOUT | EPOLLERR; - ev.data.fd=a_es->socket; - - if (epoll_ctl(a_es->dap_worker->epoll_fd, EPOLL_CTL_DEL, a_es->socket, &ev) == -1) - log_it(L_ERROR,"Can't remove event socket's handler from the epoll_fd"); - else - log_it(L_DEBUG,"Removed epoll's event from dap_worker #%u",a_es->dap_worker->number_thread); - - pthread_mutex_lock(&a_es->dap_worker->locker_on_count); - a_es->dap_worker->event_sockets_count--; - pthread_mutex_unlock(&a_es->dap_worker->locker_on_count); - - dap_events_socket_info_remove(a_es, a_es->dap_worker->number_thread); - dap_events_socket_delete(a_es,true); - -} - -/** - * @brief dap_events__thread_wake_up - * @param th - */ -void dap_events_thread_wake_up(dap_thread_t * th) -{ - (void) th; - - //pthread_kill(th->tid,SIGUSR1); -} diff --git a/dap_events.h b/dap_events.h deleted file mode 100755 index 1b08dfd..0000000 --- a/dap_events.h +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Authors: - * Dmitriy A. Gearasimov <gerasimov.dmitriy@demlabs.net> - * DeM Labs Inc. https://demlabs.net - * Kelvin Project https://github.com/kelvinblockchain - * Copyright (c) 2017-2019 - * All rights reserved. - - This file is part of DAP (Deus Applications Prototypes) the open source project - - DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation, either version 3 of the License, or - (at your option) any later version. - - DAP is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. -*/ -#pragma once - -#include <netinet/in.h> - -#include <stdint.h> -#include <pthread.h> -#include "uthash.h" - -#include "dap_events_socket.h" -#include "dap_server.h" - -struct dap_events; - -typedef void (*dap_events_callback_t) (struct dap_events *,void * arg); // Callback for specific server's operations - -//typedef struct dap_thread{ -// pthread_t tid; -//} dap_thread_t; - -struct dap_worker; -typedef struct dap_events{ - dap_events_socket_t * sockets; // Hashmap of event sockets - pthread_rwlock_t sockets_rwlock; - - void * _inheritor; // Pointer to the internal data, HTTP for example - - dap_thread_t proc_thread; - pthread_rwlock_t servers_rwlock; -} dap_events_t; - -typedef struct dap_worker -{ - int event_sockets_count; - int epoll_fd; - uint8_t number_thread; - pthread_mutex_t locker_on_count; - dap_events_t * events; -} dap_worker_t; - - -int dap_events_init(size_t a_threads_count,size_t conn_t); // Init server module -void dap_events_deinit(); // Deinit server module - -void dap_events_thread_wake_up(dap_thread_t * th); -dap_events_t* dap_events_new(); -void dap_events_delete(dap_events_t * sh); -void dap_events_socket_remove_and_delete(dap_events_socket_t* a_es); - -int dap_events_start(dap_events_t * sh); -int dap_events_wait(dap_events_t * sh); - -uint8_t dap_worker_get_index_min(); -dap_worker_t * dap_worker_get_min(); -void dap_worker_add_events_socket(dap_events_socket_t * a_events_socket); -void dap_worker_print_all(); - - diff --git a/dap_events_socket.c b/dap_events_socket.c deleted file mode 100755 index da97454..0000000 --- a/dap_events_socket.c +++ /dev/null @@ -1,303 +0,0 @@ -/* - * Authors: - * Dmitriy A. Gearasimov <gerasimov.dmitriy@demlabs.net> - * DeM Labs Inc. https://demlabs.net - * Kelvin Project https://github.com/kelvinblockchain - * Copyright (c) 2017-2019 - * All rights reserved. - - This file is part of DAP (Deus Applications Prototypes) the open source project - - DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation, either version 3 of the License, or - (at your option) any later version. - - DAP is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. -*/ - -#include <sys/epoll.h> -#include <unistd.h> -#include <fcntl.h> -#include <stdlib.h> -#include <stdio.h> -#include <stdarg.h> -#include <unistd.h> -#include <string.h> -#include <assert.h> -#include "dap_common.h" -#include "dap_events.h" - -#include "dap_events_socket.h" - -#define LOG_TAG "dap_events_socket" - -/** - * @brief dap_events_socket_init Init clients module - * @return Zero if ok others if no - */ -int dap_events_socket_init() -{ - log_it(L_NOTICE,"Initialized socket client module"); - return 0; -} - -/** - * @brief dap_events_socket_deinit Deinit clients module - */ -void dap_events_socket_deinit() -{ - -} - - -/** - * @brief dap_events_socket_wrap - * @param a_events - * @param w - * @param s - * @param a_callbacks - * @return - */ -dap_events_socket_t * dap_events_socket_wrap_no_add( dap_events_t * a_events, - int a_sock, dap_events_socket_callbacks_t * a_callbacks) -{ - assert(a_events); - assert(a_callbacks); - log_it(L_DEBUG,"Dap event socket wrapped around %d sock", a_sock); - dap_events_socket_t * ret = DAP_NEW_Z(dap_events_socket_t); - ret->socket = a_sock; - ret->events = a_events; - ret->callbacks = a_callbacks; - ret->_ready_to_read=true; - return ret; -} - -/** - * @brief dap_events_socket_create_after - * @param a_es - */ -void dap_events_socket_create_after(dap_events_socket_t * a_es) -{ - if(a_es->callbacks->new_callback) - a_es->callbacks->new_callback(a_es,NULL); // Init internal structure - - pthread_rwlock_wrlock(&a_es->events->sockets_rwlock); - a_es->last_ping_request = time(NULL); - HASH_ADD_INT(a_es->events->sockets, socket, a_es); - pthread_rwlock_unlock(&a_es->events->sockets_rwlock); - - dap_worker_add_events_socket(a_es); -} - -/** - * @brief dap_events_socket_wrap - * @param a_events - * @param w - * @param s - * @param a_callbacks - * @return - */ -dap_events_socket_t * dap_events_socket_wrap2(dap_server_t * a_server, struct dap_events * a_events, - int a_sock, dap_events_socket_callbacks_t * a_callbacks) -{ - assert(a_events); - assert(a_callbacks); - assert(a_server); - log_it(L_DEBUG,"Sap event socket wrapped around %d sock", a_sock); - dap_events_socket_t * ret = DAP_NEW_Z(dap_events_socket_t); - ret->socket = a_sock; - ret->events = a_events; - ret->callbacks = a_callbacks; - - ret->_ready_to_read=true; - ret->is_pingable = true; - - pthread_rwlock_wrlock(&a_events->sockets_rwlock); - ret->last_ping_request = time(NULL); - HASH_ADD_INT(a_events->sockets, socket, ret); - pthread_rwlock_unlock(&a_events->sockets_rwlock); - if(a_callbacks->new_callback) - a_callbacks->new_callback(ret,NULL); // Init internal structure - return ret; -} - -/** - * @brief dap_events_socket_find - * @param sock - * @param sh - * @return - */ -dap_events_socket_t * dap_events_socket_find(int sock, struct dap_events * a_events) -{ - dap_events_socket_t * ret = NULL; - pthread_rwlock_rdlock(&a_events->sockets_rwlock); - HASH_FIND_INT(a_events->sockets, &sock, ret); - pthread_rwlock_unlock(&a_events->sockets_rwlock); - return ret; -} - -/** - * @brief dap_events_socket_ready_to_read - * @param sc - * @param isReady - */ -void dap_events_socket_set_readable(dap_events_socket_t * sc,bool is_ready) -{ - if(is_ready != sc->_ready_to_read){ - struct epoll_event ev={0}; - ev.events = EPOLLERR; - sc->_ready_to_read=is_ready; - if(sc->_ready_to_read) - ev.events |= EPOLLIN; - if(sc->_ready_to_write) - ev.events |= EPOLLOUT; - - ev.data.fd=sc->socket; - if (epoll_ctl(sc->dap_worker->epoll_fd, EPOLL_CTL_MOD, sc->socket, &ev) == -1) { - log_it(L_ERROR,"Can't update read client socket state in the epoll_fd"); - }else - dap_events_thread_wake_up(&sc->events->proc_thread); - } -} - -/** - * @brief dap_events_socket_ready_to_write - * @param sc - * @param isReady - */ -void dap_events_socket_set_writable(dap_events_socket_t * sc,bool is_ready) -{ - if(is_ready != sc->_ready_to_write){ - struct epoll_event ev={0}; - ev.events = EPOLLERR ; - sc->_ready_to_write=is_ready; - if(sc->_ready_to_read) - ev.events |= EPOLLIN; - if(sc->_ready_to_write) - ev.events |= EPOLLOUT; - ev.data.fd=sc->socket; - if (epoll_ctl(sc->dap_worker->epoll_fd, EPOLL_CTL_MOD, sc->socket, &ev) == -1) { - log_it(L_ERROR,"Can't update write client socket state in the epoll_fd"); - }else - dap_events_thread_wake_up(&sc->events->proc_thread); - } - -} - - -/** - * @brief dap_events_socket_remove Removes the client from the list - * @param sc Connection instance - */ -void dap_events_socket_delete(dap_events_socket_t *a_es, bool preserve_inheritor) -{ - if (a_es){ - log_it(L_DEBUG, "es is going to be removed from the lists and free the memory (0x%016X)", a_es ); - pthread_rwlock_wrlock(&a_es->events->sockets_rwlock); - HASH_DEL(a_es->events->sockets, a_es); - pthread_rwlock_unlock(&a_es->events->sockets_rwlock); - log_it(L_DEBUG, "dap_events_socket wrapped around %d socket is removed", a_es->socket); - - if(a_es->callbacks->delete_callback) - a_es->callbacks->delete_callback(a_es, NULL); // Init internal structure - - if(a_es->_inheritor && !preserve_inheritor) -// if(a_es->_inheritor) - free(a_es->_inheritor); - - if(a_es->socket){ - close(a_es->socket); - } - free(a_es); - } -} - -/** - * @brief dap_events_socket_write Write data to the client - * @param sc Conn instance - * @param data Pointer to data - * @param data_size Size of data to write - * @return Number of bytes that were placed into the buffer - */ -size_t dap_events_socket_write(dap_events_socket_t *sc, const void * data, size_t data_size) -{ - data_size = ((sc->buf_out_size+data_size)<(sizeof(sc->buf_out)))?data_size:(sizeof(sc->buf_out)-sc->buf_out_size ); - memcpy(sc->buf_out+sc->buf_out_size,data,data_size); - sc->buf_out_size+=data_size; - return data_size; -} - -/** - * @brief dap_events_socket_write_f Write formatted text to the client - * @param sc Conn instance - * @param format Format - * @return Number of bytes that were placed into the buffer - */ -size_t dap_events_socket_write_f(dap_events_socket_t *sc, const char * format,...) -{ - size_t max_data_size = sizeof(sc->buf_out)-sc->buf_out_size; - va_list ap; - va_start(ap,format); - int ret=vsnprintf((char*) sc->buf_out+sc->buf_out_size,max_data_size,format,ap); - va_end(ap); - if(ret>0){ - sc->buf_out_size+=ret; - return ret; - }else{ - log_it(L_ERROR,"Can't write out formatted data '%s'",format); - return 0; - } -} - -/** - * @brief dap_events_socket_read Read data from input buffer - * @param sc Conn instasnce - * @param data Pointer to memory where to store the data - * @param data_size Size of data to read - * @return Actual bytes number that were read - */ -size_t dap_events_socket_read(dap_events_socket_t *sc, void * data, size_t data_size) -{ - if(data_size<sc->buf_in_size){ - memcpy(data,sc->buf_in,data_size); - memmove(data,sc->buf_in+data_size,sc->buf_in_size-data_size); - }else{ - if(data_size>sc->buf_in_size) - data_size=sc->buf_in_size; - memcpy(data,sc->buf_in,data_size); - } - sc->buf_in_size-=data_size; - return data_size; -} - - -/** - * @brief dap_events_socket_shrink_client_buf_in Shrink input buffer (shift it left) - * @param cl Client instance - * @param shrink_size Size on wich we shrink the buffer with shifting it left - */ -void dap_events_socket_shrink_buf_in(dap_events_socket_t * cl, size_t shrink_size) -{ - if((shrink_size==0)||(cl->buf_in_size==0) ){ - return; - }else if(cl->buf_in_size>shrink_size){ - size_t buf_size=cl->buf_in_size-shrink_size; - void * buf = malloc(buf_size); - memcpy(buf,cl->buf_in+ shrink_size,buf_size ); - memcpy(cl->buf_in,buf,buf_size); - cl->buf_in_size=buf_size; - if (buf) - free(buf); - }else{ - //log_it(WARNING,"Shrinking size of input buffer on amount bigger than actual buffer's size"); - cl->buf_in_size=0; - } - -} diff --git a/dap_events_socket.h b/dap_events_socket.h deleted file mode 100755 index 09e8cb3..0000000 --- a/dap_events_socket.h +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Authors: - * Dmitriy A. Gearasimov <gerasimov.dmitriy@demlabs.net> - * DeM Labs Inc. https://demlabs.net - * Kelvin Project https://github.com/kelvinblockchain - * Copyright (c) 2017-2019 - * All rights reserved. - - This file is part of DAP (Deus Applications Prototypes) the open source project - - DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation, either version 3 of the License, or - (at your option) any later version. - - DAP is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. -*/ -#pragma once -#include <stdint.h> -#include <stddef.h> -#include <stdbool.h> -#include "uthash.h" -struct dap_events; -struct dap_events_socket; -struct dap_worker; -typedef struct dap_server dap_server_t; -typedef void (*dap_events_socket_callback_t) (struct dap_events_socket *,void * arg); // Callback for specific client operations -typedef struct dap_events_socket_callbacks{ - dap_events_socket_callback_t new_callback; // Create new client callback - dap_events_socket_callback_t delete_callback; // Delete client callback - dap_events_socket_callback_t read_callback; // Read function - dap_events_socket_callback_t write_callback; // Write function - dap_events_socket_callback_t error_callback; // Error processing function - -} dap_events_socket_callbacks_t; - - -#define DAP_EVENTS_SOCKET_BUF 100000 - -typedef struct dap_events_socket{ - int socket; - bool signal_close; - - bool _ready_to_write; - bool _ready_to_read; - - uint32_t buf_out_zero_count; - union{ - uint8_t buf_in[DAP_EVENTS_SOCKET_BUF+1]; // Internal buffer for input data - char buf_in_str[DAP_EVENTS_SOCKET_BUF+1]; - }; - size_t buf_in_size; // size of data that is in the input buffer - - uint8_t buf_out[DAP_EVENTS_SOCKET_BUF+1]; // Internal buffer for output data - - char hostaddr[1024]; // Address - char service[128]; - - size_t buf_out_size; // size of data that is in the output buffer - - struct dap_events * events; - - struct dap_worker* dap_worker; - dap_events_socket_callbacks_t *callbacks; - - time_t time_connection; - time_t last_ping_request; - bool is_pingable; - - UT_hash_handle hh; - - void * _inheritor; // Inheritor data to specific client type, usualy states for state machine -} dap_events_socket_t; // Node of bidirectional list of clients - - - -int dap_events_socket_init(); // Init clients module -void dap_events_socket_deinit(); // Deinit clients module - -void dap_events_socket_create_after(dap_events_socket_t * a_es); - -dap_events_socket_t * dap_events_socket_wrap_no_add(struct dap_events * a_events, - int s, dap_events_socket_callbacks_t * a_callbacks); // Create new client and add it to the list - - -dap_events_socket_t * dap_events_socket_find(int sock, struct dap_events * sh); // Find client by socket - -bool dap_events_socket_is_ready_to_read(dap_events_socket_t * sc); -bool dap_events_socket_is_ready_to_write(dap_events_socket_t * sc); -void dap_events_socket_set_readable(dap_events_socket_t * sc,bool is_ready); -void dap_events_socket_set_writable(dap_events_socket_t * sc,bool is_ready); - -size_t dap_events_socket_write(dap_events_socket_t *sc, const void * data, size_t data_size); -size_t dap_events_socket_write_f(dap_events_socket_t *sc, const char * format,...); -size_t dap_events_socket_read(dap_events_socket_t *sc, void * data, size_t data_size); - -void dap_events_socket_delete(dap_events_socket_t *sc,bool preserve_inheritor); // Removes the client from the list - -void dap_events_socket_shrink_buf_in(dap_events_socket_t * cl, size_t shrink_size); - -- GitLab