diff --git a/libs/telemetry/src/capturing/ng/collector.rs b/libs/telemetry/src/capturing/ng/collector.rs
index 910ac2dc9ba..aa5c580fcee 100644
--- a/libs/telemetry/src/capturing/ng/collector.rs
+++ b/libs/telemetry/src/capturing/ng/collector.rs
@@ -3,6 +3,7 @@ use std::{
     collections::HashMap,
     num::NonZeroU64,
     sync::atomic::{AtomicU64, Ordering},
+    time::{Duration, SystemTime},
 };
 
 use derive_more::Display;
@@ -116,37 +117,39 @@ impl Default for RequestId {
 #[derive(Debug, Clone)]
 #[cfg_attr(test, derive(Serialize))]
 pub struct CollectedSpan {
-    id: SpanId,
-    parent_id: Option<SpanId>,
-    name: Cow<'static, str>,
+    pub(crate) id: SpanId,
+    pub(crate) parent_id: Option<SpanId>,
+    pub(crate) name: Cow<'static, str>,
     #[cfg_attr(test, serde(skip_serializing))]
-    start_time: Instant,
+    pub(crate) start_time: SystemTime,
     #[cfg_attr(test, serde(skip_serializing))]
-    end_time: Instant,
-    attributes: HashMap<&'static str, serde_json::Value>,
-    kind: SpanKind,
-    links: Vec<SpanId>,
+    pub(crate) duration: Duration,
+    pub(crate) attributes: HashMap<&'static str, serde_json::Value>,
+    pub(crate) kind: SpanKind,
+    pub(crate) links: Vec<SpanId>,
 }
 
 pub(crate) struct SpanBuilder {
     request_id: Option<RequestId>,
     id: SpanId,
     name: Cow<'static, str>,
-    start_time: Instant,
-    end_time: Option<Instant>,
+    // we store both the wall clock time and a monotonically increasing instant to
+    // be resilient against clock changes between the start and end of the span
+    start_time: SystemTime,
+    start_instant: Instant,
     attributes: HashMap<&'static str, serde_json::Value>,
     kind: Option<SpanKind>,
     links: Vec<SpanId>,
 }
 
 impl SpanBuilder {
-    pub fn new(name: &'static str, id: impl Into<SpanId>, start_time: Instant, attrs_size_hint: usize) -> Self {
+    pub fn new(name: &'static str, id: impl Into<SpanId>, attrs_size_hint: usize) -> Self {
         Self {
             request_id: None,
             id: id.into(),
             name: name.into(),
-            start_time,
-            end_time: None,
+            start_time: SystemTime::now(),
+            start_instant: Instant::now(),
             attributes: HashMap::with_capacity(attrs_size_hint),
             kind: None,
             links: Vec::new(),
@@ -177,13 +180,13 @@ impl SpanBuilder {
         self.links.push(link);
     }
 
-    pub fn end(self, parent_id: Option<impl Into<SpanId>>, end_time: Instant) -> CollectedSpan {
+    pub fn end(self, parent_id: Option<impl Into<SpanId>>) -> CollectedSpan {
         CollectedSpan {
             id: self.id,
             parent_id: parent_id.map(Into::into),
             name: self.name,
             start_time: self.start_time,
-            end_time,
+            duration: self.start_instant.elapsed(),
             attributes: self.attributes,
             kind: self.kind.unwrap_or(SpanKind::Internal),
             links: self.links,
@@ -194,35 +197,29 @@ impl SpanBuilder {
 #[derive(Debug, Clone)]
 #[cfg_attr(test, derive(Serialize))]
 pub struct CollectedEvent {
-    span_id: SpanId,
-    name: &'static str,
-    level: LogLevel,
+    pub(crate) span_id: SpanId,
+    pub(crate) name: &'static str,
+    pub(crate) level: LogLevel,
     #[cfg_attr(test, serde(skip_serializing))]
-    timestamp: Instant,
-    attributes: HashMap<&'static str, serde_json::Value>,
+    pub(crate) timestamp: SystemTime,
+    pub(crate) attributes: HashMap<&'static str, serde_json::Value>,
 }
 
 pub(crate) struct EventBuilder {
     span_id: SpanId,
     name: &'static str,
     level: LogLevel,
-    timestamp: Instant,
+    timestamp: SystemTime,
     attributes: HashMap<&'static str, serde_json::Value>,
 }
 
 impl EventBuilder {
-    pub fn new(
-        span_id: SpanId,
-        name: &'static str,
-        level: LogLevel,
-        timestamp: Instant,
-        attrs_size_hint: usize,
-    ) -> Self {
+    pub fn new(span_id: SpanId, name: &'static str, level: LogLevel, attrs_size_hint: usize) -> Self {
         Self {
             span_id,
             name,
             level,
-            timestamp,
+            timestamp: SystemTime::now(),
             attributes: HashMap::with_capacity(attrs_size_hint),
         }
     }
diff --git a/libs/telemetry/src/capturing/ng/exporter.rs b/libs/telemetry/src/capturing/ng/exporter.rs
index 7a444c07947..5341fb6af37 100644
--- a/libs/telemetry/src/capturing/ng/exporter.rs
+++ b/libs/telemetry/src/capturing/ng/exporter.rs
@@ -1,17 +1,107 @@
-use std::{collections::HashMap, sync::Arc};
+use std::{borrow::Cow, collections::HashMap, sync::Arc};
+
+use serde::Serialize;
+use tokio::sync::{mpsc, oneshot, Mutex, RwLock};
+
+use crate::models::{HrTime, LogLevel, SpanKind};
 
 use super::collector::{CollectedEvent, CollectedSpan, Collector, RequestId, SpanId};
 
+#[derive(Debug, Clone, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct ExportedSpan {
+    id: SpanId,
+    parent_id: Option<SpanId>,
+    name: Cow<'static, str>,
+    start_time: HrTime,
+    end_time: HrTime,
+    kind: SpanKind,
+    #[serde(skip_serializing_if = "HashMap::is_empty")]
+    attributes: HashMap<&'static str, serde_json::Value>,
+    #[serde(skip_serializing_if = "Vec::is_empty")]
+    links: Vec<SpanId>,
+}
+
+impl From<CollectedSpan> for ExportedSpan {
+    fn from(span: CollectedSpan) -> Self {
+        Self {
+            id: span.id,
+            parent_id: span.parent_id,
+            name: span.name,
+            start_time: span.start_time.into(),
+            end_time: (span.start_time + span.duration).into(),
+            kind: span.kind,
+            attributes: span.attributes,
+            links: span.links,
+        }
+    }
+}
+
+#[derive(Debug, Clone, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct ExportedEvent {
+    span_id: SpanId,
+    name: &'static str,
+    level: LogLevel,
+    timestamp: HrTime,
+    attributes: HashMap<&'static str, serde_json::Value>,
+}
+
+impl From<CollectedEvent> for ExportedEvent {
+    fn from(event: CollectedEvent) -> Self {
+        Self {
+            span_id: event.span_id,
+            name: event.name,
+            level: event.level,
+            timestamp: event.timestamp.into(),
+            attributes: event.attributes,
+        }
+    }
+}
+
+#[derive(Debug, Clone, Serialize)]
+pub struct Trace {
+    pub spans: Vec<ExportedSpan>,
+    pub events: Vec<ExportedEvent>,
+}
+
 #[derive(Clone)]
-pub struct Exporter(Arc<ExporterInner>);
+pub struct Exporter(Arc<Inner>);
 
-struct ExporterInner {
-    tasks: HashMap,
+struct Inner {
+    // We use fine-grained locking here to avoid contention. On any operations with the existing
+    // traces, the outer lock should only be held for a tiny amount of time to clone the inner Arc.
+    traces: RwLock<HashMap<RequestId, Arc<Mutex<Trace>>>>,
 }
 
 impl Exporter {
     pub fn new() -> Self {
-        Self(Arc::new(ExporterInner { tasks: HashMap::new() }))
+        Self(Arc::new(Inner {
+            traces: RwLock::new(HashMap::new()),
+        }))
+    }
+
+    pub async fn start_capturing(&self) -> RequestId {
+        let request_id = RequestId::next();
+
+        self.0.traces.write().await.insert(
+            request_id,
+            Arc::new(Mutex::new(Trace {
+                spans: Vec::new(),
+                events: Vec::new(),
+            })),
+        );
+
+        request_id
+    }
+
+    pub async fn stop_capturing(&self, request_id: RequestId) -> Option<Trace> {
+        let trace = self.0.traces.write().await.remove(&request_id)?;
+
+        Some(match Arc::try_unwrap(trace) {
+            Ok(trace) => trace.into_inner(),
+            Err(trace) => trace.lock().await.clone(),
+        })
     }
 }
 
@@ -22,11 +112,29 @@ impl Default for Exporter {
     }
 }
 
 impl Collector for Exporter {
-    fn add_span(&self, _trace: RequestId, _span: CollectedSpan) {
-        todo!()
+    fn add_span(&self, trace: RequestId, span: CollectedSpan) {
+        let inner = Arc::clone(&self.0);
+
+        tokio::spawn(async move {
+            let trace = inner.traces.read().await.get(&trace).cloned();
+
+            if let Some(trace) = trace {
+                let span = span.into();
+                trace.lock().await.spans.push(span);
+            }
+        });
     }
 
-    fn add_event(&self, _trace: RequestId, _event: CollectedEvent) {
-        todo!()
+    fn add_event(&self, trace: RequestId, event: CollectedEvent) {
+        let inner = Arc::clone(&self.0);
+
+        tokio::spawn(async move {
+            let trace = inner.traces.read().await.get(&trace).cloned();
+
+            if let Some(trace) = trace {
+                let event = event.into();
+                trace.lock().await.events.push(event);
+            }
+        });
     }
 }
diff --git a/libs/telemetry/src/capturing/ng/layer.rs b/libs/telemetry/src/capturing/ng/layer.rs
index 0213d69860d..14722c6dbe7 100644
--- a/libs/telemetry/src/capturing/ng/layer.rs
+++ b/libs/telemetry/src/capturing/ng/layer.rs
@@ -1,6 +1,5 @@
 use std::marker::PhantomData;
 
-use tokio::time::Instant;
 use tracing::{
     field,
     span::{Attributes, Id},
@@ -66,15 +65,6 @@ where
     fn require_span<'a>(id: &Id, ctx: &'a Context<'_, S>) -> SpanRef<'a, S> {
         ctx.span(id).expect("span must exist in the registry, this is a bug")
     }
-
-    fn root_span_checked<'a>(id: &Id, ctx: &'a Context<'_, S>) -> Option<SpanRef<'a, S>> {
-        ctx.span_scope(id)?.from_root().next()
-    }
-
-    fn root_span<'a>(id: &Id, ctx: &'a Context<'_, S>) -> SpanRef<'a, S> {
-        Self::root_span_checked(id, ctx)
-            .expect("span scope must exist in the registry and include at least the requested span ID")
-    }
 }
 
 impl<S, C> Layer<S> for CapturingLayer<S, C>
@@ -84,7 +74,7 @@ where
 {
     fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
         let span = Self::require_span(id, &ctx);
-        let mut span_builder = SpanBuilder::new(span.name(), id, Instant::now(), attrs.fields().len());
+        let mut span_builder = SpanBuilder::new(span.name(), id, attrs.fields().len());
 
         if let Some(request_id) = span
             .parent()
@@ -127,7 +117,7 @@ where
             return;
         };
 
-        let Some(request_id) = Self::root_span(&parent, &ctx)
+        let Some(request_id) = Self::require_span(&parent, &ctx)
             .extensions()
             .get::<SpanBuilder>()
             .and_then(|sb| sb.request_id())
@@ -139,7 +129,6 @@ where
             parent.into(),
             event.metadata().name(),
             event.metadata().level().into(),
-            Instant::now(),
             event.metadata().fields().len(),
         );
 
@@ -159,9 +148,8 @@ where
             return;
         };
 
-        let end_time = Instant::now();
         let parent_id = span.parent().map(|parent| parent.id());
-        let collected_span = span_builder.end(parent_id, end_time);
+        let collected_span = span_builder.end(parent_id);
 
         self.collector.add_span(request_id, collected_span);
     }
diff --git a/libs/telemetry/src/models.rs b/libs/telemetry/src/models.rs
index 4c101554f4d..0da27f1ca3f 100644
--- a/libs/telemetry/src/models.rs
+++ b/libs/telemetry/src/models.rs
@@ -237,23 +237,51 @@ pub type LogEvent = Event;
 /// metrics are modeled as span events
 pub type MetricEvent = Event;
 
-pub type HrTime = [u64; 2];
-
-/// Take from the otel library on what the format should be for High-Resolution time
-/// Defines High-Resolution Time.
+/// High-resolution time in the same format that OpenTelemetry uses.
+///
+/// The first number is Unix time in seconds since 00:00:00 UTC on 1 January 1970.
+/// The second number is the sub-second amount of time elapsed since time represented by the first
+/// number in nanoseconds.
+///
+/// ## Example
+///
+/// For example, `2021-01-01T12:30:10.150Z` in Unix time in milliseconds is 1609504210150.
+/// Then the first number can be calculated by converting and truncating the epoch time in
+/// milliseconds to seconds:
+///
+/// ```js
+/// time[0] = Math.trunc(1609504210150 / 1000) // = 1609504210
+/// ```
+///
+/// The second number can be calculated by converting the digits after the decimal point of the
+/// expression `(1609504210150 / 1000) - time[0]` to nanoseconds:
+///
+/// ```js
+/// time[1] = Number((1609504210.150 - time[0]).toFixed(9)) * 1e9 // = 150000000.
+/// ```
 ///
-/// The first number, HrTime[0], is UNIX Epoch time in seconds since 00:00:00 UTC on 1 January 1970.
-/// The second number, HrTime[1], represents the partial second elapsed since Unix Epoch time represented by first number in nanoseconds.
-/// For example, 2021-01-01T12:30:10.150Z in UNIX Epoch time in milliseconds is represented as 1609504210150.
-/// The first number is calculated by converting and truncating the Epoch time in milliseconds to seconds:
-/// HrTime[0] = Math.trunc(1609504210150 / 1000) = 1609504210.
-/// The second number is calculated by converting the digits after the decimal point of the subtraction, (1609504210150 / 1000) - HrTime[0], to nanoseconds:
-/// HrTime[1] = Number((1609504210.150 - HrTime[0]).toFixed(9)) * 1e9 = 150000000.
-/// This is represented in HrTime format as [1609504210, 150000000].
+/// Therefore, this time is represented in `HrTime` format as `[1609504210, 150000000]`.
+#[derive(Clone, Copy, Debug, Serialize, PartialEq, Eq)]
+pub struct HrTime(u64, u32);
+
+impl From<Duration> for HrTime {
+    fn from(time: Duration) -> Self {
+        Self(time.as_secs(), time.subsec_nanos())
+    }
+}
+
+impl From<SystemTime> for HrTime {
+    fn from(time: SystemTime) -> Self {
+        time.duration_since(SystemTime::UNIX_EPOCH)
+            .expect("time can't be before unix epoch")
+            .into()
+    }
+}
+
 fn convert_to_high_res_time(time: Duration) -> HrTime {
     let secs = time.as_secs();
     let partial = time.subsec_nanos();
-    [secs, partial as u64]
+    HrTime(secs, partial)
 }
 
 /// Transforms an [`opentelemetry::Value`] to a [`serde_json::Value`]
@@ -283,6 +311,6 @@ mod tests {
     fn test_high_resolution_time_works() {
         // 2021-01-01T12:30:10.150Z in UNIX Epoch time in milliseconds
        let time_val = Duration::from_millis(1609504210150);
-        assert_eq!([1609504210, 150000000], convert_to_high_res_time(time_val));
+        assert_eq!(HrTime::from(time_val), HrTime(1609504210, 150000000));
     }
 }
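Note on the timing change in `SpanBuilder`: the span now records a wall-clock `SystemTime` plus a monotonic `Instant` at creation, and the end time is later derived as `start_time + start_instant.elapsed()`, so a system clock adjustment while the span is open cannot skew the reported duration. The standalone sketch below is not part of the patch; `to_hr_time` is a hypothetical helper that only mirrors the `HrTime` conversion, and the whole snippet just illustrates the same pattern with the standard library.

```rust
use std::thread::sleep;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

/// Converts a wall-clock timestamp into the OpenTelemetry-style
/// `[seconds, nanoseconds]` pair, analogous to `HrTime` in the patch.
fn to_hr_time(time: SystemTime) -> (u64, u32) {
    let since_epoch = time
        .duration_since(UNIX_EPOCH)
        .expect("time can't be before unix epoch");
    (since_epoch.as_secs(), since_epoch.subsec_nanos())
}

fn main() {
    // Record both clocks at the start of the "span", mirroring `SpanBuilder::new`.
    let start_time = SystemTime::now(); // wall clock, used only for reporting
    let start_instant = Instant::now(); // monotonic, used for the duration

    sleep(Duration::from_millis(50)); // stands in for the span's body

    // Mirroring `SpanBuilder::end`: the duration comes from the monotonic clock,
    // so a system clock change during the span does not distort it.
    let duration = start_instant.elapsed();
    let end_time = start_time + duration;

    println!("start    = {:?}", to_hr_time(start_time));
    println!("end      = {:?}", to_hr_time(end_time));
    println!("duration = {:?}", duration);
}
```

This is also why `CollectedSpan` stores a `Duration` rather than a second timestamp: the duration is taken from the monotonic clock, and the wall-clock start is only combined with it when the exporter converts the span into its `HrTime` fields.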