diff --git a/setup.py b/setup.py index 27eb4fd..47800cb 100644 --- a/setup.py +++ b/setup.py @@ -21,7 +21,7 @@ python_requires='>=3.9, <3.13', install_requires=[ "zepben.eas==0.17.1", - "zepben.evolve==0.45.0", + "zepben.evolve==0.48.0b2", "numba==0.60.0", "geojson==2.5.0", "gql[requests]==3.4.1", diff --git a/src/zepben/examples/dsub_from_nmi.py b/src/zepben/examples/dsub_from_nmi.py new file mode 100644 index 0000000..5e34803 --- /dev/null +++ b/src/zepben/examples/dsub_from_nmi.py @@ -0,0 +1,80 @@ +# Copyright 2025 Zeppelin Bend Pty Ltd +# +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at https://mozilla.org/MPL/2.0/. + +""" +Example trace showing methods to traverse upstream of a given `IdentifiedObject` to find the first occurrence + of another specified `IdentifiedObject` +""" + +import asyncio +import json + +from zepben.evolve import NetworkStateOperators, NetworkTraceActionType, NetworkTraceStep, StepContext, \ + NetworkConsumerClient, ConductingEquipment, connect_with_token +from zepben.evolve import PowerTransformer, UsagePoint, Tracing, Switch +from zepben.protobuf.nc.nc_requests_pb2 import INCLUDE_ENERGIZED_LV_FEEDERS + +with open("config.json") as f: + c = json.loads(f.read()) + + +def _trace(start_item, results, stop_condition): + """Returns a `NetworkTrace` configured with our parameters""" + + def step_action(step: NetworkTraceStep, context: StepContext): + if context.is_stopping: # if the trace is stopping, we have found the equipment we're looking for + results.append(step.path.to_equipment) + + state_operators = NetworkStateOperators.NORMAL + + return ( + Tracing.network_trace( + network_state_operators=state_operators, + action_step_type=NetworkTraceActionType.ALL_STEPS + ).add_condition(state_operators.upstream()) + .add_stop_condition(stop_condition) + .add_step_action(step_action) + .add_start_item(start_item) + ) + + +async def main(mrid: str, feeder_mrid: str): + channel = connect_with_token(host=c["host"], access_token=c["access_token"], rpc_port=c["rpc_port"]) + client = NetworkConsumerClient(channel) + await client.get_equipment_container(feeder_mrid, include_energized_containers=INCLUDE_ENERGIZED_LV_FEEDERS) + network = client.service + + try: + usage_point = network.get(mrid, UsagePoint) + # get the `ConductingEquipment` from the `UsagePoint` + start_item = next(filter(lambda ce: isinstance(ce, ConductingEquipment), usage_point.equipment)) + except TypeError: + start_item = network.get(mrid, ConductingEquipment) + + results = [] + + # Get DSUB from which any given customer is supplied from using a basic upstream trace + def dsub_stop_condition(step: NetworkTraceStep, context: StepContext): + return isinstance(step.path.to_equipment, PowerTransformer) + + # Get Circuit Breaker from which any given customer is supplied from using a basic upstream trace + # Uncomment stop condition below to use + def circuit_breaker_stop_condition(step: NetworkTraceStep, context: StepContext): + return isinstance(step.path.to_equipment, Switch) + + await _trace( + start_item=start_item, + results=results, + stop_condition=dsub_stop_condition, + # stop_condition=circuit_breaker_stop_condition, + ).run() + + print(results) + + +if __name__ == "__main__": + # EnergyConsumer: 50763684 + asyncio.run(main(mrid='4310990779', feeder_mrid='RW1292')) diff --git a/src/zepben/examples/find_isolation_section_from_equipment.py b/src/zepben/examples/find_isolation_section_from_equipment.py new file mode 100644 index 0000000..58fb2f6 --- /dev/null +++ b/src/zepben/examples/find_isolation_section_from_equipment.py @@ -0,0 +1,58 @@ +# Copyright 2025 Zeppelin Bend Pty Ltd +# +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at https://mozilla.org/MPL/2.0/. + +""" +Example trace showing method to traverse outwards from any given `IdentifiableObject` to the next `Switch` object, and + build a list of all contained equipment (isolate-able section) +""" + +import asyncio +import json + +from zepben.evolve import NetworkStateOperators, NetworkTraceActionType, NetworkTraceStep, StepContext, \ + NetworkConsumerClient, AcLineSegment, connect_with_token +from zepben.evolve import Tracing, Switch +from zepben.protobuf.nc.nc_requests_pb2 import INCLUDE_ENERGIZED_LV_FEEDERS + +with open("config.json") as f: + c = json.loads(f.read()) + + +async def main(conductor_mrid: str, feeder_mrid: str): + channel = connect_with_token(host=c["host"], access_token=c["access_token"], rpc_port=c["rpc_port"]) + client = NetworkConsumerClient(channel) + await client.get_equipment_container(feeder_mrid, include_energized_containers=INCLUDE_ENERGIZED_LV_FEEDERS) + network = client.service + + hv_acls = network.get(conductor_mrid, AcLineSegment) + + found_equip = set() + + def queue_condition(step: NetworkTraceStep, context: StepContext, _, __): + """Queue the next step unless it's a `Switch`""" + return not isinstance(step.path.to_equipment, Switch) + + def step_action(step: NetworkTraceStep, context: StepContext): + """Add to our list of equipment, and equipment stepped on during this trace""" + found_equip.add(step.path.to_equipment.mrid) + + state_operators = NetworkStateOperators.NORMAL + + await ( + Tracing.network_trace( + network_state_operators=state_operators, + action_step_type=NetworkTraceActionType.ALL_STEPS + ).add_condition(state_operators.stop_at_open()) + .add_queue_condition(queue_condition) + .add_step_action(step_action) + .add_start_item(hv_acls) + ).run() + + print(found_equip) # prints a list of all mRID's for all equipment in the isolation area. + + +if __name__ == "__main__": + asyncio.run(main(conductor_mrid='50434998', feeder_mrid='RW1292')) diff --git a/src/zepben/examples/isolation_equipment_between_nodes.py b/src/zepben/examples/isolation_equipment_between_nodes.py new file mode 100644 index 0000000..be9c518 --- /dev/null +++ b/src/zepben/examples/isolation_equipment_between_nodes.py @@ -0,0 +1,67 @@ +# Copyright 2025 Zeppelin Bend Pty Ltd +# +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at https://mozilla.org/MPL/2.0/. + +""" +Example trace showing method to traverse between any 2 given `IdentifiableObject` and build a list of `ProtectedSwitch` objects found, if any +""" + +import asyncio +import json +from typing import Tuple, Type + +from zepben.evolve import NetworkStateOperators, NetworkTraceActionType, NetworkTraceStep, StepContext, \ + NetworkConsumerClient, ProtectedSwitch, Recloser, LoadBreakSwitch, connect_with_token +from zepben.evolve import Tracing +from zepben.protobuf.nc.nc_requests_pb2 import INCLUDE_ENERGIZED_LV_FEEDERS + +with open("config.json") as f: + c = json.loads(f.read()) + + +async def main(mrids: Tuple[str, str], io_type: Type[ProtectedSwitch], feeder_mrid): + channel = connect_with_token(host=c["host"], access_token=c["access_token"], rpc_port=c["rpc_port"]) + client = NetworkConsumerClient(channel) + await client.get_equipment_container(feeder_mrid, include_energized_containers=INCLUDE_ENERGIZED_LV_FEEDERS) + network = client.service + + nodes = [network.get(_id) for _id in mrids] + + state_operators = NetworkStateOperators.NORMAL + + found_switch = set() + found_node = [] + + def stop_condition(step: NetworkTraceStep, context: StepContext): + """if we encounter any of the equipment we have specified, we stop the trace and mark the `found_switch` list as valid""" + if step.path.to_equipment in nodes: + found_node.append(True) + return True + + def step_action(step: NetworkTraceStep, context: StepContext): + """Add any equipment matching the type passed in to the list, this list is invalid unless we trace onto our other node""" + if isinstance(step.path.to_equipment, io_type): + found_switch.add(step.path.to_equipment) + + trace = ( + Tracing.network_trace( + network_state_operators=state_operators, + action_step_type=NetworkTraceActionType.ALL_STEPS + ).add_condition(state_operators.upstream()) + .add_stop_condition(stop_condition) + .add_step_action(step_action) + ) + + queue = iter(nodes) + while not found_node: # run an upstream trace for every node specified until we encounter another specified node + await trace.run(start=next(queue), can_stop_on_start_item=False) + + all(map(print, found_switch)) # print the list of switches + print(bool(found_switch)) # print whether we found what we were looking for + + +if __name__ == "__main__": + asyncio.run(main(mrids=('50735858', '66598892'), io_type=LoadBreakSwitch, feeder_mrid='RW1292')) + asyncio.run(main(mrids=('50735858', '50295424'), io_type=Recloser, feeder_mrid='RW1292'))