From e6a5b13fd57c9081cd847bd812673cd5abbda083 Mon Sep 17 00:00:00 2001 From: "Dmitriy A. Gerasimov" <dmitriy.gerasimov@demlabs.net> Date: Fri, 18 Dec 2020 16:14:10 +0700 Subject: [PATCH] [*] Fixed compress poll on --- CMakeLists.txt | 2 +- dap-sdk/net/core/dap_events_socket.c | 12 ++++++++---- dap-sdk/net/core/dap_proc_thread.c | 17 ++++++++--------- dap-sdk/net/core/dap_server.c | 6 +++++- dap-sdk/net/core/dap_worker.c | 9 +++++---- 5 files changed, 27 insertions(+), 19 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 8a9324203a..aad8beb3ca 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,7 +2,7 @@ project(cellframe-sdk C) cmake_minimum_required(VERSION 2.8) set(CMAKE_C_STANDARD 11) -set(CELLFRAME_SDK_NATIVE_VERSION "2.6-83") +set(CELLFRAME_SDK_NATIVE_VERSION "2.6-84") add_definitions ("-DCELLFRAME_SDK_VERSION=\"${CELLFRAME_SDK_NATIVE_VERSION}\"") set(DAPSDK_MODULES "") diff --git a/dap-sdk/net/core/dap_events_socket.c b/dap-sdk/net/core/dap_events_socket.c index 9ea9132341..e80282157b 100644 --- a/dap-sdk/net/core/dap_events_socket.c +++ b/dap-sdk/net/core/dap_events_socket.c @@ -132,6 +132,8 @@ dap_events_socket_t *dap_events_socket_wrap_no_add( dap_events_t *a_events, assert(a_callbacks); dap_events_socket_t *ret = DAP_NEW_Z( dap_events_socket_t ); + if (!ret) + return NULL; ret->socket = a_sock; ret->events = a_events; @@ -335,7 +337,7 @@ dap_events_socket_t * dap_events_socket_queue_ptr_create_input(dap_events_socket { dap_events_socket_t * l_es = DAP_NEW_Z(dap_events_socket_t); l_es->type = DESCRIPTOR_TYPE_QUEUE; - l_es->buf_out_size_max = l_es->buf_in_size_max = 8 * sizeof(void*); + l_es->buf_out_size_max = 8 * sizeof(void*); l_es->buf_out = DAP_NEW_Z_SIZE(byte_t,l_es->buf_out_size_max ); l_es->buf_in_size_max = 8 * sizeof(void*); l_es->buf_in = DAP_NEW_Z_SIZE(byte_t,l_es->buf_in_size_max ); @@ -722,13 +724,13 @@ int dap_events_socket_queue_proc_input_unsafe(dap_events_socket_t * a_esocket) #endif } else { #ifdef DAP_OS_WINDOWS - int l_read = dap_recvfrom(a_esocket->socket, a_esocket->buf_in, DAP_EVENTS_SOCKET_BUF); + int l_read = dap_recvfrom(a_esocket->socket, a_esocket->buf_in, a_esocket->buf_in_size_max); if (l_read == SOCKET_ERROR) { log_it(L_ERROR, "Queue socket %d received invalid data, error %d", a_esocket->socket, WSAGetLastError()); return -1; } #else - size_t l_read = read(a_esocket->socket, a_esocket->buf_in,sizeof(a_esocket->buf_in)); + size_t l_read = read(a_esocket->socket, a_esocket->buf_in, a_esocket->buf_in_size_max ); #endif a_esocket->callbacks.queue_callback(a_esocket, a_esocket->buf_in, l_read); } @@ -1386,6 +1388,7 @@ void dap_events_socket_remove_from_worker_unsafe( dap_events_socket_t *a_es, dap #elif defined (DAP_EVENTS_CAPS_POLL) if (a_es->poll_index < a_worker->poll_count ){ a_worker->poll[a_es->poll_index].fd = -1; + a_worker->poll_compress = true; }else{ log_it(L_ERROR, "Wrong poll index when remove from worker (unsafe): %u when total count %u", a_es->poll_index, a_worker->poll_count); } @@ -1492,7 +1495,8 @@ size_t dap_events_socket_write_inter(dap_events_socket_t * a_es_input, dap_event l_msg->data = DAP_NEW_SIZE(void,a_data_size); l_msg->data_size = a_data_size; l_msg->flags_set = DAP_SOCK_READY_TO_WRITE; - memcpy( l_msg->data, a_data, a_data_size); + if( a_data) + memcpy( l_msg->data, a_data, a_data_size); int l_ret= dap_events_socket_queue_ptr_send_to_input( a_es_input, l_msg ); if (l_ret!=0){ diff --git a/dap-sdk/net/core/dap_proc_thread.c b/dap-sdk/net/core/dap_proc_thread.c index 761c375018..a2b2806cec 100644 --- a/dap-sdk/net/core/dap_proc_thread.c +++ b/dap-sdk/net/core/dap_proc_thread.c @@ -487,8 +487,9 @@ static void * s_proc_thread_function(void * a_arg) l_bytes_sent = mq_send(l_cur->mqd, (const char *) l_cur->buf_out, sizeof (void *),0); if (l_bytes_sent==0) l_bytes_sent = sizeof (void *); - if (l_bytes_sent == -1 && l_errno == EINVAL) // To make compatible with other + if (l_bytes_sent == -1 && errno == EINVAL) // To make compatible with other l_errno = EAGAIN; // non-blocking sockets + #else #error "Not implemented dap_events_socket_queue_ptr_send() for this platform" #endif @@ -555,14 +556,12 @@ static void * s_proc_thread_function(void * a_arg) l_poll_compress = false; for (size_t i = 0; i < l_thread->poll_count ; i++) { if ( l_thread->poll[i].fd == -1){ - if ( l_thread->poll_count){ - for(size_t j = i; j < l_thread->poll_count-1; j++){ - l_thread->poll[j].fd = l_thread->poll[j+1].fd; - l_thread->esockets[j] = l_thread->esockets[j+1]; - if(l_thread->esockets[j]) - l_thread->esockets[j]->poll_index = j; - } - } + for(size_t j = i; j +1 < l_thread->poll_count; j++){ + l_thread->poll[j].fd = l_thread->poll[j+1].fd; + l_thread->esockets[j] = l_thread->esockets[j+1]; + if(l_thread->esockets[j]) + l_thread->esockets[j]->poll_index = j; + } i--; l_thread->poll_count--; } diff --git a/dap-sdk/net/core/dap_server.c b/dap-sdk/net/core/dap_server.c index ca9101f7ae..f2e9409fa8 100644 --- a/dap-sdk/net/core/dap_server.c +++ b/dap-sdk/net/core/dap_server.c @@ -295,7 +295,11 @@ static void s_es_server_accept(dap_events_socket_t *a_es, int a_remote_socket, s getnameinfo(a_remote_addr,a_remote_addr_size, l_es_new->hostaddr ,256, l_es_new->service,sizeof(l_es_new->service), NI_NUMERICHOST | NI_NUMERICSERV); - + if (!l_es_new->hostaddr){ + struct in_addr l_addr_remote; + l_addr_remote.s_addr = ((struct sockaddr_in *) a_remote_addr)->sin_addr.s_addr; + inet_ntop(AF_INET,&l_addr_remote,l_es_new->hostaddr,sizeof (l_addr_remote) ); + } log_it(L_INFO,"Connection accepted from %s (%s)", l_es_new->hostaddr, l_es_new->service ); dap_worker_add_events_socket_auto(l_es_new); } diff --git a/dap-sdk/net/core/dap_worker.c b/dap-sdk/net/core/dap_worker.c index 5ed174b607..5d9b8d68f4 100644 --- a/dap-sdk/net/core/dap_worker.c +++ b/dap-sdk/net/core/dap_worker.c @@ -158,6 +158,7 @@ void *dap_worker_thread(void *arg) char l_errbuf[128]; strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); log_it(L_ERROR, "Worker thread %d got errno:\"%s\" (%d)", l_worker->id, l_errbuf, l_errno); + assert_perror(l_errno); #endif break; } @@ -429,14 +430,14 @@ void *dap_worker_thread(void *arg) char l_error_buf[128]; l_error_buf[0]='\0'; getsockopt(l_cur->socket, SOL_SOCKET, SO_ERROR, (void *)&l_error, &l_error_len); - if (l_error){ + if(l_error == EINPROGRESS) { + log_it(L_DEBUG, "Connecting with %s in progress...", l_cur->remote_addr_str ? l_cur->remote_addr_str: "(NULL)"); + }else if (l_error){ strerror_r(l_error, l_error_buf, sizeof (l_error_buf)); log_it(L_ERROR,"Connecting error with %s: \"%s\" (code %d)", l_cur->remote_addr_str ? l_cur->remote_addr_str: "(NULL)", l_error_buf, l_error); if ( l_cur->callbacks.error_callback ) l_cur->callbacks.error_callback(l_cur, l_error); - }else if(l_error == EINPROGRESS) { - log_it(L_DEBUG, "Connecting with %s in progress...", l_cur->remote_addr_str ? l_cur->remote_addr_str: "(NULL)"); }else{ if(s_debug_reactor) log_it(L_NOTICE, "Connected with %s",l_cur->remote_addr_str ? l_cur->remote_addr_str: "(NULL)"); @@ -478,7 +479,7 @@ void *dap_worker_thread(void *arg) } //for(total_sent = 0; total_sent < cur->buf_out_size;) { // If after callback there is smth to send - we do it ssize_t l_bytes_sent =0; - int l_errno; + int l_errno=0; switch (l_cur->type){ case DESCRIPTOR_TYPE_SOCKET: { -- GitLab