Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .github/workflows/test_pr_and_main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1127,7 +1127,11 @@ jobs:
mpisppy/tests/test_reduced_costs_rho_bundles.py \
mpisppy/tests/test_rho_deprecations.py \
mpisppy/tests/test_buffer_inspect.py \
mpisppy/tests/test_comm_lor_check.py
mpisppy/tests/test_comm_lor_check.py \
mpisppy/tests/test_rank_apportionment.py \
mpisppy/tests/test_overlap_map.py \
mpisppy/tests/test_spwindow_partial_get.py \
mpisppy/tests/test_flexible_rank_gate.py

- name: Upload coverage data
if: always()
Expand Down
241 changes: 196 additions & 45 deletions doc/designs/flexible_rank_assignments.md

Large diffs are not rendered by default.

93 changes: 93 additions & 0 deletions mpisppy/cylinders/overlap_map.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
###############################################################################
# mpi-sppy: MPI-based Stochastic Programming in PYthon
#
# Copyright (c) 2024, Lawrence Livermore National Security, LLC, Alliance for
# Sustainable Energy, LLC, The Regents of the University of California, et al.
# All rights reserved. Please see the files COPYRIGHT.md and LICENSE.md for
# full copyright and license information.
###############################################################################
"""Overlap maps for reading a local-sized field across cylinders that have
different MPI rank counts.

When two cylinders hold the same scenarios but split them over a different
number of ranks, a reader rank's local buffer for a per-scenario field is
backed by one or more *segments* living in remote ranks' buffers. An
``OverlapSegment`` records, for one such segment, which remote rank holds
it and the matching offsets/length (in field *items*, e.g. nonants) in the
remote and local buffers.

This module is deliberately free of MPI and Pyomo: it consumes the
contiguous per-rank scenario partition (the ``slices`` produced by
``_ScenTree.scen_names_to_ranks``) plus a per-scenario item count, so it
can be unit tested directly. See
``doc/designs/flexible_rank_assignments.md`` (the "Overlap Maps" section).
"""

from dataclasses import dataclass


@dataclass
class OverlapSegment:
    """A single contiguous stretch of the local field buffer that is filled
    from exactly one remote rank's buffer.

    Every offset and length below is measured in field items, not bytes.
    Instances are intentionally mutable: ``compute_overlap_segments`` grows
    ``count`` in place while coalescing adjacent scenarios.
    """

    # Rank (within the peer cylinder) whose buffer supplies this stretch.
    remote_rank: int
    # Where the stretch begins inside that remote rank's buffer.
    remote_offset: int
    # Where the stretch lands inside the local buffer.
    local_offset: int
    # How many items the stretch spans.
    count: int


def compute_overlap_segments(local_scen_idxs, remote_slices, items_per_scen):
    """Build the list of segments that source a local field buffer from
    the buffers of a peer cylinder's ranks.

    Args:
        local_scen_idxs: the global scenario indices this (local) rank
            holds, in buffer order -- i.e. ``local_slices[local_rank]``.
        remote_slices: the peer cylinder's partition; entry ``rank`` is the
            ordered list of global scenario indices held by that remote
            rank (the ``slices`` output of ``scen_names_to_ranks`` for that
            cylinder). It fixes both the owning remote rank of each
            scenario and the scenario's offset inside that rank's buffer.
        items_per_scen: indexable by global scenario index; gives how many
            field items each scenario contributes (e.g.
            ``len(nonant_indices)``). Uniform two-stage problems use the
            same value for every scenario; multistage problems may vary.

    Returns:
        list[OverlapSegment]: segments that tile the local buffer in order.
        Consecutive scenarios are merged into one segment whenever they
        come from the same remote rank and are adjacent in its buffer.
        When the local scenarios coincide with one remote rank's slice the
        result is a single segment with ``remote_offset=0``,
        ``local_offset=0`` and ``count`` equal to all local items.

    Raises:
        KeyError: if a local scenario index does not appear anywhere in
            ``remote_slices``.
    """
    # Reverse lookup: global scenario index -> (owning remote rank, item
    # offset of the scenario inside that rank's buffer).
    remote_home = {}
    for owner, held in enumerate(remote_slices):
        running = 0
        for scen_idx in held:
            remote_home[scen_idx] = (owner, running)
            running += items_per_scen[scen_idx]

    runs = []
    write_pos = 0  # next free item slot in the local buffer
    for scen_idx in local_scen_idxs:
        src_rank, src_off = remote_home[scen_idx]
        width = items_per_scen[scen_idx]
        tail = runs[-1] if runs else None
        # Extend the open run only if this scenario comes from the same
        # remote rank AND sits immediately after the run in that buffer.
        extends_tail = (
            tail is not None
            and tail.remote_rank == src_rank
            and tail.remote_offset + tail.count == src_off
        )
        if extends_tail:
            tail.count += width
        else:
            runs.append(
                OverlapSegment(
                    remote_rank=src_rank,
                    remote_offset=src_off,
                    local_offset=write_pos,
                    count=width,
                )
            )
        write_pos += width
    return runs
