Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airflow-core/newsfragments/68544.feature.rst

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Newsfragment is only added for important changes. I think this is not needed here.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
The Dags page now offers a collapsible folder navigation tree, built from each Dag's file location, that lets you browse and filter Dags by the folder they live in.
40 changes: 40 additions & 0 deletions airflow-core/src/airflow/api_fastapi/common/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -878,6 +878,43 @@ def depends(cls, owners: list[str] = Query(default_factory=list)) -> _OwnersFilt
return cls().set_value(owners)


class _RelativeFilelocPrefixFilter(BaseParam[str | None]):
"""
Filter Dags by the folder they live in, derived from ``relative_fileloc``.

The value is treated as a directory path relative to the bundle root (e.g.
``team_a/etl``). It matches every Dag whose file lives directly in that folder
or in any of its subfolders, using an escaped ``LIKE 'team_a/etl/%'`` so a
folder name is never a substring/prefix of another (``team_a`` won't match
``team_alpha``). Dags at the bundle root (no ``/`` in ``relative_fileloc``)
are not matched by any folder value and appear only when no folder is selected.
"""

def to_orm(self, select: Select) -> Select:
if self.value is None and self.skip_none:
return select

if not self.value:
return select

directory = self.value.rstrip("/")
escaped = _escape_like_pattern(directory)
return select.where(DagModel.relative_fileloc.like(f"{escaped}/%", escape=_LIKE_ESCAPE_CHAR))

@classmethod
def depends(
cls,
relative_fileloc_prefix: str | None = Query(
default=None,
description=(
"Filter Dags by the folder (directory of ``relative_fileloc``) they live in. "
"Matches the given folder and all of its subfolders."
),
),
) -> _RelativeFilelocPrefixFilter:
return cls().set_value(relative_fileloc_prefix)


def _safe_parse_datetime(date_to_check: str) -> datetime:
"""
Parse datetime and raise error for invalid dates.
Expand Down Expand Up @@ -1126,6 +1163,9 @@ def depends_float(
]
QueryTagsFilter = Annotated[_TagsFilter, Depends(_TagsFilter.depends)]
QueryOwnersFilter = Annotated[_OwnersFilter, Depends(_OwnersFilter.depends)]
QueryRelativeFilelocPrefixFilter = Annotated[
_RelativeFilelocPrefixFilter, Depends(_RelativeFilelocPrefixFilter.depends)
]


class _HasAssetScheduleFilter(BaseParam[bool]):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,10 @@ class DAGWithLatestDagRunsCollectionResponse(BaseModel):

total_entries: int
dags: list[DAGWithLatestDagRunsResponse]


class DagFolderCollectionResponse(BaseModel):
"""Collection of distinct Dag folders (directories of ``relative_fileloc``)."""

folders: list[str]
total_entries: int
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,18 @@ paths:
- type: string
- type: 'null'
title: Bundle Version
- name: relative_fileloc_prefix
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
description: Filter Dags by the folder (directory of ``relative_fileloc``)
they live in. Matches the given folder and all of its subfolders.
title: Relative Fileloc Prefix
description: Filter Dags by the folder (directory of ``relative_fileloc``)
they live in. Matches the given folder and all of its subfolders.
- name: order_by
in: query
required: false
Expand Down Expand Up @@ -538,6 +550,35 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/ui/dags/folders:
get:
tags:
- DAG
summary: Get Dag Folders
description: 'Get the distinct folders the readable Dags live in.


A folder is the directory part of a Dag''s ``relative_fileloc`` (relative
to its

bundle root). Dags located directly at the bundle root have no folder and
are

not represented here. The result powers the folder navigation tree in the
UI,

which reconstructs the hierarchy by splitting each path on ``/``.'
operationId: get_dag_folders
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/DagFolderCollectionResponse'
security:
- OAuth2PasswordBearer: []
- HTTPBearer: []
/ui/dags/{dag_id}/latest_run:
get:
tags:
Expand Down Expand Up @@ -2360,6 +2401,22 @@ components:
- file_token
title: DAGWithLatestDagRunsResponse
description: DAG with latest dag runs response serializer.
DagFolderCollectionResponse:
properties:
folders:
items:
type: string
type: array
title: Folders
total_entries:
type: integer
title: Total Entries
type: object
required:
- folders
- total_entries
title: DagFolderCollectionResponse
description: Collection of distinct Dag folders (directories of ``relative_fileloc``).
DagRunState:
type: string
enum:
Expand Down
35 changes: 35 additions & 0 deletions airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from __future__ import annotations

from pathlib import PurePosixPath
from typing import Annotated

from fastapi import Depends, HTTPException, status
Expand Down Expand Up @@ -49,6 +50,7 @@
QueryOwnersFilter,
QueryPausedFilter,
QueryPendingActionsFilter,
QueryRelativeFilelocPrefixFilter,
QueryTagsFilter,
SortParam,
filter_param_factory,
Expand All @@ -57,6 +59,7 @@
from airflow.api_fastapi.core_api.datamodels.dags import DAG_ALIAS_MAPPING, DAGResponse
from airflow.api_fastapi.core_api.datamodels.ui.dag_runs import DAGRunLightResponse
from airflow.api_fastapi.core_api.datamodels.ui.dags import (
DagFolderCollectionResponse,
DAGWithLatestDagRunsCollectionResponse,
DAGWithLatestDagRunsResponse,
)
Expand Down Expand Up @@ -105,6 +108,7 @@ def get_dags(
last_dag_run_state: QueryLastDagRunStateFilter,
bundle_name: QueryBundleNameFilter,
bundle_version: QueryBundleVersionFilter,
relative_fileloc_prefix: QueryRelativeFilelocPrefixFilter,
order_by: Annotated[
SortParam,
Depends(
Expand Down Expand Up @@ -155,6 +159,7 @@ def get_dags(
readable_dags_filter,
bundle_name,
bundle_version,
relative_fileloc_prefix,
],
order_by=order_by,
offset=offset,
Expand Down Expand Up @@ -253,6 +258,36 @@ def get_dags(
)


@dags_router.get(
"/folders",
dependencies=[Depends(requires_access_dag(method="GET"))],
operation_id="get_dag_folders",
)
def get_dag_folders(
readable_dags_filter: ReadableDagsFilterDep,
session: SessionDep,
) -> DagFolderCollectionResponse:
"""
Get the distinct folders the readable Dags live in.

