Skip to content

Commit db3243f

Browse files
authored
[DEV-3394] Device tree trace (#23)
Signed-off-by: Max Chesterfield <max.chesterfield@zepben.com>
1 parent 409bd08 commit db3243f

File tree

2 files changed

+148
-1
lines changed

2 files changed

+148
-1
lines changed

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
python_requires='>=3.9, <3.13',
2222
install_requires=[
2323
"zepben.eas==0.17.1",
24-
"zepben.evolve==0.45.0",
24+
"zepben.evolve==0.48.0",
2525
"numba==0.60.0",
2626
"geojson==2.5.0",
2727
"gql[requests]==3.4.1"
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
# Copyright 2025 Zeppelin Bend Pty Ltd
2+
#
3+
# This Source Code Form is subject to the terms of the Mozilla Public
4+
# License, v. 2.0. If a copy of the MPL was not distributed with this
5+
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
6+
7+
import asyncio
8+
import json
9+
import os
10+
from dataclasses import dataclass
11+
from multiprocessing import Pool
12+
13+
import pandas as pd
14+
from zepben.evolve import NetworkConsumerClient, connect_with_token, Tracing, upstream, EnergyConsumer, NetworkTraceStep, StepContext, PowerTransformer, \
15+
TransformerFunctionKind, Breaker, ConductingEquipment, Fuse, IdentifiedObject, NetworkTrace
16+
from zepben.protobuf.nc.nc_requests_pb2 import IncludedEnergizingContainers, IncludedEnergizedContainers
17+
18+
19+
@dataclass
20+
class EnergyConsumerDeviceHeirarchy:
21+
energy_consumer_mrid: str
22+
lv_circuit_name: str
23+
upstream_switch_mrid: str
24+
lv_circuit_name: str
25+
upstream_switch_class: str
26+
distribution_power_transformer_mrid: str
27+
distribution_power_transformer_name: str
28+
regulator_mrid: str
29+
breaker_mrid: str
30+
feeder_mrid: str
31+
32+
33+
def _get_client():
34+
with open('config.json') as f:
35+
config = json.load(f)
36+
37+
# Connect to server
38+
channel = connect_with_token(
39+
host=config["host"],
40+
access_token=config["access_token"],
41+
rpc_port=config['port'],
42+
ca_filename=config['ca_filename']
43+
)
44+
return NetworkConsumerClient(channel)
45+
46+
47+
def _get_equipment_tree_trace(up_data: dict) -> NetworkTrace:
48+
49+
def step_action(step: NetworkTraceStep, _: StepContext):
50+
to_equip: ConductingEquipment = step.path.to_equipment
51+
if isinstance(to_equip, Breaker):
52+
if not up_data.get('breaker'):
53+
up_data['breaker'] = to_equip
54+
elif isinstance(to_equip, Fuse):
55+
if not up_data.get('upstream_switch'):
56+
up_data['upstream_switch'] = to_equip
57+
elif isinstance(to_equip, PowerTransformer):
58+
if not up_data.get('distribution_power_transformer'):
59+
up_data['distribution_power_transformer'] = to_equip
60+
elif not up_data.get('regulator') and to_equip.function == TransformerFunctionKind.voltageRegulator:
61+
up_data['regulator'] = to_equip
62+
63+
return (
64+
Tracing.network_trace()
65+
.add_condition(upstream())
66+
.add_step_action(step_action)
67+
)
68+
69+
70+
async def get_feeders():
71+
client = _get_client()
72+
73+
_feeders = (await client.get_network_hierarchy()).result.feeders
74+
return _feeders
75+
76+
77+
async def trace_from_ec(feeder):
78+
"""
79+
Fetch the equipment container from the given feeder, then trace upstream from every EnergyConsumer
80+
and create a CSV with the relevant information.
81+
"""
82+
client = _get_client()
83+
print(f'processing feeder {feeder}')
84+
# Get all objects under the feeder, including Substations and LV Feeders
85+
feeder_objects = (
86+
await client.get_equipment_container(
87+
feeder,
88+
include_energizing_containers = IncludedEnergizingContainers.INCLUDE_ENERGIZING_SUBSTATIONS,
89+
include_energized_containers = IncludedEnergizedContainers.INCLUDE_ENERGIZED_LV_FEEDERS
90+
)
91+
).result.objects
92+
93+
energy_consumers = []
94+
95+
for up in feeder_objects.values():
96+
if isinstance(up, EnergyConsumer):
97+
up_data = {'feeder': feeder, 'energy_consumer_mrid': up.mrid}
98+
99+
# Trace upstream from EnergyConsumer.
100+
await _get_equipment_tree_trace(up_data).run(up)
101+
energy_consumers.append(_build_row(up_data))
102+
103+
csv_sfx = "energy_consumers.csv"
104+
network_objects = pd.DataFrame(energy_consumers)
105+
os.makedirs("csvs", exist_ok=True)
106+
network_objects.to_csv(f"csvs/{feeder}_{csv_sfx}", index=False)
107+
108+
109+
class NullEquipment:
110+
"""empty class to simplify code below in the case of an equipment not existing in that position of the network"""
111+
mrid = None
112+
name = None
113+
114+
115+
def _build_row(up_data: dict[str, IdentifiedObject | str]) -> EnergyConsumerDeviceHeirarchy:
116+
return EnergyConsumerDeviceHeirarchy(
117+
energy_consumer_mrid = up_data['energy_consumer_mrid'],
118+
upstream_switch_mrid = (up_data.get('upstream_switch') or NullEquipment).mrid,
119+
lv_circuit_name = (up_data.get('upstream_switch') or NullEquipment).name,
120+
upstream_switch_class = type(up_data.get('upstream_switch')).__name__,
121+
distribution_power_transformer_mrid = (up_data.get('distribution_power_transformer') or NullEquipment).mrid,
122+
distribution_power_transformer_name = (up_data.get('distribution_power_transformer') or NullEquipment).name,
123+
regulator_mrid = (up_data.get('regulator') or NullEquipment).mrid,
124+
breaker_mrid = (up_data.get('breaker') or NullEquipment).mrid,
125+
feeder_mrid = up_data.get('feeder'),
126+
)
127+
128+
129+
def process_target(feeder):
130+
asyncio.run(trace_from_ec(feeder))
131+
132+
133+
if __name__ == "__main__":
134+
# Get a list of feeders before entering main compute section of script.
135+
feeders = asyncio.run(get_feeders())
136+
137+
# Spin up a multiprocess pool of $CPU_COUNT processes to handle the workload, otherwise we saturate a single cpu core and it's slow.
138+
cpus = os.cpu_count()
139+
print(f'Spawning {cpus} processes')
140+
pool = Pool(cpus)
141+
142+
print(f'mapping to process pool')
143+
pool.map(process_target, feeders)
144+
145+
print('finishing remaining processes')
146+
pool.close()
147+
pool.join()

0 commit comments

Comments
 (0)