diff --git a/Cargo.lock b/Cargo.lock index c025347..6a9f8db 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -463,7 +463,7 @@ dependencies = [ [[package]] name = "metrics-exporter-scope" -version = "0.1.2" +version = "0.1.3" dependencies = [ "bma-ts", "metrics 0.22.3", diff --git a/Cargo.toml b/Cargo.toml index 99a542c..cfb6ea4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "metrics-exporter-scope" -version = "0.1.2" +version = "0.1.3" edition = "2021" authors = ["Serhij S. "] license = "Apache-2.0" diff --git a/proto.md b/proto.md index 461b846..a8c393b 100644 --- a/proto.md +++ b/proto.md @@ -32,13 +32,13 @@ where ## Communication -The server sends serialized metrics snapshot packets as well as informational -ones to the client. The first packet is always an informational one. The client +The server sends serialized metrics snapshot packets as well as information +ones to the client. The first packet is always an information one. The client should determine the packet type according to its structure. -### Informational packets +### Information packets -The informational packets are used to send metrics metadata to the client. The +The information packets are used to send metrics metadata to the client. The server sends such packets every 5 seconds (by default). ```json diff --git a/src/lib.rs b/src/lib.rs index 18ad2bc..7f40c47 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,5 @@ #![ doc = include_str!( concat!( env!( "CARGO_MANIFEST_DIR" ), "/", "README.md" ) ) ] +#![deny(missing_docs)] #[cfg(feature = "msrv")] extern crate metrics_legacy as metrics; #[cfg(feature = "msrv")] @@ -20,18 +21,24 @@ use rtsc::time::interval; use serde::{Deserialize, Serialize}; use tracing::{error, info}; +/// Crate error type #[derive(thiserror::Error, Debug)] pub enum Error { + /// I/O errors #[error("io error: {0}")] Io(#[from] std::io::Error), - #[error("{0}")] - Other(String), + /// Data serialization errors #[error("encode error: {0}")] Encode(#[from] rmp_serde::encode::Error), + /// Data deserialization errors #[error("decode error: {0}")] Decode(#[from] rmp_serde::decode::Error), + /// Recorder setup errors #[error("set recorder error: {0}")] SetRecorder(#[from] metrics::SetRecorderError), + /// Other errors + #[error("{0}")] + Other(String), } impl From for Error { @@ -46,8 +53,10 @@ const SEND_INFO_INTERVAL: Duration = Duration::from_secs(5); const SERVER_THREAD_NAME: &str = "MScopeSrv"; +/// Communication protocol pub mod protocol { + /// Current protocol version pub const VERSION: u16 = 1; use std::io::{Read, Write}; @@ -55,14 +64,7 @@ pub mod protocol { use crate::{ClientSettings, Error, Packet}; use serde::{Deserialize, Serialize}; - pub fn write_version(mut stream: W) -> Result<(), Error> - where - W: Write, - { - stream.write_all(&VERSION.to_le_bytes())?; - Ok(()) - } - + /// Read protocol version from a stream pub fn read_version(mut stream: R) -> Result where R: Read, @@ -72,6 +74,16 @@ pub mod protocol { Ok(u16::from_le_bytes(*buf)) } + /// Write protocol version to a stream + pub fn write_version(mut stream: W) -> Result<(), Error> + where + W: Write, + { + stream.write_all(&VERSION.to_le_bytes())?; + Ok(()) + } + + /// Read a packet from a stream pub fn read_packet(stream: R) -> Result where R: Read, @@ -79,6 +91,7 @@ pub mod protocol { read(stream) } + /// Write a packet to a stream pub fn write_packet(stream: W, packet: &Packet) -> Result<(), Error> where W: Write, @@ -86,6 +99,7 @@ pub mod protocol { write(stream, packet) } + /// Read client settings from a stream pub fn read_client_settings(stream: R) -> Result where R: Read, @@ -93,6 +107,7 @@ pub mod protocol { read(stream) } + /// Write client settings to a stream pub fn write_client_settings(stream: W, settings: &ClientSettings) -> Result<(), Error> where W: Write, @@ -125,13 +140,17 @@ pub mod protocol { } } +/// Communication packets #[derive(Clone, Serialize, Deserialize, Debug)] #[serde(untagged)] pub enum Packet { + /// Information packet (metrics metadata) Info(Info), + /// Snapshot packet (metrics data) Snapshot(Snapshot), } +/// Client settings #[derive(Clone, Serialize, Deserialize, Debug)] pub struct ClientSettings { sampling_interval: u64, @@ -148,28 +167,33 @@ impl ClientSettings { } } +/// Information packet #[derive(Clone, Serialize, Deserialize, Debug)] pub struct Info { metrics: BTreeMap, } impl Info { + /// Get metrics metadata map pub fn metrics(&self) -> &BTreeMap { &self.metrics } } +/// Metrics metadata #[derive(Clone, Serialize, Deserialize, Debug)] pub struct MetricInfo { labels: BTreeMap, } impl MetricInfo { + /// Metric labels map pub fn labels(&self) -> &BTreeMap { &self.labels } } +/// Snapshot packet #[derive(Clone, Serialize, Deserialize, Debug)] pub struct Snapshot { t: Monotonic, @@ -177,20 +201,25 @@ pub struct Snapshot { } impl Snapshot { + /// Snapshot timestamp (monotonic, relative to the communication start) pub fn ts(&self) -> Monotonic { self.t } + /// Snapshot data map (metric name -> value) pub fn data(&self) -> &BTreeMap { &self.d } + /// Snapshot data map mutable (metric name -> value) pub fn data_mut(&mut self) -> &mut BTreeMap { &mut self.d } + /// Take snapshot data map pub fn take_data(&mut self) -> BTreeMap { std::mem::take(&mut self.d) } } +/// Exporter builder pub struct ScopeBuilder { addr: SocketAddr, fallback: Option>, @@ -203,28 +232,34 @@ impl Default for ScopeBuilder { } impl ScopeBuilder { + /// Create a new exporter builder pub fn new() -> Self { Self { addr: (std::net::Ipv4Addr::UNSPECIFIED, 5001).into(), fallback: None, } } + /// Set the server listening address and port pub fn with_addr>(mut self, addr: A) -> Self { self.addr = addr.into(); self } + /// Set the fallback recorder pub fn with_fallback(mut self, fallback: Box) -> Self { self.fallback = Some(fallback); self } + /// Build the exporter's recorder pub fn build(self) -> ScopeRecorder { ScopeRecorder::build(self.addr, self.fallback) } + /// Build the exporter's recorder and install it as the global recorder pub fn install(self) -> Result<(), Error> { self.build().install() } } +/// Scope recorder #[derive(Clone)] pub struct ScopeRecorder { inner: Arc, @@ -245,7 +280,7 @@ impl ScopeRecorder { self.spawn_tasks()?; metrics::set_global_recorder(self).map_err(Into::into) } - pub fn spawn_tasks(&self) -> Result<(), std::io::Error> { + fn spawn_tasks(&self) -> Result<(), std::io::Error> { self.inner.spawn_server(self.inner.addr)?; Ok(()) }