A folder is the directory part of a Dag's ``relative_fileloc`` (relative to its
bundle root). Dags located directly at the bundle root have no folder and are
not represented here. The result powers the folder navigation tree in the UI,
which reconstructs the hierarchy by splitting each path on ``/``.
"""
query = readable_dags_filter.to_orm(
select(DagModel.relative_fileloc).where(DagModel.relative_fileloc.is_not(None)).distinct()
)
folders: set[str] = set()
for relative_fileloc in session.scalars(query):
parent = PurePosixPath(relative_fileloc).parent
if str(parent) != ".":
folders.add(str(parent))

sorted_folders = sorted(folders)
return DagFolderCollectionResponse(folders=sorted_folders, total_entries=len(sorted_folders))


@dags_router.get(
"/{dag_id}/latest_run",
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
Expand Down
9 changes: 7 additions & 2 deletions airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ export const UseDagServiceGetDagTagsKeyFn = ({ limit, offset, orderBy, tagNamePa
export type DagServiceGetDagsUiDefaultResponse = Awaited<ReturnType<typeof DagService.getDagsUi>>;
export type DagServiceGetDagsUiQueryResult<TData = DagServiceGetDagsUiDefaultResponse, TError = unknown> = UseQueryResult<TData, TError>;
export const useDagServiceGetDagsUiKey = "DagServiceGetDagsUi";
export const UseDagServiceGetDagsUiKeyFn = ({ assetDependency, bundleName, bundleVersion, dagDisplayNamePattern, dagDisplayNamePrefixPattern, dagIdPattern, dagIdPrefixPattern, dagIds, dagRunsLimit, excludeStale, hasAssetSchedule, hasImportErrors, hasPendingActions, isFavorite, lastDagRunState, limit, offset, orderBy, owners, paused, tags, tagsMatchMode }: {
export const UseDagServiceGetDagsUiKeyFn = ({ assetDependency, bundleName, bundleVersion, dagDisplayNamePattern, dagDisplayNamePrefixPattern, dagIdPattern, dagIdPrefixPattern, dagIds, dagRunsLimit, excludeStale, hasAssetSchedule, hasImportErrors, hasPendingActions, isFavorite, lastDagRunState, limit, offset, orderBy, owners, paused, relativeFilelocPrefix, tags, tagsMatchMode }: {
assetDependency?: string;
bundleName?: string;
bundleVersion?: string;
Expand All @@ -347,9 +347,14 @@ export const UseDagServiceGetDagsUiKeyFn = ({ assetDependency, bundleName, bundl
orderBy?: string[];
owners?: string[];
paused?: boolean;
relativeFilelocPrefix?: string;
tags?: string[];
tagsMatchMode?: "any" | "all";
} = {}, queryKey?: Array<unknown>) => [useDagServiceGetDagsUiKey, ...(queryKey ?? [{ assetDependency, bundleName, bundleVersion, dagDisplayNamePattern, dagDisplayNamePrefixPattern, dagIdPattern, dagIdPrefixPattern, dagIds, dagRunsLimit, excludeStale, hasAssetSchedule, hasImportErrors, hasPendingActions, isFavorite, lastDagRunState, limit, offset, orderBy, owners, paused, tags, tagsMatchMode }])];
} = {}, queryKey?: Array<unknown>) => [useDagServiceGetDagsUiKey, ...(queryKey ?? [{ assetDependency, bundleName, bundleVersion, dagDisplayNamePattern, dagDisplayNamePrefixPattern, dagIdPattern, dagIdPrefixPattern, dagIds, dagRunsLimit, excludeStale, hasAssetSchedule, hasImportErrors, hasPendingActions, isFavorite, lastDagRunState, limit, offset, orderBy, owners, paused, relativeFilelocPrefix, tags, tagsMatchMode }])];
export type DagServiceGetDagFoldersDefaultResponse = Awaited<ReturnType<typeof DagService.getDagFolders>>;
export type DagServiceGetDagFoldersQueryResult<TData = DagServiceGetDagFoldersDefaultResponse, TError = unknown> = UseQueryResult<TData, TError>;
export const useDagServiceGetDagFoldersKey = "DagServiceGetDagFolders";
export const UseDagServiceGetDagFoldersKeyFn = (queryKey?: Array<unknown>) => [useDagServiceGetDagFoldersKey, ...(queryKey ?? [])];
export type DagServiceGetLatestRunInfoDefaultResponse = Awaited<ReturnType<typeof DagService.getLatestRunInfo>>;
export type DagServiceGetLatestRunInfoQueryResult<TData = DagServiceGetLatestRunInfoDefaultResponse, TError = unknown> = UseQueryResult<TData, TError>;
export const useDagServiceGetLatestRunInfoKey = "DagServiceGetLatestRunInfo";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,7 @@ export const ensureUseDagServiceGetDagTagsData = (queryClient: QueryClient, { li
* @param data.lastDagRunState
* @param data.bundleName
* @param data.bundleVersion
* @param data.relativeFilelocPrefix Filter Dags by the folder (directory of ``relative_fileloc``) they live in. Matches the given folder and all of its subfolders.
* @param data.orderBy Attributes to order by, multi criteria sort is supported. Prefix with `-` for descending order. Supported attributes: `dag_id, dag_display_name, next_dagrun, state, start_date, last_run_state, last_run_start_date`
* @param data.isFavorite
* @param data.hasAssetSchedule Filter Dags with asset-based scheduling
Expand All @@ -673,7 +674,7 @@ export const ensureUseDagServiceGetDagTagsData = (queryClient: QueryClient, { li
* @returns DAGWithLatestDagRunsCollectionResponse Successful Response
* @throws ApiError
*/
export const ensureUseDagServiceGetDagsUiData = (queryClient: QueryClient, { assetDependency, bundleName, bundleVersion, dagDisplayNamePattern, dagDisplayNamePrefixPattern, dagIdPattern, dagIdPrefixPattern, dagIds, dagRunsLimit, excludeStale, hasAssetSchedule, hasImportErrors, hasPendingActions, isFavorite, lastDagRunState, limit, offset, orderBy, owners, paused, tags, tagsMatchMode }: {
export const ensureUseDagServiceGetDagsUiData = (queryClient: QueryClient, { assetDependency, bundleName, bundleVersion, dagDisplayNamePattern, dagDisplayNamePrefixPattern, dagIdPattern, dagIdPrefixPattern, dagIds, dagRunsLimit, excludeStale, hasAssetSchedule, hasImportErrors, hasPendingActions, isFavorite, lastDagRunState, limit, offset, orderBy, owners, paused, relativeFilelocPrefix, tags, tagsMatchMode }: {
assetDependency?: string;
bundleName?: string;
bundleVersion?: string;
Expand All @@ -694,9 +695,22 @@ export const ensureUseDagServiceGetDagsUiData = (queryClient: QueryClient, { ass
orderBy?: string[];
owners?: string[];
paused?: boolean;
relativeFilelocPrefix?: string;
tags?: string[];
tagsMatchMode?: "any" | "all";
} = {}) => queryClient.ensureQueryData({ queryKey: Common.UseDagServiceGetDagsUiKeyFn({ assetDependency, bundleName, bundleVersion, dagDisplayNamePattern, dagDisplayNamePrefixPattern, dagIdPattern, dagIdPrefixPattern, dagIds, dagRunsLimit, excludeStale, hasAssetSchedule, hasImportErrors, hasPendingActions, isFavorite, lastDagRunState, limit, offset, orderBy, owners, paused, tags, tagsMatchMode }), queryFn: () => DagService.getDagsUi({ assetDependency, bundleName, bundleVersion, dagDisplayNamePattern, dagDisplayNamePrefixPattern, dagIdPattern, dagIdPrefixPattern, dagIds, dagRunsLimit, excludeStale, hasAssetSchedule, hasImportErrors, hasPendingActions, isFavorite, lastDagRunState, limit, offset, orderBy, owners, paused, tags, tagsMatchMode }) });
} = {}) => queryClient.ensureQueryData({ queryKey: Common.UseDagServiceGetDagsUiKeyFn({ assetDependency, bundleName, bundleVersion, dagDisplayNamePattern, dagDisplayNamePrefixPattern, dagIdPattern, dagIdPrefixPattern, dagIds, dagRunsLimit, excludeStale, hasAssetSchedule, hasImportErrors, hasPendingActions, isFavorite, lastDagRunState, limit, offset, orderBy, owners, paused, relativeFilelocPrefix, tags, tagsMatchMode }), queryFn: () => DagService.getDagsUi({ assetDependency, bundleName, bundleVersion, dagDisplayNamePattern, dagDisplayNamePrefixPattern, dagIdPattern, dagIdPrefixPattern, dagIds, dagRunsLimit, excludeStale, hasAssetSchedule, hasImportErrors, hasPendingActions, isFavorite, lastDagRunState, limit, offset, orderBy, owners, paused, relativeFilelocPrefix, tags, tagsMatchMode }) });
/**
* Get Dag Folders
* Get the distinct folders the readable Dags live in.
*
* A folder is the directory part of a Dag's ``relative_fileloc`` (relative to its
* bundle root). Dags located directly at the bundle root have no folder and are
* not represented here. The result powers the folder navigation tree in the UI,
* which reconstructs the hierarchy by splitting each path on ``/``.
* @returns DagFolderCollectionResponse Successful Response
* @throws ApiError
*/
export const ensureUseDagServiceGetDagFoldersData = (queryClient: QueryClient) => queryClient.ensureQueryData({ queryKey: Common.UseDagServiceGetDagFoldersKeyFn(), queryFn: () => DagService.getDagFolders() });
/**
* Get Latest Run Info
* Get latest run.
Expand Down
18 changes: 16 additions & 2 deletions airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,7 @@ export const prefetchUseDagServiceGetDagTags = (queryClient: QueryClient, { limi
* @param data.lastDagRunState
* @param data.bundleName
* @param data.bundleVersion
* @param data.relativeFilelocPrefix Filter Dags by the folder (directory of ``relative_fileloc``) they live in. Matches the given folder and all of its subfolders.
* @param data.orderBy Attributes to order by, multi criteria sort is supported. Prefix with `-` for descending order. Supported attributes: `dag_id, dag_display_name, next_dagrun, state, start_date, last_run_state, last_run_start_date`
* @param data.isFavorite
* @param data.hasAssetSchedule Filter Dags with asset-based scheduling
Expand All @@ -673,7 +674,7 @@ export const prefetchUseDagServiceGetDagTags = (queryClient: QueryClient, { limi
* @returns DAGWithLatestDagRunsCollectionResponse Successful Response
* @throws ApiError
*/
export const prefetchUseDagServiceGetDagsUi = (queryClient: QueryClient, { assetDependency, bundleName, bundleVersion, dagDisplayNamePattern, dagDisplayNamePrefixPattern, dagIdPattern, dagIdPrefixPattern, dagIds, dagRunsLimit, excludeStale, hasAssetSchedule, hasImportErrors, hasPendingActions, isFavorite, lastDagRunState, limit, offset, orderBy, owners, paused, tags, tagsMatchMode }: {
export const prefetchUseDagServiceGetDagsUi = (queryClient: QueryClient, { assetDependency, bundleName, bundleVersion, dagDisplayNamePattern, dagDisplayNamePrefixPattern, dagIdPattern, dagIdPrefixPattern, dagIds, dagRunsLimit, excludeStale, hasAssetSchedule, hasImportErrors, hasPendingActions, isFavorite, lastDagRunState, limit, offset, orderBy, owners, paused, relativeFilelocPrefix, tags, tagsMatchMode }: {
assetDependency?: string;
bundleName?: string;
bundleVersion?: string;
Expand All @@ -694,9 +695,22 @@ export const prefetchUseDagServiceGetDagsUi = (queryClient: QueryClient, { asset
orderBy?: string[];
owners?: string[];
paused?: boolean;
relativeFilelocPrefix?: string;
tags?: string[];
tagsMatchMode?: "any" | "all";
} = {}) => queryClient.prefetchQuery({ queryKey: Common.UseDagServiceGetDagsUiKeyFn({ assetDependency, bundleName, bundleVersion, dagDisplayNamePattern, dagDisplayNamePrefixPattern, dagIdPattern, dagIdPrefixPattern, dagIds, dagRunsLimit, excludeStale, hasAssetSchedule, hasImportErrors, hasPendingActions, isFavorite, lastDagRunState, limit, offset, orderBy, owners, paused, tags, tagsMatchMode }), queryFn: () => DagService.getDagsUi({ assetDependency, bundleName, bundleVersion, dagDisplayNamePattern, dagDisplayNamePrefixPattern, dagIdPattern, dagIdPrefixPattern, dagIds, dagRunsLimit, excludeStale, hasAssetSchedule, hasImportErrors, hasPendingActions, isFavorite, lastDagRunState, limit, offset, orderBy, owners, paused, tags, tagsMatchMode }) });
} = {}) => queryClient.prefetchQuery({ queryKey: Common.UseDagServiceGetDagsUiKeyFn({ assetDependency, bundleName, bundleVersion, dagDisplayNamePattern, dagDisplayNamePrefixPattern, dagIdPattern, dagIdPrefixPattern, dagIds, dagRunsLimit, excludeStale, hasAssetSchedule, hasImportErrors, hasPendingActions, isFavorite, lastDagRunState, limit, offset, orderBy, owners, paused, relativeFilelocPrefix, tags, tagsMatchMode }), queryFn: () => DagService.getDagsUi({ assetDependency, bundleName, bundleVersion, dagDisplayNamePattern, dagDisplayNamePrefixPattern, dagIdPattern, dagIdPrefixPattern, dagIds, dagRunsLimit, excludeStale, hasAssetSchedule, hasImportErrors, hasPendingActions, isFavorite, lastDagRunState, limit, offset, orderBy, owners, paused, relativeFilelocPrefix, tags, tagsMatchMode }) });
/**
* Get Dag Folders
* Get the distinct folders the readable Dags live in.
*
* A folder is the directory part of a Dag's ``relative_fileloc`` (relative to its
* bundle root). Dags located directly at the bundle root have no folder and are
* not represented here. The result powers the folder navigation tree in the UI,
* which reconstructs the hierarchy by splitting each path on ``/``.
* @returns DagFolderCollectionResponse Successful Response
* @throws ApiError
*/
export const prefetchUseDagServiceGetDagFolders = (queryClient: QueryClient) => queryClient.prefetchQuery({ queryKey: Common.UseDagServiceGetDagFoldersKeyFn(), queryFn: () => DagService.getDagFolders() });
/**
* Get Latest Run Info
* Get latest run.
Expand Down
Loading
Loading