forked from harrischristiansen/generals-bot
-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathArmyTracker.py
More file actions
4106 lines (3410 loc) · 188 KB
/
ArmyTracker.py
File metadata and controls
4106 lines (3410 loc) · 188 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
@ Travis Drake (EklipZ) eklipz.io - tdrake0x45 at gmail)
April 2017
Generals.io Automated Client - https://github.com/harrischristiansen/generals-bot
EklipZ bot - Tries to play generals lol
"""
from __future__ import annotations
import itertools
import DebugHelper
import SearchUtils
from Algorithms import MapSpanningUtils
from Army import Army
from BoardAnalyzer import BoardAnalyzer
from Interfaces import TileSet
from Models import Move
from MapMatrix import MapMatrixSet, TileSet
from PerformanceTimer import PerformanceTimer
from SearchUtils import *
from Path import Path
from base.client.map import Tile, TILE_OBSTACLE, TileDelta, Player, MODIFIER_TORUS, MAX_ALLY_SPAWN_DISTANCE, MIN_ALLY_SPAWN_DISTANCE
class PlayerAggressionTracker(object):
    """
    Per-player aggression record.

    At present it only remembers which player index it was created for.
    """

    def __init__(self, index):
        """
        @param index: the player index this record belongs to; stored as ``self.player``.
        """
        self.player = index
class ArmyTracker(object):
def __init__(self, map: MapBase, perfTimer: PerformanceTimer | None = None):
# Sets up all tracking state for the given map.
# map: the game map whose players, generals and tiles are tracked.
# perfTimer: optional timer used to time the scan phases; a fresh PerformanceTimer is created when None.
# The per-player lists built below hold one entry per element of map.players.
self.connectedByPlayer: typing.List[TileSet] = [set() for p in map.players]
self.coreConnectedByPlayer: typing.List[TileSet] = [set() for p in map.players]
"""Connected by player except with ends trimmed to just the core of the graph."""
self.perf_timer: PerformanceTimer = perfTimer
if self.perf_timer is None:
self.perf_timer = PerformanceTimer()
# Player indexes recorded as having moved during the current scan; reassigned to a fresh set each new turn.
self.player_moves_this_turn: typing.Set[int] = set()
self.map: MapBase = map
# The general tile belonging to map.player_index (i.e. our own general).
self.general: Tile = map.generals[map.player_index]
self.armies: typing.Dict[Tile, Army] = {}
"""Actual armies. During a scan, this stores the armies that haven't been dealt with since last turn, still."""
self.initial_expansion_tile_counts: typing.List[int] = [0 for p in self.map.players]
# Starts True for every player, so the first fog prediction for each player is a full rebuild.
self.should_recalc_fog_land_by_player: typing.List[bool] = [True for p in self.map.players]
self.unaccounted_tile_diffs: typing.Dict[Tile, int] = {}
"""
Used to keep track of messy army interaction diffs discovered when determining an army didn't
do exactly what was expected to use to infer what armies on adjacent tiles did.
Negative numbers mean attacked by opp, (or opp tile attacked by friendly army).
Positive would mean ally merged with our army (or enemy ally merged with enemy army...?)
"""
self.valid_general_positions_by_player: typing.List[MapMatrixSet] = [MapMatrixSet(map) for _ in self.map.players]
"""The true/false matrix of valid general positions by player"""
#
# self.player_fog_connections: typing.List[typing.Set[Tile]] = [set() for player in self.map.players]
# """The tiles we assume are connecting """
# Seeded with each player's tiles that are currently visible or already discovered.
self.tiles_ever_owned_by_player: typing.List[typing.Set[Tile]] = [set([t for t in player.tiles if t.visible or t.discovered]) for player in self.map.players]
"""The set of tiles that we've ever seen owned by a player. TODO exclude tiles from player captures...?"""
self.uneliminated_emergence_events: typing.List[typing.Dict[Tile, int]] = [{} for player in self.map.players]
"""The set of emergence events that have resulted in general location restrictions in the past and have not been dropped in favor of more restrictive restrictions."""
self.unrecaptured_emergence_events: typing.List[typing.Set[Tile]] = [set() for player in self.map.players]
"""The set of emergence events from which an army emerged and the target player still owns the tile. ONLY counts when we didn't find an obvious emergence path for the player."""
self.uneliminated_emergence_event_city_perfect_info: typing.List[typing.Set[Tile]] = [set() for player in self.map.players]
"""Whether a given emergence event had perfect city info or not."""
self.pathed_fog_emergence_tiles = set()
"""Tiles that have been pathed successfully with good paths and thus have no emergence associated with them. Reset each turn."""
self.seen_player_lookup: typing.List[bool] = [False for player in self.map.players]
"""Whether a player has been seen."""
# Spawn-distance rules: "long spawns" means exactly two players on the map.
# The distance starts at 9, becomes 15 for long spawns or 2v2, and 1 on custom maps.
# NOTE(review): the custom-map check is written last, so it presumably overrides the others — confirm nesting against the original file.
self.is_long_spawns: bool = len(self.map.players) == 2
self.min_spawn_distance: int = 9
if self.is_long_spawns:
self.min_spawn_distance = 15
if self.map.is_2v2:
self.min_spawn_distance = 15
if self.map.is_custom_map:
self.min_spawn_distance = 1
self._initialize_viable_general_positions()
# 0 means "launch timing not recorded yet"; scan_movement_and_emergences fills it in per player.
self.player_launch_timings = [0 for _ in self.map.players]
self.skip_emergence_tile_pathings = set()
self.updated_city_tiles: typing.Set[Tile] = set()
self.unconnectable_tiles: typing.List[typing.Set[Tile]] = [set() for p in self.map.players]
self.player_connected_tiles: typing.List[typing.Set[Tile]] = [set() for p in self.map.players]
self.players_with_incorrect_tile_predictions: typing.Set[int] = set()
# NOTE(review): the string below talks about cities and mountains, which reads as if it describes
# updated_city_tiles rather than players_with_incorrect_tile_predictions — confirm and relocate if so.
"""Cities that were revealed or changed players or mountains that were fog-guess-cities will get stuck here during map updates."""
self.lastMove: Move | None = None
self.track_threshold: int = 3
"""Minimum tile value required to track an 'army' for performance reasons."""
# update_track_threshold() is called right away; presumably it can replace the default of 3 — TODO confirm.
self.update_track_threshold()
self._flipped_tiles: typing.Set[Tile] = set()
"""Tracks any tile that changed players on a given turn"""
self._flipped_by_army_tracker_this_turn: typing.List[typing.Tuple[int, Tile]] = []
"""The list of all (oldOwner,tile)s that were updated by armyTracker this turn."""
self.fogPaths = []
# TODO replace me with mapmatrix, emergenceLocationMap\[([^\]]+).x\]\[[^\]]+.y\] -> emergenceLocationMap[$1]
# One MapMatrix of floats per player, every cell starting at 0.0.
self.emergenceLocationMap: typing.List[MapMatrixInterface[float]] = [MapMatrix(self.map, 0.0) for z in range(len(self.map.players))]
"""List by player of emergence values."""
self.player_targets: typing.List[Tile] = []
"""The list of tiles we expect an enemy player might be trying to attack."""
# Subscriber callback lists; both are dropped by __getstate__ and reset to empty by __setstate__.
self.notify_unresolved_army_emerged: typing.List[typing.Callable[[Tile], None]] = []
self.notify_army_moved: typing.List[typing.Callable[[Army], None]] = []
self.player_aggression_ratings = [PlayerAggressionTracker(z) for z in range(len(self.map.players))]
# -1 means no scan has run yet; scan_movement_and_emergences checks for this value.
self.lastTurn = -1
self.decremented_fog_tiles_this_turn: typing.Set[Tile] = set()
self.dropped_fog_tiles_this_turn: typing.Set[Tile] = set()
# Set from the boardAnalysis argument on every scan_movement_and_emergences call.
self._boardAnalysis: BoardAnalyzer | None = None
def __getstate__(self):
    """
    Build the state used for pickling/copying this tracker.

    Works on a shallow copy of ``self.__dict__`` so the live instance keeps all of its
    attributes. The subscriber callback lists and the performance timer are left out
    of the returned state.

    @return: dict of attribute name -> value without the excluded entries.
    """
    state = self.__dict__.copy()

    # '_boardAnalysis' stays in the state; the code that would have dropped it was
    # commented out in the original implementation.
    for excluded in ('notify_unresolved_army_emerged', 'notify_army_moved', 'perf_timer'):
        # pop with a default tolerates instances that never had the attribute set.
        state.pop(excluded, None)

    return state
