Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions vtether-cli/src/gc.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use aya::maps::{HashMap, Map, MapData};

use anyhow::Context as _;
use log::info;
use log::{info, warn};

pub const IPPROTO_TCP: u8 = 6;

Expand Down Expand Up @@ -133,7 +133,9 @@ pub fn reap_conntrack(pin_path: &std::path::Path) -> anyhow::Result<GcResult> {
);
}
for key in &expired_keys {
let _ = ct4.remove(key);
ct4.remove(key)
.inspect_err(|error| warn!("failed to remove expired CT entry: {error}"))
.ok();
}

// ---- Phase 2: Purge orphan SNAT entries ----
Expand Down Expand Up @@ -194,7 +196,10 @@ pub fn reap_conntrack(pin_path: &std::path::Path) -> anyhow::Result<GcResult> {
info!("gc: purging {} orphan SNAT entries", orphan_keys.len());
}
for key in &orphan_keys {
let _ = snat4.remove(key);
snat4
.remove(key)
.inspect_err(|error| warn!("failed to remove orphan SNAT entry: {error}"))
.ok();
}
snat_orphans = orphan_keys.len() as u64;
}
Expand Down
47 changes: 46 additions & 1 deletion vtether-cli/src/helper.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use std::io::ErrorKind;
use std::net::Ipv4Addr;
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use std::process::Command;

use anyhow::Context as _;
use log::warn;

const STATE_BASE_DIR: &str = "/run/vtether";

Expand All @@ -28,3 +31,45 @@ pub fn get_interface_ipv4(interface: &str) -> anyhow::Result<Ipv4Addr> {
}
anyhow::bail!("no IPv4 address found on interface '{interface}'")
}

pub fn best_effort_command(mut command: Command, description: &str) {
match command
.status()
.inspect_err(|error| warn!("{description}: {error}"))
{
Ok(status) if !status.success() => {
warn!("{description}: exited with status {status}");
}
_ => {}
}
}

pub fn best_effort_remove_file(path: &Path) {
std::fs::remove_file(path)
.inspect_err(|error| {
if error.kind() != ErrorKind::NotFound {
warn!("failed to remove {}: {error}", path.display());
}
})
.ok();
}

pub fn best_effort_remove_dir(path: &Path) {
std::fs::remove_dir(path)
.inspect_err(|error| {
if error.kind() != ErrorKind::NotFound {
warn!("failed to remove {}: {error}", path.display());
}
})
.ok();
}

pub fn best_effort_remove_dir_all(path: &Path) {
std::fs::remove_dir_all(path)
.inspect_err(|error| {
if error.kind() != ErrorKind::NotFound {
warn!("failed to remove {}: {error}", path.display());
}
})
.ok();
}
34 changes: 30 additions & 4 deletions vtether-cli/src/inspect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::net::Ipv4Addr;

use anyhow::Context as _;
use aya::maps::{HashMap, Map, MapData, PerCpuHashMap, PerCpuValues};
use log::warn;

use crate::{
gc::{CtEntry, Ipv4CtTuple, SnatEntry, ktime_get_ns},
Expand Down Expand Up @@ -39,7 +40,9 @@ pub fn inspect(pin_path: &std::path::Path, verbose: bool) -> anyhow::Result<()>
);

let state_dir = state_dir_for(pin_path);
let interface = std::fs::read_to_string(state_dir.join("interface"))
let interface_path = state_dir.join("interface");
let interface = std::fs::read_to_string(&interface_path)
.inspect_err(|error| warn!("failed to read {}: {error}", interface_path.display()))
.unwrap_or_else(|_| "unknown".to_string());
println!("vtether: attached to {}", interface.trim());

