Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Attemp to resolve deadlock in blocking fifo open #346

Merged
merged 1 commit into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions crates/runc-shim/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
use std::{
env,
fs::File,
future::Future,
io::IoSliceMut,
ops::Deref,
os::{
Expand All @@ -25,6 +26,7 @@ use std::{
},
path::Path,
sync::Arc,
time::Duration,
};

use containerd_shim::{
Expand Down Expand Up @@ -59,6 +61,8 @@ pub const INIT_PID_FILE: &str = "init.pid";
pub const LOG_JSON_FILE: &str = "log.json";
pub const FIFO_SCHEME: &str = "fifo";

const TIMEOUT_DURATION: std::time::Duration = Duration::from_secs(3);

#[derive(Deserialize)]
pub struct Log {
pub level: String,
Expand Down Expand Up @@ -248,3 +252,17 @@ pub(crate) fn xdg_runtime_dir() -> String {
env::var("XDG_RUNTIME_DIR")
.unwrap_or_else(|_| env::temp_dir().to_str().unwrap_or(".").to_string())
}

pub async fn handle_file_open<F, Fut>(file_op: F) -> Result<tokio::fs::File, tokio::io::Error>
where
F: FnOnce() -> Fut,
Fut: Future<Output = Result<tokio::fs::File, tokio::io::Error>> + Send,
{
match tokio::time::timeout(TIMEOUT_DURATION, file_op()).await {
Ok(result) => result,
Err(_) => Err(std::io::Error::new(
std::io::ErrorKind::TimedOut,
"File operation timed out",
)),
}
}
82 changes: 50 additions & 32 deletions crates/runc-shim/src/runc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ use super::{
};
use crate::{
common::{
check_kill_error, create_io, create_runc, get_spec_from_request, receive_socket,
CreateConfig, Log, ProcessIO, ShimExecutor, INIT_PID_FILE, LOG_JSON_FILE,
check_kill_error, create_io, create_runc, get_spec_from_request, handle_file_open,
receive_socket, CreateConfig, Log, ProcessIO, ShimExecutor, INIT_PID_FILE, LOG_JSON_FILE,
},
io::Stdio,
};
Expand Down Expand Up @@ -538,11 +538,14 @@ async fn copy_console(
.try_clone()
.await
.map_err(io_error!(e, "failed to clone console file"))?;
let stdin = OpenOptions::new()
.read(true)
.open(stdio.stdin.as_str())
.await
.map_err(io_error!(e, "failed to open stdin"))?;
let stdin = handle_file_open(|| async {
OpenOptions::new()
.read(true)
.open(stdio.stdin.as_str())
.await
})
.await
.map_err(io_error!(e, "failed to open stdin"))?;
spawn_copy(stdin, console_stdin, exit_signal.clone(), None::<fn()>);
}

Expand Down Expand Up @@ -587,30 +590,39 @@ pub async fn copy_io(pio: &ProcessIO, stdio: &Stdio, exit_signal: Arc<ExitSignal
if let Some(w) = io.stdin() {
debug!("copy_io: pipe stdin from {}", stdio.stdin.as_str());
if !stdio.stdin.is_empty() {
let stdin = OpenOptions::new()
.read(true)
.open(stdio.stdin.as_str())
.await
.map_err(io_error!(e, "open stdin"))?;
let stdin = handle_file_open(|| async {
OpenOptions::new()
.read(true)
.open(stdio.stdin.as_str())
.await
})
.await
.map_err(io_error!(e, "open stdin"))?;
spawn_copy(stdin, w, exit_signal.clone(), None::<fn()>);
}
}

if let Some(r) = io.stdout() {
debug!("copy_io: pipe stdout from to {}", stdio.stdout.as_str());
if !stdio.stdout.is_empty() {
let stdout = OpenOptions::new()
.write(true)
.open(stdio.stdout.as_str())
.await
.map_err(io_error!(e, "open stdout"))?;
let stdout = handle_file_open(|| async {
OpenOptions::new()
.write(true)
.open(stdio.stdout.as_str())
.await
})
.await
.map_err(io_error!(e, "open stdout"))?;
// open a read to make sure even if the read end of containerd shutdown,
// copy still continue until the restart of containerd succeed
let stdout_r = OpenOptions::new()
.read(true)
.open(stdio.stdout.as_str())
.await
.map_err(io_error!(e, "open stdout for read"))?;
let stdout_r = handle_file_open(|| async {
OpenOptions::new()
.read(true)
.open(stdio.stdout.as_str())
.await
})
.await
.map_err(io_error!(e, "open stdout for read"))?;
spawn_copy(
r,
stdout,
Expand All @@ -625,18 +637,24 @@ pub async fn copy_io(pio: &ProcessIO, stdio: &Stdio, exit_signal: Arc<ExitSignal
if let Some(r) = io.stderr() {
if !stdio.stderr.is_empty() {
debug!("copy_io: pipe stderr from to {}", stdio.stderr.as_str());
let stderr = OpenOptions::new()
.write(true)
.open(stdio.stderr.as_str())
.await
.map_err(io_error!(e, "open stderr"))?;
let stderr = handle_file_open(|| async {
OpenOptions::new()
.write(true)
.open(stdio.stderr.as_str())
.await
})
.await
.map_err(io_error!(e, "open stderr"))?;
// open a read to make sure even if the read end of containerd shutdown,
// copy still continue until the restart of containerd succeed
let stderr_r = OpenOptions::new()
.read(true)
.open(stdio.stderr.as_str())
.await
.map_err(io_error!(e, "open stderr for read"))?;
let stderr_r = handle_file_open(|| async {
OpenOptions::new()
.read(true)
.open(stdio.stderr.as_str())
.await
})
.await
.map_err(io_error!(e, "open stderr for read"))?;
spawn_copy(
r,
stderr,
Expand Down
Loading