diff --git a/libby/__init__.py b/libby/__init__.py index 8562ce6..57ef96d 100644 --- a/libby/__init__.py +++ b/libby/__init__.py @@ -3,5 +3,6 @@ from bamboo.keys import KeyRegistry from .zmq_transport import ZmqTransport from .libby import Libby +from .mqtt_rpc_client import MqttRpcClient -__all__ = ["Libby", "ZmqTransport", "Protocol", "MessageBuilder", "KeyRegistry"] +__all__ = ["Libby", "ZmqTransport", "Protocol", "MessageBuilder", "KeyRegistry", "MqttRpcClient"] diff --git a/libby/mqtt_dummy_device.py b/libby/mqtt_dummy_device.py new file mode 100644 index 0000000..de528d9 --- /dev/null +++ b/libby/mqtt_dummy_device.py @@ -0,0 +1,82 @@ +import json +import time +import paho.mqtt.client as mqtt + +MQTT_HOST = "localhost" +MQTT_PORT = 1883 +BASE = "mktl/dev-001" +REQ_TOPIC = f"{BASE}/req" +RESP_TOPIC = f"{BASE}/resp" +TELEMETRY_TOPIC = f"{BASE}/telemetry" + +state = { + "switch": False, + "last_ts": time.time(), +} + +def make_resp(msg_id: str, ok: bool, payload: dict | None = None, error: str | None = None) -> dict: + return { + "type": "response", + "ok": ok, + "payload": payload or {}, + "error": error, + "msg_id": msg_id, + "ts": time.time(), + } + +def on_connect(client, userdata, flags, rc): + client.subscribe(REQ_TOPIC, qos=1) + print("[dummy] online, listening", REQ_TOPIC) + +def on_message(client, userdata, msg): + try: + req = json.loads(msg.payload.decode("utf-8")) + except Exception: + return + + if msg.topic != REQ_TOPIC: + return + + action = req.get("action", "") + payload = req.get("payload") or {} + msg_id = req.get("msg_id", "") + + if action in ("device.status.get", "device.switch.get"): + resp = make_resp(msg_id, True, {"switch": state["switch"], "ts": time.time()}) + elif action == "device.switch.set": + desired = payload.get("on", None) + if isinstance(desired, bool): + state["switch"] = desired + state["last_ts"] = time.time() + print(f"[dummy] switch -> {state['switch']}") + resp = make_resp(msg_id, True, {"switch": state["switch"], "ts": state["last_ts"]}) + else: + resp = make_resp(msg_id, False, error="payload.on must be boolean") + else: + resp = make_resp(msg_id, False, error=f"unknown action '{action}'") + + client.publish(RESP_TOPIC, json.dumps(resp), qos=1, retain=False) + +def main(): + c = mqtt.Client(client_id="dummy-dev-001") + c.on_connect = on_connect + c.on_message = on_message + c.connect(MQTT_HOST, MQTT_PORT, keepalive=30) + + # periodic status + def publish_telemetry(): + t = {"switch": state["switch"], "uptime_s": int(time.time() - state["last_ts"]), "ts": time.time()} + c.publish(TELEMETRY_TOPIC, json.dumps(t), qos=0, retain=False) + + c.loop_start() + try: + while True: + publish_telemetry() + time.sleep(5) + except KeyboardInterrupt: + pass + finally: + c.loop_stop() + +if __name__ == "__main__": + main() diff --git a/libby/mqtt_rpc_client.py b/libby/mqtt_rpc_client.py new file mode 100644 index 0000000..98aaffa --- /dev/null +++ b/libby/mqtt_rpc_client.py @@ -0,0 +1,63 @@ +import json, time, uuid, threading +import paho.mqtt.client as mqtt + + +BASE = "mktl/dev-001" +REQ_TOPIC = f"{BASE}/req" +RESP_TOPIC = f"{BASE}/resp" +TELEMETRY_TOPIC = f"{BASE}/telemetry" +VERBOSE = True + +class MqttRpcClient: + def __init__(self, client_id="libby-bridge-mqtt", host="localhost", port=1883): + self._c = mqtt.Client(client_id=client_id) + self._c.on_connect = self._on_connect + self._c.on_message = self._on_message + self._host, self._port = host, port + self._waiters = {} + self._lock = threading.Lock() + self._connected = threading.Event() + + def start(self): + self._c.connect(self._host, self._port, keepalive=30) + self._c.loop_start() + self._connected.wait(5) + + def _on_connect(self, client, userdata, flags, rc, properties=None): + client.subscribe(RESP_TOPIC, qos=1) + client.subscribe(TELEMETRY_TOPIC, qos=0) + if VERBOSE: + print(f"[bridge] MQTT connected; sub {RESP_TOPIC}, {TELEMETRY_TOPIC}") + self._connected.set() + + def _on_message(self, client, userdata, msg): + try: + data = json.loads(msg.payload.decode("utf-8")) + except Exception: + if VERBOSE: print(f"[bridge] MQTT ← {msg.topic}: ") + return + if VERBOSE: print(f"[bridge] MQTT ← {msg.topic}: {data}") + if msg.topic == RESP_TOPIC: + msg_id = data.get("msg_id") + if not msg_id: return + with self._lock: + w = self._waiters.get(msg_id) + if w: + w["resp"] = data + w["ev"].set() + + def call(self, action: str, payload=None, timeout=5.0) -> dict: + msg_id = str(uuid.uuid4()) + req = {"type": "request", "action": action, "payload": payload or {}, "msg_id": msg_id, "ts": time.time()} + if VERBOSE: print(f"[bridge] MQTT β†’ {REQ_TOPIC}: {req}") + ev = threading.Event() + with self._lock: + self._waiters[msg_id] = {"ev": ev, "resp": None} + self._c.publish(REQ_TOPIC, json.dumps(req), qos=1) + if not ev.wait(timeout): + with self._lock: + self._waiters.pop(msg_id, None) + raise TimeoutError(f"MQTT RPC timeout for '{action}'") + with self._lock: + resp = self._waiters.pop(msg_id)["resp"] + return resp or {"ok": False, "error": "empty response"} diff --git a/package/module.py b/package/module.py deleted file mode 100644 index 8f2e068..0000000 --- a/package/module.py +++ /dev/null @@ -1,14 +0,0 @@ -import random - -def random_celestial_emoji(): - """Returns a random celestial emoji.""" - emojis = [ - "🌞", # Sun - "🌝", # Full Moon - "🌚", # New Moon - "🌍", # Earth - "🌟", # Star - "🌠", # Shooting Star - "🌌", # Milky Way - ] - return random.choice(emojis) diff --git a/peers/peer_mqtt_bridge.py b/peers/peer_mqtt_bridge.py new file mode 100644 index 0000000..0324efc --- /dev/null +++ b/peers/peer_mqtt_bridge.py @@ -0,0 +1,63 @@ +import time +from libby import Libby +from libby import MqttRpcClient + +PEER_ID = "peer-bridge" +BIND = "tcp://*:5557" +ADDRESS_BOOK = {} +EXPOSED_KEYS = ["device.status.get", "device.switch.get", "device.switch.set"] +VERBOSE = True + + +def make_libby_handler(mqtt_rpc: MqttRpcClient): + def handle(payload: dict, meta: dict) -> dict: + key = meta.get("key", "") + ttl_ms = meta.get("ttl_ms", 5000) + + if VERBOSE: + print(f"[bridge] Libby ← key={key} payload={payload} meta={{src:{meta.get('src')}, dst:{meta.get('dst')}, ttl_ms:{ttl_ms}}}") + + if key not in EXPOSED_KEYS: + return {"error": f"unknown key '{key}'"} + + try: + resp = mqtt_rpc.call(key, payload or {}, timeout=ttl_ms / 1000.0) + except TimeoutError as e: + if VERBOSE: + print(f"[bridge] Libby β†’ timeout for {key}: {e}") + return {"error": str(e)} + + if not isinstance(resp, dict): + return {"error": "malformed device response"} + + if not resp.get("ok", False): + err = resp.get("error", "device error") + if VERBOSE: + print(f"[bridge] Libby β†’ device error for {key}: {err}") + return {"error": err} + + out = resp.get("payload", {}) + if VERBOSE: + print(f"[bridge] Libby β†’ payload for {key}: {out}") + return out + return handle + +def main(): + mqtt_rpc = MqttRpcClient() + mqtt_rpc.start() + with Libby.zmq( + self_id=PEER_ID, + bind=BIND, + address_book=ADDRESS_BOOK, + keys=EXPOSED_KEYS, # advertise keys the bridge serves + callback=make_libby_handler(mqtt_rpc), + discover=True, + discover_interval_s=1.5, + hello_on_start=True, + ) as libby: + print(f"[{PEER_ID}] Libby↔MQTT bridge online. Serving: {', '.join(EXPOSED_KEYS)}") + while True: + time.sleep(1) + +if __name__ == "__main__": + main() diff --git a/peers/peer_query_the_bridge.py b/peers/peer_query_the_bridge.py new file mode 100644 index 0000000..8908852 --- /dev/null +++ b/peers/peer_query_the_bridge.py @@ -0,0 +1,42 @@ +from libby import Libby + +PEER_ID = "peer-controller" +BIND = "tcp://*:5558" +ADDRESS_BOOK = { + "peer-bridge": "tcp://127.0.0.1:5557", +} +BRIDGE = "peer-bridge" + +def main(): + with Libby.zmq( + self_id=PEER_ID, + bind=BIND, + address_book=ADDRESS_BOOK, + discover=True, + discover_interval_s=1.5, + hello_on_start=True, + ) as libby: + libby.learn_peer_keys(BRIDGE, [ + "device.status.get", + "device.switch.get", + "device.switch.set", + ]) + + print("[ctl] β†’ device.status.get") + r = libby.request(BRIDGE, key="device.status.get", payload={}, ttl_ms=8000) + print("[ctl] ←", r) + + print("[ctl] β†’ device.switch.set(on=True)") + r = libby.request(BRIDGE, key="device.switch.set", payload={"on": True}, ttl_ms=8000) + print("[ctl] ←", r) + + print("[ctl] β†’ device.switch.get") + r = libby.request(BRIDGE, key="device.switch.get", payload={}, ttl_ms=8000) + print("[ctl] ←", r) + + print("[ctl] β†’ device.switch.set(on=False)") + r = libby.request(BRIDGE, key="device.switch.set", payload={"on": False}, ttl_ms=8000) + print("[ctl] ←", r) + +if __name__ == "__main__": + main() diff --git a/pyproject.toml b/pyproject.toml index 3f554a6..995902d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,6 +16,7 @@ authors = [ ] dependencies = [ "pyzmq>=25.0.0", + "paho-mqtt>=2.1.0", "bamboo @ git+https://github.com/CaltechOpticalObservatories/bamboo.git@main" ]