Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 60 additions & 4 deletions kuksa-client/kuksa_client/grpc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1188,13 +1188,32 @@ def subscribe_current_values(
]):
for path, dp in updates.items():
print(f"Current value for {path} is now: {dp.value}")

Branch paths (e.g. ``['Vehicle']`` or ``['Vehicle.Cabin.*']``) are
accepted: if the v2 Subscribe RPC rejects a path with NOT_FOUND, the
paths are expanded via ListMetadata and the subscription is retried
with the resulting leaf signals. This restores the wildcard semantics
that v1 provided natively.
"""
paths = list(paths)
try:
logger.debug("Try to subscribe current values via v2")
for updates in self.v2_subscribe(paths, **rpc_kwargs):
yield {
update.entry.path: update.entry.value for update in updates
}
try:
for updates in self.v2_subscribe(paths, **rpc_kwargs):
yield {
update.entry.path: update.entry.value for update in updates
}
except VSSClientError as exc:
if exc.error["code"] != grpc.StatusCode.NOT_FOUND.value[0]:
raise
logger.debug(
"v2 Subscribe returned NOT_FOUND; expanding branch paths via ListMetadata"
)
expanded = self._expand_v2_branch_paths(paths, **rpc_kwargs)
for updates in self.v2_subscribe(expanded, **rpc_kwargs):
yield {
update.entry.path: update.entry.value for update in updates
}
except VSSClientError as exc:
if exc.error["code"] != grpc.StatusCode.UNIMPLEMENTED.value[0]:
raise
Expand Down Expand Up @@ -1351,6 +1370,43 @@ def get_path(self, signal_id: types_v2.SignalID) -> str:
return self.id_to_path_mapping[signal_id.id]
return "<unknown signal>"

def _expand_v2_branch_paths(
self, paths: Iterable[str], **rpc_kwargs
) -> List[str]:
"""Expand branch / wildcard paths into concrete leaf signals.

For each input path, calls ``ListMetadata(root=path)`` and returns
every signal path beneath that branch. Leaf paths pass through
(ListMetadata returns a single entry). Non-existent paths surface
as NOT_FOUND. Trailing ``.*`` suffixes are stripped before lookup.

Order is preserved and duplicates (from overlapping branches) are
removed. Used to restore v1-style wildcard semantics on top of the
v2 Subscribe RPC, which only accepts fully-qualified leaf paths.
"""
rpc_kwargs["metadata"] = self.generate_metadata_header(
rpc_kwargs.get("metadata")
)
expanded: List[str] = []
for path in paths:
lookup = path[:-2] if path.endswith(".*") else path
req = self._prepare_v2_list_metadata_request(lookup)
try:
resp = self.client_stub_v2.ListMetadata(req, **rpc_kwargs)
except RpcError as exc:
raise VSSClientError.from_grpc_error(exc) from exc
if not resp.metadata:
raise VSSClientError(
error={
"code": grpc.StatusCode.NOT_FOUND.value[0],
"reason": grpc.StatusCode.NOT_FOUND.value[1],
"message": f"Path {path} not found on server",
},
errors=[],
)
expanded.extend(m.path for m in resp.metadata)
return list(dict.fromkeys(expanded))

def ensure_id_mapping(self, paths: Iterable[str], **rpc_kwargs):
for path in paths:
if path not in self.path_to_id_mapping:
Expand Down
64 changes: 60 additions & 4 deletions kuksa-client/kuksa_client/grpc/aio.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,13 +303,32 @@ async def subscribe_current_values(
]):
for path, dp in updates.items():
print(f"Current value for {path} is now: {dp.value}")

Branch paths (e.g. ``['Vehicle']`` or ``['Vehicle.Cabin.*']``) are
accepted: if the v2 Subscribe RPC rejects a path with NOT_FOUND, the
paths are expanded via ListMetadata and the subscription is retried
with the resulting leaf signals. This restores the wildcard semantics
that v1 provided natively.
"""
paths = list(paths)
try:
logger.debug("Try to subscribe current values via v2")
async for updates in self.v2_subscribe(paths=paths, **rpc_kwargs):
yield {
update.entry.path: update.entry.value for update in updates
}
try:
async for updates in self.v2_subscribe(paths=paths, **rpc_kwargs):
yield {
update.entry.path: update.entry.value for update in updates
}
except VSSClientError as exc:
if exc.error["code"] != grpc.StatusCode.NOT_FOUND.value[0]:
raise
logger.debug(
"v2 Subscribe returned NOT_FOUND; expanding branch paths via ListMetadata"
)
expanded = await self._expand_v2_branch_paths(paths, **rpc_kwargs)
async for updates in self.v2_subscribe(paths=expanded, **rpc_kwargs):
yield {
update.entry.path: update.entry.value for update in updates
}
except VSSClientError as exc:
if exc.error["code"] != grpc.StatusCode.UNIMPLEMENTED.value[0]:
raise
Expand Down Expand Up @@ -468,6 +487,43 @@ def get_path(self, signal_id: types_v2.SignalID) -> str:
return self.id_to_path_mapping[signal_id.id]
return "<unknown signal>"

