From d8a749d449ecbce68e12d14c0d2b7d39b1deb67c Mon Sep 17 00:00:00 2001 From: Sameer Mesiah Date: Sun, 14 Jun 2026 00:13:37 +0100 Subject: [PATCH] Triggerer: add batch trigger creation duration metric Add a timing metric that records the time spent creating all pending triggers during a single create_triggers() invocation. Also add a unit test verifying that the metric is emitted when triggers are successfully created. --- .../src/airflow/jobs/triggerer_job_runner.py | 12 ++++++ .../tests/unit/jobs/test_triggerer_job.py | 42 +++++++++++++++++++ .../metrics/metrics_template.yaml | 6 +++ 3 files changed, 60 insertions(+) diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py b/airflow-core/src/airflow/jobs/triggerer_job_runner.py index 326ab14d7a4b0..512c264ad9921 100644 --- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py +++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py @@ -1234,6 +1234,12 @@ def create_runtime_ti( async def create_triggers(self): """Drain the to_create queue and create all new triggers that have been requested in the DB.""" + # Emit batch creation duration only when triggers were processed. + has_work = bool(self.to_create) + + if has_work: + creation_start = time.monotonic() + while self.to_create: await asyncio.sleep(0) context: Context | None = None @@ -1296,6 +1302,12 @@ async def create_triggers(self): "events": 0, } + if has_work: + stats.timing( + "triggerer.batch_trigger_creation_duration", + (time.monotonic() - creation_start) * 1000, + ) + async def cancel_triggers(self): """ Drain the to_cancel queue and ensure all triggers that are not in the DB are cancelled. diff --git a/airflow-core/tests/unit/jobs/test_triggerer_job.py b/airflow-core/tests/unit/jobs/test_triggerer_job.py index 6464274a93899..6d42b7bfa691c 100644 --- a/airflow-core/tests/unit/jobs/test_triggerer_job.py +++ b/airflow-core/tests/unit/jobs/test_triggerer_job.py @@ -1025,6 +1025,48 @@ async def test_trigger_kwargs_serialization_cleanup(self, session): trigger_instance.cancel() await runner.cleanup_finished_triggers() + @pytest.mark.asyncio + @patch("airflow.jobs.triggerer_job_runner.stats.timing") + @patch("airflow.jobs.triggerer_job_runner.Trigger._decrypt_kwargs") + @patch( + "airflow.jobs.triggerer_job_runner.TriggerRunner.get_trigger_by_classpath", + return_value=DateTimeTrigger, + ) + async def test_create_triggers_emits_creation_duration_metric( + self, + mock_get_trigger_by_classpath, + mock_decrypt_kwargs, + mock_timing, + ): + mock_decrypt_kwargs.return_value = {"moment": timezone.utcnow() + datetime.timedelta(hours=1)} + + workload = workloads.RunTrigger.model_construct( + id=1, + classpath="abc", + encrypted_kwargs="fake", + ) + + runner = TriggerRunner() + runner.to_create.append(workload) + + await runner.create_triggers() + + mock_timing.assert_called_once() + + metric_name, metric_value = mock_timing.call_args.args + + assert metric_name == "triggerer.batch_trigger_creation_duration" + + # Specific metric_value is not being asserted here time.monotonic is difficult + # to mock deterministically. + assert metric_value >= 0 + + task = runner.triggers[workload.id]["task"] + task.cancel() + await asyncio.gather(task, return_exceptions=True) + + await runner.cleanup_finished_triggers() + @pytest.mark.asyncio @patch("airflow.sdk.execution_time.task_runner.SUPERVISOR_COMMS", create=True) async def test_sync_state_to_supervisor(self, supervisor_builder): diff --git a/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml b/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml index 42e2781dcfabb..7670cdab60012 100644 --- a/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml +++ b/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml @@ -676,6 +676,12 @@ metrics: legacy_name: "dagrun.schedule_delay.{dag_id}" name_variables: ["dag_id"] + - name: "triggerer.batch_trigger_creation_duration" + description: "Time in milliseconds spent creating a batch of pending triggers." + type: "timer" + legacy_name: "-" + name_variables: [] + - name: "scheduler.critical_section_duration" description: "Milliseconds spent in the critical section of scheduler loop" type: "timer"