4 changes: 2 additions & 2 deletions image/docker/docker_image_dest.go
@@ -31,11 +31,11 @@ import (
"go.podman.io/image/v5/internal/set"
"go.podman.io/image/v5/internal/signature"
"go.podman.io/image/v5/internal/streamdigest"
"go.podman.io/image/v5/internal/uploadreader"
"go.podman.io/image/v5/manifest"
"go.podman.io/image/v5/pkg/blobinfocache/none"
compressiontypes "go.podman.io/image/v5/pkg/compression/types"
"go.podman.io/image/v5/types"
"go.podman.io/storage/pkg/terminablereader"
)

type dockerImageDestination struct {
@@ -183,7 +183,7 @@ func (d *dockerImageDestination) PutBlobWithOptions(ctx context.Context, stream
 stream = io.TeeReader(stream, sizeCounter)

 uploadLocation, err = func() (*url.URL, error) { // A scope for defer
-uploadReader := uploadreader.NewUploadReader(stream)
+uploadReader := terminablereader.NewTerminableReader(stream)
 // This error text should never be user-visible, we terminate only after makeRequestToResolvedURL
 // returns, so there isn’t a way for the error text to be provided to any of our callers.
 defer uploadReader.Terminate(errors.New("Reading data from an already terminated upload"))
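For context, here is a minimal sketch of the terminate-after-request pattern this hunk relies on. It is illustrative only, not code from this PR: the uploadBlob helper, its signature, and the status handling are hypothetical; only NewTerminableReader and Terminate come from the terminablereader package imported above. Because Terminate runs via defer, it is only called after client.Do has returned, which is why the termination error text is never user-visible.

package example

import (
	"bytes"
	"errors"
	"fmt"
	"net/http"

	"go.podman.io/storage/pkg/terminablereader"
)

// uploadBlob is a hypothetical helper mirroring the PutBlobWithOptions hunk above:
// the request body is wrapped in a TerminableReader, and Terminate is deferred so
// that net/http's upload goroutine can no longer read from the underlying stream
// once this function has returned.
func uploadBlob(client *http.Client, uploadURL string, payload []byte) error {
	body := terminablereader.NewTerminableReader(bytes.NewReader(payload))
	defer body.Terminate(errors.New("reading data from an already terminated upload"))

	req, err := http.NewRequest(http.MethodPut, uploadURL, body)
	if err != nil {
		return err
	}
	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode >= 300 {
		return fmt.Errorf("unexpected status %s", resp.Status)
	}
	return nil
}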
6 changes: 4 additions & 2 deletions storage/layers.go
@@ -35,6 +35,7 @@
"go.podman.io/storage/pkg/stringid"
"go.podman.io/storage/pkg/system"
"go.podman.io/storage/pkg/tarlog"
"go.podman.io/storage/pkg/terminablereader"
"go.podman.io/storage/pkg/truncindex"
)

@@ -2619,7 +2620,8 @@
 if err != nil {
 return -1, err
 }
-defer uncompressed.Close()
+var uncompressed_reader = terminablereader.NewTerminableReader(uncompressed)

Check failure on line 2623 in storage/layers.go (GitHub Actions / lint): File is not properly formatted (gofumpt)

+defer uncompressed_reader.Terminate(errors.New("Reading data from an already terminated stream"))

Check failure on line 2624 in storage/layers.go (GitHub Actions / lint): ST1005: error strings should not be capitalized (staticcheck)

Review comment (Contributor): (If this design persists: A comment somewhere about why we need the TerminableReader would be nice.)

 idLogger, err := tarlog.NewLogger(func(h *tar.Header) {
 if !strings.HasPrefix(path.Base(h.Name), archive.WhiteoutPrefix) {
 uidLog[uint32(h.Uid)] = struct{}{}
@@ -2635,7 +2637,7 @@
 if uncompressedDigester != nil {
 uncompressedWriter = io.MultiWriter(uncompressedWriter, uncompressedDigester.Hash())
 }
-payload, err := asm.NewInputTarStream(io.TeeReader(uncompressed, uncompressedWriter), metadata, storage.NewDiscardFilePutter())
+payload, err := asm.NewInputTarStream(io.TeeReader(uncompressed_reader, uncompressedWriter), metadata, storage.NewDiscardFilePutter())
 if err != nil {
 return -1, err
 }
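A hedged sketch of the shape of this change, not the PR's actual code: the decompressed stream is wrapped in a TerminableReader and Terminate is deferred, so nothing can keep reading from the stream after the enclosing function returns, even if the tar-stream consumer handed the reader off to another goroutine. The consumeDiff function and its consume callback below are hypothetical names. The sketch also uses := and a lowercase error string, which is presumably what the gofumpt and ST1005 annotations above are asking for.

package example

import (
	"errors"
	"io"

	"go.podman.io/storage/pkg/terminablereader"
)

// consumeDiff is a hypothetical stand-in for the layer-diff code path above.
// After consumeDiff returns, `uncompressed` is guaranteed not to be read again
// through the TerminableReader, even if `consume` started its own goroutines.
func consumeDiff(uncompressed io.Reader, consume func(io.Reader) error) error {
	uncompressedReader := terminablereader.NewTerminableReader(uncompressed)
	defer uncompressedReader.Terminate(errors.New("reading data from an already terminated stream"))
	return consume(uncompressedReader)
}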
@@ -1,11 +1,11 @@
-package uploadreader
+package terminablereader

 import (
 "io"
 "sync"
 )

-// UploadReader is a pass-through reader for use in sending non-trivial data using the net/http
+// TerminableReader is a pass-through reader for use in sending non-trivial data using the net/http
 // package (http.NewRequest, http.Post and the like).
 //
 // The net/http package uses a separate goroutine to upload data to a HTTP connection,
@@ -16,29 +16,29 @@ import (
 // As a result, any data used/updated by the io.Reader() provided as the request body may be
 // used/updated even after http.Client.Do returns, causing races.
 //
-// To fix this, UploadReader provides a synchronized Terminate() method, which can block for
+// To fix this, TerminableReader provides a synchronized Terminate() method, which can block for
 // a not-completely-negligible time (for a duration of the underlying Read()), but guarantees that
 // after Terminate() returns, the underlying reader is never used any more (unlike calling
 // the cancellation callback of context.WithCancel, which returns before any recipients may have
 // reacted to the cancellation).
-type UploadReader struct {
+type TerminableReader struct {
 mutex sync.Mutex
 // The following members can only be used with mutex held
 reader io.Reader
 terminationError error // nil if not terminated yet
 }

-// NewUploadReader returns an UploadReader for an "underlying" reader.
-func NewUploadReader(underlying io.Reader) *UploadReader {
-return &UploadReader{
+// NewTerminableReader returns an TerminableReader for an "underlying" reader.
+func NewTerminableReader(underlying io.Reader) *TerminableReader {
+return &TerminableReader{
 reader: underlying,
 terminationError: nil,
 }
 }

 // Read returns the error set by Terminate, if any, or calls the underlying reader.
 // It is safe to call this from a different goroutine than Terminate.
-func (ur *UploadReader) Read(p []byte) (int, error) {
+func (ur *TerminableReader) Read(p []byte) (int, error) {
 ur.mutex.Lock()
 defer ur.mutex.Unlock()

@@ -53,7 +53,7 @@ func (ur *UploadReader) Read(p []byte) (int, error) {
 // reader will never be used any more.
 //
 // It is safe to call this from a different goroutine than Read.
-func (ur *UploadReader) Terminate(err error) {
+func (ur *TerminableReader) Terminate(err error) {
 ur.mutex.Lock() // May block for some time if ur.reader.Read() is in progress
 defer ur.mutex.Unlock()

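To make the guarantee in the doc comment concrete, here is a hedged usage sketch (not part of this PR): a second goroutine stands in for net/http's upload goroutine and drains the TerminableReader, while the calling goroutine terminates it. Once Terminate returns, the wrapped reader is never read through the TerminableReader again, so it can safely be closed, reused, or mutated.

package example

import (
	"errors"
	"io"
	"strings"
	"sync"

	"go.podman.io/storage/pkg/terminablereader"
)

func demoTerminate() {
	underlying := strings.NewReader(strings.Repeat("x", 1<<20))
	tr := terminablereader.NewTerminableReader(underlying)

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		// Stand-in for net/http's upload goroutine: reads until EOF or until a
		// Read call observes the termination error set by Terminate.
		_, _ = io.Copy(io.Discard, tr)
	}()

	// Terminate may block for the duration of one in-flight Read, but once it
	// returns, `underlying` is no longer used by the reading goroutine.
	tr.Terminate(errors.New("terminated by caller"))
	wg.Wait()
}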
@@ -1,4 +1,4 @@
-package uploadreader
+package terminablereader

 import (
 "bytes"
@@ -10,18 +10,18 @@ import (
"github.com/stretchr/testify/require"
)

func TestUploadReader(t *testing.T) {
func TestTerminableReader(t *testing.T) {
// This is a smoke test in a single goroutine, without really testing the locking.

data := bytes.Repeat([]byte{0x01}, 65535)
// No termination
ur := NewUploadReader(bytes.NewReader(data))
ur := NewTerminableReader(bytes.NewReader(data))
read, err := io.ReadAll(ur)
require.NoError(t, err)
assert.Equal(t, data, read)

// Terminated
ur = NewUploadReader(bytes.NewReader(data))
ur = NewTerminableReader(bytes.NewReader(data))
readLen := len(data) / 2
read, err = io.ReadAll(io.LimitReader(ur, int64(readLen)))
require.NoError(t, err)