diff --git a/dap-sdk b/dap-sdk index 73972ba85d3456f6ee317e9da315cbfa99444eac..15126aeba885a838db3984ceff83f72e213fbbca 160000 --- a/dap-sdk +++ b/dap-sdk @@ -1 +1 @@ -Subproject commit 73972ba85d3456f6ee317e9da315cbfa99444eac +Subproject commit 15126aeba885a838db3984ceff83f72e213fbbca diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c index 78c898dca95735101a0a100b3933165362e167be..18f97da5ca36e5a12c66b610e66c4da9b6595625 100644 --- a/modules/channel/chain/dap_stream_ch_chain.c +++ b/modules/channel/chain/dap_stream_ch_chain.c @@ -182,7 +182,6 @@ void dap_stream_ch_chain_deinit() */ void s_stream_ch_new(dap_stream_ch_t* a_ch, void* a_arg) { - UNUSED(a_arg); a_ch->internal = DAP_NEW_Z(dap_stream_ch_chain_t); dap_stream_ch_chain_t * l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch); @@ -192,46 +191,28 @@ void s_stream_ch_new(dap_stream_ch_t* a_ch, void* a_arg) #ifdef DAP_SYS_DEBUG atomic_fetch_add(&s_memstat[MEMSTAT$K_STM_CH_CHAIN].alloc_nr, 1); #endif - debug_if(s_debug_more, L_DEBUG, "[stm_ch_chain:%p] --- created chain:%p", a_ch, l_ch_chain); - - } /** - * @brief s_stream_ch_delete_in_proc - * @param a_thread - * @param a_arg - * @return + * @brief s_stream_ch_delete + * @param ch + * @param arg */ -static void s_stream_ch_delete_in_proc(dap_worker_t *a_worker, void *a_arg) +static void s_stream_ch_delete(dap_stream_ch_t *a_ch, void *a_arg) { - UNUSED(a_worker); - assert(a_arg); - dap_stream_ch_chain_t *l_ch_chain = (dap_stream_ch_chain_t *)a_arg; + UNUSED(a_arg); + dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch); if (l_ch_chain->callback_notify_packet_out) l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_DELETE, NULL, 0, l_ch_chain->callback_notify_arg); s_ch_chain_go_idle(l_ch_chain); - DAP_DELETE(l_ch_chain); + debug_if(s_debug_more, L_DEBUG, "[stm_ch_chain:%p] --- deleted chain:%p", a_ch, l_ch_chain); + DAP_DEL_Z(a_ch->internal); #ifdef DAP_SYS_DEBUG - atomic_fetch_add(&s_memstat[MEMSTAT$K_STM_CH_CHAIN].free_nr, 1); + atomic_fetch_add(&s_memstat[MEMSTAT$K_STM_CH_CHAIN].free_nr, 1); #endif - -} - -/** - * @brief s_stream_ch_delete - * @param ch - * @param arg - */ -static void s_stream_ch_delete(dap_stream_ch_t* a_ch, void* a_arg) -{ - UNUSED(a_arg); - void *l_arg = a_ch->internal; - a_ch->internal = NULL; // To prevent its cleaning in worker - dap_worker_exec_callback_on(a_ch->stream_worker->worker, s_stream_ch_delete_in_proc, l_arg); } void dap_stream_ch_chain_reset_unsafe(dap_stream_ch_chain_t *a_ch_chain) @@ -967,8 +948,11 @@ static bool s_chain_timer_callback(void *a_arg) l_ch_chain->activity_timer = NULL; return false; } - if (l_ch_chain->state != CHAIN_STATE_WAITING && l_ch_chain->sent_breaks) - s_stream_ch_packet_out(l_ch, NULL); + if (l_ch_chain->state != CHAIN_STATE_WAITING && l_ch_chain->sent_breaks) { + s_stream_ch_packet_out(l_ch, a_arg); + if (l_ch_chain->activity_timer == NULL) + return false; + } // Sending dumb packet with nothing to inform remote thats we're just skiping atoms of GDB's, nothing freezed if (l_ch_chain->state == CHAIN_STATE_SYNC_CHAINS && l_ch_chain->sent_breaks >= 3 * DAP_SYNC_TICKS_PER_SECOND) { debug_if(s_debug_more, L_INFO, "Send one chain TSD packet"); @@ -1505,6 +1489,11 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) l_hash_to_str[0] ? l_hash_to_str: "(null)"); } + s_ch_chain_get_idle(l_ch_chain); + if (l_ch_chain->activity_timer) { + dap_timerfd_delete_unsafe(l_ch_chain->activity_timer); + l_ch_chain->activity_timer = NULL; + } if (!l_ch_chain->callback_notify_packet_in) { // we haven't node client waitng, so reply to other side dap_chain_t *l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id); if (!l_chain) { @@ -1716,10 +1705,8 @@ static void s_stream_ch_chain_pkt_write(dap_stream_ch_t *a_ch, uint8_t a_type, u * @param ch * @param arg */ -void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) +void s_stream_ch_packet_out(dap_stream_ch_t *a_ch, void *a_arg) { - UNUSED(a_arg); - dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch); bool l_go_idle = false, l_was_sent_smth = false; switch (l_ch_chain->state) { @@ -1926,11 +1913,17 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) } break; default: break; } - if (l_go_idle) - s_ch_chain_go_idle(l_ch_chain); if (l_was_sent_smth) { s_chain_timer_reset(l_ch_chain); l_ch_chain->sent_breaks = 0; } else l_ch_chain->sent_breaks++; + if (l_go_idle) { + s_ch_chain_go_idle(l_ch_chain); + if (l_ch_chain->activity_timer) { + if (!a_arg) + dap_timerfd_delete_unsafe(l_ch_chain->activity_timer); + l_ch_chain->activity_timer = NULL; + } + } } diff --git a/modules/consensus/esbocs/dap_chain_cs_esbocs.c b/modules/consensus/esbocs/dap_chain_cs_esbocs.c index 38872ece932c7fab8be384fe2111743ea680fd9d..d48bfa1e6df4e1570e5e642bc974ca5f6889dfb4 100644 --- a/modules/consensus/esbocs/dap_chain_cs_esbocs.c +++ b/modules/consensus/esbocs/dap_chain_cs_esbocs.c @@ -303,7 +303,7 @@ static void s_callback_delete(dap_chain_cs_blocks_t *a_blocks) pthread_mutex_lock(&l_session->mutex); DL_DELETE(s_session_items, l_session); if (!s_session_items) - dap_timerfd_delete_mt(s_session_cs_timer); + dap_timerfd_delete_mt(s_session_cs_timer->worker, s_session_cs_timer->esocket_uuid); s_session_round_clear(l_session); dap_chain_esbocs_sync_item_t *l_item, *l_tmp; HASH_ITER(hh, l_session->sync_items, l_item, l_tmp) { @@ -481,7 +481,7 @@ static void s_session_round_new(dap_chain_esbocs_session_t *a_session) a_session->cur_round.id++; a_session->cur_round.sync_attempt++; - dap_timerfd_delete_mt(a_session->sync_timer); + dap_timerfd_delete_mt(a_session->sync_timer->worker, a_session->sync_timer->esocket_uuid); a_session->sync_timer = NULL; a_session->state = DAP_CHAIN_ESBOCS_SESSION_STATE_WAIT_START; a_session->ts_round_sync_start = 0; diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index b5241f93953dcb3fe0aca2eaf6d7b64242c5248a..1452fd26f9bb822bedf4ef72821c88baa173f297 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -651,8 +651,10 @@ static void s_net_link_remove(dap_chain_net_pvt_t *a_net_pvt, dap_chain_node_cli return; } HASH_DEL(a_net_pvt->net_links, l_link_found); - if (l_link_found->delay_timer) - dap_timerfd_delete_mt(l_link_found->delay_timer); + if (l_link_found->delay_timer) { + dap_timerfd_delete_mt(l_link_found->delay_timer->worker, l_link_found->delay_timer->esocket_uuid); + l_link_found->delay_timer = NULL; + } dap_chain_node_client_t *l_client = l_link_found->link; a_net_pvt->links_queue = dap_list_remove_all(a_net_pvt->links_queue, l_client); if (a_rebase) { @@ -687,9 +689,11 @@ static struct net_link *s_get_free_link(dap_chain_net_t *a_net) static bool s_net_link_callback_connect_delayed(void *a_arg) { - dap_chain_node_client_t *l_client = a_arg; + struct net_link *l_link = a_arg; + dap_chain_node_client_t *l_client = l_link->link; debug_if(s_debug_more, L_DEBUG, "Link "NODE_ADDR_FP_STR" started", NODE_ADDR_FP_ARGS_S(l_client->info->hdr.address)); dap_chain_node_client_connect(l_client, "CN"); + l_link->delay_timer = NULL; return false; } @@ -704,7 +708,7 @@ static bool s_net_link_start(dap_chain_net_t *a_net, struct net_link *a_link, ui return false; a_link->link = l_client; if (a_delay) { - a_link->delay_timer = dap_timerfd_start(a_delay * 1000, s_net_link_callback_connect_delayed, l_client); + a_link->delay_timer = dap_timerfd_start(a_delay * 1000, s_net_link_callback_connect_delayed, a_link); return true; } debug_if(s_debug_more, L_DEBUG, "Link "NODE_ADDR_FP_STR" started", NODE_ADDR_FP_ARGS_S(l_link_info->hdr.address)); @@ -795,6 +799,7 @@ static void s_node_link_callback_disconnected(dap_chain_node_client_t *a_node_cl if (l_net_pvt->state_target != NET_STATE_OFFLINE) { pthread_mutex_lock(&l_net_pvt->uplinks_mutex); s_net_link_remove(l_net_pvt, a_node_client, l_net_pvt->only_static_links); + a_node_client->keep_connection = false; a_node_client->callbacks.delete = NULL; dap_chain_node_client_close_mt(a_node_client); // Remove it on next context iteration struct net_link *l_free_link = s_get_free_link(l_net); @@ -1175,6 +1180,8 @@ static bool s_net_states_proc(dap_proc_thread_t *a_thread, void *a_arg) struct net_link *l_link, *l_link_tmp; struct downlink *l_downlink, *l_dltmp; HASH_ITER(hh, l_net_pvt->net_links, l_link, l_link_tmp) { + if (l_link->delay_timer) + dap_timerfd_delete_mt(l_link->delay_timer->worker, l_link->delay_timer->esocket_uuid); if (l_link->link) { dap_chain_node_client_t *l_client = l_link->link; HASH_FIND(hh, l_net_pvt->downlinks, &l_client->ch_chain_uuid, diff --git a/modules/net/dap_chain_node_client.c b/modules/net/dap_chain_node_client.c index 15f00c133079e0daadf23f41ee68f37c2b14e386..ef4db89a63414daeb17e10d5085fd2f2b80ef226 100644 --- a/modules/net/dap_chain_node_client.c +++ b/modules/net/dap_chain_node_client.c @@ -139,6 +139,13 @@ static void s_stage_status_error_callback(dap_client_t *a_client, void *a_arg) dap_chain_node_client_t *l_node_client = DAP_CHAIN_NODE_CLIENT(a_client); if(!l_node_client) return; + + if (l_node_client->sync_timer) { + // Disable timer, it will be restarted with new connection + dap_timerfd_delete_unsafe(l_node_client->sync_timer); + l_node_client->sync_timer = NULL; + } + // check for last attempt bool l_is_last_attempt = a_arg ? true : false; if (l_is_last_attempt) { @@ -162,17 +169,6 @@ static void s_stage_status_error_callback(dap_client_t *a_client, void *a_arg) l_node_client->callbacks.error(l_node_client, EINVAL, l_node_client->callbacks_arg); } -/** - * @brief s_node_client_connected_synchro_start_callback - * - * @param a_worker dap_worker_t - * @param a_arg void - */ -static void s_node_client_connected_synchro_start_callback(dap_worker_t *a_worker, void *a_arg) -{ - UNUSED(a_worker); - s_timer_update_states_callback(a_arg); -} /** * @brief dap_chain_node_client_start_sync @@ -253,8 +249,11 @@ static void s_stage_connected_callback(dap_client_t *a_client, void *a_arg) dap_chain_node_client_t *l_node_client = DAP_CHAIN_NODE_CLIENT(a_client); UNUSED(a_arg); if(l_node_client) { - log_it(L_NOTICE, "Stream connection with node " NODE_ADDR_FP_STR " established", - NODE_ADDR_FP_ARGS_S( l_node_client->remote_node_addr)); + char l_ip_addr_str[INET_ADDRSTRLEN] = {}; + inet_ntop(AF_INET, &l_node_client->info->hdr.ext_addr_v4, l_ip_addr_str, INET_ADDRSTRLEN); + log_it(L_NOTICE, "Stream connection with node "NODE_ADDR_FP_STR" (%s:%hu) established", + NODE_ADDR_FP_ARGS_S(l_node_client->remote_node_addr), + l_ip_addr_str, l_node_client->info->hdr.ext_port); // set callbacks for C and N channels; for R and S it is not needed if (a_client->active_channels) { size_t l_channels_count = dap_strlen(a_client->active_channels); @@ -281,7 +280,7 @@ static void s_stage_connected_callback(dap_client_t *a_client, void *a_arg) l_node_client->stream_worker = l_stream->stream_worker; if (l_node_client->keep_connection) { if(l_node_client->stream_worker){ - dap_worker_exec_callback_on(l_stream->esocket->context->worker, s_node_client_connected_synchro_start_callback, l_node_client); + s_timer_update_states_callback(l_node_client); l_node_client->sync_timer = dap_timerfd_start_on_worker(l_stream->esocket->context->worker, s_timer_update_states * 1000, s_timer_update_states_callback, @@ -699,7 +698,9 @@ dap_chain_node_client_t *dap_chain_node_client_create(dap_chain_net_t *a_net, void s_client_delete_callback(UNUSED_ARG dap_client_t *a_client, void *a_arg) { + // TODO make decision for possible client replacement assert(a_arg); + ((dap_chain_node_client_t *)a_arg)->client = NULL; dap_chain_node_client_close_unsafe(a_arg); } /** @@ -761,15 +762,18 @@ void dap_chain_node_client_reset(dap_chain_node_client_t *a_client) */ void dap_chain_node_client_close_unsafe(dap_chain_node_client_t *a_node_client) { + char l_node_addr_str[INET_ADDRSTRLEN] = {}; + inet_ntop(AF_INET, &a_node_client->info->hdr.ext_addr_v4, l_node_addr_str, INET_ADDRSTRLEN); + log_it(L_INFO, "Closing node client to uplink %s:%d ["NODE_ADDR_FP_STR"]", + l_node_addr_str, a_node_client->info->hdr.ext_port, NODE_ADDR_FP_ARGS_S(a_node_client->remote_node_addr)); + if (a_node_client->sync_timer) dap_timerfd_delete_unsafe(a_node_client->sync_timer); if (a_node_client->reconnect_timer) - dap_timerfd_delete_mt(a_node_client->reconnect_timer); + dap_timerfd_delete_mt(a_node_client->reconnect_timer->worker, a_node_client->reconnect_timer->esocket_uuid); if (a_node_client->callbacks.delete) a_node_client->callbacks.delete(a_node_client, a_node_client->net); - char l_node_addr_str[INET_ADDRSTRLEN] = {}; - inet_ntop(AF_INET, &a_node_client->info->hdr.ext_addr_v4, l_node_addr_str, INET_ADDRSTRLEN); - log_it(L_INFO, "Closing node client to uplink %s:%d", l_node_addr_str, a_node_client->info->hdr.ext_port); + if (a_node_client->stream_worker) { dap_stream_ch_t *l_ch = dap_stream_ch_find_by_uuid_unsafe(a_node_client->stream_worker, a_node_client->ch_chain_uuid); if (l_ch) { diff --git a/modules/net/srv/dap_chain_net_srv.c b/modules/net/srv/dap_chain_net_srv.c index e05ed4ee42f32bf7dc670693ab0de1a918f0b55c..9c3bad1835bcac05ef0ad600e9ff6255e15966c4 100644 --- a/modules/net/srv/dap_chain_net_srv.c +++ b/modules/net/srv/dap_chain_net_srv.c @@ -96,8 +96,6 @@ int dap_chain_net_srv_init() dap_stream_ch_chain_net_srv_init(); m_uid = NULL; m_uid_count = 0; - if( dap_chain_net_srv_order_init() != 0 ) - return -1; dap_cli_server_cmd_add ("net_srv", s_cli_net_srv, "Network services managment", "net_srv -net <net_name> order find [-direction {sell | buy}] [-srv_uid <Service UID>] [-price_unit <price unit>]\n"