Refactored custom telemetry. Expanded it as well. #16

Merged 1 commit on Aug 7, 2024

substrate/bin/node-template/node/src/service.rs (8 additions, 6 deletions)

@@ -63,16 +63,18 @@ pub fn new_partial(config: &Configuration) -> Result<Service, ServiceError> {
 });

 let telemetry_handle = telemetry.as_ref().map(|t| t.handle());
-let custom_telemetry_worker = CustomTelemetryWorker
-{
-    handle: telemetry_handle,
+let custom_telemetry_worker = CustomTelemetryWorker {
+    handle: telemetry_handle,
+    sampling_interval_ms: 6_000u128,
+    max_interval_buffer_size: 20,
+    max_block_request_buffer_size: 15,
+    is_authority: config.role.is_authority(),
 };
-task_manager
-    .spawn_handle()
-    .spawn("custom_telemetry", None, custom_telemetry_worker.run(None, None));
+task_manager.spawn_handle().spawn(
+    "custom_telemetry",
+    None,
+    custom_telemetry_worker.run(None, None),
+);

 let select_chain = sc_consensus::LongestChain::new(backend.clone());
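
Note: the expanded initializer above is the whole configuration surface this PR adds to the node template. For reference, a hypothetical reconstruction of the worker struct implied by that call site; the real definition lives in sc-telemetry's custom_telemetry module, and every field type below except sampling_interval_ms (pinned by the 6_000u128 literal) is an assumption:

    /// Sketch only, inferred from the initializer in service.rs,
    /// not copied from sc-telemetry.
    pub struct CustomTelemetryWorker {
        /// Handle used to submit aggregated block metrics, if telemetry is enabled.
        pub handle: Option<sc_telemetry::TelemetryHandle>,
        /// Flush cadence for buffered intervals, in milliseconds.
        pub sampling_interval_ms: u128,
        /// Cap that keeps the interval buffer from growing without bound.
        pub max_interval_buffer_size: usize,
        /// Cap on the buffer of observed block requests.
        pub max_block_request_buffer_size: usize,
        /// Authorities also record proposal intervals (see slots/src/lib.rs below).
        pub is_authority: bool,
    }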

substrate/client/consensus/common/src/import_queue.rs (13 additions, 12 deletions)

@@ -177,8 +177,8 @@ impl<N: std::fmt::Debug + PartialEq> BlockImportStatus<N> {
     /// Returns the imported block number.
     pub fn number(&self) -> &N {
         match self {
-            BlockImportStatus::ImportedKnown(n, _) |
-            BlockImportStatus::ImportedUnknown(n, _, _) => n,
+            BlockImportStatus::ImportedKnown(n, _)
+            | BlockImportStatus::ImportedUnknown(n, _, _) => n,
         }
     }
 }

@@ -251,14 +251,14 @@ pub(crate) async fn import_single_block_metered_v2<B: BlockT, V: Verifier<B>>(
         import_single_block_metered(import_handle, block_origin, block, verifier, metrics).await;
     let end_timestamp = BlockMetrics::get_current_timestamp_in_ms_or_default();

-    let interval = IntervalWithBlockInformation {
-        kind: IntervalKind::Import,
-        block_number,
-        block_hash,
-        start_timestamp,
-        end_timestamp,
+    match &res {
+        Ok(BlockImportStatus::ImportedUnknown(_, _, peer_id)) => {
+            let peer_id = peer_id.clone();
+            let value = IntervalDetailsImport { peer_id, start_timestamp, end_timestamp };
+            BlockMetrics::observe_interval(block_number, block_hash, value.into());
+        },
+        _ => (),
     };
-    BlockMetrics::observe_interval(interval);

     res
 }
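
This hunk changes behavior as well as shape: import_single_block_metered_v2 now records an interval only for blocks that were actually new (ImportedUnknown), and it attributes the interval to the peer that supplied the block. A minimal sketch of the types this implies, inferred from this call site and the ones in slots/src/lib.rs rather than taken from the crate:

    /// Per-import details; `peer_id` is `None` for locally authored blocks.
    #[derive(Clone)]
    pub struct IntervalDetailsImport {
        pub peer_id: Option<PeerId>, // concrete PeerId type assumed
        pub start_timestamp: u128,
        pub end_timestamp: u128,
    }

    /// The `.into()` at the call site suggests an enum that unifies the
    /// interval kinds, replacing the old `IntervalKind` discriminant.
    pub enum IntervalDetails {
        Proposal(IntervalDetailsProposal),
        Import(IntervalDetailsImport),
    }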
@@ -284,7 +284,7 @@ pub(crate) async fn import_single_block_metered<B: BlockT, V: Verifier<B>>(
             } else {
                 debug!(target: LOG_TARGET, "Header {} was not provided ", block.hash);
             }
-            return Err(BlockImportError::IncompleteHeader(peer))
+            return Err(BlockImportError::IncompleteHeader(peer));
         },
     };

@@ -299,8 +299,9 @@ pub(crate) async fn import_single_block_metered<B: BlockT, V: Verifier<B>>(
             trace!(target: LOG_TARGET, "Block already in chain {}: {:?}", number, hash);
             Ok(BlockImportStatus::ImportedKnown(number, peer))
         },
-        Ok(ImportResult::Imported(aux)) =>
-            Ok(BlockImportStatus::ImportedUnknown(number, aux, peer)),
+        Ok(ImportResult::Imported(aux)) => {
+            Ok(BlockImportStatus::ImportedUnknown(number, aux, peer))
+        },
         Ok(ImportResult::MissingState) => {
             debug!(
                 target: LOG_TARGET,

substrate/client/consensus/common/src/import_queue/basic_queue.rs (10 additions, 9 deletions)

@@ -124,7 +124,7 @@ impl<B: BlockT> BasicQueueHandle<B> {
 impl<B: BlockT> ImportQueueService<B> for BasicQueueHandle<B> {
     fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>) {
         if blocks.is_empty() {
-            return
+            return;
         }

         trace!(target: LOG_TARGET, "Scheduling {} blocks for import", blocks.len());

@@ -192,7 +192,7 @@ impl<B: BlockT> ImportQueue<B> for BasicQueue<B> {
         loop {
             if let Err(_) = self.result_port.next_action(&mut *link).await {
                 log::error!(target: "sync", "poll_actions: Background import task is no longer alive");
-                return
+                return;
             }
         }
     }

@@ -235,7 +235,7 @@ async fn block_import_process<B: BlockT>(
                     target: LOG_TARGET,
                     "Stopping block import because the import channel was closed!",
                 );
-                return
+                return;
             },
         };

@@ -309,26 +309,27 @@ impl<B: BlockT> BlockImportWorker<B> {
                     target: LOG_TARGET,
                     "Stopping block import because result channel was closed!",
                 );
-                return
+                return;
             }

             // Make sure to first process all justifications
             while let Poll::Ready(justification) = futures::poll!(justification_port.next()) {
                 match justification {
-                    Some(ImportJustification(who, hash, number, justification)) =>
-                        worker.import_justification(who, hash, number, justification).await,
+                    Some(ImportJustification(who, hash, number, justification)) => {
+                        worker.import_justification(who, hash, number, justification).await
+                    },
                     None => {
                         log::debug!(
                             target: LOG_TARGET,
                             "Stopping block import because justification channel was closed!",
                         );
-                        return
+                        return;
                     },
                 }
             }

             if let Poll::Ready(()) = futures::poll!(&mut block_import_process) {
-                return
+                return;
             }

             // All futures that we polled are now pending.

@@ -422,7 +423,7 @@ async fn import_many_blocks<B: BlockT, V: Verifier<B>>(
             Some(b) => b,
             None => {
                 // No block left to import, success!
-                return ImportManyBlocksResult { block_count: count, imported, results }
+                return ImportManyBlocksResult { block_count: count, imported, results };
             },
         };

substrate/client/consensus/slots/src/lib.rs (24 additions, 26 deletions)

@@ -37,7 +37,7 @@ use futures_timer::Delay;
 use log::{debug, info, warn};
 use sc_consensus::{BlockImport, JustificationSyncLink};
 use sc_telemetry::{
-    custom_telemetry::{BlockMetrics, IntervalKind, IntervalWithBlockInformation},
+    custom_telemetry::{BlockMetrics, IntervalDetailsImport, IntervalDetailsProposal},
     telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_INFO, CONSENSUS_WARN,
 };
 use sp_arithmetic::traits::BaseArithmetic;

@@ -222,7 +222,7 @@ pub trait SimpleSlotWorker<B: BlockT> {
             Either::Left((Err(err), _)) => {
                 warn!(target: log_target, "Proposing failed: {}", err);

-                return None
+                return None;
             },
             Either::Right(_) => {
                 info!(

@@ -242,7 +242,7 @@ pub trait SimpleSlotWorker<B: BlockT> {
"slot" => *slot,
);

return None
return None;
},
};

Expand All @@ -268,7 +268,7 @@ pub trait SimpleSlotWorker<B: BlockT> {
                     err,
                 );

-                return None
+                return None;
             },
             Either::Left(_) => {
                 warn!(

@@ -278,7 +278,7 @@ pub trait SimpleSlotWorker<B: BlockT> {
                     slot_info.chain_head.hash(),
                 );

-                return None
+                return None;
             },
         };

@@ -305,7 +305,7 @@ pub trait SimpleSlotWorker<B: BlockT> {
"Skipping proposal slot {} since there's no time left to propose", slot,
);

return None
return None;
} else {
Instant::now() + proposing_remaining_duration
};
Expand All @@ -328,17 +328,17 @@ pub trait SimpleSlotWorker<B: BlockT> {
"err" => ?err,
);

return None
return None;
},
};

self.notify_slot(&slot_info.chain_head, slot, &aux_data);

let authorities_len = self.authorities_len(&aux_data);

if !self.force_authoring() &&
self.sync_oracle().is_offline() &&
authorities_len.map(|a| a > 1).unwrap_or(false)
if !self.force_authoring()
&& self.sync_oracle().is_offline()
&& authorities_len.map(|a| a > 1).unwrap_or(false)
{
debug!(target: logging_target, "Skipping proposal slot. Waiting for the network.");
telemetry!(
Expand All @@ -348,13 +348,13 @@ pub trait SimpleSlotWorker<B: BlockT> {
"authorities_len" => authorities_len,
);

return None
return None;
}

let claim = self.claim_slot(&slot_info.chain_head, slot, &aux_data).await?;

if self.should_backoff(slot, &slot_info.chain_head) {
return None
return None;
}

debug!(target: logging_target, "Starting authorship at slot: {slot}");
Expand All @@ -375,7 +375,7 @@ pub trait SimpleSlotWorker<B: BlockT> {
"err" => ?err
);

return None
return None;
},
};

Expand All @@ -402,7 +402,7 @@ pub trait SimpleSlotWorker<B: BlockT> {
             Err(err) => {
                 warn!(target: logging_target, "Failed to create block import params: {}", err);

-                return None
+                return None;
             },
         };

@@ -452,22 +452,20 @@ pub trait SimpleSlotWorker<B: BlockT> {
         }
         let end_import_timestamp = BlockMetrics::get_current_timestamp_in_ms_or_default();

+        let block_number: u64 = header_num.try_into().unwrap_or_default();
+        let block_hash = std::format!("{:?}", post_header_hash);
         // Metrics
-        let proposal_interval = IntervalWithBlockInformation {
-            kind: IntervalKind::Proposal,
-            block_number: header_num.try_into().unwrap_or_default(),
-            block_hash: std::format!("{:?}", post_header_hash),
+        let proposal_interval = IntervalDetailsProposal {
             start_timestamp: start_proposal_timestamp,
             end_timestamp: end_proposal_timestamp,
         };
-        let import_interval = IntervalWithBlockInformation {
+        let import_interval = IntervalDetailsImport {
+            peer_id: None,
             start_timestamp: start_import_timestamp,
             end_timestamp: end_import_timestamp,
-            kind: IntervalKind::Import,
-            ..proposal_interval.clone()
         };
-        BlockMetrics::observe_interval(proposal_interval);
-        BlockMetrics::observe_interval(import_interval);
+        BlockMetrics::observe_interval(block_number, block_hash.clone(), proposal_interval.into());
+        BlockMetrics::observe_interval(block_number, block_hash, import_interval.into());

         Some(SlotResult { block: B::new(header, body), storage_proof })
     }
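
With block_number and block_hash now passed as arguments, the interval structs no longer duplicate block identity, and the proposal and import intervals above share values computed once. The signature this implies, an assumption drawn from the three call sites in this PR and shown only to make the new calling convention explicit:

    impl BlockMetrics {
        /// Assumed signature: block identity travels beside the kind-specific
        /// details instead of inside each interval struct.
        pub fn observe_interval(block_number: u64, block_hash: String, details: IntervalDetails) {
            // Buffer the interval for the CustomTelemetryWorker to drain on its
            // sampling_interval_ms cadence, bounded by max_interval_buffer_size.
        }
    }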

@@ -549,7 +547,7 @@ pub async fn start_slot_worker<B, C, W, SO, CIDP, Proof>(

         if sync_oracle.is_major_syncing() {
             debug!(target: LOG_TARGET, "Skipping proposal slot due to sync.");
-            continue
+            continue;
         }

         let _ = worker.on_slot(slot_info).await;

@@ -629,7 +627,7 @@ pub fn proposing_remaining_duration<Block: BlockT>(

     // If parent is genesis block, we don't require any lenience factor.
     if slot_info.chain_head.number().is_zero() {
-        return proposing_duration
+        return proposing_duration;
     }

     let parent_slot = match parent_slot {

@@ -792,7 +790,7 @@ where
 ) -> bool {
     // This should not happen, but we want to keep the previous behaviour if it does.
     if slot_now <= chain_head_slot {
-        return false
+        return false;
     }

     // There can be race between getting the finalized number and getting the best number.