Skip to content

Commit 07568fc

Browse files
committed
refactor: move standalone CLI into TaskiqCMD subcommand
1 parent 6e8cdc4 commit 07568fc

5 files changed

Lines changed: 101 additions & 94 deletions

File tree

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ taskiq = "taskiq.__main__:main"
9090
[project.entry-points.taskiq_cli]
9191
worker = "taskiq.cli.worker.cmd:WorkerCMD"
9292
scheduler = "taskiq.cli.scheduler.cmd:SchedulerCMD"
93+
nng-hub = "taskiq.cli.nng.cmd:NNGHubCMD"
9394

9495
[project.entry-points.opentelemetry_instrumentor]
9596
taskiq = "taskiq.instrumentation:TaskiqInstrumentor"

taskiq/brokers/nng/broker.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ def _ipc_addr(prefix: str = "taskiq-nng") -> str:
4242

4343
class NNGBroker(AsyncBroker):
4444
"""
45-
Taskiq broker backed by a standalone :class:`~taskiq.brokers.nng_hub.NNGHub`.
45+
Taskiq broker backed by a dedicated :class:`~taskiq.brokers.nng_hub.NNGHub`.
4646
4747
The hub must be running before workers or clients start. Launch it with::
4848

taskiq/brokers/nng/hub.py

Lines changed: 2 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
"""
22
NNG hub: central control plane, task dispatcher, and lease manager.
33
4-
Run as a standalone process::
4+
Run as a taskiq sub-command::
55
6-
taskiq-nng-hub --control-addr ipc:///tmp/taskiq-nng.ipc
6+
taskiq nng-hub --control-addr ipc:///tmp/taskiq-nng.ipc
77
88
Or embed it in an application for testing::
99
@@ -14,12 +14,9 @@
1414
"""
1515
from __future__ import annotations
1616

17-
import argparse
1817
import asyncio
1918
import base64
2019
import logging
21-
import os
22-
import signal
2320
import time
2421
import uuid
2522
from contextlib import suppress
@@ -369,91 +366,3 @@ async def _reaper_loop(self) -> None:
369366
raise
370367
except Exception:
371368
logger.exception("Reaper loop error")
372-
373-
374-
# ── standalone CLI entry point ────────────────────────────────────────────────
375-
376-
def _build_config() -> HubConfig:
377-
p = argparse.ArgumentParser(
378-
description="taskiq-nng-hub — NNG task router, dispatcher, and lease manager",
379-
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
380-
)
381-
p.add_argument(
382-
"--control-addr",
383-
default=os.getenv("NNG_CONTROL_ADDR", "ipc:///tmp/taskiq-nng.ipc"),
384-
help="NNG address the hub listens on. Env: NNG_CONTROL_ADDR",
385-
)
386-
p.add_argument(
387-
"--max-pending",
388-
type=int,
389-
default=int(os.getenv("NNG_MAX_PENDING", "10000")),
390-
)
391-
p.add_argument(
392-
"--heartbeat-timeout",
393-
type=float,
394-
default=float(os.getenv("NNG_HEARTBEAT_TIMEOUT", "15.0")),
395-
help="Seconds of silence before a worker is declared dead.",
396-
)
397-
p.add_argument(
398-
"--lease-timeout",
399-
type=float,
400-
default=float(os.getenv("NNG_LEASE_TIMEOUT", "20.0")),
401-
help="Seconds before an unacked task lease is reaped.",
402-
)
403-
p.add_argument(
404-
"--routing-policy",
405-
choices=["least_loaded", "p2c", "round_robin"],
406-
default=os.getenv("NNG_ROUTING_POLICY", "least_loaded"),
407-
)
408-
p.add_argument(
409-
"--control-concurrency",
410-
type=int,
411-
default=int(os.getenv("NNG_CONTROL_CONCURRENCY", "16")),
412-
help="Number of concurrent Rep0 contexts.",
413-
)
414-
p.add_argument(
415-
"--log-level",
416-
default=os.getenv("NNG_LOG_LEVEL", "INFO"),
417-
choices=["DEBUG", "INFO", "WARNING", "ERROR"],
418-
)
419-
args = p.parse_args()
420-
logging.basicConfig(
421-
level=getattr(logging, args.log_level),
422-
format="%(asctime)s %(name)-24s %(levelname)-8s %(message)s",
423-
)
424-
return HubConfig(
425-
control_addr=args.control_addr,
426-
max_pending=args.max_pending,
427-
heartbeat_timeout=args.heartbeat_timeout,
428-
lease_timeout=args.lease_timeout,
429-
routing_policy=args.routing_policy,
430-
control_concurrency=args.control_concurrency,
431-
)
432-
433-
434-
async def _run(config: HubConfig) -> None:
435-
hub = NNGHub(config)
436-
loop = asyncio.get_running_loop()
437-
stop_event = asyncio.Event()
438-
439-
def _on_signal() -> None:
440-
logger.info("Shutdown signal received")
441-
stop_event.set()
442-
443-
for sig in (signal.SIGTERM, signal.SIGINT):
444-
loop.add_signal_handler(sig, _on_signal)
445-
446-
await hub.start()
447-
try:
448-
await stop_event.wait()
449-
finally:
450-
await hub.stop()
451-
452-
453-
def main() -> None:
454-
"""Entry point for the ``taskiq-nng-hub`` CLI command."""
455-
config = _build_config()
456-
try:
457-
asyncio.run(_run(config))
458-
except KeyboardInterrupt:
459-
pass

taskiq/cli/nng/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
"""NNG CLI command."""

taskiq/cli/nng/cmd.py

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
from __future__ import annotations
2+
3+
import argparse
4+
import asyncio
5+
import logging
6+
import os
7+
import signal
8+
from collections.abc import Sequence
9+
from contextlib import suppress
10+
11+
from taskiq.abc.cmd import TaskiqCMD
12+
13+
from taskiq.brokers.nng.hub import HubConfig, NNGHub
14+
15+
logger = logging.getLogger(__name__)
16+
17+
18+
class NNGHubCMD(TaskiqCMD):
19+
"""Command to run the NNG hub."""
20+
21+
short_help = "Run the NNG hub"
22+
23+
def exec(self, args: Sequence[str]) -> int | None:
24+
parser = argparse.ArgumentParser(
25+
prog="taskiq nng-hub",
26+
description="NNG task router, dispatcher, and lease manager",
27+
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
28+
)
29+
parser.add_argument(
30+
"--control-addr",
31+
default=os.getenv("NNG_CONTROL_ADDR", "ipc:///tmp/taskiq-nng.ipc"),
32+
)
33+
parser.add_argument(
34+
"--max-pending",
35+
type=int,
36+
default=int(os.getenv("NNG_MAX_PENDING", "10000")),
37+
)
38+
parser.add_argument(
39+
"--heartbeat-timeout",
40+
type=float,
41+
default=float(os.getenv("NNG_HEARTBEAT_TIMEOUT", "15.0")),
42+
)
43+
parser.add_argument(
44+
"--lease-timeout",
45+
type=float,
46+
default=float(os.getenv("NNG_LEASE_TIMEOUT", "20.0")),
47+
)
48+
parser.add_argument(
49+
"--routing-policy",
50+
choices=["least_loaded", "p2c", "round_robin"],
51+
default=os.getenv("NNG_ROUTING_POLICY", "least_loaded"),
52+
)
53+
parser.add_argument(
54+
"--control-concurrency",
55+
type=int,
56+
default=int(os.getenv("NNG_CONTROL_CONCURRENCY", "16")),
57+
)
58+
parser.add_argument(
59+
"--log-level",
60+
default=os.getenv("NNG_LOG_LEVEL", "INFO"),
61+
choices=["DEBUG", "INFO", "WARNING", "ERROR"],
62+
)
63+
ns = parser.parse_args(list(args))
64+
logging.basicConfig(
65+
level=getattr(logging, ns.log_level),
66+
format="%(asctime)s %(name)-24s %(levelname)-8s %(message)s",
67+
)
68+
config = HubConfig(
69+
control_addr=ns.control_addr,
70+
max_pending=ns.max_pending,
71+
heartbeat_timeout=ns.heartbeat_timeout,
72+
lease_timeout=ns.lease_timeout,
73+
routing_policy=ns.routing_policy,
74+
control_concurrency=ns.control_concurrency,
75+
)
76+
asyncio.run(self._run(config))
77+
return 0
78+
79+
async def _run(self, config: HubConfig) -> None:
80+
hub = NNGHub(config)
81+
loop = asyncio.get_running_loop()
82+
stop_event = asyncio.Event()
83+
84+
def _on_signal() -> None:
85+
logger.info("Shutdown signal received")
86+
stop_event.set()
87+
88+
for sig in (signal.SIGTERM, signal.SIGINT):
89+
with suppress(Exception):
90+
loop.add_signal_handler(sig, _on_signal)
91+
92+
await hub.start()
93+
try:
94+
await stop_event.wait()
95+
finally:
96+
await hub.stop()

0 commit comments

Comments
 (0)