From ac673e74dc83000ec0baea09aa45dbf403aada79 Mon Sep 17 00:00:00 2001 From: cellframe <roman.khlopkov@demlabs.net> Date: Mon, 28 Nov 2022 21:06:26 +0300 Subject: [PATCH] [*] GlobalDB notifiers refactoring --- modules/chain/dap_chain_pvt.c | 2 +- modules/chain/include/dap_chain.h | 6 +- .../consensus/dag-poa/dap_chain_cs_dag_poa.c | 3 +- modules/consensus/none/dap_chain_cs_none.c | 7 +- modules/net/dap_chain_net.c | 188 ++++-------------- modules/net/include/dap_chain_net.h | 5 +- modules/net/srv/dap_chain_net_srv_order.c | 27 ++- .../net/srv/include/dap_chain_net_srv_order.h | 2 +- .../service/datum/dap_chain_net_srv_datum.c | 16 +- modules/type/dag/dap_chain_cs_dag.c | 13 +- modules/type/dag/dap_chain_cs_dag_event.c | 36 ++-- .../type/dag/include/dap_chain_cs_dag_event.h | 10 +- 12 files changed, 93 insertions(+), 222 deletions(-) diff --git a/modules/chain/dap_chain_pvt.c b/modules/chain/dap_chain_pvt.c index 8481a4be4f..e1e486574a 100644 --- a/modules/chain/dap_chain_pvt.c +++ b/modules/chain/dap_chain_pvt.c @@ -34,7 +34,7 @@ #define LOG_TAG "dap_chain_pvt" -void dap_chain_add_mempool_notify_callback(dap_chain_t *a_chain, dap_global_db_obj_callback_notify_t a_callback, void *a_cb_arg) +void dap_chain_add_mempool_notify_callback(dap_chain_t *a_chain, dap_store_obj_callback_notify_t a_callback, void *a_cb_arg) { dap_chain_gdb_notifier_t *l_notifier = DAP_NEW(dap_chain_gdb_notifier_t); l_notifier->callback = a_callback; diff --git a/modules/chain/include/dap_chain.h b/modules/chain/include/dap_chain.h index 537062aae2..89d344a732 100644 --- a/modules/chain/include/dap_chain.h +++ b/modules/chain/include/dap_chain.h @@ -27,7 +27,7 @@ #include <stdbool.h> #include <pthread.h> #include "dap_global_db.h" -#include "dap_global_db_sync.h" +#include "dap_global_db_remote.h" #include "dap_config.h" #include "dap_chain_common.h" #include "dap_chain_datum.h" @@ -178,7 +178,7 @@ typedef struct dap_chain { } dap_chain_t; typedef struct dap_chain_gdb_notifier { - dap_global_db_obj_callback_notify_t callback; + dap_store_obj_callback_notify_t callback; void *cb_arg; } dap_chain_gdb_notifier_t; @@ -213,5 +213,5 @@ void dap_chain_add_callback_notify(dap_chain_t * a_chain, dap_chain_callback_not dap_chain_atom_ptr_t dap_chain_get_atom_by_hash(dap_chain_t * a_chain, dap_chain_hash_fast_t * a_atom_hash, size_t * a_atom_size); bool dap_chain_get_atom_last_hash(dap_chain_t *a_chain, dap_hash_fast_t *a_atom_hash, dap_chain_cell_id_t a_cel_id); ssize_t dap_chain_atom_save(dap_chain_t *a_chain, const uint8_t *a_atom, size_t a_atom_size, dap_chain_cell_id_t a_cell_id); -void dap_chain_add_mempool_notify_callback(dap_chain_t *a_chain, dap_global_db_obj_callback_notify_t a_callback, void *a_cb_arg); +void dap_chain_add_mempool_notify_callback(dap_chain_t *a_chain, dap_store_obj_callback_notify_t a_callback, void *a_cb_arg); int dap_cert_chain_file_save(dap_chain_datum_t *datum, char *net_name); diff --git a/modules/consensus/dag-poa/dap_chain_cs_dag_poa.c b/modules/consensus/dag-poa/dap_chain_cs_dag_poa.c index bab47b98f4..074c3ef671 100644 --- a/modules/consensus/dag-poa/dap_chain_cs_dag_poa.c +++ b/modules/consensus/dag-poa/dap_chain_cs_dag_poa.c @@ -408,6 +408,7 @@ static void s_poa_round_check_callback_round_clean(dap_global_db_context_t *a_gl continue; if (a_values[i].value_len <= sizeof(dap_chain_cs_dag_event_round_item_t) + sizeof(dap_chain_cs_dag_event_t)) { log_it(L_WARNING, "Too small round item in DAG PoA rounds GDB group"); + dap_global_db_del_unsafe(a_global_db_context, a_group, a_values[i].key); continue; } dap_chain_cs_dag_event_round_item_t *l_event_round_item = (dap_chain_cs_dag_event_round_item_t *)a_values[i].value; @@ -415,7 +416,7 @@ static void s_poa_round_check_callback_round_clean(dap_global_db_context_t *a_gl uint64_t l_timeuot = dap_nanotime_from_sec(l_poa_pvt->confirmations_timeout + l_poa_pvt->wait_sync_before_complete + 10); uint64_t l_round_id = ((dap_chain_cs_dag_event_t *)l_event_round_item->event_n_signs)->header.round_id; if (l_time_diff > l_timeuot && l_round_id <= l_dag->round_completed) { - dap_global_db_del_unsafe(a_global_db_context, a_group, a_values[i].key); + dap_global_db_del_unsafe(a_global_db_context, a_group, a_values[i].key); log_it(L_DEBUG, "DAG-PoA: Remove event %s from round %"DAP_UINT64_FORMAT_U" by timer.", a_values[i].key, l_round_id); } diff --git a/modules/consensus/none/dap_chain_cs_none.c b/modules/consensus/none/dap_chain_cs_none.c index 5c39ea8389..80faa8c738 100644 --- a/modules/consensus/none/dap_chain_cs_none.c +++ b/modules/consensus/none/dap_chain_cs_none.c @@ -132,15 +132,14 @@ int dap_chain_gdb_init(void) * @param a_value buffer with data * @param a_value_len buffer size */ -static void s_history_callback_notify(void * a_arg, const char a_op_code, const char * a_group, - const char * a_key, const void * a_value, const size_t a_value_size) +static void s_history_callback_notify(dap_global_db_context_t *a_context, dap_store_obj_t *a_obj, void *a_arg) { if (a_arg){ dap_chain_gdb_t * l_gdb = (dap_chain_gdb_t *) a_arg; dap_chain_net_t *l_net = dap_chain_net_by_id( l_gdb->chain->net_id); log_it(L_DEBUG,"%s.%s: op_code='%c' group=\"%s\" key=\"%s\" value_size=%zu",l_net->pub.name, - l_gdb->chain->name, a_op_code, a_group, a_key, a_value_size); - dap_chain_net_sync_gdb_broadcast((void *)l_net, a_op_code, a_group, a_key, a_value, a_value_size); + l_gdb->chain->name, a_obj->type, a_obj->group, a_obj->key, a_obj->value_len); + dap_chain_net_sync_gdb_broadcast(a_context, a_obj, l_net); } } diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index e76ad73a63..a22e47f6f7 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -264,9 +264,7 @@ static void s_net_states_notify(dap_chain_net_t * l_net); int s_net_load(const char * a_net_name, uint16_t a_acl_idx); // Notify callback for GlobalDB changes -static void s_gbd_history_callback_notify (void * a_arg, const char a_op_code, const char * a_group, - const char * a_key, const void * a_value, - const size_t a_value_len); +static void s_gbd_history_callback_notify(dap_global_db_context_t *a_context, dap_store_obj_t *a_obj, void *a_arg); static void s_chain_callback_notify(void * a_arg, dap_chain_t *a_chain, dap_chain_cell_id_t a_id, void *a_atom, size_t a_atom_size); static int s_cli_net(int argc, char ** argv, char **str_reply); static uint8_t *s_net_set_acl(dap_chain_hash_fast_t *a_pkey_hash); @@ -376,7 +374,7 @@ dap_chain_net_state_t dap_chain_net_get_target_state(dap_chain_net_t *a_net) * * @param a_callback dap_global_db_obj_callback_notify_t callback function */ -void dap_chain_net_add_gdb_notify_callback(dap_chain_net_t *a_net, dap_global_db_obj_callback_notify_t a_callback, void *a_cb_arg) +void dap_chain_net_add_gdb_notify_callback(dap_chain_net_t *a_net, dap_store_obj_callback_notify_t a_callback, void *a_cb_arg) { dap_chain_gdb_notifier_t *l_notifier = DAP_NEW(dap_chain_gdb_notifier_t); l_notifier->callback = a_callback; @@ -406,123 +404,6 @@ int dap_chain_net_add_downlink(dap_chain_net_t *a_net, dap_stream_worker_t *a_wo pthread_rwlock_unlock(&l_net_pvt->downlinks_lock); return 0; } - -struct send_records_args{ - dap_chain_net_t * net; - dap_proc_thread_t * proc_thread; - dap_store_obj_t * arg_obj; -}; - -/** - * @brief s_net_send_records_callback_get - * @param a_global_db_context - * @param a_rc - * @param a_store_obj - * @param a_arg - */ -static void s_net_send_records_callback_get_raw (dap_global_db_context_t * a_global_db_context,int a_rc, dap_store_obj_t * a_store_obj, void * a_arg) -{ - struct send_records_args * l_args = (struct send_records_args*) a_arg; - dap_chain_net_t * l_net = l_args->net; - dap_store_obj_t * l_arg_obj = l_args->arg_obj; - DAP_DEL_Z(l_args); - - if (a_rc != DAP_GLOBAL_DB_RC_SUCCESS ) { - log_it(L_DEBUG, "Notified GDB event does not exist"); - dap_store_obj_free_one(l_arg_obj); - return; - } - - if (!a_store_obj->group || !a_store_obj->key) { - dap_store_obj_free_one(l_arg_obj); - return; - } - - a_store_obj->type = l_arg_obj->type; - - if (a_store_obj->type == DAP_DB$K_OPTYPE_DEL) { - DAP_DELETE(a_store_obj->group); - a_store_obj->group = l_arg_obj->group; - } else - DAP_DELETE(l_arg_obj->group); - - DAP_DELETE(l_arg_obj->key); - DAP_DELETE(l_arg_obj); - - pthread_rwlock_wrlock(&PVT(l_net)->gdbs_lock); - if (PVT(l_net)->state != NET_STATE_OFFLINE) { - dap_list_t *it = NULL; - do { - dap_store_obj_t *l_obj_cur = it ? (dap_store_obj_t *)it->data : a_store_obj; - dap_chain_t *l_chain = NULL; - if (l_obj_cur->type == DAP_DB$K_OPTYPE_ADD) - l_chain = dap_chain_get_chain_from_group_name(l_net->pub.id, a_store_obj->group); - dap_chain_id_t l_chain_id = l_chain ? l_chain->id : (dap_chain_id_t) {}; - dap_chain_cell_id_t l_cell_id = l_chain ? l_chain->cells->id : (dap_chain_cell_id_t){}; - dap_global_db_pkt_t *l_data_out = dap_store_packet_single(l_obj_cur); - dap_store_obj_free_one(l_obj_cur); - struct downlink *l_link, *l_tmp; - pthread_rwlock_rdlock(&PVT(l_net)->downlinks_lock); - HASH_ITER(hh, PVT(l_net)->downlinks, l_link, l_tmp) { - bool l_ch_alive = dap_stream_ch_check_uuid_mt(l_link->worker, l_link->ch_uuid); - if (!l_ch_alive) { - HASH_DEL(PVT(l_net)->downlinks, l_link); - DAP_DELETE(l_link); - continue; - } - if (!dap_stream_ch_chain_pkt_write_inter( a_global_db_context->queue_worker_ch_io_input[l_link->worker->worker->id], - l_link->ch_uuid, - DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB, l_net->pub.id.uint64, - l_chain_id.uint64, l_cell_id.uint64, l_data_out, - sizeof(dap_global_db_pkt_t) + l_data_out->data_size)) - debug_if(g_debug_reactor, L_ERROR, "Can't send pkt to worker (%d) for writing", l_link->worker->worker->id); - } - pthread_rwlock_unlock(&PVT(l_net)->downlinks_lock); - DAP_DELETE(l_data_out); - if (it) - PVT(l_net)->records_queue = dap_list_delete_link(PVT(l_net)->records_queue, it); - it = PVT(l_net)->records_queue; - } while (it); - } else - //PVT(l_net)->records_queue = dap_list_append(PVT(l_net)->records_queue, l_obj); - pthread_rwlock_unlock(&PVT(l_net)->gdbs_lock); -} - -/** - * @brief s_net_send_records - * @param a_thread - * @param a_arg - * @return - */ -static bool s_net_send_records(dap_proc_thread_t *a_thread, void *a_arg) -{ - UNUSED(a_thread); - dap_store_obj_t *l_arg_obj = (dap_store_obj_t *)a_arg; - dap_chain_net_t *l_net = (dap_chain_net_t *)l_arg_obj->callback_proc_thread_arg; - - struct send_records_args * l_args = DAP_NEW_Z(struct send_records_args); - l_args->arg_obj = l_arg_obj; - l_args->net = l_net; - l_args->proc_thread = a_thread; - - if (l_arg_obj->type == DAP_DB$K_OPTYPE_DEL) { - char *l_group = dap_strdup_printf("%s.del", l_arg_obj->group); - if( dap_global_db_get_raw( l_group, l_arg_obj->key, s_net_send_records_callback_get_raw, - l_args) != 0 ){ - log_it(L_ERROR, "Can't execute get request for s_net_send_records() function"); - DAP_DELETE(l_args); - } - DAP_DELETE(l_group); - } else if(dap_global_db_get_raw( l_arg_obj->key, l_arg_obj->key,s_net_send_records_callback_get_raw, - l_args) != 0){ - log_it(L_ERROR, "Can't execute get request for s_net_send_records() function"); - DAP_DELETE(l_args); - } - return true; -} - -static void s_record_obj_free(void *a_obj) { return dap_store_obj_free_one((dap_store_obj_t *)a_obj); } - /** * @brief executes, when you add data to gdb and sends it to current network connected nodes * @param a_arg arguments. Can be network object (dap_chain_net_t) @@ -532,33 +413,36 @@ static void s_record_obj_free(void *a_obj) { return dap_store_obj_free_one((dap_ * @param a_value buffer with data * @param a_value_size buffer size */ -void dap_chain_net_sync_gdb_broadcast(void *a_arg, const char a_op_code, const char *a_group, - const char *a_key, const void *a_value, const size_t a_value_size) +void dap_chain_net_sync_gdb_broadcast(dap_global_db_context_t *a_context, dap_store_obj_t *a_obj, void *a_arg) { - UNUSED(a_value); - UNUSED(a_value_size); - if (!a_arg || !a_group || !a_key){ + if (!a_arg || !a_obj || !a_obj->group || !a_obj->key) return; - } dap_chain_net_t *l_net = (dap_chain_net_t *)a_arg; - if (!HASH_COUNT(PVT(l_net)->downlinks)) { - if (PVT(l_net)->records_queue) { - pthread_rwlock_wrlock(&PVT(l_net)->gdbs_lock); - dap_list_free_full(PVT(l_net)->records_queue, s_record_obj_free); - PVT(l_net)->records_queue = NULL; - pthread_rwlock_unlock(&PVT(l_net)->gdbs_lock); + if (PVT(l_net)->state != NET_STATE_SYNC_GDB) { + dap_chain_t *l_chain = NULL; + if (a_obj->type == DAP_DB$K_OPTYPE_ADD) + l_chain = dap_chain_get_chain_from_group_name(l_net->pub.id, a_obj->group); + dap_chain_id_t l_chain_id = l_chain ? l_chain->id : (dap_chain_id_t) {}; + dap_chain_cell_id_t l_cell_id = l_chain ? l_chain->cells->id : (dap_chain_cell_id_t){}; + dap_global_db_pkt_t *l_data_out = dap_store_packet_single(a_obj); + struct downlink *l_link, *l_tmp; + pthread_rwlock_rdlock(&PVT(l_net)->downlinks_lock); + HASH_ITER(hh, PVT(l_net)->downlinks, l_link, l_tmp) { + bool l_ch_alive = dap_stream_ch_check_uuid_mt(l_link->worker, l_link->ch_uuid); + if (!l_ch_alive) { + HASH_DEL(PVT(l_net)->downlinks, l_link); + DAP_DELETE(l_link); + continue; + } + if (!dap_stream_ch_chain_pkt_write_inter(a_context->queue_worker_ch_io_input[l_link->worker->worker->id], + l_link->ch_uuid, + DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB, l_net->pub.id.uint64, + l_chain_id.uint64, l_cell_id.uint64, l_data_out, + sizeof(dap_global_db_pkt_t) + l_data_out->data_size)) + debug_if(g_debug_reactor, L_ERROR, "Can't send pkt to worker (%d) for writing", l_link->worker->worker->id); } - return; + pthread_rwlock_unlock(&PVT(l_net)->downlinks_lock); } - // Use it instead of new type definition to pack params in one callback arg - dap_store_obj_t *l_obj = DAP_NEW(dap_store_obj_t); - l_obj->type = a_op_code; - l_obj->key = dap_strdup(a_key); - l_obj->key_len = dap_strlen(l_obj->key)+1; - l_obj->group = dap_strdup(a_group); - l_obj->callback_proc_thread_arg = a_arg; - - dap_proc_queue_add_callback(dap_events_worker_get_auto(), s_net_send_records, l_obj); } static void s_atom_obj_free(void *a_atom_obj) @@ -647,20 +531,17 @@ static void s_chain_callback_notify(void *a_arg, dap_chain_t *a_chain, dap_chain * @param a_value buffer with data * @param a_value_len buffer size */ -static void s_gbd_history_callback_notify(void *a_arg, const char a_op_code, const char *a_group, - const char *a_key, const void *a_value, const size_t a_value_len) +static void s_gbd_history_callback_notify(dap_global_db_context_t *a_context, dap_store_obj_t *a_obj, void *a_arg) { - if (!a_arg) { + if (!a_obj || !a_arg) return; - } dap_chain_net_t *l_net = (dap_chain_net_t *)a_arg; for (dap_list_t *it = PVT(l_net)->gdb_notifiers; it; it = it->next) { dap_chain_gdb_notifier_t *el = (dap_chain_gdb_notifier_t *)it->data; if (!el) continue; - dap_global_db_obj_callback_notify_t l_callback = el->callback; - if (l_callback) - l_callback(el->cb_arg, a_op_code, a_group, a_key, a_value, a_value_len); + if (el->callback) + el->callback(a_context, a_obj, el->cb_arg); } dap_chain_t *l_chain; DL_FOREACH(l_net->pub.chains, l_chain) { @@ -668,14 +549,13 @@ static void s_gbd_history_callback_notify(void *a_arg, const char a_op_code, con continue; } char *l_gdb_group_str = dap_chain_net_get_gdb_group_mempool_new(l_chain); - if (!strcmp(a_group, l_gdb_group_str)) { + if (!strcmp(a_obj->group, l_gdb_group_str)) { for (dap_list_t *it = DAP_CHAIN_PVT(l_chain)->mempool_notifires; it; it = it->next) { dap_chain_gdb_notifier_t *el = (dap_chain_gdb_notifier_t *)it->data; if (!el) continue; - dap_global_db_obj_callback_notify_t l_callback = el->callback; - if (l_callback) - l_callback(el->cb_arg, a_op_code, a_group, a_key, a_value, a_value_len); + if (el->callback) + el->callback(a_context, a_obj, el->cb_arg); } } DAP_DELETE(l_gdb_group_str); diff --git a/modules/net/include/dap_chain_net.h b/modules/net/include/dap_chain_net.h index ce2a975d44..1aecefcb8a 100644 --- a/modules/net/include/dap_chain_net.h +++ b/modules/net/include/dap_chain_net.h @@ -182,9 +182,8 @@ bool dap_chain_net_get_extra_gdb_group(dap_chain_net_t *a_net, dap_chain_node_ad int dap_chain_net_verify_datum_for_add(dap_chain_net_t *a_net, dap_chain_datum_t * a_datum ); int dap_chain_net_add_downlink(dap_chain_net_t *a_net, dap_stream_worker_t *a_worker, dap_stream_ch_uuid_t a_ch_uuid, dap_events_socket_uuid_t a_esocket_uuid); -void dap_chain_net_add_gdb_notify_callback(dap_chain_net_t *a_net, dap_global_db_obj_callback_notify_t a_callback, void *a_cb_arg); -void dap_chain_net_sync_gdb_broadcast(void *a_arg, const char a_op_code, const char *a_group, - const char *a_key, const void *a_value, const size_t a_value_len); +void dap_chain_net_add_gdb_notify_callback(dap_chain_net_t *a_net, dap_store_obj_callback_notify_t a_callback, void *a_cb_arg); +void dap_chain_net_sync_gdb_broadcast(dap_global_db_context_t *a_context, dap_store_obj_t *a_obj, void *a_arg); struct dap_chain_node_client * dap_chain_net_client_create_n_connect( dap_chain_net_t * a_net, struct dap_chain_node_info *a_link_info); struct dap_chain_node_client * dap_chain_net_client_create_n_connect_channels( dap_chain_net_t * a_net,struct dap_chain_node_info *a_link_info, diff --git a/modules/net/srv/dap_chain_net_srv_order.c b/modules/net/srv/dap_chain_net_srv_order.c index 85480272b0..4da228fb0a 100644 --- a/modules/net/srv/dap_chain_net_srv_order.c +++ b/modules/net/srv/dap_chain_net_srv_order.c @@ -58,13 +58,12 @@ char *s_server_continents[]={ struct dap_order_notify { dap_chain_net_t *net; - dap_global_db_obj_callback_notify_t callback; + dap_store_obj_callback_notify_t callback; void *cb_arg; }; static dap_list_t *s_order_notify_callbacks = NULL; -static void s_srv_order_callback_notify(void *a_arg, const char a_op_code, const char *a_group, - const char *a_key, const void *a_value, const size_t a_value_len); +static void s_srv_order_callback_notify(dap_global_db_context_t *a_context, dap_store_obj_t *a_obj, void *a_arg); /** * @brief dap_chain_net_srv_order_init @@ -586,10 +585,9 @@ void dap_chain_net_srv_order_dump_to_string(dap_chain_net_srv_order_t *a_order,d } } -static void s_srv_order_callback_notify(void *a_arg, const char a_op_code, const char *a_group, - const char *a_key, const void *a_value, const size_t a_value_len) +static void s_srv_order_callback_notify(dap_global_db_context_t *a_context, dap_store_obj_t *a_obj, void *a_arg) { - if (!a_arg || !a_key) + if (!a_arg || !a_obj || !a_obj->key) return; dap_chain_net_t *l_net = (dap_chain_net_t *)a_arg; dap_global_db_context_t * l_gdb_context = dap_global_db_context_current(); @@ -598,27 +596,26 @@ static void s_srv_order_callback_notify(void *a_arg, const char a_op_code, const char *l_gdb_group_str = dap_chain_net_srv_order_get_gdb_group(l_net); - if (!dap_strcmp(a_group, l_gdb_group_str)) { + if (!dap_strcmp(a_obj->group, l_gdb_group_str)) { for (dap_list_t *it = s_order_notify_callbacks; it; it = it->next) { struct dap_order_notify *l_notifier = (struct dap_order_notify *)it->data; if ((l_notifier->net == NULL || l_notifier->net == l_net) && l_notifier->callback) { - l_notifier->callback(l_notifier->cb_arg, a_op_code, a_group, - a_key, a_value, a_value_len); + l_notifier->callback(a_context, a_obj, l_notifier->cb_arg); } } - if (a_value && a_op_code == DAP_DB$K_OPTYPE_ADD && + if (a_obj->value && a_obj->type == DAP_DB$K_OPTYPE_ADD && dap_config_get_item_bool_default(g_config, "srv", "order_signed_only", true)) { - dap_chain_net_srv_order_t *l_order = (dap_chain_net_srv_order_t *)a_value; + dap_chain_net_srv_order_t *l_order = (dap_chain_net_srv_order_t *)a_obj->value; if (l_order->version != 2) { - dap_global_db_del_unsafe(l_gdb_context, a_group, a_key); + dap_global_db_del_unsafe(l_gdb_context, a_obj->group, a_obj->key); } else { dap_sign_t *l_sign = (dap_sign_t *)(l_order->ext_n_sign + l_order->ext_size); - size_t l_max_size = a_value_len - sizeof(dap_chain_net_srv_order_t) - l_order->ext_size; + size_t l_max_size = a_obj->value_len - sizeof(dap_chain_net_srv_order_t) - l_order->ext_size; int l_verify = dap_sign_verify_all(l_sign, l_max_size, l_order, sizeof(dap_chain_net_srv_order_t) + l_order->ext_size); if (l_verify) { log_it(L_ERROR, "Order unverified, err %d", l_verify); - dap_global_db_del_unsafe(l_gdb_context, a_group, a_key); + dap_global_db_del_unsafe(l_gdb_context, a_obj->group, a_obj->key); } /*dap_chain_hash_fast_t l_pkey_hash; if (!dap_sign_get_pkey_hash(l_sign, &l_pkey_hash)) { @@ -639,7 +636,7 @@ static void s_srv_order_callback_notify(void *a_arg, const char a_op_code, const DAP_DELETE(l_gdb_group_str); } -void dap_chain_net_srv_order_add_notify_callback(dap_chain_net_t *a_net, dap_global_db_obj_callback_notify_t a_callback, void *a_cb_arg) +void dap_chain_net_srv_order_add_notify_callback(dap_chain_net_t *a_net, dap_store_obj_callback_notify_t a_callback, void *a_cb_arg) { struct dap_order_notify *l_notifier = DAP_NEW(struct dap_order_notify); l_notifier->net = a_net; diff --git a/modules/net/srv/include/dap_chain_net_srv_order.h b/modules/net/srv/include/dap_chain_net_srv_order.h index cf21280c85..f656781628 100644 --- a/modules/net/srv/include/dap_chain_net_srv_order.h +++ b/modules/net/srv/include/dap_chain_net_srv_order.h @@ -156,7 +156,7 @@ dap_chain_net_srv_order_t *dap_chain_net_srv_order_compose( char *dap_chain_net_srv_order_save(dap_chain_net_t *a_net, dap_chain_net_srv_order_t *a_order); void dap_chain_net_srv_order_dump_to_string(dap_chain_net_srv_order_t *a_order,dap_string_t * a_str_out, const char *a_hash_out_type); -void dap_chain_net_srv_order_add_notify_callback(dap_chain_net_t *a_net, dap_global_db_obj_callback_notify_t a_callback, void *a_cb_arg); +void dap_chain_net_srv_order_add_notify_callback(dap_chain_net_t *a_net, dap_store_obj_callback_notify_t a_callback, void *a_cb_arg); /** * @brief dap_chain_net_srv_order_get_gdb_group_mempool * @param l_chain diff --git a/modules/service/datum/dap_chain_net_srv_datum.c b/modules/service/datum/dap_chain_net_srv_datum.c index 2f406f452f..e2b79c9b36 100644 --- a/modules/service/datum/dap_chain_net_srv_datum.c +++ b/modules/service/datum/dap_chain_net_srv_datum.c @@ -36,7 +36,7 @@ static dap_chain_net_srv_t *s_srv_datum = NULL; static int s_srv_datum_cli(int argc, char ** argv, char **a_str_reply); -void s_order_notficator(void *a_arg, const char a_op_code, const char *a_group, const char *a_key, const void *a_value, const size_t a_value_len); +void s_order_notficator(dap_global_db_context_t *a_context, dap_store_obj_t *a_obj, void *a_arg); int dap_chain_net_srv_datum_init() { @@ -192,18 +192,18 @@ static int s_srv_datum_cli(int argc, char ** argv, char **a_str_reply) { * @param a_value * @param a_value_len */ -void s_order_notficator(void *a_arg, const char a_op_code, const char *a_group, const char *a_key, const void *a_value, const size_t a_value_len) +void s_order_notficator(dap_global_db_context_t *a_context, dap_store_obj_t *a_obj, void *a_arg) { - if (a_op_code == DAP_DB$K_OPTYPE_DEL) + if (a_obj->type == DAP_DB$K_OPTYPE_DEL) return; dap_chain_net_t *l_net = (dap_chain_net_t *)a_arg; - dap_chain_net_srv_order_t *l_order = dap_chain_net_srv_order_read((byte_t *)a_value, a_value_len); // Old format comliance + dap_chain_net_srv_order_t *l_order = dap_chain_net_srv_order_read((byte_t *)a_obj->value, a_obj->value_len); // Old format comliance dap_global_db_context_t * l_gdb_context = dap_global_db_context_current(); assert(l_gdb_context); - if (!l_order && a_key) { - log_it(L_NOTICE, "Order %s is corrupted", a_key); - if(dap_global_db_del_unsafe(l_gdb_context, a_group, a_key) != 0 ){ - log_it(L_ERROR,"Can't delete order %s", a_key); + if (!l_order && a_obj->key) { + log_it(L_NOTICE, "Order %s is corrupted", a_obj->key); + if(dap_global_db_del_unsafe(l_gdb_context, a_obj->group, a_obj->key) != 0 ){ + log_it(L_ERROR,"Can't delete order %s", a_obj->key); } return; // order is corrupted diff --git a/modules/type/dag/dap_chain_cs_dag.c b/modules/type/dag/dap_chain_cs_dag.c index 8f5c80179e..cfe93722f8 100644 --- a/modules/type/dag/dap_chain_cs_dag.c +++ b/modules/type/dag/dap_chain_cs_dag.c @@ -169,20 +169,19 @@ void dap_chain_cs_dag_deinit(void) } -static void s_history_callback_round_notify(void *a_arg, const char a_op_code, const char *a_group, - const char *a_key, const void *a_value, const size_t a_value_size) +static void s_history_callback_round_notify(dap_global_db_context_t *a_context, dap_store_obj_t *a_obj, void *a_arg) { dap_chain_cs_dag_t *l_dag = (dap_chain_cs_dag_t *)a_arg; assert(l_dag); dap_chain_net_t *l_net = dap_chain_net_by_id(l_dag->chain->net_id); debug_if(s_debug_more, L_DEBUG, "%s.%s: op_code='%c' group=\"%s\" key=\"%s\" value_size=%zu", - l_net->pub.name, l_dag->chain->name, a_op_code, a_group, a_key, a_value_size); - if (a_op_code == DAP_DB$K_OPTYPE_ADD && + l_net->pub.name, l_dag->chain->name, a_obj->type, a_obj->group, a_obj->key, a_obj->value_len); + if (a_obj->type == DAP_DB$K_OPTYPE_ADD && l_dag->callback_cs_event_round_sync) { if (!l_dag->broadcast_disable) - dap_chain_cs_dag_event_broadcast(l_dag, a_op_code, a_group, a_key, a_value, a_value_size); - if (dap_strcmp(a_key, DAG_ROUND_CURRENT_KEY)) // check key for round increment, if no than process event - l_dag->callback_cs_event_round_sync(l_dag, a_op_code, a_group, a_key, a_value, a_value_size); + dap_chain_cs_dag_event_broadcast(l_dag, a_obj, a_context); + if (dap_strcmp(a_obj->key, DAG_ROUND_CURRENT_KEY)) // check key for round increment, if no than process event + l_dag->callback_cs_event_round_sync(l_dag, a_obj->type, a_obj->group, a_obj->key, a_obj->value, a_obj->value_len); } } diff --git a/modules/type/dag/dap_chain_cs_dag_event.c b/modules/type/dag/dap_chain_cs_dag_event.c index 0d055be5c3..e2ae38fe11 100644 --- a/modules/type/dag/dap_chain_cs_dag_event.c +++ b/modules/type/dag/dap_chain_cs_dag_event.c @@ -222,37 +222,37 @@ size_t dap_chain_cs_dag_event_round_sign_add(dap_chain_cs_dag_event_round_item_t return a_round_item_size+l_sign_size; } - -static bool s_event_broadcast_send(dap_chain_cs_dag_event_round_broadcast_t *l_arg) { +static void s_event_broadcast_from_context(dap_global_db_context_t *a_context, void *a_arg) +{ + dap_chain_cs_dag_event_round_broadcast_t *l_arg = a_arg; dap_chain_net_t *l_net = dap_chain_net_by_id(l_arg->dag->chain->net_id); + dap_chain_net_sync_gdb_broadcast(a_context, l_arg->obj, l_net); + dap_store_obj_free_one(l_arg->obj); + DAP_DELETE(a_arg); +} + +static bool s_event_broadcast_send(dap_chain_cs_dag_event_round_broadcast_t *a_arg) +{ + dap_chain_net_t *l_net = dap_chain_net_by_id(a_arg->dag->chain->net_id); if (dap_chain_net_get_state(l_net) != NET_STATE_SYNC_GDB) - dap_chain_net_sync_gdb_broadcast((void *)l_net, l_arg->op_code, l_arg->group, l_arg->key, l_arg->value, l_arg->value_size); - else if ( l_arg->attempts < 10 ) { - l_arg->attempts++; + dap_global_db_context_exec(s_event_broadcast_from_context, a_arg); + else if (a_arg->attempts++ < 10) return true; - } - DAP_DELETE(l_arg->group); - DAP_DELETE(l_arg->key); - DAP_DELETE(l_arg->value); - DAP_DELETE(l_arg); return false; } -void dap_chain_cs_dag_event_broadcast(dap_chain_cs_dag_t *a_dag, const char a_op_code, const char *a_group, - const char *a_key, const void *a_value, const size_t a_value_size) { +void dap_chain_cs_dag_event_broadcast(dap_chain_cs_dag_t *a_dag, dap_store_obj_t *a_obj, dap_global_db_context_t *a_context) +{ dap_chain_cs_dag_event_round_broadcast_t *l_arg = DAP_NEW(dap_chain_cs_dag_event_round_broadcast_t); l_arg->dag = a_dag; - l_arg->op_code = a_op_code; - l_arg->group = dap_strdup(a_group); - l_arg->key = dap_strdup(a_key); - l_arg->value = DAP_DUP_SIZE(a_value, a_value_size); - l_arg->value_size = a_value_size; + l_arg->obj = dap_store_obj_copy(a_obj, 1); + l_arg->context = a_context; l_arg->attempts = 0; if (dap_timerfd_start(3*1000, (dap_timerfd_callback_t)s_event_broadcast_send, l_arg) == NULL) { - log_it(L_ERROR,"Can't run timer for broadcast Event %s", a_key); + log_it(L_ERROR, "Can't run timer for broadcast Event %s", a_obj->key); } } diff --git a/modules/type/dag/include/dap_chain_cs_dag_event.h b/modules/type/dag/include/dap_chain_cs_dag_event.h index 20befac788..e81278c7a8 100644 --- a/modules/type/dag/include/dap_chain_cs_dag_event.h +++ b/modules/type/dag/include/dap_chain_cs_dag_event.h @@ -62,11 +62,8 @@ typedef struct dap_chain_cs_dag_event_round_item { typedef struct dap_chain_cs_dag_event_round_broadcast { dap_chain_cs_dag_t *dag; - char op_code; - char *group; - char *key; - void *value; - size_t value_size; + dap_store_obj_t *obj; + dap_global_db_context_t *context; int attempts; } dap_chain_cs_dag_event_round_broadcast_t; @@ -163,8 +160,7 @@ static inline size_t dap_chain_cs_dag_event_round_item_get_size(dap_chain_cs_dag return sizeof(dap_chain_cs_dag_event_round_item_t)+a_event_round_item->data_size; } -void dap_chain_cs_dag_event_broadcast(dap_chain_cs_dag_t *a_dag, const char a_op_code, const char *a_group, - const char *a_key, const void *a_value, const size_t a_value_size); +void dap_chain_cs_dag_event_broadcast(dap_chain_cs_dag_t *a_dag, dap_store_obj_t *a_obj, dap_global_db_context_t *a_context); bool dap_chain_cs_dag_event_gdb_set(dap_chain_cs_dag_t *a_dag, char *a_event_hash_str, dap_chain_cs_dag_event_t *a_event, size_t a_event_size, dap_chain_cs_dag_event_round_item_t *a_round_item); -- GitLab