Skip to content

Conversation

@nicoschmdt
Copy link
Contributor

@nicoschmdt nicoschmdt commented Jan 21, 2026

A complement of #3657.

Code used for testing the endpoints can be found here.

Summary by Sourcery

Extend Zenoh integration to support additional HTTP methods and richer response handling across FastAPI routes.

New Features:

  • Route Zenoh POST, PUT, and DELETE requests to FastAPI versioned endpoints alongside existing GET handling.
  • Support publishing handler responses, including streaming responses, back over Zenoh via a response topic namespace.
  • Parse and type-cast Zenoh payload/attachment parameters into typed FastAPI handler arguments, including support for complex types.

Enhancements:

  • Introduce a route tree structure to match Zenoh samples to registered endpoints and HTTP methods using wildcard path segments.
  • Generalize JSON serialization through a helper that robustly handles non-serializable data and errors.
  • Refine Zenoh path cleaning and parameter matching to allow typed path parameters and more flexible routing.

@sourcery-ai
Copy link

sourcery-ai bot commented Jan 21, 2026

Reviewer's Guide

Adds Zenoh support for POST, PUT, and DELETE HTTP methods by generalizing route discovery, introducing a routing tree to map Zenoh samples to FastAPI endpoints by method and path, handling parameter parsing and type conversion, and supporting both standard and streaming responses via Zenoh publishers/subscribers.

Sequence diagram for Zenoh POST/PUT/DELETE handling via subscribers

sequenceDiagram
  actor ZenohClient
  participant ZenohSession
  participant ZenohRouter
  participant TreeNode
  participant EndpointFunc
  participant Publisher

  ZenohClient->>ZenohSession: send PUT or DELETE sample
  ZenohSession->>ZenohRouter: invoke subscriber wrapper(sample)

  ZenohRouter->>ZenohRouter: _should_process(sample, endpoint, func)
  ZenohRouter->>TreeNode: tree.get_match(sample.key_expr)
  TreeNode-->>ZenohRouter: matched_path, node
  ZenohRouter->>TreeNode: node.get_corresponding_method(sample.kind)
  TreeNode-->>ZenohRouter: func
  ZenohRouter-->>ZenohRouter: decide to process or return

  ZenohRouter->>ZenohSession: submit_to_executor(_handle_async)
  ZenohSession-->>ZenohRouter: run _handle_async()

  ZenohRouter->>ZenohRouter: get_parameters(sample, func)
  ZenohRouter->>EndpointFunc: await func(**parameters)
  EndpointFunc-->>ZenohRouter: result

  ZenohRouter->>ZenohRouter: process_subscriber_response(result, sample)
  ZenohRouter->>ZenohSession: session.declare_publisher(response/key_expr)
  ZenohSession-->>Publisher: create publisher

  alt result is StreamingResponse
    ZenohRouter->>ZenohRouter: _handle_streaming_response(result, publisher)
    ZenohRouter->>Publisher: publisher.put(chunk_data) loop for chunks
  else non streaming result
    ZenohRouter->>Publisher: publisher.put(handle_json(result))
  end

  Publisher-->>ZenohClient: publish response samples
Loading

Updated class diagram for ZenohRouter and TreeNode

classDiagram

class ZenohSession {
  +ZenohSession(service_name: str)
  +zenoh_config(service_name: str) void
  +submit_to_executor(task: Callable) void
  +session: zenoh.Session
}

class TreeNode {
  +segment: str
  +children: dict
  +is_valid: bool
  +methods: dict
  +TreeNode(segment: str)
  +add_child(child: TreeNode) TreeNode
  +get_methods() dict
  +add_node(segments: list, method: str, func: Callable) void
  +process_path(path: str, method: str, func: Callable) void
  +get_match(path: str) tuple
  +get_corresponding_method(sampleKind: zenoh.SampleKind) Callable
}

class ZenohRouter {
  -prefix: str
  -zenoh_session: ZenohSession
  -tree: TreeNode
  +ZenohRouter(service_name: str)
  +add_queryable(path: str, func: Callable) void
  +add_subscriber(path: str, func: Callable) void
  +process_subscriber_response(result: Any, sample: zenoh.Sample) void
  +_handle_streaming_response(result: StreamingResponse, publisher: zenoh.Publisher) void
  +add_routes_to_zenoh(app: fastapi.FastAPI) void
  +_get_methods(app: fastapi.FastAPI) list
  +get_route_path(path: str) str
  +_should_process(sample: zenoh.Sample, endpoint: str, func: Callable) bool
}

