Skip to content

Commit

Permalink
feat(telemetry): add dynamic "db.system" attribute to OTEL spans (#5019)
Browse files Browse the repository at this point in the history
* feat(telemetry): add dynamic "db.system" attribute to OTEL spans

* chore: clippy

* chore(driver-adapters): add "db_system_name()" to "AdapterFlavour"

* chore(driver-adapters): rename SYSTEM_NAME to DB_SYSTEM_NAME, and change member variable to constant

* chore(driver-adapters): get rid of db_system_name(), adapt name() on Connector trait

* chore(driver-adapters): fix typos

* chore: clippy

* chore: fix compilation on wasm32

* feat(tracing): add "otel.kind" = "client" on query spans

* chore: fix Wasm compilation

* feat(core): extract otel_kind from span attributes

* chore(core): clippy

* chore(core): remove percentage sigil for constant strings

* chore(core): rename "system_name" to "db_system_name"

* feat(core): add mongodb-specific "db.collection.name" and "db.operation.name" attributes in spans

* chore(core): review comments

* chore: Update query-engine/connectors/mongodb-query-connector/src/root_queries/mod.rs

Co-authored-by: Alexey Orlenko <[email protected]>

* chore: address review comments

---------

Co-authored-by: Alexey Orlenko <[email protected]>
  • Loading branch information
jkomyno and aqrln authored Oct 22, 2024
1 parent 4fe298b commit 8263a1f
Show file tree
Hide file tree
Showing 18 changed files with 236 additions and 115 deletions.
11 changes: 9 additions & 2 deletions quaint/src/connector/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,19 @@ use crate::ast::{Params, Value};
use crosstarget_utils::time::ElapsedTimeCounter;
use std::future::Future;

pub async fn query<'a, F, T, U>(tag: &'static str, query: &'a str, params: &'a [Value<'_>], f: F) -> crate::Result<T>
pub async fn query<'a, F, T, U>(
tag: &'static str,
db_system_name: &'static str,
query: &'a str,
params: &'a [Value<'_>],
f: F,
) -> crate::Result<T>
where
F: FnOnce() -> U + 'a,
U: Future<Output = crate::Result<T>>,
{
let span = info_span!("quaint:query", "db.statement" = %query);
let span =
info_span!("quaint:query", "db.system" = db_system_name, "db.statement" = %query, "otel.kind" = "client");
do_query(tag, query, params, f).instrument(span).await
}

Expand Down
7 changes: 4 additions & 3 deletions quaint/src/connector/mssql/native/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use tokio_util::compat::{Compat, TokioAsyncWriteCompatExt};
pub use tiberius;

static SQL_SERVER_DEFAULT_ISOLATION: IsolationLevel = IsolationLevel::ReadCommitted;
const DB_SYSTEM_NAME: &str = "mssql";

