Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ pub(crate) mod zenoh {
qos::{CongestionControl, Priority, Reliability},
query::{
ConsolidationMode, Parameters, Querier, Query, QueryConsolidation, QueryTarget,
Queryable, Reply, ReplyError, Selector,
Queryable, Reply, ReplyError, ReplyKeyExpr, Selector,
},
sample::{Locality, Sample, SampleKind, SourceInfo},
scouting::{scout, Hello, Scout},
Expand Down
20 changes: 20 additions & 0 deletions src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,17 @@ impl QueryTarget {
const DEFAULT: Self = Self::BestMatching;
}

enum_mapper!(zenoh::query::ReplyKeyExpr: u8 {
Any,
MatchingQuery,
});

#[pymethods]
impl ReplyKeyExpr {
#[classattr]
const DEFAULT: Self = Self::MatchingQuery;
}

enum_mapper!(zenoh::query::ConsolidationMode: u8 {
Auto,
None,
Expand Down Expand Up @@ -130,6 +141,10 @@ impl Query {
Ok(self.get_ref()?.attachment().cloned().map_into())
}

fn accepts_replies(&self) -> PyResult<ReplyKeyExpr> {
Ok(self.get_ref()?.accepts_replies().into_pyres()?.into())
}

#[allow(clippy::too_many_arguments)]
#[pyo3(signature = (key_expr, payload, *, encoding = None, congestion_control = None, priority = None, express = None, attachment = None, timestamp = None))]
fn reply(
Expand Down Expand Up @@ -382,6 +397,11 @@ impl Querier {
Ok(self.get_ref()?.key_expr().clone().into())
}

#[getter]
fn accept_replies(&self) -> PyResult<ReplyKeyExpr> {
Ok(self.get_ref()?.accept_replies().into())
}

#[getter]
fn matching_status(&self, py: Python) -> PyResult<MatchingStatus> {
Ok(wait(py, self.get_ref()?.matching_status())?.into())
Expand Down
10 changes: 7 additions & 3 deletions src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::{
macros::{build, wrapper},
pubsub::{Publisher, Subscriber},
qos::{CongestionControl, Priority, Reliability},
query::{Querier, QueryConsolidation, QueryTarget, Queryable, Reply, Selector},
query::{Querier, QueryConsolidation, QueryTarget, Queryable, Reply, ReplyKeyExpr, Selector},
sample::{Locality, SourceInfo},
time::Timestamp,
utils::{duration, wait, IntoPython, MapInto},
Expand Down Expand Up @@ -150,7 +150,7 @@ impl Session {
}

#[allow(clippy::too_many_arguments)]
#[pyo3(signature = (selector, handler = None, *, target = None, consolidation = None, timeout = None, congestion_control = None, priority = None, express = None, payload = None, encoding = None, attachment = None, allowed_destination = None, source_info = None, cancellation_token = None))]
#[pyo3(signature = (selector, handler = None, *, target = None, consolidation = None, accept_replies = None, timeout = None, congestion_control = None, priority = None, express = None, payload = None, encoding = None, attachment = None, allowed_destination = None, source_info = None, cancellation_token = None))]
fn get(
&self,
py: Python,
Expand All @@ -160,6 +160,7 @@ impl Session {
#[pyo3(from_py_with = QueryConsolidation::from_py_opt)] consolidation: Option<
QueryConsolidation,
>,
accept_replies: Option<ReplyKeyExpr>,
#[pyo3(from_py_with = duration)] timeout: Option<Duration>,
congestion_control: Option<CongestionControl>,
priority: Option<Priority>,
Expand All @@ -176,6 +177,7 @@ impl Session {
self.0.get(selector),
target,
consolidation,
accept_replies,
timeout,
congestion_control,
priority,
Expand Down Expand Up @@ -257,7 +259,7 @@ impl Session {
}

#[allow(clippy::too_many_arguments)]
#[pyo3(signature = (key_expr, *, target = None, consolidation = None, timeout = None, congestion_control = None, priority = None, express = None, allowed_destination = None))]
#[pyo3(signature = (key_expr, *, target = None, consolidation = None, accept_replies = None, timeout = None, congestion_control = None, priority = None, express = None, allowed_destination = None))]
fn declare_querier(
&self,
py: Python,
Expand All @@ -266,6 +268,7 @@ impl Session {
#[pyo3(from_py_with = QueryConsolidation::from_py_opt)] consolidation: Option<
QueryConsolidation,
>,
accept_replies: Option<ReplyKeyExpr>,
#[pyo3(from_py_with = duration)] timeout: Option<Duration>,
congestion_control: Option<CongestionControl>,
priority: Option<Priority>,
Expand All @@ -276,6 +279,7 @@ impl Session {
self.0.declare_querier(key_expr),
target,
consolidation,
accept_replies,
timeout,
congestion_control,
priority,
Expand Down
41 changes: 39 additions & 2 deletions zenoh/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -805,7 +805,9 @@ class Query:
By default, queries only accept replies whose key expression intersects with the query's.
I.e. it's not possible to send reply with key expression ``foo/bar`` to a query with
key expression ``baz/*``.
The query may contain special unstable parameter ``_anyke`` which enables disjoint replies.
To allow disjoint replies, use the ``accept_replies`` parameter with :attr:`ReplyKeyExpr.ANY`
in :meth:`Session.get` or :meth:`Session.declare_querier`.
Alternatively, the query may contain special parameter ``_anyke`` which also enables disjoint replies.
See the :class:`Selector` documentation for more information about this parameter.

See :ref:`query-reply` for more information on the query/reply paradigm.
Expand Down Expand Up @@ -837,6 +839,10 @@ class Query:
def attachment(self) -> ZBytes | None:
"""The attachment of this query, if any."""

def accepts_replies(self) -> ReplyKeyExpr:
"""Returns the :class:`ReplyKeyExpr` setting of this query, indicating whether replies
must match the query's key expression or can use any key expression."""

def reply(
self,
key_expr: _IntoKeyExpr,
Expand Down Expand Up @@ -965,6 +971,10 @@ class Querier:
def key_expr(self) -> KeyExpr:
"""Returns the :class:`KeyExpr` this querier sends queries on."""

@property
def accept_replies(self) -> ReplyKeyExpr:
"""Returns the :class:`ReplyKeyExpr` setting of this querier."""

@property
def matching_status(self) -> bool:
"""Returns true if there are :class:`Queryable`\\s matching the Querier's key expression and target, false otherwise."""
Expand Down Expand Up @@ -1092,6 +1102,29 @@ QueryTarget.ALL.__doc__ = (
)
QueryTarget.ALL_COMPLETE.__doc__ = """Deliver the query to all queryables matching the query's key expression that are declared as complete."""

@final
class ReplyKeyExpr(Enum):
"""Controls whether replies to a query must match the query's key expression.

:attr:`ReplyKeyExpr.MATCHING_QUERY` (default) means that replies must have a key expression
matching the query's key expression.
:attr:`ReplyKeyExpr.ANY` allows replies with any key expression, even if it doesn't match the query.

It is set by the ``accept_replies`` parameter of :meth:`Session.get` or :meth:`Session.declare_querier` methods.
"""

ANY = auto()
MATCHING_QUERY = auto()

DEFAULT = MATCHING_QUERY

ReplyKeyExpr.ANY.__doc__ = (
"""Accept replies whose key expressions may not match the query key expression."""
)
ReplyKeyExpr.MATCHING_QUERY.__doc__ = (
"""Accept replies whose key expressions match the query key expression."""
)

@final
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whats the point of this code in a stub?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It follows the pattern how documentation strigns are defined for other enum fields. As far as I remember, this is a way to document each enum variant individually

@_unstable
class Reliability(Enum):
Expand Down Expand Up @@ -1315,7 +1348,7 @@ class Selector:
for the exhaustive list):

- ``[unstable]`` ``_time``: used to express interest in only values dated within a certain time range, values for this parameter must be readable by the Zenoh Time DSL for the value to be considered valid.
- ``[unstable]`` ``_anyke``: used in queries to express interest in replies coming from any key expression. By default, only replies whose key expression match query's key expression are accepted. ``_anyke`` disables the query-reply key expression matching check.
- ``_anyke``: used in queries to express interest in replies coming from any key expression. By default, only replies whose key expression match query's key expression are accepted. ``_anyke`` disables the query-reply key expression matching check. See also :attr:`ReplyKeyExpr.ANY` as the preferred API for this functionality.

See also: :ref:`key-expressions`, :ref:`query-parameters`
"""
Expand Down Expand Up @@ -1438,6 +1471,7 @@ class Session:
*,
target: QueryTarget | None = None,
consolidation: _IntoQueryConsolidation | None = None,
accept_replies: ReplyKeyExpr | None = None,
timeout: float | int | None = None,
congestion_control: CongestionControl | None = None,
priority: Priority | None = None,
Expand All @@ -1462,6 +1496,7 @@ class Session:
*,
target: QueryTarget | None = None,
consolidation: _IntoQueryConsolidation | None = None,
accept_replies: ReplyKeyExpr | None = None,
timeout: float | int | None = None,
congestion_control: CongestionControl | None = None,
priority: Priority | None = None,
Expand All @@ -1486,6 +1521,7 @@ class Session:
*,
target: QueryTarget | None = None,
consolidation: _IntoQueryConsolidation | None = None,
accept_replies: ReplyKeyExpr | None = None,
timeout: float | int | None = None,
congestion_control: CongestionControl | None = None,
priority: Priority | None = None,
Expand Down Expand Up @@ -1584,6 +1620,7 @@ class Session:
*,
target: QueryTarget | None = None,
consolidation: _IntoQueryConsolidation | None = None,
accept_replies: ReplyKeyExpr | None = None,
timeout: float | int | None = None,
congestion_control: CongestionControl | None = None,
priority: Priority | None = None,
Expand Down
Loading