diff --git a/pref_voting/proportional_methods.py b/pref_voting/proportional_methods.py index 9a5b92c4..31969cc5 100644 --- a/pref_voting/proportional_methods.py +++ b/pref_voting/proportional_methods.py @@ -27,6 +27,7 @@ import itertools import collections import random +import warnings from pref_voting.weighted_majority_graphs import MarginGraph from pref_voting.margin_based_methods import minimax @@ -36,7 +37,7 @@ from pref_voting.profiles import Profile EPS = 1e-12 -TRACE = bool(int(os.environ.get("STV_TRACE", "0") or "0")) +TRACE = os.environ.get("STV_TRACE", "").strip().lower() in {"1", "true", "yes", "y", "t"} def _t(msg): if TRACE: @@ -382,8 +383,10 @@ def _eliminate_lowest(pieces, continuing, parcels, tie_break_key=None): if not lowest: return None, pieces if len(lowest) > 1: + # tie_break_key interpretation: lower value = higher priority = survives + # So we eliminate the candidate with the HIGHEST key value (reverse sort) key = tie_break_key or (lambda x: x) - lowest.sort(key=key) + lowest.sort(key=key, reverse=True) elim = lowest[0] continuing.remove(elim) @@ -583,7 +586,8 @@ def eliminate_and_transfer(elim, continuing, parcels): if len(tied) > 1: chosen = history_prefer(tied, prefer="highest") if chosen is None: - chosen = rand.choice(tied) + # Sort for reproducibility with seeded RNG + chosen = rand.choice(sorted(tied)) else: chosen = tied[0] @@ -619,7 +623,8 @@ def eliminate_and_transfer(elim, continuing, parcels): max_s = max(tall_now[c] - float(quota) for c in surplusers) tied = [c for c in surplusers if abs((tall_now[c]-float(quota)) - max_s) <= EPS] if len(tied) > 1: - chosen = history_prefer(tied, prefer="highest") or rand.choice(tied) + # Sort for reproducibility with seeded RNG + chosen = history_prefer(tied, prefer="highest") or rand.choice(sorted(tied)) else: chosen = tied[0] @@ -644,7 +649,8 @@ def eliminate_and_transfer(elim, continuing, parcels): if len(lowest) > 1: elim = history_prefer(lowest, prefer="lowest") if elim is None: - elim = rand.choice(lowest) # decide by lot if tied at all previous stages + # Sort for reproducibility with seeded RNG + elim = rand.choice(sorted(lowest)) else: elim = lowest[0] @@ -776,17 +782,10 @@ def stv_nb(profile, num_seats = 2, curr_cands=None, quota_rule="nb", mann_strict drain_all=drain_all, last_parcel_only=False, ers_rounding=ers_rounding ) _t(f"Transfer surplus from {elect}: moved={moved}") - if moved: - # After each transfer, check if any continuing candidate now meets quota. - # If so, break out to let the main loop deem them elected before further transfers. - tallies_after = _tally_from_pieces(pieces, restrict_to=continuing) - newly_elected = [c for c in continuing - if (tallies_after.get(c, 0.0) > quota + EPS if mann_strict - else tallies_after.get(c, 0.0) >= quota - EPS)] - if newly_elected: - break # Go back to main loop to elect them - else: + if not moved: stuck.add(elect) + # Continue transferring all surpluses from elected_this_round before + # checking for new winners (standard STV behavior) if len(continuing) <= num_seats - len(winners): winners.extend(sorted(continuing)) @@ -903,15 +902,10 @@ def stv_wig(profile, num_seats=2, curr_cands=None, quota_rule="nb", tie_break_ke pieces, elect, quota, recipients=continuing, parcels=parcels, drain_all=True, last_parcel_only=False ) - if moved: - # After each transfer, check if any continuing candidate now meets quota. - # If so, break out to let the main loop deem them elected before further transfers. - tallies_after = _tally_from_pieces(pieces, restrict_to=continuing) - newly_elected = [c for c in continuing if tallies_after.get(c, 0.0) >= quota - EPS] - if newly_elected: - break # Go back to main loop to elect them - else: + if not moved: stuck.add(elect) + # Continue transferring all surpluses from elected_this_round before + # checking for new winners (standard STV behavior) # If remaining candidates equal remaining seats, elect them all if len(continuing) <= num_seats - len(winners): @@ -1016,13 +1010,6 @@ def stv_last_parcel(profile, num_seats = 2, curr_cands=None, quota_rule="nb", ti drain_all=True, last_parcel_only=True ) _t(f"[LP] Transfer surplus (last parcel) from {c}: moved={moved}") - if moved: - # After each transfer, check if any continuing candidate now meets quota. - # If so, break out to let the main loop deem them elected before further transfers. - tallies_after = _tally_from_pieces(pieces, restrict_to=continuing) - newly_elected = [c2 for c2 in continuing if tallies_after.get(c2, 0.0) >= quota - EPS] - if newly_elected: - break # Go back to main loop to elect them continue if len(continuing) <= num_seats - len(winners): @@ -1235,7 +1222,9 @@ def stv_meek(profile, num_seats=2, curr_cands=None, tol=1e-10, max_iter=2000, ti if newly_elected: # Elect candidates that reached quota (highest tally first) - for c in sorted(newly_elected, key=lambda x: (-tallies.get(x, 0.0), x)): + # Use tie_break_key for secondary sort (when tallies are equal) + key_fn = tie_break_key or (lambda x: x) + for c in sorted(newly_elected, key=lambda x: (-tallies.get(x, 0.0), key_fn(x))): if len(elected) >= num_seats: break hopeful.remove(c) @@ -1246,11 +1235,12 @@ def stv_meek(profile, num_seats=2, curr_cands=None, tol=1e-10, max_iter=2000, ti keep[c] = quota / t _t(f"[Meek] Elect: {c} (t={t:.6f}, quota={quota:.6f})") continue - + # No one elected - check if we can fill remaining seats with hopeful candidates if len(hopeful) <= num_seats - len(elected): # Elect all remaining hopeful candidates - for c in sorted(hopeful): + key_fn = tie_break_key or (lambda x: x) + for c in sorted(hopeful, key=key_fn): elected.add(c) break @@ -1267,11 +1257,13 @@ def stv_meek(profile, num_seats=2, curr_cands=None, tol=1e-10, max_iter=2000, ti lowest = [c] elif abs(t - min_t) <= EPS: lowest.append(c) - + if len(lowest) > 1: + # tie_break_key interpretation: lower value = higher priority = survives + # So we eliminate the candidate with the HIGHEST key value (reverse sort) key = tie_break_key or (lambda x: x) - lowest.sort(key=key) - + lowest.sort(key=key, reverse=True) + elim = lowest[0] hopeful.remove(elim) keep[elim] = 0.0 @@ -1610,9 +1602,11 @@ def stv_warren(profile, num_seats=2, curr_cands=None, tol=1e-10, max_iter=2000, lowest.append(c) if len(lowest) > 1: + # tie_break_key interpretation: lower value = higher priority = survives + # So we eliminate the candidate with the HIGHEST key value (reverse sort) key = tie_break_key or (lambda x: x) - lowest.sort(key=key) - + lowest.sort(key=key, reverse=True) + elim = lowest[0] hopeful.remove(elim) _t(f"[Warren] Eliminate: {elim} (t={min_t:.6f})") @@ -2022,4 +2016,562 @@ def cpo_stv(profile, num_seats = 2, curr_cands=None, inpair_surplus="meek", fall return sorted(winners_list[0]) # If multiple tied winners, choose one randomly - return sorted(rand.choice(winners_list)) \ No newline at end of file + return sorted(rand.choice(winners_list)) + + +# ---------- Sequential STV ---------- + +def _stv_meek_with_elimination_order(profile, num_seats, candidates, by_order=None, tol=1e-10, max_iter=2000): + """ + Run Meek STV and return both winners and elimination order. + + Used by Sequential STV to determine the initial queue. This is Meek's method + with tracking of which candidates were excluded and in what order. + + The Issue 20 Sequential STV paper (https://www.votingmatters.org.uk/ISSUE20/I20P2.PDF) + requires all STV counts to use Meek's method. + + Args: + by_order: A function that returns a sortable key for a candidate. + Used for deterministic tie-breaking. Defaults to candidate index + in the candidates list. + + Returns: + tuple: (winners_set, exclusion_order_list) + """ + # Create stable ordering if not provided + if by_order is None: + cand_order = {c: i for i, c in enumerate(candidates)} + by_order = lambda c: cand_order.get(c, 0) + + # Handle both Profile and ProfileWithTies + if isinstance(profile, Profile): + profile = profile.to_profile_with_ties() + + hopeful = set(candidates) + elected = set() + exclusion_order = [] + + # Keep factors: hopeful=1, elected=adjusted, excluded=0 + keep = {c: 1.0 for c in candidates} + + # Calculate total weight from profile + rankings, rcounts = profile.rankings_counts + total_weight = sum(float(count) for count in rcounts) + if total_weight <= EPS or not hopeful or num_seats <= 0: + return set(), [] + + safety = 0 + while len(elected) < num_seats: + safety += 1 + if safety > 50000: + raise RuntimeError("_stv_meek_with_elimination_order: loop safety tripped") + + # Candidates still in the count (hopeful or elected) + active = hopeful | elected + if not active: + break + + # Iteratively adjust keep factors until convergence + for _ in range(max_iter): + tallies, excess = _meek_tally_from_profile(profile, keep, active) + + # Quota = (total_votes - excess) / (k+1) + usable = total_weight - excess + quota = usable / float(num_seats + 1) if usable > EPS else 0.0 + + changed = False + + # Adjust keep factors for ELECTED candidates to make their tally approach quota + for c in elected: + t = tallies.get(c, 0.0) + if t > tol and keep.get(c, 1.0) > 0.0: + new_keep = keep[c] * quota / t + new_keep = max(0.0, min(1.0, new_keep)) + if abs(keep[c] - new_keep) > tol: + keep[c] = new_keep + changed = True + + if not changed and elected: + max_deviation = max(abs(tallies.get(c, 0.0) - quota) for c in elected) + if max_deviation > 10 * tol: + changed = True + + if not changed: + break + + # After convergence, check if any HOPEFUL candidate has reached quota + tallies, excess = _meek_tally_from_profile(profile, keep, active) + usable = total_weight - excess + quota = usable / float(num_seats + 1) if usable > EPS else 0.0 + + newly_elected = [] + for c in list(hopeful): + t = tallies.get(c, 0.0) + if t >= quota - tol: + newly_elected.append(c) + + if newly_elected: + # Elect candidates that reached quota (highest tally first, by_order for ties) + for c in sorted(newly_elected, key=lambda x: (-tallies.get(x, 0.0), by_order(x))): + if len(elected) >= num_seats: + break + hopeful.remove(c) + elected.add(c) + t = tallies.get(c, 0.0) + if t > quota + tol: + keep[c] = quota / t + continue + + # No one elected - check if we can fill remaining seats with hopeful candidates + if len(hopeful) <= num_seats - len(elected): + elected |= hopeful + hopeful.clear() # Clear to avoid adding to exclusion_order below + break + + # Exclude the hopeful candidate with the lowest tally + if not hopeful: + break + + min_t = float('inf') + lowest = [] + for c in hopeful: + t = tallies.get(c, 0.0) + if t < min_t - EPS: + min_t = t + lowest = [c] + elif abs(t - min_t) <= EPS: + lowest.append(c) + + # Tie-break using by_order (deterministic) + # Convention: lower value = higher priority = survives + # So we eliminate the candidate with the HIGHEST key value (reverse sort) + lowest.sort(key=by_order, reverse=True) + elim = lowest[0] + + hopeful.remove(elim) + keep[elim] = 0.0 + exclusion_order.append(elim) + + # Ensure ALL non-elected candidates are in the exclusion order. + # Some candidates may still be "hopeful" when the count ends (e.g., if enough + # candidates reached quota early). These should be added to the exclusion order + # in reverse-tally order (strongest last, making them the runner-up). + if hopeful: + # Get final tallies for remaining hopeful candidates + active = hopeful | elected + final_tallies, _ = _meek_tally_from_profile(profile, keep, active) + # Sort remaining hopeful by tally (lowest first = weakest first), by_order for ties + remaining = sorted(hopeful, key=lambda c: (final_tallies.get(c, 0.0), by_order(c))) + exclusion_order.extend(remaining) + + return elected, exclusion_order + + +def _calculate_borda_scores(profile, continuing_candidates): + """ + Calculate Borda scores for continuing candidates in Sequential STV. + + Per the Sequential STV paper (https://www.votingmatters.org.uk/ISSUE20/I20P2.PDF): + "a Borda score is calculated, as the sum over all votes of the + number of continuing candidates to whom the candidate in question is preferred, + taking all unmentioned continuing candidates as equal in last place. A continuing + candidate who is not mentioned in a particular vote is given, for that vote, the + average score that would have been attained by all those unmentioned. In practice + it can help to give 2 points instead of 1 for each candidate beaten, because all + scores, including any averages required, are then whole numbers." + + Implementation details: + - We use 2 points per candidate beaten (as suggested in the paper) + - For tied candidates (whether explicitly tied or unranked), we apply the + averaging principle: each gets the average score as if they weren't tied + - The formula for a tied group of size s with b candidates strictly below: + score = 2 * b + (s - 1) + This equals 2*b (for beating b candidates) plus the average among s tied + candidates: avg(0, 2, 4, ..., 2(s-1)) = s - 1 + + Returns: + dict: Mapping from candidate to Borda score + """ + continuing = list(continuing_candidates) + scores = {c: 0 for c in continuing} + + rankings, rcounts = profile.rankings_counts + + for ranking, count in zip(rankings, rcounts): + rmap = ranking.rmap # candidate -> rank, None if unranked + + # Group continuing candidates by their rank on this ballot + groups = collections.defaultdict(list) # rank -> list of candidates at that rank + unranked = [] + for c in continuing: + r = rmap.get(c) + if r is None: + unranked.append(c) + else: + groups[r].append(c) + + # Unranked candidates are treated as tied in last place. + # Per the paper, they get the average score: if m are unranked, each gets + # the average of {0, 2, 4, ..., 2(m-1)} = (m-1) points. + m = len(unranked) + if m: + unrank_score = m - 1 + for c in unranked: + scores[c] += unrank_score * count + + # Process ranked candidates from lowest rank (worst) to highest (best). + # Track how many continuing candidates are strictly below the current position. + below = m # Start with unranked candidates below + for r in sorted(groups.keys(), reverse=True): + g = groups[r] + s = len(g) + # Each candidate in this group beats 'below' candidates strictly, + # and ties with (s-1) others in the same group. + # Score = 2 * below + (s - 1), where (s-1) is the tie averaging. + group_score = 2 * below + (s - 1) + for c in g: + scores[c] += group_score * count + below += s # This group is now "below" for higher-ranked candidates + + return scores + + +@vm(name="Sequential-STV", input_types=[ElectionTypes.PROFILE, ElectionTypes.PROFILE_WITH_TIES]) +def sequential_stv(profile, num_seats=2, curr_cands=None, max_iterations=1000): + """ + Sequential STV as described in Voting Matters Issue 20 (2005 revision). + + Reference: https://www.votingmatters.org.uk/ISSUE20/I20P2.PDF + Also see: https://www.votingmatters.org.uk/ISSUE15/P4.HTM + + Sequential STV addresses the "premature exclusion" problem in regular STV by + systematically testing whether any non-elected candidate could replace an elected + one. It seeks to find a set of n candidates that observes Droop Proportionality + and is preferred by the largest majority of voters to any other possible set. + + Algorithm: + + **Phase 1 - Initialization:** + Run an initial STV count (using Meek's method) to classify candidates as + "probables" (would-be winners) or put them in a queue in reverse exclusion + order, with the runner-up moved to the end. + + **Phase 2 - Challenge Loop:** + For each challenger (head of queue), run STV with n+1 candidates for n seats. + If challenger succeeds, they become a probable and the beaten candidate goes + to the end of the queue. If challenger fails (including ties), they go to + the end of the queue. Continue until a complete run through the queue with + no successful challenger (stable solution) or a loop is detected. + + **Phase 3 - Loop Detection:** + - Certain loop: Same probables set recurs with identical queue order + - Possible loop: Same probables set with different queue; second chance given, + but if it recurs again, treat as certain loop + + **Phase 4 - Loop Resolution:** + Exclude all candidates who have never been a probable since the last restart, + then restart retaining existing probables and queue. If no candidate can be + excluded, use the Borda score special procedure to exclude one at-risk + candidate, then restart. (The Borda special procedure implemented here extends + the paper's averaging rule for unranked candidates to also apply to explicitly tied candidates, + providing a generalization to ProfileWithTies.) + + All STV counts use Meek's method. + + Args: + profile: A Profile or ProfileWithTies object containing voter rankings + num_seats (int): Number of seats to fill + curr_cands: List of candidates to consider, defaults to all candidates in profile + max_iterations (int): Maximum iterations before giving up (prevents infinite loops) + + Returns: + list: List of elected candidates + + .. note:: + In the single-winner case (num_seats=1), Sequential STV is Condorcet-consistent: + if there is a Condorcet winner (a candidate who beats every other candidate + head-to-head), Sequential STV will elect them. This distinguishes Sequential + STV from plain IRV, which can fail to elect a Condorcet winner due to + premature exclusion. + + .. warning:: + This implementation of Sequential STV has not yet been thoroughly vetted. + + """ + if isinstance(profile, Profile): + profile = profile.to_profile_with_ties() + + # Sort candidates to ensure deterministic ordering regardless of iteration order + candidates_list = sorted(profile.candidates) if curr_cands is None else sorted(curr_cands) + + if num_seats <= 0 or len(candidates_list) == 0: + return [] + + if num_seats >= len(candidates_list): + return candidates_list # Already sorted + + # Phase 1: Run initial STV (Meek) to get probables and exclusion order + # Issue 20 sequential STV paper requires all STV counts to use Meek's method + probables_set, exclusion_order = _stv_meek_with_elimination_order( + profile, num_seats, candidates_list + ) + probables = sorted(probables_set) + + if not exclusion_order: + return probables + + # Create queue in reverse exclusion order, with runner-up moved to end. + # Per Issue 20 Sequential STV paper: "puts the others into a queue, in the reverse order of their + # exclusion in that STV count, except that the runner-up is moved to last place + # as it is already known that an initial challenge by that candidate will not succeed." + # Example: if exclusion_order = [A, B, C] (A excluded first, C=runner-up), + # then reversed = [C, B, A], and queue = [B, A, C] (runner-up C at end). + reversed_order = list(reversed(exclusion_order)) + if len(reversed_order) > 1: + runner_up = reversed_order[0] + queue = collections.deque(reversed_order[1:] + [runner_up]) + else: + queue = collections.deque(reversed_order) + + _t(f"[Sequential-STV] Initial probables: {probables}") + _t(f"[Sequential-STV] Exclusion order: {exclusion_order}") + _t(f"[Sequential-STV] Initial queue: {list(queue)}") + + # Track candidates who have ever been probable since last restart + ever_probable = set(probables) + + # Track candidates who have ALWAYS been probable since last restart + # (for determining "at-risk" candidates in special procedure) + always_probable = set(probables) + + # Track probables set occurrences for loop detection + # Key: frozenset of probables, Value: list of queue tuples seen with that probables set + probables_occurrences = collections.defaultdict(list) + probables_occurrences[frozenset(probables)].append(tuple(queue)) + + # Track if we've given a "second chance" for a probables set with different queue + second_chance_given = set() + + # Track the previous probables set to detect when it changes + last_probables = frozenset(probables) + + iterations = 0 + challenges_since_change = 0 + + while iterations < max_iterations: + iterations += 1 + + # Check if we've completed a full cycle without changes (stable solution) + if challenges_since_change >= len(queue): + _t(f"[Sequential-STV] Stable solution found at iteration {iterations}") + return probables + + if not queue: + return probables + + # Get the next challenger + challenger = queue.popleft() + + # Run STV (Meek) with probables + challenger competing for num_seats. + # Per Issue 20 Sequential STV paper: "Should a tie occur during these rounds, between a probable + # and a challenger, it is resolved by maintaining the current situation; + # that is to say, the challenger has not succeeded." + # We implement this by using a tie_break_key that gives probables higher + # priority (lower key value), so challengers lose ties. + contest_cands = probables + [challenger] + + def tie_break_favoring_probables(c): + # tie_break_key interpretation: lower value = higher priority + # - In elections: lower key = elected first (wins) + # - In eliminations: lower key = survives (higher priority) + # Sequential STV tie rule: challenger loses ties → give probables lower key + if c == challenger: + return (1, c) # Challenger has lower priority (loses ties) + else: + return (0, c) # Probables have higher priority (win ties) + + winners = stv_meek( + profile, num_seats=num_seats, curr_cands=contest_cands, + tie_break_key=tie_break_favoring_probables + ) + winners_set = set(winners) + + # n+1-for-n invariant: expect exactly num_seats winners and 1 loser + # If outcome is ambiguous (wrong number of winners), maintain status quo + contest_set = set(contest_cands) + losers = contest_set - winners_set + + if len(winners_set) != num_seats or len(losers) != 1: + # Ambiguous outcome - treat as challenger failed (status quo per Issue 20 Sequential STV paper) + _t(f"[Sequential-STV] Ambiguous challenge result, maintaining status quo") + queue.append(challenger) + challenges_since_change += 1 + elif challenger in winners_set: + # Challenger succeeded - find the unique loser + loser = next(iter(losers)) # Exactly one loser + probables = sorted(winners_set) + queue.append(loser) + challenges_since_change = 0 + ever_probable.add(challenger) + # Update always_probable: intersect with new probables + always_probable = always_probable & winners_set + _t(f"[Sequential-STV] {challenger} displaces {loser}") + else: + # Challenger failed + queue.append(challenger) + challenges_since_change += 1 + _t(f"[Sequential-STV] {challenger} fails to displace anyone") + + # Loop detection: only check when probables has CHANGED + # (Sequential STV loop detection is about returning to a prior state after changes, + # not about probables staying the same during failed challenges) + current_probables = frozenset(probables) + current_queue = tuple(queue) + + # Only do loop detection if probables changed this round + probables_changed = (current_probables != last_probables) + + loop_detected = False + if probables_changed and current_probables in probables_occurrences: + prev_queues = probables_occurrences[current_probables] + + if current_queue in prev_queues: + # Certain loop: same probables AND same queue order + _t(f"[Sequential-STV] Certain loop detected (same probables and queue)") + loop_detected = True + + elif current_probables in second_chance_given: + # Same probables set seen before with different queue, and we already + # gave a second chance - now treat as loop + _t(f"[Sequential-STV] Loop detected (same probables, second recurrence)") + loop_detected = True + + else: + # Same probables but different queue - give second chance + _t(f"[Sequential-STV] Possible loop (same probables, different queue) - second chance") + second_chance_given.add(current_probables) + + if loop_detected: + # Handle the loop - returns (is_final, result) tuple + is_final, result = _handle_sequential_stv_loop_2005( + profile, num_seats, probables, queue, ever_probable, always_probable + ) + if is_final: + # Final result - return winners + return result + # Restart: update probables if changed + if result is not None: + probables = result + # Reset tracking but keep probables and queue + ever_probable = set(probables) + always_probable = set(probables) + probables_occurrences.clear() + second_chance_given.clear() + challenges_since_change = 0 + last_probables = frozenset(probables) + probables_occurrences[last_probables].append(tuple(queue)) + continue + + # Update tracking + if probables_changed: + probables_occurrences[current_probables].append(current_queue) + last_probables = current_probables + + _t(f"[Sequential-STV] Max iterations reached") + warnings.warn( + f"Sequential STV reached max_iterations ({max_iterations}) without converging. " + "The result may be incomplete. Consider increasing max_iterations.", + RuntimeWarning + ) + return probables + + +def _handle_sequential_stv_loop_2005(profile, num_seats, probables, queue, + ever_probable, always_probable): + """ + Handle a detected loop in Sequential STV (2005 version). + + Loop resolution per the 2005 paper: + 1. Exclude all candidates who have never been a probable since last restart + 2. If no such candidates, use Borda score special procedure to exclude + one "at-risk" candidate (those not always probable) + 3. Restart with existing probables and queue + + Returns: + tuple: (is_final, result) where: + - is_final=True, result=winners_list: Election complete, return winners + - is_final=False, result=new_probables: Restart with updated probables + - is_final=False, result=None: Restart, probables unchanged (only queue changed) + """ + + all_in_contest = set(probables) | set(queue) + never_probable = all_in_contest - ever_probable + + if never_probable: + # Exclude all candidates who have never been probable + _t(f"[Sequential-STV] Excluding never-probables: {never_probable}") + + # Remove never-probables from the queue (mutate in place) + new_queue = [c for c in queue if c not in never_probable] + queue.clear() + queue.extend(new_queue) + + # Check if we're done + if len(probables) == num_seats and not queue: + return (True, list(probables)) # Final result + + return (False, None) # Restart, probables unchanged + + else: + # All candidates have been probable at some point - use Borda special procedure. + # Per Issue 20 Sequential STV paper: "If there is no candidate who can be so excluded, then a special + # procedure is used, in which each continuing candidate, other than any who has + # always been a probable since the last restart, is classified as 'at-risk'." + _t(f"[Sequential-STV] All candidates have been probable, using Borda special procedure") + + # At-risk candidates: those not ALWAYS probable since last restart + at_risk = all_in_contest - always_probable + + if not at_risk: + # Everyone has always been probable - just return current probables + _t(f"[Sequential-STV] No at-risk candidates, returning current probables") + return (True, list(probables)) # Final result + + # Calculate Borda scores for all continuing candidates + continuing = list(probables) + list(queue) + scores = _calculate_borda_scores(profile, continuing) + + # Find the at-risk candidate with the lowest Borda score + # Use candidate number for deterministic tie-breaking among equal scores + at_risk_scores = [(scores[c], c) for c in at_risk] + at_risk_scores.sort() # Sort by score, then by candidate number + _, to_exclude = at_risk_scores[0] + + _t(f"[Sequential-STV] Borda scores: {scores}") + _t(f"[Sequential-STV] Excluding at-risk candidate with lowest score: {to_exclude}") + + if to_exclude in probables: + # Excluded candidate was a probable - head of queue becomes probable + new_probables = [c for c in probables if c != to_exclude] + if queue: + new_probable = queue.popleft() + new_probables = sorted(new_probables + [new_probable]) + _t(f"[Sequential-STV] {new_probable} promoted to probable") + + # Check if we're done + if len(new_probables) == num_seats and not queue: + return (True, new_probables) # Final result + + return (False, new_probables) # Restart with updated probables + else: + # Excluded candidate was in queue - just remove from queue (mutate in place) + new_queue = [c for c in queue if c != to_exclude] + queue.clear() + queue.extend(new_queue) + + # Check if we're done + if len(probables) == num_seats and not queue: + return (True, list(probables)) # Final result + + return (False, None) # Restart, probables unchanged \ No newline at end of file diff --git a/tests/test_proportional_methods.py b/tests/test_proportional_methods.py new file mode 100644 index 00000000..87c502b2 --- /dev/null +++ b/tests/test_proportional_methods.py @@ -0,0 +1,788 @@ +""" +Tests for proportional voting methods (STV variants). + +These tests cover the main STV implementations including Scottish STV, Meek, Warren, +and other variants including Sequential STV. +""" + +from pref_voting.proportional_methods import ( + stv_scottish, stv_meek, stv_warren, stv_nb, stv_wig, + stv_last_parcel, approval_stv, cpo_stv, sequential_stv +) +from pref_voting.profiles import Profile +from pref_voting.profiles_with_ties import ProfileWithTies, Ranking + +import pytest + + +# ============================================================================ +# Fixtures +# ============================================================================ + +@pytest.fixture +def simple_profile(): + """A simple 3-candidate profile where candidate 0 is clearly preferred.""" + return Profile([ + [0, 1, 2], + [0, 2, 1], + [1, 0, 2], + ]) + + +@pytest.fixture +def party_blocs_profile(): + """ + Two party blocs: A-party (candidates 0,1) and B-party (candidates 2,3). + 6 voters prefer A-party, 4 voters prefer B-party. + For 2 seats, fair result should be 1 from each party. + """ + return Profile([ + [0, 1, 2, 3], # A-party voter + [0, 1, 2, 3], + [0, 1, 2, 3], + [1, 0, 2, 3], # A-party voter (different order) + [1, 0, 2, 3], + [1, 0, 2, 3], + [2, 3, 0, 1], # B-party voter + [2, 3, 0, 1], + [3, 2, 0, 1], # B-party voter (different order) + [3, 2, 0, 1], + ]) + + +@pytest.fixture +def profile_with_clear_winner(): + """Profile where candidate 0 has overwhelming first-preference support.""" + return Profile([ + [0, 1, 2], + [0, 1, 2], + [0, 1, 2], + [0, 2, 1], + [0, 2, 1], + [1, 0, 2], + [2, 1, 0], + ]) + + +@pytest.fixture +def profile_for_transfers(): + """Profile designed to test surplus transfer mechanics.""" + return Profile([ + [0, 1, 2], + [0, 1, 2], + [0, 1, 2], + [0, 1, 2], + [0, 2, 1], + [1, 2, 0], + [2, 1, 0], + ]) + + +@pytest.fixture +def single_voter_profile(): + """Single voter - tests faithfulness property.""" + return Profile([[0, 1, 2, 3, 4]]) + + +@pytest.fixture +def profile_with_ties(): + """Profile with tied preferences (for ProfileWithTies).""" + return ProfileWithTies([ + Ranking({0: 1, 1: 2, 2: 3}), + Ranking({0: 1, 1: 2, 2: 3}), + Ranking({1: 1, 0: 2, 2: 3}), + Ranking({2: 1, 1: 2, 0: 3}), + ]) + + +# ============================================================================ +# Basic functionality tests +# ============================================================================ + +@pytest.mark.parametrize("method", [ + stv_scottish, stv_meek, stv_warren, stv_nb, stv_wig, stv_last_parcel +]) +def test_methods_return_correct_number_of_seats(method, simple_profile): + """All methods should return exactly num_seats winners.""" + result = method(simple_profile, num_seats=1) + assert len(result) == 1 + + result = method(simple_profile, num_seats=2) + assert len(result) == 2 + + +@pytest.mark.parametrize("method", [ + stv_scottish, stv_meek, stv_warren, stv_nb, stv_wig, stv_last_parcel +]) +def test_methods_elect_clear_favorite(method, profile_with_clear_winner): + """When one candidate has clear majority support, they should be elected.""" + result = method(profile_with_clear_winner, num_seats=1) + assert 0 in result, f"{method.name} should elect candidate 0 who has majority support" + + +@pytest.mark.parametrize("method", [ + stv_scottish, stv_meek, stv_warren, stv_nb, stv_wig, stv_last_parcel +]) +def test_methods_accept_profile_with_ties(method, profile_with_ties): + """All methods should accept ProfileWithTies input.""" + result = method(profile_with_ties, num_seats=2) + assert len(result) == 2 + assert all(c in [0, 1, 2] for c in result) + + +# ============================================================================ +# Proportionality tests +# ============================================================================ + +@pytest.mark.parametrize("method", [ + stv_meek, stv_warren, stv_nb, stv_wig, stv_last_parcel +]) +def test_proportionality_two_party(method, party_blocs_profile): + """ + With 60% A-party and 40% B-party voters electing 2 seats, + proportional methods should elect 1 from each party. + """ + result = method(party_blocs_profile, num_seats=2) + a_party_elected = len([c for c in result if c in [0, 1]]) + b_party_elected = len([c for c in result if c in [2, 3]]) + assert a_party_elected == 1, f"{method.name} should elect exactly 1 A-party candidate" + assert b_party_elected == 1, f"{method.name} should elect exactly 1 B-party candidate" + + +# ============================================================================ +# Faithfulness tests (single voter should get top-k) +# ============================================================================ + +@pytest.mark.parametrize("method", [ + stv_meek, stv_warren, stv_nb, stv_wig, stv_last_parcel +]) +def test_faithfulness_single_voter(method, single_voter_profile): + """ + With a single voter, the method should elect the voter's top-k candidates. + Note: Scottish STV does NOT satisfy this due to integer Droop quota mechanics. + """ + result = method(single_voter_profile, num_seats=3) + assert set(result) == {0, 1, 2}, f"{method.name} should elect voter's top 3 choices" + + +@pytest.mark.skip(reason="Scottish STV does not satisfy faithfulness due to integer Droop quota") +def test_scottish_faithfulness_single_voter(single_voter_profile): + """ + Scottish STV with single voter - documents expected behavior. + With 1 voter and k seats, quota = floor(1/(k+1)) + 1 = 1, + so no candidate reaches quota and eliminations determine result. + """ + result = stv_scottish(single_voter_profile, num_seats=3) + # This test is skipped because Scottish STV doesn't guarantee this + assert set(result) == {0, 1, 2} + + +# ============================================================================ +# Approval STV tests +# ============================================================================ + +def test_approval_stv_basic(profile_with_clear_winner): + """Approval STV should elect the clearly preferred candidate.""" + result = approval_stv(profile_with_clear_winner, num_seats=1) + assert 0 in result + + +def test_approval_stv_with_profile_with_ties(): + """Approval STV handles equal rankings (approval-style ballots).""" + # Voters approve multiple candidates equally + prof = ProfileWithTies([ + Ranking({0: 1, 1: 1, 2: 2}), # Approves 0 and 1 equally + Ranking({0: 1, 1: 1, 2: 2}), + Ranking({2: 1, 0: 2, 1: 2}), # Approves only 2 + ]) + result = approval_stv(prof, num_seats=2) + assert len(result) == 2 + + +# ============================================================================ +# CPO-STV tests +# ============================================================================ + +def test_cpo_stv_basic(simple_profile): + """CPO-STV should return the correct number of winners.""" + result = cpo_stv(simple_profile, num_seats=2) + assert len(result) == 2 + + +def test_cpo_stv_condorcet_committee(): + """ + CPO-STV should elect the Condorcet committee when one exists. + Profile where {0, 2} beats all other pairs pairwise. + """ + prof = Profile([ + [0, 2, 1, 3], + [0, 2, 1, 3], + [2, 0, 3, 1], + [2, 0, 3, 1], + [1, 3, 0, 2], + [3, 1, 2, 0], + ]) + result = cpo_stv(prof, num_seats=2) + # {0, 2} should be elected as they form the strongest pair + assert 0 in result or 2 in result # At minimum one of the Condorcet pair + + +# ============================================================================ +# Edge cases +# ============================================================================ + +@pytest.mark.parametrize("method", [ + stv_scottish, stv_meek, stv_warren, stv_nb, stv_wig, stv_last_parcel +]) +def test_all_seats_equals_candidates(method): + """When num_seats equals number of candidates, all should be elected.""" + prof = Profile([[0, 1, 2]]) + result = method(prof, num_seats=3) + assert set(result) == {0, 1, 2} + + +@pytest.mark.parametrize("method", [ + stv_scottish, stv_meek, stv_warren, stv_nb, stv_wig, stv_last_parcel +]) +def test_single_candidate_single_seat(method): + """Single candidate for single seat should be elected.""" + prof = Profile([[0]]) + result = method(prof, num_seats=1) + assert result == [0] + + +@pytest.mark.parametrize("method", [ + stv_scottish, stv_meek, stv_warren, stv_nb, stv_wig, stv_last_parcel +]) +def test_curr_cands_subset(method, simple_profile): + """Methods should respect curr_cands parameter.""" + result = method(simple_profile, num_seats=1, curr_cands=[1, 2]) + assert all(c in [1, 2] for c in result) + assert 0 not in result + + +@pytest.mark.parametrize("method", [ + stv_meek, stv_warren, stv_nb, stv_last_parcel +]) +def test_empty_election(method): + """Empty profile should return empty result for iterative methods.""" + prof = ProfileWithTies([], candidates=[0, 1, 2]) + result = method(prof, num_seats=2) + assert result == [] + + +def test_empty_election_scottish_wig(): + """ + Scottish STV and WIG may elect candidates with empty profile due to + 'if continuing candidates equal remaining seats, elect all' rule. + This tests they don't crash rather than checking specific behavior. + """ + prof = ProfileWithTies([], candidates=[0, 1, 2]) + result_scottish = stv_scottish(prof, num_seats=2) + result_wig = stv_wig(prof, num_seats=2) + # Just verify they return something without crashing + assert isinstance(result_scottish, list) + assert isinstance(result_wig, list) + + +@pytest.mark.parametrize("method", [ + stv_scottish, stv_meek, stv_warren, stv_nb, stv_wig, stv_last_parcel +]) +def test_zero_seats(method, simple_profile): + """Requesting 0 seats should return empty result.""" + result = method(simple_profile, num_seats=0) + assert result == [] + + +# ============================================================================ +# Determinism tests +# ============================================================================ + +@pytest.mark.parametrize("method", [ + stv_meek, stv_warren, stv_nb, stv_wig, stv_last_parcel +]) +def test_determinism(method, party_blocs_profile): + """Same input should produce same output (with same tie-breaking).""" + result1 = method(party_blocs_profile, num_seats=2) + result2 = method(party_blocs_profile, num_seats=2) + assert result1 == result2 + + +def test_scottish_determinism_with_fixed_rng(party_blocs_profile): + """Scottish STV is deterministic when given a fixed RNG for tie-breaking.""" + import random + rng1 = random.Random(42) + rng2 = random.Random(42) + result1 = stv_scottish(party_blocs_profile, num_seats=2, rng=rng1) + result2 = stv_scottish(party_blocs_profile, num_seats=2, rng=rng2) + assert result1 == result2 + + +# ============================================================================ +# Sequential STV tests +# ============================================================================ + +def test_sequential_stv_basic(): + """Sequential STV returns correct number of seats.""" + prof = Profile([ + [0, 1, 2, 3], + [0, 1, 2, 3], + [1, 0, 2, 3], + [2, 3, 0, 1], + [3, 2, 0, 1], + ]) + result = sequential_stv(prof, num_seats=2) + assert len(result) == 2 + + +def test_sequential_stv_condorcet_winner(): + """Sequential STV elects the Condorcet winner when num_seats=1.""" + # Candidate 1 is Condorcet winner: beats 0 (4-3) and beats 2 (4-3) + prof = Profile([ + [0, 1, 2], + [0, 1, 2], + [0, 1, 2], + [1, 2, 0], + [1, 2, 0], + [2, 1, 0], + [2, 1, 0], + ]) + result = sequential_stv(prof, num_seats=1) + assert result == [1], "Sequential STV should elect Condorcet winner" + + +def test_sequential_stv_condorcet_winner_larger(): + """Sequential STV finds Condorcet winner in larger candidate field.""" + # Candidate 2 beats everyone + prof = Profile([ + [2, 0, 1, 3, 4], + [2, 0, 1, 3, 4], + [2, 1, 0, 3, 4], + [0, 2, 1, 3, 4], + [1, 2, 0, 3, 4], + [3, 2, 0, 1, 4], + [4, 2, 0, 1, 3], + ]) + result = sequential_stv(prof, num_seats=1) + assert result == [2], "Sequential STV should elect Condorcet winner" + + +def test_sequential_stv_condorcet_cycle(): + """Sequential STV handles Condorcet cycles gracefully.""" + # Classic 3-way cycle: 0 > 1 > 2 > 0 + prof = Profile([ + [0, 1, 2], + [1, 2, 0], + [2, 0, 1], + ]) + result = sequential_stv(prof, num_seats=1) + # Should return some valid candidate, not crash + assert len(result) == 1 + assert result[0] in [0, 1, 2] + + +def test_sequential_stv_more_seats(): + """Sequential STV works correctly with more seats.""" + prof = Profile([ + [0, 1, 2, 3, 4], + [0, 1, 2, 3, 4], + [1, 0, 2, 3, 4], + [2, 3, 0, 1, 4], + [3, 2, 0, 1, 4], + [4, 3, 2, 1, 0], + ]) + result_2 = sequential_stv(prof, num_seats=2) + result_3 = sequential_stv(prof, num_seats=3) + assert len(result_2) == 2 + assert len(result_3) == 3 + + +def test_sequential_stv_universal_second_choice(): + """ + Sequential STV can elect a universal second-choice candidate. + + This is the key example from the Voting Matters paper where + Sequential STV differs from plain STV. + """ + # E (candidate 4) is everyone's second choice + prof = Profile([ + [0, 4, 1, 2, 3], # A>E>... + [0, 4, 1, 2, 3], + [0, 4, 1, 2, 3], + [1, 4, 0, 2, 3], # B>E>... + [1, 4, 0, 2, 3], + [1, 4, 0, 2, 3], + [2, 4, 0, 1, 3], # C>E>... + [2, 4, 0, 1, 3], + [2, 4, 0, 1, 3], + [3, 4, 0, 1, 2], # D>E>... + [3, 4, 0, 1, 2], + [3, 4, 0, 1, 2], + [4, 0, 1, 2, 3], # E>A>... + ]) + result = sequential_stv(prof, num_seats=2) + # E (candidate 4) should be elected due to broad second-choice support + assert 4 in result, "Sequential STV should elect universal second-choice candidate" + + +def test_sequential_stv_woodall_example1(): + """ + First example from Woodall's Voting Matters paper. + + 5 candidates (A=0, B=1, C=2, D=3, E=4), 2 seats. + In this scenario, both plain STV and Sequential STV elect B and C. + """ + # A=0, B=1, C=2, D=3, E=4 + # Use ProfileWithTies to handle truncated ballots (E not ranked by some voters) + prof = ProfileWithTies([ + Ranking({0: 1, 1: 2, 2: 3, 3: 4}), # 104 voters: A, B, C, D (E unranked) + Ranking({1: 1, 2: 2, 3: 3, 0: 4}), # 103 voters: B, C, D, A + Ranking({2: 1, 3: 2, 1: 3, 0: 4}), # 102 voters: C, D, B, A + Ranking({3: 1, 1: 2, 2: 3, 0: 4}), # 101 voters: D, B, C, A + Ranking({4: 1, 0: 2, 1: 3, 2: 4, 3: 5}), # 3 voters: E, A, B, C, D + Ranking({4: 1, 1: 2, 2: 3, 3: 4, 0: 5}), # 3 voters: E, B, C, D, A + Ranking({4: 1, 2: 2, 3: 3, 1: 4, 0: 5}), # 3 voters: E, C, D, B, A + Ranking({4: 1, 3: 2, 2: 3, 1: 4, 0: 5}), # 3 voters: E, D, C, B, A + ], rcounts=[104, 103, 102, 101, 3, 3, 3, 3], candidates=[0, 1, 2, 3, 4]) + + result = sequential_stv(prof, num_seats=2) + # Both STV and Sequential STV should elect B and C + assert len(result) == 2 + assert 1 in result, "B should be elected" + assert 2 in result, "C should be elected" + + +def test_sequential_stv_woodall_example2(): + """ + Second example from Woodall's Voting Matters paper. + + 5 candidates (A=0, B=1, C=2, D=3, E=4), 2 seats. + E is everyone's second choice. Plain STV elects B, C but + Sequential STV should elect B, E. + """ + # A=0, B=1, C=2, D=3, E=4 + # Modified so E is second choice for everyone + prof = Profile([ + [0, 4, 1, 2, 3], # 104 voters: A, E, B, C, D + [1, 4, 2, 3, 0], # 103 voters: B, E, C, D, A + [2, 4, 3, 1, 0], # 102 voters: C, E, D, B, A + [3, 4, 1, 2, 0], # 101 voters: D, E, B, C, A + [4, 0, 1, 2, 3], # 3 voters: E, A, B, C, D + [4, 1, 2, 3, 0], # 3 voters: E, B, C, D, A + [4, 2, 3, 1, 0], # 3 voters: E, C, D, B, A + [4, 3, 2, 1, 0], # 3 voters: E, D, C, B, A + ], rcounts=[104, 103, 102, 101, 3, 3, 3, 3]) + + result = sequential_stv(prof, num_seats=2) + # Sequential STV should recognize E's broad support + assert len(result) == 2 + assert 4 in result, "E should be elected due to universal second-choice support" + + +# ============================================================================ +# Sequential STV Edge Cases +# ============================================================================ + +def test_sequential_stv_single_candidate(): + """Sequential STV with only one candidate.""" + prof = Profile([[0], [0], [0]]) + result = sequential_stv(prof, num_seats=1) + assert result == [0] + + +def test_sequential_stv_num_seats_equals_candidates_minus_one(): + """Sequential STV when electing all but one candidate.""" + prof = Profile([ + [0, 1, 2, 3], + [1, 0, 2, 3], + [2, 3, 0, 1], + [3, 2, 1, 0], + ]) + result = sequential_stv(prof, num_seats=3) + assert len(result) == 3 + # Should elect 3 of the 4 candidates + assert len(set(result)) == 3 + + +def test_sequential_stv_all_tied_first_preferences(): + """Sequential STV when all candidates have equal first-preference votes.""" + prof = Profile([ + [0, 1, 2, 3], + [1, 2, 3, 0], + [2, 3, 0, 1], + [3, 0, 1, 2], + ]) + result = sequential_stv(prof, num_seats=2) + assert len(result) == 2 + # All candidates have equal first preferences, so result depends on + # tie-breaking and transfers + + +def test_sequential_stv_two_candidates_one_seat(): + """Sequential STV with just two candidates for one seat (simple majority).""" + prof = Profile([ + [0, 1], + [0, 1], + [0, 1], + [1, 0], + [1, 0], + ]) + result = sequential_stv(prof, num_seats=1) + assert result == [0], "Candidate with more first preferences should win" + + +# ============================================================================ +# Sequential STV Loop Detection +# ============================================================================ + +def test_sequential_stv_potential_loop(): + """ + Test that Sequential STV handles potential cycling scenarios. + + Create a scenario where challenges might cycle, and verify + the algorithm terminates with a valid result. + """ + # Rock-paper-scissors style preferences that might cause cycling + prof = Profile([ + [0, 1, 2], # A > B > C + [0, 1, 2], + [1, 2, 0], # B > C > A + [1, 2, 0], + [2, 0, 1], # C > A > B + [2, 0, 1], + ]) + result = sequential_stv(prof, num_seats=1) + # Should terminate and return exactly one winner + assert len(result) == 1 + assert result[0] in [0, 1, 2] + + +def test_sequential_stv_max_iterations(): + """Test that max_iterations parameter is respected.""" + prof = Profile([ + [0, 1, 2, 3], + [1, 2, 3, 0], + [2, 3, 0, 1], + [3, 0, 1, 2], + ]) + # Should complete within max_iterations + result = sequential_stv(prof, num_seats=2, max_iterations=100) + assert len(result) == 2 + + +# ============================================================================ +# Sequential STV with ProfileWithTies +# ============================================================================ + +def test_sequential_stv_profile_with_ties(): + """Sequential STV works with ProfileWithTies (ballots with tied rankings).""" + prof = ProfileWithTies([ + Ranking({0: 1, 1: 2, 2: 3}), # A > B > C + Ranking({0: 1, 1: 2, 2: 3}), + Ranking({1: 1, 0: 2, 2: 3}), # B > A > C + Ranking({2: 1, 1: 2, 0: 3}), # C > B > A + Ranking({0: 1, 1: 1, 2: 2}), # A = B > C (tie!) + ], candidates=[0, 1, 2]) + + result = sequential_stv(prof, num_seats=1) + assert len(result) == 1 + assert result[0] in [0, 1, 2] + + +def test_sequential_stv_truncated_ballots(): + """Sequential STV handles truncated ballots (not all candidates ranked).""" + prof = ProfileWithTies([ + Ranking({0: 1}), # Only ranks A + Ranking({0: 1}), + Ranking({1: 1}), # Only ranks B + Ranking({1: 1, 2: 2}), # B > C + Ranking({2: 1, 0: 2}), # C > A + ], candidates=[0, 1, 2]) + + result = sequential_stv(prof, num_seats=1) + assert len(result) == 1 + + +def test_sequential_stv_determinism(): + """Sequential STV produces consistent results across multiple runs.""" + prof = Profile([ + [0, 1, 2, 3], + [0, 1, 2, 3], + [1, 0, 2, 3], + [2, 3, 0, 1], + [3, 2, 0, 1], + ]) + + results = [tuple(sequential_stv(prof, num_seats=2)) for _ in range(5)] + # All runs should give identical results + assert all(r == results[0] for r in results), "Sequential STV should be deterministic" + + +def test_sequential_stv_challenger_tie_rule(): + """ + Test Issue 20 tie rule: if challenger ties with a probable, challenger loses. + + This test creates a scenario where a challenger ties with a probable + in the challenge round. Per Issue 20, the challenger should be treated + as having failed (status quo maintained). + + Profile: 2 candidates (A=0, B=1), 1 seat + - 2 voters: A > B + - 2 voters: B > A + + Initial Meek count: A and B tie at 2 votes each. With default tie-breaking + by candidate index, A (0) wins. So probables = [A], queue = [B]. + + Challenge with B: + Contest is A vs B for 1 seat. Both have 2 votes = quota (4/2 = 2). + This is a tie. Per Issue 20 tie rule, challenger B should lose, + maintaining A as the probable. + + Note: We're testing that the tie-break rule is applied correctly. + Without the rule, the default (lower index wins) would still give A. + The key is that Sequential STV uses the Issue 20 tie-break rule, + not the default, and the test verifies the result is correct. + """ + prof = Profile([ + [0, 1], # A > B + [0, 1], # A > B + [1, 0], # B > A + [1, 0], # B > A + ]) + + result = sequential_stv(prof, num_seats=1) + + # A should win - as the initial probable, ties go to status quo + assert 0 in result, ( + "Candidate A (0) should win: in a tie with challenger B, " + "the status quo (A as probable) should be maintained per Issue 20" + ) + + +def test_sequential_stv_borda_special_procedure(): + """ + Test that the Borda special procedure is triggered and works correctly. + + Per Issue 20, when a loop is detected and ALL candidates have been + probable at some point (so there are no "never-probables" to exclude), + the algorithm uses Borda scores to determine which "at-risk" candidate + to exclude. + + This profile creates a rock-paper-scissors dynamic: + - 2 voters: C > B > A + - 2 voters: B > A > C + - 1 voter: A > C > B + + This causes cycling where each candidate displaces another: + - Initial: C wins (A eliminated, C reaches quota) + - A challenges C: A wins (gets B>A>C transfers) + - B challenges A: B wins (gets C>B>A transfers) + - C challenges B: C wins (gets A>C>B transfer) + - Loop detected! All candidates have been probable. + + Borda scores: A=8, B=12, C=10 + The lowest-scoring at-risk candidate (A) is excluded via Borda procedure. + After A's exclusion, C beats B in the final challenge. + """ + prof = Profile([ + [2, 1, 0], # C > B > A + [2, 1, 0], # C > B > A + [1, 0, 2], # B > A > C + [1, 0, 2], # B > A > C + [0, 2, 1], # A > C > B + ]) + + result = sequential_stv(prof, num_seats=1) + + # Should terminate with exactly one winner + assert len(result) == 1 + + # C should win after Borda procedure excludes A (lowest Borda score) + # then C beats B in the remaining contest + assert result[0] == 2, ( + "Candidate C (2) should win: after Borda procedure excludes A (lowest score), " + "C beats B in the final challenge" + ) + + +def test_sequential_stv_borda_score_calculation(): + """ + Directly test the Borda score calculation used in the special procedure. + + Per Issue 20: "a Borda score is calculated, as the sum over all votes of the + number of continuing candidates to whom the candidate in question is preferred... + In practice it can help to give 2 points instead of 1 for each candidate beaten." + """ + from pref_voting.proportional_methods import _calculate_borda_scores + + # Test 1: Simple strict rankings (no ties) + # 2 voters: C > B > A + # 2 voters: B > A > C + # 1 voter: A > C > B + prof = Profile([ + [2, 1, 0], [2, 1, 0], # C > B > A (x2) + [1, 0, 2], [1, 0, 2], # B > A > C (x2) + [0, 2, 1], # A > C > B (x1) + ]) + prof_wt = prof.to_profile_with_ties() + scores = _calculate_borda_scores(prof_wt, [0, 1, 2]) + + # Manual calculation with 2 points per beat: + # Vote C>B>A (x2): C beats 2 (4pts), B beats 1 (2pts), A beats 0 (0pts) + # Vote B>A>C (x2): B beats 2 (4pts), A beats 1 (2pts), C beats 0 (0pts) + # Vote A>C>B (x1): A beats 2 (4pts), C beats 1 (2pts), B beats 0 (0pts) + # Totals: A = 0*2 + 2*2 + 4*1 = 8, B = 2*2 + 4*2 + 0*1 = 12, C = 4*2 + 0*2 + 2*1 = 10 + assert scores[0] == 8, f"A should have score 8, got {scores[0]}" + assert scores[1] == 12, f"B should have score 12, got {scores[1]}" + assert scores[2] == 10, f"C should have score 10, got {scores[2]}" + + # Test 2: Truncated ballot (unranked candidates get averaging) + # 1 voter ranks only A (B and C unmentioned = tied last) + # A beats both B and C: 4 points + # B and C unmentioned: average of {0, 2} = 1 point each (with 2x scaling) + prof2 = ProfileWithTies([Ranking({0: 1})], candidates=[0, 1, 2]) + scores2 = _calculate_borda_scores(prof2, [0, 1, 2]) + + assert scores2[0] == 4, f"A should have score 4, got {scores2[0]}" + assert scores2[1] == 1, f"B (unranked) should have score 1, got {scores2[1]}" + assert scores2[2] == 1, f"C (unranked) should have score 1, got {scores2[2]}" + + # Test 3: Explicit ties (extends averaging to explicit ties) + # 1 voter: A > {B, C} tied + # A beats both: 4 points + # B and C tied at rank 2: each gets 2*0 + (2-1) = 1 point (averaging) + prof3 = ProfileWithTies([Ranking({0: 1, 1: 2, 2: 2})], candidates=[0, 1, 2]) + scores3 = _calculate_borda_scores(prof3, [0, 1, 2]) + + assert scores3[0] == 4, f"A should have score 4, got {scores3[0]}" + assert scores3[1] == 1, f"B (tied) should have score 1, got {scores3[1]}" + assert scores3[2] == 1, f"C (tied) should have score 1, got {scores3[2]}" + + +# ============================================================================ +# Meek vs Warren difference test +# ============================================================================ + +def test_meek_warren_can_differ(): + """ + Test case from Hill & Warren paper where Meek and Warren can differ. + 4 candidates for 3 seats, 3 votes: 1 ABC, 1 BC, 1 BD. + Meek elects ABC, Warren gives C/D tie (resolved by index). + """ + prof = ProfileWithTies([ + Ranking({0: 1, 1: 2, 2: 3}), # ABC + Ranking({1: 1, 2: 2}), # BC + Ranking({1: 1, 3: 2}), # BD + ], candidates=[0, 1, 2, 3]) + + meek_result = stv_meek(prof, num_seats=3) + warren_result = stv_warren(prof, num_seats=3) + + # Both should elect A and B + assert 0 in meek_result and 1 in meek_result + assert 0 in warren_result and 1 in warren_result + + # Meek gives C an advantage via multiplicative transfer + # Warren gives C and D equal tallies (tie broken by index, so C wins) + # Both end up electing C in our implementation, but for different reasons + assert 2 in meek_result # C definitely wins under Meek