From ee21989120b16db4e8456e7d163f9ba06aad98c9 Mon Sep 17 00:00:00 2001 From: Gaius Date: Fri, 8 Nov 2024 17:34:13 +0800 Subject: [PATCH] feat: when the task is downloading, it is not allowed to delete the task (#828) Signed-off-by: Gaius --- Cargo.lock | 16 ++--- Cargo.toml | 16 ++--- dragonfly-client-storage/src/metadata.rs | 81 ++++++------------------ dragonfly-client/src/gc/mod.rs | 32 +++++++++- 4 files changed, 67 insertions(+), 78 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4e41e60d..2f39ed47 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -856,7 +856,7 @@ dependencies = [ [[package]] name = "dragonfly-client" -version = "0.1.115" +version = "0.1.116" dependencies = [ "anyhow", "blake3", @@ -928,7 +928,7 @@ dependencies = [ [[package]] name = "dragonfly-client-backend" -version = "0.1.115" +version = "0.1.116" dependencies = [ "dragonfly-api", "dragonfly-client-core", @@ -956,7 +956,7 @@ dependencies = [ [[package]] name = "dragonfly-client-config" -version = "0.1.115" +version = "0.1.116" dependencies = [ "bytesize", "bytesize-serde", @@ -982,7 +982,7 @@ dependencies = [ [[package]] name = "dragonfly-client-core" -version = "0.1.115" +version = "0.1.116" dependencies = [ "headers 0.4.0", "hyper 1.4.1", @@ -1000,7 +1000,7 @@ dependencies = [ [[package]] name = "dragonfly-client-init" -version = "0.1.115" +version = "0.1.116" dependencies = [ "anyhow", "clap", @@ -1017,7 +1017,7 @@ dependencies = [ [[package]] name = "dragonfly-client-storage" -version = "0.1.115" +version = "0.1.116" dependencies = [ "base16ct", "bincode", @@ -1042,7 +1042,7 @@ dependencies = [ [[package]] name = "dragonfly-client-util" -version = "0.1.115" +version = "0.1.116" dependencies = [ "base16ct", "base64 0.22.1", @@ -1394,7 +1394,7 @@ dependencies = [ [[package]] name = "hdfs" -version = "0.1.115" +version = "0.1.116" dependencies = [ "dragonfly-client-backend", "dragonfly-client-core", diff --git a/Cargo.toml b/Cargo.toml index cd4c78f0..cd86f9df 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ members = [ ] [workspace.package] -version = "0.1.115" +version = "0.1.116" authors = ["The Dragonfly Developers"] homepage = "https://d7y.io/" repository = "https://github.com/dragonflyoss/client.git" @@ -22,13 +22,13 @@ readme = "README.md" edition = "2021" [workspace.dependencies] -dragonfly-client = { path = "dragonfly-client", version = "0.1.115" } -dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.115" } -dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.115" } -dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.115" } -dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.115" } -dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.115" } -dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.115" } +dragonfly-client = { path = "dragonfly-client", version = "0.1.116" } +dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.116" } +dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.116" } +dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.116" } +dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.116" } +dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.116" } +dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.116" } thiserror = "1.0" dragonfly-api = "=2.0.169" reqwest = { version = "0.12.4", features = ["stream", "native-tls", "default-tls", "rustls-tls"] } diff --git a/dragonfly-client-storage/src/metadata.rs b/dragonfly-client-storage/src/metadata.rs index 3166d05c..665a3e95 100644 --- a/dragonfly-client-storage/src/metadata.rs +++ b/dragonfly-client-storage/src/metadata.rs @@ -81,7 +81,7 @@ impl Task { self.finished_at.is_none() } - /// is_downloading returns whether the task is downloading. + /// is_uploading returns whether the task is uploading. pub fn is_uploading(&self) -> bool { self.uploading_count > 0 } @@ -184,7 +184,7 @@ impl PersistentCacheTask { self.finished_at.is_none() } - /// is_downloading returns whether the persistent cache task is downloading. + /// is_uploading returns whether the persistent cache task is uploading. pub fn is_uploading(&self) -> bool { self.uploading_count > 0 } @@ -947,47 +947,30 @@ mod tests { assert!(task.response_header.is_empty()); assert_eq!(task.uploading_count, 0); assert_eq!(task.uploaded_count, 0); + assert!(!task.is_finished()); // Test download_task_finished. metadata.download_task_finished(task_id).unwrap(); let task = metadata.get_task(task_id).unwrap().unwrap(); - assert!( - task.is_finished(), - "task should be finished after download_task_finished" - ); + assert!(task.is_finished()); // Test upload_task_started. metadata.upload_task_started(task_id).unwrap(); let task = metadata.get_task(task_id).unwrap().unwrap(); - assert_eq!( - task.uploading_count, 1, - "uploading_count should be increased by 1 after upload_task_started" - ); + assert_eq!(task.uploading_count, 1,); // Test upload_task_finished. metadata.upload_task_finished(task_id).unwrap(); let task = metadata.get_task(task_id).unwrap().unwrap(); - assert_eq!( - task.uploading_count, 0, - "uploading_count should be decreased by 1 after upload_task_finished" - ); - assert_eq!( - task.uploaded_count, 1, - "uploaded_count should be increased by 1 after upload_task_finished" - ); + assert_eq!(task.uploading_count, 0,); + assert_eq!(task.uploaded_count, 1,); // Test upload_task_failed. let task = metadata.upload_task_started(task_id).unwrap(); assert_eq!(task.uploading_count, 1); let task = metadata.upload_task_failed(task_id).unwrap(); - assert_eq!( - task.uploading_count, 0, - "uploading_count should be decreased by 1 after upload_task_failed" - ); - assert_eq!( - task.uploaded_count, 1, - "uploaded_count should not be changed after upload_task_failed" - ); + assert_eq!(task.uploading_count, 0,); + assert_eq!(task.uploaded_count, 1,); // Test get_tasks. let task_id = "a535b115f18d96870f0422ac891f91dd162f2f391e4778fb84279701fcd02dd1"; @@ -995,12 +978,12 @@ mod tests { .download_task_started(task_id, Some(1024), None, None) .unwrap(); let tasks = metadata.get_tasks().unwrap(); - assert_eq!(tasks.len(), 2, "should get 2 tasks in total"); + assert_eq!(tasks.len(), 2); // Test delete_task. metadata.delete_task(task_id).unwrap(); let task = metadata.get_task(task_id).unwrap(); - assert!(task.is_none(), "task should be deleted after delete_task"); + assert!(task.is_none()); } #[test] @@ -1013,24 +996,15 @@ mod tests { // Test download_piece_started. metadata.download_piece_started(task_id, 1).unwrap(); let piece = metadata.get_piece(task_id, 1).unwrap().unwrap(); - assert_eq!( - piece.number, 1, - "should get newly created piece with number specified" - ); + assert_eq!(piece.number, 1,); // Test download_piece_finished. metadata .download_piece_finished(task_id, 1, 0, 1024, "digest1", None) .unwrap(); let piece = metadata.get_piece(task_id, 1).unwrap().unwrap(); - assert_eq!( - piece.length, 1024, - "piece should be updated after download_piece_finished" - ); - assert_eq!( - piece.digest, "digest1", - "piece should be updated after download_piece_finished" - ); + assert_eq!(piece.length, 1024,); + assert_eq!(piece.digest, "digest1",); // Test get_pieces. metadata.download_piece_started(task_id, 2).unwrap(); @@ -1043,43 +1017,28 @@ mod tests { metadata.download_piece_started(task_id, 3).unwrap(); metadata.download_piece_failed(task_id, 2).unwrap(); let piece = metadata.get_piece(task_id, 2).unwrap(); - assert!( - piece.is_none(), - "piece should be deleted after download_piece_failed" - ); + assert!(piece.is_none()); // Test upload_piece_started. metadata.upload_piece_started(task_id, 3).unwrap(); let piece = metadata.get_piece(task_id, 3).unwrap().unwrap(); - assert_eq!( - piece.uploading_count, 1, - "piece should be updated after upload_piece_started" - ); + assert_eq!(piece.uploading_count, 1,); // Test upload_piece_finished. metadata.upload_piece_finished(task_id, 3).unwrap(); let piece = metadata.get_piece(task_id, 3).unwrap().unwrap(); - assert_eq!( - piece.uploading_count, 0, - "piece should be updated after upload_piece_finished" - ); - assert_eq!( - piece.uploaded_count, 1, - "piece should be updated after upload_piece_finished" - ); + assert_eq!(piece.uploading_count, 0,); + assert_eq!(piece.uploaded_count, 1,); // Test upload_piece_failed. metadata.upload_piece_started(task_id, 3).unwrap(); metadata.upload_piece_failed(task_id, 3).unwrap(); let piece = metadata.get_piece(task_id, 3).unwrap().unwrap(); - assert_eq!( - piece.uploading_count, 0, - "piece should be updated after upload_piece_failed" - ); + assert_eq!(piece.uploading_count, 0,); // Test delete_pieces. metadata.delete_pieces(task_id).unwrap(); let pieces = metadata.get_pieces(task_id).unwrap(); - assert!(pieces.is_empty(), "should get 0 pieces after delete_pieces"); + assert!(pieces.is_empty()); } } diff --git a/dragonfly-client/src/gc/mod.rs b/dragonfly-client/src/gc/mod.rs index 15e82d47..f2d54486 100644 --- a/dragonfly-client/src/gc/mod.rs +++ b/dragonfly-client/src/gc/mod.rs @@ -16,6 +16,7 @@ use crate::grpc::scheduler::SchedulerClient; use crate::shutdown; +use chrono::Utc; use dragonfly_api::scheduler::v2::{DeletePersistentCacheTaskRequest, DeleteTaskRequest}; use dragonfly_client_config::dfdaemon::Config; use dragonfly_client_core::Result; @@ -24,9 +25,14 @@ use dragonfly_client_storage::{ metadata, Storage, }; use std::sync::Arc; +use std::time::Duration; use tokio::sync::mpsc; use tracing::{error, info, instrument}; +// DOWNLOAD_TASK_TIMEOUT is the timeout of downloading the task. If the task download timeout, the +// task will be garbage collected by disk usage, default 2 hours. +pub const DOWNLOAD_TASK_TIMEOUT: Duration = Duration::from_secs(2 * 60 * 60); + /// GC is the garbage collector of dfdaemon. pub struct GC { /// config is the configuration of the dfdaemon. @@ -185,6 +191,17 @@ impl GC { } }; + // If the task is started and not finished, and the task download is not timeout, + // skip it. + if task.is_started() + && !task.is_finished() + && !task.is_failed() + && (task.created_at + DOWNLOAD_TASK_TIMEOUT > Utc::now().naive_utc()) + { + info!("task {} is started and not finished, skip it", task.id); + continue; + } + // Evict the task. self.storage.delete_task(&task.id).await; @@ -290,12 +307,25 @@ impl GC { continue; } - let task_space = task.content_length(); + // If the task is started and not finished, and the task download is not timeout, + // skip it. + if task.is_started() + && !task.is_finished() + && !task.is_failed() + && (task.created_at + DOWNLOAD_TASK_TIMEOUT > Utc::now().naive_utc()) + { + info!( + "persistent cache task {} is started and not finished, skip it", + task.id + ); + continue; + } // Evict the task. self.storage.delete_task(&task.id).await; // Update the evicted space. + let task_space = task.content_length(); evicted_space += task_space; info!( "evict persistent cache task {} size {}",