Skip to content
Snippets Groups Projects
Commit 1bb2aba1 authored by Roman Khlopkov's avatar Roman Khlopkov 🔜 Committed by dmitriy.gerasimov
Browse files

bugs-4503

parent 64b7ecfc
No related branches found
No related tags found
2 merge requests!251Master,!250Master
...@@ -131,32 +131,10 @@ bool s_sync_chains_callback(dap_proc_thread_t *a_thread, void *a_arg) ...@@ -131,32 +131,10 @@ bool s_sync_chains_callback(dap_proc_thread_t *a_thread, void *a_arg)
dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch);
dap_chain_t * l_chain = dap_chain_find_by_id(l_ch_chain->request_net_id, l_ch_chain->request_chain_id); dap_chain_t * l_chain = dap_chain_find_by_id(l_ch_chain->request_net_id, l_ch_chain->request_chain_id);
l_ch_chain->request_atom_iter = l_chain->callback_atom_iter_create(l_chain);
dap_chain_atom_ptr_t * l_lasts = NULL; size_t l_first_size = 0;
size_t *l_lasts_sizes = NULL; dap_chain_atom_ptr_t *l_first = l_chain->callback_atom_iter_get_first(l_ch_chain->request_atom_iter, &l_first_size);
size_t l_lasts_count = 0; if (l_first && l_first_size) {
dap_chain_atom_iter_t* l_iter = l_chain->callback_atom_iter_create(l_chain);
l_ch_chain->request_atom_iter = l_iter;
l_lasts = l_chain->callback_atom_iter_get_lasts(l_iter, &l_lasts_count, &l_lasts_sizes);
log_it(L_INFO, "Found %d last atoms for synchronization", l_lasts_count);
if (l_lasts && l_lasts_sizes) {
for(long int i = 0; i < l_lasts_count; i++) {
dap_chain_atom_item_t * l_item = NULL;
dap_chain_hash_fast_t l_atom_hash;
dap_hash_fast(l_lasts[i], l_lasts_sizes[i], &l_atom_hash);
pthread_mutex_lock(&l_ch_chain->mutex);
HASH_FIND(hh, l_ch_chain->request_atoms_lasts, &l_atom_hash, sizeof(l_atom_hash),
l_item);
if(l_item == NULL) { // Not found, add new lasts
l_item = DAP_NEW_Z(dap_chain_atom_item_t);
l_item->atom = l_lasts[i];
l_item->atom_size = l_lasts_sizes[i];
memcpy(&l_item->atom_hash, &l_atom_hash, sizeof(l_atom_hash));
HASH_ADD(hh, l_ch_chain->request_atoms_lasts, atom_hash, sizeof(l_atom_hash),
l_item);
}
pthread_mutex_unlock(&l_ch_chain->mutex);
}
// first packet // first packet
l_ch_chain->state = CHAIN_STATE_SYNC_CHAINS; l_ch_chain->state = CHAIN_STATE_SYNC_CHAINS;
dap_chain_node_addr_t l_node_addr = { 0 }; dap_chain_node_addr_t l_node_addr = { 0 };
...@@ -168,18 +146,16 @@ bool s_sync_chains_callback(dap_proc_thread_t *a_thread, void *a_arg) ...@@ -168,18 +146,16 @@ bool s_sync_chains_callback(dap_proc_thread_t *a_thread, void *a_arg)
} }
else { else {
// last packet // last packet
dap_stream_ch_chain_sync_request_t l_request; dap_stream_ch_chain_sync_request_t l_request = {};
memset(&l_request,0,sizeof (l_request));
dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS, dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS,
l_ch_chain->request_net_id, l_ch_chain->request_chain_id, l_ch_chain->request_net_id, l_ch_chain->request_chain_id,
l_ch_chain->request_cell_id, &l_request, sizeof(l_request)); l_ch_chain->request_cell_id, &l_request, sizeof(l_request));
DAP_DEL_Z(l_ch_chain->request_atom_iter);
l_ch_chain->state = CHAIN_STATE_IDLE; l_ch_chain->state = CHAIN_STATE_IDLE;
if (l_ch_chain->callback_notify_packet_out) if (l_ch_chain->callback_notify_packet_out)
l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS, l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS,
NULL, 0, l_ch_chain->callback_notify_arg); NULL, 0, l_ch_chain->callback_notify_arg);
} }
DAP_DELETE(l_lasts);
DAP_DELETE(l_iter);
dap_stream_ch_set_ready_to_write_unsafe(l_ch, true); dap_stream_ch_set_ready_to_write_unsafe(l_ch, true);
dap_events_socket_assign_on_worker_mt(l_ch->stream->esocket, l_ch->stream_worker->worker); dap_events_socket_assign_on_worker_mt(l_ch->stream->esocket, l_ch->stream_worker->worker);
return true; return true;
...@@ -606,15 +582,7 @@ void dap_stream_ch_chain_go_idle ( dap_stream_ch_chain_t * a_ch_chain) ...@@ -606,15 +582,7 @@ void dap_stream_ch_chain_go_idle ( dap_stream_ch_chain_t * a_ch_chain)
memset(&a_ch_chain->request_cell_id, 0, sizeof(a_ch_chain->request_cell_id)); memset(&a_ch_chain->request_cell_id, 0, sizeof(a_ch_chain->request_cell_id));
memset(&a_ch_chain->request_chain_id, 0, sizeof(a_ch_chain->request_chain_id)); memset(&a_ch_chain->request_chain_id, 0, sizeof(a_ch_chain->request_chain_id));
memset(&a_ch_chain->request_last_ts, 0, sizeof(a_ch_chain->request_last_ts)); memset(&a_ch_chain->request_last_ts, 0, sizeof(a_ch_chain->request_last_ts));
DAP_DEL_Z(a_ch_chain->request_atom_iter);
dap_chain_atom_item_t *l_atom_item = NULL, *l_atom_item_tmp = NULL;
pthread_mutex_lock(&a_ch_chain->mutex);
HASH_ITER( hh,a_ch_chain->request_atoms_lasts, l_atom_item, l_atom_item_tmp)
HASH_DEL(a_ch_chain->request_atoms_lasts, l_atom_item);
HASH_ITER( hh, a_ch_chain->request_atoms_processed, l_atom_item, l_atom_item_tmp )
HASH_DEL(a_ch_chain->request_atoms_processed, l_atom_item);
pthread_mutex_unlock(&a_ch_chain->mutex);
} }
bool s_process_gdb_iter(dap_stream_ch_t *a_ch) bool s_process_gdb_iter(dap_stream_ch_t *a_ch)
...@@ -702,79 +670,26 @@ bool s_out_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) ...@@ -702,79 +670,26 @@ bool s_out_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg)
// Synchronize chains // Synchronize chains
case CHAIN_STATE_SYNC_CHAINS: { case CHAIN_STATE_SYNC_CHAINS: {
//log_it(L_DEBUG, "CHAIN_STATE_SYNC_CHAINS"); //log_it(L_DEBUG, "CHAIN_STATE_SYNC_CHAINS");
dap_chain_t * l_chain = dap_chain_find_by_id(l_ch_chain->request_net_id, l_ch_chain->request_chain_id); if (l_ch_chain->request_atom_iter->cur == NULL) { // All chains synced
dap_chain_atom_item_t * l_atom_item = NULL, *l_atom_item_tmp = NULL;//, *l_chains_lasts_new = NULL; dap_stream_ch_chain_sync_request_t l_request = {};
if(l_ch_chain->request_atoms_lasts == NULL) { // All chains synced
dap_stream_ch_chain_sync_request_t l_request;
memset(&l_request,0,sizeof (l_request));
uint8_t l_send_pkt_type = l_ch_chain->state == CHAIN_STATE_SYNC_CHAINS ?
DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS :
DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_ALL;
// last message // last message
dap_stream_ch_chain_pkt_write_unsafe(l_ch, l_send_pkt_type, dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS,
l_ch_chain->request_net_id, l_ch_chain->request_chain_id, l_ch_chain->request_net_id, l_ch_chain->request_chain_id,
l_ch_chain->request_cell_id, &l_request, sizeof(l_request)); l_ch_chain->request_cell_id, &l_request, sizeof(l_request));
l_packet_out = true; l_packet_out = true;
log_it( L_DEBUG,"Synced: %llu atoms processed", log_it( L_DEBUG,"Synced: %llu atoms processed", l_ch_chain->stats_request_atoms_processed);
l_ch_chain->stats_request_atoms_processed);
dap_stream_ch_chain_go_idle(l_ch_chain); dap_stream_ch_chain_go_idle(l_ch_chain);
if (l_ch_chain->callback_notify_packet_out)
if(l_ch_chain->callback_notify_packet_out) l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS, NULL,
l_ch_chain->callback_notify_packet_out(l_ch_chain, l_send_pkt_type, NULL, 0, l_ch_chain->callback_notify_arg); 0, l_ch_chain->callback_notify_arg);
}else{ // Process one chain from l_ch_chain->request_atoms_lasts } else { // Process one chain from l_ch_chain->request_atom_iter
pthread_mutex_lock(&l_ch_chain->mutex); dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN, l_ch_chain->request_net_id,
HASH_ITER(hh,l_ch_chain->request_atoms_lasts, l_atom_item, l_atom_item_tmp) { l_ch_chain->request_chain_id, l_ch_chain->request_cell_id,
dap_chain_atom_item_t * l_atom_item_proc = NULL; l_ch_chain->request_atom_iter->cur, l_ch_chain->request_atom_iter->cur_size);
// Check if its processed already l_packet_out = true;
HASH_FIND(hh, l_ch_chain->request_atoms_processed, &l_atom_item->atom_hash, l_ch_chain->stats_request_atoms_processed++;
sizeof(l_atom_item->atom_hash), l_atom_item_proc); // Then get next atom and populate new last
l_ch_chain->request_atom_iter->chain->callback_atom_iter_get_next(l_ch_chain->request_atom_iter, NULL);
if(l_atom_item_proc == NULL) { // If not processed we first store it in special table
l_atom_item_proc = DAP_NEW_Z(dap_chain_atom_item_t);
l_atom_item_proc->atom = l_atom_item->atom;
memcpy(&l_atom_item_proc->atom_hash, &l_atom_item->atom_hash, sizeof(l_atom_item->atom_hash));
HASH_ADD(hh, l_ch_chain->request_atoms_processed, atom_hash, sizeof(l_atom_item->atom_hash),
l_atom_item_proc);
// Then flush it out to the remote
size_t l_atom_size = l_atom_item->atom_size;
log_it(L_INFO, "Send one chain packet len=%d", l_atom_size);
dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN, l_ch_chain->request_net_id,
l_ch_chain->request_chain_id, l_ch_chain->request_cell_id,
l_atom_item->atom, l_atom_size);
l_packet_out = true;
l_ch_chain->stats_request_atoms_processed++;
// Then parse links and populate new lasts
size_t l_links_count = 0;
dap_chain_atom_ptr_t * l_links = NULL;
size_t *l_links_sizes = NULL;
dap_chain_atom_iter_t* l_iter = l_chain->callback_atom_iter_create_from(l_chain, l_atom_item->atom, l_atom_item->atom_size);
l_links = l_chain->callback_atom_iter_get_links(l_iter, &l_links_count, &l_links_sizes);
DAP_DELETE(l_iter);
if (l_links && l_links_sizes) {
for(size_t i = 0; i < l_links_count; i++) { // Find links
dap_chain_atom_item_t * l_link_item = NULL;
dap_chain_hash_fast_t l_link_hash;
dap_hash_fast(l_links[i], l_links_sizes[i], &l_link_hash);
// Check link in processed atims
HASH_FIND(hh, l_ch_chain->request_atoms_processed, &l_link_hash, sizeof(l_link_hash), l_link_item);
if(l_link_item == NULL) { // Not found, add new lasts
l_link_item = DAP_NEW_Z(dap_chain_atom_item_t);
l_link_item->atom = l_links[i];// do not use memory cause it will be deleted
l_link_item->atom_size = l_links_sizes[i];
memcpy(&l_link_item->atom_hash, &l_link_hash, sizeof(l_link_hash));
HASH_ADD(hh, l_ch_chain->request_atoms_lasts, atom_hash, sizeof(l_link_hash), l_link_item);
}
}
DAP_DELETE(l_links);
}
HASH_DEL(l_ch_chain->request_atoms_lasts, l_atom_item);
break;
} else {
HASH_DEL(l_ch_chain->request_atoms_lasts, l_atom_item);
}
}
pthread_mutex_unlock(&l_ch_chain->mutex);
} }
} break; } break;
default: break; default: break;
......
...@@ -54,8 +54,6 @@ typedef struct dap_stream_ch_chain { ...@@ -54,8 +54,6 @@ typedef struct dap_stream_ch_chain {
dap_stream_ch_chain_state_t state; dap_stream_ch_chain_state_t state;
dap_chain_atom_iter_t * request_atom_iter; dap_chain_atom_iter_t * request_atom_iter;
dap_chain_atom_item_t * request_atoms_lasts;
dap_chain_atom_item_t * request_atoms_processed;
uint8_t *pkt_data; uint8_t *pkt_data;
uint64_t pkt_data_size; uint64_t pkt_data_size;
uint64_t stats_request_atoms_processed; uint64_t stats_request_atoms_processed;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment