diff --git a/CMakeLists.txt b/CMakeLists.txt index 7f368192f75fbf2d6907cac2c24e16fcbd0b1927..bc2c58273ff6b0e91afaf36772c9a14d0f50c41d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -3,12 +3,18 @@ project (dap_client) set(DAP_CLIENT_SRCS dap_client.c + dap_client_pvt.c dap_client_pool.c + dap_events.c + dap_events_socket.c ) set(DAP_CLIENT_HEADERS dap_client.h + dap_client_pvt.h dap_client_pool.h + dap_events.h + dap_events_socket.h ) @@ -20,7 +26,7 @@ include_directories("${INCLUDE_DIRECTORIES} ${dap_server_udp_INCLUDE_DIRS}") include_directories("${INCLUDE_DIRECTORIES} ${dap_stream_INCLUDE_DIRS}") include_directories("${INCLUDE_DIRECTORIES} ${dap_session_INCLUDE_DIRS}") -target_link_libraries(${PROJECT_NAME} dap_core dap_server_core dap_server dap_server_udp dap_stream dap_session) +target_link_libraries(${PROJECT_NAME} dap_core dap_core_server dap_udp_server dap_stream dap_session) target_include_directories(dap_client INTERFACE .) diff --git a/dap_client.c b/dap_client.c index 19e4933b1da89a7580bf106457acf5bd12decb93..e112f5c5ac9faaa8375407e0b0eed67421898374 100644 --- a/dap_client.c +++ b/dap_client.c @@ -1,404 +1,340 @@ -#include <sys/socket.h> -#include <arpa/inet.h> - -#include <fcntl.h> -#include <unistd.h> -#include <errno.h> #include <string.h> -#include <stdio.h> +#include <stdbool.h> + #include "dap_common.h" -#include "dap_config.h" -#include "dap_client.h" -#include "dap_client_remote.h" -#include "stream.h" -#include "stream_session.h" -#include "dap_stream_ch.h" -#include "dap_stream_ch_pkt.h" -#include "dap_stream_ch_proc.h" +#include "dap_http_client.h" #include "dap_client.h" +#include "dap_client_pvt.h" -#define LOG_TAG "dap_client" - -/** - * @brief Private data for sf_client - */ -typedef struct dap_client_pvt{ - dap_client_t * client; - - ev_io* w_client; - struct sap_worker * worker; - const char * a_name; - - dap_client_remote_t * stream_cr; - dap_stream_t * stream; - - size_t stream_ch_size; - dap_stream_ch_t ** ch; // Channels - bool is_client_to_uplink; - - dap_stream_session_t * stream_session; - dap_client_callback_t callback_connected; - dap_client_callback_t callback_disconnected; - dap_client_callback_t callback_error; -} dap_client_pvt_t; - -#define DAP_CLIENT_PVT(a) ( (dap_client_pvt_t *) ( a->_internal ) ) - -// Stage callbacks -void m_stage_status (dap_client_t * a_client, void* a_status); -void m_stage_stream_opened (dap_client_t * a_client, void* arg); -void m_stage_status_error (dap_client_t * a_client , void* a_error); - -// Stream callbacks -void m_stream_cr_delete(dap_client_remote_t * a_cr, void * arg); -void m_stream_cr_read(dap_client_remote_t * a_cr, void * arg); -void m_stream_cr_write(dap_client_remote_t * a_cr, void * arg); -void m_stream_cr_error(dap_client_remote_t * a_cr, void * arg); +#define LOG_TAG "dap_client" +// FSM realization: thats callback executes after the every stage is done +// and have to select the next one stage +void m_stage_fsm_operator(dap_client_t *, void *); /** - * @brief sf_client_new - * @param a_events - * @param a_worker - * @param a_stream - * @param a_name + * @brief dap_client_init * @return */ -static struct dap_events * s_events = NULL; -dap_client_t * dap_client_new(struct dap_events * a_events, const char * a_name, - const char * a_uplink_addr, uint16_t a_uplink_port) +int dap_client_init() { - dap_client_t * l_ret = DAP_NEW_Z(dap_client_t); - l_ret->_internal = DAP_NEW_Z(dap_client_pvt_t); - dap_client_pvt_t * l_ret_pvt = DAP_CLIENT_PVT(l_ret); - - // Create SAP_CLIENT object and setup it with data from config - l_ret_pvt->client = dap_client_new(m_stage_status,m_stage_status_error); - l_ret_pvt->client->_inheritor = l_ret; // We inherit dap_client from dap_client_remote - l_ret_pvt->events = a_events; - s_events = a_events; - - dap_client_set_uplink(l_ret_pvt->client, a_uplink_addr, a_uplink_port); + static bool s_is_first_time=true; + if (s_is_first_time ){ + log_it(L_INFO, "Init SAP client module"); + dap_http_client_init(); + dap_client_pvt_init(); + s_is_first_time = false; + } + return 0; - dap_client_go_stage(l_ret_pvt->client ,SAP_CLIENT_STAGE_STREAM_CTL ,m_stage_stream_opened ); +} - log_it(L_NOTICE,"Socket Forwarding client %s is initialized", l_pname ); - SAP_DELETE(l_pname); - return l_ret; +/** + * @brief dap_client_deinit + */ +void dap_client_deinit() +{ + dap_client_pvt_deinit(); + dap_http_client_deinit(); + log_it(L_INFO, "Deinit SAP client module"); } /** - * @brief sf_client_delete - * @param a_client + * @brief dap_client_new + * @param a_stage_status_callback + * @param a_stage_status_error_callback * @return */ -int dap_client_delete(dap_client_t * a_client) +dap_client_t * dap_client_new(dap_events_t * a_events, dap_client_callback_t a_stage_status_callback + ,dap_client_callback_t a_stage_status_error_callback ) { - + // ALLOC MEM FOR dap_client + dap_client_t *l_client = DAP_NEW_Z(dap_client_t); + if (!l_client) + goto MEM_ALLOC_ERR; + + l_client->_internal = DAP_NEW_Z(dap_client_pvt_t); + if (!l_client->_internal) + goto MEM_ALLOC_ERR; + + // CONSTRUCT dap_client object + DAP_CLIENT_PVT(l_client)->client = l_client; + DAP_CLIENT_PVT(l_client)->events = a_events; + DAP_CLIENT_PVT(l_client)->stage_status_callback = a_stage_status_callback; + DAP_CLIENT_PVT(l_client)->stage_status_error_callback = a_stage_status_error_callback; + + dap_client_pvt_new(DAP_CLIENT_PVT(l_client) ); + + return l_client; + +MEM_ALLOC_ERR: + log_it(L_ERROR, "dap_client_new can not allocate memory"); + if (l_client) + if(l_client->_internal) + free(l_client->_internal); + + if (l_client) + DAP_DELETE (l_client); + return NULL; } + /** - * @brief m_stage_stream_opened + * @brief dap_client_reset * @param a_client - * @param arg */ -void m_stage_stream_opened(sap_client_t * a_client, void* arg) +void dap_client_reset(dap_client_t * a_client) { - log_it(L_INFO, "Stream session is opened, time to init it"); - - dap_client_t * l_sf_client = DAP_CLIENT(a_client); - dap_client_pvt_t * l_sf_client_pvt = DAP_CLIENT_PVT(l_sf_client); - l_sf_client_pvt->events = s_events; - if(l_sf_client == NULL ){ - log_it(L_ERROR, "sap_client is not inisialized"); - return; - } + dap_client_pvt_t * l_client_internal = DAP_CLIENT_PVT(a_client); - int l_sock_peer = socket(PF_INET,SOCK_STREAM,0); + if(l_client_internal->session_key){ + dap_enc_key_delete(l_client_internal->session_key); + l_client_internal->session_key = NULL; - if( l_sock_peer < 1 ){ - log_it(L_ERROR,"Can't create the socket: %s", strerror(errno) ); - return; } - -// fcntl(l_sock_peer, F_SETFL, O_NONBLOCK); - setsockopt(l_sock_peer,SOL_SOCKET,SO_SNDBUF,(const void*) 20000000,sizeof(int) ); - setsockopt(l_sock_peer,SOL_SOCKET,SO_RCVBUF,(const void *) 10000000,sizeof(int) ); - - // Wrap socket and setup callbacks - static sap_events_socket_callbacks_t l_s_callbacks={ - .read_callback = m_stream_cr_read , - .write_callback = m_stream_cr_write , - .error_callback = m_stream_cr_error , - .delete_callback = m_stream_cr_delete - }; - - - - l_sf_client_pvt->stream_cr = sap_events_socket_wrap_no_add(l_sf_client_pvt->events, - l_sock_peer, &l_s_callbacks); - - l_sf_client_pvt->stream_cr->_inheritor = l_sf_client; // Inherit SF_CLIENT object to stream (then proxy to stream_ch objects) - l_sf_client_pvt->stream = sap_stream_new_es(l_sf_client_pvt->stream_cr); - l_sf_client_pvt->stream->is_client_to_uplink = true; - l_sf_client_pvt->stream_session = sap_stream_session_pure_new(); - - l_sf_client_pvt->stream_session->opened = true; - l_sf_client_pvt->stream_session->is_client_to_uplink = true; - l_sf_client_pvt->stream_session->key = sap_client_get_key_stream(a_client); - l_sf_client_pvt->stream->session = l_sf_client_pvt->stream_session;// Connect to session object created before - l_sf_client_pvt->stream_ch_sf = sap_stream_ch_new( l_sf_client_pvt->stream ,'s' ); - l_sf_client_pvt->ch_sf = CH_SF(l_sf_client_pvt->stream_ch_sf); - sap_stream_es_rw_states_update( l_sf_client_pvt->stream ); - sap_events_socket_set_readable( l_sf_client_pvt->stream_cr,true ); - - sap_events_socket_create_after(l_sf_client_pvt->stream_cr); - // Compose URL - size_t l_url_size_max = 1024; - char * l_url = SAP_NEW_Z_SIZE(char,l_url_size_max); - size_t l_url_size = snprintf(l_url,l_url_size_max,"/%s/fjskd9234j?fj913htmdgaq-d9hf=%s", SAP_UPLINK_PATH_STREAM, - sap_client_get_stream_id(a_client) ); - - // Compose HTTP request - size_t l_request_size_max = 4096; - char * l_request = SAP_NEW_Z_SIZE(char,l_request_size_max); - size_t l_request_size = snprintf(l_request, l_request_size_max, - "POST %s HTTP/1.1\r\n" - "Connection: Keep-Alive\r\n" - "Cookie: %s\r\n" - "User-Agent: libsap \r\n" - "Host: %3\r\n" - "\r\n", - l_url, sap_client_get_auth_cookie(a_client), - sap_client_get_uplink_addr(a_client) - ); - - // Configure remote address - struct sockaddr_in l_remote_addr; - l_remote_addr.sin_addr.s_addr = inet_addr( sap_client_get_uplink_addr(a_client) ); - l_remote_addr.sin_family = AF_INET; - l_remote_addr.sin_port = htons( sap_client_get_uplink_port(a_client) ); - log_it(L_INFO, "Prepared request, connecting to the uplink..."); - if( connect(l_sock_peer , (struct sockaddr *)&l_remote_addr , sizeof(l_remote_addr) ) == 0 ){ - log_it(L_NOTICE, "Connected to the uplink"); - send(l_sock_peer,l_request,l_request_size,0); - SAP_DELETE(l_request); + if(l_client_internal->session_key_id){ + DAP_DELETE(l_client_internal->session_key_id); + l_client_internal->session_key_id = NULL; + } + if ( l_client_internal->stream_key ){ + dap_enc_key_delete(l_client_internal->stream_key ); + l_client_internal->stream_key = NULL; } + l_client_internal->stream_es = NULL; + + l_client_internal->stage = STAGE_BEGIN; + l_client_internal->stage_status = STAGE_STATUS_DONE ; + l_client_internal->stage_target = STAGE_BEGIN ; } + /** - * @brief m_es_stream_delete - * @param a_es - * @param arg + * @brief dap_client_delete + * @param a_client */ -void m_stream_cr_delete(sap_events_socket_t * a_es, void * arg) +void dap_client_delete(dap_client_t * a_client) { - log_it(L_INFO, "Reconnecting peer"); - dap_client_t * l_client = DAP_CLIENT(a_es); - dap_client_pvt_t * l_client_pvt = DAP_CLIENT_PVT(l_client); - sap_stream_delete(l_client_pvt->stream); - l_client_pvt->stream = NULL; - if(l_client_pvt->client) - sap_client_reset(l_client_pvt->client); - l_client_pvt->ch_sf = NULL; - l_client_pvt->stream_ch_sf = NULL; - l_client_pvt->stream_cr = NULL; - sap_stream_session_close(l_client_pvt->stream_session->id); - l_client_pvt->stream_session = NULL; - sap_client_go_stage(l_client_pvt->client ,SAP_CLIENT_STAGE_STREAM_CTL ,m_stage_stream_opened ); + dap_client_pvt_delete(DAP_CLIENT_PVT(a_client)); + DAP_DELETE(a_client); } /** - * @brief m_es_stream_read - * @param a_es - * @param arg + * @brief dap_client_go_stage + * @param a_client + * @param a_stage_end */ -void m_stream_cr_read(sap_events_socket_t * a_es, void * arg) +void dap_client_go_stage(dap_client_t * a_client, dap_client_stage_t a_stage_target, dap_client_callback_t a_stage_end_callback) { - dap_client_t * l_client = DAP_CLIENT(a_es); - dap_client_pvt_t * l_client_pvt = DAP_CLIENT_PVT(l_client); - switch( l_client->stage ){ - case DAP_CLIENT_DISCONNECTED: l_client->stage = DAP_CLIENT_CONNECTING; - case DAP_CLIENT_CONNECTING:{ - l_client->stage = DAP_CLIENT_CONNECTED_HTTP_HEADERS; - } - case DAP_CLIENT_CONNECTED_HTTP_HEADERS:{ - if(a_es->buf_in_size>1){ - char * p; - p = (char*) memchr( a_es->buf_in,'\r',a_es->buf_in_size-1); - if ( p ){ - if ( *(p+1) == '\n' ) { - sap_events_socket_shrink_buf_in(a_es,p - a_es->buf_in_str ); - log_it(L_DEBUG,"Header passed, go to streaming (%lu bytes already are in input buffer", a_es->buf_in_size); - l_client->stage = DAP_CLIENT_CONNECTED_STREAMING; - sap_stream_data_proc_read(l_client_pvt->stream); - sap_events_socket_shrink_buf_in(a_es,a_es->buf_in_size ); - - - pthread_mutex_lock(&m_tun_server->clients_mutex); - ch_sf_pkt_t * l_sf_pkt ; - size_t l_sf_pkt_data_size = sizeof(in_addr_t)*2*(m_tun_server->peers_count+1); - size_t l_sf_pkt_size = sizeof(l_sf_pkt->header)+l_sf_pkt_data_size; - size_t i; - l_sf_pkt = SAP_NEW_Z_SIZE(ch_sf_pkt_t,l_sf_pkt_size); - l_sf_pkt->header.op_code = STREAM_SF_PACKET_OP_CODE_L3_ADDR_REQUEST; - l_sf_pkt->header.op_data.data_size = l_sf_pkt_data_size; - memcpy(l_sf_pkt->data, - &m_tun_server->int_network,sizeof(m_tun_server->int_network)); - memcpy(l_sf_pkt->data+sizeof(in_addr_t), - &m_tun_server->int_network_mask,sizeof(m_tun_server->int_network_mask)); - - for(i=1; i< m_tun_server->peers_count+1; i++){ - memcpy(l_sf_pkt->data+i*sizeof(in_addr_t)*2, - &m_tun_server->peers[i].netaddr,sizeof(in_addr_t)); - memcpy(l_sf_pkt->data+i*sizeof(in_addr_t)*2+sizeof(in_addr_t), - &m_tun_server->peers[i].netmask,sizeof(in_addr_t)); - log_it(L_NOTICE, "Add netaddr %s in request", m_tun_server->peers[i].netaddr); - } - - pthread_mutex_unlock(&m_tun_server->clients_mutex); - log_it(L_NOTICE, "Send L3 address request"); - stream_ch_pkt_write(l_client_pvt->stream_ch_sf,'d',l_sf_pkt,l_sf_pkt_size ); - SAP_DELETE(l_sf_pkt); - } - } + // ----- check parameters ----- + if(NULL == a_client) { + log_it(L_ERROR, "dap_client_go_stage, a_client == NULL"); + return; + } + if(NULL == a_stage_end_callback) { + log_it(L_ERROR, "dap_client_go_stage, a_stage_end_callback == NULL"); + return; + } + dap_client_pvt_t * l_client_internal = DAP_CLIENT_PVT(a_client); + + l_client_internal->stage_target = a_stage_target; + l_client_internal->stage_target_done_callback = a_stage_end_callback; + if(a_stage_target != l_client_internal->stage ){ // Going to stages downstairs + switch(l_client_internal->stage_status ){ + case STAGE_STATUS_ABORTING: + log_it(L_ERROR, "Already aborting the stage %s" + , dap_client_stage_str(l_client_internal->stage)); + break; + case STAGE_STATUS_IN_PROGRESS:{ + log_it(L_WARNING, "Aborting the stage %s" + , dap_client_stage_str(l_client_internal->stage)); + }break; + case STAGE_STATUS_DONE: + case STAGE_STATUS_ERROR: + default: { + log_it(L_DEBUG, "Start transitions chain to %" + ,dap_client_stage_str(l_client_internal->stage_target) ); + int step = (a_stage_target > l_client_internal->stage)?1:-1; + dap_client_pvt_stage_transaction_begin(l_client_internal, + l_client_internal->stage+step, + m_stage_fsm_operator + ); } - }break; - case DAP_CLIENT_CONNECTED_STREAMING:{ - sap_stream_data_proc_read(l_client_pvt->stream); - sap_events_socket_shrink_buf_in(a_es,a_es->buf_in_size ); } + }else{ // Same stage + log_it(L_ERROR,"We're already on stage %s",dap_client_stage_str(a_stage_target)); } } /** - * @brief m_es_stream_write - * @param a_es - * @param arg + * @brief m_stage_fsm_operator + * @param a_client + * @param a_arg */ -void m_stream_cr_write(sap_events_socket_t * a_es, void * arg) +void m_stage_fsm_operator(dap_client_t * a_client, void * a_arg) { - dap_client_t * l_client = DAP_CLIENT(a_es); - switch( l_client->stage ){ - case DAP_CLIENT_DISCONNECTED: l_client->stage = DAP_CLIENT_CONNECTING; - case DAP_CLIENT_CONNECTING:{ - l_client->stage = DAP_CLIENT_CONNECTED_HTTP_HEADERS; - }break; - case DAP_CLIENT_CONNECTED_STREAMING:{ - size_t i; - bool ready_to_write=false; - // log_it(DEBUG,"Process channels data output (%u channels)",STREAM(sh)->channel_count); - - for(i=0;i< DAP_CLIENT_PVT(l_client)->stream->channel_count; i++){ - sap_stream_ch_t * ch = DAP_CLIENT_PVT(l_client)->stream->channel[i]; - if(ch->writable){ - ch->proc->packet_out_callback(ch,NULL); - ready_to_write|=ch->writable; - } - } - //log_it(L_DEBUG,"stream_data_out (ready_to_write=%s)", ready_to_write?"true":"false"); + (void *) a_arg; + dap_client_pvt_t * l_client_internal = DAP_CLIENT_PVT(a_client); + if (l_client_internal->stage_target == l_client_internal->stage){ + log_it(L_WARNING, "FSM Op: current stage %s is same as target one, nothing to do", + dap_client_stage_str( l_client_internal->stage ) ); + l_client_internal->stage_status_done_callback = NULL; - sap_events_socket_set_writable(DAP_CLIENT_PVT(l_client)->stream_cr,ready_to_write); - //log_it(ERROR,"No stream_data_write_callback is defined"); - }break; + return; } + + int step = (l_client_internal->stage_target > l_client_internal->stage)?1:-1; + dap_client_stage_t l_stage_next = l_client_internal->stage+step; + log_it(L_NOTICE, "FSM Op: current stage %s, go to %s (target %s)" + ,dap_client_stage_str(l_client_internal->stage), dap_client_stage_str(l_stage_next) + ,dap_client_stage_str(l_client_internal->stage_target)); + dap_client_pvt_stage_transaction_begin(l_client_internal, + l_stage_next, m_stage_fsm_operator + ); + } + /** - * @brief m_es_stream_error - * @param a_es - * @param arg + * @brief dap_client_request_enc + * @param a_client + * @param a_path + * @param a_suburl + * @param a_query + * @param a_request + * @param a_request_size + * @param a_response_proc + * @param a_response_error */ -void m_stream_cr_error(sap_events_socket_t * a_es, void * arg) +void dap_client_request_enc(dap_client_t * a_client, const char * a_path, const char * a_suburl,const char* a_query, void * a_request, size_t a_request_size, + dap_client_callback_data_size_t a_response_proc, dap_client_callback_int_t a_response_error ) { - dap_client_t * l_client = DAP_CLIENT(a_es); - log_it(L_ERROR,"ES stream error"); + dap_client_pvt_t * l_client_internal = DAP_CLIENT_PVT(a_client); + dap_client_pvt_request_enc(l_client_internal, a_path, a_suburl, a_query,a_request,a_request_size, a_response_proc,a_response_error); } - /** - * @brief m_stage_status - * @param a_client - * @param a_status + * @brief dap_client_error_str + * @param a_client_error + * @return */ -void m_stage_status (sap_client_t * a_client, void* a_status) +const char * dap_client_error_str(dap_client_error_t a_client_error) { - + switch(a_client_error){ + case ERROR_ENC_NO_KEY: return "ENC_NO_KEY"; + case ERROR_ENC_WRONG_KEY: return "ENC_WRONG_KEY"; + case ERROR_STREAM_RESPONSE_WRONG: return "STREAM_RESPONSE_WRONG"; + case ERROR_STREAM_RESPONSE_TIMEOUT: return "STREAM_RESPONSE_TIMEOUT"; + case ERROR_STREAM_FREEZED: return "STREAM_FREEZED"; + case ERROR_STREAM_CTL_ERROR: return "STREAM_CTL_ERROR"; + case ERROR_STREAM_CTL_ERROR_AUTH: return "STREAM_CTL_ERROR_AUTH"; + case ERROR_STREAM_CTL_ERROR_RESPONSE_FORMAT: return "STREAM_CTL_ERROR_RESPONSE_FORMAT"; + default : return "UNDEFINED"; + } } /** - * @brief m_stage_status_error + * @brief dap_client_get_error_str * @param a_client - * @param a_error + * @return */ -void m_stage_status_error (sap_client_t * a_client , void* a_error) +const char * dap_client_get_error_str(dap_client_t * a_client) { - + return dap_client_error_str( DAP_CLIENT_PVT(a_client)->last_error ); } - /** - * @brief sf_client_set_callback_error + * @brief dap_client_get_stage * @param a_client - * @param a_client_callback_error + * @return */ -void dap_client_set_callback_error(dap_client_t * a_client, dap_client_callback_t a_client_callback_error) +dap_client_stage_t dap_client_get_stage(dap_client_t * a_client) { - DAP_CLIENT_PVT(a_client)->callback_error = a_client_callback_error; + return DAP_CLIENT_PVT(a_client)->stage; } /** - * @brief sf_client_set_callback_connected + * @brief dap_client_get_stage_status_str * @param a_client - * @param a_client_callback_connected + * @return */ -void dap_client_set_callback_connected(dap_client_t * a_client, dap_client_callback_t a_client_callback_connected) +const char * dap_client_get_stage_status_str(dap_client_t *a_client){ + return dap_client_stage_status_str(DAP_CLIENT_PVT(a_client)->stage_status); +} + +/** + * @brief dap_client_stage_status_str + * @param a_stage_status + * @return + */ +const char * dap_client_stage_status_str(dap_client_stage_status_t a_stage_status) { - DAP_CLIENT_PVT(a_client)->callback_connected = a_client_callback_connected; + switch(a_stage_status){ + case STAGE_STATUS_NONE: return "NONE"; + case STAGE_STATUS_IN_PROGRESS: return "IN_PROGRESS"; + case STAGE_STATUS_ERROR: return "ERROR"; + case STAGE_STATUS_DONE: return "DONE"; + default: return "UNDEFINED"; + } } /** - * @brief sf_client_set_callback_disconnected + * @brief dap_client_get_stage_str * @param a_client - * @param a_client_callback_disconnected + * @return */ -void dap_client_set_callback_disconnected(dap_client_t * a_client, dap_client_callback_t a_client_callback_disconnected) +const char * dap_client_get_stage_str(dap_client_t *a_client) { - DAP_CLIENT_PVT(a_client)->callback_disconnected = a_client_callback_disconnected; + return dap_client_stage_str(DAP_CLIENT_PVT(a_client)->stage); } /** - * @brief sf_client_get_addr + * @brief dap_client_stage_str + * @param a_stage + * @return + */ +const char * dap_client_stage_str(dap_client_stage_t a_stage) +{ + switch(a_stage){ + case STAGE_BEGIN: return "BEGIN"; + case STAGE_ENC_INIT: return "ENC"; + case STAGE_STREAM_CTL: return "STREAM_CTL"; + case STAGE_STREAM_SESSION: return "STREAM_SESSION"; + case STAGE_STREAM_STREAMING: return "STREAM"; + default: return "UNDEFINED"; + } +} +/** + * @brief dap_client_get_stage_status * @param a_client * @return */ -struct in_addr sf_client_get_addr(dap_client_t * a_client) +dap_client_stage_status_t dap_client_get_stage_status(dap_client_t * a_client) { - return DAP_CLIENT_PVT(a_client)->stream_session->tun_client_addr; + return DAP_CLIENT_PVT(a_client)->stage_status; } /** - * @brief sf_client_get_gw + * @brief dap_client_get_key_stream * @param a_client * @return */ -struct in_addr sf_client_get_gw(dap_client_t * a_client) -{ - return DAP_CLIENT_PVT(a_client)->stream_session->tun_client_gw; +dap_enc_key_t * dap_client_get_key_stream(dap_client_t * a_client){ + return DAP_CLIENT_PVT(a_client)->stream_key; } + /** - * @brief sf_client_get_netmask + * @brief dap_client_get_stream_id * @param a_client * @return */ -struct in_addr sf_client_get_netmask(dap_client_t * a_client) +const char * dap_client_get_stream_id(dap_client_t * a_client) { - return DAP_CLIENT_PVT(a_client)->stream_session->tun_client_mask; + return DAP_CLIENT_PVT(a_client)->stream_id; } diff --git a/dap_client.h b/dap_client.h index e4290cddfbdfc605782b2276f9c5cb21c362be75..5d248f4443767e28d4e94e59d1b562479ee4ee5c 100644 --- a/dap_client.h +++ b/dap_client.h @@ -3,7 +3,7 @@ * Dmitriy A. Gearasimov <gerasimov.dmitriy@demlabs.net> * DeM Labs Inc. https://demlabs.net * Kelvin Project https://github.com/kelvinblockchain - * Copyright (c) 2017-2018 + * Copyright (c) 2017-2019 * All rights reserved. This file is part of DAP (Deus Applications Prototypes) the open source project @@ -23,23 +23,102 @@ */ #pragma once -typedef enum dap_client_stage{ DAP_CLIENT_DISCONNECTED=0,DAP_CLIENT_CONNECTING=1, - DAP_CLIENT_CONNECTED_HTTP_HEADERS=2, - DAP_CLIENT_CONNECTED_STREAMING=3 } dap_client_stage_t; -typedef struct dap_client { +#include <stdint.h> +#include "dap_enc_key.h" +#include "dap_events.h" +/** + * @brief The dap_client_stage enum. Top level of client's state machine + **/ +typedef enum dap_client_stage { + STAGE_BEGIN=0, + STAGE_ENC_INIT=1, + STAGE_STREAM_CTL=2, + STAGE_STREAM_SESSION=3, + STAGE_STREAM_CONNECTED=4, + STAGE_STREAM_STREAMING=5 +} dap_client_stage_t; + +typedef enum dap_client_stage_status { + STAGE_STATUS_NONE=0, + // Enc init stage + STAGE_STATUS_IN_PROGRESS, + STAGE_STATUS_ABORTING, + STAGE_STATUS_ERROR, + STAGE_STATUS_DONE, +} dap_client_stage_status_t; + +typedef enum dap_client_error { + ERROR_NO_ERROR = 0, + ERROR_ENC_NO_KEY, + ERROR_ENC_WRONG_KEY, + ERROR_STREAM_CTL_ERROR, + ERROR_STREAM_CTL_ERROR_AUTH, + ERROR_STREAM_CTL_ERROR_RESPONSE_FORMAT, + ERROR_STREAM_RESPONSE_WRONG, + ERROR_STREAM_RESPONSE_TIMEOUT, + ERROR_STREAM_FREEZED, + ERROR_NETWORK_CONNECTION_REFUSE +} dap_client_error_t; + +#define DAP_CLIENT_PROTOCOL_VERSION 22 + +/** + * @brief The dap_client struct + */ +typedef struct dap_client{ void * _internal; - dap_client_stage_t stage; + void * _inheritor; } dap_client_t; typedef void (*dap_client_callback_t) (dap_client_t *, void*); -typedef dap_stream_t; -typedef dap_events_t; +typedef void (*dap_client_callback_int_t) (dap_client_t *, int); +typedef void (*dap_client_callback_data_size_t) (dap_client_t *, void *, size_t); -#define DAP_CLIENT(a) ((dap_client_t *) (a)->_inheritor ) +#define DAP_UPLINK_PATH_ENC_INIT "1901248124123459" +#define DAP_UPLINK_PATH_DB "01094787531354" +#define DAP_UPLINK_PATH_STREAM_CTL "091348758013553" +#define DAP_UPLINK_PATH_STREAM "874751843144" +#define DAP_UPLINK_PATH_LICENSE "license" +#define DAP_UPLINK_PATH_SERVER_LIST "slist" + +#ifdef __cplusplus +extern "C" { +#endif + +int dap_client_init(); +void dap_client_deinit(); + +dap_client_t * dap_client_new(dap_events_t * a_events, dap_client_callback_t a_stage_status_callback + , dap_client_callback_t a_stage_status_error_callback ); +void dap_client_delete(dap_client_t * a_client); + + +dap_enc_key_t * dap_client_get_key_stream(dap_client_t * a_client); -dap_client_t * dap_client_new(dap_events_t * a_events,const char * a_name); +void dap_client_go_stage(dap_client_t * a_client, dap_client_stage_t a_stage_end, dap_client_callback_t a_stage_end_callback); + +void dap_client_reset(dap_client_t * a_client); + +void dap_client_request_enc(dap_client_t * a_client, const char * a_path,const char * a_suburl,const char* a_query, void * a_request, size_t a_request_size, + dap_client_callback_data_size_t a_response_proc, dap_client_callback_int_t a_response_error); + + +const char * dap_client_get_stage_str(dap_client_t * a_client); +const char * dap_client_stage_str(dap_client_stage_t a_stage); + +const char * dap_client_get_stage_status_str(dap_client_t * a_client); +const char * dap_client_stage_status_str(dap_client_stage_status_t a_stage_status); +const char * dap_client_error_str(dap_client_error_t a_client_error); +const char * dap_client_get_error_str(dap_client_t * a_client); + +const char * dap_client_get_auth_cookie(dap_client_t * a_client); +const char * dap_client_get_stream_id(dap_client_t * a_client); + +dap_client_stage_t dap_client_get_stage(dap_client_t * a_client); +dap_client_stage_status_t dap_client_get_stage_status(dap_client_t * a_client); + +#define DAP_CLIENT(a) ((dap_client_t *) (a)->_inheritor ) -void dap_client_set_callback_error(dap_client_t * a_client, dap_client_callback_t a_client_callback_error); -void dap_client_set_callback_connected(dap_client_t * a_client, dap_client_callback_t a_client_callback_connected); -void dap_client_set_callback_disconnected(dap_client_t * a_client, dap_client_callback_t a_client_callback_disconnected); -int dap_client_delete(dap_client_t * a_client); +#ifdef __cplusplus +} +#endif diff --git a/dap_client_pool.c b/dap_client_pool.c index 87b8a35813c549c4bae47afa18615765272236b3..e386c4206532118eecfcb486a11e50b193ae8534 100644 --- a/dap_client_pool.c +++ b/dap_client_pool.c @@ -3,126 +3,59 @@ #include <string.h> #include <errno.h> -#include "sap_common.h" -#include "sap_config.h" -#include "sap_events.h" -#include "sap_events_socket.h" -#include "ch_sf.h" -#include "ch_sf_tun.h" -#include "sf_client.h" -#include "mod_sf.h" +#include "dap_common.h" +#include "dap_config.h" +#include "dap_events.h" +#include "dap_events_socket.h" +#include "dap_client.h" +#include "dap_client_pool.h" -#define LOG_TAG "mod_sf" +#define LOG_TAG "dap_client_pool" -static struct sap_events * s_events = NULL; // Single-linked peers list -struct sf_client_list{ - sf_client_t * item; - char * name; - struct sf_client_list * next; +struct dap_client_list{ + dap_client_t * item; + char * id; + struct dap_client_list * next; }; -static struct sf_client_list * s_peers = NULL; -void m_sf_client_callback_connected(sf_client_t * a_client, void * arg); -void m_sf_client_callback_disconnected(sf_client_t * a_client, void * arg); -void m_sf_client_callback_error(sf_client_t * a_client, void * arg); +void s_stage_status_callback(dap_client_t * a_client, void* a_arg); +void s_stage_status_error_callback(dap_client_t * a_client, void* a_arg); +dap_events_t * s_events = NULL; /** - * @brief mod_sf_peer_start + * @brief dap_client_pool_init * @param a_events - * @param a_worker + * @return */ -void mod_sf_peer_start(struct sap_events * a_events) +int dap_client_pool_init(dap_events_t * a_events) { - if( s_events != NULL ){ - log_it(L_WARNING, "Peering is already started"); - return; - } s_events = a_events; - - // Prepare dir parse - DIR *d; - struct dirent *dir; - size_t buf_size = strlen(sap_configs_path())+1+strlen(SF_CLIENT_CONFIGS_PATH)+1; - char * buf = SAP_NEW_Z_SIZE(char,buf_size); - snprintf(buf,buf_size,"%s/%s",sap_configs_path(),SF_CLIENT_CONFIGS_PATH); - d = opendir(buf); - if (d) { - // Process every dir entry - while ((dir = readdir(d)) != NULL) { - if( dir->d_name[0]=='.' ) continue; - log_it(L_DEBUG,"Peer config '%s'", dir->d_name); - // List the peer in memory - struct sf_client_list * l_peer = SAP_NEW_Z(struct sf_client_list); - l_peer->next = s_peers; - l_peer->name = SAP_NEW_Z_SIZE(char, strlen(dir->d_name)); - strcpy(l_peer->name, dir->d_name); - - s_peers = l_peer; - - sf_client_t * l_sf_client = sf_client_new(a_events,dir->d_name); - if( l_sf_client == NULL){ - SAP_DELETE(l_peer); - log_it(L_WARNING, "Can't init peer config '%s'",dir->d_name); - continue; - } - l_peer->item = l_sf_client; - - // Setup callbacks - sf_client_set_callback_connected(l_sf_client,m_sf_client_callback_connected ); - sf_client_set_callback_disconnected( l_sf_client,m_sf_client_callback_disconnected ); - sf_client_set_callback_error(l_sf_client,m_sf_client_callback_error ); - } - closedir(d); - }else{ - log_it(L_ERROR, "Can't open path %s: %s",buf,strerror(errno)); - } } -/** - * @brief mod_sf_peer_stop - */ -void mod_sf_peer_stop() +void dap_client_pool_deinit() { - struct sf_client_list * l_item = s_peers, *l_tmp; - while (l_item){ - sf_client_delete(l_item->item); - l_tmp = l_item->next; - SAP_DELETE(l_item->name); - SAP_DELETE(l_item); - l_item = l_tmp; - } } /** - * @brief m_sf_client_callback_connected - * @param a_client - * @param arg + * @brief dap_client_pool_new + * @param a_client_id + * @return */ -void m_sf_client_callback_connected(sf_client_t * a_client, void * arg) +dap_client_t * dap_client_pool_new(const char * a_client_id) { - log_it(L_DEBUG,"mod_sf_client connected"); + dap_client_t * l_client = dap_client_new(s_events, s_stage_status_callback + , s_stage_status_error_callback ); } -/** - * @brief m_sf_client_callback_disconnected - * @param a_client - * @param arg - */ -void m_sf_client_callback_disconnected(sf_client_t * a_client, void * arg) +void s_stage_status_callback(dap_client_t * a_client, void* a_arg) { - log_it(L_DEBUG,"mod_sf_client disconnected"); + } -/** - * @brief m_sf_client_callback_error - * @param a_client - * @param arg - */ -void m_sf_client_callback_error(sf_client_t * a_client, void * arg) +void s_stage_status_error_callback(dap_client_t * a_client, void* a_arg) { - log_it(L_WARNING,"mod_sf_client error"); -} +} diff --git a/dap_client_pool.h b/dap_client_pool.h index 808f5ff5ad4b5555ef3c086fc1b4c758f85e3aec..a6b955f0eb6c1c6b02fdd10843850575deaf52a1 100644 --- a/dap_client_pool.h +++ b/dap_client_pool.h @@ -1,9 +1,33 @@ -#ifndef _MOD_SF_H_ -#define _MOD_SF_H_ +/* + * Authors: + * Dmitriy A. Gearasimov <gerasimov.dmitriy@demlabs.net> + * DeM Labs Inc. https://demlabs.net + * Kelvin Project https://github.com/kelvinblockchain + * Copyright (c) 2017-2019 + * All rights reserved. -struct sap_events; -void mod_sf_peer_start(struct sap_events * a_events); -void mod_sf_peer_stop(); + This file is part of DAP (Deus Applications Prototypes) the open source project + + DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + DAP is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. +*/ +#pragma once +#include "dap_client.h" +#include "dap_events.h" + +int dap_client_pool_init(dap_events_t * a_events); +void dap_client_pool_deinit(); + +dap_client_t * dap_client_pool_new (const char * a_client_id); -#endif diff --git a/dap_client_pvt.c b/dap_client_pvt.c new file mode 100644 index 0000000000000000000000000000000000000000..3382f045199246b372ceb05793c5bbfa8910d051 --- /dev/null +++ b/dap_client_pvt.c @@ -0,0 +1,731 @@ +/* + * Authors: + * Dmitriy A. Gearasimov <gerasimov.dmitriy@demlabs.net> + * DeM Labs Inc. https://demlabs.net + * Kelvin Project https://github.com/kelvinblockchain + * Copyright (c) 2017-2019 + * All rights reserved. + + This file is part of DAP (Deus Applications Prototypes) the open source project + + DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + DAP is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. +*/ +#include <sys/types.h> /* See NOTES */ +#include <sys/socket.h> + +#include <stdio.h> +#include <string.h> +#include <assert.h> + +#include "dap_enc_key.h" +#include "dap_enc_base64.h" +#include "dap_enc.h" +#include "dap_common.h" + +#include "dap_http_client_simple.h" +#include "dap_client.h" +#include "dap_client_pvt.h" +#include "dap_stream.h" +#include "dap_stream_ch.h" +#include "dap_stream_ch_proc.h" +#include "dap_stream_ch_pkt.h" + +#define LOG_TAG "dap_client_pvt" + +#ifndef DAP_ENC_KS_KEY_ID_SIZE +#define DAP_ENC_KS_KEY_ID_SIZE 33 +#endif + +static void s_stage_status_after(dap_client_pvt_t * a_client_internal); + + +// ENC stage callbacks +void m_enc_init_response(dap_client_t *, void *, size_t); +void m_enc_init_error(dap_client_t *, int); + +// STREAM_CTL stage callbacks +void m_stream_ctl_response(dap_client_t *, void *, size_t); +void m_stream_ctl_error(dap_client_t *, int); +void m_stage_stream_streaming (dap_client_t * a_client, void* arg); + + + +void m_request_response(void * a_response,size_t a_response_size,void * a_obj); +void m_request_error(int,void *); + +// stream callbacks + +void m_es_stream_delete(dap_events_socket_t * a_es, void * arg); +void m_es_stream_read(dap_events_socket_t * a_es, void * arg); +void m_es_stream_write(dap_events_socket_t * a_es, void * arg); +void m_es_stream_error(dap_events_socket_t * a_es, void * arg); + +/** + * @brief dap_client_internal_init + * @return + */ +int dap_client_pvt_init() +{ + return 0; +} + +/** + * @brief dap_client_internal_deinit + */ +void dap_client_pvt_deinit() +{ +} + +/** + * @brief dap_client_internal_new + * @param a_client_internal + */ +void dap_client_pvt_new(dap_client_pvt_t * a_client_internal) +{ + a_client_internal->stage = STAGE_BEGIN; // start point of state machine + a_client_internal->stage_status = STAGE_STATUS_DONE; +} + + + +/** + * @brief dap_client_pvt_delete + * @param a_client_pvt + */ +void dap_client_pvt_delete(dap_client_pvt_t * a_client_pvt) +{ + if(a_client_pvt->session_key_id) + DAP_DELETE(a_client_pvt->session_key_id); + if (a_client_pvt->session_key) + dap_enc_key_delete(a_client_pvt->session_key); + if (a_client_pvt->session_key_open) + dap_enc_key_delete(a_client_pvt->session_key_open); + if (a_client_pvt->stream_key) + dap_enc_key_delete(a_client_pvt->stream_key); + +} + + +/** + * @brief s_client_internal_stage_status_proc + * @param a_client + */ +static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) +{ + switch(a_client_pvt->stage_status){ + case STAGE_STATUS_IN_PROGRESS:{ + switch( a_client_pvt->stage){ + case STAGE_ENC_INIT:{ + log_it(L_INFO,"Go to stage ENC: prepare the request"); + + a_client_pvt->session_key_open = dap_enc_key_new_generate(DAP_ENC_KEY_TYPE_MSRLN, + NULL,0, + NULL,0, + 0); + + + size_t l_key_str_size_max = DAP_ENC_BASE64_ENCODE_SIZE(a_client_pvt->session_key_open->pub_key_data_size); + char *l_key_str= DAP_NEW_Z_SIZE(char,l_key_str_size_max+1); + size_t l_key_str_enc_size = dap_enc_base64_encode(a_client_pvt->session_key_open->pub_key_data, + a_client_pvt->session_key_open->pub_key_data_size, + l_key_str,DAP_ENC_DATA_TYPE_B64_URLSAFE); + + log_it(L_DEBUG,"ENC request size %u",l_key_str_enc_size); + dap_client_pvt_request( a_client_pvt, DAP_UPLINK_PATH_ENC_INIT "/gd4y5yh78w42aaagh", + l_key_str,l_key_str_size_max, m_enc_init_response, m_enc_init_error ); + DAP_DELETE(l_key_str); + }break; + case STAGE_STREAM_CTL:{ + log_it(L_INFO,"Go to stage STREAM_CTL: prepare the request"); + + size_t l_request_size = 10; + char *l_request = DAP_NEW_Z_SIZE (char,l_request_size) ; + snprintf(l_request, l_request_size,"%d", DAP_CLIENT_PROTOCOL_VERSION); + log_it(L_DEBUG,"STREAM_CTL request size %u",strlen(l_request)); + + dap_client_pvt_request_enc(a_client_pvt, + DAP_UPLINK_PATH_STREAM_CTL, + "stream_ctl","type=tcp,maxconn=4",l_request,l_request_size, + m_stream_ctl_response, m_stream_ctl_error); + DAP_DELETE(l_request); + }break; + case STAGE_STREAM_SESSION:{ + log_it(L_INFO,"Go to stage STREAM_SESSION: process the state ops"); + a_client_pvt->stage_status = STAGE_STATUS_DONE; + s_stage_status_after(a_client_pvt); + } break; + case STAGE_STREAM_STREAMING:{ + log_it(L_INFO,"Go to stage STREAM: prepare the socket"); + a_client_pvt->stream_socket = socket(PF_INET,SOCK_STREAM,0); + setsockopt(a_client_pvt->stream_socket,SOL_SOCKET,SO_SNDBUF,(const void*) 50000,sizeof(int) ); + setsockopt(a_client_pvt->stream_socket,SOL_SOCKET,SO_RCVBUF,(const void *) 50000,sizeof(int) ); + // Wrap socket and setup callbacks + static dap_events_socket_callbacks_t l_s_callbacks={ + .read_callback = m_es_stream_read , + .write_callback = m_es_stream_write , + .error_callback = m_es_stream_error , + .delete_callback = m_es_stream_delete + }; + a_client_pvt->stream_es = dap_events_socket_wrap_no_add(a_client_pvt->events, + a_client_pvt->stream_socket, &l_s_callbacks); + + a_client_pvt->stream_es->_inheritor = a_client_pvt->client; + a_client_pvt->stream = dap_stream_new_es(a_client_pvt->stream_es); + a_client_pvt->stream->is_client_to_uplink = true; + a_client_pvt->stream_session = dap_stream_session_pure_new(); + + + } break; + + default:{ + log_it(L_ERROR,"Undefined proccessing actions for stage status %s", + dap_client_stage_status_str(a_client_pvt->stage_status)); + a_client_pvt->stage_status = STAGE_STATUS_ERROR; + s_stage_status_after(a_client_pvt); // be carefull to not to loop! + } + } + } break; + case STAGE_STATUS_ERROR:{ + log_it(L_ERROR, "Error state, doing callback if present"); + if( a_client_pvt->stage_status_error_callback ){ + a_client_pvt->stage_status_error_callback(a_client_pvt->client,NULL); + // Expecting that its one-shot callback + //a_client_internal->stage_status_error_callback = NULL; + + } + a_client_pvt->stage = STAGE_ENC_INIT; + // Trying the step again + a_client_pvt->stage_status = STAGE_STATUS_IN_PROGRESS; + s_stage_status_after(a_client_pvt); + } break; + case STAGE_STATUS_DONE :{ + log_it(L_INFO, "Stage status %s is done", + dap_client_stage_str(a_client_pvt->stage) ); + bool l_is_last_stage=( a_client_pvt->stage == a_client_pvt->stage_target ); + if( a_client_pvt->stage_status_done_callback ){ + a_client_pvt->stage_status_done_callback(a_client_pvt->client,NULL); + // Expecting that its one-shot callback + //a_client_internal->stage_status_done_callback = NULL; + }else + log_it(L_WARNING,"Stage done callback is not present"); + + if (l_is_last_stage ){ + log_it(L_NOTICE, "Stage %s is achieved", + dap_client_stage_str(a_client_pvt->stage)); + if( a_client_pvt->stage_target_done_callback ){ + a_client_pvt->stage_target_done_callback(a_client_pvt->client,NULL); + // Expecting that its one-shot callback + a_client_pvt->stage_target_done_callback = NULL; + } + } else log_it(L_ERROR,"!! dap_CLIENT_STAGE_STATUS_DONE but not l_is_last_stage !!"); + }break; + default: log_it(L_ERROR,"Undefined proccessing actions for stage status %s", + dap_client_stage_status_str(a_client_pvt->stage_status)); + } + + if( a_client_pvt->stage_status_callback ) + a_client_pvt->stage_status_callback( a_client_pvt->client,NULL ); +} + + +/** + * @brief dap_client_internal_stage_transaction_begin + * @param a_client_internal + * @param a_stage_next + * @param a_done_callback + */ +void dap_client_pvt_stage_transaction_begin(dap_client_pvt_t * a_client_internal, dap_client_stage_t a_stage_next, + dap_client_callback_t a_done_callback) +{ + a_client_internal->stage_status_done_callback = a_done_callback; + a_client_internal->stage = a_stage_next; + a_client_internal->stage_status = STAGE_STATUS_IN_PROGRESS; + s_stage_status_after(a_client_internal); +} + +/** + * @brief dap_client_internal_request + * @param a_client_internal + * @param a_path + * @param a_request + * @param a_request_size + * @param a_response_proc + */ +void dap_client_pvt_request(dap_client_pvt_t * a_client_internal, const char * a_path, void * a_request, + size_t a_request_size, dap_client_callback_data_size_t a_response_proc, dap_client_callback_int_t a_response_error) +{ + a_client_internal->request_response_callback = a_response_proc; + a_client_internal->request_error_callback = a_response_error; + a_client_internal->is_encrypted = false; + + size_t l_url_size_max = 0; + char *l_url = NULL; + if(a_path){ + l_url_size_max= strlen(a_client_internal->uplink_addr)+strlen(a_path)+15; + l_url = DAP_NEW_Z_SIZE(char,l_url_size_max); + + snprintf(l_url,l_url_size_max,"http://%s:%u/%s",a_client_internal->uplink_addr, a_client_internal->uplink_port, a_path ); + }else{ + l_url_size_max= strlen(a_client_internal->uplink_addr)+15; + l_url = DAP_NEW_Z_SIZE(char,l_url_size_max); + snprintf(l_url,l_url_size_max,"http://%s:%u",a_client_internal->uplink_addr, a_client_internal->uplink_port ); + } + dap_http_client_simple_request(l_url, a_request?"POST":"GET","text/text", a_request, a_request_size, + NULL, + m_request_response, m_request_error, a_client_internal,NULL); + DAP_DELETE(l_url); +} + +/** + * @brief dap_client_internal_request_enc + * @param a_client_internal + * @param a_path + * @param a_sub_url + * @param a_query + * @param a_request + * @param a_request_size + * @param a_response_proc + * @param a_response_error + */ +void dap_client_pvt_request_enc(dap_client_pvt_t * a_client_internal, const char * a_path, + const char * a_sub_url, const char * a_query + , void * a_request, size_t a_request_size + ,dap_client_callback_data_size_t a_response_proc + , dap_client_callback_int_t a_response_error) +{ + log_it(L_DEBUG,"Encrypted request: sub_url '%s' query '%s'",a_sub_url?a_sub_url:"NULL", a_query?a_query:"NULL" ); + size_t l_sub_url_size = a_sub_url?strlen(a_sub_url): 0; + size_t l_query_size = a_query?strlen(a_query):0; + size_t l_url_size; + + char l_url[1024]={0}; + snprintf(l_url,1024,"http://%s:%u",a_client_internal->uplink_addr, a_client_internal->uplink_port ); + l_url_size = strlen(l_url); + + size_t l_sub_url_enc_size_max = l_sub_url_size ? (5*l_sub_url_size+16 ): 0; + char *l_sub_url_enc = l_sub_url_size ? DAP_NEW_Z_SIZE(char, l_sub_url_enc_size_max+1 ): NULL; + + size_t l_query_enc_size_max = l_query_size*5+16; + char *l_query_enc = l_query_size ? DAP_NEW_Z_SIZE(char,l_query_enc_size_max+1 ):NULL; + + size_t l_url_full_size_max = 5*l_sub_url_size + 5*l_query_size + 5 + l_url_size+2; + char * l_url_full = DAP_NEW_Z_SIZE(char, l_url_full_size_max+1); + + size_t l_request_enc_size_max = a_request_size ?a_request_size*2+16 : 0; + char * l_request_enc = a_request_size? DAP_NEW_Z_SIZE(char,l_request_enc_size_max+1 ) : NULL; + size_t l_request_enc_size = 0; + + a_client_internal->request_response_callback = a_response_proc; + a_client_internal->request_error_callback = a_response_error; + a_client_internal->is_encrypted = true; + size_t i; + dap_enc_data_type_t l_enc_type; + + if( a_client_internal->uplink_protocol_version >= 21 ) + l_enc_type = DAP_ENC_DATA_TYPE_B64_URLSAFE; + else + l_enc_type = DAP_ENC_DATA_TYPE_B64; + + if ( l_sub_url_size ) + dap_enc_code(a_client_internal->session_key, + a_sub_url,l_sub_url_size, + l_sub_url_enc, l_sub_url_enc_size_max, + l_enc_type); + + if ( l_query_size ) + dap_enc_code(a_client_internal->session_key, + a_query,l_query_size, + l_query_enc,l_query_enc_size_max, + l_enc_type); + + + if ( a_request_size ) + l_request_enc_size = dap_enc_code(a_client_internal->session_key, + a_request, a_request_size, + l_request_enc, l_request_enc_size_max, + DAP_ENC_DATA_TYPE_RAW ); + + if (a_path){ + if( l_sub_url_size ){ + if( l_query_size ){ + snprintf(l_url_full,l_url_full_size_max-1,"%s/%s/%s?%s" + ,l_url,a_path, l_sub_url_enc, l_query_enc ); + + }else{ + snprintf(l_url_full,l_url_full_size_max,"%s/%s/%s",l_url,a_path, l_sub_url_enc); + } + }else{ + snprintf(l_url_full,l_url_full_size_max,"%s/%s",l_url,a_path); + } + }else{ + snprintf(l_url_full,l_url_full_size_max,"%s",l_url); + } + + size_t l_key_hdr_str_size_max = strlen(a_client_internal->session_key_id)+10; + char *l_key_hdr_str = DAP_NEW_Z_SIZE(char,l_key_hdr_str_size_max); + snprintf(l_key_hdr_str,l_key_hdr_str_size_max,"KeyID: %s",a_client_internal->session_key_id ); + + + dap_http_client_simple_request(l_url_full, a_request?"POST":"GET","text/text", + l_request_enc, l_request_enc_size, NULL, + m_request_response, m_request_error, a_client_internal, l_key_hdr_str); + + DAP_DELETE(l_key_hdr_str); + if( l_sub_url_enc ) + DAP_DELETE(l_sub_url_enc); + + if( l_query_enc ) + DAP_DELETE(l_query_enc); + + if( l_url_full ) + DAP_DELETE(l_url_full); + + if( l_request_enc ) + DAP_DELETE(l_request_enc); +} + +/** + * @brief m_request_error + * @param a_err_code + * @param a_obj + */ +void m_request_error(int a_err_code, void * a_obj) +{ + dap_client_pvt_t * a_client_internal = (dap_client_pvt_t *) a_obj; + a_client_internal->request_error_callback(a_client_internal->client, a_err_code ); +} + +/** + * @brief m_request_response + * @param a_response + * @param a_response_size + * @param a_obj + */ +void m_request_response(void * a_response,size_t a_response_size,void * a_obj) +{ + dap_client_pvt_t * a_client_internal = (dap_client_pvt_t *) a_obj; + if( a_client_internal->is_encrypted){ + size_t l_response_dec_size_max = a_response_size ?a_response_size*2+16 : 0; + char * l_response_dec = a_response_size? DAP_NEW_Z_SIZE(char, l_response_dec_size_max ) : NULL; + size_t l_response_dec_size = 0; + if ( a_response_size) + l_response_dec_size = dap_enc_decode( a_client_internal->session_key, + a_response, a_response_size, + l_response_dec, l_response_dec_size_max, + DAP_ENC_DATA_TYPE_RAW ); + + a_client_internal->request_response_callback(a_client_internal->client, l_response_dec, l_response_dec_size ); + + if( l_response_dec ) + DAP_DELETE ( l_response_dec ); + }else{ + a_client_internal->request_response_callback(a_client_internal->client, a_response, a_response_size ); + } +} + + +/** + * @brief m_enc_init_response + * @param a_client + * @param a_response + * @param a_response_size + */ +void m_enc_init_response(dap_client_t * a_client, void * a_response,size_t a_response_size) +{ + dap_client_pvt_t * l_client_pvt = DAP_CLIENT_PVT(a_client); + if( a_response_size > 10 && a_response_size < 50){ + char l_session_id_b64 [DAP_ENC_BASE64_ENCODE_SIZE(DAP_ENC_KS_KEY_ID_SIZE)+1]={0}; + char *l_bob_message_b64 = DAP_NEW_Z_SIZE(char,a_response_size- sizeof(l_session_id_b64)+1 ); + if (sscanf (a_response,"%s %s",l_session_id_b64, l_bob_message_b64) == 2 ){ + l_client_pvt->session_key_id = DAP_NEW_Z_SIZE(char,strlen(l_session_id_b64)+1); + dap_enc_base64_decode(l_session_id_b64,strlen(l_session_id_b64), + l_client_pvt->session_key_id, DAP_ENC_DATA_TYPE_B64); + log_it(L_DEBUG,"ENC: session Key ID %s",l_client_pvt->session_key_id); + + char *l_bob_message = DAP_NEW_Z_SIZE(char,strlen(l_bob_message_b64)+1); + size_t l_bob_message_size = dap_enc_base64_decode(l_bob_message_b64,strlen(l_bob_message_b64), + l_bob_message, DAP_ENC_DATA_TYPE_B64); + l_client_pvt->session_key_open->gen_alice_shared_key( + l_client_pvt->session_key_open, l_client_pvt->session_key_open->priv_key_data, + l_bob_message_size, l_bob_message); + + l_client_pvt->session_key = dap_enc_key_new_generate(DAP_ENC_KEY_TYPE_IAES, + l_client_pvt->session_key_open->priv_key_data, // shared key + l_client_pvt->session_key_open->priv_key_data_size, + l_client_pvt->session_key_id,strlen(l_client_pvt->session_key_id), 0); + + DAP_DELETE(l_bob_message); + if( l_client_pvt->stage == STAGE_ENC_INIT ){ // We are in proper stage + l_client_pvt->stage_status = STAGE_STATUS_DONE; + s_stage_status_after(l_client_pvt); + }else{ + log_it(L_WARNING,"ENC: initialized encryption but current stage is %s (%s)", + dap_client_get_stage_str(a_client),dap_client_get_stage_status_str(a_client)); + } + } else { + log_it(L_ERROR, "ENC: Wrong response (size %u data '%s')",a_response_size,(char*) a_response); + l_client_pvt->last_error = ERROR_ENC_NO_KEY; + l_client_pvt->stage_status = STAGE_STATUS_ERROR; + s_stage_status_after(l_client_pvt); + } + DAP_DELETE(l_bob_message_b64); + }else if( a_response_size>1){ + log_it(L_ERROR, "ENC: Wrong response (size %u data '%s')",a_response_size,(char*) a_response); + l_client_pvt->last_error = ERROR_ENC_NO_KEY; + l_client_pvt->stage_status = STAGE_STATUS_ERROR; + s_stage_status_after(l_client_pvt); + }else{ + log_it(L_ERROR, "ENC: Wrong response (size %u)",a_response_size); + l_client_pvt->last_error = ERROR_ENC_NO_KEY; + l_client_pvt->stage_status = STAGE_STATUS_ERROR; + s_stage_status_after(l_client_pvt); + } +} + +/** + * @brief m_enc_init_error + * @param a_client + * @param a_err_code + */ +void m_enc_init_error(dap_client_t * a_client, int a_err_code) +{ + dap_client_pvt_t * l_client_pvt = DAP_CLIENT_PVT(a_client); + //dap_client_internal_t * l_client_internal = dap_CLIENT_INTERNAL(a_client); + log_it(L_ERROR,"ENC: Can't init ecnryption session, err code %d",a_err_code); + + l_client_pvt->last_error = ERROR_NETWORK_CONNECTION_REFUSE ; + l_client_pvt->stage_status = STAGE_STATUS_ERROR; + s_stage_status_after(l_client_pvt); +} + + + + +/** + * @brief m_stream_ctl_response + * @param a_client + * @param a_data + * @param a_data_size + */ +void m_stream_ctl_response(dap_client_t * a_client, void * a_data, size_t a_data_size) +{ + dap_client_pvt_t * l_client_internal = DAP_CLIENT_PVT(a_client); + + log_it(L_DEBUG, "STREAM_CTL response %u bytes length recieved", a_data_size); + char * l_response_str = DAP_NEW_Z_SIZE(char, a_data_size+1); + memcpy(l_response_str, a_data,a_data_size); + + if( a_data_size<4 ){ + log_it(L_ERROR, "STREAM_CTL Wrong reply: '%s'", l_response_str); + l_client_internal->last_error = ERROR_STREAM_CTL_ERROR_RESPONSE_FORMAT; + l_client_internal->stage_status = STAGE_STATUS_ERROR; + s_stage_status_after(l_client_internal); + }else if ( strcmp(l_response_str, "ERROR") == 0 ){ + log_it(L_WARNING, "STREAM_CTL Got ERROR from the remote site,expecting thats ERROR_AUTH"); + l_client_internal->last_error = ERROR_STREAM_CTL_ERROR_AUTH; + l_client_internal->stage_status = STAGE_STATUS_ERROR; + s_stage_status_after(l_client_internal); + }else { + int l_arg_count; + char l_stream_id[25]={0}; + char *l_stream_key = DAP_NEW_Z_SIZE(char,4096*3); + void * l_stream_key_raw = DAP_NEW_Z_SIZE(char,4096); + size_t l_stream_key_raw_size = 0; + uint32_t l_remote_protocol_version; + + l_arg_count = sscanf(l_response_str,"%25s %4096s %u" + ,l_stream_id,l_stream_key,&l_remote_protocol_version ); + if (l_arg_count <2 ){ + log_it(L_WARNING, "STREAM_CTL Need at least 2 arguments in reply (got %d)",l_arg_count); + l_client_internal->last_error = ERROR_STREAM_CTL_ERROR_RESPONSE_FORMAT; + l_client_internal->stage_status = STAGE_STATUS_ERROR; + s_stage_status_after(l_client_internal); + }else{ + + if( l_arg_count >2){ + l_client_internal->uplink_protocol_version = l_remote_protocol_version; + log_it(L_DEBUG,"Uplink protocol version %u",l_remote_protocol_version); + }else + log_it(L_WARNING,"No uplink protocol version, use the default version %d" + ,l_client_internal->uplink_protocol_version = DAP_PROTOCOL_VERSION); + + if(strlen(l_stream_id)<13){ + //log_it(L_DEBUG, "Stream server id %s, stream key length(base64 encoded) %u" + // ,l_stream_id,strlen(l_stream_key) ); + log_it(L_DEBUG, "Stream server id %s, stream key '%s'" + ,l_stream_id,l_stream_key ); + + //l_stream_key_raw_size = dap_enc_base64_decode(l_stream_key,strlen(l_stream_key), + // l_stream_key_raw,DAP_ENC_DATA_TYPE_B64); + // Delete old key if present + if(l_client_internal->stream_key) + dap_enc_key_delete(l_client_internal->stream_key); + + strncpy(l_client_internal->stream_id,l_stream_id,sizeof(l_client_internal->stream_id)-1); + l_client_internal->stream_key = + dap_enc_key_new_generate(DAP_ENC_KEY_TYPE_IAES, l_stream_key, strlen(l_stream_key), NULL, 0, 0); + + //streamSocket->connectToHost(SapSession::me()->address(),SapSession::me()->port().toUShort(),QIODevice::ReadWrite); + + if( l_client_internal->stage == STAGE_STREAM_CTL ){ // We are on the right stage + l_client_internal->stage_status = STAGE_STATUS_DONE; + s_stage_status_after(l_client_internal); + }else{ + log_it(L_WARNING,"Expected to be stage STREAM_CTL but current stage is %s (%s)", + dap_client_get_stage_str(a_client),dap_client_get_stage_status_str(a_client)); + + } + }else{ + log_it(L_WARNING,"Wrong stream id response"); + l_client_internal->last_error = ERROR_STREAM_CTL_ERROR_RESPONSE_FORMAT; + l_client_internal->stage_status = STAGE_STATUS_ERROR; + s_stage_status_after(l_client_internal); + } + + } + DAP_DELETE(l_stream_key); + DAP_DELETE(l_stream_key_raw); + } +} + +/** + * @brief m_stream_ctl_error + * @param a_client + * @param a_error + */ +void m_stream_ctl_error(dap_client_t * a_client, int a_error) +{ + log_it(L_WARNING, "STREAM_CTL error %d",a_error); + + dap_client_pvt_t * l_client_pvt = DAP_CLIENT_PVT(a_client); + + l_client_pvt->last_error = ERROR_STREAM_CTL_ERROR; + l_client_pvt->stage_status = STAGE_STATUS_ERROR; + + s_stage_status_after(l_client_pvt); + +} + +/** + * @brief m_stage_stream_opened + * @param a_client + * @param arg + */ +void m_stage_stream_streaming(dap_client_t * a_client, void* arg) +{ + log_it(L_INFO, "Stream is opened"); +} + +/** + * @brief m_es_stream_delete + * @param a_es + * @param arg + */ +void m_es_stream_delete(dap_events_socket_t * a_es, void * arg) +{ + log_it(L_INFO, "====================================================== stream delete/peer reconnect"); + dap_client_t* l_client = DAP_CLIENT(a_es); + if(l_client == NULL ){ + log_it(L_ERROR, "dap_client is not initialized"); + return; + } + + dap_client_pvt_t * l_client_pvt = DAP_CLIENT_PVT(l_client); + if(l_client_pvt == NULL ){ + log_it(L_ERROR, "dap_client_pvt is not initialized"); + return; + } + + dap_stream_delete(l_client_pvt->stream); + l_client_pvt->stream = NULL; + if(l_client_pvt->client) + dap_client_reset(l_client_pvt->client); + +// l_client_pvt->client= NULL; + l_client_pvt->stream_es = NULL; + dap_stream_session_close(l_client_pvt->stream_session->id); + l_client_pvt->stream_session = NULL; + dap_client_go_stage(l_client_pvt->client ,STAGE_STREAM_STREAMING ,m_stage_stream_streaming ); +} + +/** + * @brief m_es_stream_read + * @param a_es + * @param arg + */\ +void m_es_stream_read(dap_events_socket_t * a_es, void * arg) +{ + dap_client_t * l_client = DAP_CLIENT(a_es); + dap_client_pvt_t * l_client_pvt = DAP_CLIENT_PVT(l_client); + switch( l_client_pvt->stage ){ + case STAGE_STREAM_SESSION: + dap_client_go_stage(l_client_pvt->client ,STAGE_STREAM_STREAMING ,m_stage_stream_streaming ); + break; + case STAGE_STREAM_CONNECTED:{ // Collect HTTP headers before streaming + if(a_es->buf_in_size>1){ + char * l_pos_endl; + l_pos_endl = (char*) memchr( a_es->buf_in,'\r',a_es->buf_in_size-1); + if ( l_pos_endl ){ + if ( *(l_pos_endl+1) == '\n' ) { + dap_events_socket_shrink_buf_in(a_es,l_pos_endl - a_es->buf_in_str ); + log_it(L_DEBUG,"Header passed, go to streaming (%lu bytes already are in input buffer", a_es->buf_in_size); + l_client_pvt->stage =STAGE_STREAM_STREAMING; + l_client_pvt->stage_status = STAGE_STATUS_DONE; + s_stage_status_after(l_client_pvt); + + dap_stream_data_proc_read(l_client_pvt->stream); + dap_events_socket_shrink_buf_in(a_es,a_es->buf_in_size ); + } + } + } + }break; + case STAGE_STREAM_STREAMING:{ // if streaming - process data with stream processor + dap_stream_data_proc_read(l_client_pvt->stream); + dap_events_socket_shrink_buf_in(a_es,a_es->buf_in_size ); + } + break; + + } +} + +/** + * @brief m_es_stream_write + * @param a_es + * @param arg + */ +void m_es_stream_write(dap_events_socket_t * a_es, void * arg) +{ + dap_client_t * l_client = DAP_CLIENT(a_es); + dap_client_pvt_t * l_client_pvt = DAP_CLIENT_PVT(l_client); + + switch( l_client_pvt->stage ){ + case STAGE_STREAM_STREAMING:{ + size_t i; + bool ready_to_write=false; + // log_it(DEBUG,"Process channels data output (%u channels)",STREAM(sh)->channel_count); + + for(i=0;i< l_client_pvt->stream->channel_count; i++){ + dap_stream_ch_t * ch = l_client_pvt->stream->channel[i]; + if(ch->ready_to_write){ + ch->proc->packet_out_callback(ch,NULL); + ready_to_write|=ch->ready_to_write; + } + } + //log_it(L_DEBUG,"stream_data_out (ready_to_write=%s)", ready_to_write?"true":"false"); + + dap_events_socket_set_writable(l_client_pvt->stream_es,ready_to_write); + //log_it(ERROR,"No stream_data_write_callback is defined"); + }break; + } +} diff --git a/dap_client_pvt.h b/dap_client_pvt.h new file mode 100644 index 0000000000000000000000000000000000000000..a25c1d5b4d6286ef6c031295d296386d05fc04e7 --- /dev/null +++ b/dap_client_pvt.h @@ -0,0 +1,96 @@ +/* + * Authors: + * Dmitriy A. Gearasimov <gerasimov.dmitriy@demlabs.net> + * DeM Labs Inc. https://demlabs.net + * Kelvin Project https://github.com/kelvinblockchain + * Copyright (c) 2017-2019 + * All rights reserved. + + This file is part of DAP (Deus Applications Prototypes) the open source project + + DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + DAP is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. +*/ +#pragma once + +#include <stdbool.h> +#include <stdint.h> +#include "dap_client.h" +#include "dap_stream.h" +#include "dap_events_socket.h" +typedef struct dap_enc_key dap_enc_key_t; +typedef struct dap_http_client dap_http_client_t; + +typedef struct dap_client_internal +{ + dap_client_t * client; + + dap_http_client_t * http_client; + + dap_events_socket_t * stream_es; + int stream_socket; + dap_stream_session_t * stream_session; + dap_stream_t* stream; + dap_events_t * events; + + dap_enc_key_t * session_key_open; // Open assymetric keys exchange + dap_enc_key_t * session_key; // Symmetric private key for session encryption + dap_enc_key_t * stream_key; // Stream private key for stream encryption + char stream_id[25]; + + + char * session_key_id; + + + char * last_parsed_node; + char * uplink_addr; + uint16_t uplink_port; + uint32_t uplink_protocol_version; + + + dap_client_stage_t stage_target; + dap_client_callback_t stage_target_done_callback; + + dap_client_stage_t stage; + dap_client_stage_status_t stage_status; + dap_client_error_t last_error; + + dap_client_callback_t stage_status_callback; + + dap_client_callback_t stage_status_done_callback; + dap_client_callback_t stage_status_error_callback; + + bool is_encrypted; + dap_client_callback_data_size_t request_response_callback; + dap_client_callback_int_t request_error_callback; +} dap_client_pvt_t; + +#define DAP_CLIENT_PVT(a) ((dap_client_pvt_t*) a->_internal ) + +int dap_client_pvt_init(); +void dap_client_pvt_deinit(); + +void dap_client_pvt_stage_transaction_begin(dap_client_pvt_t * dap_client_pvt_t, dap_client_stage_t a_stage_next, + dap_client_callback_t a_done_callback); + +void dap_client_pvt_request(dap_client_pvt_t * a_client_internal, const char * a_path, void * a_request, + size_t a_request_size, dap_client_callback_data_size_t a_response_proc, dap_client_callback_int_t a_response_error); + +void dap_client_pvt_request_enc(dap_client_pvt_t * a_client_internal, const char * a_path, const char * a_sub_url, + const char * a_query, void * a_request, size_t a_request_size, + dap_client_callback_data_size_t a_response_proc, + dap_client_callback_int_t a_error_proc); + +void dap_client_pvt_new(dap_client_pvt_t * a_client_internal); +void dap_client_pvt_delete(dap_client_pvt_t * a_client_pvts); + diff --git a/dap_events.c b/dap_events.c new file mode 100755 index 0000000000000000000000000000000000000000..c233759b01c8ed25f1ef9f2e23796614ee6ea2ec --- /dev/null +++ b/dap_events.c @@ -0,0 +1,536 @@ +/* + * Authors: + * Dmitriy A. Gearasimov <gerasimov.dmitriy@demlabs.net> + * DeM Labs Inc. https://demlabs.net + * Kelvin Project https://github.com/kelvinblockchain + * Copyright (c) 2017-2019 + * All rights reserved. + + This file is part of DAP (Deus Applications Prototypes) the open source project + + DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + DAP is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include <arpa/inet.h> +#include <netinet/in.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <sys/stat.h> + +#include <sys/epoll.h> + +#include <netdb.h> +#include <unistd.h> +#include <fcntl.h> +#include <string.h> +#include <time.h> + +#include <stdio.h> +#include <stdio.h> +#include <stdlib.h> +#include <stddef.h> +#include <errno.h> +#include <signal.h> + + + +#if 1 +#include <sys/timerfd.h> +#elif defined(DAP_OS_ANDROID) +#define NO_POSIX_SHED +#define NO_TIMER +#endif + +#include <utlist.h> + +#define _GNU_SOURCE +#define __USE_GNU +#include <sched.h> + +#include "dap_common.h" +#include "dap_events.h" + + +typedef struct open_connection_info { + dap_events_socket_t *es; + struct open_connection_info *next; +} dap_events_socket_info_t; + +dap_events_socket_info_t **s_dap_events_sockets; +static uint8_t s_threads_count = 1; +static size_t s_connection_timeout = 600; + +dap_worker_t * s_workers = NULL; +dap_thread_t * s_threads = NULL; + +#define LOG_TAG "dap_events" +#define MAX_EPOLL_EVENTS 255 + +size_t s_get_cpu_count() +{ +#ifndef NO_POSIX_SHED + cpu_set_t cs; + CPU_ZERO(&cs); + sched_getaffinity(0, sizeof(cs), &cs); + + size_t count = 0; + for (int i = 0; i < 32; i++){ + if (CPU_ISSET(i, &cs)) + count++; + } + return count; +#else + return 1; +#endif +} +/** + * @brief sa_server_init Init server module + * @arg a_threads_count number of events processor workers in parallel threads + * @return Zero if ok others if no + */ +int dap_events_init(size_t a_threads_count,size_t conn_timeout) +{ + s_threads_count = a_threads_count?a_threads_count: s_get_cpu_count(); + + if(conn_timeout)s_connection_timeout=conn_timeout; + + s_workers = (dap_worker_t *) calloc(1,sizeof(dap_worker_t)*s_threads_count ); + s_threads = (dap_thread_t *) calloc(1,sizeof(dap_thread_t)*s_threads_count ); + + if(dap_events_socket_init() != 0 ) + { + log_it(L_CRITICAL, "Can't init client submodule"); + return -1; + } + + s_dap_events_sockets = malloc(sizeof(dap_events_socket_info_t *) * s_threads_count ); + for(int i = 0; i < s_threads_count; i++) + s_dap_events_sockets[i] = NULL; // i == index == thread number + + // *open_connection_info = malloc(sizeof(open_connection_info) * my_config.threads_cnt); + log_it(L_NOTICE,"Initialized socket server module"); + signal(SIGPIPE, SIG_IGN); + return 0; +} + +/** + * @brief sa_server_deinit Deinit server module + */ +void dap_events_deinit() +{ + dap_events_socket_deinit(); +} + + + +/** + * @brief server_new Creates new empty instance of server_t + * @return New instance + */ +dap_events_t * dap_events_new() +{ + dap_events_t* ret=(dap_events_t*) calloc(1,sizeof(dap_events_t)); + pthread_rwlock_init(&ret->sockets_rwlock,NULL); + pthread_rwlock_init(&ret->servers_rwlock,NULL); + + return ret; +} + +/** + * @brief server_delete Delete event processor instance + * @param sh Pointer to the server instance + */ +void dap_events_delete(dap_events_t * a_events) +{ + dap_events_socket_t * cur, * tmp; + + if (a_events) + { + HASH_ITER(hh,a_events->sockets,cur,tmp) + dap_events_socket_delete(cur,false); + + if (a_events->_inheritor) + free(a_events->_inheritor); + pthread_rwlock_destroy(&a_events->sockets_rwlock); + pthread_rwlock_destroy(&a_events->servers_rwlock); + free(a_events); + } +} + +/** + * @brief dap_events_socket_info_remove + * @param cl + * @param n_thread + * @return + */ +static bool dap_events_socket_info_remove(dap_events_socket_t* cl, uint8_t n_thread) +{ + if( n_thread >= s_threads_count ){ + log_it(L_WARNING, "Number thread %u not exists. remove client from list error", n_thread); + return false; + } + dap_events_socket_info_t *el, *tmp; + + LL_FOREACH_SAFE(s_dap_events_sockets[n_thread], el, tmp) + { + if( el->es == cl ) + { + LL_DELETE(s_dap_events_sockets[n_thread], el); + log_it(L_DEBUG, "Removed event socket from the thread's list"); + return true; + } + } + + log_it(L_WARNING, "Try remove client from list but not find." + " Thread: %d client socket %d", n_thread, cl->socket); + return false; +} + +/** + * @brief s_socket_info_all_check_activity + * @param n_thread + * @param sh + */ +static void s_socket_info_all_check_activity(uint8_t n_thread, dap_events_t *sh) +{ +// log_it(L_INFO, "========================================================= Socket check"); +// return; /// TODO debug and make thats shit working, bitch! + dap_events_socket_info_t *ei; + LL_FOREACH(s_dap_events_sockets[n_thread], ei){ + if( ei->es->is_pingable ){ + if(( time(NULL) - ei->es->last_ping_request ) > (time_t) s_connection_timeout ){ // conn timeout + log_it(L_INFO, "Connection on socket %d close by timeout", ei->es->socket); + + dap_events_socket_t * cur = dap_events_socket_find(ei->es->socket, sh); + if ( cur != NULL ){ + dap_events_socket_remove_and_delete( cur ); + } else { + log_it(L_ERROR, "Trying close socket but not find on client hash!"); + close(ei->es->socket); + } + } else if(( time(NULL) - ei->es->last_ping_request ) > (time_t) s_connection_timeout/3 ){ + log_it(L_INFO, "Connection on socket %d last chance to remain alive", ei->es->socket); + + } + } + } +} + +/** + * @brief thread_worker_function + * @param arg + * @return + */ +static void* thread_worker_function(void *arg) +{ + dap_worker_t* w = (dap_worker_t*) arg; + dap_events_socket_t* cur; + +#ifndef NO_POSIX_SHED + cpu_set_t mask; + CPU_ZERO(&mask); + CPU_SET(*(int*)arg, &mask); + + if ( pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &mask) != 0 ) + { + log_it(L_CRITICAL, "Error pthread_setaffinity_np() You really have %d or more core in CPU?", *(int*)arg); + abort(); + } +#endif + + log_it(L_INFO, "Worker %d started, epoll fd %d", w->number_thread, w->epoll_fd); + struct epoll_event ev, events[MAX_EPOLL_EVENTS]; + memzero(&ev,sizeof(ev)); + memzero(&events,sizeof(events)); +#ifndef NO_TIMER + int timerfd; + if ((timerfd = timerfd_create(CLOCK_MONOTONIC, 0)) < 0) + { + log_it(L_CRITICAL, "Failed to create timer"); + abort(); + } +#endif + + struct itimerspec timerValue; + memzero(&timerValue, sizeof(timerValue)); + + timerValue.it_value.tv_sec = 10; + timerValue.it_value.tv_nsec = 0; + timerValue.it_interval.tv_sec = s_connection_timeout / 2; + timerValue.it_interval.tv_nsec = 0; + + +#ifndef NO_TIMER + ev.events = EPOLLIN; + ev.data.fd = timerfd; + epoll_ctl(w->epoll_fd, EPOLL_CTL_ADD, timerfd, &ev); + + if (timerfd_settime(timerfd, 0, &timerValue, NULL) < 0) { + log_it(L_CRITICAL, "Could not start timer"); + abort(); + } +#endif + + size_t total_sent; int bytes_sent; + while(1) { + int selected_sockets = epoll_wait(w->epoll_fd, events, MAX_EPOLL_EVENTS, -1); + // log_it(INFO, "Epoll pwait trigered worker %d", w->number_worker); + for(int n = 0; n < selected_sockets; n++) { +#ifndef NO_TIMER + if (events[n].data.fd == timerfd) { + static uint64_t val; + /* if we not reading data from socket, he triggered again */ + read(events[n].data.fd, &val, 8); + s_socket_info_all_check_activity(w->number_thread, w->events); + } else +#endif + if ( ( cur = dap_events_socket_find(events[n].data.fd, w->events) ) != NULL ) { + if( events[n].events & EPOLLERR ) { + log_it(L_ERROR,"Socket error: %s",strerror(errno)); + cur->signal_close=true; + cur->callbacks->error_callback(cur,NULL); // Call callback to process error event + } else { + if( events[n].events & EPOLLIN ) { + //log_it(DEBUG,"Comes connection in active read set"); + if(cur->buf_in_size == sizeof(cur->buf_in)) + { + log_it(L_WARNING, "Buffer is full when there is smth to read. Its dropped!"); + cur->buf_in_size=0; + } + + int bytes_read = recv(cur->socket, + cur->buf_in + cur->buf_in_size, + sizeof(cur->buf_in)-cur->buf_in_size, 0); + + if(bytes_read > 0) { + cur->buf_in_size += bytes_read; + //log_it(DEBUG, "Received %d bytes", bytes_read); + cur->callbacks->read_callback(cur, NULL); // Call callback to process read event. At the end of callback buf_in_size should be zero if everything was read well + + } else if(bytes_read < 0) { + log_it(L_ERROR,"Some error occured in recv() function: %s",strerror(errno)); + cur->signal_close = true; + } else if (bytes_read == 0) { + log_it(L_INFO, "Client socket disconnected"); + cur->signal_close = true; + } + } + + // Socket is ready to write + if( ( events[n].events & EPOLLOUT || cur->_ready_to_write ) + && ( !cur->signal_close ) ) { + ///log_it(DEBUG, "Main loop output: %u bytes to send",sa_cur->buf_out_size); + cur->callbacks->write_callback(cur, NULL); // Call callback to process write event + + if(cur->_ready_to_write) + { + cur->buf_out[cur->buf_out_size]='\0'; + static const uint32_t buf_out_zero_count_max = 20; + if(cur->buf_out_size == 0) + { + log_it(L_WARNING, "Output: nothing to send. Why we are in write socket set?"); + cur->buf_out_zero_count++; + if(cur->buf_out_zero_count > buf_out_zero_count_max) // How many time buf_out on write event could be empty + { + log_it(L_ERROR, "Output: nothing to send %u times, remove socket from the write set",buf_out_zero_count_max); + dap_events_socket_set_writable(cur,false); + } + } + else + cur->buf_out_zero_count=0; + } + + for(total_sent = 0; total_sent < cur->buf_out_size;) + { // If after callback there is smth to send - we do it + bytes_sent = send(cur->socket, + cur->buf_out + total_sent, + cur->buf_out_size - total_sent, + MSG_DONTWAIT | MSG_NOSIGNAL ); + if(bytes_sent < 0) + { + log_it(L_ERROR,"Some error occured in send() function"); + break; + } + total_sent+= bytes_sent; + //log_it(L_DEBUG, "Output: %u from %u bytes are sent ", total_sent,sa_cur->buf_out_size); + } + + //log_it(L_DEBUG,"Output: sent %u bytes",total_sent); + cur->buf_out_size = 0; + } + } + + if(cur->signal_close) + { + log_it(L_INFO, "Got signal to close from the client %s", cur->hostaddr); + dap_events_socket_remove_and_delete(cur); + } + } else { + log_it(L_ERROR,"Socket %d is not present in epoll set", events[n].data.fd); + ev.events = EPOLLIN | EPOLLOUT | EPOLLERR; + ev.data.fd=events[n].data.fd; + + if (epoll_ctl(w->epoll_fd, EPOLL_CTL_DEL, events[n].data.fd, &ev) == -1) + log_it(L_ERROR,"Can't remove not presented socket from the epoll_fd"); + } + } + } + return NULL; +} + +/** + * @brief dap_worker_get_min + * @return + */ +dap_worker_t * dap_worker_get_min() +{ + return &s_workers[dap_worker_get_index_min()]; +} + +/** + * @brief dap_worker_get_index_min + * @return + */ +uint8_t dap_worker_get_index_min() +{ + uint8_t min = 0; + uint8_t i; + for(i = 1; i < s_threads_count; i++) + { + if ( s_workers[min].event_sockets_count > s_workers[i].event_sockets_count ) + min = i; + } + + return min; +} + +/** + * @brief dap_worker_print_all + */ +void dap_worker_print_all() +{ + uint8_t i; + for(i = 0; i < s_threads_count; i++) + { + log_it(L_INFO, "Worker: %d, count open connections: %d", + s_workers[i].number_thread, s_workers[i].event_sockets_count); + } +} + +/** + * @brief sa_server_loop Main server loop + * @param sh Server instance + * @return Zero if ok others if not + */ +int dap_events_start(dap_events_t * a_events) +{ + int i; + for(i = 0; i < s_threads_count; i++) + { + if ( (s_workers[i].epoll_fd = epoll_create(MAX_EPOLL_EVENTS)) == -1 ) + { + log_it(L_CRITICAL, "Error create epoll fd"); + return -1; + } + s_workers[i].event_sockets_count = 0; + s_workers[i].number_thread = i; + s_workers[i].events = a_events; + pthread_mutex_init(&s_workers[i].locker_on_count, NULL); + pthread_create(&s_threads[i].tid, NULL, thread_worker_function, &s_workers[i]); + } + + return 0; +} + +/** + * @brief dap_events_wait + * @param sh + * @return + */ +int dap_events_wait(dap_events_t * sh) +{ + (void) sh; + int i; + for(i = 0; i < s_threads_count; i++){ + void * ret; + pthread_join(s_threads[i].tid,&ret); + } + return 0; +} + + + +/** + * @brief dap_worker_add_events_socket + * @param a_worker + * @param a_events_socket + */ +void dap_worker_add_events_socket(dap_events_socket_t * a_events_socket) +{ + struct epoll_event ev = {0}; + dap_worker_t *l_worker =dap_worker_get_min(); + + ev.events = EPOLLIN | EPOLLERR | EPOLLOUT; + ev.data.fd = a_events_socket->socket; + + + pthread_mutex_lock(&l_worker->locker_on_count); + l_worker->event_sockets_count++; + pthread_mutex_unlock(&l_worker->locker_on_count); + + dap_events_socket_info_t * l_es_info = DAP_NEW_Z(dap_events_socket_info_t); + l_es_info->es = a_events_socket; + a_events_socket->dap_worker = l_worker; + LL_APPEND(s_dap_events_sockets[l_worker->number_thread], l_es_info); + + if ( epoll_ctl(l_worker->epoll_fd, EPOLL_CTL_ADD, a_events_socket->socket, &ev) == 1 ) + log_it(L_CRITICAL,"Can't add event socket's handler to epoll_fd"); + +} + +/** + * @brief dap_events_socket_delete + * @param a_es + */ +void dap_events_socket_remove_and_delete(dap_events_socket_t* a_es) +{ + + struct epoll_event ev={0}; + ev.events = EPOLLIN | EPOLLOUT | EPOLLERR; + ev.data.fd=a_es->socket; + + if (epoll_ctl(a_es->dap_worker->epoll_fd, EPOLL_CTL_DEL, a_es->socket, &ev) == -1) + log_it(L_ERROR,"Can't remove event socket's handler from the epoll_fd"); + else + log_it(L_DEBUG,"Removed epoll's event from dap_worker #%u",a_es->dap_worker->number_thread); + + pthread_mutex_lock(&a_es->dap_worker->locker_on_count); + a_es->dap_worker->event_sockets_count--; + pthread_mutex_unlock(&a_es->dap_worker->locker_on_count); + + dap_events_socket_info_remove(a_es, a_es->dap_worker->number_thread); + dap_events_socket_delete(a_es,true); + +} + +/** + * @brief dap_events__thread_wake_up + * @param th + */ +void dap_events_thread_wake_up(dap_thread_t * th) +{ + (void) th; + + //pthread_kill(th->tid,SIGUSR1); +} diff --git a/dap_events.h b/dap_events.h new file mode 100755 index 0000000000000000000000000000000000000000..96e13b9903322d6436ca5f72692177de6f71af4c --- /dev/null +++ b/dap_events.h @@ -0,0 +1,79 @@ +/* + * Authors: + * Dmitriy A. Gearasimov <gerasimov.dmitriy@demlabs.net> + * DeM Labs Inc. https://demlabs.net + * Kelvin Project https://github.com/kelvinblockchain + * Copyright (c) 2017-2019 + * All rights reserved. + + This file is part of DAP (Deus Applications Prototypes) the open source project + + DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + DAP is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. +*/ +#pragma once + +#include <netinet/in.h> + +#include <stdint.h> +#include <pthread.h> +#include "uthash.h" + +#include "dap_events_socket.h" + +struct dap_events; + +typedef void (*dap_events_callback_t) (struct dap_events *,void * arg); // Callback for specific server's operations + +typedef struct dap_thread{ + pthread_t tid; +} dap_thread_t; + +struct dap_worker; +typedef struct dap_events{ + dap_events_socket_t * sockets; // Hashmap of event sockets + pthread_rwlock_t sockets_rwlock; + + void * _inheritor; // Pointer to the internal data, HTTP for example + + dap_thread_t proc_thread; + pthread_rwlock_t servers_rwlock; +} dap_events_t; + +typedef struct dap_worker +{ + int event_sockets_count; + int epoll_fd; + uint8_t number_thread; + pthread_mutex_t locker_on_count; + dap_events_t * events; +} dap_worker_t; + + +int dap_events_init(size_t a_threads_count,size_t conn_t); // Init server module +void dap_events_deinit(); // Deinit server module + +void dap_events_thread_wake_up(dap_thread_t * th); +dap_events_t* dap_events_new(); +void dap_events_delete(dap_events_t * sh); +void dap_events_socket_remove_and_delete(dap_events_socket_t* a_es); + +int dap_events_start(dap_events_t * sh); +int dap_events_wait(dap_events_t * sh); + +uint8_t dap_worker_get_index_min(); +dap_worker_t * dap_worker_get_min(); +void dap_worker_add_events_socket(dap_events_socket_t * a_events_socket); +void dap_worker_print_all(); + + diff --git a/dap_events_socket.c b/dap_events_socket.c new file mode 100755 index 0000000000000000000000000000000000000000..da97454dbe3468ab62dcb19aebb81e4d163ba7ec --- /dev/null +++ b/dap_events_socket.c @@ -0,0 +1,303 @@ +/* + * Authors: + * Dmitriy A. Gearasimov <gerasimov.dmitriy@demlabs.net> + * DeM Labs Inc. https://demlabs.net + * Kelvin Project https://github.com/kelvinblockchain + * Copyright (c) 2017-2019 + * All rights reserved. + + This file is part of DAP (Deus Applications Prototypes) the open source project + + DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + DAP is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include <sys/epoll.h> +#include <unistd.h> +#include <fcntl.h> +#include <stdlib.h> +#include <stdio.h> +#include <stdarg.h> +#include <unistd.h> +#include <string.h> +#include <assert.h> +#include "dap_common.h" +#include "dap_events.h" + +#include "dap_events_socket.h" + +#define LOG_TAG "dap_events_socket" + +/** + * @brief dap_events_socket_init Init clients module + * @return Zero if ok others if no + */ +int dap_events_socket_init() +{ + log_it(L_NOTICE,"Initialized socket client module"); + return 0; +} + +/** + * @brief dap_events_socket_deinit Deinit clients module + */ +void dap_events_socket_deinit() +{ + +} + + +/** + * @brief dap_events_socket_wrap + * @param a_events + * @param w + * @param s + * @param a_callbacks + * @return + */ +dap_events_socket_t * dap_events_socket_wrap_no_add( dap_events_t * a_events, + int a_sock, dap_events_socket_callbacks_t * a_callbacks) +{ + assert(a_events); + assert(a_callbacks); + log_it(L_DEBUG,"Dap event socket wrapped around %d sock", a_sock); + dap_events_socket_t * ret = DAP_NEW_Z(dap_events_socket_t); + ret->socket = a_sock; + ret->events = a_events; + ret->callbacks = a_callbacks; + ret->_ready_to_read=true; + return ret; +} + +/** + * @brief dap_events_socket_create_after + * @param a_es + */ +void dap_events_socket_create_after(dap_events_socket_t * a_es) +{ + if(a_es->callbacks->new_callback) + a_es->callbacks->new_callback(a_es,NULL); // Init internal structure + + pthread_rwlock_wrlock(&a_es->events->sockets_rwlock); + a_es->last_ping_request = time(NULL); + HASH_ADD_INT(a_es->events->sockets, socket, a_es); + pthread_rwlock_unlock(&a_es->events->sockets_rwlock); + + dap_worker_add_events_socket(a_es); +} + +/** + * @brief dap_events_socket_wrap + * @param a_events + * @param w + * @param s + * @param a_callbacks + * @return + */ +dap_events_socket_t * dap_events_socket_wrap2(dap_server_t * a_server, struct dap_events * a_events, + int a_sock, dap_events_socket_callbacks_t * a_callbacks) +{ + assert(a_events); + assert(a_callbacks); + assert(a_server); + log_it(L_DEBUG,"Sap event socket wrapped around %d sock", a_sock); + dap_events_socket_t * ret = DAP_NEW_Z(dap_events_socket_t); + ret->socket = a_sock; + ret->events = a_events; + ret->callbacks = a_callbacks; + + ret->_ready_to_read=true; + ret->is_pingable = true; + + pthread_rwlock_wrlock(&a_events->sockets_rwlock); + ret->last_ping_request = time(NULL); + HASH_ADD_INT(a_events->sockets, socket, ret); + pthread_rwlock_unlock(&a_events->sockets_rwlock); + if(a_callbacks->new_callback) + a_callbacks->new_callback(ret,NULL); // Init internal structure + return ret; +} + +/** + * @brief dap_events_socket_find + * @param sock + * @param sh + * @return + */ +dap_events_socket_t * dap_events_socket_find(int sock, struct dap_events * a_events) +{ + dap_events_socket_t * ret = NULL; + pthread_rwlock_rdlock(&a_events->sockets_rwlock); + HASH_FIND_INT(a_events->sockets, &sock, ret); + pthread_rwlock_unlock(&a_events->sockets_rwlock); + return ret; +} + +/** + * @brief dap_events_socket_ready_to_read + * @param sc + * @param isReady + */ +void dap_events_socket_set_readable(dap_events_socket_t * sc,bool is_ready) +{ + if(is_ready != sc->_ready_to_read){ + struct epoll_event ev={0}; + ev.events = EPOLLERR; + sc->_ready_to_read=is_ready; + if(sc->_ready_to_read) + ev.events |= EPOLLIN; + if(sc->_ready_to_write) + ev.events |= EPOLLOUT; + + ev.data.fd=sc->socket; + if (epoll_ctl(sc->dap_worker->epoll_fd, EPOLL_CTL_MOD, sc->socket, &ev) == -1) { + log_it(L_ERROR,"Can't update read client socket state in the epoll_fd"); + }else + dap_events_thread_wake_up(&sc->events->proc_thread); + } +} + +/** + * @brief dap_events_socket_ready_to_write + * @param sc + * @param isReady + */ +void dap_events_socket_set_writable(dap_events_socket_t * sc,bool is_ready) +{ + if(is_ready != sc->_ready_to_write){ + struct epoll_event ev={0}; + ev.events = EPOLLERR ; + sc->_ready_to_write=is_ready; + if(sc->_ready_to_read) + ev.events |= EPOLLIN; + if(sc->_ready_to_write) + ev.events |= EPOLLOUT; + ev.data.fd=sc->socket; + if (epoll_ctl(sc->dap_worker->epoll_fd, EPOLL_CTL_MOD, sc->socket, &ev) == -1) { + log_it(L_ERROR,"Can't update write client socket state in the epoll_fd"); + }else + dap_events_thread_wake_up(&sc->events->proc_thread); + } + +} + + +/** + * @brief dap_events_socket_remove Removes the client from the list + * @param sc Connection instance + */ +void dap_events_socket_delete(dap_events_socket_t *a_es, bool preserve_inheritor) +{ + if (a_es){ + log_it(L_DEBUG, "es is going to be removed from the lists and free the memory (0x%016X)", a_es ); + pthread_rwlock_wrlock(&a_es->events->sockets_rwlock); + HASH_DEL(a_es->events->sockets, a_es); + pthread_rwlock_unlock(&a_es->events->sockets_rwlock); + log_it(L_DEBUG, "dap_events_socket wrapped around %d socket is removed", a_es->socket); + + if(a_es->callbacks->delete_callback) + a_es->callbacks->delete_callback(a_es, NULL); // Init internal structure + + if(a_es->_inheritor && !preserve_inheritor) +// if(a_es->_inheritor) + free(a_es->_inheritor); + + if(a_es->socket){ + close(a_es->socket); + } + free(a_es); + } +} + +/** + * @brief dap_events_socket_write Write data to the client + * @param sc Conn instance + * @param data Pointer to data + * @param data_size Size of data to write + * @return Number of bytes that were placed into the buffer + */ +size_t dap_events_socket_write(dap_events_socket_t *sc, const void * data, size_t data_size) +{ + data_size = ((sc->buf_out_size+data_size)<(sizeof(sc->buf_out)))?data_size:(sizeof(sc->buf_out)-sc->buf_out_size ); + memcpy(sc->buf_out+sc->buf_out_size,data,data_size); + sc->buf_out_size+=data_size; + return data_size; +} + +/** + * @brief dap_events_socket_write_f Write formatted text to the client + * @param sc Conn instance + * @param format Format + * @return Number of bytes that were placed into the buffer + */ +size_t dap_events_socket_write_f(dap_events_socket_t *sc, const char * format,...) +{ + size_t max_data_size = sizeof(sc->buf_out)-sc->buf_out_size; + va_list ap; + va_start(ap,format); + int ret=vsnprintf((char*) sc->buf_out+sc->buf_out_size,max_data_size,format,ap); + va_end(ap); + if(ret>0){ + sc->buf_out_size+=ret; + return ret; + }else{ + log_it(L_ERROR,"Can't write out formatted data '%s'",format); + return 0; + } +} + +/** + * @brief dap_events_socket_read Read data from input buffer + * @param sc Conn instasnce + * @param data Pointer to memory where to store the data + * @param data_size Size of data to read + * @return Actual bytes number that were read + */ +size_t dap_events_socket_read(dap_events_socket_t *sc, void * data, size_t data_size) +{ + if(data_size<sc->buf_in_size){ + memcpy(data,sc->buf_in,data_size); + memmove(data,sc->buf_in+data_size,sc->buf_in_size-data_size); + }else{ + if(data_size>sc->buf_in_size) + data_size=sc->buf_in_size; + memcpy(data,sc->buf_in,data_size); + } + sc->buf_in_size-=data_size; + return data_size; +} + + +/** + * @brief dap_events_socket_shrink_client_buf_in Shrink input buffer (shift it left) + * @param cl Client instance + * @param shrink_size Size on wich we shrink the buffer with shifting it left + */ +void dap_events_socket_shrink_buf_in(dap_events_socket_t * cl, size_t shrink_size) +{ + if((shrink_size==0)||(cl->buf_in_size==0) ){ + return; + }else if(cl->buf_in_size>shrink_size){ + size_t buf_size=cl->buf_in_size-shrink_size; + void * buf = malloc(buf_size); + memcpy(buf,cl->buf_in+ shrink_size,buf_size ); + memcpy(cl->buf_in,buf,buf_size); + cl->buf_in_size=buf_size; + if (buf) + free(buf); + }else{ + //log_it(WARNING,"Shrinking size of input buffer on amount bigger than actual buffer's size"); + cl->buf_in_size=0; + } + +} diff --git a/dap_events_socket.h b/dap_events_socket.h new file mode 100755 index 0000000000000000000000000000000000000000..09e8cb3f323ec5239460a6053f5dada417a903c4 --- /dev/null +++ b/dap_events_socket.h @@ -0,0 +1,106 @@ +/* + * Authors: + * Dmitriy A. Gearasimov <gerasimov.dmitriy@demlabs.net> + * DeM Labs Inc. https://demlabs.net + * Kelvin Project https://github.com/kelvinblockchain + * Copyright (c) 2017-2019 + * All rights reserved. + + This file is part of DAP (Deus Applications Prototypes) the open source project + + DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + DAP is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. +*/ +#pragma once +#include <stdint.h> +#include <stddef.h> +#include <stdbool.h> +#include "uthash.h" +struct dap_events; +struct dap_events_socket; +struct dap_worker; +typedef struct dap_server dap_server_t; +typedef void (*dap_events_socket_callback_t) (struct dap_events_socket *,void * arg); // Callback for specific client operations +typedef struct dap_events_socket_callbacks{ + dap_events_socket_callback_t new_callback; // Create new client callback + dap_events_socket_callback_t delete_callback; // Delete client callback + dap_events_socket_callback_t read_callback; // Read function + dap_events_socket_callback_t write_callback; // Write function + dap_events_socket_callback_t error_callback; // Error processing function + +} dap_events_socket_callbacks_t; + + +#define DAP_EVENTS_SOCKET_BUF 100000 + +typedef struct dap_events_socket{ + int socket; + bool signal_close; + + bool _ready_to_write; + bool _ready_to_read; + + uint32_t buf_out_zero_count; + union{ + uint8_t buf_in[DAP_EVENTS_SOCKET_BUF+1]; // Internal buffer for input data + char buf_in_str[DAP_EVENTS_SOCKET_BUF+1]; + }; + size_t buf_in_size; // size of data that is in the input buffer + + uint8_t buf_out[DAP_EVENTS_SOCKET_BUF+1]; // Internal buffer for output data + + char hostaddr[1024]; // Address + char service[128]; + + size_t buf_out_size; // size of data that is in the output buffer + + struct dap_events * events; + + struct dap_worker* dap_worker; + dap_events_socket_callbacks_t *callbacks; + + time_t time_connection; + time_t last_ping_request; + bool is_pingable; + + UT_hash_handle hh; + + void * _inheritor; // Inheritor data to specific client type, usualy states for state machine +} dap_events_socket_t; // Node of bidirectional list of clients + + + +int dap_events_socket_init(); // Init clients module +void dap_events_socket_deinit(); // Deinit clients module + +void dap_events_socket_create_after(dap_events_socket_t * a_es); + +dap_events_socket_t * dap_events_socket_wrap_no_add(struct dap_events * a_events, + int s, dap_events_socket_callbacks_t * a_callbacks); // Create new client and add it to the list + + +dap_events_socket_t * dap_events_socket_find(int sock, struct dap_events * sh); // Find client by socket + +bool dap_events_socket_is_ready_to_read(dap_events_socket_t * sc); +bool dap_events_socket_is_ready_to_write(dap_events_socket_t * sc); +void dap_events_socket_set_readable(dap_events_socket_t * sc,bool is_ready); +void dap_events_socket_set_writable(dap_events_socket_t * sc,bool is_ready); + +size_t dap_events_socket_write(dap_events_socket_t *sc, const void * data, size_t data_size); +size_t dap_events_socket_write_f(dap_events_socket_t *sc, const char * format,...); +size_t dap_events_socket_read(dap_events_socket_t *sc, void * data, size_t data_size); + +void dap_events_socket_delete(dap_events_socket_t *sc,bool preserve_inheritor); // Removes the client from the list + +void dap_events_socket_shrink_buf_in(dap_events_socket_t * cl, size_t shrink_size); +