Skip to content
Snippets Groups Projects
Commit c6ce2255 authored by Roman Khlopkov's avatar Roman Khlopkov 🔜
Browse files

[*] Atom copies stored in list now

parent 03b95a26
No related branches found
No related tags found
2 merge requests!220bugs-4547,!219bugs-4547
Pipeline #5641 passed with stage
in 13 seconds
...@@ -455,8 +455,8 @@ static void s_queue_new_es_callback( dap_events_socket_t * a_es, void * a_arg) ...@@ -455,8 +455,8 @@ static void s_queue_new_es_callback( dap_events_socket_t * a_es, void * a_arg)
dap_events_socket_t * l_es_new =(dap_events_socket_t *) a_arg; dap_events_socket_t * l_es_new =(dap_events_socket_t *) a_arg;
dap_worker_t * w = a_es->worker; dap_worker_t * w = a_es->worker;
//log_it(L_DEBUG, "Received event socket %p to add on worker", l_es_new); //log_it(L_DEBUG, "Received event socket %p to add on worker", l_es_new);
if(dap_events_socket_check_unsafe( w, a_es)){ if(dap_events_socket_check_unsafe( w, l_es_new)){
log_it(L_ERROR, "Already assigned %d (%p), you're doing smth wrong", a_es->socket, a_es); log_it(L_ERROR, "Already assigned %d (%p), you're doing smth wrong", l_es_new->socket, l_es_new);
return; return;
} }
......
...@@ -219,11 +219,12 @@ bool s_chain_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) ...@@ -219,11 +219,12 @@ bool s_chain_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg)
} }
dap_chain_hash_fast_t l_atom_hash = {}; dap_chain_hash_fast_t l_atom_hash = {};
dap_chain_atom_ptr_t l_atom_copy = l_ch_chain->pkt_data; dap_list_t *l_pkt_copy_list = l_ch_chain->pkt_copy_list;
uint64_t l_atom_copy_size = l_ch_chain->pkt_data_size; if (l_pkt_copy_list) {
l_ch_chain->pkt_data = NULL; l_ch_chain->pkt_copy_list = l_ch_chain->pkt_copy_list->next;
l_ch_chain->pkt_data_size = 0; dap_chain_pkt_copy_t *l_pkt_copy = (dap_chain_pkt_copy_t *)l_pkt_copy_list->data;
if( l_atom_copy && l_atom_copy_size){ dap_chain_atom_ptr_t l_atom_copy = (dap_chain_atom_ptr_t)l_pkt_copy->pkt_data;
uint64_t l_atom_copy_size = l_pkt_copy->pkt_data_size;
dap_hash_fast(l_atom_copy, l_atom_copy_size, &l_atom_hash); dap_hash_fast(l_atom_copy, l_atom_copy_size, &l_atom_hash);
dap_chain_atom_iter_t *l_atom_iter = l_chain->callback_atom_iter_create(l_chain); dap_chain_atom_iter_t *l_atom_iter = l_chain->callback_atom_iter_create(l_chain);
size_t l_atom_size =0; size_t l_atom_size =0;
...@@ -277,8 +278,10 @@ bool s_chain_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) ...@@ -277,8 +278,10 @@ bool s_chain_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg)
DAP_DELETE(l_atom_copy); DAP_DELETE(l_atom_copy);
} }
l_chain->callback_atom_iter_delete(l_atom_iter); l_chain->callback_atom_iter_delete(l_atom_iter);
DAP_DELETE(l_pkt_copy);
DAP_DELETE(l_pkt_copy_list);
}else }else
log_it(L_WARNING, "In proc thread got stream ch packet with pkt_size: %zd and pkt_data: %p", l_atom_copy_size, l_atom_copy); log_it(L_WARNING, "In proc thread got CHAINS stream ch packet with zero data");
dap_events_socket_assign_on_worker_mt(l_ch->stream->esocket, l_ch->stream_worker->worker); dap_events_socket_assign_on_worker_mt(l_ch->stream->esocket, l_ch->stream_worker->worker);
return true; return true;
} }
...@@ -288,93 +291,101 @@ bool s_gdb_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) ...@@ -288,93 +291,101 @@ bool s_gdb_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg)
UNUSED(a_thread); UNUSED(a_thread);
dap_stream_ch_t *l_ch = (dap_stream_ch_t *)a_arg; dap_stream_ch_t *l_ch = (dap_stream_ch_t *)a_arg;
dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch);
dap_chain_t * l_chain = dap_chain_find_by_id(l_ch_chain->request_net_id, l_ch_chain->request_chain_id);
size_t l_data_obj_count = 0; dap_list_t *l_pkt_copy_list = l_ch_chain->pkt_copy_list;
// deserialize data & Parse data from dap_db_log_pack() if (l_pkt_copy_list) {
dap_store_obj_t *l_store_obj = dap_db_log_unpack(l_ch_chain->pkt_data,l_ch_chain->pkt_data_size, &l_data_obj_count); l_ch_chain->pkt_copy_list = l_ch_chain->pkt_copy_list->next;
//log_it(L_INFO, "In: l_data_obj_count = %d", l_data_obj_count ); dap_chain_pkt_copy_t *l_pkt_copy = (dap_chain_pkt_copy_t *)l_pkt_copy_list->data;
size_t l_data_obj_count = 0;
for(size_t i = 0; i < l_data_obj_count; i++) { // deserialize data & Parse data from dap_db_log_pack()
// timestamp for exist obj dap_store_obj_t *l_store_obj = dap_db_log_unpack(l_pkt_copy->pkt_data, l_pkt_copy->pkt_data_size, &l_data_obj_count);
time_t l_timestamp_cur = 0; //log_it(L_INFO, "In: l_data_obj_count = %d", l_data_obj_count );
// obj to add
dap_store_obj_t* l_obj = l_store_obj + i; for(size_t i = 0; i < l_data_obj_count; i++) {
// read item from base; // timestamp for exist obj
size_t l_count_read = 0; time_t l_timestamp_cur = 0;
dap_store_obj_t *l_read_obj = dap_chain_global_db_driver_read(l_obj->group, // obj to add
l_obj->key, &l_count_read); dap_store_obj_t* l_obj = l_store_obj + i;
// get timestamp for the exist entry // read item from base;
if(l_read_obj) size_t l_count_read = 0;
l_timestamp_cur = l_read_obj->timestamp; dap_store_obj_t *l_read_obj = dap_chain_global_db_driver_read(l_obj->group,
// get timestamp for the deleted entry l_obj->key, &l_count_read);
else // get timestamp for the exist entry
{ if(l_read_obj)
l_timestamp_cur = global_db_gr_del_get_timestamp(l_obj->group, l_obj->key); l_timestamp_cur = l_read_obj->timestamp;
} // get timestamp for the deleted entry
else
{
l_timestamp_cur = global_db_gr_del_get_timestamp(l_obj->group, l_obj->key);
}
//check whether to apply the received data into the database //check whether to apply the received data into the database
bool l_apply = true; bool l_apply = true;
if(l_obj->timestamp < l_timestamp_cur) if(l_obj->timestamp < l_timestamp_cur)
l_apply = false;
else if(l_obj->type == 'd') {
// already deleted
if(!l_read_obj)
l_apply = false;
}
else if(l_obj->type == 'a') {
bool l_is_the_same_present = false;
if(l_read_obj &&
l_read_obj->value_len == l_obj->value_len &&
!memcmp(l_read_obj->value, l_obj->value, l_obj->value_len))
l_is_the_same_present = true;
// this data already present in global_db and not obsolete (out of date)
if(l_read_obj && (l_is_the_same_present || l_read_obj->timestamp >= l_store_obj->timestamp))
l_apply = false; l_apply = false;
} else if(l_obj->type == 'd') {
if(l_read_obj) // already deleted
dap_store_obj_free(l_read_obj, l_count_read); if(!l_read_obj)
l_apply = false;
}
else if(l_obj->type == 'a') {
bool l_is_the_same_present = false;
if(l_read_obj &&
l_read_obj->value_len == l_obj->value_len &&
!memcmp(l_read_obj->value, l_obj->value, l_obj->value_len))
l_is_the_same_present = true;
// this data already present in global_db and not obsolete (out of date)
if(l_read_obj && (l_is_the_same_present || l_read_obj->timestamp >= l_store_obj->timestamp))
l_apply = false;
}
if(l_read_obj)
dap_store_obj_free(l_read_obj, l_count_read);
if(!l_apply) { if(!l_apply) {
// If request was from defined node_addr we update its state // If request was from defined node_addr we update its state
if(l_ch_chain->request.node_addr.uint64) { if(l_ch_chain->request.node_addr.uint64) {
dap_db_log_set_last_id_remote(l_ch_chain->request.node_addr.uint64, l_obj->id); dap_db_log_set_last_id_remote(l_ch_chain->request.node_addr.uint64, l_obj->id);
}
continue;
} }
continue;
}
char l_ts_str[50]; char l_ts_str[50];
dap_time_to_str_rfc822(l_ts_str, sizeof(l_ts_str), l_store_obj[i].timestamp); dap_time_to_str_rfc822(l_ts_str, sizeof(l_ts_str), l_store_obj[i].timestamp);
/*log_it(L_DEBUG, "Unpacked log history: type='%c' (0x%02hhX) group=\"%s\" key=\"%s\"" /*log_it(L_DEBUG, "Unpacked log history: type='%c' (0x%02hhX) group=\"%s\" key=\"%s\""
" timestamp=\"%s\" value_len=%u ", " timestamp=\"%s\" value_len=%u ",
(char ) l_store_obj[i].type, l_store_obj[i].type, l_store_obj[i].group, (char ) l_store_obj[i].type, l_store_obj[i].type, l_store_obj[i].group,
l_store_obj[i].key, l_ts_str, l_store_obj[i].value_len);*/ l_store_obj[i].key, l_ts_str, l_store_obj[i].value_len);*/
// apply received transaction // apply received transaction
dap_chain_t *l_chain = dap_chain_find_by_id(l_ch_chain->request_net_id, l_ch_chain->request_chain_id); dap_chain_t *l_chain = dap_chain_find_by_id(l_ch_chain->request_net_id, l_ch_chain->request_chain_id);
if(l_chain) { if(l_chain) {
if(l_chain->callback_datums_pool_proc_with_group){ if(l_chain->callback_datums_pool_proc_with_group){
void * restrict l_store_obj_value = l_store_obj->value; void * restrict l_store_obj_value = l_store_obj->value;
l_chain->callback_datums_pool_proc_with_group(l_chain, l_chain->callback_datums_pool_proc_with_group(l_chain,
(dap_chain_datum_t** restrict) l_store_obj_value, 1, (dap_chain_datum_t** restrict) l_store_obj_value, 1,
l_store_obj[i].group); l_store_obj[i].group);
}
} }
} // save data to global_db
// save data to global_db if(!dap_chain_global_db_obj_save(l_obj, 1)) {
if(!dap_chain_global_db_obj_save(l_obj, 1)) { dap_stream_ch_chain_pkt_write_error(l_ch, l_ch_chain->request_net_id,
dap_stream_ch_chain_pkt_write_error(l_ch, l_ch_chain->request_net_id, l_ch_chain->request_chain_id, l_ch_chain->request_cell_id,
l_ch_chain->request_chain_id, l_ch_chain->request_cell_id, "ERROR_GLOBAL_DB_INTERNAL_NOT_SAVED");
"ERROR_GLOBAL_DB_INTERNAL_NOT_SAVED"); dap_stream_ch_set_ready_to_write_unsafe(l_ch, true);
dap_stream_ch_set_ready_to_write_unsafe(l_ch, true); } else {
} else { // If request was from defined node_addr we update its state
// If request was from defined node_addr we update its state if(l_ch_chain->request.node_addr.uint64) {
if(l_ch_chain->request.node_addr.uint64) { dap_db_log_set_last_id_remote(l_ch_chain->request.node_addr.uint64, l_obj->id);
dap_db_log_set_last_id_remote(l_ch_chain->request.node_addr.uint64, l_obj->id); }
//log_it(L_DEBUG, "Added new GLOBAL_DB history pack");
} }
//log_it(L_DEBUG, "Added new GLOBAL_DB history pack");
} }
if(l_store_obj)
dap_store_obj_free(l_store_obj, l_data_obj_count);
DAP_DELETE(l_pkt_copy);
DAP_DELETE(l_pkt_copy_list);
} else {
log_it(L_WARNING, "In proc thread got GDB stream ch packet with zero data");
} }
if(l_store_obj)
dap_store_obj_free(l_store_obj, l_data_obj_count);
dap_events_socket_assign_on_worker_mt(l_ch->stream->esocket, l_ch->stream_worker->worker); dap_events_socket_assign_on_worker_mt(l_ch->stream->esocket, l_ch->stream_worker->worker);
return true; return true;
} }
...@@ -516,9 +527,11 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) ...@@ -516,9 +527,11 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
memcpy(&l_ch_chain->request_net_id, &l_chain_pkt->hdr.net_id, sizeof(dap_chain_net_id_t)); memcpy(&l_ch_chain->request_net_id, &l_chain_pkt->hdr.net_id, sizeof(dap_chain_net_id_t));
memcpy(&l_ch_chain->request_chain_id, &l_chain_pkt->hdr.chain_id, sizeof(dap_chain_id_t)); memcpy(&l_ch_chain->request_chain_id, &l_chain_pkt->hdr.chain_id, sizeof(dap_chain_id_t));
memcpy(&l_ch_chain->request_cell_id, &l_chain_pkt->hdr.cell_id, sizeof(dap_chain_cell_id_t)); memcpy(&l_ch_chain->request_cell_id, &l_chain_pkt->hdr.cell_id, sizeof(dap_chain_cell_id_t));
l_ch_chain->pkt_data = DAP_NEW_SIZE(byte_t, l_chain_pkt_data_size); dap_chain_pkt_copy_t *l_pkt_copy = DAP_NEW_Z(dap_chain_pkt_copy_t);
memcpy(l_ch_chain->pkt_data, l_chain_pkt->data, l_chain_pkt_data_size); l_pkt_copy->pkt_data = DAP_NEW_Z_SIZE(byte_t, l_chain_pkt_data_size);
l_ch_chain->pkt_data_size = l_chain_pkt_data_size; memcpy(l_pkt_copy->pkt_data, l_chain_pkt->data, l_chain_pkt_data_size);
l_pkt_copy->pkt_data_size = l_chain_pkt_data_size;
l_ch_chain->pkt_copy_list = dap_list_append(l_ch_chain->pkt_copy_list, l_pkt_copy);
dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker); dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker);
dap_proc_queue_add_callback(a_ch->stream_worker->worker, s_chain_pkt_callback, a_ch); dap_proc_queue_add_callback(a_ch->stream_worker->worker, s_chain_pkt_callback, a_ch);
} else { } else {
...@@ -545,9 +558,11 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) ...@@ -545,9 +558,11 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
memcpy(&l_ch_chain->request_net_id, &l_chain_pkt->hdr.net_id, sizeof(dap_chain_net_id_t)); memcpy(&l_ch_chain->request_net_id, &l_chain_pkt->hdr.net_id, sizeof(dap_chain_net_id_t));
memcpy(&l_ch_chain->request_chain_id, &l_chain_pkt->hdr.chain_id, sizeof(dap_chain_id_t)); memcpy(&l_ch_chain->request_chain_id, &l_chain_pkt->hdr.chain_id, sizeof(dap_chain_id_t));
memcpy(&l_ch_chain->request_cell_id, &l_chain_pkt->hdr.cell_id, sizeof(dap_chain_cell_id_t)); memcpy(&l_ch_chain->request_cell_id, &l_chain_pkt->hdr.cell_id, sizeof(dap_chain_cell_id_t));
l_ch_chain->pkt_data = DAP_CALLOC(1, l_chain_pkt_data_size); dap_chain_pkt_copy_t *l_pkt_copy = DAP_NEW_Z(dap_chain_pkt_copy_t);
memcpy(l_ch_chain->pkt_data, l_chain_pkt->data, l_chain_pkt_data_size); l_pkt_copy->pkt_data = DAP_NEW_Z_SIZE(byte_t, l_chain_pkt_data_size);
l_ch_chain->pkt_data_size = l_chain_pkt_data_size; memcpy(l_pkt_copy->pkt_data, l_chain_pkt->data, l_chain_pkt_data_size);
l_pkt_copy->pkt_data_size = l_chain_pkt_data_size;
l_ch_chain->pkt_copy_list = dap_list_append(l_ch_chain->pkt_copy_list, l_pkt_copy);
dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker); dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker);
dap_proc_queue_add_callback(a_ch->stream_worker->worker, s_gdb_pkt_callback, a_ch); dap_proc_queue_add_callback(a_ch->stream_worker->worker, s_gdb_pkt_callback, a_ch);
} else { } else {
......
...@@ -45,6 +45,11 @@ typedef struct dap_chain_atom_item{ ...@@ -45,6 +45,11 @@ typedef struct dap_chain_atom_item{
UT_hash_handle hh; UT_hash_handle hh;
} dap_chain_atom_item_t; } dap_chain_atom_item_t;
typedef struct dap_chain_pkt_copy {
uint64_t pkt_data_size;
byte_t *pkt_data;
} dap_chain_pkt_copy_t;
typedef struct dap_stream_ch_chain { typedef struct dap_stream_ch_chain {
pthread_mutex_t mutex; pthread_mutex_t mutex;
dap_stream_ch_t * ch; dap_stream_ch_t * ch;
...@@ -54,8 +59,7 @@ typedef struct dap_stream_ch_chain { ...@@ -54,8 +59,7 @@ typedef struct dap_stream_ch_chain {
dap_stream_ch_chain_state_t state; dap_stream_ch_chain_state_t state;
dap_chain_atom_iter_t * request_atom_iter; dap_chain_atom_iter_t * request_atom_iter;
byte_t *pkt_data; dap_list_t *pkt_copy_list;
uint64_t pkt_data_size;
uint64_t stats_request_atoms_processed; uint64_t stats_request_atoms_processed;
uint64_t stats_request_gdb_processed; uint64_t stats_request_gdb_processed;
dap_stream_ch_chain_sync_request_t request; dap_stream_ch_chain_sync_request_t request;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment