Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 174 additions & 0 deletions libby/cli/libby_cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
from __future__ import annotations

import argparse, json, os, signal, sys, time
from typing import Any, Dict, List, Optional
from libby.libby import Libby

DEFAULT_SELF_ID = "cli"
DEFAULT_BIND = "tcp://127.0.0.1:56001"

def _parse_addr_kv(kv: str) -> tuple[str, str]:
if "=" not in kv:
raise argparse.ArgumentTypeError("Expected 'peerId=tcp://host:port'")
k, v = kv.split("=", 1)
k, v = k.strip(), v.strip()
if not k or not v:
raise argparse.ArgumentTypeError("Expected 'peerId=tcp://host:port'")
return k, v

def _load_book(path: Optional[str]) -> Dict[str, str]:
if not path:
return {}
with open(path, "r", encoding="utf-8") as f:
data = json.load(f)
if not isinstance(data, dict):
raise SystemExit("--book JSON must be an object mapping peer->endpoint")
return {str(k): str(v) for k, v in data.items()}

def _parse_json(s: Optional[str]) -> Dict[str, Any]:
if not s:
return {}
try:
return json.loads(s)
except Exception as ex:
raise SystemExit(f"--data must be JSON: {ex}")

def _self_id(ns: argparse.Namespace) -> str:
return ns.self_id or os.environ.get("LIBBY_SELF_ID", DEFAULT_SELF_ID)

def _bind(ns: argparse.Namespace) -> str:
return ns.bind or os.environ.get("LIBBY_BIND", DEFAULT_BIND)

def _mk_libby(self_id: str, bind: str, book: Dict[str, str]) -> Libby:
lib = Libby.zmq(
self_id=self_id,
bind=bind,
address_book=book,
keys=[],
callback=None,
discover=True,
discover_interval_s=2.0,
hello_on_start=True,
)
try:
lib.hello()
except Exception:
pass
return lib

def cmd_req(ns: argparse.Namespace) -> int:
book = _load_book(ns.book)
for kv in ns.addr or []:
k, v = _parse_addr_kv(kv); book[k] = v

self_id = _self_id(ns)
bind = _bind(ns)
payload = _parse_json(ns.data)

lib: Optional[Libby] = None
try:
lib = _mk_libby(self_id, bind, book)
ttl_ms = int(ns.ttl_ms) if ns.ttl_ms is not None else int(ns.timeout * 1000.0)
res = lib.rpc(ns.peer, ns.key, payload, ttl_ms=ttl_ms)
print(json.dumps(res, indent=2 if ns.raw_json else 2))
return 0 if res.get("status") == "delivered" else 2
except KeyboardInterrupt:
return 130
except Exception as ex:
print(f"libby-cli req: {ex}", file=sys.stderr)
return 2
finally:
if lib:
try: lib.stop()
except Exception: pass

def cmd_sub(ns: argparse.Namespace) -> int:
topics: List[str] = ns.topics
if not topics:
print("sub: provide at least one topic", file=sys.stderr)
return 2

book = _load_book(ns.book)
for kv in ns.addr or []:
k, v = _parse_addr_kv(kv); book[k] = v

self_id = _self_id(ns)
bind = _bind(ns)

lib: Optional[Libby] = None
stop = False

def on_sig(_s, _f):
nonlocal stop; stop = True

signal.signal(signal.SIGINT, on_sig)
signal.signal(signal.SIGTERM, on_sig)

try:
lib = _mk_libby(self_id, bind, book)

def _printer(msg):
try:
print(json.dumps(
{"source": msg.env.sourceid, "topic": msg.env.key, "payload": msg.env.payload},
indent=2 if ns.raw_json else 2,
))
except Exception as ex:
print(f"[event decode error] {ex}", file=sys.stderr)

for t in topics:
lib.listen(t, _printer)
lib.subscribe(*topics)

print(f"[libby sub] up: id={self_id} bind={bind} topics={topics}")
while not stop:
time.sleep(0.25)
return 0
except KeyboardInterrupt:
return 130
except Exception as ex:
print(f"libby-cli sub: {ex}", file=sys.stderr)
return 2
finally:
if lib:
try: lib.stop()
except Exception: pass
print("[libby sub] stopped")

def build_parser() -> argparse.ArgumentParser:
ap = argparse.ArgumentParser(
prog="libby-cli",
description="Simple Libby CLI: request a key or subscribe to topics."
)
sub = ap.add_subparsers(dest="cmd", required=True)

def common(p):
p.add_argument("--self-id", help=f"Local peer id (default: {DEFAULT_SELF_ID} or $LIBBY_SELF_ID)")
p.add_argument("--bind", help=f"Local ROUTER bind (default: {DEFAULT_BIND} or $LIBBY_BIND)")
p.add_argument("--book", help="Path to JSON {peer_id:'tcp://host:port'}")
p.add_argument("--addr", action="append", metavar="peer=tcp://host:port",
help="Add/override address-book entry (repeatable)")
p.add_argument("--raw-json", action="store_true", help="Pretty-print JSON")

