diff --git a/go.mod b/go.mod index 98e708e..4853b31 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/vishvananda/netlink v1.3.1 golang.org/x/sync v0.16.0 golang.org/x/sys v0.35.0 + golang.zx2c4.com/wireguard v0.0.0-20231211153847-12269c276173 gvisor.dev/gvisor v0.0.0-20250606001031-fa4c4dd86b43 ) @@ -25,5 +26,6 @@ require ( golang.org/x/crypto v0.41.0 // indirect golang.org/x/net v0.43.0 // indirect golang.org/x/time v0.7.0 // indirect + golang.zx2c4.com/wintun v0.0.0-20230126152724-0fa3db229ce2 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index bab901a..7f2d171 100644 --- a/go.sum +++ b/go.sum @@ -58,6 +58,10 @@ golang.org/x/time v0.7.0 h1:ntUhktv3OPE6TgYxXWv9vKvUSJyIFJlyohwbkEwPrKQ= golang.org/x/time v0.7.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.zx2c4.com/wintun v0.0.0-20230126152724-0fa3db229ce2 h1:B82qJJgjvYKsXS9jeunTOisW56dUokqW/FOteYJJ/yg= +golang.zx2c4.com/wintun v0.0.0-20230126152724-0fa3db229ce2/go.mod h1:deeaetjYA+DHMHg+sMSMI58GrEteJUUzzw7en6TJQcI= +golang.zx2c4.com/wireguard v0.0.0-20231211153847-12269c276173 h1:/jFs0duh4rdb8uIfPMv78iAJGcPKDeqAFnaLBropIC4= +golang.zx2c4.com/wireguard v0.0.0-20231211153847-12269c276173/go.mod h1:tkCQ4FQXmpAgYVh++1cq16/dH4QJtmvpRv19DWGAHSA= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/vtep/afxdp/datapath.go b/vtep/afxdp/datapath.go new file mode 100644 index 0000000..9dc4482 --- /dev/null +++ b/vtep/afxdp/datapath.go @@ -0,0 +1,53 @@ +//go:build linux + +// Package afxdp adapts the zero-copy AF_XDP forwarder.Forwarder to the +// vtep.Datapath seam. It is the privileged, kernel-zero-copy driver of the vtep +// family (veth + driver-XDP NIC, shared UMEM, in-place transform), used for the +// per-node / tunnelproxy VTEP where the overlay consumer is the host kernel. +// +// Unlike the netstack and tun drivers — which drive the engine through the +// cross-buffer EngineXfrm contract — the AF_XDP driver keeps the in-place +// forwarder.Handler contract and its shared-UMEM zero-copy fast path entirely +// internally. This wrapper is a pure lifecycle adapter (Run/Close delegate to the +// forwarder's existing per-queue poll loop); it deliberately does not route the +// hot path through EngineXfrm, whose copy-based contract would forfeit the +// zero-copy handoff. The forwarder's in-place loop, shared UMEM, and +// minInPlaceHeadroom invariant are untouched. +package afxdp + +import ( + "context" + + "github.com/apoxy-dev/icx/forwarder" + "github.com/apoxy-dev/icx/vtep" +) + +// Datapath wraps a *forwarder.Forwarder so it satisfies vtep.Datapath. The +// wrapped forwarder owns its veth/XDP device and AF_XDP underlay and releases +// them on Close. +type Datapath struct { + fwd *forwarder.Forwarder +} + +var _ vtep.Datapath = (*Datapath)(nil) + +// Wrap adapts an already-constructed forwarder.Forwarder (built via +// forwarder.NewForwarder with the desired phy/virt/filter options) to the +// vtep.Datapath interface. The caller transfers ownership: the returned +// Datapath's Close closes the forwarder. +func Wrap(fwd *forwarder.Forwarder) *Datapath { + return &Datapath{fwd: fwd} +} + +// Run drives the forwarder's per-queue poll loop, blocking until ctx is +// cancelled or a queue errors, then returns. Run must not be called more than +// once (the forwarder's own guards apply). +func (d *Datapath) Run(ctx context.Context) error { + return d.fwd.Start(ctx) +} + +// Close releases the forwarder's device and AF_XDP resources. It is idempotent +// and safe to call after Run returns. +func (d *Datapath) Close() error { + return d.fwd.Close() +} diff --git a/vtep/tun/datapath.go b/vtep/tun/datapath.go new file mode 100644 index 0000000..3634f32 --- /dev/null +++ b/vtep/tun/datapath.go @@ -0,0 +1,468 @@ +// Package tun implements the tun VTEP datapath driver: it splices a kernel +// /dev/net/tun device (the overlay-side, L3 link the consumer routes to) to the +// ICX engine, moving encap'd frames over a UDP-socket underlay. +// +// It is the kernel-device, NET_ADMIN-only driver of the vtep family. Unlike the +// netstack driver — which is handed both its endpoint and underlay by the +// consumer and only drives the pump — the tun driver OWNS both the TUN device +// and the UDP socket and tears them down on Close (see vtep.Datapath). This is +// the consumer-locality seam for a normal kernel-socket process (Envoy on the +// backplane): the overlay consumer reaches overlay backends by kernel route, so +// the VTEP must present a kernel device it can route to. +// +// The engine speaks full Ethernet+IP+UDP+Geneve frames on the physical side +// (udp.Encode/udp.Decode own the outer stack); the backplane pod has NET_ADMIN +// but no CAP_NET_RAW, so the underlay is a plain UDP socket and the driver's +// Underlay implementation peels the outer headers on TX and synthesizes them on +// RX (see udpUnderlay). The driver drives the engine strictly through the +// cross-buffer EngineXfrm contract — never the in-place forwarder.Handler path, +// whose shared-UMEM / exact-GCM-overlap contract a copy-based driver cannot +// honor. +package tun + +import ( + "context" + "errors" + "fmt" + "io" + "log/slog" + "net" + "os" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/apoxy-dev/icx/vtep" + "golang.org/x/sync/errgroup" +) + +const ( + // defaultFlushInterval is how often the keep-alive pump flushes + // engine-scheduled frames (EngineXfrm.ToPhy) when there is no overlay traffic. + defaultFlushInterval = 100 * time.Millisecond + + // defaultInnerMTU is the mandatory static inner-MTU clamp for a software/TUN + // VTEP. icx has no PMTUD, so an inner packet whose encapsulated size exceeds + // the underlay path MTU would be black-holed; 1280 (the IPv6 minimum MTU) is + // the universally safe floor (seam doc Risk 3 / APO-794). + defaultInnerMTU = 1280 + + // maxFrameSize bounds a single underlay/decap scratch buffer. + maxFrameSize = 65535 + + // encapHeadroom bounds the outer-header + AEAD-tag overhead added to an inner + // packet on encap: udp.PayloadOffsetIPv6 (62) + the 32-byte Geneve header + + // the 16-byte AES-GCM tag = 110. Rounded up for alignment slack. + encapHeadroom = 128 + + // tunHeadroom is extra room in a TUN read buffer beyond offset+MTU, for the + // device's virtio-net header prefix (the device is created with IFF_VNET_HDR). + tunHeadroom = 64 + + // maxErrorBackoff caps the interruptible sleep a pump takes after consecutive + // non-closed read errors, so a wedged device/socket cannot peg a core. + maxErrorBackoff = time.Second + // maxConsecReadErrors is how many consecutive non-closed read errors a pump + // tolerates before treating the device/socket as fatally wedged and returning, + // so Run tears down and a supervisor can learn and restart rather than the + // datapath warning-storming forever. + maxConsecReadErrors = 16 +) + +// Device is the overlay-side L3 TUN device the driver owns. It is the subset of +// golang.zx2c4.com/wireguard/tun.Device the datapath uses; the real /dev/net/tun +// device (device_linux.go) and the in-memory test fake both satisfy it. Packets +// are raw L3 IP (the device is opened IFF_NO_PI); Read/Write take a fixed offset +// (Config.DeviceOffset) so the device implementation has the headroom it needs. +type Device interface { + // Read reads up to len(bufs) packets, writing packet i into bufs[i][offset:] + // and its length into sizes[i] (len(sizes) >= len(bufs)); returns the packet + // count. It blocks until at least one packet is available or the device is + // closed (after which it returns a closed error). + Read(bufs [][]byte, sizes []int, offset int) (int, error) + // Write writes len(bufs) packets, taking packet i from bufs[i][offset:]. + Write(bufs [][]byte, offset int) (int, error) + // BatchSize is the max number of packets a single Read/Write handles. + BatchSize() int + io.Closer +} + +// Underlay is the encap'd-frame transport the driver owns. It carries full +// Ethernet+IP+UDP+Geneve "phy" frames to/from the engine — the same contract the +// netstack driver's Underlay uses — peeling the outer headers onto a UDP socket +// internally (see udpUnderlay). It additionally satisfies io.Closer because the +// tun driver owns the socket's lifecycle; closing it unblocks a running +// inbound pump. +type Underlay interface { + // ReadFrame reads a single underlay frame into buf, returning its length. A + // length of 0 with a nil error is skipped. Once Close has been called it must + // return an error satisfying errors.Is(err, net.ErrClosed). + ReadFrame(buf []byte) (int, error) + // WriteFrames writes a batch of full phy frames, returning the number actually + // sent. A short write (n < len(frames)) is permitted; the datapath drains the + // unsent suffix by calling again, so implementations must not drop the tail. A + // zero return with a nil error is treated as a stall. The frames are owned by + // the caller and valid only until the call returns. + WriteFrames(frames [][]byte) (int, error) + io.Closer +} + +// Config configures a Datapath. Engine, Device and Underlay are required. +type Config struct { + // Engine is the ICX engine performing encap/decap + crypto. *icx.Handler + // satisfies this; it must be configured in layer3 mode (WithLayer3VirtFrames), + // since the TUN device carries raw L3 inner packets. + Engine vtep.EngineXfrm + // Device is the overlay-side TUN device. The driver owns it and closes it on + // Close. + Device Device + // Underlay is the encap'd-frame transport. The driver owns it and closes it on + // Close. + Underlay Underlay + // DeviceOffset is the read/write headroom offset into Device buffers. The real + // wireguard TUN needs >= virtioNetHdrLen (10); device_linux.go uses 16. The + // test fake uses 0. Negative values are treated as 0. + DeviceOffset int + // InnerMTU is the overlay MTU; 0 uses defaultInnerMTU (1280). It sizes the TUN + // read/decap buffers and must not exceed what the underlay path can carry. + InnerMTU int + // FlushInterval overrides how often scheduled (keep-alive) frames are flushed. + // Zero uses defaultFlushInterval. + FlushInterval time.Duration +} + +// Datapath splices a TUN device to the ICX engine over a UDP underlay. It +// implements vtep.Datapath. +type Datapath struct { + engine vtep.EngineXfrm + dev Device + underlay Underlay + offset int + innerMTU int + flush time.Duration + + // running guards against a second Run, which would start duplicate pumps + // racing on the same device/underlay. + running atomic.Bool + // closed is set by Close so Run can reject a call after Close per the contract. + closed atomic.Bool + + // done is closed once by Close to stop the keep-alive pump (the data pumps + // stop when their blocking device/underlay read returns a closed error). + done chan struct{} + closeOnce sync.Once +} + +var _ vtep.Datapath = (*Datapath)(nil) + +// New creates a Datapath over an injected device and underlay. The pumps do not +// run until Run is called. +func New(cfg Config) (*Datapath, error) { + if cfg.Engine == nil { + return nil, fmt.Errorf("tun datapath: engine is required") + } + if cfg.Device == nil { + return nil, fmt.Errorf("tun datapath: device is required") + } + if cfg.Underlay == nil { + return nil, fmt.Errorf("tun datapath: underlay is required") + } + flush := cfg.FlushInterval + if flush <= 0 { + flush = defaultFlushInterval + } + mtu := cfg.InnerMTU + if mtu <= 0 { + mtu = defaultInnerMTU + } + off := cfg.DeviceOffset + if off < 0 { + off = 0 + } + return &Datapath{ + engine: cfg.Engine, + dev: cfg.Device, + underlay: cfg.Underlay, + offset: off, + innerMTU: mtu, + flush: flush, + done: make(chan struct{}), + }, nil +} + +// Close stops the datapath and releases the device and underlay it owns. It is +// safe to call after Run returns and idempotent. Closing the device unblocks the +// outbound pump; closing the underlay unblocks the inbound pump; closing done +// stops the keep-alive pump. +func (d *Datapath) Close() error { + d.closeOnce.Do(func() { + d.closed.Store(true) + close(d.done) + _ = d.dev.Close() + _ = d.underlay.Close() + }) + return nil +} + +// Run drives the pump loops and blocks until shutdown. Cancelling ctx (or calling +// Close) tears down the device and underlay, which unblocks the data pumps; Run +// then returns. A nil return means a clean shutdown. Run must not be called more +// than once, nor after Close. +func (d *Datapath) Run(ctx context.Context) error { + if !d.running.CompareAndSwap(false, true) { + return fmt.Errorf("tun datapath: Run already called") + } + if d.closed.Load() { + return fmt.Errorf("tun datapath: Run called after Close") + } + + g, ctx := errgroup.WithContext(ctx) + + // On cancellation, tear down so the blocking pumps unblock and return. + g.Go(func() error { + <-ctx.Done() + _ = d.Close() + return nil + }) + + g.Go(d.outbound) + g.Go(d.inbound) + g.Go(d.keepalive) + + if err := g.Wait(); err != nil && !isClosedErr(err) { + return fmt.Errorf("tun datapath splicing failed: %w", err) + } + return nil +} + +// outbound pumps TUN (overlay L3) -> engine -> underlay (batched encap). +func (d *Datapath) outbound() error { + bs := d.dev.BatchSize() + if bs < 1 { + bs = 1 + } + readBufLen := d.offset + d.innerMTU + tunHeadroom + // Size the encap buffer for the largest inner packet the read buffer can admit + // (innerMTU+tunHeadroom) plus the outer/Geneve/tag overhead, so a packet in the + // (innerMTU, innerMTU+tunHeadroom] range encaps instead of being dropped by the + // VirtToPhy (APO-667) bound. + encBufLen := d.innerMTU + tunHeadroom + encapHeadroom + + readBufs := make([][]byte, bs) + encBufs := make([][]byte, bs) + for i := 0; i < bs; i++ { + readBufs[i] = make([]byte, readBufLen) + encBufs[i] = make([]byte, encBufLen) + } + sizes := make([]int, bs) + frames := make([][]byte, 0, bs) + consecErr := 0 + + for { + n, err := d.dev.Read(readBufs, sizes, d.offset) + if err != nil { + if isClosedErr(err) { + return net.ErrClosed + } + consecErr++ + slog.Warn("tun datapath: error reading from device", + slog.Any("error", err), slog.Int("consecutive", consecErr)) + if consecErr >= maxConsecReadErrors { + return fmt.Errorf("tun datapath: device read failed %d times consecutively: %w", consecErr, err) + } + if !d.backoffOnError(consecErr) { + return net.ErrClosed + } + continue + } + consecErr = 0 + + frames = frames[:0] + for i := 0; i < n; i++ { + if sizes[i] <= 0 || d.offset+sizes[i] > len(readBufs[i]) { + // Defensive: the device contract is single <= MTU packets, but never + // slice past the buffer if it ever reports an oversized length. + continue + } + inner := readBufs[i][d.offset : d.offset+sizes[i]] + m, loop := d.engine.VirtToPhy(inner, encBufs[i]) + if loop { + // The local-reply flag is only set in L2 mode (an inline ARP/ND + // reply that must go back out the overlay). The tun VTEP is L3, so + // this is never expected; drop defensively rather than mis-route it + // onto the underlay. + slog.Debug("tun datapath: unexpected L2 local-reply in L3 mode, dropping") + continue + } + if m > 0 { + frames = append(frames, encBufs[i][:m]) + } + } + if len(frames) == 0 { + continue + } + if err := d.writeFrames(frames); err != nil { + if isClosedErr(err) { + return net.ErrClosed + } + slog.Warn("tun datapath: error writing underlay frames", slog.Any("error", err)) + } + } +} + +// inbound pumps underlay -> engine -> TUN (decap + L3 inject). +func (d *Datapath) inbound() error { + phyBuf := make([]byte, maxFrameSize) + // virtBuf must hold the full decapsulated inner packet at the device offset. + // PhyToVirt does NOT bound its output to the destination buffer — the APO-667 + // seal-overflow bound covers encap only, and AES-GCM Open appends, reallocating + // (and mis-placing the plaintext) if the destination is too small. A peer + // holding the SA key can emit an inner packet up to the underlay frame size, so + // size for the worst case (matching the netstack driver's 65535 buffers) rather + // than the local MTU clamp. + virtBuf := make([]byte, d.offset+maxFrameSize) + writeBatch := make([][]byte, 1) + consecErr := 0 + + for { + n, err := d.underlay.ReadFrame(phyBuf) + if err != nil { + if isClosedErr(err) { + return net.ErrClosed + } + consecErr++ + slog.Warn("tun datapath: error reading underlay frame", + slog.Any("error", err), slog.Int("consecutive", consecErr)) + if consecErr >= maxConsecReadErrors { + return fmt.Errorf("tun datapath: underlay read failed %d times consecutively: %w", consecErr, err) + } + if !d.backoffOnError(consecErr) { + return net.ErrClosed + } + continue + } + consecErr = 0 + if n == 0 { + continue + } + + // Decap into virtBuf at the device offset so the packet can be handed to + // Device.Write without a copy. + m := d.engine.PhyToVirt(phyBuf[:n], virtBuf[d.offset:]) + if m == 0 { + continue + } + if d.offset+m > len(virtBuf) { + // Unreachable given virtBuf is sized to maxFrameSize, but never slice + // past the buffer if that ever changes. + slog.Warn("tun datapath: decapsulated packet exceeds buffer, dropping", slog.Int("len", m)) + continue + } + writeBatch[0] = virtBuf[:d.offset+m] + if _, err := d.dev.Write(writeBatch, d.offset); err != nil { + if isClosedErr(err) { + return net.ErrClosed + } + slog.Warn("tun datapath: error writing to device", slog.Any("error", err)) + } + } +} + +// keepalive pumps engine-scheduled frames (EngineXfrm.ToPhy keep-alives) to the +// underlay on a flush ticker. The data pumps cannot coalesce these the way the +// netstack driver does, because the TUN Read blocks; a dedicated ticker pump is +// the cross-buffer-equivalent way to flush them with no overlay traffic. +func (d *Datapath) keepalive() error { + bs := d.dev.BatchSize() + if bs < 1 { + bs = 1 + } + encBufs := make([][]byte, bs) + for i := range encBufs { + // Size like the outbound encap buffer (tracks the configured MTU + overhead) + // rather than a fixed default, so a future scheduled-frame type that carries + // a payload cannot under-size the buffer. Today ToPhy emits only empty-payload + // keep-alives, so this is headroom, not a live requirement. + encBufs[i] = make([]byte, d.innerMTU+encapHeadroom) + } + frames := make([][]byte, 0, bs) + + ticker := time.NewTicker(d.flush) + defer ticker.Stop() + + for { + select { + case <-d.done: + return nil + case <-ticker.C: + } + + frames = frames[:0] + for i := 0; i < bs; i++ { + m := d.engine.ToPhy(encBufs[i]) + if m == 0 { + break + } + frames = append(frames, encBufs[i][:m]) + } + if len(frames) == 0 { + continue + } + if err := d.writeFrames(frames); err != nil { + if isClosedErr(err) { + return nil // shutting down + } + slog.Warn("tun datapath: error writing keep-alive frames", slog.Any("error", err)) + } + } +} + +// backoffOnError sleeps a bounded, escalating delay after `consec` consecutive +// non-closed read errors so a wedged device/socket cannot peg a core. It returns +// false if the datapath is shutting down (the caller should then return), true to +// retry. The sleep is interruptible by Close. +func (d *Datapath) backoffOnError(consec int) bool { + delay := time.Duration(consec) * 10 * time.Millisecond + if delay > maxErrorBackoff { + delay = maxErrorBackoff + } + timer := time.NewTimer(delay) + defer timer.Stop() + select { + case <-d.done: + return false + case <-timer.C: + return true + } +} + +// writeFrames writes a batch to the underlay, draining short writes so the tail +// of a batch is never silently dropped. +func (d *Datapath) writeFrames(frames [][]byte) error { + for len(frames) > 0 { + n, err := d.underlay.WriteFrames(frames) + if n > 0 { + frames = frames[n:] + } + if err != nil { + return err + } + if n == 0 { + return fmt.Errorf("tun datapath: underlay write stalled, %d frames undelivered", len(frames)) + } + } + return nil +} + +// isClosedErr reports whether err is the benign "the device/socket was closed" +// signal that means a clean shutdown. It covers net.ErrClosed, os.ErrClosed, and +// the wireguard TUN's textual close error. +func isClosedErr(err error) bool { + if err == nil { + return false + } + return errors.Is(err, net.ErrClosed) || + errors.Is(err, os.ErrClosed) || + strings.Contains(err.Error(), "closed") +} diff --git a/vtep/tun/datapath_test.go b/vtep/tun/datapath_test.go new file mode 100644 index 0000000..3b9ac5e --- /dev/null +++ b/vtep/tun/datapath_test.go @@ -0,0 +1,438 @@ +package tun + +import ( + "context" + "net" + "net/netip" + "sync" + "testing" + "time" + + "github.com/apoxy-dev/icx" + "github.com/stretchr/testify/require" + "gvisor.dev/gvisor/pkg/tcpip" + "gvisor.dev/gvisor/pkg/tcpip/header" +) + +// fakeDevice is an in-memory Device: inner packets to hand to Read are pushed on +// readCh; packets captured from Write land on written. Close unblocks a pending +// Read with net.ErrClosed. +type fakeDevice struct { + readCh chan []byte + written chan []byte + closed chan struct{} + closeOnce sync.Once + batch int +} + +func newFakeDevice(batch int) *fakeDevice { + if batch < 1 { + batch = 1 + } + return &fakeDevice{ + readCh: make(chan []byte, 16), + written: make(chan []byte, 16), + closed: make(chan struct{}), + batch: batch, + } +} + +func (f *fakeDevice) BatchSize() int { return f.batch } + +func (f *fakeDevice) Read(bufs [][]byte, sizes []int, offset int) (int, error) { + select { + case p := <-f.readCh: + n := copy(bufs[0][offset:], p) + sizes[0] = n + return 1, nil + case <-f.closed: + return 0, net.ErrClosed + } +} + +func (f *fakeDevice) Write(bufs [][]byte, offset int) (int, error) { + for _, b := range bufs { + pkt := append([]byte(nil), b[offset:]...) + select { + case f.written <- pkt: + case <-f.closed: + return 0, net.ErrClosed + } + } + return len(bufs), nil +} + +func (f *fakeDevice) Close() error { + f.closeOnce.Do(func() { close(f.closed) }) + return nil +} + +// fakeUnderlay is an in-memory Underlay carrying opaque phy frames: inbound +// frames are fed on in, outbound frames captured on out. ReadFrame unblocks with +// net.ErrClosed on Close. +type fakeUnderlay struct { + in chan []byte + out chan []byte + closed chan struct{} + closeOnce sync.Once +} + +func newFakeUnderlay() *fakeUnderlay { + return &fakeUnderlay{ + in: make(chan []byte, 16), + out: make(chan []byte, 16), + closed: make(chan struct{}), + } +} + +func (u *fakeUnderlay) ReadFrame(buf []byte) (int, error) { + select { + case f := <-u.in: + return copy(buf, f), nil + case <-u.closed: + return 0, net.ErrClosed + } +} + +func (u *fakeUnderlay) WriteFrames(frames [][]byte) (int, error) { + for _, f := range frames { + cp := append([]byte(nil), f...) + select { + case u.out <- cp: + case <-u.closed: + return 0, net.ErrClosed + } + } + return len(frames), nil +} + +func (u *fakeUnderlay) Close() error { + u.closeOnce.Do(func() { close(u.closed) }) + return nil +} + +// fakeEngine is an identity EngineXfrm used to isolate pump/lifecycle logic from +// crypto. If toPhyFrame is non-nil, ToPhy emits a copy of it (one per call), +// exercising the keep-alive pump. +type fakeEngine struct { + toPhyFrame []byte +} + +func (e *fakeEngine) VirtToPhy(virt, phy []byte) (int, bool) { return copy(phy, virt), false } +func (e *fakeEngine) PhyToVirt(phy, virt []byte) int { return copy(virt, phy) } +func (e *fakeEngine) ToPhy(phy []byte) int { + if e.toPhyFrame == nil { + return 0 + } + return copy(phy, e.toPhyFrame) +} + +// startRun runs dp.Run in a goroutine and returns a teardown that closes the +// datapath and waits for Run to return. +func startRun(t *testing.T, dp *Datapath) func() { + t.Helper() + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + go func() { + defer close(done) + if err := dp.Run(ctx); err != nil { + t.Errorf("Run: %v", err) + } + }() + return func() { + cancel() + _ = dp.Close() + select { + case <-done: + case <-time.After(2 * time.Second): + t.Error("datapath Run did not return after shutdown") + } + } +} + +func TestOutboundPlumbing(t *testing.T) { + dev := newFakeDevice(1) + ul := newFakeUnderlay() + dp, err := New(Config{Engine: &fakeEngine{}, Device: dev, Underlay: ul}) + require.NoError(t, err) + stop := startRun(t, dp) + defer stop() + + pkt := []byte("inner-overlay-packet") + dev.readCh <- append([]byte(nil), pkt...) + + select { + case got := <-ul.out: + require.Equal(t, pkt, got, "outbound: identity engine must surface the inner packet on the underlay") + case <-time.After(2 * time.Second): + t.Fatal("timeout: outbound packet never reached the underlay") + } +} + +func TestInboundPlumbing(t *testing.T) { + dev := newFakeDevice(1) + ul := newFakeUnderlay() + dp, err := New(Config{Engine: &fakeEngine{}, Device: dev, Underlay: ul}) + require.NoError(t, err) + stop := startRun(t, dp) + defer stop() + + frame := []byte("encapd-underlay-frame") + ul.in <- append([]byte(nil), frame...) + + select { + case got := <-dev.written: + require.Equal(t, frame, got, "inbound: identity engine must surface the underlay frame on the device") + case <-time.After(2 * time.Second): + t.Fatal("timeout: inbound frame never reached the device") + } +} + +func TestKeepalivePlumbing(t *testing.T) { + ka := []byte("keep-alive-frame") + dev := newFakeDevice(1) + ul := newFakeUnderlay() + dp, err := New(Config{ + Engine: &fakeEngine{toPhyFrame: ka}, + Device: dev, + Underlay: ul, + FlushInterval: 5 * time.Millisecond, + }) + require.NoError(t, err) + stop := startRun(t, dp) + defer stop() + + select { + case got := <-ul.out: + require.Equal(t, ka, got, "keep-alive: ToPhy frames must be flushed to the underlay") + case <-time.After(2 * time.Second): + t.Fatal("timeout: no keep-alive frame flushed") + } +} + +func TestCloseStopsRun(t *testing.T) { + dev := newFakeDevice(1) + ul := newFakeUnderlay() + dp, err := New(Config{Engine: &fakeEngine{}, Device: dev, Underlay: ul}) + require.NoError(t, err) + + done := make(chan error, 1) + go func() { done <- dp.Run(context.Background()) }() + + // Let the pumps start, then Close and require a prompt clean return. + time.Sleep(20 * time.Millisecond) + require.NoError(t, dp.Close()) + + select { + case err := <-done: + require.NoError(t, err, "Run must return nil after Close") + case <-time.After(2 * time.Second): + t.Fatal("Run did not return after Close") + } +} + +func TestRunTwiceErrors(t *testing.T) { + dev := newFakeDevice(1) + ul := newFakeUnderlay() + dp, err := New(Config{Engine: &fakeEngine{}, Device: dev, Underlay: ul}) + require.NoError(t, err) + stop := startRun(t, dp) + defer stop() + + time.Sleep(20 * time.Millisecond) + require.Error(t, dp.Run(context.Background()), "second Run must be rejected") +} + +func TestRunAfterCloseErrors(t *testing.T) { + dev := newFakeDevice(1) + ul := newFakeUnderlay() + dp, err := New(Config{Engine: &fakeEngine{}, Device: dev, Underlay: ul}) + require.NoError(t, err) + require.NoError(t, dp.Close()) + require.Error(t, dp.Run(context.Background()), "Run after Close must be rejected per the contract") +} + +func TestNewValidatesConfig(t *testing.T) { + _, err := New(Config{Device: newFakeDevice(1), Underlay: newFakeUnderlay()}) + require.Error(t, err, "missing engine") + _, err = New(Config{Engine: &fakeEngine{}, Underlay: newFakeUnderlay()}) + require.Error(t, err, "missing device") + _, err = New(Config{Engine: &fakeEngine{}, Device: newFakeDevice(1)}) + require.Error(t, err, "missing underlay") +} + +// --- Integration: two real *icx.Handler engines over a real loopback UDP +// underlay, spliced through fake TUN devices. Exercises the full datapath — +// cross-buffer encap/decap + AEAD + Geneve + the peel/synthesize UDP underlay — +// end to end, with no kernel TUN device. --- + +func newPeerHandler(t *testing.T, localIP tcpip.Address, localPort uint16, remoteIP tcpip.Address, remotePort uint16, vni uint) *icx.Handler { + t.Helper() + h, err := icx.NewHandler( + icx.WithLocalAddr(&tcpip.FullAddress{Addr: localIP, Port: localPort}), + icx.WithLayer3VirtFrames(), + ) + require.NoError(t, err) + prefix := netip.MustParsePrefix("192.168.1.0/24") + require.NoError(t, h.AddVirtualNetwork(vni, + &tcpip.FullAddress{Addr: remoteIP, Port: remotePort}, + []icx.Route{{Src: prefix, Dst: prefix}})) + return h +} + +func makeInnerIPv4UDP() []byte { + b := make([]byte, header.IPv4MinimumSize+header.UDPMinimumSize) + ip := header.IPv4(b) + ip.Encode(&header.IPv4Fields{ + TotalLength: uint16(len(b)), + TTL: 64, + Protocol: uint8(header.UDPProtocolNumber), + SrcAddr: tcpip.AddrFrom4([4]byte{192, 168, 1, 1}), + DstAddr: tcpip.AddrFrom4([4]byte{192, 168, 1, 2}), + }) + ip.SetChecksum(^ip.CalculateChecksum()) + u := header.UDP(b[header.IPv4MinimumSize:]) + u.Encode(&header.UDPFields{SrcPort: 1234, DstPort: 5678, Length: header.UDPMinimumSize}) + return b +} + +func makeSizedInnerIPv4(t *testing.T, total int) []byte { + t.Helper() + require.GreaterOrEqual(t, total, header.IPv4MinimumSize+header.UDPMinimumSize) + b := make([]byte, total) + ip := header.IPv4(b) + ip.Encode(&header.IPv4Fields{ + TotalLength: uint16(total), + TTL: 64, + Protocol: uint8(header.UDPProtocolNumber), + SrcAddr: tcpip.AddrFrom4([4]byte{192, 168, 1, 1}), + DstAddr: tcpip.AddrFrom4([4]byte{192, 168, 1, 2}), + }) + ip.SetChecksum(^ip.CalculateChecksum()) + u := header.UDP(b[header.IPv4MinimumSize:]) + u.Encode(&header.UDPFields{SrcPort: 1234, DstPort: 5678, Length: uint16(total - header.IPv4MinimumSize)}) + return b +} + +// TestInboundOversizedDecapNoPanic is the regression for the inbound decap-buffer +// bound: a peer holding the SA key can encapsulate an inner packet larger than the +// receiver's MTU clamp, and PhyToVirt's AES-GCM Open does not bound its output to +// the destination buffer. An undersized inbound buffer would slice out of range +// (panic) on the decapsulated length. The frame is pre-built with a large encap +// buffer so the APO-667 *encap* bound does not drop it, then fed straight to the +// receiver datapath. +func TestInboundOversizedDecapNoPanic(t *testing.T) { + const vni = uint(7) + lo := tcpip.AddrFrom4([4]byte{127, 0, 0, 1}) + hA := newPeerHandler(t, lo, 6081, lo, 6081, vni) + hB := newPeerHandler(t, lo, 6081, lo, 6081, vni) + + const spiAB, spiBA = uint32(0x0A0A0A0A), uint32(0x0B0B0B0B) + var keyAB, keyBA [16]byte + for i := range keyAB { + keyAB[i] = 0xAA + keyBA[i] = 0xBB + } + exp := time.Now().Add(time.Hour) + require.NoError(t, hA.UpdateVirtualNetworkSAs(vni, spiBA, spiAB, keyBA, keyAB, exp)) + require.NoError(t, hB.UpdateVirtualNetworkSAs(vni, spiAB, spiBA, keyAB, keyBA, exp)) + + // 4000-byte inner packet: far above the 1280 clamp + the old inbound buffer. + largeInner := makeSizedInnerIPv4(t, 4000) + phy := make([]byte, maxFrameSize) + m, loop := hA.VirtToPhy(largeInner, phy) + require.False(t, loop) + require.NotZero(t, m, "encap with a large buffer must not be dropped") + + devB := newFakeDevice(1) + ulB := newFakeUnderlay() + dpB, err := New(Config{Engine: hB, Device: devB, Underlay: ulB, FlushInterval: time.Hour}) + require.NoError(t, err) + stop := startRun(t, dpB) + defer stop() + + ulB.in <- append([]byte(nil), phy[:m]...) + select { + case got := <-devB.written: + require.Equal(t, largeInner, got, "oversized inner packet must decapsulate intact, not panic") + case <-time.After(2 * time.Second): + t.Fatal("timeout: oversized inner packet never decapsulated") + } +} + +func mustListenUDP(t *testing.T) *net.UDPConn { + t.Helper() + c, err := net.ListenUDP("udp4", &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1)}) + require.NoError(t, err) + return c +} + +func TestDatapathIntegrationRoundTrip(t *testing.T) { + const vni = uint(7) + + connA := mustListenUDP(t) + connB := mustListenUDP(t) + portA := uint16(connA.LocalAddr().(*net.UDPAddr).Port) + portB := uint16(connB.LocalAddr().(*net.UDPAddr).Port) + lo := tcpip.AddrFrom4([4]byte{127, 0, 0, 1}) + + hA := newPeerHandler(t, lo, portA, lo, portB, vni) + hB := newPeerHandler(t, lo, portB, lo, portA, vni) + + // Directional SAs: A's TX SPI/key == B's RX SPI/key and vice versa. The keys + // differ per direction (the production seam rejects equal rx/tx keys). + const spiAB, spiBA = uint32(0x0A0A0A0A), uint32(0x0B0B0B0B) + var keyAB, keyBA [16]byte + for i := range keyAB { + keyAB[i] = 0xAA + keyBA[i] = 0xBB + } + exp := time.Now().Add(time.Hour) + require.NoError(t, hA.UpdateVirtualNetworkSAs(vni, spiBA, spiAB, keyBA, keyAB, exp)) + require.NoError(t, hB.UpdateVirtualNetworkSAs(vni, spiAB, spiBA, keyAB, keyBA, exp)) + + uuA, err := newUDPUnderlay(connA) + require.NoError(t, err) + uuB, err := newUDPUnderlay(connB) + require.NoError(t, err) + devA := newFakeDevice(1) + devB := newFakeDevice(1) + + // Long flush interval: keep keep-alives out of the assertions. + dpA, err := New(Config{Engine: hA, Device: devA, Underlay: uuA, FlushInterval: time.Hour}) + require.NoError(t, err) + dpB, err := New(Config{Engine: hB, Device: devB, Underlay: uuB, FlushInterval: time.Hour}) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + var wg sync.WaitGroup + wg.Add(2) + go func() { defer wg.Done(); _ = dpA.Run(ctx) }() + go func() { defer wg.Done(); _ = dpB.Run(ctx) }() + t.Cleanup(func() { + cancel() + _ = dpA.Close() + _ = dpB.Close() + wg.Wait() + }) + + inner := makeInnerIPv4UDP() + + // A -> B + devA.readCh <- append([]byte(nil), inner...) + select { + case got := <-devB.written: + require.Equal(t, inner, got, "B must receive A's decapsulated inner packet") + case <-time.After(3 * time.Second): + t.Fatal("timeout: A->B inner packet never reached B") + } + + // B -> A + devB.readCh <- append([]byte(nil), inner...) + select { + case got := <-devA.written: + require.Equal(t, inner, got, "A must receive B's decapsulated inner packet") + case <-time.After(3 * time.Second): + t.Fatal("timeout: B->A inner packet never reached A") + } +} diff --git a/vtep/tun/device_linux.go b/vtep/tun/device_linux.go new file mode 100644 index 0000000..c0a38f0 --- /dev/null +++ b/vtep/tun/device_linux.go @@ -0,0 +1,164 @@ +//go:build linux + +package tun + +import ( + "fmt" + "net" + "net/netip" + "time" + + "github.com/apoxy-dev/icx/vtep" + "github.com/vishvananda/netlink" + "golang.org/x/sys/unix" + wgtun "golang.zx2c4.com/wireguard/tun" +) + +// tunDeviceOffset is the read/write headroom the wireguard TUN device requires. +// CreateTUN opens the device with IFF_VNET_HDR, so Write must leave at least +// virtioNetHdrLen (10) bytes before the packet for the device to prepend the +// virtio-net header in place. 16 (matching wireguard/device.MessageTransportHeaderSize) +// is the conventional value and keeps the packet word-aligned. +const tunDeviceOffset = 16 + +// OpenConfig configures Open. Engine, Name and UnderlayBind are required. +type OpenConfig struct { + // Engine is the ICX engine; it must be configured in layer3 mode. + Engine vtep.EngineXfrm + // Name is the TUN interface name to create (e.g. "icx0"). An empty name lets + // the kernel pick one. + Name string + // OverlayAddrs are the L3 addresses assigned to the TUN device (the overlay + // CIDRs the consumer's stack owns). + OverlayAddrs []netip.Prefix + // Routes are the overlay prefixes routed out the TUN device (link-scoped). + Routes []netip.Prefix + // InnerMTU is the device MTU / inner-MTU clamp; 0 uses defaultInnerMTU (1280). + InnerMTU int + // UnderlayBind is the local UDP address the underlay socket binds to. + UnderlayBind netip.AddrPort + // FlushInterval overrides the keep-alive flush cadence; 0 uses the default. + FlushInterval time.Duration +} + +// Open creates a /dev/net/tun overlay device and a UDP underlay socket, wires +// them to the engine, configures the device's MTU/addresses/routes, and returns +// a ready-to-Run Datapath that OWNS both (Close tears them down). It requires +// NET_ADMIN and access to /dev/net/tun. +func Open(cfg OpenConfig) (*Datapath, error) { + if cfg.Engine == nil { + return nil, fmt.Errorf("tun datapath: engine is required") + } + mtu := cfg.InnerMTU + if mtu <= 0 { + mtu = defaultInnerMTU + } + + dev, err := wgtun.CreateTUN(cfg.Name, mtu) + if err != nil { + return nil, fmt.Errorf("tun datapath: create TUN %q: %w", cfg.Name, err) + } + + // Disable kernel GSO/GRO offload on the device so each Read returns a single + // <= MTU packet (no virtio super-frames the read buffers would have to be + // sized for) and each single-packet Write emits a plain GSO_NONE frame. The + // device keeps IFF_VNET_HDR, so the fixed tunDeviceOffset headroom is still + // required on Read/Write. + f := dev.File() + if f == nil { + // Without the fd we cannot disable offload, and proceeding with GSO/GRO + // enabled risks the kernel handing back super-frames the read buffers are + // not sized for (black-holing traffic). Fail loudly instead. + _ = dev.Close() + return nil, fmt.Errorf("tun datapath: device exposes no file descriptor; cannot disable offload") + } + if err := unix.IoctlSetInt(int(f.Fd()), unix.TUNSETOFFLOAD, 0); err != nil { + _ = dev.Close() + return nil, fmt.Errorf("tun datapath: disable offload: %w", err) + } + + name, err := dev.Name() + if err != nil { + _ = dev.Close() + return nil, fmt.Errorf("tun datapath: device name: %w", err) + } + + if err := configureLink(name, mtu, cfg.OverlayAddrs, cfg.Routes); err != nil { + _ = dev.Close() + return nil, err + } + + uc, err := net.ListenUDP("udp", net.UDPAddrFromAddrPort(cfg.UnderlayBind)) + if err != nil { + _ = dev.Close() + return nil, fmt.Errorf("tun datapath: bind underlay %s: %w", cfg.UnderlayBind, err) + } + uu, err := newUDPUnderlay(uc) + if err != nil { + _ = uc.Close() + _ = dev.Close() + return nil, err + } + + dp, err := New(Config{ + Engine: cfg.Engine, + Device: dev, + Underlay: uu, + DeviceOffset: tunDeviceOffset, + InnerMTU: mtu, + FlushInterval: cfg.FlushInterval, + }) + if err != nil { + _ = uu.Close() + _ = dev.Close() + return nil, err + } + return dp, nil +} + +// configureLink assigns the MTU, addresses and link-scoped routes to the TUN +// device and brings it up. Mirrors the apoxy-cli NetlinkRouter setup. +func configureLink(name string, mtu int, addrs, routes []netip.Prefix) error { + link, err := netlink.LinkByName(name) + if err != nil { + return fmt.Errorf("tun datapath: link %q: %w", name, err) + } + if err := netlink.LinkSetMTU(link, mtu); err != nil { + return fmt.Errorf("tun datapath: set MTU on %q: %w", name, err) + } + for _, p := range addrs { + // An interface address keeps its host bits (e.g. 10.0.0.5/24), so use the + // address as given, not the masked network. + ipnet := &net.IPNet{ + IP: net.IP(p.Addr().AsSlice()), + Mask: net.CIDRMask(p.Bits(), p.Addr().BitLen()), + } + if err := netlink.AddrAdd(link, &netlink.Addr{IPNet: ipnet}); err != nil { + return fmt.Errorf("tun datapath: add addr %s to %q: %w", p, name, err) + } + } + if err := netlink.LinkSetUp(link); err != nil { + return fmt.Errorf("tun datapath: bring up %q: %w", name, err) + } + for _, p := range routes { + // A route destination is a network, so mask off any host bits. + m := p.Masked() + dst := &net.IPNet{ + IP: net.IP(m.Addr().AsSlice()), + Mask: net.CIDRMask(m.Bits(), m.Addr().BitLen()), + } + r := &netlink.Route{ + LinkIndex: link.Attrs().Index, + Dst: dst, + Scope: netlink.SCOPE_LINK, + } + // RouteReplace (not RouteAdd) for idempotency: assigning an OverlayAddr + // already auto-installs the connected route for its prefix, and a reconciler + // may re-run Open, so RouteAdd's EEXIST on an already-present route would + // spuriously fail. Replace ensures the prefix points at our TUN either way. + if err := netlink.RouteReplace(r); err != nil { + return fmt.Errorf("tun datapath: add route %s via %q: %w", p, name, err) + } + } + return nil +} diff --git a/vtep/tun/device_linux_test.go b/vtep/tun/device_linux_test.go new file mode 100644 index 0000000..ce25464 --- /dev/null +++ b/vtep/tun/device_linux_test.go @@ -0,0 +1,66 @@ +//go:build linux + +package tun + +import ( + "context" + "net/netip" + "os" + "testing" + "time" + + "github.com/apoxy-dev/icx" + "github.com/stretchr/testify/require" + "gvisor.dev/gvisor/pkg/tcpip" +) + +// TestOpenRealDeviceSmoke exercises the real /dev/net/tun device-creation glue: +// CreateTUN + offload-disable + netlink addr/route/MTU + the UDP underlay bind, +// then a Run/Close lifecycle. It requires NET_ADMIN and /dev/net/tun, so it skips +// in unprivileged CI; the datapath logic itself is covered cross-platform by +// TestDatapathIntegrationRoundTrip. +func TestOpenRealDeviceSmoke(t *testing.T) { + if os.Geteuid() != 0 { + t.Skip("requires root (NET_ADMIN + /dev/net/tun)") + } + if _, err := os.Stat("/dev/net/tun"); err != nil { + t.Skipf("no /dev/net/tun: %v", err) + } + + h, err := icx.NewHandler( + icx.WithLocalAddr(&tcpip.FullAddress{Addr: tcpip.AddrFrom4([4]byte{127, 0, 0, 1}), Port: 6081}), + icx.WithLayer3VirtFrames(), + ) + require.NoError(t, err) + + dp, err := Open(OpenConfig{ + Engine: h, + Name: "", // let the kernel pick a free name + OverlayAddrs: []netip.Prefix{netip.MustParsePrefix("192.168.77.1/24")}, + // Route both the connected prefix (which assigning the OverlayAddr already + // auto-installs — exercises the idempotent RouteReplace path) and a distinct + // remote overlay prefix (the realistic backplane shape). + Routes: []netip.Prefix{ + netip.MustParsePrefix("192.168.77.0/24"), + netip.MustParsePrefix("10.99.0.0/24"), + }, + InnerMTU: defaultInnerMTU, + UnderlayBind: netip.MustParseAddrPort("127.0.0.1:0"), + }) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan error, 1) + go func() { done <- dp.Run(ctx) }() + + time.Sleep(50 * time.Millisecond) + cancel() + require.NoError(t, dp.Close()) + + select { + case err := <-done: + require.NoError(t, err, "Run must shut down cleanly") + case <-time.After(2 * time.Second): + t.Fatal("Run did not return after Close") + } +} diff --git a/vtep/tun/underlay.go b/vtep/tun/underlay.go new file mode 100644 index 0000000..dc65b40 --- /dev/null +++ b/vtep/tun/underlay.go @@ -0,0 +1,172 @@ +package tun + +import ( + "errors" + "fmt" + "log/slog" + "net" + "net/netip" + + "github.com/apoxy-dev/icx/udp" + "gvisor.dev/gvisor/pkg/tcpip" + "gvisor.dev/gvisor/pkg/tcpip/header" +) + +// Peel/synthesize errors. The engine's phy frame is a full Ethernet+IP+UDP+Geneve +// frame, but the backplane pod has no CAP_NET_RAW for a packet socket, so the +// underlay is a plain UDP socket: WriteFrames peels the outer headers and sends +// only the Geneve UDP payload to the destination the engine selected, and +// ReadFrame synthesizes a matching outer frame around an inbound payload so the +// engine's PhyToVirt (which parses a full L2 frame) can decode it. This is the +// same bridge apoxy-cli's l2pc performs for the netstack driver. +var ( + errShortFrame = errors.New("tun underlay: frame shorter than ethernet header") + errInvalidFrame = errors.New("tun underlay: invalid outer IPv4/IPv6+UDP frame") + errUnsupportedEthertype = errors.New("tun underlay: unsupported ethertype") + errFrameTooLarge = errors.New("tun underlay: frame exceeds buffer") +) + +// udpUnderlay adapts a *net.UDPConn into an Underlay that carries full +// Ethernet+IP+UDP+Geneve phy frames, peeling/synthesizing the outer headers. +type udpUnderlay struct { + conn *net.UDPConn +} + +var _ Underlay = (*udpUnderlay)(nil) + +// newUDPUnderlay wraps a bound UDP socket. The driver owns the socket and closes +// it on Close. +func newUDPUnderlay(conn *net.UDPConn) (*udpUnderlay, error) { + if conn == nil { + return nil, fmt.Errorf("tun underlay: conn is required") + } + if _, ok := conn.LocalAddr().(*net.UDPAddr); !ok { + return nil, fmt.Errorf("tun underlay: conn must be UDP") + } + return &udpUnderlay{conn: conn}, nil +} + +func (u *udpUnderlay) Close() error { return u.conn.Close() } + +// WriteFrames peels each full phy frame to (Geneve payload, outer destination) +// and sends the payload over the UDP socket. A frame that fails to peel (malformed +// engine output) is skipped rather than stalling the batch. On a socket write +// error it returns the number sent so far so the caller can drain the remainder. +func (u *udpUnderlay) WriteFrames(frames [][]byte) (int, error) { + for i, f := range frames { + payload, dst, err := peel(f) + if err != nil { + // peel only ever sees our own engine's VirtToPhy/ToPhy output, so a + // failure here implies an engine bug, not untrusted input — surface it + // rather than dropping silently. Keep the batch moving. + slog.Warn("tun underlay: dropping unpeelable engine frame", slog.Any("error", err)) + continue + } + if _, err := u.conn.WriteToUDPAddrPort(payload, dst); err != nil { + if isClosedErr(err) { + return i, net.ErrClosed + } + return i, err + } + } + return len(frames), nil +} + +// ReadFrame reads one UDP datagram (a Geneve payload) and synthesizes a full +// Ethernet+IP+UDP frame around it into buf, returning the frame length. +func (u *udpUnderlay) ReadFrame(buf []byte) (int, error) { + // Read the payload into the part of buf where it will live for an IPv6 outer + // frame (the larger header room); synthesize then shifts it down for an IPv4 + // outer frame, avoiding a second buffer. + const reserve = 62 // udp.PayloadOffsetIPv6 + if len(buf) <= reserve { + return 0, errFrameTooLarge + } + n, peer, err := u.conn.ReadFromUDPAddrPort(buf[reserve:]) + if err != nil { + return 0, err + } + if n == 0 { + return 0, nil + } + return synthesize(buf, reserve, n, peer) +} + +// peel extracts the Geneve UDP payload and the outer destination AddrPort from a +// full Ethernet+IP+UDP frame produced by the engine's VirtToPhy/ToPhy. The +// returned payload aliases frame. +func peel(frame []byte) ([]byte, netip.AddrPort, error) { + if len(frame) < header.EthernetMinimumSize { + return nil, netip.AddrPort{}, errShortFrame + } + eth := header.Ethernet(frame) + switch eth.Type() { + case header.IPv4ProtocolNumber: + ip := header.IPv4(frame[header.EthernetMinimumSize:]) + if !ip.IsValid(len(ip)) || ip.Protocol() != uint8(header.UDPProtocolNumber) { + return nil, netip.AddrPort{}, errInvalidFrame + } + udpHdr := header.UDP(ip.Payload()) + if len(udpHdr) < header.UDPMinimumSize { + return nil, netip.AddrPort{}, errInvalidFrame + } + dstIP, ok := netip.AddrFromSlice(ip.DestinationAddressSlice()) + if !ok { + return nil, netip.AddrPort{}, errInvalidFrame + } + return udpHdr.Payload(), netip.AddrPortFrom(dstIP.Unmap(), udpHdr.DestinationPort()), nil + + case header.IPv6ProtocolNumber: + ip := header.IPv6(frame[header.EthernetMinimumSize:]) + if !ip.IsValid(len(ip)) || ip.TransportProtocol() != header.UDPProtocolNumber { + return nil, netip.AddrPort{}, errInvalidFrame + } + udpHdr := header.UDP(ip.Payload()) + if len(udpHdr) < header.UDPMinimumSize { + return nil, netip.AddrPort{}, errInvalidFrame + } + dstIP, ok := netip.AddrFromSlice(ip.DestinationAddressSlice()) + if !ok { + return nil, netip.AddrPort{}, errInvalidFrame + } + return udpHdr.Payload(), netip.AddrPortFrom(dstIP.Unmap(), udpHdr.DestinationPort()), nil + + default: + return nil, netip.AddrPort{}, errUnsupportedEthertype + } +} + +// synthesize builds a full Ethernet+IP+UDP frame around the payload currently at +// buf[payloadAt:payloadAt+payloadLen] into buf, returning the frame length. The +// outer family is taken from peer so udp.Decode routes the correct path. The +// outer addresses are decorative to the engine: udp.Decode (called with +// skip-checksum) ignores the MACs and checksum, validating only the length +// fields, and PhyToVirt selects the SA by Geneve SPI and never learns the peer. +// Both outer src and dst are set to peer, which trivially keeps the address +// families equal as udp.Encode requires. +func synthesize(buf []byte, payloadAt, payloadLen int, peer netip.AddrPort) (int, error) { + a := peer.Addr().Unmap() + + var payloadOff int + var fa tcpip.FullAddress + if a.Is4() { + payloadOff = udp.PayloadOffsetIPv4 + fa = tcpip.FullAddress{Addr: tcpip.AddrFrom4(a.As4()), Port: peer.Port()} + } else { + payloadOff = udp.PayloadOffsetIPv6 + fa = tcpip.FullAddress{Addr: tcpip.AddrFrom16(a.As16()), Port: peer.Port()} + } + + if payloadOff+payloadLen > len(buf) { + return 0, errFrameTooLarge + } + if payloadOff != payloadAt { + // IPv4 outer: shift the payload down from the IPv6 reserve to the IPv4 + // offset. copy handles the overlap (dst < src) correctly. + copy(buf[payloadOff:payloadOff+payloadLen], buf[payloadAt:payloadAt+payloadLen]) + } + // skip-checksum: the synthesized frame is consumed only by our own engine's + // PhyToVirt in this process and never goes on a wire; udp.Decode is called + // with skipChecksumValidation, so a real checksum would be wasted work. + return udp.Encode(buf, &fa, &fa, payloadLen, true) +} diff --git a/vtep/tun/underlay_test.go b/vtep/tun/underlay_test.go new file mode 100644 index 0000000..e25b17e --- /dev/null +++ b/vtep/tun/underlay_test.go @@ -0,0 +1,139 @@ +package tun + +import ( + "net" + "net/netip" + "testing" + + "github.com/apoxy-dev/icx/udp" + "github.com/stretchr/testify/require" + "gvisor.dev/gvisor/pkg/tcpip" + "gvisor.dev/gvisor/pkg/tcpip/header" +) + +func fullAddr(ap netip.AddrPort) *tcpip.FullAddress { + a := ap.Addr().Unmap() + if a.Is4() { + return &tcpip.FullAddress{Addr: tcpip.AddrFrom4(a.As4()), Port: ap.Port()} + } + return &tcpip.FullAddress{Addr: tcpip.AddrFrom16(a.As16()), Port: ap.Port()} +} + +// buildPhyFrame builds a full Ethernet+IP+UDP frame carrying payload, the shape +// the engine's VirtToPhy/ToPhy emit and the underlay peels. +func buildPhyFrame(t *testing.T, src, dst netip.AddrPort, payload []byte) []byte { + t.Helper() + off := udp.PayloadOffsetIPv4 + if !dst.Addr().Unmap().Is4() { + off = udp.PayloadOffsetIPv6 + } + buf := make([]byte, off+len(payload)) + copy(buf[off:], payload) + n, err := udp.Encode(buf, fullAddr(src), fullAddr(dst), len(payload), false) + require.NoError(t, err) + return buf[:n] +} + +func TestPeelIPv4(t *testing.T) { + src := netip.MustParseAddrPort("1.2.3.4:1111") + dst := netip.MustParseAddrPort("5.6.7.8:6081") + payload := []byte("geneve-and-ciphertext") + frame := buildPhyFrame(t, src, dst, payload) + + gotPayload, gotDst, err := peel(frame) + require.NoError(t, err) + require.Equal(t, payload, gotPayload) + require.Equal(t, dst, gotDst) +} + +func TestPeelIPv6(t *testing.T) { + src := netip.MustParseAddrPort("[2001:db8::1]:1111") + dst := netip.MustParseAddrPort("[2001:db8::2]:6081") + payload := []byte("geneve-and-ciphertext-v6") + frame := buildPhyFrame(t, src, dst, payload) + + gotPayload, gotDst, err := peel(frame) + require.NoError(t, err) + require.Equal(t, payload, gotPayload) + require.Equal(t, dst, gotDst) +} + +func TestPeelRejectsMalformed(t *testing.T) { + _, _, err := peel([]byte{0x00, 0x01, 0x02}) + require.ErrorIs(t, err, errShortFrame) + + // A 14-byte ethernet header with an unsupported ethertype. + bad := make([]byte, header.EthernetMinimumSize) + header.Ethernet(bad).Encode(&header.EthernetFields{Type: 0x9999}) + _, _, err = peel(bad) + require.ErrorIs(t, err, errUnsupportedEthertype) +} + +// TestSynthesizeRoundTrip checks that a frame synthesized from a payload + peer +// address decodes back to that payload via the same udp.Decode the engine uses +// (skip-checksum), for both address families. The payload starts at the IPv6 +// reserve offset, matching udpUnderlay.ReadFrame's in-place contract. +func TestSynthesizeRoundTrip(t *testing.T) { + const reserve = 62 // udp.PayloadOffsetIPv6 + for _, tc := range []struct { + name string + peer netip.AddrPort + }{ + {"ipv4", netip.MustParseAddrPort("10.0.0.9:6081")}, + {"ipv6", netip.MustParseAddrPort("[fd00::9]:6081")}, + } { + t.Run(tc.name, func(t *testing.T) { + payload := []byte("decryptable-geneve-payload") + buf := make([]byte, 2048) + copy(buf[reserve:], payload) + + n, err := synthesize(buf, reserve, len(payload), tc.peer) + require.NoError(t, err) + + out, err := udp.Decode(buf[:n], nil, true) + require.NoError(t, err) + require.Equal(t, payload, out) + }) + } +} + +func TestSynthesizeRejectsOversized(t *testing.T) { + buf := make([]byte, 100) + _, err := synthesize(buf, 62, 200, netip.MustParseAddrPort("10.0.0.1:6081")) + require.ErrorIs(t, err, errFrameTooLarge) +} + +// TestUDPUnderlayLoopback drives the real udpUnderlay over a loopback UDP socket +// pair: a full phy frame written to A is peeled, sent, received by B, and +// synthesized back into a frame whose decoded payload matches the original. +func TestUDPUnderlayLoopback(t *testing.T) { + connA, err := net.ListenUDP("udp4", &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1)}) + require.NoError(t, err) + connB, err := net.ListenUDP("udp4", &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1)}) + require.NoError(t, err) + + uuA, err := newUDPUnderlay(connA) + require.NoError(t, err) + t.Cleanup(func() { _ = uuA.Close() }) + uuB, err := newUDPUnderlay(connB) + require.NoError(t, err) + t.Cleanup(func() { _ = uuB.Close() }) + + src := connA.LocalAddr().(*net.UDPAddr).AddrPort() + dst := connB.LocalAddr().(*net.UDPAddr).AddrPort() + payload := []byte("geneve-header-plus-aead-ciphertext") + frame := buildPhyFrame(t, src, dst, payload) + + n, err := uuA.WriteFrames([][]byte{frame}) + require.NoError(t, err) + require.Equal(t, 1, n) + + buf := make([]byte, maxFrameSize) + fn, err := uuB.ReadFrame(buf) + require.NoError(t, err) + require.NotZero(t, fn) + + out, err := udp.Decode(buf[:fn], nil, true) + require.NoError(t, err) + require.Equal(t, payload, out, "payload must survive peel -> UDP wire -> synthesize") +}