From 888e81f684bd76f648d349001c36b752eea9412a Mon Sep 17 00:00:00 2001 From: Yuri Astrakhan Date: Wed, 2 Oct 2024 23:41:51 -0400 Subject: [PATCH 1/3] Format code, use local dependencies --- Cargo.lock | 3 --- Cargo.toml | 11 ++++------ src/lib.rs | 60 ++++++++++++++++++++++++++++++++++++++---------------- 3 files changed, 46 insertions(+), 28 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 893551a..29aebe5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1373,7 +1373,6 @@ dependencies = [ [[package]] name = "varnish" version = "0.0.19" -source = "git+https://github.com/gquintard/varnish-rs.git?branch=proc-macro-cleanup-gq#2ccb28e4e77df8300a1b3b20532e56735a8b9804" dependencies = [ "glob", "varnish-macros", @@ -1383,7 +1382,6 @@ dependencies = [ [[package]] name = "varnish-macros" version = "0.0.19" -source = "git+https://github.com/gquintard/varnish-rs.git?branch=proc-macro-cleanup-gq#2ccb28e4e77df8300a1b3b20532e56735a8b9804" dependencies = [ "darling", "prettyplease", @@ -1398,7 +1396,6 @@ dependencies = [ [[package]] name = "varnish-sys" version = "0.0.19" -source = "git+https://github.com/gquintard/varnish-rs.git?branch=proc-macro-cleanup-gq#2ccb28e4e77df8300a1b3b20532e56735a8b9804" dependencies = [ "bindgen", "pkg-config", diff --git a/Cargo.toml b/Cargo.toml index 19cd9b5..02ab361 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,15 +3,12 @@ name = "vmod_reqwest" version = "0.0.12" edition = "2021" license = "BSD-3-Clause" -authors = [ "Guillaume Quintard guillaume.quintard@gmail.com" ] - -[build-dependencies] -varnish = { git = "https://github.com/gquintard/varnish-rs.git", branch = "proc-macro-cleanup-gq"} +authors = ["Guillaume Quintard guillaume.quintard@gmail.com"] [dependencies] -varnish = { git = "https://github.com/gquintard/varnish-rs.git", branch = "proc-macro-cleanup-gq"} -varnish-sys = { git = "https://github.com/gquintard/varnish-rs.git", branch = "proc-macro-cleanup-gq"} -varnish-macros = { git = "https://github.com/gquintard/varnish-rs.git", branch = "proc-macro-cleanup-gq" } +varnish = { path = "../varnish-rs/varnish" } +varnish-sys = { path = "../varnish-rs/varnish-sys" } +varnish-macros = { path = "../varnish-rs/varnish-macros" } regex = "1.5" lru = "0.7.1" bytes = "1.1.0" diff --git a/src/lib.rs b/src/lib.rs index 243c431..effc7ce 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,9 +8,9 @@ mod reqwest { use reqwest::header::HeaderValue; use tokio::sync::mpsc::Sender; + use varnish::vcl::Backend; use varnish::vcl::Probe; use varnish::vcl::{Ctx, Event}; - use varnish::vcl::Backend; // FIXME: needed for header() use varnish::ffi::{VCL_BACKEND, VCL_STRING}; @@ -57,28 +57,36 @@ mod reqwest { rcb = rcb.connect_timeout(t); } if let Some(proxy) = http_proxy { - rcb = rcb.proxy(reqwest::Proxy::https(proxy).map_err(|e| VclError::new(format!("reqwest: couldn't initialize {vcl_name}'s HTTP proxy ({e})")))?); + rcb = rcb.proxy(reqwest::Proxy::https(proxy).map_err(|e| { + VclError::new(format!( + "reqwest: couldn't initialize {vcl_name}'s HTTP proxy ({e})" + )) + })?); } if let Some(proxy) = https_proxy { - rcb = rcb.proxy(reqwest::Proxy::https(proxy).map_err(|e| VclError::new(format!("reqwest: couldn't initialize {vcl_name}'s HTTPS proxy ({e})")))?); + rcb = rcb.proxy(reqwest::Proxy::https(proxy).map_err(|e| { + VclError::new(format!( + "reqwest: couldn't initialize {vcl_name}'s HTTPS proxy ({e})" + )) + })?); } if follow <= 0 { rcb = rcb.redirect(reqwest::redirect::Policy::none()); } else { rcb = rcb.redirect(reqwest::redirect::Policy::limited(follow as usize)); } - let reqwest_client = rcb - .build() - .map_err(|e| VclError::new(format!("reqwest: couldn't initialize {vcl_name} ({e})")))?; + let reqwest_client = rcb.build().map_err(|e| { + VclError::new(format!("reqwest: couldn't initialize {vcl_name} ({e})")) + })?; if https.is_some() && base_url.is_some() { return Err(VclError::new(format!("reqwest: couldn't initialize {vcl_name}: can't take both an https and a base_url argument"))); } let probe_state = match probe { - Some(spec) => Some( - build_probe_state(spec, base_url).map_err(|e| VclError::new(format!("reqwest: failed to add probe to {vcl_name} ({e})")))?, - ), + Some(spec) => Some(build_probe_state(spec, base_url).map_err(|e| { + VclError::new(format!("reqwest: failed to add probe to {vcl_name} ({e})")) + })?), None => None, }; let has_probe = probe_state.is_some(); @@ -239,15 +247,22 @@ mod reqwest { match (n, sep) { (0, _) => Ok(VCL_STRING::default()), - (_, None) => all_headers.next().map(HeaderValue::as_ref).into_vcl(&mut ctx.ws), + (_, None) => all_headers + .next() + .map(HeaderValue::as_ref) + .into_vcl(&mut ctx.ws), (_, Some(s)) => { let mut ws = ctx.ws.reserve(); for (i, h) in all_headers.enumerate() { if i != 0 { - ws.buf.write(s.as_ref()).map_err(|e| VclError::new(e.to_string()))?; + ws.buf + .write(s.as_ref()) + .map_err(|e| VclError::new(e.to_string()))?; } - ws.buf.write(h.as_ref()).map_err(|e| VclError::new(e.to_string()))?; + ws.buf + .write(h.as_ref()) + .map_err(|e| VclError::new(e.to_string()))?; } let buf = ws.release(0); buf.into_vcl(&mut ctx.ws) @@ -270,7 +285,7 @@ mod reqwest { Ok(resp) => match resp.body { None => Ok(VCL_STRING::default()), Some(ref b) => b.into_vcl(&mut ctx.ws), - } + }, } } @@ -319,8 +334,8 @@ mod reqwest { } mod reqwest_private { + use anyhow::Error; use bytes::Bytes; -use anyhow::Error; use std::boxed::Box; use std::io::Write; use std::os::raw::{c_char, c_uint, c_void}; @@ -902,7 +917,10 @@ use anyhow::Error; })); } - pub fn build_probe_state(mut probe: Probe, base_url: Option<&str>) -> Result { + pub fn build_probe_state( + mut probe: Probe, + base_url: Option<&str>, + ) -> Result { // sanitize probe (see vbp_set_defaults in Varnish Cache) if probe.timeout.is_zero() { probe.timeout = Duration::from_secs(2); @@ -929,11 +947,17 @@ use anyhow::Error; }; let url = if let Some(base_url) = base_url { let full_url = format!("{}{}", base_url, spec_url); - Url::parse(&full_url).map_err(|e| VclError::new(format!("problem with probe endpoint {full_url} ({e})")))? + Url::parse(&full_url).map_err(|e| { + VclError::new(format!("problem with probe endpoint {full_url} ({e})")) + })? } else if spec_url.starts_with('/') { - return Err(VclError::new("client has no .base_url, and the probe doesn't have a fully-qualified URL as .url".to_string())); + return Err(VclError::new( + "client has no .base_url, and the probe doesn't have a fully-qualified URL as .url" + .to_string(), + )); } else { - Url::parse(spec_url).map_err(|e| VclError::new(format!("probe endpoint {spec_url} ({e})")))? + Url::parse(spec_url) + .map_err(|e| VclError::new(format!("probe endpoint {spec_url} ({e})")))? }; Ok(ProbeState { spec: probe, From 5b173e1da7015a0116d37727bd41006beaad86f1 Mon Sep 17 00:00:00 2001 From: Yuri Astrakhan Date: Thu, 3 Oct 2024 11:43:02 -0400 Subject: [PATCH 2/3] make it all compile, cleanup --- README.md | 4 +- build.sh | 26 -- src/implementation.rs | 685 +++++++++++++++++++++++++++++++++++++++ src/lib.rs | 729 +----------------------------------------- 4 files changed, 703 insertions(+), 741 deletions(-) delete mode 100755 build.sh create mode 100644 src/implementation.rs diff --git a/README.md b/README.md index 4b62487..376f6e7 100644 --- a/README.md +++ b/README.md @@ -22,11 +22,11 @@ import reqwest from "path/to/libreqwest.so"; ```vcl // Create a new instance of the object in your VCL init function sub vcl_init { - new new = client.new([STRING base_url], [BOOL https], [INT follow], [DURATION timeout], [DURATION connect_timeout], [BOOL auto_gzip], [BOOL auto_deflate], [BOOL auto_brotli], [BOOL accept_invalid_certs], [BOOL accept_invalid_hostnames], [STRING http_proxy], [STRING https_proxy]); + new new = client.new([STRING base_url], [BOOL https], INT follow = 10, [DURATION timeout], [DURATION connect_timeout], BOOL auto_gzip = 1, BOOL auto_deflate = 1, BOOL auto_brotli = 1, BOOL accept_invalid_certs = 0, BOOL accept_invalid_hostnames = 0, [STRING http_proxy], [STRING https_proxy], [PROBE probe]); } ``` -#### Method `VOID init(STRING name, STRING url, [STRING method])` +#### Method `VOID init(STRING name, STRING url, STRING method = "GET")` #### Method `VOID send(STRING name)` diff --git a/build.sh b/build.sh deleted file mode 100755 index b6671ef..0000000 --- a/build.sh +++ /dev/null @@ -1,26 +0,0 @@ -#!/bin/sh - -set -ex - -OUT="$1" - -if [ -z "$OUT" ]; then - OUT=. -fi - -if [ -z "$PKG" ]; then - PKG="$(cargo metadata --no-deps --format-version 1 | jq -r '.packages[0].name')" -fi - -if [ -z "" ]; then - VMODTOOL="$(pkg-config --variable=vmodtool varnishapi)" -fi - -cargo build --release -cargo test --release - -mkdir -p "$OUT" -cp target/release/lib$PKG.so "$OUT" -rst2man $PKG.man.rst > "$OUT/$PKG.3" -"$VMODTOOL" vmod.vcc -w "$OUT" --output /tmp/tmp_file_to_delete -rm /tmp/tmp_file_to_delete.* diff --git a/src/implementation.rs b/src/implementation.rs new file mode 100644 index 0000000..f8a4e13 --- /dev/null +++ b/src/implementation.rs @@ -0,0 +1,685 @@ +pub mod reqwest_private { + use anyhow::Error; + use bytes::Bytes; + use std::boxed::Box; + use std::io::Write; + use std::os::raw::{c_char, c_uint, c_void}; + use std::sync::atomic::{AtomicU64, Ordering}; + use std::sync::Mutex; + use std::time::{Duration, Instant, SystemTime}; + + use ::reqwest::Client; + use ::reqwest::Url; + //use reqwest::header::HeaderValue; + use tokio::sync::mpsc::{Receiver, Sender, UnboundedSender}; + use varnish::ffi::{BS_CACHED, BS_ERROR, BS_NONE}; + use varnish::vcl::Vsb; + use varnish::vcl::{log, Ctx, Event, LogTag}; + use varnish::vcl::{Backend, Serve, Transfer /*, VCLBackendPtr*/}; + use varnish::vcl::{Probe, Request as ProbeRequest}; + use varnish::vcl::{VclError, VclResult}; + + pub struct ProbeState { + spec: Probe, + history: AtomicU64, + health_changed: std::time::SystemTime, + url: Url, + join_handle: Option>, + avg: Mutex, + } + #[allow(non_camel_case_types)] + pub struct client { + pub name: String, + pub be: Backend, + } + + pub struct VCLBackend { + pub name: String, + pub bgt: *const BgThread, + pub client: Client, + pub probe_state: Option, + pub https: bool, + pub base_url: Option, + } + + impl<'a> Serve for VCLBackend { + fn get_type(&self) -> &str { + "reqwest" + } + + fn get_headers(&self, ctx: &mut Ctx<'_>) -> VclResult> { + if !self.healthy(ctx).0 { + return Err("unhealthy".into()); + } + + let bereq = ctx.http_bereq.as_ref().unwrap(); + + let bereq_url = bereq.url().unwrap(); + + let url = if let Some(base_url) = &self.base_url { + // if the client has a base_url, prepend it to bereq.url + format!("{}{}", base_url, bereq_url) + } else if bereq_url.starts_with('/') { + // otherwise, if bereq.url looks like a path, try to find a host to build a full URL + if let Some(host) = bereq.header("host") { + format!( + "{}://{}{}", + if self.https { "https" } else { "http" }, + host, + bereq_url + ) + } else { + return Err("no host found (reqwest.client doesn't have a base_url, bereq.url doesn't specify a host and bereq.http.host is unset)".into()); + } + } else { + // else use bereq.url as-is + bereq_url.to_string() + }; + + let (req_body_tx, body) = hyper::body::Body::channel(); + let req = Request { + method: bereq.method().unwrap().to_string(), + url, + client: self.client.clone(), + body: ReqBody::Stream(body), + vcl: false, + headers: bereq + .into_iter() + .map(|(k, v)| (k.into(), v.into())) + .collect(), + }; + + let mut resp_rx = unsafe { (*self.bgt).spawn_req(req) }; + + unsafe { + struct BodyChan<'a> { + chan: hyper::body::Sender, + rt: &'a tokio::runtime::Runtime, + } + + unsafe extern "C" fn body_send_iterate( + priv_: *mut c_void, + _flush: c_uint, + ptr: *const c_void, + l: isize, + ) -> i32 { + // nothing to do + if ptr.is_null() || l == 0 { + return 0; + } + let body_chan = (priv_ as *mut BodyChan).as_mut().unwrap(); + let buf = std::slice::from_raw_parts(ptr as *const u8, l as usize); + let bytes = hyper::body::Bytes::copy_from_slice(buf); + + body_chan + .rt + .block_on(async { body_chan.chan.send_data(bytes).await }) + .is_err() + .into() + } + + // manually dropped a few lines below + let bcp = Box::into_raw(Box::new(BodyChan { + chan: req_body_tx, + rt: &(*self.bgt).rt, + })); + let p = bcp as *mut c_void; + // mimicking V1F_SendReq in varnish-cache + let bo = ctx.raw.bo.as_mut().unwrap(); + + if !bo.bereq_body.is_null() { + varnish::ffi::ObjIterate(bo.wrk, bo.bereq_body, p, Some(body_send_iterate), 0); + } else if !bo.req.is_null() && (*bo.req).req_body_status != BS_NONE.as_ptr() { + let i = varnish::ffi::VRB_Iterate( + bo.wrk, + bo.vsl.as_mut_ptr(), + bo.req, + Some(body_send_iterate), + p, + ); + + if (*bo.req).req_body_status != BS_CACHED.as_ptr() { + bo.no_retry = "req.body not cached\0".as_ptr() as *const c_char; + } + + if (*bo.req).req_body_status == BS_ERROR.as_ptr() { + assert!(i < 0); + (*bo.req).doclose = &varnish::ffi::SC_RX_BODY[0]; + } + + if i < 0 { + return Err("req.body read error".into()); + } + } + // manually drop so reqwest knows there's no more body to push + drop(Box::from_raw(bcp)); + } + let resp = match resp_rx.blocking_recv().unwrap() { + RespMsg::Hdrs(resp) => resp, + RespMsg::Err(e) => return Err(e.to_string().into()), + _ => unreachable!(), + }; + let beresp = ctx.http_beresp.as_mut().unwrap(); + beresp.set_status(resp.status as u16); + beresp.set_proto("HTTP/1.1")?; + for (k, v) in &resp.headers { + beresp.set_header( + k.as_str(), + v.to_str().map_err(|e| { + >::into(e.to_string()) + })?, + )?; + } + Ok(Some(BackendResp { + bytes: None, + cursor: 0, + chan: Some(resp_rx), + content_length: resp.content_length.map(|s| s as usize), + })) + } + + fn event(&self, event: Event) { + // nothing to do + let probe_state = match self.probe_state { + None => return, + Some(ref probe_state) => probe_state, + }; + + // enter the runtime to + let _guard = unsafe { (*self.bgt).rt.enter() }; + match event { + // start the probing loop + Event::Warm => { + spawn_probe( + unsafe { &*self.bgt }, + probe_state as *const ProbeState as *mut ProbeState, + self.name.clone(), + ); + } + Event::Cold => { + // XXX: we should set the handle to None, be we don't have mutability, oh well... + probe_state.join_handle.as_ref().unwrap().abort(); + } + _ => {} + } + } + + fn healthy(&self, _ctx: &mut Ctx<'_>) -> (bool, SystemTime) { + let probe_state = match self.probe_state { + None => return (true, SystemTime::UNIX_EPOCH), + Some(ref ps) => ps, + }; + + assert!(probe_state.spec.window <= 64); + + let bitmap = probe_state.history.load(Ordering::Relaxed); + ( + is_healthy(bitmap, probe_state.spec.window, probe_state.spec.threshold), + probe_state.health_changed, + ) + } + + fn list(&self, ctx: &mut Ctx<'_>, vsb: &mut Vsb<'_>, detailed: bool, json: bool) { + if self.probe_state.is_none() { + return self.list_without_probe(ctx, vsb, detailed, json); + } + let ProbeState { + history, + avg, + spec: Probe { + window, threshold, .. + }, + .. + } = self.probe_state.as_ref().unwrap(); + let bitmap = history.load(Ordering::Relaxed); + let window = *window; + let threshold = *threshold; + let health_str = if is_healthy(bitmap, window, threshold) { + "healthy" + } else { + "sick" + }; + let msg = match (json, detailed) { + // json, no details + (true, false) => { + format!( + "[{}, {}, \"{}\"]", + good_probes(bitmap, window), + window, + health_str, + ) + } + // json, details + (true, true) => { + // TODO: talk to upstream, we shouldn't have to add the colon + serde_json::to_string(&self.probe_state.as_ref().unwrap().spec) + .as_ref() + .unwrap() + .to_owned() + + ",\n" + } + // no json, no details + (false, false) => { + format!("{}/{}\t{}", good_probes(bitmap, window), window, health_str) + } + // no json, details + (false, true) => { + let mut s = format!( + " + Current states good: {:2} threshold: {:2} window: {:2} + Average response time of good probes: {:.06} + Oldest ================================================== Newest + ", + good_probes(bitmap, window), + threshold, + window, + avg.lock().unwrap() + ); + for i in 0..64 { + s += if bitmap.wrapping_shr(63 - i) & 1 == 1 { + "H" + } else { + "-" + }; + } + s + } + }; + vsb.cat(&msg).unwrap(); + } + } + + macro_rules! send { + ($tx:ident, $payload:expr) => { + if $tx.send($payload).await.is_err() { + return; + } + }; + } + + pub struct BackendResp { + pub chan: Option>, + pub bytes: Option, + pub cursor: usize, + pub content_length: Option, + } + + impl Transfer for BackendResp { + fn read(&mut self, mut buf: &mut [u8]) -> VclResult { + let mut n = 0; + loop { + if self.bytes.is_none() && self.chan.is_some() { + match self.chan.as_mut().unwrap().blocking_recv() { + Some(RespMsg::Hdrs(_)) => panic!("invalid message type: RespMsg::Hdrs"), + Some(RespMsg::Chunk(bytes)) => { + self.bytes = Some(bytes); + self.cursor = 0 + } + Some(RespMsg::Err(e)) => return Err(e.to_string().into()), + None => return Ok(n), + }; + } + + let pull_buf = self.bytes.as_ref().unwrap(); + let to_write = &pull_buf[self.cursor..]; + let used = buf.write(to_write).unwrap(); + self.cursor += used; + n += used; + assert!(self.cursor <= pull_buf.len()); + if self.cursor == pull_buf.len() { + self.bytes = None; + } + } + } + + fn len(&self) -> Option { + self.content_length + } + } + #[derive(Debug)] + pub enum RespMsg { + Hdrs(Response), + Chunk(Bytes), + Err(Error), + } + + #[derive(Debug)] + pub struct Entry { + pub client_name: String, + pub req_name: String, + pub transaction: VclTransaction, + } + + // try to keep the object on stack as small as possible, we'll flesh it out into a reqwest::Request + // once in the Background thread + #[derive(Debug)] + pub struct Request { + pub url: String, + pub method: String, + pub headers: Vec<(String, String)>, + pub body: ReqBody, + pub client: Client, + pub vcl: bool, + } + + use ::reqwest::header::HeaderMap; + + // calling reqwest::Response::body() consumes the object, so we keep a copy of the interesting bits + // in this struct + #[derive(Debug)] + pub struct Response { + pub headers: HeaderMap, + pub content_length: Option, + pub body: Option, + pub status: i64, + } + + #[derive(Debug)] + pub enum ReqBody { + None, + Full(Vec), + Stream(hyper::Body), + } + + #[derive(Debug)] + pub enum VclTransaction { + Transition, + Req(Request), + Sent(Receiver), + Resp(Result), + } + + impl VclTransaction { + fn unwrap_resp(&self) -> Result<&Response, VclError> { + match self { + VclTransaction::Resp(Ok(rsp)) => Ok(rsp), + VclTransaction::Resp(Err(e)) => Err(VclError::new(e.to_string())), + _ => panic!("wrong VclTransaction type"), + } + } + fn into_req(self) -> Request { + match self { + VclTransaction::Req(rq) => rq, + _ => panic!("wrong VclTransaction type"), + } + } + } + + pub struct BgThread { + pub rt: tokio::runtime::Runtime, + pub sender: UnboundedSender<(Request, Sender)>, + } + + impl BgThread { + fn spawn_req(&self, req: Request) -> Receiver { + let (tx, rx) = tokio::sync::mpsc::channel(1); + self.sender.send((req, tx)).unwrap(); + rx + } + } + + pub async fn process_req(req: Request, tx: Sender) { + let method = match reqwest::Method::from_bytes(req.method.as_bytes()) { + Ok(m) => m, + Err(e) => { + send!(tx, RespMsg::Err(e.into())); + return; + } + }; + let mut rreq = req.client.request(method, req.url); + for (k, v) in req.headers { + rreq = rreq.header(k, v); + } + match req.body { + ReqBody::None => (), + ReqBody::Stream(b) => rreq = rreq.body(b), + ReqBody::Full(v) => rreq = rreq.body(v), + } + let mut resp = match rreq.send().await { + Err(e) => { + send!(tx, RespMsg::Err(e.into())); + return; + } + Ok(resp) => resp, + }; + let mut beresp = Response { + status: resp.status().as_u16() as i64, + headers: resp.headers().clone(), + content_length: resp.content_length(), + body: None, + }; + + if req.vcl { + beresp.body = match resp.bytes().await { + Err(e) => { + send!(tx, RespMsg::Err(e.into())); + return; + } + Ok(b) => Some(b), + }; + send!(tx, RespMsg::Hdrs(beresp)); + } else { + send!(tx, RespMsg::Hdrs(beresp)); + + loop { + match resp.chunk().await { + Ok(None) => return, + Ok(Some(bytes)) => { + if tx.send(RespMsg::Chunk(bytes)).await.is_err() { + return; + } + } + Err(e) => { + send!(tx, RespMsg::Err(e.into())); + return; + } + }; + } + } + } + + fn good_probes(bitmap: u64, window: u32) -> u32 { + bitmap.wrapping_shl(64_u32 - window).count_ones() + } + + fn is_healthy(bitmap: u64, window: u32, threshold: u32) -> bool { + good_probes(bitmap, window) >= threshold + } + + fn update_health( + mut bitmap: u64, + threshold: u32, + window: u32, + probe_ok: bool, + ) -> (u64, bool, bool) { + let old_health = is_healthy(bitmap, window, threshold); + let new_bit = if probe_ok { 1 } else { 0 }; + bitmap = bitmap.wrapping_shl(1) | new_bit; + let new_health = is_healthy(bitmap, window, threshold); + (bitmap, new_health, new_health == old_health) + } + + // cheating hard with the pointer here, but the be_event function will stop us + // before the references are invalid + fn spawn_probe(bgt: &'static BgThread, probe_statep: *mut ProbeState, name: String) { + let probe_state = unsafe { probe_statep.as_mut().unwrap() }; + let spec = probe_state.spec.clone(); + let url = probe_state.url.clone(); + let history = &probe_state.history; + let avg = &probe_state.avg; + probe_state.join_handle = Some(bgt.rt.spawn(async move { + let mut h = 0_u64; + for i in 0..std::cmp::min(spec.initial, 64) { + h |= 1 << i; + } + history.store(h, Ordering::Relaxed); + let mut avg_rate = 0_f64; + loop { + let msg; + let mut time = 0_f64; + let new_bit = match reqwest::ClientBuilder::new() + .timeout(spec.timeout) + .build() + .map(|req| req.get(url.clone()).send()) + { + Err(e) => { + msg = e.to_string(); + false + } + Ok(resp) => { + let start = Instant::now(); + match resp.await { + Err(e) => { + msg = format!("Error: {}", e); + false + } + Ok(resp) if resp.status().as_u16() as u32 == spec.exp_status => { + msg = format!("Success: {}", resp.status().as_u16()); + if avg_rate < 4.0 { + avg_rate += 1.0; + } + time = start.elapsed().as_secs_f64(); + let mut _avg = avg.lock().unwrap(); + *_avg += (time - *_avg) / avg_rate; + true + } + Ok(resp) => { + msg = format!( + "Error: expected {} status, got {}", + spec.exp_status, + resp.status().as_u16() + ); + false + } + } + } + }; + let bitmap = history.load(Ordering::Relaxed); + let (bitmap, healthy, changed) = + update_health(bitmap, spec.threshold, spec.window, new_bit); + log( + LogTag::BackendHealth, + &format!( + "{} {} {} {} {} {} {} {} {} {}", + name, + if changed { "Went" } else { "Still" }, + if healthy { "healthy" } else { "sick" }, + "UNIMPLEMENTED", + good_probes(bitmap, spec.window), + spec.threshold, + spec.window, + time, + *avg.lock().unwrap(), + msg + ), + ); + history.store(bitmap, Ordering::Relaxed); + tokio::time::sleep(spec.interval).await; + } + })); + } + + pub fn build_probe_state( + mut probe: Probe, + base_url: Option<&str>, + ) -> Result { + // sanitize probe (see vbp_set_defaults in Varnish Cache) + if probe.timeout.is_zero() { + probe.timeout = Duration::from_secs(2); + } + if probe.interval.is_zero() { + probe.interval = Duration::from_secs(5); + } + if probe.window == 0 { + probe.window = 8; + } + if probe.threshold == 0 { + probe.threshold = 3; + } + if probe.exp_status == 0 { + probe.exp_status = 200; + } + if probe.initial == 0 { + probe.initial = probe.threshold - 1; + } + probe.initial = std::cmp::min(probe.initial, probe.threshold); + let spec_url = match probe.request { + ProbeRequest::URL(ref u) => u, + _ => return Err(VclError::new("can't use a probe without .url".to_string())), + }; + let url = if let Some(base_url) = base_url { + let full_url = format!("{}{}", base_url, spec_url); + Url::parse(&full_url).map_err(|e| { + VclError::new(format!("problem with probe endpoint {full_url} ({e})")) + })? + } else if spec_url.starts_with('/') { + return Err(VclError::new( + "client has no .base_url, and the probe doesn't have a fully-qualified URL as .url" + .to_string(), + )); + } else { + Url::parse(spec_url) + .map_err(|e| VclError::new(format!("probe endpoint {spec_url} ({e})")))? + }; + Ok(ProbeState { + spec: probe, + history: AtomicU64::new(0), + health_changed: std::time::SystemTime::now(), + join_handle: None, + url, + avg: Mutex::new(0_f64), + }) + } + + impl client { + pub fn vcl_send(&self, bgt: &BgThread, t: &mut VclTransaction) { + let old_t = std::mem::replace(t, VclTransaction::Transition); + *t = VclTransaction::Sent(bgt.spawn_req(old_t.into_req())); + } + + pub fn wait_on(&self, bgt: &BgThread, t: &mut VclTransaction) { + match t { + VclTransaction::Req(_) => { + self.vcl_send(bgt, t); + self.wait_on(bgt, t) + } + VclTransaction::Sent(rx) => { + *t = match rx.blocking_recv().unwrap() { + RespMsg::Hdrs(resp) => VclTransaction::Resp(Ok(resp)), + RespMsg::Chunk(_) => unreachable!(), + RespMsg::Err(e) => VclTransaction::Resp(Err(VclError::new(e.to_string()))), + }; + } + VclTransaction::Resp(_) => (), + VclTransaction::Transition => panic!("impossible"), + } + } + + pub fn get_transaction<'a>( + &self, + vp_task: &'a mut Option>>, + name: &str, + ) -> VclResult<&'a mut VclTransaction> { + vp_task + .as_mut() + .ok_or_else(|| <&str as Into>::into(name))? + .iter_mut() + .find(|e| name == e.req_name && self.name == e.client_name) + .map(|e| &mut e.transaction) + .ok_or_else(|| name.into()) + } + + // we have a stacked Result here because the first one will fail at the + // vcl level, while the core one is salvageable + pub fn get_resp<'a>( + &self, + vp_vcl: Option<&BgThread>, + vp_task: &'a mut Option>>, + name: &str, + ) -> VclResult> { + let t = self.get_transaction(vp_task, name)?; + self.wait_on(vp_vcl.as_ref().unwrap(), t); + Ok(t.unwrap_resp()) + } + } +} diff --git a/src/lib.rs b/src/lib.rs index effc7ce..bfca3c2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,11 @@ +mod implementation; + +use varnish::run_vtc_tests; +run_vtc_tests!("tests/*.vtc"); + #[varnish::vmod(docs = "README.md")] mod reqwest { - use crate::reqwest_private::*; + use crate::implementation::reqwest_private::*; use std::boxed::Box; use std::error::Error; use std::io::Write; @@ -24,26 +29,19 @@ mod reqwest { #[shared_per_vcl] vp_vcl: &mut Option>, base_url: Option<&str>, https: Option, - follow: Option, + #[arg(default = 10)] follow: i64, timeout: Option, connect_timeout: Option, - auto_gzip: Option, - auto_deflate: Option, - auto_brotli: Option, - accept_invalid_certs: Option, - accept_invalid_hostnames: Option, + #[arg(default = true)] auto_gzip: bool, + #[arg(default = true)] auto_deflate: bool, + #[arg(default = true)] auto_brotli: bool, + #[arg(default = false)] accept_invalid_certs: bool, + #[arg(default = false)] accept_invalid_hostnames: bool, http_proxy: Option<&str>, https_proxy: Option<&str>, - probe: Option>, + probe: Option, ) -> Result { // set some default - let follow = follow.unwrap_or(10); - let auto_gzip = auto_gzip.unwrap_or(true); - let auto_deflate = auto_deflate.unwrap_or(true); - let auto_brotli = auto_brotli.unwrap_or(true); - let accept_invalid_certs = accept_invalid_certs.unwrap_or(false); - let accept_invalid_hostnames = accept_invalid_hostnames.unwrap_or(false); - let mut rcb = reqwest::ClientBuilder::new() .brotli(auto_brotli) .deflate(auto_deflate) @@ -113,11 +111,10 @@ mod reqwest { pub fn init( &self, - _ctx: &Ctx, #[shared_per_task] vp_task: &mut Option>>, name: &str, url: &str, - method: &str, + #[arg(default = "GET")] method: &str, ) { if vp_task.as_ref().is_none() { *vp_task = Some(Box::new(Vec::new())); @@ -148,7 +145,6 @@ mod reqwest { pub fn send( &self, - _ctx: &Ctx, #[shared_per_vcl] vp_vcl: Option<&BgThread>, #[shared_per_task] vp_task: &mut Option>>, name: &str, @@ -165,7 +161,6 @@ mod reqwest { pub fn set_header( &self, - _ctx: &Ctx, #[shared_per_task] vp_task: &mut Option>>, name: &str, key: &str, @@ -181,7 +176,6 @@ mod reqwest { pub fn set_body( &self, - _ctx: &Ctx, #[shared_per_task] vp_task: &mut Option>>, name: &str, body: &str, @@ -214,9 +208,9 @@ mod reqwest { Ok(()) } + pub fn status( &self, - _ctx: &Ctx, #[shared_per_vcl] vp_vcl: Option<&BgThread>, #[shared_per_task] vp_task: &mut Option>>, name: &str, @@ -291,7 +285,6 @@ mod reqwest { pub fn error( &self, - _ctx: &Ctx, #[shared_per_vcl] vp_vcl: Option<&BgThread>, #[shared_per_task] vp_task: &mut Option>>, name: &str, @@ -302,14 +295,13 @@ mod reqwest { } } - pub fn backend(&self, _ctx: &Ctx) -> VCL_BACKEND { + pub fn backend(&self) -> VCL_BACKEND { self.be.vcl_ptr() } } #[event] pub fn event( - _ctx: &Ctx, #[shared_per_vcl] vp_vcl: &mut Option>, event: Event, ) -> Result<(), Box> { @@ -332,692 +324,3 @@ mod reqwest { Ok(()) } } - -mod reqwest_private { - use anyhow::Error; - use bytes::Bytes; - use std::boxed::Box; - use std::io::Write; - use std::os::raw::{c_char, c_uint, c_void}; - use std::sync::atomic::{AtomicU64, Ordering}; - use std::sync::Mutex; - use std::time::{Duration, Instant, SystemTime}; - - use ::reqwest::Client; - use ::reqwest::Url; - //use reqwest::header::HeaderValue; - use tokio::sync::mpsc::{Receiver, Sender, UnboundedSender}; - use varnish::ffi::{BS_CACHED, BS_ERROR, BS_NONE}; - use varnish::vcl::Vsb; - use varnish::vcl::{log, Ctx, Event, LogTag}; - use varnish::vcl::{Backend, Serve, Transfer /*, VCLBackendPtr*/}; - use varnish::vcl::{Probe, Request as ProbeRequest}; - use varnish::vcl::{VclError, VclResult}; - - use varnish::run_vtc_tests; - run_vtc_tests!("tests/*.vtc"); - - pub struct ProbeState { - spec: Probe, - history: AtomicU64, - health_changed: std::time::SystemTime, - url: Url, - join_handle: Option>, - avg: Mutex, - } - #[allow(non_camel_case_types)] - pub struct client { - pub name: String, - pub be: Backend, - } - - pub struct VCLBackend { - pub name: String, - pub bgt: *const BgThread, - pub client: Client, - pub probe_state: Option, - pub https: bool, - pub base_url: Option, - } - - impl<'a> Serve for VCLBackend { - fn get_type(&self) -> &str { - "reqwest" - } - - fn get_headers(&self, ctx: &mut Ctx<'_>) -> VclResult> { - if !self.healthy(ctx).0 { - return Err("unhealthy".into()); - } - - let bereq = ctx.http_bereq.as_ref().unwrap(); - - let bereq_url = bereq.url().unwrap(); - - let url = if let Some(base_url) = &self.base_url { - // if the client has a base_url, prepend it to bereq.url - format!("{}{}", base_url, bereq_url) - } else if bereq_url.starts_with('/') { - // otherwise, if bereq.url looks like a path, try to find a host to build a full URL - if let Some(host) = bereq.header("host") { - format!( - "{}://{}{}", - if self.https { "https" } else { "http" }, - host, - bereq_url - ) - } else { - return Err("no host found (reqwest.client doesn't have a base_url, bereq.url doesn't specify a host and bereq.http.host is unset)".into()); - } - } else { - // else use bereq.url as-is - bereq_url.to_string() - }; - - let (req_body_tx, body) = hyper::body::Body::channel(); - let req = Request { - method: bereq.method().unwrap().to_string(), - url, - client: self.client.clone(), - body: ReqBody::Stream(body), - vcl: false, - headers: bereq - .into_iter() - .map(|(k, v)| (k.into(), v.into())) - .collect(), - }; - - let mut resp_rx = unsafe { (*self.bgt).spawn_req(req) }; - - unsafe { - struct BodyChan<'a> { - chan: hyper::body::Sender, - rt: &'a tokio::runtime::Runtime, - } - - unsafe extern "C" fn body_send_iterate( - priv_: *mut c_void, - _flush: c_uint, - ptr: *const c_void, - l: isize, - ) -> i32 { - // nothing to do - if ptr.is_null() || l == 0 { - return 0; - } - let body_chan = (priv_ as *mut BodyChan).as_mut().unwrap(); - let buf = std::slice::from_raw_parts(ptr as *const u8, l as usize); - let bytes = hyper::body::Bytes::copy_from_slice(buf); - - body_chan - .rt - .block_on(async { body_chan.chan.send_data(bytes).await }) - .is_err() - .into() - } - - // manually dropped a few lines below - let bcp = Box::into_raw(Box::new(BodyChan { - chan: req_body_tx, - rt: &(*self.bgt).rt, - })); - let p = bcp as *mut c_void; - // mimicking V1F_SendReq in varnish-cache - let bo = ctx.raw.bo.as_mut().unwrap(); - - if !bo.bereq_body.is_null() { - varnish::ffi::ObjIterate(bo.wrk, bo.bereq_body, p, Some(body_send_iterate), 0); - } else if !bo.req.is_null() && (*bo.req).req_body_status != BS_NONE.as_ptr() { - let i = varnish::ffi::VRB_Iterate( - bo.wrk, - bo.vsl.as_mut_ptr(), - bo.req, - Some(body_send_iterate), - p, - ); - - if (*bo.req).req_body_status != BS_CACHED.as_ptr() { - bo.no_retry = "req.body not cached\0".as_ptr() as *const c_char; - } - - if (*bo.req).req_body_status == BS_ERROR.as_ptr() { - assert!(i < 0); - (*bo.req).doclose = &varnish::ffi::SC_RX_BODY[0]; - } - - if i < 0 { - return Err("req.body read error".into()); - } - } - // manually drop so reqwest knows there's no more body to push - drop(Box::from_raw(bcp)); - } - let resp = match resp_rx.blocking_recv().unwrap() { - RespMsg::Hdrs(resp) => resp, - RespMsg::Err(e) => return Err(e.to_string().into()), - _ => unreachable!(), - }; - let beresp = ctx.http_beresp.as_mut().unwrap(); - beresp.set_status(resp.status as u16); - beresp.set_proto("HTTP/1.1")?; - for (k, v) in &resp.headers { - beresp.set_header( - k.as_str(), - v.to_str().map_err(|e| { - >::into(e.to_string()) - })?, - )?; - } - Ok(Some(BackendResp { - bytes: None, - cursor: 0, - chan: Some(resp_rx), - content_length: resp.content_length.map(|s| s as usize), - })) - } - - fn event(&self, event: Event) { - // nothing to do - let probe_state = match self.probe_state { - None => return, - Some(ref probe_state) => probe_state, - }; - - // enter the runtime to - let _guard = unsafe { (*self.bgt).rt.enter() }; - match event { - // start the probing loop - Event::Warm => { - spawn_probe( - unsafe { &*self.bgt }, - probe_state as *const ProbeState as *mut ProbeState, - self.name.clone(), - ); - } - Event::Cold => { - // XXX: we should set the handle to None, be we don't have mutability, oh well... - probe_state.join_handle.as_ref().unwrap().abort(); - } - _ => {} - } - } - - fn healthy(&self, _ctx: &mut Ctx<'_>) -> (bool, SystemTime) { - let probe_state = match self.probe_state { - None => return (true, SystemTime::UNIX_EPOCH), - Some(ref ps) => ps, - }; - - assert!(probe_state.spec.window <= 64); - - let bitmap = probe_state.history.load(Ordering::Relaxed); - ( - is_healthy(bitmap, probe_state.spec.window, probe_state.spec.threshold), - probe_state.health_changed, - ) - } - - fn list(&self, ctx: &mut Ctx<'_>, vsb: &mut Vsb<'_>, detailed: bool, json: bool) { - if self.probe_state.is_none() { - return self.list_without_probe(ctx, vsb, detailed, json); - } - let ProbeState { - history, - avg, - spec: Probe { - window, threshold, .. - }, - .. - } = self.probe_state.as_ref().unwrap(); - let bitmap = history.load(Ordering::Relaxed); - let window = *window; - let threshold = *threshold; - let health_str = if is_healthy(bitmap, window, threshold) { - "healthy" - } else { - "sick" - }; - let msg = match (json, detailed) { - // json, no details - (true, false) => { - format!( - "[{}, {}, \"{}\"]", - good_probes(bitmap, window), - window, - health_str, - ) - } - // json, details - (true, true) => { - // TODO: talk to upstream, we shouldn't have to add the colon - serde_json::to_string(&self.probe_state.as_ref().unwrap().spec) - .as_ref() - .unwrap() - .to_owned() - + ",\n" - } - // no json, no details - (false, false) => { - format!("{}/{}\t{}", good_probes(bitmap, window), window, health_str) - } - // no json, details - (false, true) => { - let mut s = format!( - " - Current states good: {:2} threshold: {:2} window: {:2} - Average response time of good probes: {:.06} - Oldest ================================================== Newest - ", - good_probes(bitmap, window), - threshold, - window, - avg.lock().unwrap() - ); - for i in 0..64 { - s += if bitmap.wrapping_shr(63 - i) & 1 == 1 { - "H" - } else { - "-" - }; - } - s - } - }; - vsb.cat(&msg).unwrap(); - } - } - - macro_rules! send { - ($tx:ident, $payload:expr) => { - if $tx.send($payload).await.is_err() { - return; - } - }; - } - - pub struct BackendResp { - pub chan: Option>, - pub bytes: Option, - pub cursor: usize, - pub content_length: Option, - } - - impl Transfer for BackendResp { - fn read(&mut self, mut buf: &mut [u8]) -> VclResult { - let mut n = 0; - loop { - if self.bytes.is_none() && self.chan.is_some() { - match self.chan.as_mut().unwrap().blocking_recv() { - Some(RespMsg::Hdrs(_)) => panic!("invalid message type: RespMsg::Hdrs"), - Some(RespMsg::Chunk(bytes)) => { - self.bytes = Some(bytes); - self.cursor = 0 - } - Some(RespMsg::Err(e)) => return Err(e.to_string().into()), - None => return Ok(n), - }; - } - - let pull_buf = self.bytes.as_ref().unwrap(); - let to_write = &pull_buf[self.cursor..]; - let used = buf.write(to_write).unwrap(); - self.cursor += used; - n += used; - assert!(self.cursor <= pull_buf.len()); - if self.cursor == pull_buf.len() { - self.bytes = None; - } - } - } - - fn len(&self) -> Option { - self.content_length - } - } - #[derive(Debug)] - pub enum RespMsg { - Hdrs(Response), - Chunk(Bytes), - Err(Error), - } - - #[derive(Debug)] - pub struct Entry { - pub client_name: String, - pub req_name: String, - pub transaction: VclTransaction, - } - - // try to keep the object on stack as small as possible, we'll flesh it out into a reqwest::Request - // once in the Background thread - #[derive(Debug)] - pub struct Request { - pub url: String, - pub method: String, - pub headers: Vec<(String, String)>, - pub body: ReqBody, - pub client: Client, - pub vcl: bool, - } - - use ::reqwest::header::HeaderMap; - - // calling reqwest::Response::body() consumes the object, so we keep a copy of the interesting bits - // in this struct - #[derive(Debug)] - pub struct Response { - pub headers: HeaderMap, - pub content_length: Option, - pub body: Option, - pub status: i64, - } - - #[derive(Debug)] - pub enum ReqBody { - None, - Full(Vec), - Stream(hyper::Body), - } - - #[derive(Debug)] - pub enum VclTransaction { - Transition, - Req(Request), - Sent(Receiver), - Resp(Result), - } - - impl VclTransaction { - fn unwrap_resp(&self) -> Result<&Response, VclError> { - match self { - VclTransaction::Resp(Ok(rsp)) => Ok(rsp), - VclTransaction::Resp(Err(e)) => Err(VclError::new(e.to_string())), - _ => panic!("wrong VclTransaction type"), - } - } - fn into_req(self) -> Request { - match self { - VclTransaction::Req(rq) => rq, - _ => panic!("wrong VclTransaction type"), - } - } - } - - pub struct BgThread { - pub rt: tokio::runtime::Runtime, - pub sender: UnboundedSender<(Request, Sender)>, - } - - impl BgThread { - fn spawn_req(&self, req: Request) -> Receiver { - let (tx, rx) = tokio::sync::mpsc::channel(1); - self.sender.send((req, tx)).unwrap(); - rx - } - } - - pub async fn process_req(req: Request, tx: Sender) { - let method = match reqwest::Method::from_bytes(req.method.as_bytes()) { - Ok(m) => m, - Err(e) => { - send!(tx, RespMsg::Err(e.into())); - return; - } - }; - let mut rreq = req.client.request(method, req.url); - for (k, v) in req.headers { - rreq = rreq.header(k, v); - } - match req.body { - ReqBody::None => (), - ReqBody::Stream(b) => rreq = rreq.body(b), - ReqBody::Full(v) => rreq = rreq.body(v), - } - let mut resp = match rreq.send().await { - Err(e) => { - send!(tx, RespMsg::Err(e.into())); - return; - } - Ok(resp) => resp, - }; - let mut beresp = Response { - status: resp.status().as_u16() as i64, - headers: resp.headers().clone(), - content_length: resp.content_length(), - body: None, - }; - - if req.vcl { - beresp.body = match resp.bytes().await { - Err(e) => { - send!(tx, RespMsg::Err(e.into())); - return; - } - Ok(b) => Some(b), - }; - send!(tx, RespMsg::Hdrs(beresp)); - } else { - send!(tx, RespMsg::Hdrs(beresp)); - - loop { - match resp.chunk().await { - Ok(None) => return, - Ok(Some(bytes)) => { - if tx.send(RespMsg::Chunk(bytes)).await.is_err() { - return; - } - } - Err(e) => { - send!(tx, RespMsg::Err(e.into())); - return; - } - }; - } - } - } - - fn good_probes(bitmap: u64, window: u32) -> u32 { - bitmap.wrapping_shl(64_u32 - window).count_ones() - } - - fn is_healthy(bitmap: u64, window: u32, threshold: u32) -> bool { - good_probes(bitmap, window) >= threshold - } - - fn update_health( - mut bitmap: u64, - threshold: u32, - window: u32, - probe_ok: bool, - ) -> (u64, bool, bool) { - let old_health = is_healthy(bitmap, window, threshold); - let new_bit = if probe_ok { 1 } else { 0 }; - bitmap = bitmap.wrapping_shl(1) | new_bit; - let new_health = is_healthy(bitmap, window, threshold); - (bitmap, new_health, new_health == old_health) - } - - // cheating hard with the pointer here, but the be_event function will stop us - // before the references are invalid - fn spawn_probe(bgt: &'static BgThread, probe_statep: *mut ProbeState, name: String) { - let probe_state = unsafe { probe_statep.as_mut().unwrap() }; - let spec = probe_state.spec.clone(); - let url = probe_state.url.clone(); - let history = &probe_state.history; - let avg = &probe_state.avg; - probe_state.join_handle = Some(bgt.rt.spawn(async move { - let mut h = 0_u64; - for i in 0..std::cmp::min(spec.initial, 64) { - h |= 1 << i; - } - history.store(h, Ordering::Relaxed); - let mut avg_rate = 0_f64; - loop { - let msg; - let mut time = 0_f64; - let new_bit = match reqwest::ClientBuilder::new() - .timeout(spec.timeout) - .build() - .map(|req| req.get(url.clone()).send()) - { - Err(e) => { - msg = e.to_string(); - false - } - Ok(resp) => { - let start = Instant::now(); - match resp.await { - Err(e) => { - msg = format!("Error: {}", e); - false - } - Ok(resp) if resp.status().as_u16() as u32 == spec.exp_status => { - msg = format!("Success: {}", resp.status().as_u16()); - if avg_rate < 4.0 { - avg_rate += 1.0; - } - time = start.elapsed().as_secs_f64(); - let mut _avg = avg.lock().unwrap(); - *_avg += (time - *_avg) / avg_rate; - true - } - Ok(resp) => { - msg = format!( - "Error: expected {} status, got {}", - spec.exp_status, - resp.status().as_u16() - ); - false - } - } - } - }; - let bitmap = history.load(Ordering::Relaxed); - let (bitmap, healthy, changed) = - update_health(bitmap, spec.threshold, spec.window, new_bit); - log( - LogTag::BackendHealth, - &format!( - "{} {} {} {} {} {} {} {} {} {}", - name, - if changed { "Went" } else { "Still" }, - if healthy { "healthy" } else { "sick" }, - "UNIMPLEMENTED", - good_probes(bitmap, spec.window), - spec.threshold, - spec.window, - time, - *avg.lock().unwrap(), - msg - ), - ); - history.store(bitmap, Ordering::Relaxed); - tokio::time::sleep(spec.interval).await; - } - })); - } - - pub fn build_probe_state( - mut probe: Probe, - base_url: Option<&str>, - ) -> Result { - // sanitize probe (see vbp_set_defaults in Varnish Cache) - if probe.timeout.is_zero() { - probe.timeout = Duration::from_secs(2); - } - if probe.interval.is_zero() { - probe.interval = Duration::from_secs(5); - } - if probe.window == 0 { - probe.window = 8; - } - if probe.threshold == 0 { - probe.threshold = 3; - } - if probe.exp_status == 0 { - probe.exp_status = 200; - } - if probe.initial == 0 { - probe.initial = probe.threshold - 1; - } - probe.initial = std::cmp::min(probe.initial, probe.threshold); - let spec_url = match probe.request { - ProbeRequest::URL(ref u) => u, - _ => return Err(VclError::new("can't use a probe without .url".to_string())), - }; - let url = if let Some(base_url) = base_url { - let full_url = format!("{}{}", base_url, spec_url); - Url::parse(&full_url).map_err(|e| { - VclError::new(format!("problem with probe endpoint {full_url} ({e})")) - })? - } else if spec_url.starts_with('/') { - return Err(VclError::new( - "client has no .base_url, and the probe doesn't have a fully-qualified URL as .url" - .to_string(), - )); - } else { - Url::parse(spec_url) - .map_err(|e| VclError::new(format!("probe endpoint {spec_url} ({e})")))? - }; - Ok(ProbeState { - spec: probe, - history: AtomicU64::new(0), - health_changed: std::time::SystemTime::now(), - join_handle: None, - url, - avg: Mutex::new(0_f64), - }) - } - - impl client { - pub fn vcl_send(&self, bgt: &BgThread, t: &mut VclTransaction) { - let old_t = std::mem::replace(t, VclTransaction::Transition); - *t = VclTransaction::Sent(bgt.spawn_req(old_t.into_req())); - } - - pub fn wait_on(&self, bgt: &BgThread, t: &mut VclTransaction) { - match t { - VclTransaction::Req(_) => { - self.vcl_send(bgt, t); - self.wait_on(bgt, t) - } - VclTransaction::Sent(rx) => { - *t = match rx.blocking_recv().unwrap() { - RespMsg::Hdrs(resp) => VclTransaction::Resp(Ok(resp)), - RespMsg::Chunk(_) => unreachable!(), - RespMsg::Err(e) => VclTransaction::Resp(Err(VclError::new(e.to_string()))), - }; - } - VclTransaction::Resp(_) => (), - VclTransaction::Transition => panic!("impossible"), - } - } - - pub fn get_transaction<'a>( - &self, - vp_task: &'a mut Option>>, - name: &str, - ) -> VclResult<&'a mut VclTransaction> { - vp_task - .as_mut() - .ok_or_else(|| <&str as Into>::into(name))? - .iter_mut() - .find(|e| name == e.req_name && self.name == e.client_name) - .map(|e| &mut e.transaction) - .ok_or_else(|| name.into()) - } - - // we have a stacked Result here because the first one will fail at the - // vcl level, while the core one is salvageable - pub fn get_resp<'a>( - &self, - vp_vcl: Option<&BgThread>, - vp_task: &'a mut Option>>, - name: &str, - ) -> VclResult> { - let t = self.get_transaction(vp_task, name)?; - self.wait_on(vp_vcl.as_ref().unwrap(), t); - Ok(t.unwrap_resp()) - } - } -} From b7a9b48c4267bbc0e1fd88d673629f73c3e8a166 Mon Sep 17 00:00:00 2001 From: Yuri Astrakhan Date: Thu, 3 Oct 2024 17:56:51 -0400 Subject: [PATCH 3/3] use new defaults --- .github/workflows/tests.yaml | 52 +++++++++++++++++++-------- Cargo.lock | 3 ++ Cargo.toml | 16 ++++----- justfile | 69 ++++++++++++++++++++++++++++++++++++ src/implementation.rs | 16 ++++----- src/lib.rs | 25 +++++++------ 6 files changed, 136 insertions(+), 45 deletions(-) create mode 100644 justfile diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index c664365..869ed0b 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -1,19 +1,41 @@ name: Build -on: [push] + +# FIXME: remove testing_gq from below + +on: + push: + branches: [ main, testing_gq ] + pull_request: + branches: [ main, testing_gq ] + release: + types: [ published ] + workflow_dispatch: + jobs: - main: + test: runs-on: ubuntu-latest + strategy: + fail-fast: false # 7.6 is not yet supported properly. Once fixed, this can be set to true + matrix: + include: + - setup: varnish74 + - setup: varnish75 + - setup: varnish76 + env: + RUST_BACKTRACE: 1 + RUSTDOCFLAGS: -D warnings + RUSTFLAGS: -D warnings steps: - - uses: actions/checkout@v2 - - uses: actions-rs/toolchain@v1 - with: - toolchain: stable - - run: | - sudo apt-get install -y curl - curl -s https://packagecloud.io/install/repositories/varnishcache/varnish75/script.deb.sh | sudo bash - sudo apt-get install varnish-dev - - run: | - export RUST_BACKTRACE=1 - cargo doc - cargo build - cargo test + - uses: taiki-e/install-action@v2 + with: { tool: just } + - uses: actions/checkout@v4 + - name: Ensure this crate has not yet been published (on release) + if: github.event_name == 'release' + run: just check-if-published + - uses: Swatinem/rust-cache@v2 + if: github.event_name != 'release' && github.event_name != 'workflow_dispatch' + - name: install varnish-dev + run: | + curl -s https://packagecloud.io/install/repositories/varnishcache/${{ matrix.setup }}/script.deb.sh | sudo bash + sudo apt-get install -y varnish-dev + - run: just -v ci-test diff --git a/Cargo.lock b/Cargo.lock index 29aebe5..a89a9be 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1373,6 +1373,7 @@ dependencies = [ [[package]] name = "varnish" version = "0.0.19" +source = "git+https://github.com/gquintard/varnish-rs.git?branch=feature-macro#a99d66cdb32a4da309b283ba3bccc46bfd9ddf04" dependencies = [ "glob", "varnish-macros", @@ -1382,6 +1383,7 @@ dependencies = [ [[package]] name = "varnish-macros" version = "0.0.19" +source = "git+https://github.com/gquintard/varnish-rs.git?branch=feature-macro#a99d66cdb32a4da309b283ba3bccc46bfd9ddf04" dependencies = [ "darling", "prettyplease", @@ -1396,6 +1398,7 @@ dependencies = [ [[package]] name = "varnish-sys" version = "0.0.19" +source = "git+https://github.com/gquintard/varnish-rs.git?branch=feature-macro#a99d66cdb32a4da309b283ba3bccc46bfd9ddf04" dependencies = [ "bindgen", "pkg-config", diff --git a/Cargo.toml b/Cargo.toml index 02ab361..aec9e21 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,19 +6,19 @@ license = "BSD-3-Clause" authors = ["Guillaume Quintard guillaume.quintard@gmail.com"] [dependencies] -varnish = { path = "../varnish-rs/varnish" } -varnish-sys = { path = "../varnish-rs/varnish-sys" } -varnish-macros = { path = "../varnish-rs/varnish-macros" } -regex = "1.5" -lru = "0.7.1" +anyhow = "1.0" bytes = "1.1.0" -reqwest = { version = "0.11", features = ["stream", "deflate", "gzip", "brotli", "native-tls"] } -tokio = { version = "1", features = ["full"] } futures = "0.3" futures-util = "0.3" hyper = "0.14.16" -anyhow = "1.0" +lru = "0.7.1" +regex = "1.5" +reqwest = { version = "0.11", features = ["stream", "deflate", "gzip", "brotli", "native-tls"] } serde_json = "1" +tokio = { version = "1", features = ["full"] } +varnish = { git = "https://github.com/gquintard/varnish-rs.git", branch = "feature-macro"} +varnish-macros = { git = "https://github.com/gquintard/varnish-rs.git", branch = "feature-macro" } +varnish-sys = { git = "https://github.com/gquintard/varnish-rs.git", branch = "feature-macro"} [lib] crate-type = ["cdylib"] diff --git a/justfile b/justfile new file mode 100644 index 0000000..69f989c --- /dev/null +++ b/justfile @@ -0,0 +1,69 @@ +#!/usr/bin/env just --justfile + +@_default: + just --list --unsorted + +# Clean all build artifacts +clean: + cargo clean + rm -f Cargo.lock + +# Update dependencies, including breaking changes +update: + cargo +nightly -Z unstable-options update --breaking + cargo update + +# Run cargo clippy +clippy: + cargo clippy --workspace --all-targets -- -D warnings + +# Test code formatting +test-fmt: + cargo fmt --all -- --check + +# Run cargo fmt +fmt: + cargo +nightly fmt -- --config imports_granularity=Module,group_imports=StdExternalCrate + +# Build and open code documentation +docs: + cargo doc --no-deps --open + +# Quick compile +check: + cargo check --workspace --all-targets + +# Default build +build: + cargo build --workspace --all-targets + +# Run all tests +test: build + cargo test --workspace --all-targets + +# Test documentation +test-doc: + cargo doc --no-deps + +rust-info: + rustc --version + cargo --version + +# Run all tests as expected by CI +ci-test: rust-info test-fmt clippy test test-doc + +# Verify that the current version of the crate is not the same as the one published on crates.io +check-if-published: + #!/usr/bin/env bash + LOCAL_VERSION="$(grep '^version =' Cargo.toml | sed -E 's/version = "([^"]*)".*/\1/')" + echo "Detected crate version: $LOCAL_VERSION" + CRATE_NAME="$(grep '^name =' Cargo.toml | head -1 | sed -E 's/name = "(.*)"/\1/')" + echo "Detected crate name: $CRATE_NAME" + PUBLISHED_VERSION="$(cargo search ${CRATE_NAME} | grep "^${CRATE_NAME} =" | sed -E 's/.* = "(.*)".*/\1/')" + echo "Published crate version: $PUBLISHED_VERSION" + if [ "$LOCAL_VERSION" = "$PUBLISHED_VERSION" ]; then + echo "ERROR: The current crate version has already been published." + exit 1 + else + echo "The current crate version has not yet been published." + fi diff --git a/src/implementation.rs b/src/implementation.rs index f8a4e13..4004143 100644 --- a/src/implementation.rs +++ b/src/implementation.rs @@ -1,6 +1,4 @@ pub mod reqwest_private { - use anyhow::Error; - use bytes::Bytes; use std::boxed::Box; use std::io::Write; use std::os::raw::{c_char, c_uint, c_void}; @@ -8,16 +6,16 @@ pub mod reqwest_private { use std::sync::Mutex; use std::time::{Duration, Instant, SystemTime}; - use ::reqwest::Client; - use ::reqwest::Url; + use ::reqwest::{Client, Url}; + use anyhow::Error; + use bytes::Bytes; //use reqwest::header::HeaderValue; use tokio::sync::mpsc::{Receiver, Sender, UnboundedSender}; use varnish::ffi::{BS_CACHED, BS_ERROR, BS_NONE}; - use varnish::vcl::Vsb; - use varnish::vcl::{log, Ctx, Event, LogTag}; + use varnish::vcl::{ + log, Ctx, Event, LogTag, Probe, Request as ProbeRequest, VclError, VclResult, Vsb, + }; use varnish::vcl::{Backend, Serve, Transfer /*, VCLBackendPtr*/}; - use varnish::vcl::{Probe, Request as ProbeRequest}; - use varnish::vcl::{VclError, VclResult}; pub struct ProbeState { spec: Probe, @@ -559,7 +557,7 @@ pub mod reqwest_private { update_health(bitmap, spec.threshold, spec.window, new_bit); log( LogTag::BackendHealth, - &format!( + format!( "{} {} {} {} {} {} {} {} {} {}", name, if changed { "Went" } else { "Still" }, diff --git a/src/lib.rs b/src/lib.rs index bfca3c2..c660cc2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,5 @@ +#![expect(clippy::box_collection)] // Box> is required for now by proc macro + mod implementation; use varnish::run_vtc_tests; @@ -5,7 +7,6 @@ run_vtc_tests!("tests/*.vtc"); #[varnish::vmod(docs = "README.md")] mod reqwest { - use crate::implementation::reqwest_private::*; use std::boxed::Box; use std::error::Error; use std::io::Write; @@ -13,13 +14,11 @@ mod reqwest { use reqwest::header::HeaderValue; use tokio::sync::mpsc::Sender; - use varnish::vcl::Backend; - use varnish::vcl::Probe; - use varnish::vcl::{Ctx, Event}; - // FIXME: needed for header() use varnish::ffi::{VCL_BACKEND, VCL_STRING}; - use varnish::vcl::{IntoVCL, VclError}; + use varnish::vcl::{Backend, Ctx, Event, IntoVCL, Probe, VclError}; + + use crate::implementation::reqwest_private::*; impl client { #[allow(clippy::too_many_arguments)] @@ -29,14 +28,14 @@ mod reqwest { #[shared_per_vcl] vp_vcl: &mut Option>, base_url: Option<&str>, https: Option, - #[arg(default = 10)] follow: i64, + #[default(10)] follow: i64, timeout: Option, connect_timeout: Option, - #[arg(default = true)] auto_gzip: bool, - #[arg(default = true)] auto_deflate: bool, - #[arg(default = true)] auto_brotli: bool, - #[arg(default = false)] accept_invalid_certs: bool, - #[arg(default = false)] accept_invalid_hostnames: bool, + #[default(true)] auto_gzip: bool, + #[default(true)] auto_deflate: bool, + #[default(true)] auto_brotli: bool, + #[default(false)] accept_invalid_certs: bool, + #[default(false)] accept_invalid_hostnames: bool, http_proxy: Option<&str>, https_proxy: Option<&str>, probe: Option, @@ -114,7 +113,7 @@ mod reqwest { #[shared_per_task] vp_task: &mut Option>>, name: &str, url: &str, - #[arg(default = "GET")] method: &str, + #[default("GET")] method: &str, ) { if vp_task.as_ref().is_none() { *vp_task = Some(Box::new(Vec::new()));