def __setstate__(self, state):
    """
    Restore a tracker from the state produced by ``__getstate__``.

    ``__getstate__`` leaves out the two subscriber lists and ``perf_timer``. The lists
    are recreated empty. The timer is recreated only when the restored state did not
    supply one, so the scan methods that call ``self.perf_timer.begin_move_event``
    keep working on a restored instance.

    @param state: dict of attribute name -> value to load into the instance.
    """
    self.__dict__.update(state)
    self.notify_unresolved_army_emerged = []
    self.notify_army_moved = []

    # Mirrors __init__, which also substitutes a fresh timer when none is available.
    if getattr(self, 'perf_timer', None) is None:
        self.perf_timer = PerformanceTimer()
# distMap used to determine how to move armies under fog
def scan_movement_and_emergences(
self,
lastMove: Move | None,
turn: int,
boardAnalysis: BoardAnalyzer
):
# Per-turn entry point. Resets the per-turn scratch state, then runs the tracking phases in order:
# flipped-tile pre-pass, city rescan, army movement, flipped-tile post-pass, new-army detection,
# and fog movement / increment.
# lastMove: stored on self.lastMove; track_army_movement reads it from there.
# turn: compared against self.lastTurn to detect the first scan and repeated scans of one turn.
# boardAnalysis: stored on self._boardAnalysis.
# Nothing is returned; the method exits early on the first-ever scan and on a repeated scan.
self.pathed_fog_emergence_tiles = set()
self._flipped_by_army_tracker_this_turn = []
self._boardAnalysis = boardAnalysis
self.skip_emergence_tile_pathings = set()
self.lastMove = lastMove
self.decremented_fog_tiles_this_turn = set()
self.dropped_fog_tiles_this_turn = set()
self.players_with_incorrect_tile_predictions = set()
# Record a launch timing for each player that has none yet (still 0) and now owns more than one tile:
# map.turn - 1 when they own exactly two tiles, otherwise a fixed 24.
for player in self.map.players:
if self.player_launch_timings[player.index] == 0 and player.tileCount > 1:
if player.tileCount == 2:
self.player_launch_timings[player.index] = self.map.turn - 1
else:
# then this is a unit test or custom map with starting tiles
self.player_launch_timings[player.index] = 24
# At map turn 50 (skipped in low-cost-city games), prune spawn candidates for players on other teams
# that have at least 22 tiles and fewer than 2 cities.
if self.map.turn == 50 and not self.map.is_low_cost_city_game:
with self.perf_timer.begin_move_event(f'limiting player spawns by start value'):
usTeam = self.map.team_ids_by_player_index[self.general.player]
for player in self.map.players:
if player.team != usTeam and player.tileCount >= 22 and player.cityCount < 2:
# Then they can't have spawned at any tile with only one liberty.
self.limit_player_spawn_by_good_start(player)
with self.perf_timer.begin_move_event('ArmyTracker neutral discovery'):
self._pre_army_track_handle_flipped_tiles()
with self.perf_timer.begin_move_event('ArmyTracker city rescan'):
self.rescan_city_information()
# player_targets is rebuilt from scratch: our cities and general, then each teammate's cities and general.
self.player_targets = self.map.players[self.map.player_index].cities.copy()
self.player_targets.append(self.map.generals[self.map.player_index])
for teammate in self.map.teammates:
teamPlayer = self.map.players[teammate]
self.player_targets.extend(teamPlayer.cities)
self.player_targets.append(teamPlayer.general)
# Turn gating: a turn number greater than lastTurn is a new turn; anything else is a repeat and bails out.
advancedTurn = False
if turn > self.lastTurn:
advancedTurn = True
# First scan ever (lastTurn still -1): only look for new armies, then return.
if self.lastTurn == -1:
logbook.info('armyTracker last turn wasnt set, exclusively scanning for new armies to avoid pushing fog tiles around on the turn the map was loaded up')
self.lastTurn = turn
self.find_new_armies()
return
self.lastTurn = turn
self.player_moves_this_turn: typing.Set[int] = set()
else:
logbook.info(f'army tracker scan ran twice this turn {turn}...? Bailing?')
return
# if we have perfect info about a players general / cities, we don't need to track emergence, clear the emergence map
# Players on map.friendly_team are skipped.
for player in self.map.players:
if player.team == self.map.friendly_team:
continue
if self.has_perfect_information_of_player_cities_and_general(player.index):
logbook.info(f'Resetting p{player.index} emergences because we have perfect info at the moment.')
self._reset_player_emergences(player.index)
self.fogPaths = []
with self.perf_timer.begin_move_event('ArmyTracker army movement'):
self.track_army_movement()
with self.perf_timer.begin_move_event('ArmyTracker non-neutral flipped tiles'):
self._post_army_track_handle_flipped_tiles()
self._flipped_tiles.clear()
with self.perf_timer.begin_move_event('ArmyTracker find new armies'):
self.find_new_armies()
with self.perf_timer.begin_move_event('ArmyTracker fog movement / increment'):
# NOTE(review): the non-advancing branch above returns, so advancedTurn is always True by the time this runs.
if advancedTurn:
self.move_fogged_army_paths()
self.increment_fogged_armies()
# Stamp every army whose tile is currently visible with the current map turn.
for army in self.armies.values():
if army.tile.visible:
logbook.info(f'updating {army} last seen turn to {self.map.turn}')
army.last_seen_turn = self.map.turn
# A player is marked as seen once they have at least one tile.
for player in self.map.players:
if len(player.tiles) > 0:
self.seen_player_lookup[player.index] = True
# Drop recorded generals whose tile is still flagged isGeneral but is owned by a different player.
for player in self.map.players:
general = self.map.generals[player.index]
if general and general.player != player.index and general.isGeneral:
self.map.generals[player.index] = None
general.isGeneral = False
logbook.info(f' RESET BAD GENERAL {general}')
def limit_player_spawn_by_good_start(self, player, debugTile = None):
# Prunes and re-weights candidate general positions for `player`.
# For every still-valid candidate tile it counts tiles found at each distance up to depthCheck (15)
# and compares the running total against the minValid table; candidates that fall short are marked
# invalid in valid_general_positions_by_player, and surviving ones get emergenceLocationMap adjustments.
# player: the player whose spawn candidates are being limited (index, tileCount, cityCount, score are read).
# debugTile: optional tile; the candidate with the same coords gets extra logbook output.
# Nothing is returned.
# Does nothing when the player has more than 1 city, more than 25 tiles, or a score above 52.
if player.cityCount > 1:
return
if player.tileCount > 25:
return
if player.score > 52:
return
depthCheck = 15
# isPerfect = player.tileCount == 25
# Slack added to the found-tile total: 2 for every tile the player is short of 25.
realWastesAllowed = (25 - player.tileCount) * 2
pIdx = player.index
# NOTE(review): tilesWithBarely is appended to below but never read within this method.
tilesWithBarely = []
tilesCouldntGetBetter = []
# Largest exactPenalty applied to any candidate; added back onto valid pathable tiles near the end.
biggestNegative = 0
for tile in self.map.get_all_tiles():
if not self.valid_general_positions_by_player[pIdx].raw[tile.tile_index]:
continue
wastesAllowed = realWastesAllowed
# foundByRange[d] counts the tiles the search callback accepted at distance d from this candidate.
foundByRange = [0] * (depthCheck + 1)
def foreachFunc(t: Tile, d: int):
# Visible tiles not owned by the player and never recorded as owned by them return True and are not counted.
# presumably a True return tells the search to skip the tile — TODO confirm against SearchUtils.
if t.visible and t.player != player.index and t not in self.tiles_ever_owned_by_player[player.index]:
return True
foundByRange[d] += 1
SearchUtils.breadth_first_foreach_dist_fast_no_neut_cities(
self.map,
[tile],
maxDepth=depthCheck,
foreachFunc=foreachFunc,
)
# if isPerfect:
# numValid = len(list(tile.movableNoObstacles))
# if numValid == 1:
# self.valid_general_positions_by_player[pIdx].raw[tile.tile_index] = False
# for mv in tile.movableNoObstacles:
# if self.valid_general_positions_by_player[pIdx].raw[mv.tile_index]:
# numValid = len(list(mv.movableNoObstacles))
# if numValid <= 2:
# self.valid_general_positions_by_player[pIdx].raw[mv.tile_index] = False
isDebugTile = debugTile and debugTile.coords == tile.coords
# NOTE(review): this branch is a no-op; looks like a breakpoint anchor.
if isDebugTile:
pass
avail = wastesAllowed
# Only the last of the three minValid assignments takes effect; the first two are overwritten immediately.
minValid = [1, 1, 4, 7, 10, 13, 15, 17, 19, 20, 21, 22, 23, 24, 24, 24] # ethryns lower bound for arbitrary graphs (assumes infinite tiles at range 2 are available) -- tweaked to be -1 since counting general
minValid = [1, 2, 5, 8, 11, 13, 16, 17, 19, 20, 21, 22, 23, 24, 24, 24] # mine empirical
minValid = [1, 2, 5, 8, 10, 13, 15, 17, 19, 20, 21, 22, 23, 24, 24, 24] # with range 4 and range 6 adjusted downwards due to ethryn_hack_4_10__rx07Ek732.txtmap and ethryn_hack_6_15__rx07Ek732.txtmap counter-examples
elimmed = False
exactlyMets = 0
barelyMets = 0
# First pass: avail is the slack plus all tiles found up to range i. Falling below minValid[i] eliminates
# the candidate; otherwise exact hits (beyond range 2) and near misses (within 4) are tallied.
for i in range(1, depthCheck + 1):
avail += foundByRange[i]
cutoff = minValid[i]
if cutoff > avail:
if isDebugTile:
logbook.info(f'elimmed {tile} based on {i} min {cutoff} vs found {avail}')
elimmed = True
self.valid_general_positions_by_player[pIdx].raw[tile.tile_index] = False
else:
if cutoff == avail and i > 2:
exactlyMets += 1
if cutoff > avail - 4:
barelyMets += 1
if isDebugTile:
logbook.info(f'legal {tile} based on {i} min {cutoff} vs found {avail}')
if isDebugTile:
logbook.info(f'exactlyMets {exactlyMets}, barelyMets {barelyMets}')
if elimmed:
continue
tilesWithBarely.append((exactlyMets, barelyMets, tile))
# Emergence weighting for survivors: a bonus capped at 1.5 from barelyMets (only below 25 tiles),
# and a penalty capped at 2 from exactlyMets.
if player.tileCount < 25:
self.emergenceLocationMap[player.index].raw[tile.tile_index] += min(1.5, barelyMets / 2)
exactPenalty = min(2, exactlyMets / 2)
if exactPenalty > biggestNegative:
biggestNegative = exactPenalty
self.emergenceLocationMap[player.index].raw[tile.tile_index] -= exactPenalty
# Second pass: repeat the range check with 2 less slack, without invalidating anything; candidates that
# would fail under the tighter slack are collected in tilesCouldntGetBetter.
if wastesAllowed > 0:
wastesAllowed -= 2
avail = wastesAllowed
for i in range(1, depthCheck + 1):
avail += foundByRange[i]
cutoff = minValid[i]
if cutoff > avail:
if isDebugTile:
logbook.info(f'r2 elimmed {tile} based on {i} min {cutoff} vs found {avail}')
elimmed = True
else:
if isDebugTile:
logbook.info(f'r2 legal {tile} based on {i} min {cutoff} vs found {avail}')
if isDebugTile:
logbook.info(f'POST better check, elimmed waste {wastesAllowed}: {elimmed}')
if elimmed:
tilesCouldntGetBetter.append(tile)
# if player.tileCount > 22 and player.tileCount < 25:
# Add biggestNegative to every pathable tile that is still a valid candidate.
for tile in self.map.pathable_tiles:
if not self.valid_general_positions_by_player[pIdx].raw[tile.tile_index]:
continue
self.emergenceLocationMap[player.index].raw[tile.tile_index] += biggestNegative
# Candidates that only passed thanks to the last 2 units of slack get an extra +3 emergence weight.
if tilesCouldntGetBetter:
for tile in tilesCouldntGetBetter:
self.emergenceLocationMap[player.index].raw[tile.tile_index] += 3
def update_fog_prediction(
        self,
        playerIndex: int,
        playersExpectedFogTileCounts: typing.Dict[int, int],
        predictedGeneralLocation: Tile | None,
        force: bool = False
):
    """
    Refresh the fog-tile prediction for one player.

    Nothing happens when the player is on our team, owns no tiles, or (unless ``force``)
    has a tile delta of 0 and is not listed in ``players_with_incorrect_tile_predictions``.
    Otherwise the prediction is rebuilt from scratch when the player's
    ``should_recalc_fog_land_by_player`` flag is set (the flag is cleared afterwards),
    or adjusted incrementally when it is not.

    @param playerIndex: index into ``self.map.players`` of the player to update.
    @param playersExpectedFogTileCounts: passed through unchanged to the rebuild / adjust helper.
    @param predictedGeneralLocation: passed through unchanged to the rebuild / adjust helper.
    @param force: when True, run even if the player's tile count looks unchanged and correct.
    @return: None
    """
    # TODO really, only REBUILD when player.index in self.players_with_incorrect_tile_predictions
    #  Otherwise, just add or subtract one fog tile to match.
    player = self.map.players[playerIndex]

    # Guard clauses, checked in the same order as the original single condition.
    if self.map.is_player_on_team_with(self.map.player_index, player.index):
        return
    if len(player.tiles) == 0:
        return

    nothingChanged = player.tileDelta == 0 and player.index not in self.players_with_incorrect_tile_predictions
    if nothingChanged and not force:
        return

    if not self.should_recalc_fog_land_by_player[player.index]:
        self._inc_slash_dec_fog_prediction_to_get_player_tile_count_right(player.index, playersExpectedFogTileCounts, predictedGeneralLocation)
        return

    self._build_fog_prediction_internal(player.index, playersExpectedFogTileCounts, predictedGeneralLocation)
    self.should_recalc_fog_land_by_player[player.index] = False
