diff --git a/substrate/bin/node-template/node/src/service.rs b/substrate/bin/node-template/node/src/service.rs index c39f363501e5..ff73c21bec07 100644 --- a/substrate/bin/node-template/node/src/service.rs +++ b/substrate/bin/node-template/node/src/service.rs @@ -63,16 +63,18 @@ pub fn new_partial(config: &Configuration) -> Result { }); let telemetry_handle = telemetry.as_ref().map(|t| t.handle()); - let custom_telemetry_worker = CustomTelemetryWorker - { - handle: telemetry_handle, + let custom_telemetry_worker = CustomTelemetryWorker { + handle: telemetry_handle, sampling_interval_ms: 6_000u128, max_interval_buffer_size: 20, max_block_request_buffer_size: 15, + is_authority: config.role.is_authority(), }; - task_manager - .spawn_handle() - .spawn("custom_telemetry", None, custom_telemetry_worker.run(None, None)); + task_manager.spawn_handle().spawn( + "custom_telemetry", + None, + custom_telemetry_worker.run(None, None), + ); let select_chain = sc_consensus::LongestChain::new(backend.clone()); diff --git a/substrate/client/consensus/common/src/import_queue.rs b/substrate/client/consensus/common/src/import_queue.rs index ae012613498e..c64fbf3d01af 100644 --- a/substrate/client/consensus/common/src/import_queue.rs +++ b/substrate/client/consensus/common/src/import_queue.rs @@ -177,8 +177,8 @@ impl BlockImportStatus { /// Returns the imported block number. pub fn number(&self) -> &N { match self { - BlockImportStatus::ImportedKnown(n, _) | - BlockImportStatus::ImportedUnknown(n, _, _) => n, + BlockImportStatus::ImportedKnown(n, _) + | BlockImportStatus::ImportedUnknown(n, _, _) => n, } } } @@ -251,14 +251,14 @@ pub(crate) async fn import_single_block_metered_v2>( import_single_block_metered(import_handle, block_origin, block, verifier, metrics).await; let end_timestamp = BlockMetrics::get_current_timestamp_in_ms_or_default(); - let interval = IntervalWithBlockInformation { - kind: IntervalKind::Import, - block_number, - block_hash, - start_timestamp, - end_timestamp, + match &res { + Ok(BlockImportStatus::ImportedUnknown(_, _, peer_id)) => { + let peer_id = peer_id.clone(); + let value = IntervalDetailsImport { peer_id, start_timestamp, end_timestamp }; + BlockMetrics::observe_interval(block_number, block_hash, value.into()); + }, + _ => (), }; - BlockMetrics::observe_interval(interval); res } @@ -284,7 +284,7 @@ pub(crate) async fn import_single_block_metered>( } else { debug!(target: LOG_TARGET, "Header {} was not provided ", block.hash); } - return Err(BlockImportError::IncompleteHeader(peer)) + return Err(BlockImportError::IncompleteHeader(peer)); }, }; @@ -299,8 +299,9 @@ pub(crate) async fn import_single_block_metered>( trace!(target: LOG_TARGET, "Block already in chain {}: {:?}", number, hash); Ok(BlockImportStatus::ImportedKnown(number, peer)) }, - Ok(ImportResult::Imported(aux)) => - Ok(BlockImportStatus::ImportedUnknown(number, aux, peer)), + Ok(ImportResult::Imported(aux)) => { + Ok(BlockImportStatus::ImportedUnknown(number, aux, peer)) + }, Ok(ImportResult::MissingState) => { debug!( target: LOG_TARGET, diff --git a/substrate/client/consensus/common/src/import_queue/basic_queue.rs b/substrate/client/consensus/common/src/import_queue/basic_queue.rs index 60ceaa44d4dd..38e8463d02c7 100644 --- a/substrate/client/consensus/common/src/import_queue/basic_queue.rs +++ b/substrate/client/consensus/common/src/import_queue/basic_queue.rs @@ -124,7 +124,7 @@ impl BasicQueueHandle { impl ImportQueueService for BasicQueueHandle { fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec>) { if blocks.is_empty() { - return + return; } trace!(target: LOG_TARGET, "Scheduling {} blocks for import", blocks.len()); @@ -192,7 +192,7 @@ impl ImportQueue for BasicQueue { loop { if let Err(_) = self.result_port.next_action(&mut *link).await { log::error!(target: "sync", "poll_actions: Background import task is no longer alive"); - return + return; } } } @@ -235,7 +235,7 @@ async fn block_import_process( target: LOG_TARGET, "Stopping block import because the import channel was closed!", ); - return + return; }, }; @@ -309,26 +309,27 @@ impl BlockImportWorker { target: LOG_TARGET, "Stopping block import because result channel was closed!", ); - return + return; } // Make sure to first process all justifications while let Poll::Ready(justification) = futures::poll!(justification_port.next()) { match justification { - Some(ImportJustification(who, hash, number, justification)) => - worker.import_justification(who, hash, number, justification).await, + Some(ImportJustification(who, hash, number, justification)) => { + worker.import_justification(who, hash, number, justification).await + }, None => { log::debug!( target: LOG_TARGET, "Stopping block import because justification channel was closed!", ); - return + return; }, } } if let Poll::Ready(()) = futures::poll!(&mut block_import_process) { - return + return; } // All futures that we polled are now pending. @@ -422,7 +423,7 @@ async fn import_many_blocks>( Some(b) => b, None => { // No block left to import, success! - return ImportManyBlocksResult { block_count: count, imported, results } + return ImportManyBlocksResult { block_count: count, imported, results }; }, }; diff --git a/substrate/client/consensus/slots/src/lib.rs b/substrate/client/consensus/slots/src/lib.rs index c861f4f967cc..407e22e95a40 100644 --- a/substrate/client/consensus/slots/src/lib.rs +++ b/substrate/client/consensus/slots/src/lib.rs @@ -37,7 +37,7 @@ use futures_timer::Delay; use log::{debug, info, warn}; use sc_consensus::{BlockImport, JustificationSyncLink}; use sc_telemetry::{ - custom_telemetry::{BlockMetrics, IntervalKind, IntervalWithBlockInformation}, + custom_telemetry::{BlockMetrics, IntervalDetailsImport, IntervalDetailsProposal}, telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_INFO, CONSENSUS_WARN, }; use sp_arithmetic::traits::BaseArithmetic; @@ -222,7 +222,7 @@ pub trait SimpleSlotWorker { Either::Left((Err(err), _)) => { warn!(target: log_target, "Proposing failed: {}", err); - return None + return None; }, Either::Right(_) => { info!( @@ -242,7 +242,7 @@ pub trait SimpleSlotWorker { "slot" => *slot, ); - return None + return None; }, }; @@ -268,7 +268,7 @@ pub trait SimpleSlotWorker { err, ); - return None + return None; }, Either::Left(_) => { warn!( @@ -278,7 +278,7 @@ pub trait SimpleSlotWorker { slot_info.chain_head.hash(), ); - return None + return None; }, }; @@ -305,7 +305,7 @@ pub trait SimpleSlotWorker { "Skipping proposal slot {} since there's no time left to propose", slot, ); - return None + return None; } else { Instant::now() + proposing_remaining_duration }; @@ -328,7 +328,7 @@ pub trait SimpleSlotWorker { "err" => ?err, ); - return None + return None; }, }; @@ -336,9 +336,9 @@ pub trait SimpleSlotWorker { let authorities_len = self.authorities_len(&aux_data); - if !self.force_authoring() && - self.sync_oracle().is_offline() && - authorities_len.map(|a| a > 1).unwrap_or(false) + if !self.force_authoring() + && self.sync_oracle().is_offline() + && authorities_len.map(|a| a > 1).unwrap_or(false) { debug!(target: logging_target, "Skipping proposal slot. Waiting for the network."); telemetry!( @@ -348,13 +348,13 @@ pub trait SimpleSlotWorker { "authorities_len" => authorities_len, ); - return None + return None; } let claim = self.claim_slot(&slot_info.chain_head, slot, &aux_data).await?; if self.should_backoff(slot, &slot_info.chain_head) { - return None + return None; } debug!(target: logging_target, "Starting authorship at slot: {slot}"); @@ -375,7 +375,7 @@ pub trait SimpleSlotWorker { "err" => ?err ); - return None + return None; }, }; @@ -402,7 +402,7 @@ pub trait SimpleSlotWorker { Err(err) => { warn!(target: logging_target, "Failed to create block import params: {}", err); - return None + return None; }, }; @@ -452,22 +452,20 @@ pub trait SimpleSlotWorker { } let end_import_timestamp = BlockMetrics::get_current_timestamp_in_ms_or_default(); + let block_number: u64 = header_num.try_into().unwrap_or_default(); + let block_hash = std::format!("{:?}", post_header_hash); // Metrics - let proposal_interval = IntervalWithBlockInformation { - kind: IntervalKind::Proposal, - block_number: header_num.try_into().unwrap_or_default(), - block_hash: std::format!("{:?}", post_header_hash), + let proposal_interval = IntervalDetailsProposal { start_timestamp: start_proposal_timestamp, end_timestamp: end_proposal_timestamp, }; - let import_interval = IntervalWithBlockInformation { + let import_interval = IntervalDetailsImport { + peer_id: None, start_timestamp: start_import_timestamp, end_timestamp: end_import_timestamp, - kind: IntervalKind::Import, - ..proposal_interval.clone() }; - BlockMetrics::observe_interval(proposal_interval); - BlockMetrics::observe_interval(import_interval); + BlockMetrics::observe_interval(block_number, block_hash.clone(), proposal_interval.into()); + BlockMetrics::observe_interval(block_number, block_hash, import_interval.into()); Some(SlotResult { block: B::new(header, body), storage_proof }) } @@ -549,7 +547,7 @@ pub async fn start_slot_worker( if sync_oracle.is_major_syncing() { debug!(target: LOG_TARGET, "Skipping proposal slot due to sync."); - continue + continue; } let _ = worker.on_slot(slot_info).await; @@ -629,7 +627,7 @@ pub fn proposing_remaining_duration( // If parent is genesis block, we don't require any lenience factor. if slot_info.chain_head.number().is_zero() { - return proposing_duration + return proposing_duration; } let parent_slot = match parent_slot { @@ -792,7 +790,7 @@ where ) -> bool { // This should not happen, but we want to keep the previous behaviour if it does. if slot_now <= chain_head_slot { - return false + return false; } // There can be race between getting the finalized number and getting the best number. diff --git a/substrate/client/network/sync/src/strategy/chain_sync.rs b/substrate/client/network/sync/src/strategy/chain_sync.rs index 54603f2428c7..f154e90f9878 100644 --- a/substrate/client/network/sync/src/strategy/chain_sync.rs +++ b/substrate/client/network/sync/src/strategy/chain_sync.rs @@ -49,7 +49,7 @@ use sc_consensus::{BlockImportError, BlockImportStatus, IncomingBlock}; use sc_network_common::sync::message::{ BlockAnnounce, BlockAttributes, BlockData, BlockRequest, BlockResponse, Direction, FromBlock, }; -use sc_telemetry::custom_telemetry::{BlockMetrics, IntervalKind}; +use sc_telemetry::custom_telemetry::{BlockMetrics, IntervalDetailsPartialSync}; use sp_arithmetic::traits::Saturating; use sp_blockchain::{Error as ClientError, HeaderBackend, HeaderMetadata}; use sp_consensus::{BlockOrigin, BlockStatus}; @@ -470,8 +470,9 @@ where /// Notify syncing state machine that a new sync peer has connected. pub fn add_peer(&mut self, peer_id: PeerId, best_hash: B::Hash, best_number: NumberFor) { match self.add_peer_inner(peer_id, best_hash, best_number) { - Ok(Some(request)) => - self.actions.push(ChainSyncAction::SendBlockRequest { peer_id, request }), + Ok(Some(request)) => { + self.actions.push(ChainSyncAction::SendBlockRequest { peer_id, request }) + }, Ok(None) => {}, Err(bad_peer) => self.actions.push(ChainSyncAction::DropPeer(bad_peer)), } @@ -502,7 +503,7 @@ where "💔 New peer {} with unknown genesis hash {} ({}).", peer_id, best_hash, best_number, ); - return Err(BadPeer(peer_id, rep::GENESIS_MISMATCH)) + return Err(BadPeer(peer_id, rep::GENESIS_MISMATCH)); } // If there are more than `MAJOR_SYNC_BLOCKS` in the import queue then we have @@ -526,7 +527,7 @@ where state: PeerSyncState::Available, }, ); - return Ok(None) + return Ok(None); } // If we are at genesis, just start downloading. @@ -572,9 +573,9 @@ where Ok(req) }, - Ok(BlockStatus::Queued) | - Ok(BlockStatus::InChainWithState) | - Ok(BlockStatus::InChainPruned) => { + Ok(BlockStatus::Queued) + | Ok(BlockStatus::InChainWithState) + | Ok(BlockStatus::InChainPruned) => { debug!( target: LOG_TARGET, "New peer {peer_id} with known best hash {best_hash} ({best_number}).", @@ -649,14 +650,14 @@ where if self.is_known(hash) { debug!(target: LOG_TARGET, "Refusing to sync known hash {hash:?}"); - return + return; } trace!(target: LOG_TARGET, "Downloading requested old fork {hash:?}"); for peer_id in &peers { if let Some(peer) = self.peers.get_mut(peer_id) { if let PeerSyncState::AncestorSearch { .. } = peer.state { - continue + continue; } if number > peer.best_number { @@ -702,12 +703,15 @@ where let timestamp = BlockMetrics::get_current_timestamp_in_ms_or_default(); for block in &blocks { if let Some(header) = &block.header { - BlockMetrics::observe_interval_partial( - IntervalKind::Sync, + let value = IntervalDetailsPartialSync { + peer_id: peer_id.clone(), + start_timestamp: None, + end_timestamp: Some(timestamp), + }; + BlockMetrics::observe_interval( header.number().clone().try_into().unwrap_or_default(), std::format!("{:?}", header.hash()), - timestamp, - false, + value.into(), ); } } @@ -760,16 +764,33 @@ where blocks } else { debug!(target: LOG_TARGET, "Unexpected gap block response from {peer_id}"); - return Err(BadPeer(*peer_id, rep::NO_BLOCK)) + return Err(BadPeer(*peer_id, rep::NO_BLOCK)); } }, PeerSyncState::DownloadingStale(_) => { peer.state = PeerSyncState::Available; if blocks.is_empty() { debug!(target: LOG_TARGET, "Empty block response from {peer_id}"); - return Err(BadPeer(*peer_id, rep::NO_BLOCK)) + return Err(BadPeer(*peer_id, rep::NO_BLOCK)); } validate_blocks::(&blocks, peer_id, Some(request))?; + + let timestamp = BlockMetrics::get_current_timestamp_in_ms_or_default(); + for block in &blocks { + if let Some(header) = &block.header { + let value = IntervalDetailsPartialSync { + peer_id: peer_id.clone(), + start_timestamp: None, + end_timestamp: Some(timestamp), + }; + BlockMetrics::observe_interval( + header.number().clone().try_into().unwrap_or_default(), + std::format!("{:?}", header.hash()), + value.into(), + ); + } + } + blocks .into_iter() .map(|b| { @@ -808,19 +829,19 @@ where target: LOG_TARGET, "Invalid response when searching for ancestor from {peer_id}", ); - return Err(BadPeer(*peer_id, rep::UNKNOWN_ANCESTOR)) + return Err(BadPeer(*peer_id, rep::UNKNOWN_ANCESTOR)); }, (_, Err(e)) => { info!( target: LOG_TARGET, "❌ Error answering legitimate blockchain query: {e}", ); - return Err(BadPeer(*peer_id, rep::BLOCKCHAIN_READ_ERROR)) + return Err(BadPeer(*peer_id, rep::BLOCKCHAIN_READ_ERROR)); }, }; if matching_hash.is_some() { - if *start < self.best_queued_number && - self.best_queued_number <= peer.best_number + if *start < self.best_queued_number + && self.best_queued_number <= peer.best_number { // We've made progress on this chain since the search was started. // Opportunistically set common number to updated number @@ -849,7 +870,7 @@ where target: LOG_TARGET, "Ancestry search: genesis mismatch for peer {peer_id}", ); - return Err(BadPeer(*peer_id, rep::GENESIS_MISMATCH)) + return Err(BadPeer(*peer_id, rep::GENESIS_MISMATCH)); } if let Some((next_state, next_num)) = handle_ancestor_search_state(state, *current, matching_hash.is_some()) @@ -864,7 +885,7 @@ where peer_id: *peer_id, request, }); - return Ok(()) + return Ok(()); } else { // Ancestry search is complete. Check if peer is on a stale fork unknown // to us and add it to sync targets if necessary. @@ -878,8 +899,8 @@ where matching_hash, peer.common_number, ); - if peer.common_number < peer.best_number && - peer.best_number < self.best_queued_number + if peer.common_number < peer.best_number + && peer.best_number < self.best_queued_number { trace!( target: LOG_TARGET, @@ -898,12 +919,12 @@ where .insert(*peer_id); } peer.state = PeerSyncState::Available; - return Ok(()) + return Ok(()); } }, - PeerSyncState::Available | - PeerSyncState::DownloadingJustification(..) | - PeerSyncState::DownloadingState => Vec::new(), + PeerSyncState::Available + | PeerSyncState::DownloadingJustification(..) + | PeerSyncState::DownloadingState => Vec::new(), } } else { // When request.is_none() this is a block announcement. Just accept blocks. @@ -931,7 +952,7 @@ where } } else { // We don't know of this peer, so we also did not request anything from it. - return Err(BadPeer(*peer_id, rep::NOT_REQUESTED)) + return Err(BadPeer(*peer_id, rep::NOT_REQUESTED)); }; self.validate_and_queue_blocks(new_blocks, gap); @@ -953,7 +974,7 @@ where target: LOG_TARGET, "💔 Called on_block_justification with a peer ID of an unknown peer", ); - return Ok(()) + return Ok(()); }; self.allowed_requests.add(&peer_id); @@ -970,7 +991,7 @@ where hash, block.hash, ); - return Err(BadPeer(peer_id, rep::BAD_JUSTIFICATION)) + return Err(BadPeer(peer_id, rep::BAD_JUSTIFICATION)); } block @@ -996,7 +1017,7 @@ where number, justifications, }); - return Ok(()) + return Ok(()); } } @@ -1063,6 +1084,7 @@ where ) -> Option<(B::Hash, NumberFor)> { let number = *announce.header.number(); let hash = announce.header.hash(); + let parent_status = self.block_status(announce.header.parent_hash()).unwrap_or(BlockStatus::Unknown); let known_parent = parent_status != BlockStatus::Unknown; @@ -1073,12 +1095,12 @@ where peer } else { error!(target: LOG_TARGET, "💔 Called `on_validated_block_announce` with a bad peer ID"); - return Some((hash, number)) + return Some((hash, number)); }; if let PeerSyncState::AncestorSearch { .. } = peer.state { trace!(target: LOG_TARGET, "Peer {} is in the ancestor search state.", peer_id); - return None + return None; } let peer_info = is_best.then(|| { @@ -1094,8 +1116,8 @@ where if is_best { if known && self.best_queued_number >= number { self.update_peer_common_number(&peer_id, number); - } else if announce.header.parent_hash() == &self.best_queued_hash || - known_parent && self.best_queued_number >= number + } else if announce.header.parent_hash() == &self.best_queued_hash + || known_parent && self.best_queued_number >= number { self.update_peer_common_number(&peer_id, number.saturating_sub(One::one())); } @@ -1108,7 +1130,21 @@ where if let Some(target) = self.fork_targets.get_mut(&hash) { target.peers.insert(peer_id); } - return peer_info + + let summary = announce.summary(); + let now = BlockMetrics::get_current_timestamp_in_ms_or_default(); + let value = IntervalDetailsPartialSync { + peer_id, + start_timestamp: Some(now), + end_timestamp: None, + }; + BlockMetrics::observe_interval( + summary.number.try_into().unwrap_or_default(), + std::format!("{:?}", summary.block_hash), + value.into(), + ); + + return peer_info; } if ancient_parent { @@ -1119,7 +1155,7 @@ where hash, announce.header, ); - return peer_info + return peer_info; } if self.status().state == SyncState::Idle { @@ -1141,12 +1177,16 @@ where .insert(peer_id); let summary = announce.summary(); - BlockMetrics::observe_interval_partial( - IntervalKind::Sync, + let now = BlockMetrics::get_current_timestamp_in_ms_or_default(); + let value = IntervalDetailsPartialSync { + peer_id, + start_timestamp: Some(now), + end_timestamp: None, + }; + BlockMetrics::observe_interval( summary.number.try_into().unwrap_or_default(), std::format!("{:?}", summary.block_hash), - BlockMetrics::get_current_timestamp_in_ms_or_default(), - true, + value.into(), ); } @@ -1220,14 +1260,17 @@ where fn required_block_attributes(&self) -> BlockAttributes { match self.mode { - ChainSyncMode::Full => - BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION | BlockAttributes::BODY, - ChainSyncMode::LightState { storage_chain_mode: false, .. } => - BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION | BlockAttributes::BODY, - ChainSyncMode::LightState { storage_chain_mode: true, .. } => - BlockAttributes::HEADER | - BlockAttributes::JUSTIFICATION | - BlockAttributes::INDEXED_BODY, + ChainSyncMode::Full => { + BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION | BlockAttributes::BODY + }, + ChainSyncMode::LightState { storage_chain_mode: false, .. } => { + BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION | BlockAttributes::BODY + }, + ChainSyncMode::LightState { storage_chain_mode: true, .. } => { + BlockAttributes::HEADER + | BlockAttributes::JUSTIFICATION + | BlockAttributes::INDEXED_BODY + }, } } @@ -1299,7 +1342,7 @@ where for (n, peer) in self.peers.iter_mut() { if let PeerSyncState::AncestorSearch { .. } = peer.state { // Wait for ancestry search to complete first. - continue + continue; } let new_common_number = if peer.best_number >= number { number } else { peer.best_number }; @@ -1349,7 +1392,7 @@ where ); p.common_number = self.best_queued_number; self.peers.insert(peer_id, p); - return + return; } // handle peers that were in other states. @@ -1382,8 +1425,8 @@ where self.best_queued_hash = info.best_hash; self.best_queued_number = info.best_number; - if self.mode == ChainSyncMode::Full && - self.client.block_status(info.best_hash)? != BlockStatus::InChainWithState + if self.mode == ChainSyncMode::Full + && self.client.block_status(info.best_hash)? != BlockStatus::InChainWithState { self.import_existing = true; // Latest state is missing, start with the last finalized state or genesis instead. @@ -1418,7 +1461,7 @@ where /// What is the status of the block corresponding to the given hash? fn block_status(&self, hash: &B::Hash) -> Result { if self.queue_blocks.contains(hash) { - return Ok(BlockStatus::Queued) + return Ok(BlockStatus::Queued); } self.client.block_status(*hash) } @@ -1538,12 +1581,12 @@ where /// Get block requests scheduled by sync to be sent out. fn block_requests(&mut self) -> Vec<(PeerId, BlockRequest)> { if self.allowed_requests.is_empty() || self.state_sync.is_some() { - return Vec::new() + return Vec::new(); } if self.queue_blocks.len() > MAX_IMPORTING_BLOCKS { trace!(target: LOG_TARGET, "Too many blocks in the queue."); - return Vec::new() + return Vec::new(); } let is_major_syncing = self.status().state.is_major_syncing(); let attrs = self.required_block_attributes(); @@ -1562,7 +1605,7 @@ where .iter_mut() .filter_map(move |(&id, peer)| { if !peer.state.is_available() || !allowed_requests.contains(&id) { - return None + return None; } // If our best queued is more than `MAX_BLOCKS_TO_LOOK_BACKWARDS` blocks away from @@ -1570,11 +1613,11 @@ where // common number is smaller than the last finalized block number, we should do an // ancestor search to find a better common block. If the queue is full we wait till // all blocks are imported though. - if best_queued.saturating_sub(peer.common_number) > - MAX_BLOCKS_TO_LOOK_BACKWARDS.into() && - best_queued < peer.best_number && - peer.common_number < last_finalized && - queue.len() <= MAJOR_SYNC_BLOCKS.into() + if best_queued.saturating_sub(peer.common_number) + > MAX_BLOCKS_TO_LOOK_BACKWARDS.into() + && best_queued < peer.best_number + && peer.common_number < last_finalized + && queue.len() <= MAJOR_SYNC_BLOCKS.into() { trace!( target: LOG_TARGET, @@ -1659,17 +1702,17 @@ where /// Get a state request scheduled by sync to be sent out (if any). fn state_request(&mut self) -> Option<(PeerId, OpaqueStateRequest)> { if self.allowed_requests.is_empty() { - return None + return None; } - if self.state_sync.is_some() && - self.peers.iter().any(|(_, peer)| peer.state == PeerSyncState::DownloadingState) + if self.state_sync.is_some() + && self.peers.iter().any(|(_, peer)| peer.state == PeerSyncState::DownloadingState) { // Only one pending state request is allowed. - return None + return None; } if let Some(sync) = &self.state_sync { if sync.is_complete() { - return None + return None; } for (id, peer) in self.peers.iter_mut() { @@ -1678,7 +1721,7 @@ where let request = sync.next_request(); trace!(target: LOG_TARGET, "New StateRequest for {}: {:?}", id, request); self.allowed_requests.clear(); - return Some((*id, OpaqueStateRequest(Box::new(request)))) + return Some((*id, OpaqueStateRequest(Box::new(request)))); } } } @@ -1717,7 +1760,7 @@ where sync.import(*response) } else { debug!(target: LOG_TARGET, "Ignored obsolete state response from {peer_id}"); - return Err(BadPeer(*peer_id, rep::NOT_REQUESTED)) + return Err(BadPeer(*peer_id, rep::NOT_REQUESTED)); }; match import_result { @@ -1769,16 +1812,17 @@ where } for (result, hash) in results { if has_error { - break + break; } has_error |= result.is_err(); match result { - Ok(BlockImportStatus::ImportedKnown(number, peer_id)) => + Ok(BlockImportStatus::ImportedKnown(number, peer_id)) => { if let Some(peer) = peer_id { self.update_peer_common_number(&peer, number); - }, + } + }, Ok(BlockImportStatus::ImportedUnknown(number, aux, peer_id)) => { if aux.clear_justification_requests { trace!( @@ -1831,7 +1875,7 @@ where self.gap_sync = None; } }, - Err(BlockImportError::IncompleteHeader(peer_id)) => + Err(BlockImportError::IncompleteHeader(peer_id)) => { if let Some(peer) = peer_id { warn!( target: LOG_TARGET, @@ -1840,7 +1884,8 @@ where self.actions .push(ChainSyncAction::DropPeer(BadPeer(peer, rep::INCOMPLETE_HEADER))); self.restart(); - }, + } + }, Err(BlockImportError::VerificationFailed(peer_id, e)) => { let extra_message = peer_id .map_or_else(|| "".into(), |peer| format!(" received from ({peer})")); @@ -1857,14 +1902,15 @@ where self.restart(); }, - Err(BlockImportError::BadBlock(peer_id)) => + Err(BlockImportError::BadBlock(peer_id)) => { if let Some(peer) = peer_id { warn!( target: LOG_TARGET, "💔 Block {hash:?} received from peer {peer} has been blacklisted", ); self.actions.push(ChainSyncAction::DropPeer(BadPeer(peer, rep::BAD_BLOCK))); - }, + } + }, Err(BlockImportError::MissingState) => { // This may happen if the chain we were requesting upon has been discarded // in the meantime because other chain has been finalized. @@ -1968,7 +2014,7 @@ fn handle_ancestor_search_state( if block_hash_match && next_distance_to_tip == One::one() { // We found the ancestor in the first step so there is no need to execute binary // search. - return None + return None; } if block_hash_match { let left = curr_block_num; @@ -1987,7 +2033,7 @@ fn handle_ancestor_search_state( }, AncestorSearchState::BinarySearch(mut left, mut right) => { if left >= curr_block_num { - return None + return None; } if block_hash_match { left = curr_block_num; @@ -2018,7 +2064,7 @@ fn peer_block_request( ) -> Option<(Range>, BlockRequest)> { if best_num >= peer.best_number { // Will be downloaded as alternative fork instead. - return None + return None; } else if peer.common_number < finalized { trace!( target: LOG_TARGET, @@ -2106,7 +2152,7 @@ fn fork_sync_request( hash, r.number, ); - return false + return false; } if check_block(hash) != BlockStatus::Unknown { trace!( @@ -2115,18 +2161,18 @@ fn fork_sync_request( hash, r.number, ); - return false + return false; } true }); for (hash, r) in targets { if !r.peers.contains(&id) { - continue + continue; } // Download the fork only if it is behind or not too far ahead our tip of the chain // Otherwise it should be downloaded in full sync mode. - if r.number <= best_num || - (r.number - best_num).saturated_into::() < max_blocks_per_request as u32 + if r.number <= best_num + || (r.number - best_num).saturated_into::() < max_blocks_per_request as u32 { let parent_status = r.parent_hash.as_ref().map_or(BlockStatus::Unknown, check_block); let count = if parent_status == BlockStatus::Unknown { @@ -2148,7 +2194,7 @@ fn fork_sync_request( direction: Direction::Descending, max: Some(count), }, - )) + )); } else { trace!(target: LOG_TARGET, "Fork too far in the future: {:?} (#{})", hash, r.number); } @@ -2167,7 +2213,7 @@ where T: HeaderMetadata + ?Sized, { if base == block { - return Ok(false) + return Ok(false); } let ancestor = sp_blockchain::lowest_common_ancestor(client, *block, *base)?; @@ -2194,7 +2240,7 @@ pub fn validate_blocks( blocks.len(), ); - return Err(BadPeer(*peer_id, rep::NOT_REQUESTED)) + return Err(BadPeer(*peer_id, rep::NOT_REQUESTED)); } let block_header = @@ -2214,18 +2260,18 @@ pub fn validate_blocks( block_header, ); - return Err(BadPeer(*peer_id, rep::NOT_REQUESTED)) + return Err(BadPeer(*peer_id, rep::NOT_REQUESTED)); } - if request.fields.contains(BlockAttributes::HEADER) && - blocks.iter().any(|b| b.header.is_none()) + if request.fields.contains(BlockAttributes::HEADER) + && blocks.iter().any(|b| b.header.is_none()) { trace!( target: LOG_TARGET, "Missing requested header for a block in response from {peer_id}.", ); - return Err(BadPeer(*peer_id, rep::BAD_RESPONSE)) + return Err(BadPeer(*peer_id, rep::BAD_RESPONSE)); } if request.fields.contains(BlockAttributes::BODY) && blocks.iter().any(|b| b.body.is_none()) @@ -2235,7 +2281,7 @@ pub fn validate_blocks( "Missing requested body for a block in response from {peer_id}.", ); - return Err(BadPeer(*peer_id, rep::BAD_RESPONSE)) + return Err(BadPeer(*peer_id, rep::BAD_RESPONSE)); } } @@ -2250,7 +2296,7 @@ pub fn validate_blocks( b.hash, hash, ); - return Err(BadPeer(*peer_id, rep::BAD_BLOCK)) + return Err(BadPeer(*peer_id, rep::BAD_BLOCK)); } } if let (Some(header), Some(body)) = (&b.header, &b.body) { @@ -2268,7 +2314,7 @@ pub fn validate_blocks( expected, got, ); - return Err(BadPeer(*peer_id, rep::BAD_BLOCK)) + return Err(BadPeer(*peer_id, rep::BAD_BLOCK)); } } } diff --git a/substrate/client/telemetry/src/custom_telemetry.rs b/substrate/client/telemetry/src/custom_telemetry.rs index d10263a5b253..c947a3b05cd6 100644 --- a/substrate/client/telemetry/src/custom_telemetry.rs +++ b/substrate/client/telemetry/src/custom_telemetry.rs @@ -5,10 +5,12 @@ // use std::{ + collections::BTreeMap, sync::Mutex, time::{Duration, SystemTimeError}, }; +use libp2p::PeerId; use serde::Serialize; use wasm_timer::{SystemTime, UNIX_EPOCH}; @@ -30,15 +32,71 @@ pub enum IntervalKind { Import = 2, } -/// Interval information bundled together with block information. -#[derive(Serialize, Clone)] -pub struct IntervalWithBlockInformation { +/// +#[derive(Debug, Clone, Default)] +pub struct BlockIntervals { + proposal: Option, + import: Option, + sync: Option, + partial_syncs: Vec, +} + +/// +#[derive(Debug, Clone)] +pub enum IntervalDetails { + /// + Proposal(IntervalDetailsProposal), + /// + Import(IntervalDetailsImport), + /// + Sync(IntervalDetailsSync), + /// + PartialSync(IntervalDetailsPartialSync), +} + +impl From for IntervalDetails { + fn from(value: IntervalDetailsProposal) -> Self { + Self::Proposal(value) + } +} + +impl From for IntervalDetails { + fn from(value: IntervalDetailsImport) -> Self { + Self::Import(value) + } +} + +impl From for IntervalDetails { + fn from(value: IntervalDetailsPartialSync) -> Self { + Self::PartialSync(value) + } +} + +/// +#[derive(Debug, Clone)] +pub struct IntervalDetailsProposal { + /// + pub start_timestamp: u64, /// - pub kind: IntervalKind, + pub end_timestamp: u64, +} + +/// +#[derive(Debug, Clone)] +pub struct IntervalDetailsImport { /// - pub block_number: u64, + pub peer_id: Option, /// - pub block_hash: String, + pub start_timestamp: u64, + /// + pub end_timestamp: u64, +} + +/// +#[derive(Debug, Clone)] +pub struct IntervalDetailsSync { + /// + pub peer_id: PeerId, /// pub start_timestamp: u64, /// @@ -46,7 +104,18 @@ pub struct IntervalWithBlockInformation { } /// -#[derive(Serialize)] +#[derive(Debug, Clone)] +pub struct IntervalDetailsPartialSync { + /// + pub peer_id: PeerId, + /// + pub start_timestamp: Option, + /// + pub end_timestamp: Option, +} + +/// +#[derive(Serialize, Debug, Clone)] pub struct BlockRequestsDetail { /// pub current_queue_size: u32, @@ -56,13 +125,13 @@ pub struct BlockRequestsDetail { pub time_frame: u64, } +const MAX_BLOCKS_PER_HEIGHT: usize = 50; + /// -#[derive(Default)] +#[derive(Default, Debug, Clone)] pub struct BlockMetrics { /// - intervals: Vec, - /// - partial_intervals: Vec, + intervals: Option>>, /// block_requests: Vec, /// @@ -71,80 +140,133 @@ pub struct BlockMetrics { max_block_request_buffer_size: usize, } +static BLOCK_METRICS: Mutex = Mutex::new(BlockMetrics::new()); + impl BlockMetrics { /// pub const fn new() -> Self { Self { - intervals: Vec::new(), - partial_intervals: Vec::new(), + intervals: None, block_requests: Vec::new(), max_interval_buffer_size: 0, max_block_request_buffer_size: 0, } } -} - -static BLOCK_METRICS: Mutex = Mutex::new(BlockMetrics::new()); -impl BlockMetrics { /// - pub fn observe_interval(value: IntervalWithBlockInformation) { - let Ok(mut lock) = BLOCK_METRICS.lock() else { - return; + pub fn take_intervals(&mut self) -> Vec<(u64, BTreeMap)> { + let Some(intervals) = &mut self.intervals else { + return Vec::new(); }; - lock.intervals.push(value); + let mut data: Vec<(u64, BTreeMap)> = Vec::new(); - if lock.intervals.len() > lock.max_interval_buffer_size { - lock.intervals.remove(0); + while let Some((_, blocks)) = intervals.first_key_value() { + if intervals.len() > 1 || Self::is_interval_ready(blocks) { + if let Some(interval) = intervals.pop_first() { + data.push(interval) + } else { + break; + } + } else { + break; + } } + + data } /// - pub fn observe_interval_partial( - kind: IntervalKind, - block_number: u64, - block_hash: String, - timestamp: u64, - is_start: bool, - ) { - let mut entry = { - let Ok(mut lock) = BLOCK_METRICS.lock() else { - return; - }; - - if is_start { - let value = IntervalWithBlockInformation { - kind, - block_number, - block_hash, - start_timestamp: timestamp, - end_timestamp: 0, - }; - - lock.partial_intervals.push(value); + pub fn take_block_requests(&mut self) -> Vec { + std::mem::take(&mut self.block_requests) + } - if lock.partial_intervals.len() > lock.max_interval_buffer_size { - lock.partial_intervals.remove(0); - } + fn is_interval_ready(blocks: &BTreeMap) -> bool { + let mut done = true; + for (_, block) in blocks { + let count = block.proposal.is_some() as u32 + + block.import.is_some() as u32 + + block.sync.is_some() as u32; - return; + if count < 2 { + done = false; } + } - let existing_entry_pos = lock.partial_intervals.iter_mut().position(|v| { - v.block_hash == block_hash && v.block_number == block_number && v.kind == kind - }); - - let Some(pos) = existing_entry_pos else { - return; - }; + done + } - lock.partial_intervals.remove(pos) + /// + pub fn observe_interval(block_number: u64, block_hash: String, value: IntervalDetails) { + let Ok(mut lock) = BLOCK_METRICS.lock() else { + return; }; + let max_buffer_size = lock.max_interval_buffer_size; + + let intervals = lock.intervals.get_or_insert(BTreeMap::new()); + if intervals.len() >= max_buffer_size { + return; + } - entry.end_timestamp = timestamp; + let block_height = intervals.entry(block_number).or_default(); + if block_height.len() >= MAX_BLOCKS_PER_HEIGHT { + return; + } + + let block = block_height.entry(block_hash.clone()).or_default(); + + match value { + IntervalDetails::Proposal(v) => { + if block.proposal.is_some() { + return; + } + block.proposal = Some(v); + }, + IntervalDetails::Import(v) => { + if block.import.is_some() { + return; + } + let peer_id = v.peer_id.clone(); + block.import = Some(v); + + if let Some(sync) = + block.partial_syncs.iter().find(|ps| Some(ps.peer_id) == peer_id) + { + if let (Some(start_timestamp), Some(end_timestamp)) = + (sync.start_timestamp, sync.end_timestamp) + { + block.sync = Some(IntervalDetailsSync { + peer_id: sync.peer_id, + start_timestamp, + end_timestamp, + }) + } + } + }, + IntervalDetails::PartialSync(v) => { + // If we already have sync that is paired with import, then don't do anything. + if block.sync.is_some() { + return; + } - Self::observe_interval(entry); + if let Some(p_sync) = + block.partial_syncs.iter_mut().find(|s| s.peer_id == v.peer_id) + { + p_sync.start_timestamp = match v.start_timestamp { + Some(s) => Some(s), + _ => p_sync.start_timestamp, + }; + + p_sync.end_timestamp = match v.end_timestamp { + Some(s) => Some(s), + _ => p_sync.end_timestamp, + }; + } else { + block.partial_syncs.push(v); + } + }, + _ => (), + }; } /// @@ -161,16 +283,16 @@ impl BlockMetrics { } /// - pub fn take_metrics() -> Option { + pub fn take_metrics( + ) -> Option<(Vec<(u64, BTreeMap)>, Vec)> { let Ok(mut lock) = BLOCK_METRICS.lock() else { return None; }; - let metrics = std::mem::take(&mut *lock); - lock.max_interval_buffer_size = metrics.max_interval_buffer_size; - lock.max_block_request_buffer_size = metrics.max_block_request_buffer_size; + let intervals = lock.take_intervals(); + let block_requests = lock.take_block_requests(); - Some(metrics) + Some((intervals, block_requests)) } /// @@ -184,13 +306,15 @@ impl BlockMetrics { } } -/// This will be send to the telemetry +/// This will be send to the telemetry backend pub mod external { use super::*; /// #[derive(Debug, Serialize, Clone)] pub struct IntervalFromNode { + /// + pub peer_id: Option, /// pub kind: IntervalKind, /// @@ -199,6 +323,40 @@ pub mod external { pub end_timestamp: u64, } + impl From for IntervalFromNode { + fn from(value: IntervalDetailsProposal) -> Self { + Self { + peer_id: None, + kind: IntervalKind::Proposal, + start_timestamp: value.start_timestamp, + end_timestamp: value.end_timestamp, + } + } + } + + impl From for IntervalFromNode { + fn from(value: IntervalDetailsImport) -> Self { + let peer_id = value.peer_id.and_then(|p| Some(p.to_string())); + Self { + peer_id, + kind: IntervalKind::Import, + start_timestamp: value.start_timestamp, + end_timestamp: value.end_timestamp, + } + } + } + + impl From for IntervalFromNode { + fn from(value: IntervalDetailsSync) -> Self { + Self { + peer_id: Some(value.peer_id.to_string()), + kind: IntervalKind::Sync, + start_timestamp: value.start_timestamp, + end_timestamp: value.end_timestamp, + } + } + } + /// #[derive(Debug, Default, Serialize, Clone)] pub struct BlockIntervalFromNode { @@ -207,46 +365,38 @@ pub mod external { /// pub block_hash: String, /// - pub intervals: Vec, + pub proposal: Option, + /// + pub import: Option, + /// + pub sync: Option, } /// pub fn prepare_data( - mut value: Vec, + block_heights: Vec<(u64, BTreeMap)>, ) -> Vec { - let mut output = Vec::with_capacity(value.len() / 2); - value.sort_by(|l, r| { - if l.block_number == r.block_number { - l.block_hash.cmp(&r.block_hash) - } else { - l.block_number.cmp(&r.block_number) - } - }); - - let mut block = BlockIntervalFromNode::default(); - for v in value { - let interval = IntervalFromNode { - kind: v.kind, - start_timestamp: v.start_timestamp, - end_timestamp: v.end_timestamp, - }; - - if (v.block_number != block.block_number || v.block_hash != block.block_hash) - && block.block_number != u64::default() - { - output.push(std::mem::take(&mut block)); - } + let mut processed_blocks: Vec = Vec::new(); - block.block_number = v.block_number; - block.block_hash = v.block_hash; - block.intervals.push(interval); - } + for (block_number, forks) in block_heights { + for (block_hash, data) in forks { + if data.import.is_none() && data.proposal.is_none() && data.sync.is_none() { + continue; + } + + let block = BlockIntervalFromNode { + block_number, + block_hash: block_hash.clone(), + proposal: data.proposal.and_then(|p| Some(p.into())), + import: data.import.and_then(|p| Some(p.into())), + sync: data.sync.and_then(|p| Some(p.into())), + }; - if block.block_number != u64::default() { - output.push(block); + processed_blocks.push(block); + } } - output + processed_blocks } } @@ -260,6 +410,8 @@ pub struct CustomTelemetryWorker { pub max_interval_buffer_size: usize, /// pub max_block_request_buffer_size: usize, + /// + pub is_authority: bool, } impl CustomTelemetryWorker { @@ -303,6 +455,7 @@ impl CustomTelemetryWorker { "block.metrics"; "block_intervals" => block_intervals, "block_requests" => block_requests, + "is_authority" => self.is_authority, ); } } @@ -311,15 +464,15 @@ impl CustomTelemetryWorker { filter_intervals: Option) -> Vec>, filter_block_requests: Option) -> Vec>, ) -> (Vec, Vec) { - let metrics = BlockMetrics::take_metrics().unwrap_or_default(); + let (intervals, block_requests) = BlockMetrics::take_metrics().unwrap_or_default(); - let block_intervals = external::prepare_data(metrics.intervals); + let block_intervals = external::prepare_data(intervals); let block_intervals = match filter_intervals { Some(f) => f(block_intervals), _ => block_intervals, }; - let block_requests = metrics.block_requests; + let block_requests = block_requests; let block_requests = match filter_block_requests { Some(f) => f(block_requests), _ => block_requests, @@ -328,210 +481,3 @@ impl CustomTelemetryWorker { (block_intervals, block_requests) } } - -#[cfg(test)] -mod tests { - use super::*; - - fn dummy_interval(block_number: Option) -> IntervalWithBlockInformation { - IntervalWithBlockInformation { - kind: IntervalKind::Import, - block_number: block_number.unwrap_or(0), - block_hash: "".to_string(), - start_timestamp: 0, - end_timestamp: 0, - } - } - - fn observe_block_request(time_frame: u64) { - let value = BlockRequestsDetail { current_queue_size: 0, requests_handled: 0, time_frame }; - BlockMetrics::observe_block_request(value); - } - - fn reset_global_variable(buffer_size: usize) { - { - let mut lock = BLOCK_METRICS.lock().unwrap(); - lock.max_interval_buffer_size = buffer_size; - lock.max_block_request_buffer_size = buffer_size; - } - - _ = BlockMetrics::take_metrics().unwrap(); - } - - #[test] - fn buffer_interval_0_buffer_size() { - reset_global_variable(0); - - BlockMetrics::observe_interval(dummy_interval(None)); - let metrics = BlockMetrics::take_metrics().unwrap(); - assert_eq!(metrics.intervals.len(), 0); - } - - #[test] - fn buffer_interval_1_buffer_size() { - reset_global_variable(1); - - BlockMetrics::observe_interval(dummy_interval(Some(0))); - let metrics = BlockMetrics::take_metrics().unwrap(); - assert_eq!(metrics.intervals.len(), 1); - assert_eq!(metrics.intervals[0].block_number, 0); - - BlockMetrics::observe_interval(dummy_interval(Some(1))); - let metrics = BlockMetrics::take_metrics().unwrap(); - assert_eq!(metrics.intervals.len(), 1); - assert_eq!(metrics.intervals[0].block_number, 1); - } - - #[test] - fn mem_take_works() { - const BUFFER_SIZE: usize = 10; - reset_global_variable(BUFFER_SIZE); - - BlockMetrics::observe_interval(dummy_interval(Some(0))); - BlockMetrics::observe_interval_partial(IntervalKind::Sync, 0, "".to_string(), 0, true); - BlockMetrics::observe_interval_partial(IntervalKind::Sync, 10, "".to_string(), 0, true); - BlockMetrics::observe_interval_partial(IntervalKind::Sync, 10, "".to_string(), 0, false); - - { - let lock = BLOCK_METRICS.lock().unwrap(); - assert_eq!(lock.max_interval_buffer_size, BUFFER_SIZE); - assert_eq!(lock.intervals.len(), 2); - assert_eq!(lock.partial_intervals.len(), 1); - assert_eq!(lock.block_requests.len(), 0); - } - - let old_metrics = BlockMetrics::take_metrics().unwrap(); - assert_eq!(old_metrics.max_interval_buffer_size, BUFFER_SIZE); - assert_eq!(old_metrics.intervals.len(), 2); - assert_eq!(old_metrics.partial_intervals.len(), 1); - assert_eq!(old_metrics.block_requests.len(), 0); - - let lock = BLOCK_METRICS.lock().unwrap(); - assert_eq!(lock.max_interval_buffer_size, BUFFER_SIZE); - assert_eq!(lock.intervals.len(), 0); - assert_eq!(lock.partial_intervals.len(), 0); - assert_eq!(lock.block_requests.len(), 0); - } - - #[test] - fn buffer_partial_interval_0_buffer_size() { - reset_global_variable(0); - - BlockMetrics::observe_interval_partial(IntervalKind::Sync, 0, "".to_string(), 0, true); - let metrics = BlockMetrics::take_metrics().unwrap(); - assert_eq!(metrics.partial_intervals.len(), 0); - } - - #[test] - fn buffer_partial_interval_1_buffer_size() { - reset_global_variable(1); - - BlockMetrics::observe_interval_partial(IntervalKind::Sync, 0, "".to_string(), 0, true); - let metrics = BlockMetrics::take_metrics().unwrap(); - assert_eq!(metrics.partial_intervals.len(), 1); - assert_eq!(metrics.partial_intervals[0].block_number, 0); - - BlockMetrics::observe_interval_partial(IntervalKind::Sync, 1, "".to_string(), 0, true); - let metrics = BlockMetrics::take_metrics().unwrap(); - assert_eq!(metrics.partial_intervals.len(), 1); - assert_eq!(metrics.partial_intervals[0].block_number, 1); - } - - #[test] - fn buffer_partial_interval_works() { - reset_global_variable(1); - - BlockMetrics::observe_interval_partial(IntervalKind::Sync, 25, "".to_string(), 0, true); - { - let lock = BLOCK_METRICS.lock().unwrap(); - assert_eq!(lock.partial_intervals.len(), 1); - assert_eq!(lock.partial_intervals[0].block_number, 25); - } - - BlockMetrics::observe_interval_partial(IntervalKind::Sync, 25, "".to_string(), 0, false); - { - let lock = BLOCK_METRICS.lock().unwrap(); - assert_eq!(lock.partial_intervals.len(), 0); - assert_eq!(lock.intervals.len(), 1); - assert_eq!(lock.intervals[0].block_number, 25); - } - } - - #[test] - fn get_and_filter_data_works() { - reset_global_variable(10); - use IntervalKind::*; - - let setup_scenartion = || { - BlockMetrics::observe_interval_partial(Import, 1, "".to_string(), 0, true); - BlockMetrics::observe_interval_partial(Import, 1, "".to_string(), 0, false); - BlockMetrics::observe_interval_partial(Sync, 1, "".to_string(), 0, true); - BlockMetrics::observe_interval_partial(Sync, 1, "".to_string(), 0, false); - BlockMetrics::observe_interval_partial(Proposal, 2, "".to_string(), 0, true); - BlockMetrics::observe_interval_partial(Proposal, 2, "".to_string(), 0, false); - BlockMetrics::observe_interval_partial(Proposal, 3, "".to_string(), 0, true); - observe_block_request(1); - observe_block_request(2); - observe_block_request(3); - }; - - setup_scenartion(); - - let (block_intervals, block_requests) = - CustomTelemetryWorker::get_and_filter_data(None, None); - - assert_eq!(block_intervals.len(), 2); - assert_eq!(block_intervals[0].block_number, 1); - assert_eq!(block_intervals[0].block_number, 1); - assert_eq!(block_intervals[0].intervals.len(), 2); - assert_eq!(block_intervals[1].intervals.len(), 1); - - assert_eq!(block_requests.len(), 3); - assert_eq!(block_requests[0].time_frame, 1); - assert_eq!(block_requests[1].time_frame, 2); - assert_eq!(block_requests[2].time_frame, 3); - - // Second test. Filter Interval data 1 - setup_scenartion(); - - let (block_intervals, block_requests) = - CustomTelemetryWorker::get_and_filter_data(Some(no_data_interval), Some(no_data_request)); - assert_eq!(block_intervals.len(), 0); - assert_eq!(block_requests.len(), 0); - - // Third test. Filter Interval data 2 - setup_scenartion(); - - let (block_intervals, block_requests) = - CustomTelemetryWorker::get_and_filter_data(Some(one_interval), Some(one_request)); - assert_eq!(block_intervals.len(), 1); - assert_eq!(block_intervals[0].block_number, 1); - assert_eq!(block_requests.len(), 1); - assert_eq!(block_requests[0].time_frame, 1); - } - - fn no_data_interval(_: Vec) -> Vec { - vec![] - } - - fn no_data_request(_: Vec) -> Vec { - vec![] - } - - fn one_interval(mut value: Vec) -> Vec { - while value.len() > 1 { - value.pop(); - } - - value - } - - fn one_request(mut value: Vec) -> Vec { - while value.len() > 1 { - value.pop(); - } - - - value - } -}