diff --git a/dap-sdk b/dap-sdk index a2431fb2bac221b9a5393292d02bede1f237a99f..f2b762fcfbf3d799029d9a52963b09fefd9e711b 160000 --- a/dap-sdk +++ b/dap-sdk @@ -1 +1 @@ -Subproject commit a2431fb2bac221b9a5393292d02bede1f237a99f +Subproject commit f2b762fcfbf3d799029d9a52963b09fefd9e711b diff --git a/modules/chain/dap_chain.c b/modules/chain/dap_chain.c index a09641236e9366f5a51718a70cb9ca72ea8b3233..20c8dfc5bc801e4014bbb9922eee78bb1ec563e1 100644 --- a/modules/chain/dap_chain.c +++ b/modules/chain/dap_chain.c @@ -697,7 +697,7 @@ void dap_chain_info_dump_log(dap_chain_t * a_chain) * @param a_callback * @param a_arg */ -void dap_chain_add_callback_notify(dap_chain_t * a_chain, dap_chain_callback_notify_t a_callback, void * a_callback_arg) +void dap_chain_add_callback_notify(dap_chain_t *a_chain, dap_chain_callback_notify_t a_callback, dap_proc_thread_t *a_thread, void *a_callback_arg) { if(!a_chain){ log_it(L_ERROR, "NULL chain passed to dap_chain_add_callback_notify()"); @@ -714,6 +714,7 @@ void dap_chain_add_callback_notify(dap_chain_t * a_chain, dap_chain_callback_not } l_notifier->callback = a_callback; + l_notifier->proc_thread = a_thread; l_notifier->arg = a_callback_arg; pthread_rwlock_wrlock(&a_chain->rwlock); a_chain->atom_notifiers = dap_list_append(a_chain->atom_notifiers, l_notifier); @@ -845,7 +846,7 @@ void dap_chain_atom_notify(dap_chain_cell_t *a_chain_cell, dap_hash_fast_t *a_ha .hash = *a_hash, .atom = a_chain_cell->chain->is_mapped ? (byte_t*)a_atom : DAP_DUP_SIZE(a_atom, a_atom_size), .atom_size = a_atom_size }; - dap_proc_thread_callback_add_pri(NULL, s_notify_atom_on_thread, l_arg, DAP_QUEUE_MSG_PRIORITY_LOW); + dap_proc_thread_callback_add_pri(l_notifier->proc_thread, s_notify_atom_on_thread, l_arg, DAP_QUEUE_MSG_PRIORITY_LOW); } } diff --git a/modules/chain/include/dap_chain.h b/modules/chain/include/dap_chain.h index e665a6ae20dbd876aec0d058b388ecf4dad22373..beb262ba03eb3fe930dbbf24599f28b5199d8327 100644 --- a/modules/chain/include/dap_chain.h +++ b/modules/chain/include/dap_chain.h @@ -231,8 +231,11 @@ typedef struct dap_chain { void * _inheritor; // inheritor object } dap_chain_t; +typedef struct dap_proc_thread dap_proc_thread_t; + typedef struct dap_chain_atom_notifier { dap_chain_callback_notify_t callback; + dap_proc_thread_t *proc_thread; void *arg; } dap_chain_atom_notifier_t; @@ -274,7 +277,7 @@ dap_chain_t * dap_chain_find_by_id(dap_chain_net_id_t a_chain_net_id,dap_chain_i dap_chain_t *dap_chain_load_from_cfg(const char *a_chain_net_name, dap_chain_net_id_t a_chain_net_id, const char *a_chain_cfg_name); void dap_chain_delete(dap_chain_t * a_chain); -void dap_chain_add_callback_notify(dap_chain_t * a_chain, dap_chain_callback_notify_t a_callback, void * a_arg); +void dap_chain_add_callback_notify(dap_chain_t *a_chain, dap_chain_callback_notify_t a_callback, dap_proc_thread_t *a_thread, void *a_arg); void dap_chain_atom_notify(dap_chain_cell_t *a_chain_cell, dap_hash_fast_t *a_hash, const uint8_t *a_atom, size_t a_atom_size); void dap_chain_atom_add_from_threshold(dap_chain_t *a_chain); dap_chain_atom_ptr_t dap_chain_get_atom_by_hash(dap_chain_t * a_chain, dap_chain_hash_fast_t * a_atom_hash, size_t * a_atom_size); diff --git a/modules/consensus/esbocs/dap_chain_cs_esbocs.c b/modules/consensus/esbocs/dap_chain_cs_esbocs.c index 7c9334ad83915cb1464839646254392dbb790f64..75e8f9c05ef405b23d2c9254fc0e7dabaf83d648 100644 --- a/modules/consensus/esbocs/dap_chain_cs_esbocs.c +++ b/modules/consensus/esbocs/dap_chain_cs_esbocs.c @@ -591,8 +591,9 @@ static int s_callback_created(dap_chain_t *a_chain, dap_config_t *a_chain_net_cf log_it(L_ERROR, "This validator is not allowed to work in emergency mode. Use special decree to supply it"); return -5; } - dap_chain_add_callback_notify(a_chain, s_new_atom_notifier, l_session); - s_session_round_new(l_session); + dap_chain_add_callback_notify(a_chain, s_new_atom_notifier, l_session->proc_thread, l_session); + //s_session_round_new(l_session); + dap_proc_thread_callback_add(l_session->proc_thread, s_session_round_new, l_session); l_session->cs_timer = !dap_proc_thread_timer_add(l_session->proc_thread, s_session_proc_state, l_session, 1000); debug_if(l_esbocs_pvt->debug && l_session->cs_timer, L_MSG, "Consensus main timer is started"); @@ -992,8 +993,9 @@ static void s_db_calc_sync_hash(dap_chain_esbocs_session_t *a_session) a_session->is_actual_hash = true; } -static void s_session_send_startsync(dap_chain_esbocs_session_t *a_session) +static void s_session_send_startsync(void *a_arg) { + dap_chain_esbocs_session_t *a_session = (dap_chain_esbocs_session_t*)a_arg; if (a_session->cur_round.sync_sent) return; // Sync message already was sent dap_chain_hash_fast_t l_last_block_hash; @@ -1023,14 +1025,6 @@ static void s_session_send_startsync(dap_chain_esbocs_session_t *a_session) a_session->cur_round.sync_sent = true; } -static bool s_session_send_startsync_on_timer(void *a_arg) -{ - dap_chain_esbocs_session_t *l_session = a_arg; - s_session_send_startsync(l_session); - l_session->sync_timer = NULL; - return false; -} - static void s_session_update_penalty(dap_chain_esbocs_session_t *a_session) { for (dap_list_t *it = a_session->cur_round.all_validators; it; it = it->next) { @@ -1092,11 +1086,6 @@ static bool s_session_round_new(void *a_arg) s_session_round_clear(a_session); a_session->cur_round.id++; a_session->cur_round.sync_attempt++; - - if (a_session->sync_timer) { - dap_timerfd_delete_mt(a_session->sync_timer->worker, a_session->sync_timer->esocket_uuid); - a_session->sync_timer = NULL; - } a_session->state = DAP_CHAIN_ESBOCS_SESSION_STATE_WAIT_START; a_session->ts_round_sync_start = 0; a_session->ts_stage_entry = 0; @@ -1156,7 +1145,8 @@ static bool s_session_round_new(void *a_arg) a_session->chain->net_name, a_session->chain->name, a_session->cur_round.id, l_sync_send_delay); if (l_sync_send_delay) - a_session->sync_timer = dap_timerfd_start(l_sync_send_delay * 1000, s_session_send_startsync_on_timer, a_session); + dap_proc_thread_timer_add_pri(a_session->proc_thread, s_session_send_startsync, a_session, + l_sync_send_delay * 1000, true, DAP_QUEUE_MSG_PRIORITY_NORMAL); else s_session_send_startsync(a_session); } diff --git a/modules/consensus/esbocs/include/dap_chain_cs_esbocs.h b/modules/consensus/esbocs/include/dap_chain_cs_esbocs.h index e26355cf476bd50dc4a8496d4829c7c0ceedd057..e097d109a6114acb9a7e01ea5c6dc99e3323d675 100644 --- a/modules/consensus/esbocs/include/dap_chain_cs_esbocs.h +++ b/modules/consensus/esbocs/include/dap_chain_cs_esbocs.h @@ -201,7 +201,6 @@ typedef struct dap_chain_esbocs_session { dap_time_t ts_stage_entry; // time of current stage entrance dap_chain_esbocs_sync_item_t *sync_items; - dap_timerfd_t *sync_timer; dap_chain_addr_t my_signing_addr;