diff --git a/CMakeLists.txt b/CMakeLists.txt index 9fd7506ccae17033d0a5c7b5cacc1487dd687955..4b4350d04b463d9f91e5a85ca2f1df415655f275 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,7 +2,7 @@ project(cellframe-sdk C) cmake_minimum_required(VERSION 2.8) set(CMAKE_C_STANDARD 11) -set(CELLFRAME_SDK_NATIVE_VERSION "2.5-17") +set(CELLFRAME_SDK_NATIVE_VERSION "2.5-18") add_definitions ("-DCELLFRAME_SDK_VERSION=\"${CELLFRAME_SDK_NATIVE_VERSION}\"") set(DAPSDK_MODULES "") diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c index 1d6eb6a2d6f28c2ec9e75f885f077a0e4285278b..c38416f2aea8c34a8b36bccd8e7483d027310184 100644 --- a/modules/channel/chain/dap_stream_ch_chain.c +++ b/modules/channel/chain/dap_stream_ch_chain.c @@ -131,32 +131,10 @@ bool s_sync_chains_callback(dap_proc_thread_t *a_thread, void *a_arg) dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); dap_chain_t * l_chain = dap_chain_find_by_id(l_ch_chain->request_net_id, l_ch_chain->request_chain_id); - - dap_chain_atom_ptr_t * l_lasts = NULL; - size_t *l_lasts_sizes = NULL; - size_t l_lasts_count = 0; - dap_chain_atom_iter_t* l_iter = l_chain->callback_atom_iter_create(l_chain); - l_ch_chain->request_atom_iter = l_iter; - l_lasts = l_chain->callback_atom_iter_get_lasts(l_iter, &l_lasts_count, &l_lasts_sizes); - log_it(L_INFO, "Found %d last atoms for synchronization", l_lasts_count); - if (l_lasts && l_lasts_sizes) { - for(long int i = 0; i < l_lasts_count; i++) { - dap_chain_atom_item_t * l_item = NULL; - dap_chain_hash_fast_t l_atom_hash; - dap_hash_fast(l_lasts[i], l_lasts_sizes[i], &l_atom_hash); - pthread_mutex_lock(&l_ch_chain->mutex); - HASH_FIND(hh, l_ch_chain->request_atoms_lasts, &l_atom_hash, sizeof(l_atom_hash), - l_item); - if(l_item == NULL) { // Not found, add new lasts - l_item = DAP_NEW_Z(dap_chain_atom_item_t); - l_item->atom = l_lasts[i]; - l_item->atom_size = l_lasts_sizes[i]; - memcpy(&l_item->atom_hash, &l_atom_hash, sizeof(l_atom_hash)); - HASH_ADD(hh, l_ch_chain->request_atoms_lasts, atom_hash, sizeof(l_atom_hash), - l_item); - } - pthread_mutex_unlock(&l_ch_chain->mutex); - } + l_ch_chain->request_atom_iter = l_chain->callback_atom_iter_create(l_chain); + size_t l_first_size = 0; + dap_chain_atom_ptr_t *l_first = l_chain->callback_atom_iter_get_first(l_ch_chain->request_atom_iter, &l_first_size); + if (l_first && l_first_size) { // first packet l_ch_chain->state = CHAIN_STATE_SYNC_CHAINS; dap_chain_node_addr_t l_node_addr = { 0 }; @@ -168,15 +146,13 @@ bool s_sync_chains_callback(dap_proc_thread_t *a_thread, void *a_arg) } else { // last packet - dap_stream_ch_chain_sync_request_t l_request; - memset(&l_request,0,sizeof (l_request)); + dap_stream_ch_chain_sync_request_t l_request = {}; dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS, l_ch_chain->request_net_id, l_ch_chain->request_chain_id, l_ch_chain->request_cell_id, &l_request, sizeof(l_request)); + DAP_DEL_Z(l_ch_chain->request_atom_iter); l_ch_chain->state = CHAIN_STATE_IDLE; } - DAP_DELETE(l_lasts); - DAP_DELETE(l_iter); dap_stream_ch_set_ready_to_write_unsafe(l_ch, true); dap_events_socket_assign_on_worker_mt(l_ch->stream->esocket, l_ch->stream_worker->worker); return true; @@ -603,15 +579,7 @@ void dap_stream_ch_chain_go_idle ( dap_stream_ch_chain_t * a_ch_chain) memset(&a_ch_chain->request_cell_id, 0, sizeof(a_ch_chain->request_cell_id)); memset(&a_ch_chain->request_chain_id, 0, sizeof(a_ch_chain->request_chain_id)); memset(&a_ch_chain->request_last_ts, 0, sizeof(a_ch_chain->request_last_ts)); - - dap_chain_atom_item_t *l_atom_item = NULL, *l_atom_item_tmp = NULL; - pthread_mutex_lock(&a_ch_chain->mutex); - HASH_ITER( hh,a_ch_chain->request_atoms_lasts, l_atom_item, l_atom_item_tmp) - HASH_DEL(a_ch_chain->request_atoms_lasts, l_atom_item); - - HASH_ITER( hh, a_ch_chain->request_atoms_processed, l_atom_item, l_atom_item_tmp ) - HASH_DEL(a_ch_chain->request_atoms_processed, l_atom_item); - pthread_mutex_unlock(&a_ch_chain->mutex); + DAP_DEL_Z(a_ch_chain->request_atom_iter); } bool s_process_gdb_iter(dap_stream_ch_t *a_ch) @@ -699,79 +667,26 @@ bool s_out_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) // Synchronize chains case CHAIN_STATE_SYNC_CHAINS: { //log_it(L_DEBUG, "CHAIN_STATE_SYNC_CHAINS"); - dap_chain_t * l_chain = dap_chain_find_by_id(l_ch_chain->request_net_id, l_ch_chain->request_chain_id); - dap_chain_atom_item_t * l_atom_item = NULL, *l_atom_item_tmp = NULL;//, *l_chains_lasts_new = NULL; - if(l_ch_chain->request_atoms_lasts == NULL) { // All chains synced - dap_stream_ch_chain_sync_request_t l_request; - memset(&l_request,0,sizeof (l_request)); - uint8_t l_send_pkt_type = l_ch_chain->state == CHAIN_STATE_SYNC_CHAINS ? - DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS : - DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_ALL; + if (l_ch_chain->request_atom_iter->cur == NULL) { // All chains synced + dap_stream_ch_chain_sync_request_t l_request = {}; // last message - dap_stream_ch_chain_pkt_write_unsafe(l_ch, l_send_pkt_type, + dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS, l_ch_chain->request_net_id, l_ch_chain->request_chain_id, l_ch_chain->request_cell_id, &l_request, sizeof(l_request)); l_packet_out = true; - log_it( L_DEBUG,"Synced: %llu atoms processed", - l_ch_chain->stats_request_atoms_processed); + log_it( L_DEBUG,"Synced: %llu atoms processed", l_ch_chain->stats_request_atoms_processed); dap_stream_ch_chain_go_idle(l_ch_chain); - - if(l_ch_chain->callback_notify_packet_out) - l_ch_chain->callback_notify_packet_out(l_ch_chain, l_send_pkt_type, NULL, 0, l_ch_chain->callback_notify_arg); - }else{ // Process one chain from l_ch_chain->request_atoms_lasts - pthread_mutex_lock(&l_ch_chain->mutex); - HASH_ITER(hh,l_ch_chain->request_atoms_lasts, l_atom_item, l_atom_item_tmp) { - dap_chain_atom_item_t * l_atom_item_proc = NULL; - // Check if its processed already - HASH_FIND(hh, l_ch_chain->request_atoms_processed, &l_atom_item->atom_hash, - sizeof(l_atom_item->atom_hash), l_atom_item_proc); - - if(l_atom_item_proc == NULL) { // If not processed we first store it in special table - l_atom_item_proc = DAP_NEW_Z(dap_chain_atom_item_t); - l_atom_item_proc->atom = l_atom_item->atom; - memcpy(&l_atom_item_proc->atom_hash, &l_atom_item->atom_hash, sizeof(l_atom_item->atom_hash)); - HASH_ADD(hh, l_ch_chain->request_atoms_processed, atom_hash, sizeof(l_atom_item->atom_hash), - l_atom_item_proc); - - // Then flush it out to the remote - size_t l_atom_size = l_atom_item->atom_size; - log_it(L_INFO, "Send one chain packet len=%d", l_atom_size); - dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN, l_ch_chain->request_net_id, - l_ch_chain->request_chain_id, l_ch_chain->request_cell_id, - l_atom_item->atom, l_atom_size); - l_packet_out = true; - l_ch_chain->stats_request_atoms_processed++; - // Then parse links and populate new lasts - size_t l_links_count = 0; - dap_chain_atom_ptr_t * l_links = NULL; - size_t *l_links_sizes = NULL; - dap_chain_atom_iter_t* l_iter = l_chain->callback_atom_iter_create_from(l_chain, l_atom_item->atom, l_atom_item->atom_size); - l_links = l_chain->callback_atom_iter_get_links(l_iter, &l_links_count, &l_links_sizes); - DAP_DELETE(l_iter); - if (l_links && l_links_sizes) { - for(size_t i = 0; i < l_links_count; i++) { // Find links - dap_chain_atom_item_t * l_link_item = NULL; - dap_chain_hash_fast_t l_link_hash; - dap_hash_fast(l_links[i], l_links_sizes[i], &l_link_hash); - // Check link in processed atims - HASH_FIND(hh, l_ch_chain->request_atoms_processed, &l_link_hash, sizeof(l_link_hash), l_link_item); - if(l_link_item == NULL) { // Not found, add new lasts - l_link_item = DAP_NEW_Z(dap_chain_atom_item_t); - l_link_item->atom = l_links[i];// do not use memory cause it will be deleted - l_link_item->atom_size = l_links_sizes[i]; - memcpy(&l_link_item->atom_hash, &l_link_hash, sizeof(l_link_hash)); - HASH_ADD(hh, l_ch_chain->request_atoms_lasts, atom_hash, sizeof(l_link_hash), l_link_item); - } - } - DAP_DELETE(l_links); - } - HASH_DEL(l_ch_chain->request_atoms_lasts, l_atom_item); - break; - } else { - HASH_DEL(l_ch_chain->request_atoms_lasts, l_atom_item); - } - } - pthread_mutex_unlock(&l_ch_chain->mutex); + if (l_ch_chain->callback_notify_packet_out) + l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS, NULL, + 0, l_ch_chain->callback_notify_arg); + } else { // Process one chain from l_ch_chain->request_atom_iter + dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN, l_ch_chain->request_net_id, + l_ch_chain->request_chain_id, l_ch_chain->request_cell_id, + l_ch_chain->request_atom_iter->cur, l_ch_chain->request_atom_iter->cur_size); + l_packet_out = true; + l_ch_chain->stats_request_atoms_processed++; + // Then get next atom and populate new last + l_ch_chain->request_atom_iter->chain->callback_atom_iter_get_next(l_ch_chain->request_atom_iter, NULL); } } break; default: break; diff --git a/modules/channel/chain/include/dap_stream_ch_chain.h b/modules/channel/chain/include/dap_stream_ch_chain.h index 9e576b7656c964ae5dbe01f999b38f7dcb6fedb5..17992a7de03552522bdae7834a2ddf6703cf74a4 100644 --- a/modules/channel/chain/include/dap_stream_ch_chain.h +++ b/modules/channel/chain/include/dap_stream_ch_chain.h @@ -54,8 +54,6 @@ typedef struct dap_stream_ch_chain { dap_stream_ch_chain_state_t state; dap_chain_atom_iter_t * request_atom_iter; - dap_chain_atom_item_t * request_atoms_lasts; - dap_chain_atom_item_t * request_atoms_processed; uint8_t *pkt_data; uint64_t pkt_data_size; uint64_t stats_request_atoms_processed;