diff --git a/dap-sdk/net/client/dap_client_pvt.c b/dap-sdk/net/client/dap_client_pvt.c index d6f20b3581edec47188047ab164ce90110b92108..2f6429690bc109696e37bd27b9bb19a2c6016e9c 100644 --- a/dap-sdk/net/client/dap_client_pvt.c +++ b/dap-sdk/net/client/dap_client_pvt.c @@ -1255,6 +1255,8 @@ static void s_stream_es_callback_delete(dap_events_socket_t *a_es, void *arg) } l_client_pvt->stream = NULL; l_client_pvt->stream_es = NULL; + //dap_client_delete_mt(l_client_pvt->client); // TODO find a way to delete the client if it no closed yet + } /** diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c index 5d7d8850150bf4602b268590923884c852be1f51..a6414b71d0039722ca3d84feb658478c3c68a6b5 100644 --- a/modules/channel/chain/dap_stream_ch_chain.c +++ b/modules/channel/chain/dap_stream_ch_chain.c @@ -452,6 +452,11 @@ static bool s_sync_update_gdb_proc_callback(dap_proc_thread_t *a_thread, void *a log_it(L_DEBUG, "Prepare request to gdb sync from %s", l_sync_request->request.id_start ? "last sync" : "zero"); dap_chain_net_t *l_net = dap_chain_net_by_id(l_sync_request->request_hdr.net_id); dap_stream_ch_t *l_ch = dap_stream_ch_find_by_uuid_unsafe(DAP_STREAM_WORKER(l_sync_request->worker), l_sync_request->ch_uuid); + if (!l_ch) { + log_it(L_INFO, "Client disconnected before we sent the reply"); + DAP_DELETE(l_sync_request); + return true; + } dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); int l_flags = 0; if (dap_chain_net_get_add_gdb_group(l_net, l_sync_request->request.node_addr)) @@ -501,25 +506,13 @@ static bool s_sync_in_chains_callback(dap_proc_thread_t *a_thread, void *a_arg) dap_chain_atom_ptr_t l_atom_copy = (dap_chain_atom_ptr_t)l_pkt_item->pkt_data; uint64_t l_atom_copy_size = l_pkt_item->pkt_data_size; dap_hash_fast(l_atom_copy, l_atom_copy_size, &l_atom_hash); - size_t l_atom_size = 0; - if (dap_chain_get_atom_by_hash(l_chain, &l_atom_hash, &l_atom_size)) { - if (s_debug_more){ - char l_atom_hash_str[72] = {'\0'}; - dap_chain_hash_fast_to_str(&l_atom_hash,l_atom_hash_str,sizeof (l_atom_hash_str)-1 ); - log_it(L_WARNING, "Atom hash %s is already present", l_atom_hash_str); - } - dap_db_set_last_hash_remote(l_sync_request->request.node_addr.uint64, l_chain, &l_atom_hash); - DAP_DELETE(l_atom_copy); - DAP_DELETE(l_sync_request); - return true; - } dap_chain_atom_verify_res_t l_atom_add_res = l_chain->callback_atom_add(l_chain, l_atom_copy, l_atom_copy_size); switch (l_atom_add_res) { case ATOM_PASS: if (s_debug_more){ char l_atom_hash_str[72]={[0]='\0'}; dap_chain_hash_fast_to_str(&l_atom_hash,l_atom_hash_str,sizeof (l_atom_hash_str)-1 ); - log_it(L_WARNING,"Atom with hash %s for %s:%s not accepted (code ATOM_PASS, already present)", l_atom_hash_str, l_chain->net_name, l_chain->name); + log_it(L_WARNING, "Atom with hash %s for %s:%s not accepted (code ATOM_PASS, already present)", l_atom_hash_str, l_chain->net_name, l_chain->name); } dap_db_set_last_hash_remote(l_sync_request->request.node_addr.uint64, l_chain, &l_atom_hash); DAP_DELETE(l_atom_copy); @@ -535,8 +528,7 @@ static bool s_sync_in_chains_callback(dap_proc_thread_t *a_thread, void *a_arg) if (s_debug_more) { char l_atom_hash_str[72]={'\0'}; dap_chain_hash_fast_to_str(&l_atom_hash,l_atom_hash_str,sizeof (l_atom_hash_str)-1 ); - log_it(L_INFO, "%s atom with hash %s for %s:%s", l_atom_add_res == ATOM_ACCEPT ? "Accepted" : "Thresholded", - l_atom_hash_str, l_chain->net_name, l_chain->name); + log_it(L_INFO,"Accepted atom with hash %s for %s:%s", l_atom_hash_str, l_chain->net_name, l_chain->name); } int l_res = dap_chain_atom_save(l_chain, l_atom_copy, l_atom_copy_size, l_sync_request->request_hdr.cell_id); if(l_res < 0) { @@ -548,30 +540,34 @@ static bool s_sync_in_chains_callback(dap_proc_thread_t *a_thread, void *a_arg) } dap_chain_net_t *l_net = dap_chain_net_by_id(l_chain->net_id); dap_chain_t *l_cur_chain; - DL_FOREACH(l_net->pub.chains, l_cur_chain) { - if (l_cur_chain->callback_atom_add_from_treshold) { - dap_chain_atom_ptr_t l_atom_treshold; - do { - size_t l_atom_treshold_size; - if (s_debug_more) - log_it(L_DEBUG, "Try to add atom from treshold"); - l_atom_treshold = l_cur_chain->callback_atom_add_from_treshold(l_cur_chain, &l_atom_treshold_size); - if (l_atom_treshold) { - dap_chain_cell_id_t l_cell_id = (l_cur_chain == l_chain) ? l_sync_request->request_hdr.cell_id - : l_cur_chain->cells->id; - int l_res = dap_chain_atom_save(l_cur_chain, l_atom_copy, l_atom_copy_size, l_cell_id); - log_it(L_INFO, "Added atom from treshold"); - if (l_res < 0) { - char l_atom_hash_str[72] = {'\0'}; - dap_chain_hash_fast_to_str(&l_atom_hash,l_atom_hash_str, sizeof(l_atom_hash_str) - 1); - log_it(L_ERROR, "Can't save atom %s from treshold to file", l_atom_hash_str); - } else if (l_cur_chain == l_chain) { - dap_db_set_last_hash_remote(l_sync_request->request.node_addr.uint64, l_chain, &l_atom_hash); + bool l_processed; + do { + l_processed = false; + DL_FOREACH(l_net->pub.chains, l_cur_chain) { + if (l_cur_chain->callback_atom_add_from_treshold) { + dap_chain_atom_ptr_t l_atom_treshold; + do { + size_t l_atom_treshold_size; + if (s_debug_more) + log_it(L_DEBUG, "Try to add atom from treshold"); + l_atom_treshold = l_cur_chain->callback_atom_add_from_treshold(l_cur_chain, &l_atom_treshold_size); + if (l_atom_treshold) { + dap_chain_cell_id_t l_cell_id = (l_cur_chain == l_chain) ? l_sync_request->request_hdr.cell_id + : l_cur_chain->cells->id; + int l_res = dap_chain_atom_save(l_cur_chain, l_atom_copy, l_atom_copy_size, l_cell_id); + log_it(L_INFO, "Added atom from treshold"); + if (l_res < 0) { + char l_atom_hash_str[72] = {'\0'}; + dap_chain_hash_fast_to_str(&l_atom_hash,l_atom_hash_str, sizeof(l_atom_hash_str) - 1); + log_it(L_ERROR, "Can't save atom %s from treshold to file", l_atom_hash_str); + } else if (l_cur_chain == l_chain) { + dap_db_set_last_hash_remote(l_sync_request->request.node_addr.uint64, l_chain, &l_atom_hash); + } } - } - } while(l_atom_treshold); + } while(l_atom_treshold); + } } - } + } while (l_processed); break; case ATOM_REJECT: { if (s_debug_more) { diff --git a/modules/global-db/dap_chain_global_db.c b/modules/global-db/dap_chain_global_db.c index 35665007b98228ec5a06c0ab663490eb04eb6f21..be114e090cd24a3b4579cdf4ab6b0284d8a0af95 100644 --- a/modules/global-db/dap_chain_global_db.c +++ b/modules/global-db/dap_chain_global_db.c @@ -689,6 +689,7 @@ bool dap_chain_global_db_gr_save(dap_global_db_obj_t* a_objs, size_t a_objs_coun for(size_t q = 0; q < a_objs_count; ++q) { dap_store_obj_t *store_data_cur = l_store_data + q; dap_global_db_obj_t *a_obj_cur = a_objs + q; + store_data_cur->type = 'a'; store_data_cur->key = a_obj_cur->key; store_data_cur->group = (char*)a_group; store_data_cur->value = a_obj_cur->value; diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index 68dde5c3068beba5bd92d23226c2cd8621ba366d..4be57955c7b10c687da469d3b7e78ce799b858b9 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -728,6 +728,7 @@ static void s_node_link_callback_disconnected(dap_chain_node_client_t *a_node_cl log_it(L_ERROR, "Links count is zero in disconnected callback, looks smbd decreased it twice or forget to increase on connect/reconnect"); } if (l_net_pvt->state_target != NET_STATE_OFFLINE) { + a_node_client->keep_connection = true; for (dap_list_t *it = l_net_pvt->net_links; it; it = it->next) { if (((struct net_link *)it->data)->link == NULL) { // We have a free prepared link s_node_link_remove(l_net_pvt, a_node_client); @@ -738,9 +739,7 @@ static void s_node_link_callback_disconnected(dap_chain_node_client_t *a_node_cl } } dap_chain_node_info_t *l_link_node_info = s_get_dns_link_from_cfg(l_net); - if (!l_link_node_info) { // Try to keep this connection - a_node_client->keep_connection = true; - } else { + if (l_link_node_info) { struct link_dns_request *l_dns_request = DAP_NEW_Z(struct link_dns_request); l_dns_request->net = l_net; if (dap_chain_node_info_dns_request(l_link_node_info->hdr.ext_addr_v4, @@ -753,7 +752,6 @@ static void s_node_link_callback_disconnected(dap_chain_node_client_t *a_node_cl log_it(L_ERROR, "Can't process node info dns request"); DAP_DELETE(l_link_node_info); DAP_DELETE(l_dns_request); - a_node_client->keep_connection = true; } else { s_node_link_remove(l_net_pvt, a_node_client); a_node_client->keep_connection = false; @@ -826,13 +824,15 @@ static void s_node_link_callback_delete(dap_chain_node_client_t * a_node_client, "}\n", a_node_client->net->pub.id.uint64, a_node_client->info->hdr.cell_id.uint64, NODE_ADDR_FP_ARGS_S(a_node_client->info->hdr.address)); return; + } else if (a_node_client->is_connected) { + a_node_client->is_connected = false; + if (l_net_pvt->links_connected_count) + l_net_pvt->links_connected_count--; + else + log_it(L_ERROR, "Links count is zero in delete callback"); } dap_chain_net_sync_unlock(l_net, a_node_client); pthread_rwlock_wrlock(&l_net_pvt->rwlock); - if (l_net_pvt->links_connected_count) - l_net_pvt->links_connected_count--; - else - log_it(L_ERROR, "Links count is zero in delete callback"); for ( dap_list_t * it = l_net_pvt->net_links; it; it=it->next ){ if (((struct net_link *)it->data)->link == a_node_client) { log_it(L_DEBUG,"Replace node client with new one"); @@ -1157,7 +1157,6 @@ static bool s_net_states_proc(dap_proc_thread_t *a_thread, void *a_arg) for (dap_list_t *l_tmp = l_net_pvt->net_links; l_tmp; l_tmp = dap_list_next(l_tmp)) { dap_chain_node_info_t *l_link_info = ((struct net_link *)l_tmp->data)->link_info; dap_chain_node_client_t *l_client = dap_chain_net_client_create_n_connect(l_net, l_link_info); - l_client->keep_connection = true; ((struct net_link *)l_tmp->data)->link = l_client; if (++l_used_links == s_required_links_count) break; @@ -1253,7 +1252,14 @@ bool dap_chain_net_sync_unlock(dap_chain_net_t *a_net, dap_chain_node_client_t * */ struct dap_chain_node_client * dap_chain_net_client_create_n_connect( dap_chain_net_t * a_net,struct dap_chain_node_info* a_link_info) { - return dap_chain_node_client_create_n_connect(a_net, a_link_info,"CN",(dap_chain_node_client_callbacks_t *)&s_node_link_callbacks,a_net); + dap_chain_node_client_t *l_ret = dap_chain_node_client_create_n_connect(a_net, + a_link_info, + "CN", + (dap_chain_node_client_callbacks_t *)&s_node_link_callbacks, + a_net); + if (l_ret) + l_ret->keep_connection = true; + return l_ret; } /** @@ -1818,13 +1824,17 @@ static int s_cli_net(int argc, char **argv, char **a_str_reply) dap_chain_load_all(l_chain); } } - DL_FOREACH(l_net->pub.chains, l_chain) { - if (l_chain->callback_atom_add_from_treshold) { - while (l_chain->callback_atom_add_from_treshold(l_chain, NULL)) { - log_it(L_DEBUG, "Added atom from treshold"); + bool l_processed; + do { + l_processed = false; + DL_FOREACH(l_net->pub.chains, l_chain) { + if (l_chain->callback_atom_add_from_treshold) { + while (l_chain->callback_atom_add_from_treshold(l_chain, NULL)) { + log_it(L_DEBUG, "Added atom from treshold"); + } } } - } + } while (l_processed); } else { dap_chain_node_cli_set_reply_text(a_str_reply, "Command requires one of subcomand: sync, link, go, get, stats, ca, ledger"); diff --git a/modules/net/dap_chain_node_client.c b/modules/net/dap_chain_node_client.c index 0a67b42759f0bc9abf37194f20def7a04dfa7b36..5e7daeb3996b6199793d649f01f1375746447461 100644 --- a/modules/net/dap_chain_node_client.c +++ b/modules/net/dap_chain_node_client.c @@ -197,13 +197,12 @@ static void s_node_client_connected_synchro_start_callback(dap_worker_t *a_worke DAP_DELETE(a_arg); } -dap_chain_node_sync_status_t dap_chain_node_client_start_sync(dap_events_socket_uuid_t *l_uuid) +dap_chain_node_sync_status_t dap_chain_node_client_start_sync(dap_events_socket_uuid_t *a_uuid) { dap_chain_node_client_handle_t *l_client_found = NULL; - HASH_FIND(hh, s_clients, l_uuid, sizeof(*l_uuid), l_client_found); + HASH_FIND(hh, s_clients, a_uuid, sizeof(*a_uuid), l_client_found); if(!l_client_found){ - log_it(L_DEBUG,"Chain node client %p was deleted before timer fired, nothing to do", l_uuid); - DAP_DELETE(l_uuid); + log_it(L_DEBUG,"Chain node client %p was deleted before timer fired, nothing to do", a_uuid); return NODE_SYNC_STATUS_MISSING; } @@ -258,8 +257,10 @@ static bool s_timer_update_states_callback(void *a_arg) dap_events_socket_uuid_t *l_uuid = (dap_events_socket_uuid_t *)a_arg; assert(l_uuid); dap_chain_node_sync_status_t l_status = dap_chain_node_client_start_sync(l_uuid); - if (l_status == NODE_SYNC_STATUS_MISSING) + if (l_status == NODE_SYNC_STATUS_MISSING) { + DAP_DELETE(l_uuid); return false; + } if (l_status == NODE_SYNC_STATUS_FAILED) { dap_chain_node_client_handle_t *l_client_found = NULL; HASH_FIND(hh, s_clients, l_uuid, sizeof(*l_uuid), l_client_found); @@ -268,9 +269,16 @@ static bool s_timer_update_states_callback(void *a_arg) l_me->state = NODE_CLIENT_STATE_DISCONNECTED; if (l_me->keep_connection) { if (dap_client_pvt_find(l_me->client->pvt_uuid)) { - log_it(L_INFO, "Reconnecting node client with peer "NODE_ADDR_FP_STR, NODE_ADDR_FP_ARGS_S(l_me->remote_node_addr)); - l_me->state = NODE_CLIENT_STATE_CONNECTING ; - dap_client_go_stage(l_me->client, STAGE_STREAM_STREAMING, s_stage_connected_callback); + if (l_me->callbacks.disconnected) { + l_me->callbacks.disconnected(l_me, l_me->callbacks_arg); + } + if (l_me->keep_connection) { + log_it(L_INFO, "Reconnecting node client with peer "NODE_ADDR_FP_STR, NODE_ADDR_FP_ARGS_S(l_me->remote_node_addr)); + l_me->state = NODE_CLIENT_STATE_CONNECTING ; + dap_client_go_stage(l_me->client, STAGE_STREAM_STREAMING, s_stage_connected_callback); + } else { + dap_chain_node_client_close(l_me); + } } else dap_chain_node_client_close(l_me); } @@ -589,7 +597,7 @@ static int save_stat_to_database(dap_stream_ch_chain_net_srv_pkt_test_t *a_reque l_key = strtoll(l_obj->key, NULL, 16); } char *l_key_str = dap_strdup_printf("%06x", ++l_key); - if(!dap_chain_global_db_gr_set(dap_strdup(l_key_str), (uint8_t *) json_str, strlen(json_str) + 1, l_group)) { + if(!dap_chain_global_db_gr_set(dap_strdup(l_key_str), dap_strdup(json_str), strlen(json_str) + 1, l_group)) { l_ret = -1; } DAP_DELETE(l_key_str); diff --git a/modules/type/dag/dap_chain_cs_dag.c b/modules/type/dag/dap_chain_cs_dag.c index 767270a9eef320ef7e29297306d26f301f7ca1fe..18def718401730c3158a60ee541a22915622d132 100644 --- a/modules/type/dag/dap_chain_cs_dag.c +++ b/modules/type/dag/dap_chain_cs_dag.c @@ -457,7 +457,7 @@ static dap_chain_atom_verify_res_t s_chain_callback_atom_add(dap_chain_t * a_cha switch (ret) { case ATOM_MOVE_TO_THRESHOLD: pthread_rwlock_wrlock(l_events_rwlock); - HASH_ADD(hh, PVT(l_dag)->events_treshold, hash,sizeof (l_event_item->hash), l_event_item); + HASH_ADD(hh, PVT(l_dag)->events_treshold, hash, sizeof(l_event_item->hash), l_event_item); pthread_rwlock_unlock(l_events_rwlock); if(s_debug_more) log_it(L_DEBUG, "... added to threshold"); @@ -467,7 +467,7 @@ static dap_chain_atom_verify_res_t s_chain_callback_atom_add(dap_chain_t * a_cha switch (l_consensus_check) { case 0: pthread_rwlock_wrlock(l_events_rwlock); - HASH_ADD(hh, PVT(l_dag)->events,hash,sizeof (l_event_item->hash), l_event_item); + HASH_ADD(hh, PVT(l_dag)->events,hash, sizeof(l_event_item->hash), l_event_item); s_dag_events_lasts_process_new_last_event(l_dag, l_event_item); pthread_rwlock_unlock(l_events_rwlock); if(s_debug_more) @@ -644,23 +644,27 @@ static size_t s_chain_callback_datums_pool_proc(dap_chain_t * a_chain, dap_chain // add all atoms from treshold dap_chain_net_t *l_net = dap_chain_net_by_id(a_chain->net_id); dap_chain_t *l_cur_chain; - DL_FOREACH(l_net->pub.chains, l_cur_chain) { - if (l_cur_chain->callback_atom_add_from_treshold) { - dap_chain_atom_ptr_t l_atom_treshold; - do { - size_t l_atom_treshold_size; - // add in ledger - l_atom_treshold = l_cur_chain->callback_atom_add_from_treshold(l_cur_chain, &l_atom_treshold_size); - // add into file - if (l_atom_treshold) { - int l_res = dap_chain_atom_save(l_cur_chain, l_atom_treshold, l_atom_treshold_size, l_cur_chain->cells->id); - if (l_res < 0) { - log_it(L_ERROR, "Can't save event %p from treshold", l_atom_treshold); + bool l_processed; + do { + l_processed = false; + DL_FOREACH(l_net->pub.chains, l_cur_chain) { + if (l_cur_chain->callback_atom_add_from_treshold) { + dap_chain_atom_ptr_t l_atom_treshold; + do { + size_t l_atom_treshold_size; + // add in ledger + l_atom_treshold = l_cur_chain->callback_atom_add_from_treshold(l_cur_chain, &l_atom_treshold_size); + // add into file + if (l_atom_treshold) { + int l_res = dap_chain_atom_save(l_cur_chain, l_atom_treshold, l_atom_treshold_size, l_cur_chain->cells->id); + if (l_res < 0) { + log_it(L_ERROR, "Can't save event %p from treshold", l_atom_treshold); + } } - } - } while (l_atom_treshold); + } while (l_atom_treshold); + } } - } + } while (l_processed); l_datum_processed++; } else { @@ -938,10 +942,10 @@ int dap_chain_cs_dag_event_verify_hashes_with_treshold(dap_chain_cs_dag_t * a_da if (ret == DAP_THRESHOLD_CONFLICTING) return ret; return l_is_events_all_hashes ? - l_is_events_main_hashes ? + (l_is_events_main_hashes ? DAP_THRESHOLD_OK : - DAP_THRESHOLD_NO_HASHES : - DAP_THRESHOLD_NO_HASHES_IN_MAIN; + DAP_THRESHOLD_NO_HASHES_IN_MAIN) : + DAP_THRESHOLD_NO_HASHES; } /** @@ -952,40 +956,35 @@ int dap_chain_cs_dag_event_verify_hashes_with_treshold(dap_chain_cs_dag_t * a_da dap_chain_cs_dag_event_item_t* dap_chain_cs_dag_proc_treshold(dap_chain_cs_dag_t * a_dag, dap_ledger_t * a_ledger) { bool res = false; - // TODO Process finish treshold. For now - easiest from possible dap_chain_cs_dag_event_item_t * l_event_item = NULL, * l_event_item_tmp = NULL; pthread_rwlock_wrlock(&PVT(a_dag)->events_rwlock); - // !!! int l_count = HASH_COUNT(PVT(a_dag)->events_treshold); log_it(L_DEBUG, "*** %d events in threshold", l_count); - HASH_ITER(hh,PVT(a_dag)->events_treshold,l_event_item, l_event_item_tmp){ - dap_dag_threshold_verification_res_t ret = dap_chain_cs_dag_event_verify_hashes_with_treshold (a_dag, l_event_item->event); - if ( ret == DAP_THRESHOLD_OK || ret == DAP_THRESHOLD_CONFLICTING ){ // All its hashes are in main table, move thats one too into it - HASH_DEL(PVT(a_dag)->events_treshold,l_event_item); - - if(ret == DAP_THRESHOLD_OK){ - if(s_debug_more) { - char * l_event_hash_str = dap_chain_hash_fast_to_str_new(&l_event_item->hash); - log_it(L_DEBUG, "Processing event (threshold): %s...", l_event_hash_str); - DAP_DELETE(l_event_hash_str); - } - int l_add_res = s_dap_chain_add_atom_to_events_table(a_dag, a_ledger, l_event_item); - if (!l_add_res) { - HASH_ADD(hh, PVT(a_dag)->events,hash,sizeof (l_event_item->hash), l_event_item); - s_dag_events_lasts_process_new_last_event(a_dag, l_event_item); - if(s_debug_more) - log_it(L_INFO, "... moved from treshold to main chains"); - res = true; - break; - }else{ - if(s_debug_more) - log_it(L_WARNING, "... error adding"); - DAP_DELETE(l_event_item); - } - //res = true; - }else if(ret == DAP_THRESHOLD_CONFLICTING) - HASH_ADD(hh, PVT(a_dag)->events_treshold_conflicted, hash,sizeof (l_event_item->hash), l_event_item); - + HASH_ITER(hh, PVT(a_dag)->events_treshold, l_event_item, l_event_item_tmp) { + dap_dag_threshold_verification_res_t ret = dap_chain_cs_dag_event_verify_hashes_with_treshold(a_dag, l_event_item->event); + if (ret == DAP_THRESHOLD_OK) { + if (s_debug_more) { + char * l_event_hash_str = dap_chain_hash_fast_to_str_new(&l_event_item->hash); + log_it(L_DEBUG, "Processing event (threshold): %s...", l_event_hash_str); + DAP_DELETE(l_event_hash_str); + } + int l_add_res = s_dap_chain_add_atom_to_events_table(a_dag, a_ledger, l_event_item); + if (!l_add_res) { + HASH_DEL(PVT(a_dag)->events_treshold, l_event_item); + HASH_ADD(hh, PVT(a_dag)->events, hash, sizeof(l_event_item->hash), l_event_item); + s_dag_events_lasts_process_new_last_event(a_dag, l_event_item); + if(s_debug_more) + log_it(L_INFO, "... moved from treshold to main chains"); + res = true; + break; + } else { + if(s_debug_more) + log_it(L_WARNING, "... error adding"); + } + //res = true; + } else if (ret == DAP_THRESHOLD_CONFLICTING) { + HASH_DEL(PVT(a_dag)->events_treshold, l_event_item); + HASH_ADD(hh, PVT(a_dag)->events_treshold_conflicted, hash, sizeof (l_event_item->hash), l_event_item); } } pthread_rwlock_unlock(&PVT(a_dag)->events_rwlock);