From e83b1d090e70286945390f1efe545904d1fefc7d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B9=94=E7=90=9B=2010307740?= Date: Wed, 3 Apr 2024 14:11:07 +0800 Subject: [PATCH] Feature: Add oom monitor for cgroupv2. --- crates/runc-shim/Cargo.toml | 2 +- crates/runc-shim/src/cgroup_memory.rs | 207 +++++++++++++++++++++----- crates/runc-shim/src/task.rs | 13 +- 3 files changed, 178 insertions(+), 44 deletions(-) diff --git a/crates/runc-shim/Cargo.toml b/crates/runc-shim/Cargo.toml index 2d2a67c5..da0efd7e 100644 --- a/crates/runc-shim/Cargo.toml +++ b/crates/runc-shim/Cargo.toml @@ -45,4 +45,4 @@ tokio = { workspace = true, features = ["full"] } [target.'cfg(target_os = "linux")'.dependencies] cgroups-rs = "0.3.3" -nix = { workspace = true, features = ["event"] } \ No newline at end of file +nix = { workspace = true, features = ["event", "inotify"] } \ No newline at end of file diff --git a/crates/runc-shim/src/cgroup_memory.rs b/crates/runc-shim/src/cgroup_memory.rs index 304b07cf..a5cc22fe 100644 --- a/crates/runc-shim/src/cgroup_memory.rs +++ b/crates/runc-shim/src/cgroup_memory.rs @@ -17,21 +17,32 @@ #![cfg(target_os = "linux")] use std::{ - os::unix::io::{AsRawFd, FromRawFd}, + collections::HashMap, + mem::size_of, + os::{ + fd::AsFd, + unix::io::{AsRawFd, FromRawFd}, + }, path::Path, + sync::Arc, }; use containerd_shim::{ error::{Error, Result}, io_error, other_error, }; -use nix::sys::eventfd::{EfdFlags, EventFd}; +use nix::sys::{ + eventfd::{EfdFlags, EventFd}, + inotify, +}; use tokio::{ fs::{self, read_to_string, File}, io::AsyncReadExt, sync::mpsc::{self, Receiver}, }; +pub const DEFAULT_CGROUPV2_PATH: &str = "/sys/fs/cgroup"; + pub async fn get_path_from_cgorup(pid: u32) -> Result { let proc_path = format!("/proc/{}/cgroup", pid); let path_string = read_to_string(&proc_path) @@ -126,6 +137,111 @@ pub async fn register_memory_event( Ok(receiver) } +fn memory_event_fd(path: &Path) -> Result { + let instance = inotify::Inotify::init(inotify::InitFlags::empty())?; + + let fpath = path.join("memory.events"); + instance.add_watch(&fpath, inotify::AddWatchFlags::IN_MODIFY)?; + + let evpath = path.join("cgroup.events"); + instance.add_watch(&evpath, inotify::AddWatchFlags::IN_MODIFY)?; + + Ok(instance) +} + +async fn parse_kv_file(cg_dir: &Path, file: &str) -> Result> { + let path = cg_dir.join(file); + let mut map: HashMap = HashMap::new(); + + let file_string = read_to_string(path.clone()).await.map_err(io_error!( + e, + "open {}.", + path.to_string_lossy() + ))?; + + for line in file_string.lines() { + if let Some((key, val)) = line.split_once(" ") { + let val = val.parse::()?; + map.insert(key.to_string(), val); + } + } + + Ok(map) +} + +pub async fn register_memory_event_v2(key: &str, cg_dir: &Path) -> Result> { + let (sender, receiver) = mpsc::channel(128); + let cg_dir = Arc::new(Box::from(cg_dir)); + let key = key.to_string(); + + tokio::spawn(async move { + let inotify = memory_event_fd(&cg_dir).unwrap(); + let mut eventfd_file = unsafe { File::from_raw_fd(inotify.as_fd().as_raw_fd()) }; + let mut buffer: [u8; 4096] = [0u8; 4096]; + + let mut lastoom_map: HashMap = HashMap::new(); + loop { + let nread = match eventfd_file.read(&mut buffer).await { + Ok(nread) => nread, + Err(_) => return, + }; + if nread >= size_of::() { + match parse_kv_file(&cg_dir, "memory.events").await { + Ok(mem_map) => { + let last_oom_kill = match lastoom_map.get(&key) { + Some(v) => v, + None => &0, + }; + + let oom_kill = match mem_map.get("oom_kill") { + Some(v) => v, + None => return, + }; + + if *oom_kill > *last_oom_kill { + sender.send(key.clone()).await.unwrap(); + } + if *oom_kill > 0 { + lastoom_map.insert(key.to_string(), *oom_kill); + } + } + Err(_) => return, + }; + + let cg_map = match parse_kv_file(&cg_dir, "cgroup.events").await { + Ok(cg_map) => cg_map, + Err(_) => return, + }; + match cg_map.get("populated") { + Some(v) if *v == 0 => return, + Some(_) => (), + None => return, + }; + } + } + }); + + Ok(receiver) +} + +pub async fn get_path_from_cgorup_v2(pid: u32) -> Result { + let proc_path = format!("/proc/{}/cgroup", pid); + let path_string = read_to_string(&proc_path) + .await + .map_err(io_error!(e, "open {}.", &proc_path))?; + + let (_, path) = path_string + .lines() + .nth(0) + .ok_or(Error::Other( + "Error happened while geting the path from cgroup of container process pid.".into(), + ))? + .split_once("::") + .ok_or(Error::Other("Failed to parse memory line".into()))?; + + Ok(path.to_string()) +} + #[cfg(test)] mod tests { use std::path::Path; @@ -140,28 +256,29 @@ mod tests { use crate::cgroup_memory; #[tokio::test] - async fn test_cgroupv1_oom_monitor() { - if !is_cgroup2_unified_mode() { - // Create a memory cgroup with limits on both memory and swap. - let path = "cgroupv1_oom_monitor"; - let cg = Cgroup::new(hierarchies::auto(), path).unwrap(); - - let mem_controller: &MemController = cg.controller_of().unwrap(); - mem_controller.set_limit(10 * 1024 * 1024).unwrap(); // 10M - mem_controller.set_swappiness(0).unwrap(); - - // Create a sh sub process, and let it wait for the stdinput. - let mut child_process = Command::new("sh") - .stdin(std::process::Stdio::piped()) - .spawn() - .unwrap(); + async fn test_cgroup_oom_monitor() { + // Create a memory cgroup with limits on both memory and swap. + let path = "cgroupv1_oom_monitor"; + let cg = Cgroup::new(hierarchies::auto(), path).unwrap(); + + let mem_controller: &MemController = cg.controller_of().unwrap(); + mem_controller.set_limit(10 * 1024 * 1024).unwrap(); // 10M + mem_controller.set_swappiness(0).unwrap(); + + // Create a sh sub process, and let it wait for the stdinput. + let mut child_process = Command::new("sh") + .stdin(std::process::Stdio::piped()) + .spawn() + .unwrap(); - let pid = child_process.id().unwrap(); + let pid = child_process.id().unwrap(); - // Add the sh subprocess to the cgroup. - cg.add_task_by_tgid(CgroupPid::from(pid as u64)).unwrap(); + // Add the sh subprocess to the cgroup. + cg.add_task_by_tgid(CgroupPid::from(pid as u64)).unwrap(); + let mut rx: tokio::sync::mpsc::Receiver; - // Set oom monitor + if !is_cgroup2_unified_mode() { + // Set cgroupv1 oom monitor let path_from_cgorup = cgroup_memory::get_path_from_cgorup(pid).await.unwrap(); let (mount_root, mount_point) = cgroup_memory::get_existing_cgroup_mem_path(path_from_cgorup) @@ -169,34 +286,42 @@ mod tests { .unwrap(); let mem_cgroup_path = mount_point + &mount_root; - let mut rx = cgroup_memory::register_memory_event( + rx = cgroup_memory::register_memory_event( pid.to_string().as_str(), Path::new(&mem_cgroup_path), "memory.oom_control", ) .await .unwrap(); + } else { + // Set cgroupv2 oom monitor + let path_from_cgorup = cgroup_memory::get_path_from_cgorup_v2(pid).await.unwrap(); + let mem_cgroup_path = + cgroup_memory::DEFAULT_CGROUPV2_PATH.to_owned() + &path_from_cgorup; - // Exec the sh subprocess to a dd command that consumes more than 10M of memory. - if let Some(mut stdin) = child_process.stdin.take() { - stdin - .write_all( - b"exec dd if=/dev/zero of=/tmp/test_oom_monitor_file bs=11M count=1\n", - ) - .await - .unwrap(); - stdin.flush().await.unwrap(); - } - - // Wait for the oom message. - if let Some(item) = rx.recv().await { - assert_eq!(pid.to_string(), item, "Receive error oom message"); - } + rx = cgroup_memory::register_memory_event_v2( + pid.to_string().as_str(), + Path::new(&mem_cgroup_path), + ) + .await + .unwrap(); + } + // Exec the sh subprocess to a dd command that consumes more than 10M of memory. + if let Some(mut stdin) = child_process.stdin.take() { + stdin + .write_all(b"exec dd if=/dev/zero of=/tmp/test_oom_monitor_file bs=11M count=1\n") + .await + .unwrap(); + stdin.flush().await.unwrap(); + } - // Clean. - child_process.wait().await.unwrap(); - cg.delete().unwrap(); - remove_file("/tmp/test_oom_monitor_file").await.unwrap(); + // Wait for the oom message. + if let Some(item) = rx.recv().await { + assert_eq!(pid.to_string(), item, "Receive error oom message"); } + // Clean. + child_process.wait().await.unwrap(); + cg.delete().unwrap(); + remove_file("/tmp/test_oom_monitor_file").await.unwrap(); } } diff --git a/crates/runc-shim/src/task.rs b/crates/runc-shim/src/task.rs index eec1237d..343811f6 100644 --- a/crates/runc-shim/src/task.rs +++ b/crates/runc-shim/src/task.rs @@ -131,22 +131,31 @@ fn run_oom_monitor(mut rx: Receiver, id: String, tx: EventSender) { #[cfg(target_os = "linux")] async fn monitor_oom(id: &String, pid: u32, tx: EventSender) -> Result<()> { + // std::thread::sleep(std::time::Duration::from_secs(20)); + let rx: Receiver; if !is_cgroup2_unified_mode() { let path_from_cgorup = cgroup_memory::get_path_from_cgorup(pid).await?; let (mount_root, mount_point) = cgroup_memory::get_existing_cgroup_mem_path(path_from_cgorup).await?; let mem_cgroup_path = mount_point + &mount_root; - let rx = cgroup_memory::register_memory_event( + rx = cgroup_memory::register_memory_event( id, Path::new(&mem_cgroup_path), "memory.oom_control", ) .await .map_err(other_error!(e, "register_memory_event failed:"))?; + } else { + let path_from_cgorup = cgroup_memory::get_path_from_cgorup_v2(pid).await?; + let mem_cgroup_path = cgroup_memory::DEFAULT_CGROUPV2_PATH.to_owned() + &path_from_cgorup; - run_oom_monitor(rx, id.to_string(), tx); + rx = cgroup_memory::register_memory_event_v2(id, Path::new(&mem_cgroup_path)) + .await + .map_err(other_error!(e, "register_memory_event failed:"))?; } + + run_oom_monitor(rx, id.to_string(), tx); Ok(()) }