diff --git a/CMakeLists.txt b/CMakeLists.txt index 67bf9ca3bcafcdf56420cfc08733a682c1b5af6b..485fcabbfc1aa39987cb74fb6e9ba35d292c0372 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,7 +2,7 @@ project(cellframe-sdk C) cmake_minimum_required(VERSION 2.8) set(CMAKE_C_STANDARD 11) -set(CELLFRAME_SDK_NATIVE_VERSION "2.8-10") +set(CELLFRAME_SDK_NATIVE_VERSION "2.8-11") add_definitions ("-DCELLFRAME_SDK_VERSION=\"${CELLFRAME_SDK_NATIVE_VERSION}\"") set(DAPSDK_MODULES "") diff --git a/modules/channel/chain-net/dap_stream_ch_chain_net.c b/modules/channel/chain-net/dap_stream_ch_chain_net.c index 6f3027e8a90f40b88e5c9ffbd635727adfd774e3..acb6cd7523fcfa15086d519b505426d6a77f4879 100644 --- a/modules/channel/chain-net/dap_stream_ch_chain_net.c +++ b/modules/channel/chain-net/dap_stream_ch_chain_net.c @@ -326,5 +326,4 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) */ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) { - dap_stream_ch_set_ready_to_write_unsafe(a_ch, false); } diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c index f0e739c476f8a719f504f527b5a966973631d6b5..ff171b10f67037bacf826d468d4d782fa80ab80f 100644 --- a/modules/channel/chain/dap_stream_ch_chain.c +++ b/modules/channel/chain/dap_stream_ch_chain.c @@ -115,7 +115,7 @@ static void s_gdb_in_pkt_error_worker_callback(dap_worker_t *a_thread, void *a_a static bool s_debug_more=false; static uint_fast16_t s_update_pack_size=100; // Number of hashes packed into the one packet -static uint_fast16_t s_skip_in_reactor_count=10; // Number of hashes packed to skip in one reactor loop callback out packet +static uint_fast16_t s_skip_in_reactor_count=50; // Number of hashes packed to skip in one reactor loop callback out packet /** * @brief dap_stream_ch_chain_init * @return @@ -856,9 +856,9 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) uint l_count_added=0; uint l_count_total=0; - dap_chain_t * l_chain = dap_chain_find_by_id(l_ch_chain->request_hdr.net_id, l_ch_chain->request_hdr.chain_id); + dap_chain_t * l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id); if (! l_chain){ - log_it(L_ERROR, "Invalid UPDATE_CHAINS request from %s with ext_id %016"DAP_UINT64_FORMAT_x" net id 0x%016"DAP_UINT64_FORMAT_x" chain id 0x%016"DAP_UINT64_FORMAT_x" cell_id 0x%016"DAP_UINT64_FORMAT_x" in packet", a_ch->stream->esocket->remote_addr_str? + log_it(L_ERROR, "Invalid UPDATE_CHAINS packet from %s with ext_id %016"DAP_UINT64_FORMAT_x" net id 0x%016"DAP_UINT64_FORMAT_x" chain id 0x%016"DAP_UINT64_FORMAT_x" cell_id 0x%016"DAP_UINT64_FORMAT_x" in packet", a_ch->stream->esocket->remote_addr_str? a_ch->stream->esocket->remote_addr_str: "<unknown>", l_chain_pkt->hdr.ext_id, l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64); @@ -1192,12 +1192,20 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) */ void dap_stream_ch_chain_go_idle ( dap_stream_ch_chain_t * a_ch_chain) { + a_ch_chain->is_on_request = false; a_ch_chain->state = CHAIN_STATE_IDLE; - //log_it(L_DEBUG, "CHAIN_STATE_IDLE"); + if(s_debug_more) + log_it(L_INFO, "Go in CHAIN_STATE_IDLE"); // Cleanup after request memset(&a_ch_chain->request, 0, sizeof(a_ch_chain->request)); memset(&a_ch_chain->request_hdr, 0, sizeof(a_ch_chain->request_hdr)); + if(a_ch_chain->request_atom_iter) + if(a_ch_chain->request_atom_iter->chain) + if(a_ch_chain->request_atom_iter->chain->callback_atom_iter_delete){ + a_ch_chain->request_atom_iter->chain->callback_atom_iter_delete(a_ch_chain->request_atom_iter); + return; + } DAP_DEL_Z(a_ch_chain->request_atom_iter); } @@ -1316,56 +1324,63 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) // Synchronize chains case CHAIN_STATE_SYNC_CHAINS: { - - if (l_ch_chain->request_atom_iter && l_ch_chain->request_atom_iter->cur) { // Process one chain from l_ch_chain->request_atom_iter - - // Pack loop to skip quicker - for(uint_fast16_t k=0; k<s_skip_in_reactor_count; k++){ - // Check if present and skip if present - dap_stream_ch_chain_hash_item_t * l_hash_item = NULL; - HASH_FIND(hh,l_ch_chain->remote_atoms, l_ch_chain->request_atom_iter->cur_hash , sizeof (l_hash_item->hash), l_hash_item ); - if( l_hash_item ){ // If found - skip it - if(s_debug_more){ - char l_request_atom_hash_str[81]={[0]='\0'}; - dap_chain_hash_fast_to_str(l_ch_chain->request_atom_iter->cur_hash,l_request_atom_hash_str,sizeof (l_request_atom_hash_str)); - log_it(L_DEBUG, "Out CHAIN: skip atom hash %s because its already present in remote atom hash table", - l_request_atom_hash_str); - } - }else{ - l_hash_item = DAP_NEW_Z(dap_stream_ch_chain_hash_item_t); - if(s_debug_more){ - dap_hash_fast(l_ch_chain->request_atom_iter->cur, l_ch_chain->request_atom_iter->cur_size,&l_hash_item->hash); - char *l_atom_hash_str= dap_chain_hash_fast_to_str_new(&l_hash_item->hash); - - log_it(L_INFO, "Out CHAIN pkt: atom hash %s (size %zd) ", l_atom_hash_str, l_ch_chain->request_atom_iter->cur_size); - DAP_DELETE(l_atom_hash_str); - } - dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN, l_ch_chain->request_hdr.net_id.uint64, - l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64, - l_ch_chain->request_atom_iter->cur, l_ch_chain->request_atom_iter->cur_size); - break; // If sent smth - break out from pack loop - l_ch_chain->stats_request_atoms_processed++; - - l_hash_item->size = l_ch_chain->request_atom_iter->cur_size; - // Because we sent this atom to remote - we record it to not to send it twice - HASH_ADD(hh, l_ch_chain->remote_atoms, hash, sizeof (l_hash_item->hash), l_hash_item); + bool l_was_sent_smth=false; + // Process one chain from l_ch_chain->request_atom_iter + // Pack loop to skip quicker + for(uint_fast16_t k=0; k<s_skip_in_reactor_count && l_ch_chain->request_atom_iter && l_ch_chain->request_atom_iter->cur; k++){ + // Check if present and skip if present + dap_stream_ch_chain_hash_item_t * l_hash_item = NULL; + HASH_FIND(hh,l_ch_chain->remote_atoms, l_ch_chain->request_atom_iter->cur_hash , sizeof (l_hash_item->hash), l_hash_item ); + if( l_hash_item ){ // If found - skip it + if(s_debug_more){ + char l_request_atom_hash_str[81]={[0]='\0'}; + dap_chain_hash_fast_to_str(l_ch_chain->request_atom_iter->cur_hash,l_request_atom_hash_str,sizeof (l_request_atom_hash_str)); + log_it(L_DEBUG, "Out CHAIN: skip atom hash %s because its already present in remote atom hash table", + l_request_atom_hash_str); } - // Then get next atom and populate new last - l_ch_chain->request_atom_iter->chain->callback_atom_iter_get_next(l_ch_chain->request_atom_iter, NULL); + }else{ + l_hash_item = DAP_NEW_Z(dap_stream_ch_chain_hash_item_t); + if(s_debug_more){ + dap_hash_fast(l_ch_chain->request_atom_iter->cur, l_ch_chain->request_atom_iter->cur_size,&l_hash_item->hash); + char *l_atom_hash_str= dap_chain_hash_fast_to_str_new(&l_hash_item->hash); + + log_it(L_INFO, "Out CHAIN pkt: atom hash %s (size %zd) ", l_atom_hash_str, l_ch_chain->request_atom_iter->cur_size); + DAP_DELETE(l_atom_hash_str); + } + dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN, l_ch_chain->request_hdr.net_id.uint64, + l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64, + l_ch_chain->request_atom_iter->cur, l_ch_chain->request_atom_iter->cur_size); + l_was_sent_smth = true; + break; // If sent smth - break out from pack loop + l_ch_chain->stats_request_atoms_processed++; + + l_hash_item->size = l_ch_chain->request_atom_iter->cur_size; + // Because we sent this atom to remote - we record it to not to send it twice + HASH_ADD(hh, l_ch_chain->remote_atoms, hash, sizeof (l_hash_item->hash), l_hash_item); } - } else { // All chains synced + // Then get next atom and populate new last + l_ch_chain->request_atom_iter->chain->callback_atom_iter_get_next(l_ch_chain->request_atom_iter, NULL); + } + if(!l_ch_chain->request_atom_iter || + ( l_ch_chain->request_atom_iter &&(! l_ch_chain->request_atom_iter->cur) ) ) { // All chains synced dap_stream_ch_chain_sync_request_t l_request = {0}; // last message + l_was_sent_smth = true; dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS, l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64, &l_request, sizeof(l_request)); log_it( L_INFO,"Synced: %llu atoms processed", l_ch_chain->stats_request_atoms_processed); - l_ch_chain->is_on_request = false; dap_stream_ch_chain_go_idle(l_ch_chain); if (l_ch_chain->callback_notify_packet_out) l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS, NULL, 0, l_ch_chain->callback_notify_arg); } + if (! l_was_sent_smth ){ + // Sending dumb packet with nothing to inform remote thats we're just skiping atoms, nothing freezed + dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_TSD, + l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, + l_ch_chain->request_hdr.cell_id.uint64, NULL, 0); + } } break; default: break; } diff --git a/modules/net/dap_chain_node_client.c b/modules/net/dap_chain_node_client.c index 67b32b6e9eeacfad20d38e8ba175c265c23b3635..d11b8e5cf3b7a3b34ba2b0e1ba094477c0590c26 100644 --- a/modules/net/dap_chain_node_client.c +++ b/modules/net/dap_chain_node_client.c @@ -342,8 +342,9 @@ static void s_ch_chain_callback_notify_packet_in(dap_stream_ch_chain_t* a_ch_cha } // Check if we over with it before - if ( !l_node_client->cur_cell ){ - log_it(L_WARNING, "In: No current cell in sync state, anyway we over it"); + if ( ! l_node_client->cur_cell ){ + if(s_stream_ch_chain_debug_more) + log_it(L_INFO, "In: No current cell in sync state, anyway we over it"); }else l_node_client->cur_cell =(dap_chain_cell_t *) l_node_client->cur_cell->hh.next;