From 886d14bb3902a587959f86030a314dccc235f5ef Mon Sep 17 00:00:00 2001 From: Konrad Stepniak Date: Tue, 17 Dec 2024 17:03:28 +0100 Subject: [PATCH] fix(storage-provider): multiple sectors, multiple cache dirs --- lib/polka-storage-proofs/src/post/mod.rs | 8 ++++---- storage-provider/client/src/commands/proofs.rs | 2 +- storage-provider/server/src/pipeline/mod.rs | 16 +++++++++------- storage-provider/server/src/pipeline/types.rs | 11 +++++++++++ 4 files changed, 25 insertions(+), 12 deletions(-) diff --git a/lib/polka-storage-proofs/src/post/mod.rs b/lib/polka-storage-proofs/src/post/mod.rs index 9efe4376..a62ee7d3 100644 --- a/lib/polka-storage-proofs/src/post/mod.rs +++ b/lib/polka-storage-proofs/src/post/mod.rs @@ -1,6 +1,6 @@ use std::{ collections::BTreeMap, - path::{Path, PathBuf}, + path::PathBuf, }; use bellperson::groth16; @@ -54,6 +54,7 @@ pub struct ReplicaInfo { pub sector_id: SectorNumber, pub comm_r: Commitment, pub replica_path: PathBuf, + pub cache_path: PathBuf, } /// Generates Windowed PoSt for a replica. @@ -61,13 +62,12 @@ pub struct ReplicaInfo { /// /// References: /// * -pub fn generate_window_post>( +pub fn generate_window_post( proof_type: RegisteredPoStProof, groth_params: &groth16::MappedParameters, randomness: Ticket, prover_id: ProverId, partition_replicas: Vec, - cache_dir: CacheDirectory, ) -> Result>, PoStError> { type Tree = SectorShapeBase; @@ -79,7 +79,7 @@ pub fn generate_window_post>( PrivateReplicaInfo::::new( replica.replica_path, replica.comm_r, - cache_dir.as_ref().to_path_buf(), + replica.cache_path, )?, ); } diff --git a/storage-provider/client/src/commands/proofs.rs b/storage-provider/client/src/commands/proofs.rs index 24604c86..130b6d34 100644 --- a/storage-provider/client/src/commands/proofs.rs +++ b/storage-provider/client/src/commands/proofs.rs @@ -452,6 +452,7 @@ impl ProofsCommand { .try_into() .map_err(|_| UtilsCommandError::CommRError)?, replica_path, + cache_path: cache_directory, }]; println!("Loading parameters..."); @@ -465,7 +466,6 @@ impl ProofsCommand { randomness, prover_id, replicas, - cache_directory, ) .map_err(|e| UtilsCommandError::GeneratePoStError(e))?; diff --git a/storage-provider/server/src/pipeline/mod.rs b/storage-provider/server/src/pipeline/mod.rs index 83cda540..97bc71f3 100644 --- a/storage-provider/server/src/pipeline/mod.rs +++ b/storage-provider/server/src/pipeline/mod.rs @@ -384,20 +384,22 @@ async fn precommit( &entropy, ); + let cache_path = state.sealing_cache_dir.join(sector_number.to_string()); let sealed_path = state.sealed_sectors_dir.join(sector_number.to_string()); + tokio::fs::create_dir_all(&cache_path).await?; tokio::fs::File::create_new(&sealed_path).await?; // TODO(@th7nder,31/10/2024): what happens if some of the process fails? SP will be slashed, and there is no error reporting? what about retries? let sealing_handle: JoinHandle> = { let prover_id = derive_prover_id(state.xt_keypair.account_id()); - let cache_dir = state.sealing_cache_dir.clone(); + let cache_dir = cache_path.clone(); let unsealed_path = sector.unsealed_path.clone(); let sealed_path = sealed_path.clone(); let piece_infos = sector.piece_infos.clone(); tokio::task::spawn_blocking(move || { sealer.precommit_sector( - cache_dir.as_ref(), + cache_dir, unsealed_path, sealed_path, prover_id, @@ -453,6 +455,7 @@ async fn precommit( let sector = PreCommittedSector::create( sector, + cache_path, sealed_path, sealing_output_commr, sealing_output_commd, @@ -543,14 +546,14 @@ async fn prove_commit( let sealing_handle: JoinHandle, _>> = { let porep_params = state.porep_parameters.clone(); - let cache_dir = state.sealing_cache_dir.clone(); + let cache_dir = sector.cache_path.clone(); let sealed_path = sector.sealed_path.clone(); let piece_infos = sector.piece_infos.clone(); tokio::task::spawn_blocking(move || { sealer.prove_sector( porep_params.as_ref(), - cache_dir.as_ref(), + cache_dir, sealed_path, prover_id, sector_number, @@ -695,6 +698,7 @@ async fn submit_windowed_post( replicas.push(ReplicaInfo { sector_id: *sector_number, comm_r: sector.comm_r.raw(), + cache_path: sector.cache_path.clone(), replica_path: sector.sealed_path.clone(), }); } @@ -703,7 +707,6 @@ async fn submit_windowed_post( tracing::info!("Proving PoSt partitions... {:?}", partitions); let handle: JoinHandle, _>> = { let post_params = state.post_parameters.clone(); - let cache_dir = state.sealing_cache_dir.clone(); let post_proof = state.server_info.post_proof; tokio::task::spawn_blocking(move || { @@ -713,7 +716,6 @@ async fn submit_windowed_post( randomness, prover_id, replicas, - cache_dir.as_ref(), ) }) }; @@ -767,7 +769,7 @@ async fn submit_windowed_post( #[tracing::instrument(skip_all)] async fn schedule_posts(state: Arc) -> Result<(), PipelineError> { - let proving_period = state.xt_client.proving_period_info()?; + let proving_period = state.xt_client.proving_period_info().await?; for deadline_index in 0..proving_period.deadlines { schedule_post(state.clone(), deadline_index)?; diff --git a/storage-provider/server/src/pipeline/types.rs b/storage-provider/server/src/pipeline/types.rs index b09eb3fc..1e5f657b 100644 --- a/storage-provider/server/src/pipeline/types.rs +++ b/storage-provider/server/src/pipeline/types.rs @@ -99,6 +99,10 @@ pub struct PreCommittedSector { /// Tracks all of the deals that have been added to the sector. pub deals: Vec<(DealId, DealProposal)>, + /// Cache directory of the sector. + /// Each sector needs to have it's cache directory in a different place, because `p_aux` and `t_aux` are stored there. + pub cache_path: std::path::PathBuf, + /// Path of an existing file where the sealed sector data is stored. /// /// File at this path is initially created by [`Sector::create`], however it's empty. @@ -154,6 +158,7 @@ impl PreCommittedSector { /// Should only be called after sealing and pre-commit process has ended. pub async fn create( unsealed: UnsealedSector, + cache_path: std::path::PathBuf, sealed_path: std::path::PathBuf, comm_r: Commitment, comm_d: Commitment, @@ -166,6 +171,7 @@ impl PreCommittedSector { sector_number: unsealed.sector_number, piece_infos: unsealed.piece_infos, deals: unsealed.deals, + cache_path, sealed_path, comm_r, comm_d, @@ -190,6 +196,10 @@ pub struct ProvenSector { /// Tracks all of the deals that have been added to the sector. pub deals: Vec<(DealId, DealProposal)>, + /// Cache directory of the sector. + /// Each sector needs to have it's cache directory in a different place, because `p_aux` and `t_aux` are stored there. + pub cache_path: std::path::PathBuf, + /// Path of an existing file where the sealed sector data is stored. pub sealed_path: std::path::PathBuf, @@ -207,6 +217,7 @@ impl ProvenSector { sector_number: sector.sector_number, piece_infos: sector.piece_infos, deals: sector.deals, + cache_path: sector.cache_path, sealed_path: sector.sealed_path, comm_r: sector.comm_r, comm_d: sector.comm_d,