From 33a4bf014974e810478792625ab824082e46f7d6 Mon Sep 17 00:00:00 2001 From: "Dmitriy A. Gerasimov" <dmitriy.gerasimov@demlabs.net> Date: Mon, 5 Oct 2020 22:17:07 +0700 Subject: [PATCH] [+] New DAP_SOCK_CONNECTING esocket flags and .callback_connected callback for non-blocking client socket [*] Reworked dap_client for non-blocking client [+] Added non-blocking flags for the rest descriptors (everything non-blocking now to prevent I/O reactor locks) [+] Published dap_events_socket_worker_poll_update_unsafe() that was static before --- CMakeLists.txt | 2 +- dap-sdk/net/client/dap_client.c | 4 +- dap-sdk/net/client/dap_client_pvt.c | 212 +++++++++++-------- dap-sdk/net/client/include/dap_client.h | 1 + dap-sdk/net/client/include/dap_client_pvt.h | 16 +- dap-sdk/net/core/dap_events_socket.c | 15 +- dap-sdk/net/core/dap_timerfd.c | 2 +- dap-sdk/net/core/dap_worker.c | 175 +++++++++------ dap-sdk/net/core/include/dap_events_socket.h | 16 +- modules/net/dap_chain_net.c | 18 +- 10 files changed, 278 insertions(+), 183 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 07e48a064f..392122de39 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,7 +2,7 @@ project(cellframe-sdk C) cmake_minimum_required(VERSION 2.8) set(CMAKE_C_STANDARD 11) -set(CELLFRAME_SDK_NATIVE_VERSION "2.6-10") +set(CELLFRAME_SDK_NATIVE_VERSION "2.6-11") add_definitions ("-DCELLFRAME_SDK_VERSION=\"${CELLFRAME_SDK_NATIVE_VERSION}\"") set(DAPSDK_MODULES "") diff --git a/dap-sdk/net/client/dap_client.c b/dap-sdk/net/client/dap_client.c index faab826f00..3f03bcb08d 100644 --- a/dap-sdk/net/client/dap_client.c +++ b/dap-sdk/net/client/dap_client.c @@ -218,7 +218,7 @@ void dap_client_delete(dap_client_t * a_client) return; pthread_mutex_lock(&a_client->mutex); - dap_client_pvt_delete(DAP_CLIENT_PVT(a_client)); + dap_client_pvt_delete_n_wait(DAP_CLIENT_PVT(a_client)); pthread_mutex_unlock(&a_client->mutex); pthread_mutex_destroy(&a_client->mutex); DAP_DELETE(a_client); @@ -244,13 +244,11 @@ void dap_client_go_stage(dap_client_t * a_client, dap_client_stage_t a_stage_tar assert(l_client_internal); - pthread_mutex_lock( &l_client_internal->stage_mutex); l_client_internal->stage_target = a_stage_target; l_client_internal->stage_target_done_callback = a_stage_end_callback; dap_client_stage_t l_cur_stage = l_client_internal->stage; dap_client_stage_status_t l_cur_stage_status= l_client_internal->stage_status; - pthread_mutex_unlock( &l_client_internal->stage_mutex); if(a_stage_target != l_cur_stage ){ // Going to stages downstairs diff --git a/dap-sdk/net/client/dap_client_pvt.c b/dap-sdk/net/client/dap_client_pvt.c index f3627a7042..de0202c14b 100644 --- a/dap-sdk/net/client/dap_client_pvt.c +++ b/dap-sdk/net/client/dap_client_pvt.c @@ -1,26 +1,25 @@ /* * Authors: * Dmitriy A. Gearasimov <gerasimov.dmitriy@demlabs.net> - * DeM Labs Inc. https://demlabs.net - * CellFrame SDK https://cellframe.net - * Copyright (c) 2017-2019 + * DeM Labs Ltd. https://demlabs.net + * Copyright (c) 2017-2020 * All rights reserved. - This file is part of DAP (Deus Applications Prototypes) the open source project + This file is part of DAP SDK the open source project - DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation, either version 3 of the License, or - (at your option) any later version. + DAP SDK is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. - DAP is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. + DAP SDK is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. - You should have received a copy of the GNU General Public License - along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. - */ + You should have received a copy of the GNU General Public License + along with any DAP SDK based project. If not, see <http://www.gnu.org/licenses/>. +*/ #include <stdlib.h> #include <stdio.h> @@ -92,15 +91,19 @@ void m_stage_stream_streaming(dap_client_t * a_client, void* arg); // STREAM stage callbacks void m_stream_response(dap_client_t *, void *, size_t); void m_stream_error(dap_client_t *, int); - void m_request_response(void * a_response, size_t a_response_size, void * a_obj); void m_request_error(int, void *); +// Stream connection callback +static void s_stream_connected(dap_client_pvt_t * a_client_pvt); +static void s_stream_connect_error(dap_client_pvt_t * a_client_pvt, int a_err); + // stream callbacks -void m_es_stream_delete(dap_events_socket_t * a_es, void * arg); -void m_es_stream_read(dap_events_socket_t * a_es, void * arg); -void m_es_stream_write(dap_events_socket_t * a_es, void * arg); -void m_es_stream_error(dap_events_socket_t * a_es, int a_arg); +static void s_stream_es_callback_connected(dap_events_socket_t * a_es); +static void s_stream_es_callback_delete(dap_events_socket_t * a_es, void * arg); +static void s_stream_es_callback_read(dap_events_socket_t * a_es, void * arg); +static void s_stream_es_callback_write(dap_events_socket_t * a_es, void * arg); +static void s_stream_es_callback_error(dap_events_socket_t * a_es, int a_arg); /** * @brief dap_client_internal_init @@ -130,7 +133,6 @@ void dap_client_pvt_new(dap_client_pvt_t * a_client_internal) a_client_internal->events = dap_events_get_default(); pthread_mutex_init( &a_client_internal->disconnected_mutex, NULL); pthread_cond_init( &a_client_internal->disconnected_cond, NULL); - pthread_mutex_init( &a_client_internal->stage_mutex, NULL); // add to list dap_client_pvt_hh_add(a_client_internal); } @@ -143,8 +145,10 @@ void dap_client_pvt_new(dap_client_pvt_t * a_client_internal) static void s_client_pvt_disconnected(dap_client_t * a_client, void * a_arg ) { (void) a_arg; + // To be sure thats cond waiter is waiting and unlocked mutex pthread_mutex_lock(&DAP_CLIENT_PVT(a_client)->disconnected_mutex); pthread_mutex_unlock(&DAP_CLIENT_PVT(a_client)->disconnected_mutex); + pthread_cond_broadcast(&DAP_CLIENT_PVT(a_client)->disconnected_cond); } @@ -153,7 +157,7 @@ static void s_client_pvt_disconnected(dap_client_t * a_client, void * a_arg ) * @param a_client * @return */ -int dap_client_pvt_disconnect_all(dap_client_pvt_t *a_client_pvt) +int dap_client_pvt_disconnect_all_n_wait(dap_client_pvt_t *a_client_pvt) { //dap_client_pvt_t *a_client_pvt = (a_client) ? DAP_CLIENT_PVT(a_client) : NULL; if(!a_client_pvt) @@ -170,7 +174,7 @@ int dap_client_pvt_disconnect_all(dap_client_pvt_t *a_client_pvt) * @brief dap_client_pvt_delete * @param a_client_pvt */ -static void dap_client_pvt_delete_in(dap_client_pvt_t * a_client_pvt) +void dap_client_pvt_delete_n_wait(dap_client_pvt_t * a_client_pvt) { if(!a_client_pvt) return; @@ -180,7 +184,7 @@ static void dap_client_pvt_delete_in(dap_client_pvt_t * a_client_pvt) return; } - dap_client_pvt_disconnect_all(a_client_pvt); + dap_client_pvt_disconnect_all_n_wait(a_client_pvt); log_it(L_INFO, "dap_client_pvt_delete 0x%x", a_client_pvt); @@ -206,32 +210,43 @@ static void dap_client_pvt_delete_in(dap_client_pvt_t * a_client_pvt) pthread_mutex_destroy( &a_client_pvt->disconnected_mutex); pthread_cond_destroy( &a_client_pvt->disconnected_cond); - pthread_mutex_destroy(&a_client_pvt->stage_mutex); - //a_client_pvt->client = NULL; - // DAP_DELETE(a_client_pvt); } -/* -static void* dap_client_pvt_delete_proc(void *a_arg) +/** + * @brief s_stream_connected + * @param a_client_pvt + */ +static void s_stream_connected(dap_client_pvt_t * a_client_pvt) { - dap_client_pvt_t * l_client_pvt = (dap_client_pvt_t*)a_arg; - // wait for release l_client_pvt - //dap_client_pvt_wait_unref(l_client_pvt, 20000000); - - //dap_client_reset(l_client_pvt->client); - dap_client_pvt_delete_in(l_client_pvt); - //DAP_DELETE(l_client_pvt->client); - pthread_exit(0); -}*/ + log_it(L_INFO, "Remote address connected (%s:%u) with sock_id %d (assign on worker #%u)", a_client_pvt->uplink_addr, + a_client_pvt->uplink_port, a_client_pvt->stream_socket, a_client_pvt->stream_worker->worker->id); + a_client_pvt->stage_status = STAGE_STATUS_DONE; + s_stage_status_after(a_client_pvt); +} /** - * @brief dap_client_pvt_delete + * @brief s_stream_connect_error * @param a_client_pvt + * @param a_err */ -void dap_client_pvt_delete(dap_client_pvt_t * a_client_pvt) +static void s_stream_connect_error (dap_client_pvt_t * a_client_pvt, int a_err) { - //pthread_create(&l_thread, NULL, dap_client_pvt_delete_proc, a_client_pvt); - dap_client_pvt_delete_in(a_client_pvt); + char l_errbuf[128]; + l_errbuf[0]='\0'; + if (a_err) + strerror_r(a_err,l_errbuf,sizeof (l_errbuf)); + else + strncpy(l_errbuf,"Unknown Error",sizeof(l_errbuf)-1); + log_it(L_ERROR, "Remote address can't connected (%s:%u) with sock_id %d: \"%s\" (code %d)", a_client_pvt->uplink_addr, + a_client_pvt->uplink_port, l_errbuf, a_err); + dap_events_socket_remove_and_delete_mt(a_client_pvt->stream_worker->worker, a_client_pvt->stream_es); + //close(a_client_pvt->stream_socket); + a_client_pvt->stream_socket = 0; + a_client_pvt->stage_status = STAGE_STATUS_ERROR; + a_client_pvt->last_error = ERROR_STREAM_CONNECT ; + + s_stage_status_after(a_client_pvt); + } /** @@ -241,9 +256,12 @@ void dap_client_pvt_delete(dap_client_pvt_t * a_client_pvt) static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) { //bool l_is_unref = false; - switch (a_client_pvt->stage_status) { + dap_client_stage_status_t l_stage_status = a_client_pvt->stage_status; + dap_client_stage_t l_stage = a_client_pvt->stage; + + switch (l_stage_status) { case STAGE_STATUS_IN_PROGRESS: { - switch (a_client_pvt->stage) { + switch (l_stage) { case STAGE_ENC_INIT: { log_it(L_INFO, "Go to stage ENC: prepare the request"); a_client_pvt->session_key_open = dap_enc_key_new_generate(DAP_ENC_KEY_TYPE_MSRLN, NULL, 0, NULL, 0, 0); @@ -310,6 +328,8 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) log_it(L_INFO, "Go to stage STREAM_SESSION: process the state ops"); a_client_pvt->stream_socket = socket( PF_INET, SOCK_STREAM, 0); + fcntl( a_client_pvt->stream_socket, F_SETFL, O_NONBLOCK); + if (a_client_pvt->stream_socket == -1) { log_it(L_ERROR, "Error %d with socket create", errno); a_client_pvt->stage_status = STAGE_STATUS_ERROR; @@ -323,20 +343,23 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) setsockopt(a_client_pvt->stream_socket, SOL_SOCKET, SO_RCVBUF, (char *)&buffsize, &optsize ); } #else - int buffsize = 65536; + int buffsize = 65536*4; setsockopt(a_client_pvt->stream_socket, SOL_SOCKET, SO_SNDBUF, (const void *) &buffsize, sizeof(int)); setsockopt(a_client_pvt->stream_socket, SOL_SOCKET, SO_RCVBUF, (const void *) &buffsize, sizeof(int)); #endif // Wrap socket and setup callbacks static dap_events_socket_callbacks_t l_s_callbacks = { - .read_callback = m_es_stream_read, - .write_callback = m_es_stream_write, - .error_callback = m_es_stream_error, - .delete_callback = m_es_stream_delete - }; + .read_callback = s_stream_es_callback_read, + .write_callback = s_stream_es_callback_write, + .error_callback = s_stream_es_callback_error, + .delete_callback = s_stream_es_callback_delete, + .connected_callback = s_stream_es_callback_connected + };// a_client_pvt->stream_es = dap_events_socket_wrap_no_add(a_client_pvt->events, a_client_pvt->stream_socket, &l_s_callbacks); + a_client_pvt->stream_es->flags &= DAP_SOCK_CONNECTING ; // To catch non-blocking error when connecting we should ar WRITE flag + dap_worker_t * l_worker = dap_events_worker_get_auto(); assert(l_worker); assert(l_worker->_inheritor); @@ -344,7 +367,7 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) // add to dap_worker dap_events_socket_assign_on_worker_mt(a_client_pvt->stream_es, l_worker); - a_client_pvt->stream_es->_inheritor = a_client_pvt;//->client; + a_client_pvt->stream_es->_inheritor = a_client_pvt; a_client_pvt->stream = dap_stream_new_es_client(a_client_pvt->stream_es); assert(a_client_pvt->stream); a_client_pvt->stream->is_client_to_uplink = true; @@ -356,11 +379,10 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) // connect - struct sockaddr_in l_remote_addr; - memset(&l_remote_addr, 0, sizeof(l_remote_addr)); - l_remote_addr.sin_family = AF_INET; - l_remote_addr.sin_port = htons(a_client_pvt->uplink_port); - if(inet_pton(AF_INET, a_client_pvt->uplink_addr, &(l_remote_addr.sin_addr)) < 0) { + memset(&a_client_pvt->stream_es->remote_addr, 0, sizeof(a_client_pvt->stream_es->remote_addr)); + a_client_pvt->stream_es->remote_addr.sin_family = AF_INET; + a_client_pvt->stream_es->remote_addr.sin_port = htons(a_client_pvt->uplink_port); + if(inet_pton(AF_INET, a_client_pvt->uplink_addr, &(a_client_pvt->stream_es->remote_addr.sin_addr)) < 0) { log_it(L_ERROR, "Wrong remote address '%s:%u'", a_client_pvt->uplink_addr, a_client_pvt->uplink_port); //close(a_client_pvt->stream_socket); dap_events_socket_remove_and_delete_mt(a_client_pvt->stream_worker->worker, a_client_pvt->stream_es); @@ -369,25 +391,18 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) } else { int l_err = 0; - if((l_err = connect(a_client_pvt->stream_socket, (struct sockaddr *) &l_remote_addr, - sizeof(struct sockaddr_in))) != -1) { + strncpy( a_client_pvt->stream_es->remote_addr_str, a_client_pvt->uplink_addr, + sizeof (a_client_pvt->stream_es->remote_addr_str)-1 ); - // a_client_pvt->stream_es->flags &= ~DAP_SOCK_SIGNAL_CLOSE;// ??? what it was? Why out of esocket context??? - //s_set_sock_nonblock(a_client_pvt->stream_socket, false); - log_it(L_INFO, "Remote address connected (%s:%u) with sock_id %d (assign on worker #%u)", a_client_pvt->uplink_addr, - a_client_pvt->uplink_port, a_client_pvt->stream_socket, l_worker->id); - a_client_pvt->stage_status = STAGE_STATUS_DONE; + if((l_err = connect(a_client_pvt->stream_socket, (struct sockaddr *) &a_client_pvt->stream_es->remote_addr, + sizeof(struct sockaddr_in))) != -1) { + s_stream_connected(a_client_pvt); } - else { - log_it(L_ERROR, "Remote address can't connected (%s:%u) with sock_id %d", a_client_pvt->uplink_addr, - a_client_pvt->uplink_port); - dap_events_socket_remove_and_delete_mt(a_client_pvt->stream_worker->worker, a_client_pvt->stream_es); - //close(a_client_pvt->stream_socket); - a_client_pvt->stream_socket = 0; - a_client_pvt->stage_status = STAGE_STATUS_ERROR; + else if (l_err != EINPROGRESS){ + s_stream_connect_error(a_client_pvt,l_err); } } - s_stage_status_after(a_client_pvt); + } break; @@ -481,12 +496,14 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) a_client_pvt->stage = STAGE_ENC_INIT; // Trying the step again a_client_pvt->stage_status = STAGE_STATUS_IN_PROGRESS; + //dap_client_pvt_ref(a_client_pvt); s_stage_status_after(a_client_pvt); } else{ log_it(L_INFO, "Too many connection attempts. Tries are over."); a_client_pvt->stage_status = STAGE_STATUS_DONE; + // unref pvt //l_is_unref = true; } @@ -494,8 +511,7 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) } break; case STAGE_STATUS_DONE: { - log_it(L_INFO, "Stage status %s is done", - dap_client_stage_str(a_client_pvt->stage)); + log_it(L_INFO, "Stage status %s is done", dap_client_stage_str(a_client_pvt->stage)); // go to next stage if(a_client_pvt->stage_status_done_callback) { a_client_pvt->stage_status_done_callback(a_client_pvt->client, NULL); @@ -503,7 +519,6 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) //a_client_internal->stage_status_done_callback = NULL; } else log_it(L_WARNING, "Stage done callback is not present"); - bool l_is_last_stage = (a_client_pvt->stage == a_client_pvt->stage_target); if(l_is_last_stage) { //l_is_unref = true; @@ -539,13 +554,10 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) void dap_client_pvt_stage_transaction_begin(dap_client_pvt_t * a_client_internal, dap_client_stage_t a_stage_next, dap_client_callback_t a_done_callback) { - pthread_mutex_lock( &a_client_internal->stage_mutex); - a_client_internal->stage_status_done_callback = a_done_callback; a_client_internal->stage = a_stage_next; a_client_internal->stage_status = STAGE_STATUS_IN_PROGRESS; s_stage_status_after(a_client_internal); - pthread_mutex_unlock( &a_client_internal->stage_mutex); } /** @@ -577,8 +589,9 @@ int dap_client_pvt_request(dap_client_pvt_t * a_client_internal, const char * a_ // l_url = DAP_NEW_Z_SIZE(char, l_url_size_max); // snprintf(l_url, l_url_size_max, "http://%s:%u", a_client_internal->uplink_addr, a_client_internal->uplink_port); // } - void *l_ret = dap_client_http_request(a_client_internal->uplink_addr,a_client_internal->uplink_port, a_request ? "POST" : "GET", "text/text", a_path, a_request, - a_request_size, NULL, m_request_response, m_request_error, a_client_internal, NULL); + void *l_ret = dap_client_http_request(a_client_internal->uplink_addr,a_client_internal->uplink_port, + a_request ? "POST" : "GET", "text/text", a_path, a_request, + a_request_size, NULL, m_request_response, m_request_error, a_client_internal, NULL); // a_client_internal->curl = dap_http_client_simple_request(l_url, a_request ? "POST" : "GET", "text/text", a_request, // a_request_size, NULL, m_request_response, m_request_error, &a_client_internal->curl_sockfd, a_client_internal, NULL); // DAP_DELETE(l_url); @@ -850,30 +863,39 @@ void m_enc_init_response(dap_client_t * a_client, void * a_response, size_t a_re l_client_pvt->session_key_id, strlen(l_client_pvt->session_key_id), 0); DAP_DELETE(l_bob_message); + pthread_mutex_lock(&l_client_pvt->disconnected_mutex); if(l_client_pvt->stage == STAGE_ENC_INIT) { // We are in proper stage l_client_pvt->stage_status = STAGE_STATUS_DONE; + pthread_mutex_unlock(&l_client_pvt->disconnected_mutex); s_stage_status_after(l_client_pvt); } else { log_it(L_WARNING, "ENC: initialized encryption but current stage is %s (%s)", dap_client_get_stage_str(a_client), dap_client_get_stage_status_str(a_client)); + pthread_mutex_unlock(&l_client_pvt->disconnected_mutex); } } else { log_it(L_ERROR, "ENC: Wrong response (size %u data '%s')", a_response_size, (char* ) a_response); + pthread_mutex_lock(&l_client_pvt->disconnected_mutex); l_client_pvt->last_error = ERROR_ENC_NO_KEY; l_client_pvt->stage_status = STAGE_STATUS_ERROR; + pthread_mutex_unlock(&l_client_pvt->disconnected_mutex); s_stage_status_after(l_client_pvt); } DAP_DELETE(l_session_id_b64); DAP_DELETE(l_bob_message_b64); } else if(a_response_size > 1) { log_it(L_ERROR, "ENC: Wrong response (size %u data '%s')", a_response_size, (char* ) a_response); + pthread_mutex_lock(&l_client_pvt->disconnected_mutex); l_client_pvt->last_error = ERROR_ENC_NO_KEY; l_client_pvt->stage_status = STAGE_STATUS_ERROR; + pthread_mutex_unlock(&l_client_pvt->disconnected_mutex); s_stage_status_after(l_client_pvt); } else { log_it(L_ERROR, "ENC: Wrong response (size %u)", a_response_size); + pthread_mutex_lock(&l_client_pvt->disconnected_mutex); l_client_pvt->last_error = ERROR_ENC_NO_KEY; l_client_pvt->stage_status = STAGE_STATUS_ERROR; + pthread_mutex_unlock(&l_client_pvt->disconnected_mutex); s_stage_status_after(l_client_pvt); } } @@ -892,12 +914,14 @@ void m_enc_init_error(dap_client_t * a_client, int a_err_code) } //dap_client_internal_t * l_client_internal = dap_CLIENT_INTERNAL(a_client); log_it(L_ERROR, "ENC: Can't init ecnryption session, err code %d", a_err_code); + pthread_mutex_lock(&l_client_pvt->disconnected_mutex); if (a_err_code == ETIMEDOUT) { l_client_pvt->last_error = ERROR_NETWORK_CONNECTION_TIMEOUT; } else { l_client_pvt->last_error = ERROR_NETWORK_CONNECTION_REFUSE; } l_client_pvt->stage_status = STAGE_STATUS_ERROR; + pthread_mutex_unlock(&l_client_pvt->disconnected_mutex); s_stage_status_after(l_client_pvt); } @@ -1047,7 +1071,6 @@ void m_stream_error(dap_client_t * a_client, int a_error) log_it(L_ERROR, "m_stream_error: l_client_pvt is NULL!"); return; } - pthread_mutex_lock(&l_client_pvt->stage_mutex); if (a_error == ETIMEDOUT) { l_client_pvt->last_error = ERROR_NETWORK_CONNECTION_TIMEOUT; @@ -1057,8 +1080,6 @@ void m_stream_error(dap_client_t * a_client, int a_error) l_client_pvt->stage_status = STAGE_STATUS_ERROR; s_stage_status_after(l_client_pvt); - pthread_mutex_unlock(&l_client_pvt->stage_mutex); - } /** @@ -1071,16 +1092,27 @@ void m_stage_stream_streaming(dap_client_t * a_client, void* arg) log_it(L_INFO, "Stream is opened"); } +/** + * @brief s_stream_es_callback_new + * @param a_es + * @param arg + */ +static void s_stream_es_callback_connected(dap_events_socket_t * a_es) +{ + dap_client_pvt_t * l_client_pvt =(dap_client_pvt_t*) a_es->_inheritor; + s_stream_connected(l_client_pvt); +} + /** * @brief m_es_stream_delete * @param a_es * @param arg */ -void m_es_stream_delete(dap_events_socket_t *a_es, void *arg) +static void s_stream_es_callback_delete(dap_events_socket_t *a_es, void *arg) { log_it(L_INFO, "================= stream delete/peer reconnect"); - dap_client_pvt_t * l_client_pvt = a_es->_inheritor; + dap_client_pvt_t * l_client_pvt =(dap_client_pvt_t*) a_es->_inheritor; a_es->_inheritor = NULL; // To prevent delete in reactor if(l_client_pvt == NULL) { @@ -1090,7 +1122,6 @@ void m_es_stream_delete(dap_events_socket_t *a_es, void *arg) log_it(L_DEBUG, "client_pvt=0x%x", l_client_pvt); - pthread_mutex_lock(&l_client_pvt->stage_mutex); if (l_client_pvt->stage_status_error_callback) { if(l_client_pvt == l_client_pvt->client->_internal) l_client_pvt->stage_status_error_callback(l_client_pvt->client, (void *) true); @@ -1101,7 +1132,6 @@ void m_es_stream_delete(dap_events_socket_t *a_es, void *arg) dap_stream_delete(l_client_pvt->stream); l_client_pvt->stream = NULL; l_client_pvt->stream_es = NULL; - pthread_mutex_unlock(&l_client_pvt->stage_mutex); /* disable reconnect from here if(l_client_pvt->is_reconnect) { log_it(L_DEBUG, "l_client_pvt->is_reconnect = true"); @@ -1118,10 +1148,10 @@ void m_es_stream_delete(dap_events_socket_t *a_es, void *arg) * @param a_es * @param arg */ -void m_es_stream_read(dap_events_socket_t * a_es, void * arg) +static void s_stream_es_callback_read(dap_events_socket_t * a_es, void * arg) { //dap_client_t * l_client = DAP_CLIENT(a_es); - dap_client_pvt_t * l_client_pvt = a_es->_inheritor;//(l_client) ? DAP_CLIENT_PVT(l_client) : NULL; + dap_client_pvt_t * l_client_pvt =(dap_client_pvt_t *) a_es->_inheritor;//(l_client) ? DAP_CLIENT_PVT(l_client) : NULL; if(!l_client_pvt) { log_it(L_ERROR, "m_es_stream_read: l_client_pvt is NULL!"); return; @@ -1139,12 +1169,10 @@ void m_es_stream_read(dap_events_socket_t * a_es, void * arg) dap_events_socket_shrink_buf_in(a_es, l_pos_endl - a_es->buf_in_str); log_it(L_DEBUG, "Header passed, go to streaming (%lu bytes already are in input buffer", a_es->buf_in_size); - pthread_mutex_lock(&l_client_pvt->stage_mutex); l_client_pvt->stage = STAGE_STREAM_STREAMING; l_client_pvt->stage_status = STAGE_STATUS_DONE; s_stage_status_after(l_client_pvt); - pthread_mutex_unlock(&l_client_pvt->stage_mutex); dap_stream_data_proc_read(l_client_pvt->stream); dap_events_socket_shrink_buf_in(a_es, a_es->buf_in_size); @@ -1168,7 +1196,7 @@ void m_es_stream_read(dap_events_socket_t * a_es, void * arg) * @param a_es * @param arg */ -void m_es_stream_write(dap_events_socket_t * a_es, void * arg) +static void s_stream_es_callback_write(dap_events_socket_t * a_es, void * arg) { //dap_client_t * l_client = DAP_CLIENT(a_es); //dap_client_pvt_t * l_client_pvt = (l_client) ? DAP_CLIENT_PVT(l_client) : NULL; @@ -1201,7 +1229,7 @@ void m_es_stream_write(dap_events_socket_t * a_es, void * arg) } } -void m_es_stream_error(dap_events_socket_t * a_es, int a_arg) +static void s_stream_es_callback_error(dap_events_socket_t * a_es, int a_arg) { //dap_client_t * l_client = DAP_CLIENT(a_es); //dap_client_pvt_t * l_client_pvt = (l_client) ? DAP_CLIENT_PVT(l_client) : NULL; diff --git a/dap-sdk/net/client/include/dap_client.h b/dap-sdk/net/client/include/dap_client.h index 272bf758ee..ecf7d58d9f 100644 --- a/dap-sdk/net/client/include/dap_client.h +++ b/dap-sdk/net/client/include/dap_client.h @@ -60,6 +60,7 @@ typedef enum dap_client_error { ERROR_STREAM_CTL_ERROR, ERROR_STREAM_CTL_ERROR_AUTH, ERROR_STREAM_CTL_ERROR_RESPONSE_FORMAT, + ERROR_STREAM_CONNECT, ERROR_STREAM_RESPONSE_WRONG, ERROR_STREAM_RESPONSE_TIMEOUT, ERROR_STREAM_FREEZED, diff --git a/dap-sdk/net/client/include/dap_client_pvt.h b/dap-sdk/net/client/include/dap_client_pvt.h index 3785c90df6..8ff519b1db 100644 --- a/dap-sdk/net/client/include/dap_client_pvt.h +++ b/dap-sdk/net/client/include/dap_client_pvt.h @@ -1,25 +1,24 @@ /* * Authors: * Dmitriy A. Gearasimov <gerasimov.dmitriy@demlabs.net> - * DeM Labs Inc. https://demlabs.net - * Kelvin Project https://github.com/kelvinblockchain - * Copyright (c) 2017-2019 + * DeM Labs Ltd. https://demlabs.net + * Copyright (c) 2017-2020 * All rights reserved. - This file is part of DAP (Deus Applications Prototypes) the open source project + This file is part of DAP SDK the open source project - DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify + DAP SDK is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. - DAP is distributed in the hope that it will be useful, + DAP SDK is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License - along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. + along with any DAP SDK based project. If not, see <http://www.gnu.org/licenses/>. */ #pragma once @@ -62,7 +61,6 @@ typedef struct dap_client_internal uint32_t uplink_protocol_version; uint32_t remote_protocol_version; - pthread_mutex_t stage_mutex; // Protect all the stage_ fields below dap_client_stage_t stage_target; dap_client_callback_t stage_target_done_callback; dap_client_stage_t stage; @@ -103,7 +101,7 @@ void dap_client_pvt_request_enc(dap_client_pvt_t * a_client_internal, const char dap_client_callback_int_t a_error_proc); void dap_client_pvt_new(dap_client_pvt_t * a_client_internal); -void dap_client_pvt_delete(dap_client_pvt_t * a_client_pvts); +void dap_client_pvt_delete_n_wait(dap_client_pvt_t * a_client_pvts); //int dap_client_pvt_ref(dap_client_pvt_t * a_client_internal); //int dap_client_pvt_unref(dap_client_pvt_t * a_client_internal); diff --git a/dap-sdk/net/core/dap_events_socket.c b/dap-sdk/net/core/dap_events_socket.c index 2ba195f72b..d6929e3a09 100644 --- a/dap-sdk/net/core/dap_events_socket.c +++ b/dap-sdk/net/core/dap_events_socket.c @@ -192,6 +192,9 @@ dap_events_socket_t * s_create_type_pipe(dap_worker_t * a_w, dap_events_socket_c // log_it(L_DEBUG, "Created one-way unnamed bytestream pipe %d->%d", l_pipe[0], l_pipe[1]); l_es->fd = l_pipe[0]; l_es->fd2 = l_pipe[1]; + fcntl( l_pipe[0], F_SETFL, O_NONBLOCK); + fcntl( l_pipe[1], F_SETFL, O_NONBLOCK); + #else #error "No defined s_create_type_pipe() for your platform" #endif @@ -407,7 +410,7 @@ dap_events_socket_t * s_create_type_event(dap_worker_t * a_w, dap_events_socket_ #endif #ifdef DAP_EVENTS_CAPS_EVENT_EVENTFD - if((l_es->fd = eventfd(0,0) ) < 0 ){ + if((l_es->fd = eventfd(0,EFD_NONBLOCK) ) < 0 ){ int l_errno = errno; char l_errbuf[128]; l_errbuf[0]=0; @@ -694,7 +697,7 @@ dap_events_socket_t *dap_events_socket_find_unsafe( int sock, struct dap_events return ret; } -static void s_worker_poll_mod(dap_events_socket_t * a_esocket) +void dap_events_socket_worker_poll_update_unsafe(dap_events_socket_t * a_esocket) { #if defined (DAP_EVENTS_CAPS_EPOLL) int events = a_esocket->ev_base_flags | EPOLLERR; @@ -703,7 +706,7 @@ static void s_worker_poll_mod(dap_events_socket_t * a_esocket) if( a_esocket->flags & DAP_SOCK_READY_TO_READ ) events |= EPOLLIN; - if( a_esocket->flags & DAP_SOCK_READY_TO_WRITE ) + if( a_esocket->flags & DAP_SOCK_READY_TO_WRITE || a_esocket->flags &DAP_SOCK_CONNECTING ) events |= EPOLLOUT; a_esocket->ev.events = events; @@ -723,7 +726,7 @@ static void s_worker_poll_mod(dap_events_socket_t * a_esocket) // Check & add if( a_esocket->flags & DAP_SOCK_READY_TO_READ ) l_poll->events |= POLLIN; - if( a_esocket->flags & DAP_SOCK_READY_TO_WRITE ) + if( a_esocket->flags & DAP_SOCK_READY_TO_WRITE || a_esocket->flags &DAP_SOCK_CONNECTING ) l_poll->events |= POLLOUT; }else{ log_it(L_ERROR, "Wrong poll index when remove from worker (unsafe): %u when total count %u", a_esocket->poll_index, @@ -752,7 +755,7 @@ void dap_events_socket_set_readable_unsafe( dap_events_socket_t *a_esocket, bool a_esocket->flags ^= DAP_SOCK_READY_TO_READ; if( a_esocket->worker) - s_worker_poll_mod( a_esocket); + dap_events_socket_worker_poll_update_unsafe( a_esocket); } /** @@ -772,7 +775,7 @@ void dap_events_socket_set_writable_unsafe( dap_events_socket_t *a_esocket, bool a_esocket->flags ^= DAP_SOCK_READY_TO_WRITE; if( a_esocket->worker ) - s_worker_poll_mod(a_esocket); + dap_events_socket_worker_poll_update_unsafe(a_esocket); } /** diff --git a/dap-sdk/net/core/dap_timerfd.c b/dap-sdk/net/core/dap_timerfd.c index f2cc746622..d8526a500a 100644 --- a/dap-sdk/net/core/dap_timerfd.c +++ b/dap-sdk/net/core/dap_timerfd.c @@ -74,7 +74,7 @@ dap_timerfd_t* dap_timerfd_start_on_worker(dap_worker_t * a_worker, uint64_t a_t { struct itimerspec l_ts; - int l_tfd = timerfd_create(CLOCK_MONOTONIC, 0); + int l_tfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK); if(l_tfd == -1) { log_it(L_WARNING, "dap_timerfd_start() failed: timerfd_create() errno=%d\n", errno); return NULL; diff --git a/dap-sdk/net/core/dap_worker.c b/dap-sdk/net/core/dap_worker.c index e08e31b2b1..fafbee2d1e 100644 --- a/dap-sdk/net/core/dap_worker.c +++ b/dap-sdk/net/core/dap_worker.c @@ -192,7 +192,7 @@ void *dap_worker_thread(void *arg) dap_events_socket_set_writable_unsafe(l_cur, false); l_cur->buf_out_size = 0; l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE; - l_cur->callbacks.error_callback(l_cur, 0); // Call callback to process error event + l_cur->callbacks.error_callback(l_cur, l_sock_err); // Call callback to process error event } if (l_flag_hup) { @@ -245,6 +245,8 @@ void *dap_worker_thread(void *arg) struct sockaddr l_remote_addr; socklen_t l_remote_addr_size= sizeof (l_remote_addr); int l_remote_socket= accept(l_cur->socket ,&l_remote_addr,&l_remote_addr_size); + fcntl( l_remote_socket, F_SETFL, O_NONBLOCK); + int l_errno = errno; if ( l_remote_socket == -1 ){ if( l_errno == EAGAIN || l_errno == EWOULDBLOCK){// Everything is good, we'll receive ACCEPT on next poll @@ -310,82 +312,104 @@ void *dap_worker_thread(void *arg) } } - // Socket is ready to write - if(( l_flag_write || (l_cur->flags & DAP_SOCK_READY_TO_WRITE)) - && !(l_cur->flags & DAP_SOCK_SIGNAL_CLOSE)) { + // If its outgoing connection + if ( l_flag_write && ! l_cur->server && l_cur->flags& DAP_SOCK_CONNECTING && + (l_cur->type == DESCRIPTOR_TYPE_SOCKET || l_cur->type == DESCRIPTOR_TYPE_SOCKET_UDP )){ + int l_error = 0; + socklen_t l_error_len = sizeof(l_error); + char l_error_buf[128]; + l_error_buf[0]='\0'; + getsockopt(l_cur->socket, SOL_SOCKET, SO_ERROR, &l_error, &l_error_len); + if (l_error){ + strerror_r(l_error, l_error_buf, sizeof (l_error_buf)); + log_it(L_ERROR,"Connecting error with %s: \"%s\" (code %d)", l_cur->remote_addr_str[0]? l_cur->remote_addr_str: "(NULL)" + , + l_error_buf, l_error); + if ( l_cur->callbacks.error_callback ) + l_cur->callbacks.error_callback(l_cur, l_error); + }else if(l_error == EINPROGRESS) { + log_it(L_DEBUG, "Connecting with %s in progress...", l_cur->remote_addr_str[0]? l_cur->remote_addr_str: "(NULL)"); + }else{ + log_it(L_NOTICE, "Connected with %s",l_cur->remote_addr_str[0]? l_cur->remote_addr_str: "(NULL)"); + l_cur->flags ^= DAP_SOCK_CONNECTING; + if (l_cur->callbacks.connected_callback) + l_cur->callbacks.connected_callback(l_cur); + } + } + // Socket is ready to write and not going to close + if(( l_flag_write&&(l_cur->flags & DAP_SOCK_READY_TO_WRITE) || + (l_cur->flags & DAP_SOCK_READY_TO_WRITE)) && !(l_cur->flags & DAP_SOCK_SIGNAL_CLOSE)) { //log_it(L_DEBUG, "Main loop output: %u bytes to send", l_cur->buf_out_size); if(l_cur->callbacks.write_callback) l_cur->callbacks.write_callback(l_cur, NULL); // Call callback to process write event - if (l_cur->worker == NULL ){ // esocket was unassigned in callback, we don't need any ops with it now, - // continue to poll another esockets - continue; - } - if(l_cur->flags & DAP_SOCK_READY_TO_WRITE) { + if ( l_cur->worker ){ // esocket wasn't unassigned in callback, we need some other ops with it + if(l_cur->flags & DAP_SOCK_READY_TO_WRITE) { - static const uint32_t buf_out_zero_count_max = 2; - //l_cur->buf_out[l_cur->buf_out_size] = 0; + static const uint32_t buf_out_zero_count_max = 2; + //l_cur->buf_out[l_cur->buf_out_size] = 0; - if(!l_cur->buf_out_size) { + if(!l_cur->buf_out_size) { - //log_it(L_WARNING, "Output: nothing to send. Why we are in write socket set?"); - l_cur->buf_out_zero_count++; + //log_it(L_WARNING, "Output: nothing to send. Why we are in write socket set?"); + l_cur->buf_out_zero_count++; - if(l_cur->buf_out_zero_count > buf_out_zero_count_max) { // How many time buf_out on write event could be empty - //log_it(L_WARNING, "Output: nothing to send %u times, remove socket from the write set", - // buf_out_zero_count_max); - dap_events_socket_set_writable_unsafe(l_cur, false); + if(l_cur->buf_out_zero_count > buf_out_zero_count_max) { // How many time buf_out on write event could be empty + //log_it(L_WARNING, "Output: nothing to send %u times, remove socket from the write set", + // buf_out_zero_count_max); + dap_events_socket_set_writable_unsafe(l_cur, false); + } } + else + l_cur->buf_out_zero_count = 0; } - else - l_cur->buf_out_zero_count = 0; - } - //for(total_sent = 0; total_sent < cur->buf_out_size;) { // If after callback there is smth to send - we do it - ssize_t l_bytes_sent =0; - int l_errno; - switch (l_cur->type){ - case DESCRIPTOR_TYPE_SOCKET: - l_bytes_sent = send(l_cur->socket, (const char *)l_cur->buf_out, - l_cur->buf_out_size, MSG_DONTWAIT | MSG_NOSIGNAL); - l_errno = errno; - break; - case DESCRIPTOR_TYPE_SOCKET_UDP: - l_bytes_sent = sendto(l_cur->socket, (const char *)l_cur->buf_out, - l_cur->buf_out_size, MSG_DONTWAIT | MSG_NOSIGNAL, - (struct sockaddr *)&l_cur->remote_addr, sizeof(l_cur->remote_addr)); - l_errno = errno; - break; - case DESCRIPTOR_TYPE_PIPE: - case DESCRIPTOR_TYPE_FILE: - l_bytes_sent = write(l_cur->socket, (char *) (l_cur->buf_out + l_bytes_sent), - l_cur->buf_out_size ); - l_errno = errno; - break; - default: - log_it(L_WARNING, "Socket %d is not SOCKET, PIPE or FILE but has WRITE state on. Switching it off"); - dap_events_socket_set_writable_unsafe(l_cur,false); - } - - if(l_bytes_sent < 0) { - if (l_errno != EAGAIN && l_errno != EWOULDBLOCK ){ // If we have non-blocking socket - log_it(L_ERROR, "Some error occured in send(): %s", strerror(errno)); - l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE; - l_cur->buf_out_size = 0; - + //for(total_sent = 0; total_sent < cur->buf_out_size;) { // If after callback there is smth to send - we do it + ssize_t l_bytes_sent =0; + int l_errno; + switch (l_cur->type){ + case DESCRIPTOR_TYPE_SOCKET: + l_bytes_sent = send(l_cur->socket, (const char *)l_cur->buf_out, + l_cur->buf_out_size, MSG_DONTWAIT | MSG_NOSIGNAL); + l_errno = errno; + break; + case DESCRIPTOR_TYPE_SOCKET_UDP: + l_bytes_sent = sendto(l_cur->socket, (const char *)l_cur->buf_out, + l_cur->buf_out_size, MSG_DONTWAIT | MSG_NOSIGNAL, + (struct sockaddr *)&l_cur->remote_addr, sizeof(l_cur->remote_addr)); + l_errno = errno; + break; + case DESCRIPTOR_TYPE_PIPE: + case DESCRIPTOR_TYPE_FILE: + l_bytes_sent = write(l_cur->socket, (char *) (l_cur->buf_out + l_bytes_sent), + l_cur->buf_out_size ); + l_errno = errno; + break; + default: + log_it(L_WARNING, "Socket %d is not SOCKET, PIPE or FILE but has WRITE state on. Switching it off"); + dap_events_socket_set_writable_unsafe(l_cur,false); } - }else{ - //log_it(L_DEBUG, "Output: %u from %u bytes are sent ", l_bytes_sent,l_cur->buf_out_size); - if (l_bytes_sent) { - if ( l_bytes_sent <= (ssize_t) l_cur->buf_out_size ){ - l_cur->buf_out_size -= l_bytes_sent; - if (l_cur->buf_out_size ) { - memmove(l_cur->buf_out, &l_cur->buf_out[l_bytes_sent], l_cur->buf_out_size); - } - }else{ - log_it(L_ERROR, "Wrong bytes sent, %zd more then was in buffer %zd",l_bytes_sent, l_cur->buf_out_size); + if(l_bytes_sent < 0) { + if (l_errno != EAGAIN && l_errno != EWOULDBLOCK ){ // If we have non-blocking socket + log_it(L_ERROR, "Some error occured in send(): %s", strerror(errno)); + l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE; l_cur->buf_out_size = 0; + + } + }else{ + + //log_it(L_DEBUG, "Output: %u from %u bytes are sent ", l_bytes_sent,l_cur->buf_out_size); + if (l_bytes_sent) { + if ( l_bytes_sent <= (ssize_t) l_cur->buf_out_size ){ + l_cur->buf_out_size -= l_bytes_sent; + if (l_cur->buf_out_size ) { + memmove(l_cur->buf_out, &l_cur->buf_out[l_bytes_sent], l_cur->buf_out_size); + } + }else{ + log_it(L_ERROR, "Wrong bytes sent, %zd more then was in buffer %zd",l_bytes_sent, l_cur->buf_out_size); + l_cur->buf_out_size = 0; + } } } } @@ -460,9 +484,15 @@ static void s_queue_new_es_callback( dap_events_socket_t * a_es, void * a_arg) return; } - if ( l_es_new->type == DESCRIPTOR_TYPE_SOCKET || l_es_new->type == DESCRIPTOR_TYPE_SOCKET_LISTENING ){ - int l_cpu = w->id; - setsockopt(l_es_new->socket , SOL_SOCKET, SO_INCOMING_CPU, &l_cpu, sizeof(l_cpu)); + switch( l_es_new->type){ + + case DESCRIPTOR_TYPE_SOCKET_UDP: + case DESCRIPTOR_TYPE_SOCKET: + case DESCRIPTOR_TYPE_SOCKET_LISTENING:{ + int l_cpu = w->id; + setsockopt(l_es_new->socket , SOL_SOCKET, SO_INCOMING_CPU, &l_cpu, sizeof(l_cpu)); + }break; + default: {} } l_es_new->worker = w; @@ -574,6 +604,17 @@ static void s_queue_es_io_callback( dap_events_socket_t * a_es, void * a_arg) DAP_DELETE(l_msg); return; } + if (l_msg->flags_set & DAP_SOCK_CONNECTING) + if (! l_msg_es->flags & DAP_SOCK_CONNECTING ){ + l_msg_es->flags |= DAP_SOCK_CONNECTING; + dap_events_socket_worker_poll_update_unsafe(l_msg_es); + } + + if (l_msg->flags_set & DAP_SOCK_CONNECTING) + if (! l_msg_es->flags & DAP_SOCK_CONNECTING ){ + l_msg_es->flags ^= DAP_SOCK_CONNECTING; + dap_events_socket_worker_poll_update_unsafe(l_msg_es); + } if (l_msg->flags_set & DAP_SOCK_READY_TO_READ) dap_events_socket_set_readable_unsafe(l_msg_es, true); @@ -660,7 +701,7 @@ int dap_worker_add_events_socket_unsafe( dap_events_socket_t * a_esocket, dap_wo a_worker->poll[a_worker->poll_count].events = a_esocket->poll_base_flags; if( a_esocket->flags & DAP_SOCK_READY_TO_READ ) a_worker->poll[a_worker->poll_count].events |= POLLIN; - if( a_esocket->flags & DAP_SOCK_READY_TO_WRITE ) + if( a_esocket->flags & DAP_SOCK_READY_TO_WRITE || a_esocket->flags & DAP_SOCK_CONNECTING ) a_worker->poll[a_worker->poll_count].events |= POLLOUT; a_worker->poll_esocket[a_worker->poll_count] = a_esocket; diff --git a/dap-sdk/net/core/include/dap_events_socket.h b/dap-sdk/net/core/include/dap_events_socket.h index a93a0ad3b6..86e5560cad 100644 --- a/dap-sdk/net/core/include/dap_events_socket.h +++ b/dap-sdk/net/core/include/dap_events_socket.h @@ -70,7 +70,7 @@ #define DAP_SOCK_READY_TO_READ BIT( 0 ) #define DAP_SOCK_READY_TO_WRITE BIT( 1 ) #define DAP_SOCK_SIGNAL_CLOSE BIT( 2 ) -#define DAP_SOCK_ACTIVE BIT( 3 ) +#define DAP_SOCK_CONNECTING BIT( 3 ) // When connection happens this flag is armed for outgoing connections until its establish the connection #define DAP_SOCK_REASSIGN_ONCE BIT( 4 ) // This usable for FlowControl to prevent multiple reassigment // If set - queue limited to sizeof(void*) size of data transmitted @@ -89,10 +89,12 @@ typedef void (*dap_events_socket_callback_pipe_t) (dap_events_socket_t *,const v typedef void (*dap_events_socket_callback_queue_ptr_t) (dap_events_socket_t *, void *); // Callback for specific client operations typedef void (*dap_events_socket_callback_timer_t) (dap_events_socket_t * ); // Callback for specific client operations typedef void (*dap_events_socket_callback_accept_t) (dap_events_socket_t * , int, struct sockaddr* ); // Callback for accept of new connection +typedef void (*dap_events_socket_callback_connected_t) (dap_events_socket_t * ); // Callback for connected client connection typedef void (*dap_events_socket_worker_callback_t) (dap_events_socket_t *,dap_worker_t * ); // Callback for specific client operations typedef struct dap_events_socket_callbacks { - union{ + union{ // Specific callbacks + dap_events_socket_callback_connected_t connected_callback; // Connected callback for client socket dap_events_socket_callback_accept_t accept_callback; // Accept callback for listening socket dap_events_socket_callback_timer_t timer_callback; // Timer callback for listening socket dap_events_socket_callback_event_t event_callback; // Event callback for listening socket @@ -165,7 +167,13 @@ typedef struct dap_events_socket { // Stored string representation char hostaddr[1024]; // Address char service[128]; - struct sockaddr_in remote_addr; // For UDP datagrams + + // Remote address, port and others + struct sockaddr_in remote_addr; + char remote_addr_str[INET_ADDRSTRLEN]; + char remote_addr_str6[INET6_ADDRSTRLEN]; + short remote_port; + // Links to related objects dap_events_t *events; @@ -228,6 +236,8 @@ size_t dap_events_socket_pop_from_buf_in(dap_events_socket_t *sc, void * data, s bool dap_events_socket_check_unsafe(dap_worker_t * a_worker,dap_events_socket_t * a_es); void dap_events_socket_set_readable_unsafe(dap_events_socket_t * sc,bool is_ready); void dap_events_socket_set_writable_unsafe(dap_events_socket_t * sc,bool is_ready); +void dap_events_socket_worker_poll_update_unsafe(dap_events_socket_t * a_esocket); + size_t dap_events_socket_write_unsafe(dap_events_socket_t *sc, const void * data, size_t data_size); size_t dap_events_socket_write_f_unsafe(dap_events_socket_t *sc, const char * format,...); diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index 44e6632d77..1d79df09da 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -29,9 +29,25 @@ #include <stdint.h> #include <string.h> #include <errno.h> - #include <pthread.h> + +#ifdef DAP_OS_UNIX +#include <sys/types.h> +#include <sys/socket.h> +#include <arpa/inet.h> +#include <netdb.h> +#endif + +#ifdef WIN32 +#include <winsock2.h> +#include <windows.h> +#include <mswsock.h> +#include <ws2tcpip.h> +#include <io.h> +#endif + + #include "uthash.h" #include "utlist.h" -- GitLab