ZenohRouter --> ZenohSession : uses
ZenohRouter --> TreeNode : uses
TreeNode "1" --> "*" TreeNode : children
Loading

Flow diagram for Zenoh parameter parsing and type validation

flowchart TD
  A["Start get_parameters"] --> B["Check sample.kind"]
  B -->|PUT| C["source = sample.payload"]
  B -->|else| D["source = sample.attachment"]
  C --> E["source is None?"]
  D --> E
  E -->|Yes| F["Return empty dict"]
  E -->|No| G["parameters_process(source.to_string())"]
  G --> H["parameters_type_validation(parameters, func)"]
  H --> I["Return typed_parameters"]

  subgraph parameters_process
    G1["Split string by ';'"] --> G2["For each parameter"]
    G2 --> G3["Trim and skip empty"]
    G3 --> G4["Contains '='?"]
    G4 -->|No| G5["Log warning and continue"]
    G4 -->|Yes| G6["Split into key and value"]
    G6 --> G7["Trim and store key -> value"]
  end

  subgraph parameters_type_validation
    H1["Inspect signature of func"] --> H2["For each key, value in parameters"]
    H2 --> H3["Key in signature?"]
    H3 -->|No| H4["Skip parameter"]
    H3 -->|Yes| H5["Get annotation"]
    H5 --> H6["is_primitive(annotation)?"]
    H6 -->|Yes| H7["typed_parameters[key] = annotation(value)"]
    H6 -->|No| H8["parsed = ast.literal_eval(value)"]
    H8 --> H9["typed_parameters[key] = annotation(**parsed)"]
    H7 --> H10["On error: log warning, skip"]
    H9 --> H10
  end
Loading

File-Level Changes

Change Details Files
Generalize Zenoh routing to support multiple HTTP methods and non-GET handling, including streaming responses.
  • Extend PARAM_REGEX to support colon in path parameters and introduce RESPONSE_PREFIX for response topics.
  • Refactor ZenohRouter.add_routes_to_zenoh to collect VersionedAPIRoute endpoints for GET, POST, PUT, DELETE and decide between queryable vs subscriber based on method and streaming return type.
  • Add add_subscriber and process_subscriber_response to handle Zenoh subscriptions for non-GET or streaming endpoints, publishing responses (including StreamingResponse bodies) to response/<key_expr>.
  • Introduce is_streaming_response utility to detect endpoints returning StreamingResponse and route them via subscribers.
core/libs/commonwealth/src/commonwealth/utils/zenoh_helper.py
Introduce a path/method routing tree to map Zenoh samples (key_expr + SampleKind) to the correct FastAPI endpoint.
  • Add TreeNode class with support for building a tree of path segments, associating HTTP methods to endpoint callables, and wildcard '*' matching.
  • Instantiate a TreeNode in ZenohRouter, populate it from FastAPI routes via process_path, and use it in _should_process to match samples to endpoints and HTTP methods.
  • Map zenoh.SampleKind to HTTP methods (DELETE vs other methods) in TreeNode.get_corresponding_method to choose the right handler.
core/libs/commonwealth/src/commonwealth/utils/tree.py
core/libs/commonwealth/src/commonwealth/utils/zenoh_helper.py
Improve async handling and parameter/JSON processing for Zenoh handlers.
  • Introduce _async_to_sync decorator and use it in queryable and subscriber wrappers so async FastAPI handlers can be run on the ThreadPoolExecutor cleanly.
  • Refactor add_queryable to use handle_json for response serialization and submit the async handler directly to the executor.
  • Add get_parameters to derive parameters from Zenoh Sample payload/attachment based on SampleKind and parameter string format.
  • Implement parameters_process and parameters_type_validation to parse semicolon-separated key=value pairs, coerce them to the endpoint’s annotated parameter types (primitive or Pydantic-like), and log malformed parameters or conversion errors.
  • Add is_primitive and handle_json helpers to support type checking and robust JSON serialization with error logging and fallback.
