-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathscript_v3
More file actions
402 lines (345 loc) · 20.8 KB
/
script_v3
File metadata and controls
402 lines (345 loc) · 20.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
# Priority Rules Decision
# Powston Inverter Decision Script — V3.1 Revised Forecast Shaping + Planner + Kiosk
# Comments: powston.com.au@bol.la
# ─────────────────────────────────────────────────────────────────────────────
# ⚡ PRICING / BEHAVIOUR CONSTANTS (c/kWh and %)
# ─────────────────────────────────────────────────────────────────────────────
peak_time = 16 # Evening peak window start (hour, 24h)
peak_time_end = 20 # Evening peak window end (hour)
max_am_buy_price = 40.0 # Max import price for AM/day charging
min_sell_price = 25.0 # Min price to export energy
min_day_sell_price = 1.0 # Allow daytime export ≥ this (avoid curtail)
always_sell_price = 100.0 # Force export if sell price ≥ this
min_sell_soc = 20 # Min SOC (%) to allow export
discharge_discount = 0.80 # Use 80% of discharge headroom for pre-empt
desired_margin = 5.0 # Margin (c/kWh) for arbitrage comparisons
# Asymmetric uncertainty shaping (per hour)
buy_uncertainty_discount = 0.03 # Buy forecast “drifts up” per hour
sell_uncertainty_discount = 0.07 # Sell forecast “drifts down” per hour
future_forecast_hours = 8.0 # Look-ahead window (hours)
forecast_margin_kwh = 2.0 # Extra buffer to avoid undercharging
# Day/Night fallback minimum house load (Wh per inverter) if telemetry missing
min_day_house_power = 2000.0
min_night_house_power = 1000.0
# Facility / battery config (some are also provided by Powston runtime)
facility_name = "65 Qld" # noqa Display only
num_inverters = 2 # Total inverters in the site
inverter_ids = [43923, 43924] # For cross-inverter SoC averaging
solar_active_hours = 2.0 # Hours after sunrise to consider “day”
max_charge_rate_kW = 10.0 # Per-site target charge power (kW)
full_charge_discount = 0.8 # Planner safety discount on max rate
full_battery = 100.0 # Target full SoC (%)
# ─────────────────────────────────────────────────────────────────────────────
# Helpers (no leading underscores, no lambdas)
# ─────────────────────────────────────────────────────────────────────────────
def as_float_safe(val, default=0.0):
try:
return float(val)
except Exception:
return float(default)
def clean_number_list(seq, default_value):
if not isinstance(seq, list):
return [default_value]
out = []
for x in seq:
if isinstance(x, (int, float)):
out.append(float(x))
return out or [default_value]
def sort_by_second(pair):
# key function for sorted(); avoids lambda (Powston disallows lambda)
return pair[1]
# --- Safe sort/min/max key helpers (Powston may pass extra args) ---
def key_second(item, *args, **kwargs):
"""Return item[1] if it's a tuple/list with ≥2 items; else return item."""
try:
return item[1]
except Exception:
return item
# ─────────────────────────────────────────────────────────────────────────────
# MQTT feed (optional) — site-specific PV forecast inputs
# ─────────────────────────────────────────────────────────────────────────────
solar_estimate_remaining = 0.0
solar_surplus_deficit = 0.0
try:
ser = mqtt_data.get('solar_estimate', {})
solar_estimate_remaining = as_float_safe(ser.get('solar_estimate_remaining'), 0.0)
solar_surplus_deficit = as_float_safe(ser.get('solar_surplus_deficit'), 0.0)
except Exception:
solar_estimate_remaining = 0.0 # noqa
solar_surplus_deficit = 0.0
# ─────────────────────────────────────────────────────────────────────────────
# Combine inverter SoCs where available
# ─────────────────────────────────────────────────────────────────────────────
try:
socs = []
for inv_id in inverter_ids:
soc_val = inverters.get(f'inverter_params_{inv_id}', {}).get('battery_soc')
if isinstance(soc_val, (int, float)):
socs.append(float(soc_val))
if socs:
combined_battery_soc = sum(socs) / len(socs)
else:
combined_battery_soc = as_float_safe(battery_soc, 0.0)
except Exception:
combined_battery_soc = as_float_safe(battery_soc, 0.0)
# Battery capacity (Powston reports in Wh)
battery_capacity_Wh = battery_capacity if isinstance(battery_capacity, (int, float)) and battery_capacity > 0 else 25600.0
battery_capacity_kWh = battery_capacity_Wh / 1000.0
# ─────────────────────────────────────────────────────────────────────────────
# Inputs: buy/sell now + forecasts (clean + shape with uncertainty)
# ─────────────────────────────────────────────────────────────────────────────
buy_price = as_float_safe(buy_price, 0.0)
sell_price = as_float_safe(sell_price, 0.0)
buy_forecast = clean_number_list(buy_forecast, 9999.0)
sell_forecast = clean_number_list(sell_forecast, 0.0)
discounted_buy_forecast = []
discounted_sell_forecast = []
# shape horizons independently to reflect different uncertainty
for i in range(int(future_forecast_hours)):
if i < len(buy_forecast):
discounted_buy_forecast.append(buy_forecast[i] * ((1 + buy_uncertainty_discount) ** i))
if i < len(sell_forecast):
discounted_sell_forecast.append(sell_forecast[i] * ((1 - sell_uncertainty_discount) ** i))
if not discounted_buy_forecast:
discounted_buy_forecast = [9999.0]
if not discounted_sell_forecast:
discounted_sell_forecast = [0.0]
# ─────────────────────────────────────────────────────────────────────────────
# Time / sunrise windowing
# ─────────────────────────────────────────────────────────────────────────────
local_time = interval_time
current_hour = local_time.hour
# If sunrise/sunset after local_time, normalize to “today” context
normalized_sunrise = sunrise if sunrise <= local_time else sunrise - timedelta(days=1)
normalized_sunset = sunset if sunset <= local_time else sunset - timedelta(days=1)
sunrise_plus_active = normalized_sunrise + timedelta(hours=solar_active_hours)
sunset_minus_active = normalized_sunset - timedelta(hours=solar_active_hours) # noqa
# Hours until sunrise_plus_active (bounded for sanity only)
hours_until_sunrise_plus_active = (sunrise_plus_active - local_time).total_seconds() / 3600.0
if hours_until_sunrise_plus_active > 24:
hours_until_sunrise_plus_active -= 24
# Daylight heuristic (window starts solar_active_hours after sunrise, and before peak)
daytime = (local_time >= sunrise_plus_active) and (local_time.hour < peak_time)
# Reserve-factor easing from sunrise to sunrise+solar_active_hours
if 0 <= hours_until_sunrise_plus_active <= max(solar_active_hours, 0.0001):
reserve_factor = max(0.0, 1.0 - (hours_until_sunrise_plus_active / max(solar_active_hours, 1.0)))
else:
reserve_factor = 1.0
# ─────────────────────────────────────────────────────────────────────────────
# Compute “required_min_soc” to bridge to the cheapest upcoming buy hour
# ─────────────────────────────────────────────────────────────────────────────
house_pwr = house_power if isinstance(house_power, (int, float)) else 0.0
effective_min_house_power = min_day_house_power if 7 <= current_hour < 22 else min_night_house_power
effective_house_power = max(house_pwr / max(num_inverters, 1), effective_min_house_power / max(num_inverters, 1))
# When is the lowest buy?
index_lowest_buy = 0
if buy_forecast:
try:
index_lowest_buy = int(buy_forecast.index(min(buy_forecast)))
except Exception:
index_lowest_buy = 0
hours_until_lowest_buy = index_lowest_buy
estimated_consumption_Wh = effective_house_power * hours_until_lowest_buy
required_min_soc = reserve_factor * (estimated_consumption_Wh / max(battery_capacity_Wh, 1.0)) * 100.0
# ─────────────────────────────────────────────────────────────────────────────
# Charge-for-peak planner (function only)
# ─────────────────────────────────────────────────────────────────────────────
def charging_plan_inline(
current_hour_val,
combined_battery_soc_val,
solar_surplus_deficit_val,
discounted_buy_forecast_val,
battery_capacity_Wh_val,
full_battery_val,
max_charge_rate_W_val,
peak_time_val,
future_forecast_hours_val,
forecast_margin_kwh_val,
max_am_buy_price_val,
buy_price_val,
):
"""Pick cheapest positive-tariff hours before peak to reach the full-battery target (with buffer)."""
if combined_battery_soc_val >= full_battery_val:
return False, "Battery at/above full target.", [], []
# If PV can cover load+buffer, skip grid charging
if (solar_surplus_deficit_val - forecast_margin_kwh_val) >= 0:
return False, "Solar expected to cover load + buffer.", [], []
batt_kwh = (battery_capacity_Wh_val / 1000.0) if battery_capacity_Wh_val else 0.0
max_rate_kW = (max_charge_rate_W_val / 1000.0) if max_charge_rate_W_val else 0.0
if batt_kwh <= 0.0 or max_rate_kW <= 0.0:
return False, "Invalid battery or charge-rate configuration.", [], []
required_kwh = max(0.0, (full_battery_val - combined_battery_soc_val) / 100.0 * batt_kwh)
adjusted_required_kwh = required_kwh + forecast_margin_kwh_val
finish_by_hour = max(0, int(peak_time_val) - 1)
if finish_by_hour < current_hour_val:
return False, "Past the planning window for pre-peak charge.", [], []
# candidate hours within window
window = []
max_len = int(future_forecast_hours_val)
max_idx = max_len if max_len <= len(discounted_buy_forecast_val) else len(discounted_buy_forecast_val)
for i in range(max_idx):
h = current_hour_val + i
price = discounted_buy_forecast_val[i]
if h <= finish_by_hour and isinstance(price, (int, float)) and price > 0 and price <= max_am_buy_price_val:
window.append((i, float(price)))
if not window:
return False, "No acceptable buy hours before peak under constraints.", [], []
# Define a key func locally (no lambda/imports); accept extra args just in case
def sort_key_price(item, *args, **kwargs):
return item[1]
window_sorted = sorted(window, key=sort_key_price)
required_hours = int(adjusted_required_kwh / max_rate_kW + 0.999)
chosen = window_sorted[: min(required_hours, len(window_sorted))]
hours_to_charge = [i for i, _ in chosen]
schedule = [(current_hour_val + i, p) for i, p in chosen]
# Deadline feasibility: if we skip “now”, can we still meet target?
remaining_hours = max(0, finish_by_hour - current_hour_val + 1)
must_start = (required_hours >= remaining_hours)
can_meet_without_now = (max_rate_kW * max(0, remaining_hours - 1)) >= adjusted_required_kwh
must_start = must_start or (not can_meet_without_now)
achievable_kwh = max_rate_kW * len(hours_to_charge)
start_now = must_start or (0 in hours_to_charge) or (buy_price_val > 0 and buy_price_val <= min(p for _, p in window))
need_str = f"Need {adjusted_required_kwh:.2f}kWh; can get ~{achievable_kwh:.2f}kWh pre-peak."
if start_now:
return True, f"Charge now; {need_str}", hours_to_charge, schedule
return False, f"Plan later hours; {need_str}", hours_to_charge, schedule
# Planner call (positionally to avoid linter keyword warnings)
plan_tuple = charging_plan_inline(
current_hour,
combined_battery_soc,
solar_surplus_deficit,
discounted_buy_forecast,
battery_capacity_Wh,
full_battery,
int(max_charge_rate_kW * 1000 * full_charge_discount), # W, apply safety discount
peak_time,
future_forecast_hours,
forecast_margin_kwh,
max_am_buy_price,
buy_price,
)
charge_now = plan_tuple[0]
charge_now_reason = plan_tuple[1]
# ─────────────────────────────────────────────────────────────────────────────
# Begin decision evaluations (flattened, priority-based)
# ─────────────────────────────────────────────────────────────────────────────
action = decisions.reason('auto', 'start', priority=1) # seed default
solar = 'maximize' # default PV stance unless curtailed explicitly
# Common values for logs
base_buy = round(buy_price, 2)
base_sell = round(sell_price, 2)
base_soc = round(combined_battery_soc, 1)
base_need = round(required_min_soc, 1)
# Period tag (info only)
am_peak_start = sunrise_plus_active - timedelta(hours=3)
am_peak_end = sunrise_plus_active
if am_peak_start <= local_time < am_peak_end:
time_period = 'AM Peak'
elif peak_time <= current_hour <= peak_time_end:
time_period = 'Peak'
elif daytime:
time_period = 'Day'
else:
time_period = 'Night'
decisions.reason('auto', f'period={time_period}', priority=1, buy=base_buy, sell=base_sell, soc=base_soc, need=base_need)
# ----- Priority 4: Emergencies / hard edges -----
# 4A) Negative FiT (free/paid to import) → buy & curtail solar
if buy_price <= 0:
action = decisions.reason('import', 'negative buy price — import & curtail', priority=4,
buy=base_buy, sell=base_sell, soc=base_soc)
solar = 'curtail'
# 4B) Always-sell threshold reached → export immediately (with battery protection)
if sell_price >= always_sell_price:
has_disc_buy = bool(discounted_buy_forecast)
min_disc_buy = min(discounted_buy_forecast) if has_disc_buy else None
ok_to_sell_now = (
(has_disc_buy and min_disc_buy is not None and sell_price >= (min_disc_buy + desired_margin))
or (combined_battery_soc > required_min_soc)
)
if ok_to_sell_now:
action = decisions.reason('export', 'always-sell threshold reached', priority=4,
buy=base_buy, sell=base_sell, soc=base_soc,
min_upcoming_buy=(round(min_disc_buy, 2) if min_disc_buy is not None else None),
need=base_need)
# ----- Priority 3: Ensure we’re full by peak (plan + panic window) -----
# Plan charge window outcome
if charge_now and daytime:
action = decisions.reason('import', f'charge for peak — {charge_now_reason}', priority=3,
buy=base_buy, sell=base_sell, soc=base_soc)
# Panic fill between 15:00–16:00 if still low and price is acceptable
if 14 < local_time.hour < 16 and combined_battery_soc < 60 and buy_price < 30:
action = decisions.reason('import', 'panic fill before evening peak', priority=3,
buy=base_buy, sell=base_sell, soc=base_soc, threshold=30)
# ----- Priority 2: Normal trading logic + PM peak lock -----
# PM Peak import lock / selling preference
if time_period == 'Peak':
# Prefer selling during peak if it beats upcoming buys or SoC is healthy
has_disc_buy = bool(discounted_buy_forecast)
min_disc_buy = min(discounted_buy_forecast) if has_disc_buy else None
should_full_export = (
sell_price >= min_sell_price and
(
(has_disc_buy and min_disc_buy is not None and sell_price >= (min_disc_buy + desired_margin))
or (combined_battery_soc > required_min_soc)
)
)
if should_full_export:
action = decisions.reason('export', 'PM peak sell — profitable vs upcoming buys', priority=2,
buy=base_buy, sell=base_sell, soc=base_soc, need=base_need)
solar = 'maximize'
else:
action = decisions.reason('export', 'PM peak import lock — trickle export', priority=2,
buy=base_buy, sell=base_sell, soc=base_soc, need=base_need)
solar = 'maximize'
feed_in_power_limitation = 100
# “Sell now” if better than upcoming buys + margin, and price is decent
has_disc_buy = bool(discounted_buy_forecast)
min_disc_buy = min(discounted_buy_forecast) if has_disc_buy else None
if (
combined_battery_soc > required_min_soc and
has_disc_buy and min_disc_buy is not None and
sell_price >= (min_disc_buy + desired_margin) and
sell_price >= min_sell_price
):
action = decisions.reason('export', 'sell now — beats upcoming buys + margin', priority=2,
buy=base_buy, sell=base_sell, soc=base_soc,
min_disc_buy=round(min_disc_buy, 2), margin=desired_margin)
# AM-peak opportunistic sell (refillable via PV or cheap day import)
cutoff_index = min(len(discounted_buy_forecast), int(solar_active_hours))
has_day_buy = bool(discounted_buy_forecast and cutoff_index < len(discounted_buy_forecast))
min_day_buy = (min(discounted_buy_forecast[cutoff_index:]) if has_day_buy else None)
day_buy_ok = bool(has_day_buy and min_day_buy is not None and min_day_buy <= max_am_buy_price)
pv_refill_ok = bool(solar_surplus_deficit is not None and solar_surplus_deficit >= forecast_margin_kwh)
if (
time_period == 'AM Peak' and
combined_battery_soc > min_sell_soc and
sell_price >= min_sell_price and
(pv_refill_ok or day_buy_ok)
):
action = decisions.reason('export', 'AM sell — refillable via solar/cheap day import', priority=2,
buy=base_buy, sell=base_sell, soc=base_soc,
pv_surplus=solar_surplus_deficit,
min_day_buy=(round(min_day_buy, 2) if min_day_buy is not None else None))
# Daytime curtailment avoidance — export to make room if negative FiT is ahead
if (
time_period == 'Day' and
any((p is not None and p < 0) for p in sell_forecast) and
sell_price >= min_day_sell_price and
combined_battery_soc > 20 and
solar_surplus_deficit is not None
):
available_discharge_kWh = ((combined_battery_soc - 20) / 100.0 * battery_capacity_kWh) * discharge_discount
if solar_surplus_deficit > (available_discharge_kWh + forecast_margin_kwh):
action = decisions.reason('export', 'pre-emptive export to avoid curtailment', priority=2,
buy=base_buy, sell=base_sell, soc=base_soc,
pv_surplus=solar_surplus_deficit,
avail_kwh=round(available_discharge_kWh, 2))
# Full battery + negative FiT → curtail (don’t export at a loss)
if sell_price < 0.0:
action = decisions.reason('auto', 'curtail — full battery & negative FiT', priority=2,
buy=base_buy, sell=base_sell, soc=base_soc)
feed_in_power_limitation is not None
# Informational: missed sell opportunity (SoC too low)
if (discounted_sell_forecast and sell_price >= max(discounted_sell_forecast) and sell_price >= min_sell_price):
decisions.reason('auto', 'missed sell — SoC insufficient', priority=1,
buy=base_buy, sell=base_sell, soc=base_soc)