Skip to content

Commit

Permalink
Update tokio to 1.9.0
Browse files Browse the repository at this point in the history
Moving from 0.3.7 -> 1.9.0 was a big change, and introduced some runtime
panics related to unanticipated drops. Seems like that was caused by the
mix-and-match sync/async mess, so tried to remove the most explicit
"blocking" calls and ported most functions to async. Feels like I went
overboard, since the "new" constructors definitely should be blocking,
but things are working now.

Was able to drop dependency on custom ctrl+c handling, since apparently
tokio has that built in, as long as everything's async.
  • Loading branch information
conorsch committed Aug 2, 2021
1 parent 7f923eb commit 19230b9
Show file tree
Hide file tree
Showing 8 changed files with 118 additions and 95 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Innisfree changelog

## 0.2.8

* Updates all dependencies to latest
* Uses async function calls where possible
* Debian package reloads systemd, and loosens version dependencies

## 0.2.7

* Publish to crates.io
Expand Down
40 changes: 14 additions & 26 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 3 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "innisfree"
version = "0.2.8-alpha.0"
version = "0.2.8-alpha.1"
authors = ["Conor Schaefer <[email protected]>"]
edition = "2018"
description = "Exposes local services on public IPv4 address, via cloud server."
Expand All @@ -13,20 +13,19 @@ keywords = ["self-hosting", "wireguard"]