26 changes: 23 additions & 3 deletions mpisppy/cylinders/spwindow.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,18 +178,38 @@ def free(self):
return

#### Functions ####
def get(self, dest: nptyping.ArrayLike, strata_rank: int, field: Field,
        item_offset: int = 0, item_count: int = None):
    """Read a remote rank's buffer for ``field`` into ``dest``.

    By default the whole padded field is transferred (``dest`` must be
    ``padded_len`` long), preserving the original behavior. When
    ``item_count`` is given, only that many doubles are read, starting
    ``item_offset`` doubles into the field -- a partial read used for
    multi-source assembly across cylinders with different rank counts
    (see ``overlap_map.py``). ``dest`` must then be ``item_count`` long.

    Args:
        dest: array that receives the data; its size must equal the
            number of doubles transferred.
        strata_rank: index into ``self.strata_buffer_layouts`` of the rank
            to read from.
        field: the field to read; must be present in that rank's layout.
        item_offset: start of a partial read, in doubles from the
            beginning of the field. Ignored when ``item_count`` is None.
        item_count: number of doubles for a partial read, or None to
            read the whole padded field.

    Raises:
        AssertionError: if the rank, field, requested range or ``dest``
            size is inconsistent with the stored layout.
    """
    assert (0 <= strata_rank < len(self.strata_buffer_layouts))

    that_layout = self.strata_buffer_layouts[strata_rank]
    assert field in that_layout

    (offset, logical_len, padded_len) = that_layout[field]

    if item_count is None:
        # Whole-field read: same transfer as before partial reads existed.
        count = padded_len
        disp = offset
    else:
        assert item_offset >= 0 and item_count >= 0
        assert item_offset + item_count <= padded_len, \
            f"{field=} partial get {item_offset=}+{item_count=} exceeds {padded_len=}"
        count = item_count
        disp = offset + item_offset
    # dest is sized to what is actually transferred, not always padded_len.
    assert np.size(dest) == count

    window = self.window
    window.Lock(strata_rank, MPI.LOCK_SHARED)
    try:
        window.Get((dest, count, MPI.DOUBLE), strata_rank, disp)
    finally:
        # Release the lock even if the Get raises.
        window.Unlock(strata_rank)
    return

Expand Down
22 changes: 22 additions & 0 deletions mpisppy/spin_the_wheel.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

from mpisppy.utils import nice_join
from mpisppy.utils.sputils import first_stage_nonant_writer, scenario_tree_solution_writer
from mpisppy.utils.rank_apportionment import apportion_ranks

class WheelSpinner:

Expand Down Expand Up @@ -103,6 +104,27 @@ def run(self, comm_world=None):

# Create the necessary communicators
fullcomm = comm_world

# Flexible rank assignments: each cylinder may request a rank ratio
# relative to the hub (default 1.0) via a "rank_ratio" dict key. The
# uniform case (all ratios 1.0) is handled exactly as before. A
# non-uniform request is apportioned (largest-remainder, floor of one)
# and reported, but building the communicators for unequal per-cylinder
# counts is not yet wired into the communicator/window layer.
rank_ratios = [d.get("rank_ratio", 1.0) for d in communicator_list]
if any(r != 1.0 for r in rank_ratios):
rank_counts = apportion_ranks(rank_ratios, fullcomm.Get_size())
global_toc(
f"Requested per-cylinder rank ratios {rank_ratios} -> "
f"rank counts {rank_counts}",
fullcomm.Get_rank() == 0,
)
raise NotImplementedError(
"Per-cylinder rank counts (flexible rank assignments) are not "
f"yet wired into the communicator layer; computed allocation "
f"{rank_counts}. Run with all rank_ratio == 1.0 for now."
)

