Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 23 additions & 12 deletions pkg/scrape/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,18 +190,26 @@ func (m *Manager) reloader() {
}

func (m *Manager) reload() {
m.mtxScrape.Lock()
defer m.mtxScrape.Unlock()
var wg sync.WaitGroup
level.Debug(m.logger).Log("msg", "Reloading scrape manager")

type syncWork struct {
sp *scrapePool
groups []*targetgroup.Group
}

// Snapshot the work to do under the lock, but release it before waiting
// on pool syncs. Holding mtxScrape across sp.Sync() means a single slow
// or hung pool would block the scrape manager's Run() loop from draining
// new target sets, causing all pools to scrape stale endpoints.
m.mtxScrape.Lock()
work := make([]syncWork, 0, len(m.targetSets))
for setName, groups := range m.targetSets {
var sp *scrapePool
existing, ok := m.scrapePools[setName]
sp, ok := m.scrapePools[setName]
if !ok {
scrapeConfig, ok := m.scrapeConfigs[setName]
if !ok {
level.Error(m.logger).Log("msg", "error reloading target set", "err", "invalid config id:"+setName)
return
continue
}
sp = newScrapePool(
scrapeConfig,
Expand All @@ -221,16 +229,19 @@ func (m *Manager) reload() {
},
)
m.scrapePools[setName] = sp
} else {
sp = existing
}
work = append(work, syncWork{sp: sp, groups: groups})
}
m.mtxScrape.Unlock()

var wg sync.WaitGroup
for _, w := range work {
wg.Add(1)
// Run the sync in parallel as these take a while and at high load can't catch up.
go func(sp *scrapePool, groups []*targetgroup.Group) {
sp.Sync(groups)
wg.Done()
}(sp, groups)
go func(w syncWork) {
defer wg.Done()
w.sp.Sync(w.groups)
}(w)
}
wg.Wait()
}
Expand Down
30 changes: 17 additions & 13 deletions pkg/scrape/scrape.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,6 @@ func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
// It returns after all stopped scrape loops terminated.
func (sp *scrapePool) sync(targets []*Target) {
sp.mtx.Lock()
defer sp.mtx.Unlock()

var (
uniqueTargets = map[uint64]struct{}{}
Expand Down Expand Up @@ -276,27 +275,32 @@ func (sp *scrapePool) sync(targets []*Target) {
}
}

var wg sync.WaitGroup

// Stop and remove old targets and scraper loops.
// Collect loops to stop and remove them from the pool's state while
// still holding sp.mtx. The actual wait happens after the lock is
// released so a hung scrape loop cannot wedge other sp.mtx readers
// (and, transitively, the scrape manager's mtxScrape).
var toStop []loop
for hash := range sp.activeTargets {
if _, ok := uniqueTargets[hash]; !ok {
wg.Add(1)
go func(l loop) {
l.stop()

wg.Done()
}(sp.loops[hash])

toStop = append(toStop, sp.loops[hash])
delete(sp.loops, hash)
delete(sp.activeTargets, hash)
}
}
sp.mtx.Unlock()

// Wait for all potentially stopped scrapers to terminate.
// This covers the case of flapping targets. If the server is under high load, a new scraper
// may be active and tries to insert. The old scraper that didn't terminate yet could still
// be inserting a previous sample set.
var wg sync.WaitGroup
for _, l := range toStop {
wg.Add(1)
go func(l loop) {
l.stop()
wg.Done()
}(l)
}
wg.Wait()
}

Expand Down Expand Up @@ -461,7 +465,7 @@ mainLoop:

profileType := sl.target.labels.Get(ProfileName)

scrapeCtx, cancel := context.WithTimeout(sl.ctx, timeout)
scrapeCtx, cancel := context.WithTimeout(sl.scrapeCtx, timeout)
scrapeErr := sl.scraper.scrape(scrapeCtx, buf, profileType)
cancel()

Expand Down Expand Up @@ -582,7 +586,7 @@ func processScrapeResp(buf *bytes.Buffer, sl *scrapeLoop, profileType string) er
byt = newBuf.Bytes()
}

_, err = sl.store.WriteRaw(sl.ctx, &profilepb.WriteRawRequest{
_, err = sl.store.WriteRaw(sl.scrapeCtx, &profilepb.WriteRawRequest{
Normalized: sl.normalizedAddresses,
Series: []*profilepb.RawProfileSeries{
{
Expand Down
Loading