core/libs/commonwealth/src/commonwealth/utils/zenoh_helper.py

Possibly linked issues

  • #core: PR implements zenoh-based handling (GET, POST, PUT, DELETE) and routing, directly advancing the Python services-to-zenoh migration.

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

Copy link

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey - I've found 5 issues, and left some high level feedback:

  • In TreeNode.get_corresponding_method, all non-DELETE requests are mapped to the first non-DELETE method stored, so different handlers for POST vs PUT on the same path can never be selected; consider keying off the actual SampleKind (e.g., PUT vs PUT, PUT vs POST) instead of collapsing them.
  • The _get_methods helper uses next(iter(route.methods), None) and so only ever registers a single HTTP method per FastAPI route; if a route is declared with multiple methods (e.g., GET/POST), the others are silently ignored and won't receive zenoh wiring—consider iterating all route.methods.
  • In parameters_type_validation, using ast.literal_eval plus annotation(**parsed) for all non-primitive annotations is fragile and may be unsafe for unexpected types; you might want to explicitly support only known container/model types (e.g., Pydantic models) and fall back or reject in other cases instead of attempting a generic eval-and-call.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- In `TreeNode.get_corresponding_method`, all non-DELETE requests are mapped to the first non-DELETE method stored, so different handlers for POST vs PUT on the same path can never be selected; consider keying off the actual `SampleKind` (e.g., PUT vs PUT, PUT vs POST) instead of collapsing them.
- The `_get_methods` helper uses `next(iter(route.methods), None)` and so only ever registers a single HTTP method per FastAPI route; if a route is declared with multiple methods (e.g., GET/POST), the others are silently ignored and won't receive zenoh wiring—consider iterating all route.methods.
- In `parameters_type_validation`, using `ast.literal_eval` plus `annotation(**parsed)` for all non-primitive annotations is fragile and may be unsafe for unexpected types; you might want to explicitly support only known container/model types (e.g., Pydantic models) and fall back or reject in other cases instead of attempting a generic eval-and-call.

## Individual Comments

### Comment 1
<location> `core/libs/commonwealth/src/commonwealth/utils/tree.py:62-67` </location>
<code_context>
+            return matched_path, node
+        return None
+
+    def get_corresponding_method(self, sampleKind: zenoh.SampleKind) -> Callable[..., Any] | None:
+        if sampleKind == zenoh.SampleKind.DELETE:
+            keyword = "DELETE"
+        else:
+            methods = [method for method, _ in self.methods.items() if method != "DELETE"]
+            keyword = methods[0]
+
+        try:
</code_context>

<issue_to_address>
**issue (bug_risk):** Guard against empty or inconsistent method mappings when resolving the handler, to avoid IndexError and surprising routing.

