Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
3369c70
Add API endpoint and log reader for deadline callback logs
May 27, 2026
fb3f4a5
UI: Add CallbackLogViewer component for deadline alerts
May 27, 2026
00982e6
Regenerate services.gen.ts to include getCallbackLogs endpoint
Jun 3, 2026
ec78401
Fix services.gen.ts to match generated output from OpenAPI spec
Jun 3, 2026
1f0b018
Retrigger CI — LatestBoto failure is pre-existing on main
Jun 5, 2026
c1cf756
Skip bedrock-agentcore tests when service unavailable in LatestBoto
Jun 5, 2026
e663cd5
Retrigger CI — WebKit e2e flake + Glue timeout unrelated to PR
Jun 8, 2026
f307b79
Retrigger CI — Docker Hub 500 error (infra)
Jun 8, 2026
e67ff4f
Retrigger CI — Docker Hub 502 Bad Gateway
Jun 8, 2026
8de8984
Add triggerer callback log capture for async deadline callbacks
Jun 9, 2026
28179b1
UI: Reuse TaskLogContent for callback logs to match task log UX
Jun 10, 2026
0ad90f7
Retrigger CI — GitHub artifact download 401 (infra)
Jun 10, 2026
52f6230
Retrigger CI — GitHub artifact service auth outage
Jun 10, 2026
7868a51
Retrigger CI — persistent GitHub artifact auth outage
Jun 10, 2026
e9acca6
Retrigger CI — verify TS6196 was stale (StructuredLogMessage used on …
Jun 10, 2026
b7dfacd
UI: Remove redundant type assertion in callback log parser
Jun 10, 2026
3eb2989
Retrigger CI — docker provider test timeout + kafka infra errors (unr…
Jun 10, 2026
4d331ee
Retrigger CI — docker compose up failed to start e2e containers (runn…
Jun 10, 2026
1e5aa2a
Retrigger CI — flaky test_one_failed_trigger_rule_in_mapped_task_grou…
Jun 12, 2026
637b886
Retrigger CI — flaky test_one_failed_trigger_rule_in_mapped_task_grou…
Jun 12, 2026
339c421
Retrigger CI — flaky test_one_failed_trigger_rule_in_mapped_task_grou…
Jun 12, 2026
37a61c6
Retrigger CI — flaky test_one_failed_trigger_rule_in_mapped_task_grou…
Jun 12, 2026
d1abd75
Retrigger CI — flaky test_one_failed_trigger_rule_in_mapped_task_grou…
Jun 12, 2026
7567838
Retrigger CI — flaky mapped-task test (6th) + pre-existing main spell…
Jun 12, 2026
3a0f813
Retrigger CI — migration-ref-doc hook hit cache-extraction failure (p…
Jun 12, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class DeadlineResponse(BaseModel):
dag_run_id: str = Field(validation_alias=AliasPath("dagrun", "run_id"))
alert_id: UUID | None = Field(validation_alias="deadline_alert_id", default=None)
alert_name: str | None = Field(validation_alias=AliasPath("deadline_alert", "name"), default=None)
callback_id: UUID | None = Field(validation_alias="callback_id", default=None)
callback_state: str | None = Field(validation_alias=AliasPath("callback", "state"), default=None)


class DeadlineCollectionResponse(BaseModel):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -920,6 +920,60 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/ui/dags/{dag_id}/dagRuns/{dag_run_id}/callbacks/{callback_id}/logs:
get:
tags:
- Deadlines
summary: Get Callback Logs
description: 'Get execution logs for a callback associated with a deadline.


Returns the logs produced during callback execution. These logs are uploaded

to remote storage (or written locally) by the callback supervisor after execution.'
operationId: get_callback_logs
security:
- OAuth2PasswordBearer: []
- HTTPBearer: []
parameters:
- name: dag_id
in: path
required: true
schema:
type: string
title: Dag Id
- name: dag_run_id
in: path
required: true
schema:
type: string
title: Dag Run Id
- name: callback_id
in: path
required: true
schema:
type: string
format: uuid
title: Callback Id
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/TaskInstancesLogResponse'
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/ui/structure/structure_data:
get:
tags:
Expand Down Expand Up @@ -2575,6 +2629,17 @@ components:
- type: string
- type: 'null'
title: Alert Name
callback_id:
anyOf:
- type: string
format: uuid
- type: 'null'
title: Callback Id
callback_state:
anyOf:
- type: string
- type: 'null'
title: Callback State
type: object
required:
- id
Expand Down Expand Up @@ -3558,6 +3623,21 @@ components:
- nodes
title: StructureDataResponse
description: Structure Data serializer for responses.
StructuredLogMessage:
properties:
timestamp:
type: string
format: date-time
title: Timestamp
event:
type: string
title: Event
additionalProperties: true
type: object
required:
- event
title: StructuredLogMessage
description: An individual log message.
TaskInstanceResponse:
properties:
id:
Expand Down Expand Up @@ -3826,6 +3906,28 @@ components:
- awaiting_input
title: TaskInstanceStateCount
description: TaskInstance serializer for responses.
TaskInstancesLogResponse:
properties:
content:
anyOf:
- items:
$ref: '#/components/schemas/StructuredLogMessage'
type: array
- items:
type: string
type: array
title: Content
continuation_token:
anyOf:
- type: string
- type: 'null'
title: Continuation Token
type: object
required:
- content
- continuation_token
title: TaskInstancesLogResponse
description: Log serializer for responses.
TeamCollectionResponse:
properties:
teams:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@

from __future__ import annotations

import os
from typing import Annotated
from uuid import UUID

from fastapi import Depends, HTTPException, status
from sqlalchemy import select
from sqlalchemy.orm import contains_eager, noload
from sqlalchemy.orm import contains_eager, joinedload, noload

from airflow.api_fastapi.auth.managers.models.resource_details import DagAccessEntity
from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
Expand All @@ -35,16 +37,19 @@
filter_param_factory,
)
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.datamodels.log import TaskInstancesLogResponse
from airflow.api_fastapi.core_api.datamodels.ui.deadline import (
DeadlineAlertCollectionResponse,
DeadlineCollectionResponse,
)
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.security import ReadableDagRunsFilterDep, requires_access_dag
from airflow.models.callback import Callback
from airflow.models.dagrun import DagRun
from airflow.models.deadline import Deadline
from airflow.models.deadline_alert import DeadlineAlert
from airflow.models.serialized_dag import SerializedDagModel
from airflow.utils.log.callback_log_reader import read_callback_log

deadlines_router = AirflowRouter(prefix="/dags/{dag_id}", tags=["Deadlines"])

Expand Down Expand Up @@ -106,7 +111,7 @@ def get_deadlines(
.options(
contains_eager(Deadline.dagrun).options(noload(DagRun.deadlines)),
contains_eager(Deadline.deadline_alert),
noload(Deadline.callback),
joinedload(Deadline.callback),
)
)

Expand Down Expand Up @@ -201,3 +206,80 @@ def get_dag_deadline_alerts(
alerts = session.scalars(alerts_select)

return DeadlineAlertCollectionResponse(deadline_alerts=alerts, total_entries=total_entries)


@deadlines_router.get(
"/dagRuns/{dag_run_id}/callbacks/{callback_id}/logs",
responses=create_openapi_http_exception_doc(
[
status.HTTP_404_NOT_FOUND,
]
),
dependencies=[
Depends(
requires_access_dag(
method="GET",
access_entity=DagAccessEntity.TASK_LOGS,
)
),
],
response_model=TaskInstancesLogResponse,
response_model_exclude_unset=True,
)
def get_callback_logs(
dag_id: str,
dag_run_id: str,
callback_id: UUID,
session: SessionDep,
) -> TaskInstancesLogResponse:
"""
Get execution logs for a callback associated with a deadline.

Returns the logs produced during callback execution. These logs are uploaded
to remote storage (or written locally) by the callback supervisor after execution.
"""
# Sanitize path components to prevent path traversal via URL parameters.
for param_name, param_value in [("dag_id", dag_id), ("dag_run_id", dag_run_id)]:
if os.sep in param_value or "\\" in param_value or ".." in param_value:
raise HTTPException(
status.HTTP_400_BAD_REQUEST,
f"Invalid characters in {param_name}",
)

# Verify the callback exists
callback = session.scalar(select(Callback).where(Callback.id == callback_id))
if callback is None:
raise HTTPException(
status.HTTP_404_NOT_FOUND,
f"Callback with id `{callback_id}` was not found",
)

# Verify the dag_run exists with matching dag_id
dag_run = session.scalar(select(DagRun).where(DagRun.dag_id == dag_id, DagRun.run_id == dag_run_id))
if dag_run is None:
raise HTTPException(
status.HTTP_404_NOT_FOUND,
f"DagRun with dag_id: `{dag_id}` and run_id: `{dag_run_id}` was not found",
)

Comment thread
ferruzzi marked this conversation as resolved.
# Verify the callback actually belongs to this dag_run (via the Deadline relationship)
deadline = session.scalar(
select(Deadline).where(Deadline.callback_id == callback_id, Deadline.dagrun_id == dag_run.id)
)
if deadline is None:
raise HTTPException(
status.HTTP_404_NOT_FOUND,
f"Callback `{callback_id}` is not associated with DagRun `{dag_run_id}` of Dag `{dag_id}`",
)

try:
log_stream = read_callback_log(
dag_id=dag_id,
run_id=dag_run_id,
callback_id=str(callback_id),
)
except ValueError:
raise HTTPException(status.HTTP_400_BAD_REQUEST, "Invalid callback log path")

content = list(log_stream)
return TaskInstancesLogResponse.model_construct(content=content, continuation_token=None)
41 changes: 40 additions & 1 deletion airflow-core/src/airflow/jobs/triggerer_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ def from_api_response(cls, response: HITLDetailResponse) -> HITLDetailResponseRe
class TriggerLoggingFactory:
log_path: str

ti: RuntimeTI = attrs.field(repr=False)
ti: RuntimeTI | None = attrs.field(default=None, repr=False)

bound_logger: WrappedLogger = attrs.field(init=False, repr=False)

Expand Down Expand Up @@ -416,8 +416,35 @@ def upload_to_remote(self):
# Never actually called, nothing to do
return

if self.ti is None:
# Callback triggers have no task instance — upload using the path directly.
self._upload_callback_log_to_remote()
return

upload_to_remote(self.bound_logger, self.ti)

def _upload_callback_log_to_remote(self):
"""Upload callback trigger logs to remote storage without a task instance."""
from airflow.sdk.log import load_remote_log_handler, relative_path_from_logger

handler = load_remote_log_handler()
if not handler:
return

raw_logger = getattr(self.bound_logger, "_logger")
try:
relative_path = relative_path_from_logger(raw_logger)
except Exception:
return
if not relative_path:
return

log_relative_path = relative_path.as_posix()
try:
handler.upload(log_relative_path, None) # type: ignore[arg-type]
except Exception:
log.warning("Failed to upload callback trigger logs to remote", log_path=log_relative_path)


def in_process_api_server() -> InProcessExecutionAPI:
from airflow.api_fastapi.execution_api.app import InProcessExecutionAPI
Expand Down Expand Up @@ -752,6 +779,18 @@ def _create_workload(
session: Session,
) -> workloads.RunTrigger | None:
if trigger.task_instance is None:
# Set up dedicated logging for callback triggers so their output is
# captured to a file that the UI log endpoint can later read.
if trigger.callback:
callback_data = trigger.callback.data or {}
dag_id = callback_data.get("dag_id", "unknown")
run_id = callback_data.get("run_id", "unknown")
callback_id = str(trigger.callback.id)
log_path = f"triggerer_callbacks/{dag_id}/{run_id}/{callback_id}"
self.logger_cache[trigger.id] = TriggerLoggingFactory(
log_path=log_path,
ti=None,
)
return workloads.RunTrigger(
id=trigger.id,
classpath=trigger.classpath,
Expand Down
8 changes: 8 additions & 0 deletions airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -948,6 +948,14 @@ export const UseDeadlinesServiceGetDagDeadlineAlertsKeyFn = ({ dagId, limit, off
offset?: number;
orderBy?: string[];
}, queryKey?: Array<unknown>) => [useDeadlinesServiceGetDagDeadlineAlertsKey, ...(queryKey ?? [{ dagId, limit, offset, orderBy }])];
export type DeadlinesServiceGetCallbackLogsDefaultResponse = Awaited<ReturnType<typeof DeadlinesService.getCallbackLogs>>;
export type DeadlinesServiceGetCallbackLogsQueryResult<TData = DeadlinesServiceGetCallbackLogsDefaultResponse, TError = unknown> = UseQueryResult<TData, TError>;
export const useDeadlinesServiceGetCallbackLogsKey = "DeadlinesServiceGetCallbackLogs";
export const UseDeadlinesServiceGetCallbackLogsKeyFn = ({ callbackId, dagId, dagRunId }: {
callbackId: string;
dagId: string;
dagRunId: string;
}, queryKey?: Array<unknown>) => [useDeadlinesServiceGetCallbackLogsKey, ...(queryKey ?? [{ callbackId, dagId, dagRunId }])];
export type StructureServiceStructureDataDefaultResponse = Awaited<ReturnType<typeof StructureService.structureData>>;
export type StructureServiceStructureDataQueryResult<TData = StructureServiceStructureDataDefaultResponse, TError = unknown> = UseQueryResult<TData, TError>;
export const useStructureServiceStructureDataKey = "StructureServiceStructureData";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1907,6 +1907,24 @@ export const ensureUseDeadlinesServiceGetDagDeadlineAlertsData = (queryClient: Q
orderBy?: string[];
}) => queryClient.ensureQueryData({ queryKey: Common.UseDeadlinesServiceGetDagDeadlineAlertsKeyFn({ dagId, limit, offset, orderBy }), queryFn: () => DeadlinesService.getDagDeadlineAlerts({ dagId, limit, offset, orderBy }) });
/**
* Get Callback Logs
* Get execution logs for a callback associated with a deadline.
*
* Returns the logs produced during callback execution. These logs are uploaded
* to remote storage (or written locally) by the callback supervisor after execution.
* @param data The data for the request.
* @param data.dagId
* @param data.dagRunId
* @param data.callbackId
* @returns TaskInstancesLogResponse Successful Response
* @throws ApiError
*/
export const ensureUseDeadlinesServiceGetCallbackLogsData = (queryClient: QueryClient, { callbackId, dagId, dagRunId }: {
callbackId: string;
dagId: string;
dagRunId: string;
}) => queryClient.ensureQueryData({ queryKey: Common.UseDeadlinesServiceGetCallbackLogsKeyFn({ callbackId, dagId, dagRunId }), queryFn: () => DeadlinesService.getCallbackLogs({ callbackId, dagId, dagRunId }) });
/**
* Structure Data
* Get Structure Data.
* @param data The data for the request.
Expand Down
18 changes: 18 additions & 0 deletions airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1907,6 +1907,24 @@ export const prefetchUseDeadlinesServiceGetDagDeadlineAlerts = (queryClient: Que
orderBy?: string[];
}) => queryClient.prefetchQuery({ queryKey: Common.UseDeadlinesServiceGetDagDeadlineAlertsKeyFn({ dagId, limit, offset, orderBy }), queryFn: () => DeadlinesService.getDagDeadlineAlerts({ dagId, limit, offset, orderBy }) });
/**
* Get Callback Logs
* Get execution logs for a callback associated with a deadline.
*
* Returns the logs produced during callback execution. These logs are uploaded
* to remote storage (or written locally) by the callback supervisor after execution.
* @param data The data for the request.
* @param data.dagId
* @param data.dagRunId
* @param data.callbackId
* @returns TaskInstancesLogResponse Successful Response
* @throws ApiError
*/
export const prefetchUseDeadlinesServiceGetCallbackLogs = (queryClient: QueryClient, { callbackId, dagId, dagRunId }: {
callbackId: string;
dagId: string;
dagRunId: string;
}) => queryClient.prefetchQuery({ queryKey: Common.UseDeadlinesServiceGetCallbackLogsKeyFn({ callbackId, dagId, dagRunId }), queryFn: () => DeadlinesService.getCallbackLogs({ callbackId, dagId, dagRunId }) });
/**
* Structure Data
* Get Structure Data.
* @param data The data for the request.
Expand Down
Loading
Loading