def increment_fogged_armies(self):
    """
    On army bonus turns, bump the tracked value of armies sitting on non-visible tiles.

    Does nothing unless ``self.map.is_army_bonus_turn`` is set. An army is incremented
    only when its tracked value plus one lies strictly between ``tile.army - 2`` and
    ``tile.army``; any other army is left untouched. Extra logging is emitted when
    ``DebugHelper.IS_DEBUGGING`` is on.

    @return: None
    """
    if not self.map.is_army_bonus_turn:
        return

    if DebugHelper.IS_DEBUGGING:
        logbook.info('FOG DEBUG: Incrementing fogged armies for army bonus turn')
        # Only the count is logged, so count directly instead of building a list.
        fogArmyCount = sum(1 for a in self.armies.values() if not a.tile.visible)
        logbook.info(f'FOG DEBUG: Found {fogArmyCount} fog armies to increment')

    for army in list(self.armies.values()):
        if army.tile.visible:
            continue

        # Chained comparison kept as written: tile.army > value + 1 > tile.army - 2.
        shouldIncrement = army.tile.army > army.value + 1 > army.tile.army - 2
        if not shouldIncrement:
            if DebugHelper.IS_DEBUGGING:
                logbook.info(f'FOG DEBUG: NOT incrementing army {str(army)} - army.tile.army={army.tile.army}, army.value={army.value}')
            continue

        if DebugHelper.IS_DEBUGGING:
            logbook.info(f'FOG DEBUG: Incrementing army {str(army)} from {army.value} to {army.value + 1}')
        army.value += 1
