From ab089b763839a29e289249465afc476549fc4d72 Mon Sep 17 00:00:00 2001 From: Ge Gao Date: Wed, 18 Dec 2024 16:05:36 -0500 Subject: [PATCH] comments, round 3 --- .../sui-indexer-alt-restorer/src/archives.rs | 81 ++++++++++--------- crates/sui-indexer-alt-restorer/src/lib.rs | 5 +- 2 files changed, 48 insertions(+), 38 deletions(-) diff --git a/crates/sui-indexer-alt-restorer/src/archives.rs b/crates/sui-indexer-alt-restorer/src/archives.rs index d8849d8891a9ef..3c06561f706f83 100644 --- a/crates/sui-indexer-alt-restorer/src/archives.rs +++ b/crates/sui-indexer-alt-restorer/src/archives.rs @@ -16,44 +16,53 @@ use crate::Args; #[derive(Clone, Debug)] pub(crate) struct ArchivalCheckpointInfo { pub next_checkpoint_after_epoch: u64, + // TODO(gegaowp): unused for now, will be used for kv_genesis. #[allow(unused)] pub chain_identifier: CheckpointDigest, } -pub(crate) async fn read_archival_checkpoint_info( - args: &Args, -) -> Result { - let archive_store_config = ObjectStoreConfig { - object_store: Some(ObjectStoreType::GCS), - bucket: Some(args.archive_bucket.clone()), - object_store_connection_limit: args.concurrency, - no_sign_request: false, - ..Default::default() - }; - let archive_reader_config = ArchiveReaderConfig { - remote_store_config: archive_store_config, - download_concurrency: NonZeroUsize::new(args.concurrency).unwrap(), - use_for_pruning_watermark: false, - }; - let metrics = ArchiveReaderMetrics::new(&Registry::default()); - let archive_reader = ArchiveReader::new(archive_reader_config, &metrics)?; - archive_reader.sync_manifest_once().await?; - let manifest = archive_reader.get_manifest().await?; - let next_checkpoint_after_epoch = manifest.next_checkpoint_after_epoch(args.start_epoch); - info!( - epoch = args.start_epoch, - checkpoint = next_checkpoint_after_epoch, - "Next checkpoint after epoch", - ); - let cp_summaries = archive_reader - .get_summaries_for_list_no_verify(vec![0]) - .await?; - let first_cp = cp_summaries - .first() - .ok_or_else(|| anyhow::anyhow!("No checkpoint found"))?; - let chain_identifier = *first_cp.digest(); - Ok(ArchivalCheckpointInfo { - next_checkpoint_after_epoch, - chain_identifier, - }) +impl ArchivalCheckpointInfo { + /// Reads checkpoint information from archival storage to determine: + /// - The next checkpoint number after `start_epoch` for watermarking; + /// - The genesis checkpoint digest for kv_genesis. + pub async fn read_archival_checkpoint_info(args: &Args) -> anyhow::Result { + // Configure GCS object store and initialize archive reader with minimal concurrency. + let archive_store_config = ObjectStoreConfig { + object_store: Some(ObjectStoreType::GCS), + bucket: Some(args.archive_bucket.clone()), + object_store_connection_limit: 1, // 1 connection is sufficient + no_sign_request: false, + ..Default::default() + }; + let archive_reader_config = ArchiveReaderConfig { + remote_store_config: archive_store_config, + download_concurrency: NonZeroUsize::new(1).unwrap(), + use_for_pruning_watermark: false, + }; + let metrics = ArchiveReaderMetrics::new(&Registry::default()); + let archive_reader = ArchiveReader::new(archive_reader_config, &metrics)?; + + // Sync and read manifest to get next checkpoint after `start_epoch` for watermarking. + archive_reader.sync_manifest_once().await?; + let manifest = archive_reader.get_manifest().await?; + let next_checkpoint_after_epoch = manifest.next_checkpoint_after_epoch(args.start_epoch); + info!( + epoch = args.start_epoch, + checkpoint = next_checkpoint_after_epoch, + "Next checkpoint after epoch", + ); + + // Get genesis checkpoint (checkpoint 0) to determine chain identifier for kv_genesis. + let cp_summaries = archive_reader + .get_summaries_for_list_no_verify(vec![0]) + .await?; + let first_cp = cp_summaries + .first() + .ok_or_else(|| anyhow::anyhow!("No checkpoint found"))?; + let chain_identifier = *first_cp.digest(); + Ok(Self { + next_checkpoint_after_epoch, + chain_identifier, + }) + } } diff --git a/crates/sui-indexer-alt-restorer/src/lib.rs b/crates/sui-indexer-alt-restorer/src/lib.rs index 85be13df104a81..cc06195e5bf5cb 100644 --- a/crates/sui-indexer-alt-restorer/src/lib.rs +++ b/crates/sui-indexer-alt-restorer/src/lib.rs @@ -4,7 +4,7 @@ mod archives; mod snapshot; -use archives::read_archival_checkpoint_info; +use archives::ArchivalCheckpointInfo; use clap::Parser; use sui_pg_db::DbArgs; @@ -43,7 +43,8 @@ pub struct Args { } pub async fn restore(args: &Args) -> anyhow::Result<()> { - let archival_checkpoint_info = read_archival_checkpoint_info(args).await?; + let archival_checkpoint_info = + ArchivalCheckpointInfo::read_archival_checkpoint_info(args).await?; let mut snapshot_restorer = SnapshotRestorer::new(args, archival_checkpoint_info.next_checkpoint_after_epoch).await?; snapshot_restorer.restore().await?;