Commit f836260

pkg/scrape: Never block discovery with scrapes (#6327)
The theory is that either scraping or pushing data might be keeping scrapers from updating their discovered targets. This change holds target group locks for as short a time as possible and adds timeouts and the right contexts in various places.
1 parent: da48a59

2 files changed

Lines changed: 40 additions & 25 deletions
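
The core of the manager.go change is a lock-scoping pattern: copy the (pool, target groups) pairs out of the shared maps while holding the mutex, release it, and only then fan out the slow Sync calls. The sketch below illustrates that pattern in isolation; the manager, pool, and Sync types here are simplified stand-ins for this illustration, not the project's real types.

package main

import (
    "fmt"
    "sync"
    "time"
)

// pool stands in for a scrapePool; Sync may be slow or even hang.
type pool struct{ name string }

func (p *pool) Sync(groups []string) {
    time.Sleep(100 * time.Millisecond) // simulate a slow sync
    fmt.Println(p.name, "synced", len(groups), "groups")
}

type manager struct {
    mtx        sync.Mutex
    targetSets map[string][]string
    pools      map[string]*pool
}

// reload snapshots (pool, groups) pairs under the lock, then releases the
// lock before waiting on the syncs, so a hung Sync cannot block other
// callers that need mtx (e.g. the loop delivering new target sets).
func (m *manager) reload() {
    type syncWork struct {
        p      *pool
        groups []string
    }

    m.mtx.Lock()
    work := make([]syncWork, 0, len(m.targetSets))
    for name, groups := range m.targetSets {
        p, ok := m.pools[name]
        if !ok {
            continue // unknown set: skip it instead of aborting the reload
        }
        work = append(work, syncWork{p: p, groups: groups})
    }
    m.mtx.Unlock()

    var wg sync.WaitGroup
    for _, w := range work {
        wg.Add(1)
        go func(w syncWork) {
            defer wg.Done()
            w.p.Sync(w.groups)
        }(w)
    }
    wg.Wait()
}

func main() {
    m := &manager{
        targetSets: map[string][]string{"app": {"10.0.0.1:9090", "10.0.0.2:9090"}},
        pools:      map[string]*pool{"app": {name: "app"}},
    }
    m.reload()
}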


pkg/scrape/manager.go

Lines changed: 23 additions & 12 deletions
@@ -190,18 +190,26 @@ func (m *Manager) reloader() {
 }
 
 func (m *Manager) reload() {
-    m.mtxScrape.Lock()
-    defer m.mtxScrape.Unlock()
-    var wg sync.WaitGroup
     level.Debug(m.logger).Log("msg", "Reloading scrape manager")
+
+    type syncWork struct {
+        sp     *scrapePool
+        groups []*targetgroup.Group
+    }
+
+    // Snapshot the work to do under the lock, but release it before waiting
+    // on pool syncs. Holding mtxScrape across sp.Sync() means a single slow
+    // or hung pool would block the scrape manager's Run() loop from draining
+    // new target sets, causing all pools to scrape stale endpoints.
+    m.mtxScrape.Lock()
+    work := make([]syncWork, 0, len(m.targetSets))
     for setName, groups := range m.targetSets {
-        var sp *scrapePool
-        existing, ok := m.scrapePools[setName]
+        sp, ok := m.scrapePools[setName]
         if !ok {
             scrapeConfig, ok := m.scrapeConfigs[setName]
             if !ok {
                 level.Error(m.logger).Log("msg", "error reloading target set", "err", "invalid config id:"+setName)
-                return
+                continue
             }
             sp = newScrapePool(
                 scrapeConfig,
@@ -221,16 +229,19 @@ func (m *Manager) reload() {
                 },
             )
             m.scrapePools[setName] = sp
-        } else {
-            sp = existing
         }
+        work = append(work, syncWork{sp: sp, groups: groups})
+    }
+    m.mtxScrape.Unlock()
 
+    var wg sync.WaitGroup
+    for _, w := range work {
         wg.Add(1)
         // Run the sync in parallel as these take a while and at high load can't catch up.
-        go func(sp *scrapePool, groups []*targetgroup.Group) {
-            sp.Sync(groups)
-            wg.Done()
-        }(sp, groups)
+        go func(w syncWork) {
+            defer wg.Done()
+            w.sp.Sync(w.groups)
+        }(w)
     }
     wg.Wait()
 }
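
Two smaller details in this hunk are worth noting: an invalid config id now hits continue instead of return, so one bad target set no longer aborts the reload of every other set, and each goroutine receives the loop value as a parameter with defer wg.Done() at the top. Passing the value as an argument gives every goroutine its own copy of the work item, which is the safe idiom on Go versions before 1.22, where the loop variable was reused across iterations. A minimal sketch of that idiom (illustrative only, not project code):

package main

import (
    "fmt"
    "sync"
)

func main() {
    items := []string{"a", "b", "c"}

    var wg sync.WaitGroup
    for _, it := range items {
        wg.Add(1)
        // Each goroutine gets its own copy of `it` via the parameter, so it
        // prints a, b, and c (in some order) even on Go versions before 1.22,
        // where the loop variable was shared across iterations.
        go func(it string) {
            defer wg.Done() // defer keeps the Done call paired with Add as the body grows
            fmt.Println(it)
        }(it)
    }
    wg.Wait()
}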

pkg/scrape/scrape.go

Lines changed: 17 additions & 13 deletions
@@ -248,7 +248,6 @@ func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
 // It returns after all stopped scrape loops terminated.
 func (sp *scrapePool) sync(targets []*Target) {
     sp.mtx.Lock()
-    defer sp.mtx.Unlock()
 
     var (
         uniqueTargets = map[uint64]struct{}{}
@@ -276,27 +275,32 @@ func (sp *scrapePool) sync(targets []*Target) {
         }
     }
 
-    var wg sync.WaitGroup
-
-    // Stop and remove old targets and scraper loops.
+    // Collect loops to stop and remove them from the pool's state while
+    // still holding sp.mtx. The actual wait happens after the lock is
+    // released so a hung scrape loop cannot wedge other sp.mtx readers
+    // (and, transitively, the scrape manager's mtxScrape).
+    var toStop []loop
     for hash := range sp.activeTargets {
         if _, ok := uniqueTargets[hash]; !ok {
-            wg.Add(1)
-            go func(l loop) {
-                l.stop()
-
-                wg.Done()
-            }(sp.loops[hash])
-
+            toStop = append(toStop, sp.loops[hash])
             delete(sp.loops, hash)
             delete(sp.activeTargets, hash)
         }
     }
+    sp.mtx.Unlock()
 
     // Wait for all potentially stopped scrapers to terminate.
     // This covers the case of flapping targets. If the server is under high load, a new scraper
     // may be active and tries to insert. The old scraper that didn't terminate yet could still
     // be inserting a previous sample set.
+    var wg sync.WaitGroup
+    for _, l := range toStop {
+        wg.Add(1)
+        go func(l loop) {
+            l.stop()
+            wg.Done()
+        }(l)
+    }
     wg.Wait()
 }
 
@@ -461,7 +465,7 @@ mainLoop:
 
     profileType := sl.target.labels.Get(ProfileName)
 
-    scrapeCtx, cancel := context.WithTimeout(sl.ctx, timeout)
+    scrapeCtx, cancel := context.WithTimeout(sl.scrapeCtx, timeout)
     scrapeErr := sl.scraper.scrape(scrapeCtx, buf, profileType)
     cancel()
 
@@ -582,7 +586,7 @@ func processScrapeResp(buf *bytes.Buffer, sl *scrapeLoop, profileType string) er
         byt = newBuf.Bytes()
     }
 
-    _, err = sl.store.WriteRaw(sl.ctx, &profilepb.WriteRawRequest{
+    _, err = sl.store.WriteRaw(sl.scrapeCtx, &profilepb.WriteRawRequest{
         Normalized: sl.normalizedAddresses,
         Series: []*profilepb.RawProfileSeries{
             {
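
The last two hunks re-parent the per-request contexts: the scrape timeout is now derived from sl.scrapeCtx rather than sl.ctx, and the store write uses sl.scrapeCtx as well, so cancelling the scrape context ends in-flight scrapes and writes while the timeout bounds how long each attempt may run. A rough, self-contained sketch of that context wiring, with a hypothetical doScrape standing in for the real scraper:

package main

import (
    "context"
    "fmt"
    "time"
)

// doScrape is a hypothetical stand-in for scraper.scrape: it blocks until
// the work finishes or the context is cancelled or times out.
func doScrape(ctx context.Context, d time.Duration) error {
    select {
    case <-time.After(d):
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}

func main() {
    // scrapeCtx lives for the whole scrape loop; cancelling it stops every
    // in-flight scrape and store write derived from it.
    scrapeCtx, stopLoop := context.WithCancel(context.Background())
    defer stopLoop()

    timeout := 50 * time.Millisecond

    // Each iteration derives a bounded context from the loop context,
    // mirroring context.WithTimeout(sl.scrapeCtx, timeout) in the diff.
    ctx, cancel := context.WithTimeout(scrapeCtx, timeout)
    err := doScrape(ctx, 200*time.Millisecond) // slower than the timeout
    cancel()

    fmt.Println("scrape error:", err) // context deadline exceeded
}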