In `get_corresponding_method`, when `sampleKind != zenoh.SampleKind.DELETE` you build `methods = [...]` and then access `methods[0]`. If `self.methods` is empty or only contains `
</issue_to_address>

### Comment 2
<location> `core/libs/commonwealth/src/commonwealth/utils/tree.py:63-67` </location>
<code_context>
+        return None
+
+    def get_corresponding_method(self, sampleKind: zenoh.SampleKind) -> Callable[..., Any] | None:
+        if sampleKind == zenoh.SampleKind.DELETE:
+            keyword = "DELETE"
+        else:
+            methods = [method for method, _ in self.methods.items() if method != "DELETE"]
+            keyword = methods[0]
+
+        try:
</code_context>

<issue_to_address>
**suggestion (bug_risk):** Clarify and tighten the mapping between SampleKind and HTTP methods to avoid arbitrary handler selection.

Here you special-case `DELETE` but otherwise just pick the first non-`DELETE` method from `self.methods`. If a node supports both `GET` and `POST`, the selected handler depends on dict insertion order, and different `SampleKind` values (e.g. `PUT` vs `POST`) end up mapped to the same handler. Consider defining an explicit `SampleKind`→HTTP method mapping, or at least a deterministic preference order (e.g. prefer `PUT`, then `POST`, etc.) to avoid dispatching to the wrong endpoint.

Suggested implementation:

```python
        if node.is_valid:
            return matched_path, node
        return None

    def get_corresponding_method(self, sampleKind: zenoh.SampleKind) -> Callable[..., Any] | None:
        """
        Map a zenoh.SampleKind to an HTTP method in a deterministic way.

        1. Try an explicit SampleKind -> HTTP verb mapping.
        2. If the mapped verb is not supported by this node, fall back to a
           deterministic preference order among supported non-DELETE methods.
        3. As a final fallback, choose the first non-DELETE method in sorted
           order to keep behavior deterministic.
        """
        # Explicit mapping from SampleKind to preferred HTTP method
        samplekind_to_method: dict[zenoh.SampleKind, str] = {
            zenoh.SampleKind.DELETE: "DELETE",
            getattr(zenoh.SampleKind, "PUT", None): "PUT",
            getattr(zenoh.SampleKind, "PATCH", None): "PATCH",
        }

        # Remove any None keys in case PUT/PATCH do not exist in this zenoh version
        samplekind_to_method = {
            k: v for k, v in samplekind_to_method.items() if k is not None
        }

        keyword: str | None = samplekind_to_method.get(sampleKind)

        # If there is no direct mapping or the mapped method is not supported,
        # fall back to a deterministic preference order (excluding DELETE).
        if keyword is None or keyword not in self.methods:
            preference_order: tuple[str, ...] = ("PUT", "POST", "PATCH", "GET")
            keyword = None
            for method in preference_order:
                if method in self.methods and method != "DELETE":
                    keyword = method
                    break

            # As a last resort, pick the first non-DELETE method in sorted order
            if keyword is None:
                non_delete_methods = sorted(
                    m for m in self.methods.keys() if m != "DELETE"
                )
                if non_delete_methods:
                    keyword = non_delete_methods[0]

        if keyword is None:
            return None

        return self.methods.get(keyword)

