Skip to content

Commit

Permalink
fix: include traceparent in nested queries (#5080)
Browse files Browse the repository at this point in the history
* fix: include traceparent in nested queries

* fix: include additional field to make sure nested queries happen

* fix: fix a lint

* fix: corrections for different setups
  • Loading branch information
jacek-prisma authored Dec 10, 2024
1 parent 34f162e commit 3872ce4
Show file tree
Hide file tree
Showing 7 changed files with 226 additions and 13 deletions.
2 changes: 0 additions & 2 deletions libs/telemetry/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ impl TraceParent {
}
}

// TODO(aqrln): remove this method once the log capturing doesn't rely on trace IDs anymore
#[deprecated = "this must only be used to create an artificial traceparent for log capturing when tracing is disabled on the client"]
pub fn new_random() -> Self {
Self {
trace_id: TraceId::from_bytes(rand::random()),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
use query_engine_tests::*;

#[test_suite(
schema(schema),
exclude(
MongoDb,
Vitess("planetscale.js", "planetscale.js.wasm"),
Postgres("neon.js", "pg.js", "neon.js.wasm", "pg.js.wasm"),
Sqlite("libsql.js", "libsql.js.wasm", "cfd1", "react-native")
)
)]
mod logs {
use indoc::indoc;
use query_core::executor::TraceParent;

fn schema() -> String {
let schema = indoc! {
r#"model ModelA {
#id(id, Int, @id)
bs ModelB[]
}
model ModelB {
#id(id, Int, @id)
str1 String
str2 String?
str3 String? @default("SOME_DEFAULT")
a_id Int?
a ModelA? @relation(fields: [a_id], references: [id])
}"#
};

schema.to_owned()
}

#[connector_test]
async fn nested_read_logs_all_have_traceparent(mut runner: Runner) -> TestResult<()> {
let traceparent = TraceParent::new_random();

runner
.query_with_traceparent(
traceparent,
r#"{
findManyModelA {
id
bs { id, str1 }
}
}"#,
)
.await?
.assert_success();

assert_all_logs_contain_traceparents(&mut runner, traceparent).await
}

#[connector_test]
async fn nested_create_logs_all_have_traceparent(mut runner: Runner) -> TestResult<()> {
let traceparent = TraceParent::new_random();
runner
.query_with_traceparent(
traceparent,
r#"mutation {
createOneModelA(data: {
id: 1,
bs: {
createMany: {
data: [
{ id: 1, str1: "1", str2: "1", str3: "1"},
{ id: 2, str1: "2", str3: null},
{ id: 3, str1: "1"},
]
}
}
}) {
bs { id, str1 }
}
}"#,
)
.await?
.assert_success();

assert_all_logs_contain_traceparents(&mut runner, traceparent).await
}

#[connector_test]
async fn nested_update_logs_all_have_traceparent(mut runner: Runner) -> TestResult<()> {
let traceparent = TraceParent::new_random();
runner
.query_with_traceparent(
traceparent,
r#"mutation {
createOneModelA(data: {
id: 1,
bs: {
create: { id: 1, str1: "1", str2: "1", str3: "1" }
}
}) { id }
}"#,
)
.await?
.assert_success();

runner
.query_with_traceparent(
traceparent,
r#"mutation {
updateOneModelA(
where: {
id: 1
}
data: {
bs: {
updateMany: {
where: { id: 1 }
data: { str1: { set: "updated" } }
}
}
}
) {
bs { id, str1 }
}
}"#,
)
.await?
.assert_success();

assert_all_logs_contain_traceparents(&mut runner, traceparent).await
}

#[connector_test]
async fn nested_delete_in_update_logs_all_have_traceparent(mut runner: Runner) -> TestResult<()> {
let traceparent = TraceParent::new_random();
runner
.query_with_traceparent(
traceparent,
r#"mutation {
createOneModelA(data: {
id: 1,
bs: {
create: { id: 1, str1: "1", str2: "1", str3: "1" }
}
}) { id }
}"#,
)
.await?
.assert_success();

runner
.query_with_traceparent(
traceparent,
r#"mutation {
updateOneModelA(
where: {
id: 1
}
data: {
bs: {
delete: { id: 1 }
}
}
) {
bs { id, str1 }
}
}"#,
)
.await?
.assert_success();

assert_all_logs_contain_traceparents(&mut runner, traceparent).await
}

