Skip to content

Commit

Permalink
comments, round 3
Browse files Browse the repository at this point in the history
  • Loading branch information
gegaowp committed Dec 18, 2024
1 parent 65b5b26 commit ab089b7
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 38 deletions.
81 changes: 45 additions & 36 deletions crates/sui-indexer-alt-restorer/src/archives.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,44 +16,53 @@ use crate::Args;
#[derive(Clone, Debug)]
pub(crate) struct ArchivalCheckpointInfo {
pub next_checkpoint_after_epoch: u64,
// TODO(gegaowp): unused for now, will be used for kv_genesis.
#[allow(unused)]
pub chain_identifier: CheckpointDigest,
}

pub(crate) async fn read_archival_checkpoint_info(
args: &Args,
) -> Result<ArchivalCheckpointInfo, anyhow::Error> {
let archive_store_config = ObjectStoreConfig {
object_store: Some(ObjectStoreType::GCS),
bucket: Some(args.archive_bucket.clone()),
object_store_connection_limit: args.concurrency,
no_sign_request: false,
..Default::default()
};
let archive_reader_config = ArchiveReaderConfig {
remote_store_config: archive_store_config,
download_concurrency: NonZeroUsize::new(args.concurrency).unwrap(),
use_for_pruning_watermark: false,
};
let metrics = ArchiveReaderMetrics::new(&Registry::default());
let archive_reader = ArchiveReader::new(archive_reader_config, &metrics)?;
archive_reader.sync_manifest_once().await?;
let manifest = archive_reader.get_manifest().await?;
let next_checkpoint_after_epoch = manifest.next_checkpoint_after_epoch(args.start_epoch);
info!(
epoch = args.start_epoch,
checkpoint = next_checkpoint_after_epoch,
"Next checkpoint after epoch",
);
let cp_summaries = archive_reader
.get_summaries_for_list_no_verify(vec![0])
.await?;
let first_cp = cp_summaries
.first()
.ok_or_else(|| anyhow::anyhow!("No checkpoint found"))?;
let chain_identifier = *first_cp.digest();
Ok(ArchivalCheckpointInfo {
next_checkpoint_after_epoch,
chain_identifier,
})
impl ArchivalCheckpointInfo {
/// Reads checkpoint information from archival storage to determine:
/// - The next checkpoint number after `start_epoch` for watermarking;
/// - The genesis checkpoint digest for kv_genesis.
pub async fn read_archival_checkpoint_info(args: &Args) -> anyhow::Result<Self> {
// Configure GCS object store and initialize archive reader with minimal concurrency.
let archive_store_config = ObjectStoreConfig {
object_store: Some(ObjectStoreType::GCS),
bucket: Some(args.archive_bucket.clone()),
object_store_connection_limit: 1, // 1 connection is sufficient
no_sign_request: false,
..Default::default()
};
let archive_reader_config = ArchiveReaderConfig {
remote_store_config: archive_store_config,
download_concurrency: NonZeroUsize::new(1).unwrap(),
use_for_pruning_watermark: false,
};
let metrics = ArchiveReaderMetrics::new(&Registry::default());
let archive_reader = ArchiveReader::new(archive_reader_config, &metrics)?;

// Sync and read manifest to get next checkpoint after `start_epoch` for watermarking.
archive_reader.sync_manifest_once().await?;
let manifest = archive_reader.get_manifest().await?;
let next_checkpoint_after_epoch = manifest.next_checkpoint_after_epoch(args.start_epoch);
info!(
epoch = args.start_epoch,
checkpoint = next_checkpoint_after_epoch,
"Next checkpoint after epoch",
);

// Get genesis checkpoint (checkpoint 0) to determine chain identifier for kv_genesis.
let cp_summaries = archive_reader
.get_summaries_for_list_no_verify(vec![0])
.await?;
let first_cp = cp_summaries
.first()
.ok_or_else(|| anyhow::anyhow!("No checkpoint found"))?;
let chain_identifier = *first_cp.digest();
Ok(Self {
next_checkpoint_after_epoch,
chain_identifier,
})
}
}
5 changes: 3 additions & 2 deletions crates/sui-indexer-alt-restorer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
mod archives;
mod snapshot;

use archives::read_archival_checkpoint_info;
use archives::ArchivalCheckpointInfo;
use clap::Parser;
use sui_pg_db::DbArgs;

Expand Down Expand Up @@ -43,7 +43,8 @@ pub struct Args {
}

pub async fn restore(args: &Args) -> anyhow::Result<()> {
let archival_checkpoint_info = read_archival_checkpoint_info(args).await?;
let archival_checkpoint_info =
ArchivalCheckpointInfo::read_archival_checkpoint_info(args).await?;
let mut snapshot_restorer =
SnapshotRestorer::new(args, archival_checkpoint_info.next_checkpoint_after_epoch).await?;
snapshot_restorer.restore().await?;
Expand Down

0 comments on commit ab089b7

Please sign in to comment.