From 656b55dfa19f2772e72ac141199d7b9e0909867f Mon Sep 17 00:00:00 2001 From: Roman Khlopkov <roman.khlopkov@demlabs.net> Date: Fri, 13 May 2022 09:25:12 +0000 Subject: [PATCH] bugs-6239 Former-commit-id: 6f26af5dd97d774c8eecb302a35a0327735fbf65 --- cmake/OS_Detection.cmake | 2 +- dap-sdk/net/client/dap_client_http.c | 2 +- dap-sdk/net/client/dap_client_pvt.c | 23 +-- dap-sdk/net/core/dap_events_socket.c | 64 ++++----- dap-sdk/net/core/dap_proc_thread.c | 27 ++-- dap-sdk/net/core/dap_worker.c | 59 ++++---- dap-sdk/net/core/include/dap_events_socket.h | 14 +- .../net/server/http_server/dap_http_simple.c | 51 +++---- .../http_server/http_client/dap_http_client.c | 22 +-- dap-sdk/net/stream/stream/dap_stream_pkt.c | 5 +- .../stream/stream/include/dap_stream_pkt.h | 2 + modules/chain/dap_chain.c | 6 +- modules/chain/dap_chain_ledger.c | 4 +- modules/channel/chain/dap_stream_ch_chain.c | 136 ++++++++++++------ .../chain/include/dap_stream_ch_chain_pkt.h | 1 + .../block-ton/dap_chain_cs_block_ton.c | 2 +- .../dap_chain_global_db_driver_cdb.c | 17 ++- modules/net/srv/dap_chain_net_srv_order.c | 9 +- modules/service/vpn/dap_chain_net_srv_vpn.c | 2 +- .../xchange/dap_chain_net_srv_xchange.c | 6 +- .../include/dap_chain_net_srv_xchange.h | 2 + modules/type/dag/dap_chain_cs_dag.c | 4 +- 22 files changed, 232 insertions(+), 228 deletions(-) diff --git a/cmake/OS_Detection.cmake b/cmake/OS_Detection.cmake index 18dc069732..9a1ceddf89 100644 --- a/cmake/OS_Detection.cmake +++ b/cmake/OS_Detection.cmake @@ -84,7 +84,7 @@ if(UNIX) if (LINUX) if(DAP_DEBUG) - set(_CCOPT "-DDAP_DEBUG -Wall -Wno-unused-command-line-argument -Wno-deprecated-declarations -Wno-unused-local-typedefs -Wno-unused-function -Wno-implicit-fallthrough -Wno-unused-variable -Wno-unused-parameter -Wno-unused-but-set-variable -pg -g3 -ggdb -fno-eliminate-unused-debug-symbols -fno-strict-aliasing") + set(_CCOPT "-DDAP_DEBUG -Wall -Wno-unused-command-line-argument -Wno-deprecated-declarations -Wno-unused-local-typedefs -Wno-unused-function -Wno-implicit-fallthrough -Wno-unused-variable -Wno-unused-parameter -Wno-unused-but-set-variable -pg -g3 -ggdb -fno-eliminate-unused-debug-symbols -fno-strict-aliasing") set(_LOPT "-pg") SET(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} -pg") else() diff --git a/dap-sdk/net/client/dap_client_http.c b/dap-sdk/net/client/dap_client_http.c index 7567d22866..562f89d634 100644 --- a/dap-sdk/net/client/dap_client_http.c +++ b/dap-sdk/net/client/dap_client_http.c @@ -192,7 +192,7 @@ static bool s_timer_timeout_after_connected_check(void * a_arg) dap_client_http_pvt_t * l_http_pvt = PVT(l_es); assert(l_http_pvt); if ( time(NULL)- l_http_pvt->ts_last_read >= (time_t) s_client_timeout_read_after_connect_ms){ - log_it(L_WARNING,"Read after connect timeout for request http://%s:%u/%s, possible uplink is on heavy load or DPI between you", + log_it(L_WARNING, "Timeout for reading after connect for request http://%s:%u/%s, possible uplink is on heavy load or DPI between you", l_http_pvt->uplink_addr, l_http_pvt->uplink_port, l_http_pvt->path); if(l_http_pvt->error_callback) { l_http_pvt->error_callback(ETIMEDOUT, l_http_pvt->obj); diff --git a/dap-sdk/net/client/dap_client_pvt.c b/dap-sdk/net/client/dap_client_pvt.c index 3ac7ef062f..75d8b06c0b 100644 --- a/dap-sdk/net/client/dap_client_pvt.c +++ b/dap-sdk/net/client/dap_client_pvt.c @@ -485,8 +485,8 @@ static bool s_stage_status_after(dap_client_pvt_t * a_client_pvt) };// a_client_pvt->stream_es = dap_events_socket_wrap_no_add(a_client_pvt->events, (int)a_client_pvt->stream_socket, &l_s_callbacks); - a_client_pvt->stream_es->flags |= DAP_SOCK_CONNECTING ; // To catch non-blocking error when connecting we should ar WRITE flag - a_client_pvt->stream_es->flags |= DAP_SOCK_READY_TO_WRITE; + a_client_pvt->stream_es->flags |= DAP_SOCK_CONNECTING ; // To catch non-blocking error when connecting we should up WRITE flag + //a_client_pvt->stream_es->flags |= DAP_SOCK_READY_TO_WRITE; a_client_pvt->stream_es->_inheritor = a_client_pvt; a_client_pvt->stream = dap_stream_new_es_client(a_client_pvt->stream_es); assert(a_client_pvt->stream); @@ -1317,25 +1317,14 @@ static void s_stream_es_callback_write(dap_events_socket_t * a_es, void * arg) return; switch (l_client_pvt->stage) { case STAGE_STREAM_STREAMING: { - size_t i; - bool ready_to_write = false; // log_it(DEBUG,"Process channels data output (%u channels)",STREAM(sh)->channel_count); - - for(i = 0; i < l_client_pvt->stream->channel_count; i++) { + for (size_t i = 0; i < l_client_pvt->stream->channel_count; i++) { dap_stream_ch_t * ch = l_client_pvt->stream->channel[i]; - if(ch->ready_to_write) { + if(ch->ready_to_write) ch->proc->packet_out_callback(ch, NULL); - ready_to_write |= ch->ready_to_write; - } } - //log_it(L_DEBUG,"stream_data_out (ready_to_write=%s)", ready_to_write?"true":"false"); - - dap_events_socket_set_writable_unsafe(l_client_pvt->stream_es, ready_to_write); - //log_it(ERROR,"No stream_data_write_callback is defined"); - } - break; - default: { - } + } break; + default: {} } } diff --git a/dap-sdk/net/core/dap_events_socket.c b/dap-sdk/net/core/dap_events_socket.c index 549c451bf9..3a7aeded16 100644 --- a/dap-sdk/net/core/dap_events_socket.c +++ b/dap-sdk/net/core/dap_events_socket.c @@ -1116,29 +1116,25 @@ typedef struct dap_events_socket_buf_item static int wait_send_socket(SOCKET a_sockfd, long timeout_ms) { struct timeval l_tv; - fd_set l_outfd, l_errfd; - l_tv.tv_sec = timeout_ms / 1000; l_tv.tv_usec = (timeout_ms % 1000) * 1000; + fd_set l_outfd; FD_ZERO(&l_outfd); - FD_ZERO(&l_errfd); - FD_SET(a_sockfd, &l_errfd); FD_SET(a_sockfd, &l_outfd); - while(1) { + while (1) { #ifdef DAP_OS_WINDOWS - int l_res = select(1, NULL, &l_outfd, &l_errfd, &l_tv); + int l_res = select(1, NULL, &l_outfd, NULL, &l_tv); #else - int l_res = select(a_sockfd + 1, NULL, &l_outfd, &l_errfd, &l_tv); + int l_res = select(a_sockfd + 1, NULL, &l_outfd, NULL, &l_tv); #endif - if(l_res == 0){ - l_res = -2; + if (l_res == 0) { //log_it(L_DEBUG, "socket %d timed out", a_sockfd) - break; + return -2; } - if(l_res == -1) { - if(errno == EINTR) + if (l_res == -1) { + if (errno == EINTR) continue; log_it(L_DEBUG, "socket %"DAP_FORMAT_SOCKET" waiting errno=%d", a_sockfd, errno); return l_res; @@ -1146,7 +1142,7 @@ static int wait_send_socket(SOCKET a_sockfd, long timeout_ms) break; }; - if(FD_ISSET(a_sockfd, &l_outfd)) + if (FD_ISSET(a_sockfd, &l_outfd)) return 0; return -1; @@ -1159,27 +1155,28 @@ static int wait_send_socket(SOCKET a_sockfd, long timeout_ms) */ static void *dap_events_socket_buf_thread(void *arg) { - dap_events_socket_buf_item_t *l_item = (dap_events_socket_buf_item_t*) arg; - if(!l_item) { + dap_events_socket_buf_item_t *l_item = (dap_events_socket_buf_item_t *)arg; + if (!l_item) pthread_exit(0); - } int l_res = 0; int l_count = 0; - while(l_res < 1 && l_count < 3) { - // wait max 5 min -#ifdef DAP_OS_WINDOWS - log_it(L_INFO, "Wait 5 minutes"); - l_res = wait_send_socket(l_item->es->socket, 300000); -#else - l_res = wait_send_socket(l_item->es->fd2, 300000); + SOCKET l_sock = INVALID_SOCKET; + while (l_res < 1 && l_count++ < 3) { +#if defined(DAP_EVENTS_CAPS_QUEUE_PIPE2) + l_sock = l_item->es->fd2; +#elif defined(DAP_EVENTS_CAPS_QUEUE_MQUEUE) + l_sock = l_item->es->mqd; +#elif defined(DAP_EVENTS_CAPS_KQUEUE) +#error "Undefined waiting for KQUEUE CAPS" #endif - if (l_res == 0){ + // wait max 5 min + l_res = wait_send_socket(l_sock, 300000); + if (l_res == 0) { dap_events_socket_queue_ptr_send(l_item->es, l_item->arg); break; } - l_count++; } - if(l_res != 0) + if (l_res != 0) log_it(L_WARNING, "Lost data bulk in events socket buf thread"); DAP_DELETE(l_item); @@ -1708,21 +1705,10 @@ void dap_events_socket_set_writable_unsafe( dap_events_socket_t *a_esocket, bool return; } - if ( a_is_ready ) { + if ( a_is_ready ) a_esocket->flags |= DAP_SOCK_READY_TO_WRITE; -#ifdef DAP_EVENTS_CAPS_EPOLL - if (a_esocket->type == DESCRIPTOR_TYPE_QUEUE) - a_esocket->ev_base_flags |= EPOLLONESHOT; -#endif - } - else { + else a_esocket->flags ^= DAP_SOCK_READY_TO_WRITE; -#ifdef DAP_EVENTS_CAPS_EPOLL - if (a_esocket->type == DESCRIPTOR_TYPE_QUEUE) - a_esocket->ev_base_flags ^= EPOLLONESHOT; -#endif - } - #ifdef DAP_EVENTS_CAPS_EVENT_KEVENT if( a_esocket->type != DESCRIPTOR_TYPE_EVENT && diff --git a/dap-sdk/net/core/dap_proc_thread.c b/dap-sdk/net/core/dap_proc_thread.c index 2d13300d4f..40e8193269 100644 --- a/dap-sdk/net/core/dap_proc_thread.c +++ b/dap-sdk/net/core/dap_proc_thread.c @@ -159,7 +159,7 @@ static void s_proc_event_callback(dap_events_socket_t * a_esocket, uint64_t __at { dap_proc_thread_t *l_thread; dap_proc_queue_item_t *l_item; -int l_rc, l_is_anybody_for_repeat, l_is_finished, l_iter_cnt, l_cur_pri; +int l_rc, l_is_anybody_in_queue, l_is_finished, l_iter_cnt, l_cur_pri; size_t l_size; dap_proc_queue_t *l_queue; @@ -171,12 +171,11 @@ dap_proc_queue_t *l_queue; return; } - l_iter_cnt = l_is_anybody_for_repeat = 0; + l_iter_cnt = l_is_anybody_in_queue = 0; /*@RRL: l_iter_cnt = DAP_QUE$K_ITER_NR; */ - l_cur_pri = (DAP_QUE$K_PRIMAX - 1); l_queue = l_thread->proc_queue; - for ( ; l_cur_pri; l_cur_pri--, l_iter_cnt++ ) /* Run from higest to lowest ... */ + for (l_cur_pri = (DAP_QUE$K_PRIMAX - 1); l_cur_pri; l_cur_pri--, l_iter_cnt++ ) /* Run from higest to lowest ... */ { if ( !l_queue->list[l_cur_pri].items.nr ) /* A lockless quick check */ continue; @@ -194,7 +193,6 @@ dap_proc_queue_t *l_queue; l_item->callback, l_item->callback_arg, l_cur_pri, l_iter_cnt); l_is_finished = l_item->callback(l_thread, l_item->callback_arg); - l_is_anybody_for_repeat++; debug_if (g_debug_reactor, L_INFO, "Proc event callback: %p/%p, prio=%d, iteration=%d - is %sfinished", l_item->callback, l_item->callback_arg, l_cur_pri, l_iter_cnt, l_is_finished ? "" : "not "); @@ -206,17 +204,16 @@ dap_proc_queue_t *l_queue; pthread_mutex_unlock(&l_queue->list[l_cur_pri].lock); } else { - DAP_DELETE(l_item); + DAP_DELETE(l_item); } + } + for (l_cur_pri = (DAP_QUE$K_PRIMAX - 1); l_cur_pri; l_cur_pri--) + l_is_anybody_in_queue += l_queue->list[l_cur_pri].items.nr; - l_is_anybody_for_repeat += (!l_is_finished); - - } - - if ( l_is_anybody_for_repeat ) /* Arm event if we have something to proc again */ + if ( l_is_anybody_in_queue ) /* Arm event if we have something to proc again */ dap_events_socket_event_signal(a_esocket, 1); - debug_if(g_debug_reactor, L_DEBUG, "<-- Proc event callback end, repeat flag is: %d, iterations: %d", l_is_anybody_for_repeat, l_iter_cnt); + debug_if(g_debug_reactor, L_DEBUG, "<-- Proc event callback end, items rest: %d, iterations: %d", l_is_anybody_in_queue, l_iter_cnt); } @@ -789,9 +786,6 @@ static void * s_proc_thread_function(void * a_arg) } #elif defined (DAP_EVENTS_CAPS_QUEUE_MQUEUE) char * l_ptr = (char *) l_cur->buf_out; - void *l_ptr_in; - memcpy(&l_ptr_in,l_ptr, sizeof (l_ptr_in) ); - l_bytes_sent = mq_send(l_cur->mqd, l_ptr, sizeof (l_ptr),0); if (l_bytes_sent==0){ // log_it(L_DEBUG,"mq_send %p success", l_ptr_in); @@ -801,7 +795,7 @@ static void * s_proc_thread_function(void * a_arg) // log_it(L_DEBUG,"mq_send %p EAGAIN", l_ptr_in); }else{ l_errno = errno; - log_it(L_WARNING,"mq_send %p errno: %d", l_ptr_in, l_errno); + log_it(L_WARNING,"mq_send %p errno: %d", l_ptr, l_errno); } #elif defined (DAP_EVENTS_CAPS_KQUEUE) @@ -936,6 +930,7 @@ static void * s_proc_thread_function(void * a_arg) for (size_t n=0; n<dap_events_worker_get_count(); n++){ dap_events_socket_delete_unsafe(l_thread->queue_assign_input[n], false); dap_events_socket_delete_unsafe(l_thread->queue_io_input[n], false); + dap_events_socket_delete_unsafe(l_thread->queue_callback_input[n], false); } return NULL; diff --git a/dap-sdk/net/core/dap_worker.c b/dap-sdk/net/core/dap_worker.c index a5507db49f..ea3fec873b 100644 --- a/dap-sdk/net/core/dap_worker.c +++ b/dap-sdk/net/core/dap_worker.c @@ -210,7 +210,7 @@ void *dap_worker_thread(void *arg) } time_t l_cur_time = time( NULL); - for(size_t n = 0; n < l_sockets_max; n++) { + for(ssize_t n = 0; n < l_sockets_max; n++) { bool l_flag_hup, l_flag_rdhup, l_flag_read, l_flag_write, l_flag_error, l_flag_nval, l_flag_msg, l_flag_pri; #ifdef DAP_EVENTS_CAPS_EPOLL @@ -631,30 +631,27 @@ void *dap_worker_thread(void *arg) } } - /* - * Socket is ready to write and not going to close - */ - if ( !l_cur->buf_out_size ) /* Check firstly that output buffer is not empty */ - { - dap_events_socket_set_writable_unsafe(l_cur, false); /* Clear "enable write flag" */ - - if ( l_cur->callbacks.write_finished_callback ) /* Optionaly call I/O completion routine */ - l_cur->callbacks.write_finished_callback(l_cur, l_worker, l_errno); - - l_flag_write = 0; /* Clear flag to exclude unecessary processing of output */ - } - l_bytes_sent = 0; - if ( ( l_flag_write && (l_cur->flags & DAP_SOCK_READY_TO_WRITE) ) || - ( (l_cur->flags & DAP_SOCK_READY_TO_WRITE) && !(l_cur->flags & DAP_SOCK_SIGNAL_CLOSE) ) ) { - + if (l_flag_write && (l_cur->flags & DAP_SOCK_READY_TO_WRITE) && !(l_cur->flags & DAP_SOCK_SIGNAL_CLOSE)) { debug_if (g_debug_reactor, L_DEBUG, "Main loop output: %zu bytes to send", l_cur->buf_out_size); + /* + * Socket is ready to write and not going to close + */ + if ( !l_cur->buf_out_size ) /* Check firstly that output buffer is not empty */ + { + dap_events_socket_set_writable_unsafe(l_cur, false); /* Clear "enable write flag" */ + + if ( l_cur->callbacks.write_finished_callback ) /* Optionaly call I/O completion routine */ + l_cur->callbacks.write_finished_callback(l_cur, l_cur->callbacks.arg, l_errno); - if(l_cur->callbacks.write_callback) + l_flag_write = 0; /* Clear flag to exclude unecessary processing of output */ + } + + if (l_cur->callbacks.write_callback) l_cur->callbacks.write_callback(l_cur, NULL); /* Call callback to process write event */ - if ( l_cur->worker ){ // esocket wasn't unassigned in callback, we need some other ops with it + if ( l_cur->worker && l_flag_write ){ // esocket wasn't unassigned in callback, we need some other ops with it switch (l_cur->type){ case DESCRIPTOR_TYPE_SOCKET_CLIENT: { l_bytes_sent = send(l_cur->socket, (const char *)l_cur->buf_out, @@ -723,7 +720,6 @@ void *dap_worker_thread(void *arg) log_it(L_ERROR, "Write to socket error: %d", WSAGetLastError()); } l_bytes_sent = sizeof(void*); - dap_events_socket_set_writable_unsafe(l_cur,false); } #elif defined (DAP_EVENTS_CAPS_QUEUE_MQUEUE) @@ -799,6 +795,14 @@ void *dap_worker_thread(void *arg) l_cur->buf_out_size -= l_bytes_sent; if (l_cur->buf_out_size ) { memmove(l_cur->buf_out, &l_cur->buf_out[l_bytes_sent], l_cur->buf_out_size); + } else { + /* + * If whole buffer has been sent - clear "write flag" for socket/file descriptor to prevent + * generation of unexpected I/O events like POLLOUT and consuming CPU by this. + */ + dap_events_socket_set_writable_unsafe(l_cur, false);/* Clear "enable write flag" */ + if ( l_cur->callbacks.write_finished_callback ) /* Optionaly call I/O completion routine */ + l_cur->callbacks.write_finished_callback(l_cur, l_cur->callbacks.arg, l_errno); } }else{ log_it(L_ERROR, "Wrong bytes sent, %zd more then was in buffer %zd",l_bytes_sent, l_cur->buf_out_size); @@ -807,21 +811,8 @@ void *dap_worker_thread(void *arg) } } } - - /* - * If whole buffer has been sent (or it was clrered) - clear "write flag" for socket/file descriptor to prevent - * generation of unexpected I/O events like POLLOUT and consuming CPU by this. - */ - if ( (l_cur->buf_out_size ) || (l_bytes_sent == l_cur->buf_out_size) ) - { - dap_events_socket_set_writable_unsafe(l_cur, false);/* Clear "enable write flag" */ - - if ( l_cur->callbacks.write_finished_callback ) /* Optionaly call I/O completion routine */ - l_cur->callbacks.write_finished_callback(l_cur, l_worker, l_errno); - } } - if (l_cur->flags & DAP_SOCK_SIGNAL_CLOSE) { if (l_cur->buf_out_size == 0) { @@ -830,7 +821,7 @@ void *dap_worker_thread(void *arg) l_cur->remote_addr_str ? l_cur->remote_addr_str : "", l_cur->socket, l_cur, l_cur->uuid, l_cur->type, l_tn); - for(size_t nn=n+1; nn<l_sockets_max; nn++){ // Check for current selection if it has event duplication + for (ssize_t nn = n + 1; nn < l_sockets_max; nn++) { // Check for current selection if it has event duplication dap_events_socket_t *l_es_selected = NULL; #ifdef DAP_EVENTS_CAPS_EPOLL l_es_selected = (dap_events_socket_t *) l_epoll_events[nn].data.ptr; diff --git a/dap-sdk/net/core/include/dap_events_socket.h b/dap-sdk/net/core/include/dap_events_socket.h index 7df579ff05..3061b701f1 100644 --- a/dap-sdk/net/core/include/dap_events_socket.h +++ b/dap-sdk/net/core/include/dap_events_socket.h @@ -130,7 +130,7 @@ typedef void (*dap_events_socket_worker_callback_t) (dap_events_socket_t *,dap_w /* A callback routine is supposed to be called on completion I/O */ -typedef void (*dap_events_socket_worker_complete_io_t) (dap_events_socket_t *, dap_worker_t *, int a_errno); +typedef void (*dap_events_socket_worker_complete_io_t) (dap_events_socket_t *, void *, int a_errno); typedef struct dap_events_socket_callbacks { union{ // Specific callbacks @@ -152,12 +152,13 @@ typedef struct dap_events_socket_callbacks { dap_events_socket_worker_callback_t worker_assign_callback; /* After successful worker assign */ dap_events_socket_worker_callback_t worker_unassign_callback; /* After successful worker unassign */ + void *arg; /* Callbacks argument */ } dap_events_socket_callbacks_t; #define DAP_STREAM_PKT_SIZE_MAX (1 * 1024 * 1024) #define DAP_EVENTS_SOCKET_BUF DAP_STREAM_PKT_SIZE_MAX #define DAP_EVENTS_SOCKET_BUF_LIMIT (DAP_STREAM_PKT_SIZE_MAX * 4) -#define DAP_QUEUE_MAX_MSGS 8 +#define DAP_QUEUE_MAX_MSGS 512 typedef enum { DESCRIPTOR_TYPE_SOCKET_CLIENT = 0, @@ -219,8 +220,6 @@ typedef struct dap_events_socket { atomic_bool is_initalized; bool was_reassigned; // Was reassigment at least once - uint32_t buf_out_zero_count; - // Input section byte_t *buf_in; size_t buf_in_size_max; // size of alloced buffer @@ -342,9 +341,6 @@ void dap_events_socket_assign_on_worker_inter(dap_events_socket_t * a_es_input, void dap_events_socket_reassign_between_workers_mt(dap_worker_t * a_worker_old, dap_events_socket_t * a_es, dap_worker_t * a_worker_new); void dap_events_socket_reassign_between_workers_unsafe(dap_events_socket_t * a_es, dap_worker_t * a_worker_new); - -size_t dap_events_socket_pop_from_buf_in(dap_events_socket_t *sc, void * data, size_t data_size); - // Non-MT functions dap_events_socket_t * dap_worker_esocket_find_uuid(dap_worker_t * a_worker, dap_events_socket_uuid_t a_es_uuid); @@ -376,7 +372,11 @@ void dap_events_socket_remove_and_delete_unsafe_delayed( dap_events_socket_t *a_ void dap_events_socket_descriptor_close(dap_events_socket_t *a_socket); void dap_events_socket_remove_from_worker_unsafe( dap_events_socket_t *a_es, dap_worker_t * a_worker); + +// Buffer functions +size_t dap_events_socket_pop_from_buf_in(dap_events_socket_t *sc, void * data, size_t data_size); void dap_events_socket_shrink_buf_in(dap_events_socket_t * cl, size_t shrink_size); +DAP_STATIC_INLINE size_t dap_events_socket_get_free_buf_size(dap_events_socket_t *a_es) { return a_es->buf_out_size_max - a_es->buf_out_size; } #ifdef DAP_OS_WINDOWS DAP_STATIC_INLINE int dap_recvfrom(SOCKET s, void* buf_in, size_t buf_size) { diff --git a/dap-sdk/net/server/http_server/dap_http_simple.c b/dap-sdk/net/server/http_server/dap_http_simple.c index 0dddbf1d3a..575a06f19c 100644 --- a/dap-sdk/net/server/http_server/dap_http_simple.c +++ b/dap-sdk/net/server/http_server/dap_http_simple.c @@ -228,12 +228,19 @@ inline static bool s_is_supported_user_agents_list_setted() return cnt; } -inline static void s_set_writable_flags(dap_http_simple_t * a_simple) +inline static void s_write_data_to_socket(dap_proc_thread_t *a_thread, dap_http_simple_t * a_simple) { - // log_it(L_DEBUG,"_set_only_write_http_client_state"); - a_simple->http_client->state_write=DAP_HTTP_CLIENT_STATE_START; - dap_events_socket_set_writable_unsafe( a_simple->http_client->esocket,true); - + a_simple->http_client->state_write = DAP_HTTP_CLIENT_STATE_START; + if (!a_simple->reply) { + a_simple->esocket->flags |= DAP_SOCK_SIGNAL_CLOSE; + log_it( L_WARNING, "No reply to write, close connection" ); + } else { + a_simple->reply_sent += dap_events_socket_write_unsafe(a_simple->esocket, + a_simple->reply_byte + a_simple->reply_sent, + a_simple->http_client->out_content_length - a_simple->reply_sent); + dap_events_socket_set_writable_unsafe(a_simple->esocket, true); + } + dap_proc_thread_assign_on_worker_inter(a_thread, a_simple->worker, a_simple->esocket); } static void s_copy_reply_and_mime_to_response( dap_http_simple_t *a_simple ) @@ -293,8 +300,7 @@ static bool s_proc_queue_callback(dap_proc_thread_t * a_thread, void * a_arg ) if (!header && !is_unknown_user_agents_pass) { const char error_msg[] = "Not found User-Agent HTTP header"; s_write_response_bad_request(l_http_simple, error_msg); - s_set_writable_flags( l_http_simple); - dap_proc_thread_assign_on_worker_inter(a_thread, l_http_simple->worker, l_http_simple->esocket); + s_write_data_to_socket(a_thread, l_http_simple); return true; } @@ -303,8 +309,7 @@ static bool s_proc_queue_callback(dap_proc_thread_t * a_thread, void * a_arg ) log_it(L_DEBUG, "Not supported user agent in request: %s", header->value); const char* error_msg = "User-Agent version not supported. Update your software"; s_write_response_bad_request(l_http_simple, error_msg); - s_set_writable_flags( l_http_simple); - dap_proc_thread_assign_on_worker_inter(a_thread, l_http_simple->worker, l_http_simple->esocket); + s_write_data_to_socket(a_thread, l_http_simple); return true; } } @@ -320,10 +325,7 @@ static bool s_proc_queue_callback(dap_proc_thread_t * a_thread, void * a_arg ) l_http_simple->http_client->reply_status_code = Http_Status_InternalServerError; } dap_http_client_out_header_generate(l_http_simple->http_client); - - - s_set_writable_flags( l_http_simple); - dap_proc_thread_assign_on_worker_inter(a_thread, l_http_simple->worker, l_http_simple->esocket); + s_write_data_to_socket(a_thread, l_http_simple); return true; } @@ -377,34 +379,13 @@ static void s_http_client_headers_read( dap_http_client_t *a_http_client, void * } } -static void s_http_client_data_write( dap_http_client_t * a_http_client, void *a_arg ) +static void s_http_client_data_write(dap_http_client_t * a_http_client, void *a_arg) { (void) a_arg; dap_http_simple_t *l_http_simple = DAP_HTTP_SIMPLE( a_http_client ); - // log_it(L_DEBUG,"dap_http_simple_data_write"); - // Sleep(300); - if (!l_http_simple){ - a_http_client->esocket->flags |= DAP_SOCK_SIGNAL_CLOSE; - log_it( L_WARNING, "No http_simple object in write callback, close connection" ); - return; - } - - if ( !l_http_simple->reply ) { - a_http_client->esocket->flags |= DAP_SOCK_SIGNAL_CLOSE; - log_it( L_WARNING, "No reply to write, close connection" ); - return; - } - - l_http_simple->reply_sent += dap_events_socket_write_unsafe( a_http_client->esocket, - l_http_simple->reply_byte + l_http_simple->reply_sent, - a_http_client->out_content_length - l_http_simple->reply_sent ); - if ( l_http_simple->reply_sent >= a_http_client->out_content_length ) { log_it(L_INFO, "All the reply (%zu) is sent out", a_http_client->out_content_length ); - //cl_ht->client->signal_close=cl_ht->keep_alive; a_http_client->esocket->flags |= DAP_SOCK_SIGNAL_CLOSE; - //dap_client_ready_to_write(cl_ht->client,false); - //DAP_DELETE(l_http_simple->reply ); } } diff --git a/dap-sdk/net/server/http_server/http_client/dap_http_client.c b/dap-sdk/net/server/http_server/http_client/dap_http_client.c index 0081fdb83e..0068be8ce2 100644 --- a/dap-sdk/net/server/http_server/http_client/dap_http_client.c +++ b/dap-sdk/net/server/http_server/http_client/dap_http_client.c @@ -227,6 +227,8 @@ static bool s_request_line_parse( dap_http_client_t *a_http_client, char *a_buf, log_it( L_NOTICE, "dap_http_request_line_parse" ); + a_http_client->url_path[0] = a_http_client->action[0] = '\0'; + for( l_pos = 0; l_pos < a_buf_length; l_pos ++ ) { if ( a_buf[l_pos] == '\n' ) @@ -253,8 +255,10 @@ static bool s_request_line_parse( dap_http_client_t *a_http_client, char *a_buf, case PS_URL: { size_t c_size = l_pos - l_pos_kw_begin; - if ( c_size + 1 > sizeof(a_http_client->action) ) - c_size = sizeof( a_http_client->url_path ) - 1; + if ( c_size + 1 > sizeof(a_http_client->url_path) ) { + log_it(L_ERROR, "Too long URL with size %zu is truncated", c_size); + c_size = sizeof( a_http_client->url_path ) - 1; + } memcpy( a_http_client->url_path, a_buf + l_pos_kw_begin, c_size ); a_http_client->url_path[c_size] = 0; @@ -363,7 +367,7 @@ void dap_http_client_read( dap_events_socket_t *a_esocket, void *a_arg ) memcpy( l_buf_line, a_esocket->buf_in, eol + 1 ); // copy with LF dap_events_socket_shrink_buf_in( a_esocket, eol + 1 ); - l_buf_line[ eol + 2 ] = 0; // null terminate + l_buf_line[ eol + 1 ] = 0; // null terminate // parse http_request_line if ( !s_request_line_parse(l_http_client, l_buf_line, eol + 1) ) { @@ -542,6 +546,7 @@ void dap_http_client_write( dap_events_socket_t * a_esocket, void *a_arg ) switch( l_http_client->state_write ) { case DAP_HTTP_CLIENT_STATE_NONE: + default: return; case DAP_HTTP_CLIENT_STATE_START:{ if ( l_http_client->proc ){ @@ -563,9 +568,8 @@ void dap_http_client_write( dap_events_socket_t * a_esocket, void *a_arg ) log_it( L_INFO," HTTP response with %u status code", l_http_client->reply_status_code ); dap_events_socket_write_f_unsafe(a_esocket, "HTTP/1.1 %u %s\r\n",l_http_client->reply_status_code, l_http_client->reply_reason_phrase[0] ? l_http_client->reply_reason_phrase : http_status_reason_phrase(l_http_client->reply_status_code) ); - dap_events_socket_set_writable_unsafe(a_esocket, true); l_http_client->state_write = DAP_HTTP_CLIENT_STATE_HEADERS; - } break; + } case DAP_HTTP_CLIENT_STATE_HEADERS: { dap_http_header_t *hdr = l_http_client->out_headers; @@ -573,7 +577,6 @@ void dap_http_client_write( dap_events_socket_t * a_esocket, void *a_arg ) log_it(L_DEBUG, "Output: headers are over (reply status code %hu content_lentgh %zu)", l_http_client->reply_status_code, l_http_client->out_content_length); dap_events_socket_write_f_unsafe(a_esocket, "\r\n"); - dap_events_socket_set_writable_unsafe(a_esocket, true); if ( l_http_client->out_content_length || l_http_client->out_content_ready ) { l_http_client->state_write=DAP_HTTP_CLIENT_STATE_DATA; } else { @@ -581,16 +584,17 @@ void dap_http_client_write( dap_events_socket_t * a_esocket, void *a_arg ) l_http_client->state_write = DAP_HTTP_CLIENT_STATE_NONE; dap_events_socket_set_writable_unsafe( a_esocket, false ); a_esocket->flags |= DAP_SOCK_SIGNAL_CLOSE; + break; } dap_events_socket_set_readable_unsafe( a_esocket, true ); } else { //log_it(L_DEBUG,"Output: header %s: %s",hdr->name,hdr->value); dap_events_socket_write_f_unsafe(a_esocket, "%s: %s\r\n", hdr->name, hdr->value); - dap_events_socket_set_writable_unsafe(a_esocket, true); dap_http_header_remove( &l_http_client->out_headers, hdr ); } - } break; - case DAP_HTTP_CLIENT_STATE_DATA:{ + } + + case DAP_HTTP_CLIENT_STATE_DATA: { if ( l_http_client->proc ){ pthread_rwlock_rdlock(&l_http_client->proc->cache_rwlock); if ( ( l_http_client->proc->cache == NULL && diff --git a/dap-sdk/net/stream/stream/dap_stream_pkt.c b/dap-sdk/net/stream/stream/dap_stream_pkt.c index 022fc64dbe..58a7aecc38 100644 --- a/dap-sdk/net/stream/stream/dap_stream_pkt.c +++ b/dap-sdk/net/stream/stream/dap_stream_pkt.c @@ -125,9 +125,6 @@ size_t dap_stream_pkt_read_unsafe( dap_stream_t * a_stream, dap_stream_pkt_t * a return ds; } - -#define DAP_STREAM_CH_PKT_ENCRYPTION_OVERHEAD 200 //in fact is's about 2*16+15 for OAES - /** * @brief stream_ch_pkt_write * @param ch @@ -144,7 +141,7 @@ size_t dap_stream_pkt_write_unsafe(dap_stream_t * a_stream, const void * a_data, uint8_t * l_buf_allocated = NULL; uint8_t * l_buf_selected = a_stream->buf; - size_t l_buf_size_required = a_data_size + DAP_STREAM_CH_PKT_ENCRYPTION_OVERHEAD; + size_t l_buf_size_required = a_data_size + DAP_STREAM_PKT_ENCRYPTION_OVERHEAD; if(l_buf_size_required > sizeof(a_stream->buf) ){ l_buf_allocated = DAP_NEW_SIZE(uint8_t, l_buf_size_required); diff --git a/dap-sdk/net/stream/stream/include/dap_stream_pkt.h b/dap-sdk/net/stream/stream/include/dap_stream_pkt.h index cfd5c3d435..fb6346c87c 100644 --- a/dap-sdk/net/stream/stream/include/dap_stream_pkt.h +++ b/dap-sdk/net/stream/stream/include/dap_stream_pkt.h @@ -31,6 +31,8 @@ typedef struct dap_stream_session dap_stream_session_t; #define STREAM_PKT_TYPE_ALIVE 0x12 #define STREAM_PKT_SIG_SIZE 8 +#define DAP_STREAM_PKT_ENCRYPTION_OVERHEAD 200 //in fact is's about 2*16+15 for OAES + typedef struct dap_stream_pkt_hdr{ uint8_t sig[STREAM_PKT_SIG_SIZE]; // Signature to find out beginning of the frame uint32_t size; diff --git a/modules/chain/dap_chain.c b/modules/chain/dap_chain.c index 7e263c83db..f2a9f44538 100644 --- a/modules/chain/dap_chain.c +++ b/modules/chain/dap_chain.c @@ -415,14 +415,12 @@ dap_chain_t * dap_chain_load_from_cfg(dap_ledger_t* a_ledger, const char * a_cha } // add datum types if(l_chain && l_datum_types && l_datum_types_count) { - l_chain->autoproc_datum_types = DAP_NEW_SIZE(uint16_t, l_datum_types_count * sizeof(uint16_t)); + l_chain->autoproc_datum_types = DAP_NEW_Z_SIZE(uint16_t, l_chain->datum_types_count * sizeof(uint16_t)); uint16_t l_count_recognized = 0; for(uint16_t i = 0; i < l_datum_types_count; i++) { if (!strcmp(l_datum_types[i], "all") && l_chain->datum_types_count) { - l_chain->autoproc_datum_types = DAP_REALLOC(l_chain->autoproc_datum_types, l_chain->datum_types_count * sizeof(uint16_t)); - for (int j = 0; j < l_chain->datum_types_count; j++) { + for (int j = 0; j < l_chain->datum_types_count; j++) l_chain->autoproc_datum_types[j] = s_chain_type_convert(l_chain->datum_types[j]); - } l_count_recognized = l_chain->datum_types_count; break; } diff --git a/modules/chain/dap_chain_ledger.c b/modules/chain/dap_chain_ledger.c index 37445ee986..7afcd38f76 100644 --- a/modules/chain/dap_chain_ledger.c +++ b/modules/chain/dap_chain_ledger.c @@ -309,8 +309,10 @@ struct json_object *wallet_info_json_collect(dap_ledger_t *a_ledger, dap_ledger_ json_object_object_add(l_network, "name", json_object_new_string(a_ledger->net_name)); char *pos = strrchr(a_bal->key, ' '); if (pos) { - char *l_addr_str = DAP_NEW_S_SIZE(char, pos - a_bal->key + 1); + size_t l_addr_len = pos - a_bal->key; + char *l_addr_str = DAP_NEW_S_SIZE(char, l_addr_len + 1); memcpy(l_addr_str, a_bal->key, pos - a_bal->key); + *(l_addr_str + l_addr_len) = '\0'; json_object_object_add(l_network, "address", json_object_new_string(l_addr_str)); } else { json_object_object_add(l_network, "address", json_object_new_string("Unknown")); diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c index 5a3c322d41..2b7c3b16ed 100644 --- a/modules/channel/chain/dap_stream_ch_chain.c +++ b/modules/channel/chain/dap_stream_ch_chain.c @@ -49,6 +49,7 @@ #include "dap_worker.h" #include "dap_events.h" #include "dap_proc_thread.h" +#include "dap_client_pvt.h" #include "dap_chain.h" #include "dap_chain_datum.h" @@ -58,6 +59,7 @@ #include "dap_chain_global_db.h" #include "dap_stream.h" +#include "dap_stream_pkt.h" #include "dap_stream_worker.h" #include "dap_stream_ch_pkt.h" #include "dap_stream_ch.h" @@ -99,6 +101,7 @@ static void s_stream_ch_new(dap_stream_ch_t* a_ch, void* a_arg); static void s_stream_ch_delete(dap_stream_ch_t* a_ch, void* a_arg); static void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg); static void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg); +static void s_stream_ch_io_complete(dap_events_socket_t *a_es, void *a_arg, int a_errno); static void s_stream_ch_write_error_unsafe(dap_stream_ch_t *a_ch, uint64_t a_net_id, uint64_t a_chain_id, uint64_t a_cell_id, const char * a_err_string); static bool s_sync_out_chains_proc_callback(dap_proc_thread_t *a_thread, void *a_arg); @@ -157,6 +160,7 @@ void s_stream_ch_new(dap_stream_ch_t* a_ch, void* a_arg) dap_stream_ch_chain_t * l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch); l_ch_chain->_inheritor = a_ch; pthread_rwlock_init(&l_ch_chain->idle_lock, NULL); + a_ch->stream->esocket->callbacks.write_finished_callback = s_stream_ch_io_complete; } /** @@ -165,9 +169,9 @@ void s_stream_ch_new(dap_stream_ch_t* a_ch, void* a_arg) * @param a_arg * @return */ -static bool s_stream_ch_delete_in_proc(dap_proc_thread_t * a_thread, void * a_arg) +static void s_stream_ch_delete_in_proc(dap_worker_t *a_worker, void *a_arg) { - (void) a_thread; + UNUSED(a_worker); dap_stream_ch_chain_t *l_ch_chain = (dap_stream_ch_chain_t *)a_arg; if (l_ch_chain->callback_notify_packet_out) l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_DELETE, NULL, 0, @@ -176,7 +180,6 @@ static bool s_stream_ch_delete_in_proc(dap_proc_thread_t * a_thread, void * a_ar s_free_log_list_gdb(l_ch_chain); pthread_rwlock_destroy(&l_ch_chain->idle_lock); DAP_DELETE(l_ch_chain); - return true; } /** @@ -187,7 +190,7 @@ static bool s_stream_ch_delete_in_proc(dap_proc_thread_t * a_thread, void * a_ar static void s_stream_ch_delete(dap_stream_ch_t* a_ch, void* a_arg) { (void) a_arg; - dap_proc_queue_add_callback_inter(a_ch->stream_worker->worker->proc_queue_input,s_stream_ch_delete_in_proc,a_ch->internal ); + dap_worker_exec_callback_on(a_ch->stream_worker->worker, s_stream_ch_delete_in_proc, a_ch->internal); a_ch->internal = NULL; // To prevent its cleaning in worker } @@ -884,6 +887,8 @@ static bool s_chain_timer_callback(void *a_arg) l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64, NULL, 0); } + if (l_ch_chain->state != CHAIN_STATE_WAITING) + s_stream_ch_packet_out(l_ch, NULL); return true; } @@ -1252,14 +1257,14 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) l_element++){ dap_stream_ch_chain_hash_item_t * l_hash_item = NULL; unsigned l_hash_item_hashv; - HASH_VALUE(&l_element->hash, sizeof(l_element->hash), l_hash_item_hashv); - HASH_FIND_BYHASHVALUE(hh, l_ch_chain->remote_atoms, &l_element->hash, sizeof(l_element->hash), + HASH_VALUE(&l_element->hash, sizeof(dap_hash_fast_t), l_hash_item_hashv); + HASH_FIND_BYHASHVALUE(hh, l_ch_chain->remote_atoms, &l_element->hash, sizeof(dap_hash_fast_t), l_hash_item_hashv, l_hash_item); if( ! l_hash_item ){ l_hash_item = DAP_NEW(dap_stream_ch_chain_hash_item_t); - memcpy(&l_hash_item->hash, &l_element->hash, sizeof (l_element->hash)); + memcpy(&l_hash_item->hash, &l_element->hash, sizeof(dap_hash_fast_t)); l_hash_item->size = l_element->size; - HASH_ADD_BYHASHVALUE(hh, l_ch_chain->remote_atoms, hash, sizeof(l_hash_item->hash), + HASH_ADD_BYHASHVALUE(hh, l_ch_chain->remote_atoms, hash, sizeof(dap_hash_fast_t), l_hash_item_hashv, l_hash_item); l_count_added++; /* @@ -1491,13 +1496,13 @@ static void s_free_log_list_gdb ( dap_stream_ch_chain_t * a_ch_chain) */ static void s_ch_chain_go_idle(dap_stream_ch_chain_t *a_ch_chain) { - pthread_rwlock_wrlock(&a_ch_chain->idle_lock); + //pthread_rwlock_wrlock(&a_ch_chain->idle_lock); if (a_ch_chain->state == CHAIN_STATE_IDLE) { - pthread_rwlock_unlock(&a_ch_chain->idle_lock); + //pthread_rwlock_unlock(&a_ch_chain->idle_lock); return; } a_ch_chain->state = CHAIN_STATE_IDLE; - pthread_rwlock_unlock(&a_ch_chain->idle_lock); + //pthread_rwlock_unlock(&a_ch_chain->idle_lock); if(s_debug_more) log_it(L_INFO, "Go in CHAIN_STATE_IDLE"); @@ -1523,12 +1528,67 @@ static void s_ch_chain_go_idle(dap_stream_ch_chain_t *a_ch_chain) static bool s_ch_chain_get_idle(dap_stream_ch_chain_t *a_ch_chain) { - pthread_rwlock_wrlock(&a_ch_chain->idle_lock); + //pthread_rwlock_wrlock(&a_ch_chain->idle_lock); bool ret = a_ch_chain->state == CHAIN_STATE_IDLE; - pthread_rwlock_unlock(&a_ch_chain->idle_lock); + //pthread_rwlock_unlock(&a_ch_chain->idle_lock); return ret; } +struct chain_io_complete { + dap_stream_ch_uuid_t ch_uuid; + dap_stream_ch_chain_state_t state; + uint8_t type; + uint64_t net_id; + uint64_t chain_id; + uint64_t cell_id; + size_t data_size; + byte_t data[]; +}; + +static void s_stream_ch_io_complete(dap_events_socket_t *a_es, void *a_arg, int a_errno) +{ + UNUSED(a_errno); // TODO process it + if (!a_arg) { + if (a_es->callbacks.write_callback) + a_es->callbacks.write_callback(a_es, NULL); + return; + } + struct chain_io_complete *l_arg = (struct chain_io_complete *)a_arg; + dap_client_pvt_t *l_client_pvt = DAP_ESOCKET_CLIENT_PVT(a_es); + if (l_client_pvt->stream) { + dap_stream_ch_t *l_ch = dap_stream_ch_find_by_uuid_unsafe(l_client_pvt->stream->stream_worker, l_arg->ch_uuid); + if (l_ch) { + DAP_STREAM_CH_CHAIN(l_ch)->state = l_arg->state; + dap_stream_ch_chain_pkt_write_unsafe(l_ch, l_arg->type, l_arg->net_id, l_arg->chain_id, + l_arg->cell_id, l_arg->data, l_arg->data_size); + } + } + a_es->callbacks.arg = NULL; + DAP_DELETE(a_arg); +} + +static void s_stream_ch_chain_pkt_write(dap_stream_ch_t *a_ch, uint8_t a_type, uint64_t a_net_id, + uint64_t a_chain_id, uint64_t a_cell_id, + const void * a_data, size_t a_data_size) +{ + size_t l_free_buf_size = dap_events_socket_get_free_buf_size(a_ch->stream->esocket) - + sizeof(dap_stream_ch_chain_pkt_t) - sizeof(dap_stream_ch_pkt_t) - + sizeof(dap_stream_pkt_t) - DAP_STREAM_PKT_ENCRYPTION_OVERHEAD; + if (l_free_buf_size < a_data_size) { + struct chain_io_complete *l_arg = DAP_NEW_SIZE(struct chain_io_complete, sizeof(struct chain_io_complete) + a_data_size); + l_arg->ch_uuid = a_ch->uuid; + l_arg->state = DAP_STREAM_CH_CHAIN(a_ch)->state; + DAP_STREAM_CH_CHAIN(a_ch)->state = CHAIN_STATE_WAITING; + l_arg->type = a_type; + l_arg->chain_id = a_chain_id; + l_arg->cell_id = a_cell_id; + l_arg->data_size = a_data_size; + memcpy(l_arg->data, a_data, a_data_size); + a_ch->stream->esocket->callbacks.arg = l_arg; + } + else + dap_stream_ch_chain_pkt_write_unsafe(a_ch, a_type, a_net_id, a_chain_id, a_cell_id, a_data, a_data_size); +} /** * @brief s_stream_ch_packet_out @@ -1539,13 +1599,10 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) { UNUSED(a_arg); - if (a_ch->stream->esocket->buf_out_size >= a_ch->stream->esocket->buf_out_size_max / 2) - return; dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch); - bool l_go_idle = false; bool l_timer_reset = false; - pthread_rwlock_rdlock(&l_ch_chain->idle_lock); + //pthread_rwlock_rdlock(&l_ch_chain->idle_lock); switch (l_ch_chain->state) { // Update list of global DB records to remote case CHAIN_STATE_UPDATE_GLOBAL_DB: { @@ -1561,20 +1618,17 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) l_data[i].size = l_obj->pkt->data_size; } if (i) { - dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB, - l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, - l_ch_chain->request_hdr.cell_id.uint64, - l_data, i * sizeof(dap_stream_ch_chain_update_element_t)); + s_stream_ch_chain_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB, + l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, + l_ch_chain->request_hdr.cell_id.uint64, + l_data, i * sizeof(dap_stream_ch_chain_update_element_t)); l_ch_chain->stats_request_gdb_processed += i; if (s_debug_more) log_it(L_INFO, "Out: DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB"); - } else if (l_obj) { - // We need to return into the write callback - a_ch->stream->esocket->buf_out_zero_count = 0; - } else { + } else if (!l_obj) { l_ch_chain->request.node_addr.uint64 = dap_chain_net_get_cur_addr_int(dap_chain_net_by_id( l_ch_chain->request_hdr.net_id)); - dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_END, + s_stream_ch_chain_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_END, l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64, @@ -1631,19 +1685,16 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) log_it(L_INFO, "Send one global_db packet len=%zu (rest=%zu/%zu items)", l_pkt_size, dap_db_log_list_get_count_rest(l_ch_chain->request_db_log), dap_db_log_list_get_count(l_ch_chain->request_db_log)); - dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB, + s_stream_ch_chain_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB, l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64, l_pkt, l_pkt_size); DAP_DELETE(l_pkt); - } else if (l_obj) { - // We need to return into the write callback - a_ch->stream->esocket->buf_out_zero_count = 0; - } else { + } else if (!l_obj) { log_it( L_INFO,"Syncronized database: items syncronyzed %"DAP_UINT64_FORMAT_U" from %zu", l_ch_chain->stats_request_gdb_processed, dap_db_log_list_get_count(l_ch_chain->request_db_log)); // last message dap_stream_ch_chain_sync_request_t l_request = {}; - dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, + s_stream_ch_chain_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64, &l_request, sizeof(l_request)); l_go_idle = true; @@ -1669,7 +1720,7 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) if (l_data_size){ if(s_debug_more) log_it(L_DEBUG,"Out: UPDATE_CHAINS with %zu hashes sent", l_data_size / sizeof(dap_stream_ch_chain_update_element_t)); - dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS, + s_stream_ch_chain_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS, l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64, @@ -1679,7 +1730,7 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) if(s_debug_more) log_it(L_INFO,"Out: UPDATE_CHAINS_END sent "); dap_stream_ch_chain_sync_request_t l_request = {}; - dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_END, + s_stream_ch_chain_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_END, l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64, @@ -1719,7 +1770,7 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) log_it(L_INFO, "Out CHAIN pkt: atom hash %s (size %zd) ", l_atom_hash_str, l_ch_chain->request_atom_iter->cur_size); DAP_DELETE(l_atom_hash_str); } - dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN, l_ch_chain->request_hdr.net_id.uint64, + s_stream_ch_chain_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN, l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64, l_ch_chain->request_atom_iter->cur, l_ch_chain->request_atom_iter->cur_size); l_was_sent_smth = true; @@ -1727,21 +1778,19 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) l_hash_item->size = l_ch_chain->request_atom_iter->cur_size; // Because we sent this atom to remote - we record it to not to send it twice - HASH_ADD_BYHASHVALUE(hh, l_ch_chain->remote_atoms, hash, sizeof(l_hash_item->hash), l_hash_item_hashv, + HASH_ADD_BYHASHVALUE(hh, l_ch_chain->remote_atoms, hash, sizeof(dap_hash_fast_t), l_hash_item_hashv, l_hash_item); } // Then get next atom and populate new last l_ch_chain->request_atom_iter->chain->callback_atom_iter_get_next(l_ch_chain->request_atom_iter, NULL); if (l_was_sent_smth) break; - else - l_timer_reset = true; } if(!l_ch_chain->request_atom_iter || !l_ch_chain->request_atom_iter->cur) { // All chains synced dap_stream_ch_chain_sync_request_t l_request = {}; // last message l_was_sent_smth = true; - dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS, + s_stream_ch_chain_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS, l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64, &l_request, sizeof(l_request)); log_it( L_INFO,"Synced: %"DAP_UINT64_FORMAT_U" atoms processed", l_ch_chain->stats_request_atoms_processed); @@ -1750,15 +1799,14 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS, NULL, 0, l_ch_chain->callback_notify_arg); } - if (! l_was_sent_smth ){ + if (!l_was_sent_smth) l_ch_chain->timer_shots = -1; - // We need to return into the write callback - a_ch->stream->esocket->buf_out_zero_count = 0; - } + else + l_timer_reset = true; } break; default: break; } - pthread_rwlock_unlock(&l_ch_chain->idle_lock); + //pthread_rwlock_unlock(&l_ch_chain->idle_lock); if (l_go_idle) s_ch_chain_go_idle(l_ch_chain); else if (l_timer_reset) diff --git a/modules/channel/chain/include/dap_stream_ch_chain_pkt.h b/modules/channel/chain/include/dap_stream_ch_chain_pkt.h index 15ef31d244..be56ebffee 100644 --- a/modules/channel/chain/include/dap_stream_ch_chain_pkt.h +++ b/modules/channel/chain/include/dap_stream_ch_chain_pkt.h @@ -81,6 +81,7 @@ typedef enum dap_stream_ch_chain_state{ CHAIN_STATE_IDLE=0, + CHAIN_STATE_WAITING, CHAIN_STATE_UPDATE_GLOBAL_DB_REMOTE, // Downloadn GDB hashtable from remote CHAIN_STATE_UPDATE_GLOBAL_DB, // Update GDB hashtable to remote CHAIN_STATE_SYNC_GLOBAL_DB, diff --git a/modules/consensus/block-ton/dap_chain_cs_block_ton.c b/modules/consensus/block-ton/dap_chain_cs_block_ton.c index f1919a86f3..3c53a8f804 100644 --- a/modules/consensus/block-ton/dap_chain_cs_block_ton.c +++ b/modules/consensus/block-ton/dap_chain_cs_block_ton.c @@ -1069,7 +1069,7 @@ static void s_session_packet_in(void *a_arg, dap_chain_node_addr_t *a_sender_nod dap_time_t l_time = dap_time_now(); l_message->hdr.is_verified=false; - dap_chain_hash_fast_t l_data_hash; + dap_chain_hash_fast_t l_data_hash = {}; dap_hash_fast(a_data, a_data_size, &l_data_hash); if (memcmp(a_data_hash, &l_data_hash, sizeof(dap_chain_hash_fast_t)) != 0) { if (PVT(l_session->ton)->debug) diff --git a/modules/global-db/dap_chain_global_db_driver_cdb.c b/modules/global-db/dap_chain_global_db_driver_cdb.c index 9d19531b0c..51a967814a 100644 --- a/modules/global-db/dap_chain_global_db_driver_cdb.c +++ b/modules/global-db/dap_chain_global_db_driver_cdb.c @@ -610,9 +610,7 @@ int dap_db_driver_cdb_apply_store_obj(pdap_store_obj_t a_store_obj) { if(a_store_obj->type == DAP_DB$K_OPTYPE_ADD) { if(!a_store_obj->key) { return -2; - } - cdb_record l_rec; - l_rec.key = (char *)a_store_obj->key; //dap_strdup(a_store_obj->key); + }; int offset = 0; char *l_val = DAP_NEW_Z_SIZE(char, sizeof(uint64_t) + sizeof(uint8_t) + sizeof(uint64_t) + a_store_obj->value_len + sizeof(uint64_t)); dap_uint_to_hex(l_val, ++l_cdb_i->id, sizeof(uint64_t)); @@ -631,19 +629,20 @@ int dap_db_driver_cdb_apply_store_obj(pdap_store_obj_t a_store_obj) { // Add a timestamp dap_uint_to_hex(l_val + offset, a_store_obj->timestamp, sizeof(uint64_t)); offset += sizeof(uint64_t); - l_rec.val = l_val; - if (cdb_set2(l_cdb_i->cdb, l_rec.key, (int)strlen(l_rec.key), l_rec.val, offset, CDB_INSERTCACHE | CDB_OVERWRITE, 0) != CDB_SUCCESS) { - log_it(L_ERROR, "Couldn't add record with key [%s] to CDB: \"%s\"", l_rec.key, cdb_errmsg(cdb_errno(l_cdb_i->cdb))); + if (cdb_set2(l_cdb_i->cdb, a_store_obj->key, (int)strlen(a_store_obj->key), l_val, offset, CDB_INSERTCACHE | CDB_OVERWRITE, 0) != CDB_SUCCESS) { + log_it(L_ERROR, "Couldn't add record with key [%s] to CDB: \"%s\"", a_store_obj->key, cdb_errmsg(cdb_errno(l_cdb_i->cdb))); ret = -1; } cdb_flushalldpage(l_cdb_i->cdb); - DAP_DELETE(l_rec.val); + DAP_DELETE(l_val); } else if(a_store_obj->type == DAP_DB$K_OPTYPE_DEL) { if(a_store_obj->key) { if(cdb_del(l_cdb_i->cdb, a_store_obj->key, (int) strlen(a_store_obj->key)) == -3) ret = 1; - } else if (!dap_cdb_init_group(a_store_obj->group, CDB_TRUNC | CDB_PAGEWARMUP)) - ret = -1; + } else { + dap_cdb_init_group(a_store_obj->group, CDB_TRUNC | CDB_PAGEWARMUP); + ret = 0; + } } return ret; } diff --git a/modules/net/srv/dap_chain_net_srv_order.c b/modules/net/srv/dap_chain_net_srv_order.c index 096384d333..26b841cb7d 100644 --- a/modules/net/srv/dap_chain_net_srv_order.c +++ b/modules/net/srv/dap_chain_net_srv_order.c @@ -286,8 +286,15 @@ dap_chain_net_srv_order_t *dap_chain_net_srv_order_compose( ) { UNUSED(a_expires); - if (!a_net && !a_key) // Order must have network & sign + // Order must have network & sign + if (!a_net) { + log_it(L_WARNING, "Order mast have a network"); return NULL; + } + if (!a_key) { + log_it(L_WARNING, "Order mast have a sign"); + return NULL; + } dap_chain_net_srv_order_t *l_order; if (a_ext_size) { l_order = (dap_chain_net_srv_order_t *)DAP_NEW_Z_SIZE(void, sizeof(dap_chain_net_srv_order_t) + a_ext_size); diff --git a/modules/service/vpn/dap_chain_net_srv_vpn.c b/modules/service/vpn/dap_chain_net_srv_vpn.c index 0c4ede5eac..70bf83f326 100644 --- a/modules/service/vpn/dap_chain_net_srv_vpn.c +++ b/modules/service/vpn/dap_chain_net_srv_vpn.c @@ -1386,7 +1386,7 @@ static void s_es_tun_write(dap_events_socket_t *a_es, void *arg) ch_vpn_pkt_t *l_vpn_pkt = (ch_vpn_pkt_t *)l_tun->fifo->data; if (!l_vpn_pkt) return; - a_es->buf_out_zero_count = 0; + //a_es->buf_out_zero_count = 0; // TODO remake it with new writing logic size_t l_size_to_send = l_vpn_pkt->header.op_data.data_size; ssize_t l_ret = write(l_tun->es->fd, l_vpn_pkt->data, l_size_to_send); if (l_ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) { diff --git a/modules/service/xchange/dap_chain_net_srv_xchange.c b/modules/service/xchange/dap_chain_net_srv_xchange.c index c96bbe1a95..017f40e800 100644 --- a/modules/service/xchange/dap_chain_net_srv_xchange.c +++ b/modules/service/xchange/dap_chain_net_srv_xchange.c @@ -417,7 +417,7 @@ char *s_xchange_order_create(dap_chain_net_srv_xchange_price_t *a_price, dap_cha uint256_t l_datoshi_buy = {}; // TODO rework it with fixed point MULT_256_FLOAT(a_price->datoshi_sell, 1 / a_price->rate); char *l_order_hash_str = dap_chain_net_srv_order_create(a_price->net_buy, SERV_DIR_SELL, l_uid, *l_node_addr, l_tx_hash, l_datoshi_buy, l_unit, a_price->token_buy, 0, - (uint8_t *)&l_ext, l_ext_size, NULL, 0, NULL); + (uint8_t *)&l_ext, l_ext_size, NULL, 0, a_price->wallet_key); return l_order_hash_str; } @@ -606,16 +606,18 @@ static int s_cli_srv_xchange_price(int a_argc, char **a_argv, int a_arg_index, c l_price->rate = l_rate; // Create conditional transaction dap_chain_datum_tx_t *l_tx = s_xchange_tx_create_request(l_price, l_wallet); - dap_chain_wallet_close(l_wallet); if (!l_tx) { dap_chain_node_cli_set_reply_text(a_str_reply, "Can't compose the conditional transaction"); DAP_DELETE(l_price->key_ptr); DAP_DELETE(l_price->wallet_str); DAP_DELETE(l_price); + dap_chain_wallet_close(l_wallet); return -14; } // Create the order & put it to GDB + l_price->wallet_key = dap_chain_wallet_get_key(l_wallet, 0); char *l_order_hash_str = s_xchange_order_create(l_price, l_tx); + dap_chain_wallet_close(l_wallet); if (l_order_hash_str) { dap_chain_hash_fast_from_str(l_order_hash_str, &l_price->order_hash); if(!s_xchange_tx_put(l_tx, l_net_buy)) { diff --git a/modules/service/xchange/include/dap_chain_net_srv_xchange.h b/modules/service/xchange/include/dap_chain_net_srv_xchange.h index f7ffaafed5..6776a7e0ba 100644 --- a/modules/service/xchange/include/dap_chain_net_srv_xchange.h +++ b/modules/service/xchange/include/dap_chain_net_srv_xchange.h @@ -40,6 +40,8 @@ typedef struct dap_chain_net_srv_xchange_price { long double rate; dap_chain_hash_fast_t tx_hash; dap_chain_hash_fast_t order_hash; + dap_enc_key_t *wallet_key; + char *key_ptr; UT_hash_handle hh; } dap_chain_net_srv_xchange_price_t; diff --git a/modules/type/dag/dap_chain_cs_dag.c b/modules/type/dag/dap_chain_cs_dag.c index c43ae400ba..a8a752905e 100644 --- a/modules/type/dag/dap_chain_cs_dag.c +++ b/modules/type/dag/dap_chain_cs_dag.c @@ -289,11 +289,11 @@ int dap_chain_cs_dag_new(dap_chain_t * a_chain, dap_config_t * a_chain_cfg) if(!l_dag->is_celled){ char *l_gdb_group = dap_strdup_printf( "dag-%s-%s-round", l_net->pub.gdb_groups_prefix, a_chain->name); l_dag->gdb_group_events_round_new = dap_strdup_printf( "%s.%s", l_gdb_group, l_round_new_str); - dap_chain_global_db_add_sync_group(l_net->pub.name, l_gdb_group, s_history_callback_round_notify, l_dag); + dap_chain_global_db_add_sync_extra_group(l_net->pub.name, l_gdb_group, s_history_callback_round_notify, l_dag); } else { char *l_gdb_group = dap_strdup_printf( "dag-%s-%s-%016llx-round", l_net->pub.gdb_groups_prefix, a_chain->name, 0);//a_chain->cells->id.uint64); l_dag->gdb_group_events_round_new = dap_strdup_printf( "%s.%s", l_gdb_group, l_round_new_str); - dap_chain_global_db_add_sync_group(l_net->pub.name, l_gdb_group, s_history_callback_round_notify, l_dag); + dap_chain_global_db_add_sync_extra_group(l_net->pub.name, l_gdb_group, s_history_callback_round_notify, l_dag); } dap_chain_global_db_gr_del(NULL, l_dag->gdb_group_events_round_new); l_dag->gdb_group_datums_queue = dap_strdup_printf("local.datums-queue.chain-%s.%s", -- GitLab