diff --git a/kernel/src/table_changes/log_replay.rs b/kernel/src/table_changes/log_replay.rs index cc0be3b14..993a89912 100644 --- a/kernel/src/table_changes/log_replay.rs +++ b/kernel/src/table_changes/log_replay.rs @@ -89,7 +89,7 @@ fn add_transform_expr() -> Expression { /// of time whether to filter out add/remove actions. /// - Constructs the remove deletion vector map from paths belonging to `remove` actions to the /// action's corresponding [`DvInfo`]. This map will be filtered to only contain paths that -/// exists in another `add` action _within the same commit_. We store the result in `remove_dvs`. +/// exists in another `add` action _within the same commit_. We store the result in `remove_dvs`. /// Deletion vector resolution affects whether a remove action is selected in the second /// phase, so we must perform it ahead of time in phase 1. /// - Ensure that reading is supported on any protocol updates. diff --git a/kernel/src/table_changes/log_replay/tests.rs b/kernel/src/table_changes/log_replay/tests.rs index 5409d5eb1..9953dd464 100644 --- a/kernel/src/table_changes/log_replay/tests.rs +++ b/kernel/src/table_changes/log_replay/tests.rs @@ -113,7 +113,8 @@ async fn cdf_not_enabled() { let res: DeltaResult> = table_changes_action_iter(engine, commits, get_schema().into(), None) - .and_then(|iter| iter.try_collect()); + .unwrap() + .try_collect(); assert!(matches!(res, Err(Error::ChangeDataFeedUnsupported(_)))); } @@ -143,7 +144,8 @@ async fn unsupported_reader_feature() { let res: DeltaResult> = table_changes_action_iter(engine, commits, get_schema().into(), None) - .and_then(|iter| iter.try_collect()); + .unwrap() + .try_collect(); assert!(matches!(res, Err(Error::ChangeDataFeedUnsupported(_)))); } @@ -173,7 +175,8 @@ async fn column_mapping_should_fail() { let res: DeltaResult> = table_changes_action_iter(engine, commits, get_schema().into(), None) - .and_then(|iter| iter.try_collect()); + .unwrap() + .try_collect(); assert!(matches!(res, Err(Error::ChangeDataFeedUnsupported(_)))); } @@ -203,7 +206,8 @@ async fn incompatible_schemas_fail() { let res: DeltaResult> = table_changes_action_iter(engine, commits, cdf_schema.into(), None) - .and_then(|iter| iter.try_collect()); + .unwrap() + .try_collect(); assert!(matches!( res, @@ -211,15 +215,6 @@ async fn incompatible_schemas_fail() { )); } - // The CDF schema has fields: `id: int` and `value: string`. - // This commit has schema with fields: `id: long`, `value: string` and `year: int` (non-nullable). - let schema = StructType::new([ - StructField::new("id", DataType::LONG, true), - StructField::new("value", DataType::STRING, true), - StructField::new("year", DataType::INTEGER, false), - ]); - assert_incompatible_schema(schema, get_schema()).await; - // The CDF schema has fields: `id: int` and `value: string`. // This commit has schema with fields: `id: long`, `value: string` and `year: int` (nullable). let schema = StructType::new([ @@ -580,7 +575,8 @@ async fn failing_protocol() { let res: DeltaResult> = table_changes_action_iter(engine, commits, get_schema().into(), None) - .and_then(|iter| iter.try_collect()); + .unwrap() + .try_collect(); assert!(res.is_err()); } diff --git a/kernel/src/table_changes/scan.rs b/kernel/src/table_changes/scan.rs index 67d72014b..87fd918f9 100644 --- a/kernel/src/table_changes/scan.rs +++ b/kernel/src/table_changes/scan.rs @@ -176,7 +176,7 @@ impl TableChangesScan { /// deletion vectors present in the commit. The engine data in each scan data is guaranteed /// to belong to the same commit. Several [`TableChangesScanData`] may belong to the same commit. #[allow(unused)] - pub(crate) fn scan_data( + fn scan_data( &self, engine: Arc, ) -> DeltaResult>> {