From 26854ac00ca7bb596d3bd1c96713e540ae856fe4 Mon Sep 17 00:00:00 2001 From: "Komada (aki1770-del)" Date: Mon, 20 Apr 2026 23:34:38 +0900 Subject: [PATCH] Fix wildcard regression in subscribe_current_values on v2 (closes #53) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The v2 Subscribe RPC only accepts fully-qualified leaf paths, so passing a branch path like `['Vehicle']` — which v1 Subscribe accepted as a wildcard — now fails with NOT_FOUND since 0.5.1. This breaks existing user code that worked on 0.4.x. Restore the prior behavior by catching NOT_FOUND from v2 Subscribe and retrying with paths expanded via ListMetadata(root=path). Leaf paths continue on the fast path (no extra RPC). The UNIMPLEMENTED fallback to v1 Subscribe is preserved for old databrokers. - kuksa-client/kuksa_client/grpc/aio.py: async path, new _expand_v2_branch_paths helper. - kuksa-client/kuksa_client/grpc/__init__.py: sync mirror of the same change. - kuksa-client/tests/test_grpc.py: new test asserting branch-path expansion on NOT_FOUND. Trailing `.*` in paths (e.g. `Vehicle.Cabin.*`) is stripped before lookup so both the idiomatic v1 form and the explicit wildcard form work. Signed-off-by: Komada (aki1770-del) --- kuksa-client/kuksa_client/grpc/__init__.py | 64 ++++++++++++++++++++-- kuksa-client/kuksa_client/grpc/aio.py | 64 ++++++++++++++++++++-- kuksa-client/tests/test_grpc.py | 62 +++++++++++++++++++++ 3 files changed, 182 insertions(+), 8 deletions(-) diff --git a/kuksa-client/kuksa_client/grpc/__init__.py b/kuksa-client/kuksa_client/grpc/__init__.py index fdd1dc6..d98b893 100644 --- a/kuksa-client/kuksa_client/grpc/__init__.py +++ b/kuksa-client/kuksa_client/grpc/__init__.py @@ -1188,13 +1188,32 @@ def subscribe_current_values( ]): for path, dp in updates.items(): print(f"Current value for {path} is now: {dp.value}") + + Branch paths (e.g. ``['Vehicle']`` or ``['Vehicle.Cabin.*']``) are + accepted: if the v2 Subscribe RPC rejects a path with NOT_FOUND, the + paths are expanded via ListMetadata and the subscription is retried + with the resulting leaf signals. This restores the wildcard semantics + that v1 provided natively. """ + paths = list(paths) try: logger.debug("Try to subscribe current values via v2") - for updates in self.v2_subscribe(paths, **rpc_kwargs): - yield { - update.entry.path: update.entry.value for update in updates - } + try: + for updates in self.v2_subscribe(paths, **rpc_kwargs): + yield { + update.entry.path: update.entry.value for update in updates + } + except VSSClientError as exc: + if exc.error["code"] != grpc.StatusCode.NOT_FOUND.value[0]: + raise + logger.debug( + "v2 Subscribe returned NOT_FOUND; expanding branch paths via ListMetadata" + ) + expanded = self._expand_v2_branch_paths(paths, **rpc_kwargs) + for updates in self.v2_subscribe(expanded, **rpc_kwargs): + yield { + update.entry.path: update.entry.value for update in updates + } except VSSClientError as exc: if exc.error["code"] != grpc.StatusCode.UNIMPLEMENTED.value[0]: raise @@ -1351,6 +1370,43 @@ def get_path(self, signal_id: types_v2.SignalID) -> str: return self.id_to_path_mapping[signal_id.id] return "" + def _expand_v2_branch_paths( + self, paths: Iterable[str], **rpc_kwargs + ) -> List[str]: + """Expand branch / wildcard paths into concrete leaf signals. + + For each input path, calls ``ListMetadata(root=path)`` and returns + every signal path beneath that branch. Leaf paths pass through + (ListMetadata returns a single entry). Non-existent paths surface + as NOT_FOUND. Trailing ``.*`` suffixes are stripped before lookup. + + Order is preserved and duplicates (from overlapping branches) are + removed. Used to restore v1-style wildcard semantics on top of the + v2 Subscribe RPC, which only accepts fully-qualified leaf paths. + """ + rpc_kwargs["metadata"] = self.generate_metadata_header( + rpc_kwargs.get("metadata") + ) + expanded: List[str] = [] + for path in paths: + lookup = path[:-2] if path.endswith(".*") else path + req = self._prepare_v2_list_metadata_request(lookup) + try: + resp = self.client_stub_v2.ListMetadata(req, **rpc_kwargs) + except RpcError as exc: + raise VSSClientError.from_grpc_error(exc) from exc + if not resp.metadata: + raise VSSClientError( + error={ + "code": grpc.StatusCode.NOT_FOUND.value[0], + "reason": grpc.StatusCode.NOT_FOUND.value[1], + "message": f"Path {path} not found on server", + }, + errors=[], + ) + expanded.extend(m.path for m in resp.metadata) + return list(dict.fromkeys(expanded)) + def ensure_id_mapping(self, paths: Iterable[str], **rpc_kwargs): for path in paths: if path not in self.path_to_id_mapping: diff --git a/kuksa-client/kuksa_client/grpc/aio.py b/kuksa-client/kuksa_client/grpc/aio.py index 41e61da..bdfa244 100644 --- a/kuksa-client/kuksa_client/grpc/aio.py +++ b/kuksa-client/kuksa_client/grpc/aio.py @@ -303,13 +303,32 @@ async def subscribe_current_values( ]): for path, dp in updates.items(): print(f"Current value for {path} is now: {dp.value}") + + Branch paths (e.g. ``['Vehicle']`` or ``['Vehicle.Cabin.*']``) are + accepted: if the v2 Subscribe RPC rejects a path with NOT_FOUND, the + paths are expanded via ListMetadata and the subscription is retried + with the resulting leaf signals. This restores the wildcard semantics + that v1 provided natively. """ + paths = list(paths) try: logger.debug("Try to subscribe current values via v2") - async for updates in self.v2_subscribe(paths=paths, **rpc_kwargs): - yield { - update.entry.path: update.entry.value for update in updates - } + try: + async for updates in self.v2_subscribe(paths=paths, **rpc_kwargs): + yield { + update.entry.path: update.entry.value for update in updates + } + except VSSClientError as exc: + if exc.error["code"] != grpc.StatusCode.NOT_FOUND.value[0]: + raise + logger.debug( + "v2 Subscribe returned NOT_FOUND; expanding branch paths via ListMetadata" + ) + expanded = await self._expand_v2_branch_paths(paths, **rpc_kwargs) + async for updates in self.v2_subscribe(paths=expanded, **rpc_kwargs): + yield { + update.entry.path: update.entry.value for update in updates + } except VSSClientError as exc: if exc.error["code"] != grpc.StatusCode.UNIMPLEMENTED.value[0]: raise @@ -468,6 +487,43 @@ def get_path(self, signal_id: types_v2.SignalID) -> str: return self.id_to_path_mapping[signal_id.id] return "" + async def _expand_v2_branch_paths( + self, paths: Iterable[str], **rpc_kwargs + ) -> List[str]: + """Expand branch / wildcard paths into concrete leaf signals. + + For each input path, calls ``ListMetadata(root=path)`` and returns + every signal path beneath that branch. Leaf paths pass through + (ListMetadata returns a single entry). Non-existent paths surface + as NOT_FOUND. Trailing ``.*`` suffixes are stripped before lookup. + + Order is preserved and duplicates (from overlapping branches) are + removed. Used to restore v1-style wildcard semantics on top of the + v2 Subscribe RPC, which only accepts fully-qualified leaf paths. + """ + rpc_kwargs["metadata"] = self.generate_metadata_header( + rpc_kwargs.get("metadata") + ) + expanded: List[str] = [] + for path in paths: + lookup = path[:-2] if path.endswith(".*") else path + req = self._prepare_v2_list_metadata_request(lookup) + try: + resp = await self.client_stub_v2.ListMetadata(req, **rpc_kwargs) + except AioRpcError as exc: + raise VSSClientError.from_grpc_error(exc) from exc + if not resp.metadata: + raise VSSClientError( + error={ + "code": grpc.StatusCode.NOT_FOUND.value[0], + "reason": grpc.StatusCode.NOT_FOUND.value[1], + "message": f"Path {path} not found on server", + }, + errors=[], + ) + expanded.extend(m.path for m in resp.metadata) + return list(dict.fromkeys(expanded)) + async def ensure_id_mapping(self, paths: Iterable[str], **rpc_kwargs): for path in paths: if path not in self.path_to_id_mapping: diff --git a/kuksa-client/tests/test_grpc.py b/kuksa-client/tests/test_grpc.py index d2f4a05..67f0f3d 100644 --- a/kuksa-client/tests/test_grpc.py +++ b/kuksa-client/tests/test_grpc.py @@ -670,6 +670,68 @@ async def subscribe_response_stream(**kwargs): 'Vehicle.Chassis.Height': Datapoint(666), } + async def test_subscribe_current_values_branch_path_expansion( + self, mocker, unused_tcp_port, + ): + """Branch path (e.g. 'Vehicle') is rejected by v2 Subscribe with + NOT_FOUND; client falls back to ListMetadata expansion and retries + with the resulting leaf signals (restores pre-0.5.1 semantics for + issue #53).""" + client = VSSClient('127.0.0.1', unused_tcp_port) + client.connected = True # To bypass connection check + + not_found = VSSClientError( + error={ + "code": grpc.StatusCode.NOT_FOUND.value[0], + "reason": grpc.StatusCode.NOT_FOUND.value[1], + "message": "Path not found", + }, + errors=[], + ) + + call_count = {"v2_subscribe": 0} + + async def v2_subscribe_side_effect(paths, **kwargs): + call_count["v2_subscribe"] += 1 + if call_count["v2_subscribe"] == 1: + # First call with branch path — server rejects + raise not_found + # Second call with expanded leaves — yield real data + yield [ + EntryUpdate(DataEntry( + 'Vehicle.Speed', value=Datapoint(42.0), + ), (Field.VALUE,)), + EntryUpdate(DataEntry( + 'Vehicle.ADAS.ABS.IsActive', value=Datapoint(True), + ), (Field.VALUE,)), + ] + mocker.patch.object( + client, 'v2_subscribe', side_effect=v2_subscribe_side_effect, + ) + + async def expand_side_effect(paths, **kwargs): + # Simulate ListMetadata resolving 'Vehicle' to two concrete leaves + assert list(paths) == ['Vehicle'] + return ['Vehicle.Speed', 'Vehicle.ADAS.ABS.IsActive'] + mocker.patch.object( + client, '_expand_v2_branch_paths', side_effect=expand_side_effect, + ) + + received_updates: Dict[str, Datapoint] = {} + async for updates in client.subscribe_current_values(['Vehicle']): + received_updates.update(updates) + + assert call_count["v2_subscribe"] == 2 + # First call used the original branch path; retry used expanded leaves. + first_paths = list(client.v2_subscribe.call_args_list[0][1]['paths']) + second_paths = list(client.v2_subscribe.call_args_list[1][1]['paths']) + assert first_paths == ['Vehicle'] + assert second_paths == ['Vehicle.Speed', 'Vehicle.ADAS.ABS.IsActive'] + assert received_updates == { + 'Vehicle.Speed': Datapoint(42.0), + 'Vehicle.ADAS.ABS.IsActive': Datapoint(True), + } + async def test_subscribe_target_values(self, mocker, unused_tcp_port): client = VSSClient('127.0.0.1', unused_tcp_port) client.connected = True # To bypass connection check