Skip to content
Snippets Groups Projects
Commit 174e0d2d authored by dmitriy.gerasimov's avatar dmitriy.gerasimov
Browse files

Merge branch 'bugs-4503-d' into 'develop'

bugs-4503

See merge request !204
parents 64b7ecfc 1bb2aba1
No related branches found
No related tags found
3 merge requests!251Master,!250Master,!204bugs-4503
Pipeline #5347 passed with stage
in 23 seconds
......@@ -131,32 +131,10 @@ bool s_sync_chains_callback(dap_proc_thread_t *a_thread, void *a_arg)
dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch);
dap_chain_t * l_chain = dap_chain_find_by_id(l_ch_chain->request_net_id, l_ch_chain->request_chain_id);
dap_chain_atom_ptr_t * l_lasts = NULL;
size_t *l_lasts_sizes = NULL;
size_t l_lasts_count = 0;
dap_chain_atom_iter_t* l_iter = l_chain->callback_atom_iter_create(l_chain);
l_ch_chain->request_atom_iter = l_iter;
l_lasts = l_chain->callback_atom_iter_get_lasts(l_iter, &l_lasts_count, &l_lasts_sizes);
log_it(L_INFO, "Found %d last atoms for synchronization", l_lasts_count);
if (l_lasts && l_lasts_sizes) {
for(long int i = 0; i < l_lasts_count; i++) {
dap_chain_atom_item_t * l_item = NULL;
dap_chain_hash_fast_t l_atom_hash;
dap_hash_fast(l_lasts[i], l_lasts_sizes[i], &l_atom_hash);
pthread_mutex_lock(&l_ch_chain->mutex);
HASH_FIND(hh, l_ch_chain->request_atoms_lasts, &l_atom_hash, sizeof(l_atom_hash),
l_item);
if(l_item == NULL) { // Not found, add new lasts
l_item = DAP_NEW_Z(dap_chain_atom_item_t);
l_item->atom = l_lasts[i];
l_item->atom_size = l_lasts_sizes[i];
memcpy(&l_item->atom_hash, &l_atom_hash, sizeof(l_atom_hash));
HASH_ADD(hh, l_ch_chain->request_atoms_lasts, atom_hash, sizeof(l_atom_hash),
l_item);
}
pthread_mutex_unlock(&l_ch_chain->mutex);
}
l_ch_chain->request_atom_iter = l_chain->callback_atom_iter_create(l_chain);
size_t l_first_size = 0;
dap_chain_atom_ptr_t *l_first = l_chain->callback_atom_iter_get_first(l_ch_chain->request_atom_iter, &l_first_size);
if (l_first && l_first_size) {
// first packet
l_ch_chain->state = CHAIN_STATE_SYNC_CHAINS;
dap_chain_node_addr_t l_node_addr = { 0 };
......@@ -168,18 +146,16 @@ bool s_sync_chains_callback(dap_proc_thread_t *a_thread, void *a_arg)
}
else {
// last packet
dap_stream_ch_chain_sync_request_t l_request;
memset(&l_request,0,sizeof (l_request));
dap_stream_ch_chain_sync_request_t l_request = {};
dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS,
l_ch_chain->request_net_id, l_ch_chain->request_chain_id,
l_ch_chain->request_cell_id, &l_request, sizeof(l_request));
DAP_DEL_Z(l_ch_chain->request_atom_iter);
l_ch_chain->state = CHAIN_STATE_IDLE;
if (l_ch_chain->callback_notify_packet_out)
l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS,
NULL, 0, l_ch_chain->callback_notify_arg);
}
DAP_DELETE(l_lasts);
DAP_DELETE(l_iter);
dap_stream_ch_set_ready_to_write_unsafe(l_ch, true);
dap_events_socket_assign_on_worker_mt(l_ch->stream->esocket, l_ch->stream_worker->worker);
return true;
......@@ -606,15 +582,7 @@ void dap_stream_ch_chain_go_idle ( dap_stream_ch_chain_t * a_ch_chain)
memset(&a_ch_chain->request_cell_id, 0, sizeof(a_ch_chain->request_cell_id));
memset(&a_ch_chain->request_chain_id, 0, sizeof(a_ch_chain->request_chain_id));
memset(&a_ch_chain->request_last_ts, 0, sizeof(a_ch_chain->request_last_ts));
dap_chain_atom_item_t *l_atom_item = NULL, *l_atom_item_tmp = NULL;
pthread_mutex_lock(&a_ch_chain->mutex);
HASH_ITER( hh,a_ch_chain->request_atoms_lasts, l_atom_item, l_atom_item_tmp)
HASH_DEL(a_ch_chain->request_atoms_lasts, l_atom_item);
HASH_ITER( hh, a_ch_chain->request_atoms_processed, l_atom_item, l_atom_item_tmp )
HASH_DEL(a_ch_chain->request_atoms_processed, l_atom_item);
pthread_mutex_unlock(&a_ch_chain->mutex);
DAP_DEL_Z(a_ch_chain->request_atom_iter);
}
bool s_process_gdb_iter(dap_stream_ch_t *a_ch)
......@@ -702,79 +670,26 @@ bool s_out_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg)
// Synchronize chains
case CHAIN_STATE_SYNC_CHAINS: {
//log_it(L_DEBUG, "CHAIN_STATE_SYNC_CHAINS");
dap_chain_t * l_chain = dap_chain_find_by_id(l_ch_chain->request_net_id, l_ch_chain->request_chain_id);
dap_chain_atom_item_t * l_atom_item = NULL, *l_atom_item_tmp = NULL;//, *l_chains_lasts_new = NULL;
if(l_ch_chain->request_atoms_lasts == NULL) { // All chains synced
dap_stream_ch_chain_sync_request_t l_request;
memset(&l_request,0,sizeof (l_request));
uint8_t l_send_pkt_type = l_ch_chain->state == CHAIN_STATE_SYNC_CHAINS ?
DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS :
DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_ALL;
if (l_ch_chain->request_atom_iter->cur == NULL) { // All chains synced
dap_stream_ch_chain_sync_request_t l_request = {};
// last message
dap_stream_ch_chain_pkt_write_unsafe(l_ch, l_send_pkt_type,
dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS,
l_ch_chain->request_net_id, l_ch_chain->request_chain_id,
l_ch_chain->request_cell_id, &l_request, sizeof(l_request));
l_packet_out = true;
log_it( L_DEBUG,"Synced: %llu atoms processed",
l_ch_chain->stats_request_atoms_processed);
log_it( L_DEBUG,"Synced: %llu atoms processed", l_ch_chain->stats_request_atoms_processed);
dap_stream_ch_chain_go_idle(l_ch_chain);
if(l_ch_chain->callback_notify_packet_out)
l_ch_chain->callback_notify_packet_out(l_ch_chain, l_send_pkt_type, NULL, 0, l_ch_chain->callback_notify_arg);
}else{ // Process one chain from l_ch_chain->request_atoms_lasts
pthread_mutex_lock(&l_ch_chain->mutex);
HASH_ITER(hh,l_ch_chain->request_atoms_lasts, l_atom_item, l_atom_item_tmp) {
dap_chain_atom_item_t * l_atom_item_proc = NULL;
// Check if its processed already
HASH_FIND(hh, l_ch_chain->request_atoms_processed, &l_atom_item->atom_hash,
sizeof(l_atom_item->atom_hash), l_atom_item_proc);
if(l_atom_item_proc == NULL) { // If not processed we first store it in special table
l_atom_item_proc = DAP_NEW_Z(dap_chain_atom_item_t);
l_atom_item_proc->atom = l_atom_item->atom;
memcpy(&l_atom_item_proc->atom_hash, &l_atom_item->atom_hash, sizeof(l_atom_item->atom_hash));
HASH_ADD(hh, l_ch_chain->request_atoms_processed, atom_hash, sizeof(l_atom_item->atom_hash),
l_atom_item_proc);
// Then flush it out to the remote
size_t l_atom_size = l_atom_item->atom_size;
log_it(L_INFO, "Send one chain packet len=%d", l_atom_size);
dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN, l_ch_chain->request_net_id,
l_ch_chain->request_chain_id, l_ch_chain->request_cell_id,
l_atom_item->atom, l_atom_size);
l_packet_out = true;
l_ch_chain->stats_request_atoms_processed++;
// Then parse links and populate new lasts
size_t l_links_count = 0;
dap_chain_atom_ptr_t * l_links = NULL;
size_t *l_links_sizes = NULL;
dap_chain_atom_iter_t* l_iter = l_chain->callback_atom_iter_create_from(l_chain, l_atom_item->atom, l_atom_item->atom_size);
l_links = l_chain->callback_atom_iter_get_links(l_iter, &l_links_count, &l_links_sizes);
DAP_DELETE(l_iter);
if (l_links && l_links_sizes) {
for(size_t i = 0; i < l_links_count; i++) { // Find links
dap_chain_atom_item_t * l_link_item = NULL;
dap_chain_hash_fast_t l_link_hash;
dap_hash_fast(l_links[i], l_links_sizes[i], &l_link_hash);
// Check link in processed atims
HASH_FIND(hh, l_ch_chain->request_atoms_processed, &l_link_hash, sizeof(l_link_hash), l_link_item);
if(l_link_item == NULL) { // Not found, add new lasts
l_link_item = DAP_NEW_Z(dap_chain_atom_item_t);
l_link_item->atom = l_links[i];// do not use memory cause it will be deleted
l_link_item->atom_size = l_links_sizes[i];
memcpy(&l_link_item->atom_hash, &l_link_hash, sizeof(l_link_hash));
HASH_ADD(hh, l_ch_chain->request_atoms_lasts, atom_hash, sizeof(l_link_hash), l_link_item);
}
}
DAP_DELETE(l_links);
}
HASH_DEL(l_ch_chain->request_atoms_lasts, l_atom_item);
break;
} else {
HASH_DEL(l_ch_chain->request_atoms_lasts, l_atom_item);
}
}
pthread_mutex_unlock(&l_ch_chain->mutex);
if (l_ch_chain->callback_notify_packet_out)
l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS, NULL,
0, l_ch_chain->callback_notify_arg);
} else { // Process one chain from l_ch_chain->request_atom_iter
dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN, l_ch_chain->request_net_id,
l_ch_chain->request_chain_id, l_ch_chain->request_cell_id,
l_ch_chain->request_atom_iter->cur, l_ch_chain->request_atom_iter->cur_size);
l_packet_out = true;
l_ch_chain->stats_request_atoms_processed++;
// Then get next atom and populate new last
l_ch_chain->request_atom_iter->chain->callback_atom_iter_get_next(l_ch_chain->request_atom_iter, NULL);
}
} break;
default: break;
......
......@@ -54,8 +54,6 @@ typedef struct dap_stream_ch_chain {
dap_stream_ch_chain_state_t state;
dap_chain_atom_iter_t * request_atom_iter;
dap_chain_atom_item_t * request_atoms_lasts;
dap_chain_atom_item_t * request_atoms_processed;
uint8_t *pkt_data;
uint64_t pkt_data_size;
uint64_t stats_request_atoms_processed;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment