Skip to content

Commit

Permalink
fix(storage-provider): multiple sectors, multiple cache dirs
Browse files Browse the repository at this point in the history
  • Loading branch information
th7nder committed Dec 17, 2024
1 parent 48893f3 commit 0cb161b
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 12 deletions.
8 changes: 4 additions & 4 deletions lib/polka-storage-proofs/src/post/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{
collections::BTreeMap,
path::{Path, PathBuf},
path::PathBuf,
};

use bellperson::groth16;
Expand Down Expand Up @@ -54,20 +54,20 @@ pub struct ReplicaInfo {
pub sector_id: SectorNumber,
pub comm_r: Commitment,
pub replica_path: PathBuf,
pub cache_path: PathBuf,
}

/// Generates Windowed PoSt for a replica.
/// Only supports 2KiB sectors.
///
/// References:
/// * <https://github.com/filecoin-project/rust-fil-proofs/blob/5a0523ae1ddb73b415ce2fa819367c7989aaf73f/filecoin-proofs/src/api/window_post.rs#L100>
pub fn generate_window_post<CacheDirectory: AsRef<Path>>(
pub fn generate_window_post(
proof_type: RegisteredPoStProof,
groth_params: &groth16::MappedParameters<Bls12>,
randomness: Ticket,
prover_id: ProverId,
partition_replicas: Vec<ReplicaInfo>,
cache_dir: CacheDirectory,
) -> Result<Vec<groth16::Proof<Bls12>>, PoStError> {
type Tree = SectorShapeBase;

Expand All @@ -79,7 +79,7 @@ pub fn generate_window_post<CacheDirectory: AsRef<Path>>(
PrivateReplicaInfo::<Tree>::new(
replica.replica_path,
replica.comm_r,
cache_dir.as_ref().to_path_buf(),
replica.cache_path,
)?,
);
}
Expand Down
2 changes: 1 addition & 1 deletion storage-provider/client/src/commands/proofs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,7 @@ impl ProofsCommand {
.try_into()
.map_err(|_| UtilsCommandError::CommRError)?,
replica_path,
cache_path: cache_directory,
}];

println!("Loading parameters...");
Expand All @@ -465,7 +466,6 @@ impl ProofsCommand {
randomness,
prover_id,
replicas,
cache_directory,
)
.map_err(|e| UtilsCommandError::GeneratePoStError(e))?;

Expand Down
16 changes: 9 additions & 7 deletions storage-provider/server/src/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,20 +384,22 @@ async fn precommit(
&entropy,
);

let cache_path = state.sealing_cache_dir.join(sector_number.to_string());
let sealed_path = state.sealed_sectors_dir.join(sector_number.to_string());
tokio::fs::create_dir_all(&cache_path).await?;
tokio::fs::File::create_new(&sealed_path).await?;

