Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

parse table metadata.configuration as TableProperties #453

Merged
merged 27 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
ba8309e
checkpoint with serde but think i need to change that
zachschuermann Nov 1, 2024
9d4b599
rough draft serde for table props
zachschuermann Nov 5, 2024
1b7b193
make everything optional
zachschuermann Nov 5, 2024
02d50ee
errors, comments, cleanup
zachschuermann Nov 8, 2024
e4676d6
fix
zachschuermann Nov 8, 2024
9f8afa4
use new col name list parsing
zachschuermann Nov 8, 2024
ed2c10a
Merge remote-tracking branch 'upstream/main' into table-properties
zachschuermann Nov 13, 2024
42e6028
docs
zachschuermann Nov 18, 2024
f1b9a16
Merge remote-tracking branch 'upstream/main' into table-properties
zachschuermann Nov 18, 2024
82370b4
remove derive
zachschuermann Nov 18, 2024
00b9d8e
make deserializer work on hashmap ref
zachschuermann Nov 18, 2024
f748f87
fix column mapping mode check
zachschuermann Nov 19, 2024
af08092
testing, errors, docs, cleanup
zachschuermann Nov 19, 2024
4587794
cleanup
zachschuermann Nov 19, 2024
1e7d286
fix skipping dat test
zachschuermann Nov 20, 2024
bd9ac7a
address feedback, cleanup
zachschuermann Nov 21, 2024
fa48054
Merge branch 'main' into table-properties
zachschuermann Nov 21, 2024
ff78623
remove unused const
zachschuermann Nov 22, 2024
b667a15
no more serde
zachschuermann Nov 22, 2024
b3cdc61
cleanup
zachschuermann Nov 22, 2024
a891b52
Merge remote-tracking branch 'upstream/main' into table-properties
zachschuermann Nov 22, 2024
d8a2933
add back col mapping mode fn
zachschuermann Nov 22, 2024
d1ce73d
address ryan review
zachschuermann Nov 23, 2024
d8af98c
Merge branch 'main' into table-properties
zachschuermann Nov 25, 2024
6d1b466
use NonZero<u64>
zachschuermann Nov 25, 2024
f18b885
Merge remote-tracking branch 'refs/remotes/origin/table-properties' i…
zachschuermann Nov 25, 2024
437b8db
clippy
zachschuermann Nov 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion acceptance/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ pub fn sort_record_batch(batch: RecordBatch) -> DeltaResult<RecordBatch> {
Ok(RecordBatch::try_new(batch.schema(), columns)?)
}

static SKIPPED_TESTS: &[&str; 0] = &[];
// TODO(zach): skip iceberg_compat_v1 test until DAT is fixed
static SKIPPED_TESTS: &[&str; 1] = &["iceberg_compat_v1"];

// Ensure that two schema have the same field names, and dict_id/ordering.
// We ignore:
Expand Down
2 changes: 2 additions & 0 deletions ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,7 @@ pub enum KernelError {
FileAlreadyExists,
MissingCommitInfo,
UnsupportedError,
ParseIntervalError,
ChangeDataFeedUnsupported,
}

Expand Down Expand Up @@ -436,6 +437,7 @@ impl From<Error> for KernelError {
Error::FileAlreadyExists(_) => KernelError::FileAlreadyExists,
Error::MissingCommitInfo => KernelError::MissingCommitInfo,
Error::Unsupported(_) => KernelError::UnsupportedError,
Error::ParseIntervalError(_) => KernelError::ParseIntervalError,
Error::ChangeDataFeedUnsupported(_) => KernelError::ChangeDataFeedUnsupported,
}
}
Expand Down
10 changes: 9 additions & 1 deletion kernel/src/actions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::schema::{SchemaRef, StructType};
use crate::table_features::{
ReaderFeatures, WriterFeatures, SUPPORTED_READER_FEATURES, SUPPORTED_WRITER_FEATURES,
};
use crate::table_properties::TableProperties;
use crate::utils::require;
use crate::{DeltaResult, EngineData, Error, RowVisitor as _};
use visitors::{MetadataVisitor, ProtocolVisitor};
Expand Down Expand Up @@ -116,7 +117,7 @@ pub struct Metadata {
pub partition_columns: Vec<String>,
/// The time when this metadata action is created, in milliseconds since the Unix epoch
pub created_time: Option<i64>,
/// Configuration options for the metadata action
/// Configuration options for the metadata action. These are parsed into [`TableProperties`].
pub configuration: HashMap<String, String>,
}

