From 1bb9e634242d26d327985ec5e4b108d01194dc44 Mon Sep 17 00:00:00 2001 From: Max Chesterfield Date: Fri, 11 Jul 2025 01:34:08 +1000 Subject: [PATCH 1/4] working upstream trace from EC Signed-off-by: Max Chesterfield --- .../energy_consumer_device_heirarchy.py | 123 ++++++++++++++++++ 1 file changed, 123 insertions(+) create mode 100644 src/zepben/examples/energy_consumer_device_heirarchy.py diff --git a/src/zepben/examples/energy_consumer_device_heirarchy.py b/src/zepben/examples/energy_consumer_device_heirarchy.py new file mode 100644 index 0000000..d24c5e6 --- /dev/null +++ b/src/zepben/examples/energy_consumer_device_heirarchy.py @@ -0,0 +1,123 @@ +# Copyright 2025 Zeppelin Bend Pty Ltd +# +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at https://mozilla.org/MPL/2.0/. + +import asyncio +import json +import os +from dataclasses import dataclass + +import pandas as pd +from zepben.evolve import NetworkConsumerClient, connect_with_token, Tracing, upstream, EnergyConsumer, NetworkTraceStep, StepContext, PowerTransformer, \ + TransformerFunctionKind, Breaker, ConductingEquipment, Fuse, IdentifiedObject +from zepben.protobuf.nc.nc_requests_pb2 import IncludedEnergizingContainers, IncludedEnergizedContainers + + +@dataclass +class EnergyConsumerDeviceHeirarchy: + energy_consumer_mrid: str + lv_circuit_name: str + upstream_switch_mrid: str + lv_circuit_name: str + upstream_switch_class: str + distribution_power_transformer_mrid: str + distribution_power_transformer_name: str + regulator_mrid: str + breaker_mrid: str + feeder_mrid: str + + +def _get_client(): + with open('config.json') as f: + config = json.load(f) + + # Connect to server + channel = connect_with_token( + host=config["host"], + access_token=config["access_token"], + rpc_port=config['port'], + ca_filename=config['ca_filename'] + ) + return NetworkConsumerClient(channel) + + +def _get_equipment_tree_trace(up_data): + + def step_action(step: NetworkTraceStep, _: StepContext): + to_equip: ConductingEquipment = step.path.to_equipment + match to_equip: + case Breaker(): + if not up_data.get('breaker'): + up_data['breaker'] = to_equip + case Fuse(): + if not up_data.get('upstream_switch'): + up_data['upstream_switch'] = to_equip + case PowerTransformer(): + if not up_data.get('distribution_power_transformer'): + up_data['distribution_power_transformer'] = to_equip + elif not up_data.get('regulator') and to_equip.function == TransformerFunctionKind.voltageRegulator: + up_data['regulator'] = to_equip + + return ( + Tracing.network_trace() + .add_condition(upstream()) + .add_step_action(step_action) + ) + + +async def main(): + client = _get_client() + + + for feeder in (await client.get_network_hierarchy()).result.feeders: + # Get all objects under the feeder, including Substations and LV Feeders + feeder_objects = ( + await client.get_equipment_container( + feeder, + include_energizing_containers = IncludedEnergizingContainers.INCLUDE_ENERGIZING_SUBSTATIONS, + include_energized_containers = IncludedEnergizedContainers.INCLUDE_ENERGIZED_LV_FEEDERS + ) + ).result.objects + + energy_consumers = [] + + for up in feeder_objects.values(): + if isinstance(up, EnergyConsumer): + up_data = {'feeder': feeder, 'energy_consumer_mrid': up.mrid} + + # Trace upstream from EnergyConsumer. + await _get_equipment_tree_trace(up_data).run(up) + energy_consumers.append(_build_row(up_data)) + + csv_sfx = "energy_consumers.csv" + print(f"Writing csvs/{feeder}_{csv_sfx}") + network_objects = pd.DataFrame(energy_consumers) + os.makedirs("csvs", exist_ok=True) + network_objects.to_csv(f"csvs/{feeder}_{csv_sfx}", index=False) + print(f"Finished processing {feeder}") + + +class NullEquipment: + """empty class to simplify code below in the case of an equipment not existing in that position of the network""" + mrid = None + name = None + + +def _build_row(up_data: dict[str, IdentifiedObject | str]) -> EnergyConsumerDeviceHeirarchy: + return EnergyConsumerDeviceHeirarchy( + energy_consumer_mrid = up_data['energy_consumer_mrid'], + upstream_switch_mrid = (up_data.get('upstream_switch') or NullEquipment).mrid, + lv_circuit_name = (up_data.get('upstream_switch') or NullEquipment).name, + upstream_switch_class = type(up_data.get('upstream_switch')).__name__, + distribution_power_transformer_mrid = up_data.get('distribution_power_transformer').mrid, + distribution_power_transformer_name = up_data.get('distribution_power_transformer').name, + regulator_mrid = (up_data.get('regulator') or NullEquipment).mrid, + breaker_mrid = (up_data.get('breaker') or NullEquipment).mrid, + feeder_mrid = up_data.get('feeder'), + ) + + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file From 52c8ad5484cf9179b0dd37708f99b8261cd1a8ca Mon Sep 17 00:00:00 2001 From: Max Chesterfield Date: Fri, 11 Jul 2025 01:34:40 +1000 Subject: [PATCH 2/4] Machines these days have multiple cores for a reason. Signed-off-by: Max Chesterfield --- .../energy_consumer_device_heirarchy.py | 74 ++++++++++++------- 1 file changed, 48 insertions(+), 26 deletions(-) diff --git a/src/zepben/examples/energy_consumer_device_heirarchy.py b/src/zepben/examples/energy_consumer_device_heirarchy.py index d24c5e6..b8b1f5f 100644 --- a/src/zepben/examples/energy_consumer_device_heirarchy.py +++ b/src/zepben/examples/energy_consumer_device_heirarchy.py @@ -8,6 +8,7 @@ import json import os from dataclasses import dataclass +from multiprocessing import Pool import pandas as pd from zepben.evolve import NetworkConsumerClient, connect_with_token, Tracing, upstream, EnergyConsumer, NetworkTraceStep, StepContext, PowerTransformer, \ @@ -67,36 +68,39 @@ def step_action(step: NetworkTraceStep, _: StepContext): ) -async def main(): +async def get_feeders(): client = _get_client() + _feeders = (await client.get_network_hierarchy()).result.feeders + return _feeders - for feeder in (await client.get_network_hierarchy()).result.feeders: - # Get all objects under the feeder, including Substations and LV Feeders - feeder_objects = ( - await client.get_equipment_container( - feeder, - include_energizing_containers = IncludedEnergizingContainers.INCLUDE_ENERGIZING_SUBSTATIONS, - include_energized_containers = IncludedEnergizedContainers.INCLUDE_ENERGIZED_LV_FEEDERS - ) - ).result.objects - energy_consumers = [] +async def trace_from_ec(feeder): + client = _get_client() + print(f'processing feeder {feeder}') + # Get all objects under the feeder, including Substations and LV Feeders + feeder_objects = ( + await client.get_equipment_container( + feeder, + include_energizing_containers = IncludedEnergizingContainers.INCLUDE_ENERGIZING_SUBSTATIONS, + include_energized_containers = IncludedEnergizedContainers.INCLUDE_ENERGIZED_LV_FEEDERS + ) + ).result.objects + + energy_consumers = [] - for up in feeder_objects.values(): - if isinstance(up, EnergyConsumer): - up_data = {'feeder': feeder, 'energy_consumer_mrid': up.mrid} + for up in feeder_objects.values(): + if isinstance(up, EnergyConsumer): + up_data = {'feeder': feeder, 'energy_consumer_mrid': up.mrid} - # Trace upstream from EnergyConsumer. - await _get_equipment_tree_trace(up_data).run(up) - energy_consumers.append(_build_row(up_data)) + # Trace upstream from EnergyConsumer. + await _get_equipment_tree_trace(up_data).run(up) + energy_consumers.append(_build_row(up_data)) - csv_sfx = "energy_consumers.csv" - print(f"Writing csvs/{feeder}_{csv_sfx}") - network_objects = pd.DataFrame(energy_consumers) - os.makedirs("csvs", exist_ok=True) - network_objects.to_csv(f"csvs/{feeder}_{csv_sfx}", index=False) - print(f"Finished processing {feeder}") + csv_sfx = "energy_consumers.csv" + network_objects = pd.DataFrame(energy_consumers) + os.makedirs("csvs", exist_ok=True) + network_objects.to_csv(f"csvs/{feeder}_{csv_sfx}", index=False) class NullEquipment: @@ -111,13 +115,31 @@ def _build_row(up_data: dict[str, IdentifiedObject | str]) -> EnergyConsumerDevi upstream_switch_mrid = (up_data.get('upstream_switch') or NullEquipment).mrid, lv_circuit_name = (up_data.get('upstream_switch') or NullEquipment).name, upstream_switch_class = type(up_data.get('upstream_switch')).__name__, - distribution_power_transformer_mrid = up_data.get('distribution_power_transformer').mrid, - distribution_power_transformer_name = up_data.get('distribution_power_transformer').name, + distribution_power_transformer_mrid = (up_data.get('distribution_power_transformer') or NullEquipment).mrid, + distribution_power_transformer_name = (up_data.get('distribution_power_transformer') or NullEquipment).name, regulator_mrid = (up_data.get('regulator') or NullEquipment).mrid, breaker_mrid = (up_data.get('breaker') or NullEquipment).mrid, feeder_mrid = up_data.get('feeder'), ) +def process_target(feeder): + asyncio.run(trace_from_ec(feeder)) + + if __name__ == "__main__": - asyncio.run(main()) \ No newline at end of file + # Get a list of feeders before entering main compute section of script. + feeders = asyncio.run(get_feeders()) + + # Spin up a multiprocess pool of $CPU_COUNT processes to handle the workload, otherwise we saturate a single cpu core and it's slow. + cpus = os.cpu_count() + print(f'Spawning {cpus} processes') + pool = Pool(cpus) + + + print(f'mapping to process pool') + pool.map(process_target, feeders) + + print('finishing remaining processes') + pool.close() + pool.join() \ No newline at end of file From 12e633dd32234014ee7325f555874a287ad306b3 Mon Sep 17 00:00:00 2001 From: Max Chesterfield Date: Fri, 11 Jul 2025 16:24:57 +1000 Subject: [PATCH 3/4] bump zepben.evolve version Signed-off-by: Max Chesterfield --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index f005e35..f5f1d79 100644 --- a/setup.py +++ b/setup.py @@ -21,7 +21,7 @@ python_requires='>=3.9, <3.13', install_requires=[ "zepben.eas==0.17.1", - "zepben.evolve==0.45.0", + "zepben.evolve==0.48.0", "numba==0.60.0", "geojson==2.5.0", "gql[requests]==3.4.1" From dca9c2dc69d8e2779bec40f15edec7f66ae4e3d4 Mon Sep 17 00:00:00 2001 From: Max Chesterfield Date: Fri, 11 Jul 2025 16:57:43 +1000 Subject: [PATCH 4/4] python 3.9 Signed-off-by: Max Chesterfield --- .../energy_consumer_device_heirarchy.py | 34 ++++++++++--------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/src/zepben/examples/energy_consumer_device_heirarchy.py b/src/zepben/examples/energy_consumer_device_heirarchy.py index b8b1f5f..2e12626 100644 --- a/src/zepben/examples/energy_consumer_device_heirarchy.py +++ b/src/zepben/examples/energy_consumer_device_heirarchy.py @@ -12,7 +12,7 @@ import pandas as pd from zepben.evolve import NetworkConsumerClient, connect_with_token, Tracing, upstream, EnergyConsumer, NetworkTraceStep, StepContext, PowerTransformer, \ - TransformerFunctionKind, Breaker, ConductingEquipment, Fuse, IdentifiedObject + TransformerFunctionKind, Breaker, ConductingEquipment, Fuse, IdentifiedObject, NetworkTrace from zepben.protobuf.nc.nc_requests_pb2 import IncludedEnergizingContainers, IncludedEnergizedContainers @@ -44,22 +44,21 @@ def _get_client(): return NetworkConsumerClient(channel) -def _get_equipment_tree_trace(up_data): +def _get_equipment_tree_trace(up_data: dict) -> NetworkTrace: def step_action(step: NetworkTraceStep, _: StepContext): to_equip: ConductingEquipment = step.path.to_equipment - match to_equip: - case Breaker(): - if not up_data.get('breaker'): - up_data['breaker'] = to_equip - case Fuse(): - if not up_data.get('upstream_switch'): - up_data['upstream_switch'] = to_equip - case PowerTransformer(): - if not up_data.get('distribution_power_transformer'): - up_data['distribution_power_transformer'] = to_equip - elif not up_data.get('regulator') and to_equip.function == TransformerFunctionKind.voltageRegulator: - up_data['regulator'] = to_equip + if isinstance(to_equip, Breaker): + if not up_data.get('breaker'): + up_data['breaker'] = to_equip + elif isinstance(to_equip, Fuse): + if not up_data.get('upstream_switch'): + up_data['upstream_switch'] = to_equip + elif isinstance(to_equip, PowerTransformer): + if not up_data.get('distribution_power_transformer'): + up_data['distribution_power_transformer'] = to_equip + elif not up_data.get('regulator') and to_equip.function == TransformerFunctionKind.voltageRegulator: + up_data['regulator'] = to_equip return ( Tracing.network_trace() @@ -76,6 +75,10 @@ async def get_feeders(): async def trace_from_ec(feeder): + """ + Fetch the equipment container from the given feeder, then trace upstream from every EnergyConsumer + and create a CSV with the relevant information. + """ client = _get_client() print(f'processing feeder {feeder}') # Get all objects under the feeder, including Substations and LV Feeders @@ -136,10 +139,9 @@ def process_target(feeder): print(f'Spawning {cpus} processes') pool = Pool(cpus) - print(f'mapping to process pool') pool.map(process_target, feeders) print('finishing remaining processes') pool.close() - pool.join() \ No newline at end of file + pool.join()