Skip to content

Commit 9822a42

Browse files
committed
feat(git): add observability for snapshot restore and serve paths
Adds OTel metrics and structured logging to track how repositories are populated on cachew pods (local/mirror/upstream) and how snapshots are served to workstations (cache/live), including duration, bytes, and throughput.
1 parent f554aa3 commit 9822a42

8 files changed

Lines changed: 359 additions & 29 deletions

File tree

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -8,6 +8,8 @@ block-test.hcl
88
.env
99

1010
.claude/
11+
.idea/
12+
state-*/
1113

1214
# Binaries
1315
/cachew

cmd/cachew/main.go

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -200,9 +200,11 @@ type RestoreCmd struct {
200200
func (c *RestoreCmd) Run(ctx context.Context, cache cache.Cache) error {
201201
fmt.Fprintf(os.Stderr, "Restoring to %s...\n", c.Directory) //nolint:forbidigo
202202
namespacedCache := cache.Namespace(c.Namespace)
203-
if err := snapshot.Restore(ctx, namespacedCache, c.Key.Key(), c.Directory, c.ZstdThreads); err != nil {
203+
result, err := snapshot.Restore(ctx, namespacedCache, c.Key.Key(), c.Directory, c.ZstdThreads)
204+
if err != nil {
204205
return errors.Wrap(err, "failed to restore snapshot")
205206
}
207+
fmt.Fprintf(os.Stderr, "Restored %d bytes in %dms\n", result.BytesRead, result.Duration.Milliseconds()) //nolint:forbidigo
206208

207209
fmt.Fprintf(os.Stderr, "Snapshot restored: %s\n", c.Key.String()) //nolint:forbidigo
208210
return nil

internal/snapshot/snapshot.go

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -159,20 +159,51 @@ func StreamTo(ctx context.Context, w io.Writer, directory string, excludePattern
159159
return errors.Join(errs...)
160160
}
161161

162+
// RestoreResult contains metadata about a completed restore operation.
163+
type RestoreResult struct {
164+
// BytesRead is the number of compressed bytes read from the cache.
165+
BytesRead int64
166+
// Duration is the wall-clock time for the restore operation.
167+
Duration time.Duration
168+
}
169+
170+
// countingReader wraps an io.Reader and counts the number of bytes read.
171+
type countingReader struct {
172+
r io.Reader
173+
n int64
174+
}
175+
176+
func (c *countingReader) Read(p []byte) (int, error) {
177+
n, err := c.r.Read(p)
178+
c.n += int64(n)
179+
return n, err //nolint:wrapcheck
180+
}
181+
162182
// Restore downloads an archive from the cache and extracts it to a directory.
163183
//
164184
// The archive is decompressed with zstd and extracted with tar, preserving
165185
// all file permissions, ownership, and symlinks.
166186
// The operation is fully streaming - no temporary files are created.
167187
// threads controls zstd parallelism; 0 uses all available CPU cores.
168-
func Restore(ctx context.Context, remote cache.Cache, key cache.Key, directory string, threads int) error {
188+
func Restore(ctx context.Context, remote cache.Cache, key cache.Key, directory string, threads int) (*RestoreResult, error) {
189+
start := time.Now()
190+
169191
rc, _, err := remote.Open(ctx, key)
170192
if err != nil {
171-
return errors.Wrap(err, "failed to open object")
193+
return nil, errors.Wrap(err, "failed to open object")
172194
}
173195
defer rc.Close()
174196

175-
return Extract(ctx, rc, directory, threads)
197+
cr := &countingReader{r: rc}
198+
199+
if err := Extract(ctx, cr, directory, threads); err != nil {
200+
return nil, err
201+
}
202+
203+
return &RestoreResult{
204+
BytesRead: cr.n,
205+
Duration: time.Since(start),
206+
}, nil
176207
}
177208

178209
// Extract decompresses a zstd+tar stream into directory, preserving all file
@@ -183,6 +214,7 @@ func Extract(ctx context.Context, r io.Reader, directory string, threads int) er
183214
threads = runtime.NumCPU()
184215
}
185216

217+
// Create target directory if it doesn't exist
186218
if err := os.MkdirAll(directory, 0o750); err != nil {
187219
return errors.Wrap(err, "failed to create target directory")
188220
}

internal/snapshot/snapshot_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ func TestCreateAndRestoreRoundTrip(t *testing.T) {
3838
assert.Equal(t, "application/zstd", headers.Get("Content-Type"))
3939

4040
dstDir := t.TempDir()
41-
err = snapshot.Restore(ctx, mem, key, dstDir, 0)
41+
_, err = snapshot.Restore(ctx, mem, key, dstDir, 0)
4242
assert.NoError(t, err)
4343

4444
content1, err := os.ReadFile(filepath.Join(dstDir, "file1.txt"))
@@ -75,7 +75,7 @@ func TestCreateWithExcludePatterns(t *testing.T) {
7575
assert.NoError(t, err)
7676

7777
dstDir := t.TempDir()
78-
err = snapshot.Restore(ctx, mem, key, dstDir, 0)
78+
_, err = snapshot.Restore(ctx, mem, key, dstDir, 0)
7979
assert.NoError(t, err)
8080

8181
_, err = os.Stat(filepath.Join(dstDir, "include.txt"))
@@ -111,7 +111,7 @@ func TestCreateExcludesOnlyGitLockFiles(t *testing.T) {
111111
assert.NoError(t, err)
112112

113113
dstDir := t.TempDir()
114-
err = snapshot.Restore(ctx, mem, key, dstDir, 0)
114+
_, err = snapshot.Restore(ctx, mem, key, dstDir, 0)
115115
assert.NoError(t, err)
116116

117117
// Tracked lock files must be present.
@@ -152,7 +152,7 @@ func TestCreatePreservesSymlinks(t *testing.T) {
152152
assert.NoError(t, err)
153153

154154
dstDir := t.TempDir()
155-
err = snapshot.Restore(ctx, mem, key, dstDir, 0)
155+
_, err = snapshot.Restore(ctx, mem, key, dstDir, 0)
156156
assert.NoError(t, err)
157157

158158
info, err := os.Lstat(filepath.Join(dstDir, "link.txt"))
@@ -219,7 +219,7 @@ func TestRestoreNonexistentKey(t *testing.T) {
219219
key := cache.Key{1, 2, 3}
220220

221221
dstDir := t.TempDir()
222-
err = snapshot.Restore(ctx, mem, key, dstDir, 0)
222+
_, err = snapshot.Restore(ctx, mem, key, dstDir, 0)
223223
assert.Error(t, err)
224224
}
225225

@@ -237,7 +237,7 @@ func TestRestoreCreatesTargetDirectory(t *testing.T) {
237237
assert.NoError(t, err)
238238

239239
dstDir := filepath.Join(t.TempDir(), "nested", "target")
240-
err = snapshot.Restore(ctx, mem, key, dstDir, 0)
240+
_, err = snapshot.Restore(ctx, mem, key, dstDir, 0)
241241
assert.NoError(t, err)
242242

243243
content, err := os.ReadFile(filepath.Join(dstDir, "file.txt"))
@@ -266,7 +266,7 @@ func TestRestoreContextCancellation(t *testing.T) {
266266
cancel()
267267

268268
dstDir := t.TempDir()
269-
err = snapshot.Restore(cancelCtx, mem, key, dstDir, 0)
269+
_, err = snapshot.Restore(cancelCtx, mem, key, dstDir, 0)
270270
assert.Error(t, err)
271271
}
272272

@@ -283,7 +283,7 @@ func TestCreateEmptyDirectory(t *testing.T) {
283283
assert.NoError(t, err)
284284

285285
dstDir := t.TempDir()
286-
err = snapshot.Restore(ctx, mem, key, dstDir, 0)
286+
_, err = snapshot.Restore(ctx, mem, key, dstDir, 0)
287287
assert.NoError(t, err)
288288

289289
entries, err := os.ReadDir(dstDir)
@@ -307,7 +307,7 @@ func TestCreateWithNestedDirectories(t *testing.T) {
307307
assert.NoError(t, err)
308308

309309
dstDir := t.TempDir()
310-
err = snapshot.Restore(ctx, mem, key, dstDir, 0)
310+
_, err = snapshot.Restore(ctx, mem, key, dstDir, 0)
311311
assert.NoError(t, err)
312312

313313
content, err := os.ReadFile(filepath.Join(dstDir, "a", "b", "c", "d", "e", "deep.txt"))

internal/strategy/git/git.go

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,8 @@ func New(
140140
logger.InfoContext(ctx, "Startup fetch completed for existing repo", "upstream", repo.UpstreamURL(),
141141
"duration", time.Since(start))
142142

143+
recordCloneMetrics(ctx, "local", time.Since(start), 0)
144+
143145
postRefs, err := repo.GetLocalRefs(ctx)
144146
if err != nil {
145147
logger.WarnContext(ctx, "Failed to get post-fetch refs for existing repo", "upstream", repo.UpstreamURL(),
@@ -469,8 +471,10 @@ func (s *Strategy) startClone(ctx context.Context, repo *gitclone.Repository) {
469471

470472
logger.InfoContext(ctx, "Attempting mirror snapshot restore", "upstream", upstream)
471473

472-
if err := s.tryRestoreSnapshot(ctx, repo); err != nil {
473-
logger.InfoContext(ctx, "Mirror snapshot restore failed, falling back to clone", "upstream", upstream, "error", err)
474+
cloneStart := time.Now()
475+
restoreResult, restoreErr := s.tryRestoreSnapshot(ctx, repo)
476+
if restoreErr != nil {
477+
logger.InfoContext(ctx, "Mirror snapshot restore failed, falling back to clone", "upstream", upstream, "error", restoreErr)
474478
} else {
475479
logger.InfoContext(ctx, "Mirror snapshot restored, fetching to freshen", "upstream", upstream)
476480

@@ -500,6 +504,9 @@ func (s *Strategy) startClone(ctx context.Context, repo *gitclone.Repository) {
500504

501505
logger.InfoContext(ctx, "Post-restore fetch completed, serving", "upstream", upstream)
502506

507+
recordCloneSuccess(ctx, "mirror")
508+
recordCloneMetrics(ctx, "mirror", time.Since(cloneStart), restoreResult.BytesRead)
509+
503510
if s.config.SnapshotInterval > 0 {
504511
s.scheduleSnapshotJobs(repo)
505512
}
@@ -522,11 +529,15 @@ func (s *Strategy) startClone(ctx context.Context, repo *gitclone.Repository) {
522529

523530
if err != nil {
524531
logger.ErrorContext(ctx, "Clone failed", "upstream", upstream, "error", err)
532+
recordCloneFailure(ctx, "upstream", err)
525533
return
526534
}
527535

528536
logger.InfoContext(ctx, "Clone completed", "upstream", upstream, "path", repo.Path())
529537

538+
recordCloneSuccess(ctx, "upstream")
539+
recordCloneMetrics(ctx, "upstream", time.Since(cloneStart), 0)
540+
530541
if s.config.SnapshotInterval > 0 {
531542
s.scheduleSnapshotJobs(repo)
532543
}
@@ -539,28 +550,30 @@ func (s *Strategy) startClone(ctx context.Context, repo *gitclone.Repository) {
539550
// Mirror snapshots are bare repositories that can be extracted and used directly
540551
// without any conversion. On failure the repo path is cleaned up so the caller
541552
// can fall back to clone.
542-
func (s *Strategy) tryRestoreSnapshot(ctx context.Context, repo *gitclone.Repository) error {
553+
func (s *Strategy) tryRestoreSnapshot(ctx context.Context, repo *gitclone.Repository) (*snapshot.RestoreResult, error) {
543554
cacheKey := mirrorSnapshotCacheKey(repo.UpstreamURL())
544555

545556
if err := os.MkdirAll(filepath.Dir(repo.Path()), 0o750); err != nil {
546-
return errors.Wrap(err, "create parent directory for restore")
557+
return nil, errors.Wrap(err, "create parent directory for restore")
547558
}
548559

549560
logger := logging.FromContext(ctx)
550561

551-
if err := snapshot.Restore(ctx, s.cache, cacheKey, repo.Path(), s.config.ZstdThreads); err != nil {
562+
result, err := snapshot.Restore(ctx, s.cache, cacheKey, repo.Path(), s.config.ZstdThreads)
563+
if err != nil {
552564
_ = os.RemoveAll(repo.Path())
553-
return errors.Wrap(err, "restore mirror snapshot")
565+
return nil, errors.Wrap(err, "restore mirror snapshot")
554566
}
555-
logger.InfoContext(ctx, "Mirror snapshot extracted", "upstream", repo.UpstreamURL(), "path", repo.Path())
567+
logger.InfoContext(ctx, "Mirror snapshot extracted", "upstream", repo.UpstreamURL(), "path", repo.Path(),
568+
"bytes_read", result.BytesRead, "duration_ms", result.Duration.Milliseconds())
556569

557570
if err := repo.MarkRestored(ctx); err != nil {
558571
_ = os.RemoveAll(repo.Path())
559-
return errors.Wrap(err, "mark restored")
572+
return nil, errors.Wrap(err, "mark restored")
560573
}
561574
logger.InfoContext(ctx, "Repository marked as restored", "upstream", repo.UpstreamURL(), "state", repo.State())
562575

563-
return nil
576+
return result, nil
564577
}
565578

566579
func (s *Strategy) maybeBackgroundFetch(repo *gitclone.Repository) {

0 commit comments

Comments
 (0)