diff --git a/commands/imagetools/create.go b/commands/imagetools/create.go index bea7f36bee86..70308391150f 100644 --- a/commands/imagetools/create.go +++ b/commands/imagetools/create.go @@ -225,14 +225,22 @@ func runCreate(ctx context.Context, dockerCli command.Cli, in createOptions, arg seed := repoTags[0] return progress.Wrap(fmt.Sprintf("pushing %s", repo), pw.Write, func(sub progress.SubLogger) error { ctx = withMediaTypeKeyPrefix(ctx) + // Create a single shared ingester for all concurrent + // copies to this repo. The pushingIngester's per-digest + // locking prevents concurrent pushes of the same blob + // from racing against each other on the registry. + ingester, err := r.IngesterForLocation(ctx, seed) + if err != nil { + return err + } eg2, _ := errgroup.WithContext(ctx) for _, desc := range manifests { eg2.Go(func() error { sub.Log(1, fmt.Appendf(nil, "copying %s from %s to %s\n", desc.Digest.String(), desc.Source.Ref.String(), repo)) - err := r.Copy(ctx, &imagetools.Source{ + err := r.CopyWithIngester(ctx, &imagetools.Source{ Ref: desc.Source.Ref, Desc: desc.Descriptor, - }, seed) + }, seed, ingester) if err != nil { return errors.Wrapf(err, "copy %s from %s to %s", desc.Digest.String(), desc.Source.Ref.String(), seed.String()) } diff --git a/util/imagetools/create.go b/util/imagetools/create.go index 2490bb86a75a..c13cae7ff9e1 100644 --- a/util/imagetools/create.go +++ b/util/imagetools/create.go @@ -252,6 +252,19 @@ func (r *Resolver) Push(ctx context.Context, ref *Location, desc ocispecs.Descri } func (r *Resolver) Copy(ctx context.Context, src *Source, dest *Location) error { + ingester, err := r.IngesterForLocation(ctx, dest) + if err != nil { + return err + } + return r.CopyWithIngester(ctx, src, dest, ingester) +} + +// CopyWithIngester copies a source manifest and its referrers to the +// destination using the provided ingester. Callers that issue multiple +// concurrent copies to the same destination should share a single ingester +// so that the underlying per-digest locking prevents duplicate blob pushes +// from racing against each other. +func (r *Resolver) CopyWithIngester(ctx context.Context, src *Source, dest *Location, ingester content.Ingester) error { ctx = remotes.WithMediaTypeKeyPrefix(ctx, "application/vnd.in-toto+json", "intoto") ctx = remotes.WithMediaTypeKeyPrefix(ctx, "application/vnd.oci.empty.v1+json", "empty") @@ -268,10 +281,6 @@ func (r *Resolver) Copy(ctx context.Context, src *Source, dest *Location) error if err != nil { return err } - ingester, err := r.ingesterForLocation(dest) - if err != nil { - return err - } referrers := &referrersProvider{base: referrersFunc(func(ctx context.Context, subject ocispecs.Descriptor) ([]ocispecs.Descriptor, error) { descs, err := r.FetchReferrers(ctx, src.Ref, subject.Digest) @@ -526,9 +535,12 @@ func dedupeDescriptors(descs []ocispecs.Descriptor) []ocispecs.Descriptor { return out } -func (r *Resolver) ingesterForLocation(loc *Location) (content.Ingester, error) { +// IngesterForLocation returns a content ingester for the given location. +// For registry locations a new pusher is created; for OCI layout locations +// the local content store is returned. +func (r *Resolver) IngesterForLocation(ctx context.Context, loc *Location) (content.Ingester, error) { if loc.IsRegistry() { - p, err := r.registryResolver().Pusher(context.TODO(), loc.Name()) + p, err := r.registryResolver().Pusher(ctx, loc.Name()) if err != nil { return nil, err }