strata_comm, cylinder_comm = _make_comms(n_spcomms, fullcomm=fullcomm)
strata_rank = strata_comm.Get_rank()
cylinder_rank = cylinder_comm.Get_rank()
Expand Down
75 changes: 75 additions & 0 deletions mpisppy/tests/test_flexible_rank_gate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
###############################################################################
# mpi-sppy: MPI-based Stochastic Programming in PYthon
#
# Copyright (c) 2024, Lawrence Livermore National Security, LLC, Alliance for
# Sustainable Energy, LLC, The Regents of the University of California, et al.
# All rights reserved. Please see the files COPYRIGHT.md and LICENSE.md for
# full copyright and license information.
###############################################################################
"""WheelSpinner reads per-cylinder rank_ratio dict keys and, when any is
non-default, apportions and reports the allocation then raises (the live
unequal-rank communicator path is not yet wired in). The uniform default
path is exercised by the cylinder test suite, not here.

The gate fires before any communicator is built, so a stub comm with just
Get_size/Get_rank is enough to drive it without MPI."""

import unittest

from mpisppy.spin_the_wheel import WheelSpinner


class _StubComm:
    """Stand-in communicator exposing only ``Get_size`` and ``Get_rank``,
    which is all the rank-ratio gate asks of its comm."""

    def __init__(self, size, rank=0):
        # Fixed at construction; the stub never changes size or rank.
        self._size, self._rank = size, rank

    def Get_size(self):
        """Return the size given at construction."""
        return self._size

    def Get_rank(self):
        """Return the rank given at construction (0 unless overridden)."""
        return self._rank


def _min_dict(role, **extra):
    """Return the smallest cylinder dict WheelSpinner accepts before the gate.

    Only the ``<role>_class`` and ``opt_class`` keys are filled in (with
    ``object`` as a placeholder, since the classes are never instantiated
    on this path); any keyword arguments are layered on top and override
    the placeholders on a key clash.
    """
    return {f"{role}_class": object, "opt_class": object, **extra}


class TestFlexibleRankGate(unittest.TestCase):
    """Drive ``WheelSpinner.run`` with stub communicators to check how the
    per-cylinder ``rank_ratio`` gate reacts to different ratio requests."""

    @staticmethod
    def _spinner(hub_ratio, *spoke_ratios):
        # One hub dict plus one spoke dict per requested spoke ratio.
        hub_dict = _min_dict("hub", rank_ratio=hub_ratio)
        spoke_dicts = [_min_dict("spoke", rank_ratio=r) for r in spoke_ratios]
        return WheelSpinner(hub_dict, spoke_dicts)

    def test_nonuniform_ratio_raises_not_implemented(self):
        wheel = self._spinner(1.0, 0.5)
        self.assertRaises(
            NotImplementedError, wheel.run, comm_world=_StubComm(size=4)
        )

    def test_nonuniform_with_too_few_ranks_raises_value_error(self):
        # Three cylinders on two ranks: even one rank each is infeasible,
        # so apportion_ranks raises ValueError before the
        # NotImplementedError gate is reached.
        wheel = self._spinner(1.0, 0.5, 0.5)
        self.assertRaises(
            ValueError, wheel.run, comm_world=_StubComm(size=2)
        )

    def test_explicit_uniform_ratios_pass_the_gate(self):
        # With every ratio at 1.0 the gate is skipped, so run() proceeds to
        # _make_comms, which fails on the stub (it has no Split). Any
        # exception other than NotImplementedError shows the gate stayed
        # quiet.
        wheel = self._spinner(1.0, 1.0)
        with self.assertRaises(Exception) as caught:
            wheel.run(comm_world=_StubComm(size=2))
        self.assertNotIsInstance(caught.exception, NotImplementedError)


if __name__ == "__main__":
unittest.main()
115 changes: 115 additions & 0 deletions mpisppy/tests/test_overlap_map.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
###############################################################################
# mpi-sppy: MPI-based Stochastic Programming in PYthon
#
# Copyright (c) 2024, Lawrence Livermore National Security, LLC, Alliance for
# Sustainable Energy, LLC, The Regents of the University of California, et al.
# All rights reserved. Please see the files COPYRIGHT.md and LICENSE.md for
# full copyright and license information.
###############################################################################
"""Unit tests for mpisppy.cylinders.overlap_map.compute_overlap_segments."""

import unittest

from mpisppy.cylinders.overlap_map import OverlapSegment, compute_overlap_segments


def _seg(remote_rank, remote_offset, local_offset, count):
    """Shorthand for building the expected ``OverlapSegment`` in a test."""
    return OverlapSegment(
        remote_rank=remote_rank,
        remote_offset=remote_offset,
        local_offset=local_offset,
        count=count,
    )


