From 633df2826ef15cd4be178896af4d7d8136fe2ed8 Mon Sep 17 00:00:00 2001 From: Roman Khlopkov <roman.khlopkov@demlabs.net> Date: Sat, 24 Oct 2020 11:54:39 +0000 Subject: [PATCH] features-4576 --- CMakeLists.txt | 2 +- dap-sdk/net/core/dap_proc_thread.c | 2 +- modules/channel/chain/dap_stream_ch_chain.c | 60 +++++++++---------- .../chain/include/dap_stream_ch_chain.h | 1 + modules/net/dap_chain_net.c | 2 +- 5 files changed, 32 insertions(+), 35 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index c3b3b72aa9..0eb778919d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,7 +2,7 @@ project(cellframe-sdk C) cmake_minimum_required(VERSION 2.8) set(CMAKE_C_STANDARD 11) -set(CELLFRAME_SDK_NATIVE_VERSION "2.6-21") +set(CELLFRAME_SDK_NATIVE_VERSION "2.6-22") add_definitions ("-DCELLFRAME_SDK_VERSION=\"${CELLFRAME_SDK_NATIVE_VERSION}\"") set(DAPSDK_MODULES "") diff --git a/dap-sdk/net/core/dap_proc_thread.c b/dap-sdk/net/core/dap_proc_thread.c index 4c3a2e62f3..86afea3b6d 100644 --- a/dap-sdk/net/core/dap_proc_thread.c +++ b/dap-sdk/net/core/dap_proc_thread.c @@ -139,7 +139,7 @@ static void s_proc_event_callback(dap_events_socket_t * a_esocket, void * a_valu l_item_old = l_item; l_item=l_item->next; } - l_is_anybody_for_repeat &= (!l_is_finished); + l_is_anybody_for_repeat = !l_is_finished; } if(l_is_anybody_for_repeat) // Arm event if we have smth to proc again dap_events_socket_event_signal(a_esocket,1); diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c index 419569d8a5..0ed9f4c479 100644 --- a/modules/channel/chain/dap_stream_ch_chain.c +++ b/modules/channel/chain/dap_stream_ch_chain.c @@ -650,7 +650,7 @@ void dap_stream_ch_chain_go_idle ( dap_stream_ch_chain_t * a_ch_chain) DAP_DEL_Z(a_ch_chain->request_atom_iter); } -bool s_process_gdb_iter(dap_stream_ch_t *a_ch) +static void s_process_gdb_iter(dap_stream_ch_t *a_ch) { dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch); dap_db_log_list_t *l_db_list = l_ch_chain->request_global_db_trs; @@ -670,27 +670,25 @@ bool s_process_gdb_iter(dap_stream_ch_t *a_ch) dap_list_free_full(l_ch_chain->db_iter, free); l_ch_chain->db_iter = NULL; } - return true; } -bool s_out_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) +static bool s_out_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) { UNUSED(a_thread); dap_stream_ch_t *l_ch = (dap_stream_ch_t *)a_arg; dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); //log_it( L_DEBUG,"s_stream_ch_packet_out state=%d", l_ch_chain ? l_ch_chain->state : -1); - // log_it( L_DEBUG,"l_ch_chain %X", l_ch_chain ); - bool l_packet_out = false; switch (l_ch_chain->state) { case CHAIN_STATE_IDLE: { dap_stream_ch_chain_go_idle(l_ch_chain); } break; + // Synchronize GDB case CHAIN_STATE_SYNC_GLOBAL_DB: { if (l_ch_chain->db_iter) { - l_packet_out = s_process_gdb_iter(l_ch); + s_process_gdb_iter(l_ch); } else { dap_global_db_obj_t *l_obj; do { // Get log diff @@ -703,9 +701,8 @@ bool s_out_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) // Item not found, maybe it has deleted? Then go to the next item } while (l_obj); if (l_ch_chain->db_iter) { - l_packet_out = s_process_gdb_iter(l_ch); + s_process_gdb_iter(l_ch); } else { - //log_it(L_DEBUG, "l_obj == 0, STOP"); // free log list dap_db_log_list_delete(l_ch_chain->request_global_db_trs); l_ch_chain->request_global_db_trs = NULL; @@ -714,51 +711,50 @@ bool s_out_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) // last message dap_stream_ch_chain_sync_request_t l_request = {}; dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, - l_ch_chain->request_hdr.net_id, l_ch_chain->request_hdr.chain_id, - l_ch_chain->request_hdr.cell_id, &l_request, sizeof(l_request)); - l_packet_out = true; - - if(l_ch_chain->callback_notify_packet_out) - l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, - NULL, 0, l_ch_chain->callback_notify_arg); + l_ch_chain->request_hdr.net_id, l_ch_chain->request_hdr.chain_id, + l_ch_chain->request_hdr.cell_id, &l_request, sizeof(l_request)); dap_stream_ch_chain_go_idle(l_ch_chain); + if (l_ch_chain->callback_notify_packet_out) + l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, + NULL, 0, l_ch_chain->callback_notify_arg); } } } break; // Synchronize chains case CHAIN_STATE_SYNC_CHAINS: { - //log_it(L_DEBUG, "CHAIN_STATE_SYNC_CHAINS"); - if (l_ch_chain->request_atom_iter->cur == NULL) { // All chains synced + if (l_ch_chain->request_atom_iter->cur) { // Process one chain from l_ch_chain->request_atom_iter + log_it(L_INFO, "Send one CHAIN packet len=%d", l_ch_chain->request_atom_iter->cur_size); + dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN, l_ch_chain->request_hdr.net_id, + l_ch_chain->request_hdr.chain_id, l_ch_chain->request_hdr.cell_id, + l_ch_chain->request_atom_iter->cur, l_ch_chain->request_atom_iter->cur_size); + l_ch_chain->stats_request_atoms_processed++; + // Then get next atom and populate new last + l_ch_chain->request_atom_iter->chain->callback_atom_iter_get_next(l_ch_chain->request_atom_iter, NULL); + } else { // All chains synced dap_stream_ch_chain_sync_request_t l_request = {}; // last message dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS, l_ch_chain->request_hdr.net_id, l_ch_chain->request_hdr.chain_id, l_ch_chain->request_hdr.cell_id, &l_request, sizeof(l_request)); - l_packet_out = true; log_it( L_DEBUG,"Synced: %llu atoms processed", l_ch_chain->stats_request_atoms_processed); dap_stream_ch_chain_go_idle(l_ch_chain); if (l_ch_chain->callback_notify_packet_out) l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS, NULL, 0, l_ch_chain->callback_notify_arg); - } else { // Process one chain from l_ch_chain->request_atom_iter - log_it(L_INFO, "Send one CHAIN packet len=%d", l_ch_chain->request_atom_iter->cur_size); - dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN, l_ch_chain->request_hdr.net_id, - l_ch_chain->request_hdr.chain_id, l_ch_chain->request_hdr.cell_id, - l_ch_chain->request_atom_iter->cur, l_ch_chain->request_atom_iter->cur_size); - l_packet_out = true; - l_ch_chain->stats_request_atoms_processed++; - // Then get next atom and populate new last - l_ch_chain->request_atom_iter->chain->callback_atom_iter_get_next(l_ch_chain->request_atom_iter, NULL); } } break; default: break; } - if (l_packet_out) { - dap_stream_ch_set_ready_to_write_unsafe(l_ch, true); + if (l_ch->stream->esocket->buf_out_size > DAP_EVENTS_SOCKET_BUF - DAP_CHAIN_PKT_MAX_SIZE || + l_ch_chain->state == CHAIN_STATE_IDLE) { + if (l_ch->stream->esocket->buf_out_size) { + dap_stream_ch_set_ready_to_write_unsafe(l_ch, true); + } + dap_events_socket_assign_on_worker_mt(l_ch->stream->esocket, l_ch->stream_worker->worker); + return true; } - dap_events_socket_assign_on_worker_mt(l_ch->stream->esocket, l_ch->stream_worker->worker); - return true; + return false; } /** @@ -769,7 +765,7 @@ bool s_out_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) { (void) a_arg; - if (a_ch->stream->esocket->buf_out_size > DAP_EVENTS_SOCKET_BUF / 2) { + if (a_ch->stream->esocket->buf_out_size > DAP_EVENTS_SOCKET_BUF / 4) { return; } dap_stream_ch_set_ready_to_write_unsafe(a_ch, false); diff --git a/modules/channel/chain/include/dap_stream_ch_chain.h b/modules/channel/chain/include/dap_stream_ch_chain.h index 5adb61246d..f4c87986fc 100644 --- a/modules/channel/chain/include/dap_stream_ch_chain.h +++ b/modules/channel/chain/include/dap_stream_ch_chain.h @@ -33,6 +33,7 @@ #include "dap_stream_ch_chain_pkt.h" #include "uthash.h" +#define DAP_CHAIN_PKT_MAX_SIZE 25000 // WARNING: be sure to not exceed this limit typedef struct dap_stream_ch_chain dap_stream_ch_chain_t; typedef void (*dap_stream_ch_chain_callback_packet_t)(dap_stream_ch_chain_t*, uint8_t a_pkt_type, diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index 29e2e3a087..c98f272665 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -607,7 +607,7 @@ static int s_net_states_proc(dap_chain_net_t *a_net) dap_stream_ch_chain_pkt_write_mt(l_worker, l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS, a_net->pub.id, l_chain->id, a_net->pub.cell_id, &l_request, sizeof(l_request)); // wait for finishing of request - int timeout_ms = 120000; // 2 min = 120 sec = 120 000 ms + int timeout_ms = 300000; // 5 min = 300 sec = 300 000 ms // TODO add progress info to console l_res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms); switch (l_res) { -- GitLab