Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Return agate_table in dbt run-operation result #10957

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20241031-201031.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Return agate_table in dbt run-operation result
time: 2024-10-31T20:10:31.10956+08:00
custom:
Author: acjh
Issue: "10956"
10 changes: 9 additions & 1 deletion core/dbt/artifacts/schemas/run/v5/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ class RunResultOutput(BaseResult):
compiled: Optional[bool]
compiled_code: Optional[str]
relation_name: Optional[str]
agate_table: Optional["agate.Table"] = field(
default=None, metadata={"serialize": lambda x: None, "deserialize": lambda x: None}
)
batch_results: Optional[BatchResults] = None


Expand All @@ -88,6 +91,7 @@ def process_run_result(result: RunResult) -> RunResultOutput:
message=result.message,
adapter_response=result.adapter_response,
failures=result.failures,
agate_table=result.agate_table,
batch_results=result.batch_results,
compiled=result.node.compiled if compiled else None, # type:ignore
compiled_code=result.node.compiled_code if compiled else None, # type:ignore
Expand Down Expand Up @@ -128,7 +132,7 @@ def from_execution_results(
args: Dict,
):
processed_results = [
process_run_result(result) for result in results if isinstance(result, RunResult)
cls._process_run_result(result) for result in results if isinstance(result, RunResult)
]
meta = RunResultsMetadata(
dbt_schema_version=str(cls.dbt_schema_version),
Expand Down Expand Up @@ -178,5 +182,9 @@ def upgrade_schema_version(cls, data):
result["relation_name"] = ""
return cls.from_dict(data)

@classmethod
def _process_run_result(cls, result: RunResult) -> RunResultOutput:
return process_run_result(result)

def write(self, path: str):
write_json(path, self.to_dict(omit_none=False))
7 changes: 6 additions & 1 deletion core/dbt/task/run_operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ def _run_unsafe(self, package_name, macro_name) -> "agate.Table":
macro_name, project=package_name, kwargs=macro_kwargs, macro_resolver=self.manifest
)

if isinstance(res, str):
return None
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: When there is no explicit return function call in the macro — like for the select_something test — res is a str of newline characters and spaces, at least as observed with the Postgres and SQLite adapters.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A more robust check is not isinstance(res, agate.Table) but there appears to be deliberate avoidance of import agate. Let me know if that is preferred.


return res

def run(self) -> RunResultsArtifact:
Expand All @@ -61,10 +64,11 @@ def run(self) -> RunResultsArtifact:

success = True
package_name, macro_name = self._get_macro_parts()
execute_macro_result = None

with collect_timing_info("execute", timing.append):
try:
self._run_unsafe(package_name, macro_name)
execute_macro_result = self._run_unsafe(package_name, macro_name)
except dbt_common.exceptions.DbtBaseException as exc:
fire_event(RunningOperationCaughtError(exc=str(exc)))
fire_event(LogDebugStackTrace(exc_info=traceback.format_exc()))
Expand Down Expand Up @@ -113,6 +117,7 @@ def run(self) -> RunResultsArtifact:
),
thread_id=threading.current_thread().name,
timing=timing,
agate_table=execute_macro_result,
batch_results=None,
)

Expand Down
Empty file.
16 changes: 16 additions & 0 deletions tests/functional/adapter/dbt_run_operations/fixtures.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
happy_macros_sql = """
{% macro select_something(name) %}
{% set query %}
select 'hello, {{ name }}' as name
{% endset %}
{% set table = run_query(query) %}
{% endmacro %}

{% macro select_something_with_return(name) %}
{% set query %}
select 'hello, {{ name }}' as name
{% endset %}
{% set table = run_query(query) %}
{% do return(table) %}
{% endmacro %}
"""
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import pytest
import yaml

from dbt.tests.util import run_dbt
from tests.functional.adapter.dbt_run_operations.fixtures import happy_macros_sql


# -- Below we define base classes for tests you import based on if your adapter supports dbt run-operation or not --
class BaseRunOperationResult:
@pytest.fixture(scope="class")
def macros(self):
return {"happy_macros.sql": happy_macros_sql}

def run_operation(self, macro, expect_pass=True, extra_args=None, **kwargs):
args = ["run-operation", macro]
if kwargs:
args.extend(("--args", yaml.safe_dump(kwargs)))
if extra_args:
args.extend(extra_args)
return run_dbt(args, expect_pass=expect_pass)

def test_result_without_return(self, project):
results = self.run_operation("select_something", name="world")
assert results.results[0].agate_table is None

def test_result_with_return(self, project):
results = self.run_operation("select_something_with_return", name="world")
assert len(results.results[0].agate_table) == 1
assert results.results[0].agate_table.rows[0]["name"] == "hello, world"


class TestPostgresRunOperationResult(BaseRunOperationResult):
pass