Skip to content

Commit 5270e8c

Browse files
authored
[Serve] Add unit tests for NodePortManager (#61491)
## Why `NodePortManager` handles per-node port allocation for direct ingress replicas but has zero unit test coverage in OSS. Any regression in allocation, release, or cleanup logic would go undetected until integration tests or production. ## What Add `test_node_port_manager.py` covering: - HTTP/gRPC port allocation and release - Port reuse after release - Port blocking (post-release exclusion) - Re-allocation of an already-allocated replica (idempotency) - Cleanup/pruning of inactive replicas and stale node managers - Port exhaustion (`NoAvailablePortError`) - Invalid port release (wrong port number) - Concurrent allocation within a single node - Mixed-protocol allocation for the same replica - Blocked port persistence across allocation cycles - `get_port` error path for unallocated replicas All tests use monkeypatched port ranges (5 ports each for HTTP/gRPC) to keep execution fast and deterministic. ## Test plan - `pytest python/ray/serve/tests/unit/test_node_port_manager.py -v` — 11/11 passed locally Signed-off-by: Seiji Eicher <seiji@anyscale.com>
1 parent 462e4c9 commit 5270e8c

1 file changed

Lines changed: 197 additions & 0 deletions

File tree

Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
import sys
2+
from typing import Set
3+
4+
import pytest
5+
6+
from ray.serve._private.common import RequestProtocol
7+
from ray.serve._private.node_port_manager import NoAvailablePortError, NodePortManager
8+
9+
10+
@pytest.fixture
11+
def port_range_constants():
12+
"""Fixture to set up port range constants for testing."""
13+
return {
14+
"RAY_SERVE_DIRECT_INGRESS_MIN_HTTP_PORT": 8000,
15+
"RAY_SERVE_DIRECT_INGRESS_MAX_HTTP_PORT": 8005,
16+
"RAY_SERVE_DIRECT_INGRESS_MIN_GRPC_PORT": 9000,
17+
"RAY_SERVE_DIRECT_INGRESS_MAX_GRPC_PORT": 9005,
18+
}
19+
20+
21+
@pytest.fixture(autouse=True)
22+
def setup_port_constants(monkeypatch, port_range_constants):
23+
monkeypatch.setattr(
24+
"ray.serve._private.node_port_manager.RAY_SERVE_DIRECT_INGRESS_MIN_HTTP_PORT",
25+
port_range_constants["RAY_SERVE_DIRECT_INGRESS_MIN_HTTP_PORT"],
26+
)
27+
monkeypatch.setattr(
28+
"ray.serve._private.node_port_manager.RAY_SERVE_DIRECT_INGRESS_MAX_HTTP_PORT",
29+
port_range_constants["RAY_SERVE_DIRECT_INGRESS_MAX_HTTP_PORT"],
30+
)
31+
monkeypatch.setattr(
32+
"ray.serve._private.node_port_manager.RAY_SERVE_DIRECT_INGRESS_MIN_GRPC_PORT",
33+
port_range_constants["RAY_SERVE_DIRECT_INGRESS_MIN_GRPC_PORT"],
34+
)
35+
monkeypatch.setattr(
36+
"ray.serve._private.node_port_manager.RAY_SERVE_DIRECT_INGRESS_MAX_GRPC_PORT",
37+
port_range_constants["RAY_SERVE_DIRECT_INGRESS_MAX_GRPC_PORT"],
38+
)
39+
yield
40+
41+
42+
def test_http_port_allocation_and_release(port_range_constants):
43+
manager = NodePortManager.get_node_manager("node-1")
44+
port = manager.allocate_port("replica-1", RequestProtocol.HTTP)
45+
assert (
46+
port_range_constants["RAY_SERVE_DIRECT_INGRESS_MIN_HTTP_PORT"]
47+
<= port
48+
< port_range_constants["RAY_SERVE_DIRECT_INGRESS_MAX_HTTP_PORT"]
49+
)
50+
51+
# Port should be reusable after release
52+
manager.release_port("replica-1", port, RequestProtocol.HTTP)
53+
port2 = manager.allocate_port("replica-2", RequestProtocol.HTTP)
54+
assert port2 == port
55+
56+
57+
def test_grpc_port_allocation_and_blocking():
58+
manager = NodePortManager.get_node_manager("node-2")
59+
port = manager.allocate_port("replica-a", RequestProtocol.GRPC)
60+
manager.release_port("replica-a", port, RequestProtocol.GRPC, block_port=True)
61+
62+
# New allocation should skip blocked port
63+
allocated_ports = set()
64+
for i in range(4):
65+
new_port = manager.allocate_port(f"replica-b{i}", RequestProtocol.GRPC)
66+
assert new_port != port
67+
allocated_ports.add(new_port)
68+
69+
assert len(allocated_ports) == 4
70+
71+
72+
def test_reuse_existing_port_if_already_allocated():
73+
manager = NodePortManager.get_node_manager("node-3")
74+
port1 = manager.allocate_port("replica-x", RequestProtocol.HTTP)
75+
port2 = manager.allocate_port("replica-x", RequestProtocol.HTTP)
76+
assert port1 == port2
77+
78+
79+
def test_cleanup_releases_ports():
80+
manager = NodePortManager.get_node_manager("node-4")
81+
allocated = set()
82+
83+
# Allocate all HTTP ports
84+
for i in range(5):
85+
port = manager.allocate_port(f"replica-{i}", RequestProtocol.HTTP)
86+
allocated.add(port)
87+
88+
# Only keep some replicas active
89+
active_replica_ids: Set[str] = {"replica-0", "replica-1"}
90+
NodePortManager.prune({"node-4": active_replica_ids})
91+
92+
# We should be able to reallocate some of the cleaned-up ports
93+
new_port = manager.allocate_port("replica-new", RequestProtocol.HTTP)
94+
assert new_port in allocated - {
95+
manager.get_port("replica-0", RequestProtocol.HTTP),
96+
manager.get_port("replica-1", RequestProtocol.HTTP),
97+
}
98+
99+
100+
def test_node_manager_cleanup():
101+
NodePortManager.get_node_manager("node-5")
102+
assert "node-5" in NodePortManager._node_managers
103+
104+
NodePortManager.prune(node_id_to_alive_replica_ids={})
105+
assert "node-5" not in NodePortManager._node_managers
106+
107+
108+
def test_port_exhaustion():
109+
"""Test behavior when all ports are exhausted."""
110+
manager = NodePortManager.get_node_manager("node-6")
111+
112+
# Allocate all HTTP ports
113+
allocated_ports = set()
114+
for i in range(5): # 5 ports available (8000-8004)
115+
port = manager.allocate_port(f"replica-{i}", RequestProtocol.HTTP)
116+
allocated_ports.add(port)
117+
118+
# Try to allocate one more port
119+
with pytest.raises(NoAvailablePortError, match="No available"):
120+
manager.allocate_port("replica-extra", RequestProtocol.HTTP)
121+
122+
123+
def test_invalid_port_release():
124+
"""Test error handling when releasing invalid ports."""
125+
manager = NodePortManager.get_node_manager("node-7")
126+
127+
# Allocate a port
128+
port = manager.allocate_port("replica-1", RequestProtocol.HTTP)
129+
130+
# Try to release with wrong port number
131+
with pytest.raises(AssertionError, match="port mismatch"):
132+
manager.release_port("replica-1", port + 1, RequestProtocol.HTTP)
133+
134+
135+
def test_concurrent_port_allocation():
136+
"""Test concurrent port allocation from multiple coroutines."""
137+
manager = NodePortManager.get_node_manager("node-8")
138+
139+
def allocate_port(replica_id: str):
140+
return manager.allocate_port(replica_id, RequestProtocol.HTTP)
141+
142+
# Allocate ports concurrently
143+
ports = [allocate_port(f"replica-{i}") for i in range(5)]
144+
145+
# Verify all ports are unique and in range
146+
assert len(set(ports)) == len(ports)
147+
assert all(8000 <= port < 8005 for port in ports)
148+
149+
150+
def test_mixed_protocol_port_allocation():
151+
"""Test allocating both HTTP and gRPC ports for the same replica."""
152+
manager = NodePortManager.get_node_manager("node-9")
153+
154+
http_port = manager.allocate_port("replica-1", RequestProtocol.HTTP)
155+
grpc_port = manager.allocate_port("replica-1", RequestProtocol.GRPC)
156+
157+
assert 8000 <= http_port < 8005
158+
assert 9000 <= grpc_port < 9005
159+
assert http_port != grpc_port
160+
161+
162+
def test_port_blocking_persistence():
163+
"""Test that blocked ports remain blocked across multiple allocations."""
164+
manager = NodePortManager.get_node_manager("node-10")
165+
166+
# Block a port
167+
port = manager.allocate_port("replica-1", RequestProtocol.HTTP)
168+
manager.release_port("replica-1", port, RequestProtocol.HTTP, block_port=True)
169+
170+
# Try to allocate ports multiple times
171+
for _ in range(3):
172+
new_port = manager.allocate_port("replica-2", RequestProtocol.HTTP)
173+
assert new_port != port
174+
manager.release_port("replica-2", new_port, RequestProtocol.HTTP)
175+
176+
177+
def test_check_replica_port_allocated():
178+
"""Test the check_replica_port_allocated method."""
179+
manager = NodePortManager.get_node_manager("node-11")
180+
181+
# Test case where port is allocated
182+
port = manager.allocate_port("replica-1", RequestProtocol.HTTP)
183+
assert manager.get_port("replica-1", RequestProtocol.HTTP) == port
184+
185+
# Test case where port is not allocated
186+
with pytest.raises(
187+
ValueError,
188+
match="HTTP port not allocated for replica non-existent on node node-11",
189+
):
190+
manager.get_port("non-existent", RequestProtocol.HTTP)
191+
192+
# Clean up
193+
manager.release_port("replica-1", port, RequestProtocol.HTTP)
194+
195+
196+
if __name__ == "__main__":
197+
sys.exit(pytest.main(["-v", "-s", __file__]))

0 commit comments

Comments
 (0)