diff --git a/CMakeLists.txt b/CMakeLists.txt index c3b3b72aa92e10697ecf8ed12b730ebe9d533f12..0eb778919de0ac8c8c54163b5158d5c88b193bf8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,7 +2,7 @@ project(cellframe-sdk C) cmake_minimum_required(VERSION 2.8) set(CMAKE_C_STANDARD 11) -set(CELLFRAME_SDK_NATIVE_VERSION "2.6-21") +set(CELLFRAME_SDK_NATIVE_VERSION "2.6-22") add_definitions ("-DCELLFRAME_SDK_VERSION=\"${CELLFRAME_SDK_NATIVE_VERSION}\"") set(DAPSDK_MODULES "") diff --git a/dap-sdk/net/core/dap_proc_thread.c b/dap-sdk/net/core/dap_proc_thread.c index 4c3a2e62f399d2bd349a85313f627b44685eb95d..86afea3b6d3b8f3b5b0bfbc43f5f4b9f6f60e933 100644 --- a/dap-sdk/net/core/dap_proc_thread.c +++ b/dap-sdk/net/core/dap_proc_thread.c @@ -139,7 +139,7 @@ static void s_proc_event_callback(dap_events_socket_t * a_esocket, void * a_valu l_item_old = l_item; l_item=l_item->next; } - l_is_anybody_for_repeat &= (!l_is_finished); + l_is_anybody_for_repeat = !l_is_finished; } if(l_is_anybody_for_repeat) // Arm event if we have smth to proc again dap_events_socket_event_signal(a_esocket,1); diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c index 419569d8a512f7a2f404200192be1331db036ee3..0ed9f4c479c4c3bda75903e4e1e0c9d7afc5d89f 100644 --- a/modules/channel/chain/dap_stream_ch_chain.c +++ b/modules/channel/chain/dap_stream_ch_chain.c @@ -650,7 +650,7 @@ void dap_stream_ch_chain_go_idle ( dap_stream_ch_chain_t * a_ch_chain) DAP_DEL_Z(a_ch_chain->request_atom_iter); } -bool s_process_gdb_iter(dap_stream_ch_t *a_ch) +static void s_process_gdb_iter(dap_stream_ch_t *a_ch) { dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch); dap_db_log_list_t *l_db_list = l_ch_chain->request_global_db_trs; @@ -670,27 +670,25 @@ bool s_process_gdb_iter(dap_stream_ch_t *a_ch) dap_list_free_full(l_ch_chain->db_iter, free); l_ch_chain->db_iter = NULL; } - return true; } -bool s_out_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) +static bool s_out_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) { UNUSED(a_thread); dap_stream_ch_t *l_ch = (dap_stream_ch_t *)a_arg; dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); //log_it( L_DEBUG,"s_stream_ch_packet_out state=%d", l_ch_chain ? l_ch_chain->state : -1); - // log_it( L_DEBUG,"l_ch_chain %X", l_ch_chain ); - bool l_packet_out = false; switch (l_ch_chain->state) { case CHAIN_STATE_IDLE: { dap_stream_ch_chain_go_idle(l_ch_chain); } break; + // Synchronize GDB case CHAIN_STATE_SYNC_GLOBAL_DB: { if (l_ch_chain->db_iter) { - l_packet_out = s_process_gdb_iter(l_ch); + s_process_gdb_iter(l_ch); } else { dap_global_db_obj_t *l_obj; do { // Get log diff @@ -703,9 +701,8 @@ bool s_out_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) // Item not found, maybe it has deleted? Then go to the next item } while (l_obj); if (l_ch_chain->db_iter) { - l_packet_out = s_process_gdb_iter(l_ch); + s_process_gdb_iter(l_ch); } else { - //log_it(L_DEBUG, "l_obj == 0, STOP"); // free log list dap_db_log_list_delete(l_ch_chain->request_global_db_trs); l_ch_chain->request_global_db_trs = NULL; @@ -714,51 +711,50 @@ bool s_out_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) // last message dap_stream_ch_chain_sync_request_t l_request = {}; dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, - l_ch_chain->request_hdr.net_id, l_ch_chain->request_hdr.chain_id, - l_ch_chain->request_hdr.cell_id, &l_request, sizeof(l_request)); - l_packet_out = true; - - if(l_ch_chain->callback_notify_packet_out) - l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, - NULL, 0, l_ch_chain->callback_notify_arg); + l_ch_chain->request_hdr.net_id, l_ch_chain->request_hdr.chain_id, + l_ch_chain->request_hdr.cell_id, &l_request, sizeof(l_request)); dap_stream_ch_chain_go_idle(l_ch_chain); + if (l_ch_chain->callback_notify_packet_out) + l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, + NULL, 0, l_ch_chain->callback_notify_arg); } } } break; // Synchronize chains case CHAIN_STATE_SYNC_CHAINS: { - //log_it(L_DEBUG, "CHAIN_STATE_SYNC_CHAINS"); - if (l_ch_chain->request_atom_iter->cur == NULL) { // All chains synced + if (l_ch_chain->request_atom_iter->cur) { // Process one chain from l_ch_chain->request_atom_iter + log_it(L_INFO, "Send one CHAIN packet len=%d", l_ch_chain->request_atom_iter->cur_size); + dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN, l_ch_chain->request_hdr.net_id, + l_ch_chain->request_hdr.chain_id, l_ch_chain->request_hdr.cell_id, + l_ch_chain->request_atom_iter->cur, l_ch_chain->request_atom_iter->cur_size); + l_ch_chain->stats_request_atoms_processed++; + // Then get next atom and populate new last + l_ch_chain->request_atom_iter->chain->callback_atom_iter_get_next(l_ch_chain->request_atom_iter, NULL); + } else { // All chains synced dap_stream_ch_chain_sync_request_t l_request = {}; // last message dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS, l_ch_chain->request_hdr.net_id, l_ch_chain->request_hdr.chain_id, l_ch_chain->request_hdr.cell_id, &l_request, sizeof(l_request)); - l_packet_out = true; log_it( L_DEBUG,"Synced: %llu atoms processed", l_ch_chain->stats_request_atoms_processed); dap_stream_ch_chain_go_idle(l_ch_chain); if (l_ch_chain->callback_notify_packet_out) l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS, NULL, 0, l_ch_chain->callback_notify_arg); - } else { // Process one chain from l_ch_chain->request_atom_iter - log_it(L_INFO, "Send one CHAIN packet len=%d", l_ch_chain->request_atom_iter->cur_size); - dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN, l_ch_chain->request_hdr.net_id, - l_ch_chain->request_hdr.chain_id, l_ch_chain->request_hdr.cell_id, - l_ch_chain->request_atom_iter->cur, l_ch_chain->request_atom_iter->cur_size); - l_packet_out = true; - l_ch_chain->stats_request_atoms_processed++; - // Then get next atom and populate new last - l_ch_chain->request_atom_iter->chain->callback_atom_iter_get_next(l_ch_chain->request_atom_iter, NULL); } } break; default: break; } - if (l_packet_out) { - dap_stream_ch_set_ready_to_write_unsafe(l_ch, true); + if (l_ch->stream->esocket->buf_out_size > DAP_EVENTS_SOCKET_BUF - DAP_CHAIN_PKT_MAX_SIZE || + l_ch_chain->state == CHAIN_STATE_IDLE) { + if (l_ch->stream->esocket->buf_out_size) { + dap_stream_ch_set_ready_to_write_unsafe(l_ch, true); + } + dap_events_socket_assign_on_worker_mt(l_ch->stream->esocket, l_ch->stream_worker->worker); + return true; } - dap_events_socket_assign_on_worker_mt(l_ch->stream->esocket, l_ch->stream_worker->worker); - return true; + return false; } /** @@ -769,7 +765,7 @@ bool s_out_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) { (void) a_arg; - if (a_ch->stream->esocket->buf_out_size > DAP_EVENTS_SOCKET_BUF / 2) { + if (a_ch->stream->esocket->buf_out_size > DAP_EVENTS_SOCKET_BUF / 4) { return; } dap_stream_ch_set_ready_to_write_unsafe(a_ch, false); diff --git a/modules/channel/chain/include/dap_stream_ch_chain.h b/modules/channel/chain/include/dap_stream_ch_chain.h index 5adb61246d42934e69115b0bf82816a22ca7d678..f4c87986fc60f6acb85ce3f5698e9082f601e1db 100644 --- a/modules/channel/chain/include/dap_stream_ch_chain.h +++ b/modules/channel/chain/include/dap_stream_ch_chain.h @@ -33,6 +33,7 @@ #include "dap_stream_ch_chain_pkt.h" #include "uthash.h" +#define DAP_CHAIN_PKT_MAX_SIZE 25000 // WARNING: be sure to not exceed this limit typedef struct dap_stream_ch_chain dap_stream_ch_chain_t; typedef void (*dap_stream_ch_chain_callback_packet_t)(dap_stream_ch_chain_t*, uint8_t a_pkt_type, diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index 29e2e3a087668e3bc5517ea280816cf1b45bf11c..c98f27266557d891af8b3a52caa0ae767d45e8fb 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -607,7 +607,7 @@ static int s_net_states_proc(dap_chain_net_t *a_net) dap_stream_ch_chain_pkt_write_mt(l_worker, l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS, a_net->pub.id, l_chain->id, a_net->pub.cell_id, &l_request, sizeof(l_request)); // wait for finishing of request - int timeout_ms = 120000; // 2 min = 120 sec = 120 000 ms + int timeout_ms = 300000; // 5 min = 300 sec = 300 000 ms // TODO add progress info to console l_res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms); switch (l_res) {