Skip to content

Commit

Permalink
method docs
Browse files Browse the repository at this point in the history
  • Loading branch information
divi255 committed Aug 22, 2024
1 parent d726920 commit 5c0e9dc
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 17 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "metrics-exporter-scope"
version = "0.1.2"
version = "0.1.3"
edition = "2021"
authors = ["Serhij S. <[email protected]>"]
license = "Apache-2.0"
Expand Down
8 changes: 4 additions & 4 deletions proto.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ where

## Communication

The server sends serialized metrics snapshot packets as well as informational
ones to the client. The first packet is always an informational one. The client
The server sends serialized metrics snapshot packets as well as information
ones to the client. The first packet is always an information one. The client
should determine the packet type according to its structure.

### Informational packets
### Information packets

The informational packets are used to send metrics metadata to the client. The
The information packets are used to send metrics metadata to the client. The
server sends such packets every 5 seconds (by default).

```json
Expand Down
57 changes: 46 additions & 11 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#![ doc = include_str!( concat!( env!( "CARGO_MANIFEST_DIR" ), "/", "README.md" ) ) ]
#![deny(missing_docs)]
#[cfg(feature = "msrv")]
extern crate metrics_legacy as metrics;
#[cfg(feature = "msrv")]
Expand All @@ -20,18 +21,24 @@ use rtsc::time::interval;
use serde::{Deserialize, Serialize};
use tracing::{error, info};

/// Crate error type
#[derive(thiserror::Error, Debug)]
pub enum Error {
/// I/O errors
#[error("io error: {0}")]
Io(#[from] std::io::Error),
#[error("{0}")]
Other(String),
/// Data serialization errors
#[error("encode error: {0}")]
Encode(#[from] rmp_serde::encode::Error),
/// Data deserialization errors
#[error("decode error: {0}")]
Decode(#[from] rmp_serde::decode::Error),
/// Recorder setup errors
#[error("set recorder error: {0}")]
SetRecorder(#[from] metrics::SetRecorderError<ScopeRecorder>),
/// Other errors
#[error("{0}")]
Other(String),
}

impl From<TryFromIntError> for Error {
Expand All @@ -46,23 +53,18 @@ const SEND_INFO_INTERVAL: Duration = Duration::from_secs(5);

const SERVER_THREAD_NAME: &str = "MScopeSrv";

/// Communication protocol
pub mod protocol {

/// Current protocol version
pub const VERSION: u16 = 1;

use std::io::{Read, Write};

use crate::{ClientSettings, Error, Packet};
use serde::{Deserialize, Serialize};

pub fn write_version<W>(mut stream: W) -> Result<(), Error>
where
W: Write,
{
stream.write_all(&VERSION.to_le_bytes())?;
Ok(())
}

/// Read protocol version from a stream
pub fn read_version<R>(mut stream: R) -> Result<u16, Error>
where
R: Read,
Expand All @@ -72,27 +74,40 @@ pub mod protocol {
Ok(u16::from_le_bytes(*buf))
}

/// Write protocol version to a stream
pub fn write_version<W>(mut stream: W) -> Result<(), Error>
where
W: Write,
{
stream.write_all(&VERSION.to_le_bytes())?;
Ok(())
}

/// Read a packet from a stream
pub fn read_packet<R>(stream: R) -> Result<Packet, Error>
where
R: Read,
{
read(stream)
}

/// Write a packet to a stream
pub fn write_packet<W>(stream: W, packet: &Packet) -> Result<(), Error>
where
W: Write,
{
write(stream, packet)
}

/// Read client settings from a stream
pub fn read_client_settings<R>(stream: R) -> Result<ClientSettings, Error>
where
R: Read,
{
read(stream)
}

/// Write client settings to a stream
pub fn write_client_settings<W>(stream: W, settings: &ClientSettings) -> Result<(), Error>
where
W: Write,
Expand Down Expand Up @@ -125,13 +140,17 @@ pub mod protocol {
}
}

/// Communication packets
#[derive(Clone, Serialize, Deserialize, Debug)]
#[serde(untagged)]
pub enum Packet {
/// Information packet (metrics metadata)
Info(Info),
/// Snapshot packet (metrics data)
Snapshot(Snapshot),
}

/// Client settings
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct ClientSettings {
sampling_interval: u64,
Expand All @@ -148,49 +167,59 @@ impl ClientSettings {
}
}

/// Information packet
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct Info {
metrics: BTreeMap<String, MetricInfo>,
}

impl Info {
/// Get metrics metadata map
pub fn metrics(&self) -> &BTreeMap<String, MetricInfo> {
&self.metrics
}
}

/// Metrics metadata
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct MetricInfo {
labels: BTreeMap<String, String>,
}

impl MetricInfo {
/// Metric labels map
pub fn labels(&self) -> &BTreeMap<String, String> {
&self.labels
}
}

/// Snapshot packet
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct Snapshot {
t: Monotonic,
d: BTreeMap<String, f64>,
}

impl Snapshot {
/// Snapshot timestamp (monotonic, relative to the communication start)
pub fn ts(&self) -> Monotonic {
self.t
}
/// Snapshot data map (metric name -> value)
pub fn data(&self) -> &BTreeMap<String, f64> {
&self.d
}
/// Snapshot data map mutable (metric name -> value)
pub fn data_mut(&mut self) -> &mut BTreeMap<String, f64> {
&mut self.d
}
/// Take snapshot data map
pub fn take_data(&mut self) -> BTreeMap<String, f64> {
std::mem::take(&mut self.d)
}
}

/// Exporter builder
pub struct ScopeBuilder {
addr: SocketAddr,
fallback: Option<Box<dyn Recorder>>,
Expand All @@ -203,28 +232,34 @@ impl Default for ScopeBuilder {
}

impl ScopeBuilder {
/// Create a new exporter builder
pub fn new() -> Self {
Self {
addr: (std::net::Ipv4Addr::UNSPECIFIED, 5001).into(),
fallback: None,
}
}
/// Set the server listening address and port
pub fn with_addr<A: Into<SocketAddr>>(mut self, addr: A) -> Self {
self.addr = addr.into();
self
}
/// Set the fallback recorder
pub fn with_fallback(mut self, fallback: Box<dyn Recorder>) -> Self {
self.fallback = Some(fallback);
self
}
/// Build the exporter's recorder
pub fn build(self) -> ScopeRecorder {
ScopeRecorder::build(self.addr, self.fallback)
}
/// Build the exporter's recorder and install it as the global recorder
pub fn install(self) -> Result<(), Error> {
self.build().install()
}
}

/// Scope recorder
#[derive(Clone)]
pub struct ScopeRecorder {
inner: Arc<Inner>,
Expand All @@ -245,7 +280,7 @@ impl ScopeRecorder {
self.spawn_tasks()?;
metrics::set_global_recorder(self).map_err(Into::into)
}
pub fn spawn_tasks(&self) -> Result<(), std::io::Error> {
fn spawn_tasks(&self) -> Result<(), std::io::Error> {
self.inner.spawn_server(self.inner.addr)?;
Ok(())
}
Expand Down

0 comments on commit 5c0e9dc

Please sign in to comment.