Skip to content

Commit 1e54ca2

Browse files
authored
Merge pull request #3731 from tonistiigi/imagetools-push-parallelization
imagetools: share ingester across concurrent copies
2 parents de2c485 + e4f6e37 commit 1e54ca2

2 files changed

Lines changed: 28 additions & 8 deletions

File tree

commands/imagetools/create.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -225,14 +225,22 @@ func runCreate(ctx context.Context, dockerCli command.Cli, in createOptions, arg
225225
seed := repoTags[0]
226226
return progress.Wrap(fmt.Sprintf("pushing %s", repo), pw.Write, func(sub progress.SubLogger) error {
227227
ctx = withMediaTypeKeyPrefix(ctx)
228+
// Create a single shared ingester for all concurrent
229+
// copies to this repo. The pushingIngester's per-digest
230+
// locking prevents concurrent pushes of the same blob
231+
// from racing against each other on the registry.
232+
ingester, err := r.IngesterForLocation(ctx, seed)
233+
if err != nil {
234+
return err
235+
}
228236
eg2, _ := errgroup.WithContext(ctx)
229237
for _, desc := range manifests {
230238
eg2.Go(func() error {
231239
sub.Log(1, fmt.Appendf(nil, "copying %s from %s to %s\n", desc.Digest.String(), desc.Source.Ref.String(), repo))
232-
err := r.Copy(ctx, &imagetools.Source{
240+
err := r.CopyWithIngester(ctx, &imagetools.Source{
233241
Ref: desc.Source.Ref,
234242
Desc: desc.Descriptor,
235-
}, seed)
243+
}, seed, ingester)
236244
if err != nil {
237245
return errors.Wrapf(err, "copy %s from %s to %s", desc.Digest.String(), desc.Source.Ref.String(), seed.String())
238246
}

util/imagetools/create.go

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,19 @@ func (r *Resolver) Push(ctx context.Context, ref *Location, desc ocispecs.Descri
252252
}
253253

254254
func (r *Resolver) Copy(ctx context.Context, src *Source, dest *Location) error {
255+
ingester, err := r.IngesterForLocation(ctx, dest)
256+
if err != nil {
257+
return err
258+
}
259+
return r.CopyWithIngester(ctx, src, dest, ingester)
260+
}
261+
262+
// CopyWithIngester copies a source manifest and its referrers to the
263+
// destination using the provided ingester. Callers that issue multiple
264+
// concurrent copies to the same destination should share a single ingester
265+
// so that the underlying per-digest locking prevents duplicate blob pushes
266+
// from racing against each other.
267+
func (r *Resolver) CopyWithIngester(ctx context.Context, src *Source, dest *Location, ingester content.Ingester) error {
255268
ctx = remotes.WithMediaTypeKeyPrefix(ctx, "application/vnd.in-toto+json", "intoto")
256269
ctx = remotes.WithMediaTypeKeyPrefix(ctx, "application/vnd.oci.empty.v1+json", "empty")
257270

@@ -268,10 +281,6 @@ func (r *Resolver) Copy(ctx context.Context, src *Source, dest *Location) error
268281
if err != nil {
269282
return err
270283
}
271-
ingester, err := r.ingesterForLocation(dest)
272-
if err != nil {
273-
return err
274-
}
275284

276285
referrers := &referrersProvider{base: referrersFunc(func(ctx context.Context, subject ocispecs.Descriptor) ([]ocispecs.Descriptor, error) {
277286
descs, err := r.FetchReferrers(ctx, src.Ref, subject.Digest)
@@ -526,9 +535,12 @@ func dedupeDescriptors(descs []ocispecs.Descriptor) []ocispecs.Descriptor {
526535
return out
527536
}
528537

529-
func (r *Resolver) ingesterForLocation(loc *Location) (content.Ingester, error) {
538+
// IngesterForLocation returns a content ingester for the given location.
539+
// For registry locations a new pusher is created; for OCI layout locations
540+
// the local content store is returned.
541+
func (r *Resolver) IngesterForLocation(ctx context.Context, loc *Location) (content.Ingester, error) {
530542
if loc.IsRegistry() {
531-
p, err := r.registryResolver().Pusher(context.TODO(), loc.Name())
543+
p, err := r.registryResolver().Pusher(ctx, loc.Name())
532544
if err != nil {
533545
return nil, err
534546
}

0 commit comments

Comments
 (0)