From 19230b9979d42f1a8888808ac9de55702d68d1aa Mon Sep 17 00:00:00 2001 From: Conor Schaefer Date: Sun, 1 Aug 2021 17:00:29 -0400 Subject: [PATCH] Update tokio to 1.9.0 Moving from 0.3.7 -> 1.9.0 was a big change, and introduced some runtime panics related to unanticipated drops. Seems like that was caused by the mix-and-match sync/async mess, so tried to remove the most explicit "blocking" calls and ported most functions to async. Feels like I went overboard, since the "new" constructors definitely should be blocking, but things are working now. Was able to drop dependency on custom ctrl+c handling, since apparently tokio has that built in, as long as everything's async. --- CHANGELOG.md | 6 ++++ Cargo.lock | 40 +++++++++----------------- Cargo.toml | 7 ++--- src/floating_ip.rs | 7 +++-- src/main.rs | 35 ++++++++++------------ src/manager.rs | 40 ++++++++++++++++++++------ src/proxy.rs | 6 ++-- src/server.rs | 72 ++++++++++++++++++++++++++-------------------- 8 files changed, 118 insertions(+), 95 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 242a1dd..4fd84f8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Innisfree changelog +## 0.2.8 + +* Updates all dependencies to latest +* Uses async function calls where possible +* Debian package reloads systemd, and loosens version dependencies + ## 0.2.7 * Publish to crates.io diff --git a/Cargo.lock b/Cargo.lock index 49a5ec7..ff1886f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -186,16 +186,6 @@ dependencies = [ "lazy_static", ] -[[package]] -name = "ctrlc" -version = "3.1.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "232295399409a8b7ae41276757b5a1cc21032848d42bff2352261f958b3ca29a" -dependencies = [ - "nix", - "winapi", -] - [[package]] name = "custom_error" version = "1.9.2" @@ -596,10 +586,9 @@ dependencies = [ [[package]] name = "innisfree" -version = "0.2.8-alpha.0" +version = "0.2.8-alpha.1" dependencies = [ "clap", - "ctrlc", "custom_error", "env_logger", "futures", @@ -727,18 +716,6 @@ dependencies = [ "tempfile", ] -[[package]] -name = "nix" -version = "0.20.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa9b4819da1bc61c0ea48b63b7bc8604064dd43013e7cc325df098d49cd7c18a" -dependencies = [ - "bitflags", - "cc", - "cfg-if", - "libc", -] - [[package]] name = "ntapi" version = "0.3.6" @@ -1228,6 +1205,15 @@ dependencies = [ "opaque-debug", ] +[[package]] +name = "signal-hook-registry" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e51e73328dc4ac0c7ccbda3a494dfa03df1de2f46018127f60c693f2648455b0" +dependencies = [ + "libc", +] + [[package]] name = "slab" version = "0.4.3" @@ -1356,9 +1342,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" [[package]] name = "tokio" -version = "1.8.3" +version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "00a287ce596d527f273dea7638a044739234740dbad141e7ed0c62c7d0c9c55a" +checksum = "4b7b349f11a7047e6d1276853e612d152f5e8a352c61917887cc2169e2366b4c" dependencies = [ "autocfg", "bytes", @@ -1366,7 +1352,9 @@ dependencies = [ "memchr", "mio", "num_cpus", + "once_cell", "pin-project-lite", + "signal-hook-registry", "tokio-macros", "winapi", ] diff --git a/Cargo.toml b/Cargo.toml index fefecac..190446f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "innisfree" -version = "0.2.8-alpha.0" +version = "0.2.8-alpha.1" authors = ["Conor Schaefer "] edition = "2018" description = "Exposes local services on public IPv4 address, via cloud server." @@ -13,20 +13,19 @@ keywords = ["self-hosting", "wireguard"] [dependencies] clap = "3.0.0-beta.2" -ctrlc = { version = "3.0", features = ["termination"] } custom_error = "~1.9" env_logger = "~0.9" futures = "0.3" home = "~0.5" log = "~0.4" rand = "~0.8" -reqwest = { version = "0.11", features = ["blocking", "json", "rustls"] } +reqwest = { version = "~0.11", features = ["json", "rustls"] } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0.39" serde_yaml = "0.8.17" tempfile = "3" tera = "1" -tokio = { version = "~1.8", features = [ "io-util", "net", "rt-multi-thread", "macros" ] } +tokio = { version = "~1.9", features = [ "io-util", "macros", "net", "rt-multi-thread", "signal"] } [package.metadata.deb] maintainer-scripts = "debian/" diff --git a/src/floating_ip.rs b/src/floating_ip.rs index 857f16e..537dfc6 100644 --- a/src/floating_ip.rs +++ b/src/floating_ip.rs @@ -12,7 +12,7 @@ pub struct FloatingIp { } impl FloatingIp { - pub fn assign(&self) { + pub async fn assign(&self) { let api_key = env::var("DIGITALOCEAN_API_TOKEN").expect("DIGITALOCEAN_API_TOKEN not set."); let req_body = json!({ "type": "assign", @@ -20,12 +20,13 @@ impl FloatingIp { }); let request_url = DO_API_BASE_URL.to_owned() + "/" + &self.ip + "/actions"; - let client = reqwest::blocking::Client::new(); + let client = reqwest::Client::new(); let response = client .post(request_url) .json(&req_body) .bearer_auth(api_key) - .send(); + .send() + .await; match response { Ok(_) => { diff --git a/src/main.rs b/src/main.rs index 72cafd8..770bedc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,8 +8,6 @@ use std::sync::Arc; extern crate log; use env_logger::Env; -extern crate ctrlc; - // Innisfree imports mod cloudinit; mod config; @@ -116,7 +114,7 @@ async fn main() -> Result<(), Box> { info!("Will provide proxies for {:?}", services); info!("Creating server '{}'", &tunnel_name); - let mgr = match manager::InnisfreeManager::new(&tunnel_name, services) { + let mgr = match manager::InnisfreeManager::new(&tunnel_name, services).await { Ok(m) => m, Err(e) => { error!("{}", e); @@ -133,24 +131,15 @@ async fn main() -> Result<(), Box> { error!("Failed bringing up tunnel: {}", e); // Error probably unrecoverable warn!("Attempting to exit gracefully..."); - let _ = mgr.clean(); + mgr.clean().await; std::process::exit(2); } } - let mgr_ctrlc = mgr.clone(); - ctrlc::set_handler(move || { - warn!("Received stop signal, exiting gracefully"); - mgr_ctrlc.clean(); - debug!("Clean up complete, exiting!"); - std::process::exit(0); - }) - .expect("Error setting Ctrl-C handler"); - // Really need a better default case for floating-ip if floating_ip != "None" { debug!("Configuring floating IP..."); - mgr.assign_floating_ip(floating_ip); + mgr.assign_floating_ip(floating_ip).await; info!("Server ready! IPv4 address: {}", floating_ip); } else { let ip = &mgr.server.ipv4_address(); @@ -159,7 +148,8 @@ async fn main() -> Result<(), Box> { debug!("Try logging in with 'innisfree ssh'"); let local_ip = String::from(WIREGUARD_LOCAL_IP); if dest_ip != "127.0.0.1" { - manager::run_proxy(local_ip, dest_ip, mgr.services.clone()).await; + tokio::spawn(manager::run_proxy(local_ip, dest_ip, mgr.services.clone())); + mgr.block().await; } else { info!( "Ready to listen on {}. Start local services. Make sure to bind to 0.0.0.0, rather than 127.0.0.1!", @@ -167,9 +157,7 @@ async fn main() -> Result<(), Box> { ); debug!("Blocking forever. Press ctrl+c to tear down the tunnel and destroy server."); // Block forever, ctrl+c will interrupt - loop { - std::thread::sleep(std::time::Duration::from_secs(10)); - } + mgr.block().await; } } else if let Some(ref _matches) = matches.subcommand_matches("ssh") { let result = manager::open_shell(); @@ -197,8 +185,15 @@ async fn main() -> Result<(), Box> { let port_spec = matches.value_of("ports").unwrap(); let ports = config::ServicePort::from_str_multi(port_spec); let local_ip = String::from(WIREGUARD_LOCAL_IP); + warn!("Ctrl+c will not halt proxy, use ctrl+z and `kill -9 %1`"); info!("Starting proxy for services {:?}", ports); - manager::run_proxy(local_ip, dest_ip, ports).await; + match manager::run_proxy(local_ip, dest_ip, ports).await { + Ok(_) => {} + Err(e) => { + error!("Proxy failed: {}", e); + std::process::exit(4); + } + }; } else if let Some(ref _matches) = matches.subcommand_matches("doctor") { info!("Running doctor, to determine platform support..."); match doctor::platform_is_supported() { @@ -214,7 +209,7 @@ async fn main() -> Result<(), Box> { "Platform is not supported. =( \ Only recent Linux distros such as Debian, Ubuntu, or Fedora are supported." ); - std::process::exit(4); + std::process::exit(5); } } } diff --git a/src/manager.rs b/src/manager.rs index d366ad8..25898cc 100644 --- a/src/manager.rs +++ b/src/manager.rs @@ -3,6 +3,7 @@ use crate::proxy::proxy_handler; use crate::server::InnisfreeServer; use crate::wg::WireguardManager; use futures::future::join_all; +use tokio::signal; #[derive(Debug)] pub struct InnisfreeManager { @@ -15,13 +16,14 @@ pub struct InnisfreeManager { } impl InnisfreeManager { - pub fn new( + pub async fn new( tunnel_name: &str, services: Vec, ) -> Result { clean_config_dir(); let wg = WireguardManager::new()?; - let server = InnisfreeServer::new(&tunnel_name, services, wg.clone().wg_remote_device)?; + let server = + InnisfreeServer::new(&tunnel_name, services, wg.clone().wg_remote_device).await?; Ok(InnisfreeManager { name: tunnel_name.to_string(), services: server.services.to_vec(), @@ -71,6 +73,22 @@ impl InnisfreeManager { } } } + /// Wait for an interrupt signal, then terminate gracefully, + /// cleaning up droplet resources before exit. + pub async fn block(&self) { + match signal::ctrl_c().await { + Ok(()) => { + warn!("Received stop signal, exiting gracefully"); + self.clean().await; + debug!("Clean up complete, exiting!"); + std::process::exit(0); + } + Err(e) => { + error!("Unable to register hook for ctrl+c: {}", e); + std::process::exit(10); + } + } + } fn test_connection(&self) -> Result<(), InnisfreeError> { trace!("Inside test connection, setting up vars"); let ip = &self.wg.wg_remote_ip; @@ -219,13 +237,14 @@ impl InnisfreeManager { } } } - pub fn clean(&self) { + pub async fn clean(&self) { debug!("Removing local Wireguard interface"); + // Ignore errors, since we want to try all handlers let _ = self.bring_down_local_wg(); - self.server.destroy(); + let _ = self.server.destroy().await; } - pub fn assign_floating_ip(&self, floating_ip: &str) { - self.server.assign_floating_ip(floating_ip); + pub async fn assign_floating_ip(&self, floating_ip: &str) { + self.server.assign_floating_ip(floating_ip).await; } } @@ -273,7 +292,11 @@ pub fn open_shell() -> Result<(), std::io::Error> { } } -pub async fn run_proxy(local_ip: String, dest_ip: String, services: Vec) { +pub async fn run_proxy( + local_ip: String, + dest_ip: String, + services: Vec, +) -> Result<(), InnisfreeError> { // We'll kick off a dedicated proxy for each service, // and collect the handles to await them all together, concurrently. let mut tasks = vec![]; @@ -297,8 +320,9 @@ pub async fn run_proxy(local_ip: String, dest_ip: String, services: Vec { error!("Service proxy failed: {}", e); + return Err(InnisfreeError::Unknown); } } } - warn!("All proxies down, exiting"); + Ok(()) } diff --git a/src/proxy.rs b/src/proxy.rs index 9e2a77b..35ad331 100644 --- a/src/proxy.rs +++ b/src/proxy.rs @@ -1,11 +1,11 @@ +use crate::config::InnisfreeError; use futures::FutureExt; -use std::error::Error; use tokio::io::AsyncWriteExt; use tokio::net::TcpStream; // Taken from Tokio proxy example (MIT license): // https://github.com/tokio-rs/tokio/blob/a08ce0d3e06d650361283dc87c8fe14b146df15d/examples/proxy.rs -pub async fn transfer(mut inbound: TcpStream, proxy_addr: String) -> Result<(), Box> { +pub async fn transfer(mut inbound: TcpStream, proxy_addr: String) -> Result<(), InnisfreeError> { let mut outbound = TcpStream::connect(proxy_addr).await?; let (mut ri, mut wi) = inbound.split(); @@ -26,7 +26,7 @@ pub async fn transfer(mut inbound: TcpStream, proxy_addr: String) -> Result<(), Ok(()) } -pub async fn proxy_handler(listen_addr: String, dest_addr: String) -> Result<(), Box> { +pub async fn proxy_handler(listen_addr: String, dest_addr: String) -> Result<(), InnisfreeError> { debug!("Proxying traffic: {} -> {}", listen_addr, dest_addr); let listener = tokio::net::TcpListener::bind(listen_addr.clone()).await?; while let Ok((inbound, _)) = listener.accept().await { diff --git a/src/server.rs b/src/server.rs index fd755e3..6fac9e5 100644 --- a/src/server.rs +++ b/src/server.rs @@ -40,7 +40,7 @@ pub struct InnisfreeServer { } impl InnisfreeServer { - pub fn new( + pub async fn new( name: &str, services: Vec, wg_device: WireguardDevice, @@ -54,7 +54,7 @@ impl InnisfreeServer { &wg_device, &services, )?; - let droplet = Droplet::new(&name, &user_data)?; + let droplet = Droplet::new(&name, &user_data).await?; Ok(InnisfreeServer { services, ssh_client_keypair, @@ -68,12 +68,12 @@ impl InnisfreeServer { let droplet = &self.droplet; droplet.ipv4_address() } - pub fn assign_floating_ip(&self, floating_ip: &str) { + pub async fn assign_floating_ip(&self, floating_ip: &str) { let f = FloatingIp { ip: floating_ip.to_owned(), droplet_id: self.droplet.id, }; - f.assign(); + f.assign().await; } // Dead code because it's debug-only, might want again. #[allow(dead_code)] @@ -90,9 +90,9 @@ impl InnisfreeServer { fpath.push("cloudinit.cfg"); std::fs::write(&fpath.to_str().unwrap(), &user_data).expect("Failed to create cloud-init"); } - pub fn destroy(&self) { + pub async fn destroy(&self) -> Result<(), InnisfreeError> { // Destroys backing droplet - self.droplet.destroy(); + Ok(self.droplet.destroy().await?) } } @@ -106,7 +106,7 @@ struct Droplet { } impl Droplet { - fn new(name: &str, user_data: &str) -> Result { + async fn new(name: &str, user_data: &str) -> Result { debug!("Creating new DigitalOcean Droplet"); // Build JSON request body, for sending to DigitalOcean API let droplet_body = json!({ @@ -121,33 +121,41 @@ impl Droplet { // Right now we only create Droplet resources, but an API Firewall would be nice. let api_key = env::var("DIGITALOCEAN_API_TOKEN").expect("DIGITALOCEAN_API_TOKEN not set."); let request_url = DO_API_BASE_URL; - let client = reqwest::blocking::Client::new(); + let client = reqwest::Client::new(); let response = client .post(request_url) .json(&droplet_body) .bearer_auth(api_key) - .send()?; + .send() + .await?; - let j: serde_json::Value = response.json().unwrap(); + let j: serde_json::Value = response.json().await?; let d: String = j["droplet"].to_string(); let droplet: Droplet = serde_json::from_str(&d).unwrap(); debug!("Server created, waiting for networking"); - Ok(droplet.wait_for_boot()) + Ok(droplet.wait_for_boot().await?) } - fn wait_for_boot(&self) -> Droplet { + async fn wait_for_boot(&self) -> Result { // The JSON response for droplet creation won't include info like // public IPv4 address, because that hasn't been assigned yet. The 'status' // field will show as "new", so wait until it's "active", then network info // will be populated. Might be a good use of enums here. loop { thread::sleep(time::Duration::from_secs(10)); - let droplet: Droplet = get_droplet(&self); - if droplet.status == "active" { - return droplet; - } else { - info!("Server still booting, waiting..."); + match get_droplet(&self).await { + Ok(droplet) => { + if droplet.status == "active" { + return Ok(droplet); + } else { + info!("Server still booting, waiting..."); + continue; + } + } + Err(_e) => { + return Err(InnisfreeError::Unknown); + } } } } @@ -163,38 +171,40 @@ impl Droplet { } ip } - pub fn destroy(&self) { - destroy_droplet(&self); + pub async fn destroy(&self) -> Result<(), InnisfreeError> { + Ok(destroy_droplet(&self).await?) } } // Polls a droplet resource to get the latest data. Used during wait for boot, // to capture networking info like PublicIPv4, which is assigned after creation. -fn get_droplet(droplet: &Droplet) -> Droplet { +async fn get_droplet(droplet: &Droplet) -> Result { let api_key = env::var("DIGITALOCEAN_API_TOKEN").expect("DIGITALOCEAN_API_TOKEN not set."); let request_url = DO_API_BASE_URL.to_owned() + "/" + &droplet.id.to_string(); - let client = reqwest::blocking::Client::new(); - let response = client.get(request_url).bearer_auth(api_key).send().unwrap(); - - let j: serde_json::Value = response.json().unwrap(); - let d: String = j["droplet"].to_string(); - serde_json::from_str(&d).unwrap() + let client = reqwest::Client::new(); + let response = client.get(request_url).bearer_auth(api_key).send().await?; + let j: serde_json::Value = response.json().await?; + let d_s: String = j["droplet"].to_string(); + let d = serde_json::from_str(&d_s).unwrap(); + Ok(d) } // Calls the API to destroy a droplet. -fn destroy_droplet(droplet: &Droplet) { +async fn destroy_droplet(droplet: &Droplet) -> Result<(), InnisfreeError> { let api_key = env::var("DIGITALOCEAN_API_TOKEN").expect("DIGITALOCEAN_API_TOKEN not set."); let request_url = DO_API_BASE_URL.to_owned() + "/" + &droplet.id.to_string(); - let client = reqwest::blocking::Client::new(); - let response = client.delete(request_url).bearer_auth(api_key).send(); + let client = reqwest::Client::new(); + let response = client.delete(request_url).bearer_auth(api_key).send().await; match response { - Ok(_) => { - debug!("Destroying droplet"); + Ok(_r) => { + debug!("Droplet destroyed"); + Ok(()) } Err(e) => { error!("Failed to destroy droplet: {}", e); + Err(InnisfreeError::NetworkError { source: e }) } } }