diff --git a/examples/z_info.py b/examples/z_info.py index 4b3edf2c..5abbffd9 100644 --- a/examples/z_info.py +++ b/examples/z_info.py @@ -11,6 +11,8 @@ # Contributors: # ZettaScale Zenoh Team, # +import time + import zenoh @@ -24,6 +26,30 @@ def main(conf: zenoh.Config): print(f"zid: {info.zid()}") print(f"routers: {info.routers_zid()}") print(f"peers: {info.peers_zid()}") + print("transports:") + for t in info.transports(): + print(f" - {t}") + print("links:") + for l in info.links(): + print(f" - {l}") + + # listen for transport and link events using try_recv polling + transport_listener = info.declare_transport_events_listener(history=False) + link_listener = info.declare_link_events_listener(history=False) + + print("Listening for transport and link events... (press Ctrl+C to exit)") + try: + while True: + while (event := transport_listener.try_recv()) is not None: + print(f"Transport event: {event}") + while (event := link_listener.try_recv()) is not None: + print(f"Link event: {event}") + time.sleep(0.1) + except KeyboardInterrupt: + pass + + transport_listener.undeclare() + link_listener.undeclare() # --- Command line argument parsing --- --- --- --- --- --- diff --git a/src/lib.rs b/src/lib.rs index ff9abe6a..09235cce 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -72,7 +72,10 @@ pub(crate) mod zenoh { }, sample::{Locality, Sample, SampleKind, SourceInfo}, scouting::{scout, Hello, Scout}, - session::{open, EntityGlobalId, Session, SessionInfo}, + session::{ + open, EntityGlobalId, Link, LinkEvent, LinkEventsListener, Session, SessionInfo, + Transport, TransportEvent, TransportEventsListener, + }, time::{Timestamp, TimestampId, NTP64}, ZError, }; diff --git a/src/session.rs b/src/session.rs index a4564afe..92ae591e 100644 --- a/src/session.rs +++ b/src/session.rs @@ -15,22 +15,23 @@ use std::time::Duration; use pyo3::{ prelude::*, - types::{PyDict, PyList, PyTuple}, + types::{PyDict, PyIterator, PyList, PyTuple}, + IntoPyObjectExt, }; use zenoh::{session::EntityId, Wait}; use crate::{ bytes::{Encoding, ZBytes}, cancellation::CancellationToken, - config::{Config, ZenohId}, + config::{Config, WhatAmI, ZenohId}, handlers::{into_handler, HandlerImpl}, key_expr::KeyExpr, liveliness::Liveliness, - macros::{build, wrapper}, + macros::{build, option_wrapper, wrapper}, pubsub::{Publisher, Subscriber}, qos::{CongestionControl, Priority, Reliability}, query::{Querier, QueryConsolidation, QueryTarget, Queryable, Reply, ReplyKeyExpr, Selector}, - sample::{Locality, SourceInfo}, + sample::{Locality, SampleKind, SourceInfo}, time::Timestamp, utils::{duration, wait, IntoPython, MapInto}, }; @@ -311,6 +312,150 @@ pub(crate) fn open(py: Python, config: Config) -> PyResult { wrapper!(zenoh::session::SessionInfo); +wrapper!(zenoh::session::Transport); + +#[pymethods] +impl Transport { + #[getter] + fn zid(&self) -> ZenohId { + (*self.0.zid()).into() + } + + #[getter] + fn whatami(&self) -> WhatAmI { + self.0.whatami().into() + } + + #[getter] + fn is_qos(&self) -> bool { + self.0.is_qos() + } + + #[cfg(feature = "shared-memory")] + #[getter] + fn is_shm(&self) -> bool { + self.0.is_shm() + } + + #[getter] + fn is_multicast(&self) -> bool { + self.0.is_multicast() + } + + fn __eq__(&self, other: &Transport) -> bool { + self.0 == other.0 + } + + fn __repr__(&self) -> String { + format!("{:?}", self.0) + } +} + +wrapper!(zenoh::session::Link); + +#[pymethods] +impl Link { + #[getter] + fn zid(&self) -> ZenohId { + (*self.0.zid()).into() + } + + #[getter] + fn src(&self) -> String { + self.0.src().to_string() + } + + #[getter] + fn dst(&self) -> String { + self.0.dst().to_string() + } + + #[getter] + fn group(&self) -> Option { + self.0.group().map(|g| g.to_string()) + } + + #[getter] + fn mtu(&self) -> u16 { + self.0.mtu() + } + + #[getter] + fn is_streamed(&self) -> bool { + self.0.is_streamed() + } + + #[getter] + fn interfaces<'py>(&self, py: Python<'py>) -> PyResult> { + let list = PyList::empty(py); + for interface in self.0.interfaces() { + list.append(interface)?; + } + Ok(list) + } + + #[getter] + fn auth_identifier(&self) -> Option { + self.0.auth_identifier().map(|s| s.to_string()) + } + + #[getter] + fn priorities(&self) -> Option<(u8, u8)> { + self.0.priorities() + } + + #[getter] + fn reliability(&self) -> Option { + self.0.reliability().map(Into::into) + } + + fn __eq__(&self, other: &Link) -> bool { + self.0 == other.0 + } + + fn __repr__(&self) -> String { + format!("{:?}", self.0) + } +} + +wrapper!(zenoh::session::TransportEvent); + +#[pymethods] +impl TransportEvent { + #[getter] + fn kind(&self) -> SampleKind { + self.0.kind().into() + } + + #[getter] + fn transport(&self) -> Transport { + self.0.transport().clone().into() + } + + fn __repr__(&self) -> String { + format!("{:?}", self.0) + } +} + +wrapper!(zenoh::session::LinkEvent); + +#[pymethods] +impl LinkEvent { + #[getter] + fn kind(&self) -> SampleKind { + self.0.kind().into() + } + + #[getter] + fn link(&self) -> Link { + self.0.link().clone().into() + } + + fn __repr__(&self) -> String { + format!("{:?}", self.0) + } +} + #[pymethods] impl SessionInfo { fn zid(&self, py: Python) -> ZenohId { @@ -333,9 +478,145 @@ impl SessionInfo { Ok(list) } + fn transports<'py>(&self, py: Python<'py>) -> PyResult> { + let list = PyList::empty(py); + for transport in py.allow_threads(|| self.0.transports().wait()) { + list.append(transport.into_pyobject(py))?; + } + Ok(list) + } + + fn links<'py>(&self, py: Python<'py>) -> PyResult> { + let list = PyList::empty(py); + for link in py.allow_threads(|| self.0.links().wait()) { + list.append(link.into_pyobject(py))?; + } + Ok(list) + } + + #[pyo3(signature = (handler = None, *, history = None))] + fn declare_transport_events_listener( + &self, + py: Python, + handler: Option<&Bound>, + history: Option, + ) -> PyResult { + let (handler, background) = into_handler(py, handler, None)?; + let builder = build!(self.0.transport_events_listener(), history); + let mut listener = wait(py, builder.with(handler))?; + if background { + listener.set_background(true); + } + Ok(listener.into()) + } + + #[pyo3(signature = (handler = None, *, history = None))] + fn declare_link_events_listener( + &self, + py: Python, + handler: Option<&Bound>, + history: Option, + ) -> PyResult { + let (handler, background) = into_handler(py, handler, None)?; + let builder = build!(self.0.link_events_listener(), history); + let mut listener = wait(py, builder.with(handler))?; + if background { + listener.set_background(true); + } + Ok(listener.into()) + } + // TODO __repr__ } +option_wrapper!( + zenoh::session::TransportEventsListener>, + "Undeclared transport events listener" +); + +#[pymethods] +impl TransportEventsListener { + fn __enter__<'a, 'py>(this: &'a Bound<'py, Self>) -> &'a Bound<'py, Self> { + this + } + + #[pyo3(signature = (*_args, **_kwargs))] + fn __exit__( + &mut self, + py: Python, + _args: &Bound, + _kwargs: Option<&Bound>, + ) -> PyResult { + self.undeclare(py)?; + Ok(py.None()) + } + + #[getter] + fn handler(&self, py: Python) -> PyResult { + self.get_ref()?.handler().into_py_any(py) + } + + fn try_recv(&self, py: Python) -> PyResult { + self.get_ref()?.handler().try_recv(py) + } + + fn recv(&self, py: Python) -> PyResult { + self.get_ref()?.handler().recv(py) + } + + fn undeclare(&mut self, py: Python) -> PyResult<()> { + wait(py, self.take()?.undeclare()) + } + + fn __iter__<'py>(&self, py: Python<'py>) -> PyResult> { + self.handler(py)?.bind(py).try_iter() + } +} + +option_wrapper!( + zenoh::session::LinkEventsListener>, + "Undeclared link events listener" +); + +#[pymethods] +impl LinkEventsListener { + fn __enter__<'a, 'py>(this: &'a Bound<'py, Self>) -> &'a Bound<'py, Self> { + this + } + + #[pyo3(signature = (*_args, **_kwargs))] + fn __exit__( + &mut self, + py: Python, + _args: &Bound, + _kwargs: Option<&Bound>, + ) -> PyResult { + self.undeclare(py)?; + Ok(py.None()) + } + + #[getter] + fn handler(&self, py: Python) -> PyResult { + self.get_ref()?.handler().into_py_any(py) + } + + fn try_recv(&self, py: Python) -> PyResult { + self.get_ref()?.handler().try_recv(py) + } + + fn recv(&self, py: Python) -> PyResult { + self.get_ref()?.handler().recv(py) + } + + fn undeclare(&mut self, py: Python) -> PyResult<()> { + wait(py, self.take()?.undeclare()) + } + + fn __iter__<'py>(&self, py: Python<'py>) -> PyResult> { + self.handler(py)?.bind(py).try_iter() + } +} + wrapper!(zenoh::session::EntityGlobalId: Clone); #[pymethods] diff --git a/zenoh/__init__.pyi b/zenoh/__init__.pyi index a9a0c8a0..5fe200e5 100644 --- a/zenoh/__init__.pyi +++ b/zenoh/__init__.pyi @@ -1649,6 +1649,236 @@ class SessionInfo: def peers_zid(self) -> list[ZenohId]: """Return the :class:`ZenohId` of the zenoh peers this process is currently connected to.""" + def transports(self) -> list[Transport]: + """Return the list of :class:`Transport` instances for currently open transports.""" + + def links(self) -> list[Link]: + """Return the list of :class:`Link` instances for currently open links.""" + + @overload + def declare_transport_events_listener( + self, + handler: _RustHandler[TransportEvent] | None = None, + *, + history: bool | None = None, + ) -> TransportEventsListener[Handler[TransportEvent]]: + """Declare a listener for transport events (connections opening/closing). + + :param handler: The handler for receiving transport events (see :ref:`channels-and-callbacks`). + :param history: If True, existing transports will be reported upon declaration. + :returns: A :class:`TransportEventsListener` that yields :class:`TransportEvent` instances. + """ + + @overload + def declare_transport_events_listener( + self, + handler: _PythonHandler[TransportEvent, _H], + *, + history: bool | None = None, + ) -> TransportEventsListener[_H]: ... + @overload + def declare_transport_events_listener( + self, handler: _PythonCallback[TransportEvent], *, history: bool | None = None + ) -> TransportEventsListener[None]: ... + @overload + def declare_link_events_listener( + self, + handler: _RustHandler[LinkEvent] | None = None, + *, + history: bool | None = None, + ) -> LinkEventsListener[Handler[LinkEvent]]: + """Declare a listener for link events (links being added/removed). + + :param handler: The handler for receiving link events (see :ref:`channels-and-callbacks`). + :param history: If True, existing links will be reported upon declaration. + :returns: A :class:`LinkEventsListener` that yields :class:`LinkEvent` instances. + """ + + @overload + def declare_link_events_listener( + self, handler: _PythonHandler[LinkEvent, _H], *, history: bool | None = None + ) -> LinkEventsListener[_H]: ... + @overload + def declare_link_events_listener( + self, handler: _PythonCallback[LinkEvent], *, history: bool | None = None + ) -> LinkEventsListener[None]: ... + +@final +class Transport: + """Information about a Zenoh transport connection. + + A Transport represents a connection to another Zenoh node (peer or router). + It provides information about the remote node and the transport characteristics. + """ + + @property + def zid(self) -> ZenohId: + """The :class:`ZenohId` of the remote node.""" + + @property + def whatami(self) -> WhatAmI: + """The :class:`WhatAmI` type of the remote node.""" + + @property + def is_qos(self) -> bool: + """Whether this transport supports QoS (Quality of Service).""" + + @property + def is_multicast(self) -> bool: + """Whether this is a multicast transport.""" + + def __eq__(self, other: Transport) -> bool: ... + def __repr__(self) -> str: ... + +@final +class Link: + """Information about a Zenoh link within a transport. + + A Link represents a single network connection within a transport. + Transports may have multiple links for redundancy or different network paths. + """ + + @property + def zid(self) -> ZenohId: + """The :class:`ZenohId` of the remote node.""" + + @property + def src(self) -> str: + """The source locator of this link.""" + + @property + def dst(self) -> str: + """The destination locator of this link.""" + + @property + def group(self) -> str | None: + """The multicast group this link belongs to, if any.""" + + @property + def mtu(self) -> int: + """The Maximum Transmission Unit (MTU) of this link.""" + + @property + def is_streamed(self) -> bool: + """Whether this link uses a streamed protocol (e.g., TCP) or datagram (e.g., UDP).""" + + @property + def interfaces(self) -> list[str]: + """The network interfaces used by this link.""" + + @property + def auth_identifier(self) -> str | None: + """The authentication identifier for this link, if any.""" + + @property + def priorities(self) -> tuple[int, int] | None: + """The priority range (min, max) for this link, if configured.""" + + @property + def reliability(self) -> Reliability | None: + """The reliability setting for this link, if configured.""" + + def __eq__(self, other: Link) -> bool: ... + def __repr__(self) -> str: ... + +@final +class TransportEvent: + """An event indicating a transport connection was opened or closed. + + TransportEvent is emitted by :class:`TransportEventsListener` when a transport + connection to another Zenoh node is established or terminated. + """ + + @property + def kind(self) -> SampleKind: + """The kind of event: :attr:`SampleKind.PUT` for opened, :attr:`SampleKind.DELETE` for closed.""" + + @property + def transport(self) -> Transport: + """The :class:`Transport` that was opened or closed.""" + + def __repr__(self) -> str: ... + +@final +class TransportEventsListener(Generic[_H]): + """A listener that receives notifications when transport connections open or close. + + The listener is created using :meth:`SessionInfo.declare_transport_events_listener` and + yields :class:`TransportEvent` instances when connections to other Zenoh nodes are + established or terminated. + """ + + def __enter__(self) -> Self: ... + def __exit__(self, *_args, **_kwargs): ... + @property + def handler(self) -> _H: + """The handler associated with this TransportEventsListener instance. + + See :ref:`channels-and-callbacks` for more information on handlers.""" + + def undeclare(self): + """Stop listening for transport events.""" + + def try_recv( + self: TransportEventsListener[Handler[TransportEvent]], + ) -> TransportEvent | None: + """Try to receive a :class:`TransportEvent` without blocking.""" + + def recv(self: TransportEventsListener[Handler[TransportEvent]]) -> TransportEvent: + """Receive a :class:`TransportEvent`, blocking until one is available.""" + + def __iter__( + self: TransportEventsListener[Handler[TransportEvent]], + ) -> Handler[TransportEvent]: + """Iterate over received :class:`TransportEvent` instances.""" + +@final +class LinkEvent: + """An event indicating a link was added or removed. + + LinkEvent is emitted by :class:`LinkEventsListener` when a link + within a transport is established or terminated. + """ + + @property + def kind(self) -> SampleKind: + """The kind of event: :attr:`SampleKind.PUT` for added, :attr:`SampleKind.DELETE` for removed.""" + + @property + def link(self) -> Link: + """The :class:`Link` that was added or removed.""" + + def __repr__(self) -> str: ... + +@final +class LinkEventsListener(Generic[_H]): + """A listener that receives notifications when links are added or removed. + + The listener is created using :meth:`SessionInfo.declare_link_events_listener` and + yields :class:`LinkEvent` instances when links within transports are + established or terminated. + """ + + def __enter__(self) -> Self: ... + def __exit__(self, *_args, **_kwargs): ... + @property + def handler(self) -> _H: + """The handler associated with this LinkEventsListener instance. + + See :ref:`channels-and-callbacks` for more information on handlers.""" + + def undeclare(self): + """Stop listening for link events.""" + + def try_recv(self: LinkEventsListener[Handler[LinkEvent]]) -> LinkEvent | None: + """Try to receive a :class:`LinkEvent` without blocking.""" + + def recv(self: LinkEventsListener[Handler[LinkEvent]]) -> LinkEvent: + """Receive a :class:`LinkEvent`, blocking until one is available.""" + + def __iter__(self: LinkEventsListener[Handler[LinkEvent]]) -> Handler[LinkEvent]: + """Iterate over received :class:`LinkEvent` instances.""" + @_unstable @final class SetIntersectionLevel(Enum):