class TestOverlapMap(unittest.TestCase):
    """Checks ``compute_overlap_segments`` on equal, fewer and more remote
    ranks, with uniform and per-scenario-varying item counts."""

    def _assert_tiles_local(self, segments, local_scen_idxs, items_per_scen):
        # The segments have to cover the local buffer back to back starting
        # at offset 0, and together span exactly the local item count.
        total_items = sum(items_per_scen[idx] for idx in local_scen_idxs)
        cursor = 0
        for segment in segments:
            self.assertEqual(segment.local_offset, cursor)
            cursor = cursor + segment.count
        self.assertEqual(cursor, total_items)

    def test_equal_ranks_is_identity_single_segment(self):
        # Both cylinders use the same partition, so each local rank reads
        # exactly one identity segment from the same-numbered remote rank.
        partition = [[0, 1], [2, 3], [4, 5], [6, 7]]
        unit_items = [1] * 8
        for rank, held in enumerate(partition):
            got = compute_overlap_segments(held, partition, unit_items)
            self.assertEqual(got, [_seg(rank, 0, 0, len(held))])

    def test_equal_ranks_identity_multi_item(self):
        # The identity result also holds when scenarios carry 2 items each.
        partition = [[0, 1], [2, 3]]
        two_each = [2] * 4
        got = compute_overlap_segments([2, 3], partition, two_each)
        self.assertEqual(got, [_seg(1, 0, 0, 4)])

    def test_fewer_remote_ranks_doc_example(self):
        # A 4-rank hub reads a 2-rank spoke: 10 scenarios, 1 item each,
        # spoke rank0 = scen0-4 and spoke rank1 = scen5-9.
        spoke_partition = [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]
        unit_items = [1] * 10
        cases = [
            # scen0,1 lie entirely within spoke rank 0.
            ([0, 1], [_seg(0, 0, 0, 2)]),
            # scen4,5 straddle the spoke split: scen4 from rank0 at
            # offset 4, then scen5 from rank1 at offset 0.
            ([4, 5], [_seg(0, 4, 0, 1), _seg(1, 0, 1, 1)]),
            # scen7,8,9 lie entirely within spoke rank 1 and coalesce.
            ([7, 8, 9], [_seg(1, 2, 0, 3)]),
        ]
        for held, expected in cases:
            self.assertEqual(
                compute_overlap_segments(held, spoke_partition, unit_items),
                expected,
            )

    def test_more_remote_ranks(self):
        # A 2-rank local cylinder reads from a 4-rank remote cylinder.
        remote_partition = [[0, 1], [2, 3, 4], [5, 6], [7, 8, 9]]
        unit_items = [1] * 10
        held = [0, 1, 2, 3, 4]
        got = compute_overlap_segments(held, remote_partition, unit_items)
        self.assertEqual(got, [_seg(0, 0, 0, 2), _seg(1, 0, 2, 3)])
        self._assert_tiles_local(got, held, unit_items)

    def test_coalesces_contiguous_run_within_remote_rank(self):
        remote_partition = [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]
        unit_items = [1] * 10
        got = compute_overlap_segments([2, 3, 4], remote_partition, unit_items)
        self.assertEqual(got, [_seg(0, 2, 0, 3)])

    def test_variable_items_per_scenario(self):
        # Multistage-like input: item counts differ between scenarios.
        widths = [1, 2, 3, 1, 2]
        remote_partition = [[0, 1, 2], [3, 4]]
        held = [1, 2]
        # Both local scenarios sit in remote rank 0. scen1 begins at remote
        # offset widths[0] == 1, and the merged run spans 2 + 3 == 5 items.
        got = compute_overlap_segments(held, remote_partition, widths)
        self.assertEqual(got, [_seg(0, 1, 0, 5)])
        self._assert_tiles_local(got, held, widths)

    def test_variable_items_across_remote_ranks(self):
        widths = [1, 2, 3, 1, 2]
        remote_partition = [[0, 1], [2, 3, 4]]
        held = [1, 2]
        # scen1 comes from remote rank 0 (offset 1, 2 items); scen2 comes
        # from remote rank 1 (offset 0, 3 items).
        got = compute_overlap_segments(held, remote_partition, widths)
        self.assertEqual(got, [_seg(0, 1, 0, 2), _seg(1, 0, 2, 3)])
        self._assert_tiles_local(got, held, widths)

    def test_single_scenario(self):
        remote_partition = [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]
        unit_items = [1] * 10
        got = compute_overlap_segments([6], remote_partition, unit_items)
        self.assertEqual(got, [_seg(1, 1, 0, 1)])


if __name__ == "__main__":
unittest.main()
Loading
Loading