diff --git a/libs/telemetry/src/helpers.rs b/libs/telemetry/src/helpers.rs index 4a332e86af63..966de90be551 100644 --- a/libs/telemetry/src/helpers.rs +++ b/libs/telemetry/src/helpers.rs @@ -49,8 +49,6 @@ impl TraceParent { } } - // TODO(aqrln): remove this method once the log capturing doesn't rely on trace IDs anymore - #[deprecated = "this must only be used to create an artificial traceparent for log capturing when tracing is disabled on the client"] pub fn new_random() -> Self { Self { trace_id: TraceId::from_bytes(rand::random()), diff --git a/query-engine/connector-test-kit-rs/query-engine-tests/tests/new/logs.rs b/query-engine/connector-test-kit-rs/query-engine-tests/tests/new/logs.rs new file mode 100644 index 000000000000..3de56c102c39 --- /dev/null +++ b/query-engine/connector-test-kit-rs/query-engine-tests/tests/new/logs.rs @@ -0,0 +1,198 @@ +use query_engine_tests::*; + +#[test_suite( + schema(schema), + exclude( + MongoDb, + Vitess("planetscale.js", "planetscale.js.wasm"), + Postgres("neon.js", "pg.js", "neon.js.wasm", "pg.js.wasm"), + Sqlite("libsql.js", "libsql.js.wasm", "cfd1", "react-native") + ) +)] +mod logs { + use indoc::indoc; + use query_core::executor::TraceParent; + + fn schema() -> String { + let schema = indoc! { + r#"model ModelA { + #id(id, Int, @id) + bs ModelB[] + } + + model ModelB { + #id(id, Int, @id) + str1 String + str2 String? + str3 String? @default("SOME_DEFAULT") + a_id Int? + a ModelA? @relation(fields: [a_id], references: [id]) + }"# + }; + + schema.to_owned() + } + + #[connector_test] + async fn nested_read_logs_all_have_traceparent(mut runner: Runner) -> TestResult<()> { + let traceparent = TraceParent::new_random(); + + runner + .query_with_traceparent( + traceparent, + r#"{ + findManyModelA { + id + bs { id, str1 } + } + }"#, + ) + .await? + .assert_success(); + + assert_all_logs_contain_traceparents(&mut runner, traceparent).await + } + + #[connector_test] + async fn nested_create_logs_all_have_traceparent(mut runner: Runner) -> TestResult<()> { + let traceparent = TraceParent::new_random(); + runner + .query_with_traceparent( + traceparent, + r#"mutation { + createOneModelA(data: { + id: 1, + bs: { + createMany: { + data: [ + { id: 1, str1: "1", str2: "1", str3: "1"}, + { id: 2, str1: "2", str3: null}, + { id: 3, str1: "1"}, + ] + } + } + }) { + bs { id, str1 } + } + }"#, + ) + .await? + .assert_success(); + + assert_all_logs_contain_traceparents(&mut runner, traceparent).await + } + + #[connector_test] + async fn nested_update_logs_all_have_traceparent(mut runner: Runner) -> TestResult<()> { + let traceparent = TraceParent::new_random(); + runner + .query_with_traceparent( + traceparent, + r#"mutation { + createOneModelA(data: { + id: 1, + bs: { + create: { id: 1, str1: "1", str2: "1", str3: "1" } + } + }) { id } + }"#, + ) + .await? + .assert_success(); + + runner + .query_with_traceparent( + traceparent, + r#"mutation { + updateOneModelA( + where: { + id: 1 + } + data: { + bs: { + updateMany: { + where: { id: 1 } + data: { str1: { set: "updated" } } + } + } + } + ) { + bs { id, str1 } + } + }"#, + ) + .await? + .assert_success(); + + assert_all_logs_contain_traceparents(&mut runner, traceparent).await + } + + #[connector_test] + async fn nested_delete_in_update_logs_all_have_traceparent(mut runner: Runner) -> TestResult<()> { + let traceparent = TraceParent::new_random(); + runner + .query_with_traceparent( + traceparent, + r#"mutation { + createOneModelA(data: { + id: 1, + bs: { + create: { id: 1, str1: "1", str2: "1", str3: "1" } + } + }) { id } + }"#, + ) + .await? + .assert_success(); + + runner + .query_with_traceparent( + traceparent, + r#"mutation { + updateOneModelA( + where: { + id: 1 + } + data: { + bs: { + delete: { id: 1 } + } + } + ) { + bs { id, str1 } + } + }"#, + ) + .await? + .assert_success(); + + assert_all_logs_contain_traceparents(&mut runner, traceparent).await + } + + async fn assert_all_logs_contain_traceparents(runner: &mut Runner, traceparent: TraceParent) -> TestResult<()> { + let logs = runner.get_logs().await; + + let query_logs = logs + .iter() + .filter(|log| { + log.split_once("db.statement=").is_some_and(|(_, q)| { + !q.starts_with("BEGIN") && !q.starts_with("COMMIT") && !q.starts_with("SET TRANSACTION") + }) + }) + .collect::>(); + assert!(!query_logs.is_empty(), "expected db.statement logs in {logs:?}"); + + let expected_traceparent = format!("/* traceparent='{}' */", traceparent); + let matching = query_logs + .iter() + .filter(|log| log.contains(&expected_traceparent)) + .collect::>(); + + assert!( + !matching.is_empty() && matching.len() == query_logs.len(), + "expected all logs to contain traceparent, got {logs:?}" + ); + + Ok(()) + } +} diff --git a/query-engine/connector-test-kit-rs/query-engine-tests/tests/new/mod.rs b/query-engine/connector-test-kit-rs/query-engine-tests/tests/new/mod.rs index d25815d93fa7..b4e4baa89b15 100644 --- a/query-engine/connector-test-kit-rs/query-engine-tests/tests/new/mod.rs +++ b/query-engine/connector-test-kit-rs/query-engine-tests/tests/new/mod.rs @@ -3,6 +3,7 @@ mod create_many; mod cursor; mod disconnect; mod interactive_tx; +mod logs; mod metrics; mod multi_schema; mod native_types; diff --git a/query-engine/connector-test-kit-rs/query-tests-setup/src/runner/mod.rs b/query-engine/connector-test-kit-rs/query-tests-setup/src/runner/mod.rs index de8ee9bd33be..d1a1ba0f55a5 100644 --- a/query-engine/connector-test-kit-rs/query-tests-setup/src/runner/mod.rs +++ b/query-engine/connector-test-kit-rs/query-tests-setup/src/runner/mod.rs @@ -2,6 +2,7 @@ mod json_adapter; pub use json_adapter::*; use serde::{Deserialize, Serialize}; +use telemetry::helpers::TraceParent; use crate::{ executor_process_request, ConnectorTag, ConnectorVersion, QueryResult, TestError, TestLogCapture, TestResult, @@ -307,14 +308,27 @@ impl Runner { } pub async fn query(&self, query: impl Into) -> TestResult { - self.query_with_maybe_tx_id(self.current_tx_id.as_ref(), query).await + self.query_with_params(self.current_tx_id.as_ref(), None, query).await } pub async fn query_in_tx(&self, tx_id: &TxId, query: impl Into) -> TestResult { - self.query_with_maybe_tx_id(Some(tx_id), query).await + self.query_with_params(Some(tx_id), None, query).await } - async fn query_with_maybe_tx_id(&self, tx_id: Option<&TxId>, query: T) -> TestResult + pub async fn query_with_traceparent( + &self, + traceparent: TraceParent, + query: impl Into, + ) -> TestResult { + self.query_with_params(None, Some(traceparent), query).await + } + + async fn query_with_params( + &self, + tx_id: Option<&TxId>, + traceparent: Option, + query: T, + ) -> TestResult where T: Into, { @@ -361,7 +375,7 @@ impl Runner { } }; - let response = handler.handle(request_body, tx_id.cloned(), None).await; + let response = handler.handle(request_body, tx_id.cloned(), traceparent).await; let result: QueryResult = match self.protocol { EngineProtocol::Json => JsonResponse::from_graphql(response).into(), diff --git a/query-engine/core/src/executor/mod.rs b/query-engine/core/src/executor/mod.rs index c7846f7ff7cb..e871e8ffb046 100644 --- a/query-engine/core/src/executor/mod.rs +++ b/query-engine/core/src/executor/mod.rs @@ -14,7 +14,7 @@ mod request_context; pub use self::{execute_operation::*, interpreting_executor::InterpretingExecutor}; pub(crate) use request_context::*; -use telemetry::helpers::TraceParent; +pub use telemetry::helpers::TraceParent; use crate::{ protocol::EngineProtocol, query_document::Operation, response_ir::ResponseData, schema::QuerySchemaRef, diff --git a/query-engine/core/src/interpreter/query_interpreters/read.rs b/query-engine/core/src/interpreter/query_interpreters/read.rs index d79f4fd5c998..4111afcb993b 100644 --- a/query-engine/core/src/interpreter/query_interpreters/read.rs +++ b/query-engine/core/src/interpreter/query_interpreters/read.rs @@ -47,7 +47,7 @@ fn read_one( match record { Some(record) if query.relation_load_strategy.is_query() => { let records = record.into(); - let nested = process_nested(tx, query.nested, Some(&records)).await?; + let nested = process_nested(tx, query.nested, Some(&records), traceparent).await?; Ok(RecordSelection { name: query.name, @@ -137,7 +137,7 @@ fn read_many_by_queries( if records.records.is_empty() && query.options.contains(QueryOption::ThrowOnEmpty) { record_not_found() } else { - let nested: Vec = process_nested(tx, query.nested, Some(&records)).await?; + let nested: Vec = process_nested(tx, query.nested, Some(&records), traceparent).await?; Ok(RecordSelection { name: query.name, @@ -230,7 +230,7 @@ fn read_related<'conn>( .await? }; let model = query.parent_field.related_model(); - let nested: Vec = process_nested(tx, query.nested, Some(&records)).await?; + let nested: Vec = process_nested(tx, query.nested, Some(&records), traceparent).await?; Ok(RecordSelection { name: query.name, @@ -274,6 +274,7 @@ pub(crate) fn process_nested<'conn>( tx: &'conn mut dyn ConnectionLike, nested: Vec, parent_result: Option<&'conn ManyRecords>, + traceparent: Option, ) -> BoxFuture<'conn, InterpretationResult>> { let fut = async move { let results = if matches!(parent_result, Some(parent_records) if parent_records.records.is_empty()) { @@ -284,7 +285,7 @@ pub(crate) fn process_nested<'conn>( let mut nested_results = Vec::with_capacity(nested.len()); for query in nested { - let result = execute(tx, query, parent_result, None).await?; + let result = execute(tx, query, parent_result, traceparent).await?; nested_results.push(result); } diff --git a/query-engine/core/src/interpreter/query_interpreters/write.rs b/query-engine/core/src/interpreter/query_interpreters/write.rs index 453964369801..7421097af283 100644 --- a/query-engine/core/src/interpreter/query_interpreters/write.rs +++ b/query-engine/core/src/interpreter/query_interpreters/write.rs @@ -77,7 +77,8 @@ async fn create_many( .create_records_returning(&q.model, q.args, q.skip_duplicates, selected_fields.fields, traceparent) .await?; - let nested: Vec = super::read::process_nested(tx, selected_fields.nested, Some(&records)).await?; + let nested: Vec = + super::read::process_nested(tx, selected_fields.nested, Some(&records), traceparent).await?; let selection = RecordSelection { name: q.name, @@ -149,7 +150,7 @@ async fn create_many_split_by_shape( }; let nested: Vec = - super::read::process_nested(tx, selected_fields.nested.clone(), Some(&records)).await?; + super::read::process_nested(tx, selected_fields.nested.clone(), Some(&records), traceparent).await?; let selection = RecordSelection { name: q.name,