Skip to content

Commit

Permalink
Merge branch 'main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
svdimchenko authored Mar 27, 2024
2 parents 9dc5e03 + bef77fd commit bb00577
Show file tree
Hide file tree
Showing 27 changed files with 1,862 additions and 351 deletions.
172 changes: 0 additions & 172 deletions .all-contributorsrc

This file was deleted.

1 change: 1 addition & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ DBT_TEST_ATHENA_DATABASE=
DBT_TEST_ATHENA_SCHEMA=
DBT_TEST_ATHENA_WORK_GROUP=
DBT_TEST_ATHENA_AWS_PROFILE_NAME=
DBT_TEST_ATHENA_SPARK_WORK_GROUP=
186 changes: 152 additions & 34 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
- Supports two incremental update strategies: `insert_overwrite` and `append`
- Does **not** support the use of `unique_key`
- Supports [snapshots][snapshots]
- Does not support [Python models][python-models]
- Supports [Python models][python-models]

[seeds]: https://docs.getdbt.com/docs/building-a-dbt-project/seeds

Expand Down Expand Up @@ -132,6 +132,7 @@ A dbt profile can be configured to run against AWS Athena using the following co
| aws_profile_name | Profile to use from your AWS shared credentials file. | Optional | `my-profile` |
| work_group | Identifier of Athena workgroup | Optional | `my-custom-workgroup` |
| num_retries | Number of times to retry a failing query | Optional | `3` |
| spark_work_group | Identifier of Athena Spark workgroup | Optional | `my-spark-workgroup` |
| num_boto3_retries | Number of times to retry boto3 requests (e.g. deleting S3 files for materialized tables) | Optional | `5` |
| seed_s3_upload_args | Dictionary containing boto3 ExtraArgs when uploading to S3 | Optional | `{"ACL": "bucket-owner-full-control"}` |
| lf_tags_database | Default LF tags for new database if it's created by dbt | Optional | `tag_key: tag_value` |
Expand All @@ -151,8 +152,10 @@ athena:
region_name: eu-west-1
schema: dbt
database: awsdatacatalog
threads: 4
aws_profile_name: my-profile
work_group: my-workgroup
spark_work_group: my-spark-workgroup
seed_s3_upload_args:
ACL: bucket-owner-full-control
```
Expand Down Expand Up @@ -546,6 +549,151 @@ You may find the following links useful to manage that:
* [terraform aws_lakeformation_resource_lf_tags](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/lakeformation_resource_lf_tags)
<!-- markdownlint-restore -->

## Python Models

The adapter supports python models using [`spark`](https://docs.aws.amazon.com/athena/latest/ug/notebooks-spark.html).

### Setup

- A spark enabled work group created in athena
- Spark execution role granted access to Athena, Glue and S3
- The spark work group is added to the ~/.dbt/profiles.yml file and the profile is referenced in dbt_project.yml
that will be created. It is recommended to keep this same as threads.

### Spark specific table configuration

- `timeout` (`default=43200`)
- Time out in seconds for each python model execution. Defaults to 12 hours/43200 seconds.
- `spark_encryption` (`default=false`)
- If this flag is set to true, encrypts data in transit between Spark nodes and also encrypts data at rest stored
locally by Spark.
- `spark_cross_account_catalog` (`default=false`)
- In spark, you can query the external account catalog and for that the consumer account has to be configured to
access the producer catalog.
- If this flag is set to true, "/" can be used as the glue catalog separator.
Ex: 999999999999/mydatabase.cloudfront_logs (*where *999999999999* is the external catalog id*)
- `spark_requester_pays` (`default=false`)
- When an Amazon S3 bucket is configured as requester pays, the account of the user running the query is charged for
data access and data transfer fees associated with the query.
- If this flag is set to true, requester pays S3 buckets are enabled in Athena for Spark.

### Spark notes

- A session is created for each unique engine configuration defined in the models that are part of the invocation.
- A session's idle timeout is set to 10 minutes. Within the timeout period, if there is a new calculation
(spark python model) ready for execution and the engine configuration matches, the process will reuse the same session.
- Number of python models running at a time depends on the `threads`. Number of sessions created for the entire run
depends on number of unique engine configurations and availability of session to maintain threads concurrency.
- For iceberg table, it is recommended to use table_properties configuration to set the format_version to 2. This is to
maintain compatability between iceberg tables created by Trino with those created by Spark.

### Example models

#### Simple pandas model

```python
import pandas as pd
def model(dbt, session):
dbt.config(materialized="table")
model_df = pd.DataFrame({"A": [1, 2, 3, 4]})
return model_df
```

#### Simple spark

```python
def model(dbt, spark_session):
dbt.config(materialized="table")
data = [(1,), (2,), (3,), (4,)]
df = spark_session.createDataFrame(data, ["A"])
return df
```

#### Spark incremental

```python
def model(dbt, spark_session):
dbt.config(materialized="incremental")
df = dbt.ref("model")
if dbt.is_incremental:
max_from_this = (
f"select max(run_date) from {dbt.this.schema}.{dbt.this.identifier}"
)
df = df.filter(df.run_date >= spark_session.sql(max_from_this).collect()[0][0])
return df
```

#### Config spark model

```python
def model(dbt, spark_session):
dbt.config(
materialized="table",
engine_config={
"CoordinatorDpuSize": 1,
"MaxConcurrentDpus": 3,
"DefaultExecutorDpuSize": 1
},
spark_encryption=True,
spark_cross_account_catalog=True,
spark_requester_pays=True
polling_interval=15,
timeout=120,
)
data = [(1,), (2,), (3,), (4,)]
df = spark_session.createDataFrame(data, ["A"])
return df
```

#### Create pySpark udf using imported external python files

```python
def model(dbt, spark_session):
dbt.config(
materialized="incremental",
incremental_strategy="merge",
unique_key="num",
)
sc = spark_session.sparkContext
sc.addPyFile("s3://athena-dbt/test/file1.py")
sc.addPyFile("s3://athena-dbt/test/file2.py")
def func(iterator):
from file2 import transform
return [transform(i) for i in iterator]
from pyspark.sql.functions import udf
from pyspark.sql.functions import col
udf_with_import = udf(func)
data = [(1, "a"), (2, "b"), (3, "c")]
cols = ["num", "alpha"]
df = spark_session.createDataFrame(data, cols)
return df.withColumn("udf_test_col", udf_with_import(col("alpha")))
```

#### Known issues in python models

- Incremental models do not fully utilize spark capabilities. They depend partially on existing sql based logic which
runs on trino.
- Snapshots materializations are not supported.
- Spark can only reference tables within the same catalog.

### Working example

seed file - employent_indicators_november_2022_csv_tables.csv
Expand Down Expand Up @@ -670,38 +818,8 @@ See [CONTRIBUTING](CONTRIBUTING.md) for more information on how to contribute to

Thanks goes to these wonderful people ([emoji key](https://allcontributors.org/docs/en/emoji-key)):

<!-- ALL-CONTRIBUTORS-LIST:START - Do not remove or modify this section -->
<!-- prettier-ignore-start -->
<!-- markdownlint-disable -->
<table>
<tbody>
<tr>
<td align="center" valign="top" width="14.28%"><a href="https://github.com/nicor88"><img src="https://avatars.githubusercontent.com/u/6278547?v=4?s=100" width="100px;" alt="nicor88"/><br /><sub><b>nicor88</b></sub></a><br /><a href="https://github.com/dbt-athena/dbt-athena/commits?author=nicor88" title="Code">💻</a> <a href="#maintenance-nicor88" title="Maintenance">🚧</a> <a href="https://github.com/dbt-athena/dbt-athena/issues?q=author%3Anicor88" title="Bug reports">🐛</a></td>
<td align="center" valign="top" width="14.28%"><a href="https://jessedobbelae.re"><img src="https://avatars.githubusercontent.com/u/1352979?v=4?s=100" width="100px;" alt="Jesse Dobbelaere"/><br /><sub><b>Jesse Dobbelaere</b></sub></a><br /><a href="https://github.com/dbt-athena/dbt-athena/issues?q=author%3Ajessedobbelaere" title="Bug reports">🐛</a> <a href="#maintenance-jessedobbelaere" title="Maintenance">🚧</a></td>
<td align="center" valign="top" width="14.28%"><a href="https://github.com/lemiffe"><img src="https://avatars.githubusercontent.com/u/7487772?v=4?s=100" width="100px;" alt="Lemiffe"/><br /><sub><b>Lemiffe</b></sub></a><br /><a href="#design-lemiffe" title="Design">🎨</a></td>
<td align="center" valign="top" width="14.28%"><a href="https://github.com/Jrmyy"><img src="https://avatars.githubusercontent.com/u/9251353?v=4?s=100" width="100px;" alt="Jérémy Guiselin"/><br /><sub><b>Jérémy Guiselin</b></sub></a><br /><a href="#maintenance-Jrmyy" title="Maintenance">🚧</a> <a href="https://github.com/dbt-athena/dbt-athena/commits?author=Jrmyy" title="Code">💻</a> <a href="https://github.com/dbt-athena/dbt-athena/issues?q=author%3AJrmyy" title="Bug reports">🐛</a></td>
<td align="center" valign="top" width="14.28%"><a href="https://github.com/Tomme"><img src="https://avatars.githubusercontent.com/u/932895?v=4?s=100" width="100px;" alt="Tom"/><br /><sub><b>Tom</b></sub></a><br /><a href="#maintenance-Tomme" title="Maintenance">🚧</a> <a href="https://github.com/dbt-athena/dbt-athena/commits?author=Tomme" title="Code">💻</a></td>
<td align="center" valign="top" width="14.28%"><a href="https://github.com/mattiamatrix"><img src="https://avatars.githubusercontent.com/u/5013654?v=4?s=100" width="100px;" alt="Mattia"/><br /><sub><b>Mattia</b></sub></a><br /><a href="#maintenance-mattiamatrix" title="Maintenance">🚧</a></td>
<td align="center" valign="top" width="14.28%"><a href="https://github.com/Gatsby-Lee"><img src="https://avatars.githubusercontent.com/u/22950880?v=4?s=100" width="100px;" alt="Gatsby Lee"/><br /><sub><b>Gatsby Lee</b></sub></a><br /><a href="https://github.com/dbt-athena/dbt-athena/issues?q=author%3AGatsby-Lee" title="Bug reports">🐛</a></td>
</tr>
<tr>
<td align="center" valign="top" width="14.28%"><a href="https://github.com/BrechtDeVlieger"><img src="https://avatars.githubusercontent.com/u/12074972?v=4?s=100" width="100px;" alt="BrechtDeVlieger"/><br /><sub><b>BrechtDeVlieger</b></sub></a><br /><a href="https://github.com/dbt-athena/dbt-athena/issues?q=author%3ABrechtDeVlieger" title="Bug reports">🐛</a></td>
<td align="center" valign="top" width="14.28%"><a href="https://github.com/aartaria"><img src="https://avatars.githubusercontent.com/u/10273710?v=4?s=100" width="100px;" alt="Andrea Artaria"/><br /><sub><b>Andrea Artaria</b></sub></a><br /><a href="https://github.com/dbt-athena/dbt-athena/issues?q=author%3Aaartaria" title="Bug reports">🐛</a></td>
<td align="center" valign="top" width="14.28%"><a href="https://github.com/maiarareinaldo"><img src="https://avatars.githubusercontent.com/u/72740386?v=4?s=100" width="100px;" alt="Maiara Reinaldo"/><br /><sub><b>Maiara Reinaldo</b></sub></a><br /><a href="https://github.com/dbt-athena/dbt-athena/issues?q=author%3Amaiarareinaldo" title="Bug reports">🐛</a></td>
<td align="center" valign="top" width="14.28%"><a href="https://github.com/henriblancke"><img src="https://avatars.githubusercontent.com/u/1708162?v=4?s=100" width="100px;" alt="Henri Blancke"/><br /><sub><b>Henri Blancke</b></sub></a><br /><a href="https://github.com/dbt-athena/dbt-athena/commits?author=henriblancke" title="Code">💻</a> <a href="https://github.com/dbt-athena/dbt-athena/issues?q=author%3Ahenriblancke" title="Bug reports">🐛</a></td>
<td align="center" valign="top" width="14.28%"><a href="https://github.com/svdimchenko"><img src="https://avatars.githubusercontent.com/u/39801237?v=4?s=100" width="100px;" alt="Serhii Dimchenko"/><br /><sub><b>Serhii Dimchenko</b></sub></a><br /><a href="https://github.com/dbt-athena/dbt-athena/commits?author=svdimchenko" title="Code">💻</a> <a href="https://github.com/dbt-athena/dbt-athena/issues?q=author%3Asvdimchenko" title="Bug reports">🐛</a></td>
<td align="center" valign="top" width="14.28%"><a href="https://github.com/chrischin478"><img src="https://avatars.githubusercontent.com/u/47199426?v=4?s=100" width="100px;" alt="chrischin478"/><br /><sub><b>chrischin478</b></sub></a><br /><a href="https://github.com/dbt-athena/dbt-athena/commits?author=chrischin478" title="Code">💻</a> <a href="https://github.com/dbt-athena/dbt-athena/issues?q=author%3Achrischin478" title="Bug reports">🐛</a></td>
<td align="center" valign="top" width="14.28%"><a href="https://github.com/sanromeo"><img src="https://avatars.githubusercontent.com/u/44975602?v=4?s=100" width="100px;" alt="Roman Korsun"/><br /><sub><b>Roman Korsun</b></sub></a><br /><a href="https://github.com/dbt-athena/dbt-athena/commits?author=sanromeo" title="Code">💻</a> <a href="https://github.com/dbt-athena/dbt-athena/issues?q=author%3Asanromeo" title="Bug reports">🐛</a></td>
<td align="center" valign="top" width="14.28%"><a href="https://github.com/Danya-Fpnk"><img src="https://avatars.githubusercontent.com/u/122433975?v=4?s=100" width="100px;" alt="DanyaF"/><br /><sub><b>DanyaF</b></sub></a><br /><a href="https://github.com/dbt-athena/dbt-athena/commits?author=Danya-Fpnk" title="Code">💻</a> <a href="https://github.com/dbt-athena/dbt-athena/issues?q=author%3ADanya-Fpnk" title="Bug reports">🐛</a></td>
<td align="center" valign="top" width="14.28%"><a href="https://github.com/octiva"><img src="https://avatars.githubusercontent.com/u/53303191?v=4?s=100" width="100px;" alt="Spencer"/><br /><sub><b>Spencer</b></sub></a><br /><a href="https://github.com/dbt-athena/dbt-athena/commits?author=octiva" title="Code">💻</a></td>
</tr>
</tbody>
</table>

<!-- markdownlint-restore -->
<!-- prettier-ignore-end -->

<!-- ALL-CONTRIBUTORS-LIST:END -->
<a href="https://github.com/dbt-athena/dbt-athena/graphs/contributors">
<img src="https://contrib.rocks/image?repo=dbt-athena/dbt-athena" />
</a>

This project follows the [all-contributors](https://github.com/all-contributors/all-contributors) specification.
Contributions of any kind welcome!
Loading

0 comments on commit bb00577

Please sign in to comment.