diff --git a/dap-sdk/net/core/dap_worker.c b/dap-sdk/net/core/dap_worker.c index 0a7f7873f8a79bef4068ff38510054b0f1c170f2..a5507db49f151bff4d71f742ced894a424a415fe 100644 --- a/dap-sdk/net/core/dap_worker.c +++ b/dap-sdk/net/core/dap_worker.c @@ -109,13 +109,17 @@ void *dap_worker_thread(void *arg) { dap_events_socket_t *l_cur; dap_worker_t *l_worker = (dap_worker_t *) arg; - //time_t l_next_time_timeout_check = time( NULL) + s_connection_timeout / 2; uint32_t l_tn = l_worker->id; + int l_errno = 0, l_selected_sockets; + socklen_t l_error_len = sizeof(l_errno); + char l_error_buf[128] = {0}; + ssize_t l_bytes_sent = 0, l_bytes_read = 0, l_sockets_max; + const struct sched_param l_shed_params = {0}; + dap_cpu_assign_thread_on(l_worker->id); pthread_setspecific(l_worker->events->pth_key_worker, l_worker); - struct sched_param l_shed_params; - l_shed_params.sched_priority = 0; + #ifdef DAP_OS_WINDOWS if (!SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_BELOW_NORMAL)) log_it(L_ERROR, "Couldn'r set thread priority, err: %lu", GetLastError()); @@ -168,7 +172,6 @@ void *dap_worker_thread(void *arg) l_worker->queue_es_reassign_input[n] = dap_events_socket_queue_ptr_create_input(l_worker->queue_es_reassign); } - l_worker->queue_callback = dap_events_socket_create_type_queue_ptr_unsafe(l_worker, s_queue_callback_callback); l_worker->event_exit = dap_events_socket_create_type_event_unsafe(l_worker, s_event_exit_callback); @@ -180,8 +183,6 @@ void *dap_worker_thread(void *arg) pthread_mutex_unlock(&l_worker->started_mutex); while (1) { - int l_selected_sockets; - size_t l_sockets_max; #ifdef DAP_EVENTS_CAPS_EPOLL l_selected_sockets = epoll_wait(l_worker->epoll_fd, l_epoll_events, DAP_EVENTS_SOCKET_MAX, -1); l_sockets_max = l_selected_sockets; @@ -201,10 +202,8 @@ void *dap_worker_thread(void *arg) #ifdef DAP_OS_WINDOWS log_it(L_ERROR, "Worker thread %d got errno %d", l_worker->id, WSAGetLastError()); #else - int l_errno = errno; - char l_errbuf[128]; - strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); - log_it(L_ERROR, "Worker thread %d got errno:\"%s\" (%d)", l_worker->id, l_errbuf, l_errno); + strerror_r(l_errno, l_error_buf, sizeof (l_error_buf) - 1); + log_it(L_ERROR, "Worker thread %d got errno:\"%s\" (%d)", l_worker->id, l_error_buf, l_errno); assert(l_errno); #endif break; @@ -213,6 +212,7 @@ void *dap_worker_thread(void *arg) time_t l_cur_time = time( NULL); for(size_t n = 0; n < l_sockets_max; n++) { bool l_flag_hup, l_flag_rdhup, l_flag_read, l_flag_write, l_flag_error, l_flag_nval, l_flag_msg, l_flag_pri; + #ifdef DAP_EVENTS_CAPS_EPOLL l_cur = (dap_events_socket_t *) l_epoll_events[n].data.ptr; uint32_t l_cur_flags = l_epoll_events[n].events; @@ -232,6 +232,7 @@ void *dap_worker_thread(void *arg) if (!l_cur_flags) // No events for this socket continue; + l_flag_hup = l_cur_flags& POLLHUP; l_flag_rdhup = l_cur_flags & POLLRDHUP; l_flag_write = (l_cur_flags & POLLOUT) || (l_cur_flags &POLLWRNORM)|| (l_cur_flags &POLLWRBAND ) ; @@ -394,8 +395,6 @@ void *dap_worker_thread(void *arg) l_cur->buf_in_size = 0; } - int32_t l_bytes_read = 0; - int l_errno=0; bool l_must_read_smth = false; switch (l_cur->type) { case DESCRIPTOR_TYPE_PIPE: @@ -520,7 +519,7 @@ void *dap_worker_thread(void *arg) } l_cur->buf_in_size += l_bytes_read; if(g_debug_reactor) - log_it(L_DEBUG, "Received %d bytes for fd %d ", l_bytes_read, l_cur->fd); + log_it(L_DEBUG, "Received %zd bytes for fd %d ", l_bytes_read, l_cur->fd); if(l_cur->callbacks.read_callback){ l_cur->callbacks.read_callback(l_cur, NULL); // Call callback to process read event. At the end of callback buf_in_size should be zero if everything was read well if (l_cur->worker == NULL ){ // esocket was unassigned in callback, we don't need any ops with it now, @@ -528,7 +527,7 @@ void *dap_worker_thread(void *arg) continue; } }else{ - log_it(L_WARNING, "We have incomming %d data but no read callback on socket %"DAP_FORMAT_SOCKET", removing from read set", + log_it(L_WARNING, "We have incomming %zd data but no read callback on socket %"DAP_FORMAT_SOCKET", removing from read set", l_bytes_read, l_cur->socket); dap_events_socket_set_readable_unsafe(l_cur,false); } @@ -587,10 +586,6 @@ void *dap_worker_thread(void *arg) // If its outgoing connection if ((l_flag_write && !l_cur->server && l_cur->flags & DAP_SOCK_CONNECTING && l_cur->type == DESCRIPTOR_TYPE_SOCKET_CLIENT) || (l_cur->type == DESCRIPTOR_TYPE_SOCKET_CLIENT_SSL && l_cur->flags & DAP_SOCK_CONNECTING)) { - int l_error = 0; - socklen_t l_error_len = sizeof(l_error); - char l_error_buf[128]; - l_error_buf[0]='\0'; if (l_cur->type == DESCRIPTOR_TYPE_SOCKET_CLIENT_SSL) { #ifndef DAP_NET_CLIENT_NO_SSL WOLFSSL *l_ssl = SSL(l_cur); @@ -614,15 +609,17 @@ void *dap_worker_thread(void *arg) } #endif } else { - getsockopt(l_cur->socket, SOL_SOCKET, SO_ERROR, (void *)&l_error, &l_error_len); - if(l_error == EINPROGRESS) { + l_error_len = sizeof(l_errno); + + getsockopt(l_cur->socket, SOL_SOCKET, SO_ERROR, (void *)&l_errno, &l_error_len); + if(l_errno == EINPROGRESS) { log_it(L_DEBUG, "Connecting with %s in progress...", l_cur->remote_addr_str ? l_cur->remote_addr_str: "(NULL)"); - }else if (l_error){ - strerror_r(l_error, l_error_buf, sizeof (l_error_buf)); + }else if (l_errno){ + strerror_r(l_errno, l_error_buf, sizeof (l_error_buf)); log_it(L_ERROR,"Connecting error with %s: \"%s\" (code %d)", l_cur->remote_addr_str ? l_cur->remote_addr_str: "(NULL)", - l_error_buf, l_error); + l_error_buf, l_errno); if ( l_cur->callbacks.error_callback ) - l_cur->callbacks.error_callback(l_cur, l_error); + l_cur->callbacks.error_callback(l_cur, l_errno); }else{ if(g_debug_reactor) log_it(L_NOTICE, "Connected with %s",l_cur->remote_addr_str ? l_cur->remote_addr_str: "(NULL)"); @@ -634,39 +631,30 @@ void *dap_worker_thread(void *arg) } } - // Socket is ready to write and not going to close - if( ( l_flag_write&&(l_cur->flags & DAP_SOCK_READY_TO_WRITE) ) || - ( (l_cur->flags & DAP_SOCK_READY_TO_WRITE) && !(l_cur->flags & DAP_SOCK_SIGNAL_CLOSE) ) ) { - if(g_debug_reactor) - log_it(L_DEBUG, "Main loop output: %zu bytes to send", l_cur->buf_out_size); + /* + * Socket is ready to write and not going to close + */ + if ( !l_cur->buf_out_size ) /* Check firstly that output buffer is not empty */ + { + dap_events_socket_set_writable_unsafe(l_cur, false); /* Clear "enable write flag" */ - if(l_cur->callbacks.write_callback) - l_cur->callbacks.write_callback(l_cur, NULL); // Call callback to process write event + if ( l_cur->callbacks.write_finished_callback ) /* Optionaly call I/O completion routine */ + l_cur->callbacks.write_finished_callback(l_cur, l_worker, l_errno); - if ( l_cur->worker ){ // esocket wasn't unassigned in callback, we need some other ops with it - if(l_cur->flags & DAP_SOCK_READY_TO_WRITE) { + l_flag_write = 0; /* Clear flag to exclude unecessary processing of output */ + } - static const uint32_t buf_out_zero_count_max = 2; - //l_cur->buf_out[l_cur->buf_out_size] = 0; + l_bytes_sent = 0; - if(!l_cur->buf_out_size) { + if ( ( l_flag_write && (l_cur->flags & DAP_SOCK_READY_TO_WRITE) ) || + ( (l_cur->flags & DAP_SOCK_READY_TO_WRITE) && !(l_cur->flags & DAP_SOCK_SIGNAL_CLOSE) ) ) { - //log_it(L_WARNING, "Output: nothing to send. Why we are in write socket set?"); - l_cur->buf_out_zero_count++; + debug_if (g_debug_reactor, L_DEBUG, "Main loop output: %zu bytes to send", l_cur->buf_out_size); - if(l_cur->buf_out_zero_count > buf_out_zero_count_max) { // How many time buf_out on write event could be empty - //log_it(L_WARNING, "Output: nothing to send %u times, remove socket from the write set", - // buf_out_zero_count_max); - dap_events_socket_set_writable_unsafe(l_cur, false); - } - } - else - l_cur->buf_out_zero_count = 0; - } - //for(total_sent = 0; total_sent < cur->buf_out_size;) { // If after callback there is smth to send - we do it - ssize_t l_bytes_sent =0; - int l_errno=0; - //if(l_cur->buf_out_size){ + if(l_cur->callbacks.write_callback) + l_cur->callbacks.write_callback(l_cur, NULL); /* Call callback to process write event */ + + if ( l_cur->worker ){ // esocket wasn't unassigned in callback, we need some other ops with it switch (l_cur->type){ case DESCRIPTOR_TYPE_SOCKET_CLIENT: { l_bytes_sent = send(l_cur->socket, (const char *)l_cur->buf_out, @@ -777,10 +765,6 @@ void *dap_worker_thread(void *arg) log_it(L_WARNING, "Socket %"DAP_FORMAT_SOCKET" is not SOCKET, PIPE or FILE but has WRITE state on. Switching it off", l_cur->socket); dap_events_socket_set_writable_unsafe(l_cur,false); } - //else{ // If buffer for sending was empty we should drop off write flag - // l_bytes_sent = -1; - // l_errno = EINVAL; - //} if(l_bytes_sent < 0) { #ifdef DAP_OS_WINDOWS @@ -823,12 +807,20 @@ void *dap_worker_thread(void *arg) } } } + + /* + * If whole buffer has been sent (or it was clrered) - clear "write flag" for socket/file descriptor to prevent + * generation of unexpected I/O events like POLLOUT and consuming CPU by this. + */ + if ( (l_cur->buf_out_size ) || (l_bytes_sent == l_cur->buf_out_size) ) + { + dap_events_socket_set_writable_unsafe(l_cur, false);/* Clear "enable write flag" */ + + if ( l_cur->callbacks.write_finished_callback ) /* Optionaly call I/O completion routine */ + l_cur->callbacks.write_finished_callback(l_cur, l_worker, l_errno); + } } - if (l_cur->buf_out_size) { - dap_events_socket_set_writable_unsafe(l_cur,true); - }else if( l_cur->flags & DAP_SOCK_DROP_WRITE_IF_ZERO){ - dap_events_socket_set_writable_unsafe(l_cur,false); - } + if (l_cur->flags & DAP_SOCK_SIGNAL_CLOSE) { diff --git a/dap-sdk/net/core/include/dap_events_socket.h b/dap-sdk/net/core/include/dap_events_socket.h index 051e39b276642b601c956341ca8ac64a40a523dc..7df579ff05e84863b5658c5d1200601250c46752 100644 --- a/dap-sdk/net/core/include/dap_events_socket.h +++ b/dap-sdk/net/core/include/dap_events_socket.h @@ -101,12 +101,12 @@ typedef int SOCKET; #endif #define BIT( x ) ( 1 << x ) -#define DAP_SOCK_READY_TO_READ BIT( 0 ) -#define DAP_SOCK_READY_TO_WRITE BIT( 1 ) -#define DAP_SOCK_SIGNAL_CLOSE BIT( 2 ) -#define DAP_SOCK_CONNECTING BIT( 3 ) // When connection happens this flag is armed for outgoing connections until its establish the connection -#define DAP_SOCK_REASSIGN_ONCE BIT( 4 ) // This usable for FlowControl to prevent multiple reassigment -#define DAP_SOCK_DROP_WRITE_IF_ZERO BIT( 5 ) // Drop down WRITE flag from socket if reach zero bytes in output buffer +#define DAP_SOCK_READY_TO_READ BIT( 0 ) +#define DAP_SOCK_READY_TO_WRITE BIT( 1 ) +#define DAP_SOCK_SIGNAL_CLOSE BIT( 2 ) +#define DAP_SOCK_CONNECTING BIT( 3 ) // When connection happens this flag is armed for outgoing connections until its establish the connection +#define DAP_SOCK_REASSIGN_ONCE BIT( 4 ) // This usable for FlowControl to prevent multiple reassigment +//#define DAP_SOCK_DROP_WRITE_IF_ZERO BIT( 5 ) // Drop down WRITE flag from socket if reach zero bytes in output buffer // If set - queue limited to sizeof(void*) size of data transmitted #define DAP_SOCK_QUEUE_PTR BIT( 8 ) @@ -128,30 +128,35 @@ typedef void (*dap_events_socket_callback_accept_t) (dap_events_socket_t * , SOC typedef void (*dap_events_socket_callback_connected_t) (dap_events_socket_t * ); // Callback for connected client connection typedef void (*dap_events_socket_worker_callback_t) (dap_events_socket_t *,dap_worker_t * ); // Callback for specific client operations + + /* A callback routine is supposed to be called on completion I/O */ +typedef void (*dap_events_socket_worker_complete_io_t) (dap_events_socket_t *, dap_worker_t *, int a_errno); + typedef struct dap_events_socket_callbacks { union{ // Specific callbacks - dap_events_socket_callback_connected_t connected_callback; // Connected callback for client socket - dap_events_socket_callback_accept_t accept_callback; // Accept callback for listening socket - dap_events_socket_callback_event_t event_callback; // Event callback for listening socket - dap_events_socket_callback_queue_t queue_callback; // Queue callback for listening socket - dap_events_socket_callback_queue_ptr_t queue_ptr_callback; // queue_ptr callback for listening socket + dap_events_socket_callback_connected_t connected_callback; /* Connected callback for client socket */ + dap_events_socket_callback_accept_t accept_callback; /* Accept callback for listening socket */ + dap_events_socket_callback_event_t event_callback; /* Event callback for listening socket */ + dap_events_socket_callback_queue_t queue_callback; /* Queue callback for listening socket */ + dap_events_socket_callback_queue_ptr_t queue_ptr_callback; /* queue_ptr callback for listening socket */ }; - dap_events_socket_callback_timer_t timer_callback; // Timer callback for listening socket - dap_events_socket_callback_t new_callback; // Create new client callback - dap_events_socket_callback_t delete_callback; // Delete client callback - dap_events_socket_callback_t read_callback; // Read function - dap_events_socket_callback_t write_callback; // Write function - dap_events_socket_callback_error_t error_callback; // Error processing function + dap_events_socket_callback_timer_t timer_callback; /* Timer callback for listening socket */ + dap_events_socket_callback_t new_callback; /* Create new client callback */ + dap_events_socket_callback_t delete_callback; /* Delete client callback */ + dap_events_socket_callback_t read_callback; /* Read function */ + dap_events_socket_callback_t write_callback; /* Write function */ + dap_events_socket_worker_complete_io_t write_finished_callback; /* Called on completion Write operation */ + dap_events_socket_callback_error_t error_callback; /* Error processing function */ - dap_events_socket_worker_callback_t worker_assign_callback; // After successful worker assign - dap_events_socket_worker_callback_t worker_unassign_callback; // After successful worker unassign + dap_events_socket_worker_callback_t worker_assign_callback; /* After successful worker assign */ + dap_events_socket_worker_callback_t worker_unassign_callback; /* After successful worker unassign */ } dap_events_socket_callbacks_t; -#define DAP_STREAM_PKT_SIZE_MAX 1 * 1024 * 1024 +#define DAP_STREAM_PKT_SIZE_MAX (1 * 1024 * 1024) #define DAP_EVENTS_SOCKET_BUF DAP_STREAM_PKT_SIZE_MAX -#define DAP_EVENTS_SOCKET_BUF_LIMIT DAP_STREAM_PKT_SIZE_MAX * 4 +#define DAP_EVENTS_SOCKET_BUF_LIMIT (DAP_STREAM_PKT_SIZE_MAX * 4) #define DAP_QUEUE_MAX_MSGS 8 typedef enum { @@ -217,31 +222,23 @@ typedef struct dap_events_socket { uint32_t buf_out_zero_count; // Input section - //uint8_t buf_in[DAP_EVENTS_SOCKET_BUF+1]; // Internal buffer for input data - //char buf_in_str[DAP_EVENTS_SOCKET_BUF+1]; byte_t *buf_in; size_t buf_in_size_max; // size of alloced buffer //char *buf_in_str; size_t buf_in_size; // size of data that is in the input buffer // Output section - - //byte_t buf_out[DAP_EVENTS_SOCKET_BUF+1]; // Internal buffer for output data byte_t *buf_out; size_t buf_out_size; // size of data that is in the output buffer size_t buf_out_size_max; // max size of data dap_events_socket_t * pipe_out; // Pipe socket with data for output // Stored string representation - //char hostaddr[1024]; // Address - //char service[128]; char *hostaddr; char *service; // Remote address, port and others struct sockaddr_in remote_addr; - //char remote_addr_str[INET_ADDRSTRLEN]; - //char remote_addr_str6[INET6_ADDRSTRLEN]; char *remote_addr_str; char *remote_addr_str6; short remote_port; diff --git a/modules/net/dap_chain_node_dns_client.c b/modules/net/dap_chain_node_dns_client.c index be23b9dfe18c177c864cbbcd29cdf54d24904f09..1122be737e9606639e4952727a6ea6d224ad2523 100644 --- a/modules/net/dap_chain_node_dns_client.c +++ b/modules/net/dap_chain_node_dns_client.c @@ -260,7 +260,7 @@ int dap_chain_node_info_dns_request(struct in_addr a_addr, uint16_t a_port, char l_esocket_callbacks.error_callback = s_dns_client_esocket_error_callback; // Error processing function dap_events_socket_t * l_esocket = dap_events_socket_create(DESCRIPTOR_TYPE_SOCKET_UDP,&l_esocket_callbacks); - l_esocket->flags |= DAP_SOCK_DROP_WRITE_IF_ZERO; + // l_esocket->flags |= DAP_SOCK_DROP_WRITE_IF_ZERO; l_esocket->remote_addr.sin_family = AF_INET; l_esocket->remote_addr.sin_port = htons(a_port); l_esocket->remote_addr.sin_addr = a_addr;