Add table changes constructor #505

Merged
Changes from 11 commits
2 changes: 2 additions & 0 deletions ffi/src/lib.rs
@@ -383,6 +383,7 @@ pub enum KernelError {
FileAlreadyExists,
MissingCommitInfo,
UnsupportedError,
TableChangesDisabled,
}

impl From<Error> for KernelError {
@@ -435,6 +436,7 @@ impl From<Error> for KernelError {
Error::FileAlreadyExists(_) => KernelError::FileAlreadyExists,
Error::MissingCommitInfo => KernelError::MissingCommitInfo,
Error::Unsupported(_) => KernelError::UnsupportedError,
Error::TableChangesDisabled(_) => KernelError::TableChangesDisabled,
}
}
}
7 changes: 7 additions & 0 deletions kernel/src/error.rs
@@ -7,6 +7,7 @@ use std::{
};

use crate::schema::DataType;
use crate::Version;

/// A [`std::result::Result`] that has the kernel [`Error`] as the error variant
pub type DeltaResult<T, E = Error> = std::result::Result<T, E>;
@@ -179,6 +180,9 @@ pub enum Error {
/// Some functionality is currently unsupported
#[error("Unsupported: {0}")]
Unsupported(String),

#[error("Change data feed is unsupported for the table at version {0}")]
ChangeDataFeedUnsupported(Version),
}
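The `#[error("...")]` attribute on this enum comes from the thiserror derive, which generates a `Display` impl that interpolates the variant's payload. Hand-desugared, the behavior of the new variant looks roughly like the sketch below. This is illustrative only: `SketchError` is a made-up name, and `Version` is assumed to alias `u64` (as the `{integer}: Into<u64>` compiler error quoted later in this PR suggests).

```rust
use std::fmt;

// Illustrative sketch only; `Version` is assumed to be an alias for u64.
type Version = u64;

// Hand-written equivalent of what the thiserror derive generates for
// `#[error("Change data feed is unsupported for the table at version {0}")]`.
#[derive(Debug)]
enum SketchError {
    ChangeDataFeedUnsupported(Version),
}

impl fmt::Display for SketchError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            SketchError::ChangeDataFeedUnsupported(v) => {
                write!(f, "Change data feed is unsupported for the table at version {v}")
            }
        }
    }
}

fn main() {
    let err = SketchError::ChangeDataFeedUnsupported(3);
    assert_eq!(
        err.to_string(),
        "Change data feed is unsupported for the table at version 3"
    );
}
```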

// Convenience constructors for Error types that take a String argument
@@ -242,6 +246,9 @@ impl Error {
pub fn unsupported(msg: impl ToString) -> Self {
Self::Unsupported(msg.to_string())
}
pub fn change_data_feed_unsupported(version: impl Into<Version>) -> Self {
Self::ChangeDataFeedUnsupported(version.into())
}

// Capture a backtrace when the error is constructed.
#[must_use]
1 change: 1 addition & 0 deletions kernel/src/lib.rs
@@ -64,6 +64,7 @@ pub mod engine_data;
pub mod error;
pub mod expressions;
pub(crate) mod predicates;
pub mod table_changes;
pub mod table_features;

#[cfg(feature = "developer-visibility")]
2 changes: 1 addition & 1 deletion kernel/src/snapshot.rs
@@ -50,7 +50,7 @@ impl Snapshot {
///
/// # Parameters
///
/// - `location`: url pointing at the table root (where `_delta_log` folder is located)
/// - `table_root`: url pointing at the table root (where `_delta_log` folder is located)
/// - `engine`: Implementation of [`Engine`] apis.
/// - `version`: target version of the [`Snapshot`]
pub fn try_new(
18 changes: 18 additions & 0 deletions kernel/src/table.rs
@@ -8,6 +8,7 @@ use std::path::PathBuf;
use url::Url;

use crate::snapshot::Snapshot;
use crate::table_changes::TableChanges;
use crate::transaction::Transaction;
use crate::{DeltaResult, Engine, Error, Version};

@@ -80,6 +81,23 @@ impl Table {
Snapshot::try_new(self.location.clone(), engine, version)
}

/// Create a [`TableChanges`] to get a change data feed for the table between `start_version`
/// and `end_version`. If no `end_version` is supplied, the latest version will be used as the
/// `end_version`.
pub fn table_changes(
Collaborator: Don't we need to be able to specify:

  1. A schema? Does CDF always return the full table schema?
  2. A predicate? Can't you get a CDF with a predicate?

Collaborator (author): This is usually done in the Scan Builder. The plan is to specify the schema and predicate when building a TableChangesScan.

Collaborator: And I suppose there are no optimizations to be done here with that information? If yes, we likely would want to propagate that information here.

Collaborator: Since this is the main 'entrypoint' API for users to interact with reading CDF, can we add some docs on semantics and ideally examples too? Important things to call out: (1) require that CDF is enabled for the whole range, (2) require the same schema for the whole range (for now!), (3) how do I scan this thing?

(I'm probably forgetting other bits of semantics, so don't take the above as exhaustive.)

&self,
engine: &dyn Engine,
start_version: Version,
Collaborator: Why not impl Into<Version>?

Collaborator (author): I hadn't considered that. Though this change seems to cause issues:

154 |         let table_changes_res = table.table_changes(engine.as_ref(), 3, 4);
    |                                       -------------                  ^ the trait `From<i32>` is not implemented for `u64`, which is required by `{integer}: Into<u64>`

Integers are treated as i32 by default, and i32 can't be converted into u64. By using just start_version: Version, it seems that the compiler treats it as a u64 from the get-go.

Collaborator: Out of curiosity, what situation would produce an Into<Version> that is not already Version, such that we need the fancy arg passing?

Collaborator: Yeah, maybe my question should actually have been, "why not Option<Version> for end_version?" When do we need the Into there? I just erred on the side of flexibility.

Collaborator (author): Oh, using end_version: impl Into<Option<Version>> lets us pass in either Version or Option<Version> to the function. This is a trick I learned from @scovich. So the following are all legal:

table.table_changes(engine.as_ref(), 3, 4);
table.table_changes(engine.as_ref(), 3, Some(4));
table.table_changes(engine.as_ref(), 3, None);

Collaborator: Ahh cool, that's nice. Sorry for the long aside :)

end_version: impl Into<Option<Version>>,
) -> DeltaResult<TableChanges> {
TableChanges::try_new(
self.location.clone(),
engine,
start_version,
end_version.into(),
)
}
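The `impl Into<Option<Version>>` trick discussed in the review thread above can be shown in isolation. The sketch below is a standalone stand-in (the function name `resolve_end_version` is made up, and `Version` is assumed to alias `u64`); it demonstrates why all three call styles compile:

```rust
// Standalone sketch of the `impl Into<Option<Version>>` parameter trick.
// `Version` is assumed here to be an alias for u64.
type Version = u64;

// Stand-in for the end_version plumbing of `Table::table_changes`.
fn resolve_end_version(end_version: impl Into<Option<Version>>) -> Option<Version> {
    end_version.into()
}

fn main() {
    // A bare Version works via the blanket `impl From<T> for Option<T>`;
    // that single applicable impl also pins the integer literal to u64.
    assert_eq!(resolve_end_version(4), Some(4));
    // An Option<Version> works via the identity `From` impl.
    assert_eq!(resolve_end_version(Some(4)), Some(4));
    // `None` is the "default to the latest table version" case in the real API.
    assert_eq!(resolve_end_version(None), None);
}
```

Because only one `Into<Option<u64>>` impl applies to an integer literal, inference pins the literal to `u64`; with `impl Into<Version>` there are many candidate integer impls, so the literal falls back to `i32` and fails, which matches the compiler error quoted above.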

/// Create a new write transaction for this table.
pub fn new_transaction(&self, engine: &dyn Engine) -> DeltaResult<Transaction> {
Transaction::try_new(self.snapshot(engine, None)?)
209 changes: 209 additions & 0 deletions kernel/src/table_changes/mod.rs
@@ -0,0 +1,209 @@
//! Provides an API to read the table's change data feed between two versions.
use std::sync::LazyLock;

use url::Url;

use crate::log_segment::LogSegment;
use crate::path::AsUrl;
use crate::schema::{DataType, Schema, StructField, StructType};
use crate::snapshot::Snapshot;
use crate::table_features::ColumnMappingMode;
use crate::{DeltaResult, Engine, Error, Version};

static CDF_FIELDS: LazyLock<[StructField; 3]> = LazyLock::new(|| {
[
StructField::new("_change_type", DataType::STRING, false),
StructField::new("_commit_version", DataType::LONG, false),
StructField::new("_commit_timestamp", DataType::TIMESTAMP, false),
Comment on lines +15 to +17

Collaborator: Side note: delta-spark does some hand-wringing about table schemas that already provide columns with these names. I think the solution was to block enabling CDF on such tables, and to block creating columns with those names on tables that are already CDF-enabled? (Both are writer issues, not reader, but it's probably worth tracking.)

Collaborator (author): Tracked in #524. Thanks!

]
});
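Later in `try_new`, the logical CDF schema is built by chaining the table's columns with these three generated columns. As a minimal illustration of that shape (plain strings stand in for the kernel's `StructField`, and the function name is made up for this sketch):

```rust
// Sketch only: plain strings stand in for the kernel's StructField values.
const CDF_FIELD_NAMES: [&str; 3] = ["_change_type", "_commit_version", "_commit_timestamp"];

// Mirrors the chaining done in try_new: table columns first, then the
// three generated CDF columns appended at the end.
fn cdf_column_names(table_columns: &[&str]) -> Vec<String> {
    table_columns
        .iter()
        .map(|c| c.to_string())
        .chain(CDF_FIELD_NAMES.iter().map(|c| c.to_string()))
        .collect()
}

fn main() {
    let names = cdf_column_names(&["part", "id"]);
    assert_eq!(
        names,
        ["part", "id", "_change_type", "_commit_version", "_commit_timestamp"]
    );
}
```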

/// Represents a call to read the Change Data Feed between two versions of a table. The schema of
/// `TableChanges` will be the schema of the table with three additional columns:
/// - `_change_type`: String representing the type of change for that commit. This may be one
/// of `delete`, `insert`, `update_preimage`, or `update_postimage`.
/// - `_commit_version`: Long representing the commit the change occurred in.
/// - `_commit_timestamp`: Time at which the commit occurred. If in-commit timestamps are enabled,
/// this is retrieved from the [`CommitInfo`] action. Otherwise, the timestamp is the same as the
/// commit file's modification timestamp.
///
/// For more details, see the following sections of the protocol:
/// - [Add CDC File](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#add-cdc-file)
/// - [Change Data Files](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#change-data-files).
///
/// [`CommitInfo`]: crate::actions::CommitInfo
#[derive(Debug)]
pub struct TableChanges {
pub log_segment: LogSegment,
table_root: Url,
end_snapshot: Snapshot,
start_version: Version,
Collaborator: Just checking, we won't have to re-find the start snapshot when we actually go to return results, right?

Collaborator (author): No, we don't. Henceforth all we are interested in are the commits in the LogSegment and the schema we got from the end_snapshot.

schema: Schema,
}

impl TableChanges {
/// Creates a new [`TableChanges`] instance for the given version range. This function checks
/// these two properties:
/// - The change data feed table feature must be enabled in both the start and end versions.
/// - The schemas at the start and end versions are the same.
///
/// Note that this does not check that change data feed is enabled for every commit in the
/// range. It also does not check that the schema remains the same for the entire range.
///
/// # Parameters
/// - `table_root`: url pointing at the table root (where `_delta_log` folder is located)
/// - `engine`: Implementation of [`Engine`] apis.
/// - `start_version`: The start version of the change data feed
/// - `end_version`: The end version (inclusive) of the change data feed. If this is none, this
/// defaults to the newest table version.
pub fn try_new(
table_root: Url,
engine: &dyn Engine,
start_version: Version,
end_version: Option<Version>,
) -> DeltaResult<Self> {
// Both snapshots ensure that reading is supported at the start and end version using
// [`Protocol::ensure_read_supported`]. Note that we must still verify that reading is
// supported for every intermediary protocol action.
// [`Protocol`]: crate::actions::Protocol
let start_snapshot =
Snapshot::try_new(table_root.as_url().clone(), engine, Some(start_version))?;
let end_snapshot = Snapshot::try_new(table_root.as_url().clone(), engine, end_version)?;
Comment on lines +88 to +90

Collaborator: I think we need to do protocol.ensure_read_supported()? And perhaps we should do that in Snapshot::try_new(), and we could just have a comment here that says we rely on the snapshot to check?

Collaborator: Related: #518

Comment on lines +88 to +90

Collaborator: Aside: relating to the optimization we've discussed a few times -- the end snapshot should be able to use the first snapshot as a starting point for listing (and P&M), if the two versions aren't too far apart?

Collaborator (author): Yeah, I would like that to be the case eventually.

Collaborator: Added a note to #489.

// Verify CDF is enabled at the beginning and end of the interval. We must still check every
// intermediary metadata action in the range.
let is_cdf_enabled = |snapshot: &Snapshot| {
static ENABLE_CDF_FLAG: &str = "delta.enableChangeDataFeed";
snapshot
.metadata()
.configuration
.get(ENABLE_CDF_FLAG)
.is_some_and(|val| val == "true")
Collaborator: I guess this will simplify once #453 merges?

Collaborator (author): Yep!

};
if !is_cdf_enabled(&start_snapshot) {
Collaborator: I think we need to check at every point along the way, not just start and end.

Collaborator (author): True, we could leave that till later. I was hoping to do some checking at this stage since we can catch that error earlier. Should I just remove this check?

Collaborator: No, I think this is okay, as long as we somehow check at each point. I think if it's cheap to error early we should.

Collaborator (author): No extra cost, since we needed to get both snapshots anyway.

Collaborator: Can we add comments here for clarity? (e.g. "we check start/end to fail early if needed, but at each point yielding CDF batches we do a schema compat check" or something like that)

Collaborator (author): Added comments addressing the enable CDF flag, schema, and protocol.

return Err(Error::change_data_feed_unsupported(start_version));
} else if !is_cdf_enabled(&end_snapshot) {
return Err(Error::change_data_feed_unsupported(end_snapshot.version()));
}
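The flag check above boils down to a string lookup in the table configuration. A self-contained sketch of the same logic (a plain `HashMap` stands in for `snapshot.metadata().configuration`, which is an assumption of this sketch):

```rust
use std::collections::HashMap;

// Sketch of the `delta.enableChangeDataFeed` check; the HashMap stands in
// for the snapshot's metadata configuration map.
fn is_cdf_enabled(configuration: &HashMap<String, String>) -> bool {
    configuration
        .get("delta.enableChangeDataFeed")
        .is_some_and(|v| v == "true")
}

fn main() {
    let mut config = HashMap::new();
    // An absent flag means CDF is disabled.
    assert!(!is_cdf_enabled(&config));
    config.insert("delta.enableChangeDataFeed".into(), "false".into());
    assert!(!is_cdf_enabled(&config));
    // Only the exact string "true" enables CDF.
    config.insert("delta.enableChangeDataFeed".into(), "true".into());
    assert!(is_cdf_enabled(&config));
}
```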

if start_snapshot.schema() != end_snapshot.schema() {
Collaborator: Aside: technically, users can stuff whatever metadata they want into the schema fields; should we track an item to ignore unknown metadata entries when comparing schemas?

Collaborator (author): Yeah, we could do that. I think schema checking needs to be changed a lot anyway to support at least adding columns and changing nullability, so there's more work to do in that department.

Collaborator (author), Nov 22, 2024: Added a link to the issue tracking schema compatibility checks: #523

return Err(Error::generic(format!(
Collaborator: Could create a schema error or a CDF error (maybe take as a follow-up, just wanted to call it out).

Collaborator (author): Yeah, I'm leaving this to another PR because the error can't hold a StructType since it's too big (192 bytes). There are other options, but I'll leave it to another PR.

"Failed to build TableChanges: Start and end version schemas are different. Found start version schema {:?} and end version schema {:?}", start_snapshot.schema(), end_snapshot.schema(),
)));
}

let log_root = table_root.join("_delta_log/")?;
let log_segment = LogSegment::for_table_changes(
engine.get_file_system_client().as_ref(),
log_root,
start_version,
end_version,
)?;

let schema = StructType::new(
end_snapshot
.schema()
.fields()
.cloned()
.chain(CDF_FIELDS.clone()),
Collaborator: The CDF fields are generated columns, right? (Not read directly from parquet?)

Collaborator (author): Yes, correct. I was planning on making them a special case only in CDF code. If you feel that we can legitimately treat these as generated columns, we could add a new column type ColumnType::GeneratedColumn.
);

Ok(TableChanges {
table_root,
end_snapshot,
log_segment,
start_version,
schema,
})
}

/// The start version of the `TableChanges`.
pub fn start_version(&self) -> Version {
self.start_version
}
/// The end version (inclusive) of the [`TableChanges`]. If no `end_version` was specified in
/// [`TableChanges::try_new`], this returns the newest version as of the call to `try_new`.
Collaborator:

Suggested change:
- /// [`TableChanges::try_new`], this returns the newest version as of the call to `try_new`.
+ /// [`TableChanges::try_new`], this returns the newest version as of the call to [`try_new`].

Collaborator (author): This causes issues with cargo doc, and I think it's a lot of visual clutter to repeat the full [`TableChanges::try_new`].

pub fn end_version(&self) -> Version {
self.log_segment.end_version
}
/// The logical schema of the change data feed. For details on the shape of the schema, see
/// [`TableChanges`].
pub fn schema(&self) -> &Schema {
&self.schema
}
/// Path to the root of the table that is being read.
pub fn table_root(&self) -> &Url {
&self.table_root
}
/// The partition columns that will be read.
#[allow(unused)]
pub(crate) fn partition_columns(&self) -> &Vec<String> {
&self.end_snapshot.metadata().partition_columns
}
/// The column mapping mode at the end schema.
#[allow(unused)]
pub(crate) fn column_mapping_mode(&self) -> &ColumnMappingMode {
&self.end_snapshot.column_mapping_mode
}
}

#[cfg(test)]
mod tests {
use crate::engine::sync::SyncEngine;
use crate::schema::DataType;
use crate::schema::StructField;
use crate::table_changes::CDF_FIELDS;
use crate::Error;
use crate::Table;
use itertools::assert_equal;

#[test]
fn table_changes_checks_enable_cdf_flag() {
// Table with CDF enabled, then disabled at 2 and enabled at 3
let path = "./tests/data/table-with-cdf";
let engine = Box::new(SyncEngine::new());
let table = Table::try_from_uri(path).unwrap();

let valid_ranges = [(0, 1), (0, 0), (1, 1)];
for (start_version, end_version) in valid_ranges {
let table_changes = table
.table_changes(engine.as_ref(), start_version, end_version)
.unwrap();
assert_eq!(table_changes.start_version, start_version);
assert_eq!(table_changes.end_version(), end_version);
}

let invalid_ranges = [(0, 2), (1, 2), (2, 2), (2, 3)];
for (start_version, end_version) in invalid_ranges {
let res = table.table_changes(engine.as_ref(), start_version, end_version);
assert!(matches!(res, Err(Error::ChangeDataFeedUnsupported(_))))
}
}
#[test]
fn schema_evolution_fails() {
let path = "./tests/data/table-with-cdf";
let engine = Box::new(SyncEngine::new());
let table = Table::try_from_uri(path).unwrap();
let expected_msg = "Failed to build TableChanges: Start and end version schemas are different. Found start version schema StructType { type_name: \"struct\", fields: {\"part\": StructField { name: \"part\", data_type: Primitive(Integer), nullable: true, metadata: {} }, \"id\": StructField { name: \"id\", data_type: Primitive(Integer), nullable: true, metadata: {} }} } and end version schema StructType { type_name: \"struct\", fields: {\"part\": StructField { name: \"part\", data_type: Primitive(Integer), nullable: true, metadata: {} }, \"id\": StructField { name: \"id\", data_type: Primitive(Integer), nullable: false, metadata: {} }} }";

// A field in the schema goes from being nullable to non-nullable
let table_changes_res = table.table_changes(engine.as_ref(), 3, 4);
assert!(matches!(table_changes_res, Err(Error::Generic(msg)) if msg == expected_msg));
}

#[test]
fn table_changes_has_cdf_schema() {
let path = "./tests/data/table-with-cdf";
let engine = Box::new(SyncEngine::new());
let table = Table::try_from_uri(path).unwrap();
let expected_schema = [
StructField::new("part", DataType::INTEGER, true),
StructField::new("id", DataType::INTEGER, true),
]
.into_iter()
.chain(CDF_FIELDS.clone());

let table_changes = table.table_changes(engine.as_ref(), 0, 0).unwrap();
assert_equal(expected_schema, table_changes.schema().fields().cloned());
}
}
@@ -0,0 +1,4 @@
{"commitInfo":{"timestamp":1704392842074,"operation":"Manual Update","operationParameters":{},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Apache-Spark/3.5.0 Delta-Lake/3.1.0-SNAPSHOT","txnId":"95ec924a-6859-4433-8008-6d6b4a0e3ba5"}}
{"protocol":{"minReaderVersion":3,"minWriterVersion":7, "readerFeatures":[], "writerFeatures":[]}}
{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"part\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableChangeDataFeed": "true"}}}
{"add":{"path":"fake/path/1","partitionValues":{},"size":1,"modificationTime":1,"dataChange":true}}
@@ -0,0 +1,3 @@
{"commitInfo":{"timestamp":1704392846030,"operation":"Manual Update","operationParameters":{},"readVersion":0,"isolationLevel":"Serializable","isBlindAppend":false,"operationMetrics":{},"engineInfo":"Apache-Spark/3.5.0 Delta-Lake/3.1.0-SNAPSHOT","txnId":"01d40235-c8b4-4f8e-8f19-8c97872217fd"}}
{"cdc":{"path":"fake/path/2","partitionValues":{"partition_foo":"partition_bar"},"size":1,"tags":{"tag_foo":"tag_bar"},"dataChange":false}}
{"remove":{"path":"fake/path/1","deletionTimestamp":100,"dataChange":true}}
@@ -0,0 +1,4 @@
{"commitInfo":{"timestamp":1704392846603,"operation":"Manual Update","operationParameters":{},"readVersion":1,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Apache-Spark/3.5.0 Delta-Lake/3.1.0-SNAPSHOT","txnId":"6cef7579-ca93-4427-988e-9269e8db50c7"}}
{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"part\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableChangeDataFeed": "false"}}}
{"remove":{"path":"fake/path/1","deletionTimestamp":1704392846603,"dataChange":true,"extendedFileMetadata":true,"partitionValues":{},"size":635,"tags":{"INSERTION_TIME":"1677811178336000","MIN_INSERTION_TIME":"1677811178336000","MAX_INSERTION_TIME":"1677811178336000","OPTIMIZE_TARGET_SIZE":"268435456"}}}
{"txn":{"appId":"fakeAppId","version":3,"lastUpdated":200}}
@@ -0,0 +1,3 @@
{"commitInfo":{"timestamp":1704392846603,"operation":"Manual Update","operationParameters":{},"readVersion":1,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Apache-Spark/3.5.0 Delta-Lake/3.1.0-SNAPSHOT","txnId":"6cef7579-ca93-4427-988e-9269e8db50c7"}}
{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"part\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableChangeDataFeed": "true"}}}
{"txn":{"appId":"fakeAppId","version":3,"lastUpdated":200}}
@@ -0,0 +1,3 @@
{"commitInfo":{"timestamp":1704392846603,"operation":"Manual Update","operationParameters":{},"readVersion":1,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Apache-Spark/3.5.0 Delta-Lake/3.1.0-SNAPSHOT","txnId":"6cef7579-ca93-4427-988e-9269e8db50c7"}}
{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"part\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"id\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableChangeDataFeed": "true"}}}
{"txn":{"appId":"fakeAppId","version":3,"lastUpdated":200}}