-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathplot.py
More file actions
340 lines (294 loc) · 12.2 KB
/
plot.py
File metadata and controls
340 lines (294 loc) · 12.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
import datetime as dt
import numpy as np, matplotlib.pyplot as plt
from scipy.interpolate import interp1d
import matplotlib.dates as mdates
from storage import csv_storage
# configs
# Warning horizon used by plot_exhaustion: when the estimated exhaustion lies
# at least this far after the last sample, the plot says "no exhaustion
# within N days" instead of pointing at an exhaustion time.
warning_timedelta = dt.timedelta(days=3)
# Window used by filter_recent: samples are dropped from the front of the
# history until one lies within this span of the newest sample.
recent_timedelta = dt.timedelta(days=7)
# Window used by plot_exhaustion: only samples within this span of the last
# sample feed the linear fit of the consumption rate.
estimate_timedelta = dt.timedelta(days=1)
# reads csv and returns history
def read_csv(cs: "csv_storage") -> list[tuple[float, dt.datetime, dt.datetime]]:
    """Parse the stored CSV text into a list of history records.

    Every non-blank line must hold exactly three comma-separated columns:
    the remaining amount (a float) followed by the query time and the
    request time, both in a format accepted by ``datetime.fromisoformat``.

    Args:
        cs: storage object; only its ``read()`` method is used here and it
            must return the whole CSV content as a single string.

    Returns:
        One ``(remain, query_time, request_time)`` tuple per data line, in
        file order.

    Raises:
        ValueError: if a non-blank line does not have exactly three columns
            or one of the columns cannot be parsed.
    """
    history: list[tuple[float, dt.datetime, dt.datetime]] = []
    # splitlines() copes with "\n" and "\r\n" alike. Whitespace-only lines
    # (e.g. the "\r" left by a blank CRLF line) are skipped explicitly so
    # they do not reach the three-way unpack below.
    for line in cs.read().splitlines():
        if not line.strip():
            continue
        remain, query_time, request_time = (col.strip() for col in line.split(","))
        history.append(
            (
                float(remain),
                dt.datetime.fromisoformat(query_time),
                dt.datetime.fromisoformat(request_time),
            )
        )
    return history
# filter out entries within recent_timedelta
def filter_recent(
    history: list[tuple[float, dt.datetime, dt.datetime]],
    window: "dt.timedelta | None" = None,
) -> list[tuple[float, dt.datetime, dt.datetime]]:
    """Drop the leading samples that are older than the recent window.

    The newest sample is taken to be the last entry of ``history``. Entries
    are skipped from the front until the first one whose query time is less
    than ``window`` before the newest query time; that entry and everything
    after it are returned.

    Args:
        history: ``(remain, query_time, request_time)`` samples.
        window: length of the window to keep. Defaults to the module-level
            ``recent_timedelta`` when omitted.

    Returns:
        The trailing part of ``history`` (a new list); empty when
        ``history`` is empty.
    """
    if not history:
        # Nothing to anchor the window on; avoid indexing history[-1].
        return []
    if window is None:
        window = recent_timedelta

    newest = history[-1][1]
    first_recent = next(
        (
            idx
            for idx, (_, query_time, _) in enumerate(history)
            if newest - query_time < window
        ),
        len(history),
    )
    return history[first_recent:]
