diff --git a/Cargo.lock b/Cargo.lock index 8544cc23..c7b334d5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5897,6 +5897,8 @@ dependencies = [ "pluto-p2p", "pluto-tracing", "rand 0.8.6", + "reqwest 0.13.3", + "serde_json", "thiserror 2.0.18", "tokio", "tokio-util", diff --git a/crates/p2p/src/utils.rs b/crates/p2p/src/utils.rs index 6acc054c..cb91d710 100644 --- a/crates/p2p/src/utils.rs +++ b/crates/p2p/src/utils.rs @@ -28,7 +28,7 @@ use crate::{ /// Returns the external IP and Hostname fields as multiaddrs using the listen /// TCP addresses ports. -pub(crate) fn external_tcp_multiaddrs(cfg: &P2PConfig) -> crate::p2p::Result> { +pub fn external_tcp_multiaddrs(cfg: &P2PConfig) -> crate::p2p::Result> { let addrs = cfg.parse_tcp_addrs()?; let mut ports = vec![]; @@ -60,7 +60,7 @@ pub(crate) fn external_tcp_multiaddrs(cfg: &P2PConfig) -> crate::p2p::Result crate::p2p::Result> { +pub fn external_udp_multiaddrs(cfg: &P2PConfig) -> crate::p2p::Result> { let addrs = cfg.parse_udp_addrs()?; let mut ports = vec![]; diff --git a/crates/relay-server/Cargo.toml b/crates/relay-server/Cargo.toml index f508bc1b..8e04354e 100644 --- a/crates/relay-server/Cargo.toml +++ b/crates/relay-server/Cargo.toml @@ -25,6 +25,8 @@ pluto-p2p.workspace = true pluto-core.workspace = true [dev-dependencies] +reqwest = { workspace = true } +serde_json = { workspace = true } [lints] workspace = true diff --git a/crates/relay-server/src/lib.rs b/crates/relay-server/src/lib.rs index a85f0cd6..dd6d0616 100644 --- a/crates/relay-server/src/lib.rs +++ b/crates/relay-server/src/lib.rs @@ -21,3 +21,6 @@ pub mod utils; pub use error::RelayP2PError; pub(crate) use error::Result; + +#[doc(hidden)] +pub use web::enr_server; diff --git a/crates/relay-server/src/p2p.rs b/crates/relay-server/src/p2p.rs index 951b7766..11e6c142 100644 --- a/crates/relay-server/src/p2p.rs +++ b/crates/relay-server/src/p2p.rs @@ -22,6 +22,7 @@ use pluto_p2p::{ manet::Manet, p2p::{Node, NodeType}, p2p_context::P2PContext, + utils::{external_tcp_multiaddrs, external_udp_multiaddrs}, }; /// Runs a relay P2P node. @@ -69,12 +70,20 @@ pub async fn run_relay_p2p_node( let listeners = Arc::new(RwLock::new(Vec::new())); + // Compute external multiaddrs from external_ip / external_host config so + // they're advertised on `/` and folded into ENR responses on `/enr` even + // when libp2p only sees private listen addresses (e.g., K8s pods behind + // NodePort). + let mut external_addrs = external_tcp_multiaddrs(&config.p2p_config)?; + external_addrs.extend(external_udp_multiaddrs(&config.p2p_config)?); + let enr_server_handle = tokio::spawn(enr_server( server_errors.clone(), config.clone(), key.clone(), *node.local_peer_id(), listeners.clone(), + external_addrs, ct.child_token(), )); diff --git a/crates/relay-server/src/utils.rs b/crates/relay-server/src/utils.rs index 19ed49e8..7f7d8b36 100644 --- a/crates/relay-server/src/utils.rs +++ b/crates/relay-server/src/utils.rs @@ -61,3 +61,172 @@ pub(crate) fn extract_ip_and_udp_port(addr: &Multiaddr) -> Option<(Ipv4Addr, u16 _ => None, } } + +/// Extracts DNS hostname and TCP port from a `/dns(4|6)//tcp/` +/// multiaddr. +pub(crate) fn extract_dns_and_tcp_port(addr: &Multiaddr) -> Option<(String, u16)> { + let mut host: Option = None; + let mut port: Option = None; + + for protocol in addr.iter() { + match protocol { + Protocol::Dns(h) | Protocol::Dns4(h) | Protocol::Dns6(h) => { + host = Some(h.into_owned()); + } + Protocol::Tcp(p) => port = Some(p), + _ => {} + } + } + + match (host, port) { + (Some(h), Some(p)) => Some((h, p)), + _ => None, + } +} + +/// Extracts DNS hostname and UDP port from a +/// `/dns(4|6)//udp//quic-v1` multiaddr. +pub(crate) fn extract_dns_and_udp_port(addr: &Multiaddr) -> Option<(String, u16)> { + let mut host: Option = None; + let mut port: Option = None; + + for protocol in addr.iter() { + match protocol { + Protocol::Dns(h) | Protocol::Dns4(h) | Protocol::Dns6(h) => { + host = Some(h.into_owned()); + } + Protocol::Udp(p) => port = Some(p), + _ => {} + } + } + + match (host, port) { + (Some(h), Some(p)) => Some((h, p)), + _ => None, + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::net::Ipv6Addr; + + fn ma(s: &str) -> Multiaddr { + s.parse().expect("valid multiaddr") + } + + #[test] + fn is_public_addr_public_ipv4() { + assert!(is_public_addr(&ma("/ip4/1.2.3.4/tcp/8000"))); + } + + #[test] + fn is_public_addr_private_ipv4() { + assert!(!is_public_addr(&ma("/ip4/10.0.0.1/tcp/8000"))); + assert!(!is_public_addr(&ma("/ip4/192.168.1.1/tcp/8000"))); + assert!(!is_public_addr(&ma("/ip4/172.16.0.1/tcp/8000"))); + } + + #[test] + fn is_public_addr_loopback_unspecified_linklocal() { + assert!(!is_public_addr(&ma("/ip4/127.0.0.1/tcp/8000"))); + assert!(!is_public_addr(&ma("/ip4/0.0.0.0/tcp/8000"))); + assert!(!is_public_addr(&ma("/ip4/169.254.1.1/tcp/8000"))); + } + + #[test] + fn is_public_addr_dns_is_not_public() { + // No IP component: function falls through to `false`. + assert!(!is_public_addr(&ma("/dns/example.com/tcp/8000"))); + } + + #[test] + fn extract_ip_and_tcp_port_happy() { + let ip = Ipv4Addr::new(1, 2, 3, 4); + let got = extract_ip_and_tcp_port(&ma("/ip4/1.2.3.4/tcp/8000")).unwrap(); + assert_eq!(got, (ip, 8000)); + } + + #[test] + fn extract_ip_and_tcp_port_missing_ip() { + assert!(extract_ip_and_tcp_port(&ma("/dns/example.com/tcp/8000")).is_none()); + } + + #[test] + fn extract_ip_and_tcp_port_missing_tcp() { + assert!(extract_ip_and_tcp_port(&ma("/ip4/1.2.3.4/udp/8000/quic-v1")).is_none()); + } + + #[test] + fn extract_ip_and_udp_port_quic_v1() { + let ip = Ipv4Addr::new(5, 6, 7, 8); + let got = extract_ip_and_udp_port(&ma("/ip4/5.6.7.8/udp/9000/quic-v1")).unwrap(); + assert_eq!(got, (ip, 9000)); + } + + #[test] + fn extract_ip_and_udp_port_ignores_tcp() { + assert!(extract_ip_and_udp_port(&ma("/ip4/1.2.3.4/tcp/8000")).is_none()); + } + + #[test] + fn extract_dns_and_tcp_port_dns() { + let got = extract_dns_and_tcp_port(&ma("/dns/relay.example.com/tcp/3610")).unwrap(); + assert_eq!(got, ("relay.example.com".to_string(), 3610)); + } + + #[test] + fn extract_dns_and_tcp_port_dns4() { + let got = extract_dns_and_tcp_port(&ma("/dns4/relay.example.com/tcp/3610")).unwrap(); + assert_eq!(got, ("relay.example.com".to_string(), 3610)); + } + + #[test] + fn extract_dns_and_tcp_port_dns6() { + let got = extract_dns_and_tcp_port(&ma("/dns6/relay.example.com/tcp/3610")).unwrap(); + assert_eq!(got, ("relay.example.com".to_string(), 3610)); + } + + #[test] + fn extract_dns_and_tcp_port_skips_ip4() { + assert!(extract_dns_and_tcp_port(&ma("/ip4/1.2.3.4/tcp/3610")).is_none()); + } + + #[test] + fn extract_dns_and_tcp_port_missing_tcp() { + assert!(extract_dns_and_tcp_port(&ma("/dns/relay.example.com/udp/3610/quic-v1")).is_none()); + } + + #[test] + fn extract_dns_and_udp_port_quic_v1() { + let got = extract_dns_and_udp_port(&ma("/dns/relay.example.com/udp/3610/quic-v1")).unwrap(); + assert_eq!(got, ("relay.example.com".to_string(), 3610)); + } + + #[test] + fn extract_dns_and_udp_port_skips_tcp() { + assert!(extract_dns_and_udp_port(&ma("/dns/relay.example.com/tcp/3610")).is_none()); + } + + #[test] + fn extract_dns_and_udp_port_dns4_dns6() { + let got4 = + extract_dns_and_udp_port(&ma("/dns4/relay.example.com/udp/3610/quic-v1")).unwrap(); + assert_eq!(got4, ("relay.example.com".to_string(), 3610)); + + let got6 = + extract_dns_and_udp_port(&ma("/dns6/relay.example.com/udp/3610/quic-v1")).unwrap(); + assert_eq!(got6, ("relay.example.com".to_string(), 3610)); + } + + #[test] + fn ipv6_helpers_do_not_crash() { + // Sanity: IPv6-shaped multiaddrs don't match the IPv4 extractors but + // also don't panic. + let addr: Multiaddr = format!("/ip6/{}/tcp/8000", Ipv6Addr::LOCALHOST) + .parse() + .unwrap(); + assert!(extract_ip_and_tcp_port(&addr).is_none()); + assert!(extract_ip_and_udp_port(&addr).is_none()); + } +} diff --git a/crates/relay-server/src/web.rs b/crates/relay-server/src/web.rs index 20e5f24d..8f29185c 100644 --- a/crates/relay-server/src/web.rs +++ b/crates/relay-server/src/web.rs @@ -36,8 +36,12 @@ pub struct AppState { secret_key: SecretKey, /// The peer ID of this node. peer_id: PeerId, - /// The addresses of this node. + /// The libp2p-discovered listen addresses of this node. addrs: Arc>>, + /// External multiaddrs derived from `external_ip` / `external_host` config. + /// Fixed at startup. Includes `/ip4//...` and + /// `/dns//...` variants for both TCP and UDP/QUIC. + external_addrs: Vec, /// The resolved external host IP (if configured). external_host_ip: Arc>>, } @@ -49,16 +53,34 @@ impl AppState { secret_key: SecretKey, peer_id: PeerId, addrs: Arc>>, + external_addrs: Vec, ) -> Self { Self { p2p_config, secret_key, peer_id, addrs, + external_addrs, external_host_ip: Arc::new(RwLock::new(None)), } } + /// Returns the union of configured external multiaddrs and the live + /// libp2p listen addresses, externals first, deduped while preserving + /// order. Mirrors Go charon's `filterAdvertisedAddrs(externalAddrs, + /// internalAddrs, …)` — listeners are already filtered for private + /// addresses at ingest time when `filter_private_addrs` is set. + async fn advertised_addrs(&self) -> Vec { + let listeners = self.addrs.read().await; + let mut union: Vec = self.external_addrs.clone(); + for addr in listeners.iter() { + if !union.contains(addr) { + union.push(addr.clone()); + } + } + union + } + /// Gets the external host IP if set. async fn get_external_host_ip(&self) -> Option { *self.external_host_ip.read().await @@ -72,13 +94,14 @@ impl AppState { } /// Starts the ENR HTTP server. -#[instrument(skip(server_errors, config, secret_key, peer_id, addrs, ct))] +#[instrument(skip(server_errors, config, secret_key, peer_id, addrs, external_addrs, ct))] pub async fn enr_server( server_errors: mpsc::Sender, config: Config, secret_key: SecretKey, peer_id: PeerId, addrs: Arc>>, + external_addrs: Vec, ct: CancellationToken, ) { let Some(http_addr) = config.http_addr.clone() else { @@ -88,7 +111,13 @@ pub async fn enr_server( info!("Starting ENR server"); - let state = AppState::new(config.p2p_config.clone(), secret_key, peer_id, addrs); + let state = AppState::new( + config.p2p_config.clone(), + secret_key, + peer_id, + addrs, + external_addrs, + ); let state_arc = Arc::new(state); // Start external host resolver task if configured @@ -156,6 +185,7 @@ pub async fn monitoring_server(bind_addr: SocketAddr, ct: CancellationToken) { } /// Error response for HTTP handlers. +#[derive(Debug)] pub struct HandlerError { status: StatusCode, message: String, @@ -174,11 +204,7 @@ pub async fn enr_handler( ) -> std::result::Result { debug!("Getting ENR for node {}", state.peer_id); - let addrs = state.addrs.read().await; - - // Sort addresses with public addresses first - let mut sorted_addrs: Vec = addrs.clone(); - drop(addrs); + let mut sorted_addrs = state.advertised_addrs().await; if sorted_addrs.is_empty() { return Err(HandlerError { @@ -199,18 +225,24 @@ pub async fn enr_handler( let mut udp_addr: Option<(Ipv4Addr, u16)> = None; for addr in &sorted_addrs { - if tcp_addr.is_none() - && utils::is_tcp_addr(addr) - && let Some((ip, port)) = utils::extract_ip_and_tcp_port(addr) - { - tcp_addr = Some((apply_ip_override(&state, ip).await, port)); + if tcp_addr.is_none() && utils::is_tcp_addr(addr) { + if let Some((ip, port)) = utils::extract_ip_and_tcp_port(addr) { + tcp_addr = Some((apply_ip_override(&state, ip).await, port)); + } else if let Some((_host, port)) = utils::extract_dns_and_tcp_port(addr) + && let Some(resolved) = state.get_external_host_ip().await + { + tcp_addr = Some((resolved, port)); + } } - if udp_addr.is_none() - && utils::is_quic_addr(addr) - && let Some((ip, port)) = utils::extract_ip_and_udp_port(addr) - { - udp_addr = Some((apply_ip_override(&state, ip).await, port)); + if udp_addr.is_none() && utils::is_quic_addr(addr) { + if let Some((ip, port)) = utils::extract_ip_and_udp_port(addr) { + udp_addr = Some((apply_ip_override(&state, ip).await, port)); + } else if let Some((_host, port)) = utils::extract_dns_and_udp_port(addr) + && let Some(resolved) = state.get_external_host_ip().await + { + udp_addr = Some((resolved, port)); + } } if tcp_addr.is_some() && udp_addr.is_some() { @@ -290,7 +322,7 @@ pub async fn multiaddr_handler( ) -> std::result::Result>, HandlerError> { debug!("Getting multiaddrs for node {}", state.peer_id); - let addrs = state.addrs.read().await.clone(); + let addrs = state.advertised_addrs().await; // Encapsulate peer ID into each address let full_addrs: Vec = addrs @@ -352,3 +384,257 @@ async fn resolve_external_host(state: Arc, external_host: &str) { } } } + +#[cfg(test)] +mod tests { + use super::*; + use axum::response::IntoResponse; + use libp2p::identity::Keypair; + use pluto_eth2util::enr::Record; + use rand::rngs::OsRng; + + fn ma(s: &str) -> Multiaddr { + s.parse().expect("valid multiaddr") + } + + fn test_state( + external_ip: Option<&str>, + external_host: Option<&str>, + external_addrs: Vec, + listeners: Vec, + ) -> (Arc, PeerId) { + let secret_key = SecretKey::random(&mut OsRng); + let peer_id = Keypair::generate_secp256k1().public().to_peer_id(); + let p2p_config = P2PConfig { + external_ip: external_ip.map(String::from), + external_host: external_host.map(String::from), + ..Default::default() + }; + let state = AppState::new( + p2p_config, + secret_key, + peer_id, + Arc::new(RwLock::new(listeners)), + external_addrs, + ); + (Arc::new(state), peer_id) + } + + // ------ AppState::advertised_addrs ------ + + #[tokio::test] + async fn advertised_addrs_externals_only() { + let externals = vec![ma("/dns/example.com/tcp/3610")]; + let (state, _) = test_state(None, Some("example.com"), externals.clone(), vec![]); + assert_eq!(state.advertised_addrs().await, externals); + } + + #[tokio::test] + async fn advertised_addrs_listeners_only() { + let listeners = vec![ma("/ip4/127.0.0.1/tcp/3610")]; + let (state, _) = test_state(None, None, vec![], listeners.clone()); + assert_eq!(state.advertised_addrs().await, listeners); + } + + #[tokio::test] + async fn advertised_addrs_externals_first_then_listeners() { + let externals = vec![ma("/ip4/1.2.3.4/tcp/3610"), ma("/dns/example.com/tcp/3610")]; + let listeners = vec![ma("/ip4/127.0.0.1/tcp/3610")]; + let (state, _) = test_state( + Some("1.2.3.4"), + Some("example.com"), + externals.clone(), + listeners.clone(), + ); + let got = state.advertised_addrs().await; + // Externals first, in order, then listeners. + assert_eq!(got[0], externals[0]); + assert_eq!(got[1], externals[1]); + assert_eq!(got[2], listeners[0]); + assert_eq!(got.len(), 3); + } + + #[tokio::test] + async fn advertised_addrs_dedupes_listener_matching_external() { + let dup = ma("/ip4/1.2.3.4/tcp/3610"); + let externals = vec![dup.clone()]; + // A listener that's byte-equal to an existing external must not be + // emitted twice. + let listeners = vec![dup.clone(), ma("/ip4/10.0.0.1/tcp/3610")]; + let (state, _) = test_state(Some("1.2.3.4"), None, externals, listeners); + + let got = state.advertised_addrs().await; + assert_eq!(got.len(), 2); + assert_eq!(got[0], dup); + assert_eq!(got[1], ma("/ip4/10.0.0.1/tcp/3610")); + } + + #[tokio::test] + async fn advertised_addrs_empty_when_nothing_configured() { + let (state, _) = test_state(None, None, vec![], vec![]); + assert!(state.advertised_addrs().await.is_empty()); + } + + // ------ multiaddr_handler ------ + + #[tokio::test] + async fn multiaddr_handler_returns_externals_with_peer_id() { + let externals = vec![ + ma("/dns/example.com/tcp/3610"), + ma("/dns/example.com/udp/3610/quic-v1"), + ]; + let (state, peer_id) = test_state(None, Some("example.com"), externals, vec![]); + + let Json(addrs) = multiaddr_handler(State(state)).await.expect("ok"); + let peer = peer_id.to_string(); + assert_eq!( + addrs, + vec![ + format!("/dns/example.com/tcp/3610/p2p/{peer}"), + format!("/dns/example.com/udp/3610/quic-v1/p2p/{peer}"), + ] + ); + } + + #[tokio::test] + async fn multiaddr_handler_empty_when_nothing_configured() { + let (state, _) = test_state(None, None, vec![], vec![]); + let Json(addrs) = multiaddr_handler(State(state)).await.expect("ok"); + assert!(addrs.is_empty()); + } + + #[tokio::test] + async fn multiaddr_handler_union_external_ip_only() { + let externals = vec![ + ma("/ip4/1.2.3.4/tcp/3610"), + ma("/ip4/1.2.3.4/udp/3610/quic-v1"), + ]; + let (state, peer_id) = test_state(Some("1.2.3.4"), None, externals, vec![]); + + let Json(addrs) = multiaddr_handler(State(state)).await.expect("ok"); + let peer = peer_id.to_string(); + assert!( + addrs.contains(&format!("/ip4/1.2.3.4/tcp/3610/p2p/{peer}")), + "got {addrs:?}" + ); + assert!( + addrs.contains(&format!("/ip4/1.2.3.4/udp/3610/quic-v1/p2p/{peer}")), + "got {addrs:?}" + ); + } + + // ------ enr_handler ------ + + fn parse_enr(s: &str) -> Record { + Record::try_from(s).expect("valid ENR string") + } + + #[tokio::test] + async fn enr_handler_500_when_nothing_configured() { + let (state, _) = test_state(None, None, vec![], vec![]); + let err = enr_handler(State(state)).await.expect_err("should be 500"); + let response = err.into_response(); + assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR); + } + + #[tokio::test] + async fn enr_handler_uses_external_ip() { + let externals = vec![ + ma("/ip4/1.2.3.4/tcp/3610"), + ma("/ip4/1.2.3.4/udp/3610/quic-v1"), + ]; + let (state, _) = test_state(Some("1.2.3.4"), None, externals, vec![]); + + let s = enr_handler(State(state)).await.expect("ok"); + let record = parse_enr(&s); + assert_eq!(record.ip().unwrap(), Ipv4Addr::new(1, 2, 3, 4)); + assert_eq!(record.tcp().unwrap(), 3610); + assert_eq!(record.udp().unwrap(), 3610); + } + + #[tokio::test] + async fn enr_handler_external_ip_overrides_listener_ip() { + let externals = vec![ma("/ip4/1.2.3.4/tcp/3610")]; + // Listener has a different IP — `apply_ip_override` must rewrite it. + let listeners = vec![ma("/ip4/127.0.0.1/udp/3610/quic-v1")]; + let (state, _) = test_state(Some("1.2.3.4"), None, externals, listeners); + + let s = enr_handler(State(state)).await.expect("ok"); + let record = parse_enr(&s); + assert_eq!(record.ip().unwrap(), Ipv4Addr::new(1, 2, 3, 4)); + assert_eq!(record.tcp().unwrap(), 3610); + assert_eq!(record.udp().unwrap(), 3610); + } + + #[tokio::test] + async fn enr_handler_dns_fallback_uses_resolved_external_host() { + let externals = vec![ + ma("/dns/example.com/tcp/3610"), + ma("/dns/example.com/udp/3610/quic-v1"), + ]; + let (state, _) = test_state(None, Some("example.com"), externals, vec![]); + + // Simulate the resolver loop populating the cache with a resolved IP. + state + .set_external_host_ip(Some(Ipv4Addr::new(5, 6, 7, 8))) + .await; + + let s = enr_handler(State(state)).await.expect("ok"); + let record = parse_enr(&s); + assert_eq!(record.ip().unwrap(), Ipv4Addr::new(5, 6, 7, 8)); + assert_eq!(record.tcp().unwrap(), 3610); + assert_eq!(record.udp().unwrap(), 3610); + } + + #[tokio::test] + async fn enr_handler_dns_only_without_resolved_ip_returns_500() { + let externals = vec![ + ma("/dns/example.com/tcp/3610"), + ma("/dns/example.com/udp/3610/quic-v1"), + ]; + // No listeners, resolver hasn't populated external_host_ip yet — both + // TCP and UDP scan attempts skip, so the handler bails out. + let (state, _) = test_state(None, Some("example.com"), externals, vec![]); + + let err = enr_handler(State(state)).await.expect_err("should be 500"); + let response = err.into_response(); + assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR); + } + + #[tokio::test] + async fn enr_handler_external_ip_wins_over_external_host() { + // Both external_ip and external_host configured. The IP-form multiaddr + // sorts first (public IP), and apply_ip_override returns external_ip + // before any DNS fallback runs. + let externals = vec![ + ma("/ip4/1.2.3.4/tcp/3610"), + ma("/ip4/1.2.3.4/udp/3610/quic-v1"), + ma("/dns/example.com/tcp/3610"), + ma("/dns/example.com/udp/3610/quic-v1"), + ]; + let (state, _) = test_state(Some("1.2.3.4"), Some("example.com"), externals, vec![]); + state + .set_external_host_ip(Some(Ipv4Addr::new(9, 9, 9, 9))) + .await; + + let s = enr_handler(State(state)).await.expect("ok"); + let record = parse_enr(&s); + // external_ip beats the resolver-cached external_host IP. + assert_eq!(record.ip().unwrap(), Ipv4Addr::new(1, 2, 3, 4)); + } + + #[tokio::test] + async fn enr_handler_public_listener_used_when_no_externals() { + let listeners = vec![ + ma("/ip4/8.8.8.8/tcp/3610"), + ma("/ip4/8.8.8.8/udp/3610/quic-v1"), + ]; + let (state, _) = test_state(None, None, vec![], listeners); + + let s = enr_handler(State(state)).await.expect("ok"); + let record = parse_enr(&s); + assert_eq!(record.ip().unwrap(), Ipv4Addr::new(8, 8, 8, 8)); + assert_eq!(record.tcp().unwrap(), 3610); + assert_eq!(record.udp().unwrap(), 3610); + } +} diff --git a/crates/relay-server/tests/http_integration.rs b/crates/relay-server/tests/http_integration.rs new file mode 100644 index 00000000..1bf6dca1 --- /dev/null +++ b/crates/relay-server/tests/http_integration.rs @@ -0,0 +1,250 @@ +//! End-to-end integration tests for the relay HTTP layer. +//! +//! Spins up the real `enr_server` axum app on an ephemeral port and asserts +//! `/` and `/enr` over a live HTTP socket via `reqwest`. Tests are isolated +//! by binding to `127.0.0.1:0`-equivalent (find free port, then bind), +//! shutting down via `CancellationToken`, and using config-only knobs so no +//! libp2p swarm is started. +//! +//! DNS scenarios use `localhost` (resolved via `/etc/hosts`) so the suite +//! does not rely on a working public-DNS path in CI. + +use std::{net::Ipv4Addr, sync::Arc, time::Duration}; + +use k256::SecretKey; +use libp2p::{Multiaddr, identity::Keypair}; +use pluto_eth2util::enr::Record; +use pluto_p2p::{ + config::P2PConfig, + utils::{external_tcp_multiaddrs, external_udp_multiaddrs}, +}; +use pluto_relay_server::config::Config; +use rand::rngs::OsRng; +use tokio::{ + net::TcpListener, + sync::{RwLock, mpsc}, +}; +use tokio_util::sync::CancellationToken; + +/// Ephemeral port helper: bind 127.0.0.1:0, capture the assigned port, then +/// drop the listener so `enr_server` can bind it. Small TOCTOU window, fine +/// for tests. +async fn pick_free_port() -> u16 { + let l = TcpListener::bind("127.0.0.1:0") + .await + .expect("bind ephemeral"); + let port = l.local_addr().expect("local_addr").port(); + drop(l); + port +} + +/// Constructs a `P2PConfig` with sensible listen addrs so the external-addr +/// helpers produce something to advertise. The listen ports are advisory: +/// `enr_server` only binds the HTTP listener, not p2p sockets. +fn p2p_config(external_ip: Option<&str>, external_host: Option<&str>, port: u16) -> P2PConfig { + P2PConfig { + tcp_addrs: vec![format!("127.0.0.1:{port}")], + udp_addrs: vec![format!("127.0.0.1:{port}")], + external_ip: external_ip.map(String::from), + external_host: external_host.map(String::from), + ..Default::default() + } +} + +/// Spawn an `enr_server` task bound to a free port and return the base URL +/// plus a cancellation handle. +async fn spawn_server( + p2p_config: P2PConfig, + listeners: Vec, +) -> (String, CancellationToken, tokio::task::JoinHandle<()>) { + let http_port = pick_free_port().await; + let http_addr = format!("127.0.0.1:{http_port}"); + + let config = Config::builder() + .http_addr(http_addr.clone()) + .p2p_config(p2p_config.clone()) + .max_res_per_peer(8) + .max_conns(64) + .build(); + + let external_addrs = { + let mut v = external_tcp_multiaddrs(&p2p_config).expect("tcp externals"); + v.extend(external_udp_multiaddrs(&p2p_config).expect("udp externals")); + v + }; + + let secret_key = SecretKey::random(&mut OsRng); + let peer_id = Keypair::generate_secp256k1().public().to_peer_id(); + let listeners = Arc::new(RwLock::new(listeners)); + let ct = CancellationToken::new(); + let (errs, _errs_rx) = mpsc::channel(4); + + let ct_inner = ct.clone(); + let handle = tokio::spawn(pluto_relay_server::enr_server( + errs, + config, + secret_key, + peer_id, + listeners, + external_addrs, + ct_inner, + )); + + // Wait until the server is actually accepting connections — the spawn is + // racy with the bind, and `reqwest` would otherwise hit `ConnectionRefused`. + let base_url = format!("http://{http_addr}"); + let start = std::time::Instant::now(); + let timeout = Duration::from_secs(5); + loop { + match reqwest::Client::new() + .get(format!("{base_url}/")) + .timeout(Duration::from_millis(200)) + .send() + .await + { + Ok(_) => break, + Err(_) if start.elapsed() < timeout => { + tokio::time::sleep(Duration::from_millis(20)).await; + } + Err(e) => panic!("server never came up: {e}"), + } + } + + (base_url, ct, handle) +} + +async fn shutdown(ct: CancellationToken, handle: tokio::task::JoinHandle<()>) { + ct.cancel(); + // The server may take a moment to drain; bound the wait so a hung test + // fails loudly instead of hanging CI. + let _ = tokio::time::timeout(Duration::from_secs(5), handle).await; +} + +// --------------------------------------------------------------------------- +// Scenario 1 — external_ip only +// --------------------------------------------------------------------------- + +#[tokio::test] +async fn external_ip_only_serves_ip4_multiaddrs_and_enr() { + let cfg = p2p_config(Some("1.2.3.4"), None, 3610); + let (base, ct, handle) = spawn_server(cfg, vec![]).await; + + // GET / + let body: Vec = reqwest::get(format!("{base}/")) + .await + .expect("/ request") + .json() + .await + .expect("/ json"); + assert_eq!( + body.len(), + 2, + "expected exactly 2 advertised addrs: {body:?}" + ); + assert!( + body.iter() + .any(|a| a.starts_with("/ip4/1.2.3.4/tcp/3610/p2p/")), + "missing tcp external addr in {body:?}" + ); + assert!( + body.iter() + .any(|a| a.starts_with("/ip4/1.2.3.4/udp/3610/quic-v1/p2p/")), + "missing udp external addr in {body:?}" + ); + + // GET /enr + let resp = reqwest::get(format!("{base}/enr")) + .await + .expect("/enr request"); + assert_eq!(resp.status(), 200); + let enr_str = resp.text().await.expect("/enr body"); + let record = Record::try_from(enr_str.as_str()).expect("valid ENR"); + assert_eq!(record.ip().expect("ip"), Ipv4Addr::new(1, 2, 3, 4)); + assert_eq!(record.tcp().expect("tcp"), 3610); + assert_eq!(record.udp().expect("udp"), 3610); + + shutdown(ct, handle).await; +} + +// --------------------------------------------------------------------------- +// Scenario 2 — nothing configured +// --------------------------------------------------------------------------- + +#[tokio::test] +async fn empty_config_returns_empty_list_and_500_for_enr() { + let cfg = P2PConfig::default(); + let (base, ct, handle) = spawn_server(cfg, vec![]).await; + + // GET / — empty array. + let body: Vec = reqwest::get(format!("{base}/")) + .await + .expect("/ request") + .json() + .await + .expect("/ json"); + assert!(body.is_empty(), "expected []: {body:?}"); + + // GET /enr — 500 "no addresses". + let resp = reqwest::get(format!("{base}/enr")) + .await + .expect("/enr request"); + assert_eq!(resp.status(), 500); + + shutdown(ct, handle).await; +} + +// --------------------------------------------------------------------------- +// Scenario 3 — external_host=localhost; resolver populates 127.0.0.1 +// --------------------------------------------------------------------------- + +#[tokio::test] +async fn external_host_localhost_resolves_for_enr() { + let cfg = p2p_config(None, Some("localhost"), 3610); + let (base, ct, handle) = spawn_server(cfg, vec![]).await; + + // GET / — DNS-form multiaddrs are emitted verbatim, no resolution needed. + let body: Vec = reqwest::get(format!("{base}/")) + .await + .expect("/ request") + .json() + .await + .expect("/ json"); + assert!( + body.iter() + .any(|a| a.starts_with("/dns/localhost/tcp/3610/p2p/")), + "missing dns tcp addr in {body:?}" + ); + assert!( + body.iter() + .any(|a| a.starts_with("/dns/localhost/udp/3610/quic-v1/p2p/")), + "missing dns udp addr in {body:?}" + ); + + // GET /enr — the resolver loop fires immediately on first tick, but the + // server may briefly respond 500 before the cache is populated. Poll + // until 200 or timeout. + let start = std::time::Instant::now(); + let timeout = Duration::from_secs(5); + let record = loop { + let resp = reqwest::get(format!("{base}/enr")) + .await + .expect("/enr request"); + if resp.status() == 200 { + let body = resp.text().await.expect("/enr body"); + break Record::try_from(body.as_str()).expect("valid ENR"); + } + if start.elapsed() >= timeout { + panic!("/enr never returned 200; last status={}", resp.status()); + } + tokio::time::sleep(Duration::from_millis(50)).await; + }; + + let ip = record.ip().expect("ENR has ip"); + // `localhost` may resolve to either 127.0.0.1 (typical) or another + // loopback alias depending on /etc/hosts; just assert it's loopback. + assert!(ip.is_loopback(), "expected loopback IP, got {ip}"); + assert_eq!(record.tcp().expect("tcp"), 3610); + assert_eq!(record.udp().expect("udp"), 3610); + + shutdown(ct, handle).await; +}