diff --git a/kernel/src/table_changes/mod.rs b/kernel/src/table_changes/mod.rs index b74f65b7a..dad2f4e9b 100644 --- a/kernel/src/table_changes/mod.rs +++ b/kernel/src/table_changes/mod.rs @@ -138,6 +138,14 @@ impl TableChanges { start_version: Version, end_version: Option, ) -> DeltaResult { + let log_root = table_root.join("_delta_log/")?; + let log_segment = LogSegment::for_table_changes( + engine.get_file_system_client().as_ref(), + log_root, + start_version, + end_version, + )?; + // Both snapshots ensure that reading is supported at the start and end version using // `ensure_read_supported`. Note that we must still verify that reading is // supported for every protocol action in the CDF range. @@ -173,14 +181,6 @@ impl TableChanges { ))); } - let log_root = table_root.join("_delta_log/")?; - let log_segment = LogSegment::for_table_changes( - engine.get_file_system_client().as_ref(), - log_root, - start_version, - end_version, - )?; - let schema = StructType::new( end_snapshot .schema() diff --git a/kernel/tests/cdf.rs b/kernel/tests/cdf.rs index 2be5324fc..2560dc71d 100644 --- a/kernel/tests/cdf.rs +++ b/kernel/tests/cdf.rs @@ -6,7 +6,7 @@ use delta_kernel::engine::sync::SyncEngine; use itertools::Itertools; use delta_kernel::engine::arrow_data::ArrowEngineData; -use delta_kernel::{DeltaResult, Table, Version}; +use delta_kernel::{DeltaResult, Error, ExpressionRef, Table, Version}; mod common; use common::{load_test_data, to_arrow}; @@ -15,6 +15,7 @@ fn read_cdf_for_table( test_name: impl AsRef, start_version: Version, end_version: impl Into>, + predicate: impl Into>, ) -> DeltaResult> { let test_dir = load_test_data("tests/data", test_name.as_ref()).unwrap(); let test_path = test_dir.path().join(test_name.as_ref()); @@ -34,6 +35,7 @@ fn read_cdf_for_table( let scan = table_changes .into_scan_builder() .with_schema(schema) + .with_predicate(predicate) .build()?; let batches: Vec = scan .execute(engine)? @@ -53,7 +55,7 @@ fn read_cdf_for_table( #[test] fn cdf_with_deletion_vector() -> Result<(), Box> { - let batches = read_cdf_for_table("cdf-table-with-dv", 0, None)?; + let batches = read_cdf_for_table("cdf-table-with-dv", 0, None, None)?; // Each commit performs the following: // 0. Insert 0..=9 // 1. Remove [0, 9] @@ -99,7 +101,7 @@ fn cdf_with_deletion_vector() -> Result<(), Box> { #[test] fn basic_cdf() -> Result<(), Box> { - let batches = read_cdf_for_table("cdf-table", 0, None)?; + let batches = read_cdf_for_table("cdf-table", 0, None, None)?; let mut expected = vec![ "+----+--------+------------+------------------+-----------------+", "| id | name | birthday | _change_type | _commit_version |", @@ -136,7 +138,7 @@ fn basic_cdf() -> Result<(), Box> { #[test] fn cdf_non_partitioned() -> Result<(), Box> { - let batches = read_cdf_for_table("cdf-table-non-partitioned", 0, None)?; + let batches = read_cdf_for_table("cdf-table-non-partitioned", 0, None, None)?; let mut expected = vec![ "+----+--------+------------+-------------------+---------------+--------------+----------------+------------------+-----------------+", "| id | name | birthday | long_field | boolean_field | double_field | smallint_field | _change_type | _commit_version |", @@ -175,7 +177,7 @@ fn cdf_non_partitioned() -> Result<(), Box> { #[test] fn cdf_with_cdc_and_dvs() -> Result<(), Box> { - let batches = read_cdf_for_table("cdf-table-with-cdc-and-dvs", 0, None)?; + let batches = read_cdf_for_table("cdf-table-with-cdc-and-dvs", 0, None, None)?; let mut expected = vec![ "+----+--------------------+------------------+-----------------+", "| id | comment | _change_type | _commit_version |", @@ -229,3 +231,312 @@ fn cdf_with_cdc_and_dvs() -> Result<(), Box> { assert_batches_sorted_eq!(expected, &batches); Ok(()) } + +#[test] +fn simple_cdf_version_ranges() -> DeltaResult<()> { + let batches = read_cdf_for_table("cdf-table-simple", 0, 0, None)?; + let mut expected = vec![ + "+----+--------------+-----------------+", + "| id | _change_type | _commit_version |", + "+----+--------------+-----------------+", + "| 0 | insert | 0 |", + "| 1 | insert | 0 |", + "| 2 | insert | 0 |", + "| 3 | insert | 0 |", + "| 4 | insert | 0 |", + "| 5 | insert | 0 |", + "| 6 | insert | 0 |", + "| 7 | insert | 0 |", + "| 8 | insert | 0 |", + "| 9 | insert | 0 |", + "+----+--------------+-----------------+", + ]; + sort_lines!(expected); + assert_batches_sorted_eq!(expected, &batches); + + let batches = read_cdf_for_table("cdf-table-simple", 1, 1, None)?; + let mut expected = vec![ + "+----+--------------+-----------------+", + "| id | _change_type | _commit_version |", + "+----+--------------+-----------------+", + "| 0 | delete | 1 |", + "| 1 | delete | 1 |", + "| 2 | delete | 1 |", + "| 3 | delete | 1 |", + "| 4 | delete | 1 |", + "| 5 | delete | 1 |", + "| 6 | delete | 1 |", + "| 7 | delete | 1 |", + "| 8 | delete | 1 |", + "| 9 | delete | 1 |", + "+----+--------------+-----------------+", + ]; + sort_lines!(expected); + assert_batches_sorted_eq!(expected, &batches); + + let batches = read_cdf_for_table("cdf-table-simple", 2, 2, None)?; + let mut expected = vec![ + "+----+--------------+-----------------+", + "| id | _change_type | _commit_version |", + "+----+--------------+-----------------+", + "| 20 | insert | 2 |", + "| 21 | insert | 2 |", + "| 22 | insert | 2 |", + "| 23 | insert | 2 |", + "| 24 | insert | 2 |", + "+----+--------------+-----------------+", + ]; + sort_lines!(expected); + assert_batches_sorted_eq!(expected, &batches); + + let batches = read_cdf_for_table("cdf-table-simple", 0, 2, None)?; + let mut expected = vec![ + "+----+--------------+-----------------+", + "| id | _change_type | _commit_version |", + "+----+--------------+-----------------+", + "| 0 | insert | 0 |", + "| 1 | insert | 0 |", + "| 2 | insert | 0 |", + "| 3 | insert | 0 |", + "| 4 | insert | 0 |", + "| 5 | insert | 0 |", + "| 6 | insert | 0 |", + "| 7 | insert | 0 |", + "| 8 | insert | 0 |", + "| 9 | insert | 0 |", + "| 0 | delete | 1 |", + "| 1 | delete | 1 |", + "| 2 | delete | 1 |", + "| 3 | delete | 1 |", + "| 4 | delete | 1 |", + "| 5 | delete | 1 |", + "| 6 | delete | 1 |", + "| 7 | delete | 1 |", + "| 8 | delete | 1 |", + "| 9 | delete | 1 |", + "| 20 | insert | 2 |", + "| 21 | insert | 2 |", + "| 22 | insert | 2 |", + "| 23 | insert | 2 |", + "| 24 | insert | 2 |", + "+----+--------------+-----------------+", + ]; + sort_lines!(expected); + assert_batches_sorted_eq!(expected, &batches); + Ok(()) +} + +#[test] +fn update_operations() -> DeltaResult<()> { + let batches = read_cdf_for_table("cdf-table-update-ops", 0, 2, None)?; + // Note: `update_pre` and `update_post` are technically not part of the delta spec, and instead + // should be `update_preimage` and `update_postimage` respectively. However, the tests in + // delta-spark use the post and pre. + let mut expected = vec![ + "+----+--------------+-----------------+", + "| id | _change_type | _commit_version |", + "+----+--------------+-----------------+", + "| 0 | insert | 0 |", + "| 1 | insert | 0 |", + "| 2 | insert | 0 |", + "| 3 | insert | 0 |", + "| 4 | insert | 0 |", + "| 5 | insert | 0 |", + "| 6 | insert | 0 |", + "| 7 | insert | 0 |", + "| 8 | insert | 0 |", + "| 9 | insert | 0 |", + "| 20 | update_pre | 1 |", + "| 21 | update_pre | 1 |", + "| 22 | update_pre | 1 |", + "| 23 | update_pre | 1 |", + "| 24 | update_pre | 1 |", + "| 30 | update_post | 2 |", + "| 31 | update_post | 2 |", + "| 32 | update_post | 2 |", + "| 33 | update_post | 2 |", + "| 34 | update_post | 2 |", + "+----+--------------+-----------------+", + ]; + sort_lines!(expected); + assert_batches_sorted_eq!(expected, &batches); + Ok(()) +} + +#[test] +fn false_data_change_is_ignored() -> DeltaResult<()> { + let batches = read_cdf_for_table("cdf-table-data-change", 0, 1, None)?; + let mut expected = vec![ + "+----+--------------+-----------------+", + "| id | _change_type | _commit_version |", + "+----+--------------+-----------------+", + "| 0 | insert | 0 |", + "| 1 | insert | 0 |", + "| 2 | insert | 0 |", + "| 3 | insert | 0 |", + "| 4 | insert | 0 |", + "| 5 | insert | 0 |", + "| 6 | insert | 0 |", + "| 7 | insert | 0 |", + "| 8 | insert | 0 |", + "| 9 | insert | 0 |", + "+----+--------------+-----------------+", + ]; + sort_lines!(expected); + assert_batches_sorted_eq!(expected, &batches); + Ok(()) +} + +#[test] +fn invalid_range_end_before_start() { + let res = read_cdf_for_table("cdf-table-simple", 1, 0, None); + let expected_msg = + "Failed to build LogSegment: start_version cannot be greater than end_version"; + assert!(matches!(res, Err(Error::Generic(msg)) if msg == expected_msg)); +} + +#[test] +fn invalid_range_start_after_last_version_of_table() { + let res = read_cdf_for_table("cdf-table-simple", 3, 4, None); + let expected_msg = "Expected the first commit to have version 3"; + assert!(matches!(res, Err(Error::Generic(msg)) if msg == expected_msg)); +} + +#[test] +fn partition_table() -> DeltaResult<()> { + let batches = read_cdf_for_table("cdf-table-partitioned", 0, 2, None)?; + let mut expected = vec![ + "+----+------+------+------------------+-----------------+", + "| id | text | part | _change_type | _commit_version |", + "+----+------+------+------------------+-----------------+", + "| 0 | old | 0 | insert | 0 |", + "| 1 | old | 1 | insert | 0 |", + "| 2 | old | 0 | insert | 0 |", + "| 3 | old | 1 | insert | 0 |", + "| 4 | old | 0 | insert | 0 |", + "| 5 | old | 1 | insert | 0 |", + "| 3 | old | 1 | delete | 1 |", + "| 1 | old | 1 | update_preimage | 1 |", + "| 1 | new | 1 | update_postimage | 1 |", + "| 0 | old | 0 | delete | 2 |", + "| 2 | old | 0 | delete | 2 |", + "| 4 | old | 0 | delete | 2 |", + "+----+------+------+------------------+-----------------+", + ]; + sort_lines!(expected); + assert_batches_sorted_eq!(expected, &batches); + Ok(()) +} + +#[test] +fn backtick_column_names() -> DeltaResult<()> { + let batches = read_cdf_for_table("cdf-table-backtick-column-names", 0, None, None)?; + let mut expected = vec![ + "+--------+----------+--------------------------+--------------+-----------------+", + "| id.num | id.num`s | struct_col | _change_type | _commit_version |", + "+--------+----------+--------------------------+--------------+-----------------+", + "| 2 | 10 | {field: 1, field.one: 2} | insert | 0 |", + "| 4 | 10 | {field: 1, field.one: 2} | insert | 0 |", + "| 1 | 10 | {field: 1, field.one: 2} | insert | 1 |", + "| 3 | 10 | {field: 1, field.one: 2} | insert | 1 |", + "| 5 | 10 | {field: 1, field.one: 2} | insert | 1 |", + "+--------+----------+--------------------------+--------------+-----------------+", + ]; + sort_lines!(expected); + assert_batches_sorted_eq!(expected, &batches); + Ok(()) +} + +#[test] +fn unconditional_delete() -> DeltaResult<()> { + let batches = read_cdf_for_table("cdf-table-delete-unconditional", 0, None, None)?; + let mut expected = vec![ + "+----+--------------+-----------------+", + "| id | _change_type | _commit_version |", + "+----+--------------+-----------------+", + "| 0 | insert | 0 |", + "| 1 | insert | 0 |", + "| 2 | insert | 0 |", + "| 3 | insert | 0 |", + "| 4 | insert | 0 |", + "| 5 | insert | 0 |", + "| 6 | insert | 0 |", + "| 7 | insert | 0 |", + "| 8 | insert | 0 |", + "| 9 | insert | 0 |", + "| 0 | delete | 1 |", + "| 1 | delete | 1 |", + "| 2 | delete | 1 |", + "| 3 | delete | 1 |", + "| 4 | delete | 1 |", + "| 5 | delete | 1 |", + "| 6 | delete | 1 |", + "| 7 | delete | 1 |", + "| 8 | delete | 1 |", + "| 9 | delete | 1 |", + "+----+--------------+-----------------+", + ]; + sort_lines!(expected); + assert_batches_sorted_eq!(expected, &batches); + Ok(()) +} + +#[test] +fn conditional_delete_all_rows() -> DeltaResult<()> { + let batches = read_cdf_for_table("cdf-table-delete-conditional-all-rows", 0, None, None)?; + let mut expected = vec![ + "+----+--------------+-----------------+", + "| id | _change_type | _commit_version |", + "+----+--------------+-----------------+", + "| 0 | insert | 0 |", + "| 1 | insert | 0 |", + "| 2 | insert | 0 |", + "| 3 | insert | 0 |", + "| 4 | insert | 0 |", + "| 5 | insert | 0 |", + "| 6 | insert | 0 |", + "| 7 | insert | 0 |", + "| 8 | insert | 0 |", + "| 9 | insert | 0 |", + "| 0 | delete | 1 |", + "| 1 | delete | 1 |", + "| 2 | delete | 1 |", + "| 3 | delete | 1 |", + "| 4 | delete | 1 |", + "| 5 | delete | 1 |", + "| 6 | delete | 1 |", + "| 7 | delete | 1 |", + "| 8 | delete | 1 |", + "| 9 | delete | 1 |", + "+----+--------------+-----------------+", + ]; + sort_lines!(expected); + assert_batches_sorted_eq!(expected, &batches); + Ok(()) +} + +#[test] +fn conditional_delete_two_rows() -> DeltaResult<()> { + let batches = read_cdf_for_table("cdf-table-delete-conditional-two-rows", 0, None, None)?; + let mut expected = vec![ + "+----+--------------+-----------------+", + "| id | _change_type | _commit_version |", + "+----+--------------+-----------------+", + "| 0 | insert | 0 |", + "| 1 | insert | 0 |", + "| 2 | insert | 0 |", + "| 3 | insert | 0 |", + "| 4 | insert | 0 |", + "| 5 | insert | 0 |", + "| 6 | insert | 0 |", + "| 7 | insert | 0 |", + "| 8 | insert | 0 |", + "| 9 | insert | 0 |", + "| 2 | delete | 1 |", + "| 8 | delete | 1 |", + "+----+--------------+-----------------+", + ]; + sort_lines!(expected); + assert_batches_sorted_eq!(expected, &batches); + Ok(()) +} diff --git a/kernel/tests/data/cdf-table-backtick-column-names.tar.zst b/kernel/tests/data/cdf-table-backtick-column-names.tar.zst new file mode 100644 index 000000000..59d0a28fa Binary files /dev/null and b/kernel/tests/data/cdf-table-backtick-column-names.tar.zst differ diff --git a/kernel/tests/data/cdf-table-data-change.tar.zst b/kernel/tests/data/cdf-table-data-change.tar.zst new file mode 100644 index 000000000..3c1f9a464 Binary files /dev/null and b/kernel/tests/data/cdf-table-data-change.tar.zst differ diff --git a/kernel/tests/data/cdf-table-delete-conditional-all-rows.tar.zst b/kernel/tests/data/cdf-table-delete-conditional-all-rows.tar.zst new file mode 100644 index 000000000..dbd28fad4 Binary files /dev/null and b/kernel/tests/data/cdf-table-delete-conditional-all-rows.tar.zst differ diff --git a/kernel/tests/data/cdf-table-delete-conditional-two-rows.tar.zst b/kernel/tests/data/cdf-table-delete-conditional-two-rows.tar.zst new file mode 100644 index 000000000..60896b969 Binary files /dev/null and b/kernel/tests/data/cdf-table-delete-conditional-two-rows.tar.zst differ diff --git a/kernel/tests/data/cdf-table-delete-unconditional.tar.zst b/kernel/tests/data/cdf-table-delete-unconditional.tar.zst new file mode 100644 index 000000000..eb4895fb7 Binary files /dev/null and b/kernel/tests/data/cdf-table-delete-unconditional.tar.zst differ diff --git a/kernel/tests/data/cdf-table-partitioned.tar.zst b/kernel/tests/data/cdf-table-partitioned.tar.zst new file mode 100644 index 000000000..4e17d537d Binary files /dev/null and b/kernel/tests/data/cdf-table-partitioned.tar.zst differ diff --git a/kernel/tests/data/cdf-table-simple.tar.zst b/kernel/tests/data/cdf-table-simple.tar.zst new file mode 100644 index 000000000..98440ef62 Binary files /dev/null and b/kernel/tests/data/cdf-table-simple.tar.zst differ diff --git a/kernel/tests/data/cdf-table-update-ops.tar.zst b/kernel/tests/data/cdf-table-update-ops.tar.zst new file mode 100644 index 000000000..1745d6aca Binary files /dev/null and b/kernel/tests/data/cdf-table-update-ops.tar.zst differ