feat(datasets): add SparkStreamingDataSet (#198)
* Fix links on GitHub issue templates (#150)

Signed-off-by: Juan Luis Cano Rodríguez <[email protected]>
Signed-off-by: Tingting_Wan <[email protected]>

* add spark_stream_dataset.py

Signed-off-by: Tingting_Wan <[email protected]>

* Migrate most of `kedro-datasets` metadata to `pyproject.toml` (#161)

* Include missing requirements files in sdist

Fix gh-86.

Signed-off-by: Juan Luis Cano Rodríguez <[email protected]>

* Migrate most project metadata to `pyproject.toml`

See kedro-org/kedro#2334.

Signed-off-by: Juan Luis Cano Rodríguez <[email protected]>

* Move requirements to `pyproject.toml`

Signed-off-by: Juan Luis Cano Rodríguez <[email protected]>

---------

Signed-off-by: Juan Luis Cano Rodríguez <[email protected]>
Signed-off-by: Tingting_Wan <[email protected]>

* restructure the stream dataset to align with the other spark datasets

Signed-off-by: Tingting_Wan <[email protected]>

* adding README.md for specification

Signed-off-by: Tingting_Wan <[email protected]>

* Update kedro-datasets/kedro_datasets/spark/spark_stream_dataset.py

Co-authored-by: Nok Lam Chan <[email protected]>
Signed-off-by: Tingting_Wan <[email protected]>

* rename the dataset

Signed-off-by: Tingting_Wan <[email protected]>

* resolve comments

Signed-off-by: Tingting_Wan <[email protected]>

* fix format and pylint

Signed-off-by: Tingting_Wan <[email protected]>

* Update kedro-datasets/kedro_datasets/spark/README.md

Co-authored-by: Deepyaman Datta <[email protected]>
Signed-off-by: Tingting_Wan <[email protected]>

* add unit tests and SparkStreamingDataset in init.py

Signed-off-by: Tingting_Wan <[email protected]>

* add unit tests

Signed-off-by: Tingting_Wan <[email protected]>

* update test_save

Signed-off-by: Tingting_Wan <[email protected]>

* Upgrade Polars (#171)

* Upgrade Polars

Signed-off-by: Juan Luis Cano Rodríguez <[email protected]>

* Update Polars to 0.17.x

---------

Signed-off-by: Juan Luis Cano Rodríguez <[email protected]>
Signed-off-by: Tingting_Wan <[email protected]>

* if the release fails, return an exit code and fail the CI (#158)

Signed-off-by: Tingting_Wan <[email protected]>

* Migrate `kedro-airflow` to static metadata (#172)

* Migrate kedro-airflow to static metadata

See kedro-org/kedro#2334.

Signed-off-by: Juan Luis Cano Rodríguez <[email protected]>

* Add explicit PEP 518 build requirements for kedro-datasets

Signed-off-by: Juan Luis Cano Rodríguez <[email protected]>

* Typos

Co-authored-by: Merel Theisen <[email protected]>

Signed-off-by: Juan Luis Cano Rodríguez <[email protected]>

* Remove dangling reference to requirements.txt

Signed-off-by: Juan Luis Cano Rodríguez <[email protected]>

* Add release notes

Signed-off-by: Juan Luis Cano Rodríguez <[email protected]>

---------

Signed-off-by: Juan Luis Cano Rodríguez <[email protected]>
Signed-off-by: Tingting_Wan <[email protected]>

* Migrate `kedro-telemetry` to static metadata (#174)

* Migrate kedro-telemetry to static metadata

See kedro-org/kedro#2334.

Signed-off-by: Juan Luis Cano Rodríguez <[email protected]>

* Add release notes

Signed-off-by: Juan Luis Cano Rodríguez <[email protected]>

---------

Signed-off-by: Juan Luis Cano Rodríguez <[email protected]>
Signed-off-by: Tingting_Wan <[email protected]>

* ci: port lint, unit test, and e2e tests to Actions (#155)

* Add unit test + lint test on GA

* trigger GA - will revert

Signed-off-by: Ankita Katiyar <[email protected]>

* Fix lint

Signed-off-by: Ankita Katiyar <[email protected]>

* Add end to end tests

* Add cache key

Signed-off-by: Ankita Katiyar <[email protected]>

* Add cache action

Signed-off-by: Ankita Katiyar <[email protected]>

* Rename workflow files

Signed-off-by: Ankita Katiyar <[email protected]>

* Lint + add comment + default bash

Signed-off-by: Ankita Katiyar <[email protected]>

* Add windows test

Signed-off-by: Ankita Katiyar <[email protected]>

* Update workflow name + revert changes to READMEs

Signed-off-by: Ankita Katiyar <[email protected]>

* Add kedro-telemetry/RELEASE.md to trufflehog ignore

Signed-off-by: Ankita Katiyar <[email protected]>

* Add pytables to test_requirements remove from workflow

Signed-off-by: Ankita Katiyar <[email protected]>

* Revert "Add pytables to test_requirements remove from workflow"

This reverts commit 8203daa.

* Separate pip freeze step

Signed-off-by: Ankita Katiyar <[email protected]>

---------

Signed-off-by: Ankita Katiyar <[email protected]>
Signed-off-by: Tingting_Wan <[email protected]>

* Migrate `kedro-docker` to static metadata (#173)

* Migrate kedro-docker to static metadata

See kedro-org/kedro#2334.

Signed-off-by: Juan Luis Cano Rodríguez <[email protected]>

* Address packaging warning

Signed-off-by: Juan Luis Cano Rodríguez <[email protected]>

* Fix tests

Signed-off-by: Juan Luis Cano Rodríguez <[email protected]>

* Actually install current plugin with dependencies

Signed-off-by: Juan Luis Cano Rodríguez <[email protected]>

* Add release notes

Signed-off-by: Juan Luis Cano Rodríguez <[email protected]>

---------

Signed-off-by: Juan Luis Cano Rodríguez <[email protected]>
Signed-off-by: Tingting_Wan <[email protected]>

* Introducing .gitpod.yml to kedro-plugins (#185)

Currently, opening Gitpod installs Python 3.11, which breaks everything because we don't support it yet. This PR introduces a simple .gitpod.yml to get started.

Signed-off-by: Tingting_Wan <[email protected]>

* sync APIDataSet from kedro's `develop` (#184)

* Update APIDataSet

Signed-off-by: Nok Chan <[email protected]>

* Sync ParquetDataSet

Signed-off-by: Nok Chan <[email protected]>

* Sync Test

Signed-off-by: Nok Chan <[email protected]>

* Linting

Signed-off-by: Nok Chan <[email protected]>

* Revert Unnecessary ParquetDataSet Changes

Signed-off-by: Nok Chan <[email protected]>

* Sync release notes

Signed-off-by: Nok Chan <[email protected]>

---------

Signed-off-by: Nok Chan <[email protected]>
Signed-off-by: Tingting_Wan <[email protected]>

* formatting

Signed-off-by: Tingting_Wan <[email protected]>

* formatting

Signed-off-by: Tingting_Wan <[email protected]>

* formatting

Signed-off-by: Tingting_Wan <[email protected]>

* formatting

Signed-off-by: Tingting_Wan <[email protected]>

* add spark_stream_dataset.py

Signed-off-by: Tingting_Wan <[email protected]>

* restructure the stream dataset to align with the other spark datasets

Signed-off-by: Tingting_Wan <[email protected]>

* adding README.md for specification

Signed-off-by: Tingting_Wan <[email protected]>

* Update kedro-datasets/kedro_datasets/spark/spark_stream_dataset.py

Co-authored-by: Nok Lam Chan <[email protected]>
Signed-off-by: Tingting_Wan <[email protected]>

* rename the dataset

Signed-off-by: Tingting_Wan <[email protected]>

* resolve comments

Signed-off-by: Tingting_Wan <[email protected]>

* fix format and pylint

Signed-off-by: Tingting_Wan <[email protected]>

* Update kedro-datasets/kedro_datasets/spark/README.md

Co-authored-by: Deepyaman Datta <[email protected]>
Signed-off-by: Tingting_Wan <[email protected]>

* add unit tests and SparkStreamingDataset in init.py

Signed-off-by: Tingting_Wan <[email protected]>

* add unit tests

Signed-off-by: Tingting_Wan <[email protected]>

* update test_save

Signed-off-by: Tingting_Wan <[email protected]>

* formatting

Signed-off-by: Tingting_Wan <[email protected]>

* formatting

Signed-off-by: Tingting_Wan <[email protected]>

* formatting

Signed-off-by: Tingting_Wan <[email protected]>

* formatting

Signed-off-by: Tingting_Wan <[email protected]>

* lint

Signed-off-by: Tingting_Wan <[email protected]>

* lint

Signed-off-by: Tingting_Wan <[email protected]>

* lint

Signed-off-by: Tingting_Wan <[email protected]>

* update test cases

Signed-off-by: Tingting_Wan <[email protected]>

* add negative test

Signed-off-by: Tingting_Wan <[email protected]>

* remove code snippets for testing

Signed-off-by: Tingting_Wan <[email protected]>

* lint

Signed-off-by: Tingting_Wan <[email protected]>

* update tests

Signed-off-by: Tingting_Wan <[email protected]>

* update test and remove redundancy

Signed-off-by: Tingting_Wan <[email protected]>

* linting

Signed-off-by: Tingting_Wan <[email protected]>

* refactor file format

Signed-off-by: Tom Kurian <[email protected]>

* fix read me file

Signed-off-by: Tom Kurian <[email protected]>

* docs: Add community contributions (#199)

* Add community contributions

Signed-off-by: Juan Luis Cano Rodríguez <[email protected]>

* Use newer link to docs

Signed-off-by: Juan Luis Cano Rodríguez <[email protected]>

---------

Signed-off-by: Juan Luis Cano Rodríguez <[email protected]>

* adding test for raise error

Signed-off-by: Tingting_Wan <[email protected]>

* update test and remove redundancy

Signed-off-by: Tingting_Wan <[email protected]>
Signed-off-by: Tom Kurian <[email protected]>

* linting

Signed-off-by: Tingting_Wan <[email protected]>
Signed-off-by: Tom Kurian <[email protected]>

* refactor file format

Signed-off-by: Tom Kurian <[email protected]>

* fix read me file

Signed-off-by: Tom Kurian <[email protected]>

* adding test for raise error

Signed-off-by: Tingting_Wan <[email protected]>
Signed-off-by: Tom Kurian <[email protected]>

* fix readme file

Signed-off-by: Tom Kurian <[email protected]>

* fix readme

Signed-off-by: Tom Kurian <[email protected]>

* fix conflicts

Signed-off-by: Tom Kurian <[email protected]>

* fix CI errors

Signed-off-by: Tom Kurian <[email protected]>

* fix lint issue

Signed-off-by: Tom Kurian <[email protected]>

* update class documentation

Signed-off-by: Tom Kurian <[email protected]>

* add additional test cases

Signed-off-by: Tom Kurian <[email protected]>

* add s3 read test cases

Signed-off-by: Tom Kurian <[email protected]>

* add s3 read test cases

Signed-off-by: Tom Kurian <[email protected]>

* add s3 read test case

Signed-off-by: Tom Kurian <[email protected]>

* test s3 read

Signed-off-by: Tom Kurian <[email protected]>

* remove redundant test cases

Signed-off-by: Tom Kurian <[email protected]>

* fix streaming dataset configurations

Signed-off-by: Tom Kurian <[email protected]>

* update streaming datasets doc

Signed-off-by: Tingting_Wan <[email protected]>

* resolve comments re documentation

Signed-off-by: Tingting_Wan <[email protected]>

* bugfix lint

Signed-off-by: Tingting_Wan <[email protected]>

* update link

Signed-off-by: Tingting_Wan <[email protected]>

* revert the changes on CI

Signed-off-by: Nok Chan <[email protected]>

* test(docker): remove outdated logging-related step (#207)

* fix kedro-docker e2e test

Signed-off-by: Nok Chan <[email protected]>

* fix: add timeout to request to satisfy bandit lint

---------

Signed-off-by: Nok Chan <[email protected]>
Co-authored-by: Deepyaman Datta <[email protected]>
Signed-off-by: Tom Kurian <[email protected]>

* ci: ensure plugin requirements get installed in CI (#208)

* ci: install the plugin alongside test requirements

* ci: install the plugin alongside test requirements

* Update kedro-airflow.yml

* Update kedro-datasets.yml

* Update kedro-docker.yml

* Update kedro-telemetry.yml

* Update kedro-airflow.yml

* Update kedro-datasets.yml

* Update kedro-airflow.yml

* Update kedro-docker.yml

* Update kedro-telemetry.yml

* ci(telemetry): update isort config to correct sort

* Don't use profile ¯\_(ツ)_/¯

Signed-off-by: Deepyaman Datta <[email protected]>

* chore(datasets): remove empty `tool.black` section

* chore(docker): remove empty `tool.black` section

---------

Signed-off-by: Deepyaman Datta <[email protected]>
Signed-off-by: Tom Kurian <[email protected]>

* ci: Migrate the release workflow from CircleCI to GitHub Actions (#203)

* Create check-release.yml

* change from test pypi to pypi

* split into jobs and move version logic into script

* update github actions output

* lint

* changes based on review

* changes based on review

* fix script to not append continuously

* change pypi api token logic

Signed-off-by: Tom Kurian <[email protected]>

* build: Relax Kedro bound for `kedro-datasets` (#140)

* Less strict pin on Kedro for datasets

Signed-off-by: Merel Theisen <[email protected]>
Signed-off-by: Tom Kurian <[email protected]>

* ci: don't run checks on both `push`/`pull_request` (#192)

* ci: don't run checks on both `push`/`pull_request`

* ci: don't run checks on both `push`/`pull_request`

* ci: don't run checks on both `push`/`pull_request`

* ci: don't run checks on both `push`/`pull_request`

Signed-off-by: Tom Kurian <[email protected]>

* chore: delete extra space ending check-release.yml (#210)

Signed-off-by: Tom Kurian <[email protected]>

* ci: Create merge-gatekeeper.yml to make sure a PR is only merged when all tests have passed (#215)

* Create merge-gatekeeper.yml

* Update .github/workflows/merge-gatekeeper.yml

---------

Co-authored-by: Sajid Alam <[email protected]>
Signed-off-by: Tom Kurian <[email protected]>

* ci: Remove the CircleCI setup (#209)

* remove circleci setup files and utils

* remove circleci configs in kedro-telemetry

* remove redundant .github in kedro-telemetry

* Delete continue_config.yml

* Update check-release.yml

* lint

* increase timeout to 40 mins for docker e2e tests

Signed-off-by: Tom Kurian <[email protected]>

* feat: Dataset API add `save` method (#180)

* [FEAT] add save method to APIDataset

Signed-off-by: jmcdonnell <[email protected]>

* [ENH] create save_args parameter for api_dataset

Signed-off-by: jmcdonnell <[email protected]>

* [ENH] add tests for socket + http errors

Signed-off-by: <[email protected]>
Signed-off-by: jmcdonnell <[email protected]>

* [ENH] check save data is json

Signed-off-by: <[email protected]>
Signed-off-by: jmcdonnell <[email protected]>

* [FIX] clean code

Signed-off-by: jmcdonnell <[email protected]>

* [ENH] handle different data types

Signed-off-by: jmcdonnell <[email protected]>

* [FIX] test coverage for exceptions

Signed-off-by: jmcdonnell <[email protected]>

* [ENH] add examples in APIDataSet docstring

Signed-off-by: jmcdonnell <[email protected]>

* sync APIDataSet from kedro's `develop` (#184)

* Update APIDataSet

Signed-off-by: Nok Chan <[email protected]>

* Sync ParquetDataSet

Signed-off-by: Nok Chan <[email protected]>

* Sync Test

Signed-off-by: Nok Chan <[email protected]>

* Linting

Signed-off-by: Nok Chan <[email protected]>

* Revert Unnecessary ParquetDataSet Changes

Signed-off-by: Nok Chan <[email protected]>

* Sync release notes

Signed-off-by: Nok Chan <[email protected]>

---------

Signed-off-by: Nok Chan <[email protected]>
Signed-off-by: jmcdonnell <[email protected]>

* [FIX] remove support for delete method

Signed-off-by: jmcdonnell <[email protected]>

* [FIX] lint files

Signed-off-by: jmcdonnell <[email protected]>

* [FIX] fix conflicts

Signed-off-by: jmcdonnell <[email protected]>

* [FIX] remove fail save test

Signed-off-by: jmcdonnell <[email protected]>

* [ENH] review suggestions

Signed-off-by: jmcdonnell <[email protected]>

* [ENH] fix tests

Signed-off-by: jmcdonnell <[email protected]>

* [FIX] reorder arguments

Signed-off-by: jmcdonnell <[email protected]>

---------

Signed-off-by: jmcdonnell <[email protected]>
Signed-off-by: <[email protected]>
Signed-off-by: Nok Chan <[email protected]>
Co-authored-by: jmcdonnell <[email protected]>
Co-authored-by: Nok Lam Chan <[email protected]>
Signed-off-by: Tom Kurian <[email protected]>

* ci: Automatically extract release notes for GitHub Releases (#212)

* ci: Automatically extract release notes

Signed-off-by: Ankita Katiyar <[email protected]>

* fix lint

Signed-off-by: Ankita Katiyar <[email protected]>

* Raise exceptions

Signed-off-by: Ankita Katiyar <[email protected]>

* Lint

Signed-off-by: Ankita Katiyar <[email protected]>

* Lint

Signed-off-by: Ankita Katiyar <[email protected]>

---------

Signed-off-by: Ankita Katiyar <[email protected]>
Signed-off-by: Tom Kurian <[email protected]>

* feat: Add metadata attribute to datasets (#189)

* Add metadata attribute to all datasets

Signed-off-by: Ahdra Merali <[email protected]>
Signed-off-by: Tom Kurian <[email protected]>

* feat: Add ManagedTableDataset for managed Delta Lake tables in Databricks (#206)

* committing first version of UnityTableCatalog with unit tests. This dataset allows users to interface with Unity Catalog tables in Databricks for both reading and writing.

Signed-off-by: Danny Farah <[email protected]>
Signed-off-by: Jannic Holzer <[email protected]>

* renaming dataset

Signed-off-by: Danny Farah <[email protected]>
Signed-off-by: Jannic Holzer <[email protected]>

* adding mlflow connectors

Signed-off-by: Danny Farah <[email protected]>
Signed-off-by: Jannic Holzer <[email protected]>

* fixing mlflow imports

Signed-off-by: Danny Farah <[email protected]>
Signed-off-by: Jannic Holzer <[email protected]>

* cleaned up mlflow for initial release

Signed-off-by: Danny Farah <[email protected]>
Signed-off-by: Jannic Holzer <[email protected]>

* cleaned up mlflow references from setup.py for initial release

Signed-off-by: Danny Farah <[email protected]>
Signed-off-by: Jannic Holzer <[email protected]>

* fixed deps in setup.py

Signed-off-by: Danny Farah <[email protected]>
Signed-off-by: Jannic Holzer <[email protected]>

* adding comments before initial PR

Signed-off-by: Danny Farah <[email protected]>
Signed-off-by: Jannic Holzer <[email protected]>

* moved validation to dataclass

Signed-off-by: Danny Farah <[email protected]>
Signed-off-by: Jannic Holzer <[email protected]>

* bug fix in type of partition column and cleanup

Signed-off-by: Danny Farah <[email protected]>
Signed-off-by: Jannic Holzer <[email protected]>

* updated docstring for ManagedTableDataSet

Signed-off-by: Danny Farah <[email protected]>
Signed-off-by: Jannic Holzer <[email protected]>

* added backticks to catalog

Signed-off-by: Danny Farah <[email protected]>
Signed-off-by: Jannic Holzer <[email protected]>

* fixing regex to allow hyphens

Signed-off-by: Danny Farah <[email protected]>
Signed-off-by: Jannic Holzer <[email protected]>

* Update kedro-datasets/kedro_datasets/databricks/managed_table_dataset.py

Co-authored-by: Jannic <[email protected]>
Signed-off-by: Jannic Holzer <[email protected]>

* Update kedro-datasets/kedro_datasets/databricks/managed_table_dataset.py

Co-authored-by: Jannic <[email protected]>
Signed-off-by: Danny Farah <[email protected]>
Signed-off-by: Jannic Holzer <[email protected]>

* Update kedro-datasets/kedro_datasets/databricks/managed_table_dataset.py

Co-authored-by: Jannic <[email protected]>
Signed-off-by: Danny Farah <[email protected]>
Signed-off-by: Jannic Holzer <[email protected]>

* Update kedro-datasets/kedro_datasets/databricks/managed_table_dataset.py

Co-authored-by: Jannic <[email protected]>
Signed-off-by: Danny Farah <[email protected]>
Signed-off-by: Jannic Holzer <[email protected]>

* Update kedro-datasets/kedro_datasets/databricks/managed_table_dataset.py

Co-authored-by: Jannic <[email protected]>
Signed-off-by: Danny Farah <[email protected]>
Signed-off-by: Jannic Holzer <[email protected]>

* Update kedro-datasets/kedro_datasets/databricks/managed_table_dataset.py

Co-authored-by: Jannic <[email protected]>
Signed-off-by: Danny Farah <[email protected]>
Signed-off-by: Jannic Holzer <[email protected]>

* Update kedro-datasets/kedro_datasets/databricks/managed_table_dataset.py

Co-authored-by: Jannic <[email protected]>
Signed-off-by: Danny Farah <[email protected]>
Signed-off-by: Jannic Holzer <[email protected]>

* Update kedro-datasets/test_requirements.txt

Co-authored-by: Jannic <[email protected]>
Signed-off-by: Danny Farah <[email protected]>
Signed-off-by: Jannic Holzer <[email protected]>

* Update kedro-datasets/kedro_datasets/databricks/managed_table_dataset.py

Co-authored-by: Jannic <[email protected]>
Signed-off-by: Danny Farah <[email protected]>
Signed-off-by: Jannic Holzer <[email protected]>

* Update kedro-datasets/kedro_datasets/databricks/managed_table_dataset.py

Co-authored-by: Jannic <[email protected]>
Signed-off-by: Danny Farah <[email protected]>
Signed-off-by: Jannic Holzer <[email protected]>

* Update kedro-datasets/kedro_datasets/databricks/managed_table_dataset.py

Co-authored-by: Jannic <[email protected]>
Signed-off-by: Danny Farah <[email protected]>
Signed-off-by: Jannic Holzer <[email protected]>

* Update kedro-datasets/kedro_datasets/databricks/managed_table_dataset.py

Co-authored-by: Jannic <[email protected]>
Signed-off-by: Danny Farah <[email protected]>
Signed-off-by: Jannic Holzer <[email protected]>

* adding backticks to catalog

Signed-off-by: Danny Farah <[email protected]>
Signed-off-by: Jannic Holzer <[email protected]>

* Require pandas < 2.0 for compatibility with spark < 3.4

Signed-off-by: Jannic Holzer <[email protected]>

* Replace use of walrus operator

Signed-off-by: Jannic Holzer <[email protected]>

* Add test coverage for validation methods

Signed-off-by: Jannic Holzer <[email protected]>

* Remove unused versioning functions

Signed-off-by: Jannic Holzer <[email protected]>

* Fix exception catching for invalid schema, add test for invalid schema

Signed-off-by: Jannic Holzer <[email protected]>

* Add pylint ignore

Signed-off-by: Jannic Holzer <[email protected]>

* Add tests/databricks to ignore for no-spark tests

Signed-off-by: Jannic Holzer <[email protected]>

* Update kedro-datasets/kedro_datasets/databricks/managed_table_dataset.py

Co-authored-by: Nok Lam Chan <[email protected]>

* Update kedro-datasets/kedro_datasets/databricks/managed_table_dataset.py

Co-authored-by: Nok Lam Chan <[email protected]>

* Remove spurious mlflow test dependency

Signed-off-by: Jannic Holzer <[email protected]>

* Add explicit check for database existence

Signed-off-by: Jannic Holzer <[email protected]>

* Remove character limit for table names

Signed-off-by: Jannic Holzer <[email protected]>

* Refactor validation steps in ManagedTable

Signed-off-by: Jannic Holzer <[email protected]>

* Remove spurious checks for table and schema name existence

Signed-off-by: Jannic Holzer <[email protected]>

---------

Signed-off-by: Danny Farah <[email protected]>
Signed-off-by: Jannic Holzer <[email protected]>
Co-authored-by: Danny Farah <[email protected]>
Co-authored-by: Danny Farah <[email protected]>
Co-authored-by: Nok Lam Chan <[email protected]>
Signed-off-by: Tom Kurian <[email protected]>

* docs: Update APIDataset docs and refactor (#217)

* Update APIDataset docs and refactor

* Acknowledge community contributor

* Fix more broken doc

Signed-off-by: Nok Chan <[email protected]>

* Lint

Signed-off-by: Juan Luis Cano Rodríguez <[email protected]>

* Fix release notes of upcoming kedro-datasets

---------

Signed-off-by: Nok Chan <[email protected]>
Signed-off-by: Juan Luis Cano Rodríguez <[email protected]>
Co-authored-by: Juan Luis Cano Rodríguez <[email protected]>
Co-authored-by: Jannic <[email protected]>
Signed-off-by: Tom Kurian <[email protected]>

* feat: Release `kedro-datasets` version `1.3.0` (#219)

* Modify release version and RELEASE.md

Signed-off-by: Jannic Holzer <[email protected]>

* Add proper name for ManagedTableDataSet

Signed-off-by: Jannic Holzer <[email protected]>

* Update kedro-datasets/RELEASE.md

Co-authored-by: Juan Luis Cano Rodríguez <[email protected]>

* Revert lost semicolon for release 1.2.0

Signed-off-by: Jannic Holzer <[email protected]>

---------

Signed-off-by: Jannic Holzer <[email protected]>
Co-authored-by: Juan Luis Cano Rodríguez <[email protected]>
Signed-off-by: Tom Kurian <[email protected]>

* docs: Fix APIDataSet docstring (#220)

* Fix APIDataSet docstring

Signed-off-by: Juan Luis Cano Rodríguez <[email protected]>

* Add release notes

Signed-off-by: Juan Luis Cano Rodríguez <[email protected]>

* Separate [docs] extras from [all] in kedro-datasets

Fix gh-143.

Signed-off-by: Juan Luis Cano Rodríguez <[email protected]>

---------

Signed-off-by: Juan Luis Cano Rodríguez <[email protected]>
Signed-off-by: Tom Kurian <[email protected]>

* Update kedro-datasets/tests/spark/test_spark_streaming_dataset.py

Co-authored-by: Deepyaman Datta <[email protected]>
Signed-off-by: Tom Kurian <[email protected]>

* Update kedro-datasets/kedro_datasets/spark/spark_streaming_dataset.py

Co-authored-by: Deepyaman Datta <[email protected]>
Signed-off-by: Tom Kurian <[email protected]>

* Update kedro-datasets/setup.py

Co-authored-by: Deepyaman Datta <[email protected]>
Signed-off-by: Tom Kurian <[email protected]>

* fix linting issue

Signed-off-by: Tom Kurian <[email protected]>

---------

Signed-off-by: Juan Luis Cano Rodríguez <[email protected]>
Signed-off-by: Tingting_Wan <[email protected]>
Signed-off-by: Juan Luis Cano Rodríguez <[email protected]>
Signed-off-by: Ankita Katiyar <[email protected]>
Signed-off-by: Nok Chan <[email protected]>
Signed-off-by: Tom Kurian <[email protected]>
Signed-off-by: Deepyaman Datta <[email protected]>
Signed-off-by: Merel Theisen <[email protected]>
Signed-off-by: jmcdonnell <[email protected]>
Signed-off-by: <[email protected]>
Signed-off-by: Ahdra Merali <[email protected]>
Signed-off-by: Danny Farah <[email protected]>
Signed-off-by: Jannic Holzer <[email protected]>
Co-authored-by: Juan Luis Cano Rodríguez <[email protected]>
Co-authored-by: Tingting Wan <[email protected]>
Co-authored-by: Nok Lam Chan <[email protected]>
Co-authored-by: Deepyaman Datta <[email protected]>
Co-authored-by: Nok Lam Chan <[email protected]>
Co-authored-by: Ankita Katiyar <[email protected]>
Co-authored-by: Juan Luis Cano Rodríguez <[email protected]>
Co-authored-by: Tom Kurian <[email protected]>
Co-authored-by: Sajid Alam <[email protected]>
Co-authored-by: Merel Theisen <[email protected]>
Co-authored-by: McDonnellJoseph <[email protected]>
Co-authored-by: jmcdonnell <[email protected]>
Co-authored-by: Ahdra Merali <[email protected]>
Co-authored-by: Jannic <[email protected]>
Co-authored-by: Danny Farah <[email protected]>
Co-authored-by: Danny Farah <[email protected]>
Co-authored-by: kuriantom369 <[email protected]>
18 people authored May 31, 2023
1 parent 339de8d commit 2acb007
Showing 5 changed files with 394 additions and 6 deletions.
44 changes: 44 additions & 0 deletions kedro-datasets/kedro_datasets/spark/README.md
@@ -0,0 +1,44 @@
# Spark Streaming

``SparkStreamingDataSet`` loads and saves data to streaming DataFrames.
See [Spark Structured Streaming](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html) for details.

To work with multiple streaming nodes, two hooks are required:
- One hook to integrate PySpark (see [Build a Kedro pipeline with PySpark](https://docs.kedro.org/en/stable/integrations/pyspark_integration.html) for details)
- One hook to run the streaming query without termination unless an exception occurs

#### Supported file formats

Supported file formats are:

- Text
- CSV
- JSON
- ORC
- Parquet

#### Example SparkStreamsHook:

```python
from kedro.framework.hooks import hook_impl
from pyspark.sql import SparkSession


class SparkStreamsHook:
    @hook_impl
    def after_pipeline_run(self) -> None:
        """Starts a Spark Streaming await session
        once the pipeline reaches the last node.
        """
        spark = SparkSession.builder.getOrCreate()
        spark.streams.awaitAnyTermination()
```
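
Once defined, the hook needs to be registered in the project's ``settings.py``. A minimal sketch, assuming the hook above lives in a ``hooks.py`` module inside your project package (the ``my_project`` module path is illustrative):

```python
# settings.py -- register the streaming hook ("my_project" is a placeholder package name)
from my_project.hooks import SparkStreamsHook

HOOKS = (SparkStreamsHook(),)
```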
To make the application work with the Kafka format, the respective Spark configuration needs to be added to ``conf/base/spark.yml``.

#### Example spark.yml:

```yaml
spark.driver.maxResultSize: 3g
spark.scheduler.mode: FAIR
```
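
For Kafka specifically, the Structured Streaming Kafka connector must also be available to the Spark session. One common way to achieve this (an assumption for illustration, not something this PR configures) is to add it via ``spark.jars.packages`` in the same file:

```yaml
# Versions are illustrative; match the Scala and Spark versions of your cluster
spark.jars.packages: org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0
```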
10 changes: 9 additions & 1 deletion kedro-datasets/kedro_datasets/spark/__init__.py
@@ -1,6 +1,12 @@
"""Provides I/O modules for Apache Spark."""

__all__ = ["SparkDataSet", "SparkHiveDataSet", "SparkJDBCDataSet", "DeltaTableDataSet"]
__all__ = [
    "SparkDataSet",
    "SparkHiveDataSet",
    "SparkJDBCDataSet",
    "DeltaTableDataSet",
    "SparkStreamingDataSet",
]

from contextlib import suppress

@@ -12,3 +18,5 @@
    from .spark_jdbc_dataset import SparkJDBCDataSet
with suppress(ImportError):
    from .deltatable_dataset import DeltaTableDataSet
with suppress(ImportError):
    from .spark_streaming_dataset import SparkStreamingDataSet
155 changes: 155 additions & 0 deletions kedro-datasets/kedro_datasets/spark/spark_streaming_dataset.py
@@ -0,0 +1,155 @@
"""SparkStreamingDataSet to load and save a PySpark Streaming DataFrame."""
from copy import deepcopy
from pathlib import PurePosixPath
from typing import Any, Dict

from kedro.io.core import AbstractDataSet
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.utils import AnalysisException

from kedro_datasets.spark.spark_dataset import (
SparkDataSet,
_split_filepath,
_strip_dbfs_prefix,
)


class SparkStreamingDataSet(AbstractDataSet):
"""``SparkStreamingDataSet`` loads data into Spark Streaming Dataframe objects.
Example usage for the
`YAML API <https://kedro.readthedocs.io/en/stable/data/\
data_catalog.html#use-the-data-catalog-with-the-yaml-api>`_:
.. code-block:: yaml
raw.new_inventory:
type: streaming.extras.datasets.spark_streaming_dataset.SparkStreamingDataSet
filepath: data/01_raw/stream/inventory/
file_format: json
save_args:
output_mode: append
checkpoint: data/04_checkpoint/raw_new_inventory
header: True
load_args:
schema:
filepath: data/01_raw/schema/inventory_schema.json
"""

DEFAULT_LOAD_ARGS = {} # type: Dict[str, Any]
DEFAULT_SAVE_ARGS = {} # type: Dict[str, Any]

def __init__(
self,
filepath: str = "",
file_format: str = "",
save_args: Dict[str, Any] = None,
load_args: Dict[str, Any] = None,
) -> None:
"""Creates a new instance of SparkStreamingDataSet.
Args:
filepath: Filepath in POSIX format to a Spark dataframe. When using Databricks
specify ``filepath``s starting with ``/dbfs/``. For message brokers such as
Kafka and all filepath is not required.
file_format: File format used during load and save
operations. These are formats supported by the running
SparkContext include parquet, csv, delta. For a list of supported
formats please refer to Apache Spark documentation at
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
load_args: Load args passed to Spark DataFrameReader load method.
It is dependent on the selected file format. You can find
a list of read options for each supported format
in Spark DataFrame read documentation:
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html,
Please note that a schema is mandatory for a streaming DataFrame
if ``schemaInference`` is not True.
save_args: Save args passed to Spark DataFrame write options.
Similar to load_args this is dependent on the selected file
format. You can pass ``mode`` and ``partitionBy`` to specify
your overwrite mode and partitioning respectively. You can find
a list of options for each format in Spark DataFrame
write documentation:
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
"""
self._file_format = file_format
self._save_args = save_args
self._load_args = load_args

fs_prefix, filepath = _split_filepath(filepath)

self._fs_prefix = fs_prefix
self._filepath = PurePosixPath(filepath)

# Handle default load and save arguments
self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS)
if load_args is not None:
self._load_args.update(load_args)
self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS)
if save_args is not None:
self._save_args.update(save_args)

# Handle schema load argument
self._schema = self._load_args.pop("schema", None)
if self._schema is not None:
if isinstance(self._schema, dict):
self._schema = SparkDataSet._load_schema_from_file(self._schema)

def _describe(self) -> Dict[str, Any]:
"""Returns a dict that describes attributes of the dataset."""
return {
"filepath": self._fs_prefix + str(self._filepath),
"file_format": self._file_format,
"load_args": self._load_args,
"save_args": self._save_args,
}

@staticmethod
def _get_spark():
return SparkSession.builder.getOrCreate()

def _load(self) -> DataFrame:
"""Loads data from filepath.
If the connector type is kafka then no file_path is required, schema needs to be
seperated from load_args.
Returns:
Data from filepath as pyspark dataframe.
"""
load_path = _strip_dbfs_prefix(self._fs_prefix + str(self._filepath))
data_stream_reader = (
self._get_spark()
.readStream.schema(self._schema)
.format(self._file_format)
.options(**self._load_args)
)
return data_stream_reader.load(load_path)

def _save(self, data: DataFrame) -> None:
"""Saves pyspark dataframe.
Args:
data: PySpark streaming dataframe for saving
"""
save_path = _strip_dbfs_prefix(self._fs_prefix + str(self._filepath))
output_constructor = data.writeStream.format(self._file_format)

(
output_constructor.option(
"checkpointLocation", self._save_args.pop("checkpoint")
)
.option("path", save_path)
.outputMode(self._save_args.pop("output_mode"))
.options(**self._save_args)
.start()
)

def _exists(self) -> bool:
load_path = _strip_dbfs_prefix(self._fs_prefix + str(self._filepath))

try:
self._get_spark().readStream.schema(self._schema).load(
load_path, self._file_format
)
except AnalysisException as exception:
if (
exception.desc.startswith("Path does not exist:")
or "is not a Streaming data" in exception.desc
):
return False
raise
return True
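
For reference, here is a minimal sketch of using the dataset through the Python API, mirroring the YAML example in the class docstring (the paths and schema file are illustrative):

```python
from kedro_datasets.spark import SparkStreamingDataSet

# A schema file is required for streaming reads unless schemaInference is enabled
dataset = SparkStreamingDataSet(
    filepath="data/01_raw/stream/inventory/",
    file_format="json",
    load_args={"schema": {"filepath": "data/01_raw/schema/inventory_schema.json"}},
    save_args={
        "checkpoint": "data/04_checkpoint/raw_new_inventory",
        "output_mode": "append",
    },
)

streaming_df = dataset.load()  # a streaming pyspark.sql.DataFrame
dataset.save(streaming_df)     # starts the streaming write query
```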
13 changes: 8 additions & 5 deletions kedro-datasets/setup.py
@@ -50,10 +50,15 @@ def _collect_requirements(requires):
    "plotly.PlotlyDataSet": [PANDAS, "plotly>=4.8.0, <6.0"],
    "plotly.JSONDataSet": ["plotly>=4.8.0, <6.0"],
}
polars_require = {"polars.CSVDataSet": [POLARS],}
polars_require = {
    "polars.CSVDataSet": [POLARS]
}
redis_require = {"redis.PickleDataSet": ["redis~=4.1"]}
snowflake_require = {
    "snowflake.SnowparkTableDataSet": ["snowflake-snowpark-python~=1.0.0", "pyarrow~=8.0"]
    "snowflake.SnowparkTableDataSet": [
        "snowflake-snowpark-python~=1.0.0",
        "pyarrow~=8.0",
    ]
}
spark_require = {
    "spark.SparkDataSet": [SPARK, HDFS, S3FS],
@@ -71,9 +76,7 @@ def _collect_requirements(requires):
        "tensorflow-macos~=2.0; platform_system == 'Darwin' and platform_machine == 'arm64'",
    ]
}
video_require = {
    "video.VideoDataSet": ["opencv-python~=4.5.5.64"]
}
video_require = {"video.VideoDataSet": ["opencv-python~=4.5.5.64"]}
yaml_require = {"yaml.YAMLDataSet": [PANDAS, "PyYAML>=4.2, <7.0"]}

extras_require = {