diff --git a/libs/telemetry/src/capturing/ng/exporter.rs b/libs/telemetry/src/capturing/ng/exporter.rs index b4c990488a1..c49cb72d25d 100644 --- a/libs/telemetry/src/capturing/ng/exporter.rs +++ b/libs/telemetry/src/capturing/ng/exporter.rs @@ -1,4 +1,4 @@ -use std::{borrow::Cow, collections::HashMap, str::FromStr, sync::Arc}; +use std::{borrow::Cow, collections::HashMap, fmt::Debug, str::FromStr, sync::Arc}; use enumflags2::{bitflags, BitFlags}; use serde::Serialize; @@ -63,7 +63,7 @@ impl From for ExportedEvent { } } -#[derive(Debug, Clone, Serialize)] +#[derive(Debug, Clone, Default, Serialize)] pub struct Trace { pub spans: Vec, pub events: Vec, @@ -113,6 +113,18 @@ pub struct CaptureSettings { #[derive(Clone)] pub struct Exporter(Arc); +impl Debug for Exporter { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Exporter").finish() + } +} + +impl Default for Exporter { + fn default() -> Self { + Self::new() + } +} + struct Inner { tx: UnboundedSender, } @@ -129,14 +141,29 @@ impl Exporter { let (tx, mut rx) = mpsc::unbounded_channel(); crosstarget_utils::task::spawn(async move { + let mut traces = HashMap::new(); + while let Some(msg) = rx.recv().await { match msg { - Message::StartCapturing(request_id) => {} + Message::StartCapturing(request_id) => { + traces.insert(request_id, Trace::default()); + } + Message::StopCapturing(request_id, tx) => { - let _ = tx.send(None); + _ = tx.send(traces.remove(&request_id)); + } + + Message::AddSpan(request_id, span) => { + if let Some(trace) = traces.get_mut(&request_id) { + trace.spans.push(span.into()); + } + } + + Message::AddEvent(request_id, event) => { + if let Some(trace) = traces.get_mut(&request_id) { + trace.events.push(event.into()); + } } - Message::AddSpan(request_id, span) => {} - Message::AddEvent(request_id, event) => {} } } }); @@ -146,45 +173,23 @@ impl Exporter { pub async fn start_capturing(&self) -> RequestId { let request_id = RequestId::next(); - - self.0 - .tx - .send(Message::StartCapturing(request_id)) - .expect("capturer task dropped the receiver"); - + _ = self.0.tx.send(Message::StartCapturing(request_id)); request_id } pub async fn stop_capturing(&self, request_id: RequestId) -> Option { let (tx, rx) = oneshot::channel(); - - self.0 - .tx - .send(Message::StopCapturing(request_id, tx)) - .expect("capturer task dropped the receiver"); - + _ = self.0.tx.send(Message::StopCapturing(request_id, tx)); rx.await.expect("capturer task dropped the sender") } } -impl Default for Exporter { - fn default() -> Self { - Self::new() - } -} - impl Collector for Exporter { fn add_span(&self, trace: RequestId, span: CollectedSpan) { - self.0 - .tx - .send(Message::AddSpan(trace, span)) - .expect("capturer task dropped the receiver"); + _ = self.0.tx.send(Message::AddSpan(trace, span)); } fn add_event(&self, trace: RequestId, event: CollectedEvent) { - self.0 - .tx - .send(Message::AddEvent(trace, event)) - .expect("capturer task dropped the receiver"); + _ = self.0.tx.send(Message::AddEvent(trace, event)); } }