Skip to content

Commit

Permalink
[indexer-alt] Add more checkpoint lag metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
lxfind committed Dec 17, 2024
1 parent 8d2a77c commit b0196d9
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 8 deletions.
86 changes: 86 additions & 0 deletions crates/sui-indexer-alt-framework/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ pub(crate) struct IndexerMetrics {
pub total_ingested_transient_retries: IntCounterVec,
pub total_ingested_not_found_retries: IntCounter,

// Checkpoint lag metrics for the ingestion pipeline.
pub latest_ingested_checkpoint: IntGauge,
pub latest_ingested_checkpoint_timestamp_lag_ms: IntGauge,
pub ingested_checkpoint_timestamp_lag: Histogram,
Expand Down Expand Up @@ -102,6 +103,26 @@ pub(crate) struct IndexerMetrics {
pub total_pruner_chunks_deleted: IntCounterVec,
pub total_pruner_rows_deleted: IntCounterVec,

// Checkpoint lag metrics for the collector.
pub latest_collected_checkpoint: IntGaugeVec,
pub latest_collected_checkpoint_timestamp_lag_ms: IntGaugeVec,
pub collected_checkpoint_timestamp_lag: HistogramVec,

// Checkpoint lag metrics for the committer.
// We can only report partially committed checkpoints, since the concurrent committer isn't aware of
// when a checkpoint is fully committed. So we report whenever we see a checkpoint. Since data from
// the same checkpoint is batched continuously, this is a good proxy for the last committed checkpoint.
pub latest_partially_committed_checkpoint: IntGaugeVec,
pub latest_partially_committed_checkpoint_timestamp_lag_ms: IntGaugeVec,
pub partially_committed_checkpoint_timestamp_lag: HistogramVec,

// Checkpoint lag metrics for the watermarker.
// The latest watermarked checkpoint metric is already covered by watermark_checkpoint_in_db.
// While we already have watermark_timestamp_in_db_ms metric, reporting the lag explicitly
// for consistency.
pub latest_watermarked_checkpoint_timestamp_lag_ms: IntGaugeVec,
pub watermarked_checkpoint_timestamp_lag: HistogramVec,

pub collector_gather_latency: HistogramVec,
pub collector_batch_size: HistogramVec,
pub committer_commit_latency: HistogramVec,
Expand Down Expand Up @@ -409,6 +430,71 @@ impl IndexerMetrics {
registry,
)
.unwrap(),
latest_collected_checkpoint: register_int_gauge_vec_with_registry!(
"indexer_latest_collected_checkpoint",
"Latest checkpoint sequence number collected by this collector",
&["pipeline"],
registry,
)
.unwrap(),
latest_collected_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!(
"indexer_latest_collected_checkpoint_timestamp_lag_ms",
"Difference between the system timestamp when the latest checkpoint was collected and the \
timestamp in the checkpoint, in milliseconds",
&["pipeline"],
registry,
)
.unwrap(),
collected_checkpoint_timestamp_lag: register_histogram_vec_with_registry!(
"indexer_collected_checkpoint_timestamp_lag",
"Difference between the system timestamp when a checkpoint was collected and the \
timestamp in each checkpoint, in seconds",
&["pipeline"],
LAG_SEC_BUCKETS.to_vec(),
registry,
)
.unwrap(),
latest_partially_committed_checkpoint: register_int_gauge_vec_with_registry!(
"indexer_latest_partially_committed_checkpoint",
"Latest checkpoint sequence number partially committed by this collector",
&["pipeline"],
registry,
)
.unwrap(),
latest_partially_committed_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!(
"indexer_latest_partially_committed_checkpoint_timestamp_lag_ms",
"Difference between the system timestamp when the latest checkpoint was partially committed and the \
timestamp in the checkpoint, in milliseconds",
&["pipeline"],
registry,
)
.unwrap(),
partially_committed_checkpoint_timestamp_lag: register_histogram_vec_with_registry!(
"indexer_partially_committed_checkpoint_timestamp_lag",
"Difference between the system timestamp when a checkpoint was partially committed and the \
timestamp in each checkpoint, in seconds",
&["pipeline"],
LAG_SEC_BUCKETS.to_vec(),
registry,
)
.unwrap(),
latest_watermarked_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!(
"indexer_latest_watermarked_checkpoint_timestamp_lag_ms",
"Difference between the system timestamp when the latest checkpoint was watermarked and the \
timestamp in the checkpoint, in milliseconds",
&["pipeline"],
registry,
)
.unwrap(),
watermarked_checkpoint_timestamp_lag: register_histogram_vec_with_registry!(
"indexer_watermarked_checkpoint_timestamp_lag",
"Difference between the system timestamp when a checkpoint was watermarked and the \
timestamp in each checkpoint, in seconds",
&["pipeline"],
LAG_SEC_BUCKETS.to_vec(),
registry,
)
.unwrap(),
collector_gather_latency: register_histogram_vec_with_registry!(
"indexer_collector_gather_latency",
"Time taken to gather rows into a batch by this collector",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use tokio_util::sync::CancellationToken;
use tracing::{debug, info};

use crate::{
metrics::IndexerMetrics,
metrics::{CheckpointLagMetricReporter, IndexerMetrics},
pipeline::{CommitterConfig, IndexedCheckpoint, WatermarkPart},
};

Expand Down Expand Up @@ -99,6 +99,12 @@ pub(super) fn collector<H: Handler + 'static>(
let mut received: BTreeMap<u64, IndexedCheckpoint<H>> = BTreeMap::new();
let checkpoint_lag = checkpoint_lag.unwrap_or_default();

let checkpoint_lag_reporter = CheckpointLagMetricReporter::new_for_pipeline::<H>(
&metrics.collected_checkpoint_timestamp_lag,
&metrics.latest_collected_checkpoint_timestamp_lag_ms,
&metrics.latest_collected_checkpoint,
);

// Data for checkpoints that are ready to be sent but haven't been written yet.
let mut pending: BTreeMap<u64, PendingCheckpoint<H>> = BTreeMap::new();
let mut pending_rows = 0;
Expand Down Expand Up @@ -128,6 +134,10 @@ pub(super) fn collector<H: Handler + 'static>(
let indexed = entry.get_mut();
indexed.batch_into(&mut batch);
if indexed.is_empty() {
checkpoint_lag_reporter.report_lag(
indexed.watermark.checkpoint(),
indexed.watermark.timestamp_ms(),
);
entry.remove();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};

use crate::{
metrics::IndexerMetrics,
metrics::{CheckpointLagMetricReporter, IndexerMetrics},
pipeline::{logging::WatermarkLogger, CommitterConfig, WatermarkPart, WARN_PENDING_WATERMARKS},
watermarks::CommitterWatermark,
};
Expand Down Expand Up @@ -79,6 +79,12 @@ pub(super) fn commit_watermark<H: Handler + 'static>(
// demonstrate that the pipeline is making progress.
let mut logger = WatermarkLogger::new("concurrent_committer", &watermark);

let checkpoint_lag_reporter = CheckpointLagMetricReporter::new_for_pipeline::<H>(
&metrics.watermarked_checkpoint_timestamp_lag,
&metrics.latest_watermarked_checkpoint_timestamp_lag_ms,
&metrics.watermark_checkpoint_in_db,
);

info!(pipeline = H::NAME, ?watermark, "Starting commit watermark");

loop {
Expand Down Expand Up @@ -202,16 +208,16 @@ pub(super) fn commit_watermark<H: Handler + 'static>(

logger.log::<H>(&watermark, elapsed);

checkpoint_lag_reporter.report_lag(
watermark.checkpoint_hi_inclusive as u64,
watermark.timestamp_ms_hi_inclusive as u64,
);

metrics
.watermark_epoch_in_db
.with_label_values(&[H::NAME])
.set(watermark.epoch_hi_inclusive);

metrics
.watermark_checkpoint_in_db
.with_label_values(&[H::NAME])
.set(watermark.checkpoint_hi_inclusive);

metrics
.watermark_transaction_in_db
.with_label_values(&[H::NAME])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};

use crate::{
metrics::IndexerMetrics,
metrics::{CheckpointLagMetricReporter, IndexerMetrics},
pipeline::{Break, CommitterConfig, WatermarkPart},
task::TrySpawnStreamExt,
};
Expand Down Expand Up @@ -45,6 +45,11 @@ pub(super) fn committer<H: Handler + 'static>(
) -> JoinHandle<()> {
tokio::spawn(async move {
info!(pipeline = H::NAME, "Starting committer");
let checkpoint_lag_reporter = CheckpointLagMetricReporter::new_for_pipeline::<H>(
&metrics.partially_committed_checkpoint_timestamp_lag,
&metrics.latest_partially_committed_checkpoint_timestamp_lag_ms,
&metrics.latest_partially_committed_checkpoint,
);

match ReceiverStream::new(rx)
.try_for_each_spawned(
Expand All @@ -55,6 +60,7 @@ pub(super) fn committer<H: Handler + 'static>(
let db = db.clone();
let metrics = metrics.clone();
let cancel = cancel.clone();
let checkpoint_lag_reporter = checkpoint_lag_reporter.clone();

// Repeatedly try to get a connection to the DB and write the batch. Use an
// exponential backoff in case the failure is due to contention over the DB
Expand All @@ -67,11 +73,16 @@ pub(super) fn committer<H: Handler + 'static>(
..Default::default()
};

let highest_checkpoint = watermark.iter().map(|w| w.checkpoint()).max();
let highest_checkpoint_timestamp =
watermark.iter().map(|w| w.timestamp_ms()).max();

use backoff::Error as BE;
let commit = move || {
let values = values.clone();
let db = db.clone();
let metrics = metrics.clone();
let checkpoint_lag_reporter = checkpoint_lag_reporter.clone();
async move {
if values.is_empty() {
return Ok(());
Expand Down Expand Up @@ -112,6 +123,12 @@ pub(super) fn committer<H: Handler + 'static>(
"Wrote batch",
);

checkpoint_lag_reporter.report_lag(
// unwrap is safe because we would have returned if values is empty.
highest_checkpoint.unwrap(),
highest_checkpoint_timestamp.unwrap(),
);

metrics
.total_committer_batches_succeeded
.with_label_values(&[H::NAME])
Expand Down
4 changes: 4 additions & 0 deletions crates/sui-indexer-alt-framework/src/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ impl WatermarkPart {
self.watermark.checkpoint_hi_inclusive as u64
}

fn timestamp_ms(&self) -> u64 {
self.watermark.timestamp_ms_hi_inclusive as u64
}

/// Check if all the rows from this watermark are represented in this part.
fn is_complete(&self) -> bool {
self.batch_rows == self.total_rows
Expand Down

0 comments on commit b0196d9

Please sign in to comment.