From f8c89829eb0a6930c943a1a40f64827de48fa476 Mon Sep 17 00:00:00 2001 From: Andy Salnikov Date: Thu, 28 May 2026 11:53:15 -0700 Subject: [PATCH 1/5] Add garbage collection metrics to task metadata (DM-54645) SingleQuantumExecutor uses new GcMetrics context manager to collect GC metrics and add them to task metadata under "quantum.gc_metrics" key. --- python/lsst/pipe/base/gc_metrics.py | 139 ++++++++++++++++++ .../lsst/pipe/base/single_quantum_executor.py | 4 +- tests/test_single_quantum_executor.py | 13 ++ 3 files changed, 155 insertions(+), 1 deletion(-) create mode 100644 python/lsst/pipe/base/gc_metrics.py diff --git a/python/lsst/pipe/base/gc_metrics.py b/python/lsst/pipe/base/gc_metrics.py new file mode 100644 index 000000000..ef93c7c76 --- /dev/null +++ b/python/lsst/pipe/base/gc_metrics.py @@ -0,0 +1,139 @@ +# This file is part of pipe_base. +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (https://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This software is dual licensed under the GNU General Public License and also +# under a 3-clause BSD license. Recipients may choose which of these licenses +# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, +# respectively. If you choose the GPL option then the following text applies +# (but note that there is still no warranty even if you opt for BSD instead): +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from __future__ import annotations + +__all__ = ["GcMetrics"] + +import gc +from collections import defaultdict +from types import TracebackType +from typing import Self + +import pydantic + +from ._task_metadata import TaskMetadata + + +def _gc_stats() -> dict[str, list[int]]: + """Convert result of `gc.get_stats` to a dictionary of lists.""" + result: dict[str, list[int]] = defaultdict(list) + for gen_stats in gc.get_stats(): + for key, stat in gen_stats.items(): + result[key].append(stat) + return result + + +class GcMetrics(pydantic.BaseModel): + """Context manager which collects GC metrics and converts them into + a dictionary suitable for TaskMetadata. + """ + + start_isenabled: bool | None = None + """Whether GC is enabled on entering context.""" + + end_isenabled: bool | None = None + """Whether GC is enabled on exiting context.""" + + start_threshold: list[int] | None = None + """GC thresholds on entering context.""" + + end_threshold: list[int] | None = None + """GC thresholds on exiting context.""" + + start_count: list[int] | None = None + """GC collection counts on entering context.""" + + end_count: list[int] | None = None + """GC collection counts on exiting context.""" + + start_stats: dict[str, list[int]] | None = None + """GC stats on entering context. + + These are the same values as returned from `gc.get_stats` but rearranged + to be indexed by string key first and generation second. 
+ """ + + end_stats: dict[str, list[int]] | None = None + """GC stats on exiting context, same format as `start_stats`.""" + + def __enter__(self) -> Self: + self.start_isenabled = gc.isenabled() + self.start_threshold = list(gc.get_threshold()) + self.start_count = list(gc.get_count()) + self.start_stats = _gc_stats() + return self + + def __exit__( + self, + exc_type: type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: TracebackType | None, + ) -> None: + self.end_isenabled = gc.isenabled() + self.end_threshold = list(gc.get_threshold()) + self.end_count = list(gc.get_count()) + self.end_stats = _gc_stats() + + @classmethod + def from_task_metadata(cls, metadata: TaskMetadata) -> GcMetrics | None: + """Extract GC metrics from task metadata. + + Parameters + ---------- + metadata : `TaskMetadata` + Metadata written by + `.single_quantum_executor.SingleQuantumExecutor`. + + Returns + ------- + gc_metrics : `GcMetrics` or `None` + GC metrics for this quantum, or `None` if the expected fields were + not found. 
+ """ + try: + quantum_metadata = metadata["quantum"] + except KeyError: + return None + try: + gc_metadata = quantum_metadata["gc_metrics"] + except KeyError: + return None + + try: + return GcMetrics( + start_isenabled=gc_metadata["start_isenabled"], + end_isenabled=gc_metadata["end_isenabled"], + start_threshold=gc_metadata.getArray("start_threshold"), + end_threshold=gc_metadata.getArray("end_threshold"), + start_count=gc_metadata.getArray("start_count"), + end_count=gc_metadata.getArray("end_count"), + start_stats=gc_metadata["start_stats"].to_dict(), + end_stats=gc_metadata["end_stats"].to_dict(), + ) + except KeyError: + return None diff --git a/python/lsst/pipe/base/single_quantum_executor.py b/python/lsst/pipe/base/single_quantum_executor.py index 97c2ad5bd..cdec3e98f 100644 --- a/python/lsst/pipe/base/single_quantum_executor.py +++ b/python/lsst/pipe/base/single_quantum_executor.py @@ -56,6 +56,7 @@ QuantumSuccessCaveats, ) from .connections import AdjustQuantumHelper +from .gc_metrics import GcMetrics from .log_capture import LogCapture, _ExecutionLogRecordsExtra from .pipeline_graph import TaskNode from .pipelineTask import PipelineTask @@ -307,7 +308,7 @@ def _execute_with_limited_butler( task = self._task_factory.makeTask(task_node, limited_butler, init_input_refs) logInfo(None, "start", metadata=quantumMetadata) # type: ignore[arg-type] outputs_put: list[uuid.UUID] = [] - with limited_butler.record_metrics() as butler_metrics: + with limited_butler.record_metrics() as butler_metrics, GcMetrics() as gc_metrics: caveats = self._run_quantum( task, quantum, task_node, limited_butler, quantum_id=quantum_id, ids_put=outputs_put ) @@ -327,6 +328,7 @@ def _execute_with_limited_butler( raise else: quantumMetadata["butler_metrics"] = butler_metrics.model_dump() + quantumMetadata["gc_metrics"] = gc_metrics.model_dump() quantumMetadata["caveats"] = caveats.value # Stringify the UUID for easier compatibility with # PropertyList. 
diff --git a/tests/test_single_quantum_executor.py b/tests/test_single_quantum_executor.py index e640670c5..82410541e 100644 --- a/tests/test_single_quantum_executor.py +++ b/tests/test_single_quantum_executor.py @@ -32,6 +32,7 @@ import unittest import lsst.pipe.base.automatic_connection_constants as acc +from lsst.pipe.base.gc_metrics import GcMetrics from lsst.pipe.base.resource_usage import QuantumResourceUsage from lsst.pipe.base.single_quantum_executor import SingleQuantumExecutor from lsst.pipe.base.tests.mocks import InMemoryRepo @@ -71,6 +72,18 @@ def test_simple_execute(self) -> None: self.assertGreater(ru.total_time, 0) self.assertLess(ru.total_time, t2 - t1) + # Check that GC metrics are filled. + gc_metrics = GcMetrics.from_task_metadata(md) + self.assertIsNotNone(gc_metrics) + self.assertTrue(gc_metrics.start_isenabled) + self.assertTrue(gc_metrics.end_isenabled) + self.assertEqual(len(gc_metrics.start_threshold), 3) + self.assertEqual(len(gc_metrics.end_threshold), 3) + self.assertEqual(len(gc_metrics.start_count), 3) + self.assertEqual(len(gc_metrics.end_count), 3) + self.assertEqual(set(gc_metrics.start_stats), {"collections", "collected", "uncollectable"}) + self.assertEqual(set(gc_metrics.end_stats), {"collections", "collected", "uncollectable"}) + def test_skip_existing_execute(self) -> None: """Run execute() method twice, with skip_existing_in.""" helper = InMemoryRepo("base.yaml") From 8d293264ce9dd9e98a27b7cdc07dffe057af8e2b Mon Sep 17 00:00:00 2001 From: Andy Salnikov Date: Thu, 28 May 2026 11:58:10 -0700 Subject: [PATCH 2/5] Add news fragment --- doc/changes/DM-54645.misc.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 doc/changes/DM-54645.misc.md diff --git a/doc/changes/DM-54645.misc.md b/doc/changes/DM-54645.misc.md new file mode 100644 index 000000000..8a5ddc100 --- /dev/null +++ b/doc/changes/DM-54645.misc.md @@ -0,0 +1 @@ +SingleQuantumExecutor adds garbage collection metrics to task metadata under "quantum.gc_metrics" key. 
From ac0bcb1872388a8005a4d26097dc0fb0bb6dc524 Mon Sep 17 00:00:00 2001 From: Andy Salnikov Date: Thu, 28 May 2026 12:39:29 -0700 Subject: [PATCH 3/5] Fix mypy2 complaint --- python/lsst/pipe/base/quantum_graph/aggregator/_writer.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/lsst/pipe/base/quantum_graph/aggregator/_writer.py b/python/lsst/pipe/base/quantum_graph/aggregator/_writer.py index bb7e08a52..5db6d91ff 100644 --- a/python/lsst/pipe/base/quantum_graph/aggregator/_writer.py +++ b/python/lsst/pipe/base/quantum_graph/aggregator/_writer.py @@ -30,6 +30,7 @@ __all__ = ("Writer",) import dataclasses +from collections.abc import ByteString import zstandard @@ -163,7 +164,7 @@ def make_compression_dictionary(self) -> zstandard.ZstdCompressionDict: self.comms.log.info("Making compressor with no dictionary.") return zstandard.ZstdCompressionDict(b"") self.comms.log.info("Training compression dictionary.") - training_inputs: list[bytes] = [] + training_inputs: list[ByteString] = [] # We start the dictionary training with *predicted* quantum dataset # models, since those have almost all of the same attributes as the # provenance quantum and dataset models, and we can get a nice random From fad68ac244776f668d61216890d4c26df78baed8 Mon Sep 17 00:00:00 2001 From: Andy Salnikov Date: Thu, 28 May 2026 14:52:03 -0700 Subject: [PATCH 4/5] Update doc/changes/DM-54645.misc.md Co-authored-by: Tim Jenness --- doc/changes/DM-54645.misc.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/changes/DM-54645.misc.md b/doc/changes/DM-54645.misc.md index 8a5ddc100..177e14254 100644 --- a/doc/changes/DM-54645.misc.md +++ b/doc/changes/DM-54645.misc.md @@ -1 +1 @@ -SingleQuantumExecutor adds garbage collection metrics to task metadata under "quantum.gc_metrics" key. +Added garbage collection metrics to `SingleQuantumExecutor` with metrics stored in task metadata under `quantum.gc_metrics` key. 
From d9935856b0e22a0cd63cebb31d23217c2b16be5c Mon Sep 17 00:00:00 2001 From: Andy Salnikov Date: Thu, 28 May 2026 15:00:22 -0700 Subject: [PATCH 5/5] Apply review suggestions --- python/lsst/pipe/base/gc_metrics.py | 32 ++++++++++------------------- 1 file changed, 11 insertions(+), 21 deletions(-) diff --git a/python/lsst/pipe/base/gc_metrics.py b/python/lsst/pipe/base/gc_metrics.py index ef93c7c76..a667314b9 100644 --- a/python/lsst/pipe/base/gc_metrics.py +++ b/python/lsst/pipe/base/gc_metrics.py @@ -54,32 +54,34 @@ class GcMetrics(pydantic.BaseModel): """ start_isenabled: bool | None = None - """Whether GC is enabled on entering context.""" + """Whether GC is enabled on entering context (`bool` or `None`).""" end_isenabled: bool | None = None - """Whether GC is enabled on exiting context.""" + """Whether GC is enabled on exiting context (`bool` or `None`).""" start_threshold: list[int] | None = None - """GC thresholds on entering context.""" + """GC thresholds on entering context (`list`[`int`] or `None`).""" end_threshold: list[int] | None = None - """GC thresholds on exiting context.""" + """GC thresholds on exiting context (`list`[`int`] or `None`).""" start_count: list[int] | None = None - """GC collection counts on entering context.""" + """GC collection counts on entering context (`list`[`int`] or `None`).""" end_count: list[int] | None = None - """GC collection counts on exiting context.""" + """GC collection counts on exiting context (`list`[`int`] or `None`).""" start_stats: dict[str, list[int]] | None = None - """GC stats on entering context. + """GC stats on entering context (`dict`[`str`, `list`[`int`]] or `None`). These are the same values as returned from `gc.get_stats` but rearranged to be indexed by string key first and generation second. 
""" end_stats: dict[str, list[int]] | None = None - """GC stats on exiting context, same format as `start_stats`.""" + """GC stats on exiting context, same format as `start_stats` + (`dict`[`str`, `list`[`int`]] or `None`). + """ def __enter__(self) -> Self: self.start_isenabled = gc.isenabled() @@ -124,16 +126,4 @@ def from_task_metadata(cls, metadata: TaskMetadata) -> GcMetrics | None: except KeyError: return None - try: - return GcMetrics( - start_isenabled=gc_metadata["start_isenabled"], - end_isenabled=gc_metadata["end_isenabled"], - start_threshold=gc_metadata.getArray("start_threshold"), - end_threshold=gc_metadata.getArray("end_threshold"), - start_count=gc_metadata.getArray("start_count"), - end_count=gc_metadata.getArray("end_count"), - start_stats=gc_metadata["start_stats"].to_dict(), - end_stats=gc_metadata["end_stats"].to_dict(), - ) - except KeyError: - return None + return GcMetrics(**gc_metadata.to_dict())