
feature request: add ability to ingest from S3 bucket #912

Closed
mingfang opened this issue Oct 20, 2024 · 6 comments
Labels
enhancement (New feature or request) · need more info (Further information is requested)

Comments

@mingfang

We need to ingest many objects (billions) from an S3 bucket.
Due to the large number of objects, just getting the object listing will take a very long time.

The ingestion should work similarly to FilesGlob.

@sergiimk
Member

FilesGlob could definitely be extended to support listing of S3 objects, but can you clarify the scale of your problem? Are we talking about potentially billions of Parquet or JSON files that you want to read and ingest via kamu?

FilesGlob has some tricky logic to sort files by timestamps extracted from their names in order to:

  • ingest them in event time order
  • and also keep the last timestamp as the "current state" of the ingest and avoid re-scanning files that are below this timestamp.

Thus the current logic will attempt to list all entries on every ingest iteration. So if you really have billions of objects, I'm afraid that even if FilesGlob supported S3, the listing would take a very long time and cause issues.
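
Roughly, on every iteration it does something like this (a simplified Python sketch of the idea, not the actual implementation; the filename pattern is just an example):

```python
import re
from datetime import datetime, timezone
from pathlib import Path

# Hypothetical filename convention, e.g. "events-20241020T120000.json"
TS_PATTERN = re.compile(r"(\d{8}T\d{6})")

def files_to_ingest(glob_pattern, last_seen):
    """List ALL matching files, extract the event time from each name,
    and keep only those newer than the stored "current state"."""
    candidates = []
    for path in Path(".").glob(glob_pattern):  # full listing on every iteration
        m = TS_PATTERN.search(path.name)
        if not m:
            continue
        ts = datetime.strptime(m.group(1), "%Y%m%dT%H%M%S").replace(tzinfo=timezone.utc)
        if last_seen is None or ts > last_seen:
            candidates.append((ts, path))
    candidates.sort()  # ingest in event-time order
    return candidates  # the max timestamp becomes the new "current state"
```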

I assume your objects aren't just sitting under one key prefix, but are partitioned somehow. Thus you can likely express the listing logic far more efficiently than a generic FilesGlob implementation would.

I therefore see three options:

  • Implement FilesGlob support for S3
    • Convenient, but may not work for your scale
  • Write a custom script that uses push sources and kamu ingest my.dataset s3://bucket/file to load objects into the dataset (see the sketch after this list)
    • The downside here is that it will be on you to maintain the state of where your script left off
  • Support some kind of ListingSource - a scriptable fetch source in polling ingest that, similarly to how the Container source streams data, would stream the URLs of objects to load data from
    • This would allow kamu to store the state needed for resuming the ingest process
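
A rough sketch of the second option (Python + boto3; the bucket, prefix, dataset name, and checkpoint file are all made up - substitute whatever state store you prefer):

```python
import subprocess
from pathlib import Path

import boto3

BUCKET = "my-bucket"               # hypothetical
PREFIX = "cloudtrail/2024/"        # hypothetical partition prefix
STATE_FILE = Path(".last_processed_key")

def run_once():
    s3 = boto3.client("s3")
    last_key = STATE_FILE.read_text().strip() if STATE_FILE.exists() else ""

    kwargs = {"Bucket": BUCKET, "Prefix": PREFIX}
    if last_key:
        # StartAfter skips everything up to and including the last processed key,
        # which works because the keys are monotonically increasing.
        kwargs["StartAfter"] = last_key

    for page in s3.get_paginator("list_objects_v2").paginate(**kwargs):
        for obj in page.get("Contents", []):
            key = obj["Key"]
            # Push the object into the dataset via the CLI.
            subprocess.run(
                ["kamu", "ingest", "my.dataset", f"s3://{BUCKET}/{key}"],
                check=True,
            )
            # It's on the script to persist where it left off.
            STATE_FILE.write_text(key)

if __name__ == "__main__":
    run_once()
```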

@sergiimk added the enhancement and need more info labels on Oct 20, 2024
@mingfang
Author

mingfang commented Oct 20, 2024

One example of a very large S3 bucket is processing AWS CloudTrail logs.
In general, new data is written to S3 using monotonically increasing object keys, e.g. a timestamp.
We process batches of objects and have to keep track of the last processed key.

I'm looking at using Container, but I think it needs a way to keep track of state.
Maybe we could have something similar to ODF_NEW_ETAG_PATH, say ODF_NEW_HANDBACK_DATA_PATH, for the container to write arbitrary state data. Then kamu could hand the content of that file back to the container on the next poll as the env var ODF_NEW_HANDBACK_DATA.
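
To illustrate, the container script could then do something like this (a sketch of the proposed mechanism - these env vars don't exist yet, and the key is just an example):

```python
import json
import os

# Proposed: kamu hands the previously written state back on the next poll.
prev = os.environ.get("ODF_NEW_HANDBACK_DATA")
state = json.loads(prev) if prev else {"last_key": ""}

# ...process a batch of objects starting after state["last_key"],
# writing records to stdout, then persist the new state:
state["last_key"] = "AWSLogs/2024/10/20/batch-00042.json.gz"  # example value
with open(os.environ["ODF_NEW_HANDBACK_DATA_PATH"], "w") as f:
    json.dump(state, f)
```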

@sergiimk
Member

For Container ingest the state management is very simple. If your data objects are keyed by timestamp, you can use ODF_LAST_MODIFIED / ODF_NEW_LAST_MODIFIED to store the last processed timestamp in the dataset state and resume from it.

Using the ETAG similarly allows you to store any kind of state data - it's a fully opaque object. For example, this image uses the ETAG to store the sequence number of the last processed blockchain block.
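
In your case the opaque state could simply be the last processed object key. A rough sketch of such a container fetch script (Python; it assumes the previous ETAG value is exposed to the container as ODF_ETAG, and the bucket/prefix are made up):

```python
import os
import sys

import boto3

BUCKET = "my-bucket"      # hypothetical
PREFIX = "cloudtrail/"    # hypothetical

def main():
    s3 = boto3.client("s3")
    # Opaque checkpoint: the last processed object key (assumed env var).
    last_key = os.environ.get("ODF_ETAG", "")

    kwargs = {"Bucket": BUCKET, "Prefix": PREFIX}
    if last_key:
        kwargs["StartAfter"] = last_key  # keys are monotonically increasing

    new_last_key = last_key
    for page in s3.get_paginator("list_objects_v2").paginate(**kwargs):
        for obj in page.get("Contents", []):
            body = s3.get_object(Bucket=BUCKET, Key=obj["Key"])["Body"]
            # Pipe the raw object data to stdout for kamu to read.
            sys.stdout.buffer.write(body.read())
            new_last_key = obj["Key"]

    # Persist the new checkpoint so the next poll resumes from here.
    if new_last_key != last_key:
        with open(os.environ["ODF_NEW_ETAG_PATH"], "w") as f:
            f.write(new_last_key)

if __name__ == "__main__":
    main()
```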

The Container source pipes the raw data via stdout for kamu to read - this may not be very efficient in your case, where the data is already in S3 and in some structured format, so it would be better to read it directly. A variation of Container that allows the script to return URLs to the data is what I meant earlier by ListingSource. I guess something like ODF_NEW_HANDBACK_DATA_PATH would be one way to implement it.

@mingfang
Author

mingfang commented Nov 7, 2024

This crate looks applicable: https://github.com/datafusion-contrib/datafusion-objectstore-s3

@sergiimk
Member

sergiimk commented Nov 7, 2024

Yes, we are already using the S3 object store crate under the hood to have DataFusion query Parquet data directly from S3.

I think the key problem here is not how to access the data, but what listing and state management mechanism to use.

I've re-read everything above and extracted two work items from this feature request:

@mingfang could you please take a look and confirm that those would address your need?

@sergiimk
Member

Now that all requirements are captured in the feature tickets, I will close this one.

I added both of the linked tickets as stretch goals to the "Usability" objective of our current milestone - hope we'll get to them soon.

Thanks again @mingfang!
