Skip to content

Commit

Permalink
feat: storage needs split by the first 3 characters of task id(sha256…
Browse files Browse the repository at this point in the history
…) to avoid too many files in one directory (#792)

Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Oct 21, 2024
1 parent 07c9cb4 commit 1a85ccc
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 25 deletions.
98 changes: 76 additions & 22 deletions dragonfly-client-storage/src/content.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,14 @@ use tokio::io::{self, AsyncRead, AsyncReadExt, AsyncSeekExt, BufReader, SeekFrom
use tokio_util::io::InspectReader;
use tracing::{error, info, instrument, warn};

/// DEFAULT_DIR_NAME is the default directory name to store content.
const DEFAULT_DIR_NAME: &str = "content";
/// DEFAULT_CONTENT_DIR is the default directory for store content.
pub const DEFAULT_CONTENT_DIR: &str = "content";

/// DEFAULT_TASK_DIR is the default directory for store task.
pub const DEFAULT_TASK_DIR: &str = "tasks";

/// DEFAULT_PERSISTENT_CACHE_TASK_DIR is the default directory for store persistent cache task.
pub const DEFAULT_PERSISTENT_CACHE_TASK_DIR: &str = "persistent-cache-tasks";

/// Content is the content of a piece.
pub struct Content {
Expand Down Expand Up @@ -60,7 +66,7 @@ impl Content {
/// new returns a new content.
#[instrument(skip_all)]
pub async fn new(config: Arc<Config>, dir: &Path) -> Result<Content> {
let dir = dir.join(DEFAULT_DIR_NAME);
let dir = dir.join(DEFAULT_CONTENT_DIR);

// If the storage is not kept, remove the directory.
if !config.storage.keep {
Expand All @@ -69,9 +75,9 @@ impl Content {
});
}

fs::create_dir_all(&dir).await?;
fs::create_dir_all(&dir.join(DEFAULT_TASK_DIR)).await?;
fs::create_dir_all(&dir.join(DEFAULT_PERSISTENT_CACHE_TASK_DIR)).await?;
info!("content initialized directory: {:?}", dir);

Ok(Content { config, dir })
}

Expand All @@ -83,7 +89,7 @@ impl Content {
to: &Path,
range: Option<Range>,
) -> Result<()> {
let task_path = self.dir.join(task.id.as_str());
let task_path = self.get_task_path(task.id.as_str());

// Copy the task content to the destination by range
// if the range is specified.
Expand Down Expand Up @@ -147,7 +153,7 @@ impl Content {
/// hard_link_task hard links the task content.
#[instrument(skip_all)]
async fn hard_link_task(&self, task_id: &str, link: &Path) -> Result<()> {
fs::hard_link(self.dir.join(task_id), link).await?;
fs::hard_link(self.get_task_path(task_id), link).await?;
Ok(())
}

Expand All @@ -164,7 +170,7 @@ impl Content {
}
}

fs::copy(self.dir.join(task_id), to).await?;
fs::copy(self.get_task_path(task_id), to).await?;
Ok(())
}

Expand All @@ -181,7 +187,7 @@ impl Content {
}
}

let mut from_f = File::open(self.dir.join(task_id)).await?;
let mut from_f = File::open(self.get_task_path(task_id)).await?;
from_f.seek(SeekFrom::Start(range.start)).await?;
let range_reader = from_f.take(range.length);

Expand All @@ -203,7 +209,7 @@ impl Content {
/// read_task reads the task content by range.
#[instrument(skip_all)]
pub async fn read_task_by_range(&self, task_id: &str, range: Range) -> Result<impl AsyncRead> {
let task_path = self.dir.join(task_id);
let task_path = self.get_task_path(task_id);
let mut from_f = File::open(task_path.as_path()).await.map_err(|err| {
error!("open {:?} failed: {}", task_path, err);
err
Expand All @@ -225,7 +231,7 @@ impl Content {
#[instrument(skip_all)]
pub async fn delete_task(&self, task_id: &str) -> Result<()> {
info!("delete task content: {}", task_id);
let task_path = self.dir.join(task_id);
let task_path = self.get_task_path(task_id);
fs::remove_file(task_path.as_path()).await.map_err(|err| {
error!("remove {:?} failed: {}", task_path, err);
err
Expand All @@ -242,7 +248,7 @@ impl Content {
length: u64,
range: Option<Range>,
) -> Result<impl AsyncRead> {
let task_path = self.dir.join(task_id);
let task_path = self.get_task_path(task_id);
if let Some(range) = range {
let target_offset = max(offset, range.start);
let target_length =
Expand Down Expand Up @@ -282,8 +288,6 @@ impl Content {
offset: u64,
reader: &mut R,
) -> Result<WritePieceResponse> {
let task_path = self.dir.join(task_id);

// Use a buffer to read the piece.
let reader = BufReader::with_capacity(self.config.storage.write_buffer_size, reader);

Expand All @@ -296,6 +300,7 @@ impl Content {
});

// Open the file and seek to the offset.
let task_path = self.create_or_get_task_path(task_id).await?;
let mut f = OpenOptions::new()
.create(true)
.truncate(false)
Expand Down Expand Up @@ -326,6 +331,29 @@ impl Content {
})
}

/// get_task_path returns the task path by task id.
#[instrument(skip_all)]
fn get_task_path(&self, task_id: &str) -> PathBuf {
// The task needs split by the first 3 characters of task id(sha256) to
// avoid too many files in one directory.
self.dir
.join(DEFAULT_TASK_DIR)
.join(&task_id[..3])
.join(task_id)
}

/// create_or_get_task_path creates parent directories or returns the task path by task id.
#[instrument(skip_all)]
async fn create_or_get_task_path(&self, task_id: &str) -> Result<PathBuf> {
let task_dir = self.dir.join(DEFAULT_TASK_DIR).join(&task_id[..3]);
fs::create_dir_all(&task_dir).await.map_err(|err| {
error!("create {:?} failed: {}", task_dir, err);
err
})?;

Ok(task_dir.join(task_id))
}

/// hard_link_or_copy_persistent_cache_task hard links or copies the task content to the destination.
#[instrument(skip_all)]
pub async fn hard_link_or_copy_persistent_cache_task(
Expand All @@ -344,7 +372,7 @@ impl Content {
}

// Get the persistent cache task path.
let task_path = self.dir.join(task.id.as_str());
let task_path = self.get_persistent_cache_task_path(task.id.as_str());

// If the hard link fails, copy the task content to the destination.
fs::remove_file(to).await.unwrap_or_else(|err| {
Expand Down Expand Up @@ -400,7 +428,9 @@ impl Content {
hasher.update(bytes);
});

let task_path = self.dir.join(task_id);
let task_path = self
.create_or_get_persistent_cache_task_path(task_id)
.await?;
let mut to_f = OpenOptions::new()
.create(true)
.truncate(true)
Expand Down Expand Up @@ -428,12 +458,9 @@ impl Content {

/// delete_task deletes the persistent cache task content.
#[instrument(skip_all)]
pub async fn delete_persistent_cache_task(&self, persistent_cache_task_id: &str) -> Result<()> {
info!(
"delete persistent cache task content: {}",
persistent_cache_task_id
);
let persistent_cache_task_path = self.dir.join(persistent_cache_task_id);
pub async fn delete_persistent_cache_task(&self, task_id: &str) -> Result<()> {
info!("delete persistent cache task content: {}", task_id);
let persistent_cache_task_path = self.get_persistent_cache_task_path(task_id);
fs::remove_file(persistent_cache_task_path.as_path())
.await
.map_err(|err| {
Expand All @@ -442,4 +469,31 @@ impl Content {
})?;
Ok(())
}

/// get_persistent_cache_task_path returns the persistent cache task path by task id.
#[instrument(skip_all)]
fn get_persistent_cache_task_path(&self, task_id: &str) -> PathBuf {
// The persistent cache task needs split by the first 3 characters of task id(sha256) to
// avoid too many files in one directory.
self.dir
.join(DEFAULT_PERSISTENT_CACHE_TASK_DIR)
.join(&task_id[..3])
.join(task_id)
}

/// create_or_get_persistent_cache_task_path creates parent directories or returns the persistent cache task path by task id.
#[instrument(skip_all)]
async fn create_or_get_persistent_cache_task_path(&self, task_id: &str) -> Result<PathBuf> {
let task_dir = self
.dir
.join(DEFAULT_PERSISTENT_CACHE_TASK_DIR)
.join(&task_id[..3]);

fs::create_dir_all(&task_dir).await.map_err(|err| {
error!("create {:?} failed: {}", task_dir, err);
err
})?;

Ok(task_dir.join(task_id))
}
}
23 changes: 20 additions & 3 deletions dragonfly-client/src/gc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ use crate::shutdown;
use dragonfly_api::scheduler::v2::{DeletePersistentCacheTaskRequest, DeleteTaskRequest};
use dragonfly_client_config::dfdaemon::Config;
use dragonfly_client_core::Result;
use dragonfly_client_storage::{metadata, Storage};
use dragonfly_client_storage::{
content::{DEFAULT_CONTENT_DIR, DEFAULT_PERSISTENT_CACHE_TASK_DIR, DEFAULT_TASK_DIR},
metadata, Storage,
};
use std::sync::Arc;
use tokio::sync::mpsc;
use tracing::{error, info, instrument};
Expand Down Expand Up @@ -127,7 +130,14 @@ impl GC {
/// evict_task_by_disk_usage evicts the task by disk usage.
#[instrument(skip_all)]
async fn evict_task_by_disk_usage(&self) -> Result<()> {
let stats = fs2::statvfs(self.config.storage.dir.as_path())?;
let stats = fs2::statvfs(
self.config
.storage
.dir
.join(DEFAULT_CONTENT_DIR)
.join(DEFAULT_TASK_DIR)
.as_path(),
)?;
let available_space = stats.available_space();
let total_space = stats.total_space();

Expand Down Expand Up @@ -226,7 +236,14 @@ impl GC {
/// evict_persistent_cache_task_by_disk_usage evicts the persistent cache task by disk usage.
#[instrument(skip_all)]
async fn evict_persistent_cache_task_by_disk_usage(&self) -> Result<()> {
let stats = fs2::statvfs(self.config.storage.dir.as_path())?;
let stats = fs2::statvfs(
self.config
.storage
.dir
.join(DEFAULT_CONTENT_DIR)
.join(DEFAULT_PERSISTENT_CACHE_TASK_DIR)
.as_path(),
)?;
let available_space = stats.available_space();
let total_space = stats.total_space();

Expand Down

0 comments on commit 1a85ccc

Please sign in to comment.