[dependencies]
clap = "3.0.0-beta.2"
ctrlc = { version = "3.0", features = ["termination"] }
custom_error = "~1.9"
env_logger = "~0.9"
futures = "0.3"
home = "~0.5"
log = "~0.4"
rand = "~0.8"
reqwest = { version = "0.11", features = ["blocking", "json", "rustls"] }
reqwest = { version = "~0.11", features = ["json", "rustls"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.39"
serde_yaml = "0.8.17"
tempfile = "3"
tera = "1"
tokio = { version = "~1.8", features = [ "io-util", "net", "rt-multi-thread", "macros" ] }
tokio = { version = "~1.9", features = [ "io-util", "macros", "net", "rt-multi-thread", "signal"] }

[package.metadata.deb]
maintainer-scripts = "debian/"
Expand Down
7 changes: 4 additions & 3 deletions src/floating_ip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,21 @@ pub struct FloatingIp {
}

impl FloatingIp {
pub fn assign(&self) {
pub async fn assign(&self) {
let api_key = env::var("DIGITALOCEAN_API_TOKEN").expect("DIGITALOCEAN_API_TOKEN not set.");
let req_body = json!({
"type": "assign",
"droplet_id": self.droplet_id,
});
let request_url = DO_API_BASE_URL.to_owned() + "/" + &self.ip + "/actions";

let client = reqwest::blocking::Client::new();
let client = reqwest::Client::new();
let response = client
.post(request_url)
.json(&req_body)
.bearer_auth(api_key)
.send();
.send()
.await;

match response {
Ok(_) => {
Expand Down
35 changes: 15 additions & 20 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ use std::sync::Arc;
extern crate log;
use env_logger::Env;

extern crate ctrlc;

// Innisfree imports
mod cloudinit;
mod config;
Expand Down Expand Up @@ -116,7 +114,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
info!("Will provide proxies for {:?}", services);

info!("Creating server '{}'", &tunnel_name);
let mgr = match manager::InnisfreeManager::new(&tunnel_name, services) {
let mgr = match manager::InnisfreeManager::new(&tunnel_name, services).await {
Ok(m) => m,
Err(e) => {
error!("{}", e);
Expand All @@ -133,24 +131,15 @@ async fn main() -> Result<(), Box<dyn Error>> {
error!("Failed bringing up tunnel: {}", e);
// Error probably unrecoverable
warn!("Attempting to exit gracefully...");
let _ = mgr.clean();
mgr.clean().await;
std::process::exit(2);
}
}

let mgr_ctrlc = mgr.clone();
ctrlc::set_handler(move || {
warn!("Received stop signal, exiting gracefully");
mgr_ctrlc.clean();
debug!("Clean up complete, exiting!");
std::process::exit(0);
})
.expect("Error setting Ctrl-C handler");

// Really need a better default case for floating-ip
if floating_ip != "None" {
debug!("Configuring floating IP...");
mgr.assign_floating_ip(floating_ip);
mgr.assign_floating_ip(floating_ip).await;
info!("Server ready! IPv4 address: {}", floating_ip);
} else {
let ip = &mgr.server.ipv4_address();
Expand All @@ -159,17 +148,16 @@ async fn main() -> Result<(), Box<dyn Error>> {
debug!("Try logging in with 'innisfree ssh'");
let local_ip = String::from(WIREGUARD_LOCAL_IP);
if dest_ip != "127.0.0.1" {
manager::run_proxy(local_ip, dest_ip, mgr.services.clone()).await;
tokio::spawn(manager::run_proxy(local_ip, dest_ip, mgr.services.clone()));
mgr.block().await;
} else {
info!(
"Ready to listen on {}. Start local services. Make sure to bind to 0.0.0.0, rather than 127.0.0.1!",
port_spec
);
debug!("Blocking forever. Press ctrl+c to tear down the tunnel and destroy server.");
// Block forever, ctrl+c will interrupt
loop {
std::thread::sleep(std::time::Duration::from_secs(10));
}
mgr.block().await;
}
} else if let Some(ref _matches) = matches.subcommand_matches("ssh") {
let result = manager::open_shell();
Expand Down Expand Up @@ -197,8 +185,15 @@ async fn main() -> Result<(), Box<dyn Error>> {
let port_spec = matches.value_of("ports").unwrap();
let ports = config::ServicePort::from_str_multi(port_spec);
let local_ip = String::from(WIREGUARD_LOCAL_IP);
warn!("Ctrl+c will not halt proxy, use ctrl+z and `kill -9 %1`");
info!("Starting proxy for services {:?}", ports);
manager::run_proxy(local_ip, dest_ip, ports).await;
match manager::run_proxy(local_ip, dest_ip, ports).await {
Ok(_) => {}
Err(e) => {
error!("Proxy failed: {}", e);
std::process::exit(4);
}
};
} else if let Some(ref _matches) = matches.subcommand_matches("doctor") {
info!("Running doctor, to determine platform support...");
match doctor::platform_is_supported() {
Expand All @@ -214,7 +209,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
"Platform is not supported. =( \
Only recent Linux distros such as Debian, Ubuntu, or Fedora are supported."
);
std::process::exit(4);
std::process::exit(5);
}
}
}
Expand Down
40 changes: 32 additions & 8 deletions src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::proxy::proxy_handler;
use crate::server::InnisfreeServer;
use crate::wg::WireguardManager;
use futures::future::join_all;
use tokio::signal;

#[derive(Debug)]
pub struct InnisfreeManager {
Expand All @@ -15,13 +16,14 @@ pub struct InnisfreeManager {
}

impl InnisfreeManager {
pub fn new(
pub async fn new(
tunnel_name: &str,
services: Vec<ServicePort>,
) -> Result<InnisfreeManager, InnisfreeError> {
clean_config_dir();
let wg = WireguardManager::new()?;
let server = InnisfreeServer::new(&tunnel_name, services, wg.clone().wg_remote_device)?;
let server =
InnisfreeServer::new(&tunnel_name, services, wg.clone().wg_remote_device).await?;
Ok(InnisfreeManager {
name: tunnel_name.to_string(),
services: server.services.to_vec(),
Expand Down Expand Up @@ -71,6 +73,22 @@ impl InnisfreeManager {
}
}
}
/// Wait for an interrupt signal, then terminate gracefully,
/// cleaning up droplet resources before exit.
pub async fn block(&self) {
match signal::ctrl_c().await {
Ok(()) => {
warn!("Received stop signal, exiting gracefully");
self.clean().await;
debug!("Clean up complete, exiting!");
std::process::exit(0);
}
Err(e) => {
error!("Unable to register hook for ctrl+c: {}", e);
std::process::exit(10);
}
}
}
fn test_connection(&self) -> Result<(), InnisfreeError> {
trace!("Inside test connection, setting up vars");
let ip = &self.wg.wg_remote_ip;
Expand Down Expand Up @@ -219,13 +237,14 @@ impl InnisfreeManager {
}
}
}
pub fn clean(&self) {
pub async fn clean(&self) {
debug!("Removing local Wireguard interface");
// Ignore errors, since we want to try all handlers
let _ = self.bring_down_local_wg();
self.server.destroy();
let _ = self.server.destroy().await;
}
pub fn assign_floating_ip(&self, floating_ip: &str) {
self.server.assign_floating_ip(floating_ip);
pub async fn assign_floating_ip(&self, floating_ip: &str) {
self.server.assign_floating_ip(floating_ip).await;
}
}

Expand Down Expand Up @@ -273,7 +292,11 @@ pub fn open_shell() -> Result<(), std::io::Error> {
}
}

pub async fn run_proxy(local_ip: String, dest_ip: String, services: Vec<ServicePort>) {
pub async fn run_proxy(
local_ip: String,
dest_ip: String,
services: Vec<ServicePort>,
) -> Result<(), InnisfreeError> {
// We'll kick off a dedicated proxy for each service,
// and collect the handles to await them all together, concurrently.
let mut tasks = vec![];
Expand All @@ -297,8 +320,9 @@ pub async fn run_proxy(local_ip: String, dest_ip: String, services: Vec<ServiceP
}
Err(e) => {
error!("Service proxy failed: {}", e);
return Err(InnisfreeError::Unknown);
}
}
}
warn!("All proxies down, exiting");
Ok(())
}
6 changes: 3 additions & 3 deletions src/proxy.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use crate::config::InnisfreeError;
use futures::FutureExt;
use std::error::Error;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;

// Taken from Tokio proxy example (MIT license):
// https://github.com/tokio-rs/tokio/blob/a08ce0d3e06d650361283dc87c8fe14b146df15d/examples/proxy.rs
pub async fn transfer(mut inbound: TcpStream, proxy_addr: String) -> Result<(), Box<dyn Error>> {
pub async fn transfer(mut inbound: TcpStream, proxy_addr: String) -> Result<(), InnisfreeError> {
let mut outbound = TcpStream::connect(proxy_addr).await?;

let (mut ri, mut wi) = inbound.split();
Expand All @@ -26,7 +26,7 @@ pub async fn transfer(mut inbound: TcpStream, proxy_addr: String) -> Result<(),
Ok(())
}

pub async fn proxy_handler(listen_addr: String, dest_addr: String) -> Result<(), Box<dyn Error>> {
pub async fn proxy_handler(listen_addr: String, dest_addr: String) -> Result<(), InnisfreeError> {
debug!("Proxying traffic: {} -> {}", listen_addr, dest_addr);
let listener = tokio::net::TcpListener::bind(listen_addr.clone()).await?;
while let Ok((inbound, _)) = listener.accept().await {
Expand Down
Loading

0 comments on commit 19230b9

Please sign in to comment.