Skip to content

DM-54879: Add --retry-missing-outputs-for for when upstream outputs are selectively retained#363

Open
hsinfang wants to merge 2 commits into
mainfrom
tickets/DM-54879
Open

DM-54879: Add --retry-missing-outputs-for for when upstream outputs are selectively retained#363
hsinfang wants to merge 2 commits into
mainfrom
tickets/DM-54879

Conversation

@hsinfang
Copy link
Copy Markdown
Contributor

@hsinfang hsinfang commented May 12, 2026

Checklist

  • ran Jenkins
  • added a release note for user-visible changes to doc/changes

@codecov
Copy link
Copy Markdown

codecov Bot commented May 12, 2026

❌ 12 Tests Failed:

Tests completed Failed Passed Skipped
49 12 37 0
View the top 3 failed test(s) by shortest run time
tests/test_run.py::RunTestCase::test_simple_qg_retry_missing_outputs_for_skipped
Stack Traces | 0.546s run time
self = <test_run.RunTestCase testMethod=test_simple_qg_retry_missing_outputs_for_skipped>

    def test_simple_qg_retry_missing_outputs_for_skipped(self):
        """With --retry-missing-outputs-for, a task whose outputs are all
        present in skip_existing_in is still skipped, even if metadata is
        absent.
    
        Verifies that the option does not re-run when outputs are complete.
        """
        with DirectButlerRepo.make_temporary() as (helper, root):
            helper.add_task()
            helper.insert_datasets("dataset_auto0")
            kwargs = self._make_run_args(
                "-b",
                root,
                "-i",
                helper.input_chain,
                "-o",
                "output",
                "--register-dataset-types",
                pipeline_graph_factory=PipelineGraphFactory(pipeline_graph=helper.pipeline_graph),
            )
>           qg1 = script.qgraph(**kwargs)
                  ^^^^^^^^^^^^^^^^^^^^^^^

