Skip to content

Commit

Permalink
First pass at exporter
Browse files Browse the repository at this point in the history
  • Loading branch information
aqrln committed Nov 21, 2024
1 parent d02d1fb commit 3453dd6
Show file tree
Hide file tree
Showing 4 changed files with 188 additions and 67 deletions.
55 changes: 26 additions & 29 deletions libs/telemetry/src/capturing/ng/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{
collections::HashMap,
num::NonZeroU64,
sync::atomic::{AtomicU64, Ordering},
time::{Duration, SystemTime},
};

use derive_more::Display;
Expand Down Expand Up @@ -116,37 +117,39 @@ impl Default for RequestId {
#[derive(Debug, Clone)]
#[cfg_attr(test, derive(Serialize))]
pub struct CollectedSpan {
id: SpanId,
parent_id: Option<SpanId>,
name: Cow<'static, str>,
pub(crate) id: SpanId,
pub(crate) parent_id: Option<SpanId>,
pub(crate) name: Cow<'static, str>,
#[cfg_attr(test, serde(skip_serializing))]
start_time: Instant,
pub(crate) start_time: SystemTime,
#[cfg_attr(test, serde(skip_serializing))]
end_time: Instant,
attributes: HashMap<&'static str, serde_json::Value>,
kind: SpanKind,
links: Vec<SpanId>,
pub(crate) duration: Duration,
pub(crate) attributes: HashMap<&'static str, serde_json::Value>,
pub(crate) kind: SpanKind,
pub(crate) links: Vec<SpanId>,
}

pub(crate) struct SpanBuilder {
request_id: Option<RequestId>,
id: SpanId,
name: Cow<'static, str>,
start_time: Instant,
end_time: Option<Instant>,
// we store both the wall clock time and a monotonically increasing instant to
// be resilient against clock changes between the start and end of the span
start_time: SystemTime,
start_instant: Instant,
attributes: HashMap<&'static str, serde_json::Value>,
kind: Option<SpanKind>,
links: Vec<SpanId>,
}

impl SpanBuilder {
pub fn new(name: &'static str, id: impl Into<SpanId>, start_time: Instant, attrs_size_hint: usize) -> Self {
pub fn new(name: &'static str, id: impl Into<SpanId>, attrs_size_hint: usize) -> Self {
Self {
request_id: None,
id: id.into(),
name: name.into(),
start_time,
end_time: None,
start_time: SystemTime::now(),
start_instant: Instant::now(),
attributes: HashMap::with_capacity(attrs_size_hint),
kind: None,
links: Vec::new(),
Expand Down Expand Up @@ -177,13 +180,13 @@ impl SpanBuilder {
self.links.push(link);
}

pub fn end(self, parent_id: Option<impl Into<SpanId>>, end_time: Instant) -> CollectedSpan {
pub fn end(self, parent_id: Option<impl Into<SpanId>>) -> CollectedSpan {
CollectedSpan {
id: self.id,
parent_id: parent_id.map(Into::into),
name: self.name,
start_time: self.start_time,
end_time,
duration: self.start_instant.elapsed(),
attributes: self.attributes,
kind: self.kind.unwrap_or(SpanKind::Internal),
links: self.links,
Expand All @@ -194,35 +197,29 @@ impl SpanBuilder {
#[derive(Debug, Clone)]
#[cfg_attr(test, derive(Serialize))]
pub struct CollectedEvent {
span_id: SpanId,
name: &'static str,
level: LogLevel,
pub(crate) span_id: SpanId,
pub(crate) name: &'static str,
pub(crate) level: LogLevel,
#[cfg_attr(test, serde(skip_serializing))]
timestamp: Instant,
attributes: HashMap<&'static str, serde_json::Value>,
pub(crate) timestamp: SystemTime,
pub(crate) attributes: HashMap<&'static str, serde_json::Value>,
}

pub(crate) struct EventBuilder {
span_id: SpanId,
name: &'static str,
level: LogLevel,
timestamp: Instant,
timestamp: SystemTime,
attributes: HashMap<&'static str, serde_json::Value>,
}

impl EventBuilder {
pub fn new(
span_id: SpanId,
name: &'static str,
level: LogLevel,
timestamp: Instant,
attrs_size_hint: usize,
) -> Self {
pub fn new(span_id: SpanId, name: &'static str, level: LogLevel, attrs_size_hint: usize) -> Self {
Self {
span_id,
name,
level,
timestamp,
timestamp: SystemTime::now(),
attributes: HashMap::with_capacity(attrs_size_hint),
}
}
Expand Down
126 changes: 117 additions & 9 deletions libs/telemetry/src/capturing/ng/exporter.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,107 @@
use std::{collections::HashMap, sync::Arc};
use std::{borrow::Cow, collections::HashMap, sync::Arc};

use serde::Serialize;
use tokio::sync::{mpsc, oneshot, Mutex, RwLock};

use crate::models::{HrTime, LogLevel, SpanKind};

use super::collector::{CollectedEvent, CollectedSpan, Collector, RequestId, SpanId};

#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ExportedSpan {
id: SpanId,
parent_id: Option<SpanId>,
name: Cow<'static, str>,
start_time: HrTime,
end_time: HrTime,
kind: SpanKind,
#[serde(skip_serializing_if = "HashMap::is_empty")]
attributes: HashMap<&'static str, serde_json::Value>,
#[serde(skip_serializing_if = "Vec::is_empty")]
links: Vec<SpanId>,
}

impl From<CollectedSpan> for ExportedSpan {
fn from(span: CollectedSpan) -> Self {
Self {
id: span.id,
parent_id: span.parent_id,
name: span.name,
start_time: span.start_time.into(),
end_time: (span.start_time + span.duration).into(),
kind: span.kind,
attributes: span.attributes,
links: span.links,
}
}
}

#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ExportedEvent {
span_id: SpanId,
name: &'static str,
level: LogLevel,
timestamp: HrTime,
attributes: HashMap<&'static str, serde_json::Value>,
}

impl From<CollectedEvent> for ExportedEvent {
fn from(event: CollectedEvent) -> Self {
Self {
span_id: event.span_id,
name: event.name,
level: event.level,
timestamp: event.timestamp.into(),
attributes: event.attributes,
}
}
}

#[derive(Debug, Clone, Serialize)]
pub struct Trace {
pub spans: Vec<ExportedSpan>,
pub events: Vec<ExportedEvent>,
}

#[derive(Clone)]
pub struct Exporter(Arc<ExporterInner>);
pub struct Exporter(Arc<Inner>);

struct ExporterInner {
tasks: HashMap<SpanId, ()>,
struct Inner {
// We use fine-grained locking here to avoid contention. On any operations with the existing
// traces, the outer lock should only be held for a tiny amount of time to clone the inner Arc.
traces: RwLock<HashMap<RequestId, Arc<Mutex<Trace>>>>,
}

impl Exporter {
pub fn new() -> Self {
Self(Arc::new(ExporterInner { tasks: HashMap::new() }))
Self(Arc::new(Inner {
traces: RwLock::new(HashMap::new()),
}))
}

pub async fn start_capturing(&self) -> RequestId {
let request_id = RequestId::next();

self.0.traces.write().await.insert(
request_id,
Arc::new(Mutex::new(Trace {
spans: Vec::new(),
events: Vec::new(),
})),
);

request_id
}

pub async fn stop_capturing(&self, request_id: RequestId) -> Option<Trace> {
let trace = self.0.traces.write().await.remove(&request_id)?;

Some(match Arc::try_unwrap(trace) {
Ok(trace) => trace.into_inner(),
Err(trace) => trace.lock().await.clone(),
})
}
}

Expand All @@ -22,11 +112,29 @@ impl Default for Exporter {
}

impl Collector for Exporter {
fn add_span(&self, _trace: RequestId, _span: CollectedSpan) {
todo!()
fn add_span(&self, trace: RequestId, span: CollectedSpan) {
let inner = Arc::clone(&self.0);

tokio::spawn(async move {
let trace = inner.traces.read().await.get(&trace).cloned();

if let Some(trace) = trace {
let span = span.into();
trace.lock().await.spans.push(span);
}
});
}

fn add_event(&self, _trace: RequestId, _event: CollectedEvent) {
todo!()
fn add_event(&self, trace: RequestId, event: CollectedEvent) {
let inner = Arc::clone(&self.0);

tokio::spawn(async move {
let trace = inner.traces.read().await.get(&trace).cloned();

if let Some(trace) = trace {
let event = event.into();
trace.lock().await.events.push(event);
}
});
}
}
18 changes: 3 additions & 15 deletions libs/telemetry/src/capturing/ng/layer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::marker::PhantomData;

use tokio::time::Instant;
use tracing::{
field,
span::{Attributes, Id},
Expand Down Expand Up @@ -66,15 +65,6 @@ where
fn require_span<'a>(id: &Id, ctx: &'a Context<'_, S>) -> SpanRef<'a, S> {
ctx.span(id).expect("span must exist in the registry, this is a bug")
}

fn root_span_checked<'a>(id: &Id, ctx: &'a Context<'_, S>) -> Option<SpanRef<'a, S>> {
ctx.span_scope(id)?.from_root().next()
}

fn root_span<'a>(id: &Id, ctx: &'a Context<'_, S>) -> SpanRef<'a, S> {
Self::root_span_checked(id, ctx)
.expect("span scope must exist in the registry and include at least the requested span ID")
}
}

impl<S, C> Layer<S> for CapturingLayer<S, C>
Expand All @@ -84,7 +74,7 @@ where
{
fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
let span = Self::require_span(id, &ctx);
let mut span_builder = SpanBuilder::new(span.name(), id, Instant::now(), attrs.fields().len());
let mut span_builder = SpanBuilder::new(span.name(), id, attrs.fields().len());

if let Some(request_id) = span
.parent()
Expand Down Expand Up @@ -127,7 +117,7 @@ where
return;
};

let Some(request_id) = Self::root_span(&parent, &ctx)
let Some(request_id) = Self::require_span(&parent, &ctx)
.extensions()
.get::<SpanBuilder>()
.and_then(|sb| sb.request_id())
Expand All @@ -139,7 +129,6 @@ where
parent.into(),
event.metadata().name(),
event.metadata().level().into(),
Instant::now(),
event.metadata().fields().len(),
);

Expand All @@ -159,9 +148,8 @@ where
return;
};

let end_time = Instant::now();
let parent_id = span.parent().map(|parent| parent.id());
let collected_span = span_builder.end(parent_id, end_time);
let collected_span = span_builder.end(parent_id);

self.collector.add_span(request_id, collected_span);
}
Expand Down
Loading

0 comments on commit 3453dd6

Please sign in to comment.