```

If `Callable` and `Any` are not already imported in this file, add:

```python
from typing import Any, Callable
```

near the other imports. Also ensure that `zenoh.SampleKind.PUT` and `zenoh.SampleKind.PATCH` exist in the version of `zenoh` you are using; the `getattr` usage in the mapping is there to avoid attribute errors on older versions.
</issue_to_address>

### Comment 3
<location> `core/libs/commonwealth/src/commonwealth/utils/zenoh_helper.py:259-268` </location>
<code_context>
+    return result
+
+
+def parameters_type_validation(parameters: dict[str, Any], func: Callable[..., Any]) -> dict[str, Any]:
+    signature = inspect.signature(func)
+    typed_parameters = {}
+
+    for key, value in parameters.items():
+        if key not in signature.parameters:
+            continue
+
+        annotation = signature.parameters[key].annotation
+        try:
+            if is_primitive(annotation):
+                typed_parameters[key] = annotation(value)
+            else:
+                parsed = ast.literal_eval(value)
+                typed_parameters[key] = annotation(**parsed)
+        except Exception as e:
+            logger.warning(f"Error converting parameter {key} to type {annotation}: {e}")
</code_context>

<issue_to_address>
**suggestion (bug_risk):** Parameter type conversion logic is fragile for non-primitive annotations and may skip valid parameters.

In `parameters_type_validation`, any annotation outside `(int, float, str, bool)` is treated as a complex type, parsed with `ast.literal_eval`, and then instantiated via `annotation(**parsed)`. This will fail for common cases such as:
- `list`/`dict`/`set` (or `list[str]`, etc.), which aren’t primitives but also don’t support `annotation(**parsed)`, so you end up with warnings and dropped params.
- `Optional[int]` / `Union[int, None]` and other `typing` constructs, which also fall into this branch and fail.

Consider special-casing container types (returning `parsed` when the structure already matches) and handling `typing` constructs more defensively, or falling back to the raw `value` when conversion is unclear, to avoid noisy warnings and unnecessary parameter drops.

Suggested implementation:

```python
def parameters_type_validation(parameters: dict[str, Any], func: Callable[..., Any]) -> dict[str, Any]:
    signature = inspect.signature(func)
    typed_parameters: dict[str, Any] = {}

    # Local import to avoid introducing a hard dependency if typing.get_origin is not needed elsewhere
    try:
        from typing import get_origin, get_args  # type: ignore
    except ImportError:  # Python < 3.8 fallback – we simply won't special‑case typing constructs
        get_origin = lambda x: None  # type: ignore
        get_args = lambda x: ()      # type: ignore

    def _convert_value(raw_value: Any, annotation: Any) -> Any:
        """
        Best‑effort conversion of string/primitive values to the annotated type.

        - Primitives: cast directly (int, float, str, bool).
        - Built‑in containers: return the parsed structure (list/dict/set/tuple).
        - typing.Optional/Union: try the non‑None argument if it's primitive, otherwise
          leave as raw.
        - Other annotations: try to interpret as a dataclass/typed‑object taking **kwargs
          from a parsed dict; if that fails, fall back to the raw value.
        """
        if annotation is inspect._empty:
            return raw_value

        # Handle primitives
        if is_primitive(annotation):
            return annotation(raw_value)

        origin = get_origin(annotation)
        args = get_args(annotation)

        # Handle built‑in containers and collections from typing (list[str], dict[str, int], etc.)
        container_types = (list, dict, set, tuple)
        container_origin = origin or annotation

        if container_origin in container_types:
            parsed = raw_value
            if isinstance(raw_value, str):
                try:
                    parsed = ast.literal_eval(raw_value)
                except Exception:
                    # If we cannot parse, just return raw_value instead of dropping it
                    return raw_value

            # If the parsed value already matches the expected container type, just use it
            if isinstance(parsed, container_origin):
                return parsed

            # Last resort: try to coerce into the container without enforcing element types
            try:
                return container_origin(parsed)
            except Exception:
                return raw_value

        # Handle Optional[T] / Union[T, None] in a conservative way
        if origin is not None and origin is getattr(__import__("typing"), "Union", None):
            non_none_args = [a for a in args if a is not type(None)]  # noqa: E721
            if len(non_none_args) == 1 and is_primitive(non_none_args[0]):
                try:
                    return non_none_args[0](raw_value)
                except Exception:
                    return raw_value
            # If we cannot clearly determine the conversion, keep the raw value
            return raw_value

        # Fallback for custom / complex types: try to instantiate with **parsed
        if isinstance(raw_value, str):
            try:
                parsed = ast.literal_eval(raw_value)
            except Exception:
                return raw_value
        else:
            parsed = raw_value

        if isinstance(parsed, dict):
            try:
                return annotation(**parsed)
            except Exception:
                return raw_value

        # If nothing else worked, return the raw value
        return raw_value

    for key, value in parameters.items():
        if key not in signature.parameters:
            continue

        annotation = signature.parameters[key].annotation
        try:
            typed_parameters[key] = _convert_value(value, annotation)
        except Exception as e:
            logger.warning(f"Error converting parameter {key} to type {annotation}: {e}")
            # Preserve the original value instead of dropping the parameter
            typed_parameters[key] = value

    return typed_parameters

```

To fully support handling of typing constructs (e.g. `Optional[int]`, `list[str]`) in a more idiomatic way, you may want to:
1. Add a module‑level import near the top of `zenoh_helper.py`:
   - `from typing import get_origin, get_args`
   and then remove the local `try/except ImportError` block and directly use `get_origin`/`get_args` in `parameters_type_validation`.
2. If your project already uses `typing.get_origin`/`get_args` elsewhere with a helper, you might instead want to reuse that helper to keep behavior consistent across the codebase.
</issue_to_address>

### Comment 4
<location> `core/libs/commonwealth/src/commonwealth/utils/zenoh_helper.py:269-270` </location>
<code_context>
+
+        annotation = signature.parameters[key].annotation
+        try:
+            if is_primitive(annotation):
+                typed_parameters[key] = annotation(value)
+            else:
+                parsed = ast.literal_eval(value)
</code_context>

<issue_to_address>
**suggestion (bug_risk):** Boolean parameter parsing via direct `bool(value)` is misleading for typical string inputs.

For primitive annotations this makes `bool("false")` and `bool("0")` evaluate to `True`, which is likely surprising for string parameters. If booleans are provided as strings, special-case `bool` to parse known textual forms (e.g. "true"/"false", "1"/"0") and handle or reject anything else explicitly.

Suggested implementation:

```python
        annotation = signature.parameters[key].annotation
        try:
            if is_primitive(annotation):
                if annotation is bool:
                    typed_parameters[key] = parse_bool(value)
                else:
                    typed_parameters[key] = annotation(value)
            else:
                parsed = ast.literal_eval(value)
                typed_parameters[key] = annotation(**parsed)