# subtract recharges from remaining, letting values go negative
def decharge(history: list[tuple[float, dt.datetime, dt.datetime]]):
    """Remove the effect of recharges from a remaining-amount history.

    Whenever a sample is larger than the previous one, a recharge is assumed
    to have happened in between. The observed increase is snapped up to the
    smallest entry of a fixed list of recharge amounts that can explain it
    (an increase larger than every entry is taken as-is). The running total
    of recharged amounts is subtracted from every later sample, so the
    returned values never increase and may go below zero.

    Each detected recharge is also reported on stdout.

    Args:
        history: ``(remain, query_time, request_time)`` samples.

    Returns:
        ``(decharged, recharges)`` where ``decharged`` mirrors ``history``
        with the recharged total subtracted from the values, and
        ``recharges`` is a list of ``(amount, index)`` pairs; ``index`` is
        the position in ``history`` of the sample just before the recharge.
        Both lists are empty when ``history`` is empty.
    """
    if not history:
        return [], []

    # Ascending amounts a recharge is snapped to.
    # NOTE(review): presumably the amounts that can actually be purchased -
    # confirm against the provider.
    recharge_values = [25, 50, 75, 100, 150, 200]
    # Absorbs float rounding when the increase equals a tier exactly.
    tolerance = 1e-9

    def snap_to_tier(delta):
        # A tier equal to the increase explains it (no consumption between
        # the two samples), so the comparison is inclusive.
        for tier in recharge_values:
            if tier >= delta - tolerance:
                return tier
        # todo: handle cases when the amount is less than a minimum of 25
        # Larger than the biggest tier: fall back to the raw increase.
        return delta

    recharges: list[tuple[float, int]] = []
    recharged_sum = 0  # total recharged so far, subtracted from later values
    decharged: list[tuple[float, dt.datetime, dt.datetime]] = [history[0]]
    previous_value, previous_time = history[0][0], history[0][1]

    for idx, (value, query_time, request_time) in enumerate(history[1:]):
        if value > previous_value:
            delta = value - previous_value
            recharged = snap_to_tier(delta)
            recharged_sum += recharged
            # idx indexes history[1:], i.e. the sample *before* the recharge
            # in the full history.
            recharges.append((recharged, idx))
            print(
                f"recharged by {recharged}(delta:{delta}) between {previous_time} and {query_time}"
            )
        decharged.append((value - recharged_sum, query_time, request_time))
        previous_value, previous_time = value, query_time

    return decharged, recharges
# costs of each day
def get_cost(history_decharged: list[tuple[float, dt.datetime, dt.datetime]]):
    """Compute the amount consumed on every calendar day covered by the data.

    The value at each midnight is obtained by linear interpolation between
    the samples (by query time). Before the first sample and after the last
    one the curve is held constant, so the partial first and last days only
    count what was actually observed. The per-day results are also printed
    to stdout.

    Args:
        history_decharged: ``(value, query_time, request_time)`` samples
            with recharges already removed.

    Returns:
        A list of ``(cost, date)`` pairs, one per day from the date of the
        first sample through the date of the last sample; empty when there
        are no samples.
    """
    if not history_decharged:
        return []

    # np.interp needs increasing x; sort so an out-of-order sample cannot
    # silently corrupt the interpolation.
    samples = sorted(
        (entry[1].timestamp(), entry[0]) for entry in history_decharged
    )
    sample_ts = [ts for ts, _ in samples]
    sample_values = [value for _, value in samples]

    def value_at(moment: dt.datetime) -> float:
        # np.interp clamps to the first/last value outside the sampled
        # range and also works with a single sample.
        return float(np.interp(moment.timestamp(), sample_ts, sample_values))

    start_date = history_decharged[0][1].date()
    end_date = history_decharged[-1][1].date() + dt.timedelta(days=1)

    costs: list[tuple[float, dt.date]] = []
    for offset in range((end_date - start_date).days):
        date = start_date + dt.timedelta(days=offset)
        day_start = dt.datetime.combine(date, dt.time())
        day_end = dt.datetime.combine(date + dt.timedelta(days=1), dt.time())
        costs.append((value_at(day_start) - value_at(day_end), date))

    for cost, date in costs:
        print(f"{cost} kWh spent on {date}")
    return costs
