Skip to content

Commit cba1147

Browse files
committed
combined all different methods into 1 file
Easier to maintain, less code dupe Signed-off-by: Max Chesterfield <max.chesterfield@zepben.com>
1 parent ea0f703 commit cba1147

4 files changed

Lines changed: 266 additions & 432 deletions
Lines changed: 266 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,266 @@
1+
# Copyright 2025 Zeppelin Bend Pty Ltd
2+
#
3+
# This Source Code Form is subject to the terms of the Mozilla Public
4+
# License, v. 2.0. If a copy of the MPL was not distributed with this
5+
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
6+
7+
import asyncio
8+
import json
9+
import os
10+
import pandas as pd
11+
12+
from typing import Dict, Callable, List
13+
from dataclasses import dataclass
14+
15+
from zepben.evolve import connect_with_token, NetworkConsumerClient, Feeder, Tracing, downstream, StepActionWithContextValue, \
16+
NetworkTraceStep, EnergyConsumer, StepContext, IdentifiedObject, Breaker, Fuse, PowerTransformer, TransformerFunctionKind, TreeNode, EquipmentTreeBuilder, \
17+
ConductingEquipment, NetworkTrace, upstream
18+
from zepben.protobuf.nc.nc_requests_pb2 import IncludedEnergizedContainers
19+
20+
21+
@dataclass
22+
class EnergyConsumerDeviceHierarchy:
23+
energy_consumer_mrid: str
24+
lv_circuit_name: str
25+
upstream_switch_mrid: str
26+
lv_circuit_name: str
27+
upstream_switch_class: str
28+
distribution_power_transformer_mrid: str
29+
distribution_power_transformer_name: str
30+
regulator_mrid: str
31+
breaker_mrid: str
32+
feeder_mrid: str
33+
34+
35+
def _get_client():
36+
with open('config.json') as f:
37+
config = json.load(f)
38+
39+
# Connect to server
40+
channel = connect_with_token(**config)
41+
return NetworkConsumerClient(channel)
42+
43+
44+
async def get_feeders(_client = None) -> Dict[str, Feeder]:
45+
_feeders = (await (_client or _get_client()).get_network_hierarchy()).result.feeders
46+
return _feeders
47+
48+
49+
async def get_feeder_equipmet(client: NetworkConsumerClient, feeder_mrid: str) -> None:
50+
"""Get all objects under the feeder, including LV Feeders"""
51+
(await client.get_equipment_container(
52+
feeder_mrid,
53+
include_energized_containers=IncludedEnergizedContainers.INCLUDE_ENERGIZED_LV_FEEDERS
54+
)).throw_on_error()
55+
56+
57+
async def trace_from_energy_consumers(feeder_mrid: str, client=None):
58+
"""
59+
Least efficient/the slowest
60+
Inefficient upstream tracing example.
61+
Trace upstream from every EnergyConsumer.
62+
"""
63+
client = client or _get_client()
64+
await get_feeder_equipmet(client, feeder_mrid)
65+
66+
def _get_equipment_tree_trace(up_data: dict) -> NetworkTrace:
67+
def step_action(step: NetworkTraceStep, _: StepContext):
68+
to_equip: ConductingEquipment = step.path.to_equipment
69+
70+
if isinstance(to_equip, Breaker):
71+
if not up_data.get('breaker'):
72+
up_data['breaker'] = to_equip
73+
elif isinstance(to_equip, Fuse):
74+
if not up_data.get('upstream_switch'):
75+
up_data['upstream_switch'] = to_equip
76+
elif isinstance(to_equip, PowerTransformer):
77+
if not up_data.get('distribution_power_transformer'):
78+
up_data['distribution_power_transformer'] = to_equip
79+
elif not up_data.get('regulator') and to_equip.function == TransformerFunctionKind.voltageRegulator:
80+
up_data['regulator'] = to_equip
81+
82+
return (
83+
Tracing.network_trace()
84+
.add_condition(upstream())
85+
.add_step_action(step_action)
86+
)
87+
88+
feeder = client.service.get(feeder_mrid, Feeder)
89+
90+
energy_consumers = []
91+
for lvf in feeder.normal_energized_lv_feeders:
92+
for ce in lvf.equipment:
93+
if isinstance(ce, EnergyConsumer):
94+
up_data = {'feeder': feeder_mrid, 'energy_consumer_mrid': ce.mrid}
95+
96+
# Trace upstream from EnergyConsumer.
97+
await _get_equipment_tree_trace(up_data).run(ce)
98+
energy_consumers.append(_build_row(up_data))
99+
100+
write_csv(energy_consumers, feeder.mrid)
101+
102+
103+
async def trace_from_feeder_downstream(feeder_mrid: str, client=None):
104+
"""
105+
More memory use than `trace_from_feeder_context`, more efficient/faster than `trace_from_energy_consumers`
106+
Build an equipment tree of everything downstream of the feeder.
107+
Use the Equipment tree to recurse through parent equipment of all EC's and get the equipment we are interested in.
108+
"""
109+
def process_leaf(up_data: dict, leaf: TreeNode):
110+
to_equip: IdentifiedObject = leaf.identified_object
111+
112+
if isinstance(to_equip, Breaker):
113+
if not up_data.get('breaker'):
114+
up_data['breaker'] = to_equip
115+
elif isinstance(to_equip, Fuse):
116+
if not up_data.get('upstream_switch'):
117+
up_data['upstream_switch'] = to_equip
118+
elif isinstance(to_equip, PowerTransformer):
119+
if not up_data.get('distribution_power_transformer'):
120+
up_data['distribution_power_transformer'] = to_equip
121+
elif not up_data.get('regulator') and to_equip.function == TransformerFunctionKind.voltageRegulator:
122+
up_data['regulator'] = to_equip
123+
124+
client = client or _get_client()
125+
await get_feeder_equipmet(client, feeder_mrid)
126+
127+
builder = EquipmentTreeBuilder()
128+
129+
feeder = client.service.get(feeder_mrid, Feeder)
130+
await (
131+
Tracing.network_trace()
132+
.add_condition(downstream())
133+
.add_step_action(builder)
134+
).run(getattr(feeder, 'normal_head_terminal'))
135+
136+
energy_consumers = []
137+
138+
for leaf in (l for l in builder.leaves if isinstance((ec := l.identified_object), EnergyConsumer)):
139+
ec_data = {'feeder': feeder.mrid, 'energy_consumer_mrid': ec.mrid}
140+
def _process(_leaf):
141+
process_leaf(ec_data, _leaf)
142+
if _leaf.parent:
143+
_process(_leaf.parent)
144+
_process(leaf)
145+
146+
row = _build_row(ec_data)
147+
energy_consumers.append(row)
148+
149+
write_csv(energy_consumers, feeder.mrid)
150+
151+
152+
async def trace_from_feeder_context(feeder_mrid: str, client=None):
153+
"""
154+
Most efficient/fastest.
155+
trace downstream from the feeder recording relevant information using `NetworkTrace` `StepContext`.
156+
"""
157+
client = client or _get_client()
158+
# Get all objects under the feeder, including Substations and LV Feeders
159+
await get_feeder_equipmet(client, feeder_mrid)
160+
161+
energy_consumers = []
162+
163+
feeder = client.service.get(feeder_mrid, Feeder)
164+
165+
class StepActionWithContext(StepActionWithContextValue):
166+
def _apply(self, item: NetworkTraceStep, context: StepContext):
167+
if isinstance((ec := item.path.to_equipment), EnergyConsumer):
168+
nonlocal energy_consumers
169+
data = self.get_context_value(context)
170+
data.update({'feeder': feeder.mrid, 'energy_consumer_mrid': ec.mrid})
171+
172+
row = _build_row(data)
173+
energy_consumers.append(row)
174+
175+
def compute_next_value(self, next_item: NetworkTraceStep, current_item: NetworkTraceStep, current_value: Dict[str, IdentifiedObject]):
176+
data = dict(current_value)
177+
equip = next_item.path.to_equipment
178+
if isinstance(equip, Breaker):
179+
data['breaker'] = equip
180+
elif isinstance(equip, Fuse):
181+
data['upstream_switch'] = equip
182+
elif isinstance(equip, PowerTransformer):
183+
if equip.function == TransformerFunctionKind.distributionTransformer:
184+
data['distribution_power_transformer'] = equip
185+
elif equip.function == TransformerFunctionKind.voltageRegulator:
186+
data['regulator'] = equip
187+
return data
188+
189+
def compute_initial_value(self, item: NetworkTraceStep):
190+
return {}
191+
192+
await (
193+
Tracing.network_trace()
194+
.add_condition(downstream())
195+
.add_step_action(StepActionWithContext('key'))
196+
).run(getattr(feeder, 'normal_head_terminal'))
197+
198+
write_csv(energy_consumers, feeder.mrid)
199+
200+
201+
def write_csv(energy_consumers: List[EnergyConsumerDeviceHierarchy], feeder_mrid: str):
202+
network_objects = pd.DataFrame(energy_consumers)
203+
os.makedirs("csvs", exist_ok=True)
204+
network_objects.to_csv(f"csvs/{feeder_mrid}_energy_consumers.csv", index=False)
205+
206+
207+
class NullEquipment:
208+
"""empty class to simplify code below in the case of an equipment not existing in that position of the network"""
209+
mrid = None
210+
name = None
211+
212+
213+
def _build_row(up_data: dict[str, IdentifiedObject | str]) -> EnergyConsumerDeviceHierarchy:
214+
return EnergyConsumerDeviceHierarchy(
215+
energy_consumer_mrid = up_data['energy_consumer_mrid'],
216+
upstream_switch_mrid = (up_data.get('upstream_switch') or NullEquipment).mrid,
217+
lv_circuit_name = (up_data.get('upstream_switch') or NullEquipment).name,
218+
upstream_switch_class = type(up_data.get('upstream_switch')).__name__,
219+
distribution_power_transformer_mrid = (up_data.get('distribution_power_transformer') or NullEquipment).mrid,
220+
distribution_power_transformer_name = (up_data.get('distribution_power_transformer') or NullEquipment).name,
221+
regulator_mrid = (up_data.get('regulator') or NullEquipment).mrid,
222+
breaker_mrid = (up_data.get('breaker') or NullEquipment).mrid,
223+
feeder_mrid = up_data.get('feeder'),
224+
)
225+
226+
227+
def process_feeders_sequentially():
228+
async def main_async(trace_type: Callable):
229+
"""
230+
Fetch the equipment container from the given feeder and create a CSV with the relevant information.
231+
Differences between the functions passable as `trace_type` are documented in the relevant docstrings.
232+
233+
`trace_type` must be one of the following:
234+
- `trace_from_energy_consumers`
235+
- `trace_from_energy_consumers_with_context`
236+
- `trace_from_feeder_downstream`
237+
238+
"""
239+
from tqdm import tqdm
240+
client = _get_client()
241+
feeders = list(await get_feeders(client))
242+
for _feeder in tqdm(feeders):
243+
await trace_type(_feeder, client)
244+
245+
# Uncomment to run other trace functions
246+
asyncio.run(main_async(trace_from_feeder_context))
247+
#asyncio.run(main_async(trace_from_feeder_downstream))
248+
#asyncio.run(main_async(trace_from_energy_consumers))
249+
250+
251+
def process_feeders_concurrently():
252+
def multi_proc(_feeder):
253+
# Uncomment to run other trace functions
254+
asyncio.run(trace_from_feeder_context(_feeder))
255+
#asyncio.run(trace_from_feeder_downstream(_feeder))
256+
#asyncio.run(trace_from_energy_consumers(_feeder))
257+
258+
# Get a list of feeders before entering main compute section of script.
259+
feeders = list(asyncio.run(get_feeders()))
260+
261+
from tqdm.contrib.concurrent import process_map
262+
process_map(multi_proc, feeders ,max_workers=int(os.cpu_count() / 2))
263+
264+
265+
if __name__ == "__main__":
266+
process_feeders_sequentially()

0 commit comments

Comments
 (0)