Skip to content
Snippets Groups Projects
Commit 7ec6ef70 authored by Roman Khlopkov's avatar Roman Khlopkov 🔜 Committed by dmitriy.gerasimov
Browse files

bugs-4547

parent 03b95a26
No related branches found
No related tags found
2 merge requests!251Master,!250Master
......@@ -455,8 +455,9 @@ static void s_queue_new_es_callback( dap_events_socket_t * a_es, void * a_arg)
dap_events_socket_t * l_es_new =(dap_events_socket_t *) a_arg;
dap_worker_t * w = a_es->worker;
//log_it(L_DEBUG, "Received event socket %p to add on worker", l_es_new);
if(dap_events_socket_check_unsafe( w, a_es)){
log_it(L_ERROR, "Already assigned %d (%p), you're doing smth wrong", a_es->socket, a_es);
if(dap_events_socket_check_unsafe( w, l_es_new)){
//log_it(L_ERROR, "Already assigned %d (%p), you're doing smth wrong", l_es_new->socket, l_es_new);
// Socket already present in worker, it's OK
return;
}
......
......@@ -219,11 +219,12 @@ bool s_chain_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg)
}
dap_chain_hash_fast_t l_atom_hash = {};
dap_chain_atom_ptr_t l_atom_copy = l_ch_chain->pkt_data;
uint64_t l_atom_copy_size = l_ch_chain->pkt_data_size;
l_ch_chain->pkt_data = NULL;
l_ch_chain->pkt_data_size = 0;
if( l_atom_copy && l_atom_copy_size){
dap_list_t *l_pkt_copy_list = l_ch_chain->pkt_copy_list;
if (l_pkt_copy_list) {
l_ch_chain->pkt_copy_list = l_ch_chain->pkt_copy_list->next;
dap_chain_pkt_copy_t *l_pkt_copy = (dap_chain_pkt_copy_t *)l_pkt_copy_list->data;
dap_chain_atom_ptr_t l_atom_copy = (dap_chain_atom_ptr_t)l_pkt_copy->pkt_data;
uint64_t l_atom_copy_size = l_pkt_copy->pkt_data_size;
dap_hash_fast(l_atom_copy, l_atom_copy_size, &l_atom_hash);
dap_chain_atom_iter_t *l_atom_iter = l_chain->callback_atom_iter_create(l_chain);
size_t l_atom_size =0;
......@@ -277,8 +278,10 @@ bool s_chain_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg)
DAP_DELETE(l_atom_copy);
}
l_chain->callback_atom_iter_delete(l_atom_iter);
DAP_DELETE(l_pkt_copy);
DAP_DELETE(l_pkt_copy_list);
}else
log_it(L_WARNING, "In proc thread got stream ch packet with pkt_size: %zd and pkt_data: %p", l_atom_copy_size, l_atom_copy);
log_it(L_WARNING, "In proc thread got CHAINS stream ch packet with zero data");
dap_events_socket_assign_on_worker_mt(l_ch->stream->esocket, l_ch->stream_worker->worker);
return true;
}
......@@ -288,93 +291,101 @@ bool s_gdb_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg)
UNUSED(a_thread);
dap_stream_ch_t *l_ch = (dap_stream_ch_t *)a_arg;
dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch);
dap_chain_t * l_chain = dap_chain_find_by_id(l_ch_chain->request_net_id, l_ch_chain->request_chain_id);
size_t l_data_obj_count = 0;
// deserialize data & Parse data from dap_db_log_pack()
dap_store_obj_t *l_store_obj = dap_db_log_unpack(l_ch_chain->pkt_data,l_ch_chain->pkt_data_size, &l_data_obj_count);
//log_it(L_INFO, "In: l_data_obj_count = %d", l_data_obj_count );
for(size_t i = 0; i < l_data_obj_count; i++) {
// timestamp for exist obj
time_t l_timestamp_cur = 0;
// obj to add
dap_store_obj_t* l_obj = l_store_obj + i;
// read item from base;
size_t l_count_read = 0;
dap_store_obj_t *l_read_obj = dap_chain_global_db_driver_read(l_obj->group,
l_obj->key, &l_count_read);
// get timestamp for the exist entry
if(l_read_obj)
l_timestamp_cur = l_read_obj->timestamp;
// get timestamp for the deleted entry
else
{
l_timestamp_cur = global_db_gr_del_get_timestamp(l_obj->group, l_obj->key);
}
dap_list_t *l_pkt_copy_list = l_ch_chain->pkt_copy_list;
if (l_pkt_copy_list) {
l_ch_chain->pkt_copy_list = l_ch_chain->pkt_copy_list->next;
dap_chain_pkt_copy_t *l_pkt_copy = (dap_chain_pkt_copy_t *)l_pkt_copy_list->data;
size_t l_data_obj_count = 0;
// deserialize data & Parse data from dap_db_log_pack()
dap_store_obj_t *l_store_obj = dap_db_log_unpack(l_pkt_copy->pkt_data, l_pkt_copy->pkt_data_size, &l_data_obj_count);
//log_it(L_INFO, "In: l_data_obj_count = %d", l_data_obj_count );
for(size_t i = 0; i < l_data_obj_count; i++) {
// timestamp for exist obj
time_t l_timestamp_cur = 0;
// obj to add
dap_store_obj_t* l_obj = l_store_obj + i;
// read item from base;
size_t l_count_read = 0;
dap_store_obj_t *l_read_obj = dap_chain_global_db_driver_read(l_obj->group,
l_obj->key, &l_count_read);
// get timestamp for the exist entry
if(l_read_obj)
l_timestamp_cur = l_read_obj->timestamp;
// get timestamp for the deleted entry
else
{
l_timestamp_cur = global_db_gr_del_get_timestamp(l_obj->group, l_obj->key);
}
//check whether to apply the received data into the database
bool l_apply = true;
if(l_obj->timestamp < l_timestamp_cur)
l_apply = false;
else if(l_obj->type == 'd') {
// already deleted
if(!l_read_obj)
l_apply = false;
}
else if(l_obj->type == 'a') {
bool l_is_the_same_present = false;
if(l_read_obj &&
l_read_obj->value_len == l_obj->value_len &&
!memcmp(l_read_obj->value, l_obj->value, l_obj->value_len))
l_is_the_same_present = true;
// this data already present in global_db and not obsolete (out of date)
if(l_read_obj && (l_is_the_same_present || l_read_obj->timestamp >= l_store_obj->timestamp))
//check whether to apply the received data into the database
bool l_apply = true;
if(l_obj->timestamp < l_timestamp_cur)
l_apply = false;
}
if(l_read_obj)
dap_store_obj_free(l_read_obj, l_count_read);
else if(l_obj->type == 'd') {
// already deleted
if(!l_read_obj)
l_apply = false;
}
else if(l_obj->type == 'a') {
bool l_is_the_same_present = false;
if(l_read_obj &&
l_read_obj->value_len == l_obj->value_len &&
!memcmp(l_read_obj->value, l_obj->value, l_obj->value_len))
l_is_the_same_present = true;
// this data already present in global_db and not obsolete (out of date)
if(l_read_obj && (l_is_the_same_present || l_read_obj->timestamp >= l_store_obj->timestamp))
l_apply = false;
}
if(l_read_obj)
dap_store_obj_free(l_read_obj, l_count_read);
if(!l_apply) {
// If request was from defined node_addr we update its state
if(l_ch_chain->request.node_addr.uint64) {
dap_db_log_set_last_id_remote(l_ch_chain->request.node_addr.uint64, l_obj->id);
if(!l_apply) {
// If request was from defined node_addr we update its state
if(l_ch_chain->request.node_addr.uint64) {
dap_db_log_set_last_id_remote(l_ch_chain->request.node_addr.uint64, l_obj->id);
}
continue;
}
continue;
}
char l_ts_str[50];
dap_time_to_str_rfc822(l_ts_str, sizeof(l_ts_str), l_store_obj[i].timestamp);
/*log_it(L_DEBUG, "Unpacked log history: type='%c' (0x%02hhX) group=\"%s\" key=\"%s\""
" timestamp=\"%s\" value_len=%u ",
(char ) l_store_obj[i].type, l_store_obj[i].type, l_store_obj[i].group,
l_store_obj[i].key, l_ts_str, l_store_obj[i].value_len);*/
// apply received transaction
dap_chain_t *l_chain = dap_chain_find_by_id(l_ch_chain->request_net_id, l_ch_chain->request_chain_id);
if(l_chain) {
if(l_chain->callback_datums_pool_proc_with_group){
void * restrict l_store_obj_value = l_store_obj->value;
l_chain->callback_datums_pool_proc_with_group(l_chain,
(dap_chain_datum_t** restrict) l_store_obj_value, 1,
l_store_obj[i].group);
char l_ts_str[50];
dap_time_to_str_rfc822(l_ts_str, sizeof(l_ts_str), l_store_obj[i].timestamp);
/*log_it(L_DEBUG, "Unpacked log history: type='%c' (0x%02hhX) group=\"%s\" key=\"%s\""
" timestamp=\"%s\" value_len=%u ",
(char ) l_store_obj[i].type, l_store_obj[i].type, l_store_obj[i].group,
l_store_obj[i].key, l_ts_str, l_store_obj[i].value_len);*/
// apply received transaction
dap_chain_t *l_chain = dap_chain_find_by_id(l_ch_chain->request_net_id, l_ch_chain->request_chain_id);
if(l_chain) {
if(l_chain->callback_datums_pool_proc_with_group){
void * restrict l_store_obj_value = l_store_obj->value;
l_chain->callback_datums_pool_proc_with_group(l_chain,
(dap_chain_datum_t** restrict) l_store_obj_value, 1,
l_store_obj[i].group);
}
}
}
// save data to global_db
if(!dap_chain_global_db_obj_save(l_obj, 1)) {
dap_stream_ch_chain_pkt_write_error(l_ch, l_ch_chain->request_net_id,
l_ch_chain->request_chain_id, l_ch_chain->request_cell_id,
"ERROR_GLOBAL_DB_INTERNAL_NOT_SAVED");
dap_stream_ch_set_ready_to_write_unsafe(l_ch, true);
} else {
// If request was from defined node_addr we update its state
if(l_ch_chain->request.node_addr.uint64) {
dap_db_log_set_last_id_remote(l_ch_chain->request.node_addr.uint64, l_obj->id);
// save data to global_db
if(!dap_chain_global_db_obj_save(l_obj, 1)) {
dap_stream_ch_chain_pkt_write_error(l_ch, l_ch_chain->request_net_id,
l_ch_chain->request_chain_id, l_ch_chain->request_cell_id,
"ERROR_GLOBAL_DB_INTERNAL_NOT_SAVED");
dap_stream_ch_set_ready_to_write_unsafe(l_ch, true);
} else {
// If request was from defined node_addr we update its state
if(l_ch_chain->request.node_addr.uint64) {
dap_db_log_set_last_id_remote(l_ch_chain->request.node_addr.uint64, l_obj->id);
}
//log_it(L_DEBUG, "Added new GLOBAL_DB history pack");
}
//log_it(L_DEBUG, "Added new GLOBAL_DB history pack");
}
if(l_store_obj)
dap_store_obj_free(l_store_obj, l_data_obj_count);
DAP_DELETE(l_pkt_copy);
DAP_DELETE(l_pkt_copy_list);
} else {
log_it(L_WARNING, "In proc thread got GDB stream ch packet with zero data");
}
if(l_store_obj)
dap_store_obj_free(l_store_obj, l_data_obj_count);
dap_events_socket_assign_on_worker_mt(l_ch->stream->esocket, l_ch->stream_worker->worker);
return true;
}
......@@ -516,9 +527,11 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
memcpy(&l_ch_chain->request_net_id, &l_chain_pkt->hdr.net_id, sizeof(dap_chain_net_id_t));
memcpy(&l_ch_chain->request_chain_id, &l_chain_pkt->hdr.chain_id, sizeof(dap_chain_id_t));
memcpy(&l_ch_chain->request_cell_id, &l_chain_pkt->hdr.cell_id, sizeof(dap_chain_cell_id_t));
l_ch_chain->pkt_data = DAP_NEW_SIZE(byte_t, l_chain_pkt_data_size);
memcpy(l_ch_chain->pkt_data, l_chain_pkt->data, l_chain_pkt_data_size);
l_ch_chain->pkt_data_size = l_chain_pkt_data_size;
dap_chain_pkt_copy_t *l_pkt_copy = DAP_NEW_Z(dap_chain_pkt_copy_t);
l_pkt_copy->pkt_data = DAP_NEW_Z_SIZE(byte_t, l_chain_pkt_data_size);
memcpy(l_pkt_copy->pkt_data, l_chain_pkt->data, l_chain_pkt_data_size);
l_pkt_copy->pkt_data_size = l_chain_pkt_data_size;
l_ch_chain->pkt_copy_list = dap_list_append(l_ch_chain->pkt_copy_list, l_pkt_copy);
dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker);
dap_proc_queue_add_callback(a_ch->stream_worker->worker, s_chain_pkt_callback, a_ch);
} else {
......@@ -545,9 +558,11 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
memcpy(&l_ch_chain->request_net_id, &l_chain_pkt->hdr.net_id, sizeof(dap_chain_net_id_t));
memcpy(&l_ch_chain->request_chain_id, &l_chain_pkt->hdr.chain_id, sizeof(dap_chain_id_t));
memcpy(&l_ch_chain->request_cell_id, &l_chain_pkt->hdr.cell_id, sizeof(dap_chain_cell_id_t));
l_ch_chain->pkt_data = DAP_CALLOC(1, l_chain_pkt_data_size);
memcpy(l_ch_chain->pkt_data, l_chain_pkt->data, l_chain_pkt_data_size);
l_ch_chain->pkt_data_size = l_chain_pkt_data_size;
dap_chain_pkt_copy_t *l_pkt_copy = DAP_NEW_Z(dap_chain_pkt_copy_t);
l_pkt_copy->pkt_data = DAP_NEW_Z_SIZE(byte_t, l_chain_pkt_data_size);
memcpy(l_pkt_copy->pkt_data, l_chain_pkt->data, l_chain_pkt_data_size);
l_pkt_copy->pkt_data_size = l_chain_pkt_data_size;
l_ch_chain->pkt_copy_list = dap_list_append(l_ch_chain->pkt_copy_list, l_pkt_copy);
dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker);
dap_proc_queue_add_callback(a_ch->stream_worker->worker, s_gdb_pkt_callback, a_ch);
} else {
......
......@@ -45,6 +45,11 @@ typedef struct dap_chain_atom_item{
UT_hash_handle hh;
} dap_chain_atom_item_t;
typedef struct dap_chain_pkt_copy {
uint64_t pkt_data_size;
byte_t *pkt_data;
} dap_chain_pkt_copy_t;
typedef struct dap_stream_ch_chain {
pthread_mutex_t mutex;
dap_stream_ch_t * ch;
......@@ -54,8 +59,7 @@ typedef struct dap_stream_ch_chain {
dap_stream_ch_chain_state_t state;
dap_chain_atom_iter_t * request_atom_iter;
byte_t *pkt_data;
uint64_t pkt_data_size;
dap_list_t *pkt_copy_list;
uint64_t stats_request_atoms_processed;
uint64_t stats_request_gdb_processed;
dap_stream_ch_chain_sync_request_t request;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment