Skip to content
Open
26 changes: 26 additions & 0 deletions examples/z_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
# Contributors:
# ZettaScale Zenoh Team, <zenoh@zettascale.tech>
#
import time

import zenoh


Expand All @@ -24,6 +26,30 @@ def main(conf: zenoh.Config):
print(f"zid: {info.zid()}")
print(f"routers: {info.routers_zid()}")
print(f"peers: {info.peers_zid()}")
print("transports:")
for t in info.transports():
print(f" - {t}")
print("links:")
for l in info.links():
print(f" - {l}")

# listen for transport and link events using try_recv polling
transport_listener = info.declare_transport_events_listener(history=False)
link_listener = info.declare_link_events_listener(history=False)

print("Listening for transport and link events... (press Ctrl+C to exit)")
try:
while True:
while (event := transport_listener.try_recv()) is not None:
print(f"Transport event: {event}")
while (event := link_listener.try_recv()) is not None:
print(f"Link event: {event}")
time.sleep(0.1)
except KeyboardInterrupt:
pass

transport_listener.undeclare()
link_listener.undeclare()


# --- Command line argument parsing --- --- --- --- --- ---
Expand Down
5 changes: 4 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,10 @@ pub(crate) mod zenoh {
},
sample::{Locality, Sample, SampleKind, SourceInfo},
scouting::{scout, Hello, Scout},
session::{open, EntityGlobalId, Session, SessionInfo},
session::{
open, EntityGlobalId, Link, LinkEvent, LinkEventsListener, Session, SessionInfo,
Transport, TransportEvent, TransportEventsListener,
},
time::{Timestamp, TimestampId, NTP64},
ZError,
};
Expand Down
289 changes: 285 additions & 4 deletions src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,23 @@ use std::time::Duration;

use pyo3::{
prelude::*,
types::{PyDict, PyList, PyTuple},
types::{PyDict, PyIterator, PyList, PyTuple},
IntoPyObjectExt,
};
use zenoh::{session::EntityId, Wait};

