diff --git a/NanoCtrl/lua/register_agent.lua b/NanoCtrl/lua/register_agent.lua new file mode 100644 index 0000000..141c3a4 --- /dev/null +++ b/NanoCtrl/lua/register_agent.lua @@ -0,0 +1,21 @@ +-- Atomically register a peer agent hash and set its TTL. +-- +-- KEYS[1] = scoped agent key, e.g. "{scope}:agent:{name}" or "agent:{name}" +-- ARGV[1] = TTL seconds +-- ARGV[2] = "1" to reject an existing key, "0" to allow upsert +-- ARGV[3..] = alternating hash field/value pairs + +local key = KEYS[1] +local ttl = tonumber(ARGV[1]) +local reject_existing = ARGV[2] == "1" + +if reject_existing and redis.call("EXISTS", key) == 1 then + return 0 +end + +for i = 3, #ARGV, 2 do + redis.call("HSET", key, ARGV[i], ARGV[i + 1]) +end + +redis.call("EXPIRE", key, ttl) +return 1 diff --git a/NanoCtrl/pyproject.toml b/NanoCtrl/pyproject.toml index 78f322c..f2ed637 100644 --- a/NanoCtrl/pyproject.toml +++ b/NanoCtrl/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "maturin" [project] name = "nanoctrl" -version = "0.0.7" +version = "0.0.8" description = "Shared NanoCtrl lifecycle client for NanoInfra services" requires-python = ">=3.10" dependencies = ["httpx>=0.24"] diff --git a/NanoCtrl/src/redis_repo.rs b/NanoCtrl/src/redis_repo.rs index af43b1b..25b04e7 100644 --- a/NanoCtrl/src/redis_repo.rs +++ b/NanoCtrl/src/redis_repo.rs @@ -18,6 +18,7 @@ pub const ENTITY_TTL_SECS: usize = 60; /// Lua scripts embedded once at compile time. pub struct LuaScripts { + pub register_agent: String, pub register_entity: String, pub unregister_entity: String, pub heartbeat: String, @@ -27,6 +28,7 @@ impl LuaScripts { /// Load all Lua scripts from embedded source. pub fn load() -> Result { Ok(Self { + register_agent: include_str!("../lua/register_agent.lua").to_string(), register_entity: include_str!("../lua/register_entity.lua").to_string(), unregister_entity: include_str!("../lua/unregister_entity.lua").to_string(), heartbeat: include_str!("../lua/heartbeat.lua").to_string(), @@ -99,15 +101,15 @@ impl RedisRepo { ) -> Result { let mut conn = self.conn().await?; - let agent_name = if let Some(alias) = alias { - alias + let (agent_name, alias_provided) = if let Some(alias) = alias { + (alias, true) } else { let counter_key = self.scoped_key(scope, &["agent_name_counter"]); let counter: i64 = redis::cmd("INCR") .arg(&counter_key) .query_async(&mut *conn) .await?; - format!("{name_prefix}-{counter:x}") + (format!("{name_prefix}-{counter:x}"), false) }; let key = self.scoped_key(scope, &["agent", &agent_name]); @@ -116,8 +118,13 @@ impl RedisRepo { .map(|d| d.as_secs()) .unwrap_or_default() .to_string(); - let mut cmd = redis::cmd("HSET"); - cmd.arg(&key) + + let mut cmd = redis::cmd("EVAL"); + cmd.arg(&*self.scripts.register_agent) + .arg(1) + .arg(&key) + .arg(ENTITY_TTL_SECS) + .arg(if alias_provided { "1" } else { "0" }) .arg("addr") .arg(address) .arg("updated_at") @@ -138,14 +145,12 @@ impl RedisRepo { cmd.arg("resource").arg(&resource_json); cmd.arg("topology").arg(resource_json); } - cmd.query_async::<()>(&mut *conn).await?; - - // Set TTL so stale agents expire if heartbeat stops - redis::cmd("EXPIRE") - .arg(&key) - .arg(ENTITY_TTL_SECS) - .query_async::<()>(&mut *conn) - .await?; + let registered: i64 = cmd.query_async(&mut *conn).await?; + if registered == 0 { + return Err(AppError::Conflict(format!( + "Peer agent alias '{agent_name}' already exists in this scope" + ))); + } tracing::info!("Registered peer agent: {agent_name} (TTL: {ENTITY_TTL_SECS}s)"); Ok(agent_name) diff --git a/dlslime/peer_agent/_agent.py b/dlslime/peer_agent/_agent.py index a5a4004..ef44a7f 100644 --- a/dlslime/peer_agent/_agent.py +++ b/dlslime/peer_agent/_agent.py @@ -425,6 +425,14 @@ def _register(self) -> None: httpx.TimeoutException, httpx.HTTPStatusError, ) as e: + if ( + isinstance(e, httpx.HTTPStatusError) + and e.response.status_code == 409 + ): + raise RuntimeError( + f"PeerAgent {self.alias!r} registration conflict: " + f"{e.response.text}" + ) from e if attempt < max_retries - 1: wait_time = retry_delay * (2**attempt) logger.warning( @@ -966,6 +974,11 @@ def connect_to( """Start connecting to a peer and return a connection handle.""" if not isinstance(peer_alias, str) or not peer_alias: raise TypeError("connect_to() requires a non-empty peer alias string") + if peer_alias == self.alias: + raise ValueError( + f"connect_to() cannot target this agent's own alias {peer_alias!r}. " + "Start peer agents with distinct aliases." + ) _tlog(f"{self.alias}: connect_to({peer_alias}) ENTER") t0 = time.perf_counter() diff --git a/examples/python/p2p_rdma_rc_read_ctrl_plane.py b/examples/python/p2p_rdma_rc_read_ctrl_plane.py index d408a85..1d00c6c 100755 --- a/examples/python/p2p_rdma_rc_read_ctrl_plane.py +++ b/examples/python/p2p_rdma_rc_read_ctrl_plane.py @@ -13,6 +13,9 @@ from dlslime import start_peer_agent +EXAMPLE_SCOPE = "ctrl_plane_rc_read_ctrl_plane_example" + + def print_topology_discovery(agent, label, peer_aliases): print(f"\nTopology discovery ({label} view):") print("Active agents:", agent.list_agents()) @@ -33,11 +36,15 @@ def print_topology_discovery(agent, label, peer_aliases): initiator_agent = start_peer_agent( # alias=None (default) - NanoCtrl will auto-generate unique name nanoctrl_url="http://127.0.0.1:3000", + alias="dlslime0", + scope=EXAMPLE_SCOPE, ) target_agent = start_peer_agent( # alias=None (default) - NanoCtrl will auto-generate unique name nanoctrl_url="http://127.0.0.1:3000", + alias="dlslime1", + scope=EXAMPLE_SCOPE, ) # Get allocated names @@ -58,6 +65,15 @@ def print_topology_discovery(agent, label, peer_aliases): [initiator_name, target_name], ) +if ( + target_name not in initiator_agent.list_agents() + or initiator_name not in target_agent.list_agents() +): + raise RuntimeError( + "Both peer agents must use the same NanoCtrl scope so they can discover " + f"each other. scope={EXAMPLE_SCOPE!r}" + ) + # Connect both sides so each agent has a connection handle to wait on. print("Connecting peers...") initiator_conn = initiator_agent.connect_to(target_name, ib_port=1, qp_num=1) diff --git a/tests/python/test_peer_agent_topology_discovery.py b/tests/python/test_peer_agent_topology_discovery.py index ccfac60..a130842 100644 --- a/tests/python/test_peer_agent_topology_discovery.py +++ b/tests/python/test_peer_agent_topology_discovery.py @@ -78,6 +78,13 @@ def test_peer_connection_wait_rejects_names_without_connection(): conn.wait(timeout=0.01) +def test_connect_to_rejects_self_alias(): + agent = _bare_agent() + + with pytest.raises(ValueError, match="cannot target this agent's own alias"): + agent.connect_to(agent.alias) + + def _write(path, text): path.parent.mkdir(parents=True, exist_ok=True) path.write_text(text, encoding="utf-8") @@ -322,6 +329,39 @@ def test_publish_resource_record_and_get_resource_round_trip(): assert resource["nics"][0]["ports"][0]["link_type"] == "RoCE" +def test_peer_agent_scope_isolates_discovery_namespace(): + redis_client = FakeRedis() + + initiator = _bare_agent(address="10.0.0.1") + initiator.alias = "dlslime1" + initiator._redis_key_prefix = "example_scope" + initiator._redis_client = redis_client + initiator._publish_resource_record() + + same_scope_target = _bare_agent(address="10.0.0.2") + same_scope_target.alias = "dlslime0" + same_scope_target._redis_key_prefix = "example_scope" + same_scope_target._redis_client = redis_client + same_scope_target._publish_resource_record() + + default_scope_target_with_same_alias = _bare_agent(address="10.0.0.3") + default_scope_target_with_same_alias.alias = "dlslime0" + default_scope_target_with_same_alias._redis_key_prefix = "" + default_scope_target_with_same_alias._redis_client = redis_client + default_scope_target_with_same_alias._publish_resource_record() + + assert initiator.list_agents() == ["dlslime0", "dlslime1"] + assert initiator.get_resource("dlslime0") is not None + assert initiator.get_resource("dlslime0")["host"]["address"] == "10.0.0.2" + + assert default_scope_target_with_same_alias.list_agents() == ["dlslime0"] + assert ( + default_scope_target_with_same_alias.get_resource("dlslime0")["host"]["address"] + == "10.0.0.3" + ) + assert default_scope_target_with_same_alias.get_resource("dlslime1") is None + + def test_discover_topology_requires_at_least_one_nic(tmp_path): with pytest.raises(RuntimeError, match="No RDMA devices available"): discover_topology(