From 2acb007889d6188f72f1ac1e2398aafdf6736396 Mon Sep 17 00:00:00 2001 From: Tingting Wan <110382691+tingtingQB@users.noreply.github.com> Date: Wed, 31 May 2023 12:24:41 +0100 Subject: [PATCH] feat(datasets): add `SparkStreamingDataSet` (#198) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Fix links on GitHub issue templates (#150) Signed-off-by: Juan Luis Cano Rodríguez Signed-off-by: Tingting_Wan * add spark_stream_dataset.py Signed-off-by: Tingting_Wan * Migrate most of `kedro-datasets` metadata to `pyproject.toml` (#161) * Include missing requirements files in sdist Fix gh-86. Signed-off-by: Juan Luis Cano Rodríguez * Migrate most project metadata to `pyproject.toml` See https://github.com/kedro-org/kedro/issues/2334. Signed-off-by: Juan Luis Cano Rodríguez * Move requirements to `pyproject.toml` Signed-off-by: Juan Luis Cano Rodríguez --------- Signed-off-by: Juan Luis Cano Rodríguez Signed-off-by: Tingting_Wan * restructure the strean dataset to align with the other spark dataset Signed-off-by: Tingting_Wan * adding README.md for specification Signed-off-by: Tingting_Wan * Update kedro-datasets/kedro_datasets/spark/spark_stream_dataset.py Co-authored-by: Nok Lam Chan Signed-off-by: Tingting_Wan * rename the dataset Signed-off-by: Tingting_Wan * resolve comments Signed-off-by: Tingting_Wan * fix format and pylint Signed-off-by: Tingting_Wan * Update kedro-datasets/kedro_datasets/spark/README.md Co-authored-by: Deepyaman Datta Signed-off-by: Tingting_Wan * add unit tests and SparkStreamingDataset in init.py Signed-off-by: Tingting_Wan * add unit tests Signed-off-by: Tingting_Wan * update test_save Signed-off-by: Tingting_Wan * Upgrade Polars (#171) * Upgrade Polars Signed-off-by: Juan Luis Cano Rodríguez * Update Polars to 0.17.x --------- Signed-off-by: Juan Luis Cano Rodríguez Signed-off-by: Tingting_Wan * if release is failed, it return exit code and fail the CI (#158) Signed-off-by: Tingting_Wan * Migrate `kedro-airflow` to static metadata (#172) * Migrate kedro-airflow to static metadata See https://github.com/kedro-org/kedro/issues/2334. Signed-off-by: Juan Luis Cano Rodríguez * Add explicit PEP 518 build requirements for kedro-datasets Signed-off-by: Juan Luis Cano Rodríguez * Typos Co-authored-by: Merel Theisen <49397448+merelcht@users.noreply.github.com> Signed-off-by: Juan Luis Cano Rodríguez * Remove dangling reference to requirements.txt Signed-off-by: Juan Luis Cano Rodríguez * Add release notes Signed-off-by: Juan Luis Cano Rodríguez --------- Signed-off-by: Juan Luis Cano Rodríguez Signed-off-by: Tingting_Wan * Migrate `kedro-telemetry` to static metadata (#174) * Migrate kedro-telemetry to static metadata See kedro-org/kedro#2334. 
Signed-off-by: Juan Luis Cano Rodríguez * Add release notes Signed-off-by: Juan Luis Cano Rodríguez --------- Signed-off-by: Juan Luis Cano Rodríguez Signed-off-by: Tingting_Wan * ci: port lint, unit test, and e2e tests to Actions (#155) * Add unit test + lint test on GA * trigger GA - will revert Signed-off-by: Ankita Katiyar * Fix lint Signed-off-by: Ankita Katiyar * Add end to end tests * Add cache key Signed-off-by: Ankita Katiyar * Add cache action Signed-off-by: Ankita Katiyar * Rename workflow files Signed-off-by: Ankita Katiyar * Lint + add comment + default bash Signed-off-by: Ankita Katiyar * Add windows test Signed-off-by: Ankita Katiyar * Update workflow name + revert changes to READMEs Signed-off-by: Ankita Katiyar * Add kedro-telemetry/RELEASE.md to trufflehog ignore Signed-off-by: Ankita Katiyar * Add pytables to test_requirements remove from workflow Signed-off-by: Ankita Katiyar * Revert "Add pytables to test_requirements remove from workflow" This reverts commit 8203daa6405d325c74ec2097c9d0c5859bae8257. * Separate pip freeze step Signed-off-by: Ankita Katiyar --------- Signed-off-by: Ankita Katiyar Signed-off-by: Tingting_Wan * Migrate `kedro-docker` to static metadata (#173) * Migrate kedro-docker to static metadata See https://github.com/kedro-org/kedro/issues/2334. Signed-off-by: Juan Luis Cano Rodríguez * Address packaging warning Signed-off-by: Juan Luis Cano Rodríguez * Fix tests Signed-off-by: Juan Luis Cano Rodríguez * Actually install current plugin with dependencies Signed-off-by: Juan Luis Cano Rodríguez * Add release notes Signed-off-by: Juan Luis Cano Rodríguez --------- Signed-off-by: Juan Luis Cano Rodríguez Signed-off-by: Tingting_Wan * Introdcuing .gitpod.yml to kedro-plugins (#185) Currently opening gitpod will installed a Python 3.11 which breaks everything because we don't support it set. This PR introduce a simple .gitpod.yml to get it started. 
Signed-off-by: Tingting_Wan * sync APIDataSet from kedro's `develop` (#184) * Update APIDataSet Signed-off-by: Nok Chan * Sync ParquetDataSet Signed-off-by: Nok Chan * Sync Test Signed-off-by: Nok Chan * Linting Signed-off-by: Nok Chan * Revert Unnecessary ParquetDataSet Changes Signed-off-by: Nok Chan * Sync release notes Signed-off-by: Nok Chan --------- Signed-off-by: Nok Chan Signed-off-by: Tingting_Wan * formatting Signed-off-by: Tingting_Wan * formatting Signed-off-by: Tingting_Wan * formatting Signed-off-by: Tingting_Wan * formatting Signed-off-by: Tingting_Wan * add spark_stream_dataset.py Signed-off-by: Tingting_Wan * restructure the strean dataset to align with the other spark dataset Signed-off-by: Tingting_Wan * adding README.md for specification Signed-off-by: Tingting_Wan * Update kedro-datasets/kedro_datasets/spark/spark_stream_dataset.py Co-authored-by: Nok Lam Chan Signed-off-by: Tingting_Wan * rename the dataset Signed-off-by: Tingting_Wan * resolve comments Signed-off-by: Tingting_Wan * fix format and pylint Signed-off-by: Tingting_Wan * Update kedro-datasets/kedro_datasets/spark/README.md Co-authored-by: Deepyaman Datta Signed-off-by: Tingting_Wan * add unit tests and SparkStreamingDataset in init.py Signed-off-by: Tingting_Wan * add unit tests Signed-off-by: Tingting_Wan * update test_save Signed-off-by: Tingting_Wan * formatting Signed-off-by: Tingting_Wan * formatting Signed-off-by: Tingting_Wan * formatting Signed-off-by: Tingting_Wan * formatting Signed-off-by: Tingting_Wan * lint Signed-off-by: Tingting_Wan * lint Signed-off-by: Tingting_Wan * lint Signed-off-by: Tingting_Wan * update test cases Signed-off-by: Tingting_Wan * add negative test Signed-off-by: Tingting_Wan * remove code snippets fpr testing Signed-off-by: Tingting_Wan * lint Signed-off-by: Tingting_Wan * update tests Signed-off-by: Tingting_Wan * update test and remove redundacy Signed-off-by: Tingting_Wan * linting Signed-off-by: Tingting_Wan * refactor file format Signed-off-by: Tom Kurian * fix read me file Signed-off-by: Tom Kurian * docs: Add community contributions (#199) * Add community contributions Signed-off-by: Juan Luis Cano Rodríguez * Use newer link to docs Signed-off-by: Juan Luis Cano Rodríguez --------- Signed-off-by: Juan Luis Cano Rodríguez * adding test for raise error Signed-off-by: Tingting_Wan * update test and remove redundacy Signed-off-by: Tingting_Wan Signed-off-by: Tom Kurian * linting Signed-off-by: Tingting_Wan Signed-off-by: Tom Kurian * refactor file format Signed-off-by: Tom Kurian * fix read me file Signed-off-by: Tom Kurian * adding test for raise error Signed-off-by: Tingting_Wan Signed-off-by: Tom Kurian * fix readme file Signed-off-by: Tom Kurian * fix readme Signed-off-by: Tom Kurian * fix conflicts Signed-off-by: Tom Kurian * fix ci erors Signed-off-by: Tom Kurian * fix lint issue Signed-off-by: Tom Kurian * update class documentation Signed-off-by: Tom Kurian * add additional test cases Signed-off-by: Tom Kurian * add s3 read test cases Signed-off-by: Tom Kurian * add s3 read test cases Signed-off-by: Tom Kurian * add s3 read test case Signed-off-by: Tom Kurian * test s3 read Signed-off-by: Tom Kurian * remove redundant test cases Signed-off-by: Tom Kurian * fix streaming dataset configurations Signed-off-by: Tom Kurian * update streaming datasets doc Signed-off-by: Tingting_Wan * resolve comments re documentation Signed-off-by: Tingting_Wan * bugfix lint Signed-off-by: Tingting_Wan * update link Signed-off-by: Tingting_Wan * revert the changes on CI 
Signed-off-by: Nok Chan * test(docker): remove outdated logging-related step (#207) * fixkedro- docker e2e test Signed-off-by: Nok Chan * fix: add timeout to request to satisfy bandit lint --------- Signed-off-by: Nok Chan Co-authored-by: Deepyaman Datta Signed-off-by: Tom Kurian * ci: ensure plugin requirements get installed in CI (#208) * ci: install the plugin alongside test requirements * ci: install the plugin alongside test requirements * Update kedro-airflow.yml * Update kedro-datasets.yml * Update kedro-docker.yml * Update kedro-telemetry.yml * Update kedro-airflow.yml * Update kedro-datasets.yml * Update kedro-airflow.yml * Update kedro-docker.yml * Update kedro-telemetry.yml * ci(telemetry): update isort config to correct sort * Don't use profile ¯\_(ツ)_/¯ Signed-off-by: Deepyaman Datta * chore(datasets): remove empty `tool.black` section * chore(docker): remove empty `tool.black` section --------- Signed-off-by: Deepyaman Datta Signed-off-by: Tom Kurian * ci: Migrate the release workflow from CircleCI to GitHub Actions (#203) * Create check-release.yml * change from test pypi to pypi * split into jobs and move version logic into script * update github actions output * lint * changes based on review * changes based on review * fix script to not append continuously * change pypi api token logic Signed-off-by: Tom Kurian * build: Relax Kedro bound for `kedro-datasets` (#140) * Less strict pin on Kedro for datasets Signed-off-by: Merel Theisen Signed-off-by: Tom Kurian * ci: don't run checks on both `push`/`pull_request` (#192) * ci: don't run checks on both `push`/`pull_request` * ci: don't run checks on both `push`/`pull_request` * ci: don't run checks on both `push`/`pull_request` * ci: don't run checks on both `push`/`pull_request` Signed-off-by: Tom Kurian * chore: delete extra space ending check-release.yml (#210) Signed-off-by: Tom Kurian * ci: Create merge-gatekeeper.yml to make sure PR only merged when all tests checked. 
(#215) * Create merge-gatekeeper.yml * Update .github/workflows/merge-gatekeeper.yml --------- Co-authored-by: Sajid Alam <90610031+SajidAlamQB@users.noreply.github.com> Signed-off-by: Tom Kurian * ci: Remove the CircleCI setup (#209) * remove circleci setup files and utils * remove circleci configs in kedro-telemetry * remove redundant .github in kedro-telemetry * Delete continue_config.yml * Update check-release.yml * lint * increase timeout to 40 mins for docker e2e tests Signed-off-by: Tom Kurian * feat: Dataset API add `save` method (#180) * [FEAT] add save method to APIDataset Signed-off-by: jmcdonnell * [ENH] create save_args parameter for api_dataset Signed-off-by: jmcdonnell * [ENH] add tests for socket + http errors Signed-off-by: Signed-off-by: jmcdonnell * [ENH] check save data is json Signed-off-by: Signed-off-by: jmcdonnell * [FIX] clean code Signed-off-by: jmcdonnell * [ENH] handle different data types Signed-off-by: jmcdonnell * [FIX] test coverage for exceptions Signed-off-by: jmcdonnell * [ENH] add examples in APIDataSet docstring Signed-off-by: jmcdonnell * sync APIDataSet from kedro's `develop` (#184) * Update APIDataSet Signed-off-by: Nok Chan * Sync ParquetDataSet Signed-off-by: Nok Chan * Sync Test Signed-off-by: Nok Chan * Linting Signed-off-by: Nok Chan * Revert Unnecessary ParquetDataSet Changes Signed-off-by: Nok Chan * Sync release notes Signed-off-by: Nok Chan --------- Signed-off-by: Nok Chan Signed-off-by: jmcdonnell * [FIX] remove support for delete method Signed-off-by: jmcdonnell * [FIX] lint files Signed-off-by: jmcdonnell * [FIX] fix conflicts Signed-off-by: jmcdonnell * [FIX] remove fail save test Signed-off-by: jmcdonnell * [ENH] review suggestions Signed-off-by: jmcdonnell * [ENH] fix tests Signed-off-by: jmcdonnell * [FIX] reorder arguments Signed-off-by: jmcdonnell --------- Signed-off-by: jmcdonnell Signed-off-by: Signed-off-by: Nok Chan Co-authored-by: jmcdonnell Co-authored-by: Nok Lam Chan Signed-off-by: Tom Kurian * ci: Automatically extract release notes for GitHub Releases (#212) * ci: Automatically extract release notes Signed-off-by: Ankita Katiyar * fix lint Signed-off-by: Ankita Katiyar * Raise exceptions Signed-off-by: Ankita Katiyar * Lint Signed-off-by: Ankita Katiyar * Lint Signed-off-by: Ankita Katiyar --------- Signed-off-by: Ankita Katiyar Signed-off-by: Tom Kurian * feat: Add metadata attribute to datasets (#189) * Add metadata attribute to all datasets Signed-off-by: Ahdra Merali Signed-off-by: Tom Kurian * feat: Add ManagedTableDataset for managed Delta Lake tables in Databricks (#206) * committing first version of UnityTableCatalog with unit tests. This datasets allows users to interface with Unity catalog tables in Databricks to both read and write. 
Signed-off-by: Danny Farah Signed-off-by: Jannic Holzer * renaming dataset Signed-off-by: Danny Farah Signed-off-by: Jannic Holzer * adding mlflow connectors Signed-off-by: Danny Farah Signed-off-by: Jannic Holzer * fixing mlflow imports Signed-off-by: Danny Farah Signed-off-by: Jannic Holzer * cleaned up mlflow for initial release Signed-off-by: Danny Farah Signed-off-by: Jannic Holzer * cleaned up mlflow references from setup.py for initial release Signed-off-by: Danny Farah Signed-off-by: Jannic Holzer * fixed deps in setup.py Signed-off-by: Danny Farah Signed-off-by: Jannic Holzer * adding comments before intiial PR Signed-off-by: Danny Farah Signed-off-by: Jannic Holzer * moved validation to dataclass Signed-off-by: Danny Farah Signed-off-by: Jannic Holzer * bug fix in type of partition column and cleanup Signed-off-by: Danny Farah Signed-off-by: Jannic Holzer * updated docstring for ManagedTableDataSet Signed-off-by: Danny Farah Signed-off-by: Jannic Holzer * added backticks to catalog Signed-off-by: Danny Farah Signed-off-by: Jannic Holzer * fixing regex to allow hyphens Signed-off-by: Danny Farah Signed-off-by: Jannic Holzer * Update kedro-datasets/kedro_datasets/databricks/managed_table_dataset.py Co-authored-by: Jannic <37243923+jmholzer@users.noreply.github.com> Signed-off-by: Jannic Holzer * Update kedro-datasets/kedro_datasets/databricks/managed_table_dataset.py Co-authored-by: Jannic <37243923+jmholzer@users.noreply.github.com> Signed-off-by: Danny Farah Signed-off-by: Jannic Holzer * Update kedro-datasets/kedro_datasets/databricks/managed_table_dataset.py Co-authored-by: Jannic <37243923+jmholzer@users.noreply.github.com> Signed-off-by: Danny Farah Signed-off-by: Jannic Holzer * Update kedro-datasets/kedro_datasets/databricks/managed_table_dataset.py Co-authored-by: Jannic <37243923+jmholzer@users.noreply.github.com> Signed-off-by: Danny Farah Signed-off-by: Jannic Holzer * Update kedro-datasets/kedro_datasets/databricks/managed_table_dataset.py Co-authored-by: Jannic <37243923+jmholzer@users.noreply.github.com> Signed-off-by: Danny Farah Signed-off-by: Jannic Holzer * Update kedro-datasets/kedro_datasets/databricks/managed_table_dataset.py Co-authored-by: Jannic <37243923+jmholzer@users.noreply.github.com> Signed-off-by: Danny Farah Signed-off-by: Jannic Holzer * Update kedro-datasets/kedro_datasets/databricks/managed_table_dataset.py Co-authored-by: Jannic <37243923+jmholzer@users.noreply.github.com> Signed-off-by: Danny Farah Signed-off-by: Jannic Holzer * Update kedro-datasets/test_requirements.txt Co-authored-by: Jannic <37243923+jmholzer@users.noreply.github.com> Signed-off-by: Danny Farah Signed-off-by: Jannic Holzer * Update kedro-datasets/kedro_datasets/databricks/managed_table_dataset.py Co-authored-by: Jannic <37243923+jmholzer@users.noreply.github.com> Signed-off-by: Danny Farah Signed-off-by: Jannic Holzer * Update kedro-datasets/kedro_datasets/databricks/managed_table_dataset.py Co-authored-by: Jannic <37243923+jmholzer@users.noreply.github.com> Signed-off-by: Danny Farah Signed-off-by: Jannic Holzer * Update kedro-datasets/kedro_datasets/databricks/managed_table_dataset.py Co-authored-by: Jannic <37243923+jmholzer@users.noreply.github.com> Signed-off-by: Danny Farah Signed-off-by: Jannic Holzer * Update kedro-datasets/kedro_datasets/databricks/managed_table_dataset.py Co-authored-by: Jannic <37243923+jmholzer@users.noreply.github.com> Signed-off-by: Danny Farah Signed-off-by: Jannic Holzer * adding backticks to catalog Signed-off-by: Danny Farah Signed-off-by: 
Jannic Holzer * Require pandas < 2.0 for compatibility with spark < 3.4 Signed-off-by: Jannic Holzer * Replace use of walrus operator Signed-off-by: Jannic Holzer * Add test coverage for validation methods Signed-off-by: Jannic Holzer * Remove unused versioning functions Signed-off-by: Jannic Holzer * Fix exception catching for invalid schema, add test for invalid schema Signed-off-by: Jannic Holzer * Add pylint ignore Signed-off-by: Jannic Holzer * Add tests/databricks to ignore for no-spark tests Signed-off-by: Jannic Holzer * Update kedro-datasets/kedro_datasets/databricks/managed_table_dataset.py Co-authored-by: Nok Lam Chan * Update kedro-datasets/kedro_datasets/databricks/managed_table_dataset.py Co-authored-by: Nok Lam Chan * Remove spurious mlflow test dependency Signed-off-by: Jannic Holzer * Add explicit check for database existence Signed-off-by: Jannic Holzer * Remove character limit for table names Signed-off-by: Jannic Holzer * Refactor validation steps in ManagedTable Signed-off-by: Jannic Holzer * Remove spurious checks for table and schema name existence Signed-off-by: Jannic Holzer --------- Signed-off-by: Danny Farah Signed-off-by: Jannic Holzer Co-authored-by: Danny Farah Co-authored-by: Danny Farah Co-authored-by: Nok Lam Chan Signed-off-by: Tom Kurian * docs: Update APIDataset docs and refactor (#217) * Update APIDataset docs and refactor * Acknowledge community contributor * Fix more broken doc Signed-off-by: Nok Chan * Lint Signed-off-by: Juan Luis Cano Rodríguez * Fix release notes of upcoming kedro-datasets --------- Signed-off-by: Nok Chan Signed-off-by: Juan Luis Cano Rodríguez Co-authored-by: Juan Luis Cano Rodríguez Co-authored-by: Jannic <37243923+jmholzer@users.noreply.github.com> Signed-off-by: Tom Kurian * feat: Release `kedro-datasets` version `1.3.0` (#219) * Modify release version and RELEASE.md Signed-off-by: Jannic Holzer * Add proper name for ManagedTableDataSet Signed-off-by: Jannic Holzer * Update kedro-datasets/RELEASE.md Co-authored-by: Juan Luis Cano Rodríguez * Revert lost semicolon for release 1.2.0 Signed-off-by: Jannic Holzer --------- Signed-off-by: Jannic Holzer Co-authored-by: Juan Luis Cano Rodríguez Signed-off-by: Tom Kurian * docs: Fix APIDataSet docstring (#220) * Fix APIDataSet docstring Signed-off-by: Juan Luis Cano Rodríguez * Add release notes Signed-off-by: Juan Luis Cano Rodríguez * Separate [docs] extras from [all] in kedro-datasets Fix gh-143. 
Signed-off-by: Juan Luis Cano Rodríguez --------- Signed-off-by: Juan Luis Cano Rodríguez Signed-off-by: Tom Kurian * Update kedro-datasets/tests/spark/test_spark_streaming_dataset.py Co-authored-by: Deepyaman Datta Signed-off-by: Tom Kurian * Update kedro-datasets/kedro_datasets/spark/spark_streaming_dataset.py Co-authored-by: Deepyaman Datta Signed-off-by: Tom Kurian * Update kedro-datasets/setup.py Co-authored-by: Deepyaman Datta Signed-off-by: Tom Kurian * fix linting issue Signed-off-by: Tom Kurian --------- Signed-off-by: Juan Luis Cano Rodríguez Signed-off-by: Tingting_Wan Signed-off-by: Juan Luis Cano Rodríguez Signed-off-by: Ankita Katiyar Signed-off-by: Nok Chan Signed-off-by: Tom Kurian Signed-off-by: Deepyaman Datta Signed-off-by: Merel Theisen Signed-off-by: jmcdonnell Signed-off-by: Signed-off-by: Ahdra Merali Signed-off-by: Danny Farah Signed-off-by: Jannic Holzer Co-authored-by: Juan Luis Cano Rodríguez Co-authored-by: Tingting Wan <110382691+Tingting711@users.noreply.github.com> Co-authored-by: Nok Lam Chan Co-authored-by: Deepyaman Datta Co-authored-by: Nok Lam Chan Co-authored-by: Ankita Katiyar <110245118+ankatiyar@users.noreply.github.com> Co-authored-by: Juan Luis Cano Rodríguez Co-authored-by: Tom Kurian Co-authored-by: Sajid Alam <90610031+SajidAlamQB@users.noreply.github.com> Co-authored-by: Merel Theisen <49397448+merelcht@users.noreply.github.com> Co-authored-by: McDonnellJoseph <90898184+McDonnellJoseph@users.noreply.github.com> Co-authored-by: jmcdonnell Co-authored-by: Ahdra Merali <90615669+AhdraMeraliQB@users.noreply.github.com> Co-authored-by: Jannic <37243923+jmholzer@users.noreply.github.com> Co-authored-by: Danny Farah Co-authored-by: Danny Farah Co-authored-by: kuriantom369 <116743025+kuriantom369@users.noreply.github.com> --- kedro-datasets/kedro_datasets/spark/README.md | 44 +++++ .../kedro_datasets/spark/__init__.py | 10 +- .../spark/spark_streaming_dataset.py | 155 +++++++++++++++ kedro-datasets/setup.py | 13 +- .../spark/test_spark_streaming_dataset.py | 178 ++++++++++++++++++ 5 files changed, 394 insertions(+), 6 deletions(-) create mode 100644 kedro-datasets/kedro_datasets/spark/README.md create mode 100644 kedro-datasets/kedro_datasets/spark/spark_streaming_dataset.py create mode 100644 kedro-datasets/tests/spark/test_spark_streaming_dataset.py diff --git a/kedro-datasets/kedro_datasets/spark/README.md b/kedro-datasets/kedro_datasets/spark/README.md new file mode 100644 index 000000000..7400c3c47 --- /dev/null +++ b/kedro-datasets/kedro_datasets/spark/README.md @@ -0,0 +1,44 @@ +# Spark Streaming + +``SparkStreamingDataSet`` loads and saves data to streaming DataFrames. +See [Spark Structured Streaming](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html) for details. 
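Below is a minimal sketch of using the dataset directly from Python, mirroring the YAML example in the class docstring further down in this diff; the file paths and schema location are placeholders for illustration and are not part of the changeset.

```python
from kedro_datasets.spark import SparkStreamingDataSet

# Illustrative paths only; a JSON schema file is required unless schema
# inference is enabled for the chosen file format.
inventory_stream = SparkStreamingDataSet(
    filepath="data/01_raw/stream/inventory/",
    file_format="json",
    load_args={"schema": {"filepath": "data/01_raw/schema/inventory_schema.json"}},
    save_args={"checkpoint": "data/04_checkpoint/raw_new_inventory", "output_mode": "append"},
)

streaming_df = inventory_stream.load()  # returns a streaming pyspark.sql.DataFrame
assert streaming_df.isStreaming
```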
+ +To work with multiple streaming nodes, two hooks are required: + - Integrating PySpark, see [Build a Kedro pipeline with PySpark](https://docs.kedro.org/en/stable/integrations/pyspark_integration.html) for details + - Running the streaming query without termination unless an exception occurs + +#### Supported file formats + +Supported file formats are: + +- Text +- CSV +- JSON +- ORC +- Parquet + +#### Example SparkStreamsHook: + +```python +from kedro.framework.hooks import hook_impl +from pyspark.sql import SparkSession + +class SparkStreamsHook: + @hook_impl + def after_pipeline_run(self) -> None: + """Starts a Spark streaming await session + once the pipeline reaches the last node. + """ + + spark = SparkSession.builder.getOrCreate() + spark.streams.awaitAnyTermination() +``` +To make the application work with the Kafka format, the respective Spark configuration needs to be added to ``conf/base/spark.yml``. + +#### Example spark.yml: + +```yaml +spark.driver.maxResultSize: 3g +spark.scheduler.mode: FAIR + +``` diff --git a/kedro-datasets/kedro_datasets/spark/__init__.py b/kedro-datasets/kedro_datasets/spark/__init__.py index 3dede09aa..bd649f5c7 100644 --- a/kedro-datasets/kedro_datasets/spark/__init__.py +++ b/kedro-datasets/kedro_datasets/spark/__init__.py @@ -1,6 +1,12 @@ """Provides I/O modules for Apache Spark.""" -__all__ = ["SparkDataSet", "SparkHiveDataSet", "SparkJDBCDataSet", "DeltaTableDataSet"] +__all__ = [ + "SparkDataSet", + "SparkHiveDataSet", + "SparkJDBCDataSet", + "DeltaTableDataSet", + "SparkStreamingDataSet", +] from contextlib import suppress @@ -12,3 +18,5 @@ from .spark_jdbc_dataset import SparkJDBCDataSet with suppress(ImportError): from .deltatable_dataset import DeltaTableDataSet +with suppress(ImportError): + from .spark_streaming_dataset import SparkStreamingDataSet diff --git a/kedro-datasets/kedro_datasets/spark/spark_streaming_dataset.py b/kedro-datasets/kedro_datasets/spark/spark_streaming_dataset.py new file mode 100644 index 000000000..2f7743e65 --- /dev/null +++ b/kedro-datasets/kedro_datasets/spark/spark_streaming_dataset.py @@ -0,0 +1,155 @@ +"""SparkStreamingDataSet to load and save a PySpark Streaming DataFrame.""" +from copy import deepcopy +from pathlib import PurePosixPath +from typing import Any, Dict + +from kedro.io.core import AbstractDataSet +from pyspark.sql import DataFrame, SparkSession +from pyspark.sql.utils import AnalysisException + +from kedro_datasets.spark.spark_dataset import ( + SparkDataSet, + _split_filepath, + _strip_dbfs_prefix, +) + + +class SparkStreamingDataSet(AbstractDataSet): + """``SparkStreamingDataSet`` loads data into Spark streaming DataFrame objects. + Example usage for the + `YAML API `_: + .. code-block:: yaml + raw.new_inventory: + type: spark.SparkStreamingDataSet + filepath: data/01_raw/stream/inventory/ + file_format: json + save_args: + output_mode: append + checkpoint: data/04_checkpoint/raw_new_inventory + header: True + load_args: + schema: + filepath: data/01_raw/schema/inventory_schema.json + """ + + DEFAULT_LOAD_ARGS = {} # type: Dict[str, Any] + DEFAULT_SAVE_ARGS = {} # type: Dict[str, Any] + + def __init__( + self, + filepath: str = "", + file_format: str = "", + save_args: Dict[str, Any] = None, + load_args: Dict[str, Any] = None, + ) -> None: + """Creates a new instance of SparkStreamingDataSet. + Args: + filepath: Filepath in POSIX format to a Spark dataframe. When using Databricks + specify ``filepath``s starting with ``/dbfs/``.
For message brokers such as + Kafka, a filepath is not required. + file_format: File format used during load and save + operations. These are formats supported by the running + SparkContext and include parquet, csv and delta. For a list of supported + formats, please refer to the Apache Spark documentation at + https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html + load_args: Load args passed to the Spark DataFrameReader load method. + It is dependent on the selected file format. You can find + a list of read options for each supported format + in the Spark DataFrame read documentation: + https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html. + Please note that a schema is mandatory for a streaming DataFrame + if ``schemaInference`` is not True. + save_args: Save args passed to the Spark DataFrame write options. + Similar to ``load_args``, this is dependent on the selected file + format. You can pass ``mode`` and ``partitionBy`` to specify + your overwrite mode and partitioning respectively. You can find + a list of options for each format in the Spark DataFrame + write documentation: + https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html + """ + self._file_format = file_format + self._save_args = save_args + self._load_args = load_args + + fs_prefix, filepath = _split_filepath(filepath) + + self._fs_prefix = fs_prefix + self._filepath = PurePosixPath(filepath) + + # Handle default load and save arguments + self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS) + if load_args is not None: + self._load_args.update(load_args) + self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS) + if save_args is not None: + self._save_args.update(save_args) + + # Handle schema load argument + self._schema = self._load_args.pop("schema", None) + if self._schema is not None: + if isinstance(self._schema, dict): + self._schema = SparkDataSet._load_schema_from_file(self._schema) + + def _describe(self) -> Dict[str, Any]: + """Returns a dict that describes attributes of the dataset.""" + return { + "filepath": self._fs_prefix + str(self._filepath), + "file_format": self._file_format, + "load_args": self._load_args, + "save_args": self._save_args, + } + + @staticmethod + def _get_spark(): + return SparkSession.builder.getOrCreate() + + def _load(self) -> DataFrame: + """Loads data from filepath. + If the connector type is Kafka, no filepath is required; the schema needs to be + separated from ``load_args``. + Returns: + Data from filepath as a PySpark streaming DataFrame. + """ + load_path = _strip_dbfs_prefix(self._fs_prefix + str(self._filepath)) + data_stream_reader = ( + self._get_spark() + .readStream.schema(self._schema) + .format(self._file_format) + .options(**self._load_args) + ) + return data_stream_reader.load(load_path) + + def _save(self, data: DataFrame) -> None: + """Saves a PySpark streaming DataFrame.
+ Args: + data: PySpark streaming DataFrame to save. + """ + save_path = _strip_dbfs_prefix(self._fs_prefix + str(self._filepath)) + output_constructor = data.writeStream.format(self._file_format) + + ( + output_constructor.option( + "checkpointLocation", self._save_args.pop("checkpoint") + ) + .option("path", save_path) + .outputMode(self._save_args.pop("output_mode")) + .options(**self._save_args) + .start() + ) + + def _exists(self) -> bool: + load_path = _strip_dbfs_prefix(self._fs_prefix + str(self._filepath)) + + try: + self._get_spark().readStream.schema(self._schema).load( + load_path, self._file_format + ) + except AnalysisException as exception: + if ( + exception.desc.startswith("Path does not exist:") + or "is not a Streaming data" in exception.desc + ): + return False + raise + return True diff --git a/kedro-datasets/setup.py b/kedro-datasets/setup.py index e69de8fa9..210eb6884 100644 --- a/kedro-datasets/setup.py +++ b/kedro-datasets/setup.py @@ -50,10 +50,15 @@ def _collect_requirements(requires): "plotly.PlotlyDataSet": [PANDAS, "plotly>=4.8.0, <6.0"], "plotly.JSONDataSet": ["plotly>=4.8.0, <6.0"], } -polars_require = {"polars.CSVDataSet": [POLARS],} +polars_require = { + "polars.CSVDataSet": [POLARS] +} redis_require = {"redis.PickleDataSet": ["redis~=4.1"]} snowflake_require = { - "snowflake.SnowparkTableDataSet": ["snowflake-snowpark-python~=1.0.0", "pyarrow~=8.0"] + "snowflake.SnowparkTableDataSet": [ + "snowflake-snowpark-python~=1.0.0", + "pyarrow~=8.0", + ] } spark_require = { "spark.SparkDataSet": [SPARK, HDFS, S3FS], @@ -71,9 +76,7 @@ def _collect_requirements(requires): "tensorflow-macos~=2.0; platform_system == 'Darwin' and platform_machine == 'arm64'", ] } -video_require = { - "video.VideoDataSet": ["opencv-python~=4.5.5.64"] -} +video_require = {"video.VideoDataSet": ["opencv-python~=4.5.5.64"]} yaml_require = {"yaml.YAMLDataSet": [PANDAS, "PyYAML>=4.2, <7.0"]} extras_require = { diff --git a/kedro-datasets/tests/spark/test_spark_streaming_dataset.py b/kedro-datasets/tests/spark/test_spark_streaming_dataset.py new file mode 100644 index 000000000..c4fb6c005 --- /dev/null +++ b/kedro-datasets/tests/spark/test_spark_streaming_dataset.py @@ -0,0 +1,178 @@ +import json + +import boto3 +import pytest +from kedro.io.core import DataSetError +from moto import mock_s3 +from pyspark.sql import SparkSession +from pyspark.sql.types import IntegerType, StringType, StructField, StructType +from pyspark.sql.utils import AnalysisException + +from kedro_datasets.spark.spark_dataset import SparkDataSet +from kedro_datasets.spark.spark_streaming_dataset import SparkStreamingDataSet + +SCHEMA_FILE_NAME = "schema.json" +BUCKET_NAME = "test_bucket" +AWS_CREDENTIALS = {"key": "FAKE_ACCESS_KEY", "secret": "FAKE_SECRET_KEY"} + + +def sample_schema(schema_path): + """Read the schema file from the given JSON path.""" + with open(schema_path, encoding="utf-8") as f: + try: + return StructType.fromJson(json.loads(f.read())) + except Exception as exc: + raise DataSetError( + f"Contents of 'schema.filepath' ({schema_path}) are invalid. " + f"A schema is required for a streaming data load. Please provide a valid schema_path."
+ ) from exc + + +@pytest.fixture +def sample_spark_df_schema() -> StructType: + """Spark Dataframe schema""" + return StructType( + [ + StructField("sku", StringType(), True), + StructField("new_stock", IntegerType(), True), + ] + ) + + +@pytest.fixture +def sample_spark_streaming_df(tmp_path, sample_spark_df_schema): + """Create a sample dataframe for streaming""" + data = [("0001", 2), ("0001", 7), ("0002", 4)] + schema_path = (tmp_path / SCHEMA_FILE_NAME).as_posix() + with open(schema_path, "w", encoding="utf-8") as f: + json.dump(sample_spark_df_schema.jsonValue(), f) + return SparkSession.builder.getOrCreate().createDataFrame( + data, sample_spark_df_schema + ) + + +@pytest.fixture +def mocked_s3_bucket(): + """Create a bucket for testing using moto.""" + with mock_s3(): + conn = boto3.client( + "s3", + aws_access_key_id="fake_access_key", + aws_secret_access_key="fake_secret_key", + ) + conn.create_bucket(Bucket=BUCKET_NAME) + yield conn + + +@pytest.fixture +def s3_bucket(): + with mock_s3(): + s3 = boto3.resource("s3", region_name="us-east-1") + bucket_name = "test-bucket" + s3.create_bucket(Bucket=bucket_name) + yield bucket_name + + +@pytest.fixture +def mocked_s3_schema(tmp_path, mocked_s3_bucket, sample_spark_df_schema: StructType): + """Creates schema file and adds it to mocked S3 bucket.""" + temporary_path = tmp_path / SCHEMA_FILE_NAME + temporary_path.write_text(sample_spark_df_schema.json(), encoding="utf-8") + + mocked_s3_bucket.put_object( + Bucket=BUCKET_NAME, Key=SCHEMA_FILE_NAME, Body=temporary_path.read_bytes() + ) + return mocked_s3_bucket + + +class TestSparkStreamingDataSet: + def test_load(self, tmp_path, sample_spark_streaming_df): + filepath = (tmp_path / "test_streams").as_posix() + schema_path = (tmp_path / SCHEMA_FILE_NAME).as_posix() + + spark_json_ds = SparkDataSet( + filepath=filepath, file_format="json", save_args=[{"mode", "overwrite"}] + ) + spark_json_ds.save(sample_spark_streaming_df) + + streaming_ds = SparkStreamingDataSet( + filepath=filepath, + file_format="json", + load_args={"schema": {"filepath": schema_path}}, + ).load() + assert streaming_ds.isStreaming + schema = sample_schema(schema_path) + assert streaming_ds.schema == schema + + @pytest.mark.usefixtures("mocked_s3_schema") + def test_load_options_schema_path_with_credentials( + self, tmp_path, sample_spark_streaming_df + ): + filepath = (tmp_path / "test_streams").as_posix() + schema_path = (tmp_path / SCHEMA_FILE_NAME).as_posix() + + spark_json_ds = SparkDataSet( + filepath=filepath, file_format="json", save_args=[{"mode", "overwrite"}] + ) + spark_json_ds.save(sample_spark_streaming_df) + + streaming_ds = SparkStreamingDataSet( + filepath=filepath, + file_format="json", + load_args={ + "schema": { + "filepath": f"s3://{BUCKET_NAME}/{SCHEMA_FILE_NAME}", + "credentials": AWS_CREDENTIALS, + } + }, + ).load() + + assert streaming_ds.isStreaming + schema = sample_schema(schema_path) + assert streaming_ds.schema == schema + + def test_save(self, tmp_path, sample_spark_streaming_df): + filepath_json = (tmp_path / "test_streams").as_posix() + filepath_output = (tmp_path / "test_streams_output").as_posix() + schema_path = (tmp_path / SCHEMA_FILE_NAME).as_posix() + checkpoint_path = (tmp_path / "checkpoint").as_posix() + + # Save the sample json file to temp_path for creating dataframe + spark_json_ds = SparkDataSet( + filepath=filepath_json, + file_format="json", + save_args=[{"mode", "overwrite"}], + ) + spark_json_ds.save(sample_spark_streaming_df) + + # Load the json file as the streaming 
dataframe + loaded_with_streaming = SparkStreamingDataSet( + filepath=filepath_json, + file_format="json", + load_args={"schema": {"filepath": schema_path}}, + ).load() + + # Append json streams to filepath_output with specified schema path + streaming_ds = SparkStreamingDataSet( + filepath=filepath_output, + file_format="json", + load_args={"schema": {"filepath": schema_path}}, + save_args={"checkpoint": checkpoint_path, "output_mode": "append"}, + ) + assert not streaming_ds.exists() + + streaming_ds.save(loaded_with_streaming) + assert streaming_ds.exists() + + def test_exists_raises_error(self, mocker): + # exists should raise all errors except for + # AnalysisExceptions clearly indicating a missing file + spark_data_set = SparkStreamingDataSet(filepath="") + mocker.patch.object( + spark_data_set, + "_get_spark", + side_effect=AnalysisException("Other Exception", []), + ) + + with pytest.raises(DataSetError, match="Other Exception"): + spark_data_set.exists()
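As a closing usage note: the two hooks called out in the README above would typically be registered together in the Kedro project's `settings.py`. The sketch below is an assumption about project layout, not part of this changeset; `my_project.hooks` is a hypothetical module holding the `SparkStreamsHook` from the README and a `SparkHooks` class along the lines of the linked PySpark integration guide.

```python
# settings.py of a Kedro project using SparkStreamingDataSet (illustrative)
from my_project.hooks import SparkHooks, SparkStreamsHook  # hypothetical module/package names

# SparkHooks initialises the SparkSession when the Kedro context is created;
# SparkStreamsHook blocks after the last node so the streaming queries keep running.
HOOKS = (SparkHooks(), SparkStreamsHook())
```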