```

```python
def is_primitive(value: Any) -> bool:
    return value in (int, float, str, bool)


def parse_bool(value: Any) -> bool:
    """
    Parse a value into a boolean, with explicit handling for common textual forms.

    Accepts:
    - bool: returned as-is
    - str: case-insensitive handling of "true"/"false", "1"/"0", "yes"/"no", "y"/"n", "t"/"f"
    - other types: falls back to bool(value)

    Raises:
        ValueError: if the string value does not match a known boolean representation.
    """
    if isinstance(value, bool):
        return value

    if isinstance(value, str):
        normalized = value.strip().lower()
        if normalized in {"true", "t", "yes", "y", "1"}:
            return True
        if normalized in {"false", "f", "no", "n", "0"}:
            return False
        raise ValueError(f"Invalid boolean string value: {value!r}")

    return bool(value)

```
</issue_to_address>

### Comment 5
<location> `core/libs/commonwealth/src/commonwealth/utils/zenoh_helper.py:248` </location>
<code_context>
+            continue
+
+        if "=" not in parameter:
+            logger.warning(f"Skipping malformated parameter (no '=' found): {parameter}")
+            continue
+
</code_context>

<issue_to_address>
**nitpick (typo):** Fix the typo in the warning message for malformed parameters.

```suggestion
            logger.warning(f"Skipping malformed parameter (no '=' found): {parameter}")
```
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Comment on lines +62 to +67
def get_corresponding_method(self, sampleKind: zenoh.SampleKind) -> Callable[..., Any] | None:
if sampleKind == zenoh.SampleKind.DELETE:
keyword = "DELETE"
else:
methods = [method for method, _ in self.methods.items() if method != "DELETE"]
keyword = methods[0]
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): Guard against empty or inconsistent method mappings when resolving the handler, to avoid IndexError and surprising routing.

In get_corresponding_method, when sampleKind != zenoh.SampleKind.DELETE you build methods = [...] and then access methods[0]. If self.methods is empty or only contains `

Comment on lines +63 to +67
if sampleKind == zenoh.SampleKind.DELETE:
keyword = "DELETE"
else:
methods = [method for method, _ in self.methods.items() if method != "DELETE"]
keyword = methods[0]
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (bug_risk): Clarify and tighten the mapping between SampleKind and HTTP methods to avoid arbitrary handler selection.

Here you special-case DELETE but otherwise just pick the first non-DELETE method from self.methods. If a node supports both GET and POST, the selected handler depends on dict insertion order, and different SampleKind values (e.g. PUT vs POST) end up mapped to the same handler. Consider defining an explicit SampleKind→HTTP method mapping, or at least a deterministic preference order (e.g. prefer PUT, then POST, etc.) to avoid dispatching to the wrong endpoint.

Suggested implementation:

        if node.is_valid:
            return matched_path, node
        return None

    def get_corresponding_method(self, sampleKind: zenoh.SampleKind) -> Callable[..., Any] | None:
        """
        Map a zenoh.SampleKind to an HTTP method in a deterministic way.

        1. Try an explicit SampleKind -> HTTP verb mapping.
        2. If the mapped verb is not supported by this node, fall back to a
           deterministic preference order among supported non-DELETE methods.
        3. As a final fallback, choose the first non-DELETE method in sorted
           order to keep behavior deterministic.
        """
        # Explicit mapping from SampleKind to preferred HTTP method
        samplekind_to_method: dict[zenoh.SampleKind, str] = {
            zenoh.SampleKind.DELETE: "DELETE",
            getattr(zenoh.SampleKind, "PUT", None): "PUT",
            getattr(zenoh.SampleKind, "PATCH", None): "PATCH",
        }

        # Remove any None keys in case PUT/PATCH do not exist in this zenoh version
        samplekind_to_method = {
            k: v for k, v in samplekind_to_method.items() if k is not None
        }

        keyword: str | None = samplekind_to_method.get(sampleKind)

        # If there is no direct mapping or the mapped method is not supported,
        # fall back to a deterministic preference order (excluding DELETE).
        if keyword is None or keyword not in self.methods:
            preference_order: tuple[str, ...] = ("PUT", "POST", "PATCH", "GET")
            keyword = None
            for method in preference_order:
                if method in self.methods and method != "DELETE":
                    keyword = method
                    break

            # As a last resort, pick the first non-DELETE method in sorted order
            if keyword is None:
                non_delete_methods = sorted(
                    m for m in self.methods.keys() if m != "DELETE"
                )
                if non_delete_methods:
                    keyword = non_delete_methods[0]

        if keyword is None:
            return None

        return self.methods.get(keyword)