def move_fogged_army_paths(self):
# Advances armies that sit on non-visible tiles along their expectedPaths.
# An army whose paths lead to more than one distinct next tile is replaced by split armies, one per next tile;
# an army with a single next tile is moved along each of its paths that does not step into a visible tile.
# Nothing is returned.
# Snapshot of the fogged armies, taken up front because self.armies is modified while they are processed.
armyVals = list(a for a in self.armies.values() if not a.tile.visible)
if DebugHelper.IS_DEBUGGING:
logbook.info(f"FOG DEBUG: Starting move_fogged_army_paths with {len(armyVals)} fog armies")
for army in armyVals:
# Our own and teammates' armies under fog are scrapped rather than moved.
if army.player == self.map.player_index or army.player in self.map.teammates:
self.scrap_army(army, scrapEntangled=False)
continue
# An army that already moved last turn only has its value synced to tile.army - 1.
if army.last_moved_turn == self.map.turn - 1:
army.value = army.tile.army - 1
continue
# Skip armies whose player already has a move recorded this turn.
if army.player in self.player_moves_this_turn:
continue
if not army.visible and army.last_seen_turn < self.map.turn - 10 and (army.tile.isCity or army.tile.isGeneral):
logbook.info(f'skipping army {army} as it hasnt moved and is on a city/gen')
continue
if DebugHelper.IS_DEBUGGING:
logbook.info(f'FOG DEBUG: Processing fog army {str(army)} at {army.tile} with value {army.value}')
logbook.info(f'FOG DEBUG: Army {str(army)} has {len(army.expectedPaths)} expected paths')
# Remember the starting tile and its army count so the count can be restored further down.
origTile = army.tile
origTileArmy = army.tile.army
anyNextVisible = False
# Maps "tile this path would step onto next" -> list of paths; incomplete paths are ignored.
fogPathNexts = {}
for path in army.expectedPaths:
if (path is None
or path.start is None
or path.start.next is None
or path.start.next.tile is None
):
continue
nextTile = path.start.next.tile
if nextTile.visible:
anyNextVisible = True
# can't move out of fog, so will leave tile there
nextTile = path.start.tile
try:
nextPaths = fogPathNexts[nextTile]
except KeyError:
nextPaths = []
fogPathNexts[nextTile] = nextPaths
nextPaths.append(path)
if DebugHelper.IS_DEBUGGING:
logbook.info(f'FOG DEBUG: Army {str(army)} fogPathNexts: {[(str(tile), len(paths)) for tile, paths in fogPathNexts.items()]}')
# More than one distinct next tile: drop the army from self.armies and hand each group of paths to a split army.
if len(fogPathNexts) > 1:
if DebugHelper.IS_DEBUGGING:
logbook.info(f'FOG DEBUG: SPLITTING army {str(army)} into {len(fogPathNexts)} paths')
self.armies.pop(army.tile, None)
nextArmies = army.get_split_for_fog(list(fogPathNexts.keys()))
for nextTile, paths in fogPathNexts.items():
# NOTE(review): pop() takes from the end of nextArmies while fogPathNexts is walked front to back;
# presumably the split armies are interchangeable — confirm against Army.get_split_for_fog.
nextArmy = nextArmies.pop()
nextArmy.expectedPaths = []
# The group keyed by the original tile holds the paths that wanted to enter a visible tile;
# that split army keeps its paths but is not moved.
if nextTile == origTile:
for path in paths:
if DebugHelper.IS_DEBUG_OR_UNIT_TEST_MODE:
logbook.info(f'for army {str(nextArmy)} ignoring SPLIT fog path move into visible: {str(path)}')
nextArmy.include_path(path)
if not nextArmy.scrapped:
self.armies[nextArmy.tile] = nextArmy
continue
for path in paths:
nextArmy.include_path(path)
if DebugHelper.IS_DEBUG_OR_UNIT_TEST_MODE:
logbook.info(f'respecting army {str(nextArmy)} SPLIT fog path: {str(path)}')
self._move_fogged_army_along_path(nextArmy, path, armyAlreadyPopped=True)
if DebugHelper.IS_DEBUG_OR_UNIT_TEST_MODE:
logbook.info(f'AFTER: army {str(nextArmy)}: {str(nextArmy.expectedPaths)}')
if not nextArmy.scrapped:
self.armies[nextArmy.tile] = nextArmy
# Exactly one next tile: move the army itself, ignoring any path whose next step is a visible tile.
elif len(fogPathNexts) == 1:
if DebugHelper.IS_DEBUGGING:
logbook.info(f'FOG DEBUG: Army {str(army)} has single path, moving normally')
for path in army.expectedPaths:
if path is not None and path.start.next is not None and path.start.next.tile.visible:
if DebugHelper.IS_DEBUG_OR_UNIT_TEST_MODE:
logbook.info(f'for army {str(army)} ignoring fog path move into visible: {str(path)}')
continue
if DebugHelper.IS_DEBUG_OR_UNIT_TEST_MODE:
logbook.info(f'respecting army {str(army)} fog path: {str(path)}')
self._move_fogged_army_along_path(army, path)
if DebugHelper.IS_DEBUG_OR_UNIT_TEST_MODE:
logbook.info(f'AFTER: army {str(army)}: {str(army.expectedPaths)}')
# When some path wanted to step into a visible tile, put back the origin tile's army count captured earlier.
if anyNextVisible:
origTile.army = origTileArmy
def clean_up_armies(self):
# Walks a snapshot of self.armies and removes or resolves entries that are no longer valid:
# scrapped armies, friendly armies now under fog, entangled armies that were re-sighted,
# and fog predictions that a newly revealed tile contradicts.
# Nothing is returned.
for army in list(self.armies.values()):
# Armies on currently visible tiles get their last-seen turn refreshed first.
if army is not None and army.tile.visible:
army.last_seen_turn = self.map.turn
# Scrapped armies are deleted, but only when the dict entry for the tile is still this army.
if army.scrapped:
logbook.info(f"Army {str(army)} was scrapped last turn, deleting.")
if army.tile in self.armies and self.armies[army.tile] == army:
del self.armies[army.tile]
continue
elif (army.player == self.map.player_index or army.player in self.map.teammates) and not army.tile.visible:
logbook.info(f"Army {str(army)} was ours but under fog now, so was destroyed. Scrapping.")
self.scrap_army(army, scrapEntangled=True)
# Entangled army on a visible tile still owned by the army's player: resolve the entanglement when the
# tracked value is strictly between (tile.army - 1) * 0.8 and tile.army * 1.2, otherwise discard it.
elif army.tile.visible and len(army.entangledArmies) > 0 and army.tile.player == army.player:
if army.tile.army * 1.2 > army.value > (army.tile.army - 1) * 0.8:
# we're within range of expected army value, resolve entanglement :D
logbook.info(f"Army {str(army)} was entangled and rediscovered :D disentangling other armies")
self.resolve_entangled_armies(army)
else:
logbook.info(
f"Army {str(army)} was entangled at this tile, but army value doesn't match expected?\n - NOT army.tile.army * 1.2 ({army.tile.army * 1.2}) > army.value ({army.value}) > (army.tile.army - 1) * 0.8 ({(army.tile.army - 1) * 0.8})")
for entangled in army.entangledArmies:
logbook.info(f" removing {str(army)} from entangled {str(entangled)}")
# NOTE(review): bare except swallows every error here, not only "army not present"; consider narrowing.
try:
entangled.entangledArmies.remove(army)
except:
pass
if army.tile in self.armies and self.armies[army.tile] == army:
del self.armies[army.tile]
continue
# Tile just came into sight and is neutral (player -1), or is owned by someone else while this army is entangled:
# the prediction was wrong, so unlink it from its entangled armies and delete it.
elif army.tile.delta.gainedSight and (
army.tile.player == -1 or (army.tile.player != army.player and len(army.entangledArmies) > 0)):
logbook.info(
f"Army {str(army)} just uncovered was an incorrect army prediction. Disentangle and remove from other entangley bois")
for entangled in army.entangledArmies:
logbook.info(f" removing {str(army)} from entangled {str(entangled)}")
# NOTE(review): same bare except as above.
try:
entangled.entangledArmies.remove(army)
except:
pass
if army.tile in self.armies and self.armies[army.tile] == army:
del self.armies[army.tile]
def track_army_movement(self):
# Reconciles the tracked armies with this turn's map deltas, in phases:
# 1. our own last move and the moves the map already attributed to other players,
# 2. unexplained army emergences reported by the map,
# 3. tile-to-tile moves the map detected (tile.delta.toTile),
# 4. per-army tracking for everything left over,
# then scraps unmoved low armies, writes trackingArmies back into self.armies and cleans up.
# Nothing is returned.
# TODO tile.delta.imperfectArmyDelta
# for army in list(self.armies.values()):
# self.determine_army_movement(army, adjArmies)
# trackingArmies: armies handled this turn, keyed by tile; committed into self.armies near the end.
trackingArmies = {}
# skip: tiles already handled by an earlier phase.
skip = set()
# for tile in self.map.get_all_tiles():
# if tile.delta.armyDelta != 0 and not tile.delta.gainedSight and not tile.delta.lostSight:
# self.unaccounted_tile_diffs[tile] = tile.delta.armyDelta
with self.perf_timer.begin_move_event('ArmyTracker move respect'):
# Our own move: the army at the move's source tile is forced to our player index with value oldArmy - 1.
if self.lastMove is not None:
playerMoveArmy = self.get_or_create_army_at(self.lastMove.source)
playerMoveArmy.player = self.map.player_index
playerMoveArmy.value = self.lastMove.source.delta.oldArmy - 1
self.try_track_own_move(playerMoveArmy, skip, trackingArmies)
# Other players: player.last_move unpacks to (src, dest, movedHalf). Only moves whose source already has a
# tracked army are handled here.
for player in self.map.players:
if player.index == self.map.player_index:
continue
if player.last_move is not None:
src: Tile
dest: Tile
src, dest, movedHalf = player.last_move
try:
armyAtSrc = self.armies[src]
except KeyError:
continue
if armyAtSrc.player == player.index:
logbook.info(f'RESPECTING MAP DETERMINED PLAYER MOVE {str(src)}->{str(dest)} BY p{player.index} FOR ARMY {str(armyAtSrc)}')
self.army_moved(armyAtSrc, dest, trackingArmies, dontUpdateOldFogArmyTile=True) # map already took care of this for us
skip.add(src)
else:
logbook.info(f'ARMY {str(armyAtSrc)} AT SOURCE OF PLAYER {player.index} MOVE {str(src)}->{str(dest)} DID NOT MATCH THE PLAYER THE MAP DETECTED AS MOVER, SCRAPPING ARMY...')
self.scrap_army(armyAtSrc, scrapEntangled=False)
with self.perf_timer.begin_move_event('ArmyTracker emergence pathing'):
# unaccounted_tile_diffs is rebuilt every call from map.army_emergences, which maps
# tile -> (emergedAmount, emergingPlayer). Entries with player -1 are ignored.
self.unaccounted_tile_diffs: typing.Dict[Tile, int] = {}
for tile, diffTuple in self.map.army_emergences.items():
emergedAmount, emergingPlayer = diffTuple
if emergingPlayer == -1:
continue
self.should_recalc_fog_land_by_player[emergingPlayer] = True
self.unaccounted_tile_diffs[tile] = emergedAmount
# self.unaccounted_tile_diffs[tile] = emergedAmount
# map has already dealt with ALL possible perfect-information moves.
# Treated as a move INTO fog when the amount is negative and equals either the negated half-move amount
# of the tile's old army or the negated full move (oldArmy - 1).
isMoveIntoFog = (
(
False
# (tile.delta.oldOwner == tile.player and tile.player != self.map.player_index)
or 0 - Tile.get_move_half_amount(tile.delta.oldArmy) == emergedAmount
or 0 - tile.delta.oldArmy + 1 == emergedAmount
)
and emergedAmount < 0 # must be negative emergence to be a move into fog
)
try:
existingArmy = self.armies[tile]
except KeyError:
existingArmy = None
if isMoveIntoFog:
if tile.delta.gainedSight and existingArmy:
logbook.info(
f'gainedSight splitting existingArmy back into fog 1 - emerged {emergedAmount} by player {emergingPlayer} on tile {repr(tile)} because it appears to have been a move INTO fog. Will be tracked via emergence.')
self.try_split_fogged_army_back_into_fog(existingArmy, trackingArmies)
skip.add(tile)
else:
logbook.info(f'IGNORING maps determined unexplained emergence of {emergedAmount} by player {emergingPlayer} on tile {repr(tile)} because it appears to have been a move INTO fog. Will be tracked via emergence.')
continue
# Jump straight to fog source detection.
if tile.delta.toTile is not None:
# the map does its job too well and tells us EXACTLY where the army emerged from, but armytracker wants the armies final destination and will re-do that work itself, so use the toTile.
# tile = tile.delta.toTile
if tile.delta.toTile in skip:
continue
# Discovered cities are only logged here; the skip itself is commented out.
if tile.isCity and tile.discovered:
logbook.info(f'(TEMP NOT IG???) IGNORING maps determined unexplained emergence of {emergedAmount} by player {emergingPlayer} on tile {repr(tile)} because it was a discovered city..?')
# continue
if existingArmy and existingArmy.player == tile.player and existingArmy.value + 1 > tile.army:
logbook.info(f'TODO CHECK IF EVER CALLED splitting existingArmy back into fog 2 - emerged {emergedAmount} by player {emergingPlayer} on tile {repr(tile)} because it appears to have been a move INTO fog. Will be tracked via emergence.')
self.try_split_fogged_army_back_into_fog(existingArmy, trackingArmies)
skip.add(tile)
continue
# From here on the emergence amount is used as a magnitude.
if emergedAmount < 0:
emergedAmount = 0 - emergedAmount
logbook.info(f'Respecting maps determined unexplained emergence of {emergedAmount} by player {emergingPlayer} on tile {repr(tile)} (from tile if known {repr(tile.delta.fromTile)})')
emergedArmy = self.handle_unaccounted_delta(tile, emergingPlayer, emergedAmount)
# When an army was produced: follow it to toTile if that tile is owned by the same player,
# otherwise track it on the emergence tile itself.
if emergedArmy is not None:
if tile.delta.toTile is not None and tile.delta.toTile.player == emergedArmy.player:
self.army_moved(emergedArmy, tile.delta.toTile, trackingArmies)
skip.add(tile.delta.toTile)
else:
trackingArmies[tile] = emergedArmy
skip.add(tile)
with self.perf_timer.begin_move_event('ArmyTracker lastmove loop'):
# Honor map-detected moves (tile.delta.toTile set) for tiles not handled above, excluding the source of
# our own move and tiles we owned before this turn.
for tile in self.map.get_all_tiles():
if tile in skip:
continue
if self.lastMove is not None and tile == self.lastMove.source:
continue
if tile.delta.oldOwner == self.map.player_index:
# we track our own moves elsewhere
continue
if tile.delta.toTile is None:
continue
# A move out of an obstacle or mountain is treated as a map error:
# raised when BYPASS_TIMEOUTS_FOR_DEBUGGING is set, otherwise logged.
if tile.isUndiscoveredObstacle or tile.isMountain:
msg = f'are we really sure {str(tile)} moved to {str(tile.delta.toTile)}'
if BYPASS_TIMEOUTS_FOR_DEBUGGING:
raise AssertionError(msg)
else:
logbook.error(msg)
continue
# Same treatment for a move into a mountain.
if tile.delta.toTile.isMountain:
msg = f'are we really sure {str(tile.delta.toTile)} was moved to from {str(tile)}'
if BYPASS_TIMEOUTS_FOR_DEBUGGING:
raise AssertionError(msg)
else:
logbook.error(msg)
continue
# if armyDetectedAsMove is not None:
armyDetectedAsMove = self.get_or_create_army_at(tile)
logbook.info(f'Map detected army move, honoring that: {str(tile)}->{str(tile.delta.toTile)}')
self.army_moved(armyDetectedAsMove, tile.delta.toTile, trackingArmies)
if tile.delta.toTile.isUndiscoveredObstacle:
# if map detected a move into an undiscovered obstacle, the destination is converted to a city
# owned by the moving army's player, with the army's value as its army count.
toTile = self.map.tiles_by_index[tile.delta.toTile.tile_index]
toTile.isCity = True
toTile.player = armyDetectedAsMove.player
toTile.army = armyDetectedAsMove.value
logbook.warning(f'CONVERTING {str(tile.delta.toTile)} UNDISCOVERED MOUNTAIN TO CITY DUE TO MAP SAYING DEFINITELY TILE MOVED THERE. {str(tile)}->{str(tile.delta.toTile)}')
armyDetectedAsMove.update()
if not tile.delta.toTile.visible:
# map knows what it is doing, force tile army update.
armyDetectedAsMove.value = tile.delta.toTile.army - 1
with self.perf_timer.begin_move_event('ArmyTracker try_track_army loop'):
# Armies are processed in ascending order of distance from our general.
for army in sorted(self.armies.values(), key=lambda a: self.map.get_distance_between(self.general, a.tile)):
# any of our armies CANT have moved (other than the one handled explicitly above), ignore them, let other armies collide with / capture them.
if army.player == self.map.player_index:
if army.last_moved_turn < self.map.turn - 1 and army.tile.army < self.track_threshold:
self.scrap_army(army, scrapEntangled=False)
else:
army.update()
continue
self.try_track_army(army, skip, trackingArmies)
with self.perf_timer.begin_move_event('ArmyTracker scrap unmoved'):
self.scrap_unmoved_low_armies()
# Commit everything tracked this turn back into self.armies, keyed by each army's current tile.
for armyDetectedAsMove in trackingArmies.values():
self.armies[armyDetectedAsMove.tile] = armyDetectedAsMove
with self.perf_timer.begin_move_event('ArmyTracker clean up armies'):
self.clean_up_armies()
def find_visible_source(self, tile: Tile):
    """
    Look for a tile adjacent to `tile` whose unexplained army change offsets this tile's army change.

    Two passes are made over `tile.movable`. The first only accepts a neighbor whose unexplained delta
    sums with the tile's delta to exactly zero. The second is more lenient and accepts the first neighbor
    whose sum falls within -2..2 inclusive.

    @param tile: the tile whose army delta needs an explanation.
    @return: the first matching adjacent tile, or None when the tile's delta is 0 or no neighbor matches.
    """
    ownDelta = tile.delta.armyDelta
    if ownDelta == 0:
        return None

    def unexplained(candidate):
        # Use the recorded unaccounted diff when there is one, otherwise the neighbor's raw army delta.
        return self.unaccounted_tile_diffs.get(candidate, candidate.delta.armyDelta)

    def report(adjacent, unexplainedAdjDelta, isMatch):
        logbook.info(
            f" Find visible source {str(tile)} ({tile.delta.armyDelta}) <- {adjacent.toString()} ({unexplainedAdjDelta}) ? {isMatch}")

    # Strict pass: deltas must cancel exactly. Only a hit is logged here.
    for candidate in tile.movable:
        residual = unexplained(candidate)
        if ownDelta + residual == 0:
            report(candidate, residual, True)
            return candidate

    # try more lenient: every candidate is logged in this pass, hit or miss.
    for candidate in tile.movable:
        residual = unexplained(candidate)
        withinTolerance = -2 <= ownDelta + residual <= 2
        report(candidate, residual, withinTolerance)
        if withinTolerance:
            return candidate

    return None
def army_moved(
        self,
        army: Army,
        toTile: Tile,
        trackingArmies: typing.Dict[Tile, Army],
        dontUpdateOldFogArmyTile=False
):
    """
    Record that a tracked army moved from its current tile to toTile.

    The army is removed from self.armies under its old tile, entered into trackingArmies under toTile
    (unless a larger army owned by toTile's player is already tracked there), merged or collided with
    any army already registered at toTile, and then re-pointed at toTile. Afterwards the army may be
    scrapped, its entanglements resolved, the vacated fog tile reset, its expected paths recomputed,
    and every listener in self.notify_army_moved is called with the army.

    @param army: the army that moved. Its tile, expectedPaths and possibly scrapped / last_moved_turn are updated.
    @param toTile: the tile the army moved to.
    @param trackingArmies: armies tracked so far this turn keyed by destination tile; updated in place.
    @param dontUpdateOldFogArmyTile: If True, will not update the old fogged army tile to be 1
    @return: None
    """
    oldTile = army.tile
    # Drop the registration keyed by the tile the army is leaving (no-op when it was not registered).
    self.armies.pop(army.tile, None)

    if army.visible and toTile.was_visible_last_turn():  # or visible?
        if army.player in self.player_moves_this_turn:
            logbook.error(f'Yo, we think player {army.player} moved twice this turn...? {str(army)} -> {str(toTile)}')

        self.player_moves_this_turn.add(army.player)

    try:
        existingTracking = trackingArmies[toTile]
    except KeyError:
        existingTracking = None

    # Claim the destination unless a bigger army belonging to the tile's current owner already did.
    if (
        existingTracking is None
        or existingTracking.value < army.value
        or existingTracking.player != toTile.player
    ):
        trackingArmies[toTile] = army

    try:
        potentialMergeOrKilled = self.armies[toTile]
    except KeyError:
        potentialMergeOrKilled = None

    if potentialMergeOrKilled is not None:
        if potentialMergeOrKilled.player == army.player:
            self.merge_armies(army, potentialMergeOrKilled, toTile, trackingArmies)
        elif toTile.delta.toTile is None:
            # Only collide when the destination tile has no recorded move target of its own.
            self.collide_armies(army, potentialMergeOrKilled, toTile, trackingArmies)

    army.update_tile(toTile)

    if army.value < -1 or (army.player != army.tile.player and army.tile.visible):
        logbook.info(f" Army {str(army)} scrapped for being negative or run into larger tile")
        self.scrap_army(army, scrapEntangled=False)
    if army.tile.visible and len(army.entangledArmies) > 0:
        self.resolve_entangled_armies(army)

    if not oldTile.visible and not dontUpdateOldFogArmyTile:
        oldTile.army = 1
        oldTile.player = army.player
        if self.map.is_army_bonus_turn:
            oldTile.army += 1
        # Parenthesized on purpose: without the parentheses `and` binds tighter than `or`, which gave
        # cities the city-bonus increment on every move regardless of is_city_bonus_turn.
        if (oldTile.isCity or oldTile.isGeneral) and self.map.is_city_bonus_turn:
            oldTile.army += 1

    # if not toTile.visible:
    #     toTile.player = army.player

    if army.scrapped:
        army.expectedPaths = []
    else:
        # Ok then we need to recalculate the expected path.
        # TODO detect if enemy army is likely trying to defend
        army.expectedPaths = ArmyTracker.get_army_expected_path(self.map, army, self.general, self.player_targets)
        # logbook.info(f'set army {str(army)} expected paths to {str(army.expectedPaths)}')

    # Only stamp the move turn for armies seen within the last few turns.
    if army.last_seen_turn > self.map.turn - 6:
        army.last_moved_turn = self.map.turn - 1

    for listener in self.notify_army_moved:
        listener(army)
