Skip to content

Commit 23edb05

Browse files
authored
skip deduping running nested execs (dagger#10264)
* skip deduping running nested exec function calls De-duping currently running nested execs, such as module function calls, can break when: 1. Client A starts the function call 2. Client B makes the same call, gets deduped with the existing one 3. Client A cancels the call (i.e. ctrl-C's, CI job shuts down for whatever reason, etc.) 4. The call continues running, but now is trying to talk back to a session that doesn't exist anymore when dagger APIs are invoked Errors vary, but example from the new integ test that repros this: ``` testctx.go:197: 29 : │ [15.0s] | failed to return error: input: get or init client: initialize client: failed to load module: load moduleSource(refString: ".").asModule: Module!: load base: load: failed to stat local path: failed to stat path: failed to get requester session: session for "b9y05fidme5ahpym64tr8ymq5" not found testctx.go:197: 29 : │ [15.0s] | testctx.go:197: 29 : │ [15.0s] | original error: input: get or init client: initialize client: failed to load module: load moduleSource(refString: ".").asModule: Module!: load base: load: failed to stat local path: failed to stat path: failed to get requester session: session for "b9y05fidme5ahpym64tr8ymq5" not found testctx.go:197: 29 : │ [15.0s] | Stderr: testctx.go:197: 29 : │ [15.0s] | time=2025-04-24T20:39:19.608Z level=ERROR msg="failed to emit telemetry" error="failed to upload metrics: failed to send metrics to http://127.0.0.1:38113/v1/metrics: 400 Bad Request" ``` Fixing this by adding support for skipping the dedupe behavior for selected calls. Signed-off-by: Erik Sipsma <erik@sipsma.dev> * use optional buildkit edge merging Signed-off-by: Erik Sipsma <erik@sipsma.dev> * cleanup zillions of return vals Signed-off-by: Erik Sipsma <erik@sipsma.dev> * add extra unit test coverage for skip dedupe Signed-off-by: Erik Sipsma <erik@sipsma.dev> * add changelog Signed-off-by: Erik Sipsma <erik@sipsma.dev> * de-dupe all dagql calls for now There were occasional failures in the newly added integ test from non-nested calls. This turned out to be because buildkit refs were attempted to be used from the session that closed, which makes sense since those buildkit refs get released when the session closes. This is fairly tricky to address while buildkit refs are still being used in our codebase, so this disables all de-duping on the dagql level for now. The ability to de-dupe is left in (and still used in other codepaths that use that code, namely filesync), so it can be re-enabled in the future. So far there doesn't seem to be a huge difference in test runtime, so the performance impact is likely not that great. The dagql cache is still enabled so we only lose de-duping of exactly overlapping concurrent calls. Signed-off-by: Erik Sipsma <erik@sipsma.dev> --------- Signed-off-by: Erik Sipsma <erik@sipsma.dev>
1 parent 63d9e00 commit 23edb05

11 files changed

Lines changed: 235 additions & 37 deletions

File tree

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
kind: Fixed
2+
body: Fix "failed to return error" and "failed to emit telemetry" errors when two
3+
identical functions execute at the same time and one client cancels the request.
4+
time: 2025-05-01T18:44:24.032929067-07:00
5+
custom:
6+
Author: sipsma
7+
PR: "10264"

core/container_exec.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,9 @@ func (container *Container) WithExec(ctx context.Context, opts ContainerExecOpts
308308
llb.Platform(platform.Spec()),
309309
execMDOpt,
310310
}
311+
if opts.ExperimentalPrivilegedNesting {
312+
marshalOpts = append(marshalOpts, llb.SkipEdgeMerge)
313+
}
311314
execDef, err := execSt.Root().Marshal(ctx, marshalOpts...)
312315
if err != nil {
313316
return nil, fmt.Errorf("marshal root: %w", err)

core/integration/cross_session_test.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/moby/buildkit/identity"
1919
"github.com/stretchr/testify/require"
2020
fs "github.com/tonistiigi/fsutil/copy"
21+
"golang.org/x/sync/errgroup"
2122
)
2223

2324
// TODO: add more tests for longer chains of cache hits that then diverge (i.e. constructor + some function cache hit, then diverge)
@@ -1518,3 +1519,71 @@ func (*Test) Fn2(ctx context.Context, secret *dagger.Secret) *dagger.Container {
15181519
}
15191520
})
15201521
}
1522+
1523+
func (ModuleSuite) TestCrossSessionDedupeOfNestedExec(ctx context.Context, t *testctx.T) {
1524+
callMod := func(c *dagger.Client) error {
1525+
_, err := goGitBase(t, c).
1526+
WithWorkdir("/work").
1527+
WithEnvVariable("CACHEBUSTER", identity.NewID()).
1528+
With(daggerExec("init", "--source=.", "--name=test", "--sdk=go")).
1529+
WithNewFile("main.go", `package main
1530+
1531+
import (
1532+
"context"
1533+
"fmt"
1534+
"strconv"
1535+
"time"
1536+
)
1537+
1538+
type Test struct {}
1539+
1540+
func (Test) Fn(ctx context.Context) error {
1541+
ctr, err := dag.Container().
1542+
From("alpine:3.20").
1543+
WithExec([]string{"sh", "-c", "echo "+strconv.Itoa(int(time.Now().UnixNano()))+"> /foo.txt"}).
1544+
Sync(ctx)
1545+
if err != nil {
1546+
return err
1547+
}
1548+
1549+
fmt.Println("sleeping", time.Now().UnixNano())
1550+
time.Sleep(20 * time.Second)
1551+
fmt.Println("awoken", time.Now().UnixNano())
1552+
1553+
ctr, err = ctr.WithExec([]string{"true"}).Sync(ctx)
1554+
return err
1555+
}
1556+
`,
1557+
).
1558+
With(daggerCall("fn")).
1559+
Sync(ctx)
1560+
return err
1561+
}
1562+
1563+
c1 := connect(ctx, t)
1564+
c2 := connect(ctx, t)
1565+
1566+
var eg errgroup.Group
1567+
1568+
eg.Go(func() error {
1569+
time.Sleep(10 * time.Second)
1570+
t.Log("closing c1")
1571+
c1.Close()
1572+
t.Log("closed c1")
1573+
return nil
1574+
})
1575+
1576+
eg.Go(func() error {
1577+
callMod(c1)
1578+
t.Log("c1 call complete")
1579+
return nil
1580+
})
1581+
1582+
eg.Go(func() error {
1583+
time.Sleep(5 * time.Second)
1584+
return callMod(c2)
1585+
})
1586+
1587+
err := eg.Wait()
1588+
require.NoError(t, err)
1589+
}