If Callable and Any are not already imported in this file, add:

from typing import Any, Callable

near the other imports. Also ensure that zenoh.SampleKind.PUT and zenoh.SampleKind.PATCH exist in the version of zenoh you are using; the getattr usage in the mapping is there to avoid attribute errors on older versions.

Comment on lines +259 to +268
def parameters_type_validation(parameters: dict[str, Any], func: Callable[..., Any]) -> dict[str, Any]:
signature = inspect.signature(func)
typed_parameters = {}

for key, value in parameters.items():
if key not in signature.parameters:
continue

annotation = signature.parameters[key].annotation
try:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (bug_risk): Parameter type conversion logic is fragile for non-primitive annotations and may skip valid parameters.

In parameters_type_validation, any annotation outside (int, float, str, bool) is treated as a complex type, parsed with ast.literal_eval, and then instantiated via annotation(**parsed). This will fail for common cases such as:

  • list/dict/set (or list[str], etc.), which aren’t primitives but also don’t support annotation(**parsed), so you end up with warnings and dropped params.
  • Optional[int] / Union[int, None] and other typing constructs, which also fall into this branch and fail.

Consider special-casing container types (returning parsed when the structure already matches) and handling typing constructs more defensively, or falling back to the raw value when conversion is unclear, to avoid noisy warnings and unnecessary parameter drops.

Suggested implementation:

def parameters_type_validation(parameters: dict[str, Any], func: Callable[..., Any]) -> dict[str, Any]:
    signature = inspect.signature(func)
    typed_parameters: dict[str, Any] = {}

    # Local import to avoid introducing a hard dependency if typing.get_origin is not needed elsewhere
    try:
        from typing import get_origin, get_args  # type: ignore
    except ImportError:  # Python < 3.8 fallback – we simply won't special‑case typing constructs
        get_origin = lambda x: None  # type: ignore
        get_args = lambda x: ()      # type: ignore

    def _convert_value(raw_value: Any, annotation: Any) -> Any:
        """
        Best‑effort conversion of string/primitive values to the annotated type.

        - Primitives: cast directly (int, float, str, bool).
        - Built‑in containers: return the parsed structure (list/dict/set/tuple).
        - typing.Optional/Union: try the non‑None argument if it's primitive, otherwise
          leave as raw.
        - Other annotations: try to interpret as a dataclass/typed‑object taking **kwargs
          from a parsed dict; if that fails, fall back to the raw value.
        """
        if annotation is inspect._empty:
            return raw_value

        # Handle primitives
        if is_primitive(annotation):
            return annotation(raw_value)

        origin = get_origin(annotation)
        args = get_args(annotation)

        # Handle built‑in containers and collections from typing (list[str], dict[str, int], etc.)
        container_types = (list, dict, set, tuple)
        container_origin = origin or annotation

        if container_origin in container_types:
            parsed = raw_value
            if isinstance(raw_value, str):
                try:
                    parsed = ast.literal_eval(raw_value)
                except Exception:
                    # If we cannot parse, just return raw_value instead of dropping it
                    return raw_value

            # If the parsed value already matches the expected container type, just use it
            if isinstance(parsed, container_origin):
                return parsed

            # Last resort: try to coerce into the container without enforcing element types
            try:
                return container_origin(parsed)
            except Exception:
                return raw_value

        # Handle Optional[T] / Union[T, None] in a conservative way
        if origin is not None and origin is getattr(__import__("typing"), "Union", None):
            non_none_args = [a for a in args if a is not type(None)]  # noqa: E721
            if len(non_none_args) == 1 and is_primitive(non_none_args[0]):
                try:
                    return non_none_args[0](raw_value)
                except Exception:
                    return raw_value
            # If we cannot clearly determine the conversion, keep the raw value
            return raw_value

        # Fallback for custom / complex types: try to instantiate with **parsed
        if isinstance(raw_value, str):
            try:
                parsed = ast.literal_eval(raw_value)
            except Exception:
                return raw_value
        else:
            parsed = raw_value

        if isinstance(parsed, dict):
            try:
                return annotation(**parsed)
            except Exception:
                return raw_value

        # If nothing else worked, return the raw value
        return raw_value

    for key, value in parameters.items():
        if key not in signature.parameters:
            continue

        annotation = signature.parameters[key].annotation
        try:
            typed_parameters[key] = _convert_value(value, annotation)
        except Exception as e:
            logger.warning(f"Error converting parameter {key} to type {annotation}: {e}")
            # Preserve the original value instead of dropping the parameter
            typed_parameters[key] = value

    return typed_parameters

