episode.py (forked from DamienLegros/PANDROIDE)
import math

import numpy as np


class Episode:
    """
    This class stores the samples of an episode
    """

    def __init__(self):
        self.state_pool = []
        self.action_pool = []
        self.reward_pool = []
        self.done_pool = []
        self.next_state_pool = []
        self.len = 0

    def add(self, state, action, reward, done, next_state) -> None:
        """
        Add a sample to the episode
        :param state: the current state
        :param action: the taken action
        :param reward: the resulting reward
        :param done: whether the episode is over
        :param next_state: the resulting next state
        :return: nothing
        """
        self.state_pool.append(state)
        self.action_pool.append(action)
        self.reward_pool.append(reward)
        self.done_pool.append(done)
        self.next_state_pool.append(next_state)
        self.len += 1

    def size(self):
        """
        Return the number of samples already stored in the episode
        :return: the number of samples in the episode
        """
        return self.len
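
    # A note on the next method: it replaces each reward with the discounted
    # reward-to-go G_t = r_t + gamma * G_{t+1} (with G_T = r_T at the final
    # step), computed in a single backward pass over the episode.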
    def discounted_sum_rewards(self, gamma) -> None:
        """
        Replace the reward of each sample with the discounted sum of
        subsequent rewards (the reward-to-go)
        :param gamma: the discount factor
        :return: nothing
        """
        summ = 0
        for i in reversed(range(len(self.reward_pool))):
            summ = summ * gamma + self.reward_pool[i]
            self.reward_pool[i] = summ

    def sum_rewards(self) -> None:
        """
        Replace the reward of each sample with the non-discounted sum of
        all rewards of the episode
        :return: nothing
        """
        summ = np.sum(self.reward_pool)
        for i in range(len(self.reward_pool)):
            self.reward_pool[i] = summ
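
    # Background note on the next method: subtracting a critic-provided baseline
    # from the rewards is the standard variance-reduction trick for
    # policy-gradient estimators; it reduces the variance of the gradient
    # estimate without changing its expectation.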
    def substract_baseline(self, critic) -> None:
        """
        Subtract a baseline value, given by the critic, from the reward
        of each sample of the episode
        :param critic: the critic providing the baseline values
        :return: nothing
        """
        # critic.evaluate is expected to return one value per (state, action) pair
        val = critic.evaluate(np.array(self.state_pool), np.array(self.action_pool))
        for i in range(len(self.reward_pool)):
            self.reward_pool[i] -= val[i][0]
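
    # The n-step target computed by the next method, written out for step t:
    #   G_t(n) = r_t + gamma * r_{t+1} + ... + gamma^(n-1) * r_{t+n-1}
    #            + gamma^n * Q(s_{t+n}, a_{t+n})
    # where the bootstrap term is added only when t + n is still inside the episode.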
    def nstep_return(self, n, gamma, critic) -> None:
        """
        Apply a Bellman backup n-step return to all rewards of the episode.
        Warning: reward_pool is rewritten in place, so the loop must run
        forward (each step only reads rewards that have not been overwritten yet)
        :param n: the number of steps in the n-step return
        :param gamma: the discount factor
        :param critic: the critic used to perform Bellman backups
        :return: nothing
        """
        for i in range(len(self.reward_pool)):
            horizon = i + n
            summ = self.reward_pool[i]
            if horizon < len(self.reward_pool):
                # bootstrap with the critic value at the horizon step
                bootstrap_val = critic.evaluate(self.state_pool[horizon], self.action_pool[horizon])[0][0]
                summ += gamma ** n * bootstrap_val
            for j in range(1, n):
                if i + j >= len(self.reward_pool):
                    break
                summ += gamma ** j * self.reward_pool[i + j]
            self.reward_pool[i] = summ

    def normalize_rewards(self, reward_mean, reward_std) -> None:
        """
        Normalize the reward of each sample of the episode (non discounted)
        :param reward_mean: the mean used for normalization
        :param reward_std: the standard deviation used for normalization
        :return: nothing
        """
        for i in range(len(self.reward_pool)):
            self.reward_pool[i] = (self.reward_pool[i] - reward_mean) / reward_std

    def normalize_discounted_rewards(self, gamma, reward_mean, reward_std) -> None:
        """
        Replace the reward of each sample with its normalized discounted
        sum of subsequent rewards
        :param gamma: the discount factor
        :param reward_mean: the mean used for normalization
        :param reward_std: the standard deviation used for normalization
        :return: nothing
        """
        summ = 0
        for i in reversed(range(len(self.reward_pool))):
            summ = summ * gamma + self.reward_pool[i]
            self.reward_pool[i] = (summ - reward_mean) / reward_std

    def exponentiate_rewards(self, beta) -> None:
        """
        Apply an exponentiation factor to the rewards of all samples of the episode
        :param beta: the exponentiation factor
        :return: nothing
        """
        for i in range(len(self.reward_pool)):
            self.reward_pool[i] = math.exp(self.reward_pool[i] / beta)
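

if __name__ == "__main__":
    # Minimal usage sketch, not part of the original file. _ConstantCritic is a
    # hypothetical stand-in: any object whose evaluate(states, actions) returns
    # one value per (state, action) pair, as assumed by substract_baseline and
    # nstep_return above, would work the same way.
    class _ConstantCritic:
        def evaluate(self, states, actions):
            # one row of zeros per state, matching the val[i][0] indexing above
            return np.zeros((len(np.atleast_1d(states)), 1))

    episode = Episode()
    for t in range(5):
        episode.add(state=t, action=0, reward=1.0, done=(t == 4), next_state=t + 1)
    episode.discounted_sum_rewards(gamma=0.9)
    print(episode.reward_pool)  # reward-to-go, approximately [4.0951, 3.439, 2.71, 1.9, 1.0]

    episode = Episode()
    for t in range(5):
        episode.add(state=t, action=0, reward=1.0, done=(t == 4), next_state=t + 1)
    episode.nstep_return(n=2, gamma=0.9, critic=_ConstantCritic())
    print(episode.reward_pool)  # 2-step targets with a zero-valued bootstrap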