core/postcall.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ func ResourceTransferPostCall(
143143
// The longer term fix for this type of issue is to have more dagql awareness of edges between
144144
// cache results such that a function call return value result inherently results in any referenced
145145
// secrets also staying in cache.
146-
_, err = destDag.Cache.GetOrInitializeWithCallbacks(ctx, secret.inst.ID().Digest(),
146+
_, err = destDag.Cache.GetOrInitializeWithCallbacks(ctx, secret.inst.ID().Digest(), true,
147147
func(ctx context.Context) (*dagql.CacheValWithCallbacks, error) {
148148
return &dagql.CacheValWithCallbacks{
149149
Value: secret.inst,

dagql/objects.go

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -359,30 +359,36 @@ func NoopDone(res Typed, cached bool, rerr error) {}
359359

360360
// Select calls the field on the instance specified by the selector
361361
func (r Instance[T]) Select(ctx context.Context, s *Server, sel Selector) (Typed, *call.ID, error) {
362-
inputArgs, newID, doNotCache, err := r.preselect(ctx, s, sel)
362+
preselectResult, err := r.preselect(ctx, s, sel)
363363
if err != nil {
364364
return nil, nil, err
365365
}
366-
return r.call(ctx, s, newID, inputArgs, doNotCache)
366+
return r.call(ctx, s, preselectResult.newID, preselectResult.inputArgs, preselectResult.doNotCache)
367367
}
368368

369369
func (r Instance[T]) ReturnType(ctx context.Context, s *Server, sel Selector) (Typed, *call.ID, error) {
370-
_, newID, _, err := r.preselect(ctx, s, sel)
370+
preselectResult, err := r.preselect(ctx, s, sel)
371371
if err != nil {
372372
return nil, nil, err
373373
}
374-
returnType, err := r.returnType(newID)
374+
returnType, err := r.returnType(preselectResult.newID)
375375
if err != nil {
376376
return nil, nil, err
377377
}
378-
return returnType, newID, nil
378+
return returnType, preselectResult.newID, nil
379+
}
380+
381+
type preselectResult struct {
382+
inputArgs map[string]Input
383+
newID *call.ID
384+
doNotCache bool
379385
}
380386

381-
func (r Instance[T]) preselect(ctx context.Context, s *Server, sel Selector) (map[string]Input, *call.ID, bool, error) {
387+
func (r Instance[T]) preselect(ctx context.Context, s *Server, sel Selector) (*preselectResult, error) {
382388
view := sel.View
383389
field, ok := r.Class.Field(sel.Field, view)
384390
if !ok {
385-
return nil, nil, false, fmt.Errorf("Select: %s has no such field: %q", r.Class.TypeName(), sel.Field)
391+
return nil, fmt.Errorf("Select: %s has no such field: %q", r.Class.TypeName(), sel.Field)
386392
}
387393
if field.Spec.ViewFilter == nil {
388394
// fields in the global view shouldn't attach the current view to the
@@ -417,7 +423,7 @@ func (r Instance[T]) preselect(ctx context.Context, s *Server, sel Selector) (ma
417423

418424
case argSpec.Type.Type().NonNull:
419425
// error out if the arg is missing but required
420-
return nil, nil, false, fmt.Errorf("missing required argument: %q", argSpec.Name)
426+
return nil, fmt.Errorf("missing required argument: %q", argSpec.Name)
421427
}
422428
}
423429
// TODO: it's better DX if it matches schema order
@@ -450,7 +456,7 @@ func (r Instance[T]) preselect(ctx context.Context, s *Server, sel Selector) (ma
450456
Digest: origDgst,
451457
})
452458
if err != nil {
453-
return nil, nil, false, fmt.Errorf("failed to compute cache key for %s.%s: %w", r.Type().Name(), sel.Field, err)
459+
return nil, fmt.Errorf("failed to compute cache key for %s.%s: %w", r.Type().Name(), sel.Field, err)
454460
}
455461

456462
if len(cacheCfg.UpdatedArgs) > 0 {
@@ -488,7 +494,11 @@ func (r Instance[T]) preselect(ctx context.Context, s *Server, sel Selector) (ma
488494
}
489495
}
490496

491-
return inputArgs, newID, doNotCache, nil
497+
return &preselectResult{
498+
inputArgs: inputArgs,
499+
newID: newID,
500+
doNotCache: doNotCache,
501+
}, nil
492502
}
493503

494504
// Call calls the field on the instance specified by the ID.
@@ -554,7 +564,7 @@ func (r Instance[T]) call(
554564
return s.telemetry(ctx, r, newID)
555565
}))
556566
}
557-
res, err := s.Cache.GetOrInitializeWithCallbacks(ctx, callCacheKey, func(ctx context.Context) (*CacheValWithCallbacks, error) {
567+
res, err := s.Cache.GetOrInitializeWithCallbacks(ctx, callCacheKey, true, func(ctx context.Context) (*CacheValWithCallbacks, error) {
558568
valWithCallbacks, err := r.Class.Call(ctx, r, newID.Field(), View(newID.View()), inputArgs)
559569
if err != nil {
560570
return nil, err

dagql/session_cache.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ func (c *SessionCache) GetOrInitialize(
7676
fn func(context.Context) (CacheValueType, error),
7777
opts ...CacheCallOpt,
7878
) (CacheResult, error) {
79-
return c.GetOrInitializeWithCallbacks(ctx, key, func(ctx context.Context) (*CacheValWithCallbacks, error) {
79+
return c.GetOrInitializeWithCallbacks(ctx, key, false, func(ctx context.Context) (*CacheValWithCallbacks, error) {
8080
val, err := fn(ctx)
8181
if err != nil {
8282
return nil, err
@@ -88,6 +88,7 @@ func (c *SessionCache) GetOrInitialize(
8888
func (c *SessionCache) GetOrInitializeWithCallbacks(
8989
ctx context.Context,
9090
key CacheKeyType,
91+
skipDedupe bool,
9192
fn func(context.Context) (*CacheValWithCallbacks, error),
9293
opts ...CacheCallOpt,
9394
) (res CacheResult, err error) {
@@ -114,7 +115,7 @@ func (c *SessionCache) GetOrInitializeWithCallbacks(
114115
ctx = telemetryCtx
115116
}
116117

117-
res, err = c.cache.GetOrInitializeWithCallbacks(ctx, key, fn)
118+
res, err = c.cache.GetOrInitializeWithCallbacks(ctx, key, skipDedupe, fn)
118119
if err != nil {
119120
return nil, err
120121
}

engine/buildkit/ref.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"os"
88
"path"
9+
"runtime/debug"
910
"strconv"
1011
"strings"
1112

@@ -240,7 +241,9 @@ func (r *ref) Result(ctx context.Context) (bksolver.CachedResult, error) {
240241
res, err := r.resultProxy.Result(ctx)
241242
if err != nil {
242243
// writing log w/ %+v so that we can see stack traces embedded in err by buildkit's usage of pkg/errors
243-
bklog.G(ctx).Errorf("ref evaluate error: %+v", err)
244+
bklog.G(ctx).
245+
WithField("caller stack", string(debug.Stack())).
246+
Errorf("ref evaluate error: %+v", err)
244247
err = includeBuildkitContextCancelledLine(err)
245248
return nil, WrapError(ctx, err, r.c)
246249
}

engine/cache/cache.go

Lines changed: 43 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ type Cache[K comparable, V any] interface {
2626
GetOrInitializeWithCallbacks(
2727
context.Context,
2828
K,
29+
bool,
2930
func(context.Context) (*ValueWithCallbacks[V], error),
3031
) (Result[K, V], error)
3132

@@ -60,8 +61,9 @@ func NewCache[K comparable, V any]() Cache[K, V] {
6061
}
6162

6263
type cache[K comparable, V any] struct {
63-
mu sync.Mutex
64-
calls map[K]*result[K, V]
64+
mu sync.Mutex
65+
ongoingCalls map[K]*result[K, V]
66+
completedCalls map[K]*result[K, V]
6567
}
6668

6769
var _ Cache[int, int] = &cache[int, int]{}
@@ -107,7 +109,7 @@ func (c *cache[K, V]) Size() int {
107109
c.mu.Lock()
108110
defer c.mu.Unlock()
109111

110-
return len(c.calls)
112+
return len(c.ongoingCalls) + len(c.completedCalls)
111113
}
112114

113115
func (c *cache[K, V]) GetOrInitializeValue(
@@ -125,7 +127,7 @@ func (c *cache[K, V]) GetOrInitialize(
125127
key K,
126128
fn func(context.Context) (V, error),
127129
) (Result[K, V], error) {
128-
return c.GetOrInitializeWithCallbacks(ctx, key, func(ctx context.Context) (*ValueWithCallbacks[V], error) {
130+
return c.GetOrInitializeWithCallbacks(ctx, key, false, func(ctx context.Context) (*ValueWithCallbacks[V], error) {
129131
val, err := fn(ctx)
130132
if err != nil {
131133
return nil, err
@@ -137,6 +139,7 @@ func (c *cache[K, V]) GetOrInitialize(
137139
func (c *cache[K, V]) GetOrInitializeWithCallbacks(
138140
ctx context.Context,
139141
key K,
142+
skipDedupe bool,
140143
fn func(context.Context) (*ValueWithCallbacks[V], error),
141144
) (Result[K, V], error) {
142145
var zeroKey K
@@ -160,15 +163,29 @@ func (c *cache[K, V]) GetOrInitializeWithCallbacks(
160163
}
161164

162165
c.mu.Lock()
163-
if c.calls == nil {
164-
c.calls = make(map[K]*result[K, V])
166+
if c.ongoingCalls == nil {
167+
c.ongoingCalls = make(map[K]*result[K, V])
168+
}
169+
if c.completedCalls == nil {
170+
c.completedCalls = make(map[K]*result[K, V])
165171
}
166172

167-
if res, ok := c.calls[key]; ok {
168-
// already an ongoing call
169-
res.waiters++
173+
if res, ok := c.completedCalls[key]; ok {
174+
res.refCount++
170175
c.mu.Unlock()
171-
return c.wait(ctx, key, res)
176+
return &perCallResult[K, V]{
177+
result: res,
178+
hitCache: true,
179+
}, nil
180+
}
181+
182+
if !skipDedupe {
183+
if res, ok := c.ongoingCalls[key]; ok {
184+
// already an ongoing call
185+
res.waiters++
186+
c.mu.Unlock()
187+
return c.wait(ctx, key, res)
188+
}
172189
}
173190

174191
// make a new call with ctx that's only canceled when all caller contexts are canceled
@@ -183,7 +200,11 @@ func (c *cache[K, V]) GetOrInitializeWithCallbacks(
183200
cancel: cancel,
184201
waiters: 1,
185202
}
186-
c.calls[key] = res
203+
204+
if !skipDedupe {
205+
c.ongoingCalls[key] = res
206+
}
207+
187208
go func() {
188209
defer close(res.waitCh)
189210
valWithCallbacks, err := fn(callCtx)
@@ -237,6 +258,13 @@ func (c *cache[K, V]) wait(ctx context.Context, key K, res *result[K, V]) (*perC
237258
}
238259

239260
if err == nil {
261+
delete(c.ongoingCalls, key)
262+
if existingRes, ok := c.completedCalls[key]; ok {
263+
res = existingRes
264+
} else {
265+
c.completedCalls[key] = res
266+
}
267+
240268
res.refCount++
241269
return &perCallResult[K, V]{
242270
result: res,
@@ -246,7 +274,8 @@ func (c *cache[K, V]) wait(ctx context.Context, key K, res *result[K, V]) (*perC
246274

247275
if res.refCount == 0 {
248276
// error happened and no refs left, delete it now
249-
delete(c.calls, key)
277+
delete(c.ongoingCalls, key)
278+
delete(c.completedCalls, key)
250279
}
251280
return nil, err
252281
}
@@ -270,7 +299,8 @@ func (res *result[K, V]) Release(ctx context.Context) error {
270299
var onRelease OnReleaseFunc
271300
if res.refCount == 0 && res.waiters == 0 {
272301
// no refs left and no one waiting on it, delete from cache
273-
delete(res.cache.calls, res.key)
302+
delete(res.cache.ongoingCalls, res.key)
303+
delete(res.cache.completedCalls, res.key)
274304
onRelease = res.onRelease
275305
}
276306
res.cache.mu.Unlock()

0 commit comments

Comments
 (0)