-
Notifications
You must be signed in to change notification settings - Fork 57
Expand file tree
/
Copy pathmain.py
More file actions
3194 lines (2702 loc) · 136 KB
/
main.py
File metadata and controls
3194 lines (2702 loc) · 136 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""CopperHead Server - 2-player Snake game server with competition mode."""
import argparse
import asyncio
import json
import os
import random
import logging
import secrets
import subprocess
import sys
from datetime import datetime
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import JSONResponse
from typing import Optional
from enum import Enum
import hashlib
import aiohttp
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s | %(message)s",
datefmt="%H:%M:%S"
)
logger = logging.getLogger("copperhead")
def _setup_file_logging(log_file_path: str):
"""Add a file handler to the logger so events are written to a log file.
The file handler uses the same format as console output, but includes the
date in the timestamp for long-running servers.
"""
# Remove any existing file handlers to avoid duplicates on config reload.
for handler in logger.handlers[:]:
if isinstance(handler, logging.FileHandler):
handler.close()
logger.removeHandler(handler)
try:
file_handler = logging.FileHandler(log_file_path, encoding="utf-8")
file_handler.setLevel(logging.INFO)
file_handler.setFormatter(logging.Formatter(
"%(asctime)s | %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
))
logger.addHandler(file_handler)
logger.info(f"📝 Logging to {log_file_path}")
except (OSError, IOError) as e:
logger.warning(f"⚠️ Could not open log file '{log_file_path}': {e}")
# Server configuration (set by CLI args or defaults)
class ServerConfig:
arenas: int = 1
points_to_win: int = 5
reset_delay: int = 10
game_timeout: int = 30
grid_width: int = 30
grid_height: int = 20
tick_rate: float = 0.15
bots: int = 0
tournament_countdown: int = 0 # Seconds to count down before tournament start. 0 = disabled.
log_file: str = "server-log.txt" # Log file path for significant events
# Auto-start mode: "always" (auto-admit + auto-start), "admit_only" (auto-admit, admin starts),
# or "never" (admin admits and starts). Default: "admit_only"
auto_start: str = "admit_only"
# Fruit settings
fruit_warning: int = 20 # Ticks before expiry when lifetime is reported to client
max_fruits: int = 1 # Max fruits on screen at once
fruit_interval: int = 5 # Min ticks between fruit spawns
# Fruit properties: {type: {"propensity": int, "lifetime": int (0=infinite)}}
fruits: dict = None
def __init__(self):
# Default fruit config: only apples, never expire
self.fruits = {
"apple": {"propensity": 1, "lifetime": 0},
"orange": {"propensity": 0, "lifetime": 0},
"lemon": {"propensity": 0, "lifetime": 0},
"grapes": {"propensity": 0, "lifetime": 0},
"strawberry": {"propensity": 0, "lifetime": 0},
"banana": {"propensity": 0, "lifetime": 0},
"peach": {"propensity": 0, "lifetime": 0},
"cherry": {"propensity": 0, "lifetime": 0},
"watermelon": {"propensity": 0, "lifetime": 0},
"kiwi": {"propensity": 0, "lifetime": 0},
}
config = ServerConfig()
# For backward compatibility
GRID_WIDTH = property(lambda self: config.grid_width)
GRID_HEIGHT = property(lambda self: config.grid_height)
TICK_RATE = property(lambda self: config.tick_rate)
app = FastAPI(title="CopperHead Server")
# Enable CORS for client requests
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
def _update_readme_admin_url(admin_url: str):
"""Replace {{ADMIN_URL}} placeholder in README.md with the actual admin URL."""
import re
script_dir = os.path.dirname(os.path.abspath(__file__))
readme_path = os.path.join(script_dir, "README.md")
try:
with open(readme_path, "r", encoding="utf-8") as f:
content = f.read()
if "{{ADMIN_URL}}" in content:
content = content.replace("{{ADMIN_URL}}", admin_url)
with open(readme_path, "w", encoding="utf-8") as f:
f.write(content)
logger.info("✓ Updated README.md with admin URL")
except Exception as e:
logger.warning(f"Could not update README.md with admin URL: {e}")
@app.on_event("startup")
async def startup_event():
logger.info("🐍 CopperHead Server started")
logger.info(f" Grid: {config.grid_width}x{config.grid_height}, Tick rate: {config.tick_rate}s")
logger.info(f" Arenas: {config.arenas}, Points to win: {config.points_to_win}")
logger.info(f" Auto-start: {config.auto_start}")
logger.info(f" Admin token: {admin_token}")
# Detect Codespaces environment
codespace_name = os.environ.get("CODESPACE_NAME")
github_domain = os.environ.get("GITHUB_CODESPACES_PORT_FORWARDING_DOMAIN", "app.github.dev")
if codespace_name:
ws_url = f"wss://{codespace_name}-8765.{github_domain}/ws/"
else:
ws_url = "ws://localhost:8765/ws/"
# Only show full connection info if not launched via start.py (which shows its own banner)
if not os.environ.get("COPPERHEAD_QUIET_STARTUP"):
if codespace_name:
logger.info("")
logger.info("=" * 60)
logger.info("📡 CLIENT CONNECTION URL:")
logger.info(f" {ws_url}")
logger.info("")
logger.info("⚠️ IMPORTANT: Make port 8765 public!")
logger.info(" 1. Open the Ports tab (bottom panel)")
logger.info(" 2. Right-click port 8765 → Port Visibility → Public")
logger.info("=" * 60)
logger.info("")
else:
logger.info("")
logger.info("📡 Client connection URL: ws://localhost:8765/ws/")
logger.info("")
# Initialize competition
await competition.start_waiting()
# Build client URL with server parameter and admin token
import urllib.parse
client_base = "https://revodavid.github.io/copperhead-client/"
client_url = f"{client_base}?server={urllib.parse.quote(ws_url, safe='')}"
admin_client_url = f"{client_url}&admin={admin_token}"
# Show URL reminder at the bottom so it's visible after all startup messages
logger.info("")
logger.info(f"📡 Server URL: {ws_url}")
logger.info(f"🎮 Play now (player): {client_url}")
logger.info(f"🔑 Admin console: {admin_client_url}")
if codespace_name:
logger.info(f"⚠️ Remember to make port 8765 PUBLIC in the Ports tab!")
# In Codespaces, update README.md with the admin URL
# (start.py already inserted the template with {{ADMIN_URL}} placeholder)
if codespace_name:
_update_readme_admin_url(admin_client_url)
# Start config file watcher for auto-restart on config changes
if _config_file_path:
asyncio.create_task(watch_config_file())
logger.info(f"👁️ Watching {os.path.basename(_config_file_path)} for changes")
class CompetitionState(Enum):
WAITING_FOR_PLAYERS = "waiting_for_players"
IN_PROGRESS = "in_progress"
PAUSED = "paused"
COMPLETE = "complete"
RESETTING = "resetting"
class PlayerInfo:
"""Track a player in the competition."""
def __init__(self, player_uid: str, name: str, websocket: WebSocket, is_bot: bool = False):
self.uid = player_uid # Unique ID across competition
self.name = name
self.websocket = websocket
self.is_bot = is_bot # True if this is a CopperBot
self.match_wins = 0 # Matches won in competition
self.game_points = 0 # Total game points across all matches
self.opponent_points = 0 # Opponent's total points (for tiebreaker)
self.eliminated = False
self.current_room: Optional["GameRoom"] = None
self.current_player_id: Optional[int] = None # 1 or 2 in current room
self.last_match_finish_time: float = 0 # Timestamp when last match finished (for Bye tiebreaker)
class MatchResult:
"""Result of a completed match."""
def __init__(self, player1_uid: str, player2_uid: str, winner_uid: str,
player1_points: int, player2_points: int):
self.player1_uid = player1_uid
self.player2_uid = player2_uid
self.winner_uid = winner_uid
self.player1_points = player1_points
self.player2_points = player2_points
# Admin token for lobby management — can be set via CLI or config, otherwise generated randomly
admin_token: str = secrets.token_hex(8) # 16-character hex string
class Lobby:
"""Manages players waiting to join a competition.
When auto_start is disabled, players join the lobby first instead of going
directly into the competition. An administrator controls which players
get assigned to match slots and when the tournament starts.
Players in the lobby are either:
- In the waiting list (joined but not assigned to a match slot)
- Assigned to a match slot (selected by the admin for the next tournament)
"""
def __init__(self):
# All players currently in the lobby (uid -> PlayerInfo)
self.players: dict[str, PlayerInfo] = {}
# Players assigned to match slots, in order (list of uids)
self.slot_assignments: list[str] = []
# Join order tracking for auto-fill (list of uids, oldest first)
self._join_order: list[str] = []
self._lock = asyncio.Lock()
self._next_uid = 1
def _generate_uid(self) -> str:
uid = f"L{self._next_uid}"
self._next_uid += 1
return uid
def max_slots(self) -> int:
"""Maximum number of match slots (arenas × 2)."""
return config.arenas * 2
def open_slots(self) -> int:
"""Number of unfilled match slots."""
return self.max_slots() - len(self.slot_assignments)
def waiting_players(self) -> list[PlayerInfo]:
"""Players in the lobby but NOT assigned to a match slot."""
assigned = set(self.slot_assignments)
return [p for p in self.players.values() if p.uid not in assigned]
async def join(self, name: str, websocket: WebSocket) -> Optional[PlayerInfo]:
"""Add a player to the lobby. Returns PlayerInfo if accepted.
When auto_start is "always" or "admit_only", also auto-assigns them to a slot."""
async with self._lock:
uid = self._generate_uid()
is_bot = name.startswith("CopperBot")
player = PlayerInfo(uid, name, websocket, is_bot=is_bot)
self.players[uid] = player
self._join_order.append(uid)
logger.info(f"📝 {name} ({uid}) joined lobby ({len(self.players)} in lobby)")
# Auto-admit to a slot when auto_start is "always" or "admit_only"
auto_admit = config.auto_start in ("always", "admit_only")
waiting = competition.state == CompetitionState.WAITING_FOR_PLAYERS
if auto_admit and waiting and self.open_slots() > 0:
self.slot_assignments.append(uid)
logger.info(f"✅ Auto-admitted {name} ({uid}) to slot ({len(self.slot_assignments)}/{self.max_slots()})")
await self._broadcast_lobby_update()
# Auto-start competition only in "always" mode when all slots are filled
if config.auto_start == "always" and waiting and self.open_slots() <= 0:
logger.info("🚀 All slots filled (auto_start=always) — starting competition")
import asyncio
asyncio.create_task(competition.start_from_lobby())
return player
async def leave(self, uid: str):
"""Remove a player from the lobby (voluntary leave or disconnect)."""
async with self._lock:
if uid not in self.players:
return
player = self.players[uid]
# Remove from slot assignments if assigned
if uid in self.slot_assignments:
self.slot_assignments.remove(uid)
# Remove from join order and player list
if uid in self._join_order:
self._join_order.remove(uid)
del self.players[uid]
logger.info(f"📝 {player.name} ({uid}) left lobby ({len(self.players)} in lobby)")
await self._broadcast_lobby_update()
async def kick(self, uid: str) -> bool:
"""Admin: kick a player from the lobby entirely. Returns True if found."""
async with self._lock:
if uid not in self.players:
return False
player = self.players[uid]
# Remove from slot assignments if assigned
if uid in self.slot_assignments:
self.slot_assignments.remove(uid)
# Remove from join order and player list
if uid in self._join_order:
self._join_order.remove(uid)
del self.players[uid]
logger.info(f"🚫 {player.name} ({uid}) kicked from lobby")
# Notify the kicked player
try:
await player.websocket.send_json({
"type": "lobby_kicked",
"message": "You have been removed from the lobby by an administrator"
})
except Exception:
pass
await self._broadcast_lobby_update()
return True
async def add_to_slot(self, uid: str) -> bool:
"""Admin: move a lobby player into the next open match slot. Returns True if successful."""
async with self._lock:
if uid not in self.players:
return False
if uid in self.slot_assignments:
return False # Already assigned
if self.open_slots() <= 0:
return False # No open slots
self.slot_assignments.append(uid)
player = self.players[uid]
logger.info(f"✅ {player.name} ({uid}) added to match slot ({len(self.slot_assignments)}/{self.max_slots()})")
await self._broadcast_lobby_update()
# Auto-start competition only in "always" mode when all slots are filled
waiting = competition.state == CompetitionState.WAITING_FOR_PLAYERS
if config.auto_start == "always" and waiting and self.open_slots() <= 0:
logger.info("🚀 All slots filled (auto_start=always) — starting competition")
import asyncio
asyncio.create_task(competition.start_from_lobby())
return True
async def remove_from_slot(self, uid: str) -> bool:
"""Admin: remove a player from their match slot (back to waiting). Returns True if successful."""
async with self._lock:
if uid not in self.slot_assignments:
return False
self.slot_assignments.remove(uid)
player = self.players.get(uid)
name = player.name if player else uid
logger.info(f"↩️ {name} ({uid}) removed from match slot")
await self._broadcast_lobby_update()
return True
def get_players_for_tournament(self) -> list[PlayerInfo]:
"""Get the players assigned to match slots, filling remaining slots
from the waiting list (in join order). Does NOT fill with bots —
that happens in the Competition start logic."""
# Start with explicitly assigned players
assigned = list(self.slot_assignments)
# Fill remaining slots from waiting list (join order)
remaining = self.open_slots()
if remaining > 0:
waiting_uids = [uid for uid in self._join_order
if uid in self.players and uid not in self.slot_assignments]
for uid in waiting_uids[:remaining]:
assigned.append(uid)
return [self.players[uid] for uid in assigned if uid in self.players]
async def clear_tournament_players(self, tournament_uids: set[str]):
"""After a tournament ends, remove players who were in the tournament.
Waitlisted players stay in the lobby for the next tournament."""
async with self._lock:
for uid in tournament_uids:
if uid in self.players:
del self.players[uid]
if uid in self.slot_assignments:
self.slot_assignments.remove(uid)
if uid in self._join_order:
self._join_order.remove(uid)
# Reset slot assignments for next tournament
self.slot_assignments.clear()
logger.info(f"🔄 Lobby reset: {len(self.players)} players remain in lobby")
await self._broadcast_lobby_update()
def get_status(self) -> dict:
"""Get current lobby state for API responses."""
assigned_set = set(self.slot_assignments)
return {
"auto_start": config.auto_start,
"countdown_remaining": competition.countdown_remaining,
"players": [
{"uid": p.uid, "name": p.name, "is_bot": p.is_bot, "in_slot": p.uid in assigned_set}
for p in self.players.values()
],
"slot_assignments": [
{"uid": uid, "name": self.players[uid].name}
for uid in self.slot_assignments if uid in self.players
],
"max_slots": self.max_slots(),
"filled_slots": len(self.slot_assignments),
"open_slots": self.open_slots(),
"waiting_count": len(self.players) - len(self.slot_assignments),
}
async def _broadcast_lobby_update(self):
"""Send lobby state update to all players in the lobby."""
status = {
"type": "lobby_update",
**self.get_status()
}
for player in list(self.players.values()):
try:
await player.websocket.send_json(status)
except Exception:
pass
# Global lobby instance
lobby = Lobby()
class Competition:
"""Manages a round-robin knockout competition."""
# Class-level history persists across competition resets
championship_history: list[dict] = []
def __init__(self):
self.state = CompetitionState.WAITING_FOR_PLAYERS
self.players: dict[str, PlayerInfo] = {} # uid -> PlayerInfo
self.current_round = 0
self.rounds: list[list[tuple[str, str]]] = [] # Each round is list of (uid1, uid2) pairings
self.match_results: list[list[MatchResult]] = [] # Results per round
self.champion_uid: Optional[str] = None
self.current_bye_uid: Optional[str] = None # Player with Bye in current round
self._lock = asyncio.Lock()
self._next_uid = 1
self.reset_start_time: Optional[float] = None # Track when reset countdown started
self.countdown_remaining: int = 0 # Current countdown seconds remaining
self._countdown_task: Optional[asyncio.Task] = None # Background countdown task
self._next_round_task: Optional[asyncio.Task] = None # Scheduled next-round start
self._generation = 0 # Incremented on every reset; used to detect stale tasks
# Pause/resume: event is set (unblocked) when running, cleared when paused
self._pause_event = asyncio.Event()
self._pause_event.set() # Start unpaused
def _generate_uid(self) -> str:
uid = f"P{self._next_uid}"
self._next_uid += 1
return uid
async def start_waiting(self):
"""Initialize competition to waiting state."""
# Clear all rooms - bots will disconnect themselves when they receive
# competition_complete or when their websocket closes
room_manager.clear_all_rooms()
self._pause_event.set() # New tournament cycle should always start unpaused
self.state = CompetitionState.WAITING_FOR_PLAYERS
self.players.clear()
self.current_round = 0
self.rounds.clear()
self.match_results.clear()
self.champion_uid = None
self.current_bye_uid = None
self.reset_start_time = None
# Cancel any running countdown or pending round-start from the previous cycle.
if self._countdown_task and not self._countdown_task.done():
self._countdown_task.cancel()
self._countdown_task = None
if self._next_round_task and not self._next_round_task.done():
self._next_round_task.cancel()
self._next_round_task = None
self._generation += 1
self._next_uid = 1
if config.auto_start == "always":
logger.info(f"🏆 Competition waiting for {config.arenas * 2} players (auto-start: always)")
elif config.auto_start == "admit_only":
logger.info(f"🏆 Competition waiting for {config.arenas * 2} players (auto-start: admit_only — admin starts)")
else:
logger.info(f"🏆 Competition waiting for {config.arenas * 2} players (auto-start: never — admin controls)")
# Start tournament countdown if configured.
if config.tournament_countdown > 0:
self.countdown_remaining = config.tournament_countdown
logger.info(f"⏱️ Tournament countdown: {config.tournament_countdown}s")
self._countdown_task = asyncio.create_task(self._run_countdown())
else:
self.countdown_remaining = 0
# Auto-admit existing lobby players to slots if auto_start is "always" or "admit_only"
# These are players who joined the lobby during the previous competition
if config.auto_start in ("always", "admit_only") and len(lobby.players) > 0:
max_slots = config.arenas * 2
for uid in list(lobby._join_order):
if uid in lobby.players and uid not in lobby.slot_assignments:
if len(lobby.slot_assignments) < max_slots:
lobby.slot_assignments.append(uid)
logger.info(f"✅ Auto-admitted {lobby.players[uid].name} ({uid}) to slot ({len(lobby.slot_assignments)}/{max_slots})")
await lobby._broadcast_lobby_update()
# If all slots filled by existing players and auto_start is "always", start immediately
if config.auto_start == "always" and lobby.open_slots() <= 0:
logger.info("🚀 All slots filled by returning players (auto_start=always) — starting competition")
asyncio.create_task(self.start_from_lobby())
# Spawn bots into the lobby if configured, minus players already waiting
# Players who joined during the previous competition persist in the lobby
if config.bots > 0:
existing_lobby_count = len(lobby.players)
bots_to_spawn = max(0, config.bots - existing_lobby_count)
if bots_to_spawn > 0:
async def delayed_bot_spawn():
await asyncio.sleep(1)
_spawn_bots_for_lobby(bots_to_spawn)
asyncio.create_task(delayed_bot_spawn())
logger.info(f"🤖 Will spawn {bots_to_spawn} bot(s) ({existing_lobby_count} player(s) already in lobby)")
elif existing_lobby_count > 0:
logger.info(f"👥 {existing_lobby_count} player(s) already in lobby — no bots needed")
async def _run_countdown(self):
"""Run the tournament countdown timer while waiting for players.
When the timer reaches zero, the tournament either starts automatically
or stays ready for an admin to start, depending on auto_start.
"""
try:
while self.countdown_remaining > 0 and self.state == CompetitionState.WAITING_FOR_PLAYERS:
await asyncio.sleep(1)
if self.state != CompetitionState.WAITING_FOR_PLAYERS:
break
self.countdown_remaining = max(0, self.countdown_remaining - 1)
await lobby._broadcast_lobby_update()
if self.state == CompetitionState.WAITING_FOR_PLAYERS and self.countdown_remaining <= 0:
if config.auto_start == "always":
logger.info("⏱️ Countdown reached zero (auto_start=always) — starting tournament")
await self.start_from_lobby()
else:
logger.info("⏱️ Countdown reached zero — tournament ready to start")
except asyncio.CancelledError:
pass
def required_players(self) -> int:
return config.arenas * 2
async def register_player(self, name: str, websocket: WebSocket) -> Optional[PlayerInfo]:
"""Register a player for the competition. Returns PlayerInfo if accepted.
When auto_start is off, players join via the Lobby instead of calling this directly."""
async with self._lock:
if self.state != CompetitionState.WAITING_FOR_PLAYERS:
return None
if len(self.players) >= self.required_players():
return None
uid = self._generate_uid()
is_bot = name.startswith("CopperBot")
player = PlayerInfo(uid, name, websocket, is_bot=is_bot)
self.players[uid] = player
logger.info(f"📝 {name} ({uid}) registered ({len(self.players)}/{self.required_players()})")
# Broadcast updated lobby status
await self._broadcast_lobby_status()
# Auto-start when full (only in "always" mode)
if config.auto_start == "always" and len(self.players) >= self.required_players():
await self._start_competition()
return player
async def unregister_player(self, uid: str):
"""Remove a player from competition."""
async with self._lock:
if uid not in self.players:
return
player = self.players[uid]
if self.state == CompetitionState.WAITING_FOR_PLAYERS:
del self.players[uid]
logger.info(f"📝 {player.name} ({uid}) left lobby ({len(self.players)}/{self.required_players()})")
await self._broadcast_lobby_status()
elif self.state == CompetitionState.IN_PROGRESS:
# Mark as eliminated - opponent wins by forfeit
player.eliminated = True
logger.info(f"🚪 {player.name} ({uid}) disconnected - forfeit")
# If this player has a Bye, they forfeit it and we need to handle round advancement
if self.current_bye_uid == uid:
logger.info(f"🎫 Bye player {player.name} disconnected - eliminated")
self.current_bye_uid = None
# Check if this was the last match needed to advance
if self.match_results and len(self.match_results[self.current_round - 1]) >= len(self.rounds[self.current_round - 1]):
await self._advance_round()
# The room will handle the forfeit logic for active games
async def _broadcast_lobby_status(self):
"""Send lobby status to all waiting players."""
status = {
"type": "lobby_status",
"players": [{"uid": p.uid, "name": p.name} for p in self.players.values()],
"required": self.required_players(),
"current": len(self.players)
}
for player in self.players.values():
try:
await player.websocket.send_json(status)
except Exception:
pass
async def _start_competition(self):
"""Start the competition with all registered players."""
self.state = CompetitionState.IN_PROGRESS
self._pause_event.set() # Clear any leftover paused state from a previous tournament
# Stop the waiting-room countdown once the tournament actually starts.
countdown_task = self._countdown_task
self._countdown_task = None
self.countdown_remaining = 0
if countdown_task and countdown_task is not asyncio.current_task() and not countdown_task.done():
countdown_task.cancel()
self.current_round = 1
# Randomly pair players for round 1
uids = list(self.players.keys())
random.shuffle(uids)
# Handle odd number of players — last player gets a bye
if len(uids) % 2 != 0:
self.current_bye_uid = uids.pop()
bye_player = self.players[self.current_bye_uid]
logger.info(f"🎫 {bye_player.name} gets a bye in Round 1")
pairings = [(uids[i], uids[i + 1]) for i in range(0, len(uids), 2)]
self.rounds.append(pairings)
self.match_results.append([])
logger.info(f"🏆 Competition started! Round 1 with {len(pairings)} matches")
for i, (uid1, uid2) in enumerate(pairings):
p1, p2 = self.players[uid1], self.players[uid2]
logger.info(f" Arena {i + 1}: {p1.name} vs {p2.name}")
# Notify all players and create rooms
await self._broadcast_competition_status()
await self._create_round_matches()
async def start_from_lobby(self) -> tuple[bool, str]:
"""Start the competition using players from the lobby.
Fills slots in this order:
1. Players already assigned to slots by the admin
2. Remaining lobby players in join order
3. CopperBots for any remaining empty slots
Returns (success, message) tuple.
Holds the competition lock for the entire operation to prevent
concurrent calls (e.g. admin click + auto-start race).
"""
async with self._lock:
if self.state != CompetitionState.WAITING_FOR_PLAYERS:
return False, "Competition is not in waiting state"
required = self.required_players()
# Count how many lobby players we have before spawning bots
lobby_count = len(lobby.get_players_for_tournament())
bots_needed = required - lobby_count
# Spawn bots if needed BEFORE collecting lobby players
if bots_needed > 0:
logger.info(f"🤖 Auto-filling {bots_needed} slot(s) with CopperBots")
_spawn_bots_for_lobby(bots_needed)
# Wait for bots to connect to the lobby
# (up to 15s — Windows subprocess startup can be slow)
for _ in range(150):
await asyncio.sleep(0.1)
if len(lobby.get_players_for_tournament()) >= required:
break
# Now collect all lobby players (including newly-joined bots)
lobby_players = lobby.get_players_for_tournament()
if len(lobby_players) < required:
return False, f"Not enough players ({len(lobby_players)}/{required}). Try again in a moment."
# Register lobby players into the competition
for lp in lobby_players[:required]:
uid = self._generate_uid()
player = PlayerInfo(uid, lp.name, lp.websocket, is_bot=lp.is_bot)
self.players[uid] = player
# Store mapping so we can route messages from lobby websocket
lp._competition_uid = uid
# Track which lobby players were used (for cleanup after tournament)
tournament_lobby_uids = {lp.uid for lp in lobby_players[:required]}
self._tournament_lobby_uids = tournament_lobby_uids
# Start the competition (outside lock — _start_competition sets state)
await self._start_competition()
# Clean up lobby — remove players who entered the tournament
await lobby.clear_tournament_players(tournament_lobby_uids)
return True, f"Competition started with {len(self.players)} players"
async def _create_round_matches(self):
"""Create game rooms for current round's matches."""
pairings = self.rounds[self.current_round - 1]
for arena_id, (uid1, uid2) in enumerate(pairings, 1):
player1 = self.players[uid1]
player2 = self.players[uid2]
# Create a room for this match
room = room_manager.create_competition_room(arena_id, uid1, uid2)
player1.current_room = room
player1.current_player_id = 1
player2.current_room = room
player2.current_player_id = 2
# Connect players to their room
await room.connect_competition_player(1, player1)
await room.connect_competition_player(2, player2)
# Notify observers in lobby about new rooms so they get reassigned
await room_manager.broadcast_room_list_to_all_observers()
async def report_match_complete(self, room: "GameRoom", winner_uid: str,
p1_uid: str, p2_uid: str, p1_points: int, p2_points: int):
"""Called when a match (first to points_to_win) completes."""
async with self._lock:
try:
result = MatchResult(p1_uid, p2_uid, winner_uid, p1_points, p2_points)
self.match_results[self.current_round - 1].append(result)
# Update player stats
if winner_uid not in self.players:
logger.error(f"❌ Winner UID {winner_uid} not in competition players: {list(self.players.keys())}")
return
winner = self.players[winner_uid]
loser_uid = p2_uid if winner_uid == p1_uid else p1_uid
if loser_uid not in self.players:
logger.error(f"❌ Loser UID {loser_uid} not in competition players: {list(self.players.keys())}")
return
loser = self.players[loser_uid]
winner.match_wins += 1
winner.game_points += p1_points if winner_uid == p1_uid else p2_points
winner.opponent_points += p2_points if winner_uid == p1_uid else p1_points
winner.last_match_finish_time = datetime.now().timestamp() # Track for Bye tiebreaker
loser.game_points += p2_points if winner_uid == p1_uid else p1_points
loser.opponent_points += p1_points if winner_uid == p1_uid else p2_points
loser.eliminated = True
loser.current_room = None # Prevent disconnect from affecting new rooms
loser.current_player_id = None
logger.info(f"🏆 Match complete: {winner.name} defeats {loser.name} ({p1_points}-{p2_points})")
# Check if all matches in round are complete
pairings = self.rounds[self.current_round - 1]
# Bye results are pre-added to match_results, so account for them
bye_count = 1 if self.current_bye_uid else 0
actual_matches = len(self.match_results[self.current_round - 1]) - bye_count
logger.info(f"📊 Round {self.current_round}: {actual_matches}/{len(pairings)} matches complete")
if actual_matches >= len(pairings):
await self._advance_round()
except Exception as e:
logger.error(f"❌ Error in report_match_complete: {e}")
import traceback
traceback.print_exc()
async def _advance_round(self):
"""Advance to next round or declare champion."""
# Get winners from current round
winners = [r.winner_uid for r in self.match_results[self.current_round - 1]]
logger.info(f"📊 Round {self.current_round} complete. Winners: {[self.players[uid].name for uid in winners]}")
# Clear all rooms from previous round
room_manager.clear_all_rooms()
# Reset bye for new round
self.current_bye_uid = None
if len(winners) == 1:
# We have a champion!
self.champion_uid = winners[0]
self.state = CompetitionState.COMPLETE
self.reset_start_time = datetime.now().timestamp() # Start countdown immediately
champion = self.players[self.champion_uid]
logger.info(f"🎉 Competition complete! Champion: {champion.name}")
# Record in championship history (including match scores)
champion_matches = self._get_champion_matches()
Competition.championship_history.append({
"champion": champion.name,
"players": len(self.players),
"timestamp": datetime.now().isoformat(),
"champion_matches": champion_matches
})
await self._broadcast_competition_complete()
# Schedule reset
asyncio.create_task(self._schedule_reset())
return
# Handle odd number of winners - highest scorer gets a Bye
bye_player = None
if len(winners) % 2 == 1:
# Sort winners by: game_points (desc), opponent_points (asc), random tiebreaker
winner_players = [self.players[uid] for uid in winners]
winner_players.sort(
key=lambda p: (-p.game_points, p.opponent_points, random.random())
)
bye_player = winner_players[0]
winners.remove(bye_player.uid)
self.current_bye_uid = bye_player.uid
logger.info(f"🎫 {bye_player.name} receives Bye (highest scorer with {bye_player.game_points} points)")
# Create next round pairings from remaining winners
self.current_round += 1
random.shuffle(winners)
pairings = [(winners[i], winners[i + 1]) for i in range(0, len(winners), 2)]
self.rounds.append(pairings)
self.match_results.append([])
logger.info(f"🏆 Round {self.current_round} starting with {len(pairings)} match(es)")
# If there was a Bye player, they auto-advance to next round's results
if bye_player:
# Create a "Bye" result - player advances without playing
bye_result = MatchResult(bye_player.uid, bye_player.uid, bye_player.uid, 0, 0)
self.match_results[self.current_round - 1].append(bye_result)
bye_player.last_match_finish_time = datetime.now().timestamp()
logger.info(f"🎫 {bye_player.name} auto-advances via Bye")
await self._broadcast_competition_status()
# Schedule the next round outside the lock so cancel/reset can proceed
generation = self._generation
self._next_round_task = asyncio.create_task(self._start_next_round(generation))
async def _start_next_round(self, generation: int):
"""Wait, create matches, optionally pause for admin, then let games run.
Runs as a separate task (outside the competition lock) so that
cancel/reset can proceed immediately without waiting for the delay.
"""
# Brief pause so observers can see round results
await asyncio.sleep(5)
# Abort if the competition was reset while we waited
if self._generation != generation:
return
# Create the match rooms so the match table is populated
await self._create_round_matches()
# Auto-pause between rounds when auto_start is "never".
# Games are frozen because game loops check _pause_event each tick.
if config.auto_start == "never":
self.state = CompetitionState.PAUSED
self._pause_event.clear()
logger.info("⏸️ Auto-paused between rounds (auto_start=never). Admin must resume.")
await self._broadcast_competition_status()
await self._pause_event.wait()
# Abort if competition was cancelled/reset during the pause
if self._generation != generation:
logger.info("🔄 Round advancement cancelled during pause")
return
# Start games in rooms where players readied up during the pause
for room in room_manager.rooms.values():
if len(room.ready) >= 2 and not room.game.running:
game_task_active = room.game_task and not room.game_task.done()
if not game_task_active:
await room.start_game()
async def _broadcast_competition_status(self):
"""Send competition status to all players."""
pairings = self.rounds[self.current_round - 1] if self.rounds else []
bye_player_name = None
if self.current_bye_uid and self.current_bye_uid in self.players:
bye_player_name = self.players[self.current_bye_uid].name
status = {
"type": "competition_status",
"state": self.state.value,
"round": self.current_round,
"total_rounds": self._calculate_total_rounds(),
"bye_player": bye_player_name,
"pairings": [
{
"arena": i + 1,
"player1": {"uid": uid1, "name": self.players[uid1].name},
"player2": {"uid": uid2, "name": self.players[uid2].name}
}
for i, (uid1, uid2) in enumerate(pairings)
]
}
for player in self.players.values():
try:
await player.websocket.send_json(status)
except Exception:
pass
def _get_champion_matches(self) -> list[dict]:
"""Build the champion's match history (most recent round first)."""
champion_matches = []
for round_idx in range(len(self.match_results) - 1, -1, -1):
for result in self.match_results[round_idx]:
if result.winner_uid == self.champion_uid:
if result.player1_uid == self.champion_uid:
opponent_uid = result.player2_uid
champ_score = result.player1_points
opp_score = result.player2_points
else:
opponent_uid = result.player1_uid
champ_score = result.player2_points
opp_score = result.player1_points
opponent_name = self.players[opponent_uid].name if opponent_uid in self.players else "Unknown"
champion_matches.append({
"round": round_idx + 1,
"opponent": opponent_name,
"champion_score": champ_score,
"opponent_score": opp_score
})
return champion_matches
async def _broadcast_competition_complete(self):
"""Announce competition winner with their match history."""
champion = self.players[self.champion_uid]
msg = {
"type": "competition_complete",
"champion": {"uid": champion.uid, "name": champion.name},
"reset_in": config.reset_delay,
"champion_matches": self._get_champion_matches()
}
for player in self.players.values():
try:
await player.websocket.send_json(msg)
except Exception:
pass
async def _schedule_reset(self):
"""Wait and then reset competition."""
# State stays COMPLETE (set by caller), countdown already started
logger.info(f"⏳ Competition resetting in {config.reset_delay} seconds...")
await asyncio.sleep(config.reset_delay)
await self.start_waiting()
logger.info("🔄 Competition reset - ready for new players")
async def pause(self) -> tuple[bool, str]:
"""Pause a running tournament. Game loops will freeze at their next tick."""
if self.state != CompetitionState.IN_PROGRESS:
return False, f"Cannot pause: competition is {self.state.value}"
self.state = CompetitionState.PAUSED
self._pause_event.clear() # Block game loops