def scrap_army(self, army: Army, scrapEntangled: bool = False):
    """
    Flag an army as scrapped and detach it from the armies it is entangled with.

    @param army: the army to scrap; its scrapped flag is set to True.
    @param scrapEntangled: when True, every army in army.entangledArmies is flagged scrapped as well and
     the entanglement is then resolved via resolve_entangled_armies. When False, the army is only removed
     from each entangled army's own entangledArmies list; army.entangledArmies itself is left untouched.
    @return: None
    """
    army.scrapped = True

    if scrapEntangled:
        for entangledArmy in army.entangledArmies:
            entangledArmy.scrapped = True
        self.resolve_entangled_armies(army)
        return

    for entangledArmy in army.entangledArmies:
        try:
            entangledArmy.entangledArmies.remove(army)
        except ValueError:
            # list.remove raises ValueError when the army is not in the list (already unlinked).
            # That case is benign; anything else is a real error and must not be hidden.
            pass
def resolve_entangled_armies(self, army):
    """
    Collapse an entanglement: scrap every army entangled with `army` and clear the entanglement lists.

    For each entangled army this removes it from self.armies (only when it is the army registered at
    its tile), flags it scrapped, and, when its tile is not visible and holds a positive army count,
    subtracts the entangled army's entangledValue from that tile without going below 1.

    @param army: the army whose entangledArmies are resolved; its entangledArmies list is emptied.
    @return: None
    """
    if len(army.entangledArmies) > 0:
        logbook.info(f"{str(army)} resolving {len(army.entangledArmies)} entangled armies")
        # resolvedByTile: typing.Set[Tile] = set()
        for entangledArmy in army.entangledArmies:
            # if entangledArmy.tile in resolvedByTile:
            #     continue
            # resolvedByTile.add(entangledArmy.tile)
            logbook.info(f" {entangledArmy.toString()} entangled, entangledValue {entangledArmy.entangledValue}")
            # Only unregister the entry if it is this entangled army; another army may own that tile slot.
            if entangledArmy.tile in self.armies and self.armies[entangledArmy.tile] == entangledArmy:
                del self.armies[entangledArmy.tile]
            entangledArmy.scrapped = True
            if not entangledArmy.tile.visible and entangledArmy.tile.army > 0:
                # Take the entangled value back off the unseen tile, leaving at least 1 army on it.
                newArmy = max(entangledArmy.tile.army - entangledArmy.entangledValue, 1)
                logbook.info(
                    f" updating entangled army tile {entangledArmy.toString()} from army {entangledArmy.tile.army} to {newArmy}")
                entangledArmy.tile.army = newArmy
                # NOTE(review): presumably undoes a temporary fog ownership guess on a never-discovered,
                # player-owned tile; reset_temp_tile_marked is defined outside this view, so confirm.
                if not entangledArmy.tile.discovered and entangledArmy.tile.player >= 0:
                    self.reset_temp_tile_marked(entangledArmy.tile)
            entangledArmy.entangledArmies = []
        army.entangledArmies = []
