diff --git a/README.md b/README.md index 135e927..06dca70 100644 --- a/README.md +++ b/README.md @@ -91,7 +91,7 @@ Three scripts for gradius are available to try: - `python -m copain.commands.gradius_random_inputs` will start a visible FCEUX instance, start a 1p game, turn on autofire and then perform random directional inputs. If vic viper dies, the game resets. - `python -m copain.commands.gradius_brute_force` will start a visible FCEUX instance, start a 1p game and initiate a range of savestates, and perform a naïve brute-force exploration of the gamestates using a die-and-retry strategy, using savestates to unlock a fast exploration of savestates, until it reaches the end of the game. At maximum emulation speed, it takes about 10 minutes to see the end boss (TODO: detection of end game not implemented yet causes an infinite loop after end of game) -- `python -m copain.commands.gradius_brute_force`: a naïve attempt at applying q-learning to vic viper controls, reading the gamestate directly from the game RAM. Runs seemingly smoothly, but so far I got no signs of learning. Probably lacks many analysis tools, q-learning tricks, and understanding of the search space, before getting interesting results. The previous brute-force command might help creating a bank of states/transitions and savestates that would kickstart the learning process. +- `python -m copain.commands.gradius_q_learning`: a naïve attempt at applying q-learning to vic viper controls, reading the gamestate directly from the game RAM. Runs seemingly smoothly, but so far I got no signs of learning. Probably lacks many analysis tools, q-learning tricks, and understanding of the search space, before getting interesting results. The previous brute-force command might help creating a bank of states/transitions and savestates that would kickstart the learning process. A nice milestone would be managing to perfect-score the game using a combination of those 3 approaches. @@ -111,7 +111,6 @@ A nice milestone would be managing to perfect-score the game using a combination │ ├── rl.py # classes that setup the RL framework │ ├── run.lua # lua entrypoint │ ├── run.py # python high-level classes to start a run and a scripting loop -│ ├── utils.lua │ ├── utils.py │ └── VERSION.txt ├── LICENSE diff --git a/copain/commands/gradius_brute_force.py b/copain/commands/gradius_brute_force.py index fda579b..2d50ce4 100644 --- a/copain/commands/gradius_brute_force.py +++ b/copain/commands/gradius_brute_force.py @@ -13,10 +13,6 @@ TMP_FOLDER = "/tmp/" -NUM_RUNNERS = 1 -THREADED_SOCKET = True -THREADED_REQUESTS = True - FRAME_PER_ACTION = 16 NUMBER_OF_SAVESTATES = 10 ACTIONS_PER_SAVESTATES = 12 @@ -108,7 +104,7 @@ def __init__( self.nb_preferred_positions = len(preferred_positions) self.nb_fails_before_position_change = nb_fails_before_position_change - def __call__(self, handler, run_metadata): + def __call__(self, handler): self._register_handler(handler) # temporally ordered savestates @@ -326,15 +322,12 @@ def gradius_loop_fn_init(): rom_path=ROM_PATH, rom_hash=ROM_HASH, loop_fn_init=gradius_loop_fn_init, - threaded_socket=THREADED_SOCKET, - threaded_requests=THREADED_REQUESTS, - num_runners=NUM_RUNNERS, enable_game_genie=ENABLE_GAME_GENIE, display_fceux_gui=DISPLAY_FCEUX_GUI, - visible_enable_sound=ENABLE_SOUND, - visible_speedmode=SPEEDMODE, - visible_render_sprites=RENDER_SPRITES, - visible_render_background=RENDER_BACKGROUND, + enable_sound=ENABLE_SOUND, + speedmode=SPEEDMODE, + render_sprites=RENDER_SPRITES, + render_background=RENDER_BACKGROUND, tmp_folder=TMP_FOLDER, fceux_executable=FCEUX_EXECUTABLE, ).run() diff --git a/copain/commands/gradius_q_learning.py b/copain/commands/gradius_q_learning.py index a002b4a..378bb2e 100644 --- a/copain/commands/gradius_q_learning.py +++ b/copain/commands/gradius_q_learning.py @@ -8,51 +8,58 @@ ENABLE_SOUND = False SPEEDMODE = "maximum" -RENDER_SPRITES = False -RENDER_BACKGROUND = False +RENDER_SPRITES = True +RENDER_BACKGROUND = True TMP_FOLDER = "/tmp/" -NUM_RUNNERS = 3 -THREADED_SOCKET = True -THREADED_REQUESTS = True +FRAME_PER_ACTION = 16 -FRAME_PER_ACTION = 30 +MAX_NB_ORDINARY_MEMORIES = 5000 +MAX_NB_OUTSTANDING_MEMORIES = 10000 +REWARD_CUTOFF = 0.5 +MEMORY_BURN_IN = 1000 -DEVICE = "cuda" -EPOCH_DURATION = 600 - -EXPERIENCE_REPLAY_SIZE = 100000 -EXPERIENCE_REPLAY_BURN_IN = 30000 - -ENABLE_TRAINING = True - -ENABLE_AMP = True -AMP_INIT_SCALING = 20 -TRAINING_BATCH_SIZE = 64 +TORCH_NUM_THREADS = 1 +DEVICE="cuda" +AMP_ENABLED = True +AMP_INIT_SCALE = 20 +BATCH_SIZE = 64 STARTING_NB_EMBEDDINGS = 100000 NB_EMBEDDINGS_STEP = 50000 LR = 0.001 -EVALUATION_BATCH_SIZE = 32 - -NN_DEPTH = 1 -EMBEDDING_SIZE = 256 -HIDDEN_DIM = 8192 +NN_DEPTH = 2 +EMBEDDING_SIZE = 64 +HIDDEN_DIM = 4096 P_DROPOUT = 0.5 -DISCOUNT_RATE = 0.9 -EXPLORATION_RATE = 0.05 +DISCOUNT_RATE = 0.5 +EXPLORATION_RATE = 0.001 +RANDOM_START = True +PLAYER_WEIGHT_REFRESH_RATE = 60 + + +import io +from dataclasses import astuple +from time import sleep, perf_counter +from threading import Lock +from concurrent.futures import ThreadPoolExecutor import numpy as np import torch -from torch.utils.data.dataloader import default_collate + +from sklearn.base import clone + +from skorch.callbacks import Callback + +from torch.utils.data import IterableDataset from copain import CopainRun from copain.copain_driver import P1, DIRECTIONS -from copain.rl import CopainAI -from copain.nn import CopainANN +from copain.nn import NeuralNet, CopainANN +from copain.rl import Experience, ExperienceMemory _GRADIUS_MEMORY_SIZE = 0x8000 _GRADIUS_SLOT_SIZE = 0x100 @@ -63,7 +70,7 @@ def gradius_random_action(): return torch.randint(36, size=(1,))[0] -class GradiusLoopFn: +class GradiusIALoopFn: MEMORY_BYTERANGE_START = np.uint16(0).tobytes() MEMORY_BYTERANGE_LENGTH = np.uint16(_GRADIUS_MEMORY_SIZE) MEMORY_BYTERANGE_BLENGTH = MEMORY_BYTERANGE_LENGTH.tobytes() @@ -72,32 +79,84 @@ class GradiusLoopFn: SKIP_MULTIPLIERS = [1, 2, 4] - def __init__(self, frame_per_action): + def __init__(self, exploration_rate, frame_per_action, player, trainer, weight_sync_lock, + experience_memory, weight_refresh_rate, random_start): self._skip_after_input = frame_per_action - 1 - - def __call__(self, handler, copain_ai, run_metadata): + self.player = player + self.trainer = trainer + self.weight_sync_lock = weight_sync_lock + self.weight_refresh_rate = weight_refresh_rate + self.experience_memory = experience_memory + self.exploration_rate = exploration_rate + self._random = np.random.default_rng(42) + self.random_start = random_start + + def __call__(self, handler): self.handler = handler self._fire_frame = True - runner_id = run_metadata.runner_id - copain_ai.register_run(runner_id) + timer = perf_counter() while True: handler.emu_poweron() self._starting_sequence() + experience = None while True: - gamestate = self._get_gamestate() - action = copain_ai.ask_action(gamestate, runner_id) - time, inputs = self._get_inputs(action) - self._frameadvance_with_inputs_and_autofire(inputs) - inputs_directions = [press for press in inputs if press in DIRECTIONS] - for i in range(self.SKIP_MULTIPLIERS[time] * self._skip_after_input): + nb_frameadvances = np.inf + i = 0 + + while (i < nb_frameadvances): gamestate = self._get_gamestate() must_reset = self._done_condition(gamestate) if must_reset: - copain_ai.tell_done(gamestate, runner_id) + if experience is not None: + experience.state1 = gamestate + experience.done = True + experience.score1 = 0 + self.experience_memory.memorize_new_experience(experience) break - self._frameadvance_with_inputs_and_autofire(inputs_directions) + + if i == 0: + if (perf_counter() - timer) >= self.weight_refresh_rate: + print("syncing...") + del self.player.module_ + del self.player.initialized_ + with self.weight_sync_lock, io.BytesIO() as ram_buffer: + torch.save(self.trainer.module_, ram_buffer) + ram_buffer.seek(0) + self.player.module_ = torch.load( + ram_buffer, map_location=torch.device("cpu")) + del ram_buffer + self.player.initialized_ = True + self.update_experience_scores() + print("synced successfully!") + timer = perf_counter() + + scores = self.player.predict_proba( + gamestate.reshape((1, -1))) + if experience is None or self._random.random() < self.exploration_rate: + action = self._random.integers(scores.shape[1]) + else: + action = scores.argmax(1)[0] + score = scores[0, action] + if experience is not None: + experience.state1 = gamestate + experience.score1 = score + experience.done = False + self.experience_memory.memorize_new_experience(experience) + experience = Experience(state0=gamestate, + action=action, + score0=score) + + time, inputs = self._get_inputs(action) + inputs_directions = [press for press in inputs if press in DIRECTIONS] + nb_frameadvances = self.SKIP_MULTIPLIERS[time] * self._skip_after_input + self._frameadvance_with_inputs_and_autofire(inputs) + else: + self._frameadvance_with_inputs_and_autofire(inputs_directions) + + i += 1 + if must_reset: break @@ -150,15 +209,37 @@ def _get_inputs(self, action): return time, inputs - -def gradius_loop_fn_init(): - return GradiusLoopFn(FRAME_PER_ACTION) + def update_experience_scores(self): + self.trainer.module_.to(torch.device("cpu")) + self.player.module_.to(torch.device("cuda")) + self.player.device = "cuda" + experiences = self.experience_memory.get_all_experiences() + current_idx = 0 + batch_size = self.player.batch_size + while current_idx < len(experiences): + experience_batch = experiences[current_idx:(current_idx+batch_size)] + gamestates = [] + actions = [] + for experience in experience_batch: + experience.score1 = None + gamestates.append(experience.state1) + actions.append(experience.action) + gamestates = np.vstack(gamestates) + actions = np.array(actions) + scores = self.player.predict_proba(gamestates) + scores = scores[np.arange(scores.shape[0]), actions] + for i, experience in enumerate(experience_batch): + experience.score1 = scores[i] + current_idx += batch_size + self.player.module_.to(torch.device("cpu")) + self.player.device = "cpu" + self.trainer.module_.to(torch.device("cuda")) def _gradius_parse_action_nb(action): - time = torch.div(action, 18, rounding_mode="floor") + time = action // 18 action = action % 18 - press_b = torch.div(action, 9, rounding_mode="floor") + press_b = action // 9 direction = action % 9 return time, press_b, direction @@ -187,14 +268,100 @@ def _gradius_reward_fn(data_before, data_after, action, done): - int(any_direction) + 2 * time - 4 * press_b - + 8 * score_diff + + 32 * score_diff - 64 * int(done) - ) / 83 +# ) / 83 + ) / 100 + +class GradiusExperienceMemory: + + def __init__(self, ordinary_memory_size, oustanding_memory_size, reward_cutoff): + self.ordinary_memory = ExperienceMemory(ordinary_memory_size) + self.outstanding_memory = ExperienceMemory(oustanding_memory_size) + self.reward_cutoff = reward_cutoff + self._random = np.random.default_rng(42) + + def memorize_new_experience(self, experience): + experience.reward = _gradius_reward_fn( + experience.state0, experience.state1, experience.action, experience.done) + + memory = (self.outstanding_memory if (experience.reward < self.reward_cutoff) + else self.ordinary_memory) + memory.memorize_new_experience(experience) + + def get_random_experiences(self, nb_experiences): + is_outstanding = (self._random.random(nb_experiences) < ( + self.outstanding_memory.nb_memories/(self.outstanding_memory.nb_memories + + self.ordinary_memory.nb_memories))) + nb_outstanding = is_outstanding.sum() + outstanding_memories = self.outstanding_memory.get_random_experiences(nb_outstanding) + ordinary_memories = self.ordinary_memory.get_random_experiences( + nb_experiences-nb_outstanding) + memories = np.hstack((outstanding_memories, ordinary_memories)) + self._random.shuffle(memories) + return memories + + def get_all_experiences(self): + return np.hstack((self.ordinary_memory.get_all_experiences(), + self.outstanding_memory.get_all_experiences())) + + @property + def nb_memories(self): + return self.ordinary_memory.nb_memories + self.outstanding_memory.nb_memories + + +class MemoryStreamDataset(IterableDataset): + def __init__(self, experience_memory, min_nb_memories): + self.experience_memory = experience_memory + self.min_nb_memories = min_nb_memories + + def __iter__(self): + while self.experience_memory.nb_memories < self.min_nb_memories: + sleep(1) + + while True: + experience = self.experience_memory.get_random_experiences(1)[0] + (data_before, data_after, action, done, reward, + score_before, score_after) = astuple(experience) + yield data_before, (action, reward, done, score_after) +class RLLoss(torch.nn.Module): + + def __init__(self, base_criterion, discount): + super().__init__() + self.base_criterion = base_criterion() + self.discount = discount + + def forward(self, prediction, y): + action, reward, done, score_after = y + prediction = prediction[torch.arange(prediction.size(0)), action] + actual = reward + self.discount * score_after * (~done) + return self.base_criterion(prediction, actual) + + +class UpdateEmbeddings(Callback): + def on_batch_end(self, net, *args, **kwargs): + if net.module_._embedding_bag.update_embeddings(): + del net.optimizer_ + net._initialize_optimizer() + + +class LockCompute(Callback): + def __init__(self, lock): + self.lock = lock + def on_batch_begin(self, *args, **kwargs): + self.lock.acquire() + def on_batch_end(self, *args, **kwargs): + self.lock.release() + if __name__ == "__main__": - copain_ai = CopainAI( + torch.set_num_threads(1) + + weight_sync_lock = Lock() + + player = NeuralNet( module=CopainANN, module__n_actions=_GRADIUS_N_ACTIONS, module__input_dim=_GRADIUS_MEMORY_SIZE, @@ -207,42 +374,59 @@ def _gradius_reward_fn(data_before, data_after, action, done): module__p_dropout=P_DROPOUT, module__initialize_fn=torch.nn.init.kaiming_normal_, module__initialize_fn_kwargs=dict(nonlinearity="relu"), - criterion=torch.nn.HuberLoss, - reward_fn=_gradius_reward_fn, + criterion=RLLoss, + criterion__base_criterion = torch.nn.HuberLoss, + criterion__discount = DISCOUNT_RATE, optimizer=torch.optim.Adam, optimizer__amsgrad=True, lr=LR, - discount=DISCOUNT_RATE, - exploration_rate=EXPLORATION_RATE, - epoch_duration=EPOCH_DURATION, max_epochs=100, - experience_replay_size=EXPERIENCE_REPLAY_SIZE, - experience_replay_burn_in=EXPERIENCE_REPLAY_BURN_IN, - training_batch_size=TRAINING_BATCH_SIZE, - evaluation_batch_size=EVALUATION_BATCH_SIZE, - collate_fn=default_collate, - random_action_fn=gradius_random_action, - enable_amp=ENABLE_AMP, - amp_init_scaling=AMP_INIT_SCALING, - device=DEVICE, + batch_size=BATCH_SIZE, + amp_enabled=AMP_ENABLED, + amp_init_scale=AMP_INIT_SCALE, + device="cpu", + train_split=None, ) - copain_ai.set_training(ENABLE_TRAINING) + + trainer = clone(player) + trainer.set_params(device=DEVICE, + callbacks=[("Embedding_Mgt", UpdateEmbeddings()), + ("sync_callback", LockCompute(weight_sync_lock))]) + + experience_memory = GradiusExperienceMemory(MAX_NB_ORDINARY_MEMORIES, + MAX_NB_OUTSTANDING_MEMORIES, REWARD_CUTOFF) + + player.initialize() + + # create experience memory using EXPERIENCE_REPLAY_SIZE (EXPERIENCE_REPLAY_BURN_IN ?) + def gradius_loop_fn_init(): + return GradiusIALoopFn(EXPLORATION_RATE, FRAME_PER_ACTION, player, trainer, + weight_sync_lock, experience_memory, PLAYER_WEIGHT_REFRESH_RATE, + RANDOM_START) copain = CopainRun( rom_path=ROM_PATH, rom_hash=ROM_HASH, loop_fn_init=gradius_loop_fn_init, - threaded_socket=THREADED_SOCKET, - threaded_requests=THREADED_REQUESTS, - num_runners=NUM_RUNNERS, enable_game_genie=ENABLE_GAME_GENIE, display_fceux_gui=DISPLAY_FCEUX_GUI, - visible_enable_sound=ENABLE_SOUND, - visible_speedmode=SPEEDMODE, - visible_render_sprites=RENDER_SPRITES, - visible_render_background=RENDER_BACKGROUND, + enable_sound=ENABLE_SOUND, + speedmode=SPEEDMODE, + render_sprites=RENDER_SPRITES, + render_background=RENDER_BACKGROUND, tmp_folder=TMP_FOLDER, fceux_executable=FCEUX_EXECUTABLE, - copain_ai=copain_ai, - threaded_ai=True, - ).run() + ) + + with ThreadPoolExecutor(max_workers=1) as thread_executor: + thread_executor.submit(copain.run) + dataset = MemoryStreamDataset(experience_memory, min_nb_memories=MEMORY_BURN_IN) + trainer.fit(dataset) + +# TODO: +# sync in separate thread, and update memory with gpu +# several player threads +# one of the players is normal speed to spectate +# reduce input space (no bonus, no time) +# prefill the memory with a pre-saved movie ? +# clean mess diff --git a/copain/commands/gradius_random_inputs.py b/copain/commands/gradius_random_inputs.py index 5538b26..9d88eab 100644 --- a/copain/commands/gradius_random_inputs.py +++ b/copain/commands/gradius_random_inputs.py @@ -13,10 +13,6 @@ TMP_FOLDER = "/tmp/" -NUM_RUNNERS = 1 -THREADED_SOCKET = True -THREADED_REQUESTS = True - FRAME_PER_ACTION = 5 import numpy as np @@ -38,7 +34,7 @@ def __init__(self, frame_per_action): self._skip_after_input = frame_per_action - 1 self.random = np.random.RandomState() - def __call__(self, handler, run_metadata): + def __call__(self, handler): self._register_handler(handler) while True: @@ -118,15 +114,12 @@ def gradius_loop_fn_init(): rom_path=ROM_PATH, rom_hash=ROM_HASH, loop_fn_init=gradius_loop_fn_init, - threaded_socket=THREADED_SOCKET, - threaded_requests=THREADED_REQUESTS, - num_runners=NUM_RUNNERS, enable_game_genie=ENABLE_GAME_GENIE, display_fceux_gui=DISPLAY_FCEUX_GUI, - visible_enable_sound=ENABLE_SOUND, - visible_speedmode=SPEEDMODE, - visible_render_sprites=RENDER_SPRITES, - visible_render_background=RENDER_BACKGROUND, + enable_sound=ENABLE_SOUND, + speedmode=SPEEDMODE, + render_sprites=RENDER_SPRITES, + render_background=RENDER_BACKGROUND, tmp_folder=TMP_FOLDER, fceux_executable=FCEUX_EXECUTABLE, ).run() diff --git a/copain/copain_driver.lua b/copain/copain_driver.lua index edb6706..798f05b 100644 --- a/copain/copain_driver.lua +++ b/copain/copain_driver.lua @@ -254,14 +254,4 @@ end register_action(movie_rerecordcount) -- 22 --- misc - -local function get_runner_id() - local runner_id = assert(os.getenv("COPAIN_RUN_ID")) - if string.len(runner_id) == 1 then runner_id = runner_id .. "\0" end - assert(socket.send(socket_fd, runner_id)) -end -register_action(get_runner_id) -- 23 - - return driver diff --git a/copain/copain_driver.py b/copain/copain_driver.py index 687314e..fd639b1 100644 --- a/copain/copain_driver.py +++ b/copain/copain_driver.py @@ -246,13 +246,6 @@ def movie_rerecordcounting(self, counting): def movie_rerecordcount(self): return np.frombuffer(self.request.recv(8), dtype=np.uint64)[0] - - """misc""" - - @register_action # 23 - def get_runner_id(self): - return np.frombuffer(self.request.recv(2), dtype=np.uint16)[0] - """internal only""" def _gc(self): @@ -268,82 +261,38 @@ def _send_load(self, load): self.request.sendall(np.uint16(len(load)).tobytes()) self.request.sendall(load) + @dataclass(frozen=True) class CopainRunMetadata: - _VISIBLE_RUNNER_ID = 1 - rom_path: str rom_hash: str - runner_id: int - total_nb_runners: int - is_visible_runner: bool = field(init=False) - - def __post_init__(self): - object.__setattr__( - self, "is_visible_runner", self.runner_id == self._VISIBLE_RUNNER_ID - ) class _CopainLoopFn: def __init__( self, - display_visible_runner, - total_nb_runners, - visible_speedmode, - visible_render_sprites, - visible_render_background, + speedmode, + render_sprites, + render_background, rom_path, rom_hash, loop_fn_init, - copain_ai, ): - self.display_visible_runner = display_visible_runner - self.total_nb_runners = total_nb_runners - self.visible_speedmode = visible_speedmode - self.visible_render_sprites = visible_render_sprites - self.visible_render_background = visible_render_background + self.speedmode = speedmode + self.render_sprites = render_sprites + self.render_background = render_background self.rom_path = rom_path self.rom_hash = rom_hash self.loop_fn_init = loop_fn_init - self.copain_ai = copain_ai def __call__(self, handler): if not handler.request.getblocking(): raise RuntimeError("Expected a socket in blocking mode.") - runner_id = handler.get_runner_id() - - run_metadata = CopainRunMetadata( - rom_path=self.rom_path, - rom_hash=self.rom_hash, - runner_id=runner_id, - total_nb_runners=self.total_nb_runners, - ) - is_visible_runner = run_metadata.is_visible_runner - - if is_visible_runner and self.display_visible_runner: - if self.visible_speedmode is not None: - speedmode = self.visible_speedmode - else: - speedmode = "maximum" if self.total_nb_runners == 1 else "normal" - - visible_render_sprites = ( - self.visible_render_sprites is None - ) or self.visible_render_sprites - - visible_render_background = ( - self.visible_render_background is None - ) or self.visible_render_background - - else: - speedmode = "maximum" - visible_render_sprites = False - visible_render_background = False - - handler.emu_speedmode(speedmode) + handler.emu_speedmode(self.speedmode) handler.emu_setrenderplanes( - sprites=b"\x01" if visible_render_sprites else b"\x00", - background=b"\x01" if visible_render_background else b"\x00", + sprites=b"\x01" if self.render_sprites else b"\x00", + background=b"\x01" if self.render_background else b"\x00", ) actual_hash = handler.rom_gethash("md5") @@ -353,6 +302,11 @@ def __call__(self, handler): f"{actual_hash} but expected hash {self.rom_hash}" ) + run_metadata = CopainRunMetadata( + rom_path=self.rom_path, + rom_hash=self.rom_hash, + ) + kwargs = dict() loop_fn = self.loop_fn_init() @@ -367,22 +321,6 @@ def __call__(self, handler): ): kwargs["run_metadata"] = run_metadata - pass_copain_ai = ("copain_ai" in loop_fn_signature) and not any( - kind is loop_fn_signature["copain_ai"].kind - for kind in ( - inspect.Parameter.VAR_KEYWORD, - inspect.Parameter.VAR_POSITIONAL, - ) - ) - - if pass_copain_ai is not (self.copain_ai is not None): - raise ValueError( - "Expecting an instantiated copain_ai if and only if the loop_fn expects a parameter named copain_ai" - ) - - if pass_copain_ai: - kwargs["copain_ai"] = self.copain_ai - return loop_fn( handler, **kwargs, diff --git a/copain/nn.py b/copain/nn.py index 087e63b..fcc4272 100644 --- a/copain/nn.py +++ b/copain/nn.py @@ -1,5 +1,11 @@ import torch import torch.nn as nn +from torch.cuda.amp import GradScaler, autocast + +from skorch import NeuralNet +from skorch.utils import TeeGenerator +from skorch.dataset import unpack_data + from copain.utils import WeightInitializer @@ -33,22 +39,32 @@ def __init__( initialize_fn_kwargs, ) - feed_forward_steps = [] + per_value_feed_forward_steps = [] for d in range(depth): in_dim = hidden_dim if (d > 0) else (embedding_size) out_dim = hidden_dim if (d < (depth - 1)) else n_actions - feed_forward_steps.extend( + per_value_feed_forward_steps.extend( + [nn.Dropout(p_dropout), nn.ReLU(), nn.Linear(in_dim, out_dim)] + ) + + per_slot_feed_forward_steps = [] + for d in range(depth): + in_dim = hidden_dim if (d > 0) else (input_dim) + out_dim = hidden_dim if (d < (depth - 1)) else n_actions + per_slot_feed_forward_steps.extend( [nn.Dropout(p_dropout), nn.ReLU(), nn.Linear(in_dim, out_dim)] ) - feed_forward_steps.extend([nn.Dropout(p_dropout), nn.ReLU()]) + self.nb_values_per_dim = nb_values_per_dim - self._feed_forward = nn.Sequential(*feed_forward_steps) + self._per_value_feed_forward = nn.Sequential(*per_value_feed_forward_steps) + self._per_slot_feed_forward = nn.Sequential(*per_slot_feed_forward_steps) self.apply(WeightInitializer(initialize_fn, initialize_fn_kwargs)) def forward(self, X): - return self._feed_forward(self._embedding_bag(X)) + return (self._per_value_feed_forward(self._embedding_bag(X)) + + self._per_slot_feed_forward(X/self.nb_values_per_dim))/2 class _DynamicEmbeddingBag(nn.Module): @@ -163,3 +179,61 @@ def forward(self, X): self._detect_unindexed_data(X, X_remapped, row_ix) return self._embedding_bag(X_remapped) + +class NeuralNet(NeuralNet): + + def __init__( + self, + module, criterion, + amp_enabled=True, amp_init_scale=19, + *args, **kwargs + ): + + super(NeuralNet, self).__init__(module, criterion, *args, **kwargs) + self.amp_enabled = amp_enabled + self.amp_init_scale = amp_init_scale + + def fit(self, X, y=None, **fit_params): + if self.amp_enabled: + self.scaler = GradScaler(init_scale=2**self.amp_init_scale) + super().fit(X, y, **fit_params) + return self + + def train_step(self, batch, **fit_params): + step_accumulator = self.get_train_step_accumulator() + + step = self.train_step_single(batch, **fit_params) + step_accumulator.store_step(step) + if self.amp_enabled: + self.scaler.step(self.optimizer_) + self.scaler.update() + else: + self.optimizer_.step() + self.optimizer_.zero_grad() + + return step_accumulator.get_step() + + def train_step_single(self, batch, **fit_params): + self._set_training(True) + Xi, yi = unpack_data(batch) + y_pred = self.infer(Xi, **fit_params) + with autocast(enabled=self.amp_enabled): + loss = self.get_loss(y_pred, yi, X=Xi, training=True) + + self.scaler.scale(loss).backward() if self.amp_enabled else loss.backward() + + self.notify( + 'on_grad_computed', + named_parameters=TeeGenerator(self.get_all_learnable_params()), + batch=batch + ) + + return { + # get back the loss on the last batch (independent of gradient accumulation tricks) + 'loss': loss, + 'y_pred': y_pred, + } + + def infer(self, *args, **kwargs): + with autocast(enabled=self.amp_enabled): + return super(NeuralNet, self).infer(*args, **kwargs) diff --git a/copain/rl.py b/copain/rl.py index a275fb5..6885fb6 100644 --- a/copain/rl.py +++ b/copain/rl.py @@ -1,15 +1,11 @@ import inspect -import threading -from queue import SimpleQueue -from dataclasses import dataclass, astuple +from dataclasses import dataclass import numpy as np -import torch -import torch.cuda.amp as amp -from torch.utils.data.dataloader import default_collate from skorch import NeuralNet -from skorch.utils import to_device + +from threading import Lock _default_nn_params = { @@ -19,295 +15,46 @@ } -class _Pending: +class PendingData: pass @dataclass -class _Experience: - state0: np.array = _Pending - state1: np.array = _Pending - action: np.array = _Pending - reward: np.float32 = _Pending +class Experience: + state0: np.array = PendingData + state1: np.array = PendingData + action: np.array = PendingData + done: bool = PendingData + reward: np.float32 = PendingData + score0: int = PendingData + score1: int = PendingData -class _ExperienceMemory: +class ExperienceMemory: def __init__(self, size): self.size = size self._memory = np.empty(size, dtype=object) self._memory_writing_pointer = 0 self._max_memory_idx = -1 - self._random = np.random.RandomState(42) + self._random = np.random.default_rng(42) + self._lock = Lock() def memorize_new_experience(self, experience): - self._memory[self._memory_writing_pointer] = experience - self._memory_writing_pointer = (self._memory_writing_pointer + 1) % self.size - self._max_memory_idx = max(self._max_memory_idx, self._memory_writing_pointer) + with self._lock: + self._memory[self._memory_writing_pointer] = experience + self._max_memory_idx = max(self._max_memory_idx, self._memory_writing_pointer) + self._memory_writing_pointer = (self._memory_writing_pointer + 1) % self.size def get_random_experiences(self, nb_experiences): - return self._random.choice( - self._memory[: self._max_memory_idx], nb_experiences, replace=True - ) + with self._lock: + return self._random.choice( + self._memory[: self._max_memory_idx], nb_experiences, replace=True + ) + + def get_all_experiences(self): + with self._lock: + return self._memory[:self.nb_memories] @property def nb_memories(self): return self._max_memory_idx + 1 - - -class CopainAI(NeuralNet): - def __init__( - self, - *, - module, - criterion, - reward_fn, - optimizer=_default_nn_params["optimizer"], - lr=_default_nn_params["lr"], - discount=0.9, - exploration_rate=1, - epoch_duration=600, # TODO: implement - max_epochs=_default_nn_params["max_epochs"], - experience_replay_size=10000, - experience_replay_burn_in=10000, - training_batch_size=_default_nn_params["batch_size"], - evaluation_batch_size=_default_nn_params["batch_size"], - collate_fn=default_collate, - random_action_fn=None, - callbacks=_default_nn_params["callbacks"], - warm_start=_default_nn_params["warm_start"], - verbose=_default_nn_params["verbose"], - device=_default_nn_params["device"], - enable_amp=False, - amp_init_scaling=20, - **kwargs, - ): - batch_size = None - dataset = None - iterator_train = None - iterator_valid = None - train_split = None - predict_nonlinearity = None - super().__init__( - module, - criterion, - optimizer, - lr, - max_epochs, - batch_size, - iterator_train, - iterator_valid, - dataset, - train_split, - callbacks, - predict_nonlinearity, - warm_start, - verbose, - device, - **kwargs, - ) - self.discount = discount - self.exploration_rate = exploration_rate - self.reward_fn = reward_fn - - self.experience_replay_size = experience_replay_size - self.experience_replay_burn_in = experience_replay_burn_in - - self.training_batch_size = training_batch_size - self.evaluation_batch_size = evaluation_batch_size - - self.epoch_duration = epoch_duration - self.collate_fn = collate_fn - self.random_action_fn = random_action_fn - self.enable_amp = enable_amp - self.amp_init_scaling = amp_init_scaling - - self._connection_queues = dict() - self._current_step = dict() - self._requests_queue = SimpleQueue() - - def ask_action(self, X, run_id): - self._requests_queue.put((run_id, False, X)) - return self._connection_queues[run_id].get() - - def tell_done(self, X, run_id): - self._requests_queue.put((run_id, True, X)) - - def register_run(self, run_id): - if run_id in self._connection_queues: - raise ValueError(f"A run with run_id {run_id} has already been registered.") - - self._connection_queues[run_id] = SimpleQueue() - self._current_step[run_id] = _Experience() - - def unregister_run(self, run_id): - if run_id not in self._connection_queues: - raise ValueError(f"No run with run_id {run_id} in the run registry.") - - del self._connection_queues[run_id] - del self._current_step[run_id] - - def serve_forever(self): - self.is_serving = True - __is_shut_down = self.__is_shut_down = threading.Event() - try: - self._serve_forever() - finally: - del self._requests_queue - del self.__is_shut_down - __is_shut_down.set() - - def _serve_forever(self): - requests_queue = self._requests_queue - random = np.random.RandomState(41) - if self.training and not (self.warm_start and self.initialized_): - self.initialize() - amp_scaler = amp.GradScaler( - init_scale=2.0 ** self.amp_init_scaling, enabled=self.enable_amp - ) - experience = _ExperienceMemory(self.experience_replay_size) - - while self.is_serving: - self._step(requests_queue, experience, amp_scaler, random) - - def _step(self, requests_queue, experience, amp_scaler, random): - prediction_run_id = [] - evaluation_states = [] - while self._wait_for_next_request( - requests_queue, - len(prediction_run_id), - burn_in=(experience.nb_memories >= self.experience_replay_burn_in), - ): - if prediction_request := self._receive_next_request( - requests_queue, experience, random - ): - run_id, data = prediction_request - prediction_run_id.append(run_id) - evaluation_states.append(data) - - if self.training and (experience.nb_memories >= self.experience_replay_burn_in): - ( - training_states, - training_batch1, - training_actions, - rewards, - ) = self._load_next_training_batch(experience) - evaluation_states.extend(training_batch1) - training_actions = to_device(default_collate(training_actions), self.device) - with amp.autocast(enabled=self.enable_amp), torch.no_grad(): - scores_eval = self._evaluation_step( - evaluation_states, prediction_run_id, training_actions - ) - - if not self.training: - return - - self._train_step( - training_states, training_actions, rewards, scores_eval, amp_scaler - ) - - # TODO: when cb is done this should be on batch cb - if self.module_._embedding_bag.update_embeddings(): - del self.optimizer_ - self._initialize_optimizer() - - def _wait_for_next_request(self, requests_queue, nb_requests_preprocessed, burn_in): - return ( - not requests_queue.empty() - or ( - (not self.training or not burn_in) # - and (nb_requests_preprocessed == 0) - ) - ) and nb_requests_preprocessed < self.evaluation_batch_size - - def _receive_next_request(self, requests_queue, experience, random): - ( - run_id, - done, - data, - ) = requests_queue.get() - if not self.training: - return (run_id, data) - current_step = self._current_step[run_id] - if current_step.state0 is not _Pending: - current_step.state1 = data - current_step.reward = self.reward_fn( - current_step.state0, - current_step.state1, - current_step.action, - done, - ) - experience.memorize_new_experience(current_step) - self._current_step[run_id] = _Experience() - if done: - return False - - self._current_step[run_id].state0 = data - if experience.nb_memories < self.experience_replay_burn_in: - random_action = not self.warm_start - - else: - random_action = random.random_sample() < self.exploration_rate - if not random_action: - return (run_id, data) - requested_action = self._random_action() - self._current_step[run_id].action = requested_action - self._connection_queues[run_id].put(requested_action) - return False - - def _random_action(self): - if self.random_action_fn is None: - return torch.randint(self.module_.n_actions, size=(1,))[0] - - return self.random_action_fn() - - def _load_next_training_batch(self, experience): - training_samples = experience.get_random_experiences(self.training_batch_size) - training_samples = [ - astuple(training_sample) for training_sample in training_samples - ] - return map(list, zip(*training_samples)) - - def _evaluation_step(self, evaluation_states, prediction_run_id, training_actions): - nb_predictions_scheduled = len(prediction_run_id) - - evaluation_states = to_device(self.collate_fn(evaluation_states), self.device) - self._set_training(training=False) - scores_eval = self.module_(evaluation_states) - - if nb_predictions_scheduled == 0: - return scores_eval[:, training_actions] - - for request_run_id, prediction in zip( - prediction_run_id, - scores_eval[:nb_predictions_scheduled].argmax(1).to("cpu"), - ): - self._current_step[request_run_id].action = prediction - self._connection_queues[request_run_id].put(prediction) - - return scores_eval[nb_predictions_scheduled:, training_actions] - - def _train_step( - self, training_states, training_actions, rewards, scores_eval, amp_scaler - ): - self.optimizer_.zero_grad(set_to_none=True) - with amp.autocast(enabled=self.enable_amp): - training_states = to_device(self.collate_fn(training_states), self.device) - self._set_training(training=True) - scores_training = self.module_(training_states)[:, training_actions] - rewards = to_device(default_collate(rewards), self.device) - expected_scores_training = rewards + self.discount * scores_eval - loss = self.criterion_(scores_training, expected_scores_training) - amp_scaler.scale(loss).backward() - amp_scaler.step(self.optimizer_) - amp_scaler.update() - - def set_training(self, training): - if training is getattr(self, "training", None): - raise ValueError - - self.training = training - - def shutdown(self): - self.is_serving = False - if hasattr(self, "_CopainAI__is_shut_down"): - self.__is_shut_down.wait() diff --git a/copain/run.py b/copain/run.py index 8f65555..05e0c36 100644 --- a/copain/run.py +++ b/copain/run.py @@ -3,10 +3,9 @@ from pathlib import Path from contextlib import ExitStack from tempfile import TemporaryDirectory -from socketserver import UnixStreamServer, ThreadingMixIn from concurrent.futures import ThreadPoolExecutor +from socketserver import UnixStreamServer, ThreadingMixIn -import numpy as np from pyvirtualdisplay import Display from copain.copain_driver import _CopainSocketHandler, _CopainLoopFn @@ -16,60 +15,33 @@ class _SequentialSocketServer(ThreadingMixIn, UnixStreamServer): pass -class _ThreadedSocketServer(ThreadingMixIn, UnixStreamServer): - pass - - class CopainRun: def __init__( self, rom_path, rom_hash, loop_fn_init, - threaded_socket=True, - threaded_requests=True, - num_runners=1, enable_game_genie=False, display_fceux_gui=True, - visible_enable_sound=True, - visible_speedmode=None, - visible_render_sprites=None, - visible_render_background=None, + enable_sound=True, + speedmode=None, + render_sprites=None, + render_background=None, tmp_folder="/tmp", fceux_executable="/usr/bin/fceux", - copain_ai=None, - threaded_ai=True, ): self.rom_path = rom_path self.rom_hash = rom_hash self.loop_fn_init = loop_fn_init - self.threaded_socket = threaded_socket - - if num_runners > 1 and not threaded_requests: - raise ValueError( - "A threaded handling of the requests is mandatory when " - "num_runners > 1" - ) - self.threaded_requests = threaded_requests - - max_nb_runners = np.iinfo(np.uint16).max - if num_runners > max_nb_runners: - raise ValueError( - f"num_runners is expected to be below " - f"{max_nb_runners}, got {self.num_runners}" - ) - self.num_runners = num_runners self.enable_game_genie = enable_game_genie self.display_fceux_gui = display_fceux_gui - self.visible_enable_sound = visible_enable_sound - self.visible_speedmode = visible_speedmode - self.visible_render_sprites = visible_render_sprites - self.visible_render_background = visible_render_background + self.enable_sound = enable_sound + self.speedmode = speedmode + self.render_sprites = render_sprites + self.render_background = render_background self.tmp_folder = tmp_folder self.fceux_executable = fceux_executable - self.copain_ai = copain_ai - self.threaded_ai = threaded_ai def run(self): @@ -84,130 +56,67 @@ def run(self): socket_file = os.path.join(tmp_folder, "copain_socket") + copain_loop_fn = _CopainLoopFn( + self.speedmode, + self.render_sprites, + self.render_background, + self.rom_path, + self.rom_hash, + self.loop_fn_init, + ) + + class _SocketHandler(_CopainSocketHandler): + def handle(self): + return copain_loop_fn(self) + + server = stack.enter_context(_SequentialSocketServer(socket_file, _SocketHandler)) + print(f"Started the socket server at {socket_file}...") + print("Starting the socket server thread...") + + server_executor = stack.enter_context(ThreadPoolExecutor(max_workers=1)) + server_thread = server_executor.submit(server.serve_forever) + env = dict(os.environb) env[b"COPAIN_SOCKET_SERVER_FILE_PATH"] = socket_file.encode() - base_start_command = [ + start_command = [ self.fceux_executable, "--loadlua", lua_script, self.rom_path, ] - visible_start_command = None - background_start_command = None - if self.display_fceux_gui: - visible_start_command = base_start_command.copy() - - visible_start_command.extend( - ["-s", "1" if self.visible_enable_sound else "0"] + start_command.extend( + ["-s", "1" if self.enable_sound else "0"] ) - visible_start_command.extend( + start_command.extend( ["-g", "1" if self.enable_game_genie else "0"] ) print( "Starting visible fceux instance with command \n %s..." - % " ".join(visible_start_command) + % " ".join(start_command) ) - if not self.display_fceux_gui or self.num_runners > 1: - background_start_command = base_start_command.copy() - background_start_command.extend( + else: + start_command.extend( ["-g", "1" if self.enable_game_genie else "0"] ) - background_start_command.extend(["-s", "0"]) - - nb_background_instances = ( - (self.num_runners - 1) - if self.display_fceux_gui - else self.num_runners - ) + start_command.extend(["-s", "0"]) print( - "Starting %s background fceux instances with command \n %s..." - % (nb_background_instances, " ".join(background_start_command)) + "Starting a background fceux instance with command \n %s..." + % (" ".join(start_command)) ) - - runs = [] - for i in range(1, self.num_runners + 1): - command = background_start_command - - if i == 1 and self.display_fceux_gui: - command = visible_start_command - - elif i == 1 or (self.display_fceux_gui and i == 2): - stack.enter_context( - Display(use_xauth=True, visible=False, size=(1, 1)) - ) - - i8 = np.uint8(i) - i16 = np.uint16(i) - i = i8 if (i8 == i16) else i16 - env[b"COPAIN_RUN_ID"] = i.tobytes() - env[b"DISPLAY"] = os.environb[b"DISPLAY"] - runs.append( - subprocess.Popen( - command, - env=env, - ) + stack.enter_context( + Display(use_xauth=True, visible=False, size=(1, 1)) ) - server_type = ( - _ThreadedSocketServer - if self.threaded_socket - else _SequentialSocketServer - ) + env[b"DISPLAY"] = os.environb[b"DISPLAY"] + subprocess.Popen( + start_command, + env=env + ).wait() - copain_loop_fn = _CopainLoopFn( - self.display_fceux_gui, - self.num_runners, - self.visible_speedmode, - self.visible_render_sprites, - self.visible_render_background, - self.rom_path, - self.rom_hash, - self.loop_fn_init, - self.copain_ai, - ) - - class _SocketHandler(_CopainSocketHandler): - def handle(self): - return copain_loop_fn(self) - - server = stack.enter_context(server_type(socket_file, _SocketHandler)) - - if self.copain_ai is not None and self.threaded_ai: - ai_executor = stack.enter_context(ThreadPoolExecutor(max_workers=1)) - copain_ai_thread = ai_executor.submit(self.copain_ai.serve_forever) - - print(f"Started the socket server at {socket_file}...") - - print("Starting the socket server thread...") - if self.threaded_socket: - server_executor = stack.enter_context(ThreadPoolExecutor(max_workers=1)) - server_thread = server_executor.submit(server.serve_forever) - else: - server.serve_forever() - - try: - for run in runs: - try: - run.wait() - except BaseException: - break - finally: - for run in runs: - if run.poll() is None: - run.kill() - - server.shutdown() - server.server_close() - - if self.threaded_socket: - server_thread.result() - del server_thread - - if self.copain_ai is not None: - self.copain_ai.shutdown() - if self.threaded_ai: - copain_ai_thread.result() + server.shutdown() + server.server_close() + server_thread.result()