From 87d85369025202c3770b1f91b63eab6b139abc06 Mon Sep 17 00:00:00 2001 From: Roman Khlopkov <roman.khlopkov@demlabs.net> Date: Thu, 1 Oct 2020 14:24:27 +0000 Subject: [PATCH] [+] INT/TERM singal callbacks [+] Deinit functions call for proper global_db and chain net close --- CMakeLists.txt | 2 +- dap-sdk/net/client/dap_client.c | 57 +++++- dap-sdk/net/client/dap_client_pvt.c | 18 +- dap-sdk/net/client/include/dap_client.h | 1 + dap-sdk/net/core/dap_events.c | 5 +- dap-sdk/net/core/dap_events_socket.c | 170 +++++++++-------- dap-sdk/net/core/dap_proc_queue.c | 10 +- dap-sdk/net/core/dap_proc_thread.c | 93 ++++++++-- dap-sdk/net/core/dap_worker.c | 170 +++++++++++++---- dap-sdk/net/core/include/dap_events.h | 2 +- dap-sdk/net/core/include/dap_events_socket.h | 9 +- dap-sdk/net/core/include/dap_proc_queue.h | 4 +- dap-sdk/net/core/include/dap_proc_thread.h | 2 + dap-sdk/net/core/include/dap_worker.h | 11 ++ dap-sdk/net/server/enc_server/dap_enc_http.c | 58 +++--- .../net/server/http_server/dap_http_simple.c | 4 +- dap-sdk/net/stream/stream/dap_stream_ctl.c | 43 +++-- modules/chain/dap_chain_ledger.c | 171 ++++++++++++------ modules/chain/include/dap_chain_ledger.h | 10 + modules/channel/chain/dap_stream_ch_chain.c | 25 +-- .../chain/include/dap_stream_ch_chain.h | 2 +- modules/net/dap_chain_net.c | 7 +- modules/net/dap_chain_node_cli_cmd.c | 2 + modules/type/dag/dap_chain_cs_dag.c | 109 ++++++----- modules/type/dag/include/dap_chain_cs_dag.h | 6 + 25 files changed, 693 insertions(+), 298 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 86f838c069..50ddb66daa 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,7 +2,7 @@ project(cellframe-sdk C) cmake_minimum_required(VERSION 2.8) set(CMAKE_C_STANDARD 11) -set(CELLFRAME_SDK_NATIVE_VERSION "2.6-7") +set(CELLFRAME_SDK_NATIVE_VERSION "2.6-9") add_definitions ("-DCELLFRAME_SDK_VERSION=\"${CELLFRAME_SDK_NATIVE_VERSION}\"") set(DAPSDK_MODULES "") diff --git a/dap-sdk/net/client/dap_client.c b/dap-sdk/net/client/dap_client.c index b164bab857..b55f4d0229 100644 --- a/dap-sdk/net/client/dap_client.c +++ b/dap-sdk/net/client/dap_client.c @@ -109,7 +109,11 @@ MEM_ALLOC_ERR: void dap_client_set_uplink(dap_client_t * a_client,const char* a_addr, uint16_t a_port) { if(a_addr == NULL){ - log_it(L_ERROR,"Address is NULL"); + log_it(L_ERROR,"Address is NULL for dap_client_set_uplink"); + return; + } + if(a_client == NULL){ + log_it(L_ERROR,"Client is NULL for dap_client_set_uplink"); return; } DAP_CLIENT_PVT(a_client)->uplink_addr = strdup(a_addr); @@ -123,6 +127,11 @@ void dap_client_set_uplink(dap_client_t * a_client,const char* a_addr, uint16_t */ const char* dap_client_get_uplink_addr(dap_client_t * a_client) { + if(a_client == NULL){ + log_it(L_ERROR,"Client is NULL for dap_client_get_uplink"); + return NULL; + } + return DAP_CLIENT_PVT(a_client)->uplink_addr; } @@ -133,9 +142,14 @@ const char* dap_client_get_uplink_addr(dap_client_t * a_client) */ void dap_client_set_active_channels (dap_client_t * a_client, const char * a_active_channels) { + if(a_client == NULL){ + log_it(L_ERROR,"Client is NULL for dap_client_set_active_channels"); + return; + } + if ( DAP_CLIENT_PVT(a_client)->active_channels ) DAP_DELETE(DAP_CLIENT_PVT(a_client)->active_channels ); - DAP_CLIENT_PVT(a_client)->active_channels = dap_strdup( a_active_channels); + DAP_CLIENT_PVT(a_client)->active_channels = a_active_channels? dap_strdup( a_active_channels) : NULL; } /** @@ -145,11 +159,21 @@ void dap_client_set_active_channels (dap_client_t * a_client, const char * a_act */ uint16_t dap_client_get_uplink_port(dap_client_t * a_client) { + if(a_client == NULL){ + log_it(L_ERROR,"Client is NULL for dap_client_get_uplink_port"); + return 0; + } + return DAP_CLIENT_PVT(a_client)->uplink_port; } void dap_client_set_auth_cert(dap_client_t * a_client, dap_cert_t *a_cert) { + if(a_client == NULL){ + log_it(L_ERROR,"Client is NULL for dap_client_set_auth_cert"); + return; + } + DAP_CLIENT_PVT(a_client)->auth_cert = a_cert; } @@ -348,7 +372,11 @@ const char * dap_client_error_str(dap_client_error_t a_client_error) */ const char * dap_client_get_error_str(dap_client_t * a_client) { - return dap_client_error_str( DAP_CLIENT_PVT(a_client)->last_error ); + if(a_client == NULL){ + log_it(L_ERROR,"Client is NULL for dap_client_get_error_str"); + return NULL; + } + return dap_client_error_str( DAP_CLIENT_PVT(a_client)->last_error ); } /** * @brief dap_client_get_stage @@ -357,6 +385,10 @@ const char * dap_client_get_error_str(dap_client_t * a_client) */ dap_client_stage_t dap_client_get_stage(dap_client_t * a_client) { + if(a_client == NULL){ + log_it(L_ERROR,"Client is NULL for dap_client_get_stage"); + return -1; + } return DAP_CLIENT_PVT(a_client)->stage; } @@ -366,6 +398,10 @@ dap_client_stage_t dap_client_get_stage(dap_client_t * a_client) * @return */ const char * dap_client_get_stage_status_str(dap_client_t *a_client){ + if(a_client == NULL){ + log_it(L_ERROR,"Client is NULL for dap_client_get_stage_status_str"); + return NULL; + } return dap_client_stage_status_str(DAP_CLIENT_PVT(a_client)->stage_status); } @@ -392,6 +428,10 @@ const char * dap_client_stage_status_str(dap_client_stage_status_t a_stage_statu */ const char * dap_client_get_stage_str(dap_client_t *a_client) { + if(a_client == NULL){ + log_it(L_ERROR,"Client is NULL for dap_client_get_stage_str"); + return NULL; + } return dap_client_stage_str(DAP_CLIENT_PVT(a_client)->stage); } @@ -439,6 +479,11 @@ dap_enc_key_t * dap_client_get_key_stream(dap_client_t * a_client){ */ dap_stream_t * dap_client_get_stream(dap_client_t * a_client) { + if(a_client == NULL){ + log_it(L_ERROR,"Client is NULL for dap_client_get_stream"); + return NULL; + } + dap_client_pvt_t * l_client_internal = DAP_CLIENT_PVT(a_client); return (l_client_internal) ? l_client_internal->stream : NULL; } @@ -450,6 +495,10 @@ dap_stream_t * dap_client_get_stream(dap_client_t * a_client) */ dap_stream_worker_t * dap_client_get_stream_worker(dap_client_t * a_client) { + if(a_client == NULL){ + log_it(L_ERROR,"Client is NULL for dap_client_get_stream_worker"); + return NULL; + } dap_client_pvt_t * l_client_internal = DAP_CLIENT_PVT(a_client); return (l_client_internal) ? l_client_internal->stream_worker : NULL; @@ -460,7 +509,7 @@ dap_stream_ch_t * dap_client_get_stream_ch(dap_client_t * a_client, uint8_t a_ch dap_stream_ch_t * l_ch = NULL; dap_client_pvt_t * l_client_internal = a_client ? DAP_CLIENT_PVT(a_client) : NULL; if(l_client_internal && l_client_internal->stream && l_client_internal->stream_es) - for(int i = 0; i < l_client_internal->stream->channel_count; i++) { + for(size_t i = 0; i < l_client_internal->stream->channel_count; i++) { if(l_client_internal->stream->channel[i]->proc->id == a_ch_id) { l_ch = l_client_internal->stream->channel[i]; break; diff --git a/dap-sdk/net/client/dap_client_pvt.c b/dap-sdk/net/client/dap_client_pvt.c index f2be7f840f..3eb3a14148 100644 --- a/dap-sdk/net/client/dap_client_pvt.c +++ b/dap-sdk/net/client/dap_client_pvt.c @@ -421,7 +421,7 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) l_sign_size = dap_sign_get_size(l_sign); } uint8_t l_data[l_key_size + l_sign_size]; - memcpy(l_data, a_client_pvt->session_key_open->pub_key_data, l_key_size); + memcpy(l_data,a_client_pvt->session_key_open->pub_key_data, l_key_size); if (l_sign) { memcpy(l_data + l_key_size, l_sign, l_sign_size); } @@ -506,6 +506,7 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) a_client_pvt->stream_es->_inheritor = a_client_pvt;//->client; a_client_pvt->stream = dap_stream_new_es_client(a_client_pvt->stream_es); + assert(a_client_pvt->stream); a_client_pvt->stream->is_client_to_uplink = true; a_client_pvt->stream->session = dap_stream_session_pure_new(); // may be from in packet? @@ -522,7 +523,7 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) if(inet_pton(AF_INET, a_client_pvt->uplink_addr, &(l_remote_addr.sin_addr)) < 0) { log_it(L_ERROR, "Wrong remote address '%s:%u'", a_client_pvt->uplink_addr, a_client_pvt->uplink_port); //close(a_client_pvt->stream_socket); - dap_events_socket_remove_and_delete_mt(a_client_pvt->stream_es->worker, a_client_pvt->stream_es); + dap_events_socket_remove_and_delete_mt(a_client_pvt->stream_worker->worker, a_client_pvt->stream_es); //a_client_pvt->stream_socket = 0; a_client_pvt->stage_status = STAGE_STATUS_ERROR; } @@ -530,7 +531,8 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) int l_err = 0; if((l_err = connect(a_client_pvt->stream_socket, (struct sockaddr *) &l_remote_addr, sizeof(struct sockaddr_in))) != -1) { - a_client_pvt->stream_es->flags &= ~DAP_SOCK_SIGNAL_CLOSE; + + // a_client_pvt->stream_es->flags &= ~DAP_SOCK_SIGNAL_CLOSE;// ??? what it was? Why out of esocket context??? //s_set_sock_nonblock(a_client_pvt->stream_socket, false); log_it(L_INFO, "Remote address connected (%s:%u) with sock_id %d (assign on worker #%u)", a_client_pvt->uplink_addr, a_client_pvt->uplink_port, a_client_pvt->stream_socket, l_worker->id); @@ -539,7 +541,7 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) else { log_it(L_ERROR, "Remote address can't connected (%s:%u) with sock_id %d", a_client_pvt->uplink_addr, a_client_pvt->uplink_port); - dap_events_socket_remove_and_delete_mt(a_client_pvt->stream_es->worker, a_client_pvt->stream_es); + dap_events_socket_remove_and_delete_mt(a_client_pvt->stream_worker->worker, a_client_pvt->stream_es); //close(a_client_pvt->stream_socket); a_client_pvt->stream_socket = 0; a_client_pvt->stage_status = STAGE_STATUS_ERROR; @@ -551,7 +553,7 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) break; case STAGE_STREAM_CONNECTED: { log_it(L_INFO, "Go to stage STAGE_STREAM_CONNECTED"); - size_t count_channels = strlen(a_client_pvt->active_channels); + size_t count_channels = a_client_pvt->active_channels? strlen(a_client_pvt->active_channels) : 0; for(size_t i = 0; i < count_channels; i++) { dap_stream_ch_new(a_client_pvt->stream, (uint8_t) a_client_pvt->active_channels[i]); //sid->channel[i]->ready_to_write = true; @@ -850,9 +852,10 @@ void dap_client_pvt_request_enc(dap_client_pvt_t * a_client_internal, const char } } - size_t l_key_hdr_str_size_max = strlen(a_client_internal->session_key_id) + 10; + size_t l_key_hdr_str_size_max = a_client_internal->session_key_id ? strlen(a_client_internal->session_key_id) + 10 : 12; char *l_key_hdr_str = DAP_NEW_Z_SIZE(char, l_key_hdr_str_size_max); - snprintf(l_key_hdr_str, l_key_hdr_str_size_max, "KeyID: %s", a_client_internal->session_key_id); + snprintf(l_key_hdr_str, l_key_hdr_str_size_max, "KeyID: %s", + a_client_internal->session_key_id ? a_client_internal->session_key_id : "NULL"); char *a_custom_new[2]; size_t a_custom_count = 1; @@ -1237,6 +1240,7 @@ void m_es_stream_delete(dap_events_socket_t *a_es, void *arg) log_it(L_INFO, "================= stream delete/peer reconnect"); dap_client_pvt_t * l_client_pvt = a_es->_inheritor; + a_es->_inheritor = NULL; // To prevent delete in reactor if(l_client_pvt == NULL) { log_it(L_ERROR, "dap_client_pvt_t is not initialized"); diff --git a/dap-sdk/net/client/include/dap_client.h b/dap-sdk/net/client/include/dap_client.h index b18b329d95..272bf758ee 100644 --- a/dap-sdk/net/client/include/dap_client.h +++ b/dap-sdk/net/client/include/dap_client.h @@ -34,6 +34,7 @@ * @brief The dap_client_stage enum. Top level of client's state machine **/ typedef enum dap_client_stage { + STAGE_UNDEFINED=-1, STAGE_BEGIN=0, STAGE_ENC_INIT=1, STAGE_STREAM_CTL=2, diff --git a/dap-sdk/net/core/dap_events.c b/dap-sdk/net/core/dap_events.c index f29f7f3d8f..7541a3c7ba 100644 --- a/dap-sdk/net/core/dap_events.c +++ b/dap-sdk/net/core/dap_events.c @@ -250,7 +250,7 @@ int dap_events_start( dap_events_t *a_events ) l_worker->events = a_events; l_worker->proc_queue = dap_proc_thread_get(i)->proc_queue; #ifdef DAP_EVENTS_CAPS_EPOLL - l_worker->epoll_fd = epoll_create( DAP_MAX_EPOLL_EVENTS ); + l_worker->epoll_fd = epoll_create( DAP_MAX_EVENTS_COUNT ); pthread_mutex_init(& l_worker->started_mutex, NULL); pthread_cond_init( & l_worker->started_cond, NULL); //log_it(L_DEBUG, "Created event_fd %d for worker %u", l_worker->epoll_fd,i); @@ -304,6 +304,9 @@ int dap_events_wait( dap_events_t *a_events ) */ void dap_events_stop_all( ) { + for( uint32_t i = 0; i < s_threads_count; i ++ ) { + dap_events_socket_event_signal( s_workers[i]->event_exit, 0); + } // TODO implement signal to stop the workers } diff --git a/dap-sdk/net/core/dap_events_socket.c b/dap-sdk/net/core/dap_events_socket.c index 4ede03d902..0676e1b9fb 100644 --- a/dap-sdk/net/core/dap_events_socket.c +++ b/dap-sdk/net/core/dap_events_socket.c @@ -106,6 +106,8 @@ dap_events_socket_t *dap_events_socket_wrap_no_add( dap_events_t *a_events, #if defined(DAP_EVENTS_CAPS_EPOLL) ret->ev_base_flags = EPOLLERR | EPOLLRDHUP | EPOLLHUP; + #elif defined(DAP_EVENTS_CAPS_POLL) + ret->poll_base_flags = POLLERR | POLLRDHUP | POLLHUP; #endif if ( a_sock!= 0 && a_sock != -1){ @@ -152,22 +154,6 @@ void dap_events_socket_reassign_between_workers_mt(dap_worker_t * a_worker_old, dap_events_socket_queue_ptr_send(a_worker_old->queue_es_reassign, l_msg); } -/** - * @brief dap_events_socket_assign_on_worker_unsafe - * @param a_es - * @param a_worker - */ -void dap_events_socket_assign_on_worker_unsafe(dap_events_socket_t * a_es, struct dap_worker * a_worker) -{ -#if defined(DAP_EVENTS_CAPS_EPOLL) - int l_event_fd = a_es->fd; - //log_it( L_INFO, "Create event descriptor with queue %d (%p) and add it on epoll fd %d", l_event_fd, l_es, a_w->epoll_fd); - a_es->ev.events = a_es->ev_base_flags; - a_es->ev.data.ptr = a_es; - epoll_ctl(a_worker->epoll_fd, EPOLL_CTL_ADD, l_event_fd, &a_es->ev); -#endif -} - /** * @brief s_create_type_pipe * @param a_w @@ -183,7 +169,13 @@ dap_events_socket_t * s_create_type_pipe(dap_worker_t * a_w, dap_events_socket_c l_es->worker = a_w; l_es->events = a_w->events; l_es->callbacks.read_callback = a_callback; // Arm event callback +#if defined(DAP_EVENTS_CAPS_EPOLL) l_es->ev_base_flags = EPOLLIN | EPOLLERR | EPOLLRDHUP | EPOLLHUP; +#elif defined(DAP_EVENTS_CAPS_POLL) + l_es->poll_base_flags = POLLIN | POLLERR | POLLRDHUP | POLLHUP; +#else +#error "Not defined s_create_type_pipe for your platform" +#endif #if defined(DAP_EVENTS_CAPS_PIPE_POSIX) int l_pipe[2]; @@ -216,7 +208,7 @@ dap_events_socket_t * s_create_type_pipe(dap_worker_t * a_w, dap_events_socket_c dap_events_socket_t * dap_events_socket_create_type_pipe_mt(dap_worker_t * a_w, dap_events_socket_callback_t a_callback, uint32_t a_flags) { dap_events_socket_t * l_es = s_create_type_pipe(a_w, a_callback, a_flags); - dap_events_socket_assign_on_worker_unsafe(l_es,a_w); + dap_worker_add_events_socket_unsafe(l_es,a_w); return l_es; } @@ -230,7 +222,7 @@ dap_events_socket_t * dap_events_socket_create_type_pipe_mt(dap_worker_t * a_w, dap_events_socket_t * dap_events_socket_create_type_pipe_unsafe(dap_worker_t * a_w, dap_events_socket_callback_t a_callback, uint32_t a_flags) { dap_events_socket_t * l_es = s_create_type_pipe(a_w, a_callback, a_flags); - dap_events_socket_assign_on_worker_unsafe(l_es,a_w); + dap_worker_add_events_socket_unsafe(l_es,a_w); return l_es; } @@ -250,7 +242,15 @@ dap_events_socket_t * s_create_type_queue_ptr(dap_worker_t * a_w, dap_events_soc l_es->worker = a_w; } l_es->callbacks.queue_ptr_callback = a_callback; // Arm event callback + +#if defined(DAP_EVENTS_CAPS_EPOLL) l_es->ev_base_flags = EPOLLIN | EPOLLERR | EPOLLRDHUP | EPOLLHUP; +#elif defined(DAP_EVENTS_CAPS_POLL) + l_es->poll_base_flags = POLLIN | POLLERR | POLLRDHUP | POLLHUP; +#else +#error "Not defined s_create_type_queue_ptr for your platform" +#endif + #ifdef DAP_EVENTS_CAPS_QUEUE_PIPE2 int l_pipe[2]; @@ -337,7 +337,7 @@ dap_events_socket_t * dap_events_socket_create_type_queue_ptr_unsafe(dap_worker_ assert(l_es); // If no worker - don't assign if ( a_w) - dap_events_socket_assign_on_worker_unsafe(l_es,a_w); + dap_worker_add_events_socket_unsafe(l_es,a_w); return l_es; } @@ -398,7 +398,13 @@ dap_events_socket_t * s_create_type_event(dap_worker_t * a_w, dap_events_socket_ l_es->worker = a_w; } l_es->callbacks.event_callback = a_callback; // Arm event callback +#if defined(DAP_EVENTS_CAPS_EPOLL) l_es->ev_base_flags = EPOLLIN | EPOLLERR | EPOLLRDHUP | EPOLLHUP; +#elif defined(DAP_EVENTS_CAPS_POLL) + l_es->poll_base_flags = POLLIN | POLLERR | POLLRDHUP | POLLHUP; +#else +#error "Not defined s_create_type_event for your platform" +#endif #ifdef DAP_EVENTS_CAPS_EVENT_EVENTFD if((l_es->fd = eventfd(0,0) ) < 0 ){ @@ -448,7 +454,7 @@ dap_events_socket_t * dap_events_socket_create_type_event_unsafe(dap_worker_t * dap_events_socket_t * l_es = s_create_type_event(a_w, a_callback); // If no worker - don't assign if ( a_w) - dap_events_socket_assign_on_worker_unsafe(l_es,a_w); + dap_worker_add_events_socket_unsafe(l_es,a_w); return l_es; } @@ -578,8 +584,10 @@ int dap_events_socket_queue_ptr_send( dap_events_socket_t * a_es, void* a_arg) char l_errbuf[128]; log_it(L_ERROR, "Can't send ptr to queue:\"%s\" code %d", strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)), l_errno); // Try again - if(l_errno == EAGAIN) + if(l_errno == EAGAIN){ add_ptr_to_buf(a_es, a_arg); + return 0; + } return l_errno; } #elif defined (DAP_EVENTS_CAPS_QUEUE_POSIX) @@ -597,6 +605,8 @@ int dap_events_socket_queue_ptr_send( dap_events_socket_t * a_es, void* a_arg) #endif } + + /** * @brief dap_events_socket_event_signal * @param a_es @@ -684,79 +694,85 @@ dap_events_socket_t *dap_events_socket_find_unsafe( int sock, struct dap_events return ret; } -/** - * @brief dap_events_socket_ready_to_read - * @param sc - * @param isReady - */ -void dap_events_socket_set_readable_unsafe( dap_events_socket_t *sc, bool is_ready ) +static void s_worker_poll_mod(dap_events_socket_t * a_esocket) { - if( is_ready == (bool)(sc->flags & DAP_SOCK_READY_TO_READ)) - return; - - sc->ev.events = sc->ev_base_flags; - sc->ev.events |= EPOLLERR; +#if defined (DAP_EVENTS_CAPS_EPOLL) + int events = a_esocket->ev_base_flags | EPOLLERR; - if ( is_ready ) - sc->flags |= DAP_SOCK_READY_TO_READ; - else - sc->flags ^= DAP_SOCK_READY_TO_READ; - - int events = EPOLLERR; + // Check & add + if( a_esocket->flags & DAP_SOCK_READY_TO_READ ) + events |= EPOLLIN; - if( sc->flags & DAP_SOCK_READY_TO_READ ) - events |= EPOLLIN; + if( a_esocket->flags & DAP_SOCK_READY_TO_WRITE ) + events |= EPOLLOUT; - if( sc->flags & DAP_SOCK_READY_TO_WRITE ) - events |= EPOLLOUT; + a_esocket->ev.events = events; - sc->ev.events = events; - if (sc->worker) - if ( epoll_ctl(sc->worker->epoll_fd, EPOLL_CTL_MOD, sc->socket, &sc->ev) == -1 ){ + if ( epoll_ctl(a_esocket->worker->epoll_fd, EPOLL_CTL_MOD, a_esocket->socket, &a_esocket->ev) ){ int l_errno = errno; char l_errbuf[128]; l_errbuf[0]=0; - strerror_r( l_errno, l_errbuf, sizeof (l_errbuf)); - log_it( L_ERROR,"Can't update read client socket state in the epoll_fd: \"%s\" (%d)", l_errbuf, l_errno ); + strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); + log_it(L_ERROR,"Can't update client socket state in the epoll_fd %d: \"%s\" (%d)", + a_esocket->worker->epoll_fd, l_errbuf, l_errno); + } +#elif defined (DAP_EVENTS_CAPS_POLL) + if (a_esocket->poll_index < a_esocket->worker->poll_count ){ + struct pollfd * l_poll = &a_esocket->worker->poll[a_esocket->poll_index]; + l_poll->events = a_esocket->poll_base_flags | POLLERR ; + // Check & add + if( a_esocket->flags & DAP_SOCK_READY_TO_READ ) + l_poll->events |= POLLIN; + if( a_esocket->flags & DAP_SOCK_READY_TO_WRITE ) + l_poll->events |= POLLOUT; + }else{ + log_it(L_ERROR, "Wrong poll index when remove from worker (unsafe): %u when total count %u", a_esocket->poll_index, + a_esocket->worker->poll_count); } + +#else +#error "Not defined dap_events_socket_set_writable_unsafe for your platform" +#endif + } /** - * @brief dap_events_socket_ready_to_write + * @brief dap_events_socket_ready_to_read * @param sc * @param isReady */ -void dap_events_socket_set_writable_unsafe( dap_events_socket_t *sc, bool a_is_ready ) +void dap_events_socket_set_readable_unsafe( dap_events_socket_t *a_esocket, bool is_ready ) { - if ( a_is_ready == (bool)(sc->flags & DAP_SOCK_READY_TO_WRITE)) { + if( is_ready == (bool)(a_esocket->flags & DAP_SOCK_READY_TO_READ)) return; - } - if ( a_is_ready ) - sc->flags |= DAP_SOCK_READY_TO_WRITE; + if ( is_ready ) + a_esocket->flags |= DAP_SOCK_READY_TO_READ; else - sc->flags ^= DAP_SOCK_READY_TO_WRITE; + a_esocket->flags ^= DAP_SOCK_READY_TO_READ; - int events = sc->ev_base_flags | EPOLLERR; - - // Check & add - if( sc->flags & DAP_SOCK_READY_TO_READ ) - events |= EPOLLIN; + if( a_esocket->worker) + s_worker_poll_mod( a_esocket); +} - if( sc->flags & DAP_SOCK_READY_TO_WRITE ) - events |= EPOLLOUT; +/** + * @brief dap_events_socket_ready_to_write + * @param a_esocket + * @param isReady + */ +void dap_events_socket_set_writable_unsafe( dap_events_socket_t *a_esocket, bool a_is_ready ) +{ + if ( a_is_ready == (bool)(a_esocket->flags & DAP_SOCK_READY_TO_WRITE)) { + return; + } - sc->ev.events = events; + if ( a_is_ready ) + a_esocket->flags |= DAP_SOCK_READY_TO_WRITE; + else + a_esocket->flags ^= DAP_SOCK_READY_TO_WRITE; - if (sc->worker && sc->type != DESCRIPTOR_TYPE_SOCKET_UDP) - if ( epoll_ctl(sc->worker->epoll_fd, EPOLL_CTL_MOD, sc->socket, &sc->ev) ){ - int l_errno = errno; - char l_errbuf[128]; - l_errbuf[0]=0; - strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); - log_it(L_ERROR,"Can't update write client socket state in the epoll_fd %d: \"%s\" (%d)", - sc->worker->epoll_fd, l_errbuf, l_errno); - } + if( a_esocket->worker ) + s_worker_poll_mod(a_esocket); } /** @@ -817,6 +833,8 @@ void dap_events_socket_remove_from_worker_unsafe( dap_events_socket_t *a_es, dap // Socket already removed from worker return; } +#ifdef DAP_EVENTS_CAPS_EPOLL + if ( epoll_ctl( a_worker->epoll_fd, EPOLL_CTL_DEL, a_es->socket, &a_es->ev) == -1 ) { int l_errno = errno; char l_errbuf[128]; @@ -825,6 +843,16 @@ void dap_events_socket_remove_from_worker_unsafe( dap_events_socket_t *a_es, dap a_worker->epoll_fd, l_errbuf, l_errno); } //else // log_it( L_DEBUG,"Removed epoll's event from dap_worker #%u", a_worker->id ); +#elif defined (DAP_EVENTS_CAPS_POLL) + if (a_es->poll_index < a_worker->poll_count ){ + a_worker->poll[a_es->poll_index].fd = -1; + }else{ + log_it(L_ERROR, "Wrong poll index when remove from worker (unsafe): %u when total count %u", a_es->poll_index, a_worker->poll_count); + } +#else +#error "Unimplemented new esocket on worker callback for current platform" +#endif + a_worker->event_sockets_count--; if(a_worker->esockets) HASH_DELETE(hh_worker,a_worker->esockets, a_es); diff --git a/dap-sdk/net/core/dap_proc_queue.c b/dap-sdk/net/core/dap_proc_queue.c index d41f6604ca..4b91b2d996 100644 --- a/dap-sdk/net/core/dap_proc_queue.c +++ b/dap-sdk/net/core/dap_proc_queue.c @@ -20,6 +20,7 @@ You should have received a copy of the GNU General Public License along with any DAP SDK based project. If not, see <http://www.gnu.org/licenses/>. */ +#include "dap_worker.h" #include "dap_proc_queue.h" #include "dap_proc_thread.h" #define LOG_TAG "dap_proc_queue" @@ -87,15 +88,10 @@ static void s_queue_esocket_callback( dap_events_socket_t * a_es, void * a_msg) } -void dap_proc_queue_add_callback(dap_proc_queue_t * a_queue,dap_proc_queue_callback_t a_callback, void * a_callback_arg) +void dap_proc_queue_add_callback(dap_worker_t * a_worker,dap_proc_queue_callback_t a_callback, void * a_callback_arg) { dap_proc_queue_msg_t * l_msg = DAP_NEW_Z(dap_proc_queue_msg_t); l_msg->callback = a_callback; l_msg->callback_arg = a_callback_arg; - dap_events_socket_queue_ptr_send( a_queue->esocket, l_msg ); -} - -void dap_proc_queue_add_callback_auto(dap_proc_queue_callback_t a_callback, void * a_callback_arg) -{ - dap_proc_queue_add_callback( dap_proc_thread_get_auto()->proc_queue ,a_callback,a_callback_arg); + dap_events_socket_queue_ptr_send( a_worker->proc_queue->esocket , l_msg ); } diff --git a/dap-sdk/net/core/dap_proc_thread.c b/dap-sdk/net/core/dap_proc_thread.c index 133bf81001..203875d1d1 100644 --- a/dap-sdk/net/core/dap_proc_thread.c +++ b/dap-sdk/net/core/dap_proc_thread.c @@ -24,9 +24,10 @@ #include <assert.h> #include "dap_server.h" -#if defined(DAP_EVENTS_CAPS_WEPOLL) -#elif defined(DAP_EVENTS_CAPS_EPOLL) +#if defined(DAP_EVENTS_CAPS_EPOLL) #include <sys/epoll.h> +#elif defined (DAP_EVENTS_CAPS_POLL) +#include <poll.h> #else #error "Unimplemented poll for this platform" #endif @@ -181,6 +182,24 @@ static void * s_proc_thread_function(void * a_arg) log_it(L_CRITICAL, "Can't add proc queue on epoll ctl"); return NULL; } +#elif defined(DAP_EVENTS_CAPS_POLL) + size_t l_poll_count_max = DAP_MAX_EVENTS_COUNT; + size_t l_poll_count = 0; + bool l_poll_compress = false; + struct pollfd * l_poll = DAP_NEW_Z_SIZE(struct pollfd,l_poll_count_max *sizeof (*l_poll)); + dap_events_socket_t ** l_esockets = DAP_NEW_Z_SIZE(dap_events_socket_t*,l_poll_count_max *sizeof (*l_esockets)); + + // Add proc queue + l_poll[0].fd = l_thread->proc_queue->esocket->fd; + l_poll[0].events = l_thread->proc_queue->esocket->poll_base_flags; + l_esockets[0] = l_thread->proc_queue->esocket; + l_poll_count++; + + // Add proc event + l_poll[1].fd = l_thread->proc_event->fd; + l_poll[1].events = l_thread->proc_event->poll_base_flags; + l_esockets[1] = l_thread->proc_event; + l_poll_count++; #else #error "Unimplemented poll events analog for this platform" @@ -196,6 +215,10 @@ static void * s_proc_thread_function(void * a_arg) #ifdef DAP_EVENTS_CAPS_EPOLL //log_it(L_DEBUG, "Epoll_wait call"); int l_selected_sockets = epoll_wait(l_thread->epoll_ctl, l_epoll_events, DAP_MAX_EPOLL_EVENTS, -1); + size_t l_sockets_max = l_selected_sockets; +#elif defined (DAP_EVENTS_CAPS_POLL) + int l_selected_sockets = poll(l_poll,l_poll_count,-1); + size_t l_sockets_max = l_poll_count; #else #error "Unimplemented poll wait analog for this platform" #endif @@ -209,30 +232,50 @@ static void * s_proc_thread_function(void * a_arg) log_it(L_ERROR, "Proc thread #%d got errno:\"%s\" (%d)", l_thread->cpu_id , l_errbuf, l_errno); break; } - time_t l_cur_time = time( NULL); - for(int32_t n = 0; n < l_selected_sockets; n++) { + for(size_t n = 0; n < l_sockets_max; n++) { dap_events_socket_t * l_cur; + bool l_flag_hup, l_flag_rdhup, l_flag_read, l_flag_write, l_flag_error; +#ifdef DAP_EVENTS_CAPS_EPOLL l_cur = (dap_events_socket_t *) l_epoll_events[n].data.ptr; uint32_t l_cur_events = l_epoll_events[n].events; + l_flag_hup = l_cur_events & EPOLLHUP; + l_flag_rdhup = l_cur_events & EPOLLHUP; + l_flag_write = l_cur_events & EPOLLOUT; + l_flag_read = l_cur_events & EPOLLIN; + l_flag_error = l_cur_events & EPOLLERR; +#elif defined ( DAP_EVENTS_CAPS_POLL) + if(n>=(int32_t)l_poll_count){ + log_it(L_WARNING,"selected_sockets(%d) is bigger then poll count (%u)", l_selected_sockets, l_poll_count); + break; + } + short l_cur_events = l_poll[n].revents ; + if (!l_cur_events) + continue; + l_cur = l_esockets[n]; + l_flag_hup = l_cur_events & POLLHUP; + l_flag_rdhup = l_cur_events & POLLHUP; + l_flag_write = l_cur_events & POLLOUT; + l_flag_read = l_cur_events & POLLIN; + l_flag_error = l_cur_events & POLLERR; +#else +#error "Unimplemented fetch esocket after poll" +#endif + if(!l_cur) { log_it(L_ERROR, "dap_events_socket NULL"); continue; } + time_t l_cur_time = time( NULL); l_cur->last_time_active = l_cur_time; - if (l_cur_events & EPOLLERR ){ - char l_buferr[128]; - strerror_r(errno,l_buferr, sizeof (l_buferr)); - log_it(L_ERROR,"Some error happend in proc thread #%u: %s", l_thread->cpu_id, l_buferr); - } - if (l_cur_events & EPOLLERR ){ + if (l_flag_error){ int l_errno = errno; char l_errbuf[128]; strerror_r(l_errno, l_errbuf,sizeof (l_errbuf)); - log_it(L_ERROR,"Some error with %d socket: %s(%d)", l_cur->socket, l_errbuf, l_errno); + log_it(L_ERROR,"Some error on proc thread #%u with %d socket: %s(%d)",l_thread->cpu_id, l_cur->socket, l_errbuf, l_errno); if(l_cur->callbacks.error_callback) l_cur->callbacks.error_callback(l_cur,errno); } - if (l_cur_events & EPOLLIN ){ + if (l_flag_read ){ switch (l_cur->type) { case DESCRIPTOR_TYPE_QUEUE: dap_events_socket_queue_proc_input_unsafe(l_cur); @@ -255,12 +298,38 @@ static void * s_proc_thread_function(void * a_arg) if(l_cur->_inheritor) DAP_DELETE(l_cur->_inheritor); DAP_DELETE(l_cur); +#elif defined (DAP_EVENTS_CAPS_POLL) + l_poll[n].fd = -1; + l_poll_compress = true; #else #error "Unimplemented poll ctl analog for this platform" #endif } } +#ifdef DAP_EVENTS_CAPS_POLL + /***********************************************************/ + /* If the compress_array flag was turned on, we need */ + /* to squeeze together the array and decrement the number */ + /* of file descriptors. We do not need to move back the */ + /* events and revents fields because the events will always*/ + /* be POLLIN in this case, and revents is output. */ + /***********************************************************/ + if ( l_poll_compress){ + l_poll_compress = false; + for (size_t i = 0; i < l_poll_count ; i++) { + if ( l_poll[i].fd == -1){ + for(size_t j = i; j < l_poll_count-1; j++){ + l_poll[j].fd = l_poll[j+1].fd; + l_esockets[j] = l_esockets[j+1]; + l_esockets[j]->poll_index = j; + } + i--; + l_poll_count--; + } + } + } +#endif } log_it(L_NOTICE, "Stop processing thread #%u", l_thread->cpu_id); return NULL; diff --git a/dap-sdk/net/core/dap_worker.c b/dap-sdk/net/core/dap_worker.c index affb812410..d740dcd794 100644 --- a/dap-sdk/net/core/dap_worker.c +++ b/dap-sdk/net/core/dap_worker.c @@ -48,6 +48,7 @@ static void s_queue_delete_es_callback( dap_events_socket_t * a_es, void * a_arg static void s_queue_es_reassign_callback( dap_events_socket_t * a_es, void * a_arg); static void s_queue_callback_callback( dap_events_socket_t * a_es, void * a_arg); static void s_queue_es_io_callback( dap_events_socket_t * a_es, void * a_arg); +static void s_event_exit_callback( dap_events_socket_t * a_es, uint64_t a_flags); /** * @brief dap_worker_init @@ -75,7 +76,7 @@ void *dap_worker_thread(void *arg) { dap_events_socket_t *l_cur; dap_worker_t *l_worker = (dap_worker_t *) arg; - time_t l_next_time_timeout_check = time( NULL) + s_connection_timeout / 2; + //time_t l_next_time_timeout_check = time( NULL) + s_connection_timeout / 2; uint32_t l_tn = l_worker->id; dap_cpu_assign_thread_on(l_worker->id); @@ -83,25 +84,36 @@ void *dap_worker_thread(void *arg) l_shed_params.sched_priority = 0; pthread_setschedparam(pthread_self(),SCHED_FIFO ,&l_shed_params); +#ifdef DAP_EVENTS_CAPS_EPOLL + struct epoll_event l_epoll_events[ DAP_MAX_EPOLL_EVENTS]= {{0}}; + log_it(L_INFO, "Worker #%d started with epoll fd %d and assigned to dedicated CPU unit", l_worker->id, l_worker->epoll_fd); +#elif defined(DAP_EVENTS_CAPS_POLL) + l_worker->poll_count_max = _SC_PAGE_SIZE; + l_worker->poll = DAP_NEW_Z_SIZE(struct pollfd,l_worker->poll_count_max*sizeof (struct pollfd)); + l_worker->poll_esocket = DAP_NEW_Z_SIZE(dap_events_socket_t*,l_worker->poll_count_max*sizeof (dap_events_socket_t*)); +#else +#error "Unimplemented socket array for this platform" +#endif + + l_worker->queue_es_new = dap_events_socket_create_type_queue_ptr_unsafe( l_worker, s_queue_new_es_callback); l_worker->queue_es_delete = dap_events_socket_create_type_queue_ptr_unsafe( l_worker, s_queue_delete_es_callback); l_worker->queue_es_io = dap_events_socket_create_type_queue_ptr_unsafe( l_worker, s_queue_es_io_callback); l_worker->queue_es_reassign = dap_events_socket_create_type_queue_ptr_unsafe( l_worker, s_queue_es_reassign_callback ); l_worker->queue_callback= dap_events_socket_create_type_queue_ptr_unsafe( l_worker, s_queue_callback_callback); + l_worker->event_exit = dap_events_socket_create_type_event_unsafe(l_worker, s_event_exit_callback ); l_worker->timer_check_activity = dap_timerfd_start_on_worker( l_worker,s_connection_timeout / 2,s_socket_all_check_activity,l_worker); -#ifdef DAP_EVENTS_CAPS_EPOLL - struct epoll_event l_epoll_events[ DAP_MAX_EPOLL_EVENTS]= {{0}}; - log_it(L_INFO, "Worker #%d started with epoll fd %d and assigned to dedicated CPU unit", l_worker->id, l_worker->epoll_fd); -#else -#error "Unimplemented socket array for this platform" -#endif pthread_cond_broadcast(&l_worker->started_cond); bool s_loop_is_active = true; while(s_loop_is_active) { #ifdef DAP_EVENTS_CAPS_EPOLL int l_selected_sockets = epoll_wait(l_worker->epoll_fd, l_epoll_events, DAP_MAX_EPOLL_EVENTS, -1); + size_t l_sockets_max = l_selected_sockets; +#elif defined(DAP_EVENTS_CAPS_POLL) + int l_selected_sockets = poll(l_worker->poll, l_worker->poll_count, -1); + size_t l_sockets_max = l_worker->poll_count; #else #error "Unimplemented poll wait analog for this platform" #endif @@ -116,20 +128,40 @@ void *dap_worker_thread(void *arg) } time_t l_cur_time = time( NULL); - for(int32_t n = 0; n < l_selected_sockets; n++) { + for(size_t n = 0; n < l_sockets_max; n++) { + bool l_flag_hup, l_flag_rdhup, l_flag_read, l_flag_write, l_flag_error; +#ifdef DAP_EVENTS_CAPS_EPOLL l_cur = (dap_events_socket_t *) l_epoll_events[n].data.ptr; + l_flag_hup = l_epoll_events[n].events & EPOLLHUP; + l_flag_rdhup = l_epoll_events[n].events & EPOLLHUP; + l_flag_write = l_epoll_events[n].events & EPOLLOUT; + l_flag_read = l_epoll_events[n].events & EPOLLIN; + l_flag_error = l_epoll_events[n].events & EPOLLERR; +#elif defined ( DAP_EVENTS_CAPS_POLL) + short l_cur_events =l_worker->poll[n].revents; + if (!l_cur_events) // No events for this socket + continue; + l_flag_hup = l_cur_events& POLLHUP; + l_flag_rdhup = l_cur_events & POLLHUP; + l_flag_write = l_cur_events & POLLOUT; + l_flag_read = l_cur_events & POLLIN; + l_flag_error = l_cur_events & POLLERR; + l_cur = l_worker->poll_esocket[n]; + //log_it(L_DEBUG, "flags: returned events 0x%0X requested events 0x%0X",l_worker->poll[n].revents,l_worker->poll[n].events ); +#else +#error "Unimplemented fetch esocket after poll" +#endif if(!l_cur) { log_it(L_ERROR, "dap_events_socket NULL"); continue; } l_cur->last_time_active = l_cur_time; +// log_it(L_DEBUG, "Worker=%d fd=%d", l_worker->id, l_cur->socket); - //log_it(L_DEBUG, "Worker=%d fd=%d socket=%d event=0x%x(%d)", l_worker->id, - // l_worker->epoll_fd,l_cur->socket, l_epoll_events[n].events,l_epoll_events[n].events); int l_sock_err = 0, l_sock_err_size = sizeof(l_sock_err); //connection already closed (EPOLLHUP - shutdown has been made in both directions) - if(l_epoll_events[n].events & EPOLLHUP) { // && events[n].events & EPOLLERR) { + if( l_flag_hup) { // && events[n].events & EPOLLERR) { switch (l_cur->type ){ case DESCRIPTOR_TYPE_SOCKET_LISTENING: case DESCRIPTOR_TYPE_SOCKET: @@ -148,7 +180,7 @@ void *dap_worker_thread(void *arg) } } - if(l_epoll_events[n].events & EPOLLERR) { + if(l_flag_error) { switch (l_cur->type ){ case DESCRIPTOR_TYPE_SOCKET_LISTENING: case DESCRIPTOR_TYPE_SOCKET: @@ -163,7 +195,7 @@ void *dap_worker_thread(void *arg) l_cur->callbacks.error_callback(l_cur, 0); // Call callback to process error event } - if (l_epoll_events[n].events & EPOLLRDHUP) { + if (l_flag_hup) { log_it(L_INFO, "Client socket disconnected"); dap_events_socket_set_readable_unsafe(l_cur, false); dap_events_socket_set_writable_unsafe(l_cur, false); @@ -172,7 +204,7 @@ void *dap_worker_thread(void *arg) } - if(l_epoll_events[n].events & EPOLLIN) { + if(l_flag_read) { //log_it(L_DEBUG, "Comes connection with type %d", l_cur->type); if(l_cur->buf_in_size == sizeof(l_cur->buf_in)) { @@ -271,7 +303,7 @@ void *dap_worker_thread(void *arg) l_cur->buf_out_size = 0; } } - else if (!(l_epoll_events[n].events & EPOLLRDHUP) || !(l_epoll_events[n].events & EPOLLERR)) { + else if (! l_flag_rdhup || !l_flag_error) { log_it(L_WARNING, "EPOLLIN triggered but nothing to read"); dap_events_socket_set_readable_unsafe(l_cur,false); } @@ -279,7 +311,7 @@ void *dap_worker_thread(void *arg) } // Socket is ready to write - if(((l_epoll_events[n].events & EPOLLOUT) || (l_cur->flags & DAP_SOCK_READY_TO_WRITE)) + if(( l_flag_write || (l_cur->flags & DAP_SOCK_READY_TO_WRITE)) && !(l_cur->flags & DAP_SOCK_SIGNAL_CLOSE)) { //log_it(L_DEBUG, "Main loop output: %u bytes to send", l_cur->buf_out_size); @@ -346,7 +378,7 @@ void *dap_worker_thread(void *arg) //log_it(L_DEBUG, "Output: %u from %u bytes are sent ", l_bytes_sent,l_cur->buf_out_size); if (l_bytes_sent) { - if ( l_bytes_sent <= l_cur->buf_out_size ){ + if ( l_bytes_sent <= (ssize_t) l_cur->buf_out_size ){ l_cur->buf_out_size -= l_bytes_sent; if (l_cur->buf_out_size ) { memmove(l_cur->buf_out, &l_cur->buf_out[l_bytes_sent], l_cur->buf_out_size); @@ -379,8 +411,35 @@ void *dap_worker_thread(void *arg) l_cur->buf_out_size); } - } + if( l_worker->signal_exit){ + log_it(L_NOTICE,"Worker :%u finished", l_worker->id); + return NULL; + } + } +#ifdef DAP_EVENTS_CAPS_POLL + /***********************************************************/ + /* If the compress_array flag was turned on, we need */ + /* to squeeze together the array and decrement the number */ + /* of file descriptors. We do not need to move back the */ + /* events and revents fields because the events will always*/ + /* be POLLIN in this case, and revents is output. */ + /***********************************************************/ + if ( l_worker->poll_compress){ + l_worker->poll_compress = false; + for (size_t i = 0; i < l_worker->poll_count ; i++) { + if ( l_worker->poll[i].fd == -1){ + for(size_t j = i; j < l_worker->poll_count-1; j++){ + l_worker->poll[j].fd = l_worker->poll[j+1].fd; + l_worker->poll_esocket[j] = l_worker->poll_esocket[j+1]; + l_worker->poll_esocket[j]->poll_index = j; + } + i--; + l_worker->poll_count--; + } + } + } +#endif } // while log_it(L_NOTICE,"Exiting thread #%u", l_worker->id); return NULL; @@ -405,7 +464,7 @@ static void s_queue_new_es_callback( dap_events_socket_t * a_es, void * a_arg) int l_cpu = w->id; setsockopt(l_es_new->socket , SOL_SOCKET, SO_INCOMING_CPU, &l_cpu, sizeof(l_cpu)); } - bool l_socket_present = (l_es_new->worker && l_es_new->is_initalized) ? true : false; + l_es_new->worker = w; // We need to differ new and reassigned esockets. If its new - is_initialized is false if ( ! l_es_new->is_initalized ){ @@ -415,23 +474,7 @@ static void s_queue_new_es_callback( dap_events_socket_t * a_es, void * a_arg) } if (l_es_new->socket>0){ - int l_ret = -1; -#ifdef DAP_EVENTS_CAPS_EPOLL - // Init events for EPOLL - l_es_new->ev.events = l_es_new->ev_base_flags ; - if(l_es_new->flags & DAP_SOCK_READY_TO_READ ) - l_es_new->ev.events |= EPOLLIN; - if(l_es_new->flags & DAP_SOCK_READY_TO_WRITE ) - l_es_new->ev.events |= EPOLLOUT; - l_es_new->ev.data.ptr = l_es_new; - if (l_socket_present) { - // Update only flags, socket already present in worker - return; - } - l_ret = epoll_ctl(w->epoll_fd, EPOLL_CTL_ADD, l_es_new->socket, &l_es_new->ev); -#else -#error "Unimplemented new esocket on worker callback for current platform" -#endif + int l_ret = dap_worker_add_events_socket_unsafe(l_es_new,w); if ( l_ret != 0 ){ log_it(L_CRITICAL,"Can't add event socket's handler to worker i/o poll mechanism with error %d", errno); }else{ @@ -502,6 +545,18 @@ static void s_queue_callback_callback( dap_events_socket_t * a_es, void * a_arg) DAP_DELETE(l_msg); } +/** + * @brief s_event_exit_callback + * @param a_es + * @param a_flags + */ +static void s_event_exit_callback( dap_events_socket_t * a_es, uint64_t a_flags) +{ + (void) a_flags; + a_es->worker->signal_exit = true; + log_it(L_NOTICE, "Worker :%u signaled to exit", a_es->worker->id); +} + /** * @brief s_pipe_data_out_read_callback * @param a_es @@ -570,11 +625,53 @@ void dap_worker_add_events_socket(dap_events_socket_t * a_events_socket, dap_wor int l_ret = dap_events_socket_queue_ptr_send( a_worker->queue_es_new, a_events_socket ); if(l_ret != 0 ){ char l_errbuf[128]; + *l_errbuf = 0; strerror_r(l_ret,l_errbuf,sizeof (l_errbuf)); log_it(L_ERROR, "Cant send pointer in queue: \"%s\"(code %d)", l_errbuf, l_ret); } } +/** + * @brief dap_worker_add_events_socket_unsafe + * @param a_worker + * @param a_esocket + */ +int dap_worker_add_events_socket_unsafe( dap_events_socket_t * a_esocket, dap_worker_t * a_worker ) +{ + +#ifdef DAP_EVENTS_CAPS_EPOLL + // Init events for EPOLL + a_esocket->ev.events = a_esocket->ev_base_flags ; + if(a_esocket->flags & DAP_SOCK_READY_TO_READ ) + a_esocket->ev.events |= EPOLLIN; + if(a_esocket->flags & DAP_SOCK_READY_TO_WRITE ) + a_esocket->ev.events |= EPOLLOUT; + a_esocket->ev.data.ptr = a_esocket; + return epoll_ctl(a_worker->epoll_fd, EPOLL_CTL_ADD, a_esocket->socket, &a_esocket->ev); +#elif defined (DAP_EVENTS_CAPS_POLL) + if ( a_worker->poll_count == a_worker->poll_count_max ){ // realloc + log_it(L_WARNING, "Too many descriptors, resizing array twice"); + a_worker->poll_count_max *= 2; + a_worker->poll =DAP_REALLOC(a_worker->poll, a_worker->poll_count_max * sizeof(*a_worker->poll)); + a_worker->poll_esocket =DAP_REALLOC(a_worker->poll_esocket, a_worker->poll_count_max * sizeof(*a_worker->poll_esocket)); + } + a_worker->poll[a_worker->poll_count].fd = a_esocket->socket; + a_esocket->poll_index = a_worker->poll_count; + a_worker->poll[a_worker->poll_count].events = a_esocket->poll_base_flags; + if( a_esocket->flags & DAP_SOCK_READY_TO_READ ) + a_worker->poll[a_worker->poll_count].events |= POLLIN; + if( a_esocket->flags & DAP_SOCK_READY_TO_WRITE ) + a_worker->poll[a_worker->poll_count].events |= POLLOUT; + + a_worker->poll_esocket[a_worker->poll_count] = a_esocket; + a_worker->poll_count++; + return 0; +#else +#error "Unimplemented new esocket on worker callback for current platform" +#endif + +} + /** * @brief dap_worker_exec_callback_on */ @@ -586,6 +683,7 @@ void dap_worker_exec_callback_on(dap_worker_t * a_worker, dap_worker_callback_t int l_ret=dap_events_socket_queue_ptr_send( a_worker->queue_callback,l_msg ); if(l_ret != 0 ){ char l_errbuf[128]; + *l_errbuf = 0; strerror_r(l_ret,l_errbuf,sizeof (l_errbuf)); log_it(L_ERROR, "Cant send pointer in queue: \"%s\"(code %d)", l_errbuf, l_ret); } diff --git a/dap-sdk/net/core/include/dap_events.h b/dap-sdk/net/core/include/dap_events.h index c8e5471fb9..0f119ee031 100644 --- a/dap-sdk/net/core/include/dap_events.h +++ b/dap-sdk/net/core/include/dap_events.h @@ -28,7 +28,7 @@ #include "dap_server.h" #include "dap_worker.h" struct dap_events; -#define DAP_MAX_EPOLL_EVENTS 8192 +#define DAP_MAX_EVENTS_COUNT 8192 typedef void (*dap_events_callback_t) (struct dap_events *, void *arg); // Callback for specific server's operations diff --git a/dap-sdk/net/core/include/dap_events_socket.h b/dap-sdk/net/core/include/dap_events_socket.h index 0b9f6dcd07..84df687ffa 100644 --- a/dap-sdk/net/core/include/dap_events_socket.h +++ b/dap-sdk/net/core/include/dap_events_socket.h @@ -32,7 +32,8 @@ // Caps for different platforms #if defined(DAP_OS_LINUX) - #define DAP_EVENTS_CAPS_EPOLL +// #define DAP_EVENTS_CAPS_EPOLL + #define DAP_EVENTS_CAPS_POLL #define DAP_EVENTS_CAPS_PIPE_POSIX #define DAP_EVENTS_CAPS_QUEUE_PIPE2 //#define DAP_EVENTS_CAPS_QUEUE_POSIX @@ -60,6 +61,8 @@ #elif defined (DAP_EVENTS_CAPS_EPOLL) #include <sys/epoll.h> #define EPOLL_HANDLE int +#elif defined (DAP_EVENTS_CAPS_POLL) +#include <poll.h> #endif #define BIT( x ) ( 1 << x ) @@ -172,6 +175,9 @@ typedef struct dap_events_socket { #ifdef DAP_EVENTS_CAPS_EPOLL uint32_t ev_base_flags; struct epoll_event ev; +#elif defined (DAP_EVENTS_CAPS_POLL) + short poll_base_flags; + uint32_t poll_index; // index in poll array on worker #endif dap_events_socket_callbacks_t callbacks; @@ -208,7 +214,6 @@ dap_events_socket_t *dap_events_socket_wrap_no_add( dap_events_t *a_events, dap_events_socket_t * dap_events_socket_wrap2( dap_server_t *a_server, struct dap_events *a_events, int a_sock, dap_events_socket_callbacks_t *a_callbacks ); -void dap_events_socket_assign_on_worker_unsafe(dap_events_socket_t * a_es, struct dap_worker * a_worker); void dap_events_socket_assign_on_worker_mt(dap_events_socket_t * a_es, struct dap_worker * a_worker); void dap_events_socket_reassign_between_workers_mt(dap_worker_t * a_worker_old, dap_events_socket_t * a_es, dap_worker_t * a_worker_new); diff --git a/dap-sdk/net/core/include/dap_proc_queue.h b/dap-sdk/net/core/include/dap_proc_queue.h index e38c2d82c3..097a44c179 100644 --- a/dap-sdk/net/core/include/dap_proc_queue.h +++ b/dap-sdk/net/core/include/dap_proc_queue.h @@ -42,7 +42,5 @@ typedef struct dap_proc_queue{ dap_proc_queue_t * dap_proc_queue_create(dap_proc_thread_t * a_thread); void dap_proc_queue_delete(dap_proc_queue_t * a_queue); -void dap_proc_queue_add_callback(dap_proc_queue_t * a_queue,dap_proc_queue_callback_t a_callback, void * a_callback_arg); - -void dap_proc_queue_add_callback_auto(dap_proc_queue_callback_t a_callback, void * a_callback_arg); +void dap_proc_queue_add_callback(dap_worker_t * a_worker, dap_proc_queue_callback_t a_callback, void * a_callback_arg); diff --git a/dap-sdk/net/core/include/dap_proc_thread.h b/dap-sdk/net/core/include/dap_proc_thread.h index 0682f285b2..b637cc8159 100644 --- a/dap-sdk/net/core/include/dap_proc_thread.h +++ b/dap-sdk/net/core/include/dap_proc_thread.h @@ -42,6 +42,8 @@ typedef struct dap_proc_thread{ #ifdef DAP_EVENTS_CAPS_EPOLL EPOLL_HANDLE epoll_ctl; +#elif defined (DAP_EVENTS_CAPS_POLL) + int poll_fd; #else #error "No poll for proc thread for your platform" #endif diff --git a/dap-sdk/net/core/include/dap_worker.h b/dap-sdk/net/core/include/dap_worker.h index eb02022f63..f8c88d66a6 100644 --- a/dap-sdk/net/core/include/dap_worker.h +++ b/dap-sdk/net/core/include/dap_worker.h @@ -36,17 +36,27 @@ typedef struct dap_worker atomic_uint event_sockets_count; dap_events_socket_t *esockets; // Hashmap of event sockets + // Signal to exit + bool signal_exit; // worker control queues dap_events_socket_t * queue_es_new; // Events socket for new socket dap_events_socket_t * queue_es_delete; // Events socke dap_events_socket_t * queue_es_reassign; // Reassign between workers dap_events_socket_t * queue_es_io; // Events socket for new socket + dap_events_socket_t * event_exit; // Events socket for exit dap_events_socket_t * queue_callback; // Queue for pure callback on worker dap_timerfd_t * timer_check_activity; #ifdef DAP_EVENTS_CAPS_EPOLL EPOLL_HANDLE epoll_fd; +#elif defined ( DAP_EVENTS_CAPS_POLL) + int poll_fd; + struct pollfd * poll; + dap_events_socket_t ** poll_esocket; + size_t poll_count; + size_t poll_count_max; + bool poll_compress; // Some of fd's became NULL so arrays need to be reassigned #endif pthread_cond_t started_cond; pthread_mutex_t started_mutex; @@ -78,6 +88,7 @@ typedef struct dap_worker_msg_callback{ int dap_worker_init( size_t a_conn_timeout ); void dap_worker_deinit(); +int dap_worker_add_events_socket_unsafe( dap_events_socket_t * a_esocket, dap_worker_t * a_worker); void dap_worker_add_events_socket(dap_events_socket_t * a_events_socket, dap_worker_t * a_worker); dap_worker_t *dap_worker_add_events_socket_auto( dap_events_socket_t * a_events_socket ); void dap_worker_exec_callback_on(dap_worker_t * a_worker, dap_worker_callback_t a_callback, void * a_arg); diff --git a/dap-sdk/net/server/enc_server/dap_enc_http.c b/dap-sdk/net/server/enc_server/dap_enc_http.c index d8db585694..840cdb5a68 100644 --- a/dap-sdk/net/server/enc_server/dap_enc_http.c +++ b/dap-sdk/net/server/enc_server/dap_enc_http.c @@ -100,51 +100,67 @@ void enc_http_proc(struct dap_http_simple *cl_st, void * arg) http_status_code_t * return_code = (http_status_code_t*)arg; if(strcmp(cl_st->http_client->url_path,"gd4y5yh78w42aaagh") == 0 ) { - + dap_enc_key_type_t l_pkey_exchange_type =DAP_ENC_KEY_TYPE_MSRLN ; + dap_enc_key_type_t l_enc_type = DAP_ENC_KEY_TYPE_IAES; + size_t l_pkey_exchange_size=MSRLN_PKA_BYTES; + sscanf(cl_st->http_client->in_query_string, "enc_type=%d,pkey_exchange_type=%d,pkey_exchange_size=%zd", + &l_enc_type,&l_pkey_exchange_type,&l_pkey_exchange_size); + + log_it(L_DEBUG, "Stream encryption: %s\t public key exchange: %s",dap_enc_get_type_name(l_enc_type), + dap_enc_get_type_name(l_pkey_exchange_type)); uint8_t alice_msg[cl_st->request_size]; - size_t decode_len = dap_enc_base64_decode(cl_st->request, cl_st->request_size, alice_msg, DAP_ENC_DATA_TYPE_B64); + size_t l_decode_len = dap_enc_base64_decode(cl_st->request, cl_st->request_size, alice_msg, DAP_ENC_DATA_TYPE_B64); dap_chain_hash_fast_t l_sign_hash = {}; - if (decode_len < MSRLN_PKA_BYTES) { - log_it(L_WARNING, "Wrong http_enc request. Key not equal MSRLN_PKA_BYTES"); + if (l_decode_len < l_pkey_exchange_size) { + log_it(L_WARNING, "Wrong http_enc request. Key not equal pkey exchange size %zd", l_pkey_exchange_size); *return_code = Http_Status_BadRequest; return; - } else if (decode_len > MSRLN_PKA_BYTES) { - dap_sign_t *l_sign = (dap_sign_t *)&alice_msg[MSRLN_PKA_BYTES]; - if (dap_sign_verify(l_sign, alice_msg, MSRLN_PKA_BYTES) != 1) { + } else if (l_decode_len > l_pkey_exchange_size+ sizeof (dap_sign_hdr_t)) { + dap_sign_t *l_sign = (dap_sign_t *)&alice_msg[l_pkey_exchange_size]; + if (l_sign->header.sign_size == 0 || l_sign->header.sign_size > (l_decode_len - l_pkey_exchange_size) ){ + log_it(L_WARNING,"Wrong signature size %zd (decoded length %zd)",l_sign->header.sign_size, l_decode_len); + *return_code = Http_Status_BadRequest; + return; + } + if (dap_sign_verify(l_sign, alice_msg, l_pkey_exchange_size) != 1) { *return_code = Http_Status_Unauthorized; return; } dap_sign_get_pkey_hash(l_sign, &l_sign_hash); + }else if( l_decode_len != l_pkey_exchange_size){ + log_it(L_WARNING, "Wrong http_enc request. Data after pkey exchange is lesser or equal then signature's header"); + *return_code = Http_Status_BadRequest; + return; } - dap_enc_key_t* msrln_key = dap_enc_key_new(DAP_ENC_KEY_TYPE_MSRLN); - msrln_key->gen_bob_shared_key(msrln_key, alice_msg, MSRLN_PKA_BYTES, (void**)&msrln_key->pub_key_data); + dap_enc_key_t* l_pkey_exchange_key = dap_enc_key_new(l_pkey_exchange_type); + l_pkey_exchange_key->gen_bob_shared_key(l_pkey_exchange_key, alice_msg, l_pkey_exchange_size, (void**)&l_pkey_exchange_key->pub_key_data); - dap_enc_ks_key_t * key_ks = dap_enc_ks_new(); + dap_enc_ks_key_t * l_enc_key_ks = dap_enc_ks_new(); if (s_acl_callback) { - key_ks->acl_list = s_acl_callback(&l_sign_hash); + l_enc_key_ks->acl_list = s_acl_callback(&l_sign_hash); } else { - log_it(L_WARNING, "Callback for ACL is not set, pass anauthorized"); + log_it(L_DEBUG, "Callback for ACL is not set, pass anauthorized"); } - char encrypt_msg[DAP_ENC_BASE64_ENCODE_SIZE(msrln_key->pub_key_data_size) + 1]; - size_t encrypt_msg_size = dap_enc_base64_encode(msrln_key->pub_key_data, msrln_key->pub_key_data_size, encrypt_msg, DAP_ENC_DATA_TYPE_B64); + char encrypt_msg[DAP_ENC_BASE64_ENCODE_SIZE(l_pkey_exchange_key->pub_key_data_size) + 1]; + size_t encrypt_msg_size = dap_enc_base64_encode(l_pkey_exchange_key->pub_key_data, l_pkey_exchange_key->pub_key_data_size, encrypt_msg, DAP_ENC_DATA_TYPE_B64); encrypt_msg[encrypt_msg_size] = '\0'; - key_ks->key = dap_enc_key_new_generate(DAP_ENC_KEY_TYPE_IAES, - msrln_key->priv_key_data, // shared key - msrln_key->priv_key_data_size, - key_ks->id, DAP_ENC_KS_KEY_ID_SIZE, 0); - dap_enc_ks_save_in_storage(key_ks); + l_enc_key_ks->key = dap_enc_key_new_generate(l_enc_type, + l_pkey_exchange_key->priv_key_data, // shared key + l_pkey_exchange_key->priv_key_data_size, + l_enc_key_ks->id, DAP_ENC_KS_KEY_ID_SIZE, 0); + dap_enc_ks_save_in_storage(l_enc_key_ks); char encrypt_id[DAP_ENC_BASE64_ENCODE_SIZE(DAP_ENC_KS_KEY_ID_SIZE) + 1]; - size_t encrypt_id_size = dap_enc_base64_encode(key_ks->id, DAP_ENC_KS_KEY_ID_SIZE, encrypt_id, DAP_ENC_DATA_TYPE_B64); + size_t encrypt_id_size = dap_enc_base64_encode(l_enc_key_ks->id, DAP_ENC_KS_KEY_ID_SIZE, encrypt_id, DAP_ENC_DATA_TYPE_B64); encrypt_id[encrypt_id_size] = '\0'; _enc_http_write_reply(cl_st, encrypt_id, encrypt_msg); - dap_enc_key_delete(msrln_key); + dap_enc_key_delete(l_pkey_exchange_key); *return_code = Http_Status_OK; diff --git a/dap-sdk/net/server/http_server/dap_http_simple.c b/dap-sdk/net/server/http_server/dap_http_simple.c index 0bc3b51be6..17f0cf83bc 100644 --- a/dap-sdk/net/server/http_server/dap_http_simple.c +++ b/dap-sdk/net/server/http_server/dap_http_simple.c @@ -354,7 +354,7 @@ static void s_http_client_headers_read( dap_http_client_t *a_http_client, void * } else { log_it( L_DEBUG, "No data section, execution proc callback" ); dap_events_socket_remove_from_worker_unsafe(a_http_client->esocket,a_http_client->esocket->worker); - dap_proc_queue_add_callback( l_http_simple->worker->proc_queue, s_proc_queue_callback, l_http_simple); + dap_proc_queue_add_callback( l_http_simple->worker, s_proc_queue_callback, l_http_simple); } } @@ -417,7 +417,7 @@ void s_http_client_data_read( dap_http_client_t *a_http_client, void * a_arg ) // bool isOK=true; log_it( L_INFO,"Data for http_simple_request collected" ); dap_events_socket_remove_from_worker_unsafe(a_http_client->esocket,a_http_client->esocket->worker); - dap_proc_queue_add_callback( l_http_simple->worker->proc_queue , s_proc_queue_callback, l_http_simple); + dap_proc_queue_add_callback( l_http_simple->worker , s_proc_queue_callback, l_http_simple); } } diff --git a/dap-sdk/net/stream/stream/dap_stream_ctl.c b/dap-sdk/net/stream/stream/dap_stream_ctl.c index dc82ed592c..52bf55be3d 100644 --- a/dap-sdk/net/stream/stream/dap_stream_ctl.c +++ b/dap-sdk/net/stream/stream/dap_stream_ctl.c @@ -121,26 +121,35 @@ void s_proc(struct dap_http_simple *a_http_simple, void * a_arg) char l_channels_str[sizeof(ss->active_channels)]; dap_enc_key_type_t l_enc_type = s_socket_forward_key.type; int l_enc_headers = 0; - bool l_is_legacy=false; - int l_url_sscanf_res = sscanf(l_dg->url_path, "stream_ctl,channels=%16s,enc_type=%d,enc_headers=%d", l_channels_str, &l_enc_type, &l_enc_headers); - if(l_url_sscanf_res > 0){ - if(l_url_sscanf_res < 3){ - log_it(L_INFO, "legacy encryption mode used (OAES)"); - l_enc_type = DAP_ENC_KEY_TYPE_OAES; - l_is_legacy = true; + bool l_is_legacy=true; + char * l_tok_tmp = NULL; + char * l_tok = strtok_r(l_dg->url_path, ",",&l_tok_tmp) ; + do { + char * l_subtok_tmp = NULL; + char * l_subtok_name = strtok_r(l_tok, "=",&l_subtok_tmp); + char * l_subtok_value = strtok_r(NULL, "=",&l_subtok_tmp); + if (l_subtok_value){ + log_it(L_DEBUG, "tok = %s value =%s",l_subtok_name,l_subtok_value); + if ( strcmp(l_subtok_name,"channels")==0 ){ + strncpy(l_channels_str,l_subtok_value,sizeof (l_channels_str)-1); + log_it(L_DEBUG,"Param: channels=%s",l_channels_str); + }else if(strcmp(l_subtok_name,"enc_type")==0){ + l_enc_type = atoi(l_subtok_value); + log_it(L_DEBUG,"Param: enc_type=%s",dap_enc_get_type_name(l_enc_type)); + l_is_legacy = false; + }else if(strcmp(l_subtok_name,"enc_headers")==0){ + l_enc_headers = atoi(l_subtok_value); + log_it(L_DEBUG,"Param: enc_headers=%d",l_enc_headers); + } } + l_tok = strtok_r(NULL, ",",&l_tok_tmp) ; + } while(l_tok); + l_new_session = true; + if(l_is_legacy){ + log_it(L_INFO, "legacy encryption mode used (OAES)"); + l_enc_type = DAP_ENC_KEY_TYPE_OAES; l_new_session = true; } - else if(strcmp(l_dg->url_path, "socket_forward" ) == 0) { - l_channels_str[0] = '\0'; - l_new_session = true; - } - else{ - log_it(L_ERROR,"ctl command unknown: %s",l_dg->url_path); - enc_http_delegate_delete(l_dg); - *return_code = Http_Status_MethodNotAllowed; - return; - } if(l_new_session){ ss = dap_stream_session_pure_new(); diff --git a/modules/chain/dap_chain_ledger.c b/modules/chain/dap_chain_ledger.c index e7723c66b7..91ab38c600 100644 --- a/modules/chain/dap_chain_ledger.c +++ b/modules/chain/dap_chain_ledger.c @@ -69,6 +69,7 @@ static pthread_rwlock_t s_verificators_rwlock; typedef struct dap_chain_ledger_token_emission_item { dap_chain_hash_fast_t datum_token_emission_hash; dap_chain_datum_token_emission_t *datum_token_emission; + size_t datum_token_emission_size; UT_hash_handle hh; } dap_chain_ledger_token_emission_item_t; @@ -290,7 +291,7 @@ int dap_chain_ledger_token_add(dap_ledger_t * a_ledger, dap_chain_datum_token_t // Add it to cache dap_chain_datum_token_t *l_token_cache = DAP_NEW_Z_SIZE(dap_chain_datum_token_t, a_token_size); memcpy(l_token_cache, a_token, a_token_size); - char *l_gdb_group = dap_chain_ledger_get_gdb_group(a_ledger, "tokens"); + char *l_gdb_group = dap_chain_ledger_get_gdb_group(a_ledger, DAP_CHAIN_LEDGER_TOKENS_STR); if (!dap_chain_global_db_gr_set(dap_strdup(a_token->ticker), l_token_cache, a_token_size, l_gdb_group)) { log_it(L_WARNING, "Ledger cache mismatch"); DAP_DELETE(l_token_cache); @@ -332,20 +333,46 @@ int dap_chain_ledger_token_load(dap_ledger_t *a_ledger, dap_chain_datum_token_t * @brief s_treshold_emissions_proc * @param a_ledger */ -static void s_treshold_emissions_proc( dap_ledger_t * a_ledger) +static void s_treshold_emissions_proc(dap_ledger_t * a_ledger) { - UNUSED(a_ledger); - // TODO + bool l_success; + do { + l_success = false; + dap_chain_ledger_token_emission_item_t *l_emission_item, *l_emission_tmp; + HASH_ITER(hh, PVT(a_ledger)->treshold_emissions, l_emission_item, l_emission_tmp) { + int l_res = dap_chain_ledger_token_emission_add(a_ledger, l_emission_item->datum_token_emission, + l_emission_item->datum_token_emission_size); + if (!l_res) { + HASH_DEL(PVT(a_ledger)->treshold_emissions, l_emission_item); + DAP_DELETE(l_emission_item->datum_token_emission); + DAP_DELETE(l_emission_item); + l_success = true; + } + } + } while (l_success); } /** * @brief s_treshold_txs_proc * @param a_ledger */ -static void s_treshold_txs_proc( dap_ledger_t * a_ledger) +static void s_treshold_txs_proc( dap_ledger_t *a_ledger) { - UNUSED(a_ledger); - // TODO + bool l_success; + do { + l_success = false; + dap_chain_ledger_tx_item_t *l_tx_item, *l_tx_tmp; + HASH_ITER(hh, PVT(a_ledger)->treshold_txs, l_tx_item, l_tx_tmp) { + int l_res = dap_chain_ledger_tx_add(a_ledger, l_tx_item->tx); + if (!l_res) { + HASH_DEL(PVT(a_ledger)->treshold_txs, l_tx_item); + DAP_DELETE(l_tx_item->tx); + DAP_DELETE(l_tx_item); + l_success = true; + } + } + } while (l_success); + } @@ -353,7 +380,7 @@ void dap_chain_ledger_load_cache(dap_ledger_t *a_ledger) { dap_ledger_private_t *l_ledger_pvt = PVT(a_ledger); - char *l_gdb_group = dap_chain_ledger_get_gdb_group(a_ledger, "tokens"); + char *l_gdb_group = dap_chain_ledger_get_gdb_group(a_ledger, DAP_CHAIN_LEDGER_TOKENS_STR); size_t l_objs_count = 0; dap_global_db_obj_t *l_objs = dap_chain_global_db_gr_load(l_gdb_group, &l_objs_count); for (size_t i = 0; i < l_objs_count; i++) { @@ -376,7 +403,7 @@ void dap_chain_ledger_load_cache(dap_ledger_t *a_ledger) l_ledger_pvt->last_ticker.found = true; } - l_gdb_group = dap_chain_ledger_get_gdb_group(a_ledger, "emissions"); + l_gdb_group = dap_chain_ledger_get_gdb_group(a_ledger, DAP_CHAIN_LEDGER_EMISSIONS_STR); l_objs_count = 0; l_objs = dap_chain_global_db_gr_load(l_gdb_group, &l_objs_count); for (size_t i = 0; i < l_objs_count; i++) { @@ -404,7 +431,7 @@ void dap_chain_ledger_load_cache(dap_ledger_t *a_ledger) l_ledger_pvt->last_emit.found = true; } - l_gdb_group = dap_chain_ledger_get_gdb_group(a_ledger, "txs"); + l_gdb_group = dap_chain_ledger_get_gdb_group(a_ledger, DAP_CHAIN_LEDGER_TXS_STR); l_objs_count = 0; l_objs = dap_chain_global_db_gr_load(l_gdb_group, &l_objs_count); for (size_t i = 0; i < l_objs_count; i++) { @@ -424,10 +451,26 @@ void dap_chain_ledger_load_cache(dap_ledger_t *a_ledger) l_ledger_pvt->last_tx.found = true; } - //TODO add tx threshold cache if need - l_ledger_pvt->last_thres_tx.found = true; + l_gdb_group = dap_chain_ledger_get_gdb_group(a_ledger, DAP_CHAIN_LEDGER_TXS_THRES_STR); + l_objs_count = 0; + l_objs = dap_chain_global_db_gr_load(l_gdb_group, &l_objs_count); + for (size_t i = 0; i < l_objs_count; i++) { + dap_chain_ledger_tx_item_t *l_tx_item = DAP_NEW_Z(dap_chain_ledger_tx_item_t); + dap_chain_str_to_hash_fast(l_objs[i].key, &l_tx_item->tx_hash_fast); + l_tx_item->tx = DAP_NEW_SIZE(dap_chain_datum_tx_t, l_objs[i].value_len); + memcpy(l_tx_item->tx, l_objs[i].value, l_objs[i].value_len); + HASH_ADD(hh, l_ledger_pvt->treshold_txs, tx_hash_fast, sizeof(dap_chain_hash_fast_t), l_tx_item); + if (i == l_objs_count - 1) { + PVT(a_ledger)->last_thres_tx.hash = &l_tx_item->tx_hash_fast; + } + } + dap_chain_global_db_objs_delete(l_objs, l_objs_count); + DAP_DELETE(l_gdb_group); + if (l_objs_count == 0) { + l_ledger_pvt->last_thres_tx.found = true; + } - l_gdb_group = dap_chain_ledger_get_gdb_group(a_ledger, "balances"); + l_gdb_group = dap_chain_ledger_get_gdb_group(a_ledger, DAP_CHAIN_LEDGER_BALANCES_STR); l_objs_count = 0; l_objs = dap_chain_global_db_gr_load(l_gdb_group, &l_objs_count); for (size_t i = 0; i < l_objs_count; i++) { @@ -546,6 +589,7 @@ int dap_chain_ledger_token_emission_add(dap_ledger_t *a_ledger, memcpy(l_token_emission_item->datum_token_emission, a_token_emission, a_token_emission_size); memcpy(&l_token_emission_item->datum_token_emission_hash, l_token_emission_hash_ptr, sizeof(l_token_emission_hash)); + l_token_emission_item->datum_token_emission_size = a_token_emission_size; if (l_token_item) { HASH_ADD(hh, l_token_item->token_emissions, datum_token_emission_hash, sizeof(l_token_emission_hash), l_token_emission_item); @@ -556,13 +600,12 @@ int dap_chain_ledger_token_emission_add(dap_ledger_t *a_ledger, // Add it to cache dap_chain_datum_token_emission_t *l_emission_cache = DAP_NEW_Z_SIZE(dap_chain_datum_token_emission_t, a_token_emission_size); memcpy(l_emission_cache, a_token_emission, a_token_emission_size); - char *l_gdb_group = dap_chain_ledger_get_gdb_group(a_ledger, "emissions"); + char *l_gdb_group = dap_chain_ledger_get_gdb_group(a_ledger, DAP_CHAIN_LEDGER_EMISSIONS_STR); if (!dap_chain_global_db_gr_set(dap_strdup(l_hash_str), l_emission_cache, a_token_emission_size, l_gdb_group)) { log_it(L_WARNING, "Ledger cache mismatch"); DAP_DELETE(l_emission_cache); } DAP_DELETE(l_gdb_group); - char * l_token_emission_address_str = dap_chain_addr_to_str( &(a_token_emission->hdr.address) ); log_it(L_NOTICE, "Added token emission datum to %s: type=%s value=%.1llf token=%s to_addr=%s ", @@ -577,8 +620,10 @@ int dap_chain_ledger_token_emission_add(dap_ledger_t *a_ledger, ret = -2; } } else { - log_it(L_ERROR, "Can't add token emission datum of %llu %s ( %s )", + if (l_token_item) { + log_it(L_ERROR, "Can't add token emission datum of %llu %s ( %s )", a_token_emission->hdr.value, c_token_ticker, l_hash_str); + } ret = -1; } pthread_rwlock_unlock(l_token_item ? @@ -913,7 +958,7 @@ int dap_chain_ledger_tx_cache_check(dap_ledger_t *a_ledger, dap_chain_datum_tx_t bound_item->item_out = l_item_out; if(!l_tx_prev) { // First transaction log_it(L_DEBUG,"No previous transaction was found for hash %s",l_tx_prev_hash_str); - l_err_num = -5; + l_err_num = DAP_CHAIN_LEDGER_TX_NO_PREVIOUS; break; } //log_it(L_INFO,"Previous transaction was found for hash %s",l_tx_prev_hash_str); @@ -1196,7 +1241,7 @@ int dap_chain_ledger_tx_add_check(dap_ledger_t *a_ledger, dap_chain_datum_tx_t * int dap_chain_ledger_balance_cache_update(dap_ledger_t *a_ledger, dap_ledger_wallet_balance_t *a_balance) { - char *l_gdb_group = dap_chain_ledger_get_gdb_group(a_ledger, "balances"); + char *l_gdb_group = dap_chain_ledger_get_gdb_group(a_ledger, DAP_CHAIN_LEDGER_BALANCES_STR); uint128_t *l_balance_value = DAP_NEW_Z(uint128_t); *l_balance_value = a_balance->balance; if (!dap_chain_global_db_gr_set(dap_strdup(a_balance->key), l_balance_value, sizeof(uint128_t), l_gdb_group)) { @@ -1222,39 +1267,62 @@ int dap_chain_ledger_tx_add(dap_ledger_t *a_ledger, dap_chain_datum_tx_t *a_tx) dap_ledger_private_t *l_ledger_priv = PVT(a_ledger); dap_list_t *l_list_bound_items = NULL; dap_list_t *l_list_tx_out = NULL; + dap_chain_ledger_tx_item_t *l_item_tmp = NULL; dap_chain_hash_fast_t *l_tx_hash = dap_chain_node_datum_tx_calc_hash(a_tx); char l_tx_hash_str[70]; dap_chain_hash_fast_to_str(l_tx_hash,l_tx_hash_str,sizeof(l_tx_hash_str)); - int l_ret_check; - if( (l_ret_check = dap_chain_ledger_tx_cache_check( - a_ledger, a_tx, &l_list_bound_items, &l_list_tx_out)) < 0){ - log_it (L_WARNING, "dap_chain_ledger_tx_add() tx %s not passed the check: code %d ",l_tx_hash_str, l_ret_check); - return -1; - } - log_it ( L_DEBUG, "dap_chain_ledger_tx_add() check passed for tx %s",l_tx_hash_str); - - - char *l_token_ticker = NULL, *l_token_ticker_old = NULL; - dap_chain_ledger_tx_item_t *l_item_tmp = NULL; pthread_rwlock_wrlock(&l_ledger_priv->ledger_rwlock); - HASH_FIND(hh, l_ledger_priv->ledger_items, l_tx_hash, sizeof(dap_chain_hash_fast_t), l_item_tmp); // tx_hash already in the hash? + HASH_FIND(hh, l_ledger_priv->ledger_items, l_tx_hash, sizeof(dap_chain_hash_fast_t), l_item_tmp); + pthread_rwlock_unlock(&l_ledger_priv->ledger_rwlock); // transaction already present in the cache list - if(l_item_tmp) { - // delete transaction from the cache list - //ret = dap_chain_ledger_tx_remove(a_ledger, l_tx_hash); - // there should be no duplication - char * l_hash_str = dap_chain_hash_fast_to_str_new(l_tx_hash); - log_it(L_WARNING, "Transaction (hash=%s) deleted from cache because there is an attempt to add it to cache", - l_hash_str); - DAP_DELETE(l_hash_str); + if (l_item_tmp) { + log_it(L_WARNING, "Transaction %s already present in the cache", l_tx_hash_str); ret = 1; goto FIN; } - if (ret == -1) { + + int l_ret_check; + l_item_tmp = NULL; + if( (l_ret_check = dap_chain_ledger_tx_cache_check( + a_ledger, a_tx, &l_list_bound_items, &l_list_tx_out)) < 0) { + if (l_ret_check == DAP_CHAIN_LEDGER_TX_NO_PREVIOUS) { + HASH_FIND(hh, l_ledger_priv->treshold_txs, l_tx_hash, sizeof(*l_tx_hash), l_item_tmp); + if (!l_item_tmp) { + if (HASH_COUNT(l_ledger_priv->treshold_txs) >= s_treshold_txs_max) { + log_it(L_WARNING,"Treshold for tranactions is overfulled (%lu max), dropping down new data, added nothing", + s_treshold_txs_max); + } else { + l_item_tmp = DAP_NEW_Z(dap_chain_ledger_tx_item_t); + memcpy(&l_item_tmp->tx_hash_fast, l_tx_hash, sizeof(dap_chain_hash_fast_t)); + size_t l_tx_size = dap_chain_datum_tx_get_size(a_tx); + l_item_tmp->tx = DAP_NEW_SIZE(dap_chain_datum_tx_t, l_tx_size); + memcpy(l_item_tmp->tx, a_tx, l_tx_size); + pthread_rwlock_wrlock(&l_ledger_priv->treshold_txs_rwlock); + HASH_ADD(hh, l_ledger_priv->treshold_txs, tx_hash_fast, sizeof(dap_chain_hash_fast_t), l_item_tmp); + pthread_rwlock_unlock(&l_ledger_priv->treshold_txs_rwlock); + log_it (L_DEBUG, "dap_chain_ledger_tx_add() tx %s added to threshold", l_tx_hash_str); + // Add it to cache + dap_chain_datum_tx_t *l_tx_cache = DAP_NEW_Z_SIZE(dap_chain_datum_tx_t, l_tx_size); + memcpy(l_tx_cache, a_tx, l_tx_size); + char *l_gdb_group = dap_chain_ledger_get_gdb_group(a_ledger, DAP_CHAIN_LEDGER_TXS_THRES_STR); + if (!dap_chain_global_db_gr_set(dap_strdup(l_tx_hash_str), l_tx_cache, l_tx_size, l_gdb_group)) { + log_it(L_WARNING, "Ledger cache mismatch"); + DAP_DELETE(l_tx_cache); + } + DAP_DELETE(l_gdb_group); + } + } + } else { + log_it (L_WARNING, "dap_chain_ledger_tx_add() tx %s not passed the check: code %d ",l_tx_hash_str, l_ret_check); + } + ret = l_ret_check; goto FIN; } + log_it ( L_DEBUG, "dap_chain_ledger_tx_add() check passed for tx %s",l_tx_hash_str); + + char *l_token_ticker = NULL, *l_token_ticker_old = NULL; bool l_multichannel = false; // Mark 'out' items in cache if they were used & delete previous transactions from cache if it need // find all bound pairs 'in' and 'out' @@ -1320,14 +1388,14 @@ int dap_chain_ledger_tx_add(dap_ledger_t *a_ledger, dap_chain_datum_tx_t *a_tx) int res = dap_chain_ledger_tx_remove(a_ledger, &l_tx_prev_hash_to_del); if(res == -2) { log_it(L_ERROR, "Can't delete previous transactions because hash=0x%x not found", l_tx_prev_hash_str); - ret = -2; + ret = -100; DAP_DELETE(l_tx_prev_hash_str); dap_list_free_full(l_list_bound_items, free); goto FIN; } else if(res != 1) { log_it(L_ERROR, "Can't delete previous transactions with hash=0x%x", l_tx_prev_hash_str); - ret = -3; + ret = -101; DAP_DELETE(l_tx_prev_hash_str); dap_list_free_full(l_list_bound_items, free); goto FIN; @@ -1465,22 +1533,23 @@ int dap_chain_ledger_tx_add(dap_ledger_t *a_ledger, dap_chain_datum_tx_t *a_tx) strncpy(l_item_tmp->cache_data.token_tiker, l_token_ticker, sizeof(l_item_tmp->cache_data.token_tiker) - 1); size_t l_tx_size = dap_chain_datum_tx_get_size(a_tx); memcpy(l_item_tmp->tx, a_tx, l_tx_size); - HASH_ADD(hh, l_ledger_priv->ledger_items, tx_hash_fast, sizeof(dap_chain_hash_fast_t), l_item_tmp); // tx_hash_fast: name of key field + pthread_rwlock_wrlock(&l_ledger_priv->ledger_rwlock); + HASH_ADD(hh, l_ledger_priv->ledger_items, tx_hash_fast, sizeof(dap_chain_hash_fast_t), l_item_tmp); // tx_hash_fast: name of key field + pthread_rwlock_unlock(&l_ledger_priv->ledger_rwlock); // Add it to cache uint8_t *l_tx_cache = DAP_NEW_Z_SIZE(uint8_t, l_tx_size + sizeof(l_item_tmp->cache_data)); memcpy(l_tx_cache, &l_item_tmp->cache_data, sizeof(l_item_tmp->cache_data)); memcpy(l_tx_cache + sizeof(l_item_tmp->cache_data), a_tx, l_tx_size); - char *l_gdb_group = dap_chain_ledger_get_gdb_group(a_ledger, "txs"); + char *l_gdb_group = dap_chain_ledger_get_gdb_group(a_ledger, DAP_CHAIN_LEDGER_TXS_STR); if (!dap_chain_global_db_gr_set(dap_strdup(l_tx_hash_str), l_tx_cache, l_tx_size + sizeof(l_item_tmp->cache_data), l_gdb_group)) { log_it(L_WARNING, "Ledger cache mismatch"); DAP_DELETE(l_tx_cache); } DAP_DELETE(l_gdb_group); + s_treshold_txs_proc(a_ledger); ret = 1; } FIN: - pthread_rwlock_tryrdlock (&l_ledger_priv->ledger_rwlock); - pthread_rwlock_unlock(&l_ledger_priv->ledger_rwlock); DAP_DELETE(l_tx_hash); return ret; } @@ -1525,7 +1594,7 @@ int dap_chain_ledger_tx_remove(dap_ledger_t *a_ledger, dap_chain_hash_fast_t *a_ // del struct for hash DAP_DELETE(l_item_tmp); // Remove it from cache - char *l_gdb_group = dap_chain_ledger_get_gdb_group(a_ledger, "txs"); + char *l_gdb_group = dap_chain_ledger_get_gdb_group(a_ledger, DAP_CHAIN_LEDGER_TXS_STR); dap_chain_global_db_gr_del(dap_chain_hash_fast_to_str_new(a_tx_hash), l_gdb_group); DAP_DELETE(l_gdb_group); l_ret = 1; @@ -1552,7 +1621,7 @@ void dap_chain_ledger_purge(dap_ledger_t *a_ledger) // delete transactions dap_chain_ledger_tx_item_t *l_item_current, *l_item_tmp; - char *l_gdb_group = dap_chain_ledger_get_gdb_group(a_ledger, "txs"); + char *l_gdb_group = dap_chain_ledger_get_gdb_group(a_ledger, DAP_CHAIN_LEDGER_TXS_STR); HASH_ITER(hh, l_ledger_priv->ledger_items , l_item_current, l_item_tmp) { DAP_DELETE(l_item_current->tx); HASH_DEL(l_ledger_priv->ledger_items, l_item_current); @@ -1563,7 +1632,7 @@ void dap_chain_ledger_purge(dap_ledger_t *a_ledger) DAP_DELETE(l_gdb_group); // delete threshold txs - l_gdb_group = dap_chain_ledger_get_gdb_group(a_ledger, "thres_txs"); + l_gdb_group = dap_chain_ledger_get_gdb_group(a_ledger, DAP_CHAIN_LEDGER_TXS_THRES_STR); HASH_ITER(hh, l_ledger_priv->treshold_txs, l_item_current, l_item_tmp) { HASH_DEL(l_ledger_priv->treshold_txs, l_item_current); dap_chain_hash_fast_to_str(&l_item_current->tx_hash_fast, l_hash_str, l_hash_str_size); @@ -1575,7 +1644,7 @@ void dap_chain_ledger_purge(dap_ledger_t *a_ledger) // delete balances dap_ledger_wallet_balance_t *l_balance_current, *l_balance_tmp; - l_gdb_group = dap_chain_ledger_get_gdb_group(a_ledger, "balances"); + l_gdb_group = dap_chain_ledger_get_gdb_group(a_ledger, DAP_CHAIN_LEDGER_BALANCES_STR); HASH_ITER(hh, l_ledger_priv->balance_accounts, l_balance_current, l_balance_tmp) { HASH_DEL(l_ledger_priv->balance_accounts, l_balance_current); dap_chain_global_db_gr_del(l_balance_current->key, l_gdb_group); @@ -1586,7 +1655,7 @@ void dap_chain_ledger_purge(dap_ledger_t *a_ledger) // delete threshold emissions dap_chain_ledger_token_emission_item_t *l_emission_current, *l_emission_tmp; - char *l_emissions_gdb_group = dap_chain_ledger_get_gdb_group(a_ledger, "emissions"); + char *l_emissions_gdb_group = dap_chain_ledger_get_gdb_group(a_ledger, DAP_CHAIN_LEDGER_EMISSIONS_STR); HASH_ITER(hh, l_ledger_priv->treshold_emissions, l_emission_current, l_emission_tmp) { HASH_DEL(l_ledger_priv->treshold_emissions, l_emission_current); dap_chain_hash_fast_to_str(&l_emission_current->datum_token_emission_hash, l_hash_str, l_hash_str_size); @@ -1597,7 +1666,7 @@ void dap_chain_ledger_purge(dap_ledger_t *a_ledger) // delete tokens & its emissions dap_chain_ledger_token_item_t *l_token_current, *l_token_tmp; - l_gdb_group = dap_chain_ledger_get_gdb_group(a_ledger, "tokens"); + l_gdb_group = dap_chain_ledger_get_gdb_group(a_ledger, DAP_CHAIN_LEDGER_TOKENS_STR); HASH_ITER(hh, l_ledger_priv->tokens, l_token_current, l_token_tmp) { HASH_DEL(l_ledger_priv->tokens, l_token_current); HASH_ITER(hh, l_token_current->token_emissions, l_emission_current, l_emission_tmp) { diff --git a/modules/chain/include/dap_chain_ledger.h b/modules/chain/include/dap_chain_ledger.h index 3eb630475f..c29478efed 100644 --- a/modules/chain/include/dap_chain_ledger.h +++ b/modules/chain/include/dap_chain_ledger.h @@ -53,8 +53,18 @@ typedef bool (* dap_chain_ledger_verificator_callback_t)(dap_chain_tx_out_cond_t // Check the double spending in all cells #define DAP_CHAIN_LEDGER_CHECK_CELLS_DS 0x0100 +// Error code for no previous transaction (candidate to threshold) +#define DAP_CHAIN_LEDGER_TX_NO_PREVIOUS -5 + +#define DAP_CHAIN_LEDGER_TOKENS_STR "tokens" +#define DAP_CHAIN_LEDGER_EMISSIONS_STR "emissions" +#define DAP_CHAIN_LEDGER_TXS_STR "txs" +#define DAP_CHAIN_LEDGER_TXS_THRES_STR "thres_txs" +#define DAP_CHAIN_LEDGER_BALANCES_STR "balances" + dap_ledger_t* dap_chain_ledger_create(uint16_t a_check_flags, char *a_net_name); + // Remove dap_ledger_t structure void dap_chain_ledger_handle_free(dap_ledger_t *a_ledger); diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c index e7a5ce4f4d..fb32aab1c4 100644 --- a/modules/channel/chain/dap_stream_ch_chain.c +++ b/modules/channel/chain/dap_stream_ch_chain.c @@ -232,12 +232,13 @@ bool s_chain_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) size_t l_atom_size =0; if ( l_chain->callback_atom_find_by_hash(l_atom_iter, &l_atom_hash, &l_atom_size) == NULL ) { dap_chain_atom_verify_res_t l_atom_add_res = l_chain->callback_atom_add(l_chain, l_atom_copy, l_atom_copy_size); - if(l_atom_add_res == ATOM_ACCEPT && dap_chain_has_file_store(l_chain)) { + if (l_atom_add_res == ATOM_ACCEPT && dap_chain_has_file_store(l_chain)) { // append to file dap_chain_cell_t *l_cell = dap_chain_cell_create_fill(l_chain, l_ch_chain->request_cell_id); - if(l_cell){ + int l_res; + if (l_cell) { // add one atom only - int l_res = dap_chain_cell_file_append(l_cell, l_atom_copy, l_atom_copy_size); + l_res = dap_chain_cell_file_append(l_cell, l_atom_copy, l_atom_copy_size); // rewrite all file //l_res = dap_chain_cell_file_update(l_cell); if(l_res < 0) { @@ -245,15 +246,17 @@ bool s_chain_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) l_cell ? l_cell->file_storage_path : "[null]"); } // add all atoms from treshold - if(l_res && l_chain->callback_atom_add_from_treshold){ + if (l_chain->callback_atom_add_from_treshold){ dap_chain_atom_ptr_t l_atom_treshold; do{ size_t l_atom_treshold_size; // add into ledger + log_it(L_DEBUG, "Try to add atom from treshold"); l_atom_treshold = l_chain->callback_atom_add_from_treshold(l_chain, &l_atom_treshold_size); // add into file if(l_atom_treshold) { - int l_res = dap_chain_cell_file_append(l_cell, l_atom_treshold, l_atom_treshold_size); + l_res = dap_chain_cell_file_append(l_cell, l_atom_treshold, l_atom_treshold_size); + log_it(L_DEBUG, "Added atom from treshold"); if(l_res < 0) { log_it(L_ERROR, "Can't save event 0x%x from treshold to the file '%s'", l_atom_treshold, l_cell ? l_cell->file_storage_path : "[null]"); @@ -464,7 +467,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) memcpy(&l_ch_chain->request_chain_id, &l_chain_pkt->hdr.chain_id, sizeof(dap_chain_id_t)); } dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker); - dap_proc_queue_add_callback(a_ch->stream_worker->worker->proc_queue, s_sync_chains_callback, a_ch); + dap_proc_queue_add_callback(a_ch->stream_worker->worker, s_sync_chains_callback, a_ch); } } } @@ -488,7 +491,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) memcpy(&l_ch_chain->request_net_id, &l_chain_pkt->hdr.net_id, sizeof(dap_chain_net_id_t)); memcpy(&l_ch_chain->request_chain_id, &l_chain_pkt->hdr.chain_id, sizeof(dap_chain_id_t)); dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker); - dap_proc_queue_add_callback(a_ch->stream_worker->worker->proc_queue, s_sync_gdb_callback, a_ch); + dap_proc_queue_add_callback(a_ch->stream_worker->worker, s_sync_gdb_callback, a_ch); } else { log_it(L_ERROR, "Get DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB session_id=%u bad request", @@ -516,11 +519,11 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) memcpy(&l_ch_chain->request_net_id, &l_chain_pkt->hdr.net_id, sizeof(dap_chain_net_id_t)); memcpy(&l_ch_chain->request_chain_id, &l_chain_pkt->hdr.chain_id, sizeof(dap_chain_id_t)); memcpy(&l_ch_chain->request_cell_id, &l_chain_pkt->hdr.cell_id, sizeof(dap_chain_cell_id_t)); - l_ch_chain->pkt_data = DAP_CALLOC(1, l_chain_pkt_data_size); + l_ch_chain->pkt_data = DAP_NEW_SIZE(byte_t, l_chain_pkt_data_size); memcpy(l_ch_chain->pkt_data, l_chain_pkt->data, l_chain_pkt_data_size); l_ch_chain->pkt_data_size = l_chain_pkt_data_size; dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker); - dap_proc_queue_add_callback(a_ch->stream_worker->worker->proc_queue, s_chain_pkt_callback, a_ch); + dap_proc_queue_add_callback(a_ch->stream_worker->worker, s_chain_pkt_callback, a_ch); } else { log_it(L_WARNING, "Empty chain packet"); dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, @@ -549,7 +552,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) memcpy(l_ch_chain->pkt_data, l_chain_pkt->data, l_chain_pkt_data_size); l_ch_chain->pkt_data_size = l_chain_pkt_data_size; dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker); - dap_proc_queue_add_callback(a_ch->stream_worker->worker->proc_queue, s_gdb_pkt_callback, a_ch); + dap_proc_queue_add_callback(a_ch->stream_worker->worker, s_gdb_pkt_callback, a_ch); } else { log_it(L_WARNING, "Packet with GLOBAL_DB atom has zero body size"); dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, @@ -741,6 +744,6 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch); if (l_ch_chain && l_ch_chain->state != CHAIN_STATE_IDLE) { dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker); - dap_proc_queue_add_callback(a_ch->stream_worker->worker->proc_queue, s_out_pkt_callback, a_ch); + dap_proc_queue_add_callback(a_ch->stream_worker->worker, s_out_pkt_callback, a_ch); } } diff --git a/modules/channel/chain/include/dap_stream_ch_chain.h b/modules/channel/chain/include/dap_stream_ch_chain.h index 17992a7de0..931d2b9714 100644 --- a/modules/channel/chain/include/dap_stream_ch_chain.h +++ b/modules/channel/chain/include/dap_stream_ch_chain.h @@ -54,7 +54,7 @@ typedef struct dap_stream_ch_chain { dap_stream_ch_chain_state_t state; dap_chain_atom_iter_t * request_atom_iter; - uint8_t *pkt_data; + byte_t *pkt_data; uint64_t pkt_data_size; uint64_t stats_request_atoms_processed; uint64_t stats_request_gdb_processed; diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index 567229f2ef..18e9ed2b3c 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -420,7 +420,7 @@ static int s_net_states_proc(dap_chain_net_t * l_net) default: { // Get DNS request result from root nodes as synchronization links int l_max_tries = 5; - int l_tries =0; + int l_tries = 0; while (dap_list_length(l_pvt_net->links_info) < s_max_links_count && l_tries < l_max_tries) { int i = rand() % l_pvt_net->seed_aliases_count; dap_chain_node_addr_t *l_remote_addr = dap_chain_node_alias_find(l_net, l_pvt_net->seed_aliases[i]); @@ -428,9 +428,9 @@ static int s_net_states_proc(dap_chain_net_t * l_net) if(l_remote_node_info) { dap_chain_node_info_t *l_link_node_info = DAP_NEW_Z(dap_chain_node_info_t); int l_res = dap_dns_client_get_addr(l_remote_node_info->hdr.ext_addr_v4.s_addr, l_net->pub.name, l_link_node_info); - //memcpy(l_link_node_info, l_remote_node_info, sizeof(dap_chain_node_info_t)); if (!l_res && l_link_node_info->hdr.address.uint64 != l_own_addr) { l_pvt_net->links_info = dap_list_append(l_pvt_net->links_info, l_link_node_info); + l_tries = 0; } DAP_DELETE(l_remote_node_info); } @@ -443,8 +443,7 @@ static int s_net_states_proc(dap_chain_net_t * l_net) } l_tries++; } - - if (!l_pvt_net->links){ + if (dap_list_length(l_pvt_net->links_info) < s_max_links_count) { for (int i = 0; i < MIN(s_max_links_count, l_pvt_net->seed_aliases_count); i++) { dap_chain_node_addr_t *l_link_addr = dap_chain_node_alias_find(l_net, l_pvt_net->seed_aliases[i]); if (l_link_addr->uint64 == l_own_addr) { diff --git a/modules/net/dap_chain_node_cli_cmd.c b/modules/net/dap_chain_node_cli_cmd.c index c74f2a5ef4..9557e2ba43 100644 --- a/modules/net/dap_chain_node_cli_cmd.c +++ b/modules/net/dap_chain_node_cli_cmd.c @@ -3905,7 +3905,9 @@ int com_stats(int argc, char ** argv, void *arg_func, char **str_reply) int com_exit(int argc, char ** argv, void *arg_func, char **str_reply) { + //dap_events_stop_all(); exit(0); + return 0; } diff --git a/modules/type/dag/dap_chain_cs_dag.c b/modules/type/dag/dap_chain_cs_dag.c index 5c0f1a14f1..5794df044b 100644 --- a/modules/type/dag/dap_chain_cs_dag.c +++ b/modules/type/dag/dap_chain_cs_dag.c @@ -215,6 +215,13 @@ int dap_chain_cs_dag_new(dap_chain_t * a_chain, dap_config_t * a_chain_cfg) log_it( L_ERROR, "Can't read hash from static_genesis_event \"%s\", ret code %d ", l_static_genesis_event_hash_str, lhr); } } + uint16_t l_list_len = 0; + char **l_hard_accept_list = dap_config_get_array_str(a_chain_cfg, "dag", "hard_accept_list", &l_list_len); + for (uint16_t i = 0; i < l_list_len; i++) { + dap_chain_cs_dag_hal_item_t *l_hal_item = DAP_NEW_Z(dap_chain_cs_dag_hal_item_t); + dap_chain_str_to_hash_fast(l_hard_accept_list[i], &l_hal_item->hash); + HASH_ADD(hh, l_dag->hal, hash, sizeof(l_hal_item->hash), l_hal_item); + } l_dag->is_static_genesis_event = (l_static_genesis_event_hash_str != NULL) && dap_config_get_item_bool_default(a_chain_cfg,"dag","is_static_genesis_event",false); @@ -282,61 +289,57 @@ void dap_chain_cs_dag_delete(dap_chain_t * a_chain) static int s_dap_chain_add_atom_to_ledger(dap_chain_cs_dag_t * a_dag, dap_ledger_t * a_ledger, dap_chain_cs_dag_event_item_t * a_event_item){ - dap_chain_datum_t *l_datum = (dap_chain_datum_t*) dap_chain_cs_dag_event_get_datum(a_event_item->event, a_event_item->event_size); - switch (l_datum->header.type_id) { - case DAP_CHAIN_DATUM_TOKEN_DECL: { - dap_chain_datum_token_t *l_token = (dap_chain_datum_token_t*) l_datum->data; - return dap_chain_ledger_token_load(a_ledger, l_token, l_datum->header.data_size); - } - break; - case DAP_CHAIN_DATUM_TOKEN_EMISSION: { - dap_chain_datum_token_emission_t *l_token_emission = (dap_chain_datum_token_emission_t*) l_datum->data; - return dap_chain_ledger_token_emission_load(a_ledger, l_token_emission, l_datum->header.data_size); - } - break; - case DAP_CHAIN_DATUM_TX: { - dap_chain_datum_tx_t *l_tx = (dap_chain_datum_tx_t*) l_datum->data; - dap_chain_cs_dag_event_item_t * l_tx_event= DAP_NEW_Z(dap_chain_cs_dag_event_item_t); - l_tx_event->ts_added = a_event_item->ts_added; - l_tx_event->event = a_event_item->event; - l_tx_event->event_size = a_event_item->event_size; - memcpy(&l_tx_event->hash, &a_event_item->hash, sizeof (l_tx_event->hash) ); - - - HASH_ADD(hh,PVT(a_dag)->tx_events,hash,sizeof (l_tx_event->hash),l_tx_event); + dap_chain_datum_t *l_datum = (dap_chain_datum_t*) dap_chain_cs_dag_event_get_datum(a_event_item->event, a_event_item->event_size); + switch (l_datum->header.type_id) { + case DAP_CHAIN_DATUM_TOKEN_DECL: { + dap_chain_datum_token_t *l_token = (dap_chain_datum_token_t*) l_datum->data; + return dap_chain_ledger_token_load(a_ledger, l_token, l_datum->header.data_size); + } + break; + case DAP_CHAIN_DATUM_TOKEN_EMISSION: { + dap_chain_datum_token_emission_t *l_token_emission = (dap_chain_datum_token_emission_t*) l_datum->data; + return dap_chain_ledger_token_emission_load(a_ledger, l_token_emission, l_datum->header.data_size); + } + break; + case DAP_CHAIN_DATUM_TX: { + dap_chain_datum_tx_t *l_tx = (dap_chain_datum_tx_t*) l_datum->data; + // don't save bad transactions to base + int l_ret = dap_chain_ledger_tx_load(a_ledger, l_tx); + if( l_ret != 1 ) { + return l_ret; + } + dap_chain_cs_dag_event_item_t * l_tx_event= DAP_NEW_Z(dap_chain_cs_dag_event_item_t); + l_tx_event->ts_added = a_event_item->ts_added; + l_tx_event->event = a_event_item->event; + l_tx_event->event_size = a_event_item->event_size; + memcpy(&l_tx_event->hash, &a_event_item->hash, sizeof (l_tx_event->hash) ); - // don't save bad transactions to base - if(dap_chain_ledger_tx_load(a_ledger, l_tx) != 1) { - return -1; - } + HASH_ADD(hh,PVT(a_dag)->tx_events,hash,sizeof (l_tx_event->hash),l_tx_event); + } + break; + default: + return -1; } - break; - default: - return -1; - } - return 0; + return 0; } -static int s_dap_chain_add_atom_to_events_table(dap_chain_cs_dag_t * a_dag, dap_ledger_t * a_ledger, dap_chain_cs_dag_event_item_t * a_event_item ){ +static int s_dap_chain_add_atom_to_events_table(dap_chain_cs_dag_t * a_dag, dap_ledger_t * a_ledger, dap_chain_cs_dag_event_item_t * a_event_item ) +{ int res = a_dag->callback_cs_verify(a_dag,a_event_item->event, a_event_item->event_size); - if (res != 0) { - if ( memcmp( &a_event_item->hash, &a_dag->static_genesis_event_hash, sizeof(a_event_item->hash) ) == 0 ){ - res = 0; - } - } - - if(res == 0){ - char l_buf_hash[128]; - dap_chain_hash_fast_to_str(&a_event_item->hash,l_buf_hash,sizeof(l_buf_hash)-1); + char l_buf_hash[128]; + dap_chain_hash_fast_to_str(&a_event_item->hash,l_buf_hash,sizeof(l_buf_hash)-1); + if (res == 0 || memcmp( &a_event_item->hash, &a_dag->static_genesis_event_hash, sizeof(a_event_item->hash) ) == 0) { log_it(L_DEBUG,"Dag event %s checked, add it to ledger", l_buf_hash); - s_dap_chain_add_atom_to_ledger(a_dag, a_ledger, a_event_item); + res = s_dap_chain_add_atom_to_ledger(a_dag, a_ledger, a_event_item); + if (res) { + log_it(L_DEBUG,"Dag event %s checked, but ledger declined", l_buf_hash); + return res; + } //All correct, no matter for result HASH_ADD(hh, PVT(a_dag)->events,hash,sizeof (a_event_item->hash), a_event_item); s_dag_events_lasts_process_new_last_event(a_dag, a_event_item); } else { - char l_buf_hash[128]; - dap_chain_hash_fast_to_str(&a_event_item->hash,l_buf_hash,sizeof(l_buf_hash)-1); log_it(L_WARNING,"Dag event %s check failed: code %d", l_buf_hash, res ); } return res; @@ -406,8 +409,12 @@ static dap_chain_atom_verify_res_t s_chain_callback_atom_add(dap_chain_t * a_cha int l_consensus_check = s_dap_chain_add_atom_to_events_table(l_dag, a_chain->ledger, l_event_item); if(!l_consensus_check){ log_it(L_DEBUG, "... added"); + }else if (l_consensus_check == DAP_CHAIN_LEDGER_TX_NO_PREVIOUS){ + HASH_ADD(hh, PVT(l_dag)->events_treshold, hash, sizeof(l_event_item->hash), l_event_item); + log_it(L_DEBUG, "... tresholded"); + ret = ATOM_MOVE_TO_THRESHOLD; }else{ - log_it(L_DEBUG, "... error adding"); + log_it(L_DEBUG, "... error adding (code %d)", l_consensus_check); ret = ATOM_REJECT; } } @@ -670,9 +677,19 @@ static dap_chain_atom_verify_res_t s_chain_callback_atom_verify(dap_chain_t * a_ dap_chain_atom_verify_res_t res = ATOM_ACCEPT; if(sizeof (l_event->header) >= a_atom_size){ - log_it(L_WARNING,"Size of atom is %zd that is equel or less then header %zd",a_atom_size,sizeof (l_event->header)); + log_it(L_WARNING,"Size of atom is %zd that is equal or less then header %zd",a_atom_size,sizeof (l_event->header)); return ATOM_REJECT; } + // Hard accept list + if (l_dag->hal) { + dap_chain_hash_fast_t l_event_hash; + dap_chain_cs_dag_event_calc_hash(l_event,a_atom_size, &l_event_hash); + dap_chain_cs_dag_hal_item_t *l_hash_found = NULL; + HASH_FIND(hh, l_dag->hal, &l_event_hash, sizeof(l_event_hash), l_hash_found); + if (l_hash_found) { + return ATOM_ACCEPT; + } + } // genesis or seed mode if (l_event->header.hash_count == 0){ if(s_seed_mode && !PVT(l_dag)->events){ diff --git a/modules/type/dag/include/dap_chain_cs_dag.h b/modules/type/dag/include/dap_chain_cs_dag.h index fab0c32016..43a823579d 100644 --- a/modules/type/dag/include/dap_chain_cs_dag.h +++ b/modules/type/dag/include/dap_chain_cs_dag.h @@ -22,6 +22,7 @@ along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. */ #pragma once +#include "uthash.h" #include "dap_chain.h" #include "dap_chain_cs_dag_event.h" @@ -36,6 +37,10 @@ typedef dap_chain_cs_dag_event_t * (*dap_chain_cs_dag_callback_event_create_t)(d dap_chain_datum_t *, dap_chain_hash_fast_t *, size_t, size_t*); +typedef struct dap_chain_cs_dag_hal_item { + dap_chain_hash_fast_t hash; + UT_hash_handle hh; +} dap_chain_cs_dag_hal_item_t; typedef struct dap_chain_cs_dag { @@ -45,6 +50,7 @@ typedef struct dap_chain_cs_dag bool is_add_directy; bool is_static_genesis_event; dap_chain_hash_fast_t static_genesis_event_hash; + dap_chain_cs_dag_hal_item_t *hal; uint16_t datum_add_hashes_count; char * gdb_group_events_round_new; -- GitLab