Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@
* None.

### Enhancements
* None.
* `EquipmentTreeBuilder` will now calculate `leaves` when specified to do so.

### Fixes
* Marked some extensions properties and classes with [ZBEX] that were missing them (might still be more). In addition to the ones moved into the extensions
Expand Down Expand Up @@ -127,6 +127,8 @@
* `RegulatingControl.ratedCurrent`
* `Sensor.relayFunctions`
* `UsagePoint.approvedInverterCapacity`
* using `EquipmentTreeBuilder` more then once per interpreter will no longer cause the `roots` to contain more objects then it should due to `_roots` being a
class var

### Notes
* None.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,40 @@ class EquipmentTreeBuilder(StepActionWithContextValue):
>>> .add_step_action(tree_builder)).run()
"""

_roots: dict[ConductingEquipment, EquipmentTreeNode] = {}

def __init__(self):
def __init__(self, calculate_leaves: bool = False):
super().__init__(key=str(uuid.uuid4()))

self._roots: dict[ConductingEquipment, EquipmentTreeNode] = {}
self._leaves: set[EquipmentTreeNode] = set()

self._calculate_leaves = calculate_leaves

@property
def roots(self) -> Generator[TreeNode[ConductingEquipment], None, None]:
return (r for r in self._roots.values())

def recurse_nodes(self) -> Generator[TreeNode[ConductingEquipment], None, None]:
"""
Returns a generator that will yield every node in the tree structure.
"""
def recurse(node: TreeNode[ConductingEquipment]):
yield node
for child in node.children:
yield from recurse(child)

for root in self._roots.values():
yield from recurse(root)

@property
def leaves(self) -> set[EquipmentTreeNode]:
"""
Return the leaves of the tree structure.
Depending on how the backing trace is configured, there may be extra unexpected leaves in loops.
"""
if not self._calculate_leaves:
raise AttributeError('leaves were not calculated, you must pass calculate_leaves=True to the EquipmentTreeBuilder when creating.')
return set(self._leaves)

def compute_initial_value(self, item: NetworkTraceStep[Any]) -> EquipmentTreeNode:
node = self._roots.get(item.path.to_equipment)
if node is None:
Expand All @@ -62,10 +87,18 @@ def compute_next_value(
else:
return TreeNode(next_item.path.to_equipment, current_value)

def _apply(self, item: NetworkTraceStep[Any], context: StepContext):
def _apply(self, _: NetworkTraceStep[Any], context: StepContext):
current_node: TreeNode = self.get_context_value(context)
if current_node.parent:
current_node.parent.add_child(current_node)

if self._calculate_leaves:
self._process_leaf(current_node)

def _process_leaf(self, current_node: TreeNode[ConductingEquipment]):
self._leaves.add(current_node) # add this node to _leaves as it has no children
if current_node.parent:
self._leaves.discard(current_node.parent) # this nodes parent now has a child, it's not a leaf anymore

def clear(self):
self._roots.clear()
Original file line number Diff line number Diff line change
Expand Up @@ -5,33 +5,31 @@

__all__ = ['TreeNode']

from typing import List, TypeVar, Generic

from zepben.ewb import IdentifiedObject
from typing import TypeVar, Generic, Set

T = TypeVar('T')


class TreeNode(Generic[T]):
"""
represents a node in the NetworkTrace tree
Represents a node in the NetworkTrace tree
"""

def __init__(self, identified_object: IdentifiedObject, parent=None):
def __init__(self, identified_object: T, parent=None):
self.identified_object = identified_object
self._parent: TreeNode = parent
self._children: List[TreeNode] = []
self._children: Set[TreeNode] = set()

@property
def parent(self) -> 'TreeNode[T]':
return self._parent

@property
def children(self):
return list(self._children)
def children(self) -> Set['TreeNode[T]']:
return set(self._children)

def add_child(self, child: 'TreeNode'):
self._children.append(child)
def add_child(self, child: 'TreeNode[T]'):
self._children.add(child)

def __str__(self):
return f"{{object: {self.identified_object}, parent: {self.parent or ''}, num children: {len(self.children)}}}"
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,41 @@
from zepben.ewb.services.network.tracing.networktrace.actions.tree_node import TreeNode


def test_accessing_leaves_when_not_calculated_raises_exception():
builder = EquipmentTreeBuilder()
with pytest.raises(AttributeError):
builder.leaves

@pytest.mark.asyncio
async def test_equipment_tree_builder_leaves():
n = create_looping_network()
normal = NetworkStateOperators.NORMAL
current = NetworkStateOperators.CURRENT

await Tracing.set_phases().run(n)
feeder_head = n.get("j0", ConductingEquipment)
await Tracing.set_direction().run_terminal(feeder_head, network_state_operators=normal)
await Tracing.set_direction().run_terminal(feeder_head, network_state_operators=current)
await log_directions(n.get('j0', ConductingEquipment))

start = n.get("j1", ConductingEquipment)
assert start is not None
tree_builder = EquipmentTreeBuilder(calculate_leaves=True)
trace = (
Tracing.network_trace_branching(
network_state_operators=normal,
action_step_type=NetworkTraceActionType.FIRST_STEP_ON_EQUIPMENT
)
.add_condition(downstream())
.add_step_action(tree_builder)
)

await trace.run(start)

for ce in (n['j5'], n['j13']):
assert ce in {l.identified_object for l in tree_builder.leaves}


@pytest.mark.asyncio
async def test_downstream_tree():
n = create_looping_network()
Expand All @@ -33,12 +68,15 @@ async def test_downstream_tree():
start = n.get("j1", ConductingEquipment)
assert start is not None
tree_builder = EquipmentTreeBuilder()
trace = Tracing.network_trace_branching(
network_state_operators=normal,
action_step_type=NetworkTraceActionType.FIRST_STEP_ON_EQUIPMENT) \
.add_condition(downstream()) \
.add_step_action(tree_builder) \
trace = (
Tracing.network_trace_branching(
network_state_operators=normal,
action_step_type=NetworkTraceActionType.FIRST_STEP_ON_EQUIPMENT
)
.add_condition(downstream())
.add_step_action(tree_builder)
.add_step_action(lambda item, context: visited_ce.append(item.path.to_equipment.mrid))
)

await trace.run(start)

Expand All @@ -51,34 +89,39 @@ async def test_downstream_tree():

pprint.pprint(visit_counts)

root = list(tree_builder.roots)[0]
root = tree_builder._roots[start]

assert root is not None
_verify_tree_asset(root, n["j1"], None, [n["ac1"], n["ac3"]])

test_node = root.children[0]
_verify_tree_asset(test_node, n["ac1"], n["j1"], [n["j2"]])
assert len(root.children) == 2
for test_node in root.children:
if test_node.identified_object == n['ac1']:
_verify_tree_asset(test_node, n["ac1"], n["j1"], [n["j2"]])

test_node = test_node.children[0]
_verify_tree_asset(test_node, n["j2"], n["ac1"], [n["ac2"]])
test_node = test_node.children.pop()
_verify_tree_asset(test_node, n["j2"], n["ac1"], [n["ac2"]])

test_node = test_node.children[0]
_verify_tree_asset(test_node, n["ac2"], n["j2"], [n["j3"]])
test_node = test_node.children.pop()
_verify_tree_asset(test_node, n["ac2"], n["j2"], [n["j3"]])

test_node = next(iter(test_node.children))
_verify_tree_asset(test_node, n["j3"], n["ac2"], [n["ac4"]])
test_node = next(iter(test_node.children))
_verify_tree_asset(test_node, n["j3"], n["ac2"], [n["ac4"]])

test_node = next(iter(test_node.children))
_verify_tree_asset(test_node, n["ac4"], n["j3"], [n["j6"]])
test_node = next(iter(test_node.children))
_verify_tree_asset(test_node, n["ac4"], n["j3"], [n["j6"]])

test_node = next(iter(test_node.children))
_verify_tree_asset(test_node, n["j6"], n["ac4"], [])
test_node = next(iter(test_node.children))
_verify_tree_asset(test_node, n["j6"], n["ac4"], [])
break

test_node = list(root.children)[1]
_verify_tree_asset(test_node, n["ac3"], n["j1"], [n["j4"]])
elif test_node.identified_object == n['ac3']:
_verify_tree_asset(test_node, n["ac3"], n["j1"], [n["j4"]])

test_node = next(iter(test_node.children))
_verify_tree_asset(test_node, n["j4"], n["ac3"], [n["ac5"], n["ac6"]])
test_node = next(iter(test_node.children))
_verify_tree_asset(test_node, n["j4"], n["ac3"], [n["ac5"], n["ac6"]])
else:
assert False

assert len(_find_nodes(root, "j0")) == 0
assert len(_find_nodes(root, "ac0")) == 0
Expand Down Expand Up @@ -162,8 +205,15 @@ def _verify_tree_asset(
else:
assert tree_node.parent is None

children_nodes = list(c.identified_object for c in tree_node.children)
assert children_nodes == expected_children
children_nodes = [c.identified_object for c in tree_node.children]
try:
for child in expected_children:
assert child in children_nodes
for child in children_nodes:
assert child in expected_children
except AssertionError as e:
e.args = (expected_children, children_nodes)
raise e


def _find_nodes(root: TreeNode[ConductingEquipment], asset_id: str) -> List[TreeNode[ConductingEquipment]]:
Expand Down
48 changes: 47 additions & 1 deletion test/services/network/tracing/networktrace/test_network_trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

from services.network.tracing.networktrace.test_network_trace_step_path_provider import PathTerminal, _verify_paths
from zepben.ewb import AcLineSegment, Clamp, Terminal, NetworkTraceStep, Cut, ConductingEquipment, TraversalQueue, Junction, ngen, NetworkTraceActionType, \
Tracing
Tracing, StepActionWithContextValue, EnergyConsumer
from zepben.ewb.testing.test_network_builder import TestNetworkBuilder

Terminal.__add__ = PathTerminal.__add__
Expand Down Expand Up @@ -286,3 +286,49 @@ async def validate(start: Tuple[str, str], action_step_type: NetworkTraceActionT
# Can even use bizarre paths, they are just the same as any other external path.
await validate(('c0-t1', 'c2-t1'), NetworkTraceActionType.ALL_STEPS, ["c2-t1", "c2-t2"])
await validate(('c0-t1', 'c2-t1'), NetworkTraceActionType.FIRST_STEP_ON_EQUIPMENT, ["c2-t1"])

async def test_context_is_not_shared_between_branches(self):
#
# 1--c0--21--c1-*-21--c2--21--ec3
# 1
# 1--c4--21--ec5
#

ns = (
TestNetworkBuilder()
.from_acls() # c0
.to_acls() # c1
.with_clamp() # c1-clamp1
.to_acls() # c2
.to_energy_consumer() # ec3
.branch_from('c1-clamp1')
.to_acls() # c4
.to_energy_consumer() # ec5
).network

data_capture: list[tuple[str, list]] = []

class StepActionWithContext(StepActionWithContextValue):
def _apply(self, item: NetworkTraceStep, context: List):
print(item)
if isinstance((ec := item.path.to_equipment), EnergyConsumer):
data_capture.append((ec.mrid, self.get_context_value(context)))

def compute_next_value(self, next_item: NetworkTraceStep, current_item: NetworkTraceStep, current_value: List):
nv = list(current_value)
nv.append(next_item.path.from_equipment.mrid)
return nv

def compute_initial_value(self, item: NetworkTraceStep):
return [item.path.from_equipment.mrid]

await (
Tracing.network_trace()
.add_step_action(StepActionWithContext('key'))
).run(ns.get('c0'))

assert len(data_capture) == 2
assert data_capture == [('ec3', ['c0', 'c0', 'c1', 'c1', 'c2', 'c2']),
('ec5', ['c0', 'c0', 'c1', 'c1-clamp1', 'c4', 'c4'])]