Skip to content

Commit

Permalink
Logging to file
Browse files Browse the repository at this point in the history
  • Loading branch information
maximevanhees committed Oct 4, 2024
1 parent 367cc28 commit e2d0fdf
Show file tree
Hide file tree
Showing 4 changed files with 171 additions and 17 deletions.
3 changes: 2 additions & 1 deletion docs/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ after: # list of services that we depend on (optional)
- service2_name
signal: # optional section
stop: SIGKILL # the signal sent on `stop` action. default to SIGTERM
log: null | ring | stdout
log: null | ring | stdout | file
log_file: "/path/to/logfile" # required that `log` is set to `file`
env:
KEY: VALUE
```
Expand Down
158 changes: 142 additions & 16 deletions src/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,18 @@ use std::os::unix::io::IntoRawFd;
use std::process::Command;
use std::process::Stdio;
use std::sync::Arc;
use tokio::fs::File;
use tokio::io::AsyncBufReadExt;
use tokio::fs::{self, File, OpenOptions};
use tokio::io::BufReader;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufWriter};
use tokio::signal::unix;
use tokio::sync::oneshot;
use tokio::sync::Mutex;

mod buffer;
pub use buffer::Logs;

const MAX_LOG_FILE_SIZE: u64 = 10 * 1024 * 1024; // 10 MiB

pub struct Process {
cmd: String,
env: HashMap<String, String>,
Expand Down Expand Up @@ -65,6 +67,7 @@ pub enum Log {
None,
Stdout,
Ring(String),
File(String),
}

#[derive(Clone)]
Expand Down Expand Up @@ -167,7 +170,7 @@ impl ProcessManager {

let child = match log {
Log::None => child.stdout(Stdio::null()).stderr(Stdio::null()),
Log::Ring(_) => child.stdout(Stdio::piped()).stderr(Stdio::piped()),
Log::Ring(_) | Log::File(_) => child.stdout(Stdio::piped()).stderr(Stdio::piped()),
_ => child, // default to inherit
};

Expand All @@ -178,21 +181,15 @@ impl ProcessManager {
.context("failed to spawn command")?
.into_inner();

if let Log::Ring(prefix) = log {
let _ = self
.ring
.push(format!("[-] {}: ------------ [start] ------------", prefix))
.await;

if let Some(out) = child.stdout.take() {
let out = File::from_std(unsafe { StdFile::from_raw_fd(out.into_raw_fd()) });
self.sink(out, format!("[+] {}", prefix))
match log.clone() {
Log::Ring(prefix) => {
self.handle_ring_logging(&mut child, prefix).await?;
}

if let Some(out) = child.stderr.take() {
let out = File::from_std(unsafe { StdFile::from_raw_fd(out.into_raw_fd()) });
self.sink(out, format!("[-] {}", prefix))
Log::File(file_path) => {
self.handle_file_logging(&mut child, file_path, cmd.cmd.clone())
.await?;
}
_ => {}
}

let (tx, rx) = oneshot::channel();
Expand All @@ -204,6 +201,135 @@ impl ProcessManager {

Ok(Child::new(pid, rx))
}

async fn handle_ring_logging(
&self,
child: &mut std::process::Child,
prefix: String,
) -> Result<()> {
let _ = self
.ring
.push(format!("[-] {}: ------------ [start] ------------", prefix))
.await;

if let Some(out) = child.stdout.take() {
let out = File::from_std(unsafe { StdFile::from_raw_fd(out.into_raw_fd()) });
self.sink(out, format!("[+] {}", prefix));
}

if let Some(out) = child.stderr.take() {
let out = File::from_std(unsafe { StdFile::from_raw_fd(out.into_raw_fd()) });
self.sink(out, format!("[-] {}", prefix));
}

Ok(())
}

async fn handle_file_logging(
&self,
child: &mut std::process::Child,
file_path: String,
prefix: String,
) -> Result<()> {
// Check if the file exceeds the maximum size
if let Ok(metadata) = fs::metadata(&file_path).await {
if metadata.len() >= MAX_LOG_FILE_SIZE {
// Truncate the file
fs::remove_file(&file_path).await?;
// Recreate the file
fs::File::create(&file_path).await?;
}
}

// Open the log file in append mode
let file = OpenOptions::new()
.create(true)
.append(true)
.open(&file_path)
.await
.context(format!("failed to open log file '{}'", file_path))?;

// Wrap file in BufWriter
let writer = BufWriter::new(file);
let writer = Arc::new(Mutex::new(writer)); // Shared writer

if let Some(stdout) = child.stdout.take() {
let stdout_writer = Arc::clone(&writer);
let stdout = File::from_std(unsafe { StdFile::from_raw_fd(stdout.into_raw_fd()) });
self.sink_with_file(stdout, stdout_writer, prefix.clone(), file_path.clone())
.await;
}

if let Some(stderr) = child.stderr.take() {
let stderr_writer = Arc::clone(&writer);
let stderr = File::from_std(unsafe { StdFile::from_raw_fd(stderr.into_raw_fd()) });
self.sink_with_file(stderr, stderr_writer, prefix, file_path)
.await;
}

Ok(())
}

async fn sink_with_file(
&self,
reader: File,
writer: Arc<Mutex<BufWriter<tokio::fs::File>>>,
prefix: String,
file_path: String,
) {
let ring = self.ring.clone();
tokio::spawn(async move {
let mut lines = BufReader::new(reader).lines();
while let Ok(line) = lines.next_line().await {
match line {
Some(line) => {
// Write to ring buffer
let log_line = format!("{}: {}", prefix, line);
let _ = ring.push(log_line.clone()).await;

// Write to file
{
let mut writer = writer.lock().await; // Acquire lock
if let Err(err) = writer.write_all(log_line.as_bytes()).await {
error!("failed to write to log file: {}", err);
break;
}
if let Err(err) = writer.write_all(b"\n").await {
error!("failed to write to log file: {}", err);
break;
}
if let Err(err) = writer.flush().await {
error!("failed to flush log file buffer: {}", err);
break;
}
}

// Check file size
if let Ok(metadata) = fs::metadata(&file_path).await {
if metadata.len() >= MAX_LOG_FILE_SIZE {
// Truncate the file
{
let mut writer = writer.lock().await;
if let Err(err) = writer.get_ref().set_len(0).await {
error!("failed to truncate log file: {}", err);
break;
}
if let Err(err) = writer.flush().await {
error!(
"failed to flush log file buffer after truncation: {}",
err
);
break;
}
}
}
}
}
None => break,
}
}
});
}
}

#[derive(Clone)]
Expand Down
9 changes: 9 additions & 0 deletions src/zinit/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub enum Log {
#[default]
Ring,
Stdout,
File,
}

fn default_shutdown_timeout_fn() -> u64 {
Expand All @@ -50,6 +51,7 @@ pub struct Service {
pub after: Vec<String>,
pub signal: Signal,
pub log: Log,
pub log_file: Option<String>,
pub env: HashMap<String, String>,
pub dir: String,
}
Expand All @@ -64,6 +66,13 @@ impl Service {

Signal::from_str(&self.signal.stop.to_uppercase())?;

// Validate `log_file` when `log` is `File`
if let Log::File = self.log {
if self.log_file.is_none() {
bail!("log_file must be specified when log is set to 'file'");
}
}

Ok(())
}
}
Expand Down
18 changes: 18 additions & 0 deletions src/zinit/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,15 @@ impl ZInit {
config::Log::None => Log::None,
config::Log::Stdout => Log::Stdout,
config::Log::Ring => Log::Ring(format!("{}/test", name.as_ref())),
config::Log::File => {
if let Some(log_file) = &cfg.log_file {
Log::File(log_file.clone())
} else {
error!("log_file is not specified for service '{}'", name.as_ref());

Log::None
}
}
};

let test = self
Expand Down Expand Up @@ -596,6 +605,15 @@ impl ZInit {
config::Log::None => Log::None,
config::Log::Stdout => Log::Stdout,
config::Log::Ring => Log::Ring(name.clone()),
config::Log::File => {
if let Some(log_file) = &config.log_file {
Log::File(log_file.clone())
} else {
error!("log_file is not specified for service '{}'", name);

Log::None
}
}
};

let mut service = input.write().await;
Expand Down

0 comments on commit e2d0fdf

Please sign in to comment.