Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AIP-84 Migrate /object/grid_data from views to FastAPI #44332

Merged
merged 22 commits into from
Dec 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
7b88e81
Include grid endpoint to FastAPI
bugraoz93 Nov 25, 2024
2af0c39
Finalise the endpoint other than dynamically created tasks and addres…
bugraoz93 Nov 27, 2024
c6446ad
Finalise the endpoint with dynamically mapped tasks and include unit …
bugraoz93 Nov 28, 2024
54588be
Use SessionDep for session param
bugraoz93 Dec 1, 2024
95c66d5
Create service/grid.py and move methods, adjust usage of SortParam, u…
bugraoz93 Dec 4, 2024
a2b0d43
Fix unit test according to adjustments
bugraoz93 Dec 4, 2024
7c8227f
Include missing MappedTaskGroup parent check to MappedOperator
bugraoz93 Dec 4, 2024
b4939ff
Remove remaining version check
bugraoz93 Dec 5, 2024
bb2c770
Move service/ to services/ui/, make sort_param unique for dag_run_sor…
bugraoz93 Dec 8, 2024
3d2fd13
Fix SortParam creation
bugraoz93 Dec 8, 2024
58dadbc
Revert changes in QueryLastDagRunStateFilter
bugraoz93 Dec 8, 2024
fd451ec
include missing task_count to parent_node for recursive taskgroups, f…
bugraoz93 Dec 9, 2024
458423b
Rename num_runs to limit for consistency, make base_date filter range…
bugraoz93 Dec 9, 2024
59d7377
Fix task_count and states for nested task_groups, add alias to run_id…
bugraoz93 Dec 12, 2024
282d44c
Rebase and rerun pre-commit
bugraoz93 Dec 12, 2024
81b728e
Change GridTaskInstanceSummary state to TaskInstanceState object
bugraoz93 Dec 12, 2024
e9cd69c
Fix setting state in GridTaskInstanceSummary, change name states to c…
bugraoz93 Dec 12, 2024
c2ba6e3
Select all model columns, move priority to paramteres as state_priori…
bugraoz93 Dec 20, 2024
418d6f7
Fix run_type and state param not working due to naming and include re…
bugraoz93 Dec 21, 2024
f8cfc9d
Move SortParam not provided comparison logic to view
bugraoz93 Dec 22, 2024
f3908ee
Remove forgotten code piece
bugraoz93 Dec 22, 2024
f957ac4
Remove None from param definition, adjust the log of wrong DagRun typ…
bugraoz93 Dec 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions airflow/api_fastapi/common/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
from airflow.typing_compat import Self
from airflow.utils import timezone
from airflow.utils.state import DagRunState, TaskInstanceState
from airflow.utils.types import DagRunType

if TYPE_CHECKING:
from sqlalchemy.sql import ColumnElement, Select
Expand Down Expand Up @@ -528,6 +529,32 @@ def _transform_dag_run_states(states: Iterable[str] | None) -> list[DagRunState
),
]


def _transform_dag_run_types(types: list[str] | None) -> list[DagRunType | None] | None:
try:
if not types:
return None
return [None if run_type in ("none", None) else DagRunType(run_type) for run_type in types]
except ValueError:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"Invalid value for run type. Valid values are {', '.join(DagRunType)}",
)


QueryDagRunRunTypesFilter = Annotated[
FilterParam[list[str]],
Depends(
filter_param_factory(
attribute=DagRun.run_type,
_type=list[str],
filter_option=FilterOptionEnum.ANY_EQUAL,
default_factory=list,
transform_callable=_transform_dag_run_types,
)
),
]

# DAGTags
QueryDagTagPatternSearch = Annotated[
_SearchParam, Depends(search_param_factory(DagTag.name, "tag_name_pattern"))
Expand Down Expand Up @@ -601,3 +628,28 @@ def _transform_ti_states(states: list[str] | None) -> list[TaskInstanceState | N
QueryVariableKeyPatternSearch = Annotated[
_SearchParam, Depends(search_param_factory(Variable.key, "variable_key_pattern"))
]


# UI Shared
def _optional_boolean(value: bool | None) -> bool | None:
return value if value is not None else False


QueryIncludeUpstream = Annotated[Union[bool], AfterValidator(_optional_boolean)]
QueryIncludeDownstream = Annotated[Union[bool], AfterValidator(_optional_boolean)]

state_priority: list[None | TaskInstanceState] = [
TaskInstanceState.FAILED,
TaskInstanceState.UPSTREAM_FAILED,
TaskInstanceState.UP_FOR_RETRY,
TaskInstanceState.UP_FOR_RESCHEDULE,
TaskInstanceState.QUEUED,
TaskInstanceState.SCHEDULED,
TaskInstanceState.DEFERRED,
TaskInstanceState.RUNNING,
TaskInstanceState.RESTARTING,
None,
TaskInstanceState.SUCCESS,
TaskInstanceState.SKIPPED,
TaskInstanceState.REMOVED,
]
62 changes: 62 additions & 0 deletions airflow/api_fastapi/core_api/datamodels/ui/grid.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from datetime import datetime
from uuid import UUID

from pydantic import BaseModel, Field

from airflow.utils.state import DagRunState, TaskInstanceState
from airflow.utils.types import DagRunType


class GridTaskInstanceSummary(BaseModel):
"""Task Instance Summary model for the Grid UI."""

task_id: str
try_number: int
start_date: datetime | None
end_date: datetime | None
queued_dttm: datetime | None
child_states: dict[str, int] | None
task_count: int
state: TaskInstanceState | None
note: str | None


class GridDAGRunwithTIs(BaseModel):
"""DAG Run model for the Grid UI."""

run_id: str = Field(serialization_alias="dag_run_id", validation_alias="run_id")
queued_at: datetime | None
start_date: datetime | None
end_date: datetime | None
state: DagRunState
run_type: DagRunType
data_interval_start: datetime | None
data_interval_end: datetime | None
version_number: UUID | None
note: str | None
task_instances: list[GridTaskInstanceSummary]


class GridResponse(BaseModel):
"""Response model for the Grid UI."""

dag_runs: list[GridDAGRunwithTIs]
Loading