Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpc-alt: metrics #20728

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
292 changes: 222 additions & 70 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ members = [
"crates/sui-indexer-alt",
"crates/sui-indexer-alt-framework",
"crates/sui-indexer-alt-jsonrpc",
"crates/sui-indexer-alt-metrics",
"crates/sui-indexer-alt-schema",
"crates/sui-indexer-builder",
"crates/sui-json",
Expand Down Expand Up @@ -408,6 +409,7 @@ once_cell = "1.18.0"
ouroboros = "0.17"
parking_lot = "0.12.1"
parquet = "52"
pin-project-lite = "0.2.13"
pkcs8 = { version = "0.9.0", features = ["std"] }
pprof = { version = "0.14.0", features = ["cpp", "frame-pointer"] }
pretty_assertions = "1.3.0"
Expand Down Expand Up @@ -643,6 +645,7 @@ sui-genesis-builder = { path = "crates/sui-genesis-builder" }
sui-indexer = { path = "crates/sui-indexer" }
sui-indexer-alt-framework = { path = "crates/sui-indexer-alt-framework" }
sui-indexer-alt-jsonrpc = { path = "crates/sui-indexer-alt-jsonrpc" }
sui-indexer-alt-metrics = { path = "crates/sui-indexer-alt-metrics" }
sui-indexer-alt-schema = { path = "crates/sui-indexer-alt-schema" }
sui-indexer-builder = { path = "crates/sui-indexer-builder" }
sui-json = { path = "crates/sui-json" }
Expand Down
1 change: 1 addition & 0 deletions crates/sui-indexer-alt-framework/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ tracing.workspace = true
url.workspace = true

sui-field-count.workspace = true
sui-indexer-alt-metrics.workspace = true
sui-pg-db.workspace = true
sui-storage.workspace = true
sui-types.workspace = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ pub(crate) mod tests {
use crate::ingestion::client::IngestionClient;
use crate::ingestion::test_utils::test_checkpoint_data;
use crate::metrics::tests::test_metrics;
use std::sync::Arc;
use sui_storage::blob::{Blob, BlobEncoding};
use tokio_util::sync::CancellationToken;

Expand All @@ -51,8 +50,7 @@ pub(crate) mod tests {
let test_checkpoint = test_checkpoint_data(1);
tokio::fs::write(&path, &test_checkpoint).await.unwrap();

let metrics = Arc::new(test_metrics());
let local_client = IngestionClient::new_local(tempdir, metrics);
let local_client = IngestionClient::new_local(tempdir, test_metrics());
let checkpoint = local_client
.fetch(1, &CancellationToken::new())
.await
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-indexer-alt-framework/src/ingestion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ mod tests {
ingest_concurrency,
..Default::default()
},
Arc::new(test_metrics()),
test_metrics(),
cancel,
)
.unwrap()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ pub(crate) mod tests {
use crate::ingestion::test_utils::test_checkpoint_data;
use crate::metrics::tests::test_metrics;
use axum::http::StatusCode;
use std::sync::{Arc, Mutex};
use std::sync::Mutex;
use tokio_util::sync::CancellationToken;
use wiremock::{
matchers::{method, path_regex},
Expand All @@ -133,7 +133,7 @@ pub(crate) mod tests {
}

fn remote_test_client(uri: String) -> IngestionClient {
IngestionClient::new_remote(Url::parse(&uri).unwrap(), Arc::new(test_metrics())).unwrap()
IngestionClient::new_remote(Url::parse(&uri).unwrap(), test_metrics()).unwrap()
}

fn assert_http_error(error: Error, checkpoint: u64, code: StatusCode) {
Expand Down
49 changes: 11 additions & 38 deletions crates/sui-indexer-alt-framework/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{collections::BTreeSet, net::SocketAddr, sync::Arc};
use std::{collections::BTreeSet, sync::Arc};

use anyhow::{ensure, Context, Result};
use diesel::{
Expand All @@ -10,12 +10,14 @@ use diesel::{
};
use diesel_migrations::{embed_migrations, EmbeddedMigrations};
use ingestion::{client::IngestionClient, ClientArgs, IngestionConfig, IngestionService};
use metrics::{IndexerMetrics, MetricsService};
use metrics::IndexerMetrics;
use pipeline::{
concurrent::{self, ConcurrentConfig},
sequential::{self, SequentialConfig},
Processor,
};
use prometheus::Registry;
use sui_indexer_alt_metrics::db::DbConnectionStatsCollector;
use sui_pg_db::{Db, DbArgs};
use task::graceful_shutdown;
use tokio::task::JoinHandle;
Expand All @@ -33,7 +35,7 @@ pub(crate) mod watermarks;
const MIGRATIONS: EmbeddedMigrations = embed_migrations!("migrations");

/// Command-line arguments for the indexer
#[derive(clap::Args, Debug, Clone)]
#[derive(clap::Args, Default, Debug, Clone)]
pub struct IndexerArgs {
/// Override for the checkpoint to start ingestion from -- useful for backfills. By default,
/// ingestion will start just after the lowest checkpoint watermark across all active
Expand All @@ -54,10 +56,6 @@ pub struct IndexerArgs {
/// Don't write to the watermark tables for concurrent pipelines.
#[arg(long)]
pub skip_watermark: bool,

/// Address to serve Prometheus Metrics from.
#[arg(long, default_value_t = Self::default().metrics_address)]
pub metrics_address: SocketAddr,
}

pub struct Indexer {
Expand All @@ -67,9 +65,6 @@ pub struct Indexer {
/// Prometheus Metrics.
metrics: Arc<IndexerMetrics>,

/// Service for serving Prometheis metrics.
metrics_service: MetricsService,

/// Service for downloading and disseminating checkpoint data.
ingestion_service: IngestionService,

Expand Down Expand Up @@ -125,14 +120,14 @@ impl Indexer {
client_args: ClientArgs,
ingestion_config: IngestionConfig,
migrations: &'static EmbeddedMigrations,
registry: &Registry,
cancel: CancellationToken,
) -> Result<Self> {
let IndexerArgs {
first_checkpoint,
last_checkpoint,
pipeline,
skip_watermark,
metrics_address,
} = indexer_args;

let db = Db::for_write(db_args)
Expand All @@ -144,8 +139,11 @@ impl Indexer {
.await
.context("Failed to run pending migrations")?;

let (metrics, metrics_service) =
MetricsService::new(metrics_address, db.clone(), cancel.clone())?;
let metrics = IndexerMetrics::new(registry);
registry.register(Box::new(DbConnectionStatsCollector::new(
Some("indexer_db"),
db.clone(),
)))?;

let ingestion_service = IngestionService::new(
client_args,
Expand All @@ -157,7 +155,6 @@ impl Indexer {
Ok(Self {
db,
metrics,
metrics_service,
ingestion_service,
first_checkpoint,
last_checkpoint,
Expand Down Expand Up @@ -301,12 +298,6 @@ impl Indexer {
);
}

let metrics_handle = self
.metrics_service
.run()
.await
.context("Failed to start metrics service")?;

// If an override has been provided, start ingestion from there, otherwise start ingestion
// from just after the lowest committer watermark across all enabled pipelines.
let first_checkpoint = self
Expand All @@ -326,19 +317,13 @@ impl Indexer {
self.handles.push(regulator_handle);
self.handles.push(broadcaster_handle);

let cancel = self.cancel.clone();
Ok(tokio::spawn(async move {
// Wait for the ingestion service and all its related tasks to wind down gracefully:
// If ingestion has been configured to only handle a specific range of checkpoints, we
// want to make sure that tasks are allowed to run to completion before shutting them
// down.
graceful_shutdown(self.handles, self.cancel).await;

info!("Indexing pipeline gracefully shut down");

// Pick off any stragglers (in this case, just the metrics service).
cancel.cancel();
metrics_handle.await.unwrap();
}))
}

Expand Down Expand Up @@ -396,15 +381,3 @@ impl Indexer {
Ok(Some(watermark))
}
}

impl Default for IndexerArgs {
fn default() -> Self {
Self {
first_checkpoint: None,
last_checkpoint: None,
pipeline: vec![],
skip_watermark: false,
metrics_address: "0.0.0.0:9184".parse().unwrap(),
}
}
}
Loading
Loading