Skip to content

Commit

Permalink
Re-implement Exporter using channel
Browse files Browse the repository at this point in the history
  • Loading branch information
aqrln committed Nov 28, 2024
1 parent 62204e6 commit 1ec40ac
Showing 1 changed file with 37 additions and 32 deletions.
69 changes: 37 additions & 32 deletions libs/telemetry/src/capturing/ng/exporter.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{borrow::Cow, collections::HashMap, str::FromStr, sync::Arc};
use std::{borrow::Cow, collections::HashMap, fmt::Debug, str::FromStr, sync::Arc};

use enumflags2::{bitflags, BitFlags};
use serde::Serialize;
Expand Down Expand Up @@ -63,7 +63,7 @@ impl From<CollectedEvent> for ExportedEvent {
}
}

#[derive(Debug, Clone, Serialize)]
#[derive(Debug, Clone, Default, Serialize)]
pub struct Trace {
pub spans: Vec<ExportedSpan>,
pub events: Vec<ExportedEvent>,
Expand Down Expand Up @@ -113,6 +113,18 @@ pub struct CaptureSettings {
#[derive(Clone)]
pub struct Exporter(Arc<Inner>);

impl Debug for Exporter {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Exporter").finish()
}
}

impl Default for Exporter {
fn default() -> Self {
Self::new()
}
}

struct Inner {
tx: UnboundedSender<Message>,
}
Expand All @@ -129,14 +141,29 @@ impl Exporter {
let (tx, mut rx) = mpsc::unbounded_channel();

crosstarget_utils::task::spawn(async move {
let mut traces = HashMap::new();

while let Some(msg) = rx.recv().await {
match msg {
Message::StartCapturing(request_id) => {}
Message::StartCapturing(request_id) => {
traces.insert(request_id, Trace::default());
}

Message::StopCapturing(request_id, tx) => {
let _ = tx.send(None);
_ = tx.send(traces.remove(&request_id));
}

Message::AddSpan(request_id, span) => {
if let Some(trace) = traces.get_mut(&request_id) {
trace.spans.push(span.into());
}
}

Message::AddEvent(request_id, event) => {
if let Some(trace) = traces.get_mut(&request_id) {
trace.events.push(event.into());
}
}
Message::AddSpan(request_id, span) => {}
Message::AddEvent(request_id, event) => {}
}
}
});
Expand All @@ -146,45 +173,23 @@ impl Exporter {

pub async fn start_capturing(&self) -> RequestId {
let request_id = RequestId::next();

self.0
.tx
.send(Message::StartCapturing(request_id))
.expect("capturer task dropped the receiver");

_ = self.0.tx.send(Message::StartCapturing(request_id));
request_id
}

pub async fn stop_capturing(&self, request_id: RequestId) -> Option<Trace> {
let (tx, rx) = oneshot::channel();

self.0
.tx
.send(Message::StopCapturing(request_id, tx))
.expect("capturer task dropped the receiver");

_ = self.0.tx.send(Message::StopCapturing(request_id, tx));
rx.await.expect("capturer task dropped the sender")
}
}

impl Default for Exporter {
fn default() -> Self {
Self::new()
}
}

impl Collector for Exporter {
fn add_span(&self, trace: RequestId, span: CollectedSpan) {
self.0
.tx
.send(Message::AddSpan(trace, span))
.expect("capturer task dropped the receiver");
_ = self.0.tx.send(Message::AddSpan(trace, span));
}

fn add_event(&self, trace: RequestId, event: CollectedEvent) {
self.0
.tx
.send(Message::AddEvent(trace, event))
.expect("capturer task dropped the receiver");
_ = self.0.tx.send(Message::AddEvent(trace, event));
}
}

0 comments on commit 1ec40ac

Please sign in to comment.