Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: algo semantic supports dbt Cloud #122

Merged
merged 4 commits into from
Aug 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dbterd/adapters/algos/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ def get_relationships_from_metadata(data=[], **kwargs) -> List[Ref]:
if (
test_id.startswith("test")
and rule.get("name").lower() in test_id.lower()
and test_meta is not None
and test_meta.get(TEST_META_IGNORE_IN_ERD, "0") == "0"
):
test_metadata_kwargs = (
Expand Down
126 changes: 124 additions & 2 deletions dbterd/adapters/algos/semantic.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,42 @@


def parse_metadata(data, **kwargs) -> Tuple[List[Table], List[Ref]]:
raise NotImplementedError() # pragma: no cover
"""Get all information (tables, relationships) needed for building diagram
(from Metadata, with Semantic Entities)

Args:
data (dict): metadata dict

Returns:
Tuple(List[Table], List[Ref]): Info of parsed tables and relationships
"""
tables = []
relationships = []

# Parse Table
tables = base.get_tables_from_metadata(data=data, **kwargs)
tables = base.filter_tables_based_on_selection(tables=tables, **kwargs)

# Parse Ref
relationships = _get_relationships_from_metadata(data=data, **kwargs)
relationships = base.make_up_relationships(
relationships=relationships, tables=tables
)

logger.info(
f"Collected {len(tables)} table(s) and {len(relationships)} relationship(s)"
)
return (
sorted(tables, key=lambda tbl: tbl.node_name),
sorted(relationships, key=lambda rel: rel.name),
)


def parse(
manifest: Manifest, catalog: Union[str, Catalog], **kwargs
) -> Tuple[List[Table], List[Ref]]:
# Parse metadata
if catalog == "metadata": # pragma: no cover
if catalog == "metadata":
return parse_metadata(data=manifest, **kwargs)

# Parse Table
Expand Down Expand Up @@ -69,6 +97,32 @@ def find_related_nodes_by_id(
return list(set(found_nodes))


def _get_relationships_from_metadata(data=[], **kwargs) -> List[Ref]:
"""Extract relationships from Metadata result list on Semantic Entities

Args:
data (List): Metadata result list. Defaults to [].

Returns:
list[Ref]: List of parsed relationship
"""
entities = _get_linked_semantic_entities_from_metadata(data=data)
return base.get_unique_refs(
refs=[
Ref(
name=primary_entity.semantic_model,
table_map=(primary_entity.model, foreign_entity.model),
column_map=(
primary_entity.column_name,
foreign_entity.column_name,
),
type=primary_entity.relationship_type,
)
for foreign_entity, primary_entity in entities
]
)


def _get_relationships(manifest: Manifest, **kwargs) -> List[Ref]:
"""_summary_

Expand All @@ -95,6 +149,27 @@ def _get_relationships(manifest: Manifest, **kwargs) -> List[Ref]:
)


def _get_linked_semantic_entities_from_metadata(
data=[],
) -> List[Tuple[SemanticEntity, SemanticEntity]]:
"""Get filtered list of Semantic Entities which are linked
(Metadata)

Args:
data (List): Metadata result list. Defaults to [].

Returns:
List[Tuple[SemanticEntity, SemanticEntity]]: List of (FK, PK) objects
"""
foreigns, primaries = _get_semantic_entities_from_metadata(data=data)
linked_entities = []
for foreign_entity in foreigns:
for primary_entity in primaries:
if foreign_entity.entity_name == primary_entity.entity_name:
linked_entities.append((foreign_entity, primary_entity))
return linked_entities


def _get_linked_semantic_entities(
manifest: Manifest,
) -> List[Tuple[SemanticEntity, SemanticEntity]]:
Expand All @@ -115,6 +190,53 @@ def _get_linked_semantic_entities(
return linked_entities


def _get_semantic_entities_from_metadata(
data=[],
) -> Tuple[List[SemanticEntity], List[SemanticEntity]]:
"""Get all Semantic Entities
(Metadata)

Args:
data (List): Metadata result list. Defaults to [].

Returns:
Tuple[List[SemanticEntity], List[SemanticEntity]]: FK list and PK list
"""
FK = "foreign"
PK = "primary"

semantic_entities = []
for data_item in data:
for semantic_model in data_item.get("semanticModels", {}).get("edges", []):
id = semantic_model.get("node", {}).get("uniqueId", "")
meta = semantic_model.get("node", {}).get("meta", {}) or {}
# currently only 1 parent with rs type of "model"
model_id = (
semantic_model.get("node", {}).get("parents", {})[0].get("uniqueId", "")
)

entities = semantic_model.get("node", {}).get("entities", [])
for e in entities:
entity_name = e.get("name")
semantic_entities.append(
SemanticEntity(
semantic_model=id,
model=model_id,
entity_name=entity_name,
entity_type=e.get("type"),
column_name=e.get("expr") or entity_name,
relationship_type=(
meta.get(TEST_META_RELATIONSHIP_TYPE, "") if meta else ""
),
)
)

return (
[x for x in semantic_entities if x.entity_type == FK],
[x for x in semantic_entities if x.entity_type == PK],
)


def _get_semantic_entities(
manifest: Manifest,
) -> Tuple[List[SemanticEntity], List[SemanticEntity]]:
Expand Down
2 changes: 2 additions & 0 deletions dbterd/adapters/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@ def run(
self, node_unique_id: str = None, **kwargs
) -> Tuple[List[Table], List[Ref]]:
"""Generate ERD from files"""
logger.info(f"Using algorithm [{kwargs.get('algo')}]")
kwargs = self.evaluate_kwargs(**kwargs)
return self.__run_by_strategy(node_unique_id=node_unique_id, **kwargs)

def run_metadata(self, **kwargs) -> Tuple[List[Table], List[Ref]]:
"""Generate ERD from API metadata"""
logger.info(f"Using algorithm [{kwargs.get('algo')}]")
kwargs = self.evaluate_kwargs(**kwargs)
return self.__run_metadata_by_strategy(**kwargs)

Expand Down
38 changes: 31 additions & 7 deletions dbterd/adapters/dbt_cloud/discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ def __init__(self, **kwargs) -> None:
self.graphql = GraphQLHelper(**kwargs)
self.environment_id = kwargs.get("dbt_cloud_environment_id")
self.erd_query = Query().take(
file_path=kwargs.get("dbt_cloud_query_file_path", None)
file_path=kwargs.get("dbt_cloud_query_file_path", None),
algo=kwargs.get("algo", None),
)
self.last_cursor = {}

Expand All @@ -31,6 +32,7 @@ def query_erd_data(self, page_size: int = 500, poll_until_end: bool = True):
"source_first": page_size,
"exposure_first": page_size,
"test_first": page_size,
"semantic_model_first": page_size,
}
data = [
self.extract_data(
Expand All @@ -47,6 +49,7 @@ def query_erd_data(self, page_size: int = 500, poll_until_end: bool = True):
self.has_data(data=data[-1], resource_type="source"),
self.has_data(data=data[-1], resource_type="exposure"),
self.has_data(data=data[-1], resource_type="test"),
self.has_data(data=data[-1], resource_type="semanticModel"),
]
):
variables["model_after"] = self.get_last_cursor(
Expand All @@ -61,6 +64,9 @@ def query_erd_data(self, page_size: int = 500, poll_until_end: bool = True):
variables["test_after"] = self.get_last_cursor(
data=data[-1], resource_type="test"
)
variables["semantic_model_after"] = self.get_last_cursor(
data=data[-1], resource_type="semanticModel"
)

self.save_last_cursor(data=data[-1])
data.append(
Expand All @@ -75,7 +81,9 @@ def query_erd_data(self, page_size: int = 500, poll_until_end: bool = True):
def extract_data(self, graphql_data: dict):
"""Extract the core nested dict only:
environment:
applied: <-- HERE
definition: <-- HERE
semanticModels
applied: <-- and HERE
models
sources
tests
Expand All @@ -87,7 +95,16 @@ def extract_data(self, graphql_data: dict):
Returns:
dict: Applied data
"""
return graphql_data.get("environment", {}).get("applied", {})
result = graphql_data.get("environment", {}).get("applied", {})
semantic_models = (
graphql_data.get("environment", {})
.get("definition", {})
.get("semanticModels", {})
)
if semantic_models:
result["semanticModels"] = semantic_models

return result

def has_data(self, data, resource_type: str = "model") -> bool:
"""Check if there is still having data to poll more given the resource type.
Expand All @@ -97,6 +114,7 @@ def has_data(self, data, resource_type: str = "model") -> bool:
- source
- exposure
- test
- semanticModel

Args:
data (dict): Metadata result
Expand All @@ -112,15 +130,17 @@ def has_data(self, data, resource_type: str = "model") -> bool:
)

def save_last_cursor(
self, data, resource_types=["model", "source", "exposure", "test"]
self,
data,
resource_types=["model", "source", "exposure", "test", "semanticModel"],
):
"""Save last poll's cursor of all resource types.

Args:
data (dict): Metadata result
resource_types (list, optional): |
Resource types.
Defaults to ["model", "source", "exposure", "test"].
Defaults to ["model", "source", "exposure", "test", "semanticModel"].
"""
for resource_type in resource_types:
self.last_cursor[resource_type] = self.get_last_cursor(
Expand Down Expand Up @@ -153,14 +173,18 @@ def get_count(self, data, resource_type: str = "model") -> int:
"""
return len(data.get(f"{resource_type}s", {}).get("edges", []))

def show_counts(self, data, resource_types=["model", "source", "exposure", "test"]):
def show_counts(
self,
data,
resource_types=["model", "source", "exposure", "test", "semanticModel"],
):
"""Print the metadata result count for all resource types

Args:
data (dict): Metadata result
resource_types (list, optional): |
Resource types.
Defaults to ["model", "source", "exposure", "test"].
Defaults to ["model", "source", "exposure", "test", "semanticModel"].
"""
results = [
f"{self.get_count(data=data, resource_type=x)} {x}(s)"
Expand Down
82 changes: 82 additions & 0 deletions dbterd/adapters/dbt_cloud/include/erd_query__semantic.gql
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
query (
$environment_id: BigInt!,
$model_first: Int!,
$model_after: String,
$source_first: Int!,
$source_after: String,
$exposure_first: Int!,
$exposure_after: String,
$semantic_model_first: Int!,
$semantic_model_after: String
) {
environment(id: $environment_id){
definition {
semanticModels(first: $semantic_model_first, after: $semantic_model_after){
edges {
node {
uniqueId,
entities {
name, type
}
meta,
parents {
uniqueId, resourceType
}
}
}
pageInfo {
startCursor,
endCursor,
hasNextPage
}
totalCount
}
}
applied {
models(first: $model_first, after: $model_after){
edges {
node {
uniqueId, name, description,
database, schema, alias,
catalog {columns {name, description, type}},
}
}
pageInfo {
startCursor,
endCursor,
hasNextPage
}
totalCount
}
sources(first: $source_first, after: $source_after){
edges {
node {
uniqueId, name, description,
database, schema,
catalog {columns {name, description, type}},
}
}
pageInfo {
startCursor,
endCursor,
hasNextPage
}
totalCount
}
exposures(first: $exposure_first, after: $exposure_after) {
edges {
node {
uniqueId, name, description,
parents { uniqueId }
}
}
pageInfo {
startCursor,
endCursor,
hasNextPage
}
totalCount
}
}
}
}
4 changes: 2 additions & 2 deletions dbterd/adapters/dbt_cloud/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def __init__(self) -> None:
"""
self.dir = f"{os.path.dirname(os.path.realpath(__file__))}/include"

def take(self, file_path: str = None) -> str:
def take(self, file_path: str = None, algo: str = None) -> str:
"""Read the given file path and return the content as the query string

Args:
Expand All @@ -21,7 +21,7 @@ def take(self, file_path: str = None) -> str:
Returns:
str: Query string
"""
query_file = file_path or f"{self.dir}/erd_query.gql"
query_file = file_path or f"{self.dir}/erd_query__{algo}.gql"
logger.info(f"Looking for the query in: {query_file}")
return self.get_file_content(file_path=query_file)

Expand Down
Loading
Loading