Skip to content

Commit

Permalink
fix: rocksdb panic caused by out of bounds (#860)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Nov 22, 2024
1 parent 88b1f7e commit 10bab19
Show file tree
Hide file tree
Showing 13 changed files with 235 additions and 223 deletions.
20 changes: 10 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 9 additions & 9 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ members = [
]

[workspace.package]
version = "0.1.117"
version = "0.1.118"
authors = ["The Dragonfly Developers"]
homepage = "https://d7y.io/"
repository = "https://github.com/dragonflyoss/client.git"
Expand All @@ -22,15 +22,15 @@ readme = "README.md"
edition = "2021"

[workspace.dependencies]
dragonfly-client = { path = "dragonfly-client", version = "0.1.117" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.117" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.117" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.117" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.117" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.117" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.117" }
dragonfly-client = { path = "dragonfly-client", version = "0.1.118" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.118" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.118" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.118" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.118" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.118" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.118" }
thiserror = "1.0"
dragonfly-api = "=2.0.171"
dragonfly-api = "=2.0.173"
reqwest = { version = "0.12.4", features = ["stream", "native-tls", "default-tls", "rustls-tls"] }
rcgen = { version = "0.12.1", features = ["x509-parser"] }
hyper = { version = "1.5", features = ["full"] }
Expand Down
4 changes: 2 additions & 2 deletions dragonfly-client-config/src/dfdaemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -805,11 +805,11 @@ pub struct Storage {
#[serde(default = "default_storage_keep")]
pub keep: bool,

/// write_buffer_size is the buffer size for writing piece to disk, default is 4KB.
/// write_buffer_size is the buffer size for writing piece to disk, default is 128KB.
#[serde(default = "default_storage_write_buffer_size")]
pub write_buffer_size: usize,

/// read_buffer_size is the buffer size for reading piece from disk, default is 4KB.
/// read_buffer_size is the buffer size for reading piece from disk, default is 128KB.
#[serde(default = "default_storage_read_buffer_size")]
pub read_buffer_size: usize,
}
Expand Down
43 changes: 19 additions & 24 deletions dragonfly-client-storage/src/content.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::cmp::{max, min};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::fs::{self, File, OpenOptions};
use tokio::io::{self, AsyncRead, AsyncReadExt, AsyncSeekExt, BufReader, SeekFrom};
use tokio::io::{self, AsyncRead, AsyncReadExt, AsyncSeekExt, BufReader, BufWriter, SeekFrom};
use tokio_util::io::InspectReader;
use tracing::{error, info, instrument, warn};

Expand Down Expand Up @@ -249,35 +249,29 @@ impl Content {
range: Option<Range>,
) -> Result<impl AsyncRead> {
let task_path = self.get_task_path(task_id);
if let Some(range) = range {
let mut f = File::open(task_path.as_path()).await.map_err(|err| {
error!("open {:?} failed: {}", task_path, err);
err
})?;

// Calculate the target offset and length based on the range.
let (target_offset, target_length) = if let Some(range) = range {
let target_offset = max(offset, range.start);
let target_length =
min(offset + length - 1, range.start + range.length - 1) - target_offset + 1;
(target_offset, target_length)
} else {
(offset, length)
};

let mut f = File::open(task_path.as_path()).await.map_err(|err| {
error!("open {:?} failed: {}", task_path, err);
f.seek(SeekFrom::Start(target_offset))
.await
.map_err(|err| {
error!("seek {:?} failed: {}", task_path, err);
err
})?;

f.seek(SeekFrom::Start(target_offset))
.await
.map_err(|err| {
error!("seek {:?} failed: {}", task_path, err);
err
})?;
return Ok(f.take(target_length));
}

let mut f = File::open(task_path.as_path()).await.map_err(|err| {
error!("open {:?} failed: {}", task_path, err);
err
})?;

f.seek(SeekFrom::Start(offset)).await.map_err(|err| {
error!("seek {:?} failed: {}", task_path, err);
err
})?;
Ok(f.take(length))
Ok(f.take(target_length))
}

/// write_piece writes the piece to the content.
Expand Down Expand Up @@ -318,7 +312,8 @@ impl Content {
})?;

// Copy the piece to the file.
let length = io::copy(&mut tee, &mut f).await.map_err(|err| {
let mut writer = BufWriter::with_capacity(self.config.storage.write_buffer_size, f);
let length = io::copy(&mut tee, &mut writer).await.map_err(|err| {
error!("copy {:?} failed: {}", task_path, err);
err
})?;
Expand Down
4 changes: 2 additions & 2 deletions dragonfly-client-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::AsyncRead;
use tracing::{debug, error, info, instrument};
use tracing::{debug, error, instrument};

pub mod content;
pub mod metadata;
Expand Down Expand Up @@ -449,7 +449,7 @@ impl Storage {

// If the piece is finished, return.
if piece.is_finished() {
info!("wait piece finished success");
debug!("wait piece finished success");
return Ok(piece);
}

Expand Down
4 changes: 2 additions & 2 deletions dragonfly-client-storage/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub struct Task {
pub response_header: HashMap<String, String>,

/// uploading_count is the count of the task being uploaded by other peers.
pub uploading_count: u64,
pub uploading_count: i64,

/// uploaded_count is the count of the task has been uploaded by other peers.
pub uploaded_count: u64,
Expand Down Expand Up @@ -244,7 +244,7 @@ pub struct Piece {
pub parent_id: Option<String>,

/// uploading_count is the count of the piece being uploaded by other peers.
pub uploading_count: u64,
pub uploading_count: i64,

/// uploaded_count is the count of the piece has been uploaded by other peers.
pub uploaded_count: u64,
Expand Down
4 changes: 2 additions & 2 deletions dragonfly-client/src/grpc/dfdaemon_download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler {
let download_clone = download.clone();
let task_manager_clone = task_manager.clone();
let task_clone = task.clone();
let (out_stream_tx, out_stream_rx) = mpsc::channel(1024 * 10);
let (out_stream_tx, out_stream_rx) = mpsc::channel(10 * 1024);
tokio::spawn(
async move {
match task_manager_clone
Expand Down Expand Up @@ -743,7 +743,7 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler {
let request_clone = request.clone();
let task_manager_clone = task_manager.clone();
let task_clone = task.clone();
let (out_stream_tx, out_stream_rx) = mpsc::channel(1024 * 10);
let (out_stream_tx, out_stream_rx) = mpsc::channel(10 * 1024);
tokio::spawn(
async move {
match task_manager_clone
Expand Down
6 changes: 3 additions & 3 deletions dragonfly-client/src/grpc/dfdaemon_upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
let download_clone = download.clone();
let task_manager_clone = task_manager.clone();
let task_clone = task.clone();
let (out_stream_tx, out_stream_rx) = mpsc::channel(1024 * 10);
let (out_stream_tx, out_stream_rx) = mpsc::channel(10 * 1024);
tokio::spawn(
async move {
match task_manager_clone
Expand Down Expand Up @@ -644,7 +644,7 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
let task_manager = self.task.clone();

// Initialize stream channel.
let (out_stream_tx, out_stream_rx) = mpsc::channel(1024 * 10);
let (out_stream_tx, out_stream_rx) = mpsc::channel(10 * 1024);
tokio::spawn(
async move {
loop {
Expand Down Expand Up @@ -940,7 +940,7 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
let request_clone = request.clone();
let task_manager_clone = task_manager.clone();
let task_clone = task.clone();
let (out_stream_tx, out_stream_rx) = mpsc::channel(1024 * 10);
let (out_stream_tx, out_stream_rx) = mpsc::channel(10 * 1024);
tokio::spawn(
async move {
match task_manager_clone
Expand Down
22 changes: 11 additions & 11 deletions dragonfly-client/src/grpc/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -527,19 +527,21 @@ impl SchedulerClient {
#[instrument(skip(self))]
async fn update_available_scheduler_addrs(&self) -> Result<()> {
// Get the endpoints of available schedulers.
let data = self.dynconfig.data.read().await;
let data_available_schedulers_clone = data.available_schedulers.clone();
drop(data);
let data_available_schedulers_clone = {
let data = self.dynconfig.data.read().await;
data.available_schedulers.clone()
};

// Check if the available schedulers is empty.
if data_available_schedulers_clone.is_empty() {
return Err(Error::AvailableSchedulersNotFound);
}

// Get the available schedulers.
let available_schedulers = self.available_schedulers.read().await;
let available_schedulers_clone = available_schedulers.clone();
drop(available_schedulers);
let available_schedulers_clone = {
let available_schedulers = self.available_schedulers.read().await;
available_schedulers.clone()
};

// Check if the available schedulers is not changed.
if data_available_schedulers_clone.len() == available_schedulers_clone.len()
Expand Down Expand Up @@ -574,13 +576,11 @@ impl SchedulerClient {
new_available_schedulers.push(available_scheduler.clone());

// Add the scheduler address to the addresses of available schedulers.
new_available_scheduler_addrs
.push(SocketAddr::new(ip, available_scheduler.port as u16));
let socket_addr = SocketAddr::new(ip, available_scheduler.port as u16);
new_available_scheduler_addrs.push(socket_addr);

// Add the scheduler to the hashring.
new_hashring.add(VNode {
addr: SocketAddr::new(ip, available_scheduler.port as u16),
});
new_hashring.add(VNode { addr: socket_addr });
}

// Update the available schedulers.
Expand Down
Loading

0 comments on commit 10bab19

Please sign in to comment.