Skip to content

Commit c3b4733

Browse files
feat(git): serve fresh snapshots via delta bundles (#197)
## Summary Serves fresh git snapshots by supplementing cached S3 snapshots with small delta bundles, avoiding expensive full snapshot regeneration on every request. ### Problem For busy repos, the cached snapshot becomes stale quickly. Previously, detecting staleness meant regenerating the entire snapshot from scratch — effectively invalidating the cache on every request. The regenerated snapshot would itself be stale by the next request. ### Solution When the cached snapshot HEAD differs from the local mirror HEAD, cachew: 1. Streams the cached snapshot as-is (`application/zstd`) 2. Sets an `X-Cachew-Bundle-Url` response header pointing to a separate `/snapshot.bundle` endpoint 3. The client fetches the bundle **in parallel** with snapshot extraction 4. The bundle is a small git bundle covering commits between the snapshot HEAD and the mirror current HEAD Delta bundles are cached in S3 (2h TTL) so any cachew pod can serve them, eliminating cross-pod 404s. ### Key details - **Wire format**: Snapshot response is always plain `application/zstd`. Bundle is served at a separate URL as `application/x-git-bundle`. Fully backward compatible — old clients ignore the header. - **Freshness**: Bounded by the mirror fetch interval (15m), but in practice much fresher since each snapshot request triggers an async mirror refresh. - **Bundle size**: Bounded by the S3 snapshot regeneration interval (1h), keeping bundles small. - **S3 caching**: Bundles are proactively generated and cached in S3 during snapshot serving, and also cached on-demand at the bundle endpoint. - **No read locks** on `git bundle create` or `git rev-parse` — git handles its own file-level locking, consistent with `serveFromBackend`. ### Deploy order Cachew first (backward compatible), then blox. --------- Co-authored-by: Amp <amp@ampcode.com>
1 parent 67f6395 commit c3b4733

File tree

4 files changed

+194
-41
lines changed

4 files changed

+194
-41
lines changed

internal/cache/s3.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"net/http"
1212
"os"
1313
"runtime"
14+
"strconv"
1415
"time"
1516

1617
"github.com/alecthomas/errors"
@@ -244,6 +245,8 @@ func (s *S3) Open(ctx context.Context, key Key) (io.ReadCloser, http.Header, err
244245
headers.Set("Last-Modified", objInfo.LastModified.UTC().Format(http.TimeFormat))
245246
}
246247

248+
headers.Set("Content-Length", strconv.FormatInt(objInfo.Size, 10))
249+
247250
// Reset expiration time to implement LRU (same as disk cache).
248251
// Only refresh when remaining TTL is below 50% of max to avoid a
249252
// server-side copy on every read.

internal/snapshot/snapshot.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ import (
2424
// The operation is fully streaming - no temporary files are created.
2525
// Exclude patterns use tar's --exclude syntax.
2626
// threads controls zstd parallelism; 0 uses all available CPU cores.
27-
func Create(ctx context.Context, remote cache.Cache, key cache.Key, directory string, ttl time.Duration, excludePatterns []string, threads int) error {
27+
// Any extra headers are merged into the cache metadata alongside the default
28+
// Content-Type and Content-Disposition headers.
29+
func Create(ctx context.Context, remote cache.Cache, key cache.Key, directory string, ttl time.Duration, excludePatterns []string, threads int, extraHeaders ...http.Header) error {
2830
if threads <= 0 {
2931
threads = runtime.NumCPU()
3032
}
@@ -39,6 +41,13 @@ func Create(ctx context.Context, remote cache.Cache, key cache.Key, directory st
3941
headers := make(http.Header)
4042
headers.Set("Content-Type", "application/zstd")
4143
headers.Set("Content-Disposition", fmt.Sprintf("attachment; filename=%q", filepath.Base(directory)+".tar.zst"))
44+
for _, eh := range extraHeaders {
45+
for k, vals := range eh {
46+
for _, v := range vals {
47+
headers.Set(k, v)
48+
}
49+
}
50+
}
4251

4352
wc, err := remote.Create(ctx, key, headers, ttl)
4453
if err != nil {

internal/strategy/git/git.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,11 @@ func (s *Strategy) handleRequest(w http.ResponseWriter, r *http.Request) {
219219
return
220220
}
221221

222+
if strings.HasSuffix(pathValue, "/snapshot.bundle") {
223+
s.handleBundleRequest(w, r, host, pathValue)
224+
return
225+
}
226+
222227
service := r.URL.Query().Get("service")
223228
isReceivePack := service == "git-receive-pack" || strings.HasSuffix(pathValue, "/git-receive-pack")
224229

internal/strategy/git/snapshot.go

Lines changed: 176 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@ import (
88
"os"
99
"os/exec"
1010
"path/filepath"
11+
"strconv"
1112
"strings"
1213
"sync"
14+
"time"
1315

1416
"github.com/alecthomas/errors"
1517

@@ -35,6 +37,10 @@ func mirrorSnapshotCacheKey(upstreamURL string) cache.Key {
3537
return cache.NewKey(upstreamURL + ".mirror-snapshot")
3638
}
3739

40+
func bundleCacheKey(upstreamURL, baseCommit string) cache.Key {
41+
return cache.NewKey(upstreamURL + ".bundle." + baseCommit)
42+
}
43+
3844
// remoteURLForSnapshot returns the URL to embed as remote.origin.url in snapshots.
3945
// When a server URL is configured, it returns the cachew URL for the repo so that
4046
// git pull goes through cachew. Otherwise it falls back to the upstream URL.
@@ -101,10 +107,19 @@ func (s *Strategy) generateAndUploadSnapshot(ctx context.Context, repo *gitclone
101107
return err
102108
}
103109

110+
// Capture the snapshot's HEAD so we can later build a delta bundle between
111+
// the cached snapshot and the current mirror state.
112+
headSHA, err := revParse(ctx, snapshotDir, "HEAD")
113+
if err != nil {
114+
_ = os.RemoveAll(snapshotDir) //nolint:gosec
115+
return errors.Wrap(err, "rev-parse HEAD for snapshot")
116+
}
117+
extraHeaders := http.Header{}
118+
extraHeaders.Set("X-Cachew-Snapshot-Commit", headSHA)
119+
104120
cacheKey := snapshotCacheKey(upstream)
105-
excludePatterns := []string{"./.git/*.lock"}
106121

107-
err = snapshot.Create(ctx, s.cache, cacheKey, snapshotDir, 0, excludePatterns, s.config.ZstdThreads)
122+
err = snapshot.Create(ctx, s.cache, cacheKey, snapshotDir, 0, nil, s.config.ZstdThreads, extraHeaders)
108123

109124
// Always clean up the snapshot working directory.
110125
if rmErr := os.RemoveAll(snapshotDir); rmErr != nil { //nolint:gosec // snapshotDir is derived from controlled mirrorRoot + upstream URL
@@ -176,8 +191,7 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request,
176191
repoPath := ExtractRepoPath(strings.TrimSuffix(pathValue, "/snapshot.tar.zst"))
177192
upstreamURL := "https://" + host + "/" + repoPath
178193

179-
// Ensure the local mirror is ready and up to date before considering any
180-
// cached snapshot, so we never serve stale data to workstations.
194+
// Ensure the local mirror is ready before considering any cached snapshot.
181195
repo, repoErr := s.cloneManager.GetOrCreate(ctx, upstreamURL)
182196
if repoErr != nil {
183197
logger.ErrorContext(ctx, "Failed to get or create clone", "upstream", upstreamURL, "error", repoErr)
@@ -189,23 +203,7 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request,
189203
http.Error(w, "Repository unavailable", http.StatusServiceUnavailable)
190204
return
191205
}
192-
// Fetch in the background to keep the mirror fresh for subsequent
193-
// git-fetch/git-pull operations through cachew, but don't block
194-
// snapshot serving on it.
195-
refsStale, err := s.checkRefsStale(ctx, repo)
196-
if err != nil {
197-
logger.WarnContext(ctx, "Failed to check upstream refs", "upstream", upstreamURL, "error", err)
198-
}
199-
if refsStale {
200-
logger.InfoContext(ctx, "Refs stale for snapshot request, fetching in background", "upstream", upstreamURL)
201-
go func() {
202-
bgCtx := context.WithoutCancel(ctx)
203-
if err := repo.Fetch(bgCtx); err != nil {
204-
logger.WarnContext(bgCtx, "Background fetch for snapshot failed", "upstream", upstreamURL, "error", err)
205-
}
206-
}()
207-
}
208-
206+
s.maybeBackgroundFetch(repo)
209207
cacheKey := snapshotCacheKey(upstreamURL)
210208

211209
reader, headers, err := s.cache.Open(ctx, cacheKey)
@@ -215,30 +213,170 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request,
215213
return
216214
}
217215

218-
// Always serve a cached snapshot if one exists. The workstation will
219-
// git-fetch through cachew after extracting the snapshot, which picks
220-
// up any commits that arrived since the snapshot was built. Regeneration
221-
// happens in the background via the periodic snapshot job and the
222-
// background upload in writeSnapshotSpool, keeping the cached snapshot
223-
// reasonably fresh without blocking requests.
224-
if reader != nil {
225-
logger.DebugContext(ctx, "Serving cached snapshot", "upstream", upstreamURL)
226-
}
227-
228216
if reader == nil {
229217
s.serveSnapshotWithSpool(w, r, repo, upstreamURL)
230218
return
231219
}
232220
defer reader.Close()
233221

234-
for key, values := range headers {
235-
for _, value := range values {
236-
w.Header().Add(key, value)
222+
if err := s.serveSnapshotWithBundle(ctx, w, reader, headers, repo, upstreamURL); err != nil {
223+
logger.ErrorContext(ctx, "Failed to serve snapshot", "upstream", upstreamURL, "error", err)
224+
}
225+
}
226+
227+
func (s *Strategy) handleBundleRequest(w http.ResponseWriter, r *http.Request, host, pathValue string) {
228+
ctx := r.Context()
229+
logger := logging.FromContext(ctx)
230+
231+
repoPath := ExtractRepoPath(strings.TrimSuffix(pathValue, "/snapshot.bundle"))
232+
upstreamURL := "https://" + host + "/" + repoPath
233+
234+
base := r.URL.Query().Get("base")
235+
if base == "" {
236+
http.Error(w, "missing base query parameter", http.StatusBadRequest)
237+
return
238+
}
239+
240+
bKey := bundleCacheKey(upstreamURL, base)
241+
242+
// Try serving from cache first — works on any pod.
243+
if reader, _, err := s.cache.Open(ctx, bKey); err == nil && reader != nil {
244+
defer reader.Close()
245+
w.Header().Set("Content-Type", "application/x-git-bundle")
246+
if _, err := io.Copy(w, reader); err != nil {
247+
logger.WarnContext(ctx, "Failed to stream cached bundle", "upstream", upstreamURL, "error", err)
237248
}
249+
return
250+
}
251+
252+
// Fallback: generate from local mirror.
253+
repo, repoErr := s.cloneManager.GetOrCreate(ctx, upstreamURL)
254+
if repoErr != nil {
255+
logger.ErrorContext(ctx, "Failed to get or create clone", "upstream", upstreamURL, "error", repoErr)
256+
http.Error(w, "Internal server error", http.StatusInternalServerError)
257+
return
258+
}
259+
if cloneErr := s.ensureCloneReady(ctx, repo); cloneErr != nil {
260+
logger.ErrorContext(ctx, "Clone unavailable for bundle", "upstream", upstreamURL, "error", cloneErr)
261+
http.Error(w, "Repository unavailable", http.StatusServiceUnavailable)
262+
return
238263
}
239-
if _, err = io.Copy(w, reader); err != nil {
240-
logger.ErrorContext(ctx, "Failed to stream snapshot", "upstream", upstreamURL, "error", err)
264+
265+
bundleData, err := s.createBundle(ctx, repo, base)
266+
if err != nil {
267+
logger.WarnContext(ctx, "Failed to create bundle", "upstream", upstreamURL, "base", base, "error", err)
268+
http.Error(w, "Bundle not available", http.StatusNotFound)
269+
return
270+
}
271+
272+
// Cache for future requests from any pod.
273+
s.cacheBundleAsync(ctx, bKey, bundleData)
274+
275+
w.Header().Set("Content-Type", "application/x-git-bundle")
276+
w.Header().Set("Content-Length", strconv.Itoa(len(bundleData)))
277+
if _, err := w.Write(bundleData); err != nil { //nolint:gosec // bundleData is a git bundle generated from a trusted local mirror
278+
logger.WarnContext(ctx, "Failed to write bundle response", "upstream", upstreamURL, "error", err)
279+
}
280+
}
281+
282+
func (s *Strategy) serveSnapshotWithBundle(ctx context.Context, w http.ResponseWriter, reader io.ReadCloser, headers http.Header, repo *gitclone.Repository, upstreamURL string) error {
283+
snapshotCommit := headers.Get("X-Cachew-Snapshot-Commit")
284+
mirrorHead := s.getMirrorHead(ctx, repo)
285+
286+
if snapshotCommit != "" && mirrorHead != "" && snapshotCommit != mirrorHead {
287+
repoPath, err := gitclone.RepoPathFromURL(upstreamURL)
288+
if err == nil {
289+
bundleURL := fmt.Sprintf("/git/%s/snapshot.bundle?base=%s", repoPath, snapshotCommit)
290+
w.Header().Set("X-Cachew-Bundle-Url", bundleURL)
291+
}
292+
293+
// Proactively generate and cache the bundle so any pod can serve it.
294+
go func() {
295+
bgCtx := context.WithoutCancel(ctx)
296+
logger := logging.FromContext(bgCtx)
297+
bundleData, err := s.createBundle(bgCtx, repo, snapshotCommit)
298+
if err != nil {
299+
logger.WarnContext(bgCtx, "Failed to pre-generate bundle", "upstream", upstreamURL, "error", err)
300+
return
301+
}
302+
s.cacheBundleSync(bgCtx, bundleCacheKey(upstreamURL, snapshotCommit), bundleData)
303+
}()
304+
}
305+
306+
w.Header().Set("Content-Type", "application/zstd")
307+
_, err := io.Copy(w, reader)
308+
return errors.Wrap(err, "stream snapshot")
309+
}
310+
311+
const bundleCacheTTL = 2 * time.Hour
312+
313+
func (s *Strategy) cacheBundleAsync(ctx context.Context, key cache.Key, data []byte) {
314+
go func() {
315+
s.cacheBundleSync(context.WithoutCancel(ctx), key, data)
316+
}()
317+
}
318+
319+
func (s *Strategy) cacheBundleSync(ctx context.Context, key cache.Key, data []byte) {
320+
logger := logging.FromContext(ctx)
321+
headers := http.Header{"Content-Type": {"application/x-git-bundle"}}
322+
wc, err := s.cache.Create(ctx, key, headers, bundleCacheTTL)
323+
if err != nil {
324+
logger.WarnContext(ctx, "Failed to cache bundle", "error", err)
325+
return
326+
}
327+
if _, err := wc.Write(data); err != nil {
328+
logger.WarnContext(ctx, "Failed to write bundle to cache", "error", err)
329+
_ = wc.Close()
330+
return
331+
}
332+
if err := wc.Close(); err != nil {
333+
logger.WarnContext(ctx, "Failed to close bundle cache writer", "error", err)
334+
}
335+
}
336+
337+
func revParse(ctx context.Context, repoDir, ref string) (string, error) {
338+
cmd := exec.CommandContext(ctx, "git", "-C", repoDir, "rev-parse", ref) // #nosec G204 G702
339+
output, err := cmd.Output()
340+
if err != nil {
341+
return "", errors.Wrapf(err, "git rev-parse %s", ref)
342+
}
343+
return strings.TrimSpace(string(output)), nil
344+
}
345+
346+
func (s *Strategy) getMirrorHead(ctx context.Context, repo *gitclone.Repository) string {
347+
head, _ := revParse(ctx, repo.Path(), "HEAD") //nolint:errcheck // best-effort; empty string signals failure to callers
348+
return head
349+
}
350+
351+
func (s *Strategy) createBundle(ctx context.Context, repo *gitclone.Repository, baseCommit string) ([]byte, error) {
352+
// No read lock needed: git bundle create reads objects through git's own
353+
// file-level locking, safe to run concurrently with fetches.
354+
headRef := "HEAD"
355+
if out, err := exec.CommandContext(ctx, "git", "-C", repo.Path(), "symbolic-ref", "HEAD").Output(); err == nil { // #nosec G204 G702
356+
headRef = strings.TrimSpace(string(out))
357+
}
358+
359+
tmpFile, err := os.CreateTemp("", "cachew-bundle-*.bundle")
360+
if err != nil {
361+
return nil, errors.Wrap(err, "create bundle temp file")
362+
}
363+
bundlePath := tmpFile.Name()
364+
defer os.Remove(bundlePath) //nolint:errcheck
365+
if err := tmpFile.Close(); err != nil {
366+
return nil, errors.Wrap(err, "close bundle temp file")
367+
}
368+
369+
cmd := exec.CommandContext(ctx, "git", "-C", repo.Path(), "bundle", "create", //nolint:gosec // baseCommit is a SHA string from rev-parse
370+
bundlePath, headRef, "^"+baseCommit)
371+
if output, err := cmd.CombinedOutput(); err != nil {
372+
return nil, errors.Wrapf(err, "git bundle create: %s", string(output))
373+
}
374+
375+
data, err := os.ReadFile(bundlePath) //nolint:gosec // bundlePath is a temp file we created
376+
if err != nil {
377+
return nil, errors.Wrap(err, "read bundle file")
241378
}
379+
return data, nil
242380
}
243381

244382
// serveSnapshotWithSpool handles snapshot cache misses using the spool pattern.
@@ -303,8 +441,7 @@ func (s *Strategy) streamSnapshotDirect(w http.ResponseWriter, r *http.Request,
303441
w.Header().Set("Content-Type", "application/zstd")
304442
w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%q", filepath.Base(repoDir)+".tar.zst"))
305443

306-
excludePatterns := []string{"./.git/*.lock"}
307-
if err := snapshot.StreamTo(ctx, w, repoDir, excludePatterns, s.config.ZstdThreads); err != nil {
444+
if err := snapshot.StreamTo(ctx, w, repoDir, nil, s.config.ZstdThreads); err != nil {
308445
logger.ErrorContext(ctx, "Failed to stream snapshot to client", "upstream", upstreamURL, "error", err)
309446
}
310447
}
@@ -369,8 +506,7 @@ func (s *Strategy) writeSnapshotSpool(w http.ResponseWriter, r *http.Request, re
369506
w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%q", filepath.Base(repoDir)+".tar.zst"))
370507

371508
tw := NewSpoolTeeWriter(w, spool)
372-
excludePatterns := []string{"./.git/*.lock"}
373-
if err := snapshot.StreamTo(ctx, tw, repoDir, excludePatterns, s.config.ZstdThreads); err != nil {
509+
if err := snapshot.StreamTo(ctx, tw, repoDir, nil, s.config.ZstdThreads); err != nil {
374510
logger.ErrorContext(ctx, "Failed to stream snapshot to client", "upstream", upstreamURL, "error", err)
375511
spool.MarkError(err)
376512
} else {

0 commit comments

Comments
 (0)