Feat/deltalake s3 #78
base: main
Conversation
Hi @imor. For the non-API tests, I already tested locally and they run well.
Hey @abduldjafar, I have committed a few fixes to your branch, please take a look. In particular the changes are:
Please also double check the data type mappings in … Also, in

```rust
Cell::Bytes(value) => {
    let data = std::str::from_utf8(value)
        .map_err(|e| format!("Failed to convert to string: {}", e));
    let result = match data {
        Ok(value) => value.to_string(),
        Err(err_msg) => err_msg,
    };
    Arc::new(StringArray::from(vec![result]))
}
```

there's no guarantee that the bytes can be converted to a UTF-8 string. I think a thorough review of the data type mappings is needed.
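To illustrate the failure mode: arbitrary bytes are not guaranteed to be valid UTF-8, and the current fallback silently stores the *error message* as the cell value. A minimal std-only sketch (independent of the Arrow types in the PR; `bytes_to_cell_string` is a hypothetical helper mirroring the quoted logic):

```rust
// Mirrors the quoted conversion: on invalid UTF-8, the error message
// becomes the stored value, silently corrupting the column.
fn bytes_to_cell_string(value: &[u8]) -> String {
    match std::str::from_utf8(value) {
        Ok(s) => s.to_string(),
        Err(e) => format!("Failed to convert to string: {}", e),
    }
}

fn main() {
    // 0xFF can never appear in valid UTF-8, so the original payload is lost
    // and an error string is stored in its place.
    let corrupted = bytes_to_cell_string(&[0xff, 0x00]);
    assert!(corrupted.starts_with("Failed to convert"));

    // A lossless alternative is to keep the column as binary; if a string is
    // truly required, `from_utf8_lossy` at least marks bad data with U+FFFD.
    let lossy = String::from_utf8_lossy(&[0xff]);
    assert_eq!(lossy, "\u{FFFD}");
}
```

Keeping bytes columns as an Arrow binary type avoids the problem entirely, since no conversion is attempted.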
Thanks a lot for the feedback, @imor! Regarding …
Also, could you please take a look at why the tests are failing?
Hi @imor, this is ready for review.
What was the reason for the failing tests, and how were they fixed?
Some sources said it's because of resource limitations, so the runner kills the tests automatically. In the test step I added `--jobs 1`:
Co-authored-by: Raminder Singh <[email protected]>
```diff
@@ -50,7 +50,7 @@ jobs:
       #   key: sqlx
       - name: Run non-api tests
         run: |
-          cargo test --workspace --all-features --exclude api
+          cargo test --workspace --all-features --exclude api --jobs 1
```
Why are jobs restricted to 1 here?
```rust
match config.endpoint_url() {
    Some(endpoint) => {
        storage_options.insert(AWS_ENDPOINT_URL.to_string(), endpoint.to_string());
        storage_options.insert(AWS_ALLOW_HTTP.to_string(), "true".to_string());
    }
    None => (),
}
```
Please run `cargo clippy`; it is emitting a warning in this code and suggesting to change it to an `if let` expression.
I already ran clippy and it passed locally.
```rust
Arc::new(StringArray::from(vec![result]))
let data: Vec<&[u8]> = value
    .iter()
    .map(|item| std::slice::from_ref(item))
```
Another clippy warning.
```rust
match config.endpoint_url() {
    Some(endpoint) => {
        storage_options.insert(AWS_ENDPOINT_URL.to_string(), endpoint.to_string());
        storage_options.insert(AWS_ALLOW_HTTP.to_string(), "true".to_string());
```
Why do we want to enable insecure non-TLS connections?
This is for when we want to test against MinIO or LocalStack, which serve plain HTTP locally.
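One way to support MinIO/LocalStack without enabling insecure connections unconditionally is to gate the flag on the endpoint scheme, so HTTPS endpoints never opt in. A std-only sketch (the option names mirror the PR's constants but are hardcoded strings here, and `endpoint_options` is a hypothetical helper):

```rust
use std::collections::HashMap;

// Build storage options for a custom endpoint, allowing plain HTTP only
// when the endpoint itself is explicitly http:// (local testing).
fn endpoint_options(endpoint: &str) -> HashMap<String, String> {
    let mut opts = HashMap::new();
    opts.insert("AWS_ENDPOINT_URL".to_string(), endpoint.to_string());
    if endpoint.starts_with("http://") {
        opts.insert("AWS_ALLOW_HTTP".to_string(), "true".to_string());
    }
    opts
}

fn main() {
    // LocalStack-style endpoint opts in to plain HTTP.
    let local = endpoint_options("http://localhost:4566");
    assert_eq!(local.get("AWS_ALLOW_HTTP"), Some(&"true".to_string()));

    // A real S3 endpoint never enables insecure transport.
    let prod = endpoint_options("https://s3.amazonaws.com");
    assert_eq!(prod.get("AWS_ALLOW_HTTP"), None);
}
```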
```rust
let mut storage_options = HashMap::new();

storage_options.insert(AWS_FORCE_CREDENTIAL_LOAD.to_string(), "true".to_string());
storage_options.insert(AWS_S3_ALLOW_UNSAFE_RENAME.to_string(), "true".to_string());
```
Again, why no concurrent writer protection?
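Background on the concern: S3 has no atomic rename, so delta-rs either needs an external lock (its documented `AWS_S3_LOCKING_PROVIDER` option, typically set to `dynamodb`) or `AWS_S3_ALLOW_UNSAFE_RENAME`, which is only safe with a single writer. A std-only sketch of choosing between the two (`locking_options` is a hypothetical helper; the exact DynamoDB table-name key varies across delta-rs versions and should be checked against the version in use):

```rust
use std::collections::HashMap;

// Prefer a locking provider over unsafe renames; fall back to unsafe
// renames only when exactly one writer ever touches the table.
fn locking_options(use_dynamodb_lock: bool) -> HashMap<String, String> {
    let mut opts = HashMap::new();
    if use_dynamodb_lock {
        opts.insert("AWS_S3_LOCKING_PROVIDER".to_string(), "dynamodb".to_string());
    } else {
        opts.insert("AWS_S3_ALLOW_UNSAFE_RENAME".to_string(), "true".to_string());
    }
    opts
}

fn main() {
    assert!(locking_options(true).contains_key("AWS_S3_LOCKING_PROVIDER"));
    assert!(locking_options(false).contains_key("AWS_S3_ALLOW_UNSAFE_RENAME"));
}
```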
```rust
&Type::INT8 => DataType::LONG,
&Type::INT2 => DataType::SHORT,
&Type::FLOAT4 => DataType::FLOAT,
&Type::FLOAT8 | &Type::NUMERIC => DataType::DOUBLE,
```
I don't think PG's Numeric type will fit in a double. It should be a decimal type.
Do you know the precision?
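For columns declared as `numeric(p, s)`, Postgres encodes the declared precision and scale in the type modifier (`atttypmod`) as `((p << 16) | s) + 4`; an unconstrained `numeric` has no modifier (`-1`) and can hold far more digits than the 38-digit maximum of a Delta/Parquet decimal, so those columns need a fallback. A sketch of decoding the modifier (negative scales, possible since PG 15, would need extra care):

```rust
/// Decode precision/scale from a Postgres numeric type modifier.
/// Returns None for unconstrained `numeric` (typmod == -1).
fn numeric_precision_scale(typmod: i32) -> Option<(u16, u16)> {
    if typmod < 4 {
        return None;
    }
    let m = (typmod - 4) as u32;
    Some((((m >> 16) & 0xffff) as u16, (m & 0xffff) as u16))
}

fn main() {
    // numeric(10, 2) => typmod ((10 << 16) | 2) + 4 = 655366
    assert_eq!(numeric_precision_scale(655366), Some((10, 2)));
    // unconstrained numeric carries no precision information
    assert_eq!(numeric_precision_scale(-1), None);
}
```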
```rust
&Type::TIME | &Type::TIMESTAMP | &Type::TIMESTAMPTZ => {
    DataType::Primitive(PrimitiveType::TimestampNtz)
}
```
Do all three of these types map to `PrimitiveType::TimestampNtz` correctly?
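The three Postgres types carry different semantics: `TIMESTAMP` has no time zone (NTZ is a natural fit), `TIMESTAMPTZ` is UTC-normalized (Delta's zone-aware timestamp is the usual match), and `TIME` is a time-of-day with no date, which no timestamp type represents faithfully. A sketch with a local stand-in enum (not the delta-kernel types, just an illustration of keeping the cases distinct):

```rust
#[derive(Debug, PartialEq)]
enum SketchDeltaType {
    Timestamp,    // zone-aware, UTC-normalized
    TimestampNtz, // wall-clock time, no zone
    Unsupported,  // needs its own decision (e.g. TIME)
}

fn map_pg_temporal(pg_type: &str) -> SketchDeltaType {
    match pg_type {
        "timestamp" => SketchDeltaType::TimestampNtz,
        "timestamptz" => SketchDeltaType::Timestamp,
        // TIME is a time-of-day; storing it as a timestamp invents a date.
        "time" => SketchDeltaType::Unsupported,
        _ => SketchDeltaType::Unsupported,
    }
}

fn main() {
    assert_eq!(map_pg_temporal("timestamptz"), SketchDeltaType::Timestamp);
    assert_eq!(map_pg_temporal("timestamp"), SketchDeltaType::TimestampNtz);
    assert_eq!(map_pg_temporal("time"), SketchDeltaType::Unsupported);
}
```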
```rust
&Type::INT4 => ArrowDataType::Int32,
&Type::INT8 => ArrowDataType::Int64,
&Type::FLOAT4 => ArrowDataType::Float32,
&Type::FLOAT8 | &Type::NUMERIC => ArrowDataType::Float64,
```
Again, `NUMERIC` shouldn't be mapped to `Float64`.
```rust
.iter()
.map(|col| StructField::new(col.name.as_str(), Self::postgres_to_delta(&col.typ), true))
.chain([
    StructField::new("OP", Self::postgres_to_delta(&Type::VARCHAR), true),
```
Why is this column nullable? Wouldn't its value be always present?
```rust
StructField::new(
    "pg_replicate_inserted_time",
    Self::postgres_to_delta(&Type::TIMESTAMP),
    true,
```
Same for this column, why is it nullable?

Also, in the function …
```rust
    .create_table(&table_name, &table_schema.column_schemas)
    .await?
} else {
    info!("Table already exists: {}", table_name);
```
Remove this?
What Kind of Change Does This PR Introduce?
This PR adds support for using DeltaLake with an S3 backend, expanding its capabilities beyond the current local filesystem option.
Current Behavior
DeltaLake only supports the local filesystem as the backend storage.
New Behavior
DeltaLake now supports S3 as a backend. Users can choose between the local filesystem and S3 for storing their DeltaLake data.
Configuration Instructions
To use DeltaLake with an S3 backend, ensure your AWS configuration is set up as shown in the example below.
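The original example was a screenshot that did not survive; as a placeholder, here is a hypothetical sketch of the relevant fields (the exact key names depend on pg_replicate's config schema and are assumptions, not the real format):

```yaml
# Hypothetical sketch - check the actual config schema for exact key names.
sink:
  type: deltalake
  storage: s3
  bucket: my-delta-bucket
  # Only needed for S3-compatible services such as LocalStack or MinIO:
  endpoint_url: http://localhost:4566
```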
If you are using a service like LocalStack, just add `endpoint_url` to the config file as shown above.