Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
1ad988c
feat(dgw): agent tunnel transparent routing + cert renewal
irvingoujAtDevolution Apr 21, 2026
9abb9fb
fix(agent): reconnect after successful cert renewal
irvingoujAtDevolution Apr 21, 2026
f928a36
fix(agent-tunnel): address Copilot review feedback on PR #1741
irvingoujAtDevolution Apr 22, 2026
b6c500d
chore(agent-tunnel): fix CI, relocate tests, elaborate override ratio…
irvingoujAtDevolution Apr 22, 2026
f3cff9d
refactor(agent-tunnel): remove gateway QUIC endpoint self-report
irvingoujAtDevolution Apr 22, 2026
418b925
refactor(dgw): simplify forwarded upstream routing
irvingoujAtDevolution Apr 22, 2026
6c23728
refactor(dgw): extract upstream routing into shared module
irvingoujAtDevolution Apr 23, 2026
f6b6229
feat(agent-tunnel): admin-facing /jet/tunnel/enrollment-string endpoint
irvingoujAtDevolution Apr 23, 2026
ac929d2
feat(utils): add GatewayAgentEnroll / GatewayAgentRead scopes + bump …
irvingoujAtDevolution Apr 23, 2026
a370e1e
chore(agent-tunnel): address review feedback on PR
irvingoujAtDevolution Apr 27, 2026
f338617
chore(upstream): log routing decision for implicit lookups
irvingoujAtDevolution Apr 27, 2026
b011432
style: cargo fmt
irvingoujAtDevolution Apr 27, 2026
e65f199
fix(agent-tunnel): address Copilot follow-up review
irvingoujAtDevolution Apr 27, 2026
fd80bf6
fix(fwd): restore per-mode log message string
irvingoujAtDevolution Apr 27, 2026
6ed4220
chore(utils): release Devolutions.Gateway.Utils 2025.10.2
irvingoujAtDevolution Apr 27, 2026
4e52c64
refactor(agent-tunnel): drop server-side enrollment-string mint
irvingoujAtDevolution Apr 27, 2026
88c2539
refactor(token): collapse two enrollment scopes into agent.enroll
irvingoujAtDevolution Apr 27, 2026
2982a9c
fix(agent-tunnel): periodic cert renewal + KDC scheme guard
irvingoujAtDevolution Apr 27, 2026
37e3d37
fix(agent-tunnel): address Copilot review on agent enrollment PR
irvingoujAtDevolution Apr 27, 2026
0bf8efa
chore(utils): release Devolutions.Gateway.Utils 2026.4.27
irvingoujAtDevolution Apr 27, 2026
b763ab7
feat(agent): --advertise-domains CLI flag + enroll.nu demo helper
irvingoujAtDevolution Apr 27, 2026
e3cfe35
fix(enroll.nu): drop PowerShell-style line continuation
irvingoujAtDevolution Apr 27, 2026
22b1663
fix(enroll.nu): default agent name to 'demo-agent'
irvingoujAtDevolution Apr 27, 2026
b513422
chore(agent-tunnel): address maintainer review feedback
irvingoujAtDevolution Apr 28, 2026
d112b91
refactor(pr2): trim cert renewal and JWT enrollment refactor
irvingoujAtDevolution Apr 29, 2026
69ef61e
refactor(pr2): drop residual D content from agent CLI
irvingoujAtDevolution Apr 29, 2026
1188fbe
refactor(pr2): split cert.rs refactor and agent-tunnel tests into own…
irvingoujAtDevolution Apr 29, 2026
a2b2ff5
refactor(pr2): defer KDC tunnel routing to DGW-384
irvingoujAtDevolution May 12, 2026
839b656
feat(dgw): route KDC traffic through agent tunnel (DGW-384)
irvingoujAtDevolution May 12, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/agent-tunnel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub mod cert;
pub mod enrollment_store;
pub mod listener;
pub mod registry;
pub mod routing;
pub mod stream;

