Skip to content

Commit 12e98b0

Browse files
committed
Also expose scheduled_dttm in the Gantt Chart
1 parent 4fec93a commit 12e98b0

7 files changed

Lines changed: 56 additions & 2 deletions

File tree

airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/gantt.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ class GanttTaskInstance(BaseModel):
3030
task_display_name: str
3131
try_number: int
3232
state: TaskInstanceState | None
33+
scheduled_dttm: datetime | None
3334
queued_dttm: datetime | None
3435
start_date: datetime | None
3536
end_date: datetime | None

airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2237,6 +2237,12 @@ components:
22372237
anyOf:
22382238
- $ref: '#/components/schemas/TaskInstanceState'
22392239
- type: 'null'
2240+
scheduled_dttm:
2241+
anyOf:
2242+
- type: string
2243+
format: date-time
2244+
- type: 'null'
2245+
title: Scheduled Dttm
22402246
queued_dttm:
22412247
anyOf:
22422248
- type: string
@@ -2269,6 +2275,7 @@ components:
22692275
- task_display_name
22702276
- try_number
22712277
- state
2278+
- scheduled_dttm
22722279
- queued_dttm
22732280
- start_date
22742281
- end_date

airflow-core/src/airflow/api_fastapi/core_api/routes/ui/gantt.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ def get_gantt_data(
6767
TaskInstance.task_display_name.label("task_display_name"), # type: ignore[attr-defined]
6868
TaskInstance.try_number.label("try_number"),
6969
TaskInstance.state.label("state"),
70+
TaskInstance.scheduled_dttm.label("scheduled_dttm"),
7071
TaskInstance.queued_dttm.label("queued_dttm"),
7172
TaskInstance.start_date.label("start_date"),
7273
TaskInstance.end_date.label("end_date"),
@@ -82,6 +83,7 @@ def get_gantt_data(
8283
TaskInstanceHistory.task_display_name.label("task_display_name"),
8384
TaskInstanceHistory.try_number.label("try_number"),
8485
TaskInstanceHistory.state.label("state"),
86+
TaskInstanceHistory.scheduled_dttm.label("scheduled_dttm"),
8587
TaskInstanceHistory.queued_dttm.label("queued_dttm"),
8688
TaskInstanceHistory.start_date.label("start_date"),
8789
TaskInstanceHistory.end_date.label("end_date"),
@@ -108,6 +110,7 @@ def get_gantt_data(
108110
task_display_name=row.task_display_name,
109111
try_number=row.try_number,
110112
state=row.state,
113+
scheduled_dttm=row.scheduled_dttm,
111114
queued_dttm=row.queued_dttm,
112115
start_date=row.start_date,
113116
end_date=row.end_date,

airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8046,6 +8046,18 @@ export const $GanttTaskInstance = {
80468046
}
80478047
]
80488048
},
8049+
scheduled_dttm: {
8050+
anyOf: [
8051+
{
8052+
type: 'string',
8053+
format: 'date-time'
8054+
},
8055+
{
8056+
type: 'null'
8057+
}
8058+
],
8059+
title: 'Scheduled Dttm'
8060+
},
80498061
queued_dttm: {
80508062
anyOf: [
80518063
{
@@ -8094,7 +8106,7 @@ export const $GanttTaskInstance = {
80948106
}
80958107
},
80968108
type: 'object',
8097-
required: ['task_id', 'task_display_name', 'try_number', 'state', 'queued_dttm', 'start_date', 'end_date'],
8109+
required: ['task_id', 'task_display_name', 'try_number', 'state', 'scheduled_dttm', 'queued_dttm', 'start_date', 'end_date'],
80988110
title: 'GanttTaskInstance',
80998111
description: 'Task instance data for Gantt chart.'
81008112
} as const;

airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1983,6 +1983,7 @@ export type GanttTaskInstance = {
19831983
task_display_name: string;
19841984
try_number: number;
19851985
state: TaskInstanceState | null;
1986+
scheduled_dttm: string | null;
19861987
queued_dttm: string | null;
19871988
start_date: string | null;
19881989
end_date: string | null;

airflow-core/src/airflow/ui/src/layouts/Details/Gantt/utils.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ export type GanttDataItem = {
3939
isGroup?: boolean | null;
4040
isMapped?: boolean | null;
4141
isQueued?: boolean;
42+
isScheduled?: boolean;
4243
state?: TaskInstanceState | null;
4344
taskId: string;
4445
tryNumber?: number;
@@ -128,6 +129,22 @@ export const transformGanttData = ({
128129
const endTime = hasTaskRunning ? dayjs().toISOString() : tryInstance.end_date;
129130
const items: Array<GanttDataItem> = [];
130131

132+
// Scheduled segment: from scheduled_dttm to queued_dttm (or start_date if no queued_dttm)
133+
if (tryInstance.scheduled_dttm !== null) {
134+
const scheduledEnd = tryInstance.queued_dttm ?? tryInstance.start_date;
135+
136+
items.push({
137+
isGroup: false,
138+
isMapped: tryInstance.is_mapped,
139+
isScheduled: true,
140+
state: "scheduled" as TaskInstanceState,
141+
taskId: tryInstance.task_id,
142+
tryNumber: tryInstance.try_number,
143+
x: [dayjs(tryInstance.scheduled_dttm).toISOString(), dayjs(scheduledEnd).toISOString()],
144+
y: tryInstance.task_display_name,
145+
});
146+
}
147+
131148
// Queue segment: from queued_dttm to start_date
132149
if (tryInstance.queued_dttm !== null) {
133150
items.push({
@@ -351,6 +368,10 @@ export const createChartOptions = ({
351368
label(tooltipItem: TooltipItem<"bar">) {
352369
const taskInstance = data[tooltipItem.dataIndex];
353370

371+
if (taskInstance?.isScheduled) {
372+
return `${translate("state")}: ${translate("states.scheduled")}`;
373+
}
374+
354375
if (taskInstance?.isQueued) {
355376
return `${translate("state")}: ${translate("states.queued")}`;
356377
}

airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_gantt.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
"task_display_name": TASK_DISPLAY_NAME,
5252
"try_number": 1,
5353
"state": "success",
54+
"scheduled_dttm": "2024-11-30T09:50:00Z",
5455
"queued_dttm": "2024-11-30T09:55:00Z",
5556
"start_date": "2024-11-30T10:00:00Z",
5657
"end_date": "2024-11-30T10:05:00Z",
@@ -63,6 +64,7 @@
6364
"task_display_name": TASK_DISPLAY_NAME_2,
6465
"try_number": 1,
6566
"state": "failed",
67+
"scheduled_dttm": "2024-11-30T10:02:00Z",
6668
"queued_dttm": "2024-11-30T10:03:00Z",
6769
"start_date": "2024-11-30T10:05:00Z",
6870
"end_date": "2024-11-30T10:10:00Z",
@@ -75,6 +77,7 @@
7577
"task_display_name": TASK_DISPLAY_NAME_3,
7678
"try_number": 1,
7779
"state": "running",
80+
"scheduled_dttm": None,
7881
"queued_dttm": None,
7982
"start_date": "2024-11-30T10:10:00Z",
8083
"end_date": None,
@@ -119,18 +122,21 @@ def setup(dag_maker, session=None):
119122
if ti.task_id == TASK_ID:
120123
ti.state = TaskInstanceState.SUCCESS
121124
ti.try_number = 1
125+
ti.scheduled_dttm = pendulum.DateTime(2024, 11, 30, 9, 50, 0, tzinfo=pendulum.UTC)
122126
ti.queued_dttm = pendulum.DateTime(2024, 11, 30, 9, 55, 0, tzinfo=pendulum.UTC)
123127
ti.start_date = pendulum.DateTime(2024, 11, 30, 10, 0, 0, tzinfo=pendulum.UTC)
124128
ti.end_date = pendulum.DateTime(2024, 11, 30, 10, 5, 0, tzinfo=pendulum.UTC)
125129
elif ti.task_id == TASK_ID_2:
126130
ti.state = TaskInstanceState.FAILED
127131
ti.try_number = 1
132+
ti.scheduled_dttm = pendulum.DateTime(2024, 11, 30, 10, 2, 0, tzinfo=pendulum.UTC)
128133
ti.queued_dttm = pendulum.DateTime(2024, 11, 30, 10, 3, 0, tzinfo=pendulum.UTC)
129134
ti.start_date = pendulum.DateTime(2024, 11, 30, 10, 5, 0, tzinfo=pendulum.UTC)
130135
ti.end_date = pendulum.DateTime(2024, 11, 30, 10, 10, 0, tzinfo=pendulum.UTC)
131136
elif ti.task_id == TASK_ID_3:
132137
ti.state = TaskInstanceState.RUNNING
133138
ti.try_number = 1
139+
ti.scheduled_dttm = None
134140
ti.queued_dttm = None
135141
ti.start_date = pendulum.DateTime(2024, 11, 30, 10, 10, 0, tzinfo=pendulum.UTC)
136142
ti.end_date = None
@@ -312,13 +318,16 @@ def test_sorted_by_task_id_and_try_number(self, test_client):
312318
sorted_tis = sorted(task_instances, key=lambda x: (x["task_id"], x["try_number"]))
313319
assert task_instances == sorted_tis
314320

315-
def test_queued_dttm_is_returned(self, test_client):
321+
def test_timing_fields_are_returned(self, test_client):
316322
response = test_client.get(f"/gantt/{DAG_ID}/run_1")
317323
assert response.status_code == 200
318324
data = response.json()
319325
tis = {ti["task_id"]: ti for ti in data["task_instances"]}
326+
assert tis[TASK_ID]["scheduled_dttm"] == "2024-11-30T09:50:00Z"
320327
assert tis[TASK_ID]["queued_dttm"] == "2024-11-30T09:55:00Z"
328+
assert tis[TASK_ID_2]["scheduled_dttm"] == "2024-11-30T10:02:00Z"
321329
assert tis[TASK_ID_2]["queued_dttm"] == "2024-11-30T10:03:00Z"
330+
assert tis[TASK_ID_3]["scheduled_dttm"] is None
322331
assert tis[TASK_ID_3]["queued_dttm"] is None
323332

324333
def test_should_response_401(self, unauthenticated_test_client):

0 commit comments

Comments
 (0)