diff --git a/doc/designs/admm_user_api_automation_design.md b/doc/designs/admm_user_api_automation_design.md index 0ef9a7bbf..f17caf8e4 100644 --- a/doc/designs/admm_user_api_automation_design.md +++ b/doc/designs/admm_user_api_automation_design.md @@ -213,6 +213,24 @@ manual name formatting", not "skip the find_component step". `consensus_vars_creator` produces the same `varprob_dict` as the string-based form. +#### Findings during implementation + +- **Pyomo VarData weakref**: a `VarData` holds its parent block via + a weakref, so the wrapper's `.name` lookup only resolves to a real + name if the before-wrap scenario the Var was taken from is still + alive at wrapper-construction time. Callers that build a + throwaway before-wrap scenario inside a helper must keep it alive + across the wrapper call, or snapshot the names eagerly before + letting it go out of scope. Documented in the helper docstring; + test_admmWrapper.py's integration test has to hold a + `live_scenarios` list across the wrapper call. +- **Normalization location**: rather than pre-normalizing inside + `assign_variable_probs` (as originally sketched), implementation + normalizes once in `__init__` so `_consensus_vars_number_creator` + and the B.2 / B.3 plumbing all see canonical string identifiers. + Helper: `_admm_normalize_consensus_vars(consensus_vars, *, + tuple_form)` in `mpisppy/utils/admmWrapper.py`. + ### B.2 Auto-merge first-stage Vars into `consensus_vars` #### Today's contract @@ -260,6 +278,35 @@ already-merged list *and* has the hooks defined, to avoid a double-merge — easiest is to de-duplicate the merged list by Var identity. +#### Findings during implementation + +- **First-stage Vars are per-ADMM-subproblem, not global** + (significant divergence from the original "append to every + subproblem entry" wording above). In stoch_distr each region + owns its own factory production decisions, so `first_stage_varlist` + returns *different* Var names for different ADMM subproblems + even though they share Var-name conventions. The implemented + helper takes a per-subproblem dict, not a flat list: + `_merge_first_stage_into_consensus_vars(consensus_vars, + first_stage_var_names_per_sub, root_stage=1)`. De-dup is + string-name based (no Var-identity comparison needed once the + consensus_vars dict is normalized). +- **Per-rank gathering**: every rank must end up with the same + `consensus_vars` so `_consensus_vars_number_creator` and + `assign_variable_probs` stay consistent. `Stoch_AdmmWrapper` + pulls first-stage Var names from already-built local before-wrap + scenarios where possible, then builds one *probe* before-wrap + scenario per ADMM subproblem the local rank does not host. + `AdmmBundler` always builds one probe per ADMM subproblem in + `__init__` because it pre-builds no before-wrap scenarios there. +- **Vocabulary glossary established**: the prose for B.2's + docstrings forced us to settle on consistent terms across the + ADMM family — before-wrap scenario, wrapped scenario, wrap (verb, + narrow scope), ADMM subproblem, first-stage Vars vs. root-node + Vars. The full glossary now lives in the module docstring at the + top of `mpisppy/utils/admmWrapper.py` and is referenced by header + comments at the top of the three admm test files. + ### B.3 Optional surrogate / EF-supplemental hooks #### Today's contract @@ -306,6 +353,28 @@ message). - Test: a synthetic scenario with a surrogate Var; verify the attached node carries `surrogate_nonant_list` correctly. +#### Findings during implementation + +- **Bundle ROOT does not propagate constituent surrogates.** + `AdmmBundler` builds the bundle's own ROOT by flattening every + consensus Var into a fresh `attach_root_node(bundle, 0, + nonantlist)` call with no advanced kwargs. The surrogate / + EF-supplemental Vars instead live on the *per-constituent* + before-wrap scenario root nodes — the EF construction reads them + from there when assembling the bundle. The B.3 plumbing + therefore forwards the advanced lists only to the constituent + attach_root_node calls inside `_process_scenario`, not to the + bundle-level attach. This is correct (EF reads from + constituents) but worth flagging because a reader might expect + the bundle ROOT to carry the surrogate annotation directly. + Bundler test verifies forwarding via a spy on + `sputils.attach_root_node`. +- **Shared discovery helper**: discovery and validation moved into + `_discover_first_stage_hooks(module)` in `mpisppy/generic/admm.py` + so `setup_stoch_admm` and `setup_stoch_admm_with_bundles` no + longer duplicate the both-or-neither / advanced-needs-core + checks. + ### Open questions None significant. diff --git a/doc/src/generic_admm.rst b/doc/src/generic_admm.rst index b17784bde..f0b834020 100644 --- a/doc/src/generic_admm.rst +++ b/doc/src/generic_admm.rst @@ -426,13 +426,49 @@ inside ``scenario_creator`` so the hook can find it. ``RuntimeError`` at ``setup_stoch_admm`` time. Mixing the hooks with a manual ``attach_root_node`` call also raises. +Advanced first-stage hooks (optional) +""""""""""""""""""""""""""""""""""""""" + +``sputils.attach_root_node`` accepts two further optional parameters, +``surrogate_nonant_list`` and ``nonant_ef_suppl_list`` (see +:ref:`surrogate_nonant_list` and :ref:`ef_supplement_list` for what +each does), for problems that need to mark some first-stage Vars as +surrogates (EF skips their nonant equality) or as EF-supplemental +nonants (extra Vars carried through the EF construction). If your +problem needs either, define the corresponding optional module-level +hook: + +.. code-block:: python + + def first_stage_surrogate_nonant_list(scenario): + """Optional. Forwarded to attach_root_node's surrogate_nonant_list.""" + return scenario._surrogate_nonants # stashed in scenario_creator + + def first_stage_nonant_ef_suppl_list(scenario): + """Optional. Forwarded to attach_root_node's nonant_ef_suppl_list.""" + return scenario._ef_suppl_nonants + +Each advanced hook is independent of the other — defining either one +alone is fine — but both depend on the two core hooks +(``first_stage_cost`` and ``first_stage_varlist``) also being defined, +because there is nothing for the wrapper to attach the advanced lists +onto otherwise. Defining an advanced hook without the core hooks +raises ``RuntimeError`` at ``setup_stoch_admm`` time. + +On the legacy path (no core hooks), pass ``surrogate_nonant_list`` and +``nonant_ef_suppl_list`` directly to your own ``sputils.attach_root_node`` +call inside ``scenario_creator``; the wrapper inherits whatever you +attached. + First-stage attachment via manual ``attach_root_node`` (legacy) """"""""""""""""""""""""""""""""""""""""""""""""""""""""""""""" If you omit both hooks, ``scenario_creator`` must itself call ``sputils.attach_root_node`` with the original problem's first-stage -cost and varlist. Skipping the call (when no hooks are defined) -raises ``RuntimeError`` with a message pointing at both options. +cost and varlist (and ``surrogate_nonant_list`` / +``nonant_ef_suppl_list`` if you need them). Skipping the call (when +no hooks are defined) raises ``RuntimeError`` with a message pointing +at both options. This path is preserved for backward compatibility with model modules written before the hooks existed (and for direct uses of diff --git a/doc/src/scenario_creator.rst b/doc/src/scenario_creator.rst index 5e6b7efea..1048e04f8 100644 --- a/doc/src/scenario_creator.rst +++ b/doc/src/scenario_creator.rst @@ -63,6 +63,8 @@ equally likely, you can avoid a warning by assigning the string "uniform" to the ``_mpisppy_probability`` attribute of the scenario model. +.. _ef_supplement_list: + EF Supplement List ------------------ @@ -72,6 +74,8 @@ constraints when an EF is formed, either to solve the EF or when bundles are formed. For some problems, with the appropriate solver, adding redundant nonanticipativity constraints for auxiliary variables to the bundle/EF will result in a (much) smaller pre-solved model. +.. _surrogate_nonant_list: + Surrogate Nonant List --------------------- diff --git a/examples/stoch_distr/stoch_distr.py b/examples/stoch_distr/stoch_distr.py index 011aee494..ff2a5c481 100644 --- a/examples/stoch_distr/stoch_distr.py +++ b/examples/stoch_distr/stoch_distr.py @@ -268,19 +268,9 @@ def consensus_vars_creator(admm_subproblem_names, stoch_scenario_name, inter_reg if admm_subproblem_name not in consensus_vars: print(f"WARNING: {admm_subproblem_name} has no consensus_vars") consensus_vars[admm_subproblem_name] = list() - - # Now add the original problem's first-stage variables (factory - # production decisions) to each subproblem's consensus list. - # Read them directly from the first_stage_varlist hook — - # scenario_creator no longer attaches the root node, so - # model._mpisppy_node_list does not exist at this point. - # The original first stage is stage 1 in the augmented tree. - for admm_subproblem_name in admm_subproblem_names: - admm_stoch_subproblem_scenario_name = combining_names(admm_subproblem_name,stoch_scenario_name) - model = scenario_creator(admm_stoch_subproblem_scenario_name, inter_region_dict=inter_region_dict, cfg=cfg, data_params=data_params, all_nodes_dict=all_nodes_dict) - for var in first_stage_varlist(model): - if (var.name, 1) not in consensus_vars[admm_subproblem_name]: - consensus_vars[admm_subproblem_name].append((var.name, 1)) + # Each ADMM subproblem's first-stage Vars are added to its + # consensus list by the wrapper (Stoch_AdmmWrapper / AdmmBundler) + # at construction time, driven by the first_stage_varlist hook. return consensus_vars diff --git a/mpisppy/generic/admm.py b/mpisppy/generic/admm.py index 271abcaa3..55398fb45 100644 --- a/mpisppy/generic/admm.py +++ b/mpisppy/generic/admm.py @@ -146,6 +146,64 @@ def setup_admm(module, cfg, n_cylinders): all_scenario_names, None) +def _discover_first_stage_hooks(module): + """Discover the four optional first-stage hooks on a stoch-admm + model module and validate the both-or-neither contract. + + The two core hooks (first_stage_cost, first_stage_varlist) must + be defined together or both omitted; mixing them is an error. + + The two advanced hooks (first_stage_surrogate_nonant_list, + first_stage_nonant_ef_suppl_list) are each independent of the + other; defining either alone is fine, but only when the two + core hooks are also defined -- they forward to + sputils.attach_root_node's optional surrogate_nonant_list / + nonant_ef_suppl_list parameters and have nothing to attach to + otherwise. + + Returns: + dict: keyword arguments suitable for forwarding to + Stoch_AdmmWrapper / AdmmBundler. Hooks that the module did + not define are passed as None. + """ + first_stage_cost = getattr(module, "first_stage_cost", None) + first_stage_varlist = getattr(module, "first_stage_varlist", None) + if (first_stage_cost is None) != (first_stage_varlist is None): + present = "first_stage_cost" if first_stage_cost is not None else "first_stage_varlist" + missing = "first_stage_varlist" if first_stage_cost is not None else "first_stage_cost" + raise RuntimeError( + f"Module {module.__name__!r} defines {present} but not " + f"{missing}. These hooks must be defined together " + f"(or both omitted). See doc/src/generic_admm.rst." + ) + + first_stage_surrogate_nonant_list = getattr( + module, "first_stage_surrogate_nonant_list", None) + first_stage_nonant_ef_suppl_list = getattr( + module, "first_stage_nonant_ef_suppl_list", None) + advanced = { + "first_stage_surrogate_nonant_list": first_stage_surrogate_nonant_list, + "first_stage_nonant_ef_suppl_list": first_stage_nonant_ef_suppl_list, + } + present_advanced = [n for n, h in advanced.items() if h is not None] + if present_advanced and first_stage_cost is None: + raise RuntimeError( + f"Module {module.__name__!r} defines the advanced hook(s) " + f"{present_advanced} but not first_stage_cost / " + f"first_stage_varlist. Advanced hooks forward to " + f"sputils.attach_root_node's optional parameters and only " + f"make sense when the core hooks are also defined. See " + f"doc/src/generic_admm.rst." + ) + + return { + "first_stage_cost": first_stage_cost, + "first_stage_varlist": first_stage_varlist, + "first_stage_surrogate_nonant_list": first_stage_surrogate_nonant_list, + "first_stage_nonant_ef_suppl_list": first_stage_nonant_ef_suppl_list, + } + + def setup_stoch_admm(module, cfg, n_cylinders): """Create Stoch_AdmmWrapper for stochastic ADMM. @@ -165,19 +223,7 @@ def setup_stoch_admm(module, cfg, n_cylinders): consensus_vars = module.consensus_vars_creator( admm_subproblem_names, stoch_scenario_name, **scenario_creator_kwargs) - # Discover optional first-stage hooks on the module. Both must be - # defined together or both omitted; mixing produces a clear error - # here (rather than half-migrating silently). - first_stage_cost = getattr(module, "first_stage_cost", None) - first_stage_varlist = getattr(module, "first_stage_varlist", None) - if (first_stage_cost is None) != (first_stage_varlist is None): - present = "first_stage_cost" if first_stage_cost is not None else "first_stage_varlist" - missing = "first_stage_varlist" if first_stage_cost is not None else "first_stage_cost" - raise RuntimeError( - f"Module {module.__name__!r} defines {present} but not " - f"{missing}. These hooks must be defined together " - f"(or both omitted). See doc/src/generic_admm.rst." - ) + first_stage_hooks = _discover_first_stage_hooks(module) admm = Stoch_AdmmWrapper( options={}, @@ -191,8 +237,7 @@ def setup_stoch_admm(module, cfg, n_cylinders): mpicomm=MPI.COMM_WORLD, scenario_creator_kwargs=scenario_creator_kwargs, BFs=cfg.get("branching_factors"), - first_stage_cost=first_stage_cost, - first_stage_varlist=first_stage_varlist, + **first_stage_hooks, ) # Store on cfg as plain attributes (Pyomo Config can't handle these types) @@ -232,18 +277,7 @@ def setup_stoch_admm_with_bundles(module, cfg, n_cylinders): consensus_vars = module.consensus_vars_creator( admm_subproblem_names, stoch_scenario_name, **scenario_creator_kwargs) - # Discover optional first-stage hooks on the module. Same - # both-or-neither contract as the non-bundled path. - first_stage_cost = getattr(module, "first_stage_cost", None) - first_stage_varlist = getattr(module, "first_stage_varlist", None) - if (first_stage_cost is None) != (first_stage_varlist is None): - present = "first_stage_cost" if first_stage_cost is not None else "first_stage_varlist" - missing = "first_stage_varlist" if first_stage_cost is not None else "first_stage_cost" - raise RuntimeError( - f"Module {module.__name__!r} defines {present} but not " - f"{missing}. These hooks must be defined together " - f"(or both omitted). See doc/src/generic_admm.rst." - ) + first_stage_hooks = _discover_first_stage_hooks(module) bundler = AdmmBundler( module=module, @@ -254,8 +288,7 @@ def setup_stoch_admm_with_bundles(module, cfg, n_cylinders): combining_fn=module.combining_names, split_fn=module.split_admm_stoch_subproblem_scenario_name, scenario_creator_kwargs=scenario_creator_kwargs, - first_stage_cost=first_stage_cost, - first_stage_varlist=first_stage_varlist, + **first_stage_hooks, ) bundle_names = bundler.bundle_names_creator() diff --git a/mpisppy/tests/test_admmWrapper.py b/mpisppy/tests/test_admmWrapper.py index ea810083f..489169187 100644 --- a/mpisppy/tests/test_admmWrapper.py +++ b/mpisppy/tests/test_admmWrapper.py @@ -7,8 +7,21 @@ # full copyright and license information. ############################################################################### # TBD: make these tests less fragile +"""Tests for AdmmWrapper. + +Phase references (A, B.1, B.2, ...) in this file's docstrings track the +phased plan in doc/designs/admm_user_api_automation_design.md. +For the ADMM vocabulary used below (before-wrap scenario, wrapped +scenario, wrap, ADMM subproblem, ...), see the module docstring of +mpisppy.utils.admmWrapper. +""" import unittest +import pyomo.environ as pyo import mpisppy.utils.admmWrapper as admmWrapper +from mpisppy.utils.admmWrapper import ( + _admm_normalize_consensus_vars, + _merge_first_stage_into_consensus_vars, +) import mpisppy.tests.examples.distr.distr as distr from mpisppy.utils import config from mpisppy.tests.utils import get_solver @@ -196,5 +209,142 @@ def test_values(self): os.chdir(original_dir) +class TestAdmmConsensusVarsNormalize(unittest.TestCase): + """B.1: consensus_vars accepts Pyomo Var/VarData as well as strings.""" + + def _model_with_named_vars(self): + m = pyo.ConcreteModel() + m.x = pyo.Var() + m.y = pyo.Var([("A", "B"), ("B", "C")]) + return m + + def test_flat_form_strings_unchanged(self): + cv = {"Sub1": ["x", "y[('A', 'B')]"], "Sub2": ["x"]} + out = _admm_normalize_consensus_vars(cv, tuple_form=False) + self.assertEqual(out, cv) + self.assertIsNot(out, cv) # new dict, not aliased + + def test_flat_form_var_objects(self): + m = self._model_with_named_vars() + cv = {"Sub1": [m.x, m.y[("A", "B")]], "Sub2": [m.x]} + out = _admm_normalize_consensus_vars(cv, tuple_form=False) + self.assertEqual(out["Sub1"], [m.x.name, m.y[("A", "B")].name]) + self.assertEqual(out["Sub2"], [m.x.name]) + + def test_flat_form_mixed(self): + m = self._model_with_named_vars() + cv = {"Sub1": ["x", m.y[("B", "C")]]} + out = _admm_normalize_consensus_vars(cv, tuple_form=False) + self.assertEqual(out["Sub1"], ["x", m.y[("B", "C")].name]) + + def test_tuple_form_strings_unchanged(self): + cv = {"Sub1": [("x", 1), ("y[('A', 'B')]", 2)]} + out = _admm_normalize_consensus_vars(cv, tuple_form=True) + self.assertEqual(out, cv) + + def test_tuple_form_var_objects(self): + m = self._model_with_named_vars() + cv = {"Sub1": [(m.x, 1), (m.y[("A", "B")], 2)]} + out = _admm_normalize_consensus_vars(cv, tuple_form=True) + self.assertEqual(out["Sub1"], [(m.x.name, 1), (m.y[("A", "B")].name, 2)]) + + def test_unsupported_type_raises(self): + with self.assertRaises(TypeError): + _admm_normalize_consensus_vars({"Sub1": [42]}, tuple_form=False) + + +class TestAdmmWrapperVarConsensusInputs(unittest.TestCase): + """B.1: AdmmWrapper accepts Pyomo Var objects in consensus_vars + and produces the same varprob_dict as the equivalent string form + (no behavioral change other than the relaxed input type).""" + + def _cfg(self, num_scens): + cfg = config.Config() + cfg.num_scens_required() + cfg.num_scens = num_scens + return cfg + + def _make_admm_from(self, consensus_vars, cfg): + options = {} + all_scenario_names = distr.scenario_names_creator(num_scens=cfg.num_scens) + return admmWrapper.AdmmWrapper( + options, + all_scenario_names, + distr.scenario_creator, + consensus_vars, + n_cylinders=1, + mpicomm=MPI.COMM_WORLD, + scenario_creator_kwargs=distr.kw_creator(cfg), + verbose=False, + ) + + def test_var_input_matches_string_input(self): + cfg = self._cfg(3) + consensus_vars_str = distr.consensus_vars_creator(cfg.num_scens) + + # Build a Var-flavored consensus_vars by resolving each name on + # the ADMM subproblem's own before-wrap scenario. Pyomo + # VarData holds its parent block via a weakref, so we must + # keep the source before-wrap scenarios alive until the + # wrapper has snapshotted their .name attributes. + kw = distr.kw_creator(cfg) + live_scenarios = [] + consensus_vars_var = {} + for sub, entries in consensus_vars_str.items(): + scen = distr.scenario_creator(sub, **kw) + live_scenarios.append(scen) + resolved = [] + for vstr in entries: + v = scen.find_component(vstr) + resolved.append(v if v is not None else vstr) + consensus_vars_var[sub] = resolved + + admm_str = self._make_admm_from(consensus_vars_str, self._cfg(3)) + admm_var = self._make_admm_from(consensus_vars_var, self._cfg(3)) + del live_scenarios + + # consensus_vars on the wrapper should be string-equal post-normalization + self.assertEqual(admm_str.consensus_vars, admm_var.consensus_vars) + # ...and probability bookkeeping should match (compare by probability + # values; the underlying Var id()s differ across constructor calls). + for sname in admm_str.local_scenarios: + probs_str = [p for (_, p) in admm_str.var_prob_list(admm_str.local_scenarios[sname])] + probs_var = [p for (_, p) in admm_var.var_prob_list(admm_var.local_scenarios[sname])] + self.assertEqual(probs_str, probs_var, f"mismatch for {sname}") + + +class TestMergeFirstStageIntoConsensusVars(unittest.TestCase): + """B.2: helper that appends each ADMM subproblem's first-stage Var + names to its consensus_vars entry, with per-subproblem semantics + and de-dup.""" + + def test_merge_basic(self): + cv = {"A": [("x", 2)], "B": [("y", 2)]} + fs = {"A": ["fsA"], "B": ["fsB"]} + out = _merge_first_stage_into_consensus_vars(cv, fs, root_stage=1) + self.assertEqual(out["A"], [("x", 2), ("fsA", 1)]) + self.assertEqual(out["B"], [("y", 2), ("fsB", 1)]) + + def test_merge_dedup(self): + cv = {"A": [("x", 2), ("fsA", 1)]} + fs = {"A": ["fsA"]} + out = _merge_first_stage_into_consensus_vars(cv, fs, root_stage=1) + self.assertEqual(out["A"], [("x", 2), ("fsA", 1)]) + + def test_merge_subs_with_different_first_stage(self): + cv = {"R1": [("z", 2)], "R2": [("z", 2)]} + fs = {"R1": ["y[F1_1]", "y[F1_2]"], "R2": ["y[F2_1]"]} + out = _merge_first_stage_into_consensus_vars(cv, fs, root_stage=1) + self.assertEqual(out["R1"], [("z", 2), ("y[F1_1]", 1), ("y[F1_2]", 1)]) + self.assertEqual(out["R2"], [("z", 2), ("y[F2_1]", 1)]) + + def test_merge_missing_sub_is_noop(self): + cv = {"A": [("x", 2)], "B": [("y", 2)]} + fs = {"A": ["fsA"]} # B absent + out = _merge_first_stage_into_consensus_vars(cv, fs, root_stage=1) + self.assertEqual(out["A"], [("x", 2), ("fsA", 1)]) + self.assertEqual(out["B"], [("y", 2)]) + + if __name__ == '__main__': unittest.main() diff --git a/mpisppy/tests/test_admm_bundler.py b/mpisppy/tests/test_admm_bundler.py index dc9986610..954a8850c 100644 --- a/mpisppy/tests/test_admm_bundler.py +++ b/mpisppy/tests/test_admm_bundler.py @@ -6,7 +6,17 @@ # All rights reserved. Please see the files COPYRIGHT.md and LICENSE.md for # full copyright and license information. ############################################################################### +"""Tests for AdmmBundler. + +Phase references (A, B.1, B.2, ...) in this file's docstrings track the +phased plan in doc/designs/admm_user_api_automation_design.md. +For the ADMM vocabulary used below (before-wrap scenario, wrapped +scenario, wrap, ADMM subproblem, ...), see the module docstring of +mpisppy.utils.admmWrapper. +""" +import types import unittest +import pyomo.environ as pyo import mpisppy.tests.examples.stoch_distr.stoch_distr as stoch_distr from mpisppy.utils.admm_bundler import AdmmBundler from mpisppy.utils import config @@ -190,6 +200,172 @@ def test_var_prob_consistent_lengths(self): ) +class TestAdmmBundlerB2AutoMerge(unittest.TestCase): + """B.2: AdmmBundler auto-merges each admm subproblem's own + first-stage Var names into its consensus_vars entry when the + first_stage_varlist hook is supplied.""" + + def _build_synthetic_module(self): + """A tiny module whose scenario_creator gives each admm sub its + own first-stage Var (fs_A or fs_B).""" + + def scenario_creator(sname, **kwargs): + parts = sname.split("__ADMM__") + admm_part = parts[1] + m = pyo.ConcreteModel() + if admm_part == "A": + m.x = pyo.Var(bounds=(0, 1)) + m.fs_A = pyo.Var(bounds=(0, 1)) + m._fs_vars = [m.fs_A] + m.FirstStageCost = pyo.Expression(expr=m.fs_A) + m.obj = pyo.Objective(expr=m.x + m.fs_A, sense=pyo.minimize) + else: + m.y = pyo.Var(bounds=(0, 1)) + m.fs_B = pyo.Var(bounds=(0, 1)) + m._fs_vars = [m.fs_B] + m.FirstStageCost = pyo.Expression(expr=m.fs_B) + m.obj = pyo.Objective(expr=m.y + m.fs_B, sense=pyo.minimize) + return m + + mod = types.SimpleNamespace() + mod.scenario_creator = scenario_creator + return mod + + def test_per_subproblem_first_stage_merge(self): + mod = self._build_synthetic_module() + admm_subs = ["A", "B"] + stoch_names = ["S1", "S2"] + consensus_vars = {"A": [("x", 1)], "B": [("y", 1)]} + + def combining(sub, stoch): + return f"ADMM__ADMM__{sub}__ADMM__{stoch}" + + def split(name): + parts = name.split("__ADMM__") + return parts[1], parts[2] + + bundler = AdmmBundler( + module=mod, + scenarios_per_bundle=len(stoch_names), + admm_subproblem_names=admm_subs, + stoch_scenario_names=stoch_names, + consensus_vars=consensus_vars, + combining_fn=combining, + split_fn=split, + first_stage_cost=lambda s: s.FirstStageCost, + first_stage_varlist=lambda s: s._fs_vars, + ) + self.assertIn(("fs_A", 1), bundler.consensus_vars["A"]) + self.assertNotIn(("fs_B", 1), bundler.consensus_vars["A"]) + self.assertIn(("fs_B", 1), bundler.consensus_vars["B"]) + self.assertNotIn(("fs_A", 1), bundler.consensus_vars["B"]) + + +class TestAdmmBundlerB3AdvancedHooks(unittest.TestCase): + """B.3: AdmmBundler accepts first_stage_surrogate_nonant_list / + first_stage_nonant_ef_suppl_list and forwards them to + sputils.attach_root_node when wrap builds each bundle's + constituent before-wrap scenarios.""" + + def _build_module_with_surrogate(self): + def scenario_creator(sname, **kwargs): + parts = sname.split("__ADMM__") + admm_part = parts[1] + m = pyo.ConcreteModel() + if admm_part == "A": + m.x = pyo.Var(bounds=(0, 1)) + own = m.x + else: + m.y = pyo.Var(bounds=(0, 1)) + own = m.y + m.fs = pyo.Var(bounds=(0, 1)) + m.z = pyo.Var(bounds=(0, 1)) # surrogate + m.e = pyo.Var(bounds=(0, 1)) # EF-supplemental + m.FirstStageCost = pyo.Expression(expr=m.fs) + m.obj = pyo.Objective(expr=own + m.fs, sense=pyo.minimize) + m._fs_vars = [m.fs] + m._surrogate_vars = [m.z] + m._ef_suppl_vars = [m.e] + return m + + mod = types.SimpleNamespace(scenario_creator=scenario_creator) + return mod + + def _common_kwargs(self): + admm_subs = ["A", "B"] + stoch_names = ["S1", "S2"] + consensus_vars = {"A": [("x", 1)], "B": [("y", 1)]} + + def combining(sub, stoch): + return f"ADMM__ADMM__{sub}__ADMM__{stoch}" + + def split(name): + parts = name.split("__ADMM__") + return parts[1], parts[2] + + return { + "scenarios_per_bundle": len(stoch_names), + "admm_subproblem_names": admm_subs, + "stoch_scenario_names": stoch_names, + "consensus_vars": consensus_vars, + "combining_fn": combining, + "split_fn": split, + } + + def test_advanced_hooks_forwarded_to_attach_root_node(self): + """When the bundler wraps each constituent before-wrap scenario + it should call sputils.attach_root_node with the advanced + kwargs supplied by the hooks. (The bundle then builds a + separate ROOT for PH consumption that flattens all consensus + Vars; surrogates/ef_suppl live on the per-constituent roots + the EF reads from when assembling.)""" + from unittest import mock + mod = self._build_module_with_surrogate() + bundler = AdmmBundler( + module=mod, + first_stage_cost=lambda s: s.FirstStageCost, + first_stage_varlist=lambda s: s._fs_vars, + first_stage_surrogate_nonant_list=lambda s: s._surrogate_vars, + first_stage_nonant_ef_suppl_list=lambda s: s._ef_suppl_vars, + **self._common_kwargs(), + ) + bundle_names = bundler.bundle_names_creator() + + import mpisppy.utils.sputils as sputils + real_attach = sputils.attach_root_node + calls_with_advanced = [] + def spy(*args, **kwargs): + if "surrogate_nonant_list" in kwargs or "nonant_ef_suppl_list" in kwargs: + calls_with_advanced.append(kwargs) + return real_attach(*args, **kwargs) + with mock.patch.object(sputils, "attach_root_node", side_effect=spy): + bundler.scenario_creator(bundle_names[0]) + + # Expect one such call per constituent before-wrap scenario in + # the bundle (one ADMM subproblem * num stoch scens). + self.assertEqual( + len(calls_with_advanced), + len(self._common_kwargs()["stoch_scenario_names"]), + f"expected one advanced-kwarg attach_root_node call per " + f"constituent; got {calls_with_advanced}") + for kwargs in calls_with_advanced: + self.assertIn("surrogate_nonant_list", kwargs) + self.assertIn("nonant_ef_suppl_list", kwargs) + + def test_advanced_hook_without_core_errors(self): + mod = self._build_module_with_surrogate() + with self.assertRaises(RuntimeError) as cm: + AdmmBundler( + module=mod, + # no first_stage_cost / first_stage_varlist + first_stage_surrogate_nonant_list=lambda s: s._surrogate_vars, + **self._common_kwargs(), + ) + msg = str(cm.exception) + self.assertIn("advanced hook", msg) + self.assertIn("first_stage_cost", msg) + + class TestAdmmArgs(unittest.TestCase): """Test admm_args, _count_cylinders, and _check_admm_compatibility.""" diff --git a/mpisppy/tests/test_stoch_admmWrapper.py b/mpisppy/tests/test_stoch_admmWrapper.py index 0d0f6d84a..318eee2c6 100644 --- a/mpisppy/tests/test_stoch_admmWrapper.py +++ b/mpisppy/tests/test_stoch_admmWrapper.py @@ -6,6 +6,14 @@ # All rights reserved. Please see the files COPYRIGHT.md and LICENSE.md for # full copyright and license information. ############################################################################### +"""Tests for Stoch_AdmmWrapper. + +Phase references (A, B.1, B.2, ...) in this file's docstrings track the +phased plan in doc/designs/admm_user_api_automation_design.md. +For the ADMM vocabulary used below (before-wrap scenario, wrapped +scenario, wrap, ADMM subproblem, ...), see the module docstring of +mpisppy.utils.admmWrapper. +""" import unittest import subprocess import sys @@ -651,9 +659,11 @@ def test_hooks_path_attaches_root_node(self): self.assertEqual(s._mpisppy_node_list[0].name, "ROOT") def test_hooks_and_legacy_paths_produce_same_node_list(self): - """The new hooks path must produce the same final - _mpisppy_node_list as the legacy path (where scenario_creator - called attach_root_node itself).""" + """The first-stage-hooks path's B.2 auto-merge must produce + the same final _mpisppy_node_list as the legacy path with + first-stage entries pre-merged into consensus_vars by hand + (i.e. wrap operates on equivalent inputs and yields equivalent + wrapped scenarios).""" from mpisppy.utils.stoch_admmWrapper import Stoch_AdmmWrapper from mpisppy import MPI @@ -666,11 +676,18 @@ def test_hooks_and_legacy_paths_produce_same_node_list(self): first_stage_varlist=fs_varlist, **self._common_kwargs(), ) + # Legacy path: pre-merge ("fs", 1) into both subproblems' + # consensus_vars so the comparison is apples-to-apples. + legacy_kwargs = self._common_kwargs() + legacy_kwargs["consensus_vars"] = { + sub: entries + [("fs", 1)] + for sub, entries in legacy_kwargs["consensus_vars"].items() + } admm_legacy = Stoch_AdmmWrapper( options={}, scenario_creator=self._minimal_scenario_creator(call_attach=True), mpicomm=MPI.COMM_WORLD, - **self._common_kwargs(), + **legacy_kwargs, ) for sname in admm_hooks.local_admm_stoch_subproblem_scenarios: s_h = admm_hooks.local_admm_stoch_subproblem_scenarios[sname] @@ -684,6 +701,128 @@ def test_hooks_and_legacy_paths_produce_same_node_list(self): len(s_l._mpisppy_node_list[0].nonant_vardata_list), f"{sname}: root nonant count differs") + def test_b2_auto_merge_first_stage_into_consensus_vars(self): + """B.2: with first-stage hooks defined and a consensus_vars + that does NOT pre-merge first-stage Vars, the wrapper merges + them at construction time so the final self.consensus_vars + carries both admm-consensus and first-stage entries.""" + from mpisppy.utils.stoch_admmWrapper import Stoch_AdmmWrapper + from mpisppy import MPI + + fs_cost, fs_varlist = self._hooks() + admm = Stoch_AdmmWrapper( + options={}, + scenario_creator=self._minimal_scenario_creator(call_attach=False), + mpicomm=MPI.COMM_WORLD, + first_stage_cost=fs_cost, + first_stage_varlist=fs_varlist, + **self._common_kwargs(), + ) + self.assertIn(("fs", 1), admm.consensus_vars["A"]) + self.assertIn(("fs", 1), admm.consensus_vars["B"]) + # Existing admm entry preserved. + self.assertIn(("x", 1), admm.consensus_vars["A"]) + self.assertIn(("y", 1), admm.consensus_vars["B"]) + + def test_b2_per_subproblem_first_stage_vars(self): + """B.2: when different ADMM subproblems carry different + first-stage Vars (mirrors stoch_distr: each region has its + own factory production decisions), each ADMM subproblem's + consensus_vars must contain only ITS OWN first-stage Vars, + not the other ADMM subproblem's.""" + import pyomo.environ as pyo + from mpisppy.utils.stoch_admmWrapper import Stoch_AdmmWrapper + from mpisppy import MPI + + def per_sub_scenario_creator(sname, **kwargs): + parts = sname.split("_") + admm_part = parts[2] + m = pyo.ConcreteModel() + if admm_part == "A": + m.x = pyo.Var(bounds=(0, 1)) + m.fs_A = pyo.Var(bounds=(0, 1)) + m._fs_vars = [m.fs_A] + m.FirstStageCost = pyo.Expression(expr=m.fs_A) + m.obj = pyo.Objective(expr=m.x + m.fs_A, sense=pyo.minimize) + else: + m.y = pyo.Var(bounds=(0, 1)) + m.fs_B = pyo.Var(bounds=(0, 1)) + m._fs_vars = [m.fs_B] + m.FirstStageCost = pyo.Expression(expr=m.fs_B) + m.obj = pyo.Objective(expr=m.y + m.fs_B, sense=pyo.minimize) + return m + + def fs_cost(s): + return s.FirstStageCost + + def fs_varlist(s): + return s._fs_vars + + admm = Stoch_AdmmWrapper( + options={}, + scenario_creator=per_sub_scenario_creator, + mpicomm=MPI.COMM_WORLD, + first_stage_cost=fs_cost, + first_stage_varlist=fs_varlist, + **self._common_kwargs(), + ) + self.assertIn(("fs_A", 1), admm.consensus_vars["A"]) + self.assertNotIn(("fs_B", 1), admm.consensus_vars["A"]) + self.assertIn(("fs_B", 1), admm.consensus_vars["B"]) + self.assertNotIn(("fs_A", 1), admm.consensus_vars["B"]) + + def test_consensus_vars_accepts_var_objects(self): + """B.1: consensus_vars may contain Pyomo Var/VarData objects in + place of (or mixed with) name strings. The wrapper's normalized + consensus_vars and the resulting nonant lists must match the + string-form result.""" + from mpisppy.utils.stoch_admmWrapper import Stoch_AdmmWrapper + from mpisppy import MPI + + fs_cost, fs_varlist = self._hooks() + sc = self._minimal_scenario_creator(call_attach=False) + + # Build a Var-flavored consensus_vars. Each subproblem owns + # exactly one consensus var per _minimal_scenario_creator. + # Pyomo VarData holds its parent block via weakref, so we must + # keep these sample scenarios alive across the wrapper call. + sample_A = sc("ADMM_STOCH_A_S1") + sample_B = sc("ADMM_STOCH_B_S1") + cv_var = {"A": [(sample_A.x, 1)], "B": [(sample_B.y, 1)]} + + kw_str = self._common_kwargs() + kw_var = dict(kw_str) + kw_var["consensus_vars"] = cv_var + + admm_str = Stoch_AdmmWrapper( + options={}, + scenario_creator=sc, + mpicomm=MPI.COMM_WORLD, + first_stage_cost=fs_cost, + first_stage_varlist=fs_varlist, + **kw_str, + ) + admm_var = Stoch_AdmmWrapper( + options={}, + scenario_creator=sc, + mpicomm=MPI.COMM_WORLD, + first_stage_cost=fs_cost, + first_stage_varlist=fs_varlist, + **kw_var, + ) + # sample_{A,B} can be released now; the wrapper has snapshotted + # the names. + del sample_A, sample_B + + self.assertEqual(admm_str.consensus_vars, admm_var.consensus_vars) + for sname in admm_str.local_admm_stoch_subproblem_scenarios: + s_s = admm_str.local_admm_stoch_subproblem_scenarios[sname] + s_v = admm_var.local_admm_stoch_subproblem_scenarios[sname] + self.assertEqual( + [n.name for n in s_s._mpisppy_node_list], + [n.name for n in s_v._mpisppy_node_list], + f"{sname}: node list differs between str and var consensus_vars") + def test_hooks_plus_manual_attach_errors(self): """Hooks defined AND scenario_creator also calls attach_root_node — error so the user knows the hooks make the @@ -816,6 +955,222 @@ def test_setup_stoch_admm_with_bundles_half_hooks_errors(self): self.assertIn("first_stage_cost", msg) self.assertIn("first_stage_varlist", msg) + # ----- B.3: advanced hooks for surrogate / EF-supplemental nonants ----- + + @staticmethod + def _scenario_creator_with_surrogate_and_ef_suppl(): + """Like _minimal_scenario_creator(call_attach=False), but each + before-wrap scenario carries extra Vars m.z (a surrogate) and + m.e (an EF-supplemental nonant) that B.3 forwards to + attach_root_node via the new advanced hooks.""" + import pyomo.environ as pyo + + def sc(sname, **kwargs): + parts = sname.split("_") + admm_part = parts[2] + m = pyo.ConcreteModel() + if admm_part == "A": + m.x = pyo.Var(bounds=(0, 1)) + own = m.x + else: + m.y = pyo.Var(bounds=(0, 1)) + own = m.y + m.fs = pyo.Var(bounds=(0, 1)) + m.z = pyo.Var(bounds=(0, 1)) # surrogate nonant + m.e = pyo.Var(bounds=(0, 1)) # EF-supplemental nonant + m.FirstStageCost = pyo.Expression(expr=m.fs) + m.obj = pyo.Objective(expr=own + m.fs, sense=pyo.minimize) + m._first_stage_vars = [m.fs] + m._surrogate_vars = [m.z] + m._ef_suppl_vars = [m.e] + return m + + return sc + + def test_advanced_hooks_forwarded(self): + """B.3: when both advanced hooks are supplied, attach_root_node + receives surrogate_nonant_list and nonant_ef_suppl_list, and + the surrogate/EF-suppl Vars survive the wrapper's stage rewrite + on the resulting wrapped scenario's root node.""" + from mpisppy.utils.stoch_admmWrapper import Stoch_AdmmWrapper + from mpisppy import MPI + + fs_cost, fs_varlist = self._hooks() + admm = Stoch_AdmmWrapper( + options={}, + scenario_creator=self._scenario_creator_with_surrogate_and_ef_suppl(), + mpicomm=MPI.COMM_WORLD, + first_stage_cost=fs_cost, + first_stage_varlist=fs_varlist, + first_stage_surrogate_nonant_list=lambda s: s._surrogate_vars, + first_stage_nonant_ef_suppl_list=lambda s: s._ef_suppl_vars, + **self._common_kwargs(), + ) + for sname, s in admm.local_admm_stoch_subproblem_scenarios.items(): + root = s._mpisppy_node_list[0] + self.assertIn( + s.z, root.surrogate_vardatas, + f"{sname}: surrogate Var m.z missing from root node") + self.assertIn( + s.e, root.nonant_ef_suppl_vardata_list, + f"{sname}: EF-supplemental Var m.e missing from root node") + + def test_advanced_hook_alone_ok(self): + """B.3: only one of the two advanced hooks supplied is fine + (they are independent of each other; each is independent of + the other but both depend on the two core hooks).""" + from mpisppy.utils.stoch_admmWrapper import Stoch_AdmmWrapper + from mpisppy import MPI + + fs_cost, fs_varlist = self._hooks() + admm = Stoch_AdmmWrapper( + options={}, + scenario_creator=self._scenario_creator_with_surrogate_and_ef_suppl(), + mpicomm=MPI.COMM_WORLD, + first_stage_cost=fs_cost, + first_stage_varlist=fs_varlist, + first_stage_surrogate_nonant_list=lambda s: s._surrogate_vars, + # first_stage_nonant_ef_suppl_list NOT supplied + **self._common_kwargs(), + ) + for sname, s in admm.local_admm_stoch_subproblem_scenarios.items(): + root = s._mpisppy_node_list[0] + self.assertIn(s.z, root.surrogate_vardatas, + f"{sname}: surrogate Var missing") + # No EF-suppl Vars attached. + self.assertNotIn(s.e, root.nonant_ef_suppl_vardata_list) + + def test_advanced_hook_without_core_hooks_errors(self): + """B.3: passing an advanced hook to Stoch_AdmmWrapper without + also supplying first_stage_cost / first_stage_varlist is an + error -- there is nothing for the wrapper to attach the + advanced lists onto.""" + from mpisppy.utils.stoch_admmWrapper import Stoch_AdmmWrapper + from mpisppy import MPI + + for kw in ( + {"first_stage_surrogate_nonant_list": lambda s: []}, + {"first_stage_nonant_ef_suppl_list": lambda s: []}, + ): + with self.assertRaises(RuntimeError) as cm: + Stoch_AdmmWrapper( + options={}, + scenario_creator=self._minimal_scenario_creator(call_attach=True), + mpicomm=MPI.COMM_WORLD, + **kw, + **self._common_kwargs(), + ) + msg = str(cm.exception) + self.assertIn("advanced hook", msg) + self.assertIn("first_stage_cost", msg) + + def test_b2_probes_for_admm_subproblems_not_local_to_rank(self): + """B.2 probe-fallback path: when this rank's slice of the + before-wrap scenarios does not cover every ADMM subproblem, + the wrapper must build a fresh probe before-wrap scenario for + each missing ADMM subproblem and gather its first-stage Var + names from there. All ranks must end up with the same + self.consensus_vars regardless of slice. + + Triggered with a synthetic 2-rank mpicomm (Get_size=2, + Get_rank=0) and admm-major scenario ordering so the local + slice covers only ADMM subproblem A; B's first-stage Var + must arrive via the probe block. + """ + from mpisppy.utils.stoch_admmWrapper import Stoch_AdmmWrapper + + class FakeMpicomm: + def __init__(self, size, rank): + self._size, self._rank = size, rank + def Get_size(self): + return self._size + def Get_rank(self): + return self._rank + + fs_cost, fs_varlist = self._hooks() + base_sc = self._minimal_scenario_creator(call_attach=False) + call_log = [] + def spy(sname, **kwargs): + call_log.append(sname) + return base_sc(sname, **kwargs) + + admm_names = ["A", "B"] + stoch_names = ["S1", "S2"] + # admm-major ordering: rank 0 will get the first two entries + # (A_S1, A_S2) and miss B entirely. + all_names = [f"ADMM_STOCH_{a}_{s}" + for a in admm_names for s in stoch_names] + + def split(sname): + parts = sname.split("_") + return parts[2], "_".join(parts[3:]) + + admm = Stoch_AdmmWrapper( + options={}, + all_admm_stoch_subproblem_scenario_names=all_names, + split_admm_stoch_subproblem_scenario_name=split, + admm_subproblem_names=admm_names, + stoch_scenario_names=stoch_names, + scenario_creator=spy, + consensus_vars={"A": [("x", 1)], "B": [("y", 1)]}, + n_cylinders=1, + mpicomm=FakeMpicomm(size=2, rank=0), + scenario_creator_kwargs={}, + first_stage_cost=fs_cost, + first_stage_varlist=fs_varlist, + ) + + # Only A_S1 and A_S2 are local on rank 0. + local_names = list(admm.local_admm_stoch_subproblem_scenarios.keys()) + self.assertEqual(sorted(local_names), + ["ADMM_STOCH_A_S1", "ADMM_STOCH_A_S2"]) + + # B's first-stage Var must have arrived via the probe. + self.assertIn(("fs", 1), admm.consensus_vars["B"]) + self.assertIn(("fs", 1), admm.consensus_vars["A"]) + + # scenario_creator was invoked 2x for local + 1x for B's + # probe = 3 total. The probe targets the first B-flavored + # name in all_names, which is ADMM_STOCH_B_S1. + self.assertEqual(len(call_log), 3, + f"expected 3 scenario_creator calls " + f"(2 local + 1 probe), got {call_log}") + self.assertEqual(call_log[-1], "ADMM_STOCH_B_S1", + f"probe should target the first B-flavored " + f"name; got {call_log[-1]}") + + def test_setup_stoch_admm_advanced_without_core_errors(self): + """B.3: the setup_stoch_admm-level discovery rejects a module + that defines an advanced hook without the two core hooks.""" + import types + from mpisppy.generic.admm import setup_stoch_admm + + module = types.SimpleNamespace( + __name__="fake_module", + admm_subproblem_names_creator=lambda cfg: ["A", "B"], + stoch_scenario_names_creator=lambda cfg: ["S1", "S2"], + admm_stoch_subproblem_scenario_names_creator=( + lambda an, sn: [f"ADMM_STOCH_{a}_{s}" for s in sn for a in an]), + split_admm_stoch_subproblem_scenario_name=( + lambda name: (name.split("_")[2], + "_".join(name.split("_")[3:]))), + kw_creator=lambda cfg: {}, + consensus_vars_creator=( + lambda an, sn, **kw: {"A": [("x", 1)], "B": [("y", 1)]}), + scenario_creator=self._minimal_scenario_creator(call_attach=False), + # No first_stage_cost / first_stage_varlist; only an advanced hook. + first_stage_surrogate_nonant_list=lambda s: [], + ) + cfg = config.Config() + cfg.add_to_config("branching_factors", description="", + domain=list, default=None) + with self.assertRaises(RuntimeError) as cm: + setup_stoch_admm(module, cfg, n_cylinders=1) + msg = str(cm.exception) + self.assertIn("fake_module", msg) + self.assertIn("first_stage_surrogate_nonant_list", msg) + self.assertIn("first_stage_cost", msg) + if __name__ == '__main__': unittest.main() diff --git a/mpisppy/utils/admmWrapper.py b/mpisppy/utils/admmWrapper.py index de76f64bc..9969ec8e3 100644 --- a/mpisppy/utils/admmWrapper.py +++ b/mpisppy/utils/admmWrapper.py @@ -6,12 +6,172 @@ # All rights reserved. Please see the files COPYRIGHT.md and LICENSE.md for # full copyright and license information. ############################################################################### -#creating the class AdmmWrapper +"""ADMM wrapper for --admm; shared helpers for the stoch-admm family. + +This module hosts AdmmWrapper (used by --admm) and helpers that +Stoch_AdmmWrapper and AdmmBundler also import. + +---------------------------------------------------------------------- +Vocabulary used consistently across the ADMM wrappers and their tests +---------------------------------------------------------------------- + +before-wrap scenario (a.k.a. before-wrap extended scenario) + The per-(ADMM subproblem, stochastic scenario) Pyomo model that + the user's scenario_creator returns. Has the user's scenario + tree (stages 1..T) but no ADMM consensus stage yet. + +wrapped scenario (a.k.a. wrapped extended scenario) + The same Pyomo model after the wrapper applies the wrap + operation. This is what mpi-sppy iterates over. For + Stoch_AdmmWrapper, the tree gains an ADMM consensus stage at a + new last node; for AdmmBundler, the bundle carries every + consensus Var on a single ROOT node. + +wrap (verb), narrow scope + The ADMM-specific transformation between before-wrap and + wrapped: append the ADMM consensus stage (Stoch_AdmmWrapper) or + flatten consensus Vars into ROOT (AdmmBundler), plus the + consequent surrogate-Var / probability bookkeeping. Wrap does + NOT cover sputils.attach_root_node -- whether the user calls it + in scenario_creator or the wrapper calls it on the user's behalf + via the first_stage_cost / first_stage_varlist hooks, that step + is setup, not wrap. + +the wrapper + Any of AdmmWrapper, Stoch_AdmmWrapper, AdmmBundler. Use the + class name when which wrapper matters. + +stochastic scenario (paper: xi in Xi) + One realization of the random data; the user has |Xi| of them. + Always write "stochastic scenario" in prose -- bare "scenario" + is ambiguous now that before-wrap / wrapped also exist. + +ADMM subproblem (paper: a in A) + One partition / region of the decomposed problem; the user has + |A| of them. ADMM capitalized in prose; the code identifier is + admm_subproblem. scenario_creator is called once per + (ADMM subproblem, stochastic scenario) pair, ordering matched to + the ADMM_STOCH_{admm}_{stoch} naming convention. + +first-stage Vars / first-stage cost + The Vars at stage 1 of the before-wrap scenario tree, and the + corresponding cost expression. Paper-aligned, algorithm-level + term; use it when describing the user-facing API + (first_stage_varlist / first_stage_cost hooks) or the algorithm + in the abstract. + +root-node Vars + The Vars attached to _mpisppy_node_list[0]. In the standard + case these are the user's first-stage Vars after + sputils.attach_root_node has run. Use this term when the + surrounding prose is about what mpi-sppy is doing to the + node-list data structure. +""" + import mpisppy.utils.sputils as sputils import pyomo.environ as pyo from mpisppy import MPI global_rank = MPI.COMM_WORLD.Get_rank() +def _admm_normalize_consensus_vars(consensus_vars, *, tuple_form): + """Coerce every Var identifier in consensus_vars to its .name string. + + Each entry of a subproblem's consensus list may be a string + (legacy) or a Pyomo Var / VarData (anything exposing a .name + attribute). Mixed lists are allowed so callers can migrate one + entry at a time. For indexed Vars, pass individual VarData + objects (e.g., scenario.x[idx]) rather than the container. + + Caveat: a Pyomo VarData holds its parent block via a weakref, so + the .name lookup here only resolves to a real name if the + before-wrap scenario the Var was taken from is still alive when + the wrapper is constructed. Callers that build a before-wrap + scenario inside a helper function must keep it alive across the + wrapper call, or snapshot the names themselves before letting it + go out of scope. + + Args: + consensus_vars (dict): {ADMM subproblem name: list_of_entries}. + tuple_form=False (--admm): entries are Var identifiers. + tuple_form=True (--stoch-admm): entries are + (Var identifier, stage) tuples. + tuple_form (bool): which list shape consensus_vars uses. + + Returns: + dict: a new dict with the same shape, all identifiers as + strings. + """ + def _to_name(v): + if isinstance(v, str): + return v + name = getattr(v, "name", None) + if name is None: + raise TypeError( + f"consensus_vars entry must be a string or a Pyomo " + f"Var/VarData (anything with a .name attribute), got " + f"{type(v).__name__}: {v!r}" + ) + return name + + out = {} + for sub, entries in consensus_vars.items(): + if tuple_form: + out[sub] = [(_to_name(v), stage) for (v, stage) in entries] + else: + out[sub] = [_to_name(v) for v in entries] + return out + + +def _merge_first_stage_into_consensus_vars(consensus_vars, first_stage_var_names_per_sub, root_stage=1): + """Add each ADMM subproblem's first-stage Var names to its + consensus_vars entry, tagged at root_stage. + + Used by --stoch-admm at wrapper-construction time (NOT during + wrap) when the first_stage_cost / first_stage_varlist hooks are + defined, so the user's consensus_vars_creator can return only the + admm-consensus Vars and leave first-stage Vars to the wrapper. + + Per-subproblem because different ADMM subproblems may carry + different first-stage Vars (e.g., one region owns its factory + production decisions, a different region owns its own). Callers + gather the names by invoking first_stage_varlist on one + before-wrap scenario per ADMM subproblem. Those names will end + up on the root node of every wrapped scenario in the subproblem, + because Stoch_AdmmWrapper later runs sputils.attach_root_node + over the same first_stage_varlist; this helper is what makes the + merge of first-stage Vars into the cross-subproblem consensus + list happen automatically. + + De-duplicates: entries already present (e.g., from a partially + migrated consensus_vars_creator that still pre-merges + first-stage Vars by hand) are left in place. + + Args: + consensus_vars (dict): {ADMM subproblem name: [(name_str, stage), + ...]}, already normalized by _admm_normalize_consensus_vars. + first_stage_var_names_per_sub (dict): {ADMM subproblem name: + [name_str, ...]}. ADMM subproblems absent from this map + get no merge. + root_stage (int): stage tag for the appended entries (1 for the + user's stage 1, i.e. the root of the before-wrap scenario + tree in a 2-stage-origin problem). + + Returns: + dict: a new consensus_vars dict with first-stage entries merged in. + """ + out = {} + for sub, entries in consensus_vars.items(): + existing = set(entries) + merged = list(entries) + for name in first_stage_var_names_per_sub.get(sub, []): + key = (name, root_stage) + if key not in existing: + merged.append(key) + existing.add(key) + out[sub] = merged + return out + + def _consensus_vars_number_creator(consensus_vars): """associates to each consensus vars the number of time it appears @@ -86,9 +246,9 @@ def __init__(self, self.local_scenarios[sname] = s #we are not collecting instantiation time - self.consensus_vars = consensus_vars + self.consensus_vars = _admm_normalize_consensus_vars(consensus_vars, tuple_form=False) self.verbose = verbose - self.consensus_vars_number = _consensus_vars_number_creator(consensus_vars) + self.consensus_vars_number = _consensus_vars_number_creator(self.consensus_vars) #check_consensus_vars(consensus_vars) self.assign_variable_probs(verbose=self.verbose) self.number_of_scenario = len(all_scenario_names) diff --git a/mpisppy/utils/admm_bundler.py b/mpisppy/utils/admm_bundler.py index e9b583cdd..e77a6119d 100644 --- a/mpisppy/utils/admm_bundler.py +++ b/mpisppy/utils/admm_bundler.py @@ -23,6 +23,10 @@ import mpisppy.utils.sputils as sputils import mpisppy.scenario_tree as scenario_tree +from mpisppy.utils.admmWrapper import ( + _admm_normalize_consensus_vars, + _merge_first_stage_into_consensus_vars, +) from mpisppy.utils.stoch_admmWrapper import _consensus_vars_number_creator @@ -55,7 +59,9 @@ def __init__(self, module, scenarios_per_bundle, admm_subproblem_names, stoch_scenario_names, consensus_vars, combining_fn, split_fn, scenario_creator_kwargs=None, - first_stage_cost=None, first_stage_varlist=None): + first_stage_cost=None, first_stage_varlist=None, + first_stage_surrogate_nonant_list=None, + first_stage_nonant_ef_suppl_list=None): # Same both-or-neither contract as Stoch_AdmmWrapper. if (first_stage_cost is None) != (first_stage_varlist is None): present = "first_stage_cost" if first_stage_cost is not None else "first_stage_varlist" @@ -64,23 +70,62 @@ def __init__(self, module, scenarios_per_bundle, f"AdmmBundler was given {present} but not {missing}. " f"These hooks must be defined together (or both omitted)." ) + # Advanced hooks forward to attach_root_node's + # surrogate_nonant_list / nonant_ef_suppl_list parameters. + # Each may be defined alone, but only when the two core hooks + # are also defined. + advanced_hooks = { + "first_stage_surrogate_nonant_list": first_stage_surrogate_nonant_list, + "first_stage_nonant_ef_suppl_list": first_stage_nonant_ef_suppl_list, + } + present_advanced = [n for n, h in advanced_hooks.items() if h is not None] + if present_advanced and first_stage_cost is None: + raise RuntimeError( + f"AdmmBundler was given the advanced hook(s) " + f"{present_advanced} but first_stage_cost / " + f"first_stage_varlist were not defined. The advanced " + f"hooks forward to sputils.attach_root_node's optional " + f"parameters and only make sense when the core hooks " + f"are also driving attach_root_node." + ) self.module = module self.scenarios_per_bundle = scenarios_per_bundle self.admm_subproblem_names = admm_subproblem_names self.stoch_scenario_names = stoch_scenario_names - self.consensus_vars = consensus_vars + self.consensus_vars = _admm_normalize_consensus_vars(consensus_vars, tuple_form=True) self.combining_fn = combining_fn self.split_fn = split_fn self.scenario_creator_kwargs = scenario_creator_kwargs or {} self.first_stage_cost = first_stage_cost self.first_stage_varlist = first_stage_varlist + self.first_stage_surrogate_nonant_list = first_stage_surrogate_nonant_list + self.first_stage_nonant_ef_suppl_list = first_stage_nonant_ef_suppl_list self.number_admm_subproblems = len(admm_subproblem_names) - self.consensus_vars_number = _consensus_vars_number_creator(consensus_vars) + + # Same first-stage auto-merge as Stoch_AdmmWrapper. + # AdmmBundler does not pre-build before-wrap scenarios in + # __init__, so to snapshot first-stage Var names we build one + # probe before-wrap scenario per ADMM subproblem (different + # ADMM subproblems may carry different first-stage Vars). + # scenario_creator is assumed side-effect-free. This is + # setup, not wrap. + if first_stage_varlist is not None: + fs_names_per_sub = {} + for sub in admm_subproblem_names: + probe_sname = combining_fn(sub, stoch_scenario_names[0]) + probe_scen = module.scenario_creator(probe_sname, + **self.scenario_creator_kwargs) + fs_names_per_sub[sub] = [v.name for v in first_stage_varlist(probe_scen)] + del probe_scen + self.consensus_vars = _merge_first_stage_into_consensus_vars( + self.consensus_vars, fs_names_per_sub, root_stage=1) + + self.consensus_vars_number = _consensus_vars_number_creator(self.consensus_vars) # Collect all consensus vars with stages self.all_consensus_vars = {} - for sub in consensus_vars: - for var_stage_tuple in consensus_vars[sub]: + for sub in self.consensus_vars: + for var_stage_tuple in self.consensus_vars[sub]: self.all_consensus_vars[var_stage_tuple[0]] = var_stage_tuple[1] # Maps bundle model object → var_prob list @@ -236,8 +281,16 @@ def scenario_creator(self, bundle_name, **kwargs): f"sputils.attach_root_node. Remove the " f"attach_root_node call from scenario_creator." ) + attach_kwargs = {} + if self.first_stage_surrogate_nonant_list is not None: + attach_kwargs["surrogate_nonant_list"] = \ + self.first_stage_surrogate_nonant_list(s) + if self.first_stage_nonant_ef_suppl_list is not None: + attach_kwargs["nonant_ef_suppl_list"] = \ + self.first_stage_nonant_ef_suppl_list(s) sputils.attach_root_node( - s, self.first_stage_cost(s), self.first_stage_varlist(s)) + s, self.first_stage_cost(s), self.first_stage_varlist(s), + **attach_kwargs) else: if not already_attached: raise RuntimeError( diff --git a/mpisppy/utils/stoch_admmWrapper.py b/mpisppy/utils/stoch_admmWrapper.py index 201a5a767..c99c5ad1c 100644 --- a/mpisppy/utils/stoch_admmWrapper.py +++ b/mpisppy/utils/stoch_admmWrapper.py @@ -11,6 +11,10 @@ import pyomo.environ as pyo import mpisppy.scenario_tree as scenario_tree import numpy as np +from mpisppy.utils.admmWrapper import ( + _admm_normalize_consensus_vars, + _merge_first_stage_into_consensus_vars, +) def _tag_dummies_as_surrogate(node, dummies): @@ -94,6 +98,8 @@ def __init__(self, BFs=None, first_stage_cost=None, first_stage_varlist=None, + first_stage_surrogate_nonant_list=None, + first_stage_nonant_ef_suppl_list=None, ): assert len(options) == 0, "no options supported by stoch_admmWrapper" # first_stage_cost / first_stage_varlist must be defined together @@ -107,6 +113,25 @@ def __init__(self, f"These hooks must be defined together (or both omitted)." ) has_first_stage_hooks = first_stage_cost is not None + # The advanced hooks forward to attach_root_node's + # surrogate_nonant_list / nonant_ef_suppl_list parameters. + # Each may be defined alone, but only when the two core hooks + # are also defined (there is nothing for the wrapper to attach + # them onto otherwise). + advanced_hooks = { + "first_stage_surrogate_nonant_list": first_stage_surrogate_nonant_list, + "first_stage_nonant_ef_suppl_list": first_stage_nonant_ef_suppl_list, + } + present_advanced = [n for n, h in advanced_hooks.items() if h is not None] + if present_advanced and not has_first_stage_hooks: + raise RuntimeError( + f"Stoch_AdmmWrapper was given the advanced hook(s) " + f"{present_advanced} but first_stage_cost / " + f"first_stage_varlist were not defined. The advanced " + f"hooks forward to sputils.attach_root_node's optional " + f"parameters and only make sense when the core hooks " + f"are also driving attach_root_node." + ) # We need local_scenarios self.local_admm_stoch_subproblem_scenarios = {} scen_tree = sputils._ScenTree(["ROOT"], all_admm_stoch_subproblem_scenario_names) @@ -139,8 +164,16 @@ def __init__(self, f"sputils.attach_root_node. Remove the " f"attach_root_node call from scenario_creator." ) + attach_kwargs = {} + if first_stage_surrogate_nonant_list is not None: + attach_kwargs["surrogate_nonant_list"] = \ + first_stage_surrogate_nonant_list(s) + if first_stage_nonant_ef_suppl_list is not None: + attach_kwargs["nonant_ef_suppl_list"] = \ + first_stage_nonant_ef_suppl_list(s) sputils.attach_root_node( - s, first_stage_cost(s), first_stage_varlist(s)) + s, first_stage_cost(s), first_stage_varlist(s), + **attach_kwargs) else: if not already_attached: raise RuntimeError( @@ -154,9 +187,40 @@ def __init__(self, # we are not collecting instantiation time self.split_admm_stoch_subproblem_scenario_name = split_admm_stoch_subproblem_scenario_name - self.consensus_vars = consensus_vars + self.consensus_vars = _admm_normalize_consensus_vars(consensus_vars, tuple_form=True) + # When first-stage hooks are defined, auto-merge each ADMM + # subproblem's first-stage Vars into its consensus list so + # consensus_vars_creator can return only the admm-consensus + # Vars and leave first-stage Vars to the wrapper. Different + # ADMM subproblems may carry different first-stage Vars, so + # names are gathered per-subproblem: from already-built + # before-wrap scenarios where possible, falling back to a + # fresh probe before-wrap scenario for ADMM subproblems this + # rank does not host locally (every rank must end up with + # the same consensus_vars to keep _consensus_vars_number + # and assign_variable_probs consistent). This is setup, not + # wrap. + if has_first_stage_hooks: + fs_names_per_sub = {} + for sname, s in self.local_admm_stoch_subproblem_scenarios.items(): + sub = split_admm_stoch_subproblem_scenario_name(sname)[0] + if sub not in fs_names_per_sub: + fs_names_per_sub[sub] = [v.name for v in first_stage_varlist(s)] + if len(fs_names_per_sub) < len(admm_subproblem_names): + name_for_sub = {} + for cand in all_admm_stoch_subproblem_scenario_names: + cand_sub = split_admm_stoch_subproblem_scenario_name(cand)[0] + name_for_sub.setdefault(cand_sub, cand) + for sub in admm_subproblem_names: + if sub not in fs_names_per_sub: + probe = scenario_creator(name_for_sub[sub], + **(scenario_creator_kwargs or {})) + fs_names_per_sub[sub] = [v.name for v in first_stage_varlist(probe)] + del probe + self.consensus_vars = _merge_first_stage_into_consensus_vars( + self.consensus_vars, fs_names_per_sub, root_stage=1) self.verbose = verbose - self.consensus_vars_number = _consensus_vars_number_creator(consensus_vars) + self.consensus_vars_number = _consensus_vars_number_creator(self.consensus_vars) self.admm_subproblem_names = admm_subproblem_names self.stoch_scenario_names = stoch_scenario_names self.BFs = BFs