Skip to content

Commit

Permalink
feat: handle variants during knowledge graph construction and scope g…
Browse files Browse the repository at this point in the history
…raph to node, add example prefix in config block
  • Loading branch information
z3z1ma committed Dec 31, 2024
1 parent 0b62d64 commit 5b31abf
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 15 deletions.
Binary file modified demo_duckdb/jaffle_shop.duckdb
Binary file not shown.
60 changes: 60 additions & 0 deletions demo_duckdb/models/orders_prefix.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
{# One row per order: staged orders joined to per-order payment totals.
   Every output column is aliased with the "o_" prefix, matching the
   dbt_osmosis_prefix value declared in the config block. #}
{{ config(dbt_osmosis_prefix="o_") }}

{% set payment_methods = ['credit_card', 'coupon', 'bank_transfer', 'gift_card'] %}

with

orders as (
    select * from {{ ref('stg_orders') }}
),

payments as (
    select * from {{ ref('stg_payments') }}
),

-- Pivot payments into one amount column per payment method, plus a grand total.
order_payments as (
    select
        order_id,
        {%- for method in payment_methods %}
        sum(case when payment_method = '{{ method }}' then amount else 0 end) as {{ method }}_amount,
        {%- endfor %}
        sum(amount) as total_amount
    from payments
    group by order_id
),

-- Left join so that orders without any payment rows are still returned.
final as (
    select
        orders.order_id as o_order_id,
        orders.customer_id as o_customer_id,
        orders.order_date as o_order_date,
        orders.status as o_status,
        {%- for method in payment_methods %}
        order_payments.{{ method }}_amount as o_{{ method }}_amount,
        {%- endfor %}
        order_payments.total_amount as o_amount
    from orders
    left join order_payments
        on orders.order_id = order_payments.order_id
)

select * from final
30 changes: 29 additions & 1 deletion demo_duckdb/models/schema.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
version: 2

models:
- name: customers
description: This table has basic information about a customer, as well as some derived facts based on a customer's orders
Expand Down Expand Up @@ -83,3 +82,32 @@ models:
description: Total amount (AUD) of the order
tests:
- not_null
# Schema entry for the orders_prefix demo model. Each column name carries the
# "o_" prefix used by the model's output aliases.
# NOTE(review): the empty descriptions look like placeholders to be filled in
# by dbt-osmosis inheritance from upstream columns; confirm before editing by hand.
- name: orders_prefix
columns:
- name: o_order_id
description: ''
data_type: INTEGER
- name: o_customer_id
description: ''
data_type: INTEGER
- name: o_order_date
description: ''
data_type: DATE
- name: o_status
# The only column here with a description: a reference to the orders_status doc block.
description: '{{ doc("orders_status") }}'
data_type: VARCHAR
- name: o_credit_card_amount
description: ''
data_type: DOUBLE
- name: o_coupon_amount
description: ''
data_type: DOUBLE
- name: o_bank_transfer_amount
description: ''
data_type: DOUBLE
- name: o_gift_card_amount
description: ''
data_type: DOUBLE
- name: o_amount
description: ''
data_type: DOUBLE
32 changes: 19 additions & 13 deletions src/dbt_osmosis/core/osmosis.py
Original file line number Diff line number Diff line change
Expand Up @@ -1098,6 +1098,13 @@ def _build_column_knowledge_graph(
"""Generate a column knowledge graph for a dbt model or source node."""
tree = _build_node_ancestor_tree(context.project.manifest, node)

pm = get_plugin_manager()
node_column_variants: dict[str, list[str]] = {}
for column_name, _ in node.columns.items():
variants = node_column_variants.setdefault(column_name, [column_name])
for v in pm.hook.get_candidates(name=column_name, node=node, context=context.project):
variants.extend(t.cast(list[str], v))

column_knowledge_graph: dict[str, dict[str, t.Any]] = {}
for generation in reversed(sorted(tree.keys())):
ancestors = tree[generation]
Expand All @@ -1108,15 +1115,21 @@ def _build_column_knowledge_graph(
if not isinstance(ancestor, (SourceDefinition, SeedNode, ModelNode)):
continue

for name, metadata in ancestor.columns.items():
for name, _ in node.columns.items():
graph_node = column_knowledge_graph.setdefault(name, {})
for variant in node_column_variants[name]:
incoming = ancestor.columns.get(variant)
if incoming is not None:
break
else:
continue
graph_edge = incoming.to_dict()

if context.settings.add_progenitor_to_meta:
graph_node.setdefault("meta", {}).setdefault(
"osmosis_progenitor", ancestor.unique_id
)

graph_edge = metadata.to_dict()

if context.settings.use_unrendered_descriptions:
raw_yaml = _get_member_yaml(context, ancestor) or {}
raw_columns = t.cast(list[dict[str, t.Any]], raw_yaml.get("columns", []))
Expand All @@ -1125,7 +1138,7 @@ def _build_column_knowledge_graph(
lambda c: normalize_column_name(
c["name"], context.project.config.credentials.type
)
== name,
in node_column_variants[name],
{},
)
if unrendered_description := raw_column_metadata.get("description"):
Expand Down Expand Up @@ -1183,15 +1196,8 @@ def inherit_upstream_column_knowledge(
column_knowledge_graph = _build_column_knowledge_graph(context, node)
kwargs = None
for name, node_column in node.columns.items():
variants: list[str] = [name]
pm = get_plugin_manager()
for v in pm.hook.get_candidates(name=name, node=node, context=context.project):
variants.extend(t.cast(list[str], v))
for variant in variants:
kwargs = column_knowledge_graph.get(variant)
if kwargs is not None:
break
else:
kwargs = column_knowledge_graph.get(name)
if kwargs is None:
continue

updated_metadata = {k: v for k, v in kwargs.items() if v is not None and k in inheritable}
Expand Down
2 changes: 1 addition & 1 deletion tests/test_column_level_knowledge_propagator.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ def test_update_undocumented_columns_with_prior_knowledge_skip_merge_meta(
assert target_node_columns["customer_id"].description == "THIS COLUMN IS UPDATED FOR TESTING"
assert (
target_node_columns["customer_id"].meta == {"my_key": "my_value"}
) # NOTE: nodes meta is not mutated beyond our original mutation in the manifest node since skip_merge_tags is True
) # NOTE: nodes meta is not mutated beyond our original mutation in the manifest node since skip_merge_meta is True
assert sorted(target_node_columns["customer_id"].tags) == [
"my_tag1",
"my_tag2",
Expand Down

0 comments on commit 5b31abf

Please sign in to comment.