To fully support handling of typing constructs (e.g. Optional[int], list[str]) in a more idiomatic way, you may want to:

  1. Add a module‑level import near the top of zenoh_helper.py:
    • from typing import get_origin, get_args
      and then remove the local try/except ImportError block and directly use get_origin/get_args in parameters_type_validation.
  2. If your project already uses typing.get_origin/get_args elsewhere with a helper, you might instead want to reuse that helper to keep behavior consistent across the codebase.

Comment on lines +269 to +270
if is_primitive(annotation):
typed_parameters[key] = annotation(value)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (bug_risk): Boolean parameter parsing via direct bool(value) is misleading for typical string inputs.

For primitive annotations this makes bool("false") and bool("0") evaluate to True, which is likely surprising for string parameters. If booleans are provided as strings, special-case bool to parse known textual forms (e.g. "true"/"false", "1"/"0") and handle or reject anything else explicitly.

Suggested implementation:

        annotation = signature.parameters[key].annotation
        try:
            if is_primitive(annotation):
                if annotation is bool:
                    typed_parameters[key] = parse_bool(value)
                else:
                    typed_parameters[key] = annotation(value)
            else:
                parsed = ast.literal_eval(value)
                typed_parameters[key] = annotation(**parsed)
def is_primitive(value: Any) -> bool:
    return value in (int, float, str, bool)


def parse_bool(value: Any) -> bool:
    """
    Parse a value into a boolean, with explicit handling for common textual forms.

    Accepts:
    - bool: returned as-is
    - str: case-insensitive handling of "true"/"false", "1"/"0", "yes"/"no", "y"/"n", "t"/"f"
    - other types: falls back to bool(value)

    Raises:
        ValueError: if the string value does not match a known boolean representation.
    """
    if isinstance(value, bool):
        return value

    if isinstance(value, str):
        normalized = value.strip().lower()
        if normalized in {"true", "t", "yes", "y", "1"}:
            return True
        if normalized in {"false", "f", "no", "n", "0"}:
            return False
        raise ValueError(f"Invalid boolean string value: {value!r}")

    return bool(value)

continue

if "=" not in parameter:
logger.warning(f"Skipping malformated parameter (no '=' found): {parameter}")
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick (typo): Fix the typo in the warning message for malformed parameters.

Suggested change
logger.warning(f"Skipping malformated parameter (no '=' found): {parameter}")
logger.warning(f"Skipping malformed parameter (no '=' found): {parameter}")

Copy link
Member

@patrickelectric patrickelectric left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add the integration in a service service just to see how does it integrates ?

Comment on lines +14 to +17
if child.segment in self.children:
return self.children[child.segment]
self.children[child.segment] = child
return child
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if child.segment in self.children:
return self.children[child.segment]
self.children[child.segment] = child
return child
if child.segment not in self.children:
self.children[child.segment] = child
return self.children[child.segment]

self.children[child.segment] = child
return child

def get_methods(self) -> Dict[str, Callable[..., Any]]:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would recommend to use _ prefix for private properties and avoid using get prefix in functions.

https://github.com/bluerobotics/software-guidelines/blob/master/guidelines/style.md



class TreeNode:
def __init__(self, segment: str):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add a description of how this class operates ? It's pretty complex just looking at the code.

@nicoschmdt nicoschmdt marked this pull request as draft January 28, 2026 15:46
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants