diff --git a/CMakeLists.txt b/CMakeLists.txt index 3e0af01c28027db875c2b7403ff673d354ee1529..da537d9eb808ccf7fae2568c460cf4a0f6b31844 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,7 +2,7 @@ project(cellframe-sdk C) cmake_minimum_required(VERSION 3.0) set(CMAKE_C_STANDARD 11) -set(CELLFRAME_SDK_NATIVE_VERSION "2.8-21") +set(CELLFRAME_SDK_NATIVE_VERSION "2.8-22") add_definitions ("-DCELLFRAME_SDK_VERSION=\"${CELLFRAME_SDK_NATIVE_VERSION}\"") set(DAPSDK_MODULES "") diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c index fac3c794a9d376afc6715976a2eb9f3b8bb2ae17..b4b4825a75e99dfd6a0884af93b6ce41e36677de 100644 --- a/modules/channel/chain/dap_stream_ch_chain.c +++ b/modules/channel/chain/dap_stream_ch_chain.c @@ -103,6 +103,8 @@ static void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg); static void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg); static bool s_sync_out_chains_proc_callback(dap_proc_thread_t *a_thread, void *a_arg); +static void s_sync_out_chains_last_worker_callback(dap_worker_t *a_worker, void *a_arg); +static void s_sync_out_chains_first_worker_callback(dap_worker_t *a_worker, void *a_arg); static bool s_sync_out_gdb_proc_callback(dap_proc_thread_t *a_thread, void *a_arg); static void s_sync_out_gdb_first_gdb_worker_callback(dap_worker_t *a_worker, void *a_arg); @@ -1348,7 +1350,9 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) bool l_was_sent_smth=false; // Process one chain from l_ch_chain->request_atom_iter // Pack loop to skip quicker - for(uint_fast16_t k=0; k<s_skip_in_reactor_count && l_ch_chain->request_atom_iter && l_ch_chain->request_atom_iter->cur; k++){ + for(uint_fast16_t k=0; k<s_skip_in_reactor_count && + l_ch_chain->request_atom_iter && + l_ch_chain->request_atom_iter->cur; k++){ // Check if present and skip if present dap_stream_ch_chain_hash_item_t * l_hash_item = NULL; HASH_FIND(hh,l_ch_chain->remote_atoms, l_ch_chain->request_atom_iter->cur_hash , sizeof (l_hash_item->hash), l_hash_item ); @@ -1361,10 +1365,10 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) } }else{ l_hash_item = DAP_NEW_Z(dap_stream_ch_chain_hash_item_t); + dap_hash_fast(l_ch_chain->request_atom_iter->cur, l_ch_chain->request_atom_iter->cur_size, + &l_hash_item->hash); if(s_debug_more){ - dap_hash_fast(l_ch_chain->request_atom_iter->cur, l_ch_chain->request_atom_iter->cur_size,&l_hash_item->hash); char *l_atom_hash_str= dap_chain_hash_fast_to_str_new(&l_hash_item->hash); - log_it(L_INFO, "Out CHAIN pkt: atom hash %s (size %zd) ", l_atom_hash_str, l_ch_chain->request_atom_iter->cur_size); DAP_DELETE(l_atom_hash_str); } @@ -1375,9 +1379,21 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) l_ch_chain->stats_request_atoms_processed++; l_hash_item->size = l_ch_chain->request_atom_iter->cur_size; - // Because we sent this atom to remote - we record it to not to send it twice - HASH_ADD(hh, l_ch_chain->remote_atoms, hash, sizeof (l_hash_item->hash), l_hash_item); - //break; // If sent smth - break out from pack loop + + unsigned l_hash_item_hashv =0; + dap_stream_ch_chain_hash_item_t *l_hash_item_check = NULL; + + HASH_VALUE(&l_hash_item->hash ,sizeof (l_hash_item->hash), + l_hash_item_hashv); + HASH_FIND_BYHASHVALUE(hh, l_ch_chain->remote_atoms,&l_hash_item->hash ,sizeof (l_hash_item->hash), + l_hash_item_hashv, l_hash_item_check); + if (l_hash_item_check ==NULL ){ + // Because we sent this atom to remote - we record it to not to send it twice + HASH_ADD_BYHASHVALUE(hh, l_ch_chain->remote_atoms, hash, sizeof (l_hash_item->hash),l_hash_item_hashv, + l_hash_item); + }else + DAP_DELETE(l_hash_item); + } // Then get next atom and populate new last l_ch_chain->request_atom_iter->chain->callback_atom_iter_get_next(l_ch_chain->request_atom_iter, NULL); diff --git a/modules/type/dag/dap_chain_cs_dag.c b/modules/type/dag/dap_chain_cs_dag.c index 67a2dd52f61e11a5972bbd7814830267d4f1e74d..63534655a853b094fdc23e6e346b2f35e6b67153 100644 --- a/modules/type/dag/dap_chain_cs_dag.c +++ b/modules/type/dag/dap_chain_cs_dag.c @@ -1130,11 +1130,12 @@ static dap_chain_atom_ptr_t s_chain_callback_atom_iter_get_next( dap_chain_atom_ a_atom_iter->cur = l_event_item ? l_event_item->event : NULL; a_atom_iter->cur_size = a_atom_iter->cur ? l_event_item->event_size : 0; a_atom_iter->cur_hash = l_event_item ? &l_event_item->hash : NULL; - } - if(a_atom_size) - *a_atom_size = a_atom_iter->cur_size; + if(a_atom_size) + *a_atom_size = a_atom_iter->cur_size; + return a_atom_iter->cur; + }else + return NULL; - return a_atom_iter->cur; } /**