# plot history diagram with recharge events annotated
def plot_history(
    history: list[tuple[float, dt.datetime, dt.datetime]],
    recharges: list[tuple[float, int]],
):
    """Draw the remaining-amount curve on the current figure, marking recharges.

    Args:
        history: ``(remain, query_time, request_time)`` samples.
        recharges: ``(amount, index)`` pairs where ``index`` is the position
            in ``history`` of the sample just before the recharge, so
            ``history[index + 1]`` must exist.
    """
    amount_at = {index: amount for amount, index in recharges}

    # The extended history is the samples plus, for every recharge, two
    # synthetic points (request_time is None) placed halfway in time between
    # the neighbouring samples: the bottom and the top of a vertical bar
    # whose height is the recharged amount.
    extended_history: list[tuple[float, dt.datetime, dt.datetime]] = []
    for pos, (level, queried_at, requested_at) in enumerate(history):
        extended_history.append((level, queried_at, requested_at))
        if pos not in amount_at:
            continue
        prev_sample, next_sample = history[pos], history[pos + 1]
        centre_level = (prev_sample[0] + next_sample[0]) / 2
        centre_time = prev_sample[1] + (next_sample[1] - prev_sample[1]) / 2
        amount = amount_at[pos]
        extended_history.append((centre_level - amount / 2, centre_time, None))
        extended_history.append((centre_level + amount / 2, centre_time, None))

    def draw_curve(piece: list[tuple[float, dt.datetime, dt.datetime]]):
        # One continuous stretch of the remaining-amount curve.
        plt.plot(
            [point[1] for point in piece],
            [point[0] for point in piece],
            # marker="o",
            linestyle="-",
            color="b",
            label="Remaining Amount",
        )

    def draw_recharge_bars(points: list[tuple[float, dt.datetime, dt.datetime]]):
        # The synthetic points come in (bottom, top) pairs, in order.
        bar_ends = [point for point in points if point[2] is None]
        assert len(bar_ends) % 2 == 0
        for bottom, top in zip(bar_ends[::2], bar_ends[1::2]):
            plt.plot(
                [bottom[1], top[1]],  # x-axis
                [bottom[0], top[0]],  # y-axis
                linestyle="-",
                color="orange",
                label="Recharge",
            )
            # Label the bar with the recharged amount at its vertical middle.
            plt.text(
                bottom[1],  # same time as top[1]
                (top[0] + bottom[0]) / 2,
                f"+{top[0] - bottom[0]} kWh",
                color="red",
                fontsize=10,
                ha="right",
                va="center",
                rotation="vertical",
            )

    # A negative difference between consecutive values marks a rise; the
    # curve is cut there so it is not drawn across the jump.
    levels = np.array([point[0] for point in extended_history])
    step_down = levels[:-1] - levels[1:]

    draw_recharge_bars(extended_history)

    segment_start = 0
    # Without recharges there is nothing to split, so skip the search.
    rises = np.where(step_down < 0)[0] if recharges else []
    for rise in rises:
        segment_end = rise + 1  # the difference at k sits between k and k + 1
        draw_curve(extended_history[segment_start:segment_end])
        segment_start = segment_end
    draw_curve(extended_history[segment_start:])
# plot an arrow to when the estimated exhaustion occurs with text description
def plot_exhaustion(
    history_decharged: list[tuple[float, dt.datetime, dt.datetime]],
    history_last: tuple[float, dt.datetime, dt.datetime],
):
    """Extrapolate recent consumption and draw the estimate on the current figure.

    A straight line is fitted to the decharged samples taken within
    ``estimate_timedelta`` of the last sample, then shifted so that it
    passes through the last actual reading. Where that line reaches zero is
    the estimated exhaustion time.

    Args:
        history_decharged: ``(value, query_time, request_time)`` samples
            with recharges removed.
        history_last: the last sample of the real (not decharged) history;
            its value anchors the line and its query time anchors the
            windows.

    Returns:
        ``None`` when no estimate can be made (fewer than two samples in the
        window, no downward trend, or exhaustion beyond the year 3000).
        Otherwise the end of the dashed line as a ``datetime``: the
        estimated exhaustion time if it is less than ``warning_timedelta``
        after the last sample, else the last sample time plus
        ``warning_timedelta``.
    """
    last_value, last_time = history_last[0], history_last[1]

    # Only the samples close enough to the last one feed the fit.
    first_recent = next(
        (
            idx
            for idx, sample in enumerate(history_decharged)
            if last_time - sample[1] < estimate_timedelta
        ),
        len(history_decharged),
    )
    recent = history_decharged[first_recent:]
    if len(recent) < 2:
        # A line through fewer than two points is undetermined; polyfit
        # would return an arbitrary slope and a meaningless exhaustion time.
        print("not enough recent samples to estimate exhaustion")
        return None

    # Separate the values and the timestamps.
    values = np.array([sample[0] for sample in recent])
    timestamps = np.array([sample[1].timestamp() for sample in recent])

    # linear fit: value = slope * t + intercept
    slope, intercept = np.polyfit(timestamps, values, 1)
    # Keep the fitted slope but shift the line through the last real reading.
    intercept += last_value - (slope * last_time.timestamp() + intercept)

    # Without a downward trend the line never reaches zero in the future.
    # "not slope < 0" also rejects NaN; checked before dividing by slope.
    if not slope < 0.0:
        print("low electricity usage")
        return None

    # value = 0  =>  t = -intercept / slope
    exhaustion_ts = -intercept / slope
    ts_overflow = dt.datetime(3000, 1, 1, 0, 0, 0).timestamp()  # 32503651200.0
    if abs(exhaustion_ts) > ts_overflow:
        print("low electricity usage")
        return None
    print(f"slope={slope}, exhaustion={exhaustion_ts}")

    begin_ts = timestamps[-1]
    begin_y = slope * begin_ts + intercept
    begin_x = dt.datetime.fromtimestamp(begin_ts)

    # Convert the timestamp to a datetime object.
    exhaustion_x = dt.datetime.fromtimestamp(exhaustion_ts)
    exhaustion_y = 0.0
    print(f"exhaustion_x = {exhaustion_x}")

    if (exhaustion_x - last_time) >= warning_timedelta:
        # Far enough away: clip the dashed line at the warning horizon.
        exhaustion_x = last_time + warning_timedelta
        exhaustion_y = slope * exhaustion_x.timestamp() + intercept
        plt.text(
            exhaustion_x,
            exhaustion_y + 10,
            f"no exhaustion\nwithin {warning_timedelta.days} days",
            fontsize=10,
            ha="center",
        )
    else:
        time_diff = exhaustion_x - dt.datetime.now()
        plt.annotate(
            f"estimated exhaustion at\n{str(exhaustion_x).split('.')[0]}\nor {str(time_diff).split('.')[0]} later",
            xy=(exhaustion_x, exhaustion_y),  # where the arrow points
            xytext=(exhaustion_x, exhaustion_y + 10),  # where the arrow starts
            arrowprops=dict(facecolor="red", shrink=0.05, headwidth=10, width=2),
            fontsize=10,
            ha="center",
        )

    # Draw the dashed extrapolation line.
    plt.plot(
        [begin_x, exhaustion_x],
        [begin_y, exhaustion_y],
        linestyle="--",
        color="gray",
        label="Estimated Exhaustion",
    )
    return exhaustion_x