tests/test_run.py:743: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../cli/script/qgraph.py:309: in qgraph
    graph_builder = AllDimensionsQuantumGraphBuilder(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <lsst.pipe.base.all_dimensions_quantum_graph_builder.AllDimensionsQuantumGraphBuilder object at 0x7ff5652aea80>
pipeline_graph = PipelineGraph('', tasks={task_auto1})
butler = <lsst.daf.butler.direct_butler._direct_butler.DirectButler object at 0x7ff565800150>
where = ''
dataset_query_constraint = <class 'lsst.pipe.base._datasetQueryConstraints._ALL'>
bind = None, data_id_tables = []
kwargs = {'clobber': False, 'input_collections': ('input_run',), 'output_run': 'output/20260528T234307Z', 'retry_missing_outputs_for': (), ...}

    def __init__(
        self,
        pipeline_graph: PipelineGraph,
        butler: Butler,
        *,
        where: str = "",
        dataset_query_constraint: DatasetQueryConstraintVariant = DatasetQueryConstraintVariant.ALL,
        bind: Mapping[str, Any] | None = None,
        data_id_tables: Iterable[astropy.table.Table] = (),
        **kwargs: Any,
    ):
>       super().__init__(pipeline_graph, butler, **kwargs)
E       TypeError: QuantumGraphBuilder.__init__() got an unexpected keyword argument 'retry_missing_outputs_for'

.../test/lib/python3.13.../pipe/base/all_dimensions_quantum_graph_builder.py:126: TypeError
tests/test_run.py::RunTestCase::test_empty_qg
Stack Traces | 0.645s run time
self = <test_run.RunTestCase testMethod=test_empty_qg>

    def test_empty_qg(self):
        """Test that making an empty QG produces the right error messages."""
        with DirectButlerRepo.make_temporary("base.yaml") as (helper, root):
            helper.add_task(dimensions=["instrument"])
            helper.add_task(dimensions=["instrument"])
            helper.pipeline_graph.resolve(registry=helper.butler.registry)
            helper.butler.registry.registerDatasetType(
                helper.pipeline_graph.dataset_types["dataset_auto0"].dataset_type
            )
            kwargs = self._make_run_args(
                "-b",
                root,
                "-i",
                helper.input_chain,
                "-o",
                "output",
                pipeline_graph_factory=PipelineGraphFactory(pipeline_graph=helper.pipeline_graph),
            )
            # Note that we haven't inserted any datasets into this repo; that's
            # how we'll force an empty graph.
            with self.assertLogs(level=logging.ERROR) as cm:
>               qg = script.qgraph(**kwargs)
                     ^^^^^^^^^^^^^^^^^^^^^^^

tests/test_run.py:275: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../cli/script/qgraph.py:309: in qgraph
    graph_builder = AllDimensionsQuantumGraphBuilder(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <lsst.pipe.base.all_dimensions_quantum_graph_builder.AllDimensionsQuantumGraphBuilder object at 0x7f7915516060>
pipeline_graph = PipelineGraph('', tasks={task_auto1, task_auto2})
butler = <lsst.daf.butler.direct_butler._direct_butler.DirectButler object at 0x7f7915733ed0>
where = ''
dataset_query_constraint = <class 'lsst.pipe.base._datasetQueryConstraints._ALL'>
bind = None, data_id_tables = []
kwargs = {'clobber': False, 'input_collections': ('input_run',), 'output_run': 'output/20260528T234302Z', 'retry_missing_outputs_for': (), ...}

    def __init__(
        self,
        pipeline_graph: PipelineGraph,
        butler: Butler,
        *,
        where: str = "",
        dataset_query_constraint: DatasetQueryConstraintVariant = DatasetQueryConstraintVariant.ALL,
        bind: Mapping[str, Any] | None = None,
        data_id_tables: Iterable[astropy.table.Table] = (),
        **kwargs: Any,
    ):
>       super().__init__(pipeline_graph, butler, **kwargs)
E       TypeError: QuantumGraphBuilder.__init__() got an unexpected keyword argument 'retry_missing_outputs_for'

.../test/lib/python3.13.../pipe/base/all_dimensions_quantum_graph_builder.py:126: TypeError
tests/test_run.py::RunTestCase::test_simple_qgraph_qbb
Stack Traces | 0.747s run time
self = <test_run.RunTestCase testMethod=test_simple_qgraph_qbb>

    def test_simple_qgraph_qbb(self):
        """Test execution of a trivial quantum graph in QBB mode."""
        with DirectButlerRepo.make_temporary() as (helper, root):
            helper.add_task()
            helper.add_task()
            helper.insert_datasets("dataset_auto0")
            # It's unusual to put a QG in a butler root, but since we've
            # already got a temp dir, we might as well use it.
            qg_file_1 = os.path.join(root, "test1.qg")
            kwargs = self._make_run_args(
                "-b",
                root,
                "-i",
                helper.input_chain,
                "-o",
                "output",
                "--register-dataset-types",
                "--qgraph-datastore-records",
                "--save-qgraph",
                qg_file_1,
                pipeline_graph_factory=PipelineGraphFactory(pipeline_graph=helper.pipeline_graph),
            )
>           qg = script.qgraph(**kwargs)
                 ^^^^^^^^^^^^^^^^^^^^^^^

tests/test_run.py:195: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../cli/script/qgraph.py:309: in qgraph
    graph_builder = AllDimensionsQuantumGraphBuilder(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <lsst.pipe.base.all_dimensions_quantum_graph_builder.AllDimensionsQuantumGraphBuilder object at 0x7f79150b7550>
pipeline_graph = PipelineGraph('', tasks={task_auto1, task_auto2})
butler = <lsst.daf.butler.direct_butler._direct_butler.DirectButler object at 0x7f7915e08dd0>
where = ''
dataset_query_constraint = <class 'lsst.pipe.base._datasetQueryConstraints._ALL'>
bind = None, data_id_tables = []
kwargs = {'clobber': False, 'input_collections': ('input_run',), 'output_run': 'output/20260528T234308Z', 'retry_missing_outputs_for': (), ...}

    def __init__(
        self,
        pipeline_graph: PipelineGraph,
        butler: Butler,
        *,
        where: str = "",
        dataset_query_constraint: DatasetQueryConstraintVariant = DatasetQueryConstraintVariant.ALL,
        bind: Mapping[str, Any] | None = None,
        data_id_tables: Iterable[astropy.table.Table] = (),
        **kwargs: Any,
    ):
>       super().__init__(pipeline_graph, butler, **kwargs)
E       TypeError: QuantumGraphBuilder.__init__() got an unexpected keyword argument 'retry_missing_outputs_for'

.../test/lib/python3.13.../pipe/base/all_dimensions_quantum_graph_builder.py:126: TypeError
tests/test_run.py::RunTestCase::test_simple_qg_skip_existing_inputs
Stack Traces | 0.751s run time
self = <test_run.RunTestCase testMethod=test_simple_qg_skip_existing_inputs>

    def test_simple_qg_skip_existing_inputs(self):
        """Test for case when output data for one task already appears in
        the *input* collection, but no ``--extend-run`` or ``-skip-existing``
        option is present.
        """
        with DirectButlerRepo.make_temporary() as (helper, root):
            helper.add_task()
            helper.insert_datasets("dataset_auto0")
            kwargs = self._make_run_args(
                "-b",
                root,
                "-i",
                helper.input_chain,
                "-o",
                "output",
                "--register-dataset-types",
                pipeline_graph_factory=PipelineGraphFactory(pipeline_graph=helper.pipeline_graph),
            )
>           qg1 = script.qgraph(**kwargs)
                  ^^^^^^^^^^^^^^^^^^^^^^^

tests/test_run.py:349: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../cli/script/qgraph.py:309: in qgraph
    graph_builder = AllDimensionsQuantumGraphBuilder(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <lsst.pipe.base.all_dimensions_quantum_graph_builder.AllDimensionsQuantumGraphBuilder object at 0x7f7ba90de250>
pipeline_graph = PipelineGraph('', tasks={task_auto1})
butler = <lsst.daf.butler.direct_butler._direct_butler.DirectButler object at 0x7f7ba85defd0>
where = ''
dataset_query_constraint = <class 'lsst.pipe.base._datasetQueryConstraints._ALL'>
bind = None, data_id_tables = []
kwargs = {'clobber': False, 'input_collections': ('input_run',), 'output_run': 'output/20260528T234308Z', 'retry_missing_outputs_for': (), ...}

    def __init__(
        self,
        pipeline_graph: PipelineGraph,
        butler: Butler,
        *,
        where: str = "",
        dataset_query_constraint: DatasetQueryConstraintVariant = DatasetQueryConstraintVariant.ALL,
        bind: Mapping[str, Any] | None = None,
        data_id_tables: Iterable[astropy.table.Table] = (),
        **kwargs: Any,
    ):
>       super().__init__(pipeline_graph, butler, **kwargs)
E       TypeError: QuantumGraphBuilder.__init__() got an unexpected keyword argument 'retry_missing_outputs_for'

.../test/lib/python3.13.../pipe/base/all_dimensions_quantum_graph_builder.py:126: TypeError
tests/test_run.py::RunTestCase::test_simple_qg_rebase
Stack Traces | 0.758s run time
self = <test_run.RunTestCase testMethod=test_simple_qg_rebase>

    def test_simple_qg_rebase(self):
        """Test execution of a trivial quantum graph, with --rebase used to
        force redefinition of the output collection.
        """
        with DirectButlerRepo.make_temporary(input_chain="test1") as (helper, root):
            helper.add_task()
            helper.add_task()
            helper.insert_datasets("dataset_auto0")
            # Pass one input collection here for the usual test setup; we'll
            # override it later.
            kwargs = self._make_run_args(
                "-b",
                root,
                "-i",
                helper.input_chain,
                "-o",
                "output",
                "--register-dataset-types",
                pipeline_graph_factory=PipelineGraphFactory(pipeline_graph=helper.pipeline_graph),
            )
            # We'll actually pass two input collections in.  One is empty, but
            # the stuff we're testing here doesn't care.
            kwargs["input"] = ["test2", "test1"]
            helper.butler.collections.register("test2", CollectionType.RUN)
            # Set up the output collection with a sequence that doesn't end the
            # same way as the input collection.  This is normally an error.
            helper.butler.collections.register("output", CollectionType.CHAINED)
            helper.butler.collections.register("unexpected_input", CollectionType.RUN)
            helper.butler.collections.register("output/run0", CollectionType.RUN)
            helper.butler.collections.redefine_chain(
                "output", ["test2", "unexpected_input", "test1", "output/run0"]
            )
            # Without --rebase, the inconsistent input and output collections
            # are an error.
            with self.assertRaises(ValueError):
                script.qgraph(**kwargs)
            # With --rebase, the output collection gets redefined.
            kwargs["rebase"] = True
>           qg = script.qgraph(**kwargs)
                 ^^^^^^^^^^^^^^^^^^^^^^^

tests/test_run.py:161: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../cli/script/qgraph.py:309: in qgraph
    graph_builder = AllDimensionsQuantumGraphBuilder(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <lsst.pipe.base.all_dimensions_quantum_graph_builder.AllDimensionsQuantumGraphBuilder object at 0x7f791562f350>
pipeline_graph = PipelineGraph('', tasks={task_auto1, task_auto2})
butler = <lsst.daf.butler.direct_butler._direct_butler.DirectButler object at 0x7f7915788350>
where = ''
dataset_query_constraint = <class 'lsst.pipe.base._datasetQueryConstraints._ALL'>
bind = None, data_id_tables = []
kwargs = {'clobber': False, 'input_collections': ('output',), 'output_run': 'output/20260528T234306Z', 'retry_missing_outputs_for': (), ...}

    def __init__(
        self,
        pipeline_graph: PipelineGraph,
        butler: Butler,
        *,
        where: str = "",
        dataset_query_constraint: DatasetQueryConstraintVariant = DatasetQueryConstraintVariant.ALL,
        bind: Mapping[str, Any] | None = None,
        data_id_tables: Iterable[astropy.table.Table] = (),
        **kwargs: Any,
    ):
>       super().__init__(pipeline_graph, butler, **kwargs)
E       TypeError: QuantumGraphBuilder.__init__() got an unexpected keyword argument 'retry_missing_outputs_for'

.../test/lib/python3.13.../pipe/base/all_dimensions_quantum_graph_builder.py:126: TypeError
tests/test_run.py::RunTestCase::test_simple_qg_no_skip_existing_inputs
Stack Traces | 0.787s run time
self = <test_run.RunTestCase testMethod=test_simple_qg_no_skip_existing_inputs>

    def test_simple_qg_no_skip_existing_inputs(self):
        """Test for case when output data for one task already appears in
        the *input* collection, but no ``--extend-run`` or ``-skip-existing``
        option is present.
        """
        with DirectButlerRepo.make_temporary() as (helper, root):
            helper.add_task()
            helper.add_task()
            helper.insert_datasets("dataset_auto0")
            kwargs = self._make_run_args(
                "-b",
                root,
                "-i",
                helper.input_chain,
                "-o",
                "output",
                "--register-dataset-types",
                pipeline_graph_factory=PipelineGraphFactory(pipeline_graph=helper.pipeline_graph),
            )
>           qg1 = script.qgraph(**kwargs)
                  ^^^^^^^^^^^^^^^^^^^^^^^

tests/test_run.py:301: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../cli/script/qgraph.py:309: in qgraph
    graph_builder = AllDimensionsQuantumGraphBuilder(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <lsst.pipe.base.all_dimensions_quantum_graph_builder.AllDimensionsQuantumGraphBuilder object at 0x7f791562f790>
pipeline_graph = PipelineGraph('', tasks={task_auto1, task_auto2})
butler = <lsst.daf.butler.direct_butler._direct_butler.DirectButler object at 0x7f7915df2850>
where = ''
dataset_query_constraint = <class 'lsst.pipe.base._datasetQueryConstraints._ALL'>
bind = None, data_id_tables = []
kwargs = {'clobber': False, 'input_collections': ('input_run',), 'output_run': 'output/20260528T234305Z', 'retry_missing_outputs_for': (), ...}

    def __init__(
        self,
        pipeline_graph: PipelineGraph,
        butler: Butler,
        *,
        where: str = "",
        dataset_query_constraint: DatasetQueryConstraintVariant = DatasetQueryConstraintVariant.ALL,
        bind: Mapping[str, Any] | None = None,
        data_id_tables: Iterable[astropy.table.Table] = (),
        **kwargs: Any,
    ):
>       super().__init__(pipeline_graph, butler, **kwargs)
E       TypeError: QuantumGraphBuilder.__init__() got an unexpected keyword argument 'retry_missing_outputs_for'

.../test/lib/python3.13.../pipe/base/all_dimensions_quantum_graph_builder.py:126: TypeError
tests/test_run.py::RunTestCase::test_simple_qg_replace_run
Stack Traces | 0.787s run time
self = <test_run.RunTestCase testMethod=test_simple_qg_replace_run>

    def test_simple_qg_replace_run(self):
        """Test repeated execution of a trivial quantum graph with
        --replace-run.
        """
        with DirectButlerRepo.make_temporary() as (helper, root):
            helper.add_task()
            helper.insert_datasets("dataset_auto0")
            kwargs = self._make_run_args(
                "-b",
                root,
                "-i",
                helper.input_chain,
                "-o",
                "output",
                "--register-dataset-types",
                pipeline_graph_factory=PipelineGraphFactory(pipeline_graph=helper.pipeline_graph),
            )
>           qg1 = script.qgraph(**kwargs)
                  ^^^^^^^^^^^^^^^^^^^^^^^

tests/test_run.py:534: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../cli/script/qgraph.py:309: in qgraph
    graph_builder = AllDimensionsQuantumGraphBuilder(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <lsst.pipe.base.all_dimensions_quantum_graph_builder.AllDimensionsQuantumGraphBuilder object at 0x7ff5651626c0>
pipeline_graph = PipelineGraph('', tasks={task_auto1})
butler = <lsst.daf.butler.direct_butler._direct_butler.DirectButler object at 0x7ff5651597d0>
where = ''
dataset_query_constraint = <class 'lsst.pipe.base._datasetQueryConstraints._ALL'>
bind = None, data_id_tables = []
kwargs = {'clobber': False, 'input_collections': ('input_run',), 'output_run': 'output/20260528T234306Z', 'retry_missing_outputs_for': (), ...}

    def __init__(
        self,
        pipeline_graph: PipelineGraph,
        butler: Butler,
        *,
        where: str = "",
        dataset_query_constraint: DatasetQueryConstraintVariant = DatasetQueryConstraintVariant.ALL,
        bind: Mapping[str, Any] | None = None,
        data_id_tables: Iterable[astropy.table.Table] = (),
        **kwargs: Any,
    ):
>       super().__init__(pipeline_graph, butler, **kwargs)
E       TypeError: QuantumGraphBuilder.__init__() got an unexpected keyword argument 'retry_missing_outputs_for'

.../test/lib/python3.13.../pipe/base/all_dimensions_quantum_graph_builder.py:126: TypeError
tests/test_run.py::RunTestCase::test_simple_qg
Stack Traces | 0.801s run time
self = <test_run.RunTestCase testMethod=test_simple_qg>

    def test_simple_qg(self):
        """Test execution of a trivial quantum graph."""
        with DirectButlerRepo.make_temporary() as (helper, root):
            helper.add_task()
            helper.add_task()
            helper.insert_datasets("dataset_auto0")
            kwargs = self._make_run_args(
                "-b",
                root,
                "-i",
                helper.input_chain,
                "-o",
                "output",
                "--register-dataset-types",
                pipeline_graph_factory=PipelineGraphFactory(pipeline_graph=helper.pipeline_graph),
            )
>           qg = script.qgraph(**kwargs)
                 ^^^^^^^^^^^^^^^^^^^^^^^

tests/test_run.py:108: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../cli/script/qgraph.py:309: in qgraph
    graph_builder = AllDimensionsQuantumGraphBuilder(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <lsst.pipe.base.all_dimensions_quantum_graph_builder.AllDimensionsQuantumGraphBuilder object at 0x7ff564f04550>
pipeline_graph = PipelineGraph('', tasks={task_auto1, task_auto2})
butler = <lsst.daf.butler.direct_butler._direct_butler.DirectButler object at 0x7ff588b6f050>
where = ''
dataset_query_constraint = <class 'lsst.pipe.base._datasetQueryConstraints._ALL'>
bind = None, data_id_tables = []
kwargs = {'clobber': False, 'input_collections': ('input_run',), 'output_run': 'output/20260528T234305Z', 'retry_missing_outputs_for': (), ...}

    def __init__(
        self,
        pipeline_graph: PipelineGraph,
        butler: Butler,
        *,
        where: str = "",
        dataset_query_constraint: DatasetQueryConstraintVariant = DatasetQueryConstraintVariant.ALL,
        bind: Mapping[str, Any] | None = None,
        data_id_tables: Iterable[astropy.table.Table] = (),
        **kwargs: Any,
    ):
>       super().__init__(pipeline_graph, butler, **kwargs)
E       TypeError: QuantumGraphBuilder.__init__() got an unexpected keyword argument 'retry_missing_outputs_for'

.../test/lib/python3.13.../pipe/base/all_dimensions_quantum_graph_builder.py:126: TypeError
tests/test_run.py::RunTestCase::test_simple_qg_clobber
Stack Traces | 0.807s run time
self = <test_run.RunTestCase testMethod=test_simple_qg_clobber>

    def test_simple_qg_clobber(self):
        """Test for case when output data for one task already appears in
        the output RUN collection, and `--extend-run --clobber-outputs` is used
        to skip it.
        """
        with DirectButlerRepo.make_temporary() as (helper, root):
            helper.add_task()
            helper.insert_datasets("dataset_auto0")
            kwargs = self._make_run_args(
                "-b",
                root,
                "-i",
                helper.input_chain,
                "-o",
                "output",
                "--register-dataset-types",
                pipeline_graph_factory=PipelineGraphFactory(pipeline_graph=helper.pipeline_graph),
            )
>           qg1 = script.qgraph(**kwargs)
                  ^^^^^^^^^^^^^^^^^^^^^^^

tests/test_run.py:468: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../cli/script/qgraph.py:309: in qgraph
    graph_builder = AllDimensionsQuantumGraphBuilder(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <lsst.pipe.base.all_dimensions_quantum_graph_builder.AllDimensionsQuantumGraphBuilder object at 0x7f7ba8a98d10>
pipeline_graph = PipelineGraph('', tasks={task_auto1})
butler = <lsst.daf.butler.direct_butler._direct_butler.DirectButler object at 0x7f7bc7f82650>
where = ''
dataset_query_constraint = <class 'lsst.pipe.base._datasetQueryConstraints._ALL'>
bind = None, data_id_tables = []
kwargs = {'clobber': False, 'input_collections': ('input_run',), 'output_run': 'output/20260528T234307Z', 'retry_missing_outputs_for': (), ...}

    def __init__(
        self,
        pipeline_graph: PipelineGraph,
        butler: Butler,
        *,
        where: str = "",
        dataset_query_constraint: DatasetQueryConstraintVariant = DatasetQueryConstraintVariant.ALL,
        bind: Mapping[str, Any] | None = None,
        data_id_tables: Iterable[astropy.table.Table] = (),
        **kwargs: Any,
    ):
>       super().__init__(pipeline_graph, butler, **kwargs)
E       TypeError: QuantumGraphBuilder.__init__() got an unexpected keyword argument 'retry_missing_outputs_for'

.../test/lib/python3.13.../pipe/base/all_dimensions_quantum_graph_builder.py:126: TypeError
tests/test_run.py::RunTestCase::test_qg_partial_failure
Stack Traces | 0.879s run time
self = <test_run.RunTestCase testMethod=test_qg_partial_failure>

    def test_qg_partial_failure(self):
        """Test execution of a quantum graph where one quantum fails but others
        should continue.
        """
        with DirectButlerRepo.make_temporary("base.yaml") as (helper, root):
            helper.add_task(
                dimensions=["detector"], config=DynamicTestPipelineTaskConfig(fail_condition="detector=3")
            )
            helper.insert_datasets("dataset_auto0")
            kwargs = self._make_run_args(
                "-b",
                root,
                "-i",
                helper.input_chain,
                "-o",
                "output",
                "--register-dataset-types",
                pipeline_graph_factory=PipelineGraphFactory(pipeline_graph=helper.pipeline_graph),
            )
>           qg = script.qgraph(**kwargs)
                 ^^^^^^^^^^^^^^^^^^^^^^^

tests/test_run.py:645: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../cli/script/qgraph.py:309: in qgraph
    graph_builder = AllDimensionsQuantumGraphBuilder(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <lsst.pipe.base.all_dimensions_quantum_graph_builder.AllDimensionsQuantumGraphBuilder object at 0x7f7914cc4770>
pipeline_graph = PipelineGraph('', tasks={task_auto1})
butler = <lsst.daf.butler.direct_butler._direct_butler.DirectButler object at 0x7f7914d8ded0>
where = ''
dataset_query_constraint = <class 'lsst.pipe.base._datasetQueryConstraints._ALL'>
bind = None, data_id_tables = []
kwargs = {'clobber': False, 'input_collections': ('input_run',), 'output_run': 'output/20260528T234303Z', 'retry_missing_outputs_for': (), ...}

    def __init__(
        self,
        pipeline_graph: PipelineGraph,
        butler: Butler,
        *,
        where: str = "",
        dataset_query_constraint: DatasetQueryConstraintVariant = DatasetQueryConstraintVariant.ALL,
        bind: Mapping[str, Any] | None = None,
        data_id_tables: Iterable[astropy.table.Table] = (),
        **kwargs: Any,
    ):
>       super().__init__(pipeline_graph, butler, **kwargs)
E       TypeError: QuantumGraphBuilder.__init__() got an unexpected keyword argument 'retry_missing_outputs_for'

.../test/lib/python3.13.../pipe/base/all_dimensions_quantum_graph_builder.py:126: TypeError
tests/test_run.py::RunTestCase::test_simple_qg_retry_missing_outputs_for_not_skipped
Stack Traces | 0.885s run time
self = <test_run.RunTestCase testMethod=test_simple_qg_retry_missing_outputs_for_not_skipped>

    def test_simple_qg_retry_missing_outputs_for_not_skipped(self):
        """With --retry-missing-outputs-for, a task whose metadata exists
        in skip_existing_in but whose outputs do not is not skipped.
        """
        with DirectButlerRepo.make_temporary() as (helper, root):
            helper.add_task()
            helper.insert_datasets("dataset_auto0")
            kwargs = self._make_run_args(
                "-b",
                root,
                "-i",
                helper.input_chain,
                "-o",
                "output",
                "--register-dataset-types",
                pipeline_graph_factory=PipelineGraphFactory(pipeline_graph=helper.pipeline_graph),
            )
>           qg1 = script.qgraph(**kwargs)
                  ^^^^^^^^^^^^^^^^^^^^^^^

tests/test_run.py:692: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../cli/script/qgraph.py:309: in qgraph
    graph_builder = AllDimensionsQuantumGraphBuilder(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <lsst.pipe.base.all_dimensions_quantum_graph_builder.AllDimensionsQuantumGraphBuilder object at 0x7f7915355650>
pipeline_graph = PipelineGraph('', tasks={task_auto1})
butler = <lsst.daf.butler.direct_butler._direct_butler.DirectButler object at 0x7f7915d552d0>
where = ''
dataset_query_constraint = <class 'lsst.pipe.base._datasetQueryConstraints._ALL'>
bind = None, data_id_tables = []
kwargs = {'clobber': False, 'input_collections': ('input_run',), 'output_run': 'output/20260528T234307Z', 'retry_missing_outputs_for': (), ...}

    def __init__(
        self,
        pipeline_graph: PipelineGraph,
        butler: Butler,
        *,
        where: str = "",
        dataset_query_constraint: DatasetQueryConstraintVariant = DatasetQueryConstraintVariant.ALL,
        bind: Mapping[str, Any] | None = None,
        data_id_tables: Iterable[astropy.table.Table] = (),
        **kwargs: Any,
    ):
>       super().__init__(pipeline_graph, butler, **kwargs)
E       TypeError: QuantumGraphBuilder.__init__() got an unexpected keyword argument 'retry_missing_outputs_for'

.../test/lib/python3.13.../pipe/base/all_dimensions_quantum_graph_builder.py:126: TypeError
tests/test_run.py::RunTestCase::test_simple_qg_extend_run
Stack Traces | 0.952s run time
self = <test_run.RunTestCase testMethod=test_simple_qg_extend_run>

    def test_simple_qg_extend_run(self):
        """Test for case when output data for one task already appears in
        the output RUN collection, and `--extend-run` is used to skip it.
        """
        with DirectButlerRepo.make_temporary() as (helper, root):
            helper.add_task()
            helper.insert_datasets("dataset_auto0")
            kwargs = self._make_run_args(
                "-b",
                root,
                "-i",
                helper.input_chain,
                "-o",
                "output",
                "--register-dataset-types",
                pipeline_graph_factory=PipelineGraphFactory(pipeline_graph=helper.pipeline_graph),
            )
>           qg1 = script.qgraph(**kwargs)
                  ^^^^^^^^^^^^^^^^^^^^^^^

tests/test_run.py:409: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../cli/script/qgraph.py:309: in qgraph
    graph_builder = AllDimensionsQuantumGraphBuilder(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <lsst.pipe.base.all_dimensions_quantum_graph_builder.AllDimensionsQuantumGraphBuilder object at 0x7f7914587d10>
pipeline_graph = PipelineGraph('', tasks={task_auto1})
butler = <lsst.daf.butler.direct_butler._direct_butler.DirectButler object at 0x7f7914779c50>
where = ''
dataset_query_constraint = <class 'lsst.pipe.base._datasetQueryConstraints._ALL'>
bind = None, data_id_tables = []
kwargs = {'clobber': False, 'input_collections': ('input_run',), 'output_run': 'output/20260528T234304Z', 'retry_missing_outputs_for': (), ...}

    def __init__(
        self,
        pipeline_graph: PipelineGraph,
        butler: Butler,
        *,
        where: str = "",
        dataset_query_constraint: DatasetQueryConstraintVariant = DatasetQueryConstraintVariant.ALL,
        bind: Mapping[str, Any] | None = None,
        data_id_tables: Iterable[astropy.table.Table] = (),
        **kwargs: Any,
    ):
>       super().__init__(pipeline_graph, butler, **kwargs)
E       TypeError: QuantumGraphBuilder.__init__() got an unexpected keyword argument 'retry_missing_outputs_for'

.../test/lib/python3.13.../pipe/base/all_dimensions_quantum_graph_builder.py:126: TypeError

To view more test analytics, go to the Test Analytics Dashboard
📋 Got 3 mins? Take this short survey to help us improve Test Analytics.

@hsinfang hsinfang force-pushed the tickets/DM-54879 branch 2 times, most recently from 5f0c018 to 4a242aa Compare May 13, 2026 17:16
hsinfang added 2 commits May 28, 2026 16:39
Expose the new `retry_missing_outputs_for` parameter of `QuantumGraphBuilder`
as a CLI option. When specified alongside `--skip-existing-in`, the named
task labels use their science outputs as the completion signal instead
of task metadata.

script/run.py is not modified because commands.run passes all Click
options as **kwargs to both script.qgraph (which consumes the new
option at graph-build time) and script.run (which absorbs unrecognized
kwargs, like the existing dataset_query_constraint).
@hsinfang hsinfang force-pushed the tickets/DM-54879 branch from 4a242aa to 0b65260 Compare May 28, 2026 23:41
@hsinfang
Copy link
Copy Markdown
Contributor Author

ignore_metadata_for is renamed to retry_missing_outputs_for

@hsinfang hsinfang changed the title DM-54879: Add --ignore-existing-metadata-for for when upstream outputs are selectively retained DM-54879: Add --retry-missing-outputs-for for when upstream outputs are selectively retained May 28, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants