diff --git a/shed/futures_ext/BUCK b/shed/futures_ext/BUCK index e5f5fd193..9011ae948 100644 --- a/shed/futures_ext/BUCK +++ b/shed/futures_ext/BUCK @@ -19,7 +19,9 @@ rust_library( deps = [ "fbsource//third-party/rust:anyhow", "fbsource//third-party/rust:futures", + "fbsource//third-party/rust:maybe-owned", "fbsource//third-party/rust:pin-project", + "fbsource//third-party/rust:slog", "fbsource//third-party/rust:thiserror", "fbsource//third-party/rust:tokio", "//common/rust/shed/shared_error:shared_error", diff --git a/shed/futures_ext/Cargo.toml b/shed/futures_ext/Cargo.toml index ed8f8f117..19a176b5e 100644 --- a/shed/futures_ext/Cargo.toml +++ b/shed/futures_ext/Cargo.toml @@ -13,8 +13,10 @@ license = "MIT OR Apache-2.0" [dependencies] anyhow = "1.0.86" futures = { version = "0.3.30", features = ["async-await", "compat"] } +maybe-owned = "0.3.4" pin-project = "0.4.30" shared_error = { version = "0.1.0", path = "../shared_error" } +slog = { version = "2.7", features = ["max_level_trace", "nested-values"] } thiserror = "1.0.64" tokio = { version = "1.41.0", features = ["full", "test-util", "tracing"] } diff --git a/shed/futures_ext/src/stream.rs b/shed/futures_ext/src/stream.rs index cb098c089..75212d0ea 100644 --- a/shed/futures_ext/src/stream.rs +++ b/shed/futures_ext/src/stream.rs @@ -67,11 +67,22 @@ pub trait FbStreamExt: Stream { } /// Construct a new [self::yield_periodically::YieldPeriodically], with a sensible default. - fn yield_periodically(self) -> YieldPeriodically + #[track_caller] + fn yield_periodically<'a>(self) -> YieldPeriodically<'a, Self> where Self: Sized, { - YieldPeriodically::new(self, Duration::from_millis(10)) + let location = std::panic::Location::caller(); + + let location = slog::RecordLocation { + file: location.file(), + line: location.line(), + column: location.column(), + function: "", + module: "", + }; + + YieldPeriodically::new(self, location, Duration::from_millis(10)) } } diff --git a/shed/futures_ext/src/stream/yield_periodically.rs b/shed/futures_ext/src/stream/yield_periodically.rs index 020c41215..ec206a9ca 100644 --- a/shed/futures_ext/src/stream/yield_periodically.rs +++ b/shed/futures_ext/src/stream/yield_periodically.rs @@ -7,6 +7,7 @@ * of this source tree. */ +use std::fmt::Arguments; use std::pin::Pin; use std::time::Duration; use std::time::Instant; @@ -14,13 +15,19 @@ use std::time::Instant; use futures::stream::Stream; use futures::task::Context; use futures::task::Poll; +use maybe_owned::MaybeOwned; use pin_project::pin_project; +use slog::Logger; +use slog::Record; + +/// If the budget is exceeded, we will log a warning if the total overshoot is more than this multiplier. +const BUDGET_OVERSHOOT_MULTIPLIER: f32 = 3.0; /// A stream that will yield control back to the caller if it runs for more than a given duration /// without yielding (i.e. returning Poll::Pending). The clock starts counting the first time the /// stream is polled, and is reset every time the stream yields. #[pin_project] -pub struct YieldPeriodically { +pub struct YieldPeriodically<'a, S> { #[pin] inner: S, /// Default budget. @@ -29,21 +36,52 @@ pub struct YieldPeriodically { current_budget: Duration, /// Whether the next iteration must yield because the budget was exceeded. must_yield: bool, + /// The code location where yield_periodically was called. + location: slog::RecordLocation, + /// Enable logging to the provided logger. + logger: Option>, + /// The threshold for logging. + log_threshold: Duration, } -impl YieldPeriodically { +impl YieldPeriodically<'_, S> { /// Create a new [YieldPeriodically]. - pub fn new(inner: S, budget: Duration) -> Self { + pub fn new(inner: S, location: slog::RecordLocation, budget: Duration) -> Self { + let multiplier = BUDGET_OVERSHOOT_MULTIPLIER + 1.0; + Self { inner, budget, current_budget: budget, must_yield: false, + location, + logger: None, + log_threshold: Duration::from_millis( + budget.mul_f32(multiplier).as_millis().try_into().unwrap(), + ), + } + } + + /// Set the budget for this stream. + pub fn with_budget(mut self, budget: Duration) -> Self { + self.budget = budget; + self.current_budget = budget; + self + } + + /// Enable debug logging. + pub fn with_logger<'a, L>(self, logger: L) -> YieldPeriodically<'a, S> + where + L: Into>, + { + YieldPeriodically { + logger: Some(logger.into()), + ..self } } } -impl Stream for YieldPeriodically { +impl Stream for YieldPeriodically<'_, S> { type Item = ::Item; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -63,11 +101,22 @@ impl Stream for YieldPeriodically { return res; } + let current_budget = *this.current_budget; let elapsed = now.elapsed(); match this.current_budget.checked_sub(elapsed) { Some(new_budget) => *this.current_budget = new_budget, None => { + if elapsed > *this.log_threshold { + maybe_log( + this.logger, + this.location, + &format_args!( + "yield_periodically(): budget overshot: current_budget={:?}, elapsed={:?}", + current_budget, elapsed, + ), + ); + } *this.must_yield = true; *this.current_budget = *this.budget; } @@ -77,6 +126,24 @@ impl Stream for YieldPeriodically { } } +fn maybe_log( + logger: &Option>, + location: &slog::RecordLocation, + fmt: &Arguments<'_>, +) { + if let Some(logger) = &logger { + logger.log(&Record::new( + &slog::RecordStatic { + location, + level: slog::Level::Warning, + tag: "futures_watchdog", + }, + fmt, + slog::b!(), + )); + } +} + #[cfg(test)] mod test { use futures::stream::StreamExt; @@ -90,7 +157,8 @@ mod test { std::thread::sleep(Duration::from_millis(1)); }); - let stream = YieldPeriodically::new(stream, Duration::from_millis(100)); + let stream = + YieldPeriodically::new(stream, location_for_test(), Duration::from_millis(100)); futures::pin_mut!(stream); @@ -131,7 +199,19 @@ mod test { }) .take(30); - let stream = YieldPeriodically::new(stream, Duration::from_millis(10)); + let stream = YieldPeriodically::new(stream, location_for_test(), Duration::from_millis(10)); stream.collect::>().await; } + + #[track_caller] + fn location_for_test() -> slog::RecordLocation { + let location = std::panic::Location::caller(); + slog::RecordLocation { + file: location.file(), + line: location.line(), + column: location.column(), + function: "", + module: "", + } + } }