async fn assert_all_logs_contain_traceparents(runner: &mut Runner, traceparent: TraceParent) -> TestResult<()> {
let logs = runner.get_logs().await;

let query_logs = logs
.iter()
.filter(|log| {
log.split_once("db.statement=").is_some_and(|(_, q)| {
!q.starts_with("BEGIN") && !q.starts_with("COMMIT") && !q.starts_with("SET TRANSACTION")
})
})
.collect::<Vec<_>>();
assert!(!query_logs.is_empty(), "expected db.statement logs in {logs:?}");

let expected_traceparent = format!("/* traceparent='{}' */", traceparent);
let matching = query_logs
.iter()
.filter(|log| log.contains(&expected_traceparent))
.collect::<Vec<_>>();

assert!(
!matching.is_empty() && matching.len() == query_logs.len(),
"expected all logs to contain traceparent, got {logs:?}"
);

Ok(())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ mod create_many;
mod cursor;
mod disconnect;
mod interactive_tx;
mod logs;
mod metrics;
mod multi_schema;
mod native_types;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ mod json_adapter;

pub use json_adapter::*;
use serde::{Deserialize, Serialize};
use telemetry::helpers::TraceParent;

use crate::{
executor_process_request, ConnectorTag, ConnectorVersion, QueryResult, TestError, TestLogCapture, TestResult,
Expand Down Expand Up @@ -307,14 +308,27 @@ impl Runner {
}

pub async fn query(&self, query: impl Into<String>) -> TestResult<QueryResult> {
self.query_with_maybe_tx_id(self.current_tx_id.as_ref(), query).await
self.query_with_params(self.current_tx_id.as_ref(), None, query).await
}

pub async fn query_in_tx(&self, tx_id: &TxId, query: impl Into<String>) -> TestResult<QueryResult> {
self.query_with_maybe_tx_id(Some(tx_id), query).await
self.query_with_params(Some(tx_id), None, query).await
}

async fn query_with_maybe_tx_id<T>(&self, tx_id: Option<&TxId>, query: T) -> TestResult<QueryResult>
pub async fn query_with_traceparent(
&self,
traceparent: TraceParent,
query: impl Into<String>,
) -> TestResult<QueryResult> {
self.query_with_params(None, Some(traceparent), query).await
}

async fn query_with_params<T>(
&self,
tx_id: Option<&TxId>,
traceparent: Option<TraceParent>,
query: T,
) -> TestResult<QueryResult>
where
T: Into<String>,
{
Expand Down Expand Up @@ -361,7 +375,7 @@ impl Runner {
}
};

let response = handler.handle(request_body, tx_id.cloned(), None).await;
let response = handler.handle(request_body, tx_id.cloned(), traceparent).await;

let result: QueryResult = match self.protocol {
EngineProtocol::Json => JsonResponse::from_graphql(response).into(),
Expand Down
2 changes: 1 addition & 1 deletion query-engine/core/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ mod request_context;
pub use self::{execute_operation::*, interpreting_executor::InterpretingExecutor};

pub(crate) use request_context::*;
use telemetry::helpers::TraceParent;
pub use telemetry::helpers::TraceParent;

use crate::{
protocol::EngineProtocol, query_document::Operation, response_ir::ResponseData, schema::QuerySchemaRef,
Expand Down
9 changes: 5 additions & 4 deletions query-engine/core/src/interpreter/query_interpreters/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ fn read_one(
match record {
Some(record) if query.relation_load_strategy.is_query() => {
let records = record.into();
let nested = process_nested(tx, query.nested, Some(&records)).await?;
let nested = process_nested(tx, query.nested, Some(&records), traceparent).await?;

Ok(RecordSelection {
name: query.name,
Expand Down Expand Up @@ -137,7 +137,7 @@ fn read_many_by_queries(
if records.records.is_empty() && query.options.contains(QueryOption::ThrowOnEmpty) {
record_not_found()
} else {
let nested: Vec<QueryResult> = process_nested(tx, query.nested, Some(&records)).await?;
let nested: Vec<QueryResult> = process_nested(tx, query.nested, Some(&records), traceparent).await?;

Ok(RecordSelection {
name: query.name,
Expand Down Expand Up @@ -230,7 +230,7 @@ fn read_related<'conn>(
.await?
};
let model = query.parent_field.related_model();
let nested: Vec<QueryResult> = process_nested(tx, query.nested, Some(&records)).await?;
let nested: Vec<QueryResult> = process_nested(tx, query.nested, Some(&records), traceparent).await?;

Ok(RecordSelection {
name: query.name,
Expand Down Expand Up @@ -274,6 +274,7 @@ pub(crate) fn process_nested<'conn>(
tx: &'conn mut dyn ConnectionLike,
nested: Vec<ReadQuery>,
parent_result: Option<&'conn ManyRecords>,
traceparent: Option<TraceParent>,
) -> BoxFuture<'conn, InterpretationResult<Vec<QueryResult>>> {
let fut = async move {
let results = if matches!(parent_result, Some(parent_records) if parent_records.records.is_empty()) {
Expand All @@ -284,7 +285,7 @@ pub(crate) fn process_nested<'conn>(
let mut nested_results = Vec::with_capacity(nested.len());

for query in nested {
let result = execute(tx, query, parent_result, None).await?;
let result = execute(tx, query, parent_result, traceparent).await?;
nested_results.push(result);
}

Expand Down
5 changes: 3 additions & 2 deletions query-engine/core/src/interpreter/query_interpreters/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ async fn create_many(
.create_records_returning(&q.model, q.args, q.skip_duplicates, selected_fields.fields, traceparent)
.await?;

let nested: Vec<QueryResult> = super::read::process_nested(tx, selected_fields.nested, Some(&records)).await?;
let nested: Vec<QueryResult> =
super::read::process_nested(tx, selected_fields.nested, Some(&records), traceparent).await?;

let selection = RecordSelection {
name: q.name,
Expand Down Expand Up @@ -149,7 +150,7 @@ async fn create_many_split_by_shape(
};

let nested: Vec<QueryResult> =
super::read::process_nested(tx, selected_fields.nested.clone(), Some(&records)).await?;
super::read::process_nested(tx, selected_fields.nested.clone(), Some(&records), traceparent).await?;

let selection = RecordSelection {
name: q.name,
Expand Down

0 comments on commit 3872ce4

Please sign in to comment.