Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions NanoCtrl/lua/register_agent.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
-- Atomically register a peer agent hash and set its TTL.
--
-- KEYS[1] = scoped agent key, e.g. "{scope}:agent:{name}" or "agent:{name}"
-- ARGV[1] = TTL seconds
-- ARGV[2] = "1" to reject an existing key, "0" to allow upsert
-- ARGV[3..] = alternating hash field/value pairs

local key = KEYS[1]
local ttl = tonumber(ARGV[1])
local reject_existing = ARGV[2] == "1"

if reject_existing and redis.call("EXISTS", key) == 1 then
return 0
end

for i = 3, #ARGV, 2 do
redis.call("HSET", key, ARGV[i], ARGV[i + 1])
end

redis.call("EXPIRE", key, ttl)
return 1
2 changes: 1 addition & 1 deletion NanoCtrl/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "maturin"

[project]
name = "nanoctrl"
version = "0.0.7"
version = "0.0.8"
description = "Shared NanoCtrl lifecycle client for NanoInfra services"
requires-python = ">=3.10"
dependencies = ["httpx>=0.24"]
Expand Down
31 changes: 18 additions & 13 deletions NanoCtrl/src/redis_repo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub const ENTITY_TTL_SECS: usize = 60;

/// Lua scripts embedded once at compile time.
pub struct LuaScripts {
pub register_agent: String,
pub register_entity: String,
pub unregister_entity: String,
pub heartbeat: String,
Expand All @@ -27,6 +28,7 @@ impl LuaScripts {
/// Load all Lua scripts from embedded source.
pub fn load() -> Result<Self, AppError> {
Ok(Self {
register_agent: include_str!("../lua/register_agent.lua").to_string(),
register_entity: include_str!("../lua/register_entity.lua").to_string(),
unregister_entity: include_str!("../lua/unregister_entity.lua").to_string(),
heartbeat: include_str!("../lua/heartbeat.lua").to_string(),
Expand Down Expand Up @@ -99,15 +101,15 @@ impl RedisRepo {
) -> Result<String, AppError> {
let mut conn = self.conn().await?;

let agent_name = if let Some(alias) = alias {
alias
let (agent_name, alias_provided) = if let Some(alias) = alias {
(alias, true)
} else {
let counter_key = self.scoped_key(scope, &["agent_name_counter"]);
let counter: i64 = redis::cmd("INCR")
.arg(&counter_key)
.query_async(&mut *conn)
.await?;
format!("{name_prefix}-{counter:x}")
(format!("{name_prefix}-{counter:x}"), false)
};

let key = self.scoped_key(scope, &["agent", &agent_name]);
Expand All @@ -116,8 +118,13 @@ impl RedisRepo {
.map(|d| d.as_secs())
.unwrap_or_default()
.to_string();
let mut cmd = redis::cmd("HSET");
cmd.arg(&key)

let mut cmd = redis::cmd("EVAL");
cmd.arg(&*self.scripts.register_agent)
.arg(1)
.arg(&key)
.arg(ENTITY_TTL_SECS)
.arg(if alias_provided { "1" } else { "0" })
.arg("addr")
.arg(address)
.arg("updated_at")
Expand All @@ -138,14 +145,12 @@ impl RedisRepo {
cmd.arg("resource").arg(&resource_json);
cmd.arg("topology").arg(resource_json);
}
cmd.query_async::<()>(&mut *conn).await?;

// Set TTL so stale agents expire if heartbeat stops
redis::cmd("EXPIRE")
.arg(&key)
.arg(ENTITY_TTL_SECS)
.query_async::<()>(&mut *conn)
.await?;
let registered: i64 = cmd.query_async(&mut *conn).await?;
if registered == 0 {
return Err(AppError::Conflict(format!(
"Peer agent alias '{agent_name}' already exists in this scope"
)));
}

tracing::info!("Registered peer agent: {agent_name} (TTL: {ENTITY_TTL_SECS}s)");
Ok(agent_name)
Expand Down
13 changes: 13 additions & 0 deletions dlslime/peer_agent/_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,14 @@ def _register(self) -> None:
httpx.TimeoutException,
httpx.HTTPStatusError,
) as e:
if (
isinstance(e, httpx.HTTPStatusError)
and e.response.status_code == 409
):
raise RuntimeError(
f"PeerAgent {self.alias!r} registration conflict: "
f"{e.response.text}"
) from e
if attempt < max_retries - 1:
wait_time = retry_delay * (2**attempt)
logger.warning(
Expand Down Expand Up @@ -966,6 +974,11 @@ def connect_to(
"""Start connecting to a peer and return a connection handle."""
if not isinstance(peer_alias, str) or not peer_alias:
raise TypeError("connect_to() requires a non-empty peer alias string")
if peer_alias == self.alias:
raise ValueError(
f"connect_to() cannot target this agent's own alias {peer_alias!r}. "
"Start peer agents with distinct aliases."
)

_tlog(f"{self.alias}: connect_to({peer_alias}) ENTER")
t0 = time.perf_counter()
Expand Down
16 changes: 16 additions & 0 deletions examples/python/p2p_rdma_rc_read_ctrl_plane.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
from dlslime import start_peer_agent


EXAMPLE_SCOPE = "ctrl_plane_rc_read_ctrl_plane_example"


def print_topology_discovery(agent, label, peer_aliases):
print(f"\nTopology discovery ({label} view):")
print("Active agents:", agent.list_agents())
Expand All @@ -33,11 +36,15 @@ def print_topology_discovery(agent, label, peer_aliases):
initiator_agent = start_peer_agent(
# alias=None (default) - NanoCtrl will auto-generate unique name
nanoctrl_url="http://127.0.0.1:3000",
alias="dlslime0",
scope=EXAMPLE_SCOPE,
)

target_agent = start_peer_agent(
# alias=None (default) - NanoCtrl will auto-generate unique name
nanoctrl_url="http://127.0.0.1:3000",
alias="dlslime1",
scope=EXAMPLE_SCOPE,
)

# Get allocated names
Expand All @@ -58,6 +65,15 @@ def print_topology_discovery(agent, label, peer_aliases):
[initiator_name, target_name],
)

if (
target_name not in initiator_agent.list_agents()
or initiator_name not in target_agent.list_agents()
):
raise RuntimeError(
"Both peer agents must use the same NanoCtrl scope so they can discover "
f"each other. scope={EXAMPLE_SCOPE!r}"
)

# Connect both sides so each agent has a connection handle to wait on.
print("Connecting peers...")
initiator_conn = initiator_agent.connect_to(target_name, ib_port=1, qp_num=1)
Expand Down
40 changes: 40 additions & 0 deletions tests/python/test_peer_agent_topology_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,13 @@ def test_peer_connection_wait_rejects_names_without_connection():
conn.wait(timeout=0.01)


def test_connect_to_rejects_self_alias():
agent = _bare_agent()

with pytest.raises(ValueError, match="cannot target this agent's own alias"):
agent.connect_to(agent.alias)


def _write(path, text):
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(text, encoding="utf-8")
Expand Down Expand Up @@ -322,6 +329,39 @@ def test_publish_resource_record_and_get_resource_round_trip():
assert resource["nics"][0]["ports"][0]["link_type"] == "RoCE"


def test_peer_agent_scope_isolates_discovery_namespace():
redis_client = FakeRedis()

initiator = _bare_agent(address="10.0.0.1")
initiator.alias = "dlslime1"
initiator._redis_key_prefix = "example_scope"
initiator._redis_client = redis_client
initiator._publish_resource_record()

same_scope_target = _bare_agent(address="10.0.0.2")
same_scope_target.alias = "dlslime0"
same_scope_target._redis_key_prefix = "example_scope"
same_scope_target._redis_client = redis_client
same_scope_target._publish_resource_record()

default_scope_target_with_same_alias = _bare_agent(address="10.0.0.3")
default_scope_target_with_same_alias.alias = "dlslime0"
default_scope_target_with_same_alias._redis_key_prefix = ""
default_scope_target_with_same_alias._redis_client = redis_client
default_scope_target_with_same_alias._publish_resource_record()

assert initiator.list_agents() == ["dlslime0", "dlslime1"]
assert initiator.get_resource("dlslime0") is not None
assert initiator.get_resource("dlslime0")["host"]["address"] == "10.0.0.2"

assert default_scope_target_with_same_alias.list_agents() == ["dlslime0"]
assert (
default_scope_target_with_same_alias.get_resource("dlslime0")["host"]["address"]
== "10.0.0.3"
)
assert default_scope_target_with_same_alias.get_resource("dlslime1") is None


def test_discover_topology_requires_at_least_one_nic(tmp_path):
with pytest.raises(RuntimeError, match="No RDMA devices available"):
discover_topology(
Expand Down