use crate::{
bytes::{Encoding, ZBytes},
cancellation::CancellationToken,
config::{Config, ZenohId},
config::{Config, WhatAmI, ZenohId},
handlers::{into_handler, HandlerImpl},
key_expr::KeyExpr,
liveliness::Liveliness,
macros::{build, wrapper},
macros::{build, option_wrapper, wrapper},
pubsub::{Publisher, Subscriber},
qos::{CongestionControl, Priority, Reliability},
query::{Querier, QueryConsolidation, QueryTarget, Queryable, Reply, ReplyKeyExpr, Selector},
sample::{Locality, SourceInfo},
sample::{Locality, SampleKind, SourceInfo},
time::Timestamp,
utils::{duration, wait, IntoPython, MapInto},
};
Expand Down Expand Up @@ -311,6 +312,150 @@ pub(crate) fn open(py: Python, config: Config) -> PyResult<Session> {

wrapper!(zenoh::session::SessionInfo);

wrapper!(zenoh::session::Transport);

#[pymethods]
impl Transport {
#[getter]
fn zid(&self) -> ZenohId {
(*self.0.zid()).into()
}

#[getter]
fn whatami(&self) -> WhatAmI {
self.0.whatami().into()
}

#[getter]
fn is_qos(&self) -> bool {
self.0.is_qos()
}

#[cfg(feature = "shared-memory")]
#[getter]
fn is_shm(&self) -> bool {
self.0.is_shm()
}

#[getter]
fn is_multicast(&self) -> bool {
self.0.is_multicast()
}

fn __eq__(&self, other: &Transport) -> bool {
self.0 == other.0
}

fn __repr__(&self) -> String {
format!("{:?}", self.0)
}
}

wrapper!(zenoh::session::Link);

#[pymethods]
impl Link {
#[getter]
fn zid(&self) -> ZenohId {
(*self.0.zid()).into()
}

#[getter]
fn src(&self) -> String {
self.0.src().to_string()
}

#[getter]
fn dst(&self) -> String {
self.0.dst().to_string()
}

#[getter]
fn group(&self) -> Option<String> {
self.0.group().map(|g| g.to_string())
}

#[getter]
fn mtu(&self) -> u16 {
self.0.mtu()
}

#[getter]
fn is_streamed(&self) -> bool {
self.0.is_streamed()
}

#[getter]
fn interfaces<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyList>> {
let list = PyList::empty(py);
for interface in self.0.interfaces() {
list.append(interface)?;
}
Ok(list)
}

#[getter]
fn auth_identifier(&self) -> Option<String> {
self.0.auth_identifier().map(|s| s.to_string())
}

#[getter]
fn priorities(&self) -> Option<(u8, u8)> {
self.0.priorities()
}

#[getter]
fn reliability(&self) -> Option<Reliability> {
self.0.reliability().map(Into::into)
}

fn __eq__(&self, other: &Link) -> bool {
self.0 == other.0
}

fn __repr__(&self) -> String {
format!("{:?}", self.0)
}
}

wrapper!(zenoh::session::TransportEvent);

#[pymethods]
impl TransportEvent {
#[getter]
fn kind(&self) -> SampleKind {
self.0.kind().into()
}

#[getter]
fn transport(&self) -> Transport {
self.0.transport().clone().into()
}

fn __repr__(&self) -> String {
format!("{:?}", self.0)
}
}

wrapper!(zenoh::session::LinkEvent);

#[pymethods]
impl LinkEvent {
#[getter]
fn kind(&self) -> SampleKind {
self.0.kind().into()
}

#[getter]
fn link(&self) -> Link {
self.0.link().clone().into()
}

fn __repr__(&self) -> String {
format!("{:?}", self.0)
}
}

#[pymethods]
impl SessionInfo {
fn zid(&self, py: Python) -> ZenohId {
Expand All @@ -333,9 +478,145 @@ impl SessionInfo {
Ok(list)
}

fn transports<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyList>> {
let list = PyList::empty(py);
for transport in py.allow_threads(|| self.0.transports().wait()) {
list.append(transport.into_pyobject(py))?;
}
Ok(list)
}

fn links<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyList>> {
let list = PyList::empty(py);
for link in py.allow_threads(|| self.0.links().wait()) {
list.append(link.into_pyobject(py))?;
}
Ok(list)
}

#[pyo3(signature = (handler = None, *, history = None))]
fn declare_transport_events_listener(
&self,
py: Python,
handler: Option<&Bound<PyAny>>,
history: Option<bool>,
) -> PyResult<TransportEventsListener> {
let (handler, background) = into_handler(py, handler, None)?;
let builder = build!(self.0.transport_events_listener(), history);
let mut listener = wait(py, builder.with(handler))?;
if background {
listener.set_background(true);
}
Ok(listener.into())
}

#[pyo3(signature = (handler = None, *, history = None))]
fn declare_link_events_listener(
&self,
py: Python,
handler: Option<&Bound<PyAny>>,
history: Option<bool>,
) -> PyResult<LinkEventsListener> {
let (handler, background) = into_handler(py, handler, None)?;
let builder = build!(self.0.link_events_listener(), history);
let mut listener = wait(py, builder.with(handler))?;
if background {
listener.set_background(true);
}
Ok(listener.into())
}

// TODO __repr__
}

option_wrapper!(
zenoh::session::TransportEventsListener<HandlerImpl<TransportEvent>>,
"Undeclared transport events listener"
);

#[pymethods]
impl TransportEventsListener {
fn __enter__<'a, 'py>(this: &'a Bound<'py, Self>) -> &'a Bound<'py, Self> {
this
}

#[pyo3(signature = (*_args, **_kwargs))]
fn __exit__(
&mut self,
py: Python,
_args: &Bound<PyTuple>,
_kwargs: Option<&Bound<PyDict>>,
) -> PyResult<PyObject> {
self.undeclare(py)?;
Ok(py.None())
}

#[getter]
fn handler(&self, py: Python) -> PyResult<PyObject> {
self.get_ref()?.handler().into_py_any(py)
}

fn try_recv(&self, py: Python) -> PyResult<PyObject> {
self.get_ref()?.handler().try_recv(py)
}

fn recv(&self, py: Python) -> PyResult<PyObject> {
self.get_ref()?.handler().recv(py)
}

fn undeclare(&mut self, py: Python) -> PyResult<()> {
wait(py, self.take()?.undeclare())
}

fn __iter__<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyIterator>> {
self.handler(py)?.bind(py).try_iter()
}
}

option_wrapper!(
zenoh::session::LinkEventsListener<HandlerImpl<LinkEvent>>,
"Undeclared link events listener"
);

#[pymethods]
impl LinkEventsListener {
fn __enter__<'a, 'py>(this: &'a Bound<'py, Self>) -> &'a Bound<'py, Self> {
this
}

#[pyo3(signature = (*_args, **_kwargs))]
fn __exit__(
&mut self,
py: Python,
_args: &Bound<PyTuple>,
_kwargs: Option<&Bound<PyDict>>,
) -> PyResult<PyObject> {
self.undeclare(py)?;
Ok(py.None())
}

#[getter]
fn handler(&self, py: Python) -> PyResult<PyObject> {
self.get_ref()?.handler().into_py_any(py)
}

fn try_recv(&self, py: Python) -> PyResult<PyObject> {
self.get_ref()?.handler().try_recv(py)
}

fn recv(&self, py: Python) -> PyResult<PyObject> {
self.get_ref()?.handler().recv(py)
}

fn undeclare(&mut self, py: Python) -> PyResult<()> {
wait(py, self.take()?.undeclare())
}

fn __iter__<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyIterator>> {
self.handler(py)?.bind(py).try_iter()
}
}

wrapper!(zenoh::session::EntityGlobalId: Clone);

#[pymethods]
Expand Down
Loading
Loading