
Feat/deltalake s3 #78

Open · abduldjafar wants to merge 27 commits into main

Conversation

abduldjafar commented Dec 6, 2024

What Kind of Change Does This PR Introduce?

This PR adds support for using DeltaLake with an S3 backend, expanding its capabilities beyond the current local filesystem option.


Current Behavior

DeltaLake only supports the local filesystem as the backend storage.


New Behavior

DeltaLake now supports S3 as a backend. Users can choose between the local filesystem and S3 for storing their DeltaLake data.


Configuration Instructions

To use DeltaLake with an S3 backend, ensure your AWS configuration is set up as shown in the example below:

AWS Configuration Example
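A minimal sketch of such a configuration, assuming the standard AWS shared credentials and config files (all values below are placeholders):

```ini
# ~/.aws/credentials (placeholder values)
[default]
aws_access_key_id = YOUR_ACCESS_KEY_ID
aws_secret_access_key = YOUR_SECRET_ACCESS_KEY

# ~/.aws/config
[default]
region = us-east-1
# When testing against LocalStack, also set an endpoint_url, e.g.:
# endpoint_url = http://localhost:4566
```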

If you are using a service like LocalStack, add an endpoint_url entry to the config file, as shown in the example above.

abduldjafar reopened this Dec 10, 2024
abduldjafar (Author) commented:

Hi @imor, for the non-API tests, I already tested locally and they run well.

imor (Contributor) commented Dec 11, 2024

Hey @abduldjafar, I have committed a few fixes to your branch, please take a look. In particular the changes are:

  • Nit: fixed spacing and formatting in the Cargo.toml and .vscode/settings.json files.
  • Dependencies in Cargo.toml are sorted alphabetically.
  • Removed the maplit dependency. We add dependencies only when absolutely needed because they add extra compilation and security burden. A crate like maplit doesn't add much value; we can just write the code by hand and it's not much worse.
  • Removed _write_to_table as it wasn't called from anywhere.
  • The get_lsn_schema function has been shortened.
  • Error messages now start with lowercase letters.
  • The open_delta_table function was always discarding the error.

Please also double-check the data type mappings in postgres_to_delta and postgres_to_arrow, as some of them look wrong. E.g., in &Type::INT2 | &Type::INT4 | &Type::INT8 => ArrowDataType::Int32, all integer types are mapped to a 4-byte integer, which for smaller types wastes space and for larger types truncates the values.
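For reference, a width-preserving mapping would look roughly like this, a sketch using the same tokio-postgres Type and Arrow DataType identifiers that appear in the diff:

```rust
use tokio_postgres::types::Type;
use arrow::datatypes::DataType as ArrowDataType;

// Sketch: map each Postgres integer width to the Arrow integer of the same
// width instead of collapsing them all to Int32.
fn int_to_arrow(typ: &Type) -> ArrowDataType {
    match typ {
        &Type::INT2 => ArrowDataType::Int16,
        &Type::INT4 => ArrowDataType::Int32,
        &Type::INT8 => ArrowDataType::Int64,
        _ => unreachable!("only integer types are handled here"),
    }
}
```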

Also in cell_to_arrow why are bytes converted to a string?

```rust
Cell::Bytes(value) => {
    let data = std::str::from_utf8(value)
        .map_err(|e| format!("Failed to convert to string: {}", e));
    let result = match data {
        Ok(value) => value.to_string(),
        Err(err_msg) => err_msg,
    };
    Arc::new(StringArray::from(vec![result]))
}
```

There's no guarantee that the bytes type can be converted to a UTF-8 string. I think a thorough review of the data type mappings is needed.
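One option, sketched here assuming value is the raw byte payload carried by Cell::Bytes, is to keep bytea values as Arrow binary data instead of forcing a UTF-8 conversion:

```rust
use std::sync::Arc;
use arrow::array::{ArrayRef, BinaryArray};

// Sketch: store the bytea payload as-is; BinaryArray carries arbitrary bytes,
// so no lossy or failing UTF-8 conversion is needed.
fn bytes_to_arrow(value: &[u8]) -> ArrayRef {
    Arc::new(BinaryArray::from(vec![value]))
}
```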

abduldjafar (Author) commented:

Thanks a lot for the feedback, @imor! Regarding cell_to_arrow, I initially couldn't find the matching type, but let me double-check it again.

imor (Contributor) commented Dec 11, 2024

Also, could you please take a look at why the tests are failing?

abduldjafar marked this pull request as draft December 15, 2024 03:39
abduldjafar marked this pull request as ready for review December 16, 2024 05:50
abduldjafar (Author) commented:

Hi @imor, ready for review.

(Several review threads on pg_replicate/src/clients/delta.rs were resolved.)
imor (Contributor) commented Dec 17, 2024

What was the reason for the failing tests and how were they fixed?

abduldjafar (Author) commented:

> What was the reason for the failing tests and how were they fixed?

Some sources said it's because of resource limitations, so the runner kills the process automatically. In the test step I added --jobs 1, which compiles one crate at a time and so doesn't consume as many resources.

```diff
@@ -50,7 +50,7 @@ jobs:
       # key: sqlx
       - name: Run non-api tests
         run: |
-          cargo test --workspace --all-features --exclude api
+          cargo test --workspace --all-features --exclude api --jobs 1
```
imor (Contributor) commented:

Why are jobs restricted to 1 here?


Comment on lines +43 to +49:

```rust
match config.endpoint_url() {
    Some(endpoint) => {
        storage_options.insert(AWS_ENDPOINT_URL.to_string(), endpoint.to_string());
        storage_options.insert(AWS_ALLOW_HTTP.to_string(), "true".to_string());
    }
    None => (),
}
```
imor (Contributor) commented:

Please run cargo clippy; it emits a warning for this code and suggests changing it to an if let expression.
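For reference, the if let form clippy suggests would look roughly like this, a sketch over the same identifiers as the snippet above:

```rust
// Sketch of the clippy-suggested rewrite of the match above: the None arm
// does nothing, so an if let expresses the same logic more directly.
if let Some(endpoint) = config.endpoint_url() {
    storage_options.insert(AWS_ENDPOINT_URL.to_string(), endpoint.to_string());
    storage_options.insert(AWS_ALLOW_HTTP.to_string(), "true".to_string());
}
```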

abduldjafar (Author) commented:

I already ran clippy and it passed locally.

```rust
Arc::new(StringArray::from(vec![result]))
let data: Vec<&[u8]> = value
    .iter()
    .map(|item| std::slice::from_ref(item))
```
imor (Contributor) commented:

Another clippy warning.
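This is most likely the redundant_closure lint; a sketch of the fix over the same snippet:

```rust
// Sketch: pass std::slice::from_ref directly instead of wrapping it in a closure.
let data: Vec<&[u8]> = value.iter().map(std::slice::from_ref).collect();
```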

```rust
match config.endpoint_url() {
    Some(endpoint) => {
        storage_options.insert(AWS_ENDPOINT_URL.to_string(), endpoint.to_string());
        storage_options.insert(AWS_ALLOW_HTTP.to_string(), "true".to_string());
```

imor (Contributor) commented:

Why do we want to enable insecure non-TLS connections?

abduldjafar (Author) commented:

This one is for when we want to test against MinIO or LocalStack.

```rust
let mut storage_options = HashMap::new();

storage_options.insert(AWS_FORCE_CREDENTIAL_LOAD.to_string(), "true".to_string());
storage_options.insert(AWS_S3_ALLOW_UNSAFE_RENAME.to_string(), "true".to_string());
```

imor (Contributor) commented:

Again, why no concurrent writer protection?
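For context, delta-rs offers a DynamoDB-based locking provider as the concurrent-writer-safe alternative to AWS_S3_ALLOW_UNSAFE_RENAME. A hedged sketch; the exact option keys vary across delta-rs versions, and the lock table name is a placeholder:

```rust
// Sketch: opt into delta-rs's DynamoDB lock instead of unsafe renames.
// "delta_lock" is a placeholder table name; check the crate version's docs
// for the exact option keys.
storage_options.insert("AWS_S3_LOCKING_PROVIDER".to_string(), "dynamodb".to_string());
storage_options.insert("DELTA_DYNAMO_TABLE_NAME".to_string(), "delta_lock".to_string());
```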

```rust
&Type::INT8 => DataType::LONG,
&Type::INT2 => DataType::SHORT,
&Type::FLOAT4 => DataType::FLOAT,
&Type::FLOAT8 | &Type::NUMERIC => DataType::DOUBLE,
```

imor (Contributor) commented:

I don't think PG's Numeric type will fit in a double. It should be a decimal type.

abduldjafar (Author) commented:

Do you know the precision?
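For context: Postgres numeric allows up to 131072 digits before the decimal point, far more than a double can represent exactly, while Delta's decimal type caps precision at 38. A hedged sketch, assuming the crate exposes a PrimitiveType::Decimal(precision, scale) variant and using a wide placeholder when the column's declared precision is unknown:

```rust
// Sketch: map numeric to a Delta decimal; (38, 18) is a placeholder for
// when the column's declared precision/scale are not available.
&Type::NUMERIC => DataType::Primitive(PrimitiveType::Decimal(38, 18)),
```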

Comment on lines +134 to +136:

```rust
&Type::TIME | &Type::TIMESTAMP | &Type::TIMESTAMPTZ => {
    DataType::Primitive(PrimitiveType::TimestampNtz)
}
```

imor (Contributor) commented:

Do all three of these types map to PrimitiveType::TimestampNtz correctly?
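A closer mapping, sketched under the assumption that the crate exposes both PrimitiveType::Timestamp (UTC-adjusted) and PrimitiveType::TimestampNtz; Delta has no time-of-day primitive, so TIME needs its own representation:

```rust
// Sketch: keep the timezone distinction, and give TIME its own encoding
// since Delta has no time-of-day primitive.
&Type::TIMESTAMP => DataType::Primitive(PrimitiveType::TimestampNtz), // no time zone
&Type::TIMESTAMPTZ => DataType::Primitive(PrimitiveType::Timestamp),  // UTC-adjusted
&Type::TIME => DataType::Primitive(PrimitiveType::Long), // e.g. microseconds since midnight
```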

```rust
&Type::INT4 => ArrowDataType::Int32,
&Type::INT8 => ArrowDataType::Int64,
&Type::FLOAT4 => ArrowDataType::Float32,
&Type::FLOAT8 | &Type::NUMERIC => ArrowDataType::Float64,
```

imor (Contributor) commented:

Again, numeric shouldn't be mapped to Float64.
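On the Arrow side there is a decimal type as well; a sketch, with placeholder precision and scale:

```rust
// Sketch: Arrow's 128-bit decimal covers precision up to 38.
&Type::NUMERIC => ArrowDataType::Decimal128(38, 18), // placeholder precision/scale
```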

```rust
.iter()
.map(|col| StructField::new(col.name.as_str(), Self::postgres_to_delta(&col.typ), true))
.chain([
    StructField::new("OP", Self::postgres_to_delta(&Type::VARCHAR), true),
```

imor (Contributor) commented:

Why is this column nullable? Wouldn't its value always be present?
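For reference, the third argument of StructField::new is the nullability flag, so a column whose value is always written can be declared non-nullable; a sketch:

```rust
// Sketch: declare OP as non-nullable, since every change row carries an operation.
StructField::new("OP", Self::postgres_to_delta(&Type::VARCHAR), false),
```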

```rust
StructField::new(
    "pg_replicate_inserted_time",
    Self::postgres_to_delta(&Type::TIMESTAMP),
    true,
```

imor (Contributor) commented:

Same for this column, why is it nullable?

imor (Contributor) commented Dec 17, 2024

Also in the function cell_to_arrow, Cell::Null is mapped to an empty string, is this correct?
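For context, Arrow arrays carry real null slots, so a SQL NULL doesn't have to be encoded as an empty string; a sketch for the string case:

```rust
use std::sync::Arc;
use arrow::array::{ArrayRef, StringArray};

// Sketch: build a single-element string array whose only slot is null.
fn null_string_cell() -> ArrayRef {
    Arc::new(StringArray::from(vec![None::<&str>]))
}
```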

```rust
    .create_table(&table_name, &table_schema.column_schemas)
    .await?
} else {
    info!("Table already exists: {}", table_name);
```

imor (Contributor) commented:

Remove this?
