Skip to content

Commit

Permalink
Fixed deadlocks with cronjobs
Browse files Browse the repository at this point in the history
  • Loading branch information
maximevanhees committed Oct 10, 2024
1 parent 085cde3 commit e61fc55
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 218 deletions.
2 changes: 0 additions & 2 deletions src/app/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,6 @@ impl Api {
}

async fn status<S: AsRef<str>>(name: S, zinit: ZInit) -> Result<Value> {
println!("running status function");
let status = zinit.status(&name).await?;

let result = Status {
Expand All @@ -226,7 +225,6 @@ impl Api {
after
},
};
println!("finished running status function");

Ok(encoder::to_value(result)?)
}
Expand Down
3 changes: 0 additions & 3 deletions src/zinit/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@ pub struct Service {

impl Service {
pub fn validate(&self) -> Result<()> {
println!("Validating service from dir:{}", self.dir);

use nix::sys::signal::Signal;
use std::str::FromStr;
if self.exec.is_empty() {
Expand Down Expand Up @@ -101,7 +99,6 @@ pub fn load<T: AsRef<Path>>(t: T) -> Result<(String, Service)> {
/// a file, the callback can decide to either ignore the file, or stop
/// the directory walking
pub fn load_dir<T: AsRef<Path>>(p: T) -> Result<Services> {
println!("loading services from directory");
let mut services: Services = HashMap::new();

for entry in fs::read_dir(p)? {
Expand Down
227 changes: 14 additions & 213 deletions src/zinit/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,11 +209,12 @@ impl ZInit {
bail!(ZInitError::ShuttingDown);
}
let name = name.into();
let mut services = self.services.write().await;

let services = self.services.read().await;
if services.contains_key(&name) {
bail!(ZInitError::ServiceAlreadyMonitored { name })
}
drop(services);

// Check that service does not have dependecies that are cron jobs
for dep in &service.after {
Expand All @@ -228,7 +229,11 @@ impl ZInit {
}

let service = Arc::new(RwLock::new(ZInitService::new(service, State::Unknown)));

let mut services = self.services.write().await;
services.insert(name.clone(), Arc::clone(&service));
drop(services);

let m = self.clone();
debug!("service '{}' monitored", name);
tokio::spawn(m.watch(name, service));
Expand Down Expand Up @@ -560,7 +565,6 @@ impl ZInit {
}

async fn handle_scheduling(&self, name: &str, config: &config::Service) -> Result<()> {
println!("entering handle_scheduling function");
'checks: loop {
let sig = self.notify.notified();
debug!("checking {} if it can schedule", name);
Expand All @@ -576,12 +580,10 @@ impl ZInit {
debug!("service {} is blocked, waiting release", name);
sig.await;
}
println!("EXITING handle_scheduling function");
Ok(())
}

async fn wait_for_cron_schedule(&self, name: &str, config: &config::Service) -> Result<()> {
println!("entering wait_for_cron_schedule function");
if let Some(ref schedule) = config.cron {
let now: DateTime<Utc> = Utc::now();
if let Some(next_datetime) = schedule.upcoming(Utc).next() {
Expand All @@ -606,7 +608,6 @@ impl ZInit {
}
}

println!("EXITING wait_for_cron_schedule function");
Ok(())
}

Expand All @@ -616,21 +617,17 @@ impl ZInit {
input: &Arc<RwLock<ZInitService>>,
config: &config::Service,
) -> Result<()> {
println!("entering run_service function");
let log = match config.log {
config::Log::None => Log::None,
config::Log::Stdout => Log::Stdout,
config::Log::Ring => Log::Ring(name.to_string()),
};

println!("1");
let mut service = input.write().await;
println!("2");

if service.target == Target::Down {
return Err(anyhow!("Service '{}' target is down", name));
}
println!("3");

let child = self
.pm
Expand All @@ -639,19 +636,14 @@ impl ZInit {
log.clone(),
)
.await?;
println!("4");

service.state.set(State::Spawned);
service.pid = child.pid;
println!("run_service funciton: dropping write lock");
drop(service);
println!("run_service funciton: dropped write lock");

let result = child.wait().await;

println!("run_service funciton: aqcuiring write lock");
let mut service = input.write().await;
println!("run_service funciton: got write lock");
service.pid = Pid::from_raw(0);

match result {
Expand All @@ -668,12 +660,10 @@ impl ZInit {
}
}

println!("EXITING run_service function");
Ok(())
}

async fn watch(self, name: String, input: Arc<RwLock<ZInitService>>) {
println!("entering watch function");
let mut service = input.write().await;

if service.target == Target::Down || service.scheduled {
Expand All @@ -684,27 +674,21 @@ impl ZInit {
drop(service); // Release the lock

loop {
println!("entering watch function loop");
if let Err(err) = self
.handle_scheduling(&name, &input.read().await.service)
.await
{
let service_clone = {
let service_read_guard = input.read().await;
service_read_guard.service.clone()
};

if let Err(err) = self.handle_scheduling(&name, &service_clone).await {
error!("Scheduling error for '{}': {}", name, err);
break;
}

if let Err(err) = self
.wait_for_cron_schedule(&name, &input.read().await.service)
.await
{
if let Err(err) = self.wait_for_cron_schedule(&name, &service_clone).await {
debug!("Service '{}' scheduling completed: {}", name, err);
break;
}

let service_clone = {
let service_read_guard = input.read().await;
service_read_guard.service.clone()
};
if let Err(err) = self.run_service(&name, &input, &service_clone).await {
error!("Error running service '{}': {}", name, err);
break;
Expand All @@ -724,190 +708,7 @@ impl ZInit {

let mut service = input.write().await;
service.scheduled = false;
println!("EXITING watch function");
}

// async fn watch(self, name: String, input: Arc<RwLock<ZInitService>>) {
// println!("entering watch function for {}", name.clone());
// let name = name.clone();
// let mut service = input.write().await;
//
// if service.target == Target::Down {
// debug!("service '{}' target is down", name);
// return;
// }
//
// if service.scheduled {
// debug!("service '{}' already scheduled", name);
// return;
// }
//
// service.scheduled = true;
// drop(service); // Release the lock
//
// loop {
// let name = name.clone();
//
// let service = input.read().await;
// // early check if service is down, so we don't have to do extra checks
// if service.target == Target::Down {
// // we check target in loop in case service have
// // been set down.
// break;
// }
// let config = service.service.clone();
// // we need to spawn this service now, but is it ready?
// // are all dependent services are running ?
//
// // so we drop the table to give other services
// // chance to acquire the lock and schedule themselves
// drop(service);
//
// // If cron is specified for a oneshot service, schedule the execution
// if let Some(ref schedule) = config.cron {
// println!("Got a CRON service: {:#?}", schedule);
// // Get current time (for now only UTC time supported)
// // TODO: dynamically adjust to machine's timezone
// let now: DateTime<Utc> = Utc::now();
//
// // Get next scheduled time
// if let Some(next_datetime) = schedule.upcoming(Utc).next() {
// let duration = next_datetime
// .signed_duration_since(now)
// .to_std()
// .unwrap_or(Duration::from_secs(0));
//
// debug!("service {} scheduled to run at {}", name, next_datetime);
// println!("service {} scheduled to run at {}", name, next_datetime);
// // Set state of service to `Scheduled`
// self.set(&name, Some(State::ScheduledCron), None).await;
// // Wait until the next scheduled time
// sleep_until(Instant::now() + duration).await;
//
// // Before executing, check if service is still up and not shutting down
// if *self.shutdown.read().await || self.is_target_down(&name).await {
// break;
// }
// } else {
// // No upcoming scheduled times; exit the loop
// debug!("service '{}' has not more scheduled runs", name);
// println!("service '{}' has not more scheduled runs", name);
// // break;
// }
// } else if config.one_shot {
// // For oneshot services without cron, proceed immediately
// debug!(
// "service '{}' is a oneshot service without cron; starting immediately",
// name
// );
// } else {
// // For non-oneshot services, proceed as usual
// debug!("service '{}' is not a oneshot service: proceeding", name);
// }
//
// 'checks: loop {
// let sig = self.notify.notified();
// debug!("checking {} if it can schedule", name);
// println!("checking {} if it can schedule", name);
// if self.can_schedule(&config).await {
// debug!("service {} can schedule", name);
// println!("service {} can schedule", name);
// break 'checks;
// }
//
// self.set(&name, Some(State::Blocked), None).await;
// // don't even care if i am lagging
// // as long i am notified that some services status
// // has changed
// debug!("service {} is blocked, waiting release", name);
// println!("service {} is blocked, waiting release", name);
// sig.await;
// }
//
// let log = match config.log {
// config::Log::None => Log::None,
// config::Log::Stdout => Log::Stdout,
// config::Log::Ring => Log::Ring(name.clone()),
// };
//
// let mut service = input.write().await;
// // we check again in case target has changed. Since we had to release the lock
// // earlier to not block locking on this service (for example if a stop was called)
// // while the service was waiting for dependencies.
// // the lock is kept until the spawning and the update of the pid.
// if service.target == Target::Down {
// // we check target in loop in case service have
// // been set down.
// break;
// }
//
// let child = self
// .pm
// .run(
// Process::new(&config.exec, &config.dir, Some(config.env.clone())),
// log.clone(),
// )
// .await;
//
// let child = match child {
// Ok(child) => {
// service.state.set(State::Spawned);
// service.pid = child.pid;
// child
// }
// Err(err) => {
// // so, spawning failed. and nothing we can do about it
// // this can be duo to a bad command or exe not found.
// // set service to failure.
// error!("service {} failed to start: {}", name, err);
// service.state.set(State::Failure);
// break;
// }
// };
//
// service.state.set(State::Running);
// drop(service);
//
// // Wait for the child process to finish
// let result = child.wait().await;
//
// let mut service = input.write().await;
// service.pid = Pid::from_raw(0);
//
// match result {
// Err(err) => {
// error!("failed to read service '{}' status: {}", name, err);
// service.state.set(State::Unknown);
// }
// Ok(status) => {
// service.state.set(if status.success() {
// State::Success
// } else {
// State::Error(status)
// });
// }
// }
//
// drop(service);
//
// // For oneshot services with cron, loop to schedule next execution
// if config.one_shot {
// if config.cron.is_some() {
// continue; // Schedule the next execution
// } else {
// self.notify.notify_waiters();
// break; // No cron; exit the loop
// }
// } else {
// // For non-oneshot services, handle respawn logic
// // Wait before restarting
// time::sleep(Duration::from_secs(2)).await;
// }
// }
//
// let mut service = input.write().await;
// service.scheduled = false;
// }
}

// Helper function to check if the service target is down
async fn is_target_down(&self, name: &str) -> bool {
Expand Down

0 comments on commit e61fc55

Please sign in to comment.