From dd445078261c640c515dceabe244796db69ed9cf Mon Sep 17 00:00:00 2001 From: "Wesley H. Holliday" Date: Mon, 12 Jan 2026 09:02:30 -0800 Subject: [PATCH 1/2] Added sequential STV and tests of propositional methods --- pref_voting/proportional_methods.py | 635 ++++++++++++++++++++-- tests/test_proportional_methods.py | 788 ++++++++++++++++++++++++++++ 2 files changed, 1384 insertions(+), 39 deletions(-) create mode 100644 tests/test_proportional_methods.py diff --git a/pref_voting/proportional_methods.py b/pref_voting/proportional_methods.py index 9a5b92c4..260d559e 100644 --- a/pref_voting/proportional_methods.py +++ b/pref_voting/proportional_methods.py @@ -27,6 +27,7 @@ import itertools import collections import random +import warnings from pref_voting.weighted_majority_graphs import MarginGraph from pref_voting.margin_based_methods import minimax @@ -36,7 +37,7 @@ from pref_voting.profiles import Profile EPS = 1e-12 -TRACE = bool(int(os.environ.get("STV_TRACE", "0") or "0")) +TRACE = os.environ.get("STV_TRACE", "").strip().lower() in {"1", "true", "yes", "y", "t"} def _t(msg): if TRACE: @@ -382,8 +383,10 @@ def _eliminate_lowest(pieces, continuing, parcels, tie_break_key=None): if not lowest: return None, pieces if len(lowest) > 1: + # tie_break_key interpretation: lower value = higher priority = survives + # So we eliminate the candidate with the HIGHEST key value (reverse sort) key = tie_break_key or (lambda x: x) - lowest.sort(key=key) + lowest.sort(key=key, reverse=True) elim = lowest[0] continuing.remove(elim) @@ -583,7 +586,8 @@ def eliminate_and_transfer(elim, continuing, parcels): if len(tied) > 1: chosen = history_prefer(tied, prefer="highest") if chosen is None: - chosen = rand.choice(tied) + # Sort for reproducibility with seeded RNG + chosen = rand.choice(sorted(tied)) else: chosen = tied[0] @@ -619,7 +623,8 @@ def eliminate_and_transfer(elim, continuing, parcels): max_s = max(tall_now[c] - float(quota) for c in surplusers) tied = [c for c in surplusers if abs((tall_now[c]-float(quota)) - max_s) <= EPS] if len(tied) > 1: - chosen = history_prefer(tied, prefer="highest") or rand.choice(tied) + # Sort for reproducibility with seeded RNG + chosen = history_prefer(tied, prefer="highest") or rand.choice(sorted(tied)) else: chosen = tied[0] @@ -644,7 +649,8 @@ def eliminate_and_transfer(elim, continuing, parcels): if len(lowest) > 1: elim = history_prefer(lowest, prefer="lowest") if elim is None: - elim = rand.choice(lowest) # decide by lot if tied at all previous stages + # Sort for reproducibility with seeded RNG + elim = rand.choice(sorted(lowest)) else: elim = lowest[0] @@ -776,17 +782,10 @@ def stv_nb(profile, num_seats = 2, curr_cands=None, quota_rule="nb", mann_strict drain_all=drain_all, last_parcel_only=False, ers_rounding=ers_rounding ) _t(f"Transfer surplus from {elect}: moved={moved}") - if moved: - # After each transfer, check if any continuing candidate now meets quota. - # If so, break out to let the main loop deem them elected before further transfers. - tallies_after = _tally_from_pieces(pieces, restrict_to=continuing) - newly_elected = [c for c in continuing - if (tallies_after.get(c, 0.0) > quota + EPS if mann_strict - else tallies_after.get(c, 0.0) >= quota - EPS)] - if newly_elected: - break # Go back to main loop to elect them - else: + if not moved: stuck.add(elect) + # Continue transferring all surpluses from elected_this_round before + # checking for new winners (standard STV behavior) if len(continuing) <= num_seats - len(winners): winners.extend(sorted(continuing)) @@ -903,15 +902,10 @@ def stv_wig(profile, num_seats=2, curr_cands=None, quota_rule="nb", tie_break_ke pieces, elect, quota, recipients=continuing, parcels=parcels, drain_all=True, last_parcel_only=False ) - if moved: - # After each transfer, check if any continuing candidate now meets quota. - # If so, break out to let the main loop deem them elected before further transfers. - tallies_after = _tally_from_pieces(pieces, restrict_to=continuing) - newly_elected = [c for c in continuing if tallies_after.get(c, 0.0) >= quota - EPS] - if newly_elected: - break # Go back to main loop to elect them - else: + if not moved: stuck.add(elect) + # Continue transferring all surpluses from elected_this_round before + # checking for new winners (standard STV behavior) # If remaining candidates equal remaining seats, elect them all if len(continuing) <= num_seats - len(winners): @@ -1016,13 +1010,6 @@ def stv_last_parcel(profile, num_seats = 2, curr_cands=None, quota_rule="nb", ti drain_all=True, last_parcel_only=True ) _t(f"[LP] Transfer surplus (last parcel) from {c}: moved={moved}") - if moved: - # After each transfer, check if any continuing candidate now meets quota. - # If so, break out to let the main loop deem them elected before further transfers. - tallies_after = _tally_from_pieces(pieces, restrict_to=continuing) - newly_elected = [c2 for c2 in continuing if tallies_after.get(c2, 0.0) >= quota - EPS] - if newly_elected: - break # Go back to main loop to elect them continue if len(continuing) <= num_seats - len(winners): @@ -1235,7 +1222,9 @@ def stv_meek(profile, num_seats=2, curr_cands=None, tol=1e-10, max_iter=2000, ti if newly_elected: # Elect candidates that reached quota (highest tally first) - for c in sorted(newly_elected, key=lambda x: (-tallies.get(x, 0.0), x)): + # Use tie_break_key for secondary sort (when tallies are equal) + key_fn = tie_break_key or (lambda x: x) + for c in sorted(newly_elected, key=lambda x: (-tallies.get(x, 0.0), key_fn(x))): if len(elected) >= num_seats: break hopeful.remove(c) @@ -1246,11 +1235,12 @@ def stv_meek(profile, num_seats=2, curr_cands=None, tol=1e-10, max_iter=2000, ti keep[c] = quota / t _t(f"[Meek] Elect: {c} (t={t:.6f}, quota={quota:.6f})") continue - + # No one elected - check if we can fill remaining seats with hopeful candidates if len(hopeful) <= num_seats - len(elected): # Elect all remaining hopeful candidates - for c in sorted(hopeful): + key_fn = tie_break_key or (lambda x: x) + for c in sorted(hopeful, key=key_fn): elected.add(c) break @@ -1267,11 +1257,13 @@ def stv_meek(profile, num_seats=2, curr_cands=None, tol=1e-10, max_iter=2000, ti lowest = [c] elif abs(t - min_t) <= EPS: lowest.append(c) - + if len(lowest) > 1: + # tie_break_key interpretation: lower value = higher priority = survives + # So we eliminate the candidate with the HIGHEST key value (reverse sort) key = tie_break_key or (lambda x: x) - lowest.sort(key=key) - + lowest.sort(key=key, reverse=True) + elim = lowest[0] hopeful.remove(elim) keep[elim] = 0.0 @@ -1610,9 +1602,11 @@ def stv_warren(profile, num_seats=2, curr_cands=None, tol=1e-10, max_iter=2000, lowest.append(c) if len(lowest) > 1: + # tie_break_key interpretation: lower value = higher priority = survives + # So we eliminate the candidate with the HIGHEST key value (reverse sort) key = tie_break_key or (lambda x: x) - lowest.sort(key=key) - + lowest.sort(key=key, reverse=True) + elim = lowest[0] hopeful.remove(elim) _t(f"[Warren] Eliminate: {elim} (t={min_t:.6f})") @@ -2022,4 +2016,567 @@ def cpo_stv(profile, num_seats = 2, curr_cands=None, inpair_surplus="meek", fall return sorted(winners_list[0]) # If multiple tied winners, choose one randomly - return sorted(rand.choice(winners_list)) \ No newline at end of file + return sorted(rand.choice(winners_list)) + + +# ---------- Sequential STV ---------- + +def _stv_meek_with_elimination_order(profile, num_seats, candidates, by_order=None, tol=1e-10, max_iter=2000): + """ + Run Meek STV and return both winners and elimination order. + + Used by Sequential STV to determine the initial queue. This is Meek's method + with tracking of which candidates were excluded and in what order. + + Issue 20 requires all STV counts to use Meek's method. + + Args: + by_order: A function that returns a sortable key for a candidate. + Used for deterministic tie-breaking. Defaults to candidate index + in the candidates list. + + Returns: + tuple: (winners_set, exclusion_order_list) + """ + # Create stable ordering if not provided + if by_order is None: + cand_order = {c: i for i, c in enumerate(candidates)} + by_order = lambda c: cand_order.get(c, 0) + + # Handle both Profile and ProfileWithTies + if isinstance(profile, Profile): + profile = profile.to_profile_with_ties() + + hopeful = set(candidates) + elected = set() + exclusion_order = [] + + # Keep factors: hopeful=1, elected=adjusted, excluded=0 + keep = {c: 1.0 for c in candidates} + + # Calculate total weight from profile + rankings, rcounts = profile.rankings_counts + total_weight = sum(float(count) for count in rcounts) + if total_weight <= EPS or not hopeful or num_seats <= 0: + return set(), [] + + safety = 0 + while len(elected) < num_seats: + safety += 1 + if safety > 50000: + raise RuntimeError("_stv_meek_with_elimination_order: loop safety tripped") + + # Candidates still in the count (hopeful or elected) + active = hopeful | elected + if not active: + break + + # Iteratively adjust keep factors until convergence + for _ in range(max_iter): + tallies, excess = _meek_tally_from_profile(profile, keep, active) + + # Quota = (total_votes - excess) / (k+1) + usable = total_weight - excess + quota = usable / float(num_seats + 1) if usable > EPS else 0.0 + + changed = False + + # Adjust keep factors for ELECTED candidates to make their tally approach quota + for c in elected: + t = tallies.get(c, 0.0) + if t > tol and keep.get(c, 1.0) > 0.0: + new_keep = keep[c] * quota / t + new_keep = max(0.0, min(1.0, new_keep)) + if abs(keep[c] - new_keep) > tol: + keep[c] = new_keep + changed = True + + if not changed and elected: + max_deviation = max(abs(tallies.get(c, 0.0) - quota) for c in elected) + if max_deviation > 10 * tol: + changed = True + + if not changed: + break + + # After convergence, check if any HOPEFUL candidate has reached quota + tallies, excess = _meek_tally_from_profile(profile, keep, active) + usable = total_weight - excess + quota = usable / float(num_seats + 1) if usable > EPS else 0.0 + + newly_elected = [] + for c in list(hopeful): + t = tallies.get(c, 0.0) + if t >= quota - tol: + newly_elected.append(c) + + if newly_elected: + # Elect candidates that reached quota (highest tally first, by_order for ties) + for c in sorted(newly_elected, key=lambda x: (-tallies.get(x, 0.0), by_order(x))): + if len(elected) >= num_seats: + break + hopeful.remove(c) + elected.add(c) + t = tallies.get(c, 0.0) + if t > quota + tol: + keep[c] = quota / t + continue + + # No one elected - check if we can fill remaining seats with hopeful candidates + if len(hopeful) <= num_seats - len(elected): + elected |= hopeful + hopeful.clear() # Clear to avoid adding to exclusion_order below + break + + # Exclude the hopeful candidate with the lowest tally + if not hopeful: + break + + min_t = float('inf') + lowest = [] + for c in hopeful: + t = tallies.get(c, 0.0) + if t < min_t - EPS: + min_t = t + lowest = [c] + elif abs(t - min_t) <= EPS: + lowest.append(c) + + # Tie-break using by_order (deterministic) + # Convention: lower value = higher priority = survives + # So we eliminate the candidate with the HIGHEST key value (reverse sort) + lowest.sort(key=by_order, reverse=True) + elim = lowest[0] + + hopeful.remove(elim) + keep[elim] = 0.0 + exclusion_order.append(elim) + + # Ensure ALL non-elected candidates are in the exclusion order. + # Some candidates may still be "hopeful" when the count ends (e.g., if enough + # candidates reached quota early). These should be added to the exclusion order + # in reverse-tally order (strongest last, making them the runner-up). + if hopeful: + # Get final tallies for remaining hopeful candidates + active = hopeful | elected + final_tallies, _ = _meek_tally_from_profile(profile, keep, active) + # Sort remaining hopeful by tally (lowest first = weakest first), by_order for ties + remaining = sorted(hopeful, key=lambda c: (final_tallies.get(c, 0.0), by_order(c))) + exclusion_order.extend(remaining) + + return elected, exclusion_order + + +def _calculate_borda_scores(profile, continuing_candidates): + """ + Calculate Borda scores for continuing candidates in Sequential STV. + + Per Issue 20: "a Borda score is calculated, as the sum over all votes of the + number of continuing candidates to whom the candidate in question is preferred, + taking all unmentioned continuing candidates as equal in last place. A continuing + candidate who is not mentioned in a particular vote is given, for that vote, the + average score that would have been attained by all those unmentioned. In practice + it can help to give 2 points instead of 1 for each candidate beaten, because all + scores, including any averages required, are then whole numbers." + + Implementation details: + - We use 2 points per candidate beaten (as suggested in the paper) + - For tied candidates (whether explicitly tied or unranked), we apply the + averaging principle: each gets the average score as if they weren't tied + - The formula for a tied group of size s with b candidates strictly below: + score = 2 * b + (s - 1) + This equals 2*b (for beating b candidates) plus the average among s tied + candidates: avg(0, 2, 4, ..., 2(s-1)) = s - 1 + + Returns: + dict: Mapping from candidate to Borda score + """ + continuing = list(continuing_candidates) + scores = {c: 0 for c in continuing} + + rankings, rcounts = profile.rankings_counts + + for ranking, count in zip(rankings, rcounts): + rmap = ranking.rmap # candidate -> rank, None if unranked + + # Group continuing candidates by their rank on this ballot + groups = collections.defaultdict(list) # rank -> list of candidates at that rank + unranked = [] + for c in continuing: + r = rmap.get(c) + if r is None: + unranked.append(c) + else: + groups[r].append(c) + + # Unranked candidates are treated as tied in last place. + # Per the paper, they get the average score: if m are unranked, each gets + # the average of {0, 2, 4, ..., 2(m-1)} = (m-1) points. + m = len(unranked) + if m: + unrank_score = m - 1 + for c in unranked: + scores[c] += unrank_score * count + + # Process ranked candidates from lowest rank (worst) to highest (best). + # Track how many continuing candidates are strictly below the current position. + below = m # Start with unranked candidates below + for r in sorted(groups.keys(), reverse=True): + g = groups[r] + s = len(g) + # Each candidate in this group beats 'below' candidates strictly, + # and ties with (s-1) others in the same group. + # Score = 2 * below + (s - 1), where (s-1) is the tie averaging. + group_score = 2 * below + (s - 1) + for c in g: + scores[c] += group_score * count + below += s # This group is now "below" for higher-ranked candidates + + return scores + + +@vm(name="Sequential-STV", input_types=[ElectionTypes.PROFILE, ElectionTypes.PROFILE_WITH_TIES]) +def sequential_stv(profile, num_seats=2, curr_cands=None, max_iterations=1000): + """ + Sequential STV as described in Voting Matters Issue 20 (2005 revision). + + Reference: https://www.votingmatters.org.uk/ISSUE20/I20P2.PDF + Original: https://www.votingmatters.org.uk/ISSUE15/P4.HTM + + Sequential STV addresses the "premature exclusion" problem in regular STV by + systematically testing whether any non-elected candidate could replace an elected + one. It seeks to find a set of n candidates that observes Droop Proportionality + and is preferred by the largest majority of voters to any other possible set. + + Algorithm: + + **Phase 1 - Initialization:** + Run an initial STV count (using Meek's method) to classify candidates as + "probables" (would-be winners) or put them in a queue in reverse exclusion + order, with the runner-up moved to the end. + + **Phase 2 - Challenge Loop:** + For each challenger (head of queue), run STV with n+1 candidates for n seats. + If challenger succeeds, they become a probable and the beaten candidate goes + to the end of the queue. If challenger fails (including ties), they go to + the end of the queue. Continue until a complete run through the queue with + no successful challenger (stable solution) or a loop is detected. + + **Phase 3 - Loop Detection:** + - Certain loop: Same probables set recurs with identical queue order + - Possible loop: Same probables set with different queue; second chance given, + but if it recurs again, treat as certain loop + + **Phase 4 - Loop Resolution:** + Exclude all candidates who have never been a probable since the last restart, + then restart retaining existing probables and queue. If no candidate can be + excluded, use the Borda score special procedure to exclude one at-risk + candidate, then restart. + + All STV counts use Meek's method. + + Args: + profile: A Profile or ProfileWithTies object containing voter rankings + num_seats (int): Number of seats to fill + curr_cands: List of candidates to consider, defaults to all candidates in profile + max_iterations (int): Maximum iterations before giving up (prevents infinite loops) + + Returns: + list: List of elected candidates + + .. note:: + In the single-winner case (num_seats=1), Sequential STV is Condorcet-consistent: + if there is a Condorcet winner (a candidate who beats every other candidate + head-to-head), Sequential STV will elect them. This distinguishes Sequential + STV from plain IRV, which can fail to elect a Condorcet winner due to + premature exclusion. + + .. note:: + This implementation has been verified against the Issue 20 specification. + The Borda special procedure (used for loop resolution) extends the paper's + averaging rule for unranked candidates to also apply to explicitly tied + candidates, providing a natural generalization to ProfileWithTies. + """ + if isinstance(profile, Profile): + profile = profile.to_profile_with_ties() + + # Sort candidates to ensure deterministic ordering regardless of iteration order + # from profile.candidates (which may be a set with unstable iteration). + candidates_list = sorted(profile.candidates) if curr_cands is None else sorted(curr_cands) + + if num_seats <= 0 or len(candidates_list) == 0: + return [] + + if num_seats >= len(candidates_list): + return candidates_list # Already sorted + + # Since candidates are numeric, use identity as the deterministic ordering. + # This is simpler and gives the same result as building a map from sorted order. + def by_order(c): + """Sort key using candidate's numeric value for deterministic comparisons.""" + return c + + # Phase 1: Run initial STV (Meek) to get probables and exclusion order + # Issue 20 requires all STV counts to use Meek's method + probables_set, exclusion_order = _stv_meek_with_elimination_order( + profile, num_seats, candidates_list, by_order=by_order + ) + probables = sorted(probables_set, key=by_order) + + if not exclusion_order: + return probables + + # Create queue in reverse exclusion order, with runner-up moved to end. + # Per Issue 20: "puts the others into a queue, in the reverse order of their + # exclusion in that STV count, except that the runner-up is moved to last place + # as it is already known that an initial challenge by that candidate will not succeed." + # Example: if exclusion_order = [A, B, C] (A excluded first, C=runner-up), + # then reversed = [C, B, A], and queue = [B, A, C] (runner-up C at end). + reversed_order = list(reversed(exclusion_order)) + if len(reversed_order) > 1: + runner_up = reversed_order[0] + queue = collections.deque(reversed_order[1:] + [runner_up]) + else: + queue = collections.deque(reversed_order) + + _t(f"[Sequential-STV] Initial probables: {probables}") + _t(f"[Sequential-STV] Exclusion order: {exclusion_order}") + _t(f"[Sequential-STV] Initial queue: {list(queue)}") + + # Track candidates who have ever been probable since last restart + ever_probable = set(probables) + + # Track candidates who have ALWAYS been probable since last restart + # (for determining "at-risk" candidates in special procedure) + always_probable = set(probables) + + # Track probables set occurrences for loop detection + # Key: frozenset of probables, Value: list of queue tuples seen with that probables set + probables_occurrences = collections.defaultdict(list) + probables_occurrences[frozenset(probables)].append(tuple(queue)) + + # Track if we've given a "second chance" for a probables set with different queue + second_chance_given = set() + + # Track the previous probables set to detect when it changes + last_probables = frozenset(probables) + + iterations = 0 + challenges_since_change = 0 + + while iterations < max_iterations: + iterations += 1 + + # Check if we've completed a full cycle without changes (stable solution) + if challenges_since_change >= len(queue): + _t(f"[Sequential-STV] Stable solution found at iteration {iterations}") + return probables + + if not queue: + return probables + + # Get the next challenger + challenger = queue.popleft() + + # Run STV (Meek) with probables + challenger competing for num_seats. + # Per Issue 20: "Should a tie occur during these rounds, between a probable + # and a challenger, it is resolved by maintaining the current situation; + # that is to say, the challenger has not succeeded." + # We implement this by using a tie_break_key that gives probables higher + # priority (lower key value), so challengers lose ties. + contest_cands = probables + [challenger] + + def tie_break_favoring_probables(c): + # tie_break_key interpretation: lower value = higher priority + # - In elections: lower key = elected first (wins) + # - In eliminations: lower key = survives (higher priority) + # Issue 20 tie rule: challenger loses ties → give probables lower key + if c == challenger: + return (1, by_order(c)) # Challenger has lower priority (loses ties) + else: + return (0, by_order(c)) # Probables have higher priority (win ties) + + winners = stv_meek( + profile, num_seats=num_seats, curr_cands=contest_cands, + tie_break_key=tie_break_favoring_probables + ) + winners_set = set(winners) + + # n+1-for-n invariant: expect exactly num_seats winners and 1 loser + # If outcome is ambiguous (wrong number of winners), maintain status quo + contest_set = set(contest_cands) + losers = contest_set - winners_set + + if len(winners_set) != num_seats or len(losers) != 1: + # Ambiguous outcome - treat as challenger failed (status quo per Issue 20) + _t(f"[Sequential-STV] Ambiguous challenge result, maintaining status quo") + queue.append(challenger) + challenges_since_change += 1 + elif challenger in winners_set: + # Challenger succeeded - find the unique loser + loser = next(iter(losers)) # Exactly one loser + probables = sorted(winners_set, key=by_order) + queue.append(loser) + challenges_since_change = 0 + ever_probable.add(challenger) + # Update always_probable: intersect with new probables + always_probable = always_probable & winners_set + _t(f"[Sequential-STV] {challenger} displaces {loser}") + else: + # Challenger failed + queue.append(challenger) + challenges_since_change += 1 + _t(f"[Sequential-STV] {challenger} fails to displace anyone") + + # Loop detection: only check when probables has CHANGED + # (Issue 20 loop detection is about returning to a prior state after changes, + # not about probables staying the same during failed challenges) + current_probables = frozenset(probables) + current_queue = tuple(queue) + + # Only do loop detection if probables changed this round + probables_changed = (current_probables != last_probables) + + loop_detected = False + if probables_changed and current_probables in probables_occurrences: + prev_queues = probables_occurrences[current_probables] + + if current_queue in prev_queues: + # Certain loop: same probables AND same queue order + _t(f"[Sequential-STV] Certain loop detected (same probables and queue)") + loop_detected = True + + elif current_probables in second_chance_given: + # Same probables set seen before with different queue, and we already + # gave a second chance - now treat as loop + _t(f"[Sequential-STV] Loop detected (same probables, second recurrence)") + loop_detected = True + + else: + # Same probables but different queue - give second chance + _t(f"[Sequential-STV] Possible loop (same probables, different queue) - second chance") + second_chance_given.add(current_probables) + + if loop_detected: + # Handle the loop - returns (is_final, result) tuple + is_final, result = _handle_sequential_stv_loop_2005( + profile, num_seats, probables, queue, ever_probable, always_probable + ) + if is_final: + # Final result - return winners + return result + # Restart: update probables if changed + if result is not None: + probables = result + # Reset tracking but keep probables and queue + ever_probable = set(probables) + always_probable = set(probables) + probables_occurrences.clear() + second_chance_given.clear() + challenges_since_change = 0 + last_probables = frozenset(probables) + probables_occurrences[last_probables].append(tuple(queue)) + continue + + # Update tracking + if probables_changed: + probables_occurrences[current_probables].append(current_queue) + last_probables = current_probables + + _t(f"[Sequential-STV] Max iterations reached") + warnings.warn( + f"Sequential STV reached max_iterations ({max_iterations}) without converging. " + "The result may be incomplete. Consider increasing max_iterations.", + RuntimeWarning + ) + return probables + + +def _handle_sequential_stv_loop_2005(profile, num_seats, probables, queue, + ever_probable, always_probable): + """ + Handle a detected loop in Sequential STV (2005 version). + + Loop resolution per the 2005 paper: + 1. Exclude all candidates who have never been a probable since last restart + 2. If no such candidates, use Borda score special procedure to exclude + one "at-risk" candidate (those not always probable) + 3. Restart with existing probables and queue + + Returns: + tuple: (is_final, result) where: + - is_final=True, result=winners_list: Election complete, return winners + - is_final=False, result=new_probables: Restart with updated probables + - is_final=False, result=None: Restart, probables unchanged (only queue changed) + """ + + all_in_contest = set(probables) | set(queue) + never_probable = all_in_contest - ever_probable + + if never_probable: + # Exclude all candidates who have never been probable + _t(f"[Sequential-STV] Excluding never-probables: {never_probable}") + + # Remove never-probables from the queue (mutate in place) + new_queue = [c for c in queue if c not in never_probable] + queue.clear() + queue.extend(new_queue) + + # Check if we're done + if len(probables) == num_seats and not queue: + return (True, list(probables)) # Final result + + return (False, None) # Restart, probables unchanged + + else: + # All candidates have been probable at some point - use Borda special procedure. + # Per Issue 20: "If there is no candidate who can be so excluded, then a special + # procedure is used, in which each continuing candidate, other than any who has + # always been a probable since the last restart, is classified as 'at-risk'." + _t(f"[Sequential-STV] All candidates have been probable, using Borda special procedure") + + # At-risk candidates: those not ALWAYS probable since last restart + at_risk = all_in_contest - always_probable + + if not at_risk: + # Everyone has always been probable - just return current probables + _t(f"[Sequential-STV] No at-risk candidates, returning current probables") + return (True, list(probables)) # Final result + + # Calculate Borda scores for all continuing candidates + continuing = list(probables) + list(queue) + scores = _calculate_borda_scores(profile, continuing) + + # Find the at-risk candidate with the lowest Borda score + # Use candidate number for deterministic tie-breaking among equal scores + at_risk_scores = [(scores[c], c) for c in at_risk] + at_risk_scores.sort() # Sort by score, then by candidate number + _, to_exclude = at_risk_scores[0] + + _t(f"[Sequential-STV] Borda scores: {scores}") + _t(f"[Sequential-STV] Excluding at-risk candidate with lowest score: {to_exclude}") + + if to_exclude in probables: + # Excluded candidate was a probable - head of queue becomes probable + new_probables = [c for c in probables if c != to_exclude] + if queue: + new_probable = queue.popleft() + new_probables = sorted(new_probables + [new_probable]) + _t(f"[Sequential-STV] {new_probable} promoted to probable") + + # Check if we're done + if len(new_probables) == num_seats and not queue: + return (True, new_probables) # Final result + + return (False, new_probables) # Restart with updated probables + else: + # Excluded candidate was in queue - just remove from queue (mutate in place) + new_queue = [c for c in queue if c != to_exclude] + queue.clear() + queue.extend(new_queue) + + # Check if we're done + if len(probables) == num_seats and not queue: + return (True, list(probables)) # Final result + + return (False, None) # Restart, probables unchanged \ No newline at end of file diff --git a/tests/test_proportional_methods.py b/tests/test_proportional_methods.py new file mode 100644 index 00000000..87c502b2 --- /dev/null +++ b/tests/test_proportional_methods.py @@ -0,0 +1,788 @@ +""" +Tests for proportional voting methods (STV variants). + +These tests cover the main STV implementations including Scottish STV, Meek, Warren, +and other variants including Sequential STV. +""" + +from pref_voting.proportional_methods import ( + stv_scottish, stv_meek, stv_warren, stv_nb, stv_wig, + stv_last_parcel, approval_stv, cpo_stv, sequential_stv +) +from pref_voting.profiles import Profile +from pref_voting.profiles_with_ties import ProfileWithTies, Ranking + +import pytest + + +# ============================================================================ +# Fixtures +# ============================================================================ + +@pytest.fixture +def simple_profile(): + """A simple 3-candidate profile where candidate 0 is clearly preferred.""" + return Profile([ + [0, 1, 2], + [0, 2, 1], + [1, 0, 2], + ]) + + +@pytest.fixture +def party_blocs_profile(): + """ + Two party blocs: A-party (candidates 0,1) and B-party (candidates 2,3). + 6 voters prefer A-party, 4 voters prefer B-party. + For 2 seats, fair result should be 1 from each party. + """ + return Profile([ + [0, 1, 2, 3], # A-party voter + [0, 1, 2, 3], + [0, 1, 2, 3], + [1, 0, 2, 3], # A-party voter (different order) + [1, 0, 2, 3], + [1, 0, 2, 3], + [2, 3, 0, 1], # B-party voter + [2, 3, 0, 1], + [3, 2, 0, 1], # B-party voter (different order) + [3, 2, 0, 1], + ]) + + +@pytest.fixture +def profile_with_clear_winner(): + """Profile where candidate 0 has overwhelming first-preference support.""" + return Profile([ + [0, 1, 2], + [0, 1, 2], + [0, 1, 2], + [0, 2, 1], + [0, 2, 1], + [1, 0, 2], + [2, 1, 0], + ]) + + +@pytest.fixture +def profile_for_transfers(): + """Profile designed to test surplus transfer mechanics.""" + return Profile([ + [0, 1, 2], + [0, 1, 2], + [0, 1, 2], + [0, 1, 2], + [0, 2, 1], + [1, 2, 0], + [2, 1, 0], + ]) + + +@pytest.fixture +def single_voter_profile(): + """Single voter - tests faithfulness property.""" + return Profile([[0, 1, 2, 3, 4]]) + + +@pytest.fixture +def profile_with_ties(): + """Profile with tied preferences (for ProfileWithTies).""" + return ProfileWithTies([ + Ranking({0: 1, 1: 2, 2: 3}), + Ranking({0: 1, 1: 2, 2: 3}), + Ranking({1: 1, 0: 2, 2: 3}), + Ranking({2: 1, 1: 2, 0: 3}), + ]) + + +# ============================================================================ +# Basic functionality tests +# ============================================================================ + +@pytest.mark.parametrize("method", [ + stv_scottish, stv_meek, stv_warren, stv_nb, stv_wig, stv_last_parcel +]) +def test_methods_return_correct_number_of_seats(method, simple_profile): + """All methods should return exactly num_seats winners.""" + result = method(simple_profile, num_seats=1) + assert len(result) == 1 + + result = method(simple_profile, num_seats=2) + assert len(result) == 2 + + +@pytest.mark.parametrize("method", [ + stv_scottish, stv_meek, stv_warren, stv_nb, stv_wig, stv_last_parcel +]) +def test_methods_elect_clear_favorite(method, profile_with_clear_winner): + """When one candidate has clear majority support, they should be elected.""" + result = method(profile_with_clear_winner, num_seats=1) + assert 0 in result, f"{method.name} should elect candidate 0 who has majority support" + + +@pytest.mark.parametrize("method", [ + stv_scottish, stv_meek, stv_warren, stv_nb, stv_wig, stv_last_parcel +]) +def test_methods_accept_profile_with_ties(method, profile_with_ties): + """All methods should accept ProfileWithTies input.""" + result = method(profile_with_ties, num_seats=2) + assert len(result) == 2 + assert all(c in [0, 1, 2] for c in result) + + +# ============================================================================ +# Proportionality tests +# ============================================================================ + +@pytest.mark.parametrize("method", [ + stv_meek, stv_warren, stv_nb, stv_wig, stv_last_parcel +]) +def test_proportionality_two_party(method, party_blocs_profile): + """ + With 60% A-party and 40% B-party voters electing 2 seats, + proportional methods should elect 1 from each party. + """ + result = method(party_blocs_profile, num_seats=2) + a_party_elected = len([c for c in result if c in [0, 1]]) + b_party_elected = len([c for c in result if c in [2, 3]]) + assert a_party_elected == 1, f"{method.name} should elect exactly 1 A-party candidate" + assert b_party_elected == 1, f"{method.name} should elect exactly 1 B-party candidate" + + +# ============================================================================ +# Faithfulness tests (single voter should get top-k) +# ============================================================================ + +@pytest.mark.parametrize("method", [ + stv_meek, stv_warren, stv_nb, stv_wig, stv_last_parcel +]) +def test_faithfulness_single_voter(method, single_voter_profile): + """ + With a single voter, the method should elect the voter's top-k candidates. + Note: Scottish STV does NOT satisfy this due to integer Droop quota mechanics. + """ + result = method(single_voter_profile, num_seats=3) + assert set(result) == {0, 1, 2}, f"{method.name} should elect voter's top 3 choices" + + +@pytest.mark.skip(reason="Scottish STV does not satisfy faithfulness due to integer Droop quota") +def test_scottish_faithfulness_single_voter(single_voter_profile): + """ + Scottish STV with single voter - documents expected behavior. + With 1 voter and k seats, quota = floor(1/(k+1)) + 1 = 1, + so no candidate reaches quota and eliminations determine result. + """ + result = stv_scottish(single_voter_profile, num_seats=3) + # This test is skipped because Scottish STV doesn't guarantee this + assert set(result) == {0, 1, 2} + + +# ============================================================================ +# Approval STV tests +# ============================================================================ + +def test_approval_stv_basic(profile_with_clear_winner): + """Approval STV should elect the clearly preferred candidate.""" + result = approval_stv(profile_with_clear_winner, num_seats=1) + assert 0 in result + + +def test_approval_stv_with_profile_with_ties(): + """Approval STV handles equal rankings (approval-style ballots).""" + # Voters approve multiple candidates equally + prof = ProfileWithTies([ + Ranking({0: 1, 1: 1, 2: 2}), # Approves 0 and 1 equally + Ranking({0: 1, 1: 1, 2: 2}), + Ranking({2: 1, 0: 2, 1: 2}), # Approves only 2 + ]) + result = approval_stv(prof, num_seats=2) + assert len(result) == 2 + + +# ============================================================================ +# CPO-STV tests +# ============================================================================ + +def test_cpo_stv_basic(simple_profile): + """CPO-STV should return the correct number of winners.""" + result = cpo_stv(simple_profile, num_seats=2) + assert len(result) == 2 + + +def test_cpo_stv_condorcet_committee(): + """ + CPO-STV should elect the Condorcet committee when one exists. + Profile where {0, 2} beats all other pairs pairwise. + """ + prof = Profile([ + [0, 2, 1, 3], + [0, 2, 1, 3], + [2, 0, 3, 1], + [2, 0, 3, 1], + [1, 3, 0, 2], + [3, 1, 2, 0], + ]) + result = cpo_stv(prof, num_seats=2) + # {0, 2} should be elected as they form the strongest pair + assert 0 in result or 2 in result # At minimum one of the Condorcet pair + + +# ============================================================================ +# Edge cases +# ============================================================================ + +@pytest.mark.parametrize("method", [ + stv_scottish, stv_meek, stv_warren, stv_nb, stv_wig, stv_last_parcel +]) +def test_all_seats_equals_candidates(method): + """When num_seats equals number of candidates, all should be elected.""" + prof = Profile([[0, 1, 2]]) + result = method(prof, num_seats=3) + assert set(result) == {0, 1, 2} + + +@pytest.mark.parametrize("method", [ + stv_scottish, stv_meek, stv_warren, stv_nb, stv_wig, stv_last_parcel +]) +def test_single_candidate_single_seat(method): + """Single candidate for single seat should be elected.""" + prof = Profile([[0]]) + result = method(prof, num_seats=1) + assert result == [0] + + +@pytest.mark.parametrize("method", [ + stv_scottish, stv_meek, stv_warren, stv_nb, stv_wig, stv_last_parcel +]) +def test_curr_cands_subset(method, simple_profile): + """Methods should respect curr_cands parameter.""" + result = method(simple_profile, num_seats=1, curr_cands=[1, 2]) + assert all(c in [1, 2] for c in result) + assert 0 not in result + + +@pytest.mark.parametrize("method", [ + stv_meek, stv_warren, stv_nb, stv_last_parcel +]) +def test_empty_election(method): + """Empty profile should return empty result for iterative methods.""" + prof = ProfileWithTies([], candidates=[0, 1, 2]) + result = method(prof, num_seats=2) + assert result == [] + + +def test_empty_election_scottish_wig(): + """ + Scottish STV and WIG may elect candidates with empty profile due to + 'if continuing candidates equal remaining seats, elect all' rule. + This tests they don't crash rather than checking specific behavior. + """ + prof = ProfileWithTies([], candidates=[0, 1, 2]) + result_scottish = stv_scottish(prof, num_seats=2) + result_wig = stv_wig(prof, num_seats=2) + # Just verify they return something without crashing + assert isinstance(result_scottish, list) + assert isinstance(result_wig, list) + + +@pytest.mark.parametrize("method", [ + stv_scottish, stv_meek, stv_warren, stv_nb, stv_wig, stv_last_parcel +]) +def test_zero_seats(method, simple_profile): + """Requesting 0 seats should return empty result.""" + result = method(simple_profile, num_seats=0) + assert result == [] + + +# ============================================================================ +# Determinism tests +# ============================================================================ + +@pytest.mark.parametrize("method", [ + stv_meek, stv_warren, stv_nb, stv_wig, stv_last_parcel +]) +def test_determinism(method, party_blocs_profile): + """Same input should produce same output (with same tie-breaking).""" + result1 = method(party_blocs_profile, num_seats=2) + result2 = method(party_blocs_profile, num_seats=2) + assert result1 == result2 + + +def test_scottish_determinism_with_fixed_rng(party_blocs_profile): + """Scottish STV is deterministic when given a fixed RNG for tie-breaking.""" + import random + rng1 = random.Random(42) + rng2 = random.Random(42) + result1 = stv_scottish(party_blocs_profile, num_seats=2, rng=rng1) + result2 = stv_scottish(party_blocs_profile, num_seats=2, rng=rng2) + assert result1 == result2 + + +# ============================================================================ +# Sequential STV tests +# ============================================================================ + +def test_sequential_stv_basic(): + """Sequential STV returns correct number of seats.""" + prof = Profile([ + [0, 1, 2, 3], + [0, 1, 2, 3], + [1, 0, 2, 3], + [2, 3, 0, 1], + [3, 2, 0, 1], + ]) + result = sequential_stv(prof, num_seats=2) + assert len(result) == 2 + + +def test_sequential_stv_condorcet_winner(): + """Sequential STV elects the Condorcet winner when num_seats=1.""" + # Candidate 1 is Condorcet winner: beats 0 (4-3) and beats 2 (4-3) + prof = Profile([ + [0, 1, 2], + [0, 1, 2], + [0, 1, 2], + [1, 2, 0], + [1, 2, 0], + [2, 1, 0], + [2, 1, 0], + ]) + result = sequential_stv(prof, num_seats=1) + assert result == [1], "Sequential STV should elect Condorcet winner" + + +def test_sequential_stv_condorcet_winner_larger(): + """Sequential STV finds Condorcet winner in larger candidate field.""" + # Candidate 2 beats everyone + prof = Profile([ + [2, 0, 1, 3, 4], + [2, 0, 1, 3, 4], + [2, 1, 0, 3, 4], + [0, 2, 1, 3, 4], + [1, 2, 0, 3, 4], + [3, 2, 0, 1, 4], + [4, 2, 0, 1, 3], + ]) + result = sequential_stv(prof, num_seats=1) + assert result == [2], "Sequential STV should elect Condorcet winner" + + +def test_sequential_stv_condorcet_cycle(): + """Sequential STV handles Condorcet cycles gracefully.""" + # Classic 3-way cycle: 0 > 1 > 2 > 0 + prof = Profile([ + [0, 1, 2], + [1, 2, 0], + [2, 0, 1], + ]) + result = sequential_stv(prof, num_seats=1) + # Should return some valid candidate, not crash + assert len(result) == 1 + assert result[0] in [0, 1, 2] + + +def test_sequential_stv_more_seats(): + """Sequential STV works correctly with more seats.""" + prof = Profile([ + [0, 1, 2, 3, 4], + [0, 1, 2, 3, 4], + [1, 0, 2, 3, 4], + [2, 3, 0, 1, 4], + [3, 2, 0, 1, 4], + [4, 3, 2, 1, 0], + ]) + result_2 = sequential_stv(prof, num_seats=2) + result_3 = sequential_stv(prof, num_seats=3) + assert len(result_2) == 2 + assert len(result_3) == 3 + + +def test_sequential_stv_universal_second_choice(): + """ + Sequential STV can elect a universal second-choice candidate. + + This is the key example from the Voting Matters paper where + Sequential STV differs from plain STV. + """ + # E (candidate 4) is everyone's second choice + prof = Profile([ + [0, 4, 1, 2, 3], # A>E>... + [0, 4, 1, 2, 3], + [0, 4, 1, 2, 3], + [1, 4, 0, 2, 3], # B>E>... + [1, 4, 0, 2, 3], + [1, 4, 0, 2, 3], + [2, 4, 0, 1, 3], # C>E>... + [2, 4, 0, 1, 3], + [2, 4, 0, 1, 3], + [3, 4, 0, 1, 2], # D>E>... + [3, 4, 0, 1, 2], + [3, 4, 0, 1, 2], + [4, 0, 1, 2, 3], # E>A>... + ]) + result = sequential_stv(prof, num_seats=2) + # E (candidate 4) should be elected due to broad second-choice support + assert 4 in result, "Sequential STV should elect universal second-choice candidate" + + +def test_sequential_stv_woodall_example1(): + """ + First example from Woodall's Voting Matters paper. + + 5 candidates (A=0, B=1, C=2, D=3, E=4), 2 seats. + In this scenario, both plain STV and Sequential STV elect B and C. + """ + # A=0, B=1, C=2, D=3, E=4 + # Use ProfileWithTies to handle truncated ballots (E not ranked by some voters) + prof = ProfileWithTies([ + Ranking({0: 1, 1: 2, 2: 3, 3: 4}), # 104 voters: A, B, C, D (E unranked) + Ranking({1: 1, 2: 2, 3: 3, 0: 4}), # 103 voters: B, C, D, A + Ranking({2: 1, 3: 2, 1: 3, 0: 4}), # 102 voters: C, D, B, A + Ranking({3: 1, 1: 2, 2: 3, 0: 4}), # 101 voters: D, B, C, A + Ranking({4: 1, 0: 2, 1: 3, 2: 4, 3: 5}), # 3 voters: E, A, B, C, D + Ranking({4: 1, 1: 2, 2: 3, 3: 4, 0: 5}), # 3 voters: E, B, C, D, A + Ranking({4: 1, 2: 2, 3: 3, 1: 4, 0: 5}), # 3 voters: E, C, D, B, A + Ranking({4: 1, 3: 2, 2: 3, 1: 4, 0: 5}), # 3 voters: E, D, C, B, A + ], rcounts=[104, 103, 102, 101, 3, 3, 3, 3], candidates=[0, 1, 2, 3, 4]) + + result = sequential_stv(prof, num_seats=2) + # Both STV and Sequential STV should elect B and C + assert len(result) == 2 + assert 1 in result, "B should be elected" + assert 2 in result, "C should be elected" + + +def test_sequential_stv_woodall_example2(): + """ + Second example from Woodall's Voting Matters paper. + + 5 candidates (A=0, B=1, C=2, D=3, E=4), 2 seats. + E is everyone's second choice. Plain STV elects B, C but + Sequential STV should elect B, E. + """ + # A=0, B=1, C=2, D=3, E=4 + # Modified so E is second choice for everyone + prof = Profile([ + [0, 4, 1, 2, 3], # 104 voters: A, E, B, C, D + [1, 4, 2, 3, 0], # 103 voters: B, E, C, D, A + [2, 4, 3, 1, 0], # 102 voters: C, E, D, B, A + [3, 4, 1, 2, 0], # 101 voters: D, E, B, C, A + [4, 0, 1, 2, 3], # 3 voters: E, A, B, C, D + [4, 1, 2, 3, 0], # 3 voters: E, B, C, D, A + [4, 2, 3, 1, 0], # 3 voters: E, C, D, B, A + [4, 3, 2, 1, 0], # 3 voters: E, D, C, B, A + ], rcounts=[104, 103, 102, 101, 3, 3, 3, 3]) + + result = sequential_stv(prof, num_seats=2) + # Sequential STV should recognize E's broad support + assert len(result) == 2 + assert 4 in result, "E should be elected due to universal second-choice support" + + +# ============================================================================ +# Sequential STV Edge Cases +# ============================================================================ + +def test_sequential_stv_single_candidate(): + """Sequential STV with only one candidate.""" + prof = Profile([[0], [0], [0]]) + result = sequential_stv(prof, num_seats=1) + assert result == [0] + + +def test_sequential_stv_num_seats_equals_candidates_minus_one(): + """Sequential STV when electing all but one candidate.""" + prof = Profile([ + [0, 1, 2, 3], + [1, 0, 2, 3], + [2, 3, 0, 1], + [3, 2, 1, 0], + ]) + result = sequential_stv(prof, num_seats=3) + assert len(result) == 3 + # Should elect 3 of the 4 candidates + assert len(set(result)) == 3 + + +def test_sequential_stv_all_tied_first_preferences(): + """Sequential STV when all candidates have equal first-preference votes.""" + prof = Profile([ + [0, 1, 2, 3], + [1, 2, 3, 0], + [2, 3, 0, 1], + [3, 0, 1, 2], + ]) + result = sequential_stv(prof, num_seats=2) + assert len(result) == 2 + # All candidates have equal first preferences, so result depends on + # tie-breaking and transfers + + +def test_sequential_stv_two_candidates_one_seat(): + """Sequential STV with just two candidates for one seat (simple majority).""" + prof = Profile([ + [0, 1], + [0, 1], + [0, 1], + [1, 0], + [1, 0], + ]) + result = sequential_stv(prof, num_seats=1) + assert result == [0], "Candidate with more first preferences should win" + + +# ============================================================================ +# Sequential STV Loop Detection +# ============================================================================ + +def test_sequential_stv_potential_loop(): + """ + Test that Sequential STV handles potential cycling scenarios. + + Create a scenario where challenges might cycle, and verify + the algorithm terminates with a valid result. + """ + # Rock-paper-scissors style preferences that might cause cycling + prof = Profile([ + [0, 1, 2], # A > B > C + [0, 1, 2], + [1, 2, 0], # B > C > A + [1, 2, 0], + [2, 0, 1], # C > A > B + [2, 0, 1], + ]) + result = sequential_stv(prof, num_seats=1) + # Should terminate and return exactly one winner + assert len(result) == 1 + assert result[0] in [0, 1, 2] + + +def test_sequential_stv_max_iterations(): + """Test that max_iterations parameter is respected.""" + prof = Profile([ + [0, 1, 2, 3], + [1, 2, 3, 0], + [2, 3, 0, 1], + [3, 0, 1, 2], + ]) + # Should complete within max_iterations + result = sequential_stv(prof, num_seats=2, max_iterations=100) + assert len(result) == 2 + + +# ============================================================================ +# Sequential STV with ProfileWithTies +# ============================================================================ + +def test_sequential_stv_profile_with_ties(): + """Sequential STV works with ProfileWithTies (ballots with tied rankings).""" + prof = ProfileWithTies([ + Ranking({0: 1, 1: 2, 2: 3}), # A > B > C + Ranking({0: 1, 1: 2, 2: 3}), + Ranking({1: 1, 0: 2, 2: 3}), # B > A > C + Ranking({2: 1, 1: 2, 0: 3}), # C > B > A + Ranking({0: 1, 1: 1, 2: 2}), # A = B > C (tie!) + ], candidates=[0, 1, 2]) + + result = sequential_stv(prof, num_seats=1) + assert len(result) == 1 + assert result[0] in [0, 1, 2] + + +def test_sequential_stv_truncated_ballots(): + """Sequential STV handles truncated ballots (not all candidates ranked).""" + prof = ProfileWithTies([ + Ranking({0: 1}), # Only ranks A + Ranking({0: 1}), + Ranking({1: 1}), # Only ranks B + Ranking({1: 1, 2: 2}), # B > C + Ranking({2: 1, 0: 2}), # C > A + ], candidates=[0, 1, 2]) + + result = sequential_stv(prof, num_seats=1) + assert len(result) == 1 + + +def test_sequential_stv_determinism(): + """Sequential STV produces consistent results across multiple runs.""" + prof = Profile([ + [0, 1, 2, 3], + [0, 1, 2, 3], + [1, 0, 2, 3], + [2, 3, 0, 1], + [3, 2, 0, 1], + ]) + + results = [tuple(sequential_stv(prof, num_seats=2)) for _ in range(5)] + # All runs should give identical results + assert all(r == results[0] for r in results), "Sequential STV should be deterministic" + + +def test_sequential_stv_challenger_tie_rule(): + """ + Test Issue 20 tie rule: if challenger ties with a probable, challenger loses. + + This test creates a scenario where a challenger ties with a probable + in the challenge round. Per Issue 20, the challenger should be treated + as having failed (status quo maintained). + + Profile: 2 candidates (A=0, B=1), 1 seat + - 2 voters: A > B + - 2 voters: B > A + + Initial Meek count: A and B tie at 2 votes each. With default tie-breaking + by candidate index, A (0) wins. So probables = [A], queue = [B]. + + Challenge with B: + Contest is A vs B for 1 seat. Both have 2 votes = quota (4/2 = 2). + This is a tie. Per Issue 20 tie rule, challenger B should lose, + maintaining A as the probable. + + Note: We're testing that the tie-break rule is applied correctly. + Without the rule, the default (lower index wins) would still give A. + The key is that Sequential STV uses the Issue 20 tie-break rule, + not the default, and the test verifies the result is correct. + """ + prof = Profile([ + [0, 1], # A > B + [0, 1], # A > B + [1, 0], # B > A + [1, 0], # B > A + ]) + + result = sequential_stv(prof, num_seats=1) + + # A should win - as the initial probable, ties go to status quo + assert 0 in result, ( + "Candidate A (0) should win: in a tie with challenger B, " + "the status quo (A as probable) should be maintained per Issue 20" + ) + + +def test_sequential_stv_borda_special_procedure(): + """ + Test that the Borda special procedure is triggered and works correctly. + + Per Issue 20, when a loop is detected and ALL candidates have been + probable at some point (so there are no "never-probables" to exclude), + the algorithm uses Borda scores to determine which "at-risk" candidate + to exclude. + + This profile creates a rock-paper-scissors dynamic: + - 2 voters: C > B > A + - 2 voters: B > A > C + - 1 voter: A > C > B + + This causes cycling where each candidate displaces another: + - Initial: C wins (A eliminated, C reaches quota) + - A challenges C: A wins (gets B>A>C transfers) + - B challenges A: B wins (gets C>B>A transfers) + - C challenges B: C wins (gets A>C>B transfer) + - Loop detected! All candidates have been probable. + + Borda scores: A=8, B=12, C=10 + The lowest-scoring at-risk candidate (A) is excluded via Borda procedure. + After A's exclusion, C beats B in the final challenge. + """ + prof = Profile([ + [2, 1, 0], # C > B > A + [2, 1, 0], # C > B > A + [1, 0, 2], # B > A > C + [1, 0, 2], # B > A > C + [0, 2, 1], # A > C > B + ]) + + result = sequential_stv(prof, num_seats=1) + + # Should terminate with exactly one winner + assert len(result) == 1 + + # C should win after Borda procedure excludes A (lowest Borda score) + # then C beats B in the remaining contest + assert result[0] == 2, ( + "Candidate C (2) should win: after Borda procedure excludes A (lowest score), " + "C beats B in the final challenge" + ) + + +def test_sequential_stv_borda_score_calculation(): + """ + Directly test the Borda score calculation used in the special procedure. + + Per Issue 20: "a Borda score is calculated, as the sum over all votes of the + number of continuing candidates to whom the candidate in question is preferred... + In practice it can help to give 2 points instead of 1 for each candidate beaten." + """ + from pref_voting.proportional_methods import _calculate_borda_scores + + # Test 1: Simple strict rankings (no ties) + # 2 voters: C > B > A + # 2 voters: B > A > C + # 1 voter: A > C > B + prof = Profile([ + [2, 1, 0], [2, 1, 0], # C > B > A (x2) + [1, 0, 2], [1, 0, 2], # B > A > C (x2) + [0, 2, 1], # A > C > B (x1) + ]) + prof_wt = prof.to_profile_with_ties() + scores = _calculate_borda_scores(prof_wt, [0, 1, 2]) + + # Manual calculation with 2 points per beat: + # Vote C>B>A (x2): C beats 2 (4pts), B beats 1 (2pts), A beats 0 (0pts) + # Vote B>A>C (x2): B beats 2 (4pts), A beats 1 (2pts), C beats 0 (0pts) + # Vote A>C>B (x1): A beats 2 (4pts), C beats 1 (2pts), B beats 0 (0pts) + # Totals: A = 0*2 + 2*2 + 4*1 = 8, B = 2*2 + 4*2 + 0*1 = 12, C = 4*2 + 0*2 + 2*1 = 10 + assert scores[0] == 8, f"A should have score 8, got {scores[0]}" + assert scores[1] == 12, f"B should have score 12, got {scores[1]}" + assert scores[2] == 10, f"C should have score 10, got {scores[2]}" + + # Test 2: Truncated ballot (unranked candidates get averaging) + # 1 voter ranks only A (B and C unmentioned = tied last) + # A beats both B and C: 4 points + # B and C unmentioned: average of {0, 2} = 1 point each (with 2x scaling) + prof2 = ProfileWithTies([Ranking({0: 1})], candidates=[0, 1, 2]) + scores2 = _calculate_borda_scores(prof2, [0, 1, 2]) + + assert scores2[0] == 4, f"A should have score 4, got {scores2[0]}" + assert scores2[1] == 1, f"B (unranked) should have score 1, got {scores2[1]}" + assert scores2[2] == 1, f"C (unranked) should have score 1, got {scores2[2]}" + + # Test 3: Explicit ties (extends averaging to explicit ties) + # 1 voter: A > {B, C} tied + # A beats both: 4 points + # B and C tied at rank 2: each gets 2*0 + (2-1) = 1 point (averaging) + prof3 = ProfileWithTies([Ranking({0: 1, 1: 2, 2: 2})], candidates=[0, 1, 2]) + scores3 = _calculate_borda_scores(prof3, [0, 1, 2]) + + assert scores3[0] == 4, f"A should have score 4, got {scores3[0]}" + assert scores3[1] == 1, f"B (tied) should have score 1, got {scores3[1]}" + assert scores3[2] == 1, f"C (tied) should have score 1, got {scores3[2]}" + + +# ============================================================================ +# Meek vs Warren difference test +# ============================================================================ + +def test_meek_warren_can_differ(): + """ + Test case from Hill & Warren paper where Meek and Warren can differ. + 4 candidates for 3 seats, 3 votes: 1 ABC, 1 BC, 1 BD. + Meek elects ABC, Warren gives C/D tie (resolved by index). + """ + prof = ProfileWithTies([ + Ranking({0: 1, 1: 2, 2: 3}), # ABC + Ranking({1: 1, 2: 2}), # BC + Ranking({1: 1, 3: 2}), # BD + ], candidates=[0, 1, 2, 3]) + + meek_result = stv_meek(prof, num_seats=3) + warren_result = stv_warren(prof, num_seats=3) + + # Both should elect A and B + assert 0 in meek_result and 1 in meek_result + assert 0 in warren_result and 1 in warren_result + + # Meek gives C an advantage via multiplicative transfer + # Warren gives C and D equal tallies (tie broken by index, so C wins) + # Both end up electing C in our implementation, but for different reasons + assert 2 in meek_result # C definitely wins under Meek From cca682c83b454adad870fdb6be524a511ee132d9 Mon Sep 17 00:00:00 2001 From: "Wesley H. Holliday" Date: Mon, 12 Jan 2026 09:21:05 -0800 Subject: [PATCH 2/2] Updated comments --- pref_voting/proportional_methods.py | 51 +++++++++++++---------------- 1 file changed, 23 insertions(+), 28 deletions(-) diff --git a/pref_voting/proportional_methods.py b/pref_voting/proportional_methods.py index 260d559e..31969cc5 100644 --- a/pref_voting/proportional_methods.py +++ b/pref_voting/proportional_methods.py @@ -2028,7 +2028,8 @@ def _stv_meek_with_elimination_order(profile, num_seats, candidates, by_order=No Used by Sequential STV to determine the initial queue. This is Meek's method with tracking of which candidates were excluded and in what order. - Issue 20 requires all STV counts to use Meek's method. + The Issue 20 Sequential STV paper (https://www.votingmatters.org.uk/ISSUE20/I20P2.PDF) + requires all STV counts to use Meek's method. Args: by_order: A function that returns a sortable key for a candidate. @@ -2171,7 +2172,8 @@ def _calculate_borda_scores(profile, continuing_candidates): """ Calculate Borda scores for continuing candidates in Sequential STV. - Per Issue 20: "a Borda score is calculated, as the sum over all votes of the + Per the Sequential STV paper (https://www.votingmatters.org.uk/ISSUE20/I20P2.PDF): + "a Borda score is calculated, as the sum over all votes of the number of continuing candidates to whom the candidate in question is preferred, taking all unmentioned continuing candidates as equal in last place. A continuing candidate who is not mentioned in a particular vote is given, for that vote, the @@ -2241,7 +2243,7 @@ def sequential_stv(profile, num_seats=2, curr_cands=None, max_iterations=1000): Sequential STV as described in Voting Matters Issue 20 (2005 revision). Reference: https://www.votingmatters.org.uk/ISSUE20/I20P2.PDF - Original: https://www.votingmatters.org.uk/ISSUE15/P4.HTM + Also see: https://www.votingmatters.org.uk/ISSUE15/P4.HTM Sequential STV addresses the "premature exclusion" problem in regular STV by systematically testing whether any non-elected candidate could replace an elected @@ -2271,7 +2273,9 @@ def sequential_stv(profile, num_seats=2, curr_cands=None, max_iterations=1000): Exclude all candidates who have never been a probable since the last restart, then restart retaining existing probables and queue. If no candidate can be excluded, use the Borda score special procedure to exclude one at-risk - candidate, then restart. + candidate, then restart. (The Borda special procedure implemented here extends + the paper's averaging rule for unranked candidates to also apply to explicitly tied candidates, + providing a generalization to ProfileWithTies.) All STV counts use Meek's method. @@ -2291,17 +2295,14 @@ def sequential_stv(profile, num_seats=2, curr_cands=None, max_iterations=1000): STV from plain IRV, which can fail to elect a Condorcet winner due to premature exclusion. - .. note:: - This implementation has been verified against the Issue 20 specification. - The Borda special procedure (used for loop resolution) extends the paper's - averaging rule for unranked candidates to also apply to explicitly tied - candidates, providing a natural generalization to ProfileWithTies. + .. warning:: + This implementation of Sequential STV has not yet been thoroughly vetted. + """ if isinstance(profile, Profile): profile = profile.to_profile_with_ties() # Sort candidates to ensure deterministic ordering regardless of iteration order - # from profile.candidates (which may be a set with unstable iteration). candidates_list = sorted(profile.candidates) if curr_cands is None else sorted(curr_cands) if num_seats <= 0 or len(candidates_list) == 0: @@ -2310,24 +2311,18 @@ def sequential_stv(profile, num_seats=2, curr_cands=None, max_iterations=1000): if num_seats >= len(candidates_list): return candidates_list # Already sorted - # Since candidates are numeric, use identity as the deterministic ordering. - # This is simpler and gives the same result as building a map from sorted order. - def by_order(c): - """Sort key using candidate's numeric value for deterministic comparisons.""" - return c - # Phase 1: Run initial STV (Meek) to get probables and exclusion order - # Issue 20 requires all STV counts to use Meek's method + # Issue 20 sequential STV paper requires all STV counts to use Meek's method probables_set, exclusion_order = _stv_meek_with_elimination_order( - profile, num_seats, candidates_list, by_order=by_order + profile, num_seats, candidates_list ) - probables = sorted(probables_set, key=by_order) + probables = sorted(probables_set) if not exclusion_order: return probables # Create queue in reverse exclusion order, with runner-up moved to end. - # Per Issue 20: "puts the others into a queue, in the reverse order of their + # Per Issue 20 Sequential STV paper: "puts the others into a queue, in the reverse order of their # exclusion in that STV count, except that the runner-up is moved to last place # as it is already known that an initial challenge by that candidate will not succeed." # Example: if exclusion_order = [A, B, C] (A excluded first, C=runner-up), @@ -2379,7 +2374,7 @@ def by_order(c): challenger = queue.popleft() # Run STV (Meek) with probables + challenger competing for num_seats. - # Per Issue 20: "Should a tie occur during these rounds, between a probable + # Per Issue 20 Sequential STV paper: "Should a tie occur during these rounds, between a probable # and a challenger, it is resolved by maintaining the current situation; # that is to say, the challenger has not succeeded." # We implement this by using a tie_break_key that gives probables higher @@ -2390,11 +2385,11 @@ def tie_break_favoring_probables(c): # tie_break_key interpretation: lower value = higher priority # - In elections: lower key = elected first (wins) # - In eliminations: lower key = survives (higher priority) - # Issue 20 tie rule: challenger loses ties → give probables lower key + # Sequential STV tie rule: challenger loses ties → give probables lower key if c == challenger: - return (1, by_order(c)) # Challenger has lower priority (loses ties) + return (1, c) # Challenger has lower priority (loses ties) else: - return (0, by_order(c)) # Probables have higher priority (win ties) + return (0, c) # Probables have higher priority (win ties) winners = stv_meek( profile, num_seats=num_seats, curr_cands=contest_cands, @@ -2408,14 +2403,14 @@ def tie_break_favoring_probables(c): losers = contest_set - winners_set if len(winners_set) != num_seats or len(losers) != 1: - # Ambiguous outcome - treat as challenger failed (status quo per Issue 20) + # Ambiguous outcome - treat as challenger failed (status quo per Issue 20 Sequential STV paper) _t(f"[Sequential-STV] Ambiguous challenge result, maintaining status quo") queue.append(challenger) challenges_since_change += 1 elif challenger in winners_set: # Challenger succeeded - find the unique loser loser = next(iter(losers)) # Exactly one loser - probables = sorted(winners_set, key=by_order) + probables = sorted(winners_set) queue.append(loser) challenges_since_change = 0 ever_probable.add(challenger) @@ -2429,7 +2424,7 @@ def tie_break_favoring_probables(c): _t(f"[Sequential-STV] {challenger} fails to displace anyone") # Loop detection: only check when probables has CHANGED - # (Issue 20 loop detection is about returning to a prior state after changes, + # (Sequential STV loop detection is about returning to a prior state after changes, # not about probables staying the same during failed challenges) current_probables = frozenset(probables) current_queue = tuple(queue) @@ -2530,7 +2525,7 @@ def _handle_sequential_stv_loop_2005(profile, num_seats, probables, queue, else: # All candidates have been probable at some point - use Borda special procedure. - # Per Issue 20: "If there is no candidate who can be so excluded, then a special + # Per Issue 20 Sequential STV paper: "If there is no candidate who can be so excluded, then a special # procedure is used, in which each continuing candidate, other than any who has # always been a probable since the last restart, is classified as 'at-risk'." _t(f"[Sequential-STV] All candidates have been probable, using Borda special procedure")