Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • cellframe/cellframe-sdk
  • MIKA83/cellframe-sdk
2 results
Show changes
Commits on Source (2)
......@@ -455,8 +455,9 @@ static void s_queue_new_es_callback( dap_events_socket_t * a_es, void * a_arg)
dap_events_socket_t * l_es_new =(dap_events_socket_t *) a_arg;
dap_worker_t * w = a_es->worker;
//log_it(L_DEBUG, "Received event socket %p to add on worker", l_es_new);
if(dap_events_socket_check_unsafe( w, a_es)){
log_it(L_ERROR, "Already assigned %d (%p), you're doing smth wrong", a_es->socket, a_es);
if(dap_events_socket_check_unsafe( w, l_es_new)){
//log_it(L_ERROR, "Already assigned %d (%p), you're doing smth wrong", l_es_new->socket, l_es_new);
// Socket already present in worker, it's OK
return;
}
......
......@@ -219,11 +219,12 @@ bool s_chain_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg)
}
dap_chain_hash_fast_t l_atom_hash = {};
dap_chain_atom_ptr_t l_atom_copy = l_ch_chain->pkt_data;
uint64_t l_atom_copy_size = l_ch_chain->pkt_data_size;
l_ch_chain->pkt_data = NULL;
l_ch_chain->pkt_data_size = 0;
if( l_atom_copy && l_atom_copy_size){
dap_list_t *l_pkt_copy_list = l_ch_chain->pkt_copy_list;
if (l_pkt_copy_list) {
l_ch_chain->pkt_copy_list = l_ch_chain->pkt_copy_list->next;
dap_chain_pkt_copy_t *l_pkt_copy = (dap_chain_pkt_copy_t *)l_pkt_copy_list->data;
dap_chain_atom_ptr_t l_atom_copy = (dap_chain_atom_ptr_t)l_pkt_copy->pkt_data;
uint64_t l_atom_copy_size = l_pkt_copy->pkt_data_size;
dap_hash_fast(l_atom_copy, l_atom_copy_size, &l_atom_hash);
dap_chain_atom_iter_t *l_atom_iter = l_chain->callback_atom_iter_create(l_chain);
size_t l_atom_size =0;
......@@ -277,8 +278,10 @@ bool s_chain_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg)
DAP_DELETE(l_atom_copy);
}
l_chain->callback_atom_iter_delete(l_atom_iter);
DAP_DELETE(l_pkt_copy);
DAP_DELETE(l_pkt_copy_list);
}else
log_it(L_WARNING, "In proc thread got stream ch packet with pkt_size: %zd and pkt_data: %p", l_atom_copy_size, l_atom_copy);
log_it(L_WARNING, "In proc thread got CHAINS stream ch packet with zero data");
dap_events_socket_assign_on_worker_mt(l_ch->stream->esocket, l_ch->stream_worker->worker);
return true;
}
......@@ -288,93 +291,101 @@ bool s_gdb_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg)
UNUSED(a_thread);
dap_stream_ch_t *l_ch = (dap_stream_ch_t *)a_arg;
dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch);
dap_chain_t * l_chain = dap_chain_find_by_id(l_ch_chain->request_net_id, l_ch_chain->request_chain_id);
size_t l_data_obj_count = 0;
// deserialize data & Parse data from dap_db_log_pack()
dap_store_obj_t *l_store_obj = dap_db_log_unpack(l_ch_chain->pkt_data,l_ch_chain->pkt_data_size, &l_data_obj_count);
//log_it(L_INFO, "In: l_data_obj_count = %d", l_data_obj_count );
for(size_t i = 0; i < l_data_obj_count; i++) {
// timestamp for exist obj
time_t l_timestamp_cur = 0;
// obj to add
dap_store_obj_t* l_obj = l_store_obj + i;
// read item from base;
size_t l_count_read = 0;
dap_store_obj_t *l_read_obj = dap_chain_global_db_driver_read(l_obj->group,
l_obj->key, &l_count_read);
// get timestamp for the exist entry
if(l_read_obj)
l_timestamp_cur = l_read_obj->timestamp;
// get timestamp for the deleted entry
else
{
l_timestamp_cur = global_db_gr_del_get_timestamp(l_obj->group, l_obj->key);
}
dap_list_t *l_pkt_copy_list = l_ch_chain->pkt_copy_list;
if (l_pkt_copy_list) {
l_ch_chain->pkt_copy_list = l_ch_chain->pkt_copy_list->next;
dap_chain_pkt_copy_t *l_pkt_copy = (dap_chain_pkt_copy_t *)l_pkt_copy_list->data;
size_t l_data_obj_count = 0;
// deserialize data & Parse data from dap_db_log_pack()
dap_store_obj_t *l_store_obj = dap_db_log_unpack(l_pkt_copy->pkt_data, l_pkt_copy->pkt_data_size, &l_data_obj_count);
//log_it(L_INFO, "In: l_data_obj_count = %d", l_data_obj_count );
for(size_t i = 0; i < l_data_obj_count; i++) {
// timestamp for exist obj
time_t l_timestamp_cur = 0;
// obj to add
dap_store_obj_t* l_obj = l_store_obj + i;
// read item from base;
size_t l_count_read = 0;
dap_store_obj_t *l_read_obj = dap_chain_global_db_driver_read(l_obj->group,
l_obj->key, &l_count_read);
// get timestamp for the exist entry
if(l_read_obj)
l_timestamp_cur = l_read_obj->timestamp;
// get timestamp for the deleted entry
else
{
l_timestamp_cur = global_db_gr_del_get_timestamp(l_obj->group, l_obj->key);
}
//check whether to apply the received data into the database
bool l_apply = true;
if(l_obj->timestamp < l_timestamp_cur)
l_apply = false;
else if(l_obj->type == 'd') {
// already deleted
if(!l_read_obj)
l_apply = false;
}
else if(l_obj->type == 'a') {
bool l_is_the_same_present = false;
if(l_read_obj &&
l_read_obj->value_len == l_obj->value_len &&
!memcmp(l_read_obj->value, l_obj->value, l_obj->value_len))
l_is_the_same_present = true;
// this data already present in global_db and not obsolete (out of date)
if(l_read_obj && (l_is_the_same_present || l_read_obj->timestamp >= l_store_obj->timestamp))
//check whether to apply the received data into the database
bool l_apply = true;
if(l_obj->timestamp < l_timestamp_cur)
l_apply = false;
}
if(l_read_obj)
dap_store_obj_free(l_read_obj, l_count_read);
else if(l_obj->type == 'd') {
// already deleted
if(!l_read_obj)
l_apply = false;
}
else if(l_obj->type == 'a') {
bool l_is_the_same_present = false;
if(l_read_obj &&
l_read_obj->value_len == l_obj->value_len &&
!memcmp(l_read_obj->value, l_obj->value, l_obj->value_len))
l_is_the_same_present = true;
// this data already present in global_db and not obsolete (out of date)
if(l_read_obj && (l_is_the_same_present || l_read_obj->timestamp >= l_store_obj->timestamp))
l_apply = false;
}
if(l_read_obj)
dap_store_obj_free(l_read_obj, l_count_read);
if(!l_apply) {
// If request was from defined node_addr we update its state
if(l_ch_chain->request.node_addr.uint64) {
dap_db_log_set_last_id_remote(l_ch_chain->request.node_addr.uint64, l_obj->id);
if(!l_apply) {
// If request was from defined node_addr we update its state
if(l_ch_chain->request.node_addr.uint64) {
dap_db_log_set_last_id_remote(l_ch_chain->request.node_addr.uint64, l_obj->id);
}
continue;
}
continue;
}
char l_ts_str[50];
dap_time_to_str_rfc822(l_ts_str, sizeof(l_ts_str), l_store_obj[i].timestamp);
/*log_it(L_DEBUG, "Unpacked log history: type='%c' (0x%02hhX) group=\"%s\" key=\"%s\""
" timestamp=\"%s\" value_len=%u ",
(char ) l_store_obj[i].type, l_store_obj[i].type, l_store_obj[i].group,
l_store_obj[i].key, l_ts_str, l_store_obj[i].value_len);*/
// apply received transaction
dap_chain_t *l_chain = dap_chain_find_by_id(l_ch_chain->request_net_id, l_ch_chain->request_chain_id);
if(l_chain) {
if(l_chain->callback_datums_pool_proc_with_group){
void * restrict l_store_obj_value = l_store_obj->value;
l_chain->callback_datums_pool_proc_with_group(l_chain,
(dap_chain_datum_t** restrict) l_store_obj_value, 1,
l_store_obj[i].group);
char l_ts_str[50];
dap_time_to_str_rfc822(l_ts_str, sizeof(l_ts_str), l_store_obj[i].timestamp);
/*log_it(L_DEBUG, "Unpacked log history: type='%c' (0x%02hhX) group=\"%s\" key=\"%s\""
" timestamp=\"%s\" value_len=%u ",
(char ) l_store_obj[i].type, l_store_obj[i].type, l_store_obj[i].group,
l_store_obj[i].key, l_ts_str, l_store_obj[i].value_len);*/
// apply received transaction
dap_chain_t *l_chain = dap_chain_find_by_id(l_ch_chain->request_net_id, l_ch_chain->request_chain_id);
if(l_chain) {
if(l_chain->callback_datums_pool_proc_with_group){
void * restrict l_store_obj_value = l_store_obj->value;
l_chain->callback_datums_pool_proc_with_group(l_chain,
(dap_chain_datum_t** restrict) l_store_obj_value, 1,
l_store_obj[i].group);
}
}
}
// save data to global_db
if(!dap_chain_global_db_obj_save(l_obj, 1)) {
dap_stream_ch_chain_pkt_write_error(l_ch, l_ch_chain->request_net_id,
l_ch_chain->request_chain_id, l_ch_chain->request_cell_id,
"ERROR_GLOBAL_DB_INTERNAL_NOT_SAVED");
dap_stream_ch_set_ready_to_write_unsafe(l_ch, true);
} else {
// If request was from defined node_addr we update its state
if(l_ch_chain->request.node_addr.uint64) {
dap_db_log_set_last_id_remote(l_ch_chain->request.node_addr.uint64, l_obj->id);
// save data to global_db
if(!dap_chain_global_db_obj_save(l_obj, 1)) {
dap_stream_ch_chain_pkt_write_error(l_ch, l_ch_chain->request_net_id,
l_ch_chain->request_chain_id, l_ch_chain->request_cell_id,
"ERROR_GLOBAL_DB_INTERNAL_NOT_SAVED");
dap_stream_ch_set_ready_to_write_unsafe(l_ch, true);
} else {
// If request was from defined node_addr we update its state
if(l_ch_chain->request.node_addr.uint64) {
dap_db_log_set_last_id_remote(l_ch_chain->request.node_addr.uint64, l_obj->id);
}
//log_it(L_DEBUG, "Added new GLOBAL_DB history pack");
}
//log_it(L_DEBUG, "Added new GLOBAL_DB history pack");
}
if(l_store_obj)
dap_store_obj_free(l_store_obj, l_data_obj_count);
DAP_DELETE(l_pkt_copy);
DAP_DELETE(l_pkt_copy_list);
} else {
log_it(L_WARNING, "In proc thread got GDB stream ch packet with zero data");
}
if(l_store_obj)
dap_store_obj_free(l_store_obj, l_data_obj_count);
dap_events_socket_assign_on_worker_mt(l_ch->stream->esocket, l_ch->stream_worker->worker);
return true;
}
......@@ -516,9 +527,11 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
memcpy(&l_ch_chain->request_net_id, &l_chain_pkt->hdr.net_id, sizeof(dap_chain_net_id_t));
memcpy(&l_ch_chain->request_chain_id, &l_chain_pkt->hdr.chain_id, sizeof(dap_chain_id_t));
memcpy(&l_ch_chain->request_cell_id, &l_chain_pkt->hdr.cell_id, sizeof(dap_chain_cell_id_t));
l_ch_chain->pkt_data = DAP_NEW_SIZE(byte_t, l_chain_pkt_data_size);
memcpy(l_ch_chain->pkt_data, l_chain_pkt->data, l_chain_pkt_data_size);
l_ch_chain->pkt_data_size = l_chain_pkt_data_size;
dap_chain_pkt_copy_t *l_pkt_copy = DAP_NEW_Z(dap_chain_pkt_copy_t);
l_pkt_copy->pkt_data = DAP_NEW_Z_SIZE(byte_t, l_chain_pkt_data_size);
memcpy(l_pkt_copy->pkt_data, l_chain_pkt->data, l_chain_pkt_data_size);
l_pkt_copy->pkt_data_size = l_chain_pkt_data_size;
l_ch_chain->pkt_copy_list = dap_list_append(l_ch_chain->pkt_copy_list, l_pkt_copy);
dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker);
dap_proc_queue_add_callback(a_ch->stream_worker->worker, s_chain_pkt_callback, a_ch);
} else {
......@@ -545,9 +558,11 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
memcpy(&l_ch_chain->request_net_id, &l_chain_pkt->hdr.net_id, sizeof(dap_chain_net_id_t));
memcpy(&l_ch_chain->request_chain_id, &l_chain_pkt->hdr.chain_id, sizeof(dap_chain_id_t));
memcpy(&l_ch_chain->request_cell_id, &l_chain_pkt->hdr.cell_id, sizeof(dap_chain_cell_id_t));
l_ch_chain->pkt_data = DAP_CALLOC(1, l_chain_pkt_data_size);
memcpy(l_ch_chain->pkt_data, l_chain_pkt->data, l_chain_pkt_data_size);
l_ch_chain->pkt_data_size = l_chain_pkt_data_size;
dap_chain_pkt_copy_t *l_pkt_copy = DAP_NEW_Z(dap_chain_pkt_copy_t);
l_pkt_copy->pkt_data = DAP_NEW_Z_SIZE(byte_t, l_chain_pkt_data_size);
memcpy(l_pkt_copy->pkt_data, l_chain_pkt->data, l_chain_pkt_data_size);
l_pkt_copy->pkt_data_size = l_chain_pkt_data_size;
l_ch_chain->pkt_copy_list = dap_list_append(l_ch_chain->pkt_copy_list, l_pkt_copy);
dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker);
dap_proc_queue_add_callback(a_ch->stream_worker->worker, s_gdb_pkt_callback, a_ch);
} else {
......
......@@ -45,6 +45,11 @@ typedef struct dap_chain_atom_item{
UT_hash_handle hh;
} dap_chain_atom_item_t;
typedef struct dap_chain_pkt_copy {
uint64_t pkt_data_size;
byte_t *pkt_data;
} dap_chain_pkt_copy_t;
typedef struct dap_stream_ch_chain {
pthread_mutex_t mutex;
dap_stream_ch_t * ch;
......@@ -54,8 +59,7 @@ typedef struct dap_stream_ch_chain {
dap_stream_ch_chain_state_t state;
dap_chain_atom_iter_t * request_atom_iter;
byte_t *pkt_data;
uint64_t pkt_data_size;
dap_list_t *pkt_copy_list;
uint64_t stats_request_atoms_processed;
uint64_t stats_request_gdb_processed;
dap_stream_ch_chain_sync_request_t request;
......