From 600c440c144d6bde302c68aa16f5e827a644d73c Mon Sep 17 00:00:00 2001 From: Dmitry Gerasimov <dmitriy.gerasimov@demlabs.net> Date: Thu, 28 Apr 2022 16:01:47 +0700 Subject: [PATCH] [+] Added new flag DAP_SOCK_DROP_WRITE_IF_ZERO and implemented for DNS requests --- dap-sdk/net/core/dap_worker.c | 190 ++++++++++--------- dap-sdk/net/core/include/dap_events_socket.h | 1 + modules/net/dap_chain_node_dns_client.c | 2 +- 3 files changed, 100 insertions(+), 93 deletions(-) diff --git a/dap-sdk/net/core/dap_worker.c b/dap-sdk/net/core/dap_worker.c index 1cb91a384c..0365511d9e 100644 --- a/dap-sdk/net/core/dap_worker.c +++ b/dap-sdk/net/core/dap_worker.c @@ -666,116 +666,120 @@ void *dap_worker_thread(void *arg) //for(total_sent = 0; total_sent < cur->buf_out_size;) { // If after callback there is smth to send - we do it ssize_t l_bytes_sent =0; int l_errno=0; - - switch (l_cur->type){ - case DESCRIPTOR_TYPE_SOCKET_CLIENT: { - l_bytes_sent = send(l_cur->socket, (const char *)l_cur->buf_out, - l_cur->buf_out_size, MSG_DONTWAIT | MSG_NOSIGNAL); + if(l_cur->buf_out_size){ + switch (l_cur->type){ + case DESCRIPTOR_TYPE_SOCKET_CLIENT: { + l_bytes_sent = send(l_cur->socket, (const char *)l_cur->buf_out, + l_cur->buf_out_size, MSG_DONTWAIT | MSG_NOSIGNAL); #ifdef DAP_OS_WINDOWS - //dap_events_socket_set_writable_unsafe(l_cur,false); // enabling this will break windows server replies - l_errno = WSAGetLastError(); + //dap_events_socket_set_writable_unsafe(l_cur,false); // enabling this will break windows server replies + l_errno = WSAGetLastError(); #else - l_errno = errno; + l_errno = errno; #endif - } - break; - case DESCRIPTOR_TYPE_SOCKET_UDP: - l_bytes_sent = sendto(l_cur->socket, (const char *)l_cur->buf_out, - l_cur->buf_out_size, MSG_DONTWAIT | MSG_NOSIGNAL, - (struct sockaddr *)&l_cur->remote_addr, sizeof(l_cur->remote_addr)); + } + break; + case DESCRIPTOR_TYPE_SOCKET_UDP: + l_bytes_sent = sendto(l_cur->socket, (const char *)l_cur->buf_out, + l_cur->buf_out_size, MSG_DONTWAIT | MSG_NOSIGNAL, + (struct sockaddr *)&l_cur->remote_addr, sizeof(l_cur->remote_addr)); #ifdef DAP_OS_WINDOWS - dap_events_socket_set_writable_unsafe(l_cur,false); - l_errno = WSAGetLastError(); + dap_events_socket_set_writable_unsafe(l_cur,false); + l_errno = WSAGetLastError(); #else - l_errno = errno; + l_errno = errno; #endif - break; - case DESCRIPTOR_TYPE_SOCKET_CLIENT_SSL: { + break; + case DESCRIPTOR_TYPE_SOCKET_CLIENT_SSL: { #ifndef DAP_NET_CLIENT_NO_SSL - WOLFSSL *l_ssl = SSL(l_cur); - l_bytes_sent = wolfSSL_write(l_ssl, (char *)(l_cur->buf_out), l_cur->buf_out_size); - if (l_bytes_sent > 0) - log_it(L_DEBUG, "SSL write: %s", (char *)(l_cur->buf_out)); - l_errno = wolfSSL_get_error(l_ssl, 0); + WOLFSSL *l_ssl = SSL(l_cur); + l_bytes_sent = wolfSSL_write(l_ssl, (char *)(l_cur->buf_out), l_cur->buf_out_size); + if (l_bytes_sent > 0) + log_it(L_DEBUG, "SSL write: %s", (char *)(l_cur->buf_out)); + l_errno = wolfSSL_get_error(l_ssl, 0); #endif - } - case DESCRIPTOR_TYPE_QUEUE: - if (l_cur->flags & DAP_SOCK_QUEUE_PTR && l_cur->buf_out_size>= sizeof (void*)){ + } + case DESCRIPTOR_TYPE_QUEUE: + if (l_cur->flags & DAP_SOCK_QUEUE_PTR && l_cur->buf_out_size>= sizeof (void*)){ #if defined(DAP_EVENTS_CAPS_QUEUE_PIPE2) - l_bytes_sent = write(l_cur->socket, l_cur->buf_out, sizeof (void *) ); // We send pointer by pointer + l_bytes_sent = write(l_cur->socket, l_cur->buf_out, sizeof (void *) ); // We send pointer by pointer #elif defined (DAP_EVENTS_CAPS_QUEUE_POSIX) - l_bytes_sent = mq_send(a_es->mqd, (const char *)&a_arg,sizeof (a_arg),0); + l_bytes_sent = mq_send(a_es->mqd, (const char *)&a_arg,sizeof (a_arg),0); #elif defined DAP_EVENTS_CAPS_MSMQ - DWORD l_mp_id = 0; - MQMSGPROPS l_mps; - MQPROPVARIANT l_mpvar[1]; - MSGPROPID l_p_id[1]; - HRESULT l_mstatus[1]; - - l_p_id[l_mp_id] = PROPID_M_BODY; - l_mpvar[l_mp_id].vt = VT_VECTOR | VT_UI1; - l_mpvar[l_mp_id].caub.pElems = l_cur->buf_out; - l_mpvar[l_mp_id].caub.cElems = (u_long)sizeof(void*); - l_mp_id++; - - l_mps.cProp = l_mp_id; - l_mps.aPropID = l_p_id; - l_mps.aPropVar = l_mpvar; - l_mps.aStatus = l_mstatus; - HRESULT hr = MQSendMessage(l_cur->mqh, &l_mps, MQ_NO_TRANSACTION); - - if (hr != MQ_OK) { - l_errno = hr; - log_it(L_ERROR, "An error occured on sending message to queue, errno: %ld", hr); - break; - } else { - l_errno = WSAGetLastError(); - - if(dap_sendto(l_cur->socket, l_cur->port, NULL, 0) == SOCKET_ERROR) { - log_it(L_ERROR, "Write to socket error: %d", WSAGetLastError()); - } - l_bytes_sent = sizeof(void*); - dap_events_socket_set_writable_unsafe(l_cur,false); + DWORD l_mp_id = 0; + MQMSGPROPS l_mps; + MQPROPVARIANT l_mpvar[1]; + MSGPROPID l_p_id[1]; + HRESULT l_mstatus[1]; + + l_p_id[l_mp_id] = PROPID_M_BODY; + l_mpvar[l_mp_id].vt = VT_VECTOR | VT_UI1; + l_mpvar[l_mp_id].caub.pElems = l_cur->buf_out; + l_mpvar[l_mp_id].caub.cElems = (u_long)sizeof(void*); + l_mp_id++; + + l_mps.cProp = l_mp_id; + l_mps.aPropID = l_p_id; + l_mps.aPropVar = l_mpvar; + l_mps.aStatus = l_mstatus; + HRESULT hr = MQSendMessage(l_cur->mqh, &l_mps, MQ_NO_TRANSACTION); + + if (hr != MQ_OK) { + l_errno = hr; + log_it(L_ERROR, "An error occured on sending message to queue, errno: %ld", hr); + break; + } else { + l_errno = WSAGetLastError(); + + if(dap_sendto(l_cur->socket, l_cur->port, NULL, 0) == SOCKET_ERROR) { + log_it(L_ERROR, "Write to socket error: %d", WSAGetLastError()); + } + l_bytes_sent = sizeof(void*); + dap_events_socket_set_writable_unsafe(l_cur,false); - } + } #elif defined (DAP_EVENTS_CAPS_QUEUE_MQUEUE) - l_bytes_sent = mq_send(l_cur->mqd , (const char *)l_cur->buf_out,sizeof (void*),0); - if(l_bytes_sent == 0) - l_bytes_sent = sizeof (void*); - l_errno = errno; - if (l_bytes_sent == -1 && l_errno == EINVAL) // To make compatible with other - l_errno = EAGAIN; // non-blocking sockets -#elif defined (DAP_EVENTS_CAPS_KQUEUE) - struct kevent* l_event=&l_cur->kqueue_event; - dap_events_socket_w_data_t * l_es_w_data = DAP_NEW_Z(dap_events_socket_w_data_t); - l_es_w_data->esocket = l_cur; - memcpy(&l_es_w_data->ptr, l_cur->buf_out,sizeof(l_cur)); - EV_SET(l_event,l_cur->socket, l_cur->kqueue_base_filter,l_cur->kqueue_base_flags, l_cur->kqueue_base_fflags,l_cur->kqueue_data, l_es_w_data); - int l_n = kevent(l_worker->kqueue_fd,l_event,1,NULL,0,NULL); - if (l_n == 1){ - l_bytes_sent = sizeof(l_cur); - }else{ + l_bytes_sent = mq_send(l_cur->mqd , (const char *)l_cur->buf_out,sizeof (void*),0); + if(l_bytes_sent == 0) + l_bytes_sent = sizeof (void*); l_errno = errno; - log_it(L_WARNING,"queue ptr send error: kevent %p errno: %d", l_es_w_data, l_errno); - DAP_DELETE(l_es_w_data); - } + if (l_bytes_sent == -1 && l_errno == EINVAL) // To make compatible with other + l_errno = EAGAIN; // non-blocking sockets +#elif defined (DAP_EVENTS_CAPS_KQUEUE) + struct kevent* l_event=&l_cur->kqueue_event; + dap_events_socket_w_data_t * l_es_w_data = DAP_NEW_Z(dap_events_socket_w_data_t); + l_es_w_data->esocket = l_cur; + memcpy(&l_es_w_data->ptr, l_cur->buf_out,sizeof(l_cur)); + EV_SET(l_event,l_cur->socket, l_cur->kqueue_base_filter,l_cur->kqueue_base_flags, l_cur->kqueue_base_fflags,l_cur->kqueue_data, l_es_w_data); + int l_n = kevent(l_worker->kqueue_fd,l_event,1,NULL,0,NULL); + if (l_n == 1){ + l_bytes_sent = sizeof(l_cur); + }else{ + l_errno = errno; + log_it(L_WARNING,"queue ptr send error: kevent %p errno: %d", l_es_w_data, l_errno); + DAP_DELETE(l_es_w_data); + } #else #error "Not implemented dap_events_socket_queue_ptr_send() for this platform" #endif - }else{ - assert("Not implemented non-ptr queue send from outgoing buffer"); - // TODO Implement non-ptr queue output - } - break; - case DESCRIPTOR_TYPE_PIPE: - case DESCRIPTOR_TYPE_FILE: - l_bytes_sent = write(l_cur->fd, (char *) (l_cur->buf_out), l_cur->buf_out_size ); - l_errno = errno; - break; - default: - log_it(L_WARNING, "Socket %"DAP_FORMAT_SOCKET" is not SOCKET, PIPE or FILE but has WRITE state on. Switching it off", l_cur->socket); - dap_events_socket_set_writable_unsafe(l_cur,false); + }else{ + assert("Not implemented non-ptr queue send from outgoing buffer"); + // TODO Implement non-ptr queue output + } + break; + case DESCRIPTOR_TYPE_PIPE: + case DESCRIPTOR_TYPE_FILE: + l_bytes_sent = write(l_cur->fd, (char *) (l_cur->buf_out), l_cur->buf_out_size ); + l_errno = errno; + break; + default: + log_it(L_WARNING, "Socket %"DAP_FORMAT_SOCKET" is not SOCKET, PIPE or FILE but has WRITE state on. Switching it off", l_cur->socket); + dap_events_socket_set_writable_unsafe(l_cur,false); + } + }else{ // If buffer for sending was empty we should drop off write flag + l_bytes_sent = -1; + l_errno = EINVAL; } if(l_bytes_sent < 0) { @@ -819,6 +823,8 @@ void *dap_worker_thread(void *arg) } if (l_cur->buf_out_size) { dap_events_socket_set_writable_unsafe(l_cur,true); + }else if( l_cur->flags & DAP_SOCK_DROP_WRITE_IF_ZERO){ + dap_events_socket_set_writable_unsafe(l_cur,false); } if (l_cur->flags & DAP_SOCK_SIGNAL_CLOSE) diff --git a/dap-sdk/net/core/include/dap_events_socket.h b/dap-sdk/net/core/include/dap_events_socket.h index 9217bf2ccd..051e39b276 100644 --- a/dap-sdk/net/core/include/dap_events_socket.h +++ b/dap-sdk/net/core/include/dap_events_socket.h @@ -106,6 +106,7 @@ typedef int SOCKET; #define DAP_SOCK_SIGNAL_CLOSE BIT( 2 ) #define DAP_SOCK_CONNECTING BIT( 3 ) // When connection happens this flag is armed for outgoing connections until its establish the connection #define DAP_SOCK_REASSIGN_ONCE BIT( 4 ) // This usable for FlowControl to prevent multiple reassigment +#define DAP_SOCK_DROP_WRITE_IF_ZERO BIT( 5 ) // Drop down WRITE flag from socket if reach zero bytes in output buffer // If set - queue limited to sizeof(void*) size of data transmitted #define DAP_SOCK_QUEUE_PTR BIT( 8 ) diff --git a/modules/net/dap_chain_node_dns_client.c b/modules/net/dap_chain_node_dns_client.c index 419841a1c9..be23b9dfe1 100644 --- a/modules/net/dap_chain_node_dns_client.c +++ b/modules/net/dap_chain_node_dns_client.c @@ -260,7 +260,7 @@ int dap_chain_node_info_dns_request(struct in_addr a_addr, uint16_t a_port, char l_esocket_callbacks.error_callback = s_dns_client_esocket_error_callback; // Error processing function dap_events_socket_t * l_esocket = dap_events_socket_create(DESCRIPTOR_TYPE_SOCKET_UDP,&l_esocket_callbacks); - l_esocket->flags |= DAP_SOCK_READY_TO_WRITE; + l_esocket->flags |= DAP_SOCK_DROP_WRITE_IF_ZERO; l_esocket->remote_addr.sin_family = AF_INET; l_esocket->remote_addr.sin_port = htons(a_port); l_esocket->remote_addr.sin_addr = a_addr; -- GitLab