Expand All @@ -130,6 +131,13 @@ impl Metadata {
pub fn schema(&self) -> DeltaResult<StructType> {
Ok(serde_json::from_str(&self.schema_string)?)
}

/// Parse the metadata configuration HashMap<String, String> into a TableProperties struct.
/// Note that parsing is infallible -- any items that fail to parse are simply propagated
/// through to the `TableProperties.unknown_properties` field.
pub fn parse_table_properties(&self) -> TableProperties {
TableProperties::from(self.configuration.iter())
}
}

#[derive(Default, Debug, Clone, PartialEq, Eq, Schema, Serialize, Deserialize)]
Expand Down
5 changes: 5 additions & 0 deletions kernel/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::{
};

use crate::schema::DataType;
use crate::table_properties::ParseIntervalError;
use crate::Version;

/// A [`std::result::Result`] that has the kernel [`Error`] as the error variant
Expand Down Expand Up @@ -181,6 +182,10 @@ pub enum Error {
#[error("Unsupported: {0}")]
Unsupported(String),

/// Parsing error when attempting to deserialize an interval
#[error(transparent)]
ParseIntervalError(#[from] ParseIntervalError),

#[error("Change data feed is unsupported for the table at version {0}")]
ChangeDataFeedUnsupported(Version),
}
Expand Down
17 changes: 9 additions & 8 deletions kernel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,17 @@ pub mod actions;
pub mod engine_data;
pub mod error;
pub mod expressions;
pub(crate) mod predicates;
pub mod scan;
pub mod schema;
pub mod snapshot;
pub mod table;
pub mod table_changes;
pub mod table_features;
pub mod table_properties;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

only meaningful change is adding pub mod table_properties, other changes are shifting to colocate module declarations

pub mod transaction;

pub(crate) mod predicates;
pub(crate) mod utils;

#[cfg(feature = "developer-visibility")]
pub mod path;
Expand All @@ -77,13 +85,6 @@ pub mod log_segment;
#[cfg(not(feature = "developer-visibility"))]
pub(crate) mod log_segment;

pub mod scan;
pub mod schema;
pub mod snapshot;
pub mod table;
pub mod transaction;
pub(crate) mod utils;

pub use delta_kernel_derive;
pub use engine_data::{EngineData, RowVisitor};
pub use error::{DeltaResult, Error};
Expand Down
17 changes: 11 additions & 6 deletions kernel/src/snapshot.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
//! In-memory representation of snapshots of tables (snapshot is a table at given point in time, it
//! has schema etc.)
//!

use serde::{Deserialize, Serialize};
use std::sync::Arc;
Expand All @@ -11,7 +10,8 @@ use crate::actions::{Metadata, Protocol};
use crate::log_segment::LogSegment;
use crate::scan::ScanBuilder;
use crate::schema::Schema;
use crate::table_features::{ColumnMappingMode, COLUMN_MAPPING_MODE_KEY};
use crate::table_features::{column_mapping_mode, ColumnMappingMode};
use crate::table_properties::TableProperties;
use crate::{DeltaResult, Engine, Error, FileSystemClient, Version};

const LAST_CHECKPOINT_FILE_NAME: &str = "_last_checkpoint";
Expand All @@ -26,6 +26,7 @@ pub struct Snapshot {
metadata: Metadata,
protocol: Protocol,
schema: Schema,
table_properties: TableProperties,
pub(crate) column_mapping_mode: ColumnMappingMode,
}

Expand Down Expand Up @@ -82,16 +83,15 @@ impl Snapshot {
protocol.ensure_read_supported()?;

let schema = metadata.schema()?;
let column_mapping_mode = match metadata.configuration.get(COLUMN_MAPPING_MODE_KEY) {
Some(mode) if protocol.min_reader_version() >= 2 => mode.as_str().try_into(),
_ => Ok(ColumnMappingMode::None),
}?;
let table_properties = metadata.parse_table_properties();
let column_mapping_mode = column_mapping_mode(&protocol, &table_properties);
Ok(Self {
table_root: location,
log_segment,
metadata,
protocol,
schema,
table_properties,
column_mapping_mode,
})
}
Expand Down Expand Up @@ -126,6 +126,11 @@ impl Snapshot {
&self.protocol
}

/// Get the [`TableProperties`] for this [`Snapshot`].
pub fn table_properties(&self) -> &TableProperties {
&self.table_properties
}

/// Get the [column mapping
zachschuermann marked this conversation as resolved.
Show resolved Hide resolved
/// mode](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#column-mapping) at this
/// `Snapshot`s version.
Expand Down
89 changes: 55 additions & 34 deletions kernel/src/table_features/column_mapping.rs
Original file line number Diff line number Diff line change
@@ -1,58 +1,79 @@
//! Code to handle column mapping, including modes and schema transforms
use std::str::FromStr;
use super::ReaderFeatures;
use crate::actions::Protocol;
use crate::table_properties::TableProperties;

use serde::{Deserialize, Serialize};

use crate::{DeltaResult, Error};
use strum::EnumString;

/// Modes of column mapping a table can be in
#[derive(Serialize, Deserialize, Debug, Copy, Clone, PartialEq)]
#[derive(Debug, Default, EnumString, Serialize, Deserialize, Copy, Clone, PartialEq, Eq)]
#[strum(serialize_all = "camelCase")]
#[serde(rename_all = "camelCase")]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aside: Do these classes still need serde support?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unfortunately yea, since it is still a field in GlobalScanState (which derives Serialize/Deserialize)

pub enum ColumnMappingMode {
/// No column mapping is applied
None,
/// Columns are mapped by their field_id in parquet
Id,
/// Columns are mapped to a physical name
#[default]
Name,
}

// key to look in metadata.configuration for to get column mapping mode
pub(crate) const COLUMN_MAPPING_MODE_KEY: &str = "delta.columnMapping.mode";

impl TryFrom<&str> for ColumnMappingMode {
type Error = Error;

fn try_from(s: &str) -> DeltaResult<Self> {
match s.to_ascii_lowercase().as_str() {
"none" => Ok(Self::None),
"id" => Ok(Self::Id),
"name" => Ok(Self::Name),
_ => Err(Error::invalid_column_mapping_mode(s)),
/// Determine the column mapping mode for a table based on the [`Protocol`] and [`TableProperties`]
pub(crate) fn column_mapping_mode(
protocol: &Protocol,
table_properties: &TableProperties,
) -> ColumnMappingMode {
match table_properties.column_mapping_mode {
Some(mode) if protocol.min_reader_version() == 2 => mode,
Some(mode)
if protocol.min_reader_version() == 3
&& protocol.has_reader_feature(&ReaderFeatures::ColumnMapping) =>
{
mode
}
_ => ColumnMappingMode::None,
}
}

impl FromStr for ColumnMappingMode {
type Err = Error;
#[cfg(test)]
mod tests {
use super::*;
use std::collections::HashMap;

fn from_str(s: &str) -> Result<Self, Self::Err> {
s.try_into()
}
}
#[test]
fn test_column_mapping_mode() {
let table_properties: HashMap<_, _> =
[("delta.columnMapping.mode".to_string(), "id".to_string())]
.into_iter()
.collect();
let table_properties = TableProperties::from(table_properties.iter());

impl Default for ColumnMappingMode {
fn default() -> Self {
Self::None
}
}
let protocol = Protocol::try_new(2, 5, None::<Vec<String>>, None::<Vec<String>>).unwrap();
assert_eq!(
column_mapping_mode(&protocol, &table_properties),
ColumnMappingMode::Id
);

impl AsRef<str> for ColumnMappingMode {
fn as_ref(&self) -> &str {
match self {
Self::None => "none",
Self::Id => "id",
Self::Name => "name",
}
let empty_features = Some::<[String; 0]>([]);
let protocol =
Protocol::try_new(3, 7, empty_features.clone(), empty_features.clone()).unwrap();
assert_eq!(
column_mapping_mode(&protocol, &table_properties),
ColumnMappingMode::None
);

let protocol = Protocol::try_new(
3,
7,
Some([ReaderFeatures::DeletionVectors]),
empty_features,
)
.unwrap();
assert_eq!(
column_mapping_mode(&protocol, &table_properties),
ColumnMappingMode::None
);
}
}
5 changes: 2 additions & 3 deletions kernel/src/table_features/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@ use std::collections::HashSet;
use std::sync::LazyLock;

use serde::{Deserialize, Serialize};

pub use column_mapping::ColumnMappingMode;
pub(crate) use column_mapping::COLUMN_MAPPING_MODE_KEY;
use strum::{AsRefStr, Display as StrumDisplay, EnumString, VariantNames};

pub(crate) use column_mapping::column_mapping_mode;
pub use column_mapping::ColumnMappingMode;
mod column_mapping;

/// Reader features communicate capabilities that must be implemented in order to correctly read a
Expand Down
Loading
Loading