// TODO(@th7nder,31/10/2024): what happens if some of the process fails? SP will be slashed, and there is no error reporting? what about retries?
let sealing_handle: JoinHandle<Result<PreCommitOutput, _>> = {
let prover_id = derive_prover_id(state.xt_keypair.account_id());
let cache_dir = state.sealing_cache_dir.clone();
let cache_dir = cache_path.clone();
let unsealed_path = sector.unsealed_path.clone();
let sealed_path = sealed_path.clone();

let piece_infos = sector.piece_infos.clone();
tokio::task::spawn_blocking(move || {
sealer.precommit_sector(
cache_dir.as_ref(),
cache_dir,
unsealed_path,
sealed_path,
prover_id,
Expand Down Expand Up @@ -453,6 +455,7 @@ async fn precommit(

let sector = PreCommittedSector::create(
sector,
cache_path,
sealed_path,
sealing_output_commr,
sealing_output_commd,
Expand Down Expand Up @@ -543,14 +546,14 @@ async fn prove_commit(

let sealing_handle: JoinHandle<Result<Vec<BlstrsProof>, _>> = {
let porep_params = state.porep_parameters.clone();
let cache_dir = state.sealing_cache_dir.clone();
let cache_dir = sector.cache_path.clone();
let sealed_path = sector.sealed_path.clone();
let piece_infos = sector.piece_infos.clone();

tokio::task::spawn_blocking(move || {
sealer.prove_sector(
porep_params.as_ref(),
cache_dir.as_ref(),
cache_dir,
sealed_path,
prover_id,
sector_number,
Expand Down Expand Up @@ -695,6 +698,7 @@ async fn submit_windowed_post(
replicas.push(ReplicaInfo {
sector_id: *sector_number,
comm_r: sector.comm_r.raw(),
cache_path: sector.cache_path.clone(),
replica_path: sector.sealed_path.clone(),
});
}
Expand All @@ -703,7 +707,6 @@ async fn submit_windowed_post(
tracing::info!("Proving PoSt partitions... {:?}", partitions);
let handle: JoinHandle<Result<Vec<BlstrsProof>, _>> = {
let post_params = state.post_parameters.clone();
let cache_dir = state.sealing_cache_dir.clone();
let post_proof = state.server_info.post_proof;

tokio::task::spawn_blocking(move || {
Expand All @@ -713,7 +716,6 @@ async fn submit_windowed_post(
randomness,
prover_id,
replicas,
cache_dir.as_ref(),
)
})
};
Expand Down Expand Up @@ -767,7 +769,7 @@ async fn submit_windowed_post(

#[tracing::instrument(skip_all)]
async fn schedule_posts(state: Arc<PipelineState>) -> Result<(), PipelineError> {
let proving_period = state.xt_client.proving_period_info()?;
let proving_period = state.xt_client.proving_period_info().await?;

for deadline_index in 0..proving_period.deadlines {
schedule_post(state.clone(), deadline_index)?;
Expand Down
11 changes: 11 additions & 0 deletions storage-provider/server/src/pipeline/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ pub struct PreCommittedSector {
/// Tracks all of the deals that have been added to the sector.
pub deals: Vec<(DealId, DealProposal)>,

/// Cache directory of the sector.
/// Each sector needs to have it's cache directory in a different place, because `p_aux` and `t_aux` are stored there.
pub cache_path: std::path::PathBuf,

/// Path of an existing file where the sealed sector data is stored.
///
/// File at this path is initially created by [`Sector::create`], however it's empty.
Expand Down Expand Up @@ -154,6 +158,7 @@ impl PreCommittedSector {
/// Should only be called after sealing and pre-commit process has ended.
pub async fn create(
unsealed: UnsealedSector,
cache_path: std::path::PathBuf,
sealed_path: std::path::PathBuf,
comm_r: Commitment<CommR>,
comm_d: Commitment<CommD>,
Expand All @@ -166,6 +171,7 @@ impl PreCommittedSector {
sector_number: unsealed.sector_number,
piece_infos: unsealed.piece_infos,
deals: unsealed.deals,
cache_path,
sealed_path,
comm_r,
comm_d,
Expand All @@ -190,6 +196,10 @@ pub struct ProvenSector {
/// Tracks all of the deals that have been added to the sector.
pub deals: Vec<(DealId, DealProposal)>,

/// Cache directory of the sector.
/// Each sector needs to have it's cache directory in a different place, because `p_aux` and `t_aux` are stored there.
pub cache_path: std::path::PathBuf,

/// Path of an existing file where the sealed sector data is stored.
pub sealed_path: std::path::PathBuf,

Expand All @@ -207,6 +217,7 @@ impl ProvenSector {
sector_number: sector.sector_number,
piece_infos: sector.piece_infos,
deals: sector.deals,
cache_path: sector.cache_path,
sealed_path: sector.sealed_path,
comm_r: sector.comm_r,
comm_d: sector.comm_d,
Expand Down

0 comments on commit 0cb161b

Please sign in to comment.