From b86f59be5bdf21dbd6adedd166791ce926ba2d65 Mon Sep 17 00:00:00 2001 From: Frederic Branczyk Date: Tue, 14 Apr 2026 17:02:24 +0200 Subject: [PATCH] pkg/scrape: Never block discovery with scrapes The theory is that either scraping or pushing data might be keeping scrapers from updating their discovered targets. This holds target group locks for as short as possible and adds timeouts and the right contexts in various places. --- pkg/scrape/manager.go | 35 +++++++++++++++++++++++------------ pkg/scrape/scrape.go | 30 +++++++++++++++++------------- 2 files changed, 40 insertions(+), 25 deletions(-) diff --git a/pkg/scrape/manager.go b/pkg/scrape/manager.go index 49f70954091..732c854e8d6 100644 --- a/pkg/scrape/manager.go +++ b/pkg/scrape/manager.go @@ -190,18 +190,26 @@ func (m *Manager) reloader() { } func (m *Manager) reload() { - m.mtxScrape.Lock() - defer m.mtxScrape.Unlock() - var wg sync.WaitGroup level.Debug(m.logger).Log("msg", "Reloading scrape manager") + + type syncWork struct { + sp *scrapePool + groups []*targetgroup.Group + } + + // Snapshot the work to do under the lock, but release it before waiting + // on pool syncs. Holding mtxScrape across sp.Sync() means a single slow + // or hung pool would block the scrape manager's Run() loop from draining + // new target sets, causing all pools to scrape stale endpoints. + m.mtxScrape.Lock() + work := make([]syncWork, 0, len(m.targetSets)) for setName, groups := range m.targetSets { - var sp *scrapePool - existing, ok := m.scrapePools[setName] + sp, ok := m.scrapePools[setName] if !ok { scrapeConfig, ok := m.scrapeConfigs[setName] if !ok { level.Error(m.logger).Log("msg", "error reloading target set", "err", "invalid config id:"+setName) - return + continue } sp = newScrapePool( scrapeConfig, @@ -221,16 +229,19 @@ func (m *Manager) reload() { }, ) m.scrapePools[setName] = sp - } else { - sp = existing } + work = append(work, syncWork{sp: sp, groups: groups}) + } + m.mtxScrape.Unlock() + var wg sync.WaitGroup + for _, w := range work { wg.Add(1) // Run the sync in parallel as these take a while and at high load can't catch up. - go func(sp *scrapePool, groups []*targetgroup.Group) { - sp.Sync(groups) - wg.Done() - }(sp, groups) + go func(w syncWork) { + defer wg.Done() + w.sp.Sync(w.groups) + }(w) } wg.Wait() } diff --git a/pkg/scrape/scrape.go b/pkg/scrape/scrape.go index 08a4d10f019..e082a2d4ab3 100644 --- a/pkg/scrape/scrape.go +++ b/pkg/scrape/scrape.go @@ -248,7 +248,6 @@ func (sp *scrapePool) Sync(tgs []*targetgroup.Group) { // It returns after all stopped scrape loops terminated. func (sp *scrapePool) sync(targets []*Target) { sp.mtx.Lock() - defer sp.mtx.Unlock() var ( uniqueTargets = map[uint64]struct{}{} @@ -276,27 +275,32 @@ func (sp *scrapePool) sync(targets []*Target) { } } - var wg sync.WaitGroup - - // Stop and remove old targets and scraper loops. + // Collect loops to stop and remove them from the pool's state while + // still holding sp.mtx. The actual wait happens after the lock is + // released so a hung scrape loop cannot wedge other sp.mtx readers + // (and, transitively, the scrape manager's mtxScrape). + var toStop []loop for hash := range sp.activeTargets { if _, ok := uniqueTargets[hash]; !ok { - wg.Add(1) - go func(l loop) { - l.stop() - - wg.Done() - }(sp.loops[hash]) - + toStop = append(toStop, sp.loops[hash]) delete(sp.loops, hash) delete(sp.activeTargets, hash) } } + sp.mtx.Unlock() // Wait for all potentially stopped scrapers to terminate. // This covers the case of flapping targets. If the server is under high load, a new scraper // may be active and tries to insert. The old scraper that didn't terminate yet could still // be inserting a previous sample set. + var wg sync.WaitGroup + for _, l := range toStop { + wg.Add(1) + go func(l loop) { + l.stop() + wg.Done() + }(l) + } wg.Wait() } @@ -461,7 +465,7 @@ mainLoop: profileType := sl.target.labels.Get(ProfileName) - scrapeCtx, cancel := context.WithTimeout(sl.ctx, timeout) + scrapeCtx, cancel := context.WithTimeout(sl.scrapeCtx, timeout) scrapeErr := sl.scraper.scrape(scrapeCtx, buf, profileType) cancel() @@ -582,7 +586,7 @@ func processScrapeResp(buf *bytes.Buffer, sl *scrapeLoop, profileType string) er byt = newBuf.Bytes() } - _, err = sl.store.WriteRaw(sl.ctx, &profilepb.WriteRawRequest{ + _, err = sl.store.WriteRaw(sl.scrapeCtx, &profilepb.WriteRawRequest{ Normalized: sl.normalizedAddresses, Series: []*profilepb.RawProfileSeries{ {