Skip to content

Commit e36275e

Browse files
chestm007kgreav
andauthored
Add another example to use NetworkTrace StopContext to track equipment instead (#30)
Signed-off-by: Max Chesterfield <max.chesterfield@zepben.com> Signed-off-by: Kurt Greaves <kurt.greaves@zepben.com> Co-authored-by: Kurt Greaves <kurt.greaves@zepben.com>
1 parent 8389ba5 commit e36275e

3 files changed

Lines changed: 186 additions & 201 deletions

File tree

pyproject.toml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,15 @@ authors = [
2323
{name = "Zeppelin Bend", email = "oss@zepben.com"}
2424
]
2525
dependencies = [
26-
"zepben.eas==0.19.0",
27-
"zepben.ewb==1.0.0b7",
26+
"zepben.eas==0.23.0",
27+
"zepben.ewb==1.0.3",
2828
"numba==0.60.0",
2929
"geojson==2.5.0",
3030
"gql[requests]==3.4.1",
3131
"geopandas",
3232
"pandas",
33-
"shapely"
33+
"shapely",
34+
"tqdm"
3435
]
3536
classifiers = [
3637
"Programming Language :: Python :: 3",

src/zepben/examples/energy_consumer_device_hierarchy.py

Lines changed: 182 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,13 @@
77
import asyncio
88
import json
99
import os
10-
from dataclasses import dataclass
11-
from multiprocessing import Pool
12-
from typing import Union
13-
1410
import pandas as pd
15-
from zepben.ewb import NetworkConsumerClient, connect_with_token, Tracing, upstream, EnergyConsumer, NetworkTraceStep, StepContext, PowerTransformer, \
16-
TransformerFunctionKind, Breaker, ConductingEquipment, Fuse, IdentifiedObject, NetworkTrace, Feeder
17-
from zepben.protobuf.nc.nc_requests_pb2 import IncludedEnergizedContainers
11+
12+
from typing import Dict, Callable, List
13+
from dataclasses import dataclass
14+
from zepben.ewb import connect_with_token, NetworkConsumerClient, Feeder, Tracing, downstream, StepActionWithContextValue, \
15+
NetworkTraceStep, EnergyConsumer, StepContext, IdentifiedObject, Breaker, Fuse, PowerTransformer, TransformerFunctionKind, TreeNode, EquipmentTreeBuilder, \
16+
ConductingEquipment, NetworkTrace, upstream, IncludedEnergizedContainers
1817

1918

2019
@dataclass
@@ -35,19 +34,80 @@ def _get_client():
3534
with open('config.json') as f:
3635
config = json.load(f)
3736

38-
# Connect to server
39-
channel = connect_with_token(
40-
host=config["host"],
41-
access_token=config["access_token"],
42-
rpc_port=config['rpc_port'],
43-
ca_filename=config['ca_path']
44-
)
37+
# Connect to server
38+
channel = connect_with_token(**config)
4539
return NetworkConsumerClient(channel)
4640

4741

48-
def _get_equipment_tree_trace(up_data: dict) -> NetworkTrace:
49-
def step_action(step: NetworkTraceStep, _: StepContext):
50-
to_equip: ConductingEquipment = step.path.to_equipment
42+
async def get_feeders(_client=None) -> Dict[str, Feeder]:
43+
_feeders = (await (_client or _get_client()).get_network_hierarchy()).result.feeders
44+
return _feeders
45+
46+
47+
async def get_feeder_equipment(client: NetworkConsumerClient, feeder_mrid: str) -> None:
48+
"""Get all objects under the feeder, including LV Feeders"""
49+
(await client.get_equipment_container(
50+
feeder_mrid,
51+
include_energized_containers=IncludedEnergizedContainers.LV_FEEDERS
52+
)).throw_on_error()
53+
54+
55+
async def trace_from_energy_consumers(feeder_mrid: str, client=None):
56+
"""
57+
Least efficient/the slowest
58+
Inefficient upstream tracing example.
59+
Trace upstream from every EnergyConsumer.
60+
"""
61+
client = client or _get_client()
62+
await get_feeder_equipment(client, feeder_mrid)
63+
64+
def _get_equipment_tree_trace(up_data: dict) -> NetworkTrace:
65+
def step_action(step: NetworkTraceStep, _: StepContext):
66+
to_equip: ConductingEquipment = step.path.to_equipment
67+
68+
if isinstance(to_equip, Breaker):
69+
if not up_data.get('breaker'):
70+
up_data['breaker'] = to_equip
71+
elif isinstance(to_equip, Fuse):
72+
if not up_data.get('upstream_switch'):
73+
up_data['upstream_switch'] = to_equip
74+
elif isinstance(to_equip, PowerTransformer):
75+
if not up_data.get('distribution_power_transformer'):
76+
up_data['distribution_power_transformer'] = to_equip
77+
elif not up_data.get('regulator') and to_equip.function == TransformerFunctionKind.voltageRegulator:
78+
up_data['regulator'] = to_equip
79+
80+
return (
81+
Tracing.network_trace()
82+
.add_condition(upstream())
83+
.add_step_action(step_action)
84+
)
85+
86+
feeder = client.service.get(feeder_mrid, Feeder)
87+
88+
energy_consumers = []
89+
for lvf in feeder.normal_energized_lv_feeders:
90+
for ce in lvf.equipment:
91+
if isinstance(ce, EnergyConsumer):
92+
up_data = {'feeder': feeder_mrid, 'energy_consumer_mrid': ce.mrid}
93+
94+
# Trace upstream from EnergyConsumer.
95+
await _get_equipment_tree_trace(up_data).run(ce)
96+
energy_consumers.append(_build_row(up_data))
97+
98+
write_csv(energy_consumers, feeder.mrid)
99+
100+
101+
async def trace_from_feeder_downstream(feeder_mrid: str, client=None):
102+
"""
103+
More memory use than `trace_from_feeder_context`, more efficient/faster than `trace_from_energy_consumers`
104+
Build an equipment tree of everything downstream of the feeder.
105+
Use the Equipment tree to recurse through parent equipment of all EC's and get the equipment we are interested in.
106+
"""
107+
108+
def process_leaf(up_data: dict, leaf: TreeNode):
109+
to_equip: IdentifiedObject = leaf.identified_object
110+
51111
if isinstance(to_equip, Breaker):
52112
if not up_data.get('breaker'):
53113
up_data['breaker'] = to_equip
@@ -60,46 +120,89 @@ def step_action(step: NetworkTraceStep, _: StepContext):
60120
elif not up_data.get('regulator') and to_equip.function == TransformerFunctionKind.voltageRegulator:
61121
up_data['regulator'] = to_equip
62122

63-
return (
123+
client = client or _get_client()
124+
await get_feeder_equipment(client, feeder_mrid)
125+
126+
builder = EquipmentTreeBuilder()
127+
128+
feeder = client.service.get(feeder_mrid, Feeder)
129+
await (
64130
Tracing.network_trace()
65-
.add_condition(upstream())
66-
.add_step_action(step_action)
67-
)
131+
.add_condition(downstream())
132+
.add_step_action(builder)
133+
).run(getattr(feeder, 'normal_head_terminal'))
134+
135+
energy_consumers = []
68136

137+
for leaf in (l for l in builder.leaves if isinstance((ec := l.identified_object), EnergyConsumer)):
138+
ec_data = {'feeder': feeder.mrid, 'energy_consumer_mrid': ec.mrid}
69139

70-
async def get_feeders():
71-
client = _get_client()
140+
def _process(_leaf):
141+
process_leaf(ec_data, _leaf)
142+
if _leaf.parent:
143+
_process(_leaf.parent)
72144

73-
_feeders = (await client.get_network_hierarchy()).result.feeders
74-
return _feeders
145+
_process(leaf)
75146

147+
row = _build_row(ec_data)
148+
energy_consumers.append(row)
76149

77-
async def trace_from_energy_consumers(feeder):
150+
write_csv(energy_consumers, feeder.mrid)
151+
152+
153+
async def trace_from_feeder_context(feeder_mrid: str, client=None):
78154
"""
79-
Fetch the equipment container from the given feeder, then trace upstream from every EnergyConsumer
80-
and create a CSV with the relevant information.
155+
Most efficient/fastest.
156+
trace downstream from the feeder recording relevant information using `NetworkTrace` `StepContext`.
81157
"""
82-
client = _get_client()
83-
print(f'processing feeder {feeder}')
158+
client = client or _get_client()
84159
# Get all objects under the feeder, including Substations and LV Feeders
85-
(await client.get_equipment_container(feeder, include_energized_containers=IncludedEnergizedContainers.INCLUDE_ENERGIZED_LV_FEEDERS)).throw_on_error()
86-
network = client.service
87-
f = network.get(feeder, Feeder)
160+
await get_feeder_equipment(client, feeder_mrid)
88161

89162
energy_consumers = []
90-
for lvf in f.normal_energized_lv_feeders:
91-
for ce in lvf.equipment:
92-
if isinstance(ce, EnergyConsumer):
93-
up_data = {'feeder': feeder, 'energy_consumer_mrid': ce.mrid}
94163

95-
# Trace upstream from EnergyConsumer.
96-
await _get_equipment_tree_trace(up_data).run(ce)
97-
energy_consumers.append(_build_row(up_data))
164+
feeder = client.service.get(feeder_mrid, Feeder)
98165

99-
csv_sfx = "energy_consumers.csv"
166+
class StepActionWithContext(StepActionWithContextValue):
167+
def _apply(self, item: NetworkTraceStep, context: StepContext):
168+
if isinstance((ec := item.path.to_equipment), EnergyConsumer):
169+
nonlocal energy_consumers
170+
data = self.get_context_value(context)
171+
data.update({'feeder': feeder.mrid, 'energy_consumer_mrid': ec.mrid})
172+
173+
row = _build_row(data)
174+
energy_consumers.append(row)
175+
176+
def compute_next_value(self, next_item: NetworkTraceStep, current_item: NetworkTraceStep, current_value: Dict[str, IdentifiedObject]):
177+
data = dict(current_value)
178+
equip = next_item.path.to_equipment
179+
if isinstance(equip, Breaker):
180+
data['breaker'] = equip
181+
elif isinstance(equip, Fuse):
182+
data['upstream_switch'] = equip
183+
elif isinstance(equip, PowerTransformer):
184+
if equip.function == TransformerFunctionKind.distributionTransformer:
185+
data['distribution_power_transformer'] = equip
186+
elif equip.function == TransformerFunctionKind.voltageRegulator:
187+
data['regulator'] = equip
188+
return data
189+
190+
def compute_initial_value(self, item: NetworkTraceStep):
191+
return {}
192+
193+
await (
194+
Tracing.network_trace()
195+
.add_condition(downstream())
196+
.add_step_action(StepActionWithContext('key'))
197+
).run(getattr(feeder, 'normal_head_terminal'))
198+
199+
write_csv(energy_consumers, feeder.mrid)
200+
201+
202+
def write_csv(energy_consumers: List[EnergyConsumerDeviceHierarchy], feeder_mrid: str):
100203
network_objects = pd.DataFrame(energy_consumers)
101204
os.makedirs("csvs", exist_ok=True)
102-
network_objects.to_csv(f"csvs/{f.mrid}_{csv_sfx}", index=False)
205+
network_objects.to_csv(f"csvs/{feeder_mrid}_energy_consumers.csv", index=False)
103206

104207

105208
class NullEquipment:
@@ -108,7 +211,7 @@ class NullEquipment:
108211
name = None
109212

110213

111-
def _build_row(up_data: dict[str, Union[IdentifiedObject, str]]) -> EnergyConsumerDeviceHierarchy:
214+
def _build_row(up_data: dict[str, IdentifiedObject | str]) -> EnergyConsumerDeviceHierarchy:
112215
return EnergyConsumerDeviceHierarchy(
113216
energy_consumer_mrid=up_data['energy_consumer_mrid'],
114217
upstream_switch_mrid=(up_data.get('upstream_switch') or NullEquipment).mrid,
@@ -122,22 +225,45 @@ def _build_row(up_data: dict[str, Union[IdentifiedObject, str]]) -> EnergyConsum
122225
)
123226

124227

125-
def process_target(feeder):
126-
asyncio.run(trace_from_energy_consumers(feeder))
228+
def process_feeders_sequentially():
229+
async def main_async(trace_type: Callable):
230+
"""
231+
Fetch the equipment container from the given feeder and create a CSV with the relevant information.
232+
Differences between the functions passable as `trace_type` are documented in the relevant docstrings.
127233
234+
`trace_type` must be one of the following:
235+
- `trace_from_energy_consumers`
236+
- `trace_from_energy_consumers_with_context`
237+
- `trace_from_feeder_downstream`
238+
239+
"""
240+
from tqdm import tqdm
241+
client = _get_client()
242+
feeders = ["<FEEDER_ID>"]
243+
# feeders = list(await get_feeders(client)) # Uncomment to process all feeders
244+
for _feeder in tqdm(feeders):
245+
await trace_type(_feeder, client)
246+
247+
# Uncomment to run other trace functions
248+
asyncio.run(main_async(trace_from_feeder_context))
249+
# asyncio.run(main_async(trace_from_feeder_downstream))
250+
# asyncio.run(main_async(trace_from_energy_consumers))
251+
252+
253+
def process_feeders_concurrently():
254+
def multi_proc(_feeder):
255+
# Uncomment to run other trace functions
256+
asyncio.run(trace_from_feeder_context(_feeder))
257+
# asyncio.run(trace_from_feeder_downstream(_feeder))
258+
# asyncio.run(trace_from_energy_consumers(_feeder))
128259

129-
if __name__ == "__main__":
130260
# Get a list of feeders before entering main compute section of script.
131-
feeders = asyncio.run(get_feeders())
261+
feeders = list(asyncio.run(get_feeders()))
132262

133-
# Spin up a multiprocess pool of $CPU_COUNT processes to handle the workload, otherwise we saturate a single cpu core and it's slow.
134-
cpus = os.cpu_count()
135-
print(f'Spawning {cpus} processes')
136-
pool = Pool(cpus)
263+
from tqdm.contrib.concurrent import process_map
264+
process_map(multi_proc, feeders, max_workers=int(os.cpu_count() / 2))
137265

138-
print(f'mapping to process pool')
139-
pool.map(process_target, feeders)
140266

141-
print('finishing remaining processes')
142-
pool.close()
143-
pool.join()
267+
if __name__ == "__main__":
268+
process_feeders_sequentially()
269+
# process_feeders_concurrently() # Uncomment and comment sequentially above to multi-process, note this is resource intensive and may cause issues.

0 commit comments

Comments
 (0)