Skip to content

Commit

Permalink
Add support for group operation
Browse files Browse the repository at this point in the history
  • Loading branch information
AbdelrahmanElawady committed May 8, 2024
1 parent 367cc28 commit 94f914c
Show file tree
Hide file tree
Showing 7 changed files with 354 additions and 115 deletions.
47 changes: 38 additions & 9 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@ thiserror = "1.0"
clap = "2.33"
git-version = "0.3.5"
command-group = "1.0.8"
async-recursion = "1.1.1"
134 changes: 106 additions & 28 deletions src/app/api.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
use crate::zinit::{config, ZInit};
use crate::zinit::{self, config, ZInit, ZInitStatus};
use anyhow::{Context, Result};
use nix::sys::signal;
use serde::{Deserialize, Serialize};
use serde_json::{self as encoder, Value};
use std::collections::HashMap;
use std::env::current_dir;
use std::io::{self, ErrorKind};
use std::marker::Unpin;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use tokio::fs;
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufStream};
use tokio::net::{UnixListener, UnixStream};

Expand All @@ -24,16 +27,30 @@ enum State {
Error,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "lowercase", untagged)]
pub enum Status {
Service(ServiceStatus),
Group(GroupStatus),
}

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub struct Status {
pub struct ServiceStatus {
pub name: String,
pub pid: u32,
pub state: String,
pub target: String,
pub after: HashMap<String, String>,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub struct GroupStatus {
pub name: String,
pub services: Vec<ServiceStatus>,
}

pub struct Api {
zinit: ZInit,
socket: PathBuf,
Expand Down Expand Up @@ -149,17 +166,55 @@ impl Api {
let services = zinit.list().await?;
let mut map: HashMap<String, String> = HashMap::new();
for service in services {
let state = zinit.status(&service).await?;
map.insert(service, format!("{:?}", state.state));
if let ZInitStatus::Service(state) = zinit.status(&service).await? {
map.insert(service, format!("{:?}", state.state));
}
}

Ok(encoder::to_value(map)?)
}

async fn monitor<S: AsRef<str>>(name: S, zinit: ZInit) -> Result<Value> {
let (name, service) = config::load(format!("{}.yaml", name.as_ref()))
.context("failed to load service config")?;
zinit.monitor(name, service).await?;
match config::load(format!("{}.yaml", name.as_ref())) {
Ok((name, service)) => zinit.monitor(name, config::Entry::Service(service)).await?,
Err(e) => {
if let Some(err) = e.downcast_ref::<io::Error>() {
if err.kind() != ErrorKind::NotFound {
return Err(e.context("failed to load service config"));
}
} else {
return Err(e.context("failed to load service config"));
}
}
}
let canonical_path = fs::canonicalize(name.as_ref()).await?;
let path = if !canonical_path.starts_with(current_dir()?) {
bail!("directory outside of zinit configuration directory")
} else {
canonical_path.strip_prefix(current_dir()?)?
};
let prefix = path.to_str().ok_or(anyhow!("invalid path name"))?;
match config::load_dir_with_prefix(path, prefix.to_string()) {
Ok(services) => {
for (k, v) in services {
if let Err(err) = zinit.monitor(&k, v).await {
error!("failed to monitor service {}: {}", k, err);
};
}
}
Err(e) => {
if let Some(err) = e.downcast_ref::<io::Error>() {
if err.kind() == ErrorKind::NotFound {
bail!(
"neither {}.yaml nor {} directory was found",
name.as_ref(),
name.as_ref()
)
}
}
return Err(e.context("failed to load service config"));
}
}
Ok(Value::Null)
}

Expand Down Expand Up @@ -206,30 +261,50 @@ impl Api {
}

async fn status<S: AsRef<str>>(name: S, zinit: ZInit) -> Result<Value> {
let status = zinit.status(&name).await?;

let result = Status {
name: name.as_ref().into(),
pid: status.pid.as_raw() as u32,
state: format!("{:?}", status.state),
target: format!("{:?}", status.target),
after: {
let mut after = HashMap::new();
for service in status.service.after {
let status = match zinit.status(&service).await {
Ok(dep) => dep.state,
Err(_) => crate::zinit::State::Unknown,
};
after.insert(service, format!("{:?}", status));
}
after
},
let result = match zinit.status(&name).await? {
ZInitStatus::Service(status) => {
Status::Service(zinit_status_to_service_status(name, zinit, status).await)
}
ZInitStatus::Group(group) => Status::Group(GroupStatus {
name: name.as_ref().into(),
services: {
let mut services = vec![];
for (name, status) in group.services {
services
.push(zinit_status_to_service_status(name, zinit.clone(), status).await)
}
services
},
}),
};

Ok(encoder::to_value(result)?)
}
}

async fn zinit_status_to_service_status<S: AsRef<str>>(
name: S,
zinit: ZInit,
status: zinit::ServiceStatus,
) -> ServiceStatus {
ServiceStatus {
name: name.as_ref().into(),
pid: status.pid.as_raw() as u32,
state: format!("{:?}", status.state),
target: format!("{:?}", status.target),
after: {
let mut after = HashMap::new();
for service in status.service.after {
if let Ok(ZInitStatus::Service(status)) = zinit.status(&service).await {
after.insert(service, format!("{:?}", status.state));
} else {
after.insert(service, format!("{:?}", crate::zinit::State::Unknown));
}
}
after
},
}
}

pub struct Client {
socket: PathBuf,
}
Expand Down Expand Up @@ -291,7 +366,8 @@ impl Client {
match filter {
None => tokio::io::copy(&mut con, &mut out).await?,
Some(filter) => {
let filter = format!("{}:", filter.as_ref());
let service_filter = format!("{}:", filter.as_ref());
let group_filter = format!("{}/", filter.as_ref());
let mut stream = BufStream::new(con);
loop {
let mut line = String::new();
Expand All @@ -303,7 +379,9 @@ impl Client {
}
}

if line[4..].starts_with(&filter) {
if line[4..].starts_with(&service_filter)
|| line[4..].starts_with(&group_filter)
{
let _ = out.write_all(line.as_bytes()).await;
}
}
Expand Down
25 changes: 21 additions & 4 deletions src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use std::path::{Path, PathBuf};
use tokio::fs;
use tokio::time;

use self::api::Status;

fn logger(level: log::LevelFilter) -> Result<()> {
let logger = fern::Dispatch::new()
.format(|out, message, record| {
Expand Down Expand Up @@ -135,10 +137,25 @@ pub async fn restart(socket: &str, name: &str) -> Result<()> {
client.stop(name).await?;
//pull status
for _ in 0..20 {
let result = client.status(name).await?;
if result.pid == 0 && result.target == "Down" {
client.start(name).await?;
return Ok(());
match client.status(name).await? {
Status::Service(result) => {
if result.pid == 0 && result.target == "Down" {
client.start(name).await?;
return Ok(());
}
}
Status::Group(result) => {
let mut start = true;
for service in result.services {
if service.pid != 0 || service.target != "Down" {
start = false;
}
}
if start {
client.start(name).await?;
return Ok(());
}
}
}
time::sleep(std::time::Duration::from_secs(1)).await;
}
Expand Down
Loading

0 comments on commit 94f914c

Please sign in to comment.