Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions commands/imagetools/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,14 +225,22 @@ func runCreate(ctx context.Context, dockerCli command.Cli, in createOptions, arg
seed := repoTags[0]
return progress.Wrap(fmt.Sprintf("pushing %s", repo), pw.Write, func(sub progress.SubLogger) error {
ctx = withMediaTypeKeyPrefix(ctx)
// Create a single shared ingester for all concurrent
// copies to this repo. The pushingIngester's per-digest
// locking prevents concurrent pushes of the same blob
// from racing against each other on the registry.
ingester, err := r.IngesterForLocation(ctx, seed)
if err != nil {
return err
}
eg2, _ := errgroup.WithContext(ctx)
for _, desc := range manifests {
eg2.Go(func() error {
sub.Log(1, fmt.Appendf(nil, "copying %s from %s to %s\n", desc.Digest.String(), desc.Source.Ref.String(), repo))
err := r.Copy(ctx, &imagetools.Source{
err := r.CopyWithIngester(ctx, &imagetools.Source{
Ref: desc.Source.Ref,
Desc: desc.Descriptor,
}, seed)
}, seed, ingester)
if err != nil {
return errors.Wrapf(err, "copy %s from %s to %s", desc.Digest.String(), desc.Source.Ref.String(), seed.String())
}
Expand Down
24 changes: 18 additions & 6 deletions util/imagetools/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,19 @@ func (r *Resolver) Push(ctx context.Context, ref *Location, desc ocispecs.Descri
}

func (r *Resolver) Copy(ctx context.Context, src *Source, dest *Location) error {
ingester, err := r.IngesterForLocation(ctx, dest)
if err != nil {
return err
}
return r.CopyWithIngester(ctx, src, dest, ingester)
}

// CopyWithIngester copies a source manifest and its referrers to the
// destination using the provided ingester. Callers that issue multiple
// concurrent copies to the same destination should share a single ingester
// so that the underlying per-digest locking prevents duplicate blob pushes
// from racing against each other.
func (r *Resolver) CopyWithIngester(ctx context.Context, src *Source, dest *Location, ingester content.Ingester) error {
ctx = remotes.WithMediaTypeKeyPrefix(ctx, "application/vnd.in-toto+json", "intoto")
ctx = remotes.WithMediaTypeKeyPrefix(ctx, "application/vnd.oci.empty.v1+json", "empty")

Expand All @@ -268,10 +281,6 @@ func (r *Resolver) Copy(ctx context.Context, src *Source, dest *Location) error
if err != nil {
return err
}
ingester, err := r.ingesterForLocation(dest)
if err != nil {
return err
}

referrers := &referrersProvider{base: referrersFunc(func(ctx context.Context, subject ocispecs.Descriptor) ([]ocispecs.Descriptor, error) {
descs, err := r.FetchReferrers(ctx, src.Ref, subject.Digest)
Expand Down Expand Up @@ -526,9 +535,12 @@ func dedupeDescriptors(descs []ocispecs.Descriptor) []ocispecs.Descriptor {
return out
}

func (r *Resolver) ingesterForLocation(loc *Location) (content.Ingester, error) {
// IngesterForLocation returns a content ingester for the given location.
// For registry locations a new pusher is created; for OCI layout locations
// the local content store is returned.
func (r *Resolver) IngesterForLocation(ctx context.Context, loc *Location) (content.Ingester, error) {
if loc.IsRegistry() {
p, err := r.registryResolver().Pusher(context.TODO(), loc.Name())
p, err := r.registryResolver().Pusher(ctx, loc.Name())
if err != nil {
return nil, err
}
Expand Down
Loading