pr = sub.add_parser("req", help="Send a keyed request (RPC) to a peer and print the response")
common(pr)
pr.add_argument("-p", "--peer", required=True, help="Destination peer id")
pr.add_argument("-k", "--key", required=True, help="Key to request (service name)")
pr.add_argument("-d", "--data", help="JSON payload to send (default: {})")
pr.add_argument("--timeout", type=float, default=8.0, help="Timeout seconds (default 8.0)")
pr.add_argument("--ttl-ms", type=int, help="Override TTL ms (default: timeout*1000)")
pr.set_defaults(func=cmd_req)

ps = sub.add_parser("sub", help="Subscribe to one or more topics and print publishes")
common(ps)
ps.add_argument("topics", nargs="+", help="Topic(s) to subscribe to")
ps.set_defaults(func=cmd_sub)

return ap

def main(argv: Optional[List[str]] = None) -> int:
ns = build_parser().parse_args(argv)
return ns.func(ns)

if __name__ == "__main__":
raise SystemExit(main())
60 changes: 45 additions & 15 deletions libby/zmq_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def __init__(self, bind_router: str, address_book: Dict[str, str], my_id: str):
self._router = self._ctx.socket(zmq.ROUTER)
self._router.setsockopt(zmq.LINGER, 0)
self._router.bind(bind_router)
self._router_id_by_peer: Dict[str, bytes] = {}

self._dealers: Dict[str, zmq.Socket] = {}
self._book: Dict[str, str] = dict(address_book)
Expand Down Expand Up @@ -72,17 +73,41 @@ def stop(self) -> None:
def on_receive(self, cb: Callable[[SrcStr, bytes], None]) -> None:
self._cb = cb

def reply_to(self, peer_id: str, frame: bytes) -> bool:
"""
Try to send directly back to the peer using the ROUTER routing-id
we observed on the incoming request. Returns True if used, False otherwise.
"""
rid = self._router_id_by_peer.get(peer_id)
if rid is None:
return False
with self._send_lock:
self._router.send_multipart([rid, frame])
return True

def send(self, dest: DestStr, frame: bytes) -> None:
"""
dest:
- "peer:<peer_id>" -> send direct to that peer via DEALER
- "broadcast:*" -> send to all known peers (fire-and-forget)
- "peer:<peer_id>" or "<peer_id>" -> direct to that peer
- "broadcast:*" -> to all known peers
"""
if dest.startswith("peer:"):
peer_id = dest.split(":", 1)[1]
endpoint = self._book.get(peer_id)
if not endpoint:
return # unknown peer_id; drop silently to match bamboo no-NACK
# 1) broadcast
if dest.startswith("broadcast:"):
with self._send_lock:
for _peer_id, _endpoint in self._book.items():
dealer = self._dealers.get(_peer_id)
if dealer is None:
dealer = self._new_dealer(_peer_id, _endpoint)
self._dealers[_peer_id] = dealer
dealer.send(frame)
return

# 2) normalize to plain id
peer_id = dest.split(":", 1)[1] if dest.startswith("peer:") else dest

# 3) preferred path: DEALER via address_book
endpoint = self._book.get(peer_id)
if endpoint:
dealer = self._dealers.get(peer_id)
if dealer is None:
dealer = self._new_dealer(peer_id, endpoint)
Expand All @@ -91,14 +116,18 @@ def send(self, dest: DestStr, frame: bytes) -> None:
dealer.send(frame)
return

if dest.startswith("broadcast:"):
with self._send_lock:
for peer_id, endpoint in self._book.items():
dealer = self._dealers.get(peer_id)
if dealer is None:
dealer = self._new_dealer(peer_id, endpoint)
self._dealers[peer_id] = dealer
dealer.send(frame)
# 4) fallback: reply via ROUTER if we cached this peer's routing-id
rid_map = getattr(self, "_router_id_by_peer", None)
if rid_map is not None:
rid = rid_map.get(peer_id)
if rid is not None:
with self._send_lock:
self._router.send_multipart([rid, frame])
return

# 5) unknown route -> drop silently (matches bamboo no-NACK semantics)
return


def add_peer(self, peer_id: str, endpoint: str) -> None:
"""Dynamically add or update an endpoint for a peer."""
Expand Down Expand Up @@ -145,6 +174,7 @@ def _rx_loop(self) -> None:
except Exception:
src_peer = "unknown"

self._router_id_by_peer[src_peer] = ident
if self._cb:
# Pass the *remote* peer id as the source
self._cb(f"peer:{src_peer}", payload)
1 change: 1 addition & 0 deletions peers/peer_a.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ class PeerA(LibbyDaemon):
address_book = {
"peer-B": "tcp://127.0.0.1:5556",
"peer-C": "tcp://127.0.0.1:5557",
"cli": "tcp://127.0.0.1:56001",
}

# Transport selection: "zmq" (default) or "rabbitmq"
Expand Down
2 changes: 2 additions & 0 deletions peers/peer_b.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ class PeerB(LibbyDaemon):
address_book = {
"peer-A": "tcp://127.0.0.1:5555",
"peer-C": "tcp://127.0.0.1:5557",
"peer-D": "tcp://127.0.0.1:5558",
"cli": "tcp://127.0.0.1:56001",
}

# Transport selection: "zmq" (default) or "rabbitmq"
Expand Down
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,6 @@ Repository = "https://github.com/yourusername/your-repo"
where = ["."]
include = ["libby", "libby.*"]
exclude = ["peers*", "package*"]

[project.scripts]
lshow = "libby.cli.lshow:main"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would that be lshow = "libby.cli.libby_cli:main" instead?