Expand Down Expand Up @@ -104,7 +107,16 @@ pub fn inspect(pin_path: &std::path::Path, verbose: bool) -> anyhow::Result<()>
};
if let Some(s) = route_stats
.as_ref()
.and_then(|m| m.get(&stats_key, 0).ok())
.and_then(|map| {
map.get(&stats_key, 0)
.inspect_err(|error| {
warn!(
"failed to read route stats for rev_nat_index {}: {error}",
svc.rev_nat_index
)
})
.ok()
})
.map(|v| aggregate_stats(&v))
{
print!(
Expand All @@ -129,7 +141,14 @@ pub fn inspect(pin_path: &std::path::Path, verbose: bool) -> anyhow::Result<()>
let map = Map::LruHashMap(map_data);
let ct4: HashMap<_, Ipv4CtTuple, CtEntry> =
HashMap::try_from(map).context("failed to parse CT4 map")?;
Ok(ct4.iter().filter_map(Result::ok).collect())
Ok(ct4
.iter()
.filter_map(|entry| {
entry
.inspect_err(|error| warn!("skipping unreadable CT4 entry: {error}"))
.ok()
})
.collect())
})() {
Ok(entries) => {
println!("\nActive connections: {} CT entries", entries.len());
Expand All @@ -155,7 +174,14 @@ pub fn inspect(pin_path: &std::path::Path, verbose: bool) -> anyhow::Result<()>
let map = Map::LruHashMap(map_data);
let snat4: HashMap<_, Ipv4CtTuple, SnatEntry> =
HashMap::try_from(map).context("failed to parse SNAT4 map")?;
Ok(snat4.iter().filter_map(Result::ok).collect())
Ok(snat4
.iter()
.filter_map(|entry| {
entry
.inspect_err(|error| warn!("skipping unreadable SNAT4 entry: {error}"))
.ok()
})
.collect())
})() {
Ok(entries) => {
println!("\nSNAT4 entries ({}):", entries.len());
Expand Down
103 changes: 61 additions & 42 deletions vtether-cli/src/proxy.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
use std::net::{Ipv4Addr, SocketAddrV4};
use std::path::PathBuf;
use std::process::Command;

use anyhow::Context as _;
use aya::maps::{Array, HashMap};
use aya::programs::links::FdLink;
use aya::programs::{Xdp, XdpFlags};
use log::info;
use log::{info, warn};
use serde::Deserialize;

use crate::{
gc::{
GC_INTERVAL_DEFAULT_SECS, GC_INTERVAL_MAX_SECS, GC_INTERVAL_MIN_SECS, IPPROTO_TCP,
adapt_gc_interval, reap_conntrack,
},
helper::{get_interface_ipv4, state_dir_for},
helper::{
best_effort_command, best_effort_remove_dir, best_effort_remove_dir_all,
best_effort_remove_file, get_interface_ipv4, state_dir_for,
},
setup::setup_sysctl,
};

Expand Down Expand Up @@ -122,6 +126,28 @@ pub struct SnatConfig {
}
unsafe impl aya::Pod for SnatConfig {}

fn cleanup_partial_proxy_state(
interface: &str,
prog_pin: &std::path::Path,
pin_path: &std::path::Path,
state_dir: &std::path::Path,
) {
let mut detach_xdp = Command::new("ip");
detach_xdp.args(["link", "set", "dev", interface, "xdp", "off"]);
best_effort_command(
detach_xdp,
&format!("failed to detach XDP from {interface}"),
);

best_effort_remove_file(prog_pin);
best_effort_remove_file(&pin_path.join("link"));
for &(_, pin_name) in MAP_PINS {
best_effort_remove_file(&pin_path.join(pin_name));
}
best_effort_remove_dir(pin_path);
best_effort_remove_dir_all(state_dir);
}

#[allow(clippy::too_many_lines)]
pub async fn proxy_up(config_path: PathBuf, pin_path: PathBuf) -> anyhow::Result<()> {
let config_str = std::fs::read_to_string(&config_path)
Expand Down Expand Up @@ -288,9 +314,9 @@ pub async fn proxy_up(config_path: PathBuf, pin_path: PathBuf) -> anyhow::Result
}

// Init eBPF logger
if let Err(e) = aya_log::EbpfLogger::init(&mut ebpf) {
log::warn!("failed to init eBPF logger: {e}");
}
aya_log::EbpfLogger::init(&mut ebpf)
.inspect_err(|error| warn!("failed to init eBPF logger: {error}"))
.ok();

// Pin maps
std::fs::create_dir_all(&pin_path)
Expand Down Expand Up @@ -336,19 +362,12 @@ pub async fn proxy_up(config_path: PathBuf, pin_path: PathBuf) -> anyhow::Result
Ok(())
};

if let Err(e) = finish() {
let _ = std::process::Command::new("ip")
.args(["link", "set", "dev", &config.interface, "xdp", "off"])
.status();
let _ = std::fs::remove_file(&prog_pin);
let _ = std::fs::remove_file(pin_path.join("link"));
for &(_, pin_name) in MAP_PINS {
let _ = std::fs::remove_file(pin_path.join(pin_name));
}
let _ = std::fs::remove_dir(&pin_path);
let _ = std::fs::remove_dir_all(&state_dir);
return Err(e.context("proxy up failed, cleaned up partial state"));
}
finish()
.inspect_err(|error| {
warn!("proxy up finalization failed: {error:#}");
cleanup_partial_proxy_state(&config.interface, &prog_pin, &pin_path, &state_dir);
})
.map_err(|error| error.context("proxy up failed, cleaned up partial state"))?;

info!(
"proxy up: xdp on {}, snat_ip: {}, conntrack: {}",
Expand All @@ -372,23 +391,20 @@ pub async fn proxy_up(config_path: PathBuf, pin_path: PathBuf) -> anyhow::Result
);
loop {
tokio::time::sleep(std::time::Duration::from_secs(gc_interval_secs)).await;
match reap_conntrack(&reaper_pin_path) {
Ok(result) => {
if result.expired > 0 || result.orphans > 0 {
info!(
"gc cycle: total={} expired={} orphans={} next_interval={}s",
result.total,
result.expired,
result.orphans,
adapt_gc_interval(gc_interval_secs, result.total, result.expired),
);
}
gc_interval_secs =
adapt_gc_interval(gc_interval_secs, result.total, result.expired);
}
Err(e) => {
log::warn!("conntrack gc error: {e:#}");
if let Ok(result) = reap_conntrack(&reaper_pin_path)
.inspect_err(|error| warn!("conntrack gc error: {error:#}"))
{
if result.expired > 0 || result.orphans > 0 {
info!(
"gc cycle: total={} expired={} orphans={} next_interval={}s",
result.total,
result.expired,
result.orphans,
adapt_gc_interval(gc_interval_secs, result.total, result.expired),
);
}
gc_interval_secs =
adapt_gc_interval(gc_interval_secs, result.total, result.expired);
}
}
});
Expand Down Expand Up @@ -432,17 +448,20 @@ pub fn proxy_destroy(pin_path: &std::path::Path) -> anyhow::Result<()> {
link.unpin().context("failed to unpin link")?;
}

let _ = std::process::Command::new("ip")
.args(["link", "set", "dev", interface.trim(), "xdp", "off"])
.status();
let mut detach_xdp = Command::new("ip");
detach_xdp.args(["link", "set", "dev", interface.trim(), "xdp", "off"]);
best_effort_command(
detach_xdp,
&format!("failed to detach XDP from {}", interface.trim()),
);

let _ = std::fs::remove_file(&prog_pin);
let _ = std::fs::remove_file(pin_path.join("link"));
best_effort_remove_file(&prog_pin);
best_effort_remove_file(&pin_path.join("link"));
for &(_, pin_name) in MAP_PINS {
let _ = std::fs::remove_file(pin_path.join(pin_name));
best_effort_remove_file(&pin_path.join(pin_name));
}
let _ = std::fs::remove_dir(pin_path);
let _ = std::fs::remove_dir_all(&state_dir);
best_effort_remove_dir(pin_path);
best_effort_remove_dir_all(&state_dir);

println!(
"vtether: proxy destroy (detached from {})",
Expand Down
33 changes: 22 additions & 11 deletions vtether-cli/src/setup.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use std::path::PathBuf;
use std::process::Command;

use anyhow::Context as _;

use crate::helper::best_effort_command;

pub const DEFAULT_CONFIG_PATH: &str = "/etc/vtether/config.yaml";
const SYSTEMD_UNIT_PATH: &str = "/etc/systemd/system/vtether.service";

Expand Down Expand Up @@ -65,9 +68,16 @@ fn get_default_interface() -> anyhow::Result<String> {

pub fn setup() -> anyhow::Result<()> {
let vtether_bin = std::env::current_exe().context("failed to determine vtether binary path")?;
let vtether_bin = vtether_bin.canonicalize().unwrap_or(vtether_bin);
let vtether_bin = vtether_bin
.canonicalize()
.inspect_err(|error| log::warn!("failed to canonicalize current executable: {error}"))
.unwrap_or(vtether_bin);

let default_iface = get_default_interface().unwrap_or_else(|_| "eth0".to_string());
let default_iface = get_default_interface()
.inspect_err(|error| {
log::warn!("failed to detect default interface, falling back to eth0: {error:#}")
})
.unwrap_or_else(|_| "eth0".to_string());

let config_dir = PathBuf::from(DEFAULT_CONFIG_PATH)
.parent()
Expand Down Expand Up @@ -109,22 +119,23 @@ pub fn setup() -> anyhow::Result<()> {
}

pub fn remove() -> anyhow::Result<()> {
let _ = std::process::Command::new("systemctl")
.args(["stop", "vtether"])
.status();
let _ = std::process::Command::new("systemctl")
.args(["disable", "vtether"])
.status();
let mut stop = Command::new("systemctl");
stop.args(["stop", "vtether"]);
best_effort_command(stop, "failed to stop vtether service");

let mut disable = Command::new("systemctl");
disable.args(["disable", "vtether"]);
best_effort_command(disable, "failed to disable vtether service");

if PathBuf::from(SYSTEMD_UNIT_PATH).exists() {
std::fs::remove_file(SYSTEMD_UNIT_PATH)
.with_context(|| format!("failed to remove {SYSTEMD_UNIT_PATH}"))?;
println!(" removed {SYSTEMD_UNIT_PATH}");
}

let _ = std::process::Command::new("systemctl")
.args(["daemon-reload"])
.status();
let mut daemon_reload = Command::new("systemctl");
daemon_reload.args(["daemon-reload"]);
best_effort_command(daemon_reload, "failed to reload systemd units");

println!("\nvtether removed.");
Ok(())
Expand Down
Loading