Skip to content

Commit

Permalink
dbtLoggerDatabase class for logging to database
Browse files Browse the repository at this point in the history
  • Loading branch information
austinweisgrau committed Jul 22, 2024
1 parent 4f10c3b commit 15a3872
Showing 1 changed file with 46 additions and 1 deletion.
47 changes: 46 additions & 1 deletion parsons/utilities/dbt/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@
from abc import ABC, abstractmethod
from typing import Optional

from dbt.contracts.graph.manifest import Manifest
from rich.console import Console
from rich.logging import RichHandler
from rich.markdown import Markdown

from dbt.contracts.graph.manifest import Manifest
from parsons import Table
from parsons.databases.database_connector import DatabaseConnector

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -175,3 +177,46 @@ def send(self, manifests: list[Manifest]) -> None:
from parsons.notifications.slack import Slack

Slack.message(channel=self.slack_channel, text=log_text, webhook=self.slack_webhook)


class dbtLoggerDatabase(dbtLogger, ABC):
"""Log dbt artifacts by loading to a database."""

def __init__(self, database_connector: DatabaseConnector, destination_table: str) -> None:
self.db_connector = database_connector
self.destination_table = destination_table

def format_command_result(self, manifest: Manifest) -> Table:
"""Loads all artifact results into a Parsons Table."""
run_metadata = {
key: getattr(manifest, key)
for key in (
"command",
"args",
"generated_at",
)
}
rows = []
for result in manifest.results:
row = run_metadata.copy()
row.update(
{
key: value
for key, value in result.__dict__.items
if key in ("status", "execution_time", "message", "node")
}
)
rows.append(row)
tbl = Table(rows)
return tbl

def format_result(self) -> Table:
tbls = [self.format_command_result(command) for command in self.commands]
tbl = tbls[0].concat(*tbls[1:])
return tbl

def send(self, manifests: list[Manifest]) -> None:
self.commands = manifests
log_tbl = self.format_result()

self.db_connector.copy(log_tbl, self.destination_table, if_exists="append")

0 comments on commit 15a3872

Please sign in to comment.