pub use enrollment_store::EnrollmentTokenStore;
Expand Down
13 changes: 8 additions & 5 deletions crates/agent-tunnel/src/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ impl AgentTunnelListener {
let handle = AgentTunnelHandle {
registry: Arc::clone(&registry),
agent_connections: Arc::clone(&agent_connections),
ca_manager,
ca_manager: Arc::clone(&ca_manager),
enrollment_token_store,
};

Expand All @@ -170,6 +170,11 @@ impl AgentTunnelListener {

Ok((listener, handle))
}

/// Returns the local address the QUIC endpoint is bound to.
pub fn local_addr(&self) -> SocketAddr {
self.endpoint.local_addr().expect("endpoint has local addr")
}
}

#[async_trait]
Expand Down Expand Up @@ -202,9 +207,7 @@ impl devolutions_gateway_task::Task for AgentTunnelListener {
let registry = Arc::clone(&self.registry);
let agent_connections = Arc::clone(&self.agent_connections);

conn_handles.spawn(
run_agent_connection(registry, agent_connections, incoming),
);
conn_handles.spawn(run_agent_connection(registry, agent_connections, incoming));
}

// Reap completed connection tasks to prevent unbounded growth.
Expand Down Expand Up @@ -253,7 +256,7 @@ async fn run_agent_connection(

info!(%agent_id, %agent_name, %peer_addr, "Agent authenticated via mTLS");

let peer = Arc::new(AgentPeer::new(agent_id, agent_name, fingerprint));
let peer = Arc::new(AgentPeer::new(agent_id, agent_name.clone(), fingerprint));
registry.register(Arc::clone(&peer)).await;
agent_connections.write().await.insert(agent_id, conn.clone());

Expand Down
90 changes: 90 additions & 0 deletions crates/agent-tunnel/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use serde::Serialize;
use tokio::sync::RwLock as TokioRwLock;
use uuid::Uuid;

use crate::routing::RouteTarget;

/// Duration after which an agent is considered offline if no heartbeat has been received.
pub const AGENT_OFFLINE_TIMEOUT: Duration = Duration::from_secs(90);

Expand All @@ -32,6 +34,33 @@ pub struct RouteAdvertisementState {
pub updated_at: SystemTime,
}

impl RouteAdvertisementState {
/// Match this route set against a parsed target host.
///
/// Returns a specificity score if matched, or `None` if no match.
/// IP subnet matches return `usize::MAX` (always highest priority).
/// Domain matches return the matched domain length (longer = more specific).
pub fn matches_target(&self, target: &RouteTarget) -> Option<usize> {
use std::net::IpAddr;

match target {
// Only IPv4 subnets are currently tracked; only match IPv4 target IPs.
RouteTarget::Ip(IpAddr::V4(ipv4)) => self
.subnets
.iter()
.any(|subnet| subnet.contains(*ipv4))
.then_some(usize::MAX),
RouteTarget::Ip(IpAddr::V6(_)) => None,
RouteTarget::Hostname(hostname) => self
.domains
.iter()
.filter(|adv| adv.domain.matches_hostname(hostname.as_str()))
.map(|adv| adv.domain.as_str().len())
.max(),
}
}
}

impl Default for RouteAdvertisementState {
fn default() -> Self {
let now = SystemTime::now();
Expand Down Expand Up @@ -79,6 +108,34 @@ impl AgentPeer {
self.last_seen.store(now_ms, Ordering::Release);
}

/// Set `last_seen` to an explicit timestamp (milliseconds since UNIX epoch).
///
/// Test-only API — the `_for_test` suffix is the project's signal that
/// production code must not call this. Used by integration tests in
/// `devolutions-gateway/tests/agent_tunnel_*` to force an agent into the
/// "offline" state without waiting for the real timeout to elapse;
/// production code should use [`touch`](Self::touch) instead. We deliberately
/// keep this `pub` instead of `cfg(test)` because the consumer is a
/// different crate's tests, and `cfg(test)` is only set within the
/// declaring crate's own test build.
#[doc(hidden)]
pub fn set_last_seen_for_test(&self, last_seen_ms: u64) {
self.last_seen.store(last_seen_ms, Ordering::Release);
}

/// Overwrite `received_at` on the current route state.
///
/// Test-only API. Intended for tests that need to assert ordering by
/// arrival time without relying on wall-clock `thread::sleep` — which is
/// flaky on platforms with coarse timer resolution (e.g. Windows ~16 ms).
/// See the note on [`set_last_seen_for_test`](Self::set_last_seen_for_test)
/// for why this is `pub` rather than `cfg(test)`.
#[doc(hidden)]
pub fn set_received_at_for_test(&self, received_at: SystemTime) {
let mut state = self.route_state.write();
state.received_at = received_at;
}

/// Returns the last-seen timestamp as milliseconds since UNIX epoch.
pub fn last_seen_ms(&self) -> u64 {
self.last_seen.load(Ordering::Acquire)
Expand Down Expand Up @@ -223,6 +280,39 @@ impl AgentRegistry {
pub async fn agent_infos(&self) -> Vec<AgentInfo> {
self.agents.read().await.values().map(AgentInfo::from).collect()
}

/// Find all online agents that can route to the given parsed target host.
///
/// For IP targets: matches against advertised subnets.
/// For domain targets: uses longest suffix match (more specific domain wins).
///
/// Results with equal specificity are sorted by `received_at` descending (most recent first).
pub async fn find_agents_for(&self, target: &RouteTarget) -> Vec<Arc<AgentPeer>> {
let mut best_specificity: usize = 0;
let mut candidates: Vec<(SystemTime, Arc<AgentPeer>)> = Vec::new();

let agents = self.agents.read().await;
for agent in agents.values() {
if !agent.is_online(AGENT_OFFLINE_TIMEOUT) {
continue;
}

let route_state = agent.route_state();

if let Some(specificity) = route_state.matches_target(target) {
if specificity > best_specificity {
best_specificity = specificity;
candidates.clear();
candidates.push((route_state.received_at, Arc::clone(agent)));
} else if specificity == best_specificity {
candidates.push((route_state.received_at, Arc::clone(agent)));
}
}
}

candidates.sort_by(|a, b| b.0.cmp(&a.0));
candidates.into_iter().map(|(_, agent)| agent).collect()
}
}

impl Default for AgentRegistry {
Expand Down
194 changes: 194 additions & 0 deletions crates/agent-tunnel/src/routing.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
//! Shared routing pipeline for agent tunnel.
//!
//! Consumed by the upstream connection paths (forwarding, RDP clean path,
//! generic client) and by the KDC proxy (HTTP endpoint plus the CredSSP/NLA
//! sub-flow inside `rdp_proxy.rs`) to ensure consistent routing behavior and
//! error messages.

use std::net::IpAddr;
use std::sync::Arc;

use agent_tunnel_proto::DomainName;
use anyhow::{Result, anyhow};
use uuid::Uuid;

use super::listener::AgentTunnelHandle;
use super::registry::{AgentPeer, AgentRegistry};
use super::stream::TunnelStream;

/// A parsed target host used for route matching.
///
/// Routing cares only about the host identity, not the port or scheme used by
/// the eventual connection attempt.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RouteTarget {
Ip(IpAddr),
Hostname(DomainName),
}

impl RouteTarget {
pub fn ip(ip: IpAddr) -> Self {
Self::Ip(ip)
}

pub fn hostname(hostname: impl Into<String>) -> Self {
Self::Hostname(DomainName::new(hostname))
}
}

impl std::fmt::Display for RouteTarget {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Ip(ip) => ip.fmt(f),
Self::Hostname(hostname) => hostname.fmt(f),
}
}
}

/// Result of the routing pipeline.
///
/// Each variant carries enough context for the caller to produce an actionable error message.
#[derive(Debug)]
pub enum RoutingDecision {
/// Route through these agent candidates (try in order, first success wins).
ViaAgent(Vec<Arc<AgentPeer>>),
/// Explicit agent_id was specified but not found in registry.
ExplicitAgentNotFound(Uuid),
/// No agent matched — caller should attempt direct connection.
Direct,
}

/// Determines how to route a connection to the given target.
///
/// Pipeline (in order of priority):
/// 1. Explicit agent_id (from JWT) → route to that agent
/// 2. Target match (IP subnet or domain suffix) → best match wins
/// 3. No match → direct connection
pub async fn resolve_route(
registry: &AgentRegistry,
explicit_agent_id: Option<Uuid>,
target: &RouteTarget,
) -> RoutingDecision {
// Step 1: Explicit agent ID (from JWT)
if let Some(id) = explicit_agent_id {
return match registry.get(&id).await {
Some(agent) => RoutingDecision::ViaAgent(vec![agent]),
None => RoutingDecision::ExplicitAgentNotFound(id),
};
}

// Step 2: Match target against all agents (IP subnet or domain suffix)
let agents = registry.find_agents_for(target).await;

if agents.is_empty() {
RoutingDecision::Direct
} else {
RoutingDecision::ViaAgent(agents)
}
}

/// Attempt to route a connection via the agent tunnel.
///
/// Returns `Ok(Some(stream))` if routed through an agent, `Ok(None)` if the caller
/// should fall through to direct connect, or `Err` if an explicit agent was specified
/// but not found (or all candidates failed).
pub async fn try_route(
handle: Option<&AgentTunnelHandle>,
explicit_agent_id: Option<Uuid>,
target: &RouteTarget,
session_id: Uuid,
target_addr: &str,
) -> Result<Option<(TunnelStream, Arc<AgentPeer>)>> {
let Some(handle) = handle else {
// An explicit `jet_agent_id` claim means the token requires routing via that
// specific agent; silently falling back to a direct connect would bypass the
// intended network boundary. Reject instead.
return match explicit_agent_id {
Some(id) => Err(anyhow!(
"agent {id} specified in token requires agent tunnel routing, but no tunnel handle is configured"
)),
None => Ok(None),
};
};

match resolve_route(handle.registry(), explicit_agent_id, target).await {
RoutingDecision::ExplicitAgentNotFound(id) => {
Err(anyhow!("agent {id} specified in token not found in registry"))
}
RoutingDecision::Direct => Ok(None),
RoutingDecision::ViaAgent(candidates) => {
let result = route_and_connect(handle, &candidates, session_id, target_addr).await?;
Ok(Some(result))
}
}
}

/// Try connecting to target through agent candidates (try-fail-retry).
///
/// Returns the connected `TunnelStream` and the agent that succeeded.
///
/// Callers must handle `RoutingDecision::ExplicitAgentNotFound` and
/// `RoutingDecision::Direct` before calling this function.
pub async fn route_and_connect(
handle: &AgentTunnelHandle,
candidates: &[Arc<AgentPeer>],
session_id: Uuid,
target: &str,
) -> Result<(TunnelStream, Arc<AgentPeer>)> {
if candidates.is_empty() {
return Err(anyhow!("route_and_connect called with empty candidates"));
}

let mut last_error = None;

for agent in candidates {
info!(
agent_id = %agent.agent_id,
agent_name = %agent.name,
%target,
"Routing via agent tunnel"
);

match handle.connect_via_agent(agent.agent_id, session_id, target).await {
Ok(stream) => {
info!(
agent_id = %agent.agent_id,
agent_name = %agent.name,
%target,
"Agent tunnel connection established"
);
return Ok((stream, Arc::clone(agent)));
}
Err(error) => {
warn!(
agent_id = %agent.agent_id,
agent_name = %agent.name,
%target,
error = format!("{error:#}"),
"Agent tunnel connection failed, trying next candidate"
);
last_error = Some(error);
}
}
}

let agent_names: Vec<&str> = candidates.iter().map(|a| a.name.as_str()).collect();
let last_err_msg = last_error.as_ref().map(|e| format!("{e:#}")).unwrap_or_default();

error!(
agent_count = candidates.len(),
%target,
agents = ?agent_names,
last_error = %last_err_msg,
"All agent tunnel candidates failed"
);

Err(last_error.unwrap_or_else(|| {
anyhow!(
"All {} agents matching target '{}' failed to connect. Agents tried: [{}]",
candidates.len(),
target,
agent_names.join(", "),
)
}))
}
Loading
Loading