diff --git a/storage/cmd/containers-storage/splitfdstream.go b/storage/cmd/containers-storage/splitfdstream.go new file mode 100644 index 0000000000..f7bca05664 --- /dev/null +++ b/storage/cmd/containers-storage/splitfdstream.go @@ -0,0 +1,202 @@ +//go:build linux + +package main + +import ( + "bytes" + "fmt" + "os" + "os/signal" + "path/filepath" + "syscall" + + "go.podman.io/storage" + graphdriver "go.podman.io/storage/drivers" + "go.podman.io/storage/pkg/archive" + "go.podman.io/storage/pkg/mflag" + "go.podman.io/storage/pkg/splitfdstream" +) + +const defaultJSONRPCSocket = "json-rpc.sock" + +var ( + splitfdstreamSocket = "" + applyFdstreamSocket = "" + applyFdstreamParent = "" + applyFdstreamMountLabel = "" +) + +// splitFDStreamDiffer implements graphdriver.Differ for splitfdstream data +type splitFDStreamDiffer struct { + streamData []byte + fds []*os.File + store storage.Store +} + +func (d *splitFDStreamDiffer) ApplyDiff(dest string, options *archive.TarOptions, differOpts *graphdriver.DifferOptions) (graphdriver.DriverWithDifferOutput, error) { + driver, err := d.store.GraphDriver() + if err != nil { + return graphdriver.DriverWithDifferOutput{}, fmt.Errorf("failed to get graph driver: %w", err) + } + + splitDriver, ok := driver.(splitfdstream.SplitFDStreamDriver) + if !ok { + return graphdriver.DriverWithDifferOutput{}, fmt.Errorf("driver %s does not support splitfdstream", driver.String()) + } + + opts := &splitfdstream.ApplySplitFDStreamOpts{ + Stream: bytes.NewReader(d.streamData), + FileDescriptors: d.fds, + StagingDir: dest, + } + + size, err := splitDriver.ApplySplitFDStream(opts) + if err != nil { + return graphdriver.DriverWithDifferOutput{}, fmt.Errorf("failed to apply splitfdstream to staging dir %s: %w", dest, err) + } + + return graphdriver.DriverWithDifferOutput{ + Target: dest, + Size: size, + }, nil +} + +func (d *splitFDStreamDiffer) Close() error { + return nil +} + +func splitfdstreamServer(flags *mflag.FlagSet, action string, m storage.Store, args []string) (int, error) { + driver, err := m.GraphDriver() + if err != nil { + return 1, fmt.Errorf("failed to get graph driver: %w", err) + } + + server := splitfdstream.NewJSONRPCServer(driver) + + socketPath := splitfdstreamSocket + if socketPath == "" { + socketPath = filepath.Join(m.RunRoot(), defaultJSONRPCSocket) + } + + if err := server.Start(socketPath); err != nil { + return 1, fmt.Errorf("failed to start server: %w", err) + } + defer func() { _ = server.Stop() }() + + fmt.Printf("%s\n", socketPath) + + // Wait for interrupt signal + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + <-sigCh + + return 0, nil +} + +func applySplitfdstream(flags *mflag.FlagSet, action string, m storage.Store, args []string) (int, error) { + layerID := args[0] + + socketPath := applyFdstreamSocket + if socketPath == "" { + socketPath = filepath.Join(m.RunRoot(), defaultJSONRPCSocket) + } + + defer func() { + if _, err := m.Shutdown(false); err != nil { + fmt.Fprintf(os.Stderr, "warning: failed to shutdown storage: %v\n", err) + } + }() + + client, err := splitfdstream.NewJSONRPCClient(socketPath) + if err != nil { + return 1, fmt.Errorf("failed to connect to server: %w", err) + } + defer client.Close() + + // Get splitfdstream data from remote server + streamData, fds, err := client.GetSplitFDStream(layerID, "") + if err != nil { + return 1, fmt.Errorf("failed to get splitfdstream from server: %w", err) + } + + // Close received FDs when done + defer func() { + for _, fd := range fds { + 
fd.Close() + } + }() + + // Create a custom differ for splitfdstream data + differ := &splitFDStreamDiffer{ + streamData: streamData, + fds: fds, + store: m, + } + defer differ.Close() + + // Prepare the staged layer + diffOptions := &graphdriver.ApplyDiffWithDifferOpts{} + diffOutput, err := m.PrepareStagedLayer(diffOptions, differ) + if err != nil { + return 1, fmt.Errorf("failed to prepare staged layer: %w", err) + } + + // Apply the staged layer to create the final layer + applyArgs := storage.ApplyStagedLayerOptions{ + ID: layerID, + ParentLayer: applyFdstreamParent, + MountLabel: applyFdstreamMountLabel, + Writeable: false, + LayerOptions: &storage.LayerOptions{}, + DiffOutput: diffOutput, + DiffOptions: diffOptions, + } + + layer, err := m.ApplyStagedLayer(applyArgs) + if err != nil { + // Clean up the staged layer on failure + if cleanupErr := m.CleanupStagedLayer(diffOutput); cleanupErr != nil { + fmt.Fprintf(os.Stderr, "warning: failed to cleanup staged layer: %v\n", cleanupErr) + } + return 1, fmt.Errorf("failed to apply staged layer: %w", err) + } + + // Output the result + if jsonOutput { + return outputJSON(map[string]interface{}{"id": layer.ID, "size": diffOutput.Size}) + } + fmt.Printf("%s\n", layer.ID) + return 0, nil +} + +func init() { + commands = append(commands, command{ + names: []string{"json-rpc-server"}, + optionsHelp: "[options]", + usage: "Start a JSON-RPC server", + minArgs: 0, + maxArgs: 0, + action: splitfdstreamServer, + addFlags: func(flags *mflag.FlagSet, cmd *command) { + flags.StringVar(&splitfdstreamSocket, []string{"-socket"}, "", + "Path to UNIX socket") + }, + }) + commands = append(commands, command{ + names: []string{"apply-splitfdstream"}, + optionsHelp: "[options] layerID", + usage: "Fetch a layer from remote server and apply it locally", + minArgs: 1, + maxArgs: 1, + action: applySplitfdstream, + addFlags: func(flags *mflag.FlagSet, cmd *command) { + flags.StringVar(&applyFdstreamSocket, []string{"-socket"}, "", + "Path to remote UNIX socket") + flags.StringVar(&applyFdstreamParent, []string{"-parent"}, "", + "Parent layer ID for the new layer") + flags.StringVar(&applyFdstreamMountLabel, []string{"-mount-label"}, "", + "SELinux mount label for the layer") + flags.BoolVar(&jsonOutput, []string{"-json", "j"}, jsonOutput, "Prefer JSON output") + }, + }) +} diff --git a/storage/drivers/overlay/overlay_splitfdstream.go b/storage/drivers/overlay/overlay_splitfdstream.go new file mode 100644 index 0000000000..e698eaef99 --- /dev/null +++ b/storage/drivers/overlay/overlay_splitfdstream.go @@ -0,0 +1,277 @@ +//go:build linux + +package overlay + +import ( + "archive/tar" + "bytes" + "errors" + "fmt" + "io" + "os" + "path/filepath" + "strings" + + "github.com/sirupsen/logrus" + "go.podman.io/storage/pkg/archive" + "go.podman.io/storage/pkg/chrootarchive" + "go.podman.io/storage/pkg/directory" + "go.podman.io/storage/pkg/fileutils" + "go.podman.io/storage/pkg/idtools" + "go.podman.io/storage/pkg/splitfdstream" + "go.podman.io/storage/pkg/unshare" + "golang.org/x/sys/unix" +) + +// ErrSplitFDStreamNotSupported is returned when splitfdstream operations +// are not supported for a layer (e.g., composefs layers). +var ErrSplitFDStreamNotSupported = errors.New("splitfdstream not supported for this layer") + +// untarSplitFDStream defines the splitfdstream untar method (through chrootarchive for security isolation) +var untarSplitFDStream = chrootarchive.UnpackSplitFDStream + +// ApplySplitFDStream applies changes from a split FD stream to the specified layer. 
+// It iterates over the splitfdstream entries and extracts them using +// archive.UnpackFromIterator, which enables reflink-based copying for +// external file descriptor references. +// This API is experimental and can be changed without bumping the major version number. +func (d *Driver) ApplySplitFDStream(options *splitfdstream.ApplySplitFDStreamOpts) (int64, error) { + if options == nil { + return 0, fmt.Errorf("options cannot be nil") + } + if err := options.Validate(); err != nil { + return 0, fmt.Errorf("invalid options: %w", err) + } + + var diffPath string + + if options.StagingDir != "" { + diffPath = options.StagingDir + logrus.Debugf("overlay: ApplySplitFDStream applying to staging dir %s", diffPath) + } else { + dir := d.dir(options.LayerID) + if err := fileutils.Exists(dir); err != nil { + return 0, fmt.Errorf("layer %s does not exist: %w", options.LayerID, err) + } + + composefsData := d.getComposefsData(options.LayerID) + if err := fileutils.Exists(composefsData); err == nil { + return 0, fmt.Errorf("%w: layer %s uses composefs", ErrSplitFDStreamNotSupported, options.LayerID) + } + + var err error + diffPath, err = d.getDiffPath(options.LayerID) + if err != nil { + return 0, fmt.Errorf("failed to get diff path for layer %s: %w", options.LayerID, err) + } + + logrus.Debugf("overlay: ApplySplitFDStream applying to layer %s at %s", options.LayerID, diffPath) + } + + idMappings := options.IDMappings + if idMappings == nil { + idMappings = &idtools.IDMappings{} + } + + if err := untarSplitFDStream(options.Stream, options.FileDescriptors, diffPath, &archive.TarOptions{ + UIDMaps: idMappings.UIDs(), + GIDMaps: idMappings.GIDs(), + IgnoreChownErrors: options.IgnoreChownErrors || d.options.ignoreChownErrors, + WhiteoutFormat: d.getWhiteoutFormat(), + ForceMask: options.ForceMask, + InUserNS: unshare.IsRootless(), + }); err != nil { + return 0, fmt.Errorf("failed to apply split FD stream: %w", err) + } + + return directory.Size(diffPath) +} + +// GetSplitFDStream generates a split FD stream from the layer differences. +// The returned ReadCloser contains the splitfdstream-formatted data, and the +// []*os.File slice contains the external file descriptors referenced by the stream. +// Regular files are passed as external file descriptors for reflink-based copying. +// The caller is responsible for closing both the ReadCloser and all file descriptors. +// This API is experimental and can be changed without bumping the major version number. 
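+//
+// Stream layout (as produced by convertTarToSplitFDStream below): each
+// entry's serialized tar header is written as an inline chunk, followed, for
+// non-empty regular files, by either an inline chunk carrying the file bytes
+// or a reference to one of the returned file descriptors.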
+func (d *Driver) GetSplitFDStream(id, parent string, options *splitfdstream.GetSplitFDStreamOpts) (io.ReadCloser, []*os.File, error) { + if options == nil { + return nil, nil, fmt.Errorf("options cannot be nil") + } + + dir := d.dir(id) + if err := fileutils.Exists(dir); err != nil { + return nil, nil, fmt.Errorf("layer %s does not exist: %w", id, err) + } + + // Check if this is a composefs layer - splitfdstream is not supported for composefs yet + composefsData := d.getComposefsData(id) + if err := fileutils.Exists(composefsData); err == nil { + return nil, nil, fmt.Errorf("%w: layer %s uses composefs", ErrSplitFDStreamNotSupported, id) + } else if !errors.Is(err, unix.ENOENT) { + return nil, nil, err + } + + logrus.Debugf("overlay: GetSplitFDStream for layer %s with parent %s", id, parent) + + // Set up ID mappings + idMappings := options.IDMappings + if idMappings == nil { + idMappings = &idtools.IDMappings{} + } + + // Get the diff path for file access (used for FD references) + diffPath, err := d.getDiffPath(id) + if err != nil { + return nil, nil, fmt.Errorf("failed to get diff path for layer %s: %w", id, err) + } + + // Use Diff() to generate the tar stream - it handles naiveDiff + // and all the edge cases correctly. + tarStream, err := d.Diff(id, idMappings, parent, nil, options.MountLabel) + if err != nil { + return nil, nil, fmt.Errorf("failed to generate diff for layer %s: %w", id, err) + } + defer tarStream.Close() + + // Buffer the splitfdstream data in memory + var buf bytes.Buffer + var fds []*os.File + writer := splitfdstream.NewWriter(&buf) + + // Convert tar stream to splitfdstream + err = d.convertTarToSplitFDStream(tarStream, writer, diffPath, &fds) + if err != nil { + // Close any opened FDs on error + for _, f := range fds { + f.Close() + } + return nil, nil, fmt.Errorf("failed to convert tar to splitfdstream: %w", err) + } + + logrus.Debugf("overlay: GetSplitFDStream complete for layer %s: streamSize=%d, numFDs=%d", id, buf.Len(), len(fds)) + return io.NopCloser(bytes.NewReader(buf.Bytes())), fds, nil +} + +// convertTarToSplitFDStream converts a tar stream to a splitfdstream by parsing +// tar headers and replacing file content with file descriptor references. 
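+// Each header goes into its own inline chunk, so the unpack side can parse a
+// chunk with a fresh tar.Reader (see splitFDStreamIterator.Next).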
+func (d *Driver) convertTarToSplitFDStream(tarStream io.ReadCloser, writer *splitfdstream.SplitFDStreamWriter, diffPath string, fds *[]*os.File) error { + tr := tar.NewReader(tarStream) + + // Open diff directory for safe file access + diffDirFd, err := unix.Open(diffPath, unix.O_RDONLY|unix.O_DIRECTORY|unix.O_CLOEXEC, 0) + if err != nil { + return fmt.Errorf("failed to open diff directory %s: %w", diffPath, err) + } + defer unix.Close(diffDirFd) + + for { + header, err := tr.Next() + if err == io.EOF { + break + } + if err != nil { + return fmt.Errorf("failed to read tar header: %w", err) + } + + // Write the tar header as inline data + if err := d.writeTarHeaderInline(writer, header); err != nil { + return fmt.Errorf("failed to write tar header for %s: %w", header.Name, err) + } + + // Handle file content + if header.Typeflag == tar.TypeReg && header.Size > 0 { + // Try to open file and write FD reference + ok, err := d.tryWriteFileAsFDReference(writer, diffDirFd, header, fds) + if err != nil { + return fmt.Errorf("failed to write FD reference for %s: %w", header.Name, err) + } + if ok { + // Skip the content in the tar stream since we're using FD reference + if _, err := io.CopyN(io.Discard, tr, header.Size); err != nil { + return fmt.Errorf("failed to skip content for %s: %w", header.Name, err) + } + } else { + // File not found in diff directory (e.g., naiveDiff was used), + // write content inline from the tar stream + content := make([]byte, header.Size) + if _, err := io.ReadFull(tr, content); err != nil { + return fmt.Errorf("failed to read content for %s: %w", header.Name, err) + } + if err := writer.WriteInline(content); err != nil { + return fmt.Errorf("failed to write inline content for %s: %w", header.Name, err) + } + } + } + // For non-regular files or empty files, there's no content to handle + } + + return nil +} + +// writeTarHeaderInline writes a tar header as inline data to the splitfdstream. +func (d *Driver) writeTarHeaderInline(writer *splitfdstream.SplitFDStreamWriter, header *tar.Header) error { + var headerBuf bytes.Buffer + tw := tar.NewWriter(&headerBuf) + if err := tw.WriteHeader(header); err != nil { + return fmt.Errorf("failed to serialize tar header: %w", err) + } + + headerBytes := headerBuf.Bytes() + if len(headerBytes) > 0 { + if err := writer.WriteInline(headerBytes); err != nil { + return fmt.Errorf("failed to write inline header: %w", err) + } + } + + return nil +} + +// tryWriteFileAsFDReference tries to open a file and write an FD reference to the splitfdstream. +// Returns (true, nil) if the file was successfully written as FD reference. +// Returns (false, nil) if the file doesn't exist in the diff directory (caller should write inline). +// Returns (false, error) on other errors. 
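+//
+// Note that any Openat2 failure takes the inline fallback, including ENOSYS
+// on kernels older than 5.6 where openat2 is unavailable, so conversion still
+// succeeds there, just without FD references.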
+func (d *Driver) tryWriteFileAsFDReference(writer *splitfdstream.SplitFDStreamWriter, diffDirFd int, header *tar.Header, fds *[]*os.File) (bool, error) { + // Clean the file name to prevent path traversal + cleanName := filepath.Clean(header.Name) + if strings.Contains(cleanName, "..") { + return false, fmt.Errorf("invalid file path: %s", header.Name) + } + + // Open the file safely using openat2 + fd, err := unix.Openat2(diffDirFd, cleanName, &unix.OpenHow{ + Flags: unix.O_RDONLY | unix.O_CLOEXEC, + Resolve: unix.RESOLVE_NO_SYMLINKS | unix.RESOLVE_BENEATH, + }) + if err != nil { + // File not found in diff directory - caller should write inline + return false, nil + } + + // Verify it's still a regular file + var fdStat unix.Stat_t + if err := unix.Fstat(fd, &fdStat); err != nil { + unix.Close(fd) + return false, fmt.Errorf("failed to fstat opened file %s: %w", cleanName, err) + } + if fdStat.Mode&unix.S_IFMT != unix.S_IFREG { + unix.Close(fd) + return false, fmt.Errorf("file %s is not a regular file", cleanName) + } + + // Create os.File from fd + f := os.NewFile(uintptr(fd), cleanName) + if f == nil { + unix.Close(fd) + return false, fmt.Errorf("failed to create File from fd for %s", cleanName) + } + + fdIndex := len(*fds) + *fds = append(*fds, f) + + // Write FD reference + if err := writer.WriteExternal(fdIndex); err != nil { + return false, fmt.Errorf("failed to write external FD reference: %w", err) + } + + return true, nil +} diff --git a/storage/drivers/overlay/splitfdstream_test.go b/storage/drivers/overlay/splitfdstream_test.go new file mode 100644 index 0000000000..ea2fd2b2b8 --- /dev/null +++ b/storage/drivers/overlay/splitfdstream_test.go @@ -0,0 +1,58 @@ +//go:build linux + +package overlay + +import ( + "testing" + + "go.podman.io/storage/pkg/splitfdstream" +) + +func TestApplySplitFDStreamStub(t *testing.T) { + driver := &Driver{ + home: t.TempDir(), + } + + // Test with nil options + _, err := driver.ApplySplitFDStream(nil) + if err == nil { + t.Error("Expected error with nil options") + } + + // Test with valid options but non-existent layer + opts := &splitfdstream.ApplySplitFDStreamOpts{LayerID: "non-existent-layer"} + _, err = driver.ApplySplitFDStream(opts) + if err == nil { + t.Error("Expected error for non-existent layer") + } +} + +func TestGetSplitFDStreamStub(t *testing.T) { + driver := &Driver{ + home: t.TempDir(), + } + + // Test with nil options + _, _, err := driver.GetSplitFDStream("test-layer", "parent-layer", nil) + if err == nil { + t.Error("Expected error with nil options") + } + + // Test with valid options but non-existent layer + opts := &splitfdstream.GetSplitFDStreamOpts{} + _, _, err = driver.GetSplitFDStream("non-existent-layer", "parent-layer", opts) + if err == nil { + t.Error("Expected error for non-existent layer") + } +} + +// TestOverlayImplementsSplitFDStreamDriver verifies that the overlay driver +// implements the SplitFDStreamDriver interface via type assertion. 
+func TestOverlayImplementsSplitFDStreamDriver(t *testing.T) { + driver := &Driver{} + + // Verify the driver implements SplitFDStreamDriver + if _, ok := interface{}(driver).(splitfdstream.SplitFDStreamDriver); !ok { + t.Error("Expected overlay driver to implement SplitFDStreamDriver interface") + } +} diff --git a/storage/go.mod b/storage/go.mod index c25f03adf9..6d94234682 100644 --- a/storage/go.mod +++ b/storage/go.mod @@ -6,6 +6,7 @@ module go.podman.io/storage require ( github.com/BurntSushi/toml v1.6.0 + github.com/cgwalters/jsonrpc-fdpass-go v0.0.0-20260126203148-2bca851a3863 github.com/containerd/stargz-snapshotter/estargz v0.18.2 github.com/cyphar/filepath-securejoin v0.6.1 github.com/docker/go-units v0.5.0 diff --git a/storage/go.sum b/storage/go.sum index d90d86395f..222b24bae1 100644 --- a/storage/go.sum +++ b/storage/go.sum @@ -2,6 +2,8 @@ cyphar.com/go-pathrs v0.2.1 h1:9nx1vOgwVvX1mNBWDu93+vaceedpbsDqo+XuBGL40b8= cyphar.com/go-pathrs v0.2.1/go.mod h1:y8f1EMG7r+hCuFf/rXsKqMJrJAUoADZGNh5/vZPKcGc= github.com/BurntSushi/toml v1.6.0 h1:dRaEfpa2VI55EwlIW72hMRHdWouJeRF7TPYhI+AUQjk= github.com/BurntSushi/toml v1.6.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho= +github.com/cgwalters/jsonrpc-fdpass-go v0.0.0-20260126203148-2bca851a3863 h1:IxSkyu1DEg3XekvAJs3JNNiDg8fPJgR5BItqb1ZSLWI= +github.com/cgwalters/jsonrpc-fdpass-go v0.0.0-20260126203148-2bca851a3863/go.mod h1:naXj4fiEUjm+AaZDEGja5cJgASuAZwlsPSDBewGK+iY= github.com/containerd/stargz-snapshotter/estargz v0.18.2 h1:yXkZFYIzz3eoLwlTUZKz2iQ4MrckBxJjkmD16ynUTrw= github.com/containerd/stargz-snapshotter/estargz v0.18.2/go.mod h1:XyVU5tcJ3PRpkA9XS2T5us6Eg35yM0214Y+wvrZTBrY= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= diff --git a/storage/pkg/archive/archive.go b/storage/pkg/archive/archive.go index 5cdd751307..0a58885ec8 100644 --- a/storage/pkg/archive/archive.go +++ b/storage/pkg/archive/archive.go @@ -704,7 +704,7 @@ func (ta *tarWriter) addFile(headers *addFileData) error { return nil } -func extractTarFileEntry(path, extractDir string, hdr *tar.Header, reader io.Reader, Lchown bool, chownOpts *idtools.IDPair, inUserns, ignoreChownErrors bool, forceMask *os.FileMode, buffer []byte) error { +func extractTarFileEntry(path, extractDir string, hdr *tar.Header, writeContent func(*os.File) error, Lchown bool, chownOpts *idtools.IDPair, inUserns, ignoreChownErrors bool, forceMask *os.FileMode) error { // hdr.Mode is in linux format, which we can use for sycalls, // but for os.Foo() calls we need the mode converted to os.FileMode, // so use hdrInfo.Mode() (they differ for e.g. setuid bits) @@ -741,9 +741,11 @@ func extractTarFileEntry(path, extractDir string, hdr *tar.Header, reader io.Rea if err != nil { return err } - if _, err := io.CopyBuffer(file, reader, buffer); err != nil { - file.Close() - return err + if writeContent != nil { + if err := writeContent(file); err != nil { + file.Close() + return err + } } if err := file.Close(); err != nil { return err @@ -1087,17 +1089,67 @@ func TarWithOptions(srcPath string, options *TarOptions) (io.ReadCloser, error) return pipeReader, nil } -// Unpack unpacks the decompressedArchive to dest with options. -func Unpack(decompressedArchive io.Reader, dest string, options *TarOptions) error { +// TarEntryIterator abstracts iteration over tar entries. +// Standard implementation wraps tar.Reader; splitfdstream provides +// entries from its chunk-based format with reflink support. 
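+//
+// The consumption contract, as exercised by UnpackFromIterator below, is
+// roughly (dst being the file created for the current entry):
+//
+//	for {
+//		hdr, err := iter.Next()
+//		if err == io.EOF {
+//			break
+//		}
+//		// ... create the entry at its destination path ...
+//		// at most one WriteContentTo call per regular file with payload:
+//		err = iter.WriteContentTo(dst)
+//	}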
+type TarEntryIterator interface { + // Next advances to the next entry and returns its header. + Next() (*tar.Header, error) + // WriteContentTo writes the current entry's file content to dst. + // Only called for TypeReg entries with Size > 0. + WriteContentTo(dst *os.File) error +} + +// tarReaderIterator implements TarEntryIterator for a standard tar.Reader. +type tarReaderIterator struct { + tr *tar.Reader + trBuf *bufio.Reader + buffer []byte +} + +func newTarReaderIterator(decompressedArchive io.Reader) *tarReaderIterator { tr := tar.NewReader(decompressedArchive) trBuf := pools.BufioReader32KPool.Get(nil) - defer pools.BufioReader32KPool.Put(trBuf) + return &tarReaderIterator{ + tr: tr, + trBuf: trBuf, + buffer: make([]byte, 1<<20), + } +} +func (i *tarReaderIterator) Next() (*tar.Header, error) { + hdr, err := i.tr.Next() + if err != nil { + return nil, err + } + i.trBuf.Reset(i.tr) + return hdr, nil +} + +func (i *tarReaderIterator) WriteContentTo(dst *os.File) error { + _, err := io.CopyBuffer(dst, i.trBuf, i.buffer) + return err +} + +func (i *tarReaderIterator) close() { + pools.BufioReader32KPool.Put(i.trBuf) +} + +// Unpack unpacks the decompressedArchive to dest with options. +func Unpack(decompressedArchive io.Reader, dest string, options *TarOptions) error { + iter := newTarReaderIterator(decompressedArchive) + defer iter.close() + return UnpackFromIterator(iter, dest, options) +} + +// UnpackFromIterator unpacks tar entries from the given iterator to dest with options. +// This allows plugging in alternative sources of tar entries (e.g., splitfdstream) +// while reusing the full extraction logic including xattrs, whiteouts, device nodes, etc. +func UnpackFromIterator(iter TarEntryIterator, dest string, options *TarOptions) error { var dirs []*tar.Header idMappings := idtools.NewIDMappingsFromMaps(options.UIDMaps, options.GIDMaps) rootIDs := idMappings.RootPair() whiteoutConverter := GetWhiteoutConverter(options.WhiteoutFormat, options.WhiteoutData) - buffer := make([]byte, 1<<20) doChown := !options.NoLchown if options.ForceMask != nil { @@ -1109,7 +1161,7 @@ func Unpack(decompressedArchive io.Reader, dest string, options *TarOptions) err // Iterate through the files in the archive. 
loop: for { - hdr, err := tr.Next() + hdr, err := iter.Next() if err == io.EOF { // end of tar archive break @@ -1183,7 +1235,6 @@ loop: } } } - trBuf.Reset(tr) chownOpts := options.ChownOpts if err := remapIDs(nil, idMappings, chownOpts, hdr); err != nil { @@ -1204,7 +1255,10 @@ loop: chownOpts = &idtools.IDPair{UID: hdr.Uid, GID: hdr.Gid} } - if err = extractTarFileEntry(path, dest, hdr, trBuf, doChown, chownOpts, options.InUserNS, options.IgnoreChownErrors, options.ForceMask, buffer); err != nil { + writeContent := func(dst *os.File) error { + return iter.WriteContentTo(dst) + } + if err = extractTarFileEntry(path, dest, hdr, writeContent, doChown, chownOpts, options.InUserNS, options.IgnoreChownErrors, options.ForceMask); err != nil { return err } diff --git a/storage/pkg/archive/archive_test.go b/storage/pkg/archive/archive_test.go index 50d714b5ac..40e1009f2a 100644 --- a/storage/pkg/archive/archive_test.go +++ b/storage/pkg/archive/archive_test.go @@ -747,8 +747,7 @@ func TestTarWithOptions(t *testing.T) { func TestTypeXGlobalHeaderDoesNotFail(t *testing.T) { hdr := tar.Header{Typeflag: tar.TypeXGlobalHeader} tmpDir := t.TempDir() - buffer := make([]byte, 1<<20) - err := extractTarFileEntry(filepath.Join(tmpDir, "pax_global_header"), tmpDir, &hdr, nil, true, nil, false, false, nil, buffer) + err := extractTarFileEntry(filepath.Join(tmpDir, "pax_global_header"), tmpDir, &hdr, nil, true, nil, false, false, nil) if err != nil { t.Fatal(err) } diff --git a/storage/pkg/archive/diff.go b/storage/pkg/archive/diff.go index 355d65f212..64be4f8fdd 100644 --- a/storage/pkg/archive/diff.go +++ b/storage/pkg/archive/diff.go @@ -104,7 +104,8 @@ func UnpackLayer(dest string, layer io.Reader, options *TarOptions) (size int64, } defer os.RemoveAll(aufsTempdir) } - if err := extractTarFileEntry(filepath.Join(aufsTempdir, basename), dest, hdr, tr, true, nil, options.InUserNS, options.IgnoreChownErrors, options.ForceMask, buffer); err != nil { + writeContent := func(dst *os.File) error { _, err := io.CopyBuffer(dst, tr, buffer); return err } + if err := extractTarFileEntry(filepath.Join(aufsTempdir, basename), dest, hdr, writeContent, true, nil, options.InUserNS, options.IgnoreChownErrors, options.ForceMask); err != nil { return 0, err } } @@ -209,7 +210,8 @@ func UnpackLayer(dest string, layer io.Reader, options *TarOptions) (size int64, return 0, err } - if err := extractTarFileEntry(path, dest, srcHdr, srcData, true, nil, options.InUserNS, options.IgnoreChownErrors, options.ForceMask, buffer); err != nil { + writeContent := func(dst *os.File) error { _, err := io.CopyBuffer(dst, srcData, buffer); return err } + if err := extractTarFileEntry(path, dest, srcHdr, writeContent, true, nil, options.InUserNS, options.IgnoreChownErrors, options.ForceMask); err != nil { return 0, err } diff --git a/storage/pkg/archive/splitfdstream.go b/storage/pkg/archive/splitfdstream.go new file mode 100644 index 0000000000..e3d4b739d2 --- /dev/null +++ b/storage/pkg/archive/splitfdstream.go @@ -0,0 +1,97 @@ +package archive + +import ( + "archive/tar" + "bytes" + "encoding/binary" + "fmt" + "io" + "os" + + "go.podman.io/storage/pkg/fileutils" +) + +// splitFDStreamIterator implements TarEntryIterator for splitfdstream data. +// It parses the composefs-rs splitfdstream format and provides tar headers. +// For external FD references, WriteContentTo uses ReflinkOrCopy to efficiently +// copy file content via reflinks when possible. 
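+//
+// The framing mirrors SplitFDStreamWriter in pkg/splitfdstream: every chunk
+// begins with a signed little-endian int64 prefix, where a negative value
+// means abs(prefix) bytes of inline data follow and a non-negative value is
+// an index into the fds slice.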
+type splitFDStreamIterator struct {
+	stream    io.Reader
+	fds       []*os.File
+	contentFD *os.File  // FD for current entry's content (external reference)
+	content   io.Reader // reader for current entry's inline content
+}
+
+// NewSplitFDStreamIterator creates a TarEntryIterator that reads entries from
+// a splitfdstream-formatted stream, using the provided file descriptors for
+// external content references.
+func NewSplitFDStreamIterator(stream io.Reader, fds []*os.File) TarEntryIterator {
+	return &splitFDStreamIterator{
+		stream: stream,
+		fds:    fds,
+	}
+}
+
+func (i *splitFDStreamIterator) Next() (*tar.Header, error) {
+	var prefix int64
+	err := binary.Read(i.stream, binary.LittleEndian, &prefix)
+	if err == io.EOF {
+		return nil, io.EOF
+	}
+	if err != nil {
+		return nil, fmt.Errorf("failed to read chunk prefix: %w", err)
+	}
+
+	if prefix >= 0 {
+		return nil, fmt.Errorf("expected inline chunk for tar header, got external reference %d", prefix)
+	}
+
+	// Inline chunk: read the serialized tar header
+	dataLen := int(-prefix)
+	headerData := make([]byte, dataLen)
+	if _, err := io.ReadFull(i.stream, headerData); err != nil {
+		return nil, fmt.Errorf("failed to read inline data: %w", err)
+	}
+
+	header, err := tar.NewReader(bytes.NewReader(headerData)).Next()
+	if err != nil {
+		return nil, fmt.Errorf("failed to parse tar header from inline chunk: %w", err)
+	}
+
+	// Reset content state
+	i.contentFD = nil
+	i.content = nil
+
+	// For regular files with content, read the next chunk to determine source
+	if header.Typeflag == tar.TypeReg && header.Size > 0 {
+		if err := binary.Read(i.stream, binary.LittleEndian, &prefix); err != nil {
+			return nil, fmt.Errorf("failed to read content chunk prefix for %s: %w", header.Name, err)
+		}
+
+		if prefix < 0 {
+			// Inline content
+			contentLen := -prefix
+			i.content = io.LimitReader(i.stream, contentLen)
+		} else {
+			// External content from FD
+			fdIndex := int(prefix)
+			if fdIndex >= len(i.fds) {
+				return nil, fmt.Errorf("fd index %d out of range (have %d fds)", fdIndex, len(i.fds))
+			}
+			i.contentFD = i.fds[fdIndex]
+		}
+	}
+
+	return header, nil
+}
+
+func (i *splitFDStreamIterator) WriteContentTo(dst *os.File) error {
+	if i.contentFD != nil {
+		return fileutils.ReflinkOrCopy(i.contentFD, dst)
+	}
+	if i.content != nil {
+		_, err := io.Copy(dst, i.content)
+		return err
+	}
+	return nil
+}
diff --git a/storage/pkg/chrootarchive/init_unix.go b/storage/pkg/chrootarchive/init_unix.go
index 1b566c817e..2ad9b78627 100644
--- a/storage/pkg/chrootarchive/init_unix.go
+++ b/storage/pkg/chrootarchive/init_unix.go
@@ -13,6 +13,7 @@ import (
 func init() {
 	reexec.Register("storage-applyLayer", applyLayer)
 	reexec.Register("storage-untar", untar)
+	reexec.Register("storage-untar-splitfdstream", untarSplitFDStream)
 	reexec.Register("storage-tar", tar)
 }
 
diff --git a/storage/pkg/chrootarchive/splitfdstream_unix.go b/storage/pkg/chrootarchive/splitfdstream_unix.go
new file mode 100644
index 0000000000..0f3ac107fd
--- /dev/null
+++ b/storage/pkg/chrootarchive/splitfdstream_unix.go
@@ -0,0 +1,214 @@
+//go:build !windows && !darwin
+
+package chrootarchive
+
+import (
+	"bytes"
+	"encoding/json"
+	"flag"
+	"fmt"
+	"io"
+	"os"
+	"path/filepath"
+	"runtime"
+	"strconv"
+	"sync"
+
+	"golang.org/x/sys/unix"
+
+	"go.podman.io/storage/pkg/archive"
+	"go.podman.io/storage/pkg/fileutils"
+	"go.podman.io/storage/pkg/idtools"
+	"go.podman.io/storage/pkg/reexec"
+	"go.podman.io/storage/pkg/unshare"
+)
+
+// fdSocketDescriptor is the file descriptor number for the Unix socket used to
+// pass content FDs from the parent to the child process via SCM_RIGHTS.
+// FD 3 = tar options, FD 4 = root dir, FD 5 = FD socket.
+const fdSocketDescriptor = 5
+
+// UnpackSplitFDStream unpacks a splitfdstream into dest within a chroot for security isolation.
+// The stream contains splitfdstream-formatted data read from stdin, and fds are the external
+// file descriptors referenced by the stream for reflink-based copying.
+//
+// Content FDs are sent to the child process via SCM_RIGHTS over a Unix socket
+// after the child starts, rather than inherited via ExtraFiles at fork time.
+// This avoids exceeding the file descriptor limit during the child's dynamic
+// linker phase (EMFILE when loading shared libraries).
+func UnpackSplitFDStream(stream io.Reader, fds []*os.File, dest string, options *archive.TarOptions) error {
+	if options == nil {
+		options = &archive.TarOptions{}
+		options.InUserNS = unshare.IsRootless()
+	}
+
+	idMappings := idtools.NewIDMappingsFromMaps(options.UIDMaps, options.GIDMaps)
+	rootIDs := idMappings.RootPair()
+
+	dest = filepath.Clean(dest)
+	if err := fileutils.Exists(dest); os.IsNotExist(err) {
+		if err := idtools.MkdirAllAndChownNew(dest, 0o755, rootIDs); err != nil {
+			return err
+		}
+	}
+
+	destVal, err := newUnpackDestination(dest, dest)
+	if err != nil {
+		return err
+	}
+	defer destVal.Close()
+
+	// Create pipe for tar options
+	r, w, err := os.Pipe()
+	if err != nil {
+		return fmt.Errorf("splitfdstream pipe failure: %w", err)
+	}
+
+	// Create a Unix socketpair for passing content FDs to the child process.
+	socketPair, err := unix.Socketpair(unix.AF_UNIX, unix.SOCK_STREAM|unix.SOCK_CLOEXEC, 0)
+	if err != nil {
+		r.Close()
+		w.Close()
+		return fmt.Errorf("splitfdstream socketpair failure: %w", err)
+	}
+	parentSocketFD := socketPair[0]
+	childSocket := os.NewFile(uintptr(socketPair[1]), "splitfdstream-child-socket")
+
+	numFDs := strconv.Itoa(len(fds))
+	cmd := reexec.Command("storage-untar-splitfdstream", destVal.dest, procPathForFd(rootFileDescriptor), numFDs)
+	cmd.Stdin = stream
+
+	cmd.ExtraFiles = append(cmd.ExtraFiles, r)            // fd 3: tar options
+	cmd.ExtraFiles = append(cmd.ExtraFiles, destVal.root) // fd 4: root dir
+	cmd.ExtraFiles = append(cmd.ExtraFiles, childSocket)  // fd 5: FD socket
+
+	output := bytes.NewBuffer(nil)
+	cmd.Stdout = output
+	cmd.Stderr = output
+
+	if err := cmd.Start(); err != nil {
+		w.Close()
+		unix.Close(parentSocketFD)
+		childSocket.Close()
+		return fmt.Errorf("splitfdstream untar error on re-exec cmd: %w", err)
+	}
+
+	// Parent no longer needs the child's socket end or the read end of the pipe
+	childSocket.Close()
+	r.Close()
+
+	// Send content FDs to the child via SCM_RIGHTS on the Unix socket.
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		defer unix.Close(parentSocketFD)
+		for _, fd := range fds {
+			rights := unix.UnixRights(int(fd.Fd()))
+			if err := unix.Sendmsg(parentSocketFD, []byte{0}, rights, nil, 0); err != nil {
+				return
+			}
+		}
+	}()
+
+	if err := json.NewEncoder(w).Encode(options); err != nil {
+		w.Close()
+		return fmt.Errorf("splitfdstream untar json encode to pipe failed: %w", err)
+	}
+	w.Close()
+
+	if err := cmd.Wait(); err != nil {
+		wg.Wait()
+		return fmt.Errorf("splitfdstream unpacking failed (error: %w; output: %s)", err, output)
+	}
+	wg.Wait()
+	return nil
+}
+
+// untarSplitFDStream is the reexec handler for "storage-untar-splitfdstream".
+// It chroots into the destination and unpacks the splitfdstream from stdin.
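+//
+// The child's descriptor layout matches what UnpackSplitFDStream sets up:
+// stdin carries the splitfdstream bytes, FD 3 the JSON-encoded TarOptions,
+// FD 4 the destination root directory, and FD 5 the socket on which the
+// content FDs arrive via SCM_RIGHTS.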
+func untarSplitFDStream() { + runtime.LockOSThread() + flag.Parse() + + var options archive.TarOptions + + // Read the options from the pipe (FD 3) + if err := json.NewDecoder(os.NewFile(tarOptionsDescriptor, "options")).Decode(&options); err != nil { + fatal(err) + } + + dest := flag.Arg(0) + root := flag.Arg(1) + numFDs, err := strconv.Atoi(flag.Arg(2)) + if err != nil { + fatal(fmt.Errorf("invalid numFDs argument %q: %w", flag.Arg(2), err)) + } + + // Handle root directory FD for chroot (same pattern as untar) + if root == procPathForFd(rootFileDescriptor) { + rootFd := os.NewFile(rootFileDescriptor, "tar-root") + defer rootFd.Close() + if err := unix.Fchdir(int(rootFd.Fd())); err != nil { + fatal(err) + } + root = "." + } else if root == "" { + root = dest + } + + if err := chroot(root); err != nil { + fatal(err) + } + + // Raise the file descriptor soft limit to the hard limit to + // accommodate the content FDs that will be received from the parent. + if numFDs > 0 { + var rLimit unix.Rlimit + if err := unix.Getrlimit(unix.RLIMIT_NOFILE, &rLimit); err == nil { + rLimit.Cur = rLimit.Max + _ = unix.Setrlimit(unix.RLIMIT_NOFILE, &rLimit) + } + } + + // Receive content FDs from the parent via SCM_RIGHTS on the Unix socket (FD 5). + fds := make([]*os.File, 0, numFDs) + if numFDs > 0 { + buf := make([]byte, 1) + oob := make([]byte, unix.CmsgSpace(4)) + for i := range numFDs { + _, oobn, _, _, err := unix.Recvmsg(fdSocketDescriptor, buf, oob, 0) + if err != nil { + fatal(fmt.Errorf("receiving content FD %d: %w", i, err)) + } + scms, err := unix.ParseSocketControlMessage(oob[:oobn]) + if err != nil { + fatal(fmt.Errorf("parsing socket control message for FD %d: %w", i, err)) + } + if len(scms) == 0 { + fatal(fmt.Errorf("no control message received for FD %d", i)) + } + receivedFDs, err := unix.ParseUnixRights(&scms[0]) + if err != nil { + fatal(fmt.Errorf("parsing unix rights for FD %d: %w", i, err)) + } + fds = append(fds, os.NewFile(uintptr(receivedFDs[0]), fmt.Sprintf("content-fd-%d", i))) + } + } + unix.Close(fdSocketDescriptor) + + iter := archive.NewSplitFDStreamIterator(os.Stdin, fds) + if err := archive.UnpackFromIterator(iter, dest, &options); err != nil { + fatal(err) + } + // fully consume stdin in case it is zero padded + if _, err := flush(os.Stdin); err != nil { + fatal(err) + } + + for _, f := range fds { + f.Close() + } + + os.Exit(0) +} diff --git a/storage/pkg/chrootarchive/splitfdstream_unsupported.go b/storage/pkg/chrootarchive/splitfdstream_unsupported.go new file mode 100644 index 0000000000..55f922de67 --- /dev/null +++ b/storage/pkg/chrootarchive/splitfdstream_unsupported.go @@ -0,0 +1,17 @@ +//go:build windows || darwin + +package chrootarchive + +import ( + "fmt" + "io" + "os" + "runtime" + + "go.podman.io/storage/pkg/archive" +) + +// UnpackSplitFDStream is not supported on this platform. +func UnpackSplitFDStream(stream io.Reader, fds []*os.File, dest string, options *archive.TarOptions) error { + return fmt.Errorf("UnpackSplitFDStream is not supported on %s", runtime.GOOS) +} diff --git a/storage/pkg/fileutils/reflink_linux.go b/storage/pkg/fileutils/reflink_linux.go index 9f5c6c90bb..6c3570fa8e 100644 --- a/storage/pkg/fileutils/reflink_linux.go +++ b/storage/pkg/fileutils/reflink_linux.go @@ -8,13 +8,57 @@ import ( ) // ReflinkOrCopy attempts to reflink the source to the destination fd. -// If reflinking fails or is unsupported, it falls back to io.Copy(). +// If reflinking fails, it tries copy_file_range for kernel-level copying. 
+// If that also fails, it falls back to io.Copy().
 func ReflinkOrCopy(src, dst *os.File) error {
 	err := unix.IoctlFileClone(int(dst.Fd()), int(src.Fd()))
 	if err == nil {
 		return nil
 	}
 
+	// Get source file size for copy_file_range
+	srcInfo, statErr := src.Stat()
+	if statErr != nil {
+		// Fall back to io.Copy if we can't stat
+		_, err = io.Copy(dst, src)
+		return err
+	}
+
+	// Try copy_file_range - a kernel-level copy that avoids bouncing data
+	// through userspace. Cross-filesystem copies fail with EXDEV on kernels
+	// older than 5.3; that and any other failure fall through to io.Copy.
+	if err := doCopyFileRange(src, dst, srcInfo.Size()); err == nil {
+		return nil
+	}
+
+	// copy_file_range may have partially written data before failing,
+	// so reset both file offsets and truncate dst before falling back.
+	if _, err := src.Seek(0, io.SeekStart); err != nil {
+		return err
+	}
+	if _, err := dst.Seek(0, io.SeekStart); err != nil {
+		return err
+	}
+	if err := dst.Truncate(0); err != nil {
+		return err
+	}
+
+	// Fall back to userspace io.Copy
 	_, err = io.Copy(dst, src)
 	return err
 }
+
+// doCopyFileRange uses the copy_file_range syscall for kernel-level copying.
+func doCopyFileRange(src, dst *os.File, size int64) error {
+	remaining := size
+	for remaining > 0 {
+		n, err := unix.CopyFileRange(int(src.Fd()), nil, int(dst.Fd()), nil, int(remaining), 0)
+		if err != nil {
+			return err
+		}
+		if n == 0 {
+			// The source delivered fewer bytes than its stat size (e.g. it
+			// was truncated concurrently); report that rather than silently
+			// treating a short copy as success.
+			return io.ErrUnexpectedEOF
+		}
+		remaining -= int64(n)
+	}
+	return nil
+}
diff --git a/storage/pkg/splitfdstream/server_linux.go b/storage/pkg/splitfdstream/server_linux.go
new file mode 100644
index 0000000000..908adbdc80
--- /dev/null
+++ b/storage/pkg/splitfdstream/server_linux.go
@@ -0,0 +1,471 @@
+//go:build linux
+
+package splitfdstream
+
+import (
+	"errors"
+	"fmt"
+	"io"
+	"net"
+	"os"
+	"runtime"
+	"sync"
+
+	fdpass "github.com/cgwalters/jsonrpc-fdpass-go"
+	"golang.org/x/sys/unix"
+)
+
+// sendRetry retries sender.Send on EAGAIN (non-blocking socket buffer full).
+func sendRetry(sender *fdpass.Sender, msg *fdpass.MessageWithFds) error {
+	for {
+		err := sender.Send(msg)
+		if err == nil {
+			return nil
+		}
+		if errors.Is(err, unix.EAGAIN) || errors.Is(err, unix.EWOULDBLOCK) {
+			runtime.Gosched()
+			continue
+		}
+		return err
+	}
+}
+
+// JSONRPCServer manages a JSON-RPC server using the external library.
+type JSONRPCServer struct {
+	driver      any
+	listener    net.Listener
+	running     bool
+	mu          sync.RWMutex
+	shutdown    chan struct{}
+	connections sync.WaitGroup
+}
+
+// NewJSONRPCServer creates a new JSON-RPC server.
+func NewJSONRPCServer(driver any) *JSONRPCServer {
+	return &JSONRPCServer{
+		driver:   driver,
+		shutdown: make(chan struct{}),
+	}
+}
+
+// Start starts the JSON-RPC server listening on the given Unix socket.
+func (s *JSONRPCServer) Start(socketPath string) error {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	if s.running {
+		return fmt.Errorf("server already running")
+	}
+
+	os.Remove(socketPath)
+
+	listener, err := net.Listen("unix", socketPath)
+	if err != nil {
+		return fmt.Errorf("failed to listen on %s: %w", socketPath, err)
+	}
+
+	s.listener = listener
+	s.running = true
+
+	go s.acceptConnections()
+
+	return nil
+}
+
+// Stop stops the JSON-RPC server.
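+// It closes the shutdown channel and the listener, then waits for all
+// in-flight connections to drain before returning.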
+func (s *JSONRPCServer) Stop() error { + s.mu.Lock() + defer s.mu.Unlock() + + if !s.running { + return nil + } + + close(s.shutdown) + if s.listener != nil { + s.listener.Close() + } + s.connections.Wait() + s.running = false + + return nil +} + +func (s *JSONRPCServer) acceptConnections() { + for { + conn, err := s.listener.Accept() + if err != nil { + select { + case <-s.shutdown: + return + default: + continue + } + } + + unixConn, ok := conn.(*net.UnixConn) + if !ok { + conn.Close() + continue + } + + go s.HandleConnection(unixConn) + } +} + +// HandleConnection handles a single client connection. +func (s *JSONRPCServer) HandleConnection(conn *net.UnixConn) { + s.connections.Add(1) + defer s.connections.Done() + defer conn.Close() + + receiver := fdpass.NewReceiver(conn) + sender := fdpass.NewSender(conn) + defer receiver.Close() + + for { + select { + case <-s.shutdown: + return + default: + } + + msgWithFds, err := receiver.Receive() + if err != nil { + return + } + + req, ok := msgWithFds.Message.(*fdpass.Request) + if !ok { + resp := fdpass.NewErrorResponse( + &fdpass.Error{Code: -32600, Message: "Invalid Request"}, + nil, + ) + if err := sendRetry(sender, &fdpass.MessageWithFds{Message: resp}); err != nil { + return + } + continue + } + + s.handleRequest(sender, req, msgWithFds.FileDescriptors) + } +} + +func (s *JSONRPCServer) handleRequest(sender *fdpass.Sender, req *fdpass.Request, fds []*os.File) { + switch req.Method { + case "GetSplitFDStream": + s.handleGetSplitFDStream(sender, req) + default: + resp := fdpass.NewErrorResponse( + &fdpass.Error{Code: -32601, Message: fmt.Sprintf("method %s not found", req.Method)}, + req.ID, + ) + _ = sendRetry(sender, &fdpass.MessageWithFds{Message: resp}) + } +} + +func (s *JSONRPCServer) handleGetSplitFDStream(sender *fdpass.Sender, req *fdpass.Request) { + params, ok := req.Params.(map[string]interface{}) + if !ok { + resp := fdpass.NewErrorResponse( + &fdpass.Error{Code: -32602, Message: "params must be an object"}, + req.ID, + ) + _ = sendRetry(sender, &fdpass.MessageWithFds{Message: resp}) + return + } + + layerID, _ := params["layerId"].(string) + if layerID == "" { + resp := fdpass.NewErrorResponse( + &fdpass.Error{Code: -32602, Message: "layerId is required"}, + req.ID, + ) + _ = sendRetry(sender, &fdpass.MessageWithFds{Message: resp}) + return + } + + parentID, _ := params["parentId"].(string) + + splitDriver, ok := s.driver.(SplitFDStreamDriver) + if !ok { + resp := fdpass.NewErrorResponse( + &fdpass.Error{Code: -32000, Message: "driver does not support splitfdstream"}, + req.ID, + ) + _ = sendRetry(sender, &fdpass.MessageWithFds{Message: resp}) + return + } + + stream, fileFDs, err := splitDriver.GetSplitFDStream(layerID, parentID, &GetSplitFDStreamOpts{}) + if err != nil { + resp := fdpass.NewErrorResponse( + &fdpass.Error{Code: -32000, Message: err.Error()}, + req.ID, + ) + _ = sendRetry(sender, &fdpass.MessageWithFds{Message: resp}) + return + } + + streamData, err := io.ReadAll(stream) + stream.Close() + if err != nil { + for _, f := range fileFDs { + f.Close() + } + resp := fdpass.NewErrorResponse( + &fdpass.Error{Code: -32000, Message: fmt.Sprintf("failed to read stream: %v", err)}, + req.ID, + ) + _ = sendRetry(sender, &fdpass.MessageWithFds{Message: resp}) + return + } + + // Write stream data to a memfd so we can pass it as a file descriptor. 
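+	// A memfd is an anonymous, seekable in-memory file whose descriptor can
+	// be passed over the socket with SCM_RIGHTS like any other FD, which
+	// keeps the bulk stream bytes out of the JSON-RPC messages themselves.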
+ streamFd, err := unix.MemfdCreate("splitfdstream", unix.MFD_CLOEXEC) + if err != nil { + for _, f := range fileFDs { + f.Close() + } + resp := fdpass.NewErrorResponse( + &fdpass.Error{Code: -32000, Message: fmt.Sprintf("memfd_create: %v", err)}, + req.ID, + ) + _ = sendRetry(sender, &fdpass.MessageWithFds{Message: resp}) + return + } + streamFile := os.NewFile(uintptr(streamFd), "splitfdstream") + if _, err := streamFile.Write(streamData); err != nil { + streamFile.Close() + for _, f := range fileFDs { + f.Close() + } + resp := fdpass.NewErrorResponse( + &fdpass.Error{Code: -32000, Message: fmt.Sprintf("memfd write: %v", err)}, + req.ID, + ) + _ = sendRetry(sender, &fdpass.MessageWithFds{Message: resp}) + return + } + if _, err := streamFile.Seek(0, 0); err != nil { + streamFile.Close() + for _, f := range fileFDs { + f.Close() + } + resp := fdpass.NewErrorResponse( + &fdpass.Error{Code: -32000, Message: fmt.Sprintf("memfd seek: %v", err)}, + req.ID, + ) + _ = sendRetry(sender, &fdpass.MessageWithFds{Message: resp}) + return + } + + // Prepend the stream memfd to the file descriptor list. + // allFDs[0] = stream data, allFDs[1:] = content file descriptors. + allFDs := make([]*os.File, 0, 1+len(fileFDs)) + allFDs = append(allFDs, streamFile) + allFDs = append(allFDs, fileFDs...) + + // Send the response with the first batch of FDs. + // The library limits to MaxFDsPerMessage per sendmsg, so remaining + // FDs are sent as follow-up "fds" notifications. + firstBatch := allFDs + if len(firstBatch) > fdpass.MaxFDsPerMessage { + firstBatch = allFDs[:fdpass.MaxFDsPerMessage] + } + + result := map[string]interface{}{ + "streamSize": len(streamData), + "totalFDs": len(allFDs), + } + + resp := fdpass.NewResponse(result, req.ID) + if err := sendRetry(sender, &fdpass.MessageWithFds{ + Message: resp, + FileDescriptors: firstBatch, + }); err != nil { + fmt.Fprintf(os.Stderr, "error sending initial response: %v\n", err) + return + } + + // Send remaining FDs in batches via notifications + for i := fdpass.MaxFDsPerMessage; i < len(allFDs); i += fdpass.MaxFDsPerMessage { + end := i + fdpass.MaxFDsPerMessage + if end > len(allFDs) { + end = len(allFDs) + } + batch := allFDs[i:end] + + notif := fdpass.NewNotification("fds", nil) + if err := sendRetry(sender, &fdpass.MessageWithFds{ + Message: notif, + FileDescriptors: batch, + }); err != nil { + fmt.Fprintf(os.Stderr, "error sending FD batch at %d/%d: %v\n", i, len(allFDs), err) + return + } + } +} + +// JSONRPCClient implements a JSON-RPC client. +type JSONRPCClient struct { + conn *net.UnixConn + sender *fdpass.Sender + receiver *fdpass.Receiver + mu sync.Mutex + nextID int64 +} + +// NewJSONRPCClient connects to a JSON-RPC server on the given Unix socket. +func NewJSONRPCClient(socketPath string) (*JSONRPCClient, error) { + conn, err := net.Dial("unix", socketPath) + if err != nil { + return nil, fmt.Errorf("failed to connect to socket: %w", err) + } + + unixConn, ok := conn.(*net.UnixConn) + if !ok { + conn.Close() + return nil, fmt.Errorf("connection is not a unix socket") + } + + return &JSONRPCClient{ + conn: unixConn, + sender: fdpass.NewSender(unixConn), + receiver: fdpass.NewReceiver(unixConn), + nextID: 1, + }, nil +} + +// Close closes the client connection. +func (c *JSONRPCClient) Close() error { + if c.receiver != nil { + c.receiver.Close() + } + if c.conn != nil { + return c.conn.Close() + } + return nil +} + +// GetSplitFDStream sends a GetSplitFDStream request and returns the response. 
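+//
+// On success, the returned bytes are the splitfdstream data (read back from
+// the memfd that the server sends as the first descriptor) and the files are
+// the content FDs, which the caller must close. Descriptors beyond the
+// library's per-message limit arrive in follow-up "fds" notifications.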
+func (c *JSONRPCClient) GetSplitFDStream(layerID, parentID string) ([]byte, []*os.File, error) { + c.mu.Lock() + id := c.nextID + c.nextID++ + c.mu.Unlock() + + req := fdpass.NewRequest("GetSplitFDStream", map[string]interface{}{ + "layerId": layerID, + "parentId": parentID, + }, id) + + if err := sendRetry(c.sender, &fdpass.MessageWithFds{Message: req}); err != nil { + return nil, nil, fmt.Errorf("failed to send request: %w", err) + } + + // Receive the initial response with stream data and first batch of FDs + respMsg, err := c.receiver.Receive() + if err != nil { + return nil, nil, fmt.Errorf("failed to receive response: %w", err) + } + + resp, ok := respMsg.Message.(*fdpass.Response) + if !ok { + return nil, nil, fmt.Errorf("unexpected response type: %T", respMsg.Message) + } + + if resp.Error != nil { + return nil, nil, fmt.Errorf("server error: %s", resp.Error.Message) + } + + result, ok := resp.Result.(map[string]interface{}) + if !ok { + return nil, nil, fmt.Errorf("unexpected result type: %T", resp.Result) + } + + // Collect FDs: first batch came with the response + var allFDs []*os.File + allFDs = append(allFDs, respMsg.FileDescriptors...) + + // Read totalFDs to know how many more to expect + totalFDs := 0 + if tf, ok := result["totalFDs"].(float64); ok { + totalFDs = int(tf) + } + + // Receive remaining FDs from follow-up notifications + for len(allFDs) < totalFDs { + msg, err := c.receiver.Receive() + if err != nil { + for _, f := range allFDs { + f.Close() + } + return nil, nil, fmt.Errorf("failed to receive FD batch (%d/%d received): %w", len(allFDs), totalFDs, err) + } + allFDs = append(allFDs, msg.FileDescriptors...) + } + + if len(allFDs) == 0 { + return nil, nil, fmt.Errorf("no file descriptors received") + } + + // allFDs[0] is a memfd containing the stream data, the rest are content FDs + streamFile := allFDs[0] + contentFDs := allFDs[1:] + + streamData, err := io.ReadAll(streamFile) + streamFile.Close() + if err != nil { + for _, f := range contentFDs { + f.Close() + } + return nil, nil, fmt.Errorf("failed to read stream data from fd: %w", err) + } + + return streamData, contentFDs, nil +} + +// CreateSocketPair creates a pair of connected UNIX sockets. 
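+// Both ends are converted with net.FileConn, which duplicates the underlying
+// descriptors, so the temporary os.File wrappers are closed before returning.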
+func CreateSocketPair() (*net.UnixConn, *net.UnixConn, error) { + fds, err := unix.Socketpair(unix.AF_UNIX, unix.SOCK_STREAM, 0) + if err != nil { + return nil, nil, fmt.Errorf("failed to create socket pair: %w", err) + } + + clientFile := os.NewFile(uintptr(fds[0]), "client") + serverFile := os.NewFile(uintptr(fds[1]), "server") + + clientConn, err := net.FileConn(clientFile) + if err != nil { + clientFile.Close() + serverFile.Close() + return nil, nil, fmt.Errorf("failed to create client connection: %w", err) + } + + serverConn, err := net.FileConn(serverFile) + if err != nil { + clientConn.Close() + serverFile.Close() + return nil, nil, fmt.Errorf("failed to create server connection: %w", err) + } + + clientFile.Close() + serverFile.Close() + + clientUnix, ok := clientConn.(*net.UnixConn) + if !ok { + clientConn.Close() + serverConn.Close() + return nil, nil, fmt.Errorf("failed to cast client to UnixConn") + } + + serverUnix, ok := serverConn.(*net.UnixConn) + if !ok { + clientConn.Close() + serverConn.Close() + return nil, nil, fmt.Errorf("failed to cast server to UnixConn") + } + + return clientUnix, serverUnix, nil +} diff --git a/storage/pkg/splitfdstream/server_unsupported.go b/storage/pkg/splitfdstream/server_unsupported.go new file mode 100644 index 0000000000..fc9e8276e0 --- /dev/null +++ b/storage/pkg/splitfdstream/server_unsupported.go @@ -0,0 +1,55 @@ +//go:build !linux + +package splitfdstream + +import ( + "fmt" + "net" + "os" +) + +// JSONRPCServer is not supported on this platform. +type JSONRPCServer struct{} + +// NewJSONRPCServer creates a new JSON-RPC server stub for unsupported platforms. +func NewJSONRPCServer(driver any) *JSONRPCServer { + return &JSONRPCServer{} +} + +// HandleConnection is not supported on this platform. +func (s *JSONRPCServer) HandleConnection(conn *net.UnixConn) { + panic("JSONRPCServer is not supported on this platform") +} + +// Start is not supported on this platform. +func (s *JSONRPCServer) Start(socketPath string) error { + return fmt.Errorf("JSONRPCServer is not supported on this platform") +} + +// Stop is not supported on this platform. +func (s *JSONRPCServer) Stop() error { + return fmt.Errorf("JSONRPCServer is not supported on this platform") +} + +// JSONRPCClient is not supported on this platform. +type JSONRPCClient struct{} + +// NewJSONRPCClient creates a new JSON-RPC client stub for unsupported platforms. +func NewJSONRPCClient(socketPath string) (*JSONRPCClient, error) { + return nil, fmt.Errorf("JSONRPCClient is not supported on this platform") +} + +// Close is not supported on this platform. +func (c *JSONRPCClient) Close() error { + return fmt.Errorf("JSONRPCClient is not supported on this platform") +} + +// GetSplitFDStream is not supported on this platform. +func (c *JSONRPCClient) GetSplitFDStream(layerID, parentID string) ([]byte, []*os.File, error) { + return nil, nil, fmt.Errorf("GetSplitFDStream is not supported on this platform") +} + +// CreateSocketPair is not supported on this platform. 
+func CreateSocketPair() (*net.UnixConn, *net.UnixConn, error) { + return nil, nil, fmt.Errorf("CreateSocketPair is not supported on this platform") +} diff --git a/storage/pkg/splitfdstream/types.go b/storage/pkg/splitfdstream/types.go new file mode 100644 index 0000000000..af3e660558 --- /dev/null +++ b/storage/pkg/splitfdstream/types.go @@ -0,0 +1,83 @@ +package splitfdstream + +import ( + "encoding/binary" + "fmt" + "io" + "os" + + "go.podman.io/storage/pkg/idtools" +) + +// SplitFDStreamDriver defines the interface that storage drivers must implement +// to support splitfdstream operations. +type SplitFDStreamDriver interface { + // ApplySplitFDStream applies a splitfdstream to a layer. + ApplySplitFDStream(options *ApplySplitFDStreamOpts) (int64, error) + + // GetSplitFDStream generates a splitfdstream for a layer. + GetSplitFDStream(id, parent string, options *GetSplitFDStreamOpts) (io.ReadCloser, []*os.File, error) +} + +// ApplySplitFDStreamOpts provides options for ApplySplitFDStream operations. +type ApplySplitFDStreamOpts struct { + LayerID string + Stream io.Reader + FileDescriptors []*os.File + IgnoreChownErrors bool + MountLabel string + StagingDir string + IDMappings *idtools.IDMappings + ForceMask *os.FileMode +} + +// Validate checks if the options are valid. +func (opts *ApplySplitFDStreamOpts) Validate() error { + if opts.LayerID == "" && opts.StagingDir == "" { + return fmt.Errorf("either LayerID or StagingDir must be specified") + } + return nil +} + +// GetSplitFDStreamOpts provides options for GetSplitFDStream operations. +type GetSplitFDStreamOpts struct { + MountLabel string + IDMappings *idtools.IDMappings +} + +// SplitFDStreamWriter writes data in the composefs-rs splitfdstream format. +// The format uses signed 64-bit little-endian prefixes: +// - Negative prefix: abs(prefix) bytes of inline data follow +// - Non-negative prefix: reference to external file descriptor at index prefix +type SplitFDStreamWriter struct { + writer io.Writer +} + +// NewWriter creates a new SplitFDStreamWriter. +func NewWriter(w io.Writer) *SplitFDStreamWriter { + return &SplitFDStreamWriter{writer: w} +} + +// WriteInline writes inline data with a negative prefix indicating the data length. +func (w *SplitFDStreamWriter) WriteInline(data []byte) error { + if len(data) == 0 { + return nil + } + prefix := int64(-len(data)) + if err := binary.Write(w.writer, binary.LittleEndian, prefix); err != nil { + return fmt.Errorf("failed to write inline prefix: %w", err) + } + if _, err := w.writer.Write(data); err != nil { + return fmt.Errorf("failed to write inline data: %w", err) + } + return nil +} + +// WriteExternal writes a reference to an external file descriptor. 
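+// A typical regular-file entry is therefore a WriteInline call carrying the
+// serialized tar header, followed by either WriteInline with the file bytes
+// or WriteExternal with the index of its content FD (see the overlay
+// driver's convertTarToSplitFDStream).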
+func (w *SplitFDStreamWriter) WriteExternal(fdIndex int) error { + prefix := int64(fdIndex) + if err := binary.Write(w.writer, binary.LittleEndian, prefix); err != nil { + return fmt.Errorf("failed to write external fd reference: %w", err) + } + return nil +} diff --git a/storage/store.go b/storage/store.go index 3d8ea50759..7a25136bfc 100644 --- a/storage/store.go +++ b/storage/store.go @@ -32,6 +32,7 @@ import ( "go.podman.io/storage/pkg/ioutils" "go.podman.io/storage/pkg/lockfile" "go.podman.io/storage/pkg/parsers" + "go.podman.io/storage/pkg/splitfdstream" "go.podman.io/storage/pkg/stringutils" "go.podman.io/storage/pkg/system" "go.podman.io/storage/types" @@ -612,6 +613,15 @@ type Store interface { Dedup(DedupArgs) (drivers.DedupResult, error) } +// SplitFDStreamStore extends the Store interface with splitfdstream capabilities. +// This API is experimental and can be changed without bumping the major version number. +type SplitFDStreamStore interface { + Store + + // SplitFDStreamSocket returns a socket for splitfdstream operations. + SplitFDStreamSocket() (*os.File, error) +} + // AdditionalLayer represents a layer that is contained in the additional layer store // This API is experimental and can be changed without bumping the major version number. type AdditionalLayer interface { @@ -783,9 +793,13 @@ type store struct { layerStoreUseGetters rwLayerStore // Almost all users should use the provided accessors instead of accessing this field directly. roLayerStoresUseGetters []roLayerStore // Almost all users should use the provided accessors instead of accessing this field directly. - // FIXME: The following fields need locking, and don’t have it. + // FIXME: The following fields need locking, and don't have it. additionalUIDs *idSet // Set by getAvailableIDs() additionalGIDs *idSet // Set by getAvailableIDs() + + // jsonRPCServer manages the JSON-RPC server for storage operations. + // This API is experimental and can be changed without bumping the major version number. + jsonRPCServer *splitfdstream.JSONRPCServer } // GetStore attempts to find an already-created Store object matching the @@ -4091,3 +4105,45 @@ func (s *store) Dedup(req DedupArgs) (drivers.DedupResult, error) { return rlstore.dedup(r) }) } + +// SplitFDStreamSocket returns a UNIX socket file descriptor for split FD stream operations. +// JSON-RPC requests for split FD stream operations are sent over this socket. +// The caller is responsible for closing the returned file when done. +// This API is experimental and can be changed without bumping the major version number. +func (s *store) SplitFDStreamSocket() (*os.File, error) { + if err := s.startUsingGraphDriver(); err != nil { + return nil, err + } + defer s.stopUsingGraphDriver() + + // Check if driver supports splitfdstream operations + if _, ok := s.graphDriver.(splitfdstream.SplitFDStreamDriver); !ok { + return nil, fmt.Errorf("driver %s does not support split FD stream operations: %w", s.graphDriver.String(), drivers.ErrNotSupported) + } + + // Create socket pair - one end for the caller, one end for the server + clientConn, serverConn, err := splitfdstream.CreateSocketPair() + if err != nil { + return nil, fmt.Errorf("failed to create socket pair: %w", err) + } + + // Get file descriptor from client connection before starting + // the server goroutine, so cleanup is straightforward on error. 
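+	// UnixConn.File returns a duplicate of the underlying descriptor, so the
+	// original connection can be closed as soon as the conversion succeeds.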
+ clientFile, err := clientConn.File() + if err != nil { + clientConn.Close() + serverConn.Close() + return nil, fmt.Errorf("failed to get file from connection: %w", err) + } + clientConn.Close() + + // Initialize server if not already created + if s.jsonRPCServer == nil { + s.jsonRPCServer = splitfdstream.NewJSONRPCServer(s.graphDriver) + } + + // Start handling the server connection in a goroutine + go s.jsonRPCServer.HandleConnection(serverConn) + + return clientFile, nil +} diff --git a/storage/tests/splitfdstream.bats b/storage/tests/splitfdstream.bats new file mode 100755 index 0000000000..79c09f3b39 --- /dev/null +++ b/storage/tests/splitfdstream.bats @@ -0,0 +1,133 @@ +#!/usr/bin/env bats + +load helpers + +# start_server launches the json-rpc-server in the background, fully +# detached from bats file descriptors so it won't block test output. +# Sets SERVER_PID to the actual binary PID. +start_server() { + local extra_args=("$@") + ${STORAGE_BINARY} --graph ${TESTDIR}/root --run ${TESTDIR}/runroot \ + --storage-driver ${STORAGE_DRIVER} \ + ${STORAGE_OPTION:+--storage-opt=${STORAGE_OPTION}} \ + json-rpc-server "${extra_args[@]}" \ + </dev/null >/dev/null 2>&1 3>&- 4>&- 5>&- 6>&- 7>&- 8>&- 9>&- & + SERVER_PID=$! +} + +# stop_server kills the json-rpc-server and waits for it to exit. +stop_server() { + if [[ -n "$SERVER_PID" ]]; then + kill "$SERVER_PID" 2>/dev/null || true + wait "$SERVER_PID" 2>/dev/null || true + SERVER_PID= + fi +} + +# Override teardown to stop the server before the default teardown runs +# storage wipe. If the server is still running it holds the store lock +# and wipe would deadlock. +teardown() { + stop_server + run storage wipe + if [[ $status -ne 0 ]] ; then + echo "$output" + fi + run storage shutdown + if [[ $status -ne 0 ]] ; then + echo "$output" + fi + rm -fr ${TESTDIR} +} + +@test "splitfdstream json-rpc-server and apply-splitfdstream" { + case "$STORAGE_DRIVER" in + overlay*) + ;; + *) + skip "driver $STORAGE_DRIVER does not support splitfdstream" + ;; + esac + + # Create and populate a test layer + populate + + # Get the socket path from runroot + local runroot=`storage status 2>&1 | awk '/^Run Root:/{print $3}'` + local socket_path="$runroot/json-rpc.sock" + + # Start the JSON-RPC server in the background + start_server --socket "$socket_path" + + # Wait for socket to be created (max 10 seconds) + local count=0 + while [[ ! 
-S "$socket_path" && $count -lt 50 ]]; do + sleep 0.2 + count=$((count + 1)) + done + + # Check that the socket exists + [ -S "$socket_path" ] + + # Create a new layer using apply-splitfdstream + # This should connect to our JSON-RPC server and fetch the layer + run storage --debug=false apply-splitfdstream --socket "$socket_path" "$lowerlayer" + echo "apply-splitfdstream output: $output" + [ "$status" -eq 0 ] + [ "$output" != "" ] + + applied_layer="$output" + + # Verify the layer was created + run storage --debug=false layers + [ "$status" -eq 0 ] + [[ "$output" =~ "$applied_layer" ]] + + # Check that we can mount the applied layer + run storage --debug=false mount "$applied_layer" + [ "$status" -eq 0 ] + [ "$output" != "" ] + local applied_mount="$output" + + # Verify some expected content exists (from populate function) + [ -f "$applied_mount/layer1file1" ] + [ -f "$applied_mount/layer1file2" ] + [ -d "$applied_mount/layerdir1" ] + + # Unmount the layer + run storage unmount "$applied_layer" + [ "$status" -eq 0 ] + + # Kill the server before teardown runs storage wipe (which needs the store lock) + stop_server +} + +@test "splitfdstream server socket path uses runroot" { + case "$STORAGE_DRIVER" in + overlay*) + ;; + *) + skip "driver $STORAGE_DRIVER does not support splitfdstream" + ;; + esac + + # Get the expected socket path from runroot + local runroot=`storage status 2>&1 | awk '/^Run Root:/{print $3}'` + local expected_socket="$runroot/json-rpc.sock" + + # Start the JSON-RPC server in the background + start_server + + # Wait for socket to be created (max 10 seconds) + local count=0 + while [[ ! -S "$expected_socket" && $count -lt 50 ]]; do + sleep 0.2 + count=$((count + 1)) + done + + # Verify the socket is created in the correct location + [ -S "$expected_socket" ] + + # Kill the server before teardown runs storage wipe (which needs the store lock) + stop_server +} diff --git a/vendor/github.com/cgwalters/jsonrpc-fdpass-go/.gitignore b/vendor/github.com/cgwalters/jsonrpc-fdpass-go/.gitignore new file mode 100644 index 0000000000..1ee0e6b891 --- /dev/null +++ b/vendor/github.com/cgwalters/jsonrpc-fdpass-go/.gitignore @@ -0,0 +1,23 @@ +# Binaries +*.exe +*.exe~ +*.dll +*.so +*.dylib +/echo + +# Test binary, built with `go test -c` +*.test + +# Output of the go coverage tool +*.out + +# Dependency directories +vendor/ + +# Build output +/target/ + +# Integration test Rust build artifacts +tests-integration/target/ +tests-integration/Cargo.lock diff --git a/vendor/github.com/cgwalters/jsonrpc-fdpass-go/Justfile b/vendor/github.com/cgwalters/jsonrpc-fdpass-go/Justfile new file mode 100644 index 0000000000..b76683a7ed --- /dev/null +++ b/vendor/github.com/cgwalters/jsonrpc-fdpass-go/Justfile @@ -0,0 +1,49 @@ +# Format and lint Go code +check: + go fmt ./... + go vet ./... + +# Run unit tests +unit: + go test -v ./... + +# Run unit tests with race detector +test-race: + go test -race -v ./... + +# Build all packages +build: + go build ./... + +# Build the example +build-example: + go build -o target/echo ./examples/echo + +# Run all tests +test-all: unit + +# Clean build artifacts +clean: + rm -rf target/ + rm -rf tests-integration/target/ + go clean ./... + +# Full CI check (format, lint, test) +ci: check unit + +# Run the integration tests against the Rust implementation +# Requires: cargo, go +test-integration: build-integration-server + go test -v ./tests-integration/... 
+ +# Build the Rust integration test server +build-integration-server: + cargo build --manifest-path tests-integration/Cargo.toml + +# Run the echo server example +run-server socket="/tmp/echo.sock": + go run ./examples/echo server {{socket}} + +# Run the echo client example +run-client socket="/tmp/echo.sock": + go run ./examples/echo client {{socket}} diff --git a/vendor/github.com/cgwalters/jsonrpc-fdpass-go/LICENSE b/vendor/github.com/cgwalters/jsonrpc-fdpass-go/LICENSE new file mode 100644 index 0000000000..4e800f548b --- /dev/null +++ b/vendor/github.com/cgwalters/jsonrpc-fdpass-go/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2025 Colin Walters + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/vendor/github.com/cgwalters/jsonrpc-fdpass-go/README.md b/vendor/github.com/cgwalters/jsonrpc-fdpass-go/README.md new file mode 100644 index 0000000000..7c5a6bf1e7 --- /dev/null +++ b/vendor/github.com/cgwalters/jsonrpc-fdpass-go/README.md @@ -0,0 +1,68 @@ +# jsonrpc-fdpass-go + +A Go implementation of JSON-RPC 2.0 with file descriptor passing over Unix domain sockets. + +This library implements the protocol specified in [jsonrpc-fdpass](https://github.com/cgwalters/jsonrpc-fdpass). + +## Protocol Overview + +- **Transport**: Unix domain sockets (SOCK_STREAM) +- **Framing**: Self-delimiting JSON (streaming parser) +- **FD Passing**: Via sendmsg/recvmsg with SCM_RIGHTS ancillary data +- **FD Count**: Top-level `fds` field indicates the number of file descriptors attached + +When file descriptors are attached to a message, the `fds` field is automatically +set to the count of FDs. File descriptors are passed positionally—the application +layer defines the semantic mapping between FD positions and parameters. 
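+
+As an illustrative sketch of the wire format, a request carrying one file
+descriptor serializes to a single self-delimiting JSON value like the one
+below; the descriptor itself travels out-of-band as SCM_RIGHTS ancillary data
+rather than inside the JSON:
+
+```json
+{"jsonrpc":"2.0","method":"readFile","params":{"path":"example.txt"},"id":1,"fds":1}
+```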
+ +## Installation + +```bash +go get github.com/cgwalters/jsonrpc-fdpass-go +``` + +## Usage + +```go +package main + +import ( + "net" + "os" + + fdpass "github.com/cgwalters/jsonrpc-fdpass-go" +) + +func main() { + // Connect to a Unix socket + conn, _ := net.DialUnix("unix", nil, &net.UnixAddr{Name: "/tmp/socket.sock", Net: "unix"}) + + // Create sender and receiver + sender := fdpass.NewSender(conn) + receiver := fdpass.NewReceiver(conn) + + // Send a request with a file descriptor + file, _ := os.Open("example.txt") + defer file.Close() + + req := fdpass.NewRequest("readFile", map[string]interface{}{ + "path": "example.txt", + }, 1) + + msg := &fdpass.MessageWithFds{ + Message: req, + FileDescriptors: []*os.File{file}, + } + + // The sender automatically sets the "fds" field to 1 + sender.Send(msg) + + // Receive response + resp, _ := receiver.Receive() + // Handle resp.Message and resp.FileDescriptors +} +``` + +## License + +MIT diff --git a/vendor/github.com/cgwalters/jsonrpc-fdpass-go/message.go b/vendor/github.com/cgwalters/jsonrpc-fdpass-go/message.go new file mode 100644 index 0000000000..b4dcde7e83 --- /dev/null +++ b/vendor/github.com/cgwalters/jsonrpc-fdpass-go/message.go @@ -0,0 +1,216 @@ +// Package fdpass implements JSON-RPC 2.0 with file descriptor passing over Unix domain sockets. +package fdpass + +import ( + "encoding/json" + "os" +) + +// JSONRPCVersion is the JSON-RPC protocol version. +const JSONRPCVersion = "2.0" + +// FDsKey is the JSON key for the file descriptor count field. +const FDsKey = "fds" + +// FileDescriptorErrorCode is the error code for FD-related protocol errors. +const FileDescriptorErrorCode = -32050 + +// Request represents a JSON-RPC 2.0 request. +type Request struct { + JsonRpc string `json:"jsonrpc"` + Method string `json:"method"` + Params interface{} `json:"params,omitempty"` + ID interface{} `json:"id"` + // Fds is the number of file descriptors attached to this message. + Fds *int `json:"fds,omitempty"` +} + +// Response represents a JSON-RPC 2.0 response. +type Response struct { + JsonRpc string `json:"jsonrpc"` + Result interface{} `json:"result,omitempty"` + Error *Error `json:"error,omitempty"` + ID interface{} `json:"id"` + // Fds is the number of file descriptors attached to this message. + Fds *int `json:"fds,omitempty"` +} + +// Notification represents a JSON-RPC 2.0 notification (a request without an ID). +type Notification struct { + JsonRpc string `json:"jsonrpc"` + Method string `json:"method"` + Params interface{} `json:"params,omitempty"` + // Fds is the number of file descriptors attached to this message. + Fds *int `json:"fds,omitempty"` +} + +// Error represents a JSON-RPC 2.0 error object. +type Error struct { + Code int `json:"code"` + Message string `json:"message"` + Data interface{} `json:"data,omitempty"` +} + +func (e *Error) Error() string { + return e.Message +} + +// MessageWithFds wraps a JSON-RPC message with associated file descriptors. +type MessageWithFds struct { + // Message is the JSON-RPC message (Request, Response, or Notification). + Message interface{} + // FileDescriptors are the file descriptors to pass with this message. + // The order corresponds to indices 0..N-1 matching the message's fds count. + FileDescriptors []*os.File +} + +// NewRequest creates a new JSON-RPC request. 
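+// For example, NewRequest("readFile", map[string]interface{}{"path": "example.txt"}, 1)
+// builds a request with no fds field set; Sender.Send fills the field in when
+// file descriptors are attached to the message.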
+func NewRequest(method string, params interface{}, id interface{}) *Request { + return &Request{ + JsonRpc: JSONRPCVersion, + Method: method, + Params: params, + ID: id, + } +} + +// NewResponse creates a new successful JSON-RPC response. +func NewResponse(result interface{}, id interface{}) *Response { + return &Response{ + JsonRpc: JSONRPCVersion, + Result: result, + ID: id, + } +} + +// NewErrorResponse creates a new error JSON-RPC response. +func NewErrorResponse(err *Error, id interface{}) *Response { + return &Response{ + JsonRpc: JSONRPCVersion, + Error: err, + ID: id, + } +} + +// NewNotification creates a new JSON-RPC notification. +func NewNotification(method string, params interface{}) *Notification { + return &Notification{ + JsonRpc: JSONRPCVersion, + Method: method, + Params: params, + } +} + +// GetFDCount reads the file descriptor count from a JSON value. +// Returns 0 if the `fds` field is absent or not a valid number. +func GetFDCount(value map[string]interface{}) int { + if fds, ok := value[FDsKey]; ok { + switch v := fds.(type) { + case float64: + return int(v) + case int: + return v + } + } + return 0 +} + +// FileDescriptorError creates a standard FD error for protocol violations. +func FileDescriptorError() *Error { + return &Error{ + Code: FileDescriptorErrorCode, + Message: "File Descriptor Error", + } +} + +// SetFDs sets the fds count on a Request. +func (r *Request) SetFDs(count int) { + if count > 0 { + r.Fds = &count + } else { + r.Fds = nil + } +} + +// GetFDs returns the fds count from a Request. +func (r *Request) GetFDs() int { + if r.Fds != nil { + return *r.Fds + } + return 0 +} + +// SetFDs sets the fds count on a Response. +func (r *Response) SetFDs(count int) { + if count > 0 { + r.Fds = &count + } else { + r.Fds = nil + } +} + +// GetFDs returns the fds count from a Response. +func (r *Response) GetFDs() int { + if r.Fds != nil { + return *r.Fds + } + return 0 +} + +// SetFDs sets the fds count on a Notification. +func (n *Notification) SetFDs(count int) { + if count > 0 { + n.Fds = &count + } else { + n.Fds = nil + } +} + +// GetFDs returns the fds count from a Notification. +func (n *Notification) GetFDs() int { + if n.Fds != nil { + return *n.Fds + } + return 0 +} + +// ParseMessage parses a raw JSON message into the appropriate type. +// It returns one of *Request, *Response, or *Notification. 
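+// Classification follows the fields present in the JSON object: "method"
+// together with "id" yields a *Request, "result" or "error" yields a
+// *Response, and "method" without "id" yields a *Notification.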
+func ParseMessage(data []byte) (interface{}, error) { + // First parse as a generic map to determine type + var raw map[string]json.RawMessage + if err := json.Unmarshal(data, &raw); err != nil { + return nil, err + } + + // Determine message type based on fields present + _, hasMethod := raw["method"] + _, hasID := raw["id"] + _, hasResult := raw["result"] + _, hasError := raw["error"] + + if hasMethod && hasID { + // Request + var req Request + if err := json.Unmarshal(data, &req); err != nil { + return nil, err + } + return &req, nil + } else if hasResult || hasError { + // Response + var resp Response + if err := json.Unmarshal(data, &resp); err != nil { + return nil, err + } + return &resp, nil + } else if hasMethod { + // Notification + var notif Notification + if err := json.Unmarshal(data, ¬if); err != nil { + return nil, err + } + return ¬if, nil + } + + return nil, &Error{Code: -32600, Message: "Invalid JSON-RPC message"} +} diff --git a/vendor/github.com/cgwalters/jsonrpc-fdpass-go/transport.go b/vendor/github.com/cgwalters/jsonrpc-fdpass-go/transport.go new file mode 100644 index 0000000000..3a65af9f59 --- /dev/null +++ b/vendor/github.com/cgwalters/jsonrpc-fdpass-go/transport.go @@ -0,0 +1,302 @@ +package fdpass + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "io" + "net" + "os" + "sync" + + "golang.org/x/sys/unix" +) + +const ( + // MaxFDsPerMessage is the maximum number of file descriptors per message. + MaxFDsPerMessage = 8 + // ReadBufferSize is the size of the read buffer. + ReadBufferSize = 4096 +) + +var ( + // ErrConnectionClosed is returned when the connection is closed. + ErrConnectionClosed = errors.New("connection closed") + // ErrFramingError is returned when JSON parsing fails. + ErrFramingError = errors.New("framing error: invalid JSON") + // ErrMismatchedCount is returned when the number of FDs doesn't match the fds field. + ErrMismatchedCount = errors.New("mismatched file descriptor count") +) + +// Sender sends JSON-RPC messages with file descriptors over a Unix socket. +type Sender struct { + conn *net.UnixConn + mu sync.Mutex +} + +// NewSender creates a new Sender for the given Unix connection. +func NewSender(conn *net.UnixConn) *Sender { + return &Sender{conn: conn} +} + +// Send sends a message with optional file descriptors. 
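+// It sets the message's fds field to len(msg.FileDescriptors) (clearing it
+// when there are none), serializes the message, and attaches the descriptors
+// as SCM_RIGHTS ancillary data on the first chunk written to the socket.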
+func (s *Sender) Send(msg *MessageWithFds) error { + s.mu.Lock() + defer s.mu.Unlock() + + // Set the fds field on the message based on the number of file descriptors + fdCount := len(msg.FileDescriptors) + switch m := msg.Message.(type) { + case *Request: + m.SetFDs(fdCount) + case *Response: + m.SetFDs(fdCount) + case *Notification: + m.SetFDs(fdCount) + } + + // Serialize the message with the fds field set + msgData, err := json.Marshal(msg.Message) + if err != nil { + return fmt.Errorf("failed to marshal message: %w", err) + } + + // Get the raw file descriptor for the socket + rawConn, err := s.conn.SyscallConn() + if err != nil { + return fmt.Errorf("failed to get syscall conn: %w", err) + } + + var sendErr error + err = rawConn.Control(func(fd uintptr) { + sendErr = s.sendWithFDs(int(fd), msgData, msg.FileDescriptors) + }) + if err != nil { + return err + } + return sendErr +} + +func (s *Sender) sendWithFDs(sockfd int, data []byte, files []*os.File) error { + bytesSent := 0 + fdsSent := false + + for bytesSent < len(data) { + remaining := data[bytesSent:] + + var n int + var err error + + if !fdsSent && len(files) > 0 { + // First chunk with FDs: use sendmsg with ancillary data + fds := make([]int, len(files)) + for i, f := range files { + fds[i] = int(f.Fd()) + } + + rights := unix.UnixRights(fds...) + n, err = unix.SendmsgN(sockfd, remaining, rights, nil, 0) + if err != nil { + return fmt.Errorf("sendmsg failed: %w", err) + } + fdsSent = true + } else { + // No FDs or FDs already sent: use regular send + n, err = unix.Write(sockfd, remaining) + if err != nil { + return fmt.Errorf("write failed: %w", err) + } + } + + bytesSent += n + } + + return nil +} + +// Receiver receives JSON-RPC messages with file descriptors from a Unix socket. +type Receiver struct { + conn *net.UnixConn + buffer []byte + fdQueue []*os.File + mu sync.Mutex +} + +// NewReceiver creates a new Receiver for the given Unix connection. +func NewReceiver(conn *net.UnixConn) *Receiver { + return &Receiver{ + conn: conn, + buffer: make([]byte, 0), + fdQueue: make([]*os.File, 0), + } +} + +// Receive receives the next message with its file descriptors. 
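+// It blocks until one complete JSON value has been read from the socket, then
+// dequeues exactly the number of received descriptors named by the message's
+// fds field and returns both together.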
+func (r *Receiver) Receive() (*MessageWithFds, error) { + r.mu.Lock() + defer r.mu.Unlock() + + for { + // Try to parse a complete message from the buffer + msg, err := r.tryParseMessage() + if err != nil { + return nil, err + } + if msg != nil { + return msg, nil + } + + // Need more data + if err := r.readMoreData(); err != nil { + return nil, err + } + } +} + +func (r *Receiver) tryParseMessage() (*MessageWithFds, error) { + if len(r.buffer) == 0 { + return nil, nil + } + + // Use streaming JSON decoder to find message boundaries + decoder := json.NewDecoder(bytes.NewReader(r.buffer)) + var value map[string]interface{} + + err := decoder.Decode(&value) + if err == io.EOF || errors.Is(err, io.ErrUnexpectedEOF) { + // Incomplete JSON - need more data + return nil, nil + } + if err != nil { + // Actual parse error - framing error + return nil, fmt.Errorf("%w: %v", ErrFramingError, err) + } + + // Successfully parsed a complete JSON value + // Use InputOffset to find consumed bytes (Go 1.21+) + bytesConsumed := decoder.InputOffset() + + // Extract the consumed bytes for re-parsing + consumedData := r.buffer[:bytesConsumed] + + // Remove consumed bytes from buffer + r.buffer = r.buffer[bytesConsumed:] + + // Read the fds count from the message + fdCount := GetFDCount(value) + + // Check we have enough FDs + if fdCount > len(r.fdQueue) { + return nil, fmt.Errorf("%w: expected %d FDs, have %d in queue", + ErrMismatchedCount, fdCount, len(r.fdQueue)) + } + + // Dequeue FDs + fds := make([]*os.File, fdCount) + copy(fds, r.fdQueue[:fdCount]) + r.fdQueue = r.fdQueue[fdCount:] + + // Parse the message into the appropriate type + msg, err := ParseMessage(consumedData) + if err != nil { + return nil, err + } + + return &MessageWithFds{ + Message: msg, + FileDescriptors: fds, + }, nil +} + +func (r *Receiver) readMoreData() error { + rawConn, err := r.conn.SyscallConn() + if err != nil { + return fmt.Errorf("failed to get syscall conn: %w", err) + } + + var readErr error + var bytesRead int + var receivedFDs []*os.File + + err = rawConn.Read(func(fd uintptr) bool { + bytesRead, receivedFDs, readErr = r.recvWithFDs(int(fd)) + // Return true to indicate we're done with this read operation + // Return false only if we get EAGAIN/EWOULDBLOCK + if readErr != nil { + if errors.Is(readErr, unix.EAGAIN) || errors.Is(readErr, unix.EWOULDBLOCK) { + readErr = nil + return false // Tell runtime to wait and retry + } + } + return true + }) + + if err != nil { + return err + } + if readErr != nil { + return readErr + } + + if bytesRead == 0 && len(receivedFDs) == 0 { + return ErrConnectionClosed + } + + // Append received FDs to queue + r.fdQueue = append(r.fdQueue, receivedFDs...) + + return nil +} + +func (r *Receiver) recvWithFDs(sockfd int) (int, []*os.File, error) { + buf := make([]byte, ReadBufferSize) + // Allocate space for control message (for up to MaxFDsPerMessage FDs) + // Each FD is 4 bytes (int32), use CmsgSpace to get properly aligned size + oob := make([]byte, unix.CmsgSpace(MaxFDsPerMessage*4)) + + n, oobn, _, _, err := unix.Recvmsg(sockfd, buf, oob, unix.MSG_CMSG_CLOEXEC) + if err != nil { + return 0, nil, err + } + + // Append data to buffer + if n > 0 { + r.buffer = append(r.buffer, buf[:n]...) 
+ } + + // Parse control messages for FDs + var files []*os.File + if oobn > 0 { + scms, err := unix.ParseSocketControlMessage(oob[:oobn]) + if err != nil { + return n, nil, fmt.Errorf("failed to parse control message: %w", err) + } + + for _, scm := range scms { + fds, err := unix.ParseUnixRights(&scm) + if err != nil { + continue + } + for _, fd := range fds { + files = append(files, os.NewFile(uintptr(fd), "")) + } + } + } + + return n, files, nil +} + +// Close closes the receiver and any pending file descriptors in the queue. +func (r *Receiver) Close() error { + r.mu.Lock() + defer r.mu.Unlock() + + // Close any FDs remaining in the queue to prevent leaks + for _, f := range r.fdQueue { + f.Close() + } + r.fdQueue = nil + + return nil +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 1df200be5f..ee707eba23 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -30,6 +30,9 @@ github.com/VividCortex/ewma # github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d ## explicit github.com/acarl005/stripansi +# github.com/cgwalters/jsonrpc-fdpass-go v0.0.0-20260126203148-2bca851a3863 +## explicit; go 1.21 +github.com/cgwalters/jsonrpc-fdpass-go # github.com/checkpoint-restore/checkpointctl v1.5.0 ## explicit; go 1.24.6 github.com/checkpoint-restore/checkpointctl/lib