async def _expand_v2_branch_paths(
self, paths: Iterable[str], **rpc_kwargs
) -> List[str]:
"""Expand branch / wildcard paths into concrete leaf signals.

For each input path, calls ``ListMetadata(root=path)`` and returns
every signal path beneath that branch. Leaf paths pass through
(ListMetadata returns a single entry). Non-existent paths surface
as NOT_FOUND. Trailing ``.*`` suffixes are stripped before lookup.

Order is preserved and duplicates (from overlapping branches) are
removed. Used to restore v1-style wildcard semantics on top of the
v2 Subscribe RPC, which only accepts fully-qualified leaf paths.
"""
rpc_kwargs["metadata"] = self.generate_metadata_header(
rpc_kwargs.get("metadata")
)
expanded: List[str] = []
for path in paths:
lookup = path[:-2] if path.endswith(".*") else path
req = self._prepare_v2_list_metadata_request(lookup)
try:
resp = await self.client_stub_v2.ListMetadata(req, **rpc_kwargs)
except AioRpcError as exc:
raise VSSClientError.from_grpc_error(exc) from exc
if not resp.metadata:
raise VSSClientError(
error={
"code": grpc.StatusCode.NOT_FOUND.value[0],
"reason": grpc.StatusCode.NOT_FOUND.value[1],
"message": f"Path {path} not found on server",
},
errors=[],
)
expanded.extend(m.path for m in resp.metadata)
return list(dict.fromkeys(expanded))

async def ensure_id_mapping(self, paths: Iterable[str], **rpc_kwargs):
for path in paths:
if path not in self.path_to_id_mapping:
Expand Down
62 changes: 62 additions & 0 deletions kuksa-client/tests/test_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -670,6 +670,68 @@ async def subscribe_response_stream(**kwargs):
'Vehicle.Chassis.Height': Datapoint(666),
}

async def test_subscribe_current_values_branch_path_expansion(
self, mocker, unused_tcp_port,
):
"""Branch path (e.g. 'Vehicle') is rejected by v2 Subscribe with
NOT_FOUND; client falls back to ListMetadata expansion and retries
with the resulting leaf signals (restores pre-0.5.1 semantics for
issue #53)."""
client = VSSClient('127.0.0.1', unused_tcp_port)
client.connected = True # To bypass connection check

not_found = VSSClientError(
error={
"code": grpc.StatusCode.NOT_FOUND.value[0],
"reason": grpc.StatusCode.NOT_FOUND.value[1],
"message": "Path not found",
},
errors=[],
)

call_count = {"v2_subscribe": 0}

async def v2_subscribe_side_effect(paths, **kwargs):
call_count["v2_subscribe"] += 1
if call_count["v2_subscribe"] == 1:
# First call with branch path — server rejects
raise not_found
# Second call with expanded leaves — yield real data
yield [
EntryUpdate(DataEntry(
'Vehicle.Speed', value=Datapoint(42.0),
), (Field.VALUE,)),
EntryUpdate(DataEntry(
'Vehicle.ADAS.ABS.IsActive', value=Datapoint(True),
), (Field.VALUE,)),
]
mocker.patch.object(
client, 'v2_subscribe', side_effect=v2_subscribe_side_effect,
)

async def expand_side_effect(paths, **kwargs):
# Simulate ListMetadata resolving 'Vehicle' to two concrete leaves
assert list(paths) == ['Vehicle']
return ['Vehicle.Speed', 'Vehicle.ADAS.ABS.IsActive']
mocker.patch.object(
client, '_expand_v2_branch_paths', side_effect=expand_side_effect,
)

received_updates: Dict[str, Datapoint] = {}
async for updates in client.subscribe_current_values(['Vehicle']):
received_updates.update(updates)

assert call_count["v2_subscribe"] == 2
# First call used the original branch path; retry used expanded leaves.
first_paths = list(client.v2_subscribe.call_args_list[0][1]['paths'])
second_paths = list(client.v2_subscribe.call_args_list[1][1]['paths'])
assert first_paths == ['Vehicle']
assert second_paths == ['Vehicle.Speed', 'Vehicle.ADAS.ABS.IsActive']
assert received_updates == {
'Vehicle.Speed': Datapoint(42.0),
'Vehicle.ADAS.ABS.IsActive': Datapoint(True),
}

async def test_subscribe_target_values(self, mocker, unused_tcp_port):
client = VSSClient('127.0.0.1', unused_tcp_port)
client.connected = True # To bypass connection check
Expand Down