From 10bab190a28552cd05760b7bd75626990834877a Mon Sep 17 00:00:00 2001 From: Gaius Date: Fri, 22 Nov 2024 20:53:33 +0800 Subject: [PATCH] fix: rocksdb panic caused by out of bounds (#860) Signed-off-by: Gaius --- Cargo.lock | 20 +- Cargo.toml | 18 +- dragonfly-client-config/src/dfdaemon.rs | 4 +- dragonfly-client-storage/src/content.rs | 43 ++- dragonfly-client-storage/src/lib.rs | 4 +- dragonfly-client-storage/src/metadata.rs | 4 +- .../src/grpc/dfdaemon_download.rs | 4 +- dragonfly-client/src/grpc/dfdaemon_upload.rs | 6 +- dragonfly-client/src/grpc/scheduler.rs | 22 +- dragonfly-client/src/proxy/mod.rs | 272 +++++++++--------- .../src/resource/persistent_cache_task.rs | 2 +- .../src/resource/piece_collector.rs | 23 +- dragonfly-client/src/resource/task.rs | 36 ++- 13 files changed, 235 insertions(+), 223 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0ba3007c..8ee0e96e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -850,9 +850,9 @@ dependencies = [ [[package]] name = "dragonfly-api" -version = "2.0.171" +version = "2.0.173" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6af97b714cbbbc990cb644cb380803fb5b4c2fa1aa585ec47219cba08dc591d6" +checksum = "cd0e08874afeb80d0e9ac8c383eed6022ceed975dd3623b58a45d29856859161" dependencies = [ "prost 0.13.3", "prost-types", @@ -865,7 +865,7 @@ dependencies = [ [[package]] name = "dragonfly-client" -version = "0.1.117" +version = "0.1.118" dependencies = [ "anyhow", "blake3", @@ -937,7 +937,7 @@ dependencies = [ [[package]] name = "dragonfly-client-backend" -version = "0.1.117" +version = "0.1.118" dependencies = [ "dragonfly-api", "dragonfly-client-core", @@ -965,7 +965,7 @@ dependencies = [ [[package]] name = "dragonfly-client-config" -version = "0.1.117" +version = "0.1.118" dependencies = [ "bytesize", "bytesize-serde", @@ -991,7 +991,7 @@ dependencies = [ [[package]] name = "dragonfly-client-core" -version = "0.1.117" +version = "0.1.118" dependencies = [ "headers 0.4.0", "hyper 1.5.0", @@ -1009,7 +1009,7 @@ dependencies = [ [[package]] name = "dragonfly-client-init" -version = "0.1.117" +version = "0.1.118" dependencies = [ "anyhow", "clap", @@ -1026,7 +1026,7 @@ dependencies = [ [[package]] name = "dragonfly-client-storage" -version = "0.1.117" +version = "0.1.118" dependencies = [ "base16ct", "bincode", @@ -1051,7 +1051,7 @@ dependencies = [ [[package]] name = "dragonfly-client-util" -version = "0.1.117" +version = "0.1.118" dependencies = [ "base16ct", "base64 0.22.1", @@ -1423,7 +1423,7 @@ dependencies = [ [[package]] name = "hdfs" -version = "0.1.117" +version = "0.1.118" dependencies = [ "dragonfly-client-backend", "dragonfly-client-core", diff --git a/Cargo.toml b/Cargo.toml index 4cf6e26f..5bf951f4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ members = [ ] [workspace.package] -version = "0.1.117" +version = "0.1.118" authors = ["The Dragonfly Developers"] homepage = "https://d7y.io/" repository = "https://github.com/dragonflyoss/client.git" @@ -22,15 +22,15 @@ readme = "README.md" edition = "2021" [workspace.dependencies] -dragonfly-client = { path = "dragonfly-client", version = "0.1.117" } -dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.117" } -dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.117" } -dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.117" } -dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.117" } -dragonfly-client-util = { path = "dragonfly-client-util", 
version = "0.1.117" } -dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.117" } +dragonfly-client = { path = "dragonfly-client", version = "0.1.118" } +dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.118" } +dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.118" } +dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.118" } +dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.118" } +dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.118" } +dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.118" } thiserror = "1.0" -dragonfly-api = "=2.0.171" +dragonfly-api = "=2.0.173" reqwest = { version = "0.12.4", features = ["stream", "native-tls", "default-tls", "rustls-tls"] } rcgen = { version = "0.12.1", features = ["x509-parser"] } hyper = { version = "1.5", features = ["full"] } diff --git a/dragonfly-client-config/src/dfdaemon.rs b/dragonfly-client-config/src/dfdaemon.rs index c9b8110e..1de7b4f7 100644 --- a/dragonfly-client-config/src/dfdaemon.rs +++ b/dragonfly-client-config/src/dfdaemon.rs @@ -805,11 +805,11 @@ pub struct Storage { #[serde(default = "default_storage_keep")] pub keep: bool, - /// write_buffer_size is the buffer size for writing piece to disk, default is 4KB. + /// write_buffer_size is the buffer size for writing piece to disk, default is 128KB. #[serde(default = "default_storage_write_buffer_size")] pub write_buffer_size: usize, - /// read_buffer_size is the buffer size for reading piece from disk, default is 4KB. + /// read_buffer_size is the buffer size for reading piece from disk, default is 128KB. #[serde(default = "default_storage_read_buffer_size")] pub read_buffer_size: usize, } diff --git a/dragonfly-client-storage/src/content.rs b/dragonfly-client-storage/src/content.rs index 654164d9..2490da20 100644 --- a/dragonfly-client-storage/src/content.rs +++ b/dragonfly-client-storage/src/content.rs @@ -21,7 +21,7 @@ use std::cmp::{max, min}; use std::path::{Path, PathBuf}; use std::sync::Arc; use tokio::fs::{self, File, OpenOptions}; -use tokio::io::{self, AsyncRead, AsyncReadExt, AsyncSeekExt, BufReader, SeekFrom}; +use tokio::io::{self, AsyncRead, AsyncReadExt, AsyncSeekExt, BufReader, BufWriter, SeekFrom}; use tokio_util::io::InspectReader; use tracing::{error, info, instrument, warn}; @@ -249,35 +249,29 @@ impl Content { range: Option, ) -> Result { let task_path = self.get_task_path(task_id); - if let Some(range) = range { + let mut f = File::open(task_path.as_path()).await.map_err(|err| { + error!("open {:?} failed: {}", task_path, err); + err + })?; + + // Calculate the target offset and length based on the range. 
+ let (target_offset, target_length) = if let Some(range) = range { let target_offset = max(offset, range.start); let target_length = min(offset + length - 1, range.start + range.length - 1) - target_offset + 1; + (target_offset, target_length) + } else { + (offset, length) + }; - let mut f = File::open(task_path.as_path()).await.map_err(|err| { - error!("open {:?} failed: {}", task_path, err); + f.seek(SeekFrom::Start(target_offset)) + .await + .map_err(|err| { + error!("seek {:?} failed: {}", task_path, err); err })?; - f.seek(SeekFrom::Start(target_offset)) - .await - .map_err(|err| { - error!("seek {:?} failed: {}", task_path, err); - err - })?; - return Ok(f.take(target_length)); - } - - let mut f = File::open(task_path.as_path()).await.map_err(|err| { - error!("open {:?} failed: {}", task_path, err); - err - })?; - - f.seek(SeekFrom::Start(offset)).await.map_err(|err| { - error!("seek {:?} failed: {}", task_path, err); - err - })?; - Ok(f.take(length)) + Ok(f.take(target_length)) } /// write_piece writes the piece to the content. @@ -318,7 +312,8 @@ impl Content { })?; // Copy the piece to the file. - let length = io::copy(&mut tee, &mut f).await.map_err(|err| { + let mut writer = BufWriter::with_capacity(self.config.storage.write_buffer_size, f); + let length = io::copy(&mut tee, &mut writer).await.map_err(|err| { error!("copy {:?} failed: {}", task_path, err); err })?; diff --git a/dragonfly-client-storage/src/lib.rs b/dragonfly-client-storage/src/lib.rs index 24ec07f4..3aeb1c74 100644 --- a/dragonfly-client-storage/src/lib.rs +++ b/dragonfly-client-storage/src/lib.rs @@ -24,7 +24,7 @@ use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; use tokio::io::AsyncRead; -use tracing::{debug, error, info, instrument}; +use tracing::{debug, error, instrument}; pub mod content; pub mod metadata; @@ -449,7 +449,7 @@ impl Storage { // If the piece is finished, return. if piece.is_finished() { - info!("wait piece finished success"); + debug!("wait piece finished success"); return Ok(piece); } diff --git a/dragonfly-client-storage/src/metadata.rs b/dragonfly-client-storage/src/metadata.rs index 3d5569ff..cf8266aa 100644 --- a/dragonfly-client-storage/src/metadata.rs +++ b/dragonfly-client-storage/src/metadata.rs @@ -46,7 +46,7 @@ pub struct Task { pub response_header: HashMap, /// uploading_count is the count of the task being uploaded by other peers. - pub uploading_count: u64, + pub uploading_count: i64, /// uploaded_count is the count of the task has been uploaded by other peers. pub uploaded_count: u64, @@ -244,7 +244,7 @@ pub struct Piece { pub parent_id: Option, /// uploading_count is the count of the piece being uploaded by other peers. - pub uploading_count: u64, + pub uploading_count: i64, /// uploaded_count is the count of the piece has been uploaded by other peers. 
pub uploaded_count: u64, diff --git a/dragonfly-client/src/grpc/dfdaemon_download.rs b/dragonfly-client/src/grpc/dfdaemon_download.rs index 34b76f4e..2237822d 100644 --- a/dragonfly-client/src/grpc/dfdaemon_download.rs +++ b/dragonfly-client/src/grpc/dfdaemon_download.rs @@ -354,7 +354,7 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler { let download_clone = download.clone(); let task_manager_clone = task_manager.clone(); let task_clone = task.clone(); - let (out_stream_tx, out_stream_rx) = mpsc::channel(1024 * 10); + let (out_stream_tx, out_stream_rx) = mpsc::channel(10 * 1024); tokio::spawn( async move { match task_manager_clone @@ -743,7 +743,7 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler { let request_clone = request.clone(); let task_manager_clone = task_manager.clone(); let task_clone = task.clone(); - let (out_stream_tx, out_stream_rx) = mpsc::channel(1024 * 10); + let (out_stream_tx, out_stream_rx) = mpsc::channel(10 * 1024); tokio::spawn( async move { match task_manager_clone diff --git a/dragonfly-client/src/grpc/dfdaemon_upload.rs b/dragonfly-client/src/grpc/dfdaemon_upload.rs index e792e6f3..dbb104ef 100644 --- a/dragonfly-client/src/grpc/dfdaemon_upload.rs +++ b/dragonfly-client/src/grpc/dfdaemon_upload.rs @@ -347,7 +347,7 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler { let download_clone = download.clone(); let task_manager_clone = task_manager.clone(); let task_clone = task.clone(); - let (out_stream_tx, out_stream_rx) = mpsc::channel(1024 * 10); + let (out_stream_tx, out_stream_rx) = mpsc::channel(10 * 1024); tokio::spawn( async move { match task_manager_clone @@ -644,7 +644,7 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler { let task_manager = self.task.clone(); // Initialize stream channel. - let (out_stream_tx, out_stream_rx) = mpsc::channel(1024 * 10); + let (out_stream_tx, out_stream_rx) = mpsc::channel(10 * 1024); tokio::spawn( async move { loop { @@ -940,7 +940,7 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler { let request_clone = request.clone(); let task_manager_clone = task_manager.clone(); let task_clone = task.clone(); - let (out_stream_tx, out_stream_rx) = mpsc::channel(1024 * 10); + let (out_stream_tx, out_stream_rx) = mpsc::channel(10 * 1024); tokio::spawn( async move { match task_manager_clone diff --git a/dragonfly-client/src/grpc/scheduler.rs b/dragonfly-client/src/grpc/scheduler.rs index 2d83c943..a141bdec 100644 --- a/dragonfly-client/src/grpc/scheduler.rs +++ b/dragonfly-client/src/grpc/scheduler.rs @@ -527,9 +527,10 @@ impl SchedulerClient { #[instrument(skip(self))] async fn update_available_scheduler_addrs(&self) -> Result<()> { // Get the endpoints of available schedulers. - let data = self.dynconfig.data.read().await; - let data_available_schedulers_clone = data.available_schedulers.clone(); - drop(data); + let data_available_schedulers_clone = { + let data = self.dynconfig.data.read().await; + data.available_schedulers.clone() + }; // Check if the available schedulers is empty. if data_available_schedulers_clone.is_empty() { @@ -537,9 +538,10 @@ impl SchedulerClient { } // Get the available schedulers. - let available_schedulers = self.available_schedulers.read().await; - let available_schedulers_clone = available_schedulers.clone(); - drop(available_schedulers); + let available_schedulers_clone = { + let available_schedulers = self.available_schedulers.read().await; + available_schedulers.clone() + }; // Check if the available schedulers is not changed. 
if data_available_schedulers_clone.len() == available_schedulers_clone.len() @@ -574,13 +576,11 @@ impl SchedulerClient { new_available_schedulers.push(available_scheduler.clone()); // Add the scheduler address to the addresses of available schedulers. - new_available_scheduler_addrs - .push(SocketAddr::new(ip, available_scheduler.port as u16)); + let socket_addr = SocketAddr::new(ip, available_scheduler.port as u16); + new_available_scheduler_addrs.push(socket_addr); // Add the scheduler to the hashring. - new_hashring.add(VNode { - addr: SocketAddr::new(ip, available_scheduler.port as u16), - }); + new_hashring.add(VNode { addr: socket_addr }); } // Update the available schedulers. diff --git a/dragonfly-client/src/proxy/mod.rs b/dragonfly-client/src/proxy/mod.rs index 20bf5473..09fd4fff 100644 --- a/dragonfly-client/src/proxy/mod.rs +++ b/dragonfly-client/src/proxy/mod.rs @@ -59,7 +59,7 @@ use tokio::sync::mpsc; use tokio::time::sleep; use tokio_rustls::TlsAcceptor; use tokio_util::io::ReaderStream; -use tracing::{debug, error, info, instrument, Span}; +use tracing::{debug, error, info, instrument, Instrument, Span}; pub mod header; @@ -589,7 +589,7 @@ pub async fn upgraded_handler( } /// proxy_by_dfdaemon proxies the request via the dfdaemon. -#[instrument(skip_all)] +#[instrument(skip_all, fields(host_id, task_id, peer_id))] async fn proxy_by_dfdaemon( config: Arc, task: Arc, @@ -657,6 +657,11 @@ async fn proxy_by_dfdaemon( )); }; + // Span record the host_id, task_id, and peer_id. + Span::current().record("host_id", message.host_id.as_str()); + Span::current().record("task_id", message.task_id.as_str()); + Span::current().record("peer_id", message.peer_id.as_str()); + // Handle the download task started response. let Some(download_task_response::Response::DownloadTaskStartedResponse( download_task_started_response, @@ -670,10 +675,10 @@ async fn proxy_by_dfdaemon( }; // Write the task data to the reader. - let (reader, mut writer) = tokio::io::duplex(4096); + let (reader, mut writer) = tokio::io::duplex(10 * 1024); // Write the status code to the writer. - let (sender, mut receiver) = mpsc::channel(1024 * 10); + let (sender, mut receiver) = mpsc::channel(10 * 1024); // Construct the response body. let reader_stream = ReaderStream::new(reader); @@ -693,157 +698,166 @@ async fn proxy_by_dfdaemon( // Write task data to pipe. If grpc received error message, // shutdown the writer. - tokio::spawn(async move { - // Initialize the hashmap of the finished piece readers and pieces. - let mut finished_piece_readers = HashMap::new(); - - // Get the first piece number from the started response. - let Some(first_piece) = download_task_started_response.pieces.first() else { - error!("response pieces is empty"); - if let Err(err) = writer.shutdown().await { - error!("writer shutdown error: {}", err); - } - - return; - }; - let mut need_piece_number = first_piece.number; - - // Read piece data from stream and write to pipe. If the piece data is - // not in order, store it in the hashmap, and write it to the pipe - // when the previous piece data is written. - loop { - match out_stream.message().await { - Ok(Some(message)) => { - if let Some(download_task_response::Response::DownloadPieceFinishedResponse( - download_task_response, - )) = message.response - { - // Sleep for a while to avoid the out stream is aborted. If the task is small, proxy read the piece - // before the task download is finished. It will cause `user body write aborted` error. 
- sleep(Duration::from_millis(1)).await; - - // Send the none response to the client, if the first piece is received. - if !initialized { - debug!("first piece received, send response"); - sender.send(None).await.unwrap_or_default(); - initialized = true; - } - - let Some(piece) = download_task_response.piece else { - error!("response piece is empty"); - writer.shutdown().await.unwrap_or_else(|err| { - error!("writer shutdown error: {}", err); - }); + tokio::spawn( + async move { + // Initialize the hashmap of the finished piece readers and pieces. + let mut finished_piece_readers = HashMap::new(); + + // Get the first piece number from the started response. + let Some(first_piece) = download_task_started_response.pieces.first() else { + error!("response pieces is empty"); + if let Err(err) = writer.shutdown().await { + error!("writer shutdown error: {}", err); + } - return; - }; - - let piece_reader = match task - .piece - .download_from_local_peer_into_async_read( - task.piece - .id(message.task_id.as_str(), piece.number) - .as_str(), - message.task_id.as_str(), - piece.length, - download_task_started_response.range, - true, - ) - .await + return; + }; + let mut need_piece_number = first_piece.number; + + // Read piece data from stream and write to pipe. If the piece data is + // not in order, store it in the hashmap, and write it to the pipe + // when the previous piece data is written. + loop { + match out_stream.message().await { + Ok(Some(message)) => { + if let Some( + download_task_response::Response::DownloadPieceFinishedResponse( + download_task_response, + ), + ) = message.response { - Ok(piece_reader) => piece_reader, - Err(err) => { - error!("download piece reader error: {}", err); - writer.shutdown().await.unwrap_or_else(|err| { - error!("writer shutdown error: {}", err); - }); - - return; + // Sleep for a while to avoid the out stream is aborted. If the task is small, proxy read the piece + // before the task download is finished. It will cause `user body write aborted` error. + sleep(Duration::from_millis(10)).await; + + // Send the none response to the client, if the first piece is received. + if !initialized { + debug!("first piece received, send response"); + sender.send(None).await.unwrap_or_default(); + initialized = true; } - }; - - // Use a buffer to read the piece. - let piece_reader = BufReader::with_capacity(read_buffer_size, piece_reader); - // Write the piece data to the pipe in order. - finished_piece_readers.insert(piece.number, piece_reader); - while let Some(piece_reader) = - finished_piece_readers.get_mut(&need_piece_number) - { - debug!("copy piece {} to stream", need_piece_number); - if let Err(err) = tokio::io::copy(piece_reader, &mut writer).await { - error!("download piece reader error: {}", err); + let Some(piece) = download_task_response.piece else { + error!("response piece is empty"); writer.shutdown().await.unwrap_or_else(|err| { error!("writer shutdown error: {}", err); }); return; + }; + + let piece_reader = match task + .piece + .download_from_local_peer_into_async_read( + task.piece + .id(message.task_id.as_str(), piece.number) + .as_str(), + message.task_id.as_str(), + piece.length, + download_task_started_response.range, + true, + ) + .await + { + Ok(piece_reader) => piece_reader, + Err(err) => { + error!("download piece reader error: {}", err); + writer.shutdown().await.unwrap_or_else(|err| { + error!("writer shutdown error: {}", err); + }); + + return; + } + }; + + // Use a buffer to read the piece. 
+ let piece_reader = + BufReader::with_capacity(read_buffer_size, piece_reader); + + // Write the piece data to the pipe in order. + finished_piece_readers.insert(piece.number, piece_reader); + while let Some(piece_reader) = + finished_piece_readers.get_mut(&need_piece_number) + { + debug!("copy piece {} to stream", need_piece_number); + if let Err(err) = tokio::io::copy(piece_reader, &mut writer).await { + error!("download piece reader error: {}", err); + writer.shutdown().await.unwrap_or_else(|err| { + error!("writer shutdown error: {}", err); + }); + + return; + } + + need_piece_number += 1; } + } else { + error!("response unknown message"); + writer.shutdown().await.unwrap_or_else(|err| { + error!("writer shutdown error: {}", err); + }); - need_piece_number += 1; + return; } - } else { - error!("response unknown message"); - writer.shutdown().await.unwrap_or_else(|err| { - error!("writer shutdown error: {}", err); - }); - - return; } - } - Ok(None) => { - info!("message is none"); - if let Err(err) = writer.flush().await { - error!("writer flush error: {}", err); - } - - return; - } - Err(err) => { - if initialized { - error!("stream error: {}", err); + Ok(None) => { + info!("message is none"); if let Err(err) = writer.flush().await { error!("writer flush error: {}", err); } return; } + Err(err) => { + if initialized { + error!("stream error: {}", err); + if let Err(err) = writer.flush().await { + error!("writer flush error: {}", err); + } - match serde_json::from_slice::(err.details()) { - Ok(backend) => { - error!("download task failed: {:?}", backend); - sender - .send(Some(make_error_response( - http::StatusCode::from_u16( - backend.status_code.unwrap_or_default() as u16, - ) - .unwrap_or(http::StatusCode::INTERNAL_SERVER_ERROR), - Some(hashmap_to_headermap(&backend.header).unwrap_or_default()), - ))) - .await - .unwrap_or_default(); + return; } - Err(_) => { - error!("download task failed: {}", err); - sender - .send(Some(make_error_response( - http::StatusCode::INTERNAL_SERVER_ERROR, - None, - ))) - .await - .unwrap_or_default(); + + match serde_json::from_slice::(err.details()) { + Ok(backend) => { + error!("download task failed: {:?}", backend); + sender + .send(Some(make_error_response( + http::StatusCode::from_u16( + backend.status_code.unwrap_or_default() as u16, + ) + .unwrap_or(http::StatusCode::INTERNAL_SERVER_ERROR), + Some( + hashmap_to_headermap(&backend.header) + .unwrap_or_default(), + ), + ))) + .await + .unwrap_or_default(); + } + Err(_) => { + error!("download task failed: {}", err); + sender + .send(Some(make_error_response( + http::StatusCode::INTERNAL_SERVER_ERROR, + None, + ))) + .await + .unwrap_or_default(); + } } - } - return; - } - }; + return; + } + }; + } } - }); + .in_current_span(), + ); match receiver.recv().await { - Some(Some(response)) => return Ok(response), - Some(None) => Ok(response), + Some(Some(response)) => Ok(response), + Some(None) => return Ok(response), None => Ok(make_error_response( http::StatusCode::INTERNAL_SERVER_ERROR, None, diff --git a/dragonfly-client/src/resource/persistent_cache_task.rs b/dragonfly-client/src/resource/persistent_cache_task.rs index c6013297..95ae17ce 100644 --- a/dragonfly-client/src/resource/persistent_cache_task.rs +++ b/dragonfly-client/src/resource/persistent_cache_task.rs @@ -499,7 +499,7 @@ impl PersistentCacheTask { let mut finished_pieces: Vec = Vec::new(); // Initialize stream channel. 
- let (in_stream_tx, in_stream_rx) = mpsc::channel(1024 * 10); + let (in_stream_tx, in_stream_rx) = mpsc::channel(10 * 1024); // Send the register peer request. in_stream_tx diff --git a/dragonfly-client/src/resource/piece_collector.rs b/dragonfly-client/src/resource/piece_collector.rs index 21558779..f3d3562c 100644 --- a/dragonfly-client/src/resource/piece_collector.rs +++ b/dragonfly-client/src/resource/piece_collector.rs @@ -83,12 +83,9 @@ impl PieceCollector { parents: Vec, ) -> Self { let collected_pieces = Arc::new(DashMap::new()); - interested_pieces - .clone() - .into_iter() - .for_each(|interested_piece| { - collected_pieces.insert(interested_piece.number, "".to_string()); - }); + for interested_piece in &interested_pieces { + collected_pieces.insert(interested_piece.number, String::new()); + } Self { config, @@ -110,13 +107,13 @@ impl PieceCollector { let interested_pieces = self.interested_pieces.clone(); let collected_pieces = self.collected_pieces.clone(); let collected_piece_timeout = self.config.download.piece_timeout; - let (collected_piece_tx, collected_piece_rx) = mpsc::channel(1024 * 10); + let (collected_piece_tx, collected_piece_rx) = mpsc::channel(10 * 1024); tokio::spawn( async move { Self::collect_from_remote_peers( config, - host_id, - task_id, + host_id.as_str(), + task_id.as_str(), parents, interested_pieces, collected_pieces, @@ -139,8 +136,8 @@ impl PieceCollector { #[instrument(skip_all)] async fn collect_from_remote_peers( config: Arc, - host_id: String, - task_id: String, + host_id: &str, + task_id: &str, parents: Vec, interested_pieces: Vec, collected_pieces: Arc>, @@ -250,8 +247,8 @@ impl PieceCollector { join_set.spawn( sync_pieces( config.clone(), - host_id.clone(), - task_id.clone(), + host_id.to_string(), + task_id.to_string(), parent.clone(), parents.clone(), interested_pieces.clone(), diff --git a/dragonfly-client/src/resource/task.rs b/dragonfly-client/src/resource/task.rs index 6b982ce1..624b2579 100644 --- a/dragonfly-client/src/resource/task.rs +++ b/dragonfly-client/src/resource/task.rs @@ -26,7 +26,7 @@ use dragonfly_api::dfdaemon::{ self, v2::{download_task_response, DownloadTaskResponse}, }; -use dragonfly_api::errordetails::v2::Backend; +use dragonfly_api::errordetails::v2::{Backend, Unknown}; use dragonfly_api::scheduler::v2::{ announce_peer_request, announce_peer_response, download_piece_back_to_source_failed_request, AnnouncePeerRequest, DeleteTaskRequest, DownloadPeerBackToSourceFailedRequest, @@ -484,7 +484,7 @@ impl Task { let mut finished_pieces: Vec = Vec::new(); // Initialize stream channel. - let (in_stream_tx, in_stream_rx) = mpsc::channel(1024 * 10); + let (in_stream_tx, in_stream_rx) = mpsc::channel(10 * 1024); // Send the register peer request. 
in_stream_tx @@ -916,11 +916,10 @@ impl Task { task.id.as_str(), interested_pieces.clone(), parents - .clone() .into_iter() .map(|peer| piece_collector::CollectedParent { - id: peer.id.clone(), - host: peer.host.clone(), + id: peer.id, + host: peer.host, }) .collect(), ); @@ -1364,7 +1363,11 @@ impl Task { request: Some(announce_peer_request::Request::DownloadPieceBackToSourceFailedRequest( DownloadPieceBackToSourceFailedRequest{ piece_number: None, - response: None, + response: Some(download_piece_back_to_source_failed_request::Response::Unknown( + Unknown{ + message: Some("send timeout".to_string()), + } + )), } )), }, REQUEST_TIMEOUT) @@ -1387,7 +1390,11 @@ impl Task { request: Some(announce_peer_request::Request::DownloadPieceBackToSourceFailedRequest( DownloadPieceBackToSourceFailedRequest{ piece_number: None, - response: None, + response: Some(download_piece_back_to_source_failed_request::Response::Unknown( + Unknown{ + message: Some(err.to_string()), + } + )), } )), }, REQUEST_TIMEOUT) @@ -1429,7 +1436,7 @@ impl Task { let piece = match self.piece.get(piece_id.as_str()) { Ok(Some(piece)) => piece, Ok(None) => { - info!("piece {} not found in local storage", piece_id); + debug!("piece {} not found in local storage", piece_id); continue; } Err(err) => { @@ -1592,7 +1599,6 @@ impl Task { })?; info!("finished piece {} from source", piece_id); - Ok(metadata) } @@ -1640,14 +1646,14 @@ impl Task { } // Check if all pieces are downloaded. - if finished_pieces.len() == interested_pieces.len() { - return Ok(finished_pieces); + if finished_pieces.len() != interested_pieces.len() { + // If not all pieces are downloaded, return an error. + return Err(Error::Unknown( + "not all pieces are downloaded from source".to_string(), + )); } - // If not all pieces are downloaded, return an error. - Err(Error::Unknown( - "not all pieces are downloaded from source".to_string(), - )) + return Ok(finished_pieces); } /// stat_task returns the task metadata.
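
Notes on the key hunks (editorial). The Rust sketches below were written for this review as illustrative stand-ins; they are not code from the patch, and any helper or type names they introduce are hypothetical.

dragonfly-client-storage/src/metadata.rs: uploading_count moves from u64 to i64 on both Task and Piece. This appears to be the fix the subject line names: a decrement racing past zero underflows an unsigned counter (a panic under debug assertions, a silent wrap to u64::MAX in release builds), and the wrapped value then reaches the RocksDB-backed metadata as a wildly out-of-range count. A minimal, dependency-free sketch of the failure mode and the signed alternative:

    // Illustrative only: why a concurrently incremented/decremented counter
    // is safer as i64. Underflowing a u64 panics with debug assertions on
    // and silently wraps to u64::MAX with them off.
    fn main() {
        let unsigned: u64 = 0;
        // `unsigned - 1` would panic in a debug build; wrapping_sub shows the
        // release-build result.
        assert_eq!(unsigned.wrapping_sub(1), u64::MAX);

        // The same decrement on a signed counter is representable, observable,
        // and easy to clamp back to zero.
        let mut signed: i64 = 0;
        signed -= 1;
        assert_eq!(signed, -1);
        assert_eq!(signed.max(0), 0);
    }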
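
dragonfly-client-storage/src/content.rs, read_piece: the ranged and unranged paths previously duplicated the open/seek/take sequence; the rewrite computes one (target_offset, target_length) pair up front and shares the rest. The clamp arithmetic, extracted into a dependency-free sketch (Range here is a stand-in for the crate's own range type):

    use std::cmp::{max, min};

    // Stand-in for the crate's range type.
    struct Range {
        start: u64,
        length: u64,
    }

    // Clamp a piece at [offset, offset + length) to the requested range,
    // mirroring the computation read_piece now performs once for both paths.
    fn target(offset: u64, length: u64, range: Option<Range>) -> (u64, u64) {
        match range {
            Some(range) => {
                let target_offset = max(offset, range.start);
                let target_length = min(offset + length - 1, range.start + range.length - 1)
                    - target_offset
                    + 1;
                (target_offset, target_length)
            }
            None => (offset, length),
        }
    }

    fn main() {
        // Piece covers bytes [100, 200); the request asks for [150, 400).
        // Only the overlap [150, 200) should be read: offset 150, length 50.
        assert_eq!(
            target(100, 100, Some(Range { start: 150, length: 250 })),
            (150, 50)
        );
    }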
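
dragonfly-client-storage/src/content.rs, write_piece: the destination file is now wrapped in a BufWriter sized by storage.write_buffer_size (whose documented default also moves from 4KB to 128KB in dfdaemon.rs), so io::copy batches small piece reads into larger writes instead of issuing one syscall per chunk. The shape of the pattern; tokio's io::copy flushes the writer once the reader hits EOF, and the extra flush below is only to make the sketch self-evidently safe:

    use tokio::fs::File;
    use tokio::io::{self, AsyncRead, AsyncWriteExt, BufWriter};

    // Copy `reader` into `f` through a write buffer. The hard-coded size
    // mirrors the new documented default; the real code reads it from
    // config.storage.write_buffer_size.
    async fn copy_buffered<R>(mut reader: R, f: File) -> io::Result<u64>
    where
        R: AsyncRead + Unpin,
    {
        let mut writer = BufWriter::with_capacity(128 * 1024, f);
        let length = io::copy(&mut reader, &mut writer).await?;
        writer.flush().await?;
        Ok(length)
    }

    #[tokio::main]
    async fn main() -> io::Result<()> {
        let f = File::create("/tmp/piece.bin").await?;
        let copied = copy_buffered(&[1u8, 2, 3][..], f).await?;
        assert_eq!(copied, 3);
        Ok(())
    }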
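
dragonfly-client/src/grpc/scheduler.rs: update_available_scheduler_addrs now takes its RwLock read guards inside block expressions, so each guard is dropped at the closing brace instead of relying on a manual drop() call placed correctly. The same pattern, reduced to a Vec<String> stand-in for the scheduler list:

    use std::sync::Arc;
    use tokio::sync::RwLock;

    // Snapshot the shared list while holding the read lock as briefly as
    // possible: the guard cannot outlive the block that creates it.
    async fn snapshot(schedulers: &Arc<RwLock<Vec<String>>>) -> Vec<String> {
        {
            let guard = schedulers.read().await;
            guard.clone()
        } // guard dropped here, before anything that follows
    }

    #[tokio::main]
    async fn main() {
        let schedulers = Arc::new(RwLock::new(vec!["127.0.0.1:8002".to_string()]));
        assert_eq!(snapshot(&schedulers).await, vec!["127.0.0.1:8002".to_string()]);
    }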
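
dragonfly-client/src/proxy/mod.rs: proxy_by_dfdaemon declares empty host_id/task_id/peer_id fields in its #[instrument] attribute, records them once the first gRPC message supplies the IDs, and pins the spawned pipe-writer task to the same span with Instrument::in_current_span(), so the writer's logs keep the request context instead of landing in a detached span. Roughly:

    use tracing::{info, instrument, Instrument, Span};

    #[instrument(skip_all, fields(host_id, task_id, peer_id))]
    async fn proxy(host_id: String, task_id: String, peer_id: String) {
        // The fields are declared (empty) on the span above and filled in
        // only once their values are known.
        Span::current().record("host_id", host_id.as_str());
        Span::current().record("task_id", task_id.as_str());
        Span::current().record("peer_id", peer_id.as_str());

        tokio::spawn(
            async move {
                // Without .in_current_span() this task would log without the
                // recorded host_id/task_id/peer_id fields.
                info!("writing task data to pipe");
            }
            .in_current_span(),
        );
    }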
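
dragonfly-client/src/proxy/mod.rs, pipe-writer loop: pieces can finish out of order, so completed piece readers are parked in a HashMap keyed by piece number and drained only while the next expected number is present, which keeps the HTTP body byte-ordered. The bookkeeping reduced to byte buffers (the patch keeps exhausted readers in the map via get_mut; this sketch removes them instead, which is equivalent for ordering):

    use std::collections::HashMap;

    // Flush every piece that is ready, in piece-number order, and stop at
    // the first gap.
    fn drain_in_order(finished: &mut HashMap<u32, Vec<u8>>, need: &mut u32, out: &mut Vec<u8>) {
        while let Some(piece) = finished.remove(need) {
            out.extend_from_slice(&piece);
            *need += 1;
        }
    }

    fn main() {
        let (mut finished, mut need, mut out) = (HashMap::new(), 0u32, Vec::new());

        finished.insert(1, b"B".to_vec()); // piece 1 finishes first
        drain_in_order(&mut finished, &mut need, &mut out);
        assert!(out.is_empty()); // gap at piece 0: nothing flushed yet

        finished.insert(0, b"A".to_vec());
        drain_in_order(&mut finished, &mut need, &mut out);
        assert_eq!(out, b"AB"); // both flushed, in order
    }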