def army_could_capture(self, army, fogTargetTile):
    """
    Tell whether `army` could take `fogTargetTile`.

    @param army: the moving army (its player and value are read).
    @param fogTargetTile: the candidate tile (its player and army are read).
    @return: True when the tile already belongs to the army's player; otherwise whether the army's value
     is strictly greater than the tile's army.
    """
    if army.player == fogTargetTile.player:
        return True

    return army.value > fogTargetTile.army
def move_army_into_fog(self, army: Army, fogTargetTile: Tile):
    """
    Move a tracked army onto fogTargetTile, updating the tile's predicted army / owner when it is not visible.

    The army is unregistered from its current tile first. If an army is already registered at the target
    and lists this army among its entangledArmies, this army is scrapped and nothing else happens.
    Otherwise the target tile is adjusted (only when not visible), the army is pointed at the target,
    its value is set to the tile's army minus 1, it is registered in self.armies under the target tile,
    and every listener in self.notify_army_moved is called with it.

    @param army: the army being moved.
    @param fogTargetTile: the destination tile.
    @return: None
    """
    self.armies.pop(army.tile, None)

    # if fogTargetTile in self.armies:
    #     army.scrapped = True
    #     return
    try:
        existingTargetFoggedArmy = self.armies[fogTargetTile]
    except KeyError:
        existingTargetFoggedArmy = None
    if existingTargetFoggedArmy is not None:
        # The army already at the destination is entangled with this one: scrap this one and stop.
        if army in existingTargetFoggedArmy.entangledArmies:
            army.scrapped = True
            return

    movingPlayer = self.map.players[army.player]

    if not fogTargetTile.visible:
        if self.map.is_player_on_team_with(fogTargetTile.player, army.player):
            # Tile owner is reported to be on the army's team: the army value is added to the tile.
            fogTargetTile.army += army.value
            if not fogTargetTile.isGeneral:
                # NOTE(review): players is indexed with fogTargetTile.player directly; if that can be a
                # negative (neutral) index this picks the last player - confirm against callers.
                oldPlayer = self.map.players[fogTargetTile.player]
                if fogTargetTile in oldPlayer.tiles:
                    oldPlayer.tiles.remove(fogTargetTile)
                # Flag the ownership change as a temporary fog prediction on never-discovered tiles.
                if not fogTargetTile.discovered and fogTargetTile.player != army.player:
                    fogTargetTile.isTempFogPrediction = True
                fogTargetTile.player = army.player
                movingPlayer.tiles.append(fogTargetTile)
        else:
            # Not on the same team: the army value is subtracted from the tile.
            fogTargetTile.army -= army.value
            if fogTargetTile.army < 0:
                # Went negative: flip the sign to get the remaining army and hand the tile over.
                fogTargetTile.army = 0 - fogTargetTile.army
                oldPlayer = self.map.players[fogTargetTile.player]
                if fogTargetTile in oldPlayer.tiles:
                    oldPlayer.tiles.remove(fogTargetTile)
                movingPlayer.tiles.append(fogTargetTile)
                # if not fogTargetTile.discovered and len(army.entangledArmies) == 0:
                fogTargetTile.player = army.player
        logbook.info(f" fogTargetTile {fogTargetTile.toString()} updated army to {fogTargetTile.army}")
    # breaks stuff real bad. Don't really want to do this anyway.
    # Rather track the army through fog with no consideration of owning the tiles it crosses
    # fogTargetTile.player = army.player
    army.update_tile(fogTargetTile)
    # The army's value is taken from the tile it now occupies, minus 1.
    army.value = fogTargetTile.army - 1
    self.armies[fogTargetTile] = army
    for listener in self.notify_army_moved:
        listener(army)