def plot_watts(history_decharged: list[tuple[float, dt.datetime, dt.datetime]]):
    """Draw a bar chart of the average power between consecutive samples.

    Each bar starts at a sample's query time and spans the interval to the
    next sample. Its height is the drop in value over that interval divided
    by the interval length in seconds, times 3.6e6 (which turns kWh per
    second into watts).

    Args:
        history_decharged: ``(value, query_time, request_time)`` samples
            with recharges removed.
    """
    query_times = np.array([entry[1] for entry in history_decharged])
    levels = np.array([entry[0] for entry in history_decharged])

    intervals = query_times[1:] - query_times[:-1]
    consumed = levels[:-1] - levels[1:]
    seconds = [interval.total_seconds() for interval in intervals]
    watts = consumed / seconds * 3.6e6

    plt.bar(query_times[:-1], watts, width=intervals, align="edge", color="skyblue")
def plot(cs: csv_storage):
    """Render the recent-history and power-consumption charts as PNG files.

    Reads the history from ``cs``, keeps the recent part, removes recharges,
    then writes ``recent.png`` (remaining amount with recharge marks and the
    exhaustion estimate) and ``watts.png`` (power between samples) under
    ``cs.filepath``.

    Args:
        cs: storage object providing ``read()`` (via ``read_csv``) and a
            ``filepath`` attribute naming the output directory.
    """

    def format_time_axis():
        # auto format x-axis date
        plt.gca().xaxis.set_major_formatter(mdates.DateFormatter("%Y-%m-%d %H:%M"))
        plt.gcf().autofmt_xdate()

    recent = filter_recent(read_csv(cs))
    decharged, recharges = decharge(recent)
    get_cost(decharged)  # called for its printed per-day report

    # Figure 1: remaining amount over time.
    plt.figure(1, figsize=(10, 6))
    estimated = plot_exhaustion(decharged, recent[-1])
    plot_history(recent, recharges)
    print(f"estimate time of exhaustion: {estimated}")
    plt.title("History of Remaining Amount Over Time")
    plt.xlabel("Date")
    plt.ylabel("Remaining Amount (kWh)")
    format_time_axis()
    # plt.legend()
    # The figure is saved rather than shown: there is no display to show
    # the plot in GitHub Actions.
    plt.savefig(f"{cs.filepath}/recent.png", format="png")
    plt.clf()  # clear the figure

    # Figure 2: power consumption.
    plt.figure(2, figsize=(10, 6))
    plot_watts(decharged)
    plt.title("History of Power Consumption")
    plt.xlabel("Date")
    plt.ylabel("Power(W)")
    format_time_axis()
    plt.savefig(f"{cs.filepath}/watts.png", format="png")
    plt.clf()  # clear the figure