diff --git a/doc/changes/DM-54645.misc.md b/doc/changes/DM-54645.misc.md new file mode 100644 index 000000000..177e14254 --- /dev/null +++ b/doc/changes/DM-54645.misc.md @@ -0,0 +1 @@ +Added garbage collection metrics to `SingleQuantumExecutor` with metrics stored in task metadata under `quantum.gc_metrics` key. diff --git a/python/lsst/pipe/base/gc_metrics.py b/python/lsst/pipe/base/gc_metrics.py new file mode 100644 index 000000000..a667314b9 --- /dev/null +++ b/python/lsst/pipe/base/gc_metrics.py @@ -0,0 +1,129 @@ +# This file is part of pipe_base. +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (https://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This software is dual licensed under the GNU General Public License and also +# under a 3-clause BSD license. Recipients may choose which of these licenses +# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, +# respectively. If you choose the GPL option then the following text applies +# (but note that there is still no warranty even if you opt for BSD instead): +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>.
+ +from __future__ import annotations + +__all__ = ["GcMetrics"] + +import gc +from collections import defaultdict +from types import TracebackType +from typing import Self + +import pydantic + +from ._task_metadata import TaskMetadata + + +def _gc_stats() -> dict[str, list[int]]: + """Convert result of `gc.get_stats` to a dictionary of lists.""" + result: dict[str, list[int]] = defaultdict(list) + for gen_stats in gc.get_stats(): + for key, stat in gen_stats.items(): + result[key].append(stat) + return result + + +class GcMetrics(pydantic.BaseModel): + """Context manager which collects GC metrics and converts them into + a dictionary suitable for TaskMetadata. + """ + + start_isenabled: bool | None = None + """Whether GC is enabled on entering context (`bool` or `None`).""" + + end_isenabled: bool | None = None + """Whether GC is enabled on exiting context (`bool` or `None`).""" + + start_threshold: list[int] | None = None + """GC thresholds on entering context (`list`[`int`] or `None`).""" + + end_threshold: list[int] | None = None + """GC thresholds on exiting context (`list`[`int`] or `None`).""" + + start_count: list[int] | None = None + """GC collection counts on entering context (`list`[`int`] or `None`).""" + + end_count: list[int] | None = None + """GC collection counts on exiting context (`list`[`int`] or `None`).""" + + start_stats: dict[str, list[int]] | None = None + """GC stats on entering context (`dict`[`str`, `list`[`int`]] or `None`). + + These are the same values as returned from `gc.get_stats` but rearranged + to be indexed by string key first and generation second. + """ + + end_stats: dict[str, list[int]] | None = None + """GC stats on exiting context, same format as `start_stats` + (`dict`[`str`, `list`[`int`]] or `None`). 
+ """ + + def __enter__(self) -> Self: + self.start_isenabled = gc.isenabled() + self.start_threshold = list(gc.get_threshold()) + self.start_count = list(gc.get_count()) + self.start_stats = _gc_stats() + return self + + def __exit__( + self, + exc_type: type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: TracebackType | None, + ) -> None: + self.end_isenabled = gc.isenabled() + self.end_threshold = list(gc.get_threshold()) + self.end_count = list(gc.get_count()) + self.end_stats = _gc_stats() + + @classmethod + def from_task_metadata(cls, metadata: TaskMetadata) -> GcMetrics | None: + """Extract GC metrics from task metadata. + + Parameters + ---------- + metadata : `TaskMetadata` + Metadata written by + `.single_quantum_executor.SingleQuantumExecutor`. + + Returns + ------- + gc_metrics : `GcMetrics` or `None` + GC metrics for this quantum, or `None` if the expected fields were + not found. + """ + try: + quantum_metadata = metadata["quantum"] + except KeyError: + return None + try: + gc_metadata = quantum_metadata["gc_metrics"] + except KeyError: + return None + + return GcMetrics(**gc_metadata.to_dict()) diff --git a/python/lsst/pipe/base/quantum_graph/aggregator/_writer.py b/python/lsst/pipe/base/quantum_graph/aggregator/_writer.py index bb7e08a52..5db6d91ff 100644 --- a/python/lsst/pipe/base/quantum_graph/aggregator/_writer.py +++ b/python/lsst/pipe/base/quantum_graph/aggregator/_writer.py @@ -30,6 +30,7 @@ __all__ = ("Writer",) import dataclasses +from collections.abc import ByteString import zstandard @@ -163,7 +164,7 @@ def make_compression_dictionary(self) -> zstandard.ZstdCompressionDict: self.comms.log.info("Making compressor with no dictionary.") return zstandard.ZstdCompressionDict(b"") self.comms.log.info("Training compression dictionary.") - training_inputs: list[bytes] = [] + training_inputs: list[ByteString] = [] # We start the dictionary training with *predicted* quantum dataset # models, since those have almost all of the 
same attributes as the # provenance quantum and dataset models, and we can get a nice random diff --git a/python/lsst/pipe/base/single_quantum_executor.py b/python/lsst/pipe/base/single_quantum_executor.py index 97c2ad5bd..cdec3e98f 100644 --- a/python/lsst/pipe/base/single_quantum_executor.py +++ b/python/lsst/pipe/base/single_quantum_executor.py @@ -56,6 +56,7 @@ QuantumSuccessCaveats, ) from .connections import AdjustQuantumHelper +from .gc_metrics import GcMetrics from .log_capture import LogCapture, _ExecutionLogRecordsExtra from .pipeline_graph import TaskNode from .pipelineTask import PipelineTask @@ -307,7 +308,7 @@ def _execute_with_limited_butler( task = self._task_factory.makeTask(task_node, limited_butler, init_input_refs) logInfo(None, "start", metadata=quantumMetadata) # type: ignore[arg-type] outputs_put: list[uuid.UUID] = [] - with limited_butler.record_metrics() as butler_metrics: + with limited_butler.record_metrics() as butler_metrics, GcMetrics() as gc_metrics: caveats = self._run_quantum( task, quantum, task_node, limited_butler, quantum_id=quantum_id, ids_put=outputs_put ) @@ -327,6 +328,7 @@ def _execute_with_limited_butler( raise else: quantumMetadata["butler_metrics"] = butler_metrics.model_dump() + quantumMetadata["gc_metrics"] = gc_metrics.model_dump() quantumMetadata["caveats"] = caveats.value # Stringify the UUID for easier compatibility with # PropertyList. 
diff --git a/tests/test_single_quantum_executor.py b/tests/test_single_quantum_executor.py index e640670c5..82410541e 100644 --- a/tests/test_single_quantum_executor.py +++ b/tests/test_single_quantum_executor.py @@ -32,6 +32,7 @@ import unittest import lsst.pipe.base.automatic_connection_constants as acc +from lsst.pipe.base.gc_metrics import GcMetrics from lsst.pipe.base.resource_usage import QuantumResourceUsage from lsst.pipe.base.single_quantum_executor import SingleQuantumExecutor from lsst.pipe.base.tests.mocks import InMemoryRepo @@ -71,6 +72,18 @@ def test_simple_execute(self) -> None: self.assertGreater(ru.total_time, 0) self.assertLess(ru.total_time, t2 - t1) + # Check that GC metrics are filled. + gc_metrics = GcMetrics.from_task_metadata(md) + self.assertIsNotNone(gc_metrics) + self.assertTrue(gc_metrics.start_isenabled) + self.assertTrue(gc_metrics.end_isenabled) + self.assertEqual(len(gc_metrics.start_threshold), 3) + self.assertEqual(len(gc_metrics.end_threshold), 3) + self.assertEqual(len(gc_metrics.start_count), 3) + self.assertEqual(len(gc_metrics.end_count), 3) + self.assertEqual(set(gc_metrics.start_stats), {"collections", "collected", "uncollectable"}) + self.assertEqual(set(gc_metrics.end_stats), {"collections", "collected", "uncollectable"}) + def test_skip_existing_execute(self) -> None: """Run execute() method twice, with skip_existing_in.""" helper = InMemoryRepo("base.yaml")