Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions internal/execute/tsctests/tscwatch_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package tsctests

import (
"fmt"
"strings"
"testing"

Expand All @@ -9,6 +10,30 @@ import (

func TestWatch(t *testing.T) {
t.Parallel()
bunDependencyTest := func() *tscInput {
files := FileMap{}
var index strings.Builder
var fileNames strings.Builder
fileNames.WriteString(`"index.ts"`)
for i := range 12 {
name := fmt.Sprintf("pkg%d", i)
value := fmt.Sprintf("value%d", i)
index.WriteString(fmt.Sprintf(`import { %[1]s } from "./node_modules/.bun/%[2]s/index"; %[1]s;`, value, name))
index.WriteString("\n")
files["/home/src/workspaces/project/node_modules/.bun/"+name+"/index.ts"] = fmt.Sprintf("export const %s = %d;", value, i)
fileNames.WriteString(fmt.Sprintf(`, "node_modules/.bun/%s/index.ts"`, name))
}
files["/home/src/workspaces/project/index.ts"] = index.String()
files["/home/src/workspaces/project/tsconfig.json"] = fmt.Sprintf(`{
"compilerOptions": {},
"files": [%s]
}`, fileNames.String())
return &tscInput{
subScenario: "watch handles many bun dependency files",
files: files,
commandLineArgs: []string{"--watch"},
}
}
testCases := []*tscInput{
{
subScenario: "watch with no tsconfig",
Expand All @@ -25,6 +50,7 @@ func TestWatch(t *testing.T) {
},
commandLineArgs: []string{"--watch", "--incremental"},
},
bunDependencyTest(),
{
subScenario: "watch skips build when no files change",
files: FileMap{
Expand Down
107 changes: 91 additions & 16 deletions internal/fswatch/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,8 @@ type watcher struct {
debounce *debounce // lazily created in getOrCreateDirWatch
}

const recursiveConsolidateThreshold = 10

func (w *watcher) Name() string { return w.name }
func (w *watcher) String() string { return w.name }
func (w *watcher) Available() bool { return w.factory != nil }
Expand Down Expand Up @@ -266,6 +268,54 @@ func (w *watcher) getImpl() (watcherImpl, error) {
return impl, nil
}

func (w *watcher) keyForDirWatch(dir string, recursive bool) string {
if recursive {
return dir + "\x00recursive"
}
return dir
}

func (w *watcher) findCoveringRecursiveWatchLocked(dir string) *dirWatch {
var best *dirWatch
for _, dw := range w.dirWatches {
if !dw.recursive || !isInDirectoryOrSelf(dw.dir, dir) {
continue
}
if best == nil || len(dw.dir) > len(best.dir) {
best = dw
}
}
return best
}

func (w *watcher) findConsolidationDirLocked(dir string) string {
if !w.HasFastRecursiveBackend() {
return ""
}
parent := filepath.Dir(dir)
for parent != dir && parent != "." {
if filepath.Dir(parent) == parent {
break
}
count := 1
for _, dw := range w.dirWatches {
if isInDirectoryOrSelf(parent, dw.dir) {
count++
}
}
if count >= recursiveConsolidateThreshold {
return parent
}
next := filepath.Dir(parent)
if next == parent {
break
}
dir = parent
parent = next
}
return ""
}

func (w *watcher) getOrCreateDirWatch(dir string, physicalDir string, recursive bool) *dirWatch {
w.mu.Lock()
defer w.mu.Unlock()
Expand All @@ -275,10 +325,22 @@ func (w *watcher) getOrCreateDirWatch(dir string, physicalDir string, recursive
if w.debounce == nil {
w.debounce = newDebounce()
}
key := dir
if recursive {
key = dir + "\x00recursive"

if w.HasFastRecursiveBackend() {
if dw := w.findCoveringRecursiveWatchLocked(dir); dw != nil {
return dw
}
if consolidationDir := w.findConsolidationDirLocked(dir); consolidationDir != "" {
dir = consolidationDir
physicalDir = physicalDirFor(dir)
recursive = true
if dw := w.findCoveringRecursiveWatchLocked(dir); dw != nil {
return dw
}
}
}

key := w.keyForDirWatch(dir, recursive)
if dw, ok := w.dirWatches[key]; ok {
return dw
}
Expand All @@ -291,10 +353,7 @@ func (w *watcher) getOrCreateDirWatch(dir string, physicalDir string, recursive
func (w *watcher) removeDirWatch(dw *dirWatch) {
w.mu.Lock()
defer w.mu.Unlock()
key := dw.dir
if dw.recursive {
key = dw.dir + "\x00recursive"
}
key := w.keyForDirWatch(dw.dir, dw.recursive)
if existing, ok := w.dirWatches[key]; ok && existing == dw {
delete(w.dirWatches, key)
dw.destroyDebounce()
Expand All @@ -321,7 +380,7 @@ func (w *watcher) WatchDirectory(dir string, fn WatchCallback, opts ...WatchOpti
}

dw := w.getOrCreateDirWatch(dir, physicalDir, sopts.recursive)
id, _ := dw.watch(fn, sopts.ignore)
id, _ := dw.watch(dir, sopts.recursive, fn, sopts.ignore)

impl, err := w.getImpl()
if err != nil {
Expand Down Expand Up @@ -512,9 +571,11 @@ func (b *watcherBase) handleWatcherError(werr *dirWatchError) {
// ----- dirWatch: per-directory watch state -------------------------

type callback struct {
id uint64
fn WatchCallback
ignore func(path string) bool
id uint64
dir string
recursive bool
fn WatchCallback
ignore func(path string) bool
}

// dirWatchError associates an error with a specific directory watch.
Expand Down Expand Up @@ -663,18 +724,21 @@ func (dw *dirWatch) triggerCallbacks() {
}
events, err := dw.events.drain()
cbs := slices.Clone(dw.callbacks)
recursive := dw.recursive
dw.mu.Unlock()

for _, cb := range cbs {
cbEvents := events
if cb.ignore != nil || !recursive {
if cb.ignore != nil || !cb.recursive || cb.dir != dw.dir {
filtered := make([]Event, 0, len(events))
for _, e := range events {
if cb.ignore != nil && cb.ignore(e.Path) {
continue
}
if !recursive && !isDirectChild(dw.dir, e.Path) {
if cb.recursive {
if !isInDirectoryOrSelf(cb.dir, e.Path) {
continue
}
} else if !isDirectChild(cb.dir, e.Path) {
continue
}
filtered = append(filtered, e)
Expand All @@ -687,6 +751,17 @@ func (dw *dirWatch) triggerCallbacks() {
}
}

func isInDirectoryOrSelf(dir, path string) bool {
if path == dir {
return true
}
if !strings.HasPrefix(path, dir) {
return false
}
rest := path[len(dir):]
return len(rest) > 0 && (rest[0] == '/' || rest[0] == filepath.Separator)
}

// isDirectChild reports whether path is an immediate child of dir.
// Both paths must be absolute. Returns false for path == dir.
func isDirectChild(dir, path string) bool {
Expand All @@ -704,12 +779,12 @@ func isDirectChild(dir, path string) bool {
return len(rest) > 0 && !strings.ContainsRune(rest, '/') && !strings.ContainsRune(rest, filepath.Separator)
}

func (dw *dirWatch) watch(fn WatchCallback, ignore func(path string) bool) (uint64, bool) {
func (dw *dirWatch) watch(dir string, recursive bool, fn WatchCallback, ignore func(path string) bool) (uint64, bool) {
dw.mu.Lock()
defer dw.mu.Unlock()
dw.nextCBID++
id := dw.nextCBID
dw.callbacks = append(dw.callbacks, callback{id: id, fn: fn, ignore: ignore})
dw.callbacks = append(dw.callbacks, callback{id: id, dir: dir, recursive: recursive, fn: fn, ignore: ignore})
return id, true
}

Expand Down
102 changes: 102 additions & 0 deletions internal/fswatch/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1313,6 +1313,108 @@ func TestSubscribeMultipleDifferentDirs(t *testing.T) {
})
}

type countingWatcherImpl struct {
watcherBase
subscribed []*dirWatch
closed []*dirWatch
}

func newCountingWatcherImpl() *countingWatcherImpl {
impl := &countingWatcherImpl{}
impl.watcherBase.init(impl)
return impl
}

func (b *countingWatcherImpl) start() error {
b.notifyStarted()
return nil
}

func (b *countingWatcherImpl) subscribe(w *dirWatch) error {
b.subscribed = append(b.subscribed, w)
return nil
}

func (b *countingWatcherImpl) closeWatch(w *dirWatch) error {
b.closed = append(b.closed, w)
return nil
}

func TestFastRecursiveWatcherConsolidatesSiblingDirectories(t *testing.T) {
t.Parallel()

root := t.TempDir()
parent := filepath.Join(root, "node_modules", ".bun")
if err := os.MkdirAll(parent, 0o755); err != nil {
t.Fatal(err)
}

var impl *countingWatcherImpl
watcherImpl := &watcher{
name: "fsevents",
factory: func() watcherImpl {
impl = newCountingWatcherImpl()
return impl
},
}

var subs []Watch
for i := range recursiveConsolidateThreshold + 2 {
dir := filepath.Join(parent, fmt.Sprintf("pkg%d", i))
if err := os.MkdirAll(dir, 0o755); err != nil {
t.Fatal(err)
}
sub, err := watcherImpl.WatchDirectory(dir, func([]Event, error) {})
if err != nil {
t.Fatal(err)
}
subs = append(subs, sub)
}
t.Cleanup(func() {
for _, sub := range subs {
_ = sub.Close()
}
})

if got := len(impl.subscribed); got != recursiveConsolidateThreshold {
t.Fatalf("expected %d subscriptions after consolidation, got %d", recursiveConsolidateThreshold, got)
}
consolidated := impl.subscribed[len(impl.subscribed)-1]
if consolidated.dir != parent || !consolidated.recursive {
t.Fatalf("expected consolidated recursive watch on %s, got dir=%s recursive=%v", parent, consolidated.dir, consolidated.recursive)
}

watcherImpl.mu.Lock()
_, hasPkgWatch := watcherImpl.dirWatches[watcherImpl.keyForDirWatch(filepath.Join(parent, "pkg11"), false)]
watcherImpl.mu.Unlock()
if hasPkgWatch {
t.Fatal("expected later package watch to reuse consolidated parent instead of creating its own stream")
}
}

func TestConsolidatedChildWatchFiltersAgainstRequestedDir(t *testing.T) {
t.Parallel()

parent := filepath.Join(t.TempDir(), "parent")
child := filepath.Join(parent, "child")
sibling := filepath.Join(parent, "sibling")
dw := newDirectWatcher(t, parent)

var got []Event
dw.watch(child, false, func(events []Event, err error) {
if err != nil {
t.Fatal(err)
}
got = append(got, events...)
}, nil)
dw.events.update(filepath.Join(child, "file.ts"))
dw.events.update(filepath.Join(child, "nested", "file.ts"))
dw.events.update(filepath.Join(sibling, "file.ts"))
dw.triggerCallbacks()

assertEventSequence(t, got, []wantEvent{{EventUpdate, filepath.Join(child, "file.ts")}})
}

// ----- errors ------------------------------------------------------------

func TestSubscribeMissingDirError(t *testing.T) {
Expand Down
11 changes: 10 additions & 1 deletion internal/lsp/lspwatcher/lspwatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const throttleWindow = 75 * time.Millisecond

type watcherBackend interface {
WatchDirectory(dir string, fn fswatch.WatchCallback, opts ...fswatch.WatchOption) (io.Closer, error)
HasFastRecursiveBackend() bool
}

type defaultWatcherBackend struct {
Expand All @@ -35,6 +36,10 @@ func (d defaultWatcherBackend) WatchDirectory(dir string, fn fswatch.WatchCallba
return d.watcher.WatchDirectory(dir, fn, opts...)
}

func (d defaultWatcherBackend) HasFastRecursiveBackend() bool {
return d.watcher.HasFastRecursiveBackend()
}

// Watcher manages a set of file system subscriptions identified by
// WatcherID strings (matching the LSP server's project.WatcherID type).
// Events are delivered to onChanges in batches as `*lsproto.FileEvent`,
Expand Down Expand Up @@ -286,7 +291,11 @@ func (w *watch) reconcile(emitSyntheticCreates bool) error {
if !w.watchingTarget && w.subscription != nil && w.watchedDirectory == ancestorDirectory {
return nil // already watching the correct ancestor
}
subscription, err := watcher.backend.WatchDirectory(ancestorDirectory, w.ancestorCallback())
var options []fswatch.WatchOption
if watcher.backend.HasFastRecursiveBackend() {
options = append(options, fswatch.WithRecursive())
}
subscription, err := watcher.backend.WatchDirectory(ancestorDirectory, w.ancestorCallback(), options...)
if err != nil {
return err
}
Expand Down
5 changes: 5 additions & 0 deletions internal/lsp/lspwatcher/lspwatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ type fakeBackend struct {
closed map[string]int
optCount map[string]int
failDirs map[string]error
fast bool
}

func newFakeBackend() *fakeBackend {
Expand Down Expand Up @@ -199,6 +200,10 @@ func (f *fakeBackend) WatchDirectory(dir string, fn fswatch.WatchCallback, opts
}}, nil
}

func (f *fakeBackend) HasFastRecursiveBackend() bool {
return f.fast
}

// watchedDirs returns the directories currently subscribed, for assertions.
func (f *fakeBackend) watchedDirs() []string {
f.mu.Lock()
Expand Down
Loading