def get_nearby_armies(self, army, armyMap=None):
    """
    Collect the armies found within two moves of `army`'s tile.

    @param army: the army whose surroundings are searched.
    @param armyMap: tile -> army mapping to look in; defaults to self.armies when None.
    @return: list of armies on the adjacent tiles and on those tiles' own adjacent tiles (the army's own
     tile is skipped at the second step). A tile reachable along several routes contributes its army
     once per route.
    """
    lookup = self.armies if armyMap is None else armyMap
    origin = army.tile

    # super fastMode depth 2 bfs effectively
    nearbyArmies = []
    for firstStep in origin.movable:
        if firstStep in lookup:
            nearbyArmies.append(lookup[firstStep])
        nearbyArmies.extend(
            lookup[secondStep]
            for secondStep in firstStep.movable
            if secondStep != origin and secondStep in lookup
        )

    for nearbyArmy in nearbyArmies:
        logbook.info(f"Army {str(army)} had nearbyArmy {str(nearbyArmy)}")

    return nearbyArmies
def find_new_armies(self):
    """
    Scan every player's tiles and start tracking an army on each qualifying tile that has no live army yet.

    A tile qualifies when its army exceeds self.track_threshold, or when it looks like a fresh enemy
    move: not already tracked, sight not just gained, not our own tile, army delta magnitude above 2,
    army above 2, and not the source or destination of our last move. On army-bonus turns the track
    threshold is refreshed first via update_track_threshold.

    @return: None
    """
    logbook.info("Finding new armies:")
    # Never populated below (the largest-tile pass is disabled), so every slot stays None.
    largestByPlayer = [None] * len(self.map.players)

    if self.map.is_army_bonus_turn:
        self.update_track_threshold()

    for player in self.map.players:
        for tile in player.tiles:
            notOurMove = (
                self.lastMove is None
                or (tile != self.lastMove.source and tile != self.lastMove.dest)
            )

            enemyJustMoved = (
                tile not in self.armies
                and not tile.delta.gainedSight
                and tile.player != self.map.player_index
                and abs(tile.delta.armyDelta) > 2
                and tile.army > 2
                and notOurMove
            )

            qualifies = (
                largestByPlayer[tile.player] == tile
                or tile.army > self.track_threshold
                or enemyJustMoved
            )
            if not qualifies:
                continue

            # Leave tiles alone that already carry a live (non-scrapped) army.
            tracked = self.armies.get(tile)
            if tracked is not None and not tracked.scrapped:
                continue

            logbook.info(
                f"{str(tile)} Discovered as Army! (tile.army {tile.army}, tile.delta {tile.delta.armyDelta}) - no fog emergence")
            # NOTE(review): evaluates as (not visible and isCity) or isGeneral - confirm that grouping is intended.
            skipPath = not tile.visible and tile.isCity or tile.isGeneral
            army = self.get_or_create_army_at(tile, skip_expected_path=skipPath)
            if army.visible:
                army.last_seen_turn = self.map.turn
            else:
                # Unseen army: take its value from the tile it sits on, minus 1.
                army.value = army.tile.army - 1
            # if tile WAS bordered by fog find the closest fog army and remove it (not tile.visible or tile.delta.gainedSight)
def new_army_emerged(self, emergedTile: Tile, armyEmergenceValue: float, emergingPlayer: int = -1, distance: int | None = None):
"""
when an army can't be resolved to coming from the fog from a known source, this method gets called to track its emergence location.
@param emergedTile:
@param armyEmergenceValue:
@return: