Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions airflow-core/src/airflow/jobs/triggerer_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1234,6 +1234,12 @@ def create_runtime_ti(

async def create_triggers(self):
"""Drain the to_create queue and create all new triggers that have been requested in the DB."""
# Emit batch creation duration only when triggers were processed.
has_work = bool(self.to_create)

if has_work:
creation_start = time.monotonic()

while self.to_create:
await asyncio.sleep(0)
context: Context | None = None
Expand Down Expand Up @@ -1296,6 +1302,12 @@ async def create_triggers(self):
"events": 0,
}

if has_work:
stats.timing(
"triggerer.batch_trigger_creation_duration",
(time.monotonic() - creation_start) * 1000,
)

async def cancel_triggers(self):
"""
Drain the to_cancel queue and ensure all triggers that are not in the DB are cancelled.
Expand Down
42 changes: 42 additions & 0 deletions airflow-core/tests/unit/jobs/test_triggerer_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -1025,6 +1025,48 @@ async def test_trigger_kwargs_serialization_cleanup(self, session):
trigger_instance.cancel()
await runner.cleanup_finished_triggers()

@pytest.mark.asyncio
@patch("airflow.jobs.triggerer_job_runner.stats.timing")
@patch("airflow.jobs.triggerer_job_runner.Trigger._decrypt_kwargs")
@patch(
"airflow.jobs.triggerer_job_runner.TriggerRunner.get_trigger_by_classpath",
return_value=DateTimeTrigger,
)
async def test_create_triggers_emits_creation_duration_metric(
self,
mock_get_trigger_by_classpath,
mock_decrypt_kwargs,
mock_timing,
):
mock_decrypt_kwargs.return_value = {"moment": timezone.utcnow() + datetime.timedelta(hours=1)}

workload = workloads.RunTrigger.model_construct(
id=1,
classpath="abc",
encrypted_kwargs="fake",
)

runner = TriggerRunner()
runner.to_create.append(workload)

await runner.create_triggers()

mock_timing.assert_called_once()

metric_name, metric_value = mock_timing.call_args.args

assert metric_name == "triggerer.batch_trigger_creation_duration"

# Specific metric_value is not being asserted here time.monotonic is difficult
# to mock deterministically.
assert metric_value >= 0

task = runner.triggers[workload.id]["task"]
task.cancel()
await asyncio.gather(task, return_exceptions=True)

await runner.cleanup_finished_triggers()

@pytest.mark.asyncio
@patch("airflow.sdk.execution_time.task_runner.SUPERVISOR_COMMS", create=True)
async def test_sync_state_to_supervisor(self, supervisor_builder):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,12 @@ metrics:
legacy_name: "dagrun.schedule_delay.{dag_id}"
name_variables: ["dag_id"]

- name: "triggerer.batch_trigger_creation_duration"
description: "Time in milliseconds spent creating a batch of pending triggers."
type: "timer"
legacy_name: "-"
name_variables: []

- name: "scheduler.critical_section_duration"
description: "Milliseconds spent in the critical section of scheduler loop"
type: "timer"
Expand Down
Loading