#[async_trait]
impl TransactionCapable for Mssql {
Expand Down Expand Up @@ -130,7 +131,7 @@ impl Queryable for Mssql {
}

async fn query_raw(&self, sql: &str, params: &[Value<'_>]) -> crate::Result<ResultSet> {
metrics::query("mssql.query_raw", sql, params, move || async move {
metrics::query("mssql.query_raw", DB_SYSTEM_NAME, sql, params, move || async move {
let mut client = self.client.lock().await;

let mut query = tiberius::Query::new(sql);
Expand Down Expand Up @@ -193,7 +194,7 @@ impl Queryable for Mssql {
}

async fn execute_raw(&self, sql: &str, params: &[Value<'_>]) -> crate::Result<u64> {
metrics::query("mssql.execute_raw", sql, params, move || async move {
metrics::query("mssql.execute_raw", DB_SYSTEM_NAME, sql, params, move || async move {
let mut query = tiberius::Query::new(sql);

for param in params {
Expand All @@ -213,7 +214,7 @@ impl Queryable for Mssql {
}

async fn raw_cmd(&self, cmd: &str) -> crate::Result<()> {
metrics::query("mssql.raw_cmd", cmd, &[], move || async move {
metrics::query("mssql.raw_cmd", DB_SYSTEM_NAME, cmd, &[], move || async move {
let mut client = self.client.lock().await;
self.perform_io(client.simple_query(cmd)).await?.into_results().await?;
Ok(())
Expand Down
8 changes: 5 additions & 3 deletions quaint/src/connector/mysql/native/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ impl MysqlUrl {
}
}

const DB_SYSTEM_NAME: &str = "mysql";

/// A connector interface for the MySQL database.
#[derive(Debug)]
pub struct Mysql {
Expand Down Expand Up @@ -195,7 +197,7 @@ impl Queryable for Mysql {
}

async fn query_raw(&self, sql: &str, params: &[Value<'_>]) -> crate::Result<ResultSet> {
metrics::query("mysql.query_raw", sql, params, move || async move {
metrics::query("mysql.query_raw", DB_SYSTEM_NAME, sql, params, move || async move {
self.prepared(sql, |stmt| async move {
let mut conn = self.conn.lock().await;
let rows: Vec<my::Row> = conn.exec(&stmt, conversion::conv_params(params)?).await?;
Expand Down Expand Up @@ -280,7 +282,7 @@ impl Queryable for Mysql {
}

async fn execute_raw(&self, sql: &str, params: &[Value<'_>]) -> crate::Result<u64> {
metrics::query("mysql.execute_raw", sql, params, move || async move {
metrics::query("mysql.execute_raw", DB_SYSTEM_NAME, sql, params, move || async move {
self.prepared(sql, |stmt| async move {
let mut conn = self.conn.lock().await;
conn.exec_drop(stmt, conversion::conv_params(params)?).await?;
Expand All @@ -297,7 +299,7 @@ impl Queryable for Mysql {
}

async fn raw_cmd(&self, cmd: &str) -> crate::Result<()> {
metrics::query("mysql.raw_cmd", cmd, &[], move || async move {
metrics::query("mysql.raw_cmd", DB_SYSTEM_NAME, cmd, &[], move || async move {
self.perform_io(|| async move {
let mut conn = self.conn.lock().await;
let mut result = cmd.run(&mut *conn).await?;
Expand Down
206 changes: 121 additions & 85 deletions quaint/src/connector/postgres/native/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ impl Debug for PostgresClient {
}
}

const DB_SYSTEM_NAME_POSTGRESQL: &str = "postgresql";
const DB_SYSTEM_NAME_COCKROACHDB: &str = "cockroachdb";

/// A connector interface for the PostgreSQL database.
#[derive(Debug)]
pub struct PostgreSql {
Expand All @@ -65,6 +68,7 @@ pub struct PostgreSql {
is_healthy: AtomicBool,
is_cockroachdb: bool,
is_materialize: bool,
db_system_name: &'static str,
}

/// Key uniquely representing an SQL statement in the prepared statements cache.
Expand Down Expand Up @@ -285,6 +289,12 @@ impl PostgreSql {
}
}

let db_system_name = if is_cockroachdb {
DB_SYSTEM_NAME_COCKROACHDB
} else {
DB_SYSTEM_NAME_POSTGRESQL
};

Ok(Self {
client: PostgresClient(client),
socket_timeout: url.query_params.socket_timeout,
Expand All @@ -293,6 +303,7 @@ impl PostgreSql {
is_healthy: AtomicBool::new(true),
is_cockroachdb,
is_materialize,
db_system_name,
})
}

Expand All @@ -308,6 +319,7 @@ impl PostgreSql {
is_healthy: AtomicBool::new(true),
is_cockroachdb: false,
is_materialize: false,
db_system_name: DB_SYSTEM_NAME_POSTGRESQL,
})
}

Expand Down Expand Up @@ -539,72 +551,84 @@ impl Queryable for PostgreSql {
async fn query_raw(&self, sql: &str, params: &[Value<'_>]) -> crate::Result<ResultSet> {
self.check_bind_variables_len(params)?;

metrics::query("postgres.query_raw", sql, params, move || async move {
let stmt = self.fetch_cached(sql, &[]).await?;

if stmt.params().len() != params.len() {
let kind = ErrorKind::IncorrectNumberOfParameters {
expected: stmt.params().len(),
actual: params.len(),
};

return Err(Error::builder(kind).build());
}
metrics::query(
"postgres.query_raw",
self.db_system_name,
sql,
params,
move || async move {
let stmt = self.fetch_cached(sql, &[]).await?;

if stmt.params().len() != params.len() {
let kind = ErrorKind::IncorrectNumberOfParameters {
expected: stmt.params().len(),
actual: params.len(),
};

return Err(Error::builder(kind).build());
}

let rows = self
.perform_io(self.client.0.query(&stmt, conversion::conv_params(params).as_slice()))
.await?;
let rows = self
.perform_io(self.client.0.query(&stmt, conversion::conv_params(params).as_slice()))
.await?;

let col_types = stmt
.columns()
.iter()
.map(|c| PGColumnType::from_pg_type(c.type_()))
.map(ColumnType::from)
.collect::<Vec<_>>();
let mut result = ResultSet::new(stmt.to_column_names(), col_types, Vec::new());
let col_types = stmt
.columns()
.iter()
.map(|c| PGColumnType::from_pg_type(c.type_()))
.map(ColumnType::from)
.collect::<Vec<_>>();
let mut result = ResultSet::new(stmt.to_column_names(), col_types, Vec::new());

for row in rows {
result.rows.push(row.get_result_row()?);
}
for row in rows {
result.rows.push(row.get_result_row()?);
}

Ok(result)
})
Ok(result)
},
)
.await
}

async fn query_raw_typed(&self, sql: &str, params: &[Value<'_>]) -> crate::Result<ResultSet> {
self.check_bind_variables_len(params)?;

metrics::query("postgres.query_raw", sql, params, move || async move {
let stmt = self.fetch_cached(sql, params).await?;

if stmt.params().len() != params.len() {
let kind = ErrorKind::IncorrectNumberOfParameters {
expected: stmt.params().len(),
actual: params.len(),
};

return Err(Error::builder(kind).build());
}
metrics::query(
"postgres.query_raw",
self.db_system_name,
sql,
params,
move || async move {
let stmt = self.fetch_cached(sql, params).await?;

if stmt.params().len() != params.len() {
let kind = ErrorKind::IncorrectNumberOfParameters {
expected: stmt.params().len(),
actual: params.len(),
};

return Err(Error::builder(kind).build());
}

let col_types = stmt
.columns()
.iter()
.map(|c| PGColumnType::from_pg_type(c.type_()))
.map(ColumnType::from)
.collect::<Vec<_>>();
let rows = self
.perform_io(self.client.0.query(&stmt, conversion::conv_params(params).as_slice()))
.await?;
let col_types = stmt
.columns()
.iter()
.map(|c| PGColumnType::from_pg_type(c.type_()))
.map(ColumnType::from)
.collect::<Vec<_>>();
let rows = self
.perform_io(self.client.0.query(&stmt, conversion::conv_params(params).as_slice()))
.await?;

let mut result = ResultSet::new(stmt.to_column_names(), col_types, Vec::new());
let mut result = ResultSet::new(stmt.to_column_names(), col_types, Vec::new());

for row in rows {
result.rows.push(row.get_result_row()?);
}
for row in rows {
result.rows.push(row.get_result_row()?);
}

Ok(result)
})
Ok(result)
},
)
.await
}

Expand Down Expand Up @@ -692,53 +716,65 @@ impl Queryable for PostgreSql {
async fn execute_raw(&self, sql: &str, params: &[Value<'_>]) -> crate::Result<u64> {
self.check_bind_variables_len(params)?;

metrics::query("postgres.execute_raw", sql, params, move || async move {
let stmt = self.fetch_cached(sql, &[]).await?;

if stmt.params().len() != params.len() {
let kind = ErrorKind::IncorrectNumberOfParameters {
expected: stmt.params().len(),
actual: params.len(),
};

return Err(Error::builder(kind).build());
}
metrics::query(
"postgres.execute_raw",
self.db_system_name,
sql,
params,
move || async move {
let stmt = self.fetch_cached(sql, &[]).await?;

if stmt.params().len() != params.len() {
let kind = ErrorKind::IncorrectNumberOfParameters {
expected: stmt.params().len(),
actual: params.len(),
};

return Err(Error::builder(kind).build());
}

let changes = self
.perform_io(self.client.0.execute(&stmt, conversion::conv_params(params).as_slice()))
.await?;
let changes = self
.perform_io(self.client.0.execute(&stmt, conversion::conv_params(params).as_slice()))
.await?;

Ok(changes)
})
Ok(changes)
},
)
.await
}

async fn execute_raw_typed(&self, sql: &str, params: &[Value<'_>]) -> crate::Result<u64> {
self.check_bind_variables_len(params)?;

metrics::query("postgres.execute_raw", sql, params, move || async move {
let stmt = self.fetch_cached(sql, params).await?;

if stmt.params().len() != params.len() {
let kind = ErrorKind::IncorrectNumberOfParameters {
expected: stmt.params().len(),
actual: params.len(),
};

return Err(Error::builder(kind).build());
}
metrics::query(
"postgres.execute_raw",
self.db_system_name,
sql,
params,
move || async move {
let stmt = self.fetch_cached(sql, params).await?;

if stmt.params().len() != params.len() {
let kind = ErrorKind::IncorrectNumberOfParameters {
expected: stmt.params().len(),
actual: params.len(),
};

return Err(Error::builder(kind).build());
}

let changes = self
.perform_io(self.client.0.execute(&stmt, conversion::conv_params(params).as_slice()))
.await?;
let changes = self
.perform_io(self.client.0.execute(&stmt, conversion::conv_params(params).as_slice()))
.await?;

Ok(changes)
})
Ok(changes)
},
)
.await
}

async fn raw_cmd(&self, cmd: &str) -> crate::Result<()> {
metrics::query("postgres.raw_cmd", cmd, &[], move || async move {
metrics::query("postgres.raw_cmd", self.db_system_name, cmd, &[], move || async move {
self.perform_io(self.client.0.simple_query(cmd)).await?;
Ok(())
})
Expand Down
Loading

0 comments on commit 8263a1f

Please sign in to comment.