-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathreplay_buffer.py
More file actions
74 lines (55 loc) · 2.39 KB
/
replay_buffer.py
File metadata and controls
74 lines (55 loc) · 2.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
from collections import deque
import torch
import numpy as np
class ReplayBuffer:
    """Bounded FIFO store of episodes, each a 4-tuple of tensors.

    Every stored entry is ``(obss, rews, dfa_trans, dfa_rews)``. All four
    items must support ``.to(device)``; ``obss`` and ``rews`` must also
    expose ``.shape`` because they are padded along dimension 0 when batched.
    NOTE(review): dimension 0 is presumably the time axis of an episode --
    confirm against the code that calls ``push``.

    Attributes are documented inline in ``__init__``.
    """

    def __init__(self, capacity=1000, device=None):
        """Create an empty buffer.

        Args:
            capacity: Maximum number of episodes kept; once full, the oldest
                episode is dropped on each new ``push`` (``deque(maxlen=...)``).
            device: Anything accepted by ``torch.device``. When falsy, CUDA is
                used if available, otherwise the CPU.
        """
        device = device or ("cuda" if torch.cuda.is_available() else "cpu")
        # Device every tensor is moved to when it is read back out.
        self.device = torch.device(device)
        # Count of all pushes ever made, including episodes already evicted.
        self.total_episodes = 0
        # Underlying storage; episodes are stored exactly as they were pushed.
        self.buffer = deque(maxlen=capacity)

    def push(self, obss, rews, dfa_trans, dfa_rews):
        """Append one episode and bump the lifetime episode counter."""
        self.buffer.append((obss, rews, dfa_trans, dfa_rews))
        self.total_episodes += 1

    def __len__(self):
        """Return the number of episodes currently stored."""
        return len(self.buffer)

    def __iter__(self):
        """Yield stored episodes one by one, moved to ``self.device``.

        Episodes are yielded oldest first and are not padded or batched.
        """
        for episode in self.buffer:
            yield tuple(item.to(self.device) for item in episode)

    def sample(self, batch_size):
        """Draw ``batch_size`` distinct episodes uniformly at random.

        Args:
            batch_size: Number of episodes to draw, without replacement.

        Returns:
            ``(obss, rews, dfa_trans, dfa_rews)`` where ``obss`` and ``rews``
            are stacked tensors padded to the longest sampled episode, and
            ``dfa_trans`` / ``dfa_rews`` are lists with one tensor per episode.

        Raises:
            ValueError: If ``batch_size`` is not in ``1..len(self)``.
        """
        available = len(self.buffer)
        # Fail early with a readable message instead of an incidental error
        # from NumPy or from unpacking an empty batch.
        if batch_size <= 0 or batch_size > available:
            raise ValueError(
                f"cannot sample {batch_size} episodes from a buffer "
                f"holding {available}"
            )
        indices = np.random.choice(available, batch_size, replace=False)
        return self._collate([self.buffer[int(idx)] for idx in indices])

    def _collate(self, episodes):
        """Turn a non-empty list of episodes into one batch on ``self.device``.

        ``obss`` and ``rews`` are padded to a common length and stacked;
        ``dfa_trans`` and ``dfa_rews`` stay as per-episode lists.
        """
        obss, rews, dfa_trans, dfa_rews = zip(*episodes)
        obss = self._pad_batch(obss).to(self.device)
        rews = self._pad_batch(rews).to(self.device)
        dfa_trans = [tr.to(self.device) for tr in dfa_trans]
        dfa_rews = [rw.to(self.device) for rw in dfa_rews]
        return obss, rews, dfa_trans, dfa_rews

    def _pad_batch(self, sequences):
        """Pad every sequence to the longest one and stack along a new dim 0.

        ``torch.stack`` requires the trailing dimensions of all sequences to
        match already.
        """
        max_len = max(seq.shape[0] for seq in sequences)
        return torch.stack(
            [self._pad_repeat_last(seq, max_len) for seq in sequences]
        )

    def _pad_repeat_last(self, seq, length):
        """Extend ``seq`` along dim 0 to ``length`` by repeating its last row.

        Sequences that are already at least ``length`` long are returned
        unchanged (they are never truncated).
        """
        pad_len = length - seq.shape[0]
        if pad_len > 0:
            # expand() creates a view of the last row; cat() then materialises
            # it into the new, longer tensor.
            repeat = seq[-1:].expand(pad_len, *seq.shape[1:])
            seq = torch.cat([seq, repeat], dim=0)
        return seq

    def iter_batches(self, batch_size):
        """Yield the whole buffer as consecutive batches, oldest first.

        Each batch has the same layout as the return value of ``sample``. The
        final batch may hold fewer than ``batch_size`` episodes.

        Args:
            batch_size: Maximum number of episodes per batch; must be positive.

        Raises:
            ValueError: If ``batch_size`` is not positive (raised when the
                generator is first advanced).
        """
        if batch_size <= 0:
            raise ValueError(f"batch_size must be positive, got {batch_size}")
        # Snapshot the deque so batches can be sliced by index.
        episodes = list(self.buffer)
        for start in range(0, len(episodes), batch_size):
            yield self._collate(episodes[start:start + batch_size])

    def clear(self):
        """Remove all stored episodes; ``total_episodes`` is left untouched."""
        self.buffer.clear()