feature/databricks-delta-incremental-support (#130)
* feature/databricks-delta-incremental-support

* changelog and integration test updates

* pre review modifications

* Update consistency__audit_table.sql

* Apply suggestions from code review

Co-authored-by: Avinash Kunnath <[email protected]>

* changelog, readme, and docs rebuild updates

* Apply suggestions from code review

Co-authored-by: Avinash Kunnath <[email protected]>

* macro updates for full coverage

* removed artifacts

* schema change and CHANGELOG edit

* added supported destinations to elif and docs regen

* Update is_incremental_compatible.sql

---------

Co-authored-by: Avinash Kunnath <[email protected]>
fivetran-joemarkiewicz and fivetran-avinash authored Jun 11, 2024
1 parent 5fef364 commit ce41a02
Showing 26 changed files with 701 additions and 31 deletions.
18 changes: 18 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,21 @@
# dbt_fivetran_log v1.8.0
[PR #130](https://github.com/fivetran/dbt_fivetran_log/pull/130) includes the following updates:

## 🚨 Breaking Changes 🚨
> ⚠️ Since these changes alter the table format, we recommend running a `--full-refresh` after upgrading to this version to avoid possible incremental failures.
- For Databricks All-Purpose clusters, the `fivetran_platform__audit_table` model will now be materialized using the delta table format (previously parquet).
- Delta tables are generally more performant than parquet and are also more widely available for Databricks users. Previously, the parquet file format was causing compilation issues on customers' managed tables.
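In model terms, the materialization change above would look roughly like the following `config` block in `fivetran_platform__audit_table`. This is a hedged sketch, not the package's exact source: only `file_format='delta'` for Databricks and the `is_incremental_compatible` macro are taken from this release's notes; the other config values are illustrative assumptions.

```sql
-- Hypothetical sketch of the audit table's config change. Only the
-- delta file_format and the is_incremental_compatible() macro come from
-- this release; the remaining values are illustrative placeholders.
{{ config(
    materialized='incremental' if fivetran_log.is_incremental_compatible() else 'table',
    file_format='delta' if target.type == 'databricks' else 'parquet',
    incremental_strategy='insert_overwrite'
) }}
```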

## Documentation Updates
- Updated the `sync_start` and `sync_end` field descriptions for `fivetran_platform__audit_table` to clarify that these fields represent only the times at which the connector wrote new records, or modified existing ones, in the specified table.
- Added integrity and consistency validation tests to the integration tests for every end model.
- Removed duplicate Databricks dispatch instructions listed in the README.

## Under the Hood
- The `is_databricks_sql_warehouse` macro has been renamed to `is_incremental_compatible`. It now returns `true` if the Databricks runtime in use is an all-purpose cluster (previously the macro checked whether a SQL warehouse runtime was used) **or** if any other supported non-Databricks destination is in use.
- This change was made because other Databricks runtimes have been discovered (e.g. endpoint and external runtimes) that do not support the `insert_overwrite` incremental strategy used in the `fivetran_platform__audit_table` model.
- Additionally, for Databricks users the `fivetran_platform__audit_table` model will now leverage the incremental strategy only if the runtime is all-purpose; all other Databricks runtimes will build the model non-incrementally.
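A minimal sketch of what the renamed macro could look like is shown below. This is an assumption-laden illustration of the behavior described above, not the package's source: the dispatch pattern is standard dbt, but the all-purpose-cluster detection (`runtime_is_all_purpose_cluster`) is a hypothetical placeholder, and the destination list is illustrative.

```sql
{% macro is_incremental_compatible() %}
    {{ return(adapter.dispatch('is_incremental_compatible', 'fivetran_log')()) }}
{% endmacro %}

{% macro default__is_incremental_compatible() %}
    {% if target.type == 'databricks' %}
        {# Hypothetical check: only all-purpose clusters support the
           insert_overwrite strategy, so return true only for them.
           The real macro's runtime-detection logic may differ. #}
        {% do return(runtime_is_all_purpose_cluster) %}
    {% elif target.type in ('bigquery', 'snowflake', 'redshift', 'postgres', 'sqlserver') %}
        {# All other supported non-Databricks destinations are
           incremental-compatible. Destination list is illustrative. #}
        {% do return(true) %}
    {% else %}
        {% do return(false) %}
    {% endif %}
{% endmacro %}
```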

# dbt_fivetran_log v1.7.3
[PR #126](https://github.com/fivetran/dbt_fivetran_log/pull/126) includes the following updates:

10 changes: 1 addition & 9 deletions README.md
@@ -69,7 +69,7 @@ Include the following Fivetran Platform package version range in your `packages.yml`:
```yaml
packages:
- package: fivetran/fivetran_log
version: [">=1.7.0", "<1.8.0"]
version: [">=1.8.0", "<1.9.0"]
```

> Note that although the source connector is now "Fivetran Platform", the package retains the old name of "fivetran_log".
@@ -112,14 +112,6 @@ vars:
fivetran_platform_<default_table_name>_identifier: your_table_name
```

### Databricks Additional Configuration
If you are using a Databricks destination with this package, you will need to add the below (or a variation of the below) dispatch configuration within your root `dbt_project.yml`. This is required for the package to accurately search for macros in the `dbt-labs/spark_utils` and then the `dbt-labs/dbt_utils` packages, in that order.
```yml
dispatch:
- macro_namespace: dbt_utils
search_order: ['spark_utils', 'dbt_utils']
```

## (Optional) Step 6: Orchestrate your models with Fivetran Transformations for dbt Core™
<details><summary>Expand for details</summary>
<br>
2 changes: 1 addition & 1 deletion dbt_project.yml
@@ -1,6 +1,6 @@
config-version: 2
name: 'fivetran_log'
version: '1.7.3'
version: '1.8.0'
require-dbt-version: [">=1.3.0", "<2.0.0"]

models:
2 changes: 1 addition & 1 deletion docs/catalog.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/manifest.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/run_results.json

Large diffs are not rendered by default.

12 changes: 6 additions & 6 deletions integration_tests/ci/sample.profiles.yml
@@ -16,13 +16,13 @@ integration_tests:
pass: "{{ env_var('CI_REDSHIFT_DBT_PASS') }}"
dbname: "{{ env_var('CI_REDSHIFT_DBT_DBNAME') }}"
port: 5439
schema: fivetran_platform_integration_tests
schema: fivetran_platform_integration_tests_5
threads: 8
bigquery:
type: bigquery
method: service-account-json
project: 'dbt-package-testing'
schema: fivetran_platform_integration_tests
schema: fivetran_platform_integration_tests_5
threads: 8
keyfile_json: "{{ env_var('GCLOUD_SERVICE_KEY') | as_native }}"
snowflake:
@@ -33,7 +33,7 @@ integration_tests:
role: "{{ env_var('CI_SNOWFLAKE_DBT_ROLE') }}"
database: "{{ env_var('CI_SNOWFLAKE_DBT_DATABASE') }}"
warehouse: "{{ env_var('CI_SNOWFLAKE_DBT_WAREHOUSE') }}"
schema: fivetran_platform_integration_tests
schema: fivetran_platform_integration_tests_5
threads: 8
postgres:
type: postgres
@@ -42,13 +42,13 @@
pass: "{{ env_var('CI_POSTGRES_DBT_PASS') }}"
dbname: "{{ env_var('CI_POSTGRES_DBT_DBNAME') }}"
port: 5432
schema: fivetran_platform_integration_tests
schema: fivetran_platform_integration_tests_5
threads: 8
databricks:
catalog: "{{ env_var('CI_DATABRICKS_DBT_CATALOG') }}"
host: "{{ env_var('CI_DATABRICKS_DBT_HOST') }}"
http_path: "{{ env_var('CI_DATABRICKS_DBT_HTTP_PATH') }}"
schema: fivetran_platform_integration_tests
schema: fivetran_platform_integration_tests_5
threads: 8
token: "{{ env_var('CI_DATABRICKS_DBT_TOKEN') }}"
type: databricks
@@ -66,7 +66,7 @@ integration_tests:
server: "{{ env_var('CI_SQLSERVER_DBT_SERVER') }}"
port: 1433
database: "{{ env_var('CI_SQLSERVER_DBT_DATABASE') }}"
schema: fivetran_platform_integration_tests
schema: fivetran_platform_integration_tests_5
user: "{{ env_var('CI_SQLSERVER_DBT_USER') }}"
password: "{{ env_var('CI_SQLSERVER_DBT_PASS') }}"
threads: 8
6 changes: 3 additions & 3 deletions integration_tests/dbt_project.yml
@@ -1,5 +1,5 @@
name: 'fivetran_log_integration_tests'
version: '1.7.3'
version: '1.8.0'

config-version: 2
profile: 'integration_tests'
@@ -10,7 +10,7 @@ dispatch:

vars:
fivetran_log:
fivetran_platform_schema: "fivetran_platform_integration_tests"
fivetran_platform_schema: "fivetran_platform_integration_tests_5"
fivetran_platform_account_identifier: "account"
fivetran_platform_incremental_mar_identifier: "incremental_mar"
fivetran_platform_connector_identifier: "connector"
@@ -21,10 +21,10 @@ vars:
fivetran_platform_log_identifier: "log"
fivetran_platform_user_identifier: "user"


models:
fivetran_log:
+schema: "{{ 'sqlw_tests' if target.name == 'databricks-sql' else 'fivetran_platform' }}"
# +schema: "fivetran_platform_{{ var('directed_schema','dev') }}"

seeds:
fivetran_log_integration_tests:
3 changes: 1 addition & 2 deletions integration_tests/packages.yml
@@ -1,3 +1,2 @@

packages:
- local: ../
- local: ../
67 changes: 67 additions & 0 deletions integration_tests/tests/consistency/consistency__audit_table.sql
@@ -0,0 +1,67 @@

{{ config(
tags="fivetran_validations",
enabled=var('fivetran_validation_tests_enabled', false)
) }}

with prod as (
select
connector_id,
table_name,
count(*) as total_records
from {{ target.schema }}_fivetran_platform_prod.fivetran_platform__audit_table
group by 1, 2
),

dev as (
select
connector_id,
table_name,
count(*) as total_records
from {{ target.schema }}_fivetran_platform_dev.fivetran_platform__audit_table
group by 1, 2
),

final_consistency_check as (
select
prod.connector_id,
prod.table_name,
prod.total_records as prod_total,
dev.total_records as dev_total
from prod
left join dev
on dev.connector_id = prod.connector_id
and dev.table_name = prod.table_name
),

-- Checking to ensure the dev totals match the prod totals
consistency_check as (
select *
from final_consistency_check
where prod_total != dev_total
),

-- For use when the current release changes the row count of the audit table model intentionally.
-- The below queries prove the records that do not match are still accurate by checking the source.
verification_staging_setup as (
select
connector_id,
{{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['table']) }} as table_name,
count(*) as row_count
from {{ target.schema }}_fivetran_platform_dev.stg_fivetran_platform__log
where event_subtype in ('write_to_table_start')
group by 1, 2
),

final_verification as (
select *
from consistency_check
left join verification_staging_setup
on consistency_check.connector_id = verification_staging_setup.connector_id
and consistency_check.table_name = verification_staging_setup.table_name
where consistency_check.dev_total != verification_staging_setup.row_count
)

select *
from final_verification

@@ -0,0 +1,43 @@

{{ config(
tags="fivetran_validations",
enabled=var('fivetran_validation_tests_enabled', false)
) }}

with prod as (
select
connector_id,
email,
date_day,
count(*) as total_records
from {{ target.schema }}_fivetran_platform_prod.fivetran_platform__audit_user_activity
group by 1, 2, 3
),

dev as (
select
connector_id,
email,
date_day,
count(*) as total_records
from {{ target.schema }}_fivetran_platform_dev.fivetran_platform__audit_user_activity
group by 1, 2, 3
),

final as (
select
prod.connector_id,
prod.email,
prod.date_day,
prod.total_records as prod_total,
dev.total_records as dev_total
from prod
left join dev
on dev.connector_id = prod.connector_id
and dev.email = prod.email
and dev.date_day = prod.date_day
)

select *
from final
where prod_total != dev_total
@@ -0,0 +1,43 @@

{{ config(
tags="fivetran_validations",
enabled=var('fivetran_validation_tests_enabled', false)
) }}

with prod as (
select
date_day,
connector_id,
destination_id,
count(*) as total_records
from {{ target.schema }}_fivetran_platform_prod.fivetran_platform__connector_daily_events
group by 1, 2, 3
),

dev as (
select
date_day,
connector_id,
destination_id,
count(*) as total_records
from {{ target.schema }}_fivetran_platform_dev.fivetran_platform__connector_daily_events
group by 1, 2, 3
),

final as (
select
prod.date_day,
prod.connector_id,
prod.destination_id,
prod.total_records as prod_total,
dev.total_records as dev_total
from prod
left join dev
on dev.date_day = prod.date_day
and dev.connector_id = prod.connector_id
and dev.destination_id = prod.destination_id
)

select *
from final
where prod_total != dev_total
@@ -0,0 +1,41 @@

{{ config(
tags="fivetran_validations",
enabled=var('fivetran_validation_tests_enabled', false)
) }}

with prod as (
select
1 as join_key,
count(*) as total_records,
sum(number_of_schema_changes_last_month) as total_schema_changes_last_month
from {{ target.schema }}_fivetran_platform_prod.fivetran_platform__connector_status
group by 1
),

dev as (
select
1 as join_key,
count(*) as total_records,
sum(number_of_schema_changes_last_month) as total_schema_changes_last_month
from {{ target.schema }}_fivetran_platform_dev.fivetran_platform__connector_status
group by 1
),

final as (
select
prod.join_key,
dev.join_key,
prod.total_records as prod_total,
dev.total_records as dev_total,
prod.total_schema_changes_last_month as prod_total_schema_changes,
dev.total_schema_changes_last_month as dev_total_schema_changes
from prod
left join dev
on dev.join_key = prod.join_key
)

select *
from final
where prod_total != dev_total
or prod_total_schema_changes != dev_total_schema_changes
@@ -0,0 +1,56 @@

{{ config(
tags="fivetran_validations",
enabled=var('fivetran_validation_tests_enabled', false)
) }}

with prod as (
select
connector_name,
schema_name,
table_name,
destination_id,
measured_month,
sum(total_monthly_active_rows) as total_mar,
count(*) as total_records
from {{ target.schema }}_fivetran_platform_prod.fivetran_platform__mar_table_history
group by 1, 2, 3, 4, 5
),

dev as (
select
connector_name,
schema_name,
table_name,
destination_id,
measured_month,
sum(total_monthly_active_rows) as total_mar,
count(*) as total_records
from {{ target.schema }}_fivetran_platform_dev.fivetran_platform__mar_table_history
group by 1, 2, 3, 4, 5
),

final as (
select
prod.connector_name,
prod.schema_name,
prod.table_name,
prod.destination_id,
prod.measured_month,
prod.total_records as prod_total,
dev.total_records as dev_total,
prod.total_mar as prod_total_mar,
dev.total_mar as dev_total_mar
from prod
left join dev
on dev.connector_name = prod.connector_name
and dev.schema_name = prod.schema_name
and dev.table_name = prod.table_name
and dev.destination_id = prod.destination_id
and dev.measured_month = prod.measured_month
)

select *
from final
where prod_total != dev_total
or prod_total_mar != dev_total_mar